diff --git a/client.go b/client.go index 0dab418c..fddddf48 100644 --- a/client.go +++ b/client.go @@ -1039,11 +1039,10 @@ func (cl *Client) newTorrent(ih metainfo.Hash, specStorage storage.ClientImpl) ( maxEstablishedConns: cl.config.EstablishedConnsPerTorrent, networkingEnabled: true, - requestStrategy: requestStrategyOne{}, + requestStrategy: cl.config.DefaultRequestStrategy(t.requestStrategyCallbacks()), metadataChanged: sync.Cond{ L: cl.locker(), }, - _duplicateRequestTimeout: 1 * time.Second, } t.logger = cl.logger.WithValues(t).WithText(func(m log.Msg) string { return fmt.Sprintf("%v: %s", t, m.Text()) diff --git a/config.go b/config.go index 0c23ec46..3fd7461a 100644 --- a/config.go +++ b/config.go @@ -137,6 +137,8 @@ type ClientConfig struct { // OnQuery hook func DHTOnQuery func(query *krpc.Msg, source net.Addr) (propagate bool) + + DefaultRequestStrategy requestStrategyMaker } func (cfg *ClientConfig) SetListenAddr(addr string) *ClientConfig { @@ -174,6 +176,8 @@ func NewDefaultClientConfig() *ClientConfig { CryptoProvides: mse.AllSupportedCrypto, ListenPort: 42069, Logger: log.Default, + + DefaultRequestStrategy: requestStrategyThreeMaker(5 * time.Second), } //cc.ConnTracker.SetNoMaxEntries() //cc.ConnTracker.Timeout = func(conntrack.Entry) time.Duration { return 0 } diff --git a/connection.go b/connection.go index 24c15b92..0c25e343 100644 --- a/connection.go +++ b/connection.go @@ -388,7 +388,7 @@ func (cn *connection) nominalMaxRequests() (ret int) { } // The actual value to use as the maximum outbound requests. -func (requestStrategyThree) nominalMaxRequests(cn requestStrategyConnection) (ret int) { +func (rs requestStrategyThree) nominalMaxRequests(cn requestStrategyConnection) (ret int) { expectingTime := int64(cn.totalExpectingTime()) if expectingTime == 0 { expectingTime = math.MaxInt64 @@ -404,7 +404,7 @@ func (requestStrategyThree) nominalMaxRequests(cn requestStrategyConnection) (re 2, // Request only as many as we expect to receive in the duplicateRequestTimeout // window. We are trying to avoid having to duplicate requests. - cn.chunksReceivedWhileExpecting()*int64(cn.torrent().duplicateRequestTimeout())/expectingTime, + cn.chunksReceivedWhileExpecting()*int64(rs.duplicateRequestTimeout)/expectingTime, ), )) } @@ -533,17 +533,7 @@ func (cn *connection) request(r request, mw messageWriter) bool { } cn.validReceiveChunks[r] = struct{}{} cn.t.pendingRequests[r]++ - cn.t.lastRequested[r] = time.AfterFunc(cn.t._duplicateRequestTimeout, func() { - torrent.Add("duplicate request timeouts", 1) - cn.mu().Lock() - defer cn.mu().Unlock() - delete(cn.t.lastRequested, r) - for cn := range cn.t.conns { - if cn.PeerHasPiece(pieceIndex(r.Index)) { - cn.updateRequests() - } - } - }) + cn.t.requestStrategy.hooks().sentRequest(r) cn.updateExpectingChunks() return mw(pp.Message{ Type: pp.Request, @@ -553,6 +543,13 @@ func (cn *connection) request(r request, mw messageWriter) bool { }) } +func (rs requestStrategyThree) onSentRequest(r request) { + rs.lastRequested[r] = time.AfterFunc(rs.duplicateRequestTimeout, func() { + delete(rs.lastRequested, r) + rs.callbacks.requestTimedOut(r) + }) +} + func (cn *connection) fillWriteBuffer(msg func(pp.Message) bool) { if !cn.t.networkingEnabled { if !cn.SetInterested(false, msg) { @@ -805,22 +802,24 @@ func (cn *connection) iterPendingPiecesUntyped(f iter.Callback) { } func (cn *connection) iterPendingRequests(piece pieceIndex, f func(request) bool) bool { - return cn.t.requestStrategy.iterUndirtiedChunks(cn.t.piece(piece).requestStrategyPiece(), + return cn.t.requestStrategy.iterUndirtiedChunks( + cn.t.piece(piece).requestStrategyPiece(), func(cs chunkSpec) bool { return f(request{pp.Integer(piece), cs}) - }) + }, + ) } -func (requestStrategyThree) iterUndirtiedChunks(p requestStrategyPiece, f func(chunkSpec) bool) bool { +func (rs requestStrategyThree) iterUndirtiedChunks(p requestStrategyPiece, f func(chunkSpec) bool) bool { for i := pp.Integer(0); i < pp.Integer(p.numChunks()); i++ { if p.dirtyChunks().Get(bitmap.BitIndex(i)) { continue } - ci := p.chunkIndexSpec(i) - if p.wouldDuplicateRecent(ci) { + r := p.chunkIndexRequest(i) + if rs.wouldDuplicateRecent(r) { continue } - if !f(p.chunkIndexSpec(i)) { + if !f(r.chunkSpec) { return false } } @@ -835,7 +834,7 @@ func defaultIterUndirtiedChunks(p requestStrategyPiece, f func(chunkSpec) bool) if err != nil { panic(err) } - return f(p.chunkIndexSpec(pp.Integer(ci))) + return f(p.chunkIndexRequest(pp.Integer(ci)).chunkSpec) }) } @@ -1524,10 +1523,7 @@ func (c *connection) deleteRequest(r request) bool { } delete(c.requests, r) c.updateExpectingChunks() - if t, ok := c.t.lastRequested[r]; ok { - t.Stop() - delete(c.t.lastRequested, r) - } + c.t.requestStrategy.hooks().deletedRequest(r) pr := c.t.pendingRequests pr[r]-- n := pr[r] diff --git a/go.mod b/go.mod index 3c480385..5f88dc10 100644 --- a/go.mod +++ b/go.mod @@ -10,7 +10,7 @@ require ( github.com/anacrolix/log v0.4.0 github.com/anacrolix/missinggo v1.2.1 github.com/anacrolix/missinggo/perf v1.0.0 - github.com/anacrolix/missinggo/v2 v2.3.2-0.20200109120848-ad7ce9a1247a + github.com/anacrolix/missinggo/v2 v2.3.2-0.20200110051601-fc3212fb3984 github.com/anacrolix/multiless v0.0.0-20191223025854-070b7994e841 github.com/anacrolix/sync v0.2.0 github.com/anacrolix/tagflag v1.0.1 diff --git a/go.sum b/go.sum index 5786142a..29f47b5b 100644 --- a/go.sum +++ b/go.sum @@ -65,6 +65,8 @@ github.com/anacrolix/missinggo/v2 v2.2.1-0.20191103010835-12360f38ced0/go.mod h1 github.com/anacrolix/missinggo/v2 v2.3.0/go.mod h1:ZzG3/cc3t+5zcYWAgYrJW0MBsSwNwOkTlNquBbP51Bc= github.com/anacrolix/missinggo/v2 v2.3.2-0.20200109120848-ad7ce9a1247a h1:lYHFRvNiiBBGyreXVkcIztKyru+xAQJzg8AKaj/85TQ= github.com/anacrolix/missinggo/v2 v2.3.2-0.20200109120848-ad7ce9a1247a/go.mod h1:sjPqWXxdr3jWcMO/tXhhshXAaiTkGIgJpN93clGzGr8= +github.com/anacrolix/missinggo/v2 v2.3.2-0.20200110051601-fc3212fb3984 h1:/pEakYOx8jjk2HLIYEA/rTVXoU0Q/JA7Ojv/6X9iIkI= +github.com/anacrolix/missinggo/v2 v2.3.2-0.20200110051601-fc3212fb3984/go.mod h1:sjPqWXxdr3jWcMO/tXhhshXAaiTkGIgJpN93clGzGr8= github.com/anacrolix/mmsg v0.0.0-20180515031531-a4a3ba1fc8bb h1:2Or5ccMoY4Kfao+WdL2w6tpY6ZEe+2VTVbIPd7A/Ajk= github.com/anacrolix/mmsg v0.0.0-20180515031531-a4a3ba1fc8bb/go.mod h1:x2/ErsYUmT77kezS63+wzZp8E3byYB0gzirM/WMBLfw= github.com/anacrolix/mmsg v1.0.0 h1:btC7YLjOn29aTUAExJiVUhQOuf/8rhm+/nWCMAnL3Hg= diff --git a/piece.go b/piece.go index af4385c3..d2f72fd8 100644 --- a/piece.go +++ b/piece.go @@ -145,6 +145,13 @@ func (p *Piece) chunkIndexSpec(chunk pp.Integer) chunkSpec { return chunkIndexSpec(chunk, p.length(), p.chunkSize()) } +func (p *Piece) chunkIndexRequest(chunkIndex pp.Integer) request { + return request{ + pp.Integer(p.index), + chunkIndexSpec(chunkIndex, p.length(), p.chunkSize()), + } +} + func (p *Piece) numDirtyBytes() (ret pp.Integer) { // defer func() { // if ret > p.length() { @@ -256,9 +263,9 @@ func (p *Piece) dirtyChunks() bitmap.Bitmap { return p._dirtyChunks } -func (p *Piece) wouldDuplicateRecent(cs chunkSpec) bool { +func (rs requestStrategyThree) wouldDuplicateRecent(r request) bool { // This piece has been requested on another connection, and the duplicate request timer is still // running. - _, ok := p.t.lastRequested[request{pp.Integer(p.index), cs}] + _, ok := rs.lastRequested[r] return ok } diff --git a/request_strategy.go b/request_strategy.go index a09932a3..e2e2c128 100644 --- a/request_strategy.go +++ b/request_strategy.go @@ -12,8 +12,7 @@ import ( type requestStrategyPiece interface { numChunks() pp.Integer dirtyChunks() bitmap.Bitmap - chunkIndexSpec(i pp.Integer) chunkSpec - wouldDuplicateRecent(chunkSpec) bool + chunkIndexRequest(i pp.Integer) request } type requestStrategyTorrent interface { @@ -23,7 +22,6 @@ type requestStrategyTorrent interface { readerPiecePriorities() (now, readahead bitmap.Bitmap) ignorePieces() bitmap.Bitmap pendingPieces() *prioritybitmap.PriorityBitmap - duplicateRequestTimeout() time.Duration } type requestStrategyConnection interface { @@ -43,6 +41,16 @@ type requestStrategy interface { nominalMaxRequests(requestStrategyConnection) int shouldRequestWithoutBias(requestStrategyConnection) bool piecePriority(requestStrategyConnection, pieceIndex, piecePriority, int) int + hooks() requestStrategyHooks +} + +type requestStrategyHooks struct { + sentRequest func(request) + deletedRequest func(request) +} + +type requestStrategyCallbacks interface { + requestTimedOut(request) } // Favour higher priority pieces with some fuzzing to reduce overlaps and wastage across @@ -69,4 +77,43 @@ func (requestStrategyTwo) ShouldRequestWithoutBias(cn requestStrategyConnection) // Requests are strictly by piece priority, and not duplicated until duplicateRequestTimeout is // reached. type requestStrategyThree struct { + // How long to avoid duplicating a pending request. + duplicateRequestTimeout time.Duration + // The last time we requested a chunk. Deleting the request from any connection will clear this + // value. + lastRequested map[request]*time.Timer + callbacks requestStrategyCallbacks +} + +type requestStrategyMaker func(callbacks requestStrategyCallbacks) requestStrategy + +func requestStrategyThreeMaker(duplicateRequestTimeout time.Duration) requestStrategyMaker { + return func(callbacks requestStrategyCallbacks) requestStrategy { + return requestStrategyThree{ + duplicateRequestTimeout: duplicateRequestTimeout, + callbacks: callbacks, + lastRequested: make(map[request]*time.Timer), + } + } +} + +func (rs requestStrategyThree) hooks() requestStrategyHooks { + return requestStrategyHooks{ + deletedRequest: func(r request) { + if t, ok := rs.lastRequested[r]; ok { + t.Stop() + delete(rs.lastRequested, r) + } + }, + sentRequest: rs.onSentRequest, + } + +} + +func (rs requestStrategyOne) hooks() requestStrategyHooks { + return requestStrategyHooks{} +} + +func (rs requestStrategyTwo) hooks() requestStrategyHooks { + return requestStrategyHooks{} } diff --git a/torrent.go b/torrent.go index b29fd9f8..5bb013bc 100644 --- a/torrent.go +++ b/torrent.go @@ -48,8 +48,6 @@ type Torrent struct { // Determines what chunks to request from peers. requestStrategy requestStrategy - // How long to avoid duplicating a pending request. - _duplicateRequestTimeout time.Duration closed missinggo.Event infoHash metainfo.Hash @@ -135,9 +133,6 @@ type Torrent struct { // Count of each request across active connections. pendingRequests map[request]int - // The last time we requested a chunk. Deleting the request from any - // connection will clear this value. - lastRequested map[request]*time.Timer } func (t *Torrent) numConns() int { @@ -408,7 +403,6 @@ func (t *Torrent) onSetInfo() { t.gotMetainfo.Set() t.updateWantPeersEvent() t.pendingRequests = make(map[request]int) - t.lastRequested = make(map[request]*time.Timer) t.tryCreateMorePieceHashers() } @@ -1228,9 +1222,9 @@ func (t *Torrent) assertNoPendingRequests() { if len(t.pendingRequests) != 0 { panic(t.pendingRequests) } - if len(t.lastRequested) != 0 { - panic(t.lastRequested) - } + //if len(t.lastRequested) != 0 { + // panic(t.lastRequested) + //} } func (t *Torrent) dropConnection(c *connection) { @@ -1802,6 +1796,22 @@ func (t *Torrent) requestStrategyTorrent() requestStrategyTorrent { return t } -func (t *Torrent) duplicateRequestTimeout() time.Duration { - return t._duplicateRequestTimeout +type torrentRequestStrategyCallbacks struct { + t *Torrent +} + +func (cb torrentRequestStrategyCallbacks) requestTimedOut(r request) { + torrent.Add("request timeouts", 1) + cb.t.cl.lock() + defer cb.t.cl.unlock() + for cn := range cb.t.conns { + if cn.PeerHasPiece(pieceIndex(r.Index)) { + cn.updateRequests() + } + } + +} + +func (t *Torrent) requestStrategyCallbacks() requestStrategyCallbacks { + return torrentRequestStrategyCallbacks{t} }