Skip to content

Commit

Permalink
feat(orchestration): add events
Browse files Browse the repository at this point in the history
  • Loading branch information
gfyrag committed Feb 27, 2024
1 parent 169f4d4 commit 3580f9c
Show file tree
Hide file tree
Showing 26 changed files with 472 additions and 33 deletions.
47 changes: 47 additions & 0 deletions components/ledger/libs/publish/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"github.com/ThreeDotsLabs/watermill/message"
"github.com/ThreeDotsLabs/watermill/pubsub/gochannel"
"go.uber.org/fx"
"sync"
)

func newGoChannel() *gochannel.GoChannel {
Expand Down Expand Up @@ -95,3 +96,49 @@ func NewTopicMapperPublisherDecorator(publisher message.Publisher, topics map[st
topics: topics,
}
}

type noOpPublisher struct {
}

func (n noOpPublisher) Publish(topic string, messages ...*message.Message) error {
return nil
}

func (n noOpPublisher) Close() error {
return nil
}

var NoOpPublisher message.Publisher = &noOpPublisher{}

type memoryPublisher struct {
sync.Mutex
messages map[string][]*message.Message
}

func (m *memoryPublisher) Publish(topic string, messages ...*message.Message) error {
m.Lock()
defer m.Unlock()

m.messages[topic] = append(m.messages[topic], messages...)
return nil
}

func (m *memoryPublisher) Close() error {
m.Lock()
defer m.Unlock()

m.messages = map[string][]*message.Message{}
return nil
}

func (m *memoryPublisher) AllMessages() map[string][]*message.Message {
return m.messages
}

var _ message.Publisher = (*memoryPublisher)(nil)

func InMemory() *memoryPublisher {
return &memoryPublisher{
messages: map[string][]*message.Message{},
}
}
1 change: 0 additions & 1 deletion ee/orchestration/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,6 @@ func NewRootCommand() *cobra.Command {
auth.InitAuthFlags(cmd.PersistentFlags())
bunconnect.InitFlags(cmd.PersistentFlags())
iam.InitFlags(cmd.PersistentFlags())
publish.InitCLIFlags(cmd)

return cmd
}
Expand Down
2 changes: 0 additions & 2 deletions ee/orchestration/cmd/worker.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package cmd

import (
"github.com/formancehq/stack/libs/go-libs/publish"
"net/http"

"github.com/formancehq/orchestration/internal/triggers"
Expand Down Expand Up @@ -34,7 +33,6 @@ func workerOptions() fx.Option {
viper.GetString(temporalTaskQueueFlag),
viper.GetStringSlice(topicsFlag),
),
publish.CLIPublisherModule("orchestration"),
)
}

Expand Down
2 changes: 1 addition & 1 deletion ee/orchestration/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ toolchain go1.21.5

