Merge branch 'master' into crawshaw-386-wasm
This commit is contained in:
commit
3f74e192f6
|
@ -9,7 +9,7 @@ jobs:
|
||||||
runs-on: ubuntu-latest
|
runs-on: ubuntu-latest
|
||||||
strategy:
|
strategy:
|
||||||
matrix:
|
matrix:
|
||||||
go-version: [ '1.16', '1.17', 'tip' ]
|
go-version: [ 'tip' ]
|
||||||
fail-fast: false
|
fail-fast: false
|
||||||
steps:
|
steps:
|
||||||
- uses: actions/checkout@v2
|
- uses: actions/checkout@v2
|
||||||
|
@ -20,7 +20,7 @@ jobs:
|
||||||
runs-on: ubuntu-latest
|
runs-on: ubuntu-latest
|
||||||
strategy:
|
strategy:
|
||||||
matrix:
|
matrix:
|
||||||
go-version: [ '1.17' ]
|
go-version: [ 'tip' ]
|
||||||
fail-fast: false
|
fail-fast: false
|
||||||
steps:
|
steps:
|
||||||
- uses: actions/checkout@v2
|
- uses: actions/checkout@v2
|
||||||
|
@ -31,7 +31,7 @@ jobs:
|
||||||
runs-on: ubuntu-latest
|
runs-on: ubuntu-latest
|
||||||
strategy:
|
strategy:
|
||||||
matrix:
|
matrix:
|
||||||
go-version: [ '1.17' ]
|
go-version: [ 'tip' ]
|
||||||
fail-fast: false
|
fail-fast: false
|
||||||
steps:
|
steps:
|
||||||
- uses: actions/checkout@v2
|
- uses: actions/checkout@v2
|
||||||
|
@ -42,7 +42,7 @@ jobs:
|
||||||
runs-on: ubuntu-latest
|
runs-on: ubuntu-latest
|
||||||
strategy:
|
strategy:
|
||||||
matrix:
|
matrix:
|
||||||
go-version: [ '1.16', '1.17' ]
|
go-version: [ 'tip' ]
|
||||||
fail-fast: false
|
fail-fast: false
|
||||||
steps:
|
steps:
|
||||||
- uses: actions/checkout@v2
|
- uses: actions/checkout@v2
|
||||||
|
@ -54,7 +54,7 @@ jobs:
|
||||||
runs-on: ubuntu-latest
|
runs-on: ubuntu-latest
|
||||||
strategy:
|
strategy:
|
||||||
matrix:
|
matrix:
|
||||||
go-version: [ '1.16', '1.17' ]
|
go-version: [ 'tip' ]
|
||||||
fail-fast: false
|
fail-fast: false
|
||||||
steps:
|
steps:
|
||||||
- uses: actions/checkout@v2
|
- uses: actions/checkout@v2
|
||||||
|
@ -66,7 +66,7 @@ jobs:
|
||||||
runs-on: ubuntu-latest
|
runs-on: ubuntu-latest
|
||||||
strategy:
|
strategy:
|
||||||
matrix:
|
matrix:
|
||||||
go-version: [ '1.17' ]
|
go-version: [ 'tip' ]
|
||||||
fail-fast: false
|
fail-fast: false
|
||||||
steps:
|
steps:
|
||||||
- uses: actions/checkout@v2
|
- uses: actions/checkout@v2
|
||||||
|
@ -83,4 +83,3 @@ jobs:
|
||||||
- name: torrentfs end-to-end test
|
- name: torrentfs end-to-end test
|
||||||
# Test on 386 for atomic alignment and other bad 64-bit assumptions
|
# Test on 386 for atomic alignment and other bad 64-bit assumptions
|
||||||
run: GOARCH=386 fs/test.sh
|
run: GOARCH=386 fs/test.sh
|
||||||
timeout-minutes: 10
|
|
||||||
|
|
46
client.go
46
client.go
|
@ -4,6 +4,7 @@ import (
|
||||||
"bufio"
|
"bufio"
|
||||||
"context"
|
"context"
|
||||||
"crypto/rand"
|
"crypto/rand"
|
||||||
|
"crypto/sha1"
|
||||||
"encoding/binary"
|
"encoding/binary"
|
||||||
"errors"
|
"errors"
|
||||||
"expvar"
|
"expvar"
|
||||||
|
@ -12,6 +13,7 @@ import (
|
||||||
"math"
|
"math"
|
||||||
"net"
|
"net"
|
||||||
"net/http"
|
"net/http"
|
||||||
|
"net/netip"
|
||||||
"sort"
|
"sort"
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
|
@ -20,9 +22,9 @@ import (
|
||||||
"github.com/anacrolix/chansync/events"
|
"github.com/anacrolix/chansync/events"
|
||||||
"github.com/anacrolix/dht/v2"
|
"github.com/anacrolix/dht/v2"
|
||||||
"github.com/anacrolix/dht/v2/krpc"
|
"github.com/anacrolix/dht/v2/krpc"
|
||||||
|
"github.com/anacrolix/generics"
|
||||||
"github.com/anacrolix/log"
|
"github.com/anacrolix/log"
|
||||||
"github.com/anacrolix/missinggo/perf"
|
"github.com/anacrolix/missinggo/perf"
|
||||||
"github.com/anacrolix/missinggo/pubsub"
|
|
||||||
"github.com/anacrolix/missinggo/v2"
|
"github.com/anacrolix/missinggo/v2"
|
||||||
"github.com/anacrolix/missinggo/v2/bitmap"
|
"github.com/anacrolix/missinggo/v2/bitmap"
|
||||||
"github.com/anacrolix/missinggo/v2/pproffd"
|
"github.com/anacrolix/missinggo/v2/pproffd"
|
||||||
|
@ -35,6 +37,7 @@ import (
|
||||||
"golang.org/x/time/rate"
|
"golang.org/x/time/rate"
|
||||||
|
|
||||||
"github.com/anacrolix/chansync"
|
"github.com/anacrolix/chansync"
|
||||||
|
. "github.com/anacrolix/generics"
|
||||||
|
|
||||||
"github.com/anacrolix/torrent/bencode"
|
"github.com/anacrolix/torrent/bencode"
|
||||||
"github.com/anacrolix/torrent/internal/limiter"
|
"github.com/anacrolix/torrent/internal/limiter"
|
||||||
|
@ -73,7 +76,7 @@ type Client struct {
|
||||||
// include ourselves if we end up trying to connect to our own address
|
// include ourselves if we end up trying to connect to our own address
|
||||||
// through legitimate channels.
|
// through legitimate channels.
|
||||||
dopplegangerAddrs map[string]struct{}
|
dopplegangerAddrs map[string]struct{}
|
||||||
badPeerIPs map[string]struct{}
|
badPeerIPs map[netip.Addr]struct{}
|
||||||
torrents map[InfoHash]*Torrent
|
torrents map[InfoHash]*Torrent
|
||||||
pieceRequestOrder map[interface{}]*request_strategy.PieceRequestOrder
|
pieceRequestOrder map[interface{}]*request_strategy.PieceRequestOrder
|
||||||
|
|
||||||
|
@ -100,7 +103,7 @@ func (cl *Client) badPeerIPsLocked() (ips []string) {
|
||||||
ips = make([]string, len(cl.badPeerIPs))
|
ips = make([]string, len(cl.badPeerIPs))
|
||||||
i := 0
|
i := 0
|
||||||
for k := range cl.badPeerIPs {
|
for k := range cl.badPeerIPs {
|
||||||
ips[i] = k
|
ips[i] = k.String()
|
||||||
i += 1
|
i += 1
|
||||||
}
|
}
|
||||||
return
|
return
|
||||||
|
@ -192,7 +195,7 @@ func (cl *Client) announceKey() int32 {
|
||||||
// Initializes a bare minimum Client. *Client and *ClientConfig must not be nil.
|
// Initializes a bare minimum Client. *Client and *ClientConfig must not be nil.
|
||||||
func (cl *Client) init(cfg *ClientConfig) {
|
func (cl *Client) init(cfg *ClientConfig) {
|
||||||
cl.config = cfg
|
cl.config = cfg
|
||||||
cl.dopplegangerAddrs = make(map[string]struct{})
|
generics.MakeMap(&cl.dopplegangerAddrs)
|
||||||
cl.torrents = make(map[metainfo.Hash]*Torrent)
|
cl.torrents = make(map[metainfo.Hash]*Torrent)
|
||||||
cl.dialRateLimiter = rate.NewLimiter(10, 10)
|
cl.dialRateLimiter = rate.NewLimiter(10, 10)
|
||||||
cl.activeAnnounceLimiter.SlotsPerKey = 2
|
cl.activeAnnounceLimiter.SlotsPerKey = 2
|
||||||
|
@ -381,6 +384,7 @@ func (cl *Client) listenNetworks() (ns []network) {
|
||||||
|
|
||||||
// Creates an anacrolix/dht Server, as would be done internally in NewClient, for the given conn.
|
// Creates an anacrolix/dht Server, as would be done internally in NewClient, for the given conn.
|
||||||
func (cl *Client) NewAnacrolixDhtServer(conn net.PacketConn) (s *dht.Server, err error) {
|
func (cl *Client) NewAnacrolixDhtServer(conn net.PacketConn) (s *dht.Server, err error) {
|
||||||
|
logger := cl.logger.WithNames("dht", conn.LocalAddr().String())
|
||||||
cfg := dht.ServerConfig{
|
cfg := dht.ServerConfig{
|
||||||
IPBlocklist: cl.ipBlockList,
|
IPBlocklist: cl.ipBlockList,
|
||||||
Conn: conn,
|
Conn: conn,
|
||||||
|
@ -393,7 +397,7 @@ func (cl *Client) NewAnacrolixDhtServer(conn net.PacketConn) (s *dht.Server, err
|
||||||
}(),
|
}(),
|
||||||
StartingNodes: cl.config.DhtStartingNodes(conn.LocalAddr().Network()),
|
StartingNodes: cl.config.DhtStartingNodes(conn.LocalAddr().Network()),
|
||||||
OnQuery: cl.config.DHTOnQuery,
|
OnQuery: cl.config.DHTOnQuery,
|
||||||
Logger: cl.logger.WithContextText(fmt.Sprintf("dht server on %v", conn.LocalAddr().String())),
|
Logger: logger,
|
||||||
}
|
}
|
||||||
if f := cl.config.ConfigureAnacrolixDhtServer; f != nil {
|
if f := cl.config.ConfigureAnacrolixDhtServer; f != nil {
|
||||||
f(&cfg)
|
f(&cfg)
|
||||||
|
@ -403,9 +407,9 @@ func (cl *Client) NewAnacrolixDhtServer(conn net.PacketConn) (s *dht.Server, err
|
||||||
go func() {
|
go func() {
|
||||||
ts, err := s.Bootstrap()
|
ts, err := s.Bootstrap()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
cl.logger.Printf("error bootstrapping dht: %s", err)
|
logger.Levelf(log.Warning, "error bootstrapping dht: %s", err)
|
||||||
}
|
}
|
||||||
log.Fstr("%v completed bootstrap (%+v)", s, ts).AddValues(s, ts).Log(cl.logger)
|
logger.Levelf(log.Debug, "completed bootstrap: %+v", ts)
|
||||||
}()
|
}()
|
||||||
}
|
}
|
||||||
return
|
return
|
||||||
|
@ -1142,7 +1146,11 @@ func (cl *Client) badPeerIPPort(ip net.IP, port int) bool {
|
||||||
if _, ok := cl.ipBlockRange(ip); ok {
|
if _, ok := cl.ipBlockRange(ip); ok {
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
if _, ok := cl.badPeerIPs[ip.String()]; ok {
|
ipAddr, ok := netip.AddrFromSlice(ip)
|
||||||
|
if !ok {
|
||||||
|
panic(ip)
|
||||||
|
}
|
||||||
|
if _, ok := cl.badPeerIPs[ipAddr]; ok {
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
return false
|
return false
|
||||||
|
@ -1176,8 +1184,7 @@ func (cl *Client) newTorrentOpt(opts AddTorrentOpts) (t *Torrent) {
|
||||||
},
|
},
|
||||||
conns: make(map[*PeerConn]struct{}, 2*cl.config.EstablishedConnsPerTorrent),
|
conns: make(map[*PeerConn]struct{}, 2*cl.config.EstablishedConnsPerTorrent),
|
||||||
|
|
||||||
halfOpen: make(map[string]PeerInfo),
|
halfOpen: make(map[string]PeerInfo),
|
||||||
pieceStateChanges: pubsub.NewPubSub(),
|
|
||||||
|
|
||||||
storageOpener: storageClient,
|
storageOpener: storageClient,
|
||||||
maxEstablishedConns: cl.config.EstablishedConnsPerTorrent,
|
maxEstablishedConns: cl.config.EstablishedConnsPerTorrent,
|
||||||
|
@ -1188,6 +1195,8 @@ func (cl *Client) newTorrentOpt(opts AddTorrentOpts) (t *Torrent) {
|
||||||
webSeeds: make(map[string]*Peer),
|
webSeeds: make(map[string]*Peer),
|
||||||
gotMetainfoC: make(chan struct{}),
|
gotMetainfoC: make(chan struct{}),
|
||||||
}
|
}
|
||||||
|
t.smartBanCache.Hash = sha1.Sum
|
||||||
|
t.smartBanCache.Init()
|
||||||
t.networkingEnabled.Set()
|
t.networkingEnabled.Set()
|
||||||
t.logger = cl.logger.WithContextValue(t).WithNames("torrent", t.infoHash.HexString())
|
t.logger = cl.logger.WithContextValue(t).WithNames("torrent", t.infoHash.HexString())
|
||||||
t.sourcesLogger = t.logger.WithNames("sources")
|
t.sourcesLogger = t.logger.WithNames("sources")
|
||||||
|
@ -1441,11 +1450,13 @@ func (cl *Client) AddDhtNodes(nodes []string) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (cl *Client) banPeerIP(ip net.IP) {
|
func (cl *Client) banPeerIP(ip net.IP) {
|
||||||
cl.logger.Printf("banning ip %v", ip)
|
// We can't take this from string, because it will lose netip's v4on6. net.ParseIP parses v4
|
||||||
if cl.badPeerIPs == nil {
|
// addresses directly to v4on6, which doesn't compare equal with v4.
|
||||||
cl.badPeerIPs = make(map[string]struct{})
|
ipAddr, ok := netip.AddrFromSlice(ip)
|
||||||
|
if !ok {
|
||||||
|
panic(ip)
|
||||||
}
|
}
|
||||||
cl.badPeerIPs[ip.String()] = struct{}{}
|
generics.MakeMapIfNilAndSet(&cl.badPeerIPs, ipAddr, struct{}{})
|
||||||
for _, t := range cl.torrents {
|
for _, t := range cl.torrents {
|
||||||
t.iterPeers(func(p *Peer) {
|
t.iterPeers(func(p *Peer) {
|
||||||
if p.remoteIp().Equal(ip) {
|
if p.remoteIp().Equal(ip) {
|
||||||
|
@ -1475,6 +1486,13 @@ func (cl *Client) newConnection(nc net.Conn, outgoing bool, remoteAddr PeerRemot
|
||||||
connString: connString,
|
connString: connString,
|
||||||
conn: nc,
|
conn: nc,
|
||||||
}
|
}
|
||||||
|
// TODO: Need to be much more explicit about this, including allowing non-IP bannable addresses.
|
||||||
|
if remoteAddr != nil {
|
||||||
|
netipAddrPort, err := netip.ParseAddrPort(remoteAddr.String())
|
||||||
|
if err == nil {
|
||||||
|
c.bannableAddr = Some(netipAddrPort.Addr())
|
||||||
|
}
|
||||||
|
}
|
||||||
c.peerImpl = c
|
c.peerImpl = c
|
||||||
c.logger = cl.logger.WithDefaultLevel(log.Warning).WithContextValue(c)
|
c.logger = cl.logger.WithDefaultLevel(log.Warning).WithContextValue(c)
|
||||||
c.setRW(connStatsReadWriter{nc, c})
|
c.setRW(connStatsReadWriter{nc, c})
|
||||||
|
|
|
@ -491,8 +491,7 @@ func testDownloadCancel(t *testing.T, ps testDownloadCancelParams) {
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
for !reflect.DeepEqual(completes, expected) {
|
for !reflect.DeepEqual(completes, expected) {
|
||||||
_v := <-psc.Values
|
v := <-psc.Values
|
||||||
v := _v.(PieceStateChange)
|
|
||||||
completes[v.Index] = v.Complete
|
completes[v.Index] = v.Complete
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,6 +1,7 @@
|
||||||
package main
|
package main
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"errors"
|
"errors"
|
||||||
"expvar"
|
"expvar"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
@ -15,7 +16,6 @@ import (
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/anacrolix/log"
|
"github.com/anacrolix/log"
|
||||||
"github.com/anacrolix/missinggo/v2"
|
|
||||||
"github.com/anacrolix/tagflag"
|
"github.com/anacrolix/tagflag"
|
||||||
"github.com/anacrolix/torrent"
|
"github.com/anacrolix/torrent"
|
||||||
"github.com/anacrolix/torrent/iplist"
|
"github.com/anacrolix/torrent/iplist"
|
||||||
|
@ -89,7 +89,7 @@ func resolveTestPeers(addrs []string) (ret []torrent.PeerInfo) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
func addTorrents(client *torrent.Client, flags downloadFlags) error {
|
func addTorrents(ctx context.Context, client *torrent.Client, flags downloadFlags, wg *sync.WaitGroup) error {
|
||||||
testPeers := resolveTestPeers(flags.TestPeer)
|
testPeers := resolveTestPeers(flags.TestPeer)
|
||||||
for _, arg := range flags.Torrent {
|
for _, arg := range flags.Torrent {
|
||||||
t, err := func() (*torrent.Torrent, error) {
|
t, err := func() (*torrent.Torrent, error) {
|
||||||
|
@ -137,10 +137,30 @@ func addTorrents(client *torrent.Client, flags downloadFlags) error {
|
||||||
torrentBar(t, flags.PieceStates)
|
torrentBar(t, flags.PieceStates)
|
||||||
}
|
}
|
||||||
t.AddPeers(testPeers)
|
t.AddPeers(testPeers)
|
||||||
|
wg.Add(1)
|
||||||
go func() {
|
go func() {
|
||||||
<-t.GotInfo()
|
defer wg.Done()
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return
|
||||||
|
case <-t.GotInfo():
|
||||||
|
}
|
||||||
|
if flags.SaveMetainfos {
|
||||||
|
path := fmt.Sprintf("%v.torrent", t.InfoHash().HexString())
|
||||||
|
err := writeMetainfoToFile(t.Metainfo(), path)
|
||||||
|
if err == nil {
|
||||||
|
log.Printf("wrote %q", path)
|
||||||
|
} else {
|
||||||
|
log.Printf("error writing %q: %v", path, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
if len(flags.File) == 0 {
|
if len(flags.File) == 0 {
|
||||||
t.DownloadAll()
|
t.DownloadAll()
|
||||||
|
wg.Add(1)
|
||||||
|
go func() {
|
||||||
|
defer wg.Done()
|
||||||
|
waitForPieces(ctx, t, 0, t.NumPieces())
|
||||||
|
}()
|
||||||
if flags.LinearDiscard {
|
if flags.LinearDiscard {
|
||||||
r := t.NewReader()
|
r := t.NewReader()
|
||||||
io.Copy(io.Discard, r)
|
io.Copy(io.Discard, r)
|
||||||
|
@ -150,6 +170,11 @@ func addTorrents(client *torrent.Client, flags downloadFlags) error {
|
||||||
for _, f := range t.Files() {
|
for _, f := range t.Files() {
|
||||||
for _, fileArg := range flags.File {
|
for _, fileArg := range flags.File {
|
||||||
if f.DisplayPath() == fileArg {
|
if f.DisplayPath() == fileArg {
|
||||||
|
wg.Add(1)
|
||||||
|
go func() {
|
||||||
|
defer wg.Done()
|
||||||
|
waitForPieces(ctx, t, f.BeginPieceIndex(), f.EndPieceIndex())
|
||||||
|
}()
|
||||||
f.Download()
|
f.Download()
|
||||||
if flags.LinearDiscard {
|
if flags.LinearDiscard {
|
||||||
r := f.NewReader()
|
r := f.NewReader()
|
||||||
|
@ -167,12 +192,52 @@ func addTorrents(client *torrent.Client, flags downloadFlags) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func waitForPieces(ctx context.Context, t *torrent.Torrent, beginIndex, endIndex int) {
|
||||||
|
sub := t.SubscribePieceStateChanges()
|
||||||
|
defer sub.Close()
|
||||||
|
pending := make(map[int]struct{})
|
||||||
|
for i := beginIndex; i < endIndex; i++ {
|
||||||
|
pending[i] = struct{}{}
|
||||||
|
}
|
||||||
|
expected := storage.Completion{
|
||||||
|
Complete: true,
|
||||||
|
Ok: true,
|
||||||
|
}
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case ev := <-sub.Values:
|
||||||
|
if ev.Completion == expected {
|
||||||
|
delete(pending, ev.Index)
|
||||||
|
}
|
||||||
|
if len(pending) == 0 {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
case <-ctx.Done():
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func writeMetainfoToFile(mi metainfo.MetaInfo, path string) error {
|
||||||
|
f, err := os.OpenFile(path, os.O_CREATE|os.O_EXCL|os.O_WRONLY, 0640)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
defer f.Close()
|
||||||
|
err = mi.Write(f)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return f.Close()
|
||||||
|
}
|
||||||
|
|
||||||
type downloadFlags struct {
|
type downloadFlags struct {
|
||||||
Debug bool
|
Debug bool
|
||||||
DownloadCmd
|
DownloadCmd
|
||||||
}
|
}
|
||||||
|
|
||||||
type DownloadCmd struct {
|
type DownloadCmd struct {
|
||||||
|
SaveMetainfos bool
|
||||||
Mmap bool `help:"memory-map torrent data"`
|
Mmap bool `help:"memory-map torrent data"`
|
||||||
Seed bool `help:"seed after download is complete"`
|
Seed bool `help:"seed after download is complete"`
|
||||||
Addr string `help:"network listen addr"`
|
Addr string `help:"network listen addr"`
|
||||||
|
@ -211,15 +276,6 @@ func statsEnabled(flags downloadFlags) bool {
|
||||||
return flags.Stats
|
return flags.Stats
|
||||||
}
|
}
|
||||||
|
|
||||||
func exitSignalHandlers(notify *missinggo.SynchronizedEvent) {
|
|
||||||
c := make(chan os.Signal, 1)
|
|
||||||
signal.Notify(c, syscall.SIGINT, syscall.SIGTERM)
|
|
||||||
for {
|
|
||||||
log.Printf("close signal received: %+v", <-c)
|
|
||||||
notify.Set()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func downloadErr(flags downloadFlags) error {
|
func downloadErr(flags downloadFlags) error {
|
||||||
clientConfig := torrent.NewDefaultClientConfig()
|
clientConfig := torrent.NewDefaultClientConfig()
|
||||||
clientConfig.DisableWebseeds = flags.DisableWebseeds
|
clientConfig.DisableWebseeds = flags.DisableWebseeds
|
||||||
|
@ -269,35 +325,29 @@ func downloadErr(flags downloadFlags) error {
|
||||||
}
|
}
|
||||||
clientConfig.MaxUnverifiedBytes = flags.MaxUnverifiedBytes.Int64()
|
clientConfig.MaxUnverifiedBytes = flags.MaxUnverifiedBytes.Int64()
|
||||||
|
|
||||||
var stop missinggo.SynchronizedEvent
|
ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
|
||||||
defer func() {
|
defer cancel()
|
||||||
stop.Set()
|
|
||||||
}()
|
|
||||||
|
|
||||||
client, err := torrent.NewClient(clientConfig)
|
client, err := torrent.NewClient(clientConfig)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("creating client: %w", err)
|
return fmt.Errorf("creating client: %w", err)
|
||||||
}
|
}
|
||||||
var clientClose sync.Once // In certain situations, close was being called more than once.
|
defer client.Close()
|
||||||
defer clientClose.Do(func() { client.Close() })
|
|
||||||
go exitSignalHandlers(&stop)
|
|
||||||
go func() {
|
|
||||||
<-stop.C()
|
|
||||||
clientClose.Do(func() { client.Close() })
|
|
||||||
}()
|
|
||||||
|
|
||||||
// Write status on the root path on the default HTTP muxer. This will be bound to localhost
|
// Write status on the root path on the default HTTP muxer. This will be bound to localhost
|
||||||
// somewhere if GOPPROF is set, thanks to the envpprof import.
|
// somewhere if GOPPROF is set, thanks to the envpprof import.
|
||||||
http.HandleFunc("/", func(w http.ResponseWriter, req *http.Request) {
|
http.HandleFunc("/", func(w http.ResponseWriter, req *http.Request) {
|
||||||
client.WriteStatus(w)
|
client.WriteStatus(w)
|
||||||
})
|
})
|
||||||
err = addTorrents(client, flags)
|
var wg sync.WaitGroup
|
||||||
started := time.Now()
|
err = addTorrents(ctx, client, flags, &wg)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("adding torrents: %w", err)
|
return fmt.Errorf("adding torrents: %w", err)
|
||||||
}
|
}
|
||||||
|
started := time.Now()
|
||||||
defer outputStats(client, flags)
|
defer outputStats(client, flags)
|
||||||
if client.WaitAll() {
|
wg.Wait()
|
||||||
|
if ctx.Err() == nil {
|
||||||
log.Print("downloaded ALL the torrents")
|
log.Print("downloaded ALL the torrents")
|
||||||
} else {
|
} else {
|
||||||
err = errors.New("y u no complete torrents?!")
|
err = errors.New("y u no complete torrents?!")
|
||||||
|
@ -314,7 +364,7 @@ func downloadErr(flags downloadFlags) error {
|
||||||
log.Print("no torrents to seed")
|
log.Print("no torrents to seed")
|
||||||
} else {
|
} else {
|
||||||
outputStats(client, flags)
|
outputStats(client, flags)
|
||||||
<-stop.C()
|
<-ctx.Done()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
spew.Dump(expvar.Get("torrent").(*expvar.Map).Get("chunks received"))
|
spew.Dump(expvar.Get("torrent").(*expvar.Map).Get("chunks received"))
|
||||||
|
|
|
@ -77,6 +77,7 @@ func pprintMetainfo(metainfo *metainfo.MetaInfo, flags pprintMetainfoFlags) erro
|
||||||
}
|
}
|
||||||
d := map[string]interface{}{
|
d := map[string]interface{}{
|
||||||
"Name": info.Name,
|
"Name": info.Name,
|
||||||
|
"Name.Utf8": info.NameUtf8,
|
||||||
"NumPieces": info.NumPieces(),
|
"NumPieces": info.NumPieces(),
|
||||||
"PieceLength": info.PieceLength,
|
"PieceLength": info.PieceLength,
|
||||||
"InfoHash": metainfo.HashInfoBytes().HexString(),
|
"InfoHash": metainfo.HashInfoBytes().HexString(),
|
||||||
|
|
8
file.go
8
file.go
|
@ -86,7 +86,7 @@ func fileBytesLeft(
|
||||||
}
|
}
|
||||||
|
|
||||||
func (f *File) bytesLeft() (left int64) {
|
func (f *File) bytesLeft() (left int64) {
|
||||||
return fileBytesLeft(int64(f.t.usualPieceSize()), f.firstPieceIndex(), f.endPieceIndex(), f.offset, f.length, &f.t._completedPieces)
|
return fileBytesLeft(int64(f.t.usualPieceSize()), f.BeginPieceIndex(), f.EndPieceIndex(), f.offset, f.length, &f.t._completedPieces)
|
||||||
}
|
}
|
||||||
|
|
||||||
// The relative file path for a multi-file torrent, and the torrent name for a
|
// The relative file path for a multi-file torrent, and the torrent name for a
|
||||||
|
@ -149,7 +149,7 @@ func (f *File) SetPriority(prio piecePriority) {
|
||||||
f.t.cl.lock()
|
f.t.cl.lock()
|
||||||
if prio != f.prio {
|
if prio != f.prio {
|
||||||
f.prio = prio
|
f.prio = prio
|
||||||
f.t.updatePiecePriorities(f.firstPieceIndex(), f.endPieceIndex(), "File.SetPriority")
|
f.t.updatePiecePriorities(f.BeginPieceIndex(), f.EndPieceIndex(), "File.SetPriority")
|
||||||
}
|
}
|
||||||
f.t.cl.unlock()
|
f.t.cl.unlock()
|
||||||
}
|
}
|
||||||
|
@ -163,7 +163,7 @@ func (f *File) Priority() (prio piecePriority) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Returns the index of the first piece containing data for the file.
|
// Returns the index of the first piece containing data for the file.
|
||||||
func (f *File) firstPieceIndex() pieceIndex {
|
func (f *File) BeginPieceIndex() int {
|
||||||
if f.t.usualPieceSize() == 0 {
|
if f.t.usualPieceSize() == 0 {
|
||||||
return 0
|
return 0
|
||||||
}
|
}
|
||||||
|
@ -171,7 +171,7 @@ func (f *File) firstPieceIndex() pieceIndex {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Returns the index of the piece after the last one containing data for the file.
|
// Returns the index of the piece after the last one containing data for the file.
|
||||||
func (f *File) endPieceIndex() pieceIndex {
|
func (f *File) EndPieceIndex() int {
|
||||||
if f.t.usualPieceSize() == 0 {
|
if f.t.usualPieceSize() == 0 {
|
||||||
return 0
|
return 0
|
||||||
}
|
}
|
||||||
|
|
49
go.mod
49
go.mod
|
@ -1,22 +1,24 @@
|
||||||
module github.com/anacrolix/torrent
|
module github.com/anacrolix/torrent
|
||||||
|
|
||||||
go 1.16
|
go 1.18
|
||||||
|
|
||||||
require (
|
require (
|
||||||
crawshaw.io/sqlite v0.3.3-0.20210127221821-98b1f83c5508
|
crawshaw.io/sqlite v0.3.3-0.20210127221821-98b1f83c5508
|
||||||
github.com/RoaringBitmap/roaring v0.9.4
|
github.com/RoaringBitmap/roaring v0.9.4
|
||||||
|
github.com/ajwerner/btree v0.0.0-20211201061316-91c8b66ad617
|
||||||
github.com/alexflint/go-arg v1.4.2
|
github.com/alexflint/go-arg v1.4.2
|
||||||
github.com/anacrolix/args v0.5.0
|
github.com/anacrolix/args v0.5.0
|
||||||
github.com/anacrolix/chansync v0.3.0
|
github.com/anacrolix/chansync v0.3.0
|
||||||
github.com/anacrolix/dht/v2 v2.15.2-0.20220123034220-0538803801cb
|
github.com/anacrolix/dht/v2 v2.16.2-0.20220311024416-dd658f18fd51
|
||||||
github.com/anacrolix/envpprof v1.1.1
|
github.com/anacrolix/envpprof v1.1.1
|
||||||
github.com/anacrolix/fuse v0.2.0
|
github.com/anacrolix/fuse v0.2.0
|
||||||
|
github.com/anacrolix/generics v0.0.0-20220217222028-44932cf46edd
|
||||||
github.com/anacrolix/go-libutp v1.2.0
|
github.com/anacrolix/go-libutp v1.2.0
|
||||||
github.com/anacrolix/log v0.13.1
|
github.com/anacrolix/log v0.13.1
|
||||||
github.com/anacrolix/missinggo v1.3.0
|
github.com/anacrolix/missinggo v1.3.0
|
||||||
github.com/anacrolix/missinggo/perf v1.0.0
|
github.com/anacrolix/missinggo/perf v1.0.0
|
||||||
github.com/anacrolix/missinggo/v2 v2.5.2
|
github.com/anacrolix/missinggo/v2 v2.5.4-0.20220317032254-8c5ea4947a0b
|
||||||
github.com/anacrolix/multiless v0.2.0
|
github.com/anacrolix/multiless v0.2.1-0.20211218050420-533661eef5dc
|
||||||
github.com/anacrolix/squirrel v0.4.1-0.20220122230132-14b040773bac
|
github.com/anacrolix/squirrel v0.4.1-0.20220122230132-14b040773bac
|
||||||
github.com/anacrolix/sync v0.4.0
|
github.com/anacrolix/sync v0.4.0
|
||||||
github.com/anacrolix/tagflag v1.3.0
|
github.com/anacrolix/tagflag v1.3.0
|
||||||
|
@ -37,10 +39,49 @@ require (
|
||||||
github.com/pion/webrtc/v3 v3.1.24-0.20220208053747-94262c1b2b38
|
github.com/pion/webrtc/v3 v3.1.24-0.20220208053747-94262c1b2b38
|
||||||
github.com/pkg/errors v0.9.1
|
github.com/pkg/errors v0.9.1
|
||||||
github.com/stretchr/testify v1.7.0
|
github.com/stretchr/testify v1.7.0
|
||||||
|
github.com/tidwall/btree v0.7.2-0.20211211132910-4215444137fc
|
||||||
go.etcd.io/bbolt v1.3.6
|
go.etcd.io/bbolt v1.3.6
|
||||||
golang.org/x/time v0.0.0-20210723032227-1f47c861a9ac
|
golang.org/x/time v0.0.0-20210723032227-1f47c861a9ac
|
||||||
)
|
)
|
||||||
|
|
||||||
|
require (
|
||||||
|
github.com/alexflint/go-scalar v1.1.0 // indirect
|
||||||
|
github.com/anacrolix/mmsg v1.0.0 // indirect
|
||||||
|
github.com/anacrolix/stm v0.3.0 // indirect
|
||||||
|
github.com/benbjohnson/immutable v0.3.0 // indirect
|
||||||
|
github.com/bits-and-blooms/bitset v1.2.0 // indirect
|
||||||
|
github.com/google/uuid v1.3.0 // indirect
|
||||||
|
github.com/huandu/xstrings v1.3.2 // indirect
|
||||||
|
github.com/kr/pretty v0.3.0 // indirect
|
||||||
|
github.com/kr/text v0.2.0 // indirect
|
||||||
|
github.com/mschoch/smat v0.2.0 // indirect
|
||||||
|
github.com/pion/dtls/v2 v2.1.2 // indirect
|
||||||
|
github.com/pion/ice/v2 v2.1.20 // indirect
|
||||||
|
github.com/pion/interceptor v0.1.7 // indirect
|
||||||
|
github.com/pion/logging v0.2.2 // indirect
|
||||||
|
github.com/pion/mdns v0.0.5 // indirect
|
||||||
|
github.com/pion/randutil v0.1.0 // indirect
|
||||||
|
github.com/pion/rtcp v1.2.9 // indirect
|
||||||
|
github.com/pion/rtp v1.7.4 // indirect
|
||||||
|
github.com/pion/sctp v1.8.2 // indirect
|
||||||
|
github.com/pion/sdp/v3 v3.0.4 // indirect
|
||||||
|
github.com/pion/srtp/v2 v2.0.5 // indirect
|
||||||
|
github.com/pion/stun v0.3.5 // indirect
|
||||||
|
github.com/pion/transport v0.13.0 // indirect
|
||||||
|
github.com/pion/turn/v2 v2.0.6 // indirect
|
||||||
|
github.com/pion/udp v0.1.1 // indirect
|
||||||
|
github.com/pmezard/go-difflib v1.0.0 // indirect
|
||||||
|
github.com/rogpeppe/go-internal v1.8.0 // indirect
|
||||||
|
github.com/rs/dnscache v0.0.0-20210201191234-295bba877686 // indirect
|
||||||
|
github.com/ryszard/goskiplist v0.0.0-20150312221310-2dfbae5fcf46 // indirect
|
||||||
|
golang.org/x/crypto v0.0.0-20220131195533-30dcbda58838 // indirect
|
||||||
|
golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd // indirect
|
||||||
|
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c // indirect
|
||||||
|
golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e // indirect
|
||||||
|
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect
|
||||||
|
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c // indirect
|
||||||
|
)
|
||||||
|
|
||||||
retract (
|
retract (
|
||||||
// Doesn't signal interest to peers if choked when piece priorities change.
|
// Doesn't signal interest to peers if choked when piece priorities change.
|
||||||
v1.39.0
|
v1.39.0
|
||||||
|
|
|
@ -6,14 +6,14 @@ import "strings"
|
||||||
type FileInfo struct {
|
type FileInfo struct {
|
||||||
Length int64 `bencode:"length"` // BEP3
|
Length int64 `bencode:"length"` // BEP3
|
||||||
Path []string `bencode:"path"` // BEP3
|
Path []string `bencode:"path"` // BEP3
|
||||||
PathUTF8 []string `bencode:"path.utf-8,omitempty"`
|
PathUtf8 []string `bencode:"path.utf-8,omitempty"`
|
||||||
}
|
}
|
||||||
|
|
||||||
func (fi *FileInfo) DisplayPath(info *Info) string {
|
func (fi *FileInfo) DisplayPath(info *Info) string {
|
||||||
if info.IsDir() {
|
if info.IsDir() {
|
||||||
return strings.Join(fi.Path, "/")
|
return strings.Join(fi.BestPath(), "/")
|
||||||
} else {
|
} else {
|
||||||
return info.Name
|
return info.BestName()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -26,3 +26,10 @@ func (me FileInfo) Offset(info *Info) (ret int64) {
|
||||||
}
|
}
|
||||||
panic("not found")
|
panic("not found")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (fi FileInfo) BestPath() []string {
|
||||||
|
if len(fi.PathUtf8) != 0 {
|
||||||
|
return fi.PathUtf8
|
||||||
|
}
|
||||||
|
return fi.Path
|
||||||
|
}
|
||||||
|
|
|
@ -13,9 +13,10 @@ import (
|
||||||
|
|
||||||
// The info dictionary.
|
// The info dictionary.
|
||||||
type Info struct {
|
type Info struct {
|
||||||
PieceLength int64 `bencode:"piece length"` // BEP3
|
PieceLength int64 `bencode:"piece length"` // BEP3
|
||||||
Pieces []byte `bencode:"pieces"` // BEP3
|
Pieces []byte `bencode:"pieces"` // BEP3
|
||||||
Name string `bencode:"name"` // BEP3
|
Name string `bencode:"name"` // BEP3
|
||||||
|
NameUtf8 string `bencode:"name.utf-8,omitempty"`
|
||||||
Length int64 `bencode:"length,omitempty"` // BEP3, mutually exclusive with Files
|
Length int64 `bencode:"length,omitempty"` // BEP3, mutually exclusive with Files
|
||||||
Private *bool `bencode:"private,omitempty"` // BEP27
|
Private *bool `bencode:"private,omitempty"` // BEP27
|
||||||
// TODO: Document this field.
|
// TODO: Document this field.
|
||||||
|
@ -152,3 +153,10 @@ func (info *Info) UpvertedFiles() []FileInfo {
|
||||||
func (info *Info) Piece(index int) Piece {
|
func (info *Info) Piece(index int) Piece {
|
||||||
return Piece{info, pieceIndex(index)}
|
return Piece{info, pieceIndex(index)}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (info Info) BestName() string {
|
||||||
|
if info.NameUtf8 != "" {
|
||||||
|
return info.NameUtf8
|
||||||
|
}
|
||||||
|
return info.Name
|
||||||
|
}
|
||||||
|
|
17
peerconn.go
17
peerconn.go
|
@ -15,12 +15,12 @@ import (
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/RoaringBitmap/roaring"
|
"github.com/RoaringBitmap/roaring"
|
||||||
|
"github.com/anacrolix/chansync"
|
||||||
|
. "github.com/anacrolix/generics"
|
||||||
"github.com/anacrolix/log"
|
"github.com/anacrolix/log"
|
||||||
"github.com/anacrolix/missinggo/iter"
|
"github.com/anacrolix/missinggo/iter"
|
||||||
"github.com/anacrolix/missinggo/v2/bitmap"
|
"github.com/anacrolix/missinggo/v2/bitmap"
|
||||||
"github.com/anacrolix/multiless"
|
"github.com/anacrolix/multiless"
|
||||||
|
|
||||||
"github.com/anacrolix/chansync"
|
|
||||||
"github.com/anacrolix/torrent/bencode"
|
"github.com/anacrolix/torrent/bencode"
|
||||||
"github.com/anacrolix/torrent/metainfo"
|
"github.com/anacrolix/torrent/metainfo"
|
||||||
"github.com/anacrolix/torrent/mse"
|
"github.com/anacrolix/torrent/mse"
|
||||||
|
@ -64,9 +64,10 @@ type Peer struct {
|
||||||
peerImpl
|
peerImpl
|
||||||
callbacks *Callbacks
|
callbacks *Callbacks
|
||||||
|
|
||||||
outgoing bool
|
outgoing bool
|
||||||
Network string
|
Network string
|
||||||
RemoteAddr PeerRemoteAddr
|
RemoteAddr PeerRemoteAddr
|
||||||
|
bannableAddr Option[bannableAddr]
|
||||||
// True if the connection is operating over MSE obfuscation.
|
// True if the connection is operating over MSE obfuscation.
|
||||||
headerEncrypted bool
|
headerEncrypted bool
|
||||||
cryptoMethod mse.CryptoMethod
|
cryptoMethod mse.CryptoMethod
|
||||||
|
@ -1387,6 +1388,11 @@ func (c *Peer) receiveChunk(msg *pp.Message) error {
|
||||||
|
|
||||||
ppReq := newRequestFromMessage(msg)
|
ppReq := newRequestFromMessage(msg)
|
||||||
req := c.t.requestIndexFromRequest(ppReq)
|
req := c.t.requestIndexFromRequest(ppReq)
|
||||||
|
t := c.t
|
||||||
|
|
||||||
|
if c.bannableAddr.Ok() {
|
||||||
|
t.smartBanCache.RecordBlock(c.bannableAddr.Value(), req, msg.Piece)
|
||||||
|
}
|
||||||
|
|
||||||
if c.peerChoking {
|
if c.peerChoking {
|
||||||
chunksReceived.Add("while choked", 1)
|
chunksReceived.Add("while choked", 1)
|
||||||
|
@ -1426,7 +1432,6 @@ func (c *Peer) receiveChunk(msg *pp.Message) error {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
t := c.t
|
|
||||||
cl := t.cl
|
cl := t.cl
|
||||||
|
|
||||||
// Do we actually want this chunk?
|
// Do we actually want this chunk?
|
||||||
|
|
1
piece.go
1
piece.go
|
@ -8,7 +8,6 @@ import (
|
||||||
"github.com/RoaringBitmap/roaring"
|
"github.com/RoaringBitmap/roaring"
|
||||||
"github.com/anacrolix/chansync"
|
"github.com/anacrolix/chansync"
|
||||||
"github.com/anacrolix/missinggo/v2/bitmap"
|
"github.com/anacrolix/missinggo/v2/bitmap"
|
||||||
|
|
||||||
"github.com/anacrolix/torrent/metainfo"
|
"github.com/anacrolix/torrent/metainfo"
|
||||||
pp "github.com/anacrolix/torrent/peer_protocol"
|
pp "github.com/anacrolix/torrent/peer_protocol"
|
||||||
"github.com/anacrolix/torrent/storage"
|
"github.com/anacrolix/torrent/storage"
|
||||||
|
|
|
@ -0,0 +1,44 @@
|
||||||
|
package request_strategy
|
||||||
|
|
||||||
|
import (
|
||||||
|
"github.com/ajwerner/btree"
|
||||||
|
)
|
||||||
|
|
||||||
|
type ajwernerBtree struct {
|
||||||
|
btree btree.Set[pieceRequestOrderItem]
|
||||||
|
}
|
||||||
|
|
||||||
|
var _ Btree = (*ajwernerBtree)(nil)
|
||||||
|
|
||||||
|
func NewAjwernerBtree() *ajwernerBtree {
|
||||||
|
return &ajwernerBtree{
|
||||||
|
btree: btree.MakeSet(func(t, t2 pieceRequestOrderItem) int {
|
||||||
|
return pieceOrderLess(&t, &t2).OrderingInt()
|
||||||
|
}),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func mustValue[V any](b bool, panicValue V) {
|
||||||
|
if !b {
|
||||||
|
panic(panicValue)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (a *ajwernerBtree) Delete(item pieceRequestOrderItem) {
|
||||||
|
mustValue(a.btree.Delete(item), item)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (a *ajwernerBtree) Add(item pieceRequestOrderItem) {
|
||||||
|
_, overwrote := a.btree.Upsert(item)
|
||||||
|
mustValue(!overwrote, item)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (a *ajwernerBtree) Scan(f func(pieceRequestOrderItem) bool) {
|
||||||
|
it := a.btree.Iterator()
|
||||||
|
it.First()
|
||||||
|
for it.First(); it.Valid(); it.Next() {
|
||||||
|
if !f(it.Cur()) {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -6,7 +6,6 @@ import (
|
||||||
|
|
||||||
"github.com/anacrolix/multiless"
|
"github.com/anacrolix/multiless"
|
||||||
"github.com/anacrolix/torrent/metainfo"
|
"github.com/anacrolix/torrent/metainfo"
|
||||||
"github.com/google/btree"
|
|
||||||
|
|
||||||
"github.com/anacrolix/torrent/types"
|
"github.com/anacrolix/torrent/types"
|
||||||
)
|
)
|
||||||
|
@ -52,8 +51,7 @@ func GetRequestablePieces(input Input, pro *PieceRequestOrder, f func(ih metainf
|
||||||
storageLeft = &cap
|
storageLeft = &cap
|
||||||
}
|
}
|
||||||
var allTorrentsUnverifiedBytes int64
|
var allTorrentsUnverifiedBytes int64
|
||||||
pro.tree.Ascend(func(i btree.Item) bool {
|
pro.tree.Scan(func(_i pieceRequestOrderItem) bool {
|
||||||
_i := i.(*pieceRequestOrderItem)
|
|
||||||
ih := _i.key.InfoHash
|
ih := _i.key.InfoHash
|
||||||
var t Torrent = input.Torrent(ih)
|
var t Torrent = input.Torrent(ih)
|
||||||
var piece Piece = t.Piece(_i.key.Index)
|
var piece Piece = t.Piece(_i.key.Index)
|
||||||
|
|
|
@ -1,21 +1,22 @@
|
||||||
package request_strategy
|
package request_strategy
|
||||||
|
|
||||||
import (
|
import "github.com/anacrolix/torrent/metainfo"
|
||||||
"fmt"
|
|
||||||
|
|
||||||
"github.com/anacrolix/torrent/metainfo"
|
type Btree interface {
|
||||||
"github.com/google/btree"
|
Delete(pieceRequestOrderItem)
|
||||||
)
|
Add(pieceRequestOrderItem)
|
||||||
|
Scan(func(pieceRequestOrderItem) bool)
|
||||||
|
}
|
||||||
|
|
||||||
func NewPieceOrder() *PieceRequestOrder {
|
func NewPieceOrder(btree Btree, cap int) *PieceRequestOrder {
|
||||||
return &PieceRequestOrder{
|
return &PieceRequestOrder{
|
||||||
tree: btree.New(32),
|
tree: btree,
|
||||||
keys: make(map[PieceRequestOrderKey]PieceRequestOrderState),
|
keys: make(map[PieceRequestOrderKey]PieceRequestOrderState, cap),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
type PieceRequestOrder struct {
|
type PieceRequestOrder struct {
|
||||||
tree *btree.BTree
|
tree Btree
|
||||||
keys map[PieceRequestOrderKey]PieceRequestOrderState
|
keys map[PieceRequestOrderKey]PieceRequestOrderState
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -35,8 +36,7 @@ type pieceRequestOrderItem struct {
|
||||||
state PieceRequestOrderState
|
state PieceRequestOrderState
|
||||||
}
|
}
|
||||||
|
|
||||||
func (me *pieceRequestOrderItem) Less(other btree.Item) bool {
|
func (me *pieceRequestOrderItem) Less(otherConcrete *pieceRequestOrderItem) bool {
|
||||||
otherConcrete := other.(*pieceRequestOrderItem)
|
|
||||||
return pieceOrderLess(me, otherConcrete).Less()
|
return pieceOrderLess(me, otherConcrete).Less()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -44,16 +44,14 @@ func (me *PieceRequestOrder) Add(key PieceRequestOrderKey, state PieceRequestOrd
|
||||||
if _, ok := me.keys[key]; ok {
|
if _, ok := me.keys[key]; ok {
|
||||||
panic(key)
|
panic(key)
|
||||||
}
|
}
|
||||||
if me.tree.ReplaceOrInsert(&pieceRequestOrderItem{
|
me.tree.Add(pieceRequestOrderItem{key, state})
|
||||||
key: key,
|
|
||||||
state: state,
|
|
||||||
}) != nil {
|
|
||||||
panic("shouldn't already have this")
|
|
||||||
}
|
|
||||||
me.keys[key] = state
|
me.keys[key] = state
|
||||||
}
|
}
|
||||||
|
|
||||||
func (me *PieceRequestOrder) Update(key PieceRequestOrderKey, state PieceRequestOrderState) {
|
func (me *PieceRequestOrder) Update(
|
||||||
|
key PieceRequestOrderKey,
|
||||||
|
state PieceRequestOrderState,
|
||||||
|
) {
|
||||||
oldState, ok := me.keys[key]
|
oldState, ok := me.keys[key]
|
||||||
if !ok {
|
if !ok {
|
||||||
panic("key should have been added already")
|
panic("key should have been added already")
|
||||||
|
@ -61,17 +59,8 @@ func (me *PieceRequestOrder) Update(key PieceRequestOrderKey, state PieceRequest
|
||||||
if state == oldState {
|
if state == oldState {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
item := pieceRequestOrderItem{
|
me.tree.Delete(pieceRequestOrderItem{key, oldState})
|
||||||
key: key,
|
me.tree.Add(pieceRequestOrderItem{key, state})
|
||||||
state: oldState,
|
|
||||||
}
|
|
||||||
if me.tree.Delete(&item) == nil {
|
|
||||||
panic(fmt.Sprintf("%#v", key))
|
|
||||||
}
|
|
||||||
item.state = state
|
|
||||||
if me.tree.ReplaceOrInsert(&item) != nil {
|
|
||||||
panic(key)
|
|
||||||
}
|
|
||||||
me.keys[key] = state
|
me.keys[key] = state
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -83,12 +72,8 @@ func (me *PieceRequestOrder) existingItemForKey(key PieceRequestOrderKey) pieceR
|
||||||
}
|
}
|
||||||
|
|
||||||
func (me *PieceRequestOrder) Delete(key PieceRequestOrderKey) {
|
func (me *PieceRequestOrder) Delete(key PieceRequestOrderKey) {
|
||||||
item := me.existingItemForKey(key)
|
me.tree.Delete(pieceRequestOrderItem{key, me.keys[key]})
|
||||||
if me.tree.Delete(&item) == nil {
|
|
||||||
panic(key)
|
|
||||||
}
|
|
||||||
delete(me.keys, key)
|
delete(me.keys, key)
|
||||||
// log.Printf("deleting %#v", key)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (me *PieceRequestOrder) Len() int {
|
func (me *PieceRequestOrder) Len() int {
|
||||||
|
|
|
@ -0,0 +1,106 @@
|
||||||
|
package request_strategy
|
||||||
|
|
||||||
|
import (
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/bradfitz/iter"
|
||||||
|
)
|
||||||
|
|
||||||
|
func benchmarkPieceRequestOrder[B Btree](
|
||||||
|
b *testing.B,
|
||||||
|
// Initialize the next run, and return a Btree
|
||||||
|
newBtree func() B,
|
||||||
|
// Set any path hinting for the specified piece
|
||||||
|
hintForPiece func(index int),
|
||||||
|
numPieces int,
|
||||||
|
) {
|
||||||
|
b.ResetTimer()
|
||||||
|
b.ReportAllocs()
|
||||||
|
for range iter.N(b.N) {
|
||||||
|
pro := NewPieceOrder(newBtree(), numPieces)
|
||||||
|
state := PieceRequestOrderState{}
|
||||||
|
doPieces := func(m func(PieceRequestOrderKey)) {
|
||||||
|
for i := range iter.N(numPieces) {
|
||||||
|
key := PieceRequestOrderKey{
|
||||||
|
Index: i,
|
||||||
|
}
|
||||||
|
hintForPiece(i)
|
||||||
|
m(key)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
doPieces(func(key PieceRequestOrderKey) {
|
||||||
|
pro.Add(key, state)
|
||||||
|
})
|
||||||
|
state.Availability++
|
||||||
|
doPieces(func(key PieceRequestOrderKey) {
|
||||||
|
pro.Update(key, state)
|
||||||
|
})
|
||||||
|
pro.tree.Scan(func(item pieceRequestOrderItem) bool {
|
||||||
|
return true
|
||||||
|
})
|
||||||
|
doPieces(func(key PieceRequestOrderKey) {
|
||||||
|
state.Priority = piecePriority(key.Index / 4)
|
||||||
|
pro.Update(key, state)
|
||||||
|
})
|
||||||
|
pro.tree.Scan(func(item pieceRequestOrderItem) bool {
|
||||||
|
return item.key.Index < 1000
|
||||||
|
})
|
||||||
|
state.Priority = 0
|
||||||
|
state.Availability++
|
||||||
|
doPieces(func(key PieceRequestOrderKey) {
|
||||||
|
pro.Update(key, state)
|
||||||
|
})
|
||||||
|
pro.tree.Scan(func(item pieceRequestOrderItem) bool {
|
||||||
|
return item.key.Index < 1000
|
||||||
|
})
|
||||||
|
state.Availability--
|
||||||
|
doPieces(func(key PieceRequestOrderKey) {
|
||||||
|
pro.Update(key, state)
|
||||||
|
})
|
||||||
|
doPieces(pro.Delete)
|
||||||
|
if pro.Len() != 0 {
|
||||||
|
b.FailNow()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func zero[T any](t *T) {
|
||||||
|
var zt T
|
||||||
|
*t = zt
|
||||||
|
}
|
||||||
|
|
||||||
|
func BenchmarkPieceRequestOrder(b *testing.B) {
|
||||||
|
const numPieces = 2000
|
||||||
|
b.Run("TidwallBtree", func(b *testing.B) {
|
||||||
|
b.Run("NoPathHints", func(b *testing.B) {
|
||||||
|
benchmarkPieceRequestOrder(b, NewTidwallBtree, func(int) {}, numPieces)
|
||||||
|
})
|
||||||
|
b.Run("SharedPathHint", func(b *testing.B) {
|
||||||
|
var pathHint PieceRequestOrderPathHint
|
||||||
|
var btree *tidwallBtree
|
||||||
|
benchmarkPieceRequestOrder(
|
||||||
|
b, func() *tidwallBtree {
|
||||||
|
zero(&pathHint)
|
||||||
|
btree = NewTidwallBtree()
|
||||||
|
btree.PathHint = &pathHint
|
||||||
|
return btree
|
||||||
|
}, func(int) {}, numPieces,
|
||||||
|
)
|
||||||
|
})
|
||||||
|
b.Run("PathHintPerPiece", func(b *testing.B) {
|
||||||
|
pathHints := make([]PieceRequestOrderPathHint, numPieces)
|
||||||
|
var btree *tidwallBtree
|
||||||
|
benchmarkPieceRequestOrder(
|
||||||
|
b, func() *tidwallBtree {
|
||||||
|
btree = NewTidwallBtree()
|
||||||
|
return btree
|
||||||
|
}, func(index int) {
|
||||||
|
btree.PathHint = &pathHints[index]
|
||||||
|
}, numPieces,
|
||||||
|
)
|
||||||
|
})
|
||||||
|
})
|
||||||
|
b.Run("AjwernerBtree", func(b *testing.B) {
|
||||||
|
benchmarkPieceRequestOrder(b, NewAjwernerBtree, func(index int) {}, numPieces)
|
||||||
|
})
|
||||||
|
}
|
|
@ -0,0 +1,37 @@
|
||||||
|
package request_strategy
|
||||||
|
|
||||||
|
import (
|
||||||
|
"github.com/tidwall/btree"
|
||||||
|
)
|
||||||
|
|
||||||
|
type tidwallBtree struct {
|
||||||
|
tree *btree.BTree[pieceRequestOrderItem]
|
||||||
|
PathHint *btree.PathHint
|
||||||
|
}
|
||||||
|
|
||||||
|
func (me *tidwallBtree) Scan(f func(pieceRequestOrderItem) bool) {
|
||||||
|
me.tree.Scan(f)
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewTidwallBtree() *tidwallBtree {
|
||||||
|
return &tidwallBtree{
|
||||||
|
tree: btree.NewOptions(
|
||||||
|
func(a, b pieceRequestOrderItem) bool {
|
||||||
|
return a.Less(&b)
|
||||||
|
},
|
||||||
|
btree.Options{NoLocks: true}),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (me *tidwallBtree) Add(item pieceRequestOrderItem) {
|
||||||
|
if _, ok := me.tree.SetHint(item, me.PathHint); ok {
|
||||||
|
panic("shouldn't already have this")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
type PieceRequestOrderPathHint = btree.PathHint
|
||||||
|
|
||||||
|
func (me *tidwallBtree) Delete(item pieceRequestOrderItem) {
|
||||||
|
_, deleted := me.tree.DeleteHint(item, me.PathHint)
|
||||||
|
mustValue(deleted, item)
|
||||||
|
}
|
|
@ -0,0 +1,55 @@
|
||||||
|
package torrent
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"crypto/sha1"
|
||||||
|
"net/netip"
|
||||||
|
|
||||||
|
"github.com/anacrolix/generics"
|
||||||
|
"github.com/anacrolix/torrent/smartban"
|
||||||
|
)
|
||||||
|
|
||||||
|
type bannableAddr = netip.Addr
|
||||||
|
|
||||||
|
type smartBanCache = smartban.Cache[bannableAddr, RequestIndex, [sha1.Size]byte]
|
||||||
|
|
||||||
|
type blockCheckingWriter struct {
|
||||||
|
cache *smartBanCache
|
||||||
|
requestIndex RequestIndex
|
||||||
|
// Peers that didn't match blocks written now.
|
||||||
|
badPeers map[bannableAddr]struct{}
|
||||||
|
blockBuffer bytes.Buffer
|
||||||
|
chunkSize int
|
||||||
|
}
|
||||||
|
|
||||||
|
func (me *blockCheckingWriter) checkBlock() {
|
||||||
|
b := me.blockBuffer.Next(me.chunkSize)
|
||||||
|
for _, peer := range me.cache.CheckBlock(me.requestIndex, b) {
|
||||||
|
generics.MakeMapIfNilAndSet(&me.badPeers, peer, struct{}{})
|
||||||
|
}
|
||||||
|
me.requestIndex++
|
||||||
|
}
|
||||||
|
|
||||||
|
func (me *blockCheckingWriter) checkFullBlocks() {
|
||||||
|
for me.blockBuffer.Len() >= me.chunkSize {
|
||||||
|
me.checkBlock()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (me *blockCheckingWriter) Write(b []byte) (int, error) {
|
||||||
|
n, err := me.blockBuffer.Write(b)
|
||||||
|
if err != nil {
|
||||||
|
// bytes.Buffer.Write should never fail.
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
me.checkFullBlocks()
|
||||||
|
return n, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check any remaining block data. Terminal pieces or piece sizes that don't divide into the chunk
|
||||||
|
// size cleanly may leave fragments that should be checked.
|
||||||
|
func (me *blockCheckingWriter) Flush() {
|
||||||
|
for me.blockBuffer.Len() != 0 {
|
||||||
|
me.checkBlock()
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,51 @@
|
||||||
|
package smartban
|
||||||
|
|
||||||
|
import (
|
||||||
|
"sync"
|
||||||
|
)
|
||||||
|
|
||||||
|
type Cache[Peer, BlockKey, Hash comparable] struct {
|
||||||
|
Hash func([]byte) Hash
|
||||||
|
|
||||||
|
lock sync.RWMutex
|
||||||
|
blocks map[BlockKey]map[Peer]Hash
|
||||||
|
}
|
||||||
|
|
||||||
|
type Block[Key any] struct {
|
||||||
|
Key Key
|
||||||
|
Data []byte
|
||||||
|
}
|
||||||
|
|
||||||
|
func (me *Cache[Peer, BlockKey, Hash]) Init() {
|
||||||
|
me.blocks = make(map[BlockKey]map[Peer]Hash)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (me *Cache[Peer, BlockKey, Hash]) RecordBlock(peer Peer, key BlockKey, data []byte) {
|
||||||
|
hash := me.Hash(data)
|
||||||
|
me.lock.Lock()
|
||||||
|
defer me.lock.Unlock()
|
||||||
|
peers := me.blocks[key]
|
||||||
|
if peers == nil {
|
||||||
|
peers = make(map[Peer]Hash)
|
||||||
|
me.blocks[key] = peers
|
||||||
|
}
|
||||||
|
peers[peer] = hash
|
||||||
|
}
|
||||||
|
|
||||||
|
func (me *Cache[Peer, BlockKey, Hash]) CheckBlock(key BlockKey, data []byte) (bad []Peer) {
|
||||||
|
correct := me.Hash(data)
|
||||||
|
me.lock.RLock()
|
||||||
|
defer me.lock.RUnlock()
|
||||||
|
for peer, hash := range me.blocks[key] {
|
||||||
|
if hash != correct {
|
||||||
|
bad = append(bad, peer)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
func (me *Cache[Peer, BlockKey, Hash]) ForgetBlock(key BlockKey) {
|
||||||
|
me.lock.Lock()
|
||||||
|
defer me.lock.Unlock()
|
||||||
|
delete(me.blocks, key)
|
||||||
|
}
|
18
t.go
18
t.go
|
@ -5,7 +5,7 @@ import (
|
||||||
"strings"
|
"strings"
|
||||||
|
|
||||||
"github.com/anacrolix/chansync/events"
|
"github.com/anacrolix/chansync/events"
|
||||||
"github.com/anacrolix/missinggo/pubsub"
|
"github.com/anacrolix/missinggo/v2/pubsub"
|
||||||
"github.com/anacrolix/sync"
|
"github.com/anacrolix/sync"
|
||||||
|
|
||||||
"github.com/anacrolix/torrent/metainfo"
|
"github.com/anacrolix/torrent/metainfo"
|
||||||
|
@ -115,7 +115,7 @@ func (t *Torrent) BytesCompleted() int64 {
|
||||||
|
|
||||||
// The subscription emits as (int) the index of pieces as their state changes.
|
// The subscription emits as (int) the index of pieces as their state changes.
|
||||||
// A state change is when the PieceState for a piece alters in value.
|
// A state change is when the PieceState for a piece alters in value.
|
||||||
func (t *Torrent) SubscribePieceStateChanges() *pubsub.Subscription {
|
func (t *Torrent) SubscribePieceStateChanges() *pubsub.Subscription[PieceStateChange] {
|
||||||
return t.pieceStateChanges.Subscribe()
|
return t.pieceStateChanges.Subscribe()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -211,23 +211,13 @@ func (t *Torrent) initFiles() {
|
||||||
var offset int64
|
var offset int64
|
||||||
t.files = new([]*File)
|
t.files = new([]*File)
|
||||||
for _, fi := range t.info.UpvertedFiles() {
|
for _, fi := range t.info.UpvertedFiles() {
|
||||||
var path []string
|
|
||||||
if len(fi.PathUTF8) != 0 {
|
|
||||||
path = fi.PathUTF8
|
|
||||||
} else {
|
|
||||||
path = fi.Path
|
|
||||||
}
|
|
||||||
dp := t.info.Name
|
|
||||||
if len(fi.Path) != 0 {
|
|
||||||
dp = strings.Join(fi.Path, "/")
|
|
||||||
}
|
|
||||||
*t.files = append(*t.files, &File{
|
*t.files = append(*t.files, &File{
|
||||||
t,
|
t,
|
||||||
strings.Join(append([]string{t.info.Name}, path...), "/"),
|
strings.Join(append([]string{t.info.BestName()}, fi.BestPath()...), "/"),
|
||||||
offset,
|
offset,
|
||||||
fi.Length,
|
fi.Length,
|
||||||
fi,
|
fi,
|
||||||
dp,
|
fi.DisplayPath(t.info),
|
||||||
PiecePriorityNone,
|
PiecePriorityNone,
|
||||||
})
|
})
|
||||||
offset += fi.Length
|
offset += fi.Length
|
||||||
|
|
|
@ -45,7 +45,7 @@ func (t *Torrent) initPieceRequestOrder() {
|
||||||
key := t.clientPieceRequestOrderKey()
|
key := t.clientPieceRequestOrderKey()
|
||||||
cpro := t.cl.pieceRequestOrder
|
cpro := t.cl.pieceRequestOrder
|
||||||
if cpro[key] == nil {
|
if cpro[key] == nil {
|
||||||
cpro[key] = request_strategy.NewPieceOrder()
|
cpro[key] = request_strategy.NewPieceOrder(request_strategy.NewAjwernerBtree(), t.numPieces())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
89
torrent.go
89
torrent.go
|
@ -8,6 +8,7 @@ import (
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
|
"net/netip"
|
||||||
"net/url"
|
"net/url"
|
||||||
"sort"
|
"sort"
|
||||||
"strings"
|
"strings"
|
||||||
|
@ -19,12 +20,13 @@ import (
|
||||||
"github.com/anacrolix/chansync"
|
"github.com/anacrolix/chansync"
|
||||||
"github.com/anacrolix/chansync/events"
|
"github.com/anacrolix/chansync/events"
|
||||||
"github.com/anacrolix/dht/v2"
|
"github.com/anacrolix/dht/v2"
|
||||||
|
. "github.com/anacrolix/generics"
|
||||||
"github.com/anacrolix/log"
|
"github.com/anacrolix/log"
|
||||||
"github.com/anacrolix/missinggo/perf"
|
"github.com/anacrolix/missinggo/perf"
|
||||||
"github.com/anacrolix/missinggo/pubsub"
|
|
||||||
"github.com/anacrolix/missinggo/slices"
|
"github.com/anacrolix/missinggo/slices"
|
||||||
"github.com/anacrolix/missinggo/v2"
|
"github.com/anacrolix/missinggo/v2"
|
||||||
"github.com/anacrolix/missinggo/v2/bitmap"
|
"github.com/anacrolix/missinggo/v2/bitmap"
|
||||||
|
"github.com/anacrolix/missinggo/v2/pubsub"
|
||||||
"github.com/anacrolix/multiless"
|
"github.com/anacrolix/multiless"
|
||||||
"github.com/anacrolix/sync"
|
"github.com/anacrolix/sync"
|
||||||
request_strategy "github.com/anacrolix/torrent/request-strategy"
|
request_strategy "github.com/anacrolix/torrent/request-strategy"
|
||||||
|
@ -60,7 +62,7 @@ type Torrent struct {
|
||||||
infoHash metainfo.Hash
|
infoHash metainfo.Hash
|
||||||
pieces []Piece
|
pieces []Piece
|
||||||
// Values are the piece indices that changed.
|
// Values are the piece indices that changed.
|
||||||
pieceStateChanges *pubsub.PubSub
|
pieceStateChanges pubsub.PubSub[PieceStateChange]
|
||||||
// The size of chunks to request from peers over the wire. This is
|
// The size of chunks to request from peers over the wire. This is
|
||||||
// normally 16KiB by convention these days.
|
// normally 16KiB by convention these days.
|
||||||
chunkSize pp.Integer
|
chunkSize pp.Integer
|
||||||
|
@ -151,6 +153,8 @@ type Torrent struct {
|
||||||
// Torrent sources in use keyed by the source string.
|
// Torrent sources in use keyed by the source string.
|
||||||
activeSources sync.Map
|
activeSources sync.Map
|
||||||
sourcesLogger log.Logger
|
sourcesLogger log.Logger
|
||||||
|
|
||||||
|
smartBanCache smartBanCache
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *Torrent) selectivePieceAvailabilityFromPeers(i pieceIndex) (count int) {
|
func (t *Torrent) selectivePieceAvailabilityFromPeers(i pieceIndex) (count int) {
|
||||||
|
@ -943,7 +947,20 @@ func (t *Torrent) pieceLength(piece pieceIndex) pp.Integer {
|
||||||
return pp.Integer(t.info.PieceLength)
|
return pp.Integer(t.info.PieceLength)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *Torrent) hashPiece(piece pieceIndex) (ret metainfo.Hash, err error) {
|
func (t *Torrent) smartBanBlockCheckingWriter(piece pieceIndex) *blockCheckingWriter {
|
||||||
|
return &blockCheckingWriter{
|
||||||
|
cache: &t.smartBanCache,
|
||||||
|
requestIndex: t.pieceRequestIndexOffset(piece),
|
||||||
|
chunkSize: t.chunkSize.Int(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *Torrent) hashPiece(piece pieceIndex) (
|
||||||
|
ret metainfo.Hash,
|
||||||
|
// These are peers that sent us blocks that differ from what we hash here.
|
||||||
|
differingPeers map[bannableAddr]struct{},
|
||||||
|
err error,
|
||||||
|
) {
|
||||||
p := t.piece(piece)
|
p := t.piece(piece)
|
||||||
p.waitNoPendingWrites()
|
p.waitNoPendingWrites()
|
||||||
storagePiece := t.pieces[piece].Storage()
|
storagePiece := t.pieces[piece].Storage()
|
||||||
|
@ -959,13 +976,18 @@ func (t *Torrent) hashPiece(piece pieceIndex) (ret metainfo.Hash, err error) {
|
||||||
|
|
||||||
hash := pieceHash.New()
|
hash := pieceHash.New()
|
||||||
const logPieceContents = false
|
const logPieceContents = false
|
||||||
|
smartBanWriter := t.smartBanBlockCheckingWriter(piece)
|
||||||
|
writers := []io.Writer{hash, smartBanWriter}
|
||||||
|
var examineBuf bytes.Buffer
|
||||||
if logPieceContents {
|
if logPieceContents {
|
||||||
var examineBuf bytes.Buffer
|
writers = append(writers, &examineBuf)
|
||||||
_, err = storagePiece.WriteTo(io.MultiWriter(hash, &examineBuf))
|
|
||||||
log.Printf("hashed %q with copy err %v", examineBuf.Bytes(), err)
|
|
||||||
} else {
|
|
||||||
_, err = storagePiece.WriteTo(hash)
|
|
||||||
}
|
}
|
||||||
|
_, err = storagePiece.WriteTo(io.MultiWriter(writers...))
|
||||||
|
if logPieceContents {
|
||||||
|
log.Printf("hashed %q with copy err %v", examineBuf.Bytes(), err)
|
||||||
|
}
|
||||||
|
smartBanWriter.Flush()
|
||||||
|
differingPeers = smartBanWriter.badPeers
|
||||||
missinggo.CopyExact(&ret, hash.Sum(nil))
|
missinggo.CopyExact(&ret, hash.Sum(nil))
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
@ -2024,7 +2046,16 @@ func (t *Torrent) pieceHashed(piece pieceIndex, passed bool, hashIoErr error) {
|
||||||
|
|
||||||
if len(bannableTouchers) >= 1 {
|
if len(bannableTouchers) >= 1 {
|
||||||
c := bannableTouchers[0]
|
c := bannableTouchers[0]
|
||||||
c.ban()
|
if len(bannableTouchers) != 1 {
|
||||||
|
t.logger.Levelf(log.Warning, "would have banned %v for touching piece %v after failed piece check", c.remoteIp(), piece)
|
||||||
|
} else {
|
||||||
|
// Turns out it's still useful to ban peers like this because if there's only a
|
||||||
|
// single peer for a piece, and we never progress that piece to completion, we
|
||||||
|
// will never smart-ban them. Discovered in
|
||||||
|
// https://github.com/anacrolix/torrent/issues/715.
|
||||||
|
t.logger.Levelf(log.Warning, "banning %v for being sole dirtier of piece %v after failed piece check", c, piece)
|
||||||
|
c.ban()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
t.onIncompletePiece(piece)
|
t.onIncompletePiece(piece)
|
||||||
|
@ -2112,9 +2143,32 @@ func (t *Torrent) getPieceToHash() (ret pieceIndex, ok bool) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (t *Torrent) dropBannedPeers() {
|
||||||
|
t.iterPeers(func(p *Peer) {
|
||||||
|
remoteIp := p.remoteIp()
|
||||||
|
if remoteIp == nil {
|
||||||
|
if p.bannableAddr.Ok() {
|
||||||
|
log.Printf("can't get remote ip for peer %v", p)
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
netipAddr := netip.MustParseAddr(remoteIp.String())
|
||||||
|
if Some(netipAddr) != p.bannableAddr {
|
||||||
|
log.Printf(
|
||||||
|
"peer remote ip does not match its bannable addr [peer=%v, remote ip=%v, bannable addr=%v]",
|
||||||
|
p, remoteIp, p.bannableAddr)
|
||||||
|
}
|
||||||
|
if _, ok := t.cl.badPeerIPs[netipAddr]; ok {
|
||||||
|
// Should this be a close?
|
||||||
|
p.drop()
|
||||||
|
log.Printf("dropped %v for banned remote IP %v", p, netipAddr)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
func (t *Torrent) pieceHasher(index pieceIndex) {
|
func (t *Torrent) pieceHasher(index pieceIndex) {
|
||||||
p := t.piece(index)
|
p := t.piece(index)
|
||||||
sum, copyErr := t.hashPiece(index)
|
sum, failedPeers, copyErr := t.hashPiece(index)
|
||||||
correct := sum == *p.hash
|
correct := sum == *p.hash
|
||||||
switch copyErr {
|
switch copyErr {
|
||||||
case nil, io.EOF:
|
case nil, io.EOF:
|
||||||
|
@ -2124,6 +2178,16 @@ func (t *Torrent) pieceHasher(index pieceIndex) {
|
||||||
t.storageLock.RUnlock()
|
t.storageLock.RUnlock()
|
||||||
t.cl.lock()
|
t.cl.lock()
|
||||||
defer t.cl.unlock()
|
defer t.cl.unlock()
|
||||||
|
if correct {
|
||||||
|
for peer := range failedPeers {
|
||||||
|
t.cl.banPeerIP(peer.AsSlice())
|
||||||
|
log.Printf("smart banned %v for piece %v", peer, index)
|
||||||
|
}
|
||||||
|
t.dropBannedPeers()
|
||||||
|
for ri := t.pieceRequestIndexOffset(index); ri < t.pieceRequestIndexOffset(index+1); ri++ {
|
||||||
|
t.smartBanCache.ForgetBlock(ri)
|
||||||
|
}
|
||||||
|
}
|
||||||
p.hashing = false
|
p.hashing = false
|
||||||
t.pieceHashed(index, correct, copyErr)
|
t.pieceHashed(index, correct, copyErr)
|
||||||
t.updatePiecePriority(index, "Torrent.pieceHasher")
|
t.updatePiecePriority(index, "Torrent.pieceHasher")
|
||||||
|
@ -2315,8 +2379,9 @@ func (t *Torrent) addWebSeed(url string) {
|
||||||
// requests mark more often, so recomputation is probably sooner than with regular peer
|
// requests mark more often, so recomputation is probably sooner than with regular peer
|
||||||
// conns. ~4x maxRequests would be about right.
|
// conns. ~4x maxRequests would be about right.
|
||||||
PeerMaxRequests: 128,
|
PeerMaxRequests: 128,
|
||||||
RemoteAddr: remoteAddrFromUrl(url),
|
// TODO: Set ban prefix?
|
||||||
callbacks: t.callbacks(),
|
RemoteAddr: remoteAddrFromUrl(url),
|
||||||
|
callbacks: t.callbacks(),
|
||||||
},
|
},
|
||||||
client: webseed.Client{
|
client: webseed.Client{
|
||||||
HttpClient: t.cl.httpClient,
|
HttpClient: t.cl.httpClient,
|
||||||
|
|
Loading…
Reference in New Issue