Use webrtc local addr for webrtc conn peer priority

WebRTC conns are providing the correct IP for peer priority calculations, so use that instead of trying to guess (which doesn't work if there are no regular conn listeners attached to the Client.

(cherry picked from commit e86bb5fee3958dc90a3d012469b1352005d6c9ad)
This commit is contained in:
Matt Joiner 2022-07-11 18:22:23 +10:00
parent aedf2583cc
commit 214fe6b93c
No known key found for this signature in database
GPG Key ID: 6B990B8185E7F782
7 changed files with 77 additions and 31 deletions

View File

@ -20,6 +20,10 @@ func TestBep40Priority(t *testing.T) {
IpPort{IP: net.ParseIP("123.213.32.10"), Port: 0}, IpPort{IP: net.ParseIP("123.213.32.10"), Port: 0},
IpPort{IP: net.ParseIP("123.213.32.234"), Port: 0}, IpPort{IP: net.ParseIP("123.213.32.234"), Port: 0},
)) ))
assert.Equal(t, peerPriority(0x2b41d456), bep40PriorityIgnoreError(
IpPort{IP: net.ParseIP("206.248.98.111"), Port: 0},
IpPort{IP: net.ParseIP("142.147.89.224"), Port: 0},
))
assert.EqualValues(t, "\x00\x00\x00\x00", func() []byte { assert.EqualValues(t, "\x00\x00\x00\x00", func() []byte {
b, _ := bep40PriorityBytes( b, _ := bep40PriorityBytes(
IpPort{IP: net.ParseIP("123.213.32.234"), Port: 0}, IpPort{IP: net.ParseIP("123.213.32.234"), Port: 0},

View File

@ -551,8 +551,16 @@ func (cl *Client) incomingConnection(nc net.Conn) {
if tc, ok := nc.(*net.TCPConn); ok { if tc, ok := nc.(*net.TCPConn); ok {
tc.SetLinger(0) tc.SetLinger(0)
} }
c := cl.newConnection(nc, false, nc.RemoteAddr(), nc.RemoteAddr().Network(), remoteAddr, _ := tryIpPortFromNetAddr(nc.RemoteAddr())
regularNetConnPeerConnConnString(nc)) c := cl.newConnection(
nc,
newConnectionOpts{
outgoing: false,
remoteAddr: nc.RemoteAddr(),
localPublicAddr: cl.publicAddr(remoteAddr.IP),
network: nc.RemoteAddr().Network(),
connString: regularNetConnPeerConnConnString(nc),
})
defer func() { defer func() {
cl.lock() cl.lock()
defer cl.unlock() defer cl.unlock()
@ -687,13 +695,12 @@ func (cl *Client) initiateProtocolHandshakes(
ctx context.Context, ctx context.Context,
nc net.Conn, nc net.Conn,
t *Torrent, t *Torrent,
outgoing, encryptHeader bool, encryptHeader bool,
remoteAddr PeerRemoteAddr, newConnOpts newConnectionOpts,
network, connString string,
) ( ) (
c *PeerConn, err error, c *PeerConn, err error,
) { ) {
c = cl.newConnection(nc, outgoing, remoteAddr, network, connString) c = cl.newConnection(nc, newConnOpts)
c.headerEncrypted = encryptHeader c.headerEncrypted = encryptHeader
ctx, cancel := context.WithTimeout(ctx, cl.config.HandshakesTimeout) ctx, cancel := context.WithTimeout(ctx, cl.config.HandshakesTimeout)
defer cancel() defer cancel()
@ -725,7 +732,15 @@ func (cl *Client) establishOutgoingConnEx(t *Torrent, addr PeerRemoteAddr, obfus
} }
return nil, errors.New("dial failed") return nil, errors.New("dial failed")
} }
c, err := cl.initiateProtocolHandshakes(context.Background(), nc, t, true, obfuscatedHeader, addr, dr.Dialer.DialerNetwork(), regularNetConnPeerConnConnString(nc)) addrIpPort, _ := tryIpPortFromNetAddr(addr)
c, err := cl.initiateProtocolHandshakes(context.Background(), nc, t, obfuscatedHeader, newConnectionOpts{
outgoing: true,
remoteAddr: addr,
// It would be possible to retrieve a public IP from the dialer used here?
localPublicAddr: cl.publicAddr(addrIpPort.IP),
network: dr.Dialer.DialerNetwork(),
connString: regularNetConnPeerConnConnString(nc),
})
if err != nil { if err != nil {
nc.Close() nc.Close()
} }
@ -1468,28 +1483,37 @@ func (cl *Client) banPeerIP(ip net.IP) {
} }
} }
func (cl *Client) newConnection(nc net.Conn, outgoing bool, remoteAddr PeerRemoteAddr, network, connString string) (c *PeerConn) { type newConnectionOpts struct {
if network == "" { outgoing bool
panic(remoteAddr) remoteAddr PeerRemoteAddr
localPublicAddr peerLocalPublicAddr
network string
connString string
}
func (cl *Client) newConnection(nc net.Conn, opts newConnectionOpts) (c *PeerConn) {
if opts.network == "" {
panic(opts.remoteAddr)
} }
c = &PeerConn{ c = &PeerConn{
Peer: Peer{ Peer: Peer{
outgoing: outgoing, outgoing: opts.outgoing,
choking: true, choking: true,
peerChoking: true, peerChoking: true,
PeerMaxRequests: 250, PeerMaxRequests: 250,
RemoteAddr: remoteAddr, RemoteAddr: opts.remoteAddr,
Network: network, localPublicAddr: opts.localPublicAddr,
callbacks: &cl.config.Callbacks, Network: opts.network,
callbacks: &cl.config.Callbacks,
}, },
connString: connString, connString: opts.connString,
conn: nc, conn: nc,
} }
c.initRequestState() c.initRequestState()
// TODO: Need to be much more explicit about this, including allowing non-IP bannable addresses. // TODO: Need to be much more explicit about this, including allowing non-IP bannable addresses.
if remoteAddr != nil { if opts.remoteAddr != nil {
netipAddrPort, err := netip.ParseAddrPort(remoteAddr.String()) netipAddrPort, err := netip.ParseAddrPort(opts.remoteAddr.String())
if err == nil { if err == nil {
c.bannableAddr = Some(netipAddrPort.Addr()) c.bannableAddr = Some(netipAddrPort.Addr())
} }
@ -1501,7 +1525,7 @@ func (cl *Client) newConnection(nc net.Conn, outgoing bool, remoteAddr PeerRemot
l: cl.config.DownloadRateLimiter, l: cl.config.DownloadRateLimiter,
r: c.r, r: c.r,
} }
c.logger.WithDefaultLevel(log.Debug).Printf("initialized with remote %v over network %v (outgoing=%t)", remoteAddr, network, outgoing) c.logger.WithDefaultLevel(log.Debug).Printf("initialized with remote %v over network %v (outgoing=%t)", opts.remoteAddr, opts.network, opts.outgoing)
for _, f := range cl.config.Callbacks.NewPeer { for _, f := range cl.config.Callbacks.NewPeer {
f(&c.Peer) f(&c.Peer)
} }

View File

@ -1,5 +1,7 @@
package torrent package torrent
type peerLocalPublicAddr = IpPort
func (p *Peer) isLowOnRequests() bool { func (p *Peer) isLowOnRequests() bool {
return p.requestState.Requests.IsEmpty() && p.requestState.Cancelled.IsEmpty() return p.requestState.Requests.IsEmpty() && p.requestState.Cancelled.IsEmpty()
} }

View File

@ -64,10 +64,13 @@ type Peer struct {
peerImpl peerImpl
callbacks *Callbacks callbacks *Callbacks
outgoing bool outgoing bool
Network string Network string
RemoteAddr PeerRemoteAddr RemoteAddr PeerRemoteAddr
bannableAddr Option[bannableAddr] // The local address as observed by the remote peer. WebRTC seems to get this right without needing hints from the
// config.
localPublicAddr peerLocalPublicAddr
bannableAddr Option[bannableAddr]
// True if the connection is operating over MSE obfuscation. // True if the connection is operating over MSE obfuscation.
headerEncrypted bool headerEncrypted bool
cryptoMethod mse.CryptoMethod cryptoMethod mse.CryptoMethod
@ -1729,7 +1732,7 @@ func (c *PeerConn) setTorrent(t *Torrent) {
} }
func (c *Peer) peerPriority() (peerPriority, error) { func (c *Peer) peerPriority() (peerPriority, error) {
return bep40Priority(c.remoteIpPort(), c.t.cl.publicAddr(c.remoteIp())) return bep40Priority(c.remoteIpPort(), c.localPublicAddr)
} }
func (c *Peer) remoteIp() net.IP { func (c *Peer) remoteIp() net.IP {

View File

@ -25,7 +25,7 @@ func TestSendBitfieldThenHave(t *testing.T) {
var cl Client var cl Client
cl.init(TestingConfig(t)) cl.init(TestingConfig(t))
cl.initLogger() cl.initLogger()
c := cl.newConnection(nil, false, nil, "io.Pipe", "") c := cl.newConnection(nil, newConnectionOpts{network: "io.Pipe"})
c.setTorrent(cl.newTorrent(metainfo.Hash{}, nil)) c.setTorrent(cl.newTorrent(metainfo.Hash{}, nil))
if err := c.t.setInfo(&metainfo.Info{Pieces: make([]byte, metainfo.HashSize*3)}); err != nil { if err := c.t.setInfo(&metainfo.Info{Pieces: make([]byte, metainfo.HashSize*3)}); err != nil {
t.Log(err) t.Log(err)
@ -107,7 +107,12 @@ func BenchmarkConnectionMainReadLoop(b *testing.B) {
t.onSetInfo() t.onSetInfo()
t._pendingPieces.Add(0) t._pendingPieces.Add(0)
r, w := net.Pipe() r, w := net.Pipe()
cn := cl.newConnection(r, true, r.RemoteAddr(), r.RemoteAddr().Network(), regularNetConnPeerConnConnString(r)) cn := cl.newConnection(r, newConnectionOpts{
outgoing: true,
remoteAddr: r.RemoteAddr(),
network: r.RemoteAddr().Network(),
connString: regularNetConnPeerConnConnString(r),
})
cn.setTorrent(t) cn.setTorrent(t)
mrlErrChan := make(chan error) mrlErrChan := make(chan error)
msg := pp.Message{ msg := pp.Message{
@ -287,7 +292,7 @@ func TestPreferredNetworkDirection(t *testing.T) {
func TestReceiveLargeRequest(t *testing.T) { func TestReceiveLargeRequest(t *testing.T) {
c := qt.New(t) c := qt.New(t)
cl := newTestingClient(t) cl := newTestingClient(t)
pc := cl.newConnection(nil, false, nil, "test", "") pc := cl.newConnection(nil, newConnectionOpts{network: "test"})
tor := cl.newTorrentForTesting() tor := cl.newTorrentForTesting()
tor.info = &metainfo.Info{PieceLength: 3 << 20} tor.info = &metainfo.Info{PieceLength: 3 << 20}
pc.setTorrent(tor) pc.setTorrent(tor)

View File

@ -17,7 +17,10 @@ func TestPexConnState(t *testing.T) {
cl.initLogger() cl.initLogger()
torrent := cl.newTorrent(metainfo.Hash{}, nil) torrent := cl.newTorrent(metainfo.Hash{}, nil)
addr := &net.TCPAddr{IP: net.IPv6loopback, Port: 4747} addr := &net.TCPAddr{IP: net.IPv6loopback, Port: 4747}
c := cl.newConnection(nil, false, addr, addr.Network(), "") c := cl.newConnection(nil, newConnectionOpts{
remoteAddr: addr,
network: addr.Network(),
})
c.PeerExtensionIDs = make(map[pp.ExtensionName]pp.ExtensionNumber) c.PeerExtensionIDs = make(map[pp.ExtensionName]pp.ExtensionNumber)
c.PeerExtensionIDs[pp.ExtensionNamePex] = pexExtendedId c.PeerExtensionIDs[pp.ExtensionNamePex] = pexExtendedId
c.messageWriter.mu.Lock() c.messageWriter.mu.Lock()

View File

@ -1575,18 +1575,23 @@ func (t *Torrent) onWebRtcConn(
DataChannelContext: dcc, DataChannelContext: dcc,
} }
peerRemoteAddr := netConn.RemoteAddr() peerRemoteAddr := netConn.RemoteAddr()
//t.logger.Levelf(log.Critical, "onWebRtcConn remote addr: %v", peerRemoteAddr)
if t.cl.badPeerAddr(peerRemoteAddr) { if t.cl.badPeerAddr(peerRemoteAddr) {
return return
} }
localAddrIpPort := missinggo.IpPortFromNetAddr(netConn.LocalAddr())
pc, err := t.cl.initiateProtocolHandshakes( pc, err := t.cl.initiateProtocolHandshakes(
context.Background(), context.Background(),
netConn, netConn,
t, t,
dcc.LocalOffered,
false, false,
netConn.RemoteAddr(), newConnectionOpts{
webrtcNetwork, outgoing: dcc.LocalOffered,
fmt.Sprintf("webrtc offer_id %x: %v", dcc.OfferId, regularNetConnPeerConnConnString(netConn)), remoteAddr: peerRemoteAddr,
localPublicAddr: localAddrIpPort,
network: webrtcNetwork,
connString: fmt.Sprintf("webrtc offer_id %x: %v", dcc.OfferId, regularNetConnPeerConnConnString(netConn)),
},
) )
if err != nil { if err != nil {
t.logger.WithDefaultLevel(log.Error).Printf("error in handshaking webrtc connection: %v", err) t.logger.WithDefaultLevel(log.Error).Printf("error in handshaking webrtc connection: %v", err)