diff --git a/tracker/http/peer.go b/tracker/http/peer.go index 363ba6d3..b0deee0b 100644 --- a/tracker/http/peer.go +++ b/tracker/http/peer.go @@ -3,16 +3,24 @@ package httpTracker import ( "fmt" "net" + "net/netip" "github.com/anacrolix/dht/v2/krpc" ) +// TODO: Use netip.Addr and Option[[20]byte]. type Peer struct { IP net.IP `bencode:"ip"` Port int `bencode:"port"` ID []byte `bencode:"peer id"` } +func (p Peer) ToNetipAddrPort() (addrPort netip.AddrPort, ok bool) { + addr, ok := netip.AddrFromSlice(p.IP) + addrPort = netip.AddrPortFrom(addr, uint16(p.Port)) + return +} + func (p Peer) String() string { loc := net.JoinHostPort(p.IP.String(), fmt.Sprintf("%d", p.Port)) if len(p.ID) != 0 { diff --git a/tracker/http/server/server.go b/tracker/http/server/server.go index 0840fe96..10713a1b 100644 --- a/tracker/http/server/server.go +++ b/tracker/http/server/server.go @@ -14,11 +14,10 @@ import ( "github.com/anacrolix/torrent/bencode" "github.com/anacrolix/torrent/tracker" httpTracker "github.com/anacrolix/torrent/tracker/http" - udpTrackerServer "github.com/anacrolix/torrent/tracker/udp/server" ) type Handler struct { - AnnounceTracker udpTrackerServer.AnnounceTracker + Announce tracker.AnnounceHandler // Called to derive an announcer's IP if non-nil. If not specified, the Request.RemoteAddr is // used. Necessary for instances running behind reverse proxies for example. RequestHost func(r *http.Request) (netip.Addr, error) @@ -74,21 +73,15 @@ func (me Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { } portU64, err := strconv.ParseUint(vs.Get("port"), 0, 16) addrPort := netip.AddrPortFrom(addr, uint16(portU64)) - err = me.AnnounceTracker.TrackAnnounce(r.Context(), tracker.AnnounceRequest{ + peers, err := me.Announce.Serve(r.Context(), tracker.AnnounceRequest{ InfoHash: infoHash, PeerId: peerId, Event: event, Port: addrPort.Port(), }, addrPort) if err != nil { - log.Printf("error tracking announce: %v", err) - http.Error(w, "error tracking announce", http.StatusInternalServerError) - return - } - peers, err := me.AnnounceTracker.GetPeers(r.Context(), infoHash, tracker.GetPeersOpts{}) - if err != nil { - log.Printf("error getting peers: %v", err) - http.Error(w, "error getting peers", http.StatusInternalServerError) + log.Printf("error serving announce: %v", err) + http.Error(w, "error handling announce", http.StatusInternalServerError) return } var resp httpTracker.HttpResponse diff --git a/tracker/server.go b/tracker/server.go index 077d3244..2b483436 100644 --- a/tracker/server.go +++ b/tracker/server.go @@ -3,7 +3,11 @@ package tracker import ( "context" "net/netip" + "sync" + "time" + "github.com/anacrolix/generics" + "github.com/anacrolix/log" "github.com/anacrolix/torrent/tracker/udp" ) @@ -25,11 +29,201 @@ type AnnounceTracker interface { GetPeers(ctx context.Context, infoHash InfoHash, opts GetPeersOpts) ([]PeerInfo, error) } -// -//type Server struct { -// AnnounceTracker AnnounceTracker -//} -// -//func (me Server) HandleAnnounce(req udp.AnnounceRequest, sourceAddr AnnounceAddr) error { -// -//} +type AnnounceHandler struct { + AnnounceTracker AnnounceTracker + UpstreamTrackers []Client + UpstreamTrackerUrls []string + UpstreamAnnouncePeerId [20]byte + + mu sync.Mutex + // Operations are only removed when all the upstream peers have been tracked. + ongoingUpstreamAugmentations map[InfoHash]augmentationOperation +} + +type peerSet = map[PeerInfo]struct{} + +type augmentationOperation struct { + // Closed when no more announce responses are pending. finalPeers will contain all the peers + // seen. + doneAnnouncing chan struct{} + // This receives the latest peerSet until doneAnnouncing is closed. + curPeers chan peerSet + // This contains the final peerSet after doneAnnouncing is closed. + finalPeers peerSet +} + +func (me augmentationOperation) getCurPeers() (ret peerSet) { + ret, _ = me.getCurPeersAndDone() + return +} + +func (me augmentationOperation) getCurPeersAndDone() (ret peerSet, done bool) { + select { + case ret = <-me.curPeers: + case <-me.doneAnnouncing: + ret = me.finalPeers + done = true + } + return +} + +// Adds peers from new that aren't in orig. Modifies both arguments. +func addMissing(orig []PeerInfo, new peerSet) { + for _, peer := range orig { + delete(new, peer) + } + for peer := range new { + orig = append(orig, peer) + } +} + +func (me *AnnounceHandler) Serve( + ctx context.Context, req AnnounceRequest, addr AnnounceAddr, +) (peers []PeerInfo, err error) { + err = me.AnnounceTracker.TrackAnnounce(ctx, req, addr) + if err != nil { + return + } + infoHash := req.InfoHash + var op generics.Option[augmentationOperation] + // Grab a handle to any augmentations that are already running. + me.mu.Lock() + op.Value, op.Ok = me.ongoingUpstreamAugmentations[infoHash] + me.mu.Unlock() + peers, err = me.AnnounceTracker.GetPeers(ctx, infoHash, GetPeersOpts{}) + if err != nil { + return + } + // Take whatever peers it has ready. If it's finished, it doesn't matter if we do this inside + // the mutex or not. + if op.Ok { + curPeers, done := op.Value.getCurPeersAndDone() + addMissing(peers, curPeers) + if done { + // It doesn't get any better with this operation. Forget it. + op.Ok = false + } + } + me.mu.Lock() + // If we didn't have an operation, and don't have enough peers, start one. + if !op.Ok && len(peers) <= 1 { + op.Value, op.Ok = me.ongoingUpstreamAugmentations[infoHash] + if !op.Ok { + op.Set(me.augmentPeersFromUpstream(req)) + generics.MakeMapIfNilAndSet(&me.ongoingUpstreamAugmentations, infoHash, op.Value) + } + } + me.mu.Unlock() + // Wait a while for the current operation. + if op.Ok { + // Force the augmentation to return with whatever it has if it hasn't completed in a + // reasonable time. + ctx, cancel := context.WithTimeout(ctx, 10*time.Second) + select { + case <-ctx.Done(): + case <-op.Value.doneAnnouncing: + } + cancel() + addMissing(peers, op.Value.getCurPeers()) + } + return +} + +func (me *AnnounceHandler) augmentPeersFromUpstream(req AnnounceRequest) augmentationOperation { + announceCtx, cancel := context.WithTimeout(context.Background(), time.Minute) + subReq := AnnounceRequest{ + InfoHash: req.InfoHash, + PeerId: me.UpstreamAnnouncePeerId, + Event: None, + Key: 0, + NumWant: -1, + Port: 0, + } + peersChan := make(chan []Peer) + var pendingUpstreams sync.WaitGroup + for i := range me.UpstreamTrackers { + client := me.UpstreamTrackers[i] + url := me.UpstreamTrackerUrls[i] + pendingUpstreams.Add(1) + go func() { + resp, err := client.Announce(announceCtx, subReq, AnnounceOpt{ + UserAgent: "aragorn", + }) + peersChan <- resp.Peers + if err != nil { + log.Levelf(log.Warning, "error announcing to upstream %q: %v", url, err) + } + }() + } + peersToTrack := make(map[string]Peer) + go func() { + pendingUpstreams.Wait() + cancel() + close(peersChan) + log.Levelf(log.Debug, "adding %v distinct peers from upstream trackers") + for _, peer := range peersToTrack { + addrPort, ok := peer.ToNetipAddrPort() + if !ok { + continue + } + trackReq := AnnounceRequest{ + InfoHash: req.InfoHash, + Event: Started, + Port: uint16(peer.Port), + } + copy(trackReq.PeerId[:], peer.ID) + err := me.AnnounceTracker.TrackAnnounce(context.TODO(), trackReq, addrPort) + if err != nil { + log.Levelf(log.Error, "error tracking upstream peer: %v", err) + } + } + me.mu.Lock() + delete(me.ongoingUpstreamAugmentations, req.InfoHash) + me.mu.Unlock() + }() + curPeersChan := make(chan map[PeerInfo]struct{}) + doneChan := make(chan struct{}) + retPeers := make(map[PeerInfo]struct{}) + go func() { + for { + select { + case peers, ok := <-peersChan: + if !ok { + return + } + voldemort(peers, peersToTrack, retPeers) + pendingUpstreams.Done() + case curPeersChan <- copyPeerSet(retPeers): + } + } + }() + // Take return references. + return augmentationOperation{ + curPeers: curPeersChan, + finalPeers: retPeers, + doneAnnouncing: doneChan, + } +} + +func copyPeerSet(orig peerSet) (ret peerSet) { + ret = make(peerSet, len(orig)) + for k, v := range orig { + ret[k] = v + } + return +} + +// Adds peers to trailing containers. +func voldemort(peers []Peer, toTrack map[string]Peer, sets ...map[PeerInfo]struct{}) { + for _, protoPeer := range peers { + toTrack[protoPeer.String()] = protoPeer + addr, ok := netip.AddrFromSlice(protoPeer.IP) + if !ok { + continue + } + handlerPeer := PeerInfo{netip.AddrPortFrom(addr, uint16(protoPeer.Port))} + for _, set := range sets { + set[handlerPeer] = struct{}{} + } + } +} diff --git a/tracker/udp/server/server.go b/tracker/udp/server/server.go index 68abb106..95e7a5e7 100644 --- a/tracker/udp/server/server.go +++ b/tracker/udp/server/server.go @@ -29,9 +29,9 @@ type InfoHash = [20]byte type AnnounceTracker = tracker.AnnounceTracker type Server struct { - ConnTracker ConnectionTracker - SendResponse func(data []byte, addr net.Addr) (int, error) - AnnounceTracker AnnounceTracker + ConnTracker ConnectionTracker + SendResponse func(data []byte, addr net.Addr) (int, error) + Announce tracker.AnnounceHandler } type RequestSourceAddr = net.Addr @@ -72,6 +72,9 @@ func (me *Server) handleAnnounce( tid udp.TransactionId, r *bytes.Reader, ) error { + // Should we set a timeout of 10s or something for the entire response, so that we give up if a + // retry is imminent? + ok, err := me.ConnTracker.Check(ctx, source.String(), connId) if err != nil { err = fmt.Errorf("checking conn id: %w", err) @@ -91,11 +94,7 @@ func (me *Server) handleAnnounce( err = fmt.Errorf("converting source net.Addr to AnnounceAddr: %w", err) return err } - err = me.AnnounceTracker.TrackAnnounce(ctx, req, announceAddr) - if err != nil { - return err - } - peers, err := me.AnnounceTracker.GetPeers(ctx, req.InfoHash, tracker.GetPeersOpts{}) + peers, err := me.Announce.Serve(ctx, req, announceAddr) if err != nil { return err }