diff --git a/api/orchestration.go b/api/orchestration.go index 0054268..238fa96 100644 --- a/api/orchestration.go +++ b/api/orchestration.go @@ -37,30 +37,29 @@ type OrchestrationMetadata struct { FailureDetails *protos.TaskFailureDetails } -type CreateOrchestrationAction int +type CreateOrchestrationAction protos.CreateOrchestrationAction const ( - THROW CreateOrchestrationAction = iota - SKIP - TERMINATE + SKIP = protos.CreateOrchestrationAction_SKIP + TERMINATE = protos.CreateOrchestrationAction_TERMINATE ) -type OrchestrationStatus int +type OrchestrationStatus protos.OrchestrationStatus const ( - RUNNING OrchestrationStatus = iota - COMPLETED - // CONTINUED_AS_NEW - FAILED - // CANCELED - TERMINATED - PENDING - // SUSPENDED + RUNNING = protos.OrchestrationStatus_ORCHESTRATION_STATUS_RUNNING + COMPLETED = protos.OrchestrationStatus_ORCHESTRATION_STATUS_COMPLETED + // CONTINUED_AS_NEW = protos.OrchestrationStatus_ORCHESTRATION_STATUS_CONTINUED_AS_NEW + FAILED = protos.OrchestrationStatus_ORCHESTRATION_STATUS_FAILED + // CANCELED = protos.OrchestrationStatus_ORCHESTRATION_STATUS_CANCELED + TERMINATED = protos.OrchestrationStatus_ORCHESTRATION_STATUS_TERMINATED + PENDING = protos.OrchestrationStatus_ORCHESTRATION_STATUS_PENDING + // SUSPENDED = protos.OrchestrationStatus_ORCHESTRATION_STATUS_SUSPENDED ) type OrchestrationIDReuseOption struct { - CreateOrchestrationAction CreateOrchestrationAction - OrchestrationStatuses []OrchestrationStatus + CreateOrchestrationAction protos.CreateOrchestrationAction + OrchestrationStatuses []protos.OrchestrationStatus } // NewOrchestrationOptions configures options for starting a new orchestration. @@ -87,38 +86,10 @@ func WithInstanceID(id InstanceID) NewOrchestrationOptions { // WithOrchestrationReuseOption configures Orchestration ID reuse policy. func WithOrchestrationReuseOption(option *OrchestrationIDReuseOption) NewOrchestrationOptions { return func(req *protos.CreateInstanceRequest) error { + // initialize CreateInstanceOption req.CreateInstanceOption = &protos.CreateInstanceOption{} - // set action - switch option.CreateOrchestrationAction { - case SKIP: - req.CreateInstanceOption.Action = protos.CreateOrchestrationAction_SKIP - case TERMINATE: - req.CreateInstanceOption.Action = protos.CreateOrchestrationAction_TERMINATE - case THROW: - req.CreateInstanceOption.Action = protos.CreateOrchestrationAction_THROW - } - - // set status - for _, status := range option.OrchestrationStatuses { - switch status { - case RUNNING: - req.CreateInstanceOption.OperationStatus = append(req.CreateInstanceOption.OperationStatus, protos.OrchestrationStatus_ORCHESTRATION_STATUS_RUNNING) - case COMPLETED: - req.CreateInstanceOption.OperationStatus = append(req.CreateInstanceOption.OperationStatus, protos.OrchestrationStatus_ORCHESTRATION_STATUS_COMPLETED) - // case CONTINUED_AS_NEW: - // req.CreateInstanceOption.OperationStatus = append(req.CreateInstanceOption.OperationStatus, protos.OrchestrationStatus_ORCHESTRATION_STATUS_CONTINUED_AS_NEW) - case FAILED: - req.CreateInstanceOption.OperationStatus = append(req.CreateInstanceOption.OperationStatus, protos.OrchestrationStatus_ORCHESTRATION_STATUS_FAILED) - // case CANCELED: - // req.CreateInstanceOption.OperationStatus = append(req.CreateInstanceOption.OperationStatus, protos.OrchestrationStatus_ORCHESTRATION_STATUS_CANCELED) - case TERMINATED: - req.CreateInstanceOption.OperationStatus = append(req.CreateInstanceOption.OperationStatus, protos.OrchestrationStatus_ORCHESTRATION_STATUS_TERMINATED) - case PENDING: - req.CreateInstanceOption.OperationStatus = append(req.CreateInstanceOption.OperationStatus, protos.OrchestrationStatus_ORCHESTRATION_STATUS_PENDING) - // case SUSPENDED: - // req.CreateInstanceOption.OperationStatus = append(req.CreateInstanceOption.OperationStatus, protos.OrchestrationStatus_ORCHESTRATION_STATUS_SUSPENDED) - } - } + req.CreateInstanceOption.Action = option.CreateOrchestrationAction + req.CreateInstanceOption.OperationStatus = option.OrchestrationStatuses return nil } } diff --git a/backend/backend.go b/backend/backend.go index ac95070..5bf9bd4 100644 --- a/backend/backend.go +++ b/backend/backend.go @@ -43,7 +43,7 @@ type Backend interface { // CreateOrchestrationInstance creates a new orchestration instance with a history event that // wraps a ExecutionStarted event. - CreateOrchestrationInstance(context.Context, *HistoryEvent) error + CreateOrchestrationInstance(context.Context, *HistoryEvent, *protos.CreateInstanceOption) error // AddNewEvent adds a new orchestration event to the specified orchestration instance. AddNewOrchestrationEvent(context.Context, api.InstanceID, *HistoryEvent) error @@ -91,11 +91,6 @@ type Backend interface { // [api.ErrInstanceNotFound] is returned if the specified orchestration instance doesn't exist. // [api.ErrNotCompleted] is returned if the specified orchestration instance is still running. PurgeOrchestrationState(context.Context, api.InstanceID) error - - // CleanupOrchestration clean up all records for the specified orchestration instance in the entire task hub. - // - // [api.ErrInstanceNotFound] is returned if the specified orchestration instance doesn't exist. - CleanupOrchestration(context.Context, api.InstanceID) error } // MarshalHistoryEvent serializes the [HistoryEvent] into a protobuf byte array. diff --git a/backend/client.go b/backend/client.go index 4aae340..4acccf6 100644 --- a/backend/client.go +++ b/backend/client.go @@ -59,7 +59,8 @@ func (c *backendClient) ScheduleNewOrchestration(ctx context.Context, orchestrat tc := helpers.TraceContextFromSpan(span) e := helpers.NewExecutionStartedEvent(req.Name, req.InstanceId, req.Input, nil, tc) - if err := c.be.CreateOrchestrationInstance(ctx, e); err != nil { + option := &protos.CreateInstanceOption{} + if err := c.be.CreateOrchestrationInstance(ctx, e, option); err != nil { span.RecordError(err) span.SetStatus(codes.Error, err.Error()) return api.EmptyInstanceID, fmt.Errorf("failed to start orchestration: %w", err) diff --git a/backend/executor.go b/backend/executor.go index 13a220f..feeefce 100644 --- a/backend/executor.go +++ b/backend/executor.go @@ -9,7 +9,6 @@ import ( "time" "github.com/cenkalti/backoff/v4" - "go.opentelemetry.io/otel/trace" "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/metadata" @@ -311,47 +310,11 @@ func (g *grpcExecutor) StartInstance(ctx context.Context, req *protos.CreateInst ctx, span := helpers.StartNewCreateOrchestrationSpan(ctx, req.Name, req.Version.GetValue(), instanceID) defer span.End() - // retreive instance with instanceID - metadata, err := g.backend.GetOrchestrationMetadata(ctx, api.InstanceID(instanceID)) - if err != nil { - // if the instance doesn't exist, create instance directly. - if errors.Is(err, api.ErrInstanceNotFound) { - return createInstance(ctx, g.backend, instanceID, req, span) - } else { - return nil, err - } - } - - // build target status set - statusSet := convertStatusToSet(req.CreateInstanceOption.OperationStatus) - - // if current status is not one of the target status, create instance directly - if !statusSet[metadata.RuntimeStatus] { - return createInstance(ctx, g.backend, instanceID, req, span) - } else { - if req.CreateInstanceOption.Action == protos.CreateOrchestrationAction_THROW { - // throw ErrDuplicateEvent since instance already exists and the status is in target status set - return nil, api.ErrDuplicateInstance - } else if req.CreateInstanceOption.Action == protos.CreateOrchestrationAction_SKIP { - // skip creating new instance - g.logger.Warnf("An instance with ID '%s' already exists; dropping duplicate create request", instanceID) - return &protos.CreateInstanceResponse{InstanceId: instanceID}, nil - } else { - // CreateInstanceAction_TERMINATE - // terminate existing instance and create a new one - if err := g.backend.CleanupOrchestration(ctx, api.InstanceID(instanceID)); err != nil { - return nil, err - } - return createInstance(ctx, g.backend, instanceID, req, span) - } - } -} - -func createInstance(ctx context.Context, be Backend, instanceID string, req *protos.CreateInstanceRequest, span trace.Span) (*protos.CreateInstanceResponse, error) { e := helpers.NewExecutionStartedEvent(req.Name, instanceID, req.Input, nil, helpers.TraceContextFromSpan(span)) - if err := be.CreateOrchestrationInstance(ctx, e); err != nil { + if err := g.backend.CreateOrchestrationInstance(ctx, e, req.CreateInstanceOption); err != nil { return nil, err } + return &protos.CreateInstanceResponse{InstanceId: instanceID}, nil } diff --git a/backend/sqlite/sqlite.go b/backend/sqlite/sqlite.go index cd0afca..9160a0b 100644 --- a/backend/sqlite/sqlite.go +++ b/backend/sqlite/sqlite.go @@ -79,6 +79,9 @@ func NewSqliteBackend(opts *SqliteOptions, logger backend.Logger) backend.Backen be.dsn = opts.FilePath } + // used for local debug + // be.dsn = "file:file.sqlite" + return be } @@ -334,7 +337,7 @@ func (be *sqliteBackend) CompleteOrchestrationWorkItem(ctx context.Context, wi * if es := msg.HistoryEvent.GetExecutionStarted(); es != nil { // Need to insert a new row into the DB var instanceID string - if err := be.createOrchestrationInstanceInternal(ctx, msg.HistoryEvent, tx, &instanceID); err != nil { + if _, err := be.createOrchestrationInstanceInternal(ctx, msg.HistoryEvent, tx, &instanceID); err != nil { if err == backend.ErrDuplicateEvent { be.logger.Warnf( "%v: dropping sub-orchestration creation event because an instance with the target ID (%v) already exists.", @@ -390,7 +393,7 @@ func (be *sqliteBackend) CompleteOrchestrationWorkItem(ctx context.Context, wi * } // CreateOrchestrationInstance implements backend.Backend -func (be *sqliteBackend) CreateOrchestrationInstance(ctx context.Context, e *backend.HistoryEvent) error { +func (be *sqliteBackend) CreateOrchestrationInstance(ctx context.Context, e *backend.HistoryEvent, option *protos.CreateInstanceOption) error { if err := be.ensureDB(); err != nil { return err } @@ -402,8 +405,38 @@ func (be *sqliteBackend) CreateOrchestrationInstance(ctx context.Context, e *bac defer tx.Rollback() var instanceID string - if err := be.createOrchestrationInstanceInternal(ctx, e, tx, &instanceID); err != nil { - return err + runtimeStatus, err := be.createOrchestrationInstanceInternal(ctx, e, tx, &instanceID) + if err != nil { + // instance alredy exists + if errors.Is(err, backend.ErrDuplicateEvent) { + // build target status set + fmt.Println("@@", option) + statusSet := convertStatusToSet(option.OperationStatus) + // if current status is not one of the target status, return error + if !statusSet[helpers.FromRuntimeStatusString(runtimeStatus)] { + return api.ErrDuplicateInstance + } else { + if option.Action == protos.CreateOrchestrationAction_SKIP { + // Log an warning message and skip cerating new instance + be.logger.Warnf("An instance with ID '%s' already exists; dropping duplicate create request", instanceID) + return nil + } else if option.Action == protos.CreateOrchestrationAction_TERMINATE { + // terminate existing instance + if err := be.cleanupOrchestrationStateInternal(ctx, tx, api.InstanceID(instanceID)); err != nil { + return err + } + // create a new instance + if _, err := be.createOrchestrationInstanceInternal(ctx, e, tx, &instanceID); err != nil { + return err + } + } else { + // CreateInstanceAction_THROW + return api.ErrDuplicateInstance + } + } + } else { + return err + } } eventPayload, err := backend.MarshalHistoryEvent(e) @@ -429,17 +462,26 @@ func (be *sqliteBackend) CreateOrchestrationInstance(ctx context.Context, e *bac return nil } -func (be *sqliteBackend) createOrchestrationInstanceInternal(ctx context.Context, e *backend.HistoryEvent, tx *sql.Tx, instanceID *string) error { +func convertStatusToSet(statuses []protos.OrchestrationStatus) map[protos.OrchestrationStatus]bool { + statusSet := make(map[protos.OrchestrationStatus]bool) + for _, status := range statuses { + statusSet[status] = true + } + return statusSet +} + +func (be *sqliteBackend) createOrchestrationInstanceInternal(ctx context.Context, e *backend.HistoryEvent, tx *sql.Tx, instanceID *string) (string, error) { if e == nil { - return errors.New("HistoryEvent must be non-nil") + return "", errors.New("HistoryEvent must be non-nil") } else if e.Timestamp == nil { - return errors.New("HistoryEvent must have a non-nil timestamp") + return "", errors.New("HistoryEvent must have a non-nil timestamp") } startEvent := e.GetExecutionStarted() if startEvent == nil { - return errors.New("HistoryEvent must be an ExecutionStartedEvent") + return "", errors.New("HistoryEvent must be an ExecutionStartedEvent") } + *instanceID = startEvent.OrchestrationInstance.InstanceId // TODO: Support for re-using orchestration instance IDs res, err := tx.ExecContext( @@ -462,19 +504,69 @@ func (be *sqliteBackend) createOrchestrationInstanceInternal(ctx context.Context e.Timestamp.AsTime(), ) if err != nil { - return fmt.Errorf("failed to insert into [Instances] table: %w", err) + return "", fmt.Errorf("failed to insert into [Instances] table: %w", err) } rows, err := res.RowsAffected() if err != nil { - return fmt.Errorf("failed to count the rows affected: %w", err) + return "", fmt.Errorf("failed to count the rows affected: %w", err) } if rows <= 0 { - return backend.ErrDuplicateEvent + // query RuntimeStatus for the existing instance + queryRows, err := tx.QueryContext( + ctx, + `SELECT [RuntimeStatus] FROM Instances WHERE [InstanceID] = ?`, + startEvent.OrchestrationInstance.InstanceId, + ) + var runtimeStatus *string + if queryRows.Next() { + err = queryRows.Scan(&runtimeStatus) + if errors.Is(err, sql.ErrNoRows) { + return "", api.ErrInstanceNotFound + } else if err != nil { + return "", fmt.Errorf("failed to scan the Instances table result: %w", err) + } + } + return *runtimeStatus, backend.ErrDuplicateEvent } + return "", nil +} - *instanceID = startEvent.OrchestrationInstance.InstanceId +func (be *sqliteBackend) cleanupOrchestrationStateInternal(ctx context.Context, tx *sql.Tx, id api.InstanceID) error { + row := tx.QueryRowContext(ctx, "SELECT 1 FROM Instances WHERE [InstanceID] = ?", string(id)) + if err := row.Err(); err != nil { + return fmt.Errorf("failed to query for instance existence: %w", err) + } + + var unused int + if err := row.Scan(&unused); err != nil { + if errors.Is(err, sql.ErrNoRows) { + return api.ErrInstanceNotFound + } else { + return fmt.Errorf("failed to scan instance existence: %w", err) + } + } + + _, err := tx.ExecContext(ctx, "DELETE FROM Instances WHERE [InstanceID] = ?", string(id)) + if err != nil { + return fmt.Errorf("failed to delete from the Instances table: %w", err) + } + + _, err = tx.ExecContext(ctx, "DELETE FROM History WHERE [InstanceID] = ?", string(id)) + if err != nil { + return fmt.Errorf("failed to delete from History table: %w", err) + } + + _, err = tx.ExecContext(ctx, "DELETE FROM NewEvents WHERE [InstanceID] = ?", string(id)) + if err != nil { + return fmt.Errorf("failed to delete from NewEvents table: %w", err) + } + + _, err = tx.ExecContext(ctx, "DELETE FROM NewTasks WHERE [InstanceID] = ?", string(id)) + if err != nil { + return fmt.Errorf("failed to delete from NewTasks table: %w", err) + } return nil } @@ -867,47 +959,6 @@ func (be *sqliteBackend) PurgeOrchestrationState(ctx context.Context, id api.Ins return fmt.Errorf("failed to delete from History table: %w", err) } - if err = tx.Commit(); err != nil { - return fmt.Errorf("failed to commit transaction: %w", err) - } - return nil -} - -func (be *sqliteBackend) CleanupOrchestration(ctx context.Context, id api.InstanceID) error { - if err := be.ensureDB(); err != nil { - return err - } - - tx, err := be.db.BeginTx(ctx, nil) - if err != nil { - return err - } - defer tx.Rollback() - - row := tx.QueryRowContext(ctx, "SELECT 1 FROM Instances WHERE [InstanceID] = ?", string(id)) - if err := row.Err(); err != nil { - return fmt.Errorf("failed to query for instance existence: %w", err) - } - - var unused int - if err := row.Scan(&unused); err != nil { - if errors.Is(err, sql.ErrNoRows) { - return api.ErrInstanceNotFound - } else { - return fmt.Errorf("failed to scan instance existence: %w", err) - } - } - - _, err = tx.ExecContext(ctx, "DELETE FROM Instances WHERE [InstanceID] = ?", string(id)) - if err != nil { - return fmt.Errorf("failed to delete from the Instances table: %w", err) - } - - _, err = tx.ExecContext(ctx, "DELETE FROM History WHERE [InstanceID] = ?", string(id)) - if err != nil { - return fmt.Errorf("failed to delete from History table: %w", err) - } - _, err = tx.ExecContext(ctx, "DELETE FROM NewEvents WHERE [InstanceID] = ?", string(id)) if err != nil { return fmt.Errorf("failed to delete from NewEvents table: %w", err) diff --git a/client/client_grpc.go b/client/client_grpc.go index 8209d4d..a57453f 100644 --- a/client/client_grpc.go +++ b/client/client_grpc.go @@ -31,7 +31,10 @@ func NewTaskHubGrpcClient(cc grpc.ClientConnInterface, logger backend.Logger) *T // ScheduleNewOrchestration schedules a new orchestration instance with a specified set of options for execution. func (c *TaskHubGrpcClient) ScheduleNewOrchestration(ctx context.Context, orchestrator string, opts ...api.NewOrchestrationOptions) (api.InstanceID, error) { - req := &protos.CreateInstanceRequest{Name: orchestrator} + req := &protos.CreateInstanceRequest{ + Name: orchestrator, + CreateInstanceOption: &protos.CreateInstanceOption{}, + } for _, configure := range opts { configure(req) } diff --git a/internal/protos/orchestrator_service.pb.go b/internal/protos/orchestrator_service.pb.go index 035f6f6..2ffcba3 100644 --- a/internal/protos/orchestrator_service.pb.go +++ b/internal/protos/orchestrator_service.pb.go @@ -94,7 +94,7 @@ func (OrchestrationStatus) EnumDescriptor() ([]byte, []int) { type CreateOrchestrationAction int32 const ( - CreateOrchestrationAction_THROW CreateOrchestrationAction = 0 + CreateOrchestrationAction_ERROR CreateOrchestrationAction = 0 CreateOrchestrationAction_SKIP CreateOrchestrationAction = 1 CreateOrchestrationAction_TERMINATE CreateOrchestrationAction = 2 ) @@ -102,12 +102,12 @@ const ( // Enum value maps for CreateOrchestrationAction. var ( CreateOrchestrationAction_name = map[int32]string{ - 0: "THROW", + 0: "ERROR", 1: "SKIP", 2: "TERMINATE", } CreateOrchestrationAction_value = map[string]int32{ - "THROW": 0, + "ERROR": 0, "SKIP": 1, "TERMINATE": 2, } @@ -2851,7 +2851,7 @@ func (x *CreateInstanceOption) GetAction() CreateOrchestrationAction { if x != nil { return x.Action } - return CreateOrchestrationAction_THROW + return CreateOrchestrationAction_ERROR } type CreateInstanceResponse struct { @@ -6580,8 +6580,8 @@ var file_orchestrator_service_proto_rawDesc = []byte{ 0x48, 0x45, 0x53, 0x54, 0x52, 0x41, 0x54, 0x49, 0x4f, 0x4e, 0x5f, 0x53, 0x54, 0x41, 0x54, 0x55, 0x53, 0x5f, 0x53, 0x55, 0x53, 0x50, 0x45, 0x4e, 0x44, 0x45, 0x44, 0x10, 0x07, 0x2a, 0x3f, 0x0a, 0x19, 0x43, 0x72, 0x65, 0x61, 0x74, 0x65, 0x4f, 0x72, 0x63, 0x68, 0x65, 0x73, 0x74, 0x72, 0x61, - 0x74, 0x69, 0x6f, 0x6e, 0x41, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x09, 0x0a, 0x05, 0x54, 0x48, - 0x52, 0x4f, 0x57, 0x10, 0x00, 0x12, 0x08, 0x0a, 0x04, 0x53, 0x4b, 0x49, 0x50, 0x10, 0x01, 0x12, + 0x74, 0x69, 0x6f, 0x6e, 0x41, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x09, 0x0a, 0x05, 0x45, 0x52, + 0x52, 0x4f, 0x52, 0x10, 0x00, 0x12, 0x08, 0x0a, 0x04, 0x53, 0x4b, 0x49, 0x50, 0x10, 0x01, 0x12, 0x0d, 0x0a, 0x09, 0x54, 0x45, 0x52, 0x4d, 0x49, 0x4e, 0x41, 0x54, 0x45, 0x10, 0x02, 0x32, 0xfc, 0x0a, 0x0a, 0x15, 0x54, 0x61, 0x73, 0x6b, 0x48, 0x75, 0x62, 0x53, 0x69, 0x64, 0x65, 0x63, 0x61, 0x72, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x12, 0x37, 0x0a, 0x05, 0x48, 0x65, 0x6c, 0x6c, diff --git a/submodules/durabletask-protobuf b/submodules/durabletask-protobuf index 0ee8153..cb1bb8c 160000 --- a/submodules/durabletask-protobuf +++ b/submodules/durabletask-protobuf @@ -1 +1 @@ -Subproject commit 0ee81538221a900dfa47b8e25feda2d65f08549e +Subproject commit cb1bb8c1a3c59797f8038c68f0771fe2171a8077 diff --git a/tests/backend_test.go b/tests/backend_test.go index 97d3158..6b29a86 100644 --- a/tests/backend_test.go +++ b/tests/backend_test.go @@ -337,7 +337,8 @@ func Test_UninitializedBackend(t *testing.T) { assert.Equal(t, err, backend.ErrNotInitialized) err = be.CompleteOrchestrationWorkItem(ctx, nil) assert.Equal(t, err, backend.ErrNotInitialized) - err = be.CreateOrchestrationInstance(ctx, nil) + option := &protos.CreateInstanceOption{} + err = be.CreateOrchestrationInstance(ctx, nil, option) assert.Equal(t, err, backend.ErrNotInitialized) _, err = be.GetOrchestrationMetadata(ctx, api.InstanceID("")) assert.Equal(t, err, backend.ErrNotInitialized) @@ -487,7 +488,8 @@ func createOrchestrationInstance(t assert.TestingT, be backend.Backend, instance }, }, } - err := be.CreateOrchestrationInstance(ctx, e) + option := &protos.CreateInstanceOption{} + err := be.CreateOrchestrationInstance(ctx, e, option) return assert.NoError(t, err) } diff --git a/tests/grpc/grpc_test.go b/tests/grpc/grpc_test.go index 515458f..3a0d789 100644 --- a/tests/grpc/grpc_test.go +++ b/tests/grpc/grpc_test.go @@ -255,7 +255,7 @@ func Test_Grpc_ReuseInstanceIDSkip(t *testing.T) { instanceIDs := api.InstanceID("SKIP_IF_RUNNING_OR_COMPLETED") ReuseIdOption := &api.OrchestrationIDReuseOption{ CreateOrchestrationAction: api.SKIP, - OrchestrationStatuses: []api.OrchestrationStatus{ + OrchestrationStatuses: []protos.OrchestrationStatus{ api.RUNNING, api.COMPLETED, api.PENDING, @@ -307,7 +307,7 @@ func Test_Grpc_ReuseInstanceIDTerminate(t *testing.T) { instanceIDs := api.InstanceID("TERMINATE_IF_RUNNING_OR_COMPLETED") ReuseIdOption := &api.OrchestrationIDReuseOption{ CreateOrchestrationAction: api.TERMINATE, - OrchestrationStatuses: []api.OrchestrationStatus{ + OrchestrationStatuses: []protos.OrchestrationStatus{ api.RUNNING, api.COMPLETED, api.PENDING, @@ -357,18 +357,10 @@ func Test_Grpc_ReuseInstanceIDThrow(t *testing.T) { cancelListener := startGrpcListener(t, r) defer cancelListener() instanceIDs := api.InstanceID("THROW_IF_RUNNING_OR_COMPLETED") - ReuseIdOption := &api.OrchestrationIDReuseOption{ - CreateOrchestrationAction: api.THROW, - OrchestrationStatuses: []api.OrchestrationStatus{ - api.RUNNING, - api.COMPLETED, - api.PENDING, - }, - } id, err := grpcClient.ScheduleNewOrchestration(ctx, "SingleActivity", api.WithInput("世界"), api.WithInstanceID(instanceIDs)) require.NoError(t, err) - id, err = grpcClient.ScheduleNewOrchestration(ctx, "SingleActivity", api.WithInput("世界"), api.WithInstanceID(id), api.WithOrchestrationReuseOption(ReuseIdOption)) + id, err = grpcClient.ScheduleNewOrchestration(ctx, "SingleActivity", api.WithInput("世界"), api.WithInstanceID(id)) if assert.Error(t, err) { assert.Contains(t, err.Error(), "orchestration instance already exists") } diff --git a/tests/mocks/Backend.go b/tests/mocks/Backend.go index c558e04..cec693f 100644 --- a/tests/mocks/Backend.go +++ b/tests/mocks/Backend.go @@ -168,53 +168,6 @@ func (_c *Backend_AddNewOrchestrationEvent_Call) RunAndReturn(run func(context.C return _c } -// CleanupOrchestration provides a mock function with given fields: _a0, _a1 -func (_m *Backend) CleanupOrchestration(_a0 context.Context, _a1 api.InstanceID) error { - ret := _m.Called(_a0, _a1) - - if len(ret) == 0 { - panic("no return value specified for CleanupOrchestration") - } - - var r0 error - if rf, ok := ret.Get(0).(func(context.Context, api.InstanceID) error); ok { - r0 = rf(_a0, _a1) - } else { - r0 = ret.Error(0) - } - - return r0 -} - -// Backend_CleanupOrchestration_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'CleanupOrchestration' -type Backend_CleanupOrchestration_Call struct { - *mock.Call -} - -// CleanupOrchestration is a helper method to define mock.On call -// - _a0 context.Context -// - _a1 api.InstanceID -func (_e *Backend_Expecter) CleanupOrchestration(_a0 interface{}, _a1 interface{}) *Backend_CleanupOrchestration_Call { - return &Backend_CleanupOrchestration_Call{Call: _e.mock.On("CleanupOrchestration", _a0, _a1)} -} - -func (_c *Backend_CleanupOrchestration_Call) Run(run func(_a0 context.Context, _a1 api.InstanceID)) *Backend_CleanupOrchestration_Call { - _c.Call.Run(func(args mock.Arguments) { - run(args[0].(context.Context), args[1].(api.InstanceID)) - }) - return _c -} - -func (_c *Backend_CleanupOrchestration_Call) Return(_a0 error) *Backend_CleanupOrchestration_Call { - _c.Call.Return(_a0) - return _c -} - -func (_c *Backend_CleanupOrchestration_Call) RunAndReturn(run func(context.Context, api.InstanceID) error) *Backend_CleanupOrchestration_Call { - _c.Call.Return(run) - return _c -} - // CompleteActivityWorkItem provides a mock function with given fields: _a0, _a1 func (_m *Backend) CompleteActivityWorkItem(_a0 context.Context, _a1 *backend.ActivityWorkItem) error { ret := _m.Called(_a0, _a1) @@ -309,17 +262,17 @@ func (_c *Backend_CompleteOrchestrationWorkItem_Call) RunAndReturn(run func(cont return _c } -// CreateOrchestrationInstance provides a mock function with given fields: _a0, _a1 -func (_m *Backend) CreateOrchestrationInstance(_a0 context.Context, _a1 *protos.HistoryEvent) error { - ret := _m.Called(_a0, _a1) +// CreateOrchestrationInstance provides a mock function with given fields: _a0, _a1, _a2 +func (_m *Backend) CreateOrchestrationInstance(_a0 context.Context, _a1 *protos.HistoryEvent, _a2 *protos.CreateInstanceOption) error { + ret := _m.Called(_a0, _a1, _a2) if len(ret) == 0 { panic("no return value specified for CreateOrchestrationInstance") } var r0 error - if rf, ok := ret.Get(0).(func(context.Context, *protos.HistoryEvent) error); ok { - r0 = rf(_a0, _a1) + if rf, ok := ret.Get(0).(func(context.Context, *protos.HistoryEvent, *protos.CreateInstanceOption) error); ok { + r0 = rf(_a0, _a1, _a2) } else { r0 = ret.Error(0) } @@ -335,13 +288,14 @@ type Backend_CreateOrchestrationInstance_Call struct { // CreateOrchestrationInstance is a helper method to define mock.On call // - _a0 context.Context // - _a1 *protos.HistoryEvent -func (_e *Backend_Expecter) CreateOrchestrationInstance(_a0 interface{}, _a1 interface{}) *Backend_CreateOrchestrationInstance_Call { - return &Backend_CreateOrchestrationInstance_Call{Call: _e.mock.On("CreateOrchestrationInstance", _a0, _a1)} +// - _a2 *protos.CreateInstanceOption +func (_e *Backend_Expecter) CreateOrchestrationInstance(_a0 interface{}, _a1 interface{}, _a2 interface{}) *Backend_CreateOrchestrationInstance_Call { + return &Backend_CreateOrchestrationInstance_Call{Call: _e.mock.On("CreateOrchestrationInstance", _a0, _a1, _a2)} } -func (_c *Backend_CreateOrchestrationInstance_Call) Run(run func(_a0 context.Context, _a1 *protos.HistoryEvent)) *Backend_CreateOrchestrationInstance_Call { +func (_c *Backend_CreateOrchestrationInstance_Call) Run(run func(_a0 context.Context, _a1 *protos.HistoryEvent, _a2 *protos.CreateInstanceOption)) *Backend_CreateOrchestrationInstance_Call { _c.Call.Run(func(args mock.Arguments) { - run(args[0].(context.Context), args[1].(*protos.HistoryEvent)) + run(args[0].(context.Context), args[1].(*protos.HistoryEvent), args[2].(*protos.CreateInstanceOption)) }) return _c } @@ -351,7 +305,7 @@ func (_c *Backend_CreateOrchestrationInstance_Call) Return(_a0 error) *Backend_C return _c } -func (_c *Backend_CreateOrchestrationInstance_Call) RunAndReturn(run func(context.Context, *protos.HistoryEvent) error) *Backend_CreateOrchestrationInstance_Call { +func (_c *Backend_CreateOrchestrationInstance_Call) RunAndReturn(run func(context.Context, *protos.HistoryEvent, *protos.CreateInstanceOption) error) *Backend_CreateOrchestrationInstance_Call { _c.Call.Return(run) return _c }