From 70c112df6041921f4b1ff6f8d979a01439cc13a0 Mon Sep 17 00:00:00 2001 From: Krisztian Gacsal Date: Mon, 30 Sep 2024 22:14:12 +0200 Subject: [PATCH 1/2] refactor: mk webhook event type register non-critical * make timeout for event type register operation configurable (`notification.webhook.eventTypeRegistrationTimeout`) * allow skipping event type register in case the first attempt was unsuccessful in order to allow the service to start even in degraded state. (`notification.webhook.skipEventTypeRegistrationOnError`) --- cmd/notification-service/main.go | 7 ++++-- cmd/server/main.go | 5 ++++- config.example.yaml | 3 +++ config/config_test.go | 4 ++++ config/notification.go | 14 ++++++++++++ openmeter/notification/webhook/webhook.go | 27 ++++++++++++++++++++--- test/notification/testenv.go | 1 + 7 files changed, 55 insertions(+), 6 deletions(-) diff --git a/cmd/notification-service/main.go b/cmd/notification-service/main.go index dfdaba314..2278aa82d 100644 --- a/cmd/notification-service/main.go +++ b/cmd/notification-service/main.go @@ -42,7 +42,7 @@ import ( "github.com/openmeterio/openmeter/openmeter/watermill/eventbus" "github.com/openmeterio/openmeter/openmeter/watermill/router" "github.com/openmeterio/openmeter/pkg/contextx" - entdriver "github.com/openmeterio/openmeter/pkg/framework/entutils/entdriver" + "github.com/openmeterio/openmeter/pkg/framework/entutils/entdriver" "github.com/openmeterio/openmeter/pkg/framework/operation" "github.com/openmeterio/openmeter/pkg/framework/pgdriver" "github.com/openmeterio/openmeter/pkg/gosundheit" @@ -305,7 +305,10 @@ func main() { } notificationWebhook, err := notificationwebhook.New(notificationwebhook.Config{ - SvixConfig: conf.Svix, + SvixConfig: conf.Svix, + RegistrationTimeout: conf.Notification.Webhook.EventTypeRegistrationTimeout, + SkipRegistrationOnError: conf.Notification.Webhook.SkipEventTypeRegistrationOnError, + Logger: logger.WithGroup("notification.webhook"), }) if err != nil { logger.Error("failed to initialize notification repository", "error", err) diff --git a/cmd/server/main.go b/cmd/server/main.go index 8c2a96cd2..88b649449 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -422,7 +422,10 @@ func main() { var notificationWebhook notificationwebhook.Handler notificationWebhook, err = notificationwebhook.New(notificationwebhook.Config{ - SvixConfig: conf.Svix, + SvixConfig: conf.Svix, + RegistrationTimeout: conf.Notification.Webhook.EventTypeRegistrationTimeout, + SkipRegistrationOnError: conf.Notification.Webhook.SkipEventTypeRegistrationOnError, + Logger: logger.WithGroup("notification.webhook"), }) if err != nil { logger.Error("failed to initialize notification webhook handler", "error", err) diff --git a/config.example.yaml b/config.example.yaml index e24d7762f..bcd950c6a 100644 --- a/config.example.yaml +++ b/config.example.yaml @@ -92,6 +92,9 @@ meters: notification: enabled: true +# webhook: +# eventTypeRegistrationTimeout: 30s +# skipEventTypeRegistrationOnError: false svix: apiKey: secret diff --git a/config/config_test.go b/config/config_test.go index 1f1f0a8c5..e1bbc9bdd 100644 --- a/config/config_test.go +++ b/config/config_test.go @@ -273,6 +273,10 @@ func TestComplete(t *testing.T) { }, ConsumerGroupName: "om_notification_service", }, + Webhook: WebhookConfiguration{ + EventTypeRegistrationTimeout: notificationwebhook.DefaultRegistrationTimeout, + SkipEventTypeRegistrationOnError: false, + }, }, Svix: notificationwebhook.SvixConfig{ APIKey: "test-svix-token", diff --git a/config/notification.go b/config/notification.go index b9e265a1c..47d34cfd5 100644 --- a/config/notification.go +++ b/config/notification.go @@ -2,13 +2,25 @@ package config import ( "fmt" + "time" "github.com/spf13/viper" + + "github.com/openmeterio/openmeter/openmeter/notification/webhook" ) +type WebhookConfiguration struct { + // Timeout for registering event types in webhook provider + EventTypeRegistrationTimeout time.Duration + // Skip registering event types on unsuccessful attempt instead of returning with error + SkipEventTypeRegistrationOnError bool +} + type NotificationConfiguration struct { Enabled bool Consumer ConsumerConfiguration + + Webhook WebhookConfiguration } func (c NotificationConfiguration) Validate() error { @@ -22,4 +34,6 @@ func ConfigureNotification(v *viper.Viper) { ConfigureConsumer(v, "notification.consumer") v.SetDefault("notification.consumer.dlq.topic", "om_sys.notification_service_dlq") v.SetDefault("notification.consumer.consumerGroupName", "om_notification_service") + v.SetDefault("notification.webhook.eventTypeRegistrationTimeout", webhook.DefaultRegistrationTimeout) + v.SetDefault("notification.webhook.skipEventTypeRegistrationOnError", false) } diff --git a/openmeter/notification/webhook/webhook.go b/openmeter/notification/webhook/webhook.go index 210e2f703..e883c73b6 100644 --- a/openmeter/notification/webhook/webhook.go +++ b/openmeter/notification/webhook/webhook.go @@ -5,6 +5,7 @@ import ( "encoding/base64" "errors" "fmt" + "log/slog" "strings" "time" ) @@ -263,31 +264,51 @@ type Handler interface { SendMessage(ctx context.Context, params SendMessageInput) (*Message, error) } +const ( + DefaultRegistrationTimeout = 30 * time.Second +) + type Config struct { SvixConfig - RegisterEvenTypes []EventType + RegisterEvenTypes []EventType + RegistrationTimeout time.Duration + SkipRegistrationOnError bool + + Logger *slog.Logger } func New(config Config) (Handler, error) { + if config.Logger == nil { + return nil, errors.New("logger is required") + } + if config.RegisterEvenTypes == nil { config.RegisterEvenTypes = NotificationEventTypes } + if config.RegistrationTimeout == 0 { + config.RegistrationTimeout = DefaultRegistrationTimeout + } + handler, err := newSvixWebhookHandler(config.SvixConfig) if err != nil { return nil, fmt.Errorf("failed to initialize Svix webhook handler: %w", err) } if len(config.RegisterEvenTypes) > 0 { - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + ctx, cancel := context.WithTimeout(context.Background(), config.RegistrationTimeout) defer cancel() err = handler.RegisterEventTypes(ctx, RegisterEventTypesInputs{ EvenTypes: config.RegisterEvenTypes, }) if err != nil { - return nil, fmt.Errorf("failed to register event types: %w", err) + if config.SkipRegistrationOnError { + config.Logger.Warn("failed to register event types", "error", err) + } else { + return nil, fmt.Errorf("failed to register event types: %w", err) + } } } diff --git a/test/notification/testenv.go b/test/notification/testenv.go index 51eec94b2..74989623c 100644 --- a/test/notification/testenv.go +++ b/test/notification/testenv.go @@ -135,6 +135,7 @@ func NewTestEnv(t *testing.T, ctx context.Context) (TestEnv, error) { ServerURL: fmt.Sprintf(SvixServerURLTemplate, svixHost), Debug: false, }, + Logger: logger, }) if err != nil { return nil, fmt.Errorf("failed to create webhook handler: %w", err) From db06a7567b71073df1bf5ad9a2958b16ecef240a Mon Sep 17 00:00:00 2001 From: Krisztian Gacsal Date: Tue, 1 Oct 2024 14:24:52 +0200 Subject: [PATCH 2/2] fix: typo in webhook.EventTypes --- openmeter/notification/webhook/svix.go | 10 +++++----- openmeter/notification/webhook/webhook.go | 12 ++++++------ test/notification/webhook.go | 2 +- 3 files changed, 12 insertions(+), 12 deletions(-) diff --git a/openmeter/notification/webhook/svix.go b/openmeter/notification/webhook/svix.go index 0c78b8784..0d4061b7a 100644 --- a/openmeter/notification/webhook/svix.go +++ b/openmeter/notification/webhook/svix.go @@ -76,15 +76,15 @@ func newSvixWebhookHandler(config SvixConfig) (Handler, error) { } func (h svixWebhookHandler) RegisterEventTypes(ctx context.Context, params RegisterEventTypesInputs) error { - for _, evenType := range params.EvenTypes { + for _, eventType := range params.EventTypes { input := &svix.EventTypeUpdate{ - Description: evenType.Description, + Description: eventType.Description, FeatureFlag: *svix.NullableString(nil), - GroupName: *svix.NullableString(&evenType.GroupName), - Schemas: evenType.Schemas, + GroupName: *svix.NullableString(&eventType.GroupName), + Schemas: eventType.Schemas, } - _, err := h.client.EventType.Update(ctx, evenType.Name, input) + _, err := h.client.EventType.Update(ctx, eventType.Name, input) if err != nil { err = unwrapSvixError(err) diff --git a/openmeter/notification/webhook/webhook.go b/openmeter/notification/webhook/webhook.go index e883c73b6..45710187c 100644 --- a/openmeter/notification/webhook/webhook.go +++ b/openmeter/notification/webhook/webhook.go @@ -241,7 +241,7 @@ func (i SendMessageInput) Validate() error { } type RegisterEventTypesInputs struct { - EvenTypes []EventType + EventTypes []EventType AllowUpdate bool } @@ -271,7 +271,7 @@ const ( type Config struct { SvixConfig - RegisterEvenTypes []EventType + RegisterEventTypes []EventType RegistrationTimeout time.Duration SkipRegistrationOnError bool @@ -283,8 +283,8 @@ func New(config Config) (Handler, error) { return nil, errors.New("logger is required") } - if config.RegisterEvenTypes == nil { - config.RegisterEvenTypes = NotificationEventTypes + if config.RegisterEventTypes == nil { + config.RegisterEventTypes = NotificationEventTypes } if config.RegistrationTimeout == 0 { @@ -296,12 +296,12 @@ func New(config Config) (Handler, error) { return nil, fmt.Errorf("failed to initialize Svix webhook handler: %w", err) } - if len(config.RegisterEvenTypes) > 0 { + if len(config.RegisterEventTypes) > 0 { ctx, cancel := context.WithTimeout(context.Background(), config.RegistrationTimeout) defer cancel() err = handler.RegisterEventTypes(ctx, RegisterEventTypesInputs{ - EvenTypes: config.RegisterEvenTypes, + EventTypes: config.RegisterEventTypes, }) if err != nil { if config.SkipRegistrationOnError { diff --git a/test/notification/webhook.go b/test/notification/webhook.go index a82a64f7d..8f430e8e2 100644 --- a/test/notification/webhook.go +++ b/test/notification/webhook.go @@ -41,7 +41,7 @@ type WebhookTestSuite struct { func (s *WebhookTestSuite) Setup(ctx context.Context, t *testing.T) { err := s.Env.NotificationWebhook().RegisterEventTypes(ctx, notificationwebhook.RegisterEventTypesInputs{ - EvenTypes: notificationwebhook.NotificationEventTypes, + EventTypes: notificationwebhook.NotificationEventTypes, }) assert.NoError(t, err, "Registering event types must not fail") }