From 7e9fe4f4470a601d015fe14eefc0392fabe67cbc Mon Sep 17 00:00:00 2001 From: Matt Joiner Date: Wed, 15 Jul 2015 16:00:59 +1000 Subject: [PATCH] Store chunk data without holding client lock --- client.go | 19 +++++++++++++------ piece.go | 1 + reader.go | 8 ++++++++ torrent.go | 2 ++ 4 files changed, 24 insertions(+), 6 deletions(-) diff --git a/client.go b/client.go index 63d9e0ca..2c4d4d42 100644 --- a/client.go +++ b/client.go @@ -1432,6 +1432,7 @@ another: func (me *Client) sendChunk(t *torrent, c *connection, r request) error { b := make([]byte, r.Length) + t.Pieces[r.Index].pendingWrites.Wait() p := t.Info.Piece(int(r.Index)) n, err := dataReadAt(t.data, b, p.Offset()+int64(r.Begin)) if err != nil { @@ -2523,12 +2524,18 @@ func (me *Client) downloadedChunk(t *torrent, c *connection, msg *pp.Message) er me.upload(t, c) - // Write the chunk out. - err := t.writeChunk(int(msg.Index), int64(msg.Begin), msg.Piece) - if err != nil { - log.Printf("error writing chunk: %s", err) - return nil - } + piece.pendingWrites.Add(1) + go func() { + defer piece.pendingWrites.Done() + // Write the chunk out. + tr := perf.NewTimer() + err := t.writeChunk(int(msg.Index), int64(msg.Begin), msg.Piece) + if err != nil { + log.Printf("error writing chunk: %s", err) + return + } + tr.Stop("write chunk") + }() // log.Println("got chunk", req) piece.Event.Broadcast() diff --git a/piece.go b/piece.go index e326c2b4..9a3a1729 100644 --- a/piece.go +++ b/piece.go @@ -30,6 +30,7 @@ type piece struct { EverHashed bool Event sync.Cond Priority piecePriority + pendingWrites sync.WaitGroup } func (p *piece) pendingChunk(cs chunkSpec, chunkSize pp.Integer) bool { diff --git a/reader.go b/reader.go index 02dee969..0ba8d511 100644 --- a/reader.go +++ b/reader.go @@ -121,6 +121,14 @@ again: avail := r.available(pos, int64(len(b))) // log.Println("available", avail) b1 := b[:avail] + pi := int(pos / r.t.Info().PieceLength) + tp := r.t.torrent.Pieces[pi] + ip := r.t.Info().Piece(pi) + po := pos % ip.Length() + if int64(len(b1)) > ip.Length()-po { + b1 = b1[:ip.Length()-po] + } + tp.pendingWrites.Wait() n, err = dataReadAt(r.t.data, b1, pos) if n != 0 { err = nil diff --git a/torrent.go b/torrent.go index c4bb5990..43d45d5c 100644 --- a/torrent.go +++ b/torrent.go @@ -638,6 +638,8 @@ func (t *torrent) pieceLength(piece int) (len_ pp.Integer) { func (t *torrent) hashPiece(piece pp.Integer) (ps pieceSum) { hash := pieceHash.New() + p := t.Pieces[piece] + p.pendingWrites.Wait() t.data.WriteSectionTo(hash, int64(piece)*t.Info.PieceLength, t.Info.PieceLength) util.CopyExact(ps[:], hash.Sum(nil)) return