Implement new request strategy

The new strategy, 2, has the fastest connection download pieces in priority order, while every other connection requests in a randomized ordering that's stable per connection.
Matt Joiner 2017-09-23 15:25:47 +10:00
parent 25d1f371c3
commit 6aad8041ab
4 changed files with 106 additions and 31 deletions
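
For orientation, here is a rough standalone sketch of the idea (not the torrent package's code): under strategy 2 the fastest connection walks wanted pieces strictly by priority, while every other connection walks them in a shuffle seeded per connection, so its order is random but repeatable. The pieceOrder function, its connSeed parameter and the sample priorities are invented for illustration.

package main

import (
	"fmt"
	"math/rand"
	"sort"
)

// pieceOrder sketches request strategy 2: the fastest connection takes
// wanted pieces in priority order, every other connection uses a shuffle
// seeded per connection, so its order is random but stable across calls.
// connSeed stands in for the per-connection piece inclination the real
// code keeps.
func pieceOrder(wanted []int, priority map[int]int, fastest bool, connSeed int64) []int {
	out := append([]int(nil), wanted...)
	if fastest {
		// Highest priority first; ties broken by piece index.
		sort.Slice(out, func(i, j int) bool {
			if priority[out[i]] != priority[out[j]] {
				return priority[out[i]] > priority[out[j]]
			}
			return out[i] < out[j]
		})
		return out
	}
	// Stable per-connection shuffle: same seed, same order every time.
	rand.New(rand.NewSource(connSeed)).Shuffle(len(out), func(i, j int) {
		out[i], out[j] = out[j], out[i]
	})
	return out
}

func main() {
	wanted := []int{0, 1, 2, 3, 4}
	prio := map[int]int{2: 3, 3: 2} // a reader wants pieces 2 and 3 soonest
	fmt.Println(pieceOrder(wanted, prio, true, 0))  // fastest conn: [2 3 0 1 4]
	fmt.Println(pieceOrder(wanted, prio, false, 7)) // other conn: some fixed shuffle
}

Calling pieceOrder twice with the same connSeed returns the same shuffle, which mirrors the "stable per connection" property in the commit message.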

View File

@ -1223,6 +1223,7 @@ func (cl *Client) newTorrent(ih metainfo.Hash, specStorage storage.ClientImpl) (
maxEstablishedConns: defaultEstablishedConnsPerTorrent,
networkingEnabled: true,
requestStrategy: 2,
}
t.setChunkSize(defaultChunkSize)
return

View File

@ -211,18 +211,16 @@ func (cn *connection) WriteStatus(w io.Writer, t *Torrent) {
len(cn.PeerRequests),
cn.statusFlags(),
)
fmt.Fprintf(w, " next pieces: %v\n", priorityBitmapHeadAsSlice(&cn.pieceRequestOrder, 10))
}
func priorityBitmapHeadAsSlice(pb *prioritybitmap.PriorityBitmap, n int) (ret []int) {
pb.IterTyped(func(i int) bool {
if len(ret) >= n {
return false
}
ret = append(ret, i)
return true
})
return
roi := cn.pieceRequestOrderIter()
fmt.Fprintf(w, " next pieces: %v%s\n",
iter.ToSlice(iter.Head(10, roi)),
func() string {
if cn.shouldRequestWithoutBias() {
return " (fastest)"
} else {
return ""
}
}())
}
func (cn *connection) Close() {
@ -472,22 +470,27 @@ func nextRequestState(
networkingEnabled bool,
currentRequests map[request]struct{},
peerChoking bool,
nextPieces *prioritybitmap.PriorityBitmap,
requestPieces iter.Func,
pendingChunks func(piece int, f func(chunkSpec) bool) bool,
requestsLowWater int,
requestsHighWater int,
) (
cancelExisting bool,
newRequests []request,
interested bool,
cancelExisting bool, // Cancel all our pending requests
newRequests []request, // Chunks to request that we currently aren't
interested bool, // Whether we should indicate interest, even if we don't request anything
) {
if !networkingEnabled || nextPieces.IsEmpty() {
if !networkingEnabled {
return true, nil, false
}
if peerChoking || len(currentRequests) > requestsLowWater {
return false, nil, !nextPieces.IsEmpty()
if len(currentRequests) > requestsLowWater {
return false, nil, true
}
nextPieces.IterTyped(func(piece int) bool {
requestPieces(func(_piece interface{}) bool {
interested = true
if peerChoking {
return false
}
piece := _piece.(int)
return pendingChunks(piece, func(cs chunkSpec) bool {
r := request{pp.Integer(piece), cs}
if _, ok := currentRequests[r]; !ok {
@ -499,19 +502,68 @@ func nextRequestState(
return len(currentRequests)+len(newRequests) < requestsHighWater
})
})
return false, newRequests, true
return
}
func (cn *connection) updateRequests() {
cn.tickleWriter()
}
func iterBitmapsDistinct(skip bitmap.Bitmap, bms ...bitmap.Bitmap) iter.Func {
return func(cb iter.Callback) {
for _, bm := range bms {
if !iter.All(func(i interface{}) bool {
skip.Add(i.(int))
return cb(i)
}, bitmap.Sub(bm, skip).Iter) {
return
}
}
}
}
func (cn *connection) unbiasedPieceRequestOrder() iter.Func {
now, readahead := cn.t.readerPiecePriorities()
return iterBitmapsDistinct(cn.t.completedPieces.Copy(), now, readahead, cn.t.pendingPieces)
}
// The connection should download highest priority pieces first, without any
// inclination toward avoiding wastage. Generally we might do this if there's
// a single connection, or this is the fastest connection, and we have active
// readers that signal an ordering preference. It's conceivable that the best
// connection should do this, since it's least likely to waste our time if
// assigned to the highest priority pieces, and assigning more than one this
// role would cause significant wasted bandwidth.
func (cn *connection) shouldRequestWithoutBias() bool {
if cn.t.requestStrategy != 2 {
return false
}
if len(cn.t.readers) == 0 {
return false
}
if len(cn.t.conns) == 1 {
return true
}
if cn == cn.t.fastestConn {
return true
}
return false
}
func (cn *connection) pieceRequestOrderIter() iter.Func {
if cn.shouldRequestWithoutBias() {
return cn.unbiasedPieceRequestOrder()
} else {
return cn.pieceRequestOrder.Iter
}
}
func (cn *connection) desiredRequestState() (bool, []request, bool) {
return nextRequestState(
cn.t.networkingEnabled,
cn.requests,
cn.PeerChoked,
&cn.pieceRequestOrder,
cn.pieceRequestOrderIter(),
func(piece int, f func(chunkSpec) bool) bool {
return undirtiedChunks(piece, cn.t, f)
},
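
The fastest connection's unbiased order is produced by iterBitmapsDistinct chaining the readers' immediate pieces, their readahead pieces and the pending pieces, while skipping completed pieces and anything already yielded. A plain-slice sketch of that behaviour (not the missinggo-based code above), using the same sets as the new TestIterBitmapsDistinct at the bottom of this diff:

package main

import "fmt"

// distinctConcat sketches what iterBitmapsDistinct does, using plain
// slices instead of missinggo bitmaps: walk each set in turn, skip
// anything already in skip, and record every yielded piece in skip so
// later sets cannot repeat it. Earlier sets take precedence.
func distinctConcat(skip map[int]bool, sets ...[]int) []int {
	var out []int
	for _, set := range sets {
		for _, piece := range set {
			if skip[piece] {
				continue
			}
			skip[piece] = true
			out = append(out, piece)
		}
	}
	return out
}

func main() {
	completed := map[int]bool{1: true} // already have piece 1
	now := []int{0, 1, 3}              // pieces readers want immediately
	readahead := []int{0, 1, 2}        // pieces readers want next
	fmt.Println(distinctConcat(completed, now, readahead)) // [0 3 2]
}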
@ -545,16 +597,20 @@ func (cn *connection) updatePiecePriority(piece int) bool {
return cn.stopRequestingPiece(piece)
}
prio := cn.getPieceInclination()[piece]
switch tpp {
case PiecePriorityNormal:
case PiecePriorityReadahead:
prio -= cn.t.numPieces()
case PiecePriorityNext, PiecePriorityNow:
prio -= 2 * cn.t.numPieces()
switch cn.t.requestStrategy {
case 1:
switch tpp {
case PiecePriorityNormal:
case PiecePriorityReadahead:
prio -= cn.t.numPieces()
case PiecePriorityNext, PiecePriorityNow:
prio -= 2 * cn.t.numPieces()
default:
panic(tpp)
}
prio += piece / 3
default:
panic(tpp)
}
prio += piece / 3
return cn.pieceRequestOrder.Set(piece, prio)
}
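
The new switch is where the strategies diverge: under strategy 1 the reader priority and the piece index still shape the request key, while under strategy 2 the key appears to be left as the connection's random piece inclination, which is what gives the non-fastest connections their stable shuffled order. A worked example with made-up figures:

package main

import "fmt"

func main() {
	// Hypothetical figures: a torrent with 1000 pieces, piece 42 wanted
	// for readahead, and this connection's random inclination for piece
	// 42 being 700 (stable for the life of the connection).
	const numPieces = 1000
	const piece = 42
	inclination := 700

	// requestStrategy == 1: reader state and piece index shape the key.
	prio1 := inclination
	prio1 -= numPieces // PiecePriorityReadahead pulls the piece forward
	prio1 += piece / 3 // mild preference for earlier pieces
	fmt.Println(prio1) // -286: readahead pieces sort ahead of normal ones

	// requestStrategy == 2: the key stays the bare inclination, so each
	// non-fastest connection requests in a random but stable order.
	prio2 := inclination
	fmt.Println(prio2) // 700
}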
@ -966,6 +1022,10 @@ func (c *connection) receiveChunk(msg *pp.Message) {
c.UsefulChunksReceived++
c.lastUsefulChunkReceived = time.Now()
if t.fastestConn != c {
// log.Printf("setting fastest connection %p", c)
}
t.fastestConn = c
// Need to record that it hasn't been written yet, before we attempt to do
// anything with it.

View File

@ -3,6 +3,8 @@ package torrent
import (
"testing"
"github.com/anacrolix/missinggo/bitmap"
"github.com/anacrolix/missinggo/iter"
"github.com/stretchr/testify/assert"
)
@ -17,3 +19,12 @@ func TestTorrentOffsetRequest(t *testing.T) {
check(13, 5, 11, newRequest(2, 0, 3), true)
check(13, 5, 13, request{}, false)
}
func TestIterBitmapsDistinct(t *testing.T) {
var skip, first, second bitmap.Bitmap
skip.Add(1)
first.Add(1, 0, 3)
second.Add(1, 2, 0)
assert.Equal(t, []interface{}{0, 3, 2}, iter.ToSlice(iterBitmapsDistinct(skip.Copy(), first, second)))
assert.Equal(t, []int{1}, skip.ToSortedSlice())
}

View File

@ -45,6 +45,7 @@ type Torrent struct {
cl *Client
networkingEnabled bool
requestStrategy int
closed missinggo.Event
infoHash metainfo.Hash
@ -68,12 +69,14 @@ type Torrent struct {
// The info dict. nil if we don't have it (yet).
info *metainfo.Info
// Active peer connections, running message stream loops.
conns map[*connection]struct{}
maxEstablishedConns int
// Set of addrs to which we're attempting to connect. Connections are
// half-open until all handshakes are completed.
halfOpen map[string]Peer
halfOpen map[string]Peer
fastestConn *connection
// Reserve of peers to connect to. A peer can be both here and in the
// active connections if were told about the peer after connecting with