diff --git a/peerconn.go b/peerconn.go index 71ac5f3d..f3680e18 100644 --- a/peerconn.go +++ b/peerconn.go @@ -47,6 +47,10 @@ type PeerRemoteAddr interface { String() string } +// Since we have to store all the requests in memory, we can't reasonably exceed what would be +// indexable with the memory space available. +type maxRequests = int + type Peer struct { // First to ensure 64-bit alignment for atomics. See #262. _stats ConnStats @@ -83,9 +87,10 @@ type Peer struct { cumulativeExpectedToReceiveChunks time.Duration _chunksReceivedWhileExpecting int64 - choking bool - requests map[Request]struct{} - requestsLowWater int + choking bool + requests map[Request]struct{} + piecesReceivedSinceLastRequestUpdate maxRequests + maxPiecesReceivedBetweenRequestUpdates maxRequests // Chunks that we might reasonably expect to receive from the peer. Due to // latency, buffering, and implementation differences, we may receive // chunks that are no longer in the set of requests actually want. @@ -114,7 +119,7 @@ type Peer struct { peerTouchedPieces map[pieceIndex]struct{} peerAllowedFast bitmap.Bitmap - PeerMaxRequests int // Maximum pending requests the peer allows. + PeerMaxRequests maxRequests // Maximum pending requests the peer allows. PeerExtensionIDs map[pp.ExtensionName]pp.ExtensionNumber PeerClientName string @@ -470,8 +475,8 @@ func (cn *PeerConn) requestedMetadataPiece(index int) bool { } // The actual value to use as the maximum outbound requests. -func (cn *Peer) nominalMaxRequests() (ret int) { - return int(clamp(1, int64(cn.PeerMaxRequests), 64)) +func (cn *Peer) nominalMaxRequests() (ret maxRequests) { + return int(clamp(1, 2*int64(cn.maxPiecesReceivedBetweenRequestUpdates), int64(cn.PeerMaxRequests))) } func (cn *Peer) totalExpectingTime() (ret time.Duration) { @@ -1358,6 +1363,7 @@ func (c *Peer) receiveChunk(msg *pp.Message) error { c.allStats(add(1, func(cs *ConnStats) *Count { return &cs.ChunksReadUseful })) c.allStats(add(int64(len(msg.Piece)), func(cs *ConnStats) *Count { return &cs.BytesReadUsefulData })) if deletedRequest { + c.piecesReceivedSinceLastRequestUpdate++ c.allStats(add(int64(len(msg.Piece)), func(cs *ConnStats) *Count { return &cs.BytesReadUsefulIntendedData })) } for _, f := range c.t.cl.config.Callbacks.ReceivedUsefulData { diff --git a/request-strategy.go b/request-strategy.go index 7c7660bc..f0470007 100644 --- a/request-strategy.go +++ b/request-strategy.go @@ -53,6 +53,10 @@ func (cl *Client) doRequests() { if p.closed.IsSet() { return } + if p.piecesReceivedSinceLastRequestUpdate > p.maxPiecesReceivedBetweenRequestUpdates { + p.maxPiecesReceivedBetweenRequestUpdates = p.piecesReceivedSinceLastRequestUpdate + } + p.piecesReceivedSinceLastRequestUpdate = 0 rst.Peers = append(rst.Peers, request_strategy.Peer{ HasPiece: p.peerHasPiece, MaxRequests: p.nominalMaxRequests(),