Skip to content

Commit

Permalink
feat(orchestration): add some events (#1286)
Browse files Browse the repository at this point in the history
  • Loading branch information
gfyrag authored Feb 27, 2024
1 parent 925e9ce commit 506f9a4
Show file tree
Hide file tree
Showing 23 changed files with 466 additions and 29 deletions.
47 changes: 47 additions & 0 deletions components/ledger/libs/publish/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"github.com/ThreeDotsLabs/watermill/message"
"github.com/ThreeDotsLabs/watermill/pubsub/gochannel"
"go.uber.org/fx"
"sync"
)

func newGoChannel() *gochannel.GoChannel {
Expand Down Expand Up @@ -95,3 +96,49 @@ func NewTopicMapperPublisherDecorator(publisher message.Publisher, topics map[st
topics: topics,
}
}

type noOpPublisher struct {
}

func (n noOpPublisher) Publish(topic string, messages ...*message.Message) error {
return nil
}

func (n noOpPublisher) Close() error {
return nil
}

var NoOpPublisher message.Publisher = &noOpPublisher{}

type memoryPublisher struct {
sync.Mutex
messages map[string][]*message.Message
}

func (m *memoryPublisher) Publish(topic string, messages ...*message.Message) error {
m.Lock()
defer m.Unlock()

m.messages[topic] = append(m.messages[topic], messages...)
return nil
}

func (m *memoryPublisher) Close() error {
m.Lock()
defer m.Unlock()

m.messages = map[string][]*message.Message{}
return nil
}

func (m *memoryPublisher) AllMessages() map[string][]*message.Message {
return m.messages
}

var _ message.Publisher = (*memoryPublisher)(nil)

func InMemory() *memoryPublisher {
return &memoryPublisher{
messages: map[string][]*message.Message{},
}
}
31 changes: 27 additions & 4 deletions ee/orchestration/internal/triggers/activity.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"encoding/json"
"strings"

"github.com/ThreeDotsLabs/watermill/message"
"github.com/formancehq/orchestration/pkg/events"
sharedlogging "github.com/formancehq/stack/libs/go-libs/logging"

"go.temporal.io/sdk/temporal"
Expand All @@ -21,6 +23,7 @@ type Activities struct {
db *bun.DB
manager *workflow.WorkflowManager
expressionEvaluator *expressionEvaluator
publisher message.Publisher
}

func (a Activities) processTrigger(ctx context.Context, request ProcessEventRequest, trigger Trigger) bool {
Expand Down Expand Up @@ -74,7 +77,7 @@ func (a Activities) ListTriggers(ctx context.Context, request ProcessEventReques
return ret, nil
}

func (a Activities) ProcessTrigger(ctx context.Context, trigger Trigger, request ProcessEventRequest) error {
func (a Activities) ProcessTrigger(ctx context.Context, trigger Trigger, request ProcessEventRequest) (*Occurrence, error) {

span := trace.SpanFromContext(ctx)
var (
Expand All @@ -95,7 +98,7 @@ func (a Activities) ProcessTrigger(ctx context.Context, trigger Trigger, request

instance, triggerError := a.manager.RunWorkflow(ctx, trigger.WorkflowID, evaluated)
if triggerError != nil {
return triggerError
return nil, triggerError
}

occurrence.WorkflowInstanceID = pointer.For(instance.ID)
Expand All @@ -113,16 +116,36 @@ func (a Activities) ProcessTrigger(ctx context.Context, trigger Trigger, request
sharedlogging.FromContext(ctx).Errorf("unable to save trigger occurrence: %s", err)
}

return triggerError
return &occurrence, nil
}

func NewActivities(db *bun.DB, manager *workflow.WorkflowManager, expressionEvaluator *expressionEvaluator) Activities {
func (a Activities) SendEventForTriggerTermination(ctx context.Context, occurrence Occurrence) error {
if occurrence.Error == nil || *occurrence.Error == "" {
return a.publisher.Publish(events.SucceededTrigger,
events.NewMessage(ctx, events.SucceededTrigger, events.SucceededTriggerPayload{
ID: occurrence.TriggerID,
EventID: occurrence.EventID,
}))
} else {
return a.publisher.Publish(events.FailedTrigger,
events.NewMessage(ctx, events.FailedTrigger, events.FailedTriggerPayload{
ID: occurrence.TriggerID,
Error: *occurrence.Error,
EventID: occurrence.EventID,
}))
}
}

func NewActivities(db *bun.DB, manager *workflow.WorkflowManager,
expressionEvaluator *expressionEvaluator, publisher message.Publisher) Activities {
return Activities{
db: db,
manager: manager,
expressionEvaluator: expressionEvaluator,
publisher: publisher,
}
}

var ProcessEventActivity = Activities{}.ProcessTrigger
var SendEventForTriggerTermination = Activities{}.SendEventForTriggerTermination
var ListTriggersActivity = Activities{}.ListTriggers
5 changes: 3 additions & 2 deletions ee/orchestration/internal/triggers/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,9 @@ func NewModule(taskQueue string) fx.Option {
}, fx.As(new(any)), fx.ResultTags(`group:"workflows"`)),
),
fx.Provide(
fx.Annotate(func(db *bun.DB, manager *workflow.WorkflowManager, expressionEvaluator *expressionEvaluator) Activities {
return NewActivities(db, manager, expressionEvaluator)
fx.Annotate(func(db *bun.DB, manager *workflow.WorkflowManager,
expressionEvaluator *expressionEvaluator, publisher message.Publisher) Activities {
return NewActivities(db, manager, expressionEvaluator, publisher)
}, fx.As(new(any)), fx.ResultTags(`group:"activities"`)),
),
)
Expand Down
20 changes: 13 additions & 7 deletions ee/orchestration/internal/triggers/workflow_trigger.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,6 @@ package triggers
import (
"time"

"github.com/pkg/errors"
"go.temporal.io/sdk/temporal"

"github.com/formancehq/stack/libs/go-libs/publish"
"github.com/uptrace/bun"
temporalworkflow "go.temporal.io/sdk/workflow"
Expand Down Expand Up @@ -36,19 +33,28 @@ func (w triggerWorkflow) RunTrigger(ctx temporalworkflow.Context, req ProcessEve
}

for _, trigger := range triggers {
occurrence := &Occurrence{}
err := temporalworkflow.ExecuteActivity(
temporalworkflow.WithActivityOptions(ctx, temporalworkflow.ActivityOptions{
StartToCloseTimeout: 10 * time.Second,
}),
ProcessEventActivity,
trigger,
req,
).Get(ctx, occurrence)
if err != nil {
return err
}

err = temporalworkflow.ExecuteActivity(
temporalworkflow.WithActivityOptions(ctx, temporalworkflow.ActivityOptions{
StartToCloseTimeout: 10 * time.Second,
}),
SendEventForTriggerTermination,
occurrence,
).Get(ctx, nil)
if err != nil {
applicationError := &temporal.ApplicationError{}
if !errors.As(err, &applicationError) {
return err
}
return err
}
}

Expand Down
4 changes: 4 additions & 0 deletions ee/orchestration/internal/triggers/workflow_trigger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@ func TestWorkflow(t *testing.T) {
env.
OnActivity(ProcessEventActivity, mock.Anything, trigger, req).
Once().
Return(&Occurrence{}, nil)
env.
OnActivity(SendEventForTriggerTermination, mock.Anything, mock.Anything).
Once().
Return(nil)

env.ExecuteWorkflow(RunTrigger, req)
Expand Down
37 changes: 37 additions & 0 deletions ee/orchestration/internal/workflow/activities.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package workflow

import (
"context"

"github.com/ThreeDotsLabs/watermill/message"
"github.com/formancehq/orchestration/pkg/events"
)

type Activities struct {
publisher message.Publisher
}

func (a Activities) SendWorkflowTerminationEvent(ctx context.Context, instance Instance) error {
if instance.Error == "" {
return a.publisher.Publish(events.SucceededWorkflow,
events.NewMessage(ctx, events.SucceededWorkflow, events.SucceededWorkflowPayload{
ID: instance.WorkflowID,
InstanceID: instance.ID,
}))
} else {
return a.publisher.Publish(events.FailedWorkflow,
events.NewMessage(ctx, events.FailedWorkflow, events.FailedWorkflowPayload{
ID: instance.WorkflowID,
InstanceID: instance.ID,
Error: instance.Error,
}))
}
}

var SendWorkflowTerminationEventActivity = (&Activities{}).SendWorkflowTerminationEvent

func NewActivities(publisher message.Publisher) Activities {
return Activities{
publisher: publisher,
}
}
21 changes: 21 additions & 0 deletions ee/orchestration/internal/workflow/activities_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package workflow

import (
"testing"

"github.com/formancehq/stack/libs/go-libs/publish"
"github.com/stretchr/testify/require"
"go.temporal.io/sdk/testsuite"
)

func TestActivities(t *testing.T) {
publisher := publish.InMemory()
activities := NewActivities(publisher)

testSuite := &testsuite.WorkflowTestSuite{}
env := testSuite.NewTestActivityEnvironment()
env.RegisterActivity(activities.SendWorkflowTerminationEvent)
_, err := env.ExecuteActivity(SendWorkflowTerminationEventActivity, NewInstance("xxx"))
require.NoError(t, err)
require.NotEmpty(t, publisher.AllMessages())
}
13 changes: 13 additions & 0 deletions ee/orchestration/internal/workflow/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package workflow

import (
"context"
"time"

"github.com/uptrace/bun"
"go.temporal.io/sdk/workflow"
Expand All @@ -27,5 +28,17 @@ func (i Input) run(ctx workflow.Context, db *bun.DB) error {
Exec(context.Background()); dbErr != nil {
workflow.GetLogger(ctx).Error("error updating instance into database", "error", dbErr)
}

err = workflow.ExecuteActivity(
workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
StartToCloseTimeout: 10 * time.Second,
}),
SendWorkflowTerminationEventActivity,
instance,
).Get(ctx, nil)
if err != nil {
return err
}

return err
}
3 changes: 3 additions & 0 deletions ee/orchestration/internal/workflow/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"context"
"testing"

"github.com/stretchr/testify/mock"

"github.com/formancehq/stack/libs/go-libs/logging"

"github.com/formancehq/stack/libs/go-libs/bun/bunconnect"
Expand Down Expand Up @@ -46,6 +48,7 @@ func TestConfig(t *testing.T) {
workflows := NewWorkflows(db)
env.RegisterWorkflow(stages.RunNoOp)
env.RegisterWorkflow(workflows.Run)
env.OnActivity((&Activities{}).SendWorkflowTerminationEvent, mock.Anything, mock.Anything).Return(nil)
mockClient := &mockTemporalClient{env: env, workflows: workflows}
manager := NewManager(db, mockClient, "default")

Expand Down
1 change: 1 addition & 0 deletions ee/orchestration/internal/workflow/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ func NewModule(taskQueue string) fx.Option {
}),
fx.Provide(fx.Annotate(NewWorkflows, fx.ResultTags(`group:"workflows"`), fx.As(new(any)))),
fx.Provide(fx.Annotate(activities.New, fx.ResultTags(`group:"activities"`), fx.As(new(any)))),
fx.Provide(fx.Annotate(NewActivities, fx.ResultTags(`group:"activities"`), fx.As(new(any)))),
}

for _, schema := range stages.All() {
Expand Down
48 changes: 48 additions & 0 deletions ee/orchestration/pkg/events/events.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package events

import (
"context"
"time"

"github.com/ThreeDotsLabs/watermill/message"
"github.com/formancehq/stack/libs/go-libs/publish"
)

const (
SucceededWorkflow = "SUCCEEDED_WORKFLOW"
FailedWorkflow = "FAILED_WORKFLOW"
SucceededTrigger = "SUCCEEDED_TRIGGER"
FailedTrigger = "FAILED_TRIGGER"
)

type SucceededWorkflowPayload struct {
ID string `json:"id"`
InstanceID string `json:"instanceID"`
}

type FailedWorkflowPayload struct {
ID string `json:"id"`
InstanceID string `json:"instanceID"`
Error string `json:"error"`
}

type SucceededTriggerPayload struct {
ID string `json:"id"`
EventID string `json:"eventID"`
}

type FailedTriggerPayload struct {
ID string `json:"id"`
EventID string `json:"eventID"`
Error string `json:"error"`
}

func NewMessage(ctx context.Context, mtype string, payload any) *message.Message {
return publish.NewMessage(ctx, publish.EventMessage{
Date: time.Now(),
App: "orchestration",
Version: "v2",
Type: mtype,
Payload: payload,
})
}
12 changes: 12 additions & 0 deletions libs/events/services/orchestration/v2.0.0/FAILED_TRIGGER.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
type: object
properties:
id:
type: string
eventID:
type: string
error:
type: string
required:
- id
- eventID
- error
12 changes: 12 additions & 0 deletions libs/events/services/orchestration/v2.0.0/FAILED_WORKFLOW.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
type: object
properties:
id:
type: string
instanceID:
type: string
error:
type: string
required:
- id
- instanceID
- error
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
type: object
properties:
id:
type: string
eventID:
type: string
required:
- id
- eventID
Loading

0 comments on commit 506f9a4

Please sign in to comment.