diff --git a/Peers.go b/Peers.go index 72f7e8e1..5c9f2043 100644 --- a/Peers.go +++ b/Peers.go @@ -1,6 +1,9 @@ package torrent -import "github.com/anacrolix/dht/krpc" +import ( + "github.com/anacrolix/dht/krpc" + "github.com/anacrolix/torrent/tracker" +) type Peers []Peer @@ -15,3 +18,16 @@ func (me *Peers) FromPex(nas []krpc.NodeAddr, fs []pexPeerFlags) { *me = append(*me, p) } } + +func (ret Peers) FromTracker(ps []tracker.Peer) Peers { + for _, p := range ps { + _p := Peer{ + IP: p.IP, + Port: p.Port, + Source: peerSourceTracker, + } + copy(_p.Id[:], p.ID) + ret = append(ret, _p) + } + return ret +} diff --git a/client.go b/client.go index a05a754d..fc774ba1 100644 --- a/client.go +++ b/client.go @@ -5,12 +5,12 @@ import ( "bytes" "context" "crypto/rand" + "encoding/binary" "errors" "expvar" "fmt" "io" "net" - "net/url" "strconv" "strings" "time" @@ -122,6 +122,7 @@ func (cl *Client) WriteStatus(_w io.Writer) { fmt.Fprintln(w, "Not listening!") } fmt.Fprintf(w, "Peer ID: %+q\n", cl.PeerID()) + fmt.Fprintf(w, "Announce key: %x\n", cl.announceKey()) fmt.Fprintf(w, "Banned IPs: %d\n", len(cl.badPeerIPsLocked())) if dht := cl.DHT(); dht != nil { dhtStats := dht.Stats() @@ -237,6 +238,10 @@ func (cl *Client) initLogger() { cl.logger = log.Default.Clone().AddValue(cl).AddFilter(log.NewFilter(cl.debugLogFilter)) } +func (cl *Client) announceKey() int32 { + return int32(binary.BigEndian.Uint32(cl.peerID[16:20])) +} + // Creates a new client. func NewClient(cfg *Config) (cl *Client, err error) { if cfg == nil { @@ -402,6 +407,11 @@ func (cl *Client) ipBlockRange(ip net.IP) (r iplist.Range, blocked bool) { return cl.ipBlockList.Lookup(ip) } +func (cl *Client) ipIsBlocked(ip net.IP) bool { + _, blocked := cl.ipBlockRange(ip) + return blocked +} + func (cl *Client) waitAccept() { for { for _, t := range cl.torrents { @@ -1114,33 +1124,6 @@ func (cl *Client) dropTorrent(infoHash metainfo.Hash) (err error) { return } -func (cl *Client) prepareTrackerAnnounceUnlocked(announceURL string) (blocked bool, urlToUse string, host string, err error) { - _url, err := url.Parse(announceURL) - if err != nil { - return - } - hmp := missinggo.SplitHostMaybePort(_url.Host) - if hmp.Err != nil { - err = hmp.Err - return - } - addr, err := net.ResolveIPAddr("ip", hmp.Host) - if err != nil { - return - } - if addr.IP == nil { - panic(hmp.Host) - } - cl.mu.RLock() - _, blocked = cl.ipBlockRange(addr.IP) - cl.mu.RUnlock() - host = _url.Host - hmp.Host = addr.String() - _url.Host = hmp.String() - urlToUse = _url.String() - return -} - func (cl *Client) allTorrentsCompleted() bool { for _, t := range cl.torrents { if !t.haveInfo() { diff --git a/client_test.go b/client_test.go index d0d88cdf..c19129f9 100644 --- a/client_test.go +++ b/client_test.go @@ -902,15 +902,6 @@ func TestPieceCompletedInStorageButNotClient(t *testing.T) { }) } -func TestPrepareTrackerAnnounce(t *testing.T) { - cl := &Client{} - blocked, urlToUse, host, err := cl.prepareTrackerAnnounceUnlocked("http://localhost:1234/announce?herp") - require.NoError(t, err) - assert.False(t, blocked) - assert.EqualValues(t, "localhost:1234", host) - assert.EqualValues(t, "http://127.0.0.1:1234/announce?herp", urlToUse) -} - // Check that when the listen port is 0, all the protocols listened on have // the same port, and it isn't zero. func TestClientDynamicListenPortAllProtocols(t *testing.T) { diff --git a/torrent.go b/torrent.go index e0f5ee3c..0b14f3f2 100644 --- a/torrent.go +++ b/torrent.go @@ -9,6 +9,7 @@ import ( "math" "math/rand" "net" + "net/url" "os" "strconv" "sync" @@ -585,7 +586,7 @@ func (t *Torrent) writeStatus(w io.Writer) { tw := tabwriter.NewWriter(w, 0, 0, 2, ' ', 0) fmt.Fprintf(tw, " URL\tNext announce\tLast announce\n") for _, ta := range slices.Sort(slices.FromMapElems(t.trackerAnnouncers), func(l, r *trackerScraper) bool { - return l.url < r.url + return l.u.String() < r.u.String() }).([]*trackerScraper) { fmt.Fprintf(tw, " %s\n", ta.statusLine()) } @@ -1303,21 +1304,35 @@ func (t *Torrent) seeding() bool { return true } -func (t *Torrent) startScrapingTracker(url string) { - if url == "" { +func (t *Torrent) startScrapingTracker(_url string) { + if _url == "" { return } - if _, ok := t.trackerAnnouncers[url]; ok { + u, _ := url.Parse(_url) + if u.Scheme == "udp" { + u.Scheme = "udp4" + t.startScrapingTracker(u.String()) + u.Scheme = "udp6" + t.startScrapingTracker(u.String()) + return + } + if u.Scheme == "udp4" && (t.cl.config.DisableIPv4Peers || t.cl.config.DisableIPv4) { + return + } + if u.Scheme == "udp6" && t.cl.config.DisableIPv6 { + return + } + if _, ok := t.trackerAnnouncers[_url]; ok { return } newAnnouncer := &trackerScraper{ - url: url, - t: t, + u: *u, + t: t, } if t.trackerAnnouncers == nil { t.trackerAnnouncers = make(map[string]*trackerScraper) } - t.trackerAnnouncers[url] = newAnnouncer + t.trackerAnnouncers[_url] = newAnnouncer go newAnnouncer.Run() } @@ -1345,6 +1360,7 @@ func (t *Torrent) announceRequest() tracker.AnnounceRequest { PeerId: t.cl.peerID, InfoHash: t.infoHash, Left: t.bytesLeftAnnounce(), + Key: t.cl.announceKey(), } } diff --git a/tracker/http.go b/tracker/http.go index 36cfa303..5cec503e 100644 --- a/tracker/http.go +++ b/tracker/http.go @@ -64,7 +64,7 @@ func (me *Peers) UnmarshalBencode(b []byte) (err error) { } } -func setAnnounceParams(_url *url.URL, ar *AnnounceRequest) { +func setAnnounceParams(_url *url.URL, ar *AnnounceRequest, opts Announce) { q := _url.Query() q.Set("info_hash", string(ar.InfoHash[:])) @@ -81,17 +81,22 @@ func setAnnounceParams(_url *url.URL, ar *AnnounceRequest) { // According to https://wiki.vuze.com/w/Message_Stream_Encryption. TODO: // Take EncryptionPolicy or something like it as a parameter. q.Set("supportcrypto", "1") - + if opts.ClientIp4.IP != nil { + q.Set("ipv4", opts.ClientIp4.String()) + } + if opts.ClientIp6.IP != nil { + q.Set("ipv6", opts.ClientIp6.String()) + } _url.RawQuery = q.Encode() } -func announceHTTP(cl *http.Client, userAgent string, ar *AnnounceRequest, _url *url.URL, host string) (ret AnnounceResponse, err error) { +func announceHTTP(opt Announce, _url *url.URL) (ret AnnounceResponse, err error) { _url = httptoo.CopyURL(_url) - setAnnounceParams(_url, ar) + setAnnounceParams(_url, &opt.Request, opt) req, err := http.NewRequest("GET", _url.String(), nil) - req.Header.Set("User-Agent", userAgent) - req.Host = host - resp, err := cl.Do(req) + req.Header.Set("User-Agent", opt.UserAgent) + req.Host = opt.HostHeader + resp, err := opt.HttpClient.Do(req) if err != nil { return } diff --git a/tracker/peer.go b/tracker/peer.go index 604d61e2..44d61e82 100644 --- a/tracker/peer.go +++ b/tracker/peer.go @@ -2,6 +2,8 @@ package tracker import ( "net" + + "github.com/anacrolix/dht/krpc" ) type Peer struct { @@ -16,3 +18,9 @@ func (p *Peer) fromDictInterface(d map[string]interface{}) { p.ID = []byte(d["peer id"].(string)) p.Port = int(d["port"].(int64)) } + +func (p Peer) FromNodeAddr(na krpc.NodeAddr) Peer { + p.IP = na.IP + p.Port = na.Port + return p +} diff --git a/tracker/server.go b/tracker/server.go index fbb9840d..be982230 100644 --- a/tracker/server.go +++ b/tracker/server.go @@ -2,18 +2,20 @@ package tracker import ( "bytes" + "encoding" "encoding/binary" "fmt" "math/rand" "net" "github.com/anacrolix/dht/krpc" + "github.com/anacrolix/missinggo" ) type torrent struct { Leechers int32 Seeders int32 - Peers krpc.CompactIPv4NodeAddrs + Peers []krpc.NodeAddr } type server struct { @@ -91,7 +93,14 @@ func (s *server) serveOne() (err error) { return } t := s.t[ar.InfoHash] - b, err = t.Peers.MarshalBinary() + bm := func() encoding.BinaryMarshaler { + ip := missinggo.AddrIP(addr) + if ip.To4() != nil { + return krpc.CompactIPv4NodeAddrs(t.Peers) + } + return krpc.CompactIPv6NodeAddrs(t.Peers) + }() + b, err = bm.MarshalBinary() if err != nil { panic(err) } diff --git a/tracker/tracker.go b/tracker/tracker.go index f2cea82e..fd24b0be 100644 --- a/tracker/tracker.go +++ b/tracker/tracker.go @@ -4,6 +4,8 @@ import ( "errors" "net/http" "net/url" + + "github.com/anacrolix/dht/krpc" ) // Marshalled as binary by the UDP client, so be careful making changes. @@ -47,22 +49,29 @@ var ( ErrBadScheme = errors.New("unknown scheme") ) -// TODO: Just split udp/http announcing completely, to support various different options they have. - -func Announce(cl *http.Client, userAgent string, urlStr string, req *AnnounceRequest) (res AnnounceResponse, err error) { - return AnnounceHost(cl, userAgent, urlStr, req, "") +type Announce struct { + TrackerUrl string + Request AnnounceRequest + HostHeader string + UserAgent string + HttpClient *http.Client + UdpNetwork string + ClientIp4 krpc.NodeAddr + ClientIp6 krpc.NodeAddr } -func AnnounceHost(cl *http.Client, userAgent string, urlStr string, req *AnnounceRequest, host string) (res AnnounceResponse, err error) { - _url, err := url.Parse(urlStr) +// In an FP language with currying, what order what you put these params? + +func (me Announce) Do() (res AnnounceResponse, err error) { + _url, err := url.Parse(me.TrackerUrl) if err != nil { return } switch _url.Scheme { case "http", "https": - return announceHTTP(cl, userAgent, req, _url, host) - case "udp": - return announceUDP(req, _url) + return announceHTTP(me, _url) + case "udp", "udp4", "udp6": + return announceUDP(me, _url) default: err = ErrBadScheme return diff --git a/tracker/tracker_test.go b/tracker/tracker_test.go index 24157ec6..fe565564 100644 --- a/tracker/tracker_test.go +++ b/tracker/tracker_test.go @@ -6,6 +6,8 @@ import ( "net/http" "testing" "time" + + "github.com/stretchr/testify/require" ) var defaultClient = &http.Client{ @@ -21,8 +23,6 @@ var defaultClient = &http.Client{ func TestUnsupportedTrackerScheme(t *testing.T) { t.Parallel() - _, err := Announce(defaultClient, defaultHTTPUserAgent, "lol://tracker.openbittorrent.com:80/announce", nil) - if err != ErrBadScheme { - t.Fatal(err) - } + _, err := Announce{TrackerUrl: "lol://tracker.openbittorrent.com:80/announce"}.Do() + require.Equal(t, ErrBadScheme, err) } diff --git a/tracker/udp.go b/tracker/udp.go index 8c3b632d..babbbefb 100644 --- a/tracker/udp.go +++ b/tracker/udp.go @@ -2,6 +2,7 @@ package tracker import ( "bytes" + "encoding" "encoding/binary" "errors" "fmt" @@ -80,6 +81,7 @@ type udpAnnounce struct { connectionId int64 socket net.Conn url url.URL + a *Announce } func (c *udpAnnounce) Close() error { @@ -89,6 +91,14 @@ func (c *udpAnnounce) Close() error { return nil } +func (c *udpAnnounce) ipv6() bool { + if c.a.UdpNetwork == "udp6" { + return true + } + rip := missinggo.AddrIP(c.socket.RemoteAddr()) + return rip.To16() != nil && rip.To4() == nil +} + func (c *udpAnnounce) Do(req *AnnounceRequest) (res AnnounceResponse, err error) { err = c.connect() if err != nil { @@ -114,16 +124,22 @@ func (c *udpAnnounce) Do(req *AnnounceRequest) (res AnnounceResponse, err error) res.Interval = h.Interval res.Leechers = h.Leechers res.Seeders = h.Seeders - var cps krpc.CompactIPv4NodeAddrs - err = cps.UnmarshalBinary(b.Bytes()) + nas := func() interface { + encoding.BinaryUnmarshaler + NodeAddrs() []krpc.NodeAddr + } { + if c.ipv6() { + return &krpc.CompactIPv6NodeAddrs{} + } else { + return &krpc.CompactIPv4NodeAddrs{} + } + }() + err = nas.UnmarshalBinary(b.Bytes()) if err != nil { return } - for _, cp := range cps { - res.Peers = append(res.Peers, Peer{ - IP: cp.IP[:], - Port: int(cp.Port), - }) + for _, cp := range nas.NodeAddrs() { + res.Peers = append(res.Peers, Peer{}.FromNodeAddr(cp)) } return } @@ -226,6 +242,13 @@ func (c *udpAnnounce) connected() bool { return !c.connectionIdReceived.IsZero() && time.Now().Before(c.connectionIdReceived.Add(time.Minute)) } +func (c *udpAnnounce) dialNetwork() string { + if c.a.UdpNetwork != "" { + return c.a.UdpNetwork + } + return "udp" +} + func (c *udpAnnounce) connect() (err error) { if c.connected() { return nil @@ -237,7 +260,7 @@ func (c *udpAnnounce) connect() (err error) { hmp.NoPort = false hmp.Port = 80 } - c.socket, err = net.Dial("udp", hmp.String()) + c.socket, err = net.Dial(c.dialNetwork(), hmp.String()) if err != nil { return } @@ -259,10 +282,11 @@ func (c *udpAnnounce) connect() (err error) { // TODO: Split on IPv6, as BEP 15 says response peer decoding depends on // network in use. -func announceUDP(ar *AnnounceRequest, _url *url.URL) (AnnounceResponse, error) { +func announceUDP(opt Announce, _url *url.URL) (AnnounceResponse, error) { ua := udpAnnounce{ url: *_url, + a: &opt, } defer ua.Close() - return ua.Do(ar) + return ua.Do(&opt.Request) } diff --git a/tracker/udp_test.go b/tracker/udp_test.go index 1c064071..a6d4cb15 100644 --- a/tracker/udp_test.go +++ b/tracker/udp_test.go @@ -116,7 +116,10 @@ func TestAnnounceLocalhost(t *testing.T) { go func() { require.NoError(t, srv.serveOne()) }() - ar, err := Announce(defaultClient, defaultHTTPUserAgent, fmt.Sprintf("udp://%s/announce", srv.pc.LocalAddr().String()), &req) + ar, err := Announce{ + TrackerUrl: fmt.Sprintf("udp://%s/announce", srv.pc.LocalAddr().String()), + Request: req, + }.Do() require.NoError(t, err) assert.EqualValues(t, 1, ar.Seeders) assert.EqualValues(t, 2, len(ar.Peers)) @@ -132,7 +135,10 @@ func TestUDPTracker(t *testing.T) { } rand.Read(req.PeerId[:]) copy(req.InfoHash[:], []uint8{0xa3, 0x56, 0x41, 0x43, 0x74, 0x23, 0xe6, 0x26, 0xd9, 0x38, 0x25, 0x4a, 0x6b, 0x80, 0x49, 0x10, 0xa6, 0x67, 0xa, 0xc1}) - ar, err := Announce(defaultClient, defaultHTTPUserAgent, trackers[0], &req) + ar, err := Announce{ + TrackerUrl: trackers[0], + Request: req, + }.Do() // Skip any net errors as we don't control the server. if _, ok := err.(net.Error); ok { t.Skip(err) @@ -160,7 +166,10 @@ func TestAnnounceRandomInfoHashThirdParty(t *testing.T) { wg.Add(1) go func(url string) { defer wg.Done() - resp, err := Announce(defaultClient, defaultHTTPUserAgent, url, &req) + resp, err := Announce{ + TrackerUrl: url, + Request: req, + }.Do() if err != nil { t.Logf("error announcing to %s: %s", url, err) return @@ -196,11 +205,13 @@ func TestURLPathOption(t *testing.T) { } defer conn.Close() go func() { - _, err := Announce(defaultClient, defaultHTTPUserAgent, (&url.URL{ - Scheme: "udp", - Host: conn.LocalAddr().String(), - Path: "/announce", - }).String(), &AnnounceRequest{}) + _, err := Announce{ + TrackerUrl: (&url.URL{ + Scheme: "udp", + Host: conn.LocalAddr().String(), + Path: "/announce", + }).String(), + }.Do() if err != nil { defer conn.Close() } diff --git a/tracker_scraper.go b/tracker_scraper.go index 7434bdcc..d8065b69 100644 --- a/tracker_scraper.go +++ b/tracker_scraper.go @@ -4,6 +4,8 @@ import ( "bytes" "errors" "fmt" + "net" + "net/url" "time" "github.com/anacrolix/missinggo" @@ -14,7 +16,7 @@ import ( // Announces a torrent to a tracker at regular intervals, when peers are // required. type trackerScraper struct { - url string + u url.URL // Causes the trackerScraper to stop running. stop missinggo.Event t *Torrent @@ -24,7 +26,7 @@ type trackerScraper struct { func (ts *trackerScraper) statusLine() string { var w bytes.Buffer fmt.Fprintf(&w, "%q\t%s\t%s", - ts.url, + ts.u.String(), func() string { na := time.Until(ts.lastAnnounce.Completed.Add(ts.lastAnnounce.Interval)) if na > 0 { @@ -43,7 +45,8 @@ func (ts *trackerScraper) statusLine() string { return "never" } return fmt.Sprintf("%d peers", ts.lastAnnounce.NumPeers) - }()) + }(), + ) return w.String() } @@ -54,18 +57,43 @@ type trackerAnnounceResult struct { Completed time.Time } -func trackerToTorrentPeers(ps []tracker.Peer) (ret []Peer) { - ret = make([]Peer, 0, len(ps)) - for _, p := range ps { - ret = append(ret, Peer{ - IP: p.IP, - Port: p.Port, - Source: peerSourceTracker, - }) +func (me *trackerScraper) getIp() (ip net.IP, err error) { + ips, err := net.LookupIP(me.u.Hostname()) + if err != nil { + return } + if len(ips) == 0 { + err = errors.New("no ips") + return + } + for _, ip = range ips { + if me.t.cl.ipIsBlocked(ip) { + continue + } + switch me.u.Scheme { + case "udp4": + if ip.To4() == nil { + continue + } + case "udp6": + if ip.To4() != nil { + continue + } + } + return + } + err = errors.New("no acceptable ips") return } +func (me *trackerScraper) trackerUrl(ip net.IP) string { + u := me.u + if u.Port() != "" { + u.Host = net.JoinHostPort(ip.String(), u.Port()) + } + return u.String() +} + // Return how long to wait before trying again. For most errors, we return 5 // minutes, a relatively quick turn around for DNS changes. func (me *trackerScraper) announce() (ret trackerAnnounceResult) { @@ -73,24 +101,26 @@ func (me *trackerScraper) announce() (ret trackerAnnounceResult) { ret.Completed = time.Now() }() ret.Interval = 5 * time.Minute - blocked, urlToUse, host, err := me.t.cl.prepareTrackerAnnounceUnlocked(me.url) + ip, err := me.getIp() if err != nil { - ret.Err = err - return - } - if blocked { - ret.Err = errors.New("blocked by IP") + ret.Err = fmt.Errorf("error getting ip: %s", err) return } me.t.cl.mu.Lock() req := me.t.announceRequest() me.t.cl.mu.Unlock() - res, err := tracker.AnnounceHost(me.t.cl.config.HTTP, me.t.cl.config.HTTPUserAgent, urlToUse, &req, host) + res, err := tracker.Announce{ + HttpClient: me.t.cl.config.HTTP, + UserAgent: me.t.cl.config.HTTPUserAgent, + TrackerUrl: me.trackerUrl(ip), + Request: req, + HostHeader: me.u.Host, + }.Do() if err != nil { - ret.Err = err + ret.Err = fmt.Errorf("error announcing: %s", err) return } - me.t.AddPeers(trackerToTorrentPeers(res.Peers)) + me.t.AddPeers(Peers(nil).FromTracker(res.Peers)) ret.NumPeers = len(res.Peers) ret.Interval = time.Duration(res.Interval) * time.Second return