Change pieceIndex to peer_protocol.Integer

This commit is contained in:
Matt Joiner 2018-07-12 09:15:15 +10:00
parent ba8ec1a787
commit f5bd377941
12 changed files with 162 additions and 156 deletions

View File

@ -18,6 +18,7 @@ import (
"github.com/anacrolix/dht/krpc"
"github.com/anacrolix/log"
"github.com/anacrolix/missinggo"
"github.com/anacrolix/missinggo/bitmap"
"github.com/anacrolix/missinggo/perf"
"github.com/anacrolix/missinggo/pproffd"
"github.com/anacrolix/missinggo/pubsub"
@ -814,7 +815,7 @@ func (cl *Client) sendInitialMessages(conn *connection, torrent *Torrent) {
if conn.fastEnabled() {
if torrent.haveAllPieces() {
conn.Post(pp.Message{Type: pp.HaveAll})
conn.sentHaves.AddRange(0, conn.t.NumPieces())
conn.sentHaves.AddRange(0, bitmap.BitIndex(conn.t.NumPieces()))
return
} else if !torrent.haveAnyPieces() {
conn.Post(pp.Message{Type: pp.HaveNone})

View File

@ -98,9 +98,9 @@ type connection struct {
// The highest possible number of pieces the torrent could have based on
// communication with the peer. Generally only useful until we have the
// torrent info.
peerMinPieces int
peerMinPieces pieceIndex
// Pieces we've accepted chunks for from the peer.
peerTouchedPieces map[int]struct{}
peerTouchedPieces map[pieceIndex]struct{}
peerAllowedFast bitmap.Bitmap
PeerMaxRequests int // Maximum pending requests the peer allows.
@ -173,7 +173,7 @@ func (cn *connection) peerHasAllPieces() (all bool, known bool) {
if !cn.t.haveInfo() {
return false, false
}
return bitmap.Flip(cn.peerPieces, 0, cn.t.numPieces()).IsEmpty(), true
return bitmap.Flip(cn.peerPieces, 0, bitmap.BitIndex(cn.t.numPieces())).IsEmpty(), true
}
func (cn *connection) mu() sync.Locker {
@ -194,7 +194,7 @@ func (cn *connection) supportsExtension(ext pp.ExtensionName) bool {
}
// The best guess at number of pieces in the torrent for this peer.
func (cn *connection) bestPeerNumPieces() int {
func (cn *connection) bestPeerNumPieces() pieceIndex {
if cn.t.haveInfo() {
return cn.t.numPieces()
}
@ -202,7 +202,7 @@ func (cn *connection) bestPeerNumPieces() int {
}
func (cn *connection) completedString() string {
have := cn.peerPieces.Len()
have := pieceIndex(cn.peerPieces.Len())
if cn.peerSentHaveAll {
have = cn.bestPeerNumPieces()
}
@ -212,8 +212,8 @@ func (cn *connection) completedString() string {
// Correct the PeerPieces slice length. Return false if the existing slice is
// invalid, such as by receiving badly sized BITFIELD, or invalid HAVE
// messages.
func (cn *connection) setNumPieces(num int) error {
cn.peerPieces.RemoveRange(num, bitmap.ToEnd)
func (cn *connection) setNumPieces(num pieceIndex) error {
cn.peerPieces.RemoveRange(bitmap.BitIndex(num), bitmap.ToEnd)
cn.peerPiecesChanged()
return nil
}
@ -325,8 +325,8 @@ func (cn *connection) Close() {
}
}
func (cn *connection) PeerHasPiece(piece int) bool {
return cn.peerSentHaveAll || cn.peerPieces.Contains(piece)
func (cn *connection) PeerHasPiece(piece pieceIndex) bool {
return cn.peerSentHaveAll || cn.peerPieces.Contains(bitmap.BitIndex(piece))
}
// Writes a message into the write buffer.
@ -486,7 +486,7 @@ func (cn *connection) request(r request, mw messageWriter) bool {
if _, ok := cn.requests[r]; ok {
panic("chunk already requested")
}
if !cn.PeerHasPiece(r.Index.Int()) {
if !cn.PeerHasPiece(r.Index) {
panic("requesting piece peer doesn't have")
}
if _, ok := cn.t.conns[cn]; !ok {
@ -554,7 +554,7 @@ func (cn *connection) fillWriteBuffer(msg func(pp.Message) bool) {
}
if len(cn.requests) <= cn.requestsLowWater {
filledBuffer := false
cn.iterPendingPieces(func(pieceIndex int) bool {
cn.iterPendingPieces(func(pieceIndex pieceIndex) bool {
cn.iterPendingRequests(pieceIndex, func(r request) bool {
if !cn.SetInterested(true, msg) {
filledBuffer = true
@ -651,15 +651,15 @@ func (cn *connection) writer(keepAliveTimeout time.Duration) {
}
}
func (cn *connection) Have(piece int) {
if cn.sentHaves.Get(piece) {
func (cn *connection) Have(piece pieceIndex) {
if cn.sentHaves.Get(bitmap.BitIndex(piece)) {
return
}
cn.Post(pp.Message{
Type: pp.Have,
Index: pp.Integer(piece),
})
cn.sentHaves.Add(piece)
cn.sentHaves.Add(bitmap.BitIndex(piece))
}
func (cn *connection) PostBitfield() {
@ -697,12 +697,12 @@ func iterBitmapsDistinct(skip *bitmap.Bitmap, bms ...bitmap.Bitmap) iter.Func {
}
}
func (cn *connection) iterUnbiasedPieceRequestOrder(f func(piece int) bool) bool {
func (cn *connection) iterUnbiasedPieceRequestOrder(f func(piece pieceIndex) bool) bool {
now, readahead := cn.t.readerPiecePriorities()
var skip bitmap.Bitmap
if !cn.peerSentHaveAll {
// Pieces to skip include pieces the peer doesn't have.
skip = bitmap.Flip(cn.peerPieces, 0, cn.t.numPieces())
skip = bitmap.Flip(cn.peerPieces, 0, bitmap.BitIndex(cn.t.numPieces()))
}
// And pieces that we already have.
skip.Union(cn.t.completedPieces)
@ -711,11 +711,11 @@ func (cn *connection) iterUnbiasedPieceRequestOrder(f func(piece int) bool) bool
// pieces.
return iter.All(
func(_piece interface{}) bool {
i := _piece.(pieceIndex)
if cn.t.hashingPiece(i) {
i := _piece.(bitmap.BitIndex)
if cn.t.hashingPiece(pieceIndex(i)) {
return true
}
return f(i)
return f(pieceIndex(i))
},
iterBitmapsDistinct(&skip, now, readahead),
func(cb iter.Callback) {
@ -754,7 +754,7 @@ func (cn *connection) shouldRequestWithoutBias() bool {
return false
}
func (cn *connection) iterPendingPieces(f func(int) bool) bool {
func (cn *connection) iterPendingPieces(f func(pieceIndex) bool) bool {
if !cn.t.haveInfo() {
return false
}
@ -764,15 +764,17 @@ func (cn *connection) iterPendingPieces(f func(int) bool) bool {
if cn.shouldRequestWithoutBias() {
return cn.iterUnbiasedPieceRequestOrder(f)
} else {
return cn.pieceRequestOrder.IterTyped(f)
return cn.pieceRequestOrder.IterTyped(func(i int) bool {
return f(pieceIndex(i))
})
}
}
func (cn *connection) iterPendingPiecesUntyped(f iter.Callback) {
cn.iterPendingPieces(func(i int) bool { return f(i) })
cn.iterPendingPieces(func(i pieceIndex) bool { return f(i) })
}
func (cn *connection) iterPendingRequests(piece int, f func(request) bool) bool {
func (cn *connection) iterPendingRequests(piece pieceIndex, f func(request) bool) bool {
return iterUndirtiedChunks(piece, cn.t, func(cs chunkSpec) bool {
r := request{pp.Integer(piece), cs}
if cn.t.requestStrategy == 3 {
@ -786,24 +788,24 @@ func (cn *connection) iterPendingRequests(piece int, f func(request) bool) bool
})
}
func iterUndirtiedChunks(piece int, t *Torrent, f func(chunkSpec) bool) bool {
func iterUndirtiedChunks(piece pieceIndex, t *Torrent, f func(chunkSpec) bool) bool {
chunkIndices := t.pieces[piece].undirtiedChunkIndices().ToSortedSlice()
// TODO: Use "math/rand".Shuffle >= Go 1.10
return iter.ForPerm(len(chunkIndices), func(i int) bool {
return f(t.chunkIndexSpec(chunkIndices[i], piece))
return f(t.chunkIndexSpec(pieceIndex(chunkIndices[i]), piece))
})
}
// check callers updaterequests
func (cn *connection) stopRequestingPiece(piece int) bool {
return cn.pieceRequestOrder.Remove(piece)
func (cn *connection) stopRequestingPiece(piece pieceIndex) bool {
return cn.pieceRequestOrder.Remove(bitmap.BitIndex(piece))
}
// This is distinct from Torrent piece priority, which is the user's
// preference. Connection piece priority is specific to a connection and is
// used to pseudorandomly avoid connections always requesting the same pieces
// and thus wasting effort.
func (cn *connection) updatePiecePriority(piece int) bool {
func (cn *connection) updatePiecePriority(piece pieceIndex) bool {
tpp := cn.t.piecePriority(piece)
if !cn.PeerHasPiece(piece) {
tpp = PiecePriorityNone
@ -817,16 +819,16 @@ func (cn *connection) updatePiecePriority(piece int) bool {
switch tpp {
case PiecePriorityNormal:
case PiecePriorityReadahead:
prio -= cn.t.numPieces()
prio -= int(cn.t.numPieces())
case PiecePriorityNext, PiecePriorityNow:
prio -= 2 * cn.t.numPieces()
prio -= 2 * int(cn.t.numPieces())
default:
panic(tpp)
}
prio += piece / 3
prio += int(piece / 3)
default:
}
return cn.pieceRequestOrder.Set(piece, prio) || cn.shouldRequestWithoutBias()
return cn.pieceRequestOrder.Set(bitmap.BitIndex(piece), prio) || cn.shouldRequestWithoutBias()
}
func (cn *connection) getPieceInclination() []int {
@ -847,7 +849,7 @@ func (cn *connection) discardPieceInclination() {
func (cn *connection) peerPiecesChanged() {
if cn.t.haveInfo() {
prioritiesChanged := false
for i := range iter.N(cn.t.numPieces()) {
for i := pieceIndex(0); i < cn.t.numPieces(); i++ {
if cn.updatePiecePriority(i) {
prioritiesChanged = true
}
@ -858,13 +860,13 @@ func (cn *connection) peerPiecesChanged() {
}
}
func (cn *connection) raisePeerMinPieces(newMin int) {
func (cn *connection) raisePeerMinPieces(newMin pieceIndex) {
if newMin > cn.peerMinPieces {
cn.peerMinPieces = newMin
}
}
func (cn *connection) peerSentHave(piece int) error {
func (cn *connection) peerSentHave(piece pieceIndex) error {
if cn.t.haveInfo() && piece >= cn.t.numPieces() || piece < 0 {
return errors.New("invalid piece")
}
@ -872,7 +874,7 @@ func (cn *connection) peerSentHave(piece int) error {
return nil
}
cn.raisePeerMinPieces(piece + 1)
cn.peerPieces.Set(piece, true)
cn.peerPieces.Set(bitmap.BitIndex(piece), true)
if cn.updatePiecePriority(piece) {
cn.updateRequests()
}
@ -886,14 +888,14 @@ func (cn *connection) peerSentBitfield(bf []bool) error {
}
// We know that the last byte means that at most the last 7 bits are
// wasted.
cn.raisePeerMinPieces(len(bf) - 7)
if cn.t.haveInfo() && len(bf) > cn.t.numPieces() {
cn.raisePeerMinPieces(pieceIndex(len(bf) - 7))
if cn.t.haveInfo() && len(bf) > int(cn.t.numPieces()) {
// Ignore known excess pieces.
bf = bf[:cn.t.numPieces()]
}
for i, have := range bf {
if have {
cn.raisePeerMinPieces(i + 1)
cn.raisePeerMinPieces(pieceIndex(i) + 1)
}
cn.peerPieces.Set(i, have)
}
@ -1011,7 +1013,7 @@ func (c *connection) reject(r request) {
func (c *connection) onReadRequest(r request) error {
requestedChunkLengths.Add(strconv.FormatUint(r.Length.Uint64(), 10), 1)
if r.Begin+r.Length > c.t.pieceLength(int(r.Index)) {
if r.Begin+r.Length > c.t.pieceLength(r.Index) {
torrent.Add("bad requests received", 1)
return errors.New("bad request")
}
@ -1035,7 +1037,7 @@ func (c *connection) onReadRequest(r request) error {
// BEP 6 says we may close here if we choose.
return nil
}
if !c.t.havePiece(r.Index.Int()) {
if !c.t.havePiece(r.Index) {
// This isn't necessarily them screwing up. We can drop pieces
// from our storage, and can't communicate this to peers
// except by reconnecting.
@ -1114,7 +1116,7 @@ func (c *connection) mainReadLoop() (err error) {
// We'll probably choke them for this, which will clear them if
// appropriate, and is clearly specified.
case pp.Have:
err = c.peerSentHave(int(msg.Index))
err = c.peerSentHave(msg.Index)
case pp.Request:
r := newRequestFromMessage(&msg)
err = c.onReadRequest(r)
@ -1288,8 +1290,7 @@ func (c *connection) receiveChunk(msg *pp.Message) error {
return nil
}
index := int(req.Index)
piece := &t.pieces[index]
piece := &t.pieces[req.Index]
c.allStats(add(1, func(cs *ConnStats) *Count { return &cs.ChunksReadUseful }))
c.allStats(add(int64(len(msg.Piece)), func(cs *ConnStats) *Count { return &cs.BytesReadUsefulData }))
@ -1328,28 +1329,28 @@ func (c *connection) receiveChunk(msg *pp.Message) error {
if err != nil {
log.Printf("%s (%s): error writing chunk %v: %s", t, t.infoHash, req, err)
t.pendRequest(req)
t.updatePieceCompletion(int(msg.Index))
t.updatePieceCompletion(msg.Index)
return nil
}
// It's important that the piece is potentially queued before we check if
// the piece is still wanted, because if it is queued, it won't be wanted.
if t.pieceAllDirty(index) {
t.queuePieceCheck(int(req.Index))
t.pendAllChunkSpecs(index)
if t.pieceAllDirty(req.Index) {
t.queuePieceCheck(req.Index)
t.pendAllChunkSpecs(req.Index)
}
c.onDirtiedPiece(index)
c.onDirtiedPiece(req.Index)
cl.event.Broadcast()
t.publishPieceChange(int(req.Index))
t.publishPieceChange(req.Index)
return nil
}
func (c *connection) onDirtiedPiece(piece int) {
func (c *connection) onDirtiedPiece(piece pieceIndex) {
if c.peerTouchedPieces == nil {
c.peerTouchedPieces = make(map[int]struct{})
c.peerTouchedPieces = make(map[pieceIndex]struct{})
}
c.peerTouchedPieces[piece] = struct{}{}
ds := &c.t.pieces[piece].dirtiers
@ -1408,7 +1409,7 @@ another:
}
more, err := c.sendChunk(r, msg)
if err != nil {
i := int(r.Index)
i := r.Index
if c.t.pieceComplete(i) {
c.t.updatePieceCompletion(i)
if !c.t.pieceComplete(i) {

View File

@ -65,7 +65,7 @@ func (f *File) State() (ret []FilePieceState) {
pieceSize := int64(f.t.usualPieceSize())
off := f.offset % pieceSize
remaining := f.length
for i := int(f.offset / pieceSize); ; i++ {
for i := pieceIndex(f.offset / pieceSize); ; i++ {
if remaining == 0 {
break
}
@ -121,7 +121,7 @@ func (f *File) SetPriority(prio piecePriority) {
return
}
f.prio = prio
f.t.updatePiecePriorities(f.firstPieceIndex().Int(), f.endPieceIndex().Int())
f.t.updatePiecePriorities(f.firstPieceIndex(), f.endPieceIndex())
}
// Returns the priority per File.SetPriority.

View File

@ -152,5 +152,5 @@ func (info *Info) UpvertedFiles() []FileInfo {
}
func (info *Info) Piece(index int) Piece {
return Piece{info, index}
return Piece{info, pieceIndex(index)}
}

View File

@ -1,14 +1,18 @@
package metainfo
import "github.com/anacrolix/missinggo"
import (
"github.com/anacrolix/missinggo"
)
type Piece struct {
Info *Info
i int
i pieceIndex
}
type pieceIndex = int
func (p Piece) Length() int64 {
if p.i == p.Info.NumPieces()-1 {
if int(p.i) == p.Info.NumPieces()-1 {
return p.Info.TotalLength() - int64(p.i)*p.Info.PieceLength
}
return p.Info.PieceLength
@ -23,6 +27,6 @@ func (p Piece) Hash() (ret Hash) {
return
}
func (p Piece) Index() int {
func (p Piece) Index() pieceIndex {
return p.i
}

View File

@ -3,5 +3,5 @@ package metainfo
// Uniquely identifies a piece.
type PieceKey struct {
InfoHash Hash
Index int
Index pieceIndex
}

View File

@ -98,7 +98,7 @@ func validateInfo(info *metainfo.Info) error {
return nil
}
func chunkIndexSpec(index int, pieceLength, chunkSize pp.Integer) chunkSpec {
func chunkIndexSpec(index pieceIndex, pieceLength, chunkSize pp.Integer) chunkSpec {
ret := chunkSpec{pp.Integer(index) * chunkSize, chunkSize}
if ret.Begin+ret.Length > pieceLength {
ret.Length = pieceLength - ret.Begin
@ -154,6 +154,6 @@ func min(as ...int64) int64 {
var unlimited = rate.NewLimiter(rate.Inf, 0)
type (
pieceIndex = int
pieceIndex = pp.Integer
InfoHash = metainfo.Hash
)

View File

@ -42,7 +42,7 @@ type Piece struct {
// The completed piece SHA1 hash, from the metainfo "pieces" field.
hash metainfo.Hash
t *Torrent
index int
index pieceIndex
files []*File
// Chunks we've written to since the last check. The chunk offset and
// length can be determined by the request chunkSize in use.
@ -69,7 +69,7 @@ func (p *Piece) String() string {
}
func (p *Piece) Info() metainfo.Piece {
return p.t.info.Piece(p.index)
return p.t.info.Piece(int(p.index))
}
func (p *Piece) Storage() storage.Piece {
@ -88,8 +88,8 @@ func (p *Piece) hasDirtyChunks() bool {
return p.dirtyChunks.Len() != 0
}
func (p *Piece) numDirtyChunks() (ret int) {
return p.dirtyChunks.Len()
func (p *Piece) numDirtyChunks() pp.Integer {
return pp.Integer(p.dirtyChunks.Len())
}
func (p *Piece) unpendChunkIndex(i int) {
@ -101,13 +101,13 @@ func (p *Piece) pendChunkIndex(i int) {
p.dirtyChunks.Remove(i)
}
func (p *Piece) numChunks() int {
func (p *Piece) numChunks() pp.Integer {
return p.t.pieceNumChunks(p.index)
}
func (p *Piece) undirtiedChunkIndices() (ret bitmap.Bitmap) {
ret = p.dirtyChunks.Copy()
ret.FlipRange(0, p.numChunks())
ret.FlipRange(0, bitmap.BitIndex(p.numChunks()))
return
}
@ -137,11 +137,11 @@ func (p *Piece) waitNoPendingWrites() {
p.pendingWritesMutex.Unlock()
}
func (p *Piece) chunkIndexDirty(chunk int) bool {
return p.dirtyChunks.Contains(chunk)
func (p *Piece) chunkIndexDirty(chunk pp.Integer) bool {
return p.dirtyChunks.Contains(bitmap.BitIndex(chunk))
}
func (p *Piece) chunkIndexSpec(chunk int) chunkSpec {
func (p *Piece) chunkIndexSpec(chunk pp.Integer) chunkSpec {
return chunkIndexSpec(chunk, p.length(), p.chunkSize())
}
@ -168,7 +168,7 @@ func (p *Piece) chunkSize() pp.Integer {
return p.t.chunkSize
}
func (p *Piece) lastChunkIndex() int {
func (p *Piece) lastChunkIndex() pp.Integer {
return p.numChunks() - 1
}
@ -196,7 +196,7 @@ func (p *Piece) VerifyData() {
}
func (p *Piece) queuedForHash() bool {
return p.t.piecesQueuedForHash.Get(p.index)
return p.t.piecesQueuedForHash.Get(bitmap.BitIndex(p.index))
}
func (p *Piece) torrentBeginOffset() int64 {
@ -221,13 +221,13 @@ func (p *Piece) uncachedPriority() (ret piecePriority) {
for _, f := range p.files {
ret.Raise(f.prio)
}
if p.t.readerNowPieces.Contains(p.index) {
if p.t.readerNowPieces.Contains(int(p.index)) {
ret.Raise(PiecePriorityNow)
}
// if t.readerNowPieces.Contains(piece - 1) {
// return PiecePriorityNext
// }
if p.t.readerReadaheadPieces.Contains(p.index) {
if p.t.readerReadaheadPieces.Contains(bitmap.BitIndex(p.index)) {
ret.Raise(PiecePriorityReadahead)
}
ret.Raise(p.priority)

View File

@ -8,6 +8,7 @@ import (
"sync"
"github.com/anacrolix/missinggo"
"github.com/anacrolix/torrent/peer_protocol"
)
type Reader interface {
@ -21,7 +22,7 @@ type Reader interface {
// Piece range by piece index, [begin, end).
type pieceRange struct {
begin, end int
begin, end pieceIndex
}
// Accesses Torrent data via a Client. Reads block until the data is
@ -84,7 +85,7 @@ func (r *reader) readable(off int64) (ret bool) {
if r.responsive {
return r.t.haveChunk(req)
}
return r.t.pieceComplete(int(req.Index))
return r.t.pieceComplete(req.Index)
}
// How many bytes are available to read. Max is the most we could require.
@ -212,8 +213,8 @@ func (r *reader) readOnceAt(b []byte, pos int64, ctxErr *error) (n int, err erro
return
}
}
pi := int(r.torrentOffset(pos) / r.t.info.PieceLength)
ip := r.t.info.Piece(pi)
pi := peer_protocol.Integer(r.torrentOffset(pos) / r.t.info.PieceLength)
ip := r.t.info.Piece(int(pi))
po := r.torrentOffset(pos) % r.t.info.PieceLength
b1 := missinggo.LimitLen(b, ip.Length()-po, avail)
n, err = r.t.readAt(b1, r.torrentOffset(pos))

14
t.go
View File

@ -51,7 +51,7 @@ func (t *Torrent) PieceStateRuns() []PieceStateRun {
return t.pieceStateRuns()
}
func (t *Torrent) PieceState(piece int) PieceState {
func (t *Torrent) PieceState(piece pieceIndex) PieceState {
t.cl.mu.Lock()
defer t.cl.mu.Unlock()
return t.pieceState(piece)
@ -59,7 +59,7 @@ func (t *Torrent) PieceState(piece int) PieceState {
// The number of pieces in the torrent. This requires that the info has been
// obtained first.
func (t *Torrent) NumPieces() int {
func (t *Torrent) NumPieces() pieceIndex {
return t.numPieces()
}
@ -152,13 +152,13 @@ func (t *Torrent) deleteReader(r *reader) {
// Raise the priorities of pieces in the range [begin, end) to at least Normal
// priority. Piece indexes are not the same as bytes. Requires that the info
// has been obtained, see Torrent.Info and Torrent.GotInfo.
func (t *Torrent) DownloadPieces(begin, end int) {
func (t *Torrent) DownloadPieces(begin, end pieceIndex) {
t.cl.mu.Lock()
defer t.cl.mu.Unlock()
t.downloadPiecesLocked(begin, end)
}
func (t *Torrent) downloadPiecesLocked(begin, end int) {
func (t *Torrent) downloadPiecesLocked(begin, end pieceIndex) {
for i := begin; i < end; i++ {
if t.pieces[i].priority.Raise(PiecePriorityNormal) {
t.updatePiecePriority(i)
@ -166,13 +166,13 @@ func (t *Torrent) downloadPiecesLocked(begin, end int) {
}
}
func (t *Torrent) CancelPieces(begin, end int) {
func (t *Torrent) CancelPieces(begin, end pieceIndex) {
t.cl.mu.Lock()
defer t.cl.mu.Unlock()
t.cancelPiecesLocked(begin, end)
}
func (t *Torrent) cancelPiecesLocked(begin, end int) {
func (t *Torrent) cancelPiecesLocked(begin, end pieceIndex) {
for i := begin; i < end; i++ {
p := &t.pieces[i]
if p.priority == PiecePriorityNone {
@ -233,7 +233,7 @@ func (t *Torrent) AddTrackers(announceList [][]string) {
t.addTrackers(announceList)
}
func (t *Torrent) Piece(i int) *Piece {
func (t *Torrent) Piece(i pieceIndex) *Piece {
t.cl.mu.Lock()
defer t.cl.mu.Unlock()
return &t.pieces[i]

View File

@ -24,7 +24,6 @@ import (
"github.com/anacrolix/missinggo/prioritybitmap"
"github.com/anacrolix/missinggo/pubsub"
"github.com/anacrolix/missinggo/slices"
"github.com/bradfitz/iter"
"github.com/davecgh/go-spew/spew"
"github.com/anacrolix/torrent/bencode"
@ -34,7 +33,7 @@ import (
"github.com/anacrolix/torrent/tracker"
)
func (t *Torrent) chunkIndexSpec(chunkIndex, piece int) chunkSpec {
func (t *Torrent) chunkIndexSpec(chunkIndex, piece pieceIndex) chunkSpec {
return chunkIndexSpec(chunkIndex, t.pieceLength(piece), t.chunkSize)
}
@ -220,11 +219,11 @@ func (t *Torrent) setDisplayName(dn string) {
t.displayName = dn
}
func (t *Torrent) pieceComplete(piece int) bool {
return t.completedPieces.Get(piece)
func (t *Torrent) pieceComplete(piece pieceIndex) bool {
return t.completedPieces.Get(bitmap.BitIndex(piece))
}
func (t *Torrent) pieceCompleteUncached(piece int) storage.Completion {
func (t *Torrent) pieceCompleteUncached(piece pieceIndex) storage.Completion {
return t.pieces[piece].Storage().Completion()
}
@ -329,7 +328,7 @@ func (t *Torrent) makePieces() {
for i, hash := range hashes {
piece := &t.pieces[i]
piece.t = t
piece.index = i
piece.index = pieceIndex(i)
piece.noPendingWrites.L = &piece.pendingWritesMutex
missinggo.CopyExact(piece.hash[:], hash)
files := *t.files
@ -396,11 +395,11 @@ func (t *Torrent) onSetInfo() {
}
}
for i := range t.pieces {
t.updatePieceCompletion(i)
t.updatePieceCompletion(pieceIndex(i))
p := &t.pieces[i]
if !p.storageCompletionOk {
// log.Printf("piece %s completion unknown, queueing check", p)
t.queuePieceCheck(i)
t.queuePieceCheck(pieceIndex(i))
}
}
t.cl.event.Broadcast()
@ -473,7 +472,7 @@ func (t *Torrent) name() string {
return t.displayName
}
func (t *Torrent) pieceState(index int) (ret PieceState) {
func (t *Torrent) pieceState(index pieceIndex) (ret PieceState) {
p := &t.pieces[index]
ret.Priority = t.piecePriority(index)
ret.Completion = p.completion()
@ -514,7 +513,7 @@ func (t *Torrent) pieceStateRuns() (ret []PieceStateRun) {
})
})
for index := range t.pieces {
rle.Append(t.pieceState(index), 1)
rle.Append(t.pieceState(pieceIndex(index)), 1)
}
rle.Flush()
return
@ -587,7 +586,7 @@ func (t *Torrent) writeStatus(w io.Writer) {
fmt.Fprintln(w)
}
fmt.Fprintf(w, "Reader Pieces:")
t.forReaderOffsetPieces(func(begin, end int) (again bool) {
t.forReaderOffsetPieces(func(begin, end pieceIndex) (again bool) {
fmt.Fprintf(w, " %d:%d", begin, end)
return true
})
@ -651,7 +650,7 @@ func (t *Torrent) bytesMissingLocked() int64 {
}
func (t *Torrent) bytesLeft() (left int64) {
bitmap.Flip(t.completedPieces, 0, t.numPieces()).IterTyped(func(piece int) bool {
bitmap.Flip(t.completedPieces, 0, bitmap.BitIndex(t.numPieces())).IterTyped(func(piece int) bool {
p := &t.pieces[piece]
left += int64(p.length() - p.numDirtyBytes())
return true
@ -668,7 +667,7 @@ func (t *Torrent) bytesLeftAnnounce() uint64 {
}
}
func (t *Torrent) piecePartiallyDownloaded(piece int) bool {
func (t *Torrent) piecePartiallyDownloaded(piece pieceIndex) bool {
if t.pieceComplete(piece) {
return false
}
@ -682,8 +681,8 @@ func (t *Torrent) usualPieceSize() int {
return int(t.info.PieceLength)
}
func (t *Torrent) numPieces() int {
return t.info.NumPieces()
func (t *Torrent) numPieces() pieceIndex {
return pieceIndex(t.info.NumPieces())
}
func (t *Torrent) numPiecesCompleted() (num int) {
@ -735,15 +734,15 @@ func (t *Torrent) bitfield() (bf []bool) {
return
}
func (t *Torrent) pieceNumChunks(piece int) int {
return int((t.pieceLength(piece) + t.chunkSize - 1) / t.chunkSize)
func (t *Torrent) pieceNumChunks(piece pieceIndex) pp.Integer {
return (t.pieceLength(piece) + t.chunkSize - 1) / t.chunkSize
}
func (t *Torrent) pendAllChunkSpecs(pieceIndex int) {
func (t *Torrent) pendAllChunkSpecs(pieceIndex pieceIndex) {
t.pieces[pieceIndex].dirtyChunks.Clear()
}
func (t *Torrent) pieceLength(piece int) pp.Integer {
func (t *Torrent) pieceLength(piece pieceIndex) pp.Integer {
if t.info.PieceLength == 0 {
// There will be no variance amongst pieces. Only pain.
return 0
@ -757,11 +756,11 @@ func (t *Torrent) pieceLength(piece int) pp.Integer {
return pp.Integer(t.info.PieceLength)
}
func (t *Torrent) hashPiece(piece int) (ret metainfo.Hash) {
func (t *Torrent) hashPiece(piece pieceIndex) (ret metainfo.Hash) {
hash := pieceHash.New()
p := &t.pieces[piece]
p.waitNoPendingWrites()
ip := t.info.Piece(piece)
ip := t.info.Piece(int(piece))
pl := ip.Length()
n, err := io.Copy(hash, io.NewSectionReader(t.pieces[piece].Storage(), 0, pl))
if n == pl {
@ -782,10 +781,10 @@ func (t *Torrent) haveAllPieces() bool {
if !t.haveInfo() {
return false
}
return t.completedPieces.Len() == t.numPieces()
return t.completedPieces.Len() == bitmap.BitIndex(t.numPieces())
}
func (t *Torrent) havePiece(index int) bool {
func (t *Torrent) havePiece(index pieceIndex) bool {
return t.haveInfo() && t.pieceComplete(index)
}
@ -796,7 +795,7 @@ func (t *Torrent) haveChunk(r request) (ret bool) {
if !t.haveInfo() {
return false
}
if t.pieceComplete(int(r.Index)) {
if t.pieceComplete(r.Index) {
return true
}
p := &t.pieces[r.Index]
@ -808,7 +807,7 @@ func chunkIndex(cs chunkSpec, chunkSize pp.Integer) int {
}
func (t *Torrent) wantPiece(r request) bool {
if !t.wantPieceIndex(int(r.Index)) {
if !t.wantPieceIndex(r.Index) {
return false
}
if t.pieces[r.Index].pendingChunk(r.chunkSpec, t.chunkSize) {
@ -819,7 +818,7 @@ func (t *Torrent) wantPiece(r request) bool {
return false
}
func (t *Torrent) wantPieceIndex(index int) bool {
func (t *Torrent) wantPieceIndex(index pieceIndex) bool {
if !t.haveInfo() {
return false
}
@ -836,11 +835,11 @@ func (t *Torrent) wantPieceIndex(index int) bool {
if t.pieceComplete(index) {
return false
}
if t.pendingPieces.Contains(index) {
if t.pendingPieces.Contains(bitmap.BitIndex(index)) {
return true
}
// log.Printf("piece %d not pending", index)
return !t.forReaderOffsetPieces(func(begin, end int) bool {
return !t.forReaderOffsetPieces(func(begin, end pieceIndex) bool {
return index < begin || index >= end
})
}
@ -874,27 +873,27 @@ type PieceStateChange struct {
PieceState
}
func (t *Torrent) publishPieceChange(piece int) {
func (t *Torrent) publishPieceChange(piece pieceIndex) {
cur := t.pieceState(piece)
p := &t.pieces[piece]
if cur != p.publicPieceState {
p.publicPieceState = cur
t.pieceStateChanges.Publish(PieceStateChange{
piece,
int(piece),
cur,
})
}
}
func (t *Torrent) pieceNumPendingChunks(piece int) int {
func (t *Torrent) pieceNumPendingChunks(piece pieceIndex) pp.Integer {
if t.pieceComplete(piece) {
return 0
}
return t.pieceNumChunks(piece) - t.pieces[piece].numDirtyChunks()
}
func (t *Torrent) pieceAllDirty(piece int) bool {
return t.pieces[piece].dirtyChunks.Len() == t.pieceNumChunks(piece)
func (t *Torrent) pieceAllDirty(piece pieceIndex) bool {
return t.pieces[piece].dirtyChunks.Len() == int(t.pieceNumChunks(piece))
}
func (t *Torrent) readersChanged() {
@ -936,7 +935,7 @@ func (t *Torrent) maybeNewConns() {
t.openNewConns()
}
func (t *Torrent) piecePriorityChanged(piece int) {
func (t *Torrent) piecePriorityChanged(piece pieceIndex) {
// log.Printf("piece %d priority changed", piece)
for c := range t.conns {
if c.updatePiecePriority(piece) {
@ -948,16 +947,16 @@ func (t *Torrent) piecePriorityChanged(piece int) {
t.publishPieceChange(piece)
}
func (t *Torrent) updatePiecePriority(piece int) {
func (t *Torrent) updatePiecePriority(piece pieceIndex) {
p := &t.pieces[piece]
newPrio := p.uncachedPriority()
// log.Printf("torrent %p: piece %d: uncached priority: %v", t, piece, newPrio)
if newPrio == PiecePriorityNone {
if !t.pendingPieces.Remove(piece) {
if !t.pendingPieces.Remove(bitmap.BitIndex(piece)) {
return
}
} else {
if !t.pendingPieces.Set(piece, newPrio.BitmapPriority()) {
if !t.pendingPieces.Set(bitmap.BitIndex(piece), newPrio.BitmapPriority()) {
return
}
}
@ -965,19 +964,19 @@ func (t *Torrent) updatePiecePriority(piece int) {
}
func (t *Torrent) updateAllPiecePriorities() {
t.updatePiecePriorities(0, len(t.pieces))
t.updatePiecePriorities(0, t.numPieces())
}
// Update all piece priorities in one hit. This function should have the same
// output as updatePiecePriority, but across all pieces.
func (t *Torrent) updatePiecePriorities(begin, end int) {
func (t *Torrent) updatePiecePriorities(begin, end pieceIndex) {
for i := begin; i < end; i++ {
t.updatePiecePriority(i)
}
}
// Returns the range of pieces [begin, end) that contains the extent of bytes.
func (t *Torrent) byteRegionPieces(off, size int64) (begin, end int) {
func (t *Torrent) byteRegionPieces(off, size int64) (begin, end pieceIndex) {
if off >= *t.length {
return
}
@ -988,10 +987,10 @@ func (t *Torrent) byteRegionPieces(off, size int64) (begin, end int) {
if size <= 0 {
return
}
begin = int(off / t.info.PieceLength)
end = int((off + size + t.info.PieceLength - 1) / t.info.PieceLength)
if end > t.info.NumPieces() {
end = t.info.NumPieces()
begin = pieceIndex(off / t.info.PieceLength)
end = pieceIndex((off + size + t.info.PieceLength - 1) / t.info.PieceLength)
if end > pieceIndex(t.info.NumPieces()) {
end = pieceIndex(t.info.NumPieces())
}
return
}
@ -999,7 +998,7 @@ func (t *Torrent) byteRegionPieces(off, size int64) (begin, end int) {
// Returns true if all iterations complete without breaking. Returns the read
// regions for all readers. The reader regions should not be merged as some
// callers depend on this method to enumerate readers.
func (t *Torrent) forReaderOffsetPieces(f func(begin, end int) (more bool)) (all bool) {
func (t *Torrent) forReaderOffsetPieces(f func(begin, end pieceIndex) (more bool)) (all bool) {
for r := range t.readers {
p := r.pieces
if p.begin >= p.end {
@ -1012,8 +1011,8 @@ func (t *Torrent) forReaderOffsetPieces(f func(begin, end int) (more bool)) (all
return true
}
func (t *Torrent) piecePriority(piece int) piecePriority {
prio, ok := t.pendingPieces.GetPriority(piece)
func (t *Torrent) piecePriority(piece pieceIndex) piecePriority {
prio, ok := t.pendingPieces.GetPriority(bitmap.BitIndex(piece))
if !ok {
return PiecePriorityNone
}
@ -1032,7 +1031,7 @@ func (t *Torrent) pendRequest(req request) {
t.pieces[req.Index].pendChunkIndex(ci)
}
func (t *Torrent) pieceCompletionChanged(piece int) {
func (t *Torrent) pieceCompletionChanged(piece pieceIndex) {
log.Call().Add("piece", piece).AddValue(debugLogValue).Log(t.logger)
t.cl.event.Broadcast()
if t.pieceComplete(piece) {
@ -1080,7 +1079,7 @@ func (t *Torrent) getConnPieceInclination() []int {
_ret := t.connPieceInclinationPool.Get()
if _ret == nil {
pieceInclinationsNew.Add(1)
return rand.Perm(t.numPieces())
return rand.Perm(int(t.numPieces()))
}
pieceInclinationsReused.Add(1)
return *_ret.(*[]int)
@ -1091,13 +1090,13 @@ func (t *Torrent) putPieceInclination(pi []int) {
pieceInclinationsPut.Add(1)
}
func (t *Torrent) updatePieceCompletion(piece int) {
func (t *Torrent) updatePieceCompletion(piece pieceIndex) {
pcu := t.pieceCompleteUncached(piece)
p := &t.pieces[piece]
changed := t.completedPieces.Get(piece) != pcu.Complete || p.storageCompletionOk != pcu.Ok
changed := t.completedPieces.Get(bitmap.BitIndex(piece)) != pcu.Complete || p.storageCompletionOk != pcu.Ok
log.Fmsg("piece %d completion: %v", piece, pcu.Ok).AddValue(debugLogValue).Log(t.logger)
p.storageCompletionOk = pcu.Ok
t.completedPieces.Set(piece, pcu.Complete)
t.completedPieces.Set(bitmap.BitIndex(piece), pcu.Complete)
t.tickleReaders()
// log.Printf("piece %d uncached completion: %v", piece, pcu.Complete)
// log.Printf("piece %d changed: %v", piece, changed)
@ -1114,7 +1113,7 @@ func (t *Torrent) readAt(b []byte, off int64) (n int, err error) {
}
func (t *Torrent) updateAllPieceCompletions() {
for i := range iter.N(t.numPieces()) {
for i := pieceIndex(0); i < t.numPieces(); i++ {
t.updatePieceCompletion(i)
}
}
@ -1142,18 +1141,18 @@ func (t *Torrent) maybeCompleteMetadata() error {
}
func (t *Torrent) readerPieces() (ret bitmap.Bitmap) {
t.forReaderOffsetPieces(func(begin, end int) bool {
ret.AddRange(begin, end)
t.forReaderOffsetPieces(func(begin, end pieceIndex) bool {
ret.AddRange(bitmap.BitIndex(begin), bitmap.BitIndex(end))
return true
})
return
}
func (t *Torrent) readerPiecePriorities() (now, readahead bitmap.Bitmap) {
t.forReaderOffsetPieces(func(begin, end int) bool {
t.forReaderOffsetPieces(func(begin, end pieceIndex) bool {
if end > begin {
now.Add(begin)
readahead.AddRange(begin+1, end)
now.Add(bitmap.BitIndex(begin))
readahead.AddRange(bitmap.BitIndex(begin)+1, bitmap.BitIndex(end))
}
return true
})
@ -1578,7 +1577,7 @@ func (t *Torrent) mu() missinggo.RWLocker {
return &t.cl.mu
}
func (t *Torrent) pieceHashed(piece int, correct bool) {
func (t *Torrent) pieceHashed(piece pieceIndex, correct bool) {
log.Fmsg("hashed piece %d", piece).Add("piece", piece).Add("passed", correct).AddValue(debugLogValue).Log(t.logger)
if t.closed.IsSet() {
return
@ -1636,14 +1635,14 @@ func (t *Torrent) pieceHashed(piece int, correct bool) {
t.updatePieceCompletion(piece)
}
func (t *Torrent) cancelRequestsForPiece(piece int) {
func (t *Torrent) cancelRequestsForPiece(piece pieceIndex) {
// TODO: Make faster
for cn := range t.conns {
cn.tickleWriter()
}
}
func (t *Torrent) onPieceCompleted(piece int) {
func (t *Torrent) onPieceCompleted(piece pieceIndex) {
t.pendAllChunkSpecs(piece)
t.cancelRequestsForPiece(piece)
for conn := range t.conns {
@ -1652,7 +1651,7 @@ func (t *Torrent) onPieceCompleted(piece int) {
}
// Called when a piece is found to be not complete.
func (t *Torrent) onIncompletePiece(piece int) {
func (t *Torrent) onIncompletePiece(piece pieceIndex) {
if t.pieceAllDirty(piece) {
t.pendAllChunkSpecs(piece)
}
@ -1690,7 +1689,7 @@ func (t *Torrent) verifyPiece(piece pieceIndex) {
for p.hashing || t.storage == nil {
cl.event.Wait()
}
if !p.t.piecesQueuedForHash.Remove(piece) {
if !p.t.piecesQueuedForHash.Remove(bitmap.BitIndex(piece)) {
panic("piece was not queued")
}
t.updatePiecePriority(piece)
@ -1713,7 +1712,7 @@ func (t *Torrent) verifyPiece(piece pieceIndex) {
// Return the connections that touched a piece, and clear the entries while
// doing it.
func (t *Torrent) reapPieceTouchers(piece int) (ret []*connection) {
func (t *Torrent) reapPieceTouchers(piece pieceIndex) (ret []*connection) {
for c := range t.pieces[piece].dirtiers {
delete(c.peerTouchedPieces, piece)
ret = append(ret, c)
@ -1730,19 +1729,19 @@ func (t *Torrent) connsAsSlice() (ret []*connection) {
}
// Currently doesn't really queue, but should in the future.
func (t *Torrent) queuePieceCheck(pieceIndex int) {
func (t *Torrent) queuePieceCheck(pieceIndex pieceIndex) {
piece := &t.pieces[pieceIndex]
if piece.queuedForHash() {
return
}
t.piecesQueuedForHash.Add(pieceIndex)
t.piecesQueuedForHash.Add(bitmap.BitIndex(pieceIndex))
t.publishPieceChange(pieceIndex)
t.updatePiecePriority(pieceIndex)
go t.verifyPiece(pieceIndex)
}
func (t *Torrent) VerifyData() {
for i := range iter.N(t.NumPieces()) {
for i := pieceIndex(0); i < t.NumPieces(); i++ {
t.Piece(i).VerifyData()
}
}

View File

@ -95,7 +95,7 @@ func BenchmarkUpdatePiecePriorities(b *testing.B) {
r.Seek(3500000, 0)
}
assert.Len(b, t.readers, 7)
for i := 0; i < t.numPieces(); i += 3 {
for i := 0; i < int(t.numPieces()); i += 3 {
t.completedPieces.Set(i, true)
}
t.DownloadPieces(0, t.numPieces())