Store remoteAddr with each connection

It would appear net.Conns returned from proxies don't have a RemoteAddr the client expects.
This commit is contained in:
Matt Joiner 2018-11-04 16:56:55 +11:00
parent f1f54ce949
commit 0032b45a02
4 changed files with 60 additions and 73 deletions

View File

@ -442,7 +442,7 @@ func (cl *Client) incomingConnection(nc net.Conn) {
if tc, ok := nc.(*net.TCPConn); ok { if tc, ok := nc.(*net.TCPConn); ok {
tc.SetLinger(0) tc.SetLinger(0)
} }
c := cl.newConnection(nc, false) c := cl.newConnection(nc, false, ipPortFromNetAddr(nc.RemoteAddr()), nc.RemoteAddr().Network())
c.Discovery = peerSourceIncoming c.Discovery = peerSourceIncoming
cl.runReceivedConn(c) cl.runReceivedConn(c)
} }
@ -460,7 +460,8 @@ func (cl *Client) torrent(ih metainfo.Hash) *Torrent {
} }
type dialResult struct { type dialResult struct {
Conn net.Conn Conn net.Conn
Network string
} }
func countDialResult(err error) { func countDialResult(err error) {
@ -523,32 +524,30 @@ func peerNetworkEnabled(network string, cfg *ClientConfig) bool {
} }
// Returns a connection over UTP or TCP, whichever is first to connect. // Returns a connection over UTP or TCP, whichever is first to connect.
func (cl *Client) dialFirst(ctx context.Context, addr string) net.Conn { func (cl *Client) dialFirst(ctx context.Context, addr string) dialResult {
ctx, cancel := context.WithCancel(ctx) ctx, cancel := context.WithCancel(ctx)
// As soon as we return one connection, cancel the others. // As soon as we return one connection, cancel the others.
defer cancel() defer cancel()
left := 0 left := 0
resCh := make(chan dialResult, left) resCh := make(chan dialResult, left)
dial := func(f func(_ context.Context, addr string) (net.Conn, error)) {
left++
go func() {
c, err := f(ctx, addr)
// This is a bit optimistic, but it looks non-trivial to thread
// this through the proxy code. Set it now in case we close the
// connection forthwith.
if tc, ok := c.(*net.TCPConn); ok {
tc.SetLinger(0)
}
countDialResult(err)
resCh <- dialResult{c}
}()
}
func() { func() {
cl.lock() cl.lock()
defer cl.unlock() defer cl.unlock()
cl.eachListener(func(s socket) bool { cl.eachListener(func(s socket) bool {
if peerNetworkEnabled(s.Addr().Network(), cl.config) { network := s.Addr().Network()
dial(s.dial) if peerNetworkEnabled(network, cl.config) {
left++
go func() {
c, err := s.dial(ctx, addr)
// This is a bit optimistic, but it looks non-trivial to thread
// this through the proxy code. Set it now in case we close the
// connection forthwith.
if tc, ok := c.(*net.TCPConn); ok {
tc.SetLinger(0)
}
countDialResult(err)
resCh <- dialResult{c, network}
}()
} }
return true return true
}) })
@ -573,7 +572,7 @@ func (cl *Client) dialFirst(ctx context.Context, addr string) net.Conn {
if res.Conn != nil { if res.Conn != nil {
go torrent.Add(fmt.Sprintf("network dialed first: %s", res.Conn.RemoteAddr().Network()), 1) go torrent.Add(fmt.Sprintf("network dialed first: %s", res.Conn.RemoteAddr().Network()), 1)
} }
return res.Conn return res
} }
func (cl *Client) noLongerHalfOpen(t *Torrent, addr string) { func (cl *Client) noLongerHalfOpen(t *Torrent, addr string) {
@ -586,8 +585,8 @@ func (cl *Client) noLongerHalfOpen(t *Torrent, addr string) {
// Performs initiator handshakes and returns a connection. Returns nil // Performs initiator handshakes and returns a connection. Returns nil
// *connection if no connection for valid reasons. // *connection if no connection for valid reasons.
func (cl *Client) handshakesConnection(ctx context.Context, nc net.Conn, t *Torrent, encryptHeader bool) (c *connection, err error) { func (cl *Client) handshakesConnection(ctx context.Context, nc net.Conn, t *Torrent, encryptHeader bool, remoteAddr ipPort, network string) (c *connection, err error) {
c = cl.newConnection(nc, true) c = cl.newConnection(nc, true, remoteAddr, network)
c.headerEncrypted = encryptHeader c.headerEncrypted = encryptHeader
ctx, cancel := context.WithTimeout(ctx, cl.config.HandshakesTimeout) ctx, cancel := context.WithTimeout(ctx, cl.config.HandshakesTimeout)
defer cancel() defer cancel()
@ -608,8 +607,9 @@ func (cl *Client) handshakesConnection(ctx context.Context, nc net.Conn, t *Torr
// Returns nil connection and nil error if no connection could be established // Returns nil connection and nil error if no connection could be established
// for valid reasons. // for valid reasons.
func (cl *Client) establishOutgoingConnEx(t *Torrent, addr string, ctx context.Context, obfuscatedHeader bool) (c *connection, err error) { func (cl *Client) establishOutgoingConnEx(t *Torrent, addr ipPort, ctx context.Context, obfuscatedHeader bool) (c *connection, err error) {
nc := cl.dialFirst(ctx, addr) dr := cl.dialFirst(ctx, addr.String())
nc := dr.Conn
if nc == nil { if nc == nil {
return return
} }
@ -618,12 +618,12 @@ func (cl *Client) establishOutgoingConnEx(t *Torrent, addr string, ctx context.C
nc.Close() nc.Close()
} }
}() }()
return cl.handshakesConnection(ctx, nc, t, obfuscatedHeader) return cl.handshakesConnection(ctx, nc, t, obfuscatedHeader, addr, dr.Network)
} }
// Returns nil connection and nil error if no connection could be established // Returns nil connection and nil error if no connection could be established
// for valid reasons. // for valid reasons.
func (cl *Client) establishOutgoingConn(t *Torrent, addr string) (c *connection, err error) { func (cl *Client) establishOutgoingConn(t *Torrent, addr ipPort) (c *connection, err error) {
torrent.Add("establish outgoing connection", 1) torrent.Add("establish outgoing connection", 1)
ctx, cancel := context.WithTimeout(context.Background(), func() time.Duration { ctx, cancel := context.WithTimeout(context.Background(), func() time.Duration {
cl.rLock() cl.rLock()
@ -658,14 +658,14 @@ func (cl *Client) establishOutgoingConn(t *Torrent, addr string) (c *connection,
// Called to dial out and run a connection. The addr we're given is already // Called to dial out and run a connection. The addr we're given is already
// considered half-open. // considered half-open.
func (cl *Client) outgoingConnection(t *Torrent, addr string, ps peerSource) { func (cl *Client) outgoingConnection(t *Torrent, addr ipPort, ps peerSource) {
cl.dialRateLimiter.Wait(context.Background()) cl.dialRateLimiter.Wait(context.Background())
c, err := cl.establishOutgoingConn(t, addr) c, err := cl.establishOutgoingConn(t, addr)
cl.lock() cl.lock()
defer cl.unlock() defer cl.unlock()
// Don't release lock between here and addConnection, unless it's for // Don't release lock between here and addConnection, unless it's for
// failure. // failure.
cl.noLongerHalfOpen(t, addr) cl.noLongerHalfOpen(t, addr.String())
if err != nil { if err != nil {
if cl.config.Debug { if cl.config.Debug {
log.Printf("error establishing outgoing connection: %s", err) log.Printf("error establishing outgoing connection: %s", err)
@ -794,18 +794,18 @@ func (cl *Client) runReceivedConn(c *connection) {
).AddValue( ).AddValue(
debugLogValue, debugLogValue,
).Add( ).Add(
"network", c.remoteAddr().Network(), "network", c.network,
).Log(cl.logger) ).Log(cl.logger)
torrent.Add("error receiving handshake", 1) torrent.Add("error receiving handshake", 1)
cl.lock() cl.lock()
cl.onBadAccept(c.remoteAddr()) cl.onBadAccept(c.remoteAddr)
cl.unlock() cl.unlock()
return return
} }
if t == nil { if t == nil {
torrent.Add("received handshake for unloaded torrent", 1) torrent.Add("received handshake for unloaded torrent", 1)
cl.lock() cl.lock()
cl.onBadAccept(c.remoteAddr()) cl.onBadAccept(c.remoteAddr)
cl.unlock() cl.unlock()
return return
} }
@ -862,7 +862,7 @@ func (cl *Client) sendInitialMessages(conn *connection, torrent *Torrent) {
}, },
V: cl.config.ExtendedHandshakeClientVersion, V: cl.config.ExtendedHandshakeClientVersion,
Reqq: 64, // TODO: Really? Reqq: 64, // TODO: Really?
YourIp: pp.CompactIp(missinggo.AddrIP(conn.remoteAddr())), YourIp: pp.CompactIp(conn.remoteAddr.IP),
Encryption: !cl.config.DisableEncryption, Encryption: !cl.config.DisableEncryption,
Port: cl.incomingPeerPort(), Port: cl.incomingPeerPort(),
MetadataSize: torrent.metadataSize(), MetadataSize: torrent.metadataSize(),
@ -1182,7 +1182,7 @@ func (cl *Client) banPeerIP(ip net.IP) {
cl.badPeerIPs[ip.String()] = struct{}{} cl.badPeerIPs[ip.String()] = struct{}{}
} }
func (cl *Client) newConnection(nc net.Conn, outgoing bool) (c *connection) { func (cl *Client) newConnection(nc net.Conn, outgoing bool, remoteAddr ipPort, network string) (c *connection) {
c = &connection{ c = &connection{
conn: nc, conn: nc,
outgoing: outgoing, outgoing: outgoing,
@ -1190,6 +1190,8 @@ func (cl *Client) newConnection(nc net.Conn, outgoing bool) (c *connection) {
PeerChoked: true, PeerChoked: true,
PeerMaxRequests: 250, PeerMaxRequests: 250,
writeBuffer: new(bytes.Buffer), writeBuffer: new(bytes.Buffer),
remoteAddr: remoteAddr,
network: network,
} }
c.writerCond.L = cl.locker() c.writerCond.L = cl.locker()
c.setRW(connStatsReadWriter{nc, c}) c.setRW(connStatsReadWriter{nc, c})
@ -1275,8 +1277,8 @@ func (cl *Client) ListenAddrs() (ret []net.Addr) {
return return
} }
func (cl *Client) onBadAccept(addr net.Addr) { func (cl *Client) onBadAccept(addr ipPort) {
ip := maskIpForAcceptLimiting(missinggo.AddrIP(addr)) ip := maskIpForAcceptLimiting(addr.IP)
if cl.acceptLimiter == nil { if cl.acceptLimiter == nil {
cl.acceptLimiter = make(map[ipStr]int) cl.acceptLimiter = make(map[ipStr]int)
} }

View File

@ -42,8 +42,10 @@ type connection struct {
t *Torrent t *Torrent
// The actual Conn, used for closing, and setting socket options. // The actual Conn, used for closing, and setting socket options.
conn net.Conn conn net.Conn
outgoing bool outgoing bool
network string
remoteAddr ipPort
// The Reader and Writer for this Conn, with hooks installed for stats, // The Reader and Writer for this Conn, with hooks installed for stats,
// limiting, deadlines etc. // limiting, deadlines etc.
w io.Writer w io.Writer
@ -133,7 +135,7 @@ func (cn *connection) expectingChunks() bool {
// Returns true if the connection is over IPv6. // Returns true if the connection is over IPv6.
func (cn *connection) ipv6() bool { func (cn *connection) ipv6() bool {
ip := missinggo.AddrIP(cn.remoteAddr()) ip := cn.remoteAddr.IP
if ip.To4() != nil { if ip.To4() != nil {
return false return false
} }
@ -179,10 +181,6 @@ func (cn *connection) mu() sync.Locker {
return cn.t.cl.locker() return cn.t.cl.locker()
} }
func (cn *connection) remoteAddr() net.Addr {
return cn.conn.RemoteAddr()
}
func (cn *connection) localAddr() net.Addr { func (cn *connection) localAddr() net.Addr {
return cn.conn.LocalAddr() return cn.conn.LocalAddr()
} }
@ -241,7 +239,7 @@ func (cn *connection) connectionFlags() (ret string) {
} }
func (cn *connection) utp() bool { func (cn *connection) utp() bool {
return isUtpNetwork(cn.remoteAddr().Network()) return isUtpNetwork(cn.network)
} }
// Inspired by https://github.com/transmission/transmission/wiki/Peer-Status-Text. // Inspired by https://github.com/transmission/transmission/wiki/Peer-Status-Text.
@ -279,7 +277,7 @@ func (cn *connection) downloadRate() float64 {
func (cn *connection) WriteStatus(w io.Writer, t *Torrent) { func (cn *connection) WriteStatus(w io.Writer, t *Torrent) {
// \t isn't preserved in <pre> blocks? // \t isn't preserved in <pre> blocks?
fmt.Fprintf(w, "%+-55q %s %s-%s\n", cn.PeerID, cn.PeerExtensionBytes, cn.localAddr(), cn.remoteAddr()) fmt.Fprintf(w, "%+-55q %s %s-%s\n", cn.PeerID, cn.PeerExtensionBytes, cn.localAddr(), cn.remoteAddr)
fmt.Fprintf(w, " last msg: %s, connected: %s, last helpful: %s, itime: %s, etime: %s\n", fmt.Fprintf(w, " last msg: %s, connected: %s, last helpful: %s, itime: %s, etime: %s\n",
eventAgeString(cn.lastMessageReceived), eventAgeString(cn.lastMessageReceived),
eventAgeString(cn.completedHandshake), eventAgeString(cn.completedHandshake),
@ -1150,15 +1148,15 @@ func (c *connection) mainReadLoop() (err error) {
case pp.Extended: case pp.Extended:
err = c.onReadExtendedMsg(msg.ExtendedID, msg.ExtendedPayload) err = c.onReadExtendedMsg(msg.ExtendedID, msg.ExtendedPayload)
case pp.Port: case pp.Port:
pingAddr, err := net.ResolveUDPAddr("", c.remoteAddr().String()) pingAddr := net.UDPAddr{
if err != nil { IP: c.remoteAddr.IP,
panic(err) Port: int(c.remoteAddr.Port),
} }
if msg.Port != 0 { if msg.Port != 0 {
pingAddr.Port = int(msg.Port) pingAddr.Port = int(msg.Port)
} }
cl.eachDhtServer(func(s *dht.Server) { cl.eachDhtServer(func(s *dht.Server) {
go s.Ping(pingAddr, nil) go s.Ping(&pingAddr, nil)
}) })
case pp.AllowedFast: case pp.AllowedFast:
torrent.Add("allowed fasts received", 1) torrent.Add("allowed fasts received", 1)
@ -1550,9 +1548,10 @@ func (c *connection) peerPriority() peerPriority {
} }
func (c *connection) remoteIp() net.IP { func (c *connection) remoteIp() net.IP {
return missinggo.AddrIP(c.remoteAddr()) return c.remoteAddr.IP
} }
// ???
func (c *connection) remoteIpPort() ipPort { func (c *connection) remoteIpPort() ipPort {
return ipPort{missinggo.AddrIP(c.remoteAddr()), uint16(missinggo.AddrPort(c.remoteAddr()))} return c.remoteAddr
} }

View File

@ -23,7 +23,7 @@ func TestSendBitfieldThenHave(t *testing.T) {
config: &ClientConfig{DownloadRateLimiter: unlimited}, config: &ClientConfig{DownloadRateLimiter: unlimited},
} }
cl.initLogger() cl.initLogger()
c := cl.newConnection(nil, false) c := cl.newConnection(nil, false, ipPort{}, "")
c.setTorrent(cl.newTorrent(metainfo.Hash{}, nil)) c.setTorrent(cl.newTorrent(metainfo.Hash{}, nil))
c.t.setInfo(&metainfo.Info{ c.t.setInfo(&metainfo.Info{
Pieces: make([]byte, metainfo.HashSize*3), Pieces: make([]byte, metainfo.HashSize*3),
@ -105,7 +105,7 @@ func BenchmarkConnectionMainReadLoop(b *testing.B) {
t.setChunkSize(defaultChunkSize) t.setChunkSize(defaultChunkSize)
t.pendingPieces.Set(0, PiecePriorityNormal.BitmapPriority()) t.pendingPieces.Set(0, PiecePriorityNormal.BitmapPriority())
r, w := net.Pipe() r, w := net.Pipe()
cn := cl.newConnection(r, true) cn := cl.newConnection(r, true, ipPort{}, "")
cn.setTorrent(t) cn.setTorrent(t)
mrlErr := make(chan error) mrlErr := make(chan error)
msg := pp.Message{ msg := pp.Message{

View File

@ -11,7 +11,6 @@ import (
"net" "net"
"net/url" "net/url"
"os" "os"
"strconv"
"sync" "sync"
"text/tabwriter" "text/tabwriter"
"time" "time"
@ -172,21 +171,11 @@ func (t *Torrent) KnownSwarm() (ks []Peer) {
// Add active peers to the list // Add active peers to the list
for conn := range t.conns { for conn := range t.conns {
host, portString, err := net.SplitHostPort(conn.remoteAddr().String())
if err != nil {
panic(err)
}
ip := net.ParseIP(host)
port, err := strconv.Atoi(portString)
if err != nil {
panic(err)
}
ks = append(ks, Peer{ ks = append(ks, Peer{
Id: conn.PeerID, Id: conn.PeerID,
IP: ip, IP: conn.remoteAddr.IP,
Port: port, Port: int(conn.remoteAddr.Port),
Source: conn.Discovery, Source: conn.Discovery,
// > If the connection is encrypted, that's certainly enough to set SupportsEncryption. // > If the connection is encrypted, that's certainly enough to set SupportsEncryption.
// > But if we're not connected to them with an encrypted connection, I couldn't say // > But if we're not connected to them with an encrypted connection, I couldn't say
@ -232,10 +221,7 @@ func (t *Torrent) addrActive(addr string) bool {
return true return true
} }
for c := range t.conns { for c := range t.conns {
ra := c.remoteAddr() ra := c.remoteAddr
if ra == nil {
continue
}
if ra.String() == addr { if ra.String() == addr {
return true return true
} }
@ -1621,7 +1607,7 @@ func (t *Torrent) pieceHashed(piece pieceIndex, correct bool) {
}()) }())
} }
c := touchers[0] c := touchers[0]
t.cl.banPeerIP(missinggo.AddrIP(c.remoteAddr())) t.cl.banPeerIP(c.remoteAddr.IP)
c.Drop() c.Drop()
} }
t.onIncompletePiece(piece) t.onIncompletePiece(piece)
@ -1750,11 +1736,11 @@ func (t *Torrent) initiateConn(peer Peer) {
if t.cl.badPeerIPPort(peer.IP, peer.Port) { if t.cl.badPeerIPPort(peer.IP, peer.Port) {
return return
} }
addr := net.JoinHostPort(peer.IP.String(), fmt.Sprintf("%d", peer.Port)) addr := ipPort{peer.IP, uint16(peer.Port)}
if t.addrActive(addr) { if t.addrActive(addr.String()) {
return return
} }
t.halfOpen[addr] = peer t.halfOpen[addr.String()] = peer
go t.cl.outgoingConnection(t, addr, peer.Source) go t.cl.outgoingConnection(t, addr, peer.Source)
} }