Try request stealing

This commit is contained in:
Matt Joiner 2021-12-03 21:30:41 +11:00
parent 73be571f50
commit 117ae28b38
3 changed files with 35 additions and 101 deletions

View File

@ -619,7 +619,7 @@ func (cn *Peer) request(r RequestIndex) (more bool, err error) {
cn.validReceiveChunks = make(map[RequestIndex]int) cn.validReceiveChunks = make(map[RequestIndex]int)
} }
cn.validReceiveChunks[r]++ cn.validReceiveChunks[r]++
cn.t.pendingRequests.Inc(r) cn.t.pendingRequests[r] = cn
cn.t.lastRequested[r] = time.Now() cn.t.lastRequested[r] = time.Now()
cn.updateExpectingChunks() cn.updateExpectingChunks()
ppReq := cn.t.requestIndexToRequest(r) ppReq := cn.t.requestIndexToRequest(r)
@ -1550,10 +1550,8 @@ func (c *Peer) deleteRequest(r RequestIndex) bool {
f(PeerRequestEvent{c, c.t.requestIndexToRequest(r)}) f(PeerRequestEvent{c, c.t.requestIndexToRequest(r)})
} }
c.updateExpectingChunks() c.updateExpectingChunks()
c.t.pendingRequests.Dec(r) delete(c.t.pendingRequests, r)
if c.t.pendingRequests.Get(r) == 0 {
delete(c.t.lastRequested, r) delete(c.t.lastRequested, r)
}
return true return true
} }

View File

