Track peer availability at the Torrent-level

Matt Joiner 2021-05-10 17:02:17 +10:00
parent 2d2456249a
commit b508877d82
3 changed files with 48 additions and 20 deletions

(file 1 of 3)

@@ -371,15 +371,16 @@ func (cn *Peer) writeStatus(w io.Writer, t *Torrent) {
 	fmt.Fprintf(w, "\n")
 }
 
-func (cn *Peer) close() {
-	if !cn.closed.Set() {
+func (p *Peer) close() {
+	if !p.closed.Set() {
 		return
 	}
-	cn.discardPieceInclination()
-	cn._pieceRequestOrder.Clear()
-	cn.peerImpl.onClose()
-	for _, f := range cn.callbacks.PeerClosed {
-		f(cn)
+	p.discardPieceInclination()
+	p._pieceRequestOrder.Clear()
+	p.peerImpl.onClose()
+	p.t.decPeerPieceAvailability(p)
+	for _, f := range p.callbacks.PeerClosed {
+		f(p)
 	}
 }
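
Only the call site of decPeerPieceAvailability appears in this commit's hunks. A minimal sketch of what it plausibly does, assuming it simply walks the pieces the closing peer claimed and decrements each counter (the helper names are taken from elsewhere in this diff; the body is an assumption):

// Hypothetical body (only the call site appears in this diff): remove the
// closing peer's contribution from every piece it claimed to have.
func (t *Torrent) decPeerPieceAvailability(p *Peer) {
	if !t.haveInfo() {
		// Per-piece counters only exist once the torrent's info is known.
		return
	}
	pp := p.newPeerPieces() // covers the peerSentHaveAll case too
	for i := pieceIndex(0); i < t.numPieces(); i++ {
		if pp.Contains(i) {
			t.decPieceAvailability(i)
		}
	}
}
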
@@ -764,7 +765,7 @@ func (cn *Peer) discardPieceInclination() {
 	cn.pieceInclination = nil
 }
 
-func (cn *PeerConn) peerPiecesChanged() {
+func (cn *Peer) peerPiecesChanged() {
 	if cn.t.haveInfo() {
 		prioritiesChanged := false
 		for i := pieceIndex(0); i < cn.t.numPieces(); i++ {
@@ -776,7 +777,7 @@ func (cn *PeerConn) peerPiecesChanged() {
 			cn.updateRequests()
 		}
 	}
-	cn.t.maybeDropMutuallyCompletePeer(&cn.Peer)
+	cn.t.maybeDropMutuallyCompletePeer(cn)
 }
 
 func (cn *PeerConn) raisePeerMinPieces(newMin pieceIndex) {
@@ -793,6 +794,9 @@ func (cn *PeerConn) peerSentHave(piece pieceIndex) error {
 		return nil
 	}
 	cn.raisePeerMinPieces(piece + 1)
+	if !cn.peerHasPiece(piece) {
+		cn.t.incPieceAvailability(piece)
+	}
 	cn._peerPieces.Set(bitmap.BitIndex(piece), true)
 	cn.t.maybeDropMutuallyCompletePeer(&cn.Peer)
 	if cn.updatePiecePriority(piece) {
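
incPieceAvailability and its dec twin are introduced by name only; their bodies are not in any hunk here. Assuming they do nothing more than maintain the availability counter added to Piece below, minimal sketches might read:

// Assumed implementations; only the names appear in this commit's hunks.
func (t *Torrent) incPieceAvailability(i pieceIndex) {
	// No counters to maintain before the info (and thus the pieces) exist.
	if t.haveInfo() {
		t.piece(i).availability++
	}
}

func (t *Torrent) decPieceAvailability(i pieceIndex) {
	if !t.haveInfo() {
		return
	}
	p := t.piece(i)
	if p.availability <= 0 {
		panic(p.availability) // underflow would indicate a bookkeeping bug
	}
	p.availability--
}
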
@@ -802,20 +806,27 @@ func (cn *PeerConn) peerSentHave(piece pieceIndex) error {
 }
 
 func (cn *PeerConn) peerSentBitfield(bf []bool) error {
-	cn.peerSentHaveAll = false
 	if len(bf)%8 != 0 {
 		panic("expected bitfield length divisible by 8")
 	}
-	// We know that the last byte means that at most the last 7 bits are
-	// wasted.
+	// We know that the last byte means that at most the last 7 bits are wasted.
 	cn.raisePeerMinPieces(pieceIndex(len(bf) - 7))
 	if cn.t.haveInfo() && len(bf) > int(cn.t.numPieces()) {
 		// Ignore known excess pieces.
 		bf = bf[:cn.t.numPieces()]
 	}
+	pp := cn.newPeerPieces()
+	cn.peerSentHaveAll = false
 	for i, have := range bf {
 		if have {
 			cn.raisePeerMinPieces(pieceIndex(i) + 1)
+			if !pp.Contains(i) {
+				cn.t.incPieceAvailability(i)
+			}
+		} else {
+			if pp.Contains(i) {
+				cn.t.decPieceAvailability(i)
+			}
 		}
 		cn._peerPieces.Set(i, have)
 	}
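
Note the ordering here: the previously claimed set is snapshotted with pp := cn.newPeerPieces() before cn.peerSentHaveAll is cleared, because newPeerPieces folds peerSentHaveAll into its result. The loop then applies a per-piece delta against that snapshot. A toy, self-contained illustration of that rule (hypothetical helper, not from the commit):

package main

import "fmt"

// availabilityDelta illustrates the per-piece rule the loop above applies:
// newly claimed -> +1, no longer claimed -> -1, unchanged -> 0.
func availabilityDelta(prev []bool, bf []bool) []int {
	deltas := make([]int, len(bf))
	for i, have := range bf {
		switch {
		case have && !prev[i]:
			deltas[i] = 1
		case !have && prev[i]:
			deltas[i] = -1
		}
	}
	return deltas
}

func main() {
	prev := []bool{true, false, true, false}
	next := []bool{true, true, false, false}
	fmt.Println(availabilityDelta(prev, next)) // [0 1 -1 0]
}
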
@@ -823,14 +834,28 @@ func (cn *PeerConn) peerSentBitfield(bf []bool) error {
 	return nil
 }
 
-func (cn *PeerConn) onPeerSentHaveAll() error {
+func (cn *Peer) onPeerHasAllPieces() {
+	t := cn.t
+	if t.haveInfo() {
+		pp := cn.newPeerPieces()
+		for i := range iter.N(t.numPieces()) {
+			if !pp.Contains(i) {
+				t.incPieceAvailability(i)
+			}
+		}
+	}
 	cn.peerSentHaveAll = true
 	cn._peerPieces.Clear()
 	cn.peerPiecesChanged()
+}
+
+func (cn *PeerConn) onPeerSentHaveAll() error {
+	cn.onPeerHasAllPieces()
 	return nil
 }
 
 func (cn *PeerConn) peerSentHaveNone() error {
+	cn.t.decPeerPieceAvailability(&cn.Peer)
 	cn._peerPieces.Clear()
 	cn.peerSentHaveAll = false
 	cn.peerPiecesChanged()
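
onPeerHasAllPieces credits every piece the peer had not already claimed before setting peerSentHaveAll, and peerSentHaveNone debits the entire claimed set through decPeerPieceAvailability before clearing it. The iter.N idiom in the loop counts 0..numPieces-1; the pattern originates in github.com/bradfitz/iter (the anacrolix tree carries an equivalent), sketched here under that assumption:

package main

import (
	"fmt"

	"github.com/bradfitz/iter"
)

func main() {
	// iter.N(n) returns a []struct{} of length n; ranging over it counts
	// 0..n-1 without any per-element storage.
	for i := range iter.N(3) {
		fmt.Println(i) // 0, 1, 2
	}
}
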
@@ -1613,10 +1638,11 @@ func (cn *Peer) peerMaxRequests() int {
 func (cn *PeerConn) PeerPieces() bitmap.Bitmap {
 	cn.locker().RLock()
 	defer cn.locker().RUnlock()
-	return cn.peerPieces()
+	return cn.newPeerPieces()
 }
 
-func (cn *Peer) peerPieces() bitmap.Bitmap {
+// Returns a new Bitmap that includes bits for all pieces we have.
+func (cn *Peer) newPeerPieces() bitmap.Bitmap {
 	ret := cn._peerPieces.Copy()
 	if cn.peerSentHaveAll {
 		ret.AddRange(0, cn.t.numPieces())
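
The rename from peerPieces to newPeerPieces makes the allocation explicit: every call copies the underlying bitmap (and fills in the full range when peerSentHaveAll is set), so the result stays valid after locks are released. A usage sketch, as a hypothetical helper inside the package:

// peerClaimsFirstPiece is a hypothetical helper showing the copy semantics.
func peerClaimsFirstPiece(pc *PeerConn) bool {
	claimed := pc.PeerPieces() // takes and releases the read lock internally
	// claimed is a private copy, so it can be inspected without any locking.
	return claimed.Contains(0)
}
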

(file 2 of 3)

@@ -55,6 +55,7 @@ type Piece struct {
 	publicPieceState PieceState
 	priority         piecePriority
+	availability     int64
 
 	// This can be locked when the Client lock is taken, but probably not vice versa.
 	pendingWritesMutex sync.Mutex

(file 3 of 3)

@@ -19,7 +19,7 @@ type pieceRequestOrderPiece struct {
 	index        pieceIndex
 	prio         piecePriority
 	partial      bool
-	availability int
+	availability int64
 }
 
 func (me *clientPieceRequestOrder) addPieces(t *Torrent, numPieces pieceIndex) {
@@ -45,12 +45,13 @@ func (me clientPieceRequestOrder) sort() {
 	sort.SliceStable(me.pieces, me.less)
 }
 
-func (me clientPieceRequestOrder) update() {
+func (me *clientPieceRequestOrder) update() {
 	for i := range me.pieces {
 		p := &me.pieces[i]
-		p.prio = p.t.piece(p.index).uncachedPriority()
+		tp := p.t.piece(p.index)
+		p.prio = tp.uncachedPriority()
 		p.partial = p.t.piecePartiallyDownloaded(p.index)
-		p.availability = p.t.pieceAvailability(p.index)
+		p.availability = tp.availability
 	}
}
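
Two small things happen in update(): the piece lookup is cached in tp, so availability is read off the *Piece directly rather than through the old pieceAvailability accessor, and the receiver becomes a pointer. The receiver change mostly avoids copying the struct; even by value, the copied slice header aliased the same backing array, so writes through &me.pieces[i] already stuck. A standalone demonstration of that Go subtlety:

package main

import "fmt"

type order struct{ pieces []int }

// Value receiver: the struct is copied, but the slice header inside the copy
// still points at the original backing array, so element writes are shared.
func (o order) bumpByValue() { o.pieces[0]++ }

// Pointer receiver: no copy at all.
func (o *order) bumpByPointer() { o.pieces[0]++ }

func main() {
	o := order{pieces: []int{0}}
	o.bumpByValue()
	o.bumpByPointer()
	fmt.Println(o.pieces[0]) // 2: both mutations landed
}
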
@@ -61,7 +62,7 @@ func (me clientPieceRequestOrder) less(_i, _j int) bool {
 		int(j.prio), int(i.prio),
 	).Bool(
 		j.partial, i.partial,
-	).Int(
+	).Int64(
 		i.availability, j.availability,
 	).Less()
 }
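
The .Int to .Int64 change just follows the counter's widening to int64. The chain is the lexicographic comparator style of github.com/anacrolix/multiless (the chain's start is not in the hunk, so multiless.New() below is an assumption). Note the operand order: prio and partial are passed as (j, i), so higher priority and partially-downloaded pieces sort first, while availability is passed as (i, j), so rarer pieces come first, which gives rarest-first ordering within each priority class. A sketch with concrete values:

package main

import (
	"fmt"

	"github.com/anacrolix/multiless"
)

// less mirrors the comparator shape from the diff, one key at a time.
func less(iPrio, jPrio int, iPartial, jPartial bool, iAvail, jAvail int64) bool {
	return multiless.New().Int(
		jPrio, iPrio, // swapped: higher priority sorts earlier
	).Bool(
		jPartial, iPartial, // swapped: partially downloaded sorts earlier
	).Int64(
		iAvail, jAvail, // ascending: rarer pieces sort earlier
	).Less()
}

func main() {
	// Same priority, neither partial: the rarer piece (availability 2) wins.
	fmt.Println(less(1, 1, false, false, 2, 5)) // true
}
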