Use HTTP proxy config for torrent sources

This commit is contained in:
Matt Joiner 2022-02-25 18:35:21 +11:00
parent 716dc55321
commit d06f990b81
2 changed files with 53 additions and 31 deletions

View File

@ -84,7 +84,7 @@ type Client struct {
websocketTrackers websocketTrackers websocketTrackers websocketTrackers
activeAnnounceLimiter limiter.Instance activeAnnounceLimiter limiter.Instance
webseedHttpClient *http.Client httpClient *http.Client
} }
type ipStr string type ipStr string
@ -198,9 +198,11 @@ func (cl *Client) init(cfg *ClientConfig) {
cl.activeAnnounceLimiter.SlotsPerKey = 2 cl.activeAnnounceLimiter.SlotsPerKey = 2
cl.event.L = cl.locker() cl.event.L = cl.locker()
cl.ipBlockList = cfg.IPBlocklist cl.ipBlockList = cfg.IPBlocklist
cl.webseedHttpClient = &http.Client{ cl.httpClient = &http.Client{
Transport: &http.Transport{ Transport: &http.Transport{
Proxy: cfg.HTTPProxy, Proxy: cfg.HTTPProxy,
// I think this value was observed from some webseeds. It seems reasonable to extend it
// to other uses of HTTP from the client.
MaxConnsPerHost: 10, MaxConnsPerHost: 10,
}, },
} }
@ -1188,6 +1190,7 @@ func (cl *Client) newTorrentOpt(opts AddTorrentOpts) (t *Torrent) {
} }
t.networkingEnabled.Set() t.networkingEnabled.Set()
t.logger = cl.logger.WithContextValue(t).WithNames("torrent", t.infoHash.HexString()) t.logger = cl.logger.WithContextValue(t).WithNames("torrent", t.infoHash.HexString())
t.sourcesLogger = t.logger.WithNames("sources")
if opts.ChunkSize == 0 { if opts.ChunkSize == 0 {
opts.ChunkSize = defaultChunkSize opts.ChunkSize = defaultChunkSize
} }
@ -1312,7 +1315,7 @@ func (t *Torrent) MergeSpec(spec *TorrentSpec) error {
cl.lock() cl.lock()
defer cl.unlock() defer cl.unlock()
t.initialPieceCheckDisabled = spec.DisableInitialPieceCheck t.initialPieceCheckDisabled = spec.DisableInitialPieceCheck
useTorrentSources(spec.Sources, t) t.useSources(spec.Sources)
for _, url := range spec.Webseeds { for _, url := range spec.Webseeds {
t.addWebSeed(url) t.addWebSeed(url)
} }
@ -1333,23 +1336,37 @@ func (t *Torrent) MergeSpec(spec *TorrentSpec) error {
return nil return nil
} }
func useTorrentSources(sources []string, t *Torrent) { func (t *Torrent) useSources(sources []string) {
// TODO: bind context to the lifetime of *Torrent so that it's cancelled if the torrent closes select {
ctx := context.Background() case <-t.Closed():
for i := 0; i < len(sources); i += 1 { return
s := sources[i] case <-t.GotInfo():
return
default:
}
for _, s := range sources {
_, loaded := t.activeSources.LoadOrStore(s, struct{}{})
if loaded {
continue
}
s := s
go func() { go func() {
if err := useTorrentSource(ctx, s, t); err != nil { err := t.useActiveTorrentSource(s)
t.logger.WithDefaultLevel(log.Warning).Printf("using torrent source %q: %v", s, err) _, loaded := t.activeSources.LoadAndDelete(s)
} else { if !loaded {
t.logger.Printf("successfully used source %q", s) panic(s)
} }
level := log.Debug
if err != nil {
level = log.Warning
}
t.logger.Levelf(level, "used torrent source %q [err=%q]", s, err)
}() }()
} }
} }
func useTorrentSource(ctx context.Context, source string, t *Torrent) (err error) { func (t *Torrent) useActiveTorrentSource(source string) error {
ctx, cancel := context.WithCancel(ctx) ctx, cancel := context.WithCancel(context.Background())
defer cancel() defer cancel()
go func() { go func() {
select { select {
@ -1359,26 +1376,27 @@ func useTorrentSource(ctx context.Context, source string, t *Torrent) (err error
} }
cancel() cancel()
}() }()
var req *http.Request mi, err := getTorrentSource(ctx, source, t.cl.httpClient)
if req, err = http.NewRequestWithContext(ctx, http.MethodGet, source, nil); err != nil {
panic(err)
}
var resp *http.Response
if resp, err = http.DefaultClient.Do(req); err != nil {
return
}
var mi metainfo.MetaInfo
err = bencode.NewDecoder(resp.Body).Decode(&mi)
resp.Body.Close()
if err != nil { if err != nil {
if ctx.Err() != nil { return err
return nil
}
return
} }
return t.MergeSpec(TorrentSpecFromMetaInfo(&mi)) return t.MergeSpec(TorrentSpecFromMetaInfo(&mi))
} }
func getTorrentSource(ctx context.Context, source string, hc *http.Client) (mi metainfo.MetaInfo, err error) {
var req *http.Request
if req, err = http.NewRequestWithContext(ctx, http.MethodGet, source, nil); err != nil {
return
}
var resp *http.Response
if resp, err = hc.Do(req); err != nil {
return
}
defer resp.Body.Close()
err = bencode.NewDecoder(resp.Body).Decode(&mi)
return
}
func (cl *Client) dropTorrent(infoHash metainfo.Hash, wg *sync.WaitGroup) (err error) { func (cl *Client) dropTorrent(infoHash metainfo.Hash, wg *sync.WaitGroup) (err error) {
t, ok := cl.torrents[infoHash] t, ok := cl.torrents[infoHash]
if !ok { if !ok {

View File

@ -147,6 +147,10 @@ type Torrent struct {
// Is On when all pieces are complete. // Is On when all pieces are complete.
Complete chansync.Flag Complete chansync.Flag
// Torrent sources in use keyed by the source string.
activeSources sync.Map
sourcesLogger log.Logger
} }
func (t *Torrent) selectivePieceAvailabilityFromPeers(i pieceIndex) (count int) { func (t *Torrent) selectivePieceAvailabilityFromPeers(i pieceIndex) (count int) {
@ -2308,7 +2312,7 @@ func (t *Torrent) addWebSeed(url string) {
callbacks: t.callbacks(), callbacks: t.callbacks(),
}, },
client: webseed.Client{ client: webseed.Client{
HttpClient: t.cl.webseedHttpClient, HttpClient: t.cl.httpClient,
Url: url, Url: url,
ResponseBodyWrapper: func(r io.Reader) io.Reader { ResponseBodyWrapper: func(r io.Reader) io.Reader {
return &rateLimitedReader{ return &rateLimitedReader{