Limit half-open connections at the Client level

This commit is contained in:
Matt Joiner 2020-09-30 16:56:27 +10:00
parent f5c6f28985
commit a2c7b384df
3 changed files with 14 additions and 3 deletions

View File

@ -75,6 +75,7 @@ type Client struct {
acceptLimiter map[ipStr]int acceptLimiter map[ipStr]int
dialRateLimiter *rate.Limiter dialRateLimiter *rate.Limiter
numHalfOpen int
websocketTrackers websocketTrackers websocketTrackers websocketTrackers
} }
@ -658,7 +659,10 @@ func (cl *Client) noLongerHalfOpen(t *Torrent, addr string) {
panic("invariant broken") panic("invariant broken")
} }
delete(t.halfOpen, addr) delete(t.halfOpen, addr)
cl.numHalfOpen--
for _, t := range cl.torrents {
t.openNewConns() t.openNewConns()
}
} }
// Performs initiator handshakes and returns a connection. Returns nil *connection if no connection // Performs initiator handshakes and returns a connection. Returns nil *connection if no connection

View File

@ -102,6 +102,7 @@ type ClientConfig struct {
MinDialTimeout time.Duration MinDialTimeout time.Duration
EstablishedConnsPerTorrent int EstablishedConnsPerTorrent int
HalfOpenConnsPerTorrent int HalfOpenConnsPerTorrent int
TotalHalfOpenConns int
// Maximum number of peer addresses in reserve. // Maximum number of peer addresses in reserve.
TorrentPeersHighWater int TorrentPeersHighWater int
// Minumum number of peers before effort is made to obtain more peers. // Minumum number of peers before effort is made to obtain more peers.
@ -155,6 +156,7 @@ func NewDefaultClientConfig() *ClientConfig {
MinDialTimeout: 3 * time.Second, MinDialTimeout: 3 * time.Second,
EstablishedConnsPerTorrent: 50, EstablishedConnsPerTorrent: 50,
HalfOpenConnsPerTorrent: 25, HalfOpenConnsPerTorrent: 25,
TotalHalfOpenConns: 100,
TorrentPeersHighWater: 500, TorrentPeersHighWater: 500,
TorrentPeersLowWater: 50, TorrentPeersLowWater: 50,
HandshakesTimeout: 4 * time.Second, HandshakesTimeout: 4 * time.Second,

View File

@ -1075,7 +1075,7 @@ func (t *Torrent) maxHalfOpen() int {
return int(min(max(5, extraIncoming)+establishedHeadroom, int64(t.cl.config.HalfOpenConnsPerTorrent))) return int(min(max(5, extraIncoming)+establishedHeadroom, int64(t.cl.config.HalfOpenConnsPerTorrent)))
} }
func (t *Torrent) openNewConns() { func (t *Torrent) openNewConns() (initiated int) {
defer t.updateWantPeersEvent() defer t.updateWantPeersEvent()
for t.peers.Len() != 0 { for t.peers.Len() != 0 {
if !t.wantConns() { if !t.wantConns() {
@ -1087,9 +1087,14 @@ func (t *Torrent) openNewConns() {
if len(t.cl.dialers) == 0 { if len(t.cl.dialers) == 0 {
return return
} }
if t.cl.numHalfOpen >= t.cl.config.TotalHalfOpenConns {
return
}
p := t.peers.PopMax() p := t.peers.PopMax()
t.initiateConn(p) t.initiateConn(p)
initiated++
} }
return
} }
func (t *Torrent) getConnPieceInclination() []int { func (t *Torrent) getConnPieceInclination() []int {
@ -1889,7 +1894,6 @@ func (t *Torrent) initiateConn(peer PeerInfo) {
if peer.Id == t.cl.peerID { if peer.Id == t.cl.peerID {
return return
} }
if t.cl.badPeerAddr(peer.Addr) && !peer.Trusted { if t.cl.badPeerAddr(peer.Addr) && !peer.Trusted {
return return
} }
@ -1897,6 +1901,7 @@ func (t *Torrent) initiateConn(peer PeerInfo) {
if t.addrActive(addr.String()) { if t.addrActive(addr.String()) {
return return
} }
t.cl.numHalfOpen++
t.halfOpen[addr.String()] = peer t.halfOpen[addr.String()] = peer
go t.cl.outgoingConnection(t, addr, peer.Source, peer.Trusted) go t.cl.outgoingConnection(t, addr, peer.Source, peer.Trusted)
} }