IPv6 tracker support

This commit is contained in:
Matt Joiner 2018-02-19 16:19:18 +11:00
parent 9513270c06
commit 8f7408b1f9
12 changed files with 207 additions and 105 deletions

View File

@ -1,6 +1,9 @@
package torrent
import "github.com/anacrolix/dht/krpc"
import (
"github.com/anacrolix/dht/krpc"
"github.com/anacrolix/torrent/tracker"
)
type Peers []Peer
@ -15,3 +18,16 @@ func (me *Peers) FromPex(nas []krpc.NodeAddr, fs []pexPeerFlags) {
*me = append(*me, p)
}
}
func (ret Peers) FromTracker(ps []tracker.Peer) Peers {
for _, p := range ps {
_p := Peer{
IP: p.IP,
Port: p.Port,
Source: peerSourceTracker,
}
copy(_p.Id[:], p.ID)
ret = append(ret, _p)
}
return ret
}

View File

@ -5,12 +5,12 @@ import (
"bytes"
"context"
"crypto/rand"
"encoding/binary"
"errors"
"expvar"
"fmt"
"io"
"net"
"net/url"
"strconv"
"strings"
"time"
@ -122,6 +122,7 @@ func (cl *Client) WriteStatus(_w io.Writer) {
fmt.Fprintln(w, "Not listening!")
}
fmt.Fprintf(w, "Peer ID: %+q\n", cl.PeerID())
fmt.Fprintf(w, "Announce key: %x\n", cl.announceKey())
fmt.Fprintf(w, "Banned IPs: %d\n", len(cl.badPeerIPsLocked()))
if dht := cl.DHT(); dht != nil {
dhtStats := dht.Stats()
@ -237,6 +238,10 @@ func (cl *Client) initLogger() {
cl.logger = log.Default.Clone().AddValue(cl).AddFilter(log.NewFilter(cl.debugLogFilter))
}
func (cl *Client) announceKey() int32 {
return int32(binary.BigEndian.Uint32(cl.peerID[16:20]))
}
// Creates a new client.
func NewClient(cfg *Config) (cl *Client, err error) {
if cfg == nil {
@ -402,6 +407,11 @@ func (cl *Client) ipBlockRange(ip net.IP) (r iplist.Range, blocked bool) {
return cl.ipBlockList.Lookup(ip)
}
func (cl *Client) ipIsBlocked(ip net.IP) bool {
_, blocked := cl.ipBlockRange(ip)
return blocked
}
func (cl *Client) waitAccept() {
for {
for _, t := range cl.torrents {
@ -1114,33 +1124,6 @@ func (cl *Client) dropTorrent(infoHash metainfo.Hash) (err error) {
return
}
func (cl *Client) prepareTrackerAnnounceUnlocked(announceURL string) (blocked bool, urlToUse string, host string, err error) {
_url, err := url.Parse(announceURL)
if err != nil {
return
}
hmp := missinggo.SplitHostMaybePort(_url.Host)
if hmp.Err != nil {
err = hmp.Err
return
}
addr, err := net.ResolveIPAddr("ip", hmp.Host)
if err != nil {
return
}
if addr.IP == nil {
panic(hmp.Host)
}
cl.mu.RLock()
_, blocked = cl.ipBlockRange(addr.IP)
cl.mu.RUnlock()
host = _url.Host
hmp.Host = addr.String()
_url.Host = hmp.String()
urlToUse = _url.String()
return
}
func (cl *Client) allTorrentsCompleted() bool {
for _, t := range cl.torrents {
if !t.haveInfo() {

View File

@ -902,15 +902,6 @@ func TestPieceCompletedInStorageButNotClient(t *testing.T) {
})
}
func TestPrepareTrackerAnnounce(t *testing.T) {
cl := &Client{}
blocked, urlToUse, host, err := cl.prepareTrackerAnnounceUnlocked("http://localhost:1234/announce?herp")
require.NoError(t, err)
assert.False(t, blocked)
assert.EqualValues(t, "localhost:1234", host)
assert.EqualValues(t, "http://127.0.0.1:1234/announce?herp", urlToUse)
}
// Check that when the listen port is 0, all the protocols listened on have
// the same port, and it isn't zero.
func TestClientDynamicListenPortAllProtocols(t *testing.T) {

View File

@ -9,6 +9,7 @@ import (
"math"
"math/rand"
"net"
"net/url"
"os"
"strconv"
"sync"
@ -585,7 +586,7 @@ func (t *Torrent) writeStatus(w io.Writer) {
tw := tabwriter.NewWriter(w, 0, 0, 2, ' ', 0)
fmt.Fprintf(tw, " URL\tNext announce\tLast announce\n")
for _, ta := range slices.Sort(slices.FromMapElems(t.trackerAnnouncers), func(l, r *trackerScraper) bool {
return l.url < r.url
return l.u.String() < r.u.String()
}).([]*trackerScraper) {
fmt.Fprintf(tw, " %s\n", ta.statusLine())
}
@ -1303,21 +1304,35 @@ func (t *Torrent) seeding() bool {
return true
}
func (t *Torrent) startScrapingTracker(url string) {
if url == "" {
func (t *Torrent) startScrapingTracker(_url string) {
if _url == "" {
return
}
if _, ok := t.trackerAnnouncers[url]; ok {
u, _ := url.Parse(_url)
if u.Scheme == "udp" {
u.Scheme = "udp4"
t.startScrapingTracker(u.String())
u.Scheme = "udp6"
t.startScrapingTracker(u.String())
return
}
if u.Scheme == "udp4" && (t.cl.config.DisableIPv4Peers || t.cl.config.DisableIPv4) {
return
}
if u.Scheme == "udp6" && t.cl.config.DisableIPv6 {
return
}
if _, ok := t.trackerAnnouncers[_url]; ok {
return
}
newAnnouncer := &trackerScraper{
url: url,
t: t,
u: *u,
t: t,
}
if t.trackerAnnouncers == nil {
t.trackerAnnouncers = make(map[string]*trackerScraper)
}
t.trackerAnnouncers[url] = newAnnouncer
t.trackerAnnouncers[_url] = newAnnouncer
go newAnnouncer.Run()
}
@ -1345,6 +1360,7 @@ func (t *Torrent) announceRequest() tracker.AnnounceRequest {
PeerId: t.cl.peerID,
InfoHash: t.infoHash,
Left: t.bytesLeftAnnounce(),
Key: t.cl.announceKey(),
}
}

View File

@ -64,7 +64,7 @@ func (me *Peers) UnmarshalBencode(b []byte) (err error) {
}
}
func setAnnounceParams(_url *url.URL, ar *AnnounceRequest) {
func setAnnounceParams(_url *url.URL, ar *AnnounceRequest, opts Announce) {
q := _url.Query()
q.Set("info_hash", string(ar.InfoHash[:]))
@ -81,17 +81,22 @@ func setAnnounceParams(_url *url.URL, ar *AnnounceRequest) {
// According to https://wiki.vuze.com/w/Message_Stream_Encryption. TODO:
// Take EncryptionPolicy or something like it as a parameter.
q.Set("supportcrypto", "1")
if opts.ClientIp4.IP != nil {
q.Set("ipv4", opts.ClientIp4.String())
}
if opts.ClientIp6.IP != nil {
q.Set("ipv6", opts.ClientIp6.String())
}
_url.RawQuery = q.Encode()
}
func announceHTTP(cl *http.Client, userAgent string, ar *AnnounceRequest, _url *url.URL, host string) (ret AnnounceResponse, err error) {
func announceHTTP(opt Announce, _url *url.URL) (ret AnnounceResponse, err error) {
_url = httptoo.CopyURL(_url)
setAnnounceParams(_url, ar)
setAnnounceParams(_url, &opt.Request, opt)
req, err := http.NewRequest("GET", _url.String(), nil)
req.Header.Set("User-Agent", userAgent)
req.Host = host
resp, err := cl.Do(req)
req.Header.Set("User-Agent", opt.UserAgent)
req.Host = opt.HostHeader
resp, err := opt.HttpClient.Do(req)
if err != nil {
return
}

View File

@ -2,6 +2,8 @@ package tracker
import (
"net"
"github.com/anacrolix/dht/krpc"
)
type Peer struct {
@ -16,3 +18,9 @@ func (p *Peer) fromDictInterface(d map[string]interface{}) {
p.ID = []byte(d["peer id"].(string))
p.Port = int(d["port"].(int64))
}
func (p Peer) FromNodeAddr(na krpc.NodeAddr) Peer {
p.IP = na.IP
p.Port = na.Port
return p
}

View File

@ -2,18 +2,20 @@ package tracker
import (
"bytes"
"encoding"
"encoding/binary"
"fmt"
"math/rand"
"net"
"github.com/anacrolix/dht/krpc"
"github.com/anacrolix/missinggo"
)
type torrent struct {
Leechers int32
Seeders int32
Peers krpc.CompactIPv4NodeAddrs
Peers []krpc.NodeAddr
}
type server struct {
@ -91,7 +93,14 @@ func (s *server) serveOne() (err error) {
return
}
t := s.t[ar.InfoHash]
b, err = t.Peers.MarshalBinary()
bm := func() encoding.BinaryMarshaler {
ip := missinggo.AddrIP(addr)
if ip.To4() != nil {
return krpc.CompactIPv4NodeAddrs(t.Peers)
}
return krpc.CompactIPv6NodeAddrs(t.Peers)
}()
b, err = bm.MarshalBinary()
if err != nil {
panic(err)
}

View File

@ -4,6 +4,8 @@ import (
"errors"
"net/http"
"net/url"
"github.com/anacrolix/dht/krpc"
)
// Marshalled as binary by the UDP client, so be careful making changes.
@ -47,22 +49,29 @@ var (
ErrBadScheme = errors.New("unknown scheme")
)
// TODO: Just split udp/http announcing completely, to support various different options they have.
func Announce(cl *http.Client, userAgent string, urlStr string, req *AnnounceRequest) (res AnnounceResponse, err error) {
return AnnounceHost(cl, userAgent, urlStr, req, "")
type Announce struct {
TrackerUrl string
Request AnnounceRequest
HostHeader string
UserAgent string
HttpClient *http.Client
UdpNetwork string
ClientIp4 krpc.NodeAddr
ClientIp6 krpc.NodeAddr
}
func AnnounceHost(cl *http.Client, userAgent string, urlStr string, req *AnnounceRequest, host string) (res AnnounceResponse, err error) {
_url, err := url.Parse(urlStr)
// In an FP language with currying, what order what you put these params?
func (me Announce) Do() (res AnnounceResponse, err error) {
_url, err := url.Parse(me.TrackerUrl)
if err != nil {
return
}
switch _url.Scheme {
case "http", "https":
return announceHTTP(cl, userAgent, req, _url, host)
case "udp":
return announceUDP(req, _url)
return announceHTTP(me, _url)
case "udp", "udp4", "udp6":
return announceUDP(me, _url)
default:
err = ErrBadScheme
return

View File

@ -6,6 +6,8 @@ import (
"net/http"
"testing"
"time"
"github.com/stretchr/testify/require"
)
var defaultClient = &http.Client{
@ -21,8 +23,6 @@ var defaultClient = &http.Client{
func TestUnsupportedTrackerScheme(t *testing.T) {
t.Parallel()
_, err := Announce(defaultClient, defaultHTTPUserAgent, "lol://tracker.openbittorrent.com:80/announce", nil)
if err != ErrBadScheme {
t.Fatal(err)
}
_, err := Announce{TrackerUrl: "lol://tracker.openbittorrent.com:80/announce"}.Do()
require.Equal(t, ErrBadScheme, err)
}

View File

@ -2,6 +2,7 @@ package tracker
import (
"bytes"
"encoding"
"encoding/binary"
"errors"
"fmt"
@ -80,6 +81,7 @@ type udpAnnounce struct {
connectionId int64
socket net.Conn
url url.URL
a *Announce
}
func (c *udpAnnounce) Close() error {
@ -89,6 +91,14 @@ func (c *udpAnnounce) Close() error {
return nil
}
func (c *udpAnnounce) ipv6() bool {
if c.a.UdpNetwork == "udp6" {
return true
}
rip := missinggo.AddrIP(c.socket.RemoteAddr())
return rip.To16() != nil && rip.To4() == nil
}
func (c *udpAnnounce) Do(req *AnnounceRequest) (res AnnounceResponse, err error) {
err = c.connect()
if err != nil {
@ -114,16 +124,22 @@ func (c *udpAnnounce) Do(req *AnnounceRequest) (res AnnounceResponse, err error)
res.Interval = h.Interval
res.Leechers = h.Leechers
res.Seeders = h.Seeders
var cps krpc.CompactIPv4NodeAddrs
err = cps.UnmarshalBinary(b.Bytes())
nas := func() interface {
encoding.BinaryUnmarshaler
NodeAddrs() []krpc.NodeAddr
} {
if c.ipv6() {
return &krpc.CompactIPv6NodeAddrs{}
} else {
return &krpc.CompactIPv4NodeAddrs{}
}
}()
err = nas.UnmarshalBinary(b.Bytes())
if err != nil {
return
}
for _, cp := range cps {
res.Peers = append(res.Peers, Peer{
IP: cp.IP[:],
Port: int(cp.Port),
})
for _, cp := range nas.NodeAddrs() {
res.Peers = append(res.Peers, Peer{}.FromNodeAddr(cp))
}
return
}
@ -226,6 +242,13 @@ func (c *udpAnnounce) connected() bool {
return !c.connectionIdReceived.IsZero() && time.Now().Before(c.connectionIdReceived.Add(time.Minute))
}
func (c *udpAnnounce) dialNetwork() string {
if c.a.UdpNetwork != "" {
return c.a.UdpNetwork
}
return "udp"
}
func (c *udpAnnounce) connect() (err error) {
if c.connected() {
return nil
@ -237,7 +260,7 @@ func (c *udpAnnounce) connect() (err error) {
hmp.NoPort = false
hmp.Port = 80
}
c.socket, err = net.Dial("udp", hmp.String())
c.socket, err = net.Dial(c.dialNetwork(), hmp.String())
if err != nil {
return
}
@ -259,10 +282,11 @@ func (c *udpAnnounce) connect() (err error) {
// TODO: Split on IPv6, as BEP 15 says response peer decoding depends on
// network in use.
func announceUDP(ar *AnnounceRequest, _url *url.URL) (AnnounceResponse, error) {
func announceUDP(opt Announce, _url *url.URL) (AnnounceResponse, error) {
ua := udpAnnounce{
url: *_url,
a: &opt,
}
defer ua.Close()
return ua.Do(ar)
return ua.Do(&opt.Request)
}

View File

@ -116,7 +116,10 @@ func TestAnnounceLocalhost(t *testing.T) {
go func() {
require.NoError(t, srv.serveOne())
}()
ar, err := Announce(defaultClient, defaultHTTPUserAgent, fmt.Sprintf("udp://%s/announce", srv.pc.LocalAddr().String()), &req)
ar, err := Announce{
TrackerUrl: fmt.Sprintf("udp://%s/announce", srv.pc.LocalAddr().String()),
Request: req,
}.Do()
require.NoError(t, err)
assert.EqualValues(t, 1, ar.Seeders)
assert.EqualValues(t, 2, len(ar.Peers))
@ -132,7 +135,10 @@ func TestUDPTracker(t *testing.T) {
}
rand.Read(req.PeerId[:])
copy(req.InfoHash[:], []uint8{0xa3, 0x56, 0x41, 0x43, 0x74, 0x23, 0xe6, 0x26, 0xd9, 0x38, 0x25, 0x4a, 0x6b, 0x80, 0x49, 0x10, 0xa6, 0x67, 0xa, 0xc1})
ar, err := Announce(defaultClient, defaultHTTPUserAgent, trackers[0], &req)
ar, err := Announce{
TrackerUrl: trackers[0],
Request: req,
}.Do()
// Skip any net errors as we don't control the server.
if _, ok := err.(net.Error); ok {
t.Skip(err)
@ -160,7 +166,10 @@ func TestAnnounceRandomInfoHashThirdParty(t *testing.T) {
wg.Add(1)
go func(url string) {
defer wg.Done()
resp, err := Announce(defaultClient, defaultHTTPUserAgent, url, &req)
resp, err := Announce{
TrackerUrl: url,
Request: req,
}.Do()
if err != nil {
t.Logf("error announcing to %s: %s", url, err)
return
@ -196,11 +205,13 @@ func TestURLPathOption(t *testing.T) {
}
defer conn.Close()
go func() {
_, err := Announce(defaultClient, defaultHTTPUserAgent, (&url.URL{
Scheme: "udp",
Host: conn.LocalAddr().String(),
Path: "/announce",
}).String(), &AnnounceRequest{})
_, err := Announce{
TrackerUrl: (&url.URL{
Scheme: "udp",
Host: conn.LocalAddr().String(),
Path: "/announce",
}).String(),
}.Do()
if err != nil {
defer conn.Close()
}

View File

@ -4,6 +4,8 @@ import (
"bytes"
"errors"
"fmt"
"net"
"net/url"
"time"
"github.com/anacrolix/missinggo"
@ -14,7 +16,7 @@ import (
// Announces a torrent to a tracker at regular intervals, when peers are
// required.
type trackerScraper struct {
url string
u url.URL
// Causes the trackerScraper to stop running.
stop missinggo.Event
t *Torrent
@ -24,7 +26,7 @@ type trackerScraper struct {
func (ts *trackerScraper) statusLine() string {
var w bytes.Buffer
fmt.Fprintf(&w, "%q\t%s\t%s",
ts.url,
ts.u.String(),
func() string {
na := time.Until(ts.lastAnnounce.Completed.Add(ts.lastAnnounce.Interval))
if na > 0 {
@ -43,7 +45,8 @@ func (ts *trackerScraper) statusLine() string {
return "never"
}
return fmt.Sprintf("%d peers", ts.lastAnnounce.NumPeers)
}())
}(),
)
return w.String()
}
@ -54,18 +57,43 @@ type trackerAnnounceResult struct {
Completed time.Time
}
func trackerToTorrentPeers(ps []tracker.Peer) (ret []Peer) {
ret = make([]Peer, 0, len(ps))
for _, p := range ps {
ret = append(ret, Peer{
IP: p.IP,
Port: p.Port,
Source: peerSourceTracker,
})
func (me *trackerScraper) getIp() (ip net.IP, err error) {
ips, err := net.LookupIP(me.u.Hostname())
if err != nil {
return
}
if len(ips) == 0 {
err = errors.New("no ips")
return
}
for _, ip = range ips {
if me.t.cl.ipIsBlocked(ip) {
continue
}
switch me.u.Scheme {
case "udp4":
if ip.To4() == nil {
continue
}
case "udp6":
if ip.To4() != nil {
continue
}
}
return
}
err = errors.New("no acceptable ips")
return
}
func (me *trackerScraper) trackerUrl(ip net.IP) string {
u := me.u
if u.Port() != "" {
u.Host = net.JoinHostPort(ip.String(), u.Port())
}
return u.String()
}
// Return how long to wait before trying again. For most errors, we return 5
// minutes, a relatively quick turn around for DNS changes.
func (me *trackerScraper) announce() (ret trackerAnnounceResult) {
@ -73,24 +101,26 @@ func (me *trackerScraper) announce() (ret trackerAnnounceResult) {
ret.Completed = time.Now()
}()
ret.Interval = 5 * time.Minute
blocked, urlToUse, host, err := me.t.cl.prepareTrackerAnnounceUnlocked(me.url)
ip, err := me.getIp()
if err != nil {
ret.Err = err
return
}
if blocked {
ret.Err = errors.New("blocked by IP")
ret.Err = fmt.Errorf("error getting ip: %s", err)
return
}
me.t.cl.mu.Lock()
req := me.t.announceRequest()
me.t.cl.mu.Unlock()
res, err := tracker.AnnounceHost(me.t.cl.config.HTTP, me.t.cl.config.HTTPUserAgent, urlToUse, &req, host)
res, err := tracker.Announce{
HttpClient: me.t.cl.config.HTTP,
UserAgent: me.t.cl.config.HTTPUserAgent,
TrackerUrl: me.trackerUrl(ip),
Request: req,
HostHeader: me.u.Host,
}.Do()
if err != nil {
ret.Err = err
ret.Err = fmt.Errorf("error announcing: %s", err)
return
}
me.t.AddPeers(trackerToTorrentPeers(res.Peers))
me.t.AddPeers(Peers(nil).FromTracker(res.Peers))
ret.NumPeers = len(res.Peers)
ret.Interval = time.Duration(res.Interval) * time.Second
return