diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index 08550886ecb4a..b028bfa3e4f24 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -150,6 +150,7 @@ import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride; import org.apache.pulsar.common.policies.data.BacklogQuota; import org.apache.pulsar.common.policies.data.ClusterData; +import org.apache.pulsar.common.policies.data.LocalPolicies; import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl; import org.apache.pulsar.common.policies.data.PersistencePolicies; import org.apache.pulsar.common.policies.data.PersistentOfflineTopicStats; @@ -1577,137 +1578,141 @@ public void openLedgerFailed(ManagedLedgerException exception, Object ctx) { }); } - public CompletableFuture getManagedLedgerConfig(TopicName topicName) { + public CompletableFuture getManagedLedgerConfig(@Nonnull TopicName topicName) { + requireNonNull(topicName); NamespaceName namespace = topicName.getNamespaceObject(); ServiceConfiguration serviceConfig = pulsar.getConfiguration(); NamespaceResources nsr = pulsar.getPulsarResources().getNamespaceResources(); LocalPoliciesResources lpr = pulsar.getPulsarResources().getLocalPolicies(); - return nsr.getPoliciesAsync(namespace) - .thenCombine(lpr.getLocalPoliciesAsync(namespace), (policies, localPolicies) -> { - PersistencePolicies persistencePolicies = null; - RetentionPolicies retentionPolicies = null; - OffloadPoliciesImpl topicLevelOffloadPolicies = null; - - if (pulsar.getConfig().isTopicLevelPoliciesEnabled() - && !NamespaceService.isSystemServiceNamespace(namespace.toString())) { - try { - TopicPolicies topicPolicies = pulsar.getTopicPoliciesService().getTopicPolicies(topicName); - if (topicPolicies != null) { - persistencePolicies = topicPolicies.getPersistence(); - retentionPolicies = topicPolicies.getRetentionPolicies(); - topicLevelOffloadPolicies = topicPolicies.getOffloadPolicies(); - } - } catch (BrokerServiceException.TopicPoliciesCacheNotInitException e) { - log.debug("Topic {} policies have not been initialized yet.", topicName); - } - } - - if (persistencePolicies == null) { - persistencePolicies = policies.map(p -> p.persistence).orElseGet( - () -> new PersistencePolicies(serviceConfig.getManagedLedgerDefaultEnsembleSize(), - serviceConfig.getManagedLedgerDefaultWriteQuorum(), - serviceConfig.getManagedLedgerDefaultAckQuorum(), - serviceConfig.getManagedLedgerDefaultMarkDeleteRateLimit())); - } - - if (retentionPolicies == null) { - retentionPolicies = policies.map(p -> p.retention_policies).orElseGet( - () -> new RetentionPolicies(serviceConfig.getDefaultRetentionTimeInMinutes(), - serviceConfig.getDefaultRetentionSizeInMB()) - ); - } - + final CompletableFuture> topicPoliciesFuture; + if (pulsar.getConfig().isTopicLevelPoliciesEnabled() + && !NamespaceService.isSystemServiceNamespace(namespace.toString()) + && !EventsTopicNames.isTopicPoliciesSystemTopic(topicName.toString())) { + topicPoliciesFuture = pulsar.getTopicPoliciesService().getTopicPoliciesAsync(topicName); + } else { + topicPoliciesFuture = CompletableFuture.completedFuture(Optional.empty()); + } + return topicPoliciesFuture.thenCompose(topicPoliciesOptional -> { + final CompletableFuture> nsPolicies = nsr.getPoliciesAsync(namespace); + final CompletableFuture> lcPolicies = lpr.getLocalPoliciesAsync(namespace); + return nsPolicies.thenCombine(lcPolicies, (policies, localPolicies) -> { + PersistencePolicies persistencePolicies = null; + RetentionPolicies retentionPolicies = null; + OffloadPoliciesImpl topicLevelOffloadPolicies = null; + if (topicPoliciesOptional.isPresent()) { + final TopicPolicies topicPolicies = topicPoliciesOptional.get(); + persistencePolicies = topicPolicies.getPersistence(); + retentionPolicies = topicPolicies.getRetentionPolicies(); + topicLevelOffloadPolicies = topicPolicies.getOffloadPolicies(); + } - ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig(); - managedLedgerConfig.setEnsembleSize(persistencePolicies.getBookkeeperEnsemble()); - managedLedgerConfig.setWriteQuorumSize(persistencePolicies.getBookkeeperWriteQuorum()); - managedLedgerConfig.setAckQuorumSize(persistencePolicies.getBookkeeperAckQuorum()); - if (localPolicies.isPresent() && localPolicies.get().bookieAffinityGroup != null) { - managedLedgerConfig - .setBookKeeperEnsemblePlacementPolicyClassName( - IsolatedBookieEnsemblePlacementPolicy.class); - Map properties = Maps.newHashMap(); - properties.put(IsolatedBookieEnsemblePlacementPolicy.ISOLATION_BOOKIE_GROUPS, - localPolicies.get().bookieAffinityGroup.getBookkeeperAffinityGroupPrimary()); - properties.put(IsolatedBookieEnsemblePlacementPolicy.SECONDARY_ISOLATION_BOOKIE_GROUPS, - localPolicies.get().bookieAffinityGroup.getBookkeeperAffinityGroupSecondary()); - managedLedgerConfig.setBookKeeperEnsemblePlacementPolicyProperties(properties); - } - managedLedgerConfig.setThrottleMarkDelete(persistencePolicies.getManagedLedgerMaxMarkDeleteRate()); - managedLedgerConfig.setDigestType(serviceConfig.getManagedLedgerDigestType()); - managedLedgerConfig.setPassword(serviceConfig.getManagedLedgerPassword()); + if (persistencePolicies == null) { + persistencePolicies = policies.map(p -> p.persistence).orElseGet( + () -> new PersistencePolicies(serviceConfig.getManagedLedgerDefaultEnsembleSize(), + serviceConfig.getManagedLedgerDefaultWriteQuorum(), + serviceConfig.getManagedLedgerDefaultAckQuorum(), + serviceConfig.getManagedLedgerDefaultMarkDeleteRateLimit())); + } - managedLedgerConfig - .setMaxUnackedRangesToPersist(serviceConfig.getManagedLedgerMaxUnackedRangesToPersist()); - managedLedgerConfig.setMaxUnackedRangesToPersistInZk( - serviceConfig.getManagedLedgerMaxUnackedRangesToPersistInZooKeeper()); - managedLedgerConfig.setMaxEntriesPerLedger(serviceConfig.getManagedLedgerMaxEntriesPerLedger()); - managedLedgerConfig - .setMinimumRolloverTime(serviceConfig.getManagedLedgerMinLedgerRolloverTimeMinutes(), - TimeUnit.MINUTES); - managedLedgerConfig - .setMaximumRolloverTime(serviceConfig.getManagedLedgerMaxLedgerRolloverTimeMinutes(), - TimeUnit.MINUTES); - managedLedgerConfig.setMaxSizePerLedgerMb(serviceConfig.getManagedLedgerMaxSizePerLedgerMbytes()); + if (retentionPolicies == null) { + retentionPolicies = policies.map(p -> p.retention_policies).orElseGet( + () -> new RetentionPolicies(serviceConfig.getDefaultRetentionTimeInMinutes(), + serviceConfig.getDefaultRetentionSizeInMB()) + ); + } - managedLedgerConfig.setMetadataOperationsTimeoutSeconds( - serviceConfig.getManagedLedgerMetadataOperationsTimeoutSeconds()); - managedLedgerConfig - .setReadEntryTimeoutSeconds(serviceConfig.getManagedLedgerReadEntryTimeoutSeconds()); - managedLedgerConfig - .setAddEntryTimeoutSeconds(serviceConfig.getManagedLedgerAddEntryTimeoutSeconds()); - managedLedgerConfig.setMetadataEnsembleSize(serviceConfig.getManagedLedgerDefaultEnsembleSize()); - managedLedgerConfig.setUnackedRangesOpenCacheSetEnabled( - serviceConfig.isManagedLedgerUnackedRangesOpenCacheSetEnabled()); - managedLedgerConfig.setMetadataWriteQuorumSize(serviceConfig.getManagedLedgerDefaultWriteQuorum()); - managedLedgerConfig.setMetadataAckQuorumSize(serviceConfig.getManagedLedgerDefaultAckQuorum()); - managedLedgerConfig - .setMetadataMaxEntriesPerLedger(serviceConfig.getManagedLedgerCursorMaxEntriesPerLedger()); + ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig(); + managedLedgerConfig.setEnsembleSize(persistencePolicies.getBookkeeperEnsemble()); + managedLedgerConfig.setWriteQuorumSize(persistencePolicies.getBookkeeperWriteQuorum()); + managedLedgerConfig.setAckQuorumSize(persistencePolicies.getBookkeeperAckQuorum()); + if (localPolicies.isPresent() && localPolicies.get().bookieAffinityGroup != null) { managedLedgerConfig - .setLedgerRolloverTimeout(serviceConfig.getManagedLedgerCursorRolloverTimeInSeconds()); - managedLedgerConfig - .setRetentionTime(retentionPolicies.getRetentionTimeInMinutes(), TimeUnit.MINUTES); - managedLedgerConfig.setRetentionSizeInMB(retentionPolicies.getRetentionSizeInMB()); - managedLedgerConfig.setAutoSkipNonRecoverableData(serviceConfig.isAutoSkipNonRecoverableData()); - managedLedgerConfig.setLazyCursorRecovery(serviceConfig.isLazyCursorRecovery()); - managedLedgerConfig.setInactiveLedgerRollOverTime( - serviceConfig.getManagedLedgerInactiveLedgerRolloverTimeSeconds(), TimeUnit.SECONDS); - managedLedgerConfig.setCacheEvictionByMarkDeletedPosition( - serviceConfig.isCacheEvictionByMarkDeletedPosition()); - - OffloadPoliciesImpl nsLevelOffloadPolicies = - (OffloadPoliciesImpl) policies.map(p -> p.offload_policies).orElse(null); - OffloadPoliciesImpl offloadPolicies = OffloadPoliciesImpl.mergeConfiguration( - topicLevelOffloadPolicies, - OffloadPoliciesImpl.oldPoliciesCompatible(nsLevelOffloadPolicies, policies.orElse(null)), - getPulsar().getConfig().getProperties()); - if (NamespaceService.isSystemServiceNamespace(namespace.toString())) { - managedLedgerConfig.setLedgerOffloader(NullLedgerOffloader.INSTANCE); - } else { - if (topicLevelOffloadPolicies != null) { - try { - LedgerOffloader topicLevelLedgerOffLoader = - pulsar().createManagedLedgerOffloader(offloadPolicies); - managedLedgerConfig.setLedgerOffloader(topicLevelLedgerOffLoader); - } catch (PulsarServerException e) { - throw new RuntimeException(e); - } - } else { - //If the topic level policy is null, use the namespace level - managedLedgerConfig - .setLedgerOffloader(pulsar.getManagedLedgerOffloader(namespace, offloadPolicies)); + .setBookKeeperEnsemblePlacementPolicyClassName( + IsolatedBookieEnsemblePlacementPolicy.class); + Map properties = Maps.newHashMap(); + properties.put(IsolatedBookieEnsemblePlacementPolicy.ISOLATION_BOOKIE_GROUPS, + localPolicies.get().bookieAffinityGroup.getBookkeeperAffinityGroupPrimary()); + properties.put(IsolatedBookieEnsemblePlacementPolicy.SECONDARY_ISOLATION_BOOKIE_GROUPS, + localPolicies.get().bookieAffinityGroup.getBookkeeperAffinityGroupSecondary()); + managedLedgerConfig.setBookKeeperEnsemblePlacementPolicyProperties(properties); + } + managedLedgerConfig.setThrottleMarkDelete(persistencePolicies.getManagedLedgerMaxMarkDeleteRate()); + managedLedgerConfig.setDigestType(serviceConfig.getManagedLedgerDigestType()); + managedLedgerConfig.setPassword(serviceConfig.getManagedLedgerPassword()); + + managedLedgerConfig + .setMaxUnackedRangesToPersist(serviceConfig.getManagedLedgerMaxUnackedRangesToPersist()); + managedLedgerConfig.setMaxUnackedRangesToPersistInZk( + serviceConfig.getManagedLedgerMaxUnackedRangesToPersistInZooKeeper()); + managedLedgerConfig.setMaxEntriesPerLedger(serviceConfig.getManagedLedgerMaxEntriesPerLedger()); + managedLedgerConfig + .setMinimumRolloverTime(serviceConfig.getManagedLedgerMinLedgerRolloverTimeMinutes(), + TimeUnit.MINUTES); + managedLedgerConfig + .setMaximumRolloverTime(serviceConfig.getManagedLedgerMaxLedgerRolloverTimeMinutes(), + TimeUnit.MINUTES); + managedLedgerConfig.setMaxSizePerLedgerMb(serviceConfig.getManagedLedgerMaxSizePerLedgerMbytes()); + + managedLedgerConfig.setMetadataOperationsTimeoutSeconds( + serviceConfig.getManagedLedgerMetadataOperationsTimeoutSeconds()); + managedLedgerConfig + .setReadEntryTimeoutSeconds(serviceConfig.getManagedLedgerReadEntryTimeoutSeconds()); + managedLedgerConfig + .setAddEntryTimeoutSeconds(serviceConfig.getManagedLedgerAddEntryTimeoutSeconds()); + managedLedgerConfig.setMetadataEnsembleSize(serviceConfig.getManagedLedgerDefaultEnsembleSize()); + managedLedgerConfig.setUnackedRangesOpenCacheSetEnabled( + serviceConfig.isManagedLedgerUnackedRangesOpenCacheSetEnabled()); + managedLedgerConfig.setMetadataWriteQuorumSize(serviceConfig.getManagedLedgerDefaultWriteQuorum()); + managedLedgerConfig.setMetadataAckQuorumSize(serviceConfig.getManagedLedgerDefaultAckQuorum()); + managedLedgerConfig + .setMetadataMaxEntriesPerLedger(serviceConfig.getManagedLedgerCursorMaxEntriesPerLedger()); + + managedLedgerConfig + .setLedgerRolloverTimeout(serviceConfig.getManagedLedgerCursorRolloverTimeInSeconds()); + managedLedgerConfig + .setRetentionTime(retentionPolicies.getRetentionTimeInMinutes(), TimeUnit.MINUTES); + managedLedgerConfig.setRetentionSizeInMB(retentionPolicies.getRetentionSizeInMB()); + managedLedgerConfig.setAutoSkipNonRecoverableData(serviceConfig.isAutoSkipNonRecoverableData()); + managedLedgerConfig.setLazyCursorRecovery(serviceConfig.isLazyCursorRecovery()); + managedLedgerConfig.setInactiveLedgerRollOverTime( + serviceConfig.getManagedLedgerInactiveLedgerRolloverTimeSeconds(), TimeUnit.SECONDS); + managedLedgerConfig.setCacheEvictionByMarkDeletedPosition( + serviceConfig.isCacheEvictionByMarkDeletedPosition()); + + OffloadPoliciesImpl nsLevelOffloadPolicies = + (OffloadPoliciesImpl) policies.map(p -> p.offload_policies).orElse(null); + OffloadPoliciesImpl offloadPolicies = OffloadPoliciesImpl.mergeConfiguration( + topicLevelOffloadPolicies, + OffloadPoliciesImpl.oldPoliciesCompatible(nsLevelOffloadPolicies, policies.orElse(null)), + getPulsar().getConfig().getProperties()); + if (NamespaceService.isSystemServiceNamespace(namespace.toString())) { + managedLedgerConfig.setLedgerOffloader(NullLedgerOffloader.INSTANCE); + } else { + if (topicLevelOffloadPolicies != null) { + try { + LedgerOffloader topicLevelLedgerOffLoader = + pulsar().createManagedLedgerOffloader(offloadPolicies); + managedLedgerConfig.setLedgerOffloader(topicLevelLedgerOffLoader); + } catch (PulsarServerException e) { + throw new RuntimeException(e); } + } else { + //If the topic level policy is null, use the namespace level + managedLedgerConfig + .setLedgerOffloader(pulsar.getManagedLedgerOffloader(namespace, offloadPolicies)); } + } - managedLedgerConfig.setDeletionAtBatchIndexLevelEnabled( - serviceConfig.isAcknowledgmentAtBatchIndexLevelEnabled()); - managedLedgerConfig.setNewEntriesCheckDelayInMillis( - serviceConfig.getManagedLedgerNewEntriesCheckDelayInMillis()); + managedLedgerConfig.setDeletionAtBatchIndexLevelEnabled( + serviceConfig.isAcknowledgmentAtBatchIndexLevelEnabled()); + managedLedgerConfig.setNewEntriesCheckDelayInMillis( + serviceConfig.getManagedLedgerNewEntriesCheckDelayInMillis()); - return managedLedgerConfig; - }); + return managedLedgerConfig; + }); + }); } private void addTopicToStatsMaps(TopicName topicName, Topic topic) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java index 419c6d88b22a1..daa2b249a9ca6 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.broker.service; +import static java.util.Objects.requireNonNull; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Lists; import com.google.common.collect.Sets; @@ -25,6 +26,7 @@ import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; @@ -78,8 +80,8 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic private final Map>> readerCaches = new ConcurrentHashMap<>(); - @VisibleForTesting - final Map policyCacheInitMap = new ConcurrentHashMap<>(); + + final Map> policyCacheInitMap = new ConcurrentHashMap<>(); @VisibleForTesting final Map>> listeners = new ConcurrentHashMap<>(); @@ -219,12 +221,12 @@ public TopicPolicies getTopicPolicies(TopicName topicName, boolean isGlobal) throws TopicPoliciesCacheNotInitException { if (!policyCacheInitMap.containsKey(topicName.getNamespaceObject())) { NamespaceName namespace = topicName.getNamespaceObject(); - prepareInitPoliciesCache(namespace, new CompletableFuture<>()); + prepareInitPoliciesCacheAsync(namespace); } MutablePair result = new MutablePair<>(); policyCacheInitMap.compute(topicName.getNamespaceObject(), (k, initialized) -> { - if (initialized == null || !initialized) { + if (initialized == null || !initialized.isDone()) { result.setLeft(new TopicPoliciesCacheNotInitException()); } else { TopicPolicies topicPolicies = @@ -242,6 +244,34 @@ public TopicPolicies getTopicPolicies(TopicName topicName, } } + @Nonnull + @Override + public CompletableFuture> getTopicPoliciesAsync(@Nonnull TopicName topicName, + boolean isGlobal) { + requireNonNull(topicName); + final CompletableFuture preparedFuture = prepareInitPoliciesCacheAsync(topicName.getNamespaceObject()); + return preparedFuture.thenApply(__ -> { + final TopicPolicies candidatePolicies = isGlobal + ? globalPoliciesCache.get(TopicName.get(topicName.getPartitionedTopicName())) + : policiesCache.get(TopicName.get(topicName.getPartitionedTopicName())); + return Optional.ofNullable(candidatePolicies); + }); + } + + @Nonnull + @Override + public CompletableFuture> getTopicPoliciesAsync(@Nonnull TopicName topicName) { + requireNonNull(topicName); + final CompletableFuture preparedFuture = prepareInitPoliciesCacheAsync(topicName.getNamespaceObject()); + return preparedFuture.thenApply(__ -> { + final TopicPolicies localPolicies = policiesCache.get(TopicName.get(topicName.getPartitionedTopicName())); + if (localPolicies != null) { + return Optional.of(localPolicies); + } + return Optional.ofNullable(globalPoliciesCache.get(TopicName.get(topicName.getPartitionedTopicName()))); + }); + } + @Override public TopicPolicies getTopicPoliciesIfExists(TopicName topicName) { return policiesCache.get(TopicName.get(topicName.getPartitionedTopicName())); @@ -265,40 +295,49 @@ public CompletableFuture getTopicPoliciesBypassCacheAsync(TopicNa @Override public CompletableFuture addOwnedNamespaceBundleAsync(NamespaceBundle namespaceBundle) { - CompletableFuture result = new CompletableFuture<>(); NamespaceName namespace = namespaceBundle.getNamespaceObject(); if (NamespaceService.checkHeartbeatNamespace(namespace) != null || NamespaceService.checkHeartbeatNamespaceV2(namespace) != null) { - result.complete(null); - return result; + return CompletableFuture.completedFuture(null); } synchronized (this) { if (readerCaches.get(namespace) != null) { ownedBundlesCountPerNamespace.get(namespace).incrementAndGet(); - result.complete(null); + return CompletableFuture.completedFuture(null); } else { - prepareInitPoliciesCache(namespace, result); + return prepareInitPoliciesCacheAsync(namespace); } } - return result; } - private void prepareInitPoliciesCache(@Nonnull NamespaceName namespace, CompletableFuture result) { - if (policyCacheInitMap.putIfAbsent(namespace, false) == null) { - CompletableFuture> readerCompletableFuture = + private @Nonnull CompletableFuture prepareInitPoliciesCacheAsync(@Nonnull NamespaceName namespace) { + requireNonNull(namespace); + return policyCacheInitMap.computeIfAbsent(namespace, (k) -> { + final CompletableFuture> readerCompletableFuture = createSystemTopicClientWithRetry(namespace); readerCaches.put(namespace, readerCompletableFuture); ownedBundlesCountPerNamespace.putIfAbsent(namespace, new AtomicInteger(1)); - readerCompletableFuture.thenAccept(reader -> { - initPolicesCache(reader, result); - result.thenRun(() -> readMorePolicies(reader)); - }).exceptionally(ex -> { - log.error("[{}] Failed to create reader on __change_events topic", namespace, ex); - cleanCacheAndCloseReader(namespace, false); - result.completeExceptionally(ex); + final CompletableFuture initFuture = readerCompletableFuture + .thenCompose(reader -> { + final CompletableFuture stageFuture = new CompletableFuture<>(); + initPolicesCache(reader, stageFuture); + return stageFuture + // Read policies in background + .thenAccept(__ -> readMorePoliciesAsync(reader)); + }); + initFuture.exceptionally(ex -> { + try { + log.error("[{}] Failed to create reader on __change_events topic", namespace, ex); + cleanCacheAndCloseReader(namespace, false); + } catch (Throwable cleanupEx) { + // Adding this catch to avoid break callback chain + log.error("[{}] Failed to cleanup reader on __change_events topic", namespace, cleanupEx); + } return null; }); - } + // let caller know we've got an exception. + return initFuture; + }); } protected CompletableFuture> createSystemTopicClientWithRetry( @@ -382,8 +421,7 @@ private void initPolicesCache(SystemTopicClient.Reader reader, Comp if (log.isDebugEnabled()) { log.debug("[{}] Reach the end of the system topic.", reader.getSystemTopic().getTopicName()); } - policyCacheInitMap.computeIfPresent( - reader.getSystemTopic().getTopicName().getNamespaceObject(), (k, v) -> true); + // replay policy message policiesCache.forEach(((topicName, topicPolicies) -> { if (listeners.get(topicName) != null) { @@ -396,6 +434,7 @@ private void initPolicesCache(SystemTopicClient.Reader reader, Comp } } })); + future.complete(null); } }); @@ -421,7 +460,13 @@ private void cleanCacheAndCloseReader(@Nonnull NamespaceName namespace, boolean }); } - private void readMorePolicies(SystemTopicClient.Reader reader) { + /** + * This is an async method for the background reader to continue syncing new messages. + * + * Note: You should not do any blocking call here. because it will affect + * #{@link SystemTopicBasedTopicPoliciesService#getTopicPoliciesAsync(TopicName)} method to block loading topic. + */ + private void readMorePoliciesAsync(SystemTopicClient.Reader reader) { reader.readNextAsync() .thenAccept(msg -> { refreshTopicPoliciesCache(msg); @@ -429,7 +474,7 @@ private void readMorePolicies(SystemTopicClient.Reader reader) { }) .whenComplete((__, ex) -> { if (ex == null) { - readMorePolicies(reader); + readMorePoliciesAsync(reader); } else { Throwable cause = FutureUtil.unwrapCompletionException(ex); if (cause instanceof PulsarClientException.AlreadyClosedException) { @@ -438,7 +483,7 @@ private void readMorePolicies(SystemTopicClient.Reader reader) { reader.getSystemTopic().getTopicName().getNamespaceObject(), false); } else { log.warn("Read more topic polices exception, read again.", ex); - readMorePolicies(reader); + readMorePoliciesAsync(reader); } } }); @@ -591,7 +636,7 @@ boolean checkReaderIsCached(NamespaceName namespaceName) { } @VisibleForTesting - public Boolean getPoliciesCacheInit(NamespaceName namespaceName) { + public CompletableFuture getPoliciesCacheInit(NamespaceName namespaceName) { return policyCacheInitMap.get(namespaceName); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicPoliciesService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicPoliciesService.java index 5b2aa6e8ce974..98c8e71bee090 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicPoliciesService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicPoliciesService.java @@ -22,6 +22,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import javax.annotation.Nonnull; import org.apache.pulsar.broker.service.BrokerServiceException.TopicPoliciesCacheNotInitException; import org.apache.pulsar.client.impl.Backoff; import org.apache.pulsar.client.impl.BackoffBuilder; @@ -109,6 +110,32 @@ default CompletableFuture> getTopicPoliciesAsyncWithRetr return response; } + /** + * Asynchronously retrieves topic policies. + * This triggers the Pulsar broker's internal client to load policies from the + * system topic `persistent://tenant/namespace/__change_event`. + * + * @param topicName The name of the topic. + * @param isGlobal Indicates if the policies are global. + * @return A CompletableFuture containing an Optional of TopicPolicies. + * @throws NullPointerException If the topicName is null. + */ + @Nonnull + CompletableFuture> getTopicPoliciesAsync(@Nonnull TopicName topicName, boolean isGlobal); + + /** + * Asynchronously retrieves topic policies. + * This triggers the Pulsar broker's internal client to load policies from the + * system topic `persistent://tenant/namespace/__change_event`. + * + * NOTE: If local policies are not available, it will fallback to using topic global policies. + * @param topicName The name of the topic. + * @return A CompletableFuture containing an Optional of TopicPolicies. + * @throws NullPointerException If the topicName is null. + */ + @Nonnull + CompletableFuture> getTopicPoliciesAsync(@Nonnull TopicName topicName); + /** * Get policies for a topic without cache async. * @param topicName topic name @@ -162,6 +189,19 @@ public TopicPolicies getTopicPolicies(TopicName topicName, boolean isGlobal) return null; } + @Nonnull + @Override + public CompletableFuture> getTopicPoliciesAsync(@Nonnull TopicName topicName, + boolean isGlobal) { + return CompletableFuture.completedFuture(Optional.empty()); + } + + @Nonnull + @Override + public CompletableFuture> getTopicPoliciesAsync(@Nonnull TopicName topicName) { + return CompletableFuture.completedFuture(Optional.empty()); + } + @Override public TopicPolicies getTopicPoliciesIfExists(TopicName topicName) { return null; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java index f3e3188de4bac..adfa95e8f4de3 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java @@ -169,7 +169,7 @@ public void testTopicPolicyInitialValueWithNamespaceAlreadyLoaded() throws Excep assertFalse(pulsar.getBrokerService().getTopics().containsKey(topic)); //make sure namespace policy reader is fully started. Awaitility.await().untilAsserted(()-> { - assertTrue(policyService.getPoliciesCacheInit(topicName.getNamespaceObject())); + assertTrue(policyService.getPoliciesCacheInit(topicName.getNamespaceObject()).isDone()); }); //load the topic. diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesWithBrokerRestartTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesWithBrokerRestartTest.java new file mode 100644 index 0000000000000..f39981eec4812 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesWithBrokerRestartTest.java @@ -0,0 +1,112 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.admin; + +import lombok.Cleanup; +import lombok.extern.slf4j.Slf4j; +import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; +import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; +import org.apache.pulsar.broker.service.persistent.PersistentTopic; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.common.policies.data.RetentionPolicies; +import org.awaitility.Awaitility; +import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; +import java.nio.charset.StandardCharsets; +import java.util.concurrent.TimeUnit; + +@Slf4j +@Test(groups = "broker-admin") +public class TopicPoliciesWithBrokerRestartTest extends MockedPulsarServiceBaseTest { + + + @Override + protected void doInitConf() throws Exception { + super.doInitConf(); + conf.setSystemTopicEnabled(true); + conf.setTopicLevelPoliciesEnabled(true); + } + + @Override + @BeforeClass(alwaysRun = true) + protected void setup() throws Exception { + super.internalSetup(); + setupDefaultTenantAndNamespace(); + } + + @Override + @AfterClass(alwaysRun = true) + protected void cleanup() throws Exception { + super.internalCleanup(); + } + + + @Test + public void testRetentionWithBrokerRestart() throws Exception { + final int messages = 1_000; + final int topicNum = 500; + // (1) Init topic + admin.namespaces().createNamespace("public/retention"); + final String topicName = "persistent://public/retention/retention_with_broker_restart"; + admin.topics().createNonPartitionedTopic(topicName); + for (int i = 0; i < topicNum; i++) { + final String shadowTopicNames = topicName + "_" + i; + admin.topics().createNonPartitionedTopic(shadowTopicNames); + } + // (2) Set retention + final RetentionPolicies retentionPolicies = new RetentionPolicies(20, 20); + for (int i = 0; i < topicNum; i++) { + final String shadowTopicNames = topicName + "_" + i; + admin.topicPolicies().setRetention(shadowTopicNames, retentionPolicies); + } + admin.topicPolicies().setRetention(topicName, retentionPolicies); + // (3) Send messages + @Cleanup + final Producer publisher = pulsarClient.newProducer() + .topic(topicName) + .create(); + for (int i = 0; i < messages; i++) { + publisher.send((i + "").getBytes(StandardCharsets.UTF_8)); + } + // (4) Check configuration + Awaitility.await().untilAsserted(() -> { + final PersistentTopic persistentTopic1 = (PersistentTopic) + pulsar.getBrokerService().getTopic(topicName, true).join().get(); + final ManagedLedgerImpl managedLedger1 = (ManagedLedgerImpl) persistentTopic1.getManagedLedger(); + Assert.assertEquals(managedLedger1.getConfig().getRetentionSizeInMB(), 20); + Assert.assertEquals(managedLedger1.getConfig().getRetentionTimeMillis(), + TimeUnit.MINUTES.toMillis(20)); + }); + // (5) Restart broker + restartBroker(); + // (6) Check configuration again + for (int i = 0; i < topicNum; i++) { + final String shadowTopicNames = topicName + "_" + i; + admin.lookups().lookupTopic(shadowTopicNames); + final PersistentTopic persistentTopicTmp = (PersistentTopic) + pulsar.getBrokerService().getTopic(shadowTopicNames, true).join().get(); + final ManagedLedgerImpl managedLedgerTemp = (ManagedLedgerImpl) persistentTopicTmp.getManagedLedger(); + Assert.assertEquals(managedLedgerTemp.getConfig().getRetentionSizeInMB(), 20); + Assert.assertEquals(managedLedgerTemp.getConfig().getRetentionTimeMillis(), + TimeUnit.MINUTES.toMillis(20)); + } + } +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java index c90c4a3a5bc31..e69230b2d2dcb 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java @@ -142,7 +142,7 @@ public void testGetPolicy() throws ExecutionException, InterruptedException, Top // Wait for all topic policies updated. Awaitility.await().untilAsserted(() -> Assert.assertTrue(systemTopicBasedTopicPoliciesService - .getPoliciesCacheInit(TOPIC1.getNamespaceObject()))); + .getPoliciesCacheInit(TOPIC1.getNamespaceObject()).isDone())); // Assert broker is cache all topic policies Awaitility.await().untilAsserted(() -> @@ -305,8 +305,8 @@ private void prepareData() throws PulsarAdminException { @Test public void testGetPolicyTimeout() throws Exception { SystemTopicBasedTopicPoliciesService service = (SystemTopicBasedTopicPoliciesService) pulsar.getTopicPoliciesService(); - Awaitility.await().untilAsserted(() -> assertTrue(service.policyCacheInitMap.get(TOPIC1.getNamespaceObject()))); - service.policyCacheInitMap.put(TOPIC1.getNamespaceObject(), false); + Awaitility.await().untilAsserted(() -> assertTrue(service.policyCacheInitMap.get(TOPIC1.getNamespaceObject()).isDone())); + service.policyCacheInitMap.put(TOPIC1.getNamespaceObject(), new CompletableFuture<>()); long start = System.currentTimeMillis(); Backoff backoff = new BackoffBuilder() .setInitialTime(500, TimeUnit.MILLISECONDS) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java index 3b018e70481b6..dac7f4ab2aaa8 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java @@ -923,9 +923,9 @@ public void testMergeLookupRequests() throws Exception { } private int calculateLookupRequestCount() throws Exception { - int failures = CollectorRegistry.defaultRegistry.getSampleValue("pulsar_broker_lookup_failures_total") + int failures = CollectorRegistry.defaultRegistry.getSampleValue("pulsar_broker_lookup_failures") .intValue(); - int answers = CollectorRegistry.defaultRegistry.getSampleValue("pulsar_broker_lookup_answers_total") + int answers = CollectorRegistry.defaultRegistry.getSampleValue("pulsar_broker_lookup_answers") .intValue(); return failures + answers; }