PEX: fluid event log

This commit is contained in:
Yaroslav Kolomiiets 2021-12-06 18:46:25 +00:00 committed by Matt Joiner
parent 7197c5f493
commit 4a06517856
5 changed files with 190 additions and 310 deletions

View File

@ -1634,7 +1634,7 @@ func (c *PeerConn) dialAddr() PeerRemoteAddr {
func (c *PeerConn) pexEvent(t pexEventType) pexEvent {
f := c.pexPeerFlags()
addr := c.dialAddr()
return pexEvent{t, addr, f}
return pexEvent{t, addr, f, nil}
}
func (c *PeerConn) String() string {

View File

@ -189,22 +189,22 @@ func TestConnPexEvent(t *testing.T) {
{
pexAdd,
&PeerConn{Peer: Peer{RemoteAddr: udpAddr, Network: udpAddr.Network()}},
pexEvent{pexAdd, udpAddr, pp.PexSupportsUtp},
pexEvent{pexAdd, udpAddr, pp.PexSupportsUtp, nil},
},
{
pexDrop,
&PeerConn{Peer: Peer{RemoteAddr: tcpAddr, Network: tcpAddr.Network(), outgoing: true, PeerListenPort: dialTcpAddr.Port}},
pexEvent{pexDrop, tcpAddr, pp.PexOutgoingConn},
pexEvent{pexDrop, tcpAddr, pp.PexOutgoingConn, nil},
},
{
pexAdd,
&PeerConn{Peer: Peer{RemoteAddr: tcpAddr, Network: tcpAddr.Network(), PeerListenPort: dialTcpAddr.Port}},
pexEvent{pexAdd, dialTcpAddr, 0},
pexEvent{pexAdd, dialTcpAddr, 0, nil},
},
{
pexDrop,
&PeerConn{Peer: Peer{RemoteAddr: udpAddr, Network: udpAddr.Network(), PeerListenPort: dialUdpAddr.Port}},
pexEvent{pexDrop, dialUdpAddr, pp.PexSupportsUtp},
pexEvent{pexDrop, dialUdpAddr, pp.PexSupportsUtp, nil},
},
}
for i, tc := range testcases {

88
pex.go
View File

@ -28,6 +28,7 @@ type pexEvent struct {
t pexEventType
addr PeerRemoteAddr
f pp.PexPeerFlags
next *pexEvent // event feed list
}
// facilitates efficient de-duplication while generating PEX messages
@ -145,7 +146,7 @@ func (me *pexMsgFactory) drop(e pexEvent) {
me.dropped[key] = struct{}{}
}
func (me *pexMsgFactory) addEvent(event pexEvent) {
func (me *pexMsgFactory) append(event pexEvent) {
switch event.t {
case pexAdd:
me.add(event)
@ -178,36 +179,47 @@ func shortestIP(ip net.IP) net.IP {
// Per-torrent PEX state
type pexState struct {
ev []pexEvent // event feed, append-only
hold []pexEvent // delayed drops
rest time.Time // cooldown deadline on inbound
nc int // net number of alive conns
initCache pexMsgFactory // last generated initial message
initSeq int // number of events which went into initCache
initLock sync.RWMutex // serialise access to initCache and initSeq
sync.RWMutex
tail *pexEvent // event feed list
hold []pexEvent // delayed drops
rest time.Time // cooldown deadline on inbound
nc int // net number of alive conns
msg0 pexMsgFactory // initial message
}
// Reset wipes the state clean, releasing resources. Called from Torrent.Close().
func (s *pexState) Reset() {
s.ev = nil
s.Lock()
defer s.Unlock()
s.tail = nil
s.hold = nil
s.nc = 0
s.rest = time.Time{}
s.initLock.Lock()
s.initCache = pexMsgFactory{}
s.initSeq = 0
s.initLock.Unlock()
s.msg0 = pexMsgFactory{}
}
func (s *pexState) append(e *pexEvent) {
if s.tail != nil {
s.tail.next = e
}
s.tail = e
s.msg0.append(*e)
}
func (s *pexState) Add(c *PeerConn) {
s.Lock()
defer s.Unlock()
s.nc++
if s.nc >= pexTargAdded {
s.ev = append(s.ev, s.hold...)
for _, e := range s.hold {
ne := e
s.append(&ne)
}
s.hold = s.hold[:0]
}
e := c.pexEvent(pexAdd)
s.ev = append(s.ev, e)
c.pex.Listed = true
s.append(&e)
}
func (s *pexState) Drop(c *PeerConn) {
@ -215,44 +227,34 @@ func (s *pexState) Drop(c *PeerConn) {
// skip connections which were not previously Added
return
}
s.Lock()
defer s.Unlock()
e := c.pexEvent(pexDrop)
s.nc--
if s.nc < pexTargAdded && len(s.hold) < pexMaxHold {
s.hold = append(s.hold, e)
} else {
s.ev = append(s.ev, e)
s.append(&e)
}
}
// Generate a PEX message based on the event feed. Also returns an index to pass to the subsequent
// calls, producing incremental deltas.
func (s *pexState) Genmsg(start int) (pp.PexMsg, int) {
if start == 0 {
return s.genmsg0()
// Generate a PEX message based on the event feed.
// Also returns a pointer to pass to the subsequent calls
// to produce incremental deltas.
func (s *pexState) Genmsg(start *pexEvent) (pp.PexMsg, *pexEvent) {
s.RLock()
defer s.RUnlock()
if start == nil {
return s.msg0.PexMsg(), s.tail
}
var factory pexMsgFactory
n := start
for _, e := range s.ev[start:] {
if start > 0 && factory.DeltaLen() >= pexMaxDelta {
var msg pexMsgFactory
last := start
for e := start.next; e != nil; e = e.next {
if msg.DeltaLen() >= pexMaxDelta {
break
}
factory.addEvent(e)
n++
msg.append(*e)
last = e
}
return factory.PexMsg(), n
}
func (s *pexState) genmsg0() (pp.PexMsg, int) {
s.initLock.Lock()
for _, e := range s.ev[s.initSeq:] {
s.initCache.addEvent(e)
s.initSeq++
}
s.initLock.Unlock()
s.initLock.RLock()
n := s.initSeq
msg := s.initCache.PexMsg()
s.initLock.RUnlock()
return msg, n
return msg.PexMsg(), last
}

View File

@ -30,94 +30,18 @@ var (
addrs4[0],
addrs4[1],
}
f = pp.PexOutgoingConn
)
func TestPexAdded(t *testing.T) {
t.Run("noHold", func(t *testing.T) {
s := new(pexState)
s.Add(&PeerConn{Peer: Peer{RemoteAddr: addrs[0], outgoing: true}})
targ := &pexState{
ev: []pexEvent{
{pexAdd, addrs[0], pp.PexOutgoingConn},
},
nc: 1,
}
require.EqualValues(t, targ, s)
})
t.Run("belowTarg", func(t *testing.T) {
s := &pexState{
hold: []pexEvent{
{pexDrop, addrs[1], 0},
},
nc: 0,
}
s.Add(&PeerConn{Peer: Peer{RemoteAddr: addrs[0]}})
targ := &pexState{
hold: []pexEvent{
{pexDrop, addrs[1], 0},
},
ev: []pexEvent{
{pexAdd, addrs[0], 0},
},
nc: 1,
}
require.EqualValues(t, targ, s)
})
t.Run("aboveTarg", func(t *testing.T) {
holdAddr := &net.TCPAddr{IP: net.IPv6loopback, Port: 4848}
s := &pexState{
hold: []pexEvent{
{pexDrop, holdAddr, 0},
},
nc: pexTargAdded,
}
s.Add(&PeerConn{Peer: Peer{RemoteAddr: addrs[0]}})
targ := &pexState{
hold: []pexEvent{},
ev: []pexEvent{
{pexDrop, holdAddr, 0},
{pexAdd, addrs[0], 0},
},
nc: pexTargAdded + 1,
}
require.EqualValues(t, targ, s)
})
}
func TestPexDropped(t *testing.T) {
t.Run("belowTarg", func(t *testing.T) {
s := &pexState{nc: 1}
s.Drop(&PeerConn{Peer: Peer{RemoteAddr: addrs[0]}, pex: pexConnState{Listed: true}})
targ := &pexState{
hold: []pexEvent{{pexDrop, addrs[0], 0}},
nc: 0,
}
require.EqualValues(t, targ, s)
})
t.Run("aboveTarg", func(t *testing.T) {
s := &pexState{nc: pexTargAdded + 1}
s.Drop(&PeerConn{Peer: Peer{RemoteAddr: addrs[0]}, pex: pexConnState{Listed: true}})
targ := &pexState{
ev: []pexEvent{{pexDrop, addrs[0], 0}},
nc: pexTargAdded,
}
require.EqualValues(t, targ, s)
})
t.Run("aboveTargNotListed", func(t *testing.T) {
s := &pexState{nc: pexTargAdded + 1}
s.Drop(&PeerConn{Peer: Peer{RemoteAddr: addrs[0]}, pex: pexConnState{Listed: false}})
targ := &pexState{nc: pexTargAdded + 1}
require.EqualValues(t, targ, s)
})
}
func TestPexReset(t *testing.T) {
s := &pexState{
hold: []pexEvent{{pexDrop, addrs[0], 0}},
ev: []pexEvent{{pexAdd, addrs[1], 0}},
nc: 1,
s := &pexState{}
conns := []PeerConn{
{Peer: Peer{RemoteAddr: addrs[0]}},
{Peer: Peer{RemoteAddr: addrs[1]}},
{Peer: Peer{RemoteAddr: addrs[2]}},
}
s.Add(&conns[0])
s.Add(&conns[1])
s.Drop(&conns[0])
s.Reset()
targ := new(pexState)
require.EqualValues(t, targ, s)
@ -132,54 +56,69 @@ func mustNodeAddr(addr net.Addr) krpc.NodeAddr {
}
var testcases = []struct {
name string
in *pexState
arg int
targM pp.PexMsg
targS int
name string
in *pexState
targ pp.PexMsg
update func(*pexState)
targ1 pp.PexMsg
}{
{
name: "empty",
in: &pexState{},
arg: 0,
targM: pp.PexMsg{},
targS: 0,
name: "empty",
in: &pexState{},
targ: pp.PexMsg{},
},
{
name: "add0",
in: func() *pexState {
s := new(pexState)
nullAddr := &net.TCPAddr{}
s.Add(&PeerConn{Peer: Peer{RemoteAddr: nullAddr}})
return s
}(),
targ: pp.PexMsg{},
},
{
name: "drop0",
in: func() *pexState {
nullAddr := &net.TCPAddr{}
s := new(pexState)
s.Drop(&PeerConn{Peer: Peer{RemoteAddr: nullAddr}, pex: pexConnState{Listed: true}})
return s
}(),
targ: pp.PexMsg{},
},
{
name: "add4",
in: &pexState{
ev: []pexEvent{
{pexAdd, addrs[0], f},
{pexAdd, addrs[1], f},
{pexAdd, addrs[2], f},
{pexAdd, addrs[3], f},
},
},
arg: 0,
targM: pp.PexMsg{
in: func() *pexState {
s := new(pexState)
s.Add(&PeerConn{Peer: Peer{RemoteAddr: addrs[0]}})
s.Add(&PeerConn{Peer: Peer{RemoteAddr: addrs[1], outgoing: true}})
s.Add(&PeerConn{Peer: Peer{RemoteAddr: addrs[2], outgoing: true}})
s.Add(&PeerConn{Peer: Peer{RemoteAddr: addrs[3]}})
return s
}(),
targ: pp.PexMsg{
Added: krpc.CompactIPv4NodeAddrs{
mustNodeAddr(addrs[2]),
mustNodeAddr(addrs[3]),
},
AddedFlags: []pp.PexPeerFlags{f, f},
AddedFlags: []pp.PexPeerFlags{pp.PexOutgoingConn, 0},
Added6: krpc.CompactIPv6NodeAddrs{
mustNodeAddr(addrs[0]),
mustNodeAddr(addrs[1]),
},
Added6Flags: []pp.PexPeerFlags{f, f},
Added6Flags: []pp.PexPeerFlags{0, pp.PexOutgoingConn},
},
targS: 4,
},
{
name: "drop2",
arg: 0,
in: &pexState{
ev: []pexEvent{
{pexDrop, addrs[0], f},
{pexDrop, addrs[2], f},
},
},
targM: pp.PexMsg{
in: func() *pexState {
s := &pexState{nc: pexTargAdded + 2}
s.Drop(&PeerConn{Peer: Peer{RemoteAddr: addrs[0]}, pex: pexConnState{Listed: true}})
s.Drop(&PeerConn{Peer: Peer{RemoteAddr: addrs[2]}, pex: pexConnState{Listed: true}})
return s
}(),
targ: pp.PexMsg{
Dropped: krpc.CompactIPv4NodeAddrs{
mustNodeAddr(addrs[2]),
},
@ -187,70 +126,100 @@ var testcases = []struct {
mustNodeAddr(addrs[0]),
},
},
targS: 2,
},
{
name: "add2drop1",
arg: 0,
in: &pexState{
ev: []pexEvent{
{pexAdd, addrs[0], f},
{pexAdd, addrs[1], f},
{pexDrop, addrs[0], f},
},
},
targM: pp.PexMsg{
in: func() *pexState {
conns := []PeerConn{
{Peer: Peer{RemoteAddr: addrs[0]}},
{Peer: Peer{RemoteAddr: addrs[1]}},
{Peer: Peer{RemoteAddr: addrs[2]}},
}
s := &pexState{nc: pexTargAdded}
s.Add(&conns[0])
s.Add(&conns[1])
s.Drop(&conns[0])
s.Drop(&conns[2]) // to be ignored: it wasn't added
return s
}(),
targ: pp.PexMsg{
Added6: krpc.CompactIPv6NodeAddrs{
mustNodeAddr(addrs[1]),
},
Added6Flags: []pp.PexPeerFlags{f},
Added6Flags: []pp.PexPeerFlags{0},
},
targS: 3,
},
{
name: "delayed",
arg: 0,
in: &pexState{
ev: []pexEvent{
{pexAdd, addrs[0], f},
{pexAdd, addrs[1], f},
{pexAdd, addrs[2], f},
},
hold: []pexEvent{
{pexDrop, addrs[0], f},
{pexDrop, addrs[2], f},
{pexDrop, addrs[1], f},
},
},
targM: pp.PexMsg{
in: func() *pexState {
conns := []PeerConn{
{Peer: Peer{RemoteAddr: addrs[0]}},
{Peer: Peer{RemoteAddr: addrs[1]}},
{Peer: Peer{RemoteAddr: addrs[2]}},
}
s := new(pexState)
s.Add(&conns[0])
s.Add(&conns[1])
s.Add(&conns[2])
s.Drop(&conns[0]) // on hold: s.nc < pexTargAdded
s.Drop(&conns[2])
s.Drop(&conns[1])
return s
}(),
targ: pp.PexMsg{
Added: krpc.CompactIPv4NodeAddrs{
mustNodeAddr(addrs[2]),
},
AddedFlags: []pp.PexPeerFlags{f},
AddedFlags: []pp.PexPeerFlags{0},
Added6: krpc.CompactIPv6NodeAddrs{
mustNodeAddr(addrs[0]),
mustNodeAddr(addrs[1]),
},
Added6Flags: []pp.PexPeerFlags{f, f},
Added6Flags: []pp.PexPeerFlags{0, 0},
},
targS: 3,
},
{
name: "followup",
arg: 1,
in: &pexState{
ev: []pexEvent{
{pexAdd, addrs[0], f},
{pexAdd, addrs[1], f},
},
},
targM: pp.PexMsg{
name: "unheld",
in: func() *pexState {
conns := []PeerConn{
{Peer: Peer{RemoteAddr: addrs[0]}},
{Peer: Peer{RemoteAddr: addrs[1]}},
}
s := &pexState{nc: pexTargAdded - 1}
s.Add(&conns[0])
s.Drop(&conns[0]) // on hold: s.nc < pexTargAdded
s.Add(&conns[1]) // unholds the above
return s
}(),
targ: pp.PexMsg{
Added6: krpc.CompactIPv6NodeAddrs{
mustNodeAddr(addrs[1]),
},
Added6Flags: []pp.PexPeerFlags{f},
Added6Flags: []pp.PexPeerFlags{0},
},
},
{
name: "followup",
in: func() *pexState {
s := new(pexState)
s.Add(&PeerConn{Peer: Peer{RemoteAddr: addrs[0]}})
return s
}(),
targ: pp.PexMsg{
Added6: krpc.CompactIPv6NodeAddrs{
mustNodeAddr(addrs[0]),
},
Added6Flags: []pp.PexPeerFlags{0},
},
update: func(s *pexState) {
s.Add(&PeerConn{Peer: Peer{RemoteAddr: addrs[1]}})
},
targ1: pp.PexMsg{
Added6: krpc.CompactIPv6NodeAddrs{
mustNodeAddr(addrs[1]),
},
Added6Flags: []pp.PexPeerFlags{0},
},
targS: 2,
},
}
@ -292,13 +261,18 @@ func assertPexMsgsEqual(t *testing.T, expected, actual pp.PexMsg) {
ac.AssertEqual(t, ec)
}
func TestPexGenmsg(t *testing.T) {
func TestPexGenmsg0(t *testing.T) {
for _, tc := range testcases {
t.Run(tc.name, func(t *testing.T) {
s := tc.in
m, seen := s.Genmsg(tc.arg)
assertPexMsgsEqual(t, tc.targM, m)
require.EqualValues(t, tc.targS, seen)
s := *tc.in
m, last := s.Genmsg(nil)
assertPexMsgsEqual(t, tc.targ, m)
if tc.update != nil {
tc.update(&s)
m1, last := s.Genmsg(last)
assertPexMsgsEqual(t, tc.targ1, m1)
assert.NotNil(t, last)
}
})
}
}
@ -324,9 +298,8 @@ func TestPexInitialNoCutoff(t *testing.T) {
for addr := range c {
s.Add(&PeerConn{Peer: Peer{RemoteAddr: addr}})
}
m, seq := s.Genmsg(0)
m, _ := s.Genmsg(nil)
require.EqualValues(t, n, seq)
require.EqualValues(t, n, len(m.Added))
require.EqualValues(t, n, len(m.AddedFlags))
require.EqualValues(t, 0, len(m.Added6))
@ -341,7 +314,7 @@ func benchmarkPexInitialN(b *testing.B, npeers int) {
c := addrgen(npeers)
for addr := range c {
s.Add(&PeerConn{Peer: Peer{RemoteAddr: addr}})
s.Genmsg(0)
s.Genmsg(nil)
}
}
}
@ -352,98 +325,3 @@ func BenchmarkPexInitial50(b *testing.B) { benchmarkPexInitialN(b, 50) }
func BenchmarkPexInitial100(b *testing.B) { benchmarkPexInitialN(b, 100) }
func BenchmarkPexInitial200(b *testing.B) { benchmarkPexInitialN(b, 200) }
func BenchmarkPexInitial400(b *testing.B) { benchmarkPexInitialN(b, 400) }
func TestPexAdd(t *testing.T) {
t.Run("ipv4", func(t *testing.T) {
addrs := addrs4
var m pexMsgFactory
m.addEvent(pexEvent{pexDrop, addrs[0], 0})
m.addEvent(pexEvent{pexAdd, addrs[1], f})
for _, addr := range addrs {
m.addEvent(pexEvent{pexAdd, addr, f})
}
targ := pp.PexMsg{
Added: krpc.CompactIPv4NodeAddrs{
mustNodeAddr(addrs[1]),
mustNodeAddr(addrs[2]),
mustNodeAddr(addrs[3]),
},
AddedFlags: []pp.PexPeerFlags{f, f, f},
}
out := m.PexMsg()
assertPexMsgsEqual(t, targ, out)
})
t.Run("ipv6", func(t *testing.T) {
addrs := addrs6
var m pexMsgFactory
m.addEvent(pexEvent{pexDrop, addrs[0], 0})
m.addEvent(pexEvent{pexAdd, addrs[1], f})
for _, addr := range addrs {
m.addEvent(pexEvent{pexAdd, addr, f})
}
targ := pp.PexMsg{
Added6: krpc.CompactIPv6NodeAddrs{
mustNodeAddr(addrs[1]),
mustNodeAddr(addrs[2]),
mustNodeAddr(addrs[3]),
},
Added6Flags: []pp.PexPeerFlags{f, f, f},
}
assertPexMsgsEqual(t, targ, m.PexMsg())
})
t.Run("empty", func(t *testing.T) {
nullAddr := &net.TCPAddr{}
var xm pexMsgFactory
xm.addEvent(pexEvent{pexAdd, nullAddr, f})
m := xm.PexMsg()
require.EqualValues(t, 0, len(m.Added))
require.EqualValues(t, 0, len(m.AddedFlags))
require.EqualValues(t, 0, len(m.Added6))
require.EqualValues(t, 0, len(m.Added6Flags))
})
}
func TestPexDrop(t *testing.T) {
t.Run("ipv4", func(t *testing.T) {
addrs := addrs4
var m pexMsgFactory
m.addEvent(pexEvent{pexAdd, addrs[0], f})
m.addEvent(pexEvent{pexDrop, addrs[1], 0})
for _, addr := range addrs {
m.addEvent(pexEvent{pexDrop, addr, 0})
}
targ := pp.PexMsg{
Dropped: krpc.CompactIPv4NodeAddrs{
mustNodeAddr(addrs[1]),
mustNodeAddr(addrs[2]),
mustNodeAddr(addrs[3]),
},
}
assertPexMsgsEqual(t, targ, m.PexMsg())
})
t.Run("ipv6", func(t *testing.T) {
addrs := addrs6
var m pexMsgFactory
m.addEvent(pexEvent{pexAdd, addrs[0], f})
m.addEvent(pexEvent{pexDrop, addrs[1], 0})
for _, addr := range addrs {
m.addEvent(pexEvent{pexDrop, addr, 0})
}
targ := pp.PexMsg{
Dropped6: krpc.CompactIPv6NodeAddrs{
mustNodeAddr(addrs[1]),
mustNodeAddr(addrs[2]),
mustNodeAddr(addrs[3]),
},
}
assertPexMsgsEqual(t, targ, m.PexMsg())
})
t.Run("empty", func(t *testing.T) {
nullAddr := &net.TCPAddr{}
var xm pexMsgFactory
xm.addEvent(pexEvent{pexDrop, nullAddr, f})
m := xm.PexMsg()
require.EqualValues(t, 0, len(m.Dropped))
require.EqualValues(t, 0, len(m.Dropped6))
})
}

View File

@ -18,7 +18,7 @@ const (
type pexConnState struct {
enabled bool
xid pp.ExtensionNumber
seq int
last *pexEvent
timer *time.Timer
gate chan struct{}
readyfn func()
@ -39,7 +39,7 @@ func (s *pexConnState) Init(c *PeerConn) {
return
}
s.xid = xid
s.seq = 0
s.last = nil
s.torrent = c.t
s.info = c.t.cl.logger.WithDefaultLevel(log.Info)
s.dbg = c.logger.WithDefaultLevel(log.Debug)
@ -59,11 +59,11 @@ func (s *pexConnState) sched(delay time.Duration) {
// generate next PEX message for the peer; returns nil if nothing yet to send
func (s *pexConnState) genmsg() *pp.PexMsg {
tx, seq := s.torrent.pex.Genmsg(s.seq)
tx, last := s.torrent.pex.Genmsg(s.last)
if tx.Len() == 0 {
return nil
}
s.seq = seq
s.last = last
return &tx
}