From 6ab94e1323ee468eeaaaec4fdbdbd19eae1a7013 Mon Sep 17 00:00:00 2001 From: Fabian Martinez <46371672+famarting@users.noreply.github.com> Date: Wed, 27 Nov 2024 21:44:40 +0100 Subject: [PATCH] Sub-orchestration retries (#84) Signed-off-by: Fabian Martinez <46371672+famarting@users.noreply.github.com> --- CHANGELOG.md | 1 + backend/sqlite/sqlite.go | 5 ++- samples/retries/retries.go | 2 +- task/activity.go | 54 +++++++++++++----------- task/orchestrator.go | 80 ++++++++++++++++++++++++------------ task/orchestrator_test.go | 14 +++---- tests/grpc/grpc_test.go | 35 +++++++++++++++- tests/orchestrations_test.go | 45 +++++++++++++++++++- 8 files changed, 175 insertions(+), 61 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 855a71a..28a4679 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Add API to set custom status ([#81](https://github.com/microsoft/durabletask-go/pull/81)) - by [@famarting](https://github.com/famarting) - Add missing purge orchestration options ([#82](https://github.com/microsoft/durabletask-go/pull/82)) - by [@famarting](https://github.com/famarting) - Add support for activity retry policies ([#83](https://github.com/microsoft/durabletask-go/pull/83)) - by [@famarting](https://github.com/famarting) +- Add support for sub-orchestration retry policies ([#84](https://github.com/microsoft/durabletask-go/pull/84)) - by [@famarting](https://github.com/famarting) ### Changed diff --git a/backend/sqlite/sqlite.go b/backend/sqlite/sqlite.go index 3ce4dbc..8343342 100644 --- a/backend/sqlite/sqlite.go +++ b/backend/sqlite/sqlite.go @@ -336,7 +336,10 @@ func (be *sqliteBackend) CompleteOrchestrationWorkItem(ctx context.Context, wi * for _, msg := range wi.State.PendingMessages() { if es := msg.HistoryEvent.GetExecutionStarted(); es != nil { // Need to insert a new row into the DB - if _, err := be.createOrchestrationInstanceInternal(ctx, msg.HistoryEvent, tx); err != nil { + if _, err := be.createOrchestrationInstanceInternal(ctx, msg.HistoryEvent, tx, backend.WithOrchestrationIdReusePolicy(&protos.OrchestrationIdReusePolicy{ + OperationStatus: []protos.OrchestrationStatus{protos.OrchestrationStatus_ORCHESTRATION_STATUS_FAILED}, + Action: api.REUSE_ID_ACTION_TERMINATE, + })); err != nil { if err == backend.ErrDuplicateEvent { be.logger.Warnf( "%v: dropping sub-orchestration creation event because an instance with the target ID (%v) already exists.", diff --git a/samples/retries/retries.go b/samples/retries/retries.go index 95f2885..ee5a8b4 100644 --- a/samples/retries/retries.go +++ b/samples/retries/retries.go @@ -74,7 +74,7 @@ func Init(ctx context.Context, r *task.TaskRegistry) (backend.TaskHubClient, bac } func RetryActivityOrchestrator(ctx *task.OrchestrationContext) (any, error) { - if err := ctx.CallActivity(RandomFailActivity, task.WithRetryPolicy(&task.ActivityRetryPolicy{ + if err := ctx.CallActivity(RandomFailActivity, task.WithActivityRetryPolicy(&task.RetryPolicy{ MaxAttempts: 10, InitialRetryInterval: 100 * time.Millisecond, BackoffCoefficient: 2, diff --git a/task/activity.go b/task/activity.go index f57797f..bc77429 100644 --- a/task/activity.go +++ b/task/activity.go @@ -14,10 +14,10 @@ type callActivityOption func(*callActivityOptions) error type callActivityOptions struct { rawInput *wrapperspb.StringValue - retryPolicy *ActivityRetryPolicy + retryPolicy *RetryPolicy } -type ActivityRetryPolicy struct { +type RetryPolicy struct { // Max number of attempts to try the activity call, first execution inclusive MaxAttempts int // Timespan to wait for the first retry @@ -32,6 +32,31 @@ type ActivityRetryPolicy struct { Handle func(error) bool } +func (policy *RetryPolicy) Validate() error { + if policy.InitialRetryInterval <= 0 { + return fmt.Errorf("InitialRetryInterval must be greater than 0") + } + if policy.MaxAttempts <= 0 { + // setting 1 max attempt is equivalent to not retrying + policy.MaxAttempts = 1 + } + if policy.BackoffCoefficient <= 0 { + policy.BackoffCoefficient = 1 + } + if policy.MaxRetryInterval <= 0 { + policy.MaxRetryInterval = math.MaxInt64 + } + if policy.RetryTimeout <= 0 { + policy.RetryTimeout = math.MaxInt64 + } + if policy.Handle == nil { + policy.Handle = func(err error) bool { + return true + } + } + return nil +} + // WithActivityInput configures an input for an activity invocation. // The specified input must be JSON serializable. func WithActivityInput(input any) callActivityOption { @@ -53,31 +78,14 @@ func WithRawActivityInput(input string) callActivityOption { } } -func WithRetryPolicy(policy *ActivityRetryPolicy) callActivityOption { +func WithActivityRetryPolicy(policy *RetryPolicy) callActivityOption { return func(opt *callActivityOptions) error { if policy == nil { return nil } - if policy.InitialRetryInterval <= 0 { - return fmt.Errorf("InitialRetryInterval must be greater than 0") - } - if policy.MaxAttempts <= 0 { - // setting 1 max attempt is equivalent to not retrying - policy.MaxAttempts = 1 - } - if policy.BackoffCoefficient <= 0 { - policy.BackoffCoefficient = 1 - } - if policy.MaxRetryInterval <= 0 { - policy.MaxRetryInterval = math.MaxInt64 - } - if policy.RetryTimeout <= 0 { - policy.RetryTimeout = math.MaxInt64 - } - if policy.Handle == nil { - policy.Handle = func(err error) bool { - return true - } + err := policy.Validate() + if err != nil { + return err } opt.retryPolicy = policy return nil diff --git a/task/orchestrator.go b/task/orchestrator.go index 4b5f967..b418b0d 100644 --- a/task/orchestrator.go +++ b/task/orchestrator.go @@ -49,6 +49,8 @@ type OrchestrationContext struct { type callSubOrchestratorOptions struct { instanceID string rawInput *wrapperspb.StringValue + + retryPolicy *RetryPolicy } // subOrchestratorOption is a functional option type for the CallSubOrchestrator orchestrator method. @@ -96,6 +98,20 @@ func WithSubOrchestrationInstanceID(instanceID string) subOrchestratorOption { } } +func WithSubOrchestrationRetryPolicy(policy *RetryPolicy) subOrchestratorOption { + return func(opt *callSubOrchestratorOptions) error { + if policy == nil { + return nil + } + err := policy.Validate() + if err != nil { + return err + } + opt.retryPolicy = policy + return nil + } +} + // NewOrchestrationContext returns a new [OrchestrationContext] struct with the specified parameters. func NewOrchestrationContext(registry *TaskRegistry, id api.InstanceID, oldEvents []*protos.HistoryEvent, newEvents []*protos.HistoryEvent) *OrchestrationContext { return &OrchestrationContext{ @@ -238,7 +254,7 @@ func (ctx *OrchestrationContext) CallActivity(activity interface{}, opts ...call } if options.retryPolicy != nil { - return ctx.internalCallActivityWithRetries(ctx.CurrentTimeUtc, func() Task { + return ctx.internalScheduleTaskWithRetries(ctx.CurrentTimeUtc, func() Task { return ctx.internalScheduleActivity(activity, options) }, *options.retryPolicy, 0) } @@ -259,7 +275,40 @@ func (ctx *OrchestrationContext) internalScheduleActivity(activity interface{}, return task } -func (ctx *OrchestrationContext) internalCallActivityWithRetries(initialAttempt time.Time, schedule func() Task, policy ActivityRetryPolicy, retryCount int) Task { +func (ctx *OrchestrationContext) CallSubOrchestrator(orchestrator interface{}, opts ...subOrchestratorOption) Task { + options := new(callSubOrchestratorOptions) + for _, configure := range opts { + if err := configure(options); err != nil { + failedTask := newTask(ctx) + failedTask.fail(helpers.NewTaskFailureDetails(err)) + return failedTask + } + } + + if options.retryPolicy != nil { + return ctx.internalScheduleTaskWithRetries(ctx.CurrentTimeUtc, func() Task { + return ctx.internalCallSubOrchestrator(orchestrator, options) + }, *options.retryPolicy, 0) + } + + return ctx.internalCallSubOrchestrator(orchestrator, options) +} + +func (ctx *OrchestrationContext) internalCallSubOrchestrator(orchestrator interface{}, options *callSubOrchestratorOptions) Task { + createSubOrchestrationAction := helpers.NewCreateSubOrchestrationAction( + ctx.getNextSequenceNumber(), + helpers.GetTaskFunctionName(orchestrator), + options.instanceID, + options.rawInput, + ) + ctx.pendingActions[createSubOrchestrationAction.Id] = createSubOrchestrationAction + + task := newTask(ctx) + ctx.pendingTasks[createSubOrchestrationAction.Id] = task + return task +} + +func (ctx *OrchestrationContext) internalScheduleTaskWithRetries(initialAttempt time.Time, schedule func() Task, policy RetryPolicy, retryCount int) Task { return &taskWrapper{ delegate: schedule(), onAwaitResult: func(v any, err error) error { @@ -283,7 +332,7 @@ func (ctx *OrchestrationContext) internalCallActivityWithRetries(initialAttempt return fmt.Errorf("%v %w", timerErr, err) } - err = ctx.internalCallActivityWithRetries(initialAttempt, schedule, policy, retryCount+1).Await(v) + err = ctx.internalScheduleTaskWithRetries(initialAttempt, schedule, policy, retryCount+1).Await(v) if err == nil { return nil } @@ -292,7 +341,7 @@ func (ctx *OrchestrationContext) internalCallActivityWithRetries(initialAttempt } } -func computeNextDelay(currentTimeUtc time.Time, policy ActivityRetryPolicy, attempt int, firstAttempt time.Time, err error) time.Duration { +func computeNextDelay(currentTimeUtc time.Time, policy RetryPolicy, attempt int, firstAttempt time.Time, err error) time.Duration { if policy.Handle(err) { isExpired := false if policy.RetryTimeout != math.MaxInt64 { @@ -309,29 +358,6 @@ func computeNextDelay(currentTimeUtc time.Time, policy ActivityRetryPolicy, atte return 0 } -func (ctx *OrchestrationContext) CallSubOrchestrator(orchestrator interface{}, opts ...subOrchestratorOption) Task { - options := new(callSubOrchestratorOptions) - for _, configure := range opts { - if err := configure(options); err != nil { - failedTask := newTask(ctx) - failedTask.fail(helpers.NewTaskFailureDetails(err)) - return failedTask - } - } - - createSubOrchestrationAction := helpers.NewCreateSubOrchestrationAction( - ctx.getNextSequenceNumber(), - helpers.GetTaskFunctionName(orchestrator), - options.instanceID, - options.rawInput, - ) - ctx.pendingActions[createSubOrchestrationAction.Id] = createSubOrchestrationAction - - task := newTask(ctx) - ctx.pendingTasks[createSubOrchestrationAction.Id] = task - return task -} - // CreateTimer schedules a durable timer that expires after the specified delay. func (ctx *OrchestrationContext) CreateTimer(delay time.Duration) Task { return ctx.createTimerInternal(delay) diff --git a/task/orchestrator_test.go b/task/orchestrator_test.go index d664b40..f4842b5 100644 --- a/task/orchestrator_test.go +++ b/task/orchestrator_test.go @@ -10,7 +10,7 @@ func Test_computeNextDelay(t *testing.T) { time2 := time.Now().Add(1 * time.Minute) type args struct { currentTimeUtc time.Time - policy ActivityRetryPolicy + policy RetryPolicy attempt int firstAttempt time.Time err error @@ -24,7 +24,7 @@ func Test_computeNextDelay(t *testing.T) { name: "first attempt", args: args{ currentTimeUtc: time2, - policy: ActivityRetryPolicy{ + policy: RetryPolicy{ MaxAttempts: 3, InitialRetryInterval: 2 * time.Second, BackoffCoefficient: 2, @@ -41,7 +41,7 @@ func Test_computeNextDelay(t *testing.T) { name: "second attempt", args: args{ currentTimeUtc: time2, - policy: ActivityRetryPolicy{ + policy: RetryPolicy{ MaxAttempts: 3, InitialRetryInterval: 2 * time.Second, BackoffCoefficient: 2, @@ -58,7 +58,7 @@ func Test_computeNextDelay(t *testing.T) { name: "third attempt", args: args{ currentTimeUtc: time2, - policy: ActivityRetryPolicy{ + policy: RetryPolicy{ MaxAttempts: 3, InitialRetryInterval: 2 * time.Second, BackoffCoefficient: 2, @@ -75,7 +75,7 @@ func Test_computeNextDelay(t *testing.T) { name: "fourth attempt", args: args{ currentTimeUtc: time2, - policy: ActivityRetryPolicy{ + policy: RetryPolicy{ MaxAttempts: 3, InitialRetryInterval: 2 * time.Second, BackoffCoefficient: 2, @@ -92,7 +92,7 @@ func Test_computeNextDelay(t *testing.T) { name: "expired", args: args{ currentTimeUtc: time2, - policy: ActivityRetryPolicy{ + policy: RetryPolicy{ MaxAttempts: 3, InitialRetryInterval: 2 * time.Second, BackoffCoefficient: 2, @@ -109,7 +109,7 @@ func Test_computeNextDelay(t *testing.T) { name: "fourth attempt backoff 1", args: args{ currentTimeUtc: time2, - policy: ActivityRetryPolicy{ + policy: RetryPolicy{ MaxAttempts: 3, InitialRetryInterval: 2 * time.Second, BackoffCoefficient: 1, diff --git a/tests/grpc/grpc_test.go b/tests/grpc/grpc_test.go index 85d022f..7e085cb 100644 --- a/tests/grpc/grpc_test.go +++ b/tests/grpc/grpc_test.go @@ -426,7 +426,7 @@ func Test_Grpc_ReuseInstanceIDError(t *testing.T) { func Test_Grpc_ActivityRetries(t *testing.T) { r := task.NewTaskRegistry() r.AddOrchestratorN("ActivityRetries", func(ctx *task.OrchestrationContext) (any, error) { - if err := ctx.CallActivity("FailActivity", task.WithRetryPolicy(&task.ActivityRetryPolicy{ + if err := ctx.CallActivity("FailActivity", task.WithActivityRetryPolicy(&task.RetryPolicy{ MaxAttempts: 3, InitialRetryInterval: 10 * time.Millisecond, })).Await(nil); err != nil { @@ -453,3 +453,36 @@ func Test_Grpc_ActivityRetries(t *testing.T) { // With 3 max attempts there will be two retries with 10 millis delay before each require.GreaterOrEqual(t, metadata.LastUpdatedAt, metadata.CreatedAt.Add(2*10*time.Millisecond)) } + +func Test_Grpc_SubOrchestratorRetries(t *testing.T) { + r := task.NewTaskRegistry() + r.AddOrchestratorN("Parent", func(ctx *task.OrchestrationContext) (any, error) { + err := ctx.CallSubOrchestrator( + "Child", + task.WithSubOrchestrationInstanceID(string(ctx.ID)+"_child"), + task.WithSubOrchestrationRetryPolicy(&task.RetryPolicy{ + MaxAttempts: 3, + InitialRetryInterval: 10 * time.Millisecond, + BackoffCoefficient: 2, + })).Await(nil) + return nil, err + }) + r.AddOrchestratorN("Child", func(ctx *task.OrchestrationContext) (any, error) { + return nil, errors.New("Child failed") + }) + + cancelListener := startGrpcListener(t, r) + defer cancelListener() + instanceID := api.InstanceID("orchestrator_retries") + + id, err := grpcClient.ScheduleNewOrchestration(ctx, "Parent", api.WithInstanceID(instanceID)) + require.NoError(t, err) + timeoutCtx, cancelTimeout := context.WithTimeout(ctx, 30*time.Second) + defer cancelTimeout() + metadata, err := grpcClient.WaitForOrchestrationCompletion(timeoutCtx, id, api.WithFetchPayloads(true)) + require.NoError(t, err) + assert.Equal(t, true, metadata.IsComplete()) + assert.Equal(t, protos.OrchestrationStatus_ORCHESTRATION_STATUS_FAILED, metadata.RuntimeStatus) + // With 3 max attempts there will be two retries with 10 millis delay before each + require.GreaterOrEqual(t, metadata.LastUpdatedAt, metadata.CreatedAt.Add(2*10*time.Millisecond)) +} diff --git a/tests/orchestrations_test.go b/tests/orchestrations_test.go index a359351..922f882 100644 --- a/tests/orchestrations_test.go +++ b/tests/orchestrations_test.go @@ -258,7 +258,7 @@ func Test_ActivityRetries(t *testing.T) { // Registration r := task.NewTaskRegistry() r.AddOrchestratorN("ActivityRetries", func(ctx *task.OrchestrationContext) (any, error) { - if err := ctx.CallActivity("FailActivity", task.WithRetryPolicy(&task.ActivityRetryPolicy{ + if err := ctx.CallActivity("FailActivity", task.WithActivityRetryPolicy(&task.RetryPolicy{ MaxAttempts: 3, InitialRetryInterval: 10 * time.Millisecond, })).Await(nil); err != nil { @@ -431,6 +431,49 @@ func Test_SingleSubOrchestrator_Failed(t *testing.T) { ) } +func Test_SingleSubOrchestrator_Failed_Retries(t *testing.T) { + r := task.NewTaskRegistry() + r.AddOrchestratorN("Parent", func(ctx *task.OrchestrationContext) (any, error) { + err := ctx.CallSubOrchestrator( + "Child", + task.WithSubOrchestrationInstanceID(string(ctx.ID)+"_child"), + task.WithSubOrchestrationRetryPolicy(&task.RetryPolicy{ + MaxAttempts: 3, + InitialRetryInterval: 10 * time.Millisecond, + BackoffCoefficient: 2, + })).Await(nil) + return nil, err + }) + r.AddOrchestratorN("Child", func(ctx *task.OrchestrationContext) (any, error) { + return nil, errors.New("Child failed") + }) + + ctx := context.Background() + exporter := initTracing() + client, worker := initTaskHubWorker(ctx, r) + defer worker.Shutdown(ctx) + + id, err := client.ScheduleNewOrchestration(ctx, "Parent") + require.NoError(t, err) + metadata, err := client.WaitForOrchestrationCompletion(ctx, id) + require.NoError(t, err) + assert.Equal(t, protos.OrchestrationStatus_ORCHESTRATION_STATUS_FAILED, metadata.RuntimeStatus) + if assert.NotNil(t, metadata.FailureDetails) { + assert.Contains(t, metadata.FailureDetails.ErrorMessage, "Child failed") + } + + spans := exporter.GetSpans().Snapshots() + assertSpanSequence(t, spans, + assertOrchestratorCreated("Parent", id), + assertOrchestratorExecuted("Child", id+"_child", "FAILED"), + assertTimer(id, assertTaskID(1)), + assertOrchestratorExecuted("Child", id+"_child", "FAILED"), + assertTimer(id, assertTaskID(3)), + assertOrchestratorExecuted("Child", id+"_child", "FAILED"), + assertOrchestratorExecuted("Parent", id, "FAILED"), + ) +} + func Test_ContinueAsNew(t *testing.T) { // Registration r := task.NewTaskRegistry()