Add UTP support, disable TCP for now. DHT moves to another port

This commit is contained in:
Matt Joiner 2014-11-16 13:29:31 -06:00
parent eeae2d3285
commit 963918ac90
3 changed files with 75 additions and 22 deletions

6
NOTES Normal file
View File

@ -0,0 +1,6 @@
Shared DHT/UTP/UDP-tracker socket dispatching in Transmission is at
https://trac.transmissionbt.com/browser/trunk/libtransmission/tr-udp.c
event_callback(). Currently I don't do this because github.com/h2so5/utp does
not support UTP sockets backed by a socket out of its control. Also I only
make client requests for UDP-trackers, so no shared socket is required there
unless I want to imply the client port.

View File

@ -29,22 +29,23 @@ import (
mathRand "math/rand" mathRand "math/rand"
"net" "net"
"os" "os"
"strconv"
"strings" "strings"
"sync" "sync"
"syscall" "syscall"
"time" "time"
"bitbucket.org/anacrolix/go.torrent/util/levelmu" "github.com/h2so5/utp"
"bitbucket.org/anacrolix/go.torrent/dht"
. "bitbucket.org/anacrolix/go.torrent/util"
"github.com/anacrolix/libtorgo/metainfo" "github.com/anacrolix/libtorgo/metainfo"
"github.com/nsf/libtorgo/bencode" "github.com/nsf/libtorgo/bencode"
"bitbucket.org/anacrolix/go.torrent/dht"
pp "bitbucket.org/anacrolix/go.torrent/peer_protocol" pp "bitbucket.org/anacrolix/go.torrent/peer_protocol"
"bitbucket.org/anacrolix/go.torrent/tracker" "bitbucket.org/anacrolix/go.torrent/tracker"
_ "bitbucket.org/anacrolix/go.torrent/tracker/udp" _ "bitbucket.org/anacrolix/go.torrent/tracker/udp"
. "bitbucket.org/anacrolix/go.torrent/util"
"bitbucket.org/anacrolix/go.torrent/util/levelmu"
) )
var ( var (
@ -117,7 +118,7 @@ type Client struct {
dataDir string dataDir string
halfOpenLimit int halfOpenLimit int
peerID [20]byte peerID [20]byte
listener net.Listener listeners []net.Listener
disableTrackers bool disableTrackers bool
downloadStrategy DownloadStrategy downloadStrategy DownloadStrategy
dHT *dht.Server dHT *dht.Server
@ -153,6 +154,8 @@ func (cl *Client) WriteStatus(w io.Writer) {
if cl.dHT != nil { if cl.dHT != nil {
fmt.Fprintf(w, "DHT nodes: %d\n", cl.dHT.NumNodes()) fmt.Fprintf(w, "DHT nodes: %d\n", cl.dHT.NumNodes())
fmt.Fprintf(w, "DHT Server ID: %x\n", cl.dHT.IDString()) fmt.Fprintf(w, "DHT Server ID: %x\n", cl.dHT.IDString())
fmt.Fprintf(w, "DHT port: %d\n", addrPort(cl.dHT.LocalAddr()))
fmt.Fprintf(w, "DHT announces: %d\n", cl.dHT.NumConfirmedAnnounces)
} }
cl.downloadStrategy.WriteStatus(w) cl.downloadStrategy.WriteStatus(w)
fmt.Fprintln(w) fmt.Fprintln(w)
@ -225,6 +228,19 @@ func (cl *Client) TorrentReadAt(ih InfoHash, off int64, p []byte) (n int, err er
return t.Data.ReadAt(p, off) return t.Data.ReadAt(p, off)
} }
func dhtAddr(listen net.Addr) (s string, err error) {
host, port, err := net.SplitHostPort(listen.String())
if err != nil {
return
}
i64, err := strconv.ParseInt(port, 0, 0)
if err != nil {
return
}
s = net.JoinHostPort(host, strconv.FormatInt(i64+1, 10))
return
}
func NewClient(cfg *Config) (cl *Client, err error) { func NewClient(cfg *Config) (cl *Client, err error) {
if cfg == nil { if cfg == nil {
cfg = &Config{} cfg = &Config{}
@ -255,17 +271,38 @@ func NewClient(cfg *Config) (cl *Client, err error) {
cl.downloadStrategy = &DefaultDownloadStrategy{} cl.downloadStrategy = &DefaultDownloadStrategy{}
} }
cl.listener, err = net.Listen("tcp", cfg.ListenAddr) // Returns the laddr string to listen on for the next Listen call.
if err != nil { listenAddr := func() string {
return if addr := cl.ListenAddr(); addr != nil {
return addr.String()
}
return cfg.ListenAddr
} }
if cl.listener != nil { var l net.Listener
go cl.acceptConnections() if false {
l, err = net.Listen("tcp", listenAddr())
if err != nil {
return
}
cl.listeners = append(cl.listeners, l)
go cl.acceptConnections(l, false)
}
if true {
l, err = utp.Listen("utp", listenAddr())
if err != nil {
return
}
cl.listeners = append(cl.listeners, l)
go cl.acceptConnections(l, true)
} }
if !cfg.NoDHT { if !cfg.NoDHT {
var dhtAddr_ string
dhtAddr_, err = dhtAddr(cl.ListenAddr())
if err != nil {
return
}
cl.dHT, err = dht.NewServer(&dht.ServerConfig{ cl.dHT, err = dht.NewServer(&dht.ServerConfig{
Addr: cfg.ListenAddr, Addr: dhtAddr_,
}) })
if err != nil { if err != nil {
return return
@ -296,11 +333,11 @@ func (me *Client) Stop() {
me.mu.Unlock() me.mu.Unlock()
} }
func (cl *Client) acceptConnections() { func (cl *Client) acceptConnections(l net.Listener, utp bool) {
for { for {
// We accept all connections immediately, because we don't what // We accept all connections immediately, because we don't what
// torrent they're for. // torrent they're for.
conn, err := cl.listener.Accept() conn, err := l.Accept()
select { select {
case <-cl.quit: case <-cl.quit:
if conn != nil { if conn != nil {
@ -314,7 +351,7 @@ func (cl *Client) acceptConnections() {
return return
} }
go func() { go func() {
if err := cl.runConnection(conn, nil, peerSourceIncoming); err != nil { if err := cl.runConnection(conn, nil, peerSourceIncoming, utp); err != nil {
log.Print(err) log.Print(err)
} }
}() }()
@ -332,7 +369,7 @@ func (me *Client) torrent(ih InfoHash) *torrent {
// Start the process of connecting to the given peer for the given torrent if // Start the process of connecting to the given peer for the given torrent if
// appropriate. // appropriate.
func (me *Client) initiateConn(peer Peer, torrent *torrent) { func (me *Client) initiateConn(peer Peer, t *torrent) {
if peer.Id == me.peerID { if peer.Id == me.peerID {
return return
} }
@ -353,7 +390,11 @@ func (me *Client) initiateConn(peer Peer, torrent *torrent) {
// "address in use" error. It seems it's not possible to dial out from // "address in use" error. It seems it's not possible to dial out from
// this address so that peers associate our local address with our // this address so that peers associate our local address with our
// listen address. // listen address.
conn, err := net.DialTimeout(addr.Network(), addr.String(), dialTimeout) if false {
conn, err := net.DialTimeout("tcp", addr, dialTimeout)
} else {
conn, err := (&utp.Dialer{Timeout: dialTimeout}).Dial("utp", addr)
}
// Whether or not the connection attempt succeeds, the half open // Whether or not the connection attempt succeeds, the half open
// counter should be decremented, and new connection attempts made. // counter should be decremented, and new connection attempts made.
@ -381,7 +422,7 @@ func (me *Client) initiateConn(peer Peer, torrent *torrent) {
return return
} }
// log.Printf("connected to %s", conn.RemoteAddr()) // log.Printf("connected to %s", conn.RemoteAddr())
err = me.runConnection(conn, torrent, peer.Source) err = me.runConnection(conn, t, peer.Source, true)
if err != nil { if err != nil {
log.Print(err) log.Print(err)
} }
@ -518,7 +559,7 @@ func (pc peerConn) Read(b []byte) (n int, err error) {
return return
} }
func (me *Client) runConnection(sock net.Conn, torrent *torrent, discovery peerSource) (err error) { func (me *Client) runConnection(sock net.Conn, torrent *torrent, discovery peerSource, uTP bool) (err error) {
if tcpConn, ok := sock.(*net.TCPConn); ok { if tcpConn, ok := sock.(*net.TCPConn); ok {
tcpConn.SetLinger(0) tcpConn.SetLinger(0)
} }
@ -554,7 +595,7 @@ func (me *Client) runConnection(sock net.Conn, torrent *torrent, discovery peerS
} }
sock.SetWriteDeadline(time.Time{}) sock.SetWriteDeadline(time.Time{})
sock = peerConn{sock} sock = peerConn{sock}
conn := newConnection(sock, hsRes.peerExtensionBytes, hsRes.peerID) conn := newConnection(sock, hsRes.peerExtensionBytes, hsRes.peerID, uTP)
defer conn.Close() defer conn.Close()
conn.Discovery = discovery conn.Discovery = discovery
if !me.addConnection(torrent, conn) { if !me.addConnection(torrent, conn) {

View File

@ -28,6 +28,7 @@ const (
type connection struct { type connection struct {
Socket net.Conn Socket net.Conn
Discovery peerSource Discovery peerSource
uTP bool
closing chan struct{} closing chan struct{}
mu sync.Mutex // Only for closing. mu sync.Mutex // Only for closing.
post chan pp.Message post chan pp.Message
@ -59,9 +60,11 @@ type connection struct {
PeerClientName string PeerClientName string
} }
func newConnection(sock net.Conn, peb peerExtensionBytes, peerID [20]byte) (c *connection) { func newConnection(sock net.Conn, peb peerExtensionBytes, peerID [20]byte, uTP bool) (c *connection) {
c = &connection{ c = &connection{
Socket: sock, Socket: sock,
uTP: uTP,
Choked: true, Choked: true,
PeerChoked: true, PeerChoked: true,
PeerMaxRequests: 250, PeerMaxRequests: 250,
@ -156,6 +159,9 @@ func (cn *connection) WriteStatus(w io.Writer) {
if cn.Discovery != 0 { if cn.Discovery != 0 {
c(byte(cn.Discovery)) c(byte(cn.Discovery))
} }
if cn.uTP {
c('T')
}
fmt.Fprintln(w) fmt.Fprintln(w)
} }