Expose request strategies

This commit is contained in:
Matt Joiner 2020-01-24 17:30:57 +11:00
parent 2044457959
commit 48eb7ff3f2
4 changed files with 41 additions and 28 deletions

View File

@ -138,7 +138,7 @@ type ClientConfig struct {
// OnQuery hook func
DHTOnQuery func(query *krpc.Msg, source net.Addr) (propagate bool)
DefaultRequestStrategy requestStrategyMaker
DefaultRequestStrategy RequestStrategyMaker
}
func (cfg *ClientConfig) SetListenAddr(addr string) *ClientConfig {
@ -177,7 +177,7 @@ func NewDefaultClientConfig() *ClientConfig {
ListenPort: 42069,
Logger: log.Default,
DefaultRequestStrategy: requestStrategyThreeMaker(5 * time.Second),
DefaultRequestStrategy: requestStrategyDuplicateRequestTimeout(5 * time.Second),
}
//cc.ConnTracker.SetNoMaxEntries()
//cc.ConnTracker.Timeout = func(conntrack.Entry) time.Duration { return 0 }

View File

@ -388,7 +388,7 @@ func (cn *connection) nominalMaxRequests() (ret int) {
}
// The actual value to use as the maximum outbound requests.
func (rs requestStrategyThree) nominalMaxRequests(cn requestStrategyConnection) (ret int) {
func (rs requestStrategyDuplicateRequestTimeout) nominalMaxRequests(cn requestStrategyConnection) (ret int) {
expectingTime := int64(cn.totalExpectingTime())
if expectingTime == 0 {
expectingTime = math.MaxInt64
@ -413,10 +413,10 @@ func defaultNominalMaxRequests(cn requestStrategyConnection) int {
max(64,
cn.stats().ChunksReadUseful.Int64()-(cn.stats().ChunksRead.Int64()-cn.stats().ChunksReadUseful.Int64())))
}
func (rs requestStrategyOne) nominalMaxRequests(cn requestStrategyConnection) int {
func (rs requestStrategyFuzzing) nominalMaxRequests(cn requestStrategyConnection) int {
return defaultNominalMaxRequests(cn)
}
func (rs requestStrategyTwo) nominalMaxRequests(cn requestStrategyConnection) int {
func (rs requestStrategyFastest) nominalMaxRequests(cn requestStrategyConnection) int {
return defaultNominalMaxRequests(cn)
}
@ -543,7 +543,7 @@ func (cn *connection) request(r request, mw messageWriter) bool {
})
}
func (rs requestStrategyThree) onSentRequest(r request) {
func (rs requestStrategyDuplicateRequestTimeout) onSentRequest(r request) {
rs.lastRequested[r] = time.AfterFunc(rs.duplicateRequestTimeout, func() {
rs.timeoutLocker.Lock()
delete(rs.lastRequested, r)
@ -753,7 +753,7 @@ func defaultShouldRequestWithoutBias(cn requestStrategyConnection) bool {
return false
}
func (requestStrategyTwo) shouldRequestWithoutBias(cn requestStrategyConnection) bool {
func (requestStrategyFastest) shouldRequestWithoutBias(cn requestStrategyConnection) bool {
if cn.torrent().numReaders() == 0 {
return false
}
@ -766,11 +766,11 @@ func (requestStrategyTwo) shouldRequestWithoutBias(cn requestStrategyConnection)
return false
}
func (requestStrategyOne) shouldRequestWithoutBias(cn requestStrategyConnection) bool {
func (requestStrategyFuzzing) shouldRequestWithoutBias(cn requestStrategyConnection) bool {
return defaultShouldRequestWithoutBias(cn)
}
func (requestStrategyThree) shouldRequestWithoutBias(cn requestStrategyConnection) bool {
func (requestStrategyDuplicateRequestTimeout) shouldRequestWithoutBias(cn requestStrategyConnection) bool {
return defaultShouldRequestWithoutBias(cn)
}
@ -780,7 +780,7 @@ func (cn *connection) iterPendingPieces(f func(pieceIndex) bool) bool {
}
return cn.t.requestStrategy.iterPendingPieces(cn, f)
}
func (requestStrategyThree) iterPendingPieces(cn requestStrategyConnection, f func(pieceIndex) bool) bool {
func (requestStrategyDuplicateRequestTimeout) iterPendingPieces(cn requestStrategyConnection, f func(pieceIndex) bool) bool {
return iterUnbiasedPieceRequestOrder(cn, f)
}
func defaultIterPendingPieces(rs requestStrategy, cn requestStrategyConnection, f func(pieceIndex) bool) bool {
@ -792,10 +792,10 @@ func defaultIterPendingPieces(rs requestStrategy, cn requestStrategyConnection,
})
}
}
func (rs requestStrategyOne) iterPendingPieces(cn requestStrategyConnection, cb func(pieceIndex) bool) bool {
func (rs requestStrategyFuzzing) iterPendingPieces(cn requestStrategyConnection, cb func(pieceIndex) bool) bool {
return defaultIterPendingPieces(rs, cn, cb)
}
func (rs requestStrategyTwo) iterPendingPieces(cn requestStrategyConnection, cb func(pieceIndex) bool) bool {
func (rs requestStrategyFastest) iterPendingPieces(cn requestStrategyConnection, cb func(pieceIndex) bool) bool {
return defaultIterPendingPieces(rs, cn, cb)
}
@ -812,7 +812,7 @@ func (cn *connection) iterPendingRequests(piece pieceIndex, f func(request) bool
)
}
func (rs requestStrategyThree) iterUndirtiedChunks(p requestStrategyPiece, f func(chunkSpec) bool) bool {
func (rs requestStrategyDuplicateRequestTimeout) iterUndirtiedChunks(p requestStrategyPiece, f func(chunkSpec) bool) bool {
for i := pp.Integer(0); i < pp.Integer(p.numChunks()); i++ {
if p.dirtyChunks().Get(bitmap.BitIndex(i)) {
continue
@ -840,11 +840,11 @@ func defaultIterUndirtiedChunks(p requestStrategyPiece, f func(chunkSpec) bool)
})
}
func (rs requestStrategyOne) iterUndirtiedChunks(p requestStrategyPiece, f func(chunkSpec) bool) bool {
func (rs requestStrategyFuzzing) iterUndirtiedChunks(p requestStrategyPiece, f func(chunkSpec) bool) bool {
return defaultIterUndirtiedChunks(p, f)
}
func (rs requestStrategyTwo) iterUndirtiedChunks(p requestStrategyPiece, f func(chunkSpec) bool) bool {
func (rs requestStrategyFastest) iterUndirtiedChunks(p requestStrategyPiece, f func(chunkSpec) bool) bool {
return defaultIterUndirtiedChunks(p, f)
}
@ -870,7 +870,7 @@ func (cn *connection) updatePiecePriority(piece pieceIndex) bool {
return cn._pieceRequestOrder.Set(bitmap.BitIndex(piece), prio) || cn.shouldRequestWithoutBias()
}
func (requestStrategyOne) piecePriority(cn requestStrategyConnection, piece pieceIndex, tpp piecePriority, prio int) int {
func (requestStrategyFuzzing) piecePriority(cn requestStrategyConnection, piece pieceIndex, tpp piecePriority, prio int) int {
switch tpp {
case PiecePriorityNormal:
case PiecePriorityReadahead:
@ -888,11 +888,11 @@ func defaultPiecePriority(cn requestStrategyConnection, piece pieceIndex, tpp pi
return prio
}
func (requestStrategyTwo) piecePriority(cn requestStrategyConnection, piece pieceIndex, tpp piecePriority, prio int) int {
func (requestStrategyFastest) piecePriority(cn requestStrategyConnection, piece pieceIndex, tpp piecePriority, prio int) int {
return defaultPiecePriority(cn, piece, tpp, prio)
}
func (requestStrategyThree) piecePriority(cn requestStrategyConnection, piece pieceIndex, tpp piecePriority, prio int) int {
func (requestStrategyDuplicateRequestTimeout) piecePriority(cn requestStrategyConnection, piece pieceIndex, tpp piecePriority, prio int) int {
return defaultPiecePriority(cn, piece, tpp, prio)
}

View File

@ -263,7 +263,7 @@ func (p *Piece) dirtyChunks() bitmap.Bitmap {
return p._dirtyChunks
}
func (rs requestStrategyThree) wouldDuplicateRecent(r request) bool {
func (rs requestStrategyDuplicateRequestTimeout) wouldDuplicateRecent(r request) bool {
// This piece has been requested on another connection, and the duplicate request timer is still
// running.
_, ok := rs.lastRequested[r]

View File

@ -65,17 +65,31 @@ type requestStrategyCallbacks interface {
// Favour higher priority pieces with some fuzzing to reduce overlaps and wastage across
// connections.
type requestStrategyOne struct {
type requestStrategyFuzzing struct {
requestStrategyDefaults
}
// The fastest connection downloads strictly in order of priority, while all others adhere to their
// piece inclinations.
type requestStrategyTwo struct {
type requestStrategyFastest struct {
requestStrategyDefaults
}
func (requestStrategyTwo) ShouldRequestWithoutBias(cn requestStrategyConnection) bool {
func newRequestStrategyMaker(rs requestStrategy) RequestStrategyMaker {
return func(requestStrategyCallbacks, sync.Locker) requestStrategy {
return rs
}
}
func RequestStrategyFastest() RequestStrategyMaker {
return newRequestStrategyMaker(requestStrategyFastest{})
}
func RequestStrategyFuzzing() RequestStrategyMaker {
return newRequestStrategyMaker(requestStrategyFuzzing{})
}
func (requestStrategyFastest) ShouldRequestWithoutBias(cn requestStrategyConnection) bool {
if cn.torrent().numReaders() == 0 {
return false
}
@ -90,7 +104,7 @@ func (requestStrategyTwo) ShouldRequestWithoutBias(cn requestStrategyConnection)
// Requests are strictly by piece priority, and not duplicated until duplicateRequestTimeout is
// reached.
type requestStrategyThree struct {
type requestStrategyDuplicateRequestTimeout struct {
// How long to avoid duplicating a pending request.
duplicateRequestTimeout time.Duration
@ -103,11 +117,11 @@ type requestStrategyThree struct {
timeoutLocker sync.Locker
}
type requestStrategyMaker func(callbacks requestStrategyCallbacks, clientLocker sync.Locker) requestStrategy
type RequestStrategyMaker func(callbacks requestStrategyCallbacks, clientLocker sync.Locker) requestStrategy
func requestStrategyThreeMaker(duplicateRequestTimeout time.Duration) requestStrategyMaker {
func RequestStrategyDuplicateRequestTimeout(duplicateRequestTimeout time.Duration) RequestStrategyMaker {
return func(callbacks requestStrategyCallbacks, clientLocker sync.Locker) requestStrategy {
return requestStrategyThree{
return requestStrategyDuplicateRequestTimeout{
duplicateRequestTimeout: duplicateRequestTimeout,
callbacks: callbacks,
lastRequested: make(map[request]*time.Timer),
@ -116,7 +130,7 @@ func requestStrategyThreeMaker(duplicateRequestTimeout time.Duration) requestStr
}
}
func (rs requestStrategyThree) hooks() requestStrategyHooks {
func (rs requestStrategyDuplicateRequestTimeout) hooks() requestStrategyHooks {
return requestStrategyHooks{
deletedRequest: func(r request) {
if t, ok := rs.lastRequested[r]; ok {
@ -126,5 +140,4 @@ func (rs requestStrategyThree) hooks() requestStrategyHooks {
},
sentRequest: rs.onSentRequest,
}
}