webtorrent: Create data channel earlier per webrtc examples

This commit is contained in:
Matt Joiner 2022-07-11 11:39:54 +10:00
parent 0a4c416881
commit 9694c66b36
No known key found for this signature in database
GPG Key ID: 6B990B8185E7F782
5 changed files with 150 additions and 118 deletions

3
otel.go Normal file
View File

@ -0,0 +1,3 @@
package torrent
const tracerName = "anacrolix.torrent"

View File

@ -1,22 +1,6 @@
package webtorrent package webtorrent
import (
"context"
"github.com/pion/webrtc/v3"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/trace"
)
const ( const (
tracerName = "anacrolix.torrent.webtorrent" tracerName = "anacrolix.torrent.webtorrent"
webrtcConnTypeKey = "webtorrent.webrtc.conn.type" webrtcConnTypeKey = "webtorrent.webrtc.conn.type"
) )
func dataChannelStarted(peerConnectionCtx context.Context, dc *webrtc.DataChannel) (dataChannelCtx context.Context, span trace.Span) {
trace.SpanFromContext(peerConnectionCtx).AddEvent("starting data channel")
dataChannelCtx, span = otel.Tracer(tracerName).Start(peerConnectionCtx, "DataChannel")
dc.OnClose(func() {
span.End()
})
return
}

View File

