diff --git a/client.go b/client.go index 4adf28b7..167cf0cb 100644 --- a/client.go +++ b/client.go @@ -31,6 +31,7 @@ import ( "github.com/anacrolix/missinggo/v2/bitmap" "github.com/anacrolix/missinggo/v2/pproffd" "github.com/anacrolix/sync" + "github.com/anacrolix/torrent/types/infohash" "github.com/davecgh/go-spew/spew" "github.com/dustin/go-humanize" gbtree "github.com/google/btree" @@ -1286,7 +1287,7 @@ func (cl *Client) AddTorrentOpt(opts AddTorrentOpts) (t *Torrent, new bool) { } type AddTorrentOpts struct { - InfoHash InfoHash + InfoHash infohash.T Storage storage.ClientImpl ChunkSize pp.Integer } @@ -1505,6 +1506,7 @@ func (cl *Client) newConnection(nc net.Conn, opts newConnectionOpts) (c *PeerCon connString: opts.connString, conn: nc, } + c.peerRequestDataAllocLimiter.Max = cl.config.MaxAllocPeerRequestDataPerConn c.initRequestState() // TODO: Need to be much more explicit about this, including allowing non-IP bannable addresses. if opts.remoteAddr != nil { diff --git a/config.go b/config.go index e1e6452a..17641689 100644 --- a/config.go +++ b/config.go @@ -148,6 +148,8 @@ type ClientConfig struct { // How long between writes before sending a keep alive message on a peer connection that we want // to maintain. KeepAliveTimeout time.Duration + // Maximum bytes to buffer per peer connection for peer request data before it is sent. + MaxAllocPeerRequestDataPerConn int64 // The IP addresses as our peers should see them. May differ from the // local interfaces due to NAT or other network configurations. @@ -205,6 +207,7 @@ func NewDefaultClientConfig() *ClientConfig { TorrentPeersLowWater: 50, HandshakesTimeout: 4 * time.Second, KeepAliveTimeout: time.Minute, + MaxAllocPeerRequestDataPerConn: 1 << 20, ListenHost: func(string) string { return "" }, UploadRateLimiter: unlimited, DownloadRateLimiter: unlimited, diff --git a/internal/alloclim/alloclim_test.go b/internal/alloclim/alloclim_test.go new file mode 100644 index 00000000..5952804e --- /dev/null +++ b/internal/alloclim/alloclim_test.go @@ -0,0 +1,93 @@ +package alloclim + +import ( + "context" + "testing" + "time" + + _ "github.com/anacrolix/envpprof" + qt "github.com/frankban/quicktest" +) + +func TestReserveOverMax(t *testing.T) { + c := qt.New(t) + l := &Limiter{Max: 10} + r := l.Reserve(20) + c.Assert(r.Wait(context.Background()), qt.IsNotNil) +} + +func TestImmediateAllow(t *testing.T) { + c := qt.New(t) + l := &Limiter{Max: 10} + r := l.Reserve(10) + c.Assert(r.Wait(context.Background()), qt.IsNil) +} + +func TestSimpleSequence(t *testing.T) { + c := qt.New(t) + l := &Limiter{Max: 10} + rs := make([]*Reservation, 0) + rs = append(rs, l.Reserve(6)) + rs = append(rs, l.Reserve(5)) + rs = append(rs, l.Reserve(5)) + c.Assert(rs[0].Wait(context.Background()), qt.IsNil) + ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Nanosecond)) + c.Assert(rs[1].Wait(ctx), qt.Equals, context.DeadlineExceeded) + go cancel() + ctx, cancel = context.WithCancel(context.Background()) + go cancel() + c.Assert(rs[2].Wait(ctx), qt.Equals, context.Canceled) + go rs[0].Release() + ctx, cancel = context.WithDeadline(context.Background(), time.Now().Add(time.Second)) + c.Assert(rs[1].Wait(ctx), qt.IsNil) + go rs[1].Release() + c.Assert(rs[2].Wait(ctx), qt.IsNil) + go rs[2].Release() + go cancel() + rs[2].Release() + rs[1].Release() + c.Assert(l.Value(), qt.Equals, l.Max) +} + +func TestSequenceWithCancel(t *testing.T) { + c := qt.New(t) + l := &Limiter{Max: 10} + rs := make([]*Reservation, 0) + rs = append(rs, l.Reserve(6)) + rs = append(rs, l.Reserve(6)) + rs = append(rs, l.Reserve(4)) + rs = append(rs, l.Reserve(4)) + c.Assert(rs[0].Cancel(), qt.IsFalse) + c.Assert(func() { rs[1].Release() }, qt.PanicMatches, "not resolved") + c.Assert(rs[1].Cancel(), qt.IsTrue) + c.Assert(rs[2].Wait(context.Background()), qt.IsNil) + rs[0].Release() + c.Assert(rs[3].Wait(context.Background()), qt.IsNil) + c.Assert(l.Value(), qt.Equals, int64(2)) + rs[1].Release() + rs[2].Release() + rs[3].Release() + c.Assert(l.Value(), qt.Equals, l.Max) +} + +func TestCancelWhileWaiting(t *testing.T) { + c := qt.New(t) + l := &Limiter{Max: 10} + rs := make([]*Reservation, 0) + rs = append(rs, l.Reserve(6)) + rs = append(rs, l.Reserve(6)) + rs = append(rs, l.Reserve(4)) + rs = append(rs, l.Reserve(4)) + go rs[1].Cancel() + err := rs[1].Wait(context.Background()) + c.Assert(err, qt.IsNotNil) + err = rs[2].Wait(context.Background()) + c.Assert(err, qt.IsNil) + ctx, cancel := context.WithCancel(context.Background()) + go cancel() + err = rs[3].Wait(ctx) + c.Assert(err, qt.Equals, context.Canceled) + rs[0].Drop() + err = rs[3].Wait(ctx) + c.Assert(err, qt.IsNil) +} diff --git a/internal/alloclim/l.go b/internal/alloclim/l.go new file mode 100644 index 00000000..98be1a10 --- /dev/null +++ b/internal/alloclim/l.go @@ -0,0 +1,80 @@ +package alloclim + +import "sync" + +// Manages reservations sharing a common allocation limit. +type Limiter struct { + // Maximum outstanding allocation space. + Max int64 + initOnce sync.Once + mu sync.Mutex + // Current unallocated space. + value int64 + // Reservations waiting to in the order they arrived. + waiting []*Reservation +} + +func (me *Limiter) initValue() { + me.value = me.Max +} + +func (me *Limiter) init() { + me.initOnce.Do(func() { + me.initValue() + }) +} + +func (me *Limiter) Reserve(n int64) *Reservation { + r := &Reservation{ + l: me, + n: n, + } + me.init() + me.mu.Lock() + if n <= me.value { + me.value -= n + r.granted.Set() + } else { + me.waiting = append(me.waiting, r) + } + me.mu.Unlock() + return r +} + +func (me *Limiter) doWakesLocked() { + for { + if len(me.waiting) == 0 { + break + } + r := me.waiting[0] + switch { + case r.cancelled.IsSet(): + case r.n <= me.value: + if r.wake() { + me.value -= r.n + } + default: + return + } + me.waiting = me.waiting[1:] + } +} + +func (me *Limiter) doWakes() { + me.mu.Lock() + me.doWakesLocked() + me.mu.Unlock() +} + +func (me *Limiter) addValue(n int64) { + me.mu.Lock() + me.value += n + me.doWakesLocked() + me.mu.Unlock() +} + +func (me *Limiter) Value() int64 { + me.mu.Lock() + defer me.mu.Unlock() + return me.value +} diff --git a/internal/alloclim/r.go b/internal/alloclim/r.go new file mode 100644 index 00000000..b84be667 --- /dev/null +++ b/internal/alloclim/r.go @@ -0,0 +1,97 @@ +package alloclim + +import ( + "context" + "errors" + "fmt" + "sync" + + "github.com/anacrolix/chansync" +) + +type Reservation struct { + l *Limiter + n int64 + releaseOnce sync.Once + mu sync.Mutex + granted chansync.SetOnce + cancelled chansync.SetOnce +} + +// Releases the alloc claim if the reservation has been granted. Does nothing if it was cancelled. +// Otherwise panics. +func (me *Reservation) Release() { + me.mu.Lock() + defer me.mu.Unlock() + switch { + default: + panic("not resolved") + case me.cancelled.IsSet(): + return + case me.granted.IsSet(): + } + me.releaseOnce.Do(func() { + me.l.addValue(me.n) + }) +} + +// Cancel the reservation, returns false if it was already granted. You must still release if that's +// the case. See Drop. +func (me *Reservation) Cancel() bool { + me.mu.Lock() + defer me.mu.Unlock() + if me.granted.IsSet() { + return false + } + if me.cancelled.Set() { + go me.l.doWakes() + } + return true +} + +// If the reservation is granted, release it, otherwise cancel the reservation. +func (me *Reservation) Drop() { + me.mu.Lock() + defer me.mu.Unlock() + if me.granted.IsSet() { + me.releaseOnce.Do(func() { + me.l.addValue(me.n) + }) + return + } + if me.cancelled.Set() { + go me.l.doWakes() + } +} + +func (me *Reservation) wake() bool { + me.mu.Lock() + defer me.mu.Unlock() + if me.cancelled.IsSet() { + return false + } + return me.granted.Set() +} + +func (me *Reservation) Wait(ctx context.Context) error { + if me.n > me.l.Max { + return fmt.Errorf("reservation for %v exceeds limiter max %v", me.n, me.l.Max) + } + select { + case <-ctx.Done(): + case <-me.granted.Done(): + case <-me.cancelled.Done(): + } + defer me.mu.Unlock() + me.mu.Lock() + switch { + case me.granted.IsSet(): + return nil + case me.cancelled.IsSet(): + return errors.New("reservation cancelled") + case ctx.Err() != nil: + return ctx.Err() + default: + panic("unexpected") + } +} diff --git a/peerconn.go b/peerconn.go index fdc8236d..af9c0844 100644 --- a/peerconn.go +++ b/peerconn.go @@ -3,6 +3,7 @@ package torrent import ( "bufio" "bytes" + "context" "errors" "fmt" "io" @@ -20,6 +21,7 @@ import ( "github.com/anacrolix/missinggo/iter" "github.com/anacrolix/missinggo/v2/bitmap" "github.com/anacrolix/multiless" + "github.com/anacrolix/torrent/internal/alloclim" "golang.org/x/time/rate" "github.com/anacrolix/torrent/bencode" @@ -43,7 +45,8 @@ const ( ) type peerRequestState struct { - data []byte + data []byte + allocReservation *alloclim.Reservation } type PeerRemoteAddr interface { @@ -169,6 +172,8 @@ type PeerConn struct { // The peer has everything. This can occur due to a special message, when // we may not even know the number of pieces in the torrent yet. peerSentHaveAll bool + + peerRequestDataAllocLimiter alloclim.Limiter } func (cn *PeerConn) peerImplStatusLines() []string { @@ -547,11 +552,18 @@ func (cn *PeerConn) choke(msg messageWriter) (more bool) { Type: pp.Choke, }) if !cn.fastEnabled() { - cn.peerRequests = nil + cn.deleteAllPeerRequests() } return } +func (cn *PeerConn) deleteAllPeerRequests() { + for _, state := range cn.peerRequests { + state.allocReservation.Drop() + } + cn.peerRequests = nil +} + func (cn *PeerConn) unchoke(msg func(pp.Message) bool) bool { if !cn.choking { return true @@ -992,7 +1004,11 @@ func (c *PeerConn) reject(r Request) { panic("fast not enabled") } c.write(r.ToMsg(pp.Reject)) - delete(c.peerRequests, r) + // It is possible to reject a request before it is added to peer requests due to being invalid. + if state, ok := c.peerRequests[r]; ok { + state.allocReservation.Drop() + delete(c.peerRequests, r) + } } func (c *PeerConn) maximumPeerRequestChunkLength() (_ Option[int]) { @@ -1066,7 +1082,9 @@ func (c *PeerConn) onReadRequest(r Request, startFetch bool) error { if c.peerRequests == nil { c.peerRequests = make(map[Request]*peerRequestState, localClientReqq) } - value := &peerRequestState{} + value := &peerRequestState{ + allocReservation: c.peerRequestDataAllocLimiter.Reserve(int64(r.Length)), + } c.peerRequests[r] = value if startFetch { // TODO: Limit peer request data read concurrency. @@ -1076,7 +1094,7 @@ func (c *PeerConn) onReadRequest(r Request, startFetch bool) error { } func (c *PeerConn) peerRequestDataReader(r Request, prs *peerRequestState) { - b, err := readPeerRequestData(r, c) + b, err := c.readPeerRequestData(r, prs) c.locker().Lock() defer c.locker().Unlock() if err != nil { @@ -1133,7 +1151,20 @@ func (c *PeerConn) peerRequestDataReadFailed(err error, r Request) { } } -func readPeerRequestData(r Request, c *PeerConn) ([]byte, error) { +func (c *PeerConn) readPeerRequestData(r Request, prs *peerRequestState) ([]byte, error) { + // Should we depend on Torrent closure here? I think it's okay to get cancelled from elsewhere, + // or fail to read and then cleanup. + ctx := context.Background() + err := prs.allocReservation.Wait(ctx) + if err != nil { + if ctx.Err() == nil { + // The error is from the reservation itself. Something is very broken, or we're not + // guarding against excessively large requests. + err = log.WithLevel(log.Critical, err) + } + err = fmt.Errorf("waiting for alloc limit reservation: %w", err) + return nil, err + } b := make([]byte, r.Length) p := c.t.info.Piece(int(r.Index)) n, err := c.t.readAt(b, p.Offset()+int64(r.Begin)) @@ -1740,6 +1771,7 @@ func (c *PeerConn) tickleWriter() { func (c *PeerConn) sendChunk(r Request, msg func(pp.Message) bool, state *peerRequestState) (more bool) { c.lastChunkSent = time.Now() + state.allocReservation.Release() return msg(pp.Message{ Type: pp.Piece, Index: r.Index, diff --git a/test/leecher-storage.go b/test/leecher-storage.go index eac152d6..bfb3816d 100644 --- a/test/leecher-storage.go +++ b/test/leecher-storage.go @@ -108,6 +108,8 @@ func testClientTransfer(t *testing.T, ps testClientTransferParams) { cfg := torrent.TestingConfig(t) // cfg.Debug = true cfg.Seed = true + // Less than a piece, more than a single request. + cfg.MaxAllocPeerRequestDataPerConn = 4 // Some test instances don't like this being on, even when there's no cache involved. cfg.DropMutuallyCompletePeers = false if ps.SeederUploadRateLimiter != nil { diff --git a/test/transfer_test.go b/test/transfer_test.go index 2c77e673..b96c94d3 100644 --- a/test/transfer_test.go +++ b/test/transfer_test.go @@ -138,6 +138,7 @@ func TestSeedAfterDownloading(t *testing.T) { cfg := torrent.TestingConfig(t) cfg.Seed = true + cfg.MaxAllocPeerRequestDataPerConn = 4 cfg.DataDir = greetingTempDir seeder, err := torrent.NewClient(cfg) require.NoError(t, err) @@ -159,6 +160,7 @@ func TestSeedAfterDownloading(t *testing.T) { cfg = torrent.TestingConfig(t) cfg.Seed = false cfg.DataDir = t.TempDir() + cfg.MaxAllocPeerRequestDataPerConn = 4 leecherLeecher, _ := torrent.NewClient(cfg) require.NoError(t, err) defer leecherLeecher.Close()