Skip to content
This repository has been archived by the owner on Apr 1, 2024. It is now read-only.

Commit

Permalink
[fix][broker] Fix the deadlock when using BookieRackAffinityMapping w…
Browse files Browse the repository at this point in the history
…ith rackaware policy (apache#21481)
  • Loading branch information
erobot authored Nov 10, 2023
1 parent 3c067ce commit 2efef87
Show file tree
Hide file tree
Showing 2 changed files with 73 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,7 @@ private void handleUpdates(Notification n) {

bookieMappingCache.get(BOOKIE_INFO_ROOT_PATH)
.thenAccept(optVal -> {
Set<BookieId> bookieIdSet = new HashSet<>();
synchronized (this) {
LOG.info("Bookie rack info updated to {}. Notifying rackaware policy.", optVal);
this.updateRacksWithHost(optVal.orElseGet(BookiesRackConfiguration::new));
Expand All @@ -259,12 +260,12 @@ private void handleUpdates(Notification n) {
LOG.debug("Bookies with rack update from {} to {}", bookieAddressListLastTime,
bookieAddressList);
}
Set<BookieId> bookieIdSet = new HashSet<>(bookieAddressList);
bookieIdSet.addAll(bookieAddressList);
bookieIdSet.addAll(bookieAddressListLastTime);
bookieAddressListLastTime = bookieAddressList;
if (rackawarePolicy != null) {
rackawarePolicy.onBookieRackChange(new ArrayList<>(bookieIdSet));
}
}
if (rackawarePolicy != null) {
rackawarePolicy.onBookieRackChange(new ArrayList<>(bookieIdSet));
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,21 +21,27 @@
import static org.apache.bookkeeper.feature.SettableFeatureProvider.DISABLE_ALL;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import io.netty.util.HashedWheelTimer;
import java.lang.reflect.Constructor;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import lombok.Cleanup;
import org.apache.bookkeeper.client.DefaultBookieAddressResolver;
import org.apache.bookkeeper.client.EnsemblePlacementPolicy;
import org.apache.bookkeeper.client.RackawareEnsemblePlacementPolicy;
Expand All @@ -46,6 +52,7 @@
import org.apache.bookkeeper.net.BookieId;
import org.apache.bookkeeper.net.BookieNode;
import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.net.NetworkTopology;
import org.apache.bookkeeper.proto.BookieAddressResolver;
import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.bookkeeper.stats.StatsLogger;
Expand All @@ -55,6 +62,8 @@
import org.apache.pulsar.metadata.api.MetadataStore;
import org.apache.pulsar.metadata.api.MetadataStoreConfig;
import org.apache.pulsar.metadata.api.MetadataStoreFactory;
import org.apache.pulsar.metadata.api.Notification;
import org.apache.pulsar.metadata.api.NotificationType;
import org.apache.pulsar.metadata.bookkeeper.BookieServiceInfoSerde;
import org.apache.pulsar.metadata.bookkeeper.PulsarRegistrationClient;
import org.awaitility.Awaitility;
Expand Down Expand Up @@ -342,4 +351,63 @@ public void testWithPulsarRegistrationClient() throws Exception {

timer.stop();
}

@Test
public void testNoDeadlockWithRackawarePolicy() throws Exception {
ClientConfiguration bkClientConf = new ClientConfiguration();
bkClientConf.setProperty(BookieRackAffinityMapping.METADATA_STORE_INSTANCE, store);

BookieRackAffinityMapping mapping = new BookieRackAffinityMapping();
mapping.setBookieAddressResolver(BookieSocketAddress.LEGACY_BOOKIEID_RESOLVER);
mapping.setConf(bkClientConf);

@Cleanup("stop")
HashedWheelTimer timer = new HashedWheelTimer(new ThreadFactoryBuilder().setNameFormat("TestTimer-%d").build(),
bkClientConf.getTimeoutTimerTickDurationMs(), TimeUnit.MILLISECONDS,
bkClientConf.getTimeoutTimerNumTicks());

RackawareEnsemblePlacementPolicy repp = new RackawareEnsemblePlacementPolicy();
repp.initialize(bkClientConf, Optional.of(mapping), timer,
DISABLE_ALL, NullStatsLogger.INSTANCE, BookieSocketAddress.LEGACY_BOOKIEID_RESOLVER);
repp.withDefaultRack(NetworkTopology.DEFAULT_REGION_AND_RACK);

mapping.registerRackChangeListener(repp);

@Cleanup("shutdownNow")
ExecutorService executor1 = Executors.newSingleThreadExecutor();
@Cleanup("shutdownNow")
ExecutorService executor2 = Executors.newSingleThreadExecutor();

CountDownLatch count = new CountDownLatch(2);

executor1.submit(() -> {
try {
Method handleUpdates =
BookieRackAffinityMapping.class.getDeclaredMethod("handleUpdates", Notification.class);
handleUpdates.setAccessible(true);
Notification n =
new Notification(NotificationType.Modified, BookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH);
long start = System.currentTimeMillis();
while (System.currentTimeMillis() - start < 2_000) {
handleUpdates.invoke(mapping, n);
}
count.countDown();
} catch (Exception e) {
throw new RuntimeException(e);
}
});

executor2.submit(() -> {
Set<BookieId> writableBookies = new HashSet<>();
writableBookies.add(BOOKIE1.toBookieId());
long start = System.currentTimeMillis();
while (System.currentTimeMillis() - start < 2_000) {
repp.onClusterChanged(writableBookies, Collections.emptySet());
repp.onClusterChanged(Collections.emptySet(), Collections.emptySet());
}
count.countDown();
});

assertTrue(count.await(3, TimeUnit.SECONDS));
}
}

0 comments on commit 2efef87

Please sign in to comment.