From 3580f9c2b3e930229d61a4190e95d021658a829b Mon Sep 17 00:00:00 2001 From: Geoffrey Ragot Date: Tue, 27 Feb 2024 12:36:34 +0100 Subject: [PATCH] feat(orchestration): add events --- components/ledger/libs/publish/module.go | 47 +++++++++ ee/orchestration/cmd/root.go | 1 - ee/orchestration/cmd/worker.go | 2 - ee/orchestration/go.mod | 2 +- .../internal/triggers/activity.go | 36 ++++++- ee/orchestration/internal/triggers/module.go | 5 +- .../internal/triggers/workflow_trigger.go | 20 ++-- .../triggers/workflow_trigger_test.go | 4 + .../internal/workflow/activities.go | 37 +++++++ .../internal/workflow/activities_test.go | 21 ++++ ee/orchestration/internal/workflow/input.go | 13 +++ .../internal/workflow/manager_test.go | 3 + ee/orchestration/internal/workflow/module.go | 1 + ee/orchestration/pkg/events/events.go | 48 +++++++++ .../orchestration/v2.0.0/FAILED_TRIGGER.yaml | 12 +++ .../orchestration/v2.0.0/FAILED_WORKFLOW.yaml | 12 +++ .../v2.0.0/SUCCEEDED_TRIGGER.yaml | 9 ++ .../v2.0.0/SUCCEEDED_WORKFLOW.yaml | 9 ++ libs/go-libs/publish/module.go | 47 +++++++++ tests/integration/internal/events.go | 21 ++-- tests/integration/internal/helpers.go | 2 +- tests/integration/internal/modules/ledger.go | 2 +- .../internal/modules/orchestration.go | 3 +- .../suite/orchestration-triggers.go | 35 ++++++- ...hestration-workflows-execute-with-error.go | 99 +++++++++++++++++++ .../suite/orchestration-workflows-execute.go | 14 +++ 26 files changed, 472 insertions(+), 33 deletions(-) create mode 100644 ee/orchestration/internal/workflow/activities.go create mode 100644 ee/orchestration/internal/workflow/activities_test.go create mode 100644 ee/orchestration/pkg/events/events.go create mode 100644 libs/events/services/orchestration/v2.0.0/FAILED_TRIGGER.yaml create mode 100644 libs/events/services/orchestration/v2.0.0/FAILED_WORKFLOW.yaml create mode 100644 libs/events/services/orchestration/v2.0.0/SUCCEEDED_TRIGGER.yaml create mode 100644 libs/events/services/orchestration/v2.0.0/SUCCEEDED_WORKFLOW.yaml diff --git a/components/ledger/libs/publish/module.go b/components/ledger/libs/publish/module.go index 7be3d900a7..b46145b2d0 100644 --- a/components/ledger/libs/publish/module.go +++ b/components/ledger/libs/publish/module.go @@ -6,6 +6,7 @@ import ( "github.com/ThreeDotsLabs/watermill/message" "github.com/ThreeDotsLabs/watermill/pubsub/gochannel" "go.uber.org/fx" + "sync" ) func newGoChannel() *gochannel.GoChannel { @@ -95,3 +96,49 @@ func NewTopicMapperPublisherDecorator(publisher message.Publisher, topics map[st topics: topics, } } + +type noOpPublisher struct { +} + +func (n noOpPublisher) Publish(topic string, messages ...*message.Message) error { + return nil +} + +func (n noOpPublisher) Close() error { + return nil +} + +var NoOpPublisher message.Publisher = &noOpPublisher{} + +type memoryPublisher struct { + sync.Mutex + messages map[string][]*message.Message +} + +func (m *memoryPublisher) Publish(topic string, messages ...*message.Message) error { + m.Lock() + defer m.Unlock() + + m.messages[topic] = append(m.messages[topic], messages...) + return nil +} + +func (m *memoryPublisher) Close() error { + m.Lock() + defer m.Unlock() + + m.messages = map[string][]*message.Message{} + return nil +} + +func (m *memoryPublisher) AllMessages() map[string][]*message.Message { + return m.messages +} + +var _ message.Publisher = (*memoryPublisher)(nil) + +func InMemory() *memoryPublisher { + return &memoryPublisher{ + messages: map[string][]*message.Message{}, + } +} diff --git a/ee/orchestration/cmd/root.go b/ee/orchestration/cmd/root.go index 09161d71d4..065474c4b1 100644 --- a/ee/orchestration/cmd/root.go +++ b/ee/orchestration/cmd/root.go @@ -85,7 +85,6 @@ func NewRootCommand() *cobra.Command { auth.InitAuthFlags(cmd.PersistentFlags()) bunconnect.InitFlags(cmd.PersistentFlags()) iam.InitFlags(cmd.PersistentFlags()) - publish.InitCLIFlags(cmd) return cmd } diff --git a/ee/orchestration/cmd/worker.go b/ee/orchestration/cmd/worker.go index 8f79b8d9cc..c3361af746 100644 --- a/ee/orchestration/cmd/worker.go +++ b/ee/orchestration/cmd/worker.go @@ -1,7 +1,6 @@ package cmd import ( - "github.com/formancehq/stack/libs/go-libs/publish" "net/http" "github.com/formancehq/orchestration/internal/triggers" @@ -34,7 +33,6 @@ func workerOptions() fx.Option { viper.GetString(temporalTaskQueueFlag), viper.GetStringSlice(topicsFlag), ), - publish.CLIPublisherModule("orchestration"), ) } diff --git a/ee/orchestration/go.mod b/ee/orchestration/go.mod index 6df7f87f1f..5d845bb61a 100644 --- a/ee/orchestration/go.mod +++ b/ee/orchestration/go.mod @@ -6,6 +6,7 @@ toolchain go1.21.5 require ( github.com/ThreeDotsLabs/watermill v1.3.5 + github.com/davecgh/go-spew v1.1.1 github.com/expr-lang/expr v1.15.6 github.com/formancehq/formance-sdk-go/v2 v2.0.0-00010101000000-000000000000 github.com/formancehq/stack/libs/go-libs v0.0.0-20230221161632-e6dc6a89a85e @@ -55,7 +56,6 @@ require ( github.com/aws/smithy-go v1.19.0 // indirect github.com/cenkalti/backoff/v4 v4.2.1 // indirect github.com/containerd/continuity v0.3.0 // indirect - github.com/davecgh/go-spew v1.1.1 // indirect github.com/dnwe/otelsarama v0.0.0-20231212173111-631a0a53d5d4 // indirect github.com/docker/cli v20.10.17+incompatible // indirect github.com/docker/docker v24.0.7+incompatible // indirect diff --git a/ee/orchestration/internal/triggers/activity.go b/ee/orchestration/internal/triggers/activity.go index c1108119e4..1303936b02 100644 --- a/ee/orchestration/internal/triggers/activity.go +++ b/ee/orchestration/internal/triggers/activity.go @@ -3,8 +3,12 @@ package triggers import ( "context" "encoding/json" + "fmt" "strings" + "github.com/ThreeDotsLabs/watermill/message" + "github.com/davecgh/go-spew/spew" + "github.com/formancehq/orchestration/pkg/events" sharedlogging "github.com/formancehq/stack/libs/go-libs/logging" "go.temporal.io/sdk/temporal" @@ -21,6 +25,7 @@ type Activities struct { db *bun.DB manager *workflow.WorkflowManager expressionEvaluator *expressionEvaluator + publisher message.Publisher } func (a Activities) processTrigger(ctx context.Context, request ProcessEventRequest, trigger Trigger) bool { @@ -74,7 +79,7 @@ func (a Activities) ListTriggers(ctx context.Context, request ProcessEventReques return ret, nil } -func (a Activities) ProcessTrigger(ctx context.Context, trigger Trigger, request ProcessEventRequest) error { +func (a Activities) ProcessTrigger(ctx context.Context, trigger Trigger, request ProcessEventRequest) (*Occurrence, error) { span := trace.SpanFromContext(ctx) var ( @@ -95,7 +100,7 @@ func (a Activities) ProcessTrigger(ctx context.Context, trigger Trigger, request instance, triggerError := a.manager.RunWorkflow(ctx, trigger.WorkflowID, evaluated) if triggerError != nil { - return triggerError + return nil, triggerError } occurrence.WorkflowInstanceID = pointer.For(instance.ID) @@ -113,16 +118,39 @@ func (a Activities) ProcessTrigger(ctx context.Context, trigger Trigger, request sharedlogging.FromContext(ctx).Errorf("unable to save trigger occurrence: %s", err) } - return triggerError + return &occurrence, nil } -func NewActivities(db *bun.DB, manager *workflow.WorkflowManager, expressionEvaluator *expressionEvaluator) Activities { +func (a Activities) SendEventForTriggerTermination(ctx context.Context, occurrence Occurrence) error { + spew.Dump(occurrence) + if occurrence.Error == nil || *occurrence.Error == "" { + fmt.Println("send succeeded") + return a.publisher.Publish(events.SucceededTrigger, + events.NewMessage(ctx, events.SucceededTrigger, events.SucceededTriggerPayload{ + ID: occurrence.TriggerID, + EventID: occurrence.EventID, + })) + } else { + fmt.Println("send failure") + return a.publisher.Publish(events.FailedTrigger, + events.NewMessage(ctx, events.FailedTrigger, events.FailedTriggerPayload{ + ID: occurrence.TriggerID, + Error: *occurrence.Error, + EventID: occurrence.EventID, + })) + } +} + +func NewActivities(db *bun.DB, manager *workflow.WorkflowManager, + expressionEvaluator *expressionEvaluator, publisher message.Publisher) Activities { return Activities{ db: db, manager: manager, expressionEvaluator: expressionEvaluator, + publisher: publisher, } } var ProcessEventActivity = Activities{}.ProcessTrigger +var SendEventForTriggerTermination = Activities{}.SendEventForTriggerTermination var ListTriggersActivity = Activities{}.ListTriggers diff --git a/ee/orchestration/internal/triggers/module.go b/ee/orchestration/internal/triggers/module.go index 89534c7c1a..03cac9aea2 100644 --- a/ee/orchestration/internal/triggers/module.go +++ b/ee/orchestration/internal/triggers/module.go @@ -24,8 +24,9 @@ func NewModule(taskQueue string) fx.Option { }, fx.As(new(any)), fx.ResultTags(`group:"workflows"`)), ), fx.Provide( - fx.Annotate(func(db *bun.DB, manager *workflow.WorkflowManager, expressionEvaluator *expressionEvaluator) Activities { - return NewActivities(db, manager, expressionEvaluator) + fx.Annotate(func(db *bun.DB, manager *workflow.WorkflowManager, + expressionEvaluator *expressionEvaluator, publisher message.Publisher) Activities { + return NewActivities(db, manager, expressionEvaluator, publisher) }, fx.As(new(any)), fx.ResultTags(`group:"activities"`)), ), ) diff --git a/ee/orchestration/internal/triggers/workflow_trigger.go b/ee/orchestration/internal/triggers/workflow_trigger.go index 1415bb6381..2e4d994517 100644 --- a/ee/orchestration/internal/triggers/workflow_trigger.go +++ b/ee/orchestration/internal/triggers/workflow_trigger.go @@ -3,9 +3,6 @@ package triggers import ( "time" - "github.com/pkg/errors" - "go.temporal.io/sdk/temporal" - "github.com/formancehq/stack/libs/go-libs/publish" "github.com/uptrace/bun" temporalworkflow "go.temporal.io/sdk/workflow" @@ -36,6 +33,7 @@ func (w triggerWorkflow) RunTrigger(ctx temporalworkflow.Context, req ProcessEve } for _, trigger := range triggers { + occurrence := &Occurrence{} err := temporalworkflow.ExecuteActivity( temporalworkflow.WithActivityOptions(ctx, temporalworkflow.ActivityOptions{ StartToCloseTimeout: 10 * time.Second, @@ -43,12 +41,20 @@ func (w triggerWorkflow) RunTrigger(ctx temporalworkflow.Context, req ProcessEve ProcessEventActivity, trigger, req, + ).Get(ctx, occurrence) + if err != nil { + return err + } + + err = temporalworkflow.ExecuteActivity( + temporalworkflow.WithActivityOptions(ctx, temporalworkflow.ActivityOptions{ + StartToCloseTimeout: 10 * time.Second, + }), + SendEventForTriggerTermination, + occurrence, ).Get(ctx, nil) if err != nil { - applicationError := &temporal.ApplicationError{} - if !errors.As(err, &applicationError) { - return err - } + return err } } diff --git a/ee/orchestration/internal/triggers/workflow_trigger_test.go b/ee/orchestration/internal/triggers/workflow_trigger_test.go index e22131163e..057042efae 100644 --- a/ee/orchestration/internal/triggers/workflow_trigger_test.go +++ b/ee/orchestration/internal/triggers/workflow_trigger_test.go @@ -38,6 +38,10 @@ func TestWorkflow(t *testing.T) { env. OnActivity(ProcessEventActivity, mock.Anything, trigger, req). Once(). + Return(&Occurrence{}, nil) + env. + OnActivity(SendEventForTriggerTermination, mock.Anything, mock.Anything). + Once(). Return(nil) env.ExecuteWorkflow(RunTrigger, req) diff --git a/ee/orchestration/internal/workflow/activities.go b/ee/orchestration/internal/workflow/activities.go new file mode 100644 index 0000000000..85ad93a291 --- /dev/null +++ b/ee/orchestration/internal/workflow/activities.go @@ -0,0 +1,37 @@ +package workflow + +import ( + "context" + + "github.com/ThreeDotsLabs/watermill/message" + "github.com/formancehq/orchestration/pkg/events" +) + +type Activities struct { + publisher message.Publisher +} + +func (a Activities) SendWorkflowTerminationEvent(ctx context.Context, instance Instance) error { + if instance.Error == "" { + return a.publisher.Publish(events.SucceededWorkflow, + events.NewMessage(ctx, events.SucceededWorkflow, events.SucceededWorkflowPayload{ + ID: instance.WorkflowID, + InstanceID: instance.ID, + })) + } else { + return a.publisher.Publish(events.FailedWorkflow, + events.NewMessage(ctx, events.FailedWorkflow, events.FailedWorkflowPayload{ + ID: instance.WorkflowID, + InstanceID: instance.ID, + Error: instance.Error, + })) + } +} + +var SendWorkflowTerminationEventActivity = (&Activities{}).SendWorkflowTerminationEvent + +func NewActivities(publisher message.Publisher) Activities { + return Activities{ + publisher: publisher, + } +} diff --git a/ee/orchestration/internal/workflow/activities_test.go b/ee/orchestration/internal/workflow/activities_test.go new file mode 100644 index 0000000000..dc531ef75f --- /dev/null +++ b/ee/orchestration/internal/workflow/activities_test.go @@ -0,0 +1,21 @@ +package workflow + +import ( + "testing" + + "github.com/formancehq/stack/libs/go-libs/publish" + "github.com/stretchr/testify/require" + "go.temporal.io/sdk/testsuite" +) + +func TestActivities(t *testing.T) { + publisher := publish.InMemory() + activities := NewActivities(publisher) + + testSuite := &testsuite.WorkflowTestSuite{} + env := testSuite.NewTestActivityEnvironment() + env.RegisterActivity(activities.SendWorkflowTerminationEvent) + _, err := env.ExecuteActivity(SendWorkflowTerminationEventActivity, NewInstance("xxx")) + require.NoError(t, err) + require.NotEmpty(t, publisher.AllMessages()) +} diff --git a/ee/orchestration/internal/workflow/input.go b/ee/orchestration/internal/workflow/input.go index 2f6e912766..17cd3dc10a 100644 --- a/ee/orchestration/internal/workflow/input.go +++ b/ee/orchestration/internal/workflow/input.go @@ -2,6 +2,7 @@ package workflow import ( "context" + "time" "github.com/uptrace/bun" "go.temporal.io/sdk/workflow" @@ -27,5 +28,17 @@ func (i Input) run(ctx workflow.Context, db *bun.DB) error { Exec(context.Background()); dbErr != nil { workflow.GetLogger(ctx).Error("error updating instance into database", "error", dbErr) } + + err = workflow.ExecuteActivity( + workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ + StartToCloseTimeout: 10 * time.Second, + }), + SendWorkflowTerminationEventActivity, + instance, + ).Get(ctx, nil) + if err != nil { + return err + } + return err } diff --git a/ee/orchestration/internal/workflow/manager_test.go b/ee/orchestration/internal/workflow/manager_test.go index 1c3135a0e5..02f87f75bf 100644 --- a/ee/orchestration/internal/workflow/manager_test.go +++ b/ee/orchestration/internal/workflow/manager_test.go @@ -4,6 +4,8 @@ import ( "context" "testing" + "github.com/stretchr/testify/mock" + "github.com/formancehq/stack/libs/go-libs/logging" "github.com/formancehq/stack/libs/go-libs/bun/bunconnect" @@ -46,6 +48,7 @@ func TestConfig(t *testing.T) { workflows := NewWorkflows(db) env.RegisterWorkflow(stages.RunNoOp) env.RegisterWorkflow(workflows.Run) + env.OnActivity((&Activities{}).SendWorkflowTerminationEvent, mock.Anything, mock.Anything).Return(nil) mockClient := &mockTemporalClient{env: env, workflows: workflows} manager := NewManager(db, mockClient, "default") diff --git a/ee/orchestration/internal/workflow/module.go b/ee/orchestration/internal/workflow/module.go index a2d8ded884..31430ba907 100644 --- a/ee/orchestration/internal/workflow/module.go +++ b/ee/orchestration/internal/workflow/module.go @@ -15,6 +15,7 @@ func NewModule(taskQueue string) fx.Option { }), fx.Provide(fx.Annotate(NewWorkflows, fx.ResultTags(`group:"workflows"`), fx.As(new(any)))), fx.Provide(fx.Annotate(activities.New, fx.ResultTags(`group:"activities"`), fx.As(new(any)))), + fx.Provide(fx.Annotate(NewActivities, fx.ResultTags(`group:"activities"`), fx.As(new(any)))), } for _, schema := range stages.All() { diff --git a/ee/orchestration/pkg/events/events.go b/ee/orchestration/pkg/events/events.go new file mode 100644 index 0000000000..4909d2f1e5 --- /dev/null +++ b/ee/orchestration/pkg/events/events.go @@ -0,0 +1,48 @@ +package events + +import ( + "context" + "time" + + "github.com/ThreeDotsLabs/watermill/message" + "github.com/formancehq/stack/libs/go-libs/publish" +) + +const ( + SucceededWorkflow = "SUCCEEDED_WORKFLOW" + FailedWorkflow = "FAILED_WORKFLOW" + SucceededTrigger = "SUCCEEDED_TRIGGER" + FailedTrigger = "FAILED_TRIGGER" +) + +type SucceededWorkflowPayload struct { + ID string `json:"id"` + InstanceID string `json:"instanceID"` +} + +type FailedWorkflowPayload struct { + ID string `json:"id"` + InstanceID string `json:"instanceID"` + Error string `json:"error"` +} + +type SucceededTriggerPayload struct { + ID string `json:"id"` + EventID string `json:"eventID"` +} + +type FailedTriggerPayload struct { + ID string `json:"id"` + EventID string `json:"eventID"` + Error string `json:"error"` +} + +func NewMessage(ctx context.Context, mtype string, payload any) *message.Message { + return publish.NewMessage(ctx, publish.EventMessage{ + Date: time.Now(), + App: "orchestration", + Version: "v2", + Type: mtype, + Payload: payload, + }) +} diff --git a/libs/events/services/orchestration/v2.0.0/FAILED_TRIGGER.yaml b/libs/events/services/orchestration/v2.0.0/FAILED_TRIGGER.yaml new file mode 100644 index 0000000000..7d2f8d3da7 --- /dev/null +++ b/libs/events/services/orchestration/v2.0.0/FAILED_TRIGGER.yaml @@ -0,0 +1,12 @@ +type: object +properties: + id: + type: string + eventID: + type: string + error: + type: string +required: + - id + - eventID + - error \ No newline at end of file diff --git a/libs/events/services/orchestration/v2.0.0/FAILED_WORKFLOW.yaml b/libs/events/services/orchestration/v2.0.0/FAILED_WORKFLOW.yaml new file mode 100644 index 0000000000..cd3fef0764 --- /dev/null +++ b/libs/events/services/orchestration/v2.0.0/FAILED_WORKFLOW.yaml @@ -0,0 +1,12 @@ +type: object +properties: + id: + type: string + instanceID: + type: string + error: + type: string +required: + - id + - instanceID + - error \ No newline at end of file diff --git a/libs/events/services/orchestration/v2.0.0/SUCCEEDED_TRIGGER.yaml b/libs/events/services/orchestration/v2.0.0/SUCCEEDED_TRIGGER.yaml new file mode 100644 index 0000000000..5345ac30f3 --- /dev/null +++ b/libs/events/services/orchestration/v2.0.0/SUCCEEDED_TRIGGER.yaml @@ -0,0 +1,9 @@ +type: object +properties: + id: + type: string + eventID: + type: string +required: + - id + - eventID \ No newline at end of file diff --git a/libs/events/services/orchestration/v2.0.0/SUCCEEDED_WORKFLOW.yaml b/libs/events/services/orchestration/v2.0.0/SUCCEEDED_WORKFLOW.yaml new file mode 100644 index 0000000000..3c47d3cb3f --- /dev/null +++ b/libs/events/services/orchestration/v2.0.0/SUCCEEDED_WORKFLOW.yaml @@ -0,0 +1,9 @@ +type: object +properties: + id: + type: string + instanceID: + type: string +required: + - id + - instanceID \ No newline at end of file diff --git a/libs/go-libs/publish/module.go b/libs/go-libs/publish/module.go index 7be3d900a7..b46145b2d0 100644 --- a/libs/go-libs/publish/module.go +++ b/libs/go-libs/publish/module.go @@ -6,6 +6,7 @@ import ( "github.com/ThreeDotsLabs/watermill/message" "github.com/ThreeDotsLabs/watermill/pubsub/gochannel" "go.uber.org/fx" + "sync" ) func newGoChannel() *gochannel.GoChannel { @@ -95,3 +96,49 @@ func NewTopicMapperPublisherDecorator(publisher message.Publisher, topics map[st topics: topics, } } + +type noOpPublisher struct { +} + +func (n noOpPublisher) Publish(topic string, messages ...*message.Message) error { + return nil +} + +func (n noOpPublisher) Close() error { + return nil +} + +var NoOpPublisher message.Publisher = &noOpPublisher{} + +type memoryPublisher struct { + sync.Mutex + messages map[string][]*message.Message +} + +func (m *memoryPublisher) Publish(topic string, messages ...*message.Message) error { + m.Lock() + defer m.Unlock() + + m.messages[topic] = append(m.messages[topic], messages...) + return nil +} + +func (m *memoryPublisher) Close() error { + m.Lock() + defer m.Unlock() + + m.messages = map[string][]*message.Message{} + return nil +} + +func (m *memoryPublisher) AllMessages() map[string][]*message.Message { + return m.messages +} + +var _ message.Publisher = (*memoryPublisher)(nil) + +func InMemory() *memoryPublisher { + return &memoryPublisher{ + messages: map[string][]*message.Message{}, + } +} diff --git a/tests/integration/internal/events.go b/tests/integration/internal/events.go index 2ff43b6151..ecb94cd630 100644 --- a/tests/integration/internal/events.go +++ b/tests/integration/internal/events.go @@ -14,9 +14,9 @@ func NatsClient() *nats.Conn { return natsConn } -func SubscribeLedger() (func(), chan *nats.Msg) { +func Subscribe(service string) (func(), chan *nats.Msg) { msgs := make(chan *nats.Msg) - subscription, err := NatsClient().Subscribe(fmt.Sprintf("%s-ledger", currentTest.id), func(msg *nats.Msg) { + subscription, err := NatsClient().Subscribe(fmt.Sprintf("%s-%s", currentTest.id, service), func(msg *nats.Msg) { msgs <- msg }) Expect(err).ToNot(HaveOccurred()) @@ -25,15 +25,16 @@ func SubscribeLedger() (func(), chan *nats.Msg) { }, msgs } +func SubscribeLedger() (func(), chan *nats.Msg) { + return Subscribe("ledger") +} + func SubscribePayments() (func(), chan *nats.Msg) { - msgs := make(chan *nats.Msg) - subscription, err := NatsClient().Subscribe(fmt.Sprintf("%s-payments", currentTest.id), func(msg *nats.Msg) { - msgs <- msg - }) - Expect(err).ToNot(HaveOccurred()) - return func() { - Expect(subscription.Unsubscribe()).To(Succeed()) - }, msgs + return Subscribe("payments") +} + +func SubscribeOrchestration() (func(), chan *nats.Msg) { + return Subscribe("orchestration") } func PublishPayments(message publish.EventMessage) { diff --git a/tests/integration/internal/helpers.go b/tests/integration/internal/helpers.go index 480749fc93..c401b4a9eb 100644 --- a/tests/integration/internal/helpers.go +++ b/tests/integration/internal/helpers.go @@ -42,7 +42,7 @@ func WaitOnChanWithTimeout[T any](ch chan T, timeout time.Duration) T { case t := <-ch: return t case <-time.After(timeout): - Fail("should have received an event") + Fail("should have received an event", 1) } panic("cannot happen") } diff --git a/tests/integration/internal/modules/ledger.go b/tests/integration/internal/modules/ledger.go index 2f2df9eb92..b4c983b9b4 100644 --- a/tests/integration/internal/modules/ledger.go +++ b/tests/integration/internal/modules/ledger.go @@ -14,8 +14,8 @@ var Ledger = internal.NewModule("ledger"). WithArgs(func(test *internal.Test) []string { return []string{ "serve", - "--publisher-nats-enabled", "--auth-enabled=false", + "--publisher-nats-enabled", "--publisher-nats-client-id=ledger", "--publisher-nats-url=" + internal.GetNatsAddress(), fmt.Sprintf("--publisher-topic-mapping=*:%s-ledger", test.ID()), diff --git a/tests/integration/internal/modules/orchestration.go b/tests/integration/internal/modules/orchestration.go index 22d3b7bd43..515f54e8d3 100644 --- a/tests/integration/internal/modules/orchestration.go +++ b/tests/integration/internal/modules/orchestration.go @@ -24,8 +24,9 @@ var Orchestration = internal.NewModule("orchestration"). "--temporal-task-queue=" + test.ID(), "--worker", "--publisher-nats-enabled", - "--publisher-nats-client-id=ledger", + "--publisher-nats-client-id=orchestration", "--publisher-nats-url=" + internal.GetNatsAddress(), + fmt.Sprintf("--publisher-topic-mapping=*:%s-orchestration", test.ID()), fmt.Sprintf("--topics=%s-ledger", test.ID()), fmt.Sprintf("--topics=%s-payments", test.ID()), "--debug", diff --git a/tests/integration/suite/orchestration-triggers.go b/tests/integration/suite/orchestration-triggers.go index 6f47e1c0f0..f8e14cece6 100644 --- a/tests/integration/suite/orchestration-triggers.go +++ b/tests/integration/suite/orchestration-triggers.go @@ -3,12 +3,15 @@ package suite import ( "github.com/formancehq/formance-sdk-go/v2/pkg/models/operations" "github.com/formancehq/formance-sdk-go/v2/pkg/models/shared" + orchestrationevents "github.com/formancehq/orchestration/pkg/events" paymentsevents "github.com/formancehq/payments/pkg/events" + "github.com/formancehq/stack/libs/events" "github.com/formancehq/stack/libs/go-libs/api" "github.com/formancehq/stack/libs/go-libs/publish" . "github.com/formancehq/stack/tests/integration/internal" "github.com/formancehq/stack/tests/integration/internal/modules" "github.com/google/uuid" + "github.com/nats-io/nats.go" . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" "math/big" @@ -86,7 +89,10 @@ var _ = WithModules([]*Module{modules.Auth, modules.Orchestration, modules.Ledge Expect(listTriggersResponse.V2ListTriggersResponse.Cursor.Data).Should(HaveLen(1)) }) Then("publishing a new payments in the event bus", func() { - var payment map[string]any + var ( + payment map[string]any + msgs chan *nats.Msg + ) BeforeEach(func() { payment = map[string]any{ "amount": 1000000, @@ -105,6 +111,11 @@ var _ = WithModules([]*Module{modules.Auth, modules.Orchestration, modules.Ledge Type: paymentsevents.EventTypeSavedPayments, Payload: payment, }) + var closeSubscription func() + closeSubscription, msgs = SubscribeOrchestration() + DeferCleanup(func() { + closeSubscription() + }) }) It("Should trigger the workflow", func() { var ( @@ -150,6 +161,11 @@ var _ = WithModules([]*Module{modules.Auth, modules.Orchestration, modules.Ledge To(Equal("USD/2")) Expect(listTransactionsResponse.V2TransactionsCursorResponse.Cursor.Data[0].Postings[0].Amount). To(Equal(big.NewInt(1000000))) + + By("And trigger a new succeeded workflow event", func() { + msg := WaitOnChanWithTimeout(msgs, time.Second) + Expect(events.Check(msg.Data, "orchestration", orchestrationevents.SucceededTrigger)).Should(Succeed()) + }) }) }) Then("deleting the trigger", func() { @@ -197,7 +213,10 @@ var _ = WithModules([]*Module{modules.Auth, modules.Orchestration}, func() { Expect(err).ToNot(HaveOccurred()) }) Then("publishing a new empty payment", func() { - var payment map[string]any + var ( + payment map[string]any + msgs chan *nats.Msg + ) BeforeEach(func() { payment = map[string]any{ "id": uuid.NewString(), @@ -208,8 +227,13 @@ var _ = WithModules([]*Module{modules.Auth, modules.Orchestration}, func() { Type: paymentsevents.EventTypeSavedPayments, Payload: payment, }) + var closeSubscription func() + closeSubscription, msgs = SubscribeOrchestration() + DeferCleanup(func() { + closeSubscription() + }) }) - It("Should create a trigger workflow", func() { + It("Should create a trigger workflow with an error", func() { var ( listTriggersOccurrencesResponse *operations.V2ListTriggersOccurrencesResponse err error @@ -225,6 +249,11 @@ var _ = WithModules([]*Module{modules.Auth, modules.Orchestration}, func() { g.Expect(occurrence.WorkflowInstance).To(BeNil()) return true }).Should(BeTrue()) + + By("should also trigger a new failed workflow event", func() { + msg := WaitOnChanWithTimeout(msgs, time.Second) + Expect(events.Check(msg.Data, "orchestration", orchestrationevents.FailedTrigger)).Should(Succeed()) + }) }) }) }) diff --git a/tests/integration/suite/orchestration-workflows-execute-with-error.go b/tests/integration/suite/orchestration-workflows-execute-with-error.go index a7b606d883..6df89419fe 100644 --- a/tests/integration/suite/orchestration-workflows-execute-with-error.go +++ b/tests/integration/suite/orchestration-workflows-execute-with-error.go @@ -1,9 +1,13 @@ package suite import ( + orchestrationevents "github.com/formancehq/orchestration/pkg/events" + "github.com/formancehq/stack/libs/events" "github.com/formancehq/stack/tests/integration/internal/modules" + "github.com/nats-io/nats.go" "math/big" "net/http" + "time" "github.com/formancehq/formance-sdk-go/v2/pkg/models/operations" "github.com/formancehq/formance-sdk-go/v2/pkg/models/shared" @@ -129,3 +133,98 @@ var _ = WithModules([]*Module{modules.Auth, modules.Orchestration, modules.Ledge }) }) }) + +var _ = WithModules([]*Module{modules.Auth, modules.Orchestration, modules.Ledger}, func() { + BeforeEach(func() { + createLedgerResponse, err := Client().Ledger.V2CreateLedger(TestContext(), operations.V2CreateLedgerRequest{ + Ledger: "default", + }) + Expect(err).To(BeNil()) + Expect(createLedgerResponse.StatusCode).To(Equal(http.StatusNoContent)) + }) + When("creating a new workflow which will fail with invalid request", func() { + var ( + createWorkflowResponse *shared.V2CreateWorkflowResponse + ) + BeforeEach(func() { + response, err := Client().Orchestration.V2CreateWorkflow( + TestContext(), + &shared.V2CreateWorkflowRequest{ + Name: ptr(uuid.New()), + Stages: []map[string]interface{}{ + { + "send": map[string]any{ + "source": map[string]any{ + "account": map[string]any{ + "id": "empty:account", + "ledger": "default", + }, + }, + "destination": map[string]any{ + "account": map[string]any{ + "id": "bank", + "ledger": "default", + }, + }, + "amount": map[string]any{ + "amount": -1, // Invalid amount + "asset": "EUR/2", + }, + }, + }, + }, + }, + ) + Expect(err).ToNot(HaveOccurred()) + Expect(response.StatusCode).To(Equal(201)) + + createWorkflowResponse = response.V2CreateWorkflowResponse + }) + Then("executing it", func() { + var ( + runWorkflowResponse *shared.V2RunWorkflowResponse + msgs chan *nats.Msg + ) + BeforeEach(func() { + response, err := Client().Orchestration.V2RunWorkflow( + TestContext(), + operations.V2RunWorkflowRequest{ + RequestBody: map[string]string{}, + WorkflowID: createWorkflowResponse.Data.ID, + }, + ) + Expect(err).ToNot(HaveOccurred()) + Expect(response.StatusCode).To(Equal(201)) + + runWorkflowResponse = response.V2RunWorkflowResponse + + var closeSubscription func() + closeSubscription, msgs = SubscribeOrchestration() + DeferCleanup(func() { + closeSubscription() + }) + }) + It("should declare the workflow run instance as errored", func() { + Eventually(func(g Gomega) string { + response, err := Client().Orchestration.V2GetInstanceStageHistory( + TestContext(), + operations.V2GetInstanceStageHistoryRequest{ + InstanceID: runWorkflowResponse.Data.ID, + Number: 0, + }, + ) + g.Expect(err).To(BeNil()) + g.Expect(response.StatusCode).To(Equal(200)) + g.Expect(response.V2GetWorkflowInstanceHistoryStageResponse.Data[0].Error).NotTo(BeNil()) + + return *response.V2GetWorkflowInstanceHistoryStageResponse.Data[0].Error + }).ShouldNot(BeEmpty()) + + By("and trigger a failed workflow event", func() { + msg := WaitOnChanWithTimeout(msgs, time.Second) + Expect(events.Check(msg.Data, "orchestration", orchestrationevents.FailedWorkflow)).Should(Succeed()) + }) + }) + }) + }) +}) diff --git a/tests/integration/suite/orchestration-workflows-execute.go b/tests/integration/suite/orchestration-workflows-execute.go index cabef81c1e..c0068563c7 100644 --- a/tests/integration/suite/orchestration-workflows-execute.go +++ b/tests/integration/suite/orchestration-workflows-execute.go @@ -2,7 +2,10 @@ package suite import ( "encoding/json" + orchestrationevents "github.com/formancehq/orchestration/pkg/events" + "github.com/formancehq/stack/libs/events" "github.com/formancehq/stack/tests/integration/internal/modules" + "github.com/nats-io/nats.go" "math/big" "time" @@ -19,6 +22,7 @@ var _ = WithModules([]*Module{modules.Orchestration, modules.Auth, modules.Ledge When("creating a new workflow", func() { var ( createWorkflowResponse *shared.V2CreateWorkflowResponse + msgs chan *nats.Msg ) BeforeEach(func() { response, err := Client().Orchestration.V2CreateWorkflow( @@ -56,6 +60,12 @@ var _ = WithModules([]*Module{modules.Orchestration, modules.Auth, modules.Ledge Expect(response.StatusCode).To(Equal(201)) createWorkflowResponse = response.V2CreateWorkflowResponse + + var closeSubscription func() + closeSubscription, msgs = SubscribeOrchestration() + DeferCleanup(func() { + closeSubscription() + }) }) It("should be ok", func() { Expect(createWorkflowResponse.Data.ID).NotTo(BeEmpty()) @@ -80,6 +90,10 @@ var _ = WithModules([]*Module{modules.Orchestration, modules.Auth, modules.Ledge It("should be ok", func() { Expect(runWorkflowResponse.Data.ID).NotTo(BeEmpty()) }) + It("Should trigger a succeeded workflow event", func() { + msg := WaitOnChanWithTimeout(msgs, 5*time.Second) + Expect(events.Check(msg.Data, "orchestration", orchestrationevents.SucceededWorkflow)).Should(Succeed()) + }) Then("waiting for termination", func() { var instanceResponse *shared.V2GetWorkflowInstanceResponse BeforeEach(func() {