Add default sqrt readahead algorithm
This commit is contained in:
parent
4912ae2781
commit
e8b496bee6
46
reader.go
46
reader.go
|
@ -5,6 +5,7 @@ import (
|
|||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"math"
|
||||
"sync"
|
||||
|
||||
"github.com/anacrolix/log"
|
||||
|
@ -43,6 +44,10 @@ type reader struct {
|
|||
mu sync.Locker
|
||||
pos int64
|
||||
readahead int64
|
||||
// Function to dynamically calculate readahead. If nil, readahead is static.
|
||||
readaheadFunc func() int64
|
||||
// Position that reads have continued contiguously from.
|
||||
contiguousReadStartPos int64
|
||||
// The cached piece range this reader wants downloaded. The zero value corresponds to nothing.
|
||||
// We cache this so that changes can be detected, and bubbled up to the Torrent only as
|
||||
// required.
|
||||
|
@ -65,6 +70,7 @@ func (r *reader) SetNonResponsive() {
|
|||
func (r *reader) SetReadahead(readahead int64) {
|
||||
r.mu.Lock()
|
||||
r.readahead = readahead
|
||||
r.readaheadFunc = nil
|
||||
r.mu.Unlock()
|
||||
r.t.cl.lock()
|
||||
defer r.t.cl.unlock()
|
||||
|
@ -100,6 +106,9 @@ func (r *reader) available(off, max int64) (ret int64) {
|
|||
// Calculates the pieces this reader wants downloaded, ignoring the cached value at r.pieces.
|
||||
func (r *reader) piecesUncached() (ret pieceRange) {
|
||||
ra := r.readahead
|
||||
if r.readaheadFunc != nil {
|
||||
ra = r.readaheadFunc()
|
||||
}
|
||||
if ra < 1 {
|
||||
// Needs to be at least 1, because [x, x) means we don't want
|
||||
// anything.
|
||||
|
@ -249,23 +258,31 @@ func (r *reader) posChanged() {
|
|||
r.t.readerPosChanged(from, to)
|
||||
}
|
||||
|
||||
func (r *reader) Seek(off int64, whence int) (ret int64, err error) {
|
||||
func (r *reader) Seek(off int64, whence int) (newPos int64, err error) {
|
||||
r.opMu.Lock()
|
||||
defer r.opMu.Unlock()
|
||||
|
||||
r.mu.Lock()
|
||||
defer r.mu.Unlock()
|
||||
switch whence {
|
||||
case io.SeekStart:
|
||||
r.pos = off
|
||||
case io.SeekCurrent:
|
||||
r.pos += off
|
||||
case io.SeekEnd:
|
||||
r.pos = r.length + off
|
||||
default:
|
||||
err = errors.New("bad whence")
|
||||
newPos, err = func() (int64, error) {
|
||||
switch whence {
|
||||
case io.SeekStart:
|
||||
return off, err
|
||||
case io.SeekCurrent:
|
||||
return r.pos + off, nil
|
||||
case io.SeekEnd:
|
||||
return r.length + off, nil
|
||||
default:
|
||||
return r.pos, errors.New("bad whence")
|
||||
}
|
||||
}()
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
ret = r.pos
|
||||
if newPos == r.pos {
|
||||
return
|
||||
}
|
||||
r.pos = newPos
|
||||
r.contiguousReadStartPos = newPos
|
||||
|
||||
r.posChanged()
|
||||
return
|
||||
|
@ -274,3 +291,8 @@ func (r *reader) Seek(off int64, whence int) (ret int64, err error) {
|
|||
func (r *reader) log(m log.Msg) {
|
||||
r.t.logger.Log(m.Skip(1))
|
||||
}
|
||||
|
||||
// Implementation inspired from an arbitrary comment I found on HN.
|
||||
func (r *reader) sqrtReadahead() int64 {
|
||||
return int64(math.Sqrt(float64(r.pos - r.contiguousReadStartPos)))
|
||||
}
|
||||
|
|
10
t.go
10
t.go
|
@ -37,12 +37,12 @@ func (t *Torrent) NewReader() Reader {
|
|||
|
||||
func (t *Torrent) newReader(offset, length int64) Reader {
|
||||
r := reader{
|
||||
mu: t.cl.locker(),
|
||||
t: t,
|
||||
readahead: 5 * 1024 * 1024,
|
||||
offset: offset,
|
||||
length: length,
|
||||
mu: t.cl.locker(),
|
||||
t: t,
|
||||
offset: offset,
|
||||
length: length,
|
||||
}
|
||||
r.readaheadFunc = r.sqrtReadahead
|
||||
t.addReader(&r)
|
||||
return &r
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue