Handle 503 returns from webseed peer endpoints
This commit is contained in:
parent
32d9ec900c
commit
a2c50ea2bd
10
torrent.go
10
torrent.go
|
@ -2200,10 +2200,12 @@ func (t *Torrent) addWebSeed(url string) {
|
|||
if _, ok := t.webSeeds[url]; ok {
|
||||
return
|
||||
}
|
||||
// I don't think Go http supports pipelining requests. However we can have more ready to go
|
||||
// I don't think Go http supports pipelining requests. However, we can have more ready to go
|
||||
// right away. This value should be some multiple of the number of connections to a host. I
|
||||
// would expect that double maxRequests plus a bit would be appropriate.
|
||||
const maxRequests = 32
|
||||
// would expect that double maxRequests plus a bit would be appropriate. This value is based on
|
||||
// downloading Sintel (08ada5a7a6183aae1e09d831df6748d566095a10) from
|
||||
// "https://webtorrent.io/torrents/".
|
||||
const maxRequests = 16
|
||||
ws := webseedPeer{
|
||||
peer: Peer{
|
||||
t: t,
|
||||
|
@ -2228,7 +2230,7 @@ func (t *Torrent) addWebSeed(url string) {
|
|||
ws.peer.initUpdateRequestsTimer()
|
||||
ws.requesterCond.L = t.cl.locker()
|
||||
for i := 0; i < maxRequests; i += 1 {
|
||||
go ws.requester()
|
||||
go ws.requester(i)
|
||||
}
|
||||
for _, f := range t.callbacks().NewPeer {
|
||||
f(&ws.peer)
|
||||
|
|
|
@ -4,7 +4,9 @@ import (
|
|||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"math/rand"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/RoaringBitmap/roaring"
|
||||
"github.com/anacrolix/log"
|
||||
|
@ -69,18 +71,19 @@ func (ws *webseedPeer) _request(r Request) bool {
|
|||
return true
|
||||
}
|
||||
|
||||
func (ws *webseedPeer) doRequest(r Request) {
|
||||
func (ws *webseedPeer) doRequest(r Request) error {
|
||||
webseedRequest := ws.client.NewRequest(ws.intoSpec(r))
|
||||
ws.activeRequests[r] = webseedRequest
|
||||
func() {
|
||||
err := func() error {
|
||||
ws.requesterCond.L.Unlock()
|
||||
defer ws.requesterCond.L.Lock()
|
||||
ws.requestResultHandler(r, webseedRequest)
|
||||
return ws.requestResultHandler(r, webseedRequest)
|
||||
}()
|
||||
delete(ws.activeRequests, r)
|
||||
return err
|
||||
}
|
||||
|
||||
func (ws *webseedPeer) requester() {
|
||||
func (ws *webseedPeer) requester(i int) {
|
||||
ws.requesterCond.L.Lock()
|
||||
defer ws.requesterCond.L.Unlock()
|
||||
start:
|
||||
|
@ -91,8 +94,16 @@ start:
|
|||
if _, ok := ws.activeRequests[r]; ok {
|
||||
return true
|
||||
}
|
||||
ws.doRequest(r)
|
||||
err := ws.doRequest(r)
|
||||
ws.requesterCond.L.Unlock()
|
||||
if err != nil {
|
||||
log.Printf("requester %v: error doing webseed request %v: %v", i, r, err)
|
||||
}
|
||||
restart = true
|
||||
if errors.Is(err, webseed.ErrTooFast) {
|
||||
time.Sleep(time.Duration(rand.Int63n(int64(10 * time.Second))))
|
||||
}
|
||||
ws.requesterCond.L.Lock()
|
||||
return false
|
||||
})
|
||||
if restart {
|
||||
|
@ -125,7 +136,7 @@ func (ws *webseedPeer) onClose() {
|
|||
ws.requesterCond.Broadcast()
|
||||
}
|
||||
|
||||
func (ws *webseedPeer) requestResultHandler(r Request, webseedRequest webseed.Request) {
|
||||
func (ws *webseedPeer) requestResultHandler(r Request, webseedRequest webseed.Request) error {
|
||||
result := <-webseedRequest.Result
|
||||
close(webseedRequest.Result) // one-shot
|
||||
// We do this here rather than inside receiveChunk, since we want to count errors too. I'm not
|
||||
|
@ -139,10 +150,15 @@ func (ws *webseedPeer) requestResultHandler(r Request, webseedRequest webseed.Re
|
|||
ws.peer.t.cl.lock()
|
||||
defer ws.peer.t.cl.unlock()
|
||||
if ws.peer.t.closed.IsSet() {
|
||||
return
|
||||
return nil
|
||||
}
|
||||
if result.Err != nil {
|
||||
if !errors.Is(result.Err, context.Canceled) && !ws.peer.closed.IsSet() {
|
||||
err := result.Err
|
||||
if err != nil {
|
||||
switch {
|
||||
case errors.Is(err, context.Canceled):
|
||||
case errors.Is(err, webseed.ErrTooFast):
|
||||
case ws.peer.closed.IsSet():
|
||||
default:
|
||||
ws.peer.logger.Printf("Request %v rejected: %v", r, result.Err)
|
||||
// // Here lies my attempt to extract something concrete from Go's error system. RIP.
|
||||
// cfg := spew.NewDefaultConfig()
|
||||
|
@ -152,17 +168,18 @@ func (ws *webseedPeer) requestResultHandler(r Request, webseedRequest webseed.Re
|
|||
ws.peer.close()
|
||||
}
|
||||
ws.peer.remoteRejectedRequest(ws.peer.t.requestIndexFromRequest(r))
|
||||
} else {
|
||||
err := ws.peer.receiveChunk(&pp.Message{
|
||||
Type: pp.Piece,
|
||||
Index: r.Index,
|
||||
Begin: r.Begin,
|
||||
Piece: result.Bytes,
|
||||
})
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
return err
|
||||
}
|
||||
err = ws.peer.receiveChunk(&pp.Message{
|
||||
Type: pp.Piece,
|
||||
Index: r.Index,
|
||||
Begin: r.Begin,
|
||||
Piece: result.Bytes,
|
||||
})
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func (me *webseedPeer) isLowOnRequests() bool {
|
||||
|
|
|
@ -3,6 +3,7 @@ package webseed
|
|||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"log"
|
||||
|
@ -162,6 +163,8 @@ func recvPartResult(ctx context.Context, buf io.Writer, part requestPart) error
|
|||
} else {
|
||||
return ErrBadResponse{"resp status ok but requested range", result.resp}
|
||||
}
|
||||
case http.StatusServiceUnavailable:
|
||||
return ErrTooFast
|
||||
default:
|
||||
return ErrBadResponse{
|
||||
fmt.Sprintf("unhandled response status code (%v)", result.resp.StatusCode),
|
||||
|
@ -170,6 +173,8 @@ func recvPartResult(ctx context.Context, buf io.Writer, part requestPart) error
|
|||
}
|
||||
}
|
||||
|
||||
var ErrTooFast = errors.New("making requests too fast")
|
||||
|
||||
func readRequestPartResponses(ctx context.Context, parts []requestPart) (_ []byte, err error) {
|
||||
var buf bytes.Buffer
|
||||
for _, part := range parts {
|
||||
|
|
Loading…
Reference in New Issue