Add pending write helpers

This commit is contained in:
Matt Joiner 2016-01-25 07:22:33 +11:00
parent 94d764e6be
commit 90348f6a48
4 changed files with 29 additions and 15 deletions

View File

@ -1357,11 +1357,7 @@ func (me *Client) sendChunk(t *torrent, c *connection, r request) error {
c.chunksSent++ c.chunksSent++
b := make([]byte, r.Length) b := make([]byte, r.Length)
tp := &t.Pieces[r.Index] tp := &t.Pieces[r.Index]
tp.pendingWritesMutex.Lock() tp.waitNoPendingWrites()
for tp.pendingWrites != 0 {
tp.noPendingWrites.Wait()
}
tp.pendingWritesMutex.Unlock()
p := t.Info.Piece(int(r.Index)) p := t.Info.Piece(int(r.Index))
n, err := dataReadAt(t.data, b, p.Offset()+int64(r.Begin)) n, err := dataReadAt(t.data, b, p.Offset()+int64(r.Begin))
if err != nil { if err != nil {

View File

@ -92,3 +92,29 @@ func (p *piece) shuffledPendingChunkSpecs(t *torrent, piece int) (css []chunkSpe
} }
return return
} }
func (p *piece) incrementPendingWrites() {
p.pendingWritesMutex.Lock()
p.pendingWrites++
p.pendingWritesMutex.Unlock()
}
func (p *piece) decrementPendingWrites() {
p.pendingWritesMutex.Lock()
if p.pendingWrites == 0 {
panic("assertion")
}
p.pendingWrites--
if p.pendingWrites == 0 {
p.noPendingWrites.Broadcast()
}
p.pendingWritesMutex.Unlock()
}
func (p *piece) waitNoPendingWrites() {
p.pendingWritesMutex.Lock()
for p.pendingWrites != 0 {
p.noPendingWrites.Wait()
}
p.pendingWritesMutex.Unlock()
}

View File

@ -118,11 +118,7 @@ again:
if int64(len(b1)) > ip.Length()-po { if int64(len(b1)) > ip.Length()-po {
b1 = b1[:ip.Length()-po] b1 = b1[:ip.Length()-po]
} }
tp.pendingWritesMutex.Lock() tp.waitNoPendingWrites()
for tp.pendingWrites != 0 {
tp.noPendingWrites.Wait()
}
tp.pendingWritesMutex.Unlock()
n, err = dataReadAt(r.t.torrent.data, b1, pos) n, err = dataReadAt(r.t.torrent.data, b1, pos)
if n != 0 { if n != 0 {
err = nil err = nil

View File

@ -645,11 +645,7 @@ func (t *torrent) pieceLength(piece int) (len_ pp.Integer) {
func (t *torrent) hashPiece(piece int) (ps pieceSum) { func (t *torrent) hashPiece(piece int) (ps pieceSum) {
hash := pieceHash.New() hash := pieceHash.New()
p := &t.Pieces[piece] p := &t.Pieces[piece]
p.pendingWritesMutex.Lock() p.waitNoPendingWrites()
for p.pendingWrites != 0 {
p.noPendingWrites.Wait()
}
p.pendingWritesMutex.Unlock()
pl := t.Info.Piece(int(piece)).Length() pl := t.Info.Piece(int(piece)).Length()
n, err := t.data.WriteSectionTo(hash, int64(piece)*t.Info.PieceLength, pl) n, err := t.data.WriteSectionTo(hash, int64(piece)*t.Info.PieceLength, pl)
if err != nil { if err != nil {