@ -4,7 +4,6 @@ import (
"container/heap" "container/heap"
"context" "context"
"encoding/gob" "encoding/gob"
"math/rand"
"reflect" "reflect"
"runtime/pprof" "runtime/pprof"
"time" "time"
@ -82,20 +81,6 @@ func (p *peerRequests) Less(i, j int) bool {
t := p.peer.t t := p.peer.t
leftPieceIndex := leftRequest / t.chunksPerRegularPiece() leftPieceIndex := leftRequest / t.chunksPerRegularPiece()
rightPieceIndex := rightRequest / t.chunksPerRegularPiece() rightPieceIndex := rightRequest / t.chunksPerRegularPiece()
leftCurrent := p.peer.actualRequestState.Requests.Contains(leftRequest)
rightCurrent := p.peer.actualRequestState.Requests.Contains(rightRequest)
pending := func(index RequestIndex, current bool) int {
ret := t.pendingRequests.Get(index)
if current {
ret--
}
// See https://github.com/anacrolix/torrent/issues/679 for possible issues. This should be
// resolved.
if ret < 0 {
panic(ret)
}
return ret
}
ml := multiless.New() ml := multiless.New()
// Push requests that can't be served right now to the end. But we don't throw them away unless // Push requests that can't be served right now to the end. But we don't throw them away unless
// there's a better alternative. This is for when we're using the fast extension and get choked // there's a better alternative. This is for when we're using the fast extension and get choked
@ -106,28 +91,33 @@ func (p *peerRequests) Less(i, j int) bool {
!p.peer.peerAllowedFast.Contains(rightPieceIndex), !p.peer.peerAllowedFast.Contains(rightPieceIndex),
) )
} }
ml = ml.Int( leftPeer := t.pendingRequests[leftRequest]
pending(leftRequest, leftCurrent), rightPeer := t.pendingRequests[rightRequest]
pending(rightRequest, rightCurrent)) ml = ml.Bool(rightPeer == p.peer, leftPeer == p.peer)
ml = ml.Bool(!leftCurrent, !rightCurrent) ml = ml.Bool(rightPeer == nil, leftPeer == nil)
if ml.Ok() {
return ml.MustLess()
}
if leftPeer != nil {
ml = ml.Uint64(
rightPeer.actualRequestState.Requests.GetCardinality(),
leftPeer.actualRequestState.Requests.GetCardinality(),
)
}
ml = ml.CmpInt64(t.lastRequested[rightRequest].Sub(t.lastRequested[leftRequest]).Nanoseconds())
leftPiece := t.piece(int(leftPieceIndex)) leftPiece := t.piece(int(leftPieceIndex))
rightPiece := t.piece(int(rightPieceIndex)) rightPiece := t.piece(int(rightPieceIndex))
ml = ml.Int( ml = ml.Int(
// Technically we would be happy with the cached priority here, except we don't actually
// cache it anymore, and Torrent.piecePriority just does another lookup of *Piece to resolve
// the priority through Piece.purePriority, which is probably slower.
-int(leftPiece.purePriority()), -int(leftPiece.purePriority()),
-int(rightPiece.purePriority()), -int(rightPiece.purePriority()),
) )
ml = ml.Int( ml = ml.Int(
int(leftPiece.availability), int(leftPiece.availability),
int(rightPiece.availability)) int(rightPiece.availability))
leftLastRequested := p.peer.t.lastRequested[leftRequest] return ml.Less()
rightLastRequested := p.peer.t.lastRequested[rightRequest]
ml = ml.EagerSameLess(
leftLastRequested.Equal(rightLastRequested),
leftLastRequested.Before(rightLastRequested),
)
ml = ml.Uint32(leftPieceIndex, rightPieceIndex)
ml = ml.Uint32(leftRequest, rightRequest)
return ml.MustLess()
} }
func (p *peerRequests) Swap(i, j int) { func (p *peerRequests) Swap(i, j int) {
@ -146,7 +136,7 @@ func (p *peerRequests) Pop() interface{} {
} }
type desiredRequestState struct { type desiredRequestState struct {
Requests []RequestIndex Requests peerRequests
Interested bool Interested bool
} }
@ -175,7 +165,9 @@ func (p *Peer) getDesiredRequestState() (desired desiredRequestState) {
// return // return
// } // }
if !allowedFast { if !allowedFast {
// We must signal interest to request this // We must signal interest to request this. TODO: We could set interested if the
// peers pieces (minus the allowed fast set) overlap with our missing pieces if
// there are any readers, or any pending pieces.
desired.Interested = true desired.Interested = true
// We can make or will allow sustaining a request here if we're not choked, or // We can make or will allow sustaining a request here if we're not choked, or
// have made the request previously (presumably while unchoked), and haven't had // have made the request previously (presumably while unchoked), and haven't had
@ -186,23 +178,12 @@ func (p *Peer) getDesiredRequestState() (desired desiredRequestState) {
return return
} }
} }
// Note that we can still be interested if we filter all requests due to being
// recently requested from another peer.
if !p.actualRequestState.Requests.Contains(r) {
if time.Since(p.t.lastRequested[r]) < time.Second {
return
}
}
requestHeap.requestIndexes = append(requestHeap.requestIndexes, r) requestHeap.requestIndexes = append(requestHeap.requestIndexes, r)
}) })
}, },
) )
p.t.assertPendingRequests() p.t.assertPendingRequests()
heap.Init(&requestHeap) desired.Requests = requestHeap
for requestHeap.Len() != 0 && len(desired.Requests) < p.nominalMaxRequests() {
requestIndex := heap.Pop(&requestHeap).(RequestIndex)
desired.Requests = append(desired.Requests, requestIndex)
}
return return
} }
@ -229,64 +210,19 @@ func (p *Peer) applyRequestState(next desiredRequestState) bool {
return false return false
} }
more := true more := true
cancel := current.Requests.Clone() requestHeap := &next.Requests
for _, ri := range next.Requests { heap.Init(requestHeap)
cancel.Remove(ri) for requestHeap.Len() != 0 && maxRequests(current.Requests.GetCardinality()) < p.nominalMaxRequests() {
} req := heap.Pop(requestHeap).(RequestIndex)
cancel.Iterate(func(req uint32) bool {
more = p.cancel(req)
return more
})
if !more {
return false
}
shuffled := false
lastPending := 0
for i := 0; i < len(next.Requests); i++ {
req := next.Requests[i]
if p.cancelledRequests.Contains(req) { if p.cancelledRequests.Contains(req) {
// Waiting for a reject or piece message, which will suitably trigger us to update our // Waiting for a reject or piece message, which will suitably trigger us to update our
// requests, so we can skip this one with no additional consideration. // requests, so we can skip this one with no additional consideration.
continue continue
} }
// The cardinality of our desired requests shouldn't exceed the max requests since it's used existing := p.t.pendingRequests[req]
// in the calculation of the requests. However, if we cancelled requests and they haven't if existing != nil && existing != p && existing.actualRequestState.Requests.GetCardinality()-existing.cancelledRequests.GetCardinality() > current.Requests.GetCardinality() {
// been rejected or serviced yet with the fast extension enabled, we can end up with more existing.cancel(req)
// extra outstanding requests. We could subtract the number of outstanding cancels from the
// next request cardinality, but peers might not like that.
if maxRequests(current.Requests.GetCardinality()) >= p.nominalMaxRequests() {
// log.Printf("not assigning all requests [desired=%v, cancelled=%v, current=%v, max=%v]",
// next.Requests.GetCardinality(),
// p.cancelledRequests.GetCardinality(),
// current.Requests.GetCardinality(),
// p.nominalMaxRequests(),
// )
break
} }
otherPending := p.t.pendingRequests.Get(next.Requests[0])
if p.actualRequestState.Requests.Contains(next.Requests[0]) {
otherPending--
}
if otherPending < lastPending {
// Pending should only rise. It's supposed to be the strongest ordering criteria. If it
// doesn't, our shuffling condition could be wrong.
panic(lastPending)
}
// If the request has already been requested by another peer, shuffle this and the rest of
// the requests (since according to the increasing condition, the rest of the indices
// already have an outstanding request with another peer).
if !shuffled && otherPending > 0 {
shuffleReqs := next.Requests[i:]
rand.Shuffle(len(shuffleReqs), func(i, j int) {
shuffleReqs[i], shuffleReqs[j] = shuffleReqs[j], shuffleReqs[i]
})
// log.Printf("shuffled reqs [%v:%v]", i, len(next.Requests))
shuffled = true
// Repeat this index
i--
continue
}
more = p.mustRequest(req) more = p.mustRequest(req)
if !more { if !more {
break break

View File

@ -138,7 +138,7 @@ type Torrent struct {
initialPieceCheckDisabled bool initialPieceCheckDisabled bool
// Count of each request across active connections. // Count of each request across active connections.
pendingRequests pendingRequests pendingRequests map[RequestIndex]*Peer
lastRequested map[RequestIndex]time.Time lastRequested map[RequestIndex]time.Time
// Chunks we've written to since the corresponding piece was last checked. // Chunks we've written to since the corresponding piece was last checked.
dirtyChunks roaring.Bitmap dirtyChunks roaring.Bitmap
@ -463,7 +463,7 @@ func (t *Torrent) onSetInfo() {
t.cl.event.Broadcast() t.cl.event.Broadcast()
close(t.gotMetainfoC) close(t.gotMetainfoC)
t.updateWantPeersEvent() t.updateWantPeersEvent()
t.pendingRequests.Init(t.numRequests()) t.pendingRequests = make(map[RequestIndex]*Peer)
t.lastRequested = make(map[RequestIndex]time.Time) t.lastRequested = make(map[RequestIndex]time.Time)
t.tryCreateMorePieceHashers() t.tryCreateMorePieceHashers()
t.iterPeers(func(p *Peer) { t.iterPeers(func(p *Peer) {