Shuffle duplicate requests

Add missing import
This commit is contained in:
Matt Joiner 2021-12-12 16:56:00 +11:00
parent 4913f17c01
commit 4cfdc2f497
2 changed files with 68 additions and 5 deletions

View File

@ -4,6 +4,7 @@ import (
"container/heap" "container/heap"
"context" "context"
"encoding/gob" "encoding/gob"
"math/rand"
"reflect" "reflect"
"runtime/pprof" "runtime/pprof"
"time" "time"
@ -195,9 +196,9 @@ func (p *Peer) getDesiredRequestState() (desired desiredRequestState) {
allowedFast := p.peerAllowedFast.ContainsInt(pieceIndex) allowedFast := p.peerAllowedFast.ContainsInt(pieceIndex)
rsp.IterPendingChunks.Iter(func(ci request_strategy.ChunkIndex) { rsp.IterPendingChunks.Iter(func(ci request_strategy.ChunkIndex) {
r := p.t.pieceRequestIndexOffset(pieceIndex) + ci r := p.t.pieceRequestIndexOffset(pieceIndex) + ci
//if p.t.pendingRequests.Get(r) != 0 && !p.actualRequestState.Requests.Contains(r) { // if p.t.pendingRequests.Get(r) != 0 && !p.actualRequestState.Requests.Contains(r) {
// return // return
//} // }
if !allowedFast { if !allowedFast {
// We must signal interest to request this // We must signal interest to request this
desired.Interested = true desired.Interested = true
@ -257,7 +258,10 @@ func (p *Peer) applyRequestState(next desiredRequestState) bool {
if !more { if !more {
return false return false
} }
for _, req := range next.Requests { shuffled := false
lastPending := 0
for i := 0; i < len(next.Requests); i++ {
req := next.Requests[i]
if p.cancelledRequests.Contains(req) { if p.cancelledRequests.Contains(req) {
// Waiting for a reject or piece message, which will suitably trigger us to update our // Waiting for a reject or piece message, which will suitably trigger us to update our
// requests, so we can skip this one with no additional consideration. // requests, so we can skip this one with no additional consideration.
@ -269,14 +273,38 @@ func (p *Peer) applyRequestState(next desiredRequestState) bool {
// extra outstanding requests. We could subtract the number of outstanding cancels from the // extra outstanding requests. We could subtract the number of outstanding cancels from the
// next request cardinality, but peers might not like that. // next request cardinality, but peers might not like that.
if maxRequests(current.Requests.GetCardinality()) >= p.nominalMaxRequests() { if maxRequests(current.Requests.GetCardinality()) >= p.nominalMaxRequests() {
//log.Printf("not assigning all requests [desired=%v, cancelled=%v, current=%v, max=%v]", // log.Printf("not assigning all requests [desired=%v, cancelled=%v, current=%v, max=%v]",
// next.Requests.GetCardinality(), // next.Requests.GetCardinality(),
// p.cancelledRequests.GetCardinality(), // p.cancelledRequests.GetCardinality(),
// current.Requests.GetCardinality(), // current.Requests.GetCardinality(),
// p.nominalMaxRequests(), // p.nominalMaxRequests(),
//) // )
break break
} }
otherPending := p.t.pendingRequests.Get(next.Requests[0])
if p.actualRequestState.Requests.Contains(next.Requests[0]) {
otherPending--
}
if otherPending < lastPending {
// Pending should only rise. It's supposed to be the strongest ordering criteria. If it
// doesn't, our shuffling condition could be wrong.
panic(lastPending)
}
// If the request has already been requested by another peer, shuffle this and the rest of
// the requests (since according to the increasing condition, the rest of the indices
// already have an outstanding request with another peer).
if !shuffled && otherPending > 0 {
shuffleReqs := next.Requests[i:]
rand.Shuffle(len(shuffleReqs), func(i, j int) {
shuffleReqs[i], shuffleReqs[j] = shuffleReqs[j], shuffleReqs[i]
})
// log.Printf("shuffled reqs [%v:%v]", i, len(next.Requests))
shuffled = true
// Repeat this index
i--
continue
}
more = p.mustRequest(req) more = p.mustRequest(req)
if !more { if !more {
break break

View File

@ -4,6 +4,7 @@ import (
"testing" "testing"
pp "github.com/anacrolix/torrent/peer_protocol" pp "github.com/anacrolix/torrent/peer_protocol"
"github.com/bradfitz/iter"
qt "github.com/frankban/quicktest" qt "github.com/frankban/quicktest"
) )
@ -40,3 +41,37 @@ func TestRequestMapOrderAcrossInstances(t *testing.T) {
// This shows that different map instances with the same contents can have the same range order. // This shows that different map instances with the same contents can have the same range order.
qt.Assert(t, keysAsSlice(makeTypicalRequests()), qt.ContentEquals, keysAsSlice(makeTypicalRequests())) qt.Assert(t, keysAsSlice(makeTypicalRequests()), qt.ContentEquals, keysAsSlice(makeTypicalRequests()))
} }
// Added for testing repeating loop iteration after shuffling in Peer.applyRequestState.
func TestForLoopRepeatItem(t *testing.T) {
t.Run("ExplicitLoopVar", func(t *testing.T) {
once := false
var seen []int
for i := 0; i < 4; i++ {
seen = append(seen, i)
if !once && i == 2 {
once = true
i--
// Will i++ still run?
continue
}
}
// We can mutate i and it's observed by the loop. No special treatment of the loop var.
qt.Assert(t, seen, qt.DeepEquals, []int{0, 1, 2, 2, 3})
})
t.Run("Range", func(t *testing.T) {
once := false
var seen []int
for i := range iter.N(4) {
seen = append(seen, i)
if !once && i == 2 {
once = true
// Can we actually modify the next value of i produced by the range?
i--
continue
}
}
// Range ignores any mutation to i.
qt.Assert(t, seen, qt.DeepEquals, []int{0, 1, 2, 3})
})
}