Skip to content

Commit

Permalink
Fix coordinator process exiting due to heartbeat race
Browse files Browse the repository at this point in the history
Brod group coordinator periodically sends heartbeats to the Kafka
broker. If it does not receive a response to a request within configured
timeout, it exits with `hb_timeout` reason.

There was a race condition where the connection to the Kafka broker was
closed after a heartbeat was sent out, but before a heartbeat response
was received. When this happened, brod still expected to receive a
response to the heartbeat. But since the connection had closed, this
response never came and the process exited with `hb_timeout`.

This error consistently happens once in an hour in all our Elixir
deployments that use brod. It looks like that for some reason Amazon
MSK closes the Kafka connection from the broker side every 1 hour, and for
some reason always after the client sends a heartbeat request. I do not
know why this happens, but regardless, the server has a right to close
the connection and the application should be able to handle that without
causing error noise.

This commit fixes the race condition. Now, when the connection goes
down, we remove the reference to the heartbeat request that was last
sent out. By removing this reference, the coordinator will no longer
expect a response to the heartbeat request. Should connection be
re-established, the coordinator will start sending out new heartbeat
requests as usual.

I tested out the solution in my own computer by adding a custom TCP
proxy in front of Kafka where I had ability to terminate the connections
and introduce additional latency. With this setup, I was able to verify
that with the previous version, the same errors that we saw in
production happened, but with the changes they no longer showed up.

These are the errors that showed up in our logs:

```
Process #PID<0.19777.11> terminating
** (exit) :hb_timeout
    (stdlib 4.2) gen_server.erl:1241: :gen_server.handle_common_reply/8
    (stdlib 4.2) proc_lib.erl:240: :proc_lib.init_p_do_apply/3
Initial Call: :brod_group_coordinator.init/1
Ancestors: [#PID<0.19775.11>, CallRouter.Supervisor, #PID<0.4065.0>]
Neighbours:
    #PID<0.6845.12>
        Initial Call: :kpro_connection.init/4
        Current Call: :kpro_connection.loop/2
        Ancestors: [#PID<0.19777.11>, #PID<0.19775.11>, CallRouter.Supervisor, #PID<0.4065.0>]
```

```
GenServer #PID<0.1262.11> terminating
** (stop) :hb_timeout
Last message: :lo_cmd_send_heartbeat
```

XT-19
  • Loading branch information
laurglia committed May 8, 2024
1 parent 6dd0150 commit bf92860
Showing 1 changed file with 14 additions and 1 deletion.
15 changes: 14 additions & 1 deletion src/brod_group_coordinator.erl
Original file line number Diff line number Diff line change
Expand Up @@ -526,7 +526,11 @@ stabilize(#state{ rejoin_delay_seconds = RejoinDelaySeconds
State3 = State2#state{is_in_group = false},

%$ 4. Clean up state based on the last failure reason
State = maybe_reset_member_id(State3, Reason),
State4 = maybe_reset_member_id(State3, Reason),

%% 5. Clean up ongoing heartbeat request ref if connection
%% was closed
State = maybe_reset_hb_ref(State4, Reason),

%% 5. ensure we have a connection to the (maybe new) group coordinator
F1 = fun discover_coordinator/1,
Expand Down Expand Up @@ -591,6 +595,15 @@ should_reset_member_id({connection_down, _Reason}) ->
should_reset_member_id(_) ->
false.

%% When connection goes down while waiting for heartbeat
%% response, the response will never be received.
%% Reset heartbeat ref to let new heartbeat request to
%% be sent over new connection.
maybe_reset_hb_ref(State, {connection_down, _Reason}) ->
State#state{hb_ref = ?undef};
maybe_reset_hb_ref(State, _) ->
State.

-spec join_group(state()) -> {ok, state()}.
join_group(#state{ groupId = GroupId
, memberId = MemberId0
Expand Down

0 comments on commit bf92860

Please sign in to comment.