Replace pruning timer with as-required connection dropping

This commit is contained in:
Matt Joiner 2015-06-30 00:45:26 +10:00
parent 1595f65ac2
commit bb28ff7492
4 changed files with 72 additions and 87 deletions

113
client.go
View File

@ -3,7 +3,6 @@ package torrent
import (
"bufio"
"bytes"
"container/heap"
"crypto/rand"
"crypto/sha1"
"encoding/hex"
@ -1681,27 +1680,31 @@ func (me *Client) connectionLoop(t *torrent, c *connection) error {
}
}
func (me *Client) dropConnection(torrent *torrent, conn *connection) {
me.event.Broadcast()
for r := range conn.Requests {
me.connDeleteRequest(torrent, conn, r)
}
conn.Close()
for i0, c := range torrent.Conns {
if c != conn {
// Returns true if connection is removed from torrent.Conns.
func (me *Client) deleteConnection(t *torrent, c *connection) bool {
for i0, _c := range t.Conns {
if _c != c {
continue
}
i1 := len(torrent.Conns) - 1
i1 := len(t.Conns) - 1
if i0 != i1 {
torrent.Conns[i0] = torrent.Conns[i1]
t.Conns[i0] = t.Conns[i1]
}
torrent.Conns = torrent.Conns[:i1]
me.openNewConns(torrent)
return
t.Conns = t.Conns[:i1]
return true
}
panic("connection not found")
return false
}
func (me *Client) dropConnection(t *torrent, c *connection) {
me.event.Broadcast()
c.Close()
if me.deleteConnection(t, c) {
me.openNewConns(t)
}
}
// Returns true if the connection is added.
func (me *Client) addConnection(t *torrent, c *connection) bool {
if me.stopped() {
return false
@ -1721,13 +1724,19 @@ func (me *Client) addConnection(t *torrent, c *connection) bool {
return false
}
}
t.Conns = append(t.Conns, c)
// TODO: This should probably be done by a routine that kills off bad
// connections, and extra connections killed here instead.
if len(t.Conns) > socketsPerTorrent {
wcs := t.worstConnsHeap(me)
heap.Pop(wcs).(*connection).Close()
if len(t.Conns) >= socketsPerTorrent {
c := t.worstBadConn(me)
if c == nil {
return false
}
log.Printf("%s: dropping connection to make room for new one: %s", t, c)
c.Close()
me.deleteConnection(t, c)
}
if len(t.Conns) >= socketsPerTorrent {
panic(len(t.Conns))
}
t.Conns = append(t.Conns, c)
return true
}
@ -1744,18 +1753,13 @@ func (t *torrent) needData() bool {
}
func (cl *Client) usefulConn(t *torrent, c *connection) bool {
// A 30 second grace for initial messages to go through.
if time.Since(c.completedHandshake) < 30*time.Second {
return true
select {
case <-c.closing:
return false
default:
}
if !t.haveInfo() {
if !c.supportsExtension("ut_metadata") {
return false
}
if time.Since(c.completedHandshake) < 2*time.Minute {
return true
}
return false
return c.supportsExtension("ut_metadata")
}
if cl.seeding(t) {
return c.PeerInterested
@ -1763,23 +1767,14 @@ func (cl *Client) usefulConn(t *torrent, c *connection) bool {
return t.connHasWantedPieces(c)
}
func (t *torrent) numGoodConns(cl *Client) (num int) {
for _, c := range t.Conns {
if cl.usefulConn(t, c) {
num++
}
}
return
}
func (me *Client) wantConns(t *torrent) bool {
if !me.seeding(t) && !t.needData() {
return false
}
if t.numGoodConns(me) >= socketsPerTorrent {
return false
if len(t.Conns) < socketsPerTorrent {
return true
}
return true
return t.worstBadConn(me) != nil
}
func (me *Client) openNewConns(t *torrent) {
@ -2162,9 +2157,6 @@ func (cl *Client) AddTorrentSpec(spec *TorrentSpec) (T Torrent, new bool, err er
// From this point onwards, we can consider the torrent a part of the
// client.
if new {
t.pruneTimer = time.AfterFunc(0, func() {
cl.pruneConnectionsUnlocked(T.torrent)
})
if !cl.config.DisableTrackers {
go cl.announceTorrentTrackers(T.torrent)
}
@ -2175,35 +2167,6 @@ func (cl *Client) AddTorrentSpec(spec *TorrentSpec) (T Torrent, new bool, err er
return
}
// Prunes unused connections. This is required to make space to dial for
// replacements.
func (cl *Client) pruneConnectionsUnlocked(t *torrent) {
select {
case <-t.ceasingNetworking:
return
case <-t.closing:
return
default:
}
cl.mu.Lock()
license := len(t.Conns) - (socketsPerTorrent+1)/2
for _, c := range t.Conns {
if license <= 0 {
break
}
if time.Now().Sub(c.lastUsefulChunkReceived) < time.Minute {
continue
}
if time.Now().Sub(c.completedHandshake) < time.Minute {
continue
}
c.Close()
license--
}
cl.mu.Unlock()
t.pruneTimer.Reset(pruneInterval)
}
func (me *Client) dropTorrent(infoHash InfoHash) (err error) {
t, ok := me.torrents[infoHash]
if !ok {

View File

@ -99,8 +99,6 @@ type torrent struct {
// Closed when .Info is set.
gotMetainfo chan struct{}
pruneTimer *time.Timer
}
func (t *torrent) pieceComplete(piece int) bool {
@ -131,13 +129,19 @@ func (t *torrent) addrActive(addr string) bool {
return false
}
func (t *torrent) worstConnsHeap(cl *Client) (wcs *worstConns) {
func (t *torrent) worstConns(cl *Client) (wcs *worstConns) {
wcs = &worstConns{
c: append([]*connection{}, t.Conns...),
c: make([]*connection, 0, len(t.Conns)),
t: t,
cl: cl,
}
heap.Init(wcs)
for _, c := range t.Conns {
select {
case <-c.closing:
default:
wcs.c = append(wcs.c, c)
}
}
return
}
@ -153,9 +157,6 @@ func (t *torrent) ceaseNetworking() {
for _, c := range t.Conns {
c.Close()
}
if t.pruneTimer != nil {
t.pruneTimer.Stop()
}
}
func (t *torrent) addPeer(p Peer) {
@ -728,3 +729,18 @@ func (t *torrent) extentPieces(off, _len int64) (pieces []int) {
}
return
}
func (t *torrent) worstBadConn(cl *Client) *connection {
wcs := t.worstConns(cl)
heap.Init(wcs)
// A connection can only be bad if it's in the worst half, rounded down.
for wcs.Len() > (socketsPerTorrent+1)/2 {
c := heap.Pop(wcs).(*connection)
// Give connections 1 minute to prove themselves.
if time.Since(c.completedHandshake) < time.Minute {
continue
}
return c
}
return nil
}

View File

@ -3,7 +3,6 @@ package torrent
import (
"sync"
"testing"
"time"
"github.com/anacrolix/torrent/peer_protocol"
)
@ -46,7 +45,6 @@ func TestTorrentRequest(t *testing.T) {
func TestTorrentDoubleClose(t *testing.T) {
tt, err := newTorrent(InfoHash{})
tt.pruneTimer = time.NewTimer(0)
if err != nil {
t.Fatal(err)
}

View File

@ -29,13 +29,17 @@ func (me *worstConns) Push(x interface{}) {
type worstConnsSortKey struct {
useful bool
lastHelpful time.Time
connected time.Time
}
func (me worstConnsSortKey) Less(other worstConnsSortKey) bool {
if me.useful != other.useful {
return !me.useful
}
return me.lastHelpful.Before(other.lastHelpful)
if !me.lastHelpful.Equal(other.lastHelpful) {
return me.lastHelpful.Before(other.lastHelpful)
}
return me.connected.Before(other.connected)
}
func (me *worstConns) key(i int) (key worstConnsSortKey) {
@ -43,9 +47,13 @@ func (me *worstConns) key(i int) (key worstConnsSortKey) {
key.useful = me.cl.usefulConn(me.t, c)
if me.cl.seeding(me.t) {
key.lastHelpful = c.lastChunkSent
} else {
}
// Intentionally consider the last time a chunk was received when seeding,
// because we might go from seeding back to leeching.
if c.lastUsefulChunkReceived.After(key.lastHelpful) {
key.lastHelpful = c.lastUsefulChunkReceived
}
key.connected = c.completedHandshake
return
}