Rework webseed peers to use a pool of requesters

This fixes the limitation that the max outstanding requests for a webseed peer must match the request concurrency. It should mean less recalculation, and more pipelining.
This commit is contained in:
Matt Joiner 2021-01-29 16:01:35 +11:00
parent 4078644db9
commit 60e992ec97
2 changed files with 47 additions and 13 deletions

View File

@ -26,6 +26,7 @@ import (
"github.com/anacrolix/dht/v2" "github.com/anacrolix/dht/v2"
"github.com/anacrolix/log" "github.com/anacrolix/log"
"github.com/anacrolix/missinggo" "github.com/anacrolix/missinggo"
"github.com/anacrolix/missinggo/iter"
"github.com/anacrolix/missinggo/perf" "github.com/anacrolix/missinggo/perf"
"github.com/anacrolix/missinggo/pubsub" "github.com/anacrolix/missinggo/pubsub"
"github.com/anacrolix/missinggo/slices" "github.com/anacrolix/missinggo/slices"
@ -2118,17 +2119,20 @@ func (t *Torrent) addWebSeed(url string) {
reconciledHandshakeStats: true, reconciledHandshakeStats: true,
peerSentHaveAll: true, peerSentHaveAll: true,
// TODO: Raise this limit, and instead limit concurrent fetches. // TODO: Raise this limit, and instead limit concurrent fetches.
PeerMaxRequests: maxRequests, PeerMaxRequests: 32,
RemoteAddr: remoteAddrFromUrl(url), RemoteAddr: remoteAddrFromUrl(url),
callbacks: t.callbacks(), callbacks: t.callbacks(),
}, },
client: webseed.Client{ client: webseed.Client{
// TODO: Investigate a MaxConnsPerHost in the transport for this, possibly in a global // Consider a MaxConnsPerHost in the transport for this, possibly in a global Client.
// Client.
HttpClient: http.DefaultClient, HttpClient: http.DefaultClient,
Url: url, Url: url,
}, },
requests: make(map[Request]webseed.Request, maxRequests), activeRequests: make(map[Request]webseed.Request, maxRequests),
}
ws.requesterCond.L = t.cl.locker()
for range iter.N(maxRequests) {
go ws.requester()
} }
for _, f := range t.callbacks().NewPeer { for _, f := range t.callbacks().NewPeer {
f(&ws.peer) f(&ws.peer)

View File

@ -3,6 +3,7 @@ package torrent
import ( import (
"fmt" "fmt"
"strings" "strings"
"sync"
"github.com/anacrolix/torrent/common" "github.com/anacrolix/torrent/common"
"github.com/anacrolix/torrent/metainfo" "github.com/anacrolix/torrent/metainfo"
@ -13,10 +14,10 @@ import (
) )
type webseedPeer struct { type webseedPeer struct {
client webseed.Client client webseed.Client
// TODO: Remove finished entries from this. activeRequests map[Request]webseed.Request
requests map[Request]webseed.Request requesterCond sync.Cond
peer Peer peer Peer
} }
var _ peerImpl = (*webseedPeer)(nil) var _ peerImpl = (*webseedPeer)(nil)
@ -43,7 +44,11 @@ func (ws *webseedPeer) writeInterested(interested bool) bool {
} }
func (ws *webseedPeer) cancel(r Request) bool { func (ws *webseedPeer) cancel(r Request) bool {
ws.requests[r].Cancel() active, ok := ws.activeRequests[r]
if !ok {
return false
}
active.Cancel()
return true return true
} }
@ -52,12 +57,35 @@ func (ws *webseedPeer) intoSpec(r Request) webseed.RequestSpec {
} }
func (ws *webseedPeer) request(r Request) bool { func (ws *webseedPeer) request(r Request) bool {
webseedRequest := ws.client.NewRequest(ws.intoSpec(r)) ws.requesterCond.Signal()
ws.requests[r] = webseedRequest
go ws.requestResultHandler(r, webseedRequest)
return true return true
} }
func (ws *webseedPeer) doRequest(r Request) {
webseedRequest := ws.client.NewRequest(ws.intoSpec(r))
ws.activeRequests[r] = webseedRequest
ws.requesterCond.L.Unlock()
ws.requestResultHandler(r, webseedRequest)
ws.requesterCond.L.Lock()
delete(ws.activeRequests, r)
}
func (ws *webseedPeer) requester() {
ws.requesterCond.L.Lock()
defer ws.requesterCond.L.Unlock()
start:
for !ws.peer.closed.IsSet() {
for r := range ws.peer.requests {
if _, ok := ws.activeRequests[r]; ok {
continue
}
ws.doRequest(r)
goto start
}
ws.requesterCond.Wait()
}
}
func (ws *webseedPeer) connectionFlags() string { func (ws *webseedPeer) connectionFlags() string {
return "WS" return "WS"
} }
@ -70,7 +98,9 @@ func (ws *webseedPeer) updateRequests() {
ws.peer.doRequestState() ws.peer.doRequestState()
} }
func (ws *webseedPeer) onClose() {} func (ws *webseedPeer) onClose() {
ws.requesterCond.Broadcast()
}
func (ws *webseedPeer) requestResultHandler(r Request, webseedRequest webseed.Request) { func (ws *webseedPeer) requestResultHandler(r Request, webseedRequest webseed.Request) {
result := <-webseedRequest.Result result := <-webseedRequest.Result