diff --git a/client.go b/client.go index 077a5ac1..8f163780 100644 --- a/client.go +++ b/client.go @@ -52,11 +52,12 @@ type piece struct { Hash pieceSum PendingChunkSpecs map[ChunkSpec]struct{} Hashing bool + QueuedForHash bool EverHashed bool } func (p *piece) Complete() bool { - return len(p.PendingChunkSpecs) == 0 && !p.Hashing && p.EverHashed + return len(p.PendingChunkSpecs) == 0 && p.EverHashed } func lastChunkSpec(pieceLength peer_protocol.Integer) (cs ChunkSpec) { @@ -126,7 +127,11 @@ func (c *Connection) Post(msg encoding.BinaryMarshaler) { c.post <- msg } +// Returns true if more requests can be sent. func (c *Connection) Request(chunk Request) bool { + if !c.PeerPieces[chunk.Index] { + panic("peer doesn't have that piece!") + } if len(c.Requests) >= maxRequests { return false } @@ -291,20 +296,19 @@ func (t *Torrent) piecesByPendingBytesDesc() (indices []peer_protocol.Integer) { // Currently doesn't really queue, but should in the future. func (cl *Client) queuePieceCheck(t *Torrent, pieceIndex peer_protocol.Integer) { piece := t.Pieces[pieceIndex] - if piece.Hashing { + if piece.QueuedForHash { return } - piece.Hashing = true + piece.QueuedForHash = true go cl.verifyPiece(t, pieceIndex) } func (cl *Client) PrioritizeDataRegion(ih InfoHash, off, len_ int64) { - log.Print(len_) cl.mu.Lock() defer cl.mu.Unlock() t := cl.torrent(ih) newPriorities := make([]Request, 0, (len_+2*(chunkSize-1))/chunkSize) - for len_ != 0 { + for len_ > 0 { // TODO: Write a function to return the Request for a given offset. index := peer_protocol.Integer(off / t.MetaInfo.PieceLength) pieceOff := peer_protocol.Integer(off % t.MetaInfo.PieceLength) @@ -313,8 +317,8 @@ func (cl *Client) PrioritizeDataRegion(ih InfoHash, off, len_ int64) { cl.queuePieceCheck(t, index) } chunk := ChunkSpec{pieceOff / chunkSize * chunkSize, chunkSize} - if int64(chunk.Length) > len_ { - chunk.Length = peer_protocol.Integer(len_) + if chunk.Begin+chunk.Length > t.PieceLength(index) { + chunk.Length = t.PieceLength(index) - chunk.Begin } adv := int64(chunk.Length - pieceOff%chunkSize) off += adv @@ -324,7 +328,7 @@ func (cl *Client) PrioritizeDataRegion(ih InfoHash, off, len_ int64) { } newPriorities = append(newPriorities, Request{index, chunk}) } - if len(newPriorities) < 1 { + if len(newPriorities) == 0 { return } log.Print(newPriorities) @@ -347,7 +351,7 @@ func (t *Torrent) WriteChunk(piece int, begin int64, data []byte) (err error) { func (t *Torrent) bitfield() (bf []bool) { for _, p := range t.Pieces { - bf = append(bf, p.EverHashed && !p.Hashing && len(p.PendingChunkSpecs) == 0) + bf = append(bf, p.EverHashed && len(p.PendingChunkSpecs) == 0) } return } @@ -413,19 +417,20 @@ type DataSpec struct { } type Client struct { - DataDir string - HalfOpenLimit int - PeerId [20]byte - DataReady chan DataSpec - Listener net.Listener + DataDir string + HalfOpenLimit int + PeerId [20]byte + Listener net.Listener + DisableTrackers bool sync.Mutex mu *sync.Mutex event sync.Cond quit chan struct{} - halfOpen int - torrents map[InfoHash]*Torrent + halfOpen int + torrents map[InfoHash]*Torrent + dataWaiter chan struct{} } var ( @@ -711,7 +716,7 @@ func (me *Client) peerGotPiece(torrent *Torrent, conn *Connection, piece int) { func (t *Torrent) wantPiece(index int) bool { p := t.Pieces[index] - return p.EverHashed && !p.Hashing && len(p.PendingChunkSpecs) != 0 + return p.EverHashed && len(p.PendingChunkSpecs) != 0 } func (me *Client) peerUnchoked(torrent *Torrent, conn *Connection) { @@ -731,7 +736,6 @@ func (me *Client) connectionLoop(torrent *Torrent, conn *Connection) error { if err != nil { return err } - log.Print(msg.Type) if msg.Keepalive { continue } @@ -803,6 +807,7 @@ func (me *Client) connectionLoop(torrent *Torrent, conn *Connection) error { if err != nil { return err } + log.Print("replenishing from loop") me.replenishConnRequests(torrent, conn) } } @@ -915,7 +920,9 @@ func (me *Client) AddTorrent(metaInfo *metainfo.MetaInfo) error { return torrent.Close() } me.torrents[torrent.InfoHash] = torrent - go me.announceTorrent(torrent) + if !me.DisableTrackers { + go me.announceTorrent(torrent) + } for i := range torrent.Pieces { me.queuePieceCheck(torrent, peer_protocol.Integer(i)) } @@ -985,23 +992,32 @@ func (me *Client) replenishConnRequests(torrent *Torrent, conn *Connection) { addRequest := func(req Request) (again bool) { piece := torrent.Pieces[req.Index] if piece.Hashing { + // We can't be sure we want this. + log.Print("piece is hashing") return true } if piece.Complete() { + log.Print("piece is complete") + // We already have this. return true } if requestHeatMap[req] > 0 { + log.Print("piece is hot") + // We've already requested this. return true } return conn.Request(req) } + // First request prioritized chunks. if torrent.Priorities != nil { for e := torrent.Priorities.Front(); e != nil; e = e.Next() { + log.Print(e.Value.(Request)) if !addRequest(e.Value.(Request)) { return } } } + // Then finish of incomplete pieces in order of bytes remaining. for _, index := range torrent.piecesByPendingBytesDesc() { if torrent.PieceNumPendingBytes(index) == torrent.PieceLength(index) { continue @@ -1042,22 +1058,26 @@ func (me *Client) downloadedChunk(torrent *Torrent, msg *peer_protocol.Message) } func (cl *Client) dataReady(ds DataSpec) { - if cl.DataReady == nil { - return + if cl.dataWaiter != nil { + close(cl.dataWaiter) } - go func() { - cl.DataReady <- ds - }() + cl.dataWaiter = nil +} + +func (cl *Client) DataWaiter() <-chan struct{} { + cl.Lock() + defer cl.Unlock() + if cl.dataWaiter == nil { + cl.dataWaiter = make(chan struct{}) + } + return cl.dataWaiter } func (me *Client) pieceHashed(t *Torrent, piece peer_protocol.Integer, correct bool) { p := t.Pieces[piece] - if !p.Hashing { - panic("invalid state") - } - p.Hashing = false p.EverHashed = true if correct { + log.Print("piece passed hash") p.PendingChunkSpecs = nil var next *list.Element if t.Priorities != nil { @@ -1076,6 +1096,7 @@ func (me *Client) pieceHashed(t *Torrent, piece peer_protocol.Integer, correct b }, }) } else { + log.Print("piece failed hash") if len(p.PendingChunkSpecs) == 0 { p.PendingChunkSpecs = t.pieceChunkSpecs(piece) } @@ -1096,11 +1117,18 @@ func (me *Client) pieceHashed(t *Torrent, piece peer_protocol.Integer, correct b } func (cl *Client) verifyPiece(t *Torrent, index peer_protocol.Integer) { + cl.mu.Lock() + p := t.Pieces[index] + for p.Hashing { + cl.event.Wait() + } + p.Hashing = true + p.QueuedForHash = false + cl.mu.Unlock() sum := t.HashPiece(index) cl.mu.Lock() - piece := t.Pieces[index] - cl.pieceHashed(t, index, sum == piece.Hash) - piece.Hashing = false + p.Hashing = false + cl.pieceHashed(t, index, sum == p.Hash) cl.mu.Unlock() } diff --git a/cmd/torrentfs/main.go b/cmd/torrentfs/main.go index f13774c1..7a13a3fe 100644 --- a/cmd/torrentfs/main.go +++ b/cmd/torrentfs/main.go @@ -4,6 +4,7 @@ import ( "bazil.org/fuse" fusefs "bazil.org/fuse/fs" "bitbucket.org/anacrolix/go.torrent" + "bitbucket.org/anacrolix/go.torrent/fs" "flag" metainfo "github.com/nsf/libtorgo/torrent" "log" @@ -11,16 +12,21 @@ import ( "net/http" _ "net/http/pprof" "os" + "os/signal" "os/user" "path/filepath" - "sync" + "syscall" "time" ) var ( - downloadDir string - torrentPath string - mountDir string + downloadDir string + torrentPath string + mountDir string + disableTrackers = flag.Bool("disableTrackers", false, "disables trackers") + testPeer = flag.String("testPeer", "", "the address for a test peer") + pprofAddr = flag.String("pprofAddr", "", "pprof HTTP server bind address") + testPeerAddr *net.TCPAddr ) func init() { @@ -35,17 +41,51 @@ func init() { flag.StringVar(&mountDir, "mountDir", "", "location the torrent contents are made available") } +func resolveTestPeerAddr() { + if *testPeer == "" { + return + } + var err error + testPeerAddr, err = net.ResolveTCPAddr("tcp4", *testPeer) + if err != nil { + log.Fatal(err) + } +} + +func setSignalHandlers() { + c := make(chan os.Signal) + signal.Notify(c, syscall.SIGINT, syscall.SIGTERM) + go func() { + <-c + fuse.Unmount(mountDir) + }() +} + func main() { - pprofAddr := flag.String("pprofAddr", "", "pprof HTTP server bind address") - testPeer := flag.String("testPeer", "", "the address for a test peer") flag.Parse() + if flag.NArg() != 0 { + os.Stderr.WriteString("one does not simply pass positional args\n") + os.Exit(2) + } + if mountDir == "" { + os.Stderr.WriteString("y u no specify mountpoint?\n") + os.Exit(2) + } log.SetFlags(log.LstdFlags | log.Lshortfile) if *pprofAddr != "" { go http.ListenAndServe(*pprofAddr, nil) } + conn, err := fuse.Mount(mountDir) + if err != nil { + log.Fatal(err) + } + defer fuse.Unmount(mountDir) + // TODO: Think about the ramifications of exiting not due to a signal. + setSignalHandlers() + defer conn.Close() client := &torrent.Client{ - DataDir: downloadDir, - HalfOpenLimit: 2, + DataDir: downloadDir, + DisableTrackers: *disableTrackers, } client.Start() torrentDir, err := os.Open(torrentPath) @@ -57,13 +97,7 @@ func main() { if err != nil { log.Fatal(err) } - var testAddr *net.TCPAddr - if *testPeer != "" { - testAddr, err = net.ResolveTCPAddr("tcp4", *testPeer) - if err != nil { - log.Fatal(err) - } - } + resolveTestPeerAddr() for _, name := range names { metaInfo, err := metainfo.LoadFromFile(filepath.Join(torrentPath, name)) if err != nil { @@ -74,31 +108,23 @@ func main() { log.Print(err) } } - conn, err := fuse.Mount(mountDir) - if err != nil { - log.Fatal(err) - } - fs := &TorrentFS{ - Client: client, - DataSubs: make(map[chan torrent.DataSpec]struct{}), - } - go fs.publishData() + fs := torrentfs.New(client) go func() { for { torrentLoop: for _, t := range client.Torrents() { client.Lock() for _, c := range t.Conns { - if c.Socket.RemoteAddr().String() == testAddr.String() { + if c.Socket.RemoteAddr().String() == testPeerAddr.String() { client.Unlock() continue torrentLoop } } client.Unlock() - if testAddr != nil { + if testPeerAddr != nil { if err := client.AddPeers(t.InfoHash, []torrent.Peer{{ - IP: testAddr.IP, - Port: testAddr.Port, + IP: testPeerAddr.IP, + Port: testPeerAddr.Port, }}); err != nil { log.Print(err) } @@ -107,5 +133,7 @@ func main() { time.Sleep(10 * time.Second) } }() - fusefs.Serve(conn, fs) + if err := fusefs.Serve(conn, fs); err != nil { + log.Fatal(err) + } } diff --git a/fs/torrentfs.go b/fs/torrentfs.go index 0d8ae09e..44710e8a 100644 --- a/fs/torrentfs.go +++ b/fs/torrentfs.go @@ -5,6 +5,7 @@ import ( fusefs "bazil.org/fuse/fs" "bitbucket.org/anacrolix/go.torrent" metainfo "github.com/nsf/libtorgo/torrent" + "log" "os" "sync" ) @@ -19,36 +20,6 @@ type torrentFS struct { sync.Mutex } -func (tfs *torrentFS) publishData() { - for { - spec := <-tfs.Client.DataReady - tfs.Lock() - for ds := range tfs.DataSubs { - ds <- spec - } - tfs.Unlock() - } -} - -func (tfs *torrentFS) SubscribeData() chan torrent.DataSpec { - ch := make(chan torrent.DataSpec) - tfs.Lock() - tfs.DataSubs[ch] = struct{}{} - tfs.Unlock() - return ch -} - -func (tfs *torrentFS) UnsubscribeData(ch chan torrent.DataSpec) { - go func() { - for _ = range ch { - } - }() - tfs.Lock() - delete(tfs.DataSubs, ch) - tfs.Unlock() - close(ch) -} - var _ fusefs.NodeForgetter = rootNode{} type rootNode struct { @@ -78,8 +49,6 @@ func (fn fileNode) Read(req *fuse.ReadRequest, resp *fuse.ReadResponse, intr fus if req.Dir { panic("hodor") } - dataSpecs := fn.FS.SubscribeData() - defer fn.FS.UnsubscribeData(dataSpecs) data := make([]byte, func() int { _len := int64(fn.size) - req.Offset if int64(req.Size) < _len { @@ -94,8 +63,10 @@ func (fn fileNode) Read(req *fuse.ReadRequest, resp *fuse.ReadResponse, intr fus } infoHash := torrent.BytesInfoHash(fn.metaInfo.InfoHash) torrentOff := fn.TorrentOffset + req.Offset + log.Print(torrentOff, len(data), fn.TorrentOffset) fn.FS.Client.PrioritizeDataRegion(infoHash, torrentOff, int64(len(data))) for { + dataWaiter := fn.FS.Client.DataWaiter() n, err := fn.FS.Client.TorrentReadAt(infoHash, torrentOff, data) switch err { case nil: @@ -103,11 +74,12 @@ func (fn fileNode) Read(req *fuse.ReadRequest, resp *fuse.ReadResponse, intr fus return nil case torrent.ErrDataNotReady: select { - case <-dataSpecs: + case <-dataWaiter: case <-intr: return fuse.EINTR } default: + log.Print(err) return fuse.EIO } } @@ -256,6 +228,5 @@ func New(cl *torrent.Client) *torrentFS { Client: cl, DataSubs: make(map[chan torrent.DataSpec]struct{}), } - go fs.publishData() return fs } diff --git a/fs/torrentfs_test.go b/fs/torrentfs_test.go index a1905c26..0c38850d 100644 --- a/fs/torrentfs_test.go +++ b/fs/torrentfs_test.go @@ -96,8 +96,7 @@ func TestDownloadOnDemand(t *testing.T) { defer seeder.Stop() seeder.AddTorrent(metaInfo) leecher := torrent.Client{ - DataDir: filepath.Join(dir, "download"), - DataReady: make(chan torrent.DataSpec), + DataDir: filepath.Join(dir, "download"), } leecher.Start() defer leecher.Stop() @@ -116,6 +115,11 @@ func TestDownloadOnDemand(t *testing.T) { if err != nil { t.Fatal(err) } + defer func() { + if err := fuse.Unmount(mountDir); err != nil { + t.Fatal(err) + } + }() go func() { if err := fusefs.Serve(fuseConn, fs); err != nil { t.Fatal(err) @@ -132,9 +136,6 @@ func TestDownloadOnDemand(t *testing.T) { if err != nil { t.Fatal(err) } - if err := fuse.Unmount(mountDir); err != nil { - t.Fatal(err) - } if string(content) != dummyFileContents { t.FailNow() }