Add tracing to webtorrent webrtc resources

This commit is contained in:
Matt Joiner 2022-07-12 16:05:19 +10:00
parent 43b2d3ee21
commit c17018d04e
No known key found for this signature in database
GPG Key ID: 6B990B8185E7F782
5 changed files with 144 additions and 57 deletions

6
go.mod
View File

@ -14,7 +14,7 @@ require (
github.com/anacrolix/fuse v0.2.0
github.com/anacrolix/generics v0.0.0-20220618083756-f99e35403a60
github.com/anacrolix/go-libutp v1.2.0
github.com/anacrolix/log v0.13.2-0.20220427063716-a4894bb521c6
github.com/anacrolix/log v0.13.2-0.20220711050817-613cb738ef30
github.com/anacrolix/missinggo v1.3.0
github.com/anacrolix/missinggo/perf v1.0.0
github.com/anacrolix/missinggo/v2 v2.7.0
@ -58,6 +58,8 @@ require (
github.com/beorn7/perks v1.0.1 // indirect
github.com/bits-and-blooms/bitset v1.2.2 // indirect
github.com/cespare/xxhash/v2 v2.1.2 // indirect
github.com/go-logr/logr v1.2.3 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/golang/protobuf v1.5.2 // indirect
github.com/google/uuid v1.3.0 // indirect
github.com/huandu/xstrings v1.3.2 // indirect
@ -86,6 +88,8 @@ require (
github.com/rogpeppe/go-internal v1.8.1 // indirect
github.com/rs/dnscache v0.0.0-20211102005908-e0241e321417 // indirect
github.com/ryszard/goskiplist v0.0.0-20150312221310-2dfbae5fcf46 // indirect
go.opentelemetry.io/otel v1.8.0 // indirect
go.opentelemetry.io/otel/trace v1.8.0 // indirect
golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d // indirect
golang.org/x/exp v0.0.0-20220613132600-b0d781184e0d // indirect
golang.org/x/net v0.0.0-20220630215102-69896b714898 // indirect

11
go.sum
View File

@ -93,6 +93,8 @@ github.com/anacrolix/log v0.10.0/go.mod h1:s5yBP/j046fm9odtUTbHOfDUq/zh1W8OkPpJt
github.com/anacrolix/log v0.10.1-0.20220123034749-3920702c17f8/go.mod h1:GmnE2c0nvz8pOIPUSC9Rawgefy1sDXqposC2wgtBZE4=
github.com/anacrolix/log v0.13.2-0.20220427063716-a4894bb521c6 h1:WH/Xcok0GpNID/NUV80CfTwUYXdbhR3pX/DXboxGhNI=
github.com/anacrolix/log v0.13.2-0.20220427063716-a4894bb521c6/go.mod h1:D4+CvN8SnruK6zIFS/xPoRJmtvtnxs+CSfDQ+BFxZ68=
github.com/anacrolix/log v0.13.2-0.20220711050817-613cb738ef30 h1:bAgFzUxN1K3U8KwOzqCOhiygOr5NqYO3kNlV9tvp2Rc=
github.com/anacrolix/log v0.13.2-0.20220711050817-613cb738ef30/go.mod h1:D4+CvN8SnruK6zIFS/xPoRJmtvtnxs+CSfDQ+BFxZ68=
github.com/anacrolix/lsan v0.0.0-20211126052245-807000409a62 h1:P04VG6Td13FHMgS5ZBcJX23NPC/fiC4cp9bXwYujdYM=
github.com/anacrolix/lsan v0.0.0-20211126052245-807000409a62/go.mod h1:66cFKPCO7Sl4vbFnAaSq7e4OXtdMhRSBagJGWgmpJbM=
github.com/anacrolix/missinggo v0.0.0-20180725070939-60ef2fbf63df/go.mod h1:kwGiTUTZ0+p4vAz3VbAI5a30t2YbvemcmspjKwrAz5s=
@ -202,6 +204,11 @@ github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9
github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk=
github.com/go-logfmt/logfmt v0.5.0/go.mod h1:wCYkCAKZfumFQihp8CzCvQ3paCTfi41vtzG1KdI/P7A=
github.com/go-logfmt/logfmt v0.5.1/go.mod h1:WYhtIu8zTZfxdn5+rREduYbwxfcBr/Vr6KEVveWlfTs=
github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
github.com/go-logr/logr v1.2.3 h1:2DntVwHkVopvECVRSlL5PSo9eG+cAkDCuckLubN+rq0=
github.com/go-logr/logr v1.2.3/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag=
github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE=
github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
github.com/go-task/slim-sprig v0.0.0-20210107165309-348f09dbbbc0/go.mod h1:fyg7847qk6SyHyPtNmDHnmrv/HOrqktSC+C9fM+CJOE=
github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ=
@ -474,6 +481,10 @@ go.opencensus.io v0.22.0/go.mod h1:+kGneAE2xo2IficOXnaByMWTGM9T73dGwxeWcUqIpI8=
go.opencensus.io v0.22.2/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw=
go.opencensus.io v0.22.3/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw=
go.opencensus.io v0.22.4/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw=
go.opentelemetry.io/otel v1.8.0 h1:zcvBFizPbpa1q7FehvFiHbQwGzmPILebO0tyqIR5Djg=
go.opentelemetry.io/otel v1.8.0/go.mod h1:2pkj+iMj0o03Y+cW6/m8Y4WkRdYN3AvCXCnzRMp9yvM=
go.opentelemetry.io/otel/trace v1.8.0 h1:cSy0DF9eGI5WIfNwZ1q2iUyGj00tGzP24dE1lOlHrfY=
go.opentelemetry.io/otel/trace v1.8.0/go.mod h1:0Bt3PXY8w+3pheS3hQUt+wow8b1ojPaTBoTCh2zIFI4=
golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=

21
webtorrent/otel.go Normal file
View File

@ -0,0 +1,21 @@
package webtorrent
import (
"context"
"github.com/pion/webrtc/v3"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/trace"
)
const (
tracerName = "anacrolix.torrent.webtorrent"
webrtcConnTypeKey = "webtorrent.webrtc.conn.type"
)
// dataChannelStarted begins a "DataChannel" span from peerConnectionCtx using
// the package tracer (tracerName) and registers an OnClose handler on dc that
// ends that span. It returns the context produced by starting the span
// together with the span itself.
func dataChannelStarted(peerConnectionCtx context.Context, dc *webrtc.DataChannel) (dataChannelCtx context.Context, span trace.Span) {
	tracer := otel.Tracer(tracerName)
	dataChannelCtx, span = tracer.Start(peerConnectionCtx, "DataChannel")
	// The span's lifetime is tied to the data channel's close callback.
	dc.OnClose(func() { span.End() })
	return dataChannelCtx, span
}

View File

@ -1,9 +1,12 @@
package webtorrent
import (
"context"
"crypto/rand"
"encoding/json"
"fmt"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/trace"
"sync"
"time"
@ -53,7 +56,6 @@ func (me *TrackerClient) peerIdBinary() string {
type outboundOffer struct {
originalOffer webrtc.SessionDescription
peerConnection *wrappedPeerConnection
dataChannel *webrtc.DataChannel
infoHash [20]byte
}
@ -65,6 +67,8 @@ type DataChannelContext struct {
InfoHash [20]byte
// This is private as some methods might not be appropriate with data channel context.
peerConnection *wrappedPeerConnection
span trace.Span
ctx context.Context
}
func (me *DataChannelContext) GetSelectedIceCandidatePair() (*webrtc.ICECandidatePair, error) {
@ -207,7 +211,7 @@ func (tc *TrackerClient) Announce(event tracker.AnnounceEvent, infoHash [20]byte
}
offerIDBinary := binaryToJsonString(randOfferId[:])
pc, dc, offer, err := newOffer()
pc, offer, err := newOffer(tc.Logger)
if err != nil {
return fmt.Errorf("creating offer: %w", err)
}
@ -251,7 +255,6 @@ func (tc *TrackerClient) Announce(event tracker.AnnounceEvent, infoHash [20]byte
}
tc.outboundOffers[offerIDBinary] = outboundOffer{
peerConnection: pc,
dataChannel: dc,
originalOffer: offer,
infoHash: infoHash,
}
@ -301,7 +304,7 @@ func (tc *TrackerClient) handleOffer(
infoHash [20]byte,
peerId string,
) error {
peerConnection, answer, err := newAnsweringPeerConnection(offer)
peerConnection, answer, err := newAnsweringPeerConnection(tc.Logger, offer)
if err != nil {
return fmt.Errorf("write AnnounceResponse: %w", err)
}
@ -325,11 +328,13 @@ func (tc *TrackerClient) handleOffer(
return fmt.Errorf("writing response: %w", err)
}
timer := time.AfterFunc(30*time.Second, func() {
peerConnection.span.SetStatus(codes.Error, "answer timeout")
metrics.Add("answering peer connections timed out", 1)
peerConnection.Close()
})
peerConnection.OnDataChannel(func(d *webrtc.DataChannel) {
setDataChannelOnOpen(d, peerConnection, func(dc datachannel.ReadWriteCloser) {
ctx, span := dataChannelStarted(peerConnection.ctx, d)
setDataChannelOnOpen(ctx, d, peerConnection, func(dc datachannel.ReadWriteCloser) {
timer.Stop()
metrics.Add("answering peer connection conversions", 1)
tc.mu.Lock()
@ -342,6 +347,8 @@ func (tc *TrackerClient) handleOffer(
LocalOffered: false,
InfoHash: infoHash,
peerConnection: peerConnection,
ctx: ctx,
span: span,
})
})
})
@ -358,24 +365,44 @@ func (tc *TrackerClient) handleAnswer(offerId string, answer webrtc.SessionDescr
}
// tc.Logger.WithDefaultLevel(log.Debug).Printf("offer %q got answer %v", offerId, answer)
metrics.Add("outbound offers answered", 1)
err := offer.setAnswer(answer, func(dc datachannel.ReadWriteCloser) {
metrics.Add("outbound offers answered with datachannel", 1)
tc.mu.Lock()
tc.stats.ConvertedOutboundConns++
tc.mu.Unlock()
tc.OnConn(dc, DataChannelContext{
Local: offer.originalOffer,
Remote: answer,
OfferId: offerId,
LocalOffered: true,
InfoHash: offer.infoHash,
peerConnection: offer.peerConnection,
})
})
// Why do we create the data channel before setting the remote description? Are we trying to avoid the peer
// initiating?
dataChannel, err := offer.peerConnection.CreateDataChannel("webrtc-datachannel", nil)
if err != nil {
tc.Logger.WithDefaultLevel(log.Warning).Printf("error using outbound offer answer: %v", err)
err = fmt.Errorf("creating data channel: %w", err)
tc.Logger.LevelPrint(log.Error, err)
offer.peerConnection.span.RecordError(err)
offer.peerConnection.Close()
goto deleteOffer
}
{
ctx, span := dataChannelStarted(offer.peerConnection.ctx, dataChannel)
setDataChannelOnOpen(ctx, dataChannel, offer.peerConnection, func(dc datachannel.ReadWriteCloser) {
metrics.Add("outbound offers answered with datachannel", 1)
tc.mu.Lock()
tc.stats.ConvertedOutboundConns++
tc.mu.Unlock()
tc.OnConn(dc, DataChannelContext{
Local: offer.originalOffer,
Remote: answer,
OfferId: offerId,
LocalOffered: true,
InfoHash: offer.infoHash,
peerConnection: offer.peerConnection,
ctx: ctx,
span: span,
})
})
}
err = offer.peerConnection.SetRemoteDescription(answer)
if err != nil {
err = fmt.Errorf("using outbound offer answer: %w", err)
offer.peerConnection.span.RecordError(err)
dataChannel.Close()
tc.Logger.WithDefaultLevel(log.Error).Print(err)
return
}
deleteOffer:
delete(tc.outboundOffers, offerId)
go tc.Announce(tracker.None, offer.infoHash)
}

View File

@ -1,14 +1,20 @@
package webtorrent
import (
"context"
"expvar"
"fmt"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/trace"
"io"
"sync"
"github.com/anacrolix/log"
"github.com/anacrolix/missinggo/v2/pproffd"
"github.com/pion/datachannel"
"github.com/pion/webrtc/v3"
"go.opentelemetry.io/otel"
)
var (
@ -26,58 +32,81 @@ type wrappedPeerConnection struct {
*webrtc.PeerConnection
closeMu sync.Mutex
pproffd.CloseWrapper
span trace.Span
ctx context.Context
}
func (me *wrappedPeerConnection) Close() error {
me.closeMu.Lock()
defer me.closeMu.Unlock()
return me.CloseWrapper.Close()
err := me.CloseWrapper.Close()
me.span.End()
return err
}
func newPeerConnection() (*wrappedPeerConnection, error) {
func newPeerConnection(logger log.Logger) (*wrappedPeerConnection, error) {
newPeerConnectionMu.Lock()
defer newPeerConnectionMu.Unlock()
ctx, span := otel.Tracer(tracerName).Start(context.Background(), "PeerConnection")
pc, err := api.NewPeerConnection(config)
if err != nil {
span.SetStatus(codes.Error, err.Error())
span.RecordError(err)
span.End()
return nil, err
}
return &wrappedPeerConnection{
wpc := &wrappedPeerConnection{
PeerConnection: pc,
CloseWrapper: pproffd.NewCloseWrapper(pc),
}, nil
ctx: ctx,
span: span,
}
// If the state change handler intends to call Close, it should call it on the wrapper.
wpc.OnConnectionStateChange(func(state webrtc.PeerConnectionState) {
logger.Levelf(log.Warning, "webrtc PeerConnection state changed to %v", state)
span.AddEvent("connection state changed", trace.WithAttributes(attribute.String("state", state.String())))
})
return wpc, nil
}
// setAndGatherLocalDescription applies sdp as the local description of
// peerConnection, blocks until the gathering-complete promise fires, and then
// returns the connection's current local description. Progress is recorded as
// events on the peer connection's span.
//
// If setting the local description fails, the zero SessionDescription is
// returned along with the wrapped error.
func setAndGatherLocalDescription(peerConnection *wrappedPeerConnection, sdp webrtc.SessionDescription) (_ webrtc.SessionDescription, err error) {
	// The promise is obtained before the local description is set, matching the
	// order the callers previously used inline.
	gathered := webrtc.GatheringCompletePromise(peerConnection.PeerConnection)
	peerConnection.span.AddEvent("setting local description")
	if err = peerConnection.SetLocalDescription(sdp); err != nil {
		return webrtc.SessionDescription{}, fmt.Errorf("setting local description: %w", err)
	}
	// Wait for gathering to finish so the returned description is read only
	// after that point.
	<-gathered
	peerConnection.span.AddEvent("gathering complete")
	return *peerConnection.LocalDescription(), nil
}
// newOffer creates a transport and returns a WebRTC offer to be announced
func newOffer() (
func newOffer(
logger log.Logger,
) (
peerConnection *wrappedPeerConnection,
dataChannel *webrtc.DataChannel,
offer webrtc.SessionDescription,
err error,
) {
peerConnection, err = newPeerConnection()
peerConnection, err = newPeerConnection(logger)
if err != nil {
return
}
dataChannel, err = peerConnection.CreateDataChannel("webrtc-datachannel", nil)
if err != nil {
peerConnection.Close()
return
}
peerConnection.span.SetAttributes(attribute.String(webrtcConnTypeKey, "offer"))
offer, err = peerConnection.CreateOffer(nil)
if err != nil {
peerConnection.Close()
return
}
gatherComplete := webrtc.GatheringCompletePromise(peerConnection.PeerConnection)
err = peerConnection.SetLocalDescription(offer)
offer, err = setAndGatherLocalDescription(peerConnection, offer)
if err != nil {
peerConnection.Close()
return
}
<-gatherComplete
offer = *peerConnection.LocalDescription()
return
}
@ -85,6 +114,8 @@ func initAnsweringPeerConnection(
peerConnection *wrappedPeerConnection,
offer webrtc.SessionDescription,
) (answer webrtc.SessionDescription, err error) {
peerConnection.span.SetAttributes(attribute.String(webrtcConnTypeKey, "answer"))
err = peerConnection.SetRemoteDescription(offer)
if err != nil {
return
@ -94,40 +125,30 @@ func initAnsweringPeerConnection(
return
}
gatherComplete := webrtc.GatheringCompletePromise(peerConnection.PeerConnection)
err = peerConnection.SetLocalDescription(answer)
if err != nil {
return
}
<-gatherComplete
answer = *peerConnection.LocalDescription()
answer, err = setAndGatherLocalDescription(peerConnection, answer)
return
}
// newAnsweringPeerConnection creates a transport from a WebRTC offer and and returns a WebRTC answer to be
// announced.
func newAnsweringPeerConnection(offer webrtc.SessionDescription) (
// newAnsweringPeerConnection creates a transport from a WebRTC offer and returns a WebRTC answer to be announced.
func newAnsweringPeerConnection(
logger log.Logger,
offer webrtc.SessionDescription,
) (
peerConn *wrappedPeerConnection, answer webrtc.SessionDescription, err error,
) {
peerConn, err = newPeerConnection()
peerConn, err = newPeerConnection(logger)
if err != nil {
err = fmt.Errorf("failed to create new connection: %w", err)
return
}
answer, err = initAnsweringPeerConnection(peerConn, offer)
if err != nil {
peerConn.span.RecordError(err)
peerConn.Close()
}
return
}
func (t *outboundOffer) setAnswer(answer webrtc.SessionDescription, onOpen func(datachannel.ReadWriteCloser)) error {
setDataChannelOnOpen(t.dataChannel, t.peerConnection, onOpen)
err := t.peerConnection.SetRemoteDescription(answer)
return err
}
type datachannelReadWriter interface {
datachannel.Reader
datachannel.Writer
@ -142,16 +163,19 @@ func (me ioCloserFunc) Close() error {
}
// setDataChannelOnOpen installs an OnOpen handler on dc. When the channel
// opens, the handler adds an "opened" event to the span carried by ctx,
// detaches the data channel, wraps the detached channel with
// hookDataChannelCloser (passing pc), and hands the result to onOpen.
//
// The handler panics if Detach returns an error.
func setDataChannelOnOpen(
	ctx context.Context,
	dc *webrtc.DataChannel,
	pc *wrappedPeerConnection,
	onOpen func(closer datachannel.ReadWriteCloser),
) {
	handleOpen := func() {
		trace.SpanFromContext(ctx).AddEvent("opened")
		detached, err := dc.Detach()
		if err != nil {
			// This shouldn't happen if the API is configured correctly, and we call from OnOpen.
			panic(err)
		}
		// NOTE: no OnClose handler is registered by this function.
		onOpen(hookDataChannelCloser(detached, pc))
	}
	dc.OnOpen(handleOpen)
}