From ced5733c881f82d0958ca15c5de072f45d01d15a Mon Sep 17 00:00:00 2001 From: Matt Joiner Date: Tue, 16 Jun 2015 16:57:47 +1000 Subject: [PATCH] Improve uploading/seeding --- client.go | 126 ++++++++++++++++++++++++++++--------------- client_test.go | 3 +- connection.go | 1 + fs/torrentfs_test.go | 1 + piece.go | 3 +- torrent.go | 17 +++--- worst_conns.go | 43 ++++++--------- 7 files changed, 116 insertions(+), 78 deletions(-) diff --git a/client.go b/client.go index 1f119900..5f9420fd 100644 --- a/client.go +++ b/client.go @@ -232,7 +232,7 @@ func (cl *Client) WriteStatus(_w io.Writer) { w.WriteString("") } fmt.Fprint(w, "\n") - t.writeStatus(w) + t.writeStatus(w, cl) fmt.Fprintln(w) } } @@ -342,7 +342,6 @@ func (t *torrent) connPendPiece(c *connection, piece int) { func (cl *Client) raisePiecePriority(t *torrent, piece int, priority piecePriority) { if t.Pieces[piece].Priority < priority { cl.prioritizePiece(t, piece, priority) - cl.event.Broadcast() } } @@ -1404,6 +1403,54 @@ func (cl *Client) peerHasAll(t *torrent, cn *connection) { } } +func (me *Client) upload(t *torrent, c *connection) { + if me.config.NoUpload { + return + } + if !c.PeerInterested { + return + } + if !me.seeding(t) && !t.connHasWantedPieces(c) { + return + } +another: + for c.chunksSent < c.UsefulChunksReceived+6 { + c.Unchoke() + for r := range c.PeerRequests { + err := me.sendChunk(t, c, r) + if err != nil { + log.Printf("error sending chunk to peer: %s", err) + } + delete(c.PeerRequests, r) + goto another + } + return + } + c.Choke() +} + +func (me *Client) sendChunk(t *torrent, c *connection, r request) error { + b := make([]byte, r.Length) + p := t.Info.Piece(int(r.Index)) + n, err := dataReadAt(t.data, b, p.Offset()+int64(r.Begin)) + if err != nil { + return err + } + if n != len(b) { + log.Fatal(b) + } + c.Post(pp.Message{ + Type: pp.Piece, + Index: r.Index, + Begin: r.Begin, + Piece: b, + }) + uploadChunksPosted.Add(1) + c.chunksSent++ + c.lastChunkSent = time.Now() + return nil +} + // Processes incoming bittorrent messages. The client lock is held upon entry // and exit. func (me *Client) connectionLoop(t *torrent, c *connection) error { @@ -1448,11 +1495,7 @@ func (me *Client) connectionLoop(t *torrent, c *connection) error { me.peerUnchoked(t, c) case pp.Interested: c.PeerInterested = true - // TODO: This should be done from a dedicated unchoking routine. - if me.config.NoUpload { - break - } - c.Unchoke() + me.upload(t, c) case pp.NotInterested: c.PeerInterested = false c.Choke() @@ -1462,30 +1505,15 @@ func (me *Client) connectionLoop(t *torrent, c *connection) error { if c.Choked { break } - request := newRequest(msg.Index, msg.Begin, msg.Length) - // TODO: Requests should be satisfied from a dedicated upload - // routine. - // c.PeerRequests[request] = struct{}{} - // if c.PeerRequests == nil { - // c.PeerRequests = make(map[request]struct{}, maxRequests) - // } - p := make([]byte, msg.Length) - n, err := dataReadAt(t.data, p, int64(t.pieceLength(0))*int64(msg.Index)+int64(msg.Begin)) - // TODO: Failing to read for a request should not be fatal to the connection. - if err != nil { - return fmt.Errorf("reading t data to serve request %q: %s", request, err) + if !c.PeerInterested { + err = errors.New("peer sent request but isn't interested") + break } - if n != int(msg.Length) { - return fmt.Errorf("bad request: %v", msg) + if c.PeerRequests == nil { + c.PeerRequests = make(map[request]struct{}, maxRequests) } - c.Post(pp.Message{ - Type: pp.Piece, - Index: msg.Index, - Begin: msg.Begin, - Piece: p, - }) - uploadChunksPosted.Add(1) - c.chunksSent++ + c.PeerRequests[newRequest(msg.Index, msg.Begin, msg.Length)] = struct{}{} + me.upload(t, c) case pp.Cancel: req := newRequest(msg.Index, msg.Begin, msg.Length) if !c.PeerCancel(req) { @@ -1699,7 +1727,7 @@ func (me *Client) addConnection(t *torrent, c *connection) bool { // TODO: This should probably be done by a routine that kills off bad // connections, and extra connections killed here instead. if len(t.Conns) > socketsPerTorrent { - wcs := t.worstConnsHeap() + wcs := t.worstConnsHeap(me) heap.Pop(wcs).(*connection).Close() } return true @@ -1717,26 +1745,29 @@ func (t *torrent) needData() bool { return false } -// TODO: I'm sure there's something here to do with seeding. -func (t *torrent) badConn(c *connection) bool { +func (cl *Client) usefulConn(t *torrent, c *connection) bool { // A 30 second grace for initial messages to go through. if time.Since(c.completedHandshake) < 30*time.Second { - return false + return true } if !t.haveInfo() { if !c.supportsExtension("ut_metadata") { + return false + } + if time.Since(c.completedHandshake) < 2*time.Minute { return true } - if time.Since(c.completedHandshake) > 2*time.Minute { - return true - } + return false } - return !t.connHasWantedPieces(c) + if cl.seeding(t) { + return c.PeerInterested + } + return t.connHasWantedPieces(c) } -func (t *torrent) numGoodConns() (num int) { +func (t *torrent) numGoodConns(cl *Client) (num int) { for _, c := range t.Conns { - if !t.badConn(c) { + if cl.usefulConn(t, c) { num++ } } @@ -1744,10 +1775,10 @@ func (t *torrent) numGoodConns() (num int) { } func (me *Client) wantConns(t *torrent) bool { - if me.config.NoUpload && !t.needData() { + if !me.seeding(t) && !t.needData() { return false } - if t.numGoodConns() >= socketsPerTorrent { + if t.numGoodConns(me) >= socketsPerTorrent { return false } return true @@ -2228,7 +2259,16 @@ func (cl *Client) waitWantPeers(t *torrent) bool { // Returns whether the client should make effort to seed the torrent. func (cl *Client) seeding(t *torrent) bool { - return cl.config.Seed && !cl.config.NoUpload + if cl.config.NoUpload { + return false + } + if !cl.config.Seed { + return false + } + if t.needData() { + return false + } + return true } func (cl *Client) announceTorrentDHT(t *torrent, impliedPort bool) { @@ -2515,6 +2555,8 @@ func (me *Client) downloadedChunk(t *torrent, c *connection, msg *pp.Message) er c.UsefulChunksReceived++ c.lastUsefulChunkReceived = time.Now() + me.upload(t, c) + // Write the chunk out. err := t.writeChunk(int(msg.Index), int64(msg.Begin), msg.Piece) if err != nil { diff --git a/client_test.go b/client_test.go index 0e924bb2..8c7cc344 100644 --- a/client_test.go +++ b/client_test.go @@ -18,8 +18,8 @@ import ( "gopkg.in/check.v1" "github.com/anacrolix/torrent/bencode" - "github.com/anacrolix/torrent/data/blob" "github.com/anacrolix/torrent/data" + "github.com/anacrolix/torrent/data/blob" "github.com/anacrolix/torrent/internal/testutil" "github.com/anacrolix/torrent/metainfo" "github.com/anacrolix/torrent/util" @@ -251,6 +251,7 @@ func TestClientTransfer(t *testing.T) { greetingTempDir, mi := testutil.GreetingTestTorrent() defer os.RemoveAll(greetingTempDir) cfg := TestingConfig + cfg.Seed = true cfg.DataDir = greetingTempDir seeder, err := NewClient(&cfg) if err != nil { diff --git a/connection.go b/connection.go index 943e1a7c..a57462b3 100644 --- a/connection.go +++ b/connection.go @@ -52,6 +52,7 @@ type connection struct { lastMessageReceived time.Time completedHandshake time.Time lastUsefulChunkReceived time.Time + lastChunkSent time.Time // Stuff controlled by the local peer. Interested bool diff --git a/fs/torrentfs_test.go b/fs/torrentfs_test.go index b77dab2f..1d5dc100 100644 --- a/fs/torrentfs_test.go +++ b/fs/torrentfs_test.go @@ -170,6 +170,7 @@ func TestDownloadOnDemand(t *testing.T) { DisableTrackers: true, NoDHT: true, ListenAddr: ":0", + Seed: true, NoDefaultBlocklist: true, // Ensure that the metainfo is obtained over the wire, since we added diff --git a/piece.go b/piece.go index bae74094..a5e9947b 100644 --- a/piece.go +++ b/piece.go @@ -20,7 +20,8 @@ const ( ) type piece struct { - Hash pieceSum // The completed piece SHA1 hash, from the metainfo "pieces" field. + // The completed piece SHA1 hash, from the metainfo "pieces" field. + Hash pieceSum // Chunks we don't have. The offset and length can be determined by the // request chunkSize in use. PendingChunkSpecs []bool diff --git a/torrent.go b/torrent.go index e32175e9..30afe1a9 100644 --- a/torrent.go +++ b/torrent.go @@ -131,10 +131,11 @@ func (t *torrent) addrActive(addr string) bool { return false } -func (t *torrent) worstConnsHeap() (wcs *worstConns) { +func (t *torrent) worstConnsHeap(cl *Client) (wcs *worstConns) { wcs = &worstConns{ - c: append([]*connection{}, t.Conns...), - t: t, + c: append([]*connection{}, t.Conns...), + t: t, + cl: cl, } heap.Init(wcs) return @@ -376,7 +377,7 @@ func pieceStateRunStatusChars(psr PieceStateRun) (ret string) { return } -func (t *torrent) writeStatus(w io.Writer) { +func (t *torrent) writeStatus(w io.Writer, cl *Client) { fmt.Fprintf(w, "Infohash: %x\n", t.InfoHash) fmt.Fprintf(w, "Metadata length: %d\n", t.metadataSize()) fmt.Fprintf(w, "Metadata have: ") @@ -421,8 +422,9 @@ func (t *torrent) writeStatus(w io.Writer) { fmt.Fprintf(w, "Half open: %d\n", len(t.HalfOpen)) fmt.Fprintf(w, "Active peers: %d\n", len(t.Conns)) sort.Sort(&worstConns{ - c: t.Conns, - t: t, + c: t.Conns, + t: t, + cl: cl, }) for _, c := range t.Conns { c.WriteStatus(w, t) @@ -685,8 +687,9 @@ func (t *torrent) wantChunk(r request) bool { } func (t *torrent) urgentChunkInPiece(piece int) bool { + p := pp.Integer(piece) for req := range t.urgent { - if int(req.Index) == piece { + if req.Index == p { return true } } diff --git a/worst_conns.go b/worst_conns.go index 69f67e93..0d7b9d0d 100644 --- a/worst_conns.go +++ b/worst_conns.go @@ -6,12 +6,13 @@ import ( // Implements heap functions such that [0] is the worst connection. type worstConns struct { - c []*connection - t *torrent + c []*connection + t *torrent + cl *Client } -func (me worstConns) Len() int { return len(me.c) } -func (me worstConns) Swap(i, j int) { me.c[i], me.c[j] = me.c[j], me.c[i] } +func (me *worstConns) Len() int { return len(me.c) } +func (me *worstConns) Swap(i, j int) { me.c[i], me.c[j] = me.c[j], me.c[i] } func (me *worstConns) Pop() (ret interface{}) { old := me.c @@ -26,37 +27,25 @@ func (me *worstConns) Push(x interface{}) { } type worstConnsSortKey struct { - // Peer has something we want. - useless bool - // A fabricated duration since peer was last helpful. - age time.Duration + useful bool + lastHelpful time.Time } func (me worstConnsSortKey) Less(other worstConnsSortKey) bool { - if me.useless != other.useless { - return me.useless + if me.useful != other.useful { + return !me.useful } - return me.age > other.age + return me.lastHelpful.Before(other.lastHelpful) } -func (me worstConns) key(i int) (key worstConnsSortKey) { +func (me *worstConns) key(i int) (key worstConnsSortKey) { c := me.c[i] - // Peer has had time to declare what they have. - if time.Now().Sub(c.completedHandshake) >= 30*time.Second { - if !me.t.haveInfo() { - key.useless = !c.supportsExtension("ut_metadata") - } else { - if !me.t.connHasWantedPieces(c) { - key.useless = true - } - } + key.useful = me.cl.usefulConn(me.t, c) + if me.cl.seeding(me.t) { + key.lastHelpful = c.lastChunkSent + } else { + key.lastHelpful = c.lastUsefulChunkReceived } - key.age = time.Duration(1+3*c.UnwantedChunksReceived) * time.Now().Sub(func() time.Time { - if !c.lastUsefulChunkReceived.IsZero() { - return c.lastUsefulChunkReceived - } - return c.completedHandshake.Add(-time.Minute) - }()) / time.Duration(1+c.UsefulChunksReceived) return }