From fa000c4f882cbf1442ca5c0f9c81ba269a3c7416 Mon Sep 17 00:00:00 2001 From: Matt Joiner Date: Thu, 20 Mar 2014 16:58:09 +1100 Subject: [PATCH] Improvements to pending chunks; extract testutil --- client.go | 99 +++++++++++++++++++++++++++++++------------- client_test.go | 31 ++++++++++++++ cmd/torrent/main.go | 16 +++---- fs/torrentfs_test.go | 51 +++++++---------------- testutil/testutil.go | 44 ++++++++++++++++++++ 5 files changed, 168 insertions(+), 73 deletions(-) create mode 100644 testutil/testutil.go diff --git a/client.go b/client.go index 8f163780..f7b5ebae 100644 --- a/client.go +++ b/client.go @@ -1,9 +1,6 @@ package torrent import ( - "bitbucket.org/anacrolix/go.torrent/peer_protocol" - "bitbucket.org/anacrolix/go.torrent/tracker" - _ "bitbucket.org/anacrolix/go.torrent/tracker/udp" "bufio" "container/list" "crypto" @@ -11,9 +8,7 @@ import ( "encoding" "errors" "fmt" - metainfo "github.com/nsf/libtorgo/torrent" "io" - "launchpad.net/gommap" "log" mathRand "math/rand" "net" @@ -22,6 +17,13 @@ import ( "sort" "sync" "time" + + metainfo "github.com/nsf/libtorgo/torrent" + + "bitbucket.org/anacrolix/go.torrent/peer_protocol" + "bitbucket.org/anacrolix/go.torrent/tracker" + _ "bitbucket.org/anacrolix/go.torrent/tracker/udp" + "launchpad.net/gommap" ) const ( @@ -252,6 +254,10 @@ type Torrent struct { Trackers [][]tracker.Client } +func (t *Torrent) NumPieces() int { + return len(t.MetaInfo.Pieces) / PieceHash.Size() +} + func (t *Torrent) Length() int64 { return int64(t.PieceLength(peer_protocol.Integer(len(t.Pieces)-1))) + int64(len(t.Pieces)-1)*int64(t.PieceLength(0)) } @@ -303,6 +309,25 @@ func (cl *Client) queuePieceCheck(t *Torrent, pieceIndex peer_protocol.Integer) go cl.verifyPiece(t, pieceIndex) } +func (t *Torrent) offsetRequest(off int64) (req Request, ok bool) { + req.Index = peer_protocol.Integer(off / t.MetaInfo.PieceLength) + if req.Index < 0 || int(req.Index) >= len(t.Pieces) { + return + } + off %= t.MetaInfo.PieceLength + pieceLeft := t.PieceLength(req.Index) - peer_protocol.Integer(off) + if pieceLeft <= 0 { + return + } + req.Begin = chunkSize * (peer_protocol.Integer(off) / chunkSize) + req.Length = chunkSize + if req.Length > pieceLeft { + req.Length = pieceLeft + } + ok = true + return +} + func (cl *Client) PrioritizeDataRegion(ih InfoHash, off, len_ int64) { cl.mu.Lock() defer cl.mu.Unlock() @@ -310,23 +335,16 @@ func (cl *Client) PrioritizeDataRegion(ih InfoHash, off, len_ int64) { newPriorities := make([]Request, 0, (len_+2*(chunkSize-1))/chunkSize) for len_ > 0 { // TODO: Write a function to return the Request for a given offset. - index := peer_protocol.Integer(off / t.MetaInfo.PieceLength) - pieceOff := peer_protocol.Integer(off % t.MetaInfo.PieceLength) - piece := t.Pieces[index] - if !piece.EverHashed { - cl.queuePieceCheck(t, index) + req, ok := t.offsetRequest(off) + if !ok { + break } - chunk := ChunkSpec{pieceOff / chunkSize * chunkSize, chunkSize} - if chunk.Begin+chunk.Length > t.PieceLength(index) { - chunk.Length = t.PieceLength(index) - chunk.Begin - } - adv := int64(chunk.Length - pieceOff%chunkSize) - off += adv - len_ -= adv - if _, ok := piece.PendingChunkSpecs[chunk]; !ok && !piece.Hashing { + off += int64(req.Length) + len_ -= int64(req.Length) + if _, ok = t.Pieces[req.Index].PendingChunkSpecs[req.ChunkSpec]; !ok { continue } - newPriorities = append(newPriorities, Request{index, chunk}) + newPriorities = append(newPriorities, req) } if len(newPriorities) == 0 { return @@ -356,12 +374,19 @@ func (t *Torrent) bitfield() (bf []bool) { return } -func (t *Torrent) pieceChunkSpecs(index peer_protocol.Integer) (cs map[ChunkSpec]struct{}) { - cs = make(map[ChunkSpec]struct{}, (t.MetaInfo.PieceLength+chunkSize-1)/chunkSize) +func (t *Torrent) pendAllChunkSpecs(index peer_protocol.Integer) { + piece := t.Pieces[index] + if piece.PendingChunkSpecs == nil { + piece.PendingChunkSpecs = make( + map[ChunkSpec]struct{}, + (t.MetaInfo.PieceLength+chunkSize-1)/chunkSize) + } c := ChunkSpec{ Begin: 0, } - for left := peer_protocol.Integer(t.PieceLength(index)); left > 0; left -= c.Length { + cs := piece.PendingChunkSpecs + log.Print(index, t.PieceLength(index)) + for left := peer_protocol.Integer(t.PieceLength(index)); left != 0; left -= c.Length { c.Length = left if c.Length > chunkSize { c.Length = chunkSize @@ -389,7 +414,7 @@ type Peer struct { } func (t *Torrent) PieceLength(piece peer_protocol.Integer) (len_ peer_protocol.Integer) { - if int(piece) == len(t.Pieces)-1 { + if int(piece) == t.NumPieces()-1 { len_ = peer_protocol.Integer(t.Data.Size() % t.MetaInfo.PieceLength) } if len_ == 0 { @@ -504,6 +529,15 @@ func (c *Client) Start() { } } +func (cl *Client) stopped() bool { + select { + case <-cl.quit: + return true + default: + return false + } +} + func (me *Client) Stop() { close(me.quit) me.event.Broadcast() @@ -730,10 +764,14 @@ func (me *Client) connectionLoop(torrent *Torrent, conn *Connection) error { } for { me.mu.Unlock() + // TODO: Can this be allocated on the stack? msg := new(peer_protocol.Message) err := decoder.Decode(msg) me.mu.Lock() if err != nil { + if me.stopped() { + return nil + } return err } if msg.Keepalive { @@ -872,6 +910,10 @@ func newTorrent(metaInfo *metainfo.MetaInfo, dataDir string) (torrent *Torrent, InfoHash: BytesInfoHash(metaInfo.InfoHash), MetaInfo: metaInfo, } + torrent.Data, err = mmapTorrentData(metaInfo, dataDir) + if err != nil { + return + } for offset := 0; offset < len(metaInfo.Pieces); offset += PieceHash.Size() { hash := metaInfo.Pieces[offset : offset+PieceHash.Size()] if len(hash) != PieceHash.Size() { @@ -881,10 +923,7 @@ func newTorrent(metaInfo *metainfo.MetaInfo, dataDir string) (torrent *Torrent, piece := &piece{} copyHashSum(piece.Hash[:], hash) torrent.Pieces = append(torrent.Pieces, piece) - } - torrent.Data, err = mmapTorrentData(metaInfo, dataDir) - if err != nil { - return + torrent.pendAllChunkSpecs(peer_protocol.Integer(len(torrent.Pieces) - 1)) } torrent.Trackers = make([][]tracker.Client, len(metaInfo.AnnounceList)) for tierIndex := range metaInfo.AnnounceList { @@ -1028,7 +1067,9 @@ func (me *Client) replenishConnRequests(torrent *Torrent, conn *Connection) { } } } - conn.SetInterested(false) + if len(conn.Requests) == 0 { + conn.SetInterested(false) + } } func (me *Client) downloadedChunk(torrent *Torrent, msg *peer_protocol.Message) (err error) { @@ -1098,7 +1139,7 @@ func (me *Client) pieceHashed(t *Torrent, piece peer_protocol.Integer, correct b } else { log.Print("piece failed hash") if len(p.PendingChunkSpecs) == 0 { - p.PendingChunkSpecs = t.pieceChunkSpecs(piece) + t.pendAllChunkSpecs(piece) } } for _, conn := range t.Conns { diff --git a/client_test.go b/client_test.go index ec15ee46..4cb963ac 100644 --- a/client_test.go +++ b/client_test.go @@ -1,6 +1,10 @@ package torrent import ( + "os" + + "bitbucket.org/anacrolix/go.torrent/testutil" + "testing" ) @@ -15,3 +19,30 @@ func TestAddTorrentNoUsableURLs(t *testing.T) { func TestAddPeersToUnknownTorrent(t *testing.T) { t.SkipNow() } + +func TestPieceHashSize(t *testing.T) { + if PieceHash.Size() != 20 { + t.FailNow() + } +} + +func TestTorrentInitialState(t *testing.T) { + dir, mi := testutil.GreetingTestTorrent() + defer os.RemoveAll(dir) + tor, err := newTorrent(mi, "") + if err != nil { + t.Fatal(err) + } + if len(tor.Pieces) != 1 { + t.Fatal("wrong number of pieces") + } + p := tor.Pieces[0] + if len(p.PendingChunkSpecs) != 1 { + t.Fatalf("should only be 1 chunk: %s", p.PendingChunkSpecs) + } + if _, ok := p.PendingChunkSpecs[ChunkSpec{ + Length: 13, + }]; !ok { + t.Fatal("pending chunk spec is incorrect") + } +} diff --git a/cmd/torrent/main.go b/cmd/torrent/main.go index 6f53f6c0..cbc9512b 100644 --- a/cmd/torrent/main.go +++ b/cmd/torrent/main.go @@ -1,16 +1,17 @@ package main import ( - "bitbucket.org/anacrolix/go.torrent" - "bitbucket.org/anacrolix/go.torrent/tracker" "flag" "fmt" - metainfo "github.com/nsf/libtorgo/torrent" "log" "net" "net/http" _ "net/http/pprof" "os" + + metainfo "github.com/nsf/libtorgo/torrent" + + "bitbucket.org/anacrolix/go.torrent" ) var ( @@ -30,7 +31,6 @@ func main() { } client := torrent.Client{ DataDir: *downloadDir, - // HalfOpenLimit: 2, } client.Start() defer client.Stop() @@ -47,6 +47,7 @@ func main() { if err != nil { log.Fatal(err) } + client.PrioritizeDataRegion(torrent.BytesInfoHash(metaInfo.InfoHash), 0, 999999999) err = client.AddPeers(torrent.BytesInfoHash(metaInfo.InfoHash), func() []torrent.Peer { if *testPeer == "" { return nil @@ -56,10 +57,9 @@ func main() { log.Fatal(err) } return []torrent.Peer{{ - Peer: tracker.Peer{ - IP: addr.IP, - Port: addr.Port, - }}} + IP: addr.IP, + Port: addr.Port, + }} }()) if err != nil { log.Fatal(err) diff --git a/fs/torrentfs_test.go b/fs/torrentfs_test.go index 0c38850d..f43b913d 100644 --- a/fs/torrentfs_test.go +++ b/fs/torrentfs_test.go @@ -1,18 +1,20 @@ package torrentfs import ( - "bazil.org/fuse" - fusefs "bazil.org/fuse/fs" - "bitbucket.org/anacrolix/go.torrent" "bytes" - metainfo "github.com/nsf/libtorgo/torrent" - "io" "io/ioutil" "net" "os" "path/filepath" - "runtime" "testing" + "time" + + "bitbucket.org/anacrolix/go.torrent/testutil" + + "bazil.org/fuse" + fusefs "bazil.org/fuse/fs" + "bitbucket.org/anacrolix/go.torrent" + metainfo "github.com/nsf/libtorgo/torrent" ) func TestTCPAddrString(t *testing.T) { @@ -37,34 +39,7 @@ func TestTCPAddrString(t *testing.T) { } } -const dummyFileContents = "hello, world\n" - -func createDummyTorrentData(dirName string) string { - f, _ := os.Create(filepath.Join(dirName, "greeting")) - f.WriteString("hello, world\n") - return f.Name() -} - -func createMetaInfo(name string, w io.Writer) { - builder := metainfo.Builder{} - builder.AddFile(name) - builder.AddAnnounceGroup([]string{"lol://cheezburger"}) - batch, err := builder.Submit() - if err != nil { - panic(err) - } - errs, _ := batch.Start(w, 1) - <-errs -} - func TestDownloadOnDemand(t *testing.T) { - priorNumGoroutines := runtime.NumGoroutine() - defer func() { - n := runtime.NumGoroutine() - if n != priorNumGoroutines { - t.Fatalf("expected %d goroutines, but %d are running", priorNumGoroutines, n) - } - }() dir, err := ioutil.TempDir("", "torrentfs") if err != nil { t.Fatal(err) @@ -77,9 +52,9 @@ func TestDownloadOnDemand(t *testing.T) { t.Logf("test directory: %s", dir) finishedDir := filepath.Join(dir, "finished") os.Mkdir(finishedDir, 0777) - name := createDummyTorrentData(finishedDir) + name := testutil.CreateDummyTorrentData(finishedDir) metaInfoBuf := &bytes.Buffer{} - createMetaInfo(name, metaInfoBuf) + testutil.CreateMetaInfo(name, metaInfoBuf) metaInfo, err := metainfo.Load(metaInfoBuf) seeder := torrent.Client{ DataDir: finishedDir, @@ -132,11 +107,15 @@ func TestDownloadOnDemand(t *testing.T) { if fuseConn.MountError != nil { t.Fatal(fuseConn.MountError) } + go func() { + time.Sleep(10 * time.Second) + fuse.Unmount(mountDir) + }() content, err := ioutil.ReadFile(filepath.Join(mountDir, "greeting")) if err != nil { t.Fatal(err) } - if string(content) != dummyFileContents { + if string(content) != testutil.GreetingFileContents { t.FailNow() } } diff --git a/testutil/testutil.go b/testutil/testutil.go new file mode 100644 index 00000000..4dba8144 --- /dev/null +++ b/testutil/testutil.go @@ -0,0 +1,44 @@ +package testutil + +import ( + "io" + "io/ioutil" + "os" + "path/filepath" + + metainfo "github.com/nsf/libtorgo/torrent" + + "bytes" +) + +const GreetingFileContents = "hello, world\n" + +func CreateDummyTorrentData(dirName string) string { + f, _ := os.Create(filepath.Join(dirName, "greeting")) + f.WriteString("hello, world\n") + return f.Name() +} + +func CreateMetaInfo(name string, w io.Writer) { + builder := metainfo.Builder{} + builder.AddFile(name) + builder.AddAnnounceGroup([]string{"lol://cheezburger"}) + batch, err := builder.Submit() + if err != nil { + panic(err) + } + errs, _ := batch.Start(w, 1) + <-errs +} + +func GreetingTestTorrent() (tempDir string, metaInfo *metainfo.MetaInfo) { + tempDir, err := ioutil.TempDir(os.TempDir(), "") + if err != nil { + panic(err) + } + name := CreateDummyTorrentData(tempDir) + w := &bytes.Buffer{} + CreateMetaInfo(name, w) + metaInfo, _ = metainfo.Load(w) + return +}