Improvements to pending chunks; extract testutil

Matt Joiner 2014-03-20 16:58:09 +11:00
parent 771a08d4f4
commit fa000c4f88
5 changed files with 168 additions and 73 deletions

View File

@@ -1,9 +1,6 @@
 package torrent

 import (
-	"bitbucket.org/anacrolix/go.torrent/peer_protocol"
-	"bitbucket.org/anacrolix/go.torrent/tracker"
-	_ "bitbucket.org/anacrolix/go.torrent/tracker/udp"
 	"bufio"
 	"container/list"
 	"crypto"
@@ -11,9 +8,7 @@ import (
 	"encoding"
 	"errors"
 	"fmt"
-	metainfo "github.com/nsf/libtorgo/torrent"
 	"io"
-	"launchpad.net/gommap"
 	"log"
 	mathRand "math/rand"
 	"net"
@@ -22,6 +17,13 @@ import (
 	"sort"
 	"sync"
 	"time"
+
+	metainfo "github.com/nsf/libtorgo/torrent"
+
+	"bitbucket.org/anacrolix/go.torrent/peer_protocol"
+	"bitbucket.org/anacrolix/go.torrent/tracker"
+	_ "bitbucket.org/anacrolix/go.torrent/tracker/udp"
+	"launchpad.net/gommap"
 )

 const (
@@ -252,6 +254,10 @@ type Torrent struct {
 	Trackers [][]tracker.Client
 }

+func (t *Torrent) NumPieces() int {
+	return len(t.MetaInfo.Pieces) / PieceHash.Size()
+}
+
 func (t *Torrent) Length() int64 {
 	return int64(t.PieceLength(peer_protocol.Integer(len(t.Pieces)-1))) + int64(len(t.Pieces)-1)*int64(t.PieceLength(0))
 }
@@ -303,6 +309,25 @@ func (cl *Client) queuePieceCheck(t *Torrent, pieceIndex peer_protocol.Integer)
 	go cl.verifyPiece(t, pieceIndex)
 }

+func (t *Torrent) offsetRequest(off int64) (req Request, ok bool) {
+	req.Index = peer_protocol.Integer(off / t.MetaInfo.PieceLength)
+	if req.Index < 0 || int(req.Index) >= len(t.Pieces) {
+		return
+	}
+	off %= t.MetaInfo.PieceLength
+	pieceLeft := t.PieceLength(req.Index) - peer_protocol.Integer(off)
+	if pieceLeft <= 0 {
+		return
+	}
+	req.Begin = chunkSize * (peer_protocol.Integer(off) / chunkSize)
+	req.Length = chunkSize
+	if req.Length > pieceLeft {
+		req.Length = pieceLeft
+	}
+	ok = true
+	return
+}
+
 func (cl *Client) PrioritizeDataRegion(ih InfoHash, off, len_ int64) {
 	cl.mu.Lock()
 	defer cl.mu.Unlock()
@@ -310,23 +335,16 @@ func (cl *Client) PrioritizeDataRegion(ih InfoHash, off, len_ int64) {
 	newPriorities := make([]Request, 0, (len_+2*(chunkSize-1))/chunkSize)
 	for len_ > 0 {
 		// TODO: Write a function to return the Request for a given offset.
-		index := peer_protocol.Integer(off / t.MetaInfo.PieceLength)
-		pieceOff := peer_protocol.Integer(off % t.MetaInfo.PieceLength)
-		piece := t.Pieces[index]
-		if !piece.EverHashed {
-			cl.queuePieceCheck(t, index)
+		req, ok := t.offsetRequest(off)
+		if !ok {
+			break
 		}
-		chunk := ChunkSpec{pieceOff / chunkSize * chunkSize, chunkSize}
-		if chunk.Begin+chunk.Length > t.PieceLength(index) {
-			chunk.Length = t.PieceLength(index) - chunk.Begin
-		}
-		adv := int64(chunk.Length - pieceOff%chunkSize)
-		off += adv
-		len_ -= adv
-		if _, ok := piece.PendingChunkSpecs[chunk]; !ok && !piece.Hashing {
+		off += int64(req.Length)
+		len_ -= int64(req.Length)
+		if _, ok = t.Pieces[req.Index].PendingChunkSpecs[req.ChunkSpec]; !ok {
 			continue
 		}
-		newPriorities = append(newPriorities, Request{index, chunk})
+		newPriorities = append(newPriorities, req)
 	}
 	if len(newPriorities) == 0 {
 		return
@@ -356,12 +374,19 @@ func (t *Torrent) bitfield() (bf []bool) {
 	return
 }

-func (t *Torrent) pieceChunkSpecs(index peer_protocol.Integer) (cs map[ChunkSpec]struct{}) {
-	cs = make(map[ChunkSpec]struct{}, (t.MetaInfo.PieceLength+chunkSize-1)/chunkSize)
+func (t *Torrent) pendAllChunkSpecs(index peer_protocol.Integer) {
+	piece := t.Pieces[index]
+	if piece.PendingChunkSpecs == nil {
+		piece.PendingChunkSpecs = make(
+			map[ChunkSpec]struct{},
+			(t.MetaInfo.PieceLength+chunkSize-1)/chunkSize)
+	}
 	c := ChunkSpec{
 		Begin: 0,
 	}
-	for left := peer_protocol.Integer(t.PieceLength(index)); left > 0; left -= c.Length {
+	cs := piece.PendingChunkSpecs
+	log.Print(index, t.PieceLength(index))
+	for left := peer_protocol.Integer(t.PieceLength(index)); left != 0; left -= c.Length {
 		c.Length = left
 		if c.Length > chunkSize {
 			c.Length = chunkSize
@@ -389,7 +414,7 @@ type Peer struct {
 }

 func (t *Torrent) PieceLength(piece peer_protocol.Integer) (len_ peer_protocol.Integer) {
-	if int(piece) == len(t.Pieces)-1 {
+	if int(piece) == t.NumPieces()-1 {
 		len_ = peer_protocol.Integer(t.Data.Size() % t.MetaInfo.PieceLength)
 	}
 	if len_ == 0 {
@@ -504,6 +529,15 @@ func (c *Client) Start() {
 	}
 }

+func (cl *Client) stopped() bool {
+	select {
+	case <-cl.quit:
+		return true
+	default:
+		return false
+	}
+}
+
 func (me *Client) Stop() {
 	close(me.quit)
 	me.event.Broadcast()
@@ -730,10 +764,14 @@ func (me *Client) connectionLoop(torrent *Torrent, conn *Connection) error {
 	}
 	for {
 		me.mu.Unlock()
+		// TODO: Can this be allocated on the stack?
 		msg := new(peer_protocol.Message)
 		err := decoder.Decode(msg)
 		me.mu.Lock()
 		if err != nil {
+			if me.stopped() {
+				return nil
+			}
 			return err
 		}
 		if msg.Keepalive {
@@ -872,6 +910,10 @@ func newTorrent(metaInfo *metainfo.MetaInfo, dataDir string) (torrent *Torrent,
 		InfoHash: BytesInfoHash(metaInfo.InfoHash),
 		MetaInfo: metaInfo,
 	}
+	torrent.Data, err = mmapTorrentData(metaInfo, dataDir)
+	if err != nil {
+		return
+	}
 	for offset := 0; offset < len(metaInfo.Pieces); offset += PieceHash.Size() {
 		hash := metaInfo.Pieces[offset : offset+PieceHash.Size()]
 		if len(hash) != PieceHash.Size() {
@@ -881,10 +923,7 @@ func newTorrent(metaInfo *metainfo.MetaInfo, dataDir string) (torrent *Torrent,
 		piece := &piece{}
 		copyHashSum(piece.Hash[:], hash)
 		torrent.Pieces = append(torrent.Pieces, piece)
-	}
-	torrent.Data, err = mmapTorrentData(metaInfo, dataDir)
-	if err != nil {
-		return
+		torrent.pendAllChunkSpecs(peer_protocol.Integer(len(torrent.Pieces) - 1))
 	}
 	torrent.Trackers = make([][]tracker.Client, len(metaInfo.AnnounceList))
 	for tierIndex := range metaInfo.AnnounceList {
@@ -1028,8 +1067,10 @@ func (me *Client) replenishConnRequests(torrent *Torrent, conn *Connection) {
 			}
 		}
 	}
+	if len(conn.Requests) == 0 {
 		conn.SetInterested(false)
 	}
+}

 func (me *Client) downloadedChunk(torrent *Torrent, msg *peer_protocol.Message) (err error) {
 	request := Request{msg.Index, ChunkSpec{msg.Begin, peer_protocol.Integer(len(msg.Piece))}}
@@ -1098,7 +1139,7 @@ func (me *Client) pieceHashed(t *Torrent, piece peer_protocol.Integer, correct b
 	} else {
 		log.Print("piece failed hash")
 		if len(p.PendingChunkSpecs) == 0 {
-			p.PendingChunkSpecs = t.pieceChunkSpecs(piece)
+			t.pendAllChunkSpecs(piece)
 		}
 	}
 	for _, conn := range t.Conns {
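The heart of this change is the new offsetRequest helper: it maps an absolute byte offset in the torrent to the chunk-aligned Request that covers it, clamping the last chunk to the end of the piece. Below is a standalone sketch of the same arithmetic using plain int64 values in place of the package's peer_protocol.Integer, Request, and Torrent types; the piece count, piece length, and chunk size in main are made-up illustrative numbers, not values taken from this commit.

package main

import "fmt"

// offsetRequest mirrors the arithmetic of Torrent.offsetRequest above with
// plain int64s: it returns the piece index, the chunk-aligned begin offset
// within that piece, and the chunk length, clamped to the end of the piece.
func offsetRequest(off, numPieces, pieceLen, lastPieceLen, chunkSize int64) (index, begin, length int64, ok bool) {
	index = off / pieceLen
	if index < 0 || index >= numPieces {
		return
	}
	off %= pieceLen // offset within the piece
	cur := pieceLen
	if index == numPieces-1 {
		cur = lastPieceLen // the final piece is usually shorter
	}
	pieceLeft := cur - off
	if pieceLeft <= 0 {
		return
	}
	begin = chunkSize * (off / chunkSize) // round down to a chunk boundary
	length = chunkSize
	if length > pieceLeft {
		length = pieceLeft // clamp the final chunk of the piece
	}
	ok = true
	return
}

func main() {
	// Illustrative numbers only: 3 pieces of 256 KiB, a 13-byte final piece,
	// and 16 KiB chunks.
	fmt.Println(offsetRequest(300000, 3, 256<<10, 13, 16<<10))      // 1 32768 16384 true
	fmt.Println(offsetRequest(2*256<<10+5, 3, 256<<10, 13, 16<<10)) // 2 0 8 true
}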

View File

@@ -1,6 +1,10 @@
 package torrent

 import (
+	"os"
+
+	"bitbucket.org/anacrolix/go.torrent/testutil"
+
 	"testing"
 )
@@ -15,3 +19,30 @@ func TestAddTorrentNoUsableURLs(t *testing.T) {
 func TestAddPeersToUnknownTorrent(t *testing.T) {
 	t.SkipNow()
 }
+
+func TestPieceHashSize(t *testing.T) {
+	if PieceHash.Size() != 20 {
+		t.FailNow()
+	}
+}
+
+func TestTorrentInitialState(t *testing.T) {
+	dir, mi := testutil.GreetingTestTorrent()
+	defer os.RemoveAll(dir)
+	tor, err := newTorrent(mi, "")
+	if err != nil {
+		t.Fatal(err)
+	}
+	if len(tor.Pieces) != 1 {
+		t.Fatal("wrong number of pieces")
+	}
+	p := tor.Pieces[0]
+	if len(p.PendingChunkSpecs) != 1 {
+		t.Fatalf("should only be 1 chunk: %s", p.PendingChunkSpecs)
+	}
+	if _, ok := p.PendingChunkSpecs[ChunkSpec{
+		Length: 13,
+	}]; !ok {
+		t.Fatal("pending chunk spec is incorrect")
+	}
+}
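For context on the expected chunk: the greeting test torrent written by testutil holds a single 13-byte file, so the torrent has exactly one piece, that piece is shorter than one chunk, and pendAllChunkSpecs leaves a single pending ChunkSpec{Begin: 0, Length: 13}. A trivial standalone check of that length (not part of the commit):

package main

import "fmt"

func main() {
	// The file written by testutil.CreateDummyTorrentData.
	const greeting = "hello, world\n"
	// One piece, shorter than a chunk, so the only pending chunk spans the
	// whole piece: Begin 0, Length 13.
	fmt.Println(len(greeting)) // 13
}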

View File

@@ -1,16 +1,17 @@
 package main

 import (
-	"bitbucket.org/anacrolix/go.torrent"
-	"bitbucket.org/anacrolix/go.torrent/tracker"
 	"flag"
 	"fmt"
-	metainfo "github.com/nsf/libtorgo/torrent"
 	"log"
 	"net"
 	"net/http"
 	_ "net/http/pprof"
 	"os"
+
+	metainfo "github.com/nsf/libtorgo/torrent"
+
+	"bitbucket.org/anacrolix/go.torrent"
 )

 var (
@@ -30,7 +31,6 @@ func main() {
 	}
 	client := torrent.Client{
 		DataDir: *downloadDir,
-		// HalfOpenLimit: 2,
 	}
 	client.Start()
 	defer client.Stop()
@@ -47,6 +47,7 @@ func main() {
 	if err != nil {
 		log.Fatal(err)
 	}
+	client.PrioritizeDataRegion(torrent.BytesInfoHash(metaInfo.InfoHash), 0, 999999999)
 	err = client.AddPeers(torrent.BytesInfoHash(metaInfo.InfoHash), func() []torrent.Peer {
 		if *testPeer == "" {
 			return nil
@@ -56,10 +57,9 @@ func main() {
 			log.Fatal(err)
 		}
 		return []torrent.Peer{{
-			Peer: tracker.Peer{
 			IP:   addr.IP,
 			Port: addr.Port,
-		}}}
+		}}
 	}())
 	if err != nil {
 		log.Fatal(err)

View File

@@ -1,18 +1,20 @@
 package torrentfs

 import (
-	"bazil.org/fuse"
-	fusefs "bazil.org/fuse/fs"
-	"bitbucket.org/anacrolix/go.torrent"
 	"bytes"
-	metainfo "github.com/nsf/libtorgo/torrent"
-	"io"
 	"io/ioutil"
 	"net"
 	"os"
 	"path/filepath"
-	"runtime"
 	"testing"
+	"time"
+
+	"bitbucket.org/anacrolix/go.torrent/testutil"
+
+	"bazil.org/fuse"
+	fusefs "bazil.org/fuse/fs"
+	"bitbucket.org/anacrolix/go.torrent"
+	metainfo "github.com/nsf/libtorgo/torrent"
 )

 func TestTCPAddrString(t *testing.T) {
@@ -37,34 +39,7 @@ func TestTCPAddrString(t *testing.T) {
 	}
 }

-const dummyFileContents = "hello, world\n"
-
-func createDummyTorrentData(dirName string) string {
-	f, _ := os.Create(filepath.Join(dirName, "greeting"))
-	f.WriteString("hello, world\n")
-	return f.Name()
-}
-
-func createMetaInfo(name string, w io.Writer) {
-	builder := metainfo.Builder{}
-	builder.AddFile(name)
-	builder.AddAnnounceGroup([]string{"lol://cheezburger"})
-	batch, err := builder.Submit()
-	if err != nil {
-		panic(err)
-	}
-	errs, _ := batch.Start(w, 1)
-	<-errs
-}
-
 func TestDownloadOnDemand(t *testing.T) {
-	priorNumGoroutines := runtime.NumGoroutine()
-	defer func() {
-		n := runtime.NumGoroutine()
-		if n != priorNumGoroutines {
-			t.Fatalf("expected %d goroutines, but %d are running", priorNumGoroutines, n)
-		}
-	}()
 	dir, err := ioutil.TempDir("", "torrentfs")
 	if err != nil {
 		t.Fatal(err)
@@ -77,9 +52,9 @@ func TestDownloadOnDemand(t *testing.T) {
 	t.Logf("test directory: %s", dir)
 	finishedDir := filepath.Join(dir, "finished")
 	os.Mkdir(finishedDir, 0777)
-	name := createDummyTorrentData(finishedDir)
+	name := testutil.CreateDummyTorrentData(finishedDir)
 	metaInfoBuf := &bytes.Buffer{}
-	createMetaInfo(name, metaInfoBuf)
+	testutil.CreateMetaInfo(name, metaInfoBuf)
 	metaInfo, err := metainfo.Load(metaInfoBuf)
 	seeder := torrent.Client{
 		DataDir: finishedDir,
@@ -132,11 +107,15 @@ func TestDownloadOnDemand(t *testing.T) {
 	if fuseConn.MountError != nil {
 		t.Fatal(fuseConn.MountError)
 	}
+	go func() {
+		time.Sleep(10 * time.Second)
+		fuse.Unmount(mountDir)
+	}()
 	content, err := ioutil.ReadFile(filepath.Join(mountDir, "greeting"))
 	if err != nil {
 		t.Fatal(err)
 	}
-	if string(content) != dummyFileContents {
+	if string(content) != testutil.GreetingFileContents {
 		t.FailNow()
 	}
 }

testutil/testutil.go (new file, 44 lines added)
View File

@@ -0,0 +1,44 @@
+package testutil
+
+import (
+	"io"
+	"io/ioutil"
+	"os"
+	"path/filepath"
+
+	metainfo "github.com/nsf/libtorgo/torrent"
+
+	"bytes"
+)
+
+const GreetingFileContents = "hello, world\n"
+
+func CreateDummyTorrentData(dirName string) string {
+	f, _ := os.Create(filepath.Join(dirName, "greeting"))
+	f.WriteString("hello, world\n")
+	return f.Name()
+}
+
+func CreateMetaInfo(name string, w io.Writer) {
+	builder := metainfo.Builder{}
+	builder.AddFile(name)
+	builder.AddAnnounceGroup([]string{"lol://cheezburger"})
+	batch, err := builder.Submit()
+	if err != nil {
+		panic(err)
+	}
+	errs, _ := batch.Start(w, 1)
+	<-errs
+}
+
+func GreetingTestTorrent() (tempDir string, metaInfo *metainfo.MetaInfo) {
+	tempDir, err := ioutil.TempDir(os.TempDir(), "")
+	if err != nil {
+		panic(err)
+	}
+	name := CreateDummyTorrentData(tempDir)
+	w := &bytes.Buffer{}
+	CreateMetaInfo(name, w)
+	metaInfo, _ = metainfo.Load(w)
+	return
+}
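One point worth noting about the new helper: GreetingTestTorrent returns the temporary directory it created so that the caller can remove it, as TestTorrentInitialState does above. A minimal usage sketch (the test name and nil check here are hypothetical, not from the commit):

package torrent_test // hypothetical package for the sketch

import (
	"os"
	"testing"

	"bitbucket.org/anacrolix/go.torrent/testutil"
)

// Hypothetical test showing the intended pattern: build the canned "greeting"
// torrent, then remove its backing temp directory when the test finishes.
func TestGreetingTorrentFixture(t *testing.T) {
	dir, mi := testutil.GreetingTestTorrent()
	defer os.RemoveAll(dir)
	if mi == nil {
		t.Fatal("expected metainfo for the greeting torrent")
	}
}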