Skip to content
This repository has been archived by the owner on Apr 1, 2024. It is now read-only.

Commit

Permalink
[fix][test] Fix resource leaks in pulsar-metadata bookkeeper.replicat…
Browse files Browse the repository at this point in the history
…ion tests (apache#21483)
  • Loading branch information
lhotari authored Oct 31, 2023
1 parent 69740c8 commit b189ea7
Show file tree
Hide file tree
Showing 7 changed files with 46 additions and 18 deletions.
7 changes: 6 additions & 1 deletion pulsar-metadata/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,11 @@
<relativePath>..</relativePath>
</parent>

<properties>
<!-- workaround for OOM error 134 in CI -->
<testForkCount>2</testForkCount>
</properties>

<artifactId>pulsar-metadata</artifactId>
<name>Pulsar Metadata</name>
<dependencies>
Expand Down Expand Up @@ -149,7 +154,7 @@
</execution>
</executions>
</plugin>

<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,15 +65,15 @@ public AuditorCheckAllLedgersTaskTest() {
@Override
public void setUp() throws Exception {
super.setUp();
final BookKeeper bookKeeper = new BookKeeper(baseClientConf);
final BookKeeper bookKeeper = registerCloseable(new BookKeeper(baseClientConf));
admin = new BookKeeperAdmin(bookKeeper, NullStatsLogger.INSTANCE, new ClientConfiguration(baseClientConf));

String ledgersRoot = "/ledgers";
String storeUri = metadataServiceUri.replaceAll("zk://", "").replaceAll("/ledgers", "");
MetadataStoreExtended store = MetadataStoreExtended.create(storeUri,
MetadataStoreConfig.builder().fsyncEnable(false).build());
MetadataStoreExtended store = registerCloseable(MetadataStoreExtended.create(storeUri,
MetadataStoreConfig.builder().fsyncEnable(false).build()));
LayoutManager layoutManager = new PulsarLayoutManager(store, ledgersRoot);
PulsarLedgerManagerFactory ledgerManagerFactory = new PulsarLedgerManagerFactory();
PulsarLedgerManagerFactory ledgerManagerFactory = registerCloseable(new PulsarLedgerManagerFactory());

ClientConfiguration conf = new ClientConfiguration();
conf.setZkLedgersRootPath(ledgersRoot);
Expand All @@ -86,7 +86,7 @@ public void setUp() throws Exception {
acquireConcurrentOpenLedgerOperationsTimeoutMSec);
}

@AfterMethod
@AfterMethod(alwaysRun = true)
@Override
public void tearDown() throws Exception {
if (ledgerManager != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,14 +136,14 @@ public void setUp() throws Exception {

String ledgersRoot = "/ledgers";
String storeUri = metadataServiceUri.replaceAll("zk://", "").replaceAll("/ledgers", "");
MetadataStoreExtended store = MetadataStoreExtended.create(storeUri,
MetadataStoreConfig.builder().fsyncEnable(false).build());
MetadataStoreExtended store = registerCloseable(MetadataStoreExtended.create(storeUri,
MetadataStoreConfig.builder().fsyncEnable(false).build()));
LayoutManager layoutManager = new PulsarLayoutManager(store, ledgersRoot);
PulsarLedgerManagerFactory ledgerManagerFactory = new PulsarLedgerManagerFactory();
PulsarLedgerManagerFactory ledgerManagerFactory = registerCloseable(new PulsarLedgerManagerFactory());
ClientConfiguration conf = new ClientConfiguration();
conf.setZkLedgersRootPath(ledgersRoot);
ledgerManagerFactory.initialize(conf, layoutManager, 1);
urLedgerMgr = ledgerManagerFactory.newLedgerUnderreplicationManager();
urLedgerMgr = registerCloseable(ledgerManagerFactory.newLedgerUnderreplicationManager());
urLedgerMgr.setCheckAllLedgersCTime(System.currentTimeMillis());

baseClientConf.setMetadataServiceUri(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,16 +63,19 @@ public void setUp() throws Exception {
super.setUp();
baseClientConf.setMetadataServiceUri(
metadataServiceUri.replaceAll("zk://", "metadata-store:").replaceAll("/ledgers", ""));
final BookKeeper bookKeeper = new BookKeeper(baseClientConf);
final BookKeeper bookKeeper = registerCloseable(new BookKeeper(baseClientConf));
admin = new BookKeeperAdmin(bookKeeper, NullStatsLogger.INSTANCE, new ClientConfiguration(baseClientConf));
LedgerManagerFactory ledgerManagerFactory = bookKeeper.getLedgerManagerFactory();
LedgerManagerFactory ledgerManagerFactory = registerCloseable(bookKeeper.getLedgerManagerFactory());
ledgerManager = ledgerManagerFactory.newLedgerManager();
ledgerUnderreplicationManager = ledgerManagerFactory.newLedgerUnderreplicationManager();
}

@AfterMethod
@AfterMethod(alwaysRun = true)
@Override
public void tearDown() throws Exception {
if (admin != null) {
admin.close();
}
if (ledgerManager != null) {
ledgerManager.close();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,14 +63,14 @@ public void setUp() throws Exception {
super.setUp();
baseClientConf.setMetadataServiceUri(
metadataServiceUri.replaceAll("zk://", "metadata-store:").replaceAll("/ledgers", ""));
final BookKeeper bookKeeper = new BookKeeper(baseClientConf);
final BookKeeper bookKeeper = registerCloseable(new BookKeeper(baseClientConf));
admin = new BookKeeperAdmin(bookKeeper, NullStatsLogger.INSTANCE, new ClientConfiguration(baseClientConf));
LedgerManagerFactory ledgerManagerFactory = bookKeeper.getLedgerManagerFactory();
LedgerManagerFactory ledgerManagerFactory = registerCloseable(bookKeeper.getLedgerManagerFactory());
ledgerManager = ledgerManagerFactory.newLedgerManager();
ledgerUnderreplicationManager = ledgerManagerFactory.newLedgerUnderreplicationManager();
}

@AfterMethod
@AfterMethod(alwaysRun = true)
@Override
public void tearDown() throws Exception {
if (ledgerManager != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,8 @@ public void handleTestMethodName(Method method) {
protected ExecutorService executor;
private final List<Integer> bookiePorts = new ArrayList<>();

private final List<AutoCloseable> closeables = new ArrayList<>();

SynchronousQueue<Throwable> asyncExceptions = new SynchronousQueue<>();
protected void captureThrowable(Runnable c) {
try {
Expand Down Expand Up @@ -187,6 +189,9 @@ protected String changeLedgerPath() {

@AfterTest(alwaysRun = true)
public void tearDown() throws Exception {
callCloseables(closeables);
closeables.clear();

boolean failed = false;
for (Throwable e : asyncExceptions) {
LOG.error("Got async exception: ", e);
Expand Down Expand Up @@ -228,6 +233,21 @@ public void tearDown() throws Exception {
}
}

protected <T extends AutoCloseable> T registerCloseable(T closeable) {
closeables.add(closeable);
return closeable;
}

private static void callCloseables(List<AutoCloseable> closeables) {
for (int i = closeables.size() - 1; i >= 0; i--) {
try {
closeables.get(i).close();
} catch (Exception e) {
LOG.error("Failure in calling close method", e);
}
}
}

/**
* Start zookeeper cluster.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,8 @@ public void setUp() throws Exception {

String ledgersRoot = "/ledgers";
String storeUri = metadataServiceUri.replaceAll("zk://", "").replaceAll("/ledgers", "");
MetadataStoreExtended store = MetadataStoreExtended.create(storeUri,
MetadataStoreConfig.builder().fsyncEnable(false).build());
MetadataStoreExtended store = registerCloseable(MetadataStoreExtended.create(storeUri,
MetadataStoreConfig.builder().fsyncEnable(false).build()));
LayoutManager layoutManager = new PulsarLayoutManager(store, ledgersRoot);
newLedgerManagerFactory = new PulsarLedgerManagerFactory();

Expand Down

0 comments on commit b189ea7

Please sign in to comment.