From 48eb7ff3f2e7a623915fb034dbb9c1ef1a61528e Mon Sep 17 00:00:00 2001 From: Matt Joiner Date: Fri, 24 Jan 2020 17:30:57 +1100 Subject: [PATCH] Expose request strategies --- config.go | 4 ++-- connection.go | 32 ++++++++++++++++---------------- piece.go | 2 +- request_strategy.go | 31 ++++++++++++++++++++++--------- 4 files changed, 41 insertions(+), 28 deletions(-) diff --git a/config.go b/config.go index 3fd7461a..0dffd41b 100644 --- a/config.go +++ b/config.go @@ -138,7 +138,7 @@ type ClientConfig struct { // OnQuery hook func DHTOnQuery func(query *krpc.Msg, source net.Addr) (propagate bool) - DefaultRequestStrategy requestStrategyMaker + DefaultRequestStrategy RequestStrategyMaker } func (cfg *ClientConfig) SetListenAddr(addr string) *ClientConfig { @@ -177,7 +177,7 @@ func NewDefaultClientConfig() *ClientConfig { ListenPort: 42069, Logger: log.Default, - DefaultRequestStrategy: requestStrategyThreeMaker(5 * time.Second), + DefaultRequestStrategy: requestStrategyDuplicateRequestTimeout(5 * time.Second), } //cc.ConnTracker.SetNoMaxEntries() //cc.ConnTracker.Timeout = func(conntrack.Entry) time.Duration { return 0 } diff --git a/connection.go b/connection.go index 4c2ebe53..1b595e7c 100644 --- a/connection.go +++ b/connection.go @@ -388,7 +388,7 @@ func (cn *connection) nominalMaxRequests() (ret int) { } // The actual value to use as the maximum outbound requests. -func (rs requestStrategyThree) nominalMaxRequests(cn requestStrategyConnection) (ret int) { +func (rs requestStrategyDuplicateRequestTimeout) nominalMaxRequests(cn requestStrategyConnection) (ret int) { expectingTime := int64(cn.totalExpectingTime()) if expectingTime == 0 { expectingTime = math.MaxInt64 @@ -413,10 +413,10 @@ func defaultNominalMaxRequests(cn requestStrategyConnection) int { max(64, cn.stats().ChunksReadUseful.Int64()-(cn.stats().ChunksRead.Int64()-cn.stats().ChunksReadUseful.Int64()))) } -func (rs requestStrategyOne) nominalMaxRequests(cn requestStrategyConnection) int { +func (rs requestStrategyFuzzing) nominalMaxRequests(cn requestStrategyConnection) int { return defaultNominalMaxRequests(cn) } -func (rs requestStrategyTwo) nominalMaxRequests(cn requestStrategyConnection) int { +func (rs requestStrategyFastest) nominalMaxRequests(cn requestStrategyConnection) int { return defaultNominalMaxRequests(cn) } @@ -543,7 +543,7 @@ func (cn *connection) request(r request, mw messageWriter) bool { }) } -func (rs requestStrategyThree) onSentRequest(r request) { +func (rs requestStrategyDuplicateRequestTimeout) onSentRequest(r request) { rs.lastRequested[r] = time.AfterFunc(rs.duplicateRequestTimeout, func() { rs.timeoutLocker.Lock() delete(rs.lastRequested, r) @@ -753,7 +753,7 @@ func defaultShouldRequestWithoutBias(cn requestStrategyConnection) bool { return false } -func (requestStrategyTwo) shouldRequestWithoutBias(cn requestStrategyConnection) bool { +func (requestStrategyFastest) shouldRequestWithoutBias(cn requestStrategyConnection) bool { if cn.torrent().numReaders() == 0 { return false } @@ -766,11 +766,11 @@ func (requestStrategyTwo) shouldRequestWithoutBias(cn requestStrategyConnection) return false } -func (requestStrategyOne) shouldRequestWithoutBias(cn requestStrategyConnection) bool { +func (requestStrategyFuzzing) shouldRequestWithoutBias(cn requestStrategyConnection) bool { return defaultShouldRequestWithoutBias(cn) } -func (requestStrategyThree) shouldRequestWithoutBias(cn requestStrategyConnection) bool { +func (requestStrategyDuplicateRequestTimeout) shouldRequestWithoutBias(cn requestStrategyConnection) bool { return defaultShouldRequestWithoutBias(cn) } @@ -780,7 +780,7 @@ func (cn *connection) iterPendingPieces(f func(pieceIndex) bool) bool { } return cn.t.requestStrategy.iterPendingPieces(cn, f) } -func (requestStrategyThree) iterPendingPieces(cn requestStrategyConnection, f func(pieceIndex) bool) bool { +func (requestStrategyDuplicateRequestTimeout) iterPendingPieces(cn requestStrategyConnection, f func(pieceIndex) bool) bool { return iterUnbiasedPieceRequestOrder(cn, f) } func defaultIterPendingPieces(rs requestStrategy, cn requestStrategyConnection, f func(pieceIndex) bool) bool { @@ -792,10 +792,10 @@ func defaultIterPendingPieces(rs requestStrategy, cn requestStrategyConnection, }) } } -func (rs requestStrategyOne) iterPendingPieces(cn requestStrategyConnection, cb func(pieceIndex) bool) bool { +func (rs requestStrategyFuzzing) iterPendingPieces(cn requestStrategyConnection, cb func(pieceIndex) bool) bool { return defaultIterPendingPieces(rs, cn, cb) } -func (rs requestStrategyTwo) iterPendingPieces(cn requestStrategyConnection, cb func(pieceIndex) bool) bool { +func (rs requestStrategyFastest) iterPendingPieces(cn requestStrategyConnection, cb func(pieceIndex) bool) bool { return defaultIterPendingPieces(rs, cn, cb) } @@ -812,7 +812,7 @@ func (cn *connection) iterPendingRequests(piece pieceIndex, f func(request) bool ) } -func (rs requestStrategyThree) iterUndirtiedChunks(p requestStrategyPiece, f func(chunkSpec) bool) bool { +func (rs requestStrategyDuplicateRequestTimeout) iterUndirtiedChunks(p requestStrategyPiece, f func(chunkSpec) bool) bool { for i := pp.Integer(0); i < pp.Integer(p.numChunks()); i++ { if p.dirtyChunks().Get(bitmap.BitIndex(i)) { continue @@ -840,11 +840,11 @@ func defaultIterUndirtiedChunks(p requestStrategyPiece, f func(chunkSpec) bool) }) } -func (rs requestStrategyOne) iterUndirtiedChunks(p requestStrategyPiece, f func(chunkSpec) bool) bool { +func (rs requestStrategyFuzzing) iterUndirtiedChunks(p requestStrategyPiece, f func(chunkSpec) bool) bool { return defaultIterUndirtiedChunks(p, f) } -func (rs requestStrategyTwo) iterUndirtiedChunks(p requestStrategyPiece, f func(chunkSpec) bool) bool { +func (rs requestStrategyFastest) iterUndirtiedChunks(p requestStrategyPiece, f func(chunkSpec) bool) bool { return defaultIterUndirtiedChunks(p, f) } @@ -870,7 +870,7 @@ func (cn *connection) updatePiecePriority(piece pieceIndex) bool { return cn._pieceRequestOrder.Set(bitmap.BitIndex(piece), prio) || cn.shouldRequestWithoutBias() } -func (requestStrategyOne) piecePriority(cn requestStrategyConnection, piece pieceIndex, tpp piecePriority, prio int) int { +func (requestStrategyFuzzing) piecePriority(cn requestStrategyConnection, piece pieceIndex, tpp piecePriority, prio int) int { switch tpp { case PiecePriorityNormal: case PiecePriorityReadahead: @@ -888,11 +888,11 @@ func defaultPiecePriority(cn requestStrategyConnection, piece pieceIndex, tpp pi return prio } -func (requestStrategyTwo) piecePriority(cn requestStrategyConnection, piece pieceIndex, tpp piecePriority, prio int) int { +func (requestStrategyFastest) piecePriority(cn requestStrategyConnection, piece pieceIndex, tpp piecePriority, prio int) int { return defaultPiecePriority(cn, piece, tpp, prio) } -func (requestStrategyThree) piecePriority(cn requestStrategyConnection, piece pieceIndex, tpp piecePriority, prio int) int { +func (requestStrategyDuplicateRequestTimeout) piecePriority(cn requestStrategyConnection, piece pieceIndex, tpp piecePriority, prio int) int { return defaultPiecePriority(cn, piece, tpp, prio) } diff --git a/piece.go b/piece.go index d2f72fd8..80fda070 100644 --- a/piece.go +++ b/piece.go @@ -263,7 +263,7 @@ func (p *Piece) dirtyChunks() bitmap.Bitmap { return p._dirtyChunks } -func (rs requestStrategyThree) wouldDuplicateRecent(r request) bool { +func (rs requestStrategyDuplicateRequestTimeout) wouldDuplicateRecent(r request) bool { // This piece has been requested on another connection, and the duplicate request timer is still // running. _, ok := rs.lastRequested[r] diff --git a/request_strategy.go b/request_strategy.go index cbc5abf4..dc7538d1 100644 --- a/request_strategy.go +++ b/request_strategy.go @@ -65,17 +65,31 @@ type requestStrategyCallbacks interface { // Favour higher priority pieces with some fuzzing to reduce overlaps and wastage across // connections. -type requestStrategyOne struct { +type requestStrategyFuzzing struct { requestStrategyDefaults } // The fastest connection downloads strictly in order of priority, while all others adhere to their // piece inclinations. -type requestStrategyTwo struct { +type requestStrategyFastest struct { requestStrategyDefaults } -func (requestStrategyTwo) ShouldRequestWithoutBias(cn requestStrategyConnection) bool { +func newRequestStrategyMaker(rs requestStrategy) RequestStrategyMaker { + return func(requestStrategyCallbacks, sync.Locker) requestStrategy { + return rs + } +} + +func RequestStrategyFastest() RequestStrategyMaker { + return newRequestStrategyMaker(requestStrategyFastest{}) +} + +func RequestStrategyFuzzing() RequestStrategyMaker { + return newRequestStrategyMaker(requestStrategyFuzzing{}) +} + +func (requestStrategyFastest) ShouldRequestWithoutBias(cn requestStrategyConnection) bool { if cn.torrent().numReaders() == 0 { return false } @@ -90,7 +104,7 @@ func (requestStrategyTwo) ShouldRequestWithoutBias(cn requestStrategyConnection) // Requests are strictly by piece priority, and not duplicated until duplicateRequestTimeout is // reached. -type requestStrategyThree struct { +type requestStrategyDuplicateRequestTimeout struct { // How long to avoid duplicating a pending request. duplicateRequestTimeout time.Duration @@ -103,11 +117,11 @@ type requestStrategyThree struct { timeoutLocker sync.Locker } -type requestStrategyMaker func(callbacks requestStrategyCallbacks, clientLocker sync.Locker) requestStrategy +type RequestStrategyMaker func(callbacks requestStrategyCallbacks, clientLocker sync.Locker) requestStrategy -func requestStrategyThreeMaker(duplicateRequestTimeout time.Duration) requestStrategyMaker { +func RequestStrategyDuplicateRequestTimeout(duplicateRequestTimeout time.Duration) RequestStrategyMaker { return func(callbacks requestStrategyCallbacks, clientLocker sync.Locker) requestStrategy { - return requestStrategyThree{ + return requestStrategyDuplicateRequestTimeout{ duplicateRequestTimeout: duplicateRequestTimeout, callbacks: callbacks, lastRequested: make(map[request]*time.Timer), @@ -116,7 +130,7 @@ func requestStrategyThreeMaker(duplicateRequestTimeout time.Duration) requestStr } } -func (rs requestStrategyThree) hooks() requestStrategyHooks { +func (rs requestStrategyDuplicateRequestTimeout) hooks() requestStrategyHooks { return requestStrategyHooks{ deletedRequest: func(r request) { if t, ok := rs.lastRequested[r]; ok { @@ -126,5 +140,4 @@ func (rs requestStrategyThree) hooks() requestStrategyHooks { }, sentRequest: rs.onSentRequest, } - }