Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactoring to support orchestration history chunking #50

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 51 additions & 21 deletions backend/activity.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,24 +13,55 @@ import (
)

type activityProcessor struct {
be Backend
executor ActivityExecutor
be Backend
executor ActivityExecutor
logger Logger
maxOutputSizeInBytes int
}

type activityWorkerConfig struct {
workerOptions []NewTaskWorkerOptions
maxOutputSizeInBytes int
}

// ActivityWorkerOption is a function that configures an activity worker.
type ActivityWorkerOption func(*activityWorkerConfig)

// WithMaxConcurrentActivityInvocations sets the maximum number of concurrent activity invocations
// that the worker can process. If this limit is exceeded, the worker will block until the number of
// concurrent invocations drops below the limit.
func WithMaxConcurrentActivityInvocations(n int32) ActivityWorkerOption {
return func(o *activityWorkerConfig) {
o.workerOptions = append(o.workerOptions, WithMaxParallelism(n))
}
}

// WithMaxActivityOutputSizeInKB sets the maximum size of an activity's output.
// If an activity's output exceeds this size, the activity execution will fail with an error.
func WithMaxActivityOutputSizeInKB(n int) ActivityWorkerOption {
return func(o *activityWorkerConfig) {
o.maxOutputSizeInBytes = n * 1024
}
}

type ActivityExecutor interface {
ExecuteActivity(context.Context, api.InstanceID, *protos.HistoryEvent) (*protos.HistoryEvent, error)
}

func NewActivityTaskWorker(be Backend, executor ActivityExecutor, logger Logger, opts ...NewTaskWorkerOptions) TaskWorker {
processor := newActivityProcessor(be, executor)
return NewTaskWorker(be, processor, logger, opts...)
}
func NewActivityTaskWorker(be Backend, executor ActivityExecutor, logger Logger, opts ...ActivityWorkerOption) TaskWorker {
config := &activityWorkerConfig{}
for _, configure := range opts {
configure(config)
}

func newActivityProcessor(be Backend, executor ActivityExecutor) TaskProcessor {
return &activityProcessor{
be: be,
executor: executor,
processor := &activityProcessor{
be: be,
executor: executor,
logger: logger,
maxOutputSizeInBytes: config.maxOutputSizeInBytes,
}

return NewTaskWorker(be, processor, logger, config.workerOptions...)
}

// Name implements TaskProcessor
Expand Down Expand Up @@ -77,22 +108,21 @@ func (p *activityProcessor) ProcessWorkItem(ctx context.Context, wi WorkItem) er
}
return err
}

awi.Result = result
return nil
}

