diff --git a/client.go b/client.go index a351f049..b498339e 100644 --- a/client.go +++ b/client.go @@ -1484,6 +1484,7 @@ func (cl *Client) newConnection(nc net.Conn, outgoing bool, remoteAddr PeerRemot connString: connString, conn: nc, } + c.initRequestState() // TODO: Need to be much more explicit about this, including allowing non-IP bannable addresses. if remoteAddr != nil { netipAddrPort, err := netip.ParseAddrPort(remoteAddr.String()) diff --git a/peerconn.go b/peerconn.go index 6364affb..a344934e 100644 --- a/peerconn.go +++ b/peerconn.go @@ -26,6 +26,7 @@ import ( "github.com/anacrolix/torrent/mse" pp "github.com/anacrolix/torrent/peer_protocol" request_strategy "github.com/anacrolix/torrent/request-strategy" + "github.com/anacrolix/torrent/typed-roaring" ) type PeerSource string @@ -120,7 +121,7 @@ type Peer struct { peerMinPieces pieceIndex // Pieces we've accepted chunks for from the peer. peerTouchedPieces map[pieceIndex]struct{} - peerAllowedFast roaring.Bitmap + peerAllowedFast typedRoaring.Bitmap[pieceIndex] PeerMaxRequests maxRequests // Maximum pending requests the peer allows. PeerExtensionIDs map[pp.ExtensionName]pp.ExtensionNumber @@ -129,6 +130,18 @@ type Peer struct { logger log.Logger } +type peerRequests struct { + typedRoaring.Bitmap[RequestIndex] +} + +func (p *peerRequests) IterateSnapshot(f func(request_strategy.RequestIndex) bool) { + p.Clone().Iterate(f) +} + +func (p *Peer) initRequestState() { + p.requestState.Requests = &peerRequests{} +} + // Maintains the state of a BitTorrent-protocol based connection with a peer. type PeerConn struct { Peer @@ -189,11 +202,11 @@ func (cn *Peer) expectingChunks() bool { return true } haveAllowedFastRequests := false - cn.peerAllowedFast.Iterate(func(i uint32) bool { - haveAllowedFastRequests = roaringBitmapRangeCardinality( - &cn.requestState.Requests, - cn.t.pieceRequestIndexOffset(pieceIndex(i)), - cn.t.pieceRequestIndexOffset(pieceIndex(i+1)), + cn.peerAllowedFast.Iterate(func(i pieceIndex) bool { + haveAllowedFastRequests = roaringBitmapRangeCardinality[RequestIndex]( + cn.requestState.Requests, + cn.t.pieceRequestIndexOffset(i), + cn.t.pieceRequestIndexOffset(i+1), ) == 0 return !haveAllowedFastRequests }) @@ -201,7 +214,7 @@ func (cn *Peer) expectingChunks() bool { } func (cn *Peer) remoteChokingPiece(piece pieceIndex) bool { - return cn.peerChoking && !cn.peerAllowedFast.Contains(bitmap.BitIndex(piece)) + return cn.peerChoking && !cn.peerAllowedFast.Contains(piece) } // Returns true if the connection is over IPv6. @@ -348,8 +361,8 @@ func (cn *Peer) downloadRate() float64 { func (cn *Peer) numRequestsByPiece() (ret map[pieceIndex]int) { ret = make(map[pieceIndex]int) - cn.requestState.Requests.Iterate(func(x uint32) bool { - ret[pieceIndex(x/cn.t.chunksPerRegularPiece())]++ + cn.requestState.Requests.Iterate(func(x RequestIndex) bool { + ret[cn.t.pieceIndexOfRequestIndex(x)]++ return true }) return @@ -597,7 +610,7 @@ func (cn *Peer) shouldRequest(r RequestIndex) error { if cn.t.pieceQueuedForHash(pi) { panic("piece is queued for hash") } - if cn.peerChoking && !cn.peerAllowedFast.Contains(bitmap.BitIndex(pi)) { + if cn.peerChoking && !cn.peerAllowedFast.Contains(pi) { // This could occur if we made a request with the fast extension, and then got choked and // haven't had the request rejected yet. if !cn.requestState.Requests.Contains(r) { @@ -1152,13 +1165,7 @@ func (c *PeerConn) mainReadLoop() (err error) { break } if !c.fastEnabled() { - if !c.deleteAllRequests().IsEmpty() { - c.t.iterPeers(func(p *Peer) { - if p.isLowOnRequests() { - p.updateRequests("choked by non-fast PeerConn") - } - }) - } + c.deleteAllRequests("choked by non-fast PeerConn") } else { // We don't decrement pending requests here, let's wait for the peer to either // reject or satisfy the outstanding requests. Additionally, some peers may unchoke @@ -1178,8 +1185,8 @@ func (c *PeerConn) mainReadLoop() (err error) { } c.peerChoking = false preservedCount := 0 - c.requestState.Requests.Iterate(func(x uint32) bool { - if !c.peerAllowedFast.Contains(x / c.t.chunksPerRegularPiece()) { + c.requestState.Requests.Iterate(func(x RequestIndex) bool { + if !c.peerAllowedFast.Contains(c.t.pieceIndexOfRequestIndex(x)) { preservedCount++ } return true @@ -1404,7 +1411,7 @@ func (c *Peer) receiveChunk(msg *pp.Message) error { } c.decExpectedChunkReceive(req) - if c.peerChoking && c.peerAllowedFast.Contains(bitmap.BitIndex(ppReq.Index)) { + if c.peerChoking && c.peerAllowedFast.Contains(pieceIndex(ppReq.Index)) { chunksReceived.Add("due to allowed fast", 1) } @@ -1636,15 +1643,22 @@ func (c *Peer) deleteRequest(r RequestIndex) bool { return true } -func (c *Peer) deleteAllRequests() (deleted *roaring.Bitmap) { - deleted = c.requestState.Requests.Clone() - deleted.Iterate(func(x uint32) bool { +func (c *Peer) deleteAllRequests(reason string) { + if c.requestState.Requests.IsEmpty() { + return + } + c.requestState.Requests.IterateSnapshot(func(x RequestIndex) bool { if !c.deleteRequest(x) { panic("request should exist") } return true }) c.assertNoRequests() + c.t.iterPeers(func(p *Peer) { + if p.isLowOnRequests() { + p.updateRequests(reason) + } + }) return } @@ -1654,9 +1668,8 @@ func (c *Peer) assertNoRequests() { } } -func (c *Peer) cancelAllRequests() (cancelled *roaring.Bitmap) { - cancelled = c.requestState.Requests.Clone() - cancelled.Iterate(func(x uint32) bool { +func (c *Peer) cancelAllRequests() { + c.requestState.Requests.IterateSnapshot(func(x RequestIndex) bool { c.cancel(x) return true }) diff --git a/peerconn_test.go b/peerconn_test.go index b1ac1dbc..1bd9712b 100644 --- a/peerconn_test.go +++ b/peerconn_test.go @@ -221,6 +221,7 @@ func TestHaveAllThenBitfield(t *testing.T) { pc := PeerConn{ Peer: Peer{t: tt}, } + pc.initRequestState() pc.peerImpl = &pc tt.conns[&pc] = struct{}{} c.Assert(pc.onPeerSentHaveAll(), qt.IsNil) diff --git a/piece.go b/piece.go index 6842f1f0..44f2dd9d 100644 --- a/piece.go +++ b/piece.go @@ -5,12 +5,12 @@ import ( "fmt" "sync" - "github.com/RoaringBitmap/roaring" "github.com/anacrolix/chansync" "github.com/anacrolix/missinggo/v2/bitmap" "github.com/anacrolix/torrent/metainfo" pp "github.com/anacrolix/torrent/peer_protocol" "github.com/anacrolix/torrent/storage" + "github.com/anacrolix/torrent/typed-roaring" ) type Piece struct { @@ -71,7 +71,7 @@ func (p *Piece) hasDirtyChunks() bool { } func (p *Piece) numDirtyChunks() chunkIndexType { - return chunkIndexType(roaringBitmapRangeCardinality( + return chunkIndexType(roaringBitmapRangeCardinality[RequestIndex]( &p.t.dirtyChunks, p.requestIndexOffset(), p.t.pieceRequestIndexOffset(p.index+1))) @@ -251,7 +251,7 @@ func init() { // Use an iterator to jump between dirty bits. type undirtiedChunksIter struct { - TorrentDirtyChunks *roaring.Bitmap + TorrentDirtyChunks *typedRoaring.Bitmap[RequestIndex] StartRequestIndex RequestIndex EndRequestIndex RequestIndex } diff --git a/request-strategy-impls.go b/request-strategy-impls.go index bf3e4083..0b05ed41 100644 --- a/request-strategy-impls.go +++ b/request-strategy-impls.go @@ -55,10 +55,6 @@ func (r requestStrategyTorrent) IgnorePiece(i int) bool { return false } -func (r requestStrategyTorrent) ChunksPerPiece() uint32 { - return r.t.chunksPerRegularPiece() -} - func (r requestStrategyTorrent) PieceLength() int64 { return r.t.info.PieceLength } diff --git a/request-strategy/order.go b/request-strategy/order.go index 09f1ff0c..a23d5117 100644 --- a/request-strategy/order.go +++ b/request-strategy/order.go @@ -11,8 +11,8 @@ import ( ) type ( - RequestIndex = uint32 - ChunkIndex = uint32 + RequestIndex uint32 + ChunkIndex = RequestIndex Request = types.Request pieceIndex = types.PieceIndex piecePriority = types.PiecePriority diff --git a/request-strategy/peer.go b/request-strategy/peer.go index d5366d76..8b711f3c 100644 --- a/request-strategy/peer.go +++ b/request-strategy/peer.go @@ -1,14 +1,34 @@ package request_strategy import ( - "github.com/RoaringBitmap/roaring" + "github.com/anacrolix/torrent/typed-roaring" ) type PeerRequestState struct { Interested bool // Expecting. TODO: This should be ordered so webseed requesters initiate in the same order they // were assigned. - Requests roaring.Bitmap + Requests PeerRequests // Cancelled and waiting response - Cancelled roaring.Bitmap + Cancelled typedRoaring.Bitmap[RequestIndex] +} + +// A set of request indices iterable by order added. +type PeerRequests interface { + // Can be more efficient than GetCardinality. + IsEmpty() bool + // See roaring.Bitmap.GetCardinality. + GetCardinality() uint64 + Contains(RequestIndex) bool + // Should not adjust iteration order if item already exists, although I don't think that usage + // exists. + Add(RequestIndex) + // See roaring.Bitmap.Rank. + Rank(RequestIndex) uint64 + // Must yield in order items were added. + Iterate(func(RequestIndex) bool) + // See roaring.Bitmap.CheckedRemove. + CheckedRemove(RequestIndex) bool + // Iterate a snapshot of the values. It is safe to mutate the underlying data structure. + IterateSnapshot(func(RequestIndex) bool) } diff --git a/request-strategy/torrent.go b/request-strategy/torrent.go index 2460f2e4..591ac363 100644 --- a/request-strategy/torrent.go +++ b/request-strategy/torrent.go @@ -2,6 +2,5 @@ package request_strategy type Torrent interface { IgnorePiece(int) bool - ChunksPerPiece() uint32 PieceLength() int64 } diff --git a/requesting.go b/requesting.go index f1642181..d19f7c6d 100644 --- a/requesting.go +++ b/requesting.go @@ -67,21 +67,21 @@ type ( chunkIndexType = request_strategy.ChunkIndex ) -type peerRequests struct { +type desiredPeerRequests struct { requestIndexes []RequestIndex peer *Peer } -func (p *peerRequests) Len() int { +func (p *desiredPeerRequests) Len() int { return len(p.requestIndexes) } -func (p *peerRequests) Less(i, j int) bool { +func (p *desiredPeerRequests) Less(i, j int) bool { leftRequest := p.requestIndexes[i] rightRequest := p.requestIndexes[j] t := p.peer.t - leftPieceIndex := leftRequest / t.chunksPerRegularPiece() - rightPieceIndex := rightRequest / t.chunksPerRegularPiece() + leftPieceIndex := t.pieceIndexOfRequestIndex(leftRequest) + rightPieceIndex := t.pieceIndexOfRequestIndex(rightRequest) ml := multiless.New() // Push requests that can't be served right now to the end. But we don't throw them away unless // there's a better alternative. This is for when we're using the fast extension and get choked @@ -92,8 +92,8 @@ func (p *peerRequests) Less(i, j int) bool { !p.peer.peerAllowedFast.Contains(rightPieceIndex), ) } - leftPiece := t.piece(int(leftPieceIndex)) - rightPiece := t.piece(int(rightPieceIndex)) + leftPiece := t.piece(leftPieceIndex) + rightPiece := t.piece(rightPieceIndex) // Putting this first means we can steal requests from lesser-performing peers for our first few // new requests. ml = ml.Int( @@ -133,15 +133,15 @@ func (p *peerRequests) Less(i, j int) bool { return ml.Less() } -func (p *peerRequests) Swap(i, j int) { +func (p *desiredPeerRequests) Swap(i, j int) { p.requestIndexes[i], p.requestIndexes[j] = p.requestIndexes[j], p.requestIndexes[i] } -func (p *peerRequests) Push(x interface{}) { +func (p *desiredPeerRequests) Push(x interface{}) { p.requestIndexes = append(p.requestIndexes, x.(RequestIndex)) } -func (p *peerRequests) Pop() interface{} { +func (p *desiredPeerRequests) Pop() interface{} { last := len(p.requestIndexes) - 1 x := p.requestIndexes[last] p.requestIndexes = p.requestIndexes[:last] @@ -149,7 +149,7 @@ func (p *peerRequests) Pop() interface{} { } type desiredRequestState struct { - Requests peerRequests + Requests desiredPeerRequests Interested bool } @@ -161,7 +161,7 @@ func (p *Peer) getDesiredRequestState() (desired desiredRequestState) { return } input := p.t.getRequestStrategyInput() - requestHeap := peerRequests{ + requestHeap := desiredPeerRequests{ peer: p, } request_strategy.GetRequestablePieces( @@ -174,7 +174,7 @@ func (p *Peer) getDesiredRequestState() (desired desiredRequestState) { if !p.peerHasPiece(pieceIndex) { return } - allowedFast := p.peerAllowedFast.ContainsInt(pieceIndex) + allowedFast := p.peerAllowedFast.Contains(pieceIndex) p.t.piece(pieceIndex).undirtiedChunksIter.Iter(func(ci request_strategy.ChunkIndex) { r := p.t.pieceRequestIndexOffset(pieceIndex) + ci if !allowedFast { diff --git a/roaring.go b/roaring.go index 6e5409ec..8e39416c 100644 --- a/roaring.go +++ b/roaring.go @@ -1,13 +1,13 @@ package torrent import ( - "github.com/RoaringBitmap/roaring" + "github.com/anacrolix/torrent/typed-roaring" ) // Return the number of bits set in the range. To do this we need the rank of the item before the // first, and the rank of the last item. An off-by-one minefield. Hopefully I haven't missed // something in roaring's API that provides this. -func roaringBitmapRangeCardinality(bm *roaring.Bitmap, start, end uint32) (card uint64) { +func roaringBitmapRangeCardinality[T typedRoaring.BitConstraint](bm interface{ Rank(T) uint64 }, start, end T) (card uint64) { card = bm.Rank(end - 1) if start != 0 { card -= bm.Rank(start - 1) diff --git a/torrent.go b/torrent.go index a0872a88..b54e8bbc 100644 --- a/torrent.go +++ b/torrent.go @@ -30,6 +30,7 @@ import ( "github.com/anacrolix/multiless" "github.com/anacrolix/sync" request_strategy "github.com/anacrolix/torrent/request-strategy" + typedRoaring "github.com/anacrolix/torrent/typed-roaring" "github.com/davecgh/go-spew/spew" "github.com/pion/datachannel" @@ -143,7 +144,7 @@ type Torrent struct { pendingRequests map[RequestIndex]*Peer lastRequested map[RequestIndex]time.Time // Chunks we've written to since the corresponding piece was last checked. - dirtyChunks roaring.Bitmap + dirtyChunks typedRoaring.Bitmap[RequestIndex] pex pexState @@ -919,15 +920,15 @@ func (t *Torrent) pieceNumChunks(piece pieceIndex) chunkIndexType { return chunkIndexType((t.pieceLength(piece) + t.chunkSize - 1) / t.chunkSize) } -func (t *Torrent) chunksPerRegularPiece() uint32 { - return uint32((pp.Integer(t.usualPieceSize()) + t.chunkSize - 1) / t.chunkSize) +func (t *Torrent) chunksPerRegularPiece() chunkIndexType { + return chunkIndexType((pp.Integer(t.usualPieceSize()) + t.chunkSize - 1) / t.chunkSize) } func (t *Torrent) numRequests() RequestIndex { if t.numPieces() == 0 { return 0 } - return uint32(t.numPieces()-1)*t.chunksPerRegularPiece() + t.pieceNumChunks(t.numPieces()-1) + return RequestIndex(t.numPieces()-1)*t.chunksPerRegularPiece() + t.pieceNumChunks(t.numPieces()-1) } func (t *Torrent) pendAllChunkSpecs(pieceIndex pieceIndex) { @@ -1173,7 +1174,7 @@ func (t *Torrent) piecePriorityChanged(piece pieceIndex, reason string) { if !c.peerHasPiece(piece) { return } - if c.requestState.Interested && c.peerChoking && !c.peerAllowedFast.Contains(uint32(piece)) { + if c.requestState.Interested && c.peerChoking && !c.peerAllowedFast.Contains(piece) { return } c.updateRequests(reason) @@ -1461,13 +1462,7 @@ func (t *Torrent) deletePeerConn(c *PeerConn) (ret bool) { } } torrent.Add("deleted connections", 1) - if !c.deleteAllRequests().IsEmpty() { - t.iterPeers(func(p *Peer) { - if p.isLowOnRequests() { - p.updateRequests("Torrent.deletePeerConn") - } - }) - } + c.deleteAllRequests("Torrent.deletePeerConn") t.assertPendingRequests() if t.numActivePeers() == 0 && len(t.connsWithAllPieces) != 0 { panic(t.connsWithAllPieces) @@ -2408,6 +2403,7 @@ func (t *Torrent) addWebSeed(url string, opts ...AddWebSeedsOpt) { activeRequests: make(map[Request]webseed.Request, maxRequests), maxRequests: maxRequests, } + ws.peer.initRequestState() for _, opt := range opts { opt(&ws.client) } @@ -2445,7 +2441,7 @@ func (t *Torrent) requestIndexToRequest(ri RequestIndex) Request { } func (t *Torrent) requestIndexFromRequest(r Request) RequestIndex { - return t.pieceRequestIndexOffset(pieceIndex(r.Index)) + uint32(r.Begin/t.chunkSize) + return t.pieceRequestIndexOffset(pieceIndex(r.Index)) + RequestIndex(r.Begin/t.chunkSize) } func (t *Torrent) pieceRequestIndexOffset(piece pieceIndex) RequestIndex { diff --git a/typed-roaring/bitmap.go b/typed-roaring/bitmap.go new file mode 100644 index 00000000..d9e7ce99 --- /dev/null +++ b/typed-roaring/bitmap.go @@ -0,0 +1,47 @@ +package typedRoaring + +import ( + "github.com/RoaringBitmap/roaring" +) + +type Bitmap[T BitConstraint] struct { + roaring.Bitmap +} + +func (me *Bitmap[T]) Contains(x T) bool { + return me.Bitmap.Contains(uint32(x)) +} + +func (me Bitmap[T]) Iterate(f func(x T) bool) { + me.Bitmap.Iterate(func(x uint32) bool { + return f(T(x)) + }) +} + +func (me *Bitmap[T]) Add(x T) { + me.Bitmap.Add(uint32(x)) +} + +func (me *Bitmap[T]) Rank(x T) uint64 { + return me.Bitmap.Rank(uint32(x)) +} + +func (me *Bitmap[T]) CheckedRemove(x T) bool { + return me.Bitmap.CheckedRemove(uint32(x)) +} + +func (me *Bitmap[T]) Clone() Bitmap[T] { + return Bitmap[T]{*me.Bitmap.Clone()} +} + +func (me *Bitmap[T]) CheckedAdd(x T) bool { + return me.Bitmap.CheckedAdd(uint32(x)) +} + +func (me *Bitmap[T]) Remove(x T) { + me.Bitmap.Remove(uint32(x)) +} + +func (me *Bitmap[T]) Iterator() Iterator[T] { + return Iterator[T]{me.Bitmap.Iterator()} +} diff --git a/typed-roaring/constraints.go b/typed-roaring/constraints.go new file mode 100644 index 00000000..d6e191f7 --- /dev/null +++ b/typed-roaring/constraints.go @@ -0,0 +1,5 @@ +package typedRoaring + +type BitConstraint interface { + ~int | ~uint32 +} diff --git a/typed-roaring/iterator.go b/typed-roaring/iterator.go new file mode 100644 index 00000000..359b7ffb --- /dev/null +++ b/typed-roaring/iterator.go @@ -0,0 +1,17 @@ +package typedRoaring + +import ( + "github.com/RoaringBitmap/roaring" +) + +type Iterator[T BitConstraint] struct { + roaring.IntPeekable +} + +func (t Iterator[T]) Next() T { + return T(t.IntPeekable.Next()) +} + +func (t Iterator[T]) AdvanceIfNeeded(minVal T) { + t.IntPeekable.AdvanceIfNeeded(uint32(minVal)) +} diff --git a/undirtied-chunks-iter_test.go b/undirtied-chunks-iter_test.go index d9b12df4..e811410e 100644 --- a/undirtied-chunks-iter_test.go +++ b/undirtied-chunks-iter_test.go @@ -3,11 +3,11 @@ package torrent import ( "testing" - "github.com/RoaringBitmap/roaring" + typedRoaring "github.com/anacrolix/torrent/typed-roaring" ) func BenchmarkUndirtiedChunksIter(b *testing.B) { - var bitmap roaring.Bitmap + var bitmap typedRoaring.Bitmap[RequestIndex] a := undirtiedChunksIter{ TorrentDirtyChunks: &bitmap, StartRequestIndex: 69, diff --git a/webseed-peer.go b/webseed-peer.go index 0cdf4cac..40110094 100644 --- a/webseed-peer.go +++ b/webseed-peer.go @@ -86,7 +86,7 @@ func (ws *webseedPeer) requester(i int) { start: for !ws.peer.closed.IsSet() { restart := false - ws.peer.requestState.Requests.Iterate(func(x uint32) bool { + ws.peer.requestState.Requests.Iterate(func(x RequestIndex) bool { r := ws.peer.t.requestIndexToRequest(x) if _, ok := ws.activeRequests[r]; ok { return true