diff --git a/webseed-peer.go b/webseed-peer.go index cea3f9de..4cf27d13 100644 --- a/webseed-peer.go +++ b/webseed-peer.go @@ -125,10 +125,14 @@ func (ws *webseedPeer) onClose() { func (ws *webseedPeer) requestResultHandler(r Request, webseedRequest webseed.Request) { result := <-webseedRequest.Result + close(webseedRequest.Result) // one-shot // We do this here rather than inside receiveChunk, since we want to count errors too. I'm not // sure if we can divine which errors indicate cancellation on our end without hitting the // network though. - ws.peer.doChunkReadStats(int64(len(result.Bytes))) + if len(result.Bytes) != 0 || result.Err == nil { + // Increment ChunksRead and friends + ws.peer.doChunkReadStats(int64(len(result.Bytes))) + } ws.peer.readBytes(int64(len(result.Bytes))) ws.peer.t.cl.lock() defer ws.peer.t.cl.unlock() @@ -138,6 +142,7 @@ func (ws *webseedPeer) requestResultHandler(r Request, webseedRequest webseed.Re if result.Err != nil { if !errors.Is(result.Err, context.Canceled) && !ws.peer.closed.IsSet() { ws.peer.logger.Printf("Request %v rejected: %v", r, result.Err) + // // Here lies my attempt to extract something concrete from Go's error system. RIP. // cfg := spew.NewDefaultConfig() // cfg.DisableMethods = true // cfg.Dump(result.Err) diff --git a/webseed/client.go b/webseed/client.go index 5dc9d2c4..5752575d 100644 --- a/webseed/client.go +++ b/webseed/client.go @@ -26,6 +26,7 @@ type requestPart struct { req *http.Request e segments.Extent result chan requestPartResult + start func() } type Request struct { @@ -79,13 +80,15 @@ func (ws *Client) NewRequest(r RequestSpec) Request { result: make(chan requestPartResult, 1), e: e, } - go func() { - resp, err := ws.HttpClient.Do(req) - part.result <- requestPartResult{ - resp: resp, - err: err, - } - }() + part.start = func() { + go func() { + resp, err := ws.HttpClient.Do(req) + part.result <- requestPartResult{ + resp: resp, + err: err, + } + }() + } requestParts = append(requestParts, part) return true }) { @@ -116,6 +119,8 @@ func (me ErrBadResponse) Error() string { func recvPartResult(ctx context.Context, buf io.Writer, part requestPart) error { result := <-part.result + // Make sure there's no further results coming, it should be a one-shot channel. + close(part.result) if result.err != nil { return result.err } @@ -165,29 +170,15 @@ func recvPartResult(ctx context.Context, buf io.Writer, part requestPart) error } } -func readRequestPartResponses(ctx context.Context, parts []requestPart) ([]byte, error) { - ctx, cancel := context.WithCancel(ctx) - defer cancel() +func readRequestPartResponses(ctx context.Context, parts []requestPart) (_ []byte, err error) { var buf bytes.Buffer - firstErr := make(chan error, 1) - go func() { - for _, part := range parts { - err := recvPartResult(ctx, &buf, part) - if err != nil { - // Ensure no further unnecessary response reads occur. - cancel() - select { - case firstErr <- fmt.Errorf("reading %q at %q: %w", part.req.URL, part.req.Header.Get("Range"), err): - default: - } - } + for _, part := range parts { + part.start() + err = recvPartResult(ctx, &buf, part) + if err != nil { + err = fmt.Errorf("reading %q at %q: %w", part.req.URL, part.req.Header.Get("Range"), err) + break } - select { - case firstErr <- nil: - default: - } - }() - // This can't be merged into the return statement, because buf.Bytes is called first! - err := <-firstErr + } return buf.Bytes(), err }