Add File.NewReader

Fixes #222
This commit is contained in:
Matt Joiner 2018-01-06 16:37:13 +11:00
parent e13b0eccbf
commit 9b718566ba
11 changed files with 96 additions and 86 deletions

View File

@ -15,7 +15,6 @@ import (
"time" "time"
_ "github.com/anacrolix/envpprof" _ "github.com/anacrolix/envpprof"
"github.com/anacrolix/missinggo"
"github.com/dustin/go-humanize" "github.com/dustin/go-humanize"
"github.com/jessevdk/go-flags" "github.com/jessevdk/go-flags"
@ -163,7 +162,7 @@ func main() {
if file.DisplayPath() != rootGroup.Pick { if file.DisplayPath() != rootGroup.Pick {
continue continue
} }
srcReader := missinggo.NewSectionReadSeeker(t.NewReader(), file.Offset(), file.Length()) srcReader := file.NewReader()
io.Copy(dstWriter, srcReader) io.Copy(dstWriter, srcReader)
return return
} }

View File

@ -3,8 +3,6 @@ package torrent_test
import ( import (
"log" "log"
"github.com/anacrolix/missinggo"
"github.com/anacrolix/torrent" "github.com/anacrolix/torrent"
) )
@ -19,13 +17,9 @@ func Example() {
} }
func Example_fileReader() { func Example_fileReader() {
var ( var f torrent.File
t *torrent.Torrent // Accesses the parts of the torrent pertaining to f. Data will be
f torrent.File
)
r := t.NewReader()
defer r.Close()
// Access the parts of the torrent pertaining to f. Data will be
// downloaded as required, per the configuration of the torrent.Reader. // downloaded as required, per the configuration of the torrent.Reader.
_ = missinggo.NewSectionReadSeeker(r, f.Offset(), f.Length()) r := f.NewReader()
defer r.Close()
} }

View File

