From 8b17d2a63a5bf638f9427216a91e490e2954d021 Mon Sep 17 00:00:00 2001 From: Matt Joiner Date: Tue, 30 Aug 2016 15:41:26 +1000 Subject: [PATCH] Rework Reader position changes affecting piece priorities Torrent.updatePiecePriorities is always followed by Torrent.piecePriorityChanged, so move it into the former function. Also drop the separate mutex for Reader, and add a missing Reader.posChanged call. --- client.go | 6 +--- reader.go | 7 ++--- t.go | 1 + torrent.go | 84 +++++++++++++++++++++++------------------------------- 4 files changed, 41 insertions(+), 57 deletions(-) diff --git a/client.go b/client.go index cae1ee7a..4de098f3 100644 --- a/client.go +++ b/client.go @@ -1715,10 +1715,7 @@ func (cl *Client) pieceChanged(t *Torrent, piece int) { } else { cl.onFailedPiece(t, piece) } - if t.updatePiecePriority(piece) { - t.piecePriorityChanged(piece) - } - t.publishPieceChange(piece) + t.updatePiecePriority(piece) } func (cl *Client) verifyPiece(t *Torrent, piece int) { @@ -1731,7 +1728,6 @@ func (cl *Client) verifyPiece(t *Torrent, piece int) { p.QueuedForHash = false if t.closed.IsSet() || t.pieceComplete(piece) { t.updatePiecePriority(piece) - t.publishPieceChange(piece) return } p.Hashing = true diff --git a/reader.go b/reader.go index a4178179..e50ce2bf 100644 --- a/reader.go +++ b/reader.go @@ -21,7 +21,7 @@ type Reader struct { // Required when modifying pos and readahead, or reading them without // opMu. - mu sync.Mutex + mu sync.Locker pos int64 readahead int64 } @@ -128,6 +128,7 @@ func (r *Reader) ReadContext(b []byte, ctx context.Context) (n int, err error) { n += n1 r.mu.Lock() r.pos += int64(n1) + r.posChanged() r.mu.Unlock() } if r.pos >= r.t.length { @@ -192,8 +193,6 @@ func (r *Reader) Close() error { } func (r *Reader) posChanged() { - r.t.cl.mu.Lock() - defer r.t.cl.mu.Unlock() r.t.readersChanged() } @@ -202,6 +201,7 @@ func (r *Reader) Seek(off int64, whence int) (ret int64, err error) { defer r.opMu.Unlock() r.mu.Lock() + defer r.mu.Unlock() switch whence { case os.SEEK_SET: r.pos = off @@ -213,7 +213,6 @@ func (r *Reader) Seek(off int64, whence int) (ret int64, err error) { err = errors.New("bad whence") } ret = r.pos - r.mu.Unlock() r.posChanged() return diff --git a/t.go b/t.go index 2b985333..fcf53d5a 100644 --- a/t.go +++ b/t.go @@ -32,6 +32,7 @@ func (t *Torrent) Info() *metainfo.Info { // the data requested is actually available. func (t *Torrent) NewReader() (ret *Reader) { ret = &Reader{ + mu: &t.cl.mu, t: t, readahead: 5 * 1024 * 1024, } diff --git a/torrent.go b/torrent.go index c5f90f08..5c592f42 100644 --- a/torrent.go +++ b/torrent.go @@ -97,7 +97,9 @@ type Torrent struct { // Set when .Info is obtained. gotMetainfo missinggo.Event - readers map[*Reader]struct{} + readers map[*Reader]struct{} + readerNowPieces bitmap.Bitmap + readerReadaheadPieces bitmap.Bitmap // The indexes of pieces we want with normal priority, that aren't // currently available. @@ -824,6 +826,7 @@ func (t *Torrent) forUrgentPieces(f func(piece int) (again bool)) (all bool) { } func (t *Torrent) readersChanged() { + t.readerNowPieces, t.readerReadaheadPieces = t.readerPiecePriorities() t.updatePiecePriorities() } @@ -841,42 +844,21 @@ func (t *Torrent) piecePriorityChanged(piece int) { t.publishPieceChange(piece) } -func (t *Torrent) updatePiecePriority(piece int) bool { +func (t *Torrent) updatePiecePriority(piece int) { p := &t.pieces[piece] newPrio := t.piecePriorityUncached(piece) if newPrio == p.priority { - return false + return } p.priority = newPrio - return true + t.piecePriorityChanged(piece) } // Update all piece priorities in one hit. This function should have the same // output as updatePiecePriority, but across all pieces. func (t *Torrent) updatePiecePriorities() { - newPrios := make([]piecePriority, t.numPieces()) - t.pendingPieces.IterTyped(func(piece int) (more bool) { - newPrios[piece] = PiecePriorityNormal - return true - }) - t.forReaderOffsetPieces(func(begin, end int) (next bool) { - if begin < end { - newPrios[begin].Raise(PiecePriorityNow) - } - for i := begin + 1; i < end; i++ { - newPrios[i].Raise(PiecePriorityReadahead) - } - return true - }) - t.completedPieces.IterTyped(func(piece int) (more bool) { - newPrios[piece] = PiecePriorityNone - return true - }) - for i, prio := range newPrios { - if prio != t.pieces[i].priority { - t.pieces[i].priority = prio - t.piecePriorityChanged(i) - } + for i := range t.pieces { + t.updatePiecePriority(i) } } @@ -904,9 +886,9 @@ func (t *Torrent) byteRegionPieces(off, size int64) (begin, end int) { // callers depend on this method to enumerate readers. func (t *Torrent) forReaderOffsetPieces(f func(begin, end int) (more bool)) (all bool) { for r := range t.readers { - r.mu.Lock() + // r.mu.Lock() pos, readahead := r.pos, r.readahead - r.mu.Unlock() + // r.mu.Unlock() if readahead < 1 { readahead = 1 } @@ -928,25 +910,23 @@ func (t *Torrent) piecePriority(piece int) piecePriority { return t.pieces[piece].priority } -func (t *Torrent) piecePriorityUncached(piece int) (ret piecePriority) { - ret = PiecePriorityNone +func (t *Torrent) piecePriorityUncached(piece int) piecePriority { if t.pieceComplete(piece) { - return + return PiecePriorityNone + } + if t.readerNowPieces.Contains(piece) { + return PiecePriorityNow + } + // if t.readerNowPieces.Contains(piece - 1) { + // return PiecePriorityNext + // } + if t.readerReadaheadPieces.Contains(piece) { + return PiecePriorityReadahead } if t.pendingPieces.Contains(piece) { - ret = PiecePriorityNormal + return PiecePriorityNormal } - raiseRet := ret.Raise - t.forReaderOffsetPieces(func(begin, end int) (again bool) { - if piece == begin { - raiseRet(PiecePriorityNow) - } - if begin <= piece && piece < end { - raiseRet(PiecePriorityReadahead) - } - return true - }) - return + return PiecePriorityNone } func (t *Torrent) pendPiece(piece int) { @@ -957,10 +937,7 @@ func (t *Torrent) pendPiece(piece int) { return } t.pendingPieces.Add(piece) - if !t.updatePiecePriority(piece) { - return - } - t.piecePriorityChanged(piece) + t.updatePiecePriority(piece) } func (t *Torrent) getCompletedPieces() (ret bitmap.Bitmap) { @@ -1075,6 +1052,17 @@ func (t *Torrent) readerPieces() (ret bitmap.Bitmap) { return } +func (t *Torrent) readerPiecePriorities() (now, readahead bitmap.Bitmap) { + t.forReaderOffsetPieces(func(begin, end int) bool { + if end > begin { + now.Add(begin) + readahead.AddRange(begin+1, end) + } + return true + }) + return +} + func (t *Torrent) needData() bool { if t.closed.IsSet() { return false