From 0e221dbdcda0300899e3ce9a3fa593251cb38273 Mon Sep 17 00:00:00 2001 From: Matt Joiner Date: Sun, 23 Oct 2016 16:33:26 +1100 Subject: [PATCH] Optimize piece priorities when reader position changes Gives a decent boost to throughput and reduces a lot of CPU when reading very quickly from Reader. --- TODO | 1 - reader.go | 28 +++++++++++++++++++++++++++- t.go | 1 + torrent.go | 13 ++++--------- 4 files changed, 32 insertions(+), 11 deletions(-) diff --git a/TODO b/TODO index 091ae4a2..05f032a2 100644 --- a/TODO +++ b/TODO @@ -13,4 +13,3 @@ * Implement BEP 40. * Rewrite tracker package to be announce-centric, rather than client. Currently the clients are private and adapted onto by the Announce() func. * Move tracker management code in the torrent package to its own file. - * Optimize Reader.posChanged, it triggers all piece priorities to be recomputed. diff --git a/reader.go b/reader.go index e50ce2bf..8dc9eefc 100644 --- a/reader.go +++ b/reader.go @@ -11,7 +11,13 @@ import ( "golang.org/x/net/context" ) -// Accesses torrent data via a client. +// Piece range by piece index, [begin, end). +type pieceRange struct { + begin, end int +} + +// Accesses Torrent data via a Client. Reads block until the data is +// available. Seeks and readahead also drive Client behaviour. type Reader struct { t *Torrent responsive bool @@ -24,6 +30,10 @@ type Reader struct { mu sync.Locker pos int64 readahead int64 + // The cached piece range this reader wants downloaded. The zero value + // corresponds to nothing. We cache this so that changes can be detected, + // and bubbled up to the Torrent only as required. + pieces pieceRange } var _ io.ReadCloser = &Reader{} @@ -92,6 +102,17 @@ func (r *Reader) waitReadable(off int64) { r.t.cl.event.Wait() } +// Calculates the pieces this reader wants downloaded, ignoring the cached +// value at r.pieces. +func (r *Reader) piecesUncached() (ret pieceRange) { + ra := r.readahead + if ra < 1 { + ra = 1 + } + ret.begin, ret.end = r.t.byteRegionPieces(r.pos, ra) + return +} + func (r *Reader) Read(b []byte) (n int, err error) { return r.ReadContext(b, context.Background()) } @@ -193,6 +214,11 @@ func (r *Reader) Close() error { } func (r *Reader) posChanged() { + p := r.piecesUncached() + if p == r.pieces { + return + } + r.pieces = p r.t.readersChanged() } diff --git a/t.go b/t.go index fcf53d5a..138770ea 100644 --- a/t.go +++ b/t.go @@ -36,6 +36,7 @@ func (t *Torrent) NewReader() (ret *Reader) { t: t, readahead: 5 * 1024 * 1024, } + ret.pieces = ret.piecesUncached() t.addReader(ret) return } diff --git a/torrent.go b/torrent.go index b9ac2f55..e458dbbd 100644 --- a/torrent.go +++ b/torrent.go @@ -872,6 +872,7 @@ func (t *Torrent) updatePiecePriorities() { } } +// Returns the range of pieces [begin, end) that contains the extent of bytes. func (t *Torrent) byteRegionPieces(off, size int64) (begin, end int) { if off >= t.length { return @@ -896,17 +897,11 @@ func (t *Torrent) byteRegionPieces(off, size int64) (begin, end int) { // callers depend on this method to enumerate readers. func (t *Torrent) forReaderOffsetPieces(f func(begin, end int) (more bool)) (all bool) { for r := range t.readers { - // r.mu.Lock() - pos, readahead := r.pos, r.readahead - // r.mu.Unlock() - if readahead < 1 { - readahead = 1 - } - begin, end := t.byteRegionPieces(pos, readahead) - if begin >= end { + p := r.pieces + if p.begin >= p.end { continue } - if !f(begin, end) { + if !f(p.begin, p.end) { return false } }