Rework the Torrent Reader interface, to allow reader options, and add "responsive" as one such option
Had several weeks of testing. Removes a lot of the "helper" reading methods, but this was necessary to allow per-Torrent reading options.
This commit is contained in:
parent
ade6087b2f
commit
28b299e7c8
15
TODO
15
TODO
|
@ -1,7 +1,10 @@
|
|||
* Track upload and download data.
|
||||
* Emulate a UDP server in the UDP tracker tests.
|
||||
* Make use of sparse file regions in download data for faster hashing.
|
||||
* If we're choked and interested, we never send not-interested if there's nothing we want?
|
||||
* Don't announce torrents that don't need active peers. It spams UDP, fills memory, and publicizes what we have loaded.
|
||||
* Randomize triedAddrs bloom filter to allow different Addr sets on each Announce.
|
||||
* When lots of good connections, it'll do a huge readahead, then refuse to trickle new pieces because we sent not interested to them all, thereby reducing the number of unchoked connections.
|
||||
* Emulate a UDP server in the UDP tracker tests rather than communicating with the Internet.
|
||||
* Make use of sparse file regions in download data for faster hashing. This is available as whence 3 and 4 on some OS?
|
||||
* When we're choked and interested, are we not interested if there's no longer anything that we want?
|
||||
* dht: Randomize triedAddrs bloom filter to allow different Addr sets on each Announce.
|
||||
* dht: Verify that the triedAddrs bloom filter is working well, github's willf made a bunch of changes.
|
||||
* Rearrange the local-peer choked/interested status flags to be more natural to read.
|
||||
* Check that pruning is working correctly. worstConns sorting might need an adjustment to how it factors in the good/unwanted chunks ratio.
|
||||
* data/blob: Deleting incomplete data triggers io.ErrUnexpectedEOF that isn't recovered from.
|
||||
* Responsive reader needs to apply some readahead.
|
271
client.go
271
client.go
|
@ -257,80 +257,22 @@ func (cl *Client) WriteStatus(_w io.Writer) {
|
|||
}
|
||||
}
|
||||
|
||||
// Read torrent data at the given offset. Will block until it is available.
|
||||
func (cl *Client) torrentReadAt(t *torrent, off int64, p []byte) (n int, err error) {
|
||||
cl.mu.Lock()
|
||||
defer cl.mu.Unlock()
|
||||
index := int(off / int64(t.usualPieceSize()))
|
||||
// Reading outside the bounds of a file is an error.
|
||||
if index < 0 {
|
||||
err = os.ErrInvalid
|
||||
return
|
||||
}
|
||||
if int(index) >= len(t.Pieces) {
|
||||
err = io.EOF
|
||||
return
|
||||
}
|
||||
pieceOff := pp.Integer(off % int64(t.usualPieceSize()))
|
||||
pieceLeft := int(t.pieceLength(index) - pieceOff)
|
||||
if pieceLeft <= 0 {
|
||||
err = io.EOF
|
||||
return
|
||||
}
|
||||
if len(p) > pieceLeft {
|
||||
p = p[:pieceLeft]
|
||||
}
|
||||
if len(p) == 0 {
|
||||
panic(len(p))
|
||||
}
|
||||
// TODO: ReadAt should always try to fill the buffer.
|
||||
for {
|
||||
avail := cl.prepareRead(t, off)
|
||||
if avail < int64(len(p)) {
|
||||
p = p[:avail]
|
||||
}
|
||||
n, err = dataReadAt(t.data, p, off)
|
||||
if n != 0 || err != io.ErrUnexpectedEOF {
|
||||
break
|
||||
}
|
||||
// If we reach here, the data we thought was ready, isn't. So we
|
||||
// prepare it again, and retry.
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// Sets priorities to download from the given offset. Returns when the piece
|
||||
// at the given offset can be read. Returns the number of bytes that are
|
||||
// immediately available from the offset.
|
||||
func (cl *Client) prepareRead(t *torrent, off int64) (n int64) {
|
||||
index := int(off / int64(t.usualPieceSize()))
|
||||
// Reading outside the bounds of a file is an error.
|
||||
if index < 0 || index >= t.numPieces() {
|
||||
return
|
||||
}
|
||||
piece := t.Pieces[index]
|
||||
cl.readRaisePiecePriorities(t, off)
|
||||
for !t.pieceComplete(index) && !t.isClosed() {
|
||||
// This is to prevent being starved if a piece is dropped before we
|
||||
// can read it.
|
||||
cl.readRaisePiecePriorities(t, off)
|
||||
piece.Event.Wait()
|
||||
}
|
||||
return t.Info.Piece(index).Length() - off%t.Info.PieceLength
|
||||
}
|
||||
|
||||
func (T Torrent) prepareRead(off int64) (avail int64) {
|
||||
T.cl.mu.Lock()
|
||||
defer T.cl.mu.Unlock()
|
||||
return T.cl.prepareRead(T.torrent, off)
|
||||
}
|
||||
|
||||
// Data implements a streaming interface that's more efficient than ReadAt.
|
||||
// A Data that implements this has a streaming interface that should be
|
||||
// preferred over ReadAt. For example, the data is stored in blocks on the
|
||||
// network and have a fixed cost to open.
|
||||
type SectionOpener interface {
|
||||
// Open a ReadCloser at the given offset into torrent data. n is how many
|
||||
// bytes we intend to read.
|
||||
OpenSection(off, n int64) (io.ReadCloser, error)
|
||||
}
|
||||
|
||||
func dataReadAt(d data.Data, b []byte, off int64) (n int, err error) {
|
||||
// defer func() {
|
||||
// if err == io.ErrUnexpectedEOF && n != 0 {
|
||||
// err = nil
|
||||
// }
|
||||
// }()
|
||||
// log.Println("data read at", len(b), off)
|
||||
again:
|
||||
if ra, ok := d.(io.ReaderAt); ok {
|
||||
return ra.ReadAt(b, off)
|
||||
|
@ -357,7 +299,7 @@ func readaheadPieces(readahead, pieceLength int64) int {
|
|||
return int((readahead+pieceLength-1)/pieceLength - 1)
|
||||
}
|
||||
|
||||
func (cl *Client) readRaisePiecePriorities(t *torrent, off int64) {
|
||||
func (cl *Client) readRaisePiecePriorities(t *torrent, off, readaheadBytes int64) {
|
||||
index := int(off / int64(t.usualPieceSize()))
|
||||
cl.raisePiecePriority(t, index, piecePriorityNow)
|
||||
index++
|
||||
|
@ -365,7 +307,7 @@ func (cl *Client) readRaisePiecePriorities(t *torrent, off int64) {
|
|||
return
|
||||
}
|
||||
cl.raisePiecePriority(t, index, piecePriorityNext)
|
||||
for range iter.N(readaheadPieces(5*1024*1024, t.Info.PieceLength)) {
|
||||
for range iter.N(readaheadPieces(readaheadBytes, t.Info.PieceLength)) {
|
||||
index++
|
||||
if index >= t.numPieces() {
|
||||
break
|
||||
|
@ -374,6 +316,30 @@ func (cl *Client) readRaisePiecePriorities(t *torrent, off int64) {
|
|||
}
|
||||
}
|
||||
|
||||
func (cl *Client) addUrgentRequests(t *torrent, off int64, n int) {
|
||||
for n > 0 {
|
||||
req, ok := t.offsetRequest(off)
|
||||
if !ok {
|
||||
break
|
||||
}
|
||||
if _, ok := t.urgent[req]; !ok && !t.haveChunk(req) {
|
||||
if t.urgent == nil {
|
||||
t.urgent = make(map[request]struct{}, (n+chunkSize-1)/chunkSize)
|
||||
}
|
||||
t.urgent[req] = struct{}{}
|
||||
cl.event.Broadcast() // Why?
|
||||
index := int(req.Index)
|
||||
cl.queueFirstHash(t, index)
|
||||
cl.pieceChanged(t, index)
|
||||
}
|
||||
reqOff := t.requestOffset(req)
|
||||
n1 := req.Length - pp.Integer(off-reqOff)
|
||||
off += int64(n1)
|
||||
n -= int(n1)
|
||||
}
|
||||
// log.Print(t.urgent)
|
||||
}
|
||||
|
||||
func (cl *Client) configDir() string {
|
||||
if cl._configDir == "" {
|
||||
return filepath.Join(os.Getenv("HOME"), ".config/torrent")
|
||||
|
@ -582,12 +548,12 @@ func NewClient(cfg *Config) (cl *Client, err error) {
|
|||
dhtCfg.Conn = cl.utpSock.PacketConn()
|
||||
}
|
||||
cl.dHT, err = dht.NewServer(dhtCfg)
|
||||
if cl.ipBlockList != nil {
|
||||
cl.dHT.SetIPBlockList(cl.ipBlockList)
|
||||
}
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
if cl.ipBlockList != nil {
|
||||
cl.dHT.SetIPBlockList(cl.ipBlockList)
|
||||
}
|
||||
}
|
||||
|
||||
return
|
||||
|
@ -1894,6 +1860,9 @@ func (cl *Client) setStorage(t *torrent, td data.Data) (err error) {
|
|||
if err != nil {
|
||||
return
|
||||
}
|
||||
for index := range iter.N(t.numPieces()) {
|
||||
cl.pieceChanged(t, index)
|
||||
}
|
||||
cl.startTorrent(t)
|
||||
return
|
||||
}
|
||||
|
@ -1990,12 +1959,6 @@ func (t *torrent) addTrackers(announceList [][]string) {
|
|||
t.Trackers = newTrackers
|
||||
}
|
||||
|
||||
// A handle to a live torrent within a Client.
|
||||
type Torrent struct {
|
||||
cl *Client
|
||||
*torrent
|
||||
}
|
||||
|
||||
// Don't call this before the info is available.
|
||||
func (t *torrent) BytesCompleted() int64 {
|
||||
if !t.haveInfo() {
|
||||
|
@ -2014,23 +1977,6 @@ func (t Torrent) Drop() {
|
|||
t.cl.mu.Unlock()
|
||||
}
|
||||
|
||||
// Provides access to regions of torrent data that correspond to its files.
|
||||
type File struct {
|
||||
t Torrent
|
||||
path string
|
||||
offset int64
|
||||
length int64
|
||||
fi metainfo.FileInfo
|
||||
}
|
||||
|
||||
func (f File) FileInfo() metainfo.FileInfo {
|
||||
return f.fi
|
||||
}
|
||||
|
||||
func (f File) Path() string {
|
||||
return f.path
|
||||
}
|
||||
|
||||
// A file-like handle to some torrent data resource.
|
||||
type Handle interface {
|
||||
io.Reader
|
||||
|
@ -2039,114 +1985,6 @@ type Handle interface {
|
|||
io.ReaderAt
|
||||
}
|
||||
|
||||
// Implements a Handle within a subsection of another Handle.
|
||||
type sectionHandle struct {
|
||||
h Handle
|
||||
off, n, cur int64
|
||||
}
|
||||
|
||||
func (me *sectionHandle) Seek(offset int64, whence int) (ret int64, err error) {
|
||||
if whence == 0 {
|
||||
offset += me.off
|
||||
} else if whence == 2 {
|
||||
whence = 0
|
||||
offset += me.off + me.n
|
||||
}
|
||||
ret, err = me.h.Seek(offset, whence)
|
||||
me.cur = ret
|
||||
ret -= me.off
|
||||
return
|
||||
}
|
||||
|
||||
func (me *sectionHandle) Close() error {
|
||||
return me.h.Close()
|
||||
}
|
||||
|
||||
func (me *sectionHandle) Read(b []byte) (n int, err error) {
|
||||
max := me.off + me.n - me.cur
|
||||
if int64(len(b)) > max {
|
||||
b = b[:max]
|
||||
}
|
||||
n, err = me.h.Read(b)
|
||||
me.cur += int64(n)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
if me.cur == me.off+me.n {
|
||||
err = io.EOF
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (me *sectionHandle) ReadAt(b []byte, off int64) (n int, err error) {
|
||||
if off >= me.n {
|
||||
err = io.EOF
|
||||
return
|
||||
}
|
||||
if int64(len(b)) >= me.n-off {
|
||||
b = b[:me.n-off]
|
||||
}
|
||||
return me.h.ReadAt(b, me.off+off)
|
||||
}
|
||||
|
||||
func (f File) Open() (h Handle, err error) {
|
||||
h = f.t.NewReadHandle()
|
||||
_, err = h.Seek(f.offset, os.SEEK_SET)
|
||||
if err != nil {
|
||||
h.Close()
|
||||
return
|
||||
}
|
||||
h = §ionHandle{h, f.offset, f.Length(), f.offset}
|
||||
return
|
||||
}
|
||||
|
||||
func (f File) ReadAt(p []byte, off int64) (n int, err error) {
|
||||
maxLen := f.length - off
|
||||
if int64(len(p)) > maxLen {
|
||||
p = p[:maxLen]
|
||||
}
|
||||
return f.t.ReadAt(p, off+f.offset)
|
||||
}
|
||||
|
||||
func (f *File) Length() int64 {
|
||||
return f.length
|
||||
}
|
||||
|
||||
type FilePieceState struct {
|
||||
Length int64
|
||||
State byte
|
||||
}
|
||||
|
||||
func (f *File) Progress() (ret []FilePieceState) {
|
||||
pieceSize := int64(f.t.usualPieceSize())
|
||||
off := f.offset % pieceSize
|
||||
remaining := f.length
|
||||
for i := int(f.offset / pieceSize); ; i++ {
|
||||
if remaining == 0 {
|
||||
break
|
||||
}
|
||||
len1 := pieceSize - off
|
||||
if len1 > remaining {
|
||||
len1 = remaining
|
||||
}
|
||||
ret = append(ret, FilePieceState{len1, f.t.pieceStatusChar(i)})
|
||||
off = 0
|
||||
remaining -= len1
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (f *File) PrioritizeRegion(off, len int64) {
|
||||
if off < 0 || off >= f.length {
|
||||
return
|
||||
}
|
||||
if off+len > f.length {
|
||||
len = f.length - off
|
||||
}
|
||||
off += f.offset
|
||||
f.t.SetRegionPriority(off, len)
|
||||
}
|
||||
|
||||
// Returns handles to the files in the torrent. This requires the metainfo is
|
||||
// available first.
|
||||
func (t Torrent) Files() (ret []File) {
|
||||
|
@ -2200,10 +2038,6 @@ func (t Torrent) DownloadAll() {
|
|||
t.cl.raisePiecePriority(t.torrent, t.numPieces()-1, piecePriorityReadahead)
|
||||
}
|
||||
|
||||
func (me Torrent) ReadAt(p []byte, off int64) (n int, err error) {
|
||||
return me.cl.torrentReadAt(me.torrent, off, p)
|
||||
}
|
||||
|
||||
// Returns nil metainfo if it isn't in the cache. Checks that the retrieved
|
||||
// metainfo has the correct infohash.
|
||||
func (cl *Client) torrentCacheMetaInfo(ih InfoHash) (mi *metainfo.MetaInfo, err error) {
|
||||
|
@ -2612,11 +2446,17 @@ func (me *Client) fillRequests(t *torrent, c *connection) {
|
|||
}
|
||||
}
|
||||
addRequest := func(req request) (again bool) {
|
||||
// TODO: Couldn't this check also be done *after* the request?
|
||||
if len(c.Requests) >= 64 {
|
||||
return false
|
||||
}
|
||||
return c.Request(req)
|
||||
}
|
||||
for req := range t.urgent {
|
||||
if !addRequest(req) {
|
||||
return
|
||||
}
|
||||
}
|
||||
for e := c.pieceRequestOrder.First(); e != nil; e = e.Next() {
|
||||
pieceIndex := e.Piece()
|
||||
if !c.PeerHasPiece(pieceIndex) {
|
||||
|
@ -2664,7 +2504,7 @@ func (me *Client) downloadedChunk(t *torrent, c *connection, msg *pp.Message) er
|
|||
piece := t.Pieces[req.Index]
|
||||
|
||||
// Do we actually want this chunk?
|
||||
if _, ok := piece.PendingChunkSpecs[req.chunkSpec]; !ok || piece.Priority == piecePriorityNone {
|
||||
if !t.wantChunk(req) {
|
||||
unusedDownloadedChunksCount.Add(1)
|
||||
c.UnwantedChunksReceived++
|
||||
return nil
|
||||
|
@ -2679,8 +2519,11 @@ func (me *Client) downloadedChunk(t *torrent, c *connection, msg *pp.Message) er
|
|||
return fmt.Errorf("error writing chunk: %s", err)
|
||||
}
|
||||
|
||||
// log.Println("got chunk", req)
|
||||
piece.Event.Broadcast()
|
||||
// Record that we have the chunk.
|
||||
delete(piece.PendingChunkSpecs, req.chunkSpec)
|
||||
delete(t.urgent, req)
|
||||
if len(piece.PendingChunkSpecs) == 0 {
|
||||
for _, c := range t.Conns {
|
||||
c.pieceRequestOrder.DeletePiece(int(req.Index))
|
||||
|
@ -2717,18 +2560,24 @@ func (me *Client) pieceHashed(t *torrent, piece pp.Integer, correct bool) {
|
|||
me.pieceChanged(t, int(piece))
|
||||
}
|
||||
|
||||
// TODO: Check this isn't called more than once for each piece being correct.
|
||||
func (me *Client) pieceChanged(t *torrent, piece int) {
|
||||
correct := t.pieceComplete(piece)
|
||||
p := t.Pieces[piece]
|
||||
if correct {
|
||||
p.Priority = piecePriorityNone
|
||||
p.PendingChunkSpecs = nil
|
||||
for req := range t.urgent {
|
||||
if int(req.Index) == piece {
|
||||
delete(t.urgent, req)
|
||||
}
|
||||
}
|
||||
p.Event.Broadcast()
|
||||
} else {
|
||||
if len(p.PendingChunkSpecs) == 0 {
|
||||
t.pendAllChunkSpecs(int(piece))
|
||||
}
|
||||
if p.Priority != piecePriorityNone {
|
||||
if t.wantPiece(piece) {
|
||||
me.openNewConns(t)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -3,7 +3,6 @@ package torrent
|
|||
import (
|
||||
"encoding/binary"
|
||||
"fmt"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"log"
|
||||
"net"
|
||||
|
@ -273,9 +272,11 @@ func TestClientTransfer(t *testing.T) {
|
|||
Port: util.AddrPort(seeder.ListenAddr()),
|
||||
},
|
||||
})
|
||||
_greeting, err := ioutil.ReadAll(io.NewSectionReader(leecherGreeting, 0, leecherGreeting.Length()))
|
||||
r := leecherGreeting.NewReader()
|
||||
defer r.Close()
|
||||
_greeting, err := ioutil.ReadAll(r)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
t.Fatalf("%q %s", string(_greeting), err)
|
||||
}
|
||||
greeting := string(_greeting)
|
||||
if greeting != testutil.GreetingFileContents {
|
||||
|
|
|
@ -106,8 +106,13 @@ func (cn *connection) pendPiece(piece int, priority piecePriority) {
|
|||
return
|
||||
}
|
||||
pp := cn.piecePriorities[piece]
|
||||
// Priority goes to Now, then Next in connection order. Then Readahead in
|
||||
// by piece index. Then normal again by connection order.
|
||||
// Priority regions not to scale. Within each region, piece is randomized
|
||||
// according to connection.
|
||||
|
||||
// [ Now ]
|
||||
// [ Next ]
|
||||
// [ Readahead ]
|
||||
// [ Normal ]
|
||||
key := func() int {
|
||||
switch priority {
|
||||
case piecePriorityNow:
|
||||
|
|
|
@ -3,6 +3,7 @@ package blob
|
|||
import (
|
||||
"encoding/hex"
|
||||
"io"
|
||||
"log"
|
||||
|
||||
"github.com/anacrolix/libtorgo/metainfo"
|
||||
)
|
||||
|
@ -19,16 +20,36 @@ func (me *data) pieceHashHex(i int) string {
|
|||
func (me *data) Close() {}
|
||||
|
||||
func (me *data) ReadAt(b []byte, off int64) (n int, err error) {
|
||||
p := me.info.Piece(int(off / me.info.PieceLength))
|
||||
f := me.store.pieceRead(p)
|
||||
if f == nil {
|
||||
err = io.ErrUnexpectedEOF
|
||||
return
|
||||
}
|
||||
defer f.Close()
|
||||
n, err = f.ReadAt(b, off%me.info.PieceLength)
|
||||
if err == io.EOF {
|
||||
err = io.ErrUnexpectedEOF
|
||||
for len(b) != 0 {
|
||||
if off >= me.info.TotalLength() {
|
||||
err = io.EOF
|
||||
break
|
||||
}
|
||||
p := me.info.Piece(int(off / me.info.PieceLength))
|
||||
f := me.store.pieceRead(p)
|
||||
if f == nil {
|
||||
log.Println("piece not found", p)
|
||||
err = io.ErrUnexpectedEOF
|
||||
break
|
||||
}
|
||||
b1 := b
|
||||
maxN1 := int(p.Length() - off%me.info.PieceLength)
|
||||
if len(b1) > maxN1 {
|
||||
b1 = b1[:maxN1]
|
||||
}
|
||||
var n1 int
|
||||
n1, err = f.ReadAt(b1, off%me.info.PieceLength)
|
||||
f.Close()
|
||||
n += n1
|
||||
off += int64(n1)
|
||||
b = b[n1:]
|
||||
if err == io.EOF {
|
||||
err = nil
|
||||
break
|
||||
}
|
||||
if err != nil {
|
||||
break
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
|
|
@ -0,0 +1,64 @@
|
|||
package torrent
|
||||
|
||||
import "github.com/anacrolix/libtorgo/metainfo"
|
||||
|
||||
// Provides access to regions of torrent data that correspond to its files.
|
||||
type File struct {
|
||||
t Torrent
|
||||
path string
|
||||
offset int64
|
||||
length int64
|
||||
fi metainfo.FileInfo
|
||||
}
|
||||
|
||||
// Data for this file begins this far into the torrent.
|
||||
func (f *File) Offset() int64 {
|
||||
return f.offset
|
||||
}
|
||||
|
||||
func (f File) FileInfo() metainfo.FileInfo {
|
||||
return f.fi
|
||||
}
|
||||
|
||||
func (f File) Path() string {
|
||||
return f.path
|
||||
}
|
||||
|
||||
func (f *File) Length() int64 {
|
||||
return f.length
|
||||
}
|
||||
|
||||
type FilePieceState struct {
|
||||
Length int64
|
||||
State byte
|
||||
}
|
||||
|
||||
func (f *File) Progress() (ret []FilePieceState) {
|
||||
pieceSize := int64(f.t.usualPieceSize())
|
||||
off := f.offset % pieceSize
|
||||
remaining := f.length
|
||||
for i := int(f.offset / pieceSize); ; i++ {
|
||||
if remaining == 0 {
|
||||
break
|
||||
}
|
||||
len1 := pieceSize - off
|
||||
if len1 > remaining {
|
||||
len1 = remaining
|
||||
}
|
||||
ret = append(ret, FilePieceState{len1, f.t.pieceStatusChar(i)})
|
||||
off = 0
|
||||
remaining -= len1
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (f *File) PrioritizeRegion(off, len int64) {
|
||||
if off < 0 || off >= f.length {
|
||||
return
|
||||
}
|
||||
if off+len > f.length {
|
||||
len = f.length - off
|
||||
}
|
||||
off += f.offset
|
||||
f.t.SetRegionPriority(off, len)
|
||||
}
|
|
@ -91,7 +91,10 @@ func blockingRead(ctx context.Context, fs *TorrentFS, t torrent.Torrent, off int
|
|||
)
|
||||
readDone := make(chan struct{})
|
||||
go func() {
|
||||
_n, _err = t.ReadAt(p, off)
|
||||
r := t.NewReader()
|
||||
defer r.Close()
|
||||
_n, _err = r.ReadAt(p, off)
|
||||
log.Println(_n, p)
|
||||
close(readDone)
|
||||
}()
|
||||
select {
|
||||
|
|
|
@ -0,0 +1,132 @@
|
|||
package torrent
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"io"
|
||||
"os"
|
||||
)
|
||||
|
||||
// Accesses torrent data via a client.
|
||||
type Reader struct {
|
||||
t *Torrent
|
||||
pos int64
|
||||
responsive bool
|
||||
readahead int64
|
||||
}
|
||||
|
||||
var _ io.ReadCloser = &Reader{}
|
||||
|
||||
// Don't wait for pieces to complete and be verified. Read calls return as
|
||||
// soon as they can when the underlying chunks become available.
|
||||
func (r *Reader) SetResponsive() {
|
||||
r.responsive = true
|
||||
}
|
||||
|
||||
func (r *Reader) SetReadahead(readahead int64) {
|
||||
r.readahead = readahead
|
||||
}
|
||||
|
||||
func (r *Reader) raisePriorities(off int64, n int) {
|
||||
if r.responsive {
|
||||
r.t.cl.addUrgentRequests(r.t.torrent, off, n)
|
||||
}
|
||||
r.t.cl.readRaisePiecePriorities(r.t.torrent, off, int64(n)+r.readahead)
|
||||
}
|
||||
|
||||
func (r *Reader) readable(off int64) (ret bool) {
|
||||
// log.Println("readable", off)
|
||||
// defer func() {
|
||||
// log.Println("readable", ret)
|
||||
// }()
|
||||
req, ok := r.t.offsetRequest(off)
|
||||
if !ok {
|
||||
panic(off)
|
||||
}
|
||||
if r.responsive {
|
||||
return r.t.haveChunk(req)
|
||||
}
|
||||
return r.t.pieceComplete(int(req.Index))
|
||||
}
|
||||
|
||||
// How many bytes are available to read. Max is the most we could require.
|
||||
func (r *Reader) available(off, max int64) (ret int64) {
|
||||
for max > 0 {
|
||||
req, ok := r.t.offsetRequest(off)
|
||||
if !ok {
|
||||
break
|
||||
}
|
||||
if !r.t.haveChunk(req) {
|
||||
break
|
||||
}
|
||||
len1 := int64(req.Length) - (off - r.t.requestOffset(req))
|
||||
max -= len1
|
||||
ret += len1
|
||||
off += len1
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (r *Reader) waitReadable(off int64) {
|
||||
r.t.Pieces[off/int64(r.t.usualPieceSize())].Event.Wait()
|
||||
}
|
||||
|
||||
func (r *Reader) ReadAt(b []byte, off int64) (n int, err error) {
|
||||
return r.readAt(b, off)
|
||||
}
|
||||
|
||||
func (r *Reader) Read(b []byte) (n int, err error) {
|
||||
n, err = r.readAt(b, r.pos)
|
||||
r.pos += int64(n)
|
||||
if n != 0 && err == io.ErrUnexpectedEOF {
|
||||
err = nil
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (r *Reader) readAt(b []byte, pos int64) (n int, err error) {
|
||||
// defer func() {
|
||||
// log.Println(pos, n, err)
|
||||
// }()
|
||||
r.t.cl.mu.Lock()
|
||||
defer r.t.cl.mu.Unlock()
|
||||
maxLen := r.t.Info.TotalLength() - pos
|
||||
if maxLen <= 0 {
|
||||
err = io.EOF
|
||||
return
|
||||
}
|
||||
if int64(len(b)) > maxLen {
|
||||
b = b[:maxLen]
|
||||
}
|
||||
r.raisePriorities(pos, len(b))
|
||||
for !r.readable(pos) {
|
||||
r.raisePriorities(pos, len(b))
|
||||
r.waitReadable(pos)
|
||||
}
|
||||
avail := r.available(pos, int64(len(b)))
|
||||
// log.Println("available", avail)
|
||||
if int64(len(b)) > avail {
|
||||
b = b[:avail]
|
||||
}
|
||||
n, err = dataReadAt(r.t.data, b, pos)
|
||||
return
|
||||
}
|
||||
|
||||
func (r *Reader) Close() error {
|
||||
r.t = nil
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *Reader) Seek(off int64, whence int) (ret int64, err error) {
|
||||
switch whence {
|
||||
case os.SEEK_SET:
|
||||
r.pos = off
|
||||
case os.SEEK_CUR:
|
||||
r.pos += off
|
||||
case os.SEEK_END:
|
||||
r.pos = r.t.Info.TotalLength() + off
|
||||
default:
|
||||
err = errors.New("bad whence")
|
||||
}
|
||||
ret = r.pos
|
||||
return
|
||||
}
|
|
@ -0,0 +1,17 @@
|
|||
package torrent
|
||||
|
||||
// The public interface for a torrent within a Client.
|
||||
|
||||
// A handle to a live torrent within a Client.
|
||||
type Torrent struct {
|
||||
cl *Client
|
||||
*torrent
|
||||
}
|
||||
|
||||
func (t *Torrent) NewReader() (ret *Reader) {
|
||||
ret = &Reader{
|
||||
t: t,
|
||||
readahead: 5 * 1024 * 1024,
|
||||
}
|
||||
return
|
||||
}
|
136
torrent.go
136
torrent.go
|
@ -2,12 +2,10 @@ package torrent
|
|||
|
||||
import (
|
||||
"container/heap"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"log"
|
||||
"net"
|
||||
"os"
|
||||
"sort"
|
||||
"sync"
|
||||
"time"
|
||||
|
@ -66,6 +64,9 @@ type torrent struct {
|
|||
|
||||
InfoHash InfoHash
|
||||
Pieces []*piece
|
||||
// Chunks that are wanted before all others. This is for
|
||||
// responsive/streaming readers that want to unblock ASAP.
|
||||
urgent map[request]struct{}
|
||||
// Total length of the torrent in bytes. Stored because it's not O(1) to
|
||||
// get this from the info dict.
|
||||
length int64
|
||||
|
@ -110,91 +111,6 @@ func (t *torrent) pieceComplete(piece int) bool {
|
|||
return t.data != nil && t.data.PieceComplete(piece)
|
||||
}
|
||||
|
||||
// A file-like handle to torrent data that implements SectionOpener. Opened
|
||||
// sections will be reused so long as Reads and ReadAt's are contiguous.
|
||||
type handle struct {
|
||||
rc io.ReadCloser
|
||||
rcOff int64
|
||||
curOff int64
|
||||
so SectionOpener
|
||||
size int64
|
||||
t Torrent
|
||||
}
|
||||
|
||||
func (h *handle) Close() error {
|
||||
if h.rc != nil {
|
||||
return h.rc.Close()
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (h *handle) ReadAt(b []byte, off int64) (n int, err error) {
|
||||
return h.readAt(b, off)
|
||||
}
|
||||
|
||||
func (h *handle) readAt(b []byte, off int64) (n int, err error) {
|
||||
avail := h.t.prepareRead(off)
|
||||
if int64(len(b)) > avail {
|
||||
b = b[:avail]
|
||||
}
|
||||
if int64(len(b)) > h.size-off {
|
||||
b = b[:h.size-off]
|
||||
}
|
||||
if h.rcOff != off && h.rc != nil {
|
||||
h.rc.Close()
|
||||
h.rc = nil
|
||||
}
|
||||
if h.rc == nil {
|
||||
h.rc, err = h.so.OpenSection(off, h.size-off)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
h.rcOff = off
|
||||
}
|
||||
n, err = h.rc.Read(b)
|
||||
h.rcOff += int64(n)
|
||||
return
|
||||
}
|
||||
|
||||
func (h *handle) Read(b []byte) (n int, err error) {
|
||||
n, err = h.readAt(b, h.curOff)
|
||||
h.curOff = h.rcOff
|
||||
return
|
||||
}
|
||||
|
||||
func (h *handle) Seek(off int64, whence int) (newOff int64, err error) {
|
||||
switch whence {
|
||||
case os.SEEK_SET:
|
||||
h.curOff = off
|
||||
case os.SEEK_CUR:
|
||||
h.curOff += off
|
||||
case os.SEEK_END:
|
||||
h.curOff = h.size + off
|
||||
default:
|
||||
err = errors.New("bad whence")
|
||||
}
|
||||
newOff = h.curOff
|
||||
return
|
||||
}
|
||||
|
||||
// Implements Handle on top of an io.SectionReader.
|
||||
type sectionReaderHandle struct {
|
||||
*io.SectionReader
|
||||
}
|
||||
|
||||
func (sectionReaderHandle) Close() error { return nil }
|
||||
|
||||
func (T Torrent) NewReadHandle() Handle {
|
||||
if so, ok := T.data.(SectionOpener); ok {
|
||||
return &handle{
|
||||
so: so,
|
||||
size: T.Length(),
|
||||
t: T,
|
||||
}
|
||||
}
|
||||
return sectionReaderHandle{io.NewSectionReader(T, 0, T.Length())}
|
||||
}
|
||||
|
||||
func (t *torrent) numConnsUnchoked() (num int) {
|
||||
for _, c := range t.Conns {
|
||||
if !c.PeerChoked {
|
||||
|
@ -238,7 +154,9 @@ func (t *torrent) ceaseNetworking() {
|
|||
for _, c := range t.Conns {
|
||||
c.Close()
|
||||
}
|
||||
t.pruneTimer.Stop()
|
||||
if t.pruneTimer != nil {
|
||||
t.pruneTimer.Stop()
|
||||
}
|
||||
}
|
||||
|
||||
func (t *torrent) addPeer(p Peer) {
|
||||
|
@ -502,6 +420,11 @@ func (t *torrent) writeStatus(w io.Writer) {
|
|||
}
|
||||
fmt.Fprintln(w)
|
||||
}
|
||||
fmt.Fprintf(w, "Urgent:")
|
||||
for req := range t.urgent {
|
||||
fmt.Fprintf(w, " %s", req)
|
||||
}
|
||||
fmt.Fprintln(w)
|
||||
fmt.Fprintf(w, "Trackers: ")
|
||||
for _, tier := range t.Trackers {
|
||||
for _, tr := range tier {
|
||||
|
@ -647,6 +570,7 @@ func (t *torrent) writeChunk(piece int, begin int64, data []byte) (err error) {
|
|||
|
||||
func (t *torrent) bitfield() (bf []bool) {
|
||||
for _, p := range t.Pieces {
|
||||
// TODO: Check this logic.
|
||||
bf = append(bf, p.EverHashed && len(p.PendingChunkSpecs) == 0)
|
||||
}
|
||||
return
|
||||
|
@ -732,11 +656,12 @@ func (t *torrent) havePiece(index int) bool {
|
|||
}
|
||||
|
||||
func (t *torrent) haveChunk(r request) bool {
|
||||
p := t.Pieces[r.Index]
|
||||
if !p.EverHashed {
|
||||
if !t.haveInfo() {
|
||||
return false
|
||||
}
|
||||
_, ok := p.PendingChunkSpecs[r.chunkSpec]
|
||||
piece := t.Pieces[r.Index]
|
||||
_, ok := piece.PendingChunkSpecs[r.chunkSpec]
|
||||
// log.Println("have chunk", r, !ok)
|
||||
return !ok
|
||||
}
|
||||
|
||||
|
@ -745,16 +670,41 @@ func (t *torrent) wantChunk(r request) bool {
|
|||
return false
|
||||
}
|
||||
_, ok := t.Pieces[r.Index].PendingChunkSpecs[r.chunkSpec]
|
||||
if ok {
|
||||
return true
|
||||
}
|
||||
_, ok = t.urgent[r]
|
||||
return ok
|
||||
}
|
||||
|
||||
func (t *torrent) urgentChunkInPiece(piece int) bool {
|
||||
for req := range t.urgent {
|
||||
if int(req.Index) == piece {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func (t *torrent) wantPiece(index int) bool {
|
||||
if !t.haveInfo() {
|
||||
return false
|
||||
}
|
||||
p := t.Pieces[index]
|
||||
// Put piece complete check last, since it's the slowest!
|
||||
return p.Priority != piecePriorityNone && !p.QueuedForHash && !p.Hashing && !t.pieceComplete(index)
|
||||
if p.QueuedForHash {
|
||||
return false
|
||||
}
|
||||
if p.Hashing {
|
||||
return false
|
||||
}
|
||||
if p.Priority == piecePriorityNone {
|
||||
if !t.urgentChunkInPiece(index) {
|
||||
return false
|
||||
}
|
||||
}
|
||||
// Put piece complete check last, since it's the slowest as it can involve
|
||||
// calling out into external data stores.
|
||||
return !t.pieceComplete(index)
|
||||
}
|
||||
|
||||
func (t *torrent) connHasWantedPieces(c *connection) bool {
|
||||
|
|
Loading…
Reference in New Issue