Don't automatically delete requests if we're choked with fast extension

This commit is contained in:
Matt Joiner 2021-10-11 18:21:24 +11:00
parent 610f8e0185
commit 7f236506cb
2 changed files with 47 additions and 12 deletions

View File

@ -1035,12 +1035,28 @@ func (c *PeerConn) mainReadLoop() (err error) {
c.peerChoking = true c.peerChoking = true
if !c.fastEnabled() { if !c.fastEnabled() {
c.deleteAllRequests() c.deleteAllRequests()
} else {
c.actualRequestState.Requests.Iterate(func(x uint32) bool {
if !c.peerAllowedFast.Contains(x / c.t.chunksPerRegularPiece()) {
c.t.pendingRequests.Dec(x)
}
return true
})
} }
// We can then reset our interest. // We can then reset our interest.
c.updateRequests("choked") c.updateRequests("choked")
c.updateExpectingChunks() c.updateExpectingChunks()
case pp.Unchoke: case pp.Unchoke:
if !c.peerChoking {
return errors.New("got unchoke but not choked")
}
c.peerChoking = false c.peerChoking = false
c.actualRequestState.Requests.Iterate(func(x uint32) bool {
if !c.peerAllowedFast.Contains(x / c.t.chunksPerRegularPiece()) {
c.t.pendingRequests.Inc(x)
}
return true
})
c.updateRequests("unchoked") c.updateRequests("unchoked")
c.updateExpectingChunks() c.updateExpectingChunks()
case pp.Interested: case pp.Interested:
@ -1098,7 +1114,15 @@ func (c *PeerConn) mainReadLoop() (err error) {
case pp.AllowedFast: case pp.AllowedFast:
torrent.Add("allowed fasts received", 1) torrent.Add("allowed fasts received", 1)
log.Fmsg("peer allowed fast: %d", msg.Index).AddValues(c).SetLevel(log.Debug).Log(c.t.logger) log.Fmsg("peer allowed fast: %d", msg.Index).AddValues(c).SetLevel(log.Debug).Log(c.t.logger)
c.peerAllowedFast.Add(bitmap.BitIndex(msg.Index)) pieceIndex := msg.Index.Int()
c.peerAllowedFast.AddInt(pieceIndex)
n := roaringBitmapRangeCardinality(
&c.actualRequestState.Requests,
t.pieceRequestIndexOffset(pieceIndex),
t.pieceRequestIndexOffset(pieceIndex+1))
if n != 0 {
panic(n)
}
c.updateRequests("allowed fast") c.updateRequests("allowed fast")
case pp.Extended: case pp.Extended:
err = c.onReadExtendedMsg(msg.ExtendedID, msg.ExtendedPayload) err = c.onReadExtendedMsg(msg.ExtendedID, msg.ExtendedPayload)
@ -1444,7 +1468,9 @@ func (c *Peer) deleteRequest(r RequestIndex) bool {
f(PeerRequestEvent{c, c.t.requestIndexToRequest(r)}) f(PeerRequestEvent{c, c.t.requestIndexToRequest(r)})
} }
c.updateExpectingChunks() c.updateExpectingChunks()
c.t.pendingRequests.Dec(r) if !c.peerChoking || c.peerAllowedFast.Contains(r/c.t.chunksPerRegularPiece()) {
c.t.pendingRequests.Dec(r)
}
return true return true
} }

View File

@ -147,13 +147,22 @@ func (p peerRequests) Less(i, j int) bool {
return ret return ret
} }
ml := multiless.New() ml := multiless.New()
// Push requests that can't be served right now to the end. But we don't throw them away unless
// there's a better alternative. This is for when we're using the fast extension and get choked
// but our requests could still be good when we get unchoked.
if p.peer.peerChoking {
ml = ml.Bool(
!p.peer.peerAllowedFast.Contains(leftPieceIndex),
!p.peer.peerAllowedFast.Contains(rightPieceIndex),
)
}
ml = ml.Int( ml = ml.Int(
pending(leftRequest, leftCurrent), pending(leftRequest, leftCurrent),
pending(rightRequest, rightCurrent)) pending(rightRequest, rightCurrent))
ml = ml.Bool(rightCurrent, leftCurrent) ml = ml.Bool(rightCurrent, leftCurrent)
ml = ml.Int( ml = ml.Int(
int(p.torrentStrategyInput.Pieces[leftPieceIndex].Priority), int(p.torrentStrategyInput.Pieces[rightPieceIndex].Priority),
int(p.torrentStrategyInput.Pieces[rightPieceIndex].Priority)) int(p.torrentStrategyInput.Pieces[leftPieceIndex].Priority))
ml = ml.Int( ml = ml.Int(
int(p.torrentStrategyInput.Pieces[leftPieceIndex].Availability), int(p.torrentStrategyInput.Pieces[leftPieceIndex].Availability),
int(p.torrentStrategyInput.Pieces[rightPieceIndex].Availability)) int(p.torrentStrategyInput.Pieces[rightPieceIndex].Availability))
@ -198,15 +207,15 @@ func (p *Peer) getDesiredRequestState() (desired requestState) {
return return
} }
allowedFast := p.peerAllowedFast.ContainsInt(pieceIndex) allowedFast := p.peerAllowedFast.ContainsInt(pieceIndex)
if !allowedFast {
// We must signal interest to request this piece.
desired.Interested = true
if p.peerChoking {
// We can't request from this piece right now then.
return
}
}
rsp.IterPendingChunks.Iter(func(ci request_strategy.ChunkIndex) { rsp.IterPendingChunks.Iter(func(ci request_strategy.ChunkIndex) {
if !allowedFast {
// We must signal interest to request this..
desired.Interested = true
if p.peerChoking && !p.actualRequestState.Requests.Contains(ci) {
// We can't request this right now.
return
}
}
requestHeap.requestIndexes = append( requestHeap.requestIndexes = append(
requestHeap.requestIndexes, requestHeap.requestIndexes,
p.t.pieceRequestIndexOffset(pieceIndex)+ci) p.t.pieceRequestIndexOffset(pieceIndex)+ci)