require (
github.com/ThreeDotsLabs/watermill v1.3.5
github.com/davecgh/go-spew v1.1.1
github.com/expr-lang/expr v1.15.6
github.com/formancehq/formance-sdk-go/v2 v2.0.0-00010101000000-000000000000
github.com/formancehq/stack/libs/go-libs v0.0.0-20230221161632-e6dc6a89a85e
Expand Down Expand Up @@ -55,7 +56,6 @@ require (
github.com/aws/smithy-go v1.19.0 // indirect
github.com/cenkalti/backoff/v4 v4.2.1 // indirect
github.com/containerd/continuity v0.3.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/dnwe/otelsarama v0.0.0-20231212173111-631a0a53d5d4 // indirect
github.com/docker/cli v20.10.17+incompatible // indirect
github.com/docker/docker v24.0.7+incompatible // indirect
Expand Down
36 changes: 32 additions & 4 deletions ee/orchestration/internal/triggers/activity.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,12 @@ package triggers
import (
"context"
"encoding/json"
"fmt"
"strings"

"github.com/ThreeDotsLabs/watermill/message"
"github.com/davecgh/go-spew/spew"
"github.com/formancehq/orchestration/pkg/events"
sharedlogging "github.com/formancehq/stack/libs/go-libs/logging"

"go.temporal.io/sdk/temporal"
Expand All @@ -21,6 +25,7 @@ type Activities struct {
db *bun.DB
manager *workflow.WorkflowManager
expressionEvaluator *expressionEvaluator
publisher message.Publisher
}

func (a Activities) processTrigger(ctx context.Context, request ProcessEventRequest, trigger Trigger) bool {
Expand Down Expand Up @@ -74,7 +79,7 @@ func (a Activities) ListTriggers(ctx context.Context, request ProcessEventReques
return ret, nil
}

func (a Activities) ProcessTrigger(ctx context.Context, trigger Trigger, request ProcessEventRequest) error {
func (a Activities) ProcessTrigger(ctx context.Context, trigger Trigger, request ProcessEventRequest) (*Occurrence, error) {

span := trace.SpanFromContext(ctx)
var (
Expand All @@ -95,7 +100,7 @@ func (a Activities) ProcessTrigger(ctx context.Context, trigger Trigger, request

instance, triggerError := a.manager.RunWorkflow(ctx, trigger.WorkflowID, evaluated)
if triggerError != nil {
return triggerError
return nil, triggerError
}

occurrence.WorkflowInstanceID = pointer.For(instance.ID)
Expand All @@ -113,16 +118,39 @@ func (a Activities) ProcessTrigger(ctx context.Context, trigger Trigger, request
sharedlogging.FromContext(ctx).Errorf("unable to save trigger occurrence: %s", err)
}

return triggerError
return &occurrence, nil
}

func NewActivities(db *bun.DB, manager *workflow.WorkflowManager, expressionEvaluator *expressionEvaluator) Activities {
func (a Activities) SendEventForTriggerTermination(ctx context.Context, occurrence Occurrence) error {
spew.Dump(occurrence)
if occurrence.Error == nil || *occurrence.Error == "" {
fmt.Println("send succeeded")
return a.publisher.Publish(events.SucceededTrigger,
events.NewMessage(ctx, events.SucceededTrigger, events.SucceededTriggerPayload{
ID: occurrence.TriggerID,
EventID: occurrence.EventID,
}))
} else {
fmt.Println("send failure")
return a.publisher.Publish(events.FailedTrigger,
events.NewMessage(ctx, events.FailedTrigger, events.FailedTriggerPayload{
ID: occurrence.TriggerID,
Error: *occurrence.Error,
EventID: occurrence.EventID,
}))
}
}

func NewActivities(db *bun.DB, manager *workflow.WorkflowManager,
expressionEvaluator *expressionEvaluator, publisher message.Publisher) Activities {
return Activities{
db: db,
manager: manager,
expressionEvaluator: expressionEvaluator,
publisher: publisher,
}
}

var ProcessEventActivity = Activities{}.ProcessTrigger
var SendEventForTriggerTermination = Activities{}.SendEventForTriggerTermination
var ListTriggersActivity = Activities{}.ListTriggers
5 changes: 3 additions & 2 deletions ee/orchestration/internal/triggers/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,9 @@ func NewModule(taskQueue string) fx.Option {
}, fx.As(new(any)), fx.ResultTags(`group:"workflows"`)),
),
fx.Provide(
fx.Annotate(func(db *bun.DB, manager *workflow.WorkflowManager, expressionEvaluator *expressionEvaluator) Activities {
return NewActivities(db, manager, expressionEvaluator)
fx.Annotate(func(db *bun.DB, manager *workflow.WorkflowManager,
expressionEvaluator *expressionEvaluator, publisher message.Publisher) Activities {
return NewActivities(db, manager, expressionEvaluator, publisher)
}, fx.As(new(any)), fx.ResultTags(`group:"activities"`)),
),
)
Expand Down
20 changes: 13 additions & 7 deletions ee/orchestration/internal/triggers/workflow_trigger.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,6 @@ package triggers
import (
"time"

"github.com/pkg/errors"
"go.temporal.io/sdk/temporal"

"github.com/formancehq/stack/libs/go-libs/publish"
"github.com/uptrace/bun"
temporalworkflow "go.temporal.io/sdk/workflow"
Expand Down Expand Up @@ -36,19 +33,28 @@ func (w triggerWorkflow) RunTrigger(ctx temporalworkflow.Context, req ProcessEve
}

for _, trigger := range triggers {
occurrence := &Occurrence{}
err := temporalworkflow.ExecuteActivity(
temporalworkflow.WithActivityOptions(ctx, temporalworkflow.ActivityOptions{
StartToCloseTimeout: 10 * time.Second,
}),
ProcessEventActivity,
trigger,
req,
).Get(ctx, occurrence)
if err != nil {
return err
}

err = temporalworkflow.ExecuteActivity(
temporalworkflow.WithActivityOptions(ctx, temporalworkflow.ActivityOptions{
StartToCloseTimeout: 10 * time.Second,
}),
SendEventForTriggerTermination,
occurrence,
).Get(ctx, nil)
if err != nil {
applicationError := &temporal.ApplicationError{}
if !errors.As(err, &applicationError) {
return err
}
return err
}
}

Expand Down
4 changes: 4 additions & 0 deletions ee/orchestration/internal/triggers/workflow_trigger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@ func TestWorkflow(t *testing.T) {
env.
OnActivity(ProcessEventActivity, mock.Anything, trigger, req).
Once().
Return(&Occurrence{}, nil)
env.
OnActivity(SendEventForTriggerTermination, mock.Anything, mock.Anything).
Once().
Return(nil)

env.ExecuteWorkflow(RunTrigger, req)
Expand Down
37 changes: 37 additions & 0 deletions ee/orchestration/internal/workflow/activities.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package workflow

import (
"context"

"github.com/ThreeDotsLabs/watermill/message"
"github.com/formancehq/orchestration/pkg/events"
)

type Activities struct {
publisher message.Publisher
}

func (a Activities) SendWorkflowTerminationEvent(ctx context.Context, instance Instance) error {
if instance.Error == "" {
return a.publisher.Publish(events.SucceededWorkflow,
events.NewMessage(ctx, events.SucceededWorkflow, events.SucceededWorkflowPayload{
ID: instance.WorkflowID,
InstanceID: instance.ID,
}))
} else {
return a.publisher.Publish(events.FailedWorkflow,
events.NewMessage(ctx, events.FailedWorkflow, events.FailedWorkflowPayload{
ID: instance.WorkflowID,
InstanceID: instance.ID,
Error: instance.Error,
}))
}
}

var SendWorkflowTerminationEventActivity = (&Activities{}).SendWorkflowTerminationEvent

func NewActivities(publisher message.Publisher) Activities {
return Activities{
publisher: publisher,
}
}
21 changes: 21 additions & 0 deletions ee/orchestration/internal/workflow/activities_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package workflow

import (
"testing"

"github.com/formancehq/stack/libs/go-libs/publish"
"github.com/stretchr/testify/require"
"go.temporal.io/sdk/testsuite"
)

func TestActivities(t *testing.T) {
publisher := publish.InMemory()
activities := NewActivities(publisher)

testSuite := &testsuite.WorkflowTestSuite{}
env := testSuite.NewTestActivityEnvironment()
env.RegisterActivity(activities.SendWorkflowTerminationEvent)
_, err := env.ExecuteActivity(SendWorkflowTerminationEventActivity, NewInstance("xxx"))
require.NoError(t, err)
require.NotEmpty(t, publisher.AllMessages())
}
13 changes: 13 additions & 0 deletions ee/orchestration/internal/workflow/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package workflow

import (
"context"
"time"

"github.com/uptrace/bun"
"go.temporal.io/sdk/workflow"
Expand All @@ -27,5 +28,17 @@ func (i Input) run(ctx workflow.Context, db *bun.DB) error {
Exec(context.Background()); dbErr != nil {
workflow.GetLogger(ctx).Error("error updating instance into database", "error", dbErr)
}

err = workflow.ExecuteActivity(
workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
StartToCloseTimeout: 10 * time.Second,
}),
SendWorkflowTerminationEventActivity,
instance,
).Get(ctx, nil)
if err != nil {
return err
}

return err
}
3 changes: 3 additions & 0 deletions ee/orchestration/internal/workflow/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"context"
"testing"

"github.com/stretchr/testify/mock"

"github.com/formancehq/stack/libs/go-libs/logging"

"github.com/formancehq/stack/libs/go-libs/bun/bunconnect"
Expand Down Expand Up @@ -46,6 +48,7 @@ func TestConfig(t *testing.T) {
workflows := NewWorkflows(db)
env.RegisterWorkflow(stages.RunNoOp)
env.RegisterWorkflow(workflows.Run)
env.OnActivity((&Activities{}).SendWorkflowTerminationEvent, mock.Anything, mock.Anything).Return(nil)
mockClient := &mockTemporalClient{env: env, workflows: workflows}
manager := NewManager(db, mockClient, "default")

Expand Down
1 change: 1 addition & 0 deletions ee/orchestration/internal/workflow/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ func NewModule(taskQueue string) fx.Option {
}),
fx.Provide(fx.Annotate(NewWorkflows, fx.ResultTags(`group:"workflows"`), fx.As(new(any)))),
fx.Provide(fx.Annotate(activities.New, fx.ResultTags(`group:"activities"`), fx.As(new(any)))),
fx.Provide(fx.Annotate(NewActivities, fx.ResultTags(`group:"activities"`), fx.As(new(any)))),
}

for _, schema := range stages.All() {
Expand Down
Loading

0 comments on commit 3580f9c

Please sign in to comment.