From 9b718566ba5d19eeac4b68d4f9d5a2351ec02a7d Mon Sep 17 00:00:00 2001 From: Matt Joiner Date: Sat, 6 Jan 2018 16:37:13 +1100 Subject: [PATCH] Add File.NewReader Fixes #222 --- cmd/torrent-pick/main.go | 3 +-- example_test.go | 14 +++-------- file.go | 6 +++++ file_reader.go | 18 ++++++++++++++ fs/file_handle.go | 6 ++--- fs/filenode.go | 11 +++----- fs/torrentfs.go | 54 +++++++++++++++++++--------------------- reader.go | 52 +++++++++++++++++++------------------- reader_test.go | 2 +- t.go | 14 +++++------ torrent.go | 2 +- 11 files changed, 96 insertions(+), 86 deletions(-) create mode 100644 file_reader.go diff --git a/cmd/torrent-pick/main.go b/cmd/torrent-pick/main.go index be590b8b..8ff22455 100644 --- a/cmd/torrent-pick/main.go +++ b/cmd/torrent-pick/main.go @@ -15,7 +15,6 @@ import ( "time" _ "github.com/anacrolix/envpprof" - "github.com/anacrolix/missinggo" "github.com/dustin/go-humanize" "github.com/jessevdk/go-flags" @@ -163,7 +162,7 @@ func main() { if file.DisplayPath() != rootGroup.Pick { continue } - srcReader := missinggo.NewSectionReadSeeker(t.NewReader(), file.Offset(), file.Length()) + srcReader := file.NewReader() io.Copy(dstWriter, srcReader) return } diff --git a/example_test.go b/example_test.go index 5c58e4d9..54cb7190 100644 --- a/example_test.go +++ b/example_test.go @@ -3,8 +3,6 @@ package torrent_test import ( "log" - "github.com/anacrolix/missinggo" - "github.com/anacrolix/torrent" ) @@ -19,13 +17,9 @@ func Example() { } func Example_fileReader() { - var ( - t *torrent.Torrent - f torrent.File - ) - r := t.NewReader() - defer r.Close() - // Access the parts of the torrent pertaining to f. Data will be + var f torrent.File + // Accesses the parts of the torrent pertaining to f. Data will be // downloaded as required, per the configuration of the torrent.Reader. - _ = missinggo.NewSectionReadSeeker(r, f.Offset(), f.Length()) + r := f.NewReader() + defer r.Close() } diff --git a/file.go b/file.go index 720e4fb5..e71cee6c 100644 --- a/file.go +++ b/file.go @@ -3,6 +3,7 @@ package torrent import ( "strings" + "github.com/anacrolix/missinggo" "github.com/anacrolix/torrent/metainfo" ) @@ -99,3 +100,8 @@ func (f *File) exclusivePieces() (begin, end int) { func (f *File) Cancel() { f.t.CancelPieces(f.exclusivePieces()) } + +func (f *File) NewReader() Reader { + tr := f.t.NewReader() + return fileReader{missinggo.NewSectionReadSeeker(tr, f.Offset(), f.Length()), tr} +} diff --git a/file_reader.go b/file_reader.go new file mode 100644 index 00000000..dde43620 --- /dev/null +++ b/file_reader.go @@ -0,0 +1,18 @@ +package torrent + +import ( + "io" + + "github.com/anacrolix/missinggo" +) + +type fileReaderInherited interface { + io.Closer + SetReadahead(int64) + SetResponsive() +} + +type fileReader struct { + missinggo.ReadSeekContexter + fileReaderInherited +} diff --git a/fs/file_handle.go b/fs/file_handle.go index 3405cce7..41517296 100644 --- a/fs/file_handle.go +++ b/fs/file_handle.go @@ -13,7 +13,7 @@ import ( type fileHandle struct { fn fileNode - r *torrent.Reader + r torrent.Reader } var _ interface { @@ -26,11 +26,11 @@ func (me fileHandle) Read(ctx context.Context, req *fuse.ReadRequest, resp *fuse if req.Dir { panic("read on directory") } - pos, err := me.r.Seek(me.fn.TorrentOffset+req.Offset, io.SeekStart) + pos, err := me.r.Seek(req.Offset, io.SeekStart) if err != nil { panic(err) } - if pos != me.fn.TorrentOffset+req.Offset { + if pos != req.Offset { panic("seek failed") } resp.Data = resp.Data[:req.Size] diff --git a/fs/filenode.go b/fs/filenode.go index db850755..d0e4fe3f 100644 --- a/fs/filenode.go +++ b/fs/filenode.go @@ -1,17 +1,15 @@ package torrentfs import ( - "io" - "bazil.org/fuse" fusefs "bazil.org/fuse/fs" + "github.com/anacrolix/torrent" "golang.org/x/net/context" ) type fileNode struct { node - size uint64 - TorrentOffset int64 + f *torrent.File } var ( @@ -19,13 +17,12 @@ var ( ) func (fn fileNode) Attr(ctx context.Context, attr *fuse.Attr) error { - attr.Size = fn.size + attr.Size = uint64(fn.f.Length()) attr.Mode = defaultMode return nil } func (fn fileNode) Open(ctx context.Context, req *fuse.OpenRequest, resp *fuse.OpenResponse) (fusefs.Handle, error) { - r := fn.t.NewReader() - r.Seek(fn.TorrentOffset, io.SeekStart) + r := fn.f.NewReader() return fileHandle{fn, r}, nil } diff --git a/fs/torrentfs.go b/fs/torrentfs.go index 32ead7fb..65cb42ce 100644 --- a/fs/torrentfs.go +++ b/fs/torrentfs.go @@ -3,7 +3,6 @@ package torrentfs import ( "expvar" "os" - "path" "strings" "sync" @@ -67,11 +66,12 @@ func isSubPath(parent, child string) bool { if !strings.HasPrefix(child, parent) { return false } - s := child[len(parent):] - if len(s) == 0 { + extra := child[len(parent):] + if len(extra) == 0 { return false } - return s[0] == '/' + // Not just a file with more stuff on the end. + return extra[0] == '/' } func (dn dirNode) ReadDirAll(ctx context.Context) (des []fuse.Dirent, err error) { @@ -98,34 +98,30 @@ func (dn dirNode) ReadDirAll(ctx context.Context) (des []fuse.Dirent, err error) return } -func (dn dirNode) Lookup(ctx context.Context, name string) (_node fusefs.Node, err error) { - var torrentOffset int64 - for _, fi := range dn.metadata.Files { - if !isSubPath(dn.path, strings.Join(fi.Path, "/")) { - torrentOffset += fi.Length - continue +func (dn dirNode) Lookup(_ context.Context, name string) (fusefs.Node, error) { + dir := false + var file *torrent.File + fullPath := dn.path + "/" + name + for _, f := range dn.t.Files() { + if f.DisplayPath() == fullPath { + file = &f } - if fi.Path[len(dn.path)] != name { - torrentOffset += fi.Length - continue + if isSubPath(fullPath, f.DisplayPath()) { + dir = true } - __node := dn.node - __node.path = path.Join(__node.path, name) - if len(fi.Path) == len(dn.path)+1 { - _node = fileNode{ - node: __node, - size: uint64(fi.Length), - TorrentOffset: torrentOffset, - } - } else { - _node = dirNode{__node} - } - break } - if _node == nil { - err = fuse.ENOENT + n := dn.node + n.path = fullPath + if dir && file != nil { + panic("both dir and file") } - return + if file != nil { + return fileNode{n, file}, nil + } + if dir { + return dirNode{n}, nil + } + return nil, fuse.ENOENT } func (dn dirNode) Attr(ctx context.Context, attr *fuse.Attr) error { @@ -145,7 +141,7 @@ func (rn rootNode) Lookup(ctx context.Context, name string) (_node fusefs.Node, t: t, } if !info.IsDir() { - _node = fileNode{__node, uint64(info.Length), 0} + _node = fileNode{__node, &t.Files()[0]} } else { _node = dirNode{__node} } diff --git a/reader.go b/reader.go index bdfeed04..76c39e96 100644 --- a/reader.go +++ b/reader.go @@ -10,6 +10,15 @@ import ( "golang.org/x/net/context" ) +type Reader interface { + io.Reader + io.Seeker + io.Closer + missinggo.ReadContexter + SetReadahead(int64) + SetResponsive() +} + // Piece range by piece index, [begin, end). type pieceRange struct { begin, end int @@ -17,7 +26,7 @@ type pieceRange struct { // Accesses Torrent data via a Client. Reads block until the data is // available. Seeks and readahead also drive Client behaviour. -type Reader struct { +type reader struct { t *Torrent responsive bool // Ensure operations that change the position are exclusive, like Read() @@ -35,22 +44,22 @@ type Reader struct { pieces pieceRange } -var _ io.ReadCloser = &Reader{} +var _ io.ReadCloser = &reader{} // Don't wait for pieces to complete and be verified. Read calls return as // soon as they can when the underlying chunks become available. -func (r *Reader) SetResponsive() { +func (r *reader) SetResponsive() { r.responsive = true } -// Disable responsive mode. -func (r *Reader) SetNonResponsive() { +// Disable responsive mode. TODO: Remove? +func (r *reader) SetNonResponsive() { r.responsive = false } // Configure the number of bytes ahead of a read that should also be // prioritized in preparation for further reads. -func (r *Reader) SetReadahead(readahead int64) { +func (r *reader) SetReadahead(readahead int64) { r.mu.Lock() r.readahead = readahead r.mu.Unlock() @@ -59,12 +68,7 @@ func (r *Reader) SetReadahead(readahead int64) { r.posChanged() } -// Return reader's current position. -func (r *Reader) CurrentPos() int64 { - return r.pos -} - -func (r *Reader) readable(off int64) (ret bool) { +func (r *reader) readable(off int64) (ret bool) { if r.t.closed.IsSet() { return true } @@ -79,7 +83,7 @@ func (r *Reader) readable(off int64) (ret bool) { } // How many bytes are available to read. Max is the most we could require. -func (r *Reader) available(off, max int64) (ret int64) { +func (r *reader) available(off, max int64) (ret int64) { for max > 0 { req, ok := r.t.offsetRequest(off) if !ok { @@ -100,7 +104,7 @@ func (r *Reader) available(off, max int64) (ret int64) { return } -func (r *Reader) waitReadable(off int64) { +func (r *reader) waitReadable(off int64) { // We may have been sent back here because we were told we could read but // it failed. r.t.cl.event.Wait() @@ -108,7 +112,7 @@ func (r *Reader) waitReadable(off int64) { // Calculates the pieces this reader wants downloaded, ignoring the cached // value at r.pieces. -func (r *Reader) piecesUncached() (ret pieceRange) { +func (r *reader) piecesUncached() (ret pieceRange) { ra := r.readahead if ra < 1 { ra = 1 @@ -117,11 +121,11 @@ func (r *Reader) piecesUncached() (ret pieceRange) { return } -func (r *Reader) Read(b []byte) (n int, err error) { +func (r *reader) Read(b []byte) (n int, err error) { return r.ReadContext(context.Background(), b) } -func (r *Reader) ReadContext(ctx context.Context, b []byte) (n int, err error) { +func (r *reader) ReadContext(ctx context.Context, b []byte) (n int, err error) { // This is set under the Client lock if the Context is canceled. var ctxErr error if ctx.Done() != nil { @@ -166,7 +170,7 @@ func (r *Reader) ReadContext(ctx context.Context, b []byte) (n int, err error) { // Wait until some data should be available to read. Tickles the client if it // isn't. Returns how much should be readable without blocking. -func (r *Reader) waitAvailable(pos, wanted int64, ctxErr *error) (avail int64) { +func (r *reader) waitAvailable(pos, wanted int64, ctxErr *error) (avail int64) { r.t.cl.mu.Lock() defer r.t.cl.mu.Unlock() for !r.readable(pos) && *ctxErr == nil { @@ -176,7 +180,7 @@ func (r *Reader) waitAvailable(pos, wanted int64, ctxErr *error) (avail int64) { } // Performs at most one successful read to torrent storage. -func (r *Reader) readOnceAt(b []byte, pos int64, ctxErr *error) (n int, err error) { +func (r *reader) readOnceAt(b []byte, pos int64, ctxErr *error) (n int, err error) { if pos >= r.t.length { err = io.EOF return @@ -210,14 +214,14 @@ func (r *Reader) readOnceAt(b []byte, pos int64, ctxErr *error) (n int, err erro } } -func (r *Reader) Close() error { +func (r *reader) Close() error { r.t.cl.mu.Lock() defer r.t.cl.mu.Unlock() r.t.deleteReader(r) return nil } -func (r *Reader) posChanged() { +func (r *reader) posChanged() { to := r.piecesUncached() from := r.pieces if to == from { @@ -227,7 +231,7 @@ func (r *Reader) posChanged() { r.t.readerPosChanged(from, to) } -func (r *Reader) Seek(off int64, whence int) (ret int64, err error) { +func (r *reader) Seek(off int64, whence int) (ret int64, err error) { r.opMu.Lock() defer r.opMu.Unlock() @@ -248,7 +252,3 @@ func (r *Reader) Seek(off int64, whence int) (ret int64, err error) { r.posChanged() return } - -func (r *Reader) Torrent() *Torrent { - return r.t -} diff --git a/reader_test.go b/reader_test.go index f56e9473..5378efc5 100644 --- a/reader_test.go +++ b/reader_test.go @@ -18,7 +18,7 @@ func TestReaderReadContext(t *testing.T) { require.NoError(t, err) defer tt.Drop() ctx, _ := context.WithDeadline(context.Background(), time.Now().Add(time.Millisecond)) - r := tt.NewReader() + r := tt.Files()[0].NewReader() defer r.Close() _, err = r.ReadContext(ctx, make([]byte, 1)) require.EqualValues(t, context.DeadlineExceeded, err) diff --git a/t.go b/t.go index cc69d8e3..f687cb59 100644 --- a/t.go +++ b/t.go @@ -31,14 +31,14 @@ func (t *Torrent) Info() *metainfo.Info { // Returns a Reader bound to the torrent's data. All read calls block until // the data requested is actually available. -func (t *Torrent) NewReader() (ret *Reader) { - ret = &Reader{ +func (t *Torrent) NewReader() Reader { + r := reader{ mu: &t.cl.mu, t: t, readahead: 5 * 1024 * 1024, } - t.addReader(ret) - return + t.addReader(&r) + return &r } // Returns the state of pieces of the torrent. They are grouped into runs of @@ -133,17 +133,17 @@ func (t *Torrent) Metainfo() metainfo.MetaInfo { return t.newMetaInfo() } -func (t *Torrent) addReader(r *Reader) { +func (t *Torrent) addReader(r *reader) { t.cl.mu.Lock() defer t.cl.mu.Unlock() if t.readers == nil { - t.readers = make(map[*Reader]struct{}) + t.readers = make(map[*reader]struct{}) } t.readers[r] = struct{}{} r.posChanged() } -func (t *Torrent) deleteReader(r *Reader) { +func (t *Torrent) deleteReader(r *reader) { delete(t.readers, r) t.readersChanged() } diff --git a/torrent.go b/torrent.go index 46575942..3cb32a13 100644 --- a/torrent.go +++ b/torrent.go @@ -107,7 +107,7 @@ type Torrent struct { // Set when .Info is obtained. gotMetainfo missinggo.Event - readers map[*Reader]struct{} + readers map[*reader]struct{} readerNowPieces bitmap.Bitmap readerReadaheadPieces bitmap.Bitmap