From bd32d7e255ab1f13715b6864e077b5b0ce985913 Mon Sep 17 00:00:00 2001 From: Hanzel Legarda Date: Mon, 21 Oct 2024 11:26:25 -0600 Subject: [PATCH 01/12] added support for msk --- drone-fly-app/pom.xml | 7 +++++++ .../dronefly/app/context/CommonBeans.java | 14 ++++++++++++-- 2 files changed, 19 insertions(+), 2 deletions(-) diff --git a/drone-fly-app/pom.xml b/drone-fly-app/pom.xml index e3792a5..9eccde1 100644 --- a/drone-fly-app/pom.xml +++ b/drone-fly-app/pom.xml @@ -108,6 +108,13 @@ awaitility test + + + software.amazon.msk + aws-msk-iam-auth + 1.1.1 + + diff --git a/drone-fly-app/src/main/java/com/expediagroup/dataplatform/dronefly/app/context/CommonBeans.java b/drone-fly-app/src/main/java/com/expediagroup/dataplatform/dronefly/app/context/CommonBeans.java index 55607e2..2ca00a1 100644 --- a/drone-fly-app/src/main/java/com/expediagroup/dataplatform/dronefly/app/context/CommonBeans.java +++ b/drone-fly-app/src/main/java/com/expediagroup/dataplatform/dronefly/app/context/CommonBeans.java @@ -16,6 +16,7 @@ package com.expediagroup.dataplatform.dronefly.app.context; import java.util.List; +import java.util.Properties; import java.util.stream.Collectors; import org.apache.hadoop.hive.conf.HiveConf; @@ -65,8 +66,17 @@ public ListenerCatalog listenerCatalog(HiveConf conf) throws MetaException { @Bean public MessageReaderAdapter messageReaderAdapter() { - KafkaMessageReader delegate = KafkaMessageReaderBuilder.builder(bootstrapServers, topicName, instanceName).build(); + Properties mskProperties = new Properties(); + mskProperties.put("security.protocol", "SSL"); + mskProperties.put("sasl.mechanism", "AWS_MSK_IAM"); + mskProperties.put("sasl.jaas.config", "software.amazon.msk.auth.iam.IAMLoginModule required;"); + mskProperties.put("sasl.client.callback.handler.class", "software.amazon.msk.auth.iam.IAMClientCallbackHandler"); + + KafkaMessageReader delegate = KafkaMessageReaderBuilder. + builder(bootstrapServers, topicName, instanceName). + withConsumerProperties(mskProperties). + build(); return new MessageReaderAdapter(delegate); } -} +} \ No newline at end of file From 182a2737322834d334d810babc38540da403c6fd Mon Sep 17 00:00:00 2001 From: Hanzel Legarda Date: Mon, 21 Oct 2024 13:16:07 -0600 Subject: [PATCH 02/12] added support for msk --- drone-fly-app/pom.xml | 2 -- .../dronefly/app/context/CommonBeans.java | 31 +++++++++++++++---- drone-fly-integration-tests/pom.xml | 5 +++ 3 files changed, 30 insertions(+), 8 deletions(-) diff --git a/drone-fly-app/pom.xml b/drone-fly-app/pom.xml index 9eccde1..0c4edf0 100644 --- a/drone-fly-app/pom.xml +++ b/drone-fly-app/pom.xml @@ -108,13 +108,11 @@ awaitility test - software.amazon.msk aws-msk-iam-auth 1.1.1 - diff --git a/drone-fly-app/src/main/java/com/expediagroup/dataplatform/dronefly/app/context/CommonBeans.java b/drone-fly-app/src/main/java/com/expediagroup/dataplatform/dronefly/app/context/CommonBeans.java index 2ca00a1..c6de097 100644 --- a/drone-fly-app/src/main/java/com/expediagroup/dataplatform/dronefly/app/context/CommonBeans.java +++ b/drone-fly-app/src/main/java/com/expediagroup/dataplatform/dronefly/app/context/CommonBeans.java @@ -19,6 +19,7 @@ import java.util.Properties; import java.util.stream.Collectors; +import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.MetaStoreEventListener; import org.apache.hadoop.hive.metastore.api.MetaException; @@ -50,6 +51,18 @@ public class CommonBeans { @Value("${apiary.listener.list:}") private String confListenerList; + @Value("${apiary.security.protocol:#{null}}") + private String securityProtol; + + @Value("${apiary.sasl.mechanism:#{null}}") + private String saslMechanism; + + @Value("${apiary.sasl.jaas.config:#{null}}") + private String saslJaasConfig; + + @Value("${apiary.sasl.client.callback.handler.class:#{null}}") + private String saslHandlerClass; + @Bean public HiveConf hiveConf() { return new HiveConf(); @@ -66,17 +79,23 @@ public ListenerCatalog listenerCatalog(HiveConf conf) throws MetaException { @Bean public MessageReaderAdapter messageReaderAdapter() { - Properties mskProperties = new Properties(); - mskProperties.put("security.protocol", "SSL"); - mskProperties.put("sasl.mechanism", "AWS_MSK_IAM"); - mskProperties.put("sasl.jaas.config", "software.amazon.msk.auth.iam.IAMLoginModule required;"); - mskProperties.put("sasl.client.callback.handler.class", "software.amazon.msk.auth.iam.IAMClientCallbackHandler"); + Properties clientProperties = getClientProperties(); KafkaMessageReader delegate = KafkaMessageReaderBuilder. builder(bootstrapServers, topicName, instanceName). - withConsumerProperties(mskProperties). + withConsumerProperties(clientProperties). build(); return new MessageReaderAdapter(delegate); } + private Properties getClientProperties() { + Properties clientProperties = new Properties(); + if (StringUtils.isNotBlank(securityProtol)) { clientProperties.put("security.protocol", securityProtol); } + if (StringUtils.isNotBlank(saslMechanism)) { clientProperties.put("sasl.mechanism", saslMechanism); } + if (StringUtils.isNotBlank(saslJaasConfig)) { clientProperties.put("sasl.jaas.config", saslJaasConfig); } + if (StringUtils.isNotBlank(saslHandlerClass)) { clientProperties.put("sasl.client.callback.handler.class", + saslHandlerClass); } + return clientProperties; + } + } \ No newline at end of file diff --git a/drone-fly-integration-tests/pom.xml b/drone-fly-integration-tests/pom.xml index 28ccf23..b02d87d 100644 --- a/drone-fly-integration-tests/pom.xml +++ b/drone-fly-integration-tests/pom.xml @@ -76,5 +76,10 @@ junit-jupiter-params test + + software.amazon.msk + aws-msk-iam-auth + 1.1.1 + From 6712b0d3a648d4a08cf2828adaa68516915234cd Mon Sep 17 00:00:00 2001 From: Hanzel Legarda Date: Tue, 22 Oct 2024 13:07:16 -0600 Subject: [PATCH 03/12] modified reader to get properties from env variables --- .../dronefly/app/context/CommonBeans.java | 35 +++++++++---------- 1 file changed, 17 insertions(+), 18 deletions(-) diff --git a/drone-fly-app/src/main/java/com/expediagroup/dataplatform/dronefly/app/context/CommonBeans.java b/drone-fly-app/src/main/java/com/expediagroup/dataplatform/dronefly/app/context/CommonBeans.java index c6de097..63c1796 100644 --- a/drone-fly-app/src/main/java/com/expediagroup/dataplatform/dronefly/app/context/CommonBeans.java +++ b/drone-fly-app/src/main/java/com/expediagroup/dataplatform/dronefly/app/context/CommonBeans.java @@ -26,6 +26,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @@ -34,10 +35,12 @@ import com.expediagroup.dataplatform.dronefly.app.messaging.MessageReaderAdapter; import com.expediagroup.dataplatform.dronefly.app.service.ListenerCatalog; import com.expediagroup.dataplatform.dronefly.app.service.factory.ListenerCatalogFactory; +import org.springframework.context.annotation.Primary; @Configuration public class CommonBeans { private static final Logger log = LoggerFactory.getLogger(CommonBeans.class); + public static final String PREFIX = "apiary.messaging.client."; @Value("${instance.name:drone-fly}") private String instanceName; @@ -51,23 +54,18 @@ public class CommonBeans { @Value("${apiary.listener.list:}") private String confListenerList; - @Value("${apiary.security.protocol:#{null}}") - private String securityProtol; - - @Value("${apiary.sasl.mechanism:#{null}}") - private String saslMechanism; - - @Value("${apiary.sasl.jaas.config:#{null}}") - private String saslJaasConfig; - - @Value("${apiary.sasl.client.callback.handler.class:#{null}}") - private String saslHandlerClass; - @Bean public HiveConf hiveConf() { return new HiveConf(); } + @Bean + @Primary + @ConfigurationProperties + public Properties getEnvProperties() { + return new Properties(); + } + @Bean public ListenerCatalog listenerCatalog(HiveConf conf) throws MetaException { ListenerCatalog listenerCatalog = new ListenerCatalogFactory(conf).newInstance(confListenerList); @@ -80,7 +78,6 @@ public ListenerCatalog listenerCatalog(HiveConf conf) throws MetaException { @Bean public MessageReaderAdapter messageReaderAdapter() { Properties clientProperties = getClientProperties(); - KafkaMessageReader delegate = KafkaMessageReaderBuilder. builder(bootstrapServers, topicName, instanceName). withConsumerProperties(clientProperties). @@ -90,11 +87,13 @@ public MessageReaderAdapter messageReaderAdapter() { private Properties getClientProperties() { Properties clientProperties = new Properties(); - if (StringUtils.isNotBlank(securityProtol)) { clientProperties.put("security.protocol", securityProtol); } - if (StringUtils.isNotBlank(saslMechanism)) { clientProperties.put("sasl.mechanism", saslMechanism); } - if (StringUtils.isNotBlank(saslJaasConfig)) { clientProperties.put("sasl.jaas.config", saslJaasConfig); } - if (StringUtils.isNotBlank(saslHandlerClass)) { clientProperties.put("sasl.client.callback.handler.class", - saslHandlerClass); } + getEnvProperties().forEach((key, value) -> { + if (key.toString().startsWith(PREFIX)) { + String keyWithoutPrefix = StringUtils.replace(key.toString(), PREFIX, ""); + clientProperties.put(keyWithoutPrefix, value.toString()); + log.info("Client property {} set with value: {}", keyWithoutPrefix, value); + } + } ); return clientProperties; } From 22360e4a4ec595a791a6ee090bdf240fc6a181b9 Mon Sep 17 00:00:00 2001 From: Hanzel Legarda Date: Tue, 22 Oct 2024 15:23:03 -0600 Subject: [PATCH 04/12] Adding new config parameters to readme file --- README.md | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/README.md b/README.md index deb6cde..bfc969d 100644 --- a/README.md +++ b/README.md @@ -90,6 +90,26 @@ The table below describes all the available configuration values for Drone Fly. | instance.name | Instance name for a Drone Fly instance. `instance.name` is also used to derive the Kafka consumer group. Therefore, in a multi-instance deployment, a unique `instance.name` for each Drone Fly instance needs to be provided to avoid all instances ending up in the same Kafka consumer group. | `string` | `drone-fly` | no | | endpoint.port | Port on which Drone Fly Spring Boot app will start. | `string` | `8008` | no | +### Additional configuration parameters +The Kafka message reader supports client properties that are passed to the Kafka consumer builder. +These are environment variables with the PREFIX apiary.messaging.client. + + #### Example for sending client parameters when using a Kafka cloud provider +- apiary.messaging.client.security.protocol=SSL +- apiary.messaging.client.sasl.mechanism=AWS_MSK_IAM +- apiary.messaging.client.sasl_jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required; +- apiary.messaging.client.sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler + +In this case we are sending the properties to Kafka's consumer to be able to connect to AWS MSK which also requires the IAM library included as a dependency in the POM.xml file + + java -Dloader.path=lib/ -jar drone-fly-app--exec.jar \ + --apiary.bootstrap.servers=localhost:9092 \ + --apiary.kafka.topic.name=apiary \ + --apiary.listener.list="com.expediagroup.sampleListener1,com.expediagroup.sampleListener2" \ + --apiary.messaging.client.security.protocol=SSL \ + --apiary.messaging.client.sasl.mechanism=AWS_MSK_IAM \ + --apiary.messaging.client.sasl_jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required; \ + --apiary.messaging.client.sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler ## Metrics From 211d637132dbd3b4ee73eb16929a5ee15c613a62 Mon Sep 17 00:00:00 2001 From: Hanzel Legarda Date: Wed, 23 Oct 2024 09:22:35 -0600 Subject: [PATCH 05/12] fix: code review comments, renaming client to consumer, modifying conf parameters to fill only with prefix params --- README.md | 22 +++++++++---------- .../dronefly/app/context/CommonBeans.java | 18 ++++++--------- 2 files changed, 18 insertions(+), 22 deletions(-) diff --git a/README.md b/README.md index bfc969d..61b68c6 100644 --- a/README.md +++ b/README.md @@ -91,14 +91,14 @@ The table below describes all the available configuration values for Drone Fly. | endpoint.port | Port on which Drone Fly Spring Boot app will start. | `string` | `8008` | no | ### Additional configuration parameters -The Kafka message reader supports client properties that are passed to the Kafka consumer builder. -These are environment variables with the PREFIX apiary.messaging.client. +The Kafka message reader supports properties that are passed to the Kafka consumer builder. +These are environment variables with the PREFIX apiary.messaging.consumer. - #### Example for sending client parameters when using a Kafka cloud provider -- apiary.messaging.client.security.protocol=SSL -- apiary.messaging.client.sasl.mechanism=AWS_MSK_IAM -- apiary.messaging.client.sasl_jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required; -- apiary.messaging.client.sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler + #### Example for sending consumer parameters when using a Kafka cloud provider +- apiary.messaging.consumer.security.protocol=SSL +- apiary.messaging.consumer.sasl.mechanism=AWS_MSK_IAM +- apiary.messaging.consumer.sasl_jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required; +- apiary.messaging.consumer.sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler In this case we are sending the properties to Kafka's consumer to be able to connect to AWS MSK which also requires the IAM library included as a dependency in the POM.xml file @@ -106,10 +106,10 @@ In this case we are sending the properties to Kafka's consumer to be able to con --apiary.bootstrap.servers=localhost:9092 \ --apiary.kafka.topic.name=apiary \ --apiary.listener.list="com.expediagroup.sampleListener1,com.expediagroup.sampleListener2" \ - --apiary.messaging.client.security.protocol=SSL \ - --apiary.messaging.client.sasl.mechanism=AWS_MSK_IAM \ - --apiary.messaging.client.sasl_jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required; \ - --apiary.messaging.client.sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler + --apiary.messaging.consumer.security.protocol=SSL \ + --apiary.messaging.consumer.sasl.mechanism=AWS_MSK_IAM \ + --apiary.messaging.consumer.sasl_jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required; \ + --apiary.messaging.consumer.sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler ## Metrics diff --git a/drone-fly-app/src/main/java/com/expediagroup/dataplatform/dronefly/app/context/CommonBeans.java b/drone-fly-app/src/main/java/com/expediagroup/dataplatform/dronefly/app/context/CommonBeans.java index 63c1796..13b4a0b 100644 --- a/drone-fly-app/src/main/java/com/expediagroup/dataplatform/dronefly/app/context/CommonBeans.java +++ b/drone-fly-app/src/main/java/com/expediagroup/dataplatform/dronefly/app/context/CommonBeans.java @@ -19,7 +19,6 @@ import java.util.Properties; import java.util.stream.Collectors; -import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.MetaStoreEventListener; import org.apache.hadoop.hive.metastore.api.MetaException; @@ -40,7 +39,7 @@ @Configuration public class CommonBeans { private static final Logger log = LoggerFactory.getLogger(CommonBeans.class); - public static final String PREFIX = "apiary.messaging.client."; + public static final String PREFIX = "apiary.messaging.consumer"; @Value("${instance.name:drone-fly}") private String instanceName; @@ -77,7 +76,7 @@ public ListenerCatalog listenerCatalog(HiveConf conf) throws MetaException { @Bean public MessageReaderAdapter messageReaderAdapter() { - Properties clientProperties = getClientProperties(); + Properties clientProperties = getConsumerProperties(); KafkaMessageReader delegate = KafkaMessageReaderBuilder. builder(bootstrapServers, topicName, instanceName). withConsumerProperties(clientProperties). @@ -85,16 +84,13 @@ public MessageReaderAdapter messageReaderAdapter() { return new MessageReaderAdapter(delegate); } - private Properties getClientProperties() { - Properties clientProperties = new Properties(); + private Properties getConsumerProperties() { + Properties consumerProperties = new Properties(); getEnvProperties().forEach((key, value) -> { - if (key.toString().startsWith(PREFIX)) { - String keyWithoutPrefix = StringUtils.replace(key.toString(), PREFIX, ""); - clientProperties.put(keyWithoutPrefix, value.toString()); - log.info("Client property {} set with value: {}", keyWithoutPrefix, value); - } + consumerProperties.put(key.toString(), value.toString()); + log.info("Client property {} set with value: {}", key, value); } ); - return clientProperties; + return consumerProperties; } } \ No newline at end of file From 58bcf1a823890240260d27d977acdcd05924c846 Mon Sep 17 00:00:00 2001 From: Hanzel Legarda Date: Wed, 23 Oct 2024 11:49:19 -0600 Subject: [PATCH 06/12] fix: code review comments, renaming client to consumer, modifying conf parameters to fill only with prefix params --- .../dataplatform/dronefly/app/context/CommonBeans.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/drone-fly-app/src/main/java/com/expediagroup/dataplatform/dronefly/app/context/CommonBeans.java b/drone-fly-app/src/main/java/com/expediagroup/dataplatform/dronefly/app/context/CommonBeans.java index 13b4a0b..9a98709 100644 --- a/drone-fly-app/src/main/java/com/expediagroup/dataplatform/dronefly/app/context/CommonBeans.java +++ b/drone-fly-app/src/main/java/com/expediagroup/dataplatform/dronefly/app/context/CommonBeans.java @@ -60,7 +60,7 @@ public HiveConf hiveConf() { @Bean @Primary - @ConfigurationProperties + @ConfigurationProperties(PREFIX) public Properties getEnvProperties() { return new Properties(); } From e2215d07ced388a28cb1051a107521cceb167497 Mon Sep 17 00:00:00 2001 From: Hanzel Legarda Date: Wed, 23 Oct 2024 11:51:00 -0600 Subject: [PATCH 07/12] fix: code review comments, renaming client to consumer, modifying conf parameters to fill only with prefix params --- .../dataplatform/dronefly/app/context/CommonBeans.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/drone-fly-app/src/main/java/com/expediagroup/dataplatform/dronefly/app/context/CommonBeans.java b/drone-fly-app/src/main/java/com/expediagroup/dataplatform/dronefly/app/context/CommonBeans.java index 9a98709..9cba63c 100644 --- a/drone-fly-app/src/main/java/com/expediagroup/dataplatform/dronefly/app/context/CommonBeans.java +++ b/drone-fly-app/src/main/java/com/expediagroup/dataplatform/dronefly/app/context/CommonBeans.java @@ -39,7 +39,7 @@ @Configuration public class CommonBeans { private static final Logger log = LoggerFactory.getLogger(CommonBeans.class); - public static final String PREFIX = "apiary.messaging.consumer"; + public static final String CONSUMER_PROPERTIES_PREFIX = "apiary.messaging.consumer"; @Value("${instance.name:drone-fly}") private String instanceName; @@ -60,7 +60,7 @@ public HiveConf hiveConf() { @Bean @Primary - @ConfigurationProperties(PREFIX) + @ConfigurationProperties(CONSUMER_PROPERTIES_PREFIX) public Properties getEnvProperties() { return new Properties(); } From 7f50d5500bb96c9de52f1d9dce5a7aad089fbe0a Mon Sep 17 00:00:00 2001 From: Hanzel Legarda Date: Wed, 23 Oct 2024 09:22:35 -0600 Subject: [PATCH 08/12] fix: code review comments, renaming client to consumer, modifying conf parameters to fill only with prefix params --- README.md | 22 +++++++++---------- .../dronefly/app/context/CommonBeans.java | 20 +++++++---------- 2 files changed, 19 insertions(+), 23 deletions(-) diff --git a/README.md b/README.md index bfc969d..61b68c6 100644 --- a/README.md +++ b/README.md @@ -91,14 +91,14 @@ The table below describes all the available configuration values for Drone Fly. | endpoint.port | Port on which Drone Fly Spring Boot app will start. | `string` | `8008` | no | ### Additional configuration parameters -The Kafka message reader supports client properties that are passed to the Kafka consumer builder. -These are environment variables with the PREFIX apiary.messaging.client. +The Kafka message reader supports properties that are passed to the Kafka consumer builder. +These are environment variables with the PREFIX apiary.messaging.consumer. - #### Example for sending client parameters when using a Kafka cloud provider -- apiary.messaging.client.security.protocol=SSL -- apiary.messaging.client.sasl.mechanism=AWS_MSK_IAM -- apiary.messaging.client.sasl_jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required; -- apiary.messaging.client.sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler + #### Example for sending consumer parameters when using a Kafka cloud provider +- apiary.messaging.consumer.security.protocol=SSL +- apiary.messaging.consumer.sasl.mechanism=AWS_MSK_IAM +- apiary.messaging.consumer.sasl_jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required; +- apiary.messaging.consumer.sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler In this case we are sending the properties to Kafka's consumer to be able to connect to AWS MSK which also requires the IAM library included as a dependency in the POM.xml file @@ -106,10 +106,10 @@ In this case we are sending the properties to Kafka's consumer to be able to con --apiary.bootstrap.servers=localhost:9092 \ --apiary.kafka.topic.name=apiary \ --apiary.listener.list="com.expediagroup.sampleListener1,com.expediagroup.sampleListener2" \ - --apiary.messaging.client.security.protocol=SSL \ - --apiary.messaging.client.sasl.mechanism=AWS_MSK_IAM \ - --apiary.messaging.client.sasl_jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required; \ - --apiary.messaging.client.sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler + --apiary.messaging.consumer.security.protocol=SSL \ + --apiary.messaging.consumer.sasl.mechanism=AWS_MSK_IAM \ + --apiary.messaging.consumer.sasl_jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required; \ + --apiary.messaging.consumer.sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler ## Metrics diff --git a/drone-fly-app/src/main/java/com/expediagroup/dataplatform/dronefly/app/context/CommonBeans.java b/drone-fly-app/src/main/java/com/expediagroup/dataplatform/dronefly/app/context/CommonBeans.java index 63c1796..9cba63c 100644 --- a/drone-fly-app/src/main/java/com/expediagroup/dataplatform/dronefly/app/context/CommonBeans.java +++ b/drone-fly-app/src/main/java/com/expediagroup/dataplatform/dronefly/app/context/CommonBeans.java @@ -19,7 +19,6 @@ import java.util.Properties; import java.util.stream.Collectors; -import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.MetaStoreEventListener; import org.apache.hadoop.hive.metastore.api.MetaException; @@ -40,7 +39,7 @@ @Configuration public class CommonBeans { private static final Logger log = LoggerFactory.getLogger(CommonBeans.class); - public static final String PREFIX = "apiary.messaging.client."; + public static final String CONSUMER_PROPERTIES_PREFIX = "apiary.messaging.consumer"; @Value("${instance.name:drone-fly}") private String instanceName; @@ -61,7 +60,7 @@ public HiveConf hiveConf() { @Bean @Primary - @ConfigurationProperties + @ConfigurationProperties(CONSUMER_PROPERTIES_PREFIX) public Properties getEnvProperties() { return new Properties(); } @@ -77,7 +76,7 @@ public ListenerCatalog listenerCatalog(HiveConf conf) throws MetaException { @Bean public MessageReaderAdapter messageReaderAdapter() { - Properties clientProperties = getClientProperties(); + Properties clientProperties = getConsumerProperties(); KafkaMessageReader delegate = KafkaMessageReaderBuilder. builder(bootstrapServers, topicName, instanceName). withConsumerProperties(clientProperties). @@ -85,16 +84,13 @@ public MessageReaderAdapter messageReaderAdapter() { return new MessageReaderAdapter(delegate); } - private Properties getClientProperties() { - Properties clientProperties = new Properties(); + private Properties getConsumerProperties() { + Properties consumerProperties = new Properties(); getEnvProperties().forEach((key, value) -> { - if (key.toString().startsWith(PREFIX)) { - String keyWithoutPrefix = StringUtils.replace(key.toString(), PREFIX, ""); - clientProperties.put(keyWithoutPrefix, value.toString()); - log.info("Client property {} set with value: {}", keyWithoutPrefix, value); - } + consumerProperties.put(key.toString(), value.toString()); + log.info("Client property {} set with value: {}", key, value); } ); - return clientProperties; + return consumerProperties; } } \ No newline at end of file From 57050bb6095227705e8ba25b4789ac1d49773659 Mon Sep 17 00:00:00 2001 From: Hanzel Legarda Date: Wed, 23 Oct 2024 11:55:02 -0600 Subject: [PATCH 09/12] fix: code review comments, renaming client to consumer, modifying conf parameters to fill only with prefix params --- .../dataplatform/dronefly/app/context/CommonBeans.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/drone-fly-app/src/main/java/com/expediagroup/dataplatform/dronefly/app/context/CommonBeans.java b/drone-fly-app/src/main/java/com/expediagroup/dataplatform/dronefly/app/context/CommonBeans.java index 9cba63c..9a35844 100644 --- a/drone-fly-app/src/main/java/com/expediagroup/dataplatform/dronefly/app/context/CommonBeans.java +++ b/drone-fly-app/src/main/java/com/expediagroup/dataplatform/dronefly/app/context/CommonBeans.java @@ -76,10 +76,10 @@ public ListenerCatalog listenerCatalog(HiveConf conf) throws MetaException { @Bean public MessageReaderAdapter messageReaderAdapter() { - Properties clientProperties = getConsumerProperties(); + Properties consumerProperties = getConsumerProperties(); KafkaMessageReader delegate = KafkaMessageReaderBuilder. builder(bootstrapServers, topicName, instanceName). - withConsumerProperties(clientProperties). + withConsumerProperties(consumerProperties). build(); return new MessageReaderAdapter(delegate); } @@ -88,7 +88,7 @@ private Properties getConsumerProperties() { Properties consumerProperties = new Properties(); getEnvProperties().forEach((key, value) -> { consumerProperties.put(key.toString(), value.toString()); - log.info("Client property {} set with value: {}", key, value); + log.info("Consumer property {} set with value: {}", key, value); } ); return consumerProperties; } From 8c3b78617dcb69382f2a5f0615414efcefc4f82f Mon Sep 17 00:00:00 2001 From: Hanzel Legarda Date: Wed, 23 Oct 2024 12:26:38 -0600 Subject: [PATCH 10/12] Adding readme annotation --- CHANGELOG.md | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index a4195c8..7a3a866 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,6 +3,10 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/) and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.html). +## [1.0.1] - 2024-10-24 +### Added +* Support for consumer properties allowing connecting to Kafka cloud provider. + ## [1.0.0] - 2023-04-27 ### Changed * Upgrade `Springboot` version from `2.3.3.RELEASE` to `2.7.10`. From c728606eb153c42e8f34ebacb1a575d3af724789 Mon Sep 17 00:00:00 2001 From: Hanzel Legarda Date: Tue, 5 Nov 2024 11:39:45 -0600 Subject: [PATCH 11/12] Upgrading msk iam version to 1.1.9 --- drone-fly-app/pom.xml | 2 +- drone-fly-integration-tests/pom.xml | 2 +- pom.xml | 1 + 3 files changed, 3 insertions(+), 2 deletions(-) diff --git a/drone-fly-app/pom.xml b/drone-fly-app/pom.xml index 0c4edf0..1cc1eff 100644 --- a/drone-fly-app/pom.xml +++ b/drone-fly-app/pom.xml @@ -111,7 +111,7 @@ software.amazon.msk aws-msk-iam-auth - 1.1.1 + ${msk.iam.version} diff --git a/drone-fly-integration-tests/pom.xml b/drone-fly-integration-tests/pom.xml index b02d87d..481473b 100644 --- a/drone-fly-integration-tests/pom.xml +++ b/drone-fly-integration-tests/pom.xml @@ -79,7 +79,7 @@ software.amazon.msk aws-msk-iam-auth - 1.1.1 + ${msk.iam.version} diff --git a/pom.xml b/pom.xml index 116b03e..9f63cc4 100644 --- a/pom.xml +++ b/pom.xml @@ -37,6 +37,7 @@ ${project.version} docker.io 2.17.1 + 1.1.9 From eec323c7864b7630c615d4c61569b16b77da9b9e Mon Sep 17 00:00:00 2001 From: Patrick Duin Date: Wed, 6 Nov 2024 08:53:14 +0100 Subject: [PATCH 12/12] Update CHANGELOG.md --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 7a3a866..594fbbe 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,7 +3,7 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/) and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.html). -## [1.0.1] - 2024-10-24 +## [1.0.1] - 2024-11-06 ### Added * Support for consumer properties allowing connecting to Kafka cloud provider.