Factor out internal/string_limiter
This commit is contained in:
parent
d713aaf280
commit
ddd03362f8
52
client.go
52
client.go
|
@ -24,6 +24,7 @@ import (
|
|||
"github.com/anacrolix/missinggo/slices"
|
||||
"github.com/anacrolix/missinggo/v2/pproffd"
|
||||
"github.com/anacrolix/sync"
|
||||
"github.com/anacrolix/torrent/internal/string-limiter"
|
||||
"github.com/anacrolix/torrent/tracker"
|
||||
"github.com/anacrolix/torrent/webtorrent"
|
||||
"github.com/davecgh/go-spew/spew"
|
||||
|
@ -79,55 +80,7 @@ type Client struct {
|
|||
|
||||
websocketTrackers websocketTrackers
|
||||
|
||||
activeAnnouncesMu sync.Mutex
|
||||
// Limits concurrent use of a trackers by URL. Push into the channel to use a slot, and receive
|
||||
// to free up a slot.
|
||||
activeAnnounces map[string]*activeAnnouncesValueType
|
||||
}
|
||||
|
||||
type activeAnnouncesValueType struct {
|
||||
ch chan struct{}
|
||||
refs int
|
||||
}
|
||||
|
||||
type activeAnnouncesValueRef struct {
|
||||
r *activeAnnouncesValueType
|
||||
url string
|
||||
cl *Client
|
||||
}
|
||||
|
||||
func (me activeAnnouncesValueRef) C() chan struct{} {
|
||||
return me.r.ch
|
||||
}
|
||||
|
||||
func (me activeAnnouncesValueRef) Drop() {
|
||||
me.cl.activeAnnouncesMu.Lock()
|
||||
defer me.cl.activeAnnouncesMu.Unlock()
|
||||
me.r.refs--
|
||||
if me.r.refs == 0 {
|
||||
delete(me.cl.activeAnnounces, me.url)
|
||||
}
|
||||
}
|
||||
|
||||
func (cl *Client) getAnnounceRef(url string) activeAnnouncesValueRef {
|
||||
cl.activeAnnouncesMu.Lock()
|
||||
defer cl.activeAnnouncesMu.Unlock()
|
||||
if cl.activeAnnounces == nil {
|
||||
cl.activeAnnounces = make(map[string]*activeAnnouncesValueType)
|
||||
}
|
||||
v, ok := cl.activeAnnounces[url]
|
||||
if !ok {
|
||||
v = &activeAnnouncesValueType{
|
||||
ch: make(chan struct{}, 2),
|
||||
}
|
||||
cl.activeAnnounces[url] = v
|
||||
}
|
||||
v.refs++
|
||||
return activeAnnouncesValueRef{
|
||||
r: v,
|
||||
url: url,
|
||||
cl: cl,
|
||||
}
|
||||
activeAnnounceLimiter string_limiter.Instance
|
||||
}
|
||||
|
||||
type ipStr string
|
||||
|
@ -234,6 +187,7 @@ func NewClient(cfg *ClientConfig) (cl *Client, err error) {
|
|||
torrents: make(map[metainfo.Hash]*Torrent),
|
||||
dialRateLimiter: rate.NewLimiter(10, 10),
|
||||
}
|
||||
cl.activeAnnounceLimiter.SlotsPerKey = 2
|
||||
go cl.acceptLimitClearer()
|
||||
cl.initLogger()
|
||||
defer func() {
|
||||
|
|
|
@ -0,0 +1,62 @@
|
|||
package string_limiter
|
||||
|
||||
import "sync"
|
||||
|
||||
// Manages resources with a limited number of concurrent slots for use keyed by a string.
|
||||
type Instance struct {
|
||||
SlotsPerKey int
|
||||
|
||||
mu sync.Mutex
|
||||
// Limits concurrent use of a resource. Push into the channel to use a slot, and receive to free
|
||||
// up a slot.
|
||||
active map[string]*activeValueType
|
||||
}
|
||||
|
||||
type activeValueType struct {
|
||||
ch chan struct{}
|
||||
refs int
|
||||
}
|
||||
|
||||
type ActiveValueRef struct {
|
||||
v *activeValueType
|
||||
k string
|
||||
i *Instance
|
||||
}
|
||||
|
||||
// Returns the limiting channel. Send to it to obtain a slot, and receive to release the slot.
|
||||
func (me ActiveValueRef) C() chan struct{} {
|
||||
return me.v.ch
|
||||
}
|
||||
|
||||
// Drop the reference to a key, this allows keys to be reclaimed when they're no longer in use.
|
||||
func (me ActiveValueRef) Drop() {
|
||||
me.i.mu.Lock()
|
||||
defer me.i.mu.Unlock()
|
||||
me.v.refs--
|
||||
if me.v.refs == 0 {
|
||||
delete(me.i.active, me.k)
|
||||
}
|
||||
}
|
||||
|
||||
// Get a reference to the values for a key. You should make sure to call Drop exactly once on the
|
||||
// returned value when done.
|
||||
func (i *Instance) GetRef(key string) ActiveValueRef {
|
||||
i.mu.Lock()
|
||||
defer i.mu.Unlock()
|
||||
if i.active == nil {
|
||||
i.active = make(map[string]*activeValueType)
|
||||
}
|
||||
v, ok := i.active[key]
|
||||
if !ok {
|
||||
v = &activeValueType{
|
||||
ch: make(chan struct{}, i.SlotsPerKey),
|
||||
}
|
||||
i.active[key] = v
|
||||
}
|
||||
v.refs++
|
||||
return ActiveValueRef{
|
||||
v: v,
|
||||
k: key,
|
||||
i: i,
|
||||
}
|
||||
}
|
|
@ -112,7 +112,7 @@ func (me *trackerScraper) announce(ctx context.Context, event tracker.AnnounceEv
|
|||
ret.Interval = time.Minute
|
||||
|
||||
// Limit concurrent use of the same tracker URL by the Client.
|
||||
ref := me.t.cl.getAnnounceRef(me.u.String())
|
||||
ref := me.t.cl.activeAnnounceLimiter.GetRef(me.u.String())
|
||||
defer ref.Drop()
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
|
|
Loading…
Reference in New Issue