From 32674a83addd831024c72eafe9f92de309735d48 Mon Sep 17 00:00:00 2001 From: Devin Bost Date: Tue, 6 Apr 2021 00:25:33 -0600 Subject: [PATCH] Issue 9986 - Enabled custom function producer behavior --- .../RoundRobinPartitionMessageRouterImpl.java | 2 +- .../common/functions/ProducerConfig.java | 59 +++++++ .../functions/instance/ContextImpl.java | 70 +++++--- .../instance/FunctionResultRouter.java | 20 ++- .../instance/FunctionRouterClock.java | 39 +++++ .../instance/JavaInstanceRunnable.java | 12 +- .../functions/instance/PulsarCluster.java | 13 +- .../pulsar/functions/sink/PulsarSink.java | 57 ++++++- .../functions/instance/ContextImplTest.java | 14 ++ .../pulsar/functions/sink/PulsarSinkTest.java | 21 ++- .../proto/src/main/proto/Function.proto | 26 +++ .../functions/utils/FunctionConfigUtils.java | 5 + .../functions/utils/ProducerConfigUtils.java | 28 ++- .../ProducerConfigFromProtobufConverter.java | 127 ++++++++++++++ .../ProducerConfigToProtobufConverter.java | 87 ++++++++++ .../utils/FunctionConfigUtilsTest.java | 160 ++++++++++++++++++ .../utils/SourceConfigUtilsTest.java | 10 ++ .../worker/rest/api/FunctionsImplTest.java | 15 ++ 18 files changed, 710 insertions(+), 55 deletions(-) create mode 100644 pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/FunctionRouterClock.java create mode 100644 pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/functions/ProducerConfigFromProtobufConverter.java create mode 100644 pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/functions/ProducerConfigToProtobufConverter.java diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/RoundRobinPartitionMessageRouterImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/RoundRobinPartitionMessageRouterImpl.java index d293ad4b94314..4cf3d83549533 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/RoundRobinPartitionMessageRouterImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/RoundRobinPartitionMessageRouterImpl.java @@ -45,7 +45,7 @@ public class RoundRobinPartitionMessageRouterImpl extends MessageRouterBase { private volatile int partitionIndex = 0; private final int startPtnIdx; - private final boolean isBatchingEnabled; + protected boolean isBatchingEnabled; private final long partitionSwitchMs; private final Clock clock; diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/functions/ProducerConfig.java b/pulsar-common/src/main/java/org/apache/pulsar/common/functions/ProducerConfig.java index 0d2f4b48b19be..a5c9ec19cc3b8 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/functions/ProducerConfig.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/functions/ProducerConfig.java @@ -23,6 +23,9 @@ import lombok.Data; import lombok.EqualsAndHashCode; import lombok.NoArgsConstructor; +import org.apache.pulsar.client.api.CompressionType; +import org.apache.pulsar.client.api.HashingScheme; +import org.apache.pulsar.client.api.MessageRoutingMode; /** * Configuration of the producer inside the function. @@ -38,4 +41,60 @@ public class ProducerConfig { private Boolean useThreadLocalProducers; private CryptoConfig cryptoConfig; private String batchBuilder; + + public Boolean batchingDisabled; + public Boolean chunkingEnabled; + public Boolean blockIfQueueFullDisabled; + public CompressionType compressionType; + public HashingScheme hashingScheme; + public MessageRoutingMode messageRoutingMode; + public Long batchingMaxPublishDelay; + public Boolean getBatchingEnabled(){ + return !this.getBatchingDisabled(); + } + public Boolean getBlockIfQueueFullEnabled() { return !this.getBlockIfQueueFullDisabled();} + public ProducerConfig merge(ProducerConfig newConfig){ + ProducerConfig mergedConfig = new ProducerConfig(); + if(newConfig == null) { + mergedConfig = this; + } + else { + if(newConfig.getBatchingDisabled() != null){ + mergedConfig.setBatchingDisabled(newConfig.getBatchingDisabled()); + } else { + mergedConfig.setBatchingDisabled(this.getBatchingDisabled()); + } + if(newConfig.getChunkingEnabled() != null){ + mergedConfig.setChunkingEnabled(newConfig.getChunkingEnabled()); + } else { + mergedConfig.setChunkingEnabled(this.getChunkingEnabled()); + } + if(newConfig.getBlockIfQueueFullDisabled() != null){ + mergedConfig.setBlockIfQueueFullDisabled(newConfig.getBlockIfQueueFullDisabled()); + } else { + mergedConfig.setBlockIfQueueFullDisabled(this.getBlockIfQueueFullDisabled()); + } + if(newConfig.getCompressionType() != null){ + mergedConfig.setCompressionType(newConfig.getCompressionType()); + } else { + mergedConfig.setCompressionType(this.getCompressionType()); + } + if(newConfig.getHashingScheme() != null){ + mergedConfig.setHashingScheme(newConfig.getHashingScheme()); + } else { + mergedConfig.setHashingScheme(this.getHashingScheme()); + } + if (newConfig.getMessageRoutingMode() != null) { + mergedConfig.setMessageRoutingMode(newConfig.getMessageRoutingMode()); + } else { + mergedConfig.setMessageRoutingMode(this.getMessageRoutingMode()); + } + if(newConfig.getBatchingMaxPublishDelay() != null){ + mergedConfig.setBatchingMaxPublishDelay(newConfig.getBatchingMaxPublishDelay()); + } else { + mergedConfig.setBatchingMaxPublishDelay(this.getBatchingMaxPublishDelay()); + } + } + return mergedConfig; + } } diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java index e927332538661..42530ec867aed 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java @@ -64,6 +64,7 @@ import org.apache.pulsar.functions.secretsprovider.SecretsProvider; import org.apache.pulsar.functions.utils.FunctionCommon; import org.apache.pulsar.functions.utils.ProducerConfigUtils; +import org.apache.pulsar.functions.utils.functions.ProducerConfigFromProtobufConverter; import org.apache.pulsar.io.core.SinkContext; import org.apache.pulsar.io.core.SourceContext; import org.slf4j.Logger; @@ -480,27 +481,54 @@ private Producer getProducer(String pulsarName, String topicName, Schema< if (producer == null) { - Producer newProducer = ((ProducerBuilderImpl) pulsar.getProducerBuilder().clone()) - .schema(schema) - .blockIfQueueFull(true) - .enableBatching(true) - .batchingMaxPublishDelay(10, TimeUnit.MILLISECONDS) - .compressionType(CompressionType.LZ4) - .hashingScheme(HashingScheme.Murmur3_32Hash) // - .messageRoutingMode(MessageRoutingMode.CustomPartition) - .messageRouter(FunctionResultRouter.of()) - // set send timeout to be infinity to prevent potential deadlock with consumer - // that might happen when consumer is blocked due to unacked messages - .sendTimeout(0, TimeUnit.SECONDS) - .topic(topicName) - .properties(InstanceUtils.getProperties(componentType, - FunctionCommon.getFullyQualifiedName( - this.config.getFunctionDetails().getTenant(), - this.config.getFunctionDetails().getNamespace(), - this.config.getFunctionDetails().getName()), - this.config.getInstanceId())) - .create(); - + Producer newProducer = null; + + if (this.config.getFunctionDetails() != null && this.config.getFunctionDetails().getSink() != null && + this.config.getFunctionDetails().getSink().getProducerSpec() != null){ + Function.ProducerSpec producerSpec = this.config.getFunctionDetails().getSink().getProducerSpec(); + ProducerConfigFromProtobufConverter converter = new ProducerConfigFromProtobufConverter(producerSpec); + newProducer = ((ProducerBuilderImpl) pulsar.getProducerBuilder().clone()) + .schema(schema) + .blockIfQueueFull(converter.getBlockIfQueueFullEnabled()) + .enableBatching(converter.getBatchingEnabled()) + .batchingMaxPublishDelay(converter.getBatchingMaxPublishDelay(), TimeUnit.MILLISECONDS) + .compressionType(converter.getCompressionType()) + .hashingScheme(converter.getHashingScheme()) // + .messageRoutingMode(converter.getMessageRoutingMode()) + .messageRouter(FunctionResultRouter.of(converter.getBatchingEnabled())) + // set send timeout to be infinity to prevent potential deadlock with consumer + // that might happen when consumer is blocked due to unacked messages + .sendTimeout(0, TimeUnit.SECONDS) + .topic(topicName) + .properties(InstanceUtils.getProperties(componentType, + FunctionCommon.getFullyQualifiedName( + this.config.getFunctionDetails().getTenant(), + this.config.getFunctionDetails().getNamespace(), + this.config.getFunctionDetails().getName()), + this.config.getInstanceId())) + .create(); + } else { + newProducer = ((ProducerBuilderImpl) pulsar.getProducerBuilder().clone()) + .schema(schema) + .blockIfQueueFull(true) + .enableBatching(true) + .batchingMaxPublishDelay(10, TimeUnit.MILLISECONDS) + .compressionType(CompressionType.LZ4) + .hashingScheme(HashingScheme.Murmur3_32Hash) // + .messageRoutingMode(MessageRoutingMode.CustomPartition) + .messageRouter(FunctionResultRouter.of(true)) + // set send timeout to be infinity to prevent potential deadlock with consumer + // that might happen when consumer is blocked due to unacked messages + .sendTimeout(0, TimeUnit.SECONDS) + .topic(topicName) + .properties(InstanceUtils.getProperties(componentType, + FunctionCommon.getFullyQualifiedName( + this.config.getFunctionDetails().getTenant(), + this.config.getFunctionDetails().getNamespace(), + this.config.getFunctionDetails().getName()), + this.config.getInstanceId())) + .create(); + } if (pulsar.getTlPublishProducers() != null) { pulsar.getTlPublishProducers().get().put(topicName, newProducer); } else { diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/FunctionResultRouter.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/FunctionResultRouter.java index aeeaf6c2be64c..e672e52b3f1b2 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/FunctionResultRouter.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/FunctionResultRouter.java @@ -26,16 +26,17 @@ import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.TopicMetadata; import org.apache.pulsar.client.impl.RoundRobinPartitionMessageRouterImpl; +import org.apache.pulsar.common.functions.ProducerConfig; /** * Router for routing function results. */ public class FunctionResultRouter extends RoundRobinPartitionMessageRouterImpl { - private static final FunctionResultRouter INSTANCE = new FunctionResultRouter(); + private static final FunctionRouterClock clock = FunctionRouterClock.of(); - public FunctionResultRouter() { - this(Math.abs(ThreadLocalRandom.current().nextInt()), Clock.systemUTC()); + public FunctionResultRouter(boolean isBatchingEnabled) { + this(clock.getPtnIdx(), clock.getTime(), isBatchingEnabled); } @VisibleForTesting @@ -48,8 +49,17 @@ public FunctionResultRouter(int startPtnIdx, Clock clock) { clock); } - public static FunctionResultRouter of() { - return INSTANCE; + public FunctionResultRouter(int startPtnIdx, Clock clock, boolean isBatchingEnabled) { + super( + HashingScheme.Murmur3_32Hash, + startPtnIdx, + isBatchingEnabled, + 1, + clock); + } + + public static FunctionResultRouter of(boolean isBatchingEnabled) { + return new FunctionResultRouter(isBatchingEnabled); } @Override diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/FunctionRouterClock.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/FunctionRouterClock.java new file mode 100644 index 0000000000000..5f360a514f864 --- /dev/null +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/FunctionRouterClock.java @@ -0,0 +1,39 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.functions.instance; + +import lombok.Getter; + +import java.time.Clock; +import java.util.concurrent.ThreadLocalRandom; + +public class FunctionRouterClock { + private static final FunctionRouterClock INSTANCE = new FunctionRouterClock(); + + @Getter + private final int PtnIdx = Math.abs(ThreadLocalRandom.current().nextInt()); + + @Getter + private final Clock time = Clock.systemUTC(); + + public static FunctionRouterClock of(){ + return INSTANCE; + } + +} diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java index d761ccb108ec0..23d6cebd53e95 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java @@ -68,6 +68,7 @@ import org.apache.pulsar.functions.source.batch.BatchSourceExecutor; import org.apache.pulsar.functions.utils.CryptoUtils; import org.apache.pulsar.functions.utils.FunctionCommon; +import org.apache.pulsar.functions.utils.functions.ProducerConfigFromProtobufConverter; import org.apache.pulsar.io.core.Sink; import org.apache.pulsar.io.core.Source; import org.slf4j.Logger; @@ -744,13 +745,10 @@ private void setupOutput(ContextImpl contextImpl) throws Exception { pulsarSinkConfig.setSchemaProperties(sinkSpec.getSchemaPropertiesMap()); if (this.instanceConfig.getFunctionDetails().getSink().getProducerSpec() != null) { - org.apache.pulsar.functions.proto.Function.ProducerSpec conf = this.instanceConfig.getFunctionDetails().getSink().getProducerSpec(); - ProducerConfig.ProducerConfigBuilder builder = ProducerConfig.builder() - .maxPendingMessages(conf.getMaxPendingMessages()) - .maxPendingMessagesAcrossPartitions(conf.getMaxPendingMessagesAcrossPartitions()) - .useThreadLocalProducers(conf.getUseThreadLocalProducers()) - .cryptoConfig(CryptoUtils.convertFromSpec(conf.getCryptoSpec())); - pulsarSinkConfig.setProducerConfig(builder.build()); + org.apache.pulsar.functions.proto.Function.ProducerSpec spec = this.instanceConfig.getFunctionDetails().getSink().getProducerSpec(); + ProducerConfigFromProtobufConverter converter = new ProducerConfigFromProtobufConverter(spec); + ProducerConfig producerConfig = converter.getProducerConfig(); + pulsarSinkConfig.setProducerConfig(producerConfig); } object = new PulsarSink(this.client, pulsarSinkConfig, this.properties, this.stats, this.functionClassLoader); diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/PulsarCluster.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/PulsarCluster.java index 41496f0ee872d..b7c810434d444 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/PulsarCluster.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/PulsarCluster.java @@ -56,10 +56,13 @@ public PulsarCluster(PulsarClient client, PulsarAdmin adminClient, ProducerSpec this.client = client; this.adminClient = adminClient; this.topicSchema = new TopicSchema(client); - this.producerBuilder = (ProducerBuilderImpl) client.newProducer().blockIfQueueFull(true).enableBatching(true) - .batchingMaxPublishDelay(1, TimeUnit.MILLISECONDS); boolean useThreadLocalProducers = false; if (producerSpec != null) { + this.producerBuilder = (ProducerBuilderImpl) client.newProducer() + .blockIfQueueFull(!producerSpec.getBlockIfQueueFullDisabled()) + .enableBatching(!producerSpec.getBatchingDisabled()) + .batchingMaxPublishDelay(producerSpec.getBatchingMaxPublishDelay() == 0L ? 1L : + producerSpec.getBatchingMaxPublishDelay(), TimeUnit.MILLISECONDS); if (producerSpec.getMaxPendingMessages() != 0) { this.producerBuilder.maxPendingMessages(producerSpec.getMaxPendingMessages()); } @@ -75,6 +78,12 @@ public PulsarCluster(PulsarClient client, PulsarAdmin adminClient, ProducerSpec } useThreadLocalProducers = producerSpec.getUseThreadLocalProducers(); } + else { + this.producerBuilder = (ProducerBuilderImpl) client.newProducer() + .blockIfQueueFull(true) + .enableBatching(true) + .batchingMaxPublishDelay(1, TimeUnit.MILLISECONDS); + } if (useThreadLocalProducers) { tlPublishProducers = new ThreadLocal<>(); } else { diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java index 80d82a12df60e..879bb40e9d34a 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java @@ -108,13 +108,6 @@ protected PulsarSinkProcessorBase(Schema schema, Crypto crypto) { public Producer createProducer(PulsarClient client, String topic, String producerName, Schema schema) throws PulsarClientException { ProducerBuilder builder = client.newProducer(schema) - .blockIfQueueFull(true) - .enableBatching(true) - .batchingMaxPublishDelay(10, TimeUnit.MILLISECONDS) - .compressionType(CompressionType.LZ4) - .hashingScheme(HashingScheme.Murmur3_32Hash) // - .messageRoutingMode(MessageRoutingMode.CustomPartition) - .messageRouter(FunctionResultRouter.of()) // set send timeout to be infinity to prevent potential deadlock with consumer // that might happen when consumer is blocked due to unacked messages .sendTimeout(0, TimeUnit.SECONDS) @@ -124,10 +117,48 @@ public Producer createProducer(PulsarClient client, String topic, String prod } if (pulsarSinkConfig.getProducerConfig() != null) { ProducerConfig producerConfig = pulsarSinkConfig.getProducerConfig(); - if (producerConfig.getMaxPendingMessages() != 0) { + if(producerConfig.getBatchingEnabled() != null){ + builder.enableBatching(producerConfig.getBatchingEnabled()); + builder.messageRouter(FunctionResultRouter.of(producerConfig.getBatchingEnabled())); + } else { + builder.enableBatching(true); + builder.messageRouter(FunctionResultRouter.of(true)); + } + if(producerConfig.getChunkingEnabled() != null){ + builder.enableChunking(producerConfig.getChunkingEnabled()); + } else { + builder.enableChunking(false); + } + if(producerConfig.getBlockIfQueueFullEnabled() != null){ + builder.blockIfQueueFull(producerConfig.getBlockIfQueueFullEnabled()); + } else { + builder.blockIfQueueFull(true); + } + if(producerConfig.getBatchingMaxPublishDelay() != null){ + builder.batchingMaxPublishDelay(producerConfig.getBatchingMaxPublishDelay(), TimeUnit.MILLISECONDS); + } else { + builder.batchingMaxPublishDelay(10L, TimeUnit.MILLISECONDS); + } + if(producerConfig.getCompressionType() != null){ + builder.compressionType(producerConfig.getCompressionType()); + } else { + builder.compressionType(CompressionType.LZ4); + } + if(producerConfig.getHashingScheme() != null){ + builder.hashingScheme(producerConfig.getHashingScheme()); + } else { + builder.hashingScheme(HashingScheme.Murmur3_32Hash); + } + if(producerConfig.getMessageRoutingMode() != null){ + builder.messageRoutingMode(producerConfig.getMessageRoutingMode()); + } else { + builder.messageRoutingMode(MessageRoutingMode.CustomPartition); + } + if (producerConfig.getMaxPendingMessages() != null && producerConfig.getMaxPendingMessages() != 0) { builder.maxPendingMessages(producerConfig.getMaxPendingMessages()); } - if (producerConfig.getMaxPendingMessagesAcrossPartitions() != 0) { + if (producerConfig.getMaxPendingMessagesAcrossPartitions() != null && + producerConfig.getMaxPendingMessagesAcrossPartitions() != 0) { builder.maxPendingMessagesAcrossPartitions(producerConfig.getMaxPendingMessagesAcrossPartitions()); } if (producerConfig.getCryptoConfig() != null) { @@ -144,6 +175,14 @@ public Producer createProducer(PulsarClient client, String topic, String prod builder.batcherBuilder(BatcherBuilder.DEFAULT); } } + } else { + builder.blockIfQueueFull(true) + .enableBatching(true) + .batchingMaxPublishDelay(10, TimeUnit.MILLISECONDS) + .compressionType(CompressionType.LZ4) + .hashingScheme(HashingScheme.Murmur3_32Hash) // + .messageRoutingMode(MessageRoutingMode.CustomPartition) + .messageRouter(FunctionResultRouter.of(true)); } return builder.properties(properties).create(); } diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java index ceb87c3197387..16ce5ec715423 100644 --- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java +++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java @@ -50,6 +50,7 @@ import org.apache.pulsar.functions.api.Record; import org.apache.pulsar.functions.instance.state.BKStateStoreImpl; import org.apache.pulsar.functions.instance.state.InstanceStateManager; +import org.apache.pulsar.functions.proto.Function; import org.apache.pulsar.functions.proto.Function.FunctionDetails; import org.apache.pulsar.functions.secretsprovider.EnvironmentBasedSecretsProvider; import org.slf4j.Logger; @@ -72,8 +73,21 @@ public class ContextImplTest { public void setup() { config = new InstanceConfig(); config.setExposePulsarAdminClientEnabled(true); + Function.ProducerSpec producerSpec = Function.ProducerSpec.newBuilder() + .setBatchingDisabled(false) + .setChunkingEnabled(false) + .setBlockIfQueueFullDisabled(false) + .setCompressionType(Function.CompressionType.LZ4) + .setHashingScheme(Function.HashingScheme.MURMUR3_32HASH) + .setMessageRoutingMode(Function.MessageRoutingMode.CUSTOM_PARTITION) + .setBatchingMaxPublishDelay(10L) // This is the default case. + .build(); + Function.SinkSpec sink = Function.SinkSpec.newBuilder() + .setProducerSpec(producerSpec) + .build(); FunctionDetails functionDetails = FunctionDetails.newBuilder() .setUserConfig("") + .setSink(sink) .build(); config.setFunctionDetails(functionDetails); logger = mock(Logger.class); diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/sink/PulsarSinkTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/sink/PulsarSinkTest.java index 5e52c89f5fcf3..8c346194440c7 100644 --- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/sink/PulsarSinkTest.java +++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/sink/PulsarSinkTest.java @@ -45,15 +45,7 @@ import lombok.Setter; import lombok.extern.slf4j.Slf4j; -import org.apache.pulsar.client.api.Consumer; -import org.apache.pulsar.client.api.ConsumerBuilder; -import org.apache.pulsar.client.api.MessageId; -import org.apache.pulsar.client.api.Producer; -import org.apache.pulsar.client.api.ProducerBuilder; -import org.apache.pulsar.client.api.PulsarClient; -import org.apache.pulsar.client.api.PulsarClientException; -import org.apache.pulsar.client.api.Schema; -import org.apache.pulsar.client.api.TypedMessageBuilder; +import org.apache.pulsar.client.api.*; import org.apache.pulsar.client.api.schema.GenericRecord; import org.apache.pulsar.client.api.schema.GenericRecordBuilder; import org.apache.pulsar.client.api.schema.GenericSchema; @@ -63,6 +55,7 @@ import org.apache.pulsar.client.impl.schema.AutoConsumeSchema; import org.apache.pulsar.common.functions.FunctionConfig; import org.apache.pulsar.common.functions.FunctionConfig.ProcessingGuarantees; +import org.apache.pulsar.common.functions.ProducerConfig; import org.apache.pulsar.common.schema.SchemaType; import org.apache.pulsar.functions.api.Record; import org.apache.pulsar.functions.api.SerDe; @@ -149,6 +142,16 @@ private static PulsarSinkConfig getPulsarConfigs() { pulsarConfig.setTopic(TOPIC); pulsarConfig.setSerdeClassName(TopicSchema.DEFAULT_SERDE); pulsarConfig.setTypeClassName(String.class.getName()); + ProducerConfig producerConfig = new ProducerConfig(); + producerConfig.setBatchingDisabled(false); + producerConfig.setChunkingEnabled(false); + producerConfig.setBlockIfQueueFullDisabled(true); + producerConfig.setCompressionType(CompressionType.SNAPPY); + producerConfig.setHashingScheme(HashingScheme.Murmur3_32Hash); + producerConfig.setMessageRoutingMode(MessageRoutingMode.CustomPartition); + producerConfig.setBatchingMaxPublishDelay(12L); + + pulsarConfig.setProducerConfig(producerConfig); return pulsarConfig; } diff --git a/pulsar-functions/proto/src/main/proto/Function.proto b/pulsar-functions/proto/src/main/proto/Function.proto index 77396eb017059..0ae016ad5511b 100644 --- a/pulsar-functions/proto/src/main/proto/Function.proto +++ b/pulsar-functions/proto/src/main/proto/Function.proto @@ -40,6 +40,25 @@ enum SubscriptionPosition { EARLIEST = 1; } +enum CompressionType { + LZ4 = 0; + NONE = 1; + ZLIB = 2; + ZSTD = 3; + SNAPPY = 4; +} + +enum HashingScheme { + MURMUR3_32HASH = 0; + JAVA_STRING_HASH = 1; +} + +enum MessageRoutingMode { + CUSTOM_PARTITION = 0; + SINGLE_PARTITION = 1; + ROUND_ROBIN_PARTITION = 2; +} + message Resources { double cpu = 1; int64 ram = 2; @@ -110,6 +129,13 @@ message ProducerSpec { bool useThreadLocalProducers = 3; CryptoSpec cryptoSpec = 4; string batchBuilder = 5; + bool batchingDisabled = 6; + bool chunkingEnabled = 7; + bool blockIfQueueFullDisabled = 8; + CompressionType compressionType = 9; + HashingScheme hashingScheme = 10; + MessageRoutingMode messageRoutingMode = 11; + int64 batchingMaxPublishDelay = 12; } message CryptoSpec { diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java index c6a3cada08520..aaccf0c1e8aeb 100644 --- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java +++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java @@ -944,6 +944,11 @@ public static FunctionConfig validateUpdate(FunctionConfig existingConfig, Funct if (!StringUtils.isEmpty(newConfig.getCustomRuntimeOptions())) { mergedConfig.setCustomRuntimeOptions(newConfig.getCustomRuntimeOptions()); } + if(existingConfig.getProducerConfig() != null){ + mergedConfig.setProducerConfig(existingConfig.getProducerConfig().merge(newConfig.getProducerConfig())); + } else { + mergedConfig.setProducerConfig(newConfig.getProducerConfig()); + } return mergedConfig; } } diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/ProducerConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/ProducerConfigUtils.java index 8e73ff80bbb67..27fd8c9ca700d 100644 --- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/ProducerConfigUtils.java +++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/ProducerConfigUtils.java @@ -21,6 +21,8 @@ import org.apache.pulsar.common.functions.ProducerConfig; import org.apache.pulsar.functions.proto.Function; +import org.apache.pulsar.functions.utils.functions.ProducerConfigFromProtobufConverter; +import org.apache.pulsar.functions.utils.functions.ProducerConfigToProtobufConverter; public class ProducerConfigUtils { public static Function.ProducerSpec convert(ProducerConfig conf) { @@ -37,7 +39,14 @@ public static Function.ProducerSpec convert(ProducerConfig conf) { if (conf.getBatchBuilder() != null) { pbldr.setBatchBuilder(conf.getBatchBuilder()); } - + pbldr.setBatchingDisabled(conf.getBatchingDisabled()); + pbldr.setChunkingEnabled(conf.getChunkingEnabled()); + pbldr.setBlockIfQueueFullDisabled(conf.getBlockIfQueueFullDisabled()); + pbldr.setBatchingMaxPublishDelay(conf.getBatchingMaxPublishDelay()); + ProducerConfigToProtobufConverter converter = new ProducerConfigToProtobufConverter(conf); + pbldr.setCompressionType(converter.getCompressionType()); + pbldr.setHashingScheme(converter.getHashingScheme()); + pbldr.setMessageRoutingMode(converter.getMessageRoutingMode()); return pbldr.build(); } @@ -53,6 +62,23 @@ public static ProducerConfig convertFromSpec(Function.ProducerSpec spec) { producerConfig.setBatchBuilder(spec.getBatchBuilder()); } producerConfig.setUseThreadLocalProducers(spec.getUseThreadLocalProducers()); + + producerConfig.setBatchingDisabled(spec.getBatchingDisabled()); + producerConfig.setChunkingEnabled(spec.getChunkingEnabled()); + producerConfig.setBlockIfQueueFullDisabled(spec.getBlockIfQueueFullDisabled()); + producerConfig.setBatchingMaxPublishDelay(spec.getBatchingMaxPublishDelay()); + + ProducerConfigFromProtobufConverter converter = new ProducerConfigFromProtobufConverter(spec); + if (spec.getCompressionType() != null){ + producerConfig.setCompressionType(converter.getCompressionType()); + } + if (spec.getHashingScheme() != null){ + producerConfig.setHashingScheme(converter.getHashingScheme()); + } + if (spec.getMessageRoutingMode() != null){ + producerConfig.setMessageRoutingMode(converter.getMessageRoutingMode()); + } + return producerConfig; } } diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/functions/ProducerConfigFromProtobufConverter.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/functions/ProducerConfigFromProtobufConverter.java new file mode 100644 index 0000000000000..b83d55b7ee933 --- /dev/null +++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/functions/ProducerConfigFromProtobufConverter.java @@ -0,0 +1,127 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.functions.utils.functions; + +import lombok.AllArgsConstructor; +import lombok.Getter; +import org.apache.pulsar.client.api.CompressionType; +import org.apache.pulsar.client.api.HashingScheme; +import org.apache.pulsar.client.api.MessageRoutingMode; +import org.apache.pulsar.common.functions.CryptoConfig; +import org.apache.pulsar.common.functions.ProducerConfig; +import org.apache.pulsar.functions.proto.Function; +import org.apache.pulsar.functions.utils.CryptoUtils; + +@Getter +@AllArgsConstructor +public class ProducerConfigFromProtobufConverter { + private Function.ProducerSpec spec; + + public CompressionType getCompressionType() { + Function.CompressionType compressionType = spec.getCompressionType(); + switch (compressionType){ + case NONE: + return CompressionType.NONE; + case ZLIB: + return CompressionType.ZLIB; + case ZSTD: + return CompressionType.ZSTD; + case SNAPPY: + return CompressionType.SNAPPY; + case LZ4: + default: + return CompressionType.LZ4; + } + } + public HashingScheme getHashingScheme() { + Function.HashingScheme hashingScheme = spec.getHashingScheme(); + switch (hashingScheme){ + case JAVA_STRING_HASH: + return HashingScheme.JavaStringHash; + case MURMUR3_32HASH: + default: + return HashingScheme.Murmur3_32Hash; + } + } + public MessageRoutingMode getMessageRoutingMode() { + Function.MessageRoutingMode messageRoutingMode = spec.getMessageRoutingMode(); + switch (messageRoutingMode){ + case SINGLE_PARTITION: + return MessageRoutingMode.SinglePartition; + case ROUND_ROBIN_PARTITION: + return MessageRoutingMode.RoundRobinPartition; + case CUSTOM_PARTITION: + default: + return MessageRoutingMode.CustomPartition; + + } + } + + public boolean getBatchingDisabled(){ + return spec.getBatchingDisabled(); + } + public boolean getBatchingEnabled(){ + return !this.getBatchingDisabled(); + } + public boolean getChunkingEnabled(){ + return spec.getChunkingEnabled(); + } + public boolean getChunkingDisabled(){ + return !this.getChunkingEnabled(); + } + public boolean getBlockIfQueueFullDisabled(){ + return spec.getBlockIfQueueFullDisabled(); + } + public boolean getBlockIfQueueFullEnabled(){ + return !this.getBlockIfQueueFullDisabled(); + } + public long getBatchingMaxPublishDelay(){ + return spec.getBatchingMaxPublishDelay() == 0L ? 10L : spec.getBatchingMaxPublishDelay(); + } + public int getMaxPendingMessages(){ + return spec.getMaxPendingMessages(); + } + public int getMaxPendingMessagesAcrossPartitions(){ + return spec.getMaxPendingMessagesAcrossPartitions(); + } + public boolean getUseThreadLocalProducers(){ + return spec.getUseThreadLocalProducers(); + } + public CryptoConfig getCryptoConfig(){ + return CryptoUtils.convertFromSpec(spec.getCryptoSpec()); + } + + + public ProducerConfig getProducerConfig(){ + ProducerConfig.ProducerConfigBuilder builder = ProducerConfig + .builder() + .batchingDisabled(this.getBatchingDisabled()) + .chunkingEnabled(this.getChunkingEnabled()) + .blockIfQueueFullDisabled(this.getBlockIfQueueFullDisabled()) + .compressionType(this.getCompressionType()) + .hashingScheme(this.getHashingScheme()) + .messageRoutingMode(this.getMessageRoutingMode()) + .batchingMaxPublishDelay(this.getBatchingMaxPublishDelay()) + .maxPendingMessages(this.getMaxPendingMessages()) + .maxPendingMessagesAcrossPartitions(this.getMaxPendingMessagesAcrossPartitions()) + .useThreadLocalProducers(this.getUseThreadLocalProducers()) + .cryptoConfig(this.getCryptoConfig()); + return builder.build(); + } +} diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/functions/ProducerConfigToProtobufConverter.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/functions/ProducerConfigToProtobufConverter.java new file mode 100644 index 0000000000000..73df85f3d2db0 --- /dev/null +++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/functions/ProducerConfigToProtobufConverter.java @@ -0,0 +1,87 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.functions.utils.functions; + +import lombok.AllArgsConstructor; +import lombok.Getter; +import org.apache.pulsar.client.api.CompressionType; +import org.apache.pulsar.client.api.HashingScheme; +import org.apache.pulsar.client.api.MessageRoutingMode; +import org.apache.pulsar.common.functions.ProducerConfig; +import org.apache.pulsar.functions.proto.Function; + +@Getter +@AllArgsConstructor +public class ProducerConfigToProtobufConverter { + private ProducerConfig config; + + public Function.CompressionType getCompressionType() { + CompressionType compressionType = config.getCompressionType(); + switch (compressionType){ + case NONE: + return Function.CompressionType.NONE; + case ZLIB: + return Function.CompressionType.ZLIB; + case ZSTD: + return Function.CompressionType.ZSTD; + case SNAPPY: + return Function.CompressionType.SNAPPY; + default: + return Function.CompressionType.LZ4; + } + } + public Function.HashingScheme getHashingScheme() { + HashingScheme hashingScheme = config.getHashingScheme(); + switch (hashingScheme){ + case JavaStringHash: + return Function.HashingScheme.JAVA_STRING_HASH; + default: + return Function.HashingScheme.MURMUR3_32HASH; + } + } + public Function.MessageRoutingMode getMessageRoutingMode() { + MessageRoutingMode messageRoutingMode = config.getMessageRoutingMode(); + switch (messageRoutingMode){ + case SinglePartition: + return Function.MessageRoutingMode.SINGLE_PARTITION; + case RoundRobinPartition: + return Function.MessageRoutingMode.ROUND_ROBIN_PARTITION; + default: + return Function.MessageRoutingMode.CUSTOM_PARTITION; + } + } + public boolean getBatchingDisabled(){ + return config.getBatchingDisabled(); + } + public boolean getBatchingEnabled(){ + return !this.getBatchingDisabled(); + } + public boolean getChunkingDisabled(){ + return config.getChunkingEnabled(); + } + public boolean getBlockIfQueueFullDisabled(){ + return config.getBlockIfQueueFullDisabled(); + } + public boolean getBlockIfQueueFullEnabled(){ + return !this.getBlockIfQueueFullDisabled(); + } + public long getBatchingMaxPublishDelay(){ + return config.getBatchingMaxPublishDelay(); + } +} diff --git a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/FunctionConfigUtilsTest.java b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/FunctionConfigUtilsTest.java index 03c45bd4bd854..ad8121e3fcbd5 100644 --- a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/FunctionConfigUtilsTest.java +++ b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/FunctionConfigUtilsTest.java @@ -20,6 +20,9 @@ import com.google.gson.Gson; +import org.apache.pulsar.client.api.CompressionType; +import org.apache.pulsar.client.api.HashingScheme; +import org.apache.pulsar.client.api.MessageRoutingMode; import org.apache.pulsar.client.api.SubscriptionInitialPosition; import com.google.protobuf.InvalidProtocolBufferException; import com.google.protobuf.util.JsonFormat; @@ -41,6 +44,7 @@ import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertNull; +import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertTrue; /** @@ -76,6 +80,13 @@ public void testConvertBackFidelity() { producerConfig.setMaxPendingMessagesAcrossPartitions(1000); producerConfig.setUseThreadLocalProducers(true); producerConfig.setBatchBuilder("DEFAULT"); + producerConfig.setBatchingDisabled(false); + producerConfig.setChunkingEnabled(false); + producerConfig.setBlockIfQueueFullDisabled(false); + producerConfig.setCompressionType(CompressionType.LZ4); + producerConfig.setHashingScheme(HashingScheme.Murmur3_32Hash); + producerConfig.setMessageRoutingMode(MessageRoutingMode.CustomPartition); + producerConfig.setBatchingMaxPublishDelay(12L); functionConfig.setProducerConfig(producerConfig); Function.FunctionDetails functionDetails = FunctionConfigUtils.convert(functionConfig, null); FunctionConfig convertedConfig = FunctionConfigUtils.convertFromDetails(functionDetails); @@ -117,6 +128,13 @@ public void testConvertWindow() { producerConfig.setMaxPendingMessagesAcrossPartitions(1000); producerConfig.setUseThreadLocalProducers(true); producerConfig.setBatchBuilder("KEY_BASED"); + producerConfig.setBatchingDisabled(false); + producerConfig.setChunkingEnabled(false); + producerConfig.setBlockIfQueueFullDisabled(false); + producerConfig.setCompressionType(CompressionType.LZ4); + producerConfig.setHashingScheme(HashingScheme.Murmur3_32Hash); + producerConfig.setMessageRoutingMode(MessageRoutingMode.CustomPartition); + producerConfig.setBatchingMaxPublishDelay(12L); functionConfig.setProducerConfig(producerConfig); Function.FunctionDetails functionDetails = FunctionConfigUtils.convert(functionConfig, null); FunctionConfig convertedConfig = FunctionConfigUtils.convertFromDetails(functionDetails); @@ -459,6 +477,31 @@ private FunctionConfig createFunctionConfig() { functionConfig.setWindowConfig(new WindowConfig().setWindowLengthCount(10)); functionConfig.setCleanupSubscription(true); functionConfig.setRuntimeFlags("-Dfoo=bar"); + + ProducerConfig producerConfig = new ProducerConfig(); + producerConfig.setBatchingDisabled(false); + producerConfig.setChunkingEnabled(false); + producerConfig.setBlockIfQueueFullDisabled(false); + producerConfig.setCompressionType(CompressionType.SNAPPY); + producerConfig.setHashingScheme(HashingScheme.Murmur3_32Hash); + producerConfig.setMessageRoutingMode(MessageRoutingMode.RoundRobinPartition); + producerConfig.setBatchingMaxPublishDelay(21L); + + functionConfig.setProducerConfig(producerConfig); + return functionConfig; + } + + private FunctionConfig createUpdatedFunctionConfigDefaults(String fieldName, Object fieldValue) { + FunctionConfig functionConfig = createFunctionConfig(); + ProducerConfig producerConfig = functionConfig.getProducerConfig(); + Class fClass = ProducerConfig.class; + try { + Field chap = fClass.getDeclaredField(fieldName); + chap.setAccessible(true); + chap.set(producerConfig, fieldValue); + } catch (Exception e) { + throw new RuntimeException("Something wrong with the test", e); + } return functionConfig; } @@ -573,4 +616,121 @@ public void testMergeDifferentOutputSchemaTypes() { FunctionConfig newFunctionConfig = createUpdatedFunctionConfig("outputSchemaType", "avro"); FunctionConfigUtils.validateUpdate(functionConfig, newFunctionConfig); } + + @Test + public void testMergeFunctionDefaults_validateUpdate_whenIgnoringExistingDefaults_ExistingConfigsAreNull(){ + FunctionConfig existingFunctionConfig = createFunctionConfig(); + ProducerConfig existingProducerConfig = new ProducerConfig(); + existingProducerConfig.setBatchingDisabled(false); + existingFunctionConfig.setProducerConfig(existingProducerConfig); + + ProducerConfig newProducerConfig = new ProducerConfig(); + newProducerConfig.setBatchingDisabled(true); + FunctionConfig newFunctionConfig = createFunctionConfig(); + newFunctionConfig.setProducerConfig(newProducerConfig); + + FunctionConfig mergedConfig = FunctionConfigUtils + .validateUpdate(existingFunctionConfig, newFunctionConfig); + + assertNotNull(mergedConfig.getProducerConfig()); + assertNotNull(existingFunctionConfig.getProducerConfig().getBatchingDisabled()); + assertNotNull(newFunctionConfig.getProducerConfig().getBatchingDisabled()); + assertEquals((boolean)existingFunctionConfig.getProducerConfig().getBatchingDisabled(), false); + assertEquals((boolean)mergedConfig.getProducerConfig().getBatchingDisabled(), true); + assertNull(mergedConfig.getProducerConfig().getBlockIfQueueFullDisabled()); + assertNull(mergedConfig.getProducerConfig().getChunkingEnabled()); + assertNull(mergedConfig.getProducerConfig().getCompressionType()); + assertNull(mergedConfig.getProducerConfig().getHashingScheme()); + assertNull(mergedConfig.getProducerConfig().getMessageRoutingMode()); + assertNull(mergedConfig.getProducerConfig().getBatchingMaxPublishDelay()); + } + @Test + public void testMergeFunctionDefaults_validateUpdate_whenNotIgnoringExistingDefaults_ExistingConfigsOverride(){ + FunctionConfig existingFunctionConfig = createFunctionConfig(); + + ProducerConfig newProducerConfig = new ProducerConfig(); + newProducerConfig.setBatchingDisabled(true); + FunctionConfig newFunctionConfig = createFunctionConfig(); + newFunctionConfig.setProducerConfig(newProducerConfig); + + FunctionConfig mergedConfig = FunctionConfigUtils + .validateUpdate(existingFunctionConfig, newFunctionConfig); + + assertNotNull(mergedConfig.getProducerConfig()); + assertNotNull(existingFunctionConfig.getProducerConfig().getBatchingDisabled()); + assertNotNull(newFunctionConfig.getProducerConfig().getBatchingDisabled()); + assertEquals((boolean)existingFunctionConfig.getProducerConfig().getBatchingDisabled(), false); + assertEquals((boolean)mergedConfig.getProducerConfig().getBatchingDisabled(), true); + assertNotNull(mergedConfig.getProducerConfig().getBlockIfQueueFullDisabled()); + assertNotNull(mergedConfig.getProducerConfig().getChunkingEnabled()); + assertNotNull(mergedConfig.getProducerConfig().getCompressionType()); + assertNotNull(mergedConfig.getProducerConfig().getHashingScheme()); + assertNotNull(mergedConfig.getProducerConfig().getMessageRoutingMode()); + assertNotNull(mergedConfig.getProducerConfig().getBatchingMaxPublishDelay()); + + assertEquals((boolean)mergedConfig.getProducerConfig().getBlockIfQueueFullDisabled(), false); + assertEquals((boolean)mergedConfig.getProducerConfig().getChunkingEnabled(), false); + assertEquals(mergedConfig.getProducerConfig().getCompressionType(), CompressionType.SNAPPY); + assertEquals(mergedConfig.getProducerConfig().getHashingScheme(), HashingScheme.Murmur3_32Hash); + assertEquals(mergedConfig.getProducerConfig().getMessageRoutingMode(), MessageRoutingMode.RoundRobinPartition); + assertEquals((long)mergedConfig.getProducerConfig().getBatchingMaxPublishDelay(), 21L); + } + @Test + public void testMergeFunctionDefaults_overrideBatchingDisabled(){ + FunctionConfig existingFunctionConfig = createFunctionConfig(); + FunctionConfig newFunctionConfig = createUpdatedFunctionConfigDefaults("batchingDisabled", true); + FunctionConfig mergedConfig = FunctionConfigUtils + .validateUpdate(existingFunctionConfig, newFunctionConfig); + assertEquals((boolean)mergedConfig.getProducerConfig().getBatchingDisabled(), true); + } + @Test + public void testMergeFunctionDefaults_overrideChunkingEnabled(){ + FunctionConfig existingFunctionConfig = createFunctionConfig(); + FunctionConfig newFunctionConfig = createUpdatedFunctionConfigDefaults("chunkingEnabled", true); + FunctionConfig mergedConfig = FunctionConfigUtils + .validateUpdate(existingFunctionConfig, newFunctionConfig); + assertEquals((boolean)mergedConfig.getProducerConfig().getChunkingEnabled(), true); + } + @Test + public void testMergeFunctionDefaults_overrideBlockIfQueueFullDisabled(){ + FunctionConfig existingFunctionConfig = createFunctionConfig(); + FunctionConfig newFunctionConfig = createUpdatedFunctionConfigDefaults("blockIfQueueFullDisabled", true); + FunctionConfig mergedConfig = FunctionConfigUtils + .validateUpdate(existingFunctionConfig, newFunctionConfig); + assertEquals((boolean)mergedConfig.getProducerConfig().getBlockIfQueueFullDisabled(), true); + } + @Test + public void testMergeFunctionDefaults_overrideCompressionType(){ + FunctionConfig existingFunctionConfig = createFunctionConfig(); + FunctionConfig newFunctionConfig = createUpdatedFunctionConfigDefaults("compressionType", CompressionType.ZLIB); + FunctionConfig mergedConfig = FunctionConfigUtils + .validateUpdate(existingFunctionConfig, newFunctionConfig); + assertEquals(mergedConfig.getProducerConfig().getCompressionType(), CompressionType.ZLIB); + } + @Test + public void testMergeFunctionDefaults_overrideHashingScheme(){ + FunctionConfig existingFunctionConfig = createFunctionConfig(); + FunctionConfig newFunctionConfig = createUpdatedFunctionConfigDefaults("hashingScheme", HashingScheme.JavaStringHash); + FunctionConfig mergedConfig = FunctionConfigUtils + .validateUpdate(existingFunctionConfig, newFunctionConfig); + assertEquals(mergedConfig.getProducerConfig().getHashingScheme(), HashingScheme.JavaStringHash); + } + @Test + public void testMergeFunctionDefaults_overrideMessageRoutingMode(){ + FunctionConfig existingFunctionConfig = createFunctionConfig(); + FunctionConfig newFunctionConfig = createUpdatedFunctionConfigDefaults("messageRoutingMode", + MessageRoutingMode.SinglePartition); + FunctionConfig mergedConfig = FunctionConfigUtils + .validateUpdate(existingFunctionConfig, newFunctionConfig); + assertEquals(mergedConfig.getProducerConfig().getMessageRoutingMode(), + MessageRoutingMode.SinglePartition); + } + @Test + public void testMergeFunctionDefaults_overrideBatchingMaxPublishDelay(){ + FunctionConfig existingFunctionConfig = createFunctionConfig(); + FunctionConfig newFunctionConfig = createUpdatedFunctionConfigDefaults("batchingMaxPublishDelay", 5L); + FunctionConfig mergedConfig = FunctionConfigUtils + .validateUpdate(existingFunctionConfig, newFunctionConfig); + assertEquals((long)mergedConfig.getProducerConfig().getBatchingMaxPublishDelay(), 5L); + } } diff --git a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SourceConfigUtilsTest.java b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SourceConfigUtilsTest.java index 20a64f86b9ebd..ae539d0a2d0e1 100644 --- a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SourceConfigUtilsTest.java +++ b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SourceConfigUtilsTest.java @@ -22,6 +22,9 @@ import lombok.Data; import lombok.NoArgsConstructor; import lombok.experimental.Accessors; +import org.apache.pulsar.client.api.CompressionType; +import org.apache.pulsar.client.api.HashingScheme; +import org.apache.pulsar.client.api.MessageRoutingMode; import org.apache.pulsar.common.functions.FunctionConfig; import org.apache.pulsar.common.functions.ProducerConfig; import org.apache.pulsar.common.functions.Resources; @@ -367,6 +370,13 @@ private SourceConfig createSourceConfig() { configs.put("consumerConfigProperties", consumerConfigs); ProducerConfig producerConfig = new ProducerConfig(); + producerConfig.setBatchingDisabled(true); + producerConfig.setChunkingEnabled(false); + producerConfig.setBlockIfQueueFullDisabled(true); + producerConfig.setCompressionType(CompressionType.SNAPPY); + producerConfig.setHashingScheme(HashingScheme.JavaStringHash); + producerConfig.setMessageRoutingMode(MessageRoutingMode.RoundRobinPartition); + producerConfig.setBatchingMaxPublishDelay(12L); producerConfig.setMaxPendingMessages(100); producerConfig.setMaxPendingMessagesAcrossPartitions(1000); producerConfig.setUseThreadLocalProducers(true); diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplTest.java index 03c1a149ed8c8..3311b196bb5ef 100644 --- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplTest.java +++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplTest.java @@ -25,7 +25,11 @@ import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.admin.Tenants; +import org.apache.pulsar.client.api.CompressionType; +import org.apache.pulsar.client.api.HashingScheme; +import org.apache.pulsar.client.api.MessageRoutingMode; import org.apache.pulsar.common.functions.FunctionConfig; +import org.apache.pulsar.common.functions.ProducerConfig; import org.apache.pulsar.common.policies.data.FunctionStats; import org.apache.pulsar.common.policies.data.TenantInfo; import org.apache.pulsar.functions.api.Context; @@ -359,6 +363,17 @@ public static FunctionConfig createDefaultFunctionConfig() { functionConfig.setOutput(outputTopic); functionConfig.setOutputSerdeClassName(outputSerdeClassName); functionConfig.setRuntime(FunctionConfig.Runtime.JAVA); + + ProducerConfig producerConfig = new ProducerConfig(); + producerConfig.setBatchingDisabled(false); + producerConfig.setChunkingEnabled(false); + producerConfig.setBlockIfQueueFullDisabled(false); + producerConfig.setCompressionType(CompressionType.SNAPPY); + producerConfig.setHashingScheme(HashingScheme.Murmur3_32Hash); + producerConfig.setMessageRoutingMode(MessageRoutingMode.RoundRobinPartition); + producerConfig.setBatchingMaxPublishDelay(21L); + + functionConfig.setProducerConfig(producerConfig); return functionConfig; }