Rework conns to/and allow multiple DHT servers

This will help with #229, and IPv6 support.
This commit is contained in:
Matt Joiner 2018-04-12 11:41:07 +10:00
parent a101ebb07e
commit 319e57d1c6
14 changed files with 457 additions and 357 deletions

View File

@ -226,9 +226,9 @@ func getDictField(dict reflect.Value, key string) dictField {
})
}
return dictField{
Value: dict.FieldByIndex(sf.Index),
Ok: true,
Set: func() {},
Value: dict.FieldByIndex(sf.Index),
Ok: true,
Set: func() {},
IgnoreUnmarshalTypeError: getTag(sf.Tag).IgnoreUnmarshalTypeError(),
}
default:

505
client.go
View File

@ -7,7 +7,6 @@ import (
"crypto/rand"
"encoding/binary"
"errors"
"expvar"
"fmt"
"io"
"net"
@ -49,16 +48,13 @@ type Client struct {
peerID PeerID
defaultStorage *storage.Client
onClose []func()
tcpListener net.Listener
utpSock utpSocket
dHT *dht.Server
conns []socket
dhtServers []*dht.Server
ipBlockList iplist.Ranger
// Our BitTorrent protocol extension bytes, sent in our BT handshakes.
extensionBytes peerExtensionBytes
// The net.Addr.String part that should be common to all active listeners.
listenAddr string
uploadLimit *rate.Limiter
downloadLimit *rate.Limiter
uploadLimit *rate.Limiter
downloadLimit *rate.Limiter
// Set of addresses that have our client ID. This intentionally will
// include ourselves if we end up trying to connect to our own address
@ -78,21 +74,6 @@ func (cl *Client) badPeerIPsLocked() []string {
return slices.FromMapKeys(cl.badPeerIPs).([]string)
}
func (cl *Client) IPBlockList() iplist.Ranger {
cl.mu.Lock()
defer cl.mu.Unlock()
return cl.ipBlockList
}
func (cl *Client) SetIPBlockList(list iplist.Ranger) {
cl.mu.Lock()
defer cl.mu.Unlock()
cl.ipBlockList = list
if cl.dHT != nil {
cl.dHT.SetIPBlockList(list)
}
}
func (cl *Client) PeerID() PeerID {
return cl.peerID
}
@ -103,11 +84,29 @@ func (torrentAddr) Network() string { return "" }
func (me torrentAddr) String() string { return string(me) }
func (cl *Client) ListenAddr() net.Addr {
if cl.listenAddr == "" {
return nil
}
return torrentAddr(cl.listenAddr)
func (cl *Client) LocalPort() (port int) {
cl.eachListener(func(l socket) bool {
_port := missinggo.AddrPort(l.Addr())
if _port == 0 {
panic(l)
}
if port == 0 {
port = _port
} else if port != _port {
panic("mismatched ports")
}
return true
})
return
}
func writeDhtServerStatus(w io.Writer, s *dht.Server) {
dhtStats := s.Stats()
fmt.Fprintf(w, "\tDHT nodes: %d (%d good, %d banned)\n", dhtStats.Nodes, dhtStats.GoodNodes, dhtStats.BadNodes)
fmt.Fprintf(w, "\tDHT Server ID: %x\n", s.ID())
fmt.Fprintf(w, "\tDHT port: %d\n", missinggo.AddrPort(s.Addr()))
fmt.Fprintf(w, "\tDHT announces: %d\n", dhtStats.ConfirmedAnnounces)
fmt.Fprintf(w, "\tOutstanding transactions: %d\n", dhtStats.OutstandingTransactions)
}
// Writes out a human readable status of the client, such as for writing to a
@ -117,22 +116,14 @@ func (cl *Client) WriteStatus(_w io.Writer) {
defer cl.mu.Unlock()
w := bufio.NewWriter(_w)
defer w.Flush()
if addr := cl.ListenAddr(); addr != nil {
fmt.Fprintf(w, "Listening on %s\n", addr)
} else {
fmt.Fprintln(w, "Not listening!")
}
fmt.Fprintf(w, "Listen port: %d\n", cl.LocalPort())
fmt.Fprintf(w, "Peer ID: %+q\n", cl.PeerID())
fmt.Fprintf(w, "Announce key: %x\n", cl.announceKey())
fmt.Fprintf(w, "Banned IPs: %d\n", len(cl.badPeerIPsLocked()))
if dht := cl.DHT(); dht != nil {
dhtStats := dht.Stats()
fmt.Fprintf(w, "DHT nodes: %d (%d good, %d banned)\n", dhtStats.Nodes, dhtStats.GoodNodes, dhtStats.BadNodes)
fmt.Fprintf(w, "DHT Server ID: %x\n", dht.ID())
fmt.Fprintf(w, "DHT port: %d\n", missinggo.AddrPort(dht.Addr()))
fmt.Fprintf(w, "DHT announces: %d\n", dhtStats.ConfirmedAnnounces)
fmt.Fprintf(w, "Outstanding transactions: %d\n", dhtStats.OutstandingTransactions)
}
cl.eachDhtServer(func(s *dht.Server) {
fmt.Fprintf(w, "%s DHT server:\n", s.Addr().Network())
writeDhtServerStatus(w, s)
})
fmt.Fprintf(w, "# Torrents: %d\n", len(cl.torrentsAsSlice()))
fmt.Fprintln(w)
for _, t := range slices.Sort(cl.torrentsAsSlice(), func(l, r *Torrent) bool {
@ -155,76 +146,6 @@ func (cl *Client) WriteStatus(_w io.Writer) {
}
}
func listenUTP(networkSuffix, addr string) (utpSocket, error) {
return NewUtpSocket("udp"+networkSuffix, addr)
}
func listenTCP(networkSuffix, addr string) (net.Listener, error) {
return net.Listen("tcp"+networkSuffix, addr)
}
func listenBothSameDynamicPort(networkSuffix, host string) (tcpL net.Listener, utpSock utpSocket, listenedAddr string, err error) {
for {
tcpL, err = listenTCP(networkSuffix, net.JoinHostPort(host, "0"))
if err != nil {
return
}
listenedAddr = tcpL.Addr().String()
utpSock, err = listenUTP(networkSuffix, listenedAddr)
if err == nil {
return
}
tcpL.Close()
if !strings.Contains(err.Error(), "address already in use") {
return
}
}
}
// Listen to enabled protocols, ensuring ports match.
func listen(tcp, utp bool, networkSuffix, addr string) (tcpL net.Listener, utpSock utpSocket, listenedAddr string, err error) {
if addr == "" {
addr = ":50007"
}
if tcp && utp {
var host string
var port int
host, port, err = missinggo.ParseHostPort(addr)
if err != nil {
return
}
if port == 0 {
// If both protocols are active, they need to have the same port.
return listenBothSameDynamicPort(networkSuffix, host)
}
}
defer func() {
if err != nil {
listenedAddr = ""
}
}()
if tcp {
tcpL, err = listenTCP(networkSuffix, addr)
if err != nil {
return
}
defer func() {
if err != nil {
tcpL.Close()
}
}()
listenedAddr = tcpL.Addr().String()
}
if utp {
utpSock, err = listenUTP(networkSuffix, addr)
if err != nil {
return
}
listenedAddr = utpSock.Addr().String()
}
return
}
const debugLogValue = "debug"
func (cl *Client) debugLogFilter(m *log.Msg) bool {
@ -243,15 +164,7 @@ func (cl *Client) announceKey() int32 {
return int32(binary.BigEndian.Uint32(cl.peerID[16:20]))
}
// Creates a new client.
func NewClient(cfg *Config) (cl *Client, err error) {
if cfg == nil {
cfg = &Config{
DHTConfig: dht.ServerConfig{
StartingNodes: dht.GlobalBootstrapAddrs,
},
}
}
if cfg == nil {
cfg = &Config{}
}
@ -312,53 +225,64 @@ func NewClient(cfg *Config) (cl *Client, err error) {
}
}
cl.tcpListener, cl.utpSock, cl.listenAddr, err = listen(
!cl.config.DisableTCP,
!cl.config.DisableUTP,
// We'll listen to IPv4 for TCP even if IPv4 peer connections are
// disabled because we want to ensure peers don't connect to some
// other process on that port.
ipNetworkSuffix(!cl.config.DisableIPv4, !cl.config.DisableIPv6),
cl.config.ListenAddr)
cl.conns, err = listenAll(cl.enabledPeerNetworks(), cl.config.ListenAddr)
if err != nil {
return
}
go cl.forwardPort()
if cl.tcpListener != nil {
go cl.acceptConnections(cl.tcpListener, false)
}
if cl.utpSock != nil {
go cl.acceptConnections(cl.utpSock, true)
}
if !cfg.NoDHT {
dhtCfg := cfg.DHTConfig
if dhtCfg.IPBlocklist == nil {
dhtCfg.IPBlocklist = cl.ipBlockList
cl.LocalPort()
for _, s := range cl.conns {
if peerNetworkEnabled(s.Addr().Network(), cl.config) {
go cl.acceptConnections(s)
}
if dhtCfg.Conn == nil {
if cl.utpSock != nil {
dhtCfg.Conn = cl.utpSock
} else {
dhtCfg.Conn, err = net.ListenPacket("udp", firstNonEmptyString(cl.listenAddr, cl.config.ListenAddr))
}
go cl.forwardPort()
if !cfg.NoDHT {
for _, s := range cl.conns {
if pc, ok := s.(net.PacketConn); ok {
ds, err := cl.newDhtServer(pc)
if err != nil {
return
panic(err)
}
cl.dhtServers = append(cl.dhtServers, ds)
}
}
if dhtCfg.OnAnnouncePeer == nil {
dhtCfg.OnAnnouncePeer = cl.onDHTAnnouncePeer
}
cl.dHT, err = dht.NewServer(&dhtCfg)
if err != nil {
return
}
return
}
func (cl *Client) enabledPeerNetworks() (ns []string) {
for _, n := range allPeerNetworks {
if peerNetworkEnabled(n, cl.config) {
ns = append(ns, n)
}
}
return
}
func (cl *Client) newDhtServer(conn net.PacketConn) (s *dht.Server, err error) {
cfg := dht.ServerConfig{
IPBlocklist: cl.ipBlockList,
Conn: conn,
OnAnnouncePeer: cl.onDHTAnnouncePeer,
PublicIP: func() net.IP {
if connIsIpv6(conn) && cl.config.PublicIp6 != nil {
return cl.config.PublicIp6
}
return cl.config.PublicIp4
}(),
StartingNodes: cl.config.DhtStartingNodes,
}
s, err = dht.NewServer(&cfg)
if err == nil {
go func() {
if _, err := cl.dHT.Bootstrap(); err != nil {
if _, err := s.Bootstrap(); err != nil {
log.Printf("error bootstrapping dht: %s", err)
}
}()
}
return
}
@ -377,21 +301,28 @@ func (cl *Client) Closed() <-chan struct{} {
return cl.closed.C()
}
func (cl *Client) eachDhtServer(f func(*dht.Server)) {
for _, ds := range cl.dhtServers {
f(ds)
}
}
func (cl *Client) closeSockets() {
cl.eachListener(func(l socket) bool {
l.Close()
return true
})
cl.conns = nil
}
// Stops the client. All connections to peers are closed and all activity will
// come to a halt.
func (cl *Client) Close() {
cl.mu.Lock()
defer cl.mu.Unlock()
cl.closed.Set()
if cl.dHT != nil {
cl.dHT.Close()
}
if cl.utpSock != nil {
cl.utpSock.Close()
}
if cl.tcpListener != nil {
cl.tcpListener.Close()
}
cl.eachDhtServer(func(s *dht.Server) { s.Close() })
cl.closeSockets()
for _, t := range cl.torrents {
t.close()
}
@ -442,7 +373,7 @@ func (cl *Client) rejectAccepted(conn net.Conn) bool {
return cl.badPeerIPPort(rip, missinggo.AddrPort(ra))
}
func (cl *Client) acceptConnections(l net.Listener, utp bool) {
func (cl *Client) acceptConnections(l net.Listener) {
cl.mu.Lock()
defer cl.mu.Unlock()
for {
@ -466,28 +397,23 @@ func (cl *Client) acceptConnections(l net.Listener, utp bool) {
log.Fmsg("accepted connection from %s", conn.RemoteAddr()).AddValue(debugLogValue).Log(cl.logger)
go torrent.Add(fmt.Sprintf("accepted conn remote IP len=%d", len(missinggo.AddrIP(conn.RemoteAddr()))), 1)
go torrent.Add(fmt.Sprintf("accepted conn network=%s", conn.RemoteAddr().Network()), 1)
if utp {
go torrent.Add("accepted utp connections", 1)
} else {
go torrent.Add("accepted tcp connections", 1)
}
go torrent.Add(fmt.Sprintf("accepted on %s listener", l.Addr().Network()), 1)
if cl.rejectAccepted(conn) {
go torrent.Add("rejected accepted connections", 1)
conn.Close()
} else {
go cl.incomingConnection(conn, utp)
go cl.incomingConnection(conn)
}
}
}
func (cl *Client) incomingConnection(nc net.Conn, utp bool) {
func (cl *Client) incomingConnection(nc net.Conn) {
defer nc.Close()
if tc, ok := nc.(*net.TCPConn); ok {
tc.SetLinger(0)
}
c := cl.newConnection(nc)
c.Discovery = peerSourceIncoming
c.uTP = utp
cl.runReceivedConn(c)
}
@ -505,7 +431,6 @@ func (cl *Client) torrent(ih metainfo.Hash) *Torrent {
type dialResult struct {
Conn net.Conn
UTP bool
}
func countDialResult(err error) {
@ -546,22 +471,6 @@ func (cl *Client) dialTCP(ctx context.Context, addr string) (c net.Conn, err err
return
}
func (cl *Client) utpDialNetwork() string {
// We want to restrict the addr resolve inside the utp library to the
// correct network, since the utp Socket may be listening to a broader
// network for DHT purposes or otherwise.
if !cl.config.DisableIPv4Peers {
return ""
}
n := cl.utpSock.Addr().Network()
switch n {
case "udp", "udp4", "udp6":
return "udp6"
default:
panic(n)
}
}
func ipNetworkSuffix(allowIpv4, allowIpv6 bool) string {
switch {
case allowIpv4 && allowIpv6:
@ -575,64 +484,70 @@ func ipNetworkSuffix(allowIpv4, allowIpv6 bool) string {
}
}
func (cl *Client) dialUTP(ctx context.Context, addr string) (c net.Conn, err error) {
c, err = cl.utpSock.DialContext(ctx, cl.utpDialNetwork(), addr)
countDialResult(err)
return
func dialUTP(ctx context.Context, addr string, sock utpSocket) (c net.Conn, err error) {
return sock.DialContext(ctx, "", addr)
}
var (
dialledFirstUtp = expvar.NewInt("dialledFirstUtp")
dialledFirstNotUtp = expvar.NewInt("dialledFirstNotUtp")
)
var allPeerNetworks = []string{"tcp4", "tcp6", "udp4", "udp6"}
func peerNetworkEnabled(network string, cfg Config) bool {
c := func(s string) bool {
return strings.Contains(network, s)
}
if cfg.DisableUTP {
if c("udp") || c("utp") {
return false
}
}
if cfg.DisableTCP && c("tcp") {
return false
}
return true
}
// Returns a connection over UTP or TCP, whichever is first to connect.
func (cl *Client) dialFirst(ctx context.Context, addr string) (conn net.Conn, utp bool) {
func (cl *Client) dialFirst(ctx context.Context, addr string) net.Conn {
ctx, cancel := context.WithCancel(ctx)
// As soon as we return one connection, cancel the others.
defer cancel()
left := 0
resCh := make(chan dialResult, left)
if !cl.config.DisableUTP {
dial := func(f func(_ context.Context, addr string) (net.Conn, error)) {
left++
go func() {
c, _ := cl.dialUTP(ctx, addr)
resCh <- dialResult{c, true}
}()
}
if !cl.config.DisableTCP {
left++
go func() {
c, _ := cl.dialTCP(ctx, addr)
resCh <- dialResult{c, false}
c, err := f(ctx, addr)
countDialResult(err)
resCh <- dialResult{c}
}()
}
func() {
cl.mu.Lock()
defer cl.mu.Unlock()
cl.eachListener(func(s socket) bool {
if peerNetworkEnabled(s.Addr().Network(), cl.config) {
dial(s.dial)
}
return true
})
}()
var res dialResult
// Wait for a successful connection.
for ; left > 0 && res.Conn == nil; left-- {
res = <-resCh
}
if left > 0 {
// There are still incompleted dials.
go func() {
for ; left > 0; left-- {
conn := (<-resCh).Conn
if conn != nil {
conn.Close()
}
// There are still incompleted dials.
go func() {
for ; left > 0; left-- {
conn := (<-resCh).Conn
if conn != nil {
conn.Close()
}
}()
}
conn = res.Conn
utp = res.UTP
if conn != nil {
if utp {
dialledFirstUtp.Add(1)
} else {
dialledFirstNotUtp.Add(1)
}
}()
if res.Conn != nil {
go torrent.Add(fmt.Sprintf("network dialed first: %s", res.Conn.RemoteAddr().Network()), 1)
}
return
return res.Conn
}
func (cl *Client) noLongerHalfOpen(t *Torrent, addr string) {
@ -645,10 +560,9 @@ func (cl *Client) noLongerHalfOpen(t *Torrent, addr string) {
// Performs initiator handshakes and returns a connection. Returns nil
// *connection if no connection for valid reasons.
func (cl *Client) handshakesConnection(ctx context.Context, nc net.Conn, t *Torrent, encryptHeader, utp bool) (c *connection, err error) {
func (cl *Client) handshakesConnection(ctx context.Context, nc net.Conn, t *Torrent, encryptHeader bool) (c *connection, err error) {
c = cl.newConnection(nc)
c.headerEncrypted = encryptHeader
c.uTP = utp
ctx, cancel := context.WithTimeout(ctx, cl.config.HandshakesTimeout)
defer cancel()
dl, ok := ctx.Deadline()
@ -666,31 +580,35 @@ func (cl *Client) handshakesConnection(ctx context.Context, nc net.Conn, t *Torr
return
}
var (
initiatedConnWithPreferredHeaderEncryption = expvar.NewInt("initiatedConnWithPreferredHeaderEncryption")
initiatedConnWithFallbackHeaderEncryption = expvar.NewInt("initiatedConnWithFallbackHeaderEncryption")
)
// Returns nil connection and nil error if no connection could be established
// for valid reasons.
func (cl *Client) establishOutgoingConnEx(t *Torrent, addr string, ctx context.Context, obfuscatedHeader bool) (c *connection, err error) {
nc := cl.dialFirst(ctx, addr)
if nc == nil {
return
}
defer func() {
if c == nil || err != nil {
nc.Close()
}
}()
return cl.handshakesConnection(ctx, nc, t, obfuscatedHeader)
}
// Returns nil connection and nil error if no connection could be established
// for valid reasons.
func (cl *Client) establishOutgoingConn(t *Torrent, addr string) (c *connection, err error) {
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
defer cancel()
nc, utp := cl.dialFirst(ctx, addr)
if nc == nil {
return
}
obfuscatedHeaderFirst := !cl.config.DisableEncryption && !cl.config.PreferNoEncryption
c, err = cl.handshakesConnection(ctx, nc, t, obfuscatedHeaderFirst, utp)
c, err = cl.establishOutgoingConnEx(t, addr, ctx, obfuscatedHeaderFirst)
if err != nil {
// log.Printf("error initiating connection handshakes: %s", err)
nc.Close()
return
} else if c != nil {
initiatedConnWithPreferredHeaderEncryption.Add(1)
return
}
nc.Close()
if c != nil {
go torrent.Add("initiated conn with preferred header obfuscation", 1)
return
}
if cl.config.ForceEncryption {
// We should have just tried with an obfuscated header. A plaintext
// header can't result in an encrypted connection, so we're done.
@ -699,23 +617,10 @@ func (cl *Client) establishOutgoingConn(t *Torrent, addr string) (c *connection,
}
return
}
// Try again with encryption if we didn't earlier, or without if we did,
// using whichever protocol type worked last time.
if utp {
nc, err = cl.dialUTP(ctx, addr)
} else {
nc, err = cl.dialTCP(ctx, addr)
}
if err != nil {
err = fmt.Errorf("error dialing for header encryption fallback: %s", err)
return
}
c, err = cl.handshakesConnection(ctx, nc, t, !obfuscatedHeaderFirst, utp)
if err != nil || c == nil {
nc.Close()
}
if err == nil && c != nil {
initiatedConnWithFallbackHeaderEncryption.Add(1)
// Try again with encryption if we didn't earlier, or without if we did.
c, err = cl.establishOutgoingConnEx(t, addr, ctx, !obfuscatedHeaderFirst)
if c != nil {
go torrent.Add("initiated conn with fallback header obfuscation", 1)
}
return
}
@ -746,14 +651,7 @@ func (cl *Client) outgoingConnection(t *Torrent, addr string, ps peerSource) {
// The port number for incoming peer connections. 0 if the client isn't
// listening.
func (cl *Client) incomingPeerPort() int {
if cl.listenAddr == "" {
return 0
}
_, port, err := missinggo.ParseHostPort(cl.listenAddr)
if err != nil {
panic(err)
}
return port
return cl.LocalPort()
}
func (cl *Client) initiateHandshakes(c *connection, t *Torrent) (ok bool, err error) {
@ -952,14 +850,28 @@ func (cl *Client) sendInitialMessages(conn *connection, torrent *Torrent) {
}(),
})
}
if conn.PeerExtensionBytes.SupportsDHT() && cl.extensionBytes.SupportsDHT() && cl.dHT != nil {
if conn.PeerExtensionBytes.SupportsDHT() && cl.extensionBytes.SupportsDHT() && cl.haveDhtServer() {
conn.Post(pp.Message{
Type: pp.Port,
Port: uint16(missinggo.AddrPort(cl.dHT.Addr())),
Port: cl.dhtPort(),
})
}
}
func (cl *Client) dhtPort() (ret uint16) {
cl.eachDhtServer(func(s *dht.Server) {
ret = uint16(missinggo.AddrPort(s.Addr()))
})
return
}
func (cl *Client) haveDhtServer() (ret bool) {
cl.eachDhtServer(func(_ *dht.Server) {
ret = true
})
return
}
// Process incoming ut_metadata message.
func (cl *Client) gotMetadataExtensionMsg(payload []byte, t *Torrent, c *connection) error {
var d map[string]int
@ -1079,9 +991,9 @@ func (cl *Client) AddTorrentInfoHashWithStorage(infoHash metainfo.Hash, specStor
}
new = true
t = cl.newTorrent(infoHash, specStorage)
if cl.dHT != nil {
go t.dhtAnnouncer()
}
cl.eachDhtServer(func(s *dht.Server) {
go t.dhtAnnouncer(s)
})
cl.torrents[infoHash] = t
t.updateWantPeersEvent()
// Tickle Client.waitAccept, new torrent may want conns.
@ -1195,14 +1107,11 @@ func (cl *Client) AddTorrentFromFile(filename string) (T *Torrent, err error) {
return cl.AddTorrent(mi)
}
func (cl *Client) DHT() *dht.Server {
return cl.dHT
func (cl *Client) DhtServers() []*dht.Server {
return cl.dhtServers
}
func (cl *Client) AddDHTNodes(nodes []string) {
if cl.DHT() == nil {
return
}
for _, n := range nodes {
hmp := missinggo.SplitHostMaybePort(n)
ip := net.ParseIP(hmp.Host)
@ -1216,7 +1125,9 @@ func (cl *Client) AddDHTNodes(nodes []string) {
Port: hmp.Port,
},
}
cl.DHT().AddNode(ni)
cl.eachDhtServer(func(s *dht.Server) {
s.AddNode(ni)
})
}
}
@ -1267,16 +1178,52 @@ func firstNotNil(ips ...net.IP) net.IP {
return nil
}
func (cl *Client) eachListener(f func(socket) bool) {
for _, s := range cl.conns {
if !f(s) {
break
}
}
}
func (cl *Client) findListener(f func(net.Listener) bool) (ret net.Listener) {
cl.eachListener(func(l socket) bool {
ret = l
return !f(l)
})
return
}
func (cl *Client) publicIp(peer net.IP) net.IP {
// TODO: Use BEP 10 to determine how peers are seeing us.
if peer.To4() != nil {
return firstNotNil(cl.config.PublicIp4, missinggo.AddrIP(cl.ListenAddr()).To4())
return firstNotNil(
cl.config.PublicIp4,
cl.findListenerIp(func(ip net.IP) bool { return ip.To4() != nil }),
)
} else {
return firstNotNil(cl.config.PublicIp6, missinggo.AddrIP(cl.ListenAddr()).To16())
return firstNotNil(
cl.config.PublicIp6,
cl.findListenerIp(func(ip net.IP) bool { return ip.To4() == nil }),
)
}
}
func (cl *Client) findListenerIp(f func(net.IP) bool) net.IP {
return missinggo.AddrIP(cl.findListener(func(l net.Listener) bool {
return f(missinggo.AddrIP(l.Addr()))
}).Addr())
}
// Our IP as a peer should see it.
func (cl *Client) publicAddr(peer net.IP) ipPort {
return ipPort{cl.publicIp(peer), uint16(cl.incomingPeerPort())}
}
func (cl *Client) ListenAddrs() (ret []net.Addr) {
cl.eachListener(func(l socket) bool {
ret = append(ret, l.Addr())
return true
})
return
}

View File

@ -13,6 +13,7 @@ import (
"testing"
"time"
"github.com/anacrolix/dht"
_ "github.com/anacrolix/envpprof"
"github.com/anacrolix/missinggo"
"github.com/anacrolix/missinggo/filecache"
@ -214,16 +215,6 @@ func TestUTPRawConn(t *testing.T) {
}
}
func TestTwoClientsArbitraryPorts(t *testing.T) {
for i := 0; i < 2; i++ {
cl, err := NewClient(TestingConfig())
if err != nil {
t.Fatal(err)
}
defer cl.Close()
}
}
func TestAddDropManyTorrents(t *testing.T) {
cl, err := NewClient(TestingConfig())
require.NoError(t, err)
@ -403,7 +394,7 @@ func testClientTransfer(t *testing.T, ps testClientTransferParams) {
require.NoError(t, err)
assert.True(t, new)
// Now do some things with leecher and seeder.
addClientPeer(leecherTorrent, seeder)
leecherTorrent.AddClientPeer(seeder)
// The Torrent should not be interested in obtaining peers, so the one we
// just added should be the only one.
assert.False(t, leecherTorrent.Seeding())
@ -461,6 +452,7 @@ func TestSeedAfterDownloading(t *testing.T) {
require.NoError(t, err)
defer os.RemoveAll(cfg.DataDir)
leecherLeecher, _ := NewClient(cfg)
require.NoError(t, err)
defer leecherLeecher.Close()
testutil.ExportStatusWriter(leecherLeecher, "ll")
leecherGreeting, _, _ := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
@ -486,8 +478,8 @@ func TestSeedAfterDownloading(t *testing.T) {
require.NoError(t, err)
assert.EqualValues(t, testutil.GreetingFileContents, b)
}()
addClientPeer(leecherGreeting, seeder)
addClientPeer(leecherGreeting, leecherLeecher)
leecherGreeting.AddClientPeer(seeder)
leecherGreeting.AddClientPeer(leecherLeecher)
wg.Add(1)
go func() {
defer wg.Done()
@ -585,7 +577,7 @@ func TestResponsive(t *testing.T) {
ret.ChunkSize = 2
return
}())
addClientPeer(leecherTorrent, seeder)
leecherTorrent.AddClientPeer(seeder)
reader := leecherTorrent.NewReader()
defer reader.Close()
reader.SetReadahead(0)
@ -628,7 +620,7 @@ func TestTorrentDroppedDuringResponsiveRead(t *testing.T) {
ret.ChunkSize = 2
return
}())
addClientPeer(leecherTorrent, seeder)
leecherTorrent.AddClientPeer(seeder)
reader := leecherTorrent.NewReader()
defer reader.Close()
reader.SetReadahead(0)
@ -656,7 +648,12 @@ func TestDHTInheritBlocklist(t *testing.T) {
cl, err := NewClient(cfg)
require.NoError(t, err)
defer cl.Close()
require.Equal(t, ipl, cl.DHT().IPBlocklist())
numServers := 0
cl.eachDhtServer(func(s *dht.Server) {
assert.Equal(t, ipl, s.IPBlocklist())
numServers++
})
assert.EqualValues(t, 2, numServers)
}
// Check that stuff is merged in subsequent AddTorrentSpec for the same
@ -761,21 +758,29 @@ func TestAddMetainfoWithNodes(t *testing.T) {
cfg := TestingConfig()
cfg.ListenAddr = ":0"
cfg.NoDHT = false
cfg.DhtStartingNodes = func() ([]dht.Addr, error) { return nil, nil }
// For now, we want to just jam the nodes into the table, without
// verifying them first. Also the DHT code doesn't support mixing secure
// and insecure nodes if security is enabled (yet).
cfg.DHTConfig.NoSecurity = true
// cfg.DHTConfig.NoSecurity = true
cl, err := NewClient(cfg)
require.NoError(t, err)
defer cl.Close()
assert.EqualValues(t, 0, cl.DHT().NumNodes()+cl.DHT().Stats().OutstandingTransactions)
sum := func() (ret int) {
cl.eachDhtServer(func(s *dht.Server) {
ret += s.NumNodes()
ret += s.Stats().OutstandingTransactions
})
return
}
assert.EqualValues(t, 0, sum())
tt, err := cl.AddTorrentFromFile("metainfo/testdata/issue_65a.torrent")
require.NoError(t, err)
// Nodes are not added or exposed in Torrent's metainfo. We just randomly
// check if the announce-list is here instead. TODO: Add nodes.
assert.Len(t, tt.metainfo.AnnounceList, 5)
// There are 6 nodes in the torrent file.
assert.EqualValues(t, 6, cl.DHT().NumNodes()+cl.DHT().Stats().OutstandingTransactions)
assert.EqualValues(t, 6*len(cl.dhtServers), sum())
}
type testDownloadCancelParams struct {
@ -831,7 +836,7 @@ func testDownloadCancel(t *testing.T, ps testDownloadCancelParams) {
}
leecherGreeting.cl.mu.Unlock()
addClientPeer(leecherGreeting, seeder)
leecherGreeting.AddClientPeer(seeder)
completes := make(map[int]bool, 3)
values:
for {
@ -908,8 +913,12 @@ func TestClientDynamicListenPortAllProtocols(t *testing.T) {
cl, err := NewClient(TestingConfig())
require.NoError(t, err)
defer cl.Close()
assert.NotEqual(t, 0, missinggo.AddrPort(cl.ListenAddr()))
assert.Equal(t, missinggo.AddrPort(cl.utpSock.Addr()), missinggo.AddrPort(cl.tcpListener.Addr()))
port := cl.LocalPort()
assert.NotEqual(t, 0, port)
cl.eachListener(func(s socket) bool {
assert.Equal(t, port, missinggo.AddrPort(s.Addr()))
return true
})
}
func TestClientDynamicListenTCPOnly(t *testing.T) {
@ -918,8 +927,11 @@ func TestClientDynamicListenTCPOnly(t *testing.T) {
cl, err := NewClient(cfg)
require.NoError(t, err)
defer cl.Close()
assert.NotEqual(t, 0, missinggo.AddrPort(cl.ListenAddr()))
assert.Nil(t, cl.utpSock)
assert.NotEqual(t, 0, cl.LocalPort())
cl.eachListener(func(s socket) bool {
assert.True(t, isTcpNetwork(s.Addr().Network()))
return true
})
}
func TestClientDynamicListenUTPOnly(t *testing.T) {
@ -928,8 +940,11 @@ func TestClientDynamicListenUTPOnly(t *testing.T) {
cl, err := NewClient(cfg)
require.NoError(t, err)
defer cl.Close()
assert.NotEqual(t, 0, missinggo.AddrPort(cl.ListenAddr()))
assert.Nil(t, cl.tcpListener)
assert.NotEqual(t, 0, cl.LocalPort())
cl.eachListener(func(s socket) bool {
assert.True(t, isUtpNetwork(s.Addr().Network()))
return true
})
}
func TestClientDynamicListenPortNoProtocols(t *testing.T) {
@ -939,16 +954,7 @@ func TestClientDynamicListenPortNoProtocols(t *testing.T) {
cl, err := NewClient(cfg)
require.NoError(t, err)
defer cl.Close()
assert.Nil(t, cl.ListenAddr())
}
func addClientPeer(t *Torrent, cl *Client) {
t.AddPeers([]Peer{
{
IP: missinggo.AddrIP(cl.ListenAddr()),
Port: missinggo.AddrPort(cl.ListenAddr()),
},
})
assert.Equal(t, 0, cl.LocalPort())
}
func totalConns(tts []*Torrent) (ret int) {
@ -978,7 +984,7 @@ func TestSetMaxEstablishedConn(t *testing.T) {
for _, tt := range tts {
for _, _tt := range tts {
// if tt != _tt {
addClientPeer(tt, _tt.cl)
tt.AddClientPeer(_tt.cl)
// }
}
}
@ -1048,10 +1054,7 @@ func TestMultipleTorrentsWithEncryption(t *testing.T) {
testutil.ExportStatusWriter(client, "c")
tr, err := client.AddMagnet(magnet1)
require.NoError(t, err)
tr.AddPeers([]Peer{{
IP: missinggo.AddrIP(server.ListenAddr()),
Port: missinggo.AddrPort(server.ListenAddr()),
}})
tr.AddClientPeer(server)
<-tr.GotInfo()
tr.DownloadAll()
client.WaitAll()

View File

@ -28,7 +28,9 @@ func main() {
cl.WriteStatus(w)
})
http.HandleFunc("/dht", func(w http.ResponseWriter, r *http.Request) {
cl.DHT().WriteStatus(w)
for _, ds := range cl.DhtServers() {
ds.WriteStatus(w)
}
})
wg := sync.WaitGroup{}
for _, arg := range args.Magnet {

View File

@ -13,7 +13,6 @@ import (
"github.com/anacrolix/torrent/iplist"
"github.com/anacrolix/dht"
"github.com/anacrolix/envpprof"
"github.com/anacrolix/tagflag"
"github.com/dustin/go-humanize"
@ -148,9 +147,6 @@ func main() {
log.SetFlags(log.LstdFlags | log.Lshortfile)
tagflag.Parse(&flags)
clientConfig := torrent.Config{
DHTConfig: dht.ServerConfig{
StartingNodes: dht.GlobalBootstrapAddrs,
},
Debug: flags.Debug,
Seed: flags.Seed,
}

View File

@ -15,7 +15,6 @@ import (
"bazil.org/fuse"
fusefs "bazil.org/fuse/fs"
"github.com/anacrolix/dht"
_ "github.com/anacrolix/envpprof"
"github.com/anacrolix/tagflag"
@ -92,9 +91,6 @@ func mainExitCode() int {
DisableTrackers: args.DisableTrackers,
ListenAddr: args.ListenAddr.String(),
NoUpload: true, // Ensure that downloads are responsive.
DHTConfig: dht.ServerConfig{
StartingNodes: dht.GlobalBootstrapAddrs,
},
})
if err != nil {
log.Print(err)

View File

@ -6,9 +6,9 @@ import (
"net/http"
"time"
"github.com/anacrolix/dht"
"golang.org/x/time/rate"
"github.com/anacrolix/dht"
"github.com/anacrolix/torrent/iplist"
"github.com/anacrolix/torrent/storage"
)
@ -38,11 +38,10 @@ type Config struct {
// Don't announce to trackers. This only leaves DHT to discover peers.
DisableTrackers bool `long:"disable-trackers"`
DisablePEX bool `long:"disable-pex"`
// Don't create a DHT.
NoDHT bool `long:"disable-dht"`
// Overrides the default DHT configuration.
DHTConfig dht.ServerConfig
// Don't create a DHT.
NoDHT bool `long:"disable-dht"`
DhtStartingNodes dht.StartingNodesGetter
// Never send chunks to peers.
NoUpload bool `long:"no-upload"`
// Disable uploading even when it isn't fair.
@ -148,6 +147,9 @@ func (cfg *Config) setDefaults() {
if cfg.HandshakesTimeout == 0 {
cfg.HandshakesTimeout = 20 * time.Second
}
if cfg.DhtStartingNodes == nil {
cfg.DhtStartingNodes = dht.GlobalBootstrapAddrs
}
}
type EncryptionPolicy struct {

View File

@ -13,6 +13,7 @@ import (
"sync"
"time"
"github.com/anacrolix/dht"
"github.com/anacrolix/log"
"github.com/anacrolix/missinggo"
@ -48,7 +49,6 @@ type connection struct {
headerEncrypted bool
cryptoMethod mse.CryptoMethod
Discovery peerSource
uTP bool
closed missinggo.Event
stats ConnStats
@ -179,12 +179,16 @@ func (cn *connection) connectionFlags() (ret string) {
c('e')
}
ret += string(cn.Discovery)
if cn.uTP {
if cn.utp() {
c('U')
}
return
}
func (cn *connection) utp() bool {
return strings.Contains(cn.remoteAddr().Network(), "utp")
}
// Inspired by https://trac.transmissionbt.com/wiki/PeerStatusText
func (cn *connection) statusFlags() (ret string) {
c := func(b byte) {
@ -1019,9 +1023,6 @@ func (c *connection) mainReadLoop() (err error) {
case pp.Extended:
err = c.onReadExtendedMsg(msg.ExtendedID, msg.ExtendedPayload)
case pp.Port:
if cl.dHT == nil {
break
}
pingAddr, err := net.ResolveUDPAddr("", c.remoteAddr().String())
if err != nil {
panic(err)
@ -1029,7 +1030,9 @@ func (c *connection) mainReadLoop() (err error) {
if msg.Port != 0 {
pingAddr.Port = int(msg.Port)
}
go cl.dHT.Ping(pingAddr, nil)
cl.eachDhtServer(func(s *dht.Server) {
go s.Ping(pingAddr, nil)
})
case pp.AllowedFast:
torrent.Add("allowed fasts received", 1)
log.Fmsg("peer allowed fast: %d", msg.Index).AddValues(c, debugLogValue).Log(c.t.logger)

View File

@ -197,13 +197,9 @@ func TestDownloadOnDemand(t *testing.T) {
require.NoError(t, err)
testutil.ExportStatusWriter(leecher, "l")
defer leecher.Close()
leecherTorrent, _ := leecher.AddTorrent(layout.Metainfo)
leecherTorrent.AddPeers([]torrent.Peer{
{
IP: missinggo.AddrIP(seeder.ListenAddr()),
Port: missinggo.AddrPort(seeder.ListenAddr()),
},
})
leecherTorrent, err := leecher.AddTorrent(layout.Metainfo)
require.NoError(t, err)
leecherTorrent.AddClientPeer(seeder)
fs := New(leecher)
defer fs.Destroy()
root, _ := fs.Root()

17
listen.go Normal file
View File

@ -0,0 +1,17 @@
package torrent
type peerNetworks struct {
tcp4, tcp6 bool
utp4, utp6 bool
}
func handleErr(h func(), fs ...func() error) error {
for _, f := range fs {
err := f()
if err != nil {
h()
return err
}
}
return nil
}

View File

@ -125,8 +125,10 @@ func addrCompactIP(addr net.Addr) (string, error) {
return string(ip.To16()), nil
}
func connIsIpv6(nc net.Conn) bool {
ra := nc.RemoteAddr()
func connIsIpv6(nc interface {
LocalAddr() net.Addr
}) bool {
ra := nc.LocalAddr()
rip := missinggo.AddrIP(ra)
return rip.To4() == nil && rip.To16() != nil
}

123
socket.go Normal file
View File

@ -0,0 +1,123 @@
package torrent
import (
"context"
"fmt"
"net"
"strconv"
"strings"
"github.com/anacrolix/missinggo"
)
type dialer interface {
dial(_ context.Context, addr string) (net.Conn, error)
}
type socket interface {
net.Listener
dialer
}
func listen(network, addr string) (socket, error) {
if isTcpNetwork(network) {
return listenTcp(network, addr)
} else if isUtpNetwork(network) {
return listenUtp(network, addr)
} else {
panic(fmt.Sprintf("unknown network %q", network))
}
}
func isTcpNetwork(s string) bool {
return strings.Contains(s, "tcp")
}
func isUtpNetwork(s string) bool {
return strings.Contains(s, "utp") || strings.Contains(s, "udp")
}
func listenTcp(network, address string) (s socket, err error) {
l, err := net.Listen(network, address)
if err != nil {
return
}
return tcpSocket{l}, nil
}
type tcpSocket struct {
net.Listener
}
func (me tcpSocket) dial(ctx context.Context, addr string) (net.Conn, error) {
return net.Dial(me.Addr().Network(), addr)
}
func setPort(addr string, port int) string {
host, _, err := net.SplitHostPort(addr)
if err != nil {
panic(err)
}
return net.JoinHostPort(host, strconv.FormatInt(int64(port), 10))
}
func listenAll(networks []string, addr string) ([]socket, error) {
if len(networks) == 0 {
return nil, nil
}
for {
ss, retry, err := listenAllRetry(networks, addr)
if !retry {
return ss, err
}
}
}
func listenAllRetry(networks []string, addr string) (ss []socket, retry bool, err error) {
_, port, err := missinggo.ParseHostPort(addr)
if err != nil {
err = fmt.Errorf("error parsing addr: %s", err)
return
}
ss = make([]socket, 1, len(networks))
ss[0], err = listen(networks[0], addr)
if err != nil {
return nil, false, fmt.Errorf("first listen: %s", err)
}
defer func() {
if err != nil || retry {
for _, s := range ss {
s.Close()
}
ss = nil
}
}()
restAddr := setPort(addr, missinggo.AddrPort(ss[0].Addr()))
for _, n := range networks[1:] {
s, err := listen(n, restAddr)
if err != nil {
return ss,
missinggo.IsAddrInUse(err) && port == 0,
fmt.Errorf("subsequent listen: %s", err)
}
ss = append(ss, s)
}
return
}
func listenUtp(network, addr string) (s socket, err error) {
us, err := NewUtpSocket(network, addr)
if err != nil {
return
}
return utpSocketSocket{us, network}, nil
}
type utpSocketSocket struct {
utpSocket
network string
}
func (me utpSocketSocket) dial(ctx context.Context, addr string) (net.Conn, error) {
return me.utpSocket.DialContext(ctx, me.network, addr)
}

View File

@ -1378,9 +1378,9 @@ func (t *Torrent) consumeDHTAnnounce(pvs <-chan dht.PeersValues) {
}
}
func (t *Torrent) announceDHT(impliedPort bool) (err error) {
func (t *Torrent) announceDHT(impliedPort bool, s *dht.Server) (err error) {
cl := t.cl
ps, err := cl.dHT.Announce(t.infoHash, cl.incomingPeerPort(), impliedPort)
ps, err := s.Announce(t.infoHash, cl.incomingPeerPort(), impliedPort)
if err != nil {
return
}
@ -1389,7 +1389,7 @@ func (t *Torrent) announceDHT(impliedPort bool) (err error) {
return
}
func (t *Torrent) dhtAnnouncer() {
func (t *Torrent) dhtAnnouncer(s *dht.Server) {
cl := t.cl
for {
select {
@ -1397,7 +1397,7 @@ func (t *Torrent) dhtAnnouncer() {
case <-t.closed.LockedChan(&cl.mu):
return
}
err := t.announceDHT(true)
err := t.announceDHT(true, s)
func() {
cl.mu.Lock()
defer cl.mu.Unlock()
@ -1724,3 +1724,15 @@ func (t *Torrent) initiateConn(peer Peer) {
t.halfOpen[addr] = peer
go t.cl.outgoingConnection(t, addr, peer.Source)
}
func (t *Torrent) AddClientPeer(cl *Client) {
t.AddPeers(func() (ps []Peer) {
for _, la := range cl.ListenAddrs() {
ps = append(ps, Peer{
IP: missinggo.AddrIP(la),
Port: missinggo.AddrPort(la),
})
}
return
}())
}

View File

@ -1,6 +1,7 @@
package torrent
import (
"fmt"
"net"
"os"
"path/filepath"
@ -176,7 +177,7 @@ func TestTorrentMetainfoIncompleteMetadata(t *testing.T) {
assert.Nil(t, tt.Metainfo().InfoBytes)
assert.False(t, tt.haveAllMetadataPieces())
nc, err := net.Dial("tcp", cl.ListenAddr().String())
nc, err := net.Dial("tcp", fmt.Sprintf(":%d", cl.LocalPort()))
require.NoError(t, err)
defer nc.Close()