Rework Reader position changes affecting piece priorities

Torrent.updatePiecePriorities is always followed by Torrent.piecePriorityChanged, so move it into the former function. Also drop the separate mutex for Reader, and add a missing Reader.posChanged call.
This commit is contained in:
Matt Joiner 2016-08-30 15:41:26 +10:00
parent 6c15a030d6
commit 8b17d2a63a
4 changed files with 41 additions and 57 deletions

View File

@ -1715,10 +1715,7 @@ func (cl *Client) pieceChanged(t *Torrent, piece int) {
} else { } else {
cl.onFailedPiece(t, piece) cl.onFailedPiece(t, piece)
} }
if t.updatePiecePriority(piece) { t.updatePiecePriority(piece)
t.piecePriorityChanged(piece)
}
t.publishPieceChange(piece)
} }
func (cl *Client) verifyPiece(t *Torrent, piece int) { func (cl *Client) verifyPiece(t *Torrent, piece int) {
@ -1731,7 +1728,6 @@ func (cl *Client) verifyPiece(t *Torrent, piece int) {
p.QueuedForHash = false p.QueuedForHash = false
if t.closed.IsSet() || t.pieceComplete(piece) { if t.closed.IsSet() || t.pieceComplete(piece) {
t.updatePiecePriority(piece) t.updatePiecePriority(piece)
t.publishPieceChange(piece)
return return
} }
p.Hashing = true p.Hashing = true

View File

@ -21,7 +21,7 @@ type Reader struct {
// Required when modifying pos and readahead, or reading them without // Required when modifying pos and readahead, or reading them without
// opMu. // opMu.
mu sync.Mutex mu sync.Locker
pos int64 pos int64
readahead int64 readahead int64
} }
@ -128,6 +128,7 @@ func (r *Reader) ReadContext(b []byte, ctx context.Context) (n int, err error) {
n += n1 n += n1
r.mu.Lock() r.mu.Lock()
r.pos += int64(n1) r.pos += int64(n1)
r.posChanged()
r.mu.Unlock() r.mu.Unlock()
} }
if r.pos >= r.t.length { if r.pos >= r.t.length {
@ -192,8 +193,6 @@ func (r *Reader) Close() error {
} }
func (r *Reader) posChanged() { func (r *Reader) posChanged() {
r.t.cl.mu.Lock()
defer r.t.cl.mu.Unlock()
r.t.readersChanged() r.t.readersChanged()
} }
@ -202,6 +201,7 @@ func (r *Reader) Seek(off int64, whence int) (ret int64, err error) {
defer r.opMu.Unlock() defer r.opMu.Unlock()
r.mu.Lock() r.mu.Lock()
defer r.mu.Unlock()
switch whence { switch whence {
case os.SEEK_SET: case os.SEEK_SET:
r.pos = off r.pos = off
@ -213,7 +213,6 @@ func (r *Reader) Seek(off int64, whence int) (ret int64, err error) {
err = errors.New("bad whence") err = errors.New("bad whence")
} }
ret = r.pos ret = r.pos
r.mu.Unlock()
r.posChanged() r.posChanged()
return return

1
t.go
View File

@ -32,6 +32,7 @@ func (t *Torrent) Info() *metainfo.Info {
// the data requested is actually available. // the data requested is actually available.
func (t *Torrent) NewReader() (ret *Reader) { func (t *Torrent) NewReader() (ret *Reader) {
ret = &Reader{ ret = &Reader{
mu: &t.cl.mu,
t: t, t: t,
readahead: 5 * 1024 * 1024, readahead: 5 * 1024 * 1024,
} }

View File

@ -97,7 +97,9 @@ type Torrent struct {
// Set when .Info is obtained. // Set when .Info is obtained.
gotMetainfo missinggo.Event gotMetainfo missinggo.Event
readers map[*Reader]struct{} readers map[*Reader]struct{}
readerNowPieces bitmap.Bitmap
readerReadaheadPieces bitmap.Bitmap
// The indexes of pieces we want with normal priority, that aren't // The indexes of pieces we want with normal priority, that aren't
// currently available. // currently available.
@ -824,6 +826,7 @@ func (t *Torrent) forUrgentPieces(f func(piece int) (again bool)) (all bool) {
} }
func (t *Torrent) readersChanged() { func (t *Torrent) readersChanged() {
t.readerNowPieces, t.readerReadaheadPieces = t.readerPiecePriorities()
t.updatePiecePriorities() t.updatePiecePriorities()
} }
@ -841,42 +844,21 @@ func (t *Torrent) piecePriorityChanged(piece int) {
t.publishPieceChange(piece) t.publishPieceChange(piece)
} }
func (t *Torrent) updatePiecePriority(piece int) bool { func (t *Torrent) updatePiecePriority(piece int) {
p := &t.pieces[piece] p := &t.pieces[piece]
newPrio := t.piecePriorityUncached(piece) newPrio := t.piecePriorityUncached(piece)
if newPrio == p.priority { if newPrio == p.priority {
return false return
} }
p.priority = newPrio p.priority = newPrio
return true t.piecePriorityChanged(piece)
} }
// Update all piece priorities in one hit. This function should have the same // Update all piece priorities in one hit. This function should have the same
// output as updatePiecePriority, but across all pieces. // output as updatePiecePriority, but across all pieces.
func (t *Torrent) updatePiecePriorities() { func (t *Torrent) updatePiecePriorities() {
newPrios := make([]piecePriority, t.numPieces()) for i := range t.pieces {
t.pendingPieces.IterTyped(func(piece int) (more bool) { t.updatePiecePriority(i)
newPrios[piece] = PiecePriorityNormal
return true
})
t.forReaderOffsetPieces(func(begin, end int) (next bool) {
if begin < end {
newPrios[begin].Raise(PiecePriorityNow)
}
for i := begin + 1; i < end; i++ {
newPrios[i].Raise(PiecePriorityReadahead)
}
return true
})
t.completedPieces.IterTyped(func(piece int) (more bool) {
newPrios[piece] = PiecePriorityNone
return true
})
for i, prio := range newPrios {
if prio != t.pieces[i].priority {
t.pieces[i].priority = prio
t.piecePriorityChanged(i)
}
} }
} }
@ -904,9 +886,9 @@ func (t *Torrent) byteRegionPieces(off, size int64) (begin, end int) {
// callers depend on this method to enumerate readers. // callers depend on this method to enumerate readers.
func (t *Torrent) forReaderOffsetPieces(f func(begin, end int) (more bool)) (all bool) { func (t *Torrent) forReaderOffsetPieces(f func(begin, end int) (more bool)) (all bool) {
for r := range t.readers { for r := range t.readers {
r.mu.Lock() // r.mu.Lock()
pos, readahead := r.pos, r.readahead pos, readahead := r.pos, r.readahead
r.mu.Unlock() // r.mu.Unlock()
if readahead < 1 { if readahead < 1 {
readahead = 1 readahead = 1
} }
@ -928,25 +910,23 @@ func (t *Torrent) piecePriority(piece int) piecePriority {
return t.pieces[piece].priority return t.pieces[piece].priority
} }
func (t *Torrent) piecePriorityUncached(piece int) (ret piecePriority) { func (t *Torrent) piecePriorityUncached(piece int) piecePriority {
ret = PiecePriorityNone
if t.pieceComplete(piece) { if t.pieceComplete(piece) {
return return PiecePriorityNone
}
if t.readerNowPieces.Contains(piece) {
return PiecePriorityNow
}
// if t.readerNowPieces.Contains(piece - 1) {
// return PiecePriorityNext
// }
if t.readerReadaheadPieces.Contains(piece) {
return PiecePriorityReadahead
} }
if t.pendingPieces.Contains(piece) { if t.pendingPieces.Contains(piece) {
ret = PiecePriorityNormal return PiecePriorityNormal
} }
raiseRet := ret.Raise return PiecePriorityNone
t.forReaderOffsetPieces(func(begin, end int) (again bool) {
if piece == begin {
raiseRet(PiecePriorityNow)
}
if begin <= piece && piece < end {
raiseRet(PiecePriorityReadahead)
}
return true
})
return
} }
func (t *Torrent) pendPiece(piece int) { func (t *Torrent) pendPiece(piece int) {
@ -957,10 +937,7 @@ func (t *Torrent) pendPiece(piece int) {
return return
} }
t.pendingPieces.Add(piece) t.pendingPieces.Add(piece)
if !t.updatePiecePriority(piece) { t.updatePiecePriority(piece)
return
}
t.piecePriorityChanged(piece)
} }
func (t *Torrent) getCompletedPieces() (ret bitmap.Bitmap) { func (t *Torrent) getCompletedPieces() (ret bitmap.Bitmap) {
@ -1075,6 +1052,17 @@ func (t *Torrent) readerPieces() (ret bitmap.Bitmap) {
return return
} }
func (t *Torrent) readerPiecePriorities() (now, readahead bitmap.Bitmap) {
t.forReaderOffsetPieces(func(begin, end int) bool {
if end > begin {
now.Add(begin)
readahead.AddRange(begin+1, end)
}
return true
})
return
}
func (t *Torrent) needData() bool { func (t *Torrent) needData() bool {
if t.closed.IsSet() { if t.closed.IsSet() {
return false return false