612 lines
13 KiB
Go
612 lines
13 KiB
Go
package torrent
|
|
|
|
import (
|
|
"bufio"
|
|
"bytes"
|
|
"container/list"
|
|
"encoding"
|
|
"errors"
|
|
"expvar"
|
|
"fmt"
|
|
"io"
|
|
"net"
|
|
"time"
|
|
|
|
"github.com/anacrolix/missinggo"
|
|
"github.com/anacrolix/missinggo/prioritybitmap"
|
|
|
|
"github.com/anacrolix/torrent/bencode"
|
|
pp "github.com/anacrolix/torrent/peer_protocol"
|
|
)
|
|
|
|
var optimizedCancels = expvar.NewInt("optimizedCancels")
|
|
|
|
type peerSource byte
|
|
|
|
const (
|
|
peerSourceTracker = '\x00' // It's the default.
|
|
peerSourceIncoming = 'I'
|
|
peerSourceDHT = 'H'
|
|
peerSourcePEX = 'X'
|
|
)
|
|
|
|
// Maintains the state of a connection with a peer.
|
|
type connection struct {
|
|
t *torrent
|
|
conn net.Conn
|
|
rw io.ReadWriter // The real slim shady
|
|
encrypted bool
|
|
Discovery peerSource
|
|
uTP bool
|
|
closed missinggo.Event
|
|
post chan pp.Message
|
|
writeCh chan []byte
|
|
|
|
UnwantedChunksReceived int
|
|
UsefulChunksReceived int
|
|
chunksSent int
|
|
|
|
lastMessageReceived time.Time
|
|
completedHandshake time.Time
|
|
lastUsefulChunkReceived time.Time
|
|
lastChunkSent time.Time
|
|
|
|
// Stuff controlled by the local peer.
|
|
Interested bool
|
|
Choked bool
|
|
Requests map[request]struct{}
|
|
requestsLowWater int
|
|
// Indexed by metadata piece, set to true if posted and pending a
|
|
// response.
|
|
metadataRequests []bool
|
|
sentHaves []bool
|
|
|
|
// Stuff controlled by the remote peer.
|
|
PeerID [20]byte
|
|
PeerInterested bool
|
|
PeerChoked bool
|
|
PeerRequests map[request]struct{}
|
|
PeerExtensionBytes peerExtensionBytes
|
|
// Whether the peer has the given piece. nil if they've not sent any
|
|
// related messages yet.
|
|
PeerPieces []bool
|
|
peerHasAll bool
|
|
// Pieces we've accepted chunks for from the peer.
|
|
peerTouchedPieces map[int]struct{}
|
|
|
|
PeerMaxRequests int // Maximum pending requests the peer allows.
|
|
PeerExtensionIDs map[string]byte
|
|
PeerClientName string
|
|
|
|
pieceInclination []int
|
|
pieceRequestOrder prioritybitmap.PriorityBitmap
|
|
}
|
|
|
|
func newConnection() (c *connection) {
|
|
c = &connection{
|
|
Choked: true,
|
|
PeerChoked: true,
|
|
PeerMaxRequests: 250,
|
|
|
|
writeCh: make(chan []byte),
|
|
post: make(chan pp.Message),
|
|
}
|
|
return
|
|
}
|
|
|
|
func (cn *connection) remoteAddr() net.Addr {
|
|
return cn.conn.RemoteAddr()
|
|
}
|
|
|
|
func (cn *connection) localAddr() net.Addr {
|
|
return cn.conn.LocalAddr()
|
|
}
|
|
|
|
func (cn *connection) supportsExtension(ext string) bool {
|
|
_, ok := cn.PeerExtensionIDs[ext]
|
|
return ok
|
|
}
|
|
|
|
func (cn *connection) completedString(t *torrent) string {
|
|
if cn.PeerPieces == nil && !cn.peerHasAll {
|
|
return "?"
|
|
}
|
|
return fmt.Sprintf("%d/%d", func() int {
|
|
if cn.peerHasAll {
|
|
if t.haveInfo() {
|
|
return t.numPieces()
|
|
}
|
|
return -1
|
|
}
|
|
ret := 0
|
|
for _, b := range cn.PeerPieces {
|
|
if b {
|
|
ret++
|
|
}
|
|
}
|
|
return ret
|
|
}(), func() int {
|
|
if cn.peerHasAll || cn.PeerPieces == nil {
|
|
if t.haveInfo() {
|
|
return t.numPieces()
|
|
}
|
|
return -1
|
|
}
|
|
return len(cn.PeerPieces)
|
|
}())
|
|
}
|
|
|
|
// Correct the PeerPieces slice length. Return false if the existing slice is
|
|
// invalid, such as by receiving badly sized BITFIELD, or invalid HAVE
|
|
// messages.
|
|
func (cn *connection) setNumPieces(num int) error {
|
|
if cn.peerHasAll {
|
|
return nil
|
|
}
|
|
if cn.PeerPieces == nil {
|
|
return nil
|
|
}
|
|
if len(cn.PeerPieces) == num {
|
|
} else if len(cn.PeerPieces) < num {
|
|
cn.PeerPieces = append(cn.PeerPieces, make([]bool, num-len(cn.PeerPieces))...)
|
|
} else if len(cn.PeerPieces) <= (num+7)/8*8 {
|
|
for _, have := range cn.PeerPieces[num:] {
|
|
if have {
|
|
return errors.New("peer has invalid piece")
|
|
}
|
|
}
|
|
cn.PeerPieces = cn.PeerPieces[:num]
|
|
} else {
|
|
return fmt.Errorf("peer bitfield is excessively long: expected %d, have %d", num, len(cn.PeerPieces))
|
|
}
|
|
if len(cn.PeerPieces) != num {
|
|
panic("wat")
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func eventAgeString(t time.Time) string {
|
|
if t.IsZero() {
|
|
return "never"
|
|
}
|
|
return fmt.Sprintf("%.2fs ago", time.Now().Sub(t).Seconds())
|
|
}
|
|
|
|
func (cn *connection) connectionFlags() (ret string) {
|
|
c := func(b byte) {
|
|
ret += string([]byte{b})
|
|
}
|
|
if cn.encrypted {
|
|
c('E')
|
|
}
|
|
if cn.Discovery != 0 {
|
|
c(byte(cn.Discovery))
|
|
}
|
|
if cn.uTP {
|
|
c('T')
|
|
}
|
|
return
|
|
}
|
|
|
|
// Inspired by https://trac.transmissionbt.com/wiki/PeerStatusText
|
|
func (cn *connection) statusFlags() (ret string) {
|
|
c := func(b byte) {
|
|
ret += string([]byte{b})
|
|
}
|
|
if cn.Interested {
|
|
c('i')
|
|
}
|
|
if cn.Choked {
|
|
c('c')
|
|
}
|
|
c('-')
|
|
ret += cn.connectionFlags()
|
|
c('-')
|
|
if cn.PeerInterested {
|
|
c('i')
|
|
}
|
|
if cn.PeerChoked {
|
|
c('c')
|
|
}
|
|
return
|
|
}
|
|
|
|
func (cn *connection) String() string {
|
|
var buf bytes.Buffer
|
|
cn.WriteStatus(&buf, nil)
|
|
return buf.String()
|
|
}
|
|
|
|
func (cn *connection) WriteStatus(w io.Writer, t *torrent) {
|
|
// \t isn't preserved in <pre> blocks?
|
|
fmt.Fprintf(w, "%+q: %s-%s\n", cn.PeerID, cn.localAddr(), cn.remoteAddr())
|
|
fmt.Fprintf(w, " last msg: %s, connected: %s, last useful chunk: %s\n",
|
|
eventAgeString(cn.lastMessageReceived),
|
|
eventAgeString(cn.completedHandshake),
|
|
eventAgeString(cn.lastUsefulChunkReceived))
|
|
fmt.Fprintf(w,
|
|
" %s completed, %d pieces touched, good chunks: %d/%d-%d reqq: %d-%d, flags: %s\n",
|
|
cn.completedString(t),
|
|
len(cn.peerTouchedPieces),
|
|
cn.UsefulChunksReceived,
|
|
cn.UnwantedChunksReceived+cn.UsefulChunksReceived,
|
|
cn.chunksSent,
|
|
len(cn.Requests),
|
|
len(cn.PeerRequests),
|
|
cn.statusFlags(),
|
|
)
|
|
}
|
|
|
|
func (c *connection) Close() {
|
|
c.closed.Set()
|
|
c.discardPieceInclination()
|
|
c.pieceRequestOrder.Clear()
|
|
// TODO: This call blocks sometimes, why?
|
|
go c.conn.Close()
|
|
}
|
|
|
|
func (c *connection) PeerHasPiece(piece int) bool {
|
|
if c.peerHasAll {
|
|
return true
|
|
}
|
|
if piece >= len(c.PeerPieces) {
|
|
return false
|
|
}
|
|
return c.PeerPieces[piece]
|
|
}
|
|
|
|
func (c *connection) Post(msg pp.Message) {
|
|
select {
|
|
case c.post <- msg:
|
|
case <-c.closed.C():
|
|
}
|
|
}
|
|
|
|
func (c *connection) RequestPending(r request) bool {
|
|
_, ok := c.Requests[r]
|
|
return ok
|
|
}
|
|
|
|
func (c *connection) requestMetadataPiece(index int) {
|
|
eID := c.PeerExtensionIDs["ut_metadata"]
|
|
if eID == 0 {
|
|
return
|
|
}
|
|
if index < len(c.metadataRequests) && c.metadataRequests[index] {
|
|
return
|
|
}
|
|
c.Post(pp.Message{
|
|
Type: pp.Extended,
|
|
ExtendedID: eID,
|
|
ExtendedPayload: func() []byte {
|
|
b, err := bencode.Marshal(map[string]int{
|
|
"msg_type": pp.RequestMetadataExtensionMsgType,
|
|
"piece": index,
|
|
})
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
return b
|
|
}(),
|
|
})
|
|
for index >= len(c.metadataRequests) {
|
|
c.metadataRequests = append(c.metadataRequests, false)
|
|
}
|
|
c.metadataRequests[index] = true
|
|
}
|
|
|
|
func (c *connection) requestedMetadataPiece(index int) bool {
|
|
return index < len(c.metadataRequests) && c.metadataRequests[index]
|
|
}
|
|
|
|
// Returns true if more requests can be sent.
|
|
func (c *connection) Request(chunk request) bool {
|
|
if len(c.Requests) >= c.PeerMaxRequests {
|
|
return false
|
|
}
|
|
if !c.PeerHasPiece(int(chunk.Index)) {
|
|
return true
|
|
}
|
|
if c.RequestPending(chunk) {
|
|
return true
|
|
}
|
|
c.SetInterested(true)
|
|
if c.PeerChoked {
|
|
return false
|
|
}
|
|
if c.Requests == nil {
|
|
c.Requests = make(map[request]struct{}, c.PeerMaxRequests)
|
|
}
|
|
c.Requests[chunk] = struct{}{}
|
|
c.requestsLowWater = len(c.Requests) / 2
|
|
c.Post(pp.Message{
|
|
Type: pp.Request,
|
|
Index: chunk.Index,
|
|
Begin: chunk.Begin,
|
|
Length: chunk.Length,
|
|
})
|
|
return true
|
|
}
|
|
|
|
// Returns true if an unsatisfied request was canceled.
|
|
func (c *connection) Cancel(r request) bool {
|
|
if c.Requests == nil {
|
|
return false
|
|
}
|
|
if _, ok := c.Requests[r]; !ok {
|
|
return false
|
|
}
|
|
delete(c.Requests, r)
|
|
c.Post(pp.Message{
|
|
Type: pp.Cancel,
|
|
Index: r.Index,
|
|
Begin: r.Begin,
|
|
Length: r.Length,
|
|
})
|
|
return true
|
|
}
|
|
|
|
// Returns true if an unsatisfied request was canceled.
|
|
func (c *connection) PeerCancel(r request) bool {
|
|
if c.PeerRequests == nil {
|
|
return false
|
|
}
|
|
if _, ok := c.PeerRequests[r]; !ok {
|
|
return false
|
|
}
|
|
delete(c.PeerRequests, r)
|
|
return true
|
|
}
|
|
|
|
func (c *connection) Choke() {
|
|
if c.Choked {
|
|
return
|
|
}
|
|
c.Post(pp.Message{
|
|
Type: pp.Choke,
|
|
})
|
|
c.PeerRequests = nil
|
|
c.Choked = true
|
|
}
|
|
|
|
func (c *connection) Unchoke() {
|
|
if !c.Choked {
|
|
return
|
|
}
|
|
c.Post(pp.Message{
|
|
Type: pp.Unchoke,
|
|
})
|
|
c.Choked = false
|
|
}
|
|
|
|
func (c *connection) SetInterested(interested bool) {
|
|
if c.Interested == interested {
|
|
return
|
|
}
|
|
c.Post(pp.Message{
|
|
Type: func() pp.MessageType {
|
|
if interested {
|
|
return pp.Interested
|
|
} else {
|
|
return pp.NotInterested
|
|
}
|
|
}(),
|
|
})
|
|
c.Interested = interested
|
|
}
|
|
|
|
var (
|
|
// Track connection writer buffer writes and flushes, to determine its
|
|
// efficiency.
|
|
connectionWriterFlush = expvar.NewInt("connectionWriterFlush")
|
|
connectionWriterWrite = expvar.NewInt("connectionWriterWrite")
|
|
)
|
|
|
|
// Writes buffers to the socket from the write channel.
|
|
func (conn *connection) writer() {
|
|
defer func() {
|
|
conn.t.cl.mu.Lock()
|
|
defer conn.t.cl.mu.Unlock()
|
|
conn.Close()
|
|
}()
|
|
// Reduce write syscalls.
|
|
buf := bufio.NewWriter(conn.rw)
|
|
for {
|
|
if buf.Buffered() == 0 {
|
|
// There's nothing to write, so block until we get something.
|
|
select {
|
|
case b, ok := <-conn.writeCh:
|
|
if !ok {
|
|
return
|
|
}
|
|
connectionWriterWrite.Add(1)
|
|
_, err := buf.Write(b)
|
|
if err != nil {
|
|
return
|
|
}
|
|
case <-conn.closed.C():
|
|
return
|
|
}
|
|
} else {
|
|
// We already have something to write, so flush if there's nothing
|
|
// more to write.
|
|
select {
|
|
case b, ok := <-conn.writeCh:
|
|
if !ok {
|
|
return
|
|
}
|
|
connectionWriterWrite.Add(1)
|
|
_, err := buf.Write(b)
|
|
if err != nil {
|
|
return
|
|
}
|
|
case <-conn.closed.C():
|
|
return
|
|
default:
|
|
connectionWriterFlush.Add(1)
|
|
err := buf.Flush()
|
|
if err != nil {
|
|
return
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func (conn *connection) writeOptimizer(keepAliveDelay time.Duration) {
|
|
defer close(conn.writeCh) // Responsible for notifying downstream routines.
|
|
pending := list.New() // Message queue.
|
|
var nextWrite []byte // Set to nil if we need to need to marshal the next message.
|
|
timer := time.NewTimer(keepAliveDelay)
|
|
defer timer.Stop()
|
|
lastWrite := time.Now()
|
|
for {
|
|
write := conn.writeCh // Set to nil if there's nothing to write.
|
|
if pending.Len() == 0 {
|
|
write = nil
|
|
} else if nextWrite == nil {
|
|
var err error
|
|
nextWrite, err = pending.Front().Value.(encoding.BinaryMarshaler).MarshalBinary()
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
}
|
|
event:
|
|
select {
|
|
case <-timer.C:
|
|
if pending.Len() != 0 {
|
|
break
|
|
}
|
|
keepAliveTime := lastWrite.Add(keepAliveDelay)
|
|
if time.Now().Before(keepAliveTime) {
|
|
timer.Reset(keepAliveTime.Sub(time.Now()))
|
|
break
|
|
}
|
|
pending.PushBack(pp.Message{Keepalive: true})
|
|
case msg, ok := <-conn.post:
|
|
if !ok {
|
|
return
|
|
}
|
|
if msg.Type == pp.Cancel {
|
|
for e := pending.Back(); e != nil; e = e.Prev() {
|
|
elemMsg := e.Value.(pp.Message)
|
|
if elemMsg.Type == pp.Request && msg.Index == elemMsg.Index && msg.Begin == elemMsg.Begin && msg.Length == elemMsg.Length {
|
|
pending.Remove(e)
|
|
optimizedCancels.Add(1)
|
|
break event
|
|
}
|
|
}
|
|
}
|
|
pending.PushBack(msg)
|
|
case write <- nextWrite:
|
|
pending.Remove(pending.Front())
|
|
nextWrite = nil
|
|
lastWrite = time.Now()
|
|
if pending.Len() == 0 {
|
|
timer.Reset(keepAliveDelay)
|
|
}
|
|
case <-conn.closed.C():
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
func (cn *connection) Have(piece int) {
|
|
for piece >= len(cn.sentHaves) {
|
|
cn.sentHaves = append(cn.sentHaves, false)
|
|
}
|
|
if cn.sentHaves[piece] {
|
|
return
|
|
}
|
|
cn.Post(pp.Message{
|
|
Type: pp.Have,
|
|
Index: pp.Integer(piece),
|
|
})
|
|
cn.sentHaves[piece] = true
|
|
}
|
|
|
|
func (cn *connection) Bitfield(haves []bool) {
|
|
if cn.sentHaves != nil {
|
|
panic("bitfield must be first have-related message sent")
|
|
}
|
|
cn.Post(pp.Message{
|
|
Type: pp.Bitfield,
|
|
Bitfield: haves,
|
|
})
|
|
cn.sentHaves = haves
|
|
}
|
|
|
|
func (c *connection) updateRequests() {
|
|
if !c.t.haveInfo() {
|
|
return
|
|
}
|
|
if c.Interested {
|
|
if c.PeerChoked {
|
|
return
|
|
}
|
|
if len(c.Requests) > c.requestsLowWater {
|
|
return
|
|
}
|
|
}
|
|
c.fillRequests()
|
|
if len(c.Requests) == 0 && !c.PeerChoked {
|
|
// So we're not choked, but we don't want anything right now. We may
|
|
// have completed readahead, and the readahead window has not rolled
|
|
// over to the next piece. Better to stay interested in case we're
|
|
// going to want data in the near future.
|
|
c.SetInterested(!c.t.haveAllPieces())
|
|
}
|
|
}
|
|
|
|
func (c *connection) fillRequests() {
|
|
c.pieceRequestOrder.IterTyped(func(piece int) (more bool) {
|
|
return c.requestPiecePendingChunks(piece)
|
|
})
|
|
}
|
|
|
|
func (c *connection) requestPiecePendingChunks(piece int) (again bool) {
|
|
return c.t.connRequestPiecePendingChunks(c, piece)
|
|
}
|
|
|
|
func (c *connection) stopRequestingPiece(piece int) {
|
|
c.pieceRequestOrder.Remove(piece)
|
|
}
|
|
|
|
func (c *connection) updatePiecePriority(piece int) {
|
|
if !c.PeerHasPiece(piece) {
|
|
return
|
|
}
|
|
tpp := c.t.piecePriority(piece)
|
|
if tpp == PiecePriorityNone {
|
|
c.stopRequestingPiece(piece)
|
|
return
|
|
}
|
|
prio := c.getPieceInclination()[piece]
|
|
switch tpp {
|
|
case PiecePriorityNormal:
|
|
case PiecePriorityReadahead:
|
|
prio -= c.t.numPieces()
|
|
case PiecePriorityNext, PiecePriorityNow:
|
|
prio -= 2 * c.t.numPieces()
|
|
default:
|
|
panic(tpp)
|
|
}
|
|
c.pieceRequestOrder.Set(piece, prio)
|
|
c.updateRequests()
|
|
}
|
|
|
|
func (c *connection) getPieceInclination() []int {
|
|
if c.pieceInclination == nil {
|
|
c.pieceInclination = c.t.getConnPieceInclination()
|
|
}
|
|
return c.pieceInclination
|
|
}
|
|
|
|
func (c *connection) discardPieceInclination() {
|
|
if c.pieceInclination == nil {
|
|
return
|
|
}
|
|
c.t.putPieceInclination(c.pieceInclination)
|
|
c.pieceInclination = nil
|
|
}
|