From 4e0a4fae5e5c7e6a148dd47810aa113175405d21 Mon Sep 17 00:00:00 2001 From: Yupeng Fu Date: Tue, 21 Jan 2025 08:04:01 -0800 Subject: [PATCH] comments Signed-off-by: Yupeng Fu --- .../java/org/opensearch/plugin/kafka/IngestFromKafkaIT.java | 2 +- .../java/org/opensearch/plugin/kafka/KafkaSourceConfig.java | 2 +- .../opensearch/plugin/kafka/KafkaConsumerFactoryTests.java | 2 +- .../opensearch/plugin/kafka/KafkaPartitionConsumerTests.java | 4 ++-- .../org/opensearch/plugin/kafka/KafkaSourceConfigTests.java | 2 +- 5 files changed, 6 insertions(+), 6 deletions(-) diff --git a/plugins/ingestion-kafka/src/internalClusterTest/java/org/opensearch/plugin/kafka/IngestFromKafkaIT.java b/plugins/ingestion-kafka/src/internalClusterTest/java/org/opensearch/plugin/kafka/IngestFromKafkaIT.java index 5efe06acd71af..495e91be0c715 100644 --- a/plugins/ingestion-kafka/src/internalClusterTest/java/org/opensearch/plugin/kafka/IngestFromKafkaIT.java +++ b/plugins/ingestion-kafka/src/internalClusterTest/java/org/opensearch/plugin/kafka/IngestFromKafkaIT.java @@ -85,7 +85,7 @@ public void testKafkaIngestion() { .put("ingestion_source.type", "kafka") .put("ingestion_source.pointer.init.reset", "earliest") .put("ingestion_source.param.topic", "test") - .put("ingestion_source.param.bootstrapServers", kafka.getBootstrapServers()) + .put("ingestion_source.param.bootstrap_servers", kafka.getBootstrapServers()) .build(), "{\"properties\":{\"name\":{\"type\": \"text\"},\"age\":{\"type\": \"integer\"}}}}" ); diff --git a/plugins/ingestion-kafka/src/main/java/org/opensearch/plugin/kafka/KafkaSourceConfig.java b/plugins/ingestion-kafka/src/main/java/org/opensearch/plugin/kafka/KafkaSourceConfig.java index 7b7f14d6bbf21..099300c6e5767 100644 --- a/plugins/ingestion-kafka/src/main/java/org/opensearch/plugin/kafka/KafkaSourceConfig.java +++ b/plugins/ingestion-kafka/src/main/java/org/opensearch/plugin/kafka/KafkaSourceConfig.java @@ -25,7 +25,7 @@ public class KafkaSourceConfig { public KafkaSourceConfig(Map params) { // TODO: better parsing and validation this.topic = (String) Objects.requireNonNull(params.get("topic")); - this.bootstrapServers = (String) Objects.requireNonNull(params.get("bootstrapServers")); + this.bootstrapServers = (String) Objects.requireNonNull(params.get("bootstrap_servers")); assert this.bootstrapServers != null; } diff --git a/plugins/ingestion-kafka/src/test/java/org/opensearch/plugin/kafka/KafkaConsumerFactoryTests.java b/plugins/ingestion-kafka/src/test/java/org/opensearch/plugin/kafka/KafkaConsumerFactoryTests.java index 5f30c634dacc1..deaa4b1f0b369 100644 --- a/plugins/ingestion-kafka/src/test/java/org/opensearch/plugin/kafka/KafkaConsumerFactoryTests.java +++ b/plugins/ingestion-kafka/src/test/java/org/opensearch/plugin/kafka/KafkaConsumerFactoryTests.java @@ -19,7 +19,7 @@ public void testInitialize() { KafkaConsumerFactory factory = new KafkaConsumerFactory(); Map params = new HashMap<>(); params.put("topic", "test-topic"); - params.put("bootstrapServers", "localhost:9092"); + params.put("bootstrap_servers", "localhost:9092"); factory.initialize(params); diff --git a/plugins/ingestion-kafka/src/test/java/org/opensearch/plugin/kafka/KafkaPartitionConsumerTests.java b/plugins/ingestion-kafka/src/test/java/org/opensearch/plugin/kafka/KafkaPartitionConsumerTests.java index be2c623653241..96f639366d887 100644 --- a/plugins/ingestion-kafka/src/test/java/org/opensearch/plugin/kafka/KafkaPartitionConsumerTests.java +++ b/plugins/ingestion-kafka/src/test/java/org/opensearch/plugin/kafka/KafkaPartitionConsumerTests.java @@ -42,7 +42,7 @@ public void setUp() throws Exception { super.setUp(); Map params = new HashMap<>(); params.put("topic", "test-topic"); - params.put("bootstrapServers", "localhost:9092"); + params.put("bootstrap_servers", "localhost:9092"); config = new KafkaSourceConfig(params); mockConsumer = mock(KafkaConsumer.class); @@ -93,7 +93,7 @@ public void testLatestPointer() { public void testTopicDoesNotExist() { Map params = new HashMap<>(); params.put("topic", "non-existent-topic"); - params.put("bootstrapServers", "localhost:9092"); + params.put("bootstrap_servers", "localhost:9092"); var kafkaSourceConfig = new KafkaSourceConfig(params); when(mockConsumer.partitionsFor(eq("non-existent-topic"), any(Duration.class))).thenReturn(null); try { diff --git a/plugins/ingestion-kafka/src/test/java/org/opensearch/plugin/kafka/KafkaSourceConfigTests.java b/plugins/ingestion-kafka/src/test/java/org/opensearch/plugin/kafka/KafkaSourceConfigTests.java index 2d593c1373702..aa4ddb94f23fc 100644 --- a/plugins/ingestion-kafka/src/test/java/org/opensearch/plugin/kafka/KafkaSourceConfigTests.java +++ b/plugins/ingestion-kafka/src/test/java/org/opensearch/plugin/kafka/KafkaSourceConfigTests.java @@ -19,7 +19,7 @@ public class KafkaSourceConfigTests extends OpenSearchTestCase { public void testConstructorAndGetters() { Map params = new HashMap<>(); params.put("topic", "topic"); - params.put("bootstrapServers", "bootstrap"); + params.put("bootstrap_servers", "bootstrap"); KafkaSourceConfig config = new KafkaSourceConfig(params);