From 4cfdc2f497d3a616292f321c688b46ee494e151b Mon Sep 17 00:00:00 2001 From: Matt Joiner Date: Sun, 12 Dec 2021 16:56:00 +1100 Subject: [PATCH] Shuffle duplicate requests Add missing import --- requesting.go | 38 +++++++++++++++++++++++++++++++++----- requesting_test.go | 35 +++++++++++++++++++++++++++++++++++ 2 files changed, 68 insertions(+), 5 deletions(-) diff --git a/requesting.go b/requesting.go index 7ffa19cc..a330df53 100644 --- a/requesting.go +++ b/requesting.go @@ -4,6 +4,7 @@ import ( "container/heap" "context" "encoding/gob" + "math/rand" "reflect" "runtime/pprof" "time" @@ -195,9 +196,9 @@ func (p *Peer) getDesiredRequestState() (desired desiredRequestState) { allowedFast := p.peerAllowedFast.ContainsInt(pieceIndex) rsp.IterPendingChunks.Iter(func(ci request_strategy.ChunkIndex) { r := p.t.pieceRequestIndexOffset(pieceIndex) + ci - //if p.t.pendingRequests.Get(r) != 0 && !p.actualRequestState.Requests.Contains(r) { + // if p.t.pendingRequests.Get(r) != 0 && !p.actualRequestState.Requests.Contains(r) { // return - //} + // } if !allowedFast { // We must signal interest to request this desired.Interested = true @@ -257,7 +258,10 @@ func (p *Peer) applyRequestState(next desiredRequestState) bool { if !more { return false } - for _, req := range next.Requests { + shuffled := false + lastPending := 0 + for i := 0; i < len(next.Requests); i++ { + req := next.Requests[i] if p.cancelledRequests.Contains(req) { // Waiting for a reject or piece message, which will suitably trigger us to update our // requests, so we can skip this one with no additional consideration. @@ -269,14 +273,38 @@ func (p *Peer) applyRequestState(next desiredRequestState) bool { // extra outstanding requests. We could subtract the number of outstanding cancels from the // next request cardinality, but peers might not like that. if maxRequests(current.Requests.GetCardinality()) >= p.nominalMaxRequests() { - //log.Printf("not assigning all requests [desired=%v, cancelled=%v, current=%v, max=%v]", + // log.Printf("not assigning all requests [desired=%v, cancelled=%v, current=%v, max=%v]", // next.Requests.GetCardinality(), // p.cancelledRequests.GetCardinality(), // current.Requests.GetCardinality(), // p.nominalMaxRequests(), - //) + // ) break } + otherPending := p.t.pendingRequests.Get(next.Requests[0]) + if p.actualRequestState.Requests.Contains(next.Requests[0]) { + otherPending-- + } + if otherPending < lastPending { + // Pending should only rise. It's supposed to be the strongest ordering criteria. If it + // doesn't, our shuffling condition could be wrong. + panic(lastPending) + } + // If the request has already been requested by another peer, shuffle this and the rest of + // the requests (since according to the increasing condition, the rest of the indices + // already have an outstanding request with another peer). + if !shuffled && otherPending > 0 { + shuffleReqs := next.Requests[i:] + rand.Shuffle(len(shuffleReqs), func(i, j int) { + shuffleReqs[i], shuffleReqs[j] = shuffleReqs[j], shuffleReqs[i] + }) + // log.Printf("shuffled reqs [%v:%v]", i, len(next.Requests)) + shuffled = true + // Repeat this index + i-- + continue + } + more = p.mustRequest(req) if !more { break diff --git a/requesting_test.go b/requesting_test.go index dd54df5d..5e58df5c 100644 --- a/requesting_test.go +++ b/requesting_test.go @@ -4,6 +4,7 @@ import ( "testing" pp "github.com/anacrolix/torrent/peer_protocol" + "github.com/bradfitz/iter" qt "github.com/frankban/quicktest" ) @@ -40,3 +41,37 @@ func TestRequestMapOrderAcrossInstances(t *testing.T) { // This shows that different map instances with the same contents can have the same range order. qt.Assert(t, keysAsSlice(makeTypicalRequests()), qt.ContentEquals, keysAsSlice(makeTypicalRequests())) } + +// Added for testing repeating loop iteration after shuffling in Peer.applyRequestState. +func TestForLoopRepeatItem(t *testing.T) { + t.Run("ExplicitLoopVar", func(t *testing.T) { + once := false + var seen []int + for i := 0; i < 4; i++ { + seen = append(seen, i) + if !once && i == 2 { + once = true + i-- + // Will i++ still run? + continue + } + } + // We can mutate i and it's observed by the loop. No special treatment of the loop var. + qt.Assert(t, seen, qt.DeepEquals, []int{0, 1, 2, 2, 3}) + }) + t.Run("Range", func(t *testing.T) { + once := false + var seen []int + for i := range iter.N(4) { + seen = append(seen, i) + if !once && i == 2 { + once = true + // Can we actually modify the next value of i produced by the range? + i-- + continue + } + } + // Range ignores any mutation to i. + qt.Assert(t, seen, qt.DeepEquals, []int{0, 1, 2, 3}) + }) +}