Skip to content

Commit

Permalink
fix(orchestration): stop trigger workflows when variables evaluation …
Browse files Browse the repository at this point in the history
…fails
  • Loading branch information
gfyrag committed Feb 26, 2024
1 parent 2801a06 commit 592e2d2
Show file tree
Hide file tree
Showing 17 changed files with 418 additions and 157 deletions.
16 changes: 12 additions & 4 deletions components/fctl/cmd/orchestration/triggers/occurrences/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,15 +91,23 @@ func (c *OccurrencesListController) Render(cmd *cobra.Command, args []string) er
fctl.Map(c.store.WorkflowOccurrence,
func(src shared.TriggerOccurrence) []string {
return []string{
src.WorkflowInstanceID,
func() string {
if src.WorkflowInstanceID != nil {
return ""
}
return *src.WorkflowInstanceID
}(),
src.Date.Format(time.RFC3339),
fctl.BoolToString(src.WorkflowInstance.Terminated),
src.WorkflowInstance.TerminatedAt.Format(time.RFC3339),
func() string {
if src.WorkflowInstance.Error == nil {
return ""
if src.Error != nil && *src.Error != "" {
return *src.Error
}
if src.WorkflowInstance != nil && src.WorkflowInstance.Error != nil && *src.WorkflowInstance.Error != "" {
return *src.WorkflowInstance.Error
}
return *src.WorkflowInstance.Error
return ""
}(),
}
}),
Expand Down
3 changes: 3 additions & 0 deletions components/ledger/internal/posting.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@ func (p *Postings) Scan(value interface{}) error {

func (p Postings) Validate() (int, error) {
for i, p := range p {
if p.Amount == nil {
return i, errors.New("no amount defined")
}
if p.Amount.Cmp(Zero) < 0 {
return i, errors.New("negative amount")
}
Expand Down
6 changes: 2 additions & 4 deletions components/ledger/libs/publish/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,16 +96,14 @@ func InitCLIFlags(cmd *cobra.Command, options ...func(*ConfigDefault)) {
cmd.PersistentFlags().Int(PublisherKafkaSASLScramSHASizeFlag, values.PublisherKafkaSASLScramSHASize, "SASL SCRAM SHA size")
cmd.PersistentFlags().Bool(PublisherKafkaTLSEnabledFlag, values.PublisherKafkaTLSEnabled, "Enable TLS to connect on kafka")

// NATS
InitNatsCliFlags(cmd, options...)
InitNatsCLIFlags(cmd, options...)
}

func InitNatsCliFlags(cmd *cobra.Command, options ...func(*ConfigDefault)) {
func InitNatsCLIFlags(cmd *cobra.Command, options ...func(*ConfigDefault)) {
values := defaultConfigValues
for _, option := range options {
option(&values)
}

// NATS
cmd.PersistentFlags().Bool(PublisherNatsEnabledFlag, values.PublisherNatsEnabled, "Publish write events to nats")
cmd.PersistentFlags().String(PublisherNatsClientIDFlag, values.PublisherNatsClientID, "Nats client ID")
Expand Down
6 changes: 3 additions & 3 deletions components/ledger/libs/publish/nats.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func NatsModule(url, serviceName string, natsOptions ...nats.Option) fx.Option {
fx.Provide(NewNatsPublisherWithConn),
fx.Provide(NewNatsSubscriberWithConn),
fx.Provide(func(natsCallbacks NATSCallbacks) wNats.PublisherConfig {
natsOptions = AppendCallBacks(natsOptions, natsCallbacks)
natsOptions = AppendNatsCallBacks(natsOptions, natsCallbacks)
return wNats.PublisherConfig{
NatsOptions: natsOptions,
URL: url,
Expand All @@ -54,7 +54,7 @@ func NatsModule(url, serviceName string, natsOptions ...nats.Option) fx.Option {
}
}),
fx.Provide(func(natsCallbacks NATSCallbacks) wNats.SubscriberConfig {
natsOptions = AppendCallBacks(natsOptions, natsCallbacks)
natsOptions = AppendNatsCallBacks(natsOptions, natsCallbacks)
return wNats.SubscriberConfig{
NatsOptions: natsOptions,
Unmarshaler: &wNats.NATSMarshaler{},
Expand Down Expand Up @@ -88,7 +88,7 @@ type NATSCallbacks interface {
AsyncErrorCB(nc *nats.Conn, sub *nats.Subscription, err error)
}

func AppendCallBacks(natsOptions []nats.Option, c NATSCallbacks) []nats.Option {
func AppendNatsCallBacks(natsOptions []nats.Option, c NATSCallbacks) []nats.Option {
return append(natsOptions,
nats.ConnectHandler(c.ConnectedCB),
nats.DisconnectErrHandler(c.DisconnectedErrCB),
Expand Down
11 changes: 11 additions & 0 deletions ee/orchestration/internal/storage/migrations.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,17 @@ add primary key (instance_id, stage, temporal_run_id);
return nil
},
},
{
Up: func(tx bun.Tx) error {
if _, err := tx.Exec(`
alter table "triggers_occurrences"
add column error varchar;
`); err != nil {
return err
}
return nil
},
},
}

func Migrate(ctx context.Context, db *bun.DB) error {
Expand Down
47 changes: 29 additions & 18 deletions ee/orchestration/internal/triggers/activity.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@ import (
"encoding/json"
"strings"

sharedlogging "github.com/formancehq/stack/libs/go-libs/logging"

"go.temporal.io/sdk/temporal"

"github.com/formancehq/orchestration/internal/workflow"
"github.com/formancehq/stack/libs/go-libs/collectionutils"
"github.com/formancehq/stack/libs/go-libs/pointer"
Expand Down Expand Up @@ -74,35 +78,42 @@ func (a Activities) ProcessTrigger(ctx context.Context, trigger Trigger, request

span := trace.SpanFromContext(ctx)
var (
evaluated map[string]string
err error
evaluated map[string]string
triggerError error
occurrence = NewTriggerOccurrence(request.MessageID, trigger.ID, request.Event)
)
if trigger.Vars != nil {
evaluated, err = a.expressionEvaluator.evalVariables(request.Event.Payload, trigger.Vars)
if err != nil {
span.RecordError(err)
return err
}
evaluated, triggerError = a.expressionEvaluator.evalVariables(request.Event.Payload, trigger.Vars)
}
if triggerError == nil {
data, triggerError := json.Marshal(evaluated)
if triggerError != nil {
panic(triggerError)
}

data, err := json.Marshal(evaluated)
if err != nil {
panic(err)
}
span.SetAttributes(attribute.String("variables", string(data)))

span.SetAttributes(attribute.String("variables", string(data)))
instance, triggerError := a.manager.RunWorkflow(ctx, trigger.WorkflowID, evaluated)
if triggerError != nil {
return triggerError
}

instance, err := a.manager.RunWorkflow(ctx, trigger.WorkflowID, evaluated)
if err != nil {
return err
occurrence.WorkflowInstanceID = pointer.For(instance.ID)
} else {
triggerError = temporal.NewNonRetryableApplicationError("unable to eval variables", "VARIABLES_EVAL", triggerError)
span.RecordError(triggerError)
occurrence.Error = pointer.For(triggerError.Error())
}

_, err = a.db.NewInsert().
Model(pointer.For(NewTriggerOccurrence(request.MessageID, trigger.ID, instance.ID, request.Event))).
_, err := a.db.NewInsert().
Model(pointer.For(occurrence)).
On("CONFLICT (trigger_id, event_id) DO NOTHING").
Exec(ctx)
if err != nil {
sharedlogging.FromContext(ctx).Errorf("unable to save trigger occurrence: %s", err)
}

return err
return triggerError
}

func NewActivities(db *bun.DB, manager *workflow.WorkflowManager, expressionEvaluator *expressionEvaluator) Activities {
Expand Down
16 changes: 8 additions & 8 deletions ee/orchestration/internal/triggers/trigger.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,18 +102,18 @@ type Occurrence struct {

EventID string `json:"-" bun:"event_id,pk"`
TriggerID string `json:"triggerID" bun:"trigger_id,pk"`
WorkflowInstanceID string `json:"workflowInstanceID" bun:"workflow_instance_id"`
WorkflowInstance workflow.Instance `json:"workflowInstance" bun:"rel:belongs-to,join:workflow_instance_id=id"`
WorkflowInstanceID *string `json:"workflowInstanceID,omitempty" bun:"workflow_instance_id"`
WorkflowInstance *workflow.Instance `json:"workflowInstance,omitempty" bun:"rel:belongs-to,join:workflow_instance_id=id"`
Date time.Time `json:"date" bun:"date"`
Event publish.EventMessage `json:"event" bun:"event"`
Error *string `json:"error,omitempty" bun:"error"`
}

func NewTriggerOccurrence(eventID, triggerID, workflowInstanceID string, event publish.EventMessage) Occurrence {
func NewTriggerOccurrence(eventID, triggerID string, event publish.EventMessage) Occurrence {
return Occurrence{
TriggerID: triggerID,
EventID: eventID,
WorkflowInstanceID: workflowInstanceID,
Date: time.Now().Round(time.Microsecond).UTC(),
Event: event,
TriggerID: triggerID,
EventID: eventID,
Date: time.Now().Round(time.Microsecond).UTC(),
Event: event,
}
}
8 changes: 7 additions & 1 deletion ee/orchestration/internal/triggers/workflow_trigger.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@ package triggers
import (
"time"

"github.com/pkg/errors"
"go.temporal.io/sdk/temporal"

"github.com/formancehq/stack/libs/go-libs/publish"
"github.com/uptrace/bun"
temporalworkflow "go.temporal.io/sdk/workflow"
Expand Down Expand Up @@ -42,7 +45,10 @@ func (w triggerWorkflow) RunTrigger(ctx temporalworkflow.Context, req ProcessEve
req,
).Get(ctx, nil)
if err != nil {
return err
applicationError := &temporal.ApplicationError{}
if !errors.As(err, &applicationError) {
return err
}
}
}

Expand Down
Loading

0 comments on commit 592e2d2

Please sign in to comment.