Add connection read stats

This commit is contained in:
Matt Joiner 2016-07-12 16:42:04 +10:00
parent a4e140b939
commit fbe0ded844
5 changed files with 93 additions and 22 deletions

View File

@ -435,7 +435,7 @@ func (cl *Client) incomingConnection(nc net.Conn, utp bool) {
if tc, ok := nc.(*net.TCPConn); ok {
tc.SetLinger(0)
}
c := cl.newConnection(nc)
c := newConnection(nc, &cl.mu)
c.Discovery = peerSourceIncoming
c.uTP = utp
cl.runReceivedConn(c)
@ -575,7 +575,7 @@ func (cl *Client) noLongerHalfOpen(t *Torrent, addr string) {
// Performs initiator handshakes and returns a connection. Returns nil
// *connection if no connection for valid reasons.
func (cl *Client) handshakesConnection(nc net.Conn, t *Torrent, encrypted, utp bool) (c *connection, err error) {
c = cl.newConnection(nc)
c = newConnection(nc, &cl.mu)
c.encrypted = encrypted
c.uTP = utp
err = nc.SetDeadline(time.Now().Add(handshakesTimeout))
@ -1153,6 +1153,7 @@ func (cl *Client) connectionLoop(t *Torrent, c *connection) error {
if err != nil {
return err
}
c.readMsg(&msg)
c.lastMessageReceived = time.Now()
if msg.Keepalive {
receivedKeepalives.Add(1)

View File

@ -385,10 +385,15 @@ func testClientTransfer(t *testing.T, ps testClientTransferParams) {
}
assertReadAllGreeting(t, r)
// After one read through, we can assume certain torrent statistics.
assert.EqualValues(t, 13, seederTorrent.Stats().DataBytesSent)
assert.EqualValues(t, 8, seederTorrent.Stats().ChunksSent)
// This is not a strict requirement. It is however interesting to follow.
assert.EqualValues(t, 261, seederTorrent.Stats().BytesSent)
assert.EqualValues(t, 13, seederTorrent.Stats().DataBytesWritten)
assert.EqualValues(t, 8, seederTorrent.Stats().ChunksWritten)
// These are not a strict requirement. It is however interesting to
// follow.
t.Logf("%#v", seederTorrent.Stats())
assert.EqualValues(t, 13, seederTorrent.Stats().DataBytesWritten)
assert.EqualValues(t, 8, seederTorrent.Stats().ChunksWritten)
assert.EqualValues(t, 13, leecherGreeting.Stats().DataBytesRead)
assert.EqualValues(t, 8, leecherGreeting.Stats().ChunksRead)
// Read through again for the cases where the torrent data size exceeds
// the size of the cache.
assertReadAllGreeting(t, r)

View File

@ -1,23 +1,66 @@
package torrent
import (
"io"
"sync"
pp "github.com/anacrolix/torrent/peer_protocol"
)
type ConnStats struct {
ChunksSent int64 // Num piece messages sent.
BytesSent int64 // Total bytes sent.
DataBytesSent int64 // Data-only bytes sent.
// Torrent "piece" messages, or data chunks.
ChunksWritten int64 // Num piece messages sent.
ChunksRead int64
// Total bytes on the wire. Includes handshakes and encryption.
BytesWritten int64
BytesRead int64
// Data bytes, actual torrent data.
DataBytesWritten int64
DataBytesRead int64
}
func (cs *ConnStats) wroteMsg(msg pp.Message) {
func (cs *ConnStats) wroteMsg(msg *pp.Message) {
switch msg.Type {
case pp.Piece:
cs.ChunksSent++
cs.DataBytesSent += int64(len(msg.Piece))
cs.ChunksWritten++
cs.DataBytesWritten += int64(len(msg.Piece))
}
}
func (cs *ConnStats) wroteBytes(b []byte) {
cs.BytesSent += int64(len(b))
func (cs *ConnStats) readMsg(msg *pp.Message) {
switch msg.Type {
case pp.Piece:
cs.ChunksRead++
cs.DataBytesRead += int64(len(msg.Piece))
}
}
func (cs *ConnStats) wroteBytes(n int64) {
cs.BytesWritten += n
}
func (cs *ConnStats) readBytes(n int64) {
cs.BytesRead += n
}
type connStatsReadWriter struct {
rw io.ReadWriter
l sync.Locker
c *connection
}
func (me connStatsReadWriter) Write(b []byte) (n int, err error) {
n, err = me.rw.Write(b)
me.l.Lock()
me.c.wroteBytes(int64(n))
me.l.Unlock()
return
}
func (me connStatsReadWriter) Read(b []byte) (n int, err error) {
n, err = me.rw.Read(b)
me.l.Lock()
me.c.readBytes(int64(n))
me.l.Unlock()
return
}

View File

@ -100,15 +100,15 @@ func (cn *connection) mu() sync.Locker {
return &cn.t.cl.mu
}
func (cl *Client) newConnection(nc net.Conn) (c *connection) {
func newConnection(nc net.Conn, l sync.Locker) (c *connection) {
c = &connection{
conn: nc,
rw: nc,
Choked: true,
PeerChoked: true,
PeerMaxRequests: 250,
}
c.rw = connStatsReadWriter{nc, l, c}
return
}
@ -427,8 +427,7 @@ func (cn *connection) writer(keepAliveTimeout time.Duration) {
panic("short write")
}
cn.mu().Lock()
cn.wroteMsg(msg)
cn.wroteBytes(b)
cn.wroteMsg(&msg)
}
cn.outgoingUnbufferedMessagesNotEmpty.Clear()
cn.mu().Unlock()
@ -645,14 +644,28 @@ func (c *connection) requestPendingMetadata() {
}
}
func (cn *connection) wroteMsg(msg pp.Message) {
func (cn *connection) wroteMsg(msg *pp.Message) {
cn.stats.wroteMsg(msg)
cn.t.stats.wroteMsg(msg)
}
func (cn *connection) wroteBytes(b []byte) {
cn.stats.wroteBytes(b)
cn.t.stats.wroteBytes(b)
func (cn *connection) readMsg(msg *pp.Message) {
cn.stats.readMsg(msg)
cn.t.stats.readMsg(msg)
}
func (cn *connection) wroteBytes(n int64) {
cn.stats.wroteBytes(n)
if cn.t != nil {
cn.t.stats.wroteBytes(n)
}
}
func (cn *connection) readBytes(n int64) {
cn.stats.readBytes(n)
if cn.t != nil {
cn.t.stats.readBytes(n)
}
}
// Returns whether the connection is currently useful to us. We're seeding and

View File

@ -1267,6 +1267,8 @@ func (t *Torrent) addPeers(peers []Peer) {
}
func (t *Torrent) Stats() TorrentStats {
t.cl.mu.Lock()
defer t.cl.mu.Unlock()
return t.stats
}
@ -1300,6 +1302,13 @@ func (t *Torrent) addConnection(c *connection) bool {
panic(len(t.conns))
}
t.conns = append(t.conns, c)
if c.t != nil {
panic("connection already associated with a torrent")
}
// Reconcile bytes transferred before connection was associated with a
// torrent.
t.stats.wroteBytes(c.stats.BytesWritten)
t.stats.readBytes(c.stats.BytesRead)
c.t = t
return true
}