Tidy up the webtorrent package, remove buffer

This commit is contained in:
Matt Joiner 2020-04-13 14:31:39 +10:00
parent 1b3cbc1287
commit e5fd9ede54
6 changed files with 70 additions and 131 deletions

View File

@ -1322,7 +1322,7 @@ func (t *Torrent) startScrapingTracker(_url string) {
return fmt.Sprintf("%q: %v", u.String(), m.Text())
}))}
go func() {
err := wst.Client.Run(t.announceRequest(tracker.Started), u.String())
err := wst.TrackerClient.Run(t.announceRequest(tracker.Started), u.String())
if err != nil {
t.logger.WithValues(log.Error).Printf("error running websocket tracker announcer: %v", err)
}

View File

@ -1,68 +0,0 @@
// Package buffer mirrors the Node.JS buffer type.
package buffer
import (
"crypto/rand"
"encoding/base64"
"encoding/hex"
"fmt"
)
// Buffer mirrors the Node.JS Buffer type.
type Buffer struct {
b []byte
}
// New creates a new buffer from b
func New(b []byte) *Buffer {
return &Buffer{b: b}
}
// From creates a new buffer from a string
func From(s string) *Buffer {
return &Buffer{b: []byte(s)}
}
// FromHex creates a new buffer from a hex string.
func FromHex(in string) (*Buffer, error) {
decoded, err := hex.DecodeString(in)
if err != nil {
return nil, fmt.Errorf("failed to decode hex: %v", err)
}
return &Buffer{b: decoded}, nil
}
// ToStringBase64 turns the buffer into a base64 string.
func (b *Buffer) ToStringBase64() string {
return base64.StdEncoding.EncodeToString(b.b)
}
// ToStringLatin1 turns the buffer into a string using
// Latin-1 supplement block and C0/C1 control codes.
func (b *Buffer) ToStringLatin1() string {
seq := []rune{}
for _, v := range b.b {
seq = append(seq, rune(v))
}
return string(seq)
}
// ToStringHex converts the buffer to a hex string
func (b *Buffer) ToStringHex() string {
return hex.EncodeToString(b.b)
}
// RandomBytes returns securely generated random bytes.
// It will return an error if the system's secure random
// number generator fails to function correctly, in which
// case the caller should not continue.
func RandomBytes(n int) (*Buffer, error) {
b := make([]byte, n)
_, err := rand.Read(b)
// Note that err == nil only if we read len(b) bytes.
if err != nil {
return nil, err
}
return New(b), nil
}

View File

@ -1,6 +1,7 @@
package webtorrent
import (
"crypto/rand"
"encoding/json"
"fmt"
"sync"
@ -8,14 +9,13 @@ import (
"github.com/anacrolix/log"
"github.com/anacrolix/torrent/tracker"
"github.com/anacrolix/torrent/webtorrent/buffer"
"github.com/gorilla/websocket"
"github.com/pion/datachannel"
"github.com/pion/webrtc/v2"
)
// Client represents the webtorrent client
type Client struct {
type TrackerClient struct {
lock sync.Mutex
peerIDBinary string
infoHashBinary string
@ -28,15 +28,7 @@ type Client struct {
// outboundOffer represents an outstanding offer.
type outboundOffer struct {
originalOffer webrtc.SessionDescription
transport *Transport
}
func binaryToJsonString(b []byte) string {
var seq []rune
for _, v := range b {
seq = append(seq, rune(v))
}
return string(seq)
transport *transport
}
type DataChannelContext struct {
@ -47,8 +39,8 @@ type DataChannelContext struct {
type onDataChannelOpen func(_ datachannel.ReadWriteCloser, dcc DataChannelContext)
func NewClient(peerId, infoHash [20]byte, onConn onDataChannelOpen, logger log.Logger) *Client {
return &Client{
func NewClient(peerId, infoHash [20]byte, onConn onDataChannelOpen, logger log.Logger) *TrackerClient {
return &TrackerClient{
outboundOffers: make(map[string]outboundOffer),
peerIDBinary: binaryToJsonString(peerId[:]),
infoHashBinary: binaryToJsonString(infoHash[:]),
@ -57,7 +49,7 @@ func NewClient(peerId, infoHash [20]byte, onConn onDataChannelOpen, logger log.L
}
}
func (c *Client) Run(ar tracker.AnnounceRequest, url string) error {
func (c *TrackerClient) Run(ar tracker.AnnounceRequest, url string) error {
t, _, err := websocket.DefaultDialer.Dial(url, nil)
if err != nil {
return fmt.Errorf("failed to dial tracker: %w", err)
@ -75,17 +67,18 @@ func (c *Client) Run(ar tracker.AnnounceRequest, url string) error {
return c.trackerReadLoop()
}
func (c *Client) announce(request tracker.AnnounceRequest) error {
transport, offer, err := NewTransport()
func (c *TrackerClient) announce(request tracker.AnnounceRequest) error {
transport, offer, err := newTransport()
if err != nil {
return fmt.Errorf("failed to create transport: %w", err)
}
randOfferID, err := buffer.RandomBytes(20)
var randOfferId [20]byte
_, err = rand.Read(randOfferId[:])
if err != nil {
return fmt.Errorf("failed to generate bytes: %w", err)
}
offerIDBinary := randOfferID.ToStringLatin1()
offerIDBinary := binaryToJsonString(randOfferId[:])
c.lock.Lock()
c.outboundOffers[offerIDBinary] = outboundOffer{
@ -124,7 +117,7 @@ func (c *Client) announce(request tracker.AnnounceRequest) error {
return nil
}
func (c *Client) trackerReadLoop() error {
func (c *TrackerClient) trackerReadLoop() error {
c.lock.Lock()
tracker := c.tracker
@ -147,7 +140,7 @@ func (c *Client) trackerReadLoop() error {
}
switch {
case ar.Offer != nil:
_, answer, err := NewTransportFromOffer(*ar.Offer, c.onConn, ar.OfferID)
_, answer, err := newTransportFromOffer(*ar.Offer, c.onConn, ar.OfferID)
if err != nil {
return fmt.Errorf("write AnnounceResponse: %w", err)
}
@ -195,33 +188,3 @@ func (c *Client) trackerReadLoop() error {
}
}
}
type AnnounceRequest struct {
Numwant int `json:"numwant"`
Uploaded int `json:"uploaded"`
Downloaded int `json:"downloaded"`
Left int64 `json:"left"`
Event string `json:"event"`
Action string `json:"action"`
InfoHash string `json:"info_hash"`
PeerID string `json:"peer_id"`
Offers []Offer `json:"offers"`
}
type Offer struct {
OfferID string `json:"offer_id"`
Offer webrtc.SessionDescription `json:"offer"`
}
type AnnounceResponse struct {
InfoHash string `json:"info_hash"`
Action string `json:"action"`
Interval *int `json:"interval,omitempty"`
Complete *int `json:"complete,omitempty"`
Incomplete *int `json:"incomplete,omitempty"`
PeerID string `json:"peer_id,omitempty"`
ToPeerID string `json:"to_peer_id,omitempty"`
Answer *webrtc.SessionDescription `json:"answer,omitempty"`
Offer *webrtc.SessionDescription `json:"offer,omitempty"`
OfferID string `json:"offer_id,omitempty"`
}

View File

@ -0,0 +1,45 @@
package webtorrent
import (
"github.com/pion/webrtc/v2"
)
type AnnounceRequest struct {
Numwant int `json:"numwant"`
Uploaded int `json:"uploaded"`
Downloaded int `json:"downloaded"`
Left int64 `json:"left"`
Event string `json:"event"`
Action string `json:"action"`
InfoHash string `json:"info_hash"`
PeerID string `json:"peer_id"`
Offers []Offer `json:"offers"`
}
type Offer struct {
OfferID string `json:"offer_id"`
Offer webrtc.SessionDescription `json:"offer"`
}
type AnnounceResponse struct {
InfoHash string `json:"info_hash"`
Action string `json:"action"`
Interval *int `json:"interval,omitempty"`
Complete *int `json:"complete,omitempty"`
Incomplete *int `json:"incomplete,omitempty"`
PeerID string `json:"peer_id,omitempty"`
ToPeerID string `json:"to_peer_id,omitempty"`
Answer *webrtc.SessionDescription `json:"answer,omitempty"`
Offer *webrtc.SessionDescription `json:"offer,omitempty"`
OfferID string `json:"offer_id,omitempty"`
}
// I wonder if this is a defacto standard way to decode bytes to JSON for webtorrent. I don't really
// care.
func binaryToJsonString(b []byte) string {
var seq []rune
for _, v := range b {
seq = append(seq, rune(v))
}
return string(seq)
}

View File

@ -12,8 +12,7 @@ import (
var (
api = func() *webrtc.API {
// Enable the detach API (since it's non-standard but more idiomatic)
// (This should be done once globally)
// Enable the detach API (since it's non-standard but more idiomatic).
s := webrtc.SettingEngine{}
s.DetachDataChannels()
return webrtc.NewAPI(webrtc.WithSettingEngine(s))
@ -28,15 +27,15 @@ func newPeerConnection() (*webrtc.PeerConnection, error) {
return api.NewPeerConnection(config)
}
type Transport struct {
type transport struct {
pc *webrtc.PeerConnection
dc *webrtc.DataChannel
lock sync.Mutex
}
// NewTransport creates a transport and returns a WebRTC offer to be announced
func NewTransport() (*Transport, webrtc.SessionDescription, error) {
// newTransport creates a transport and returns a WebRTC offer to be announced
func newTransport() (*transport, webrtc.SessionDescription, error) {
peerConnection, err := newPeerConnection()
if err != nil {
return nil, webrtc.SessionDescription{}, fmt.Errorf("failed to peer connection: %v\n", err)
@ -61,13 +60,13 @@ func NewTransport() (*Transport, webrtc.SessionDescription, error) {
return nil, webrtc.SessionDescription{}, fmt.Errorf("failed to set local description: %v\n", err)
}
t := &Transport{pc: peerConnection, dc: dataChannel}
t := &transport{pc: peerConnection, dc: dataChannel}
return t, offer, nil
}
// NewTransportFromOffer creates a transport from a WebRTC offer and and returns a WebRTC answer to
// newTransportFromOffer creates a transport from a WebRTC offer and and returns a WebRTC answer to
// be announced.
func NewTransportFromOffer(offer webrtc.SessionDescription, onOpen onDataChannelOpen, offerId string) (*Transport, webrtc.SessionDescription, error) {
func newTransportFromOffer(offer webrtc.SessionDescription, onOpen onDataChannelOpen, offerId string) (*transport, webrtc.SessionDescription, error) {
peerConnection, err := newPeerConnection()
if err != nil {
return nil, webrtc.SessionDescription{}, fmt.Errorf("failed to peer connection: %v", err)
@ -76,7 +75,7 @@ func NewTransportFromOffer(offer webrtc.SessionDescription, onOpen onDataChannel
fmt.Printf("ICE Connection State has changed: %s\n", connectionState.String())
})
t := &Transport{pc: peerConnection}
t := &transport{pc: peerConnection}
err = peerConnection.SetRemoteDescription(offer)
if err != nil {
@ -104,7 +103,7 @@ func NewTransportFromOffer(offer webrtc.SessionDescription, onOpen onDataChannel
}
// SetAnswer sets the WebRTC answer
func (t *Transport) SetAnswer(answer webrtc.SessionDescription, onOpen func(datachannel.ReadWriteCloser)) error {
func (t *transport) SetAnswer(answer webrtc.SessionDescription, onOpen func(datachannel.ReadWriteCloser)) error {
t.handleOpen(onOpen)
err := t.pc.SetRemoteDescription(answer)
@ -114,7 +113,7 @@ func (t *Transport) SetAnswer(answer webrtc.SessionDescription, onOpen func(data
return nil
}
func (t *Transport) handleOpen(onOpen func(datachannel.ReadWriteCloser)) {
func (t *transport) handleOpen(onOpen func(datachannel.ReadWriteCloser)) {
t.lock.Lock()
dc := t.dc
t.lock.Unlock()

View File

@ -9,7 +9,7 @@ import (
type websocketTracker struct {
url url.URL
*webtorrent.Client
*webtorrent.TrackerClient
}
func (me websocketTracker) statusLine() string {