diff --git a/TODO b/TODO index 72aabfc6..d1b2c6bf 100644 --- a/TODO +++ b/TODO @@ -1,7 +1,10 @@ * Track upload and download data. - * Emulate a UDP server in the UDP tracker tests. - * Make use of sparse file regions in download data for faster hashing. - * If we're choked and interested, we never send not-interested if there's nothing we want? - * Don't announce torrents that don't need active peers. It spams UDP, fills memory, and publicizes what we have loaded. - * Randomize triedAddrs bloom filter to allow different Addr sets on each Announce. - * When lots of good connections, it'll do a huge readahead, then refuse to trickle new pieces because we sent not interested to them all, thereby reducing the number of unchoked connections. \ No newline at end of file + * Emulate a UDP server in the UDP tracker tests rather than communicating with the Internet. + * Make use of sparse file regions in download data for faster hashing. This is available as whence 3 and 4 on some OS? + * When we're choked and interested, are we not interested if there's no longer anything that we want? + * dht: Randomize triedAddrs bloom filter to allow different Addr sets on each Announce. + * dht: Verify that the triedAddrs bloom filter is working well, github's willf made a bunch of changes. + * Rearrange the local-peer choked/interested status flags to be more natural to read. + * Check that pruning is working correctly. worstConns sorting might need an adjustment to how it factors in the good/unwanted chunks ratio. + * data/blob: Deleting incomplete data triggers io.ErrUnexpectedEOF that isn't recovered from. + * Responsive reader needs to apply some readahead. \ No newline at end of file diff --git a/client.go b/client.go index 9b60a3a8..46d32791 100644 --- a/client.go +++ b/client.go @@ -257,80 +257,22 @@ func (cl *Client) WriteStatus(_w io.Writer) { } } -// Read torrent data at the given offset. Will block until it is available. -func (cl *Client) torrentReadAt(t *torrent, off int64, p []byte) (n int, err error) { - cl.mu.Lock() - defer cl.mu.Unlock() - index := int(off / int64(t.usualPieceSize())) - // Reading outside the bounds of a file is an error. - if index < 0 { - err = os.ErrInvalid - return - } - if int(index) >= len(t.Pieces) { - err = io.EOF - return - } - pieceOff := pp.Integer(off % int64(t.usualPieceSize())) - pieceLeft := int(t.pieceLength(index) - pieceOff) - if pieceLeft <= 0 { - err = io.EOF - return - } - if len(p) > pieceLeft { - p = p[:pieceLeft] - } - if len(p) == 0 { - panic(len(p)) - } - // TODO: ReadAt should always try to fill the buffer. - for { - avail := cl.prepareRead(t, off) - if avail < int64(len(p)) { - p = p[:avail] - } - n, err = dataReadAt(t.data, p, off) - if n != 0 || err != io.ErrUnexpectedEOF { - break - } - // If we reach here, the data we thought was ready, isn't. So we - // prepare it again, and retry. - } - return -} - -// Sets priorities to download from the given offset. Returns when the piece -// at the given offset can be read. Returns the number of bytes that are -// immediately available from the offset. -func (cl *Client) prepareRead(t *torrent, off int64) (n int64) { - index := int(off / int64(t.usualPieceSize())) - // Reading outside the bounds of a file is an error. - if index < 0 || index >= t.numPieces() { - return - } - piece := t.Pieces[index] - cl.readRaisePiecePriorities(t, off) - for !t.pieceComplete(index) && !t.isClosed() { - // This is to prevent being starved if a piece is dropped before we - // can read it. - cl.readRaisePiecePriorities(t, off) - piece.Event.Wait() - } - return t.Info.Piece(index).Length() - off%t.Info.PieceLength -} - -func (T Torrent) prepareRead(off int64) (avail int64) { - T.cl.mu.Lock() - defer T.cl.mu.Unlock() - return T.cl.prepareRead(T.torrent, off) -} - -// Data implements a streaming interface that's more efficient than ReadAt. +// A Data that implements this has a streaming interface that should be +// preferred over ReadAt. For example, the data is stored in blocks on the +// network and have a fixed cost to open. type SectionOpener interface { + // Open a ReadCloser at the given offset into torrent data. n is how many + // bytes we intend to read. OpenSection(off, n int64) (io.ReadCloser, error) } func dataReadAt(d data.Data, b []byte, off int64) (n int, err error) { + // defer func() { + // if err == io.ErrUnexpectedEOF && n != 0 { + // err = nil + // } + // }() + // log.Println("data read at", len(b), off) again: if ra, ok := d.(io.ReaderAt); ok { return ra.ReadAt(b, off) @@ -357,7 +299,7 @@ func readaheadPieces(readahead, pieceLength int64) int { return int((readahead+pieceLength-1)/pieceLength - 1) } -func (cl *Client) readRaisePiecePriorities(t *torrent, off int64) { +func (cl *Client) readRaisePiecePriorities(t *torrent, off, readaheadBytes int64) { index := int(off / int64(t.usualPieceSize())) cl.raisePiecePriority(t, index, piecePriorityNow) index++ @@ -365,7 +307,7 @@ func (cl *Client) readRaisePiecePriorities(t *torrent, off int64) { return } cl.raisePiecePriority(t, index, piecePriorityNext) - for range iter.N(readaheadPieces(5*1024*1024, t.Info.PieceLength)) { + for range iter.N(readaheadPieces(readaheadBytes, t.Info.PieceLength)) { index++ if index >= t.numPieces() { break @@ -374,6 +316,30 @@ func (cl *Client) readRaisePiecePriorities(t *torrent, off int64) { } } +func (cl *Client) addUrgentRequests(t *torrent, off int64, n int) { + for n > 0 { + req, ok := t.offsetRequest(off) + if !ok { + break + } + if _, ok := t.urgent[req]; !ok && !t.haveChunk(req) { + if t.urgent == nil { + t.urgent = make(map[request]struct{}, (n+chunkSize-1)/chunkSize) + } + t.urgent[req] = struct{}{} + cl.event.Broadcast() // Why? + index := int(req.Index) + cl.queueFirstHash(t, index) + cl.pieceChanged(t, index) + } + reqOff := t.requestOffset(req) + n1 := req.Length - pp.Integer(off-reqOff) + off += int64(n1) + n -= int(n1) + } + // log.Print(t.urgent) +} + func (cl *Client) configDir() string { if cl._configDir == "" { return filepath.Join(os.Getenv("HOME"), ".config/torrent") @@ -582,12 +548,12 @@ func NewClient(cfg *Config) (cl *Client, err error) { dhtCfg.Conn = cl.utpSock.PacketConn() } cl.dHT, err = dht.NewServer(dhtCfg) - if cl.ipBlockList != nil { - cl.dHT.SetIPBlockList(cl.ipBlockList) - } if err != nil { return } + if cl.ipBlockList != nil { + cl.dHT.SetIPBlockList(cl.ipBlockList) + } } return @@ -1894,6 +1860,9 @@ func (cl *Client) setStorage(t *torrent, td data.Data) (err error) { if err != nil { return } + for index := range iter.N(t.numPieces()) { + cl.pieceChanged(t, index) + } cl.startTorrent(t) return } @@ -1990,12 +1959,6 @@ func (t *torrent) addTrackers(announceList [][]string) { t.Trackers = newTrackers } -// A handle to a live torrent within a Client. -type Torrent struct { - cl *Client - *torrent -} - // Don't call this before the info is available. func (t *torrent) BytesCompleted() int64 { if !t.haveInfo() { @@ -2014,23 +1977,6 @@ func (t Torrent) Drop() { t.cl.mu.Unlock() } -// Provides access to regions of torrent data that correspond to its files. -type File struct { - t Torrent - path string - offset int64 - length int64 - fi metainfo.FileInfo -} - -func (f File) FileInfo() metainfo.FileInfo { - return f.fi -} - -func (f File) Path() string { - return f.path -} - // A file-like handle to some torrent data resource. type Handle interface { io.Reader @@ -2039,114 +1985,6 @@ type Handle interface { io.ReaderAt } -// Implements a Handle within a subsection of another Handle. -type sectionHandle struct { - h Handle - off, n, cur int64 -} - -func (me *sectionHandle) Seek(offset int64, whence int) (ret int64, err error) { - if whence == 0 { - offset += me.off - } else if whence == 2 { - whence = 0 - offset += me.off + me.n - } - ret, err = me.h.Seek(offset, whence) - me.cur = ret - ret -= me.off - return -} - -func (me *sectionHandle) Close() error { - return me.h.Close() -} - -func (me *sectionHandle) Read(b []byte) (n int, err error) { - max := me.off + me.n - me.cur - if int64(len(b)) > max { - b = b[:max] - } - n, err = me.h.Read(b) - me.cur += int64(n) - if err != nil { - return - } - if me.cur == me.off+me.n { - err = io.EOF - } - return -} - -func (me *sectionHandle) ReadAt(b []byte, off int64) (n int, err error) { - if off >= me.n { - err = io.EOF - return - } - if int64(len(b)) >= me.n-off { - b = b[:me.n-off] - } - return me.h.ReadAt(b, me.off+off) -} - -func (f File) Open() (h Handle, err error) { - h = f.t.NewReadHandle() - _, err = h.Seek(f.offset, os.SEEK_SET) - if err != nil { - h.Close() - return - } - h = §ionHandle{h, f.offset, f.Length(), f.offset} - return -} - -func (f File) ReadAt(p []byte, off int64) (n int, err error) { - maxLen := f.length - off - if int64(len(p)) > maxLen { - p = p[:maxLen] - } - return f.t.ReadAt(p, off+f.offset) -} - -func (f *File) Length() int64 { - return f.length -} - -type FilePieceState struct { - Length int64 - State byte -} - -func (f *File) Progress() (ret []FilePieceState) { - pieceSize := int64(f.t.usualPieceSize()) - off := f.offset % pieceSize - remaining := f.length - for i := int(f.offset / pieceSize); ; i++ { - if remaining == 0 { - break - } - len1 := pieceSize - off - if len1 > remaining { - len1 = remaining - } - ret = append(ret, FilePieceState{len1, f.t.pieceStatusChar(i)}) - off = 0 - remaining -= len1 - } - return -} - -func (f *File) PrioritizeRegion(off, len int64) { - if off < 0 || off >= f.length { - return - } - if off+len > f.length { - len = f.length - off - } - off += f.offset - f.t.SetRegionPriority(off, len) -} - // Returns handles to the files in the torrent. This requires the metainfo is // available first. func (t Torrent) Files() (ret []File) { @@ -2200,10 +2038,6 @@ func (t Torrent) DownloadAll() { t.cl.raisePiecePriority(t.torrent, t.numPieces()-1, piecePriorityReadahead) } -func (me Torrent) ReadAt(p []byte, off int64) (n int, err error) { - return me.cl.torrentReadAt(me.torrent, off, p) -} - // Returns nil metainfo if it isn't in the cache. Checks that the retrieved // metainfo has the correct infohash. func (cl *Client) torrentCacheMetaInfo(ih InfoHash) (mi *metainfo.MetaInfo, err error) { @@ -2612,11 +2446,17 @@ func (me *Client) fillRequests(t *torrent, c *connection) { } } addRequest := func(req request) (again bool) { + // TODO: Couldn't this check also be done *after* the request? if len(c.Requests) >= 64 { return false } return c.Request(req) } + for req := range t.urgent { + if !addRequest(req) { + return + } + } for e := c.pieceRequestOrder.First(); e != nil; e = e.Next() { pieceIndex := e.Piece() if !c.PeerHasPiece(pieceIndex) { @@ -2664,7 +2504,7 @@ func (me *Client) downloadedChunk(t *torrent, c *connection, msg *pp.Message) er piece := t.Pieces[req.Index] // Do we actually want this chunk? - if _, ok := piece.PendingChunkSpecs[req.chunkSpec]; !ok || piece.Priority == piecePriorityNone { + if !t.wantChunk(req) { unusedDownloadedChunksCount.Add(1) c.UnwantedChunksReceived++ return nil @@ -2679,8 +2519,11 @@ func (me *Client) downloadedChunk(t *torrent, c *connection, msg *pp.Message) er return fmt.Errorf("error writing chunk: %s", err) } + // log.Println("got chunk", req) + piece.Event.Broadcast() // Record that we have the chunk. delete(piece.PendingChunkSpecs, req.chunkSpec) + delete(t.urgent, req) if len(piece.PendingChunkSpecs) == 0 { for _, c := range t.Conns { c.pieceRequestOrder.DeletePiece(int(req.Index)) @@ -2717,18 +2560,24 @@ func (me *Client) pieceHashed(t *torrent, piece pp.Integer, correct bool) { me.pieceChanged(t, int(piece)) } +// TODO: Check this isn't called more than once for each piece being correct. func (me *Client) pieceChanged(t *torrent, piece int) { correct := t.pieceComplete(piece) p := t.Pieces[piece] if correct { p.Priority = piecePriorityNone p.PendingChunkSpecs = nil + for req := range t.urgent { + if int(req.Index) == piece { + delete(t.urgent, req) + } + } p.Event.Broadcast() } else { if len(p.PendingChunkSpecs) == 0 { t.pendAllChunkSpecs(int(piece)) } - if p.Priority != piecePriorityNone { + if t.wantPiece(piece) { me.openNewConns(t) } } diff --git a/client_test.go b/client_test.go index 08dbc80c..05d7c909 100644 --- a/client_test.go +++ b/client_test.go @@ -3,7 +3,6 @@ package torrent import ( "encoding/binary" "fmt" - "io" "io/ioutil" "log" "net" @@ -273,9 +272,11 @@ func TestClientTransfer(t *testing.T) { Port: util.AddrPort(seeder.ListenAddr()), }, }) - _greeting, err := ioutil.ReadAll(io.NewSectionReader(leecherGreeting, 0, leecherGreeting.Length())) + r := leecherGreeting.NewReader() + defer r.Close() + _greeting, err := ioutil.ReadAll(r) if err != nil { - t.Fatal(err) + t.Fatalf("%q %s", string(_greeting), err) } greeting := string(_greeting) if greeting != testutil.GreetingFileContents { diff --git a/connection.go b/connection.go index b365f743..25a83470 100644 --- a/connection.go +++ b/connection.go @@ -106,8 +106,13 @@ func (cn *connection) pendPiece(piece int, priority piecePriority) { return } pp := cn.piecePriorities[piece] - // Priority goes to Now, then Next in connection order. Then Readahead in - // by piece index. Then normal again by connection order. + // Priority regions not to scale. Within each region, piece is randomized + // according to connection. + + // [ Now ] + // [ Next ] + // [ Readahead ] + // [ Normal ] key := func() int { switch priority { case piecePriorityNow: diff --git a/data/blob/blob.go b/data/blob/blob.go index a3ade8e6..934bbc48 100644 --- a/data/blob/blob.go +++ b/data/blob/blob.go @@ -3,6 +3,7 @@ package blob import ( "encoding/hex" "io" + "log" "github.com/anacrolix/libtorgo/metainfo" ) @@ -19,16 +20,36 @@ func (me *data) pieceHashHex(i int) string { func (me *data) Close() {} func (me *data) ReadAt(b []byte, off int64) (n int, err error) { - p := me.info.Piece(int(off / me.info.PieceLength)) - f := me.store.pieceRead(p) - if f == nil { - err = io.ErrUnexpectedEOF - return - } - defer f.Close() - n, err = f.ReadAt(b, off%me.info.PieceLength) - if err == io.EOF { - err = io.ErrUnexpectedEOF + for len(b) != 0 { + if off >= me.info.TotalLength() { + err = io.EOF + break + } + p := me.info.Piece(int(off / me.info.PieceLength)) + f := me.store.pieceRead(p) + if f == nil { + log.Println("piece not found", p) + err = io.ErrUnexpectedEOF + break + } + b1 := b + maxN1 := int(p.Length() - off%me.info.PieceLength) + if len(b1) > maxN1 { + b1 = b1[:maxN1] + } + var n1 int + n1, err = f.ReadAt(b1, off%me.info.PieceLength) + f.Close() + n += n1 + off += int64(n1) + b = b[n1:] + if err == io.EOF { + err = nil + break + } + if err != nil { + break + } } return } diff --git a/file.go b/file.go new file mode 100644 index 00000000..1ec0d21d --- /dev/null +++ b/file.go @@ -0,0 +1,64 @@ +package torrent + +import "github.com/anacrolix/libtorgo/metainfo" + +// Provides access to regions of torrent data that correspond to its files. +type File struct { + t Torrent + path string + offset int64 + length int64 + fi metainfo.FileInfo +} + +// Data for this file begins this far into the torrent. +func (f *File) Offset() int64 { + return f.offset +} + +func (f File) FileInfo() metainfo.FileInfo { + return f.fi +} + +func (f File) Path() string { + return f.path +} + +func (f *File) Length() int64 { + return f.length +} + +type FilePieceState struct { + Length int64 + State byte +} + +func (f *File) Progress() (ret []FilePieceState) { + pieceSize := int64(f.t.usualPieceSize()) + off := f.offset % pieceSize + remaining := f.length + for i := int(f.offset / pieceSize); ; i++ { + if remaining == 0 { + break + } + len1 := pieceSize - off + if len1 > remaining { + len1 = remaining + } + ret = append(ret, FilePieceState{len1, f.t.pieceStatusChar(i)}) + off = 0 + remaining -= len1 + } + return +} + +func (f *File) PrioritizeRegion(off, len int64) { + if off < 0 || off >= f.length { + return + } + if off+len > f.length { + len = f.length - off + } + off += f.offset + f.t.SetRegionPriority(off, len) +} diff --git a/fs/torrentfs.go b/fs/torrentfs.go index 47fd6109..ffb9df68 100644 --- a/fs/torrentfs.go +++ b/fs/torrentfs.go @@ -91,7 +91,10 @@ func blockingRead(ctx context.Context, fs *TorrentFS, t torrent.Torrent, off int ) readDone := make(chan struct{}) go func() { - _n, _err = t.ReadAt(p, off) + r := t.NewReader() + defer r.Close() + _n, _err = r.ReadAt(p, off) + log.Println(_n, p) close(readDone) }() select { diff --git a/reader.go b/reader.go new file mode 100644 index 00000000..60357f4e --- /dev/null +++ b/reader.go @@ -0,0 +1,132 @@ +package torrent + +import ( + "errors" + "io" + "os" +) + +// Accesses torrent data via a client. +type Reader struct { + t *Torrent + pos int64 + responsive bool + readahead int64 +} + +var _ io.ReadCloser = &Reader{} + +// Don't wait for pieces to complete and be verified. Read calls return as +// soon as they can when the underlying chunks become available. +func (r *Reader) SetResponsive() { + r.responsive = true +} + +func (r *Reader) SetReadahead(readahead int64) { + r.readahead = readahead +} + +func (r *Reader) raisePriorities(off int64, n int) { + if r.responsive { + r.t.cl.addUrgentRequests(r.t.torrent, off, n) + } + r.t.cl.readRaisePiecePriorities(r.t.torrent, off, int64(n)+r.readahead) +} + +func (r *Reader) readable(off int64) (ret bool) { + // log.Println("readable", off) + // defer func() { + // log.Println("readable", ret) + // }() + req, ok := r.t.offsetRequest(off) + if !ok { + panic(off) + } + if r.responsive { + return r.t.haveChunk(req) + } + return r.t.pieceComplete(int(req.Index)) +} + +// How many bytes are available to read. Max is the most we could require. +func (r *Reader) available(off, max int64) (ret int64) { + for max > 0 { + req, ok := r.t.offsetRequest(off) + if !ok { + break + } + if !r.t.haveChunk(req) { + break + } + len1 := int64(req.Length) - (off - r.t.requestOffset(req)) + max -= len1 + ret += len1 + off += len1 + } + return +} + +func (r *Reader) waitReadable(off int64) { + r.t.Pieces[off/int64(r.t.usualPieceSize())].Event.Wait() +} + +func (r *Reader) ReadAt(b []byte, off int64) (n int, err error) { + return r.readAt(b, off) +} + +func (r *Reader) Read(b []byte) (n int, err error) { + n, err = r.readAt(b, r.pos) + r.pos += int64(n) + if n != 0 && err == io.ErrUnexpectedEOF { + err = nil + } + return +} + +func (r *Reader) readAt(b []byte, pos int64) (n int, err error) { + // defer func() { + // log.Println(pos, n, err) + // }() + r.t.cl.mu.Lock() + defer r.t.cl.mu.Unlock() + maxLen := r.t.Info.TotalLength() - pos + if maxLen <= 0 { + err = io.EOF + return + } + if int64(len(b)) > maxLen { + b = b[:maxLen] + } + r.raisePriorities(pos, len(b)) + for !r.readable(pos) { + r.raisePriorities(pos, len(b)) + r.waitReadable(pos) + } + avail := r.available(pos, int64(len(b))) + // log.Println("available", avail) + if int64(len(b)) > avail { + b = b[:avail] + } + n, err = dataReadAt(r.t.data, b, pos) + return +} + +func (r *Reader) Close() error { + r.t = nil + return nil +} + +func (r *Reader) Seek(off int64, whence int) (ret int64, err error) { + switch whence { + case os.SEEK_SET: + r.pos = off + case os.SEEK_CUR: + r.pos += off + case os.SEEK_END: + r.pos = r.t.Info.TotalLength() + off + default: + err = errors.New("bad whence") + } + ret = r.pos + return +} diff --git a/t.go b/t.go new file mode 100644 index 00000000..be35c8c6 --- /dev/null +++ b/t.go @@ -0,0 +1,17 @@ +package torrent + +// The public interface for a torrent within a Client. + +// A handle to a live torrent within a Client. +type Torrent struct { + cl *Client + *torrent +} + +func (t *Torrent) NewReader() (ret *Reader) { + ret = &Reader{ + t: t, + readahead: 5 * 1024 * 1024, + } + return +} diff --git a/torrent.go b/torrent.go index 8b5dbb71..72d0b129 100644 --- a/torrent.go +++ b/torrent.go @@ -2,12 +2,10 @@ package torrent import ( "container/heap" - "errors" "fmt" "io" "log" "net" - "os" "sort" "sync" "time" @@ -66,6 +64,9 @@ type torrent struct { InfoHash InfoHash Pieces []*piece + // Chunks that are wanted before all others. This is for + // responsive/streaming readers that want to unblock ASAP. + urgent map[request]struct{} // Total length of the torrent in bytes. Stored because it's not O(1) to // get this from the info dict. length int64 @@ -110,91 +111,6 @@ func (t *torrent) pieceComplete(piece int) bool { return t.data != nil && t.data.PieceComplete(piece) } -// A file-like handle to torrent data that implements SectionOpener. Opened -// sections will be reused so long as Reads and ReadAt's are contiguous. -type handle struct { - rc io.ReadCloser - rcOff int64 - curOff int64 - so SectionOpener - size int64 - t Torrent -} - -func (h *handle) Close() error { - if h.rc != nil { - return h.rc.Close() - } - return nil -} - -func (h *handle) ReadAt(b []byte, off int64) (n int, err error) { - return h.readAt(b, off) -} - -func (h *handle) readAt(b []byte, off int64) (n int, err error) { - avail := h.t.prepareRead(off) - if int64(len(b)) > avail { - b = b[:avail] - } - if int64(len(b)) > h.size-off { - b = b[:h.size-off] - } - if h.rcOff != off && h.rc != nil { - h.rc.Close() - h.rc = nil - } - if h.rc == nil { - h.rc, err = h.so.OpenSection(off, h.size-off) - if err != nil { - return - } - h.rcOff = off - } - n, err = h.rc.Read(b) - h.rcOff += int64(n) - return -} - -func (h *handle) Read(b []byte) (n int, err error) { - n, err = h.readAt(b, h.curOff) - h.curOff = h.rcOff - return -} - -func (h *handle) Seek(off int64, whence int) (newOff int64, err error) { - switch whence { - case os.SEEK_SET: - h.curOff = off - case os.SEEK_CUR: - h.curOff += off - case os.SEEK_END: - h.curOff = h.size + off - default: - err = errors.New("bad whence") - } - newOff = h.curOff - return -} - -// Implements Handle on top of an io.SectionReader. -type sectionReaderHandle struct { - *io.SectionReader -} - -func (sectionReaderHandle) Close() error { return nil } - -func (T Torrent) NewReadHandle() Handle { - if so, ok := T.data.(SectionOpener); ok { - return &handle{ - so: so, - size: T.Length(), - t: T, - } - } - return sectionReaderHandle{io.NewSectionReader(T, 0, T.Length())} -} - func (t *torrent) numConnsUnchoked() (num int) { for _, c := range t.Conns { if !c.PeerChoked { @@ -238,7 +154,9 @@ func (t *torrent) ceaseNetworking() { for _, c := range t.Conns { c.Close() } - t.pruneTimer.Stop() + if t.pruneTimer != nil { + t.pruneTimer.Stop() + } } func (t *torrent) addPeer(p Peer) { @@ -502,6 +420,11 @@ func (t *torrent) writeStatus(w io.Writer) { } fmt.Fprintln(w) } + fmt.Fprintf(w, "Urgent:") + for req := range t.urgent { + fmt.Fprintf(w, " %s", req) + } + fmt.Fprintln(w) fmt.Fprintf(w, "Trackers: ") for _, tier := range t.Trackers { for _, tr := range tier { @@ -647,6 +570,7 @@ func (t *torrent) writeChunk(piece int, begin int64, data []byte) (err error) { func (t *torrent) bitfield() (bf []bool) { for _, p := range t.Pieces { + // TODO: Check this logic. bf = append(bf, p.EverHashed && len(p.PendingChunkSpecs) == 0) } return @@ -732,11 +656,12 @@ func (t *torrent) havePiece(index int) bool { } func (t *torrent) haveChunk(r request) bool { - p := t.Pieces[r.Index] - if !p.EverHashed { + if !t.haveInfo() { return false } - _, ok := p.PendingChunkSpecs[r.chunkSpec] + piece := t.Pieces[r.Index] + _, ok := piece.PendingChunkSpecs[r.chunkSpec] + // log.Println("have chunk", r, !ok) return !ok } @@ -745,16 +670,41 @@ func (t *torrent) wantChunk(r request) bool { return false } _, ok := t.Pieces[r.Index].PendingChunkSpecs[r.chunkSpec] + if ok { + return true + } + _, ok = t.urgent[r] return ok } +func (t *torrent) urgentChunkInPiece(piece int) bool { + for req := range t.urgent { + if int(req.Index) == piece { + return true + } + } + return false +} + func (t *torrent) wantPiece(index int) bool { if !t.haveInfo() { return false } p := t.Pieces[index] - // Put piece complete check last, since it's the slowest! - return p.Priority != piecePriorityNone && !p.QueuedForHash && !p.Hashing && !t.pieceComplete(index) + if p.QueuedForHash { + return false + } + if p.Hashing { + return false + } + if p.Priority == piecePriorityNone { + if !t.urgentChunkInPiece(index) { + return false + } + } + // Put piece complete check last, since it's the slowest as it can involve + // calling out into external data stores. + return !t.pieceComplete(index) } func (t *torrent) connHasWantedPieces(c *connection) bool {