Pass tests with new full-client request strategy implementation

Matt Joiner 2021-05-09 14:14:11 +10:00
parent 56e5d08eff
commit 0830589b0a
9 changed files with 222 additions and 447 deletions

View File

@@ -81,6 +81,8 @@ type Client struct {
websocketTrackers websocketTrackers
activeAnnounceLimiter limiter.Instance
clientPieceRequestOrder
}
type ipStr string
@@ -293,6 +295,8 @@ func NewClient(cfg *ClientConfig) (cl *Client, err error) {
},
}
go cl.requester()
return
}
@@ -1139,7 +1143,6 @@ func (cl *Client) newTorrent(ih metainfo.Hash, specStorage storage.ClientImpl) (
webSeeds: make(map[string]*Peer),
}
t._pendingPieces.NewSet = priorityBitmapStableNewSet
t.requestStrategy = cl.config.DefaultRequestStrategy(t.requestStrategyCallbacks(), &cl._mu)
t.logger = cl.logger.WithContextValue(t)
t.setChunkSize(defaultChunkSize)
return

View File

@@ -137,8 +137,6 @@ type ClientConfig struct {
// OnQuery hook func
DHTOnQuery func(query *krpc.Msg, source net.Addr) (propagate bool)
DefaultRequestStrategy requestStrategyMaker
Extensions PeerExtensionBits
DisableWebtorrent bool
@@ -185,10 +183,7 @@ func NewDefaultClientConfig() *ClientConfig {
CryptoSelector: mse.DefaultCryptoSelector,
CryptoProvides: mse.AllSupportedCrypto,
ListenPort: 42069,
DefaultRequestStrategy: RequestStrategyDuplicateRequestTimeout(5 * time.Second),
Extensions: defaultPeerExtensionBytes(),
}
//cc.ConnTracker.SetNoMaxEntries()
//cc.ConnTracker.Timeout = func(conntrack.Entry) time.Duration { return 0 }

View File

@@ -20,4 +20,5 @@ type peerImpl interface {
drop()
String() string
connStatusString() string
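// Whether the peer's outgoing write buffer has reached its high-water mark.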
writeBufferFull() bool
}

View File

@@ -337,16 +337,16 @@ func (cn *Peer) writeStatus(w io.Writer, t *Torrent) {
cn.statusFlags(),
cn.downloadRate()/(1<<10),
)
fmt.Fprintf(w, " next pieces: %v%s\n",
iter.ToSlice(iter.Head(10, cn.iterPendingPiecesUntyped)),
func() string {
if cn == t.fastestPeer {
return " (fastest)"
} else {
return ""
}
}(),
)
//fmt.Fprintf(w, " next pieces: %v%s\n",
// iter.ToSlice(iter.Head(10, cn.iterPendingPiecesUntyped)),
// func() string {
// if cn == t.fastestPeer {
// return " (fastest)"
// } else {
// return ""
// }
// }(),
//)
}
func (cn *Peer) close() {
@@ -402,7 +402,12 @@ func (cn *PeerConn) write(msg pp.Message) bool {
cn.wroteMsg(&msg)
cn.writeBuffer.Write(msg.MustMarshalBinary())
torrent.Add(fmt.Sprintf("messages filled of type %s", msg.Type.String()), 1)
return cn.writeBuffer.Len() < writeBufferHighWaterLen
cn.tickleWriter()
return !cn.writeBufferFull()
}
func (cn *PeerConn) writeBufferFull() bool {
return cn.writeBuffer.Len() >= writeBufferHighWaterLen
}
func (cn *PeerConn) requestMetadataPiece(index int) {
@@ -440,11 +445,7 @@ func (cn *PeerConn) requestedMetadataPiece(index int) bool {
// The actual value to use as the maximum outbound requests.
func (cn *Peer) nominalMaxRequests() (ret int) {
return int(clamp(
1,
int64(cn.PeerMaxRequests),
int64(cn.t.requestStrategy.nominalMaxRequests(cn.requestStrategyConnection())),
))
return cn.PeerMaxRequests
}
func (cn *Peer) totalExpectingTime() (ret time.Duration) {
@@ -528,12 +529,12 @@ func (pc *PeerConn) writeInterested(interested bool) bool {
// are okay.
type messageWriter func(pp.Message) bool
func (cn *Peer) request(r Request) bool {
func (cn *Peer) request(r Request) (more bool, err error) {
if _, ok := cn.requests[r]; ok {
panic("chunk already requested")
return true, nil
}
if !cn.peerHasPiece(pieceIndex(r.Index)) {
panic("requesting piece peer doesn't have")
return true, errors.New("requesting piece peer doesn't have")
}
if !cn.t.peerIsActive(cn) {
panic("requesting but not in active conns")
@@ -545,7 +546,7 @@ func (cn *Peer) request(r Request) bool {
if cn.peerAllowedFast.Get(int(r.Index)) {
torrent.Add("allowed fast requests sent", 1)
} else {
panic("requesting while choking and not allowed fast")
return cn.setInterested(true), errors.New("requesting while choked and not allowed fast")
}
}
if cn.t.hashingPiece(pieceIndex(r.Index)) {
@@ -563,12 +564,11 @@ func (cn *Peer) request(r Request) bool {
}
cn.validReceiveChunks[r]++
cn.t.pendingRequests[r]++
cn.t.requestStrategy.hooks().sentRequest(r)
cn.updateExpectingChunks()
for _, f := range cn.callbacks.SentRequest {
f(PeerRequestEvent{cn, r})
}
return cn.peerImpl.request(r)
return cn.peerImpl.request(r), nil
}
func (me *PeerConn) request(r Request) bool {
@@ -584,64 +584,7 @@ func (me *PeerConn) cancel(r Request) bool {
return me.write(makeCancelMessage(r))
}
func (cn *Peer) doRequestState() bool {
if !cn.t.networkingEnabled || cn.t.dataDownloadDisallowed {
if !cn.setInterested(false) {
return false
}
if len(cn.requests) != 0 {
for r := range cn.requests {
cn.deleteRequest(r)
// log.Printf("%p: cancelling request: %v", cn, r)
if !cn.peerImpl.cancel(r) {
return false
}
}
}
} else if len(cn.requests) <= cn.requestsLowWater {
filledBuffer := false
cn.iterPendingPieces(func(pieceIndex pieceIndex) bool {
cn.iterPendingRequests(pieceIndex, func(r Request) bool {
if !cn.setInterested(true) {
filledBuffer = true
return false
}
if len(cn.requests) >= cn.nominalMaxRequests() {
return false
}
// Choking is looked at here because our interest is dependent
// on whether we'd make requests in its absence.
if cn.peerChoking {
if !cn.peerAllowedFast.Get(bitmap.BitIndex(r.Index)) {
return false
}
}
if _, ok := cn.requests[r]; ok {
return true
}
filledBuffer = !cn.request(r)
return !filledBuffer
})
return !filledBuffer
})
if filledBuffer {
// If we didn't completely top up the requests, we shouldn't mark
// the low water, since we'll want to top up the requests as soon
// as we have more write buffer space.
return false
}
cn.requestsLowWater = len(cn.requests) / 2
if len(cn.requests) == 0 {
return cn.setInterested(false)
}
}
return true
}
func (cn *PeerConn) fillWriteBuffer() {
if !cn.doRequestState() {
return
}
if cn.pex.IsEnabled() {
if flow := cn.pex.Share(cn.write); !flow {
return
@@ -743,10 +686,13 @@ func (cn *PeerConn) updateRequests() {
func iterBitmapsDistinct(skip *bitmap.Bitmap, bms ...bitmap.Bitmap) iter.Func {
return func(cb iter.Callback) {
for _, bm := range bms {
bm.Sub(*skip)
if !iter.All(
func(i interface{}) bool {
skip.Add(i.(int))
func(_i interface{}) bool {
i := _i.(int)
if skip.Contains(i) {
return true
}
skip.Add(i)
return cb(i)
},
bm.Iter,
@@ -757,62 +703,6 @@ func iterBitmapsDistinct(skip *bitmap.Bitmap, bms ...bitmap.Bitmap) iter.Func {
}
}
func iterUnbiasedPieceRequestOrder(cn requestStrategyConnection, f func(piece pieceIndex) bool) bool {
now, readahead := cn.torrent().readerPiecePriorities()
skip := bitmap.Flip(cn.peerPieces(), 0, cn.torrent().numPieces())
skip.Union(cn.torrent().ignorePieces())
// Return an iterator over the different priority classes, minus the skip pieces.
return iter.All(
func(_piece interface{}) bool {
return f(pieceIndex(_piece.(bitmap.BitIndex)))
},
iterBitmapsDistinct(&skip, now, readahead),
// We have to iterate _pendingPieces separately because it isn't a Bitmap.
func(cb iter.Callback) {
cn.torrent().pendingPieces().IterTyped(func(piece int) bool {
if skip.Contains(piece) {
return true
}
more := cb(piece)
skip.Add(piece)
return more
})
},
)
}
// The connection should download highest priority pieces first, without any inclination toward
// avoiding wastage. Generally we might do this if there's a single connection, or this is the
// fastest connection, and we have active readers that signal an ordering preference. It's
// conceivable that the best connection should do this, since it's least likely to waste our time if
// assigned to the highest priority pieces, and assigning more than one this role would cause
// significant wasted bandwidth.
func (cn *Peer) shouldRequestWithoutBias() bool {
return cn.t.requestStrategy.shouldRequestWithoutBias(cn.requestStrategyConnection())
}
func (cn *Peer) iterPendingPieces(f func(pieceIndex) bool) {
if !cn.t.haveInfo() {
return
}
if cn.closed.IsSet() {
return
}
cn.t.requestStrategy.iterPendingPieces(cn, f)
}
func (cn *Peer) iterPendingPiecesUntyped(f iter.Callback) {
cn.iterPendingPieces(func(i pieceIndex) bool { return f(i) })
}
func (cn *Peer) iterPendingRequests(piece pieceIndex, f func(Request) bool) bool {
return cn.t.requestStrategy.iterUndirtiedChunks(
cn.t.piece(piece).requestStrategyPiece(),
func(cs ChunkSpec) bool {
return f(Request{pp.Integer(piece), cs})
},
)
}
// Check callers of updateRequests.
func (cn *Peer) stopRequestingPiece(piece pieceIndex) bool {
return cn._pieceRequestOrder.Remove(bitmap.BitIndex(piece))
@@ -831,8 +721,7 @@ func (cn *Peer) updatePiecePriority(piece pieceIndex) bool {
return cn.stopRequestingPiece(piece)
}
prio := cn.getPieceInclination()[piece]
prio = cn.t.requestStrategy.piecePriority(cn, piece, tpp, prio)
return cn._pieceRequestOrder.Set(bitmap.BitIndex(piece), prio) || cn.shouldRequestWithoutBias()
return cn._pieceRequestOrder.Set(bitmap.BitIndex(piece), prio)
}
func (cn *Peer) getPieceInclination() []int {
@@ -1571,7 +1460,6 @@ func (c *Peer) deleteRequest(r Request) bool {
f(PeerRequestEvent{c, r})
}
c.updateExpectingChunks()
c.t.requestStrategy.hooks().deletedRequest(r)
pr := c.t.pendingRequests
pr[r]--
n := pr[r]
@@ -1722,10 +1610,6 @@ func (l connectionTrust) Less(r connectionTrust) bool {
return multiless.New().Bool(l.Implicit, r.Implicit).Int64(l.NetGoodPiecesDirted, r.NetGoodPiecesDirted).Less()
}
func (cn *Peer) requestStrategyConnection() requestStrategyConnection {
return cn
}
func (cn *Peer) chunksReceivedWhileExpecting() int64 {
return cn._chunksReceivedWhileExpecting
}
@@ -1761,10 +1645,6 @@ func (cn *Peer) stats() *ConnStats {
return &cn._stats
}
func (cn *Peer) torrent() requestStrategyTorrent {
return cn.t.requestStrategyTorrent()
}
func (p *Peer) TryAsPeerConn() (*PeerConn, bool) {
pc, ok := p.peerImpl.(*PeerConn)
return pc, ok

View File

@@ -144,7 +144,7 @@ func (p *Piece) chunkIndexSpec(chunk pp.Integer) ChunkSpec {
func (p *Piece) chunkIndexRequest(chunkIndex pp.Integer) Request {
return Request{
pp.Integer(p.index),
chunkIndexSpec(chunkIndex, p.length(), p.chunkSize()),
p.chunkIndexSpec(chunkIndex),
}
}
@@ -259,10 +259,6 @@ func (p *Piece) allChunksDirty() bool {
return p._dirtyChunks.Len() == int(p.numChunks())
}
func (p *Piece) requestStrategyPiece() requestStrategyPiece {
return p
}
func (p *Piece) dirtyChunks() bitmap.Bitmap {
return p._dirtyChunks
}
@@ -270,3 +266,15 @@ func (p *Piece) dirtyChunks() bitmap.Bitmap {
func (p *Piece) State() PieceState {
return p.t.PieceState(p.index)
}
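// iterUndirtiedChunks calls f for each of the piece's chunks that isn't marked dirty, in
// ascending index order, stopping early if f returns false.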
func (p *Piece) iterUndirtiedChunks(f func(ChunkSpec) bool) bool {
for i := pp.Integer(0); i < p.numChunks(); i++ {
if p.chunkIndexDirty(i) {
continue
}
if !f(p.chunkIndexSpec(i)) {
return false
}
}
return true
}

View File

@@ -1,45 +1 @@
package torrent
import (
"github.com/anacrolix/missinggo/iter"
"github.com/anacrolix/missinggo/v2/bitmap"
pp "github.com/anacrolix/torrent/peer_protocol"
)
// Provides default implementations for requestStrategy methods. Could be embedded, or delegated to.
type requestStrategyDefaults struct{}
func (requestStrategyDefaults) hooks() requestStrategyHooks {
return requestStrategyHooks{
sentRequest: func(Request) {},
deletedRequest: func(Request) {},
}
}
func (requestStrategyDefaults) iterUndirtiedChunks(p requestStrategyPiece, f func(ChunkSpec) bool) bool {
chunkIndices := p.dirtyChunks().Copy()
chunkIndices.FlipRange(0, bitmap.BitIndex(p.numChunks()))
return iter.ForPerm(chunkIndices.Len(), func(i int) bool {
ci, err := chunkIndices.RB.Select(uint32(i))
if err != nil {
panic(err)
}
return f(p.chunkIndexRequest(pp.Integer(ci)).ChunkSpec)
})
}
func (requestStrategyDefaults) nominalMaxRequests(cn requestStrategyConnection) int {
return int(
max(
64,
cn.stats().ChunksReadUseful.Int64()-
(cn.stats().ChunksRead.Int64()-cn.stats().ChunksReadUseful.Int64())))
}
func (requestStrategyDefaults) piecePriority(cn requestStrategyConnection, piece pieceIndex, tpp piecePriority, prio int) int {
return prio
}
func (requestStrategyDefaults) shouldRequestWithoutBias(cn requestStrategyConnection) bool {
return false
}

View File

@@ -1,223 +1,162 @@
package torrent
import (
"math"
"sync"
"sort"
"time"
"github.com/anacrolix/missinggo/v2/bitmap"
"github.com/anacrolix/missinggo/v2/prioritybitmap"
"github.com/anacrolix/log"
"github.com/anacrolix/multiless"
pp "github.com/anacrolix/torrent/peer_protocol"
"github.com/bradfitz/iter"
)
type requestStrategyPiece interface {
numChunks() pp.Integer
dirtyChunks() bitmap.Bitmap
chunkIndexRequest(i pp.Integer) Request
type clientPieceRequestOrder struct {
pieces []pieceRequestOrderPiece
}
type requestStrategyTorrent interface {
numConns() int
numReaders() int
numPieces() int
readerPiecePriorities() (now, readahead bitmap.Bitmap)
ignorePieces() bitmap.Bitmap
pendingPieces() *prioritybitmap.PriorityBitmap
type pieceRequestOrderPiece struct {
t *Torrent
index pieceIndex
prio piecePriority
partial bool
availability int
}
type requestStrategyConnection interface {
torrent() requestStrategyTorrent
peerPieces() bitmap.Bitmap
pieceRequestOrder() *prioritybitmap.PriorityBitmap
fastest() bool
stats() *ConnStats
totalExpectingTime() time.Duration
peerMaxRequests() int
chunksReceivedWhileExpecting() int64
}
type requestStrategy interface {
iterPendingPieces(requestStrategyConnection, func(pieceIndex) bool) bool
iterUndirtiedChunks(requestStrategyPiece, func(ChunkSpec) bool) bool
nominalMaxRequests(requestStrategyConnection) int
shouldRequestWithoutBias(requestStrategyConnection) bool
piecePriority(requestStrategyConnection, pieceIndex, piecePriority, int) int
hooks() requestStrategyHooks
}
type requestStrategyHooks struct {
sentRequest func(Request)
deletedRequest func(Request)
}
type requestStrategyCallbacks interface {
requestTimedOut(Request)
}
type requestStrategyFuzzing struct {
requestStrategyDefaults
}
type requestStrategyFastest struct {
requestStrategyDefaults
}
func newRequestStrategyMaker(rs requestStrategy) requestStrategyMaker {
return func(requestStrategyCallbacks, sync.Locker) requestStrategy {
return rs
}
}
// The fastest connection downloads strictly in order of priority, while all others adhere to their
// piece inclinations.
func RequestStrategyFastest() requestStrategyMaker {
return newRequestStrategyMaker(requestStrategyFastest{})
}
// Favour higher priority pieces with some fuzzing to reduce overlaps and wastage across
// connections.
func RequestStrategyFuzzing() requestStrategyMaker {
return newRequestStrategyMaker(requestStrategyFuzzing{})
}
func (requestStrategyFastest) shouldRequestWithoutBias(cn requestStrategyConnection) bool {
if cn.torrent().numReaders() == 0 {
return false
}
if cn.torrent().numConns() == 1 {
return true
}
if cn.fastest() {
return true
}
return false
}
type requestStrategyDuplicateRequestTimeout struct {
requestStrategyDefaults
// How long to avoid duplicating a pending request.
duplicateRequestTimeout time.Duration
callbacks requestStrategyCallbacks
// The last time we requested a chunk. Deleting the request from any connection will clear this
// value.
lastRequested map[Request]*time.Timer
// The lock to take when running a request timeout handler.
timeoutLocker sync.Locker
}
// Generates a request strategy instance for a given torrent. callbacks are probably specific to the torrent.
type requestStrategyMaker func(callbacks requestStrategyCallbacks, clientLocker sync.Locker) requestStrategy
// Requests are strictly by piece priority, and not duplicated until duplicateRequestTimeout is
// reached.
func RequestStrategyDuplicateRequestTimeout(duplicateRequestTimeout time.Duration) requestStrategyMaker {
return func(callbacks requestStrategyCallbacks, clientLocker sync.Locker) requestStrategy {
return requestStrategyDuplicateRequestTimeout{
duplicateRequestTimeout: duplicateRequestTimeout,
callbacks: callbacks,
lastRequested: make(map[Request]*time.Timer),
timeoutLocker: clientLocker,
}
}
}
func (rs requestStrategyDuplicateRequestTimeout) hooks() requestStrategyHooks {
return requestStrategyHooks{
deletedRequest: func(r Request) {
if t, ok := rs.lastRequested[r]; ok {
t.Stop()
delete(rs.lastRequested, r)
}
},
sentRequest: rs.onSentRequest,
}
}
func (rs requestStrategyDuplicateRequestTimeout) iterUndirtiedChunks(p requestStrategyPiece, f func(ChunkSpec) bool) bool {
for i := pp.Integer(0); i < pp.Integer(p.numChunks()); i++ {
if p.dirtyChunks().Get(bitmap.BitIndex(i)) {
continue
}
r := p.chunkIndexRequest(i)
if rs.wouldDuplicateRecent(r) {
continue
}
if !f(r.ChunkSpec) {
return false
}
}
return true
}
func (requestStrategyFuzzing) piecePriority(cn requestStrategyConnection, piece pieceIndex, tpp piecePriority, prio int) int {
switch tpp {
case PiecePriorityNormal:
case PiecePriorityReadahead:
prio -= int(cn.torrent().numPieces())
case PiecePriorityNext, PiecePriorityNow:
prio -= 2 * int(cn.torrent().numPieces())
default:
panic(tpp)
}
prio += int(piece / 3)
return prio
}
func (requestStrategyDuplicateRequestTimeout) iterPendingPieces(cn requestStrategyConnection, f func(pieceIndex) bool) bool {
return iterUnbiasedPieceRequestOrder(cn, f)
}
func defaultIterPendingPieces(rs requestStrategy, cn requestStrategyConnection, f func(pieceIndex) bool) bool {
if rs.shouldRequestWithoutBias(cn) {
return iterUnbiasedPieceRequestOrder(cn, f)
} else {
return cn.pieceRequestOrder().IterTyped(func(i int) bool {
return f(pieceIndex(i))
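// addPieces registers all of a torrent's pieces with the client-wide request order.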
func (me *clientPieceRequestOrder) addPieces(t *Torrent, numPieces pieceIndex) {
for i := range iter.N(numPieces) {
me.pieces = append(me.pieces, pieceRequestOrderPiece{
t: t,
index: i,
})
}
}
func (rs requestStrategyFuzzing) iterPendingPieces(cn requestStrategyConnection, cb func(pieceIndex) bool) bool {
return defaultIterPendingPieces(rs, cn, cb)
}
func (rs requestStrategyFastest) iterPendingPieces(cn requestStrategyConnection, cb func(pieceIndex) bool) bool {
return defaultIterPendingPieces(rs, cn, cb)
}
func (rs requestStrategyDuplicateRequestTimeout) onSentRequest(r Request) {
rs.lastRequested[r] = time.AfterFunc(rs.duplicateRequestTimeout, func() {
rs.timeoutLocker.Lock()
delete(rs.lastRequested, r)
rs.timeoutLocker.Unlock()
rs.callbacks.requestTimedOut(r)
})
}
// The actual value to use as the maximum outbound requests.
func (rs requestStrategyDuplicateRequestTimeout) nominalMaxRequests(cn requestStrategyConnection) (ret int) {
expectingTime := int64(cn.totalExpectingTime())
if expectingTime == 0 {
expectingTime = math.MaxInt64
} else {
expectingTime *= 2
func (me *clientPieceRequestOrder) removePieces(t *Torrent) {
newPieces := make([]pieceRequestOrderPiece, 0, len(me.pieces)-t.numPieces())
for _, p := range me.pieces {
if p.t != t {
newPieces = append(newPieces, p)
}
}
return int(clamp(
1,
int64(cn.peerMaxRequests()),
max(
// It makes sense to always pipeline at least one connection, since latency must be
// non-zero.
2,
// Request only as many as we expect to receive in the duplicateRequestTimeout
// window. We are trying to avoid having to duplicate requests.
cn.chunksReceivedWhileExpecting()*int64(rs.duplicateRequestTimeout)/expectingTime,
),
))
me.pieces = newPieces
}
func (rs requestStrategyDuplicateRequestTimeout) wouldDuplicateRecent(r Request) bool {
// This piece has been requested on another connection, and the duplicate request timer is still
// running.
_, ok := rs.lastRequested[r]
return ok
func (me clientPieceRequestOrder) sort() {
sort.SliceStable(me.pieces, me.less)
}
func (me clientPieceRequestOrder) update() {
for i := range me.pieces {
p := &me.pieces[i]
p.prio = p.t.piece(p.index).uncachedPriority()
p.partial = p.t.piecePartiallyDownloaded(p.index)
p.availability = p.t.pieceAvailability(p.index)
}
}
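// Pieces are ordered by descending priority, then partially-downloaded pieces first, then
// ascending availability (rarest first).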
func (me clientPieceRequestOrder) less(_i, _j int) bool {
i := me.pieces[_i]
j := me.pieces[_j]
// multiless comparisons chain by value (see connectionTrust.Less above), so each step must be
// reassigned or the result is discarded.
ml := multiless.New()
ml = ml.Int(int(j.prio), int(i.prio))
ml = ml.Bool(j.partial, i.partial)
ml = ml.Int(i.availability, j.availability)
return ml.Less()
}
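// requester runs for the life of the Client, topping up peer request queues via doRequests under
// the client lock, then sleeping 10ms between passes (or returning when the client closes).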
func (cl *Client) requester() {
for {
func() {
cl.lock()
defer cl.unlock()
cl.doRequests()
}()
select {
case <-cl.closed.LockedChan(cl.locker()):
return
case <-time.After(10 * time.Millisecond):
}
}
}
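// doRequests performs a single client-wide request pass: snapshot each torrent's peers, rebuild
// and sort the shared piece request order, then walk the pieces in order, offering each undirtied
// chunk to peers until one accepts it. Peers that end the pass with no requests lose our interest.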
func (cl *Client) doRequests() {
requestOrder := clientPieceRequestOrder{}
allPeers := make(map[*Torrent][]*Peer)
storageCapacity := make(map[*Torrent]*int64)
for _, t := range cl.torrents {
// TODO: We could do metainfo requests here.
if t.haveInfo() {
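// Budget roughly one usual piece length of requested data per torrent per pass.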
value := int64(t.usualPieceSize())
storageCapacity[t] = &value
requestOrder.addPieces(t, t.numPieces())
}
var peers []*Peer
t.iterPeers(func(p *Peer) {
peers = append(peers, p)
})
allPeers[t] = peers
}
requestOrder.update()
requestOrder.sort()
for _, p := range requestOrder.pieces {
if p.t.ignorePieceForRequests(p.index) {
continue
}
peers := allPeers[p.t]
torrentPiece := p.t.piece(p.index)
if left := storageCapacity[p.t]; left != nil {
if *left < int64(torrentPiece.length()) {
continue
}
*left -= int64(torrentPiece.length())
}
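// Offer each undirtied chunk to the torrent's peers in turn; the first request that doesn't
// error claims the chunk.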
p.t.piece(p.index).iterUndirtiedChunks(func(chunk ChunkSpec) bool {
for _, peer := range peers {
req := Request{pp.Integer(p.index), chunk}
_, err := peer.request(req)
if err == nil {
log.Printf("requested %v", req)
break
}
}
return true
})
}
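// Drop interest in unchoked peers that ended the pass with no outstanding requests and room in
// their write buffers.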
for _, t := range cl.torrents {
t.iterPeers(func(p *Peer) {
if !p.peerChoking && p.numLocalRequests() == 0 && !p.writeBufferFull() {
p.setInterested(false)
}
})
}
}
//func (requestStrategyDefaults) iterUndirtiedChunks(p requestStrategyPiece, f func(ChunkSpec) bool) bool {
// chunkIndices := p.dirtyChunks().Copy()
// chunkIndices.FlipRange(0, bitmap.BitIndex(p.numChunks()))
// return iter.ForPerm(chunkIndices.Len(), func(i int) bool {
// ci, err := chunkIndices.RB.Select(uint32(i))
// if err != nil {
// panic(err)
// }
// return f(p.chunkIndexRequest(pp.Integer(ci)).ChunkSpec)
// })
//}
//
//func iterUnbiasedPieceRequestOrder(
// cn requestStrategyConnection,
// f func(piece pieceIndex) bool,
// pieceRequestOrder []pieceIndex,
//) bool {
// cn.torrent().sortPieceRequestOrder(pieceRequestOrder)
// for _, i := range pieceRequestOrder {
// if !cn.peerHasPiece(i) || cn.torrent().ignorePieceForRequests(i) {
// continue
// }
// if !f(i) {
// return false
// }
// }
// return true
//}

View File

@@ -55,9 +55,6 @@ type Torrent struct {
dataUploadDisallowed bool
userOnWriteChunkErr func(error)
// Determines what chunks to request from peers.
requestStrategy requestStrategy
closed missinggo.Event
infoHash metainfo.Hash
pieces []Piece
@@ -150,6 +147,29 @@ type Torrent struct {
pex pexState
}
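// pieceAvailability counts the connected peers that have piece i.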
func (t *Torrent) pieceAvailability(i pieceIndex) (count int) {
t.iterPeers(func(peer *Peer) {
if peer.peerHasPiece(i) {
count++
}
})
return
}
func (t *Torrent) sortPieceRequestOrder(sl []pieceIndex) {
if len(sl) != t.numPieces() {
panic(len(sl))
}
availability := make([]int, len(sl))
t.iterPeers(func(peer *Peer) {
for i := range availability {
if peer.peerHasPiece(i) {
availability[i]++
}
}
})
// Rarest first: order the piece indices by ascending peer availability (assumes "sort" is
// imported; the computed availability was otherwise discarded).
sort.SliceStable(sl, func(i, j int) bool {
return availability[sl[i]] < availability[sl[j]]
})
}
func (t *Torrent) numConns() int {
return len(t.conns)
}
@@ -166,15 +186,8 @@ func (t *Torrent) readerReadaheadPieces() bitmap.Bitmap {
return t._readerReadaheadPieces
}
func (t *Torrent) ignorePieces() bitmap.Bitmap {
ret := t._completedPieces.Copy()
ret.Union(t.piecesQueuedForHash)
for i := 0; i < t.numPieces(); i++ {
if t.piece(i).hashing {
ret.Set(i, true)
}
}
return ret
func (t *Torrent) ignorePieceForRequests(i pieceIndex) bool {
return !t.wantPieceIndex(i)
}
func (t *Torrent) pendingPieces() *prioritybitmap.PriorityBitmap {
@@ -413,6 +426,7 @@ func (t *Torrent) setInfo(info *metainfo.Info) error {
// This seems to be all the follow-up tasks after info is set, that can't fail.
func (t *Torrent) onSetInfo() {
t.cl.clientPieceRequestOrder.addPieces(t, t.numPieces())
t.iterPeers(func(p *Peer) {
p.onGotInfo(t.info)
})
@@ -2026,30 +2040,6 @@ func (t *Torrent) piece(i int) *Piece {
return &t.pieces[i]
}
func (t *Torrent) requestStrategyTorrent() requestStrategyTorrent {
return t
}
type torrentRequestStrategyCallbacks struct {
t *Torrent
}
func (cb torrentRequestStrategyCallbacks) requestTimedOut(r Request) {
torrent.Add("Request timeouts", 1)
cb.t.cl.lock()
defer cb.t.cl.unlock()
cb.t.iterPeers(func(cn *Peer) {
if cn.peerHasPiece(pieceIndex(r.Index)) {
cn.updateRequests()
}
})
}
func (t *Torrent) requestStrategyCallbacks() requestStrategyCallbacks {
return torrentRequestStrategyCallbacks{t}
}
func (t *Torrent) onWriteChunkErr(err error) {
if t.userOnWriteChunkErr != nil {
go t.userOnWriteChunkErr(err)
@@ -2111,7 +2101,7 @@ func (t *Torrent) SetOnWriteChunkError(f func(error)) {
t.userOnWriteChunkErr = f
}
func (t *Torrent) iterPeers(f func(*Peer)) {
func (t *Torrent) iterPeers(f func(p *Peer)) {
for pc := range t.conns {
f(&pc.Peer)
}

View File

@@ -24,6 +24,10 @@ type webseedPeer struct {
var _ peerImpl = (*webseedPeer)(nil)
func (me *webseedPeer) writeBufferFull() bool {
return false
}
func (me *webseedPeer) connStatusString() string {
return me.client.Url
}
@@ -99,7 +103,6 @@ func (ws *webseedPeer) connectionFlags() string {
func (ws *webseedPeer) drop() {}
func (ws *webseedPeer) updateRequests() {
ws.peer.doRequestState()
}
func (ws *webseedPeer) onClose() {