Skip to content

Commit

Permalink
Allow consuming from wildcard-topics
Browse files Browse the repository at this point in the history
  • Loading branch information
Роман Смирнов committed Mar 22, 2024
1 parent 9810163 commit b2763e2
Showing 1 changed file with 15 additions and 1 deletion.
16 changes: 15 additions & 1 deletion lib/kaffe/config/consumer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,21 @@ defmodule Kaffe.Config.Consumer do

def subscriber_name(idx), do: config_get(idx, :subscriber_name, consumer_group(idx)) |> String.to_atom()

def topics(idx), do: config_get!(idx, :topics)
def topics(idx) do
{wildcard_topics, specified_topics} = Enum.split_with(config_get!(idx, :topics), &(String.starts_with?(&1, "*")))
specified_topics ++ specify_wildcard_topics(wildcard_topics, idx)
end

defp specify_wildcard_topics([], _), do: []

defp specify_wildcard_topics(topics, idx) do
metadata = :brod.get_metadata(endpoints(idx))
topic_names = metadata |> elem(1) |> Map.get(:topics) |> Enum.map(& &1[:name])

Enum.flat_map(topics, fn("*" <> topic_name_tail) ->
Enum.filter(topic_names, &(String.ends_with?(&1, topic_name_tail)))
end) |> IO.inspect(label: "specify_wildcard_topics")
end

def message_handler(idx), do: config_get!(idx, :message_handler)

Expand Down

0 comments on commit b2763e2

Please sign in to comment.