Rework piece completion

Matt Joiner 2015-03-11 02:41:21 +11:00
parent 914bc12bb6
commit e6fbde0dc6
7 changed files with 351 additions and 236 deletions


@@ -100,29 +100,28 @@ func (cl *Client) queuePieceCheck(t *torrent, pieceIndex pp.Integer) {
// been checked before.
func (cl *Client) queueFirstHash(t *torrent, piece int) {
p := t.Pieces[piece]
if p.EverHashed || p.Hashing || p.QueuedForHash || p.Complete() {
if p.EverHashed || p.Hashing || p.QueuedForHash || t.pieceComplete(piece) {
return
}
cl.queuePieceCheck(t, pp.Integer(piece))
}
type Client struct {
noUpload bool
dataDir string
halfOpenLimit int
peerID [20]byte
listeners []net.Listener
utpSock *utp.Socket
disableTrackers bool
downloadStrategy downloadStrategy
dHT *dht.Server
disableUTP bool
disableTCP bool
ipBlockList *iplist.IPList
bannedTorrents map[InfoHash]struct{}
_configDir string
config Config
pruneTimer *time.Timer
noUpload bool
dataDir string
halfOpenLimit int
peerID [20]byte
listeners []net.Listener
utpSock *utp.Socket
disableTrackers bool
dHT *dht.Server
disableUTP bool
disableTCP bool
ipBlockList *iplist.IPList
bannedTorrents map[InfoHash]struct{}
_configDir string
config Config
pruneTimer *time.Timer
torrentDataOpener TorrentDataOpener
@@ -274,7 +273,10 @@ func (cl *Client) prepareRead(t *torrent, off int64) (n int64) {
}
piece := t.Pieces[index]
cl.readRaisePiecePriorities(t, off)
for !piece.Complete() && !t.isClosed() {
for !t.pieceComplete(index) && !t.isClosed() {
// This is to prevent being starved if a piece is dropped before we
// can read it.
cl.readRaisePiecePriorities(t, off)
piece.Event.Wait()
}
return t.Info.Piece(index).Length() - off%t.Info.PieceLength
@@ -292,6 +294,7 @@ type SectionOpener interface {
}
func dataReadAt(d data.Data, b []byte, off int64) (n int, err error) {
again:
if ra, ok := d.(io.ReaderAt); ok {
return ra.ReadAt(b, off)
}
@@ -304,6 +307,10 @@ func dataReadAt(d data.Data, b []byte, off int64) (n int, err error) {
defer rc.Close()
return io.ReadFull(rc, b)
}
if dp, ok := super(d); ok {
d = dp.(data.Data)
goto again
}
panic(fmt.Sprintf("can't read from %T", d))
}
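
The goto again above pairs with the Super interface added to misc.go further down in this commit: a storage wrapper that doesn't itself satisfy io.ReaderAt or SectionOpener can expose the value it wraps, and dataReadAt retries its type assertions on that parent. A self-contained sketch of the pattern, using made-up types (wrapper, readAt) rather than the repository's:

package main

import (
	"fmt"
	"io"
	"strings"
)

// Super mirrors the interface added in misc.go: a wrapper exposes the
// value it wraps so callers can retry type assertions on the parent.
type Super interface {
	Super() interface{}
}

// wrapper is a hypothetical decorator that does not implement
// io.ReaderAt itself, but wraps something that might.
type wrapper struct {
	inner interface{}
}

func (w wrapper) Super() interface{} { return w.inner }

// readAt keeps unwrapping until it finds an io.ReaderAt, much like the
// goto again loop in dataReadAt above.
func readAt(d interface{}, b []byte, off int64) (int, error) {
	for {
		if ra, ok := d.(io.ReaderAt); ok {
			return ra.ReadAt(b, off)
		}
		s, ok := d.(Super)
		if !ok || s.Super() == nil {
			return 0, fmt.Errorf("can't read from %T", d)
		}
		d = s.Super()
	}
}

func main() {
	d := wrapper{inner: strings.NewReader("hello world")}
	b := make([]byte, 5)
	n, err := readAt(d, b, 6)
	fmt.Println(n, err, string(b[:n])) // 5 <nil> world
}
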
@@ -355,14 +362,7 @@ func (cl *Client) prioritizePiece(t *torrent, piece int, priority piecePriority)
}
cl.queueFirstHash(t, piece)
t.Pieces[piece].Priority = priority
if t.wantPiece(piece) {
for _, c := range t.Conns {
if c.PeerHasPiece(pp.Integer(piece)) {
t.connPendPiece(c, piece)
cl.replenishConnRequests(t, c)
}
}
}
cl.pieceChanged(t, piece)
}
func (cl *Client) setEnvBlocklist() (err error) {
@@ -2224,11 +2224,45 @@ func (me *Client) WaitAll() bool {
return true
}
func (me *Client) fillRequests(t *torrent, c *connection) {
if c.Interested {
if c.PeerChoked {
return
}
if len(c.Requests) > c.requestsLowWater {
return
}
}
addRequest := func(req request) (again bool) {
if len(c.Requests) >= 32 {
return false
}
return c.Request(req)
}
for e := c.pieceRequestOrder.First(); e != nil; e = e.Next() {
pieceIndex := e.Piece()
if !c.PeerHasPiece(pp.Integer(pieceIndex)) {
panic("piece in request order but peer doesn't have it")
}
if !t.wantPiece(pieceIndex) {
panic("unwanted piece in connection request order")
}
piece := t.Pieces[pieceIndex]
for _, cs := range piece.shuffledPendingChunkSpecs() {
r := request{pp.Integer(pieceIndex), cs}
if !addRequest(r) {
return
}
}
}
return
}
func (me *Client) replenishConnRequests(t *torrent, c *connection) {
if !t.haveInfo() {
return
}
me.downloadStrategy.FillRequests(t, c)
me.fillRequests(t, c)
if len(c.Requests) == 0 && !c.PeerChoked {
c.SetInterested(false)
}
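
The fillRequests added above is the former defaultDownloadStrategy.FillRequests body lifted onto Client unchanged; the strategy file it came from is deleted later in this commit. A toy, self-contained restatement of its request-budget rule, with made-up names, just to make the thresholds explicit:

package main

import "fmt"

// wantsMoreRequests restates the gating in fillRequests: when we're
// already interested, do nothing while the peer is choking us or while
// outstanding requests sit above the low-water mark; addRequest then
// stops adding once 32 requests are outstanding.
func wantsMoreRequests(interested, peerChoked bool, outstanding, lowWater int) bool {
	if interested {
		if peerChoked {
			return false
		}
		if outstanding > lowWater {
			return false
		}
	}
	return outstanding < 32
}

func main() {
	fmt.Println(wantsMoreRequests(true, false, 5, 16))  // true: below low water
	fmt.Println(wantsMoreRequests(true, false, 20, 16)) // false: plenty in flight
	fmt.Println(wantsMoreRequests(true, true, 5, 16))   // false: peer is choking us
}
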
@@ -2248,7 +2282,7 @@ func (me *Client) downloadedChunk(t *torrent, c *connection, msg *pp.Message) er
piece := t.Pieces[req.Index]
// Do we actually want this chunk?
if _, ok := t.Pieces[req.Index].PendingChunkSpecs[req.chunkSpec]; !ok {
if _, ok := piece.PendingChunkSpecs[req.chunkSpec]; !ok || piece.Priority == piecePriorityNone {
unusedDownloadedChunksCount.Add(1)
c.UnwantedChunksReceived++
return nil
@@ -2298,10 +2332,15 @@ func (me *Client) pieceHashed(t *torrent, piece pp.Integer, correct bool) {
}
}
}
me.pieceChanged(t, int(piece))
}
func (me *Client) pieceChanged(t *torrent, piece int) {
correct := t.pieceComplete(piece)
p := t.Pieces[piece]
if correct {
p.Priority = piecePriorityNone
p.PendingChunkSpecs = nil
p.complete = true
p.Event.Broadcast()
} else {
if len(p.PendingChunkSpecs) == 0 {
@@ -2344,7 +2383,7 @@ func (cl *Client) verifyPiece(t *torrent, index pp.Integer) {
cl.event.Wait()
}
p.QueuedForHash = false
if t.isClosed() || p.complete {
if t.isClosed() || t.pieceComplete(int(index)) {
return
}
p.Hashing = true


@@ -1,39 +1,15 @@
package blob
import (
"bytes"
"crypto/sha1"
"encoding/hex"
"errors"
"io"
"os"
"path/filepath"
dataPkg "bitbucket.org/anacrolix/go.torrent/data"
"github.com/anacrolix/libtorgo/metainfo"
)
const (
filePerm = 0640
dirPerm = 0750
)
type data struct {
info *metainfo.Info
baseDir string
}
type store struct {
baseDir string
}
func (me store) OpenTorrent(info *metainfo.Info) dataPkg.Data {
return &data{info, me.baseDir}
}
func NewStore(baseDir string) dataPkg.Store {
return store{baseDir}
info *metainfo.Info
store *store
}
func (me *data) pieceHashHex(i int) string {
@@ -42,45 +18,26 @@ func (me *data) pieceHashHex(i int) string {
func (me *data) Close() {}
func (me *data) ReadAt(p []byte, off int64) (n int, err error) {
hash := me.pieceHashHex(int(off / me.info.PieceLength))
f, err := os.Open(me.baseDir + "/complete/" + hash)
if os.IsNotExist(err) {
f, err = os.Open(me.baseDir + "/incomplete/" + hash)
if os.IsNotExist(err) {
err = io.EOF
return
}
if err != nil {
return
}
} else if err != nil {
func (me *data) ReadAt(b []byte, off int64) (n int, err error) {
p := me.info.Piece(int(off / me.info.PieceLength))
f := me.store.pieceRead(p)
if f == nil {
err = io.ErrUnexpectedEOF
return
}
defer f.Close()
off %= me.info.PieceLength
return f.ReadAt(p, off)
}
func (me *data) openComplete(piece int) (f *os.File, err error) {
return os.OpenFile(me.baseDir+"/complete/"+me.pieceHashHex(piece), os.O_RDWR, 0660)
n, err = f.ReadAt(b, off%me.info.PieceLength)
if err == io.EOF {
err = io.ErrUnexpectedEOF
}
return
}
func (me *data) WriteAt(p []byte, off int64) (n int, err error) {
i := int(off / me.info.PieceLength)
off %= me.info.PieceLength
for len(p) != 0 {
_, err = os.Stat(me.baseDir + "/complete/" + me.pieceHashHex(i))
if err == nil {
err = errors.New("can't write to completed piece")
return
}
os.MkdirAll(me.baseDir+"/incomplete", 0750)
var f *os.File
f, err = os.OpenFile(me.baseDir+"/incomplete/"+me.pieceHashHex(i), os.O_WRONLY|os.O_CREATE, 0640)
if err != nil {
return
}
f := me.store.pieceWrite(me.info.Piece(i))
p1 := p
maxN := me.info.Piece(i).Length() - off
if int64(len(p1)) > maxN {
@@ -95,28 +52,24 @@ func (me *data) WriteAt(p []byte, off int64) (n int, err error) {
}
p = p[n1:]
off = 0
i++
}
return
}
func (me *data) pieceReader(piece int, off int64) (ret io.ReadCloser, err error) {
f, err := os.Open(me.completedPiecePath(piece))
if os.IsNotExist(err) {
f, err = os.Open(me.incompletePiecePath(piece))
if os.IsNotExist(err) {
err = io.EOF
return
}
if err != nil {
return
}
} else if err != nil {
f := me.store.pieceRead(me.info.Piece(piece))
if f == nil {
err = io.ErrUnexpectedEOF
return
}
return struct {
io.Reader
io.Closer
}{io.NewSectionReader(f, off, me.info.Piece(piece).Length()-off), f}, nil
}{
Reader: io.NewSectionReader(f, off, me.info.Piece(piece).Length()-off),
Closer: f,
}, nil
}
func (me *data) WriteSectionTo(w io.Writer, off, n int64) (written int64, err error) {
@@ -130,59 +83,26 @@ func (me *data) WriteSectionTo(w io.Writer, off, n int64) (written int64, err er
err = nil
}
return
pr.Close()
}
var n1 int64
n1, err = io.CopyN(w, pr, n)
pr.Close()
written += n1
n -= n1
if err != nil {
return
}
off = 0
i++
}
return
}
func (me *data) incompletePiecePath(piece int) string {
return filepath.Join(me.baseDir, "incomplete", me.pieceHashHex(piece))
}
func (me *data) completedPiecePath(piece int) string {
return filepath.Join(me.baseDir, "complete", me.pieceHashHex(piece))
}
func (me *data) PieceCompleted(index int) (err error) {
var (
incompletePiecePath = me.incompletePiecePath(index)
completedPiecePath = me.completedPiecePath(index)
)
fSrc, err := os.Open(incompletePiecePath)
if err != nil {
return
}
defer fSrc.Close()
os.MkdirAll(filepath.Dir(completedPiecePath), dirPerm)
fDst, err := os.OpenFile(completedPiecePath, os.O_EXCL|os.O_CREATE|os.O_WRONLY, filePerm)
if err != nil {
return
}
defer fDst.Close()
hasher := sha1.New()
r := io.TeeReader(io.LimitReader(fSrc, me.info.Piece(index).Length()), hasher)
_, err = io.Copy(fDst, r)
if err != nil {
return
}
if !bytes.Equal(hasher.Sum(nil), me.info.Piece(index).Hash()) {
err = errors.New("piece incomplete")
os.Remove(completedPiecePath)
return
}
os.Remove(incompletePiecePath)
return
return me.store.PieceCompleted(me.info.Piece(index))
}
func (me *data) PieceComplete(piece int) bool {
_, err := os.Stat(me.completedPiecePath(piece))
return err == nil
return me.store.pieceComplete(me.info.Piece(piece))
}

data/blob/store.go (new file, 195 lines)

@@ -0,0 +1,195 @@
package blob
import (
"bytes"
"crypto/sha1"
"encoding/hex"
"errors"
"fmt"
"io"
"math/rand"
"os"
"path/filepath"
dataPkg "bitbucket.org/anacrolix/go.torrent/data"
"github.com/anacrolix/libtorgo/metainfo"
)
const (
filePerm = 0640
dirPerm = 0750
)
type store struct {
baseDir string
capacity int64
completed map[string]struct{}
}
func (me *store) OpenTorrent(info *metainfo.Info) dataPkg.Data {
return &data{info, me}
}
type StoreOption func(*store)
func Capacity(bytes int64) StoreOption {
return func(s *store) {
s.capacity = bytes
}
}
func NewStore(baseDir string, opt ...StoreOption) dataPkg.Store {
s := &store{baseDir, -1, nil}
for _, o := range opt {
o(s)
}
s.initCompleted()
return s
}
func (me *store) initCompleted() {
fis, err := me.readCompletedDir()
if err != nil {
panic(err)
}
me.completed = make(map[string]struct{}, len(fis))
for _, fi := range fis {
me.completed[fi.Name()] = struct{}{}
}
}
func (me *store) completePieceDirPath() string {
return filepath.Join(me.baseDir, "complete")
}
func (me *store) path(p metainfo.Piece, completed bool) string {
return filepath.Join(me.baseDir, func() string {
if completed {
return "complete"
} else {
return "incomplete"
}
}(), fmt.Sprintf("%x", p.Hash()))
}
func (me *store) pieceComplete(p metainfo.Piece) bool {
_, ok := me.completed[hex.EncodeToString(p.Hash())]
return ok
}
func (me *store) pieceWrite(p metainfo.Piece) (f *os.File) {
if me.pieceComplete(p) {
return
}
name := me.path(p, false)
os.MkdirAll(filepath.Dir(name), dirPerm)
f, err := os.OpenFile(name, os.O_CREATE|os.O_WRONLY, filePerm)
if err != nil {
panic(err)
}
return
}
func (me *store) pieceRead(p metainfo.Piece) (f *os.File) {
f, err := os.Open(me.path(p, true))
if err == nil {
return
}
if !os.IsNotExist(err) {
panic(err)
}
f, err = os.Open(me.path(p, false))
if err == nil {
return
}
if !os.IsNotExist(err) {
panic(err)
}
return
}
func (me *store) readCompletedDir() (fis []os.FileInfo, err error) {
f, err := os.Open(me.completePieceDirPath())
if err != nil {
if os.IsNotExist(err) {
err = nil
}
return
}
fis, err = f.Readdir(-1)
f.Close()
return
}
func (me *store) removeCompleted(name string) (err error) {
err = os.Remove(filepath.Join(me.completePieceDirPath(), name))
if os.IsNotExist(err) {
err = nil
}
if err != nil {
return err
}
delete(me.completed, name)
return
}
func (me *store) makeSpace(space int64) error {
if me.capacity < 0 {
return nil
}
if space > me.capacity {
return errors.New("space requested exceeds capacity")
}
fis, err := me.readCompletedDir()
if err != nil {
return err
}
var size int64
for _, fi := range fis {
size += fi.Size()
}
for size > me.capacity-space {
i := rand.Intn(len(fis))
me.removeCompleted(fis[i].Name())
size -= fis[i].Size()
fis[i] = fis[len(fis)-1]
fis = fis[:len(fis)-1]
}
return nil
}
func (me *store) PieceCompleted(p metainfo.Piece) (err error) {
err = me.makeSpace(p.Length())
if err != nil {
return
}
var (
incompletePiecePath = me.path(p, false)
completedPiecePath = me.path(p, true)
)
fSrc, err := os.Open(incompletePiecePath)
if err != nil {
return
}
defer fSrc.Close()
os.MkdirAll(filepath.Dir(completedPiecePath), dirPerm)
fDst, err := os.OpenFile(completedPiecePath, os.O_EXCL|os.O_CREATE|os.O_WRONLY, filePerm)
if err != nil {
return
}
defer fDst.Close()
hasher := sha1.New()
r := io.TeeReader(io.LimitReader(fSrc, p.Length()), hasher)
_, err = io.Copy(fDst, r)
if err != nil {
return
}
if !bytes.Equal(hasher.Sum(nil), p.Hash()) {
err = errors.New("piece incomplete")
os.Remove(completedPiecePath)
return
}
os.Remove(incompletePiecePath)
me.completed[hex.EncodeToString(p.Hash())] = struct{}{}
return
}
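
The new store is configured through functional options, with Capacity the only option so far. A hedged usage sketch (the directory and size here are arbitrary, and the way the store is ultimately handed to the Client's torrent data opener isn't shown in this diff):

package main

import "bitbucket.org/anacrolix/go.torrent/data/blob"

func main() {
	// Cap completed-piece storage at 256 MiB. When PieceCompleted would
	// exceed the cap, makeSpace evicts completed pieces at random.
	store := blob.NewStore("/var/cache/torrent-blob", blob.Capacity(256<<20))
	_ = store // wire into the Client's torrent data opener (not shown here)
}
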


@@ -1,77 +0,0 @@
package torrent
import (
"io"
pp "bitbucket.org/anacrolix/go.torrent/peer_protocol"
)
type downloadStrategy interface {
// Tops up the outgoing pending requests.
FillRequests(*torrent, *connection)
TorrentStarted(*torrent)
TorrentStopped(*torrent)
DeleteRequest(*torrent, request)
TorrentPrioritize(t *torrent, off, _len int64)
TorrentGotChunk(*torrent, request)
TorrentGotPiece(t *torrent, piece int)
WriteStatus(w io.Writer)
AssertNotRequested(*torrent, request)
PendingData(*torrent) bool
}
type defaultDownloadStrategy struct{}
func (me *defaultDownloadStrategy) PendingData(t *torrent) bool {
return !t.haveAllPieces()
}
func (me *defaultDownloadStrategy) AssertNotRequested(t *torrent, r request) {}
func (me *defaultDownloadStrategy) WriteStatus(w io.Writer) {}
func (s *defaultDownloadStrategy) FillRequests(t *torrent, c *connection) {
if c.Interested {
if c.PeerChoked {
return
}
if len(c.Requests) > c.requestsLowWater {
return
}
}
addRequest := func(req request) (again bool) {
if len(c.Requests) >= 32 {
return false
}
return c.Request(req)
}
for e := c.pieceRequestOrder.First(); e != nil; e = e.Next() {
pieceIndex := e.Piece()
if !c.PeerHasPiece(pp.Integer(pieceIndex)) {
panic("piece in request order but peer doesn't have it")
}
if !t.wantPiece(pieceIndex) {
panic("unwanted piece in connection request order")
}
piece := t.Pieces[pieceIndex]
for _, cs := range piece.shuffledPendingChunkSpecs() {
r := request{pp.Integer(pieceIndex), cs}
if !addRequest(r) {
return
}
}
}
return
}
func (s *defaultDownloadStrategy) TorrentStarted(t *torrent) {}
func (s *defaultDownloadStrategy) TorrentStopped(t *torrent) {
}
func (s *defaultDownloadStrategy) DeleteRequest(t *torrent, r request) {
}
func (me *defaultDownloadStrategy) TorrentGotChunk(t *torrent, c request) {}
func (me *defaultDownloadStrategy) TorrentGotPiece(t *torrent, piece int) {}
func (*defaultDownloadStrategy) TorrentPrioritize(t *torrent, off, _len int64) {}

misc.go (20 lines changed)

@@ -45,7 +45,6 @@ const (
type piece struct {
Hash pieceSum
complete bool
PendingChunkSpecs map[chunkSpec]struct{}
Hashing bool
QueuedForHash bool
@@ -72,10 +71,6 @@ func (p *piece) shuffledPendingChunkSpecs() (css []chunkSpec) {
return
}
func (p *piece) Complete() bool {
return p.complete
}
func lastChunkSpec(pieceLength peer_protocol.Integer) (cs chunkSpec) {
cs.Begin = (pieceLength - 1) / chunkSize * chunkSize
cs.Length = pieceLength - cs.Begin
@@ -108,3 +103,18 @@ func metadataPieceSize(totalSize int, piece int) int {
}
return ret
}
type Super interface {
Super() interface{}
}
// Returns ok if there's a parent, and it's not nil.
func super(child interface{}) (parent interface{}, ok bool) {
s, ok := child.(Super)
if !ok {
return
}
parent = s.Super()
ok = parent != nil
return
}

stateless.go (new file, 21 lines)

@@ -0,0 +1,21 @@
package torrent
import "bitbucket.org/anacrolix/go.torrent/data"
type statelessDataWrapper struct {
data.Data
complete []bool
}
func (me *statelessDataWrapper) PieceComplete(piece int) bool {
return me.complete[piece]
}
func (me *statelessDataWrapper) PieceCompleted(piece int) error {
me.complete[piece] = true
return nil
}
func (me *statelessDataWrapper) Super() interface{} {
return me.Data
}
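
statelessDataWrapper is what lets torrent.pieceComplete (in torrent.go below) always delegate to t.data: backends that track their own completion are used directly, everything else gets this in-memory []bool, and the Super method ties into the unwrapping in dataReadAt shown earlier. StatefulData itself isn't part of this diff; judging from how setStorage and pieceComplete use it, it presumably combines data.Data with the two completion methods, along these lines (an assumption, not the repository's definition):

package torrent

import "bitbucket.org/anacrolix/go.torrent/data"

// Assumed shape of StatefulData, inferred from its use in setStorage and
// pieceComplete; the real definition is not included in this diff.
type StatefulData interface {
	data.Data
	PieceComplete(piece int) bool
	PieceCompleted(piece int) error
}
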


@@ -12,6 +12,8 @@ import (
"sync"
"time"
"github.com/bradfitz/iter"
"bitbucket.org/anacrolix/go.torrent/data"
pp "bitbucket.org/anacrolix/go.torrent/peer_protocol"
"bitbucket.org/anacrolix/go.torrent/tracker"
@@ -20,11 +22,11 @@ import (
"github.com/anacrolix/libtorgo/metainfo"
)
func (t *torrent) PieceNumPendingBytes(index pp.Integer) (count pp.Integer) {
piece := t.Pieces[index]
if piece.complete {
func (t *torrent) PieceNumPendingBytes(index int) (count pp.Integer) {
if t.pieceComplete(index) {
return 0
}
piece := t.Pieces[index]
if !piece.EverHashed {
return t.PieceLength(index)
}
@@ -66,7 +68,7 @@ type torrent struct {
Pieces []*piece
length int64
data data.Data
data StatefulData
Info *metainfo.Info
// Active peer connections.
@@ -93,6 +95,12 @@ type torrent struct {
pruneTimer *time.Timer
}
func (t *torrent) pieceComplete(piece int) bool {
// TODO: This is called when setting metadata, and before storage is
// assigned, which doesn't seem right.
return t.data != nil && t.data.PieceComplete(piece)
}
// A file-like handle to torrent data that implements SectionOpener. Opened
// sections will be reused so long as Reads and ReadAt's are contiguous.
type handle struct {
@@ -301,11 +309,10 @@ func (t *torrent) setStorage(td data.Data) (err error) {
if c, ok := t.data.(io.Closer); ok {
c.Close()
}
t.data = td
if sd, ok := t.data.(StatefulData); ok {
for i, p := range t.Pieces {
p.complete = sd.PieceComplete(i)
}
if sd, ok := td.(StatefulData); ok {
t.data = sd
} else {
t.data = &statelessDataWrapper{td, make([]bool, t.Info.NumPieces())}
}
return
}
@@ -351,7 +358,7 @@ func (t *torrent) Name() string {
func (t *torrent) pieceStatusChar(index int) byte {
p := t.Pieces[index]
switch {
case p.Complete():
case t.pieceComplete(index):
return 'C'
case p.QueuedForHash:
return 'Q'
@@ -550,8 +557,8 @@ func (t *torrent) numPieces() int {
}
func (t *torrent) numPiecesCompleted() (num int) {
for _, p := range t.Pieces {
if p.Complete() {
for i := range iter.N(t.Info.NumPieces()) {
if t.pieceComplete(i) {
num++
}
}
@@ -689,8 +696,8 @@ func (t *torrent) haveAllPieces() bool {
if !t.haveInfo() {
return false
}
for _, piece := range t.Pieces {
if !piece.Complete() {
for i := range t.Pieces {
if !t.pieceComplete(i) {
return false
}
}
@@ -698,8 +705,8 @@ func (t *torrent) haveAllPieces() bool {
}
func (me *torrent) haveAnyPieces() bool {
for _, piece := range me.Pieces {
if piece.Complete() {
for i := range me.Pieces {
if me.pieceComplete(i) {
return true
}
}
@@ -707,7 +714,7 @@ func (me *torrent) haveAnyPieces() bool {
}
func (t *torrent) havePiece(index int) bool {
return t.haveInfo() && t.Pieces[index].Complete()
return t.haveInfo() && t.pieceComplete(index)
}
func (t *torrent) haveChunk(r request) bool {
@@ -732,7 +739,7 @@ func (t *torrent) haveChunk(r request) bool {
return false
}
p := t.Pieces[index]
return p.EverHashed && len(p.PendingChunkSpecs) != 0 && p.Priority != piecePriorityNone
return !t.pieceComplete(index) && p.Priority != piecePriorityNone && !p.QueuedForHash && !p.Hashing
}
func (t *torrent) connHasWantedPieces(c *connection) bool {