diff --git a/.github/workflows/go.yml b/.github/workflows/go.yml index 9a27d953..af1530ff 100644 --- a/.github/workflows/go.yml +++ b/.github/workflows/go.yml @@ -9,7 +9,7 @@ jobs: runs-on: ubuntu-latest strategy: matrix: - go-version: [ '1.16', '1.17', 'tip' ] + go-version: [ 'tip' ] fail-fast: false steps: - uses: actions/checkout@v2 @@ -20,7 +20,7 @@ jobs: runs-on: ubuntu-latest strategy: matrix: - go-version: [ '1.17' ] + go-version: [ 'tip' ] fail-fast: false steps: - uses: actions/checkout@v2 @@ -31,7 +31,7 @@ jobs: runs-on: ubuntu-latest strategy: matrix: - go-version: [ '1.17' ] + go-version: [ 'tip' ] fail-fast: false steps: - uses: actions/checkout@v2 @@ -42,7 +42,7 @@ jobs: runs-on: ubuntu-latest strategy: matrix: - go-version: [ '1.16', '1.17' ] + go-version: [ 'tip' ] fail-fast: false steps: - uses: actions/checkout@v2 @@ -54,7 +54,7 @@ jobs: runs-on: ubuntu-latest strategy: matrix: - go-version: [ '1.16', '1.17' ] + go-version: [ 'tip' ] fail-fast: false steps: - uses: actions/checkout@v2 @@ -66,7 +66,7 @@ jobs: runs-on: ubuntu-latest strategy: matrix: - go-version: [ '1.17' ] + go-version: [ 'tip' ] fail-fast: false steps: - uses: actions/checkout@v2 @@ -83,4 +83,3 @@ jobs: - name: torrentfs end-to-end test # Test on 386 for atomic alignment and other bad 64-bit assumptions run: GOARCH=386 fs/test.sh - timeout-minutes: 10 diff --git a/client.go b/client.go index 4b6f1a24..d5f96c0e 100644 --- a/client.go +++ b/client.go @@ -4,6 +4,7 @@ import ( "bufio" "context" "crypto/rand" + "crypto/sha1" "encoding/binary" "errors" "expvar" @@ -12,6 +13,7 @@ import ( "math" "net" "net/http" + "net/netip" "sort" "strconv" "strings" @@ -20,6 +22,7 @@ import ( "github.com/anacrolix/chansync/events" "github.com/anacrolix/dht/v2" "github.com/anacrolix/dht/v2/krpc" + "github.com/anacrolix/generics" "github.com/anacrolix/log" "github.com/anacrolix/missinggo/perf" "github.com/anacrolix/missinggo/v2" @@ -34,6 +37,7 @@ import ( "golang.org/x/time/rate" "github.com/anacrolix/chansync" + . "github.com/anacrolix/generics" "github.com/anacrolix/torrent/bencode" "github.com/anacrolix/torrent/internal/limiter" @@ -72,7 +76,7 @@ type Client struct { // include ourselves if we end up trying to connect to our own address // through legitimate channels. dopplegangerAddrs map[string]struct{} - badPeerIPs map[string]struct{} + badPeerIPs map[netip.Addr]struct{} torrents map[InfoHash]*Torrent pieceRequestOrder map[interface{}]*request_strategy.PieceRequestOrder @@ -99,7 +103,7 @@ func (cl *Client) badPeerIPsLocked() (ips []string) { ips = make([]string, len(cl.badPeerIPs)) i := 0 for k := range cl.badPeerIPs { - ips[i] = k + ips[i] = k.String() i += 1 } return @@ -191,7 +195,7 @@ func (cl *Client) announceKey() int32 { // Initializes a bare minimum Client. *Client and *ClientConfig must not be nil. func (cl *Client) init(cfg *ClientConfig) { cl.config = cfg - cl.dopplegangerAddrs = make(map[string]struct{}) + generics.MakeMap(&cl.dopplegangerAddrs) cl.torrents = make(map[metainfo.Hash]*Torrent) cl.dialRateLimiter = rate.NewLimiter(10, 10) cl.activeAnnounceLimiter.SlotsPerKey = 2 @@ -1142,7 +1146,11 @@ func (cl *Client) badPeerIPPort(ip net.IP, port int) bool { if _, ok := cl.ipBlockRange(ip); ok { return true } - if _, ok := cl.badPeerIPs[ip.String()]; ok { + ipAddr, ok := netip.AddrFromSlice(ip) + if !ok { + panic(ip) + } + if _, ok := cl.badPeerIPs[ipAddr]; ok { return true } return false @@ -1187,6 +1195,8 @@ func (cl *Client) newTorrentOpt(opts AddTorrentOpts) (t *Torrent) { webSeeds: make(map[string]*Peer), gotMetainfoC: make(chan struct{}), } + t.smartBanCache.Hash = sha1.Sum + t.smartBanCache.Init() t.networkingEnabled.Set() t.logger = cl.logger.WithContextValue(t).WithNames("torrent", t.infoHash.HexString()) t.sourcesLogger = t.logger.WithNames("sources") @@ -1440,11 +1450,13 @@ func (cl *Client) AddDhtNodes(nodes []string) { } func (cl *Client) banPeerIP(ip net.IP) { - cl.logger.Printf("banning ip %v", ip) - if cl.badPeerIPs == nil { - cl.badPeerIPs = make(map[string]struct{}) + // We can't take this from string, because it will lose netip's v4on6. net.ParseIP parses v4 + // addresses directly to v4on6, which doesn't compare equal with v4. + ipAddr, ok := netip.AddrFromSlice(ip) + if !ok { + panic(ip) } - cl.badPeerIPs[ip.String()] = struct{}{} + generics.MakeMapIfNilAndSet(&cl.badPeerIPs, ipAddr, struct{}{}) for _, t := range cl.torrents { t.iterPeers(func(p *Peer) { if p.remoteIp().Equal(ip) { @@ -1474,6 +1486,13 @@ func (cl *Client) newConnection(nc net.Conn, outgoing bool, remoteAddr PeerRemot connString: connString, conn: nc, } + // TODO: Need to be much more explicit about this, including allowing non-IP bannable addresses. + if remoteAddr != nil { + netipAddrPort, err := netip.ParseAddrPort(remoteAddr.String()) + if err == nil { + c.bannableAddr = Some(netipAddrPort.Addr()) + } + } c.peerImpl = c c.logger = cl.logger.WithDefaultLevel(log.Warning).WithContextValue(c) c.setRW(connStatsReadWriter{nc, c}) diff --git a/go.mod b/go.mod index 4aa49944..2b67f859 100644 --- a/go.mod +++ b/go.mod @@ -5,18 +5,20 @@ go 1.18 require ( crawshaw.io/sqlite v0.3.3-0.20210127221821-98b1f83c5508 github.com/RoaringBitmap/roaring v0.9.4 + github.com/ajwerner/btree v0.0.0-20211201061316-91c8b66ad617 github.com/alexflint/go-arg v1.4.2 github.com/anacrolix/args v0.5.0 github.com/anacrolix/chansync v0.3.0 github.com/anacrolix/dht/v2 v2.16.2-0.20220311024416-dd658f18fd51 github.com/anacrolix/envpprof v1.1.1 github.com/anacrolix/fuse v0.2.0 + github.com/anacrolix/generics v0.0.0-20220217222028-44932cf46edd github.com/anacrolix/go-libutp v1.2.0 github.com/anacrolix/log v0.13.1 github.com/anacrolix/missinggo v1.3.0 github.com/anacrolix/missinggo/perf v1.0.0 github.com/anacrolix/missinggo/v2 v2.5.4-0.20220317032254-8c5ea4947a0b - github.com/anacrolix/multiless v0.2.0 + github.com/anacrolix/multiless v0.2.1-0.20211218050420-533661eef5dc github.com/anacrolix/squirrel v0.4.1-0.20220122230132-14b040773bac github.com/anacrolix/sync v0.4.0 github.com/anacrolix/tagflag v1.3.0 @@ -37,13 +39,13 @@ require ( github.com/pion/webrtc/v3 v3.1.24-0.20220208053747-94262c1b2b38 github.com/pkg/errors v0.9.1 github.com/stretchr/testify v1.7.0 + github.com/tidwall/btree v0.7.2-0.20211211132910-4215444137fc go.etcd.io/bbolt v1.3.6 golang.org/x/time v0.0.0-20210723032227-1f47c861a9ac ) require ( - github.com/alexflint/go-scalar v1.0.0 // indirect - github.com/anacrolix/confluence v1.9.0 // indirect + github.com/alexflint/go-scalar v1.1.0 // indirect github.com/anacrolix/mmsg v1.0.0 // indirect github.com/anacrolix/stm v0.3.0 // indirect github.com/benbjohnson/immutable v0.3.0 // indirect diff --git a/go.sum b/go.sum index a0f9b8cf..8e6a3634 100644 --- a/go.sum +++ b/go.sum @@ -15,24 +15,21 @@ github.com/RoaringBitmap/roaring v0.9.4 h1:ckvZSX5gwCRaJYBNe7syNawCU5oruY9gQmjXl github.com/RoaringBitmap/roaring v0.9.4/go.mod h1:icnadbWcNyfEHlYdr+tDlOTih1Bf/h+rzPpv4sbomAA= github.com/Shopify/sarama v1.19.0/go.mod h1:FVkBWblsNy7DGZRfXLU0O9RCGt5g3g3yEuWXgklEdEo= github.com/Shopify/toxiproxy v2.1.4+incompatible/go.mod h1:OXgGpZ6Cli1/URJOF1DMxUHB2q5Ap20/P/eIdh4G0pI= +github.com/ajwerner/btree v0.0.0-20211201061316-91c8b66ad617 h1:sxP5D87Mq99SZMHhYBmq1yY4AAVkfNY5Wn02B9crYs0= +github.com/ajwerner/btree v0.0.0-20211201061316-91c8b66ad617/go.mod h1:q37NoqncT41qKc048STsifIt69LfUJ8SrWWcz/yam5k= github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= github.com/alexflint/go-arg v1.4.2 h1:lDWZAXxpAnZUq4qwb86p/3rIJJ2Li81EoMbTMujhVa0= github.com/alexflint/go-arg v1.4.2/go.mod h1:9iRbDxne7LcR/GSvEr7ma++GLpdIU1zrghf2y2768kM= -github.com/alexflint/go-scalar v1.0.0 h1:NGupf1XV/Xb04wXskDFzS0KWOLH632W/EO4fAFi+A70= github.com/alexflint/go-scalar v1.0.0/go.mod h1:GpHzbCOZXEKMEcygYQ5n/aa4Aq84zbxjy3MxYW0gjYw= +github.com/alexflint/go-scalar v1.1.0 h1:aaAouLLzI9TChcPXotr6gUhq+Scr8rl0P9P4PnltbhM= +github.com/alexflint/go-scalar v1.1.0/go.mod h1:LoFvNMqS1CPrMVltza4LvnGKhaSpc3oyLEBUZVhhS2o= github.com/anacrolix/args v0.5.0 h1:bZCkbtn4QMB4ow2g34R/oqTFwrI6IojysKop6+ZcOSs= github.com/anacrolix/args v0.5.0/go.mod h1:Fj/N2PehEwTBE5t/V/9xgTcxDkuYQ+5IBoFw/8gkldI= github.com/anacrolix/chansync v0.3.0 h1:lRu9tbeuw3wl+PhMu/r+JJCRu5ArFXIluOgdF0ao6/U= github.com/anacrolix/chansync v0.3.0/go.mod h1:DZsatdsdXxD0WiwcGl0nJVwyjCKMDv+knl1q2iBjA2k= -github.com/anacrolix/confluence v1.9.0 h1:7WrWktoDw7P4uo1bzgaA8FFesvc7NsTp37sAsG54XlE= -github.com/anacrolix/confluence v1.9.0/go.mod h1:O5uS+WVgip+3SOcV1K7E/jE3m4DtK7Jk6QJTnU2VS5s= -github.com/anacrolix/dht/v2 v2.15.2-0.20220123034220-0538803801cb h1:c4e9XiiT0P3uMkONTkpzLVGxz3if5fIn6UB4HTrVlc0= -github.com/anacrolix/dht/v2 v2.15.2-0.20220123034220-0538803801cb/go.mod h1:GCylVI6WTvbxvhY7pBoHiE5dmjfDWkhqbobDpjND01A= -github.com/anacrolix/dht/v2 v2.16.1 h1:Q/mL3wP+zZwgo0B3W21JDY3X51KE3xwv3UUjqTf4VJY= -github.com/anacrolix/dht/v2 v2.16.1/go.mod h1:7FOQ1dkRmAMb0LQM8T0hGXc9bQ4RD2/0CB3wcVyBxhk= github.com/anacrolix/dht/v2 v2.16.2-0.20220311024416-dd658f18fd51 h1:issCwqC43gQ7n0gg9rn0EeVYXnQMI7vlnWub4oidtlU= github.com/anacrolix/dht/v2 v2.16.2-0.20220311024416-dd658f18fd51/go.mod h1:osiyaNrMLG9dw7wUtVMaII/NdCjlXeHjUcYzXnmop68= github.com/anacrolix/envpprof v0.0.0-20180404065416-323002cec2fa/go.mod h1:KgHhUaQMc8cC0+cEflSgCFNFbKwi5h54gqtVn8yhP7c= @@ -42,6 +39,8 @@ github.com/anacrolix/envpprof v1.1.1 h1:sHQCyj7HtiSfaZAzL2rJrQdyS7odLqlwO6nhk/tG github.com/anacrolix/envpprof v1.1.1/go.mod h1:My7T5oSqVfEn4MD4Meczkw/f5lSIndGAKu/0SM/rkf4= github.com/anacrolix/fuse v0.2.0 h1:pc+To78kI2d/WUjIyrsdqeJQAesuwpGxlI3h1nAv3Do= github.com/anacrolix/fuse v0.2.0/go.mod h1:Kfu02xBwnySDpH3N23BmrP3MDfwAQGRLUCj6XyeOvBQ= +github.com/anacrolix/generics v0.0.0-20220217222028-44932cf46edd h1:u0sIIPDd4zM287UxlhCtIJURZyYsPQQAyZGBaO0nAy0= +github.com/anacrolix/generics v0.0.0-20220217222028-44932cf46edd/go.mod h1:SommN0/3j+jrAnjopAZfqkREMGw59ELwloDcx6Y0KLA= github.com/anacrolix/go-libutp v1.2.0 h1:sjxoB+/ARiKUR7IK/6wLWyADIBqGmu1fm0xo+8Yy7u0= github.com/anacrolix/go-libutp v1.2.0/go.mod h1:RrJ3KcaDcf9Jqp33YL5V/5CBEc6xMc7aJL8wXfuWL50= github.com/anacrolix/log v0.3.0/go.mod h1:lWvLTqzAnCWPJA08T2HCstZi0L1y2Wyvm3FJgwU9jwU= @@ -68,8 +67,8 @@ github.com/anacrolix/missinggo/v2 v2.5.4-0.20220317032254-8c5ea4947a0b/go.mod h1 github.com/anacrolix/mmsg v0.0.0-20180515031531-a4a3ba1fc8bb/go.mod h1:x2/ErsYUmT77kezS63+wzZp8E3byYB0gzirM/WMBLfw= github.com/anacrolix/mmsg v1.0.0 h1:btC7YLjOn29aTUAExJiVUhQOuf/8rhm+/nWCMAnL3Hg= github.com/anacrolix/mmsg v1.0.0/go.mod h1:x8kRaJY/dCrY9Al0PEcj1mb/uFHwP6GCJ9fLl4thEPc= -github.com/anacrolix/multiless v0.2.0 h1:HtGBBOQcHaJM59RP3ysITId7AMIgiNF4xJucaFh14Ms= -github.com/anacrolix/multiless v0.2.0/go.mod h1:TrCLEZfIDbMVfLoQt5tOoiBS/uq4y8+ojuEVVvTNPX4= +github.com/anacrolix/multiless v0.2.1-0.20211218050420-533661eef5dc h1:K047jUtd0Xv4SEpv/5DoBgDvj4ZNpT1SOVtMlFpRrh0= +github.com/anacrolix/multiless v0.2.1-0.20211218050420-533661eef5dc/go.mod h1:TrCLEZfIDbMVfLoQt5tOoiBS/uq4y8+ojuEVVvTNPX4= github.com/anacrolix/squirrel v0.4.1-0.20220122230132-14b040773bac h1:eddZTnM9TIy3Z9ARLeDMlUpEjcs0ZdoFMXSG0ChAHvE= github.com/anacrolix/squirrel v0.4.1-0.20220122230132-14b040773bac/go.mod h1:YzgVvikMdFD441oTWlNG189bpKabO9Sbf3uCSVgca04= github.com/anacrolix/stm v0.2.0/go.mod h1:zoVQRvSiGjGoTmbM0vSLIiaKjWtNPeTvXUSdJQA4hsg= @@ -319,6 +318,8 @@ github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5 github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/tidwall/btree v0.7.2-0.20211211132910-4215444137fc h1:THtJVe/QBctKEe8kjnXwt7RAlvHNtUjFJOEmgZkN05w= +github.com/tidwall/btree v0.7.2-0.20211211132910-4215444137fc/go.mod h1:LGm8L/DZjPLmeWGjv5kFrY8dL4uVhMmzmmLYmsObdKE= github.com/tinylib/msgp v1.0.2/go.mod h1:+d+yLhGm8mzTaHzB+wgMYrodPfmZrzkirds8fDWklFE= github.com/tinylib/msgp v1.1.0/go.mod h1:+d+yLhGm8mzTaHzB+wgMYrodPfmZrzkirds8fDWklFE= github.com/tinylib/msgp v1.1.2/go.mod h1:+d+yLhGm8mzTaHzB+wgMYrodPfmZrzkirds8fDWklFE= diff --git a/peerconn.go b/peerconn.go index 6615d79c..280a82ae 100644 --- a/peerconn.go +++ b/peerconn.go @@ -15,12 +15,12 @@ import ( "time" "github.com/RoaringBitmap/roaring" + "github.com/anacrolix/chansync" + . "github.com/anacrolix/generics" "github.com/anacrolix/log" "github.com/anacrolix/missinggo/iter" "github.com/anacrolix/missinggo/v2/bitmap" "github.com/anacrolix/multiless" - - "github.com/anacrolix/chansync" "github.com/anacrolix/torrent/bencode" "github.com/anacrolix/torrent/metainfo" "github.com/anacrolix/torrent/mse" @@ -64,9 +64,10 @@ type Peer struct { peerImpl callbacks *Callbacks - outgoing bool - Network string - RemoteAddr PeerRemoteAddr + outgoing bool + Network string + RemoteAddr PeerRemoteAddr + bannableAddr Option[bannableAddr] // True if the connection is operating over MSE obfuscation. headerEncrypted bool cryptoMethod mse.CryptoMethod @@ -1387,6 +1388,11 @@ func (c *Peer) receiveChunk(msg *pp.Message) error { ppReq := newRequestFromMessage(msg) req := c.t.requestIndexFromRequest(ppReq) + t := c.t + + if c.bannableAddr.Ok() { + t.smartBanCache.RecordBlock(c.bannableAddr.Value(), req, msg.Piece) + } if c.peerChoking { chunksReceived.Add("while choked", 1) @@ -1426,7 +1432,6 @@ func (c *Peer) receiveChunk(msg *pp.Message) error { } } - t := c.t cl := t.cl // Do we actually want this chunk? diff --git a/piece.go b/piece.go index 1c4375f1..6842f1f0 100644 --- a/piece.go +++ b/piece.go @@ -8,7 +8,6 @@ import ( "github.com/RoaringBitmap/roaring" "github.com/anacrolix/chansync" "github.com/anacrolix/missinggo/v2/bitmap" - "github.com/anacrolix/torrent/metainfo" pp "github.com/anacrolix/torrent/peer_protocol" "github.com/anacrolix/torrent/storage" diff --git a/request-strategy/ajwerner-btree.go b/request-strategy/ajwerner-btree.go new file mode 100644 index 00000000..209f62dc --- /dev/null +++ b/request-strategy/ajwerner-btree.go @@ -0,0 +1,44 @@ +package request_strategy + +import ( + "github.com/ajwerner/btree" +) + +type ajwernerBtree struct { + btree btree.Set[pieceRequestOrderItem] +} + +var _ Btree = (*ajwernerBtree)(nil) + +func NewAjwernerBtree() *ajwernerBtree { + return &ajwernerBtree{ + btree: btree.MakeSet(func(t, t2 pieceRequestOrderItem) int { + return pieceOrderLess(&t, &t2).OrderingInt() + }), + } +} + +func mustValue[V any](b bool, panicValue V) { + if !b { + panic(panicValue) + } +} + +func (a *ajwernerBtree) Delete(item pieceRequestOrderItem) { + mustValue(a.btree.Delete(item), item) +} + +func (a *ajwernerBtree) Add(item pieceRequestOrderItem) { + _, overwrote := a.btree.Upsert(item) + mustValue(!overwrote, item) +} + +func (a *ajwernerBtree) Scan(f func(pieceRequestOrderItem) bool) { + it := a.btree.Iterator() + it.First() + for it.First(); it.Valid(); it.Next() { + if !f(it.Cur()) { + break + } + } +} diff --git a/request-strategy/order.go b/request-strategy/order.go index cb69d23d..130c698b 100644 --- a/request-strategy/order.go +++ b/request-strategy/order.go @@ -6,7 +6,6 @@ import ( "github.com/anacrolix/multiless" "github.com/anacrolix/torrent/metainfo" - "github.com/google/btree" "github.com/anacrolix/torrent/types" ) @@ -52,8 +51,7 @@ func GetRequestablePieces(input Input, pro *PieceRequestOrder, f func(ih metainf storageLeft = &cap } var allTorrentsUnverifiedBytes int64 - pro.tree.Ascend(func(i btree.Item) bool { - _i := i.(*pieceRequestOrderItem) + pro.tree.Scan(func(_i pieceRequestOrderItem) bool { ih := _i.key.InfoHash var t Torrent = input.Torrent(ih) var piece Piece = t.Piece(_i.key.Index) diff --git a/request-strategy/piece-request-order.go b/request-strategy/piece-request-order.go index dbdac738..5238a3a1 100644 --- a/request-strategy/piece-request-order.go +++ b/request-strategy/piece-request-order.go @@ -1,21 +1,22 @@ package request_strategy -import ( - "fmt" +import "github.com/anacrolix/torrent/metainfo" - "github.com/anacrolix/torrent/metainfo" - "github.com/google/btree" -) +type Btree interface { + Delete(pieceRequestOrderItem) + Add(pieceRequestOrderItem) + Scan(func(pieceRequestOrderItem) bool) +} -func NewPieceOrder() *PieceRequestOrder { +func NewPieceOrder(btree Btree, cap int) *PieceRequestOrder { return &PieceRequestOrder{ - tree: btree.New(32), - keys: make(map[PieceRequestOrderKey]PieceRequestOrderState), + tree: btree, + keys: make(map[PieceRequestOrderKey]PieceRequestOrderState, cap), } } type PieceRequestOrder struct { - tree *btree.BTree + tree Btree keys map[PieceRequestOrderKey]PieceRequestOrderState } @@ -35,8 +36,7 @@ type pieceRequestOrderItem struct { state PieceRequestOrderState } -func (me *pieceRequestOrderItem) Less(other btree.Item) bool { - otherConcrete := other.(*pieceRequestOrderItem) +func (me *pieceRequestOrderItem) Less(otherConcrete *pieceRequestOrderItem) bool { return pieceOrderLess(me, otherConcrete).Less() } @@ -44,16 +44,14 @@ func (me *PieceRequestOrder) Add(key PieceRequestOrderKey, state PieceRequestOrd if _, ok := me.keys[key]; ok { panic(key) } - if me.tree.ReplaceOrInsert(&pieceRequestOrderItem{ - key: key, - state: state, - }) != nil { - panic("shouldn't already have this") - } + me.tree.Add(pieceRequestOrderItem{key, state}) me.keys[key] = state } -func (me *PieceRequestOrder) Update(key PieceRequestOrderKey, state PieceRequestOrderState) { +func (me *PieceRequestOrder) Update( + key PieceRequestOrderKey, + state PieceRequestOrderState, +) { oldState, ok := me.keys[key] if !ok { panic("key should have been added already") @@ -61,17 +59,8 @@ func (me *PieceRequestOrder) Update(key PieceRequestOrderKey, state PieceRequest if state == oldState { return } - item := pieceRequestOrderItem{ - key: key, - state: oldState, - } - if me.tree.Delete(&item) == nil { - panic(fmt.Sprintf("%#v", key)) - } - item.state = state - if me.tree.ReplaceOrInsert(&item) != nil { - panic(key) - } + me.tree.Delete(pieceRequestOrderItem{key, oldState}) + me.tree.Add(pieceRequestOrderItem{key, state}) me.keys[key] = state } @@ -83,12 +72,8 @@ func (me *PieceRequestOrder) existingItemForKey(key PieceRequestOrderKey) pieceR } func (me *PieceRequestOrder) Delete(key PieceRequestOrderKey) { - item := me.existingItemForKey(key) - if me.tree.Delete(&item) == nil { - panic(key) - } + me.tree.Delete(pieceRequestOrderItem{key, me.keys[key]}) delete(me.keys, key) - // log.Printf("deleting %#v", key) } func (me *PieceRequestOrder) Len() int { diff --git a/request-strategy/piece-request-order_test.go b/request-strategy/piece-request-order_test.go new file mode 100644 index 00000000..35c97c2d --- /dev/null +++ b/request-strategy/piece-request-order_test.go @@ -0,0 +1,106 @@ +package request_strategy + +import ( + "testing" + + "github.com/bradfitz/iter" +) + +func benchmarkPieceRequestOrder[B Btree]( + b *testing.B, + // Initialize the next run, and return a Btree + newBtree func() B, + // Set any path hinting for the specified piece + hintForPiece func(index int), + numPieces int, +) { + b.ResetTimer() + b.ReportAllocs() + for range iter.N(b.N) { + pro := NewPieceOrder(newBtree(), numPieces) + state := PieceRequestOrderState{} + doPieces := func(m func(PieceRequestOrderKey)) { + for i := range iter.N(numPieces) { + key := PieceRequestOrderKey{ + Index: i, + } + hintForPiece(i) + m(key) + } + } + doPieces(func(key PieceRequestOrderKey) { + pro.Add(key, state) + }) + state.Availability++ + doPieces(func(key PieceRequestOrderKey) { + pro.Update(key, state) + }) + pro.tree.Scan(func(item pieceRequestOrderItem) bool { + return true + }) + doPieces(func(key PieceRequestOrderKey) { + state.Priority = piecePriority(key.Index / 4) + pro.Update(key, state) + }) + pro.tree.Scan(func(item pieceRequestOrderItem) bool { + return item.key.Index < 1000 + }) + state.Priority = 0 + state.Availability++ + doPieces(func(key PieceRequestOrderKey) { + pro.Update(key, state) + }) + pro.tree.Scan(func(item pieceRequestOrderItem) bool { + return item.key.Index < 1000 + }) + state.Availability-- + doPieces(func(key PieceRequestOrderKey) { + pro.Update(key, state) + }) + doPieces(pro.Delete) + if pro.Len() != 0 { + b.FailNow() + } + } +} + +func zero[T any](t *T) { + var zt T + *t = zt +} + +func BenchmarkPieceRequestOrder(b *testing.B) { + const numPieces = 2000 + b.Run("TidwallBtree", func(b *testing.B) { + b.Run("NoPathHints", func(b *testing.B) { + benchmarkPieceRequestOrder(b, NewTidwallBtree, func(int) {}, numPieces) + }) + b.Run("SharedPathHint", func(b *testing.B) { + var pathHint PieceRequestOrderPathHint + var btree *tidwallBtree + benchmarkPieceRequestOrder( + b, func() *tidwallBtree { + zero(&pathHint) + btree = NewTidwallBtree() + btree.PathHint = &pathHint + return btree + }, func(int) {}, numPieces, + ) + }) + b.Run("PathHintPerPiece", func(b *testing.B) { + pathHints := make([]PieceRequestOrderPathHint, numPieces) + var btree *tidwallBtree + benchmarkPieceRequestOrder( + b, func() *tidwallBtree { + btree = NewTidwallBtree() + return btree + }, func(index int) { + btree.PathHint = &pathHints[index] + }, numPieces, + ) + }) + }) + b.Run("AjwernerBtree", func(b *testing.B) { + benchmarkPieceRequestOrder(b, NewAjwernerBtree, func(index int) {}, numPieces) + }) +} diff --git a/request-strategy/tidwall-btree.go b/request-strategy/tidwall-btree.go new file mode 100644 index 00000000..0d93baf0 --- /dev/null +++ b/request-strategy/tidwall-btree.go @@ -0,0 +1,37 @@ +package request_strategy + +import ( + "github.com/tidwall/btree" +) + +type tidwallBtree struct { + tree *btree.BTree[pieceRequestOrderItem] + PathHint *btree.PathHint +} + +func (me *tidwallBtree) Scan(f func(pieceRequestOrderItem) bool) { + me.tree.Scan(f) +} + +func NewTidwallBtree() *tidwallBtree { + return &tidwallBtree{ + tree: btree.NewOptions( + func(a, b pieceRequestOrderItem) bool { + return a.Less(&b) + }, + btree.Options{NoLocks: true}), + } +} + +func (me *tidwallBtree) Add(item pieceRequestOrderItem) { + if _, ok := me.tree.SetHint(item, me.PathHint); ok { + panic("shouldn't already have this") + } +} + +type PieceRequestOrderPathHint = btree.PathHint + +func (me *tidwallBtree) Delete(item pieceRequestOrderItem) { + _, deleted := me.tree.DeleteHint(item, me.PathHint) + mustValue(deleted, item) +} diff --git a/smartban.go b/smartban.go new file mode 100644 index 00000000..15b4d44b --- /dev/null +++ b/smartban.go @@ -0,0 +1,55 @@ +package torrent + +import ( + "bytes" + "crypto/sha1" + "net/netip" + + "github.com/anacrolix/generics" + "github.com/anacrolix/torrent/smartban" +) + +type bannableAddr = netip.Addr + +type smartBanCache = smartban.Cache[bannableAddr, RequestIndex, [sha1.Size]byte] + +type blockCheckingWriter struct { + cache *smartBanCache + requestIndex RequestIndex + // Peers that didn't match blocks written now. + badPeers map[bannableAddr]struct{} + blockBuffer bytes.Buffer + chunkSize int +} + +func (me *blockCheckingWriter) checkBlock() { + b := me.blockBuffer.Next(me.chunkSize) + for _, peer := range me.cache.CheckBlock(me.requestIndex, b) { + generics.MakeMapIfNilAndSet(&me.badPeers, peer, struct{}{}) + } + me.requestIndex++ +} + +func (me *blockCheckingWriter) checkFullBlocks() { + for me.blockBuffer.Len() >= me.chunkSize { + me.checkBlock() + } +} + +func (me *blockCheckingWriter) Write(b []byte) (int, error) { + n, err := me.blockBuffer.Write(b) + if err != nil { + // bytes.Buffer.Write should never fail. + panic(err) + } + me.checkFullBlocks() + return n, err +} + +// Check any remaining block data. Terminal pieces or piece sizes that don't divide into the chunk +// size cleanly may leave fragments that should be checked. +func (me *blockCheckingWriter) Flush() { + for me.blockBuffer.Len() != 0 { + me.checkBlock() + } +} diff --git a/smartban/smartban.go b/smartban/smartban.go new file mode 100644 index 00000000..96e9b759 --- /dev/null +++ b/smartban/smartban.go @@ -0,0 +1,51 @@ +package smartban + +import ( + "sync" +) + +type Cache[Peer, BlockKey, Hash comparable] struct { + Hash func([]byte) Hash + + lock sync.RWMutex + blocks map[BlockKey]map[Peer]Hash +} + +type Block[Key any] struct { + Key Key + Data []byte +} + +func (me *Cache[Peer, BlockKey, Hash]) Init() { + me.blocks = make(map[BlockKey]map[Peer]Hash) +} + +func (me *Cache[Peer, BlockKey, Hash]) RecordBlock(peer Peer, key BlockKey, data []byte) { + hash := me.Hash(data) + me.lock.Lock() + defer me.lock.Unlock() + peers := me.blocks[key] + if peers == nil { + peers = make(map[Peer]Hash) + me.blocks[key] = peers + } + peers[peer] = hash +} + +func (me *Cache[Peer, BlockKey, Hash]) CheckBlock(key BlockKey, data []byte) (bad []Peer) { + correct := me.Hash(data) + me.lock.RLock() + defer me.lock.RUnlock() + for peer, hash := range me.blocks[key] { + if hash != correct { + bad = append(bad, peer) + } + } + return +} + +func (me *Cache[Peer, BlockKey, Hash]) ForgetBlock(key BlockKey) { + me.lock.Lock() + defer me.lock.Unlock() + delete(me.blocks, key) +} diff --git a/torrent-piece-request-order.go b/torrent-piece-request-order.go index 86c98398..17206679 100644 --- a/torrent-piece-request-order.go +++ b/torrent-piece-request-order.go @@ -45,7 +45,7 @@ func (t *Torrent) initPieceRequestOrder() { key := t.clientPieceRequestOrderKey() cpro := t.cl.pieceRequestOrder if cpro[key] == nil { - cpro[key] = request_strategy.NewPieceOrder() + cpro[key] = request_strategy.NewPieceOrder(request_strategy.NewAjwernerBtree(), t.numPieces()) } } diff --git a/torrent.go b/torrent.go index 8f50879c..f6be6978 100644 --- a/torrent.go +++ b/torrent.go @@ -8,6 +8,7 @@ import ( "errors" "fmt" "io" + "net/netip" "net/url" "sort" "strings" @@ -19,6 +20,7 @@ import ( "github.com/anacrolix/chansync" "github.com/anacrolix/chansync/events" "github.com/anacrolix/dht/v2" + . "github.com/anacrolix/generics" "github.com/anacrolix/log" "github.com/anacrolix/missinggo/perf" "github.com/anacrolix/missinggo/slices" @@ -151,6 +153,8 @@ type Torrent struct { // Torrent sources in use keyed by the source string. activeSources sync.Map sourcesLogger log.Logger + + smartBanCache smartBanCache } func (t *Torrent) selectivePieceAvailabilityFromPeers(i pieceIndex) (count int) { @@ -943,7 +947,20 @@ func (t *Torrent) pieceLength(piece pieceIndex) pp.Integer { return pp.Integer(t.info.PieceLength) } -func (t *Torrent) hashPiece(piece pieceIndex) (ret metainfo.Hash, err error) { +func (t *Torrent) smartBanBlockCheckingWriter(piece pieceIndex) *blockCheckingWriter { + return &blockCheckingWriter{ + cache: &t.smartBanCache, + requestIndex: t.pieceRequestIndexOffset(piece), + chunkSize: t.chunkSize.Int(), + } +} + +func (t *Torrent) hashPiece(piece pieceIndex) ( + ret metainfo.Hash, + // These are peers that sent us blocks that differ from what we hash here. + differingPeers map[bannableAddr]struct{}, + err error, +) { p := t.piece(piece) p.waitNoPendingWrites() storagePiece := t.pieces[piece].Storage() @@ -959,13 +976,18 @@ func (t *Torrent) hashPiece(piece pieceIndex) (ret metainfo.Hash, err error) { hash := pieceHash.New() const logPieceContents = false + smartBanWriter := t.smartBanBlockCheckingWriter(piece) + writers := []io.Writer{hash, smartBanWriter} + var examineBuf bytes.Buffer if logPieceContents { - var examineBuf bytes.Buffer - _, err = storagePiece.WriteTo(io.MultiWriter(hash, &examineBuf)) - log.Printf("hashed %q with copy err %v", examineBuf.Bytes(), err) - } else { - _, err = storagePiece.WriteTo(hash) + writers = append(writers, &examineBuf) } + _, err = storagePiece.WriteTo(io.MultiWriter(writers...)) + if logPieceContents { + log.Printf("hashed %q with copy err %v", examineBuf.Bytes(), err) + } + smartBanWriter.Flush() + differingPeers = smartBanWriter.badPeers missinggo.CopyExact(&ret, hash.Sum(nil)) return } @@ -2024,7 +2046,16 @@ func (t *Torrent) pieceHashed(piece pieceIndex, passed bool, hashIoErr error) { if len(bannableTouchers) >= 1 { c := bannableTouchers[0] - c.ban() + if len(bannableTouchers) != 1 { + t.logger.Levelf(log.Warning, "would have banned %v for touching piece %v after failed piece check", c.remoteIp(), piece) + } else { + // Turns out it's still useful to ban peers like this because if there's only a + // single peer for a piece, and we never progress that piece to completion, we + // will never smart-ban them. Discovered in + // https://github.com/anacrolix/torrent/issues/715. + t.logger.Levelf(log.Warning, "banning %v for being sole dirtier of piece %v after failed piece check", c, piece) + c.ban() + } } } t.onIncompletePiece(piece) @@ -2112,9 +2143,32 @@ func (t *Torrent) getPieceToHash() (ret pieceIndex, ok bool) { return } +func (t *Torrent) dropBannedPeers() { + t.iterPeers(func(p *Peer) { + remoteIp := p.remoteIp() + if remoteIp == nil { + if p.bannableAddr.Ok() { + log.Printf("can't get remote ip for peer %v", p) + } + return + } + netipAddr := netip.MustParseAddr(remoteIp.String()) + if Some(netipAddr) != p.bannableAddr { + log.Printf( + "peer remote ip does not match its bannable addr [peer=%v, remote ip=%v, bannable addr=%v]", + p, remoteIp, p.bannableAddr) + } + if _, ok := t.cl.badPeerIPs[netipAddr]; ok { + // Should this be a close? + p.drop() + log.Printf("dropped %v for banned remote IP %v", p, netipAddr) + } + }) +} + func (t *Torrent) pieceHasher(index pieceIndex) { p := t.piece(index) - sum, copyErr := t.hashPiece(index) + sum, failedPeers, copyErr := t.hashPiece(index) correct := sum == *p.hash switch copyErr { case nil, io.EOF: @@ -2124,6 +2178,16 @@ func (t *Torrent) pieceHasher(index pieceIndex) { t.storageLock.RUnlock() t.cl.lock() defer t.cl.unlock() + if correct { + for peer := range failedPeers { + t.cl.banPeerIP(peer.AsSlice()) + log.Printf("smart banned %v for piece %v", peer, index) + } + t.dropBannedPeers() + for ri := t.pieceRequestIndexOffset(index); ri < t.pieceRequestIndexOffset(index+1); ri++ { + t.smartBanCache.ForgetBlock(ri) + } + } p.hashing = false t.pieceHashed(index, correct, copyErr) t.updatePiecePriority(index, "Torrent.pieceHasher") @@ -2315,8 +2379,9 @@ func (t *Torrent) addWebSeed(url string) { // requests mark more often, so recomputation is probably sooner than with regular peer // conns. ~4x maxRequests would be about right. PeerMaxRequests: 128, - RemoteAddr: remoteAddrFromUrl(url), - callbacks: t.callbacks(), + // TODO: Set ban prefix? + RemoteAddr: remoteAddrFromUrl(url), + callbacks: t.callbacks(), }, client: webseed.Client{ HttpClient: t.cl.httpClient,