Implement writing of chunks as they arrive

This commit is contained in:
Matt Joiner 2013-10-02 19:55:03 +10:00
parent 381c6bef68
commit 39660d4e9d
1 changed files with 20 additions and 11 deletions

View File

@ -174,6 +174,11 @@ type torrent struct {
Peers []Peer Peers []Peer
} }
func (t *torrent) WriteChunk(piece int, begin int64, data []byte) (err error) {
_, err = t.Data.WriteAt(data, int64(piece)*t.MetaInfo.PieceLength+begin)
return
}
func (t *torrent) bitfield() (bf []bool) { func (t *torrent) bitfield() (bf []bool) {
for _, p := range t.Pieces { for _, p := range t.Pieces {
bf = append(bf, p.State == pieceStateComplete) bf = append(bf, p.State == pieceStateComplete)
@ -500,22 +505,24 @@ func (me *client) runConnection(torrent *torrent, conn *connection) error {
} }
} }
case peer_protocol.Piece: case peer_protocol.Piece:
request := request{msg.Index, chunkSpec{msg.Begin, peer_protocol.Integer(len(msg.Piece))}} request_ := request{msg.Index, chunkSpec{msg.Begin, peer_protocol.Integer(len(msg.Piece))}}
if _, ok := conn.Requests[request]; !ok { if _, ok := conn.Requests[request_]; !ok {
err = errors.New("unexpected piece") err = errors.New("unexpected piece")
break break
} }
delete(conn.Requests, request) delete(conn.Requests, request_)
me.replenishConnRequests(torrent, conn) if _, ok := torrent.Pieces[request_.Index].PendingChunkSpecs[request_.chunkSpec]; !ok {
if _, ok := torrent.Pieces[request.Index].PendingChunkSpecs[request.chunkSpec]; !ok { log.Printf("got unnecessary chunk: %s", request_)
log.Printf("got unnecessary chunk: %s", request)
break break
} }
log.Printf("need to implement write of chunk: %s", request) err = torrent.WriteChunk(int(msg.Index), int64(msg.Begin), msg.Piece)
delete(torrent.Pieces[request.Index].PendingChunkSpecs, request.chunkSpec) if err != nil {
if len(torrent.Pieces[request.Index].PendingChunkSpecs) == 0 { break
torrent.Pieces[request.Index].State = pieceStateUnknown }
go me.verifyPiece(torrent, int(request.Index)) delete(torrent.Pieces[request_.Index].PendingChunkSpecs, request_.chunkSpec)
if len(torrent.Pieces[request_.Index].PendingChunkSpecs) == 0 {
torrent.Pieces[request_.Index].State = pieceStateUnknown
go me.verifyPiece(torrent, int(request_.Index))
} }
default: default:
log.Printf("received unknown message type: %#v", msg.Type) log.Printf("received unknown message type: %#v", msg.Type)
@ -523,7 +530,9 @@ func (me *client) runConnection(torrent *torrent, conn *connection) error {
if err != nil { if err != nil {
log.Print(err) log.Print(err)
me.dropConnection(torrent, conn) me.dropConnection(torrent, conn)
return
} }
me.replenishConnRequests(torrent, conn)
}) })
} }
} }