diff --git a/kafka/consumer.c b/kafka/consumer.c index b23de6b..9dc0487 100644 --- a/kafka/consumer.c +++ b/kafka/consumer.c @@ -821,3 +821,17 @@ int lua_consumer_resume(struct lua_State *L) { return lua_consumer_call_pause_resume(L, kafka_resume); } + +int +lua_consumer_rebalance_protocol(struct lua_State *L) { + consumer_t **consumer_p = luaL_checkudata(L, 1, consumer_label); + if (consumer_p == NULL || *consumer_p == NULL) + return 0; + + if ((*consumer_p)->rd_consumer != NULL) { + const char *proto = rd_kafka_rebalance_protocol((*consumer_p)->rd_consumer); + lua_pushstring(L, proto); + return 1; + } + return 0; +} diff --git a/kafka/consumer.h b/kafka/consumer.h index 1715bd4..34b66d0 100644 --- a/kafka/consumer.h +++ b/kafka/consumer.h @@ -59,4 +59,7 @@ lua_consumer_pause(struct lua_State *L); int lua_consumer_resume(struct lua_State *L); +int +lua_consumer_rebalance_protocol(struct lua_State *L); + #endif //TNT_KAFKA_CONSUMER_H diff --git a/kafka/init.lua b/kafka/init.lua index 0e3b32e..97dab59 100644 --- a/kafka/init.lua +++ b/kafka/init.lua @@ -219,6 +219,10 @@ function Consumer:resume() return self._consumer:resume() end +function Consumer:rebalance_protocol() + return self._consumer:rebalance_protocol() +end + function Consumer:seek_partitions(topic_partitions_list, options) local timeout_ms = get_timeout_from_options(options) return self._consumer:seek_partitions(topic_partitions_list, timeout_ms) diff --git a/kafka/tnt_kafka.c b/kafka/tnt_kafka.c index 06266fe..9055dea 100644 --- a/kafka/tnt_kafka.c +++ b/kafka/tnt_kafka.c @@ -33,6 +33,7 @@ luaopen_kafka_tntkafka(lua_State *L) { {"resume", lua_consumer_resume}, {"close", lua_consumer_close}, {"destroy", lua_consumer_destroy}, + {"rebalance_protocol", lua_consumer_rebalance_protocol}, {"__tostring", lua_consumer_tostring}, {NULL, NULL} };