Skip to content

Commit

Permalink
Fix isolated group not work problem.
Browse files Browse the repository at this point in the history
  • Loading branch information
horizonzy committed Aug 30, 2023
1 parent 810a2f0 commit faa1a91
Showing 1 changed file with 11 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.client.BKException.BKNotEnoughBookiesException;
import org.apache.bookkeeper.client.RackawareEnsemblePlacementPolicy;
Expand All @@ -48,6 +49,8 @@
import org.apache.pulsar.common.policies.data.EnsemblePlacementPolicyConfig;
import org.apache.pulsar.metadata.api.MetadataCache;
import org.apache.pulsar.metadata.api.MetadataStore;
import org.apache.pulsar.zookeeper.ZkBookieRackAffinityMapping;
import org.apache.zookeeper.KeeperException;

@Slf4j
public class IsolatedBookieEnsemblePlacementPolicy extends RackawareEnsemblePlacementPolicy {
Expand Down Expand Up @@ -187,17 +190,13 @@ private Set<BookieId> getBlacklistedBookiesWithIsolationGroups(int ensembleSize,
Set<BookieId> blacklistedBookies = new HashSet<>();
try {
if (bookieMappingCache != null) {
CompletableFuture<Optional<BookiesRackConfiguration>> future =
bookieMappingCache.get(BookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH);

Optional<BookiesRackConfiguration> optRes = (future.isDone() && !future.isCompletedExceptionally())
? future.join() : Optional.empty();

if (!optRes.isPresent()) {
return blacklistedBookies;
Optional<BookiesRackConfiguration> optional =
bookieMappingCache.get(BookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH).get(30, TimeUnit.SECONDS);
if (!optional.isPresent()) {
throw new KeeperException.NoNodeException(ZkBookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH);
}

BookiesRackConfiguration allGroupsBookieMapping = optRes.get();
BookiesRackConfiguration allGroupsBookieMapping = optional.get();
Set<String> allBookies = allGroupsBookieMapping.keySet();
int totalAvailableBookiesInPrimaryGroup = 0;
Set<String> primaryIsolationGroup = Collections.emptySet();
Expand Down Expand Up @@ -246,6 +245,8 @@ private Set<BookieId> getBlacklistedBookiesWithIsolationGroups(int ensembleSize,
}
}
}
} catch (TimeoutException e) {
log.warn("Getting bookie isolation info from metadata store timeout.");
} catch (Exception e) {
log.warn("Error getting bookie isolation info from metadata store: {}", e.getMessage());
}
Expand Down

0 comments on commit faa1a91

Please sign in to comment.