Don't block trying to fill entire Reader.Read

This conforms more to the contract in io.Reader. It's possible the old behaviour was better in reducing overhead, but that can be iterated on (or added as comments next time).
This commit is contained in:
Matt Joiner 2020-10-27 12:59:07 +11:00
parent 1bfca56e94
commit 2a7352aad2
2 changed files with 35 additions and 24 deletions

View File

@ -206,15 +206,24 @@ func TestDownloadOnDemand(t *testing.T) {
var attr fuse.Attr
node.Attr(netContext.Background(), &attr)
size := attr.Size
resp := &fuse.ReadResponse{
Data: make([]byte, size),
}
data := make([]byte, size)
h, err := node.(fusefs.NodeOpener).Open(context.TODO(), nil, nil)
require.NoError(t, err)
h.(fusefs.HandleReader).Read(netContext.Background(), &fuse.ReadRequest{
Size: int(size),
}, resp)
assert.EqualValues(t, testutil.GreetingFileContents, resp.Data)
// torrent.Reader.Read no longer tries to fill the entire read buffer, so this is a ReadFull for
// fusefs.
var n int
for n < len(data) {
resp := fuse.ReadResponse{Data: data[n:]}
err := h.(fusefs.HandleReader).Read(netContext.Background(), &fuse.ReadRequest{
Size: int(size) - n,
Offset: int64(n),
}, &resp)
assert.NoError(t, err)
n += len(resp.Data)
}
assert.EqualValues(t, testutil.GreetingFileContents, data)
}
func TestIsSubPath(t *testing.T) {

View File

@ -140,7 +140,9 @@ func (r *reader) Read(b []byte) (n int, err error) {
}
func (r *reader) ReadContext(ctx context.Context, b []byte) (n int, err error) {
// This is set under the Client lock if the Context is canceled.
// This is set under the Client lock if the Context is canceled. I think we coordinate on a
// separate variable so as to avoid false negatives with race conditions due to Contexts being
// synchronized.
var ctxErr error
if ctx.Done() != nil {
ctx, cancel := context.WithCancel(ctx)
@ -158,22 +160,19 @@ func (r *reader) ReadContext(ctx context.Context, b []byte) (n int, err error) {
// other purposes. That seems reasonable, but unusual.
r.opMu.Lock()
defer r.opMu.Unlock()
for len(b) != 0 {
var n1 int
n1, err = r.readOnceAt(b, r.pos, &ctxErr)
if n1 == 0 {
n, err = r.readOnceAt(b, r.pos, &ctxErr)
if n == 0 {
if err == nil {
panic("expected error")
} else {
return
}
break
}
b = b[n1:]
n += n1
r.mu.Lock()
r.pos += int64(n1)
r.pos += int64(n)
r.posChanged()
r.mu.Unlock()
}
if r.pos >= r.length {
err = io.EOF
} else if err == io.EOF {
@ -184,7 +183,7 @@ func (r *reader) ReadContext(ctx context.Context, b []byte) (n int, err error) {
// Wait until some data should be available to read. Tickles the client if it
// isn't. Returns how much should be readable without blocking.
func (r *reader) waitAvailable(pos, wanted int64, ctxErr *error) (avail int64, err error) {
func (r *reader) waitAvailable(pos, wanted int64, ctxErr *error, wait bool) (avail int64, err error) {
r.t.cl.lock()
defer r.t.cl.unlock()
for {
@ -200,6 +199,9 @@ func (r *reader) waitAvailable(pos, wanted int64, ctxErr *error) (avail int64, e
err = *ctxErr
return
}
if !wait {
return
}
r.waitReadable(pos)
}
}
@ -218,7 +220,7 @@ func (r *reader) readOnceAt(b []byte, pos int64, ctxErr *error) (n int, err erro
}
for {
var avail int64
avail, err = r.waitAvailable(pos, int64(len(b)), ctxErr)
avail, err = r.waitAvailable(pos, int64(len(b)), ctxErr, n == 0)
if avail == 0 {
return
}