From efecf88d3ce64f11f2242b9b53a7cf26c72169a2 Mon Sep 17 00:00:00 2001 From: Matt Joiner Date: Sat, 3 Feb 2018 15:09:38 +1100 Subject: [PATCH] Use flip buffering for connection writing --- client.go | 7 ++++--- connection.go | 20 +++++++++----------- connection_test.go | 14 ++++++-------- 3 files changed, 19 insertions(+), 22 deletions(-) diff --git a/client.go b/client.go index fbdef7e3..0dd785ab 100644 --- a/client.go +++ b/client.go @@ -2,6 +2,7 @@ package torrent import ( "bufio" + "bytes" "context" "crypto/rand" "errors" @@ -538,7 +539,7 @@ func (cl *Client) initiateConn(peer Peer, t *Torrent) { func (cl *Client) dialTCP(ctx context.Context, addr string) (c net.Conn, err error) { d := net.Dialer{ - // LocalAddr: cl.tcpListener.Addr(), + // LocalAddr: cl.tcpListener.Addr(), } c, err = d.DialContext(ctx, "tcp", addr) countDialResult(err) @@ -1232,11 +1233,11 @@ func (cl *Client) banPeerIP(ip net.IP) { func (cl *Client) newConnection(nc net.Conn) (c *connection) { c = &connection{ - conn: nc, - + conn: nc, Choked: true, PeerChoked: true, PeerMaxRequests: 250, + writeBuffer: new(bytes.Buffer), } c.writerCond.L = &cl.mu c.setRW(connStatsReadWriter{nc, &cl.mu, c}) diff --git a/connection.go b/connection.go index 73b1f4ce..5c1a6c84 100644 --- a/connection.go +++ b/connection.go @@ -93,7 +93,7 @@ type connection struct { pieceInclination []int pieceRequestOrder prioritybitmap.PriorityBitmap - writeBuffer bytes.Buffer + writeBuffer *bytes.Buffer uploadTimer *time.Timer writerCond sync.Cond } @@ -404,10 +404,9 @@ func (cn *connection) fillWriteBuffer(msg func(pp.Message) bool) { // connection is writable. func (cn *connection) writer(keepAliveTimeout time.Duration) { var ( - // buf bytes.Buffer - lastWrite time.Time = time.Now() + lastWrite time.Time = time.Now() + keepAliveTimer *time.Timer ) - var keepAliveTimer *time.Timer keepAliveTimer = time.AfterFunc(keepAliveTimeout, func() { cn.mu().Lock() defer cn.mu().Unlock() @@ -420,6 +419,7 @@ func (cn *connection) writer(keepAliveTimeout time.Duration) { defer cn.mu().Unlock() defer cn.Close() defer keepAliveTimer.Stop() + frontBuf := new(bytes.Buffer) for { if cn.closed.IsSet() { return @@ -440,12 +440,10 @@ func (cn *connection) writer(keepAliveTimeout time.Duration) { cn.writerCond.Wait() continue } - var buf bytes.Buffer - buf.Write(cn.writeBuffer.Bytes()) - cn.writeBuffer.Reset() + // Flip the buffers. + frontBuf, cn.writeBuffer = cn.writeBuffer, frontBuf cn.mu().Unlock() - // log.Printf("writing %d bytes", buf.Len()) - n, err := cn.w.Write(buf.Bytes()) + n, err := cn.w.Write(frontBuf.Bytes()) cn.mu().Lock() if n != 0 { lastWrite = time.Now() @@ -454,10 +452,10 @@ func (cn *connection) writer(keepAliveTimeout time.Duration) { if err != nil { return } - if n != buf.Len() { + if n != frontBuf.Len() { panic("short write") } - buf.Reset() + frontBuf.Reset() } } diff --git a/connection_test.go b/connection_test.go index 742d8a67..4d4c7465 100644 --- a/connection_test.go +++ b/connection_test.go @@ -20,14 +20,12 @@ import ( // Have that would potentially alter it. func TestSendBitfieldThenHave(t *testing.T) { r, w := io.Pipe() - c := &connection{ - t: &Torrent{ - cl: &Client{}, - }, - r: r, - w: w, - } - c.writerCond.L = &c.t.cl.mu + var cl Client + cl.initLogger() + c := cl.newConnection(nil) + c.setTorrent(cl.newTorrent(metainfo.Hash{}, nil)) + c.r = r + c.w = w go c.writer(time.Minute) c.mu().Lock() c.Bitfield([]bool{false, true, false})