Add Reader.ReadContext

Allows cancelling reads etc. Torrents that get stuck can result in Reads that won't return until the torrent is dropped.
This commit is contained in:
Matt Joiner 2016-04-30 11:08:29 +10:00
parent 28c9ec2bd1
commit f62ff2f540
2 changed files with 55 additions and 5 deletions

View File

@ -1,6 +1,7 @@
package torrent
import (
"context"
"errors"
"io"
"os"
@ -91,11 +92,31 @@ func (r *Reader) waitReadable(off int64) {
}
func (r *Reader) Read(b []byte) (n int, err error) {
return r.ReadContext(b, context.Background())
}
func (r *Reader) ReadContext(b []byte, ctx context.Context) (n int, err error) {
// This is set under the Client lock if the Context is canceled.
var ctxErr error
if ctx.Done() != nil {
ctx, cancel := context.WithCancel(ctx)
// Abort the goroutine when the function returns.
defer cancel()
go func() {
<-ctx.Done()
r.t.cl.mu.Lock()
ctxErr = ctx.Err()
r.t.cl.event.Broadcast()
r.t.cl.mu.Unlock()
}()
}
// Hmmm, if a Read gets stuck, this means you can't change position for
// other purposes. That seems reasonable, but unusual.
r.opMu.Lock()
defer r.opMu.Unlock()
for len(b) != 0 {
var n1 int
n1, err = r.readOnceAt(b, r.pos)
n1, err = r.readOnceAt(b, r.pos, &ctxErr)
if n1 == 0 {
if err == nil {
panic("expected error")
@ -123,28 +144,32 @@ func (r *Reader) torrentClosed() bool {
// Wait until some data should be available to read. Tickles the client if it
// isn't. Returns how much should be readable without blocking.
func (r *Reader) waitAvailable(pos, wanted int64) (avail int64) {
func (r *Reader) waitAvailable(pos, wanted int64, ctxErr *error) (avail int64) {
r.t.cl.mu.Lock()
defer r.t.cl.mu.Unlock()
for !r.readable(pos) {
for !r.readable(pos) && *ctxErr == nil {
r.waitReadable(pos)
}
return r.available(pos, wanted)
}
// Performs at most one successful read to torrent storage.
func (r *Reader) readOnceAt(b []byte, pos int64) (n int, err error) {
func (r *Reader) readOnceAt(b []byte, pos int64, ctxErr *error) (n int, err error) {
if pos >= r.t.length {
err = io.EOF
return
}
for {
avail := r.waitAvailable(pos, int64(len(b)))
avail := r.waitAvailable(pos, int64(len(b)), ctxErr)
if avail == 0 {
if r.torrentClosed() {
err = errors.New("torrent closed")
return
}
if *ctxErr != nil {
err = *ctxErr
return
}
}
b1 := b[:avail]
pi := int(pos / r.t.Info().PieceLength)

25
reader_test.go Normal file
View File

@ -0,0 +1,25 @@
package torrent
import (
"context"
"testing"
"time"
"github.com/stretchr/testify/require"
"github.com/anacrolix/torrent/internal/testutil"
)
func TestReaderReadContext(t *testing.T) {
cl, err := NewClient(&TestingConfig)
require.NoError(t, err)
defer cl.Close()
tt, err := cl.AddTorrent(testutil.GreetingMetaInfo())
require.NoError(t, err)
defer tt.Drop()
ctx, _ := context.WithDeadline(context.Background(), time.Now().Add(time.Millisecond))
r := tt.NewReader()
defer r.Close()
_, err = r.ReadContext(make([]byte, 1), ctx)
require.EqualValues(t, context.DeadlineExceeded, err)
}