From 6e28bccc7e748ebb94063ffa771e39fe7851369c Mon Sep 17 00:00:00 2001 From: Andrew Yuan Date: Fri, 31 Jan 2025 16:44:30 -0800 Subject: [PATCH 1/2] Disallow workflows, activities, Nexus operations/services, task queue, signal, from using __temporal_ prefix. TODO: add tests for activities/workflows, all other scenarios. --- internal/internal_utils.go | 3 +++ internal/internal_worker.go | 17 +++++++++++++++ internal/internal_worker_test.go | 37 ++++++++++++++++++++++++++++++++ internal/workflow.go | 9 ++++++++ temporalnexus/operation.go | 10 +++++++++ temporalnexus/operation_test.go | 5 +++++ test/integration_test.go | 14 ++++++++++++ test/workflow_test.go | 6 ++++++ 8 files changed, 101 insertions(+) diff --git a/internal/internal_utils.go b/internal/internal_utils.go index 15bff4857..851f63fb4 100644 --- a/internal/internal_utils.go +++ b/internal/internal_utils.go @@ -52,6 +52,9 @@ const ( minRPCTimeout = 1 * time.Second // maxRPCTimeout is maximum gRPC call timeout allowed (should not be less than defaultRPCTimeout). maxRPCTimeout = 10 * time.Second + + temporalPrefix = "__temporal_" + temporalPrefixError = "__temporal_ is a reserved prefix" ) // grpcContextBuilder stores all gRPC-specific parameters that will diff --git a/internal/internal_worker.go b/internal/internal_worker.go index c518b775e..e1a61d0d2 100644 --- a/internal/internal_worker.go +++ b/internal/internal_worker.go @@ -567,6 +567,9 @@ func (r *registry) RegisterWorkflowWithOptions( if len(options.Name) == 0 { panic("WorkflowDefinitionFactory must be registered with a name") } + if strings.HasPrefix(options.Name, temporalPrefix) { + panic(temporalPrefixError) + } r.workflowFuncMap[options.Name] = factory r.workflowVersioningBehaviorMap[options.Name] = options.VersioningBehavior return @@ -583,6 +586,10 @@ func (r *registry) RegisterWorkflowWithOptions( registerName = alias } + if strings.HasPrefix(alias, temporalPrefix) || strings.HasPrefix(registerName, temporalPrefix) { + panic(temporalPrefixError) + } + r.Lock() defer r.Unlock() @@ -613,6 +620,9 @@ func (r *registry) RegisterActivityWithOptions( if options.Name == "" { panic("registration of activity interface requires name") } + if strings.HasPrefix(options.Name, temporalPrefix) { + panic(temporalPrefixError) + } r.addActivityWithLock(options.Name, a) return } @@ -635,6 +645,10 @@ func (r *registry) RegisterActivityWithOptions( registerName = alias } + if strings.HasPrefix(alias, temporalPrefix) || strings.HasPrefix(registerName, temporalPrefix) { + panic(temporalPrefixError) + } + r.Lock() defer r.Unlock() @@ -1659,6 +1673,9 @@ func extractHistoryFromFile(jsonfileName string, lastEventID int64) (hist *histo // NewAggregatedWorker returns an instance to manage both activity and workflow workers func NewAggregatedWorker(client *WorkflowClient, taskQueue string, options WorkerOptions) *AggregatedWorker { + if strings.HasPrefix(taskQueue, temporalPrefix) { + panic(temporalPrefixError) + } setClientDefaults(client) setWorkerOptionsDefaults(&options) ctx := options.BackgroundActivityContext diff --git a/internal/internal_worker_test.go b/internal/internal_worker_test.go index 1313a6fe3..74a1ed083 100644 --- a/internal/internal_worker_test.go +++ b/internal/internal_worker_test.go @@ -2949,3 +2949,40 @@ func TestAliasUnqualifiedNameClash(t *testing.T) { require.Equal(t, "func3", executeWorkflow(false)) require.Equal(t, "func1", executeWorkflow(true)) } + +func (s *internalWorkerTestSuite) TestReservedTemporalName() { + // workflow + worker := createWorker(s.service) + workflowFn := func(ctx Context) error { return nil } + err := runAndCatchPanic(func() { + worker.RegisterWorkflowWithOptions(workflowFn, RegisterWorkflowOptions{Name: "__temporal_workflow"}) + }) + require.Error(s.T(), err) + require.Contains(s.T(), err.Error(), temporalPrefixError) + + // activity + activityFn := func() error { + return nil + } + err = runAndCatchPanic(func() { + worker.RegisterActivityWithOptions(activityFn, RegisterActivityOptions{Name: "__temporal_workflow"}) + }) + require.Error(s.T(), err) + require.Contains(s.T(), err.Error(), temporalPrefixError) + + err = worker.Start() + require.NoError(s.T(), err) + worker.Stop() + + // task queue + namespace := "testNamespace" + service := workflowservicemock.NewMockWorkflowServiceClient(s.mockCtrl) + client := NewServiceClient(service, nil, ClientOptions{ + Namespace: namespace, + }) + err = runAndCatchPanic(func() { + _ = NewAggregatedWorker(client, "__temporal_task_queue", WorkerOptions{}) + }) + require.Error(s.T(), err) + require.Contains(s.T(), err.Error(), temporalPrefixError) +} diff --git a/internal/workflow.go b/internal/workflow.go index 5b0758919..3f000f4c5 100644 --- a/internal/workflow.go +++ b/internal/workflow.go @@ -1897,6 +1897,9 @@ func (wc *workflowEnvironmentInterceptor) GetSignalChannelWithOptions( signalName string, options SignalChannelOptions, ) ReceiveChannel { + if strings.HasPrefix(signalName, temporalPrefix) { + panic(temporalPrefixError) + } eo := getWorkflowEnvOptions(ctx) ch := eo.getSignalChannel(ctx, signalName) // Add as a requested channel if not already done @@ -2620,6 +2623,12 @@ func NewNexusClient(endpoint, service string) NexusClient { if service == "" { panic("service must not be empty") } + if strings.HasPrefix(endpoint, temporalPrefix) { + panic("endpoint cannot use reserved __temporal_ prefix") + } + if strings.HasPrefix(service, temporalPrefix) { + panic("service cannot use reserved __temporal_ prefix") + } return nexusClient{endpoint, service} } diff --git a/temporalnexus/operation.go b/temporalnexus/operation.go index 84ee119a2..d9c2f0811 100644 --- a/temporalnexus/operation.go +++ b/temporalnexus/operation.go @@ -41,6 +41,7 @@ package temporalnexus import ( "context" "errors" + "strings" "github.com/nexus-rpc/sdk-go/nexus" "go.temporal.io/api/common/v1" @@ -87,6 +88,9 @@ func NewSyncOperation[I any, O any]( name string, handler func(context.Context, client.Client, I, nexus.StartOperationOptions) (O, error), ) nexus.Operation[I, O] { + if strings.HasPrefix(name, "__temporal_") { + panic(errors.New("temporalnexus NewSyncOperation __temporal_ is an reserved prefix")) + } return &syncOperation[I, O]{ name: name, handler: handler, @@ -184,6 +188,9 @@ func NewWorkflowRunOperation[I, O any]( workflow func(workflow.Context, I) (O, error), getOptions func(context.Context, I, nexus.StartOperationOptions) (client.StartWorkflowOptions, error), ) nexus.Operation[I, O] { + if strings.HasPrefix(name, "__temporal_") { + panic(errors.New("temporalnexus NewWorkflowRunOperation __temporal_ is an invalid name")) + } return &workflowRunOperation[I, O]{ options: WorkflowRunOperationOptions[I, O]{ Name: name, @@ -201,6 +208,9 @@ func NewWorkflowRunOperationWithOptions[I, O any](options WorkflowRunOperationOp if options.Name == "" { return nil, errors.New("invalid options: Name is required") } + if strings.HasPrefix(options.Name, "__temporal_") { + return nil, errors.New("invalid options: __temporal_ is a reserved prefix") + } if options.Workflow == nil && options.GetOptions == nil && options.Handler == nil { return nil, errors.New("invalid options: either GetOptions and Workflow, or Handler are required") } diff --git a/temporalnexus/operation_test.go b/temporalnexus/operation_test.go index f590aa34d..ae890a0cd 100644 --- a/temporalnexus/operation_test.go +++ b/temporalnexus/operation_test.go @@ -42,6 +42,11 @@ func TestNewWorkflowRunOperationWithOptions(t *testing.T) { }) require.ErrorContains(t, err, "either GetOptions and Workflow, or Handler are required") + _, err = temporalnexus.NewWorkflowRunOperationWithOptions(temporalnexus.WorkflowRunOperationOptions[string, string]{ + Name: "__temporal_test", + }) + require.ErrorContains(t, err, "__temporal_ is a reserved prefix") + _, err = temporalnexus.NewWorkflowRunOperationWithOptions(temporalnexus.WorkflowRunOperationOptions[string, string]{ Name: "test", Workflow: func(workflow.Context, string) (string, error) { diff --git a/test/integration_test.go b/test/integration_test.go index d181db6ea..d5bb2d8bf 100644 --- a/test/integration_test.go +++ b/test/integration_test.go @@ -7024,3 +7024,17 @@ func (c *coroutineCountingWorkflowOutboundInterceptor) Go( f(ctx) }) } + +func (ts *IntegrationTestSuite) TestTemporalPrefixSignal() { + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + options := ts.startWorkflowOptions("test-temporal-prefix") + run, err := ts.client.ExecuteWorkflow(ctx, options, ts.workflows.WorkflowTemporalPrefixSignal) + ts.NoError(err) + + err = ts.client.SignalWorkflow(ctx, run.GetID(), "", "__temporal_signal", nil) + ts.NoError(err) + + err = run.Get(ctx, nil) + ts.Error(err) +} diff --git a/test/workflow_test.go b/test/workflow_test.go index 740af54f2..991010165 100644 --- a/test/workflow_test.go +++ b/test/workflow_test.go @@ -3296,6 +3296,11 @@ func (w *Workflows) WorkflowClientFromActivity(ctx workflow.Context) error { return workflow.ExecuteLocalActivity(ctx, activities.ClientFromActivity).Get(ctx, nil) } +func (w *Workflows) WorkflowTemporalPrefixSignal(ctx workflow.Context) error { + _ = workflow.GetSignalChannel(ctx, "__temporal_signal").Receive(ctx, nil) + return nil +} + func (w *Workflows) register(worker worker.Worker) { worker.RegisterWorkflow(w.ActivityCancelRepro) worker.RegisterWorkflow(w.ActivityCompletionUsingID) @@ -3435,6 +3440,7 @@ func (w *Workflows) register(worker worker.Worker) { worker.RegisterWorkflow(w.RunsLocalAndNonlocalActsWithRetries) worker.RegisterWorkflow(w.SelectorBlockSignal) worker.RegisterWorkflow(w.WorkflowClientFromActivity) + worker.RegisterWorkflow(w.WorkflowTemporalPrefixSignal) } func (w *Workflows) defaultActivityOptions() workflow.ActivityOptions { From 8c70b48417d39f8639e2a795f91b25d9e5a44b0b Mon Sep 17 00:00:00 2001 From: Andrew Yuan Date: Fri, 7 Feb 2025 15:24:07 -0800 Subject: [PATCH 2/2] Remove test race from trying to signal a workflow that is expected to error out --- test/integration_test.go | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/test/integration_test.go b/test/integration_test.go index 28148eeb3..8a05591b3 100644 --- a/test/integration_test.go +++ b/test/integration_test.go @@ -7032,9 +7032,7 @@ func (ts *IntegrationTestSuite) TestTemporalPrefixSignal() { run, err := ts.client.ExecuteWorkflow(ctx, options, ts.workflows.WorkflowTemporalPrefixSignal) ts.NoError(err) - err = ts.client.SignalWorkflow(ctx, run.GetID(), "", "__temporal_signal", nil) - ts.NoError(err) - + // Trying to GetSignalChannel with a __temporal_ prefixed name should return an error err = run.Get(ctx, nil) ts.Error(err) }