Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix][broker][branch-2.10] Fix inconsistent topic policy #21258

Merged
merged 12 commits into from
Oct 7, 2023

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,15 @@
*/
package org.apache.pulsar.broker.service;

import static java.util.Objects.requireNonNull;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -78,8 +80,8 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic

private final Map<NamespaceName, CompletableFuture<SystemTopicClient.Reader<PulsarEvent>>>
readerCaches = new ConcurrentHashMap<>();
@VisibleForTesting
final Map<NamespaceName, Boolean> policyCacheInitMap = new ConcurrentHashMap<>();

final Map<NamespaceName, CompletableFuture<Void>> policyCacheInitMap = new ConcurrentHashMap<>();

@VisibleForTesting
final Map<TopicName, List<TopicPolicyListener<TopicPolicies>>> listeners = new ConcurrentHashMap<>();
Expand Down Expand Up @@ -219,12 +221,12 @@ public TopicPolicies getTopicPolicies(TopicName topicName,
boolean isGlobal) throws TopicPoliciesCacheNotInitException {
if (!policyCacheInitMap.containsKey(topicName.getNamespaceObject())) {
NamespaceName namespace = topicName.getNamespaceObject();
prepareInitPoliciesCache(namespace, new CompletableFuture<>());
prepareInitPoliciesCacheAsync(namespace);
}

MutablePair<TopicPoliciesCacheNotInitException, TopicPolicies> result = new MutablePair<>();
policyCacheInitMap.compute(topicName.getNamespaceObject(), (k, initialized) -> {
if (initialized == null || !initialized) {
if (initialized == null || !initialized.isDone()) {
result.setLeft(new TopicPoliciesCacheNotInitException());
} else {
TopicPolicies topicPolicies =
Expand All @@ -242,6 +244,34 @@ public TopicPolicies getTopicPolicies(TopicName topicName,
}
}

@Nonnull
@Override
public CompletableFuture<Optional<TopicPolicies>> getTopicPoliciesAsync(@Nonnull TopicName topicName,
boolean isGlobal) {
requireNonNull(topicName);
final CompletableFuture<Void> preparedFuture = prepareInitPoliciesCacheAsync(topicName.getNamespaceObject());
return preparedFuture.thenApply(__ -> {
final TopicPolicies candidatePolicies = isGlobal
? globalPoliciesCache.get(TopicName.get(topicName.getPartitionedTopicName()))
: policiesCache.get(TopicName.get(topicName.getPartitionedTopicName()));
return Optional.ofNullable(candidatePolicies);
});
}

@Nonnull
@Override
public CompletableFuture<Optional<TopicPolicies>> getTopicPoliciesAsync(@Nonnull TopicName topicName) {
requireNonNull(topicName);
final CompletableFuture<Void> preparedFuture = prepareInitPoliciesCacheAsync(topicName.getNamespaceObject());
return preparedFuture.thenApply(__ -> {
final TopicPolicies localPolicies = policiesCache.get(TopicName.get(topicName.getPartitionedTopicName()));
if (localPolicies != null) {
return Optional.of(localPolicies);
}
return Optional.ofNullable(globalPoliciesCache.get(TopicName.get(topicName.getPartitionedTopicName())));
});
}

@Override
public TopicPolicies getTopicPoliciesIfExists(TopicName topicName) {
return policiesCache.get(TopicName.get(topicName.getPartitionedTopicName()));
Expand All @@ -265,40 +295,49 @@ public CompletableFuture<TopicPolicies> getTopicPoliciesBypassCacheAsync(TopicNa

@Override
public CompletableFuture<Void> addOwnedNamespaceBundleAsync(NamespaceBundle namespaceBundle) {
CompletableFuture<Void> result = new CompletableFuture<>();
NamespaceName namespace = namespaceBundle.getNamespaceObject();
if (NamespaceService.checkHeartbeatNamespace(namespace) != null
|| NamespaceService.checkHeartbeatNamespaceV2(namespace) != null) {
result.complete(null);
return result;
return CompletableFuture.completedFuture(null);
}
synchronized (this) {
if (readerCaches.get(namespace) != null) {
ownedBundlesCountPerNamespace.get(namespace).incrementAndGet();
result.complete(null);
return CompletableFuture.completedFuture(null);
} else {
prepareInitPoliciesCache(namespace, result);
return prepareInitPoliciesCacheAsync(namespace);
}
}
return result;
}

private void prepareInitPoliciesCache(@Nonnull NamespaceName namespace, CompletableFuture<Void> result) {
if (policyCacheInitMap.putIfAbsent(namespace, false) == null) {
CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> readerCompletableFuture =
private @Nonnull CompletableFuture<Void> prepareInitPoliciesCacheAsync(@Nonnull NamespaceName namespace) {
requireNonNull(namespace);
return policyCacheInitMap.computeIfAbsent(namespace, (k) -> {
final CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> readerCompletableFuture =
createSystemTopicClientWithRetry(namespace);
readerCaches.put(namespace, readerCompletableFuture);
ownedBundlesCountPerNamespace.putIfAbsent(namespace, new AtomicInteger(1));
readerCompletableFuture.thenAccept(reader -> {
initPolicesCache(reader, result);
result.thenRun(() -> readMorePolicies(reader));
}).exceptionally(ex -> {
log.error("[{}] Failed to create reader on __change_events topic", namespace, ex);
cleanCacheAndCloseReader(namespace, false);
result.completeExceptionally(ex);
final CompletableFuture<Void> initFuture = readerCompletableFuture
.thenCompose(reader -> {
final CompletableFuture<Void> stageFuture = new CompletableFuture<>();
initPolicesCache(reader, stageFuture);
return stageFuture
// Read policies in background
.thenAccept(__ -> readMorePoliciesAsync(reader));
});
initFuture.exceptionally(ex -> {
try {
log.error("[{}] Failed to create reader on __change_events topic", namespace, ex);
cleanCacheAndCloseReader(namespace, false);
} catch (Throwable cleanupEx) {
// Adding this catch to avoid break callback chain
log.error("[{}] Failed to cleanup reader on __change_events topic", namespace, cleanupEx);
}
return null;
});
}
// let caller know we've got an exception.
return initFuture;
});
}

protected CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> createSystemTopicClientWithRetry(
Expand Down Expand Up @@ -382,8 +421,7 @@ private void initPolicesCache(SystemTopicClient.Reader<PulsarEvent> reader, Comp
if (log.isDebugEnabled()) {
log.debug("[{}] Reach the end of the system topic.", reader.getSystemTopic().getTopicName());
}
policyCacheInitMap.computeIfPresent(
reader.getSystemTopic().getTopicName().getNamespaceObject(), (k, v) -> true);

// replay policy message
policiesCache.forEach(((topicName, topicPolicies) -> {
if (listeners.get(topicName) != null) {
Expand All @@ -396,6 +434,7 @@ private void initPolicesCache(SystemTopicClient.Reader<PulsarEvent> reader, Comp
}
}
}));

future.complete(null);
}
});
Expand All @@ -421,15 +460,21 @@ private void cleanCacheAndCloseReader(@Nonnull NamespaceName namespace, boolean
});
}

private void readMorePolicies(SystemTopicClient.Reader<PulsarEvent> reader) {
/**
* This is an async method for the background reader to continue syncing new messages.
*
* Note: You should not do any blocking call here. because it will affect
* #{@link SystemTopicBasedTopicPoliciesService#getTopicPoliciesAsync(TopicName)} method to block loading topic.
*/
private void readMorePoliciesAsync(SystemTopicClient.Reader<PulsarEvent> reader) {
reader.readNextAsync()
.thenAccept(msg -> {
refreshTopicPoliciesCache(msg);
notifyListener(msg);
})
.whenComplete((__, ex) -> {
if (ex == null) {
readMorePolicies(reader);
readMorePoliciesAsync(reader);
} else {
Throwable cause = FutureUtil.unwrapCompletionException(ex);
if (cause instanceof PulsarClientException.AlreadyClosedException) {
Expand All @@ -438,7 +483,7 @@ private void readMorePolicies(SystemTopicClient.Reader<PulsarEvent> reader) {
reader.getSystemTopic().getTopicName().getNamespaceObject(), false);
} else {
log.warn("Read more topic polices exception, read again.", ex);
readMorePolicies(reader);
readMorePoliciesAsync(reader);
}
}
});
Expand Down Expand Up @@ -591,7 +636,7 @@ boolean checkReaderIsCached(NamespaceName namespaceName) {
}

@VisibleForTesting
public Boolean getPoliciesCacheInit(NamespaceName namespaceName) {
public CompletableFuture<Void> getPoliciesCacheInit(NamespaceName namespaceName) {
return policyCacheInitMap.get(namespaceName);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nonnull;
import org.apache.pulsar.broker.service.BrokerServiceException.TopicPoliciesCacheNotInitException;
import org.apache.pulsar.client.impl.Backoff;
import org.apache.pulsar.client.impl.BackoffBuilder;
Expand Down Expand Up @@ -109,6 +110,32 @@ default CompletableFuture<Optional<TopicPolicies>> getTopicPoliciesAsyncWithRetr
return response;
}

/**
* Asynchronously retrieves topic policies.
* This triggers the Pulsar broker's internal client to load policies from the
* system topic `persistent://tenant/namespace/__change_event`.
*
* @param topicName The name of the topic.
* @param isGlobal Indicates if the policies are global.
* @return A CompletableFuture containing an Optional of TopicPolicies.
* @throws NullPointerException If the topicName is null.
*/
@Nonnull
CompletableFuture<Optional<TopicPolicies>> getTopicPoliciesAsync(@Nonnull TopicName topicName, boolean isGlobal);

/**
* Asynchronously retrieves topic policies.
* This triggers the Pulsar broker's internal client to load policies from the
* system topic `persistent://tenant/namespace/__change_event`.
*
* NOTE: If local policies are not available, it will fallback to using topic global policies.
* @param topicName The name of the topic.
* @return A CompletableFuture containing an Optional of TopicPolicies.
* @throws NullPointerException If the topicName is null.
*/
@Nonnull
CompletableFuture<Optional<TopicPolicies>> getTopicPoliciesAsync(@Nonnull TopicName topicName);

/**
* Get policies for a topic without cache async.
* @param topicName topic name
Expand Down Expand Up @@ -162,6 +189,19 @@ public TopicPolicies getTopicPolicies(TopicName topicName, boolean isGlobal)
return null;
}

@Nonnull
@Override
public CompletableFuture<Optional<TopicPolicies>> getTopicPoliciesAsync(@Nonnull TopicName topicName,
boolean isGlobal) {
return CompletableFuture.completedFuture(Optional.empty());
}

@Nonnull
@Override
public CompletableFuture<Optional<TopicPolicies>> getTopicPoliciesAsync(@Nonnull TopicName topicName) {
return CompletableFuture.completedFuture(Optional.empty());
}

@Override
public TopicPolicies getTopicPoliciesIfExists(TopicName topicName) {
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ public void testTopicPolicyInitialValueWithNamespaceAlreadyLoaded() throws Excep
assertFalse(pulsar.getBrokerService().getTopics().containsKey(topic));
//make sure namespace policy reader is fully started.
Awaitility.await().untilAsserted(()-> {
assertTrue(policyService.getPoliciesCacheInit(topicName.getNamespaceObject()));
assertTrue(policyService.getPoliciesCacheInit(topicName.getNamespaceObject()).isDone());
});

//load the topic.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.admin;

import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.awaitility.Awaitility;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeUnit;

@Slf4j
@Test(groups = "broker-admin")
public class TopicPoliciesWithBrokerRestartTest extends MockedPulsarServiceBaseTest {


@Override
protected void doInitConf() throws Exception {
super.doInitConf();
conf.setSystemTopicEnabled(true);
conf.setTopicLevelPoliciesEnabled(true);
}

@Override
@BeforeClass(alwaysRun = true)
protected void setup() throws Exception {
super.internalSetup();
setupDefaultTenantAndNamespace();
}

@Override
@AfterClass(alwaysRun = true)
protected void cleanup() throws Exception {
super.internalCleanup();
}


@Test
public void testRetentionWithBrokerRestart() throws Exception {
final int messages = 1_000;
final int topicNum = 500;
// (1) Init topic
admin.namespaces().createNamespace("public/retention");
final String topicName = "persistent://public/retention/retention_with_broker_restart";
admin.topics().createNonPartitionedTopic(topicName);
for (int i = 0; i < topicNum; i++) {
final String shadowTopicNames = topicName + "_" + i;
admin.topics().createNonPartitionedTopic(shadowTopicNames);
}
// (2) Set retention
final RetentionPolicies retentionPolicies = new RetentionPolicies(20, 20);
for (int i = 0; i < topicNum; i++) {
final String shadowTopicNames = topicName + "_" + i;
admin.topicPolicies().setRetention(shadowTopicNames, retentionPolicies);
}
admin.topicPolicies().setRetention(topicName, retentionPolicies);
// (3) Send messages
@Cleanup
final Producer<byte[]> publisher = pulsarClient.newProducer()
.topic(topicName)
.create();
for (int i = 0; i < messages; i++) {
publisher.send((i + "").getBytes(StandardCharsets.UTF_8));
}
// (4) Check configuration
Awaitility.await().untilAsserted(() -> {
final PersistentTopic persistentTopic1 = (PersistentTopic)
pulsar.getBrokerService().getTopic(topicName, true).join().get();
final ManagedLedgerImpl managedLedger1 = (ManagedLedgerImpl) persistentTopic1.getManagedLedger();
Assert.assertEquals(managedLedger1.getConfig().getRetentionSizeInMB(), 20);
Assert.assertEquals(managedLedger1.getConfig().getRetentionTimeMillis(),
TimeUnit.MINUTES.toMillis(20));
});
// (5) Restart broker
restartBroker();
// (6) Check configuration again
for (int i = 0; i < topicNum; i++) {
final String shadowTopicNames = topicName + "_" + i;
admin.lookups().lookupTopic(shadowTopicNames);
final PersistentTopic persistentTopicTmp = (PersistentTopic)
pulsar.getBrokerService().getTopic(shadowTopicNames, true).join().get();
final ManagedLedgerImpl managedLedgerTemp = (ManagedLedgerImpl) persistentTopicTmp.getManagedLedger();
Assert.assertEquals(managedLedgerTemp.getConfig().getRetentionSizeInMB(), 20);
Assert.assertEquals(managedLedgerTemp.getConfig().getRetentionTimeMillis(),
TimeUnit.MINUTES.toMillis(20));
}
}
}
Loading
Loading