From ebbd555e7b2eb9755cc8dfef55eed26d49aa0be2 Mon Sep 17 00:00:00 2001 From: Matt Joiner Date: Tue, 22 Nov 2016 21:12:53 +1100 Subject: [PATCH] Move a bunch of Client methods onto more appropriate types --- client.go | 172 +------------------------------------------------- connection.go | 66 ++++++++++++++++++- torrent.go | 116 ++++++++++++++++++++++++++++++++-- 3 files changed, 175 insertions(+), 179 deletions(-) diff --git a/client.go b/client.go index 7ce5686f..8c356eea 100644 --- a/client.go +++ b/client.go @@ -42,7 +42,7 @@ func (cl *Client) queuePieceCheck(t *Torrent, pieceIndex int) { } piece.QueuedForHash = true t.publishPieceChange(pieceIndex) - go cl.verifyPiece(t, pieceIndex) + go t.verifyPiece(pieceIndex) } // Queue a piece check if one isn't already queued, and the piece has never @@ -1081,64 +1081,6 @@ func (cl *Client) gotMetadataExtensionMsg(payload []byte, t *Torrent, c *connect } } -// Also handles choking and unchoking of the remote peer. -func (cl *Client) upload(t *Torrent, c *connection) { - if cl.config.NoUpload { - return - } - if !c.PeerInterested { - return - } - seeding := t.seeding() - if !seeding && !t.connHasWantedPieces(c) { - // There's no reason to upload to this peer. - return - } - // Breaking or completing this loop means we don't want to upload to the - // peer anymore, and we choke them. -another: - for seeding || c.chunksSent < c.UsefulChunksReceived+6 { - // We want to upload to the peer. - c.Unchoke() - for r := range c.PeerRequests { - res := cl.uploadLimit.ReserveN(time.Now(), int(r.Length)) - delay := res.Delay() - if delay > 0 { - res.Cancel() - go func() { - time.Sleep(delay) - cl.mu.Lock() - defer cl.mu.Unlock() - cl.upload(t, c) - }() - return - } - err := cl.sendChunk(t, c, r) - if err != nil { - i := int(r.Index) - if t.pieceComplete(i) { - t.updatePieceCompletion(i) - if !t.pieceComplete(i) { - // We had the piece, but not anymore. - break another - } - } - log.Printf("error sending chunk %+v to peer: %s", r, err) - // If we failed to send a chunk, choke the peer to ensure they - // flush all their requests. We've probably dropped a piece, - // but there's no way to communicate this to the peer. If they - // ask for it again, we'll kick them to allow us to send them - // an updated bitfield. - break another - } - delete(c.PeerRequests, r) - goto another - } - return - } - c.Choke() -} - func (cl *Client) sendChunk(t *Torrent, c *connection, r request) error { // Count the chunk being sent, even if it isn't. b := make([]byte, r.Length) @@ -1332,118 +1274,6 @@ func (cl *Client) WaitAll() bool { return true } -// Return the connections that touched a piece, and clear the entry while -// doing it. -func (cl *Client) reapPieceTouches(t *Torrent, piece int) (ret []*connection) { - for _, c := range t.conns { - if _, ok := c.peerTouchedPieces[piece]; ok { - ret = append(ret, c) - delete(c.peerTouchedPieces, piece) - } - } - return -} - -func (cl *Client) pieceHashed(t *Torrent, piece int, correct bool) { - if t.closed.IsSet() { - return - } - p := &t.pieces[piece] - if p.EverHashed { - // Don't score the first time a piece is hashed, it could be an - // initial check. - if correct { - pieceHashedCorrect.Add(1) - } else { - log.Printf("%s: piece %d (%x) failed hash", t, piece, p.Hash) - pieceHashedNotCorrect.Add(1) - } - } - p.EverHashed = true - touchers := cl.reapPieceTouches(t, piece) - if correct { - for _, c := range touchers { - c.goodPiecesDirtied++ - } - err := p.Storage().MarkComplete() - if err != nil { - log.Printf("%T: error completing piece %d: %s", t.storage, piece, err) - } - t.updatePieceCompletion(piece) - } else if len(touchers) != 0 { - log.Printf("dropping and banning %d conns that touched piece", len(touchers)) - for _, c := range touchers { - c.badPiecesDirtied++ - t.cl.banPeerIP(missinggo.AddrIP(c.remoteAddr())) - t.dropConnection(c) - } - } - cl.pieceChanged(t, piece) -} - -func (cl *Client) onCompletedPiece(t *Torrent, piece int) { - t.pendingPieces.Remove(piece) - t.pendAllChunkSpecs(piece) - for _, conn := range t.conns { - conn.Have(piece) - for r := range conn.Requests { - if int(r.Index) == piece { - conn.Cancel(r) - } - } - // Could check here if peer doesn't have piece, but due to caching - // some peers may have said they have a piece but they don't. - cl.upload(t, conn) - } -} - -func (cl *Client) onFailedPiece(t *Torrent, piece int) { - if t.pieceAllDirty(piece) { - t.pendAllChunkSpecs(piece) - } - if !t.wantPieceIndex(piece) { - return - } - cl.openNewConns(t) - for _, conn := range t.conns { - if conn.PeerHasPiece(piece) { - conn.updateRequests() - } - } -} - -func (cl *Client) pieceChanged(t *Torrent, piece int) { - correct := t.pieceComplete(piece) - defer cl.event.Broadcast() - if correct { - cl.onCompletedPiece(t, piece) - } else { - cl.onFailedPiece(t, piece) - } - t.updatePiecePriority(piece) -} - -func (cl *Client) verifyPiece(t *Torrent, piece int) { - cl.mu.Lock() - defer cl.mu.Unlock() - p := &t.pieces[piece] - for p.Hashing || t.storage == nil { - cl.event.Wait() - } - p.QueuedForHash = false - if t.closed.IsSet() || t.pieceComplete(piece) { - t.updatePiecePriority(piece) - return - } - p.Hashing = true - t.publishPieceChange(piece) - cl.mu.Unlock() - sum := t.hashPiece(piece) - cl.mu.Lock() - p.Hashing = false - cl.pieceHashed(t, piece, sum == p.Hash) -} - // Returns handles to all the torrents loaded in the Client. func (cl *Client) Torrents() (ret []*Torrent) { cl.mu.Lock() diff --git a/connection.go b/connection.go index fbc888c3..06cb7ca8 100644 --- a/connection.go +++ b/connection.go @@ -741,7 +741,7 @@ func (c *connection) mainReadLoop() error { cl.peerUnchoked(t, c) case pp.Interested: c.PeerInterested = true - cl.upload(t, c) + c.upload() case pp.NotInterested: c.PeerInterested = false c.Choke() @@ -767,7 +767,7 @@ func (c *connection) mainReadLoop() error { c.PeerRequests = make(map[request]struct{}, maxRequests) } c.PeerRequests[newRequest(msg.Index, msg.Begin, msg.Length)] = struct{}{} - cl.upload(t, c) + c.upload() case pp.Cancel: req := newRequest(msg.Index, msg.Begin, msg.Length) if !c.PeerCancel(req) { @@ -955,7 +955,7 @@ func (c *connection) receiveChunk(msg *pp.Message) { c.UsefulChunksReceived++ c.lastUsefulChunkReceived = time.Now() - cl.upload(t, c) + c.upload() // Need to record that it hasn't been written yet, before we attempt to do // anything with it. @@ -1000,3 +1000,63 @@ func (c *connection) receiveChunk(msg *pp.Message) { t.publishPieceChange(int(req.Index)) return } + +// Also handles choking and unchoking of the remote peer. +func (c *connection) upload() { + t := c.t + cl := t.cl + if cl.config.NoUpload { + return + } + if !c.PeerInterested { + return + } + seeding := t.seeding() + if !seeding && !t.connHasWantedPieces(c) { + // There's no reason to upload to this peer. + return + } + // Breaking or completing this loop means we don't want to upload to the + // peer anymore, and we choke them. +another: + for seeding || c.chunksSent < c.UsefulChunksReceived+6 { + // We want to upload to the peer. + c.Unchoke() + for r := range c.PeerRequests { + res := cl.uploadLimit.ReserveN(time.Now(), int(r.Length)) + delay := res.Delay() + if delay > 0 { + res.Cancel() + go func() { + time.Sleep(delay) + cl.mu.Lock() + defer cl.mu.Unlock() + c.upload() + }() + return + } + err := cl.sendChunk(t, c, r) + if err != nil { + i := int(r.Index) + if t.pieceComplete(i) { + t.updatePieceCompletion(i) + if !t.pieceComplete(i) { + // We had the piece, but not anymore. + break another + } + } + log.Printf("error sending chunk %+v to peer: %s", r, err) + // If we failed to send a chunk, choke the peer to ensure they + // flush all their requests. We've probably dropped a piece, + // but there's no way to communicate this to the peer. If they + // ask for it again, we'll kick them to allow us to send them + // an updated bitfield. + break another + } + delete(c.PeerRequests, r) + goto another + } + return + } + c.Choke() +} diff --git a/torrent.go b/torrent.go index 2577960c..c2baf9c1 100644 --- a/torrent.go +++ b/torrent.go @@ -296,10 +296,6 @@ func (t *Torrent) setInfoBytes(b []byte) error { return nil } -func (t *Torrent) verifyPiece(piece int) { - t.cl.verifyPiece(t, piece) -} - func (t *Torrent) haveAllMetadataPieces() bool { if t.haveInfo() { return true @@ -1009,7 +1005,14 @@ func (t *Torrent) pendRequest(req request) { } func (t *Torrent) pieceChanged(piece int) { - t.cl.pieceChanged(t, piece) + correct := t.pieceComplete(piece) + defer t.cl.event.Broadcast() + if correct { + t.onCompletedPiece(piece) + } else { + t.onFailedPiece(piece) + } + t.updatePiecePriority(piece) } func (t *Torrent) openNewConns() { @@ -1415,3 +1418,106 @@ func (t *Torrent) SetMaxEstablishedConns(max int) (oldMax int) { func (t *Torrent) mu() missinggo.RWLocker { return &t.cl.mu } + +func (t *Torrent) pieceHashed(piece int, correct bool) { + if t.closed.IsSet() { + return + } + p := &t.pieces[piece] + if p.EverHashed { + // Don't score the first time a piece is hashed, it could be an + // initial check. + if correct { + pieceHashedCorrect.Add(1) + } else { + log.Printf("%s: piece %d (%x) failed hash", t, piece, p.Hash) + pieceHashedNotCorrect.Add(1) + } + } + p.EverHashed = true + touchers := t.reapPieceTouches(piece) + if correct { + for _, c := range touchers { + c.goodPiecesDirtied++ + } + err := p.Storage().MarkComplete() + if err != nil { + log.Printf("%T: error completing piece %d: %s", t.storage, piece, err) + } + t.updatePieceCompletion(piece) + } else if len(touchers) != 0 { + log.Printf("dropping and banning %d conns that touched piece", len(touchers)) + for _, c := range touchers { + c.badPiecesDirtied++ + t.cl.banPeerIP(missinggo.AddrIP(c.remoteAddr())) + t.dropConnection(c) + } + } + t.pieceChanged(piece) +} + +func (t *Torrent) onCompletedPiece(piece int) { + t.pendingPieces.Remove(piece) + t.pendAllChunkSpecs(piece) + for _, conn := range t.conns { + conn.Have(piece) + for r := range conn.Requests { + if int(r.Index) == piece { + conn.Cancel(r) + } + } + // Could check here if peer doesn't have piece, but due to caching + // some peers may have said they have a piece but they don't. + conn.upload() + } +} + +func (t *Torrent) onFailedPiece(piece int) { + cl := t.cl + if t.pieceAllDirty(piece) { + t.pendAllChunkSpecs(piece) + } + if !t.wantPieceIndex(piece) { + return + } + cl.openNewConns(t) + for _, conn := range t.conns { + if conn.PeerHasPiece(piece) { + conn.updateRequests() + } + } +} + +func (t *Torrent) verifyPiece(piece int) { + cl := t.cl + cl.mu.Lock() + defer cl.mu.Unlock() + p := &t.pieces[piece] + for p.Hashing || t.storage == nil { + cl.event.Wait() + } + p.QueuedForHash = false + if t.closed.IsSet() || t.pieceComplete(piece) { + t.updatePiecePriority(piece) + return + } + p.Hashing = true + t.publishPieceChange(piece) + cl.mu.Unlock() + sum := t.hashPiece(piece) + cl.mu.Lock() + p.Hashing = false + t.pieceHashed(piece, sum == p.Hash) +} + +// Return the connections that touched a piece, and clear the entry while +// doing it. +func (t *Torrent) reapPieceTouches(piece int) (ret []*connection) { + for _, c := range t.conns { + if _, ok := c.peerTouchedPieces[piece]; ok { + ret = append(ret, c) + delete(c.peerTouchedPieces, piece) + } + } + return +}