
Fix server getting stuck when a test timeout exception occurs #61

Merged: 9 commits, Jun 28, 2024
2 changes: 1 addition & 1 deletion backend/client.go
@@ -73,7 +73,7 @@ func (c *backendClient) ScheduleNewOrchestration(ctx context.Context, orchestrat
func (c *backendClient) FetchOrchestrationMetadata(ctx context.Context, id api.InstanceID) (*api.OrchestrationMetadata, error) {
metadata, err := c.be.GetOrchestrationMetadata(ctx, id)
if err != nil {
return nil, fmt.Errorf("Failed to fetch orchestration metadata: %w", err)
return nil, fmt.Errorf("failed to fetch orchestration metadata: %w", err)
}
return metadata, nil
}
145 changes: 124 additions & 21 deletions backend/executor.go
@@ -4,6 +4,7 @@ import (
context "context"
"errors"
"fmt"
"strconv"
"strings"
"sync"
"time"
@@ -28,17 +29,20 @@ var errShuttingDown error = status.Error(codes.Canceled, "shutting down")

type ExecutionResults struct {
Response *protos.OrchestratorResponse
complete chan interface{}
complete chan struct{}
pending chan string
}

type activityExecutionResult struct {
response *protos.ActivityResponse
complete chan interface{}
complete chan struct{}
pending chan string
}

type Executor interface {
ExecuteOrchestrator(ctx context.Context, iid api.InstanceID, oldEvents []*protos.HistoryEvent, newEvents []*protos.HistoryEvent) (*ExecutionResults, error)
ExecuteActivity(context.Context, api.InstanceID, *protos.HistoryEvent) (*protos.HistoryEvent, error)
Shutdown(ctx context.Context) error
}

type grpcExecutor struct {
Expand Down Expand Up @@ -96,9 +100,8 @@ func NewGrpcExecutor(be Backend, logger Logger, opts ...grpcExecutorOptions) (ex

// ExecuteOrchestrator implements Executor
func (executor *grpcExecutor) ExecuteOrchestrator(ctx context.Context, iid api.InstanceID, oldEvents []*protos.HistoryEvent, newEvents []*protos.HistoryEvent) (*ExecutionResults, error) {
result := &ExecutionResults{complete: make(chan interface{})}
result := &ExecutionResults{complete: make(chan struct{})}
executor.pendingOrchestrators.Store(iid, result)
defer executor.pendingOrchestrators.Delete(iid)

workItem := &protos.WorkItem{
Request: &protos.WorkItem_OrchestratorRequest{
@@ -121,12 +124,15 @@ func (executor *grpcExecutor) ExecuteOrchestrator(ctx context.Context, iid api.I
}

// Wait for the connected worker to signal that it's done executing the work-item
// TODO: Timeout logic - i.e. handle the case where we never hear back from the remote worker (due to a hang, etc.).
select {
case <-ctx.Done():
executor.logger.Warnf("%s: context canceled before receiving orchestrator result", iid)
return nil, ctx.Err()
case <-result.complete:
executor.logger.Debugf("%s: orchestrator got result", iid)
if result.Response == nil {
return nil, errors.New("operation aborted")
}
}

return result, nil
@@ -135,9 +141,8 @@ func (executor *grpcExecutor) ExecuteOrchestrator(ctx context.Context, iid api.I
// ExecuteActivity implements Executor
func (executor *grpcExecutor) ExecuteActivity(ctx context.Context, iid api.InstanceID, e *protos.HistoryEvent) (*protos.HistoryEvent, error) {
key := getActivityExecutionKey(string(iid), e.EventId)
result := &activityExecutionResult{complete: make(chan interface{})}
result := &activityExecutionResult{complete: make(chan struct{})}
executor.pendingActivities.Store(key, result)
defer executor.pendingActivities.Delete(key)

Member: Why are we removing the calls to delete these keys? Won't that result in a memory leak?

Contributor: They will be deleted by CompleteActivityTask. Additionally, if the GetWorkItems stream is closed while the operation is in progress, the cleanup logic added to GetWorkItems will delete any "leftover" item in this map.

Contributor: Is there any reason for moving the delete operation from here to other places? If I understand correctly, the delete then needs to be added in CompleteActivityTask, in the defer block in GetWorkItems, and potentially in Shutdown as well. If we just keep it here, it would handle all scenarios, and it also looks more intuitive to me. (We would still need to close the complete channel in GetWorkItems and Shutdown.)

Contributor: Good question. I wrote this a week ago and now I cannot remember why, even after reading the code :( It may work.

Member Author: Then this may help to avoid closing a closed channel, as we delete it in time.

Contributor: It's currently impossible for close() to be called on the same channel twice.

Member Author: Yes, I mean the current implementation helps avoid closing a closed channel, but with defer executor.pendingActivities.Delete(key) we could potentially close an already closed channel if the stream is closed and then Shutdown is called.
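For context on the concern above, here is a tiny standalone sketch (not part of this PR): closing a channel that has already been closed panics at runtime, which is why two cleanup paths must never both close the same complete channel.

```go
// Standalone illustration (not part of this PR): closing an already-closed
// channel panics, so two cleanup paths must never both call close() on the
// same complete channel.
package main

func main() {
	complete := make(chan struct{})
	close(complete)
	close(complete) // panic: close of closed channel
}
```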

task := e.GetTaskScheduled()
workItem := &protos.WorkItem{
@@ -162,12 +167,15 @@ func (executor *grpcExecutor) ExecuteActivity(ctx context.Context, iid api.Insta
}

// Wait for the connected worker to signal that it's done executing the work-item
// TODO: Timeout logic
select {
case <-ctx.Done():
executor.logger.Warnf("%s/%s#%d: context canceled before receiving activity result", iid, task.Name, e.EventId)
return nil, ctx.Err()
case <-result.complete:
executor.logger.Debugf("%s: activity got result", key)
if result.response == nil {
return nil, errors.New("operation aborted")
}
}

var responseEvent *protos.HistoryEvent
@@ -181,9 +189,27 @@ func (executor *grpcExecutor) ExecuteActivity(ctx context.Context, iid api.Insta
}

// Shutdown implements Executor
func (g *grpcExecutor) Shutdown(ctx context.Context) {
func (g *grpcExecutor) Shutdown(ctx context.Context) error {
// closing the work item queue is a signal for shutdown
close(g.workItemQueue)

// Iterate through all pending items and close them to unblock the goroutines waiting on this
Member: Are these changes to the Shutdown routine related to the original issue, or is this unrelated?

Contributor: This was my first attempt at fixing the issue. It turned out it didn't fix it, but it seemed like something that would be helpful to have nevertheless, to ensure we don't leave goroutines waiting.

Contributor: Should we not also delete the items from pendingActivities/pendingOrchestrators in case of shutdown?

Contributor: The idea is that Shutdown is called before the object is abandoned, so the memory is released by the GC regardless. The important thing is to close the channels in case there are goroutines waiting on them.
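A minimal sketch (separate from the PR code) of the behavior relied on here: a single close() releases every goroutine blocked receiving from the channel, so Shutdown only needs to close the pending complete channels to unblock waiters.

```go
// Minimal sketch (separate from the PR code): a single close() releases every
// goroutine blocked receiving from the channel.
package main

import (
	"fmt"
	"sync"
)

func main() {
	complete := make(chan struct{})

	var wg sync.WaitGroup
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			<-complete // blocks until the channel is closed
			fmt.Printf("waiter %d unblocked\n", id)
		}(i)
	}

	close(complete) // wakes all three waiters at once
	wg.Wait()
}
```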

g.pendingActivities.Range(func(_, value any) bool {
p, ok := value.(*activityExecutionResult)
if ok {
close(p.complete)
Member Author: Shouldn't we check whether the complete channel is already closed, since it's closed in multiple places?

Contributor: If the channel is in the map, it has not been closed. Before closing the channel, the callers use LoadAndDelete, which means the channel is removed from the map before close is called on it.
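A small, self-contained sketch (not the PR's code; the map and key are made up) of the LoadAndDelete-then-close idiom described above: because the entry is removed atomically, at most one caller ever closes the channel.

```go
// Self-contained sketch (not the PR's code) of the LoadAndDelete-then-close
// idiom: sync.Map.LoadAndDelete removes the entry atomically, so at most one
// caller sees ok == true and closes the channel.
package main

import (
	"fmt"
	"sync"
)

type pendingResult struct {
	complete chan struct{}
}

func main() {
	var pending sync.Map
	pending.Store("instance-1/0", &pendingResult{complete: make(chan struct{})})

	closeOnce := func(caller string) {
		if p, ok := pending.LoadAndDelete("instance-1/0"); ok {
			close(p.(*pendingResult).complete)
			fmt.Println(caller, "closed the channel")
		} else {
			fmt.Println(caller, "found nothing to close")
		}
	}

	// Simulate the stream-cleanup path and Shutdown racing to clean up the
	// same pending entry; exactly one of them closes the channel.
	var wg sync.WaitGroup
	for _, caller := range []string{"stream cleanup", "Shutdown"} {
		wg.Add(1)
		go func(c string) {
			defer wg.Done()
			closeOnce(c)
		}(caller)
	}
	wg.Wait()
}
```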

}
return true
})
g.pendingOrchestrators.Range(func(_, value any) bool {
p, ok := value.(*ExecutionResults)
if ok {
close(p.complete)
}
return true
})

return nil
}

// Hello implements protos.TaskHubSidecarServiceServer
@@ -202,22 +228,73 @@ func (g *grpcExecutor) GetWorkItems(req *protos.GetWorkItemsRequest, stream prot
callback := g.onWorkItemConnection
if callback != nil {
if err := callback(stream.Context()); err != nil {
message := fmt.Sprint("unable to establish work item stream at this time: ", err)
message := "unable to establish work item stream at this time: " + err.Error()
g.logger.Warn(message)
return status.Errorf(codes.Unavailable, message)
}
}

// Collect all pending activities on this stream
// Note: we don't need sync.Map's here because access is only on this thread
pendingActivities := make(map[string]struct{})
pendingActivityCh := make(chan string, 1)
pendingOrchestrators := make(map[string]struct{})
pendingOrchestratorCh := make(chan string, 1)
defer func() {
// If there's any pending activity left, remove them
for key := range pendingActivities {
Contributor: What is the use of pendingActivities or pendingOrchestrators here? Since we just need to delete all pending keys that are in g.pendingActivities or g.pendingOrchestrators, can we not simply iterate over the map and delete? That would also avoid sending items to the pending channel for every item being processed.

Contributor: No, we do not delete all keys in g.pendingActivities. There can be multiple connections to GetWorkItems, so each connection needs its own map to track which pending items belong to that connection. Using a local variable, rather than depending on the global g.pendingActivities/g.pendingOrchestrators, means we can avoid locks (because of how gRPC works, there's no issue with concurrent access here) and simplify the cleanup process.

Contributor: Oh, interesting. We support multiple connections to GetWorkItems? I didn't know and was actually going to open an issue for it! A question, though: if we have, say, 2 connections to the server, when a work item gets added to g.workItemQueue, how does it get allocated to the correct stream? It seems like two concurrent streams reading from a common channel; won't that cause any issues?

Contributor: Oh, I had assumed it was supported to have multiple GetWorkItems streams. In any case, this patch doesn't prevent it.

> In case we have say 2 connections to the server, when the workItem gets added to g.workItemQueue, how does it properly get allocated to the correct stream?

With multiple GetWorkItems streams, there are multiple goroutines blocked here: https://github.com/microsoft/durabletask-go/pull/61/files#diff-fe6ca2dcacabca215bef6921c53b3c197d9c9ea4d1febcf15392a1e441dddc07R166 In that case, one of the listening goroutines, effectively at random, will receive the message.

Contributor: Got it. In that case, it's required to maintain the stream-local map. Thanks for explaining.
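A standalone sketch (not the PR's code) of the channel behavior described above: when several streams receive from the same work-item channel, each item is delivered to exactly one receiver, chosen effectively at random.

```go
// Standalone sketch (not the PR's code): with several goroutines receiving
// from one channel, each sent value is delivered to exactly one of them.
package main

import (
	"fmt"
	"sync"
)

func main() {
	workItemQueue := make(chan int)

	var wg sync.WaitGroup
	for stream := 1; stream <= 2; stream++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			// Each "stream" keeps receiving until the queue is closed.
			for wi := range workItemQueue {
				fmt.Printf("stream %d received work item %d\n", id, wi)
			}
		}(stream)
	}

	for i := 0; i < 5; i++ {
		workItemQueue <- i // goes to exactly one of the two streams
	}
	close(workItemQueue)
	wg.Wait()
}
```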

g.logger.Debugf("cleaning up pending activity: %s", key)
p, ok := g.pendingActivities.LoadAndDelete(key)
if ok {
pending := p.(*activityExecutionResult)
close(pending.complete)
}
}
for key := range pendingOrchestrators {
g.logger.Debugf("cleaning up pending orchestrator: %s", key)
p, ok := g.pendingOrchestrators.LoadAndDelete(api.InstanceID(key))
if ok {
pending := p.(*ExecutionResults)
close(pending.complete)
}
}
}()

// The worker client invokes this method, which streams back work-items as they arrive.
for {
select {
case <-stream.Context().Done():
g.logger.Infof("work item stream closed")
g.logger.Info("work item stream closed")
return nil
case wi := <-g.workItemQueue:
case wi, ok := <-g.workItemQueue:
if !ok {
continue
}
switch x := wi.Request.(type) {
case *protos.WorkItem_OrchestratorRequest:
key := x.OrchestratorRequest.GetInstanceId()
pendingOrchestrators[key] = struct{}{}
p, ok := g.pendingOrchestrators.Load(api.InstanceID(key))
if ok {
p.(*ExecutionResults).pending = pendingOrchestratorCh
}
case *protos.WorkItem_ActivityRequest:
key := getActivityExecutionKey(x.ActivityRequest.GetOrchestrationInstance().GetInstanceId(), x.ActivityRequest.GetTaskId())
pendingActivities[key] = struct{}{}
p, ok := g.pendingActivities.Load(key)
if ok {
p.(*activityExecutionResult).pending = pendingActivityCh
}
}

if err := stream.Send(wi); err != nil {
g.logger.Errorf("encountered an error while sending work item: %v", err)
return err
Contributor: Can we add a debug log here as well, e.g. "work item stream closed"?

Member Author: I added g.logger.Errorf("encountered an error while sending work item: %v", err); I think an error here does not always mean the stream was closed.

}
case key := <-pendingActivityCh:
delete(pendingActivities, key)
case key := <-pendingOrchestratorCh:
delete(pendingOrchestrators, key)
case <-g.streamShutdownChan:
return errShuttingDown
}
@@ -227,31 +304,57 @@ func (g *grpcExecutor) GetWorkItems(req *protos.GetWorkItemsRequest, stream prot
// CompleteOrchestratorTask implements protos.TaskHubSidecarServiceServer
func (g *grpcExecutor) CompleteOrchestratorTask(ctx context.Context, res *protos.OrchestratorResponse) (*protos.CompleteTaskResponse, error) {
iid := api.InstanceID(res.InstanceId)
if p, ok := g.pendingOrchestrators.Load(iid); ok {
pending := p.(*ExecutionResults)
pending.Response = res
pending.complete <- true
if g.deletePendingOrchestrator(iid, res) {
return emptyCompleteTaskResponse, nil
}

return emptyCompleteTaskResponse, fmt.Errorf("unknown instance ID: %s", res.InstanceId)
}

func (g *grpcExecutor) deletePendingOrchestrator(iid api.InstanceID, res *protos.OrchestratorResponse) bool {
p, ok := g.pendingOrchestrators.LoadAndDelete(iid)
if !ok {
return false
}

// Note that res can be nil in case of certain failures
pending := p.(*ExecutionResults)
pending.Response = res
if pending.pending != nil {
pending.pending <- string(iid)
}
close(pending.complete)
return true
}

// CompleteActivityTask implements protos.TaskHubSidecarServiceServer
func (g *grpcExecutor) CompleteActivityTask(ctx context.Context, res *protos.ActivityResponse) (*protos.CompleteTaskResponse, error) {
key := getActivityExecutionKey(res.InstanceId, res.TaskId)
if p, ok := g.pendingActivities.Load(key); ok {
pending := p.(*activityExecutionResult)
pending.response = res
pending.complete <- true
if g.deletePendingActivityTask(key, res) {
return emptyCompleteTaskResponse, nil
}

return emptyCompleteTaskResponse, fmt.Errorf("unknown instance ID/task ID combo: %s", key)
}

func (g *grpcExecutor) deletePendingActivityTask(key string, res *protos.ActivityResponse) bool {
p, ok := g.pendingActivities.LoadAndDelete(key)
if !ok {
return false
}

// Note that res can be nil in case of certain failures
pending := p.(*activityExecutionResult)
pending.response = res
if pending.pending != nil {
pending.pending <- key
}
close(pending.complete)
return true
}

func getActivityExecutionKey(iid string, taskID int32) string {
return fmt.Sprintf("%s/%d", iid, taskID)
return iid + "/" + strconv.FormatInt(int64(taskID), 10)
}
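As a quick sanity check (not part of the PR), the strconv-based key produces the same string as the previous fmt.Sprintf form; the instance ID and task ID below are made-up values.

```go
// Quick sanity check (not part of the PR) that the strconv-based key matches
// the previous fmt.Sprintf-based format; the values are made up.
package main

import (
	"fmt"
	"strconv"
)

func main() {
	iid := "abc123"
	var taskID int32 = 7

	oldKey := fmt.Sprintf("%s/%d", iid, taskID)
	newKey := iid + "/" + strconv.FormatInt(int64(taskID), 10)

	fmt.Println(oldKey == newKey, newKey) // true abc123/7
}
```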

// CreateTaskHub implements protos.TaskHubSidecarServiceServer
17 changes: 12 additions & 5 deletions client/worker_grpc.go
@@ -2,6 +2,7 @@ package client

import (
"context"
"errors"
"fmt"
"io"
"time"
@@ -49,6 +50,14 @@ func (c *TaskHubGrpcClient) StartWorkItemListener(ctx context.Context, r *task.T

go func() {
c.logger.Info("starting background processor")
defer func() {
c.logger.Info("stopping background processor")
// We must use a background context here as the stream's context is likely canceled
shutdownErr := executor.Shutdown(context.Background())
if shutdownErr != nil {
c.logger.Warnf("error while shutting down background processor: %v", shutdownErr)
}
}()
for {
// TODO: Manage concurrency
workItem, err := stream.Recv()
@@ -64,15 +73,13 @@ func (c *TaskHubGrpcClient) StartWorkItemListener(ctx context.Context, r *task.T

c.logger.Errorf("background processor received stream error: %v", err)

if err == io.EOF {
if errors.Is(err, io.EOF) {
retriable = true
} else if grpcStatus, ok := status.FromError(err); ok {
c.logger.Warnf("received grpc error code %v", grpcStatus.Code().String())
switch grpcStatus.Code() {
case codes.Unavailable:
fallthrough
case codes.Canceled:
fallthrough
case codes.Unavailable, codes.Canceled:
retriable = true
default:
retriable = true
}
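The switch from err == io.EOF to errors.Is(err, io.EOF) matters when the stream error arrives wrapped; a brief standalone sketch (the wrapping message is made up for illustration):

```go
// Standalone sketch: errors.Is matches a sentinel error even when it has been
// wrapped, whereas a plain == comparison does not. The wrapping message here
// is made up for illustration.
package main

import (
	"errors"
	"fmt"
	"io"
)

func main() {
	wrapped := fmt.Errorf("stream receive failed: %w", io.EOF)

	fmt.Println(wrapped == io.EOF)          // false: the sentinel is wrapped
	fmt.Println(errors.Is(wrapped, io.EOF)) // true: errors.Is unwraps the chain
}
```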
5 changes: 5 additions & 0 deletions task/executor.go
@@ -91,6 +91,11 @@ func (te *taskExecutor) ExecuteOrchestrator(ctx context.Context, id api.Instance
return results, nil
}

func (te taskExecutor) Shutdown(ctx context.Context) error {
// Nothing to do
return nil
}

func unmarshalData(data []byte, v any) error {
if v == nil {
return nil