Use rejiggered missinggo bitmap

Matt Joiner 2021-05-20 11:16:54 +10:00
parent a26e7f9933
commit 95d808d3c5
8 changed files with 46 additions and 42 deletions
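
The diff below swaps github.com/anacrolix/missinggo/bitmap for github.com/anacrolix/missinggo/v2/bitmap, whose rejiggered API types single bit positions as bitmap.BitIndex and counts/range bounds as bitmap.BitRange instead of plain ints, so most hunks are one-line conversions at call sites. A minimal sketch of how the typed calls read, using only methods that appear in this diff (AddRange, Remove, Contains, Len); the piece count and variable names are illustrative and not from the commit:

package main

import (
	"fmt"

	"github.com/anacrolix/missinggo/v2/bitmap"
)

func main() {
	// Track which pieces we have; the zero value of Bitmap is usable directly,
	// as with the struct fields in the diff below.
	var have bitmap.Bitmap
	numPieces := 8 // hypothetical piece count, for illustration only

	// Range bounds are expressed as bitmap.BitRange rather than plain ints.
	have.AddRange(0, bitmap.BitRange(numPieces))

	// Single-bit operations take bitmap.BitIndex.
	have.Remove(bitmap.BitIndex(3))
	fmt.Println(have.Contains(bitmap.BitIndex(3))) // false

	// Len reports a BitRange, so piece counts are converted before comparing.
	fmt.Println(have.Len() == bitmap.BitRange(numPieces-1)) // true
}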


@@ -18,10 +18,10 @@ import (
 	"github.com/anacrolix/dht/v2"
 	"github.com/anacrolix/dht/v2/krpc"
 	"github.com/anacrolix/log"
-	"github.com/anacrolix/missinggo/bitmap"
 	"github.com/anacrolix/missinggo/perf"
 	"github.com/anacrolix/missinggo/pubsub"
 	"github.com/anacrolix/missinggo/slices"
+	"github.com/anacrolix/missinggo/v2/bitmap"
 	"github.com/anacrolix/missinggo/v2/pproffd"
 	"github.com/anacrolix/sync"
 	"github.com/anacrolix/torrent/internal/limiter"
@@ -1007,7 +1007,7 @@ func (cl *Client) sendInitialMessages(conn *PeerConn, torrent *Torrent) {
 	if conn.fastEnabled() {
 		if torrent.haveAllPieces() {
 			conn.write(pp.Message{Type: pp.HaveAll})
-			conn.sentHaves.AddRange(0, bitmap.BitIndex(conn.t.NumPieces()))
+			conn.sentHaves.AddRange(0, bitmap.BitRange(conn.t.NumPieces()))
 			return
 		} else if !torrent.haveAnyPieces() {
 			conn.write(pp.Message{Type: pp.HaveNone})

file.go

@@ -66,20 +66,20 @@ func fileBytesLeft(
 	switch numPiecesSpanned {
 	case 0:
 	case 1:
-		if !torrentCompletedPieces.Get(fileFirstPieceIndex) {
+		if !torrentCompletedPieces.Get(bitmap.BitIndex(fileFirstPieceIndex)) {
 			left += fileLength
 		}
 	default:
-		if !torrentCompletedPieces.Get(fileFirstPieceIndex) {
+		if !torrentCompletedPieces.Get(bitmap.BitIndex(fileFirstPieceIndex)) {
 			left += torrentUsualPieceSize - (fileTorrentOffset % torrentUsualPieceSize)
 		}
-		if !torrentCompletedPieces.Get(fileEndPieceIndex - 1) {
+		if !torrentCompletedPieces.Get(bitmap.BitIndex(fileEndPieceIndex - 1)) {
 			left += fileTorrentOffset + fileLength - int64(fileEndPieceIndex-1)*torrentUsualPieceSize
 		}
 		completedMiddlePieces := torrentCompletedPieces.Copy()
-		completedMiddlePieces.RemoveRange(0, fileFirstPieceIndex+1)
-		completedMiddlePieces.RemoveRange(fileEndPieceIndex-1, bitmap.ToEnd)
-		left += int64(numPiecesSpanned-2-completedMiddlePieces.Len()) * torrentUsualPieceSize
+		completedMiddlePieces.RemoveRange(0, bitmap.BitRange(fileFirstPieceIndex+1))
+		completedMiddlePieces.RemoveRange(bitmap.BitRange(fileEndPieceIndex-1), bitmap.ToEnd)
+		left += int64(numPiecesSpanned-2-pieceIndex(completedMiddlePieces.Len())) * torrentUsualPieceSize
 	}
 	return
 }


@@ -232,7 +232,7 @@ func (cn *Peer) peerHasAllPieces() (all bool, known bool) {
 	if !cn.t.haveInfo() {
 		return false, false
 	}
-	return bitmap.Flip(cn._peerPieces, 0, bitmap.BitIndex(cn.t.numPieces())).IsEmpty(), true
+	return bitmap.Flip(cn._peerPieces, 0, bitmap.BitRange(cn.t.numPieces())).IsEmpty(), true
 }
 
 func (cn *PeerConn) locker() *lockWithDeferreds {
@@ -267,7 +267,7 @@ func (cn *PeerConn) onGotInfo(info *metainfo.Info) {
 // Correct the PeerPieces slice length. Return false if the existing slice is invalid, such as by
 // receiving badly sized BITFIELD, or invalid HAVE messages.
 func (cn *PeerConn) setNumPieces(num pieceIndex) {
-	cn._peerPieces.RemoveRange(bitmap.BitIndex(num), bitmap.ToEnd)
+	cn._peerPieces.RemoveRange(bitmap.BitRange(num), bitmap.ToEnd)
 	cn.peerPiecesChanged()
 }
@@ -730,10 +730,10 @@ func iterBitmapsDistinct(skip *bitmap.Bitmap, bms ...bitmap.Bitmap) iter.Func {
 		if !iter.All(
 			func(_i interface{}) bool {
 				i := _i.(int)
-				if skip.Contains(i) {
+				if skip.Contains(bitmap.BitIndex(i)) {
 					return true
 				}
-				skip.Add(i)
+				skip.Add(bitmap.BitIndex(i))
 				return cb(i)
 			},
 			bm.Iter,
@@ -746,7 +746,7 @@ func iterBitmapsDistinct(skip *bitmap.Bitmap, bms ...bitmap.Bitmap) iter.Func {
 
 // check callers updaterequests
 func (cn *Peer) stopRequestingPiece(piece pieceIndex) bool {
-	return cn._pieceRequestOrder.Remove(bitmap.BitIndex(piece))
+	return cn._pieceRequestOrder.Remove(piece)
 }
 
 // This is distinct from Torrent piece priority, which is the user's
@@ -762,7 +762,7 @@ func (cn *Peer) updatePiecePriority(piece pieceIndex) bool {
 		return cn.stopRequestingPiece(piece)
 	}
 	prio := cn.getPieceInclination()[piece]
-	return cn._pieceRequestOrder.Set(bitmap.BitIndex(piece), prio)
+	return cn._pieceRequestOrder.Set(piece, prio)
 }
 
 func (cn *Peer) getPieceInclination() []int {
@@ -835,15 +835,15 @@ func (cn *PeerConn) peerSentBitfield(bf []bool) error {
 	for i, have := range bf {
 		if have {
 			cn.raisePeerMinPieces(pieceIndex(i) + 1)
-			if !pp.Contains(i) {
+			if !pp.Contains(bitmap.BitIndex(i)) {
 				cn.t.incPieceAvailability(i)
 			}
 		} else {
-			if pp.Contains(i) {
+			if pp.Contains(bitmap.BitIndex(i)) {
 				cn.t.decPieceAvailability(i)
 			}
 		}
-		cn._peerPieces.Set(i, have)
+		cn._peerPieces.Set(bitmap.BitIndex(i), have)
 	}
 	cn.peerPiecesChanged()
 	return nil
@@ -854,7 +854,7 @@ func (cn *Peer) onPeerHasAllPieces() {
 	if t.haveInfo() {
 		pp := cn.newPeerPieces()
 		for i := range iter.N(t.numPieces()) {
-			if !pp.Contains(i) {
+			if !pp.Contains(bitmap.BitIndex(i)) {
 				t.incPieceAvailability(i)
 			}
 		}
@@ -1198,7 +1198,7 @@ func (c *PeerConn) mainReadLoop() (err error) {
 		case pp.AllowedFast:
 			torrent.Add("allowed fasts received", 1)
 			log.Fmsg("peer allowed fast: %d", msg.Index).AddValues(c).SetLevel(log.Debug).Log(c.t.logger)
-			c.peerAllowedFast.Add(int(msg.Index))
+			c.peerAllowedFast.Add(bitmap.BitIndex(msg.Index))
 			c.updateRequests()
 		case pp.Extended:
 			err = c.onReadExtendedMsg(msg.ExtendedID, msg.ExtendedPayload)
@@ -1326,7 +1326,7 @@ func (c *Peer) receiveChunk(msg *pp.Message) error {
 	}
 	c.decExpectedChunkReceive(req)
-	if c.peerChoking && c.peerAllowedFast.Get(int(req.Index)) {
+	if c.peerChoking && c.peerAllowedFast.Get(bitmap.BitIndex(req.Index)) {
 		chunksReceived.Add("due to allowed fast", 1)
 	}
@@ -1658,11 +1658,12 @@ func (cn *PeerConn) PeerPieces() bitmap.Bitmap {
 	return cn.newPeerPieces()
 }
 
-// Returns a new Bitmap that includes bits for all pieces we have.
+// Returns a new Bitmap that includes bits for all pieces the peer claims to have.
 func (cn *Peer) newPeerPieces() bitmap.Bitmap {
 	ret := cn._peerPieces.Copy()
 	if cn.peerSentHaveAll {
-		ret.AddRange(0, cn.t.numPieces())
+		ret.AddRange(0, bitmap.BitRange(cn.t.numPieces()))
 	}
 	return ret
 }


@@ -53,7 +53,7 @@ func (p *Piece) Storage() storage.Piece {
 }
 
 func (p *Piece) pendingChunkIndex(chunkIndex int) bool {
-	return !p._dirtyChunks.Contains(chunkIndex)
+	return !p._dirtyChunks.Contains(bitmap.BitIndex(chunkIndex))
 }
 
 func (p *Piece) pendingChunk(cs ChunkSpec, chunkSize pp.Integer) bool {
@@ -69,12 +69,12 @@ func (p *Piece) numDirtyChunks() pp.Integer {
 }
 
 func (p *Piece) unpendChunkIndex(i int) {
-	p._dirtyChunks.Add(i)
+	p._dirtyChunks.Add(bitmap.BitIndex(i))
 	p.t.tickleReaders()
 }
 
 func (p *Piece) pendChunkIndex(i int) {
-	p._dirtyChunks.Remove(i)
+	p._dirtyChunks.Remove(bitmap.BitIndex(i))
 }
 
 func (p *Piece) numChunks() pp.Integer {
@@ -199,7 +199,7 @@ func (p *Piece) purePriority() (ret piecePriority) {
 	for _, f := range p.files {
 		ret.Raise(f.prio)
 	}
-	if p.t.readerNowPieces().Contains(int(p.index)) {
+	if p.t.readerNowPieces().Contains(bitmap.BitIndex(p.index)) {
 		ret.Raise(PiecePriorityNow)
 	}
 	// if t._readerNowPieces.Contains(piece - 1) {
@@ -234,7 +234,7 @@ func (p *Piece) completion() (ret storage.Completion) {
 }
 
 func (p *Piece) allChunksDirty() bool {
-	return p._dirtyChunks.Len() == int(p.numChunks())
+	return p._dirtyChunks.Len() == bitmap.BitRange(p.numChunks())
 }
 
 func (p *Piece) dirtyChunks() bitmap.Bitmap {


@@ -4,6 +4,7 @@ import (
 	"time"
 	"unsafe"
 
+	"github.com/anacrolix/missinggo/v2/bitmap"
 	request_strategy "github.com/anacrolix/torrent/request-strategy"
 	"github.com/anacrolix/torrent/types"
 )
@@ -66,7 +67,7 @@ func (cl *Client) doRequests() {
 			},
 			Choking: p.peerChoking,
 			PieceAllowedFast: func(i pieceIndex) bool {
-				return p.peerAllowedFast.Contains(i)
+				return p.peerAllowedFast.Contains(bitmap.BitIndex(i))
 			},
 			DownloadRate: p.downloadRate(),
 			Age: time.Since(p.completedHandshake),


@@ -12,6 +12,7 @@ import (
 	"testing/iotest"
 	"time"
 
+	"github.com/anacrolix/missinggo/v2/bitmap"
 	"github.com/anacrolix/missinggo/v2/filecache"
 	"github.com/anacrolix/torrent"
 	"github.com/anacrolix/torrent/internal/testutil"
@@ -168,7 +169,7 @@ func testClientTransfer(t *testing.T, ps testClientTransferParams) {
 	for _, pc := range leecherPeerConns {
 		completed := pc.PeerPieces().Len()
 		t.Logf("peer conn %v has %v completed pieces", pc, completed)
-		if completed == leecherTorrent.Info().NumPieces() {
+		if completed == bitmap.BitRange(leecherTorrent.Info().NumPieces()) {
 			foundSeeder = true
 		}
 	}


@@ -759,7 +759,7 @@ func (t *Torrent) bytesMissingLocked() int64 {
 }
 
 func (t *Torrent) bytesLeft() (left int64) {
-	bitmap.Flip(t._completedPieces, 0, bitmap.BitIndex(t.numPieces())).IterTyped(func(piece int) bool {
+	bitmap.Flip(t._completedPieces, 0, bitmap.BitRange(t.numPieces())).IterTyped(func(piece int) bool {
 		p := &t.pieces[piece]
 		left += int64(p.length() - p.numDirtyBytes())
 		return true
@@ -794,8 +794,8 @@ func (t *Torrent) numPieces() pieceIndex {
 	return pieceIndex(t.info.NumPieces())
 }
 
-func (t *Torrent) numPiecesCompleted() (num int) {
-	return t._completedPieces.Len()
+func (t *Torrent) numPiecesCompleted() (num pieceIndex) {
+	return pieceIndex(t._completedPieces.Len())
 }
 
 func (t *Torrent) close() (err error) {
@@ -895,7 +895,7 @@ func (t *Torrent) haveAllPieces() bool {
 	if !t.haveInfo() {
 		return false
 	}
-	return t._completedPieces.Len() == bitmap.BitIndex(t.numPieces())
+	return t._completedPieces.Len() == bitmap.BitRange(t.numPieces())
 }
 
 func (t *Torrent) havePiece(index pieceIndex) bool {
@@ -960,7 +960,7 @@ func (t *Torrent) wantPieceIndex(index pieceIndex) bool {
 	if t.pieceComplete(index) {
 		return false
 	}
-	if t._pendingPieces.Contains(bitmap.BitIndex(index)) {
+	if t._pendingPieces.Contains(int(index)) {
 		return true
 	}
 	// t.logger.Printf("piece %d not pending", index)
@@ -1019,7 +1019,7 @@ func (t *Torrent) pieceNumPendingChunks(piece pieceIndex) pp.Integer {
 }
 
 func (t *Torrent) pieceAllDirty(piece pieceIndex) bool {
-	return t.pieces[piece]._dirtyChunks.Len() == int(t.pieceNumChunks(piece))
+	return t.pieces[piece]._dirtyChunks.Len() == bitmap.BitRange(t.pieceNumChunks(piece))
 }
 
 func (t *Torrent) readersChanged() {
@@ -1078,11 +1078,11 @@ func (t *Torrent) updatePiecePriority(piece pieceIndex) {
 	newPrio := p.uncachedPriority()
 	// t.logger.Printf("torrent %p: piece %d: uncached priority: %v", t, piece, newPrio)
 	if newPrio == PiecePriorityNone {
-		if !t._pendingPieces.Remove(bitmap.BitIndex(piece)) {
+		if !t._pendingPieces.Remove(int(piece)) {
 			return
 		}
 	} else {
-		if !t._pendingPieces.Set(bitmap.BitIndex(piece), newPrio.BitmapPriority()) {
+		if !t._pendingPieces.Set(int(piece), newPrio.BitmapPriority()) {
 			return
 		}
 	}
@@ -1138,7 +1138,7 @@ func (t *Torrent) forReaderOffsetPieces(f func(begin, end pieceIndex) (more bool
 }
 
 func (t *Torrent) piecePriority(piece pieceIndex) piecePriority {
-	prio, ok := t._pendingPieces.GetPriority(bitmap.BitIndex(piece))
+	prio, ok := t._pendingPieces.GetPriority(piece)
 	if !ok {
 		return PiecePriorityNone
 	}
@@ -1286,7 +1286,7 @@ func (t *Torrent) readerPiecePriorities() (now, readahead bitmap.Bitmap) {
 	t.forReaderOffsetPieces(func(begin, end pieceIndex) bool {
 		if end > begin {
 			now.Add(bitmap.BitIndex(begin))
-			readahead.AddRange(bitmap.BitIndex(begin)+1, bitmap.BitIndex(end))
+			readahead.AddRange(bitmap.BitRange(begin)+1, bitmap.BitRange(end))
 		}
 		return true
 	})
@@ -1966,7 +1966,7 @@ func (t *Torrent) tryCreatePieceHasher() bool {
 		return false
 	}
 	p := t.piece(pi)
-	t.piecesQueuedForHash.Remove(pi)
+	t.piecesQueuedForHash.Remove(bitmap.BitIndex(pi))
 	p.hashing = true
 	t.publishPieceChange(pi)
 	t.updatePiecePriority(pi)


@@ -9,6 +9,7 @@ import (
 	"testing"
 
 	"github.com/anacrolix/missinggo"
+	"github.com/anacrolix/missinggo/v2/bitmap"
 	"github.com/bradfitz/iter"
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
@@ -96,8 +97,8 @@ func BenchmarkUpdatePiecePriorities(b *testing.B) {
 		r.Seek(3500000, io.SeekStart)
 	}
 	assert.Len(b, t.readers, 7)
-	for i := 0; i < int(t.numPieces()); i += 3 {
-		t._completedPieces.Set(i, true)
+	for i := 0; i < t.numPieces(); i += 3 {
+		t._completedPieces.Set(bitmap.BitIndex(i), true)
 	}
 	t.DownloadPieces(0, t.numPieces())
 	for range iter.N(b.N) {