From 203da0aab0e1e758acd7fa750d7ecb4d4eb2e4a9 Mon Sep 17 00:00:00 2001 From: Matt Joiner Date: Fri, 13 Mar 2015 06:21:13 +1100 Subject: [PATCH] Tidy up extension bytes handling; don't close conn from handshake writer; force protocol encryption for now --- client.go | 66 ++++++++++++++++++++++++++++++++++++++------------- connection.go | 24 +++++++++++++------ torrent.go | 2 +- 3 files changed, 68 insertions(+), 24 deletions(-) diff --git a/client.go b/client.go index 177a33a6..396205ae 100644 --- a/client.go +++ b/client.go @@ -38,6 +38,8 @@ import ( "syscall" "time" + "bitbucket.org/anacrolix/go.torrent/mse" + "bitbucket.org/anacrolix/go.torrent/data" filePkg "bitbucket.org/anacrolix/go.torrent/data/file" "bitbucket.org/anacrolix/go.torrent/dht" @@ -77,8 +79,8 @@ const ( // // Extension protocol: http://www.bittorrent.org/beps/bep_0010.html // DHT: http://www.bittorrent.org/beps/bep_0005.html - // Fast Extension: http://bittorrent.org/beps/bep_0006.html - extensionBytes = "\x00\x00\x00\x00\x00\x10\x00\x05" + // Fast Extension: http://bittorrent.org/beps/bep_0006.html ([7]|=4) + defaultExtensionBytes = "\x00\x00\x00\x00\x00\x10\x00\x05" socketsPerTorrent = 40 torrentPeersHighWater = 200 @@ -87,7 +89,7 @@ const ( // Limit how long handshake can take. This is to reduce the lingering // impact of a few bad apples. 4s loses 1% of successful handshakes that // are obtained with 60s timeout, and 5% of unsuccessful handshakes. - handshakeTimeout = 4 * time.Second + handshakeTimeout = 45 * time.Second pruneInterval = 10 * time.Second ) @@ -128,6 +130,7 @@ type Client struct { _configDir string config Config pruneTimer *time.Timer + extensionBytes peerExtensionBytes torrentDataOpener TorrentDataOpener @@ -469,6 +472,7 @@ func NewClient(cfg *Config) (cl *Client, err error) { quit: make(chan struct{}), torrents: make(map[InfoHash]*torrent), } + CopyExact(&cl.extensionBytes, defaultExtensionBytes) cl.event.L = &cl.mu if cfg.TorrentDataOpener != nil { cl.torrentDataOpener = cfg.TorrentDataOpener @@ -777,12 +781,11 @@ func addrCompactIP(addr net.Addr) (string, error) { return string(ip.To16()), nil } -func handshakeWriter(w io.WriteCloser, bb <-chan []byte, done chan<- error) { +func handshakeWriter(w io.Writer, bb <-chan []byte, done chan<- error) { var err error for b := range bb { _, err = w.Write(b) if err != nil { - w.Close() break } } @@ -794,6 +797,18 @@ type ( peerID [20]byte ) +func (me *peerExtensionBytes) SupportsExtended() bool { + return me[5]&0x10 != 0 +} + +func (me *peerExtensionBytes) SupportsDHT() bool { + return me[7]&0x01 != 0 +} + +func (me *peerExtensionBytes) SupportsFast() bool { + return me[7]&0x04 != 0 +} + type handshakeResult struct { peerExtensionBytes peerID @@ -804,7 +819,7 @@ type handshakeResult struct { // peer initiated the connection. Returns ok if the handshake was successful, // and err if there was an unexpected condition other than the peer simply // abandoning the handshake. -func handshake(sock io.ReadWriteCloser, ih *InfoHash, peerID [20]byte) (res handshakeResult, ok bool, err error) { +func handshake(sock io.ReadWriter, ih *InfoHash, peerID [20]byte, extensions peerExtensionBytes) (res handshakeResult, ok bool, err error) { // Bytes to be sent to the peer. Should never block the sender. postCh := make(chan []byte, 4) // A single error value sent when the writer completes. @@ -836,7 +851,7 @@ func handshake(sock io.ReadWriteCloser, ih *InfoHash, peerID [20]byte) (res hand } post([]byte(pp.Protocol)) - post([]byte(extensionBytes)) + post(extensions[:]) if ih != nil { // We already know what we want. post(ih[:]) post(peerID[:]) @@ -907,13 +922,28 @@ func (me *Client) runConnection(sock net.Conn, torrent *torrent, discovery peerS me.mu.Lock() me.handshaking++ me.mu.Unlock() - hsRes, ok, err := handshake(sock, func() *InfoHash { + var rw io.ReadWriter = sock + if torrent == nil { + rw, err = mse.ReceiveHandshake(sock, func() (ret [][]byte) { + for ih := range me.torrents { + ret = append(ret, ih[:]) + } + return + }()) + } else { + rw, err = mse.InitiateHandshake(sock, torrent.InfoHash[:]) + } + if err != nil { + err = fmt.Errorf("error during MSE handshake: %s", err) + return + } + hsRes, ok, err := handshake(rw, func() *InfoHash { if torrent == nil { return nil } else { return &torrent.InfoHash } - }(), me.peerID) + }(), me.peerID, me.extensionBytes) me.mu.Lock() defer me.mu.Unlock() if me.handshaking == 0 { @@ -936,13 +966,13 @@ func (me *Client) runConnection(sock net.Conn, torrent *torrent, discovery peerS } sock.SetWriteDeadline(time.Time{}) sock = peerConn{sock} - conn := newConnection(sock, hsRes.peerExtensionBytes, hsRes.peerID, uTP) + conn := newConnection(sock, hsRes.peerExtensionBytes, hsRes.peerID, uTP, rw) defer conn.Close() conn.Discovery = discovery if !me.addConnection(torrent, conn) { return } - if conn.PeerExtensionBytes[5]&0x10 != 0 { + if conn.PeerExtensionBytes.SupportsExtended() && me.extensionBytes.SupportsExtended() { conn.Post(pp.Message{ Type: pp.Extended, ExtendedID: pp.HandshakeExtendedID, @@ -969,7 +999,7 @@ func (me *Client) runConnection(sock net.Conn, torrent *torrent, discovery peerS if p := me.incomingPeerPort(); p != 0 { d["p"] = p } - yourip, err := addrCompactIP(conn.Socket.RemoteAddr()) + yourip, err := addrCompactIP(conn.remoteAddr()) if err != nil { log.Printf("error calculating yourip field value in extension handshake: %s", err) } else { @@ -989,8 +1019,12 @@ func (me *Client) runConnection(sock net.Conn, torrent *torrent, discovery peerS Type: pp.Bitfield, Bitfield: torrent.bitfield(), }) + } else if me.extensionBytes.SupportsFast() && conn.PeerExtensionBytes.SupportsFast() { + conn.Post(pp.Message{ + Type: pp.HaveNone, + }) } - if conn.PeerExtensionBytes[7]&0x01 != 0 && me.dHT != nil { + if conn.PeerExtensionBytes.SupportsDHT() && me.extensionBytes.SupportsDHT() && me.dHT != nil { conn.Post(pp.Message{ Type: pp.Port, Port: uint16(AddrPort(me.dHT.LocalAddr())), @@ -998,7 +1032,6 @@ func (me *Client) runConnection(sock net.Conn, torrent *torrent, discovery peerS } if torrent.haveInfo() { torrent.initRequestOrdering(conn) - me.replenishConnRequests(torrent, conn) } err = me.connectionLoop(torrent, conn) if err != nil { @@ -1189,7 +1222,7 @@ func (cl *Client) peerHasAll(t *torrent, cn *connection) { // and exit. func (me *Client) connectionLoop(t *torrent, c *connection) error { decoder := pp.Decoder{ - R: bufio.NewReader(c.Socket), + R: bufio.NewReader(c.rw), MaxLength: 256 * 1024, } for { @@ -1222,6 +1255,7 @@ func (me *Client) connectionLoop(t *torrent, c *connection) error { me.replenishConnRequests(t, c) case pp.Reject: me.connDeleteRequest(t, c, newRequest(msg.Index, msg.Begin, msg.Length)) + me.replenishConnRequests(t, c) case pp.Unchoke: c.PeerChoked = false me.peerUnchoked(t, c) @@ -1409,7 +1443,7 @@ func (me *Client) connectionLoop(t *torrent, c *connection) error { if me.dHT == nil { break } - pingAddr, err := net.ResolveUDPAddr("", c.Socket.RemoteAddr().String()) + pingAddr, err := net.ResolveUDPAddr("", c.remoteAddr().String()) if err != nil { panic(err) } diff --git a/connection.go b/connection.go index afb12c4b..75a1dba3 100644 --- a/connection.go +++ b/connection.go @@ -28,7 +28,8 @@ const ( // Maintains the state of a connection with a peer. type connection struct { - Socket net.Conn + conn net.Conn + rw io.ReadWriter // The real slim shady Discovery peerSource uTP bool closing chan struct{} @@ -68,10 +69,11 @@ type connection struct { PeerClientName string } -func newConnection(sock net.Conn, peb peerExtensionBytes, peerID [20]byte, uTP bool) (c *connection) { +func newConnection(sock net.Conn, peb peerExtensionBytes, peerID [20]byte, uTP bool, rw io.ReadWriter) (c *connection) { c = &connection{ - Socket: sock, - uTP: uTP, + conn: sock, + rw: rw, + uTP: uTP, Choked: true, PeerChoked: true, @@ -90,6 +92,14 @@ func newConnection(sock net.Conn, peb peerExtensionBytes, peerID [20]byte, uTP b return } +func (cn *connection) remoteAddr() net.Addr { + return cn.conn.RemoteAddr() +} + +func (cn *connection) localAddr() net.Addr { + return cn.conn.LocalAddr() +} + func (cn *connection) pendPiece(piece int, priority piecePriority) { if priority == piecePriorityNone { cn.pieceRequestOrder.DeletePiece(piece) @@ -184,7 +194,7 @@ func eventAgeString(t time.Time) string { func (cn *connection) WriteStatus(w io.Writer, t *torrent) { // \t isn't preserved in
 blocks?
