Dynamic outbound max requests

This might be one solution to https://github.com/anacrolix/torrent/issues/698.
This commit is contained in:
Matt Joiner 2021-12-24 08:55:57 +11:00
parent 89b8b78980
commit 1bae62fd22
10 changed files with 117 additions and 39 deletions

View File

@ -985,23 +985,25 @@ func (p *Peer) initUpdateRequestsTimer() {
}
}
p.updateRequestsTimer = time.AfterFunc(math.MaxInt64, p.updateRequestsTimerFunc)
p.updateRequestsTimer.Stop()
}
const peerUpdateRequestsTimerReason = "updateRequestsTimer"
// updateRequestsTimerFunc runs when the update-requests timer fires. It
// triggers a request update unless the connection is closed, an update is
// already pending, or the timer fired spuriously (sooner than the configured
// interval since the last update).
func (c *Peer) updateRequestsTimerFunc() {
	c.locker().Lock()
	defer c.locker().Unlock()
	if c.closed.IsSet() {
		return
	}
	if c.needRequestUpdate != "" {
		// An update is already queued; don't clobber its reason.
		return
	}
	if c.isLowOnRequests() {
		// If there are no outstanding requests, then a request update should have already run.
		return
	}
	if d := time.Since(c.lastRequestUpdate); d < updateRequestsTimerDuration {
		// Fired early relative to the last update: treat as spurious and skip.
		// The stale unconditional updateRequests call that preceded this guard
		// has been removed so the guard actually takes effect.
		log.Printf("spurious timer requests update [interval=%v]", d)
		return
	}
	c.updateRequests(peerUpdateRequestsTimerReason)
}
// Maximum pending requests we allow peers to send us. If peer requests are buffered on read, this

10
misc.go
View File

