Skip to content

Commit

Permalink
implement new design
Browse files Browse the repository at this point in the history
  • Loading branch information
kaibocai committed Dec 1, 2023
1 parent 9e8ae99 commit 56663bc
Show file tree
Hide file tree
Showing 11 changed files with 155 additions and 223 deletions.
63 changes: 17 additions & 46 deletions api/orchestration.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,30 +37,29 @@ type OrchestrationMetadata struct {
FailureDetails *protos.TaskFailureDetails
}

type CreateOrchestrationAction int
type CreateOrchestrationAction protos.CreateOrchestrationAction

const (
THROW CreateOrchestrationAction = iota
SKIP
TERMINATE
SKIP = protos.CreateOrchestrationAction_SKIP
TERMINATE = protos.CreateOrchestrationAction_TERMINATE
)

type OrchestrationStatus int
type OrchestrationStatus protos.OrchestrationStatus

const (
RUNNING OrchestrationStatus = iota
COMPLETED
// CONTINUED_AS_NEW
FAILED
// CANCELED
TERMINATED
PENDING
// SUSPENDED
RUNNING = protos.OrchestrationStatus_ORCHESTRATION_STATUS_RUNNING
COMPLETED = protos.OrchestrationStatus_ORCHESTRATION_STATUS_COMPLETED
// CONTINUED_AS_NEW = protos.OrchestrationStatus_ORCHESTRATION_STATUS_CONTINUED_AS_NEW
FAILED = protos.OrchestrationStatus_ORCHESTRATION_STATUS_FAILED
// CANCELED = protos.OrchestrationStatus_ORCHESTRATION_STATUS_CANCELED
TERMINATED = protos.OrchestrationStatus_ORCHESTRATION_STATUS_TERMINATED
PENDING = protos.OrchestrationStatus_ORCHESTRATION_STATUS_PENDING
// SUSPENDED = protos.OrchestrationStatus_ORCHESTRATION_STATUS_SUSPENDED
)

type OrchestrationIDReuseOption struct {
CreateOrchestrationAction CreateOrchestrationAction
OrchestrationStatuses []OrchestrationStatus
CreateOrchestrationAction protos.CreateOrchestrationAction
OrchestrationStatuses []protos.OrchestrationStatus
}

