Switch Peer.PieceAllowedFast and several request strategy inputs to raw roaring.Bitmaps

This is in preparation to support encoding request strategy run inputs for benchmarking.
This commit is contained in:
Matt Joiner 2021-10-05 20:06:23 +11:00
parent fd8995dcfd
commit eab111dd84
7 changed files with 166 additions and 108 deletions

View File

@ -121,7 +121,7 @@ type Peer struct {
peerMinPieces pieceIndex
// Pieces we've accepted chunks for from the peer.
peerTouchedPieces map[pieceIndex]struct{}
peerAllowedFast bitmap.Bitmap
peerAllowedFast roaring.Bitmap
PeerMaxRequests maxRequests // Maximum pending requests the peer allows.
PeerExtensionIDs map[pp.ExtensionName]pp.ExtensionNumber
@ -182,16 +182,19 @@ func (cn *Peer) expectingChunks() bool {
if !cn.actualRequestState.Interested {
return false
}
if cn.peerAllowedFast.IterTyped(func(i int) bool {
return roaringBitmapRangeCardinality(
&cn.actualRequestState.Requests,
cn.t.pieceRequestIndexOffset(i),
cn.t.pieceRequestIndexOffset(i+1),
) == 0
}) {
if !cn.peerChoking {
return true
}
return !cn.peerChoking
haveAllowedFastRequests := false
cn.peerAllowedFast.Iterate(func(i uint32) bool {
haveAllowedFastRequests = roaringBitmapRangeCardinality(
&cn.actualRequestState.Requests,
cn.t.pieceRequestIndexOffset(pieceIndex(i)),
cn.t.pieceRequestIndexOffset(pieceIndex(i+1)),
) == 0
return !haveAllowedFastRequests
})
return haveAllowedFastRequests
}
func (cn *Peer) remoteChokingPiece(piece pieceIndex) bool {
@ -1276,7 +1279,7 @@ func (c *Peer) receiveChunk(msg *pp.Message) error {
}
c.decExpectedChunkReceive(req)
if c.peerChoking && c.peerAllowedFast.Get(bitmap.BitIndex(ppReq.Index)) {
if c.peerChoking && c.peerAllowedFast.Contains(bitmap.BitIndex(ppReq.Index)) {
chunksReceived.Add("due to allowed fast", 1)
}

View File

@ -1,11 +1,14 @@
package torrent
import (
"encoding/gob"
"fmt"
"sync"
"github.com/RoaringBitmap/roaring"
"github.com/anacrolix/chansync"
"github.com/anacrolix/missinggo/v2/bitmap"
request_strategy "github.com/anacrolix/torrent/request-strategy"
"github.com/anacrolix/torrent/metainfo"
pp "github.com/anacrolix/torrent/peer_protocol"
@ -237,27 +240,50 @@ func (p *Piece) State() PieceState {
return p.t.PieceState(p.index)
}
func (p *Piece) iterUndirtiedChunks(f func(chunkIndexType)) {
// Use an iterator to jump between dirty bits.
if true {
it := p.t.dirtyChunks.Iterator()
startIndex := p.requestIndexOffset()
endIndex := startIndex + p.numChunks()
it.AdvanceIfNeeded(startIndex)
lastDirty := startIndex - 1
for it.HasNext() {
next := it.Next()
if next >= endIndex {
break
}
for index := lastDirty + 1; index < next; index++ {
f(index - startIndex)
}
lastDirty = next
func init() {
gob.Register(undirtiedChunksIter{})
}
type undirtiedChunksIter struct {
TorrentDirtyChunks *roaring.Bitmap
StartRequestIndex RequestIndex
EndRequestIndex RequestIndex
}
func (me undirtiedChunksIter) Iter(f func(chunkIndexType)) {
it := me.TorrentDirtyChunks.Iterator()
startIndex := me.StartRequestIndex
endIndex := me.EndRequestIndex
it.AdvanceIfNeeded(startIndex)
lastDirty := startIndex - 1
for it.HasNext() {
next := it.Next()
if next >= endIndex {
break
}
for index := lastDirty + 1; index < endIndex; index++ {
for index := lastDirty + 1; index < next; index++ {
f(index - startIndex)
}
lastDirty = next
}
for index := lastDirty + 1; index < endIndex; index++ {
f(index - startIndex)
}
return
}
func (p *Piece) undirtiedChunksIter() request_strategy.ChunksIter {
// Use an iterator to jump between dirty bits.
return undirtiedChunksIter{
TorrentDirtyChunks: &p.t.dirtyChunks,
StartRequestIndex: p.requestIndexOffset(),
EndRequestIndex: p.requestIndexOffset() + p.numChunks(),
}
}
func (p *Piece) iterUndirtiedChunks(f func(chunkIndexType)) {
if true {
p.undirtiedChunksIter().Iter(f)
return
}
// The original implementation.

View File

@ -84,7 +84,7 @@ type requestablePiece struct {
t *Torrent
alwaysReallocate bool
NumPendingChunks int
IterPendingChunks ChunksIter
IterPendingChunks ChunksIterFunc
}
func (p *requestablePiece) chunkIndexToRequestIndex(c ChunkIndex) RequestIndex {
@ -338,7 +338,7 @@ func allocatePendingChunks(p requestablePiece, peers []*requestsPeer) {
p.IterPendingChunks(func(spec ChunkIndex) {
req := p.chunkIndexToRequestIndex(spec)
for _, peer := range peersForPiece {
if h := peer.HasExistingRequest; h == nil || !h(req) {
if !peer.ExistingRequests.Contains(req) {
continue
}
if !peer.canFitRequest() {
@ -360,7 +360,7 @@ func allocatePendingChunks(p requestablePiece, peers []*requestsPeer) {
if !peer.canFitRequest() {
continue
}
if !peer.pieceAllowedFastOrDefault(p.index) {
if !peer.PieceAllowedFast.ContainsInt(p.index) {
// TODO: Verify that's okay to stay uninterested if we request allowed fast pieces.
peer.nextState.Interested = true
if peer.Choking {
@ -389,7 +389,7 @@ chunk:
if !peer.canFitRequest() {
continue
}
if !peer.pieceAllowedFastOrDefault(p.index) {
if !peer.PieceAllowedFast.ContainsInt(p.index) {
// TODO: Verify that's okay to stay uninterested if we request allowed fast pieces.
peer.nextState.Interested = true
if peer.Choking {

View File

@ -1,6 +1,7 @@
package request_strategy
import (
"encoding/gob"
"math"
"testing"
@ -9,19 +10,28 @@ import (
"github.com/google/go-cmp/cmp"
)
func chunkIterRange(end ChunkIndex) ChunksIter {
return func(f func(ChunkIndex)) {
for offset := ChunkIndex(0); offset < end; offset += 1 {
f(offset)
}
func init() {
gob.Register(chunkIterRange(0))
gob.Register(sliceChunksIter{})
}
type chunkIterRange ChunkIndex
func (me chunkIterRange) Iter(f func(ChunkIndex)) {
for offset := ChunkIndex(0); offset < ChunkIndex(me); offset += 1 {
f(offset)
}
}
type sliceChunksIter []ChunkIndex
func chunkIter(offsets ...ChunkIndex) ChunksIter {
return func(f func(ChunkIndex)) {
for _, offset := range offsets {
f(offset)
}
return sliceChunksIter(offsets)
}
func (offsets sliceChunksIter) Iter(f func(ChunkIndex)) {
for _, offset := range offsets {
f(offset)
}
}
@ -30,27 +40,32 @@ func requestSetFromSlice(rs ...RequestIndex) (ret roaring.Bitmap) {
return
}
func init() {
gob.Register(intPeerId(0))
}
type intPeerId int
func (i intPeerId) Uintptr() uintptr {
return uintptr(i)
}
func hasAllRequests(RequestIndex) bool { return true }
var hasAllRequests = func() (all roaring.Bitmap) {
all.AddRange(0, roaring.MaxRange)
return
}()
func TestStealingFromSlowerPeer(t *testing.T) {
c := qt.New(t)
basePeer := Peer{
HasPiece: func(i pieceIndex) bool {
return true
},
MaxRequests: math.MaxInt16,
DownloadRate: 2,
}
basePeer.Pieces.Add(0)
// Slower than the stealers, but has all requests already.
stealee := basePeer
stealee.DownloadRate = 1
stealee.HasExistingRequest = hasAllRequests
stealee.ExistingRequests = hasAllRequests
stealee.Id = intPeerId(1)
firstStealer := basePeer
firstStealer.Id = intPeerId(2)
@ -90,15 +105,13 @@ func checkNumRequestsAndInterest(c *qt.C, next PeerNextRequestState, num uint64,
func TestStealingFromSlowerPeersBasic(t *testing.T) {
c := qt.New(t)
basePeer := Peer{
HasPiece: func(i pieceIndex) bool {
return true
},
MaxRequests: math.MaxInt16,
DownloadRate: 2,
}
basePeer.Pieces.Add(0)
stealee := basePeer
stealee.DownloadRate = 1
stealee.HasExistingRequest = hasAllRequests
stealee.ExistingRequests = hasAllRequests
stealee.Id = intPeerId(1)
firstStealer := basePeer
firstStealer.Id = intPeerId(2)
@ -130,19 +143,15 @@ func checkResultsRequestsLen(t *testing.T, reqs roaring.Bitmap, l uint64) {
func TestPeerKeepsExistingIfReasonable(t *testing.T) {
c := qt.New(t)
basePeer := Peer{
HasPiece: func(i pieceIndex) bool {
return true
},
MaxRequests: math.MaxInt16,
DownloadRate: 2,
}
basePeer.Pieces.Add(0)
// Slower than the stealers, but has all requests already.
stealee := basePeer
stealee.DownloadRate = 1
keepReq := RequestIndex(0)
stealee.HasExistingRequest = func(r RequestIndex) bool {
return r == keepReq
}
stealee.ExistingRequests = requestSetFromSlice(keepReq)
stealee.Id = intPeerId(1)
firstStealer := basePeer
firstStealer.Id = intPeerId(2)
@ -189,12 +198,10 @@ var peerNextRequestStateChecker = qt.CmpEquals(
func TestDontStealUnnecessarily(t *testing.T) {
c := qt.New(t)
basePeer := Peer{
HasPiece: func(i pieceIndex) bool {
return true
},
MaxRequests: math.MaxInt16,
DownloadRate: 2,
}
basePeer.Pieces.AddRange(0, 5)
// Slower than the stealers, but has all requests already.
stealee := basePeer
stealee.DownloadRate = 1
@ -204,22 +211,15 @@ func TestDontStealUnnecessarily(t *testing.T) {
keepReqs := requestSetFromSlice(
r(3, 2), r(3, 4), r(3, 6), r(3, 8),
r(4, 0), r(4, 1), r(4, 7), r(4, 8))
stealee.HasExistingRequest = func(r RequestIndex) bool {
return keepReqs.Contains(r)
}
stealee.ExistingRequests = keepReqs
stealee.Id = intPeerId(1)
firstStealer := basePeer
firstStealer.Id = intPeerId(2)
secondStealer := basePeer
secondStealer.Id = intPeerId(3)
secondStealer.HasPiece = func(i pieceIndex) bool {
switch i {
case 1, 3:
return true
default:
return false
}
}
secondStealer.Pieces = roaring.Bitmap{}
secondStealer.Pieces.Add(1)
secondStealer.Pieces.Add(3)
results := Run(Input{Torrents: []Torrent{{
ChunksPerPiece: 9,
Pieces: []Piece{
@ -277,15 +277,14 @@ func TestDontStealUnnecessarily(t *testing.T) {
// its actual request state since the last request strategy run.
func TestDuplicatePreallocations(t *testing.T) {
peer := func(id int, downloadRate float64) Peer {
return Peer{
HasExistingRequest: hasAllRequests,
MaxRequests: 2,
HasPiece: func(i pieceIndex) bool {
return true
},
Id: intPeerId(id),
DownloadRate: downloadRate,
p := Peer{
ExistingRequests: hasAllRequests,
MaxRequests: 2,
Id: intPeerId(id),
DownloadRate: downloadRate,
}
p.Pieces.AddRange(0, roaring.MaxRange)
return p
}
results := Run(Input{
Torrents: []Torrent{{

View File

@ -16,25 +16,22 @@ type PeerId interface {
}
type Peer struct {
HasPiece func(i pieceIndex) bool
MaxRequests int
HasExistingRequest func(r RequestIndex) bool
Choking bool
PieceAllowedFast func(pieceIndex) bool
DownloadRate float64
Age time.Duration
Pieces roaring.Bitmap
MaxRequests int
ExistingRequests roaring.Bitmap
Choking bool
PieceAllowedFast roaring.Bitmap
DownloadRate float64
Age time.Duration
// This is passed back out at the end, so must support equality. Could be a type-param later.
Id PeerId
}
func (p *Peer) pieceAllowedFastOrDefault(i pieceIndex) bool {
if f := p.PieceAllowedFast; f != nil {
return f(i)
}
return false
}
// TODO: This might be used in more places I think.
func (p *Peer) canRequestPiece(i pieceIndex) bool {
return (!p.Choking || p.pieceAllowedFastOrDefault(i)) && p.HasPiece(i)
return (!p.Choking || p.PieceAllowedFast.Contains(uint32(i))) && p.HasPiece(i)
}
func (p *Peer) HasPiece(i pieceIndex) bool {
return p.Pieces.Contains(uint32(i))
}

View File

@ -1,6 +1,10 @@
package request_strategy
type ChunksIter func(func(ChunkIndex))
type ChunksIterFunc func(func(ChunkIndex))
type ChunksIter interface {
Iter(func(ChunkIndex))
}
type Piece struct {
Request bool
@ -15,6 +19,6 @@ type Piece struct {
func (p Piece) iterPendingChunksWrapper(f func(ChunkIndex)) {
i := p.IterPendingChunks
if i != nil {
i(f)
i.Iter(f)
}
}

View File

@ -1,10 +1,13 @@
package torrent
import (
"encoding/gob"
"reflect"
"time"
"unsafe"
"github.com/RoaringBitmap/roaring"
"github.com/anacrolix/log"
"github.com/anacrolix/missinggo/v2/bitmap"
"github.com/anacrolix/chansync"
@ -69,7 +72,7 @@ func (cl *Client) getRequestStrategyInput() request_strategy.Input {
Availability: p.availability,
Length: int64(p.length()),
NumPendingChunks: int(t.pieceNumPendingChunks(i)),
IterPendingChunks: p.iterUndirtiedChunks,
IterPendingChunks: p.undirtiedChunksIter(),
})
}
t.iterPeers(func(p *Peer) {
@ -81,17 +84,13 @@ func (cl *Client) getRequestStrategyInput() request_strategy.Input {
}
p.piecesReceivedSinceLastRequestUpdate = 0
rst.Peers = append(rst.Peers, request_strategy.Peer{
HasPiece: p.peerHasPiece,
MaxRequests: p.nominalMaxRequests(),
HasExistingRequest: func(r RequestIndex) bool {
return p.actualRequestState.Requests.Contains(r)
},
Choking: p.peerChoking,
PieceAllowedFast: func(i pieceIndex) bool {
return p.peerAllowedFast.Contains(bitmap.BitIndex(i))
},
DownloadRate: p.downloadRate(),
Age: time.Since(p.completedHandshake),
Pieces: *p.newPeerPieces(),
MaxRequests: p.nominalMaxRequests(),
ExistingRequests: p.actualRequestState.Requests,
Choking: p.peerChoking,
PieceAllowedFast: p.peerAllowedFast,
DownloadRate: p.downloadRate(),
Age: time.Since(p.completedHandshake),
Id: peerId{
Peer: p,
ptr: uintptr(unsafe.Pointer(p)),
@ -107,12 +106,17 @@ func (cl *Client) getRequestStrategyInput() request_strategy.Input {
}
func (cl *Client) doRequests() {
nextPeerStates := request_strategy.Run(cl.getRequestStrategyInput())
input := cl.getRequestStrategyInput()
nextPeerStates := request_strategy.Run(input)
for p, state := range nextPeerStates {
setPeerNextRequestState(p, state)
}
}
func init() {
gob.Register(peerId{})
}
type peerId struct {
*Peer
ptr uintptr
@ -122,6 +126,31 @@ func (p peerId) Uintptr() uintptr {
return p.ptr
}
func (p peerId) GobEncode() (b []byte, _ error) {
*(*reflect.SliceHeader)(unsafe.Pointer(&b)) = reflect.SliceHeader{
Data: uintptr(unsafe.Pointer(&p.ptr)),
Len: int(unsafe.Sizeof(p.ptr)),
Cap: int(unsafe.Sizeof(p.ptr)),
}
return
}
func (p *peerId) GobDecode(b []byte) error {
if uintptr(len(b)) != unsafe.Sizeof(p.ptr) {
panic(len(b))
}
ptr := unsafe.Pointer(&b[0])
p.ptr = *(*uintptr)(ptr)
log.Printf("%p", ptr)
dst := reflect.SliceHeader{
Data: uintptr(unsafe.Pointer(&p.Peer)),
Len: int(unsafe.Sizeof(p.Peer)),
Cap: int(unsafe.Sizeof(p.Peer)),
}
copy(*(*[]byte)(unsafe.Pointer(&dst)), b)
return nil
}
func setPeerNextRequestState(_p request_strategy.PeerId, rp request_strategy.PeerNextRequestState) {
p := _p.(peerId).Peer
p.nextRequestState = rp