missinggo.Event changed, connection.writeOptimizer changes
This commit is contained in:
parent
d5d3d9f623
commit
0fd73396fd
|
@ -881,8 +881,7 @@ func (cl *Client) runHandshookConn(c *connection, t *Torrent) (err error) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
defer cl.dropConnection(t, c)
|
defer cl.dropConnection(t, c)
|
||||||
go c.writer()
|
go c.writer(time.Minute)
|
||||||
go c.writeOptimizer(time.Minute)
|
|
||||||
cl.sendInitialMessages(c, t)
|
cl.sendInitialMessages(c, t)
|
||||||
err = cl.connectionLoop(t, c)
|
err = cl.connectionLoop(t, c)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
@ -93,7 +93,9 @@ func TestTorrentInitialState(t *testing.T) {
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.Len(t, tor.pieces, 3)
|
require.Len(t, tor.pieces, 3)
|
||||||
tor.pendAllChunkSpecs(0)
|
tor.pendAllChunkSpecs(0)
|
||||||
|
tor.cl.mu.Lock()
|
||||||
assert.EqualValues(t, 3, tor.pieceNumPendingChunks(0))
|
assert.EqualValues(t, 3, tor.pieceNumPendingChunks(0))
|
||||||
|
tor.cl.mu.Unlock()
|
||||||
assert.EqualValues(t, chunkSpec{4, 1}, chunkIndexSpec(2, tor.pieceLength(0), tor.chunkSize))
|
assert.EqualValues(t, chunkSpec{4, 1}, chunkIndexSpec(2, tor.pieceLength(0), tor.chunkSize))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
167
connection.go
167
connection.go
|
@ -4,13 +4,13 @@ import (
|
||||||
"bufio"
|
"bufio"
|
||||||
"bytes"
|
"bytes"
|
||||||
"container/list"
|
"container/list"
|
||||||
"encoding"
|
|
||||||
"errors"
|
"errors"
|
||||||
"expvar"
|
"expvar"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
"net"
|
"net"
|
||||||
"strconv"
|
"strconv"
|
||||||
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/anacrolix/missinggo"
|
"github.com/anacrolix/missinggo"
|
||||||
|
@ -42,8 +42,6 @@ type connection struct {
|
||||||
Discovery peerSource
|
Discovery peerSource
|
||||||
uTP bool
|
uTP bool
|
||||||
closed missinggo.Event
|
closed missinggo.Event
|
||||||
post chan pp.Message
|
|
||||||
writeCh chan []byte
|
|
||||||
|
|
||||||
UnwantedChunksReceived int
|
UnwantedChunksReceived int
|
||||||
UsefulChunksReceived int
|
UsefulChunksReceived int
|
||||||
|
@ -88,6 +86,13 @@ type connection struct {
|
||||||
|
|
||||||
pieceInclination []int
|
pieceInclination []int
|
||||||
pieceRequestOrder prioritybitmap.PriorityBitmap
|
pieceRequestOrder prioritybitmap.PriorityBitmap
|
||||||
|
|
||||||
|
outgoingUnbufferedMessages *list.List
|
||||||
|
outgoingUnbufferedMessagesNotEmpty missinggo.Event
|
||||||
|
}
|
||||||
|
|
||||||
|
func (cn *connection) mu() sync.Locker {
|
||||||
|
return &cn.t.cl.mu
|
||||||
}
|
}
|
||||||
|
|
||||||
func newConnection() (c *connection) {
|
func newConnection() (c *connection) {
|
||||||
|
@ -95,9 +100,6 @@ func newConnection() (c *connection) {
|
||||||
Choked: true,
|
Choked: true,
|
||||||
PeerChoked: true,
|
PeerChoked: true,
|
||||||
PeerMaxRequests: 250,
|
PeerMaxRequests: 250,
|
||||||
|
|
||||||
writeCh: make(chan []byte),
|
|
||||||
post: make(chan pp.Message),
|
|
||||||
}
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
@ -221,12 +223,24 @@ func (cn *connection) PeerHasPiece(piece int) bool {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (cn *connection) Post(msg pp.Message) {
|
func (cn *connection) Post(msg pp.Message) {
|
||||||
select {
|
switch msg.Type {
|
||||||
case cn.post <- msg:
|
case pp.Cancel:
|
||||||
postedMessageTypes.Add(strconv.FormatInt(int64(msg.Type), 10), 1)
|
for e := cn.outgoingUnbufferedMessages.Back(); e != nil; e = e.Prev() {
|
||||||
case <-cn.closed.C():
|
elemMsg := e.Value.(pp.Message)
|
||||||
|
if elemMsg.Type == pp.Request && elemMsg.Index == msg.Index && elemMsg.Begin == msg.Begin && elemMsg.Length == msg.Length {
|
||||||
|
cn.outgoingUnbufferedMessages.Remove(e)
|
||||||
|
optimizedCancels.Add(1)
|
||||||
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
if cn.outgoingUnbufferedMessages == nil {
|
||||||
|
cn.outgoingUnbufferedMessages = list.New()
|
||||||
|
}
|
||||||
|
cn.outgoingUnbufferedMessages.PushBack(msg)
|
||||||
|
cn.outgoingUnbufferedMessagesNotEmpty.Set()
|
||||||
|
postedMessageTypes.Add(strconv.FormatInt(int64(msg.Type), 10), 1)
|
||||||
|
}
|
||||||
|
|
||||||
func (cn *connection) RequestPending(r request) bool {
|
func (cn *connection) RequestPending(r request) bool {
|
||||||
_, ok := cn.Requests[r]
|
_, ok := cn.Requests[r]
|
||||||
|
@ -305,10 +319,7 @@ func (cn *connection) Request(chunk request) bool {
|
||||||
|
|
||||||
// Returns true if an unsatisfied request was canceled.
|
// Returns true if an unsatisfied request was canceled.
|
||||||
func (cn *connection) Cancel(r request) bool {
|
func (cn *connection) Cancel(r request) bool {
|
||||||
if cn.Requests == nil {
|
if !cn.RequestPending(r) {
|
||||||
return false
|
|
||||||
}
|
|
||||||
if _, ok := cn.Requests[r]; !ok {
|
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
delete(cn.Requests, r)
|
delete(cn.Requests, r)
|
||||||
|
@ -378,111 +389,53 @@ var (
|
||||||
)
|
)
|
||||||
|
|
||||||
// Writes buffers to the socket from the write channel.
|
// Writes buffers to the socket from the write channel.
|
||||||
func (cn *connection) writer() {
|
func (cn *connection) writer(keepAliveTimeout time.Duration) {
|
||||||
defer func() {
|
defer func() {
|
||||||
cn.t.cl.mu.Lock()
|
cn.mu().Lock()
|
||||||
defer cn.t.cl.mu.Unlock()
|
defer cn.mu().Unlock()
|
||||||
cn.Close()
|
cn.Close()
|
||||||
}()
|
}()
|
||||||
// Reduce write syscalls.
|
// Reduce write syscalls.
|
||||||
buf := bufio.NewWriter(cn.rw)
|
buf := bufio.NewWriter(cn.rw)
|
||||||
|
keepAliveTimer := time.NewTimer(keepAliveTimeout)
|
||||||
for {
|
for {
|
||||||
if buf.Buffered() == 0 {
|
cn.mu().Lock()
|
||||||
// There's nothing to write, so block until we get something.
|
for cn.outgoingUnbufferedMessages.Len() != 0 {
|
||||||
select {
|
msg := cn.outgoingUnbufferedMessages.Remove(cn.outgoingUnbufferedMessages.Front()).(pp.Message)
|
||||||
case b, ok := <-cn.writeCh:
|
cn.mu().Unlock()
|
||||||
if !ok {
|
b, err := msg.MarshalBinary()
|
||||||
return
|
|
||||||
}
|
|
||||||
connectionWriterWrite.Add(1)
|
|
||||||
_, err := buf.Write(b)
|
|
||||||
if err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
case <-cn.closed.C():
|
|
||||||
return
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
// We already have something to write, so flush if there's nothing
|
|
||||||
// more to write.
|
|
||||||
select {
|
|
||||||
case b, ok := <-cn.writeCh:
|
|
||||||
if !ok {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
connectionWriterWrite.Add(1)
|
|
||||||
_, err := buf.Write(b)
|
|
||||||
if err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
case <-cn.closed.C():
|
|
||||||
return
|
|
||||||
default:
|
|
||||||
connectionWriterFlush.Add(1)
|
|
||||||
err := buf.Flush()
|
|
||||||
if err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (cn *connection) writeOptimizer(keepAliveDelay time.Duration) {
|
|
||||||
defer close(cn.writeCh) // Responsible for notifying downstream routines.
|
|
||||||
pending := list.New() // Message queue.
|
|
||||||
var nextWrite []byte // Set to nil if we need to need to marshal the next message.
|
|
||||||
timer := time.NewTimer(keepAliveDelay)
|
|
||||||
defer timer.Stop()
|
|
||||||
lastWrite := time.Now()
|
|
||||||
for {
|
|
||||||
write := cn.writeCh // Set to nil if there's nothing to write.
|
|
||||||
if pending.Len() == 0 {
|
|
||||||
write = nil
|
|
||||||
} else if nextWrite == nil {
|
|
||||||
var err error
|
|
||||||
nextWrite, err = pending.Front().Value.(encoding.BinaryMarshaler).MarshalBinary()
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
|
connectionWriterWrite.Add(1)
|
||||||
|
n, err := buf.Write(b)
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
keepAliveTimer.Reset(keepAliveTimeout)
|
||||||
|
if n != len(b) {
|
||||||
|
panic("short write")
|
||||||
|
}
|
||||||
|
cn.mu().Lock()
|
||||||
|
}
|
||||||
|
cn.outgoingUnbufferedMessagesNotEmpty.Clear()
|
||||||
|
cn.mu().Unlock()
|
||||||
|
connectionWriterFlush.Add(1)
|
||||||
|
if buf.Buffered() != 0 {
|
||||||
|
if buf.Flush() != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
keepAliveTimer.Reset(keepAliveTimeout)
|
||||||
}
|
}
|
||||||
event:
|
|
||||||
select {
|
select {
|
||||||
case <-timer.C:
|
case <-cn.closed.LockedChan(cn.mu()):
|
||||||
if pending.Len() != 0 {
|
return
|
||||||
break
|
case <-cn.outgoingUnbufferedMessagesNotEmpty.LockedChan(cn.mu()):
|
||||||
}
|
case <-keepAliveTimer.C:
|
||||||
keepAliveTime := lastWrite.Add(keepAliveDelay)
|
cn.mu().Lock()
|
||||||
if time.Now().Before(keepAliveTime) {
|
cn.Post(pp.Message{Keepalive: true})
|
||||||
timer.Reset(keepAliveTime.Sub(time.Now()))
|
cn.mu().Unlock()
|
||||||
break
|
|
||||||
}
|
|
||||||
pending.PushBack(pp.Message{Keepalive: true})
|
|
||||||
postedKeepalives.Add(1)
|
postedKeepalives.Add(1)
|
||||||
case msg, ok := <-cn.post:
|
|
||||||
if !ok {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
if msg.Type == pp.Cancel {
|
|
||||||
for e := pending.Back(); e != nil; e = e.Prev() {
|
|
||||||
elemMsg := e.Value.(pp.Message)
|
|
||||||
if elemMsg.Type == pp.Request && msg.Index == elemMsg.Index && msg.Begin == elemMsg.Begin && msg.Length == elemMsg.Length {
|
|
||||||
pending.Remove(e)
|
|
||||||
optimizedCancels.Add(1)
|
|
||||||
break event
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
pending.PushBack(msg)
|
|
||||||
case write <- nextWrite:
|
|
||||||
pending.Remove(pending.Front())
|
|
||||||
nextWrite = nil
|
|
||||||
lastWrite = time.Now()
|
|
||||||
if pending.Len() == 0 {
|
|
||||||
timer.Reset(keepAliveDelay)
|
|
||||||
}
|
|
||||||
case <-cn.closed.C():
|
|
||||||
return
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,16 +1,19 @@
|
||||||
package torrent
|
package torrent
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"io"
|
||||||
|
"io/ioutil"
|
||||||
|
"net"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/anacrolix/missinggo/bitmap"
|
"github.com/anacrolix/missinggo/bitmap"
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
"github.com/anacrolix/torrent/peer_protocol"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestCancelRequestOptimized(t *testing.T) {
|
func TestCancelRequestOptimized(t *testing.T) {
|
||||||
|
r, w := io.Pipe()
|
||||||
c := &connection{
|
c := &connection{
|
||||||
PeerMaxRequests: 1,
|
PeerMaxRequests: 1,
|
||||||
peerPieces: func() bitmap.Bitmap {
|
peerPieces: func() bitmap.Bitmap {
|
||||||
|
@ -18,38 +21,39 @@ func TestCancelRequestOptimized(t *testing.T) {
|
||||||
bm.Set(1, true)
|
bm.Set(1, true)
|
||||||
return bm
|
return bm
|
||||||
}(),
|
}(),
|
||||||
post: make(chan peer_protocol.Message),
|
rw: struct {
|
||||||
writeCh: make(chan []byte),
|
io.Reader
|
||||||
|
io.Writer
|
||||||
|
}{
|
||||||
|
Writer: w,
|
||||||
|
},
|
||||||
|
conn: new(net.TCPConn),
|
||||||
|
// For the locks
|
||||||
|
t: &Torrent{cl: &Client{}},
|
||||||
}
|
}
|
||||||
assert.Len(t, c.Requests, 0)
|
assert.Len(t, c.Requests, 0)
|
||||||
// Keepalive timeout of 0 works because I'm just that good.
|
|
||||||
go c.writeOptimizer(0 * time.Millisecond)
|
|
||||||
c.Request(newRequest(1, 2, 3))
|
c.Request(newRequest(1, 2, 3))
|
||||||
if len(c.Requests) != 1 {
|
require.Len(t, c.Requests, 1)
|
||||||
t.Fatal("request was not posted")
|
|
||||||
}
|
|
||||||
// Posting this message should removing the pending Request.
|
// Posting this message should removing the pending Request.
|
||||||
if !c.Cancel(newRequest(1, 2, 3)) {
|
require.True(t, c.Cancel(newRequest(1, 2, 3)))
|
||||||
t.Fatal("request was not found")
|
assert.Len(t, c.Requests, 0)
|
||||||
}
|
// Check that write optimization filters out the Request, due to the
|
||||||
// Check that the write optimization has filtered out the Request message.
|
// Cancel. We should have received an Interested, due to the initial
|
||||||
for _, b := range []string{
|
// request, and then keep-alives until we close the connection.
|
||||||
// The initial request triggers an Interested message.
|
go c.writer(0)
|
||||||
"\x00\x00\x00\x01\x02",
|
b := make([]byte, 9)
|
||||||
// Let a keep-alive through to verify there were no pending messages.
|
n, err := io.ReadFull(r, b)
|
||||||
"\x00\x00\x00\x00",
|
require.NoError(t, err)
|
||||||
} {
|
require.EqualValues(t, len(b), n)
|
||||||
bb := string(<-c.writeCh)
|
require.EqualValues(t, "\x00\x00\x00\x01\x02"+"\x00\x00\x00\x00", string(b))
|
||||||
if b != bb {
|
time.Sleep(time.Millisecond)
|
||||||
t.Fatalf("received message %q is not expected: %q", bb, b)
|
c.mu().Lock()
|
||||||
}
|
c.Close()
|
||||||
}
|
c.mu().Unlock()
|
||||||
close(c.post)
|
w.Close()
|
||||||
// Drain the write channel until it closes.
|
b, err = ioutil.ReadAll(r)
|
||||||
for b := range c.writeCh {
|
require.NoError(t, err)
|
||||||
bs := string(b)
|
// A single keep-alive will have gone through, as writer would be stuck
|
||||||
if bs != "\x00\x00\x00\x00" {
|
// trying to flush it, and then promptly close.
|
||||||
t.Fatal("got unexpected non-keepalive")
|
require.EqualValues(t, "\x00\x00\x00\x00", string(b))
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -585,7 +585,7 @@ func (s *Server) bootstrap() (err error) {
|
||||||
}()
|
}()
|
||||||
s.mu.Unlock()
|
s.mu.Unlock()
|
||||||
select {
|
select {
|
||||||
case <-s.closed.C():
|
case <-s.closed.LockedChan(&s.mu):
|
||||||
s.mu.Lock()
|
s.mu.Lock()
|
||||||
return
|
return
|
||||||
case <-time.After(15 * time.Second):
|
case <-time.After(15 * time.Second):
|
||||||
|
|
Loading…
Reference in New Issue