@ -5,7 +5,6 @@ import (
"crypto/rand" "crypto/rand"
"encoding/json" "encoding/json"
"fmt" "fmt"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/trace" "go.opentelemetry.io/otel/trace"
"sync" "sync"
"time" "time"
@ -57,18 +56,17 @@ type outboundOffer struct {
originalOffer webrtc.SessionDescription originalOffer webrtc.SessionDescription
peerConnection *wrappedPeerConnection peerConnection *wrappedPeerConnection
infoHash [20]byte infoHash [20]byte
dataChannel *webrtc.DataChannel
} }
type DataChannelContext struct { type DataChannelContext struct {
// Can these be obtained by just calling the relevant methods on peerConnection? OfferId string
Local, Remote webrtc.SessionDescription LocalOffered bool
OfferId string InfoHash [20]byte
LocalOffered bool
InfoHash [20]byte
// This is private as some methods might not be appropriate with data channel context. // This is private as some methods might not be appropriate with data channel context.
peerConnection *wrappedPeerConnection peerConnection *wrappedPeerConnection
span trace.Span Span trace.Span
ctx context.Context Context context.Context
} }
func (me *DataChannelContext) GetSelectedIceCandidatePair() (*webrtc.ICECandidatePair, error) { func (me *DataChannelContext) GetSelectedIceCandidatePair() (*webrtc.ICECandidatePair, error) {
@ -211,7 +209,7 @@ func (tc *TrackerClient) Announce(event tracker.AnnounceEvent, infoHash [20]byte
} }
offerIDBinary := binaryToJsonString(randOfferId[:]) offerIDBinary := binaryToJsonString(randOfferId[:])
pc, offer, err := newOffer(tc.Logger) pc, dc, offer, err := tc.newOffer(tc.Logger, offerIDBinary, infoHash)
if err != nil { if err != nil {
return fmt.Errorf("creating offer: %w", err) return fmt.Errorf("creating offer: %w", err)
} }
@ -257,6 +255,7 @@ func (tc *TrackerClient) Announce(event tracker.AnnounceEvent, infoHash [20]byte
peerConnection: pc, peerConnection: pc,
originalOffer: offer, originalOffer: offer,
infoHash: infoHash, infoHash: infoHash,
dataChannel: dc,
} }
return nil return nil
} }
@ -291,30 +290,43 @@ func (tc *TrackerClient) trackerReadLoop(tracker *websocket.Conn) error {
tc.Logger.WithDefaultLevel(log.Warning).Printf("error decoding info_hash in offer: %v", err) tc.Logger.WithDefaultLevel(log.Warning).Printf("error decoding info_hash in offer: %v", err)
break break
} }
tc.handleOffer(*ar.Offer, ar.OfferID, ih, ar.PeerID) err = tc.handleOffer(offerContext{
SessDesc: *ar.Offer,
Id: ar.OfferID,
InfoHash: ih,
}, ar.PeerID)
if err != nil {
tc.Logger.Levelf(log.Error, "handling offer for infohash %x: %v", ih, err)
}
case ar.Answer != nil: case ar.Answer != nil:
tc.handleAnswer(ar.OfferID, *ar.Answer) tc.handleAnswer(ar.OfferID, *ar.Answer)
default:
tc.Logger.Levelf(log.Warning, "unhandled announce response %q", message)
} }
} }
} }
type offerContext struct {
SessDesc webrtc.SessionDescription
Id string
InfoHash [20]byte
}
func (tc *TrackerClient) handleOffer( func (tc *TrackerClient) handleOffer(
offer webrtc.SessionDescription, offerContext offerContext,
offerId string,
infoHash [20]byte,
peerId string, peerId string,
) error { ) error {
peerConnection, answer, err := newAnsweringPeerConnection(tc.Logger, offer) peerConnection, answer, err := tc.newAnsweringPeerConnection(offerContext)
if err != nil { if err != nil {
return fmt.Errorf("write AnnounceResponse: %w", err) return fmt.Errorf("creating answering peer connection: %w", err)
} }
response := AnnounceResponse{ response := AnnounceResponse{
Action: "announce", Action: "announce",
InfoHash: binaryToJsonString(infoHash[:]), InfoHash: binaryToJsonString(offerContext.InfoHash[:]),
PeerID: tc.peerIdBinary(), PeerID: tc.peerIdBinary(),
ToPeerID: peerId, ToPeerID: peerId,
Answer: &answer, Answer: &answer,
OfferID: offerId, OfferID: offerContext.Id,
} }
data, err := json.Marshal(response) data, err := json.Marshal(response)
if err != nil { if err != nil {
@ -327,31 +339,6 @@ func (tc *TrackerClient) handleOffer(
peerConnection.Close() peerConnection.Close()
return fmt.Errorf("writing response: %w", err) return fmt.Errorf("writing response: %w", err)
} }
timer := time.AfterFunc(30*time.Second, func() {
peerConnection.span.SetStatus(codes.Error, "answer timeout")
metrics.Add("answering peer connections timed out", 1)
peerConnection.Close()
})
peerConnection.OnDataChannel(func(d *webrtc.DataChannel) {
ctx, span := dataChannelStarted(peerConnection.ctx, d)
setDataChannelOnOpen(ctx, d, peerConnection, func(dc datachannel.ReadWriteCloser) {
timer.Stop()
metrics.Add("answering peer connection conversions", 1)
tc.mu.Lock()
tc.stats.ConvertedInboundConns++
tc.mu.Unlock()
tc.OnConn(dc, DataChannelContext{
Local: answer,
Remote: offer,
OfferId: offerId,
LocalOffered: false,
InfoHash: infoHash,
peerConnection: peerConnection,
ctx: ctx,
span: span,
})
})
})
return nil return nil
} }
@ -365,44 +352,13 @@ func (tc *TrackerClient) handleAnswer(offerId string, answer webrtc.SessionDescr
} }
// tc.Logger.WithDefaultLevel(log.Debug).Printf("offer %q got answer %v", offerId, answer) // tc.Logger.WithDefaultLevel(log.Debug).Printf("offer %q got answer %v", offerId, answer)
metrics.Add("outbound offers answered", 1) metrics.Add("outbound offers answered", 1)
// Why do we create the data channel before setting the remote description? Are we trying to avoid the peer err := offer.peerConnection.SetRemoteDescription(answer)
// initiating?
dataChannel, err := offer.peerConnection.CreateDataChannel("webrtc-datachannel", nil)
if err != nil {
err = fmt.Errorf("creating data channel: %w", err)
tc.Logger.LevelPrint(log.Error, err)
offer.peerConnection.span.RecordError(err)
offer.peerConnection.Close()
goto deleteOffer
}
{
ctx, span := dataChannelStarted(offer.peerConnection.ctx, dataChannel)
setDataChannelOnOpen(ctx, dataChannel, offer.peerConnection, func(dc datachannel.ReadWriteCloser) {
metrics.Add("outbound offers answered with datachannel", 1)
tc.mu.Lock()
tc.stats.ConvertedOutboundConns++
tc.mu.Unlock()
tc.OnConn(dc, DataChannelContext{
Local: offer.originalOffer,
Remote: answer,
OfferId: offerId,
LocalOffered: true,
InfoHash: offer.infoHash,
peerConnection: offer.peerConnection,
ctx: ctx,
span: span,
})
})
}
err = offer.peerConnection.SetRemoteDescription(answer)
if err != nil { if err != nil {
err = fmt.Errorf("using outbound offer answer: %w", err) err = fmt.Errorf("using outbound offer answer: %w", err)
offer.peerConnection.span.RecordError(err) offer.peerConnection.span.RecordError(err)
dataChannel.Close() tc.Logger.LevelPrint(log.Error, err)
tc.Logger.WithDefaultLevel(log.Error).Print(err)
return return
} }
deleteOffer:
delete(tc.outboundOffers, offerId) delete(tc.outboundOffers, offerId)
go tc.Announce(tracker.None, offer.infoHash) go tc.Announce(tracker.None, offer.infoHash)
} }

