From eb9c032f2b5a1071052d3c0ccf6dbc97d7a907e4 Mon Sep 17 00:00:00 2001 From: Matt Joiner Date: Mon, 5 Dec 2022 12:52:19 +1100 Subject: [PATCH 01/32] Start a UDP server implementation --- tracker/udp/addr-family.go | 25 +++++ tracker/udp/server/server.go | 199 +++++++++++++++++++++++++++++++++++ tracker/udp_test.go | 1 + 3 files changed, 225 insertions(+) diff --git a/tracker/udp/addr-family.go b/tracker/udp/addr-family.go index 0213f41f..ddecb4c9 100644 --- a/tracker/udp/addr-family.go +++ b/tracker/udp/addr-family.go @@ -1 +1,26 @@ package udp + +import ( + "encoding" + + "github.com/anacrolix/dht/v2/krpc" +) + +// Discriminates behaviours based on address family in use. +type AddrFamily int + +const ( + AddrFamilyIpv4 = iota + 1 + AddrFamilyIpv6 +) + +// Returns a marshaler for the given node addrs for the specified family. +func GetNodeAddrsCompactMarshaler(nas []krpc.NodeAddr, family AddrFamily) encoding.BinaryMarshaler { + switch family { + case AddrFamilyIpv4: + return krpc.CompactIPv4NodeAddrs(nas) + case AddrFamilyIpv6: + return krpc.CompactIPv6NodeAddrs(nas) + } + return nil +} diff --git a/tracker/udp/server/server.go b/tracker/udp/server/server.go index abb4e431..81500214 100644 --- a/tracker/udp/server/server.go +++ b/tracker/udp/server/server.go @@ -1 +1,200 @@ package server + +import ( + "bytes" + "context" + "crypto/rand" + "encoding/binary" + "fmt" + "io" + "net" + "net/netip" + + "github.com/anacrolix/dht/v2/krpc" + "github.com/anacrolix/log" + "github.com/anacrolix/torrent/tracker/udp" +) + +type ConnectionTrackerAddr = string + +type ConnectionTracker interface { + Add(ctx context.Context, addr ConnectionTrackerAddr, id udp.ConnectionId) error + Check(ctx context.Context, addr ConnectionTrackerAddr, id udp.ConnectionId) (bool, error) +} + +type InfoHash = [20]byte + +// This is reserved for stuff like filtering by IP version, avoiding an announcer's IP or key, +// limiting return count, etc. +type GetPeersOpts struct{} + +type PeerInfo struct { + netip.AddrPort +} + +type AnnounceTracker interface { + TrackAnnounce(ctx context.Context, req udp.AnnounceRequest, addr RequestSourceAddr) error + Scrape(ctx context.Context, infoHashes []InfoHash) ([]udp.ScrapeInfohashResult, error) + GetPeers(ctx context.Context, infoHash InfoHash, opts GetPeersOpts) ([]PeerInfo, error) +} + +type Server struct { + ConnTracker ConnectionTracker + SendResponse func(data []byte, addr net.Addr) (int, error) + AnnounceTracker AnnounceTracker +} + +type RequestSourceAddr = net.Addr + +func (me *Server) HandleRequest(ctx context.Context, family udp.AddrFamily, source RequestSourceAddr, body []byte) error { + var h udp.RequestHeader + var r bytes.Reader + r.Reset(body) + err := udp.Read(&r, &h) + if err != nil { + err = fmt.Errorf("reading request header: %w", err) + return err + } + switch h.Action { + case udp.ActionConnect: + err = me.handleConnect(ctx, source, h.TransactionId) + case udp.ActionAnnounce: + err = me.handleAnnounce(ctx, family, source, h.ConnectionId, h.TransactionId, &r) + default: + err = fmt.Errorf("unimplemented") + } + if err != nil { + err = fmt.Errorf("handling action %v: %w", h.Action, err) + } + return err +} + +func (me *Server) handleAnnounce( + ctx context.Context, + addrFamily udp.AddrFamily, + source RequestSourceAddr, + connId udp.ConnectionId, + tid udp.TransactionId, + r *bytes.Reader, +) error { + ok, err := me.ConnTracker.Check(ctx, source.String(), connId) + if err != nil { + err = fmt.Errorf("checking conn id: %w", err) + return err + } + if !ok { + return fmt.Errorf("invalid connection id: %v", connId) + } + var req udp.AnnounceRequest + err = udp.Read(r, &req) + if err != nil { + return err + } + // TODO: This should be done asynchronously to responding to the announce. + err = me.AnnounceTracker.TrackAnnounce(ctx, req, source) + if err != nil { + return err + } + peers, err := me.AnnounceTracker.GetPeers(ctx, req.InfoHash, GetPeersOpts{}) + if err != nil { + return err + } + nodeAddrs := make([]krpc.NodeAddr, 0, len(peers)) + for _, p := range peers { + var ip net.IP + switch addrFamily { + default: + continue + case udp.AddrFamilyIpv4: + if !p.Addr().Unmap().Is4() { + continue + } + ipBuf := p.Addr().As4() + ip = ipBuf[:] + case udp.AddrFamilyIpv6: + ipBuf := p.Addr().As16() + ip = ipBuf[:] + } + nodeAddrs = append(nodeAddrs, krpc.NodeAddr{ + IP: ip[:], + Port: int(p.Port()), + }) + } + var buf bytes.Buffer + err = udp.Write(&buf, udp.ResponseHeader{ + Action: udp.ActionAnnounce, + TransactionId: tid, + }) + if err != nil { + return err + } + err = udp.Write(&buf, udp.AnnounceResponseHeader{}) + if err != nil { + return err + } + b, err := udp.GetNodeAddrsCompactMarshaler(nodeAddrs, addrFamily).MarshalBinary() + if err != nil { + err = fmt.Errorf("marshalling compact node addrs: %w", err) + return err + } + log.Print(nodeAddrs) + buf.Write(b) + n, err := me.SendResponse(buf.Bytes(), source) + if err != nil { + return err + } + if n < buf.Len() { + err = io.ErrShortWrite + } + return err +} + +func (me *Server) handleConnect(ctx context.Context, source RequestSourceAddr, tid udp.TransactionId) error { + connId := randomConnectionId() + err := me.ConnTracker.Add(ctx, source.String(), connId) + if err != nil { + err = fmt.Errorf("recording conn id: %w", err) + return err + } + var buf bytes.Buffer + udp.Write(&buf, udp.ResponseHeader{ + Action: udp.ActionConnect, + TransactionId: tid, + }) + udp.Write(&buf, udp.ConnectionResponse{connId}) + n, err := me.SendResponse(buf.Bytes(), source) + if err != nil { + return err + } + if n < buf.Len() { + err = io.ErrShortWrite + } + return err +} + +func randomConnectionId() udp.ConnectionId { + var b [8]byte + _, err := rand.Read(b[:]) + if err != nil { + panic(err) + } + return int64(binary.BigEndian.Uint64(b[:])) +} + +func RunServer(ctx context.Context, s *Server, pc net.PacketConn, family udp.AddrFamily) error { + ctx, cancel := context.WithCancel(ctx) + defer cancel() + for { + var b [1500]byte + n, addr, err := pc.ReadFrom(b[:]) + if err != nil { + return err + } + go func() { + err := s.HandleRequest(ctx, family, addr, b[:n]) + if err != nil { + log.Printf("error handling %v byte request from %v: %v", n, addr, err) + } + }() + } +} diff --git a/tracker/udp_test.go b/tracker/udp_test.go index 7354063b..751e41b9 100644 --- a/tracker/udp_test.go +++ b/tracker/udp_test.go @@ -23,6 +23,7 @@ import ( var trackers = []string{ "udp://tracker.opentrackr.org:1337/announce", "udp://tracker.openbittorrent.com:6969/announce", + "udp://localhost:42069", } func TestAnnounceLocalhost(t *testing.T) { From 6b5c2fa1b0d5ab4da419e6d085f00ae06bf2ead1 Mon Sep 17 00:00:00 2001 From: Matt Joiner Date: Mon, 5 Dec 2022 17:52:03 +1100 Subject: [PATCH 02/32] Support HTTP tracker server --- tracker/http/server/server.go | 94 +++++++++++++++++++++++++++++++++++ tracker/server.go | 35 +++++++++++++ tracker/udp/server/server.go | 37 +++++++------- 3 files changed, 147 insertions(+), 19 deletions(-) create mode 100644 tracker/http/server/server.go create mode 100644 tracker/server.go diff --git a/tracker/http/server/server.go b/tracker/http/server/server.go new file mode 100644 index 00000000..88457826 --- /dev/null +++ b/tracker/http/server/server.go @@ -0,0 +1,94 @@ +package httpTrackerServer + +import ( + "fmt" + "net" + "net/http" + "net/netip" + "net/url" + + "github.com/anacrolix/dht/v2/krpc" + "github.com/anacrolix/log" + + "github.com/anacrolix/torrent/bencode" + "github.com/anacrolix/torrent/tracker" + httpTracker "github.com/anacrolix/torrent/tracker/http" + udpTrackerServer "github.com/anacrolix/torrent/tracker/udp/server" +) + +type Handler struct { + AnnounceTracker udpTrackerServer.AnnounceTracker +} + +func unmarshalQueryKeyToArray(w http.ResponseWriter, key string, query url.Values) (ret [20]byte, ok bool) { + str := query.Get(key) + if len(str) != len(ret) { + http.Error(w, fmt.Sprintf("%v has wrong length", key), http.StatusBadRequest) + return + } + copy(ret[:], str) + ok = true + return +} + +func (me Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + vs := r.URL.Query() + var event tracker.AnnounceEvent + err := event.UnmarshalText([]byte(vs.Get("event"))) + if err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + infoHash, ok := unmarshalQueryKeyToArray(w, "info_hash", vs) + if !ok { + return + } + peerId, ok := unmarshalQueryKeyToArray(w, "peer_id", vs) + if !ok { + return + } + host, _, err := net.SplitHostPort(r.RemoteAddr) + if err != nil { + log.Printf("error splitting remote port: %v", err) + http.Error(w, "error determining your IP", http.StatusInternalServerError) + return + } + addrPort, err := netip.ParseAddrPort(net.JoinHostPort(host, vs.Get("port"))) + err = me.AnnounceTracker.TrackAnnounce(r.Context(), tracker.AnnounceRequest{ + InfoHash: infoHash, + PeerId: peerId, + Event: event, + Port: addrPort.Port(), + }, addrPort) + if err != nil { + log.Printf("error tracking announce: %v", err) + http.Error(w, "error tracking announce", http.StatusInternalServerError) + return + } + peers, err := me.AnnounceTracker.GetPeers(r.Context(), infoHash, tracker.GetPeersOpts{}) + if err != nil { + log.Printf("error getting peers: %v", err) + http.Error(w, "error getting peers", http.StatusInternalServerError) + return + } + var resp httpTracker.HttpResponse + resp.Interval = 5 * 60 + resp.Peers.Compact = true + for _, peer := range peers { + if peer.Addr().Is4() { + resp.Peers.List = append(resp.Peers.List, tracker.Peer{ + IP: peer.Addr().AsSlice(), + Port: int(peer.Port()), + }) + } else if peer.Addr().Is6() { + resp.Peers6 = append(resp.Peers6, krpc.NodeAddr{ + IP: peer.Addr().AsSlice(), + Port: int(peer.Port()), + }) + } + } + err = bencode.NewEncoder(w).Encode(resp) + if err != nil { + log.Printf("error encoding and writing response body: %v", err) + } +} diff --git a/tracker/server.go b/tracker/server.go new file mode 100644 index 00000000..077d3244 --- /dev/null +++ b/tracker/server.go @@ -0,0 +1,35 @@ +package tracker + +import ( + "context" + "net/netip" + + "github.com/anacrolix/torrent/tracker/udp" +) + +// This is reserved for stuff like filtering by IP version, avoiding an announcer's IP or key, +// limiting return count, etc. +type GetPeersOpts struct{} + +type InfoHash = [20]byte + +type PeerInfo struct { + AnnounceAddr +} + +type AnnounceAddr = netip.AddrPort + +type AnnounceTracker interface { + TrackAnnounce(ctx context.Context, req udp.AnnounceRequest, addr AnnounceAddr) error + Scrape(ctx context.Context, infoHashes []InfoHash) ([]udp.ScrapeInfohashResult, error) + GetPeers(ctx context.Context, infoHash InfoHash, opts GetPeersOpts) ([]PeerInfo, error) +} + +// +//type Server struct { +// AnnounceTracker AnnounceTracker +//} +// +//func (me Server) HandleAnnounce(req udp.AnnounceRequest, sourceAddr AnnounceAddr) error { +// +//} diff --git a/tracker/udp/server/server.go b/tracker/udp/server/server.go index 81500214..68abb106 100644 --- a/tracker/udp/server/server.go +++ b/tracker/udp/server/server.go @@ -1,4 +1,4 @@ -package server +package udpTrackerServer import ( "bytes" @@ -12,6 +12,8 @@ import ( "github.com/anacrolix/dht/v2/krpc" "github.com/anacrolix/log" + + "github.com/anacrolix/torrent/tracker" "github.com/anacrolix/torrent/tracker/udp" ) @@ -24,19 +26,7 @@ type ConnectionTracker interface { type InfoHash = [20]byte -// This is reserved for stuff like filtering by IP version, avoiding an announcer's IP or key, -// limiting return count, etc. -type GetPeersOpts struct{} - -type PeerInfo struct { - netip.AddrPort -} - -type AnnounceTracker interface { - TrackAnnounce(ctx context.Context, req udp.AnnounceRequest, addr RequestSourceAddr) error - Scrape(ctx context.Context, infoHashes []InfoHash) ([]udp.ScrapeInfohashResult, error) - GetPeers(ctx context.Context, infoHash InfoHash, opts GetPeersOpts) ([]PeerInfo, error) -} +type AnnounceTracker = tracker.AnnounceTracker type Server struct { ConnTracker ConnectionTracker @@ -46,7 +36,12 @@ type Server struct { type RequestSourceAddr = net.Addr -func (me *Server) HandleRequest(ctx context.Context, family udp.AddrFamily, source RequestSourceAddr, body []byte) error { +func (me *Server) HandleRequest( + ctx context.Context, + family udp.AddrFamily, + source RequestSourceAddr, + body []byte, +) error { var h udp.RequestHeader var r bytes.Reader r.Reset(body) @@ -91,11 +86,16 @@ func (me *Server) handleAnnounce( return err } // TODO: This should be done asynchronously to responding to the announce. - err = me.AnnounceTracker.TrackAnnounce(ctx, req, source) + announceAddr, err := netip.ParseAddrPort(source.String()) + if err != nil { + err = fmt.Errorf("converting source net.Addr to AnnounceAddr: %w", err) + return err + } + err = me.AnnounceTracker.TrackAnnounce(ctx, req, announceAddr) if err != nil { return err } - peers, err := me.AnnounceTracker.GetPeers(ctx, req.InfoHash, GetPeersOpts{}) + peers, err := me.AnnounceTracker.GetPeers(ctx, req.InfoHash, tracker.GetPeersOpts{}) if err != nil { return err } @@ -137,7 +137,6 @@ func (me *Server) handleAnnounce( err = fmt.Errorf("marshalling compact node addrs: %w", err) return err } - log.Print(nodeAddrs) buf.Write(b) n, err := me.SendResponse(buf.Bytes(), source) if err != nil { @@ -181,7 +180,7 @@ func randomConnectionId() udp.ConnectionId { return int64(binary.BigEndian.Uint64(b[:])) } -func RunServer(ctx context.Context, s *Server, pc net.PacketConn, family udp.AddrFamily) error { +func RunSimple(ctx context.Context, s *Server, pc net.PacketConn, family udp.AddrFamily) error { ctx, cancel := context.WithCancel(ctx) defer cancel() for { From 425a91c9219638a12c9e9fa583638a562ab14828 Mon Sep 17 00:00:00 2001 From: Matt Joiner Date: Tue, 6 Dec 2022 15:59:06 +1100 Subject: [PATCH 03/32] Support alternate remote host resolution --- tracker/http/server/server.go | 28 ++++++++++++++++++++++++---- 1 file changed, 24 insertions(+), 4 deletions(-) diff --git a/tracker/http/server/server.go b/tracker/http/server/server.go index 88457826..0840fe96 100644 --- a/tracker/http/server/server.go +++ b/tracker/http/server/server.go @@ -6,6 +6,7 @@ import ( "net/http" "net/netip" "net/url" + "strconv" "github.com/anacrolix/dht/v2/krpc" "github.com/anacrolix/log" @@ -18,6 +19,9 @@ import ( type Handler struct { AnnounceTracker udpTrackerServer.AnnounceTracker + // Called to derive an announcer's IP if non-nil. If not specified, the Request.RemoteAddr is + // used. Necessary for instances running behind reverse proxies for example. + RequestHost func(r *http.Request) (netip.Addr, error) } func unmarshalQueryKeyToArray(w http.ResponseWriter, key string, query url.Values) (ret [20]byte, ok bool) { @@ -31,6 +35,20 @@ func unmarshalQueryKeyToArray(w http.ResponseWriter, key string, query url.Value return } +var Logger = log.NewLogger("anacrolix", "torrent", "tracker", "http", "server") + +// Returns false if there was an error and it was served. +func (me Handler) requestHostAddr(r *http.Request) (_ netip.Addr, err error) { + if me.RequestHost != nil { + return me.RequestHost(r) + } + host, _, err := net.SplitHostPort(r.RemoteAddr) + if err != nil { + return + } + return netip.ParseAddr(host) +} + func (me Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { vs := r.URL.Query() var event tracker.AnnounceEvent @@ -47,13 +65,15 @@ func (me Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { if !ok { return } - host, _, err := net.SplitHostPort(r.RemoteAddr) + Logger.WithNames("request").Levelf(log.Debug, "request RemoteAddr=%q, header=%q", r.RemoteAddr, r.Header) + addr, err := me.requestHostAddr(r) if err != nil { - log.Printf("error splitting remote port: %v", err) - http.Error(w, "error determining your IP", http.StatusInternalServerError) + log.Printf("error getting requester IP: %v", err) + http.Error(w, "error determining your IP", http.StatusBadGateway) return } - addrPort, err := netip.ParseAddrPort(net.JoinHostPort(host, vs.Get("port"))) + portU64, err := strconv.ParseUint(vs.Get("port"), 0, 16) + addrPort := netip.AddrPortFrom(addr, uint16(portU64)) err = me.AnnounceTracker.TrackAnnounce(r.Context(), tracker.AnnounceRequest{ InfoHash: infoHash, PeerId: peerId, From c23269d1ccd14d9e4bb58b245249a2adf7b2b06b Mon Sep 17 00:00:00 2001 From: Matt Joiner Date: Wed, 7 Dec 2022 01:54:38 +1100 Subject: [PATCH 04/32] Support upstream trackers --- tracker/http/peer.go | 8 ++ tracker/http/server/server.go | 15 +-- tracker/server.go | 210 ++++++++++++++++++++++++++++++++-- tracker/udp/server/server.go | 15 ++- 4 files changed, 221 insertions(+), 27 deletions(-) diff --git a/tracker/http/peer.go b/tracker/http/peer.go index 363ba6d3..b0deee0b 100644 --- a/tracker/http/peer.go +++ b/tracker/http/peer.go @@ -3,16 +3,24 @@ package httpTracker import ( "fmt" "net" + "net/netip" "github.com/anacrolix/dht/v2/krpc" ) +// TODO: Use netip.Addr and Option[[20]byte]. type Peer struct { IP net.IP `bencode:"ip"` Port int `bencode:"port"` ID []byte `bencode:"peer id"` } +func (p Peer) ToNetipAddrPort() (addrPort netip.AddrPort, ok bool) { + addr, ok := netip.AddrFromSlice(p.IP) + addrPort = netip.AddrPortFrom(addr, uint16(p.Port)) + return +} + func (p Peer) String() string { loc := net.JoinHostPort(p.IP.String(), fmt.Sprintf("%d", p.Port)) if len(p.ID) != 0 { diff --git a/tracker/http/server/server.go b/tracker/http/server/server.go index 0840fe96..10713a1b 100644 --- a/tracker/http/server/server.go +++ b/tracker/http/server/server.go @@ -14,11 +14,10 @@ import ( "github.com/anacrolix/torrent/bencode" "github.com/anacrolix/torrent/tracker" httpTracker "github.com/anacrolix/torrent/tracker/http" - udpTrackerServer "github.com/anacrolix/torrent/tracker/udp/server" ) type Handler struct { - AnnounceTracker udpTrackerServer.AnnounceTracker + Announce tracker.AnnounceHandler // Called to derive an announcer's IP if non-nil. If not specified, the Request.RemoteAddr is // used. Necessary for instances running behind reverse proxies for example. RequestHost func(r *http.Request) (netip.Addr, error) @@ -74,21 +73,15 @@ func (me Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { } portU64, err := strconv.ParseUint(vs.Get("port"), 0, 16) addrPort := netip.AddrPortFrom(addr, uint16(portU64)) - err = me.AnnounceTracker.TrackAnnounce(r.Context(), tracker.AnnounceRequest{ + peers, err := me.Announce.Serve(r.Context(), tracker.AnnounceRequest{ InfoHash: infoHash, PeerId: peerId, Event: event, Port: addrPort.Port(), }, addrPort) if err != nil { - log.Printf("error tracking announce: %v", err) - http.Error(w, "error tracking announce", http.StatusInternalServerError) - return - } - peers, err := me.AnnounceTracker.GetPeers(r.Context(), infoHash, tracker.GetPeersOpts{}) - if err != nil { - log.Printf("error getting peers: %v", err) - http.Error(w, "error getting peers", http.StatusInternalServerError) + log.Printf("error serving announce: %v", err) + http.Error(w, "error handling announce", http.StatusInternalServerError) return } var resp httpTracker.HttpResponse diff --git a/tracker/server.go b/tracker/server.go index 077d3244..2b483436 100644 --- a/tracker/server.go +++ b/tracker/server.go @@ -3,7 +3,11 @@ package tracker import ( "context" "net/netip" + "sync" + "time" + "github.com/anacrolix/generics" + "github.com/anacrolix/log" "github.com/anacrolix/torrent/tracker/udp" ) @@ -25,11 +29,201 @@ type AnnounceTracker interface { GetPeers(ctx context.Context, infoHash InfoHash, opts GetPeersOpts) ([]PeerInfo, error) } -// -//type Server struct { -// AnnounceTracker AnnounceTracker -//} -// -//func (me Server) HandleAnnounce(req udp.AnnounceRequest, sourceAddr AnnounceAddr) error { -// -//} +type AnnounceHandler struct { + AnnounceTracker AnnounceTracker + UpstreamTrackers []Client + UpstreamTrackerUrls []string + UpstreamAnnouncePeerId [20]byte + + mu sync.Mutex + // Operations are only removed when all the upstream peers have been tracked. + ongoingUpstreamAugmentations map[InfoHash]augmentationOperation +} + +type peerSet = map[PeerInfo]struct{} + +type augmentationOperation struct { + // Closed when no more announce responses are pending. finalPeers will contain all the peers + // seen. + doneAnnouncing chan struct{} + // This receives the latest peerSet until doneAnnouncing is closed. + curPeers chan peerSet + // This contains the final peerSet after doneAnnouncing is closed. + finalPeers peerSet +} + +func (me augmentationOperation) getCurPeers() (ret peerSet) { + ret, _ = me.getCurPeersAndDone() + return +} + +func (me augmentationOperation) getCurPeersAndDone() (ret peerSet, done bool) { + select { + case ret = <-me.curPeers: + case <-me.doneAnnouncing: + ret = me.finalPeers + done = true + } + return +} + +// Adds peers from new that aren't in orig. Modifies both arguments. +func addMissing(orig []PeerInfo, new peerSet) { + for _, peer := range orig { + delete(new, peer) + } + for peer := range new { + orig = append(orig, peer) + } +} + +func (me *AnnounceHandler) Serve( + ctx context.Context, req AnnounceRequest, addr AnnounceAddr, +) (peers []PeerInfo, err error) { + err = me.AnnounceTracker.TrackAnnounce(ctx, req, addr) + if err != nil { + return + } + infoHash := req.InfoHash + var op generics.Option[augmentationOperation] + // Grab a handle to any augmentations that are already running. + me.mu.Lock() + op.Value, op.Ok = me.ongoingUpstreamAugmentations[infoHash] + me.mu.Unlock() + peers, err = me.AnnounceTracker.GetPeers(ctx, infoHash, GetPeersOpts{}) + if err != nil { + return + } + // Take whatever peers it has ready. If it's finished, it doesn't matter if we do this inside + // the mutex or not. + if op.Ok { + curPeers, done := op.Value.getCurPeersAndDone() + addMissing(peers, curPeers) + if done { + // It doesn't get any better with this operation. Forget it. + op.Ok = false + } + } + me.mu.Lock() + // If we didn't have an operation, and don't have enough peers, start one. + if !op.Ok && len(peers) <= 1 { + op.Value, op.Ok = me.ongoingUpstreamAugmentations[infoHash] + if !op.Ok { + op.Set(me.augmentPeersFromUpstream(req)) + generics.MakeMapIfNilAndSet(&me.ongoingUpstreamAugmentations, infoHash, op.Value) + } + } + me.mu.Unlock() + // Wait a while for the current operation. + if op.Ok { + // Force the augmentation to return with whatever it has if it hasn't completed in a + // reasonable time. + ctx, cancel := context.WithTimeout(ctx, 10*time.Second) + select { + case <-ctx.Done(): + case <-op.Value.doneAnnouncing: + } + cancel() + addMissing(peers, op.Value.getCurPeers()) + } + return +} + +func (me *AnnounceHandler) augmentPeersFromUpstream(req AnnounceRequest) augmentationOperation { + announceCtx, cancel := context.WithTimeout(context.Background(), time.Minute) + subReq := AnnounceRequest{ + InfoHash: req.InfoHash, + PeerId: me.UpstreamAnnouncePeerId, + Event: None, + Key: 0, + NumWant: -1, + Port: 0, + } + peersChan := make(chan []Peer) + var pendingUpstreams sync.WaitGroup + for i := range me.UpstreamTrackers { + client := me.UpstreamTrackers[i] + url := me.UpstreamTrackerUrls[i] + pendingUpstreams.Add(1) + go func() { + resp, err := client.Announce(announceCtx, subReq, AnnounceOpt{ + UserAgent: "aragorn", + }) + peersChan <- resp.Peers + if err != nil { + log.Levelf(log.Warning, "error announcing to upstream %q: %v", url, err) + } + }() + } + peersToTrack := make(map[string]Peer) + go func() { + pendingUpstreams.Wait() + cancel() + close(peersChan) + log.Levelf(log.Debug, "adding %v distinct peers from upstream trackers") + for _, peer := range peersToTrack { + addrPort, ok := peer.ToNetipAddrPort() + if !ok { + continue + } + trackReq := AnnounceRequest{ + InfoHash: req.InfoHash, + Event: Started, + Port: uint16(peer.Port), + } + copy(trackReq.PeerId[:], peer.ID) + err := me.AnnounceTracker.TrackAnnounce(context.TODO(), trackReq, addrPort) + if err != nil { + log.Levelf(log.Error, "error tracking upstream peer: %v", err) + } + } + me.mu.Lock() + delete(me.ongoingUpstreamAugmentations, req.InfoHash) + me.mu.Unlock() + }() + curPeersChan := make(chan map[PeerInfo]struct{}) + doneChan := make(chan struct{}) + retPeers := make(map[PeerInfo]struct{}) + go func() { + for { + select { + case peers, ok := <-peersChan: + if !ok { + return + } + voldemort(peers, peersToTrack, retPeers) + pendingUpstreams.Done() + case curPeersChan <- copyPeerSet(retPeers): + } + } + }() + // Take return references. + return augmentationOperation{ + curPeers: curPeersChan, + finalPeers: retPeers, + doneAnnouncing: doneChan, + } +} + +func copyPeerSet(orig peerSet) (ret peerSet) { + ret = make(peerSet, len(orig)) + for k, v := range orig { + ret[k] = v + } + return +} + +// Adds peers to trailing containers. +func voldemort(peers []Peer, toTrack map[string]Peer, sets ...map[PeerInfo]struct{}) { + for _, protoPeer := range peers { + toTrack[protoPeer.String()] = protoPeer + addr, ok := netip.AddrFromSlice(protoPeer.IP) + if !ok { + continue + } + handlerPeer := PeerInfo{netip.AddrPortFrom(addr, uint16(protoPeer.Port))} + for _, set := range sets { + set[handlerPeer] = struct{}{} + } + } +} diff --git a/tracker/udp/server/server.go b/tracker/udp/server/server.go index 68abb106..95e7a5e7 100644 --- a/tracker/udp/server/server.go +++ b/tracker/udp/server/server.go @@ -29,9 +29,9 @@ type InfoHash = [20]byte type AnnounceTracker = tracker.AnnounceTracker type Server struct { - ConnTracker ConnectionTracker - SendResponse func(data []byte, addr net.Addr) (int, error) - AnnounceTracker AnnounceTracker + ConnTracker ConnectionTracker + SendResponse func(data []byte, addr net.Addr) (int, error) + Announce tracker.AnnounceHandler } type RequestSourceAddr = net.Addr @@ -72,6 +72,9 @@ func (me *Server) handleAnnounce( tid udp.TransactionId, r *bytes.Reader, ) error { + // Should we set a timeout of 10s or something for the entire response, so that we give up if a + // retry is imminent? + ok, err := me.ConnTracker.Check(ctx, source.String(), connId) if err != nil { err = fmt.Errorf("checking conn id: %w", err) @@ -91,11 +94,7 @@ func (me *Server) handleAnnounce( err = fmt.Errorf("converting source net.Addr to AnnounceAddr: %w", err) return err } - err = me.AnnounceTracker.TrackAnnounce(ctx, req, announceAddr) - if err != nil { - return err - } - peers, err := me.AnnounceTracker.GetPeers(ctx, req.InfoHash, tracker.GetPeersOpts{}) + peers, err := me.Announce.Serve(ctx, req, announceAddr) if err != nil { return err } From 68dc90a7f758651f5744290ff1a4c45b1f31d5ac Mon Sep 17 00:00:00 2001 From: Matt Joiner Date: Wed, 7 Dec 2022 10:43:33 +1100 Subject: [PATCH 05/32] Rework logging --- tracker/http/server/server.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tracker/http/server/server.go b/tracker/http/server/server.go index 10713a1b..ed08ac92 100644 --- a/tracker/http/server/server.go +++ b/tracker/http/server/server.go @@ -34,8 +34,6 @@ func unmarshalQueryKeyToArray(w http.ResponseWriter, key string, query url.Value return } -var Logger = log.NewLogger("anacrolix", "torrent", "tracker", "http", "server") - // Returns false if there was an error and it was served. func (me Handler) requestHostAddr(r *http.Request) (_ netip.Addr, err error) { if me.RequestHost != nil { @@ -48,6 +46,8 @@ func (me Handler) requestHostAddr(r *http.Request) (_ netip.Addr, err error) { return netip.ParseAddr(host) } +var requestHeadersLogger = log.Default.WithNames("request", "headers") + func (me Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { vs := r.URL.Query() var event tracker.AnnounceEvent @@ -64,7 +64,7 @@ func (me Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { if !ok { return } - Logger.WithNames("request").Levelf(log.Debug, "request RemoteAddr=%q, header=%q", r.RemoteAddr, r.Header) + requestHeadersLogger.Levelf(log.Debug, "request RemoteAddr=%q, header=%q", r.RemoteAddr, r.Header) addr, err := me.requestHostAddr(r) if err != nil { log.Printf("error getting requester IP: %v", err) From 26c226f88f90c21c2792084d058556d3b22e2fe2 Mon Sep 17 00:00:00 2001 From: Matt Joiner Date: Wed, 7 Dec 2022 10:43:45 +1100 Subject: [PATCH 06/32] Fix missing log argument --- tracker/server.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tracker/server.go b/tracker/server.go index 2b483436..806cdcf0 100644 --- a/tracker/server.go +++ b/tracker/server.go @@ -8,6 +8,7 @@ import ( "github.com/anacrolix/generics" "github.com/anacrolix/log" + "github.com/anacrolix/torrent/tracker/udp" ) @@ -160,7 +161,7 @@ func (me *AnnounceHandler) augmentPeersFromUpstream(req AnnounceRequest) augment pendingUpstreams.Wait() cancel() close(peersChan) - log.Levelf(log.Debug, "adding %v distinct peers from upstream trackers") + log.Levelf(log.Debug, "adding %v distinct peers from upstream trackers", len(peersToTrack)) for _, peer := range peersToTrack { addrPort, ok := peer.ToNetipAddrPort() if !ok { From e5f00e9a822450ae9c951bdc2d62cd3284ba5239 Mon Sep 17 00:00:00 2001 From: Matt Joiner Date: Thu, 8 Dec 2022 13:39:58 +1100 Subject: [PATCH 07/32] Fix goroutine leak when augmenting peers from upstream --- tracker/server.go | 1 + 1 file changed, 1 insertion(+) diff --git a/tracker/server.go b/tracker/server.go index 806cdcf0..e19b816a 100644 --- a/tracker/server.go +++ b/tracker/server.go @@ -186,6 +186,7 @@ func (me *AnnounceHandler) augmentPeersFromUpstream(req AnnounceRequest) augment doneChan := make(chan struct{}) retPeers := make(map[PeerInfo]struct{}) go func() { + defer close(doneChan) for { select { case peers, ok := <-peersChan: From 1028161833bb73c171b9bf5a70d5b6201478b707 Mon Sep 17 00:00:00 2001 From: Matt Joiner Date: Thu, 8 Dec 2022 14:04:42 +1100 Subject: [PATCH 08/32] Use smaller parameter type --- tracker/server.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/tracker/server.go b/tracker/server.go index e19b816a..dc95bce6 100644 --- a/tracker/server.go +++ b/tracker/server.go @@ -110,7 +110,7 @@ func (me *AnnounceHandler) Serve( if !op.Ok && len(peers) <= 1 { op.Value, op.Ok = me.ongoingUpstreamAugmentations[infoHash] if !op.Ok { - op.Set(me.augmentPeersFromUpstream(req)) + op.Set(me.augmentPeersFromUpstream(req.InfoHash)) generics.MakeMapIfNilAndSet(&me.ongoingUpstreamAugmentations, infoHash, op.Value) } } @@ -130,10 +130,10 @@ func (me *AnnounceHandler) Serve( return } -func (me *AnnounceHandler) augmentPeersFromUpstream(req AnnounceRequest) augmentationOperation { +func (me *AnnounceHandler) augmentPeersFromUpstream(infoHash [20]byte) augmentationOperation { announceCtx, cancel := context.WithTimeout(context.Background(), time.Minute) subReq := AnnounceRequest{ - InfoHash: req.InfoHash, + InfoHash: infoHash, PeerId: me.UpstreamAnnouncePeerId, Event: None, Key: 0, @@ -168,7 +168,7 @@ func (me *AnnounceHandler) augmentPeersFromUpstream(req AnnounceRequest) augment continue } trackReq := AnnounceRequest{ - InfoHash: req.InfoHash, + InfoHash: infoHash, Event: Started, Port: uint16(peer.Port), } @@ -179,7 +179,7 @@ func (me *AnnounceHandler) augmentPeersFromUpstream(req AnnounceRequest) augment } } me.mu.Lock() - delete(me.ongoingUpstreamAugmentations, req.InfoHash) + delete(me.ongoingUpstreamAugmentations, infoHash) me.mu.Unlock() }() curPeersChan := make(chan map[PeerInfo]struct{}) From acb09fcf799f4fd5e3a81323b1e41ddd1247a127 Mon Sep 17 00:00:00 2001 From: Matt Joiner Date: Thu, 8 Dec 2022 14:04:23 +1100 Subject: [PATCH 09/32] Add get peers limits --- tracker/http/server/server.go | 4 +++- tracker/server.go | 21 ++++++++++++++++++--- tracker/udp/server/server.go | 7 ++++++- 3 files changed, 27 insertions(+), 5 deletions(-) diff --git a/tracker/http/server/server.go b/tracker/http/server/server.go index ed08ac92..82c402de 100644 --- a/tracker/http/server/server.go +++ b/tracker/http/server/server.go @@ -9,6 +9,7 @@ import ( "strconv" "github.com/anacrolix/dht/v2/krpc" + "github.com/anacrolix/generics" "github.com/anacrolix/log" "github.com/anacrolix/torrent/bencode" @@ -78,7 +79,8 @@ func (me Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { PeerId: peerId, Event: event, Port: addrPort.Port(), - }, addrPort) + NumWant: -1, + }, addrPort, tracker.GetPeersOpts{generics.Some[uint](200)}) if err != nil { log.Printf("error serving announce: %v", err) http.Error(w, "error handling announce", http.StatusInternalServerError) diff --git a/tracker/server.go b/tracker/server.go index dc95bce6..f7d5ecb8 100644 --- a/tracker/server.go +++ b/tracker/server.go @@ -14,7 +14,10 @@ import ( // This is reserved for stuff like filtering by IP version, avoiding an announcer's IP or key, // limiting return count, etc. -type GetPeersOpts struct{} +type GetPeersOpts struct { + // Negative numbers are not allowed. + MaxCount generics.Option[uint] +} type InfoHash = [20]byte @@ -79,7 +82,7 @@ func addMissing(orig []PeerInfo, new peerSet) { } func (me *AnnounceHandler) Serve( - ctx context.Context, req AnnounceRequest, addr AnnounceAddr, + ctx context.Context, req AnnounceRequest, addr AnnounceAddr, opts GetPeersOpts, ) (peers []PeerInfo, err error) { err = me.AnnounceTracker.TrackAnnounce(ctx, req, addr) if err != nil { @@ -91,7 +94,19 @@ func (me *AnnounceHandler) Serve( me.mu.Lock() op.Value, op.Ok = me.ongoingUpstreamAugmentations[infoHash] me.mu.Unlock() - peers, err = me.AnnounceTracker.GetPeers(ctx, infoHash, GetPeersOpts{}) + // Apply num_want limit to max count. I really can't tell if this is the right place to do it, + // but it seems the most flexible. + if req.NumWant != -1 { + newCount := uint(req.NumWant) + if opts.MaxCount.Ok { + if newCount < opts.MaxCount.Value { + opts.MaxCount.Value = newCount + } + } else { + opts.MaxCount = generics.Some(newCount) + } + } + peers, err = me.AnnounceTracker.GetPeers(ctx, infoHash, opts) if err != nil { return } diff --git a/tracker/udp/server/server.go b/tracker/udp/server/server.go index 95e7a5e7..dfcc6500 100644 --- a/tracker/udp/server/server.go +++ b/tracker/udp/server/server.go @@ -11,6 +11,7 @@ import ( "net/netip" "github.com/anacrolix/dht/v2/krpc" + "github.com/anacrolix/generics" "github.com/anacrolix/log" "github.com/anacrolix/torrent/tracker" @@ -94,7 +95,11 @@ func (me *Server) handleAnnounce( err = fmt.Errorf("converting source net.Addr to AnnounceAddr: %w", err) return err } - peers, err := me.Announce.Serve(ctx, req, announceAddr) + opts := tracker.GetPeersOpts{MaxCount: generics.Some[uint](50)} + if addrFamily == udp.AddrFamilyIpv4 { + opts.MaxCount = generics.Some[uint](150) + } + peers, err := me.Announce.Serve(ctx, req, announceAddr, opts) if err != nil { return err } From 91bde5fdf0a8a30597150ea304d2a867ed92cb59 Mon Sep 17 00:00:00 2001 From: Matt Joiner Date: Thu, 8 Dec 2022 15:06:05 +1100 Subject: [PATCH 10/32] Use ConnectionId type alias --- tracker/udp-server_test.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tracker/udp-server_test.go b/tracker/udp-server_test.go index 824038ea..b32fe853 100644 --- a/tracker/udp-server_test.go +++ b/tracker/udp-server_test.go @@ -21,7 +21,7 @@ type torrent struct { type server struct { pc net.PacketConn - conns map[int64]struct{} + conns map[udp.ConnectionId]struct{} t map[[20]byte]torrent } @@ -46,10 +46,10 @@ func (s *server) respond(addr net.Addr, rh udp.ResponseHeader, parts ...interfac return } -func (s *server) newConn() (ret int64) { +func (s *server) newConn() (ret udp.ConnectionId) { ret = rand.Int63() if s.conns == nil { - s.conns = make(map[int64]struct{}) + s.conns = make(map[udp.ConnectionId]struct{}) } s.conns[ret] = struct{}{} return From 5cedf602f23974955808826606916356e2c2da97 Mon Sep 17 00:00:00 2001 From: Matt Joiner Date: Thu, 8 Dec 2022 15:06:59 +1100 Subject: [PATCH 11/32] Make UDP tracker connection ID unsigned This is more appropriate for logging and its use as a byte blob elsewhere. --- tracker/udp-server_test.go | 2 +- tracker/udp/protocol.go | 2 +- tracker/udp/server/server.go | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/tracker/udp-server_test.go b/tracker/udp-server_test.go index b32fe853..7308ed0d 100644 --- a/tracker/udp-server_test.go +++ b/tracker/udp-server_test.go @@ -47,7 +47,7 @@ func (s *server) respond(addr net.Addr, rh udp.ResponseHeader, parts ...interfac } func (s *server) newConn() (ret udp.ConnectionId) { - ret = rand.Int63() + ret = rand.Uint64() if s.conns == nil { s.conns = make(map[udp.ConnectionId]struct{}) } diff --git a/tracker/udp/protocol.go b/tracker/udp/protocol.go index f6beb4c6..653d013e 100644 --- a/tracker/udp/protocol.go +++ b/tracker/udp/protocol.go @@ -26,7 +26,7 @@ const ( type TransactionId = int32 -type ConnectionId = int64 +type ConnectionId = uint64 type ConnectionRequest struct { ConnectionId ConnectionId diff --git a/tracker/udp/server/server.go b/tracker/udp/server/server.go index dfcc6500..20e827b9 100644 --- a/tracker/udp/server/server.go +++ b/tracker/udp/server/server.go @@ -181,7 +181,7 @@ func randomConnectionId() udp.ConnectionId { if err != nil { panic(err) } - return int64(binary.BigEndian.Uint64(b[:])) + return binary.BigEndian.Uint64(b[:]) } func RunSimple(ctx context.Context, s *Server, pc net.PacketConn, family udp.AddrFamily) error { From 0a4de1821e24ee2d31281fd0f752cc0e8fbd71e5 Mon Sep 17 00:00:00 2001 From: Matt Joiner Date: Thu, 8 Dec 2022 15:07:14 +1100 Subject: [PATCH 12/32] Fix error message for connection ID mismatch --- tracker/udp/server/server.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tracker/udp/server/server.go b/tracker/udp/server/server.go index 20e827b9..c3179ae0 100644 --- a/tracker/udp/server/server.go +++ b/tracker/udp/server/server.go @@ -82,7 +82,7 @@ func (me *Server) handleAnnounce( return err } if !ok { - return fmt.Errorf("invalid connection id: %v", connId) + return fmt.Errorf("incorrect connection id: %x", connId) } var req udp.AnnounceRequest err = udp.Read(r, &req) From cfe166c745b89880846757fecd81bbdc510e8dd3 Mon Sep 17 00:00:00 2001 From: Matt Joiner Date: Mon, 12 Dec 2022 11:23:33 +1100 Subject: [PATCH 13/32] Don't use AnnounceHandler by value It needs to share mutex for upstream tracker single flight handling. --- tracker/http/server/server.go | 2 +- tracker/udp/server/server.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/tracker/http/server/server.go b/tracker/http/server/server.go index 82c402de..8937ed5e 100644 --- a/tracker/http/server/server.go +++ b/tracker/http/server/server.go @@ -18,7 +18,7 @@ import ( ) type Handler struct { - Announce tracker.AnnounceHandler + Announce *tracker.AnnounceHandler // Called to derive an announcer's IP if non-nil. If not specified, the Request.RemoteAddr is // used. Necessary for instances running behind reverse proxies for example. RequestHost func(r *http.Request) (netip.Addr, error) diff --git a/tracker/udp/server/server.go b/tracker/udp/server/server.go index c3179ae0..d76dc424 100644 --- a/tracker/udp/server/server.go +++ b/tracker/udp/server/server.go @@ -32,7 +32,7 @@ type AnnounceTracker = tracker.AnnounceTracker type Server struct { ConnTracker ConnectionTracker SendResponse func(data []byte, addr net.Addr) (int, error) - Announce tracker.AnnounceHandler + Announce *tracker.AnnounceHandler } type RequestSourceAddr = net.Addr From ab4599b6b8f10ca8fb9eb7b596eb1caf838b6af1 Mon Sep 17 00:00:00 2001 From: Matt Joiner Date: Tue, 13 Dec 2022 15:28:34 +1100 Subject: [PATCH 14/32] Add some tracing --- tracker/server.go | 32 ++++++++++++++++++++++++++++++-- tracker/udp/server/server.go | 5 +++++ 2 files changed, 35 insertions(+), 2 deletions(-) diff --git a/tracker/server.go b/tracker/server.go index f7d5ecb8..0d5068d4 100644 --- a/tracker/server.go +++ b/tracker/server.go @@ -2,12 +2,16 @@ package tracker import ( "context" + "encoding/hex" "net/netip" "sync" "time" "github.com/anacrolix/generics" "github.com/anacrolix/log" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" "github.com/anacrolix/torrent/tracker/udp" ) @@ -81,9 +85,30 @@ func addMissing(orig []PeerInfo, new peerSet) { } } +var tracer = otel.Tracer("torrent.tracker.udp") + func (me *AnnounceHandler) Serve( ctx context.Context, req AnnounceRequest, addr AnnounceAddr, opts GetPeersOpts, ) (peers []PeerInfo, err error) { + ctx, span := tracer.Start( + ctx, + "AnnounceHandler.Serve", + trace.WithAttributes( + attribute.Int64("announce.request.num_want", int64(req.NumWant)), + attribute.Int("announce.request.port", int(req.Port)), + attribute.String("announce.request.info_hash", hex.EncodeToString(req.InfoHash[:])), + attribute.String("announce.request.event", req.Event.String()), + attribute.Int64("announce.get_peers.opts.max_count_value", int64(opts.MaxCount.Value)), + attribute.Bool("announce.get_peers.opts.max_count_ok", opts.MaxCount.Ok), + attribute.String("announce.source.addr.ip", addr.Addr().String()), + attribute.Int("announce.source.addr.port", int(addr.Port())), + ), + ) + defer span.End() + defer func() { + span.SetAttributes(attribute.Int("announce.get_peers.len", len(peers))) + }() + err = me.AnnounceTracker.TrackAnnounce(ctx, req, addr) if err != nil { return @@ -121,8 +146,11 @@ func (me *AnnounceHandler) Serve( } } me.mu.Lock() - // If we didn't have an operation, and don't have enough peers, start one. - if !op.Ok && len(peers) <= 1 { + // If we didn't have an operation, and don't have enough peers, start one. Allowing 1 is + // assuming the announcing peer might be that one. Really we should record a value to prevent + // duplicate announces. Also don't announce upstream if we got no peers because the caller asked + // for none. + if !op.Ok && len(peers) <= 1 && opts.MaxCount.UnwrapOr(1) > 0 { op.Value, op.Ok = me.ongoingUpstreamAugmentations[infoHash] if !op.Ok { op.Set(me.augmentPeersFromUpstream(req.InfoHash)) diff --git a/tracker/udp/server/server.go b/tracker/udp/server/server.go index d76dc424..716c3916 100644 --- a/tracker/udp/server/server.go +++ b/tracker/udp/server/server.go @@ -13,6 +13,7 @@ import ( "github.com/anacrolix/dht/v2/krpc" "github.com/anacrolix/generics" "github.com/anacrolix/log" + "go.opentelemetry.io/otel" "github.com/anacrolix/torrent/tracker" "github.com/anacrolix/torrent/tracker/udp" @@ -37,12 +38,16 @@ type Server struct { type RequestSourceAddr = net.Addr +var tracer = otel.Tracer("torrent.tracker.udp") + func (me *Server) HandleRequest( ctx context.Context, family udp.AddrFamily, source RequestSourceAddr, body []byte, ) error { + ctx, span := tracer.Start(ctx, "Server.HandleRequest") + defer span.End() var h udp.RequestHeader var r bytes.Reader r.Reset(body) From cbea87aaf3f9c7ad5d63bb32eb7e71d743b0aaf5 Mon Sep 17 00:00:00 2001 From: Matt Joiner Date: Tue, 13 Dec 2022 16:41:08 +1100 Subject: [PATCH 15/32] Add upstream announce gating --- tracker/server.go | 28 ++++++++++++++++++++++++++-- tracker/upstream-announcing.go | 18 ++++++++++++++++++ 2 files changed, 44 insertions(+), 2 deletions(-) create mode 100644 tracker/upstream-announcing.go diff --git a/tracker/server.go b/tracker/server.go index 0d5068d4..c74cbd24 100644 --- a/tracker/server.go +++ b/tracker/server.go @@ -38,10 +38,12 @@ type AnnounceTracker interface { } type AnnounceHandler struct { - AnnounceTracker AnnounceTracker + AnnounceTracker AnnounceTracker + UpstreamTrackers []Client UpstreamTrackerUrls []string UpstreamAnnouncePeerId [20]byte + UpstreamAnnounceGate UpstreamAnnounceGater mu sync.Mutex // Operations are only removed when all the upstream peers have been tracked. @@ -174,7 +176,8 @@ func (me *AnnounceHandler) Serve( } func (me *AnnounceHandler) augmentPeersFromUpstream(infoHash [20]byte) augmentationOperation { - announceCtx, cancel := context.WithTimeout(context.Background(), time.Minute) + const announceTimeout = time.Minute + announceCtx, cancel := context.WithTimeout(context.Background(), announceTimeout) subReq := AnnounceRequest{ InfoHash: infoHash, PeerId: me.UpstreamAnnouncePeerId, @@ -190,9 +193,30 @@ func (me *AnnounceHandler) augmentPeersFromUpstream(infoHash [20]byte) augmentat url := me.UpstreamTrackerUrls[i] pendingUpstreams.Add(1) go func() { + started, err := me.UpstreamAnnounceGate.Start(announceCtx, url, infoHash, announceTimeout) + if err != nil { + log.Printf("error reserving announce for %x to %v: %v", infoHash, url, err) + } + if err != nil || !started { + peersChan <- nil + return + } + log.Printf("announcing %x upstream to %v", infoHash, url) resp, err := client.Announce(announceCtx, subReq, AnnounceOpt{ UserAgent: "aragorn", }) + interval := resp.Interval + go func() { + if interval < 5*60 { + // This is as much to reduce load on upstream trackers in the event of errors, + // as it is to reduce load on our peer store. + interval = 5 * 60 + } + err := me.UpstreamAnnounceGate.Completed(context.Background(), url, infoHash, interval) + if err != nil { + log.Printf("error recording completed announce for %x to %v: %v", infoHash, url, err) + } + }() peersChan <- resp.Peers if err != nil { log.Levelf(log.Warning, "error announcing to upstream %q: %v", url, err) diff --git a/tracker/upstream-announcing.go b/tracker/upstream-announcing.go new file mode 100644 index 00000000..ab5a5fb3 --- /dev/null +++ b/tracker/upstream-announcing.go @@ -0,0 +1,18 @@ +package tracker + +import ( + "context" + "time" +) + +type UpstreamAnnounceGater interface { + Start(ctx context.Context, tracker string, infoHash InfoHash, + // How long the announce block remains before discarding it. + timeout time.Duration, + ) (bool, error) + Completed( + ctx context.Context, tracker string, infoHash InfoHash, + // Num of seconds reported by tracker, or some suitable value the caller has chosen. + interval int32, + ) error +} From 87e64b3088a7e45ba69fab18185780aa23af58a4 Mon Sep 17 00:00:00 2001 From: Matt Joiner Date: Thu, 15 Dec 2022 23:21:08 +1100 Subject: [PATCH 16/32] Propagate announce interval, particularly for UDP --- tracker/http/server/server.go | 9 +++++---- tracker/server.go | 28 ++++++++++++++++++---------- tracker/udp/server/server.go | 14 ++++++++------ 3 files changed, 31 insertions(+), 20 deletions(-) diff --git a/tracker/http/server/server.go b/tracker/http/server/server.go index 8937ed5e..781c640b 100644 --- a/tracker/http/server/server.go +++ b/tracker/http/server/server.go @@ -74,22 +74,23 @@ func (me Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { } portU64, err := strconv.ParseUint(vs.Get("port"), 0, 16) addrPort := netip.AddrPortFrom(addr, uint16(portU64)) - peers, err := me.Announce.Serve(r.Context(), tracker.AnnounceRequest{ + res := me.Announce.Serve(r.Context(), tracker.AnnounceRequest{ InfoHash: infoHash, PeerId: peerId, Event: event, Port: addrPort.Port(), NumWant: -1, - }, addrPort, tracker.GetPeersOpts{generics.Some[uint](200)}) + }, addrPort, tracker.GetPeersOpts{MaxCount: generics.Some[uint](200)}) + err = res.Err if err != nil { log.Printf("error serving announce: %v", err) http.Error(w, "error handling announce", http.StatusInternalServerError) return } var resp httpTracker.HttpResponse - resp.Interval = 5 * 60 + resp.Interval = res.Interval.UnwrapOr(5 * 60) resp.Peers.Compact = true - for _, peer := range peers { + for _, peer := range res.Peers { if peer.Addr().Is4() { resp.Peers.List = append(resp.Peers.List, tracker.Peer{ IP: peer.Addr().AsSlice(), diff --git a/tracker/server.go b/tracker/server.go index c74cbd24..67b46e0c 100644 --- a/tracker/server.go +++ b/tracker/server.go @@ -34,7 +34,15 @@ type AnnounceAddr = netip.AddrPort type AnnounceTracker interface { TrackAnnounce(ctx context.Context, req udp.AnnounceRequest, addr AnnounceAddr) error Scrape(ctx context.Context, infoHashes []InfoHash) ([]udp.ScrapeInfohashResult, error) - GetPeers(ctx context.Context, infoHash InfoHash, opts GetPeersOpts) ([]PeerInfo, error) + GetPeers(ctx context.Context, infoHash InfoHash, opts GetPeersOpts) ServerAnnounceResult +} + +type ServerAnnounceResult struct { + Err error + Peers []PeerInfo + Interval generics.Option[int32] + Leechers generics.Option[int32] + Seeders generics.Option[int32] } type AnnounceHandler struct { @@ -91,7 +99,7 @@ var tracer = otel.Tracer("torrent.tracker.udp") func (me *AnnounceHandler) Serve( ctx context.Context, req AnnounceRequest, addr AnnounceAddr, opts GetPeersOpts, -) (peers []PeerInfo, err error) { +) (ret ServerAnnounceResult) { ctx, span := tracer.Start( ctx, "AnnounceHandler.Serve", @@ -108,11 +116,11 @@ func (me *AnnounceHandler) Serve( ) defer span.End() defer func() { - span.SetAttributes(attribute.Int("announce.get_peers.len", len(peers))) + span.SetAttributes(attribute.Int("announce.get_peers.len", len(ret.Peers))) }() - err = me.AnnounceTracker.TrackAnnounce(ctx, req, addr) - if err != nil { + ret.Err = me.AnnounceTracker.TrackAnnounce(ctx, req, addr) + if ret.Err != nil { return } infoHash := req.InfoHash @@ -133,15 +141,15 @@ func (me *AnnounceHandler) Serve( opts.MaxCount = generics.Some(newCount) } } - peers, err = me.AnnounceTracker.GetPeers(ctx, infoHash, opts) - if err != nil { + ret = me.AnnounceTracker.GetPeers(ctx, infoHash, opts) + if ret.Err != nil { return } // Take whatever peers it has ready. If it's finished, it doesn't matter if we do this inside // the mutex or not. if op.Ok { curPeers, done := op.Value.getCurPeersAndDone() - addMissing(peers, curPeers) + addMissing(ret.Peers, curPeers) if done { // It doesn't get any better with this operation. Forget it. op.Ok = false @@ -152,7 +160,7 @@ func (me *AnnounceHandler) Serve( // assuming the announcing peer might be that one. Really we should record a value to prevent // duplicate announces. Also don't announce upstream if we got no peers because the caller asked // for none. - if !op.Ok && len(peers) <= 1 && opts.MaxCount.UnwrapOr(1) > 0 { + if !op.Ok && len(ret.Peers) <= 1 && opts.MaxCount.UnwrapOr(1) > 0 { op.Value, op.Ok = me.ongoingUpstreamAugmentations[infoHash] if !op.Ok { op.Set(me.augmentPeersFromUpstream(req.InfoHash)) @@ -170,7 +178,7 @@ func (me *AnnounceHandler) Serve( case <-op.Value.doneAnnouncing: } cancel() - addMissing(peers, op.Value.getCurPeers()) + addMissing(ret.Peers, op.Value.getCurPeers()) } return } diff --git a/tracker/udp/server/server.go b/tracker/udp/server/server.go index 716c3916..c8c9bec3 100644 --- a/tracker/udp/server/server.go +++ b/tracker/udp/server/server.go @@ -104,12 +104,12 @@ func (me *Server) handleAnnounce( if addrFamily == udp.AddrFamilyIpv4 { opts.MaxCount = generics.Some[uint](150) } - peers, err := me.Announce.Serve(ctx, req, announceAddr, opts) - if err != nil { - return err + res := me.Announce.Serve(ctx, req, announceAddr, opts) + if res.Err != nil { + return res.Err } - nodeAddrs := make([]krpc.NodeAddr, 0, len(peers)) - for _, p := range peers { + nodeAddrs := make([]krpc.NodeAddr, 0, len(res.Peers)) + for _, p := range res.Peers { var ip net.IP switch addrFamily { default: @@ -137,7 +137,9 @@ func (me *Server) handleAnnounce( if err != nil { return err } - err = udp.Write(&buf, udp.AnnounceResponseHeader{}) + err = udp.Write(&buf, udp.AnnounceResponseHeader{ + Interval: res.Interval.UnwrapOr(5 * 60), + }) if err != nil { return err } From 16c7621d9ec6b31bdfef985d84295be97961cc21 Mon Sep 17 00:00:00 2001 From: Matt Joiner Date: Fri, 16 Dec 2022 13:22:44 +1100 Subject: [PATCH 17/32] Set span errors --- tracker/server.go | 4 ++++ tracker/udp/server/server.go | 10 ++++++++-- 2 files changed, 12 insertions(+), 2 deletions(-) diff --git a/tracker/server.go b/tracker/server.go index 67b46e0c..6ef4ec4e 100644 --- a/tracker/server.go +++ b/tracker/server.go @@ -11,6 +11,7 @@ import ( "github.com/anacrolix/log" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" "go.opentelemetry.io/otel/trace" "github.com/anacrolix/torrent/tracker/udp" @@ -117,6 +118,9 @@ func (me *AnnounceHandler) Serve( defer span.End() defer func() { span.SetAttributes(attribute.Int("announce.get_peers.len", len(ret.Peers))) + if ret.Err != nil { + span.SetStatus(codes.Error, ret.Err.Error()) + } }() ret.Err = me.AnnounceTracker.TrackAnnounce(ctx, req, addr) diff --git a/tracker/udp/server/server.go b/tracker/udp/server/server.go index c8c9bec3..ad97f407 100644 --- a/tracker/udp/server/server.go +++ b/tracker/udp/server/server.go @@ -14,6 +14,7 @@ import ( "github.com/anacrolix/generics" "github.com/anacrolix/log" "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/codes" "github.com/anacrolix/torrent/tracker" "github.com/anacrolix/torrent/tracker/udp" @@ -45,13 +46,18 @@ func (me *Server) HandleRequest( family udp.AddrFamily, source RequestSourceAddr, body []byte, -) error { +) (err error) { ctx, span := tracer.Start(ctx, "Server.HandleRequest") defer span.End() + defer func() { + if err != nil { + span.SetStatus(codes.Error, err.Error()) + } + }() var h udp.RequestHeader var r bytes.Reader r.Reset(body) - err := udp.Read(&r, &h) + err = udp.Read(&r, &h) if err != nil { err = fmt.Errorf("reading request header: %w", err) return err From e554aa19a691615f349bf6dae0601706ea47dc03 Mon Sep 17 00:00:00 2001 From: Matt Joiner Date: Fri, 16 Dec 2022 15:38:30 +1100 Subject: [PATCH 18/32] Add --port flag to announce --- cmd/torrent/announce.go | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/cmd/torrent/announce.go b/cmd/torrent/announce.go index 94500d96..f48dfd2a 100644 --- a/cmd/torrent/announce.go +++ b/cmd/torrent/announce.go @@ -12,19 +12,24 @@ import ( type AnnounceCmd struct { Event udp.AnnounceEvent + Port *uint16 Tracker string `arg:"positional"` InfoHash torrent.InfoHash `arg:"positional"` } func announceErr(flags AnnounceCmd) error { + req := tracker.AnnounceRequest{ + InfoHash: flags.InfoHash, + Port: uint16(torrent.NewDefaultClientConfig().ListenPort), + NumWant: -1, + Event: flags.Event, + } + if flags.Port != nil { + req.Port = *flags.Port + } response, err := tracker.Announce{ TrackerUrl: flags.Tracker, - Request: tracker.AnnounceRequest{ - InfoHash: flags.InfoHash, - Port: uint16(torrent.NewDefaultClientConfig().ListenPort), - NumWant: -1, - Event: flags.Event, - }, + Request: req, }.Do() if err != nil { return fmt.Errorf("doing announce: %w", err) From 3371522119dc054841c5ad783fafbdf0c69ad28d Mon Sep 17 00:00:00 2001 From: Matt Joiner Date: Fri, 16 Dec 2022 15:39:02 +1100 Subject: [PATCH 19/32] Use port from announce request, not packet source --- tracker/server.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/tracker/server.go b/tracker/server.go index 6ef4ec4e..22f54da5 100644 --- a/tracker/server.go +++ b/tracker/server.go @@ -3,6 +3,7 @@ package tracker import ( "context" "encoding/hex" + "fmt" "net/netip" "sync" "time" @@ -123,8 +124,12 @@ func (me *AnnounceHandler) Serve( } }() + if req.Port != 0 { + addr = netip.AddrPortFrom(addr.Addr(), req.Port) + } ret.Err = me.AnnounceTracker.TrackAnnounce(ctx, req, addr) if ret.Err != nil { + ret.Err = fmt.Errorf("tracking announce: %w", ret.Err) return } infoHash := req.InfoHash From 6731a3183955453aadd95ef3cb5541691e4e83ca Mon Sep 17 00:00:00 2001 From: Matt Joiner Date: Sat, 17 Dec 2022 11:08:35 +1100 Subject: [PATCH 20/32] Pass seeders and leechers back in UDP announce --- tracker/udp/server/server.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tracker/udp/server/server.go b/tracker/udp/server/server.go index ad97f407..d86012dc 100644 --- a/tracker/udp/server/server.go +++ b/tracker/udp/server/server.go @@ -145,6 +145,8 @@ func (me *Server) handleAnnounce( } err = udp.Write(&buf, udp.AnnounceResponseHeader{ Interval: res.Interval.UnwrapOr(5 * 60), + Seeders: res.Seeders.Value, + Leechers: res.Leechers.Value, }) if err != nil { return err From 16da3c0c46a71d6f270600c63acb674226c1c9ae Mon Sep 17 00:00:00 2001 From: Matt Joiner Date: Sat, 17 Dec 2022 11:08:46 +1100 Subject: [PATCH 21/32] Default to announcing as leecher --- cmd/torrent/announce.go | 1 + 1 file changed, 1 insertion(+) diff --git a/cmd/torrent/announce.go b/cmd/torrent/announce.go index f48dfd2a..31676d9a 100644 --- a/cmd/torrent/announce.go +++ b/cmd/torrent/announce.go @@ -23,6 +23,7 @@ func announceErr(flags AnnounceCmd) error { Port: uint16(torrent.NewDefaultClientConfig().ListenPort), NumWant: -1, Event: flags.Event, + Left: -1, } if flags.Port != nil { req.Port = *flags.Port From b06b6148456763ba92e33a29f7d4cbbe26035ebe Mon Sep 17 00:00:00 2001 From: Matt Joiner Date: Sun, 18 Dec 2022 10:56:25 +1100 Subject: [PATCH 22/32] Make trackerServer package --- tracker/http/server/server.go | 24 ++++++++++++++------- tracker/{ => server}/server.go | 18 ++++++++++------ tracker/{ => server}/upstream-announcing.go | 6 +++--- tracker/server/use.go | 9 ++++++++ tracker/udp/server/server.go | 8 +++---- 5 files changed, 44 insertions(+), 21 deletions(-) rename tracker/{ => server}/server.go (95%) rename tracker/{ => server}/upstream-announcing.go (61%) create mode 100644 tracker/server/use.go diff --git a/tracker/http/server/server.go b/tracker/http/server/server.go index 781c640b..bd2783e3 100644 --- a/tracker/http/server/server.go +++ b/tracker/http/server/server.go @@ -11,6 +11,7 @@ import ( "github.com/anacrolix/dht/v2/krpc" "github.com/anacrolix/generics" "github.com/anacrolix/log" + trackerServer "github.com/anacrolix/torrent/tracker/server" "github.com/anacrolix/torrent/bencode" "github.com/anacrolix/torrent/tracker" @@ -18,7 +19,7 @@ import ( ) type Handler struct { - Announce *tracker.AnnounceHandler + Announce *trackerServer.AnnounceHandler // Called to derive an announcer's IP if non-nil. If not specified, the Request.RemoteAddr is // used. Necessary for instances running behind reverse proxies for example. RequestHost func(r *http.Request) (netip.Addr, error) @@ -74,13 +75,20 @@ func (me Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { } portU64, err := strconv.ParseUint(vs.Get("port"), 0, 16) addrPort := netip.AddrPortFrom(addr, uint16(portU64)) - res := me.Announce.Serve(r.Context(), tracker.AnnounceRequest{ - InfoHash: infoHash, - PeerId: peerId, - Event: event, - Port: addrPort.Port(), - NumWant: -1, - }, addrPort, tracker.GetPeersOpts{MaxCount: generics.Some[uint](200)}) + res := me.Announce.Serve( + r.Context(), + tracker.AnnounceRequest{ + InfoHash: infoHash, + PeerId: peerId, + Event: event, + Port: addrPort.Port(), + NumWant: -1, + }, + addrPort, + trackerServer.GetPeersOpts{ + MaxCount: generics.Some[uint](200), + }, + ) err = res.Err if err != nil { log.Printf("error serving announce: %v", err) diff --git a/tracker/server.go b/tracker/server/server.go similarity index 95% rename from tracker/server.go rename to tracker/server/server.go index 22f54da5..ced31616 100644 --- a/tracker/server.go +++ b/tracker/server/server.go @@ -1,4 +1,4 @@ -package tracker +package trackerServer import ( "context" @@ -10,6 +10,7 @@ import ( "github.com/anacrolix/generics" "github.com/anacrolix/log" + "github.com/anacrolix/torrent/tracker" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/codes" @@ -36,7 +37,12 @@ type AnnounceAddr = netip.AddrPort type AnnounceTracker interface { TrackAnnounce(ctx context.Context, req udp.AnnounceRequest, addr AnnounceAddr) error Scrape(ctx context.Context, infoHashes []InfoHash) ([]udp.ScrapeInfohashResult, error) - GetPeers(ctx context.Context, infoHash InfoHash, opts GetPeersOpts) ServerAnnounceResult + GetPeers( + ctx context.Context, + infoHash InfoHash, + opts GetPeersOpts, + remote AnnounceAddr, + ) ServerAnnounceResult } type ServerAnnounceResult struct { @@ -150,7 +156,7 @@ func (me *AnnounceHandler) Serve( opts.MaxCount = generics.Some(newCount) } } - ret = me.AnnounceTracker.GetPeers(ctx, infoHash, opts) + ret = me.AnnounceTracker.GetPeers(ctx, infoHash, opts, addr) if ret.Err != nil { return } @@ -198,7 +204,7 @@ func (me *AnnounceHandler) augmentPeersFromUpstream(infoHash [20]byte) augmentat subReq := AnnounceRequest{ InfoHash: infoHash, PeerId: me.UpstreamAnnouncePeerId, - Event: None, + Event: tracker.None, Key: 0, NumWant: -1, Port: 0, @@ -219,7 +225,7 @@ func (me *AnnounceHandler) augmentPeersFromUpstream(infoHash [20]byte) augmentat return } log.Printf("announcing %x upstream to %v", infoHash, url) - resp, err := client.Announce(announceCtx, subReq, AnnounceOpt{ + resp, err := client.Announce(announceCtx, subReq, tracker.AnnounceOpt{ UserAgent: "aragorn", }) interval := resp.Interval @@ -253,7 +259,7 @@ func (me *AnnounceHandler) augmentPeersFromUpstream(infoHash [20]byte) augmentat } trackReq := AnnounceRequest{ InfoHash: infoHash, - Event: Started, + Event: tracker.Started, Port: uint16(peer.Port), } copy(trackReq.PeerId[:], peer.ID) diff --git a/tracker/upstream-announcing.go b/tracker/server/upstream-announcing.go similarity index 61% rename from tracker/upstream-announcing.go rename to tracker/server/upstream-announcing.go index ab5a5fb3..cfbf61c8 100644 --- a/tracker/upstream-announcing.go +++ b/tracker/server/upstream-announcing.go @@ -1,4 +1,4 @@ -package tracker +package trackerServer import ( "context" @@ -7,12 +7,12 @@ import ( type UpstreamAnnounceGater interface { Start(ctx context.Context, tracker string, infoHash InfoHash, - // How long the announce block remains before discarding it. + // How long the announce block remains before discarding it. timeout time.Duration, ) (bool, error) Completed( ctx context.Context, tracker string, infoHash InfoHash, - // Num of seconds reported by tracker, or some suitable value the caller has chosen. + // Num of seconds reported by tracker, or some suitable value the caller has chosen. interval int32, ) error } diff --git a/tracker/server/use.go b/tracker/server/use.go new file mode 100644 index 00000000..942321c5 --- /dev/null +++ b/tracker/server/use.go @@ -0,0 +1,9 @@ +package trackerServer + +import "github.com/anacrolix/torrent/tracker" + +type ( + AnnounceRequest = tracker.AnnounceRequest + Client = tracker.Client + Peer = tracker.Peer +) diff --git a/tracker/udp/server/server.go b/tracker/udp/server/server.go index d86012dc..2007233f 100644 --- a/tracker/udp/server/server.go +++ b/tracker/udp/server/server.go @@ -13,10 +13,10 @@ import ( "github.com/anacrolix/dht/v2/krpc" "github.com/anacrolix/generics" "github.com/anacrolix/log" + trackerServer "github.com/anacrolix/torrent/tracker/server" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/codes" - "github.com/anacrolix/torrent/tracker" "github.com/anacrolix/torrent/tracker/udp" ) @@ -29,12 +29,12 @@ type ConnectionTracker interface { type InfoHash = [20]byte -type AnnounceTracker = tracker.AnnounceTracker +type AnnounceTracker = trackerServer.AnnounceTracker type Server struct { ConnTracker ConnectionTracker SendResponse func(data []byte, addr net.Addr) (int, error) - Announce *tracker.AnnounceHandler + Announce *trackerServer.AnnounceHandler } type RequestSourceAddr = net.Addr @@ -106,7 +106,7 @@ func (me *Server) handleAnnounce( err = fmt.Errorf("converting source net.Addr to AnnounceAddr: %w", err) return err } - opts := tracker.GetPeersOpts{MaxCount: generics.Some[uint](50)} + opts := trackerServer.GetPeersOpts{MaxCount: generics.Some[uint](50)} if addrFamily == udp.AddrFamilyIpv4 { opts.MaxCount = generics.Some[uint](150) } From 7f3655a14bc1fa247e22d4675b911fb1307c1999 Mon Sep 17 00:00:00 2001 From: Matt Joiner Date: Thu, 29 Dec 2022 10:21:34 +1100 Subject: [PATCH 23/32] Track request payload len --- tracker/udp/server/server.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/tracker/udp/server/server.go b/tracker/udp/server/server.go index 2007233f..9df2fc67 100644 --- a/tracker/udp/server/server.go +++ b/tracker/udp/server/server.go @@ -15,7 +15,9 @@ import ( "github.com/anacrolix/log" trackerServer "github.com/anacrolix/torrent/tracker/server" "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/codes" + "go.opentelemetry.io/otel/trace" "github.com/anacrolix/torrent/tracker/udp" ) @@ -47,7 +49,8 @@ func (me *Server) HandleRequest( source RequestSourceAddr, body []byte, ) (err error) { - ctx, span := tracer.Start(ctx, "Server.HandleRequest") + ctx, span := tracer.Start(ctx, "Server.HandleRequest", + trace.WithAttributes(attribute.Int("payload.len", len(body)))) defer span.End() defer func() { if err != nil { From 7d3d4bc08834b406e97c53cf240f16a19004c49c Mon Sep 17 00:00:00 2001 From: Matt Joiner Date: Thu, 29 Dec 2022 10:22:03 +1100 Subject: [PATCH 24/32] Add Context parameter to SendResponse --- tracker/udp/server/server.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tracker/udp/server/server.go b/tracker/udp/server/server.go index 9df2fc67..cd9d9f0f 100644 --- a/tracker/udp/server/server.go +++ b/tracker/udp/server/server.go @@ -35,7 +35,7 @@ type AnnounceTracker = trackerServer.AnnounceTracker type Server struct { ConnTracker ConnectionTracker - SendResponse func(data []byte, addr net.Addr) (int, error) + SendResponse func(ctx context.Context, data []byte, addr net.Addr) (int, error) Announce *trackerServer.AnnounceHandler } @@ -160,7 +160,7 @@ func (me *Server) handleAnnounce( return err } buf.Write(b) - n, err := me.SendResponse(buf.Bytes(), source) + n, err := me.SendResponse(ctx, buf.Bytes(), source) if err != nil { return err } @@ -183,7 +183,7 @@ func (me *Server) handleConnect(ctx context.Context, source RequestSourceAddr, t TransactionId: tid, }) udp.Write(&buf, udp.ConnectionResponse{connId}) - n, err := me.SendResponse(buf.Bytes(), source) + n, err := me.SendResponse(ctx, buf.Bytes(), source) if err != nil { return err } From 5f127343b0cd3fcf0aa51401852499b16fafefdb Mon Sep 17 00:00:00 2001 From: Matt Joiner Date: Thu, 29 Dec 2022 19:40:20 +1100 Subject: [PATCH 25/32] Fix race when final peers are available early --- tracker/server/server.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tracker/server/server.go b/tracker/server/server.go index ced31616..6e845eee 100644 --- a/tracker/server/server.go +++ b/tracker/server/server.go @@ -87,7 +87,7 @@ func (me augmentationOperation) getCurPeersAndDone() (ret peerSet, done bool) { select { case ret = <-me.curPeers: case <-me.doneAnnouncing: - ret = me.finalPeers + ret = copyPeerSet(me.finalPeers) done = true } return From f61085c7859f36a78391e32d302b0c6cd3fbb0db Mon Sep 17 00:00:00 2001 From: Matt Joiner Date: Thu, 29 Dec 2022 19:41:09 +1100 Subject: [PATCH 26/32] Avoid panic in AnnounceEvent.String --- tracker/udp/announce.go | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/tracker/udp/announce.go b/tracker/udp/announce.go index 59b6c6cf..b5c9f8ff 100644 --- a/tracker/udp/announce.go +++ b/tracker/udp/announce.go @@ -38,7 +38,12 @@ func (me *AnnounceEvent) UnmarshalText(text []byte) error { var announceEventStrings = []string{"", "completed", "started", "stopped"} func (e AnnounceEvent) String() string { - // See BEP 3, "event", and https://github.com/anacrolix/torrent/issues/416#issuecomment-751427001. + // See BEP 3, "event", and + // https://github.com/anacrolix/torrent/issues/416#issuecomment-751427001. Return a safe default + // in case event values are not sanitized. + if e < 0 || int(e) >= len(announceEventStrings) { + return "" + } return announceEventStrings[e] } From 4f8826483ead1e42b27783d3e8d234610b3ab02c Mon Sep 17 00:00:00 2001 From: Matt Joiner Date: Thu, 29 Dec 2022 19:41:36 +1100 Subject: [PATCH 27/32] Resize packet buffer to avoid wasting memory --- tracker/udp/server/server.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/tracker/udp/server/server.go b/tracker/udp/server/server.go index cd9d9f0f..c686a23d 100644 --- a/tracker/udp/server/server.go +++ b/tracker/udp/server/server.go @@ -205,14 +205,15 @@ func randomConnectionId() udp.ConnectionId { func RunSimple(ctx context.Context, s *Server, pc net.PacketConn, family udp.AddrFamily) error { ctx, cancel := context.WithCancel(ctx) defer cancel() + var b [1500]byte for { - var b [1500]byte n, addr, err := pc.ReadFrom(b[:]) if err != nil { return err } + b := append([]byte(nil), b[:n]...) go func() { - err := s.HandleRequest(ctx, family, addr, b[:n]) + err := s.HandleRequest(ctx, family, addr, b) if err != nil { log.Printf("error handling %v byte request from %v: %v", n, addr, err) } From f38629d35489e8192c9971864c89bffd5670001c Mon Sep 17 00:00:00 2001 From: Matt Joiner Date: Thu, 29 Dec 2022 19:42:19 +1100 Subject: [PATCH 28/32] Assume upstream peers are leechers --- tracker/server/server.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/tracker/server/server.go b/tracker/server/server.go index 6e845eee..823816d5 100644 --- a/tracker/server/server.go +++ b/tracker/server/server.go @@ -261,8 +261,11 @@ func (me *AnnounceHandler) augmentPeersFromUpstream(infoHash [20]byte) augmentat InfoHash: infoHash, Event: tracker.Started, Port: uint16(peer.Port), + // Let's assume upstream peers are leechers without knowing better. + Left: -1, } copy(trackReq.PeerId[:], peer.ID) + // TODO: How do we know if these peers are leechers or seeders? err := me.AnnounceTracker.TrackAnnounce(context.TODO(), trackReq, addrPort) if err != nil { log.Levelf(log.Error, "error tracking upstream peer: %v", err) From 6baf8dcb9989cb22b20893b20347be010475f0b4 Mon Sep 17 00:00:00 2001 From: Matt Joiner Date: Tue, 3 Jan 2023 00:14:21 +1100 Subject: [PATCH 29/32] Limit udp tracker server request concurrency --- tracker/udp/server/server.go | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/tracker/udp/server/server.go b/tracker/udp/server/server.go index c686a23d..6f0fb438 100644 --- a/tracker/udp/server/server.go +++ b/tracker/udp/server/server.go @@ -206,13 +206,24 @@ func RunSimple(ctx context.Context, s *Server, pc net.PacketConn, family udp.Add ctx, cancel := context.WithCancel(ctx) defer cancel() var b [1500]byte + // Limit concurrent handled requests. + sem := make(chan struct{}, 1000) for { n, addr, err := pc.ReadFrom(b[:]) if err != nil { return err } + select { + case <-ctx.Done(): + return ctx.Err() + default: + log.Printf("dropping request from %v: concurrency limit reached", addr) + continue + case sem <- struct{}{}: + } b := append([]byte(nil), b[:n]...) go func() { + defer func() { <-sem }() err := s.HandleRequest(ctx, family, addr, b) if err != nil { log.Printf("error handling %v byte request from %v: %v", n, addr, err) From 2dfb57f3f7290e1230c0d64fea27d51c1f407fa4 Mon Sep 17 00:00:00 2001 From: Matt Joiner Date: Thu, 5 Jan 2023 00:04:42 +1100 Subject: [PATCH 30/32] Handle left param for http tracker server announces --- tracker/http/server/server.go | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/tracker/http/server/server.go b/tracker/http/server/server.go index bd2783e3..9a21b8c1 100644 --- a/tracker/http/server/server.go +++ b/tracker/http/server/server.go @@ -73,8 +73,12 @@ func (me Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { http.Error(w, "error determining your IP", http.StatusBadGateway) return } - portU64, err := strconv.ParseUint(vs.Get("port"), 0, 16) + portU64, _ := strconv.ParseUint(vs.Get("port"), 0, 16) addrPort := netip.AddrPortFrom(addr, uint16(portU64)) + left, err := strconv.ParseInt(vs.Get("left"), 0, 64) + if err != nil { + left = -1 + } res := me.Announce.Serve( r.Context(), tracker.AnnounceRequest{ @@ -83,6 +87,7 @@ func (me Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { Event: event, Port: addrPort.Port(), NumWant: -1, + Left: left, }, addrPort, trackerServer.GetPeersOpts{ From 1eb923c9479ceba2f57f25a5d64b9167804cd9ad Mon Sep 17 00:00:00 2001 From: Matt Joiner Date: Thu, 5 Jan 2023 00:05:34 +1100 Subject: [PATCH 31/32] Forward leechers and seeders announce handler results --- tracker/http/server/server.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tracker/http/server/server.go b/tracker/http/server/server.go index 9a21b8c1..30be15c6 100644 --- a/tracker/http/server/server.go +++ b/tracker/http/server/server.go @@ -101,6 +101,8 @@ func (me Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { return } var resp httpTracker.HttpResponse + resp.Incomplete = res.Leechers.Value + resp.Complete = res.Seeders.Value resp.Interval = res.Interval.UnwrapOr(5 * 60) resp.Peers.Compact = true for _, peer := range res.Peers { From 8fe9fb78aafb280d08186f1d5d1ecd5d837aad5e Mon Sep 17 00:00:00 2001 From: Matt Joiner Date: Sun, 8 Jan 2023 17:22:02 +1100 Subject: [PATCH 32/32] Add span for udp packet handling --- tracker/udp/server/server.go | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/tracker/udp/server/server.go b/tracker/udp/server/server.go index 6f0fb438..5666e805 100644 --- a/tracker/udp/server/server.go +++ b/tracker/udp/server/server.go @@ -210,19 +210,27 @@ func RunSimple(ctx context.Context, s *Server, pc net.PacketConn, family udp.Add sem := make(chan struct{}, 1000) for { n, addr, err := pc.ReadFrom(b[:]) + ctx, span := tracer.Start(ctx, "handle udp packet") if err != nil { + span.SetStatus(codes.Error, err.Error()) + span.End() return err } select { case <-ctx.Done(): + span.SetStatus(codes.Error, err.Error()) + span.End() return ctx.Err() default: - log.Printf("dropping request from %v: concurrency limit reached", addr) + span.SetStatus(codes.Error, "concurrency limit reached") + span.End() + log.Levelf(log.Debug, "dropping request from %v: concurrency limit reached", addr) continue case sem <- struct{}{}: } b := append([]byte(nil), b[:n]...) go func() { + defer span.End() defer func() { <-sem }() err := s.HandleRequest(ctx, family, addr, b) if err != nil {