
Fix server stuck issue when test timeout exception #61

Merged: 9 commits into main, Jun 28, 2024

Conversation

kaibocai (Member):

Resolves #58.

Thanks @ItalyPaleAle for the fix!

kaibocai requested a review from cgillum on January 16, 2024 at 22:40
backend/executor.go (resolved)
backend/executor.go (outdated, resolved)
executor.pendingActivities.Store(key, result)
defer executor.pendingActivities.Delete(key)

Member:

Why are we removing the calls to delete these keys? Won't that result in a memory leak?

Contributor:

They will be deleted by CompleteActivityTask. Additionally, if the GetWorkItems stream is closed while the operation is in progress, the cleanup logic added to GetWorkItems will delete any "leftover" items in this map.
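For context, a minimal sketch of the completion path being described here; the names are simplified stand-ins inspired by the snippets quoted in this review, not the actual executor.go implementation:

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// Simplified, hypothetical stand-ins for the types discussed in this thread.
type activityExecutionResult struct {
	response string        // placeholder for the real activity response payload
	complete chan struct{} // closed exactly once when the result is ready
}

type executor struct {
	pendingActivities sync.Map // key (string) -> *activityExecutionResult
}

// CompleteActivityTask sketches the deletion path: the key is removed with
// LoadAndDelete, so no other code path (stream cleanup, shutdown) can see it
// anymore, and only then is the channel closed.
func (e *executor) CompleteActivityTask(key, response string) error {
	v, ok := e.pendingActivities.LoadAndDelete(key)
	if !ok {
		return errors.New("unknown activity task: " + key)
	}
	result := v.(*activityExecutionResult)
	result.response = response
	close(result.complete) // unblocks the goroutine waiting on this activity
	return nil
}

func main() {
	e := &executor{}
	r := &activityExecutionResult{complete: make(chan struct{})}
	e.pendingActivities.Store("task-1", r)

	// A waiter, similar to what an ExecuteActivity-style call would do.
	done := make(chan string)
	go func() {
		<-r.complete
		done <- r.response
	}()

	if err := e.CompleteActivityTask("task-1", "hello"); err != nil {
		panic(err)
	}
	fmt.Println(<-done) // prints "hello"
}
```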

Contributor:

Is there any reason for moving the delete operation from here to other places? If I understand correctly, the delete then needs to be added in CompleteActivityTask, in the defer block in GetWorkItems, and potentially in Shutdown as well. If we just keep it here instead, it would handle all scenarios, and it also looks more intuitive to me. (We would still need to close the complete channel in GetWorkItems and Shutdown.)

Contributor:

Good question. I wrote this a week ago and now I cannot remember why, even after reading the code :( It may work

Member Author:

Then this may help avoid closing an already-closed channel, since we delete the key in time.

Contributor:

It's impossible currently for close() to be called on the same channel twice

Member Author:

Yes, I mean the current implementation helps avoid closing an already-closed channel, but with defer executor.pendingActivities.Delete(key) we could potentially close an already-closed channel if the stream is closed and then Shutdown is called.

backend/executor.go (resolved)
// closing the work item queue is a signal for shutdown
close(g.workItemQueue)

// Iterate through all pending items and close them to unblock the goroutines waiting on this
Member:

Are these changes to the Shutdown routine related to the original issue or is this unrelated?

Contributor:

This was my first attempt at fixing the issue. It turned out it didn't fix it, but it seemed like something that would be helpful to have nevertheless, to ensure we don't leave goroutines waiting.

Contributor:

Should we not also delete the items from pendingActivities/pendingOrchestrators in case of shutdown?

Contributor:

The idea is that shutdown is called before the object is abandoned. The memory is released by the GC regardless. The important thing is to close the channels in case there are goroutines waiting on them.
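A minimal sketch of the shutdown pattern under discussion, with simplified stand-in types rather than the actual executor.go code: closing the work item queue signals the streams, and closing each pending "complete" channel unblocks any goroutine still waiting on a result.

```go
package main

import (
	"fmt"
	"sync"
)

// Hypothetical, simplified model of the executor for illustration only.
type activityExecutionResult struct {
	complete chan struct{}
}

type executor struct {
	workItemQueue        chan string
	pendingActivities    sync.Map // key -> *activityExecutionResult
	pendingOrchestrators sync.Map // key -> *activityExecutionResult
}

// Shutdown mirrors the idea in the diff: the memory is reclaimed by the GC
// anyway, so the important part is unblocking waiters. (In the real code,
// other closers use LoadAndDelete first, so a channel still present in the
// map has not been closed yet.)
func (e *executor) Shutdown() {
	// Closing the work item queue is a signal for shutdown.
	close(e.workItemQueue)

	closeAll := func(m *sync.Map) {
		m.Range(func(_, value any) bool {
			if r, ok := value.(*activityExecutionResult); ok {
				close(r.complete) // unblocks goroutines waiting on this result
			}
			return true // keep iterating
		})
	}
	closeAll(&e.pendingActivities)
	closeAll(&e.pendingOrchestrators)
}

func main() {
	e := &executor{workItemQueue: make(chan string, 1)}
	r := &activityExecutionResult{complete: make(chan struct{})}
	e.pendingActivities.Store("a1", r)

	go e.Shutdown()

	<-r.complete // returns because Shutdown closed the channel
	fmt.Println("waiter unblocked by shutdown")
}
```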

if ok {
p.(*ExecutionResults).pending = pendingOrchestratorCh
}
g.logger.Debugf("pending orchestrators after add %s: %#v\n", key, pendingOrchestrators)
Contributor:

I think these debug logs could be removed @kaibocai. I had left the logs in the code I shared with you just to show the flow of data! These logs may be too verbose even for debug-level logging.

Member Author:

I removed the logs here and just left a few that track the newly added logic.

pendingOrchestratorCh := make(chan string, 1)
defer func() {
// If there's any pending activity left, remove them
for key := range pendingActivities {
Contributor:

What is the use of pendingActivities or pendingOrchestrators here? Since we just need to delete all pending keys that are in g.pendingActivities or g.pendingOrchestrators, can we not simply iterate over the map and delete? This would also avoid sending the items to pendingChannel for every item being processed.

Contributor:

No, we do not delete all keys in g.pendingActivities.

There could be multiple connections to GetWorkItems, so each connection needs its own map to track what pending items are related to that connection.

Using a local variable, rather than depending on the global g.pendingActivities/g.pendingOrchestrators, means we can avoid using locks (because of how gRPC works, there's no issue with concurrent access there) and simplify the cleanup process.
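A rough sketch of this per-stream bookkeeping, with hypothetical names and simplified types (the real GetWorkItems handler differs, but the ownership idea is the same): the shared state lives in a sync.Map, while each stream keeps a plain local map of the keys it created, so cleanup needs no extra locking and only touches that stream's keys.

```go
package main

import (
	"fmt"
	"sync"
)

type executor struct {
	pendingActivities sync.Map    // shared: key -> chan struct{}
	workItemQueue     chan string // closed on shutdown
}

// streamWorkItems models one GetWorkItems connection: it records the keys it
// created in a stream-local map, and its deferred cleanup removes any
// leftovers so waiters are unblocked when this stream goes away.
func (e *executor) streamWorkItems(send func(key string) error) error {
	pendingActivities := map[string]struct{}{} // local to this stream

	defer func() {
		// If there's any pending activity left, remove it and unblock waiters.
		for key := range pendingActivities {
			// LoadAndDelete returns false if CompleteActivityTask already
			// removed (and closed) this entry, so there is no double close.
			if v, ok := e.pendingActivities.LoadAndDelete(key); ok {
				close(v.(chan struct{}))
			}
		}
	}()

	for key := range e.workItemQueue {
		e.pendingActivities.Store(key, make(chan struct{}))
		pendingActivities[key] = struct{}{}
		if err := send(key); err != nil {
			return err // broken stream: cleanup releases only this stream's keys
		}
	}
	return nil // queue closed (shutdown); cleanup runs as well
}

func main() {
	e := &executor{workItemQueue: make(chan string, 2)}
	e.workItemQueue <- "a1"
	e.workItemQueue <- "a2"
	close(e.workItemQueue)

	_ = e.streamWorkItems(func(key string) error {
		fmt.Println("sent", key)
		return nil
	})
}
```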

Contributor:

Oh, interesting. We support multiple connections to GetWorkItems? I didn't know and was actually going to open an issue for it!

A question though: in case we have, say, 2 connections to the server, when the work item gets added to g.workItemQueue, how does it get allocated to the correct stream? It seems like two concurrent streams trying to read a common channel; won't that cause any issues?

Contributor:

Oh, I had assumed it was supported to have multiple GetWorkItems streams. But this patch doesn't prevent it.

> In case we have, say, 2 connections to the server, when the work item gets added to g.workItemQueue, how does it get allocated to the correct stream?

With multiple GetWorkItems, there's multiple goroutines blocked here:

https://github.com/microsoft/durabletask-go/pull/61/files#diff-fe6ca2dcacabca215bef6921c53b3c197d9c9ea4d1febcf15392a1e441dddc07R166

In this case, one of those listening, at "random", will get the message
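A small self-contained Go example of the channel behavior described here: with several goroutines (several GetWorkItems streams, in this discussion) receiving from the same channel, each value is delivered to exactly one of them, effectively at random.

```go
package main

import (
	"fmt"
	"sync"
)

func main() {
	workItemQueue := make(chan string)
	var wg sync.WaitGroup

	// Two "streams" blocked on the same queue.
	for stream := 1; stream <= 2; stream++ {
		wg.Add(1)
		go func(stream int) {
			defer wg.Done()
			for item := range workItemQueue {
				// Each item is printed by exactly one of the two goroutines.
				fmt.Printf("stream %d got %s\n", stream, item)
			}
		}(stream)
	}

	for i := 0; i < 4; i++ {
		workItemQueue <- fmt.Sprintf("work-item-%d", i)
	}
	close(workItemQueue) // lets both range loops exit
	wg.Wait()
}
```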

Contributor:

Got it. In that case, it's required to maintain the stream local map. Thanks for explaining.

}
g.logger.Debugf("pending activities after add %s: %#v\n", key, pendingActivities)
}

if err := stream.Send(wi); err != nil {
return err
Contributor:

Can we add a debug log here as well, e.g. "work item stream closed"?

Member Author:

I added g.logger.Errorf("encountered an error while sending work item: %v", err); I think an error here does not always mean the stream was closed, right?

tests/grpc/grpc_test.go (resolved)
g.pendingActivities.Range(func(_, value any) bool {
p, ok := value.(*activityExecutionResult)
if ok {
close(p.complete)
Member Author:

Shouldn't we check whether the complete channel is already closed, since it's closed in multiple places?

Contributor:

If the channel is in the map, it's not been closed. Before closing the channel, the calls use LoadAndDelete which means that the channel is "removed from the map" before close is called on it
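A minimal illustration of this invariant, assuming a simplified map of key to channel: because sync.Map.LoadAndDelete is atomic, only one of two racing cleanup paths can observe the entry, so close is called at most once.

```go
package main

import (
	"fmt"
	"sync"
)

// closeOnce removes the entry before closing it. Only one caller can win the
// LoadAndDelete, so the channel is never closed twice.
func closeOnce(pending *sync.Map, key string) bool {
	if v, ok := pending.LoadAndDelete(key); ok {
		close(v.(chan struct{}))
		return true
	}
	return false // someone else already removed (and closed) it
}

func main() {
	var pending sync.Map
	pending.Store("a1", make(chan struct{}))

	var wg sync.WaitGroup
	var mu sync.Mutex
	wins := 0

	// Two racing cleanup paths (e.g. stream cleanup and shutdown).
	for i := 0; i < 2; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			if closeOnce(&pending, "a1") {
				mu.Lock()
				wins++
				mu.Unlock()
			}
		}()
	}
	wg.Wait()
	fmt.Println("close() calls:", wins) // always 1, never 2
}
```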

@ItalyPaleAle (Contributor):

@cgillum are you good with this PR? I think it fixes some bugs that could make dtf-go hang

@cgillum (Member) commented Mar 26, 2024:

@ItalyPaleAle I'm quite uncomfortable with all the changes related to shutdown, which aren't even relevant for the original issue. I need more time to understand all the implications. If we can reduce the scope of this PR to just fixing the hangs, then I'd be okay with merging it.

@ItalyPaleAle (Contributor):

I had to go back and look at the PR again since it's been a while. I don't remember the reason behind the shutdown changes, but looking at it, there's code that closes channels that would otherwise be left hanging, so it seems to prevent leaking goroutines.

@yaron2 (Contributor) commented Jun 20, 2024:

We've tested this code internally and can confirm it improves reliability greatly.

@yaron2 (Contributor) commented Jun 20, 2024:

cc @cgillum @ItalyPaleAle

@famarting (Contributor):

looks good to me!

Making the ExecuteOrchestrator and ExecuteActivity calls fail when the GetWorkItems function exits greatly increases reliability, because it allows signaling an error to the backend so it can call AbandonActivityWorkItem; that way, depending on the backend implementation, the activity will be retried as soon as the GetWorkItems stream is recreated.
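A much-simplified sketch of the flow described above, with hypothetical types and signatures rather than the real durabletask-go API: when the pending channel is closed without a result, the ExecuteActivity-style call returns an error, and the caller can abandon the work item so the backend redelivers it once a new GetWorkItems stream exists.

```go
package main

import (
	"errors"
	"fmt"
)

// Hypothetical stand-in for the pending activity record.
type activityResult struct {
	response string
	complete chan struct{} // closed by CompleteActivityTask or by stream cleanup
}

var errStreamClosed = errors.New("work item stream closed before a result arrived")

// executeActivity waits for the worker to report a result. If the pending
// channel is closed without a response being set, the stream went away.
func executeActivity(r *activityResult) (string, error) {
	<-r.complete
	if r.response == "" {
		return "", errStreamClosed
	}
	return r.response, nil
}

func main() {
	r := &activityResult{complete: make(chan struct{})}

	// Simulate the GetWorkItems stream dying: cleanup closes the channel
	// without ever setting a response.
	close(r.complete)

	if _, err := executeActivity(r); err != nil {
		// At this point the backend caller would abandon the work item
		// (e.g. via its AbandonActivityWorkItem path) so it is retried later.
		fmt.Println("abandoning work item for retry:", err)
	}
}
```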

kaibocai merged commit 0948712 into main on Jun 28, 2024 (4 checks passed).
kaibocai mentioned this pull request on Jun 28, 2024.
cgillum deleted the debug/server-stuck branch on June 28, 2024 at 22:45.
Merging this pull request may close: Grpc server stuck for certain tests.
6 participants