Pool webtorrent tracker websockets at the Client level
This commit is contained in:
parent
cc69c3ec8f
commit
0ab6d108be
51
client.go
51
client.go
|
@ -23,9 +23,12 @@ import (
|
||||||
"github.com/anacrolix/missinggo/slices"
|
"github.com/anacrolix/missinggo/slices"
|
||||||
"github.com/anacrolix/missinggo/v2/pproffd"
|
"github.com/anacrolix/missinggo/v2/pproffd"
|
||||||
"github.com/anacrolix/sync"
|
"github.com/anacrolix/sync"
|
||||||
|
"github.com/anacrolix/torrent/tracker"
|
||||||
|
"github.com/anacrolix/torrent/webtorrent"
|
||||||
"github.com/davecgh/go-spew/spew"
|
"github.com/davecgh/go-spew/spew"
|
||||||
"github.com/dustin/go-humanize"
|
"github.com/dustin/go-humanize"
|
||||||
"github.com/google/btree"
|
"github.com/google/btree"
|
||||||
|
"github.com/pion/datachannel"
|
||||||
"golang.org/x/time/rate"
|
"golang.org/x/time/rate"
|
||||||
"golang.org/x/xerrors"
|
"golang.org/x/xerrors"
|
||||||
|
|
||||||
|
@ -73,6 +76,8 @@ type Client struct {
|
||||||
|
|
||||||
acceptLimiter map[ipStr]int
|
acceptLimiter map[ipStr]int
|
||||||
dialRateLimiter *rate.Limiter
|
dialRateLimiter *rate.Limiter
|
||||||
|
|
||||||
|
websocketTrackers websocketTrackers
|
||||||
}
|
}
|
||||||
|
|
||||||
type ipStr string
|
type ipStr string
|
||||||
|
@ -241,6 +246,32 @@ func NewClient(cfg *ClientConfig) (cl *Client, err error) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
cl.websocketTrackers = websocketTrackers{
|
||||||
|
PeerId: cl.peerID,
|
||||||
|
Logger: cl.logger.WithMap(func(msg log.Msg) log.Msg {
|
||||||
|
return msg.SetLevel(log.Critical)
|
||||||
|
}),
|
||||||
|
GetAnnounceRequest: func(event tracker.AnnounceEvent, infoHash [20]byte) tracker.AnnounceRequest {
|
||||||
|
cl.lock()
|
||||||
|
defer cl.unlock()
|
||||||
|
return cl.torrents[infoHash].announceRequest(event)
|
||||||
|
},
|
||||||
|
OnConn: func(dc datachannel.ReadWriteCloser, dcc webtorrent.DataChannelContext) {
|
||||||
|
cl.lock()
|
||||||
|
defer cl.unlock()
|
||||||
|
t, ok := cl.torrents[dcc.InfoHash]
|
||||||
|
if !ok {
|
||||||
|
cl.logger.WithDefaultLevel(log.Warning).Printf(
|
||||||
|
"got webrtc conn for unloaded torrent with infohash %x",
|
||||||
|
dcc.InfoHash,
|
||||||
|
)
|
||||||
|
dc.Close()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
go t.onWebRtcConn(dc, dcc)
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -627,9 +658,17 @@ func (cl *Client) noLongerHalfOpen(t *Torrent, addr string) {
|
||||||
|
|
||||||
// Performs initiator handshakes and returns a connection. Returns nil *connection if no connection
|
// Performs initiator handshakes and returns a connection. Returns nil *connection if no connection
|
||||||
// for valid reasons.
|
// for valid reasons.
|
||||||
func (cl *Client) handshakesConnection(ctx context.Context, nc net.Conn, t *Torrent, encryptHeader bool, remoteAddr net.Addr,
|
func (cl *Client) handshakesConnection(
|
||||||
network, connString string,
|
ctx context.Context,
|
||||||
) (c *PeerConn, err error) {
|
nc net.Conn,
|
||||||
|
t *Torrent,
|
||||||
|
encryptHeader bool,
|
||||||
|
remoteAddr net.Addr,
|
||||||
|
network,
|
||||||
|
connString string,
|
||||||
|
) (
|
||||||
|
c *PeerConn, err error,
|
||||||
|
) {
|
||||||
c = cl.newConnection(nc, true, remoteAddr, network, connString)
|
c = cl.newConnection(nc, true, remoteAddr, network, connString)
|
||||||
c.headerEncrypted = encryptHeader
|
c.headerEncrypted = encryptHeader
|
||||||
ctx, cancel := context.WithTimeout(ctx, cl.config.HandshakesTimeout)
|
ctx, cancel := context.WithTimeout(ctx, cl.config.HandshakesTimeout)
|
||||||
|
@ -881,7 +920,11 @@ func (cl *Client) runHandshookConn(c *PeerConn, t *Torrent) error {
|
||||||
defer t.dropConnection(c)
|
defer t.dropConnection(c)
|
||||||
go c.writer(time.Minute)
|
go c.writer(time.Minute)
|
||||||
cl.sendInitialMessages(c, t)
|
cl.sendInitialMessages(c, t)
|
||||||
return fmt.Errorf("main read loop: %w", c.mainReadLoop())
|
err := c.mainReadLoop()
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("main read loop: %w", err)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// See the order given in Transmission's tr_peerMsgsNew.
|
// See the order given in Transmission's tr_peerMsgsNew.
|
||||||
|
|
2
go.mod
2
go.mod
|
@ -6,7 +6,7 @@ require (
|
||||||
github.com/anacrolix/dht/v2 v2.6.1-0.20200416071723-3850fa1b802a
|
github.com/anacrolix/dht/v2 v2.6.1-0.20200416071723-3850fa1b802a
|
||||||
github.com/anacrolix/envpprof v1.1.0
|
github.com/anacrolix/envpprof v1.1.0
|
||||||
github.com/anacrolix/go-libutp v1.0.2
|
github.com/anacrolix/go-libutp v1.0.2
|
||||||
github.com/anacrolix/log v0.6.1-0.20200416071330-f58a030e6149
|
github.com/anacrolix/log v0.7.0
|
||||||
github.com/anacrolix/missinggo v1.2.1
|
github.com/anacrolix/missinggo v1.2.1
|
||||||
github.com/anacrolix/missinggo/perf v1.0.0
|
github.com/anacrolix/missinggo/perf v1.0.0
|
||||||
github.com/anacrolix/missinggo/v2 v2.4.1-0.20200419051441-747d9d7544c6
|
github.com/anacrolix/missinggo/v2 v2.4.1-0.20200419051441-747d9d7544c6
|
||||||
|
|
2
go.sum
2
go.sum
|
@ -61,6 +61,8 @@ github.com/anacrolix/log v0.6.0 h1:5y+wtTWoecbrAWWuoBCH7UuGFiD6q2jnQxrLK01RC+Q=
|
||||||
github.com/anacrolix/log v0.6.0/go.mod h1:lWvLTqzAnCWPJA08T2HCstZi0L1y2Wyvm3FJgwU9jwU=
|
github.com/anacrolix/log v0.6.0/go.mod h1:lWvLTqzAnCWPJA08T2HCstZi0L1y2Wyvm3FJgwU9jwU=
|
||||||
github.com/anacrolix/log v0.6.1-0.20200416071330-f58a030e6149 h1:3cEyLU9ObAfTnBHCev8uuWGhbHfol8uTwyMRkLIpZGg=
|
github.com/anacrolix/log v0.6.1-0.20200416071330-f58a030e6149 h1:3cEyLU9ObAfTnBHCev8uuWGhbHfol8uTwyMRkLIpZGg=
|
||||||
github.com/anacrolix/log v0.6.1-0.20200416071330-f58a030e6149/go.mod h1:s5yBP/j046fm9odtUTbHOfDUq/zh1W8OkPpJtnX0oQI=
|
github.com/anacrolix/log v0.6.1-0.20200416071330-f58a030e6149/go.mod h1:s5yBP/j046fm9odtUTbHOfDUq/zh1W8OkPpJtnX0oQI=
|
||||||
|
github.com/anacrolix/log v0.7.0 h1:koGkC/K0LjIbrhLhwfpsfMuvu8nhvY7J4TmLVc1mAwE=
|
||||||
|
github.com/anacrolix/log v0.7.0/go.mod h1:s5yBP/j046fm9odtUTbHOfDUq/zh1W8OkPpJtnX0oQI=
|
||||||
github.com/anacrolix/missinggo v0.0.0-20180522035225-b4a5853e62ff/go.mod h1:b0p+7cn+rWMIphK1gDH2hrDuwGOcbB6V4VXeSsEfHVk=
|
github.com/anacrolix/missinggo v0.0.0-20180522035225-b4a5853e62ff/go.mod h1:b0p+7cn+rWMIphK1gDH2hrDuwGOcbB6V4VXeSsEfHVk=
|
||||||
github.com/anacrolix/missinggo v0.0.0-20180725070939-60ef2fbf63df/go.mod h1:kwGiTUTZ0+p4vAz3VbAI5a30t2YbvemcmspjKwrAz5s=
|
github.com/anacrolix/missinggo v0.0.0-20180725070939-60ef2fbf63df/go.mod h1:kwGiTUTZ0+p4vAz3VbAI5a30t2YbvemcmspjKwrAz5s=
|
||||||
github.com/anacrolix/missinggo v0.2.1-0.20190310234110-9fbdc9f242a8 h1:E2Xb2SBsVzHJ1tNMW9QcckYEQcyBKz1ee8qVjeVRWys=
|
github.com/anacrolix/missinggo v0.2.1-0.20190310234110-9fbdc9f242a8 h1:E2Xb2SBsVzHJ1tNMW9QcckYEQcyBKz1ee8qVjeVRWys=
|
||||||
|
|
50
torrent.go
50
torrent.go
|
@ -1308,8 +1308,10 @@ func (t *Torrent) onWebRtcConn(
|
||||||
t.cl.lock()
|
t.cl.lock()
|
||||||
defer t.cl.unlock()
|
defer t.cl.unlock()
|
||||||
err = t.cl.runHandshookConn(pc, t)
|
err = t.cl.runHandshookConn(pc, t)
|
||||||
|
if err != nil {
|
||||||
t.logger.WithDefaultLevel(log.Critical).Printf("error running handshook webrtc conn: %v", err)
|
t.logger.WithDefaultLevel(log.Critical).Printf("error running handshook webrtc conn: %v", err)
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func (t *Torrent) logRunHandshookConn(pc *PeerConn, logAll bool, level log.Level) {
|
func (t *Torrent) logRunHandshookConn(pc *PeerConn, logAll bool, level log.Level) {
|
||||||
err := t.cl.runHandshookConn(pc, t)
|
err := t.cl.runHandshookConn(pc, t)
|
||||||
|
@ -1322,6 +1324,26 @@ func (t *Torrent) runHandshookConnLoggingErr(pc *PeerConn) {
|
||||||
t.logRunHandshookConn(pc, false, log.Debug)
|
t.logRunHandshookConn(pc, false, log.Debug)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (t *Torrent) startWebsocketAnnouncer(u url.URL) torrentTrackerAnnouncer {
|
||||||
|
wtc, release := t.cl.websocketTrackers.Get(u.String())
|
||||||
|
go func() {
|
||||||
|
<-t.closed.LockedChan(t.cl.locker())
|
||||||
|
release()
|
||||||
|
}()
|
||||||
|
wst := websocketTracker{u, wtc}
|
||||||
|
go func() {
|
||||||
|
err := wtc.Announce(tracker.Started, t.infoHash)
|
||||||
|
if err != nil {
|
||||||
|
t.logger.WithDefaultLevel(log.Warning).Printf(
|
||||||
|
"error in initial announce to %q: %v",
|
||||||
|
u.String(), err,
|
||||||
|
)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
return wst
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
func (t *Torrent) startScrapingTracker(_url string) {
|
func (t *Torrent) startScrapingTracker(_url string) {
|
||||||
if _url == "" {
|
if _url == "" {
|
||||||
return
|
return
|
||||||
|
@ -1348,33 +1370,7 @@ func (t *Torrent) startScrapingTracker(_url string) {
|
||||||
sl := func() torrentTrackerAnnouncer {
|
sl := func() torrentTrackerAnnouncer {
|
||||||
switch u.Scheme {
|
switch u.Scheme {
|
||||||
case "ws", "wss":
|
case "ws", "wss":
|
||||||
wst := websocketTracker{
|
return t.startWebsocketAnnouncer(*u)
|
||||||
*u,
|
|
||||||
&webtorrent.TrackerClient{
|
|
||||||
Url: u.String(),
|
|
||||||
GetAnnounceRequest: func(event tracker.AnnounceEvent) tracker.AnnounceRequest {
|
|
||||||
t.cl.lock()
|
|
||||||
defer t.cl.unlock()
|
|
||||||
return t.announceRequest(event)
|
|
||||||
},
|
|
||||||
PeerId: t.cl.peerID,
|
|
||||||
InfoHash: t.infoHash,
|
|
||||||
OnConn: t.onWebRtcConn,
|
|
||||||
Logger: t.logger.WithText(func(m log.Msg) string {
|
|
||||||
return fmt.Sprintf("%q: %v", u.String(), m.Text())
|
|
||||||
}).WithDefaultLevel(log.Debug),
|
|
||||||
},
|
|
||||||
}
|
|
||||||
go func() {
|
|
||||||
err := wst.TrackerClient.Run()
|
|
||||||
if err != nil {
|
|
||||||
t.logger.WithDefaultLevel(log.Warning).Printf(
|
|
||||||
"error running websocket tracker announcer for %q: %v",
|
|
||||||
u.String(), err,
|
|
||||||
)
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
return wst
|
|
||||||
}
|
}
|
||||||
if u.Scheme == "udp4" && (t.cl.config.DisableIPv4Peers || t.cl.config.DisableIPv4) {
|
if u.Scheme == "udp4" && (t.cl.config.DisableIPv4Peers || t.cl.config.DisableIPv4) {
|
||||||
return nil
|
return nil
|
||||||
|
|
|
@ -18,36 +18,35 @@ import (
|
||||||
// Client represents the webtorrent client
|
// Client represents the webtorrent client
|
||||||
type TrackerClient struct {
|
type TrackerClient struct {
|
||||||
Url string
|
Url string
|
||||||
GetAnnounceRequest func(tracker.AnnounceEvent) tracker.AnnounceRequest
|
GetAnnounceRequest func(_ tracker.AnnounceEvent, infoHash [20]byte) tracker.AnnounceRequest
|
||||||
PeerId [20]byte
|
PeerId [20]byte
|
||||||
InfoHash [20]byte
|
|
||||||
OnConn onDataChannelOpen
|
OnConn onDataChannelOpen
|
||||||
Logger log.Logger
|
Logger log.Logger
|
||||||
|
|
||||||
lock sync.Mutex
|
mu sync.Mutex
|
||||||
|
cond sync.Cond
|
||||||
outboundOffers map[string]outboundOffer // OfferID to outboundOffer
|
outboundOffers map[string]outboundOffer // OfferID to outboundOffer
|
||||||
wsConn *websocket.Conn
|
wsConn *websocket.Conn
|
||||||
|
closed bool
|
||||||
}
|
}
|
||||||
|
|
||||||
func (me *TrackerClient) peerIdBinary() string {
|
func (me *TrackerClient) peerIdBinary() string {
|
||||||
return binaryToJsonString(me.PeerId[:])
|
return binaryToJsonString(me.PeerId[:])
|
||||||
}
|
}
|
||||||
|
|
||||||
func (me *TrackerClient) infoHashBinary() string {
|
|
||||||
return binaryToJsonString(me.InfoHash[:])
|
|
||||||
}
|
|
||||||
|
|
||||||
// outboundOffer represents an outstanding offer.
|
// outboundOffer represents an outstanding offer.
|
||||||
type outboundOffer struct {
|
type outboundOffer struct {
|
||||||
originalOffer webrtc.SessionDescription
|
originalOffer webrtc.SessionDescription
|
||||||
peerConnection wrappedPeerConnection
|
peerConnection wrappedPeerConnection
|
||||||
dataChannel *webrtc.DataChannel
|
dataChannel *webrtc.DataChannel
|
||||||
|
infoHash [20]byte
|
||||||
}
|
}
|
||||||
|
|
||||||
type DataChannelContext struct {
|
type DataChannelContext struct {
|
||||||
Local, Remote webrtc.SessionDescription
|
Local, Remote webrtc.SessionDescription
|
||||||
OfferId string
|
OfferId string
|
||||||
LocalOffered bool
|
LocalOffered bool
|
||||||
|
InfoHash [20]byte
|
||||||
}
|
}
|
||||||
|
|
||||||
type onDataChannelOpen func(_ datachannel.ReadWriteCloser, dcc DataChannelContext)
|
type onDataChannelOpen func(_ datachannel.ReadWriteCloser, dcc DataChannelContext)
|
||||||
|
@ -60,26 +59,41 @@ func (tc *TrackerClient) doWebsocket() error {
|
||||||
}
|
}
|
||||||
defer c.Close()
|
defer c.Close()
|
||||||
tc.Logger.WithDefaultLevel(log.Debug).Printf("dialed tracker %q", tc.Url)
|
tc.Logger.WithDefaultLevel(log.Debug).Printf("dialed tracker %q", tc.Url)
|
||||||
|
tc.mu.Lock()
|
||||||
tc.wsConn = c
|
tc.wsConn = c
|
||||||
go func() {
|
tc.cond.Broadcast()
|
||||||
err := tc.announce(tracker.Started)
|
tc.mu.Unlock()
|
||||||
if err != nil {
|
|
||||||
tc.Logger.WithDefaultLevel(log.Error).Printf("error in initial announce: %v", err)
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
err = tc.trackerReadLoop(tc.wsConn)
|
err = tc.trackerReadLoop(tc.wsConn)
|
||||||
tc.lock.Lock()
|
tc.mu.Lock()
|
||||||
tc.closeUnusedOffers()
|
tc.closeUnusedOffers()
|
||||||
tc.lock.Unlock()
|
c.Close()
|
||||||
|
tc.mu.Unlock()
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (tc *TrackerClient) Run() error {
|
func (tc *TrackerClient) Run() error {
|
||||||
for {
|
tc.cond.L = &tc.mu
|
||||||
|
tc.mu.Lock()
|
||||||
|
for !tc.closed {
|
||||||
|
tc.mu.Unlock()
|
||||||
err := tc.doWebsocket()
|
err := tc.doWebsocket()
|
||||||
tc.Logger.WithDefaultLevel(log.Warning).Printf("websocket instance ended: %v", err)
|
tc.Logger.WithDefaultLevel(log.Warning).Printf("websocket instance ended: %v", err)
|
||||||
time.Sleep(time.Minute)
|
time.Sleep(time.Minute)
|
||||||
|
tc.mu.Lock()
|
||||||
}
|
}
|
||||||
|
tc.mu.Unlock()
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (tc *TrackerClient) Close() error {
|
||||||
|
tc.mu.Lock()
|
||||||
|
tc.closed = true
|
||||||
|
if tc.wsConn != nil {
|
||||||
|
tc.wsConn.Close()
|
||||||
|
}
|
||||||
|
tc.mu.Unlock()
|
||||||
|
tc.cond.Broadcast()
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (tc *TrackerClient) closeUnusedOffers() {
|
func (tc *TrackerClient) closeUnusedOffers() {
|
||||||
|
@ -89,7 +103,7 @@ func (tc *TrackerClient) closeUnusedOffers() {
|
||||||
tc.outboundOffers = nil
|
tc.outboundOffers = nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (tc *TrackerClient) announce(event tracker.AnnounceEvent) error {
|
func (tc *TrackerClient) Announce(event tracker.AnnounceEvent, infoHash [20]byte) error {
|
||||||
metrics.Add("outbound announces", 1)
|
metrics.Add("outbound announces", 1)
|
||||||
var randOfferId [20]byte
|
var randOfferId [20]byte
|
||||||
_, err := rand.Read(randOfferId[:])
|
_, err := rand.Read(randOfferId[:])
|
||||||
|
@ -103,7 +117,7 @@ func (tc *TrackerClient) announce(event tracker.AnnounceEvent) error {
|
||||||
return fmt.Errorf("creating offer: %w", err)
|
return fmt.Errorf("creating offer: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
request := tc.GetAnnounceRequest(event)
|
request := tc.GetAnnounceRequest(event, infoHash)
|
||||||
|
|
||||||
req := AnnounceRequest{
|
req := AnnounceRequest{
|
||||||
Numwant: 1, // If higher we need to create equal amount of offers.
|
Numwant: 1, // If higher we need to create equal amount of offers.
|
||||||
|
@ -112,7 +126,7 @@ func (tc *TrackerClient) announce(event tracker.AnnounceEvent) error {
|
||||||
Left: request.Left,
|
Left: request.Left,
|
||||||
Event: request.Event.String(),
|
Event: request.Event.String(),
|
||||||
Action: "announce",
|
Action: "announce",
|
||||||
InfoHash: tc.infoHashBinary(),
|
InfoHash: binaryToJsonString(infoHash[:]),
|
||||||
PeerID: tc.peerIdBinary(),
|
PeerID: tc.peerIdBinary(),
|
||||||
Offers: []Offer{{
|
Offers: []Offer{{
|
||||||
OfferID: offerIDBinary,
|
OfferID: offerIDBinary,
|
||||||
|
@ -125,10 +139,9 @@ func (tc *TrackerClient) announce(event tracker.AnnounceEvent) error {
|
||||||
return fmt.Errorf("marshalling request: %w", err)
|
return fmt.Errorf("marshalling request: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
tc.lock.Lock()
|
tc.mu.Lock()
|
||||||
defer tc.lock.Unlock()
|
defer tc.mu.Unlock()
|
||||||
|
err = tc.writeMessage(data)
|
||||||
err = tc.wsConn.WriteMessage(websocket.TextMessage, data)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
pc.Close()
|
pc.Close()
|
||||||
return fmt.Errorf("write AnnounceRequest: %w", err)
|
return fmt.Errorf("write AnnounceRequest: %w", err)
|
||||||
|
@ -140,69 +153,106 @@ func (tc *TrackerClient) announce(event tracker.AnnounceEvent) error {
|
||||||
peerConnection: pc,
|
peerConnection: pc,
|
||||||
dataChannel: dc,
|
dataChannel: dc,
|
||||||
originalOffer: offer,
|
originalOffer: offer,
|
||||||
|
infoHash: infoHash,
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (tc *TrackerClient) writeMessage(data []byte) error {
|
||||||
|
for tc.wsConn == nil {
|
||||||
|
if tc.closed {
|
||||||
|
return fmt.Errorf("%T closed", tc)
|
||||||
|
}
|
||||||
|
tc.cond.Wait()
|
||||||
|
}
|
||||||
|
return tc.wsConn.WriteMessage(websocket.TextMessage, data)
|
||||||
|
}
|
||||||
|
|
||||||
func (tc *TrackerClient) trackerReadLoop(tracker *websocket.Conn) error {
|
func (tc *TrackerClient) trackerReadLoop(tracker *websocket.Conn) error {
|
||||||
for {
|
for {
|
||||||
_, message, err := tracker.ReadMessage()
|
_, message, err := tracker.ReadMessage()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("read message error: %w", err)
|
return fmt.Errorf("read message error: %w", err)
|
||||||
}
|
}
|
||||||
tc.Logger.WithDefaultLevel(log.Debug).Printf("received message from tracker: %q", message)
|
//tc.Logger.WithDefaultLevel(log.Debug).Printf("received message from tracker: %q", message)
|
||||||
|
|
||||||
var ar AnnounceResponse
|
var ar AnnounceResponse
|
||||||
if err := json.Unmarshal(message, &ar); err != nil {
|
if err := json.Unmarshal(message, &ar); err != nil {
|
||||||
tc.Logger.WithDefaultLevel(log.Warning).Printf("error unmarshalling announce response: %v", err)
|
tc.Logger.WithDefaultLevel(log.Warning).Printf("error unmarshalling announce response: %v", err)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
if ar.InfoHash != tc.infoHashBinary() {
|
|
||||||
tc.Logger.Printf("announce response for different hash: expected %q got %q", tc.infoHashBinary(), ar.InfoHash)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
switch {
|
switch {
|
||||||
case ar.Offer != nil:
|
case ar.Offer != nil:
|
||||||
answer, err := getAnswerForOffer(*ar.Offer, tc.OnConn, ar.OfferID)
|
ih, err := jsonStringToInfoHash(ar.InfoHash)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("write AnnounceResponse: %w", err)
|
tc.Logger.WithDefaultLevel(log.Warning).Printf("error decoding info_hash in offer: %v", err)
|
||||||
|
break
|
||||||
}
|
}
|
||||||
|
tc.handleOffer(*ar.Offer, ar.OfferID, ih, ar.PeerID)
|
||||||
req := AnnounceResponse{
|
|
||||||
Action: "announce",
|
|
||||||
InfoHash: tc.infoHashBinary(),
|
|
||||||
PeerID: tc.peerIdBinary(),
|
|
||||||
ToPeerID: ar.PeerID,
|
|
||||||
Answer: &answer,
|
|
||||||
OfferID: ar.OfferID,
|
|
||||||
}
|
|
||||||
data, err := json.Marshal(req)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("failed to marshal request: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
tc.lock.Lock()
|
|
||||||
err = tracker.WriteMessage(websocket.TextMessage, data)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("write AnnounceResponse: %w", err)
|
|
||||||
tc.lock.Unlock()
|
|
||||||
}
|
|
||||||
tc.lock.Unlock()
|
|
||||||
case ar.Answer != nil:
|
case ar.Answer != nil:
|
||||||
tc.handleAnswer(ar.OfferID, *ar.Answer)
|
tc.handleAnswer(ar.OfferID, *ar.Answer)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (tc *TrackerClient) handleOffer(
|
||||||
|
offer webrtc.SessionDescription,
|
||||||
|
offerId string,
|
||||||
|
infoHash [20]byte,
|
||||||
|
peerId string,
|
||||||
|
) error {
|
||||||
|
peerConnection, answer, err := newAnsweringPeerConnection(offer)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("write AnnounceResponse: %w", err)
|
||||||
|
}
|
||||||
|
response := AnnounceResponse{
|
||||||
|
Action: "announce",
|
||||||
|
InfoHash: binaryToJsonString(infoHash[:]),
|
||||||
|
PeerID: tc.peerIdBinary(),
|
||||||
|
ToPeerID: peerId,
|
||||||
|
Answer: &answer,
|
||||||
|
OfferID: offerId,
|
||||||
|
}
|
||||||
|
data, err := json.Marshal(response)
|
||||||
|
if err != nil {
|
||||||
|
peerConnection.Close()
|
||||||
|
return fmt.Errorf("marshalling response: %w", err)
|
||||||
|
}
|
||||||
|
tc.mu.Lock()
|
||||||
|
defer tc.mu.Unlock()
|
||||||
|
if err := tc.writeMessage(data); err != nil {
|
||||||
|
peerConnection.Close()
|
||||||
|
return fmt.Errorf("writing response: %w", err)
|
||||||
|
}
|
||||||
|
timer := time.AfterFunc(30*time.Second, func() {
|
||||||
|
metrics.Add("answering peer connections timed out", 1)
|
||||||
|
peerConnection.Close()
|
||||||
|
})
|
||||||
|
peerConnection.OnDataChannel(func(d *webrtc.DataChannel) {
|
||||||
|
setDataChannelOnOpen(d, peerConnection, func(dc datachannel.ReadWriteCloser) {
|
||||||
|
timer.Stop()
|
||||||
|
metrics.Add("answering peer connection conversions", 1)
|
||||||
|
tc.OnConn(dc, DataChannelContext{
|
||||||
|
Local: answer,
|
||||||
|
Remote: offer,
|
||||||
|
OfferId: offerId,
|
||||||
|
LocalOffered: false,
|
||||||
|
InfoHash: infoHash,
|
||||||
|
})
|
||||||
|
})
|
||||||
|
})
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
func (tc *TrackerClient) handleAnswer(offerId string, answer webrtc.SessionDescription) {
|
func (tc *TrackerClient) handleAnswer(offerId string, answer webrtc.SessionDescription) {
|
||||||
tc.lock.Lock()
|
tc.mu.Lock()
|
||||||
defer tc.lock.Unlock()
|
defer tc.mu.Unlock()
|
||||||
offer, ok := tc.outboundOffers[offerId]
|
offer, ok := tc.outboundOffers[offerId]
|
||||||
if !ok {
|
if !ok {
|
||||||
tc.Logger.WithDefaultLevel(log.Warning).Printf("could not find offer for id %q", offerId)
|
tc.Logger.WithDefaultLevel(log.Warning).Printf("could not find offer for id %q", offerId)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
tc.Logger.Printf("offer %q got answer %v", offerId, answer)
|
//tc.Logger.WithDefaultLevel(log.Debug).Printf("offer %q got answer %v", offerId, answer)
|
||||||
metrics.Add("outbound offers answered", 1)
|
metrics.Add("outbound offers answered", 1)
|
||||||
err := offer.setAnswer(answer, func(dc datachannel.ReadWriteCloser) {
|
err := offer.setAnswer(answer, func(dc datachannel.ReadWriteCloser) {
|
||||||
metrics.Add("outbound offers answered with datachannel", 1)
|
metrics.Add("outbound offers answered with datachannel", 1)
|
||||||
|
@ -211,6 +261,7 @@ func (tc *TrackerClient) handleAnswer(offerId string, answer webrtc.SessionDescr
|
||||||
Remote: answer,
|
Remote: answer,
|
||||||
OfferId: offerId,
|
OfferId: offerId,
|
||||||
LocalOffered: true,
|
LocalOffered: true,
|
||||||
|
InfoHash: offer.infoHash,
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -218,5 +269,5 @@ func (tc *TrackerClient) handleAnswer(offerId string, answer webrtc.SessionDescr
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
delete(tc.outboundOffers, offerId)
|
delete(tc.outboundOffers, offerId)
|
||||||
go tc.announce(tracker.None)
|
go tc.Announce(tracker.None, offer.infoHash)
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,6 +1,9 @@
|
||||||
package webtorrent
|
package webtorrent
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"fmt"
|
||||||
|
"math"
|
||||||
|
|
||||||
"github.com/pion/webrtc/v2"
|
"github.com/pion/webrtc/v2"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -43,3 +46,21 @@ func binaryToJsonString(b []byte) string {
|
||||||
}
|
}
|
||||||
return string(seq)
|
return string(seq)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func jsonStringToInfoHash(s string) (ih [20]byte, err error) {
|
||||||
|
defer func() {
|
||||||
|
r := recover()
|
||||||
|
if r == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
panic(fmt.Sprintf("%q", s))
|
||||||
|
}()
|
||||||
|
for i, c := range []rune(s) {
|
||||||
|
if c < 0 || c > math.MaxUint8 {
|
||||||
|
err = fmt.Errorf("bad infohash string: %v", s)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
ih[i] = byte(c)
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
|
@ -5,7 +5,6 @@ import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/anacrolix/missinggo/v2/pproffd"
|
"github.com/anacrolix/missinggo/v2/pproffd"
|
||||||
"github.com/pion/datachannel"
|
"github.com/pion/datachannel"
|
||||||
|
@ -78,9 +77,7 @@ func newOffer() (
|
||||||
|
|
||||||
func initAnsweringPeerConnection(
|
func initAnsweringPeerConnection(
|
||||||
peerConnection wrappedPeerConnection,
|
peerConnection wrappedPeerConnection,
|
||||||
offerId string,
|
|
||||||
offer webrtc.SessionDescription,
|
offer webrtc.SessionDescription,
|
||||||
onOpen onDataChannelOpen,
|
|
||||||
) (answer webrtc.SessionDescription, err error) {
|
) (answer webrtc.SessionDescription, err error) {
|
||||||
err = peerConnection.SetRemoteDescription(offer)
|
err = peerConnection.SetRemoteDescription(offer)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -94,35 +91,22 @@ func initAnsweringPeerConnection(
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
timer := time.AfterFunc(30*time.Second, func() {
|
|
||||||
metrics.Add("answering peer connections timed out", 1)
|
|
||||||
peerConnection.Close()
|
|
||||||
})
|
|
||||||
peerConnection.OnDataChannel(func(d *webrtc.DataChannel) {
|
|
||||||
setDataChannelOnOpen(d, peerConnection, func(dc datachannel.ReadWriteCloser) {
|
|
||||||
timer.Stop()
|
|
||||||
metrics.Add("answering peer connection conversions", 1)
|
|
||||||
onOpen(dc, DataChannelContext{answer, offer, offerId, false})
|
|
||||||
})
|
|
||||||
})
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// getAnswerForOffer creates a transport from a WebRTC offer and and returns a WebRTC answer to be
|
// newAnsweringPeerConnection creates a transport from a WebRTC offer and and returns a WebRTC answer to be
|
||||||
// announced.
|
// announced.
|
||||||
func getAnswerForOffer(
|
func newAnsweringPeerConnection(offer webrtc.SessionDescription) (
|
||||||
offer webrtc.SessionDescription, onOpen onDataChannelOpen, offerId string,
|
peerConn wrappedPeerConnection, answer webrtc.SessionDescription, err error,
|
||||||
) (
|
|
||||||
answer webrtc.SessionDescription, err error,
|
|
||||||
) {
|
) {
|
||||||
peerConnection, err := newPeerConnection()
|
peerConn, err = newPeerConnection()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
err = fmt.Errorf("failed to peer connection: %w", err)
|
err = fmt.Errorf("failed to create new connection: %w", err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
answer, err = initAnsweringPeerConnection(peerConnection, offerId, offer, onOpen)
|
answer, err = initAnsweringPeerConnection(peerConn, offer)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
peerConnection.Close()
|
peerConn.Close()
|
||||||
}
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
58
wstracker.go
58
wstracker.go
|
@ -3,8 +3,13 @@ package torrent
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"net/url"
|
"net/url"
|
||||||
|
"sync"
|
||||||
|
|
||||||
|
"github.com/anacrolix/log"
|
||||||
|
|
||||||
|
"github.com/anacrolix/torrent/tracker"
|
||||||
"github.com/anacrolix/torrent/webtorrent"
|
"github.com/anacrolix/torrent/webtorrent"
|
||||||
|
"github.com/pion/datachannel"
|
||||||
)
|
)
|
||||||
|
|
||||||
type websocketTracker struct {
|
type websocketTracker struct {
|
||||||
|
@ -19,3 +24,56 @@ func (me websocketTracker) statusLine() string {
|
||||||
func (me websocketTracker) URL() url.URL {
|
func (me websocketTracker) URL() url.URL {
|
||||||
return me.url
|
return me.url
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type refCountedWebtorrentTrackerClient struct {
|
||||||
|
webtorrent.TrackerClient
|
||||||
|
refCount int
|
||||||
|
}
|
||||||
|
|
||||||
|
type websocketTrackers struct {
|
||||||
|
PeerId [20]byte
|
||||||
|
Logger log.Logger
|
||||||
|
GetAnnounceRequest func(event tracker.AnnounceEvent, infoHash [20]byte) tracker.AnnounceRequest
|
||||||
|
OnConn func(datachannel.ReadWriteCloser, webtorrent.DataChannelContext)
|
||||||
|
mu sync.Mutex
|
||||||
|
clients map[string]*refCountedWebtorrentTrackerClient
|
||||||
|
}
|
||||||
|
|
||||||
|
func (me *websocketTrackers) Get(url string) (*webtorrent.TrackerClient, func()) {
|
||||||
|
me.mu.Lock()
|
||||||
|
defer me.mu.Unlock()
|
||||||
|
value, ok := me.clients[url]
|
||||||
|
if !ok {
|
||||||
|
value = &refCountedWebtorrentTrackerClient{
|
||||||
|
TrackerClient: webtorrent.TrackerClient{
|
||||||
|
Url: url,
|
||||||
|
GetAnnounceRequest: me.GetAnnounceRequest,
|
||||||
|
PeerId: me.PeerId,
|
||||||
|
OnConn: me.OnConn,
|
||||||
|
Logger: me.Logger.WithText(func(m log.Msg) string {
|
||||||
|
return fmt.Sprintf("tracker client for %q: %v", url, m)
|
||||||
|
}),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
go func() {
|
||||||
|
err := value.TrackerClient.Run()
|
||||||
|
if err != nil {
|
||||||
|
me.Logger.Printf("error running tracker client for %q: %v", url, err)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
if me.clients == nil {
|
||||||
|
me.clients = make(map[string]*refCountedWebtorrentTrackerClient)
|
||||||
|
}
|
||||||
|
me.clients[url] = value
|
||||||
|
}
|
||||||
|
value.refCount++
|
||||||
|
return &value.TrackerClient, func() {
|
||||||
|
me.mu.Lock()
|
||||||
|
defer me.mu.Unlock()
|
||||||
|
value.refCount--
|
||||||
|
if value.refCount == 0 {
|
||||||
|
value.TrackerClient.Close()
|
||||||
|
delete(me.clients, url)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in New Issue