View File

@ -4,17 +4,21 @@ import (
"context" "context"
"expvar" "expvar"
"fmt" "fmt"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/trace"
"io"
"sync"
"github.com/anacrolix/log" "github.com/anacrolix/log"
"github.com/anacrolix/missinggo/v2/pproffd" "github.com/anacrolix/missinggo/v2/pproffd"
"github.com/pion/datachannel" "github.com/pion/datachannel"
"github.com/pion/webrtc/v3" "github.com/pion/webrtc/v3"
"go.opentelemetry.io/otel" "go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/trace"
"io"
"sync"
"time"
)
const (
dataChannelLabel = "webrtc-datachannel"
) )
var ( var (
@ -82,11 +86,15 @@ func setAndGatherLocalDescription(peerConnection *wrappedPeerConnection, sdp web
return *peerConnection.LocalDescription(), nil return *peerConnection.LocalDescription(), nil
} }
// newOffer creates a transport and returns a WebRTC offer to be announced // newOffer creates a transport and returns a WebRTC offer to be announced. See
func newOffer( // https://github.com/pion/webrtc/blob/master/examples/data-channels/jsfiddle/main.go for what this is modelled on.
func (tc *TrackerClient) newOffer(
logger log.Logger, logger log.Logger,
offerId string,
infoHash [20]byte,
) ( ) (
peerConnection *wrappedPeerConnection, peerConnection *wrappedPeerConnection,
dataChannel *webrtc.DataChannel,
offer webrtc.SessionDescription, offer webrtc.SessionDescription,
err error, err error,
) { ) {
@ -97,6 +105,26 @@ func newOffer(
peerConnection.span.SetAttributes(attribute.String(webrtcConnTypeKey, "offer")) peerConnection.span.SetAttributes(attribute.String(webrtcConnTypeKey, "offer"))
dataChannel, err = peerConnection.CreateDataChannel(dataChannelLabel, nil)
if err != nil {
err = fmt.Errorf("creating data channel: %w", err)
peerConnection.Close()
}
initDataChannel(dataChannel, peerConnection, func(dc datachannel.ReadWriteCloser, dcCtx context.Context, dcSpan trace.Span) {
metrics.Add("outbound offers answered with datachannel", 1)
tc.mu.Lock()
tc.stats.ConvertedOutboundConns++
tc.mu.Unlock()
tc.OnConn(dc, DataChannelContext{
OfferId: offerId,
LocalOffered: true,
InfoHash: infoHash,
peerConnection: peerConnection,
Context: dcCtx,
Span: dcSpan,
})
})
offer, err = peerConnection.CreateOffer(nil) offer, err = peerConnection.CreateOffer(nil)
if err != nil { if err != nil {
peerConnection.Close() peerConnection.Close()
@ -110,38 +138,62 @@ func newOffer(
return return
} }
func initAnsweringPeerConnection( type onDetachedDataChannelFunc func(detached datachannel.ReadWriteCloser, ctx context.Context, span trace.Span)
peerConnection *wrappedPeerConnection,
offer webrtc.SessionDescription, func (tc *TrackerClient) initAnsweringPeerConnection(
peerConn *wrappedPeerConnection,
offerContext offerContext,
) (answer webrtc.SessionDescription, err error) { ) (answer webrtc.SessionDescription, err error) {
peerConnection.span.SetAttributes(attribute.String(webrtcConnTypeKey, "answer")) peerConn.span.SetAttributes(attribute.String(webrtcConnTypeKey, "answer"))
err = peerConnection.SetRemoteDescription(offer) timer := time.AfterFunc(30*time.Second, func() {
peerConn.span.SetStatus(codes.Error, "answer timeout")
metrics.Add("answering peer connections timed out", 1)
peerConn.Close()
})
peerConn.OnDataChannel(func(d *webrtc.DataChannel) {
initDataChannel(d, peerConn, func(detached datachannel.ReadWriteCloser, ctx context.Context, span trace.Span) {
timer.Stop()
metrics.Add("answering peer connection conversions", 1)
tc.mu.Lock()
tc.stats.ConvertedInboundConns++
tc.mu.Unlock()
tc.OnConn(detached, DataChannelContext{
OfferId: offerContext.Id,
LocalOffered: false,
InfoHash: offerContext.InfoHash,
peerConnection: peerConn,
Context: ctx,
Span: span,
})
})
})
err = peerConn.SetRemoteDescription(offerContext.SessDesc)
if err != nil { if err != nil {
return return
} }
answer, err = peerConnection.CreateAnswer(nil) answer, err = peerConn.CreateAnswer(nil)
if err != nil { if err != nil {
return return
} }
answer, err = setAndGatherLocalDescription(peerConnection, answer) answer, err = setAndGatherLocalDescription(peerConn, answer)
return return
} }
// newAnsweringPeerConnection creates a transport from a WebRTC offer and returns a WebRTC answer to be announced. // newAnsweringPeerConnection creates a transport from a WebRTC offer and returns a WebRTC answer to be announced.
func newAnsweringPeerConnection( func (tc *TrackerClient) newAnsweringPeerConnection(
logger log.Logger, offerContext offerContext,
offer webrtc.SessionDescription,
) ( ) (
peerConn *wrappedPeerConnection, answer webrtc.SessionDescription, err error, peerConn *wrappedPeerConnection, answer webrtc.SessionDescription, err error,
) { ) {
peerConn, err = newPeerConnection(logger) peerConn, err = newPeerConnection(tc.Logger)
if err != nil { if err != nil {
err = fmt.Errorf("failed to create new connection: %w", err) err = fmt.Errorf("failed to create new connection: %w", err)
return return
} }
answer, err = initAnsweringPeerConnection(peerConn, offer) answer, err = tc.initAnsweringPeerConnection(peerConn, offerContext)
if err != nil { if err != nil {
peerConn.span.RecordError(err) peerConn.span.RecordError(err)
peerConn.Close() peerConn.Close()
@ -162,22 +214,25 @@ func (me ioCloserFunc) Close() error {
return me() return me()
} }
func setDataChannelOnOpen( func initDataChannel(
ctx context.Context,
dc *webrtc.DataChannel, dc *webrtc.DataChannel,
pc *wrappedPeerConnection, pc *wrappedPeerConnection,
onOpen func(closer datachannel.ReadWriteCloser), onOpen onDetachedDataChannelFunc,
) { ) {
var span trace.Span
dc.OnClose(func() {
span.End()
})
dc.OnOpen(func() { dc.OnOpen(func() {
dataChannelSpan := trace.SpanFromContext(ctx) pc.span.AddEvent("data channel opened")
dataChannelSpan.AddEvent("opened") var ctx context.Context
ctx, span = otel.Tracer(tracerName).Start(pc.ctx, "DataChannel")
raw, err := dc.Detach() raw, err := dc.Detach()
if err != nil { if err != nil {
// This shouldn't happen if the API is configured correctly, and we call from OnOpen. // This shouldn't happen if the API is configured correctly, and we call from OnOpen.
panic(err) panic(err)
} }
//dc.OnClose() onOpen(hookDataChannelCloser(raw, pc, span), ctx, span)
onOpen(hookDataChannelCloser(raw, pc, dataChannelSpan))
}) })
} }

View File

@ -0,0 +1,34 @@
package webtorrent
import (
"github.com/anacrolix/log"
qt "github.com/frankban/quicktest"
"github.com/pion/webrtc/v3"
"testing"
)
func TestClosingPeerConnectionDoesNotCloseUnopenedDataChannel(t *testing.T) {
c := qt.New(t)
var tc TrackerClient
pc, dc, _, err := tc.newOffer(log.Default, "", [20]byte{})
c.Assert(err, qt.IsNil)
defer pc.Close()
defer dc.Close()
peerConnClosed := make(chan struct{})
pc.OnConnectionStateChange(func(state webrtc.PeerConnectionState) {
if state == webrtc.PeerConnectionStateClosed {
close(peerConnClosed)
}
})
dc.OnClose(func() {
// This should not be called because the DataChannel is never opened.
t.Fatal("DataChannel.OnClose handler called")
})
t.Logf("data channel ready state before close: %v", dc.ReadyState())
dc.OnError(func(err error) {
t.Logf("data channel error: %v", err)
})
pc.Close()
c.Check(dc.ReadyState(), qt.Equals, webrtc.DataChannelStateClosed)
<-peerConnClosed
}