diff --git a/client.go b/client.go index a513a0ae..dc862969 100644 --- a/client.go +++ b/client.go @@ -1241,11 +1241,11 @@ func (me *Client) downloadedChunk(t *torrent, c *connection, msg *pp.Message) er // Record that we have the chunk. delete(t.Pieces[req.Index].PendingChunkSpecs, req.chunkSpec) - t.PiecesByBytesLeft.ValueChanged(t.Pieces[req.Index].bytesLeftElement) me.dataReady(dataSpec{t.InfoHash, req}) if len(t.Pieces[req.Index].PendingChunkSpecs) == 0 { me.queuePieceCheck(t, req.Index) } + t.PieceBytesLeftChanged(int(req.Index)) // Unprioritize the chunk. me.downloadStrategy.TorrentGotChunk(t, req) @@ -1301,6 +1301,7 @@ func (me *Client) pieceHashed(t *torrent, piece pp.Integer, correct bool) { t.pendAllChunkSpecs(piece) } } + t.PieceBytesLeftChanged(int(piece)) for _, conn := range t.Conns { if correct { conn.Post(pp.Message{ diff --git a/download_strategies.go b/download_strategies.go index db31bc3a..1de19a5b 100644 --- a/download_strategies.go +++ b/download_strategies.go @@ -62,7 +62,7 @@ func (s *DefaultDownloadStrategy) FillRequests(t *torrent, c *connection) { } // Then finish off incomplete pieces in order of bytes remaining. for _, heatThreshold := range []int{1, 4, 15, 60} { - for e := t.PiecesByBytesLeft.Front(); e != nil; e = e.Next() { + for e := t.IncompletePiecesByBytesLeft.Front(); e != nil; e = e.Next() { pieceIndex := pp.Integer(e.Value.(int)) for _, chunkSpec := range t.Pieces[pieceIndex].shuffledPendingChunkSpecs() { // for chunkSpec := range t.Pieces[pieceIndex].PendingChunkSpecs { @@ -154,13 +154,14 @@ func (me *responsiveDownloadStrategy) DeleteRequest(t *torrent, r request) { } func (me *responsiveDownloadStrategy) FillRequests(t *torrent, c *connection) { + th := me.requestHeat[t] requestWrapper := func(req request) bool { if c.RequestPending(req) { return true } again := c.Request(req) if c.RequestPending(req) { - me.requestHeat[t][req]++ + th[req]++ } return again } @@ -189,22 +190,56 @@ func (me *responsiveDownloadStrategy) FillRequests(t *torrent, c *connection) { }() if lastReadOffset, ok := me.lastReadOffset[t]; ok { - for off := lastReadOffset; off < lastReadOffset+chunkSize-1+me.Readahead; off += chunkSize { + var nextAhead int64 + for ahead := int64(0); ahead < me.Readahead; ahead = nextAhead { + off := lastReadOffset + ahead req, ok := t.offsetRequest(off) if !ok { break } - if me.requestHeat[t][req] >= 2 { + if !t.wantPiece(int(req.Index)) { + nextAhead = ahead + int64(t.PieceLength(req.Index)) continue } + nextAhead = ahead + int64(req.Length) if !t.wantChunk(req) { continue } + if th[req] >= func() int { + // Determine allowed redundancy based on how far into the + // readahead zone we're looking. + if ahead >= (2*me.Readahead+2)/3 { + return 1 + } else if ahead >= (me.Readahead+2)/3 { + return 2 + } else { + return 3 + } + }() { + continue + } if !requestWrapper(req) { return } } } + + // t.assertIncompletePiecesByBytesLeftOrdering() + for e := t.IncompletePiecesByBytesLeft.Front(); e != nil; e = e.Next() { + p := e.Value.(int) + if !t.PiecePartiallyDownloaded(p) && int(t.PieceLength(pp.Integer(p))) == t.UsualPieceSize() { + break + } + for chunkSpec := range t.Pieces[p].PendingChunkSpecs { + r := request{pp.Integer(p), chunkSpec} + if th[r] >= 2 { + continue + } + if !requestWrapper(r) { + return + } + } + } } func (me *responsiveDownloadStrategy) TorrentGotChunk(t *torrent, req request) { diff --git a/ordered.go b/ordered.go index 6a97da34..9c546054 100644 --- a/ordered.go +++ b/ordered.go @@ -38,3 +38,7 @@ func (me *OrderedList) Insert(value interface{}) (ret *list.Element) { func (me *OrderedList) Front() *list.Element { return me.list.Front() } + +func (me *OrderedList) Remove(e *list.Element) interface{} { + return me.list.Remove(e) +} diff --git a/torrent.go b/torrent.go index 792ce20f..07a18c48 100644 --- a/torrent.go +++ b/torrent.go @@ -44,17 +44,18 @@ type peersKey struct { } type torrent struct { - closed bool - InfoHash InfoHash - Pieces []*torrentPiece - PiecesByBytesLeft *OrderedList - Data mmap_span.MMapSpan - length int64 + closing chan struct{} + InfoHash InfoHash + Pieces []*torrentPiece + IncompletePiecesByBytesLeft *OrderedList + length int64 // Prevent mutations to Data memory maps while in use as they're not safe. dataLock sync.RWMutex - Info *metainfo.Info - Conns []*connection - Peers map[peersKey]Peer + Data mmap_span.MMapSpan + + Info *metainfo.Info + Conns []*connection + Peers map[peersKey]Peer // BEP 12 Multitracker Metadata Extension. The tracker.Client instances // mirror their respective URLs from the announce-list key. Trackers [][]tracker.Client @@ -63,6 +64,34 @@ type torrent struct { metadataHave []bool } +func (t *torrent) assertIncompletePiecesByBytesLeftOrdering() { + allIndexes := make(map[int]struct{}, t.NumPieces()) + for i := 0; i < t.NumPieces(); i++ { + allIndexes[i] = struct{}{} + } + var lastBytesLeft int + for e := t.IncompletePiecesByBytesLeft.Front(); e != nil; e = e.Next() { + i := e.Value.(int) + if _, ok := allIndexes[i]; !ok { + panic("duplicate entry") + } + delete(allIndexes, i) + if t.Pieces[i].Complete() { + panic("complete piece") + } + bytesLeft := int(t.PieceNumPendingBytes(pp.Integer(i))) + if bytesLeft < lastBytesLeft { + panic("ordering broken") + } + lastBytesLeft = bytesLeft + } + for i := range allIndexes { + if !t.Pieces[i].Complete() { + panic("leaked incomplete piece") + } + } +} + func (t *torrent) AddPeers(pp []Peer) { for _, p := range pp { t.Peers[peersKey{string(p.IP), p.Port}] = p @@ -124,7 +153,7 @@ func (t *torrent) setMetadata(md metainfo.Info, dataDir string, infoBytes []byte return } t.length = t.Data.Size() - t.PiecesByBytesLeft = NewList(func(a, b interface{}) bool { + t.IncompletePiecesByBytesLeft = NewList(func(a, b interface{}) bool { apb := t.PieceNumPendingBytes(pp.Integer(a.(int))) bpb := t.PieceNumPendingBytes(pp.Integer(b.(int))) if apb < bpb { @@ -139,9 +168,10 @@ func (t *torrent) setMetadata(md metainfo.Info, dataDir string, infoBytes []byte piece := &torrentPiece{} util.CopyExact(piece.Hash[:], hash) t.Pieces = append(t.Pieces, piece) - piece.bytesLeftElement = t.PiecesByBytesLeft.Insert(index) + piece.bytesLeftElement = t.IncompletePiecesByBytesLeft.Insert(index) t.pendAllChunkSpecs(pp.Integer(index)) } + t.assertIncompletePiecesByBytesLeftOrdering() for _, conn := range t.Conns { if err := conn.setNumPieces(t.NumPieces()); err != nil { log.Printf("closing connection: %s", err) @@ -391,10 +421,19 @@ func (t *torrent) pendAllChunkSpecs(index pp.Integer) { for _, cs := range t.pieceChunks(int(index)) { pcss[cs] = struct{}{} } - t.PiecesByBytesLeft.ValueChanged(piece.bytesLeftElement) + t.IncompletePiecesByBytesLeft.ValueChanged(piece.bytesLeftElement) return } +func (t *torrent) PieceBytesLeftChanged(index int) { + p := t.Pieces[index] + if p.Complete() { + t.IncompletePiecesByBytesLeft.Remove(p.bytesLeftElement) + } else { + t.IncompletePiecesByBytesLeft.ValueChanged(p.bytesLeftElement) + } +} + type Peer struct { Id [20]byte IP net.IP