Support upstream trackers

This commit is contained in:
Matt Joiner 2022-12-07 01:54:38 +11:00
parent 425a91c921
commit c23269d1cc
No known key found for this signature in database
GPG Key ID: 6B990B8185E7F782
4 changed files with 221 additions and 27 deletions

View File

@ -3,16 +3,24 @@ package httpTracker
import ( import (
"fmt" "fmt"
"net" "net"
"net/netip"
"github.com/anacrolix/dht/v2/krpc" "github.com/anacrolix/dht/v2/krpc"
) )
// TODO: Use netip.Addr and Option[[20]byte].
type Peer struct { type Peer struct {
IP net.IP `bencode:"ip"` IP net.IP `bencode:"ip"`
Port int `bencode:"port"` Port int `bencode:"port"`
ID []byte `bencode:"peer id"` ID []byte `bencode:"peer id"`
} }
func (p Peer) ToNetipAddrPort() (addrPort netip.AddrPort, ok bool) {
addr, ok := netip.AddrFromSlice(p.IP)
addrPort = netip.AddrPortFrom(addr, uint16(p.Port))
return
}
func (p Peer) String() string { func (p Peer) String() string {
loc := net.JoinHostPort(p.IP.String(), fmt.Sprintf("%d", p.Port)) loc := net.JoinHostPort(p.IP.String(), fmt.Sprintf("%d", p.Port))
if len(p.ID) != 0 { if len(p.ID) != 0 {

View File

@ -14,11 +14,10 @@ import (
"github.com/anacrolix/torrent/bencode" "github.com/anacrolix/torrent/bencode"
"github.com/anacrolix/torrent/tracker" "github.com/anacrolix/torrent/tracker"
httpTracker "github.com/anacrolix/torrent/tracker/http" httpTracker "github.com/anacrolix/torrent/tracker/http"
udpTrackerServer "github.com/anacrolix/torrent/tracker/udp/server"
) )
type Handler struct { type Handler struct {
AnnounceTracker udpTrackerServer.AnnounceTracker Announce tracker.AnnounceHandler
// Called to derive an announcer's IP if non-nil. If not specified, the Request.RemoteAddr is // Called to derive an announcer's IP if non-nil. If not specified, the Request.RemoteAddr is
// used. Necessary for instances running behind reverse proxies for example. // used. Necessary for instances running behind reverse proxies for example.
RequestHost func(r *http.Request) (netip.Addr, error) RequestHost func(r *http.Request) (netip.Addr, error)
@ -74,21 +73,15 @@ func (me Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
} }
portU64, err := strconv.ParseUint(vs.Get("port"), 0, 16) portU64, err := strconv.ParseUint(vs.Get("port"), 0, 16)
addrPort := netip.AddrPortFrom(addr, uint16(portU64)) addrPort := netip.AddrPortFrom(addr, uint16(portU64))
err = me.AnnounceTracker.TrackAnnounce(r.Context(), tracker.AnnounceRequest{ peers, err := me.Announce.Serve(r.Context(), tracker.AnnounceRequest{
InfoHash: infoHash, InfoHash: infoHash,
PeerId: peerId, PeerId: peerId,
Event: event, Event: event,
Port: addrPort.Port(), Port: addrPort.Port(),
}, addrPort) }, addrPort)
if err != nil { if err != nil {
log.Printf("error tracking announce: %v", err) log.Printf("error serving announce: %v", err)
http.Error(w, "error tracking announce", http.StatusInternalServerError) http.Error(w, "error handling announce", http.StatusInternalServerError)
return
}
peers, err := me.AnnounceTracker.GetPeers(r.Context(), infoHash, tracker.GetPeersOpts{})
if err != nil {
log.Printf("error getting peers: %v", err)
http.Error(w, "error getting peers", http.StatusInternalServerError)
return return
} }
var resp httpTracker.HttpResponse var resp httpTracker.HttpResponse

View File

@ -3,7 +3,11 @@ package tracker
import ( import (
"context" "context"
"net/netip" "net/netip"
"sync"
"time"
"github.com/anacrolix/generics"
"github.com/anacrolix/log"
"github.com/anacrolix/torrent/tracker/udp" "github.com/anacrolix/torrent/tracker/udp"
) )
@ -25,11 +29,201 @@ type AnnounceTracker interface {
GetPeers(ctx context.Context, infoHash InfoHash, opts GetPeersOpts) ([]PeerInfo, error) GetPeers(ctx context.Context, infoHash InfoHash, opts GetPeersOpts) ([]PeerInfo, error)
} }
// type AnnounceHandler struct {
//type Server struct { AnnounceTracker AnnounceTracker
// AnnounceTracker AnnounceTracker UpstreamTrackers []Client
//} UpstreamTrackerUrls []string
// UpstreamAnnouncePeerId [20]byte
//func (me Server) HandleAnnounce(req udp.AnnounceRequest, sourceAddr AnnounceAddr) error {
// mu sync.Mutex
//} // Operations are only removed when all the upstream peers have been tracked.
ongoingUpstreamAugmentations map[InfoHash]augmentationOperation
}
type peerSet = map[PeerInfo]struct{}
type augmentationOperation struct {
// Closed when no more announce responses are pending. finalPeers will contain all the peers
// seen.
doneAnnouncing chan struct{}
// This receives the latest peerSet until doneAnnouncing is closed.
curPeers chan peerSet
// This contains the final peerSet after doneAnnouncing is closed.
finalPeers peerSet
}
func (me augmentationOperation) getCurPeers() (ret peerSet) {
ret, _ = me.getCurPeersAndDone()
return
}
func (me augmentationOperation) getCurPeersAndDone() (ret peerSet, done bool) {
select {
case ret = <-me.curPeers:
case <-me.doneAnnouncing:
ret = me.finalPeers
done = true
}
return
}
// Adds peers from new that aren't in orig. Modifies both arguments.
func addMissing(orig []PeerInfo, new peerSet) {
for _, peer := range orig {
delete(new, peer)
}
for peer := range new {
orig = append(orig, peer)
}
}
func (me *AnnounceHandler) Serve(
ctx context.Context, req AnnounceRequest, addr AnnounceAddr,
) (peers []PeerInfo, err error) {
err = me.AnnounceTracker.TrackAnnounce(ctx, req, addr)
if err != nil {
return
}
infoHash := req.InfoHash
var op generics.Option[augmentationOperation]
// Grab a handle to any augmentations that are already running.
me.mu.Lock()
op.Value, op.Ok = me.ongoingUpstreamAugmentations[infoHash]
me.mu.Unlock()
peers, err = me.AnnounceTracker.GetPeers(ctx, infoHash, GetPeersOpts{})
if err != nil {
return
}
// Take whatever peers it has ready. If it's finished, it doesn't matter if we do this inside
// the mutex or not.
if op.Ok {
curPeers, done := op.Value.getCurPeersAndDone()
addMissing(peers, curPeers)
if done {
// It doesn't get any better with this operation. Forget it.
op.Ok = false
}
}
me.mu.Lock()
// If we didn't have an operation, and don't have enough peers, start one.
if !op.Ok && len(peers) <= 1 {
op.Value, op.Ok = me.ongoingUpstreamAugmentations[infoHash]
if !op.Ok {
op.Set(me.augmentPeersFromUpstream(req))
generics.MakeMapIfNilAndSet(&me.ongoingUpstreamAugmentations, infoHash, op.Value)
}
}
me.mu.Unlock()
// Wait a while for the current operation.
if op.Ok {
// Force the augmentation to return with whatever it has if it hasn't completed in a
// reasonable time.
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
select {
case <-ctx.Done():
case <-op.Value.doneAnnouncing:
}
cancel()
addMissing(peers, op.Value.getCurPeers())
}
return
}
func (me *AnnounceHandler) augmentPeersFromUpstream(req AnnounceRequest) augmentationOperation {
announceCtx, cancel := context.WithTimeout(context.Background(), time.Minute)
subReq := AnnounceRequest{
InfoHash: req.InfoHash,
PeerId: me.UpstreamAnnouncePeerId,
Event: None,
Key: 0,
NumWant: -1,
Port: 0,
}
peersChan := make(chan []Peer)
var pendingUpstreams sync.WaitGroup
for i := range me.UpstreamTrackers {
client := me.UpstreamTrackers[i]
url := me.UpstreamTrackerUrls[i]
pendingUpstreams.Add(1)
go func() {
resp, err := client.Announce(announceCtx, subReq, AnnounceOpt{
UserAgent: "aragorn",
})
peersChan <- resp.Peers
if err != nil {
log.Levelf(log.Warning, "error announcing to upstream %q: %v", url, err)
}
}()
}
peersToTrack := make(map[string]Peer)
go func() {
pendingUpstreams.Wait()
cancel()
close(peersChan)
log.Levelf(log.Debug, "adding %v distinct peers from upstream trackers")
for _, peer := range peersToTrack {
addrPort, ok := peer.ToNetipAddrPort()
if !ok {
continue
}
trackReq := AnnounceRequest{
InfoHash: req.InfoHash,
Event: Started,
Port: uint16(peer.Port),
}
copy(trackReq.PeerId[:], peer.ID)
err := me.AnnounceTracker.TrackAnnounce(context.TODO(), trackReq, addrPort)
if err != nil {
log.Levelf(log.Error, "error tracking upstream peer: %v", err)
}
}
me.mu.Lock()
delete(me.ongoingUpstreamAugmentations, req.InfoHash)
me.mu.Unlock()
}()
curPeersChan := make(chan map[PeerInfo]struct{})
doneChan := make(chan struct{})
retPeers := make(map[PeerInfo]struct{})
go func() {
for {
select {
case peers, ok := <-peersChan:
if !ok {
return
}
voldemort(peers, peersToTrack, retPeers)
pendingUpstreams.Done()
case curPeersChan <- copyPeerSet(retPeers):
}
}
}()
// Take return references.
return augmentationOperation{
curPeers: curPeersChan,
finalPeers: retPeers,
doneAnnouncing: doneChan,
}
}
func copyPeerSet(orig peerSet) (ret peerSet) {
ret = make(peerSet, len(orig))
for k, v := range orig {
ret[k] = v
}
return
}
// Adds peers to trailing containers.
func voldemort(peers []Peer, toTrack map[string]Peer, sets ...map[PeerInfo]struct{}) {
for _, protoPeer := range peers {
toTrack[protoPeer.String()] = protoPeer
addr, ok := netip.AddrFromSlice(protoPeer.IP)
if !ok {
continue
}
handlerPeer := PeerInfo{netip.AddrPortFrom(addr, uint16(protoPeer.Port))}
for _, set := range sets {
set[handlerPeer] = struct{}{}
}
}
}

View File

@ -31,7 +31,7 @@ type AnnounceTracker = tracker.AnnounceTracker
type Server struct { type Server struct {
ConnTracker ConnectionTracker ConnTracker ConnectionTracker
SendResponse func(data []byte, addr net.Addr) (int, error) SendResponse func(data []byte, addr net.Addr) (int, error)
AnnounceTracker AnnounceTracker Announce tracker.AnnounceHandler
} }
type RequestSourceAddr = net.Addr type RequestSourceAddr = net.Addr
@ -72,6 +72,9 @@ func (me *Server) handleAnnounce(
tid udp.TransactionId, tid udp.TransactionId,
r *bytes.Reader, r *bytes.Reader,
) error { ) error {
// Should we set a timeout of 10s or something for the entire response, so that we give up if a
// retry is imminent?
ok, err := me.ConnTracker.Check(ctx, source.String(), connId) ok, err := me.ConnTracker.Check(ctx, source.String(), connId)
if err != nil { if err != nil {
err = fmt.Errorf("checking conn id: %w", err) err = fmt.Errorf("checking conn id: %w", err)
@ -91,11 +94,7 @@ func (me *Server) handleAnnounce(
err = fmt.Errorf("converting source net.Addr to AnnounceAddr: %w", err) err = fmt.Errorf("converting source net.Addr to AnnounceAddr: %w", err)
return err return err
} }
err = me.AnnounceTracker.TrackAnnounce(ctx, req, announceAddr) peers, err := me.Announce.Serve(ctx, req, announceAddr)
if err != nil {
return err
}
peers, err := me.AnnounceTracker.GetPeers(ctx, req.InfoHash, tracker.GetPeersOpts{})
if err != nil { if err != nil {
return err return err
} }