Skip to content

Commit

Permalink
fix(group subscriber v2): cleanup consumer processes on shutdown
Browse files Browse the repository at this point in the history
  • Loading branch information
thalesmg committed Oct 25, 2024
1 parent fa28883 commit 378f172
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 2 deletions.
17 changes: 15 additions & 2 deletions src/brod_group_subscriber_v2.erl
Original file line number Diff line number Diff line change
Expand Up @@ -395,12 +395,15 @@ handle_info(_Info, State) ->
%%--------------------------------------------------------------------
-spec terminate(Reason :: normal | shutdown | {shutdown, term()} | term(),
State :: term()) -> any().
terminate(_Reason, #state{workers = Workers,
terminate(_Reason, #state{config = Config,
workers = Workers,
coordinator = Coordinator,
group_id = GroupId
}) ->
ok = terminate_all_workers(Workers),
ok = flush_offset_commits(GroupId, Coordinator).
ok = flush_offset_commits(GroupId, Coordinator),
ok = stop_consumers(Config),
ok.

%%%===================================================================
%%% Internal functions
Expand Down Expand Up @@ -529,6 +532,16 @@ do_ack(Topic, Partition, Offset, #state{ workers = Workers
{error, unknown_topic_or_partition}
end.

stop_consumers(Config) ->
#{ client := Client
, topics := Topics
} = Config,
lists:foreach(
fun(Topic) ->
_ = brod_client:stop_consumer(Client, Topic)
end,
Topics).

%%%_* Emacs ====================================================================
%%% Local Variables:
%%% allout-layout: t
Expand Down
27 changes: 27 additions & 0 deletions test/brod_group_subscriber_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
, t_assign_partitions_handles_updating_state/1
, t_get_workers/1
, v2_coordinator_crash/1
, v2_consumer_cleanup/1
, v2_subscriber_shutdown/1
, v2_subscriber_assignments_revoked/1
]).
Expand Down Expand Up @@ -97,6 +98,7 @@ groups() ->
, t_assign_partitions_handles_updating_state
, t_get_workers
, v2_coordinator_crash
, v2_consumer_cleanup
, v2_subscriber_shutdown
, v2_subscriber_assignments_revoked
]}
Expand Down Expand Up @@ -391,6 +393,31 @@ v2_coordinator_crash(Config) when is_list(Config) ->
ok
end).

%% Checks that we don't leave `brod_consumer' processes lingering after we stop a group
%% subscriber v2.
v2_consumer_cleanup(Config) when is_list(Config) ->
InitArgs = #{},
Topic = ?topic,
Partition = 0,
Client = ?CLIENT_ID,
?check_trace(
#{timetrap => 5_000},
begin
{ok, SubscriberPid} = start_subscriber(?group_id, Config, [Topic], InitArgs),
%% Send a message to the topic and wait until it's received to make sure
%% the subscriber is stable:
produce({Topic, Partition}, <<0>>),
{ok, _} = ?wait_message(Topic, Partition, <<0>>, _),
?assertMatch({ok, _}, brod_client:get_consumer(Client, Topic, Partition)),
ok = stop_subscriber(Config, SubscriberPid),
?assertMatch({error, {consumer_not_found, _}},
brod_client:get_consumer(Client, Topic, Partition)),
ok
end,
[]
),
ok.

v2_subscriber_shutdown(Config) when is_list(Config) ->
%% Test graceful shutdown of the group subscriber:
InitArgs = #{async_ack => true},
Expand Down

0 comments on commit 378f172

Please sign in to comment.