From 90348f6a489d597c5e31efb7dd9ebd81b0639630 Mon Sep 17 00:00:00 2001 From: Matt Joiner Date: Mon, 25 Jan 2016 07:22:33 +1100 Subject: [PATCH] Add pending write helpers --- client.go | 6 +----- piece.go | 26 ++++++++++++++++++++++++++ reader.go | 6 +----- torrent.go | 6 +----- 4 files changed, 29 insertions(+), 15 deletions(-) diff --git a/client.go b/client.go index 007186e9..422180de 100644 --- a/client.go +++ b/client.go @@ -1357,11 +1357,7 @@ func (me *Client) sendChunk(t *torrent, c *connection, r request) error { c.chunksSent++ b := make([]byte, r.Length) tp := &t.Pieces[r.Index] - tp.pendingWritesMutex.Lock() - for tp.pendingWrites != 0 { - tp.noPendingWrites.Wait() - } - tp.pendingWritesMutex.Unlock() + tp.waitNoPendingWrites() p := t.Info.Piece(int(r.Index)) n, err := dataReadAt(t.data, b, p.Offset()+int64(r.Begin)) if err != nil { diff --git a/piece.go b/piece.go index 518e76bb..1c86b7ea 100644 --- a/piece.go +++ b/piece.go @@ -92,3 +92,29 @@ func (p *piece) shuffledPendingChunkSpecs(t *torrent, piece int) (css []chunkSpe } return } + +func (p *piece) incrementPendingWrites() { + p.pendingWritesMutex.Lock() + p.pendingWrites++ + p.pendingWritesMutex.Unlock() +} + +func (p *piece) decrementPendingWrites() { + p.pendingWritesMutex.Lock() + if p.pendingWrites == 0 { + panic("assertion") + } + p.pendingWrites-- + if p.pendingWrites == 0 { + p.noPendingWrites.Broadcast() + } + p.pendingWritesMutex.Unlock() +} + +func (p *piece) waitNoPendingWrites() { + p.pendingWritesMutex.Lock() + for p.pendingWrites != 0 { + p.noPendingWrites.Wait() + } + p.pendingWritesMutex.Unlock() +} diff --git a/reader.go b/reader.go index 3d04b35c..8c3fa56f 100644 --- a/reader.go +++ b/reader.go @@ -118,11 +118,7 @@ again: if int64(len(b1)) > ip.Length()-po { b1 = b1[:ip.Length()-po] } - tp.pendingWritesMutex.Lock() - for tp.pendingWrites != 0 { - tp.noPendingWrites.Wait() - } - tp.pendingWritesMutex.Unlock() + tp.waitNoPendingWrites() n, err = dataReadAt(r.t.torrent.data, b1, pos) if n != 0 { err = nil diff --git a/torrent.go b/torrent.go index 924561cd..38990144 100644 --- a/torrent.go +++ b/torrent.go @@ -645,11 +645,7 @@ func (t *torrent) pieceLength(piece int) (len_ pp.Integer) { func (t *torrent) hashPiece(piece int) (ps pieceSum) { hash := pieceHash.New() p := &t.Pieces[piece] - p.pendingWritesMutex.Lock() - for p.pendingWrites != 0 { - p.noPendingWrites.Wait() - } - p.pendingWritesMutex.Unlock() + p.waitNoPendingWrites() pl := t.Info.Piece(int(piece)).Length() n, err := t.data.WriteSectionTo(hash, int64(piece)*t.Info.PieceLength, pl) if err != nil {