2014-04-03 20:16:59 +08:00
package torrent
import (
"container/list"
"encoding"
2014-07-16 15:09:30 +08:00
"errors"
2014-08-24 01:04:07 +08:00
"expvar"
2014-06-26 15:29:12 +08:00
"fmt"
"io"
2014-04-03 20:16:59 +08:00
"net"
"sync"
"time"
2014-08-21 16:12:49 +08:00
pp "bitbucket.org/anacrolix/go.torrent/peer_protocol"
2014-04-03 20:16:59 +08:00
)
2014-08-24 01:04:07 +08:00
var optimizedCancels = expvar . NewInt ( "optimizedCancels" )
2014-07-16 15:06:18 +08:00
type peerSource byte
const (
peerSourceIncoming = 'I'
peerSourceDHT = 'H'
peerSourcePEX = 'X'
)
2014-04-03 20:16:59 +08:00
// Maintains the state of a connection with a peer.
2014-04-09 00:36:05 +08:00
type connection struct {
2014-07-16 15:06:18 +08:00
Socket net . Conn
Discovery peerSource
2014-08-28 07:31:05 +08:00
closing chan struct { }
2014-07-16 15:06:18 +08:00
mu sync . Mutex // Only for closing.
2014-08-21 16:12:49 +08:00
post chan pp . Message
2014-07-18 00:37:33 +08:00
writeCh chan [ ] byte
2014-04-03 20:16:59 +08:00
2014-08-28 07:32:49 +08:00
ChunksReceived int
UsefulChunksReceived int
lastMessageReceived time . Time
completedHandshake time . Time
lastUsefulChunkReceived time . Time
2014-04-03 20:16:59 +08:00
// Stuff controlled by the local peer.
Interested bool
Choked bool
2014-04-16 19:13:44 +08:00
Requests map [ request ] struct { }
2014-04-03 20:16:59 +08:00
// Stuff controlled by the remote peer.
2014-08-21 16:12:49 +08:00
PeerID [ 20 ] byte
PeerInterested bool
PeerChoked bool
PeerRequests map [ request ] struct { }
PeerExtensionBytes peerExtensionBytes
2014-07-16 15:09:30 +08:00
// Whether the peer has the given piece. nil if they've not sent any
// related messages yet.
2014-06-26 22:57:07 +08:00
PeerPieces [ ] bool
PeerMaxRequests int // Maximum pending requests the peer allows.
PeerExtensionIDs map [ string ] int64
2014-06-29 16:57:49 +08:00
PeerClientName string
2014-04-03 20:16:59 +08:00
}
2014-08-21 16:12:49 +08:00
func newConnection ( sock net . Conn , peb peerExtensionBytes , peerID [ 20 ] byte ) ( c * connection ) {
c = & connection {
Socket : sock ,
Choked : true ,
PeerChoked : true ,
PeerMaxRequests : 250 ,
PeerExtensionBytes : peb ,
PeerID : peerID ,
2014-08-28 07:31:05 +08:00
closing : make ( chan struct { } ) ,
2014-08-21 16:12:49 +08:00
writeCh : make ( chan [ ] byte ) ,
post : make ( chan pp . Message ) ,
2014-08-28 07:32:49 +08:00
completedHandshake : time . Now ( ) ,
2014-08-21 16:12:49 +08:00
}
go c . writer ( )
go c . writeOptimizer ( time . Minute )
return
2014-07-18 00:37:33 +08:00
}
2014-06-26 15:29:12 +08:00
func ( cn * connection ) completedString ( ) string {
if cn . PeerPieces == nil {
return "?"
}
2014-07-17 14:04:43 +08:00
// f := float32(cn.piecesPeerHasCount()) / float32(cn.totalPiecesCount())
// return fmt.Sprintf("%d%%", int(f*100))
return fmt . Sprintf ( "%d/%d" , cn . piecesPeerHasCount ( ) , cn . totalPiecesCount ( ) )
2014-06-26 15:29:12 +08:00
}
func ( cn * connection ) totalPiecesCount ( ) int {
return len ( cn . PeerPieces )
}
func ( cn * connection ) piecesPeerHasCount ( ) ( count int ) {
for _ , has := range cn . PeerPieces {
if has {
count ++
}
}
return
}
2014-07-16 15:09:30 +08:00
// Correct the PeerPieces slice length. Return false if the existing slice is
// invalid, such as by receiving badly sized BITFIELD, or invalid HAVE
// messages.
func ( cn * connection ) setNumPieces ( num int ) error {
if cn . PeerPieces == nil {
return nil
}
if len ( cn . PeerPieces ) == num {
} else if len ( cn . PeerPieces ) < num {
cn . PeerPieces = append ( cn . PeerPieces , make ( [ ] bool , num - len ( cn . PeerPieces ) ) ... )
2014-08-21 23:26:41 +08:00
} else if len ( cn . PeerPieces ) <= ( num + 7 ) / 8 * 8 {
2014-07-16 15:09:30 +08:00
for _ , have := range cn . PeerPieces [ num : ] {
if have {
return errors . New ( "peer has invalid piece" )
}
}
cn . PeerPieces = cn . PeerPieces [ : num ]
} else {
2014-08-21 23:26:41 +08:00
return fmt . Errorf ( "peer bitfield is excessively long: expected %d, have %d" , num , len ( cn . PeerPieces ) )
2014-07-16 15:09:30 +08:00
}
if len ( cn . PeerPieces ) != num {
panic ( "wat" )
}
return nil
}
2014-06-26 15:29:12 +08:00
func ( cn * connection ) WriteStatus ( w io . Writer ) {
2014-08-28 07:32:49 +08:00
fmt . Fprintf ( w , "%-90s: %s completed, good chunks: %d/%d reqs: %d-%d, last msg: %.0fs ago age: %.1fmin last useful chunk: %s ago flags: " , fmt . Sprintf ( "%q: %s-%s" , cn . PeerID , cn . Socket . LocalAddr ( ) , cn . Socket . RemoteAddr ( ) ) , cn . completedString ( ) , cn . UsefulChunksReceived , cn . ChunksReceived , len ( cn . Requests ) , len ( cn . PeerRequests ) , time . Now ( ) . Sub ( cn . lastMessageReceived ) . Seconds ( ) , time . Now ( ) . Sub ( cn . completedHandshake ) . Minutes ( ) , time . Now ( ) . Sub ( cn . lastUsefulChunkReceived ) )
2014-06-26 15:29:12 +08:00
c := func ( b byte ) {
fmt . Fprintf ( w , "%c" , b )
}
2014-08-22 15:33:17 +08:00
// Inspired by https://trac.transmissionbt.com/wiki/PeerStatusText
2014-06-26 15:29:12 +08:00
if len ( cn . Requests ) != 0 {
c ( 'D' )
2014-07-09 22:16:09 +08:00
}
if cn . PeerChoked && cn . Interested {
2014-06-26 15:29:12 +08:00
c ( 'd' )
}
2014-08-22 15:33:17 +08:00
if ! cn . Choked {
if cn . PeerInterested {
c ( 'U' )
} else {
c ( 'u' )
}
2014-06-26 15:29:12 +08:00
}
2014-07-16 15:06:18 +08:00
if cn . Discovery != 0 {
c ( byte ( cn . Discovery ) )
2014-06-30 22:04:28 +08:00
}
2014-06-26 15:29:12 +08:00
fmt . Fprintln ( w )
}
2014-04-09 00:36:05 +08:00
func ( c * connection ) Close ( ) {
2014-04-03 20:16:59 +08:00
c . mu . Lock ( )
2014-07-17 14:02:30 +08:00
defer c . mu . Unlock ( )
2014-08-21 16:12:49 +08:00
select {
2014-08-28 07:31:05 +08:00
case <- c . closing :
return
2014-08-21 16:12:49 +08:00
default :
}
2014-08-28 07:31:05 +08:00
close ( c . closing )
c . Socket . Close ( )
2014-04-03 20:16:59 +08:00
}
2014-08-21 16:12:49 +08:00
func ( c * connection ) PeerHasPiece ( index pp . Integer ) bool {
2014-04-03 20:16:59 +08:00
if c . PeerPieces == nil {
return false
}
2014-06-30 22:06:58 +08:00
if int ( index ) >= len ( c . PeerPieces ) {
return false
}
2014-04-03 20:16:59 +08:00
return c . PeerPieces [ index ]
}
2014-08-21 16:12:49 +08:00
func ( c * connection ) Post ( msg pp . Message ) {
select {
case c . post <- msg :
2014-08-28 07:31:05 +08:00
case <- c . closing :
2014-08-21 16:12:49 +08:00
}
2014-04-03 20:16:59 +08:00
}
2014-05-23 19:01:05 +08:00
func ( c * connection ) RequestPending ( r request ) bool {
_ , ok := c . Requests [ r ]
return ok
}
2014-04-03 20:16:59 +08:00
// Returns true if more requests can be sent.
2014-04-16 19:13:44 +08:00
func ( c * connection ) Request ( chunk request ) bool {
2014-05-21 15:47:42 +08:00
if len ( c . Requests ) >= c . PeerMaxRequests {
2014-04-03 20:16:59 +08:00
return false
}
2014-05-21 15:42:06 +08:00
if ! c . PeerHasPiece ( chunk . Index ) {
2014-04-03 20:16:59 +08:00
return true
}
2014-06-28 17:38:31 +08:00
if c . RequestPending ( chunk ) {
return true
}
2014-04-03 20:16:59 +08:00
c . SetInterested ( true )
if c . PeerChoked {
return false
}
if c . Requests == nil {
2014-05-21 15:47:42 +08:00
c . Requests = make ( map [ request ] struct { } , c . PeerMaxRequests )
2014-04-03 20:16:59 +08:00
}
c . Requests [ chunk ] = struct { } { }
2014-08-21 16:12:49 +08:00
c . Post ( pp . Message {
Type : pp . Request ,
2014-05-23 19:01:05 +08:00
Index : chunk . Index ,
Begin : chunk . Begin ,
Length : chunk . Length ,
} )
2014-04-03 20:16:59 +08:00
return true
}
2014-05-21 15:49:28 +08:00
// Returns true if an unsatisfied request was canceled.
func ( c * connection ) Cancel ( r request ) bool {
if c . Requests == nil {
return false
}
if _ , ok := c . Requests [ r ] ; ! ok {
return false
}
delete ( c . Requests , r )
2014-08-21 16:12:49 +08:00
c . Post ( pp . Message {
Type : pp . Cancel ,
2014-05-21 15:49:28 +08:00
Index : r . Index ,
Begin : r . Begin ,
Length : r . Length ,
} )
return true
}
2014-04-16 15:33:33 +08:00
// Returns true if an unsatisfied request was canceled.
2014-04-16 19:13:44 +08:00
func ( c * connection ) PeerCancel ( r request ) bool {
2014-04-16 15:33:33 +08:00
if c . PeerRequests == nil {
return false
}
if _ , ok := c . PeerRequests [ r ] ; ! ok {
return false
}
delete ( c . PeerRequests , r )
return true
}
2014-05-21 15:49:28 +08:00
func ( c * connection ) Choke ( ) {
if c . Choked {
return
}
2014-08-21 16:12:49 +08:00
c . Post ( pp . Message {
Type : pp . Choke ,
2014-05-21 15:49:28 +08:00
} )
c . Choked = true
}
2014-04-09 00:36:05 +08:00
func ( c * connection ) Unchoke ( ) {
2014-04-03 20:16:59 +08:00
if ! c . Choked {
return
}
2014-08-21 16:12:49 +08:00
c . Post ( pp . Message {
Type : pp . Unchoke ,
2014-04-03 20:16:59 +08:00
} )
c . Choked = false
}
2014-04-09 00:36:05 +08:00
func ( c * connection ) SetInterested ( interested bool ) {
2014-04-03 20:16:59 +08:00
if c . Interested == interested {
return
}
2014-08-21 16:12:49 +08:00
c . Post ( pp . Message {
Type : func ( ) pp . MessageType {
2014-04-03 20:16:59 +08:00
if interested {
2014-08-21 16:12:49 +08:00
return pp . Interested
2014-04-03 20:16:59 +08:00
} else {
2014-08-21 16:12:49 +08:00
return pp . NotInterested
2014-04-03 20:16:59 +08:00
}
} ( ) ,
} )
c . Interested = interested
}
var (
// Four consecutive zero bytes that comprise a keep alive on the wire.
keepAliveBytes [ 4 ] byte
)
2014-05-28 23:27:48 +08:00
// Writes buffers to the socket from the write channel.
2014-04-09 00:36:05 +08:00
func ( conn * connection ) writer ( ) {
2014-08-21 16:12:49 +08:00
for {
select {
case b , ok := <- conn . writeCh :
if ! ok {
return
}
_ , err := conn . Socket . Write ( b )
if err != nil {
conn . Close ( )
return
2014-05-28 23:27:48 +08:00
}
2014-08-28 07:31:05 +08:00
case <- conn . closing :
2014-08-21 16:12:49 +08:00
return
2014-04-03 20:16:59 +08:00
}
}
}
2014-05-28 23:27:48 +08:00
func ( conn * connection ) writeOptimizer ( keepAliveDelay time . Duration ) {
2014-07-18 00:37:33 +08:00
defer close ( conn . writeCh ) // Responsible for notifying downstream routines.
pending := list . New ( ) // Message queue.
var nextWrite [ ] byte // Set to nil if we need to need to marshal the next message.
2014-05-28 23:27:48 +08:00
timer := time . NewTimer ( keepAliveDelay )
defer timer . Stop ( )
lastWrite := time . Now ( )
2014-04-03 20:16:59 +08:00
for {
2014-07-18 00:37:33 +08:00
write := conn . writeCh // Set to nil if there's nothing to write.
2014-04-03 20:16:59 +08:00
if pending . Len ( ) == 0 {
write = nil
2014-05-28 23:27:48 +08:00
} else if nextWrite == nil {
2014-04-03 20:16:59 +08:00
var err error
nextWrite , err = pending . Front ( ) . Value . ( encoding . BinaryMarshaler ) . MarshalBinary ( )
if err != nil {
panic ( err )
}
}
2014-05-28 23:27:48 +08:00
event :
2014-04-03 20:16:59 +08:00
select {
2014-05-28 23:27:48 +08:00
case <- timer . C :
if pending . Len ( ) != 0 {
break
}
keepAliveTime := lastWrite . Add ( keepAliveDelay )
if time . Now ( ) . Before ( keepAliveTime ) {
timer . Reset ( keepAliveTime . Sub ( time . Now ( ) ) )
break
}
2014-08-21 16:12:49 +08:00
pending . PushBack ( pp . Message { Keepalive : true } )
2014-04-03 20:16:59 +08:00
case msg , ok := <- conn . post :
if ! ok {
return
}
2014-08-21 16:12:49 +08:00
if msg . Type == pp . Cancel {
2014-05-28 23:27:48 +08:00
for e := pending . Back ( ) ; e != nil ; e = e . Prev ( ) {
2014-08-21 16:12:49 +08:00
elemMsg := e . Value . ( pp . Message )
if elemMsg . Type == pp . Request && msg . Index == elemMsg . Index && msg . Begin == elemMsg . Begin && msg . Length == elemMsg . Length {
2014-05-28 23:27:48 +08:00
pending . Remove ( e )
2014-08-24 01:04:07 +08:00
optimizedCancels . Add ( 1 )
2014-05-28 23:27:48 +08:00
break event
}
}
}
2014-04-03 20:16:59 +08:00
pending . PushBack ( msg )
case write <- nextWrite :
pending . Remove ( pending . Front ( ) )
2014-05-28 23:27:48 +08:00
nextWrite = nil
lastWrite = time . Now ( )
if pending . Len ( ) == 0 {
timer . Reset ( keepAliveDelay )
}
2014-08-28 07:31:05 +08:00
case <- conn . closing :
2014-08-21 16:12:49 +08:00
return
2014-04-03 20:16:59 +08:00
}
}
}