Improve incomplete piece ordering, responsive download strategy

This commit is contained in:
Matt Joiner 2014-08-25 05:31:34 +10:00
parent 8f89d7a07e
commit c8f335182f
4 changed files with 96 additions and 17 deletions

View File

@ -1241,11 +1241,11 @@ func (me *Client) downloadedChunk(t *torrent, c *connection, msg *pp.Message) er
// Record that we have the chunk. // Record that we have the chunk.
delete(t.Pieces[req.Index].PendingChunkSpecs, req.chunkSpec) delete(t.Pieces[req.Index].PendingChunkSpecs, req.chunkSpec)
t.PiecesByBytesLeft.ValueChanged(t.Pieces[req.Index].bytesLeftElement)
me.dataReady(dataSpec{t.InfoHash, req}) me.dataReady(dataSpec{t.InfoHash, req})
if len(t.Pieces[req.Index].PendingChunkSpecs) == 0 { if len(t.Pieces[req.Index].PendingChunkSpecs) == 0 {
me.queuePieceCheck(t, req.Index) me.queuePieceCheck(t, req.Index)
} }
t.PieceBytesLeftChanged(int(req.Index))
// Unprioritize the chunk. // Unprioritize the chunk.
me.downloadStrategy.TorrentGotChunk(t, req) me.downloadStrategy.TorrentGotChunk(t, req)
@ -1301,6 +1301,7 @@ func (me *Client) pieceHashed(t *torrent, piece pp.Integer, correct bool) {
t.pendAllChunkSpecs(piece) t.pendAllChunkSpecs(piece)
} }
} }
t.PieceBytesLeftChanged(int(piece))
for _, conn := range t.Conns { for _, conn := range t.Conns {
if correct { if correct {
conn.Post(pp.Message{ conn.Post(pp.Message{

View File

@ -62,7 +62,7 @@ func (s *DefaultDownloadStrategy) FillRequests(t *torrent, c *connection) {
} }
// Then finish off incomplete pieces in order of bytes remaining. // Then finish off incomplete pieces in order of bytes remaining.
for _, heatThreshold := range []int{1, 4, 15, 60} { for _, heatThreshold := range []int{1, 4, 15, 60} {
for e := t.PiecesByBytesLeft.Front(); e != nil; e = e.Next() { for e := t.IncompletePiecesByBytesLeft.Front(); e != nil; e = e.Next() {
pieceIndex := pp.Integer(e.Value.(int)) pieceIndex := pp.Integer(e.Value.(int))
for _, chunkSpec := range t.Pieces[pieceIndex].shuffledPendingChunkSpecs() { for _, chunkSpec := range t.Pieces[pieceIndex].shuffledPendingChunkSpecs() {
// for chunkSpec := range t.Pieces[pieceIndex].PendingChunkSpecs { // for chunkSpec := range t.Pieces[pieceIndex].PendingChunkSpecs {
@ -154,13 +154,14 @@ func (me *responsiveDownloadStrategy) DeleteRequest(t *torrent, r request) {
} }
func (me *responsiveDownloadStrategy) FillRequests(t *torrent, c *connection) { func (me *responsiveDownloadStrategy) FillRequests(t *torrent, c *connection) {
th := me.requestHeat[t]
requestWrapper := func(req request) bool { requestWrapper := func(req request) bool {
if c.RequestPending(req) { if c.RequestPending(req) {
return true return true
} }
again := c.Request(req) again := c.Request(req)
if c.RequestPending(req) { if c.RequestPending(req) {
me.requestHeat[t][req]++ th[req]++
} }
return again return again
} }
@ -189,22 +190,56 @@ func (me *responsiveDownloadStrategy) FillRequests(t *torrent, c *connection) {
}() }()
if lastReadOffset, ok := me.lastReadOffset[t]; ok { if lastReadOffset, ok := me.lastReadOffset[t]; ok {
for off := lastReadOffset; off < lastReadOffset+chunkSize-1+me.Readahead; off += chunkSize { var nextAhead int64
for ahead := int64(0); ahead < me.Readahead; ahead = nextAhead {
off := lastReadOffset + ahead
req, ok := t.offsetRequest(off) req, ok := t.offsetRequest(off)
if !ok { if !ok {
break break
} }
if me.requestHeat[t][req] >= 2 { if !t.wantPiece(int(req.Index)) {
nextAhead = ahead + int64(t.PieceLength(req.Index))
continue continue
} }
nextAhead = ahead + int64(req.Length)
if !t.wantChunk(req) { if !t.wantChunk(req) {
continue continue
} }
if th[req] >= func() int {
// Determine allowed redundancy based on how far into the
// readahead zone we're looking.
if ahead >= (2*me.Readahead+2)/3 {
return 1
} else if ahead >= (me.Readahead+2)/3 {
return 2
} else {
return 3
}
}() {
continue
}
if !requestWrapper(req) { if !requestWrapper(req) {
return return
} }
} }
} }
// t.assertIncompletePiecesByBytesLeftOrdering()
for e := t.IncompletePiecesByBytesLeft.Front(); e != nil; e = e.Next() {
p := e.Value.(int)
if !t.PiecePartiallyDownloaded(p) && int(t.PieceLength(pp.Integer(p))) == t.UsualPieceSize() {
break
}
for chunkSpec := range t.Pieces[p].PendingChunkSpecs {
r := request{pp.Integer(p), chunkSpec}
if th[r] >= 2 {
continue
}
if !requestWrapper(r) {
return
}
}
}
} }
func (me *responsiveDownloadStrategy) TorrentGotChunk(t *torrent, req request) { func (me *responsiveDownloadStrategy) TorrentGotChunk(t *torrent, req request) {

View File

@ -38,3 +38,7 @@ func (me *OrderedList) Insert(value interface{}) (ret *list.Element) {
func (me *OrderedList) Front() *list.Element { func (me *OrderedList) Front() *list.Element {
return me.list.Front() return me.list.Front()
} }
func (me *OrderedList) Remove(e *list.Element) interface{} {
return me.list.Remove(e)
}

View File

@ -44,14 +44,15 @@ type peersKey struct {
} }
type torrent struct { type torrent struct {
closed bool closing chan struct{}
InfoHash InfoHash InfoHash InfoHash
Pieces []*torrentPiece Pieces []*torrentPiece
PiecesByBytesLeft *OrderedList IncompletePiecesByBytesLeft *OrderedList
Data mmap_span.MMapSpan
length int64 length int64
// Prevent mutations to Data memory maps while in use as they're not safe. // Prevent mutations to Data memory maps while in use as they're not safe.
dataLock sync.RWMutex dataLock sync.RWMutex
Data mmap_span.MMapSpan
Info *metainfo.Info Info *metainfo.Info
Conns []*connection Conns []*connection
Peers map[peersKey]Peer Peers map[peersKey]Peer
@ -63,6 +64,34 @@ type torrent struct {
metadataHave []bool metadataHave []bool
} }
func (t *torrent) assertIncompletePiecesByBytesLeftOrdering() {
allIndexes := make(map[int]struct{}, t.NumPieces())
for i := 0; i < t.NumPieces(); i++ {
allIndexes[i] = struct{}{}
}
var lastBytesLeft int
for e := t.IncompletePiecesByBytesLeft.Front(); e != nil; e = e.Next() {
i := e.Value.(int)
if _, ok := allIndexes[i]; !ok {
panic("duplicate entry")
}
delete(allIndexes, i)
if t.Pieces[i].Complete() {
panic("complete piece")
}
bytesLeft := int(t.PieceNumPendingBytes(pp.Integer(i)))
if bytesLeft < lastBytesLeft {
panic("ordering broken")
}
lastBytesLeft = bytesLeft
}
for i := range allIndexes {
if !t.Pieces[i].Complete() {
panic("leaked incomplete piece")
}
}
}
func (t *torrent) AddPeers(pp []Peer) { func (t *torrent) AddPeers(pp []Peer) {
for _, p := range pp { for _, p := range pp {
t.Peers[peersKey{string(p.IP), p.Port}] = p t.Peers[peersKey{string(p.IP), p.Port}] = p
@ -124,7 +153,7 @@ func (t *torrent) setMetadata(md metainfo.Info, dataDir string, infoBytes []byte
return return
} }
t.length = t.Data.Size() t.length = t.Data.Size()
t.PiecesByBytesLeft = NewList(func(a, b interface{}) bool { t.IncompletePiecesByBytesLeft = NewList(func(a, b interface{}) bool {
apb := t.PieceNumPendingBytes(pp.Integer(a.(int))) apb := t.PieceNumPendingBytes(pp.Integer(a.(int)))
bpb := t.PieceNumPendingBytes(pp.Integer(b.(int))) bpb := t.PieceNumPendingBytes(pp.Integer(b.(int)))
if apb < bpb { if apb < bpb {
@ -139,9 +168,10 @@ func (t *torrent) setMetadata(md metainfo.Info, dataDir string, infoBytes []byte
piece := &torrentPiece{} piece := &torrentPiece{}
util.CopyExact(piece.Hash[:], hash) util.CopyExact(piece.Hash[:], hash)
t.Pieces = append(t.Pieces, piece) t.Pieces = append(t.Pieces, piece)
piece.bytesLeftElement = t.PiecesByBytesLeft.Insert(index) piece.bytesLeftElement = t.IncompletePiecesByBytesLeft.Insert(index)
t.pendAllChunkSpecs(pp.Integer(index)) t.pendAllChunkSpecs(pp.Integer(index))
} }
t.assertIncompletePiecesByBytesLeftOrdering()
for _, conn := range t.Conns { for _, conn := range t.Conns {
if err := conn.setNumPieces(t.NumPieces()); err != nil { if err := conn.setNumPieces(t.NumPieces()); err != nil {
log.Printf("closing connection: %s", err) log.Printf("closing connection: %s", err)
@ -391,10 +421,19 @@ func (t *torrent) pendAllChunkSpecs(index pp.Integer) {
for _, cs := range t.pieceChunks(int(index)) { for _, cs := range t.pieceChunks(int(index)) {
pcss[cs] = struct{}{} pcss[cs] = struct{}{}
} }
t.PiecesByBytesLeft.ValueChanged(piece.bytesLeftElement) t.IncompletePiecesByBytesLeft.ValueChanged(piece.bytesLeftElement)
return return
} }
func (t *torrent) PieceBytesLeftChanged(index int) {
p := t.Pieces[index]
if p.Complete() {
t.IncompletePiecesByBytesLeft.Remove(p.bytesLeftElement)
} else {
t.IncompletePiecesByBytesLeft.ValueChanged(p.bytesLeftElement)
}
}
type Peer struct { type Peer struct {
Id [20]byte Id [20]byte
IP net.IP IP net.IP