Move a bunch of Client methods onto more appropriate types

This commit is contained in:
Matt Joiner 2016-11-22 21:12:53 +11:00
parent 389898bb84
commit ebbd555e7b
3 changed files with 175 additions and 179 deletions

172
client.go
View File

@ -42,7 +42,7 @@ func (cl *Client) queuePieceCheck(t *Torrent, pieceIndex int) {
}
piece.QueuedForHash = true
t.publishPieceChange(pieceIndex)
go cl.verifyPiece(t, pieceIndex)
go t.verifyPiece(pieceIndex)
}
// Queue a piece check if one isn't already queued, and the piece has never
@ -1081,64 +1081,6 @@ func (cl *Client) gotMetadataExtensionMsg(payload []byte, t *Torrent, c *connect
}
}
// Also handles choking and unchoking of the remote peer.
func (cl *Client) upload(t *Torrent, c *connection) {
if cl.config.NoUpload {
return
}
if !c.PeerInterested {
return
}
seeding := t.seeding()
if !seeding && !t.connHasWantedPieces(c) {
// There's no reason to upload to this peer.
return
}
// Breaking or completing this loop means we don't want to upload to the
// peer anymore, and we choke them.
another:
for seeding || c.chunksSent < c.UsefulChunksReceived+6 {
// We want to upload to the peer.
c.Unchoke()
for r := range c.PeerRequests {
res := cl.uploadLimit.ReserveN(time.Now(), int(r.Length))
delay := res.Delay()
if delay > 0 {
res.Cancel()
go func() {
time.Sleep(delay)
cl.mu.Lock()
defer cl.mu.Unlock()
cl.upload(t, c)
}()
return
}
err := cl.sendChunk(t, c, r)
if err != nil {
i := int(r.Index)
if t.pieceComplete(i) {
t.updatePieceCompletion(i)
if !t.pieceComplete(i) {
// We had the piece, but not anymore.
break another
}
}
log.Printf("error sending chunk %+v to peer: %s", r, err)
// If we failed to send a chunk, choke the peer to ensure they
// flush all their requests. We've probably dropped a piece,
// but there's no way to communicate this to the peer. If they
// ask for it again, we'll kick them to allow us to send them
// an updated bitfield.
break another
}
delete(c.PeerRequests, r)
goto another
}
return
}
c.Choke()
}
func (cl *Client) sendChunk(t *Torrent, c *connection, r request) error {
// Count the chunk being sent, even if it isn't.
b := make([]byte, r.Length)
@ -1332,118 +1274,6 @@ func (cl *Client) WaitAll() bool {
return true
}
// Return the connections that touched a piece, and clear the entry while
// doing it.
func (cl *Client) reapPieceTouches(t *Torrent, piece int) (ret []*connection) {
for _, c := range t.conns {
if _, ok := c.peerTouchedPieces[piece]; ok {
ret = append(ret, c)
delete(c.peerTouchedPieces, piece)
}
}
return
}
func (cl *Client) pieceHashed(t *Torrent, piece int, correct bool) {
if t.closed.IsSet() {
return
}
p := &t.pieces[piece]
if p.EverHashed {
// Don't score the first time a piece is hashed, it could be an
// initial check.
if correct {
pieceHashedCorrect.Add(1)
} else {
log.Printf("%s: piece %d (%x) failed hash", t, piece, p.Hash)
pieceHashedNotCorrect.Add(1)
}
}
p.EverHashed = true
touchers := cl.reapPieceTouches(t, piece)
if correct {
for _, c := range touchers {
c.goodPiecesDirtied++
}
err := p.Storage().MarkComplete()
if err != nil {
log.Printf("%T: error completing piece %d: %s", t.storage, piece, err)
}
t.updatePieceCompletion(piece)
} else if len(touchers) != 0 {
log.Printf("dropping and banning %d conns that touched piece", len(touchers))
for _, c := range touchers {
c.badPiecesDirtied++
t.cl.banPeerIP(missinggo.AddrIP(c.remoteAddr()))
t.dropConnection(c)
}
}
cl.pieceChanged(t, piece)
}
func (cl *Client) onCompletedPiece(t *Torrent, piece int) {
t.pendingPieces.Remove(piece)
t.pendAllChunkSpecs(piece)
for _, conn := range t.conns {
conn.Have(piece)
for r := range conn.Requests {
if int(r.Index) == piece {
conn.Cancel(r)
}
}
// Could check here if peer doesn't have piece, but due to caching
// some peers may have said they have a piece but they don't.
cl.upload(t, conn)
}
}
func (cl *Client) onFailedPiece(t *Torrent, piece int) {
if t.pieceAllDirty(piece) {
t.pendAllChunkSpecs(piece)
}
if !t.wantPieceIndex(piece) {
return
}
cl.openNewConns(t)
for _, conn := range t.conns {
if conn.PeerHasPiece(piece) {
conn.updateRequests()
}
}
}
func (cl *Client) pieceChanged(t *Torrent, piece int) {
correct := t.pieceComplete(piece)
defer cl.event.Broadcast()
if correct {
cl.onCompletedPiece(t, piece)
} else {
cl.onFailedPiece(t, piece)
}
t.updatePiecePriority(piece)
}
func (cl *Client) verifyPiece(t *Torrent, piece int) {
cl.mu.Lock()
defer cl.mu.Unlock()
p := &t.pieces[piece]
for p.Hashing || t.storage == nil {
cl.event.Wait()
}
p.QueuedForHash = false
if t.closed.IsSet() || t.pieceComplete(piece) {
t.updatePiecePriority(piece)
return
}
p.Hashing = true
t.publishPieceChange(piece)
cl.mu.Unlock()
sum := t.hashPiece(piece)
cl.mu.Lock()
p.Hashing = false
cl.pieceHashed(t, piece, sum == p.Hash)
}
// Returns handles to all the torrents loaded in the Client.
func (cl *Client) Torrents() (ret []*Torrent) {
cl.mu.Lock()

View File

@ -741,7 +741,7 @@ func (c *connection) mainReadLoop() error {
cl.peerUnchoked(t, c)
case pp.Interested:
c.PeerInterested = true
cl.upload(t, c)
c.upload()
case pp.NotInterested:
c.PeerInterested = false
c.Choke()
@ -767,7 +767,7 @@ func (c *connection) mainReadLoop() error {
c.PeerRequests = make(map[request]struct{}, maxRequests)
}
c.PeerRequests[newRequest(msg.Index, msg.Begin, msg.Length)] = struct{}{}
cl.upload(t, c)
c.upload()
case pp.Cancel:
req := newRequest(msg.Index, msg.Begin, msg.Length)
if !c.PeerCancel(req) {
@ -955,7 +955,7 @@ func (c *connection) receiveChunk(msg *pp.Message) {
c.UsefulChunksReceived++
c.lastUsefulChunkReceived = time.Now()
cl.upload(t, c)
c.upload()
// Need to record that it hasn't been written yet, before we attempt to do
// anything with it.
@ -1000,3 +1000,63 @@ func (c *connection) receiveChunk(msg *pp.Message) {
t.publishPieceChange(int(req.Index))
return
}
// Also handles choking and unchoking of the remote peer.
func (c *connection) upload() {
t := c.t
cl := t.cl
if cl.config.NoUpload {
return
}
if !c.PeerInterested {
return
}
seeding := t.seeding()
if !seeding && !t.connHasWantedPieces(c) {
// There's no reason to upload to this peer.
return
}
// Breaking or completing this loop means we don't want to upload to the
// peer anymore, and we choke them.
another:
for seeding || c.chunksSent < c.UsefulChunksReceived+6 {
// We want to upload to the peer.
c.Unchoke()
for r := range c.PeerRequests {
res := cl.uploadLimit.ReserveN(time.Now(), int(r.Length))
delay := res.Delay()
if delay > 0 {
res.Cancel()
go func() {
time.Sleep(delay)
cl.mu.Lock()
defer cl.mu.Unlock()
c.upload()
}()
return
}
err := cl.sendChunk(t, c, r)
if err != nil {
i := int(r.Index)
if t.pieceComplete(i) {
t.updatePieceCompletion(i)
if !t.pieceComplete(i) {
// We had the piece, but not anymore.
break another
}
}
log.Printf("error sending chunk %+v to peer: %s", r, err)
// If we failed to send a chunk, choke the peer to ensure they
// flush all their requests. We've probably dropped a piece,
// but there's no way to communicate this to the peer. If they
// ask for it again, we'll kick them to allow us to send them
// an updated bitfield.
break another
}
delete(c.PeerRequests, r)
goto another
}
return
}
c.Choke()
}

View File

@ -296,10 +296,6 @@ func (t *Torrent) setInfoBytes(b []byte) error {
return nil
}
func (t *Torrent) verifyPiece(piece int) {
t.cl.verifyPiece(t, piece)
}
func (t *Torrent) haveAllMetadataPieces() bool {
if t.haveInfo() {
return true
@ -1009,7 +1005,14 @@ func (t *Torrent) pendRequest(req request) {
}
func (t *Torrent) pieceChanged(piece int) {
t.cl.pieceChanged(t, piece)
correct := t.pieceComplete(piece)
defer t.cl.event.Broadcast()
if correct {
t.onCompletedPiece(piece)
} else {
t.onFailedPiece(piece)
}
t.updatePiecePriority(piece)
}
func (t *Torrent) openNewConns() {
@ -1415,3 +1418,106 @@ func (t *Torrent) SetMaxEstablishedConns(max int) (oldMax int) {
func (t *Torrent) mu() missinggo.RWLocker {
return &t.cl.mu
}
func (t *Torrent) pieceHashed(piece int, correct bool) {
if t.closed.IsSet() {
return
}
p := &t.pieces[piece]
if p.EverHashed {
// Don't score the first time a piece is hashed, it could be an
// initial check.
if correct {
pieceHashedCorrect.Add(1)
} else {
log.Printf("%s: piece %d (%x) failed hash", t, piece, p.Hash)
pieceHashedNotCorrect.Add(1)
}
}
p.EverHashed = true
touchers := t.reapPieceTouches(piece)
if correct {
for _, c := range touchers {
c.goodPiecesDirtied++
}
err := p.Storage().MarkComplete()
if err != nil {
log.Printf("%T: error completing piece %d: %s", t.storage, piece, err)
}
t.updatePieceCompletion(piece)
} else if len(touchers) != 0 {
log.Printf("dropping and banning %d conns that touched piece", len(touchers))
for _, c := range touchers {
c.badPiecesDirtied++
t.cl.banPeerIP(missinggo.AddrIP(c.remoteAddr()))
t.dropConnection(c)
}
}
t.pieceChanged(piece)
}
func (t *Torrent) onCompletedPiece(piece int) {
t.pendingPieces.Remove(piece)
t.pendAllChunkSpecs(piece)
for _, conn := range t.conns {
conn.Have(piece)
for r := range conn.Requests {
if int(r.Index) == piece {
conn.Cancel(r)
}
}
// Could check here if peer doesn't have piece, but due to caching
// some peers may have said they have a piece but they don't.
conn.upload()
}
}
func (t *Torrent) onFailedPiece(piece int) {
cl := t.cl
if t.pieceAllDirty(piece) {
t.pendAllChunkSpecs(piece)
}
if !t.wantPieceIndex(piece) {
return
}
cl.openNewConns(t)
for _, conn := range t.conns {
if conn.PeerHasPiece(piece) {
conn.updateRequests()
}
}
}
func (t *Torrent) verifyPiece(piece int) {
cl := t.cl
cl.mu.Lock()
defer cl.mu.Unlock()
p := &t.pieces[piece]
for p.Hashing || t.storage == nil {
cl.event.Wait()
}
p.QueuedForHash = false
if t.closed.IsSet() || t.pieceComplete(piece) {
t.updatePiecePriority(piece)
return
}
p.Hashing = true
t.publishPieceChange(piece)
cl.mu.Unlock()
sum := t.hashPiece(piece)
cl.mu.Lock()
p.Hashing = false
t.pieceHashed(piece, sum == p.Hash)
}
// Return the connections that touched a piece, and clear the entry while
// doing it.
func (t *Torrent) reapPieceTouches(piece int) (ret []*connection) {
for _, c := range t.conns {
if _, ok := c.peerTouchedPieces[piece]; ok {
ret = append(ret, c)
delete(c.peerTouchedPieces, piece)
}
}
return
}