// CompleteWorkItem implements TaskDispatcher
func (ap *activityProcessor) CompleteWorkItem(ctx context.Context, wi WorkItem) error {
awi := wi.(*ActivityWorkItem)
if awi.Result == nil {
if result == nil {
return fmt.Errorf("can't complete work item '%s' with nil result", wi.Description())
}
if awi.Result.GetTaskCompleted() == nil && awi.Result.GetTaskFailed() == nil {
if result.GetTaskCompleted() == nil && result.GetTaskFailed() == nil {
return fmt.Errorf("can't complete work item '%s', which isn't TaskCompleted or TaskFailed", wi.Description())
}

return ap.be.CompleteActivityWorkItem(ctx, awi)
if p.maxOutputSizeInBytes > 0 && helpers.GetProtoSize(result) > p.maxOutputSizeInBytes {
err = fmt.Errorf("activity output size %d exceeds limit of %d bytes", helpers.GetProtoSize(result), p.maxOutputSizeInBytes)
awi.Result = helpers.NewTaskFailedEvent(awi.NewEvent.EventId, helpers.NewTaskFailureDetails(err))
} else {
awi.Result = result
}

return p.be.CompleteActivityWorkItem(ctx, awi)
}

// AbandonWorkItem implements TaskDispatcher
Expand Down
41 changes: 39 additions & 2 deletions backend/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,11 @@ import (
"errors"
"fmt"

"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/wrapperspb"

"github.com/microsoft/durabletask-go/api"
"github.com/microsoft/durabletask-go/internal/protos"
"google.golang.org/protobuf/proto"
)

var (
Expand All @@ -23,6 +25,25 @@ type (
TaskFailureDetails = protos.TaskFailureDetails
)

type OrchestrationStateChanges struct {
NewEvents []*HistoryEvent
NewTasks []*HistoryEvent
NewTimers []*HistoryEvent
NewMessages []OrchestratorMessage
CustomStatus *wrapperspb.StringValue
RuntimeStatus protos.OrchestrationStatus
ContinuedAsNew bool
IsPartial bool
HistoryStartIndex int
}

func (c *OrchestrationStateChanges) IsEmpty() bool {
return len(c.NewEvents) == 0 &&
len(c.NewTasks) == 0 &&
len(c.NewTimers) == 0 &&
len(c.NewMessages) == 0
}

type OrchestrationIdReusePolicyOptions func(*protos.OrchestrationIdReusePolicy) error

func WithOrchestrationIdReusePolicy(policy *protos.OrchestrationIdReusePolicy) OrchestrationIdReusePolicyOptions {
Expand All @@ -35,6 +56,7 @@ func WithOrchestrationIdReusePolicy(policy *protos.OrchestrationIdReusePolicy) O
}
}

// Backend is the interface that must be implemented by all task hub backends.
type Backend interface {
// CreateTaskHub creates a new task hub for the current backend. Task hub creation must be idempotent.
//
Expand Down Expand Up @@ -74,8 +96,23 @@ type Backend interface {

// CompleteOrchestrationWorkItem completes a work item by saving the updated runtime state to durable storage.
//
// The [OrchestrationStateChanges] parameter contains the changes to the orchestration state that should be
// saved to durable storage. The [HistoryStartIndex] field indicates the index of the first history event
// in the [OrchestrationStateChanges.NewEvents] slice. This is used to determine the index of the first
// history event in the [OrchestrationRuntimeState.History] slice, which is useful for backends that store
// the history events as an append log.
//
// The [OrchestrationStateChanges.IsPartial] field indicates whether this is a partial completion operation,
// in which case more calls to this function are expected to follow with the same work item. Partial completion
// is used to commit state updates in chunks to avoid overly large transactions. The final chunk will be committed
// with [OrchestrationStateChanges.IsPartial] set to [false].
//
// Implementations of this function should not attempt to delete the work item from storage until the final chunk
// is committed (i.e., until [OrchestrationStateChanges.IsPartial] is [false]) to ensure that the work item can be
// recovered if the process crashes before the final chunk is committed.
//
// Returns [ErrWorkItemLockLost] if the work-item couldn't be completed due to a lock-lost conflict (e.g., split-brain).
CompleteOrchestrationWorkItem(context.Context, *OrchestrationWorkItem) error
CompleteOrchestrationWorkItem(context.Context, *OrchestrationWorkItem, OrchestrationStateChanges) error

// AbandonOrchestrationWorkItem undoes any state changes and returns the work item to the work item queue.
//
Expand Down
2 changes: 1 addition & 1 deletion backend/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func (c *backendClient) ScheduleNewOrchestration(ctx context.Context, orchestrat
func (c *backendClient) FetchOrchestrationMetadata(ctx context.Context, id api.InstanceID) (*api.OrchestrationMetadata, error) {
metadata, err := c.be.GetOrchestrationMetadata(ctx, id)
if err != nil {
return nil, fmt.Errorf("Failed to fetch orchestration metadata: %w", err)
return nil, fmt.Errorf("failed to fetch orchestration metadata: %w", err)
}
return metadata, nil
}
Expand Down
2 changes: 2 additions & 0 deletions backend/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -404,6 +404,8 @@ loop:
}

// mustEmbedUnimplementedTaskHubSidecarServiceServer implements protos.TaskHubSidecarServiceServer
//
//lint:ignore U1000 because this is a required gRPC method
func (grpcExecutor) mustEmbedUnimplementedTaskHubSidecarServiceServer() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What gRPC wants us to do here is to change type grpcExecutor struct and add an embed for unimplementedTaskHubSidecarServiceServer. We shouldn't have to implement this method

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure what this means, unfortunately. Can you provide an example of the code change I can make that will allow me to delete this method?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think removing this method should be enough.

grpcExecutor already embeds unimplementedTaskHubSidecarServiceServer:

protos.UnimplementedTaskHubSidecarServiceServer

I tried doing it locally but right now this PR isn't building (at least not for me?) with some other really weird issues

}

Expand Down
Loading