Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added request-reply support for commands #366

Closed
wants to merge 22 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
12708e2
added support for Event Handler groups
roblaszczak Mar 14, 2023
67f92fd
Added Request/Reply support for commands
roblaszczak Apr 12, 2023
372d0cc
added OnListenForReplyFinished and PubSubRequestReplySubscriberContext
roblaszczak Apr 17, 2023
918913b
deprecated CQRS facade
roblaszczak Apr 17, 2023
acc6901
close internalRouter in gochannel.FanOut.Close()
roblaszczak Apr 21, 2023
75d7033
added back missing error return
roblaszczak Apr 27, 2023
cdadfbe
export HandledCommandMessageUuidMetadataKey
roblaszczak Apr 27, 2023
1983487
added PubSubRequestReplyOnCommandProcessedContext
roblaszczak Apr 27, 2023
1c482d4
added generic CommandHandler and EventHandler constructors
roblaszczak May 6, 2023
8e71ddb
variadic handlers in command/event processors AddHandler
roblaszczak May 6, 2023
e6036b5
added option to provide pre-existing router for forwarder
roblaszczak May 7, 2023
1b1e016
forwarder: add middlewares just to forwarder handler, not entire router
roblaszczak May 7, 2023
cc8c400
fix cqrs tests
roblaszczak Jun 4, 2023
ace0f68
added tests for new cqrs API
roblaszczak Jun 4, 2023
6c11946
re-organize commands
roblaszczak Jun 4, 2023
1734aaa
leftover
roblaszczak Jun 4, 2023
03433ec
added generic command/event handler tests
roblaszczak Jun 4, 2023
9e38ea1
cqrs configs tests
roblaszczak Jun 4, 2023
e58975a
added commands request-reply tests
roblaszczak Jun 4, 2023
f6f207c
Merge branch 'event-handler-groups-cleanup' into request-reply
roblaszczak Jun 8, 2023
a285ff0
fix mergo
roblaszczak Jun 8, 2023
e282d21
Merge branch 'event-handler-groups-cleanup' into request-reply
roblaszczak Jun 8, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions components/cqrs/command_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,12 @@ type CommandConfig struct {
OnSend OnCommandSendFn
OnHandle OnCommandHandleFn

// RequestReplyEnabled enables request-reply pattern for commands.
// Reply is sent **just** from the CommandBus.SendAndWait method.
// This configuration doesn't affect CommandBus.Send method.
RequestReplyEnabled bool
RequestReplyBackend RequestReplyBackend

Marshaler CommandEventMarshaler
Logger watermill.LoggerAdapter

Expand Down Expand Up @@ -45,6 +51,10 @@ func (c CommandConfig) Validate() error {
err = stdErrors.Join(err, errors.New("missing Marshaler"))
}

if c.RequestReplyEnabled && c.RequestReplyBackend == nil {
err = stdErrors.Join(err, errors.New("missing RequestReply.Backend"))
}

return err
}

Expand Down
19 changes: 19 additions & 0 deletions components/cqrs/command_processor.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package cqrs

import (
stdErrors "errors"
"fmt"

"github.com/pkg/errors"
Expand Down Expand Up @@ -188,10 +189,28 @@ func (p CommandProcessor) routerHandlerFunc(handler CommandHandler, logger water
Message: msg,
})

var replyErr error
// todo: test
if p.config.RequestReplyEnabled {
replyErr = p.config.RequestReplyBackend.OnCommandProcessed(msg, cmd, err)
}


if p.config.AckCommandHandlingErrors && err != nil {
// we want to nack if we are using request-reply,
// and we failed to send information about failure
// todo: test
if replyErr != nil {
return replyErr
}

logger.Error("Error when handling command", err, nil)
return nil
} else if replyErr != nil {
// todo: test
err = stdErrors.Join(err, replyErr)
}

if err != nil {
logger.Debug("Error when handling command", watermill.LogFields{"err": err})
return err
Expand Down
40 changes: 40 additions & 0 deletions components/cqrs/command_request_reply.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package cqrs

import (
"context"
"fmt"
"time"

"github.com/ThreeDotsLabs/watermill/message"
)

type RequestReplyBackend interface {
ModifyCommandMessageBeforePublish(cmdMsg *message.Message, command any) error

ListenForReply(ctx context.Context, cmdMsg *message.Message, cmd any) (<-chan CommandReply, error)

OnCommandProcessed(cmdMsg *message.Message, cmd any, handleErr error) error
}

// ReplyTimeoutError is returned when the reply timeout is exceeded.
type ReplyTimeoutError struct {
Duration time.Duration
Err error
}

func (e ReplyTimeoutError) Error() string {
return fmt.Sprintf("reply timeout after %s: %s", e.Duration, e.Err)
}

// CommandHandlerError is returned when the command handler returns an error.
type CommandHandlerError struct {
Err error
}

func (e CommandHandlerError) Error() string {
return e.Err.Error()
}

func (e CommandHandlerError) Unwrap() error {
return e.Err
}
51 changes: 51 additions & 0 deletions components/cqrs/command_request_reply_bus.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package cqrs

import (
"context"

"github.com/ThreeDotsLabs/watermill/message"
"github.com/pkg/errors"
)

type CommandReply struct {
// HandlerErr contains the error returned by the command handler or by RequestReplyBackend if sending reply failed.
//
// If error from handler is returned, CommandHandlerError is returned.
// If listening for reply timed out, HandlerErr is ReplyTimeoutError.
// If processing was successful, HandlerErr is nil.
Err error

// ReplyMsg contains the reply message from the command handler.
// Warning: ReplyMsg is nil if timeout occurred.
ReplyMsg *message.Message
}

// todo: test
// todo: add cancel func?
// SendAndWait sends command to the command bus and waits for the command execution.
func (c CommandBus) SendAndWait(ctx context.Context, cmd interface{}) (<-chan CommandReply, error) {
if !c.config.RequestReplyEnabled {
return nil, errors.New("RequestReply is not enabled in config")
}

msg, topicName, err := c.newMessage(ctx, cmd)
if err != nil {
return nil, err
}

if err := c.config.RequestReplyBackend.ModifyCommandMessageBeforePublish(msg, cmd); err != nil {
return nil, errors.Wrap(err, "cannot modify command message before publish")
}

// todo: wait for 1 reply by default?
replyChan, err := c.config.RequestReplyBackend.ListenForReply(ctx, msg, cmd)
if err != nil {
return nil, errors.Wrap(err, "cannot listen for reply")
}

if err := c.publisher.Publish(topicName, msg); err != nil {
return nil, err
}

return replyChan, nil
}
Loading