Skip to content

Commit

Permalink
Merge pull request #1602 from openmeterio/fix-svix-timeout
Browse files Browse the repository at this point in the history
refactor: mk webhook event type register non-critical
  • Loading branch information
chrisgacsal authored Oct 1, 2024
2 parents 0efae86 + db06a75 commit e63cd3b
Show file tree
Hide file tree
Showing 9 changed files with 66 additions and 17 deletions.
7 changes: 5 additions & 2 deletions cmd/notification-service/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ import (
"github.com/openmeterio/openmeter/openmeter/watermill/eventbus"
"github.com/openmeterio/openmeter/openmeter/watermill/router"
"github.com/openmeterio/openmeter/pkg/contextx"
entdriver "github.com/openmeterio/openmeter/pkg/framework/entutils/entdriver"
"github.com/openmeterio/openmeter/pkg/framework/entutils/entdriver"
"github.com/openmeterio/openmeter/pkg/framework/operation"
"github.com/openmeterio/openmeter/pkg/framework/pgdriver"
"github.com/openmeterio/openmeter/pkg/gosundheit"
Expand Down Expand Up @@ -305,7 +305,10 @@ func main() {
}

notificationWebhook, err := notificationwebhook.New(notificationwebhook.Config{
SvixConfig: conf.Svix,
SvixConfig: conf.Svix,
RegistrationTimeout: conf.Notification.Webhook.EventTypeRegistrationTimeout,
SkipRegistrationOnError: conf.Notification.Webhook.SkipEventTypeRegistrationOnError,
Logger: logger.WithGroup("notification.webhook"),
})
if err != nil {
logger.Error("failed to initialize notification repository", "error", err)
Expand Down
5 changes: 4 additions & 1 deletion cmd/server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -422,7 +422,10 @@ func main() {

var notificationWebhook notificationwebhook.Handler
notificationWebhook, err = notificationwebhook.New(notificationwebhook.Config{
SvixConfig: conf.Svix,
SvixConfig: conf.Svix,
RegistrationTimeout: conf.Notification.Webhook.EventTypeRegistrationTimeout,
SkipRegistrationOnError: conf.Notification.Webhook.SkipEventTypeRegistrationOnError,
Logger: logger.WithGroup("notification.webhook"),
})
if err != nil {
logger.Error("failed to initialize notification webhook handler", "error", err)
Expand Down
3 changes: 3 additions & 0 deletions config.example.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,9 @@ meters:

notification:
enabled: true
# webhook:
# eventTypeRegistrationTimeout: 30s
# skipEventTypeRegistrationOnError: false

svix:
apiKey: secret
Expand Down
4 changes: 4 additions & 0 deletions config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,10 @@ func TestComplete(t *testing.T) {
},
ConsumerGroupName: "om_notification_service",
},
Webhook: WebhookConfiguration{
EventTypeRegistrationTimeout: notificationwebhook.DefaultRegistrationTimeout,
SkipEventTypeRegistrationOnError: false,
},
},
Svix: notificationwebhook.SvixConfig{
APIKey: "test-svix-token",
Expand Down
14 changes: 14 additions & 0 deletions config/notification.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,25 @@ package config

import (
"fmt"
"time"

"github.com/spf13/viper"

"github.com/openmeterio/openmeter/openmeter/notification/webhook"
)

type WebhookConfiguration struct {
// Timeout for registering event types in webhook provider
EventTypeRegistrationTimeout time.Duration
// Skip registering event types on unsuccessful attempt instead of returning with error
SkipEventTypeRegistrationOnError bool
}

type NotificationConfiguration struct {
Enabled bool
Consumer ConsumerConfiguration

Webhook WebhookConfiguration
}

func (c NotificationConfiguration) Validate() error {
Expand All @@ -22,4 +34,6 @@ func ConfigureNotification(v *viper.Viper) {
ConfigureConsumer(v, "notification.consumer")
v.SetDefault("notification.consumer.dlq.topic", "om_sys.notification_service_dlq")
v.SetDefault("notification.consumer.consumerGroupName", "om_notification_service")
v.SetDefault("notification.webhook.eventTypeRegistrationTimeout", webhook.DefaultRegistrationTimeout)
v.SetDefault("notification.webhook.skipEventTypeRegistrationOnError", false)
}
10 changes: 5 additions & 5 deletions openmeter/notification/webhook/svix.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,15 +76,15 @@ func newSvixWebhookHandler(config SvixConfig) (Handler, error) {
}

func (h svixWebhookHandler) RegisterEventTypes(ctx context.Context, params RegisterEventTypesInputs) error {
for _, evenType := range params.EvenTypes {
for _, eventType := range params.EventTypes {
input := &svix.EventTypeUpdate{
Description: evenType.Description,
Description: eventType.Description,
FeatureFlag: *svix.NullableString(nil),
GroupName: *svix.NullableString(&evenType.GroupName),
Schemas: evenType.Schemas,
GroupName: *svix.NullableString(&eventType.GroupName),
Schemas: eventType.Schemas,
}

_, err := h.client.EventType.Update(ctx, evenType.Name, input)
_, err := h.client.EventType.Update(ctx, eventType.Name, input)
if err != nil {
err = unwrapSvixError(err)

Expand Down
37 changes: 29 additions & 8 deletions openmeter/notification/webhook/webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"encoding/base64"
"errors"
"fmt"
"log/slog"
"strings"
"time"
)
Expand Down Expand Up @@ -240,7 +241,7 @@ func (i SendMessageInput) Validate() error {
}

type RegisterEventTypesInputs struct {
EvenTypes []EventType
EventTypes []EventType
AllowUpdate bool
}

Expand All @@ -263,31 +264,51 @@ type Handler interface {
SendMessage(ctx context.Context, params SendMessageInput) (*Message, error)
}

const (
DefaultRegistrationTimeout = 30 * time.Second
)

type Config struct {
SvixConfig

RegisterEvenTypes []EventType
RegisterEventTypes []EventType
RegistrationTimeout time.Duration
SkipRegistrationOnError bool

Logger *slog.Logger
}

func New(config Config) (Handler, error) {
if config.RegisterEvenTypes == nil {
config.RegisterEvenTypes = NotificationEventTypes
if config.Logger == nil {
return nil, errors.New("logger is required")
}

if config.RegisterEventTypes == nil {
config.RegisterEventTypes = NotificationEventTypes
}

if config.RegistrationTimeout == 0 {
config.RegistrationTimeout = DefaultRegistrationTimeout
}

handler, err := newSvixWebhookHandler(config.SvixConfig)
if err != nil {
return nil, fmt.Errorf("failed to initialize Svix webhook handler: %w", err)
}

if len(config.RegisterEvenTypes) > 0 {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
if len(config.RegisterEventTypes) > 0 {
ctx, cancel := context.WithTimeout(context.Background(), config.RegistrationTimeout)
defer cancel()

err = handler.RegisterEventTypes(ctx, RegisterEventTypesInputs{
EvenTypes: config.RegisterEvenTypes,
EventTypes: config.RegisterEventTypes,
})
if err != nil {
return nil, fmt.Errorf("failed to register event types: %w", err)
if config.SkipRegistrationOnError {
config.Logger.Warn("failed to register event types", "error", err)
} else {
return nil, fmt.Errorf("failed to register event types: %w", err)
}
}
}

Expand Down
1 change: 1 addition & 0 deletions test/notification/testenv.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ func NewTestEnv(t *testing.T, ctx context.Context) (TestEnv, error) {
ServerURL: fmt.Sprintf(SvixServerURLTemplate, svixHost),
Debug: false,
},
Logger: logger,
})
if err != nil {
return nil, fmt.Errorf("failed to create webhook handler: %w", err)
Expand Down
2 changes: 1 addition & 1 deletion test/notification/webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ type WebhookTestSuite struct {

func (s *WebhookTestSuite) Setup(ctx context.Context, t *testing.T) {
err := s.Env.NotificationWebhook().RegisterEventTypes(ctx, notificationwebhook.RegisterEventTypesInputs{
EvenTypes: notificationwebhook.NotificationEventTypes,
EventTypes: notificationwebhook.NotificationEventTypes,
})
assert.NoError(t, err, "Registering event types must not fail")
}
Expand Down

0 comments on commit e63cd3b

Please sign in to comment.