Use flip buffering for connection writing

This commit is contained in:
Matt Joiner 2018-02-03 15:09:38 +11:00
parent 86aabb081c
commit efecf88d3c
3 changed files with 19 additions and 22 deletions

View File

@ -2,6 +2,7 @@ package torrent
import ( import (
"bufio" "bufio"
"bytes"
"context" "context"
"crypto/rand" "crypto/rand"
"errors" "errors"
@ -538,7 +539,7 @@ func (cl *Client) initiateConn(peer Peer, t *Torrent) {
func (cl *Client) dialTCP(ctx context.Context, addr string) (c net.Conn, err error) { func (cl *Client) dialTCP(ctx context.Context, addr string) (c net.Conn, err error) {
d := net.Dialer{ d := net.Dialer{
// LocalAddr: cl.tcpListener.Addr(), // LocalAddr: cl.tcpListener.Addr(),
} }
c, err = d.DialContext(ctx, "tcp", addr) c, err = d.DialContext(ctx, "tcp", addr)
countDialResult(err) countDialResult(err)
@ -1232,11 +1233,11 @@ func (cl *Client) banPeerIP(ip net.IP) {
func (cl *Client) newConnection(nc net.Conn) (c *connection) { func (cl *Client) newConnection(nc net.Conn) (c *connection) {
c = &connection{ c = &connection{
conn: nc, conn: nc,
Choked: true, Choked: true,
PeerChoked: true, PeerChoked: true,
PeerMaxRequests: 250, PeerMaxRequests: 250,
writeBuffer: new(bytes.Buffer),
} }
c.writerCond.L = &cl.mu c.writerCond.L = &cl.mu
c.setRW(connStatsReadWriter{nc, &cl.mu, c}) c.setRW(connStatsReadWriter{nc, &cl.mu, c})

View File

@ -93,7 +93,7 @@ type connection struct {
pieceInclination []int pieceInclination []int
pieceRequestOrder prioritybitmap.PriorityBitmap pieceRequestOrder prioritybitmap.PriorityBitmap
writeBuffer bytes.Buffer writeBuffer *bytes.Buffer
uploadTimer *time.Timer uploadTimer *time.Timer
writerCond sync.Cond writerCond sync.Cond
} }
@ -404,10 +404,9 @@ func (cn *connection) fillWriteBuffer(msg func(pp.Message) bool) {
// connection is writable. // connection is writable.
func (cn *connection) writer(keepAliveTimeout time.Duration) { func (cn *connection) writer(keepAliveTimeout time.Duration) {
var ( var (
// buf bytes.Buffer lastWrite time.Time = time.Now()
lastWrite time.Time = time.Now() keepAliveTimer *time.Timer
) )
var keepAliveTimer *time.Timer
keepAliveTimer = time.AfterFunc(keepAliveTimeout, func() { keepAliveTimer = time.AfterFunc(keepAliveTimeout, func() {
cn.mu().Lock() cn.mu().Lock()
defer cn.mu().Unlock() defer cn.mu().Unlock()
@ -420,6 +419,7 @@ func (cn *connection) writer(keepAliveTimeout time.Duration) {
defer cn.mu().Unlock() defer cn.mu().Unlock()
defer cn.Close() defer cn.Close()
defer keepAliveTimer.Stop() defer keepAliveTimer.Stop()
frontBuf := new(bytes.Buffer)
for { for {
if cn.closed.IsSet() { if cn.closed.IsSet() {
return return
@ -440,12 +440,10 @@ func (cn *connection) writer(keepAliveTimeout time.Duration) {
cn.writerCond.Wait() cn.writerCond.Wait()
continue continue
} }
var buf bytes.Buffer // Flip the buffers.
buf.Write(cn.writeBuffer.Bytes()) frontBuf, cn.writeBuffer = cn.writeBuffer, frontBuf
cn.writeBuffer.Reset()
cn.mu().Unlock() cn.mu().Unlock()
// log.Printf("writing %d bytes", buf.Len()) n, err := cn.w.Write(frontBuf.Bytes())
n, err := cn.w.Write(buf.Bytes())
cn.mu().Lock() cn.mu().Lock()
if n != 0 { if n != 0 {
lastWrite = time.Now() lastWrite = time.Now()
@ -454,10 +452,10 @@ func (cn *connection) writer(keepAliveTimeout time.Duration) {
if err != nil { if err != nil {
return return
} }
if n != buf.Len() { if n != frontBuf.Len() {
panic("short write") panic("short write")
} }
buf.Reset() frontBuf.Reset()
} }
} }

View File

@ -20,14 +20,12 @@ import (
// Have that would potentially alter it. // Have that would potentially alter it.
func TestSendBitfieldThenHave(t *testing.T) { func TestSendBitfieldThenHave(t *testing.T) {
r, w := io.Pipe() r, w := io.Pipe()
c := &connection{ var cl Client
t: &Torrent{ cl.initLogger()
cl: &Client{}, c := cl.newConnection(nil)
}, c.setTorrent(cl.newTorrent(metainfo.Hash{}, nil))
r: r, c.r = r
w: w, c.w = w
}
c.writerCond.L = &c.t.cl.mu
go c.writer(time.Minute) go c.writer(time.Minute)
c.mu().Lock() c.mu().Lock()
c.Bitfield([]bool{false, true, false}) c.Bitfield([]bool{false, true, false})