Remove unused peer stuff in request strategy

This commit is contained in:
Matt Joiner 2021-11-29 10:14:16 +11:00
parent 0f495ce97d
commit 8e707c9b15
4 changed files with 1 additions and 318 deletions

View File

@ -2,12 +2,10 @@ package request_strategy
import (
"bytes"
"fmt"
"sort"
"sync"
"github.com/anacrolix/multiless"
"github.com/anacrolix/torrent/metainfo"
"github.com/anacrolix/torrent/storage"
"github.com/anacrolix/torrent/types"
@ -165,54 +163,6 @@ type Input struct {
MaxUnverifiedBytes int64
}
// TODO: We could do metainfo requests here.
func Run(input Input) map[PeerId]PeerNextRequestState {
var requestPieces []requestablePiece
GetRequestablePieces(input, func(t *Torrent, piece *Piece, pieceIndex int) {
requestPieces = append(requestPieces, requestablePiece{
index: pieceIndex,
t: t,
NumPendingChunks: piece.NumPendingChunks,
IterPendingChunks: piece.iterPendingChunksWrapper,
alwaysReallocate: piece.Priority >= types.PiecePriorityNext,
})
})
torrents := input.Torrents
allPeers := make(map[metainfo.Hash][]*requestsPeer, len(torrents))
for _, t := range torrents {
peers := make([]*requestsPeer, 0, len(t.Peers))
for _, p := range t.Peers {
peers = append(peers, &requestsPeer{
Peer: p,
})
}
allPeers[t.InfoHash] = peers
}
for _, piece := range requestPieces {
for _, peer := range allPeers[piece.t.InfoHash] {
if peer.canRequestPiece(piece.index) {
peer.requestablePiecesRemaining++
}
}
}
for _, piece := range requestPieces {
allocatePendingChunks(piece, allPeers[piece.t.InfoHash])
}
ret := make(map[PeerId]PeerNextRequestState)
for _, peers := range allPeers {
for _, rp := range peers {
if rp.requestablePiecesRemaining != 0 {
panic(rp.requestablePiecesRemaining)
}
if _, ok := ret[rp.Id]; ok {
panic(fmt.Sprintf("duplicate peer id: %v", rp.Id))
}
ret[rp.Id] = rp.nextState
}
}
return ret
}
// Checks that a sorted peersForPiece slice makes sense.
func ensureValidSortedPeersForPieceRequests(peers *peersForPieceSorter) {
if !sort.IsSorted(peers) {

View File

@ -2,7 +2,6 @@ package request_strategy
import (
"encoding/gob"
"math"
"testing"
"github.com/RoaringBitmap/roaring"
@ -55,261 +54,19 @@ var hasAllRequests = func() (all roaring.Bitmap) {
return
}()
func TestStealingFromSlowerPeer(t *testing.T) {
c := qt.New(t)
basePeer := Peer{
MaxRequests: math.MaxInt16,
DownloadRate: 2,
}
basePeer.Pieces.Add(0)
// Slower than the stealers, but has all requests already.
stealee := basePeer
stealee.DownloadRate = 1
stealee.ExistingRequests = hasAllRequests
stealee.Id = intPeerId(1)
firstStealer := basePeer
firstStealer.Id = intPeerId(2)
secondStealer := basePeer
secondStealer.Id = intPeerId(3)
results := Run(Input{Torrents: []Torrent{{
ChunksPerPiece: 9,
Pieces: []Piece{{
Request: true,
NumPendingChunks: 5,
IterPendingChunks: chunkIterRange(5),
}},
Peers: []Peer{
stealee,
firstStealer,
secondStealer,
},
}}})
c.Assert(results, qt.HasLen, 3)
check := func(p PeerId, l uint64) {
addressableBm := results[p].Requests
c.Check(addressableBm.GetCardinality(), qt.ContentEquals, l)
c.Check(results[p].Interested, qt.Equals, l > 0)
}
check(stealee.Id, 1)
check(firstStealer.Id, 2)
check(secondStealer.Id, 2)
}
func checkNumRequestsAndInterest(c *qt.C, next PeerNextRequestState, num uint64, interest bool) {
addressableBm := next.Requests
c.Check(addressableBm.GetCardinality(), qt.ContentEquals, num)
c.Check(next.Interested, qt.Equals, interest)
}
func TestStealingFromSlowerPeersBasic(t *testing.T) {
c := qt.New(t)
basePeer := Peer{
MaxRequests: math.MaxInt16,
DownloadRate: 2,
}
basePeer.Pieces.Add(0)
stealee := basePeer
stealee.DownloadRate = 1
stealee.ExistingRequests = hasAllRequests
stealee.Id = intPeerId(1)
firstStealer := basePeer
firstStealer.Id = intPeerId(2)
secondStealer := basePeer
secondStealer.Id = intPeerId(3)
results := Run(Input{Torrents: []Torrent{{
ChunksPerPiece: 9,
Pieces: []Piece{{
Request: true,
NumPendingChunks: 2,
IterPendingChunks: chunkIter(0, 1),
}},
Peers: []Peer{
stealee,
firstStealer,
secondStealer,
},
}}})
checkNumRequestsAndInterest(c, results[firstStealer.Id], 1, true)
checkNumRequestsAndInterest(c, results[secondStealer.Id], 1, true)
checkNumRequestsAndInterest(c, results[stealee.Id], 0, false)
}
func checkResultsRequestsLen(t *testing.T, reqs roaring.Bitmap, l uint64) {
qt.Check(t, reqs.GetCardinality(), qt.Equals, l)
}
func TestPeerKeepsExistingIfReasonable(t *testing.T) {
c := qt.New(t)
basePeer := Peer{
MaxRequests: math.MaxInt16,
DownloadRate: 2,
}
basePeer.Pieces.Add(0)
// Slower than the stealers, but has all requests already.
stealee := basePeer
stealee.DownloadRate = 1
keepReq := RequestIndex(0)
stealee.ExistingRequests = requestSetFromSlice(keepReq)
stealee.Id = intPeerId(1)
firstStealer := basePeer
firstStealer.Id = intPeerId(2)
secondStealer := basePeer
secondStealer.Id = intPeerId(3)
results := Run(Input{Torrents: []Torrent{{
ChunksPerPiece: 9,
Pieces: []Piece{{
Request: true,
NumPendingChunks: 4,
IterPendingChunks: chunkIter(0, 1, 3, 4),
}},
Peers: []Peer{
stealee,
firstStealer,
secondStealer,
},
}}})
c.Assert(results, qt.HasLen, 3)
check := func(p PeerId, l uint64) {
checkResultsRequestsLen(t, results[p].Requests, l)
c.Check(results[p].Interested, qt.Equals, l > 0)
}
check(firstStealer.Id, 2)
check(secondStealer.Id, 1)
c.Check(
results[stealee.Id],
peerNextRequestStateChecker,
PeerNextRequestState{
Interested: true,
Requests: requestSetFromSlice(keepReq),
},
)
}
var peerNextRequestStateChecker = qt.CmpEquals(
cmp.Transformer(
"bitmap",
func(bm roaring.Bitmap) []uint32 {
return bm.ToArray()
}))
func TestDontStealUnnecessarily(t *testing.T) {
c := qt.New(t)
basePeer := Peer{
MaxRequests: math.MaxInt16,
DownloadRate: 2,
}
basePeer.Pieces.AddRange(0, 5)
// Slower than the stealers, but has all requests already.
stealee := basePeer
stealee.DownloadRate = 1
r := func(i, c RequestIndex) RequestIndex {
return i*9 + c
}
keepReqs := requestSetFromSlice(
r(3, 2), r(3, 4), r(3, 6), r(3, 8),
r(4, 0), r(4, 1), r(4, 7), r(4, 8))
stealee.ExistingRequests = keepReqs
stealee.Id = intPeerId(1)
firstStealer := basePeer
firstStealer.Id = intPeerId(2)
secondStealer := basePeer
secondStealer.Id = intPeerId(3)
secondStealer.Pieces = roaring.Bitmap{}
secondStealer.Pieces.Add(1)
secondStealer.Pieces.Add(3)
results := Run(Input{Torrents: []Torrent{{
ChunksPerPiece: 9,
Pieces: []Piece{
{
Request: true,
NumPendingChunks: 0,
IterPendingChunks: chunkIterRange(9),
},
{
Request: true,
NumPendingChunks: 7,
IterPendingChunks: chunkIterRange(7),
},
{
Request: true,
NumPendingChunks: 0,
IterPendingChunks: chunkIterRange(0),
},
{
Request: true,
NumPendingChunks: 9,
IterPendingChunks: chunkIterRange(9),
},
{
Request: true,
NumPendingChunks: 9,
IterPendingChunks: chunkIterRange(9),
},
},
Peers: []Peer{
firstStealer,
stealee,
secondStealer,
},
}}})
c.Assert(results, qt.HasLen, 3)
check := func(p PeerId, l uint64) {
checkResultsRequestsLen(t, results[p].Requests, l)
c.Check(results[p].Interested, qt.Equals, l > 0)
}
check(firstStealer.Id, 5)
check(secondStealer.Id, 7+9)
c.Check(
results[stealee.Id],
peerNextRequestStateChecker,
PeerNextRequestState{
Interested: true,
Requests: requestSetFromSlice(r(4, 0), r(4, 1), r(4, 7), r(4, 8)),
},
)
}
// This tests a situation where multiple peers had the same existing request, due to "actual" and
// "next" request states being out of sync. This reasonable occurs when a peer hasn't fully updated
// its actual request state since the last request strategy run.
func TestDuplicatePreallocations(t *testing.T) {
peer := func(id int, downloadRate float64) Peer {
p := Peer{
ExistingRequests: hasAllRequests,
MaxRequests: 2,
Id: intPeerId(id),
DownloadRate: downloadRate,
}
p.Pieces.AddRange(0, roaring.MaxRange)
return p
}
results := Run(Input{
Torrents: []Torrent{{
ChunksPerPiece: 1,
Pieces: []Piece{{
Request: true,
NumPendingChunks: 1,
IterPendingChunks: chunkIterRange(1),
}, {
Request: true,
NumPendingChunks: 1,
IterPendingChunks: chunkIterRange(1),
}},
Peers: []Peer{
// The second peer was be marked as the preallocation, clobbering the first. The
// first peer is preferred, and the piece isn't striped, so it gets preallocated a
// request, and then gets reallocated from the peer the same request.
peer(1, 2),
peer(2, 1),
},
}},
})
c := qt.New(t)
req1 := results[intPeerId(1)].Requests
req2 := results[intPeerId(2)].Requests
c.Assert(uint64(2), qt.Equals, req1.GetCardinality()+req2.GetCardinality())
}

View File

@ -8,9 +8,7 @@ import (
type Torrent struct {
Pieces []Piece
Capacity storage.TorrentCapacity
// Unclosed Peers. Not necessary for getting requestable piece ordering.
Peers []Peer
// Some value that's unique and stable between runs. Could even use the infohash?
// Some value that's unique and stable between runs.
InfoHash metainfo.Hash
ChunksPerPiece uint32

View File

@ -44,28 +44,6 @@ func (cl *Client) getRequestStrategyInput() request_strategy.Input {
IterPendingChunks: &p.undirtiedChunksIter,
})
}
t.iterPeers(func(p *Peer) {
if p.closed.IsSet() {
return
}
if p.piecesReceivedSinceLastRequestUpdate > p.maxPiecesReceivedBetweenRequestUpdates {
p.maxPiecesReceivedBetweenRequestUpdates = p.piecesReceivedSinceLastRequestUpdate
}
p.piecesReceivedSinceLastRequestUpdate = 0
rst.Peers = append(rst.Peers, request_strategy.Peer{
Pieces: *p.newPeerPieces(),
MaxRequests: p.nominalMaxRequests(),
ExistingRequests: p.actualRequestState.Requests,
Choking: p.peerChoking,
PieceAllowedFast: p.peerAllowedFast,
DownloadRate: p.downloadRate(),
Age: time.Since(p.completedHandshake),
Id: peerId{
Peer: p,
ptr: uintptr(unsafe.Pointer(p)),
},
})
})
ts = append(ts, rst)
}
return request_strategy.Input{