Skip to content

Commit

Permalink
fix: include stack in workflows ids
Browse files Browse the repository at this point in the history
  • Loading branch information
gfyrag committed Dec 8, 2023
1 parent bd45537 commit f7a5f97
Show file tree
Hide file tree
Showing 4 changed files with 14 additions and 7 deletions.
2 changes: 2 additions & 0 deletions components/orchestration/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ var (
)

const (
stackFlag = "stack"
stackURLFlag = "stack-url"
stackClientIDFlag = "stack-client-id"
stackClientSecretFlag = "stack-client-secret"
Expand Down Expand Up @@ -61,6 +62,7 @@ func NewRootCommand() *cobra.Command {
cmd.PersistentFlags().String(temporalTaskQueueFlag, "default", "Temporal task queue name")
cmd.PersistentFlags().String(postgresDSNFlag, "", "Postgres address")
cmd.PersistentFlags().StringSlice(topicsFlag, []string{}, "Topics to listen")
cmd.PersistentFlags().String(stackFlag, "", "Stack")
cmd.AddCommand(newServeCommand(), newVersionCommand(), newWorkerCommand())

publish.InitCLIFlags(cmd)
Expand Down
6 changes: 5 additions & 1 deletion components/orchestration/cmd/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,11 @@ func workerOptions() fx.Option {
return fx.Options(
stackClientModule(),
temporalworker.NewWorkerModule(viper.GetString(temporalTaskQueueFlag)),
triggers.NewListenerModule(viper.GetString(temporalTaskQueueFlag), viper.GetStringSlice(topicsFlag)),
triggers.NewListenerModule(
viper.GetString(stackFlag),
viper.GetString(temporalTaskQueueFlag),
viper.GetStringSlice(topicsFlag),
),
)
}

Expand Down
9 changes: 5 additions & 4 deletions components/orchestration/internal/triggers/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func getWorkflowIDFromEvent(event publish.EventMessage) *string {
}
}

func handleMessage(logger logging.Logger, temporalClient client.Client, taskQueue string, msg *message.Message) error {
func handleMessage(logger logging.Logger, temporalClient client.Client, taskIDPrefix, taskQueue string, msg *message.Message) error {
logger = logger.WithFields(map[string]any{
"event-id": msg.UUID,
"duplicate": "false",
Expand Down Expand Up @@ -68,7 +68,7 @@ func handleMessage(logger logging.Logger, temporalClient client.Client, taskQueu
TaskQueue: taskQueue,
}
if ik := getWorkflowIDFromEvent(*event); ik != nil {
options.ID = *ik
options.ID = taskIDPrefix + "-" + *ik
options.WorkflowIDReusePolicy = enums.WORKFLOW_ID_REUSE_POLICY_REJECT_DUPLICATE
options.WorkflowExecutionErrorWhenAlreadyStarted = true
logger = logger.WithField("ik", *ik)
Expand All @@ -95,10 +95,11 @@ func handleMessage(logger logging.Logger, temporalClient client.Client, taskQueu
return errors.Wrap(err, "executing workflow")
}

func registerListener(logger logging.Logger, r *message.Router, s message.Subscriber, temporalClient client.Client, taskQueue string, topics []string) {
func registerListener(logger logging.Logger, r *message.Router, s message.Subscriber, temporalClient client.Client,
taskIDPrefix, taskQueue string, topics []string) {
for _, topic := range topics {
r.AddNoPublisherHandler(fmt.Sprintf("listen-%s-events", topic), topic, s, func(msg *message.Message) error {
if err := handleMessage(logger, temporalClient, taskQueue, msg); err != nil {
if err := handleMessage(logger, temporalClient, taskIDPrefix, taskQueue, msg); err != nil {
logging.Errorf("Error executing workflow: %s", err)
return err
}
Expand Down
4 changes: 2 additions & 2 deletions components/orchestration/internal/triggers/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,11 @@ func NewModule(taskQueue string) fx.Option {
)
}

func NewListenerModule(taskQueue string, topics []string) fx.Option {
func NewListenerModule(taskIDPrefix, taskQueue string, topics []string) fx.Option {
return fx.Options(
fx.Invoke(func(logger logging.Logger, r *message.Router, s message.Subscriber, temporalClient client.Client) {
logger.Infof("Listening events from topics: %s", strings.Join(topics, ","))
registerListener(logger, r, s, temporalClient, taskQueue, topics)
registerListener(logger, r, s, temporalClient, taskIDPrefix, taskQueue, topics)
}),
)
}

0 comments on commit f7a5f97

Please sign in to comment.