Skip to content

Commit

Permalink
fix(orchestration): stop trigger workflows when variables evaluation …
Browse files Browse the repository at this point in the history
…fails
  • Loading branch information
gfyrag committed Feb 26, 2024
1 parent 693044a commit 8be7136
Show file tree
Hide file tree
Showing 12 changed files with 148 additions and 50 deletions.
3 changes: 2 additions & 1 deletion components/fctl/cmd/wallets/balances/create.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,15 @@ package balances

import (
"fmt"
"math/big"

"github.com/formancehq/fctl/cmd/wallets/internal"
fctl "github.com/formancehq/fctl/pkg"
"github.com/formancehq/formance-sdk-go/v2/pkg/models/operations"
"github.com/formancehq/formance-sdk-go/v2/pkg/models/shared"
"github.com/pkg/errors"
"github.com/pterm/pterm"
"github.com/spf13/cobra"
"math/big"
)

type CreateStore struct {
Expand Down
11 changes: 11 additions & 0 deletions ee/orchestration/internal/storage/migrations.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,17 @@ add primary key (instance_id, stage, temporal_run_id);
return nil
},
},
{
Up: func(tx bun.Tx) error {
if _, err := tx.Exec(`
alter table "triggers_occurrences"
add column error varchar;
`); err != nil {
return err
}
return nil
},
},
}

