Use locks on piece per resource pieces to prevent races in reading incomplete data

This commit is contained in:
Matt Joiner 2021-02-04 18:42:43 +11:00
parent 78c77c0b45
commit bc0936c44a
1 changed files with 23 additions and 3 deletions

View File

@ -38,6 +38,7 @@ func NewResourcePiecesOpts(p PieceProvider, opts ResourcePiecesOpts) ClientImpl
type piecePerResourceTorrentImpl struct {
piecePerResource
locks []sync.RWMutex
}
func (piecePerResourceTorrentImpl) Close() error {
@ -45,13 +46,17 @@ func (piecePerResourceTorrentImpl) Close() error {
}
func (s piecePerResource) OpenTorrent(info *metainfo.Info, infoHash metainfo.Hash) (TorrentImpl, error) {
return piecePerResourceTorrentImpl{s}, nil
return piecePerResourceTorrentImpl{
s,
make([]sync.RWMutex, info.NumPieces()),
}, nil
}
func (s piecePerResource) Piece(p metainfo.Piece) PieceImpl {
func (s piecePerResourceTorrentImpl) Piece(p metainfo.Piece) PieceImpl {
return piecePerResourcePiece{
mp: p,
piecePerResource: s,
piecePerResource: s.piecePerResource,
mu: &s.locks[p.Index()],
}
}
@ -66,11 +71,16 @@ type ConsecutiveChunkReader interface {
type piecePerResourcePiece struct {
mp metainfo.Piece
piecePerResource
// This protects operations that move complete/incomplete pieces around, which can trigger read
// errors that may cause callers to do more drastic things.
mu *sync.RWMutex
}
var _ io.WriterTo = piecePerResourcePiece{}
func (s piecePerResourcePiece) WriteTo(w io.Writer) (int64, error) {
s.mu.RLock()
defer s.mu.RUnlock()
if s.mustIsComplete() {
r, err := s.completed().Get()
if err != nil {
@ -105,6 +115,8 @@ func (s piecePerResourcePiece) mustIsComplete() bool {
}
func (s piecePerResourcePiece) Completion() Completion {
s.mu.RLock()
defer s.mu.RUnlock()
fi, err := s.completed().Stat()
return Completion{
Complete: err == nil && fi.Size() == s.mp.Length(),
@ -117,6 +129,8 @@ type SizedPutter interface {
}
func (s piecePerResourcePiece) MarkComplete() error {
s.mu.Lock()
defer s.mu.Unlock()
incompleteChunks := s.getChunks()
r, err := func() (io.ReadCloser, error) {
if ccr, ok := s.rp.(ConsecutiveChunkReader); ok {
@ -155,10 +169,14 @@ func (s piecePerResourcePiece) MarkComplete() error {
}
func (s piecePerResourcePiece) MarkNotComplete() error {
s.mu.Lock()
defer s.mu.Unlock()
return s.completed().Delete()
}
func (s piecePerResourcePiece) ReadAt(b []byte, off int64) (int, error) {
s.mu.RLock()
defer s.mu.RUnlock()
if s.mustIsComplete() {
return s.completed().ReadAt(b, off)
}
@ -166,6 +184,8 @@ func (s piecePerResourcePiece) ReadAt(b []byte, off int64) (int, error) {
}
func (s piecePerResourcePiece) WriteAt(b []byte, off int64) (n int, err error) {
s.mu.RLock()
defer s.mu.RUnlock()
i, err := s.rp.NewInstance(path.Join(s.incompleteDirPath(), strconv.FormatInt(off, 10)))
if err != nil {
panic(err)