diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicy.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicy.java index ae288fcf7f238..97a537199dc0b 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicy.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicy.java @@ -28,7 +28,8 @@ import java.util.Map; import java.util.Optional; import java.util.Set; -import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.client.BKException.BKNotEnoughBookiesException; import org.apache.bookkeeper.client.RackawareEnsemblePlacementPolicy; @@ -48,6 +49,8 @@ import org.apache.pulsar.common.policies.data.EnsemblePlacementPolicyConfig; import org.apache.pulsar.metadata.api.MetadataCache; import org.apache.pulsar.metadata.api.MetadataStore; +import org.apache.pulsar.zookeeper.ZkBookieRackAffinityMapping; +import org.apache.zookeeper.KeeperException; @Slf4j public class IsolatedBookieEnsemblePlacementPolicy extends RackawareEnsemblePlacementPolicy { @@ -187,17 +190,13 @@ private Set getBlacklistedBookiesWithIsolationGroups(int ensembleSize, Set blacklistedBookies = new HashSet<>(); try { if (bookieMappingCache != null) { - CompletableFuture> future = - bookieMappingCache.get(BookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH); - - Optional optRes = (future.isDone() && !future.isCompletedExceptionally()) - ? future.join() : Optional.empty(); - - if (!optRes.isPresent()) { - return blacklistedBookies; + Optional optional = + bookieMappingCache.get(BookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH).get(30, TimeUnit.SECONDS); + if (!optional.isPresent()) { + throw new KeeperException.NoNodeException(ZkBookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH); } - BookiesRackConfiguration allGroupsBookieMapping = optRes.get(); + BookiesRackConfiguration allGroupsBookieMapping = optional.get(); Set allBookies = allGroupsBookieMapping.keySet(); int totalAvailableBookiesInPrimaryGroup = 0; Set primaryIsolationGroup = Collections.emptySet(); @@ -246,6 +245,8 @@ private Set getBlacklistedBookiesWithIsolationGroups(int ensembleSize, } } } + } catch (TimeoutException e) { + log.warn("Getting bookie isolation info from metadata store timeout."); } catch (Exception e) { log.warn("Error getting bookie isolation info from metadata store: {}", e.getMessage()); }