Various progress, particularly around the way data readiness is handled

This commit is contained in:
Matt Joiner 2014-03-20 04:30:08 +11:00
parent 998fbeb0c6
commit 9ee83455cf
4 changed files with 127 additions and 99 deletions

View File

@ -52,11 +52,12 @@ type piece struct {
Hash pieceSum
PendingChunkSpecs map[ChunkSpec]struct{}
Hashing bool
QueuedForHash bool
EverHashed bool
}
func (p *piece) Complete() bool {
return len(p.PendingChunkSpecs) == 0 && !p.Hashing && p.EverHashed
return len(p.PendingChunkSpecs) == 0 && p.EverHashed
}
func lastChunkSpec(pieceLength peer_protocol.Integer) (cs ChunkSpec) {
@ -126,7 +127,11 @@ func (c *Connection) Post(msg encoding.BinaryMarshaler) {
c.post <- msg
}
// Returns true if more requests can be sent.
func (c *Connection) Request(chunk Request) bool {
if !c.PeerPieces[chunk.Index] {
panic("peer doesn't have that piece!")
}
if len(c.Requests) >= maxRequests {
return false
}
@ -291,20 +296,19 @@ func (t *Torrent) piecesByPendingBytesDesc() (indices []peer_protocol.Integer) {
// Currently doesn't really queue, but should in the future.
func (cl *Client) queuePieceCheck(t *Torrent, pieceIndex peer_protocol.Integer) {
piece := t.Pieces[pieceIndex]
if piece.Hashing {
if piece.QueuedForHash {
return
}
piece.Hashing = true
piece.QueuedForHash = true
go cl.verifyPiece(t, pieceIndex)
}
func (cl *Client) PrioritizeDataRegion(ih InfoHash, off, len_ int64) {
log.Print(len_)
cl.mu.Lock()
defer cl.mu.Unlock()
t := cl.torrent(ih)
newPriorities := make([]Request, 0, (len_+2*(chunkSize-1))/chunkSize)
for len_ != 0 {
for len_ > 0 {
// TODO: Write a function to return the Request for a given offset.
index := peer_protocol.Integer(off / t.MetaInfo.PieceLength)
pieceOff := peer_protocol.Integer(off % t.MetaInfo.PieceLength)
@ -313,8 +317,8 @@ func (cl *Client) PrioritizeDataRegion(ih InfoHash, off, len_ int64) {
cl.queuePieceCheck(t, index)
}
chunk := ChunkSpec{pieceOff / chunkSize * chunkSize, chunkSize}
if int64(chunk.Length) > len_ {
chunk.Length = peer_protocol.Integer(len_)
if chunk.Begin+chunk.Length > t.PieceLength(index) {
chunk.Length = t.PieceLength(index) - chunk.Begin
}
adv := int64(chunk.Length - pieceOff%chunkSize)
off += adv
@ -324,7 +328,7 @@ func (cl *Client) PrioritizeDataRegion(ih InfoHash, off, len_ int64) {
}
newPriorities = append(newPriorities, Request{index, chunk})
}
if len(newPriorities) < 1 {
if len(newPriorities) == 0 {
return
}
log.Print(newPriorities)
@ -347,7 +351,7 @@ func (t *Torrent) WriteChunk(piece int, begin int64, data []byte) (err error) {
func (t *Torrent) bitfield() (bf []bool) {
for _, p := range t.Pieces {
bf = append(bf, p.EverHashed && !p.Hashing && len(p.PendingChunkSpecs) == 0)
bf = append(bf, p.EverHashed && len(p.PendingChunkSpecs) == 0)
}
return
}
@ -413,19 +417,20 @@ type DataSpec struct {
}
type Client struct {
DataDir string
HalfOpenLimit int
PeerId [20]byte
DataReady chan DataSpec
Listener net.Listener
DataDir string
HalfOpenLimit int
PeerId [20]byte
Listener net.Listener
DisableTrackers bool
sync.Mutex
mu *sync.Mutex
event sync.Cond
quit chan struct{}
halfOpen int
torrents map[InfoHash]*Torrent
halfOpen int
torrents map[InfoHash]*Torrent
dataWaiter chan struct{}
}
var (
@ -711,7 +716,7 @@ func (me *Client) peerGotPiece(torrent *Torrent, conn *Connection, piece int) {
func (t *Torrent) wantPiece(index int) bool {
p := t.Pieces[index]
return p.EverHashed && !p.Hashing && len(p.PendingChunkSpecs) != 0
return p.EverHashed && len(p.PendingChunkSpecs) != 0
}
func (me *Client) peerUnchoked(torrent *Torrent, conn *Connection) {
@ -731,7 +736,6 @@ func (me *Client) connectionLoop(torrent *Torrent, conn *Connection) error {
if err != nil {
return err
}
log.Print(msg.Type)
if msg.Keepalive {
continue
}
@ -803,6 +807,7 @@ func (me *Client) connectionLoop(torrent *Torrent, conn *Connection) error {
if err != nil {
return err
}
log.Print("replenishing from loop")
me.replenishConnRequests(torrent, conn)
}
}
@ -915,7 +920,9 @@ func (me *Client) AddTorrent(metaInfo *metainfo.MetaInfo) error {
return torrent.Close()
}
me.torrents[torrent.InfoHash] = torrent
go me.announceTorrent(torrent)
if !me.DisableTrackers {
go me.announceTorrent(torrent)
}
for i := range torrent.Pieces {
me.queuePieceCheck(torrent, peer_protocol.Integer(i))
}
@ -985,23 +992,32 @@ func (me *Client) replenishConnRequests(torrent *Torrent, conn *Connection) {
addRequest := func(req Request) (again bool) {
piece := torrent.Pieces[req.Index]
if piece.Hashing {
// We can't be sure we want this.
log.Print("piece is hashing")
return true
}
if piece.Complete() {
log.Print("piece is complete")
// We already have this.
return true
}
if requestHeatMap[req] > 0 {
log.Print("piece is hot")
// We've already requested this.
return true
}
return conn.Request(req)
}
// First request prioritized chunks.
if torrent.Priorities != nil {
for e := torrent.Priorities.Front(); e != nil; e = e.Next() {
log.Print(e.Value.(Request))
if !addRequest(e.Value.(Request)) {
return
}
}
}
// Then finish of incomplete pieces in order of bytes remaining.
for _, index := range torrent.piecesByPendingBytesDesc() {
if torrent.PieceNumPendingBytes(index) == torrent.PieceLength(index) {
continue
@ -1042,22 +1058,26 @@ func (me *Client) downloadedChunk(torrent *Torrent, msg *peer_protocol.Message)
}
func (cl *Client) dataReady(ds DataSpec) {
if cl.DataReady == nil {
return
if cl.dataWaiter != nil {
close(cl.dataWaiter)
}
go func() {
cl.DataReady <- ds
}()
cl.dataWaiter = nil
}
func (cl *Client) DataWaiter() <-chan struct{} {
cl.Lock()
defer cl.Unlock()
if cl.dataWaiter == nil {
cl.dataWaiter = make(chan struct{})
}
return cl.dataWaiter
}
func (me *Client) pieceHashed(t *Torrent, piece peer_protocol.Integer, correct bool) {
p := t.Pieces[piece]
if !p.Hashing {
panic("invalid state")
}
p.Hashing = false
p.EverHashed = true
if correct {
log.Print("piece passed hash")
p.PendingChunkSpecs = nil
var next *list.Element
if t.Priorities != nil {
@ -1076,6 +1096,7 @@ func (me *Client) pieceHashed(t *Torrent, piece peer_protocol.Integer, correct b
},
})
} else {
log.Print("piece failed hash")
if len(p.PendingChunkSpecs) == 0 {
p.PendingChunkSpecs = t.pieceChunkSpecs(piece)
}
@ -1096,11 +1117,18 @@ func (me *Client) pieceHashed(t *Torrent, piece peer_protocol.Integer, correct b
}
func (cl *Client) verifyPiece(t *Torrent, index peer_protocol.Integer) {
cl.mu.Lock()
p := t.Pieces[index]
for p.Hashing {
cl.event.Wait()
}
p.Hashing = true
p.QueuedForHash = false
cl.mu.Unlock()
sum := t.HashPiece(index)
cl.mu.Lock()
piece := t.Pieces[index]
cl.pieceHashed(t, index, sum == piece.Hash)
piece.Hashing = false
p.Hashing = false
cl.pieceHashed(t, index, sum == p.Hash)
cl.mu.Unlock()
}

View File

@ -4,6 +4,7 @@ import (
"bazil.org/fuse"
fusefs "bazil.org/fuse/fs"
"bitbucket.org/anacrolix/go.torrent"
"bitbucket.org/anacrolix/go.torrent/fs"
"flag"
metainfo "github.com/nsf/libtorgo/torrent"
"log"
@ -11,16 +12,21 @@ import (
"net/http"
_ "net/http/pprof"
"os"
"os/signal"
"os/user"
"path/filepath"
"sync"
"syscall"
"time"
)
var (
downloadDir string
torrentPath string
mountDir string
downloadDir string
torrentPath string
mountDir string
disableTrackers = flag.Bool("disableTrackers", false, "disables trackers")
testPeer = flag.String("testPeer", "", "the address for a test peer")
pprofAddr = flag.String("pprofAddr", "", "pprof HTTP server bind address")
testPeerAddr *net.TCPAddr
)
func init() {
@ -35,17 +41,51 @@ func init() {
flag.StringVar(&mountDir, "mountDir", "", "location the torrent contents are made available")
}
func resolveTestPeerAddr() {
if *testPeer == "" {
return
}
var err error
testPeerAddr, err = net.ResolveTCPAddr("tcp4", *testPeer)
if err != nil {
log.Fatal(err)
}
}
func setSignalHandlers() {
c := make(chan os.Signal)
signal.Notify(c, syscall.SIGINT, syscall.SIGTERM)
go func() {
<-c
fuse.Unmount(mountDir)
}()
}
func main() {
pprofAddr := flag.String("pprofAddr", "", "pprof HTTP server bind address")
testPeer := flag.String("testPeer", "", "the address for a test peer")
flag.Parse()
if flag.NArg() != 0 {
os.Stderr.WriteString("one does not simply pass positional args\n")
os.Exit(2)
}
if mountDir == "" {
os.Stderr.WriteString("y u no specify mountpoint?\n")
os.Exit(2)
}
log.SetFlags(log.LstdFlags | log.Lshortfile)
if *pprofAddr != "" {
go http.ListenAndServe(*pprofAddr, nil)
}
conn, err := fuse.Mount(mountDir)
if err != nil {
log.Fatal(err)
}
defer fuse.Unmount(mountDir)
// TODO: Think about the ramifications of exiting not due to a signal.
setSignalHandlers()
defer conn.Close()
client := &torrent.Client{
DataDir: downloadDir,
HalfOpenLimit: 2,
DataDir: downloadDir,
DisableTrackers: *disableTrackers,
}
client.Start()
torrentDir, err := os.Open(torrentPath)
@ -57,13 +97,7 @@ func main() {
if err != nil {
log.Fatal(err)
}
var testAddr *net.TCPAddr
if *testPeer != "" {
testAddr, err = net.ResolveTCPAddr("tcp4", *testPeer)
if err != nil {
log.Fatal(err)
}
}
resolveTestPeerAddr()
for _, name := range names {
metaInfo, err := metainfo.LoadFromFile(filepath.Join(torrentPath, name))
if err != nil {
@ -74,31 +108,23 @@ func main() {
log.Print(err)
}
}
conn, err := fuse.Mount(mountDir)
if err != nil {
log.Fatal(err)
}
fs := &TorrentFS{
Client: client,
DataSubs: make(map[chan torrent.DataSpec]struct{}),
}
go fs.publishData()
fs := torrentfs.New(client)
go func() {
for {
torrentLoop:
for _, t := range client.Torrents() {
client.Lock()
for _, c := range t.Conns {
if c.Socket.RemoteAddr().String() == testAddr.String() {
if c.Socket.RemoteAddr().String() == testPeerAddr.String() {
client.Unlock()
continue torrentLoop
}
}
client.Unlock()
if testAddr != nil {
if testPeerAddr != nil {
if err := client.AddPeers(t.InfoHash, []torrent.Peer{{
IP: testAddr.IP,
Port: testAddr.Port,
IP: testPeerAddr.IP,
Port: testPeerAddr.Port,
}}); err != nil {
log.Print(err)
}
@ -107,5 +133,7 @@ func main() {
time.Sleep(10 * time.Second)
}
}()
fusefs.Serve(conn, fs)
if err := fusefs.Serve(conn, fs); err != nil {
log.Fatal(err)
}
}

View File

@ -5,6 +5,7 @@ import (
fusefs "bazil.org/fuse/fs"
"bitbucket.org/anacrolix/go.torrent"
metainfo "github.com/nsf/libtorgo/torrent"
"log"
"os"
"sync"
)
@ -19,36 +20,6 @@ type torrentFS struct {
sync.Mutex
}
func (tfs *torrentFS) publishData() {
for {
spec := <-tfs.Client.DataReady
tfs.Lock()
for ds := range tfs.DataSubs {
ds <- spec
}
tfs.Unlock()
}
}
func (tfs *torrentFS) SubscribeData() chan torrent.DataSpec {
ch := make(chan torrent.DataSpec)
tfs.Lock()
tfs.DataSubs[ch] = struct{}{}
tfs.Unlock()
return ch
}
func (tfs *torrentFS) UnsubscribeData(ch chan torrent.DataSpec) {
go func() {
for _ = range ch {
}
}()
tfs.Lock()
delete(tfs.DataSubs, ch)
tfs.Unlock()
close(ch)
}
var _ fusefs.NodeForgetter = rootNode{}
type rootNode struct {
@ -78,8 +49,6 @@ func (fn fileNode) Read(req *fuse.ReadRequest, resp *fuse.ReadResponse, intr fus
if req.Dir {
panic("hodor")
}
dataSpecs := fn.FS.SubscribeData()
defer fn.FS.UnsubscribeData(dataSpecs)
data := make([]byte, func() int {
_len := int64(fn.size) - req.Offset
if int64(req.Size) < _len {
@ -94,8 +63,10 @@ func (fn fileNode) Read(req *fuse.ReadRequest, resp *fuse.ReadResponse, intr fus
}
infoHash := torrent.BytesInfoHash(fn.metaInfo.InfoHash)
torrentOff := fn.TorrentOffset + req.Offset
log.Print(torrentOff, len(data), fn.TorrentOffset)
fn.FS.Client.PrioritizeDataRegion(infoHash, torrentOff, int64(len(data)))
for {
dataWaiter := fn.FS.Client.DataWaiter()
n, err := fn.FS.Client.TorrentReadAt(infoHash, torrentOff, data)
switch err {
case nil:
@ -103,11 +74,12 @@ func (fn fileNode) Read(req *fuse.ReadRequest, resp *fuse.ReadResponse, intr fus
return nil
case torrent.ErrDataNotReady:
select {
case <-dataSpecs:
case <-dataWaiter:
case <-intr:
return fuse.EINTR
}
default:
log.Print(err)
return fuse.EIO
}
}
@ -256,6 +228,5 @@ func New(cl *torrent.Client) *torrentFS {
Client: cl,
DataSubs: make(map[chan torrent.DataSpec]struct{}),
}
go fs.publishData()
return fs
}

View File

@ -96,8 +96,7 @@ func TestDownloadOnDemand(t *testing.T) {
defer seeder.Stop()
seeder.AddTorrent(metaInfo)
leecher := torrent.Client{
DataDir: filepath.Join(dir, "download"),
DataReady: make(chan torrent.DataSpec),
DataDir: filepath.Join(dir, "download"),
}
leecher.Start()
defer leecher.Stop()
@ -116,6 +115,11 @@ func TestDownloadOnDemand(t *testing.T) {
if err != nil {
t.Fatal(err)
}
defer func() {
if err := fuse.Unmount(mountDir); err != nil {
t.Fatal(err)
}
}()
go func() {
if err := fusefs.Serve(fuseConn, fs); err != nil {
t.Fatal(err)
@ -132,9 +136,6 @@ func TestDownloadOnDemand(t *testing.T) {
if err != nil {
t.Fatal(err)
}
if err := fuse.Unmount(mountDir); err != nil {
t.Fatal(err)
}
if string(content) != dummyFileContents {
t.FailNow()
}