diff --git a/cmd/balance-worker/main.go b/cmd/balance-worker/main.go
index 0d85b072b..515df5c14 100644
--- a/cmd/balance-worker/main.go
+++ b/cmd/balance-worker/main.go
@@ -300,9 +300,10 @@ func main() {
 		IngestEventsTopic: conf.Events.IngestEvents.Topic,
 
 		Router: router.Options{
-			Subscriber: wmSubscriber,
-			Publisher:  eventPublisherDriver,
-			Logger:     logger,
+			Subscriber:  wmSubscriber,
+			Publisher:   eventPublisherDriver,
+			Logger:      logger,
+			MetricMeter: metricMeter,
 
 			Config: conf.BalanceWorker.ConsumerConfiguration,
 		},
diff --git a/cmd/notification-service/main.go b/cmd/notification-service/main.go
index 823e1f56f..5b30544f3 100644
--- a/cmd/notification-service/main.go
+++ b/cmd/notification-service/main.go
@@ -326,9 +326,10 @@ func main() {
 	consumerOptions := consumer.Options{
 		SystemEventsTopic: conf.Events.SystemEvents.Topic,
 		Router: router.Options{
-			Subscriber: wmSubscriber,
-			Publisher:  eventPublisherDriver,
-			Logger:     logger,
+			Subscriber:  wmSubscriber,
+			Publisher:   eventPublisherDriver,
+			Logger:      logger,
+			MetricMeter: metricMeter,
 
 			Config: conf.Notification.Consumer,
 		},
diff --git a/internal/entitlement/balanceworker/entitlementhandler.go b/internal/entitlement/balanceworker/entitlementhandler.go
index ead8babb1..e19ba8932 100644
--- a/internal/entitlement/balanceworker/entitlementhandler.go
+++ b/internal/entitlement/balanceworker/entitlementhandler.go
@@ -15,53 +15,10 @@ import (
 	"github.com/openmeterio/openmeter/pkg/convert"
 )
 
-func (w *Worker) handleEntitlementDeleteEvent(ctx context.Context, delEvent entitlement.EntitlementDeletedEvent) (marshaler.Event, error) {
-	namespace := delEvent.Namespace.ID
-
-	feature, err := w.entitlement.Feature.GetFeature(ctx, namespace, delEvent.FeatureID, productcatalog.IncludeArchivedFeatureTrue)
-	if err != nil {
-		return nil, fmt.Errorf("failed to get feature: %w", err)
-	}
-
-	subject := models.Subject{
-		Key: delEvent.SubjectKey,
-	}
-
-	if w.opts.SubjectResolver != nil {
-		subject, err = w.opts.SubjectResolver.GetSubjectByKey(ctx, namespace, delEvent.SubjectKey)
-		if err != nil {
-			return nil, fmt.Errorf("failed to get subject: %w", err)
-		}
-	}
-
-	calculationTime := time.Now()
-
-	event := marshaler.WithSource(
-		metadata.ComposeResourcePath(namespace, metadata.EntityEntitlement, delEvent.ID),
-		snapshot.SnapshotEvent{
-			Entitlement: delEvent.Entitlement,
-			Namespace: models.NamespaceID{
-				ID: namespace,
-			},
-			Subject:   subject,
-			Feature:   *feature,
-			Operation: snapshot.ValueOperationDelete,
-
-			CalculatedAt: convert.ToPointer(calculationTime),
-
-			CurrentUsagePeriod: delEvent.CurrentUsagePeriod,
-		},
-	)
-
-	_ = w.highWatermarkCache.Add(delEvent.ID, highWatermarkCacheEntry{
-		HighWatermark: calculationTime.Add(-defaultClockDrift),
-		IsDeleted:     true,
-	})
-
-	return event, nil
-}
-
-func (w *Worker) handleEntitlementUpdateEvent(ctx context.Context, entitlementID NamespacedID, source string) (marshaler.Event, error) {
+// handleEntitlementEvent looks up the entitlement and builds the snapshot event to publish for it:
+// a delete snapshot if the entitlement has DeletedAt set, an update snapshot otherwise. The
+// high watermark cache entry of the entitlement is refreshed after a successful calculation.
+func (w *Worker) handleEntitlementEvent(ctx context.Context, entitlementID NamespacedID, source string) (marshaler.Event, error) {
 	calculatedAt := time.Now()
 
 	if entry, ok := w.highWatermarkCache.Get(entitlementID.ID); ok {
@@ -70,19 +27,6 @@ func (w *Worker) handleEntitlementUpdateEvent(ctx context.Context, entitlementID
 		}
 	}
 
-	snapshot, err := w.createSnapshotEvent(ctx, entitlementID, source, calculatedAt)
-	if err != nil {
-		return nil, fmt.Errorf("failed to create entitlement update snapshot event: %w", err)
-	}
-
-	_ = w.highWatermarkCache.Add(entitlementID.ID, highWatermarkCacheEntry{
-		HighWatermark: calculatedAt.Add(-defaultClockDrift),
-	})
-
-	return snapshot, nil
-}
-
-func (w *Worker) createSnapshotEvent(ctx context.Context, entitlementID NamespacedID, source string, calculatedAt time.Time) (marshaler.Event, error) {
 	entitlements, err := w.entitlement.Entitlement.ListEntitlements(ctx, entitlement.ListEntitlementsParams{
 		Namespaces: []string{entitlementID.Namespace},
 		IDs:        []string{entitlementID.ID},
@@ -101,21 +45,51 @@ func (w *Worker) createSnapshotEvent(ctx context.Context, entitlementID Namespac
 	}
 
 	entitlementEntity := &entitlements.Items[0]
+
 	if entitlementEntity.DeletedAt != nil {
 		// entitlement got deleted while processing changes => let's create a delete event so that we are not working
 		// on entitlement updates that are not relevant anymore
-		return w.handleEntitlementDeleteEvent(ctx, entitlement.EntitlementDeletedEvent{
-			Entitlement: *entitlementEntity,
-			Namespace:   models.NamespaceID{ID: entitlementID.Namespace},
+
+		snapshot, err := w.createDeletedSnapshotEvent(ctx,
+			entitlement.EntitlementDeletedEvent{
+				Entitlement: *entitlementEntity,
+				Namespace: models.NamespaceID{
+					ID: entitlementEntity.Namespace,
+				},
+			}, calculatedAt)
+		if err != nil {
+			return nil, fmt.Errorf("failed to create entitlement delete snapshot event: %w", err)
+		}
+
+		_ = w.highWatermarkCache.Add(entitlementID.ID, highWatermarkCacheEntry{
+			HighWatermark: calculatedAt.Add(-defaultClockDrift),
+			IsDeleted:     true,
 		})
+
+		return snapshot, nil
 	}
 
-	feature, err := w.entitlement.Feature.GetFeature(ctx, entitlementID.Namespace, entitlementEntity.FeatureID, productcatalog.IncludeArchivedFeatureTrue)
+	snapshot, err := w.createSnapshotEvent(ctx, entitlementEntity, source, calculatedAt)
+	if err != nil {
+		return nil, fmt.Errorf("failed to create entitlement update snapshot event: %w", err)
+	}
+
+	_ = w.highWatermarkCache.Add(entitlementID.ID, highWatermarkCacheEntry{
+		HighWatermark: calculatedAt.Add(-defaultClockDrift),
+	})
+
+	return snapshot, nil
+}
+
+// createSnapshotEvent builds the update snapshot event for entitlementEntity, resolving its
+// feature, its value at calculatedAt and (when a SubjectResolver is configured) its subject.
+func (w *Worker) createSnapshotEvent(ctx context.Context, entitlementEntity *entitlement.Entitlement, source string, calculatedAt time.Time) (marshaler.Event, error) {
+	feature, err := w.entitlement.Feature.GetFeature(ctx, entitlementEntity.Namespace, entitlementEntity.FeatureID, productcatalog.IncludeArchivedFeatureTrue)
 	if err != nil {
 		return nil, fmt.Errorf("failed to get feature: %w", err)
 	}
 
-	value, err := w.entitlement.Entitlement.GetEntitlementValue(ctx, entitlementID.Namespace, entitlementEntity.SubjectKey, entitlementEntity.ID, calculatedAt)
+	value, err := w.entitlement.Entitlement.GetEntitlementValue(ctx, entitlementEntity.Namespace, entitlementEntity.SubjectKey, entitlementEntity.ID, calculatedAt)
 	if err != nil {
 		return nil, fmt.Errorf("failed to get entitlement value: %w", err)
 	}
@@ -129,7 +103,7 @@ func (w *Worker) createSnapshotEvent(ctx context.Context, entitlementID Namespac
 		Key: entitlementEntity.SubjectKey,
 	}
 	if w.opts.SubjectResolver != nil {
-		subject, err = w.opts.SubjectResolver.GetSubjectByKey(ctx, entitlementID.Namespace, entitlementEntity.SubjectKey)
+		subject, err = w.opts.SubjectResolver.GetSubjectByKey(ctx, entitlementEntity.Namespace, entitlementEntity.SubjectKey)
 		if err != nil {
 			return nil, fmt.Errorf("failed to get subject ID: %w", err)
 		}
@@ -140,7 +114,7 @@ func (w *Worker) createSnapshotEvent(ctx context.Context, entitlementID Namespac
 		snapshot.SnapshotEvent{
 			Entitlement: *entitlementEntity,
 			Namespace: models.NamespaceID{
-				ID: entitlementID.Namespace,
+				ID: entitlementEntity.Namespace,
 			},
 			Subject:   subject,
 			Feature:   *feature,
@@ -155,3 +129,44 @@ func (w *Worker) createSnapshotEvent(ctx context.Context, entitlementID Namespac
 
 	return event, nil
 }
+
+// createDeletedSnapshotEvent builds the snapshot event with the delete operation for the
+// entitlement carried by delEvent, using calculationTime as the calculation timestamp.
+func (w *Worker) createDeletedSnapshotEvent(ctx context.Context, delEvent entitlement.EntitlementDeletedEvent, calculationTime time.Time) (marshaler.Event, error) {
+	namespace := delEvent.Namespace.ID
+
+	feature, err := w.entitlement.Feature.GetFeature(ctx, namespace, delEvent.FeatureID, productcatalog.IncludeArchivedFeatureTrue)
+	if err != nil {
+		return nil, fmt.Errorf("failed to get feature: %w", err)
+	}
+
+	subject := models.Subject{
+		Key: delEvent.SubjectKey,
+	}
+
+	if w.opts.SubjectResolver != nil {
+		subject, err = w.opts.SubjectResolver.GetSubjectByKey(ctx, namespace, delEvent.SubjectKey)
+		if err != nil {
+			return nil, fmt.Errorf("failed to get subject: %w", err)
+		}
+	}
+
+	event := marshaler.WithSource(
+		metadata.ComposeResourcePath(namespace, metadata.EntityEntitlement, delEvent.ID),
+		snapshot.SnapshotEvent{
+			Entitlement: delEvent.Entitlement,
+			Namespace: models.NamespaceID{
+				ID: namespace,
+			},
+			Subject:   subject,
+			Feature:   *feature,
+			Operation: snapshot.ValueOperationDelete,
+
+			CalculatedAt: convert.ToPointer(calculationTime),
+
+			CurrentUsagePeriod: delEvent.CurrentUsagePeriod,
+		},
+	)
+
+	return event, nil
+}
diff --git a/internal/entitlement/balanceworker/ingesthandler.go b/internal/entitlement/balanceworker/ingesthandler.go
index dd8285f38..b8fa58687 100644
--- a/internal/entitlement/balanceworker/ingesthandler.go
+++ b/internal/entitlement/balanceworker/ingesthandler.go
@@ -3,6 +3,7 @@ package balanceworker
 import (
 	"context"
 	"errors"
+	"fmt"
 
 	"github.com/openmeterio/openmeter/internal/entitlement"
 	"github.com/openmeterio/openmeter/internal/event/metadata"
@@ -27,7 +28,7 @@ func (w *Worker) handleBatchedIngestEvent(ctx context.Context, event ingestevent
 	var handlingError error
 
 	for _, entitlement := range affectedEntitlements {
-		event, err := w.handleEntitlementUpdateEvent(
+		event, err := w.handleEntitlementEvent(
 			ctx,
 			NamespacedID{Namespace: entitlement.Namespace, ID: entitlement.EntitlementID},
 			metadata.ComposeResourcePath(entitlement.Namespace, metadata.EntityEvent),
@@ -39,10 +40,14 @@ func (w *Worker) handleBatchedIngestEvent(ctx context.Context, event ingestevent
 		}
 
 		if err := w.opts.EventBus.Publish(ctx, event); err != nil {
-			handlingError = errors.Join(handlingError, err)
+			handlingError = errors.Join(handlingError, fmt.Errorf("handling entitlement event for %s: %w", entitlement.EntitlementID, err))
 		}
 	}
 
+	if handlingError != nil {
+		w.opts.Logger.Error("error handling batched ingest event", "error", handlingError)
+	}
+
 	return handlingError
 }
 
diff --git a/internal/entitlement/balanceworker/worker.go b/internal/entitlement/balanceworker/worker.go
index c767634d7..19a0801d5 100644
--- a/internal/entitlement/balanceworker/worker.go
+++ b/internal/entitlement/balanceworker/worker.go
@@ -113,7 +113,7 @@ func (w *Worker) eventHandler() message.NoPublishHandlerFunc {
 		grouphandler.NewGroupEventHandler(func(ctx context.Context, event *entitlement.EntitlementCreatedEvent) error {
 			return w.opts.EventBus.
 				WithContext(ctx).
-				PublishIfNoError(w.handleEntitlementUpdateEvent(
+				PublishIfNoError(w.handleEntitlementEvent(
 					ctx,
 					NamespacedID{Namespace: event.Namespace.ID, ID: event.ID},
 					metadata.ComposeResourcePath(event.Namespace.ID, metadata.EntityEntitlement, event.ID),
@@ -124,14 +124,17 @@ func (w *Worker) eventHandler() message.NoPublishHandlerFunc {
 		grouphandler.NewGroupEventHandler(func(ctx context.Context, event *entitlement.EntitlementDeletedEvent) error {
 			return w.opts.EventBus.
 				WithContext(ctx).
-				PublishIfNoError(w.handleEntitlementDeleteEvent(ctx, *event))
+				PublishIfNoError(w.handleEntitlementEvent(ctx,
+					NamespacedID{Namespace: event.Namespace.ID, ID: event.ID},
+					metadata.ComposeResourcePath(event.Namespace.ID, metadata.EntityEntitlement, event.ID),
+				))
 		}),
 
 		// Grant created event
 		grouphandler.NewGroupEventHandler(func(ctx context.Context, event *grant.CreatedEvent) error {
 			return w.opts.EventBus.
 				WithContext(ctx).
-				PublishIfNoError(w.handleEntitlementUpdateEvent(
+				PublishIfNoError(w.handleEntitlementEvent(
 					ctx,
 					NamespacedID{Namespace: event.Namespace.ID, ID: string(event.OwnerID)},
 					metadata.ComposeResourcePath(event.Namespace.ID, metadata.EntityEntitlement, string(event.OwnerID), metadata.EntityGrant, event.ID),
@@ -142,7 +145,7 @@ func (w *Worker) eventHandler() message.NoPublishHandlerFunc {
 		grouphandler.NewGroupEventHandler(func(ctx context.Context, event *grant.VoidedEvent) error {
 			return w.opts.EventBus.
 				WithContext(ctx).
-				PublishIfNoError(w.handleEntitlementUpdateEvent(
+				PublishIfNoError(w.handleEntitlementEvent(
 					ctx,
 					NamespacedID{Namespace: event.Namespace.ID, ID: string(event.OwnerID)},
 					metadata.ComposeResourcePath(event.Namespace.ID, metadata.EntityEntitlement, string(event.OwnerID), metadata.EntityGrant, event.ID),
@@ -153,7 +156,7 @@ func (w *Worker) eventHandler() message.NoPublishHandlerFunc {
 		grouphandler.NewGroupEventHandler(func(ctx context.Context, event *meteredentitlement.EntitlementResetEvent) error {
 			return w.opts.EventBus.
 				WithContext(ctx).
-				PublishIfNoError(w.handleEntitlementUpdateEvent(
+				PublishIfNoError(w.handleEntitlementEvent(
 					ctx,
 					NamespacedID{Namespace: event.Namespace.ID, ID: event.EntitlementID},
 					metadata.ComposeResourcePath(event.Namespace.ID, metadata.EntityEntitlement, event.EntitlementID),
diff --git a/internal/watermill/eventbus/eventbus.go b/internal/watermill/eventbus/eventbus.go
index 6f278aff8..08fbb97b1 100644
--- a/internal/watermill/eventbus/eventbus.go
+++ b/internal/watermill/eventbus/eventbus.go
@@ -49,6 +49,11 @@ type publisher struct {
 }
 
 func (p publisher) Publish(ctx context.Context, event marshaler.Event) error {
+	if event == nil {
+		// nil events are always ignored as the handler signifies that it doesn't want to publish anything
+		return nil
+	}
+
 	return p.eventBus.Publish(ctx, event)
 }
 
diff --git a/internal/watermill/router/metrics.go b/internal/watermill/router/metrics.go
new file mode 100644
index 000000000..0ba43f804
--- /dev/null
+++ b/internal/watermill/router/metrics.go
@@ -0,0 +1,67 @@
+package router
+
+import (
+	"fmt"
+	"log/slog"
+	"time"
+
+	"github.com/ThreeDotsLabs/watermill/message"
+	"go.opentelemetry.io/otel/metric"
+)
+
+const (
+	messageProcessingTimeMetricName = "message_processing_time"
+	messageProcessedCount           = "message_processed_count"
+	messageProcessingErrorCount     = "message_processing_error_count"
+)
+
+// Metrics returns a router middleware that records, for every handled message, the
+// processing time and either a processed or an error count. Instrument names are
+// prefixed with prefix; handler errors are logged using log before being returned.
+func Metrics(metricMeter metric.Meter, prefix string, log *slog.Logger) (func(message.HandlerFunc) message.HandlerFunc, error) {
+	messageProcessingTime, err := metricMeter.Float64Histogram(
+		fmt.Sprintf("%s.%s", prefix, messageProcessingTimeMetricName),
+		metric.WithDescription("Time spent processing a message"),
+		metric.WithUnit("s"),
+	)
+	if err != nil {
+		return nil, err
+	}
+
+	messageProcessed, err := metricMeter.Int64Counter(
+		fmt.Sprintf("%s.%s", prefix, messageProcessedCount),
+		metric.WithDescription("Number of messages processed"),
+	)
+	if err != nil {
+		return nil, err
+	}
+
+	messageProcessingError, err := metricMeter.Int64Counter(
+		fmt.Sprintf("%s.%s", prefix, messageProcessingErrorCount),
+		metric.WithDescription("Number of messages that failed to process"),
+	)
+	if err != nil {
+		return nil, err
+	}
+
+	return func(h message.HandlerFunc) message.HandlerFunc {
+		return func(msg *message.Message) ([]*message.Message, error) {
+			start := time.Now()
+
+			resMsg, err := h(msg)
+
+			// Record the duration for failed messages as well, otherwise slow failures
+			// never show up in the processing time histogram.
+			messageProcessingTime.Record(msg.Context(), time.Since(start).Seconds())
+
+			if err != nil {
+				log.Error("Failed to process message", "error", err, "message_metadata", msg.Metadata, "message_payload", string(msg.Payload))
+				messageProcessingError.Add(msg.Context(), 1)
+				return resMsg, err
+			}
+
+			messageProcessed.Add(msg.Context(), 1)
+			return resMsg, nil
+		}
+	}, nil
+}
diff --git a/internal/watermill/router/router.go b/internal/watermill/router/router.go
index 8acec622e..9a0cdab46 100644
--- a/internal/watermill/router/router.go
+++ b/internal/watermill/router/router.go
@@ -7,14 +7,17 @@ import (
 	"github.com/ThreeDotsLabs/watermill"
 	"github.com/ThreeDotsLabs/watermill/message"
 	"github.com/ThreeDotsLabs/watermill/message/router/middleware"
+	"go.opentelemetry.io/otel/metric"
 
 	"github.com/openmeterio/openmeter/config"
 )
 
 type Options struct {
-	Subscriber message.Subscriber
-	Publisher  message.Publisher
-	Logger     *slog.Logger
+	Subscriber message.Subscriber
+	Publisher  message.Publisher
+	Logger     *slog.Logger
+	// MetricMeter is optional; when it is non-nil NewDefaultRouter adds the Metrics middleware.
+	MetricMeter metric.Meter
 
 	Config config.ConsumerConfiguration
 }
@@ -91,5 +94,13 @@ func NewDefaultRouter(opts Options) (*message.Router, error) {
 		)
 	}
 
+	if opts.MetricMeter != nil {
+		metricsMiddleware, err := Metrics(opts.MetricMeter, "consumer", opts.Logger)
+		if err != nil {
+			return nil, err
+		}
+		router.AddMiddleware(metricsMiddleware)
+	}
+
 	return router, nil
 }