Skip to content

Commit

Permalink
Changing the REST proxy consumer client id to be plain string instead…
Browse files Browse the repository at this point in the history
… of tuple
  • Loading branch information
AnatolyPopov committed Jul 19, 2024
1 parent e5a6e41 commit c01f6c0
Showing 1 changed file with 3 additions and 3 deletions.
6 changes: 3 additions & 3 deletions karapace/kafka_rest_apis/consumer_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ async def create_consumer(self, group_name: str, request_data: dict, content_typ
request_data["auto.commit.enable"] = enable_commit
request_data["auto.offset.reset"] = request_data.get("auto.offset.reset", "earliest")
fetch_min_bytes = request_data.get("fetch.min.bytes", self.config["fetch_min_bytes"])
c = await self.create_kafka_consumer(fetch_min_bytes, group_name, internal_name, request_data)
c = await self.create_kafka_consumer(fetch_min_bytes, group_name, consumer_name, request_data)
except KafkaConfigurationError as e:
KarapaceBase.internal_error(str(e), content_type)
self.consumers[internal_name] = TypedConsumer(
Expand All @@ -200,7 +200,7 @@ async def create_consumer(self, group_name: str, request_data: dict, content_typ
consumer_base_uri = urljoin(self.base_uri, f"consumers/{group_name}/instances/{consumer_name}")
KarapaceBase.r(content_type=content_type, body={"base_uri": consumer_base_uri, "instance_id": consumer_name})

async def create_kafka_consumer(self, fetch_min_bytes, group_name, internal_name, request_data):
async def create_kafka_consumer(self, fetch_min_bytes, group_name, client_id, request_data):
for retry in [True, True, False]:
try:
session_timeout_ms = self.config["session_timeout_ms"]
Expand All @@ -212,7 +212,7 @@ async def create_kafka_consumer(self, fetch_min_bytes, group_name, internal_name
c = AsyncKafkaConsumer(
bootstrap_servers=self.config["bootstrap_uri"],
auto_offset_reset=request_data["auto.offset.reset"],
client_id=internal_name,
client_id=client_id,
enable_auto_commit=request_data["auto.commit.enable"],
fetch_max_wait_ms=self.config.get("consumer_fetch_max_wait_ms"),
fetch_message_max_bytes=self.config["consumer_request_max_bytes"],
Expand Down

0 comments on commit c01f6c0

Please sign in to comment.