From 4a06517856eff49efa03e9f6c550aa1baeb35500 Mon Sep 17 00:00:00 2001 From: Yaroslav Kolomiiets Date: Mon, 6 Dec 2021 18:46:25 +0000 Subject: [PATCH] PEX: fluid event log --- peerconn.go | 2 +- peerconn_test.go | 8 +- pex.go | 88 +++++------ pex_test.go | 394 ++++++++++++++++------------------------------- pexconn.go | 8 +- 5 files changed, 190 insertions(+), 310 deletions(-) diff --git a/peerconn.go b/peerconn.go index 8efa343f..27d34ee5 100644 --- a/peerconn.go +++ b/peerconn.go @@ -1634,7 +1634,7 @@ func (c *PeerConn) dialAddr() PeerRemoteAddr { func (c *PeerConn) pexEvent(t pexEventType) pexEvent { f := c.pexPeerFlags() addr := c.dialAddr() - return pexEvent{t, addr, f} + return pexEvent{t, addr, f, nil} } func (c *PeerConn) String() string { diff --git a/peerconn_test.go b/peerconn_test.go index e81b6343..304a3ec4 100644 --- a/peerconn_test.go +++ b/peerconn_test.go @@ -189,22 +189,22 @@ func TestConnPexEvent(t *testing.T) { { pexAdd, &PeerConn{Peer: Peer{RemoteAddr: udpAddr, Network: udpAddr.Network()}}, - pexEvent{pexAdd, udpAddr, pp.PexSupportsUtp}, + pexEvent{pexAdd, udpAddr, pp.PexSupportsUtp, nil}, }, { pexDrop, &PeerConn{Peer: Peer{RemoteAddr: tcpAddr, Network: tcpAddr.Network(), outgoing: true, PeerListenPort: dialTcpAddr.Port}}, - pexEvent{pexDrop, tcpAddr, pp.PexOutgoingConn}, + pexEvent{pexDrop, tcpAddr, pp.PexOutgoingConn, nil}, }, { pexAdd, &PeerConn{Peer: Peer{RemoteAddr: tcpAddr, Network: tcpAddr.Network(), PeerListenPort: dialTcpAddr.Port}}, - pexEvent{pexAdd, dialTcpAddr, 0}, + pexEvent{pexAdd, dialTcpAddr, 0, nil}, }, { pexDrop, &PeerConn{Peer: Peer{RemoteAddr: udpAddr, Network: udpAddr.Network(), PeerListenPort: dialUdpAddr.Port}}, - pexEvent{pexDrop, dialUdpAddr, pp.PexSupportsUtp}, + pexEvent{pexDrop, dialUdpAddr, pp.PexSupportsUtp, nil}, }, } for i, tc := range testcases { diff --git a/pex.go b/pex.go index 22834436..27c33516 100644 --- a/pex.go +++ b/pex.go @@ -28,6 +28,7 @@ type pexEvent struct { t pexEventType addr PeerRemoteAddr f pp.PexPeerFlags + next *pexEvent // event feed list } // facilitates efficient de-duplication while generating PEX messages @@ -145,7 +146,7 @@ func (me *pexMsgFactory) drop(e pexEvent) { me.dropped[key] = struct{}{} } -func (me *pexMsgFactory) addEvent(event pexEvent) { +func (me *pexMsgFactory) append(event pexEvent) { switch event.t { case pexAdd: me.add(event) @@ -178,36 +179,47 @@ func shortestIP(ip net.IP) net.IP { // Per-torrent PEX state type pexState struct { - ev []pexEvent // event feed, append-only - hold []pexEvent // delayed drops - rest time.Time // cooldown deadline on inbound - nc int // net number of alive conns - initCache pexMsgFactory // last generated initial message - initSeq int // number of events which went into initCache - initLock sync.RWMutex // serialise access to initCache and initSeq + sync.RWMutex + tail *pexEvent // event feed list + hold []pexEvent // delayed drops + rest time.Time // cooldown deadline on inbound + nc int // net number of alive conns + msg0 pexMsgFactory // initial message } // Reset wipes the state clean, releasing resources. Called from Torrent.Close(). func (s *pexState) Reset() { - s.ev = nil + s.Lock() + defer s.Unlock() + s.tail = nil s.hold = nil s.nc = 0 s.rest = time.Time{} - s.initLock.Lock() - s.initCache = pexMsgFactory{} - s.initSeq = 0 - s.initLock.Unlock() + s.msg0 = pexMsgFactory{} +} + +func (s *pexState) append(e *pexEvent) { + if s.tail != nil { + s.tail.next = e + } + s.tail = e + s.msg0.append(*e) } func (s *pexState) Add(c *PeerConn) { + s.Lock() + defer s.Unlock() s.nc++ if s.nc >= pexTargAdded { - s.ev = append(s.ev, s.hold...) + for _, e := range s.hold { + ne := e + s.append(&ne) + } s.hold = s.hold[:0] } e := c.pexEvent(pexAdd) - s.ev = append(s.ev, e) c.pex.Listed = true + s.append(&e) } func (s *pexState) Drop(c *PeerConn) { @@ -215,44 +227,34 @@ func (s *pexState) Drop(c *PeerConn) { // skip connections which were not previously Added return } + s.Lock() + defer s.Unlock() e := c.pexEvent(pexDrop) s.nc-- if s.nc < pexTargAdded && len(s.hold) < pexMaxHold { s.hold = append(s.hold, e) } else { - s.ev = append(s.ev, e) + s.append(&e) } } -// Generate a PEX message based on the event feed. Also returns an index to pass to the subsequent -// calls, producing incremental deltas. -func (s *pexState) Genmsg(start int) (pp.PexMsg, int) { - if start == 0 { - return s.genmsg0() +// Generate a PEX message based on the event feed. +// Also returns a pointer to pass to the subsequent calls +// to produce incremental deltas. +func (s *pexState) Genmsg(start *pexEvent) (pp.PexMsg, *pexEvent) { + s.RLock() + defer s.RUnlock() + if start == nil { + return s.msg0.PexMsg(), s.tail } - - var factory pexMsgFactory - n := start - for _, e := range s.ev[start:] { - if start > 0 && factory.DeltaLen() >= pexMaxDelta { + var msg pexMsgFactory + last := start + for e := start.next; e != nil; e = e.next { + if msg.DeltaLen() >= pexMaxDelta { break } - factory.addEvent(e) - n++ + msg.append(*e) + last = e } - return factory.PexMsg(), n -} - -func (s *pexState) genmsg0() (pp.PexMsg, int) { - s.initLock.Lock() - for _, e := range s.ev[s.initSeq:] { - s.initCache.addEvent(e) - s.initSeq++ - } - s.initLock.Unlock() - s.initLock.RLock() - n := s.initSeq - msg := s.initCache.PexMsg() - s.initLock.RUnlock() - return msg, n + return msg.PexMsg(), last } diff --git a/pex_test.go b/pex_test.go index 54866596..2b973296 100644 --- a/pex_test.go +++ b/pex_test.go @@ -30,94 +30,18 @@ var ( addrs4[0], addrs4[1], } - f = pp.PexOutgoingConn ) -func TestPexAdded(t *testing.T) { - t.Run("noHold", func(t *testing.T) { - s := new(pexState) - s.Add(&PeerConn{Peer: Peer{RemoteAddr: addrs[0], outgoing: true}}) - targ := &pexState{ - ev: []pexEvent{ - {pexAdd, addrs[0], pp.PexOutgoingConn}, - }, - nc: 1, - } - require.EqualValues(t, targ, s) - }) - t.Run("belowTarg", func(t *testing.T) { - s := &pexState{ - hold: []pexEvent{ - {pexDrop, addrs[1], 0}, - }, - nc: 0, - } - s.Add(&PeerConn{Peer: Peer{RemoteAddr: addrs[0]}}) - targ := &pexState{ - hold: []pexEvent{ - {pexDrop, addrs[1], 0}, - }, - ev: []pexEvent{ - {pexAdd, addrs[0], 0}, - }, - nc: 1, - } - require.EqualValues(t, targ, s) - }) - t.Run("aboveTarg", func(t *testing.T) { - holdAddr := &net.TCPAddr{IP: net.IPv6loopback, Port: 4848} - s := &pexState{ - hold: []pexEvent{ - {pexDrop, holdAddr, 0}, - }, - nc: pexTargAdded, - } - s.Add(&PeerConn{Peer: Peer{RemoteAddr: addrs[0]}}) - targ := &pexState{ - hold: []pexEvent{}, - ev: []pexEvent{ - {pexDrop, holdAddr, 0}, - {pexAdd, addrs[0], 0}, - }, - nc: pexTargAdded + 1, - } - require.EqualValues(t, targ, s) - }) -} - -func TestPexDropped(t *testing.T) { - t.Run("belowTarg", func(t *testing.T) { - s := &pexState{nc: 1} - s.Drop(&PeerConn{Peer: Peer{RemoteAddr: addrs[0]}, pex: pexConnState{Listed: true}}) - targ := &pexState{ - hold: []pexEvent{{pexDrop, addrs[0], 0}}, - nc: 0, - } - require.EqualValues(t, targ, s) - }) - t.Run("aboveTarg", func(t *testing.T) { - s := &pexState{nc: pexTargAdded + 1} - s.Drop(&PeerConn{Peer: Peer{RemoteAddr: addrs[0]}, pex: pexConnState{Listed: true}}) - targ := &pexState{ - ev: []pexEvent{{pexDrop, addrs[0], 0}}, - nc: pexTargAdded, - } - require.EqualValues(t, targ, s) - }) - t.Run("aboveTargNotListed", func(t *testing.T) { - s := &pexState{nc: pexTargAdded + 1} - s.Drop(&PeerConn{Peer: Peer{RemoteAddr: addrs[0]}, pex: pexConnState{Listed: false}}) - targ := &pexState{nc: pexTargAdded + 1} - require.EqualValues(t, targ, s) - }) -} - func TestPexReset(t *testing.T) { - s := &pexState{ - hold: []pexEvent{{pexDrop, addrs[0], 0}}, - ev: []pexEvent{{pexAdd, addrs[1], 0}}, - nc: 1, + s := &pexState{} + conns := []PeerConn{ + {Peer: Peer{RemoteAddr: addrs[0]}}, + {Peer: Peer{RemoteAddr: addrs[1]}}, + {Peer: Peer{RemoteAddr: addrs[2]}}, } + s.Add(&conns[0]) + s.Add(&conns[1]) + s.Drop(&conns[0]) s.Reset() targ := new(pexState) require.EqualValues(t, targ, s) @@ -132,54 +56,69 @@ func mustNodeAddr(addr net.Addr) krpc.NodeAddr { } var testcases = []struct { - name string - in *pexState - arg int - targM pp.PexMsg - targS int + name string + in *pexState + targ pp.PexMsg + update func(*pexState) + targ1 pp.PexMsg }{ { - name: "empty", - in: &pexState{}, - arg: 0, - targM: pp.PexMsg{}, - targS: 0, + name: "empty", + in: &pexState{}, + targ: pp.PexMsg{}, + }, + { + name: "add0", + in: func() *pexState { + s := new(pexState) + nullAddr := &net.TCPAddr{} + s.Add(&PeerConn{Peer: Peer{RemoteAddr: nullAddr}}) + return s + }(), + targ: pp.PexMsg{}, + }, + { + name: "drop0", + in: func() *pexState { + nullAddr := &net.TCPAddr{} + s := new(pexState) + s.Drop(&PeerConn{Peer: Peer{RemoteAddr: nullAddr}, pex: pexConnState{Listed: true}}) + return s + }(), + targ: pp.PexMsg{}, }, { name: "add4", - in: &pexState{ - ev: []pexEvent{ - {pexAdd, addrs[0], f}, - {pexAdd, addrs[1], f}, - {pexAdd, addrs[2], f}, - {pexAdd, addrs[3], f}, - }, - }, - arg: 0, - targM: pp.PexMsg{ + in: func() *pexState { + s := new(pexState) + s.Add(&PeerConn{Peer: Peer{RemoteAddr: addrs[0]}}) + s.Add(&PeerConn{Peer: Peer{RemoteAddr: addrs[1], outgoing: true}}) + s.Add(&PeerConn{Peer: Peer{RemoteAddr: addrs[2], outgoing: true}}) + s.Add(&PeerConn{Peer: Peer{RemoteAddr: addrs[3]}}) + return s + }(), + targ: pp.PexMsg{ Added: krpc.CompactIPv4NodeAddrs{ mustNodeAddr(addrs[2]), mustNodeAddr(addrs[3]), }, - AddedFlags: []pp.PexPeerFlags{f, f}, + AddedFlags: []pp.PexPeerFlags{pp.PexOutgoingConn, 0}, Added6: krpc.CompactIPv6NodeAddrs{ mustNodeAddr(addrs[0]), mustNodeAddr(addrs[1]), }, - Added6Flags: []pp.PexPeerFlags{f, f}, + Added6Flags: []pp.PexPeerFlags{0, pp.PexOutgoingConn}, }, - targS: 4, }, { name: "drop2", - arg: 0, - in: &pexState{ - ev: []pexEvent{ - {pexDrop, addrs[0], f}, - {pexDrop, addrs[2], f}, - }, - }, - targM: pp.PexMsg{ + in: func() *pexState { + s := &pexState{nc: pexTargAdded + 2} + s.Drop(&PeerConn{Peer: Peer{RemoteAddr: addrs[0]}, pex: pexConnState{Listed: true}}) + s.Drop(&PeerConn{Peer: Peer{RemoteAddr: addrs[2]}, pex: pexConnState{Listed: true}}) + return s + }(), + targ: pp.PexMsg{ Dropped: krpc.CompactIPv4NodeAddrs{ mustNodeAddr(addrs[2]), }, @@ -187,70 +126,100 @@ var testcases = []struct { mustNodeAddr(addrs[0]), }, }, - targS: 2, }, { name: "add2drop1", - arg: 0, - in: &pexState{ - ev: []pexEvent{ - {pexAdd, addrs[0], f}, - {pexAdd, addrs[1], f}, - {pexDrop, addrs[0], f}, - }, - }, - targM: pp.PexMsg{ + in: func() *pexState { + conns := []PeerConn{ + {Peer: Peer{RemoteAddr: addrs[0]}}, + {Peer: Peer{RemoteAddr: addrs[1]}}, + {Peer: Peer{RemoteAddr: addrs[2]}}, + } + s := &pexState{nc: pexTargAdded} + s.Add(&conns[0]) + s.Add(&conns[1]) + s.Drop(&conns[0]) + s.Drop(&conns[2]) // to be ignored: it wasn't added + return s + }(), + targ: pp.PexMsg{ Added6: krpc.CompactIPv6NodeAddrs{ mustNodeAddr(addrs[1]), }, - Added6Flags: []pp.PexPeerFlags{f}, + Added6Flags: []pp.PexPeerFlags{0}, }, - targS: 3, }, { name: "delayed", - arg: 0, - in: &pexState{ - ev: []pexEvent{ - {pexAdd, addrs[0], f}, - {pexAdd, addrs[1], f}, - {pexAdd, addrs[2], f}, - }, - hold: []pexEvent{ - {pexDrop, addrs[0], f}, - {pexDrop, addrs[2], f}, - {pexDrop, addrs[1], f}, - }, - }, - targM: pp.PexMsg{ + in: func() *pexState { + conns := []PeerConn{ + {Peer: Peer{RemoteAddr: addrs[0]}}, + {Peer: Peer{RemoteAddr: addrs[1]}}, + {Peer: Peer{RemoteAddr: addrs[2]}}, + } + s := new(pexState) + s.Add(&conns[0]) + s.Add(&conns[1]) + s.Add(&conns[2]) + s.Drop(&conns[0]) // on hold: s.nc < pexTargAdded + s.Drop(&conns[2]) + s.Drop(&conns[1]) + return s + }(), + targ: pp.PexMsg{ Added: krpc.CompactIPv4NodeAddrs{ mustNodeAddr(addrs[2]), }, - AddedFlags: []pp.PexPeerFlags{f}, + AddedFlags: []pp.PexPeerFlags{0}, Added6: krpc.CompactIPv6NodeAddrs{ mustNodeAddr(addrs[0]), mustNodeAddr(addrs[1]), }, - Added6Flags: []pp.PexPeerFlags{f, f}, + Added6Flags: []pp.PexPeerFlags{0, 0}, }, - targS: 3, }, { - name: "followup", - arg: 1, - in: &pexState{ - ev: []pexEvent{ - {pexAdd, addrs[0], f}, - {pexAdd, addrs[1], f}, - }, - }, - targM: pp.PexMsg{ + name: "unheld", + in: func() *pexState { + conns := []PeerConn{ + {Peer: Peer{RemoteAddr: addrs[0]}}, + {Peer: Peer{RemoteAddr: addrs[1]}}, + } + s := &pexState{nc: pexTargAdded - 1} + s.Add(&conns[0]) + s.Drop(&conns[0]) // on hold: s.nc < pexTargAdded + s.Add(&conns[1]) // unholds the above + return s + }(), + targ: pp.PexMsg{ Added6: krpc.CompactIPv6NodeAddrs{ mustNodeAddr(addrs[1]), }, - Added6Flags: []pp.PexPeerFlags{f}, + Added6Flags: []pp.PexPeerFlags{0}, + }, + }, + { + name: "followup", + in: func() *pexState { + s := new(pexState) + s.Add(&PeerConn{Peer: Peer{RemoteAddr: addrs[0]}}) + return s + }(), + targ: pp.PexMsg{ + Added6: krpc.CompactIPv6NodeAddrs{ + mustNodeAddr(addrs[0]), + }, + Added6Flags: []pp.PexPeerFlags{0}, + }, + update: func(s *pexState) { + s.Add(&PeerConn{Peer: Peer{RemoteAddr: addrs[1]}}) + }, + targ1: pp.PexMsg{ + Added6: krpc.CompactIPv6NodeAddrs{ + mustNodeAddr(addrs[1]), + }, + Added6Flags: []pp.PexPeerFlags{0}, }, - targS: 2, }, } @@ -292,13 +261,18 @@ func assertPexMsgsEqual(t *testing.T, expected, actual pp.PexMsg) { ac.AssertEqual(t, ec) } -func TestPexGenmsg(t *testing.T) { +func TestPexGenmsg0(t *testing.T) { for _, tc := range testcases { t.Run(tc.name, func(t *testing.T) { - s := tc.in - m, seen := s.Genmsg(tc.arg) - assertPexMsgsEqual(t, tc.targM, m) - require.EqualValues(t, tc.targS, seen) + s := *tc.in + m, last := s.Genmsg(nil) + assertPexMsgsEqual(t, tc.targ, m) + if tc.update != nil { + tc.update(&s) + m1, last := s.Genmsg(last) + assertPexMsgsEqual(t, tc.targ1, m1) + assert.NotNil(t, last) + } }) } } @@ -324,9 +298,8 @@ func TestPexInitialNoCutoff(t *testing.T) { for addr := range c { s.Add(&PeerConn{Peer: Peer{RemoteAddr: addr}}) } - m, seq := s.Genmsg(0) + m, _ := s.Genmsg(nil) - require.EqualValues(t, n, seq) require.EqualValues(t, n, len(m.Added)) require.EqualValues(t, n, len(m.AddedFlags)) require.EqualValues(t, 0, len(m.Added6)) @@ -341,7 +314,7 @@ func benchmarkPexInitialN(b *testing.B, npeers int) { c := addrgen(npeers) for addr := range c { s.Add(&PeerConn{Peer: Peer{RemoteAddr: addr}}) - s.Genmsg(0) + s.Genmsg(nil) } } } @@ -352,98 +325,3 @@ func BenchmarkPexInitial50(b *testing.B) { benchmarkPexInitialN(b, 50) } func BenchmarkPexInitial100(b *testing.B) { benchmarkPexInitialN(b, 100) } func BenchmarkPexInitial200(b *testing.B) { benchmarkPexInitialN(b, 200) } func BenchmarkPexInitial400(b *testing.B) { benchmarkPexInitialN(b, 400) } - -func TestPexAdd(t *testing.T) { - t.Run("ipv4", func(t *testing.T) { - addrs := addrs4 - var m pexMsgFactory - m.addEvent(pexEvent{pexDrop, addrs[0], 0}) - m.addEvent(pexEvent{pexAdd, addrs[1], f}) - for _, addr := range addrs { - m.addEvent(pexEvent{pexAdd, addr, f}) - } - targ := pp.PexMsg{ - Added: krpc.CompactIPv4NodeAddrs{ - mustNodeAddr(addrs[1]), - mustNodeAddr(addrs[2]), - mustNodeAddr(addrs[3]), - }, - AddedFlags: []pp.PexPeerFlags{f, f, f}, - } - out := m.PexMsg() - assertPexMsgsEqual(t, targ, out) - }) - t.Run("ipv6", func(t *testing.T) { - addrs := addrs6 - var m pexMsgFactory - m.addEvent(pexEvent{pexDrop, addrs[0], 0}) - m.addEvent(pexEvent{pexAdd, addrs[1], f}) - for _, addr := range addrs { - m.addEvent(pexEvent{pexAdd, addr, f}) - } - targ := pp.PexMsg{ - Added6: krpc.CompactIPv6NodeAddrs{ - mustNodeAddr(addrs[1]), - mustNodeAddr(addrs[2]), - mustNodeAddr(addrs[3]), - }, - Added6Flags: []pp.PexPeerFlags{f, f, f}, - } - assertPexMsgsEqual(t, targ, m.PexMsg()) - }) - t.Run("empty", func(t *testing.T) { - nullAddr := &net.TCPAddr{} - var xm pexMsgFactory - xm.addEvent(pexEvent{pexAdd, nullAddr, f}) - m := xm.PexMsg() - require.EqualValues(t, 0, len(m.Added)) - require.EqualValues(t, 0, len(m.AddedFlags)) - require.EqualValues(t, 0, len(m.Added6)) - require.EqualValues(t, 0, len(m.Added6Flags)) - }) -} - -func TestPexDrop(t *testing.T) { - t.Run("ipv4", func(t *testing.T) { - addrs := addrs4 - var m pexMsgFactory - m.addEvent(pexEvent{pexAdd, addrs[0], f}) - m.addEvent(pexEvent{pexDrop, addrs[1], 0}) - for _, addr := range addrs { - m.addEvent(pexEvent{pexDrop, addr, 0}) - } - targ := pp.PexMsg{ - Dropped: krpc.CompactIPv4NodeAddrs{ - mustNodeAddr(addrs[1]), - mustNodeAddr(addrs[2]), - mustNodeAddr(addrs[3]), - }, - } - assertPexMsgsEqual(t, targ, m.PexMsg()) - }) - t.Run("ipv6", func(t *testing.T) { - addrs := addrs6 - var m pexMsgFactory - m.addEvent(pexEvent{pexAdd, addrs[0], f}) - m.addEvent(pexEvent{pexDrop, addrs[1], 0}) - for _, addr := range addrs { - m.addEvent(pexEvent{pexDrop, addr, 0}) - } - targ := pp.PexMsg{ - Dropped6: krpc.CompactIPv6NodeAddrs{ - mustNodeAddr(addrs[1]), - mustNodeAddr(addrs[2]), - mustNodeAddr(addrs[3]), - }, - } - assertPexMsgsEqual(t, targ, m.PexMsg()) - }) - t.Run("empty", func(t *testing.T) { - nullAddr := &net.TCPAddr{} - var xm pexMsgFactory - xm.addEvent(pexEvent{pexDrop, nullAddr, f}) - m := xm.PexMsg() - require.EqualValues(t, 0, len(m.Dropped)) - require.EqualValues(t, 0, len(m.Dropped6)) - }) -} diff --git a/pexconn.go b/pexconn.go index 2940c4d2..d0308f75 100644 --- a/pexconn.go +++ b/pexconn.go @@ -18,7 +18,7 @@ const ( type pexConnState struct { enabled bool xid pp.ExtensionNumber - seq int + last *pexEvent timer *time.Timer gate chan struct{} readyfn func() @@ -39,7 +39,7 @@ func (s *pexConnState) Init(c *PeerConn) { return } s.xid = xid - s.seq = 0 + s.last = nil s.torrent = c.t s.info = c.t.cl.logger.WithDefaultLevel(log.Info) s.dbg = c.logger.WithDefaultLevel(log.Debug) @@ -59,11 +59,11 @@ func (s *pexConnState) sched(delay time.Duration) { // generate next PEX message for the peer; returns nil if nothing yet to send func (s *pexConnState) genmsg() *pp.PexMsg { - tx, seq := s.torrent.pex.Genmsg(s.seq) + tx, last := s.torrent.pex.Genmsg(s.last) if tx.Len() == 0 { return nil } - s.seq = seq + s.last = last return &tx }