@ -3,6 +3,7 @@ package torrent
import ( import (
"strings" "strings"
"github.com/anacrolix/missinggo"
"github.com/anacrolix/torrent/metainfo" "github.com/anacrolix/torrent/metainfo"
) )
@ -99,3 +100,8 @@ func (f *File) exclusivePieces() (begin, end int) {
func (f *File) Cancel() { func (f *File) Cancel() {
f.t.CancelPieces(f.exclusivePieces()) f.t.CancelPieces(f.exclusivePieces())
} }
func (f *File) NewReader() Reader {
tr := f.t.NewReader()
return fileReader{missinggo.NewSectionReadSeeker(tr, f.Offset(), f.Length()), tr}
}

18
file_reader.go Normal file
View File

@ -0,0 +1,18 @@
package torrent
import (
"io"
"github.com/anacrolix/missinggo"
)
type fileReaderInherited interface {
io.Closer
SetReadahead(int64)
SetResponsive()
}
type fileReader struct {
missinggo.ReadSeekContexter
fileReaderInherited
}

View File

@ -13,7 +13,7 @@ import (
type fileHandle struct { type fileHandle struct {
fn fileNode fn fileNode
r *torrent.Reader r torrent.Reader
} }
var _ interface { var _ interface {
@ -26,11 +26,11 @@ func (me fileHandle) Read(ctx context.Context, req *fuse.ReadRequest, resp *fuse
if req.Dir { if req.Dir {
panic("read on directory") panic("read on directory")
} }
pos, err := me.r.Seek(me.fn.TorrentOffset+req.Offset, io.SeekStart) pos, err := me.r.Seek(req.Offset, io.SeekStart)
if err != nil { if err != nil {
panic(err) panic(err)
} }
if pos != me.fn.TorrentOffset+req.Offset { if pos != req.Offset {
panic("seek failed") panic("seek failed")
} }
resp.Data = resp.Data[:req.Size] resp.Data = resp.Data[:req.Size]

View File

@ -1,17 +1,15 @@
package torrentfs package torrentfs
import ( import (
"io"
"bazil.org/fuse" "bazil.org/fuse"
fusefs "bazil.org/fuse/fs" fusefs "bazil.org/fuse/fs"
"github.com/anacrolix/torrent"
"golang.org/x/net/context" "golang.org/x/net/context"
) )
type fileNode struct { type fileNode struct {
node node
size uint64 f *torrent.File
TorrentOffset int64
} }
var ( var (
@ -19,13 +17,12 @@ var (
) )
func (fn fileNode) Attr(ctx context.Context, attr *fuse.Attr) error { func (fn fileNode) Attr(ctx context.Context, attr *fuse.Attr) error {
attr.Size = fn.size attr.Size = uint64(fn.f.Length())
attr.Mode = defaultMode attr.Mode = defaultMode
return nil return nil
} }
func (fn fileNode) Open(ctx context.Context, req *fuse.OpenRequest, resp *fuse.OpenResponse) (fusefs.Handle, error) { func (fn fileNode) Open(ctx context.Context, req *fuse.OpenRequest, resp *fuse.OpenResponse) (fusefs.Handle, error) {
r := fn.t.NewReader() r := fn.f.NewReader()
r.Seek(fn.TorrentOffset, io.SeekStart)
return fileHandle{fn, r}, nil return fileHandle{fn, r}, nil
} }

View File

@ -3,7 +3,6 @@ package torrentfs
import ( import (
"expvar" "expvar"
"os" "os"
"path"
"strings" "strings"
"sync" "sync"
@ -67,11 +66,12 @@ func isSubPath(parent, child string) bool {
if !strings.HasPrefix(child, parent) { if !strings.HasPrefix(child, parent) {
return false return false
} }
s := child[len(parent):] extra := child[len(parent):]
if len(s) == 0 { if len(extra) == 0 {
return false return false
} }
return s[0] == '/' // Not just a file with more stuff on the end.
return extra[0] == '/'
} }
func (dn dirNode) ReadDirAll(ctx context.Context) (des []fuse.Dirent, err error) { func (dn dirNode) ReadDirAll(ctx context.Context) (des []fuse.Dirent, err error) {
@ -98,34 +98,30 @@ func (dn dirNode) ReadDirAll(ctx context.Context) (des []fuse.Dirent, err error)
return return
} }
func (dn dirNode) Lookup(ctx context.Context, name string) (_node fusefs.Node, err error) { func (dn dirNode) Lookup(_ context.Context, name string) (fusefs.Node, error) {
var torrentOffset int64 dir := false
for _, fi := range dn.metadata.Files { var file *torrent.File
if !isSubPath(dn.path, strings.Join(fi.Path, "/")) { fullPath := dn.path + "/" + name
torrentOffset += fi.Length for _, f := range dn.t.Files() {
continue if f.DisplayPath() == fullPath {
file = &f
} }
if fi.Path[len(dn.path)] != name { if isSubPath(fullPath, f.DisplayPath()) {
torrentOffset += fi.Length dir = true
continue
} }
__node := dn.node
__node.path = path.Join(__node.path, name)
if len(fi.Path) == len(dn.path)+1 {
_node = fileNode{
node: __node,
size: uint64(fi.Length),
TorrentOffset: torrentOffset,
}
} else {
_node = dirNode{__node}
}
break
} }
if _node == nil { n := dn.node
err = fuse.ENOENT n.path = fullPath
if dir && file != nil {
panic("both dir and file")
} }
return if file != nil {
return fileNode{n, file}, nil
}
if dir {
return dirNode{n}, nil
}
return nil, fuse.ENOENT
} }
func (dn dirNode) Attr(ctx context.Context, attr *fuse.Attr) error { func (dn dirNode) Attr(ctx context.Context, attr *fuse.Attr) error {
@ -145,7 +141,7 @@ func (rn rootNode) Lookup(ctx context.Context, name string) (_node fusefs.Node,
t: t, t: t,
} }
if !info.IsDir() { if !info.IsDir() {
_node = fileNode{__node, uint64(info.Length), 0} _node = fileNode{__node, &t.Files()[0]}
} else { } else {
_node = dirNode{__node} _node = dirNode{__node}
} }

View File

@ -10,6 +10,15 @@ import (
"golang.org/x/net/context" "golang.org/x/net/context"
) )
type Reader interface {
io.Reader
io.Seeker
io.Closer
missinggo.ReadContexter
SetReadahead(int64)
SetResponsive()
}
// Piece range by piece index, [begin, end). // Piece range by piece index, [begin, end).
type pieceRange struct { type pieceRange struct {
begin, end int begin, end int
@ -17,7 +26,7 @@ type pieceRange struct {
// Accesses Torrent data via a Client. Reads block until the data is // Accesses Torrent data via a Client. Reads block until the data is
// available. Seeks and readahead also drive Client behaviour. // available. Seeks and readahead also drive Client behaviour.
type Reader struct { type reader struct {
t *Torrent t *Torrent
responsive bool responsive bool
// Ensure operations that change the position are exclusive, like Read() // Ensure operations that change the position are exclusive, like Read()
@ -35,22 +44,22 @@ type Reader struct {
pieces pieceRange pieces pieceRange
} }
var _ io.ReadCloser = &Reader{} var _ io.ReadCloser = &reader{}
// Don't wait for pieces to complete and be verified. Read calls return as // Don't wait for pieces to complete and be verified. Read calls return as
// soon as they can when the underlying chunks become available. // soon as they can when the underlying chunks become available.
func (r *Reader) SetResponsive() { func (r *reader) SetResponsive() {
r.responsive = true r.responsive = true
} }
// Disable responsive mode. // Disable responsive mode. TODO: Remove?
func (r *Reader) SetNonResponsive() { func (r *reader) SetNonResponsive() {
r.responsive = false r.responsive = false
} }
// Configure the number of bytes ahead of a read that should also be // Configure the number of bytes ahead of a read that should also be
// prioritized in preparation for further reads. // prioritized in preparation for further reads.
func (r *Reader) SetReadahead(readahead int64) { func (r *reader) SetReadahead(readahead int64) {
r.mu.Lock() r.mu.Lock()
r.readahead = readahead r.readahead = readahead
r.mu.Unlock() r.mu.Unlock()
@ -59,12 +68,7 @@ func (r *Reader) SetReadahead(readahead int64) {
r.posChanged() r.posChanged()
} }
// Return reader's current position. func (r *reader) readable(off int64) (ret bool) {
func (r *Reader) CurrentPos() int64 {
return r.pos
}
func (r *Reader) readable(off int64) (ret bool) {
if r.t.closed.IsSet() { if r.t.closed.IsSet() {
return true return true
} }
@ -79,7 +83,7 @@ func (r *Reader) readable(off int64) (ret bool) {
} }
// How many bytes are available to read. Max is the most we could require. // How many bytes are available to read. Max is the most we could require.
func (r *Reader) available(off, max int64) (ret int64) { func (r *reader) available(off, max int64) (ret int64) {
for max > 0 { for max > 0 {
req, ok := r.t.offsetRequest(off) req, ok := r.t.offsetRequest(off)
if !ok { if !ok {
@ -100,7 +104,7 @@ func (r *Reader) available(off, max int64) (ret int64) {
return return
} }
func (r *Reader) waitReadable(off int64) { func (r *reader) waitReadable(off int64) {
// We may have been sent back here because we were told we could read but // We may have been sent back here because we were told we could read but
// it failed. // it failed.
r.t.cl.event.Wait() r.t.cl.event.Wait()
@ -108,7 +112,7 @@ func (r *Reader) waitReadable(off int64) {
// Calculates the pieces this reader wants downloaded, ignoring the cached // Calculates the pieces this reader wants downloaded, ignoring the cached
// value at r.pieces. // value at r.pieces.
func (r *Reader) piecesUncached() (ret pieceRange) { func (r *reader) piecesUncached() (ret pieceRange) {
ra := r.readahead ra := r.readahead
if ra < 1 { if ra < 1 {
ra = 1 ra = 1
@ -117,11 +121,11 @@ func (r *Reader) piecesUncached() (ret pieceRange) {
return return
} }
func (r *Reader) Read(b []byte) (n int, err error) { func (r *reader) Read(b []byte) (n int, err error) {
return r.ReadContext(context.Background(), b) return r.ReadContext(context.Background(), b)
} }
func (r *Reader) ReadContext(ctx context.Context, b []byte) (n int, err error) { func (r *reader) ReadContext(ctx context.Context, b []byte) (n int, err error) {
// This is set under the Client lock if the Context is canceled. // This is set under the Client lock if the Context is canceled.
var ctxErr error var ctxErr error
if ctx.Done() != nil { if ctx.Done() != nil {
@ -166,7 +170,7 @@ func (r *Reader) ReadContext(ctx context.Context, b []byte) (n int, err error) {
// Wait until some data should be available to read. Tickles the client if it // Wait until some data should be available to read. Tickles the client if it
// isn't. Returns how much should be readable without blocking. // isn't. Returns how much should be readable without blocking.
func (r *Reader) waitAvailable(pos, wanted int64, ctxErr *error) (avail int64) { func (r *reader) waitAvailable(pos, wanted int64, ctxErr *error) (avail int64) {
r.t.cl.mu.Lock() r.t.cl.mu.Lock()
defer r.t.cl.mu.Unlock() defer r.t.cl.mu.Unlock()
for !r.readable(pos) && *ctxErr == nil { for !r.readable(pos) && *ctxErr == nil {
@ -176,7 +180,7 @@ func (r *Reader) waitAvailable(pos, wanted int64, ctxErr *error) (avail int64) {
} }
// Performs at most one successful read to torrent storage. // Performs at most one successful read to torrent storage.
func (r *Reader) readOnceAt(b []byte, pos int64, ctxErr *error) (n int, err error) { func (r *reader) readOnceAt(b []byte, pos int64, ctxErr *error) (n int, err error) {
if pos >= r.t.length { if pos >= r.t.length {
err = io.EOF err = io.EOF
return return
@ -210,14 +214,14 @@ func (r *Reader) readOnceAt(b []byte, pos int64, ctxErr *error) (n int, err erro
} }
} }
func (r *Reader) Close() error { func (r *reader) Close() error {
r.t.cl.mu.Lock() r.t.cl.mu.Lock()
defer r.t.cl.mu.Unlock() defer r.t.cl.mu.Unlock()
r.t.deleteReader(r) r.t.deleteReader(r)
return nil return nil
} }
func (r *Reader) posChanged() { func (r *reader) posChanged() {
to := r.piecesUncached() to := r.piecesUncached()
from := r.pieces from := r.pieces
if to == from { if to == from {
@ -227,7 +231,7 @@ func (r *Reader) posChanged() {
r.t.readerPosChanged(from, to) r.t.readerPosChanged(from, to)
} }
func (r *Reader) Seek(off int64, whence int) (ret int64, err error) { func (r *reader) Seek(off int64, whence int) (ret int64, err error) {
r.opMu.Lock() r.opMu.Lock()
defer r.opMu.Unlock() defer r.opMu.Unlock()
@ -248,7 +252,3 @@ func (r *Reader) Seek(off int64, whence int) (ret int64, err error) {
r.posChanged() r.posChanged()
return return
} }
func (r *Reader) Torrent() *Torrent {
return r.t
}

View File

@ -18,7 +18,7 @@ func TestReaderReadContext(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
defer tt.Drop() defer tt.Drop()
ctx, _ := context.WithDeadline(context.Background(), time.Now().Add(time.Millisecond)) ctx, _ := context.WithDeadline(context.Background(), time.Now().Add(time.Millisecond))
r := tt.NewReader() r := tt.Files()[0].NewReader()
defer r.Close() defer r.Close()
_, err = r.ReadContext(ctx, make([]byte, 1)) _, err = r.ReadContext(ctx, make([]byte, 1))
require.EqualValues(t, context.DeadlineExceeded, err) require.EqualValues(t, context.DeadlineExceeded, err)

14
t.go
View File

@ -31,14 +31,14 @@ func (t *Torrent) Info() *metainfo.Info {
// Returns a Reader bound to the torrent's data. All read calls block until // Returns a Reader bound to the torrent's data. All read calls block until
// the data requested is actually available. // the data requested is actually available.
func (t *Torrent) NewReader() (ret *Reader) { func (t *Torrent) NewReader() Reader {
ret = &Reader{ r := reader{
mu: &t.cl.mu, mu: &t.cl.mu,
t: t, t: t,
readahead: 5 * 1024 * 1024, readahead: 5 * 1024 * 1024,
} }
t.addReader(ret) t.addReader(&r)
return return &r
} }
// Returns the state of pieces of the torrent. They are grouped into runs of // Returns the state of pieces of the torrent. They are grouped into runs of
@ -133,17 +133,17 @@ func (t *Torrent) Metainfo() metainfo.MetaInfo {
return t.newMetaInfo() return t.newMetaInfo()
} }
func (t *Torrent) addReader(r *Reader) { func (t *Torrent) addReader(r *reader) {
t.cl.mu.Lock() t.cl.mu.Lock()
defer t.cl.mu.Unlock() defer t.cl.mu.Unlock()
if t.readers == nil { if t.readers == nil {
t.readers = make(map[*Reader]struct{}) t.readers = make(map[*reader]struct{})
} }
t.readers[r] = struct{}{} t.readers[r] = struct{}{}
r.posChanged() r.posChanged()
} }
func (t *Torrent) deleteReader(r *Reader) { func (t *Torrent) deleteReader(r *reader) {
delete(t.readers, r) delete(t.readers, r)
t.readersChanged() t.readersChanged()
} }

View File

@ -107,7 +107,7 @@ type Torrent struct {
// Set when .Info is obtained. // Set when .Info is obtained.
gotMetainfo missinggo.Event gotMetainfo missinggo.Event
readers map[*Reader]struct{} readers map[*reader]struct{}
readerNowPieces bitmap.Bitmap readerNowPieces bitmap.Bitmap
readerReadaheadPieces bitmap.Bitmap readerReadaheadPieces bitmap.Bitmap