FedP2P/client.go

1261 lines
29 KiB
Go
Raw Normal View History

/*
Package torrent implements a torrent client.
Simple example:
c := &Client{}
c.Start()
defer c.Stop()
if err := c.AddTorrent(externalMetaInfoPackageSux); err != nil {
return fmt.Errorf("error adding torrent: %s", err)
}
c.WaitAll()
log.Print("erhmahgerd, torrent downloaded")
*/
2013-09-26 17:49:15 +08:00
package torrent
import (
"bufio"
"crypto/rand"
"crypto/sha1"
2013-09-26 17:49:15 +08:00
"errors"
"fmt"
2013-09-26 17:49:15 +08:00
"io"
"log"
2014-03-16 23:30:10 +08:00
mathRand "math/rand"
"net"
2013-09-26 17:49:15 +08:00
"os"
"sync"
"syscall"
"time"
2014-08-21 16:07:06 +08:00
"bitbucket.org/anacrolix/go.torrent/dht"
. "bitbucket.org/anacrolix/go.torrent/util"
2014-06-28 17:38:31 +08:00
"github.com/anacrolix/libtorgo/metainfo"
"github.com/nsf/libtorgo/bencode"
pp "bitbucket.org/anacrolix/go.torrent/peer_protocol"
"bitbucket.org/anacrolix/go.torrent/tracker"
_ "bitbucket.org/anacrolix/go.torrent/tracker/udp"
2013-09-26 17:49:15 +08:00
)
2014-03-16 23:30:10 +08:00
// queuePieceCheck schedules hash verification of the given piece unless a
// check is already queued for it. There is no real queue yet: each check is
// simply started in its own goroutine.
func (cl *Client) queuePieceCheck(t *torrent, pieceIndex pp.Integer) {
	if p := t.Pieces[pieceIndex]; !p.QueuedForHash {
		p.QueuedForHash = true
		go cl.verifyPiece(t, pieceIndex)
	}
}
// Queues the torrent data for the given region for download. The beginning of
// the region is given highest priority to allow a subsequent read at the same
// offset to return data ASAP.
func (me *Client) PrioritizeDataRegion(ih InfoHash, off, len_ int64) error {
me.mu.Lock()
defer me.mu.Unlock()
t := me.torrent(ih)
if t == nil {
return errors.New("no such active torrent")
}
2014-06-28 17:38:31 +08:00
if !t.haveInfo() {
return errors.New("missing metadata")
}
2014-08-21 16:07:06 +08:00
me.downloadStrategy.TorrentPrioritize(t, off, len_)
for _, cn := range t.Conns {
me.replenishConnRequests(t, cn)
}
return nil
}
type dataSpec struct {
InfoHash
2014-04-16 19:13:44 +08:00
request
}
2013-10-06 15:01:39 +08:00
type Client struct {
2014-08-21 16:07:06 +08:00
dataDir string
halfOpenLimit int
peerID [20]byte
listener net.Listener
disableTrackers bool
downloadStrategy DownloadStrategy
dHT *dht.Server
mu sync.Mutex
event sync.Cond
quit chan struct{}
halfOpen int
torrents map[InfoHash]*torrent
dataWaiter chan struct{}
2013-09-26 17:49:15 +08:00
}
2014-08-21 16:07:06 +08:00
// ListenAddr returns the address the client's listener is bound to. The
// listener is not checked for nil here.
func (me *Client) ListenAddr() net.Addr {
	return me.listener.Addr()
}
func (cl *Client) WriteStatus(w io.Writer) {
cl.mu.Lock()
defer cl.mu.Unlock()
2014-08-21 16:07:06 +08:00
if cl.listener != nil {
fmt.Fprintf(w, "Listening on %s\n", cl.listener.Addr())
2014-07-24 11:43:11 +08:00
} else {
fmt.Fprintf(w, "No listening torrent port!\n")
}
2014-08-21 16:07:06 +08:00
fmt.Fprintf(w, "Peer ID: %q\n", cl.peerID)
2014-07-24 11:43:11 +08:00
fmt.Fprintf(w, "Half open outgoing connections: %d\n", cl.halfOpen)
2014-08-21 16:07:06 +08:00
if cl.dHT != nil {
fmt.Fprintf(w, "DHT nodes: %d\n", cl.dHT.NumNodes())
fmt.Fprintf(w, "DHT Server ID: %x\n", cl.dHT.IDString())
}
fmt.Fprintln(w)
for _, t := range cl.torrents {
fmt.Fprintf(w, "%s: %f%%\n", t.Name(), func() float32 {
if !t.haveInfo() {
return 0
} else {
return 100 * (1 - float32(t.BytesLeft())/float32(t.Length()))
}
}())
t.WriteStatus(w)
fmt.Fprintln(w)
}
}
// Read torrent data at the given offset. Returns ErrDataNotReady if the data
// isn't available.
func (cl *Client) TorrentReadAt(ih InfoHash, off int64, p []byte) (n int, err error) {
cl.mu.Lock()
defer cl.mu.Unlock()
t := cl.torrent(ih)
if t == nil {
err = errors.New("unknown torrent")
return
}
2014-06-28 17:38:31 +08:00
index := pp.Integer(off / int64(t.UsualPieceSize()))
// Reading outside the bounds of a file is an error.
if index < 0 {
err = os.ErrInvalid
return
}
if int(index) >= len(t.Pieces) {
err = io.EOF
return
}
piece := t.Pieces[index]
pieceOff := pp.Integer(off % int64(t.PieceLength(0)))
high := int(t.PieceLength(index) - pieceOff)
if high < len(p) {
p = p[:high]
}
for cs, _ := range piece.PendingChunkSpecs {
chunkOff := int64(pieceOff) - int64(cs.Begin)
if chunkOff >= int64(t.PieceLength(index)) {
panic(chunkOff)
}
if 0 <= chunkOff && chunkOff < int64(cs.Length) {
// read begins in a pending chunk
err = ErrDataNotReady
return
}
// pending chunk caps available data
if chunkOff < 0 && int64(len(p)) > -chunkOff {
p = p[:-chunkOff]
}
}
return t.Data.ReadAt(p, off)
}
2013-09-26 17:49:15 +08:00
2014-08-21 16:07:06 +08:00
func NewClient(cfg *Config) (cl *Client, err error) {
if cfg == nil {
cfg = &Config{}
}
2014-08-21 16:07:06 +08:00
cl = &Client{
disableTrackers: cfg.DisableTrackers,
downloadStrategy: cfg.DownloadStrategy,
halfOpenLimit: 100,
dataDir: cfg.DataDir,
quit: make(chan struct{}),
torrents: make(map[InfoHash]*torrent),
}
cl.event.L = &cl.mu
o := copy(cl.peerID[:], BEP20)
_, err = rand.Read(cl.peerID[o:])
if err != nil {
panic("error generating peer id")
}
2014-08-21 16:07:06 +08:00
if cl.downloadStrategy == nil {
cl.downloadStrategy = &DefaultDownloadStrategy{}
}
cl.listener, err = net.Listen("tcp", cfg.ListenAddr)
if err != nil {
return
}
2014-08-21 16:07:06 +08:00
if cl.listener != nil {
go cl.acceptConnections()
}
2014-08-21 16:07:06 +08:00
if !cfg.NoDHT {
cl.dHT, err = dht.NewServer(&dht.ServerConfig{
Addr: cfg.ListenAddr,
})
if err != nil {
return
}
}
return
}
// stopped reports whether the quit channel has been closed, without blocking.
func (cl *Client) stopped() bool {
	select {
	case <-cl.quit:
		return true
	default:
	}
	return false
}
// Stop stops the client. The quit channel is closed, the listener is closed
// so that the accept loop returns, waiters on the client's event are woken,
// and all connections to peers are closed. Calling Stop again is a no-op.
func (me *Client) Stop() {
	me.mu.Lock()
	defer me.mu.Unlock()
	// Closing an already-closed channel panics, so bail out on repeat calls.
	if me.stopped() {
		return
	}
	close(me.quit)
	// acceptConnections only notices quit after Accept returns; closing the
	// listener unblocks it.
	if me.listener != nil {
		me.listener.Close()
	}
	me.event.Broadcast()
	for _, t := range me.torrents {
		for _, c := range t.Conns {
			c.Close()
		}
	}
}
func (cl *Client) acceptConnections() {
for {
2014-08-21 16:07:06 +08:00
conn, err := cl.listener.Accept()
select {
case <-cl.quit:
if conn != nil {
conn.Close()
}
return
default:
}
if err != nil {
log.Print(err)
return
}
// log.Printf("accepted connection from %s", conn.RemoteAddr())
go func() {
if err := cl.runConnection(conn, nil, peerSourceIncoming); err != nil {
log.Print(err)
}
}()
}
2013-09-26 17:49:15 +08:00
}
// torrent returns the torrent registered under ih, or nil when there is none.
// The torrents map is keyed by each torrent's InfoHash (see addTorrent), so a
// direct lookup replaces a scan over all torrents.
func (me *Client) torrent(ih InfoHash) *torrent {
	return me.torrents[ih]
}
// Start the process of connecting to the given peer for the given torrent if
// appropriate.
func (me *Client) initiateConn(peer Peer, torrent *torrent) {
2014-08-21 16:07:06 +08:00
if peer.Id == me.peerID {
return
}
me.halfOpen++
go func() {
addr := &net.TCPAddr{
IP: peer.IP,
Port: peer.Port,
}
// Binding to the listener address and dialing via net.Dialer gives "address in use" error. It seems it's not possible to dial out from this address so that peers associate our local address with our listen address.
conn, err := net.DialTimeout(addr.Network(), addr.String(), dialTimeout)
// Whether or not the connection attempt succeeds, the half open
// counter should be decremented, and new connection attempts made.
go func() {
me.mu.Lock()
defer me.mu.Unlock()
if me.halfOpen == 0 {
panic("assert")
}
me.halfOpen--
me.openNewConns()
}()
if netOpErr, ok := err.(*net.OpError); ok {
if netOpErr.Timeout() {
return
}
switch netOpErr.Err {
2014-04-08 17:40:10 +08:00
case syscall.ECONNREFUSED, syscall.EHOSTUNREACH:
return
}
}
if err != nil {
log.Printf("error connecting to peer: %s %#v", err, err)
return
}
// log.Printf("connected to %s", conn.RemoteAddr())
err = me.runConnection(conn, torrent, peer.Source)
if err != nil {
log.Print(err)
}
}()
}
func (cl *Client) incomingPeerPort() int {
2014-08-21 16:07:06 +08:00
if cl.listener == nil {
return 0
}
2014-08-21 16:07:06 +08:00
_, p, err := net.SplitHostPort(cl.listener.Addr().String())
if err != nil {
panic(err)
}
var i int
_, err = fmt.Sscanf(p, "%d", &i)
if err != nil {
panic(err)
}
return i
}
// addrCompactIP converts a net.Addr to its compact IP representation: the raw
// 4 bytes of an IPv4 address or 16 bytes of an IPv6 address, as used by the
// "yourip" field of http://www.bittorrent.org/beps/bep_0010.html.
//
// Only *net.TCPAddr is supported; any other address type yields an error. An
// address whose IP is neither a valid IPv4 nor IPv6 address (including an
// empty IP) also yields an error rather than an empty string.
func addrCompactIP(addr net.Addr) (string, error) {
	tcpAddr, ok := addr.(*net.TCPAddr)
	if !ok {
		return "", fmt.Errorf("unhandled type: %T", addr)
	}
	// To4 returns either nil or exactly 4 bytes.
	if v4 := tcpAddr.IP.To4(); v4 != nil {
		return string(v4), nil
	}
	// To16 returns nil when the IP has an invalid length.
	if v6 := tcpAddr.IP.To16(); v6 != nil {
		return string(v6), nil
	}
	return "", fmt.Errorf("invalid IP address: %v", tcpAddr.IP)
}
func (me *Client) runConnection(sock net.Conn, torrent *torrent, discovery peerSource) (err error) {
conn := &connection{
Discovery: discovery,
2014-05-21 15:47:42 +08:00
Socket: sock,
Choked: true,
PeerChoked: true,
writeCh: make(chan []byte),
2014-06-29 17:10:59 +08:00
PeerMaxRequests: 250, // Default in libtorrent is 250.
}
go conn.writer()
defer func() {
// There's a lock and deferred unlock later in this function. The
// client will not be locked when this deferred is invoked.
me.mu.Lock()
defer me.mu.Unlock()
conn.Close()
}()
// go conn.writeOptimizer()
conn.write(pp.Bytes(pp.Protocol))
conn.write(pp.Bytes("\x00\x00\x00\x00\x00\x10\x00\x00"))
if torrent != nil {
conn.write(pp.Bytes(torrent.InfoHash[:]))
conn.write(pp.Bytes(me.PeerId[:]))
}
var b [28]byte
_, err = io.ReadFull(conn.Socket, b[:])
2014-03-20 21:14:17 +08:00
if err == io.EOF {
return nil
}
if err != nil {
err = fmt.Errorf("when reading protocol and extensions: %s", err)
return
}
if string(b[:20]) != pp.Protocol {
// err = fmt.Errorf("wrong protocol: %#v", string(b[:20]))
return
}
if 8 != copy(conn.PeerExtensions[:], b[20:]) {
panic("wtf")
}
2014-03-20 21:14:17 +08:00
// log.Printf("peer extensions: %#v", string(conn.PeerExtensions[:]))
var infoHash [20]byte
_, err = io.ReadFull(conn.Socket, infoHash[:])
if err != nil {
return fmt.Errorf("reading peer info hash: %s", err)
}
_, err = io.ReadFull(conn.Socket, conn.PeerId[:])
if err != nil {
return fmt.Errorf("reading peer id: %s", err)
}
if torrent == nil {
torrent = me.torrent(infoHash)
if torrent == nil {
return
}
conn.write(pp.Bytes(torrent.InfoHash[:]))
conn.write(pp.Bytes(me.PeerId[:]))
}
me.mu.Lock()
2014-03-16 23:30:10 +08:00
defer me.mu.Unlock()
if !me.addConnection(torrent, conn) {
return
}
conn.post = make(chan pp.Message)
go conn.writeOptimizer(time.Minute)
if conn.PeerExtensions[5]&0x10 != 0 {
conn.Post(pp.Message{
Type: pp.Extended,
ExtendedID: pp.HandshakeExtendedID,
ExtendedPayload: func() []byte {
2014-06-28 17:38:31 +08:00
d := map[string]interface{}{
"m": map[string]int{
"ut_metadata": 1,
2014-06-29 17:07:43 +08:00
"ut_pex": 2,
},
"v": "go.torrent dev",
"reqq": 1,
2014-06-28 17:38:31 +08:00
}
if torrent.metadataSizeKnown() {
d["metadata_size"] = torrent.metadataSize()
}
if p := me.incomingPeerPort(); p != 0 {
d["p"] = p
}
yourip, err := addrCompactIP(conn.Socket.RemoteAddr())
if err != nil {
log.Printf("error calculating yourip field value in extension handshake: %s", err)
} else {
d["yourip"] = yourip
}
2014-07-24 11:43:45 +08:00
// log.Printf("sending %v", d)
2014-06-28 17:38:31 +08:00
b, err := bencode.Marshal(d)
if err != nil {
panic(err)
}
return b
}(),
})
}
if torrent.haveAnyPieces() {
conn.Post(pp.Message{
Type: pp.Bitfield,
Bitfield: torrent.bitfield(),
})
}
err = me.connectionLoop(torrent, conn)
if err != nil {
err = fmt.Errorf("during Connection loop: %s", err)
}
me.dropConnection(torrent, conn)
return
}
// peerGotPiece records that the peer behind c has the given piece, growing
// the peer's piece list as needed, and refills the connection's requests if
// the torrent wants that piece.
func (me *Client) peerGotPiece(t *torrent, c *connection, piece int) {
	// Pad with "not present" entries up to and including the new index.
	for len(c.PeerPieces) <= piece {
		c.PeerPieces = append(c.PeerPieces, false)
	}
	c.PeerPieces[piece] = true
	if !t.wantPiece(piece) {
		return
	}
	me.replenishConnRequests(t, c)
}
// peerUnchoked refills the connection's outstanding requests. It is invoked
// by connectionLoop after an Unchoke message has been received.
func (me *Client) peerUnchoked(torrent *torrent, conn *connection) {
	me.replenishConnRequests(torrent, conn)
}
func (cl *Client) connCancel(t *torrent, cn *connection, r request) (ok bool) {
ok = cn.Cancel(r)
if ok {
2014-08-21 16:07:06 +08:00
cl.downloadStrategy.DeleteRequest(t, r)
}
return
}
func (cl *Client) connDeleteRequest(t *torrent, cn *connection, r request) {
if !cn.RequestPending(r) {
return
}
2014-08-21 16:07:06 +08:00
cl.downloadStrategy.DeleteRequest(t, r)
delete(cn.Requests, r)
}
func (cl *Client) requestPendingMetadata(t *torrent, c *connection) {
if t.haveInfo() {
return
}
var pending []int
2014-06-28 17:38:31 +08:00
for index := 0; index < t.MetadataPieceCount(); index++ {
if !t.HaveMetadataPiece(index) {
pending = append(pending, index)
}
}
for _, i := range mathRand.Perm(len(pending)) {
c.Post(pp.Message{
Type: pp.Extended,
ExtendedID: byte(c.PeerExtensionIDs["ut_metadata"]),
ExtendedPayload: func() []byte {
b, err := bencode.Marshal(map[string]int{
"msg_type": 0,
"piece": pending[i],
})
if err != nil {
panic(err)
}
return b
}(),
})
}
}
2014-06-28 17:38:31 +08:00
func (cl *Client) completedMetadata(t *torrent) {
h := sha1.New()
h.Write(t.MetaData)
var ih InfoHash
copy(ih[:], h.Sum(nil)[:])
if ih != t.InfoHash {
log.Print("bad metadata")
t.InvalidateMetadata()
return
}
var info metainfo.Info
err := bencode.Unmarshal(t.MetaData, &info)
if err != nil {
log.Printf("error unmarshalling metadata: %s", err)
t.InvalidateMetadata()
return
}
// TODO(anacrolix): If this fails, I think something harsher should be
// done.
err = cl.setMetaData(t, info, t.MetaData)
if err != nil {
log.Printf("error setting metadata: %s", err)
t.InvalidateMetadata()
return
}
log.Printf("%s: got metadata from peers", t)
2014-06-28 17:38:31 +08:00
}
func (cl *Client) gotMetadataExtensionMsg(payload []byte, t *torrent, c *connection) (err error) {
var d map[string]int
err = bencode.Unmarshal(payload, &d)
if err != nil {
2014-06-30 22:05:28 +08:00
err = fmt.Errorf("error unmarshalling payload: %s: %q", err, payload)
2014-06-28 17:38:31 +08:00
return
}
msgType, ok := d["msg_type"]
if !ok {
err = errors.New("missing msg_type field")
return
}
piece := d["piece"]
switch msgType {
case pp.DataMetadataExtensionMsgType:
if t.haveInfo() {
break
}
t.SaveMetadataPiece(piece, payload[len(payload)-metadataPieceSize(d["total_size"], piece):])
if !t.HaveAllMetadataPieces() {
break
}
cl.completedMetadata(t)
case pp.RequestMetadataExtensionMsgType:
if !t.HaveMetadataPiece(piece) {
c.Post(t.NewMetadataExtensionMessage(c, pp.RejectMetadataExtensionMsgType, d["piece"], nil))
break
}
c.Post(t.NewMetadataExtensionMessage(c, pp.DataMetadataExtensionMsgType, piece, t.MetaData[(1<<14)*piece:(1<<14)*piece+t.metadataPieceSize(piece)]))
case pp.RejectMetadataExtensionMsgType:
default:
err = errors.New("unknown msg_type value")
}
return
}
2014-06-29 17:07:43 +08:00
type peerExchangeMessage struct {
2014-07-03 23:41:46 +08:00
Added util.CompactPeers `bencode:"added"`
AddedFlags []byte `bencode:"added.f"`
Dropped []tracker.Peer `bencode:"dropped"`
2014-06-29 17:07:43 +08:00
}
2014-07-24 11:43:45 +08:00
// Processes incoming bittorrent messages. The client lock is held upon entry
// and exit.
func (me *Client) connectionLoop(t *torrent, c *connection) error {
decoder := pp.Decoder{
R: bufio.NewReader(c.Socket),
MaxLength: 256 * 1024,
}
for {
me.mu.Unlock()
var msg pp.Message
err := decoder.Decode(&msg)
me.mu.Lock()
if c.closed {
return nil
}
if err != nil {
2014-03-20 21:14:17 +08:00
if me.stopped() || err == io.EOF {
return nil
}
return err
}
if msg.Keepalive {
continue
}
switch msg.Type {
case pp.Choke:
c.PeerChoked = true
for r := range c.Requests {
me.connDeleteRequest(t, c, r)
}
case pp.Unchoke:
c.PeerChoked = false
me.peerUnchoked(t, c)
case pp.Interested:
c.PeerInterested = true
// TODO: This should be done from a dedicated unchoking routine.
c.Unchoke()
case pp.NotInterested:
c.PeerInterested = false
c.Choke()
case pp.Have:
me.peerGotPiece(t, c, int(msg.Index))
case pp.Request:
if c.PeerRequests == nil {
c.PeerRequests = make(map[request]struct{}, maxRequests)
}
2014-05-21 15:42:06 +08:00
request := newRequest(msg.Index, msg.Begin, msg.Length)
// TODO: Requests should be satisfied from a dedicated upload routine.
// c.PeerRequests[request] = struct{}{}
p := make([]byte, msg.Length)
n, err := t.Data.ReadAt(p, int64(t.PieceLength(0))*int64(msg.Index)+int64(msg.Begin))
if err != nil {
2014-06-26 15:30:16 +08:00
return fmt.Errorf("reading t data to serve request %q: %s", request, err)
}
if n != int(msg.Length) {
2014-06-26 15:30:16 +08:00
return fmt.Errorf("bad request: %v", msg)
}
c.Post(pp.Message{
Type: pp.Piece,
Index: msg.Index,
Begin: msg.Begin,
Piece: p,
})
case pp.Cancel:
2014-04-16 15:33:33 +08:00
req := newRequest(msg.Index, msg.Begin, msg.Length)
if !c.PeerCancel(req) {
2014-04-16 15:33:33 +08:00
log.Printf("received unexpected cancel: %v", req)
}
case pp.Bitfield:
if c.PeerPieces != nil {
err = errors.New("received unexpected bitfield")
break
}
if t.haveInfo() {
if len(msg.Bitfield) < t.NumPieces() {
err = errors.New("received invalid bitfield")
break
}
msg.Bitfield = msg.Bitfield[:t.NumPieces()]
}
c.PeerPieces = msg.Bitfield
for index, has := range c.PeerPieces {
if has {
me.peerGotPiece(t, c, index)
}
}
case pp.Piece:
err = me.downloadedChunk(t, c, &msg)
case pp.Extended:
switch msg.ExtendedID {
case pp.HandshakeExtendedID:
// TODO: Create a bencode struct for this.
var d map[string]interface{}
err = bencode.Unmarshal(msg.ExtendedPayload, &d)
if err != nil {
err = fmt.Errorf("error decoding extended message payload: %s", err)
break
}
2014-07-24 11:43:45 +08:00
// log.Printf("got handshake: %v", d)
if reqq, ok := d["reqq"]; ok {
if i, ok := reqq.(int64); ok {
c.PeerMaxRequests = int(i)
}
}
if v, ok := d["v"]; ok {
c.PeerClientName = v.(string)
}
m, ok := d["m"]
if !ok {
err = errors.New("handshake missing m item")
break
}
mTyped, ok := m.(map[string]interface{})
if !ok {
err = errors.New("handshake m value is not dict")
break
}
if c.PeerExtensionIDs == nil {
c.PeerExtensionIDs = make(map[string]int64, len(mTyped))
}
for name, v := range mTyped {
id, ok := v.(int64)
if !ok {
log.Printf("bad handshake m item extension ID type: %T", v)
continue
}
if id == 0 {
delete(c.PeerExtensionIDs, name)
} else {
c.PeerExtensionIDs[name] = id
}
}
metadata_sizeUntyped, ok := d["metadata_size"]
if ok {
metadata_size, ok := metadata_sizeUntyped.(int64)
if !ok {
log.Printf("bad metadata_size type: %T", metadata_sizeUntyped)
} else {
2014-06-28 17:38:31 +08:00
t.SetMetadataSize(metadata_size)
}
}
if _, ok := c.PeerExtensionIDs["ut_metadata"]; ok {
me.requestPendingMetadata(t, c)
}
case 1:
2014-06-28 17:38:31 +08:00
err = me.gotMetadataExtensionMsg(msg.ExtendedPayload, t, c)
2014-06-30 22:05:28 +08:00
if err != nil {
err = fmt.Errorf("error handling metadata extension message: %s", err)
}
2014-06-29 17:07:43 +08:00
case 2:
var pexMsg peerExchangeMessage
err := bencode.Unmarshal(msg.ExtendedPayload, &pexMsg)
if err != nil {
err = fmt.Errorf("error unmarshalling PEX message: %s", err)
break
}
go func() {
err := me.AddPeers(t.InfoHash, func() (ret []Peer) {
for _, cp := range pexMsg.Added {
p := Peer{
IP: make([]byte, 4),
Port: int(cp.Port),
Source: peerSourcePEX,
2014-06-29 17:07:43 +08:00
}
if n := copy(p.IP, cp.IP[:]); n != 4 {
panic(n)
}
ret = append(ret, p)
}
return
}())
if err != nil {
log.Printf("error adding PEX peers: %s", err)
return
}
log.Printf("added %d peers from PEX", len(pexMsg.Added))
}()
2014-06-28 17:38:31 +08:00
default:
2014-07-10 00:59:37 +08:00
err = fmt.Errorf("unexpected extended message ID: %v", msg.ExtendedID)
}
default:
2014-05-21 15:42:06 +08:00
err = fmt.Errorf("received unknown message type: %#v", msg.Type)
}
if err != nil {
return err
}
}
}
func (me *Client) dropConnection(torrent *torrent, conn *connection) {
conn.Socket.Close()
for r := range conn.Requests {
me.connDeleteRequest(torrent, conn, r)
}
for i0, c := range torrent.Conns {
if c != conn {
continue
}
i1 := len(torrent.Conns) - 1
if i0 != i1 {
torrent.Conns[i0] = torrent.Conns[i1]
}
torrent.Conns = torrent.Conns[:i1]
return
}
2014-05-23 19:02:11 +08:00
panic("connection not found")
}
func (me *Client) addConnection(t *torrent, c *connection) bool {
if me.stopped() {
return false
}
2014-03-16 23:30:10 +08:00
for _, c0 := range t.Conns {
if c.PeerId == c0.PeerId {
2014-05-21 15:42:06 +08:00
// Already connected to a client with that ID.
return false
}
}
t.Conns = append(t.Conns, c)
return true
}
2013-10-06 15:01:39 +08:00
func (me *Client) openNewConns() {
for _, t := range me.torrents {
for len(t.Peers) != 0 {
2014-08-21 16:07:06 +08:00
if me.halfOpen >= me.halfOpenLimit {
return
}
p := t.Peers[0]
t.Peers = t.Peers[1:]
me.initiateConn(p, t)
}
}
}
// AddPeers queues peers for the torrent identified by infoHash and starts new
// outgoing connections where the half-open limit allows. It returns an error
// if the client has no such torrent.
func (me *Client) AddPeers(infoHash InfoHash, peers []Peer) error {
	me.mu.Lock()
	defer me.mu.Unlock()
	if t := me.torrent(infoHash); t != nil {
		t.Peers = append(t.Peers, peers...)
		me.openNewConns()
		return nil
	}
	return errors.New("no such torrent")
}
2014-06-28 17:38:31 +08:00
func (cl *Client) setMetaData(t *torrent, md metainfo.Info, bytes []byte) (err error) {
2014-08-21 16:07:06 +08:00
err = t.setMetadata(md, cl.dataDir, bytes)
if err != nil {
return
}
// Queue all pieces for hashing. This is done sequentially to avoid
// spamming goroutines.
for _, p := range t.Pieces {
p.QueuedForHash = true
}
go func() {
for i := range t.Pieces {
cl.verifyPiece(t, pp.Integer(i))
}
}()
2014-08-21 16:07:06 +08:00
cl.downloadStrategy.TorrentStarted(t)
return
}
// Prepare a Torrent without any attachment to a Client. That means we can
// initialize fields all fields that don't require the Client without locking
// it.
func newTorrent(ih InfoHash, announceList [][]string) (t *torrent, err error) {
t = &torrent{
InfoHash: ih,
}
t.Trackers = make([][]tracker.Client, len(announceList))
for tierIndex := range announceList {
tier := t.Trackers[tierIndex]
for _, url := range announceList[tierIndex] {
2014-03-16 23:30:10 +08:00
tr, err := tracker.New(url)
if err != nil {
log.Print(err)
continue
}
tier = append(tier, tr)
}
// The trackers within each tier must be shuffled before use.
// http://stackoverflow.com/a/12267471/149482
// http://www.bittorrent.org/beps/bep_0012.html#order-of-processing
for i := range tier {
j := mathRand.Intn(i + 1)
tier[i], tier[j] = tier[j], tier[i]
}
t.Trackers[tierIndex] = tier
2014-03-16 23:30:10 +08:00
}
return
}
func (cl *Client) AddMagnet(uri string) (err error) {
m, err := ParseMagnetURI(uri)
2013-09-26 17:49:15 +08:00
if err != nil {
return
2013-09-26 17:49:15 +08:00
}
t, err := newTorrent(m.InfoHash, [][]string{m.Trackers})
if err != nil {
return
}
t.DisplayName = m.DisplayName
cl.mu.Lock()
defer cl.mu.Unlock()
err = cl.addTorrent(t)
if err != nil {
t.Close()
}
return
}
// DropTorrent closes the torrent identified by infoHash and removes it from
// the client. An error is returned when the client has no such torrent. A
// failure to close the torrent causes a panic.
func (me *Client) DropTorrent(infoHash InfoHash) (err error) {
	me.mu.Lock()
	defer me.mu.Unlock()

	t, found := me.torrents[infoHash]
	if !found {
		return fmt.Errorf("no such torrent")
	}
	if closeErr := t.Close(); closeErr != nil {
		panic(closeErr)
	}
	delete(me.torrents, infoHash)
	return nil
}
func (me *Client) addTorrent(t *torrent) (err error) {
if _, ok := me.torrents[t.InfoHash]; ok {
err = fmt.Errorf("torrent infohash collision")
return
}
me.torrents[t.InfoHash] = t
2014-08-21 16:07:06 +08:00
if !me.disableTrackers {
go me.announceTorrent(t)
}
2014-08-21 16:07:06 +08:00
if me.dHT != nil {
2014-07-11 17:30:20 +08:00
go me.announceTorrentDHT(t)
}
return
}
// Adds the torrent to the client.
func (me *Client) AddTorrent(metaInfo *metainfo.MetaInfo) (err error) {
t, err := newTorrent(BytesInfoHash(metaInfo.Info.Hash), metaInfo.AnnounceList)
if err != nil {
return
}
me.mu.Lock()
defer me.mu.Unlock()
err = me.addTorrent(t)
if err != nil {
return
}
err = me.setMetaData(t, metaInfo.Info.Info, metaInfo.Info.Bytes)
if err != nil {
return
}
return
2013-09-26 17:49:15 +08:00
}
// AddTorrentFromFile loads the metainfo file called name and adds the torrent
// it describes to the client.
func (me *Client) AddTorrentFromFile(name string) (err error) {
	mi, loadErr := metainfo.LoadFromFile(name)
	if loadErr != nil {
		return fmt.Errorf("error loading metainfo from file: %s", loadErr)
	}
	return me.AddTorrent(mi)
}
2014-03-20 21:12:53 +08:00
func (cl *Client) listenerAnnouncePort() (port int16) {
2014-08-21 16:07:06 +08:00
l := cl.listener
2014-03-20 21:12:53 +08:00
if l == nil {
return
}
addr := l.Addr()
switch data := addr.(type) {
case *net.TCPAddr:
return int16(data.Port)
case *net.UDPAddr:
return int16(data.Port)
default:
log.Printf("unknown listener addr type: %T", addr)
}
return
}
2014-07-11 17:30:20 +08:00
func (cl *Client) announceTorrentDHT(t *torrent) {
for {
2014-08-21 16:07:06 +08:00
ps, err := cl.dHT.GetPeers(string(t.InfoHash[:]))
2014-07-11 17:30:20 +08:00
if err != nil {
log.Printf("error getting peers from dht: %s", err)
return
}
nextScrape := time.After(1 * time.Minute)
getPeers:
for {
select {
case <-nextScrape:
break getPeers
case cps, ok := <-ps.Values:
if !ok {
break getPeers
}
err = cl.AddPeers(t.InfoHash, func() (ret []Peer) {
for _, cp := range cps {
ret = append(ret, Peer{
IP: cp.IP[:],
Port: int(cp.Port),
Source: peerSourceDHT,
2014-07-11 17:30:20 +08:00
})
// log.Printf("peer from dht: %s", &net.UDPAddr{
// IP: cp.IP[:],
// Port: int(cp.Port),
// })
2014-07-11 17:30:20 +08:00
}
return
}())
if err != nil {
log.Printf("error adding peers from dht for torrent %q: %s", t, err)
break getPeers
}
// log.Printf("got %d peers from dht for torrent %q", len(cps), t)
2014-07-11 17:30:20 +08:00
}
}
ps.Close()
}
}
func (cl *Client) announceTorrent(t *torrent) {
2014-03-16 23:30:10 +08:00
req := tracker.AnnounceRequest{
Event: tracker.Started,
NumWant: -1,
Port: cl.listenerAnnouncePort(),
2014-08-21 16:07:06 +08:00
PeerId: cl.peerID,
InfoHash: t.InfoHash,
2014-03-16 23:30:10 +08:00
}
newAnnounce:
for {
cl.mu.Lock()
if t.isClosed() {
return
}
req.Left = t.BytesLeft()
cl.mu.Unlock()
2014-03-16 23:30:10 +08:00
for _, tier := range t.Trackers {
for trIndex, tr := range tier {
if err := tr.Connect(); err != nil {
log.Print(err)
continue
}
resp, err := tr.Announce(&req)
if err != nil {
log.Print(err)
continue
}
var peers []Peer
for _, peer := range resp.Peers {
peers = append(peers, Peer{
IP: peer.IP,
Port: peer.Port,
2014-03-16 23:30:10 +08:00
})
}
err = cl.AddPeers(t.InfoHash, peers)
if err != nil {
2014-07-24 11:43:45 +08:00
log.Printf("error adding peers to torrent %s: %s", t, err)
} else {
log.Printf("%s: %d new peers from %s", t, len(peers), tr)
2014-03-16 23:30:10 +08:00
}
tier[0], tier[trIndex] = tier[trIndex], tier[0]
time.Sleep(time.Second * time.Duration(resp.Interval))
req.Event = tracker.None
2014-03-16 23:30:10 +08:00
continue newAnnounce
}
}
time.Sleep(5 * time.Second)
2014-03-16 23:30:10 +08:00
}
}
// allTorrentsCompleted reports whether every torrent in the client has all of
// its pieces. It is true for a client without torrents.
func (cl *Client) allTorrentsCompleted() bool {
	for _, t := range cl.torrents {
		if t.haveAllPieces() {
			continue
		}
		return false
	}
	return true
}
// Returns true when all torrents are completely downloaded and false if the
// client is stopped before that.
func (me *Client) WaitAll() bool {
me.mu.Lock()
defer me.mu.Unlock()
2014-03-16 23:30:10 +08:00
for !me.allTorrentsCompleted() {
if me.stopped() {
return false
}
me.event.Wait()
}
return true
2013-09-26 17:49:15 +08:00
}
func (cl *Client) assertRequestHeat() {
2014-08-21 16:07:06 +08:00
dds, ok := cl.downloadStrategy.(*DefaultDownloadStrategy)
if !ok {
return
}
for _, t := range cl.torrents {
m := make(map[request]int, 3000)
for _, cn := range t.Conns {
for r := range cn.Requests {
m[r]++
}
}
for r, h := range dds.heat[t] {
if m[r] != h {
panic(fmt.Sprintln(m[r], h))
}
}
}
}
func (me *Client) replenishConnRequests(t *torrent, c *connection) {
if !t.haveInfo() {
return
}
2014-08-21 16:07:06 +08:00
me.downloadStrategy.FillRequests(t, c)
//me.assertRequestHeat()
2014-06-28 17:38:31 +08:00
if len(c.Requests) == 0 && !c.PeerChoked {
c.SetInterested(false)
}
}
func (me *Client) downloadedChunk(t *torrent, c *connection, msg *pp.Message) error {
req := newRequest(msg.Index, msg.Begin, pp.Integer(len(msg.Piece)))
// Request has been satisfied.
me.connDeleteRequest(t, c, req)
defer me.replenishConnRequests(t, c)
// Do we actually want this chunk?
if _, ok := t.Pieces[req.Index].PendingChunkSpecs[req.chunkSpec]; !ok {
log.Printf("got unnecessary chunk from %v: %q", req, string(c.PeerId[:]))
return nil
}
// Write the chunk out.
err := t.WriteChunk(int(msg.Index), int64(msg.Begin), msg.Piece)
if err != nil {
return err
}
// Record that we have the chunk.
delete(t.Pieces[req.Index].PendingChunkSpecs, req.chunkSpec)
2014-07-09 22:26:58 +08:00
t.PiecesByBytesLeft.ValueChanged(t.Pieces[req.Index].bytesLeftElement)
if len(t.Pieces[req.Index].PendingChunkSpecs) == 0 {
me.queuePieceCheck(t, req.Index)
}
// Unprioritize the chunk.
2014-08-21 16:07:06 +08:00
me.downloadStrategy.TorrentGotChunk(t, req)
// Cancel pending requests for this chunk.
2014-06-30 22:05:28 +08:00
cancelled := false
for _, c := range t.Conns {
if me.connCancel(t, c, req) {
2014-06-30 22:05:28 +08:00
cancelled = true
me.replenishConnRequests(t, c)
}
}
2014-06-30 22:05:28 +08:00
if cancelled {
log.Printf("cancelled concurrent requests for %v", req)
2014-06-30 22:05:28 +08:00
}
2014-06-28 17:38:31 +08:00
me.dataReady(dataSpec{t.InfoHash, req})
return nil
}
// dataReady wakes anyone holding the current data waiter channel by closing
// it, then clears the field so that the next DataWaiter call makes a fresh
// channel. The ds argument is currently unused.
func (cl *Client) dataReady(ds dataSpec) {
	if waiter := cl.dataWaiter; waiter != nil {
		close(waiter)
	}
	cl.dataWaiter = nil
}
// DataWaiter returns a channel that is closed when new data has become
// available in the client. The channel is created on first use and shared by
// all callers until it is closed.
func (me *Client) DataWaiter() <-chan struct{} {
	me.mu.Lock()
	defer me.mu.Unlock()
	if me.dataWaiter != nil {
		return me.dataWaiter
	}
	me.dataWaiter = make(chan struct{})
	return me.dataWaiter
}
func (me *Client) pieceHashed(t *torrent, piece pp.Integer, correct bool) {
p := t.Pieces[piece]
p.EverHashed = true
if correct {
p.PendingChunkSpecs = nil
2014-08-21 16:07:06 +08:00
me.downloadStrategy.TorrentGotPiece(t, int(piece))
me.dataReady(dataSpec{
t.InfoHash,
2014-04-16 19:13:44 +08:00
request{
pp.Integer(piece),
chunkSpec{0, pp.Integer(t.PieceLength(piece))},
},
})
} else {
if len(p.PendingChunkSpecs) == 0 {
t.pendAllChunkSpecs(piece)
}
}
for _, conn := range t.Conns {
if correct {
conn.Post(pp.Message{
Type: pp.Have,
Index: pp.Integer(piece),
})
2014-03-20 21:40:54 +08:00
// TODO: Cancel requests for this piece.
} else {
if conn.PeerHasPiece(piece) {
me.replenishConnRequests(t, conn)
}
2013-09-26 17:49:15 +08:00
}
}
2014-03-16 23:30:10 +08:00
me.event.Broadcast()
}
func (cl *Client) verifyPiece(t *torrent, index pp.Integer) {
cl.mu.Lock()
p := t.Pieces[index]
for p.Hashing {
cl.event.Wait()
}
if t.isClosed() {
cl.mu.Unlock()
return
}
p.Hashing = true
p.QueuedForHash = false
cl.mu.Unlock()
sum := t.HashPiece(index)
cl.mu.Lock()
p.Hashing = false
cl.pieceHashed(t, index, sum == p.Hash)
cl.mu.Unlock()
2013-09-26 17:49:15 +08:00
}
2013-10-06 15:01:39 +08:00
func (me *Client) Torrents() (ret []*torrent) {
me.mu.Lock()
for _, t := range me.torrents {
ret = append(ret, t)
}
me.mu.Unlock()
2013-10-06 15:01:39 +08:00
return
}