FedP2P/client_test.go

1020 lines
27 KiB
Go
Raw Normal View History

package torrent
import (
"encoding/binary"
"errors"
"fmt"
2015-06-02 22:16:38 +08:00
"io"
"io/ioutil"
"log"
"math/rand"
"net"
"os"
"strings"
2015-08-12 14:51:12 +08:00
"sync"
"testing"
"time"
2014-08-21 16:24:19 +08:00
_ "github.com/anacrolix/envpprof"
2015-08-03 22:29:01 +08:00
"github.com/anacrolix/missinggo"
2016-03-29 08:14:34 +08:00
"github.com/anacrolix/missinggo/filecache"
"github.com/anacrolix/missinggo/pubsub"
"github.com/anacrolix/utp"
"github.com/bradfitz/iter"
"github.com/stretchr/testify/assert"
2015-07-15 13:51:42 +08:00
"github.com/stretchr/testify/require"
2015-04-29 22:31:34 +08:00
"github.com/anacrolix/torrent/bencode"
"github.com/anacrolix/torrent/dht"
"github.com/anacrolix/torrent/internal/testutil"
"github.com/anacrolix/torrent/iplist"
2015-06-02 22:16:38 +08:00
"github.com/anacrolix/torrent/metainfo"
2016-03-28 17:38:30 +08:00
"github.com/anacrolix/torrent/storage"
)
// init configures the standard logger so that each log line carries the
// standard date/time prefix together with the full file path and line number
// of the call site.
func init() {
	const flags = log.LstdFlags | log.Llongfile
	log.SetFlags(flags)
}
var TestingConfig = Config{
ListenAddr: "localhost:0",
NoDHT: true,
DisableTrackers: true,
DataDir: "/tmp/anacrolix",
2016-01-16 21:12:53 +08:00
DHTConfig: dht.ServerConfig{
NoDefaultBootstrap: true,
},
}
2014-08-21 16:07:06 +08:00
func TestClientDefault(t *testing.T) {
cl, err := NewClient(&TestingConfig)
2016-03-28 18:57:04 +08:00
require.NoError(t, err)
2015-03-08 14:28:14 +08:00
cl.Close()
2014-08-21 16:07:06 +08:00
}
func TestAddDropTorrent(t *testing.T) {
2015-03-11 01:22:56 +08:00
cl, err := NewClient(&TestingConfig)
2016-03-28 18:57:04 +08:00
require.NoError(t, err)
2015-03-08 14:28:14 +08:00
defer cl.Close()
dir, mi := testutil.GreetingTestTorrent()
defer os.RemoveAll(dir)
tt, new, err := cl.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
2016-03-28 18:57:04 +08:00
require.NoError(t, err)
assert.True(t, new)
2016-07-05 22:42:16 +08:00
tt.SetMaxEstablishedConns(0)
tt.SetMaxEstablishedConns(1)
tt.Drop()
}
// TestAddTorrentNoSupportedTrackerSchemes is a placeholder with no
// assertions: it is skipped unconditionally.
func TestAddTorrentNoSupportedTrackerSchemes(t *testing.T) {
t.SkipNow()
}
// TestAddTorrentNoUsableURLs is a placeholder with no assertions: it is
// skipped unconditionally.
func TestAddTorrentNoUsableURLs(t *testing.T) {
t.SkipNow()
}
// TestAddPeersToUnknownTorrent is a placeholder with no assertions: it is
// skipped unconditionally.
func TestAddPeersToUnknownTorrent(t *testing.T) {
t.SkipNow()
}
// TestPieceHashSize fails unless pieceHash reports a 20 byte digest size.
func TestPieceHashSize(t *testing.T) {
	const wantSize = 20
	if size := pieceHash.Size(); size != wantSize {
		t.FailNow()
	}
}
func TestTorrentInitialState(t *testing.T) {
dir, mi := testutil.GreetingTestTorrent()
defer os.RemoveAll(dir)
tor := &Torrent{
infoHash: mi.HashInfoBytes(),
pieceStateChanges: pubsub.NewPubSub(),
}
tor.chunkSize = 2
tor.storageOpener = storage.NewFile("/dev/null")
2016-03-28 17:38:30 +08:00
// Needed to lock for asynchronous piece verification.
tor.cl = new(Client)
err := tor.setInfoBytes(mi.InfoBytes)
2016-03-28 17:38:30 +08:00
require.NoError(t, err)
require.Len(t, tor.pieces, 3)
2014-12-09 11:59:01 +08:00
tor.pendAllChunkSpecs(0)
tor.cl.mu.Lock()
assert.EqualValues(t, 3, tor.pieceNumPendingChunks(0))
tor.cl.mu.Unlock()
assert.EqualValues(t, chunkSpec{4, 1}, chunkIndexSpec(2, tor.pieceLength(0), tor.chunkSize))
}
2014-06-29 17:07:43 +08:00
// TestUnmarshalPEXMsg decodes a bencoded peer exchange message whose "added"
// value is 12 bytes long and checks that it yields two entries, the first
// with port 0x506.
func TestUnmarshalPEXMsg(t *testing.T) {
	raw := []byte("d5:added12:\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0ce")
	var msg peerExchangeMessage
	err := bencode.Unmarshal(raw, &msg)
	if err != nil {
		t.Fatal(err)
	}
	// The length check short-circuits, so the first entry is only inspected
	// when it exists.
	if len(msg.Added) != 2 || msg.Added[0].Port != 0x506 {
		t.FailNow()
	}
}
func TestReducedDialTimeout(t *testing.T) {
for _, _case := range []struct {
Max time.Duration
HalfOpenLimit int
PendingPeers int
ExpectedReduced time.Duration
}{
2014-11-19 11:53:00 +08:00
{nominalDialTimeout, 40, 0, nominalDialTimeout},
{nominalDialTimeout, 40, 1, nominalDialTimeout},
{nominalDialTimeout, 40, 39, nominalDialTimeout},
{nominalDialTimeout, 40, 40, nominalDialTimeout / 2},
{nominalDialTimeout, 40, 80, nominalDialTimeout / 3},
{nominalDialTimeout, 40, 4000, nominalDialTimeout / 101},
} {
reduced := reducedDialTimeout(_case.Max, _case.HalfOpenLimit, _case.PendingPeers)
2014-11-19 11:53:00 +08:00
expected := _case.ExpectedReduced
if expected < minDialTimeout {
expected = minDialTimeout
}
if reduced != expected {
t.Fatalf("expected %s, got %s", _case.ExpectedReduced, reduced)
}
}
}
func TestUTPRawConn(t *testing.T) {
l, err := utp.NewSocket("udp", "")
if err != nil {
t.Fatal(err)
}
defer l.Close()
go func() {
for {
_, err := l.Accept()
if err != nil {
break
}
}
}()
// Connect a UTP peer to see if the RawConn will still work.
s, _ := utp.NewSocket("udp", "")
2015-08-03 22:48:17 +08:00
defer s.Close()
utpPeer, err := s.Dial(fmt.Sprintf("localhost:%d", missinggo.AddrPort(l.Addr())))
if err != nil {
t.Fatalf("error dialing utp listener: %s", err)
}
defer utpPeer.Close()
peer, err := net.ListenPacket("udp", ":0")
if err != nil {
t.Fatal(err)
}
defer peer.Close()
msgsReceived := 0
// How many messages to send. I've set this to double the channel buffer
// size in the raw packetConn.
const N = 200
readerStopped := make(chan struct{})
// The reader goroutine.
go func() {
defer close(readerStopped)
b := make([]byte, 500)
for i := 0; i < N; i++ {
n, _, err := l.ReadFrom(b)
if err != nil {
t.Fatalf("error reading from raw conn: %s", err)
}
msgsReceived++
var d int
fmt.Sscan(string(b[:n]), &d)
if d != i {
log.Printf("got wrong number: expected %d, got %d", i, d)
}
}
}()
2015-08-03 22:29:01 +08:00
udpAddr, err := net.ResolveUDPAddr("udp", fmt.Sprintf("localhost:%d", missinggo.AddrPort(l.Addr())))
2014-12-26 14:19:01 +08:00
if err != nil {
t.Fatal(err)
}
for i := 0; i < N; i++ {
2014-12-26 14:19:01 +08:00
_, err := peer.WriteTo([]byte(fmt.Sprintf("%d", i)), udpAddr)
if err != nil {
t.Fatal(err)
}
time.Sleep(time.Microsecond)
}
select {
case <-readerStopped:
case <-time.After(time.Second):
t.Fatal("reader timed out")
}
if msgsReceived != N {
t.Fatalf("messages received: %d", msgsReceived)
}
}
2015-01-11 18:42:57 +08:00
func TestTwoClientsArbitraryPorts(t *testing.T) {
for i := 0; i < 2; i++ {
cl, err := NewClient(&TestingConfig)
2015-01-11 18:42:57 +08:00
if err != nil {
t.Fatal(err)
}
2015-03-08 14:28:14 +08:00
defer cl.Close()
2015-01-11 18:42:57 +08:00
}
}
func TestAddDropManyTorrents(t *testing.T) {
cl, err := NewClient(&TestingConfig)
require.NoError(t, err)
2015-03-08 14:28:14 +08:00
defer cl.Close()
for i := range iter.N(1000) {
var spec TorrentSpec
binary.PutVarint(spec.InfoHash[:], int64(i))
tt, new, err := cl.AddTorrentSpec(&spec)
assert.NoError(t, err)
assert.True(t, new)
defer tt.Drop()
}
}
// FileCacheClientStorageFactoryParams configures the storage factory built by
// NewFileCacheClientStorageFactory.
type FileCacheClientStorageFactoryParams struct {
// Capacity is passed to the cache's SetCapacity when SetCapacity is true.
Capacity int64
// SetCapacity selects whether Capacity is applied to the cache at all.
SetCapacity bool
// Wrapper turns the file cache into the storage.Client that is returned.
Wrapper func(*filecache.Cache) storage.Client
}
// NewFileCacheClientStorageFactory returns a storageFactory that opens a file
// cache in the given data directory, optionally sets its capacity, and wraps
// it with ps.Wrapper. The returned factory panics if the cache cannot be
// created.
func NewFileCacheClientStorageFactory(ps FileCacheClientStorageFactoryParams) storageFactory {
	factory := func(dataDir string) storage.Client {
		cache, err := filecache.NewCache(dataDir)
		switch {
		case err != nil:
			panic(err)
		case ps.SetCapacity:
			cache.SetCapacity(ps.Capacity)
		}
		return ps.Wrapper(cache)
	}
	return factory
}
// storageFactory builds a storage.Client from a string argument; the
// factories in this file treat that argument as a data directory.
type storageFactory func(string) storage.Client
func TestClientTransferDefault(t *testing.T) {
2016-03-28 17:38:30 +08:00
testClientTransfer(t, testClientTransferParams{
ExportClientStatus: true,
LeecherStorage: NewFileCacheClientStorageFactory(FileCacheClientStorageFactoryParams{
Wrapper: fileCachePieceResourceStorage,
}),
2016-03-28 17:38:30 +08:00
})
}
2016-05-16 19:50:43 +08:00
func fileCachePieceResourceStorage(fc *filecache.Cache) storage.Client {
2016-05-16 20:02:03 +08:00
return storage.NewResourcePieces(fc.AsResourceProvider())
}
2016-05-16 19:50:43 +08:00
func fileCachePieceFileStorage(fc *filecache.Cache) storage.Client {
2016-05-16 20:02:03 +08:00
return storage.NewFileStorePieces(fc.AsFileStore())
}
func TestClientTransferSmallCache(t *testing.T) {
testClientTransfer(t, testClientTransferParams{
LeecherStorage: NewFileCacheClientStorageFactory(FileCacheClientStorageFactoryParams{
SetCapacity: true,
// Going below the piece length means it can't complete a piece so
// that it can be hashed.
Capacity: 5,
Wrapper: fileCachePieceResourceStorage,
}),
SetReadahead: true,
// Can't readahead too far or the cache will thrash and drop data we
// thought we had.
Readahead: 0,
ExportClientStatus: true,
})
}
func TestClientTransferVarious(t *testing.T) {
2016-08-31 15:48:50 +08:00
for _, ls := range []storageFactory{
NewFileCacheClientStorageFactory(FileCacheClientStorageFactoryParams{
Wrapper: fileCachePieceFileStorage,
}),
NewFileCacheClientStorageFactory(FileCacheClientStorageFactoryParams{
Wrapper: fileCachePieceResourceStorage,
}),
storage.NewBoltDB,
2016-03-28 18:57:04 +08:00
} {
2016-05-16 19:50:43 +08:00
for _, ss := range []func(string) storage.Client{
storage.NewFile,
storage.NewMMap,
} {
for _, responsive := range []bool{false, true} {
2016-03-28 18:57:04 +08:00
testClientTransfer(t, testClientTransferParams{
2016-08-31 15:48:50 +08:00
Responsive: responsive,
SeederStorage: ss,
LeecherStorage: ls,
2016-03-28 18:57:04 +08:00
})
for _, readahead := range []int64{-1, 0, 1, 2, 3, 4, 5, 6, 9, 10, 11, 12, 13, 14, 15, 20} {
testClientTransfer(t, testClientTransferParams{
2016-08-31 15:48:50 +08:00
SeederStorage: ss,
Responsive: responsive,
SetReadahead: true,
Readahead: readahead,
LeecherStorage: ls,
})
}
2016-03-28 18:57:04 +08:00
}
}
}
}
// testClientTransferParams selects the variations exercised by
// testClientTransfer.
type testClientTransferParams struct {
// Responsive makes the leecher's reader responsive.
Responsive bool
// Readahead is applied to the leecher's reader when SetReadahead is true.
Readahead int64
// SetReadahead selects whether Readahead is applied at all.
SetReadahead bool
// ExportClientStatus exports status writers for the seeder and leecher.
ExportClientStatus bool
// LeecherStorage builds the leecher client's default storage from its data
// directory. testClientTransfer calls it unconditionally, so it must be set.
LeecherStorage func(string) storage.Client
// SeederStorage, when non-nil, builds the seeder client's default storage
// from the greeting data directory.
SeederStorage func(string) storage.Client
}
2016-07-05 13:52:33 +08:00
// Creates a seeder and a leecher, and ensures the data transfers when a read
// is attempted on the leecher.
func testClientTransfer(t *testing.T, ps testClientTransferParams) {
greetingTempDir, mi := testutil.GreetingTestTorrent()
defer os.RemoveAll(greetingTempDir)
// Create seeder and a Torrent.
cfg := TestingConfig
2015-06-16 14:57:47 +08:00
cfg.Seed = true
// cfg.ListenAddr = "localhost:4000"
2016-03-28 18:57:04 +08:00
if ps.SeederStorage != nil {
cfg.DefaultStorage = ps.SeederStorage(greetingTempDir)
} else {
cfg.DataDir = greetingTempDir
}
seeder, err := NewClient(&cfg)
2016-02-21 19:08:01 +08:00
require.NoError(t, err)
2015-03-08 14:28:14 +08:00
defer seeder.Close()
if ps.ExportClientStatus {
testutil.ExportStatusWriter(seeder, "s")
}
2016-07-05 13:52:33 +08:00
seederTorrent, new, err := seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
2016-03-28 17:38:30 +08:00
require.NoError(t, err)
assert.True(t, new)
// Create leecher and a Torrent.
leecherDataDir, err := ioutil.TempDir("", "")
2016-02-21 19:08:01 +08:00
require.NoError(t, err)
defer os.RemoveAll(leecherDataDir)
cfg.DefaultStorage = ps.LeecherStorage(leecherDataDir)
// cfg.ListenAddr = "localhost:4001"
2016-02-26 19:10:09 +08:00
leecher, err := NewClient(&cfg)
require.NoError(t, err)
2015-03-08 14:28:14 +08:00
defer leecher.Close()
if ps.ExportClientStatus {
testutil.ExportStatusWriter(leecher, "l")
}
2016-02-21 19:08:01 +08:00
leecherGreeting, new, err := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
ret = TorrentSpecFromMetaInfo(mi)
ret.ChunkSize = 2
2016-03-28 17:38:30 +08:00
ret.Storage = storage.NewFile(leecherDataDir)
return
}())
2016-02-21 19:08:01 +08:00
require.NoError(t, err)
assert.True(t, new)
// Now do some things with leecher and seeder.
2016-07-06 06:31:30 +08:00
addClientPeer(leecherGreeting, seeder)
r := leecherGreeting.NewReader()
defer r.Close()
if ps.Responsive {
r.SetResponsive()
}
if ps.SetReadahead {
r.SetReadahead(ps.Readahead)
}
2016-07-05 13:52:33 +08:00
assertReadAllGreeting(t, r)
// After one read through, we can assume certain torrent statistics.
2016-07-12 14:42:04 +08:00
// These are not a strict requirement. It is however interesting to
// follow.
// t.Logf("%#v", seederTorrent.Stats())
2016-07-12 14:42:04 +08:00
assert.EqualValues(t, 13, seederTorrent.Stats().DataBytesWritten)
assert.EqualValues(t, 8, seederTorrent.Stats().ChunksWritten)
assert.EqualValues(t, 13, leecherGreeting.Stats().DataBytesRead)
assert.EqualValues(t, 8, leecherGreeting.Stats().ChunksRead)
2016-07-06 06:31:30 +08:00
// Read through again for the cases where the torrent data size exceeds
// the size of the cache.
2016-07-05 13:52:33 +08:00
assertReadAllGreeting(t, r)
}
// assertReadAllGreeting seeks r back to the start, reads it to the end, and
// asserts that the bytes equal the greeting file contents.
func assertReadAllGreeting(t *testing.T, r io.ReadSeeker) {
	offset, seekErr := r.Seek(0, os.SEEK_SET)
	assert.NoError(t, seekErr)
	assert.EqualValues(t, 0, offset)

	contents, readErr := ioutil.ReadAll(r)
	assert.NoError(t, readErr)
	assert.EqualValues(t, testutil.GreetingFileContents, contents)
}
2015-08-12 14:51:12 +08:00
// Check that after completing leeching, a leecher transitions to a seeding
// correctly. Connected in a chain like so: Seeder <-> Leecher <-> LeecherLeecher.
func TestSeedAfterDownloading(t *testing.T) {
greetingTempDir, mi := testutil.GreetingTestTorrent()
defer os.RemoveAll(greetingTempDir)
cfg := TestingConfig
cfg.Seed = true
cfg.DataDir = greetingTempDir
seeder, err := NewClient(&cfg)
2016-03-30 16:16:40 +08:00
require.NoError(t, err)
2015-08-12 14:51:12 +08:00
defer seeder.Close()
testutil.ExportStatusWriter(seeder, "s")
2015-08-12 14:51:12 +08:00
seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
cfg.DataDir, err = ioutil.TempDir("", "")
require.NoError(t, err)
defer os.RemoveAll(cfg.DataDir)
2016-03-30 16:16:40 +08:00
leecher, err := NewClient(&cfg)
require.NoError(t, err)
2015-08-12 14:51:12 +08:00
defer leecher.Close()
testutil.ExportStatusWriter(leecher, "l")
2015-08-12 14:51:12 +08:00
cfg.Seed = false
2016-03-28 17:38:30 +08:00
// cfg.TorrentDataOpener = nil
2015-08-12 14:51:12 +08:00
cfg.DataDir, err = ioutil.TempDir("", "")
require.NoError(t, err)
defer os.RemoveAll(cfg.DataDir)
leecherLeecher, _ := NewClient(&cfg)
defer leecherLeecher.Close()
testutil.ExportStatusWriter(leecherLeecher, "ll")
2015-08-12 14:51:12 +08:00
leecherGreeting, _, _ := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
ret = TorrentSpecFromMetaInfo(mi)
ret.ChunkSize = 2
return
}())
llg, _, _ := leecherLeecher.AddTorrentSpec(func() (ret *TorrentSpec) {
ret = TorrentSpecFromMetaInfo(mi)
ret.ChunkSize = 3
return
}())
// Simultaneously DownloadAll in Leecher, and read the contents
// consecutively in LeecherLeecher. This non-deterministically triggered a
// case where the leecher wouldn't unchoke the LeecherLeecher.
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
r := llg.NewReader()
defer r.Close()
b, err := ioutil.ReadAll(r)
require.NoError(t, err)
2016-01-18 22:41:33 +08:00
assert.EqualValues(t, testutil.GreetingFileContents, b)
2015-08-12 14:51:12 +08:00
}()
2016-07-06 06:31:30 +08:00
addClientPeer(leecherGreeting, seeder)
addClientPeer(leecherGreeting, leecherLeecher)
2015-08-12 14:51:12 +08:00
wg.Add(1)
go func() {
defer wg.Done()
leecherGreeting.DownloadAll()
leecher.WaitAll()
}()
wg.Wait()
}
// TestMergingTrackersByAddingSpecs adds the same (zero info hash) spec twice,
// the second time with trackers, and checks that the trackers are merged
// into the existing torrent's announce list without starting any announcers.
func TestMergingTrackersByAddingSpecs(t *testing.T) {
	cl, err := NewClient(&TestingConfig)
	require.NoError(t, err)
	defer cl.Close()
	spec := TorrentSpec{}
	// Check the errors so a failed add cannot leave T nil for the field
	// accesses below.
	T, isNew, err := cl.AddTorrentSpec(&spec)
	require.NoError(t, err)
	if !isNew {
		t.FailNow()
	}
	spec.Trackers = [][]string{{"http://a"}, {"udp://b"}}
	_, isNew, err = cl.AddTorrentSpec(&spec)
	require.NoError(t, err)
	assert.False(t, isNew)
	assert.EqualValues(t, [][]string{{"http://a"}, {"udp://b"}}, T.metainfo.AnnounceList)
	// Because trackers are disabled in TestingConfig.
	assert.EqualValues(t, 0, len(T.trackerAnnouncers))
}
2015-06-02 22:16:38 +08:00
2016-03-28 17:38:30 +08:00
type badStorage struct{}
func (bs badStorage) OpenTorrent(*metainfo.Info, metainfo.Hash) (storage.Torrent, error) {
return bs, nil
}
func (bs badStorage) Close() error {
return nil
}
func (bs badStorage) Piece(p metainfo.Piece) storage.Piece {
2016-03-28 17:38:30 +08:00
return badStoragePiece{p}
}
2016-03-28 17:38:30 +08:00
type badStoragePiece struct {
p metainfo.Piece
}
2015-06-02 22:16:38 +08:00
func (p badStoragePiece) WriteAt(b []byte, off int64) (int, error) {
2015-06-02 22:16:38 +08:00
return 0, nil
}
func (p badStoragePiece) GetIsComplete() bool {
2015-06-02 22:16:38 +08:00
return true
}
func (p badStoragePiece) MarkComplete() error {
return errors.New("psyyyyyyyche")
}
func (p badStoragePiece) randomlyTruncatedDataString() string {
return "hello, world\n"[:rand.Intn(14)]
2015-06-02 22:16:38 +08:00
}
func (p badStoragePiece) ReadAt(b []byte, off int64) (n int, err error) {
r := strings.NewReader(p.randomlyTruncatedDataString())
return r.ReadAt(b, off+p.p.Offset())
2015-06-02 22:16:38 +08:00
}
// We read from a piece which is marked completed, but is missing data.
func TestCompletedPieceWrongSize(t *testing.T) {
cfg := TestingConfig
2016-03-28 17:38:30 +08:00
cfg.DefaultStorage = badStorage{}
cl, err := NewClient(&cfg)
require.NoError(t, err)
2015-06-02 22:16:38 +08:00
defer cl.Close()
info := metainfo.Info{
PieceLength: 15,
Pieces: make([]byte, 20),
Files: []metainfo.FileInfo{
metainfo.FileInfo{Path: []string{"greeting"}, Length: 13},
2015-06-02 22:16:38 +08:00
},
2016-05-09 13:47:39 +08:00
}
b, err := bencode.Marshal(info)
2016-05-09 13:47:39 +08:00
tt, new, err := cl.AddTorrentSpec(&TorrentSpec{
InfoBytes: b,
InfoHash: metainfo.HashBytes(b),
2015-06-02 22:16:38 +08:00
})
require.NoError(t, err)
defer tt.Drop()
assert.True(t, new)
2015-06-02 22:16:38 +08:00
r := tt.NewReader()
defer r.Close()
b, err = ioutil.ReadAll(r)
assert.Len(t, b, 13)
assert.NoError(t, err)
2015-06-02 22:16:38 +08:00
}
// BenchmarkAddLargeTorrent measures adding and dropping the bootstrap.dat
// torrent on a client with both TCP and uTP disabled.
func BenchmarkAddLargeTorrent(b *testing.B) {
	cfg := TestingConfig
	cfg.DisableTCP, cfg.DisableUTP = true, true
	cfg.ListenAddr = "redonk"
	client, err := NewClient(&cfg)
	require.NoError(b, err)
	defer client.Close()

	const torrentPath = "testdata/bootstrap.dat.torrent"
	for n := 0; n < b.N; n++ {
		tor, err := client.AddTorrentFromFile(torrentPath)
		if err != nil {
			b.Fatal(err)
		}
		tor.Drop()
	}
}
2015-07-15 13:51:42 +08:00
func TestResponsive(t *testing.T) {
seederDataDir, mi := testutil.GreetingTestTorrent()
defer os.RemoveAll(seederDataDir)
cfg := TestingConfig
cfg.Seed = true
cfg.DataDir = seederDataDir
seeder, err := NewClient(&cfg)
require.Nil(t, err)
defer seeder.Close()
seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
leecherDataDir, err := ioutil.TempDir("", "")
require.Nil(t, err)
defer os.RemoveAll(leecherDataDir)
cfg = TestingConfig
cfg.DataDir = leecherDataDir
leecher, err := NewClient(&cfg)
require.Nil(t, err)
defer leecher.Close()
leecherTorrent, _, _ := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
ret = TorrentSpecFromMetaInfo(mi)
ret.ChunkSize = 2
return
}())
2016-07-06 06:31:30 +08:00
addClientPeer(leecherTorrent, seeder)
2015-07-15 13:51:42 +08:00
reader := leecherTorrent.NewReader()
2016-02-04 22:17:26 +08:00
defer reader.Close()
2015-07-15 13:51:42 +08:00
reader.SetReadahead(0)
reader.SetResponsive()
b := make([]byte, 2)
_, err = reader.Seek(3, os.SEEK_SET)
require.NoError(t, err)
_, err = io.ReadFull(reader, b)
2015-07-15 13:51:42 +08:00
assert.Nil(t, err)
assert.EqualValues(t, "lo", string(b))
_, err = reader.Seek(11, os.SEEK_SET)
require.NoError(t, err)
n, err := io.ReadFull(reader, b)
2015-07-15 13:51:42 +08:00
assert.Nil(t, err)
assert.EqualValues(t, 2, n)
assert.EqualValues(t, "d\n", string(b))
}
func TestTorrentDroppedDuringResponsiveRead(t *testing.T) {
seederDataDir, mi := testutil.GreetingTestTorrent()
defer os.RemoveAll(seederDataDir)
cfg := TestingConfig
cfg.Seed = true
cfg.DataDir = seederDataDir
seeder, err := NewClient(&cfg)
require.Nil(t, err)
defer seeder.Close()
seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
leecherDataDir, err := ioutil.TempDir("", "")
require.Nil(t, err)
defer os.RemoveAll(leecherDataDir)
cfg = TestingConfig
cfg.DataDir = leecherDataDir
leecher, err := NewClient(&cfg)
require.Nil(t, err)
defer leecher.Close()
leecherTorrent, _, _ := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
ret = TorrentSpecFromMetaInfo(mi)
ret.ChunkSize = 2
return
}())
2016-07-06 06:31:30 +08:00
addClientPeer(leecherTorrent, seeder)
reader := leecherTorrent.NewReader()
2016-02-04 22:17:26 +08:00
defer reader.Close()
reader.SetReadahead(0)
reader.SetResponsive()
b := make([]byte, 2)
_, err = reader.Seek(3, os.SEEK_SET)
require.NoError(t, err)
_, err = io.ReadFull(reader, b)
assert.Nil(t, err)
assert.EqualValues(t, "lo", string(b))
go leecherTorrent.Drop()
_, err = reader.Seek(11, os.SEEK_SET)
require.NoError(t, err)
n, err := reader.Read(b)
assert.EqualError(t, err, "torrent closed")
assert.EqualValues(t, 0, n)
}
func TestDHTInheritBlocklist(t *testing.T) {
ipl := iplist.New(nil)
require.NotNil(t, ipl)
2016-01-16 21:12:53 +08:00
cfg := TestingConfig
cfg.IPBlocklist = ipl
cfg.NoDHT = false
cl, err := NewClient(&cfg)
require.NoError(t, err)
defer cl.Close()
require.Equal(t, ipl, cl.DHT().IPBlocklist())
}
2015-08-23 10:50:32 +08:00
// Check that stuff is merged in subsequent AddTorrentSpec for the same
// infohash.
func TestAddTorrentSpecMerging(t *testing.T) {
cl, err := NewClient(&TestingConfig)
require.NoError(t, err)
defer cl.Close()
dir, mi := testutil.GreetingTestTorrent()
defer os.RemoveAll(dir)
tt, new, err := cl.AddTorrentSpec(&TorrentSpec{
InfoHash: mi.HashInfoBytes(),
})
2015-08-23 10:50:32 +08:00
require.NoError(t, err)
require.True(t, new)
require.Nil(t, tt.Info())
_, new, err = cl.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
require.NoError(t, err)
require.False(t, new)
require.NotNil(t, tt.Info())
}
// TestTorrentDroppedBeforeGotInfo adds a torrent by info hash only, drops it
// immediately, and checks that the client no longer lists it and that its
// GotInfo channel is not ready.
func TestTorrentDroppedBeforeGotInfo(t *testing.T) {
	dir, mi := testutil.GreetingTestTorrent()
	// Only the info hash is used below, so the data can go straight away.
	os.RemoveAll(dir)
	// Check the errors so a failure cannot cause a nil dereference.
	cl, err := NewClient(&TestingConfig)
	require.NoError(t, err)
	defer cl.Close()
	tt, _, err := cl.AddTorrentSpec(&TorrentSpec{
		InfoHash: mi.HashInfoBytes(),
	})
	require.NoError(t, err)
	tt.Drop()
	assert.EqualValues(t, 0, len(cl.Torrents()))
	select {
	case <-tt.GotInfo():
		t.FailNow()
	default:
	}
}
func writeTorrentData(ts storage.Torrent, info metainfo.Info, b []byte) {
2016-03-29 08:14:34 +08:00
for i := range iter.N(info.NumPieces()) {
2016-03-30 16:12:57 +08:00
n, _ := ts.Piece(info.Piece(i)).WriteAt(b, 0)
2016-03-29 08:14:34 +08:00
b = b[n:]
}
}
2016-05-16 19:50:43 +08:00
func testAddTorrentPriorPieceCompletion(t *testing.T, alreadyCompleted bool, csf func(*filecache.Cache) storage.Client) {
2016-03-29 08:14:34 +08:00
fileCacheDir, err := ioutil.TempDir("", "")
require.NoError(t, err)
defer os.RemoveAll(fileCacheDir)
fileCache, err := filecache.NewCache(fileCacheDir)
require.NoError(t, err)
greetingDataTempDir, greetingMetainfo := testutil.GreetingTestTorrent()
defer os.RemoveAll(greetingDataTempDir)
filePieceStore := csf(fileCache)
info := greetingMetainfo.UnmarshalInfo()
ih := greetingMetainfo.HashInfoBytes()
greetingData, err := filePieceStore.OpenTorrent(&info, ih)
2016-03-29 08:14:34 +08:00
require.NoError(t, err)
writeTorrentData(greetingData, info, []byte(testutil.GreetingFileContents))
2016-03-29 08:14:34 +08:00
// require.Equal(t, len(testutil.GreetingFileContents), written)
// require.NoError(t, err)
for i := 0; i < info.NumPieces(); i++ {
p := info.Piece(i)
2016-03-29 08:14:34 +08:00
if alreadyCompleted {
err := greetingData.Piece(p).MarkComplete()
assert.NoError(t, err)
}
}
cfg := TestingConfig
// TODO: Disable network option?
cfg.DisableTCP = true
cfg.DisableUTP = true
cfg.DefaultStorage = filePieceStore
cl, err := NewClient(&cfg)
require.NoError(t, err)
defer cl.Close()
tt, err := cl.AddTorrent(greetingMetainfo)
require.NoError(t, err)
psrs := tt.PieceStateRuns()
assert.Len(t, psrs, 1)
assert.EqualValues(t, 3, psrs[0].Length)
assert.Equal(t, alreadyCompleted, psrs[0].Complete)
if alreadyCompleted {
r := tt.NewReader()
b, err := ioutil.ReadAll(r)
assert.NoError(t, err)
assert.EqualValues(t, testutil.GreetingFileContents, b)
}
}
func TestAddTorrentPiecesAlreadyCompleted(t *testing.T) {
testAddTorrentPriorPieceCompletion(t, true, fileCachePieceFileStorage)
testAddTorrentPriorPieceCompletion(t, true, fileCachePieceResourceStorage)
2016-03-29 08:14:34 +08:00
}
func TestAddTorrentPiecesNotAlreadyCompleted(t *testing.T) {
testAddTorrentPriorPieceCompletion(t, false, fileCachePieceFileStorage)
testAddTorrentPriorPieceCompletion(t, false, fileCachePieceResourceStorage)
2016-03-29 08:14:34 +08:00
}
// TestAddMetainfoWithNodes adds a torrent file and checks the DHT node count
// before (0) and after (6) the add, along with the size of the torrent's
// announce list.
func TestAddMetainfoWithNodes(t *testing.T) {
	cfg := TestingConfig
	cfg.NoDHT = false
	// For now, we want to just jam the nodes into the table, without
	// verifying them first. Also the DHT code doesn't support mixing secure
	// and insecure nodes if security is enabled (yet).
	cfg.DHTConfig.NoSecurity = true
	cl, err := NewClient(&cfg)
	require.NoError(t, err)
	defer cl.Close()
	// Expected value goes first so failure output labels the values
	// correctly.
	assert.EqualValues(t, 0, cl.DHT().NumNodes())
	tt, err := cl.AddTorrentFromFile("metainfo/testdata/issue_65a.torrent")
	require.NoError(t, err)
	assert.Len(t, tt.metainfo.AnnounceList, 5)
	assert.EqualValues(t, 6, cl.DHT().NumNodes())
}
2016-02-26 19:10:29 +08:00
// testDownloadCancelParams selects the variations exercised by
// testDownloadCancel.
type testDownloadCancelParams struct {
// ExportClientStatus exports status writers for the seeder and leecher.
ExportClientStatus bool
// SetLeecherStorageCapacity selects whether LeecherStorageCapacity is
// applied to the leecher's file cache.
SetLeecherStorageCapacity bool
// LeecherStorageCapacity is the capacity given to the leecher's file cache.
LeecherStorageCapacity int64
// Cancel cancels all pieces right after DownloadAll is requested.
Cancel bool
}
func testDownloadCancel(t *testing.T, ps testDownloadCancelParams) {
greetingTempDir, mi := testutil.GreetingTestTorrent()
defer os.RemoveAll(greetingTempDir)
cfg := TestingConfig
cfg.Seed = true
cfg.DataDir = greetingTempDir
seeder, err := NewClient(&cfg)
require.NoError(t, err)
defer seeder.Close()
if ps.ExportClientStatus {
testutil.ExportStatusWriter(seeder, "s")
}
seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
leecherDataDir, err := ioutil.TempDir("", "")
require.NoError(t, err)
defer os.RemoveAll(leecherDataDir)
2016-03-29 08:14:34 +08:00
fc, err := filecache.NewCache(leecherDataDir)
require.NoError(t, err)
if ps.SetLeecherStorageCapacity {
fc.SetCapacity(ps.LeecherStorageCapacity)
}
2016-05-16 20:02:03 +08:00
cfg.DefaultStorage = storage.NewFileStorePieces(fc.AsFileStore())
2016-03-28 17:38:30 +08:00
cfg.DataDir = leecherDataDir
2016-02-26 19:10:29 +08:00
leecher, _ := NewClient(&cfg)
defer leecher.Close()
if ps.ExportClientStatus {
testutil.ExportStatusWriter(leecher, "l")
}
leecherGreeting, new, err := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
ret = TorrentSpecFromMetaInfo(mi)
ret.ChunkSize = 2
return
}())
require.NoError(t, err)
assert.True(t, new)
psc := leecherGreeting.SubscribePieceStateChanges()
defer psc.Close()
leecherGreeting.DownloadAll()
if ps.Cancel {
leecherGreeting.CancelPieces(0, leecherGreeting.NumPieces())
}
2016-07-06 06:31:30 +08:00
addClientPeer(leecherGreeting, seeder)
2016-02-26 19:10:29 +08:00
completes := make(map[int]bool, 3)
values:
for {
2016-03-28 17:38:30 +08:00
// started := time.Now()
2016-02-26 19:10:29 +08:00
select {
case _v := <-psc.Values:
2016-03-28 17:38:30 +08:00
// log.Print(time.Since(started))
2016-02-26 19:10:29 +08:00
v := _v.(PieceStateChange)
completes[v.Index] = v.Complete
2016-02-26 19:18:08 +08:00
case <-time.After(100 * time.Millisecond):
2016-02-26 19:10:29 +08:00
break values
}
}
if ps.Cancel {
assert.EqualValues(t, map[int]bool{0: false, 1: false, 2: false}, completes)
} else {
assert.EqualValues(t, map[int]bool{0: true, 1: true, 2: true}, completes)
}
}
func TestTorrentDownloadAll(t *testing.T) {
testDownloadCancel(t, testDownloadCancelParams{})
}
func TestTorrentDownloadAllThenCancel(t *testing.T) {
testDownloadCancel(t, testDownloadCancelParams{
Cancel: true,
})
}
// Ensure that it's an error for a peer to send an invalid have message.
func TestPeerInvalidHave(t *testing.T) {
cl, err := NewClient(&TestingConfig)
require.NoError(t, err)
defer cl.Close()
info := metainfo.Info{
PieceLength: 1,
Pieces: make([]byte, 20),
Files: []metainfo.FileInfo{{Length: 1}},
2016-05-09 13:47:39 +08:00
}
infoBytes, err := bencode.Marshal(info)
require.NoError(t, err)
2016-05-09 13:47:39 +08:00
tt, _new, err := cl.AddTorrentSpec(&TorrentSpec{
InfoBytes: infoBytes,
InfoHash: metainfo.HashBytes(infoBytes),
})
require.NoError(t, err)
assert.True(t, _new)
defer tt.Drop()
cn := &connection{
2016-04-03 16:40:43 +08:00
t: tt,
}
assert.NoError(t, cn.peerSentHave(0))
assert.Error(t, cn.peerSentHave(1))
}
2016-03-28 17:38:30 +08:00
func TestPieceCompletedInStorageButNotClient(t *testing.T) {
greetingTempDir, greetingMetainfo := testutil.GreetingTestTorrent()
defer os.RemoveAll(greetingTempDir)
cfg := TestingConfig
cfg.DataDir = greetingTempDir
seeder, err := NewClient(&TestingConfig)
require.NoError(t, err)
seeder.AddTorrentSpec(&TorrentSpec{
InfoBytes: greetingMetainfo.InfoBytes,
2016-03-28 17:38:30 +08:00
})
}
// TestPrepareTrackerAnnounce checks that preparing an announce to a
// localhost tracker URL yields the original host and a URL rewritten to
// 127.0.0.1, and that the tracker is not reported as blocked.
func TestPrepareTrackerAnnounce(t *testing.T) {
	const trackerURL = "http://localhost:1234/announce?herp"
	client := new(Client)
	blocked, urlToUse, host, err := client.prepareTrackerAnnounceUnlocked(trackerURL)
	require.NoError(t, err)
	assert.False(t, blocked)
	assert.EqualValues(t, "localhost:1234", host)
	assert.EqualValues(t, "http://127.0.0.1:1234/announce?herp", urlToUse)
}
2016-05-24 17:35:23 +08:00
// Check that when the listen port is 0, all the protocols listened on have
// the same port, and it isn't zero.
func TestClientDynamicListenPortAllProtocols(t *testing.T) {
	client, err := NewClient(&TestingConfig)
	require.NoError(t, err)
	defer client.Close()

	listenPort := missinggo.AddrPort(client.ListenAddr())
	assert.NotEqual(t, 0, listenPort)

	utpPort := missinggo.AddrPort(client.utpSock.Addr())
	tcpPort := missinggo.AddrPort(client.tcpListener.Addr())
	assert.Equal(t, utpPort, tcpPort)
}
// TestClientDynamicListenTCPOnly checks that with uTP disabled the client
// still reports a non-zero listen port and has no uTP socket.
func TestClientDynamicListenTCPOnly(t *testing.T) {
	cfg := TestingConfig
	cfg.DisableUTP = true
	client, err := NewClient(&cfg)
	require.NoError(t, err)
	defer client.Close()

	listenPort := missinggo.AddrPort(client.ListenAddr())
	assert.NotEqual(t, 0, listenPort)
	assert.Nil(t, client.utpSock)
}
// TestClientDynamicListenUTPOnly checks that with TCP disabled the client
// still reports a non-zero listen port and has no TCP listener.
func TestClientDynamicListenUTPOnly(t *testing.T) {
	cfg := TestingConfig
	cfg.DisableTCP = true
	client, err := NewClient(&cfg)
	require.NoError(t, err)
	defer client.Close()

	listenPort := missinggo.AddrPort(client.ListenAddr())
	assert.NotEqual(t, 0, listenPort)
	assert.Nil(t, client.tcpListener)
}
// TestClientDynamicListenPortNoProtocols checks that with both TCP and uTP
// disabled the client reports no listen address.
func TestClientDynamicListenPortNoProtocols(t *testing.T) {
	cfg := TestingConfig
	cfg.DisableTCP, cfg.DisableUTP = true, true
	client, err := NewClient(&cfg)
	require.NoError(t, err)
	defer client.Close()

	assert.Nil(t, client.ListenAddr())
}
// addClientPeer adds cl's listen address to t as a single peer.
func addClientPeer(t *Torrent, cl *Client) {
	peer := Peer{
		IP:   missinggo.AddrIP(cl.ListenAddr()),
		Port: missinggo.AddrPort(cl.ListenAddr()),
	}
	t.AddPeers([]Peer{peer})
}
// printConnPeerCounts logs the number of connections and peers held by t,
// taking the client lock while it does so.
func printConnPeerCounts(t *Torrent) {
	t.cl.mu.Lock()
	numConns, numPeers := len(t.conns), len(t.peers)
	log.Println(numConns, numPeers)
	t.cl.mu.Unlock()
}
// totalConns returns the sum of the connection counts of the given torrents.
// Each torrent is read while holding its own client's lock.
func totalConns(tts []*Torrent) (ret int) {
	for i := range tts {
		tor := tts[i]
		tor.cl.mu.Lock()
		ret += len(tor.conns)
		tor.cl.mu.Unlock()
	}
	return ret
}
// TestSetMaxEstablishedConn connects three clients sharing one info hash,
// then changes the first torrent's established connection limit and waits
// for the total connection count across all torrents to reach the expected
// value at each step. The waits poll without a deadline.
func TestSetMaxEstablishedConn(t *testing.T) {
	var tts []*Torrent
	ih := testutil.GreetingMetaInfo().HashInfoBytes()
	cfg := TestingConfig
	for i := 0; i < 3; i++ {
		cl, err := NewClient(&cfg)
		require.NoError(t, err)
		defer cl.Close()
		tt, _ := cl.AddTorrentInfoHash(ih)
		tt.SetMaxEstablishedConns(2)
		testutil.ExportStatusWriter(cl, fmt.Sprintf("%d", i))
		tts = append(tts, tt)
	}

	// addPeers tells every torrent about the clients of all torrents that
	// precede it in tts.
	addPeers := func() {
		for i, tt := range tts {
			for _, earlier := range tts[:i] {
				addClientPeer(tt, earlier.cl)
			}
		}
	}
	// waitTotalConns polls until the total connection count equals num.
	waitTotalConns := func(num int) {
		for totalConns(tts) != num {
			time.Sleep(time.Millisecond)
		}
	}

	addPeers()
	waitTotalConns(6)

	steps := []struct {
		limit     int  // new limit for the first torrent
		readd     bool // whether to offer the peers again after the change
		wantConns int  // total connections to wait for
	}{
		{limit: 1, readd: false, wantConns: 4},
		{limit: 0, readd: false, wantConns: 2},
		{limit: 1, readd: true, wantConns: 4},
		{limit: 2, readd: true, wantConns: 6},
	}
	for _, step := range steps {
		tts[0].SetMaxEstablishedConns(step.limit)
		if step.readd {
			addPeers()
		}
		waitTotalConns(step.wantConns)
	}
}