@ -143,6 +143,16 @@ func max(as ...int64) int64 {
return ret
}
// maxInt returns the largest of the given ints. It panics if called with no
// arguments.
func maxInt(as ...int) int {
	best := as[0]
	for _, v := range as {
		if v > best {
			best = v
		}
	}
	return best
}
func min(as ...int64) int64 {
ret := as[0]
for _, a := range as[1:] {

View File

@ -11,8 +11,6 @@ import (
type peerImpl interface {
// Trigger the actual request state to get updated
handleUpdateRequests()
// Whether the outstanding local request cardinality is low enough to warrant an update.
isLowOnRequests() bool
writeInterested(interested bool) bool
// _cancel initiates cancellation of a request and returns acked if it expects the cancel to be

15
peer.go Normal file
View File

@ -0,0 +1,15 @@
package torrent
// isLowOnRequests reports whether the peer has no outstanding requests and no
// cancellations awaiting acknowledgement, i.e. a request update is warranted.
func (p *Peer) isLowOnRequests() bool {
	noRequests := p.requestState.Requests.IsEmpty()
	noCancelled := p.requestState.Cancelled.IsEmpty()
	return noRequests && noCancelled
}
// decPeakRequests decrements the peak outbound request counter.
//
// The counter is allowed to go negative: the update-requests timer can lower
// peakRequests below the number of requests actually outstanding. That is
// deliberately tolerated for now (the original panic on underflow is retained
// only as history: "let it go negative and see what happens"); maxRequests
// being signed is what makes this safe.
func (p *Peer) decPeakRequests() {
	p.peakRequests--
}

View File

@ -86,6 +86,8 @@ type Peer struct {
needRequestUpdate string
requestState requestState
updateRequestsTimer *time.Timer
lastRequestUpdate time.Time
peakRequests maxRequests
lastBecameInterested time.Time
priorInterest time.Duration
@ -445,7 +447,10 @@ func (cn *Peer) peerHasPiece(piece pieceIndex) bool {
// 64KiB, but temporarily less to work around an issue with WebRTC. TODO: Update when
// https://github.com/pion/datachannel/issues/59 is fixed.
// Diff residue duplicated the old standalone const alongside the new block,
// redeclaring writeBufferHighWaterLen; only the grouped form survives.
const (
	// High water mark for the connection write buffer; see the comment above
	// regarding the temporary WebRTC constraint.
	writeBufferHighWaterLen = 1 << 15
	// Below this level the buffer has enough headroom to commit a full batch
	// of requests (see maxLocalToRemoteRequests).
	writeBufferLowWaterLen = writeBufferHighWaterLen / 2
)
// Writes a message into the write buffer. Returns whether it's okay to keep writing. Writing is
// done asynchronously, so it may be that we're not able to honour backpressure from this method.
@ -481,9 +486,17 @@ func (cn *PeerConn) requestedMetadataPiece(index int) bool {
return index < len(cn.metadataRequests) && cn.metadataRequests[index]
}
var (
	// Marshalled wire sizes of the Interested and Request messages, computed
	// once at package init.
	interestedMsgLen = len(pp.Message{Type: pp.Interested}.MustMarshalBinary())
	requestMsgLen    = len(pp.Message{Type: pp.Request}.MustMarshalBinary())
	// This is the maximum request count that could fit in the write buffer if it's at or below the
	// low water mark when we run maybeUpdateActualRequestState.
	maxLocalToRemoteRequests = (writeBufferHighWaterLen - writeBufferLowWaterLen - interestedMsgLen) / requestMsgLen
)
// The actual value to use as the maximum outbound requests.
func (cn *Peer) nominalMaxRequests() (ret maxRequests) {
return maxRequests(clamp(1, int64(cn.PeerMaxRequests), 2048))
func (cn *Peer) nominalMaxRequests() maxRequests {
return maxRequests(maxInt(1, minInt(cn.PeerMaxRequests, cn.peakRequests*2, maxLocalToRemoteRequests)))
}
func (cn *Peer) totalExpectingTime() (ret time.Duration) {
@ -649,6 +662,7 @@ func (me *Peer) cancel(r RequestIndex) {
panic("request already cancelled")
}
}
me.decPeakRequests()
if me.isLowOnRequests() {
me.updateRequests("Peer.cancel")
}
@ -662,9 +676,15 @@ func (me *PeerConn) _cancel(r RequestIndex) bool {
}
func (cn *PeerConn) fillWriteBuffer() {
if !cn.maybeUpdateActualRequestState() {
return
if cn.messageWriter.writeBuffer.Len() > writeBufferLowWaterLen {
// Fully committing to our max requests requires sufficient space (see
// maxLocalToRemoteRequests). Flush what we have instead. We also prefer always to make
// requests than to do PEX or upload, so we short-circuit before handling those. Any update
// request reason will not be cleared, so we'll come right back here when there's space. We
// can't do this in maybeUpdateActualRequestState because it's a method on Peer and has no
// knowledge of write buffers.
}
cn.maybeUpdateActualRequestState()
if cn.pex.IsEnabled() {
if flow := cn.pex.Share(cn.write); !flow {
return
@ -703,6 +723,9 @@ func (cn *Peer) updateRequests(reason string) {
if cn.needRequestUpdate != "" {
return
}
if reason != peerUpdateRequestsTimerReason && !cn.isLowOnRequests() {
return
}
cn.needRequestUpdate = reason
cn.handleUpdateRequests()
}
@ -1235,7 +1258,9 @@ func (c *PeerConn) mainReadLoop() (err error) {
// Returns true if it was valid to reject the request.
func (c *Peer) remoteRejectedRequest(r RequestIndex) bool {
if !c.deleteRequest(r) && !c.requestState.Cancelled.CheckedRemove(r) {
if c.deleteRequest(r) {
c.decPeakRequests()
} else if !c.requestState.Cancelled.CheckedRemove(r) {
return false
}
if c.isLowOnRequests() {
@ -1743,10 +1768,6 @@ func (p *Peer) TryAsPeerConn() (*PeerConn, bool) {
return pc, ok
}
func (pc *PeerConn) isLowOnRequests() bool {
return pc.requestState.Requests.IsEmpty() && pc.requestState.Cancelled.IsEmpty()
}
func (p *Peer) uncancelledRequests() uint64 {
return p.requestState.Requests.GetCardinality()
}

View File

@ -238,3 +238,11 @@ func TestHaveAllThenBitfield(t *testing.T) {
{2, 0}, {1, 1}, {1, 0}, {2, 1}, {1, 0},
})
}
// Sanity-checks the marshalled message sizes that maxLocalToRemoteRequests is
// derived from, and that the derived request budget is usefully large.
func TestApplyRequestStateWriteBufferConstraints(t *testing.T) {
	checker := qt.New(t)
	checker.Check(interestedMsgLen, qt.Equals, 5)
	checker.Check(requestMsgLen, qt.Equals, 17)
	checker.Check(maxLocalToRemoteRequests >= 8, qt.IsTrue)
	checker.Logf("max local to remote requests: %v", maxLocalToRemoteRequests)
}

View File

@ -6,7 +6,8 @@ import (
type PeerRequestState struct {
Interested bool
// Expecting
// Expecting. TODO: This should be ordered so webseed requesters initiate in the same order they
// were assigned.
Requests roaring.Bitmap
// Cancelled and waiting response
Cancelled roaring.Bitmap

View File

@ -4,6 +4,7 @@ import (
"container/heap"
"context"
"encoding/gob"
"fmt"
"reflect"
"runtime/pprof"
"time"
@ -201,31 +202,46 @@ func (p *Peer) getDesiredRequestState() (desired desiredRequestState) {
return
}
func (p *Peer) maybeUpdateActualRequestState() bool {
if p.needRequestUpdate == "" {
return true
func (p *Peer) maybeUpdateActualRequestState() {
if p.closed.IsSet() {
return
}
if p.needRequestUpdate == "" {
return
}
if p.needRequestUpdate == peerUpdateRequestsTimerReason {
since := time.Since(p.lastRequestUpdate)
if since < updateRequestsTimerDuration {
panic(since)
}
}
var more bool
pprof.Do(
context.Background(),
pprof.Labels("update request", p.needRequestUpdate),
func(_ context.Context) {
next := p.getDesiredRequestState()
more = p.applyRequestState(next)
p.applyRequestState(next)
},
)
return more
}
// Transmit/action the request state to the peer.
func (p *Peer) applyRequestState(next desiredRequestState) bool {
func (p *Peer) applyRequestState(next desiredRequestState) {
current := &p.requestState
if !p.setInterested(next.Interested) {
return false
panic("insufficient write buffer")
}
more := true
requestHeap := &next.Requests
t := p.t
originalRequestCount := current.Requests.GetCardinality()
// We're either here on a timer, or because we ran out of requests. Both are valid reasons to
// alter peakRequests.
if originalRequestCount != 0 && p.needRequestUpdate != peerUpdateRequestsTimerReason {
panic(fmt.Sprintf(
"expected zero existing requests (%v) for update reason %q",
originalRequestCount, p.needRequestUpdate))
}
heap.Init(requestHeap)
for requestHeap.Len() != 0 && maxRequests(current.Requests.GetCardinality()+current.Cancelled.GetCardinality()) < p.nominalMaxRequests() {
req := heap.Pop(requestHeap).(RequestIndex)
@ -245,14 +261,24 @@ func (p *Peer) applyRequestState(next desiredRequestState) bool {
break
}
}
// TODO: This may need to change, we might want to update even if there were no requests due to
// filtering them for being recently requested already.
p.updateRequestsTimer.Stop()
if more {
p.needRequestUpdate = ""
if current.Interested {
p.updateRequestsTimer.Reset(3 * time.Second)
}
if !more {
// This might fail if we incorrectly determine that we can fit up to the maximum allowed
// requests into the available write buffer space. We don't want that to happen because it
// makes our peak requests dependent on how much was already in the buffer.
panic(fmt.Sprintf(
"couldn't fill apply entire request state [newRequests=%v]",
current.Requests.GetCardinality()-originalRequestCount))
}
return more
newPeakRequests := maxRequests(current.Requests.GetCardinality() - originalRequestCount)
// log.Printf(
// "requests %v->%v (peak %v->%v) reason %q (peer %v)",
// originalRequestCount, current.Requests.GetCardinality(), p.peakRequests, newPeakRequests, p.needRequestUpdate, p)
p.peakRequests = newPeakRequests
p.needRequestUpdate = ""
p.lastRequestUpdate = time.Now()
p.updateRequestsTimer.Reset(updateRequestsTimerDuration)
}
// This could be set to 10s to match the unchoke/request update interval recommended by some
// specifications. I've set it shorter to trigger it more often for testing for now.
const updateRequestsTimerDuration = 3 * time.Second

View File

@ -2241,6 +2241,7 @@ func (t *Torrent) DisallowDataUpload() {
defer t.cl.unlock()
t.dataUploadDisallowed = true
for c := range t.conns {
// TODO: This doesn't look right. Shouldn't we tickle writers to choke peers or something instead?
c.updateRequests("disallow data upload")
}
}

View File

@ -187,10 +187,6 @@ func (ws *webseedPeer) requestResultHandler(r Request, webseedRequest webseed.Re
return err
}
// isLowOnRequests reports whether this webseed peer has spare request
// capacity: fewer outstanding requests than its configured maximum.
func (me *webseedPeer) isLowOnRequests() bool {
	outstanding := me.peer.requestState.Requests.GetCardinality()
	return outstanding < uint64(me.maxRequests)
}
// peerPieces returns the set of pieces this webseed is treated as having,
// which for a webseed is every piece the client tracks.
func (me *webseedPeer) peerPieces() *roaring.Bitmap {
	return &me.client.Pieces
}