From bf928601f818d08739720aac39a16dc17f60a9f8 Mon Sep 17 00:00:00 2001 From: Laur Sisask Date: Wed, 8 May 2024 13:28:04 +0300 Subject: [PATCH] Fix coordinator process exiting due to heartbeat race Brod group coordinator periodically sends heartbeats to the Kafka broker. If it does not receive a response to a request within configured timeout, it exits with `hb_timeout` reason. There was a race condition where the connection to the Kafka broker was closed after a heartbeat was sent out, but before a heartbeat response was received. When this happened, brod still expected to receive a response to the heartbeat. But since the connection had closed, this response never came and the process exited with `hb_timeout`. This error consistently happens once in an hour in all our Elixir deployments that use brod. It looks like that for some reason Amazon MSK closes the Kafka connection from the broker side every 1 hour, and for some reason always after the client sends a heartbeat request. I do not know why this happens, but regardless, the server has a right to close the connection and the application should be able to handle that without causing error noise. This commit fixes the race condition. Now, when the connection goes down, we remove the reference to the heartbeat request that was last sent out. By removing this reference, the coordinator will no longer expect a response to the heartbeat request. Should connection be re-established, the coordinator will start sending out new heartbeat requests as usual. I tested out the solution in my own computer by adding a custom TCP proxy in front of Kafka where I had ability to terminate the connections and introduce additional latency. With this setup, I was able to verify that with the previous version, the same errors that we saw in production happened, but with the changes they no longer showed up. These are the errors that showed up in our logs: ``` Process #PID<0.19777.11> terminating ** (exit) :hb_timeout (stdlib 4.2) gen_server.erl:1241: :gen_server.handle_common_reply/8 (stdlib 4.2) proc_lib.erl:240: :proc_lib.init_p_do_apply/3 Initial Call: :brod_group_coordinator.init/1 Ancestors: [#PID<0.19775.11>, CallRouter.Supervisor, #PID<0.4065.0>] Neighbours: #PID<0.6845.12> Initial Call: :kpro_connection.init/4 Current Call: :kpro_connection.loop/2 Ancestors: [#PID<0.19777.11>, #PID<0.19775.11>, CallRouter.Supervisor, #PID<0.4065.0>] ``` ``` GenServer #PID<0.1262.11> terminating ** (stop) :hb_timeout Last message: :lo_cmd_send_heartbeat ``` XT-19 --- src/brod_group_coordinator.erl | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/src/brod_group_coordinator.erl b/src/brod_group_coordinator.erl index 5ae1ac9c..50884d1a 100644 --- a/src/brod_group_coordinator.erl +++ b/src/brod_group_coordinator.erl @@ -526,7 +526,11 @@ stabilize(#state{ rejoin_delay_seconds = RejoinDelaySeconds State3 = State2#state{is_in_group = false}, %$ 4. Clean up state based on the last failure reason - State = maybe_reset_member_id(State3, Reason), + State4 = maybe_reset_member_id(State3, Reason), + + %% 5. Clean up ongoing heartbeat request ref if connection + %% was closed + State = maybe_reset_hb_ref(State4, Reason), %% 5. ensure we have a connection to the (maybe new) group coordinator F1 = fun discover_coordinator/1, @@ -591,6 +595,15 @@ should_reset_member_id({connection_down, _Reason}) -> should_reset_member_id(_) -> false. +%% When connection goes down while waiting for heartbeat +%% response, the response will never be received. +%% Reset heartbeat ref to let new heartbeat request to +%% be sent over new connection. +maybe_reset_hb_ref(State, {connection_down, _Reason}) -> + State#state{hb_ref = ?undef}; +maybe_reset_hb_ref(State, _) -> + State. + -spec join_group(state()) -> {ok, state()}. join_group(#state{ groupId = GroupId , memberId = MemberId0