From 1b7e6d1ce9bcac0530c00e784318b6735dc62139 Mon Sep 17 00:00:00 2001 From: z1597534268 Date: Wed, 22 Jan 2025 00:47:41 +0200 Subject: [PATCH 1/2] add option to add configs to kafka --- .gitignore | 1 + examples/kafka_ex.py | 1 + locust_plugins/users/kafka.py | 8 +++++--- 3 files changed, 7 insertions(+), 3 deletions(-) diff --git a/.gitignore b/.gitignore index f1629c4..cf45d97 100644 --- a/.gitignore +++ b/.gitignore @@ -14,3 +14,4 @@ env _version.py examples/screenshot_* output.txt +.idea diff --git a/examples/kafka_ex.py b/examples/kafka_ex.py index 105acf8..0542fd1 100644 --- a/examples/kafka_ex.py +++ b/examples/kafka_ex.py @@ -5,6 +5,7 @@ class MyUser(KafkaUser): bootstrap_servers = os.environ["LOCUST_KAFKA_SERVERS"] + configs = {"compression.type": "gzip"} @task def t(self): diff --git a/locust_plugins/users/kafka.py b/locust_plugins/users/kafka.py index fa69054..b2e55af 100644 --- a/locust_plugins/users/kafka.py +++ b/locust_plugins/users/kafka.py @@ -13,10 +13,12 @@ class KafkaUser(User): abstract = True # overload these values in your subclass bootstrap_servers: str = None # type: ignore + configs: dict[str, str | int] = None def __init__(self, environment): super().__init__(environment) - self.client: KafkaClient = KafkaClient(environment=environment, bootstrap_servers=self.bootstrap_servers) + self.client: KafkaClient = KafkaClient(environment=environment, bootstrap_servers=self.bootstrap_servers, + configs=self.configs) def on_stop(self): self.client.producer.flush(5) @@ -35,9 +37,9 @@ def _on_delivery(environment, identifier, response_length, start_time, start_per class KafkaClient: - def __init__(self, *, environment, bootstrap_servers): + def __init__(self, *, environment, bootstrap_servers, configs=[]): self.environment = environment - self.producer = Producer({"bootstrap.servers": bootstrap_servers}) + self.producer = Producer({"bootstrap.servers": bootstrap_servers}, **configs) def send(self, topic: str, value: bytes, key=None, response_length_override=None, name=None, context={}): start_perf_counter = time.perf_counter() From 6a274d15bf745faeabe679ab0e9647d33993b4db Mon Sep 17 00:00:00 2001 From: z1597534268 Date: Wed, 22 Jan 2025 00:51:17 +0200 Subject: [PATCH 2/2] fix minor issue --- locust_plugins/users/kafka.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/locust_plugins/users/kafka.py b/locust_plugins/users/kafka.py index b2e55af..56ea664 100644 --- a/locust_plugins/users/kafka.py +++ b/locust_plugins/users/kafka.py @@ -37,7 +37,7 @@ def _on_delivery(environment, identifier, response_length, start_time, start_per class KafkaClient: - def __init__(self, *, environment, bootstrap_servers, configs=[]): + def __init__(self, *, environment, bootstrap_servers, configs=None): self.environment = environment self.producer = Producer({"bootstrap.servers": bootstrap_servers}, **configs)