// NewOrchestrationOptions configures options for starting a new orchestration.
Expand All @@ -87,38 +86,10 @@ func WithInstanceID(id InstanceID) NewOrchestrationOptions {
// WithOrchestrationReuseOption configures Orchestration ID reuse policy.
func WithOrchestrationReuseOption(option *OrchestrationIDReuseOption) NewOrchestrationOptions {
return func(req *protos.CreateInstanceRequest) error {
// initialize CreateInstanceOption
req.CreateInstanceOption = &protos.CreateInstanceOption{}
// set action
switch option.CreateOrchestrationAction {
case SKIP:
req.CreateInstanceOption.Action = protos.CreateOrchestrationAction_SKIP
case TERMINATE:
req.CreateInstanceOption.Action = protos.CreateOrchestrationAction_TERMINATE
case THROW:
req.CreateInstanceOption.Action = protos.CreateOrchestrationAction_THROW
}

// set status
for _, status := range option.OrchestrationStatuses {
switch status {
case RUNNING:
req.CreateInstanceOption.OperationStatus = append(req.CreateInstanceOption.OperationStatus, protos.OrchestrationStatus_ORCHESTRATION_STATUS_RUNNING)
case COMPLETED:
req.CreateInstanceOption.OperationStatus = append(req.CreateInstanceOption.OperationStatus, protos.OrchestrationStatus_ORCHESTRATION_STATUS_COMPLETED)
// case CONTINUED_AS_NEW:
// req.CreateInstanceOption.OperationStatus = append(req.CreateInstanceOption.OperationStatus, protos.OrchestrationStatus_ORCHESTRATION_STATUS_CONTINUED_AS_NEW)
case FAILED:
req.CreateInstanceOption.OperationStatus = append(req.CreateInstanceOption.OperationStatus, protos.OrchestrationStatus_ORCHESTRATION_STATUS_FAILED)
// case CANCELED:
// req.CreateInstanceOption.OperationStatus = append(req.CreateInstanceOption.OperationStatus, protos.OrchestrationStatus_ORCHESTRATION_STATUS_CANCELED)
case TERMINATED:
req.CreateInstanceOption.OperationStatus = append(req.CreateInstanceOption.OperationStatus, protos.OrchestrationStatus_ORCHESTRATION_STATUS_TERMINATED)
case PENDING:
req.CreateInstanceOption.OperationStatus = append(req.CreateInstanceOption.OperationStatus, protos.OrchestrationStatus_ORCHESTRATION_STATUS_PENDING)
// case SUSPENDED:
// req.CreateInstanceOption.OperationStatus = append(req.CreateInstanceOption.OperationStatus, protos.OrchestrationStatus_ORCHESTRATION_STATUS_SUSPENDED)
}
}
req.CreateInstanceOption.Action = option.CreateOrchestrationAction
req.CreateInstanceOption.OperationStatus = option.OrchestrationStatuses
return nil
}
}
Expand Down
7 changes: 1 addition & 6 deletions backend/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ type Backend interface {

// CreateOrchestrationInstance creates a new orchestration instance with a history event that
// wraps an ExecutionStarted event.
CreateOrchestrationInstance(context.Context, *HistoryEvent) error
CreateOrchestrationInstance(context.Context, *HistoryEvent, *protos.CreateInstanceOption) error

// AddNewOrchestrationEvent adds a new orchestration event to the specified orchestration instance.
AddNewOrchestrationEvent(context.Context, api.InstanceID, *HistoryEvent) error
Expand Down Expand Up @@ -91,11 +91,6 @@ type Backend interface {
// [api.ErrInstanceNotFound] is returned if the specified orchestration instance doesn't exist.
// [api.ErrNotCompleted] is returned if the specified orchestration instance is still running.
PurgeOrchestrationState(context.Context, api.InstanceID) error

// CleanupOrchestration cleans up all records for the specified orchestration instance in the entire task hub.
//
// [api.ErrInstanceNotFound] is returned if the specified orchestration instance doesn't exist.
CleanupOrchestration(context.Context, api.InstanceID) error
}

// MarshalHistoryEvent serializes the [HistoryEvent] into a protobuf byte array.
Expand Down
3 changes: 2 additions & 1 deletion backend/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,8 @@ func (c *backendClient) ScheduleNewOrchestration(ctx context.Context, orchestrat

tc := helpers.TraceContextFromSpan(span)
e := helpers.NewExecutionStartedEvent(req.Name, req.InstanceId, req.Input, nil, tc)
if err := c.be.CreateOrchestrationInstance(ctx, e); err != nil {
option := &protos.CreateInstanceOption{}
if err := c.be.CreateOrchestrationInstance(ctx, e, option); err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return api.EmptyInstanceID, fmt.Errorf("failed to start orchestration: %w", err)
Expand Down
41 changes: 2 additions & 39 deletions backend/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"time"

"github.com/cenkalti/backoff/v4"
"go.opentelemetry.io/otel/trace"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
Expand Down Expand Up @@ -311,47 +310,11 @@ func (g *grpcExecutor) StartInstance(ctx context.Context, req *protos.CreateInst
ctx, span := helpers.StartNewCreateOrchestrationSpan(ctx, req.Name, req.Version.GetValue(), instanceID)
defer span.End()

// retrieve the instance with instanceID
metadata, err := g.backend.GetOrchestrationMetadata(ctx, api.InstanceID(instanceID))
if err != nil {
// if the instance doesn't exist, create instance directly.
if errors.Is(err, api.ErrInstanceNotFound) {
return createInstance(ctx, g.backend, instanceID, req, span)
} else {
return nil, err
}
}

// build target status set
statusSet := convertStatusToSet(req.CreateInstanceOption.OperationStatus)

// if current status is not one of the target status, create instance directly
if !statusSet[metadata.RuntimeStatus] {
return createInstance(ctx, g.backend, instanceID, req, span)
} else {
if req.CreateInstanceOption.Action == protos.CreateOrchestrationAction_THROW {
// return ErrDuplicateInstance since the instance already exists and its status is in the target status set
return nil, api.ErrDuplicateInstance
} else if req.CreateInstanceOption.Action == protos.CreateOrchestrationAction_SKIP {
// skip creating new instance
g.logger.Warnf("An instance with ID '%s' already exists; dropping duplicate create request", instanceID)
return &protos.CreateInstanceResponse{InstanceId: instanceID}, nil
} else {
// CreateInstanceAction_TERMINATE
// terminate existing instance and create a new one
if err := g.backend.CleanupOrchestration(ctx, api.InstanceID(instanceID)); err != nil {
return nil, err
}
return createInstance(ctx, g.backend, instanceID, req, span)
}
}
}

func createInstance(ctx context.Context, be Backend, instanceID string, req *protos.CreateInstanceRequest, span trace.Span) (*protos.CreateInstanceResponse, error) {
e := helpers.NewExecutionStartedEvent(req.Name, instanceID, req.Input, nil, helpers.TraceContextFromSpan(span))
if err := be.CreateOrchestrationInstance(ctx, e); err != nil {
if err := g.backend.CreateOrchestrationInstance(ctx, e, req.CreateInstanceOption); err != nil {
return nil, err
}

return &protos.CreateInstanceResponse{InstanceId: instanceID}, nil
}

Expand Down
157 changes: 104 additions & 53 deletions backend/sqlite/sqlite.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,9 @@ func NewSqliteBackend(opts *SqliteOptions, logger backend.Logger) backend.Backen
be.dsn = opts.FilePath
}

// used for local debug
// be.dsn = "file:file.sqlite"

return be
}

Expand Down Expand Up @@ -334,7 +337,7 @@ func (be *sqliteBackend) CompleteOrchestrationWorkItem(ctx context.Context, wi *
if es := msg.HistoryEvent.GetExecutionStarted(); es != nil {
// Need to insert a new row into the DB
var instanceID string
if err := be.createOrchestrationInstanceInternal(ctx, msg.HistoryEvent, tx, &instanceID); err != nil {
if _, err := be.createOrchestrationInstanceInternal(ctx, msg.HistoryEvent, tx, &instanceID); err != nil {
if err == backend.ErrDuplicateEvent {
be.logger.Warnf(
"%v: dropping sub-orchestration creation event because an instance with the target ID (%v) already exists.",
Expand Down Expand Up @@ -390,7 +393,7 @@ func (be *sqliteBackend) CompleteOrchestrationWorkItem(ctx context.Context, wi *
}

// CreateOrchestrationInstance implements backend.Backend
func (be *sqliteBackend) CreateOrchestrationInstance(ctx context.Context, e *backend.HistoryEvent) error {
func (be *sqliteBackend) CreateOrchestrationInstance(ctx context.Context, e *backend.HistoryEvent, option *protos.CreateInstanceOption) error {
if err := be.ensureDB(); err != nil {
return err
}
Expand All @@ -402,8 +405,38 @@ func (be *sqliteBackend) CreateOrchestrationInstance(ctx context.Context, e *bac
defer tx.Rollback()

var instanceID string
if err := be.createOrchestrationInstanceInternal(ctx, e, tx, &instanceID); err != nil {
return err
runtimeStatus, err := be.createOrchestrationInstanceInternal(ctx, e, tx, &instanceID)
if err != nil {
// instance already exists
if errors.Is(err, backend.ErrDuplicateEvent) {
// build target status set
fmt.Println("@@", option)
statusSet := convertStatusToSet(option.OperationStatus)
// if current status is not one of the target status, return error
if !statusSet[helpers.FromRuntimeStatusString(runtimeStatus)] {
return api.ErrDuplicateInstance
} else {
if option.Action == protos.CreateOrchestrationAction_SKIP {
// Log a warning message and skip creating a new instance
be.logger.Warnf("An instance with ID '%s' already exists; dropping duplicate create request", instanceID)
return nil
} else if option.Action == protos.CreateOrchestrationAction_TERMINATE {
// terminate existing instance
if err := be.cleanupOrchestrationStateInternal(ctx, tx, api.InstanceID(instanceID)); err != nil {
return err
}
// create a new instance
if _, err := be.createOrchestrationInstanceInternal(ctx, e, tx, &instanceID); err != nil {
return err
}
} else {
// CreateInstanceAction_THROW
return api.ErrDuplicateInstance
}
}
} else {
return err
}
}

eventPayload, err := backend.MarshalHistoryEvent(e)
Expand All @@ -429,17 +462,26 @@ func (be *sqliteBackend) CreateOrchestrationInstance(ctx context.Context, e *bac
return nil
}

func (be *sqliteBackend) createOrchestrationInstanceInternal(ctx context.Context, e *backend.HistoryEvent, tx *sql.Tx, instanceID *string) error {
// convertStatusToSet builds a membership set from the given orchestration
// statuses so that a caller can test whether a status is in the list with a
// single map lookup. A nil or empty slice yields an empty, non-nil map, and
// duplicate entries collapse into a single key.
func convertStatusToSet(statuses []protos.OrchestrationStatus) map[protos.OrchestrationStatus]bool {
	// The set can never hold more than len(statuses) keys, so size it up front
	// instead of letting the map grow incrementally.
	statusSet := make(map[protos.OrchestrationStatus]bool, len(statuses))
	for _, status := range statuses {
		statusSet[status] = true
	}
	return statusSet
}

func (be *sqliteBackend) createOrchestrationInstanceInternal(ctx context.Context, e *backend.HistoryEvent, tx *sql.Tx, instanceID *string) (string, error) {
if e == nil {
return errors.New("HistoryEvent must be non-nil")
return "", errors.New("HistoryEvent must be non-nil")
} else if e.Timestamp == nil {
return errors.New("HistoryEvent must have a non-nil timestamp")
return "", errors.New("HistoryEvent must have a non-nil timestamp")
}

startEvent := e.GetExecutionStarted()
if startEvent == nil {
return errors.New("HistoryEvent must be an ExecutionStartedEvent")
return "", errors.New("HistoryEvent must be an ExecutionStartedEvent")
}
*instanceID = startEvent.OrchestrationInstance.InstanceId

// TODO: Support for re-using orchestration instance IDs
res, err := tx.ExecContext(
Expand All @@ -462,19 +504,69 @@ func (be *sqliteBackend) createOrchestrationInstanceInternal(ctx context.Context
e.Timestamp.AsTime(),
)
if err != nil {
return fmt.Errorf("failed to insert into [Instances] table: %w", err)
return "", fmt.Errorf("failed to insert into [Instances] table: %w", err)
}

rows, err := res.RowsAffected()
if err != nil {
return fmt.Errorf("failed to count the rows affected: %w", err)
return "", fmt.Errorf("failed to count the rows affected: %w", err)
}

if rows <= 0 {
return backend.ErrDuplicateEvent
// query RuntimeStatus for the existing instance
queryRows, err := tx.QueryContext(
ctx,
`SELECT [RuntimeStatus] FROM Instances WHERE [InstanceID] = ?`,
startEvent.OrchestrationInstance.InstanceId,
)
var runtimeStatus *string
if queryRows.Next() {
err = queryRows.Scan(&runtimeStatus)
if errors.Is(err, sql.ErrNoRows) {
return "", api.ErrInstanceNotFound
} else if err != nil {
return "", fmt.Errorf("failed to scan the Instances table result: %w", err)
}
}
return *runtimeStatus, backend.ErrDuplicateEvent
}
return "", nil
}

*instanceID = startEvent.OrchestrationInstance.InstanceId
// cleanupOrchestrationStateInternal removes every row that belongs to the
// orchestration instance id, using the caller's transaction tx.
//
// It first checks that a row for id is present in the Instances table and
// returns api.ErrInstanceNotFound when there is none. It then deletes the
// matching rows from Instances, History, NewEvents and NewTasks, in that
// order, stopping at the first failure. The function itself neither commits
// nor rolls back tx.
func (be *sqliteBackend) cleanupOrchestrationStateInternal(ctx context.Context, tx *sql.Tx, id api.InstanceID) error {
	key := string(id)

	// Existence probe: a missing instance is reported as a distinct error
	// rather than silently deleting nothing.
	existing := tx.QueryRowContext(ctx, "SELECT 1 FROM Instances WHERE [InstanceID] = ?", key)
	if err := existing.Err(); err != nil {
		return fmt.Errorf("failed to query for instance existence: %w", err)
	}

	var found int
	switch err := existing.Scan(&found); {
	case err == nil:
		// The instance exists; continue with the deletes below.
	case errors.Is(err, sql.ErrNoRows):
		return api.ErrInstanceNotFound
	default:
		return fmt.Errorf("failed to scan instance existence: %w", err)
	}

	// Each step pairs a DELETE statement with the message reported if it fails.
	steps := [...]struct {
		query   string
		failure string
	}{
		{"DELETE FROM Instances WHERE [InstanceID] = ?", "failed to delete from the Instances table"},
		{"DELETE FROM History WHERE [InstanceID] = ?", "failed to delete from History table"},
		{"DELETE FROM NewEvents WHERE [InstanceID] = ?", "failed to delete from NewEvents table"},
		{"DELETE FROM NewTasks WHERE [InstanceID] = ?", "failed to delete from NewTasks table"},
	}
	for _, step := range steps {
		if _, err := tx.ExecContext(ctx, step.query, key); err != nil {
			return fmt.Errorf("%s: %w", step.failure, err)
		}
	}
	return nil
}

Expand Down Expand Up @@ -867,47 +959,6 @@ func (be *sqliteBackend) PurgeOrchestrationState(ctx context.Context, id api.Ins
return fmt.Errorf("failed to delete from History table: %w", err)
}

if err = tx.Commit(); err != nil {
return fmt.Errorf("failed to commit transaction: %w", err)
}
return nil
}

func (be *sqliteBackend) CleanupOrchestration(ctx context.Context, id api.InstanceID) error {
if err := be.ensureDB(); err != nil {
return err
}

tx, err := be.db.BeginTx(ctx, nil)
if err != nil {
return err
}
defer tx.Rollback()

row := tx.QueryRowContext(ctx, "SELECT 1 FROM Instances WHERE [InstanceID] = ?", string(id))
if err := row.Err(); err != nil {
return fmt.Errorf("failed to query for instance existence: %w", err)
}

var unused int
if err := row.Scan(&unused); err != nil {
if errors.Is(err, sql.ErrNoRows) {
return api.ErrInstanceNotFound
} else {
return fmt.Errorf("failed to scan instance existence: %w", err)
}
}

_, err = tx.ExecContext(ctx, "DELETE FROM Instances WHERE [InstanceID] = ?", string(id))
if err != nil {
return fmt.Errorf("failed to delete from the Instances table: %w", err)
}

_, err = tx.ExecContext(ctx, "DELETE FROM History WHERE [InstanceID] = ?", string(id))
if err != nil {
return fmt.Errorf("failed to delete from History table: %w", err)
}

_, err = tx.ExecContext(ctx, "DELETE FROM NewEvents WHERE [InstanceID] = ?", string(id))
if err != nil {
return fmt.Errorf("failed to delete from NewEvents table: %w", err)
Expand Down
Loading

0 comments on commit 56663bc

Please sign in to comment.