Store pieces inplace in torrent.Pieces

This should save some allocation overhead, especially for torrents that have 20k+ pieces.
This commit is contained in:
Matt Joiner 2015-10-16 22:10:03 +11:00
parent db2dcfee3e
commit b3a8020401
4 changed files with 27 additions and 25 deletions

View File

@ -110,7 +110,7 @@ const (
// Currently doesn't really queue, but should in the future. // Currently doesn't really queue, but should in the future.
func (cl *Client) queuePieceCheck(t *torrent, pieceIndex pp.Integer) { func (cl *Client) queuePieceCheck(t *torrent, pieceIndex pp.Integer) {
piece := t.Pieces[pieceIndex] piece := &t.Pieces[pieceIndex]
if piece.QueuedForHash { if piece.QueuedForHash {
return return
} }
@ -122,7 +122,7 @@ func (cl *Client) queuePieceCheck(t *torrent, pieceIndex pp.Integer) {
// Queue a piece check if one isn't already queued, and the piece has never // Queue a piece check if one isn't already queued, and the piece has never
// been checked before. // been checked before.
func (cl *Client) queueFirstHash(t *torrent, piece int) { func (cl *Client) queueFirstHash(t *torrent, piece int) {
p := t.Pieces[piece] p := &t.Pieces[piece]
if p.EverHashed || p.Hashing || p.QueuedForHash || t.pieceComplete(piece) { if p.EverHashed || p.Hashing || p.QueuedForHash || t.pieceComplete(piece) {
return return
} }
@ -347,7 +347,7 @@ func (cl *Client) prioritizePiece(t *torrent, piece int, priority piecePriority)
if priority != PiecePriorityNone { if priority != PiecePriorityNone {
cl.queueFirstHash(t, piece) cl.queueFirstHash(t, piece)
} }
p := t.Pieces[piece] p := &t.Pieces[piece]
if p.Priority != priority { if p.Priority != priority {
p.Priority = priority p.Priority = priority
cl.pieceChanged(t, piece) cl.pieceChanged(t, piece)
@ -1448,7 +1448,7 @@ func (me *Client) sendChunk(t *torrent, c *connection, r request) error {
// Count the chunk being sent, even if it isn't. // Count the chunk being sent, even if it isn't.
c.chunksSent++ c.chunksSent++
b := make([]byte, r.Length) b := make([]byte, r.Length)
tp := t.Pieces[r.Index] tp := &t.Pieces[r.Index]
tp.pendingWritesMutex.Lock() tp.pendingWritesMutex.Lock()
for tp.pendingWrites != 0 { for tp.pendingWrites != 0 {
tp.noPendingWrites.Wait() tp.noPendingWrites.Wait()
@ -1895,8 +1895,8 @@ func (cl *Client) startTorrent(t *torrent) {
if !cl.config.NoUpload { if !cl.config.NoUpload {
// Queue all pieces for hashing. This is done sequentially to avoid // Queue all pieces for hashing. This is done sequentially to avoid
// spamming goroutines. // spamming goroutines.
for _, p := range t.Pieces { for i := range t.Pieces {
p.QueuedForHash = true t.Pieces[i].QueuedForHash = true
} }
go func() { go func() {
for i := range t.Pieces { for i := range t.Pieces {
@ -2511,7 +2511,7 @@ func (me *Client) fillRequests(t *torrent, c *connection) {
c.pieceRequestOrder.DeletePiece(pieceIndex) c.pieceRequestOrder.DeletePiece(pieceIndex)
continue continue
} }
piece := t.Pieces[pieceIndex] piece := &t.Pieces[pieceIndex]
for _, cs := range piece.shuffledPendingChunkSpecs(t.pieceLength(pieceIndex), pp.Integer(t.chunkSize)) { for _, cs := range piece.shuffledPendingChunkSpecs(t.pieceLength(pieceIndex), pp.Integer(t.chunkSize)) {
r := request{pp.Integer(pieceIndex), cs} r := request{pp.Integer(pieceIndex), cs}
if !addRequest(r) { if !addRequest(r) {
@ -2549,7 +2549,7 @@ func (me *Client) downloadedChunk(t *torrent, c *connection, msg *pp.Message) er
unexpectedChunksReceived.Add(1) unexpectedChunksReceived.Add(1)
} }
piece := t.Pieces[req.Index] piece := &t.Pieces[req.Index]
// Do we actually want this chunk? // Do we actually want this chunk?
if !t.wantChunk(req) { if !t.wantChunk(req) {
@ -2630,7 +2630,7 @@ func (me *Client) reapPieceTouches(t *torrent, piece int) (ret []*connection) {
} }
func (me *Client) pieceHashed(t *torrent, piece pp.Integer, correct bool) { func (me *Client) pieceHashed(t *torrent, piece pp.Integer, correct bool) {
p := t.Pieces[piece] p := &t.Pieces[piece]
if p.EverHashed { if p.EverHashed {
// Don't score the first time a piece is hashed, it could be an // Don't score the first time a piece is hashed, it could be an
// initial check. // initial check.
@ -2661,7 +2661,7 @@ func (me *Client) pieceHashed(t *torrent, piece pp.Integer, correct bool) {
// TODO: Check this isn't called more than once for each piece being correct. // TODO: Check this isn't called more than once for each piece being correct.
func (me *Client) pieceChanged(t *torrent, piece int) { func (me *Client) pieceChanged(t *torrent, piece int) {
correct := t.pieceComplete(piece) correct := t.pieceComplete(piece)
p := t.Pieces[piece] p := &t.Pieces[piece]
defer t.publishPieceChange(piece) defer t.publishPieceChange(piece)
defer p.Event.Broadcast() defer p.Event.Broadcast()
if correct { if correct {
@ -2705,7 +2705,7 @@ func (me *Client) pieceChanged(t *torrent, piece int) {
func (cl *Client) verifyPiece(t *torrent, index pp.Integer) { func (cl *Client) verifyPiece(t *torrent, index pp.Integer) {
cl.mu.Lock() cl.mu.Lock()
defer cl.mu.Unlock() defer cl.mu.Unlock()
p := t.Pieces[index] p := &t.Pieces[index]
for p.Hashing || t.data == nil { for p.Hashing || t.data == nil {
cl.event.Wait() cl.event.Wait()
} }

View File

@ -106,7 +106,7 @@ func TestTorrentInitialState(t *testing.T) {
if len(tor.Pieces) != 3 { if len(tor.Pieces) != 3 {
t.Fatal("wrong number of pieces") t.Fatal("wrong number of pieces")
} }
p := tor.Pieces[0] p := &tor.Pieces[0]
tor.pendAllChunkSpecs(0) tor.pendAllChunkSpecs(0)
assert.EqualValues(t, 3, p.numPendingChunks()) assert.EqualValues(t, 3, p.numPendingChunks())
assert.EqualValues(t, chunkSpec{4, 1}, chunkIndexSpec(2, tor.pieceLength(0), tor.chunkSize)) assert.EqualValues(t, chunkSpec{4, 1}, chunkIndexSpec(2, tor.pieceLength(0), tor.chunkSize))

View File

@ -122,7 +122,7 @@ again:
r.t.cl.mu.Unlock() r.t.cl.mu.Unlock()
b1 := b[:avail] b1 := b[:avail]
pi := int(pos / r.t.Info().PieceLength) pi := int(pos / r.t.Info().PieceLength)
tp := r.t.torrent.Pieces[pi] tp := &r.t.torrent.Pieces[pi]
ip := r.t.Info().Piece(pi) ip := r.t.Info().Piece(pi)
po := pos % ip.Length() po := pos % ip.Length()
if int64(len(b1)) > ip.Length()-po { if int64(len(b1)) > ip.Length()-po {

View File

@ -26,7 +26,7 @@ func (t *torrent) pieceNumPendingBytes(index int) (count pp.Integer) {
if t.pieceComplete(index) { if t.pieceComplete(index) {
return 0 return 0
} }
piece := t.Pieces[index] piece := &t.Pieces[index]
pieceLength := t.pieceLength(index) pieceLength := t.pieceLength(index)
if !piece.EverHashed { if !piece.EverHashed {
return pieceLength return pieceLength
@ -54,7 +54,7 @@ type torrent struct {
ceasingNetworking chan struct{} ceasingNetworking chan struct{}
InfoHash InfoHash InfoHash InfoHash
Pieces []*piece Pieces []piece
// Values are the piece indices that changed. // Values are the piece indices that changed.
pieceStateChanges *pubsub.PubSub pieceStateChanges *pubsub.PubSub
chunkSize pp.Integer chunkSize pp.Integer
@ -245,12 +245,13 @@ func (t *torrent) setMetadata(md *metainfo.Info, infoBytes []byte, eventLocker s
} }
t.MetaData = infoBytes t.MetaData = infoBytes
t.metadataHave = nil t.metadataHave = nil
for _, hash := range infoPieceHashes(md) { hashes := infoPieceHashes(md)
piece := &piece{} t.Pieces = make([]piece, len(hashes))
for i, hash := range hashes {
piece := &t.Pieces[i]
piece.Event.L = eventLocker piece.Event.L = eventLocker
piece.noPendingWrites.L = &piece.pendingWritesMutex piece.noPendingWrites.L = &piece.pendingWritesMutex
missinggo.CopyExact(piece.Hash[:], hash) missinggo.CopyExact(piece.Hash[:], hash)
t.Pieces = append(t.Pieces, piece)
} }
for _, conn := range t.Conns { for _, conn := range t.Conns {
t.initRequestOrdering(conn) t.initRequestOrdering(conn)
@ -315,7 +316,7 @@ func (t *torrent) Name() string {
} }
func (t *torrent) pieceState(index int) (ret PieceState) { func (t *torrent) pieceState(index int) (ret PieceState) {
p := t.Pieces[index] p := &t.Pieces[index]
ret.Priority = p.Priority ret.Priority = p.Priority
if t.pieceComplete(index) { if t.pieceComplete(index) {
ret.Complete = true ret.Complete = true
@ -583,7 +584,8 @@ func (t *torrent) writeChunk(piece int, begin int64, data []byte) (err error) {
} }
func (t *torrent) bitfield() (bf []bool) { func (t *torrent) bitfield() (bf []bool) {
for _, p := range t.Pieces { for i := range t.Pieces {
p := &t.Pieces[i]
// TODO: Check this logic. // TODO: Check this logic.
bf = append(bf, p.EverHashed && p.numPendingChunks() == 0) bf = append(bf, p.EverHashed && p.numPendingChunks() == 0)
} }
@ -622,7 +624,7 @@ func (t *torrent) pieceChunks(piece int) (css []chunkSpec) {
} }
func (t *torrent) pendAllChunkSpecs(pieceIndex int) { func (t *torrent) pendAllChunkSpecs(pieceIndex int) {
piece := t.Pieces[pieceIndex] piece := &t.Pieces[pieceIndex]
if piece.PendingChunkSpecs == nil { if piece.PendingChunkSpecs == nil {
// Allocate to exact size. // Allocate to exact size.
piece.PendingChunkSpecs = make([]bool, (t.pieceLength(pieceIndex)+t.chunkSize-1)/t.chunkSize) piece.PendingChunkSpecs = make([]bool, (t.pieceLength(pieceIndex)+t.chunkSize-1)/t.chunkSize)
@ -656,7 +658,7 @@ func (t *torrent) pieceLength(piece int) (len_ pp.Integer) {
func (t *torrent) hashPiece(piece pp.Integer) (ps pieceSum) { func (t *torrent) hashPiece(piece pp.Integer) (ps pieceSum) {
hash := pieceHash.New() hash := pieceHash.New()
p := t.Pieces[piece] p := &t.Pieces[piece]
p.pendingWritesMutex.Lock() p.pendingWritesMutex.Lock()
for p.pendingWrites != 0 { for p.pendingWrites != 0 {
p.noPendingWrites.Wait() p.noPendingWrites.Wait()
@ -709,7 +711,7 @@ func (t *torrent) haveChunk(r request) bool {
if t.pieceComplete(int(r.Index)) { if t.pieceComplete(int(r.Index)) {
return true return true
} }
p := t.Pieces[r.Index] p := &t.Pieces[r.Index]
if p.PendingChunkSpecs == nil { if p.PendingChunkSpecs == nil {
return false return false
} }
@ -747,7 +749,7 @@ func (t *torrent) wantPiece(index int) bool {
if !t.haveInfo() { if !t.haveInfo() {
return false return false
} }
p := t.Pieces[index] p := &t.Pieces[index]
if p.QueuedForHash { if p.QueuedForHash {
return false return false
} }
@ -795,7 +797,7 @@ func (t *torrent) worstBadConn(cl *Client) *connection {
func (t *torrent) publishPieceChange(piece int) { func (t *torrent) publishPieceChange(piece int) {
cur := t.pieceState(piece) cur := t.pieceState(piece)
p := t.Pieces[piece] p := &t.Pieces[piece]
if cur != p.PublicPieceState { if cur != p.PublicPieceState {
t.pieceStateChanges.Publish(piece) t.pieceStateChanges.Publish(piece)
} }