containerd/events/exchange/exchange.go

252 lines
6.3 KiB
Go

/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package exchange
import (
"context"
"strings"
"time"
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/events"
"github.com/containerd/containerd/filters"
"github.com/containerd/containerd/identifiers"
"github.com/containerd/containerd/log"
"github.com/containerd/containerd/namespaces"
"github.com/containerd/typeurl"
goevents "github.com/docker/go-events"
"github.com/gogo/protobuf/types"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)
// Exchange broadcasts events
type Exchange struct {
broadcaster *goevents.Broadcaster
}
// NewExchange returns a new event Exchange
func NewExchange() *Exchange {
return &Exchange{
broadcaster: goevents.NewBroadcaster(),
}
}
var _ events.Publisher = &Exchange{}
var _ events.Forwarder = &Exchange{}
var _ events.Subscriber = &Exchange{}
// Forward accepts an envelope to be directly distributed on the exchange.
//
// This is useful when an event is forwarded on behalf of another namespace or
// when the event is propagated on behalf of another publisher.
func (e *Exchange) Forward(ctx context.Context, envelope *events.Envelope) (err error) {
if err := validateEnvelope(envelope); err != nil {
return err
}
defer func() {
logger := log.G(ctx).WithFields(logrus.Fields{
"topic": envelope.Topic,
"ns": envelope.Namespace,
"type": envelope.Event.TypeUrl,
})
if err != nil {
logger.WithError(err).Error("error forwarding event")
} else {
logger.Debug("event forwarded")
}
}()
return e.broadcaster.Write(envelope)
}
// Publish packages and sends an event. The caller will be considered the
// initial publisher of the event. This means the timestamp will be calculated
// at this point and this method may read from the calling context.
func (e *Exchange) Publish(ctx context.Context, topic string, event events.Event) (err error) {
var (
namespace string
encoded *types.Any
envelope events.Envelope
)
namespace, err = namespaces.NamespaceRequired(ctx)
if err != nil {
return errors.Wrapf(err, "failed publishing event")
}
if err := validateTopic(topic); err != nil {
return errors.Wrapf(err, "envelope topic %q", topic)
}
encoded, err = typeurl.MarshalAny(event)
if err != nil {
return err
}
envelope.Timestamp = time.Now().UTC()
envelope.Namespace = namespace
envelope.Topic = topic
envelope.Event = encoded
defer func() {
logger := log.G(ctx).WithFields(logrus.Fields{
"topic": envelope.Topic,
"ns": envelope.Namespace,
"type": envelope.Event.TypeUrl,
})
if err != nil {
logger.WithError(err).Error("error publishing event")
} else {
logger.Debug("event published")
}
}()
return e.broadcaster.Write(&envelope)
}
// Subscribe to events on the exchange. Events are sent through the returned
// channel ch. If an error is encountered, it will be sent on channel errs and
// errs will be closed. To end the subscription, cancel the provided context.
//
// Zero or more filters may be provided as strings. Only events that match
// *any* of the provided filters will be sent on the channel. The filters use
// the standard containerd filters package syntax.
func (e *Exchange) Subscribe(ctx context.Context, fs ...string) (ch <-chan *events.Envelope, errs <-chan error) {
var (
evch = make(chan *events.Envelope)
errq = make(chan error, 1)
channel = goevents.NewChannel(0)
queue = goevents.NewQueue(channel)
dst goevents.Sink = queue
)
closeAll := func() {
channel.Close()
queue.Close()
e.broadcaster.Remove(dst)
close(errq)
}
ch = evch
errs = errq
if len(fs) > 0 {
filter, err := filters.ParseAll(fs...)
if err != nil {
errq <- errors.Wrapf(err, "failed parsing subscription filters")
closeAll()
return
}
dst = goevents.NewFilter(queue, goevents.MatcherFunc(func(gev goevents.Event) bool {
return filter.Match(adapt(gev))
}))
}
e.broadcaster.Add(dst)
go func() {
defer closeAll()
var err error
loop:
for {
select {
case ev := <-channel.C:
env, ok := ev.(*events.Envelope)
if !ok {
// TODO(stevvooe): For the most part, we are well protected
// from this condition. Both Forward and Publish protect
// from this.
err = errors.Errorf("invalid envelope encountered %#v; please file a bug", ev)
break
}
select {
case evch <- env:
case <-ctx.Done():
break loop
}
case <-ctx.Done():
break loop
}
}
if err == nil {
if cerr := ctx.Err(); cerr != context.Canceled {
err = cerr
}
}
errq <- err
}()
return
}
func validateTopic(topic string) error {
if topic == "" {
return errors.Wrap(errdefs.ErrInvalidArgument, "must not be empty")
}
if topic[0] != '/' {
return errors.Wrapf(errdefs.ErrInvalidArgument, "must start with '/'")
}
if len(topic) == 1 {
return errors.Wrapf(errdefs.ErrInvalidArgument, "must have at least one component")
}
components := strings.Split(topic[1:], "/")
for _, component := range components {
if err := identifiers.Validate(component); err != nil {
return errors.Wrapf(err, "failed validation on component %q", component)
}
}
return nil
}
func validateEnvelope(envelope *events.Envelope) error {
if err := identifiers.Validate(envelope.Namespace); err != nil {
return errors.Wrapf(err, "event envelope has invalid namespace")
}
if err := validateTopic(envelope.Topic); err != nil {
return errors.Wrapf(err, "envelope topic %q", envelope.Topic)
}
if envelope.Timestamp.IsZero() {
return errors.Wrapf(errdefs.ErrInvalidArgument, "timestamp must be set on forwarded event")
}
return nil
}
func adapt(ev interface{}) filters.Adaptor {
if adaptor, ok := ev.(filters.Adaptor); ok {
return adaptor
}
return filters.AdapterFunc(func(fieldpath []string) (string, bool) {
return "", false
})
}