diff --git a/_examples/basic/2-realtime-feed/producer/main.go b/_examples/basic/2-realtime-feed/producer/main.go index ce17e6f22..d62c565f6 100644 --- a/_examples/basic/2-realtime-feed/producer/main.go +++ b/_examples/basic/2-realtime-feed/producer/main.go @@ -28,8 +28,6 @@ func main() { logger := watermill.NewStdLogger(false, false) logger.Info("Starting the producer", watermill.LogFields{}) - rand.Seed(time.Now().Unix()) - publisher, err := kafka.NewPublisher( kafka.PublisherConfig{ Brokers: brokers, diff --git a/_examples/basic/5-cqrs-protobuf/docker-compose.yml b/_examples/basic/5-cqrs-protobuf/docker-compose.yml index 5d5f9c822..1cf2b9a41 100644 --- a/_examples/basic/5-cqrs-protobuf/docker-compose.yml +++ b/_examples/basic/5-cqrs-protobuf/docker-compose.yml @@ -1,7 +1,7 @@ version: '3' services: golang: - image: golang:1.19 + image: golang:1.20 restart: unless-stopped ports: - 8080:8080 diff --git a/_examples/basic/5-cqrs-protobuf/go.mod b/_examples/basic/5-cqrs-protobuf/go.mod index 5d4994981..a86733eca 100644 --- a/_examples/basic/5-cqrs-protobuf/go.mod +++ b/_examples/basic/5-cqrs-protobuf/go.mod @@ -1,7 +1,7 @@ module main.go require ( - github.com/ThreeDotsLabs/watermill v1.2.0-rc.11 + github.com/ThreeDotsLabs/watermill v1.2.1-0.20230623082929-7fe0ca7ad2cc github.com/ThreeDotsLabs/watermill-amqp/v2 v2.0.7 github.com/golang/protobuf v1.5.2 github.com/pkg/errors v0.9.1 diff --git a/_examples/basic/5-cqrs-protobuf/go.sum b/_examples/basic/5-cqrs-protobuf/go.sum index b720081a1..35b212220 100644 --- a/_examples/basic/5-cqrs-protobuf/go.sum +++ b/_examples/basic/5-cqrs-protobuf/go.sum @@ -1,5 +1,13 @@ github.com/ThreeDotsLabs/watermill v1.2.0-rc.11 h1:tQJ3L/AnfliXaxaq+ElHOfzi0Vx+AN8cAnIOLcUTrxo= github.com/ThreeDotsLabs/watermill v1.2.0-rc.11/go.mod h1:QLZSaklpSZ/7yv288LL2DFOgCEi86VYEmQvzmaMlHoA= +github.com/ThreeDotsLabs/watermill v1.2.1-0.20230620142403-c0e20f18aef0 h1:920Tfprg3Lwn7ieUOtnSZSz73UhZLeqzkf/fozF5vZ4= +github.com/ThreeDotsLabs/watermill v1.2.1-0.20230620142403-c0e20f18aef0/go.mod h1:zn/7F0TGOr1K/RX7bFbVxii6p1abOMLllAMpVpKinQg= +github.com/ThreeDotsLabs/watermill v1.2.1-0.20230620191859-b41f6a2770be h1:7c3tZkJ3w2jB0S9xRkUvMNVUD/AE49c6wAFr1AyMy/g= +github.com/ThreeDotsLabs/watermill v1.2.1-0.20230620191859-b41f6a2770be/go.mod h1:zn/7F0TGOr1K/RX7bFbVxii6p1abOMLllAMpVpKinQg= +github.com/ThreeDotsLabs/watermill v1.2.1-0.20230622094202-6eb02dc0b0b8 h1:S+6+P94VcfyiJCtmP6q8mGoMuSGmeAZc8lfIbDJ1R/E= +github.com/ThreeDotsLabs/watermill v1.2.1-0.20230622094202-6eb02dc0b0b8/go.mod h1:zn/7F0TGOr1K/RX7bFbVxii6p1abOMLllAMpVpKinQg= +github.com/ThreeDotsLabs/watermill v1.2.1-0.20230623082929-7fe0ca7ad2cc h1:j8hIjk/pE05TmiEoiljsJPTZCL/4OJEUifWoFlMs0HI= +github.com/ThreeDotsLabs/watermill v1.2.1-0.20230623082929-7fe0ca7ad2cc/go.mod h1:zn/7F0TGOr1K/RX7bFbVxii6p1abOMLllAMpVpKinQg= github.com/ThreeDotsLabs/watermill-amqp/v2 v2.0.7 h1:AUSXLqdsA1LXWuoQSkIRG9FhMo6EYM9GSgd+bnf1W0w= github.com/ThreeDotsLabs/watermill-amqp/v2 v2.0.7/go.mod h1:DKOBUoMVtPMV8jBEbRP3NL6TgnOMyRvVza3W297cdqU= github.com/cenkalti/backoff/v3 v3.2.2 h1:cfUAAO3yvKMYKPrvhDuHSwQnhZNk/RMHKdZqKTxfm6M= diff --git a/_examples/basic/5-cqrs-protobuf/main.go b/_examples/basic/5-cqrs-protobuf/main.go index e247acf57..ab984264a 100644 --- a/_examples/basic/5-cqrs-protobuf/main.go +++ b/_examples/basic/5-cqrs-protobuf/main.go @@ -207,56 +207,149 @@ func main() { // List of available middlewares you can find in message/router/middleware. router.AddMiddleware(middleware.Recoverer) - // cqrs.Facade is facade for Command and Event buses and processors. - // You can use facade, or create buses and processors manually (you can inspire with cqrs.NewFacade) - cqrsFacade, err := cqrs.NewFacade(cqrs.FacadeConfig{ - GenerateCommandsTopic: func(commandName string) string { + commandBus, err := cqrs.NewCommandBusWithConfig(commandsPublisher, cqrs.CommandBusConfig{ + GeneratePublishTopic: func(params cqrs.CommandBusGeneratePublishTopicParams) (string, error) { // we are using queue RabbitMQ config, so we need to have topic per command type - return commandName + return params.CommandName, nil }, - CommandHandlers: func(cb *cqrs.CommandBus, eb *cqrs.EventBus) []cqrs.CommandHandler { - return []cqrs.CommandHandler{ - BookRoomHandler{eb}, - OrderBeerHandler{eb}, - } + OnSend: func(params cqrs.CommandBusOnSendParams) error { + logger.Info("Sending command", watermill.LogFields{ + "command_name": params.CommandName, + }) + + params.Message.Metadata.Set("sent_at", time.Now().String()) + + return nil }, - CommandsPublisher: commandsPublisher, - CommandsSubscriberConstructor: func(handlerName string) (message.Subscriber, error) { - // we can reuse subscriber, because all commands have separated topics - return commandsSubscriber, nil + Marshaler: cqrsMarshaler, + Logger: logger, + }) + if err != nil { + panic(err) + } + + commandProcessor, err := cqrs.NewCommandProcessorWithConfig( + router, + cqrs.CommandProcessorConfig{ + GenerateSubscribeTopic: func(params cqrs.CommandProcessorGenerateSubscribeTopicParams) (string, error) { + // we are using queue RabbitMQ config, so we need to have topic per command type + return params.CommandName, nil + }, + SubscriberConstructor: func(params cqrs.CommandProcessorSubscriberConstructorParams) (message.Subscriber, error) { + // we can reuse subscriber, because all commands have separated topics + return commandsSubscriber, nil + }, + OnHandle: func(params cqrs.CommandProcessorOnHandleParams) error { + start := time.Now() + + err := params.Handler.Handle(params.Message.Context(), params.Command) + + logger.Info("Command handled", watermill.LogFields{ + "command_name": params.CommandName, + "duration": time.Since(start), + "err": err, + }) + + return err + }, + Marshaler: cqrsMarshaler, + Logger: logger, }, - GenerateEventsTopic: func(eventName string) string { + ) + if err != nil { + panic(err) + } + + eventBus, err := cqrs.NewEventBusWithConfig(eventsPublisher, cqrs.EventBusConfig{ + GeneratePublishTopic: func(params cqrs.GenerateEventPublishTopicParams) (string, error) { // because we are using PubSub RabbitMQ config, we can use one topic for all events - return "events" + return "events", nil // we can also use topic per event type - // return eventName + // return params.EventName, nil }, - EventHandlers: func(cb *cqrs.CommandBus, eb *cqrs.EventBus) []cqrs.EventHandler { - return []cqrs.EventHandler{ - OrderBeerOnRoomBooked{cb}, - NewBookingsFinancialReport(), - } - }, - EventsPublisher: eventsPublisher, - EventsSubscriberConstructor: func(handlerName string) (message.Subscriber, error) { - config := amqp.NewDurablePubSubConfig( - amqpAddress, - amqp.GenerateQueueNameTopicNameWithSuffix(handlerName), - ) - - return amqp.NewSubscriber(config, logger) + + OnPublish: func(params cqrs.OnEventSendParams) error { + logger.Info("Publishing event", watermill.LogFields{ + "event_name": params.EventName, + }) + + params.Message.Metadata.Set("published_at", time.Now().String()) + + return nil }, - Router: router, - CommandEventMarshaler: cqrsMarshaler, - Logger: logger, + + Marshaler: cqrsMarshaler, + Logger: logger, }) if err != nil { panic(err) } + eventProcessor, err := cqrs.NewEventGroupProcessorWithConfig( + router, + cqrs.EventGroupProcessorConfig{ + GenerateSubscribeTopic: func(params cqrs.EventGroupProcessorGenerateSubscribeTopicParams) (string, error) { + return "events", nil + }, + SubscriberConstructor: func(params cqrs.EventGroupProcessorSubscriberConstructorParams) (message.Subscriber, error) { + config := amqp.NewDurablePubSubConfig( + amqpAddress, + amqp.GenerateQueueNameTopicNameWithSuffix(params.EventGroupName), + ) + + return amqp.NewSubscriber(config, logger) + }, + + OnHandle: func(params cqrs.EventGroupProcessorOnHandleParams) error { + start := time.Now() + + err := params.Handler.Handle(params.Message.Context(), params.Event) + + logger.Info("Event handled", watermill.LogFields{ + "event_name": params.EventName, + "duration": time.Since(start), + "err": err, + }) + + return err + }, + + Marshaler: cqrsMarshaler, + Logger: logger, + }, + ) + if err != nil { + panic(err) + } + + err = commandProcessor.AddHandlers( + BookRoomHandler{eventBus}, + OrderBeerHandler{eventBus}, + ) + if err != nil { + panic(err) + } + + err = eventProcessor.AddHandlersGroup( + "events", + OrderBeerOnRoomBooked{commandBus}, + + NewBookingsFinancialReport(), + + cqrs.NewGroupEventHandler(func(ctx context.Context, event *BeerOrdered) error { + logger.Info("Beer ordered", watermill.LogFields{ + "room_id": event.RoomId, + }) + return nil + }), + ) + if err != nil { + panic(err) + } + // publish BookRoom commands every second to simulate incoming traffic - go publishCommands(cqrsFacade.CommandBus()) + go publishCommands(commandBus) // processors are based on router, so they will work when router will start if err := router.Run(context.Background()); err != nil { diff --git a/_examples/real-world-examples/consumer-groups/api/main.go b/_examples/real-world-examples/consumer-groups/api/main.go index c9e6ee82f..8b2cfde23 100644 --- a/_examples/real-world-examples/consumer-groups/api/main.go +++ b/_examples/real-world-examples/consumer-groups/api/main.go @@ -2,10 +2,8 @@ package main import ( "context" - "math/rand" "net/http" "sync" - "time" "github.com/ThreeDotsLabs/watermill" "github.com/ThreeDotsLabs/watermill-redisstream/pkg/redisstream" @@ -16,7 +14,6 @@ import ( ) func main() { - rand.Seed(time.Now().UnixNano()) logger := watermill.NewStdLogger(false, false) router, err := message.NewRouter(message.RouterConfig{}, logger) diff --git a/_examples/real-world-examples/server-sent-events/server/main.go b/_examples/real-world-examples/server-sent-events/server/main.go index 82c004015..977efa499 100644 --- a/_examples/real-world-examples/server-sent-events/server/main.go +++ b/_examples/real-world-examples/server-sent-events/server/main.go @@ -1,16 +1,12 @@ package main import ( - "math/rand" "net/http" - "time" "github.com/ThreeDotsLabs/watermill" ) func main() { - rand.Seed(time.Now().Unix()) - logger := watermill.NewStdLogger(false, false) postsStorage := NewPostsStorage() diff --git a/components/cqrs/command_bus.go b/components/cqrs/command_bus.go index 8c4d6e7ea..161580a00 100644 --- a/components/cqrs/command_bus.go +++ b/components/cqrs/command_bus.go @@ -2,19 +2,93 @@ package cqrs import ( "context" + stdErrors "errors" + "github.com/ThreeDotsLabs/watermill" "github.com/pkg/errors" "github.com/ThreeDotsLabs/watermill/message" ) +type CommandBusConfig struct { + // GeneratePublishTopic is used to generate topic for publishing command. + GeneratePublishTopic CommandBusGeneratePublishTopicFn + + // OnSend is called before publishing the command. + // The *message.Message can be modified. + // + // This option is not required. + OnSend CommandBusOnSendFn + + // Marshaler is used to marshal and unmarshal commands. + // It is required. + Marshaler CommandEventMarshaler + + // Logger instance used to log. + // If not provided, watermill.NopLogger is used. + Logger watermill.LoggerAdapter +} + +func (c *CommandBusConfig) setDefaults() { + if c.Logger == nil { + c.Logger = watermill.NopLogger{} + } +} + +func (c CommandBusConfig) Validate() error { + var err error + + if c.Marshaler == nil { + err = stdErrors.Join(err, errors.New("missing Marshaler")) + } + + if c.GeneratePublishTopic == nil { + err = stdErrors.Join(err, errors.New("missing GeneratePublishTopic")) + } + + return err +} + +type CommandBusGeneratePublishTopicFn func(CommandBusGeneratePublishTopicParams) (string, error) + +type CommandBusGeneratePublishTopicParams struct { + CommandName string + Command any +} + +type CommandBusOnSendFn func(params CommandBusOnSendParams) error + +type CommandBusOnSendParams struct { + CommandName string + Command any + + // Message is never nil and can be modified. + Message *message.Message +} + // CommandBus transports commands to command handlers. type CommandBus struct { - publisher message.Publisher - generateTopic func(commandName string) string - marshaler CommandEventMarshaler + publisher message.Publisher + + config CommandBusConfig } +// NewCommandBusWithConfig creates a new CommandBus. +func NewCommandBusWithConfig(publisher message.Publisher, config CommandBusConfig) (*CommandBus, error) { + if publisher == nil { + return nil, errors.New("missing publisher") + } + + config.setDefaults() + if err := config.Validate(); err != nil { + return nil, errors.Wrap(err, "invalid config") + } + + return &CommandBus{publisher, config}, nil +} + +// NewCommandBus creates a new CommandBus. +// Deprecated: use NewCommandBusWithConfig instead. func NewCommandBus( publisher message.Publisher, generateTopic func(commandName string) string, @@ -30,20 +104,51 @@ func NewCommandBus( return nil, errors.New("missing marshaler") } - return &CommandBus{publisher, generateTopic, marshaler}, nil + return &CommandBus{publisher, CommandBusConfig{ + GeneratePublishTopic: func(params CommandBusGeneratePublishTopicParams) (string, error) { + return generateTopic(params.CommandName), nil + }, + Marshaler: marshaler, + }}, nil } // Send sends command to the command bus. -func (c CommandBus) Send(ctx context.Context, cmd interface{}) error { - msg, err := c.marshaler.Marshal(cmd) +func (c CommandBus) Send(ctx context.Context, cmd any) error { + msg, topicName, err := c.newMessage(ctx, cmd) if err != nil { return err } - commandName := c.marshaler.Name(cmd) - topicName := c.generateTopic(commandName) + return c.publisher.Publish(topicName, msg) +} + +func (c CommandBus) newMessage(ctx context.Context, command any) (*message.Message, string, error) { + msg, err := c.config.Marshaler.Marshal(command) + if err != nil { + return nil, "", err + } + + commandName := c.config.Marshaler.Name(command) + topicName, err := c.config.GeneratePublishTopic(CommandBusGeneratePublishTopicParams{ + CommandName: commandName, + Command: command, + }) + if err != nil { + return nil, "", errors.Wrap(err, "cannot generate topic name") + } msg.SetContext(ctx) - return c.publisher.Publish(topicName, msg) + if c.config.OnSend != nil { + err := c.config.OnSend(CommandBusOnSendParams{ + CommandName: commandName, + Command: command, + Message: msg, + }) + if err != nil { + return nil, "", errors.Wrap(err, "cannot execute OnSend") + } + } + + return msg, topicName, nil } diff --git a/components/cqrs/command_bus_test.go b/components/cqrs/command_bus_test.go index cc3f6b8a6..2a076ca08 100644 --- a/components/cqrs/command_bus_test.go +++ b/components/cqrs/command_bus_test.go @@ -4,32 +4,84 @@ import ( "context" "testing" + "github.com/pkg/errors" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/ThreeDotsLabs/watermill/components/cqrs" ) +func TestCommandBusConfig_Validate(t *testing.T) { + testCases := []struct { + Name string + ModifyValidConfig func(*cqrs.CommandBusConfig) + ExpectedErr error + }{ + { + Name: "valid_config", + ModifyValidConfig: nil, + ExpectedErr: nil, + }, + { + Name: "missing_Marshaler", + ModifyValidConfig: func(c *cqrs.CommandBusConfig) { + c.Marshaler = nil + }, + ExpectedErr: errors.Errorf("missing Marshaler"), + }, + { + Name: "missing_GeneratePublishTopic", + ModifyValidConfig: func(c *cqrs.CommandBusConfig) { + c.GeneratePublishTopic = nil + }, + ExpectedErr: errors.Errorf("missing GeneratePublishTopic"), + }, + } + for i := range testCases { + tc := testCases[i] + + t.Run(tc.Name, func(t *testing.T) { + validConfig := cqrs.CommandBusConfig{ + GeneratePublishTopic: func(params cqrs.CommandBusGeneratePublishTopicParams) (string, error) { + return "", nil + }, + Marshaler: cqrs.JSONMarshaler{}, + } + + if tc.ModifyValidConfig != nil { + tc.ModifyValidConfig(&validConfig) + } + + err := validConfig.Validate() + if tc.ExpectedErr == nil { + assert.NoError(t, err) + } else { + assert.EqualError(t, err, tc.ExpectedErr.Error()) + } + }) + } +} + func TestNewCommandBus(t *testing.T) { pub := newPublisherStub() - generateTopic := func(commandName string) string { - return "" + + config := cqrs.CommandBusConfig{ + GeneratePublishTopic: func(params cqrs.CommandBusGeneratePublishTopicParams) (string, error) { + return "", nil + }, + Marshaler: cqrs.JSONMarshaler{}, } - marshaler := cqrs.JSONMarshaler{} - cb, err := cqrs.NewCommandBus(pub, generateTopic, marshaler) + require.NoError(t, config.Validate()) + + cb, err := cqrs.NewCommandBusWithConfig(pub, config) assert.NotNil(t, cb) assert.NoError(t, err) - cb, err = cqrs.NewCommandBus(nil, generateTopic, marshaler) - assert.Nil(t, cb) - assert.Error(t, err) - - cb, err = cqrs.NewCommandBus(pub, nil, marshaler) - assert.Nil(t, cb) - assert.Error(t, err) + config.GeneratePublishTopic = nil + require.Error(t, config.Validate()) - cb, err = cqrs.NewCommandBus(pub, generateTopic, nil) + cb, err = cqrs.NewCommandBusWithConfig(pub, config) assert.Nil(t, cb) assert.Error(t, err) } @@ -39,12 +91,14 @@ type contextKey string func TestCommandBus_Send_ContextPropagation(t *testing.T) { publisher := newPublisherStub() - commandBus, err := cqrs.NewCommandBus( + commandBus, err := cqrs.NewCommandBusWithConfig( publisher, - func(commandName string) string { - return "whatever" + cqrs.CommandBusConfig{ + GeneratePublishTopic: func(params cqrs.CommandBusGeneratePublishTopicParams) (string, error) { + return "whatever", nil + }, + Marshaler: cqrs.JSONMarshaler{}, }, - cqrs.JSONMarshaler{}, ) require.NoError(t, err) @@ -57,15 +111,64 @@ func TestCommandBus_Send_ContextPropagation(t *testing.T) { } func TestCommandBus_Send_topic_name(t *testing.T) { - cb, err := cqrs.NewCommandBus( + cb, err := cqrs.NewCommandBusWithConfig( assertPublishTopicPublisher{ExpectedTopic: "cqrs_test.TestCommand", T: t}, - func(commandName string) string { - return commandName + cqrs.CommandBusConfig{ + GeneratePublishTopic: func(params cqrs.CommandBusGeneratePublishTopicParams) (string, error) { + return params.CommandName, nil + }, + Marshaler: cqrs.JSONMarshaler{}, + }, + ) + require.NoError(t, err) + + err = cb.Send(context.Background(), TestCommand{}) + require.NoError(t, err) +} + +func TestCommandBus_Send_OnSend(t *testing.T) { + publisher := newPublisherStub() + + cb, err := cqrs.NewCommandBusWithConfig( + publisher, + cqrs.CommandBusConfig{ + GeneratePublishTopic: func(params cqrs.CommandBusGeneratePublishTopicParams) (string, error) { + return "whatever", nil + }, + Marshaler: cqrs.JSONMarshaler{}, + OnSend: func(params cqrs.CommandBusOnSendParams) error { + params.Message.Metadata.Set("key", "value") + return nil + }, }, - cqrs.JSONMarshaler{}, ) require.NoError(t, err) err = cb.Send(context.Background(), TestCommand{}) require.NoError(t, err) + + assert.Equal(t, "value", publisher.messages["whatever"][0].Metadata.Get("key")) +} + +func TestCommandBus_Send_OnSend_error(t *testing.T) { + publisher := newPublisherStub() + + expectedErr := errors.New("some error") + + cb, err := cqrs.NewCommandBusWithConfig( + publisher, + cqrs.CommandBusConfig{ + GeneratePublishTopic: func(params cqrs.CommandBusGeneratePublishTopicParams) (string, error) { + return "whatever", nil + }, + Marshaler: cqrs.JSONMarshaler{}, + OnSend: func(params cqrs.CommandBusOnSendParams) error { + return expectedErr + }, + }, + ) + require.NoError(t, err) + + err = cb.Send(context.Background(), TestCommand{}) + require.EqualError(t, err, "cannot execute OnSend: some error") } diff --git a/components/cqrs/command_handler.go b/components/cqrs/command_handler.go new file mode 100644 index 000000000..dc06edca8 --- /dev/null +++ b/components/cqrs/command_handler.go @@ -0,0 +1,56 @@ +package cqrs + +import ( + "context" +) + +// CommandHandler receives a command defined by NewCommand and handles it with the Handle method. +// If using DDD, CommandHandler may modify and persist the aggregate. +// +// In contrast to EventHandler, every Command must have only one CommandHandler. +// +// One instance of CommandHandler is used during handling messages. +// When multiple commands are delivered at the same time, Handle method can be executed multiple times at the same time. +// Because of that, Handle method needs to be thread safe! +type CommandHandler interface { + // HandlerName is the name used in message.Router while creating handler. + // + // It will be also passed to CommandsSubscriberConstructor. + // May be useful, for example, to create a consumer group per each handler. + // + // WARNING: If HandlerName was changed and is used for generating consumer groups, + // it may result with **reconsuming all messages**! + HandlerName() string + + NewCommand() any + + Handle(ctx context.Context, cmd any) error +} + +type genericCommandHandler[Command any] struct { + handleFunc func(ctx context.Context, cmd *Command) error + handlerName string +} + +// NewCommandHandler creates a new CommandHandler implementation based on provided function +// and command type inferred from function argument. +func NewCommandHandler[Command any](handlerName string, handleFunc func(ctx context.Context, cmd *Command) error) CommandHandler { + return &genericCommandHandler[Command]{ + handleFunc: handleFunc, + handlerName: handlerName, + } +} + +func (c genericCommandHandler[Command]) HandlerName() string { + return c.handlerName +} + +func (c genericCommandHandler[Command]) NewCommand() any { + tVar := new(Command) + return tVar +} + +func (c genericCommandHandler[Command]) Handle(ctx context.Context, cmd any) error { + command := cmd.(*Command) + return c.handleFunc(ctx, command) +} diff --git a/components/cqrs/command_handler_test.go b/components/cqrs/command_handler_test.go new file mode 100644 index 000000000..4df4318fc --- /dev/null +++ b/components/cqrs/command_handler_test.go @@ -0,0 +1,32 @@ +package cqrs_test + +import ( + "context" + "fmt" + "testing" + + "github.com/ThreeDotsLabs/watermill/components/cqrs" + "github.com/stretchr/testify/assert" +) + +type SomeCommand struct { + Foo string +} + +func TestNewCommandHandler(t *testing.T) { + cmdToSend := &SomeCommand{"bar"} + + ch := cqrs.NewCommandHandler( + "some_handler", + func(ctx context.Context, cmd *SomeCommand) error { + assert.Equal(t, cmdToSend, cmd) + return fmt.Errorf("some error") + }, + ) + + assert.Equal(t, "some_handler", ch.HandlerName()) + assert.Equal(t, &SomeCommand{}, ch.NewCommand()) + + err := ch.Handle(context.Background(), cmdToSend) + assert.EqualError(t, err, "some error") +} diff --git a/components/cqrs/command_processor.go b/components/cqrs/command_processor.go index 58f9db319..f74519f9b 100644 --- a/components/cqrs/command_processor.go +++ b/components/cqrs/command_processor.go @@ -1,7 +1,7 @@ package cqrs import ( - "context" + stdErrors "errors" "fmt" "github.com/pkg/errors" @@ -10,45 +10,129 @@ import ( "github.com/ThreeDotsLabs/watermill/message" ) -// CommandHandler receives a command defined by NewCommand and handles it with the Handle method. -// If using DDD, CommandHandler may modify and persist the aggregate. -// -// In contrast to EventHandler, every Command must have only one CommandHandler. -// -// One instance of CommandHandler is used during handling messages. -// When multiple commands are delivered at the same time, Handle method can be executed multiple times at the same time. -// Because of that, Handle method needs to be thread safe! -type CommandHandler interface { - // HandlerName is the name used in message.Router while creating handler. +type CommandProcessorConfig struct { + // GenerateSubscribeTopic is used to generate topic for subscribing command. + GenerateSubscribeTopic CommandProcessorGenerateSubscribeTopicFn + + // SubscriberConstructor is used to create subscriber for CommandHandler. + SubscriberConstructor CommandProcessorSubscriberConstructorFn + + // OnHandle is called before handling command. + // OnHandle works in a similar way to middlewares: you can inject additional logic before and after handling a command. + // + // Because of that, you need to explicitly call params.Handler.Handle() to handle the command. + // func(params CommandProcessorOnHandleParams) (err error) { + // // logic before handle + // // (...) + // + // err := params.Handler.Handle(params.Message.Context(), params.Command) + // + // // logic after handle + // // (...) // - // It will be also passed to CommandsSubscriberConstructor. - // May be useful, for example, to create a consumer group per each handler. + // return err + // } // - // WARNING: If HandlerName was changed and is used for generating consumer groups, - // it may result with **reconsuming all messages**! - HandlerName() string + // This option is not required. + OnHandle CommandProcessorOnHandleFn - NewCommand() interface{} + // Marshaler is used to marshal and unmarshal commands. + // It is required. + Marshaler CommandEventMarshaler - Handle(ctx context.Context, cmd interface{}) error + // Logger instance used to log. + // If not provided, watermill.NopLogger is used. + Logger watermill.LoggerAdapter + + // If true, CommandProcessor will ack messages even if CommandHandler returns an error. + // If RequestReplyEnabled is enabled and sending reply fails, the message will be nack-ed anyway. + AckCommandHandlingErrors bool + + // disableRouterAutoAddHandlers is used to keep backwards compatibility. + // it is set when CommandProcessor is created by NewCommandProcessor. + // Deprecated: please migrate to NewCommandProcessorWithConfig. + disableRouterAutoAddHandlers bool } -// CommandsSubscriberConstructor creates subscriber for CommandHandler. +func (c *CommandProcessorConfig) setDefaults() { + if c.Logger == nil { + c.Logger = watermill.NopLogger{} + } +} + +func (c CommandProcessorConfig) Validate() error { + var err error + + if c.Marshaler == nil { + err = stdErrors.Join(err, errors.New("missing Marshaler")) + } + + if c.GenerateSubscribeTopic == nil { + err = stdErrors.Join(err, errors.New("missing GenerateSubscribeTopic")) + } + if c.SubscriberConstructor == nil { + err = stdErrors.Join(err, errors.New("missing SubscriberConstructor")) + } + + return err +} + +type CommandProcessorGenerateSubscribeTopicFn func(CommandProcessorGenerateSubscribeTopicParams) (string, error) + +type CommandProcessorGenerateSubscribeTopicParams struct { + CommandName string + CommandHandler CommandHandler +} + +// CommandProcessorSubscriberConstructorFn creates subscriber for CommandHandler. // It allows you to create a separate customized Subscriber for every command handler. -type CommandsSubscriberConstructor func(handlerName string) (message.Subscriber, error) +type CommandProcessorSubscriberConstructorFn func(CommandProcessorSubscriberConstructorParams) (message.Subscriber, error) + +type CommandProcessorSubscriberConstructorParams struct { + HandlerName string + Handler CommandHandler +} + +type CommandProcessorOnHandleFn func(params CommandProcessorOnHandleParams) error + +type CommandProcessorOnHandleParams struct { + Handler CommandHandler + + CommandName string + Command any + + // Message is never nil and can be modified. + Message *message.Message +} // CommandProcessor determines which CommandHandler should handle the command received from the command bus. type CommandProcessor struct { - handlers []CommandHandler - generateTopic func(commandName string) string + router *message.Router - subscriberConstructor CommandsSubscriberConstructor + handlers []CommandHandler - marshaler CommandEventMarshaler - logger watermill.LoggerAdapter + config CommandProcessorConfig +} + +func NewCommandProcessorWithConfig(router *message.Router, config CommandProcessorConfig) (*CommandProcessor, error) { + config.setDefaults() + + if err := config.Validate(); err != nil { + return nil, err + } + + if router == nil && !config.disableRouterAutoAddHandlers { + return nil, errors.New("missing router") + } + + return &CommandProcessor{ + router: router, + config: config, + }, nil } // NewCommandProcessor creates a new CommandProcessor. +// Deprecated. Use NewCommandProcessorWithConfig instead. func NewCommandProcessor( handlers []CommandHandler, generateTopic func(commandName string) string, @@ -65,20 +149,66 @@ func NewCommandProcessor( if subscriberConstructor == nil { return nil, errors.New("missing subscriberConstructor") } - if marshaler == nil { - return nil, errors.New("missing marshaler") + + cp, err := NewCommandProcessorWithConfig( + nil, + CommandProcessorConfig{ + GenerateSubscribeTopic: func(params CommandProcessorGenerateSubscribeTopicParams) (string, error) { + return generateTopic(params.CommandName), nil + }, + SubscriberConstructor: func(params CommandProcessorSubscriberConstructorParams) (message.Subscriber, error) { + return subscriberConstructor(params.HandlerName) + }, + Marshaler: marshaler, + Logger: logger, + disableRouterAutoAddHandlers: true, + }, + ) + if err != nil { + return nil, err } - if logger == nil { - logger = watermill.NopLogger{} + + for _, handler := range handlers { + if err := cp.AddHandlers(handler); err != nil { + return nil, err + } } - return &CommandProcessor{ - handlers, - generateTopic, - subscriberConstructor, - marshaler, - logger, - }, nil + return cp, nil +} + +// CommandsSubscriberConstructor creates subscriber for CommandHandler. +// It allows you to create a separate customized Subscriber for every command handler. +// +// Deprecated: please use CommandProcessorSubscriberConstructorFn instead. +type CommandsSubscriberConstructor func(handlerName string) (message.Subscriber, error) + +// AddHandlers adds a new CommandHandler to the CommandProcessor and adds it to the router. +func (p *CommandProcessor) AddHandlers(handlers ...CommandHandler) error { + handledCommands := map[string]struct{}{} + for _, handler := range handlers { + commandName := p.config.Marshaler.Name(handler.NewCommand()) + if _, ok := handledCommands[commandName]; ok { + return DuplicateCommandHandlerError{commandName} + } + + handledCommands[commandName] = struct{}{} + } + + if p.config.disableRouterAutoAddHandlers { + p.handlers = append(p.handlers, handlers...) + return nil + } + + for _, handler := range handlers { + if err := p.addHandlerToRouter(p.router, handler); err != nil { + return err + } + + p.handlers = append(p.handlers, handler) + } + + return nil } // DuplicateCommandHandlerError occurs when a handler with the same name already exists. @@ -91,45 +221,65 @@ func (d DuplicateCommandHandlerError) Error() string { } // AddHandlersToRouter adds the CommandProcessor's handlers to the given router. +// It should be called only once per CommandProcessor instance. +// +// It is required to call AddHandlersToRouter only if command processor is created with NewCommandProcessor (disableRouterAutoAddHandlers is set to true). +// Deprecated: please migrate to command processor created by NewCommandProcessorWithConfig. func (p CommandProcessor) AddHandlersToRouter(r *message.Router) error { - handledCommands := map[string]struct{}{} + if !p.config.disableRouterAutoAddHandlers { + return errors.New("AddHandlersToRouter should be called only when using deprecated NewCommandProcessor") + } for i := range p.Handlers() { handler := p.handlers[i] - handlerName := handler.HandlerName() - commandName := p.marshaler.Name(handler.NewCommand()) - topicName := p.generateTopic(commandName) - if _, ok := handledCommands[commandName]; ok { - return DuplicateCommandHandlerError{commandName} + if err := p.addHandlerToRouter(r, handler); err != nil { + return err } - handledCommands[commandName] = struct{}{} + } - logger := p.logger.With(watermill.LogFields{ - "command_handler_name": handlerName, - "topic": topicName, - }) + return nil +} - handlerFunc, err := p.routerHandlerFunc(handler, logger) - if err != nil { - return err - } +func (p CommandProcessor) addHandlerToRouter(r *message.Router, handler CommandHandler) error { + handlerName := handler.HandlerName() + commandName := p.config.Marshaler.Name(handler.NewCommand()) - logger.Debug("Adding CQRS command handler to router", nil) + topicName, err := p.config.GenerateSubscribeTopic(CommandProcessorGenerateSubscribeTopicParams{ + CommandName: commandName, + CommandHandler: handler, + }) + if err != nil { + return errors.Wrapf(err, "cannot generate topic for command handler %s", handlerName) + } - subscriber, err := p.subscriberConstructor(handlerName) - if err != nil { - return errors.Wrap(err, "cannot create subscriber for command processor") - } + logger := p.config.Logger.With(watermill.LogFields{ + "command_handler_name": handlerName, + "topic": topicName, + }) - r.AddNoPublisherHandler( - handlerName, - topicName, - subscriber, - handlerFunc, - ) + handlerFunc, err := p.routerHandlerFunc(handler, logger) + if err != nil { + return err } + logger.Debug("Adding CQRS command handler to router", nil) + + subscriber, err := p.config.SubscriberConstructor(CommandProcessorSubscriberConstructorParams{ + HandlerName: handlerName, + Handler: handler, + }) + if err != nil { + return errors.Wrap(err, "cannot create subscriber for command processor") + } + + r.AddNoPublisherHandler( + handlerName, + topicName, + subscriber, + handlerFunc, + ) + return nil } @@ -140,7 +290,7 @@ func (p CommandProcessor) Handlers() []CommandHandler { func (p CommandProcessor) routerHandlerFunc(handler CommandHandler, logger watermill.LoggerAdapter) (message.NoPublishHandlerFunc, error) { cmd := handler.NewCommand() - cmdName := p.marshaler.Name(cmd) + cmdName := p.config.Marshaler.Name(cmd) if err := p.validateCommand(cmd); err != nil { return nil, err @@ -148,7 +298,7 @@ func (p CommandProcessor) routerHandlerFunc(handler CommandHandler, logger water return func(msg *message.Message) error { cmd := handler.NewCommand() - messageCmdName := p.marshaler.NameFromMessage(msg) + messageCmdName := p.config.Marshaler.NameFromMessage(msg) if messageCmdName != cmdName { logger.Trace("Received different command type than expected, ignoring", watermill.LogFields{ @@ -164,12 +314,30 @@ func (p CommandProcessor) routerHandlerFunc(handler CommandHandler, logger water "received_command_type": messageCmdName, }) - if err := p.marshaler.Unmarshal(msg, cmd); err != nil { + if err := p.config.Marshaler.Unmarshal(msg, cmd); err != nil { return err } - if err := handler.Handle(msg.Context(), cmd); err != nil { - logger.Debug("Error when handling command", watermill.LogFields{"err": err}) + handle := func(params CommandProcessorOnHandleParams) (err error) { + return params.Handler.Handle(params.Message.Context(), params.Command) + } + if p.config.OnHandle != nil { + handle = p.config.OnHandle + } + + err := handle(CommandProcessorOnHandleParams{ + Handler: handler, + CommandName: messageCmdName, + Command: cmd, + Message: msg, + }) + + if p.config.AckCommandHandlingErrors && err != nil { + logger.Error("Error when handling command, acking (AckCommandHandlingErrors is enabled)", err, nil) + return nil + } + if err != nil { + logger.Debug("Error when handling command, nacking", watermill.LogFields{"err": err}) return err } diff --git a/components/cqrs/command_processor_test.go b/components/cqrs/command_processor_test.go index 3ad28faf4..244f84c9f 100644 --- a/components/cqrs/command_processor_test.go +++ b/components/cqrs/command_processor_test.go @@ -3,7 +3,9 @@ package cqrs_test import ( "context" "testing" + "time" + "github.com/ThreeDotsLabs/watermill" "github.com/pkg/errors" "github.com/ThreeDotsLabs/watermill/components/cqrs" @@ -13,33 +15,90 @@ import ( "github.com/stretchr/testify/require" ) -func TestNewCommandProcessor(t *testing.T) { - handlers := []cqrs.CommandHandler{nonPointerCommandHandler{}} +func TestCommandProcessorConfig_Validate(t *testing.T) { + testCases := []struct { + Name string + ModifyValidConfig func(*cqrs.CommandProcessorConfig) + ExpectedErr error + }{ + { + Name: "valid_config", + ModifyValidConfig: nil, + ExpectedErr: nil, + }, + { + Name: "missing_Marshaler", + ModifyValidConfig: func(c *cqrs.CommandProcessorConfig) { + c.Marshaler = nil + }, + ExpectedErr: errors.Errorf("missing Marshaler"), + }, + { + Name: "missing_SubscriberConstructor", + ModifyValidConfig: func(c *cqrs.CommandProcessorConfig) { + c.SubscriberConstructor = nil + }, + ExpectedErr: errors.Errorf("missing SubscriberConstructor"), + }, + { + Name: "missing_GenerateHandlerSubscribeTopic", + ModifyValidConfig: func(c *cqrs.CommandProcessorConfig) { + c.GenerateSubscribeTopic = nil + }, + ExpectedErr: errors.Errorf("missing GenerateSubscribeTopic"), + }, + } + for i := range testCases { + tc := testCases[i] + + t.Run(tc.Name, func(t *testing.T) { + validConfig := cqrs.CommandProcessorConfig{ + GenerateSubscribeTopic: func(params cqrs.CommandProcessorGenerateSubscribeTopicParams) (string, error) { + return "", nil + }, + SubscriberConstructor: func(params cqrs.CommandProcessorSubscriberConstructorParams) (message.Subscriber, error) { + return nil, nil + }, + Marshaler: cqrs.JSONMarshaler{}, + } - generateTopic := func(commandName string) string { - return "" + if tc.ModifyValidConfig != nil { + tc.ModifyValidConfig(&validConfig) + } + + err := validConfig.Validate() + if tc.ExpectedErr == nil { + assert.NoError(t, err) + } else { + assert.EqualError(t, err, tc.ExpectedErr.Error()) + } + }) } - subscriberConstructor := func(handlerName string) (subscriber message.Subscriber, e error) { - return nil, nil +} + +func TestNewCommandProcessor(t *testing.T) { + config := cqrs.CommandProcessorConfig{ + GenerateSubscribeTopic: func(params cqrs.CommandProcessorGenerateSubscribeTopicParams) (string, error) { + return "", nil + }, + SubscriberConstructor: func(params cqrs.CommandProcessorSubscriberConstructorParams) (message.Subscriber, error) { + return nil, nil + }, + Marshaler: cqrs.JSONMarshaler{}, } + require.NoError(t, config.Validate()) - cp, err := cqrs.NewCommandProcessor(handlers, generateTopic, subscriberConstructor, cqrs.JSONMarshaler{}, nil) + router, err := message.NewRouter(message.RouterConfig{}, watermill.NewStdLogger(false, false)) + require.NoError(t, err) + + cp, err := cqrs.NewCommandProcessorWithConfig(router, config) assert.NotNil(t, cp) assert.NoError(t, err) - cp, err = cqrs.NewCommandProcessor([]cqrs.CommandHandler{}, generateTopic, subscriberConstructor, cqrs.JSONMarshaler{}, nil) - assert.Nil(t, cp) - assert.Error(t, err) - - cp, err = cqrs.NewCommandProcessor(handlers, nil, subscriberConstructor, cqrs.JSONMarshaler{}, nil) - assert.Nil(t, cp) - assert.Error(t, err) + config.SubscriberConstructor = nil + require.Error(t, config.Validate()) - cp, err = cqrs.NewCommandProcessor(handlers, generateTopic, nil, cqrs.JSONMarshaler{}, nil) - assert.Nil(t, cp) - assert.Error(t, err) - - cp, err = cqrs.NewCommandProcessor(handlers, generateTopic, subscriberConstructor, nil, nil) + cp, err = cqrs.NewCommandProcessorWithConfig(router, config) assert.Nil(t, cp) assert.Error(t, err) } @@ -62,23 +121,27 @@ func (nonPointerCommandHandler) Handle(ctx context.Context, cmd interface{}) err func TestCommandProcessor_non_pointer_command(t *testing.T) { ts := NewTestServices() - commandProcessor, err := cqrs.NewCommandProcessor( - []cqrs.CommandHandler{nonPointerCommandHandler{}}, - func(commandName string) string { - return "commands" - }, - func(handlerName string) (message.Subscriber, error) { - return ts.CommandsPubSub, nil - }, - ts.Marshaler, - ts.Logger, - ) - require.NoError(t, err) + handler := nonPointerCommandHandler{} router, err := message.NewRouter(message.RouterConfig{}, ts.Logger) require.NoError(t, err) - err = commandProcessor.AddHandlersToRouter(router) + commandProcessor, err := cqrs.NewCommandProcessorWithConfig( + router, + cqrs.CommandProcessorConfig{ + GenerateSubscribeTopic: func(params cqrs.CommandProcessorGenerateSubscribeTopicParams) (string, error) { + return "", nil + }, + SubscriberConstructor: func(params cqrs.CommandProcessorSubscriberConstructorParams) (message.Subscriber, error) { + return nil, nil + }, + Marshaler: ts.Marshaler, + Logger: ts.Logger, + }, + ) + require.NoError(t, err) + + err = commandProcessor.AddHandlers(handler) assert.IsType(t, cqrs.NonPointerError{}, errors.Cause(err)) } @@ -86,26 +149,303 @@ func TestCommandProcessor_non_pointer_command(t *testing.T) { func TestCommandProcessor_multiple_same_command_handlers(t *testing.T) { ts := NewTestServices() - commandProcessor, err := cqrs.NewCommandProcessor( - []cqrs.CommandHandler{ - &CaptureCommandHandler{}, - &CaptureCommandHandler{}, + router, err := message.NewRouter(message.RouterConfig{}, ts.Logger) + require.NoError(t, err) + + commandProcessor, err := cqrs.NewCommandProcessorWithConfig( + router, + cqrs.CommandProcessorConfig{ + GenerateSubscribeTopic: func(params cqrs.CommandProcessorGenerateSubscribeTopicParams) (string, error) { + return "", nil + }, + SubscriberConstructor: func(params cqrs.CommandProcessorSubscriberConstructorParams) (message.Subscriber, error) { + return nil, nil + }, + Marshaler: ts.Marshaler, + Logger: ts.Logger, + }, + ) + require.NoError(t, err) + + err = commandProcessor.AddHandlers( + &CaptureCommandHandler{}, + &CaptureCommandHandler{}, + ) + require.Error(t, err) + assert.EqualValues(t, cqrs.DuplicateCommandHandlerError{CommandName: "cqrs_test.TestCommand"}, err) + assert.Equal(t, "command handler for command cqrs_test.TestCommand already exists", err.Error()) +} + +type mockSubscriber struct { + MessagesToSend []*message.Message + out chan *message.Message +} + +func (m *mockSubscriber) Subscribe(ctx context.Context, topic string) (<-chan *message.Message, error) { + m.out = make(chan *message.Message) + + go func() { + for _, msg := range m.MessagesToSend { + m.out <- msg + } + }() + + return m.out, nil +} + +func (m mockSubscriber) Close() error { + close(m.out) + return nil +} + +func TestCommandProcessor_AckCommandHandlingErrors_option_true(t *testing.T) { + logger := watermill.NewCaptureLogger() + + marshaler := cqrs.JSONMarshaler{} + + msgToSend, err := marshaler.Marshal(&TestCommand{ID: "1"}) + require.NoError(t, err) + + mockSub := &mockSubscriber{ + MessagesToSend: []*message.Message{ + msgToSend, + }, + } + + router, err := message.NewRouter(message.RouterConfig{}, logger) + require.NoError(t, err) + + commandProcessor, err := cqrs.NewCommandProcessorWithConfig( + router, + cqrs.CommandProcessorConfig{ + GenerateSubscribeTopic: func(params cqrs.CommandProcessorGenerateSubscribeTopicParams) (string, error) { + return "commands", nil + }, + SubscriberConstructor: func(params cqrs.CommandProcessorSubscriberConstructorParams) (message.Subscriber, error) { + return mockSub, nil + }, + Marshaler: marshaler, + Logger: logger, + AckCommandHandlingErrors: true, + }, + ) + require.NoError(t, err) + + expectedErr := errors.New("test error") + + err = commandProcessor.AddHandlers(cqrs.NewCommandHandler( + "handler", func(ctx context.Context, cmd *TestCommand) error { + return expectedErr + }), + ) + require.NoError(t, err) + + go func() { + err := router.Run(context.Background()) + assert.NoError(t, err) + }() + + <-router.Running() + + select { + case <-msgToSend.Acked(): + // ok + case <-msgToSend.Nacked(): + // nack received + t.Fatal("nack received, message should be acked") + case <-time.After(1 * time.Second): + t.Fatal("timeout waiting for ack") + } + + // it's pretty important to not ack message silently, so let's assert if it's logged properly + expectedLogMessage := watermill.CapturedMessage{ + Level: watermill.ErrorLogLevel, + Fields: map[string]any{ + "command_handler_name": "handler", + "topic": "commands", }, - func(commandName string) string { - return "commands" + Msg: "Error when handling command, acking (AckCommandHandlingErrors is enabled)", + Err: expectedErr, + } + assert.True( + t, + logger.Has(expectedLogMessage), + "expected log message not found, logs: %#v", + logger.Captured(), + ) +} + +func TestCommandProcessor_AckCommandHandlingErrors_option_false(t *testing.T) { + logger := watermill.NewCaptureLogger() + + marshaler := cqrs.JSONMarshaler{} + + msgToSend, err := marshaler.Marshal(&TestCommand{ID: "1"}) + require.NoError(t, err) + + mockSub := &mockSubscriber{ + MessagesToSend: []*message.Message{ + msgToSend, }, - func(handlerName string) (message.Subscriber, error) { - return ts.CommandsPubSub, nil + } + + router, err := message.NewRouter(message.RouterConfig{}, logger) + require.NoError(t, err) + + commandProcessor, err := cqrs.NewCommandProcessorWithConfig( + router, + cqrs.CommandProcessorConfig{ + GenerateSubscribeTopic: func(params cqrs.CommandProcessorGenerateSubscribeTopicParams) (string, error) { + return "commands", nil + }, + SubscriberConstructor: func(params cqrs.CommandProcessorSubscriberConstructorParams) (message.Subscriber, error) { + return mockSub, nil + }, + Marshaler: marshaler, + Logger: logger, + AckCommandHandlingErrors: false, }, - ts.Marshaler, - ts.Logger, ) require.NoError(t, err) + expectedErr := errors.New("test error") + + err = commandProcessor.AddHandlers(cqrs.NewCommandHandler( + "handler", func(ctx context.Context, cmd *TestCommand) error { + return expectedErr + }), + ) + require.NoError(t, err) + + go func() { + err := router.Run(context.Background()) + assert.NoError(t, err) + }() + + <-router.Running() + + select { + case <-msgToSend.Acked(): + // nack received + t.Fatal("ack received, message should be nacked") + case <-msgToSend.Nacked(): + // ok + case <-time.After(1 * time.Second): + t.Fatal("timeout waiting for ack") + } +} + +func TestNewCommandProcessor_OnHandle(t *testing.T) { + ts := NewTestServices() + + msg1, err := ts.Marshaler.Marshal(&TestCommand{ID: "1"}) + require.NoError(t, err) + + msg2, err := ts.Marshaler.Marshal(&TestCommand{ID: "2"}) + require.NoError(t, err) + + mockSub := &mockSubscriber{ + MessagesToSend: []*message.Message{ + msg1, + msg2, + }, + } + + router, err := message.NewRouter(message.RouterConfig{}, ts.Logger) + require.NoError(t, err) + + handlerCalled := 0 + + defer func() { + // for msg 1 we are not calling handler - but returning before + assert.Equal(t, 1, handlerCalled) + }() + + handler := cqrs.NewCommandHandler("test", func(ctx context.Context, cmd *TestCommand) error { + handlerCalled++ + return nil + }) + + onHandleCalled := 0 + + config := cqrs.CommandProcessorConfig{ + GenerateSubscribeTopic: func(params cqrs.CommandProcessorGenerateSubscribeTopicParams) (string, error) { + return "commands", nil + }, + SubscriberConstructor: func(params cqrs.CommandProcessorSubscriberConstructorParams) (message.Subscriber, error) { + return mockSub, nil + }, + OnHandle: func(params cqrs.CommandProcessorOnHandleParams) error { + onHandleCalled++ + + assert.IsType(t, &TestCommand{}, params.Command) + assert.Equal(t, "cqrs_test.TestCommand", params.CommandName) + assert.Equal(t, handler, params.Handler) + + if params.Command.(*TestCommand).ID == "1" { + assert.Equal(t, msg1, params.Message) + return errors.New("test error") + } else { + assert.Equal(t, msg2, params.Message) + } + + return params.Handler.Handle(params.Message.Context(), params.Command) + }, + Marshaler: ts.Marshaler, + Logger: ts.Logger, + } + cp, err := cqrs.NewCommandProcessorWithConfig(router, config) + require.NoError(t, err) + + err = cp.AddHandlers(handler) + require.NoError(t, err) + + go func() { + err := router.Run(context.Background()) + assert.NoError(t, err) + }() + + <-router.Running() + + select { + case <-msg1.Nacked(): + // ok + case <-msg1.Acked(): + // ack received + t.Fatal("ack received, message should be nacked") + } + + select { + case <-msg2.Acked(): + // ok + case <-msg2.Nacked(): + // nack received + } + + assert.Equal(t, 2, onHandleCalled) +} + +func TestCommandProcessor_AddHandlersToRouter_without_disableRouterAutoAddHandlers(t *testing.T) { + ts := NewTestServices() + router, err := message.NewRouter(message.RouterConfig{}, ts.Logger) require.NoError(t, err) - err = commandProcessor.AddHandlersToRouter(router) - assert.EqualValues(t, cqrs.DuplicateCommandHandlerError{CommandName: "cqrs_test.TestCommand"}, err) - assert.Equal(t, "command handler for command cqrs_test.TestCommand already exists", err.Error()) + cp, err := cqrs.NewCommandProcessorWithConfig( + router, + cqrs.CommandProcessorConfig{ + GenerateSubscribeTopic: func(params cqrs.CommandProcessorGenerateSubscribeTopicParams) (string, error) { + return "commands", nil + }, + SubscriberConstructor: func(params cqrs.CommandProcessorSubscriberConstructorParams) (message.Subscriber, error) { + return ts.CommandsPubSub, nil + }, + Marshaler: ts.Marshaler, + Logger: ts.Logger, + }, + ) + require.NoError(t, err) + + err = cp.AddHandlersToRouter(router) + assert.ErrorContains(t, err, "AddHandlersToRouter should be called only when using deprecated NewCommandProcessor") } diff --git a/components/cqrs/cqrs.go b/components/cqrs/cqrs.go index 2c4bd2a3b..59d077f5b 100644 --- a/components/cqrs/cqrs.go +++ b/components/cqrs/cqrs.go @@ -7,11 +7,12 @@ import ( "github.com/pkg/errors" ) +// Deprecated: use CommandHandler and EventHandler instead. type FacadeConfig struct { // GenerateCommandsTopic generates topic name based on the command name. // Command name is generated by CommandEventMarshaler's Name method. // - // It allows you to use topic per command or one topic for every command. [todo - add to doc] + // It allows you to use topic per command or one topic for every command. GenerateCommandsTopic func(commandName string) string // CommandHandlers return command handlers which should be executed. @@ -28,7 +29,7 @@ type FacadeConfig struct { // GenerateEventsTopic generates topic name based on the event name. // Event name is generated by CommandEventMarshaler's Name method. // - // It allows you to use topic per command or one topic for every command. [todo - add to doc] + // It allows you to use topic per command or one topic for every command. GenerateEventsTopic func(eventName string) string // EventHandlers return event handlers which should be executed. @@ -98,6 +99,8 @@ func (c FacadeConfig) CommandsEnabled() bool { return c.GenerateCommandsTopic != nil || c.CommandsPublisher != nil || c.CommandsSubscriberConstructor != nil } +// Deprecated: use CommandHandler and EventHandler instead. +// // Facade is a facade for creating the Command and Event buses and processors. // It was created to avoid boilerplate, when using CQRS in the standard way. // You can also create buses and processors manually, drawing inspiration from how it's done in NewFacade. @@ -123,6 +126,7 @@ func (f Facade) CommandEventMarshaler() CommandEventMarshaler { return f.commandEventMarshaler } +// Deprecated: use CommandHandler and EventHandler instead. func NewFacade(config FacadeConfig) (*Facade, error) { if err := config.Validate(); err != nil { return nil, errors.Wrap(err, "invalid config") diff --git a/components/cqrs/cqrs_test.go b/components/cqrs/cqrs_test.go index 5a6735f56..2358252fb 100644 --- a/components/cqrs/cqrs_test.go +++ b/components/cqrs/cqrs_test.go @@ -17,41 +17,174 @@ import ( // TestCQRS is functional test of CQRS command handler and event handler. func TestCQRS(t *testing.T) { + testCases := []struct { + Name string + CreateCqrs func(t *testing.T, cc *CaptureCommandHandler, ce *CaptureEventHandler) (*message.Router, *cqrs.CommandBus, *cqrs.EventBus) + }{ + { + // facade is deprecated, testing backwards compatibility + Name: "facade", + CreateCqrs: func(t *testing.T, cc *CaptureCommandHandler, ce *CaptureEventHandler) (*message.Router, *cqrs.CommandBus, *cqrs.EventBus) { + router, cqrsFacade := createRouterAndFacade(t, cc, ce) + return router, cqrsFacade.CommandBus(), cqrsFacade.EventBus() + }, + }, + { + Name: "constructors", + CreateCqrs: createCqrsComponents, + }, + } + for i := range testCases { + tc := testCases[i] + + t.Run(tc.Name, func(t *testing.T) { + captureCommandHandler := &CaptureCommandHandler{} + captureEventHandler := &CaptureEventHandler{} + + router, commandBus, eventBus := tc.CreateCqrs(t, captureCommandHandler, captureEventHandler) + + pointerCmd := &TestCommand{ID: watermill.NewULID()} + require.NoError(t, commandBus.Send(context.Background(), pointerCmd)) + assert.EqualValues(t, []interface{}{pointerCmd}, captureCommandHandler.HandledCommands()) + captureCommandHandler.Reset() + + nonPointerCmd := TestCommand{ID: watermill.NewULID()} + require.NoError(t, commandBus.Send(context.Background(), nonPointerCmd)) + // command is always unmarshaled to pointer value + assert.EqualValues(t, []interface{}{&nonPointerCmd}, captureCommandHandler.HandledCommands()) + captureCommandHandler.Reset() + + pointerEvent := &TestEvent{ID: watermill.NewULID()} + require.NoError(t, eventBus.Publish(context.Background(), pointerEvent)) + assert.EqualValues(t, []interface{}{pointerEvent}, captureEventHandler.HandledEvents()) + captureEventHandler.Reset() + + nonPointerEvent := TestEvent{ID: watermill.NewULID()} + require.NoError(t, eventBus.Publish(context.Background(), nonPointerEvent)) + // event is always unmarshaled to pointer value + assert.EqualValues(t, []interface{}{&nonPointerEvent}, captureEventHandler.HandledEvents()) + captureEventHandler.Reset() + + assert.NoError(t, router.Close()) + }) + } +} + +func createCqrsComponents(t *testing.T, commandHandler *CaptureCommandHandler, eventHandler *CaptureEventHandler) (*message.Router, *cqrs.CommandBus, *cqrs.EventBus) { ts := NewTestServices() - captureCommandHandler := &CaptureCommandHandler{} - captureEventHandler := &CaptureEventHandler{} + router, err := message.NewRouter(message.RouterConfig{}, ts.Logger) + require.NoError(t, err) - router, cqrsFacade := createRouterAndFacade(ts, t, captureCommandHandler, captureEventHandler) + eventProcessor, err := cqrs.NewEventProcessorWithConfig( + router, + cqrs.EventProcessorConfig{ + GenerateSubscribeTopic: func(params cqrs.EventProcessorGenerateSubscribeTopicParams) (string, error) { + return params.EventName, nil + }, + AckOnUnknownEvent: true, + SubscriberConstructor: func(params cqrs.EventProcessorSubscriberConstructorParams) (message.Subscriber, error) { + assert.Equal(t, "CaptureEventHandler", params.HandlerName) + + assert.Implements(t, new(cqrs.EventHandler), params.EventHandler) + assert.NotNil(t, params.EventHandler) + + return ts.EventsPubSub, nil + }, + Marshaler: ts.Marshaler, + Logger: ts.Logger, + }, + ) + require.NoError(t, err) - pointerCmd := &TestCommand{ID: watermill.NewULID()} - require.NoError(t, cqrsFacade.CommandBus().Send(context.Background(), pointerCmd)) - assert.EqualValues(t, []interface{}{pointerCmd}, captureCommandHandler.HandledCommands()) - captureCommandHandler.Reset() + err = eventProcessor.AddHandlers(eventHandler) + require.NoError(t, err) - nonPointerCmd := TestCommand{ID: watermill.NewULID()} - require.NoError(t, cqrsFacade.CommandBus().Send(context.Background(), nonPointerCmd)) - // command is always unmarshaled to pointer value - assert.EqualValues(t, []interface{}{&nonPointerCmd}, captureCommandHandler.HandledCommands()) - captureCommandHandler.Reset() + eventBus, err := cqrs.NewEventBusWithConfig( + ts.EventsPubSub, + cqrs.EventBusConfig{ + GeneratePublishTopic: func(params cqrs.GenerateEventPublishTopicParams) (string, error) { + assert.Equal(t, "cqrs_test.TestEvent", params.EventName) + + switch cmd := params.Event.(type) { + case *TestEvent: + assert.NotEmpty(t, cmd.ID) + case TestEvent: + assert.NotEmpty(t, cmd.ID) + default: + assert.Fail(t, "unexpected command type: %T", cmd) + } + + assert.NotEmpty(t, params.Event) + + return params.EventName, nil + }, + Marshaler: ts.Marshaler, + Logger: ts.Logger, + }, + ) + require.NoError(t, err) - pointerEvent := &TestEvent{ID: watermill.NewULID()} - require.NoError(t, cqrsFacade.EventBus().Publish(context.Background(), pointerEvent)) - assert.EqualValues(t, []interface{}{pointerEvent}, captureEventHandler.HandledEvents()) - captureEventHandler.Reset() + commandProcessor, err := cqrs.NewCommandProcessorWithConfig( + router, + cqrs.CommandProcessorConfig{ + GenerateSubscribeTopic: func(params cqrs.CommandProcessorGenerateSubscribeTopicParams) (string, error) { + assert.Equal(t, "cqrs_test.TestCommand", params.CommandName) + + assert.Implements(t, new(cqrs.CommandHandler), params.CommandHandler) + assert.NotNil(t, params.CommandHandler) + + return params.CommandName, nil + }, + SubscriberConstructor: func(params cqrs.CommandProcessorSubscriberConstructorParams) (message.Subscriber, error) { + assert.Equal(t, "CaptureCommandHandler", params.HandlerName) + + return ts.CommandsPubSub, nil + }, + Marshaler: ts.Marshaler, + Logger: ts.Logger, + AckCommandHandlingErrors: false, + }, + ) + require.NoError(t, err) - nonPointerEvent := TestEvent{ID: watermill.NewULID()} - require.NoError(t, cqrsFacade.EventBus().Publish(context.Background(), nonPointerEvent)) - // event is always unmarshaled to pointer value - assert.EqualValues(t, []interface{}{&nonPointerEvent}, captureEventHandler.HandledEvents()) - captureEventHandler.Reset() + err = commandProcessor.AddHandlers(commandHandler) + require.NoError(t, err) + + commandBus, err := cqrs.NewCommandBusWithConfig(ts.CommandsPubSub, cqrs.CommandBusConfig{ + GeneratePublishTopic: func(params cqrs.CommandBusGeneratePublishTopicParams) (string, error) { + assert.Equal(t, "cqrs_test.TestCommand", params.CommandName) + + switch cmd := params.Command.(type) { + case *TestCommand: + assert.NotEmpty(t, cmd.ID) + case TestCommand: + assert.NotEmpty(t, cmd.ID) + default: + assert.Fail(t, "unexpected command type: %T", cmd) + } - assert.NoError(t, router.Close()) + assert.NotNil(t, params.Command) + + return params.CommandName, nil + }, + Marshaler: ts.Marshaler, + Logger: ts.Logger, + }) + require.NoError(t, err) - assert.Equal(t, cqrsFacade.CommandEventMarshaler(), ts.Marshaler) + go func() { + require.NoError(t, router.Run(context.Background())) + }() + + <-router.Running() + + return router, commandBus, eventBus } -func createRouterAndFacade(ts TestServices, t *testing.T, commandHandler *CaptureCommandHandler, eventHandler *CaptureEventHandler) (*message.Router, *cqrs.Facade) { +func createRouterAndFacade(t *testing.T, commandHandler *CaptureCommandHandler, eventHandler *CaptureEventHandler) (*message.Router, *cqrs.Facade) { + ts := NewTestServices() + router, err := message.NewRouter(message.RouterConfig{}, ts.Logger) require.NoError(t, err) @@ -102,6 +235,8 @@ func createRouterAndFacade(ts TestServices, t *testing.T, commandHandler *Captur <-router.Running() + assert.Equal(t, c.CommandEventMarshaler(), ts.Marshaler) + return router, c } @@ -163,6 +298,10 @@ type TestEvent struct { When time.Time } +type AnotherTestEvent struct { + ID string +} + type CaptureEventHandler struct { handledEvents []interface{} } diff --git a/components/cqrs/event_bus.go b/components/cqrs/event_bus.go index 5a720ebaa..c3ac50718 100644 --- a/components/cqrs/event_bus.go +++ b/components/cqrs/event_bus.go @@ -2,18 +2,77 @@ package cqrs import ( "context" + stdErrors "errors" + "github.com/ThreeDotsLabs/watermill" "github.com/ThreeDotsLabs/watermill/message" "github.com/pkg/errors" ) +type EventBusConfig struct { + // GeneratePublishTopic is used to generate topic name for publishing event. + GeneratePublishTopic GenerateEventPublishTopicFn + + // OnPublish is called before sending the event. + // The *message.Message can be modified. + // + // This option is not required. + OnPublish OnEventSendFn + + // Marshaler is used to marshal and unmarshal events. + // It is required. + Marshaler CommandEventMarshaler + + // Logger instance used to log. + // If not provided, watermill.NopLogger is used. + Logger watermill.LoggerAdapter +} + +func (c *EventBusConfig) setDefaults() { + if c.Logger == nil { + c.Logger = watermill.NopLogger{} + } +} + +func (c EventBusConfig) Validate() error { + var err error + + if c.Marshaler == nil { + err = stdErrors.Join(err, errors.New("missing Marshaler")) + } + + if c.GeneratePublishTopic == nil { + err = stdErrors.Join(err, errors.New("missing GenerateHandlerTopic")) + } + + return err +} + +type GenerateEventPublishTopicFn func(GenerateEventPublishTopicParams) (string, error) + +type GenerateEventPublishTopicParams struct { + EventName string + Event any +} + +type OnEventSendFn func(params OnEventSendParams) error + +type OnEventSendParams struct { + EventName string + Event any + + // Message is never nil and can be modified. + Message *message.Message +} + // EventBus transports events to event handlers. type EventBus struct { - publisher message.Publisher - generateTopic func(eventName string) string - marshaler CommandEventMarshaler + publisher message.Publisher + config EventBusConfig } +// NewEventBus creates a new CommandBus. +// Deprecated: use NewEventBusWithConfig instead. func NewEventBus( publisher message.Publisher, generateTopic func(eventName string) string, @@ -29,20 +88,59 @@ func NewEventBus( return nil, errors.New("missing marshaler") } - return &EventBus{publisher, generateTopic, marshaler}, nil + return &EventBus{ + publisher: publisher, + config: EventBusConfig{ + GeneratePublishTopic: func(params GenerateEventPublishTopicParams) (string, error) { + return generateTopic(params.EventName), nil + }, + Marshaler: marshaler, + }, + }, nil +} + +// NewEventBusWithConfig creates a new EventBus. +func NewEventBusWithConfig(publisher message.Publisher, config EventBusConfig) (*EventBus, error) { + if publisher == nil { + return nil, errors.New("missing publisher") + } + + config.setDefaults() + if err := config.Validate(); err != nil { + return nil, errors.Wrap(err, "invalid config") + } + + return &EventBus{publisher, config}, nil } // Publish sends event to the event bus. -func (c EventBus) Publish(ctx context.Context, event interface{}) error { - msg, err := c.marshaler.Marshal(event) +func (c EventBus) Publish(ctx context.Context, event any) error { + msg, err := c.config.Marshaler.Marshal(event) if err != nil { return err } - eventName := c.marshaler.Name(event) - topicName := c.generateTopic(eventName) + eventName := c.config.Marshaler.Name(event) + topicName, err := c.config.GeneratePublishTopic(GenerateEventPublishTopicParams{ + EventName: eventName, + Event: event, + }) + if err != nil { + return errors.Wrap(err, "cannot generate topic") + } msg.SetContext(ctx) + if c.config.OnPublish != nil { + err := c.config.OnPublish(OnEventSendParams{ + EventName: eventName, + Event: event, + Message: msg, + }) + if err != nil { + return errors.Wrap(err, "cannot execute OnPublish") + } + } + return c.publisher.Publish(topicName, msg) } diff --git a/components/cqrs/event_bus_test.go b/components/cqrs/event_bus_test.go index ea5056b27..254262a4e 100644 --- a/components/cqrs/event_bus_test.go +++ b/components/cqrs/event_bus_test.go @@ -2,13 +2,66 @@ package cqrs_test import ( "context" + "fmt" "testing" "github.com/ThreeDotsLabs/watermill/components/cqrs" + "github.com/pkg/errors" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) +func TestEventBusConfig_Validate(t *testing.T) { + testCases := []struct { + Name string + ModifyValidConfig func(*cqrs.EventBusConfig) + ExpectedErr error + }{ + { + Name: "valid_config", + ModifyValidConfig: nil, + ExpectedErr: nil, + }, + { + Name: "missing_GenerateEventPublishTopic", + ModifyValidConfig: func(config *cqrs.EventBusConfig) { + config.GeneratePublishTopic = nil + }, + ExpectedErr: fmt.Errorf("missing GenerateHandlerTopic"), + }, + { + Name: "missing_marshaler", + ModifyValidConfig: func(config *cqrs.EventBusConfig) { + config.Marshaler = nil + }, + ExpectedErr: fmt.Errorf("missing Marshaler"), + }, + } + for i := range testCases { + tc := testCases[i] + + t.Run(tc.Name, func(t *testing.T) { + validConfig := cqrs.EventBusConfig{ + GeneratePublishTopic: func(params cqrs.GenerateEventPublishTopicParams) (string, error) { + return "", nil + }, + Marshaler: cqrs.JSONMarshaler{}, + } + + if tc.ModifyValidConfig != nil { + tc.ModifyValidConfig(&validConfig) + } + + err := validConfig.Validate() + if tc.ExpectedErr == nil { + assert.NoError(t, err) + } else { + assert.EqualError(t, err, tc.ExpectedErr.Error()) + } + }) + } +} + func TestNewEventBus(t *testing.T) { pub := newPublisherStub() generateTopic := func(commandName string) string { @@ -66,3 +119,50 @@ func TestEventBus_Send_topic_name(t *testing.T) { err = cb.Publish(context.Background(), TestEvent{}) require.NoError(t, err) } + +func TestEventBus_Send_OnPublish(t *testing.T) { + publisher := newPublisherStub() + + eb, err := cqrs.NewEventBusWithConfig( + publisher, + cqrs.EventBusConfig{ + GeneratePublishTopic: func(params cqrs.GenerateEventPublishTopicParams) (string, error) { + return "whatever", nil + }, + Marshaler: cqrs.JSONMarshaler{}, + OnPublish: func(params cqrs.OnEventSendParams) error { + params.Message.Metadata.Set("key", "value") + return nil + }, + }, + ) + require.NoError(t, err) + + err = eb.Publish(context.Background(), TestEvent{}) + require.NoError(t, err) + + assert.Equal(t, "value", publisher.messages["whatever"][0].Metadata.Get("key")) +} + +func TestEventBus_Send_OnPublish_error(t *testing.T) { + publisher := newPublisherStub() + + expectedErr := errors.New("some error") + + eb, err := cqrs.NewEventBusWithConfig( + publisher, + cqrs.EventBusConfig{ + GeneratePublishTopic: func(params cqrs.GenerateEventPublishTopicParams) (string, error) { + return "whatever", nil + }, + Marshaler: cqrs.JSONMarshaler{}, + OnPublish: func(params cqrs.OnEventSendParams) error { + return expectedErr + }, + }, + ) + require.NoError(t, err) + + err = eb.Publish(context.Background(), TestEvent{}) + require.EqualError(t, err, "cannot execute OnPublish: some error") +} diff --git a/components/cqrs/event_handler.go b/components/cqrs/event_handler.go new file mode 100644 index 000000000..50aade11c --- /dev/null +++ b/components/cqrs/event_handler.go @@ -0,0 +1,70 @@ +package cqrs + +import ( + "context" +) + +// EventHandler receives events defined by NewEvent and handles them with its Handle method. +// If using DDD, CommandHandler may modify and persist the aggregate. +// It can also invoke a process manager, a saga or just build a read model. +// +// In contrast to CommandHandler, every Event can have multiple EventHandlers. +// +// One instance of EventHandler is used during handling messages. +// When multiple events are delivered at the same time, Handle method can be executed multiple times at the same time. +// Because of that, Handle method needs to be thread safe! +type EventHandler interface { + // HandlerName is the name used in message.Router while creating handler. + // + // It will be also passed to EventsSubscriberConstructor. + // May be useful, for example, to create a consumer group per each handler. + // + // WARNING: If HandlerName was changed and is used for generating consumer groups, + // it may result with **reconsuming all messages** !!! + HandlerName() string + + NewEvent() any + + Handle(ctx context.Context, event any) error +} + +type genericEventHandler[T any] struct { + handleFunc func(ctx context.Context, event *T) error + handlerName string +} + +// NewEventHandler creates a new EventHandler implementation based on provided function +// and event type inferred from function argument. +func NewEventHandler[T any](handlerName string, handleFunc func(ctx context.Context, event *T) error) EventHandler { + return &genericEventHandler[T]{ + handleFunc: handleFunc, + handlerName: handlerName, + } +} + +func (c genericEventHandler[T]) HandlerName() string { + return c.handlerName +} + +func (c genericEventHandler[T]) NewEvent() any { + tVar := new(T) + return tVar +} + +func (c genericEventHandler[T]) Handle(ctx context.Context, e any) error { + event := e.(*T) + return c.handleFunc(ctx, event) +} + +type GroupEventHandler interface { + NewEvent() interface{} + Handle(ctx context.Context, event interface{}) error +} + +// NewGroupEventHandler creates a new GroupEventHandler implementation based on provided function +// and event type inferred from function argument. +func NewGroupEventHandler[T any](handleFunc func(ctx context.Context, event *T) error) GroupEventHandler { + return &genericEventHandler[T]{ + handleFunc: handleFunc, + } +} diff --git a/components/cqrs/event_handler_test.go b/components/cqrs/event_handler_test.go new file mode 100644 index 000000000..0cbc6f1df --- /dev/null +++ b/components/cqrs/event_handler_test.go @@ -0,0 +1,48 @@ +package cqrs_test + +import ( + "context" + "fmt" + "testing" + + "github.com/ThreeDotsLabs/watermill/components/cqrs" + "github.com/stretchr/testify/assert" +) + +type SomeEvent struct { + Foo string +} + +func TestNewEventHandler(t *testing.T) { + cmdToSend := &SomeEvent{"bar"} + + ch := cqrs.NewEventHandler( + "some_handler", + func(ctx context.Context, cmd *SomeEvent) error { + assert.Equal(t, cmdToSend, cmd) + return fmt.Errorf("some error") + }, + ) + + assert.Equal(t, "some_handler", ch.HandlerName()) + assert.Equal(t, &SomeEvent{}, ch.NewEvent()) + + err := ch.Handle(context.Background(), cmdToSend) + assert.EqualError(t, err, "some error") +} + +func TestNewGroupEventHandler(t *testing.T) { + cmdToSend := &SomeEvent{"bar"} + + ch := cqrs.NewGroupEventHandler( + func(ctx context.Context, cmd *SomeEvent) error { + assert.Equal(t, cmdToSend, cmd) + return fmt.Errorf("some error") + }, + ) + + assert.Equal(t, &SomeEvent{}, ch.NewEvent()) + + err := ch.Handle(context.Background(), cmdToSend) + assert.EqualError(t, err, "some error") +} diff --git a/components/cqrs/event_processor.go b/components/cqrs/event_processor.go index 85e90e3da..4061c308e 100644 --- a/components/cqrs/event_processor.go +++ b/components/cqrs/event_processor.go @@ -1,7 +1,8 @@ package cqrs import ( - "context" + stdErrors "errors" + "fmt" "github.com/pkg/errors" @@ -9,53 +10,137 @@ import ( "github.com/ThreeDotsLabs/watermill/message" ) -// EventHandler receives events defined by NewEvent and handles them with its Handle method. -// If using DDD, CommandHandler may modify and persist the aggregate. -// It can also invoke a process manager, a saga or just build a read model. -// -// In contrast to CommandHandler, every Event can have multiple EventHandlers. -// -// One instance of EventHandler is used during handling messages. -// When multiple events are delivered at the same time, Handle method can be executed multiple times at the same time. -// Because of that, Handle method needs to be thread safe! -type EventHandler interface { - // HandlerName is the name used in message.Router while creating handler. +type EventProcessorConfig struct { + // GenerateSubscribeTopic is used to generate topic for subscribing to events. + // If event processor is using handler groups, GenerateSubscribeTopic is used instead. + GenerateSubscribeTopic EventProcessorGenerateSubscribeTopicFn + + // SubscriberConstructor is used to create subscriber for EventHandler. + // + // This function is called for every EventHandler instance. + // If you want to re-use one subscriber for multiple handlers, use GroupEventProcessor instead. + SubscriberConstructor EventProcessorSubscriberConstructorFn + + // OnHandle is called before handling event. + // OnHandle works in a similar way to middlewares: you can inject additional logic before and after handling a event. + // + // Because of that, you need to explicitly call params.Handler.Handle() to handle the event. + // + // func(params EventProcessorOnHandleParams) (err error) { + // // logic before handle + // // (...) // - // It will be also passed to EventsSubscriberConstructor. - // May be useful, for example, to create a consumer group per each handler. + // err := params.Handler.Handle(params.Message.Context(), params.Event) // - // WARNING: If HandlerName was changed and is used for generating consumer groups, - // it may result with **reconsuming all messages** !!! - HandlerName() string + // // logic after handle + // // (...) + // + // return err + // } + // + // This option is not required. + OnHandle EventProcessorOnHandleFn + + // AckOnUnknownEvent is used to decide if message should be acked if event has no handler defined. + AckOnUnknownEvent bool - NewEvent() interface{} + // Marshaler is used to marshal and unmarshal events. + // It is required. + Marshaler CommandEventMarshaler - Handle(ctx context.Context, event interface{}) error + // Logger instance used to log. + // If not provided, watermill.NopLogger is used. + Logger watermill.LoggerAdapter + + // disableRouterAutoAddHandlers is used to keep backwards compatibility. + // it is set when EventProcessor is created by NewEventProcessor. + // Deprecated: please migrate to NewEventProcessorWithConfig. + disableRouterAutoAddHandlers bool } -// EventsSubscriberConstructor creates a subscriber for EventHandler. -// It allows you to create separated customized Subscriber for every command handler. -type EventsSubscriberConstructor func(handlerName string) (message.Subscriber, error) +func (c *EventProcessorConfig) setDefaults() { + if c.Logger == nil { + c.Logger = watermill.NopLogger{} + } +} + +func (c EventProcessorConfig) Validate() error { + var err error + + if c.Marshaler == nil { + err = stdErrors.Join(err, errors.New("missing Marshaler")) + } + + if c.GenerateSubscribeTopic == nil { + err = stdErrors.Join(err, errors.New("missing GenerateHandlerTopic")) + } + if c.SubscriberConstructor == nil { + err = stdErrors.Join(err, errors.New("missing SubscriberConstructor")) + } + + return err +} + +type EventProcessorGenerateSubscribeTopicFn func(EventProcessorGenerateSubscribeTopicParams) (string, error) + +type EventProcessorGenerateSubscribeTopicParams struct { + EventName string + EventHandler EventHandler +} + +type EventProcessorSubscriberConstructorFn func(EventProcessorSubscriberConstructorParams) (message.Subscriber, error) + +type EventProcessorSubscriberConstructorParams struct { + HandlerName string + EventHandler EventHandler +} + +type EventProcessorOnHandleFn func(params EventProcessorOnHandleParams) error + +type EventProcessorOnHandleParams struct { + Handler EventHandler + + Event any + EventName string + + // Message is never nil and can be modified. + Message *message.Message +} // EventProcessor determines which EventHandler should handle event received from event bus. type EventProcessor struct { - handlers []EventHandler - generateTopic func(eventName string) string + router *message.Router + handlers []EventHandler + config EventProcessorConfig +} - subscriberConstructor EventsSubscriberConstructor +// NewEventProcessorWithConfig creates a new EventProcessor. +func NewEventProcessorWithConfig(router *message.Router, config EventProcessorConfig) (*EventProcessor, error) { + config.setDefaults() - marshaler CommandEventMarshaler - logger watermill.LoggerAdapter + if err := config.Validate(); err != nil { + return nil, errors.Wrap(err, "invalid config EventProcessor") + } + if router == nil && !config.disableRouterAutoAddHandlers { + return nil, errors.New("missing router") + } + + return &EventProcessor{ + router: router, + config: config, + }, nil } +// NewEventProcessor creates a new EventProcessor. +// Deprecated. Use NewEventProcessorWithConfig instead. func NewEventProcessor( - handlers []EventHandler, + individualHandlers []EventHandler, generateTopic func(eventName string) string, subscriberConstructor EventsSubscriberConstructor, marshaler CommandEventMarshaler, logger watermill.LoggerAdapter, ) (*EventProcessor, error) { - if len(handlers) == 0 { + if len(individualHandlers) == 0 { return nil, errors.New("missing handlers") } if generateTopic == nil { @@ -71,45 +156,120 @@ func NewEventProcessor( logger = watermill.NopLogger{} } - return &EventProcessor{ - handlers, - generateTopic, - subscriberConstructor, - marshaler, - logger, - }, nil + eventProcessorConfig := EventProcessorConfig{ + AckOnUnknownEvent: true, // this is the previous default behaviour - keeping backwards compatibility + GenerateSubscribeTopic: func(params EventProcessorGenerateSubscribeTopicParams) (string, error) { + return generateTopic(params.EventName), nil + }, + SubscriberConstructor: func(params EventProcessorSubscriberConstructorParams) (message.Subscriber, error) { + return subscriberConstructor(params.HandlerName) + }, + Marshaler: marshaler, + Logger: logger, + disableRouterAutoAddHandlers: true, + } + eventProcessorConfig.setDefaults() + + ep, err := NewEventProcessorWithConfig(nil, eventProcessorConfig) + if err != nil { + return nil, err + } + + for _, handler := range individualHandlers { + if err := ep.AddHandlers(handler); err != nil { + return nil, err + } + } + + return ep, nil } -func (p EventProcessor) AddHandlersToRouter(r *message.Router) error { - for i := range p.Handlers() { - handler := p.handlers[i] - handlerName := handler.HandlerName() - eventName := p.marshaler.Name(handler.NewEvent()) - topicName := p.generateTopic(eventName) +// EventsSubscriberConstructor creates a subscriber for EventHandler. +// It allows you to create separated customized Subscriber for every command handler. +// +// When handler groups are used, handler group is passed as handlerName. +// Deprecated: please use EventProcessorSubscriberConstructorFn instead. +type EventsSubscriberConstructor func(handlerName string) (message.Subscriber, error) - logger := p.logger.With(watermill.LogFields{ - "event_handler_name": handlerName, - "topic": topicName, - }) +// AddHandlers adds a new EventHandler to the EventProcessor and adds it to the router. +func (p *EventProcessor) AddHandlers(handlers ...EventHandler) error { + if p.config.disableRouterAutoAddHandlers { + p.handlers = append(p.handlers, handlers...) + return nil + } - handlerFunc, err := p.routerHandlerFunc(handler, logger) - if err != nil { + for _, handler := range handlers { + if err := p.addHandlerToRouter(p.router, handler); err != nil { return err } - logger.Debug("Adding CQRS event handler to router", nil) + p.handlers = append(p.handlers, handler) + } - subscriber, err := p.subscriberConstructor(handlerName) - if err != nil { - return errors.Wrap(err, "cannot create subscriber for event processor") + return nil +} + +// AddHandlersToRouter adds the EventProcessor's handlers to the given router. +// It should be called only once per EventProcessor instance. +// +// It is required to call AddHandlersToRouter only if command processor is created with NewEventProcessor (disableRouterAutoAddHandlers is set to true). +// Deprecated: please migrate to event processor created by NewEventProcessorWithConfig. +func (p EventProcessor) AddHandlersToRouter(r *message.Router) error { + if !p.config.disableRouterAutoAddHandlers { + return errors.New("AddHandlersToRouter should be called only when using deprecated NewEventProcessor") + } + + for i := range p.handlers { + handler := p.handlers[i] + + if err := p.addHandlerToRouter(r, handler); err != nil { + return err } + } + + return nil +} - r.AddNoPublisherHandler( - handlerName, - topicName, - subscriber, - handlerFunc, - ) +func (p EventProcessor) addHandlerToRouter(r *message.Router, handler EventHandler) error { + if err := validateEvent(handler.NewEvent()); err != nil { + return errors.Wrapf(err, "invalid event for handler %s", handler.HandlerName()) + } + + handlerName := handler.HandlerName() + eventName := p.config.Marshaler.Name(handler.NewEvent()) + + topicName, err := p.config.GenerateSubscribeTopic(EventProcessorGenerateSubscribeTopicParams{ + EventName: eventName, + EventHandler: handler, + }) + if err != nil { + return errors.Wrapf(err, "cannot generate topic name for handler %s", handlerName) + } + + logger := p.config.Logger.With(watermill.LogFields{ + "event_handler_name": handlerName, + "topic": topicName, + }) + + handlerFunc, err := p.routerHandlerFunc(handler, logger) + if err != nil { + return err + } + + if p.config.SubscriberConstructor == nil { + return errors.New("missing SubscriberConstructor config option") + } + + subscriber, err := p.config.SubscriberConstructor(EventProcessorSubscriberConstructorParams{ + HandlerName: handlerName, + EventHandler: handler, + }) + if err != nil { + return errors.Wrap(err, "cannot create subscriber for event processor") + } + + if err := addHandlerToRouter(p.config.Logger, r, handlerName, topicName, handlerFunc, subscriber); err != nil { + return err } return nil @@ -119,25 +279,47 @@ func (p EventProcessor) Handlers() []EventHandler { return p.handlers } +func addHandlerToRouter(logger watermill.LoggerAdapter, r *message.Router, handlerName string, topicName string, handlerFunc message.NoPublishHandlerFunc, subscriber message.Subscriber) error { + logger = logger.With(watermill.LogFields{ + "event_handler_name": handlerName, + "topic": topicName, + }) + + logger.Debug("Adding CQRS event handler to router", nil) + + r.AddNoPublisherHandler( + handlerName, + topicName, + subscriber, + handlerFunc, + ) + + return nil +} + func (p EventProcessor) routerHandlerFunc(handler EventHandler, logger watermill.LoggerAdapter) (message.NoPublishHandlerFunc, error) { initEvent := handler.NewEvent() - expectedEventName := p.marshaler.Name(initEvent) + expectedEventName := p.config.Marshaler.Name(initEvent) - if err := p.validateEvent(initEvent); err != nil { + if err := validateEvent(initEvent); err != nil { return nil, err } return func(msg *message.Message) error { event := handler.NewEvent() - messageEventName := p.marshaler.NameFromMessage(msg) + messageEventName := p.config.Marshaler.NameFromMessage(msg) if messageEventName != expectedEventName { - logger.Trace("Received different event type than expected, ignoring", watermill.LogFields{ - "message_uuid": msg.UUID, - "expected_event_type": expectedEventName, - "received_event_type": messageEventName, - }) - return nil + if !p.config.AckOnUnknownEvent { + return fmt.Errorf("received unexpected event type %s, expected %s", messageEventName, expectedEventName) + } else { + logger.Trace("Received different event type than expected, ignoring", watermill.LogFields{ + "message_uuid": msg.UUID, + "expected_event_type": expectedEventName, + "received_event_type": messageEventName, + }) + return nil + } } logger.Debug("Handling event", watermill.LogFields{ @@ -145,11 +327,24 @@ func (p EventProcessor) routerHandlerFunc(handler EventHandler, logger watermill "received_event_type": messageEventName, }) - if err := p.marshaler.Unmarshal(msg, event); err != nil { + if err := p.config.Marshaler.Unmarshal(msg, event); err != nil { return err } - if err := handler.Handle(msg.Context(), event); err != nil { + handle := func(params EventProcessorOnHandleParams) error { + return params.Handler.Handle(params.Message.Context(), params.Event) + } + if p.config.OnHandle != nil { + handle = p.config.OnHandle + } + + err := handle(EventProcessorOnHandleParams{ + Handler: handler, + Event: event, + EventName: messageEventName, + Message: msg, + }) + if err != nil { logger.Debug("Error when handling event", watermill.LogFields{"err": err}) return err } @@ -158,7 +353,7 @@ func (p EventProcessor) routerHandlerFunc(handler EventHandler, logger watermill }, nil } -func (p EventProcessor) validateEvent(event interface{}) error { +func validateEvent(event interface{}) error { // EventHandler's NewEvent must return a pointer, because it is used to unmarshal if err := isPointer(event); err != nil { return errors.Wrap(err, "command must be a non-nil pointer") diff --git a/components/cqrs/event_processor_group.go b/components/cqrs/event_processor_group.go new file mode 100644 index 000000000..af7912b6d --- /dev/null +++ b/components/cqrs/event_processor_group.go @@ -0,0 +1,262 @@ +package cqrs + +import ( + stdErrors "errors" + "fmt" + + "github.com/ThreeDotsLabs/watermill" + "github.com/ThreeDotsLabs/watermill/message" + "github.com/pkg/errors" +) + +type EventGroupProcessorConfig struct { + // GenerateSubscribeTopic is used to generate topic for subscribing to events for handler groups. + // This option is required for EventProcessor if handler groups are used. + GenerateSubscribeTopic EventGroupProcessorGenerateSubscribeTopicFn + + // SubscriberConstructor is used to create subscriber for GroupEventHandler. + // This function is called for every events group once - thanks to that it's possible to have one subscription per group. + // It's useful, when we are processing events from one stream and we want to do it in order. + SubscriberConstructor EventGroupProcessorSubscriberConstructorFn + + // OnHandle is called before handling event. + // OnHandle works in a similar way to middlewares: you can inject additional logic before and after handling a event. + // + // Because of that, you need to explicitly call params.Handler.Handle() to handle the event. + // + // func(params EventGroupProcessorOnHandleParams) (err error) { + // // logic before handle + // // (...) + // + // err := params.Handler.Handle(params.Message.Context(), params.Event) + // + // // logic after handle + // // (...) + // + // return err + // } + // + // This option is not required. + OnHandle EventGroupProcessorOnHandleFn + + // AckOnUnknownEvent is used to decide if message should be acked if event has no handler defined. + AckOnUnknownEvent bool + + // Marshaler is used to marshal and unmarshal events. + // It is required. + Marshaler CommandEventMarshaler + + // Logger instance used to log. + // If not provided, watermill.NopLogger is used. + Logger watermill.LoggerAdapter +} + +func (c *EventGroupProcessorConfig) setDefaults() { + if c.Logger == nil { + c.Logger = watermill.NopLogger{} + } +} + +func (c EventGroupProcessorConfig) Validate() error { + var err error + + if c.Marshaler == nil { + err = stdErrors.Join(err, errors.New("missing Marshaler")) + } + + if c.GenerateSubscribeTopic == nil { + err = stdErrors.Join(err, errors.New("missing GenerateHandlerGroupTopic")) + } + if c.SubscriberConstructor == nil { + err = stdErrors.Join(err, errors.New("missing SubscriberConstructor")) + } + + return err +} + +type EventGroupProcessorGenerateSubscribeTopicFn func(EventGroupProcessorGenerateSubscribeTopicParams) (string, error) + +type EventGroupProcessorGenerateSubscribeTopicParams struct { + EventGroupName string + EventGroupHandlers []GroupEventHandler +} + +type EventGroupProcessorSubscriberConstructorFn func(EventGroupProcessorSubscriberConstructorParams) (message.Subscriber, error) + +type EventGroupProcessorSubscriberConstructorParams struct { + EventGroupName string + EventGroupHandlers []GroupEventHandler +} + +type EventGroupProcessorOnHandleFn func(params EventGroupProcessorOnHandleParams) error + +type EventGroupProcessorOnHandleParams struct { + GroupName string + Handler GroupEventHandler + + Event any + EventName string + + // Message is never nil and can be modified. + Message *message.Message +} + +// EventGroupProcessor determines which EventHandler should handle event received from event bus. +// Compared to EventProcessor, EventGroupProcessor allows to have multiple handlers that share the same subscriber instance. +type EventGroupProcessor struct { + router *message.Router + + groupEventHandlers map[string][]GroupEventHandler + + config EventGroupProcessorConfig +} + +// NewEventGroupProcessorWithConfig creates a new EventGroupProcessor. +func NewEventGroupProcessorWithConfig(router *message.Router, config EventGroupProcessorConfig) (*EventGroupProcessor, error) { + config.setDefaults() + + if err := config.Validate(); err != nil { + return nil, errors.Wrap(err, "invalid config EventProcessor") + } + if router == nil { + return nil, errors.New("missing router") + } + + return &EventGroupProcessor{ + router: router, + groupEventHandlers: map[string][]GroupEventHandler{}, + config: config, + }, nil +} + +// AddHandlersGroup adds a new list of GroupEventHandler to the EventGroupProcessor and adds it to the router. +// +// Compared to AddHandlers, AddHandlersGroup allows to have multiple handlers that share the same subscriber instance. +// +// Handlers group needs to be unique within the EventProcessor instance. +// +// Handler group name is used as handler's name in router. +func (p *EventGroupProcessor) AddHandlersGroup(groupName string, handlers ...GroupEventHandler) error { + if len(handlers) == 0 { + return errors.New("no handlers provided") + } + if _, ok := p.groupEventHandlers[groupName]; ok { + return fmt.Errorf("event handler group '%s' already exists", groupName) + } + + if err := p.addHandlerToRouter(p.router, groupName, handlers); err != nil { + return err + } + + p.groupEventHandlers[groupName] = handlers + + return nil +} + +func (p EventGroupProcessor) addHandlerToRouter(r *message.Router, groupName string, handlersGroup []GroupEventHandler) error { + for i, handler := range handlersGroup { + if err := validateEvent(handler.NewEvent()); err != nil { + return errors.Wrapf( + err, + "invalid event for handler %T (num %d) in group %s", + handler, + i, + groupName, + ) + } + } + + topicName, err := p.config.GenerateSubscribeTopic(EventGroupProcessorGenerateSubscribeTopicParams{ + EventGroupName: groupName, + EventGroupHandlers: handlersGroup, + }) + if err != nil { + return errors.Wrapf(err, "cannot generate topic name for handler group %s", groupName) + } + + logger := p.config.Logger.With(watermill.LogFields{ + "event_handler_group_name": groupName, + "topic": topicName, + }) + + handlerFunc, err := p.routerHandlerGroupFunc(handlersGroup, groupName, logger) + if err != nil { + return err + } + + subscriber, err := p.config.SubscriberConstructor(EventGroupProcessorSubscriberConstructorParams{ + EventGroupName: groupName, + EventGroupHandlers: handlersGroup, + }) + if err != nil { + return errors.Wrap(err, "cannot create subscriber for event processor") + } + + if err := addHandlerToRouter(p.config.Logger, r, groupName, topicName, handlerFunc, subscriber); err != nil { + return err + } + + return nil +} + +func (p EventGroupProcessor) routerHandlerGroupFunc(handlers []GroupEventHandler, groupName string, logger watermill.LoggerAdapter) (message.NoPublishHandlerFunc, error) { + return func(msg *message.Message) error { + messageEventName := p.config.Marshaler.NameFromMessage(msg) + + for _, handler := range handlers { + initEvent := handler.NewEvent() + expectedEventName := p.config.Marshaler.Name(initEvent) + + event := handler.NewEvent() + + if messageEventName != expectedEventName { + logger.Trace("Received different event type than expected, ignoring", watermill.LogFields{ + "message_uuid": msg.UUID, + "expected_event_type": expectedEventName, + "received_event_type": messageEventName, + }) + continue + } + + logger.Debug("Handling event", watermill.LogFields{ + "message_uuid": msg.UUID, + "received_event_type": messageEventName, + }) + + if err := p.config.Marshaler.Unmarshal(msg, event); err != nil { + return err + } + + handle := func(params EventGroupProcessorOnHandleParams) error { + return params.Handler.Handle(params.Message.Context(), params.Event) + } + if p.config.OnHandle != nil { + handle = p.config.OnHandle + } + + err := handle(EventGroupProcessorOnHandleParams{ + GroupName: groupName, + Handler: handler, + EventName: messageEventName, + Event: event, + Message: msg, + }) + if err != nil { + logger.Debug("Error when handling event", watermill.LogFields{"err": err}) + return err + } + + return nil + } + + if !p.config.AckOnUnknownEvent { + return fmt.Errorf("no handler found for event %s", p.config.Marshaler.NameFromMessage(msg)) + } else { + logger.Trace("Received event can't be handled by any handler in handler group", watermill.LogFields{ + "message_uuid": msg.UUID, + "received_event_type": messageEventName, + }) + return nil + } + }, nil +} diff --git a/components/cqrs/event_processor_group_test.go b/components/cqrs/event_processor_group_test.go new file mode 100644 index 000000000..635ad8420 --- /dev/null +++ b/components/cqrs/event_processor_group_test.go @@ -0,0 +1,387 @@ +package cqrs_test + +import ( + "context" + "fmt" + "sync/atomic" + "testing" + "time" + + "github.com/ThreeDotsLabs/watermill/components/cqrs" + "github.com/ThreeDotsLabs/watermill/message" + "github.com/pkg/errors" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestEventGroupProcessorConfig_Validate(t *testing.T) { + testCases := []struct { + Name string + ModifyValidConfig func(*cqrs.EventGroupProcessorConfig) + ExpectedErr error + }{ + { + Name: "valid_config", + ModifyValidConfig: nil, + ExpectedErr: nil, + }, + { + Name: "valid_with_group_handlers", + ExpectedErr: nil, + }, + { + Name: "missing_GroupSubscriberConstructor", + ModifyValidConfig: func(config *cqrs.EventGroupProcessorConfig) { + config.SubscriberConstructor = nil + }, + ExpectedErr: fmt.Errorf("missing SubscriberConstructor"), + }, + { + Name: "missing_GenerateHandlerGroupSubscribeTopic", + ModifyValidConfig: func(config *cqrs.EventGroupProcessorConfig) { + config.GenerateSubscribeTopic = nil + }, + ExpectedErr: fmt.Errorf("missing GenerateHandlerGroupTopic"), + }, + { + Name: "missing_marshaler", + ModifyValidConfig: func(config *cqrs.EventGroupProcessorConfig) { + config.Marshaler = nil + }, + ExpectedErr: fmt.Errorf("missing Marshaler"), + }, + } + for i := range testCases { + tc := testCases[i] + + t.Run(tc.Name, func(t *testing.T) { + validConfig := cqrs.EventGroupProcessorConfig{ + GenerateSubscribeTopic: func(params cqrs.EventGroupProcessorGenerateSubscribeTopicParams) (string, error) { + return "", nil + }, + SubscriberConstructor: func(params cqrs.EventGroupProcessorSubscriberConstructorParams) (message.Subscriber, error) { + return nil, nil + }, + Marshaler: cqrs.JSONMarshaler{}, + } + + if tc.ModifyValidConfig != nil { + tc.ModifyValidConfig(&validConfig) + } + + err := validConfig.Validate() + if tc.ExpectedErr == nil { + assert.NoError(t, err) + } else { + assert.EqualError(t, err, tc.ExpectedErr.Error()) + } + }) + } +} + +func TestNewEventProcessor_OnGroupHandle(t *testing.T) { + ts := NewTestServices() + + msg1, err := ts.Marshaler.Marshal(&TestEvent{ID: "1"}) + require.NoError(t, err) + + msg2, err := ts.Marshaler.Marshal(&TestEvent{ID: "2"}) + require.NoError(t, err) + + mockSub := &mockSubscriber{ + MessagesToSend: []*message.Message{ + msg1, + msg2, + }, + } + + handlerCalled := 0 + + defer func() { + // for msg 1 we are not calling handler - but returning before + assert.Equal(t, 1, handlerCalled) + }() + + handler := cqrs.NewEventHandler("test", func(ctx context.Context, cmd *TestEvent) error { + handlerCalled++ + return nil + }) + + onHandleCalled := int64(0) + + router, err := message.NewRouter(message.RouterConfig{}, ts.Logger) + require.NoError(t, err) + + config := cqrs.EventGroupProcessorConfig{ + GenerateSubscribeTopic: func(params cqrs.EventGroupProcessorGenerateSubscribeTopicParams) (string, error) { + return "events", nil + }, + SubscriberConstructor: func(params cqrs.EventGroupProcessorSubscriberConstructorParams) (message.Subscriber, error) { + return mockSub, nil + }, + OnHandle: func(params cqrs.EventGroupProcessorOnHandleParams) error { + atomic.AddInt64(&onHandleCalled, 1) + + assert.Equal(t, "some_group", params.GroupName) + + assert.IsType(t, &TestEvent{}, params.Event) + assert.Equal(t, "cqrs_test.TestEvent", params.EventName) + assert.Equal(t, handler, params.Handler) + + if params.Event.(*TestEvent).ID == "1" { + assert.Equal(t, msg1, params.Message) + return errors.New("test error") + } else { + assert.Equal(t, msg2, params.Message) + } + + return params.Handler.Handle(params.Message.Context(), params.Event) + }, + Marshaler: ts.Marshaler, + Logger: ts.Logger, + } + cp, err := cqrs.NewEventGroupProcessorWithConfig(router, config) + require.NoError(t, err) + + err = cp.AddHandlersGroup("some_group", handler) + require.NoError(t, err) + + go func() { + err := router.Run(context.Background()) + assert.NoError(t, err) + }() + + <-router.Running() + + select { + case <-msg1.Nacked(): + // ok + case <-msg1.Acked(): + // ack received + t.Fatal("ack received, message should be nacked") + } + + select { + case <-msg2.Acked(): + // ok + case <-msg2.Nacked(): + // nack received + } + + assert.EqualValues(t, 2, onHandleCalled) +} + +func TestNewEventProcessor_AckOnUnknownEvent_handler_group(t *testing.T) { + ts := NewTestServices() + + msg, err := ts.Marshaler.Marshal(&UnknownEvent{}) + require.NoError(t, err) + + mockSub := &mockSubscriber{ + MessagesToSend: []*message.Message{ + msg, + }, + } + + router, err := message.NewRouter(message.RouterConfig{}, ts.Logger) + require.NoError(t, err) + + cp, err := cqrs.NewEventGroupProcessorWithConfig( + router, + cqrs.EventGroupProcessorConfig{ + GenerateSubscribeTopic: func(params cqrs.EventGroupProcessorGenerateSubscribeTopicParams) (string, error) { + return "events", nil + }, + SubscriberConstructor: func(params cqrs.EventGroupProcessorSubscriberConstructorParams) (message.Subscriber, error) { + return mockSub, nil + }, + AckOnUnknownEvent: true, + Marshaler: ts.Marshaler, + Logger: ts.Logger, + }, + ) + require.NoError(t, err) + + err = cp.AddHandlersGroup( + "foo", + cqrs.NewEventHandler("test", func(ctx context.Context, cmd *TestEvent) error { + return nil + }), + ) + require.NoError(t, err) + + go func() { + err := router.Run(context.Background()) + assert.NoError(t, err) + }() + + <-router.Running() + + select { + case <-msg.Acked(): + // ok + case <-msg.Nacked(): + // ack received + t.Fatal("ack received, message should be nacked") + } +} + +func TestNewEventProcessor_AckOnUnknownEvent_disabled_handler_group(t *testing.T) { + ts := NewTestServices() + + msg, err := ts.Marshaler.Marshal(&UnknownEvent{}) + require.NoError(t, err) + + mockSub := &mockSubscriber{ + MessagesToSend: []*message.Message{ + msg, + }, + } + + router, err := message.NewRouter(message.RouterConfig{}, ts.Logger) + require.NoError(t, err) + + cp, err := cqrs.NewEventGroupProcessorWithConfig( + router, + cqrs.EventGroupProcessorConfig{ + GenerateSubscribeTopic: func(params cqrs.EventGroupProcessorGenerateSubscribeTopicParams) (string, error) { + return "events", nil + }, + SubscriberConstructor: func(params cqrs.EventGroupProcessorSubscriberConstructorParams) (message.Subscriber, error) { + return mockSub, nil + }, + AckOnUnknownEvent: false, + Marshaler: ts.Marshaler, + Logger: ts.Logger, + }, + ) + require.NoError(t, err) + + err = cp.AddHandlersGroup( + "foo", + cqrs.NewEventHandler("test", func(ctx context.Context, cmd *TestEvent) error { + return nil + }), + ) + require.NoError(t, err) + + go func() { + err := router.Run(context.Background()) + assert.NoError(t, err) + }() + + <-router.Running() + + select { + case <-msg.Nacked(): + // ok + case <-msg.Acked(): + t.Fatal("ack received, message should be nacked") + } +} + +func TestEventProcessor_handler_group(t *testing.T) { + ts := NewTestServices() + + event1 := &TestEvent{ID: "1"} + + msg1, err := ts.Marshaler.Marshal(event1) + require.NoError(t, err) + + event2 := &AnotherTestEvent{ID: "2"} + + msg2, err := ts.Marshaler.Marshal(event2) + require.NoError(t, err) + + mockSub := &mockSubscriber{ + MessagesToSend: []*message.Message{ + msg1, + msg2, + }, + } + + handler1Calls := 0 + handler2Calls := 0 + + handlers := []cqrs.GroupEventHandler{ + cqrs.NewGroupEventHandler(func(ctx context.Context, event *TestEvent) error { + assert.EqualValues(t, event1, event) + + handler1Calls++ + + return nil + }), + cqrs.NewGroupEventHandler(func(ctx context.Context, event *AnotherTestEvent) error { + assert.EqualValues(t, event2, event) + + handler2Calls++ + + return nil + }), + } + + router, err := message.NewRouter(message.RouterConfig{}, ts.Logger) + require.NoError(t, err) + + eventProcessor, err := cqrs.NewEventGroupProcessorWithConfig( + router, + cqrs.EventGroupProcessorConfig{ + GenerateSubscribeTopic: func(params cqrs.EventGroupProcessorGenerateSubscribeTopicParams) (string, error) { + assert.Equal(t, "some_group", params.EventGroupName) + assert.Equal(t, handlers, params.EventGroupHandlers) + + return "events", nil + }, + SubscriberConstructor: func(params cqrs.EventGroupProcessorSubscriberConstructorParams) (message.Subscriber, error) { + assert.Equal(t, "some_group", params.EventGroupName) + assert.Equal(t, handlers, params.EventGroupHandlers) + + return mockSub, nil + }, + Marshaler: ts.Marshaler, + Logger: ts.Logger, + }, + ) + require.NoError(t, err) + + err = eventProcessor.AddHandlersGroup( + "some_group", + handlers..., + ) + require.NoError(t, err) + + err = eventProcessor.AddHandlersGroup( + "some_group", + handlers..., + ) + require.ErrorContains(t, err, "event handler group 'some_group' already exists") + + err = eventProcessor.AddHandlersGroup( + "some_group_2", + ) + require.ErrorContains(t, err, "no handlers provided") + + go func() { + err := router.Run(context.Background()) + assert.NoError(t, err) + }() + + <-router.Running() + + select { + case <-msg1.Acked(): + // ok + case <-time.After(time.Second): + t.Fatal("message 1 not acked") + } + + select { + case <-msg2.Acked(): + // ok + case <-time.After(time.Second): + t.Fatal("message 2 not acked") + } + + assert.Equal(t, 1, handler1Calls) + assert.Equal(t, 1, handler2Calls) +} diff --git a/components/cqrs/event_processor_test.go b/components/cqrs/event_processor_test.go index 63462f414..648d9028c 100644 --- a/components/cqrs/event_processor_test.go +++ b/components/cqrs/event_processor_test.go @@ -2,6 +2,8 @@ package cqrs_test import ( "context" + "fmt" + "sync/atomic" "testing" "github.com/pkg/errors" @@ -13,33 +15,90 @@ import ( "github.com/stretchr/testify/require" ) -func TestNewEventProcessor(t *testing.T) { - handlers := []cqrs.EventHandler{nonPointerEventProcessor{}} +func TestEventProcessorConfig_Validate(t *testing.T) { + testCases := []struct { + Name string + ModifyValidConfig func(*cqrs.EventProcessorConfig) + ExpectedErr error + }{ + { + Name: "valid_config", + ModifyValidConfig: nil, + ExpectedErr: nil, + }, + { + Name: "missing_GenerateHandlerSubscribeTopic", + ModifyValidConfig: func(config *cqrs.EventProcessorConfig) { + config.GenerateSubscribeTopic = nil + }, + ExpectedErr: fmt.Errorf("missing GenerateHandlerTopic"), + }, + { + Name: "missing_marshaler", + ModifyValidConfig: func(config *cqrs.EventProcessorConfig) { + config.Marshaler = nil + }, + ExpectedErr: fmt.Errorf("missing Marshaler"), + }, + { + Name: "missing_subscriber_constructor", + ModifyValidConfig: func(config *cqrs.EventProcessorConfig) { + config.SubscriberConstructor = nil + }, + ExpectedErr: fmt.Errorf("missing SubscriberConstructor"), + }, + } + for i := range testCases { + tc := testCases[i] + + t.Run(tc.Name, func(t *testing.T) { + validConfig := cqrs.EventProcessorConfig{ + GenerateSubscribeTopic: func(params cqrs.EventProcessorGenerateSubscribeTopicParams) (string, error) { + return "", nil + }, + SubscriberConstructor: func(params cqrs.EventProcessorSubscriberConstructorParams) (message.Subscriber, error) { + return nil, nil + }, + Marshaler: cqrs.JSONMarshaler{}, + } + + if tc.ModifyValidConfig != nil { + tc.ModifyValidConfig(&validConfig) + } - generateTopic := func(commandName string) string { - return "" + err := validConfig.Validate() + if tc.ExpectedErr == nil { + assert.NoError(t, err) + } else { + assert.EqualError(t, err, tc.ExpectedErr.Error()) + } + }) } - subscriberConstructor := func(handlerName string) (subscriber message.Subscriber, e error) { - return nil, nil +} + +func TestNewEventProcessor(t *testing.T) { + eventConfig := cqrs.EventProcessorConfig{ + GenerateSubscribeTopic: func(params cqrs.EventProcessorGenerateSubscribeTopicParams) (string, error) { + return "", nil + }, + SubscriberConstructor: func(params cqrs.EventProcessorSubscriberConstructorParams) (message.Subscriber, error) { + return nil, nil + }, + Marshaler: cqrs.JSONMarshaler{}, } + require.NoError(t, eventConfig.Validate()) + + router, err := message.NewRouter(message.RouterConfig{}, nil) + require.NoError(t, err) - cp, err := cqrs.NewEventProcessor(handlers, generateTopic, subscriberConstructor, cqrs.JSONMarshaler{}, nil) + cp, err := cqrs.NewEventProcessorWithConfig(router, eventConfig) assert.NotNil(t, cp) assert.NoError(t, err) - cp, err = cqrs.NewEventProcessor([]cqrs.EventHandler{}, generateTopic, subscriberConstructor, cqrs.JSONMarshaler{}, nil) - assert.Nil(t, cp) - assert.Error(t, err) - - cp, err = cqrs.NewEventProcessor(handlers, nil, subscriberConstructor, cqrs.JSONMarshaler{}, nil) - assert.Nil(t, cp) - assert.Error(t, err) - - cp, err = cqrs.NewEventProcessor(handlers, generateTopic, nil, cqrs.JSONMarshaler{}, nil) - assert.Nil(t, cp) - assert.Error(t, err) + eventConfig.SubscriberConstructor = nil + require.Error(t, eventConfig.Validate()) - cp, err = cqrs.NewEventProcessor(handlers, generateTopic, subscriberConstructor, nil, nil) + cp, err = cqrs.NewEventProcessorWithConfig(router, eventConfig) assert.Nil(t, cp) assert.Error(t, err) } @@ -62,23 +121,27 @@ func (nonPointerEventProcessor) Handle(ctx context.Context, cmd interface{}) err func TestEventProcessor_non_pointer_event(t *testing.T) { ts := NewTestServices() - eventProcessor, err := cqrs.NewEventProcessor( - []cqrs.EventHandler{nonPointerEventProcessor{}}, - func(eventName string) string { - return "events" - }, - func(handlerName string) (message.Subscriber, error) { - return ts.EventsPubSub, nil - }, - ts.Marshaler, - ts.Logger, - ) - require.NoError(t, err) + handler := nonPointerEventProcessor{} router, err := message.NewRouter(message.RouterConfig{}, ts.Logger) require.NoError(t, err) - err = eventProcessor.AddHandlersToRouter(router) + eventProcessor, err := cqrs.NewEventProcessorWithConfig( + router, + cqrs.EventProcessorConfig{ + GenerateSubscribeTopic: func(params cqrs.EventProcessorGenerateSubscribeTopicParams) (string, error) { + return "", nil + }, + SubscriberConstructor: func(params cqrs.EventProcessorSubscriberConstructorParams) (message.Subscriber, error) { + return nil, nil + }, + Marshaler: ts.Marshaler, + Logger: ts.Logger, + }, + ) + require.NoError(t, err) + + err = eventProcessor.AddHandlers(handler) assert.IsType(t, cqrs.NonPointerError{}, errors.Cause(err)) } @@ -109,16 +172,255 @@ func (h *duplicateTestEventHandler2) Handle(ctx context.Context, event interface func TestEventProcessor_multiple_same_event_handlers(t *testing.T) { ts := NewTestServices() - eventProcessor, err := cqrs.NewEventProcessor( + router, err := message.NewRouter(message.RouterConfig{}, ts.Logger) + require.NoError(t, err) + + eventProcessor, err := cqrs.NewEventProcessorWithConfig( + router, + cqrs.EventProcessorConfig{ + GenerateSubscribeTopic: func(params cqrs.EventProcessorGenerateSubscribeTopicParams) (string, error) { + return "", nil + }, + SubscriberConstructor: func(params cqrs.EventProcessorSubscriberConstructorParams) (message.Subscriber, error) { + return nil, nil + }, + Marshaler: ts.Marshaler, + Logger: ts.Logger, + }, + ) + require.NoError(t, err) + + err = eventProcessor.AddHandlers( + &duplicateTestEventHandler1{}, + &duplicateTestEventHandler2{}, + ) + require.NoError(t, err) +} + +func TestNewEventProcessor_OnHandle(t *testing.T) { + ts := NewTestServices() + + msg1, err := ts.Marshaler.Marshal(&TestEvent{ID: "1"}) + require.NoError(t, err) + + msg2, err := ts.Marshaler.Marshal(&TestEvent{ID: "2"}) + require.NoError(t, err) + + mockSub := &mockSubscriber{ + MessagesToSend: []*message.Message{ + msg1, + msg2, + }, + } + + handlerCalled := 0 + + defer func() { + // for msg 1 we are not calling handler - but returning before + assert.Equal(t, 1, handlerCalled) + }() + + handler := cqrs.NewEventHandler("test", func(ctx context.Context, cmd *TestEvent) error { + handlerCalled++ + return nil + }) + + router, err := message.NewRouter(message.RouterConfig{}, ts.Logger) + require.NoError(t, err) + + onHandleCalled := int64(0) + + config := cqrs.EventProcessorConfig{ + GenerateSubscribeTopic: func(params cqrs.EventProcessorGenerateSubscribeTopicParams) (string, error) { + return "events", nil + }, + SubscriberConstructor: func(params cqrs.EventProcessorSubscriberConstructorParams) (message.Subscriber, error) { + return mockSub, nil + }, + OnHandle: func(params cqrs.EventProcessorOnHandleParams) error { + atomic.AddInt64(&onHandleCalled, 1) + + assert.IsType(t, &TestEvent{}, params.Event) + assert.Equal(t, "cqrs_test.TestEvent", params.EventName) + assert.Equal(t, handler, params.Handler) + + if params.Event.(*TestEvent).ID == "1" { + assert.Equal(t, msg1, params.Message) + return errors.New("test error") + } else { + assert.Equal(t, msg2, params.Message) + } + + return params.Handler.Handle(params.Message.Context(), params.Event) + }, + Marshaler: ts.Marshaler, + Logger: ts.Logger, + } + cp, err := cqrs.NewEventProcessorWithConfig(router, config) + require.NoError(t, err) + + err = cp.AddHandlers(handler) + require.NoError(t, err) + + go func() { + err := router.Run(context.Background()) + assert.NoError(t, err) + }() + + <-router.Running() + + select { + case <-msg1.Nacked(): + // ok + case <-msg1.Acked(): + // ack received + t.Fatal("ack received, message should be nacked") + } + + select { + case <-msg2.Acked(): + // ok + case <-msg2.Nacked(): + // nack received + } + + assert.EqualValues(t, 2, onHandleCalled) +} + +type UnknownEvent struct { +} + +func TestNewEventProcessor_AckOnUnknownEvent(t *testing.T) { + ts := NewTestServices() + + msg, err := ts.Marshaler.Marshal(&UnknownEvent{}) + require.NoError(t, err) + + mockSub := &mockSubscriber{ + MessagesToSend: []*message.Message{ + msg, + }, + } + + router, err := message.NewRouter(message.RouterConfig{}, ts.Logger) + require.NoError(t, err) + + cp, err := cqrs.NewEventProcessorWithConfig( + router, + cqrs.EventProcessorConfig{ + GenerateSubscribeTopic: func(params cqrs.EventProcessorGenerateSubscribeTopicParams) (string, error) { + return "events", nil + }, + SubscriberConstructor: func(params cqrs.EventProcessorSubscriberConstructorParams) (message.Subscriber, error) { + return mockSub, nil + }, + AckOnUnknownEvent: true, + Marshaler: ts.Marshaler, + Logger: ts.Logger, + }, + ) + require.NoError(t, err) + + err = cp.AddHandlers( + cqrs.NewEventHandler("test", func(ctx context.Context, cmd *TestEvent) error { + return nil + }), + ) + require.NoError(t, err) + + go func() { + err := router.Run(context.Background()) + assert.NoError(t, err) + }() + + <-router.Running() + + select { + case <-msg.Acked(): + // ok + case <-msg.Nacked(): + // ack received + t.Fatal("ack received, message should be nacked") + } +} + +func TestNewEventProcessor_AckOnUnknownEvent_disabled(t *testing.T) { + ts := NewTestServices() + + msg, err := ts.Marshaler.Marshal(&UnknownEvent{}) + require.NoError(t, err) + + mockSub := &mockSubscriber{ + MessagesToSend: []*message.Message{ + msg, + }, + } + + router, err := message.NewRouter(message.RouterConfig{}, ts.Logger) + require.NoError(t, err) + + cp, err := cqrs.NewEventProcessorWithConfig( + router, + cqrs.EventProcessorConfig{ + GenerateSubscribeTopic: func(params cqrs.EventProcessorGenerateSubscribeTopicParams) (string, error) { + return "events", nil + }, + SubscriberConstructor: func(params cqrs.EventProcessorSubscriberConstructorParams) (message.Subscriber, error) { + return mockSub, nil + }, + AckOnUnknownEvent: false, + Marshaler: ts.Marshaler, + Logger: ts.Logger, + }, + ) + require.NoError(t, err) + + err = cp.AddHandlers( + cqrs.NewEventHandler("test", func(ctx context.Context, cmd *TestEvent) error { + return nil + }), + ) + require.NoError(t, err) + + go func() { + err := router.Run(context.Background()) + assert.NoError(t, err) + }() + + <-router.Running() + + select { + case <-msg.Nacked(): + // ok + case <-msg.Acked(): + // ack received + t.Fatal("ack received, message should be nacked") + } +} + +func TestNewEventProcessor_backward_compatibility_of_AckOnUnknownEvent(t *testing.T) { + ts := NewTestServices() + + msg, err := ts.Marshaler.Marshal(&UnknownEvent{}) + require.NoError(t, err) + + mockSub := &mockSubscriber{ + MessagesToSend: []*message.Message{ + msg, + }, + } + + cp, err := cqrs.NewEventProcessor( []cqrs.EventHandler{ - &duplicateTestEventHandler1{}, - &duplicateTestEventHandler2{}, + cqrs.NewEventHandler("test", func(ctx context.Context, cmd *TestEvent) error { + return nil + }), }, func(eventName string) string { return "events" }, func(handlerName string) (message.Subscriber, error) { - return ts.EventsPubSub, nil + return mockSub, nil }, ts.Marshaler, ts.Logger, @@ -128,6 +430,47 @@ func TestEventProcessor_multiple_same_event_handlers(t *testing.T) { router, err := message.NewRouter(message.RouterConfig{}, ts.Logger) require.NoError(t, err) - err = eventProcessor.AddHandlersToRouter(router) + err = cp.AddHandlersToRouter(router) + require.NoError(t, err) + + go func() { + err := router.Run(context.Background()) + assert.NoError(t, err) + }() + + <-router.Running() + + select { + case <-msg.Acked(): + // ok + case <-msg.Nacked(): + // ack received + t.Fatal("ack received, message should be nacked") + } +} + +func TestEventProcessor_AddHandlersToRouter_without_disableRouterAutoAddHandlers(t *testing.T) { + ts := NewTestServices() + + router, err := message.NewRouter(message.RouterConfig{}, ts.Logger) + require.NoError(t, err) + + cp, err := cqrs.NewEventProcessorWithConfig( + router, + cqrs.EventProcessorConfig{ + GenerateSubscribeTopic: func(params cqrs.EventProcessorGenerateSubscribeTopicParams) (string, error) { + return "events", nil + }, + SubscriberConstructor: func(params cqrs.EventProcessorSubscriberConstructorParams) (message.Subscriber, error) { + return ts.EventsPubSub, nil + }, + AckOnUnknownEvent: false, + Marshaler: ts.Marshaler, + Logger: ts.Logger, + }, + ) require.NoError(t, err) + + err = cp.AddHandlersToRouter(router) + assert.ErrorContains(t, err, "AddHandlersToRouter should be called only when using deprecated NewEventProcessor") } diff --git a/components/forwarder/forwarder.go b/components/forwarder/forwarder.go index dd77a2943..a5ffa1e99 100644 --- a/components/forwarder/forwarder.go +++ b/components/forwarder/forwarder.go @@ -24,6 +24,12 @@ type Config struct { // AckWhenCannotUnwrap enables acking of messages which cannot be unwrapped from an envelope. AckWhenCannotUnwrap bool + + // Router is a router used by the forwarder. + // If not provided, a new router will be created. + // + // If router is provided, it's not necessary to call `Forwarder.Run()` if the router is started with `router.Run()`. + Router *message.Router } func (c *Config) setDefaults() { @@ -66,21 +72,27 @@ func NewForwarder(subscriberIn message.Subscriber, publisherOut message.Publishe return nil, errors.Wrap(err, "invalid router config") } - router, err := message.NewRouter(routerConfig, logger) - if err != nil { - return nil, errors.Wrap(err, "cannot create a router") + var router *message.Router + if config.Router != nil { + router = config.Router + } else { + var err error + router, err = message.NewRouter(routerConfig, logger) + if err != nil { + return nil, errors.Wrap(err, "cannot create a router") + } } f := &Forwarder{router, publisherOut, logger, config} - router.AddNoPublisherHandler( + handler := router.AddNoPublisherHandler( "events_forwarder", config.ForwarderTopic, subscriberIn, f.forwardMessage, ) - router.AddMiddleware(config.Middlewares...) + handler.AddMiddleware(config.Middlewares...) return f, nil } diff --git a/docs/build.sh b/docs/build.sh index ef828cfc1..e771e29de 100755 --- a/docs/build.sh +++ b/docs/build.sh @@ -43,8 +43,13 @@ else "components/cqrs/command_bus.go" "components/cqrs/command_processor.go" + "components/cqrs/command_handler.go" + "components/cqrs/event_bus.go" "components/cqrs/event_processor.go" + "components/cqrs/event_processor_group.go" + "components/cqrs/event_handler.go" + "components/cqrs/marshaler.go" "components/cqrs/cqrs.go" "components/cqrs/marshaler.go" diff --git a/docs/content/docs/cqrs.md b/docs/content/docs/cqrs.md index cafe5fff9..81e91bd4b 100644 --- a/docs/content/docs/cqrs.md +++ b/docs/content/docs/cqrs.md @@ -1,6 +1,6 @@ +++ title = "CQRS Component" -description = "Command Query Responsibility Segregation (CQRS) Component" +description = "Build CQRS and Event-Driven applications" date = 2019-02-12T12:47:30+01:00 weight = -400 draft = false @@ -16,58 +16,84 @@ toc = true > > Source: [www.cqrs.nu FAQ](http://www.cqrs.nu/Faq/command-query-responsibility-segregation) -### Glossary - ![CQRS Schema](https://threedots.tech/watermill-io/cqrs-big-picture.svg) -#### Command +The `cqrs` component provides some useful abstractions built on top of Pub/Sub and Router that help to implement the CQRS pattern. -The command is a simple data structure, representing the request for executing some operation. +You don't need to implement the entire CQRS. It's very common to use just the event part of this component to build event-driven applications. -#### Command Bus +### Building blocks + +#### Event + +The event represents something that already took place. Events are immutable. + +#### Event Bus {{% render-md %}} -{{% load-snippet-partial file="src-link/components/cqrs/command_bus.go" first_line_contains="// CommandBus" last_line_contains="type CommandBus" padding_after="0" %}} +{{% load-snippet-partial file="src-link/components/cqrs/event_bus.go" first_line_contains="// EventBus" last_line_contains="type EventBus" padding_after="0" %}} {{% /render-md %}} -#### Command Processor +{{% render-md %}} +{{% load-snippet-partial file="src-link/components/cqrs/event_bus.go" first_line_contains="type EventBusConfig" last_line_contains="func (c *EventBusConfig) setDefaults()" padding_after="4" %}} +{{% /render-md %}} + +#### Event Processor {{% render-md %}} -{{% load-snippet-partial file="src-link/components/cqrs/command_processor.go" first_line_contains="// CommandProcessor" last_line_contains="type CommandProcessor" padding_after="0" %}} +{{% load-snippet-partial file="src-link/components/cqrs/event_processor.go" first_line_contains="// EventProcessor" last_line_contains="type EventProcessor" padding_after="0" %}} {{% /render-md %}} -#### Command Handler +{{% render-md %}} +{{% load-snippet-partial file="src-link/components/cqrs/event_processor.go" first_line_contains="type EventProcessorConfig" last_line_contains="func (c *EventProcessorConfig) setDefaults()" padding_after="4" %}} +{{% /render-md %}} + +#### Event Group Processor {{% render-md %}} -{{% load-snippet-partial file="src-link/components/cqrs/command_processor.go" first_line_contains="// CommandHandler" last_line_contains="type CommandHandler" padding_after="0" %}} +{{% load-snippet-partial file="src-link/components/cqrs/event_processor_group.go" first_line_contains="// EventGroupProcessor" last_line_contains="type EventGroupProcessor" padding_after="0" %}} {{% /render-md %}} -#### Event +{{% render-md %}} +{{% load-snippet-partial file="src-link/components/cqrs/event_processor_group.go" first_line_contains="type EventGroupProcessorConfig" last_line_contains="func (c *EventGroupProcessorConfig) setDefaults()" padding_after="4" %}} +{{% /render-md %}} -The event represents something that already took place. Events are immutable. +Learn more in [Event Group Processor](#event-handler-groups). -#### Event Bus +#### Event Handler {{% render-md %}} -{{% load-snippet-partial file="src-link/components/cqrs/event_bus.go" first_line_contains="// EventBus" last_line_contains="type EventBus" padding_after="0" %}} +{{% load-snippet-partial file="src-link/components/cqrs/event_handler.go" first_line_contains="// EventHandler" last_line_contains="type EventHandler" padding_after="0" %}} {{% /render-md %}} -#### Event Processor +#### Command + +The command is a simple data structure, representing the request for executing some operation. + +#### Command Bus {{% render-md %}} -{{% load-snippet-partial file="src-link/components/cqrs/event_processor.go" first_line_contains="// EventProcessor" last_line_contains="type EventProcessor" padding_after="0" %}} +{{% load-snippet-partial file="src-link/components/cqrs/command_bus.go" first_line_contains="// CommandBus" last_line_contains="type CommandBus" padding_after="0" %}} {{% /render-md %}} -#### Event Handler +{{% render-md %}} +{{% load-snippet-partial file="src-link/components/cqrs/command_bus.go" first_line_contains="type CommandBusConfig" last_line_contains="func (c *CommandBusConfig) setDefaults()" padding_after="4" %}} +{{% /render-md %}} + +#### Command Processor + +{{% render-md %}} +{{% load-snippet-partial file="src-link/components/cqrs/command_processor.go" first_line_contains="// CommandProcessor" last_line_contains="type CommandProcessor" padding_after="0" %}} +{{% /render-md %}} {{% render-md %}} -{{% load-snippet-partial file="src-link/components/cqrs/event_processor.go" first_line_contains="// EventHandler" last_line_contains="type EventHandler" padding_after="0" %}} +{{% load-snippet-partial file="src-link/components/cqrs/command_processor.go" first_line_contains="type CommandProcessorConfig" last_line_contains="func (c *CommandProcessorConfig) setDefaults()" padding_after="4" %}} {{% /render-md %}} -#### CQRS Facade +#### Command Handler {{% render-md %}} -{{% load-snippet-partial file="src-link/components/cqrs/cqrs.go" first_line_contains="// Facade" last_line_contains="type Facade" padding_after="0" %}} +{{% load-snippet-partial file="src-link/components/cqrs/command_handler.go" first_line_contains="// CommandHandler" last_line_contains="type CommandHandler" padding_after="0" %}} {{% /render-md %}} #### Command and Event Marshaler @@ -129,23 +155,65 @@ As mentioned before, we want to order a beer every time when a room is booked (* `OrderBeerHandler` is very similar to `BookRoomHandler`. The only difference is, that it sometimes returns an error when there are not enough beers, which causes redelivery of the command. You can find the entire implementation in the [example source code](https://github.com/ThreeDotsLabs/watermill/tree/master/_examples/basic/5-cqrs-protobuf/?utm_source=cqrs_doc). +### Event Handler groups + +By default, each event handler has a separate subscriber instance. +It works fine, if just one event type is sent to the topic. + +In the scenario, when we have multiple event types on one topic, you have two options: + +1. You can set `EventConfig.AckOnUnknownEvent` to true - it will acknowledge all events that are not handled by handler, +2. You can use Event Handler groups mechanism. + +To use event groups, you need to set `GenerateHandlerGroupSubscribeTopic` and `GroupSubscriberConstructor` options in [`EventConfig`](#event-config). + +After that, you can use `AddHandlersGroup` on [`EventProcessor`](#event-processor). + +{{% render-md %}} +{{% load-snippet-partial file="src-link/_examples/basic/5-cqrs-protobuf/main.go" first_line_contains="eventProcessor.AddHandlersGroup(" last_line_contains="if err != nil {" padding_after="0" %}} +{{% /render-md %}} + +Both `GenerateHandlerGroupSubscribeTopic` and `GroupSubscriberConstructor` receives information about group name in function arguments. + +### Generic handlers + +Since Watermill v1.3 it's possible to use generic handlers for commands and events. It's useful when you have a lot of commands/events and you don't want to create a handler for each of them. + +{{% render-md %}} +{{% load-snippet-partial file="src-link/_examples/basic/5-cqrs-protobuf/main.go" first_line_contains="cqrs.NewGroupEventHandler" last_line_contains="})," padding_after="0" %}} +{{% /render-md %}} + +Under the hood, it creates EventHandler or CommandHandler implementation. +It's available for all kind of handlers. + +{{% render-md %}} +{{% load-snippet-partial file="src-link/components/cqrs/command_handler.go" first_line_contains="// NewCommandHandler" last_line_contains="func NewCommandHandler" padding_after="0" %}} +{{% /render-md %}} + +{{% render-md %}} +{{% load-snippet-partial file="src-link/components/cqrs/event_handler.go" first_line_contains="// NewEventHandler" last_line_contains="func NewEventHandler" padding_after="0" %}} +{{% /render-md %}} + +{{% render-md %}} +{{% load-snippet-partial file="src-link/components/cqrs/event_handler.go" first_line_contains="// NewGroupEventHandler" last_line_contains="func NewGroupEventHandler" padding_after="0" %}} +{{% /render-md %}} + ### Building a read model with the event handler {{% render-md %}} {{% load-snippet-partial file="src-link/_examples/basic/5-cqrs-protobuf/main.go" first_line_contains="// BookingsFinancialReport is a read model" last_line_contains="func main() {" padding_after="0" %}} {{% /render-md %}} -### Wiring it up - the CQRS facade +### Wiring it up -We have all the blocks to build our CQRS application. We now need to use some kind of glue to wire it up. +We have all the blocks to build our CQRS application. -We will use the simplest in-memory messaging infrastructure: [GoChannel]({{< ref "/pubsubs/gochannel" >}}). +We will use the AMQP (RabbitMQ) as our message broker: [AMQP]({{< ref "/pubsubs/amqp" >}}). Under the hood, CQRS is using Watermill's message router. If you are not familiar with it and want to learn how it works, you should check [Getting Started guide]({{< ref "getting-started" >}}). It will also show you how to use some standard messaging patterns, like metrics, poison queue, throttling, correlation and other tools used by every message-driven application. Those come built-in with Watermill. Let's go back to the CQRS. As you already know, CQRS is built from multiple components, like Command or Event buses, handlers, processors, etc. -To simplify creating all these building blocks, we created `cqrs.Facade`, which creates all of them. {{% render-md %}} {{% load-snippet-partial file="src-link/_examples/basic/5-cqrs-protobuf/main.go" first_line_contains="main() {" last_line_contains="err := router.Run(" padding_after="3" %}} diff --git a/go.mod b/go.mod index 694d1b08c..c570dfd06 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module github.com/ThreeDotsLabs/watermill -go 1.17 +go 1.20 require ( github.com/cenkalti/backoff/v3 v3.2.2 @@ -13,7 +13,7 @@ require ( github.com/oklog/ulid v1.3.1 github.com/pkg/errors v0.9.1 github.com/prometheus/client_golang v1.14.0 - github.com/stretchr/testify v1.8.1 + github.com/stretchr/testify v1.8.4 google.golang.org/protobuf v1.28.1 ) @@ -28,6 +28,7 @@ require ( github.com/prometheus/client_model v0.3.0 // indirect github.com/prometheus/common v0.39.0 // indirect github.com/prometheus/procfs v0.9.0 // indirect + github.com/stretchr/objx v0.5.0 // indirect golang.org/x/sys v0.4.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index 35cfc33d6..7f6dba3ae 100644 --- a/go.sum +++ b/go.sum @@ -227,6 +227,7 @@ github.com/sirupsen/logrus v1.6.0/go.mod h1:7uNnSEd1DgxDLC74fIahvMZmmYsHGZGEOFrf github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= +github.com/stretchr/objx v0.5.0 h1:1zr/of2m5FGMsad5YfcqgdqdWrIhu+EBEJRhR1U7z/c= github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= @@ -235,6 +236,8 @@ github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/ github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk= github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= +github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= +github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= github.com/yuin/goldmark v1.1.25/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.1.32/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= diff --git a/message/router.go b/message/router.go index 63f77587a..32e1dd301 100644 --- a/message/router.go +++ b/message/router.go @@ -391,6 +391,8 @@ func (r *Router) RunHandlers(ctx context.Context) error { r.handlersLock.Lock() defer r.handlersLock.Unlock() + r.logger.Info("Running router handlers", watermill.LogFields{"count": len(r.handlers)}) + for name, h := range r.handlers { name := name h := h @@ -465,7 +467,7 @@ func (r *Router) closeWhenAllHandlersStopped() { } r.handlersWg.Wait() - if r.isClosed() { + if r.IsClosed() { // already closed return } @@ -484,11 +486,16 @@ func (r *Router) closeWhenAllHandlersStopped() { // go r.Run(ctx) // <- r.Running() // fmt.Println("Router is running") +// +// Warning: for historical reasons, this channel is not aware of router closing - the channel will be closed if the router has been running and closed. func (r *Router) Running() chan struct{} { return r.running } // IsRunning returns true when router is running. +// +// Warning: for historical reasons, this method is not aware of router closing. +// If you want to know if the router was closed, use IsClosed. func (r *Router) IsRunning() bool { select { case <-r.running: @@ -544,7 +551,7 @@ func (r *Router) waitForHandlers() bool { return sync_internal.WaitGroupTimeout(&waitGroup, r.config.CloseTimeout) } -func (r *Router) isClosed() bool { +func (r *Router) IsClosed() bool { r.closedLock.Lock() defer r.closedLock.Unlock() diff --git a/message/router_test.go b/message/router_test.go index 973d1d0cb..110f23adf 100644 --- a/message/router_test.go +++ b/message/router_test.go @@ -98,6 +98,8 @@ func TestRouter_functional(t *testing.T) { defer func() { assert.True(t, r.IsRunning()) assert.NoError(t, r.Close()) + + assert.True(t, r.IsClosed()) }() <-allMessagesSent diff --git a/pubsub/gochannel/fanout.go b/pubsub/gochannel/fanout.go index 2866949ff..643709f9b 100644 --- a/pubsub/gochannel/fanout.go +++ b/pubsub/gochannel/fanout.go @@ -100,6 +100,10 @@ func (f *FanOut) Running() chan struct{} { return f.internalRouter.Running() } +func (f *FanOut) IsClosed() bool { + return f.internalRouter.IsClosed() +} + // Subscribe starts subscription to the FanOut's internal Pub/Sub. func (f *FanOut) Subscribe(ctx context.Context, topic string) (<-chan *message.Message, error) { return f.internalPubSub.Subscribe(ctx, topic) @@ -107,5 +111,14 @@ func (f *FanOut) Subscribe(ctx context.Context, topic string) (<-chan *message.M // Close closes the FanOut's internal Pub/Sub. func (f *FanOut) Close() error { - return f.internalPubSub.Close() + var err error + + if routerCloseErr := f.internalRouter.Close(); routerCloseErr != nil { + err = errors.Join(err, routerCloseErr) + } + if internalPubSubCloseErr := f.internalPubSub.Close(); internalPubSubCloseErr != nil { + err = errors.Join(err, internalPubSubCloseErr) + } + + return err } diff --git a/pubsub/gochannel/fanout_test.go b/pubsub/gochannel/fanout_test.go index 88f334d8a..f5597d4bd 100644 --- a/pubsub/gochannel/fanout_test.go +++ b/pubsub/gochannel/fanout_test.go @@ -6,6 +6,7 @@ import ( "testing" "time" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/ThreeDotsLabs/watermill" @@ -111,4 +112,6 @@ func TestFanOut_RouterClosed(t *testing.T) { err = fanout.Close() require.NoError(t, err) + + assert.True(t, fanout.IsClosed()) } diff --git a/pubsub/tests/test_pubsub.go b/pubsub/tests/test_pubsub.go index a6125b749..c380a62d4 100644 --- a/pubsub/tests/test_pubsub.go +++ b/pubsub/tests/test_pubsub.go @@ -28,10 +28,6 @@ import ( var defaultTimeout = time.Second * 15 -func init() { - rand.Seed(3) -} - // TestPubSub is a universal test suite. Every Pub/Sub implementation should pass it // before it's considered production ready. //