Break peer out from PeerConn

This commit is contained in:
Matt Joiner 2020-05-30 10:14:20 +10:00
parent cb37a914c1
commit 02adc3f2ee
5 changed files with 51 additions and 43 deletions

View File

@ -1277,15 +1277,18 @@ func (cl *Client) banPeerIP(ip net.IP) {
func (cl *Client) newConnection(nc net.Conn, outgoing bool, remoteAddr net.Addr, network, connString string) (c *PeerConn) { func (cl *Client) newConnection(nc net.Conn, outgoing bool, remoteAddr net.Addr, network, connString string) (c *PeerConn) {
c = &PeerConn{ c = &PeerConn{
conn: nc, peer: peer{
outgoing: outgoing, outgoing: outgoing,
choking: true, choking: true,
peerChoking: true, peerChoking: true,
PeerMaxRequests: 250, PeerMaxRequests: 250,
writeBuffer: new(bytes.Buffer),
remoteAddr: remoteAddr, remoteAddr: remoteAddr,
network: network, network: network,
connString: connString, connString: connString,
},
conn: nc,
writeBuffer: new(bytes.Buffer),
} }
c.logger = cl.logger.WithValues(c).WithDefaultLevel(log.Debug).WithText(func(m log.Msg) string { c.logger = cl.logger.WithValues(c).WithDefaultLevel(log.Debug).WithText(func(m log.Msg) string {
return fmt.Sprintf("%v: %s", c, m.Text()) return fmt.Sprintf("%v: %s", c, m.Text())

View File

@ -546,9 +546,9 @@ func TestPeerInvalidHave(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
assert.True(t, _new) assert.True(t, _new)
defer tt.Drop() defer tt.Drop()
cn := &PeerConn{ cn := &PeerConn{peer: peer{
t: tt, t: tt,
} }}
assert.NoError(t, cn.peerSentHave(0)) assert.NoError(t, cn.peerSentHave(0))
assert.Error(t, cn.peerSentHave(1)) assert.Error(t, cn.peerSentHave(1))
} }

View File

@ -35,22 +35,12 @@ const (
PeerSourcePex = "X" PeerSourcePex = "X"
) )
// Maintains the state of a connection with a peer. type peer struct {
type PeerConn struct { t *Torrent
// First to ensure 64-bit alignment for atomics. See #262.
_stats ConnStats
t *Torrent
// The actual Conn, used for closing, and setting socket options.
conn net.Conn
connString string connString string
outgoing bool outgoing bool
network string network string
remoteAddr net.Addr remoteAddr net.Addr
// The Reader and Writer for this Conn, with hooks installed for stats,
// limiting, deadlines etc.
w io.Writer
r io.Reader
// True if the connection is operating over MSE obfuscation. // True if the connection is operating over MSE obfuscation.
headerEncrypted bool headerEncrypted bool
cryptoMethod mse.CryptoMethod cryptoMethod mse.CryptoMethod
@ -116,11 +106,26 @@ type PeerConn struct {
pieceInclination []int pieceInclination []int
_pieceRequestOrder prioritybitmap.PriorityBitmap _pieceRequestOrder prioritybitmap.PriorityBitmap
logger log.Logger
}
// Maintains the state of a connection with a peer.
type PeerConn struct {
// First to ensure 64-bit alignment for atomics. See #262.
_stats ConnStats
peer
// The actual Conn, used for closing, and setting socket options.
conn net.Conn
// The Reader and Writer for this Conn, with hooks installed for stats,
// limiting, deadlines etc.
w io.Writer
r io.Reader
writeBuffer *bytes.Buffer writeBuffer *bytes.Buffer
uploadTimer *time.Timer uploadTimer *time.Timer
writerCond sync.Cond writerCond sync.Cond
logger log.Logger
} }
func (cn *PeerConn) updateExpectingChunks() { func (cn *PeerConn) updateExpectingChunks() {

View File

@ -155,14 +155,14 @@ func TestConnPexPeerFlags(t *testing.T) {
conn *PeerConn conn *PeerConn
f pp.PexPeerFlags f pp.PexPeerFlags
}{ }{
{&PeerConn{outgoing: false, PeerPrefersEncryption: false}, 0}, {&PeerConn{peer: peer{outgoing: false, PeerPrefersEncryption: false}}, 0},
{&PeerConn{outgoing: false, PeerPrefersEncryption: true}, pp.PexPrefersEncryption}, {&PeerConn{peer: peer{outgoing: false, PeerPrefersEncryption: true}}, pp.PexPrefersEncryption},
{&PeerConn{outgoing: true, PeerPrefersEncryption: false}, pp.PexOutgoingConn}, {&PeerConn{peer: peer{outgoing: true, PeerPrefersEncryption: false}}, pp.PexOutgoingConn},
{&PeerConn{outgoing: true, PeerPrefersEncryption: true}, pp.PexOutgoingConn | pp.PexPrefersEncryption}, {&PeerConn{peer: peer{outgoing: true, PeerPrefersEncryption: true}}, pp.PexOutgoingConn | pp.PexPrefersEncryption},
{&PeerConn{remoteAddr: udpAddr}, pp.PexSupportsUtp}, {&PeerConn{peer: peer{remoteAddr: udpAddr}}, pp.PexSupportsUtp},
{&PeerConn{remoteAddr: udpAddr, outgoing: true}, pp.PexOutgoingConn | pp.PexSupportsUtp}, {&PeerConn{peer: peer{remoteAddr: udpAddr, outgoing: true}}, pp.PexOutgoingConn | pp.PexSupportsUtp},
{&PeerConn{remoteAddr: tcpAddr, outgoing: true}, pp.PexOutgoingConn}, {&PeerConn{peer: peer{remoteAddr: tcpAddr, outgoing: true}}, pp.PexOutgoingConn},
{&PeerConn{remoteAddr: tcpAddr}, 0}, {&PeerConn{peer: peer{remoteAddr: tcpAddr}}, 0},
} }
for i, tc := range testcases { for i, tc := range testcases {
f := tc.conn.pexPeerFlags() f := tc.conn.pexPeerFlags()
@ -184,22 +184,22 @@ func TestConnPexEvent(t *testing.T) {
}{ }{
{ {
pexAdd, pexAdd,
&PeerConn{remoteAddr: udpAddr}, &PeerConn{peer: peer{remoteAddr: udpAddr}},
pexEvent{pexAdd, udpAddr, pp.PexSupportsUtp}, pexEvent{pexAdd, udpAddr, pp.PexSupportsUtp},
}, },
{ {
pexDrop, pexDrop,
&PeerConn{remoteAddr: tcpAddr, outgoing: true, PeerListenPort: dialTcpAddr.Port}, &PeerConn{peer: peer{remoteAddr: tcpAddr, outgoing: true, PeerListenPort: dialTcpAddr.Port}},
pexEvent{pexDrop, tcpAddr, pp.PexOutgoingConn}, pexEvent{pexDrop, tcpAddr, pp.PexOutgoingConn},
}, },
{ {
pexAdd, pexAdd,
&PeerConn{remoteAddr: tcpAddr, PeerListenPort: dialTcpAddr.Port}, &PeerConn{peer: peer{remoteAddr: tcpAddr, PeerListenPort: dialTcpAddr.Port}},
pexEvent{pexAdd, dialTcpAddr, 0}, pexEvent{pexAdd, dialTcpAddr, 0},
}, },
{ {
pexDrop, pexDrop,
&PeerConn{remoteAddr: udpAddr, PeerListenPort: dialUdpAddr.Port}, &PeerConn{peer: peer{remoteAddr: udpAddr, PeerListenPort: dialUdpAddr.Port}},
pexEvent{pexDrop, dialUdpAddr, pp.PexSupportsUtp}, pexEvent{pexDrop, dialUdpAddr, pp.PexSupportsUtp},
}, },
} }

View File

@ -23,7 +23,7 @@ var (
func TestPexAdded(t *testing.T) { func TestPexAdded(t *testing.T) {
t.Run("noHold", func(t *testing.T) { t.Run("noHold", func(t *testing.T) {
s := new(pexState) s := new(pexState)
s.Add(&PeerConn{remoteAddr: addrs[0], outgoing: true}) s.Add(&PeerConn{peer: peer{remoteAddr: addrs[0], outgoing: true}})
targ := &pexState{ targ := &pexState{
ev: []pexEvent{ ev: []pexEvent{
pexEvent{pexAdd, addrs[0], pp.PexOutgoingConn}, pexEvent{pexAdd, addrs[0], pp.PexOutgoingConn},
@ -39,7 +39,7 @@ func TestPexAdded(t *testing.T) {
}, },
nc: 0, nc: 0,
} }
s.Add(&PeerConn{remoteAddr: addrs[0]}) s.Add(&PeerConn{peer: peer{remoteAddr: addrs[0]}})
targ := &pexState{ targ := &pexState{
hold: []pexEvent{ hold: []pexEvent{
pexEvent{pexDrop, addrs[1], 0}, pexEvent{pexDrop, addrs[1], 0},
@ -59,7 +59,7 @@ func TestPexAdded(t *testing.T) {
}, },
nc: pexTargAdded, nc: pexTargAdded,
} }
s.Add(&PeerConn{remoteAddr: addrs[0]}) s.Add(&PeerConn{peer: peer{remoteAddr: addrs[0]}})
targ := &pexState{ targ := &pexState{
hold: []pexEvent{}, hold: []pexEvent{},
ev: []pexEvent{ ev: []pexEvent{
@ -75,7 +75,7 @@ func TestPexAdded(t *testing.T) {
func TestPexDropped(t *testing.T) { func TestPexDropped(t *testing.T) {
t.Run("belowTarg", func(t *testing.T) { t.Run("belowTarg", func(t *testing.T) {
s := &pexState{nc: 1} s := &pexState{nc: 1}
s.Drop(&PeerConn{remoteAddr: addrs[0], pex: pexConnState{Listed: true}}) s.Drop(&PeerConn{peer: peer{remoteAddr: addrs[0], pex: pexConnState{Listed: true}}})
targ := &pexState{ targ := &pexState{
hold: []pexEvent{pexEvent{pexDrop, addrs[0], 0}}, hold: []pexEvent{pexEvent{pexDrop, addrs[0], 0}},
nc: 0, nc: 0,
@ -84,7 +84,7 @@ func TestPexDropped(t *testing.T) {
}) })
t.Run("aboveTarg", func(t *testing.T) { t.Run("aboveTarg", func(t *testing.T) {
s := &pexState{nc: pexTargAdded + 1} s := &pexState{nc: pexTargAdded + 1}
s.Drop(&PeerConn{remoteAddr: addrs[0], pex: pexConnState{Listed: true}}) s.Drop(&PeerConn{peer: peer{remoteAddr: addrs[0], pex: pexConnState{Listed: true}}})
targ := &pexState{ targ := &pexState{
ev: []pexEvent{pexEvent{pexDrop, addrs[0], 0}}, ev: []pexEvent{pexEvent{pexDrop, addrs[0], 0}},
nc: pexTargAdded, nc: pexTargAdded,
@ -93,7 +93,7 @@ func TestPexDropped(t *testing.T) {
}) })
t.Run("aboveTargNotListed", func(t *testing.T) { t.Run("aboveTargNotListed", func(t *testing.T) {
s := &pexState{nc: pexTargAdded + 1} s := &pexState{nc: pexTargAdded + 1}
s.Drop(&PeerConn{remoteAddr: addrs[0], pex: pexConnState{Listed: false}}) s.Drop(&PeerConn{peer: peer{remoteAddr: addrs[0], pex: pexConnState{Listed: false}}})
targ := &pexState{nc: pexTargAdded + 1} targ := &pexState{nc: pexTargAdded + 1}
require.EqualValues(t, targ, s) require.EqualValues(t, targ, s)
}) })