Do webseed request parts sequentially

This means we can treat the number of bytes in the result with enough accuracy to decide if we should count it as a wasted chunk. Also I'm not sure why it was a good idea to do parts of a request in parallel anyway, it could just lead to spikes in outstanding requests to the webseed, rather than sticking to the predictable maxRequests limit.
This commit is contained in:
Matt Joiner 2021-12-02 13:47:06 +11:00
parent 9bee7c3bc4
commit c653cf2070
2 changed files with 26 additions and 30 deletions

View File

@ -125,10 +125,14 @@ func (ws *webseedPeer) onClose() {
func (ws *webseedPeer) requestResultHandler(r Request, webseedRequest webseed.Request) { func (ws *webseedPeer) requestResultHandler(r Request, webseedRequest webseed.Request) {
result := <-webseedRequest.Result result := <-webseedRequest.Result
close(webseedRequest.Result) // one-shot
// We do this here rather than inside receiveChunk, since we want to count errors too. I'm not // We do this here rather than inside receiveChunk, since we want to count errors too. I'm not
// sure if we can divine which errors indicate cancellation on our end without hitting the // sure if we can divine which errors indicate cancellation on our end without hitting the
// network though. // network though.
if len(result.Bytes) != 0 || result.Err == nil {
// Increment ChunksRead and friends
ws.peer.doChunkReadStats(int64(len(result.Bytes))) ws.peer.doChunkReadStats(int64(len(result.Bytes)))
}
ws.peer.readBytes(int64(len(result.Bytes))) ws.peer.readBytes(int64(len(result.Bytes)))
ws.peer.t.cl.lock() ws.peer.t.cl.lock()
defer ws.peer.t.cl.unlock() defer ws.peer.t.cl.unlock()
@ -138,6 +142,7 @@ func (ws *webseedPeer) requestResultHandler(r Request, webseedRequest webseed.Re
if result.Err != nil { if result.Err != nil {
if !errors.Is(result.Err, context.Canceled) && !ws.peer.closed.IsSet() { if !errors.Is(result.Err, context.Canceled) && !ws.peer.closed.IsSet() {
ws.peer.logger.Printf("Request %v rejected: %v", r, result.Err) ws.peer.logger.Printf("Request %v rejected: %v", r, result.Err)
// // Here lies my attempt to extract something concrete from Go's error system. RIP.
// cfg := spew.NewDefaultConfig() // cfg := spew.NewDefaultConfig()
// cfg.DisableMethods = true // cfg.DisableMethods = true
// cfg.Dump(result.Err) // cfg.Dump(result.Err)

View File

@ -26,6 +26,7 @@ type requestPart struct {
req *http.Request req *http.Request
e segments.Extent e segments.Extent
result chan requestPartResult result chan requestPartResult
start func()
} }
type Request struct { type Request struct {
@ -79,6 +80,7 @@ func (ws *Client) NewRequest(r RequestSpec) Request {
result: make(chan requestPartResult, 1), result: make(chan requestPartResult, 1),
e: e, e: e,
} }
part.start = func() {
go func() { go func() {
resp, err := ws.HttpClient.Do(req) resp, err := ws.HttpClient.Do(req)
part.result <- requestPartResult{ part.result <- requestPartResult{
@ -86,6 +88,7 @@ func (ws *Client) NewRequest(r RequestSpec) Request {
err: err, err: err,
} }
}() }()
}
requestParts = append(requestParts, part) requestParts = append(requestParts, part)
return true return true
}) { }) {
@ -116,6 +119,8 @@ func (me ErrBadResponse) Error() string {
func recvPartResult(ctx context.Context, buf io.Writer, part requestPart) error { func recvPartResult(ctx context.Context, buf io.Writer, part requestPart) error {
result := <-part.result result := <-part.result
// Make sure there's no further results coming, it should be a one-shot channel.
close(part.result)
if result.err != nil { if result.err != nil {
return result.err return result.err
} }
@ -165,29 +170,15 @@ func recvPartResult(ctx context.Context, buf io.Writer, part requestPart) error
} }
} }
func readRequestPartResponses(ctx context.Context, parts []requestPart) ([]byte, error) { func readRequestPartResponses(ctx context.Context, parts []requestPart) (_ []byte, err error) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
var buf bytes.Buffer var buf bytes.Buffer
firstErr := make(chan error, 1)
go func() {
for _, part := range parts { for _, part := range parts {
err := recvPartResult(ctx, &buf, part) part.start()
err = recvPartResult(ctx, &buf, part)
if err != nil { if err != nil {
// Ensure no further unnecessary response reads occur. err = fmt.Errorf("reading %q at %q: %w", part.req.URL, part.req.Header.Get("Range"), err)
cancel() break
select {
case firstErr <- fmt.Errorf("reading %q at %q: %w", part.req.URL, part.req.Header.Get("Range"), err):
default:
} }
} }
}
select {
case firstErr <- nil:
default:
}
}()
// This can't be merged into the return statement, because buf.Bytes is called first!
err := <-firstErr
return buf.Bytes(), err return buf.Bytes(), err
} }