diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java index 96ea2004be8d7..69a38bc50de9d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java @@ -283,6 +283,10 @@ protected void updateTopicPolicyByNamespacePolicy(Policies namespacePolicies) { if (log.isDebugEnabled()) { log.debug("[{}]updateTopicPolicyByNamespacePolicy,data={}", topic, namespacePolicies); } + if (!isSystemTopic()) { + updateNamespacePublishRate(namespacePolicies, brokerService.getPulsar().getConfig().getClusterName()); + updateNamespaceDispatchRate(namespacePolicies, brokerService.getPulsar().getConfig().getClusterName()); + } topicPolicies.getRetentionPolicies().updateNamespaceValue(namespacePolicies.retention_policies); topicPolicies.getCompactionThreshold().updateNamespaceValue(namespacePolicies.compaction_threshold); topicPolicies.getReplicationClusters().updateNamespaceValue( @@ -305,7 +309,6 @@ protected void updateTopicPolicyByNamespacePolicy(Policies namespacePolicies) { topicPolicies.getDeduplicationEnabled().updateNamespaceValue(namespacePolicies.deduplicationEnabled); topicPolicies.getDeduplicationSnapshotIntervalSeconds().updateNamespaceValue( namespacePolicies.deduplicationSnapshotIntervalSeconds); - updateNamespacePublishRate(namespacePolicies, brokerService.getPulsar().getConfig().getClusterName()); topicPolicies.getDelayedDeliveryEnabled().updateNamespaceValue( Optional.ofNullable(namespacePolicies.delayed_delivery_policies) .map(DelayedDeliveryPolicies::isActive).orElse(null)); @@ -326,7 +329,6 @@ protected void updateTopicPolicyByNamespacePolicy(Policies namespacePolicies) { updateNamespaceSubscriptionDispatchRate(namespacePolicies, brokerService.getPulsar().getConfig().getClusterName()); updateSchemaCompatibilityStrategyNamespaceValue(namespacePolicies); - updateNamespaceDispatchRate(namespacePolicies, brokerService.getPulsar().getConfig().getClusterName()); topicPolicies.getSchemaValidationEnforced().updateNamespaceValue(namespacePolicies.schema_validation_enforced); topicPolicies.getEntryFilters().updateNamespaceValue(namespacePolicies.entryFilters); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/DisabledPublishRateLimiter.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/DisabledPublishRateLimiter.java new file mode 100644 index 0000000000000..372918e9d3209 --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/DisabledPublishRateLimiter.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.service; + +import org.apache.pulsar.common.policies.data.Policies; +import org.apache.pulsar.common.policies.data.PublishRate; + +public class DisabledPublishRateLimiter implements PublishRateLimiter { + + public static final DisabledPublishRateLimiter INSTANCE = new DisabledPublishRateLimiter(); + + private DisabledPublishRateLimiter() {} + + @Override + public void handlePublishThrottling(Producer producer, int numOfMessages, long msgSizeInBytes) { + + } + + @Override + public void update(Policies policies, String clusterName) { + + } + + @Override + public void update(PublishRate maxPublishRate) { + + } +} diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SystemTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SystemTopic.java index f2cec2138a3a0..8feb432a08001 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SystemTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SystemTopic.java @@ -19,15 +19,20 @@ package org.apache.pulsar.broker.service.persistent; import java.util.List; +import java.util.Optional; import java.util.concurrent.CompletableFuture; +import javax.annotation.Nonnull; import org.apache.bookkeeper.mledger.ManagedLedger; import org.apache.pulsar.broker.PulsarServerException; import org.apache.pulsar.broker.namespace.NamespaceService; import org.apache.pulsar.broker.service.BrokerService; +import org.apache.pulsar.broker.service.DisabledPublishRateLimiter; +import org.apache.pulsar.broker.service.PublishRateLimiter; import org.apache.pulsar.broker.service.plugin.EntryFilter; import org.apache.pulsar.common.naming.SystemTopicNames; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.EntryFilters; +import org.apache.pulsar.common.policies.data.Policies; public class SystemTopic extends PersistentTopic { @@ -111,4 +116,19 @@ public EntryFilters getEntryFiltersPolicy() { public List getEntryFilters() { return null; } + + @Override + public PublishRateLimiter getBrokerPublishRateLimiter() { + return DisabledPublishRateLimiter.INSTANCE; + } + + @Override + public void updateResourceGroupLimiter(@Nonnull Policies namespacePolicies) { + // nothing todo. + } + + @Override + public Optional getBrokerDispatchRateLimiter() { + return Optional.empty(); + } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicPublishRateThrottleTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicPublishRateThrottleTest.java index 721d049342552..554bc754c5c43 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicPublishRateThrottleTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicPublishRateThrottleTest.java @@ -20,6 +20,8 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import org.apache.pulsar.broker.BrokerTestUtil; +import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.common.policies.data.PublishRate; import org.apache.pulsar.broker.qos.AsyncTokenBucket; @@ -73,6 +75,22 @@ public void testProducerBlockedByPrecisTopicPublishRateLimiting() throws Excepti } } + @Test + public void testSystemTopicPublishNonBlock() throws Exception { + super.baseSetup(); + PublishRate publishRate = new PublishRate(1,10); + admin.namespaces().setPublishRate("prop/ns-abc", publishRate); + final String topic = BrokerTestUtil.newUniqueName("persistent://prop/ns-abc/tp"); + PulsarAdmin admin1 = PulsarAdmin.builder().serviceHttpUrl(brokerUrl != null + ? brokerUrl.toString() : brokerUrlTls.toString()).readTimeout(5, TimeUnit.SECONDS).build(); + admin1.topics().createNonPartitionedTopic(topic); + admin1.topicPolicies().setDeduplicationStatus(topic, true); + admin1.topicPolicies().setDeduplicationStatus(topic, false); + // cleanup. + admin.namespaces().removePublishRate("prop/ns-abc"); + admin1.close(); + } + @Test public void testPrecisTopicPublishRateLimitingProduceRefresh() throws Exception { PublishRate publishRate = new PublishRate(1,10); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MessageDispatchThrottlingTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MessageDispatchThrottlingTest.java index 360d27f64133d..a544c7e13bc83 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MessageDispatchThrottlingTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MessageDispatchThrottlingTest.java @@ -33,6 +33,7 @@ import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.UUID; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @@ -48,6 +49,8 @@ import org.apache.pulsar.common.policies.data.ClusterData; import org.apache.pulsar.common.policies.data.DispatchRate; import org.apache.pulsar.common.policies.data.Policies; +import org.apache.pulsar.common.policies.data.PublishRate; +import org.apache.pulsar.common.policies.data.RetentionPolicies; import org.apache.pulsar.common.policies.data.impl.DispatchRateImpl; import org.apache.pulsar.broker.qos.AsyncTokenBucket; import org.awaitility.Awaitility; @@ -214,6 +217,42 @@ public void testMessageRateDynamicallyChange() throws Exception { producer.close(); } + @SuppressWarnings("deprecation") + @Test + public void testSystemTopicDeliveryNonBlock() throws Exception { + final String namespace = "my-property/throttling_ns"; + admin.namespaces().createNamespace(namespace, Sets.newHashSet("test")); + final String topicName = "persistent://" + namespace + "/" + UUID.randomUUID().toString().replaceAll("-", ""); + admin.topics().createNonPartitionedTopic(topicName); + // Set a rate limitation. + DispatchRate dispatchRate = DispatchRate.builder() + .dispatchThrottlingRateInMsg(1) + .dispatchThrottlingRateInByte(-1) + .ratePeriodInSecond(360) + .build(); + admin.namespaces().setDispatchRate(namespace, dispatchRate); + + // Verify the limitation does not take effect. in other words, the topic policies should takes effect. + PersistentTopic persistentTopic = + (PersistentTopic) pulsar.getBrokerService().getTopic(topicName, false).join().get(); + admin.topicPolicies().setPublishRate(topicName, new PublishRate(1000, 1000)); + Awaitility.await().untilAsserted(() -> { + assertNotNull(persistentTopic.getHierarchyTopicPolicies().getPublishRate().getTopicValue()); + }); + admin.topicPolicies().setRetention(topicName, new RetentionPolicies(1000, 1000)); + Awaitility.await().untilAsserted(() -> { + assertNotNull(persistentTopic.getHierarchyTopicPolicies().getRetentionPolicies().getTopicValue()); + }); + admin.topicPolicies().setMessageTTL(topicName, 1000); + Awaitility.await().untilAsserted(() -> { + assertNotNull(persistentTopic.getHierarchyTopicPolicies().getMessageTTLInSeconds().getTopicValue()); + }); + + // cleanup. + admin.topics().delete(topicName); + admin.namespaces().removeDispatchRate(namespace); + } + /** * verify: consumer should not receive all messages due to message-rate throttling *