Replacing pendingWrites WaitGroup with Mutex/Cond/int
Apparently I've been using WaitGroups wrong all along.
This commit is contained in:
parent
19021f1360
commit
085be622a0
20
client.go
20
client.go
|
@ -1440,7 +1440,12 @@ another:
|
|||
|
||||
func (me *Client) sendChunk(t *torrent, c *connection, r request) error {
|
||||
b := make([]byte, r.Length)
|
||||
t.Pieces[r.Index].pendingWrites.Wait()
|
||||
tp := t.Pieces[r.Index]
|
||||
tp.pendingWritesMutex.Lock()
|
||||
for tp.pendingWrites != 0 {
|
||||
tp.noPendingWrites.Wait()
|
||||
}
|
||||
tp.pendingWritesMutex.Unlock()
|
||||
p := t.Info.Piece(int(r.Index))
|
||||
n, err := dataReadAt(t.data, b, p.Offset()+int64(r.Begin))
|
||||
if err != nil {
|
||||
|
@ -2536,9 +2541,18 @@ func (me *Client) downloadedChunk(t *torrent, c *connection, msg *pp.Message) er
|
|||
|
||||
me.upload(t, c)
|
||||
|
||||
piece.pendingWrites.Add(1)
|
||||
piece.pendingWritesMutex.Lock()
|
||||
piece.pendingWrites++
|
||||
piece.pendingWritesMutex.Unlock()
|
||||
go func() {
|
||||
defer piece.pendingWrites.Done()
|
||||
defer func() {
|
||||
piece.pendingWritesMutex.Lock()
|
||||
piece.pendingWrites--
|
||||
if piece.pendingWrites == 0 {
|
||||
piece.noPendingWrites.Broadcast()
|
||||
}
|
||||
piece.pendingWritesMutex.Unlock()
|
||||
}()
|
||||
// Write the chunk out.
|
||||
tr := perf.NewTimer()
|
||||
err := t.writeChunk(int(msg.Index), int64(msg.Begin), msg.Piece)
|
||||
|
|
5
piece.go
5
piece.go
|
@ -30,7 +30,10 @@ type piece struct {
|
|||
EverHashed bool
|
||||
Event sync.Cond
|
||||
Priority piecePriority
|
||||
pendingWrites sync.WaitGroup
|
||||
|
||||
pendingWritesMutex sync.Mutex
|
||||
pendingWrites int
|
||||
noPendingWrites sync.Cond
|
||||
}
|
||||
|
||||
func (p *piece) pendingChunk(cs chunkSpec, chunkSize pp.Integer) bool {
|
||||
|
|
|
@ -128,7 +128,11 @@ again:
|
|||
if int64(len(b1)) > ip.Length()-po {
|
||||
b1 = b1[:ip.Length()-po]
|
||||
}
|
||||
tp.pendingWrites.Wait()
|
||||
tp.pendingWritesMutex.Lock()
|
||||
for tp.pendingWrites != 0 {
|
||||
tp.noPendingWrites.Wait()
|
||||
}
|
||||
tp.pendingWritesMutex.Unlock()
|
||||
n, err = dataReadAt(r.t.data, b1, pos)
|
||||
if n != 0 {
|
||||
err = nil
|
||||
|
|
|
@ -225,6 +225,7 @@ func (t *torrent) setMetadata(md *metainfo.Info, infoBytes []byte, eventLocker s
|
|||
for _, hash := range infoPieceHashes(md) {
|
||||
piece := &piece{}
|
||||
piece.Event.L = eventLocker
|
||||
piece.noPendingWrites.L = &piece.pendingWritesMutex
|
||||
missinggo.CopyExact(piece.Hash[:], hash)
|
||||
t.Pieces = append(t.Pieces, piece)
|
||||
}
|
||||
|
@ -638,7 +639,11 @@ func (t *torrent) pieceLength(piece int) (len_ pp.Integer) {
|
|||
func (t *torrent) hashPiece(piece pp.Integer) (ps pieceSum) {
|
||||
hash := pieceHash.New()
|
||||
p := t.Pieces[piece]
|
||||
p.pendingWrites.Wait()
|
||||
p.pendingWritesMutex.Lock()
|
||||
for p.pendingWrites != 0 {
|
||||
p.noPendingWrites.Wait()
|
||||
}
|
||||
p.pendingWritesMutex.Unlock()
|
||||
t.data.WriteSectionTo(hash, int64(piece)*t.Info.PieceLength, t.Info.PieceLength)
|
||||
missinggo.CopyExact(ps[:], hash.Sum(nil))
|
||||
return
|
||||
|
|
Loading…
Reference in New Issue