Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add method to check rebalance protocol #113

Merged
merged 1 commit into from
Jun 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions kafka/consumer.c
Original file line number Diff line number Diff line change
Expand Up @@ -821,3 +821,17 @@ int
lua_consumer_resume(struct lua_State *L) {
return lua_consumer_call_pause_resume(L, kafka_resume);
}

int
lua_consumer_rebalance_protocol(struct lua_State *L) {
consumer_t **consumer_p = luaL_checkudata(L, 1, consumer_label);
if (consumer_p == NULL || *consumer_p == NULL)
return 0;

if ((*consumer_p)->rd_consumer != NULL) {
const char *proto = rd_kafka_rebalance_protocol((*consumer_p)->rd_consumer);
lua_pushstring(L, proto);
return 1;
}
return 0;
}
3 changes: 3 additions & 0 deletions kafka/consumer.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,4 +59,7 @@ lua_consumer_pause(struct lua_State *L);
int
lua_consumer_resume(struct lua_State *L);

int
lua_consumer_rebalance_protocol(struct lua_State *L);

#endif //TNT_KAFKA_CONSUMER_H
4 changes: 4 additions & 0 deletions kafka/init.lua
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,10 @@ function Consumer:resume()
return self._consumer:resume()
end

function Consumer:rebalance_protocol()
return self._consumer:rebalance_protocol()
end

function Consumer:seek_partitions(topic_partitions_list, options)
local timeout_ms = get_timeout_from_options(options)
return self._consumer:seek_partitions(topic_partitions_list, timeout_ms)
Expand Down
2 changes: 1 addition & 1 deletion kafka/producer.c
Original file line number Diff line number Diff line change
Expand Up @@ -631,7 +631,7 @@ lua_producer_metadata(struct lua_State *L) {

int
lua_producer_list_groups(struct lua_State *L) {
producer_t **producer_p = (producer_t **)luaL_checkudata(L, 1, producer_label);
producer_t **producer_p = luaL_checkudata(L, 1, producer_label);
if (producer_p == NULL || *producer_p == NULL)
return 0;

Expand Down
1 change: 1 addition & 0 deletions kafka/tnt_kafka.c
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ luaopen_kafka_tntkafka(lua_State *L) {
{"resume", lua_consumer_resume},
{"close", lua_consumer_close},
{"destroy", lua_consumer_destroy},
{"rebalance_protocol", lua_consumer_rebalance_protocol},
{"__tostring", lua_consumer_tostring},
{NULL, NULL}
};
Expand Down
10 changes: 9 additions & 1 deletion tests/consumer.lua
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ local function list_groups(timeout_ms)
log.info("Groups: %s", json.encode(res))
-- Some fields can have binary data that won't
-- be correctly processed by connector.
for _, group in ipairs(res) do
for _, group in ipairs(res) do
group['members'] = nil
end
return res
Expand Down Expand Up @@ -204,6 +204,9 @@ local function test_seek_partitions()

for _ = 1, 5 do
local msg = out:get(3)
if msg == nil then
error('Message is not delivered')
end
log.info('Get message: %s', json.encode(msg_totable(msg)))
append_message(messages, msg)
consumer:seek_partitions({
Expand All @@ -214,6 +217,10 @@ local function test_seek_partitions()
return messages
end

local function rebalance_protocol()
return consumer:rebalance_protocol()
end

local function test_create_errors()
log.info('Create without config')
local _, err = tnt_kafka.Consumer.create()
Expand Down Expand Up @@ -263,6 +270,7 @@ return {
list_groups = list_groups,
pause = pause,
resume = resume,
rebalance_protocol = rebalance_protocol,

test_seek_partitions = test_seek_partitions,
test_create_errors = test_create_errors,
Expand Down
2 changes: 1 addition & 1 deletion tests/producer.lua
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ local function list_groups(timeout_ms)
log.info("Groups: %s", json.encode(res))
-- Some fields can have binary data that won't
-- be correctly processed by connector.
for _, group in ipairs(res) do
for _, group in ipairs(res) do
group['members'] = nil
end
return res
Expand Down
13 changes: 13 additions & 0 deletions tests/test_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -379,6 +379,19 @@ def test_consumer_should_log_rebalances():
assert len(response.data[0]) > 0


def test_consumer_rebalance_protocol():
server = get_server()

with create_consumer(server, KAFKA_HOST, {"bootstrap.servers": KAFKA_HOST}):
time.sleep(5)
response = server.call("consumer.rebalance_protocol", [])
assert response[0] == 'NONE'

server.call("consumer.subscribe", [["test_unsub_partially_1"]])
response = server.call("consumer.rebalance_protocol", [])
assert response[0] == 'NONE'


def test_consumer_should_continue_consuming_from_last_committed_offset():
message1 = {
"key": "test1",
Expand Down
1 change: 1 addition & 0 deletions tests/test_producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,7 @@ def test_producer_should_log_debug():

server.call("producer.close", [])


def test_producer_create_errors():
server = get_server()
server.call("producer.test_create_errors")
Loading