From 240abaf51239842619bc741c0610743987dbded9 Mon Sep 17 00:00:00 2001 From: Matt Joiner Date: Wed, 6 Jul 2016 00:42:16 +1000 Subject: [PATCH] Add Torrent.SetMaxEstablishedConns --- client_test.go | 2 ++ connection.go | 8 +++++++ torrent.go | 29 ++++++++++++++++++------ worst_conns.go | 61 +++++--------------------------------------------- 4 files changed, 38 insertions(+), 62 deletions(-) diff --git a/client_test.go b/client_test.go index 648b55c2..c9046ece 100644 --- a/client_test.go +++ b/client_test.go @@ -61,6 +61,8 @@ func TestAddDropTorrent(t *testing.T) { tt, new, err := cl.AddTorrentSpec(TorrentSpecFromMetaInfo(mi)) require.NoError(t, err) assert.True(t, new) + tt.SetMaxEstablishedConns(0) + tt.SetMaxEstablishedConns(1) tt.Drop() } diff --git a/connection.go b/connection.go index 1f8d3e74..a58647ad 100644 --- a/connection.go +++ b/connection.go @@ -669,3 +669,11 @@ func (c *connection) useful() bool { } return t.connHasWantedPieces(c) } + +func (c *connection) lastHelpful() time.Time { + lasts := []time.Time{c.lastUsefulChunkReceived} + if c.t.seeding() { + lasts = append(lasts, c.lastChunkSent) + } + return missinggo.Max(time.Time.Before, missinggo.ConvertToSliceOfEmptyInterface(lasts)...).(time.Time) +} diff --git a/torrent.go b/torrent.go index 50441879..3fa662c1 100644 --- a/torrent.go +++ b/torrent.go @@ -10,7 +10,6 @@ import ( "math/rand" "net" "os" - "sort" "sync" "time" @@ -134,11 +133,11 @@ func (t *Torrent) addrActive(addr string) bool { return false } -func (t *Torrent) worstConns() (wcs *worstConns) { - wcs = &worstConns{make([]*connection, 0, len(t.conns))} +func (t *Torrent) worstUnclosedConns() (ret []*connection) { + ret = make([]*connection, 0, len(t.conns)) for _, c := range t.conns { if !c.closed.IsSet() { - wcs.c = append(wcs.c, c) + ret = append(ret, c) } } return @@ -443,7 +442,7 @@ func (t *Torrent) writeStatus(w io.Writer, cl *Client) { fmt.Fprintf(w, "Pending peers: %d\n", len(t.peers)) fmt.Fprintf(w, "Half open: %d\n", len(t.halfOpen)) fmt.Fprintf(w, "Active peers: %d\n", len(t.conns)) - sort.Sort(&worstConns{t.conns}) + missinggo.Sort(t.conns, worseConn) for i, c := range t.conns { fmt.Fprintf(w, "%2d. ", i+1) c.WriteStatus(w, t) @@ -733,9 +732,12 @@ func (t *Torrent) extentPieces(off, _len int64) (pieces []int) { return } +// The worst connection is one that hasn't been sent, or sent anything useful +// for the longest. A bad connection is one that usually sends us unwanted +// pieces, or has been in worser half of the established connections for more +// than a minute. func (t *Torrent) worstBadConn() *connection { - wcs := t.worstConns() - heap.Init(wcs) + wcs := missinggo.HeapFromSlice(t.worstUnclosedConns(), worseConn) for wcs.Len() != 0 { c := heap.Pop(wcs).(*connection) if c.UnwantedChunksReceived >= 6 && c.UnwantedChunksReceived > c.UsefulChunksReceived { @@ -1310,3 +1312,16 @@ func (t *Torrent) wantConns() bool { } return t.worstBadConn() != nil } + +func (t *Torrent) SetMaxEstablishedConns(max int) (oldMax int) { + t.cl.mu.Lock() + defer t.cl.mu.Unlock() + oldMax = t.maxEstablishedConns + t.maxEstablishedConns = max + wcs := missinggo.HeapFromSlice(append([]*connection(nil), t.conns...), worseConn) + for len(t.conns) > t.maxEstablishedConns && wcs.Len() > 0 { + t.dropConnection(wcs.Pop().(*connection)) + } + t.openNewConns() + return oldMax +} diff --git a/worst_conns.go b/worst_conns.go index 2abc8f09..4085e0c9 100644 --- a/worst_conns.go +++ b/worst_conns.go @@ -1,60 +1,11 @@ package torrent -import ( - "time" -) - -// Implements a heap of connections by how useful they are or have been. -type worstConns struct { - c []*connection -} - -func (wc *worstConns) Len() int { return len(wc.c) } -func (wc *worstConns) Swap(i, j int) { wc.c[i], wc.c[j] = wc.c[j], wc.c[i] } - -func (wc *worstConns) Pop() (ret interface{}) { - old := wc.c - n := len(old) - ret = old[n-1] - wc.c = old[:n-1] - return -} - -func (wc *worstConns) Push(x interface{}) { - wc.c = append(wc.c, x.(*connection)) -} - -type worstConnsSortKey struct { - useful bool - lastHelpful time.Time - connected time.Time -} - -func (wc worstConnsSortKey) Less(other worstConnsSortKey) bool { - if wc.useful != other.useful { - return !wc.useful +func worseConn(l, r *connection) bool { + if l.useful() != r.useful() { + return r.useful() } - if !wc.lastHelpful.Equal(other.lastHelpful) { - return wc.lastHelpful.Before(other.lastHelpful) + if !l.lastHelpful().Equal(r.lastHelpful()) { + return l.lastHelpful().Before(r.lastHelpful()) } - return wc.connected.Before(other.connected) -} - -func (wc *worstConns) key(i int) (key worstConnsSortKey) { - c := wc.c[i] - key.useful = c.useful() - if c.t.seeding() { - key.lastHelpful = c.lastChunkSent - } - // Intentionally consider the last time a chunk was received when seeding, - // because we might go from seeding back to leeching. - if c.lastUsefulChunkReceived.After(key.lastHelpful) { - key.lastHelpful = c.lastUsefulChunkReceived - } - key.connected = c.completedHandshake - return -} - -func (wc worstConns) Less(i, j int) bool { - return wc.key(i).Less(wc.key(j)) + return l.completedHandshake.Before(r.completedHandshake) }