Skip to content

Commit

Permalink
refactor: mk webhook event type register non-critical
Browse files Browse the repository at this point in the history
* make timeout for event type register operation configurable
  (`notification.webhook.eventTypeRegisterTimeout`)
* allow skipping event type register in case the first attempt was
  unsuccessful in order to allow the service to start even in degraded
  state. (`notification.webhook.skipEventTypeRegisterOnError`)
  • Loading branch information
chrisgacsal committed Sep 30, 2024
1 parent 520906e commit d7fc077
Show file tree
Hide file tree
Showing 7 changed files with 55 additions and 6 deletions.
7 changes: 5 additions & 2 deletions cmd/notification-service/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ import (
"github.com/openmeterio/openmeter/openmeter/watermill/eventbus"
"github.com/openmeterio/openmeter/openmeter/watermill/router"
"github.com/openmeterio/openmeter/pkg/contextx"
entdriver "github.com/openmeterio/openmeter/pkg/framework/entutils/entdriver"
"github.com/openmeterio/openmeter/pkg/framework/entutils/entdriver"
"github.com/openmeterio/openmeter/pkg/framework/operation"
"github.com/openmeterio/openmeter/pkg/framework/pgdriver"
"github.com/openmeterio/openmeter/pkg/gosundheit"
Expand Down Expand Up @@ -305,7 +305,10 @@ func main() {
}

notificationWebhook, err := notificationwebhook.New(notificationwebhook.Config{
SvixConfig: conf.Svix,
SvixConfig: conf.Svix,
RegisterTimeout: conf.Notification.Webhook.EventTypeRegisterTimeout,
SkipRegisterOnError: conf.Notification.Webhook.SkipEventTypeRegisterOnError,
Logger: logger.WithGroup("notification.webhook"),
})
if err != nil {
logger.Error("failed to initialize notification repository", "error", err)
Expand Down
5 changes: 4 additions & 1 deletion cmd/server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -422,7 +422,10 @@ func main() {

var notificationWebhook notificationwebhook.Handler
notificationWebhook, err = notificationwebhook.New(notificationwebhook.Config{
SvixConfig: conf.Svix,
SvixConfig: conf.Svix,
RegisterTimeout: conf.Notification.Webhook.EventTypeRegisterTimeout,
SkipRegisterOnError: conf.Notification.Webhook.SkipEventTypeRegisterOnError,
Logger: logger.WithGroup("notification.webhook"),
})
if err != nil {
logger.Error("failed to initialize notification webhook handler", "error", err)
Expand Down
3 changes: 3 additions & 0 deletions config.example.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,9 @@ meters:

notification:
enabled: true
# webhook:
# eventTypeRegisterTimeout: 30s
# skipEventTypeRegisterOnError: false

svix:
apiKey: secret
Expand Down
4 changes: 4 additions & 0 deletions config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,10 @@ func TestComplete(t *testing.T) {
},
ConsumerGroupName: "om_notification_service",
},
Webhook: WebhookConfiguration{
EventTypeRegisterTimeout: notificationwebhook.DefaultRegisterTimeout,
SkipEventTypeRegisterOnError: false,
},
},
Svix: notificationwebhook.SvixConfig{
APIKey: "test-svix-token",
Expand Down
14 changes: 14 additions & 0 deletions config/notification.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,25 @@ package config

import (
"fmt"
"time"

"github.com/spf13/viper"

"github.com/openmeterio/openmeter/openmeter/notification/webhook"
)

type WebhookConfiguration struct {
// Timeout for registering event types in webhook provider
EventTypeRegisterTimeout time.Duration
// Skip registering event types on unsuccessful attempt instead of returning with error
SkipEventTypeRegisterOnError bool
}

type NotificationConfiguration struct {
Enabled bool
Consumer ConsumerConfiguration

Webhook WebhookConfiguration
}

func (c NotificationConfiguration) Validate() error {
Expand All @@ -22,4 +34,6 @@ func ConfigureNotification(v *viper.Viper) {
ConfigureConsumer(v, "notification.consumer")
v.SetDefault("notification.consumer.dlq.topic", "om_sys.notification_service_dlq")
v.SetDefault("notification.consumer.consumerGroupName", "om_notification_service")
v.SetDefault("notification.webhook.eventTypeRegisterTimeout", webhook.DefaultRegisterTimeout)
v.SetDefault("notification.webhook.skipEventTypeRegisterOnError", false)
}
27 changes: 24 additions & 3 deletions openmeter/notification/webhook/webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"encoding/base64"
"errors"
"fmt"
"log/slog"
"strings"
"time"
)
Expand Down Expand Up @@ -263,31 +264,51 @@ type Handler interface {
SendMessage(ctx context.Context, params SendMessageInput) (*Message, error)
}

const (
DefaultRegisterTimeout = 30 * time.Second
)

type Config struct {
SvixConfig

RegisterEvenTypes []EventType
RegisterEvenTypes []EventType
RegisterTimeout time.Duration
SkipRegisterOnError bool

Logger *slog.Logger
}

func New(config Config) (Handler, error) {
if config.Logger == nil {
return nil, errors.New("logger is required")
}

if config.RegisterEvenTypes == nil {
config.RegisterEvenTypes = NotificationEventTypes
}

if config.RegisterTimeout == 0 {
config.RegisterTimeout = DefaultRegisterTimeout
}

handler, err := newSvixWebhookHandler(config.SvixConfig)
if err != nil {
return nil, fmt.Errorf("failed to initialize Svix webhook handler: %w", err)
}

if len(config.RegisterEvenTypes) > 0 {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
ctx, cancel := context.WithTimeout(context.Background(), config.RegisterTimeout)
defer cancel()

err = handler.RegisterEventTypes(ctx, RegisterEventTypesInputs{
EvenTypes: config.RegisterEvenTypes,
})
if err != nil {
return nil, fmt.Errorf("failed to register event types: %w", err)
if config.SkipRegisterOnError {
config.Logger.Warn("failed to register event types", "error", err)
} else {
return nil, fmt.Errorf("failed to register event types: %w", err)
}
}
}

Expand Down
1 change: 1 addition & 0 deletions test/notification/testenv.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ func NewTestEnv(t *testing.T, ctx context.Context) (TestEnv, error) {
ServerURL: fmt.Sprintf(SvixServerURLTemplate, svixHost),
Debug: false,
},
Logger: logger,
})
if err != nil {
return nil, fmt.Errorf("failed to create webhook handler: %w", err)
Expand Down

0 comments on commit d7fc077

Please sign in to comment.