From bb28ff749289e4ba7256bc513330738eb52c9e6a Mon Sep 17 00:00:00 2001 From: Matt Joiner Date: Tue, 30 Jun 2015 00:45:26 +1000 Subject: [PATCH] Replace pruning timer with as-required connection dropping --- client.go | 113 ++++++++++++++++-------------------------------- torrent.go | 32 ++++++++++---- torrent_test.go | 2 - worst_conns.go | 12 ++++- 4 files changed, 72 insertions(+), 87 deletions(-) diff --git a/client.go b/client.go index bdef0cc4..3128ca82 100644 --- a/client.go +++ b/client.go @@ -3,7 +3,6 @@ package torrent import ( "bufio" "bytes" - "container/heap" "crypto/rand" "crypto/sha1" "encoding/hex" @@ -1681,27 +1680,31 @@ func (me *Client) connectionLoop(t *torrent, c *connection) error { } } -func (me *Client) dropConnection(torrent *torrent, conn *connection) { - me.event.Broadcast() - for r := range conn.Requests { - me.connDeleteRequest(torrent, conn, r) - } - conn.Close() - for i0, c := range torrent.Conns { - if c != conn { +// Returns true if connection is removed from torrent.Conns. +func (me *Client) deleteConnection(t *torrent, c *connection) bool { + for i0, _c := range t.Conns { + if _c != c { continue } - i1 := len(torrent.Conns) - 1 + i1 := len(t.Conns) - 1 if i0 != i1 { - torrent.Conns[i0] = torrent.Conns[i1] + t.Conns[i0] = t.Conns[i1] } - torrent.Conns = torrent.Conns[:i1] - me.openNewConns(torrent) - return + t.Conns = t.Conns[:i1] + return true } - panic("connection not found") + return false } +func (me *Client) dropConnection(t *torrent, c *connection) { + me.event.Broadcast() + c.Close() + if me.deleteConnection(t, c) { + me.openNewConns(t) + } +} + +// Returns true if the connection is added. func (me *Client) addConnection(t *torrent, c *connection) bool { if me.stopped() { return false @@ -1721,13 +1724,19 @@ func (me *Client) addConnection(t *torrent, c *connection) bool { return false } } - t.Conns = append(t.Conns, c) - // TODO: This should probably be done by a routine that kills off bad - // connections, and extra connections killed here instead. - if len(t.Conns) > socketsPerTorrent { - wcs := t.worstConnsHeap(me) - heap.Pop(wcs).(*connection).Close() + if len(t.Conns) >= socketsPerTorrent { + c := t.worstBadConn(me) + if c == nil { + return false + } + log.Printf("%s: dropping connection to make room for new one: %s", t, c) + c.Close() + me.deleteConnection(t, c) } + if len(t.Conns) >= socketsPerTorrent { + panic(len(t.Conns)) + } + t.Conns = append(t.Conns, c) return true } @@ -1744,18 +1753,13 @@ func (t *torrent) needData() bool { } func (cl *Client) usefulConn(t *torrent, c *connection) bool { - // A 30 second grace for initial messages to go through. - if time.Since(c.completedHandshake) < 30*time.Second { - return true + select { + case <-c.closing: + return false + default: } if !t.haveInfo() { - if !c.supportsExtension("ut_metadata") { - return false - } - if time.Since(c.completedHandshake) < 2*time.Minute { - return true - } - return false + return c.supportsExtension("ut_metadata") } if cl.seeding(t) { return c.PeerInterested @@ -1763,23 +1767,14 @@ func (cl *Client) usefulConn(t *torrent, c *connection) bool { return t.connHasWantedPieces(c) } -func (t *torrent) numGoodConns(cl *Client) (num int) { - for _, c := range t.Conns { - if cl.usefulConn(t, c) { - num++ - } - } - return -} - func (me *Client) wantConns(t *torrent) bool { if !me.seeding(t) && !t.needData() { return false } - if t.numGoodConns(me) >= socketsPerTorrent { - return false + if len(t.Conns) < socketsPerTorrent { + return true } - return true + return t.worstBadConn(me) != nil } func (me *Client) openNewConns(t *torrent) { @@ -2162,9 +2157,6 @@ func (cl *Client) AddTorrentSpec(spec *TorrentSpec) (T Torrent, new bool, err er // From this point onwards, we can consider the torrent a part of the // client. if new { - t.pruneTimer = time.AfterFunc(0, func() { - cl.pruneConnectionsUnlocked(T.torrent) - }) if !cl.config.DisableTrackers { go cl.announceTorrentTrackers(T.torrent) } @@ -2175,35 +2167,6 @@ func (cl *Client) AddTorrentSpec(spec *TorrentSpec) (T Torrent, new bool, err er return } -// Prunes unused connections. This is required to make space to dial for -// replacements. -func (cl *Client) pruneConnectionsUnlocked(t *torrent) { - select { - case <-t.ceasingNetworking: - return - case <-t.closing: - return - default: - } - cl.mu.Lock() - license := len(t.Conns) - (socketsPerTorrent+1)/2 - for _, c := range t.Conns { - if license <= 0 { - break - } - if time.Now().Sub(c.lastUsefulChunkReceived) < time.Minute { - continue - } - if time.Now().Sub(c.completedHandshake) < time.Minute { - continue - } - c.Close() - license-- - } - cl.mu.Unlock() - t.pruneTimer.Reset(pruneInterval) -} - func (me *Client) dropTorrent(infoHash InfoHash) (err error) { t, ok := me.torrents[infoHash] if !ok { diff --git a/torrent.go b/torrent.go index 30afe1a9..2d791aab 100644 --- a/torrent.go +++ b/torrent.go @@ -99,8 +99,6 @@ type torrent struct { // Closed when .Info is set. gotMetainfo chan struct{} - - pruneTimer *time.Timer } func (t *torrent) pieceComplete(piece int) bool { @@ -131,13 +129,19 @@ func (t *torrent) addrActive(addr string) bool { return false } -func (t *torrent) worstConnsHeap(cl *Client) (wcs *worstConns) { +func (t *torrent) worstConns(cl *Client) (wcs *worstConns) { wcs = &worstConns{ - c: append([]*connection{}, t.Conns...), + c: make([]*connection, 0, len(t.Conns)), t: t, cl: cl, } - heap.Init(wcs) + for _, c := range t.Conns { + select { + case <-c.closing: + default: + wcs.c = append(wcs.c, c) + } + } return } @@ -153,9 +157,6 @@ func (t *torrent) ceaseNetworking() { for _, c := range t.Conns { c.Close() } - if t.pruneTimer != nil { - t.pruneTimer.Stop() - } } func (t *torrent) addPeer(p Peer) { @@ -728,3 +729,18 @@ func (t *torrent) extentPieces(off, _len int64) (pieces []int) { } return } + +func (t *torrent) worstBadConn(cl *Client) *connection { + wcs := t.worstConns(cl) + heap.Init(wcs) + // A connection can only be bad if it's in the worst half, rounded down. + for wcs.Len() > (socketsPerTorrent+1)/2 { + c := heap.Pop(wcs).(*connection) + // Give connections 1 minute to prove themselves. + if time.Since(c.completedHandshake) < time.Minute { + continue + } + return c + } + return nil +} diff --git a/torrent_test.go b/torrent_test.go index fdc96903..6134e28d 100644 --- a/torrent_test.go +++ b/torrent_test.go @@ -3,7 +3,6 @@ package torrent import ( "sync" "testing" - "time" "github.com/anacrolix/torrent/peer_protocol" ) @@ -46,7 +45,6 @@ func TestTorrentRequest(t *testing.T) { func TestTorrentDoubleClose(t *testing.T) { tt, err := newTorrent(InfoHash{}) - tt.pruneTimer = time.NewTimer(0) if err != nil { t.Fatal(err) } diff --git a/worst_conns.go b/worst_conns.go index 0d7b9d0d..0f7aa3aa 100644 --- a/worst_conns.go +++ b/worst_conns.go @@ -29,13 +29,17 @@ func (me *worstConns) Push(x interface{}) { type worstConnsSortKey struct { useful bool lastHelpful time.Time + connected time.Time } func (me worstConnsSortKey) Less(other worstConnsSortKey) bool { if me.useful != other.useful { return !me.useful } - return me.lastHelpful.Before(other.lastHelpful) + if !me.lastHelpful.Equal(other.lastHelpful) { + return me.lastHelpful.Before(other.lastHelpful) + } + return me.connected.Before(other.connected) } func (me *worstConns) key(i int) (key worstConnsSortKey) { @@ -43,9 +47,13 @@ func (me *worstConns) key(i int) (key worstConnsSortKey) { key.useful = me.cl.usefulConn(me.t, c) if me.cl.seeding(me.t) { key.lastHelpful = c.lastChunkSent - } else { + } + // Intentionally consider the last time a chunk was received when seeding, + // because we might go from seeding back to leeching. + if c.lastUsefulChunkReceived.After(key.lastHelpful) { key.lastHelpful = c.lastUsefulChunkReceived } + key.connected = c.completedHandshake return }