Rework Client listeners

In particular, if the ListenAddr used a dynamic port ":0", and both TCP and uTP were enabled. If the TCP listen succeeded, and the uTP did not, the TCP listener was leaked, and another port number was not tried.
This commit is contained in:
Matt Joiner 2016-05-11 21:11:52 +10:00
parent 948552b282
commit dce3a7f675
3 changed files with 96 additions and 49 deletions

141
client.go
View File

@ -58,9 +58,11 @@ func (cl *Client) queueFirstHash(t *Torrent, piece int) {
// Clients contain zero or more Torrents. A Client manages a blocklist, the // Clients contain zero or more Torrents. A Client manages a blocklist, the
// TCP/UDP protocol ports, and DHT as desired. // TCP/UDP protocol ports, and DHT as desired.
type Client struct { type Client struct {
halfOpenLimit int halfOpenLimit int
peerID [20]byte peerID [20]byte
listeners []net.Listener // The net.Addr.String part that should be common to all active listeners.
listenAddr string
tcpListener net.Listener
utpSock *utp.Socket utpSock *utp.Socket
dHT *dht.Server dHT *dht.Server
ipBlockList iplist.Ranger ipBlockList iplist.Ranger
@ -99,12 +101,17 @@ func (cl *Client) PeerID() string {
return string(cl.peerID[:]) return string(cl.peerID[:])
} }
func (cl *Client) ListenAddr() (addr net.Addr) { type torrentAddr string
for _, l := range cl.listeners {
addr = l.Addr() func (me torrentAddr) Network() string { return "" }
break
func (me torrentAddr) String() string { return string(me) }
func (cl *Client) ListenAddr() net.Addr {
if cl.listenAddr == "" {
return nil
} }
return return torrentAddr(cl.listenAddr)
} }
type hashSorter struct { type hashSorter struct {
@ -176,6 +183,59 @@ func (cl *Client) WriteStatus(_w io.Writer) {
} }
} }
func listenUTP(networkSuffix, addr string) (*utp.Socket, error) {
return utp.NewSocket("udp"+networkSuffix, addr)
}
func listenTCP(networkSuffix, addr string) (net.Listener, error) {
return net.Listen("tcp"+networkSuffix, addr)
}
func listenBothSameDynamicPort(networkSuffix, host string) (tcpL net.Listener, utpSock *utp.Socket, listenedAddr string, err error) {
for {
tcpL, err = listenTCP(networkSuffix, net.JoinHostPort(host, "0"))
if err != nil {
return
}
listenedAddr = tcpL.Addr().String()
utpSock, err = listenUTP(networkSuffix, listenedAddr)
if err == nil {
return
}
tcpL.Close()
if !strings.Contains(err.Error(), "address already in use") {
return
}
}
}
func listen(tcp, utp bool, networkSuffix, addr string) (tcpL net.Listener, utpSock *utp.Socket, listenedAddr string, err error) {
if addr == "" {
addr = ":50007"
}
host, port, err := missinggo.ParseHostPort(addr)
if err != nil {
return
}
if tcp && utp && port == 0 {
return listenBothSameDynamicPort(networkSuffix, host)
}
listenedAddr = addr
if tcp {
tcpL, err = listenTCP(networkSuffix, addr)
if err != nil {
return
}
}
if utp {
utpSock, err = listenUTP(networkSuffix, addr)
if err != nil && tcp {
tcpL.Close()
}
}
return
}
// Creates a new client. // Creates a new client.
func NewClient(cfg *Config) (cl *Client, err error) { func NewClient(cfg *Config) (cl *Client, err error) {
if cfg == nil { if cfg == nil {
@ -213,43 +273,24 @@ func NewClient(cfg *Config) (cl *Client, err error) {
} }
} }
// Returns the laddr string to listen on for the next Listen call. cl.tcpListener, cl.utpSock, cl.listenAddr, err = listen(
listenAddr := func() string { !cl.config.DisableTCP,
if addr := cl.ListenAddr(); addr != nil { !cl.config.DisableUTP,
return addr.String() func() string {
}
if cfg.ListenAddr == "" {
return ":50007"
}
return cfg.ListenAddr
}
if !cl.config.DisableTCP {
var l net.Listener
l, err = net.Listen(func() string {
if cl.config.DisableIPv6 { if cl.config.DisableIPv6 {
return "tcp4" return "4"
} else { } else {
return "tcp" return ""
} }
}(), listenAddr()) }(),
if err != nil { cl.config.ListenAddr)
return if err != nil {
} return
cl.listeners = append(cl.listeners, l)
go cl.acceptConnections(l, false)
} }
if !cl.config.DisableUTP { if cl.tcpListener != nil {
cl.utpSock, err = utp.NewSocket(func() string { go cl.acceptConnections(cl.tcpListener, false)
if cl.config.DisableIPv6 { }
return "udp4" if cl.utpSock != nil {
} else {
return "udp"
}
}(), listenAddr())
if err != nil {
return
}
cl.listeners = append(cl.listeners, cl.utpSock)
go cl.acceptConnections(cl.utpSock, true) go cl.acceptConnections(cl.utpSock, true)
} }
if !cfg.NoDHT { if !cfg.NoDHT {
@ -258,7 +299,7 @@ func NewClient(cfg *Config) (cl *Client, err error) {
dhtCfg.IPBlocklist = cl.ipBlockList dhtCfg.IPBlocklist = cl.ipBlockList
} }
if dhtCfg.Addr == "" { if dhtCfg.Addr == "" {
dhtCfg.Addr = listenAddr() dhtCfg.Addr = cl.listenAddr
} }
if dhtCfg.Conn == nil && cl.utpSock != nil { if dhtCfg.Conn == nil && cl.utpSock != nil {
dhtCfg.Conn = cl.utpSock dhtCfg.Conn = cl.utpSock
@ -281,8 +322,11 @@ func (cl *Client) Close() {
if cl.dHT != nil { if cl.dHT != nil {
cl.dHT.Close() cl.dHT.Close()
} }
for _, l := range cl.listeners { if cl.utpSock != nil {
l.Close() cl.utpSock.Close()
}
if cl.tcpListener != nil {
cl.tcpListener.Close()
} }
for _, t := range cl.torrents { for _, t := range cl.torrents {
t.close() t.close()
@ -593,11 +637,14 @@ func (cl *Client) outgoingConnection(t *Torrent, addr string, ps peerSource) {
// The port number for incoming peer connections. 0 if the client isn't // The port number for incoming peer connections. 0 if the client isn't
// listening. // listening.
func (cl *Client) incomingPeerPort() int { func (cl *Client) incomingPeerPort() int {
listenAddr := cl.ListenAddr() if cl.listenAddr == "" {
if listenAddr == nil {
return 0 return 0
} }
return missinggo.AddrPort(listenAddr) _, port, err := missinggo.ParseHostPort(cl.listenAddr)
if err != nil {
panic(err)
}
return port
} }
// Convert a net.Addr to its compact IP representation. Either 4 or 16 bytes // Convert a net.Addr to its compact IP representation. Either 4 or 16 bytes

View File

@ -485,7 +485,8 @@ func (p badStoragePiece) ReadAt(b []byte, off int64) (n int, err error) {
func TestCompletedPieceWrongSize(t *testing.T) { func TestCompletedPieceWrongSize(t *testing.T) {
cfg := TestingConfig cfg := TestingConfig
cfg.DefaultStorage = badStorage{} cfg.DefaultStorage = badStorage{}
cl, _ := NewClient(&cfg) cl, err := NewClient(&cfg)
require.NoError(t, err)
defer cl.Close() defer cl.Close()
ie := metainfo.InfoEx{ ie := metainfo.InfoEx{
Info: metainfo.Info{ Info: metainfo.Info{

View File

@ -92,7 +92,6 @@ func TestUnmountWedged(t *testing.T) {
DataDir: filepath.Join(layout.BaseDir, "incomplete"), DataDir: filepath.Join(layout.BaseDir, "incomplete"),
DisableTrackers: true, DisableTrackers: true,
NoDHT: true, NoDHT: true,
ListenAddr: "redonk",
DisableTCP: true, DisableTCP: true,
DisableUTP: true, DisableUTP: true,
}) })