Skip to content

Commit

Permalink
[branch-2.10] [fix] [broker] Fix isolated group not work problem. (#2…
Browse files Browse the repository at this point in the history
  • Loading branch information
horizonzy authored Sep 6, 2023
1 parent 1300eb5 commit 0026719
Show file tree
Hide file tree
Showing 2 changed files with 124 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,15 @@
package org.apache.pulsar.bookie.rackawareness;

import static org.apache.pulsar.bookie.rackawareness.BookieRackAffinityMapping.METADATA_STORE_INSTANCE;
import com.google.common.annotations.VisibleForTesting;
import io.netty.util.HashedWheelTimer;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.client.BKException.BKNotEnoughBookiesException;
import org.apache.bookkeeper.client.RackawareEnsemblePlacementPolicy;
Expand Down Expand Up @@ -60,6 +59,7 @@ public class IsolatedBookieEnsemblePlacementPolicy extends RackawareEnsemblePlac

private MetadataCache<BookiesRackConfiguration> bookieMappingCache;

private volatile BookiesRackConfiguration cachedRackConfiguration = null;

public IsolatedBookieEnsemblePlacementPolicy() {
super();
Expand Down Expand Up @@ -87,7 +87,12 @@ public RackawareEnsemblePlacementPolicyImpl initialize(ClientConfiguration conf,

// Only add the bookieMappingCache if we have defined an isolation group
bookieMappingCache = store.getMetadataCache(BookiesRackConfiguration.class);
bookieMappingCache.get(BookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH).join();
bookieMappingCache.get(BookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH).thenAccept(opt -> opt.ifPresent(
bookiesRackConfiguration -> cachedRackConfiguration = bookiesRackConfiguration))
.exceptionally(e -> {
log.warn("Failed to load bookies rack configuration while initialize the PlacementPolicy.");
return null;
});
}
}
if (conf.getProperty(SECONDARY_ISOLATION_BOOKIE_GROUPS) != null) {
Expand All @@ -107,7 +112,6 @@ public RackawareEnsemblePlacementPolicyImpl initialize(ClientConfiguration conf,
public PlacementResult<List<BookieId>> newEnsemble(int ensembleSize, int writeQuorumSize, int ackQuorumSize,
Map<String, byte[]> customMetadata, Set<BookieId> excludeBookies)
throws BKNotEnoughBookiesException {
Map<String, List<String>> isolationGroup = new HashMap<>();
Set<BookieId> blacklistedBookies = getBlacklistedBookiesWithIsolationGroups(
ensembleSize, defaultIsolationGroups);
if (excludeBookies == null) {
Expand Down Expand Up @@ -182,22 +186,23 @@ private static Pair<Set<String>, Set<String>> getIsolationGroup(
return pair;
}

private Set<BookieId> getBlacklistedBookiesWithIsolationGroups(int ensembleSize,
@VisibleForTesting
Set<BookieId> getBlacklistedBookiesWithIsolationGroups(int ensembleSize,
Pair<Set<String>, Set<String>> isolationGroups) {
Set<BookieId> blacklistedBookies = new HashSet<>();
try {
if (bookieMappingCache != null) {
CompletableFuture<Optional<BookiesRackConfiguration>> future =
bookieMappingCache.get(BookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH);
bookieMappingCache.get(BookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH)
.thenAccept(opt -> cachedRackConfiguration = opt.orElse(null)).exceptionally(e -> {
log.warn("Failed to update the newest bookies rack config.");
return null;
});

Optional<BookiesRackConfiguration> optRes = (future.isDone() && !future.isCompletedExceptionally())
? future.join() : Optional.empty();

if (!optRes.isPresent()) {
BookiesRackConfiguration allGroupsBookieMapping = cachedRackConfiguration;
if (allGroupsBookieMapping == null) {
log.debug("The bookies rack config is not available at now.");
return blacklistedBookies;
}

BookiesRackConfiguration allGroupsBookieMapping = optRes.get();
Set<String> allBookies = allGroupsBookieMapping.keySet();
int totalAvailableBookiesInPrimaryGroup = 0;
Set<String> primaryIsolationGroup = Collections.emptySet();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,14 @@
*/
package org.apache.pulsar.bookie.rackawareness;

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Sets;
import io.netty.util.HashedWheelTimer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
Expand All @@ -34,18 +37,23 @@
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import org.apache.bookkeeper.client.BKException.BKNotEnoughBookiesException;
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.feature.SettableFeatureProvider;
import org.apache.bookkeeper.net.BookieId;
import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.commons.lang3.tuple.MutablePair;
import org.apache.pulsar.common.policies.data.BookieInfo;
import org.apache.pulsar.common.policies.data.BookiesRackConfiguration;
import org.apache.pulsar.common.policies.data.EnsemblePlacementPolicyConfig;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.metadata.api.MetadataStore;
import org.apache.pulsar.metadata.api.MetadataStoreConfig;
import org.apache.pulsar.metadata.api.MetadataStoreFactory;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
import org.apache.pulsar.metadata.cache.impl.MetadataCacheImpl;
import org.awaitility.Awaitility;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
Expand Down Expand Up @@ -87,6 +95,104 @@ void teardown() throws Exception {
timer.stop();
}

@Test
public void testMetadataStoreCases() throws Exception {
Map<String, BookieInfo> mainBookieGroup = new HashMap<>();
mainBookieGroup.put(BOOKIE1, BookieInfo.builder().rack("rack0").build());
mainBookieGroup.put(BOOKIE2, BookieInfo.builder().rack("rack1").build());

Map<String, BookieInfo> secondaryBookieGroup = new HashMap<>();
secondaryBookieGroup.put(BOOKIE3, BookieInfo.builder().rack("rack0").build());

store = mock(MetadataStoreExtended.class);
MetadataCacheImpl cache = mock(MetadataCacheImpl.class);
when(store.getMetadataCache(BookiesRackConfiguration.class)).thenReturn(cache);
CompletableFuture<Optional<BookiesRackConfiguration>> initialFuture = new CompletableFuture<>();
//The initialFuture only has group1.
BookiesRackConfiguration rackConfiguration1 = new BookiesRackConfiguration();
rackConfiguration1.put("group1", mainBookieGroup);
initialFuture.complete(Optional.of(rackConfiguration1));

long waitTime = 2000;
CompletableFuture<Optional<BookiesRackConfiguration>> waitingCompleteFuture = new CompletableFuture<>();
new Thread(() -> {
try {
Thread.sleep(waitTime);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
//The waitingCompleteFuture has group1 and group2.
BookiesRackConfiguration rackConfiguration2 = new BookiesRackConfiguration();
rackConfiguration2.put("group1", mainBookieGroup);
rackConfiguration2.put("group2", secondaryBookieGroup);
waitingCompleteFuture.complete(Optional.of(rackConfiguration2));
}).start();

long longWaitTime = 4000;
CompletableFuture<Optional<BookiesRackConfiguration>> emptyFuture = new CompletableFuture<>();
new Thread(() -> {
try {
Thread.sleep(longWaitTime);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
//The emptyFuture means that the zk node /bookies already be removed.
emptyFuture.complete(Optional.empty());
}).start();

//Return different future means that cache expire.
when(cache.get(BookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH))
.thenReturn(initialFuture).thenReturn(initialFuture)
.thenReturn(waitingCompleteFuture).thenReturn(waitingCompleteFuture)
.thenReturn(emptyFuture).thenReturn(emptyFuture);

IsolatedBookieEnsemblePlacementPolicy isolationPolicy = new IsolatedBookieEnsemblePlacementPolicy();
ClientConfiguration bkClientConf = new ClientConfiguration();
bkClientConf.setProperty(BookieRackAffinityMapping.METADATA_STORE_INSTANCE, store);
bkClientConf.setProperty(IsolatedBookieEnsemblePlacementPolicy.ISOLATION_BOOKIE_GROUPS, isolationGroups);
isolationPolicy.initialize(bkClientConf, Optional.empty(), timer, SettableFeatureProvider.DISABLE_ALL, NullStatsLogger.INSTANCE, BookieSocketAddress.LEGACY_BOOKIEID_RESOLVER);
isolationPolicy.onClusterChanged(writableBookies, readOnlyBookies);

MutablePair<Set<String>, Set<String>> groups = new MutablePair<>();
groups.setLeft(Sets.newHashSet("group1"));
groups.setRight(new HashSet<>());

//initialFuture, the future is waiting done.
Set<BookieId> blacklist =
isolationPolicy.getBlacklistedBookiesWithIsolationGroups(2, groups);
assertTrue(blacklist.isEmpty());

//waitingCompleteFuture, the future is waiting done.
blacklist =
isolationPolicy.getBlacklistedBookiesWithIsolationGroups(2, groups);
assertTrue(blacklist.isEmpty());

Thread.sleep(waitTime);

//waitingCompleteFuture, the future is already done.
blacklist =
isolationPolicy.getBlacklistedBookiesWithIsolationGroups(2, groups);
assertFalse(blacklist.isEmpty());
assertEquals(blacklist.size(), 1);
BookieId excludeBookie = blacklist.iterator().next();
assertEquals(excludeBookie.toString(), BOOKIE3);

//emptyFuture, the future is waiting done.
blacklist =
isolationPolicy.getBlacklistedBookiesWithIsolationGroups(2, groups);
assertFalse(blacklist.isEmpty());
assertEquals(blacklist.size(), 1);
excludeBookie = blacklist.iterator().next();
assertEquals(excludeBookie.toString(), BOOKIE3);

Thread.sleep(longWaitTime - waitTime);

//emptyFuture, the future is already done.
blacklist =
isolationPolicy.getBlacklistedBookiesWithIsolationGroups(2, groups);
assertTrue(blacklist.isEmpty());
}

@Test
public void testBasic() throws Exception {
Map<String, Map<String, BookieInfo>> bookieMapping = new HashMap<>();
Expand Down

0 comments on commit 0026719

Please sign in to comment.