Restrict the number of concurrent piece hashes

This fixes bad behaviour where running out of file descriptors, or overloading the system with goroutines and concurrent I/O, could cause newly started torrents to fail to hash data.
Matt Joiner 2019-08-21 13:31:33 +10:00
parent ee5e459ea3
commit 4850ce6ab3
1 changed file with 49 additions and 28 deletions
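
In outline, the change replaces the one-goroutine-per-queued-piece verifyPiece with a small worker pool: an activePieceHashes counter guarded by the client lock caps in-flight hashers at 2, and each hasher tries to start a replacement as it exits. Below is a minimal standalone sketch of that pattern; the hashLimiter type, the maxActive constant, and the stubbed-out hashing work are illustrative stand-ins, not this library's API.

package main

import (
	"fmt"
	"sync"
)

const maxActive = 2 // cap on concurrent hashers, mirroring the commit

// hashLimiter is a hypothetical stand-in for the Torrent fields the
// commit touches: a queue of piece indices and an active-worker count,
// both guarded by a single mutex (the client lock in the real code).
type hashLimiter struct {
	mu     sync.Mutex
	queue  []int
	active int
	wg     sync.WaitGroup
}

// tryCreateMorePieceHashers keeps starting workers until the cap is
// reached or the queue is empty. Caller must hold mu.
func (l *hashLimiter) tryCreateMorePieceHashers() {
	for l.active < maxActive && l.tryCreatePieceHasher() {
	}
}

// tryCreatePieceHasher pops one queued piece and starts a worker for
// it, reporting whether it did. Caller must hold mu.
func (l *hashLimiter) tryCreatePieceHasher() bool {
	if len(l.queue) == 0 {
		return false
	}
	pi := l.queue[0]
	l.queue = l.queue[1:]
	l.active++
	l.wg.Add(1)
	go l.pieceHasher(pi)
	return true
}

// pieceHasher does the slow work outside the lock, then re-enters the
// lock to decrement the count and start a replacement worker.
func (l *hashLimiter) pieceHasher(pi int) {
	defer l.wg.Done()
	fmt.Println("hashing piece", pi) // stand-in for hashing and verification

	l.mu.Lock()
	defer l.mu.Unlock()
	l.active--
	l.tryCreateMorePieceHashers()
}

func main() {
	l := &hashLimiter{queue: []int{0, 1, 2, 3, 4}}
	l.mu.Lock()
	l.tryCreateMorePieceHashers() // at most maxActive goroutines run at once
	l.mu.Unlock()
	l.wg.Wait()
}

Because a finishing worker starts its replacement while still holding the lock, the active count can never exceed the cap, and queued pieces drain without a dedicated dispatcher goroutine.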


@@ -131,6 +131,7 @@ type Torrent struct {
 	completedPieces bitmap.Bitmap
 	// Pieces that need to be hashed.
 	piecesQueuedForHash bitmap.Bitmap
+	activePieceHashes   int

 	// A pool of piece priorities []int for assignment to new connections.
 	// These "inclinations" are used to give connections preference for
@@ -382,6 +383,7 @@ func (t *Torrent) onSetInfo() {
 	t.updateWantPeersEvent()
 	t.pendingRequests = make(map[request]int)
 	t.lastRequested = make(map[request]*time.Timer)
+	t.tryCreateMorePieceHashers()
 }

 // Called when metadata for a torrent becomes available.
@@ -1503,11 +1505,13 @@ func (t *Torrent) SetMaxEstablishedConns(max int) (oldMax int) {
 }

 func (t *Torrent) pieceHashed(piece pieceIndex, correct bool) {
-	log.Fmsg("hashed piece %d", piece).Add("piece", piece).Add("passed", correct).AddValue(debugLogValue).Log(t.logger)
+	t.logger.Log(log.Fstr("hashed piece %d (passed=%t)", piece, correct).WithValues(debugLogValue))
+	p := t.piece(piece)
+	p.numVerifies++
+	t.cl.event.Broadcast()
 	if t.closed.IsSet() {
 		return
 	}
-	p := &t.pieces[piece]
 	touchers := t.reapPieceTouchers(piece)
 	if p.storageCompletionOk {
 		// Don't score the first time a piece is hashed, it could be an
@@ -1602,37 +1606,54 @@ func (t *Torrent) onIncompletePiece(piece pieceIndex) {
 	}
 }

-func (t *Torrent) verifyPiece(piece pieceIndex) {
-	cl := t.cl
-	cl.lock()
-	defer cl.unlock()
-	p := &t.pieces[piece]
-	defer func() {
-		p.numVerifies++
-		cl.event.Broadcast()
-	}()
-	for p.hashing || t.storage == nil {
-		cl.event.Wait()
+func (t *Torrent) tryCreateMorePieceHashers() {
+	for t.activePieceHashes < 2 && t.tryCreatePieceHasher() {
 	}
-	if !p.t.piecesQueuedForHash.Remove(bitmap.BitIndex(piece)) {
-		panic("piece was not queued")
+}
+
+func (t *Torrent) tryCreatePieceHasher() bool {
+	if t.storage == nil {
+		return false
 	}
-	t.updatePiecePriority(piece)
-	if t.closed.IsSet() {
-		return
+	pi, ok := t.getPieceToHash()
+	if !ok {
+		return false
 	}
+	p := t.piece(pi)
+	t.piecesQueuedForHash.Remove(pi)
 	p.hashing = true
-	t.publishPieceChange(piece)
-	t.updatePiecePriority(piece)
+	t.publishPieceChange(pi)
+	t.updatePiecePriority(pi)
 	t.storageLock.RLock()
-	cl.unlock()
-	sum := t.hashPiece(piece)
+	t.activePieceHashes++
+	go t.pieceHasher(pi)
+	return true
+}
+
+func (t *Torrent) getPieceToHash() (ret pieceIndex, ok bool) {
+	t.piecesQueuedForHash.IterTyped(func(i pieceIndex) bool {
+		if t.piece(i).hashing {
+			return true
+		}
+		ret = i
+		ok = true
+		return false
+	})
+	return
+}
+
+func (t *Torrent) pieceHasher(index pieceIndex) {
+	p := t.piece(index)
+	sum := t.hashPiece(index)
 	t.storageLock.RUnlock()
-	cl.lock()
+	t.cl.lock()
+	defer t.cl.unlock()
 	p.hashing = false
-	t.updatePiecePriority(piece)
-	t.pieceHashed(piece, sum == *p.hash)
-	t.publishPieceChange(piece)
+	t.updatePiecePriority(index)
+	t.pieceHashed(index, sum == *p.hash)
+	t.publishPieceChange(index)
+	t.activePieceHashes--
+	t.tryCreateMorePieceHashers()
 }

 // Return the connections that touched a piece, and clear the entries while
@@ -1655,14 +1676,14 @@ func (t *Torrent) connsAsSlice() (ret []*connection) {

 // Currently doesn't really queue, but should in the future.
 func (t *Torrent) queuePieceCheck(pieceIndex pieceIndex) {
-	piece := &t.pieces[pieceIndex]
+	piece := t.piece(pieceIndex)
 	if piece.queuedForHash() {
 		return
 	}
 	t.piecesQueuedForHash.Add(bitmap.BitIndex(pieceIndex))
 	t.publishPieceChange(pieceIndex)
 	t.updatePiecePriority(pieceIndex)
-	go t.verifyPiece(pieceIndex)
+	t.tryCreateMorePieceHashers()
 }

 // Forces all the pieces to be re-hashed. See also Piece.VerifyData.