Add and use typed roaring bitmap

This commit is contained in:
Matt Joiner 2022-05-09 11:34:08 +10:00
parent d5d940e643
commit 9a9c7dee00
No known key found for this signature in database
GPG Key ID: 6B990B8185E7F782
16 changed files with 165 additions and 70 deletions

View File

@ -1484,6 +1484,7 @@ func (cl *Client) newConnection(nc net.Conn, outgoing bool, remoteAddr PeerRemot
connString: connString,
conn: nc,
}
c.initRequestState()
// TODO: Need to be much more explicit about this, including allowing non-IP bannable addresses.
if remoteAddr != nil {
netipAddrPort, err := netip.ParseAddrPort(remoteAddr.String())

View File

@ -26,6 +26,7 @@ import (
"github.com/anacrolix/torrent/mse"
pp "github.com/anacrolix/torrent/peer_protocol"
request_strategy "github.com/anacrolix/torrent/request-strategy"
"github.com/anacrolix/torrent/typed-roaring"
)
type PeerSource string
@ -120,7 +121,7 @@ type Peer struct {
peerMinPieces pieceIndex
// Pieces we've accepted chunks for from the peer.
peerTouchedPieces map[pieceIndex]struct{}
peerAllowedFast roaring.Bitmap
peerAllowedFast typedRoaring.Bitmap[pieceIndex]
PeerMaxRequests maxRequests // Maximum pending requests the peer allows.
PeerExtensionIDs map[pp.ExtensionName]pp.ExtensionNumber
@ -129,6 +130,18 @@ type Peer struct {
logger log.Logger
}
type peerRequests struct {
typedRoaring.Bitmap[RequestIndex]
}
func (p *peerRequests) IterateSnapshot(f func(request_strategy.RequestIndex) bool) {
p.Clone().Iterate(f)
}
func (p *Peer) initRequestState() {
p.requestState.Requests = &peerRequests{}
}
// Maintains the state of a BitTorrent-protocol based connection with a peer.
type PeerConn struct {
Peer
@ -189,11 +202,11 @@ func (cn *Peer) expectingChunks() bool {
return true
}
haveAllowedFastRequests := false
cn.peerAllowedFast.Iterate(func(i uint32) bool {
haveAllowedFastRequests = roaringBitmapRangeCardinality(
&cn.requestState.Requests,
cn.t.pieceRequestIndexOffset(pieceIndex(i)),
cn.t.pieceRequestIndexOffset(pieceIndex(i+1)),
cn.peerAllowedFast.Iterate(func(i pieceIndex) bool {
haveAllowedFastRequests = roaringBitmapRangeCardinality[RequestIndex](
cn.requestState.Requests,
cn.t.pieceRequestIndexOffset(i),
cn.t.pieceRequestIndexOffset(i+1),
) == 0
return !haveAllowedFastRequests
})
@ -201,7 +214,7 @@ func (cn *Peer) expectingChunks() bool {
}
func (cn *Peer) remoteChokingPiece(piece pieceIndex) bool {
return cn.peerChoking && !cn.peerAllowedFast.Contains(bitmap.BitIndex(piece))
return cn.peerChoking && !cn.peerAllowedFast.Contains(piece)
}
// Returns true if the connection is over IPv6.
@ -348,8 +361,8 @@ func (cn *Peer) downloadRate() float64 {
func (cn *Peer) numRequestsByPiece() (ret map[pieceIndex]int) {
ret = make(map[pieceIndex]int)
cn.requestState.Requests.Iterate(func(x uint32) bool {
ret[pieceIndex(x/cn.t.chunksPerRegularPiece())]++
cn.requestState.Requests.Iterate(func(x RequestIndex) bool {
ret[cn.t.pieceIndexOfRequestIndex(x)]++
return true
})
return
@ -597,7 +610,7 @@ func (cn *Peer) shouldRequest(r RequestIndex) error {
if cn.t.pieceQueuedForHash(pi) {
panic("piece is queued for hash")
}
if cn.peerChoking && !cn.peerAllowedFast.Contains(bitmap.BitIndex(pi)) {
if cn.peerChoking && !cn.peerAllowedFast.Contains(pi) {
// This could occur if we made a request with the fast extension, and then got choked and
// haven't had the request rejected yet.
if !cn.requestState.Requests.Contains(r) {
@ -1152,13 +1165,7 @@ func (c *PeerConn) mainReadLoop() (err error) {
break
}
if !c.fastEnabled() {
if !c.deleteAllRequests().IsEmpty() {
c.t.iterPeers(func(p *Peer) {
if p.isLowOnRequests() {
p.updateRequests("choked by non-fast PeerConn")
}
})
}
c.deleteAllRequests("choked by non-fast PeerConn")
} else {
// We don't decrement pending requests here, let's wait for the peer to either
// reject or satisfy the outstanding requests. Additionally, some peers may unchoke
@ -1178,8 +1185,8 @@ func (c *PeerConn) mainReadLoop() (err error) {
}
c.peerChoking = false
preservedCount := 0
c.requestState.Requests.Iterate(func(x uint32) bool {
if !c.peerAllowedFast.Contains(x / c.t.chunksPerRegularPiece()) {
c.requestState.Requests.Iterate(func(x RequestIndex) bool {
if !c.peerAllowedFast.Contains(c.t.pieceIndexOfRequestIndex(x)) {
preservedCount++
}
return true
@ -1404,7 +1411,7 @@ func (c *Peer) receiveChunk(msg *pp.Message) error {
}
c.decExpectedChunkReceive(req)
if c.peerChoking && c.peerAllowedFast.Contains(bitmap.BitIndex(ppReq.Index)) {
if c.peerChoking && c.peerAllowedFast.Contains(pieceIndex(ppReq.Index)) {
chunksReceived.Add("due to allowed fast", 1)
}
@ -1636,15 +1643,22 @@ func (c *Peer) deleteRequest(r RequestIndex) bool {
return true
}
func (c *Peer) deleteAllRequests() (deleted *roaring.Bitmap) {
deleted = c.requestState.Requests.Clone()
deleted.Iterate(func(x uint32) bool {
func (c *Peer) deleteAllRequests(reason string) {
if c.requestState.Requests.IsEmpty() {
return
}
c.requestState.Requests.IterateSnapshot(func(x RequestIndex) bool {
if !c.deleteRequest(x) {
panic("request should exist")
}
return true
})
c.assertNoRequests()
c.t.iterPeers(func(p *Peer) {
if p.isLowOnRequests() {
p.updateRequests(reason)
}
})
return
}
@ -1654,9 +1668,8 @@ func (c *Peer) assertNoRequests() {
}
}
func (c *Peer) cancelAllRequests() (cancelled *roaring.Bitmap) {
cancelled = c.requestState.Requests.Clone()
cancelled.Iterate(func(x uint32) bool {
func (c *Peer) cancelAllRequests() {
c.requestState.Requests.IterateSnapshot(func(x RequestIndex) bool {
c.cancel(x)
return true
})

View File

@ -221,6 +221,7 @@ func TestHaveAllThenBitfield(t *testing.T) {
pc := PeerConn{
Peer: Peer{t: tt},
}
pc.initRequestState()
pc.peerImpl = &pc
tt.conns[&pc] = struct{}{}
c.Assert(pc.onPeerSentHaveAll(), qt.IsNil)

View File

@ -5,12 +5,12 @@ import (
"fmt"
"sync"
"github.com/RoaringBitmap/roaring"
"github.com/anacrolix/chansync"
"github.com/anacrolix/missinggo/v2/bitmap"
"github.com/anacrolix/torrent/metainfo"
pp "github.com/anacrolix/torrent/peer_protocol"
"github.com/anacrolix/torrent/storage"
"github.com/anacrolix/torrent/typed-roaring"
)
type Piece struct {
@ -71,7 +71,7 @@ func (p *Piece) hasDirtyChunks() bool {
}
func (p *Piece) numDirtyChunks() chunkIndexType {
return chunkIndexType(roaringBitmapRangeCardinality(
return chunkIndexType(roaringBitmapRangeCardinality[RequestIndex](
&p.t.dirtyChunks,
p.requestIndexOffset(),
p.t.pieceRequestIndexOffset(p.index+1)))
@ -251,7 +251,7 @@ func init() {
// Use an iterator to jump between dirty bits.
type undirtiedChunksIter struct {
TorrentDirtyChunks *roaring.Bitmap
TorrentDirtyChunks *typedRoaring.Bitmap[RequestIndex]
StartRequestIndex RequestIndex
EndRequestIndex RequestIndex
}

View File

@ -55,10 +55,6 @@ func (r requestStrategyTorrent) IgnorePiece(i int) bool {
return false
}
func (r requestStrategyTorrent) ChunksPerPiece() uint32 {
return r.t.chunksPerRegularPiece()
}
func (r requestStrategyTorrent) PieceLength() int64 {
return r.t.info.PieceLength
}

View File

@ -11,8 +11,8 @@ import (
)
type (
RequestIndex = uint32
ChunkIndex = uint32
RequestIndex uint32
ChunkIndex = RequestIndex
Request = types.Request
pieceIndex = types.PieceIndex
piecePriority = types.PiecePriority

View File

@ -1,14 +1,34 @@
package request_strategy
import (
"github.com/RoaringBitmap/roaring"
"github.com/anacrolix/torrent/typed-roaring"
)
type PeerRequestState struct {
Interested bool
// Expecting. TODO: This should be ordered so webseed requesters initiate in the same order they
// were assigned.
Requests roaring.Bitmap
Requests PeerRequests
// Cancelled and waiting response
Cancelled roaring.Bitmap
Cancelled typedRoaring.Bitmap[RequestIndex]
}
// A set of request indices iterable by order added.
type PeerRequests interface {
// Can be more efficient than GetCardinality.
IsEmpty() bool
// See roaring.Bitmap.GetCardinality.
GetCardinality() uint64
Contains(RequestIndex) bool
// Should not adjust iteration order if item already exists, although I don't think that usage
// exists.
Add(RequestIndex)
// See roaring.Bitmap.Rank.
Rank(RequestIndex) uint64
// Must yield in order items were added.
Iterate(func(RequestIndex) bool)
// See roaring.Bitmap.CheckedRemove.
CheckedRemove(RequestIndex) bool
// Iterate a snapshot of the values. It is safe to mutate the underlying data structure.
IterateSnapshot(func(RequestIndex) bool)
}

View File

@ -2,6 +2,5 @@ package request_strategy
type Torrent interface {
IgnorePiece(int) bool
ChunksPerPiece() uint32
PieceLength() int64
}

View File

@ -67,21 +67,21 @@ type (
chunkIndexType = request_strategy.ChunkIndex
)
type peerRequests struct {
type desiredPeerRequests struct {
requestIndexes []RequestIndex
peer *Peer
}
func (p *peerRequests) Len() int {
func (p *desiredPeerRequests) Len() int {
return len(p.requestIndexes)
}
func (p *peerRequests) Less(i, j int) bool {
func (p *desiredPeerRequests) Less(i, j int) bool {
leftRequest := p.requestIndexes[i]
rightRequest := p.requestIndexes[j]
t := p.peer.t
leftPieceIndex := leftRequest / t.chunksPerRegularPiece()
rightPieceIndex := rightRequest / t.chunksPerRegularPiece()
leftPieceIndex := t.pieceIndexOfRequestIndex(leftRequest)
rightPieceIndex := t.pieceIndexOfRequestIndex(rightRequest)
ml := multiless.New()
// Push requests that can't be served right now to the end. But we don't throw them away unless
// there's a better alternative. This is for when we're using the fast extension and get choked
@ -92,8 +92,8 @@ func (p *peerRequests) Less(i, j int) bool {
!p.peer.peerAllowedFast.Contains(rightPieceIndex),
)
}
leftPiece := t.piece(int(leftPieceIndex))
rightPiece := t.piece(int(rightPieceIndex))
leftPiece := t.piece(leftPieceIndex)
rightPiece := t.piece(rightPieceIndex)
// Putting this first means we can steal requests from lesser-performing peers for our first few
// new requests.
ml = ml.Int(
@ -133,15 +133,15 @@ func (p *peerRequests) Less(i, j int) bool {
return ml.Less()
}
func (p *peerRequests) Swap(i, j int) {
func (p *desiredPeerRequests) Swap(i, j int) {
p.requestIndexes[i], p.requestIndexes[j] = p.requestIndexes[j], p.requestIndexes[i]
}
func (p *peerRequests) Push(x interface{}) {
func (p *desiredPeerRequests) Push(x interface{}) {
p.requestIndexes = append(p.requestIndexes, x.(RequestIndex))
}
func (p *peerRequests) Pop() interface{} {
func (p *desiredPeerRequests) Pop() interface{} {
last := len(p.requestIndexes) - 1
x := p.requestIndexes[last]
p.requestIndexes = p.requestIndexes[:last]
@ -149,7 +149,7 @@ func (p *peerRequests) Pop() interface{} {
}
type desiredRequestState struct {
Requests peerRequests
Requests desiredPeerRequests
Interested bool
}
@ -161,7 +161,7 @@ func (p *Peer) getDesiredRequestState() (desired desiredRequestState) {
return
}
input := p.t.getRequestStrategyInput()
requestHeap := peerRequests{
requestHeap := desiredPeerRequests{
peer: p,
}
request_strategy.GetRequestablePieces(
@ -174,7 +174,7 @@ func (p *Peer) getDesiredRequestState() (desired desiredRequestState) {
if !p.peerHasPiece(pieceIndex) {
return
}
allowedFast := p.peerAllowedFast.ContainsInt(pieceIndex)
allowedFast := p.peerAllowedFast.Contains(pieceIndex)
p.t.piece(pieceIndex).undirtiedChunksIter.Iter(func(ci request_strategy.ChunkIndex) {
r := p.t.pieceRequestIndexOffset(pieceIndex) + ci
if !allowedFast {

View File

@ -1,13 +1,13 @@
package torrent
import (
"github.com/RoaringBitmap/roaring"
"github.com/anacrolix/torrent/typed-roaring"
)
// Return the number of bits set in the range. To do this we need the rank of the item before the
// first, and the rank of the last item. An off-by-one minefield. Hopefully I haven't missed
// something in roaring's API that provides this.
func roaringBitmapRangeCardinality(bm *roaring.Bitmap, start, end uint32) (card uint64) {
func roaringBitmapRangeCardinality[T typedRoaring.BitConstraint](bm interface{ Rank(T) uint64 }, start, end T) (card uint64) {
card = bm.Rank(end - 1)
if start != 0 {
card -= bm.Rank(start - 1)

View File

@ -30,6 +30,7 @@ import (
"github.com/anacrolix/multiless"
"github.com/anacrolix/sync"
request_strategy "github.com/anacrolix/torrent/request-strategy"
typedRoaring "github.com/anacrolix/torrent/typed-roaring"
"github.com/davecgh/go-spew/spew"
"github.com/pion/datachannel"
@ -143,7 +144,7 @@ type Torrent struct {
pendingRequests map[RequestIndex]*Peer
lastRequested map[RequestIndex]time.Time
// Chunks we've written to since the corresponding piece was last checked.
dirtyChunks roaring.Bitmap
dirtyChunks typedRoaring.Bitmap[RequestIndex]
pex pexState
@ -919,15 +920,15 @@ func (t *Torrent) pieceNumChunks(piece pieceIndex) chunkIndexType {
return chunkIndexType((t.pieceLength(piece) + t.chunkSize - 1) / t.chunkSize)
}
func (t *Torrent) chunksPerRegularPiece() uint32 {
return uint32((pp.Integer(t.usualPieceSize()) + t.chunkSize - 1) / t.chunkSize)
func (t *Torrent) chunksPerRegularPiece() chunkIndexType {
return chunkIndexType((pp.Integer(t.usualPieceSize()) + t.chunkSize - 1) / t.chunkSize)
}
func (t *Torrent) numRequests() RequestIndex {
if t.numPieces() == 0 {
return 0
}
return uint32(t.numPieces()-1)*t.chunksPerRegularPiece() + t.pieceNumChunks(t.numPieces()-1)
return RequestIndex(t.numPieces()-1)*t.chunksPerRegularPiece() + t.pieceNumChunks(t.numPieces()-1)
}
func (t *Torrent) pendAllChunkSpecs(pieceIndex pieceIndex) {
@ -1173,7 +1174,7 @@ func (t *Torrent) piecePriorityChanged(piece pieceIndex, reason string) {
if !c.peerHasPiece(piece) {
return
}
if c.requestState.Interested && c.peerChoking && !c.peerAllowedFast.Contains(uint32(piece)) {
if c.requestState.Interested && c.peerChoking && !c.peerAllowedFast.Contains(piece) {
return
}
c.updateRequests(reason)
@ -1461,13 +1462,7 @@ func (t *Torrent) deletePeerConn(c *PeerConn) (ret bool) {
}
}
torrent.Add("deleted connections", 1)
if !c.deleteAllRequests().IsEmpty() {
t.iterPeers(func(p *Peer) {
if p.isLowOnRequests() {
p.updateRequests("Torrent.deletePeerConn")
}
})
}
c.deleteAllRequests("Torrent.deletePeerConn")
t.assertPendingRequests()
if t.numActivePeers() == 0 && len(t.connsWithAllPieces) != 0 {
panic(t.connsWithAllPieces)
@ -2408,6 +2403,7 @@ func (t *Torrent) addWebSeed(url string, opts ...AddWebSeedsOpt) {
activeRequests: make(map[Request]webseed.Request, maxRequests),
maxRequests: maxRequests,
}
ws.peer.initRequestState()
for _, opt := range opts {
opt(&ws.client)
}
@ -2445,7 +2441,7 @@ func (t *Torrent) requestIndexToRequest(ri RequestIndex) Request {
}
func (t *Torrent) requestIndexFromRequest(r Request) RequestIndex {
return t.pieceRequestIndexOffset(pieceIndex(r.Index)) + uint32(r.Begin/t.chunkSize)
return t.pieceRequestIndexOffset(pieceIndex(r.Index)) + RequestIndex(r.Begin/t.chunkSize)
}
func (t *Torrent) pieceRequestIndexOffset(piece pieceIndex) RequestIndex {

47
typed-roaring/bitmap.go Normal file
View File

@ -0,0 +1,47 @@
package typedRoaring
import (
"github.com/RoaringBitmap/roaring"
)
type Bitmap[T BitConstraint] struct {
roaring.Bitmap
}
func (me *Bitmap[T]) Contains(x T) bool {
return me.Bitmap.Contains(uint32(x))
}
func (me Bitmap[T]) Iterate(f func(x T) bool) {
me.Bitmap.Iterate(func(x uint32) bool {
return f(T(x))
})
}
func (me *Bitmap[T]) Add(x T) {
me.Bitmap.Add(uint32(x))
}
func (me *Bitmap[T]) Rank(x T) uint64 {
return me.Bitmap.Rank(uint32(x))
}
func (me *Bitmap[T]) CheckedRemove(x T) bool {
return me.Bitmap.CheckedRemove(uint32(x))
}
func (me *Bitmap[T]) Clone() Bitmap[T] {
return Bitmap[T]{*me.Bitmap.Clone()}
}
func (me *Bitmap[T]) CheckedAdd(x T) bool {
return me.Bitmap.CheckedAdd(uint32(x))
}
func (me *Bitmap[T]) Remove(x T) {
me.Bitmap.Remove(uint32(x))
}
func (me *Bitmap[T]) Iterator() Iterator[T] {
return Iterator[T]{me.Bitmap.Iterator()}
}

View File

@ -0,0 +1,5 @@
package typedRoaring
type BitConstraint interface {
~int | ~uint32
}

17
typed-roaring/iterator.go Normal file
View File

@ -0,0 +1,17 @@
package typedRoaring
import (
"github.com/RoaringBitmap/roaring"
)
type Iterator[T BitConstraint] struct {
roaring.IntPeekable
}
func (t Iterator[T]) Next() T {
return T(t.IntPeekable.Next())
}
func (t Iterator[T]) AdvanceIfNeeded(minVal T) {
t.IntPeekable.AdvanceIfNeeded(uint32(minVal))
}

View File

@ -3,11 +3,11 @@ package torrent
import (
"testing"
"github.com/RoaringBitmap/roaring"
typedRoaring "github.com/anacrolix/torrent/typed-roaring"
)
func BenchmarkUndirtiedChunksIter(b *testing.B) {
var bitmap roaring.Bitmap
var bitmap typedRoaring.Bitmap[RequestIndex]
a := undirtiedChunksIter{
TorrentDirtyChunks: &bitmap,
StartRequestIndex: 69,

View File

@ -86,7 +86,7 @@ func (ws *webseedPeer) requester(i int) {
start:
for !ws.peer.closed.IsSet() {
restart := false
ws.peer.requestState.Requests.Iterate(func(x uint32) bool {
ws.peer.requestState.Requests.Iterate(func(x RequestIndex) bool {
r := ws.peer.t.requestIndexToRequest(x)
if _, ok := ws.activeRequests[r]; ok {
return true