Skip to content

Commit

Permalink
[improve][broker] Exclude system topics from namespace level publish …
Browse files Browse the repository at this point in the history
…and dispatch rate limiting (apache#23589)

(cherry picked from commit 9bcbb20)
(cherry picked from commit aa4dbf3)
  • Loading branch information
poorbarcode authored and nikhil-ctds committed Nov 13, 2024
1 parent 5695a17 commit 8fb4d2c
Show file tree
Hide file tree
Showing 4 changed files with 80 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,10 @@ protected void updateTopicPolicyByNamespacePolicy(Policies namespacePolicies) {
if (log.isDebugEnabled()) {
log.debug("[{}]updateTopicPolicyByNamespacePolicy,data={}", topic, namespacePolicies);
}
if (!isSystemTopic()) {
updateNamespacePublishRate(namespacePolicies, brokerService.getPulsar().getConfig().getClusterName());
updateNamespaceDispatchRate(namespacePolicies, brokerService.getPulsar().getConfig().getClusterName());
}
topicPolicies.getRetentionPolicies().updateNamespaceValue(namespacePolicies.retention_policies);
topicPolicies.getCompactionThreshold().updateNamespaceValue(namespacePolicies.compaction_threshold);
topicPolicies.getReplicationClusters().updateNamespaceValue(
Expand All @@ -281,7 +285,6 @@ protected void updateTopicPolicyByNamespacePolicy(Policies namespacePolicies) {
topicPolicies.getDeduplicationEnabled().updateNamespaceValue(namespacePolicies.deduplicationEnabled);
topicPolicies.getDeduplicationSnapshotIntervalSeconds().updateNamespaceValue(
namespacePolicies.deduplicationSnapshotIntervalSeconds);
updateNamespacePublishRate(namespacePolicies, brokerService.getPulsar().getConfig().getClusterName());
topicPolicies.getDelayedDeliveryEnabled().updateNamespaceValue(
Optional.ofNullable(namespacePolicies.delayed_delivery_policies)
.map(DelayedDeliveryPolicies::isActive).orElse(null));
Expand All @@ -299,7 +302,6 @@ protected void updateTopicPolicyByNamespacePolicy(Policies namespacePolicies) {
updateNamespaceSubscriptionDispatchRate(namespacePolicies,
brokerService.getPulsar().getConfig().getClusterName());
updateSchemaCompatibilityStrategyNamespaceValue(namespacePolicies);
updateNamespaceDispatchRate(namespacePolicies, brokerService.getPulsar().getConfig().getClusterName());
topicPolicies.getSchemaValidationEnforced().updateNamespaceValue(namespacePolicies.schema_validation_enforced);
topicPolicies.getEntryFilters().updateNamespaceValue(namespacePolicies.entryFilters);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,19 @@
package org.apache.pulsar.broker.service.persistent;

import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import javax.annotation.Nonnull;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.PublishRateLimiter;
import org.apache.pulsar.broker.service.plugin.EntryFilter;
import org.apache.pulsar.common.naming.SystemTopicNames;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.EntryFilters;
import org.apache.pulsar.common.policies.data.Policies;

public class SystemTopic extends PersistentTopic {

Expand Down Expand Up @@ -111,4 +115,19 @@ public EntryFilters getEntryFiltersPolicy() {
public List<EntryFilter> getEntryFilters() {
return null;
}

@Override
public PublishRateLimiter getBrokerPublishRateLimiter() {
return PublishRateLimiter.DISABLED_RATE_LIMITER;
}

@Override
public void updateResourceGroupLimiter(@Nonnull Policies namespacePolicies) {
// nothing todo.
}

@Override
public Optional<DispatchRateLimiter> getBrokerDispatchRateLimiter() {
return Optional.empty();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.reflect.FieldUtils;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.common.policies.data.PublishRate;
import org.apache.pulsar.common.util.RateLimiter;
Expand Down Expand Up @@ -115,6 +117,22 @@ public void testProducerBlockedByPrecisTopicPublishRateLimiting() throws Excepti
super.internalCleanup();
}

@Test
public void testSystemTopicPublishNonBlock() throws Exception {
super.baseSetup();
PublishRate publishRate = new PublishRate(1,10);
admin.namespaces().setPublishRate("prop/ns-abc", publishRate);
final String topic = BrokerTestUtil.newUniqueName("persistent://prop/ns-abc/tp");
PulsarAdmin admin1 = PulsarAdmin.builder().serviceHttpUrl(brokerUrl != null
? brokerUrl.toString() : brokerUrlTls.toString()).readTimeout(5, TimeUnit.SECONDS).build();
admin1.topics().createNonPartitionedTopic(topic);
admin1.topicPolicies().setDeduplicationStatus(topic, true);
admin1.topicPolicies().setDeduplicationStatus(topic, false);
// cleanup.
admin.namespaces().removePublishRate("prop/ns-abc");
admin1.close();
}

@Test
public void testPrecisTopicPublishRateLimitingProduceRefresh() throws Exception {
PublishRate publishRate = new PublishRate(1,10);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
Expand All @@ -48,6 +49,8 @@
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.DispatchRate;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.PublishRate;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.impl.DispatchRateImpl;
import org.awaitility.Awaitility;
import org.slf4j.Logger;
Expand Down Expand Up @@ -211,6 +214,42 @@ public void testMessageRateDynamicallyChange() throws Exception {
producer.close();
}

@SuppressWarnings("deprecation")
@Test
public void testSystemTopicDeliveryNonBlock() throws Exception {
final String namespace = "my-property/throttling_ns";
admin.namespaces().createNamespace(namespace, Sets.newHashSet("test"));
final String topicName = "persistent://" + namespace + "/" + UUID.randomUUID().toString().replaceAll("-", "");
admin.topics().createNonPartitionedTopic(topicName);
// Set a rate limitation.
DispatchRate dispatchRate = DispatchRate.builder()
.dispatchThrottlingRateInMsg(1)
.dispatchThrottlingRateInByte(-1)
.ratePeriodInSecond(360)
.build();
admin.namespaces().setDispatchRate(namespace, dispatchRate);

// Verify the limitation does not take effect. in other words, the topic policies should takes effect.
PersistentTopic persistentTopic =
(PersistentTopic) pulsar.getBrokerService().getTopic(topicName, false).join().get();
admin.topicPolicies().setPublishRate(topicName, new PublishRate(1000, 1000));
Awaitility.await().untilAsserted(() -> {
assertNotNull(persistentTopic.getHierarchyTopicPolicies().getPublishRate().getTopicValue());
});
admin.topicPolicies().setRetention(topicName, new RetentionPolicies(1000, 1000));
Awaitility.await().untilAsserted(() -> {
assertNotNull(persistentTopic.getHierarchyTopicPolicies().getRetentionPolicies().getTopicValue());
});
admin.topicPolicies().setMessageTTL(topicName, 1000);
Awaitility.await().untilAsserted(() -> {
assertNotNull(persistentTopic.getHierarchyTopicPolicies().getMessageTTLInSeconds().getTopicValue());
});

// cleanup.
admin.topics().delete(topicName);
admin.namespaces().removeDispatchRate(namespace);
}

/**
* verify: consumer should not receive all messages due to message-rate throttling
*
Expand Down

0 comments on commit 8fb4d2c

Please sign in to comment.