Extract the request timeout stuff into requestStrategyThree

This commit is contained in:
Matt Joiner 2020-01-10 16:18:55 +11:00
parent 4c989da21e
commit 2559af0f9c
8 changed files with 108 additions and 43 deletions

View File

@ -1039,11 +1039,10 @@ func (cl *Client) newTorrent(ih metainfo.Hash, specStorage storage.ClientImpl) (
maxEstablishedConns: cl.config.EstablishedConnsPerTorrent,
networkingEnabled: true,
requestStrategy: requestStrategyOne{},
requestStrategy: cl.config.DefaultRequestStrategy(t.requestStrategyCallbacks()),
metadataChanged: sync.Cond{
L: cl.locker(),
},
_duplicateRequestTimeout: 1 * time.Second,
}
t.logger = cl.logger.WithValues(t).WithText(func(m log.Msg) string {
return fmt.Sprintf("%v: %s", t, m.Text())

View File

@ -137,6 +137,8 @@ type ClientConfig struct {
// OnQuery hook func
DHTOnQuery func(query *krpc.Msg, source net.Addr) (propagate bool)
DefaultRequestStrategy requestStrategyMaker
}
func (cfg *ClientConfig) SetListenAddr(addr string) *ClientConfig {
@ -174,6 +176,8 @@ func NewDefaultClientConfig() *ClientConfig {
CryptoProvides: mse.AllSupportedCrypto,
ListenPort: 42069,
Logger: log.Default,
DefaultRequestStrategy: requestStrategyThreeMaker(5 * time.Second),
}
//cc.ConnTracker.SetNoMaxEntries()
//cc.ConnTracker.Timeout = func(conntrack.Entry) time.Duration { return 0 }

View File

@ -388,7 +388,7 @@ func (cn *connection) nominalMaxRequests() (ret int) {
}
// The actual value to use as the maximum outbound requests.
func (requestStrategyThree) nominalMaxRequests(cn requestStrategyConnection) (ret int) {
func (rs requestStrategyThree) nominalMaxRequests(cn requestStrategyConnection) (ret int) {
expectingTime := int64(cn.totalExpectingTime())
if expectingTime == 0 {
expectingTime = math.MaxInt64
@ -404,7 +404,7 @@ func (requestStrategyThree) nominalMaxRequests(cn requestStrategyConnection) (re
2,
// Request only as many as we expect to receive in the duplicateRequestTimeout
// window. We are trying to avoid having to duplicate requests.
cn.chunksReceivedWhileExpecting()*int64(cn.torrent().duplicateRequestTimeout())/expectingTime,
cn.chunksReceivedWhileExpecting()*int64(rs.duplicateRequestTimeout)/expectingTime,
),
))
}
@ -533,17 +533,7 @@ func (cn *connection) request(r request, mw messageWriter) bool {
}
cn.validReceiveChunks[r] = struct{}{}
cn.t.pendingRequests[r]++
cn.t.lastRequested[r] = time.AfterFunc(cn.t._duplicateRequestTimeout, func() {
torrent.Add("duplicate request timeouts", 1)
cn.mu().Lock()
defer cn.mu().Unlock()
delete(cn.t.lastRequested, r)
for cn := range cn.t.conns {
if cn.PeerHasPiece(pieceIndex(r.Index)) {
cn.updateRequests()
}
}
})
cn.t.requestStrategy.hooks().sentRequest(r)
cn.updateExpectingChunks()
return mw(pp.Message{
Type: pp.Request,
@ -553,6 +543,13 @@ func (cn *connection) request(r request, mw messageWriter) bool {
})
}
func (rs requestStrategyThree) onSentRequest(r request) {
rs.lastRequested[r] = time.AfterFunc(rs.duplicateRequestTimeout, func() {
delete(rs.lastRequested, r)
rs.callbacks.requestTimedOut(r)
})
}
func (cn *connection) fillWriteBuffer(msg func(pp.Message) bool) {
if !cn.t.networkingEnabled {
if !cn.SetInterested(false, msg) {
@ -805,22 +802,24 @@ func (cn *connection) iterPendingPiecesUntyped(f iter.Callback) {
}
func (cn *connection) iterPendingRequests(piece pieceIndex, f func(request) bool) bool {
return cn.t.requestStrategy.iterUndirtiedChunks(cn.t.piece(piece).requestStrategyPiece(),
return cn.t.requestStrategy.iterUndirtiedChunks(
cn.t.piece(piece).requestStrategyPiece(),
func(cs chunkSpec) bool {
return f(request{pp.Integer(piece), cs})
})
},
)
}
func (requestStrategyThree) iterUndirtiedChunks(p requestStrategyPiece, f func(chunkSpec) bool) bool {
func (rs requestStrategyThree) iterUndirtiedChunks(p requestStrategyPiece, f func(chunkSpec) bool) bool {
for i := pp.Integer(0); i < pp.Integer(p.numChunks()); i++ {
if p.dirtyChunks().Get(bitmap.BitIndex(i)) {
continue
}
ci := p.chunkIndexSpec(i)
if p.wouldDuplicateRecent(ci) {
r := p.chunkIndexRequest(i)
if rs.wouldDuplicateRecent(r) {
continue
}
if !f(p.chunkIndexSpec(i)) {
if !f(r.chunkSpec) {
return false
}
}
@ -835,7 +834,7 @@ func defaultIterUndirtiedChunks(p requestStrategyPiece, f func(chunkSpec) bool)
if err != nil {
panic(err)
}
return f(p.chunkIndexSpec(pp.Integer(ci)))
return f(p.chunkIndexRequest(pp.Integer(ci)).chunkSpec)
})
}
@ -1524,10 +1523,7 @@ func (c *connection) deleteRequest(r request) bool {
}
delete(c.requests, r)
c.updateExpectingChunks()
if t, ok := c.t.lastRequested[r]; ok {
t.Stop()
delete(c.t.lastRequested, r)
}
c.t.requestStrategy.hooks().deletedRequest(r)
pr := c.t.pendingRequests
pr[r]--
n := pr[r]

2
go.mod
View File

@ -10,7 +10,7 @@ require (
github.com/anacrolix/log v0.4.0
github.com/anacrolix/missinggo v1.2.1
github.com/anacrolix/missinggo/perf v1.0.0
github.com/anacrolix/missinggo/v2 v2.3.2-0.20200109120848-ad7ce9a1247a
github.com/anacrolix/missinggo/v2 v2.3.2-0.20200110051601-fc3212fb3984
github.com/anacrolix/multiless v0.0.0-20191223025854-070b7994e841
github.com/anacrolix/sync v0.2.0
github.com/anacrolix/tagflag v1.0.1

2
go.sum
View File

@ -65,6 +65,8 @@ github.com/anacrolix/missinggo/v2 v2.2.1-0.20191103010835-12360f38ced0/go.mod h1
github.com/anacrolix/missinggo/v2 v2.3.0/go.mod h1:ZzG3/cc3t+5zcYWAgYrJW0MBsSwNwOkTlNquBbP51Bc=
github.com/anacrolix/missinggo/v2 v2.3.2-0.20200109120848-ad7ce9a1247a h1:lYHFRvNiiBBGyreXVkcIztKyru+xAQJzg8AKaj/85TQ=
github.com/anacrolix/missinggo/v2 v2.3.2-0.20200109120848-ad7ce9a1247a/go.mod h1:sjPqWXxdr3jWcMO/tXhhshXAaiTkGIgJpN93clGzGr8=
github.com/anacrolix/missinggo/v2 v2.3.2-0.20200110051601-fc3212fb3984 h1:/pEakYOx8jjk2HLIYEA/rTVXoU0Q/JA7Ojv/6X9iIkI=
github.com/anacrolix/missinggo/v2 v2.3.2-0.20200110051601-fc3212fb3984/go.mod h1:sjPqWXxdr3jWcMO/tXhhshXAaiTkGIgJpN93clGzGr8=
github.com/anacrolix/mmsg v0.0.0-20180515031531-a4a3ba1fc8bb h1:2Or5ccMoY4Kfao+WdL2w6tpY6ZEe+2VTVbIPd7A/Ajk=
github.com/anacrolix/mmsg v0.0.0-20180515031531-a4a3ba1fc8bb/go.mod h1:x2/ErsYUmT77kezS63+wzZp8E3byYB0gzirM/WMBLfw=
github.com/anacrolix/mmsg v1.0.0 h1:btC7YLjOn29aTUAExJiVUhQOuf/8rhm+/nWCMAnL3Hg=

View File

@ -145,6 +145,13 @@ func (p *Piece) chunkIndexSpec(chunk pp.Integer) chunkSpec {
return chunkIndexSpec(chunk, p.length(), p.chunkSize())
}
func (p *Piece) chunkIndexRequest(chunkIndex pp.Integer) request {
return request{
pp.Integer(p.index),
chunkIndexSpec(chunkIndex, p.length(), p.chunkSize()),
}
}
func (p *Piece) numDirtyBytes() (ret pp.Integer) {
// defer func() {
// if ret > p.length() {
@ -256,9 +263,9 @@ func (p *Piece) dirtyChunks() bitmap.Bitmap {
return p._dirtyChunks
}
func (p *Piece) wouldDuplicateRecent(cs chunkSpec) bool {
func (rs requestStrategyThree) wouldDuplicateRecent(r request) bool {
// This piece has been requested on another connection, and the duplicate request timer is still
// running.
_, ok := p.t.lastRequested[request{pp.Integer(p.index), cs}]
_, ok := rs.lastRequested[r]
return ok
}

View File

@ -12,8 +12,7 @@ import (
type requestStrategyPiece interface {
numChunks() pp.Integer
dirtyChunks() bitmap.Bitmap
chunkIndexSpec(i pp.Integer) chunkSpec
wouldDuplicateRecent(chunkSpec) bool
chunkIndexRequest(i pp.Integer) request
}
type requestStrategyTorrent interface {
@ -23,7 +22,6 @@ type requestStrategyTorrent interface {
readerPiecePriorities() (now, readahead bitmap.Bitmap)
ignorePieces() bitmap.Bitmap
pendingPieces() *prioritybitmap.PriorityBitmap
duplicateRequestTimeout() time.Duration
}
type requestStrategyConnection interface {
@ -43,6 +41,16 @@ type requestStrategy interface {
nominalMaxRequests(requestStrategyConnection) int
shouldRequestWithoutBias(requestStrategyConnection) bool
piecePriority(requestStrategyConnection, pieceIndex, piecePriority, int) int
hooks() requestStrategyHooks
}
type requestStrategyHooks struct {
sentRequest func(request)
deletedRequest func(request)
}
type requestStrategyCallbacks interface {
requestTimedOut(request)
}
// Favour higher priority pieces with some fuzzing to reduce overlaps and wastage across
@ -69,4 +77,43 @@ func (requestStrategyTwo) ShouldRequestWithoutBias(cn requestStrategyConnection)
// Requests are strictly by piece priority, and not duplicated until duplicateRequestTimeout is
// reached.
type requestStrategyThree struct {
// How long to avoid duplicating a pending request.
duplicateRequestTimeout time.Duration
// The last time we requested a chunk. Deleting the request from any connection will clear this
// value.
lastRequested map[request]*time.Timer
callbacks requestStrategyCallbacks
}
type requestStrategyMaker func(callbacks requestStrategyCallbacks) requestStrategy
func requestStrategyThreeMaker(duplicateRequestTimeout time.Duration) requestStrategyMaker {
return func(callbacks requestStrategyCallbacks) requestStrategy {
return requestStrategyThree{
duplicateRequestTimeout: duplicateRequestTimeout,
callbacks: callbacks,
lastRequested: make(map[request]*time.Timer),
}
}
}
func (rs requestStrategyThree) hooks() requestStrategyHooks {
return requestStrategyHooks{
deletedRequest: func(r request) {
if t, ok := rs.lastRequested[r]; ok {
t.Stop()
delete(rs.lastRequested, r)
}
},
sentRequest: rs.onSentRequest,
}
}
func (rs requestStrategyOne) hooks() requestStrategyHooks {
return requestStrategyHooks{}
}
func (rs requestStrategyTwo) hooks() requestStrategyHooks {
return requestStrategyHooks{}
}

View File

@ -48,8 +48,6 @@ type Torrent struct {
// Determines what chunks to request from peers.
requestStrategy requestStrategy
// How long to avoid duplicating a pending request.
_duplicateRequestTimeout time.Duration
closed missinggo.Event
infoHash metainfo.Hash
@ -135,9 +133,6 @@ type Torrent struct {
// Count of each request across active connections.
pendingRequests map[request]int
// The last time we requested a chunk. Deleting the request from any
// connection will clear this value.
lastRequested map[request]*time.Timer
}
func (t *Torrent) numConns() int {
@ -408,7 +403,6 @@ func (t *Torrent) onSetInfo() {
t.gotMetainfo.Set()
t.updateWantPeersEvent()
t.pendingRequests = make(map[request]int)
t.lastRequested = make(map[request]*time.Timer)
t.tryCreateMorePieceHashers()
}
@ -1228,9 +1222,9 @@ func (t *Torrent) assertNoPendingRequests() {
if len(t.pendingRequests) != 0 {
panic(t.pendingRequests)
}
if len(t.lastRequested) != 0 {
panic(t.lastRequested)
}
//if len(t.lastRequested) != 0 {
// panic(t.lastRequested)
//}
}
func (t *Torrent) dropConnection(c *connection) {
@ -1802,6 +1796,22 @@ func (t *Torrent) requestStrategyTorrent() requestStrategyTorrent {
return t
}
func (t *Torrent) duplicateRequestTimeout() time.Duration {
return t._duplicateRequestTimeout
type torrentRequestStrategyCallbacks struct {
t *Torrent
}
func (cb torrentRequestStrategyCallbacks) requestTimedOut(r request) {
torrent.Add("request timeouts", 1)
cb.t.cl.lock()
defer cb.t.cl.unlock()
for cn := range cb.t.conns {
if cn.PeerHasPiece(pieceIndex(r.Index)) {
cn.updateRequests()
}
}
}
func (t *Torrent) requestStrategyCallbacks() requestStrategyCallbacks {
return torrentRequestStrategyCallbacks{t}
}