From 2f40c48d3744f3988b353402dd98cb12506c1448 Mon Sep 17 00:00:00 2001 From: Matt Joiner Date: Tue, 19 Jan 2016 01:28:56 +1100 Subject: [PATCH] Fix the download/prioritize piece functions This involves adding a pendingPieces field to torrent. --- client.go | 114 ++++++++++++++++-------------------------------- example_test.go | 5 ++- file.go | 13 +----- t.go | 8 ++++ torrent.go | 66 ++++++++++++++++++++++++++++ 5 files changed, 116 insertions(+), 90 deletions(-) diff --git a/client.go b/client.go index de9e4f6d..32134381 100644 --- a/client.go +++ b/client.go @@ -10,7 +10,6 @@ import ( "expvar" "fmt" "io" - "io/ioutil" "log" "math/big" mathRand "math/rand" @@ -29,6 +28,7 @@ import ( "github.com/anacrolix/missinggo/pubsub" "github.com/anacrolix/sync" "github.com/anacrolix/utp" + "github.com/bradfitz/iter" "github.com/edsrzf/mmap-go" "github.com/anacrolix/torrent/bencode" @@ -1963,13 +1963,9 @@ func (t Torrent) AddPeers(pp []Peer) error { func (t Torrent) DownloadAll() { t.cl.mu.Lock() defer t.cl.mu.Unlock() - for i := range iter.N(t.torrent.numPieces()) { - t.cl.raisePiecePriority(t.torrent, i, PiecePriorityNormal) + for i := range iter.N(t.torrent.Info.NumPieces()) { + t.torrent.pendPiece(i, t.cl) } - // Nice to have the first and last pieces sooner for various interactive - // purposes. - t.cl.raisePiecePriority(t.torrent, 0, PiecePriorityReadahead) - t.cl.raisePiecePriority(t.torrent, t.torrent.numPieces()-1, PiecePriorityReadahead) } // Returns nil metainfo if it isn't in the cache. Checks that the retrieved @@ -2366,59 +2362,11 @@ func (me *Client) WaitAll() bool { return true } -func (me *Client) connAddRequest(c *connection, req request) (more bool) { - if len(c.Requests) >= 64 { - return false - } - more = c.Request(req) - return -} - -func (me *Client) connRequestPiecePendingChunks(c *connection, t *torrent, piece int) (more bool) { - for _, cs := range t.Pieces[piece].shuffledPendingChunkSpecs(t, piece) { - req := request{pp.Integer(piece), cs} - if !me.connAddRequest(c, req) { - return false - } - } - return true -} - -func (me *Client) fillRequests(t *torrent, c *connection) { - if c.Interested { - if c.PeerChoked { - return - } - if len(c.Requests) > c.requestsLowWater { - return - } - } - if !t.forUrgentPieces(func(piece int) (again bool) { - if !c.PeerHasPiece(piece) { - return true - } - return me.connRequestPiecePendingChunks(c, t, piece) - }) { - return - } - t.forReaderWantedRegionPieces(func(begin, end int) (again bool) { - for i := begin + 1; i < end; i++ { - if !c.PeerHasPiece(i) { - continue - } - if !me.connRequestPiecePendingChunks(c, t, i) { - return false - } - } - return true - }) -} - func (me *Client) replenishConnRequests(t *torrent, c *connection) { if !t.haveInfo() { return } - me.fillRequests(t, c) + t.fillRequests(c) if len(c.Requests) == 0 && !c.PeerChoked { // So we're not choked, but we don't want anything right now. We may // have completed readahead, and the readahead window has not rolled @@ -2547,32 +2495,44 @@ func (me *Client) pieceHashed(t *torrent, piece int, correct bool) { me.pieceChanged(t, int(piece)) } +func (me *Client) onCompletedPiece(t *torrent, piece int) { + for _, conn := range t.Conns { + conn.Have(piece) + for r := range conn.Requests { + if int(r.Index) == piece { + conn.Cancel(r) + } + } + // Could check here if peer doesn't have piece, but due to caching + // some peers may have said they have a piece but they don't. + me.upload(t, conn) + } +} + +func (me *Client) onFailedPiece(t *torrent, piece int) { + if t.pieceAllDirty(piece) { + t.pendAllChunkSpecs(piece) + } + if !t.wantPiece(piece) { + return + } + me.openNewConns(t) + for _, conn := range t.Conns { + if conn.PeerHasPiece(piece) { + me.replenishConnRequests(t, conn) + } + } +} + func (me *Client) pieceChanged(t *torrent, piece int) { correct := t.pieceComplete(piece) defer t.publishPieceChange(piece) defer me.event.Broadcast() - if !correct { - if t.pieceAllDirty(piece) { - t.pendAllChunkSpecs(piece) - } - if t.wantPiece(piece) { - me.openNewConns(t) - } + if correct { + me.onCompletedPiece(t, piece) + } else { + me.onFailedPiece(t, piece) } - for _, conn := range t.Conns { - if correct { - conn.Have(piece) - for r := range conn.Requests { - if int(r.Index) == piece { - conn.Cancel(r) - } - } - me.upload(t, conn) - } else if t.wantPiece(piece) && conn.PeerHasPiece(piece) { - me.replenishConnRequests(t, conn) - } - } - me.event.Broadcast() } func (cl *Client) verifyPiece(t *torrent, piece int) { diff --git a/example_test.go b/example_test.go index 9dc7502c..8a4377af 100644 --- a/example_test.go +++ b/example_test.go @@ -1,9 +1,10 @@ package torrent_test import ( - "io" "log" + "github.com/anacrolix/missinggo" + "github.com/anacrolix/torrent" ) @@ -26,5 +27,5 @@ func Example_fileReader() { defer r.Close() // Access the parts of the torrent pertaining to f. Data will be // downloaded as required, per the configuration of the torrent.Reader. - _ = io.NewSectionReader(r, f.Offset(), f.Length()) + _ = missinggo.NewSectionReadSeeker(r, f.Offset(), f.Length()) } diff --git a/file.go b/file.go index 06858113..8e28781b 100644 --- a/file.go +++ b/file.go @@ -75,15 +75,6 @@ func (f *File) State() (ret []FilePieceState) { return } -// Marks pieces in the region of the file for download. This is a helper -// wrapping Torrent.SetRegionPriority. -func (f *File) PrioritizeRegion(off, len int64) { - if off < 0 || off >= f.length { - return - } - if off+len > f.length { - len = f.length - off - } - off += f.offset - f.t.SetRegionPriority(off, len) +func (f *File) Download() { + f.t.DownloadPieces(f.t.torrent.byteRegionPieces(f.offset, f.length)) } diff --git a/t.go b/t.go index bf2b9e22..8fac08e7 100644 --- a/t.go +++ b/t.go @@ -137,3 +137,11 @@ func (t Torrent) deleteReader(r *Reader) { delete(t.torrent.readers, r) t.torrent.readersChanged(t.cl) } + +func (t Torrent) DownloadPieces(begin, end int) { + t.cl.mu.Lock() + defer t.cl.mu.Unlock() + for i := begin; i < end; i++ { + t.torrent.pendPiece(i, t.cl) + } +} diff --git a/torrent.go b/torrent.go index 2933c809..cb1c3fa8 100644 --- a/torrent.go +++ b/torrent.go @@ -96,6 +96,8 @@ type torrent struct { gotMetainfo chan struct{} readers map[*Reader]struct{} + + pendingPieces map[int]struct{} } var ( @@ -860,6 +862,9 @@ func (t *torrent) piecePriority(piece int) (ret piecePriority) { if t.pieceComplete(piece) { return } + if _, ok := t.pendingPieces[piece]; ok { + ret = PiecePriorityNormal + } raiseRet := func(prio piecePriority) { if prio > ret { ret = prio @@ -876,3 +881,64 @@ func (t *torrent) piecePriority(piece int) (ret piecePriority) { }) return } + +func (t *torrent) pendPiece(piece int, cl *Client) { + if t.pendingPieces == nil { + t.pendingPieces = make(map[int]struct{}, t.Info.NumPieces()) + } + if _, ok := t.pendingPieces[piece]; ok { + return + } + if t.havePiece(piece) { + return + } + t.pendingPieces[piece] = struct{}{} + for _, c := range t.Conns { + if !c.PeerHasPiece(piece) { + continue + } + + } +} + +func (t *torrent) connRequestPiecePendingChunks(c *connection, piece int) (more bool) { + if !c.PeerHasPiece(piece) { + return true + } + for _, cs := range t.Pieces[piece].shuffledPendingChunkSpecs(t, piece) { + req := request{pp.Integer(piece), cs} + if !c.Request(req) { + return false + } + } + return true +} + +func (t *torrent) fillRequests(c *connection) { + if c.Interested { + if c.PeerChoked { + return + } + if len(c.Requests) > c.requestsLowWater { + return + } + } + if !t.forUrgentPieces(func(piece int) (again bool) { + return t.connRequestPiecePendingChunks(c, piece) + }) { + return + } + t.forReaderWantedRegionPieces(func(begin, end int) (again bool) { + for i := begin + 1; i < end; i++ { + if !t.connRequestPiecePendingChunks(c, i) { + return false + } + } + return true + }) + for i := range t.pendingPieces { + if !t.connRequestPiecePendingChunks(c, i) { + return + } + } +}