func Migrate(ctx context.Context, db *bun.DB) error {
Expand Down
45 changes: 27 additions & 18 deletions ee/orchestration/internal/triggers/activity.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"encoding/json"
"strings"

"go.temporal.io/sdk/temporal"

"github.com/formancehq/orchestration/internal/workflow"
"github.com/formancehq/stack/libs/go-libs/collectionutils"
"github.com/formancehq/stack/libs/go-libs/pointer"
Expand Down Expand Up @@ -74,35 +76,42 @@ func (a Activities) ProcessTrigger(ctx context.Context, trigger Trigger, request

span := trace.SpanFromContext(ctx)
var (
evaluated map[string]string
err error
evaluated map[string]string
triggerError error
occurrence = NewTriggerOccurrence(request.MessageID, trigger.ID, request.Event)
)
if trigger.Vars != nil {
evaluated, err = a.expressionEvaluator.evalVariables(request.Event.Payload, trigger.Vars)
if err != nil {
span.RecordError(err)
return err
}
evaluated, triggerError = a.expressionEvaluator.evalVariables(request.Event.Payload, trigger.Vars)
}
if triggerError == nil {
data, triggerError := json.Marshal(evaluated)
if triggerError != nil {
panic(triggerError)
}

data, err := json.Marshal(evaluated)
if err != nil {
panic(err)
}
span.SetAttributes(attribute.String("variables", string(data)))

span.SetAttributes(attribute.String("variables", string(data)))
instance, triggerError := a.manager.RunWorkflow(ctx, trigger.WorkflowID, evaluated)
if triggerError != nil {
return triggerError
}

instance, err := a.manager.RunWorkflow(ctx, trigger.WorkflowID, evaluated)
if err != nil {
return err
occurrence.WorkflowInstanceID = pointer.For(instance.ID)
} else {
triggerError = temporal.NewNonRetryableApplicationError("unable to eval variables", "VARIABLES_EVAL", triggerError)
span.RecordError(triggerError)
occurrence.Error = pointer.For(triggerError.Error())
}

_, err = a.db.NewInsert().
Model(pointer.For(NewTriggerOccurrence(request.MessageID, trigger.ID, instance.ID, request.Event))).
_, err := a.db.NewInsert().
Model(pointer.For(occurrence)).
On("CONFLICT (trigger_id, event_id) DO NOTHING").
Exec(ctx)
if err != nil {
return err
}

return err
return triggerError
}

func NewActivities(db *bun.DB, manager *workflow.WorkflowManager, expressionEvaluator *expressionEvaluator) Activities {
Expand Down
16 changes: 8 additions & 8 deletions ee/orchestration/internal/triggers/trigger.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,18 +102,18 @@ type Occurrence struct {

EventID string `json:"-" bun:"event_id,pk"`
TriggerID string `json:"triggerID" bun:"trigger_id,pk"`
WorkflowInstanceID string `json:"workflowInstanceID" bun:"workflow_instance_id"`
WorkflowInstance workflow.Instance `json:"workflowInstance" bun:"rel:belongs-to,join:workflow_instance_id=id"`
WorkflowInstanceID *string `json:"workflowInstanceID,omitempty" bun:"workflow_instance_id"`
WorkflowInstance *workflow.Instance `json:"workflowInstance,omitempty" bun:"rel:belongs-to,join:workflow_instance_id=id"`
Date time.Time `json:"date" bun:"date"`
Event publish.EventMessage `json:"event" bun:"event"`
Error *string `json:"error,omitempty" bun:"error"`
}

func NewTriggerOccurrence(eventID, triggerID, workflowInstanceID string, event publish.EventMessage) Occurrence {
func NewTriggerOccurrence(eventID, triggerID string, event publish.EventMessage) Occurrence {
return Occurrence{
TriggerID: triggerID,
EventID: eventID,
WorkflowInstanceID: workflowInstanceID,
Date: time.Now().Round(time.Microsecond).UTC(),
Event: event,
TriggerID: triggerID,
EventID: eventID,
Date: time.Now().Round(time.Microsecond).UTC(),
Event: event,
}
}
8 changes: 7 additions & 1 deletion ee/orchestration/internal/triggers/workflow_trigger.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@ package triggers
import (
"time"

"github.com/pkg/errors"
"go.temporal.io/sdk/temporal"

"github.com/formancehq/stack/libs/go-libs/publish"
"github.com/uptrace/bun"
temporalworkflow "go.temporal.io/sdk/workflow"
Expand Down Expand Up @@ -42,7 +45,10 @@ func (w triggerWorkflow) RunTrigger(ctx temporalworkflow.Context, req ProcessEve
req,
).Get(ctx, nil)
if err != nil {
return err
applicationError := &temporal.ApplicationError{}
if !errors.As(err, &applicationError) {
return err
}
}
}

Expand Down
4 changes: 2 additions & 2 deletions ee/orchestration/openapi/v2.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -659,8 +659,6 @@ components:
type: object
required:
- triggerID
- workflowInstanceID
- workflowInstance
- date
- event
properties:
Expand All @@ -673,6 +671,8 @@ components:
$ref: '#/components/schemas/V2WorkflowInstance'
triggerID:
type: string
error:
type: string
event:
type: object
additionalProperties: true
Expand Down
6 changes: 3 additions & 3 deletions releases/sdks/go/.speakeasy/gen.lock
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
lockVersion: 2.0.0
id: 7eac0a45-60a2-40bb-9e85-26bd77ec2a6d
management:
docChecksum: c333ef5c7396d3980e1a5806c46d1862
docChecksum: c77094cb6296aa1c7e4452b8f69787ba
docVersion: v0.0.0
speakeasyVersion: internal
generationVersion: 2.237.2
releaseVersion: v0.0.0
configChecksum: 50130ac1de5d0825ab31d45fbe3aac35
releaseVersion: 0.0.1
configChecksum: c299152255ec1caf1aad2b6fbca6bdc9
features:
go:
constsAndDefaults: 0.1.2
Expand Down
15 changes: 8 additions & 7 deletions releases/sdks/go/docs/pkg/models/shared/v2triggeroccurrence.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,11 @@

## Fields

| Field | Type | Required | Description |
| ----------------------------------------------------------------------------- | ----------------------------------------------------------------------------- | ----------------------------------------------------------------------------- | ----------------------------------------------------------------------------- |
| `Date` | [time.Time](https://pkg.go.dev/time#Time) | :heavy_check_mark: | N/A |
| `Event` | map[string]*interface{}* | :heavy_check_mark: | N/A |
| `TriggerID` | *string* | :heavy_check_mark: | N/A |
| `WorkflowInstance` | [shared.V2WorkflowInstance](../../../pkg/models/shared/v2workflowinstance.md) | :heavy_check_mark: | N/A |
| `WorkflowInstanceID` | *string* | :heavy_check_mark: | N/A |
| Field | Type | Required | Description |
| ------------------------------------------------------------------------------ | ------------------------------------------------------------------------------ | ------------------------------------------------------------------------------ | ------------------------------------------------------------------------------ |
| `Date` | [time.Time](https://pkg.go.dev/time#Time) | :heavy_check_mark: | N/A |
| `Error` | **string* | :heavy_minus_sign: | N/A |
| `Event` | map[string]*interface{}* | :heavy_check_mark: | N/A |
| `TriggerID` | *string* | :heavy_check_mark: | N/A |
| `WorkflowInstance` | [*shared.V2WorkflowInstance](../../../pkg/models/shared/v2workflowinstance.md) | :heavy_minus_sign: | N/A |
| `WorkflowInstanceID` | **string* | :heavy_minus_sign: | N/A |
4 changes: 2 additions & 2 deletions releases/sdks/go/formance.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,9 +164,9 @@ func New(opts ...SDKOption) *Formance {
sdkConfiguration: sdkConfiguration{
Language: "go",
OpenAPIDocVersion: "v0.0.0",
SDKVersion: "v0.0.0",
SDKVersion: "0.0.1",
GenVersion: "2.237.2",
UserAgent: "speakeasy-sdk/go v0.0.0 2.237.2 v0.0.0 github.com/formancehq/formance-sdk-go/v2",
UserAgent: "speakeasy-sdk/go 0.0.1 2.237.2 v0.0.0 github.com/formancehq/formance-sdk-go/v2",
},
}
for _, opt := range opts {
Expand Down
2 changes: 1 addition & 1 deletion releases/sdks/go/gen.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ generation:
nameResolutionDec2023: false
telemetryEnabled: false
go:
version: v0.0.0
version: 0.0.1
author: Formance
clientServerStatusCodesAsErrors: true
flattenGlobalSecurity: false
Expand Down
20 changes: 14 additions & 6 deletions releases/sdks/go/pkg/models/shared/v2triggeroccurrence.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,11 @@ import (

type V2TriggerOccurrence struct {
Date time.Time `json:"date"`
Error *string `json:"error,omitempty"`
Event map[string]interface{} `json:"event"`
TriggerID string `json:"triggerID"`
WorkflowInstance V2WorkflowInstance `json:"workflowInstance"`
WorkflowInstanceID string `json:"workflowInstanceID"`
WorkflowInstance *V2WorkflowInstance `json:"workflowInstance,omitempty"`
WorkflowInstanceID *string `json:"workflowInstanceID,omitempty"`
}

func (v V2TriggerOccurrence) MarshalJSON() ([]byte, error) {
Expand All @@ -33,6 +34,13 @@ func (o *V2TriggerOccurrence) GetDate() time.Time {
return o.Date
}

func (o *V2TriggerOccurrence) GetError() *string {
if o == nil {
return nil
}
return o.Error
}

func (o *V2TriggerOccurrence) GetEvent() map[string]interface{} {
if o == nil {
return map[string]interface{}{}
Expand All @@ -47,16 +55,16 @@ func (o *V2TriggerOccurrence) GetTriggerID() string {
return o.TriggerID
}

func (o *V2TriggerOccurrence) GetWorkflowInstance() V2WorkflowInstance {
func (o *V2TriggerOccurrence) GetWorkflowInstance() *V2WorkflowInstance {
if o == nil {
return V2WorkflowInstance{}
return nil
}
return o.WorkflowInstance
}

func (o *V2TriggerOccurrence) GetWorkflowInstanceID() string {
func (o *V2TriggerOccurrence) GetWorkflowInstanceID() *string {
if o == nil {
return ""
return nil
}
return o.WorkflowInstanceID
}
64 changes: 63 additions & 1 deletion tests/integration/suite/orchestration-triggers.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ var _ = WithModules([]*Module{modules.Auth, modules.Orchestration, modules.Ledge
var getInstanceResponse *operations.V2GetInstanceResponse
Eventually(func() bool {
getInstanceResponse, err = Client().Orchestration.V2GetInstance(TestContext(), operations.V2GetInstanceRequest{
InstanceID: listTriggersOccurrencesResponse.V2ListTriggersOccurrencesResponse.Cursor.Data[0].WorkflowInstanceID,
InstanceID: *listTriggersOccurrencesResponse.V2ListTriggersOccurrencesResponse.Cursor.Data[0].WorkflowInstanceID,
})
Expect(err).To(BeNil())

Expand Down Expand Up @@ -167,3 +167,65 @@ var _ = WithModules([]*Module{modules.Auth, modules.Orchestration, modules.Ledge
})
})
})

var _ = WithModules([]*Module{modules.Auth, modules.Orchestration}, func() {
When("creating a new workflow with a delay of 5s and a trigger on payments creation", func() {
var (
createTriggerResponse *operations.CreateTriggerResponse
)
BeforeEach(func() {
response, err := Client().Orchestration.CreateWorkflow(
TestContext(),
&shared.CreateWorkflowRequest{
Name: ptr(uuid.NewString()),
Stages: []map[string]interface{}{{"delay": map[string]any{"duration": "5s"}}},
},
)
Expect(err).ToNot(HaveOccurred())
Expect(response.StatusCode).To(Equal(201))

createTriggerResponse, err = Client().Orchestration.CreateTrigger(
TestContext(),
&shared.TriggerData{
Event: paymentsevents.EventTypeSavedPayments,
WorkflowID: response.CreateWorkflowResponse.Data.ID,
Vars: map[string]any{
"fail": `link(event, "unknown").name`,
},
},
)
Expect(err).ToNot(HaveOccurred())
})
Then("publishing a new empty payment", func() {
var payment map[string]any
BeforeEach(func() {
payment = map[string]any{
"id": uuid.NewString(),
}
PublishPayments(publish.EventMessage{
Date: time.Now(),
App: "payments",
Type: paymentsevents.EventTypeSavedPayments,
Payload: payment,
})
})
It("Should create a trigger workflow", func() {
var (
listTriggersOccurrencesResponse *operations.V2ListTriggersOccurrencesResponse
err error
)
Eventually(func(g Gomega) bool {
listTriggersOccurrencesResponse, err = Client().Orchestration.V2ListTriggersOccurrences(TestContext(), operations.V2ListTriggersOccurrencesRequest{
TriggerID: createTriggerResponse.CreateTriggerResponse.Data.ID,
})
g.Expect(err).To(BeNil())
g.Expect(listTriggersOccurrencesResponse.V2ListTriggersOccurrencesResponse.Cursor.Data).NotTo(BeEmpty())
occurrence := listTriggersOccurrencesResponse.V2ListTriggersOccurrencesResponse.Cursor.Data[0]
g.Expect(occurrence.WorkflowInstanceID).To(BeNil())
g.Expect(occurrence.WorkflowInstance).To(BeNil())
return true
}).Should(BeTrue())
})
})
})
})

0 comments on commit 8be7136

Please sign in to comment.