-	fmt.Fprintf(w, "%s\n    %s completed, good chunks: %d/%d reqs: %d-%d, last msg: %s, connected: %s, last useful chunk: %s, flags: ", fmt.Sprintf("%q: %s-%s", cn.PeerID, cn.Socket.LocalAddr(), cn.Socket.RemoteAddr()), cn.completedString(t), cn.UsefulChunksReceived, cn.UnwantedChunksReceived+cn.UsefulChunksReceived, len(cn.Requests), len(cn.PeerRequests), eventAgeString(cn.lastMessageReceived), eventAgeString(cn.completedHandshake), eventAgeString(cn.lastUsefulChunkReceived))
+	fmt.Fprintf(w, "%s\n    %s completed, good chunks: %d/%d reqs: %d-%d, last msg: %s, connected: %s, last useful chunk: %s, flags: ", fmt.Sprintf("%q: %s-%s", cn.PeerID, cn.localAddr(), cn.remoteAddr()), cn.completedString(t), cn.UsefulChunksReceived, cn.UnwantedChunksReceived+cn.UsefulChunksReceived, len(cn.Requests), len(cn.PeerRequests), eventAgeString(cn.lastMessageReceived), eventAgeString(cn.completedHandshake), eventAgeString(cn.lastUsefulChunkReceived))
 	c := func(b byte) {
 		fmt.Fprintf(w, "%c", b)
 	}
@@ -224,7 +234,7 @@ func (c *connection) Close() {
 	}
 	close(c.closing)
 	// TODO: This call blocks sometimes, why?
-	go c.Socket.Close()
+	go c.conn.Close()
 }
 
 func (c *connection) PeerHasPiece(piece int) bool {
@@ -347,7 +357,7 @@ func (c *connection) SetInterested(interested bool) {
 // Writes buffers to the socket from the write channel.
 func (conn *connection) writer() {
 	// Reduce write syscalls.
-	buf := bufio.NewWriterSize(conn.Socket, 0x8000) // 32 KiB
+	buf := bufio.NewWriterSize(conn.rw, 0x8000) // 32 KiB
 	// Receives when buf is not empty.
 	notEmpty := make(chan struct{}, 1)
 	for {
diff --git a/torrent.go b/torrent.go
index 1f5660ab..9818f131 100644
--- a/torrent.go
+++ b/torrent.go
@@ -200,7 +200,7 @@ func (t *torrent) addrActive(addr string) bool {
 		return true
 	}
 	for _, c := range t.Conns {
-		if c.Socket.RemoteAddr().String() == addr {
+		if c.remoteAddr().String() == addr {
 			return true
 		}
 	}