Coalesce piece state change notifications on client unlock

Reported by Craig Campbell <iamcraigcampbell@gmail.com>.
This commit is contained in:
Matt Joiner 2019-12-13 15:55:56 +11:00
parent 790ba6af3c
commit f448f55e88
4 changed files with 62 additions and 11 deletions

View File

@ -48,7 +48,7 @@ type Client struct {
// 64-bit alignment of fields. See #262.
stats ConnStats
_mu sync.RWMutex
_mu lockWithDeferreds
event sync.Cond
closed missinggo.Event

View File

@ -5,6 +5,7 @@ import (
"fmt"
"io"
"io/ioutil"
"log"
"os"
"path/filepath"
"reflect"
@ -288,6 +289,16 @@ type testClientTransferParams struct {
LeecherDownloadRateLimiter *rate.Limiter
}
func logPieceStateChanges(t *Torrent) {
sub := t.SubscribePieceStateChanges()
go func() {
defer sub.Close()
for e := range sub.Values {
log.Printf("%p %#v", t, e)
}
}()
}
// Creates a seeder and a leecher, and ensures the data transfers when a read
// is attempted on the leecher.
func testClientTransfer(t *testing.T, ps testClientTransferParams) {
@ -344,6 +355,10 @@ func testClientTransfer(t *testing.T, ps testClientTransferParams) {
}())
require.NoError(t, err)
assert.True(t, new)
//// This was used when observing coalescing of piece state changes.
//logPieceStateChanges(leecherTorrent)
// Now do some things with leecher and seeder.
leecherTorrent.AddClientPeer(seeder)
// The Torrent should not be interested in obtaining peers, so the one we

35
deferrwl.go Normal file
View File

@ -0,0 +1,35 @@
package torrent
import "sync"
// Runs deferred actions on Unlock. Note that actions are assumed to be the results of changes that
// would only occur with a write lock at present. The race detector should catch instances of defers
// without the write lock being held.
type lockWithDeferreds struct {
internal sync.RWMutex
unlockActions []func()
}
func (me *lockWithDeferreds) Lock() {
me.internal.Lock()
}
func (me *lockWithDeferreds) Unlock() {
for _, a := range me.unlockActions {
a()
}
me.unlockActions = me.unlockActions[:0]
me.internal.Unlock()
}
func (me *lockWithDeferreds) RLock() {
me.internal.RLock()
}
func (me *lockWithDeferreds) RUnlock() {
me.internal.RUnlock()
}
func (me *lockWithDeferreds) Defer(action func()) {
me.unlockActions = append(me.unlockActions, action)
}

View File

@ -849,15 +849,17 @@ type PieceStateChange struct {
}
func (t *Torrent) publishPieceChange(piece pieceIndex) {
cur := t.pieceState(piece)
p := &t.pieces[piece]
if cur != p.publicPieceState {
p.publicPieceState = cur
t.pieceStateChanges.Publish(PieceStateChange{
int(piece),
cur,
})
}
t.cl._mu.Defer(func() {
cur := t.pieceState(piece)
p := &t.pieces[piece]
if cur != p.publicPieceState {
p.publicPieceState = cur
t.pieceStateChanges.Publish(PieceStateChange{
int(piece),
cur,
})
}
})
}
func (t *Torrent) pieceNumPendingChunks(piece pieceIndex) pp.Integer {
@ -1682,7 +1684,6 @@ func (t *Torrent) connsAsSlice() (ret []*connection) {
return
}
// Currently doesn't really queue, but should in the future.
func (t *Torrent) queuePieceCheck(pieceIndex pieceIndex) {
piece := t.piece(pieceIndex)
if piece.queuedForHash() {