From 8be7136d67163b7189c9a12ed6a8c53bd2730762 Mon Sep 17 00:00:00 2001 From: Geoffrey Ragot Date: Mon, 26 Feb 2024 13:59:06 +0100 Subject: [PATCH] fix(orchestration): stop trigger workflows when variables evaluation fails --- .../fctl/cmd/wallets/balances/create.go | 3 +- .../internal/storage/migrations.go | 11 ++++ .../internal/triggers/activity.go | 45 +++++++------ ee/orchestration/internal/triggers/trigger.go | 16 ++--- .../internal/triggers/workflow_trigger.go | 8 ++- ee/orchestration/openapi/v2.yaml | 4 +- releases/sdks/go/.speakeasy/gen.lock | 6 +- .../pkg/models/shared/v2triggeroccurrence.md | 15 +++-- releases/sdks/go/formance.go | 4 +- releases/sdks/go/gen.yaml | 2 +- .../pkg/models/shared/v2triggeroccurrence.go | 20 ++++-- .../suite/orchestration-triggers.go | 64 ++++++++++++++++++- 12 files changed, 148 insertions(+), 50 deletions(-) diff --git a/components/fctl/cmd/wallets/balances/create.go b/components/fctl/cmd/wallets/balances/create.go index 8b11bf02ce..32e6913ee6 100644 --- a/components/fctl/cmd/wallets/balances/create.go +++ b/components/fctl/cmd/wallets/balances/create.go @@ -2,6 +2,8 @@ package balances import ( "fmt" + "math/big" + "github.com/formancehq/fctl/cmd/wallets/internal" fctl "github.com/formancehq/fctl/pkg" "github.com/formancehq/formance-sdk-go/v2/pkg/models/operations" @@ -9,7 +11,6 @@ import ( "github.com/pkg/errors" "github.com/pterm/pterm" "github.com/spf13/cobra" - "math/big" ) type CreateStore struct { diff --git a/ee/orchestration/internal/storage/migrations.go b/ee/orchestration/internal/storage/migrations.go index 466fefaacc..a49b12f47f 100644 --- a/ee/orchestration/internal/storage/migrations.go +++ b/ee/orchestration/internal/storage/migrations.go @@ -116,6 +116,17 @@ add primary key (instance_id, stage, temporal_run_id); return nil }, }, + { + Up: func(tx bun.Tx) error { + if _, err := tx.Exec(` +alter table "triggers_occurrences" +add column error varchar; + `); err != nil { + return err + } + return nil + }, + }, } func Migrate(ctx context.Context, db *bun.DB) error { diff --git a/ee/orchestration/internal/triggers/activity.go b/ee/orchestration/internal/triggers/activity.go index ee868e6ced..303b05f8e4 100644 --- a/ee/orchestration/internal/triggers/activity.go +++ b/ee/orchestration/internal/triggers/activity.go @@ -5,6 +5,8 @@ import ( "encoding/json" "strings" + "go.temporal.io/sdk/temporal" + "github.com/formancehq/orchestration/internal/workflow" "github.com/formancehq/stack/libs/go-libs/collectionutils" "github.com/formancehq/stack/libs/go-libs/pointer" @@ -74,35 +76,42 @@ func (a Activities) ProcessTrigger(ctx context.Context, trigger Trigger, request span := trace.SpanFromContext(ctx) var ( - evaluated map[string]string - err error + evaluated map[string]string + triggerError error + occurrence = NewTriggerOccurrence(request.MessageID, trigger.ID, request.Event) ) if trigger.Vars != nil { - evaluated, err = a.expressionEvaluator.evalVariables(request.Event.Payload, trigger.Vars) - if err != nil { - span.RecordError(err) - return err - } + evaluated, triggerError = a.expressionEvaluator.evalVariables(request.Event.Payload, trigger.Vars) } + if triggerError == nil { + data, triggerError := json.Marshal(evaluated) + if triggerError != nil { + panic(triggerError) + } - data, err := json.Marshal(evaluated) - if err != nil { - panic(err) - } + span.SetAttributes(attribute.String("variables", string(data))) - span.SetAttributes(attribute.String("variables", string(data))) + instance, triggerError := a.manager.RunWorkflow(ctx, trigger.WorkflowID, evaluated) + if triggerError != nil { + return triggerError + } - instance, err := a.manager.RunWorkflow(ctx, trigger.WorkflowID, evaluated) - if err != nil { - return err + occurrence.WorkflowInstanceID = pointer.For(instance.ID) + } else { + triggerError = temporal.NewNonRetryableApplicationError("unable to eval variables", "VARIABLES_EVAL", triggerError) + span.RecordError(triggerError) + occurrence.Error = pointer.For(triggerError.Error()) } - _, err = a.db.NewInsert(). - Model(pointer.For(NewTriggerOccurrence(request.MessageID, trigger.ID, instance.ID, request.Event))). + _, err := a.db.NewInsert(). + Model(pointer.For(occurrence)). On("CONFLICT (trigger_id, event_id) DO NOTHING"). Exec(ctx) + if err != nil { + return err + } - return err + return triggerError } func NewActivities(db *bun.DB, manager *workflow.WorkflowManager, expressionEvaluator *expressionEvaluator) Activities { diff --git a/ee/orchestration/internal/triggers/trigger.go b/ee/orchestration/internal/triggers/trigger.go index 5c219aca57..ff5ab4b06c 100644 --- a/ee/orchestration/internal/triggers/trigger.go +++ b/ee/orchestration/internal/triggers/trigger.go @@ -102,18 +102,18 @@ type Occurrence struct { EventID string `json:"-" bun:"event_id,pk"` TriggerID string `json:"triggerID" bun:"trigger_id,pk"` - WorkflowInstanceID string `json:"workflowInstanceID" bun:"workflow_instance_id"` - WorkflowInstance workflow.Instance `json:"workflowInstance" bun:"rel:belongs-to,join:workflow_instance_id=id"` + WorkflowInstanceID *string `json:"workflowInstanceID,omitempty" bun:"workflow_instance_id"` + WorkflowInstance *workflow.Instance `json:"workflowInstance,omitempty" bun:"rel:belongs-to,join:workflow_instance_id=id"` Date time.Time `json:"date" bun:"date"` Event publish.EventMessage `json:"event" bun:"event"` + Error *string `json:"error,omitempty" bun:"error"` } -func NewTriggerOccurrence(eventID, triggerID, workflowInstanceID string, event publish.EventMessage) Occurrence { +func NewTriggerOccurrence(eventID, triggerID string, event publish.EventMessage) Occurrence { return Occurrence{ - TriggerID: triggerID, - EventID: eventID, - WorkflowInstanceID: workflowInstanceID, - Date: time.Now().Round(time.Microsecond).UTC(), - Event: event, + TriggerID: triggerID, + EventID: eventID, + Date: time.Now().Round(time.Microsecond).UTC(), + Event: event, } } diff --git a/ee/orchestration/internal/triggers/workflow_trigger.go b/ee/orchestration/internal/triggers/workflow_trigger.go index 6efeb566e2..1415bb6381 100644 --- a/ee/orchestration/internal/triggers/workflow_trigger.go +++ b/ee/orchestration/internal/triggers/workflow_trigger.go @@ -3,6 +3,9 @@ package triggers import ( "time" + "github.com/pkg/errors" + "go.temporal.io/sdk/temporal" + "github.com/formancehq/stack/libs/go-libs/publish" "github.com/uptrace/bun" temporalworkflow "go.temporal.io/sdk/workflow" @@ -42,7 +45,10 @@ func (w triggerWorkflow) RunTrigger(ctx temporalworkflow.Context, req ProcessEve req, ).Get(ctx, nil) if err != nil { - return err + applicationError := &temporal.ApplicationError{} + if !errors.As(err, &applicationError) { + return err + } } } diff --git a/ee/orchestration/openapi/v2.yaml b/ee/orchestration/openapi/v2.yaml index 91b7f9cfdd..0b3fadf862 100644 --- a/ee/orchestration/openapi/v2.yaml +++ b/ee/orchestration/openapi/v2.yaml @@ -659,8 +659,6 @@ components: type: object required: - triggerID - - workflowInstanceID - - workflowInstance - date - event properties: @@ -673,6 +671,8 @@ components: $ref: '#/components/schemas/V2WorkflowInstance' triggerID: type: string + error: + type: string event: type: object additionalProperties: true diff --git a/releases/sdks/go/.speakeasy/gen.lock b/releases/sdks/go/.speakeasy/gen.lock index 6ba2b00d63..8eea180b0d 100755 --- a/releases/sdks/go/.speakeasy/gen.lock +++ b/releases/sdks/go/.speakeasy/gen.lock @@ -1,12 +1,12 @@ lockVersion: 2.0.0 id: 7eac0a45-60a2-40bb-9e85-26bd77ec2a6d management: - docChecksum: c333ef5c7396d3980e1a5806c46d1862 + docChecksum: c77094cb6296aa1c7e4452b8f69787ba docVersion: v0.0.0 speakeasyVersion: internal generationVersion: 2.237.2 - releaseVersion: v0.0.0 - configChecksum: 50130ac1de5d0825ab31d45fbe3aac35 + releaseVersion: 0.0.1 + configChecksum: c299152255ec1caf1aad2b6fbca6bdc9 features: go: constsAndDefaults: 0.1.2 diff --git a/releases/sdks/go/docs/pkg/models/shared/v2triggeroccurrence.md b/releases/sdks/go/docs/pkg/models/shared/v2triggeroccurrence.md index ba4882bc28..d596f4a82d 100644 --- a/releases/sdks/go/docs/pkg/models/shared/v2triggeroccurrence.md +++ b/releases/sdks/go/docs/pkg/models/shared/v2triggeroccurrence.md @@ -3,10 +3,11 @@ ## Fields -| Field | Type | Required | Description | -| ----------------------------------------------------------------------------- | ----------------------------------------------------------------------------- | ----------------------------------------------------------------------------- | ----------------------------------------------------------------------------- | -| `Date` | [time.Time](https://pkg.go.dev/time#Time) | :heavy_check_mark: | N/A | -| `Event` | map[string]*interface{}* | :heavy_check_mark: | N/A | -| `TriggerID` | *string* | :heavy_check_mark: | N/A | -| `WorkflowInstance` | [shared.V2WorkflowInstance](../../../pkg/models/shared/v2workflowinstance.md) | :heavy_check_mark: | N/A | -| `WorkflowInstanceID` | *string* | :heavy_check_mark: | N/A | \ No newline at end of file +| Field | Type | Required | Description | +| ------------------------------------------------------------------------------ | ------------------------------------------------------------------------------ | ------------------------------------------------------------------------------ | ------------------------------------------------------------------------------ | +| `Date` | [time.Time](https://pkg.go.dev/time#Time) | :heavy_check_mark: | N/A | +| `Error` | **string* | :heavy_minus_sign: | N/A | +| `Event` | map[string]*interface{}* | :heavy_check_mark: | N/A | +| `TriggerID` | *string* | :heavy_check_mark: | N/A | +| `WorkflowInstance` | [*shared.V2WorkflowInstance](../../../pkg/models/shared/v2workflowinstance.md) | :heavy_minus_sign: | N/A | +| `WorkflowInstanceID` | **string* | :heavy_minus_sign: | N/A | \ No newline at end of file diff --git a/releases/sdks/go/formance.go b/releases/sdks/go/formance.go index 40eeed20e3..bf6ba94588 100644 --- a/releases/sdks/go/formance.go +++ b/releases/sdks/go/formance.go @@ -164,9 +164,9 @@ func New(opts ...SDKOption) *Formance { sdkConfiguration: sdkConfiguration{ Language: "go", OpenAPIDocVersion: "v0.0.0", - SDKVersion: "v0.0.0", + SDKVersion: "0.0.1", GenVersion: "2.237.2", - UserAgent: "speakeasy-sdk/go v0.0.0 2.237.2 v0.0.0 github.com/formancehq/formance-sdk-go/v2", + UserAgent: "speakeasy-sdk/go 0.0.1 2.237.2 v0.0.0 github.com/formancehq/formance-sdk-go/v2", }, } for _, opt := range opts { diff --git a/releases/sdks/go/gen.yaml b/releases/sdks/go/gen.yaml index ba820a470a..59af297471 100755 --- a/releases/sdks/go/gen.yaml +++ b/releases/sdks/go/gen.yaml @@ -7,7 +7,7 @@ generation: nameResolutionDec2023: false telemetryEnabled: false go: - version: v0.0.0 + version: 0.0.1 author: Formance clientServerStatusCodesAsErrors: true flattenGlobalSecurity: false diff --git a/releases/sdks/go/pkg/models/shared/v2triggeroccurrence.go b/releases/sdks/go/pkg/models/shared/v2triggeroccurrence.go index 5d58d2f862..e164f19da1 100644 --- a/releases/sdks/go/pkg/models/shared/v2triggeroccurrence.go +++ b/releases/sdks/go/pkg/models/shared/v2triggeroccurrence.go @@ -9,10 +9,11 @@ import ( type V2TriggerOccurrence struct { Date time.Time `json:"date"` + Error *string `json:"error,omitempty"` Event map[string]interface{} `json:"event"` TriggerID string `json:"triggerID"` - WorkflowInstance V2WorkflowInstance `json:"workflowInstance"` - WorkflowInstanceID string `json:"workflowInstanceID"` + WorkflowInstance *V2WorkflowInstance `json:"workflowInstance,omitempty"` + WorkflowInstanceID *string `json:"workflowInstanceID,omitempty"` } func (v V2TriggerOccurrence) MarshalJSON() ([]byte, error) { @@ -33,6 +34,13 @@ func (o *V2TriggerOccurrence) GetDate() time.Time { return o.Date } +func (o *V2TriggerOccurrence) GetError() *string { + if o == nil { + return nil + } + return o.Error +} + func (o *V2TriggerOccurrence) GetEvent() map[string]interface{} { if o == nil { return map[string]interface{}{} @@ -47,16 +55,16 @@ func (o *V2TriggerOccurrence) GetTriggerID() string { return o.TriggerID } -func (o *V2TriggerOccurrence) GetWorkflowInstance() V2WorkflowInstance { +func (o *V2TriggerOccurrence) GetWorkflowInstance() *V2WorkflowInstance { if o == nil { - return V2WorkflowInstance{} + return nil } return o.WorkflowInstance } -func (o *V2TriggerOccurrence) GetWorkflowInstanceID() string { +func (o *V2TriggerOccurrence) GetWorkflowInstanceID() *string { if o == nil { - return "" + return nil } return o.WorkflowInstanceID } diff --git a/tests/integration/suite/orchestration-triggers.go b/tests/integration/suite/orchestration-triggers.go index 36c82a9c71..6f47e1c0f0 100644 --- a/tests/integration/suite/orchestration-triggers.go +++ b/tests/integration/suite/orchestration-triggers.go @@ -127,7 +127,7 @@ var _ = WithModules([]*Module{modules.Auth, modules.Orchestration, modules.Ledge var getInstanceResponse *operations.V2GetInstanceResponse Eventually(func() bool { getInstanceResponse, err = Client().Orchestration.V2GetInstance(TestContext(), operations.V2GetInstanceRequest{ - InstanceID: listTriggersOccurrencesResponse.V2ListTriggersOccurrencesResponse.Cursor.Data[0].WorkflowInstanceID, + InstanceID: *listTriggersOccurrencesResponse.V2ListTriggersOccurrencesResponse.Cursor.Data[0].WorkflowInstanceID, }) Expect(err).To(BeNil()) @@ -167,3 +167,65 @@ var _ = WithModules([]*Module{modules.Auth, modules.Orchestration, modules.Ledge }) }) }) + +var _ = WithModules([]*Module{modules.Auth, modules.Orchestration}, func() { + When("creating a new workflow with a delay of 5s and a trigger on payments creation", func() { + var ( + createTriggerResponse *operations.CreateTriggerResponse + ) + BeforeEach(func() { + response, err := Client().Orchestration.CreateWorkflow( + TestContext(), + &shared.CreateWorkflowRequest{ + Name: ptr(uuid.NewString()), + Stages: []map[string]interface{}{{"delay": map[string]any{"duration": "5s"}}}, + }, + ) + Expect(err).ToNot(HaveOccurred()) + Expect(response.StatusCode).To(Equal(201)) + + createTriggerResponse, err = Client().Orchestration.CreateTrigger( + TestContext(), + &shared.TriggerData{ + Event: paymentsevents.EventTypeSavedPayments, + WorkflowID: response.CreateWorkflowResponse.Data.ID, + Vars: map[string]any{ + "fail": `link(event, "unknown").name`, + }, + }, + ) + Expect(err).ToNot(HaveOccurred()) + }) + Then("publishing a new empty payment", func() { + var payment map[string]any + BeforeEach(func() { + payment = map[string]any{ + "id": uuid.NewString(), + } + PublishPayments(publish.EventMessage{ + Date: time.Now(), + App: "payments", + Type: paymentsevents.EventTypeSavedPayments, + Payload: payment, + }) + }) + It("Should create a trigger workflow", func() { + var ( + listTriggersOccurrencesResponse *operations.V2ListTriggersOccurrencesResponse + err error + ) + Eventually(func(g Gomega) bool { + listTriggersOccurrencesResponse, err = Client().Orchestration.V2ListTriggersOccurrences(TestContext(), operations.V2ListTriggersOccurrencesRequest{ + TriggerID: createTriggerResponse.CreateTriggerResponse.Data.ID, + }) + g.Expect(err).To(BeNil()) + g.Expect(listTriggersOccurrencesResponse.V2ListTriggersOccurrencesResponse.Cursor.Data).NotTo(BeEmpty()) + occurrence := listTriggersOccurrencesResponse.V2ListTriggersOccurrencesResponse.Cursor.Data[0] + g.Expect(occurrence.WorkflowInstanceID).To(BeNil()) + g.Expect(occurrence.WorkflowInstance).To(BeNil()) + return true + }).Should(BeTrue()) + }) + }) + }) +})