simplify pexMsgFactory

This commit is contained in:
Yaroslav Kolomiiets 2020-11-09 15:05:47 +00:00 committed by Matt Joiner
parent ebdfae97a2
commit 94055287b0
2 changed files with 97 additions and 135 deletions

100
pex.go
View File

@ -28,16 +28,10 @@ type pexEvent struct {
f pp.PexPeerFlags
}
// Combines the node addr, as required for pp.PexMsg.
type pexMsgAdded struct {
krpc.NodeAddr
pp.PexPeerFlags
}
// Makes generating a PexMsg more efficient.
// facilitates efficient de-duplication while generating PEX messages
type pexMsgFactory struct {
added map[string]pexMsgAdded
dropped map[string]krpc.NodeAddr
added map[addrKey]pexEvent
dropped map[addrKey]pexEvent
}
func (me *pexMsgFactory) DeltaLen() int {
@ -46,90 +40,78 @@ func (me *pexMsgFactory) DeltaLen() int {
int64(len(me.dropped))))
}
// Returns the key to use to identify a given addr in the factory. Panics if we can't support the
// addr later in generating a PexMsg (since adding an unusable addr will cause DeltaLen to be out.)
func (me *pexMsgFactory) addrKey(addr krpc.NodeAddr) string {
if addr.IP.To4() != nil {
addr.IP = addr.IP.To4()
}
keyBytes, err := addr.MarshalBinary()
if err != nil {
panic(err)
}
switch len(keyBytes) {
case compactIpv4NodeAddrElemSize:
case compactIpv6NodeAddrElemSize:
default:
panic(len(keyBytes))
}
return string(keyBytes)
type addrKey string
// Returns the key to use to identify a given addr in the factory.
func (me *pexMsgFactory) addrKey(addr net.Addr) addrKey {
return addrKey(addr.String())
}
// Returns whether the entry was added (we can check if we're cancelling out another entry and so
// won't hit the limit consuming this event).
func (me *pexMsgFactory) Add(addr krpc.NodeAddr, flags pp.PexPeerFlags) {
key := me.addrKey(addr)
func (me *pexMsgFactory) add(e pexEvent) {
key := me.addrKey(e.addr)
if _, ok := me.dropped[key]; ok {
delete(me.dropped, key)
return
}
if me.added == nil {
me.added = make(map[string]pexMsgAdded, pexMaxDelta)
me.added = make(map[addrKey]pexEvent, pexMaxDelta)
}
me.added[key] = pexMsgAdded{addr, flags}
me.added[key] = e
}
// Returns whether the entry was added (we can check if we're cancelling out another entry and so
// won't hit the limit consuming this event).
func (me *pexMsgFactory) Drop(addr krpc.NodeAddr) {
key := me.addrKey(addr)
func (me *pexMsgFactory) drop(e pexEvent) {
key := me.addrKey(e.addr)
if _, ok := me.added[key]; ok {
delete(me.added, key)
return
}
if me.dropped == nil {
me.dropped = make(map[string]krpc.NodeAddr, pexMaxDelta)
me.dropped = make(map[addrKey]pexEvent, pexMaxDelta)
}
me.dropped[key] = addr
me.dropped[key] = e
}
func (me *pexMsgFactory) addEvent(event pexEvent) {
addr, ok := nodeAddr(event.addr)
if !ok {
return
}
switch event.t {
case pexAdd:
me.Add(addr, event.f)
me.add(event)
case pexDrop:
me.Drop(addr)
me.drop(event)
default:
panic(event.t)
}
}
var compactIpv4NodeAddrElemSize = krpc.CompactIPv4NodeAddrs{}.ElemSize()
var compactIpv6NodeAddrElemSize = krpc.CompactIPv6NodeAddrs{}.ElemSize()
func (me *pexMsgFactory) PexMsg() (ret pp.PexMsg) {
for key, added := range me.added {
switch len(key) {
case compactIpv4NodeAddrElemSize:
ret.Added = append(ret.Added, added.NodeAddr)
ret.AddedFlags = append(ret.AddedFlags, added.PexPeerFlags)
case compactIpv6NodeAddrElemSize:
ret.Added6 = append(ret.Added6, added.NodeAddr)
ret.Added6Flags = append(ret.Added6Flags, added.PexPeerFlags)
addr, ok := nodeAddr(added.addr)
if !ok {
continue
}
switch len(addr.IP) {
case net.IPv4len:
ret.Added = append(ret.Added, addr)
ret.AddedFlags = append(ret.AddedFlags, added.f)
case net.IPv6len:
ret.Added6 = append(ret.Added6, addr)
ret.Added6Flags = append(ret.Added6Flags, added.f)
default:
panic(key)
}
}
for key, addr := range me.dropped {
switch len(key) {
case compactIpv4NodeAddrElemSize:
for key, dropped := range me.dropped {
addr, ok := nodeAddr(dropped.addr)
if !ok {
continue
}
switch len(addr.IP) {
case net.IPv4len:
ret.Dropped = append(ret.Dropped, addr)
case compactIpv6NodeAddrElemSize:
case net.IPv6len:
ret.Dropped6 = append(ret.Dropped6, addr)
default:
panic(key)
@ -138,14 +120,6 @@ func (me *pexMsgFactory) PexMsg() (ret pp.PexMsg) {
return
}
func mustNodeAddr(addr net.Addr) krpc.NodeAddr {
ret, ok := nodeAddr(addr)
if !ok {
panic(addr)
}
return ret
}
// Convert an arbitrary torrent peer Addr into one that can be represented by the compact addr
// format.
func nodeAddr(addr net.Addr) (_ krpc.NodeAddr, ok bool) {

View File

@ -12,11 +12,23 @@ import (
)
var (
addrs = []net.Addr{
addrs6 = []net.Addr{
&net.TCPAddr{IP: net.IPv6loopback, Port: 4747},
&net.TCPAddr{IP: net.IPv6loopback, Port: 4748},
&net.TCPAddr{IP: net.IPv6loopback, Port: 4749},
&net.TCPAddr{IP: net.IPv6loopback, Port: 4750},
}
addrs4 = []net.Addr{
&net.TCPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 4747},
&net.TCPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 4748},
&net.TCPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 4749},
&net.TCPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 4750},
}
addrs = []net.Addr{
addrs6[0],
addrs6[1],
addrs4[0],
addrs4[1],
}
f = pp.PexOutgoingConn
)
@ -111,6 +123,14 @@ func TestPexReset(t *testing.T) {
require.EqualValues(t, targ, s)
}
func mustNodeAddr(addr net.Addr) krpc.NodeAddr {
ret, ok := nodeAddr(addr)
if !ok {
panic(addr)
}
return ret
}
var testcases = []struct {
name string
in *pexState
@ -239,24 +259,17 @@ var testcases = []struct {
// deterministic. Because the flags are in a different array, we can't just use testify's
// ElementsMatch because the ordering *does* still matter between an added addr and its flags.
type comparablePexMsg struct {
added, added6 []pexMsgAdded
dropped, dropped6 []krpc.NodeAddr
}
func (me *comparablePexMsg) makeAdded(addrs []krpc.NodeAddr, flags []pp.PexPeerFlags) (ret []pexMsgAdded) {
for i, addr := range addrs {
ret = append(ret, pexMsgAdded{
NodeAddr: addr,
PexPeerFlags: flags[i],
})
}
return
added, added6 []krpc.NodeAddr
addedFlags, added6Flags []pp.PexPeerFlags
dropped, dropped6 []krpc.NodeAddr
}
// Such Rust-inspired.
func (me *comparablePexMsg) From(f pp.PexMsg) {
me.added = me.makeAdded(f.Added, f.AddedFlags)
me.added6 = me.makeAdded(f.Added6, f.Added6Flags)
me.added = f.Added
me.addedFlags = f.AddedFlags
me.added6 = f.Added6
me.added6Flags = f.Added6Flags
me.dropped = f.Dropped
me.dropped6 = f.Dropped6
}
@ -265,7 +278,9 @@ func (me *comparablePexMsg) From(f pp.PexMsg) {
// in pexMsgFactory that preserve insert ordering.
func (actual comparablePexMsg) AssertEqual(t *testing.T, expected comparablePexMsg) {
assert.ElementsMatch(t, expected.added, actual.added)
assert.ElementsMatch(t, expected.addedFlags, actual.addedFlags)
assert.ElementsMatch(t, expected.added6, actual.added6)
assert.ElementsMatch(t, expected.added6Flags, actual.added6Flags)
assert.ElementsMatch(t, expected.dropped, actual.dropped)
assert.ElementsMatch(t, expected.dropped6, actual.dropped6)
}
@ -321,60 +336,47 @@ func TestPexInitialNoCutoff(t *testing.T) {
}
func TestPexAdd(t *testing.T) {
addrs4 := []krpc.NodeAddr{
krpc.NodeAddr{IP: net.IPv4(127, 0, 0, 1), Port: 4747}, // 0
krpc.NodeAddr{IP: net.IPv4(127, 0, 0, 1), Port: 4748}, // 1
krpc.NodeAddr{IP: net.IPv4(127, 0, 0, 2), Port: 4747}, // 2
krpc.NodeAddr{IP: net.IPv4(127, 0, 0, 2), Port: 4748}, // 3
}
addrs6 := []krpc.NodeAddr{
krpc.NodeAddr{IP: net.IPv6loopback, Port: 4747}, // 0
krpc.NodeAddr{IP: net.IPv6loopback, Port: 4748}, // 1
krpc.NodeAddr{IP: net.IPv6loopback, Port: 4749}, // 2
krpc.NodeAddr{IP: net.IPv6loopback, Port: 4750}, // 3
}
f := pp.PexPrefersEncryption | pp.PexOutgoingConn
t.Run("ipv4", func(t *testing.T) {
addrs := addrs4
var m pexMsgFactory
m.Drop(addrs[0])
m.Add(addrs[1], f)
m.addEvent(pexEvent{pexDrop, addrs[0], 0})
m.addEvent(pexEvent{pexAdd, addrs[1], f})
for _, addr := range addrs {
m.Add(addr, f)
m.addEvent(pexEvent{pexAdd, addr, f})
}
targ := pp.PexMsg{
Added: krpc.CompactIPv4NodeAddrs{
addrs[1],
addrs[2],
addrs[3],
mustNodeAddr(addrs[1]),
mustNodeAddr(addrs[2]),
mustNodeAddr(addrs[3]),
},
AddedFlags: []pp.PexPeerFlags{f, f, f},
}
assertPexMsgsEqual(t, targ, m.PexMsg())
out := m.PexMsg()
assertPexMsgsEqual(t, targ, out)
})
t.Run("ipv6", func(t *testing.T) {
addrs := addrs6
var m pexMsgFactory
m.Drop(addrs[0])
m.Add(addrs[1], f)
m.addEvent(pexEvent{pexDrop, addrs[0], 0})
m.addEvent(pexEvent{pexAdd, addrs[1], f})
for _, addr := range addrs {
m.Add(addr, f)
m.addEvent(pexEvent{pexAdd, addr, f})
}
targ := pp.PexMsg{
Added6: krpc.CompactIPv6NodeAddrs{
addrs[1],
addrs[2],
addrs[3],
mustNodeAddr(addrs[1]),
mustNodeAddr(addrs[2]),
mustNodeAddr(addrs[3]),
},
Added6Flags: []pp.PexPeerFlags{f, f, f},
}
assertPexMsgsEqual(t, targ, m.PexMsg())
})
t.Run("empty", func(t *testing.T) {
addr := krpc.NodeAddr{}
nullAddr := &net.TCPAddr{}
var xm pexMsgFactory
assert.Panics(t, func() { xm.Add(addr, f) })
xm.addEvent(pexEvent{pexAdd, nullAddr, f})
m := xm.PexMsg()
require.EqualValues(t, 0, len(m.Added))
require.EqualValues(t, 0, len(m.AddedFlags))
@ -384,33 +386,19 @@ func TestPexAdd(t *testing.T) {
}
func TestPexDrop(t *testing.T) {
addrs4 := []krpc.NodeAddr{
krpc.NodeAddr{IP: net.IPv4(127, 0, 0, 1), Port: 4747}, // 0
krpc.NodeAddr{IP: net.IPv4(127, 0, 0, 1), Port: 4748}, // 1
krpc.NodeAddr{IP: net.IPv4(127, 0, 0, 2), Port: 4747}, // 2
krpc.NodeAddr{IP: net.IPv4(127, 0, 0, 2), Port: 4748}, // 3
}
addrs6 := []krpc.NodeAddr{
krpc.NodeAddr{IP: net.IPv6loopback, Port: 4747}, // 0
krpc.NodeAddr{IP: net.IPv6loopback, Port: 4748}, // 1
krpc.NodeAddr{IP: net.IPv6loopback, Port: 4749}, // 2
krpc.NodeAddr{IP: net.IPv6loopback, Port: 4750}, // 3
}
f := pp.PexPrefersEncryption | pp.PexOutgoingConn
t.Run("ipv4", func(t *testing.T) {
addrs := addrs4
var m pexMsgFactory
m.Add(addrs[0], f)
m.Drop(addrs[1])
m.addEvent(pexEvent{pexAdd, addrs[0], f})
m.addEvent(pexEvent{pexDrop, addrs[1], 0})
for _, addr := range addrs {
m.Drop(addr)
m.addEvent(pexEvent{pexDrop, addr, 0})
}
targ := pp.PexMsg{
Dropped: krpc.CompactIPv4NodeAddrs{
addrs[1],
addrs[2],
addrs[3],
mustNodeAddr(addrs[1]),
mustNodeAddr(addrs[2]),
mustNodeAddr(addrs[3]),
},
}
assertPexMsgsEqual(t, targ, m.PexMsg())
@ -418,24 +406,24 @@ func TestPexDrop(t *testing.T) {
t.Run("ipv6", func(t *testing.T) {
addrs := addrs6
var m pexMsgFactory
m.Add(addrs[0], f)
m.Drop(addrs[1])
m.addEvent(pexEvent{pexAdd, addrs[0], f})
m.addEvent(pexEvent{pexDrop, addrs[1], 0})
for _, addr := range addrs {
m.Drop(addr)
m.addEvent(pexEvent{pexDrop, addr, 0})
}
targ := pp.PexMsg{
Dropped6: krpc.CompactIPv6NodeAddrs{
addrs[1],
addrs[2],
addrs[3],
mustNodeAddr(addrs[1]),
mustNodeAddr(addrs[2]),
mustNodeAddr(addrs[3]),
},
}
assertPexMsgsEqual(t, targ, m.PexMsg())
})
t.Run("empty", func(t *testing.T) {
addr := krpc.NodeAddr{}
nullAddr := &net.TCPAddr{}
var xm pexMsgFactory
require.Panics(t, func() { xm.Drop(addr) })
xm.addEvent(pexEvent{pexDrop, nullAddr, f})
m := xm.PexMsg()
require.EqualValues(t, 0, len(m.Dropped))
require.EqualValues(t, 0, len(m.Dropped6))