Track outgoing through a new field on connection, and rework duplicate connection preferencing

This commit is contained in:
Matt Joiner 2018-06-12 20:14:00 +10:00
parent e9040f8f74
commit cea5584d6b
4 changed files with 34 additions and 34 deletions

View File

@ -420,7 +420,7 @@ func (cl *Client) incomingConnection(nc net.Conn) {
if tc, ok := nc.(*net.TCPConn); ok {
tc.SetLinger(0)
}
c := cl.newConnection(nc)
c := cl.newConnection(nc, false)
c.Discovery = peerSourceIncoming
cl.runReceivedConn(c)
}
@ -572,7 +572,7 @@ func (cl *Client) noLongerHalfOpen(t *Torrent, addr string) {
// Performs initiator handshakes and returns a connection. Returns nil
// *connection if no connection for valid reasons.
func (cl *Client) handshakesConnection(ctx context.Context, nc net.Conn, t *Torrent, encryptHeader bool) (c *connection, err error) {
c = cl.newConnection(nc)
c = cl.newConnection(nc, true)
c.headerEncrypted = encryptHeader
ctx, cancel := context.WithTimeout(ctx, cl.config.HandshakesTimeout)
defer cancel()
@ -656,7 +656,7 @@ func (cl *Client) outgoingConnection(t *Torrent, addr string, ps peerSource) {
}
defer c.Close()
c.Discovery = ps
cl.runHandshookConn(c, t, true)
cl.runHandshookConn(c, t)
}
// The port number for incoming peer connections. 0 if the client isn't
@ -768,13 +768,13 @@ func (cl *Client) runReceivedConn(c *connection) {
}
cl.mu.Lock()
defer cl.mu.Unlock()
cl.runHandshookConn(c, t, false)
cl.runHandshookConn(c, t)
}
func (cl *Client) runHandshookConn(c *connection, t *Torrent, outgoing bool) {
func (cl *Client) runHandshookConn(c *connection, t *Torrent) {
c.setTorrent(t)
if c.PeerID == cl.peerID {
if outgoing {
if c.outgoing {
connsToSelf.Add(1)
addr := c.conn.RemoteAddr().String()
cl.dopplegangerAddrs[addr] = struct{}{}
@ -792,7 +792,8 @@ func (cl *Client) runHandshookConn(c *connection, t *Torrent, outgoing bool) {
if connIsIpv6(c.conn) {
torrent.Add("completed handshake over ipv6", 1)
}
if !t.addConnection(c, outgoing) {
if err := t.addConnection(c); err != nil {
log.Fmsg("error adding connection: %s", err).AddValues(c, debugLogValue).Log(t.logger)
return
}
defer t.dropConnection(c)
@ -1148,9 +1149,10 @@ func (cl *Client) banPeerIP(ip net.IP) {
cl.badPeerIPs[ip.String()] = struct{}{}
}
func (cl *Client) newConnection(nc net.Conn) (c *connection) {
func (cl *Client) newConnection(nc net.Conn, outgoing bool) (c *connection) {
c = &connection{
conn: nc,
outgoing: outgoing,
Choked: true,
PeerChoked: true,
PeerMaxRequests: 250,

View File

@ -40,7 +40,8 @@ const (
type connection struct {
t *Torrent
// The actual Conn, used for closing, and setting socket options.
conn net.Conn
conn net.Conn
outgoing bool
// The Reader and Writer for this Conn, with hooks installed for stats,
// limiting, deadlines etc.
w io.Writer

View File

@ -23,7 +23,7 @@ func TestSendBitfieldThenHave(t *testing.T) {
r, w := io.Pipe()
var cl Client
cl.initLogger()
c := cl.newConnection(nil)
c := cl.newConnection(nil, false)
c.setTorrent(cl.newTorrent(metainfo.Hash{}, nil))
c.t.setInfo(&metainfo.Info{
Pieces: make([]byte, metainfo.HashSize*3),
@ -103,7 +103,7 @@ func BenchmarkConnectionMainReadLoop(b *testing.B) {
t.setChunkSize(defaultChunkSize)
t.pendingPieces.Set(0, PiecePriorityNormal.BitmapPriority())
r, w := net.Pipe()
cn := cl.newConnection(r)
cn := cl.newConnection(r, true)
cn.setTorrent(t)
mrlErr := make(chan error)
cl.mu.Lock()

View File

@ -1489,37 +1489,34 @@ func (t *Torrent) reconcileHandshakeStats(c *connection) {
}
// Returns true if the connection is added.
func (t *Torrent) addConnection(c *connection, outgoing bool) bool {
func (t *Torrent) addConnection(c *connection) error {
if t.closed.IsSet() {
return false
return errors.New("torrent closed")
}
if !t.wantConns() {
return false
return errors.New("don't want conns")
}
for c0 := range t.conns {
if c.PeerID == c0.PeerID {
// Already connected to a client with that ID.
duplicateClientConns.Add(1)
lower := string(t.cl.peerID[:]) < string(c.PeerID[:])
// Retain the connection from initiated from lower peer ID to
// higher.
if outgoing == lower {
// Close the other one.
c0.Close()
// TODO: Is it safe to delete from the map while we're
// iterating over it?
t.deleteConnection(c0)
} else {
// Abandon this one.
return false
}
if c.PeerID != c0.PeerID {
continue
}
// Already connected to a client with that ID.
preferOutbound := string(t.cl.peerID[:]) < string(c.PeerID[:])
// Retain the connection from initiated from lower peer ID to higher.
if c0.outgoing == preferOutbound {
return errors.New("existing connection preferred")
}
if c.outgoing != preferOutbound {
return errors.New("prefer older connection")
}
// Close the other one.
c0.Close()
// TODO: Is it safe to delete from the map while we're iterating
// over it?
t.deleteConnection(c0)
}
if len(t.conns) >= t.maxEstablishedConns {
c := t.worstBadConn()
if c == nil {
return false
}
if t.cl.config.Debug && missinggo.CryHeard() {
log.Printf("%s: dropping connection to make room for new one:\n %s", t, c)
}
@ -1530,7 +1527,7 @@ func (t *Torrent) addConnection(c *connection, outgoing bool) bool {
panic(len(t.conns))
}
t.conns[c] = struct{}{}
return true
return nil
}
func (t *Torrent) wantConns() bool {