Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reserve "__temporal_" prefix #1806

Merged
merged 3 commits into from
Feb 7, 2025
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions internal/internal_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@ const (
minRPCTimeout = 1 * time.Second
// maxRPCTimeout is maximum gRPC call timeout allowed (should not be less than defaultRPCTimeout).
maxRPCTimeout = 10 * time.Second

temporalPrefix = "__temporal_"
temporalPrefixError = "__temporal_ is a reserved prefix"
)

// grpcContextBuilder stores all gRPC-specific parameters that will
Expand Down
17 changes: 17 additions & 0 deletions internal/internal_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -567,6 +567,9 @@ func (r *registry) RegisterWorkflowWithOptions(
if len(options.Name) == 0 {
panic("WorkflowDefinitionFactory must be registered with a name")
}
if strings.HasPrefix(options.Name, temporalPrefix) {
panic(temporalPrefixError)
}
r.workflowFuncMap[options.Name] = factory
r.workflowVersioningBehaviorMap[options.Name] = options.VersioningBehavior
return
Expand All @@ -583,6 +586,10 @@ func (r *registry) RegisterWorkflowWithOptions(
registerName = alias
}

if strings.HasPrefix(alias, temporalPrefix) || strings.HasPrefix(registerName, temporalPrefix) {
panic(temporalPrefixError)
}

r.Lock()
defer r.Unlock()

Expand Down Expand Up @@ -613,6 +620,9 @@ func (r *registry) RegisterActivityWithOptions(
if options.Name == "" {
panic("registration of activity interface requires name")
}
if strings.HasPrefix(options.Name, temporalPrefix) {
panic(temporalPrefixError)
}
r.addActivityWithLock(options.Name, a)
return
}
Expand All @@ -635,6 +645,10 @@ func (r *registry) RegisterActivityWithOptions(
registerName = alias
}

if strings.HasPrefix(alias, temporalPrefix) || strings.HasPrefix(registerName, temporalPrefix) {
panic(temporalPrefixError)
}

r.Lock()
defer r.Unlock()

Expand Down Expand Up @@ -1659,6 +1673,9 @@ func extractHistoryFromFile(jsonfileName string, lastEventID int64) (hist *histo

// NewAggregatedWorker returns an instance to manage both activity and workflow workers
func NewAggregatedWorker(client *WorkflowClient, taskQueue string, options WorkerOptions) *AggregatedWorker {
if strings.HasPrefix(taskQueue, temporalPrefix) {
panic(temporalPrefixError)
}
setClientDefaults(client)
setWorkerOptionsDefaults(&options)
ctx := options.BackgroundActivityContext
Expand Down
37 changes: 37 additions & 0 deletions internal/internal_worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2949,3 +2949,40 @@ func TestAliasUnqualifiedNameClash(t *testing.T) {
require.Equal(t, "func3", executeWorkflow(false))
require.Equal(t, "func1", executeWorkflow(true))
}

func (s *internalWorkerTestSuite) TestReservedTemporalName() {
// workflow
worker := createWorker(s.service)
workflowFn := func(ctx Context) error { return nil }
err := runAndCatchPanic(func() {
worker.RegisterWorkflowWithOptions(workflowFn, RegisterWorkflowOptions{Name: "__temporal_workflow"})
})
require.Error(s.T(), err)
require.Contains(s.T(), err.Error(), temporalPrefixError)

// activity
activityFn := func() error {
return nil
}
err = runAndCatchPanic(func() {
worker.RegisterActivityWithOptions(activityFn, RegisterActivityOptions{Name: "__temporal_workflow"})
})
require.Error(s.T(), err)
require.Contains(s.T(), err.Error(), temporalPrefixError)

err = worker.Start()
require.NoError(s.T(), err)
worker.Stop()

// task queue
namespace := "testNamespace"
service := workflowservicemock.NewMockWorkflowServiceClient(s.mockCtrl)
client := NewServiceClient(service, nil, ClientOptions{
Namespace: namespace,
})
err = runAndCatchPanic(func() {
_ = NewAggregatedWorker(client, "__temporal_task_queue", WorkerOptions{})
})
require.Error(s.T(), err)
require.Contains(s.T(), err.Error(), temporalPrefixError)
}
9 changes: 9 additions & 0 deletions internal/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -1897,6 +1897,9 @@ func (wc *workflowEnvironmentInterceptor) GetSignalChannelWithOptions(
signalName string,
options SignalChannelOptions,
) ReceiveChannel {
if strings.HasPrefix(signalName, temporalPrefix) {
panic(temporalPrefixError)
}
eo := getWorkflowEnvOptions(ctx)
ch := eo.getSignalChannel(ctx, signalName)
// Add as a requested channel if not already done
Expand Down Expand Up @@ -2620,6 +2623,12 @@ func NewNexusClient(endpoint, service string) NexusClient {
if service == "" {
panic("service must not be empty")
}
if strings.HasPrefix(endpoint, temporalPrefix) {
panic("endpoint cannot use reserved __temporal_ prefix")
}
if strings.HasPrefix(service, temporalPrefix) {
panic("service cannot use reserved __temporal_ prefix")
}
return nexusClient{endpoint, service}
}

Expand Down
10 changes: 10 additions & 0 deletions temporalnexus/operation.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ package temporalnexus
import (
"context"
"errors"
"strings"

"github.com/nexus-rpc/sdk-go/nexus"
"go.temporal.io/api/common/v1"
Expand Down Expand Up @@ -87,6 +88,9 @@ func NewSyncOperation[I any, O any](
name string,
handler func(context.Context, client.Client, I, nexus.StartOperationOptions) (O, error),
) nexus.Operation[I, O] {
if strings.HasPrefix(name, "__temporal_") {
panic(errors.New("temporalnexus NewSyncOperation __temporal_ is an reserved prefix"))
}
return &syncOperation[I, O]{
name: name,
handler: handler,
Expand Down Expand Up @@ -184,6 +188,9 @@ func NewWorkflowRunOperation[I, O any](
workflow func(workflow.Context, I) (O, error),
getOptions func(context.Context, I, nexus.StartOperationOptions) (client.StartWorkflowOptions, error),
) nexus.Operation[I, O] {
if strings.HasPrefix(name, "__temporal_") {
panic(errors.New("temporalnexus NewWorkflowRunOperation __temporal_ is an invalid name"))
}
return &workflowRunOperation[I, O]{
options: WorkflowRunOperationOptions[I, O]{
Name: name,
Expand All @@ -201,6 +208,9 @@ func NewWorkflowRunOperationWithOptions[I, O any](options WorkflowRunOperationOp
if options.Name == "" {
return nil, errors.New("invalid options: Name is required")
}
if strings.HasPrefix(options.Name, "__temporal_") {
return nil, errors.New("invalid options: __temporal_ is a reserved prefix")
}
if options.Workflow == nil && options.GetOptions == nil && options.Handler == nil {
return nil, errors.New("invalid options: either GetOptions and Workflow, or Handler are required")
}
Expand Down
5 changes: 5 additions & 0 deletions temporalnexus/operation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,11 @@ func TestNewWorkflowRunOperationWithOptions(t *testing.T) {
})
require.ErrorContains(t, err, "either GetOptions and Workflow, or Handler are required")

_, err = temporalnexus.NewWorkflowRunOperationWithOptions(temporalnexus.WorkflowRunOperationOptions[string, string]{
Name: "__temporal_test",
})
require.ErrorContains(t, err, "__temporal_ is a reserved prefix")

_, err = temporalnexus.NewWorkflowRunOperationWithOptions(temporalnexus.WorkflowRunOperationOptions[string, string]{
Name: "test",
Workflow: func(workflow.Context, string) (string, error) {
Expand Down
14 changes: 14 additions & 0 deletions test/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7024,3 +7024,17 @@ func (c *coroutineCountingWorkflowOutboundInterceptor) Go(
f(ctx)
})
}

func (ts *IntegrationTestSuite) TestTemporalPrefixSignal() {
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
options := ts.startWorkflowOptions("test-temporal-prefix")
run, err := ts.client.ExecuteWorkflow(ctx, options, ts.workflows.WorkflowTemporalPrefixSignal)
ts.NoError(err)

err = ts.client.SignalWorkflow(ctx, run.GetID(), "", "__temporal_signal", nil)
ts.NoError(err)

err = run.Get(ctx, nil)
ts.Error(err)
}
6 changes: 6 additions & 0 deletions test/workflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3296,6 +3296,11 @@ func (w *Workflows) WorkflowClientFromActivity(ctx workflow.Context) error {
return workflow.ExecuteLocalActivity(ctx, activities.ClientFromActivity).Get(ctx, nil)
}

func (w *Workflows) WorkflowTemporalPrefixSignal(ctx workflow.Context) error {
_ = workflow.GetSignalChannel(ctx, "__temporal_signal").Receive(ctx, nil)
return nil
}

func (w *Workflows) register(worker worker.Worker) {
worker.RegisterWorkflow(w.ActivityCancelRepro)
worker.RegisterWorkflow(w.ActivityCompletionUsingID)
Expand Down Expand Up @@ -3435,6 +3440,7 @@ func (w *Workflows) register(worker worker.Worker) {
worker.RegisterWorkflow(w.RunsLocalAndNonlocalActsWithRetries)
worker.RegisterWorkflow(w.SelectorBlockSignal)
worker.RegisterWorkflow(w.WorkflowClientFromActivity)
worker.RegisterWorkflow(w.WorkflowTemporalPrefixSignal)
}

func (w *Workflows) defaultActivityOptions() workflow.ActivityOptions {
Expand Down