Keep pieces sorted by bytes left

This commit is contained in:
Matt Joiner 2014-07-10 00:26:58 +10:00
parent 5f9ed81917
commit 4c2d07337d
5 changed files with 131 additions and 31 deletions

View File

@ -1024,6 +1024,7 @@ func (me *Client) downloadedChunk(t *torrent, c *connection, msg *pp.Message) er
// Record that we have the chunk.
delete(t.Pieces[req.Index].PendingChunkSpecs, req.chunkSpec)
t.PiecesByBytesLeft.ValueChanged(t.Pieces[req.Index].bytesLeftElement)
if len(t.Pieces[req.Index].PendingChunkSpecs) == 0 {
me.queuePieceCheck(t, req.Index)
}

View File

@ -50,16 +50,17 @@ func (s *DefaultDownloadStrategy) FillRequests(t *torrent, c *connection) {
return
}
}
ppbs := t.piecesByPendingBytes()
// Then finish off incomplete pieces in order of bytes remaining.
for _, heatThreshold := range []int{0, 4, 60} {
for _, pieceIndex := range ppbs {
for _, chunkSpec := range t.Pieces[pieceIndex].shuffledPendingChunkSpecs() {
for e := t.PiecesByBytesLeft.Front(); e != nil; e = e.Next() {
pieceIndex := pp.Integer(e.Value.(int))
// for _, chunkSpec := range t.Pieces[pieceIndex].shuffledPendingChunkSpecs() {
for chunkSpec := range t.Pieces[pieceIndex].PendingChunkSpecs {
r := request{pieceIndex, chunkSpec}
if th[r] > heatThreshold {
continue
}
if !addRequest(request{pieceIndex, chunkSpec}) {
if !addRequest(r) {
return
}
}
@ -121,15 +122,16 @@ func (me *ResponsiveDownloadStrategy) FillRequests(t *torrent, c *connection) {
}
}
// Then finish off incomplete pieces in order of bytes remaining.
for _, index := range t.piecesByPendingBytes() {
for e := t.PiecesByBytesLeft.Front(); e != nil; e = e.Next() {
index := e.Value.(int)
// Stop when we're onto untouched pieces.
if !t.PiecePartiallyDownloaded(int(index)) {
if !t.PiecePartiallyDownloaded(index) {
break
}
// Request chunks in random order to reduce overlap with other
// connections.
for _, cs := range t.Pieces[index].shuffledPendingChunkSpecs() {
if !c.Request(request{index, cs}) {
if !c.Request(request{pp.Integer(index), cs}) {
return
}
}

40
ordered.go Normal file
View File

@ -0,0 +1,40 @@
package torrent
import (
"container/list"
)
type OrderedList struct {
list *list.List
lessFunc func(a, b interface{}) bool
}
func (me *OrderedList) Len() int {
return me.list.Len()
}
func NewList(lessFunc func(a, b interface{}) bool) *OrderedList {
return &OrderedList{
list: list.New(),
lessFunc: lessFunc,
}
}
func (me *OrderedList) ValueChanged(e *list.Element) {
for prev := e.Prev(); prev != nil && me.lessFunc(e.Value, prev.Value); prev = e.Prev() {
me.list.MoveBefore(e, prev)
}
for next := e.Next(); next != nil && me.lessFunc(next.Value, e.Value); next = e.Next() {
me.list.MoveAfter(e, next)
}
}
func (me *OrderedList) Insert(value interface{}) (ret *list.Element) {
ret = me.list.PushFront(value)
me.ValueChanged(ret)
return
}
func (me *OrderedList) Front() *list.Element {
return me.list.Front()
}

47
ordered_test.go Normal file
View File

@ -0,0 +1,47 @@
package torrent
import (
"testing"
)
func TestOrderedList(t *testing.T) {
ol := NewList(func(a, b interface{}) bool {
return a.(int) < b.(int)
})
if ol.Len() != 0 {
t.FailNow()
}
e := ol.Insert(0)
if ol.Len() != 1 {
t.FailNow()
}
if e.Value.(int) != 0 {
t.FailNow()
}
e = ol.Front()
if e.Value.(int) != 0 {
t.FailNow()
}
if e.Next() != nil {
t.FailNow()
}
ol.Insert(1)
if e.Next().Value.(int) != 1 {
t.FailNow()
}
ol.Insert(-1)
if e.Prev().Value.(int) != -1 {
t.FailNow()
}
e.Value = -2
ol.ValueChanged(e)
if e.Prev() != nil {
t.FailNow()
}
if e.Next().Value.(int) != -1 {
t.FailNow()
}
if ol.Len() != 3 {
t.FailNow()
}
}

View File

@ -11,7 +11,6 @@ import (
"io"
"log"
"net"
"sort"
)
func (t *torrent) PieceNumPendingBytes(index pp.Integer) (count pp.Integer) {
@ -26,9 +25,19 @@ func (t *torrent) PieceNumPendingBytes(index pp.Integer) (count pp.Integer) {
return
}
type pieceBytesLeft struct {
Piece, BytesLeft int
}
type torrentPiece struct {
piece
bytesLeftElement *list.Element
}
type torrent struct {
InfoHash InfoHash
Pieces []*piece
Pieces []*torrentPiece
PiecesByBytesLeft *OrderedList
Data mmap_span.MMapSpan
Info *metainfo.Info
Conns []*connection
@ -89,11 +98,23 @@ func (t *torrent) setMetadata(md metainfo.Info, dataDir string, infoBytes []byte
if err != nil {
return
}
for _, hash := range infoPieceHashes(&md) {
piece := &piece{}
t.PiecesByBytesLeft = NewList(func(a, b interface{}) bool {
apb := t.PieceNumPendingBytes(pp.Integer(a.(int)))
bpb := t.PieceNumPendingBytes(pp.Integer(b.(int)))
if apb < bpb {
return true
}
if apb > bpb {
return false
}
return a.(int) < b.(int)
})
for index, hash := range infoPieceHashes(&md) {
piece := &torrentPiece{}
copyHashSum(piece.Hash[:], []byte(hash))
t.Pieces = append(t.Pieces, piece)
t.pendAllChunkSpecs(pp.Integer(len(t.Pieces) - 1))
piece.bytesLeftElement = t.PiecesByBytesLeft.Insert(index)
t.pendAllChunkSpecs(pp.Integer(index))
}
t.Priorities = list.New()
return
@ -170,6 +191,7 @@ func (t *torrent) NewMetadataExtensionMessage(c *connection, msgType int, piece
}
func (t *torrent) WriteStatus(w io.Writer) {
fmt.Fprintf(w, "Infohash: %x\n", t.InfoHash)
fmt.Fprint(w, "Pieces: ")
for index := range t.Pieces {
fmt.Fprintf(w, "%c", t.pieceStatusChar(index))
@ -251,19 +273,6 @@ func (t *torrent) Close() (err error) {
return
}
func (t *torrent) piecesByPendingBytes() (indices []pp.Integer) {
slice := pieceByBytesPendingSlice{
Pending: make([]pp.Integer, 0, len(t.Pieces)),
Indices: make([]pp.Integer, 0, len(t.Pieces)),
}
for i := range t.Pieces {
slice.Pending = append(slice.Pending, t.PieceNumPendingBytes(pp.Integer(i)))
slice.Indices = append(slice.Indices, pp.Integer(i))
}
sort.Sort(slice)
return slice.Indices
}
// Return the request that would include the given offset into the torrent data.
func torrentOffsetRequest(torrentLength, pieceSize, chunkSize, offset int64) (
r request, ok bool) {
@ -330,6 +339,7 @@ func (t *torrent) pendAllChunkSpecs(index pp.Integer) {
cs[c] = struct{}{}
c.Begin += c.Length
}
t.PiecesByBytesLeft.ValueChanged(piece.bytesLeftElement)
return
}