Fix server stuck issue when a test timeout exception occurs #61
Conversation
```go
executor.pendingActivities.Store(key, result)
defer executor.pendingActivities.Delete(key)
```
Why are we removing the calls to delete these keys? Won't that result in a memory leak?
They will be deleted by `CompleteActivityTask`. Additionally, if the `GetWorkItems` stream is closed while the operation is in progress, the cleanup logic added to `GetWorkItems` will delete any "leftover" item in this map.
Is there any reason for moving the delete operation from here to other places? If I understand correctly, the delete then needs to be added in `CompleteActivityTask`, in the defer block in `GetWorkItems`, and potentially in `Shutdown` as well. If we just keep it here instead, it would handle all scenarios, and it also looks more intuitive to me (we would still need to close the `complete` channel in `GetWorkItems` and `Shutdown`).
Good question. I wrote this a week ago and now I cannot remember why, even after reading the code :( It may work.
Then this may help to avoid closing an already-closed channel, as we delete it in time.
It's currently impossible for `close()` to be called on the same channel twice.
Yes, I mean the current implementation helps avoid closing a closed channel. But with `defer executor.pendingActivities.Delete(key)`, we could potentially close an already-closed channel if the stream is closed and then shutdown is called.
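For context, the hazard being described is that in Go, calling `close` on an already-closed channel panics. A minimal, generic illustration (not the executor code):

```go
package main

import "fmt"

func main() {
	done := make(chan struct{})
	close(done) // first close: fine, unblocks any receivers

	defer func() {
		if r := recover(); r != nil {
			fmt.Println("recovered:", r) // "close of closed channel"
		}
	}()
	close(done) // second close: panics with "close of closed channel"
}
```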
```go
// closing the work item queue is a signal for shutdown
close(g.workItemQueue)

// Iterate through all pending items and close them to unblock the goroutines waiting on this
```
Are these changes to the `Shutdown` routine related to the original issue, or is this unrelated?
This was my first attempt at fixing the issue. It turned out it didn't fix it, but it seemed like something that would be helpful to have nevertheless, to ensure we don't leave goroutines waiting.
Should we not delete the items from `pendingActivities`/`pendingOrchestrators` as well in case of shutdown?
The idea is that shutdown is called before the object is abandoned. The memory is released by the GC regardless. The important thing is to close the channels in case there are goroutines waiting on them.
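To illustrate why closing the channels is the important part here: closing a channel unblocks every goroutine that is blocked receiving on it. A small generic sketch (not the actual `Shutdown` code):

```go
package main

import (
	"fmt"
	"sync"
)

func main() {
	var wg sync.WaitGroup
	pending := []chan string{make(chan string), make(chan string)}

	// Two goroutines waiting for results that will never be sent.
	for i, ch := range pending {
		wg.Add(1)
		go func(i int, ch chan string) {
			defer wg.Done()
			result, ok := <-ch
			// ok == false means the channel was closed instead of receiving a value.
			fmt.Printf("goroutine %d unblocked: result=%q ok=%v\n", i, result, ok)
		}(i, ch)
	}

	// "Shutdown": close every pending channel so no goroutine stays blocked.
	for _, ch := range pending {
		close(ch)
	}
	wg.Wait()
}
```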
backend/executor.go (outdated)
```go
if ok {
	p.(*ExecutionResults).pending = pendingOrchestratorCh
}
g.logger.Debugf("pending orchestrators after add %s: %#v\n", key, pendingOrchestrators)
```
I think these debug logs could be removed @kaibocai. I had left the logs in the code I shared with you just to show the flow of data! These logs may even be too verbose for debug-level logging.
I removed the logs here and just left a few that track the newly added logic.
```go
pendingOrchestratorCh := make(chan string, 1)
defer func() {
	// If there's any pending activity left, remove them
	for key := range pendingActivities {
```
What is the use of `pendingActivities` or `pendingOrchestrators` here? Since we just need to delete all pending keys that are in `g.pendingActivities` or `g.pendingOrchestrators`, can we not simply iterate over the map and delete? This would also avoid sending the items to the pending channel for every item being processed.
No, we do not delete all keys in `g.pendingActivities`.
There could be multiple connections to `GetWorkItems`, so each connection needs its own map to track which pending items are related to that connection.
Using a local variable, rather than depending on the global `g.pendingActivities`/`g.pendingOrchestrators`, means we can avoid using locks (because of how gRPC works, there's no issue with concurrent access there) and simplify the cleanup process.
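A rough, hypothetical sketch of that pattern (names and types are illustrative, not the actual `GetWorkItems` code): each stream keeps a plain local map of the keys it stored, so the deferred cleanup only touches that stream's own items and needs no extra locking.

```go
package main

import (
	"fmt"
	"sync"
)

// Illustrative types only; the real executor's types differ.
type workItem struct{ key string }

type activityExecutionResult struct{ complete chan struct{} }

type server struct {
	pendingActivities sync.Map      // shared across all streams
	workItemQueue     chan workItem // fed by the backend
}

// serveWorkItems is a stripped-down stand-in for a GetWorkItems stream handler.
func (s *server) serveWorkItems(send func(workItem) error) error {
	pendingActivities := make(map[string]struct{}) // local to this stream, no lock needed

	defer func() {
		// Remove and unblock anything this stream left behind.
		for key := range pendingActivities {
			if v, ok := s.pendingActivities.LoadAndDelete(key); ok {
				close(v.(*activityExecutionResult).complete)
			}
		}
	}()

	for wi := range s.workItemQueue {
		s.pendingActivities.Store(wi.key, &activityExecutionResult{complete: make(chan struct{})})
		pendingActivities[wi.key] = struct{}{} // track it locally as well

		if err := send(wi); err != nil {
			return err // the deferred cleanup releases whatever is still pending
		}
	}
	return nil
}

func main() {
	s := &server{workItemQueue: make(chan workItem, 1)}
	s.workItemQueue <- workItem{key: "act-1"}
	close(s.workItemQueue)

	_ = s.serveWorkItems(func(wi workItem) error {
		fmt.Println("sent", wi.key)
		return nil
	})
}
```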
Oh interesting. We support multiple connections to `GetWorkItems`? I didn't know and was actually going to open an issue for it!
A question though: in case we have, say, 2 connections to the server, when the work item gets added to `g.workItemQueue`, how does it properly get allocated to the correct stream? It seems like two concurrent streams trying to read a common channel... won't that cause any issues?
Oh, I had assumed it was supported to have multiple `GetWorkItems` streams. But this patch doesn't prevent it.

> In case we have, say, 2 connections to the server, when the work item gets added to `g.workItemQueue`, how does it properly get allocated to the correct stream?

With multiple `GetWorkItems` streams, there are multiple goroutines blocked receiving here. In that case, one of those listening, at "random", will get the message.
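A quick illustration of that behavior (generic Go, not the executor code): when several goroutines receive from the same channel, each value sent is delivered to exactly one of them, chosen effectively at random.

```go
package main

import (
	"fmt"
	"sync"
)

func main() {
	workItemQueue := make(chan string)
	var wg sync.WaitGroup

	// Two "streams" competing to receive from the same queue.
	for i := 1; i <= 2; i++ {
		wg.Add(1)
		go func(stream int) {
			defer wg.Done()
			for wi := range workItemQueue {
				// Each work item is handed to exactly one receiver.
				fmt.Printf("stream %d got %s\n", stream, wi)
			}
		}(i)
	}

	for _, wi := range []string{"orch-1", "act-1", "act-2", "orch-2"} {
		workItemQueue <- wi
	}
	close(workItemQueue)
	wg.Wait()
}
```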
Got it. In that case, it's required to maintain the stream-local map. Thanks for explaining.
```go
	}
	g.logger.Debugf("pending activities after add %s: %#v\n", key, pendingActivities)
}

if err := stream.Send(wi); err != nil {
	return err
```
Can we add a debug log here as well, something like "work item stream closed"?
I added `g.logger.Errorf("encountered an error while sending work item: %v", err)`. I think an error here does not always mean the stream is closed, right?
```go
g.pendingActivities.Range(func(_, value any) bool {
	p, ok := value.(*activityExecutionResult)
	if ok {
		close(p.complete)
```
Shouldn't we check if the `complete` channel is already closed, as it's closed in multiple places?
If the channel is in the map, it's not been closed. Before closing the channel, the calls use `LoadAndDelete`, which means that the channel is removed from the map before `close` is called on it.
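A minimal sketch of that invariant (illustrative, not the real executor code): `sync.Map.LoadAndDelete` atomically removes the entry and reports whether it was present, so even if several cleanup paths race on the same key, at most one of them gets `ok == true` and calls `close`.

```go
package main

import (
	"fmt"
	"sync"
)

type activityResult struct{ complete chan struct{} }

func main() {
	var pending sync.Map
	pending.Store("act-1", &activityResult{complete: make(chan struct{})})

	// Several cleanup paths (completion, stream teardown, shutdown) may race
	// for the same key; LoadAndDelete guarantees only one of them wins.
	var wg sync.WaitGroup
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			if v, ok := pending.LoadAndDelete("act-1"); ok {
				close(v.(*activityResult).complete) // runs at most once
				fmt.Printf("goroutine %d closed the channel\n", i)
			}
		}(i)
	}
	wg.Wait()
}
```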
@cgillum are you good with this PR? I think it fixes some bugs that could make dtf-go hang.
@ItalyPaleAle I'm quite uncomfortable with all the changes related to shutdown, which aren't even relevant for the original issue. I need more time to understand all the implications. If we can reduce the scope of this PR to just fixing the hangs, then I'd be okay with merging it.
I had to go back and look at the PR again since it's been a while. I don't remember the reason behind the shutdown changes, but looking at it, there's code that closes channels that would otherwise hang, so it seems to prevent leaking goroutines.
We've tested this code internally and can confirm it improves reliability greatly.
Looks good to me! Making the calls
Resolves #58.
Thanks @ItalyPaleAle for the fix!