diff --git a/peerconn.go b/peerconn.go index 7fd819be..7e20ba3b 100644 --- a/peerconn.go +++ b/peerconn.go @@ -371,15 +371,16 @@ func (cn *Peer) writeStatus(w io.Writer, t *Torrent) { fmt.Fprintf(w, "\n") } -func (cn *Peer) close() { - if !cn.closed.Set() { +func (p *Peer) close() { + if !p.closed.Set() { return } - cn.discardPieceInclination() - cn._pieceRequestOrder.Clear() - cn.peerImpl.onClose() - for _, f := range cn.callbacks.PeerClosed { - f(cn) + p.discardPieceInclination() + p._pieceRequestOrder.Clear() + p.peerImpl.onClose() + p.t.decPeerPieceAvailability(p) + for _, f := range p.callbacks.PeerClosed { + f(p) } } @@ -764,7 +765,7 @@ func (cn *Peer) discardPieceInclination() { cn.pieceInclination = nil } -func (cn *PeerConn) peerPiecesChanged() { +func (cn *Peer) peerPiecesChanged() { if cn.t.haveInfo() { prioritiesChanged := false for i := pieceIndex(0); i < cn.t.numPieces(); i++ { @@ -776,7 +777,7 @@ func (cn *PeerConn) peerPiecesChanged() { cn.updateRequests() } } - cn.t.maybeDropMutuallyCompletePeer(&cn.Peer) + cn.t.maybeDropMutuallyCompletePeer(cn) } func (cn *PeerConn) raisePeerMinPieces(newMin pieceIndex) { @@ -793,6 +794,9 @@ func (cn *PeerConn) peerSentHave(piece pieceIndex) error { return nil } cn.raisePeerMinPieces(piece + 1) + if !cn.peerHasPiece(piece) { + cn.t.incPieceAvailability(piece) + } cn._peerPieces.Set(bitmap.BitIndex(piece), true) cn.t.maybeDropMutuallyCompletePeer(&cn.Peer) if cn.updatePiecePriority(piece) { @@ -802,20 +806,27 @@ func (cn *PeerConn) peerSentHave(piece pieceIndex) error { } func (cn *PeerConn) peerSentBitfield(bf []bool) error { - cn.peerSentHaveAll = false if len(bf)%8 != 0 { panic("expected bitfield length divisible by 8") } - // We know that the last byte means that at most the last 7 bits are - // wasted. + // We know that the last byte means that at most the last 7 bits are wasted. cn.raisePeerMinPieces(pieceIndex(len(bf) - 7)) if cn.t.haveInfo() && len(bf) > int(cn.t.numPieces()) { // Ignore known excess pieces. bf = bf[:cn.t.numPieces()] } + pp := cn.newPeerPieces() + cn.peerSentHaveAll = false for i, have := range bf { if have { cn.raisePeerMinPieces(pieceIndex(i) + 1) + if !pp.Contains(i) { + cn.t.incPieceAvailability(i) + } + } else { + if pp.Contains(i) { + cn.t.decPieceAvailability(i) + } } cn._peerPieces.Set(i, have) } @@ -823,14 +834,28 @@ func (cn *PeerConn) peerSentBitfield(bf []bool) error { return nil } -func (cn *PeerConn) onPeerSentHaveAll() error { +func (cn *Peer) onPeerHasAllPieces() { + t := cn.t + if t.haveInfo() { + pp := cn.newPeerPieces() + for i := range iter.N(t.numPieces()) { + if !pp.Contains(i) { + t.incPieceAvailability(i) + } + } + } cn.peerSentHaveAll = true cn._peerPieces.Clear() cn.peerPiecesChanged() +} + +func (cn *PeerConn) onPeerSentHaveAll() error { + cn.onPeerHasAllPieces() return nil } func (cn *PeerConn) peerSentHaveNone() error { + cn.t.decPeerPieceAvailability(&cn.Peer) cn._peerPieces.Clear() cn.peerSentHaveAll = false cn.peerPiecesChanged() @@ -1613,10 +1638,11 @@ func (cn *Peer) peerMaxRequests() int { func (cn *PeerConn) PeerPieces() bitmap.Bitmap { cn.locker().RLock() defer cn.locker().RUnlock() - return cn.peerPieces() + return cn.newPeerPieces() } -func (cn *Peer) peerPieces() bitmap.Bitmap { +// Returns a new Bitmap that includes bits for all pieces we have. +func (cn *Peer) newPeerPieces() bitmap.Bitmap { ret := cn._peerPieces.Copy() if cn.peerSentHaveAll { ret.AddRange(0, cn.t.numPieces()) diff --git a/piece.go b/piece.go index 248832d0..cee6b7d0 100644 --- a/piece.go +++ b/piece.go @@ -55,6 +55,7 @@ type Piece struct { publicPieceState PieceState priority piecePriority + availability int64 // This can be locked when the Client lock is taken, but probably not vice versa. pendingWritesMutex sync.Mutex diff --git a/request-strategy.go b/request-strategy.go index e7349aef..9369905b 100644 --- a/request-strategy.go +++ b/request-strategy.go @@ -19,7 +19,7 @@ type pieceRequestOrderPiece struct { index pieceIndex prio piecePriority partial bool - availability int + availability int64 } func (me *clientPieceRequestOrder) addPieces(t *Torrent, numPieces pieceIndex) { @@ -45,12 +45,13 @@ func (me clientPieceRequestOrder) sort() { sort.SliceStable(me.pieces, me.less) } -func (me clientPieceRequestOrder) update() { +func (me *clientPieceRequestOrder) update() { for i := range me.pieces { p := &me.pieces[i] - p.prio = p.t.piece(p.index).uncachedPriority() + tp := p.t.piece(p.index) + p.prio = tp.uncachedPriority() p.partial = p.t.piecePartiallyDownloaded(p.index) - p.availability = p.t.pieceAvailability(p.index) + p.availability = tp.availability } } @@ -61,7 +62,7 @@ func (me clientPieceRequestOrder) less(_i, _j int) bool { int(j.prio), int(i.prio), ).Bool( j.partial, i.partial, - ).Int( + ).Int64( i.availability, j.availability, ).Less() }