FedP2P/peerconn_test.go

249 lines
7.1 KiB
Go
Raw Normal View History

package torrent
import (
2021-09-30 10:01:34 +08:00
"errors"
"io"
"net"
"sync"
"testing"
2014-12-26 14:17:00 +08:00
"github.com/frankban/quicktest"
qt "github.com/frankban/quicktest"
2019-08-21 18:58:40 +08:00
"github.com/stretchr/testify/require"
"github.com/anacrolix/torrent/metainfo"
pp "github.com/anacrolix/torrent/peer_protocol"
"github.com/anacrolix/torrent/storage"
)
// Ensure that no race exists between sending a bitfield, and a subsequent
// Have that would potentially alter it.
func TestSendBitfieldThenHave(t *testing.T) {
var cl Client
cl.init(TestingConfig(t))
cl.initLogger()
c := cl.newConnection(nil, false, nil, "io.Pipe", "")
c.setTorrent(cl.newTorrent(metainfo.Hash{}, nil))
2021-09-19 13:16:37 +08:00
if err := c.t.setInfo(&metainfo.Info{Pieces: make([]byte, metainfo.HashSize*3)}); err != nil {
t.Log(err)
}
r, w := io.Pipe()
2021-11-08 11:47:01 +08:00
// c.r = r
c.w = w
2021-05-20 16:51:08 +08:00
c.startWriter()
2020-02-21 08:51:24 +08:00
c.locker().Lock()
c.t._completedPieces.Add(1)
c.postBitfield( /*[]bool{false, true, false}*/ )
2020-02-21 08:51:24 +08:00
c.locker().Unlock()
c.locker().Lock()
c.have(2)
2020-02-21 08:51:24 +08:00
c.locker().Unlock()
b := make([]byte, 15)
n, err := io.ReadFull(r, b)
2020-02-21 08:51:24 +08:00
c.locker().Lock()
// This will cause connection.writer to terminate.
c.closed.Set()
2020-02-21 08:51:24 +08:00
c.locker().Unlock()
require.NoError(t, err)
require.EqualValues(t, 15, n)
// Here we see that the bitfield doesn't have piece 2 set, as that should
// arrive in the following Have message.
require.EqualValues(t, "\x00\x00\x00\x02\x05@\x00\x00\x00\x05\x04\x00\x00\x00\x02", string(b))
}
type torrentStorage struct {
writeSem sync.Mutex
}
func (me *torrentStorage) Close() error { return nil }
func (me *torrentStorage) Piece(mp metainfo.Piece) storage.PieceImpl {
return me
}
func (me *torrentStorage) Completion() storage.Completion {
return storage.Completion{}
}
func (me *torrentStorage) MarkComplete() error {
return nil
}
func (me *torrentStorage) MarkNotComplete() error {
return nil
}
func (me *torrentStorage) ReadAt([]byte, int64) (int, error) {
panic("shouldn't be called")
}
func (me *torrentStorage) WriteAt(b []byte, _ int64) (int, error) {
if len(b) != defaultChunkSize {
panic(len(b))
}
me.writeSem.Unlock()
return len(b), nil
}
func BenchmarkConnectionMainReadLoop(b *testing.B) {
c := quicktest.New(b)
var cl Client
cl.init(&ClientConfig{
DownloadRateLimiter: unlimited,
})
cl.initLogger()
ts := &torrentStorage{}
t := cl.newTorrent(metainfo.Hash{}, nil)
t.initialPieceCheckDisabled = true
2018-01-25 14:10:37 +08:00
require.NoError(b, t.setInfo(&metainfo.Info{
Pieces: make([]byte, 20),
Length: 1 << 20,
PieceLength: 1 << 20,
}))
t.storage = &storage.Torrent{TorrentImpl: storage.TorrentImpl{Piece: ts.Piece, Close: ts.Close}}
t.onSetInfo()
t._pendingPieces.Add(0)
r, w := net.Pipe()
cn := cl.newConnection(r, true, r.RemoteAddr(), r.RemoteAddr().Network(), regularNetConnPeerConnConnString(r))
cn.setTorrent(t)
2021-09-30 10:01:34 +08:00
mrlErrChan := make(chan error)
2018-06-25 12:09:08 +08:00
msg := pp.Message{
Type: pp.Piece,
Piece: make([]byte, defaultChunkSize),
}
go func() {
2018-07-25 11:41:50 +08:00
cl.lock()
err := cn.mainReadLoop()
if err != nil {
2021-09-30 10:01:34 +08:00
mrlErrChan <- err
}
2021-09-30 10:01:34 +08:00
close(mrlErrChan)
}()
2018-06-25 12:09:08 +08:00
wb := msg.MustMarshalBinary()
b.SetBytes(int64(len(msg.Piece)))
2018-06-25 12:09:08 +08:00
go func() {
ts.writeSem.Lock()
for i := 0; i < b.N; i += 1 {
2018-07-25 11:41:50 +08:00
cl.lock()
2018-06-25 12:09:08 +08:00
// The chunk must be written to storage everytime, to ensure the
// writeSem is unlocked.
t.pendAllChunkSpecs(0)
2021-09-19 13:16:37 +08:00
cn.validReceiveChunks = map[RequestIndex]int{
t.requestIndexFromRequest(newRequestFromMessage(&msg)): 1,
}
2018-07-25 11:41:50 +08:00
cl.unlock()
2018-06-25 12:09:08 +08:00
n, err := w.Write(wb)
require.NoError(b, err)
require.EqualValues(b, len(wb), n)
ts.writeSem.Lock()
}
if err := w.Close(); err != nil {
panic(err)
}
2018-06-25 12:09:08 +08:00
}()
2021-09-30 10:01:34 +08:00
mrlErr := <-mrlErrChan
if mrlErr != nil && !errors.Is(mrlErr, io.EOF) {
c.Fatal(mrlErr)
}
c.Assert(cn._stats.ChunksReadUseful.Int64(), quicktest.Equals, int64(b.N))
}
2020-04-09 00:03:29 +08:00
func TestConnPexPeerFlags(t *testing.T) {
var (
tcpAddr = &net.TCPAddr{IP: net.IPv6loopback, Port: 4848}
udpAddr = &net.UDPAddr{IP: net.IPv6loopback, Port: 4848}
)
2021-11-08 11:47:01 +08:00
testcases := []struct {
conn *PeerConn
f pp.PexPeerFlags
}{
2021-01-20 10:10:32 +08:00
{&PeerConn{Peer: Peer{outgoing: false, PeerPrefersEncryption: false}}, 0},
{&PeerConn{Peer: Peer{outgoing: false, PeerPrefersEncryption: true}}, pp.PexPrefersEncryption},
{&PeerConn{Peer: Peer{outgoing: true, PeerPrefersEncryption: false}}, pp.PexOutgoingConn},
{&PeerConn{Peer: Peer{outgoing: true, PeerPrefersEncryption: true}}, pp.PexOutgoingConn | pp.PexPrefersEncryption},
{&PeerConn{Peer: Peer{RemoteAddr: udpAddr, Network: udpAddr.Network()}}, pp.PexSupportsUtp},
{&PeerConn{Peer: Peer{RemoteAddr: udpAddr, Network: udpAddr.Network(), outgoing: true}}, pp.PexOutgoingConn | pp.PexSupportsUtp},
{&PeerConn{Peer: Peer{RemoteAddr: tcpAddr, Network: tcpAddr.Network(), outgoing: true}}, pp.PexOutgoingConn},
{&PeerConn{Peer: Peer{RemoteAddr: tcpAddr, Network: tcpAddr.Network()}}, 0},
}
for i, tc := range testcases {
f := tc.conn.pexPeerFlags()
require.EqualValues(t, tc.f, f, i)
}
}
2020-04-09 00:03:29 +08:00
func TestConnPexEvent(t *testing.T) {
var (
udpAddr = &net.UDPAddr{IP: net.IPv6loopback, Port: 4848}
tcpAddr = &net.TCPAddr{IP: net.IPv6loopback, Port: 4848}
dialTcpAddr = &net.TCPAddr{IP: net.IPv6loopback, Port: 4747}
dialUdpAddr = &net.UDPAddr{IP: net.IPv6loopback, Port: 4747}
)
2021-11-08 11:47:01 +08:00
testcases := []struct {
2020-04-09 00:03:29 +08:00
t pexEventType
c *PeerConn
e pexEvent
}{
{
pexAdd,
&PeerConn{Peer: Peer{RemoteAddr: udpAddr, Network: udpAddr.Network()}},
2021-12-07 02:46:25 +08:00
pexEvent{pexAdd, udpAddr, pp.PexSupportsUtp, nil},
2020-04-09 00:03:29 +08:00
},
{
pexDrop,
&PeerConn{Peer: Peer{RemoteAddr: tcpAddr, Network: tcpAddr.Network(), outgoing: true, PeerListenPort: dialTcpAddr.Port}},
2021-12-07 02:46:25 +08:00
pexEvent{pexDrop, tcpAddr, pp.PexOutgoingConn, nil},
2020-04-09 00:03:29 +08:00
},
{
pexAdd,
&PeerConn{Peer: Peer{RemoteAddr: tcpAddr, Network: tcpAddr.Network(), PeerListenPort: dialTcpAddr.Port}},
2021-12-07 02:46:25 +08:00
pexEvent{pexAdd, dialTcpAddr, 0, nil},
2020-04-09 00:03:29 +08:00
},
{
pexDrop,
&PeerConn{Peer: Peer{RemoteAddr: udpAddr, Network: udpAddr.Network(), PeerListenPort: dialUdpAddr.Port}},
2021-12-07 02:46:25 +08:00
pexEvent{pexDrop, dialUdpAddr, pp.PexSupportsUtp, nil},
2020-04-09 00:03:29 +08:00
},
}
for i, tc := range testcases {
e := tc.c.pexEvent(tc.t)
require.EqualValues(t, tc.e, e, i)
}
}
func TestHaveAllThenBitfield(t *testing.T) {
c := qt.New(t)
cl := newTestingClient(t)
tt := cl.newTorrentForTesting()
// cl.newConnection()
pc := PeerConn{
Peer: Peer{t: tt},
}
pc.peerImpl = &pc
tt.conns[&pc] = struct{}{}
c.Assert(pc.onPeerSentHaveAll(), qt.IsNil)
c.Check(pc.t.connsWithAllPieces, qt.DeepEquals, map[*Peer]struct{}{&pc.Peer: {}})
pc.peerSentBitfield([]bool{false, false, true, false, true, true, false, false})
c.Check(pc.peerMinPieces, qt.Equals, 6)
c.Check(pc.t.connsWithAllPieces, qt.HasLen, 0)
c.Assert(pc.t.setInfo(&metainfo.Info{
PieceLength: 0,
Pieces: make([]byte, pieceHash.Size()*7),
}), qt.IsNil)
pc.t.onSetInfo()
c.Check(tt.numPieces(), qt.Equals, 7)
c.Check(tt.pieceAvailabilityRuns(), qt.DeepEquals, []pieceAvailabilityRun{
// The last element of the bitfield is irrelevant, as the Torrent actually only has 7
// pieces.
{2, 0}, {1, 1}, {1, 0}, {2, 1}, {1, 0},
})
}
func TestApplyRequestStateWriteBufferConstraints(t *testing.T) {
c := qt.New(t)
c.Check(interestedMsgLen, qt.Equals, 5)
c.Check(requestMsgLen, qt.Equals, 17)
c.Check(maxLocalToRemoteRequests >= 8, qt.IsTrue)
c.Logf("max local to remote requests: %v", maxLocalToRemoteRequests)
}