Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(orchestration): add some events #1286

Merged
merged 4 commits into from
Feb 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 47 additions & 0 deletions components/ledger/libs/publish/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"github.com/ThreeDotsLabs/watermill/message"
"github.com/ThreeDotsLabs/watermill/pubsub/gochannel"
"go.uber.org/fx"
"sync"
)

func newGoChannel() *gochannel.GoChannel {
Expand Down Expand Up @@ -95,3 +96,49 @@ func NewTopicMapperPublisherDecorator(publisher message.Publisher, topics map[st
topics: topics,
}
}

type noOpPublisher struct {
}

func (n noOpPublisher) Publish(topic string, messages ...*message.Message) error {
return nil
}

func (n noOpPublisher) Close() error {
return nil
}

var NoOpPublisher message.Publisher = &noOpPublisher{}

type memoryPublisher struct {
sync.Mutex
messages map[string][]*message.Message
}

func (m *memoryPublisher) Publish(topic string, messages ...*message.Message) error {
m.Lock()
defer m.Unlock()

m.messages[topic] = append(m.messages[topic], messages...)
return nil
}

func (m *memoryPublisher) Close() error {
m.Lock()
defer m.Unlock()

m.messages = map[string][]*message.Message{}
return nil
}

func (m *memoryPublisher) AllMessages() map[string][]*message.Message {
return m.messages
}

var _ message.Publisher = (*memoryPublisher)(nil)

func InMemory() *memoryPublisher {
return &memoryPublisher{
messages: map[string][]*message.Message{},
}
}
31 changes: 27 additions & 4 deletions ee/orchestration/internal/triggers/activity.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"encoding/json"
"strings"

"github.com/ThreeDotsLabs/watermill/message"
"github.com/formancehq/orchestration/pkg/events"
sharedlogging "github.com/formancehq/stack/libs/go-libs/logging"

"go.temporal.io/sdk/temporal"
Expand All @@ -21,6 +23,7 @@ type Activities struct {
db *bun.DB
manager *workflow.WorkflowManager
expressionEvaluator *expressionEvaluator
publisher message.Publisher
}

func (a Activities) processTrigger(ctx context.Context, request ProcessEventRequest, trigger Trigger) bool {
Expand Down Expand Up @@ -74,7 +77,7 @@ func (a Activities) ListTriggers(ctx context.Context, request ProcessEventReques
return ret, nil
}

func (a Activities) ProcessTrigger(ctx context.Context, trigger Trigger, request ProcessEventRequest) error {
func (a Activities) ProcessTrigger(ctx context.Context, trigger Trigger, request ProcessEventRequest) (*Occurrence, error) {

span := trace.SpanFromContext(ctx)
var (
Expand All @@ -95,7 +98,7 @@ func (a Activities) ProcessTrigger(ctx context.Context, trigger Trigger, request

instance, triggerError := a.manager.RunWorkflow(ctx, trigger.WorkflowID, evaluated)
if triggerError != nil {
return triggerError
return nil, triggerError
}

occurrence.WorkflowInstanceID = pointer.For(instance.ID)
Expand All @@ -113,16 +116,36 @@ func (a Activities) ProcessTrigger(ctx context.Context, trigger Trigger, request
sharedlogging.FromContext(ctx).Errorf("unable to save trigger occurrence: %s", err)
}

return triggerError
return &occurrence, nil
}

func NewActivities(db *bun.DB, manager *workflow.WorkflowManager, expressionEvaluator *expressionEvaluator) Activities {
func (a Activities) SendEventForTriggerTermination(ctx context.Context, occurrence Occurrence) error {
if occurrence.Error == nil || *occurrence.Error == "" {
return a.publisher.Publish(events.SucceededTrigger,
events.NewMessage(ctx, events.SucceededTrigger, events.SucceededTriggerPayload{
ID: occurrence.TriggerID,
EventID: occurrence.EventID,
}))
} else {
return a.publisher.Publish(events.FailedTrigger,
events.NewMessage(ctx, events.FailedTrigger, events.FailedTriggerPayload{
ID: occurrence.TriggerID,
Error: *occurrence.Error,
EventID: occurrence.EventID,
}))
}
}

func NewActivities(db *bun.DB, manager *workflow.WorkflowManager,
expressionEvaluator *expressionEvaluator, publisher message.Publisher) Activities {
return Activities{
db: db,
manager: manager,
expressionEvaluator: expressionEvaluator,
publisher: publisher,
}
}

var ProcessEventActivity = Activities{}.ProcessTrigger
var SendEventForTriggerTermination = Activities{}.SendEventForTriggerTermination
var ListTriggersActivity = Activities{}.ListTriggers
5 changes: 3 additions & 2 deletions ee/orchestration/internal/triggers/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,9 @@ func NewModule(taskQueue string) fx.Option {
}, fx.As(new(any)), fx.ResultTags(`group:"workflows"`)),
),
fx.Provide(
fx.Annotate(func(db *bun.DB, manager *workflow.WorkflowManager, expressionEvaluator *expressionEvaluator) Activities {
return NewActivities(db, manager, expressionEvaluator)
fx.Annotate(func(db *bun.DB, manager *workflow.WorkflowManager,
expressionEvaluator *expressionEvaluator, publisher message.Publisher) Activities {
return NewActivities(db, manager, expressionEvaluator, publisher)
}, fx.As(new(any)), fx.ResultTags(`group:"activities"`)),
),
)
Expand Down
20 changes: 13 additions & 7 deletions ee/orchestration/internal/triggers/workflow_trigger.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,6 @@ package triggers
import (
"time"

"github.com/pkg/errors"
"go.temporal.io/sdk/temporal"

"github.com/formancehq/stack/libs/go-libs/publish"
"github.com/uptrace/bun"
temporalworkflow "go.temporal.io/sdk/workflow"
Expand Down Expand Up @@ -36,19 +33,28 @@ func (w triggerWorkflow) RunTrigger(ctx temporalworkflow.Context, req ProcessEve
}

for _, trigger := range triggers {
occurrence := &Occurrence{}
err := temporalworkflow.ExecuteActivity(
temporalworkflow.WithActivityOptions(ctx, temporalworkflow.ActivityOptions{
StartToCloseTimeout: 10 * time.Second,
}),
ProcessEventActivity,
trigger,
req,
).Get(ctx, occurrence)
if err != nil {
return err
}

err = temporalworkflow.ExecuteActivity(
temporalworkflow.WithActivityOptions(ctx, temporalworkflow.ActivityOptions{
StartToCloseTimeout: 10 * time.Second,
}),
SendEventForTriggerTermination,
occurrence,
).Get(ctx, nil)
if err != nil {
applicationError := &temporal.ApplicationError{}
if !errors.As(err, &applicationError) {
return err
}
return err
}
}

Expand Down
4 changes: 4 additions & 0 deletions ee/orchestration/internal/triggers/workflow_trigger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@ func TestWorkflow(t *testing.T) {
env.
OnActivity(ProcessEventActivity, mock.Anything, trigger, req).
Once().
Return(&Occurrence{}, nil)
env.
OnActivity(SendEventForTriggerTermination, mock.Anything, mock.Anything).
Once().
Return(nil)

env.ExecuteWorkflow(RunTrigger, req)
Expand Down
37 changes: 37 additions & 0 deletions ee/orchestration/internal/workflow/activities.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package workflow

import (
"context"

"github.com/ThreeDotsLabs/watermill/message"
"github.com/formancehq/orchestration/pkg/events"
)

type Activities struct {
publisher message.Publisher
}

func (a Activities) SendWorkflowTerminationEvent(ctx context.Context, instance Instance) error {
if instance.Error == "" {
return a.publisher.Publish(events.SucceededWorkflow,
events.NewMessage(ctx, events.SucceededWorkflow, events.SucceededWorkflowPayload{
ID: instance.WorkflowID,
InstanceID: instance.ID,
}))
} else {
return a.publisher.Publish(events.FailedWorkflow,
events.NewMessage(ctx, events.FailedWorkflow, events.FailedWorkflowPayload{
ID: instance.WorkflowID,
InstanceID: instance.ID,
Error: instance.Error,
}))
}
}

var SendWorkflowTerminationEventActivity = (&Activities{}).SendWorkflowTerminationEvent

func NewActivities(publisher message.Publisher) Activities {
return Activities{
publisher: publisher,
}
}
21 changes: 21 additions & 0 deletions ee/orchestration/internal/workflow/activities_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package workflow

import (
"testing"

"github.com/formancehq/stack/libs/go-libs/publish"
"github.com/stretchr/testify/require"
"go.temporal.io/sdk/testsuite"
)

func TestActivities(t *testing.T) {
publisher := publish.InMemory()
activities := NewActivities(publisher)

testSuite := &testsuite.WorkflowTestSuite{}
env := testSuite.NewTestActivityEnvironment()
env.RegisterActivity(activities.SendWorkflowTerminationEvent)
_, err := env.ExecuteActivity(SendWorkflowTerminationEventActivity, NewInstance("xxx"))
require.NoError(t, err)
require.NotEmpty(t, publisher.AllMessages())
}
13 changes: 13 additions & 0 deletions ee/orchestration/internal/workflow/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package workflow

import (
"context"
"time"

"github.com/uptrace/bun"
"go.temporal.io/sdk/workflow"
Expand All @@ -27,5 +28,17 @@ func (i Input) run(ctx workflow.Context, db *bun.DB) error {
Exec(context.Background()); dbErr != nil {
workflow.GetLogger(ctx).Error("error updating instance into database", "error", dbErr)
}

err = workflow.ExecuteActivity(
workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
StartToCloseTimeout: 10 * time.Second,
}),
SendWorkflowTerminationEventActivity,
instance,
).Get(ctx, nil)
if err != nil {
return err
}

return err
}
3 changes: 3 additions & 0 deletions ee/orchestration/internal/workflow/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"context"
"testing"

"github.com/stretchr/testify/mock"

"github.com/formancehq/stack/libs/go-libs/logging"

"github.com/formancehq/stack/libs/go-libs/bun/bunconnect"
Expand Down Expand Up @@ -46,6 +48,7 @@ func TestConfig(t *testing.T) {
workflows := NewWorkflows(db)
env.RegisterWorkflow(stages.RunNoOp)
env.RegisterWorkflow(workflows.Run)
env.OnActivity((&Activities{}).SendWorkflowTerminationEvent, mock.Anything, mock.Anything).Return(nil)
mockClient := &mockTemporalClient{env: env, workflows: workflows}
manager := NewManager(db, mockClient, "default")

Expand Down
1 change: 1 addition & 0 deletions ee/orchestration/internal/workflow/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ func NewModule(taskQueue string) fx.Option {
}),
fx.Provide(fx.Annotate(NewWorkflows, fx.ResultTags(`group:"workflows"`), fx.As(new(any)))),
fx.Provide(fx.Annotate(activities.New, fx.ResultTags(`group:"activities"`), fx.As(new(any)))),
fx.Provide(fx.Annotate(NewActivities, fx.ResultTags(`group:"activities"`), fx.As(new(any)))),
}

for _, schema := range stages.All() {
Expand Down
48 changes: 48 additions & 0 deletions ee/orchestration/pkg/events/events.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package events

import (
"context"
"time"

"github.com/ThreeDotsLabs/watermill/message"
"github.com/formancehq/stack/libs/go-libs/publish"
)

const (
SucceededWorkflow = "SUCCEEDED_WORKFLOW"
FailedWorkflow = "FAILED_WORKFLOW"
SucceededTrigger = "SUCCEEDED_TRIGGER"
FailedTrigger = "FAILED_TRIGGER"
)

type SucceededWorkflowPayload struct {
ID string `json:"id"`
InstanceID string `json:"instanceID"`
}

type FailedWorkflowPayload struct {
ID string `json:"id"`
InstanceID string `json:"instanceID"`
Error string `json:"error"`
}

type SucceededTriggerPayload struct {
ID string `json:"id"`
EventID string `json:"eventID"`
}

type FailedTriggerPayload struct {
ID string `json:"id"`
EventID string `json:"eventID"`
Error string `json:"error"`
}

func NewMessage(ctx context.Context, mtype string, payload any) *message.Message {
return publish.NewMessage(ctx, publish.EventMessage{
Date: time.Now(),
App: "orchestration",
Version: "v2",
Type: mtype,
Payload: payload,
})
}
12 changes: 12 additions & 0 deletions libs/events/services/orchestration/v2.0.0/FAILED_TRIGGER.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
type: object
properties:
id:
type: string
eventID:
type: string
error:
type: string
required:
- id
- eventID
- error
12 changes: 12 additions & 0 deletions libs/events/services/orchestration/v2.0.0/FAILED_WORKFLOW.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
type: object
properties:
id:
type: string
instanceID:
type: string
error:
type: string
required:
- id
- instanceID
- error
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
type: object
properties:
id:
type: string
eventID:
type: string
required:
- id
- eventID
Loading
Loading