From f735384f259fb32112565d446e195e348b3ea407 Mon Sep 17 00:00:00 2001 From: ChickenchickenLove Date: Sun, 24 Mar 2024 21:36:56 +0900 Subject: [PATCH 1/5] Implementation to integrate ParallelConsumer. --- build.gradle | 4 + .../annotation/EnableParallelConsumer.java | 39 +++++++ .../kafka/config/ParallelConsumerConfig.java | 108 ++++++++++++++++++ .../config/ParallelConsumerConfiguration.java | 47 ++++++++ .../kafka/config/ParallelConsumerContext.java | 60 ++++++++++ .../ParallelConsumerImportSelector.java | 35 ++++++ .../kafka/core/ParallelConsumerCallback.java | 42 +++++++ .../kafka/core/ParallelConsumerFactory.java | 65 +++++++++++ 8 files changed, 400 insertions(+) create mode 100644 spring-kafka/src/main/java/org/springframework/kafka/annotation/EnableParallelConsumer.java create mode 100644 spring-kafka/src/main/java/org/springframework/kafka/config/ParallelConsumerConfig.java create mode 100644 spring-kafka/src/main/java/org/springframework/kafka/config/ParallelConsumerConfiguration.java create mode 100644 spring-kafka/src/main/java/org/springframework/kafka/config/ParallelConsumerContext.java create mode 100644 spring-kafka/src/main/java/org/springframework/kafka/config/ParallelConsumerImportSelector.java create mode 100644 spring-kafka/src/main/java/org/springframework/kafka/core/ParallelConsumerCallback.java create mode 100644 spring-kafka/src/main/java/org/springframework/kafka/core/ParallelConsumerFactory.java diff --git a/build.gradle b/build.gradle index bf86b4d892..5c9eae6758 100644 --- a/build.gradle +++ b/build.gradle @@ -73,6 +73,7 @@ ext { springRetryVersion = '2.0.5' springVersion = '6.1.5' zookeeperVersion = '3.8.4' + parallelConsumerVersion = '0.5.2.8' idPrefix = 'kafka' @@ -289,6 +290,9 @@ project ('spring-kafka') { exclude group: 'org.jetbrains.kotlin' } + // Parallel Consumer + api "io.confluent.parallelconsumer:parallel-consumer-core:$parallelConsumerVersion" + // Spring Data projection message binding support optionalApi ("org.springframework.data:spring-data-commons") { exclude group: 'org.springframework' diff --git a/spring-kafka/src/main/java/org/springframework/kafka/annotation/EnableParallelConsumer.java b/spring-kafka/src/main/java/org/springframework/kafka/annotation/EnableParallelConsumer.java new file mode 100644 index 0000000000..6966edbde3 --- /dev/null +++ b/spring-kafka/src/main/java/org/springframework/kafka/annotation/EnableParallelConsumer.java @@ -0,0 +1,39 @@ +/* + * Copyright 2014-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.kafka.annotation; + +import org.springframework.context.annotation.Import; +import org.springframework.kafka.config.ParallelConsumerConfiguration; +import org.springframework.kafka.config.ParallelConsumerImportSelector; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +/** + * If you want to import {@link ParallelConsumerConfiguration} to your application, + * you just annotated {@link EnableParallelConsumer} to your spring application. + * @author ... + * @since 3.2.0 + */ + +@Target(ElementType.TYPE) +@Retention(RetentionPolicy.RUNTIME) +@Import(ParallelConsumerImportSelector.class) +public @interface EnableParallelConsumer { +} \ No newline at end of file diff --git a/spring-kafka/src/main/java/org/springframework/kafka/config/ParallelConsumerConfig.java b/spring-kafka/src/main/java/org/springframework/kafka/config/ParallelConsumerConfig.java new file mode 100644 index 0000000000..6abc17a387 --- /dev/null +++ b/spring-kafka/src/main/java/org/springframework/kafka/config/ParallelConsumerConfig.java @@ -0,0 +1,108 @@ +/* + * Copyright 2014-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.kafka.config; + +import java.time.Duration; +import java.util.HashMap; +import java.util.Map; + +import org.apache.kafka.clients.consumer.Consumer; + +import io.confluent.parallelconsumer.ParallelConsumerOptions; +import io.confluent.parallelconsumer.ParallelConsumerOptions.ProcessingOrder; +import org.springframework.util.StringUtils; +import org.springframework.kafka.annotation.EnableParallelConsumer; + +/** + * ParallelConsumerConfig is for config of {@link io.confluent.parallelconsumer}. + * This will be registered as Spring Bean when {@link EnableParallelConsumer} is annotated to your spring application. + * @author ... + * @since 3.2.0 + */ + +public class ParallelConsumerConfig { + + + private static final String PARALLEL_CONSUMER_MAX_CONCURRENCY = "PARALLEL_CONSUMER_MAX_CONCURRENCY"; + private static final String PARALLEL_CONSUMER_ORDERING = "PARALLEL_CONSUMER_ORDERING"; + private static final String ALLOW_EAGER_PROCESSING_DURING_TRANSACTION_COMMIT = "ALLOW_EAGER_PROCESSING_DURING_TRANSACTION_COMMIT"; + private static final String COMMIT_LOCK_ACQUISITION_TIMEOUT = "COMMIT_LOCK_ACQUISITION_TIMEOUT"; + private static final String COMMIT_INTERVAL = "COMMIT_INTERVAL"; + private final Map properties = new HashMap<>(); + + public ParallelConsumerConfig() { + + final String maxConcurrency = System.getenv(PARALLEL_CONSUMER_MAX_CONCURRENCY); + final String ordering = System.getenv(PARALLEL_CONSUMER_ORDERING); + final String allowEagerProcessingDuringTransactionCommit = System.getenv(ALLOW_EAGER_PROCESSING_DURING_TRANSACTION_COMMIT); + final String commitLockAcquisitionTimeout = System.getenv(COMMIT_LOCK_ACQUISITION_TIMEOUT); + final String commitInterval = System.getenv(COMMIT_INTERVAL); + + this.properties.put(PARALLEL_CONSUMER_MAX_CONCURRENCY, maxConcurrency); + this.properties.put(PARALLEL_CONSUMER_ORDERING, ordering); + this.properties.put(ALLOW_EAGER_PROCESSING_DURING_TRANSACTION_COMMIT, allowEagerProcessingDuringTransactionCommit); + this.properties.put(COMMIT_LOCK_ACQUISITION_TIMEOUT, commitLockAcquisitionTimeout); + this.properties.put(COMMIT_INTERVAL, commitInterval); + } + + private ProcessingOrder toOrder(String order) { + return switch (order) { + case "partition" -> ProcessingOrder.PARTITION; + case "unordered" -> ProcessingOrder.UNORDERED; + default -> ProcessingOrder.KEY; // Confluent Consumer Default Policy + }; + } + + public ParallelConsumerOptions toConsumerOptions(Consumer consumer) { + + ParallelConsumerOptions.ParallelConsumerOptionsBuilder builder = ParallelConsumerOptions.builder(); + builder.consumer(consumer); + + final String maxConcurrencyString = this.properties.get(PARALLEL_CONSUMER_MAX_CONCURRENCY); + final String orderingString = this.properties.get(PARALLEL_CONSUMER_ORDERING); + final String allowEagerProcessingDuringTransactionCommitString = this.properties.get(ALLOW_EAGER_PROCESSING_DURING_TRANSACTION_COMMIT); + final String commitLockAcquisitionTimeoutString = this.properties.get(COMMIT_LOCK_ACQUISITION_TIMEOUT); + final String commitIntervalString = this.properties.get(COMMIT_INTERVAL); + + if (StringUtils.hasText(maxConcurrencyString)) { + final Integer maxConcurrency = Integer.valueOf(maxConcurrencyString); + builder.maxConcurrency(maxConcurrency); + } + + if (StringUtils.hasText(orderingString)) { + final ProcessingOrder processingOrder = toOrder(orderingString); + builder.ordering(processingOrder); + } + + if (StringUtils.hasText(allowEagerProcessingDuringTransactionCommitString)) { + final Boolean allowEagerProcessingDuringTransactionCommit = Boolean.valueOf(allowEagerProcessingDuringTransactionCommitString); + builder.allowEagerProcessingDuringTransactionCommit(allowEagerProcessingDuringTransactionCommit); + } + + if (StringUtils.hasText(commitLockAcquisitionTimeoutString)) { + final Long commitLockAcquisitionTimeout = Long.valueOf(commitLockAcquisitionTimeoutString); + builder.commitLockAcquisitionTimeout(Duration.ofSeconds(commitLockAcquisitionTimeout)); + } + + if (StringUtils.hasText(commitIntervalString)) { + final Long commitInterval = Long.valueOf(commitIntervalString); + builder.commitInterval(Duration.ofMillis(commitInterval)); + } + + return builder.build(); + } +} \ No newline at end of file diff --git a/spring-kafka/src/main/java/org/springframework/kafka/config/ParallelConsumerConfiguration.java b/spring-kafka/src/main/java/org/springframework/kafka/config/ParallelConsumerConfiguration.java new file mode 100644 index 0000000000..a52bdb1de4 --- /dev/null +++ b/spring-kafka/src/main/java/org/springframework/kafka/config/ParallelConsumerConfiguration.java @@ -0,0 +1,47 @@ +/* + * Copyright 2014-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.kafka.config; + +import org.springframework.context.annotation.Bean; +import org.springframework.kafka.core.DefaultKafkaConsumerFactory; +import org.springframework.kafka.core.ParallelConsumerCallback; +import org.springframework.kafka.core.ParallelConsumerFactory; +import org.springframework.kafka.annotation.EnableParallelConsumer; + +/** + * If User decide to use parallelConsumer on SpringKafka, User should import this class to their ComponentScan scopes. + * If so, this class will register both {@link ParallelConsumerContext} and {@link ParallelConsumerFactory} as Spring Bean. + * User has responsibility + * 1. annotated {@link EnableParallelConsumer} on their spring application + * 2. register ConcreteClass of {@link ParallelConsumerCallback}. + * @author ... + * @since 3.2.0 + */ + +public class ParallelConsumerConfiguration { + + @Bean(name = ParallelConsumerContext.DEFAULT_BEAN_NAME) + public ParallelConsumerContext parallelConsumerContext(ParallelConsumerCallback parallelConsumerCallback) { + return new ParallelConsumerContext(parallelConsumerCallback); + } + + @Bean(name = ParallelConsumerFactory.DEFAULT_BEAN_NAME) + public ParallelConsumerFactory parallelConsumerFactory(DefaultKafkaConsumerFactory consumerFactory, + ParallelConsumerContext parallelConsumerContext) { + return new ParallelConsumerFactory(parallelConsumerContext, consumerFactory); + } +} \ No newline at end of file diff --git a/spring-kafka/src/main/java/org/springframework/kafka/config/ParallelConsumerContext.java b/spring-kafka/src/main/java/org/springframework/kafka/config/ParallelConsumerContext.java new file mode 100644 index 0000000000..b07a35cdc7 --- /dev/null +++ b/spring-kafka/src/main/java/org/springframework/kafka/config/ParallelConsumerContext.java @@ -0,0 +1,60 @@ +/* + * Copyright 2014-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.kafka.config; + +import org.apache.kafka.clients.consumer.Consumer; +import org.springframework.kafka.core.ParallelConsumerCallback; + +import io.confluent.parallelconsumer.ParallelConsumerOptions; +import io.confluent.parallelconsumer.ParallelStreamProcessor; + +/** + * This class is for aggregating all related with ParallelConsumer. + * @author ... + * @since 3.2.0 + */ + + +public class ParallelConsumerContext { + + public static final String DEFAULT_BEAN_NAME = "parallelConsumerContext"; + private final ParallelConsumerConfig parallelConsumerConfig; + private final ParallelConsumerCallback parallelConsumerCallback; + private ParallelStreamProcessor processor; + + public ParallelConsumerContext(ParallelConsumerCallback callback) { + this.parallelConsumerConfig = new ParallelConsumerConfig(); + this.parallelConsumerCallback = callback; + } + + + public ParallelConsumerCallback parallelConsumerCallback() { + return this.parallelConsumerCallback; + } + + + public ParallelStreamProcessor createConsumer(Consumer consumer) { + final ParallelConsumerOptions options = parallelConsumerConfig.toConsumerOptions(consumer); + this.processor = ParallelStreamProcessor.createEosStreamProcessor(options); + return this.processor; + } + + public void stopParallelConsumer() { + this.processor.close(); + } + +} \ No newline at end of file diff --git a/spring-kafka/src/main/java/org/springframework/kafka/config/ParallelConsumerImportSelector.java b/spring-kafka/src/main/java/org/springframework/kafka/config/ParallelConsumerImportSelector.java new file mode 100644 index 0000000000..9cc32b013c --- /dev/null +++ b/spring-kafka/src/main/java/org/springframework/kafka/config/ParallelConsumerImportSelector.java @@ -0,0 +1,35 @@ +/* + * Copyright 2014-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.kafka.config; + +import org.springframework.context.annotation.ImportSelector; +import org.springframework.core.type.AnnotationMetadata; +import org.springframework.kafka.annotation.EnableParallelConsumer; +/** + * ParallelConsumerImportSelector is to register {@link ParallelConsumerConfiguration}. + * If you want to import {@link ParallelConsumerConfiguration} to your application, + * you just annotated {@link EnableParallelConsumer} to your spring application. + * @author ... + * @since 3.2.0 + */ + +public class ParallelConsumerImportSelector implements ImportSelector { + @Override + public String[] selectImports(AnnotationMetadata importingClassMetadata) { + return new String[]{ParallelConsumerConfiguration.class.getName()}; + } +} \ No newline at end of file diff --git a/spring-kafka/src/main/java/org/springframework/kafka/core/ParallelConsumerCallback.java b/spring-kafka/src/main/java/org/springframework/kafka/core/ParallelConsumerCallback.java new file mode 100644 index 0000000000..7bdf7ae06d --- /dev/null +++ b/spring-kafka/src/main/java/org/springframework/kafka/core/ParallelConsumerCallback.java @@ -0,0 +1,42 @@ +/* + * Copyright 2014-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.kafka.core; + +import java.util.List; + +import io.confluent.parallelconsumer.PollContext; +import org.springframework.kafka.config.ParallelConsumerContext; + +/** + * User should create ConcreteClass of this and register it as Spring Bean. + * Concrete class of ParallelConsumerCallback will be registered {@link ParallelConsumerContext}, + * and then it will be used in {@link ParallelConsumerFactory} when ParallelConsumerFactory start. + * @author ... + * @since 3.2.0 + */ + +public interface ParallelConsumerCallback { + + /** + * This is for {@link ParallelConsumerFactory} and {@link ParallelConsumerContext}. + * ParallelConsumer will process the consumed messages using this callback. + * @param context context which Parallel Consumer produce + * @return void. + */ + void accept(PollContext context); + List getTopics(); +} diff --git a/spring-kafka/src/main/java/org/springframework/kafka/core/ParallelConsumerFactory.java b/spring-kafka/src/main/java/org/springframework/kafka/core/ParallelConsumerFactory.java new file mode 100644 index 0000000000..ab70a0503e --- /dev/null +++ b/spring-kafka/src/main/java/org/springframework/kafka/core/ParallelConsumerFactory.java @@ -0,0 +1,65 @@ +/* + * Copyright 2014-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.kafka.core; + +import org.apache.kafka.clients.consumer.Consumer; +import org.springframework.context.SmartLifecycle; +import org.springframework.kafka.config.ParallelConsumerContext; + +import io.confluent.parallelconsumer.ParallelStreamProcessor; + +/** + * ParallelConsumerFactory will be started and closed by Spring LifeCycle. + * This class is quite simple, because ParallelConsumer requires delegating the situation to itself. + * @author ... + * @since 3.2.0 + */ + +public class ParallelConsumerFactory implements SmartLifecycle { + + public static final String DEFAULT_BEAN_NAME = "parallelConsumerFactory"; + private final ParallelConsumerContext parallelConsumerContext; + private final DefaultKafkaConsumerFactory defaultKafkaConsumerFactory; + private boolean running; + + public ParallelConsumerFactory(ParallelConsumerContext parallelConsumerContext, + DefaultKafkaConsumerFactory defaultKafkaConsumerFactory) { + this.parallelConsumerContext = parallelConsumerContext; + this.defaultKafkaConsumerFactory = defaultKafkaConsumerFactory; + } + + + @Override + public void start() { + final Consumer consumer = defaultKafkaConsumerFactory.createConsumer(); + final ParallelStreamProcessor parallelConsumer = parallelConsumerContext.createConsumer(consumer); + parallelConsumer.subscribe(parallelConsumerContext.parallelConsumerCallback().getTopics()); + parallelConsumer.poll(recordContexts -> parallelConsumerContext.parallelConsumerCallback().accept(recordContexts)); + this.running = true; + } + + @Override + public void stop() { + this.parallelConsumerContext.stopParallelConsumer(); + this.running = false; + } + + @Override + public boolean isRunning() { + return this.running; + } +} \ No newline at end of file From 5be340db6d196494bb3498aeb33101f9dfb98331 Mon Sep 17 00:00:00 2001 From: chickenchickenlove Date: Sat, 27 Apr 2024 22:40:50 +0900 Subject: [PATCH 2/5] refactoring --- .../kafka/config/ParallelConsumerConfig.java | 23 +++- .../config/ParallelConsumerConfiguration.java | 19 ++- .../kafka/config/ParallelConsumerContext.java | 35 ++++-- .../kafka/core/ParallelConsumerCallback.java | 16 ++- .../kafka/core/ParallelConsumerFactory.java | 115 ++++++++++++++++-- .../kafka/core/PollAndProduceCallback.java | 40 ++++++ .../core/PollAndProduceManyCallback.java | 40 ++++++ .../PollAndProduceManyResultCallback.java | 32 +++++ .../core/PollAndProduceResultCallback.java | 31 +++++ .../kafka/core/PollCallback.java | 45 +++++++ .../kafka/core/ResultConsumerCallback.java | 9 ++ 11 files changed, 369 insertions(+), 36 deletions(-) create mode 100644 spring-kafka/src/main/java/org/springframework/kafka/core/PollAndProduceCallback.java create mode 100644 spring-kafka/src/main/java/org/springframework/kafka/core/PollAndProduceManyCallback.java create mode 100644 spring-kafka/src/main/java/org/springframework/kafka/core/PollAndProduceManyResultCallback.java create mode 100644 spring-kafka/src/main/java/org/springframework/kafka/core/PollAndProduceResultCallback.java create mode 100644 spring-kafka/src/main/java/org/springframework/kafka/core/PollCallback.java create mode 100644 spring-kafka/src/main/java/org/springframework/kafka/core/ResultConsumerCallback.java diff --git a/spring-kafka/src/main/java/org/springframework/kafka/config/ParallelConsumerConfig.java b/spring-kafka/src/main/java/org/springframework/kafka/config/ParallelConsumerConfig.java index 6abc17a387..0ad99e6bb2 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/config/ParallelConsumerConfig.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/config/ParallelConsumerConfig.java @@ -24,19 +24,22 @@ import io.confluent.parallelconsumer.ParallelConsumerOptions; import io.confluent.parallelconsumer.ParallelConsumerOptions.ProcessingOrder; + +import org.apache.kafka.clients.producer.Producer; import org.springframework.util.StringUtils; import org.springframework.kafka.annotation.EnableParallelConsumer; +// It would be better to be migrated to org.springframework.boot.autoconfigure.kafka.KafkaProperties. /** * ParallelConsumerConfig is for config of {@link io.confluent.parallelconsumer}. * This will be registered as Spring Bean when {@link EnableParallelConsumer} is annotated to your spring application. * @author ... - * @since 3.2.0 + * @since 3.3 */ public class ParallelConsumerConfig { - + public static final String DEFAULT_BEAN_NAME = "parallelConsumerConfig"; private static final String PARALLEL_CONSUMER_MAX_CONCURRENCY = "PARALLEL_CONSUMER_MAX_CONCURRENCY"; private static final String PARALLEL_CONSUMER_ORDERING = "PARALLEL_CONSUMER_ORDERING"; private static final String ALLOW_EAGER_PROCESSING_DURING_TRANSACTION_COMMIT = "ALLOW_EAGER_PROCESSING_DURING_TRANSACTION_COMMIT"; @@ -67,9 +70,19 @@ private ProcessingOrder toOrder(String order) { }; } - public ParallelConsumerOptions toConsumerOptions(Consumer consumer) { + public ParallelConsumerOptions toConsumerOptions( + ParallelConsumerOptions.ParallelConsumerOptionsBuilder builder, + Consumer consumer, + Producer producer) { + + builder.producer(producer); + return toConsumerOptions(builder, consumer); + } + + public ParallelConsumerOptions toConsumerOptions( + ParallelConsumerOptions.ParallelConsumerOptionsBuilder builder, + Consumer consumer) { - ParallelConsumerOptions.ParallelConsumerOptionsBuilder builder = ParallelConsumerOptions.builder(); builder.consumer(consumer); final String maxConcurrencyString = this.properties.get(PARALLEL_CONSUMER_MAX_CONCURRENCY); @@ -105,4 +118,4 @@ public ParallelConsumerOptions toConsumerOptions(Consumer cons return builder.build(); } -} \ No newline at end of file +} diff --git a/spring-kafka/src/main/java/org/springframework/kafka/config/ParallelConsumerConfiguration.java b/spring-kafka/src/main/java/org/springframework/kafka/config/ParallelConsumerConfiguration.java index a52bdb1de4..ebf1c011d4 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/config/ParallelConsumerConfiguration.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/config/ParallelConsumerConfiguration.java @@ -16,8 +16,11 @@ package org.springframework.kafka.config; +import javax.annotation.Nullable; + import org.springframework.context.annotation.Bean; import org.springframework.kafka.core.DefaultKafkaConsumerFactory; +import org.springframework.kafka.core.DefaultKafkaProducerFactory; import org.springframework.kafka.core.ParallelConsumerCallback; import org.springframework.kafka.core.ParallelConsumerFactory; import org.springframework.kafka.annotation.EnableParallelConsumer; @@ -34,14 +37,22 @@ public class ParallelConsumerConfiguration { + @Bean(name = ParallelConsumerConfig.DEFAULT_BEAN_NAME) + public ParallelConsumerConfig parallelConsumerConfig() { + return new ParallelConsumerConfig(); + } + @Bean(name = ParallelConsumerContext.DEFAULT_BEAN_NAME) - public ParallelConsumerContext parallelConsumerContext(ParallelConsumerCallback parallelConsumerCallback) { - return new ParallelConsumerContext(parallelConsumerCallback); + public ParallelConsumerContext parallelConsumerContext(ParallelConsumerConfig parallelConsumerConfig, + ParallelConsumerCallback parallelConsumerCallback) { + return new ParallelConsumerContext(parallelConsumerConfig, + parallelConsumerCallback); } @Bean(name = ParallelConsumerFactory.DEFAULT_BEAN_NAME) public ParallelConsumerFactory parallelConsumerFactory(DefaultKafkaConsumerFactory consumerFactory, + DefaultKafkaProducerFactory producerFactory, ParallelConsumerContext parallelConsumerContext) { - return new ParallelConsumerFactory(parallelConsumerContext, consumerFactory); + return new ParallelConsumerFactory(parallelConsumerContext, consumerFactory, producerFactory); } -} \ No newline at end of file +} diff --git a/spring-kafka/src/main/java/org/springframework/kafka/config/ParallelConsumerContext.java b/spring-kafka/src/main/java/org/springframework/kafka/config/ParallelConsumerContext.java index b07a35cdc7..2c01b2786c 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/config/ParallelConsumerContext.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/config/ParallelConsumerContext.java @@ -16,11 +16,17 @@ package org.springframework.kafka.config; +import java.time.Duration; + import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.producer.Producer; import org.springframework.kafka.core.ParallelConsumerCallback; +import io.confluent.parallelconsumer.JStreamParallelStreamProcessor; import io.confluent.parallelconsumer.ParallelConsumerOptions; +import io.confluent.parallelconsumer.ParallelConsumerOptions.ParallelConsumerOptionsBuilder; import io.confluent.parallelconsumer.ParallelStreamProcessor; +import io.confluent.parallelconsumer.internal.DrainingCloseable.DrainingMode; /** * This class is for aggregating all related with ParallelConsumer. @@ -33,28 +39,31 @@ public class ParallelConsumerContext { public static final String DEFAULT_BEAN_NAME = "parallelConsumerContext"; private final ParallelConsumerConfig parallelConsumerConfig; - private final ParallelConsumerCallback parallelConsumerCallback; - private ParallelStreamProcessor processor; + private final ParallelConsumerCallback parallelConsumerCallback; - public ParallelConsumerContext(ParallelConsumerCallback callback) { - this.parallelConsumerConfig = new ParallelConsumerConfig(); + public ParallelConsumerContext(ParallelConsumerConfig parallelConsumerConfig, + ParallelConsumerCallback callback) { + this.parallelConsumerConfig = parallelConsumerConfig; this.parallelConsumerCallback = callback; } - - public ParallelConsumerCallback parallelConsumerCallback() { + public ParallelConsumerCallback parallelConsumerCallback() { return this.parallelConsumerCallback; } + public ParallelConsumerOptions getParallelConsumerOptions(Consumer consumer) { + final ParallelConsumerOptionsBuilder builder = ParallelConsumerOptions.builder(); + return parallelConsumerConfig.toConsumerOptions(builder, consumer); + } - public ParallelStreamProcessor createConsumer(Consumer consumer) { - final ParallelConsumerOptions options = parallelConsumerConfig.toConsumerOptions(consumer); - this.processor = ParallelStreamProcessor.createEosStreamProcessor(options); - return this.processor; + public ParallelConsumerOptions getParallelConsumerOptions(Consumer consumer, Producer producer) { + final ParallelConsumerOptionsBuilder builder = ParallelConsumerOptions.builder(); + return parallelConsumerConfig.toConsumerOptions(builder, consumer, producer); } - public void stopParallelConsumer() { - this.processor.close(); + public void stop(ParallelStreamProcessor parallelStreamProcessor) { + parallelStreamProcessor.close(); + } -} \ No newline at end of file +} diff --git a/spring-kafka/src/main/java/org/springframework/kafka/core/ParallelConsumerCallback.java b/spring-kafka/src/main/java/org/springframework/kafka/core/ParallelConsumerCallback.java index 7bdf7ae06d..6c3072a15f 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/core/ParallelConsumerCallback.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/core/ParallelConsumerCallback.java @@ -17,8 +17,12 @@ package org.springframework.kafka.core; import java.util.List; +import java.util.regex.Pattern; import io.confluent.parallelconsumer.PollContext; + +import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; +import org.apache.kafka.clients.producer.ProducerRecord; import org.springframework.kafka.config.ParallelConsumerContext; /** @@ -32,11 +36,13 @@ public interface ParallelConsumerCallback { /** - * This is for {@link ParallelConsumerFactory} and {@link ParallelConsumerContext}. - * ParallelConsumer will process the consumed messages using this callback. - * @param context context which Parallel Consumer produce - * @return void. + * ... */ - void accept(PollContext context); List getTopics(); + default Pattern getSubscribeTopicsPattern(){ + return null; + } + default ConsumerRebalanceListener getRebalanceListener(){ + return null; + } } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/core/ParallelConsumerFactory.java b/spring-kafka/src/main/java/org/springframework/kafka/core/ParallelConsumerFactory.java index ab70a0503e..4cea9234c9 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/core/ParallelConsumerFactory.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/core/ParallelConsumerFactory.java @@ -16,45 +16,113 @@ package org.springframework.kafka.core; +import java.util.Collection; +import java.util.List; +import java.util.Objects; +import java.util.regex.Pattern; + import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.Producer; import org.springframework.context.SmartLifecycle; import org.springframework.kafka.config.ParallelConsumerContext; +import io.confluent.parallelconsumer.ParallelConsumer; +import io.confluent.parallelconsumer.ParallelConsumerOptions; import io.confluent.parallelconsumer.ParallelStreamProcessor; +import io.confluent.parallelconsumer.ParallelStreamProcessor.ConsumeProduceResult; /** * ParallelConsumerFactory will be started and closed by Spring LifeCycle. * This class is quite simple, because ParallelConsumer requires delegating the situation to itself. - * @author ... + * @author Sanghyeok An * @since 3.2.0 */ public class ParallelConsumerFactory implements SmartLifecycle { public static final String DEFAULT_BEAN_NAME = "parallelConsumerFactory"; - private final ParallelConsumerContext parallelConsumerContext; + private final DefaultKafkaConsumerFactory defaultKafkaConsumerFactory; + private final DefaultKafkaProducerFactory defaultKafkaProducerFactory; + private final ParallelConsumerContext parallelConsumerContext; + + private final ParallelStreamProcessor parallelConsumer; + private final ParallelConsumerOptions parallelConsumerOptions; private boolean running; public ParallelConsumerFactory(ParallelConsumerContext parallelConsumerContext, - DefaultKafkaConsumerFactory defaultKafkaConsumerFactory) { + DefaultKafkaConsumerFactory defaultKafkaConsumerFactory, + DefaultKafkaProducerFactory defaultKafkaProducerFactory) { this.parallelConsumerContext = parallelConsumerContext; this.defaultKafkaConsumerFactory = defaultKafkaConsumerFactory; + this.defaultKafkaProducerFactory = defaultKafkaProducerFactory; + + final Consumer kafkaConsumer = defaultKafkaConsumerFactory.createConsumer(); + final Producer kafkaProducer = defaultKafkaProducerFactory.createProducer(); + this.parallelConsumerOptions = parallelConsumerOptions(kafkaConsumer, kafkaProducer); + this.parallelConsumer = ParallelStreamProcessor.createEosStreamProcessor(this.parallelConsumerOptions); } + private ParallelConsumerOptions parallelConsumerOptions(Consumer consumer, + Producer producer) { + final ParallelConsumerCallback callback = parallelConsumerContext.parallelConsumerCallback(); + if (callback instanceof PollAndProduceManyCallback || + callback instanceof PollAndProduceCallback) { + return parallelConsumerContext.getParallelConsumerOptions(consumer, producer); + } else { + return parallelConsumerContext.getParallelConsumerOptions(consumer); + } + } + @Override public void start() { - final Consumer consumer = defaultKafkaConsumerFactory.createConsumer(); - final ParallelStreamProcessor parallelConsumer = parallelConsumerContext.createConsumer(consumer); - parallelConsumer.subscribe(parallelConsumerContext.parallelConsumerCallback().getTopics()); - parallelConsumer.poll(recordContexts -> parallelConsumerContext.parallelConsumerCallback().accept(recordContexts)); + subscribe(); + + final ParallelConsumerCallback callback0 = parallelConsumerContext.parallelConsumerCallback(); + + if (callback0 instanceof ResultConsumerCallback) { + if (callback0 instanceof PollAndProduceManyResultCallback) { + final PollAndProduceManyResultCallback callback = + (PollAndProduceManyResultCallback) callback0; + + this.parallelConsumer.pollAndProduceMany(callback::accept, callback::resultConsumer); + } else if (callback0 instanceof PollAndProduceCallback) { + final PollAndProduceResultCallback callback = + (PollAndProduceResultCallback) callback0; + + this.parallelConsumer.pollAndProduce(callback::accept, callback::resultConsumer); + } else { + throw new UnsupportedOperationException(); + } + } else { + if (callback0 instanceof PollAndProduceManyCallback) { + final PollAndProduceManyCallback callback = + (PollAndProduceManyCallback) callback0; + + this.parallelConsumer.pollAndProduceMany(callback::accept); + } else if (callback0 instanceof PollAndProduceCallback) { + final PollAndProduceCallback callback = + (PollAndProduceCallback) callback0; + + this.parallelConsumer.pollAndProduce(callback::accept); + } else if (callback0 instanceof PollCallback) { + final PollCallback callback = (PollCallback) callback0; + + this.parallelConsumer.poll(callback::accept); + } else { + throw new UnsupportedOperationException(); + } + } this.running = true; } @Override public void stop() { - this.parallelConsumerContext.stopParallelConsumer(); + this.parallelConsumerContext.stop(this.parallelConsumer); this.running = false; } @@ -62,4 +130,33 @@ public void stop() { public boolean isRunning() { return this.running; } -} \ No newline at end of file + + private void subscribe() { + final ParallelConsumerCallback callback = this.parallelConsumerContext.parallelConsumerCallback(); + + final List topics = callback.getTopics(); + final ConsumerRebalanceListener rebalanceListener = callback.getRebalanceListener(); + + if (topics != null && !topics.isEmpty()) { + subscribe(topics, rebalanceListener); + } else { + subscribe(callback.getSubscribeTopicsPattern(), rebalanceListener); + } + } + + private void subscribe(Collection topics, ConsumerRebalanceListener callback){ + if (callback == null) { + this.parallelConsumer.subscribe(topics); + } else { + this.parallelConsumer.subscribe(topics, callback); + } + } + + private void subscribe(Pattern pattern, ConsumerRebalanceListener callback) { + if (callback == null) { + this.parallelConsumer.subscribe(pattern); + } else { + this.parallelConsumer.subscribe(pattern, callback); + } + } +} diff --git a/spring-kafka/src/main/java/org/springframework/kafka/core/PollAndProduceCallback.java b/spring-kafka/src/main/java/org/springframework/kafka/core/PollAndProduceCallback.java new file mode 100644 index 0000000000..3d6d27f253 --- /dev/null +++ b/spring-kafka/src/main/java/org/springframework/kafka/core/PollAndProduceCallback.java @@ -0,0 +1,40 @@ +/* + * Copyright 2014-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.kafka.core; + +import java.util.List; +import java.util.function.Consumer; +import java.util.regex.Pattern; + +import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.springframework.kafka.config.ParallelConsumerContext; + +import io.confluent.parallelconsumer.ParallelStreamProcessor.ConsumeProduceResult; +import io.confluent.parallelconsumer.PollContext; + +/** + * ... + */ + +public interface PollAndProduceCallback extends ParallelConsumerCallback { + + /** + * ... + */ + ProducerRecord accept(PollContext context); +} diff --git a/spring-kafka/src/main/java/org/springframework/kafka/core/PollAndProduceManyCallback.java b/spring-kafka/src/main/java/org/springframework/kafka/core/PollAndProduceManyCallback.java new file mode 100644 index 0000000000..8868f9ffe5 --- /dev/null +++ b/spring-kafka/src/main/java/org/springframework/kafka/core/PollAndProduceManyCallback.java @@ -0,0 +1,40 @@ +/* + * Copyright 2014-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.kafka.core; + +import java.util.List; +import java.util.function.Consumer; +import java.util.regex.Pattern; + +import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.springframework.kafka.config.ParallelConsumerContext; + +import io.confluent.parallelconsumer.ParallelStreamProcessor.ConsumeProduceResult; +import io.confluent.parallelconsumer.PollContext; + +/** + * ... + */ + +public interface PollAndProduceManyCallback extends ParallelConsumerCallback { + + /** + * ... + */ + List> accept(PollContext context); +} diff --git a/spring-kafka/src/main/java/org/springframework/kafka/core/PollAndProduceManyResultCallback.java b/spring-kafka/src/main/java/org/springframework/kafka/core/PollAndProduceManyResultCallback.java new file mode 100644 index 0000000000..885991cf3c --- /dev/null +++ b/spring-kafka/src/main/java/org/springframework/kafka/core/PollAndProduceManyResultCallback.java @@ -0,0 +1,32 @@ +/* + * Copyright 2014-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.kafka.core; + +import java.util.List; +import java.util.function.Consumer; + +import org.apache.kafka.clients.producer.ProducerRecord; + +import io.confluent.parallelconsumer.ParallelStreamProcessor.ConsumeProduceResult; +import io.confluent.parallelconsumer.PollContext; + +/** + * ... + */ + +public interface PollAndProduceManyResultCallback extends PollAndProduceManyCallback, ResultConsumerCallback { +} diff --git a/spring-kafka/src/main/java/org/springframework/kafka/core/PollAndProduceResultCallback.java b/spring-kafka/src/main/java/org/springframework/kafka/core/PollAndProduceResultCallback.java new file mode 100644 index 0000000000..57f1f68732 --- /dev/null +++ b/spring-kafka/src/main/java/org/springframework/kafka/core/PollAndProduceResultCallback.java @@ -0,0 +1,31 @@ +/* + * Copyright 2014-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.kafka.core; + +import java.util.function.Consumer; + +import org.apache.kafka.clients.producer.ProducerRecord; + +import io.confluent.parallelconsumer.ParallelStreamProcessor.ConsumeProduceResult; +import io.confluent.parallelconsumer.PollContext; + +/** + * ... + */ + +public interface PollAndProduceResultCallback extends PollAndProduceCallback, ResultConsumerCallback { +} diff --git a/spring-kafka/src/main/java/org/springframework/kafka/core/PollCallback.java b/spring-kafka/src/main/java/org/springframework/kafka/core/PollCallback.java new file mode 100644 index 0000000000..5b7e765dc9 --- /dev/null +++ b/spring-kafka/src/main/java/org/springframework/kafka/core/PollCallback.java @@ -0,0 +1,45 @@ +/* + * Copyright 2014-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.kafka.core; + +import java.util.List; +import java.util.regex.Pattern; + +import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.springframework.kafka.config.ParallelConsumerContext; + +import io.confluent.parallelconsumer.PollContext; + +/** + * User should create ConcreteClass of this and register it as Spring Bean. + * Concrete class of ParallelConsumerCallback will be registered {@link ParallelConsumerContext}, + * and then it will be used in {@link ParallelConsumerFactory} when ParallelConsumerFactory start. + * @author ... + * @since 3.2.0 + */ + +public interface PollCallback extends ParallelConsumerCallback { + + /** + * This is for {@link ParallelConsumerFactory} and {@link ParallelConsumerContext}. + * ParallelConsumer will process the consumed messages using this callback. + * @param context context which Parallel Consumer produce + * @return void. + */ + void accept(PollContext context); +} diff --git a/spring-kafka/src/main/java/org/springframework/kafka/core/ResultConsumerCallback.java b/spring-kafka/src/main/java/org/springframework/kafka/core/ResultConsumerCallback.java new file mode 100644 index 0000000000..07a32cf437 --- /dev/null +++ b/spring-kafka/src/main/java/org/springframework/kafka/core/ResultConsumerCallback.java @@ -0,0 +1,9 @@ +package org.springframework.kafka.core; + +import java.util.function.Consumer; + +import io.confluent.parallelconsumer.ParallelStreamProcessor.ConsumeProduceResult; + +public interface ResultConsumerCallback { + Consumer> resultConsumer(ConsumeProduceResult result); +} From 4eca9795342bc482e39214c1271a718ac24611a6 Mon Sep 17 00:00:00 2001 From: chickenchickenlove Date: Sun, 28 Apr 2024 08:11:52 +0900 Subject: [PATCH 3/5] refactor --- ...allelConsumerOptionsProviderCondition.java | 14 ++ .../kafka/config/ParallelConsumerConfig.java | 152 ++++++++++------ .../config/ParallelConsumerConfiguration.java | 25 ++- .../kafka/config/ParallelConsumerContext.java | 13 +- .../kafka/core/ParallelConsumerFactory.java | 86 +++++---- .../ParallelConsumerCallback.java | 22 ++- .../ParallelConsumerOptionsProvider.java | 168 ++++++++++++++++++ .../Poll.java} | 16 +- .../PollAndProduce.java} | 16 +- .../PollAndProduceMany.java} | 15 +- .../PollAndProduceManyResult.java} | 23 +-- .../PollAndProduceResult.java} | 23 +-- 12 files changed, 406 insertions(+), 167 deletions(-) create mode 100644 spring-kafka/src/main/java/org/springframework/kafka/config/OnMissingParallelConsumerOptionsProviderCondition.java rename spring-kafka/src/main/java/org/springframework/kafka/core/{ => parallelconsumer}/ParallelConsumerCallback.java (76%) create mode 100644 spring-kafka/src/main/java/org/springframework/kafka/core/parallelconsumer/ParallelConsumerOptionsProvider.java rename spring-kafka/src/main/java/org/springframework/kafka/core/{PollCallback.java => parallelconsumer/Poll.java} (80%) rename spring-kafka/src/main/java/org/springframework/kafka/core/{PollAndProduceResultCallback.java => parallelconsumer/PollAndProduce.java} (72%) rename spring-kafka/src/main/java/org/springframework/kafka/core/{PollAndProduceManyResultCallback.java => parallelconsumer/PollAndProduceMany.java} (73%) rename spring-kafka/src/main/java/org/springframework/kafka/core/{PollAndProduceCallback.java => parallelconsumer/PollAndProduceManyResult.java} (51%) rename spring-kafka/src/main/java/org/springframework/kafka/core/{PollAndProduceManyCallback.java => parallelconsumer/PollAndProduceResult.java} (50%) diff --git a/spring-kafka/src/main/java/org/springframework/kafka/config/OnMissingParallelConsumerOptionsProviderCondition.java b/spring-kafka/src/main/java/org/springframework/kafka/config/OnMissingParallelConsumerOptionsProviderCondition.java new file mode 100644 index 0000000000..a383383755 --- /dev/null +++ b/spring-kafka/src/main/java/org/springframework/kafka/config/OnMissingParallelConsumerOptionsProviderCondition.java @@ -0,0 +1,14 @@ +package org.springframework.kafka.config; + +import org.springframework.context.annotation.Condition; +import org.springframework.context.annotation.ConditionContext; +import org.springframework.core.type.AnnotatedTypeMetadata; +import org.springframework.kafka.core.parallelconsumer.ParallelConsumerOptionsProvider; + +public class OnMissingParallelConsumerOptionsProviderCondition implements Condition { + + @Override + public boolean matches(ConditionContext context, AnnotatedTypeMetadata metadata) { + return context.getBeanFactory().getBean(ParallelConsumerOptionsProvider.class) == null; + } +} diff --git a/spring-kafka/src/main/java/org/springframework/kafka/config/ParallelConsumerConfig.java b/spring-kafka/src/main/java/org/springframework/kafka/config/ParallelConsumerConfig.java index 0ad99e6bb2..8cb5557f35 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/config/ParallelConsumerConfig.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/config/ParallelConsumerConfig.java @@ -16,18 +16,15 @@ package org.springframework.kafka.config; -import java.time.Duration; -import java.util.HashMap; -import java.util.Map; import org.apache.kafka.clients.consumer.Consumer; import io.confluent.parallelconsumer.ParallelConsumerOptions; -import io.confluent.parallelconsumer.ParallelConsumerOptions.ProcessingOrder; import org.apache.kafka.clients.producer.Producer; -import org.springframework.util.StringUtils; +import org.springframework.kafka.core.parallelconsumer.ParallelConsumerOptionsProvider; import org.springframework.kafka.annotation.EnableParallelConsumer; + // It would be better to be migrated to org.springframework.boot.autoconfigure.kafka.KafkaProperties. /** @@ -37,40 +34,16 @@ * @since 3.3 */ -public class ParallelConsumerConfig { +public class ParallelConsumerConfig { public static final String DEFAULT_BEAN_NAME = "parallelConsumerConfig"; - private static final String PARALLEL_CONSUMER_MAX_CONCURRENCY = "PARALLEL_CONSUMER_MAX_CONCURRENCY"; - private static final String PARALLEL_CONSUMER_ORDERING = "PARALLEL_CONSUMER_ORDERING"; - private static final String ALLOW_EAGER_PROCESSING_DURING_TRANSACTION_COMMIT = "ALLOW_EAGER_PROCESSING_DURING_TRANSACTION_COMMIT"; - private static final String COMMIT_LOCK_ACQUISITION_TIMEOUT = "COMMIT_LOCK_ACQUISITION_TIMEOUT"; - private static final String COMMIT_INTERVAL = "COMMIT_INTERVAL"; - private final Map properties = new HashMap<>(); - - public ParallelConsumerConfig() { - - final String maxConcurrency = System.getenv(PARALLEL_CONSUMER_MAX_CONCURRENCY); - final String ordering = System.getenv(PARALLEL_CONSUMER_ORDERING); - final String allowEagerProcessingDuringTransactionCommit = System.getenv(ALLOW_EAGER_PROCESSING_DURING_TRANSACTION_COMMIT); - final String commitLockAcquisitionTimeout = System.getenv(COMMIT_LOCK_ACQUISITION_TIMEOUT); - final String commitInterval = System.getenv(COMMIT_INTERVAL); - - this.properties.put(PARALLEL_CONSUMER_MAX_CONCURRENCY, maxConcurrency); - this.properties.put(PARALLEL_CONSUMER_ORDERING, ordering); - this.properties.put(ALLOW_EAGER_PROCESSING_DURING_TRANSACTION_COMMIT, allowEagerProcessingDuringTransactionCommit); - this.properties.put(COMMIT_LOCK_ACQUISITION_TIMEOUT, commitLockAcquisitionTimeout); - this.properties.put(COMMIT_INTERVAL, commitInterval); - } + private final ParallelConsumerOptionsProvider provider; - private ProcessingOrder toOrder(String order) { - return switch (order) { - case "partition" -> ProcessingOrder.PARTITION; - case "unordered" -> ProcessingOrder.UNORDERED; - default -> ProcessingOrder.KEY; // Confluent Consumer Default Policy - }; + public ParallelConsumerConfig(ParallelConsumerOptionsProvider provider) { + this.provider = provider; } - public ParallelConsumerOptions toConsumerOptions( + public ParallelConsumerOptions toConsumerOptions( ParallelConsumerOptions.ParallelConsumerOptionsBuilder builder, Consumer consumer, Producer producer) { @@ -79,43 +52,110 @@ public ParallelConsumerOptions toConsumerOptions( return toConsumerOptions(builder, consumer); } - public ParallelConsumerOptions toConsumerOptions( + public ParallelConsumerOptions toConsumerOptions( ParallelConsumerOptions.ParallelConsumerOptionsBuilder builder, Consumer consumer) { - builder.consumer(consumer); + return buildRemainOptions(builder); + } + + private ParallelConsumerOptions buildRemainOptions(ParallelConsumerOptions.ParallelConsumerOptionsBuilder builder) { + if (this.provider.managedExecutorService() != null){ + builder.managedExecutorService(this.provider.managedExecutorService()); + } - final String maxConcurrencyString = this.properties.get(PARALLEL_CONSUMER_MAX_CONCURRENCY); - final String orderingString = this.properties.get(PARALLEL_CONSUMER_ORDERING); - final String allowEagerProcessingDuringTransactionCommitString = this.properties.get(ALLOW_EAGER_PROCESSING_DURING_TRANSACTION_COMMIT); - final String commitLockAcquisitionTimeoutString = this.properties.get(COMMIT_LOCK_ACQUISITION_TIMEOUT); - final String commitIntervalString = this.properties.get(COMMIT_INTERVAL); + if (this.provider.managedThreadFactory() != null){ + builder.managedThreadFactory(this.provider.managedThreadFactory()); + } + + if (this.provider.meterRegistry() != null){ + builder.meterRegistry(this.provider.meterRegistry()); + } + + if (this.provider.pcInstanceTag() != null){ + builder.pcInstanceTag(this.provider.pcInstanceTag()); + } + + if (this.provider.metricsTags() != null){ + builder.metricsTags(this.provider.metricsTags()); + } + + if (this.provider.allowEagerProcessingDuringTransactionCommit() != null){ + builder.allowEagerProcessingDuringTransactionCommit(this.provider.allowEagerProcessingDuringTransactionCommit()); + } + + if (this.provider.commitLockAcquisitionTimeout() != null){ + builder.commitLockAcquisitionTimeout(this.provider.commitLockAcquisitionTimeout()); + } + + if (this.provider.produceLockAcquisitionTimeout() != null){ + builder.produceLockAcquisitionTimeout(this.provider.produceLockAcquisitionTimeout()); + } + + if (this.provider.commitInterval() != null){ + builder.commitInterval(this.provider.commitInterval()); + } - if (StringUtils.hasText(maxConcurrencyString)) { - final Integer maxConcurrency = Integer.valueOf(maxConcurrencyString); - builder.maxConcurrency(maxConcurrency); + if (this.provider.ordering() != null){ + builder.ordering(this.provider.ordering()); } - if (StringUtils.hasText(orderingString)) { - final ProcessingOrder processingOrder = toOrder(orderingString); - builder.ordering(processingOrder); + if (this.provider.commitMode() != null){ + builder.commitMode(this.provider.commitMode()); } - if (StringUtils.hasText(allowEagerProcessingDuringTransactionCommitString)) { - final Boolean allowEagerProcessingDuringTransactionCommit = Boolean.valueOf(allowEagerProcessingDuringTransactionCommitString); - builder.allowEagerProcessingDuringTransactionCommit(allowEagerProcessingDuringTransactionCommit); + if (this.provider.maxConcurrency() != null){ + builder.maxConcurrency(this.provider.maxConcurrency()); } - if (StringUtils.hasText(commitLockAcquisitionTimeoutString)) { - final Long commitLockAcquisitionTimeout = Long.valueOf(commitLockAcquisitionTimeoutString); - builder.commitLockAcquisitionTimeout(Duration.ofSeconds(commitLockAcquisitionTimeout)); + if (this.provider.invalidOffsetMetadataPolicy() != null){ + builder.invalidOffsetMetadataPolicy(this.provider.invalidOffsetMetadataPolicy()); } - if (StringUtils.hasText(commitIntervalString)) { - final Long commitInterval = Long.valueOf(commitIntervalString); - builder.commitInterval(Duration.ofMillis(commitInterval)); + if (this.provider.retryDelayProvider() != null){ + builder.retryDelayProvider(this.provider.retryDelayProvider()); + } + + if (this.provider.sendTimeout() != null){ + builder.sendTimeout(this.provider.sendTimeout()); + } + + if (this.provider.offsetCommitTimeout() != null){ + builder.offsetCommitTimeout(this.provider.offsetCommitTimeout()); + } + + if (this.provider.batchSize() != null){ + builder.batchSize(this.provider.batchSize()); + } + + if (this.provider.thresholdForTimeSpendInQueueWarning() != null){ + builder.thresholdForTimeSpendInQueueWarning(this.provider.thresholdForTimeSpendInQueueWarning()); + } + + if (this.provider.maxFailureHistory() != null){ + builder.maxFailureHistory(this.provider.maxFailureHistory()); + } + + if (this.provider.shutdownTimeout() != null){ + builder.shutdownTimeout(this.provider.shutdownTimeout()); + } + + if (this.provider.drainTimeout() != null){ + builder.drainTimeout(this.provider.drainTimeout()); + } + + if (this.provider.messageBufferSize() != null){ + builder.messageBufferSize(this.provider.messageBufferSize()); + } + + if (this.provider.initialLoadFactor() != null){ + builder.initialLoadFactor(this.provider.initialLoadFactor()); + } + if (this.provider.maximumLoadFactor() != null){ + builder.maximumLoadFactor(this.provider.maximumLoadFactor()); } return builder.build(); } + } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/config/ParallelConsumerConfiguration.java b/spring-kafka/src/main/java/org/springframework/kafka/config/ParallelConsumerConfiguration.java index ebf1c011d4..da5e556826 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/config/ParallelConsumerConfiguration.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/config/ParallelConsumerConfiguration.java @@ -16,14 +16,14 @@ package org.springframework.kafka.config; -import javax.annotation.Nullable; - import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Conditional; import org.springframework.kafka.core.DefaultKafkaConsumerFactory; import org.springframework.kafka.core.DefaultKafkaProducerFactory; -import org.springframework.kafka.core.ParallelConsumerCallback; +import org.springframework.kafka.core.parallelconsumer.ParallelConsumerCallback; import org.springframework.kafka.core.ParallelConsumerFactory; import org.springframework.kafka.annotation.EnableParallelConsumer; +import org.springframework.kafka.core.parallelconsumer.ParallelConsumerOptionsProvider; /** * If User decide to use parallelConsumer on SpringKafka, User should import this class to their ComponentScan scopes. @@ -37,22 +37,29 @@ public class ParallelConsumerConfiguration { + @Bean + @Conditional(OnMissingParallelConsumerOptionsProviderCondition.class) + public ParallelConsumerOptionsProvider parallelConsumerOptionsProvider() { + return new ParallelConsumerOptionsProvider() {}; + } + @Bean(name = ParallelConsumerConfig.DEFAULT_BEAN_NAME) - public ParallelConsumerConfig parallelConsumerConfig() { - return new ParallelConsumerConfig(); + public ParallelConsumerConfig parallelConsumerConfig(ParallelConsumerOptionsProvider parallelConsumerOptionsProvider) { + return new ParallelConsumerConfig(parallelConsumerOptionsProvider); } @Bean(name = ParallelConsumerContext.DEFAULT_BEAN_NAME) - public ParallelConsumerContext parallelConsumerContext(ParallelConsumerConfig parallelConsumerConfig, + public ParallelConsumerContext parallelConsumerContext(ParallelConsumerConfig parallelConsumerConfig, ParallelConsumerCallback parallelConsumerCallback) { - return new ParallelConsumerContext(parallelConsumerConfig, - parallelConsumerCallback); + return new ParallelConsumerContext(parallelConsumerConfig, + parallelConsumerCallback); } @Bean(name = ParallelConsumerFactory.DEFAULT_BEAN_NAME) public ParallelConsumerFactory parallelConsumerFactory(DefaultKafkaConsumerFactory consumerFactory, DefaultKafkaProducerFactory producerFactory, ParallelConsumerContext parallelConsumerContext) { - return new ParallelConsumerFactory(parallelConsumerContext, consumerFactory, producerFactory); + return new ParallelConsumerFactory(parallelConsumerContext, consumerFactory, producerFactory); } + } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/config/ParallelConsumerContext.java b/spring-kafka/src/main/java/org/springframework/kafka/config/ParallelConsumerContext.java index 2c01b2786c..4bb2579a13 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/config/ParallelConsumerContext.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/config/ParallelConsumerContext.java @@ -16,17 +16,13 @@ package org.springframework.kafka.config; -import java.time.Duration; - import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.producer.Producer; -import org.springframework.kafka.core.ParallelConsumerCallback; +import org.springframework.kafka.core.parallelconsumer.ParallelConsumerCallback; -import io.confluent.parallelconsumer.JStreamParallelStreamProcessor; import io.confluent.parallelconsumer.ParallelConsumerOptions; import io.confluent.parallelconsumer.ParallelConsumerOptions.ParallelConsumerOptionsBuilder; import io.confluent.parallelconsumer.ParallelStreamProcessor; -import io.confluent.parallelconsumer.internal.DrainingCloseable.DrainingMode; /** * This class is for aggregating all related with ParallelConsumer. @@ -41,7 +37,7 @@ public class ParallelConsumerContext { private final ParallelConsumerConfig parallelConsumerConfig; private final ParallelConsumerCallback parallelConsumerCallback; - public ParallelConsumerContext(ParallelConsumerConfig parallelConsumerConfig, + public ParallelConsumerContext(ParallelConsumerConfig parallelConsumerConfig, ParallelConsumerCallback callback) { this.parallelConsumerConfig = parallelConsumerConfig; this.parallelConsumerCallback = callback; @@ -61,9 +57,4 @@ public ParallelConsumerOptions getParallelConsumerOptions(Consumer c return parallelConsumerConfig.toConsumerOptions(builder, consumer, producer); } - public void stop(ParallelStreamProcessor parallelStreamProcessor) { - parallelStreamProcessor.close(); - - } - } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/core/ParallelConsumerFactory.java b/spring-kafka/src/main/java/org/springframework/kafka/core/ParallelConsumerFactory.java index 4cea9234c9..43b77c1df1 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/core/ParallelConsumerFactory.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/core/ParallelConsumerFactory.java @@ -16,29 +16,32 @@ package org.springframework.kafka.core; +import java.time.Duration; import java.util.Collection; import java.util.List; -import java.util.Objects; import java.util.regex.Pattern; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; -import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.springframework.context.SmartLifecycle; import org.springframework.kafka.config.ParallelConsumerContext; +import org.springframework.kafka.core.parallelconsumer.ParallelConsumerCallback; +import org.springframework.kafka.core.parallelconsumer.PollAndProduce; +import org.springframework.kafka.core.parallelconsumer.PollAndProduceMany; +import org.springframework.kafka.core.parallelconsumer.PollAndProduceManyResult; +import org.springframework.kafka.core.parallelconsumer.PollAndProduceResult; +import org.springframework.kafka.core.parallelconsumer.Poll; -import io.confluent.parallelconsumer.ParallelConsumer; import io.confluent.parallelconsumer.ParallelConsumerOptions; import io.confluent.parallelconsumer.ParallelStreamProcessor; -import io.confluent.parallelconsumer.ParallelStreamProcessor.ConsumeProduceResult; +import io.confluent.parallelconsumer.internal.DrainingCloseable.DrainingMode; /** * ParallelConsumerFactory will be started and closed by Spring LifeCycle. * This class is quite simple, because ParallelConsumer requires delegating the situation to itself. * @author Sanghyeok An - * @since 3.2.0 + * @since 3.3 */ public class ParallelConsumerFactory implements SmartLifecycle { @@ -48,7 +51,6 @@ public class ParallelConsumerFactory implements SmartLifecycle { private final DefaultKafkaConsumerFactory defaultKafkaConsumerFactory; private final DefaultKafkaProducerFactory defaultKafkaProducerFactory; private final ParallelConsumerContext parallelConsumerContext; - private final ParallelStreamProcessor parallelConsumer; private final ParallelConsumerOptions parallelConsumerOptions; private boolean running; @@ -70,8 +72,8 @@ public ParallelConsumerFactory(ParallelConsumerContext parallelConsumerCon private ParallelConsumerOptions parallelConsumerOptions(Consumer consumer, Producer producer) { final ParallelConsumerCallback callback = parallelConsumerContext.parallelConsumerCallback(); - if (callback instanceof PollAndProduceManyCallback || - callback instanceof PollAndProduceCallback) { + if (callback instanceof PollAndProduceMany || + callback instanceof PollAndProduce) { return parallelConsumerContext.getParallelConsumerOptions(consumer, producer); } else { return parallelConsumerContext.getParallelConsumerOptions(consumer); @@ -85,35 +87,40 @@ public void start() { final ParallelConsumerCallback callback0 = parallelConsumerContext.parallelConsumerCallback(); if (callback0 instanceof ResultConsumerCallback) { - if (callback0 instanceof PollAndProduceManyResultCallback) { - final PollAndProduceManyResultCallback callback = - (PollAndProduceManyResultCallback) callback0; + if (callback0 instanceof PollAndProduceManyResult) { + final PollAndProduceManyResult callback = + (PollAndProduceManyResult) callback0; this.parallelConsumer.pollAndProduceMany(callback::accept, callback::resultConsumer); - } else if (callback0 instanceof PollAndProduceCallback) { - final PollAndProduceResultCallback callback = - (PollAndProduceResultCallback) callback0; + } + else if (callback0 instanceof PollAndProduce) { + final PollAndProduceResult callback = + (PollAndProduceResult) callback0; this.parallelConsumer.pollAndProduce(callback::accept, callback::resultConsumer); - } else { + } + else { throw new UnsupportedOperationException(); } } else { - if (callback0 instanceof PollAndProduceManyCallback) { - final PollAndProduceManyCallback callback = - (PollAndProduceManyCallback) callback0; + if (callback0 instanceof PollAndProduceMany) { + final PollAndProduceMany callback = + (PollAndProduceMany) callback0; this.parallelConsumer.pollAndProduceMany(callback::accept); - } else if (callback0 instanceof PollAndProduceCallback) { - final PollAndProduceCallback callback = - (PollAndProduceCallback) callback0; + } + else if (callback0 instanceof PollAndProduce) { + final PollAndProduce callback = + (PollAndProduce) callback0; this.parallelConsumer.pollAndProduce(callback::accept); - } else if (callback0 instanceof PollCallback) { - final PollCallback callback = (PollCallback) callback0; + } + else if (callback0 instanceof Poll) { + final Poll callback = (Poll) callback0; this.parallelConsumer.poll(callback::accept); - } else { + } + else { throw new UnsupportedOperationException(); } } @@ -122,7 +129,12 @@ public void start() { @Override public void stop() { - this.parallelConsumerContext.stop(this.parallelConsumer); + final ParallelConsumerCallback callback = + this.parallelConsumerContext.parallelConsumerCallback(); + final DrainingMode drainingMode = callback.drainingMode(); + final Duration duration = callback.drainTimeOut(); + + this.parallelConsumer.close(duration, drainingMode); this.running = false; } @@ -139,24 +151,28 @@ private void subscribe() { if (topics != null && !topics.isEmpty()) { subscribe(topics, rebalanceListener); - } else { + } + else { subscribe(callback.getSubscribeTopicsPattern(), rebalanceListener); } } - private void subscribe(Collection topics, ConsumerRebalanceListener callback){ - if (callback == null) { + private void subscribe(Collection topics, ConsumerRebalanceListener listenerCallback){ + if (listenerCallback == null) { this.parallelConsumer.subscribe(topics); - } else { - this.parallelConsumer.subscribe(topics, callback); + } + else { + this.parallelConsumer.subscribe(topics, listenerCallback); } } - private void subscribe(Pattern pattern, ConsumerRebalanceListener callback) { - if (callback == null) { + private void subscribe(Pattern pattern, ConsumerRebalanceListener listenerCallback) { + if (listenerCallback == null) { this.parallelConsumer.subscribe(pattern); - } else { - this.parallelConsumer.subscribe(pattern, callback); + } + else { + this.parallelConsumer.subscribe(pattern, listenerCallback); } } + } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/core/ParallelConsumerCallback.java b/spring-kafka/src/main/java/org/springframework/kafka/core/parallelconsumer/ParallelConsumerCallback.java similarity index 76% rename from spring-kafka/src/main/java/org/springframework/kafka/core/ParallelConsumerCallback.java rename to spring-kafka/src/main/java/org/springframework/kafka/core/parallelconsumer/ParallelConsumerCallback.java index 6c3072a15f..29a995580f 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/core/ParallelConsumerCallback.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/core/parallelconsumer/ParallelConsumerCallback.java @@ -14,23 +14,25 @@ * limitations under the License. */ -package org.springframework.kafka.core; +package org.springframework.kafka.core.parallelconsumer; +import java.time.Duration; import java.util.List; import java.util.regex.Pattern; -import io.confluent.parallelconsumer.PollContext; - import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; -import org.apache.kafka.clients.producer.ProducerRecord; import org.springframework.kafka.config.ParallelConsumerContext; +import org.springframework.kafka.core.ParallelConsumerFactory; + +import io.confluent.parallelconsumer.internal.DrainingCloseable.DrainingMode; /** * User should create ConcreteClass of this and register it as Spring Bean. * Concrete class of ParallelConsumerCallback will be registered {@link ParallelConsumerContext}, * and then it will be used in {@link ParallelConsumerFactory} when ParallelConsumerFactory start. - * @author ... - * @since 3.2.0 + * + * @author Sanghyeok An + * @since 3.3 */ public interface ParallelConsumerCallback { @@ -45,4 +47,12 @@ default Pattern getSubscribeTopicsPattern(){ default ConsumerRebalanceListener getRebalanceListener(){ return null; } + default DrainingMode drainingMode() { + return DrainingMode.DONT_DRAIN; + } + + default Duration drainTimeOut() { + return Duration.ofMillis(0); + } + } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/core/parallelconsumer/ParallelConsumerOptionsProvider.java b/spring-kafka/src/main/java/org/springframework/kafka/core/parallelconsumer/ParallelConsumerOptionsProvider.java new file mode 100644 index 0000000000..09bfb13029 --- /dev/null +++ b/spring-kafka/src/main/java/org/springframework/kafka/core/parallelconsumer/ParallelConsumerOptionsProvider.java @@ -0,0 +1,168 @@ +/* + * Copyright 2014-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.kafka.core.parallelconsumer; + +import java.time.Duration; +import java.util.function.Function; + +import javax.annotation.Nullable; + +import org.springframework.kafka.config.ParallelConsumerContext; +import org.springframework.kafka.core.ParallelConsumerFactory; + +import io.confluent.parallelconsumer.ParallelConsumerOptions; +import io.confluent.parallelconsumer.ParallelConsumerOptions.CommitMode; +import io.confluent.parallelconsumer.ParallelConsumerOptions.InvalidOffsetMetadataHandlingPolicy; +import io.confluent.parallelconsumer.ParallelConsumerOptions.ParallelConsumerOptionsBuilder; +import io.confluent.parallelconsumer.ParallelConsumerOptions.ProcessingOrder; +import io.confluent.parallelconsumer.RecordContext; +import io.micrometer.core.instrument.MeterRegistry; +import io.micrometer.core.instrument.Tag; + +/** + * + * @author Sanghyeok An + * @since 3.3 + */ + +public interface ParallelConsumerOptionsProvider { + + default void hello() { + ParallelConsumerOptionsBuilder builder = ParallelConsumerOptions.builder(); + } + + @Nullable + default String managedExecutorService() { + return null; + } + + @Nullable + default String managedThreadFactory() { + return null; + } + + @Nullable + default MeterRegistry meterRegistry() { + return null; + } + + @Nullable + default String pcInstanceTag() { + return null; + } + + @Nullable + default Iterable metricsTags() { + return null; + } + + @Nullable + default Boolean allowEagerProcessingDuringTransactionCommit() { + return null; + } + + @Nullable + default Duration commitLockAcquisitionTimeout() { + return null; + } + + @Nullable + default Duration produceLockAcquisitionTimeout() { + return null; + } + + @Nullable + default Duration commitInterval() { + return null; + } + + @Nullable + default ProcessingOrder ordering() { + return null; + } + + @Nullable + default CommitMode commitMode() { + return null; + } + + @Nullable + default Integer maxConcurrency() { + return null; + } + + @Nullable + default InvalidOffsetMetadataHandlingPolicy invalidOffsetMetadataPolicy() { + return null; + } + + @Nullable + default Function, Duration> retryDelayProvider() { + return null; + } + + @Nullable + default Duration sendTimeout() { + return null; + } + + @Nullable + default Duration offsetCommitTimeout() { + return null; + } + + @Nullable + default Integer batchSize() { + return null; + } + + @Nullable + default Duration thresholdForTimeSpendInQueueWarning () { + return null; + } + + @Nullable + default Integer maxFailureHistory() { + return null; + } + + @Nullable + default Duration shutdownTimeout() { + return null; + } + + @Nullable + default Duration drainTimeout() { + return null; + } + + @Nullable + default Integer messageBufferSize() { + return null; + } + + @Nullable + default Integer initialLoadFactor() { + return null; + } + + @Nullable + default Integer maximumLoadFactor() { + return null; + } + +} diff --git a/spring-kafka/src/main/java/org/springframework/kafka/core/PollCallback.java b/spring-kafka/src/main/java/org/springframework/kafka/core/parallelconsumer/Poll.java similarity index 80% rename from spring-kafka/src/main/java/org/springframework/kafka/core/PollCallback.java rename to spring-kafka/src/main/java/org/springframework/kafka/core/parallelconsumer/Poll.java index 5b7e765dc9..bfa3f7dbd3 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/core/PollCallback.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/core/parallelconsumer/Poll.java @@ -14,14 +14,10 @@ * limitations under the License. */ -package org.springframework.kafka.core; +package org.springframework.kafka.core.parallelconsumer; -import java.util.List; -import java.util.regex.Pattern; - -import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; -import org.apache.kafka.clients.producer.ProducerRecord; import org.springframework.kafka.config.ParallelConsumerContext; +import org.springframework.kafka.core.ParallelConsumerFactory; import io.confluent.parallelconsumer.PollContext; @@ -29,11 +25,12 @@ * User should create ConcreteClass of this and register it as Spring Bean. * Concrete class of ParallelConsumerCallback will be registered {@link ParallelConsumerContext}, * and then it will be used in {@link ParallelConsumerFactory} when ParallelConsumerFactory start. - * @author ... - * @since 3.2.0 + * + * @author Sanghyeok An + * @since 3.3 */ -public interface PollCallback extends ParallelConsumerCallback { +public interface Poll extends ParallelConsumerCallback { /** * This is for {@link ParallelConsumerFactory} and {@link ParallelConsumerContext}. @@ -42,4 +39,5 @@ public interface PollCallback extends ParallelConsumerCallback { * @return void. */ void accept(PollContext context); + } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/core/PollAndProduceResultCallback.java b/spring-kafka/src/main/java/org/springframework/kafka/core/parallelconsumer/PollAndProduce.java similarity index 72% rename from spring-kafka/src/main/java/org/springframework/kafka/core/PollAndProduceResultCallback.java rename to spring-kafka/src/main/java/org/springframework/kafka/core/parallelconsumer/PollAndProduce.java index 57f1f68732..1b00ed251d 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/core/PollAndProduceResultCallback.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/core/parallelconsumer/PollAndProduce.java @@ -14,18 +14,24 @@ * limitations under the License. */ -package org.springframework.kafka.core; - -import java.util.function.Consumer; +package org.springframework.kafka.core.parallelconsumer; import org.apache.kafka.clients.producer.ProducerRecord; -import io.confluent.parallelconsumer.ParallelStreamProcessor.ConsumeProduceResult; import io.confluent.parallelconsumer.PollContext; /** * ... + * + * @author Sanghyeok An + * @since 3.3 */ -public interface PollAndProduceResultCallback extends PollAndProduceCallback, ResultConsumerCallback { +public interface PollAndProduce extends ParallelConsumerCallback { + + /** + * ... + */ + ProducerRecord accept(PollContext context); + } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/core/PollAndProduceManyResultCallback.java b/spring-kafka/src/main/java/org/springframework/kafka/core/parallelconsumer/PollAndProduceMany.java similarity index 73% rename from spring-kafka/src/main/java/org/springframework/kafka/core/PollAndProduceManyResultCallback.java rename to spring-kafka/src/main/java/org/springframework/kafka/core/parallelconsumer/PollAndProduceMany.java index 885991cf3c..2d46391c39 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/core/PollAndProduceManyResultCallback.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/core/parallelconsumer/PollAndProduceMany.java @@ -14,19 +14,26 @@ * limitations under the License. */ -package org.springframework.kafka.core; +package org.springframework.kafka.core.parallelconsumer; import java.util.List; -import java.util.function.Consumer; import org.apache.kafka.clients.producer.ProducerRecord; -import io.confluent.parallelconsumer.ParallelStreamProcessor.ConsumeProduceResult; import io.confluent.parallelconsumer.PollContext; /** * ... + * + * @author Sanghyeok An + * @since 3.3 */ -public interface PollAndProduceManyResultCallback extends PollAndProduceManyCallback, ResultConsumerCallback { +public interface PollAndProduceMany extends ParallelConsumerCallback { + + /** + * ... + */ + List> accept(PollContext context); + } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/core/PollAndProduceCallback.java b/spring-kafka/src/main/java/org/springframework/kafka/core/parallelconsumer/PollAndProduceManyResult.java similarity index 51% rename from spring-kafka/src/main/java/org/springframework/kafka/core/PollAndProduceCallback.java rename to spring-kafka/src/main/java/org/springframework/kafka/core/parallelconsumer/PollAndProduceManyResult.java index 3d6d27f253..b9be8d9db0 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/core/PollAndProduceCallback.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/core/parallelconsumer/PollAndProduceManyResult.java @@ -14,27 +14,18 @@ * limitations under the License. */ -package org.springframework.kafka.core; +package org.springframework.kafka.core.parallelconsumer; -import java.util.List; -import java.util.function.Consumer; -import java.util.regex.Pattern; - -import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; -import org.apache.kafka.clients.producer.ProducerRecord; -import org.springframework.kafka.config.ParallelConsumerContext; - -import io.confluent.parallelconsumer.ParallelStreamProcessor.ConsumeProduceResult; -import io.confluent.parallelconsumer.PollContext; +import org.springframework.kafka.core.ResultConsumerCallback; /** * ... + * + * @author Sanghyeok An + * @since 3.3 */ -public interface PollAndProduceCallback extends ParallelConsumerCallback { +public interface PollAndProduceManyResult extends PollAndProduceMany, + ResultConsumerCallback { - /** - * ... - */ - ProducerRecord accept(PollContext context); } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/core/PollAndProduceManyCallback.java b/spring-kafka/src/main/java/org/springframework/kafka/core/parallelconsumer/PollAndProduceResult.java similarity index 50% rename from spring-kafka/src/main/java/org/springframework/kafka/core/PollAndProduceManyCallback.java rename to spring-kafka/src/main/java/org/springframework/kafka/core/parallelconsumer/PollAndProduceResult.java index 8868f9ffe5..fca8fe9d5a 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/core/PollAndProduceManyCallback.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/core/parallelconsumer/PollAndProduceResult.java @@ -14,27 +14,18 @@ * limitations under the License. */ -package org.springframework.kafka.core; +package org.springframework.kafka.core.parallelconsumer; -import java.util.List; -import java.util.function.Consumer; -import java.util.regex.Pattern; - -import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; -import org.apache.kafka.clients.producer.ProducerRecord; -import org.springframework.kafka.config.ParallelConsumerContext; - -import io.confluent.parallelconsumer.ParallelStreamProcessor.ConsumeProduceResult; -import io.confluent.parallelconsumer.PollContext; +import org.springframework.kafka.core.ResultConsumerCallback; /** * ... + * + * @author Sanghyeok An + * @since 3.3 */ -public interface PollAndProduceManyCallback extends ParallelConsumerCallback { +public interface PollAndProduceResult extends PollAndProduce, + ResultConsumerCallback { - /** - * ... - */ - List> accept(PollContext context); } From 989cef16a222dd02d4b598aa483ce7f797a26347 Mon Sep 17 00:00:00 2001 From: chickenchickenlove Date: Sun, 28 Apr 2024 08:38:41 +0900 Subject: [PATCH 4/5] Add doc --- ...allelConsumerOptionsProviderCondition.java | 27 ++++++++++++++ .../kafka/config/ParallelConsumerConfig.java | 4 ++- .../config/ParallelConsumerConfiguration.java | 12 ++++--- .../kafka/config/ParallelConsumerContext.java | 17 ++++----- .../ParallelConsumerImportSelector.java | 9 +++-- .../kafka/core/ParallelConsumerFactory.java | 15 ++++---- .../kafka/core/ResultConsumerCallback.java | 9 ----- .../ParallelConsumerOptionsProvider.java | 17 +++++---- .../ParallelConsumerResultInterface.java | 35 +++++++++++++++++++ ...ava => ParallelConsumerRootInterface.java} | 14 ++++---- .../kafka/core/parallelconsumer/Poll.java | 13 ++++--- .../core/parallelconsumer/PollAndProduce.java | 12 +++++-- .../parallelconsumer/PollAndProduceMany.java | 12 +++++-- .../PollAndProduceManyResult.java | 18 ++++++++-- .../PollAndProduceResult.java | 18 ++++++++-- 15 files changed, 169 insertions(+), 63 deletions(-) delete mode 100644 spring-kafka/src/main/java/org/springframework/kafka/core/ResultConsumerCallback.java create mode 100644 spring-kafka/src/main/java/org/springframework/kafka/core/parallelconsumer/ParallelConsumerResultInterface.java rename spring-kafka/src/main/java/org/springframework/kafka/core/parallelconsumer/{ParallelConsumerCallback.java => ParallelConsumerRootInterface.java} (74%) diff --git a/spring-kafka/src/main/java/org/springframework/kafka/config/OnMissingParallelConsumerOptionsProviderCondition.java b/spring-kafka/src/main/java/org/springframework/kafka/config/OnMissingParallelConsumerOptionsProviderCondition.java index a383383755..615ac24ffc 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/config/OnMissingParallelConsumerOptionsProviderCondition.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/config/OnMissingParallelConsumerOptionsProviderCondition.java @@ -1,10 +1,37 @@ +/* + * Copyright 2016-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.springframework.kafka.config; +import org.springframework.context.ApplicationContext; import org.springframework.context.annotation.Condition; import org.springframework.context.annotation.ConditionContext; import org.springframework.core.type.AnnotatedTypeMetadata; import org.springframework.kafka.core.parallelconsumer.ParallelConsumerOptionsProvider; +/** + * This class is to provide {@link ParallelConsumerOptionsProvider} as default. + * {@link ParallelConsumerConfiguration} use this function when {@link ApplicationContext} started. + * If there is no spring bean in {@link ApplicationContext} such as {@link ParallelConsumerOptionsProvider}, + * {@link ParallelConsumerConfiguration} will register default {@link ParallelConsumerOptionsProvider}. + * + * @author Sanghyeok An + * + * @since 3.3 + */ public class OnMissingParallelConsumerOptionsProviderCondition implements Condition { @Override diff --git a/spring-kafka/src/main/java/org/springframework/kafka/config/ParallelConsumerConfig.java b/spring-kafka/src/main/java/org/springframework/kafka/config/ParallelConsumerConfig.java index 8cb5557f35..a7eb9734b9 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/config/ParallelConsumerConfig.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/config/ParallelConsumerConfig.java @@ -30,7 +30,9 @@ /** * ParallelConsumerConfig is for config of {@link io.confluent.parallelconsumer}. * This will be registered as Spring Bean when {@link EnableParallelConsumer} is annotated to your spring application. - * @author ... + * + * @author Sanghyeok An + * * @since 3.3 */ diff --git a/spring-kafka/src/main/java/org/springframework/kafka/config/ParallelConsumerConfiguration.java b/spring-kafka/src/main/java/org/springframework/kafka/config/ParallelConsumerConfiguration.java index da5e556826..ea6557fde0 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/config/ParallelConsumerConfiguration.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/config/ParallelConsumerConfiguration.java @@ -20,7 +20,7 @@ import org.springframework.context.annotation.Conditional; import org.springframework.kafka.core.DefaultKafkaConsumerFactory; import org.springframework.kafka.core.DefaultKafkaProducerFactory; -import org.springframework.kafka.core.parallelconsumer.ParallelConsumerCallback; +import org.springframework.kafka.core.parallelconsumer.ParallelConsumerRootInterface; import org.springframework.kafka.core.ParallelConsumerFactory; import org.springframework.kafka.annotation.EnableParallelConsumer; import org.springframework.kafka.core.parallelconsumer.ParallelConsumerOptionsProvider; @@ -30,9 +30,11 @@ * If so, this class will register both {@link ParallelConsumerContext} and {@link ParallelConsumerFactory} as Spring Bean. * User has responsibility * 1. annotated {@link EnableParallelConsumer} on their spring application - * 2. register ConcreteClass of {@link ParallelConsumerCallback}. - * @author ... - * @since 3.2.0 + * 2. register ConcreteClass of {@link ParallelConsumerRootInterface}. + * + * @author Sanghyoek An + * + * @since 3.3 */ public class ParallelConsumerConfiguration { @@ -50,7 +52,7 @@ public ParallelConsumerConfig parallelConsumerConfig(ParallelConsumerOptio @Bean(name = ParallelConsumerContext.DEFAULT_BEAN_NAME) public ParallelConsumerContext parallelConsumerContext(ParallelConsumerConfig parallelConsumerConfig, - ParallelConsumerCallback parallelConsumerCallback) { + ParallelConsumerRootInterface parallelConsumerCallback) { return new ParallelConsumerContext(parallelConsumerConfig, parallelConsumerCallback); } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/config/ParallelConsumerContext.java b/spring-kafka/src/main/java/org/springframework/kafka/config/ParallelConsumerContext.java index 4bb2579a13..205be1e5f2 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/config/ParallelConsumerContext.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/config/ParallelConsumerContext.java @@ -18,16 +18,17 @@ import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.producer.Producer; -import org.springframework.kafka.core.parallelconsumer.ParallelConsumerCallback; +import org.springframework.kafka.core.parallelconsumer.ParallelConsumerRootInterface; import io.confluent.parallelconsumer.ParallelConsumerOptions; import io.confluent.parallelconsumer.ParallelConsumerOptions.ParallelConsumerOptionsBuilder; -import io.confluent.parallelconsumer.ParallelStreamProcessor; /** - * This class is for aggregating all related with ParallelConsumer. - * @author ... - * @since 3.2.0 + * This class is for collecting all related with ParallelConsumer. + * + * @author Sanghyeok An + * + * @since 3.3 */ @@ -35,15 +36,15 @@ public class ParallelConsumerContext { public static final String DEFAULT_BEAN_NAME = "parallelConsumerContext"; private final ParallelConsumerConfig parallelConsumerConfig; - private final ParallelConsumerCallback parallelConsumerCallback; + private final ParallelConsumerRootInterface parallelConsumerCallback; public ParallelConsumerContext(ParallelConsumerConfig parallelConsumerConfig, - ParallelConsumerCallback callback) { + ParallelConsumerRootInterface callback) { this.parallelConsumerConfig = parallelConsumerConfig; this.parallelConsumerCallback = callback; } - public ParallelConsumerCallback parallelConsumerCallback() { + public ParallelConsumerRootInterface parallelConsumerCallback() { return this.parallelConsumerCallback; } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/config/ParallelConsumerImportSelector.java b/spring-kafka/src/main/java/org/springframework/kafka/config/ParallelConsumerImportSelector.java index 9cc32b013c..938addcae1 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/config/ParallelConsumerImportSelector.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/config/ParallelConsumerImportSelector.java @@ -19,12 +19,15 @@ import org.springframework.context.annotation.ImportSelector; import org.springframework.core.type.AnnotationMetadata; import org.springframework.kafka.annotation.EnableParallelConsumer; + /** * ParallelConsumerImportSelector is to register {@link ParallelConsumerConfiguration}. * If you want to import {@link ParallelConsumerConfiguration} to your application, * you just annotated {@link EnableParallelConsumer} to your spring application. - * @author ... - * @since 3.2.0 + * + * @author Sanghyeok An + * + * @since 3.3 */ public class ParallelConsumerImportSelector implements ImportSelector { @@ -32,4 +35,4 @@ public class ParallelConsumerImportSelector implements ImportSelector { public String[] selectImports(AnnotationMetadata importingClassMetadata) { return new String[]{ParallelConsumerConfiguration.class.getName()}; } -} \ No newline at end of file +} diff --git a/spring-kafka/src/main/java/org/springframework/kafka/core/ParallelConsumerFactory.java b/spring-kafka/src/main/java/org/springframework/kafka/core/ParallelConsumerFactory.java index 43b77c1df1..5a22685f15 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/core/ParallelConsumerFactory.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/core/ParallelConsumerFactory.java @@ -26,12 +26,13 @@ import org.apache.kafka.clients.producer.Producer; import org.springframework.context.SmartLifecycle; import org.springframework.kafka.config.ParallelConsumerContext; -import org.springframework.kafka.core.parallelconsumer.ParallelConsumerCallback; +import org.springframework.kafka.core.parallelconsumer.ParallelConsumerRootInterface; import org.springframework.kafka.core.parallelconsumer.PollAndProduce; import org.springframework.kafka.core.parallelconsumer.PollAndProduceMany; import org.springframework.kafka.core.parallelconsumer.PollAndProduceManyResult; import org.springframework.kafka.core.parallelconsumer.PollAndProduceResult; import org.springframework.kafka.core.parallelconsumer.Poll; +import org.springframework.kafka.core.parallelconsumer.ParallelConsumerResultInterface; import io.confluent.parallelconsumer.ParallelConsumerOptions; import io.confluent.parallelconsumer.ParallelStreamProcessor; @@ -40,7 +41,9 @@ /** * ParallelConsumerFactory will be started and closed by Spring LifeCycle. * This class is quite simple, because ParallelConsumer requires delegating the situation to itself. + * * @author Sanghyeok An + * * @since 3.3 */ @@ -71,7 +74,7 @@ public ParallelConsumerFactory(ParallelConsumerContext parallelConsumerCon private ParallelConsumerOptions parallelConsumerOptions(Consumer consumer, Producer producer) { - final ParallelConsumerCallback callback = parallelConsumerContext.parallelConsumerCallback(); + final ParallelConsumerRootInterface callback = parallelConsumerContext.parallelConsumerCallback(); if (callback instanceof PollAndProduceMany || callback instanceof PollAndProduce) { return parallelConsumerContext.getParallelConsumerOptions(consumer, producer); @@ -84,9 +87,9 @@ private ParallelConsumerOptions parallelConsumerOptions(Consumer con public void start() { subscribe(); - final ParallelConsumerCallback callback0 = parallelConsumerContext.parallelConsumerCallback(); + final ParallelConsumerRootInterface callback0 = parallelConsumerContext.parallelConsumerCallback(); - if (callback0 instanceof ResultConsumerCallback) { + if (callback0 instanceof ParallelConsumerResultInterface) { if (callback0 instanceof PollAndProduceManyResult) { final PollAndProduceManyResult callback = (PollAndProduceManyResult) callback0; @@ -129,7 +132,7 @@ else if (callback0 instanceof Poll) { @Override public void stop() { - final ParallelConsumerCallback callback = + final ParallelConsumerRootInterface callback = this.parallelConsumerContext.parallelConsumerCallback(); final DrainingMode drainingMode = callback.drainingMode(); final Duration duration = callback.drainTimeOut(); @@ -144,7 +147,7 @@ public boolean isRunning() { } private void subscribe() { - final ParallelConsumerCallback callback = this.parallelConsumerContext.parallelConsumerCallback(); + final ParallelConsumerRootInterface callback = this.parallelConsumerContext.parallelConsumerCallback(); final List topics = callback.getTopics(); final ConsumerRebalanceListener rebalanceListener = callback.getRebalanceListener(); diff --git a/spring-kafka/src/main/java/org/springframework/kafka/core/ResultConsumerCallback.java b/spring-kafka/src/main/java/org/springframework/kafka/core/ResultConsumerCallback.java deleted file mode 100644 index 07a32cf437..0000000000 --- a/spring-kafka/src/main/java/org/springframework/kafka/core/ResultConsumerCallback.java +++ /dev/null @@ -1,9 +0,0 @@ -package org.springframework.kafka.core; - -import java.util.function.Consumer; - -import io.confluent.parallelconsumer.ParallelStreamProcessor.ConsumeProduceResult; - -public interface ResultConsumerCallback { - Consumer> resultConsumer(ConsumeProduceResult result); -} diff --git a/spring-kafka/src/main/java/org/springframework/kafka/core/parallelconsumer/ParallelConsumerOptionsProvider.java b/spring-kafka/src/main/java/org/springframework/kafka/core/parallelconsumer/ParallelConsumerOptionsProvider.java index 09bfb13029..5d56e0c6a5 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/core/parallelconsumer/ParallelConsumerOptionsProvider.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/core/parallelconsumer/ParallelConsumerOptionsProvider.java @@ -21,30 +21,29 @@ import javax.annotation.Nullable; -import org.springframework.kafka.config.ParallelConsumerContext; -import org.springframework.kafka.core.ParallelConsumerFactory; - -import io.confluent.parallelconsumer.ParallelConsumerOptions; +import io.confluent.parallelconsumer.ParallelConsumer; import io.confluent.parallelconsumer.ParallelConsumerOptions.CommitMode; import io.confluent.parallelconsumer.ParallelConsumerOptions.InvalidOffsetMetadataHandlingPolicy; -import io.confluent.parallelconsumer.ParallelConsumerOptions.ParallelConsumerOptionsBuilder; import io.confluent.parallelconsumer.ParallelConsumerOptions.ProcessingOrder; import io.confluent.parallelconsumer.RecordContext; import io.micrometer.core.instrument.MeterRegistry; import io.micrometer.core.instrument.Tag; /** + * User can configure options of {@link ParallelConsumer} via {@link ParallelConsumerOptionsProvider}. + * If user want to configure options of {@link ParallelConsumer}, user should implement {@link ParallelConsumerOptionsProvider} + * and register it as spring bean. + * + * User don't need to implement all of methods. + * Note : If a method returns null, that option will use the default value of the {@link ParallelConsumer}. * * @author Sanghyeok An + * * @since 3.3 */ public interface ParallelConsumerOptionsProvider { - default void hello() { - ParallelConsumerOptionsBuilder builder = ParallelConsumerOptions.builder(); - } - @Nullable default String managedExecutorService() { return null; diff --git a/spring-kafka/src/main/java/org/springframework/kafka/core/parallelconsumer/ParallelConsumerResultInterface.java b/spring-kafka/src/main/java/org/springframework/kafka/core/parallelconsumer/ParallelConsumerResultInterface.java new file mode 100644 index 0000000000..481065d910 --- /dev/null +++ b/spring-kafka/src/main/java/org/springframework/kafka/core/parallelconsumer/ParallelConsumerResultInterface.java @@ -0,0 +1,35 @@ +/* + * Copyright 2014-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.kafka.core.parallelconsumer; + +import java.util.function.Consumer; + +import io.confluent.parallelconsumer.ParallelStreamProcessor.ConsumeProduceResult; + +/** + * This interface is an interface that marks whether there is a Callback for {@link ConsumeProduceResult}. + * Users should implement one of {@link Poll}, {@link PollAndProduce}, {@link PollAndProduceResult} + * , {@link PollAndProduceMany}, {@link PollAndProduceManyResult} instead of {@link ParallelConsumerResultInterface}. + * + * @author Sanghyeok An + * + * @since 3.3 + */ + +public interface ParallelConsumerResultInterface { + Consumer> resultConsumer(ConsumeProduceResult result); +} diff --git a/spring-kafka/src/main/java/org/springframework/kafka/core/parallelconsumer/ParallelConsumerCallback.java b/spring-kafka/src/main/java/org/springframework/kafka/core/parallelconsumer/ParallelConsumerRootInterface.java similarity index 74% rename from spring-kafka/src/main/java/org/springframework/kafka/core/parallelconsumer/ParallelConsumerCallback.java rename to spring-kafka/src/main/java/org/springframework/kafka/core/parallelconsumer/ParallelConsumerRootInterface.java index 29a995580f..9b6380c240 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/core/parallelconsumer/ParallelConsumerCallback.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/core/parallelconsumer/ParallelConsumerRootInterface.java @@ -21,21 +21,21 @@ import java.util.regex.Pattern; import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; -import org.springframework.kafka.config.ParallelConsumerContext; -import org.springframework.kafka.core.ParallelConsumerFactory; import io.confluent.parallelconsumer.internal.DrainingCloseable.DrainingMode; /** - * User should create ConcreteClass of this and register it as Spring Bean. - * Concrete class of ParallelConsumerCallback will be registered {@link ParallelConsumerContext}, - * and then it will be used in {@link ParallelConsumerFactory} when ParallelConsumerFactory start. - * + * This interface provides a common interface for sub-interfaces. + * Users should not implement this interface. + * Users should implement one of {@link Poll}, {@link PollAndProduce}, {@link PollAndProduceResult} + * , {@link PollAndProduceMany}, {@link PollAndProduceManyResult} instead of {@link ParallelConsumerRootInterface}. + * @author Sanghyeok An + * * @since 3.3 */ -public interface ParallelConsumerCallback { +public interface ParallelConsumerRootInterface { /** * ... diff --git a/spring-kafka/src/main/java/org/springframework/kafka/core/parallelconsumer/Poll.java b/spring-kafka/src/main/java/org/springframework/kafka/core/parallelconsumer/Poll.java index bfa3f7dbd3..f3a399d979 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/core/parallelconsumer/Poll.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/core/parallelconsumer/Poll.java @@ -16,21 +16,26 @@ package org.springframework.kafka.core.parallelconsumer; +import java.util.function.Consumer; + import org.springframework.kafka.config.ParallelConsumerContext; import org.springframework.kafka.core.ParallelConsumerFactory; +import io.confluent.parallelconsumer.ParallelStreamProcessor; import io.confluent.parallelconsumer.PollContext; /** - * User should create ConcreteClass of this and register it as Spring Bean. - * Concrete class of ParallelConsumerCallback will be registered {@link ParallelConsumerContext}, - * and then it will be used in {@link ParallelConsumerFactory} when ParallelConsumerFactory start. + * This interface is intended for use when the user does not use the producer after consuming. + * User should implement {@link Poll} and register it as Spring Bean. + * {@link Poll#accept(PollContext)} will be called by {@link ParallelStreamProcessor#poll(Consumer)} + * when {@link ParallelConsumerFactory} started. * * @author Sanghyeok An + * * @since 3.3 */ -public interface Poll extends ParallelConsumerCallback { +public interface Poll extends ParallelConsumerRootInterface { /** * This is for {@link ParallelConsumerFactory} and {@link ParallelConsumerContext}. diff --git a/spring-kafka/src/main/java/org/springframework/kafka/core/parallelconsumer/PollAndProduce.java b/spring-kafka/src/main/java/org/springframework/kafka/core/parallelconsumer/PollAndProduce.java index 1b00ed251d..1eb171d24e 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/core/parallelconsumer/PollAndProduce.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/core/parallelconsumer/PollAndProduce.java @@ -16,18 +16,26 @@ package org.springframework.kafka.core.parallelconsumer; +import java.util.function.Function; + import org.apache.kafka.clients.producer.ProducerRecord; +import org.springframework.kafka.core.ParallelConsumerFactory; +import io.confluent.parallelconsumer.ParallelStreamProcessor; import io.confluent.parallelconsumer.PollContext; /** - * ... + * This interface is intended for use when the user use the producer after consuming. + * User should implement {@link PollAndProduce} and register it as Spring Bean. + * {@link PollAndProduce#accept(PollContext)} will be called by {@link ParallelStreamProcessor#pollAndProduce(Function)} + * when {@link ParallelConsumerFactory} started. * * @author Sanghyeok An + * * @since 3.3 */ -public interface PollAndProduce extends ParallelConsumerCallback { +public interface PollAndProduce extends ParallelConsumerRootInterface { /** * ... diff --git a/spring-kafka/src/main/java/org/springframework/kafka/core/parallelconsumer/PollAndProduceMany.java b/spring-kafka/src/main/java/org/springframework/kafka/core/parallelconsumer/PollAndProduceMany.java index 2d46391c39..5d7f20c04f 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/core/parallelconsumer/PollAndProduceMany.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/core/parallelconsumer/PollAndProduceMany.java @@ -17,19 +17,25 @@ package org.springframework.kafka.core.parallelconsumer; import java.util.List; +import java.util.function.Function; import org.apache.kafka.clients.producer.ProducerRecord; +import org.springframework.kafka.core.ParallelConsumerFactory; +import io.confluent.parallelconsumer.ParallelStreamProcessor; import io.confluent.parallelconsumer.PollContext; /** - * ... - * + * This interface is intended for use when the user use the producer after consuming. + * User should implement {@link PollAndProduceMany} and register it as Spring Bean. + * {@link PollAndProduceMany#accept(PollContext)} will be called by {@link ParallelStreamProcessor#pollAndProduceMany(Function)} + * when {@link ParallelConsumerFactory} started. * @author Sanghyeok An + * * @since 3.3 */ -public interface PollAndProduceMany extends ParallelConsumerCallback { +public interface PollAndProduceMany extends ParallelConsumerRootInterface { /** * ... diff --git a/spring-kafka/src/main/java/org/springframework/kafka/core/parallelconsumer/PollAndProduceManyResult.java b/spring-kafka/src/main/java/org/springframework/kafka/core/parallelconsumer/PollAndProduceManyResult.java index b9be8d9db0..810c8af97b 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/core/parallelconsumer/PollAndProduceManyResult.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/core/parallelconsumer/PollAndProduceManyResult.java @@ -16,16 +16,28 @@ package org.springframework.kafka.core.parallelconsumer; -import org.springframework.kafka.core.ResultConsumerCallback; +import java.util.function.Consumer; +import java.util.function.Function; + +import org.springframework.kafka.core.ParallelConsumerFactory; + +import io.confluent.parallelconsumer.ParallelStreamProcessor; +import io.confluent.parallelconsumer.ParallelStreamProcessor.ConsumeProduceResult; +import io.confluent.parallelconsumer.PollContext; /** - * ... + * This interface is intended for use when the user use the producer after consuming. + * User should implement {@link PollAndProduce} and register it as Spring Bean. + * {@link PollAndProduceManyResult#accept(PollContext)} and {@link PollAndProduceManyResult#resultConsumer(ConsumeProduceResult)} + * will be called by {@link ParallelStreamProcessor#pollAndProduceMany(Function, Consumer)} + * when {@link ParallelConsumerFactory} started. * * @author Sanghyeok An + * * @since 3.3 */ public interface PollAndProduceManyResult extends PollAndProduceMany, - ResultConsumerCallback { + ParallelConsumerResultInterface { } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/core/parallelconsumer/PollAndProduceResult.java b/spring-kafka/src/main/java/org/springframework/kafka/core/parallelconsumer/PollAndProduceResult.java index fca8fe9d5a..fbf9c4c48c 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/core/parallelconsumer/PollAndProduceResult.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/core/parallelconsumer/PollAndProduceResult.java @@ -16,16 +16,28 @@ package org.springframework.kafka.core.parallelconsumer; -import org.springframework.kafka.core.ResultConsumerCallback; +import java.util.function.Consumer; +import java.util.function.Function; + +import org.springframework.kafka.core.ParallelConsumerFactory; + +import io.confluent.parallelconsumer.ParallelStreamProcessor; +import io.confluent.parallelconsumer.ParallelStreamProcessor.ConsumeProduceResult; +import io.confluent.parallelconsumer.PollContext; /** - * ... + * This interface is intended for use when the user use the producer after consuming. + * User should implement {@link PollAndProduceResult} and register it as Spring Bean. + * Both {@link PollAndProduceResult#accept(PollContext)} {@link PollAndProduceResult#resultConsumer(ConsumeProduceResult)} + * will be called by {@link ParallelStreamProcessor#pollAndProduce(Function, Consumer)} + * when {@link ParallelConsumerFactory} started. * * @author Sanghyeok An + * * @since 3.3 */ public interface PollAndProduceResult extends PollAndProduce, - ResultConsumerCallback { + ParallelConsumerResultInterface { } From 1790392bbff4df1134a07ef5ba8efcf466872d92 Mon Sep 17 00:00:00 2001 From: chickenchickenlove Date: Sun, 28 Apr 2024 08:39:18 +0900 Subject: [PATCH 5/5] fix miss typo --- .../kafka/annotation/EnableParallelConsumer.java | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/spring-kafka/src/main/java/org/springframework/kafka/annotation/EnableParallelConsumer.java b/spring-kafka/src/main/java/org/springframework/kafka/annotation/EnableParallelConsumer.java index 6966edbde3..0cf4e7a5e5 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/annotation/EnableParallelConsumer.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/annotation/EnableParallelConsumer.java @@ -28,12 +28,14 @@ /** * If you want to import {@link ParallelConsumerConfiguration} to your application, * you just annotated {@link EnableParallelConsumer} to your spring application. - * @author ... - * @since 3.2.0 + * + * @author Sanghyeok An + * + * @since 3.3 */ @Target(ElementType.TYPE) @Retention(RetentionPolicy.RUNTIME) @Import(ParallelConsumerImportSelector.class) public @interface EnableParallelConsumer { -} \ No newline at end of file +}