Limit peer request data allocation

This follows up from abb5cbc96e. We currently limit how many requests peers can send us, but didn't really check that peers didn't make us allocate huge amounts of space to buffer their requests. I'm sure there's some rough edges here.
This commit is contained in:
Matt Joiner 2023-02-14 12:46:23 +11:00
parent ceb7b2443a
commit 2cb7121a93
No known key found for this signature in database
GPG Key ID: 6B990B8185E7F782
8 changed files with 318 additions and 7 deletions

View File

@ -31,6 +31,7 @@ import (
"github.com/anacrolix/missinggo/v2/bitmap"
"github.com/anacrolix/missinggo/v2/pproffd"
"github.com/anacrolix/sync"
"github.com/anacrolix/torrent/types/infohash"
"github.com/davecgh/go-spew/spew"
"github.com/dustin/go-humanize"
gbtree "github.com/google/btree"
@ -1286,7 +1287,7 @@ func (cl *Client) AddTorrentOpt(opts AddTorrentOpts) (t *Torrent, new bool) {
}
type AddTorrentOpts struct {
InfoHash InfoHash
InfoHash infohash.T
Storage storage.ClientImpl
ChunkSize pp.Integer
}
@ -1505,6 +1506,7 @@ func (cl *Client) newConnection(nc net.Conn, opts newConnectionOpts) (c *PeerCon
connString: opts.connString,
conn: nc,
}
c.peerRequestDataAllocLimiter.Max = cl.config.MaxAllocPeerRequestDataPerConn
c.initRequestState()
// TODO: Need to be much more explicit about this, including allowing non-IP bannable addresses.
if opts.remoteAddr != nil {

View File

@ -148,6 +148,8 @@ type ClientConfig struct {
// How long between writes before sending a keep alive message on a peer connection that we want
// to maintain.
KeepAliveTimeout time.Duration
// Maximum bytes to buffer per peer connection for peer request data before it is sent.
MaxAllocPeerRequestDataPerConn int64
// The IP addresses as our peers should see them. May differ from the
// local interfaces due to NAT or other network configurations.
@ -205,6 +207,7 @@ func NewDefaultClientConfig() *ClientConfig {
TorrentPeersLowWater: 50,
HandshakesTimeout: 4 * time.Second,
KeepAliveTimeout: time.Minute,
MaxAllocPeerRequestDataPerConn: 1 << 20,
ListenHost: func(string) string { return "" },
UploadRateLimiter: unlimited,
DownloadRateLimiter: unlimited,

View File

@ -0,0 +1,93 @@
package alloclim
import (
"context"
"testing"
"time"
_ "github.com/anacrolix/envpprof"
qt "github.com/frankban/quicktest"
)
func TestReserveOverMax(t *testing.T) {
c := qt.New(t)
l := &Limiter{Max: 10}
r := l.Reserve(20)
c.Assert(r.Wait(context.Background()), qt.IsNotNil)
}
func TestImmediateAllow(t *testing.T) {
c := qt.New(t)
l := &Limiter{Max: 10}
r := l.Reserve(10)
c.Assert(r.Wait(context.Background()), qt.IsNil)
}
func TestSimpleSequence(t *testing.T) {
c := qt.New(t)
l := &Limiter{Max: 10}
rs := make([]*Reservation, 0)
rs = append(rs, l.Reserve(6))
rs = append(rs, l.Reserve(5))
rs = append(rs, l.Reserve(5))
c.Assert(rs[0].Wait(context.Background()), qt.IsNil)
ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Nanosecond))
c.Assert(rs[1].Wait(ctx), qt.Equals, context.DeadlineExceeded)
go cancel()
ctx, cancel = context.WithCancel(context.Background())
go cancel()
c.Assert(rs[2].Wait(ctx), qt.Equals, context.Canceled)
go rs[0].Release()
ctx, cancel = context.WithDeadline(context.Background(), time.Now().Add(time.Second))
c.Assert(rs[1].Wait(ctx), qt.IsNil)
go rs[1].Release()
c.Assert(rs[2].Wait(ctx), qt.IsNil)
go rs[2].Release()
go cancel()
rs[2].Release()
rs[1].Release()
c.Assert(l.Value(), qt.Equals, l.Max)
}
func TestSequenceWithCancel(t *testing.T) {
c := qt.New(t)
l := &Limiter{Max: 10}
rs := make([]*Reservation, 0)
rs = append(rs, l.Reserve(6))
rs = append(rs, l.Reserve(6))
rs = append(rs, l.Reserve(4))
rs = append(rs, l.Reserve(4))
c.Assert(rs[0].Cancel(), qt.IsFalse)
c.Assert(func() { rs[1].Release() }, qt.PanicMatches, "not resolved")
c.Assert(rs[1].Cancel(), qt.IsTrue)
c.Assert(rs[2].Wait(context.Background()), qt.IsNil)
rs[0].Release()
c.Assert(rs[3].Wait(context.Background()), qt.IsNil)
c.Assert(l.Value(), qt.Equals, int64(2))
rs[1].Release()
rs[2].Release()
rs[3].Release()
c.Assert(l.Value(), qt.Equals, l.Max)
}
func TestCancelWhileWaiting(t *testing.T) {
c := qt.New(t)
l := &Limiter{Max: 10}
rs := make([]*Reservation, 0)
rs = append(rs, l.Reserve(6))
rs = append(rs, l.Reserve(6))
rs = append(rs, l.Reserve(4))
rs = append(rs, l.Reserve(4))
go rs[1].Cancel()
err := rs[1].Wait(context.Background())
c.Assert(err, qt.IsNotNil)
err = rs[2].Wait(context.Background())
c.Assert(err, qt.IsNil)
ctx, cancel := context.WithCancel(context.Background())
go cancel()
err = rs[3].Wait(ctx)
c.Assert(err, qt.Equals, context.Canceled)
rs[0].Drop()
err = rs[3].Wait(ctx)
c.Assert(err, qt.IsNil)
}

80
internal/alloclim/l.go Normal file
View File

@ -0,0 +1,80 @@
package alloclim
import "sync"
// Manages reservations sharing a common allocation limit.
type Limiter struct {
// Maximum outstanding allocation space.
Max int64
initOnce sync.Once
mu sync.Mutex
// Current unallocated space.
value int64
// Reservations waiting to in the order they arrived.
waiting []*Reservation
}
func (me *Limiter) initValue() {
me.value = me.Max
}
func (me *Limiter) init() {
me.initOnce.Do(func() {
me.initValue()
})
}
func (me *Limiter) Reserve(n int64) *Reservation {
r := &Reservation{
l: me,
n: n,
}
me.init()
me.mu.Lock()
if n <= me.value {
me.value -= n
r.granted.Set()
} else {
me.waiting = append(me.waiting, r)
}
me.mu.Unlock()
return r
}
func (me *Limiter) doWakesLocked() {
for {
if len(me.waiting) == 0 {
break
}
r := me.waiting[0]
switch {
case r.cancelled.IsSet():
case r.n <= me.value:
if r.wake() {
me.value -= r.n
}
default:
return
}
me.waiting = me.waiting[1:]
}
}
func (me *Limiter) doWakes() {
me.mu.Lock()
me.doWakesLocked()
me.mu.Unlock()
}
func (me *Limiter) addValue(n int64) {
me.mu.Lock()
me.value += n
me.doWakesLocked()
me.mu.Unlock()
}
func (me *Limiter) Value() int64 {
me.mu.Lock()
defer me.mu.Unlock()
return me.value
}

97
internal/alloclim/r.go Normal file
View File

@ -0,0 +1,97 @@
package alloclim
import (
"context"
"errors"
"fmt"
"sync"
"github.com/anacrolix/chansync"
)
type Reservation struct {
l *Limiter
n int64
releaseOnce sync.Once
mu sync.Mutex
granted chansync.SetOnce
cancelled chansync.SetOnce
}
// Releases the alloc claim if the reservation has been granted. Does nothing if it was cancelled.
// Otherwise panics.
func (me *Reservation) Release() {
me.mu.Lock()
defer me.mu.Unlock()
switch {
default:
panic("not resolved")
case me.cancelled.IsSet():
return
case me.granted.IsSet():
}
me.releaseOnce.Do(func() {
me.l.addValue(me.n)
})
}
// Cancel the reservation, returns false if it was already granted. You must still release if that's
// the case. See Drop.
func (me *Reservation) Cancel() bool {
me.mu.Lock()
defer me.mu.Unlock()
if me.granted.IsSet() {
return false
}
if me.cancelled.Set() {
go me.l.doWakes()
}
return true
}
// If the reservation is granted, release it, otherwise cancel the reservation.
func (me *Reservation) Drop() {
me.mu.Lock()
defer me.mu.Unlock()
if me.granted.IsSet() {
me.releaseOnce.Do(func() {
me.l.addValue(me.n)
})
return
}
if me.cancelled.Set() {
go me.l.doWakes()
}
}
func (me *Reservation) wake() bool {
me.mu.Lock()
defer me.mu.Unlock()
if me.cancelled.IsSet() {
return false
}
return me.granted.Set()
}
func (me *Reservation) Wait(ctx context.Context) error {
if me.n > me.l.Max {
return fmt.Errorf("reservation for %v exceeds limiter max %v", me.n, me.l.Max)
}
select {
case <-ctx.Done():
case <-me.granted.Done():
case <-me.cancelled.Done():
}
defer me.mu.Unlock()
me.mu.Lock()
switch {
case me.granted.IsSet():
return nil
case me.cancelled.IsSet():
return errors.New("reservation cancelled")
case ctx.Err() != nil:
return ctx.Err()
default:
panic("unexpected")
}
}

View File

@ -3,6 +3,7 @@ package torrent
import (
"bufio"
"bytes"
"context"
"errors"
"fmt"
"io"
@ -20,6 +21,7 @@ import (
"github.com/anacrolix/missinggo/iter"
"github.com/anacrolix/missinggo/v2/bitmap"
"github.com/anacrolix/multiless"
"github.com/anacrolix/torrent/internal/alloclim"
"golang.org/x/time/rate"
"github.com/anacrolix/torrent/bencode"
@ -43,7 +45,8 @@ const (
)
type peerRequestState struct {
data []byte
data []byte
allocReservation *alloclim.Reservation
}
type PeerRemoteAddr interface {
@ -169,6 +172,8 @@ type PeerConn struct {
// The peer has everything. This can occur due to a special message, when
// we may not even know the number of pieces in the torrent yet.
peerSentHaveAll bool
peerRequestDataAllocLimiter alloclim.Limiter
}
func (cn *PeerConn) peerImplStatusLines() []string {
@ -547,11 +552,18 @@ func (cn *PeerConn) choke(msg messageWriter) (more bool) {
Type: pp.Choke,
})
if !cn.fastEnabled() {
cn.peerRequests = nil
cn.deleteAllPeerRequests()
}
return
}
func (cn *PeerConn) deleteAllPeerRequests() {
for _, state := range cn.peerRequests {
state.allocReservation.Drop()
}
cn.peerRequests = nil
}
func (cn *PeerConn) unchoke(msg func(pp.Message) bool) bool {
if !cn.choking {
return true
@ -992,7 +1004,11 @@ func (c *PeerConn) reject(r Request) {
panic("fast not enabled")
}
c.write(r.ToMsg(pp.Reject))
delete(c.peerRequests, r)
// It is possible to reject a request before it is added to peer requests due to being invalid.
if state, ok := c.peerRequests[r]; ok {
state.allocReservation.Drop()
delete(c.peerRequests, r)
}
}
func (c *PeerConn) maximumPeerRequestChunkLength() (_ Option[int]) {
@ -1066,7 +1082,9 @@ func (c *PeerConn) onReadRequest(r Request, startFetch bool) error {
if c.peerRequests == nil {
c.peerRequests = make(map[Request]*peerRequestState, localClientReqq)
}
value := &peerRequestState{}
value := &peerRequestState{
allocReservation: c.peerRequestDataAllocLimiter.Reserve(int64(r.Length)),
}
c.peerRequests[r] = value
if startFetch {
// TODO: Limit peer request data read concurrency.
@ -1076,7 +1094,7 @@ func (c *PeerConn) onReadRequest(r Request, startFetch bool) error {
}
func (c *PeerConn) peerRequestDataReader(r Request, prs *peerRequestState) {
b, err := readPeerRequestData(r, c)
b, err := c.readPeerRequestData(r, prs)
c.locker().Lock()
defer c.locker().Unlock()
if err != nil {
@ -1133,7 +1151,20 @@ func (c *PeerConn) peerRequestDataReadFailed(err error, r Request) {
}
}
func readPeerRequestData(r Request, c *PeerConn) ([]byte, error) {
func (c *PeerConn) readPeerRequestData(r Request, prs *peerRequestState) ([]byte, error) {
// Should we depend on Torrent closure here? I think it's okay to get cancelled from elsewhere,
// or fail to read and then cleanup.
ctx := context.Background()
err := prs.allocReservation.Wait(ctx)
if err != nil {
if ctx.Err() == nil {
// The error is from the reservation itself. Something is very broken, or we're not
// guarding against excessively large requests.
err = log.WithLevel(log.Critical, err)
}
err = fmt.Errorf("waiting for alloc limit reservation: %w", err)
return nil, err
}
b := make([]byte, r.Length)
p := c.t.info.Piece(int(r.Index))
n, err := c.t.readAt(b, p.Offset()+int64(r.Begin))
@ -1740,6 +1771,7 @@ func (c *PeerConn) tickleWriter() {
func (c *PeerConn) sendChunk(r Request, msg func(pp.Message) bool, state *peerRequestState) (more bool) {
c.lastChunkSent = time.Now()
state.allocReservation.Release()
return msg(pp.Message{
Type: pp.Piece,
Index: r.Index,

View File

@ -108,6 +108,8 @@ func testClientTransfer(t *testing.T, ps testClientTransferParams) {
cfg := torrent.TestingConfig(t)
// cfg.Debug = true
cfg.Seed = true
// Less than a piece, more than a single request.
cfg.MaxAllocPeerRequestDataPerConn = 4
// Some test instances don't like this being on, even when there's no cache involved.
cfg.DropMutuallyCompletePeers = false
if ps.SeederUploadRateLimiter != nil {

View File

@ -138,6 +138,7 @@ func TestSeedAfterDownloading(t *testing.T) {
cfg := torrent.TestingConfig(t)
cfg.Seed = true
cfg.MaxAllocPeerRequestDataPerConn = 4
cfg.DataDir = greetingTempDir
seeder, err := torrent.NewClient(cfg)
require.NoError(t, err)
@ -159,6 +160,7 @@ func TestSeedAfterDownloading(t *testing.T) {
cfg = torrent.TestingConfig(t)
cfg.Seed = false
cfg.DataDir = t.TempDir()
cfg.MaxAllocPeerRequestDataPerConn = 4
leecherLeecher, _ := torrent.NewClient(cfg)
require.NoError(t, err)
defer leecherLeecher.Close()