Skip to content

Commit

Permalink
cleanup consumer queue in consumer group (#52)
Browse files Browse the repository at this point in the history
Co-authored-by: Henry Miao <[email protected]>
  • Loading branch information
hikui and hmiaoib authored Oct 23, 2022
1 parent fe93cd2 commit 7a292e5
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 8 deletions.
3 changes: 1 addition & 2 deletions src/erlkaf_consumer.erl
Original file line number Diff line number Diff line change
Expand Up @@ -215,9 +215,8 @@ process_events_one_by_one([], _ClientRef, _Backoff, _CbModule, CbState) ->
recv_stop() ->
receive {stop, _From, _Tag} = Msg -> Msg after 0 -> false end.

handle_stop(From, Tag, #state{topic_name = TopicName, partition = Partition, queue_ref = Queue}) ->
handle_stop(From, Tag, #state{topic_name = TopicName, partition = Partition}) ->
?LOG_INFO("stop consumer for: ~p partition: ~p", [TopicName, Partition]),
ok = erlkaf_nif:consumer_queue_cleanup(Queue),
From ! {stopped, Tag}.

exponential_backoff(0) ->
Expand Down
15 changes: 9 additions & 6 deletions src/erlkaf_consumer_group.erl
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ handle_info({assign_partitions, Partitions}, #state{
PartFun = fun({TopicName, Partition, Offset, QueueRef}, Tmap) ->
TopicSettings = maps:get(TopicName, TopicsSettingsMap),
{ok, Pid} = erlkaf_consumer:start_link(ClientRef, TopicName, Partition, Offset, QueueRef, TopicSettings),
maps:put({TopicName, Partition}, Pid, Tmap)
maps:put({TopicName, Partition}, {Pid, QueueRef}, Tmap)
end,

{noreply, State#state{active_topics_map = lists:foldl(PartFun, ActiveTopicsMap, Partitions)}};
Expand All @@ -88,8 +88,8 @@ handle_info({revoke_partitions, Partitions}, #state{
active_topics_map = ActiveTopicsMap} = State) ->

?LOG_INFO("revoke partitions: ~p", [Partitions]),
Pids = get_pids(ActiveTopicsMap, Partitions),
ok = stop_consumers(Pids),
PidQueuePairs = get_pid_queue_pairs(ActiveTopicsMap, Partitions),
ok = stop_consumers(PidQueuePairs),
?LOG_INFO("all existing consumers stopped for partitions: ~p", [Partitions]),
ok = erlkaf_nif:consumer_partition_revoke_completed(ClientRef),
{noreply, State#state{active_topics_map = #{}}};
Expand Down Expand Up @@ -124,9 +124,12 @@ terminate(_Reason, #state{active_topics_map = TopicsMap, client_ref = ClientRef,
code_change(_OldVsn, State, _Extra) ->
{ok, State}.

get_pids(TopicsMap, Partitions) ->
get_pid_queue_pairs(TopicsMap, Partitions) ->
lists:map(fun(P) -> maps:get(P, TopicsMap) end, Partitions).

stop_consumers(Pids) ->
erlkaf_utils:parralel_exec(fun(Pid) -> erlkaf_consumer:stop(Pid) end, Pids).
stop_consumers(PidQueuePairs) ->
erlkaf_utils:parralel_exec(fun({Pid, QueueRef}) ->
erlkaf_consumer:stop(Pid),
ok = erlkaf_nif:consumer_queue_cleanup(QueueRef)
end, PidQueuePairs).

0 comments on commit 7a292e5

Please sign in to comment.