Add File priorities

Fixes #220.
This commit is contained in:
Matt Joiner 2018-01-21 22:49:12 +11:00
parent 696595a76e
commit 0b553b296f
8 changed files with 102 additions and 43 deletions

View File

@ -162,6 +162,7 @@ func main() {
if file.DisplayPath() != rootGroup.Pick {
continue
}
file.Download()
srcReader := file.NewReader()
defer srcReader.Close()
io.Copy(dstWriter, srcReader)

44
file.go
View File

@ -4,6 +4,7 @@ import (
"strings"
"github.com/anacrolix/torrent/metainfo"
pwp "github.com/anacrolix/torrent/peer_protocol"
)
// Provides access to regions of torrent data that correspond to its files.
@ -13,6 +14,7 @@ type File struct {
offset int64
length int64
fi metainfo.FileInfo
prio piecePriority
}
func (f *File) Torrent() *Torrent {
@ -81,18 +83,7 @@ func (f *File) State() (ret []FilePieceState) {
// Requests that all pieces containing data in the file be downloaded.
func (f *File) Download() {
f.t.DownloadPieces(f.t.byteRegionPieces(f.offset, f.length))
}
// Deprecated: Use File.DownloadRegion.
func (f *File) PrioritizeRegion(off, len int64) {
f.DownloadRegion(off, len)
}
// Requests that torrent pieces containing bytes in the given region of the
// file be downloaded.
func (f *File) DownloadRegion(off, len int64) {
f.t.DownloadPieces(f.t.byteRegionPieces(f.offset+off, len))
f.SetPriority(PiecePriorityNormal)
}
func byteRegionExclusivePieces(off, size, pieceSize int64) (begin, end int) {
@ -105,8 +96,9 @@ func (f *File) exclusivePieces() (begin, end int) {
return byteRegionExclusivePieces(f.offset, f.length, int64(f.t.usualPieceSize()))
}
// Deprecated: Use File.SetPriority.
func (f *File) Cancel() {
f.t.CancelPieces(f.exclusivePieces())
f.SetPriority(PiecePriorityNone)
}
func (f *File) NewReader() Reader {
@ -120,3 +112,29 @@ func (f *File) NewReader() Reader {
f.t.addReader(&tr)
return &tr
}
// Sets the minimum priority for pieces in the File.
func (f *File) SetPriority(prio piecePriority) {
f.t.cl.mu.Lock()
defer f.t.cl.mu.Unlock()
if prio == f.prio {
return
}
f.prio = prio
f.t.updatePiecePriorities(f.firstPieceIndex().Int(), f.lastPieceIndex().Int()+1)
}
// Returns the priority per File.SetPriority.
func (f *File) Priority() piecePriority {
f.t.cl.mu.Lock()
defer f.t.cl.mu.Unlock()
return f.prio
}
func (f *File) firstPieceIndex() pwp.Integer {
return pwp.Integer(f.offset / int64(f.t.usualPieceSize()))
}
func (f *File) lastPieceIndex() pwp.Integer {
return pwp.Integer((f.offset + f.length) / int64(f.t.usualPieceSize()))
}

View File

@ -106,7 +106,7 @@ func (dn dirNode) Lookup(_ context.Context, name string) (fusefs.Node, error) {
fullPath := dn.path + "/" + name
for _, f := range dn.t.Files() {
if f.DisplayPath() == fullPath {
file = &f
file = f
}
if isSubPath(fullPath, f.DisplayPath()) {
dir = true
@ -143,7 +143,7 @@ func (rn rootNode) Lookup(ctx context.Context, name string) (_node fusefs.Node,
t: t,
}
if !info.IsDir() {
_node = fileNode{__node, &t.Files()[0]}
_node = fileNode{__node, t.Files()[0]}
} else {
_node = dirNode{__node}
}

View File

@ -21,6 +21,8 @@ func TestHashPieceAfterStorageClosed(t *testing.T) {
info, err := mi.UnmarshalInfo()
require.NoError(t, err)
tt.info = &info
tt.cacheLength(&info)
tt.initFiles()
tt.makePieces()
tt.storage, err = cs.OpenTorrent(tt.info, mi.HashInfoBytes())
require.NoError(t, err)

View File

@ -20,7 +20,7 @@ func (i *Integer) Read(r io.Reader) error {
return binary.Read(r, binary.BigEndian, i)
}
// It's perfectly fine to cast these to an int.
// It's perfectly fine to cast these to an int. TODO: Or is it?
func (i Integer) Int() int {
return int(i)
}

View File

@ -27,7 +27,7 @@ func (me piecePriority) BitmapPriority() int {
}
const (
PiecePriorityNone piecePriority = iota // Not wanted.
PiecePriorityNone piecePriority = iota // Not wanted. Must be the zero value.
PiecePriorityNormal // Wanted.
PiecePriorityHigh // Wanted a lot.
PiecePriorityReadahead // May be required soon.
@ -42,6 +42,7 @@ type Piece struct {
hash metainfo.Hash
t *Torrent
index int
files []*File
// Chunks we've written to since the last check. The chunk offset and
// length can be determined by the request chunkSize in use.
dirtyChunks bitmap.Bitmap
@ -192,3 +193,11 @@ func (p *Piece) VerifyData() {
func (p *Piece) queuedForHash() bool {
return p.t.piecesQueuedForHash.Get(p.index)
}
func (p *Piece) torrentBeginOffset() int64 {
return int64(p.index) * p.t.info.PieceLength
}
func (p *Piece) torrentEndOffset() int64 {
return p.torrentBeginOffset() + int64(p.length())
}

24
t.go
View File

@ -158,25 +158,27 @@ func (t *Torrent) CancelPieces(begin, end int) {
t.unpendPieceRange(begin, end)
}
// Returns handles to the files in the torrent. This requires the metainfo is
// available first.
func (t *Torrent) Files() (ret []File) {
info := t.Info()
if info == nil {
return
}
func (t *Torrent) initFiles() {
var offset int64
for _, fi := range info.UpvertedFiles() {
ret = append(ret, File{
t.files = new([]*File)
for _, fi := range t.info.UpvertedFiles() {
*t.files = append(*t.files, &File{
t,
strings.Join(append([]string{info.Name}, fi.Path...), "/"),
strings.Join(append([]string{t.info.Name}, fi.Path...), "/"),
offset,
fi.Length,
fi,
PiecePriorityNone,
})
offset += fi.Length
}
return
}
// Returns handles to the files in the torrent. This requires that the Info is
// available first.
func (t *Torrent) Files() []*File {
return *t.files
}
func (t *Torrent) AddPeers(pp []Peer) {

View File

@ -16,12 +16,11 @@ import (
"text/tabwriter"
"time"
"github.com/anacrolix/missinggo/prioritybitmap"
"github.com/anacrolix/dht"
"github.com/anacrolix/missinggo"
"github.com/anacrolix/missinggo/bitmap"
"github.com/anacrolix/missinggo/perf"
"github.com/anacrolix/missinggo/prioritybitmap"
"github.com/anacrolix/missinggo/pubsub"
"github.com/anacrolix/missinggo/slices"
"github.com/bradfitz/iter"
@ -76,7 +75,8 @@ type Torrent struct {
metainfo metainfo.MetaInfo
// The info dict. nil if we don't have it (yet).
info *metainfo.Info
info *metainfo.Info
files *[]*File
// Active peer connections, running message stream loops.
conns map[*connection]struct{}
@ -305,9 +305,36 @@ func (t *Torrent) makePieces() {
piece.index = i
piece.noPendingWrites.L = &piece.pendingWritesMutex
missinggo.CopyExact(piece.hash[:], hash)
piece.files = (*t.files)[pieceFirstFileIndex(piece.torrentBeginOffset(), *t.files):pieceLastFileIndex(piece.torrentEndOffset(), *t.files)]
}
}
func pieceFirstFileIndex(pieceOffset int64, files []*File) int {
for i, f := range files {
if f.offset+f.length > pieceOffset {
return i
}
}
return -1
}
func pieceLastFileIndex(pieceEndOffset int64, files []*File) int {
for i, f := range files {
if f.offset+f.length >= pieceEndOffset {
return i
}
}
return -1
}
func (t *Torrent) cacheLength(info *metainfo.Info) {
var l int64
for _, f := range t.info.UpvertedFiles() {
l += f.Length
}
t.length = &l
}
// Called when metadata for a torrent becomes available.
func (t *Torrent) setInfoBytes(b []byte) error {
if t.haveInfo() {
@ -327,6 +354,7 @@ func (t *Torrent) setInfoBytes(b []byte) error {
}
defer t.updateWantPeersEvent()
t.info = &info
t.initFiles()
t.displayName = "" // Save a few bytes lol.
t.cl.event.Broadcast()
t.gotMetainfo.Set()
@ -334,11 +362,7 @@ func (t *Torrent) setInfoBytes(b []byte) error {
if err != nil {
return fmt.Errorf("error opening torrent storage: %s", err)
}
var l int64
for _, f := range t.info.UpvertedFiles() {
l += f.Length
}
t.length = &l
t.cacheLength(&info)
t.metadataBytes = b
t.metadataCompletedChunks = nil
t.makePieces()
@ -945,23 +969,26 @@ func (t *Torrent) piecePriority(piece int) piecePriority {
return t.pieces[piece].priority
}
func (t *Torrent) piecePriorityUncached(piece int) piecePriority {
if t.pieceComplete(piece) {
return PiecePriorityNone
func (t *Torrent) piecePriorityUncached(piece int) (ret piecePriority) {
for _, f := range t.pieces[piece].files {
ret.Raise(f.prio)
}
if t.readerNowPieces.Contains(piece) {
return PiecePriorityNow
ret.Raise(PiecePriorityNow)
}
// if t.readerNowPieces.Contains(piece - 1) {
// return PiecePriorityNext
// }
if t.readerReadaheadPieces.Contains(piece) {
return PiecePriorityReadahead
ret.Raise(PiecePriorityReadahead)
}
if t.pendingPieces.Contains(piece) {
return PiecePriorityNormal
ret.Raise(PiecePriorityNormal)
}
return PiecePriorityNone
if t.pieceComplete(piece) {
return PiecePriorityNone
}
return
}
func (t *Torrent) pendPiece(piece int) {