Remove requests from the outbound message queue if cancelled before they're written

Only post peer protocol messages to the channel, bytes must be done directly.
This fixes a possible issue where slow responses during handshake could cause
keep alive messages to be sent prematurely.
This commit is contained in:
Matt Joiner 2014-05-29 01:27:48 +10:00
parent ef69e82765
commit cfc282ff51
3 changed files with 97 additions and 36 deletions

View File

@ -19,7 +19,6 @@ import (
"bufio" "bufio"
"container/list" "container/list"
"crypto/rand" "crypto/rand"
"encoding"
"errors" "errors"
"fmt" "fmt"
"io" "io"
@ -278,7 +277,7 @@ func (me *Client) runConnection(sock net.Conn, torrent *torrent) (err error) {
Choked: true, Choked: true,
PeerChoked: true, PeerChoked: true,
write: make(chan []byte), write: make(chan []byte),
post: make(chan encoding.BinaryMarshaler), post: make(chan pp.Message),
PeerMaxRequests: 250, PeerMaxRequests: 250,
} }
defer func() { defer func() {
@ -289,12 +288,12 @@ func (me *Client) runConnection(sock net.Conn, torrent *torrent) (err error) {
conn.Close() conn.Close()
}() }()
go conn.writer() go conn.writer()
go conn.writeOptimizer() // go conn.writeOptimizer()
conn.post <- pp.Bytes(pp.Protocol) conn.write <- pp.Bytes(pp.Protocol)
conn.post <- pp.Bytes("\x00\x00\x00\x00\x00\x00\x00\x00") conn.write <- pp.Bytes("\x00\x00\x00\x00\x00\x00\x00\x00")
if torrent != nil { if torrent != nil {
conn.post <- pp.Bytes(torrent.InfoHash[:]) conn.write <- pp.Bytes(torrent.InfoHash[:])
conn.post <- pp.Bytes(me.PeerId[:]) conn.write <- pp.Bytes(me.PeerId[:])
} }
var b [28]byte var b [28]byte
_, err = io.ReadFull(conn.Socket, b[:]) _, err = io.ReadFull(conn.Socket, b[:])
@ -327,14 +326,15 @@ func (me *Client) runConnection(sock net.Conn, torrent *torrent) (err error) {
if torrent == nil { if torrent == nil {
return return
} }
conn.post <- pp.Bytes(torrent.InfoHash[:]) conn.write <- pp.Bytes(torrent.InfoHash[:])
conn.post <- pp.Bytes(me.PeerId[:]) conn.write <- pp.Bytes(me.PeerId[:])
} }
me.mu.Lock() me.mu.Lock()
defer me.mu.Unlock() defer me.mu.Unlock()
if !me.addConnection(torrent, conn) { if !me.addConnection(torrent, conn) {
return return
} }
go conn.writeOptimizer(time.Minute)
if torrent.haveAnyPieces() { if torrent.haveAnyPieces() {
conn.Post(pp.Message{ conn.Post(pp.Message{
Type: pp.Bitfield, Type: pp.Bitfield,

View File

@ -16,7 +16,7 @@ type connection struct {
Socket net.Conn Socket net.Conn
closed bool closed bool
mu sync.Mutex // Only for closing. mu sync.Mutex // Only for closing.
post chan encoding.BinaryMarshaler post chan peer_protocol.Message
write chan []byte write chan []byte
// Stuff controlled by the local peer. // Stuff controlled by the local peer.
@ -58,7 +58,7 @@ func (c *connection) PeerHasPiece(index peer_protocol.Integer) bool {
return c.PeerPieces[index] return c.PeerPieces[index]
} }
func (c *connection) Post(msg encoding.BinaryMarshaler) { func (c *connection) Post(msg peer_protocol.Message) {
c.post <- msg c.post <- msg
} }
@ -166,56 +166,71 @@ var (
keepAliveBytes [4]byte keepAliveBytes [4]byte
) )
// Writes buffers to the socket from the write channel.
func (conn *connection) writer() { func (conn *connection) writer() {
timer := time.NewTimer(0) for b := range conn.write {
defer timer.Stop()
for {
if !timer.Reset(time.Minute) {
<-timer.C
}
var b []byte
select {
case <-timer.C:
b = keepAliveBytes[:]
case b = <-conn.write:
if b == nil {
return
}
}
_, err := conn.Socket.Write(b) _, err := conn.Socket.Write(b)
if conn.getClosed() {
break
}
if err != nil { if err != nil {
log.Print(err) if !conn.getClosed() {
log.Print(err)
}
break break
} }
} }
} }
func (conn *connection) writeOptimizer() { func (conn *connection) writeOptimizer(keepAliveDelay time.Duration) {
pending := list.New() defer close(conn.write) // Responsible for notifying downstream routines.
var nextWrite []byte pending := list.New() // Message queue.
defer close(conn.write) var nextWrite []byte // Set to nil if we need to need to marshal the next message.
timer := time.NewTimer(keepAliveDelay)
defer timer.Stop()
lastWrite := time.Now()
for { for {
write := conn.write write := conn.write // Set to nil if there's nothing to write.
if pending.Len() == 0 { if pending.Len() == 0 {
write = nil write = nil
} else { } else if nextWrite == nil {
var err error var err error
nextWrite, err = pending.Front().Value.(encoding.BinaryMarshaler).MarshalBinary() nextWrite, err = pending.Front().Value.(encoding.BinaryMarshaler).MarshalBinary()
if err != nil { if err != nil {
panic(err) panic(err)
} }
} }
event:
select { select {
case <-timer.C:
if pending.Len() != 0 {
break
}
keepAliveTime := lastWrite.Add(keepAliveDelay)
if time.Now().Before(keepAliveTime) {
timer.Reset(keepAliveTime.Sub(time.Now()))
break
}
pending.PushBack(peer_protocol.Message{Keepalive: true})
case msg, ok := <-conn.post: case msg, ok := <-conn.post:
if !ok { if !ok {
return return
} }
if msg.Type == peer_protocol.Cancel {
for e := pending.Back(); e != nil; e = e.Prev() {
elemMsg := e.Value.(peer_protocol.Message)
if elemMsg.Type == peer_protocol.Request && msg.Index == elemMsg.Index && msg.Begin == elemMsg.Begin && msg.Length == elemMsg.Length {
pending.Remove(e)
log.Print("optimized cancel! %q", msg)
break event
}
}
}
pending.PushBack(msg) pending.PushBack(msg)
case write <- nextWrite: case write <- nextWrite:
pending.Remove(pending.Front()) pending.Remove(pending.Front())
nextWrite = nil
lastWrite = time.Now()
if pending.Len() == 0 {
timer.Reset(keepAliveDelay)
}
} }
} }
} }

46
connection_test.go Normal file
View File

@ -0,0 +1,46 @@
package torrent
import (
"bitbucket.org/anacrolix/go.torrent/peer_protocol"
"testing"
"time"
)
func TestCancelRequestOptimized(t *testing.T) {
c := &connection{
PeerMaxRequests: 1,
PeerPieces: []bool{false, true},
post: make(chan peer_protocol.Message),
write: make(chan []byte),
}
if len(c.Requests) != 0 {
t.FailNow()
}
// Keepalive timeout of 0 works because I'm just that good.
go c.writeOptimizer(0 * time.Millisecond)
c.Request(newRequest(1, 2, 3))
if len(c.Requests) != 1 {
t.Fatal("request was not posted")
}
// Posting this message should removing the pending Request.
if !c.Cancel(newRequest(1, 2, 3)) {
t.Fatal("request was not found")
}
// Check that the write optimization has filtered out the Request message.
for _, b := range []string{
// The initial request triggers an Interested message.
"\x00\x00\x00\x01\x02",
// Let a keep-alive through to verify there were no pending messages.
"\x00\x00\x00\x00",
} {
bb := string(<-c.write)
if b != bb {
t.Fatalf("received message %q is not expected: %q", bb, b)
}
}
close(c.post)
_, ok := <-c.write
if ok {
t.Fatal("write channel didn't close")
}
}