Skip to content
This repository has been archived by the owner on Apr 1, 2024. It is now read-only.

Commit

Permalink
[fix][test] Add MockedPulsarServiceBaseTest.registerCloseable and con…
Browse files Browse the repository at this point in the history
…tinue fixing thread leaks (apache#21477)
  • Loading branch information
lhotari authored Oct 31, 2023
1 parent bc84721 commit 69740c8
Show file tree
Hide file tree
Showing 45 changed files with 197 additions and 129 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,12 @@
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.URL;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Date;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
Expand Down Expand Up @@ -140,6 +142,8 @@ protected static String getTlsFileForClient(String name) {

protected boolean enableBrokerInterceptor = false;

private final List<AutoCloseable> closeables = new ArrayList<>();

public MockedPulsarServiceBaseTest() {
resetConfig();
}
Expand Down Expand Up @@ -274,6 +278,8 @@ protected final void internalCleanup() throws Exception {
pulsarTestContext = null;
}
resetConfig();
callCloseables(closeables);
closeables.clear();
onCleanup();
}

Expand All @@ -291,6 +297,21 @@ protected void onCleanup() {

}

protected <T extends AutoCloseable> T registerCloseable(T closeable) {
closeables.add(closeable);
return closeable;
}

private static void callCloseables(List<AutoCloseable> closeables) {
for (int i = closeables.size() - 1; i >= 0; i--) {
try {
closeables.get(i).close();
} catch (Exception e) {
log.error("Failure in calling close method", e);
}
}
}

protected abstract void setup() throws Exception;

protected abstract void cleanup() throws Exception;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,15 @@

import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
import lombok.Cleanup;
import org.apache.pulsar.common.policies.data.PublishRate;
import org.testng.annotations.Test;

public class PrecisePublishLimiterTest {

@Test
void shouldResetMsgLimitAfterUpdate() {
@Cleanup
PrecisePublishLimiter precisePublishLimiter = new PrecisePublishLimiter(new PublishRate(), () -> {
});
precisePublishLimiter.update(new PublishRate(1, 1));
Expand All @@ -37,6 +39,7 @@ void shouldResetMsgLimitAfterUpdate() {

@Test
void shouldResetBytesLimitAfterUpdate() {
@Cleanup
PrecisePublishLimiter precisePublishLimiter = new PrecisePublishLimiter(new PublishRate(), () -> {
});
precisePublishLimiter.update(new PublishRate(1, 1));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ public void testTransactionMetaStoreAssignAndFailover() throws Exception {
pulsarServiceList.remove(crashedMetaStore);
crashedMetaStore.close();

pulsarClient.close();
pulsarClient = buildClient();
Awaitility.await().atMost(5, TimeUnit.SECONDS)
.untilAsserted(() -> {
Expand Down Expand Up @@ -90,7 +91,7 @@ public void testTransactionMetaStoreUnload() throws Exception {
.removeTransactionMetadataStore(TransactionCoordinatorID.get(f)));
}
checkTransactionCoordinatorNum(0);
buildClient();
pulsarClient = buildClient();
checkTransactionCoordinatorNum(16);

pulsarClient.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotEquals;
import com.google.common.collect.Sets;
import java.net.MalformedURLException;
import java.net.URI;
import java.net.URL;
import java.nio.file.Path;
Expand Down Expand Up @@ -112,21 +113,22 @@ protected final void clientSetup() throws Exception {
Path path = Paths.get(CREDENTIALS_FILE).toAbsolutePath();
log.info("Credentials File path: {}", path.toString());

// AuthenticationOAuth2
Authentication authentication = AuthenticationFactoryOAuth2.clientCredentials(
new URL(server.getIssuer()),
path.toUri().toURL(), // key file path
audience
);

closeAdmin();
admin = spy(PulsarAdmin.builder().serviceHttpUrl(brokerUrl.toString())
.authentication(authentication)
.authentication(createAuthentication(path))
.build());

replacePulsarClient(PulsarClient.builder().serviceUrl(new URI(pulsar.getBrokerServiceUrl()).toString())
.statsInterval(0, TimeUnit.SECONDS)
.authentication(authentication));
.authentication(createAuthentication(path)));
}

private Authentication createAuthentication(Path path) throws MalformedURLException {
return AuthenticationFactoryOAuth2.clientCredentials(
new URL(server.getIssuer()),
path.toUri().toURL(), // key file path
audience
);
}

@AfterMethod(alwaysRun = true)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,9 @@ public void testPartitionedTopicAutoCreation() {
Assert.assertEquals(nsAutoTopicCreationOverride, expectedPolicies);
// Background invalidate cache
final MetadataCache<Policies> nsCache = pulsar.getPulsarResources().getNamespaceResources().getCache();
@Cleanup("interrupt")
final Thread t1 = new Thread(() -> {
while (true) {
while (!Thread.currentThread().isInterrupted()) {
nsCache.invalidate("/admin/policies/" + namespace);
}
});
Expand All @@ -90,7 +91,5 @@ public void testPartitionedTopicAutoCreation() {
// double-check policies
final AutoTopicCreationOverride actualPolicies2 = admin.namespaces().getAutoTopicCreation(namespace);
Assert.assertEquals(actualPolicies2, expectedPolicies);

t1.interrupt();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -527,6 +527,7 @@ public void testPulsarFunctionBKCleanup() throws Exception {
log.info("dlog url: {}", url);
URI dlogUri = URI.create(url);

@Cleanup
Namespace dlogNamespace = NamespaceBuilder.newBuilder()
.conf(dlogConf)
.clientId("function-worker-" + workerConfig.getWorkerId())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ public void setup() throws Exception {
}

service = spyWithClassAndConstructorArgs(WebSocketService.class, config);
doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(service)
doReturn(registerCloseable(new ZKMetadataStore(mockZooKeeperGlobal))).when(service)
.createConfigMetadataStore(anyString(), anyInt(), anyBoolean());
proxyServer = new ProxyServer(config);
WebSocketServiceStarter.start(proxyServer, service);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ protected void setup() throws Exception {
config.setWebServicePort(Optional.of(0));
config.setConfigurationMetadataStoreUrl(GLOBAL_DUMMY_VALUE);
service = spyWithClassAndConstructorArgs(WebSocketService.class, config);
doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(service)
doReturn(registerCloseable(new ZKMetadataStore(mockZooKeeperGlobal))).when(service)
.createConfigMetadataStore(anyString(), anyInt(), anyBoolean());
service.start();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public void configTest(int numIoThreads, int connectionsPerBroker) throws Except
config.setServiceUrl("http://localhost:8080");
config.getProperties().setProperty("brokerClient_lookupTimeoutMs", "100");
WebSocketService service = spyWithClassAndConstructorArgs(WebSocketService.class, config);
doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(service)
doReturn(registerCloseable(new ZKMetadataStore(mockZooKeeperGlobal))).when(service)
.createConfigMetadataStore(anyString(), anyInt(), anyBoolean());
service.start();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ public void setup() throws Exception {
config.setConfigurationMetadataStoreUrl(GLOBAL_DUMMY_VALUE);
config.setCryptoKeyReaderFactoryClassName(CryptoKeyReaderFactoryImpl.class.getName());
service = spy(new WebSocketService(config));
doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(service)
doReturn(registerCloseable(new ZKMetadataStore(mockZooKeeperGlobal))).when(service)
.createConfigMetadataStore(anyString(), anyInt(), anyBoolean());
proxyServer = new ProxyServer(config);
WebSocketServiceStarter.start(proxyServer, service);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ public void setup() throws Exception {
config.setConfigurationMetadataStoreUrl(GLOBAL_DUMMY_VALUE);
config.setWebSocketSessionIdleTimeoutMillis(3 * 1000);
service = spyWithClassAndConstructorArgs(WebSocketService.class, config);
doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(service)
doReturn(registerCloseable(new ZKMetadataStore(mockZooKeeperGlobal))).when(service)
.createConfigMetadataStore(anyString(), anyInt(), anyBoolean());
proxyServer = new ProxyServer(config);
WebSocketServiceStarter.start(proxyServer, service);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ public void setup() throws Exception {
config.setWebSocketSessionIdleTimeoutMillis(3 * 1000);
config.setWebSocketPingDurationSeconds(2);
service = spyWithClassAndConstructorArgs(WebSocketService.class, config);
doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(service)
doReturn(registerCloseable(new ZKMetadataStore(mockZooKeeperGlobal))).when(service)
.createConfigMetadataStore(anyString(), anyInt(), anyBoolean());
proxyServer = new ProxyServer(config);
WebSocketServiceStarter.start(proxyServer, service);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ public void setup() throws Exception {
config.setClusterName("test");
config.setConfigurationMetadataStoreUrl(GLOBAL_DUMMY_VALUE);
service = spy(new WebSocketService(config));
doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(service)
doReturn(registerCloseable(new ZKMetadataStore(mockZooKeeperGlobal))).when(service)
.createConfigMetadataStore(anyString(), anyInt(), anyBoolean());
proxyServer = new ProxyServer(config);
WebSocketServiceStarter.start(proxyServer, service);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ public void setup() throws Exception {
config.setClusterName("test");
config.setConfigurationMetadataStoreUrl(GLOBAL_DUMMY_VALUE);
service = spyWithClassAndConstructorArgs(WebSocketService.class, config);
doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(service)
doReturn(registerCloseable(new ZKMetadataStore(mockZooKeeperGlobal))).when(service)
.createConfigMetadataStore(anyString(), anyInt(), anyBoolean());
proxyServer = new ProxyServer(config);
WebSocketServiceStarter.start(proxyServer, service);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ public void setup() throws Exception {
config.setBrokerClientAuthenticationPlugin(AuthenticationTls.class.getName());
config.setConfigurationMetadataStoreUrl(GLOBAL_DUMMY_VALUE);
service = spyWithClassAndConstructorArgs(WebSocketService.class, config);
doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(service)
doReturn(registerCloseable(new ZKMetadataStore(mockZooKeeperGlobal))).when(service)
.createConfigMetadataStore(anyString(), anyInt(), anyBoolean());
proxyServer = new ProxyServer(config);
WebSocketServiceStarter.start(proxyServer, service);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public void setup() throws Exception {
config.setServiceUrl(pulsar.getSafeWebServiceAddress());
config.setServiceUrlTls(pulsar.getWebServiceAddressTls());
service = spyWithClassAndConstructorArgs(WebSocketService.class, config);
doReturn(new ZKMetadataStore(mockZooKeeper)).when(service)
doReturn(registerCloseable(new ZKMetadataStore(mockZooKeeper))).when(service)
.createConfigMetadataStore(anyString(), anyInt(), anyBoolean());
proxyServer = new ProxyServer(config);
WebSocketServiceStarter.start(proxyServer, service);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ public void setup() throws Exception {
}

service = spyWithClassAndConstructorArgs(WebSocketService.class, config);
doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(service)
doReturn(registerCloseable(new ZKMetadataStore(mockZooKeeperGlobal))).when(service)
.createConfigMetadataStore(anyString(), anyInt(), anyBoolean());
proxyServer = new ProxyServer(config);
WebSocketServiceStarter.start(proxyServer, service);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -388,7 +388,7 @@ public void close() throws IOException {
}

if (statsExecutor != null) {
statsExecutor.shutdown();
statsExecutor.shutdownNow();
}

if (proxyAdditionalServlets != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.concurrent.atomic.AtomicReference;
import lombok.Getter;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.core.util.datetime.FixedDateFormat;
Expand Down Expand Up @@ -101,6 +102,7 @@ public class ProxyServiceStarter {
private ProxyService proxyService;

private WebServer server;
private WebSocketService webSocketService;
private static boolean metricsInitialized;

public ProxyServiceStarter(String[] args) throws Exception {
Expand Down Expand Up @@ -228,7 +230,9 @@ public double get() {
metricsInitialized = true;
}

addWebServerHandlers(server, config, proxyService, proxyService.getDiscoveryProvider());
AtomicReference<WebSocketService> webSocketServiceRef = new AtomicReference<>();
addWebServerHandlers(server, config, proxyService, proxyService.getDiscoveryProvider(), webSocketServiceRef);
webSocketService = webSocketServiceRef.get();

// start web-service
server.start();
Expand All @@ -242,6 +246,9 @@ public void close() {
if (server != null) {
server.stop();
}
if (webSocketService != null) {
webSocketService.close();
}
} catch (Exception e) {
log.warn("server couldn't stop gracefully {}", e.getMessage(), e);
} finally {
Expand All @@ -250,9 +257,17 @@ public void close() {
}

public static void addWebServerHandlers(WebServer server,
ProxyConfiguration config,
ProxyService service,
BrokerDiscoveryProvider discoveryProvider) throws Exception {
ProxyConfiguration config,
ProxyService service,
BrokerDiscoveryProvider discoveryProvider) throws Exception {
addWebServerHandlers(server, config, service, discoveryProvider, null);
}

public static void addWebServerHandlers(WebServer server,
ProxyConfiguration config,
ProxyService service,
BrokerDiscoveryProvider discoveryProvider,
AtomicReference<WebSocketService> webSocketServiceRef) throws Exception {
// We can make 'status.html' publicly accessible without authentication since
// it does not contain any sensitive data.
server.addRestResource("/", VipStatus.ATTRIBUTE_STATUS_FILE_PATH, config.getStatusFilePath(),
Expand Down Expand Up @@ -301,6 +316,9 @@ public static void addWebServerHandlers(WebServer server,
serviceConfiguration.setBrokerClientTlsEnabled(config.isTlsEnabledWithBroker());
WebSocketService webSocketService = new WebSocketService(createClusterData(config), serviceConfiguration);
webSocketService.start();
if (webSocketServiceRef != null) {
webSocketServiceRef.set(webSocketService);
}
final WebSocketServlet producerWebSocketServlet = new WebSocketProducerServlet(webSocketService);
server.addServlet(WebSocketProducerServlet.SERVLET_PATH,
new ServletHolder(producerWebSocketServlet));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,8 +143,9 @@ protected void setup() throws Exception {

proxyService = Mockito.spy(new ProxyService(proxyConfig, new AuthenticationService(
PulsarConfigurationLoader.convertFrom(proxyConfig))));
doReturn(new ZKMetadataStore(mockZooKeeper)).when(proxyService).createLocalMetadataStore();
doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(proxyService).createConfigurationMetadataStore();
doReturn(registerCloseable(new ZKMetadataStore(mockZooKeeper))).when(proxyService).createLocalMetadataStore();
doReturn(registerCloseable(new ZKMetadataStore(mockZooKeeperGlobal))).when(proxyService)
.createConfigurationMetadataStore();

proxyService.start();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,11 +102,11 @@ protected void setup() throws Exception {
proxyConfig.setBrokerClientAuthenticationParameters(String.format("keyStoreType:%s,keyStorePath:%s,keyStorePassword:%s",
KEYSTORE_TYPE, BROKER_KEYSTORE_FILE_PATH, BROKER_KEYSTORE_PW));

resource = new PulsarResources(new ZKMetadataStore(mockZooKeeper),
new ZKMetadataStore(mockZooKeeperGlobal));
resource = new PulsarResources(registerCloseable(new ZKMetadataStore(mockZooKeeper)),
registerCloseable(new ZKMetadataStore(mockZooKeeperGlobal)));
webServer = new WebServer(proxyConfig, new AuthenticationService(
PulsarConfigurationLoader.convertFrom(proxyConfig)));
discoveryProvider = spy(new BrokerDiscoveryProvider(proxyConfig, resource));
discoveryProvider = spy(registerCloseable(new BrokerDiscoveryProvider(proxyConfig, resource)));
LoadManagerReport report = new LoadReport(brokerUrl.toString(), brokerUrlTls.toString(), null, null);
doReturn(report).when(discoveryProvider).nextBroker();
ServletHolder servletHolder = new ServletHolder(new AdminProxyHandler(proxyConfig, discoveryProvider));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,11 +98,11 @@ protected void setup() throws Exception {
proxyConfig.setBrokerClientTrustCertsFilePath(CA_CERT_FILE_PATH);
proxyConfig.setAuthenticationProviders(ImmutableSet.of(AuthenticationProviderTls.class.getName()));

resource = new PulsarResources(new ZKMetadataStore(mockZooKeeper),
new ZKMetadataStore(mockZooKeeperGlobal));
resource = new PulsarResources(registerCloseable(new ZKMetadataStore(mockZooKeeper)),
registerCloseable(new ZKMetadataStore(mockZooKeeperGlobal)));
webServer = new WebServer(proxyConfig, new AuthenticationService(
PulsarConfigurationLoader.convertFrom(proxyConfig)));
discoveryProvider = spy(new BrokerDiscoveryProvider(proxyConfig, resource));
discoveryProvider = spy(registerCloseable(new BrokerDiscoveryProvider(proxyConfig, resource)));
LoadManagerReport report = new LoadReport(brokerUrl.toString(), brokerUrlTls.toString(), null, null);
doReturn(report).when(discoveryProvider).nextBroker();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,9 @@ protected void setup() throws Exception {

proxyService = Mockito.spy(new ProxyService(proxyConfig,
new AuthenticationService(PulsarConfigurationLoader.convertFrom(proxyConfig))));
doReturn(new ZKMetadataStore(mockZooKeeper)).when(proxyService).createLocalMetadataStore();
doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(proxyService).createConfigurationMetadataStore();
doReturn(registerCloseable(new ZKMetadataStore(mockZooKeeper))).when(proxyService).createLocalMetadataStore();
doReturn(registerCloseable(new ZKMetadataStore(mockZooKeeperGlobal))).when(proxyService)
.createConfigurationMetadataStore();

Optional<Integer> proxyLogLevel = Optional.of(2);
assertEquals(proxyLogLevel, proxyService.getConfiguration().getProxyLogLevel());
Expand Down
Loading

0 comments on commit 69740c8

Please sign in to comment.