diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java index 9df84b45775a9..b8d75bd0fbcac 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java @@ -25,10 +25,12 @@ import java.net.InetSocketAddress; import java.net.URI; import java.net.URL; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Date; import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Set; @@ -140,6 +142,8 @@ protected static String getTlsFileForClient(String name) { protected boolean enableBrokerInterceptor = false; + private final List closeables = new ArrayList<>(); + public MockedPulsarServiceBaseTest() { resetConfig(); } @@ -274,6 +278,8 @@ protected final void internalCleanup() throws Exception { pulsarTestContext = null; } resetConfig(); + callCloseables(closeables); + closeables.clear(); onCleanup(); } @@ -291,6 +297,21 @@ protected void onCleanup() { } + protected T registerCloseable(T closeable) { + closeables.add(closeable); + return closeable; + } + + private static void callCloseables(List closeables) { + for (int i = closeables.size() - 1; i >= 0; i--) { + try { + closeables.get(i).close(); + } catch (Exception e) { + log.error("Failure in calling close method", e); + } + } + } + protected abstract void setup() throws Exception; protected abstract void cleanup() throws Exception; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PrecisePublishLimiterTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PrecisePublishLimiterTest.java index 73cb43d52b112..9d5cfe5b5d28e 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PrecisePublishLimiterTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PrecisePublishLimiterTest.java @@ -20,6 +20,7 @@ import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertTrue; +import lombok.Cleanup; import org.apache.pulsar.common.policies.data.PublishRate; import org.testng.annotations.Test; @@ -27,6 +28,7 @@ public class PrecisePublishLimiterTest { @Test void shouldResetMsgLimitAfterUpdate() { + @Cleanup PrecisePublishLimiter precisePublishLimiter = new PrecisePublishLimiter(new PublishRate(), () -> { }); precisePublishLimiter.update(new PublishRate(1, 1)); @@ -37,6 +39,7 @@ void shouldResetMsgLimitAfterUpdate() { @Test void shouldResetBytesLimitAfterUpdate() { + @Cleanup PrecisePublishLimiter precisePublishLimiter = new PrecisePublishLimiter(new PublishRate(), () -> { }); precisePublishLimiter.update(new PublishRate(1, 1)); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionMetaStoreAssignmentTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionMetaStoreAssignmentTest.java index ac1f570218704..25ce90e1cf091 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionMetaStoreAssignmentTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionMetaStoreAssignmentTest.java @@ -61,6 +61,7 @@ public void testTransactionMetaStoreAssignAndFailover() throws Exception { pulsarServiceList.remove(crashedMetaStore); crashedMetaStore.close(); + pulsarClient.close(); pulsarClient = buildClient(); Awaitility.await().atMost(5, TimeUnit.SECONDS) .untilAsserted(() -> { @@ -90,7 +91,7 @@ public void testTransactionMetaStoreUnload() throws Exception { .removeTransactionMetadataStore(TransactionCoordinatorID.get(f))); } checkTransactionCoordinatorNum(0); - buildClient(); + pulsarClient = buildClient(); checkTransactionCoordinatorNum(16); pulsarClient.close(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TokenOauth2AuthenticatedProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TokenOauth2AuthenticatedProducerConsumerTest.java index ecf1278eab75c..c24e192361921 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TokenOauth2AuthenticatedProducerConsumerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TokenOauth2AuthenticatedProducerConsumerTest.java @@ -22,6 +22,7 @@ import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertNotEquals; import com.google.common.collect.Sets; +import java.net.MalformedURLException; import java.net.URI; import java.net.URL; import java.nio.file.Path; @@ -112,21 +113,22 @@ protected final void clientSetup() throws Exception { Path path = Paths.get(CREDENTIALS_FILE).toAbsolutePath(); log.info("Credentials File path: {}", path.toString()); - // AuthenticationOAuth2 - Authentication authentication = AuthenticationFactoryOAuth2.clientCredentials( - new URL(server.getIssuer()), - path.toUri().toURL(), // key file path - audience - ); - closeAdmin(); admin = spy(PulsarAdmin.builder().serviceHttpUrl(brokerUrl.toString()) - .authentication(authentication) + .authentication(createAuthentication(path)) .build()); replacePulsarClient(PulsarClient.builder().serviceUrl(new URI(pulsar.getBrokerServiceUrl()).toString()) .statsInterval(0, TimeUnit.SECONDS) - .authentication(authentication)); + .authentication(createAuthentication(path))); + } + + private Authentication createAuthentication(Path path) throws MalformedURLException { + return AuthenticationFactoryOAuth2.clientCredentials( + new URL(server.getIssuer()), + path.toUri().toURL(), // key file path + audience + ); } @AfterMethod(alwaysRun = true) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/HierarchyTopicAutoCreationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/HierarchyTopicAutoCreationTest.java index 8c93b293c41a4..8ab94e29cfe6e 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/HierarchyTopicAutoCreationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/HierarchyTopicAutoCreationTest.java @@ -70,8 +70,9 @@ public void testPartitionedTopicAutoCreation() { Assert.assertEquals(nsAutoTopicCreationOverride, expectedPolicies); // Background invalidate cache final MetadataCache nsCache = pulsar.getPulsarResources().getNamespaceResources().getCache(); + @Cleanup("interrupt") final Thread t1 = new Thread(() -> { - while (true) { + while (!Thread.currentThread().isInterrupted()) { nsCache.invalidate("/admin/policies/" + namespace); } }); @@ -90,7 +91,5 @@ public void testPartitionedTopicAutoCreation() { // double-check policies final AutoTopicCreationOverride actualPolicies2 = admin.namespaces().getAutoTopicCreation(namespace); Assert.assertEquals(actualPolicies2, expectedPolicies); - - t1.interrupt(); } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionPublishTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionPublishTest.java index d7e15f4ce8367..9d7493733fe89 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionPublishTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionPublishTest.java @@ -527,6 +527,7 @@ public void testPulsarFunctionBKCleanup() throws Exception { log.info("dlog url: {}", url); URI dlogUri = URI.create(url); + @Cleanup Namespace dlogNamespace = NamespaceBuilder.newBuilder() .conf(dlogConf) .clientId("function-worker-" + workerConfig.getWorkerId()) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyAuthenticationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyAuthenticationTest.java index e4d7b8349ec2d..3f34857f59e8b 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyAuthenticationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyAuthenticationTest.java @@ -83,7 +83,7 @@ public void setup() throws Exception { } service = spyWithClassAndConstructorArgs(WebSocketService.class, config); - doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(service) + doReturn(registerCloseable(new ZKMetadataStore(mockZooKeeperGlobal))).when(service) .createConfigMetadataStore(anyString(), anyInt(), anyBoolean()); proxyServer = new ProxyServer(config); WebSocketServiceStarter.start(proxyServer, service); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyAuthorizationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyAuthorizationTest.java index a3b26a4a9d122..d4f7c72bed016 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyAuthorizationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyAuthorizationTest.java @@ -65,7 +65,7 @@ protected void setup() throws Exception { config.setWebServicePort(Optional.of(0)); config.setConfigurationMetadataStoreUrl(GLOBAL_DUMMY_VALUE); service = spyWithClassAndConstructorArgs(WebSocketService.class, config); - doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(service) + doReturn(registerCloseable(new ZKMetadataStore(mockZooKeeperGlobal))).when(service) .createConfigMetadataStore(anyString(), anyInt(), anyBoolean()); service.start(); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyConfigurationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyConfigurationTest.java index 173948ab1be5b..85f512e15670e 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyConfigurationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyConfigurationTest.java @@ -68,7 +68,7 @@ public void configTest(int numIoThreads, int connectionsPerBroker) throws Except config.setServiceUrl("http://localhost:8080"); config.getProperties().setProperty("brokerClient_lookupTimeoutMs", "100"); WebSocketService service = spyWithClassAndConstructorArgs(WebSocketService.class, config); - doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(service) + doReturn(registerCloseable(new ZKMetadataStore(mockZooKeeperGlobal))).when(service) .createConfigMetadataStore(anyString(), anyInt(), anyBoolean()); service.start(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyEncryptionPublishConsumeTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyEncryptionPublishConsumeTest.java index cf7304615f5be..5234ca0057875 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyEncryptionPublishConsumeTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyEncryptionPublishConsumeTest.java @@ -74,7 +74,7 @@ public void setup() throws Exception { config.setConfigurationMetadataStoreUrl(GLOBAL_DUMMY_VALUE); config.setCryptoKeyReaderFactoryClassName(CryptoKeyReaderFactoryImpl.class.getName()); service = spy(new WebSocketService(config)); - doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(service) + doReturn(registerCloseable(new ZKMetadataStore(mockZooKeeperGlobal))).when(service) .createConfigMetadataStore(anyString(), anyInt(), anyBoolean()); proxyServer = new ProxyServer(config); WebSocketServiceStarter.start(proxyServer, service); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyIdleTimeoutTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyIdleTimeoutTest.java index ab5a43b115ab1..6c9c5deb0c3ed 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyIdleTimeoutTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyIdleTimeoutTest.java @@ -65,7 +65,7 @@ public void setup() throws Exception { config.setConfigurationMetadataStoreUrl(GLOBAL_DUMMY_VALUE); config.setWebSocketSessionIdleTimeoutMillis(3 * 1000); service = spyWithClassAndConstructorArgs(WebSocketService.class, config); - doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(service) + doReturn(registerCloseable(new ZKMetadataStore(mockZooKeeperGlobal))).when(service) .createConfigMetadataStore(anyString(), anyInt(), anyBoolean()); proxyServer = new ProxyServer(config); WebSocketServiceStarter.start(proxyServer, service); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPingTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPingTest.java index 8ba9283138926..b4ecb84f580b5 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPingTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPingTest.java @@ -67,7 +67,7 @@ public void setup() throws Exception { config.setWebSocketSessionIdleTimeoutMillis(3 * 1000); config.setWebSocketPingDurationSeconds(2); service = spyWithClassAndConstructorArgs(WebSocketService.class, config); - doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(service) + doReturn(registerCloseable(new ZKMetadataStore(mockZooKeeperGlobal))).when(service) .createConfigMetadataStore(anyString(), anyInt(), anyBoolean()); proxyServer = new ProxyServer(config); WebSocketServiceStarter.start(proxyServer, service); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeClientSideEncryptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeClientSideEncryptionTest.java index 16936d65fc22b..d81c39be28487 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeClientSideEncryptionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeClientSideEncryptionTest.java @@ -78,7 +78,7 @@ public void setup() throws Exception { config.setClusterName("test"); config.setConfigurationMetadataStoreUrl(GLOBAL_DUMMY_VALUE); service = spy(new WebSocketService(config)); - doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(service) + doReturn(registerCloseable(new ZKMetadataStore(mockZooKeeperGlobal))).when(service) .createConfigMetadataStore(anyString(), anyInt(), anyBoolean()); proxyServer = new ProxyServer(config); WebSocketServiceStarter.start(proxyServer, service); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java index 8c64e40f927c4..ad51158034315 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java @@ -100,7 +100,7 @@ public void setup() throws Exception { config.setClusterName("test"); config.setConfigurationMetadataStoreUrl(GLOBAL_DUMMY_VALUE); service = spyWithClassAndConstructorArgs(WebSocketService.class, config); - doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(service) + doReturn(registerCloseable(new ZKMetadataStore(mockZooKeeperGlobal))).when(service) .createConfigMetadataStore(anyString(), anyInt(), anyBoolean()); proxyServer = new ProxyServer(config); WebSocketServiceStarter.start(proxyServer, service); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTlsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTlsTest.java index 91cd4fab470d6..dca4964fc987e 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTlsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTlsTest.java @@ -74,7 +74,7 @@ public void setup() throws Exception { config.setBrokerClientAuthenticationPlugin(AuthenticationTls.class.getName()); config.setConfigurationMetadataStoreUrl(GLOBAL_DUMMY_VALUE); service = spyWithClassAndConstructorArgs(WebSocketService.class, config); - doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(service) + doReturn(registerCloseable(new ZKMetadataStore(mockZooKeeperGlobal))).when(service) .createConfigMetadataStore(anyString(), anyInt(), anyBoolean()); proxyServer = new ProxyServer(config); WebSocketServiceStarter.start(proxyServer, service); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeWithoutZKTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeWithoutZKTest.java index 0a432406001ad..c3e75bcb4f0ec 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeWithoutZKTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeWithoutZKTest.java @@ -59,7 +59,7 @@ public void setup() throws Exception { config.setServiceUrl(pulsar.getSafeWebServiceAddress()); config.setServiceUrlTls(pulsar.getWebServiceAddressTls()); service = spyWithClassAndConstructorArgs(WebSocketService.class, config); - doReturn(new ZKMetadataStore(mockZooKeeper)).when(service) + doReturn(registerCloseable(new ZKMetadataStore(mockZooKeeper))).when(service) .createConfigMetadataStore(anyString(), anyInt(), anyBoolean()); proxyServer = new ProxyServer(config); WebSocketServiceStarter.start(proxyServer, service); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/v1/V1_ProxyAuthenticationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/v1/V1_ProxyAuthenticationTest.java index 01c851290b621..9767be625a070 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/v1/V1_ProxyAuthenticationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/v1/V1_ProxyAuthenticationTest.java @@ -84,7 +84,7 @@ public void setup() throws Exception { } service = spyWithClassAndConstructorArgs(WebSocketService.class, config); - doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(service) + doReturn(registerCloseable(new ZKMetadataStore(mockZooKeeperGlobal))).when(service) .createConfigMetadataStore(anyString(), anyInt(), anyBoolean()); proxyServer = new ProxyServer(config); WebSocketServiceStarter.start(proxyServer, service); diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java index a934b8b078426..216d9ea308539 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java @@ -388,7 +388,7 @@ public void close() throws IOException { } if (statsExecutor != null) { - statsExecutor.shutdown(); + statsExecutor.shutdownNow(); } if (proxyAdditionalServlets != null) { diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java index 7427331641318..485befa00ac87 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java @@ -35,6 +35,7 @@ import java.util.Collection; import java.util.Collections; import java.util.Date; +import java.util.concurrent.atomic.AtomicReference; import lombok.Getter; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.core.util.datetime.FixedDateFormat; @@ -101,6 +102,7 @@ public class ProxyServiceStarter { private ProxyService proxyService; private WebServer server; + private WebSocketService webSocketService; private static boolean metricsInitialized; public ProxyServiceStarter(String[] args) throws Exception { @@ -228,7 +230,9 @@ public double get() { metricsInitialized = true; } - addWebServerHandlers(server, config, proxyService, proxyService.getDiscoveryProvider()); + AtomicReference webSocketServiceRef = new AtomicReference<>(); + addWebServerHandlers(server, config, proxyService, proxyService.getDiscoveryProvider(), webSocketServiceRef); + webSocketService = webSocketServiceRef.get(); // start web-service server.start(); @@ -242,6 +246,9 @@ public void close() { if (server != null) { server.stop(); } + if (webSocketService != null) { + webSocketService.close(); + } } catch (Exception e) { log.warn("server couldn't stop gracefully {}", e.getMessage(), e); } finally { @@ -250,9 +257,17 @@ public void close() { } public static void addWebServerHandlers(WebServer server, - ProxyConfiguration config, - ProxyService service, - BrokerDiscoveryProvider discoveryProvider) throws Exception { + ProxyConfiguration config, + ProxyService service, + BrokerDiscoveryProvider discoveryProvider) throws Exception { + addWebServerHandlers(server, config, service, discoveryProvider, null); + } + + public static void addWebServerHandlers(WebServer server, + ProxyConfiguration config, + ProxyService service, + BrokerDiscoveryProvider discoveryProvider, + AtomicReference webSocketServiceRef) throws Exception { // We can make 'status.html' publicly accessible without authentication since // it does not contain any sensitive data. server.addRestResource("/", VipStatus.ATTRIBUTE_STATUS_FILE_PATH, config.getStatusFilePath(), @@ -301,6 +316,9 @@ public static void addWebServerHandlers(WebServer server, serviceConfiguration.setBrokerClientTlsEnabled(config.isTlsEnabledWithBroker()); WebSocketService webSocketService = new WebSocketService(createClusterData(config), serviceConfiguration); webSocketService.start(); + if (webSocketServiceRef != null) { + webSocketServiceRef.set(webSocketService); + } final WebSocketServlet producerWebSocketServlet = new WebSocketProducerServlet(webSocketService); server.addServlet(WebSocketProducerServlet.SERVLET_PATH, new ServletHolder(producerWebSocketServlet)); diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/extensions/SimpleProxyExtensionTestBase.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/extensions/SimpleProxyExtensionTestBase.java index 79662097c3b2f..fde7c938d0a62 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/extensions/SimpleProxyExtensionTestBase.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/extensions/SimpleProxyExtensionTestBase.java @@ -143,8 +143,9 @@ protected void setup() throws Exception { proxyService = Mockito.spy(new ProxyService(proxyConfig, new AuthenticationService( PulsarConfigurationLoader.convertFrom(proxyConfig)))); - doReturn(new ZKMetadataStore(mockZooKeeper)).when(proxyService).createLocalMetadataStore(); - doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(proxyService).createConfigurationMetadataStore(); + doReturn(registerCloseable(new ZKMetadataStore(mockZooKeeper))).when(proxyService).createLocalMetadataStore(); + doReturn(registerCloseable(new ZKMetadataStore(mockZooKeeperGlobal))).when(proxyService) + .createConfigurationMetadataStore(); proxyService.start(); } diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/AdminProxyHandlerKeystoreTLSTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/AdminProxyHandlerKeystoreTLSTest.java index 604354e868ebe..bc2029861f415 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/AdminProxyHandlerKeystoreTLSTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/AdminProxyHandlerKeystoreTLSTest.java @@ -102,11 +102,11 @@ protected void setup() throws Exception { proxyConfig.setBrokerClientAuthenticationParameters(String.format("keyStoreType:%s,keyStorePath:%s,keyStorePassword:%s", KEYSTORE_TYPE, BROKER_KEYSTORE_FILE_PATH, BROKER_KEYSTORE_PW)); - resource = new PulsarResources(new ZKMetadataStore(mockZooKeeper), - new ZKMetadataStore(mockZooKeeperGlobal)); + resource = new PulsarResources(registerCloseable(new ZKMetadataStore(mockZooKeeper)), + registerCloseable(new ZKMetadataStore(mockZooKeeperGlobal))); webServer = new WebServer(proxyConfig, new AuthenticationService( PulsarConfigurationLoader.convertFrom(proxyConfig))); - discoveryProvider = spy(new BrokerDiscoveryProvider(proxyConfig, resource)); + discoveryProvider = spy(registerCloseable(new BrokerDiscoveryProvider(proxyConfig, resource))); LoadManagerReport report = new LoadReport(brokerUrl.toString(), brokerUrlTls.toString(), null, null); doReturn(report).when(discoveryProvider).nextBroker(); ServletHolder servletHolder = new ServletHolder(new AdminProxyHandler(proxyConfig, discoveryProvider)); diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/AuthedAdminProxyHandlerTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/AuthedAdminProxyHandlerTest.java index 100ea64dd2e08..d83de9652cfde 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/AuthedAdminProxyHandlerTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/AuthedAdminProxyHandlerTest.java @@ -98,11 +98,11 @@ protected void setup() throws Exception { proxyConfig.setBrokerClientTrustCertsFilePath(CA_CERT_FILE_PATH); proxyConfig.setAuthenticationProviders(ImmutableSet.of(AuthenticationProviderTls.class.getName())); - resource = new PulsarResources(new ZKMetadataStore(mockZooKeeper), - new ZKMetadataStore(mockZooKeeperGlobal)); + resource = new PulsarResources(registerCloseable(new ZKMetadataStore(mockZooKeeper)), + registerCloseable(new ZKMetadataStore(mockZooKeeperGlobal))); webServer = new WebServer(proxyConfig, new AuthenticationService( PulsarConfigurationLoader.convertFrom(proxyConfig))); - discoveryProvider = spy(new BrokerDiscoveryProvider(proxyConfig, resource)); + discoveryProvider = spy(registerCloseable(new BrokerDiscoveryProvider(proxyConfig, resource))); LoadManagerReport report = new LoadReport(brokerUrl.toString(), brokerUrlTls.toString(), null, null); doReturn(report).when(discoveryProvider).nextBroker(); diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAdditionalServletTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAdditionalServletTest.java index 17cd3c33e799d..34ab22c7fc656 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAdditionalServletTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAdditionalServletTest.java @@ -84,8 +84,9 @@ protected void setup() throws Exception { proxyService = Mockito.spy(new ProxyService(proxyConfig, new AuthenticationService(PulsarConfigurationLoader.convertFrom(proxyConfig)))); - doReturn(new ZKMetadataStore(mockZooKeeper)).when(proxyService).createLocalMetadataStore(); - doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(proxyService).createConfigurationMetadataStore(); + doReturn(registerCloseable(new ZKMetadataStore(mockZooKeeper))).when(proxyService).createLocalMetadataStore(); + doReturn(registerCloseable(new ZKMetadataStore(mockZooKeeperGlobal))).when(proxyService) + .createConfigurationMetadataStore(); Optional proxyLogLevel = Optional.of(2); assertEquals(proxyLogLevel, proxyService.getConfiguration().getProxyLogLevel()); diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticatedProducerConsumerTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticatedProducerConsumerTest.java index 85f44b8171c45..1c93cb20c70df 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticatedProducerConsumerTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticatedProducerConsumerTest.java @@ -140,8 +140,9 @@ protected void setup() throws Exception { proxyService = Mockito.spy(new ProxyService(proxyConfig, new AuthenticationService( PulsarConfigurationLoader.convertFrom(proxyConfig)))); - doReturn(new ZKMetadataStore(mockZooKeeper)).when(proxyService).createLocalMetadataStore(); - doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(proxyService).createConfigurationMetadataStore(); + doReturn(registerCloseable(new ZKMetadataStore(mockZooKeeper))).when(proxyService).createLocalMetadataStore(); + doReturn(registerCloseable(new ZKMetadataStore(mockZooKeeperGlobal))).when(proxyService) + .createConfigurationMetadataStore(); proxyService.start(); } diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyConnectionThrottlingTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyConnectionThrottlingTest.java index 336f11ae19da6..a070d1e84d339 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyConnectionThrottlingTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyConnectionThrottlingTest.java @@ -61,8 +61,9 @@ protected void setup() throws Exception { proxyConfig.setMaxConcurrentInboundConnectionsPerIp(NUM_CONCURRENT_INBOUND_CONNECTION); proxyService = Mockito.spy(new ProxyService(proxyConfig, new AuthenticationService( PulsarConfigurationLoader.convertFrom(proxyConfig)))); - doReturn(new ZKMetadataStore(mockZooKeeper)).when(proxyService).createLocalMetadataStore(); - doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(proxyService).createConfigurationMetadataStore(); + doReturn(registerCloseable(new ZKMetadataStore(mockZooKeeper))).when(proxyService).createLocalMetadataStore(); + doReturn(registerCloseable(new ZKMetadataStore(mockZooKeeperGlobal))).when(proxyService) + .createConfigurationMetadataStore(); proxyService.start(); } diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyDisableZeroCopyTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyDisableZeroCopyTest.java index 3aa71413d540b..37fd66cd7dab4 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyDisableZeroCopyTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyDisableZeroCopyTest.java @@ -41,8 +41,9 @@ protected void setup() throws Exception { proxyService = Mockito.spy(new ProxyService(proxyConfig, new AuthenticationService( PulsarConfigurationLoader.convertFrom(proxyConfig)))); - doReturn(new ZKMetadataStore(mockZooKeeper)).when(proxyService).createLocalMetadataStore(); - doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(proxyService).createConfigurationMetadataStore(); + doReturn(registerCloseable(new ZKMetadataStore(mockZooKeeper))).when(proxyService).createLocalMetadataStore(); + doReturn(registerCloseable(new ZKMetadataStore(mockZooKeeperGlobal))).when(proxyService) + .createConfigurationMetadataStore(); proxyService.start(); } diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyEnableHAProxyProtocolTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyEnableHAProxyProtocolTest.java index 8b3092c6f5170..5704ba55fed86 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyEnableHAProxyProtocolTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyEnableHAProxyProtocolTest.java @@ -63,8 +63,9 @@ protected void setup() throws Exception { proxyService = Mockito.spy(new ProxyService(proxyConfig, new AuthenticationService( PulsarConfigurationLoader.convertFrom(proxyConfig)))); - doReturn(new ZKMetadataStore(mockZooKeeper)).when(proxyService).createLocalMetadataStore(); - doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(proxyService).createConfigurationMetadataStore(); + doReturn(registerCloseable(new ZKMetadataStore(mockZooKeeper))).when(proxyService).createLocalMetadataStore(); + doReturn(registerCloseable(new ZKMetadataStore(mockZooKeeperGlobal))).when(proxyService) + .createConfigurationMetadataStore(); proxyService.start(); } diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyIsAHttpProxyTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyIsAHttpProxyTest.java index 246dd9f85e319..90e15ede2f436 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyIsAHttpProxyTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyIsAHttpProxyTest.java @@ -19,24 +19,20 @@ package org.apache.pulsar.proxy.server; import static java.nio.charset.StandardCharsets.UTF_8; - import java.io.IOException; import java.util.Properties; import java.util.Queue; import java.util.concurrent.CompletableFuture; import java.util.concurrent.LinkedBlockingQueue; import java.util.function.BooleanSupplier; - import javax.servlet.AsyncContext; import javax.servlet.ServletException; import javax.servlet.http.HttpServlet; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; - import javax.ws.rs.client.Client; import javax.ws.rs.client.ClientBuilder; import javax.ws.rs.core.Response; - import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; import org.apache.pulsar.broker.authentication.AuthenticationService; import org.apache.pulsar.broker.resources.PulsarResources; @@ -56,7 +52,6 @@ import org.eclipse.jetty.util.ProcessorUtils; import org.glassfish.jersey.client.ClientConfig; import org.glassfish.jersey.logging.LoggingFeature; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testng.Assert; @@ -81,8 +76,8 @@ protected void setup() throws Exception { // Set number of CPU's to two for unit tests for running in resource constrained env. ProcessorUtils.setAvailableProcessors(2); - resource = new PulsarResources(new ZKMetadataStore(mockZooKeeper), - new ZKMetadataStore(mockZooKeeperGlobal)); + resource = new PulsarResources(registerCloseable(new ZKMetadataStore(mockZooKeeper)), + registerCloseable(new ZKMetadataStore(mockZooKeeperGlobal))); backingServer1 = new Server(0); backingServer1.setHandler(newHandler("server1")); backingServer1.start(); @@ -164,6 +159,7 @@ protected void cleanup() throws Exception { backingServer1.stop(); backingServer2.stop(); + backingServer3.stop(); client.close(); } @@ -204,7 +200,7 @@ public void testSingleRedirect() throws Exception { WebServer webServer = new WebServer(proxyConfig, authService); ProxyServiceStarter.addWebServerHandlers(webServer, proxyConfig, null, - new BrokerDiscoveryProvider(proxyConfig, resource)); + registerCloseable(new BrokerDiscoveryProvider(proxyConfig, resource))); webServer.start(); try { Response r = client.target(webServer.getServiceUri()).path("/ui/foobar").request().get(); @@ -233,7 +229,7 @@ public void testMultipleRedirect() throws Exception { WebServer webServer = new WebServer(proxyConfig, authService); ProxyServiceStarter.addWebServerHandlers(webServer, proxyConfig, null, - new BrokerDiscoveryProvider(proxyConfig, resource)); + registerCloseable(new BrokerDiscoveryProvider(proxyConfig, resource))); webServer.start(); try { Response r1 = client.target(webServer.getServiceUri()).path("/server1/foobar").request().get(); @@ -264,7 +260,7 @@ public void testTryingToUseExistingPath() throws Exception { WebServer webServer = new WebServer(proxyConfig, authService); ProxyServiceStarter.addWebServerHandlers(webServer, proxyConfig, null, - new BrokerDiscoveryProvider(proxyConfig, resource)); + registerCloseable(new BrokerDiscoveryProvider(proxyConfig, resource))); } @@ -283,7 +279,7 @@ public void testLongPathInProxyTo() throws Exception { WebServer webServer = new WebServer(proxyConfig, authService); ProxyServiceStarter.addWebServerHandlers(webServer, proxyConfig, null, - new BrokerDiscoveryProvider(proxyConfig, resource)); + registerCloseable(new BrokerDiscoveryProvider(proxyConfig, resource))); webServer.start(); try { Response r = client.target(webServer.getServiceUri()).path("/ui/foobar").request().get(); @@ -310,7 +306,7 @@ public void testProxyToEndsInSlash() throws Exception { WebServer webServer = new WebServer(proxyConfig, authService); ProxyServiceStarter.addWebServerHandlers(webServer, proxyConfig, null, - new BrokerDiscoveryProvider(proxyConfig, resource)); + registerCloseable(new BrokerDiscoveryProvider(proxyConfig, resource))); webServer.start(); try { Response r = client.target(webServer.getServiceUri()).path("/ui/foobar").request().get(); @@ -336,7 +332,7 @@ public void testLongPath() throws Exception { WebServer webServer = new WebServer(proxyConfig, authService); ProxyServiceStarter.addWebServerHandlers(webServer, proxyConfig, null, - new BrokerDiscoveryProvider(proxyConfig, resource)); + registerCloseable(new BrokerDiscoveryProvider(proxyConfig, resource))); webServer.start(); try { Response r = client.target(webServer.getServiceUri()).path("/foo/bar/blah/foobar").request().get(); @@ -366,7 +362,7 @@ public void testLongUri() throws Exception { WebServer webServerMaxUriLen8k = new WebServer(proxyConfig, authService); ProxyServiceStarter.addWebServerHandlers(webServerMaxUriLen8k, proxyConfig, null, - new BrokerDiscoveryProvider(proxyConfig, resource)); + registerCloseable(new BrokerDiscoveryProvider(proxyConfig, resource))); webServerMaxUriLen8k.start(); try { Response r = client.target(webServerMaxUriLen8k.getServiceUri()).path(longUri.toString()).request().get(); @@ -378,7 +374,7 @@ public void testLongUri() throws Exception { proxyConfig.setHttpMaxRequestHeaderSize(12 * 1024); WebServer webServerMaxUriLen12k = new WebServer(proxyConfig, authService); ProxyServiceStarter.addWebServerHandlers(webServerMaxUriLen12k, proxyConfig, null, - new BrokerDiscoveryProvider(proxyConfig, resource)); + registerCloseable(new BrokerDiscoveryProvider(proxyConfig, resource))); webServerMaxUriLen12k.start(); try { Response r = client.target(webServerMaxUriLen12k.getServiceUri()).path(longUri.toString()).request().get(); @@ -402,7 +398,7 @@ public void testPathEndsInSlash() throws Exception { WebServer webServer = new WebServer(proxyConfig, authService); ProxyServiceStarter.addWebServerHandlers(webServer, proxyConfig, null, - new BrokerDiscoveryProvider(proxyConfig, resource)); + registerCloseable(new BrokerDiscoveryProvider(proxyConfig, resource))); webServer.start(); try { Response r = client.target(webServer.getServiceUri()).path("/ui/foobar").request().get(); @@ -434,7 +430,7 @@ public void testStreaming() throws Exception { WebServer webServer = new WebServer(proxyConfig, authService); ProxyServiceStarter.addWebServerHandlers(webServer, proxyConfig, null, - new BrokerDiscoveryProvider(proxyConfig, resource)); + registerCloseable(new BrokerDiscoveryProvider(proxyConfig, resource))); webServer.start(); HttpClient httpClient = new HttpClient(); diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyKeyStoreTlsTestWithAuth.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyKeyStoreTlsTestWithAuth.java index 88e7b269d6eeb..6a9745f05507b 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyKeyStoreTlsTestWithAuth.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyKeyStoreTlsTestWithAuth.java @@ -90,8 +90,9 @@ protected void setup() throws Exception { proxyService = Mockito.spy(new ProxyService(proxyConfig, new AuthenticationService( PulsarConfigurationLoader.convertFrom(proxyConfig)))); - doReturn(new ZKMetadataStore(mockZooKeeper)).when(proxyService).createLocalMetadataStore(); - doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(proxyService).createConfigurationMetadataStore(); + doReturn(registerCloseable(new ZKMetadataStore(mockZooKeeper))).when(proxyService).createLocalMetadataStore(); + doReturn(registerCloseable(new ZKMetadataStore(mockZooKeeperGlobal))).when(proxyService) + .createConfigurationMetadataStore(); proxyService.start(); } diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyKeyStoreTlsTestWithoutAuth.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyKeyStoreTlsTestWithoutAuth.java index 5feef74e3b94b..4ceb85a852492 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyKeyStoreTlsTestWithoutAuth.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyKeyStoreTlsTestWithoutAuth.java @@ -77,8 +77,9 @@ protected void setup() throws Exception { proxyService = Mockito.spy(new ProxyService(proxyConfig, new AuthenticationService( PulsarConfigurationLoader.convertFrom(proxyConfig)))); - doReturn(new ZKMetadataStore(mockZooKeeper)).when(proxyService).createLocalMetadataStore(); - doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(proxyService).createConfigurationMetadataStore(); + doReturn(registerCloseable(new ZKMetadataStore(mockZooKeeper))).when(proxyService).createLocalMetadataStore(); + doReturn(registerCloseable(new ZKMetadataStore(mockZooKeeperGlobal))).when(proxyService) + .createConfigurationMetadataStore(); proxyService.start(); } diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyKeyStoreTlsTransportTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyKeyStoreTlsTransportTest.java index 5c4e40ed65a70..5ee03395b80c8 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyKeyStoreTlsTransportTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyKeyStoreTlsTransportTest.java @@ -89,8 +89,9 @@ protected void setup() throws Exception { proxyService = Mockito.spy(new ProxyService(proxyConfig, new AuthenticationService( PulsarConfigurationLoader.convertFrom(proxyConfig)))); - doReturn(new ZKMetadataStore(mockZooKeeper)).when(proxyService).createLocalMetadataStore(); - doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(proxyService).createConfigurationMetadataStore(); + doReturn(registerCloseable(new ZKMetadataStore(mockZooKeeper))).when(proxyService).createLocalMetadataStore(); + doReturn(registerCloseable(new ZKMetadataStore(mockZooKeeperGlobal))).when(proxyService) + .createConfigurationMetadataStore(); proxyService.start(); } diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyLookupThrottlingTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyLookupThrottlingTest.java index 1b63aa14dfe42..167c3b196465a 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyLookupThrottlingTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyLookupThrottlingTest.java @@ -69,8 +69,9 @@ protected void setup() throws Exception { AuthenticationService authenticationService = new AuthenticationService( PulsarConfigurationLoader.convertFrom(proxyConfig)); proxyService = Mockito.spy(new ProxyService(proxyConfig, authenticationService)); - doReturn(new ZKMetadataStore(mockZooKeeper)).when(proxyService).createLocalMetadataStore(); - doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(proxyService).createConfigurationMetadataStore(); + doReturn(registerCloseable(new ZKMetadataStore(mockZooKeeper))).when(proxyService).createLocalMetadataStore(); + doReturn(registerCloseable(new ZKMetadataStore(mockZooKeeperGlobal))).when(proxyService) + .createConfigurationMetadataStore(); proxyService.start(); } diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyMutualTlsTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyMutualTlsTest.java index ad237c2539700..08066f2e5bf53 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyMutualTlsTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyMutualTlsTest.java @@ -69,8 +69,9 @@ protected void setup() throws Exception { proxyService = Mockito.spy(new ProxyService(proxyConfig, new AuthenticationService( PulsarConfigurationLoader.convertFrom(proxyConfig)))); - doReturn(new ZKMetadataStore(mockZooKeeper)).when(proxyService).createLocalMetadataStore(); - doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(proxyService).createConfigurationMetadataStore(); + doReturn(registerCloseable(new ZKMetadataStore(mockZooKeeper))).when(proxyService).createLocalMetadataStore(); + doReturn(registerCloseable(new ZKMetadataStore(mockZooKeeperGlobal))).when(proxyService) + .createConfigurationMetadataStore(); proxyService.start(); } diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyParserTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyParserTest.java index 82cd702aa7f0a..0d93185f5e899 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyParserTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyParserTest.java @@ -16,6 +16,7 @@ * specific language governing permissions and limitations * under the License. */ + package org.apache.pulsar.proxy.server; import static com.google.common.base.Preconditions.checkArgument; @@ -23,14 +24,10 @@ import static java.util.Objects.requireNonNull; import static org.mockito.Mockito.doReturn; import static org.testng.Assert.assertEquals; - import io.netty.channel.EventLoopGroup; import io.netty.util.concurrent.DefaultThreadFactory; - import java.util.Optional; -import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; - import lombok.Cleanup; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; import org.apache.pulsar.broker.authentication.AuthenticationService; @@ -78,12 +75,13 @@ protected void setup() throws Exception { proxyConfig.setProxyLogLevel(Optional.ofNullable(2)); proxyService = Mockito.spy(new ProxyService(proxyConfig, new AuthenticationService( - PulsarConfigurationLoader.convertFrom(proxyConfig)))); - doReturn(new ZKMetadataStore(mockZooKeeper)).when(proxyService).createLocalMetadataStore(); - doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(proxyService).createConfigurationMetadataStore(); + PulsarConfigurationLoader.convertFrom(proxyConfig)))); + doReturn(registerCloseable(new ZKMetadataStore(mockZooKeeper))).when(proxyService).createLocalMetadataStore(); + doReturn(registerCloseable(new ZKMetadataStore(mockZooKeeperGlobal))).when(proxyService) + .createConfigurationMetadataStore(); Optional proxyLogLevel = Optional.of(2); - assertEquals( proxyLogLevel, proxyService.getConfiguration().getProxyLogLevel()); + assertEquals(proxyLogLevel, proxyService.getConfiguration().getProxyLogLevel()); proxyService.start(); } @@ -100,8 +98,9 @@ public void testProducer() throws Exception { @Cleanup PulsarClient client = PulsarClient.builder().serviceUrl(proxyService.getServiceUrl()) .build(); - Producer producer = client.newProducer(Schema.BYTES).topic("persistent://sample/test/local/producer-topic") - .create(); + Producer producer = + client.newProducer(Schema.BYTES).topic("persistent://sample/test/local/producer-topic") + .create(); for (int i = 0; i < 10; i++) { producer.send("test".getBytes()); @@ -114,10 +113,10 @@ public void testProducerConsumer() throws Exception { PulsarClient client = PulsarClient.builder().serviceUrl(proxyService.getServiceUrl()) .build(); Producer producer = client.newProducer(Schema.BYTES) - .topic("persistent://sample/test/local/producer-consumer-topic") - .enableBatching(false) - .messageRoutingMode(MessageRoutingMode.SinglePartition) - .create(); + .topic("persistent://sample/test/local/producer-consumer-topic") + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); // Create a consumer directly attached to broker Consumer consumer = pulsarClient.newConsumer() @@ -149,9 +148,9 @@ public void testPartitions() throws Exception { admin.topics().createPartitionedTopic("persistent://sample/test/local/partitioned-topic", 2); Producer producer = client.newProducer(Schema.BYTES) - .topic("persistent://sample/test/local/partitioned-topic") - .enableBatching(false) - .messageRoutingMode(MessageRoutingMode.RoundRobinPartition).create(); + .topic("persistent://sample/test/local/partitioned-topic") + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.RoundRobinPartition).create(); // Create a consumer directly attached to broker Consumer consumer = pulsarClient.newConsumer().topic("persistent://sample/test/local/partitioned-topic") @@ -171,18 +170,18 @@ public void testPartitions() throws Exception { public void testRegexSubscription() throws Exception { @Cleanup PulsarClient client = PulsarClient.builder().serviceUrl(proxyService.getServiceUrl()) - .connectionsPerBroker(5).ioThreads(5).build(); + .connectionsPerBroker(5).ioThreads(5).build(); // create two topics by subscribing to a topic and closing it try (Consumer ignored = client.newConsumer() - .topic("persistent://sample/test/local/topic1") - .subscriptionName("ignored") - .subscribe()) { + .topic("persistent://sample/test/local/topic1") + .subscriptionName("ignored") + .subscribe()) { } try (Consumer ignored = client.newConsumer() - .topic("persistent://sample/test/local/topic2") - .subscriptionName("ignored") - .subscribe()) { + .topic("persistent://sample/test/local/topic2") + .subscriptionName("ignored") + .subscribe()) { } String subName = "regex-sub-proxy-parser-test-" + System.currentTimeMillis(); @@ -190,16 +189,16 @@ public void testRegexSubscription() throws Exception { String regexSubscriptionPattern = "persistent://sample/test/local/topic.*"; log.info("Regex subscribe to topics {}", regexSubscriptionPattern); try (Consumer consumer = client.newConsumer() - .topicsPattern(regexSubscriptionPattern) - .subscriptionName(subName) - .subscribe()) { + .topicsPattern(regexSubscriptionPattern) + .subscriptionName(subName) + .subscribe()) { log.info("Successfully subscribe to topics using regex {}", regexSubscriptionPattern); final int numMessages = 20; try (Producer producer = client.newProducer(Schema.BYTES) - .topic("persistent://sample/test/local/topic1") - .create()) { + .topic("persistent://sample/test/local/topic1") + .create()) { for (int i = 0; i < numMessages; i++) { producer.send(("message-" + i).getBytes(UTF_8)); } @@ -219,8 +218,12 @@ public void testProtocolVersionAdvertisement() throws Exception { ClientConfigurationData conf = new ClientConfigurationData(); conf.setServiceUrl(proxyService.getServiceUrl()); + @Cleanup("shutdownNow") + EventLoopGroup eventLoopGroup = EventLoopUtil.newEventLoopGroup(conf.getNumIoThreads(), false, + new DefaultThreadFactory("pulsar-client-io", Thread.currentThread().isDaemon())); @Cleanup - PulsarClient client = getClientActiveConsumerChangeNotSupported(conf); + PulsarClient client = getClientActiveConsumerChangeNotSupported(conf, + eventLoopGroup); Producer producer = client.newProducer().topic(topic).create(); Consumer consumer = client.newConsumer().topic(topic).subscriptionName(sub) @@ -243,10 +246,9 @@ public void testProtocolVersionAdvertisement() throws Exception { ((PulsarClientImpl) client).getCnxPool().close(); } - private static PulsarClient getClientActiveConsumerChangeNotSupported(ClientConfigurationData conf) + private static PulsarClient getClientActiveConsumerChangeNotSupported(ClientConfigurationData conf, + final EventLoopGroup eventLoopGroup) throws Exception { - ThreadFactory threadFactory = new DefaultThreadFactory("pulsar-client-io", Thread.currentThread().isDaemon()); - EventLoopGroup eventLoopGroup = EventLoopUtil.newEventLoopGroup(conf.getNumIoThreads(), false, threadFactory); ConnectionPool cnxPool = new ConnectionPool(conf, eventLoopGroup, () -> { return new ClientCnx(conf, eventLoopGroup, ProtocolVersion.v11_VALUE) { diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyPrometheusMetricsTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyPrometheusMetricsTest.java index 6948996ad4636..b692987d17af6 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyPrometheusMetricsTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyPrometheusMetricsTest.java @@ -74,8 +74,9 @@ protected void setup() throws Exception { proxyService = Mockito.spy(new ProxyService(proxyConfig, new AuthenticationService(PulsarConfigurationLoader.convertFrom(proxyConfig)))); - doReturn(new ZKMetadataStore(mockZooKeeper)).when(proxyService).createLocalMetadataStore(); - doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(proxyService).createConfigurationMetadataStore(); + doReturn(registerCloseable(new ZKMetadataStore(mockZooKeeper))).when(proxyService).createLocalMetadataStore(); + doReturn(registerCloseable(new ZKMetadataStore(mockZooKeeperGlobal))).when(proxyService) + .createConfigurationMetadataStore(); proxyService.start(); diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceStarterTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceStarterTest.java index def58be6df372..925e8192e145a 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceStarterTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceStarterTest.java @@ -95,7 +95,9 @@ public void testProducer() throws Exception { @Test public void testProduceAndConsumeMessageWithWebsocket() throws Exception { + @Cleanup("stop") HttpClient producerClient = new HttpClient(); + @Cleanup("stop") WebSocketClient producerWebSocketClient = new WebSocketClient(producerClient); producerWebSocketClient.start(); MyWebSocket producerSocket = new MyWebSocket(); @@ -106,7 +108,9 @@ public void testProduceAndConsumeMessageWithWebsocket() throws Exception { produceRequest.setContext("context"); produceRequest.setPayload(Base64.getEncoder().encodeToString("my payload".getBytes())); + @Cleanup("stop") HttpClient consumerClient = new HttpClient(); + @Cleanup("stop") WebSocketClient consumerWebSocketClient = new WebSocketClient(consumerClient); consumerWebSocketClient.start(); MyWebSocket consumerSocket = new MyWebSocket(); diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceTlsStarterTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceTlsStarterTest.java index 6247c2a66e874..b21162577a25e 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceTlsStarterTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceTlsStarterTest.java @@ -106,7 +106,9 @@ public void testProducer() throws Exception { @Test public void testProduceAndConsumeMessageWithWebsocket() throws Exception { + @Cleanup("stop") HttpClient producerClient = new HttpClient(); + @Cleanup("stop") WebSocketClient producerWebSocketClient = new WebSocketClient(producerClient); producerWebSocketClient.start(); MyWebSocket producerSocket = new MyWebSocket(); @@ -117,7 +119,9 @@ public void testProduceAndConsumeMessageWithWebsocket() throws Exception { produceRequest.setContext("context"); produceRequest.setPayload(Base64.getEncoder().encodeToString("my payload".getBytes())); + @Cleanup("stop") HttpClient consumerClient = new HttpClient(); + @Cleanup("stop") WebSocketClient consumerWebSocketClient = new WebSocketClient(consumerClient); consumerWebSocketClient.start(); MyWebSocket consumerSocket = new MyWebSocket(); diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyStatsTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyStatsTest.java index 140af88aae71b..a2692f96dcce0 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyStatsTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyStatsTest.java @@ -77,8 +77,9 @@ protected void setup() throws Exception { proxyService = Mockito.spy(new ProxyService(proxyConfig, new AuthenticationService(PulsarConfigurationLoader.convertFrom(proxyConfig)))); - doReturn(new ZKMetadataStore(mockZooKeeper)).when(proxyService).createLocalMetadataStore(); - doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(proxyService).createConfigurationMetadataStore(); + doReturn(registerCloseable(new ZKMetadataStore(mockZooKeeper))).when(proxyService).createLocalMetadataStore(); + doReturn(registerCloseable(new ZKMetadataStore(mockZooKeeperGlobal))).when(proxyService) + .createConfigurationMetadataStore(); Optional proxyLogLevel = Optional.of(2); assertEquals(proxyLogLevel, proxyService.getConfiguration().getProxyLogLevel()); diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyStuckConnectionTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyStuckConnectionTest.java index 97279659af626..79ea7c5d6a31c 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyStuckConnectionTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyStuckConnectionTest.java @@ -93,8 +93,9 @@ protected LookupProxyHandler newLookupProxyHandler(ProxyConnection proxyConnecti return new TestLookupProxyHandler(this, proxyConnection); } }); - doReturn(new ZKMetadataStore(mockZooKeeper)).when(proxyService).createLocalMetadataStore(); - doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(proxyService).createConfigurationMetadataStore(); + doReturn(registerCloseable(new ZKMetadataStore(mockZooKeeper))).when(proxyService).createLocalMetadataStore(); + doReturn(registerCloseable(new ZKMetadataStore(mockZooKeeperGlobal))).when(proxyService) + .createConfigurationMetadataStore(); proxyService.start(); } diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java index e799e2e948a4a..51f7afee09060 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java @@ -97,8 +97,9 @@ protected void setup() throws Exception { proxyService = Mockito.spy(new ProxyService(proxyConfig, new AuthenticationService( PulsarConfigurationLoader.convertFrom(proxyConfig)))); - doReturn(new ZKMetadataStore(mockZooKeeper)).when(proxyService).createLocalMetadataStore(); - doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(proxyService).createConfigurationMetadataStore(); + doReturn(registerCloseable(new ZKMetadataStore(mockZooKeeper))).when(proxyService).createLocalMetadataStore(); + doReturn(registerCloseable(new ZKMetadataStore(mockZooKeeperGlobal))).when(proxyService) + .createConfigurationMetadataStore(); proxyService.start(); } diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTlsTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTlsTest.java index 64b0cd6b1a610..a1b27abece4d1 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTlsTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTlsTest.java @@ -64,8 +64,9 @@ protected void setup() throws Exception { proxyService = Mockito.spy(new ProxyService(proxyConfig, new AuthenticationService( PulsarConfigurationLoader.convertFrom(proxyConfig)))); - doReturn(new ZKMetadataStore(mockZooKeeper)).when(proxyService).createLocalMetadataStore(); - doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(proxyService).createConfigurationMetadataStore(); + doReturn(registerCloseable(new ZKMetadataStore(mockZooKeeper))).when(proxyService).createLocalMetadataStore(); + doReturn(registerCloseable(new ZKMetadataStore(mockZooKeeperGlobal))).when(proxyService) + .createConfigurationMetadataStore(); proxyService.start(); } diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTlsTestWithAuth.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTlsTestWithAuth.java index 0f1fa74a20916..f6dff8fc3ea49 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTlsTestWithAuth.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTlsTestWithAuth.java @@ -76,8 +76,9 @@ protected void setup() throws Exception { proxyService = Mockito.spy(new ProxyService(proxyConfig, new AuthenticationService( PulsarConfigurationLoader.convertFrom(proxyConfig)))); - doReturn(new ZKMetadataStore(mockZooKeeper)).when(proxyService).createLocalMetadataStore(); - doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(proxyService).createConfigurationMetadataStore(); + doReturn(registerCloseable(new ZKMetadataStore(mockZooKeeper))).when(proxyService).createLocalMetadataStore(); + doReturn(registerCloseable(new ZKMetadataStore(mockZooKeeperGlobal))).when(proxyService) + .createConfigurationMetadataStore(); proxyService.start(); } diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithJwtAuthorizationTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithJwtAuthorizationTest.java index 37465b21322bc..f3302b637a144 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithJwtAuthorizationTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithJwtAuthorizationTest.java @@ -409,8 +409,8 @@ public void testProxyAuthorizationWithPrefixSubscriptionAuthMode() throws Except @Test void testGetStatus() throws Exception { log.info("-- Starting {} test --", methodName); - final PulsarResources resource = new PulsarResources(new ZKMetadataStore(mockZooKeeper), - new ZKMetadataStore(mockZooKeeperGlobal)); + final PulsarResources resource = new PulsarResources(registerCloseable(new ZKMetadataStore(mockZooKeeper)), + registerCloseable(new ZKMetadataStore(mockZooKeeperGlobal))); final AuthenticationService authService = new AuthenticationService( PulsarConfigurationLoader.convertFrom(proxyConfig)); final WebServer webServer = new WebServer(proxyConfig, authService); @@ -433,8 +433,8 @@ void testGetStatus() throws Exception { void testGetMetrics() throws Exception { log.info("-- Starting {} test --", methodName); startProxy(); - PulsarResources resource = new PulsarResources(new ZKMetadataStore(mockZooKeeper), - new ZKMetadataStore(mockZooKeeperGlobal)); + PulsarResources resource = new PulsarResources(registerCloseable(new ZKMetadataStore(mockZooKeeper)), + registerCloseable(new ZKMetadataStore(mockZooKeeperGlobal))); AuthenticationService authService = new AuthenticationService( PulsarConfigurationLoader.convertFrom(proxyConfig)); proxyConfig.setAuthenticateMetricsEndpoint(false); diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/SuperUserAuthedAdminProxyHandlerTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/SuperUserAuthedAdminProxyHandlerTest.java index d3291c8fb910d..a44e2a85efa61 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/SuperUserAuthedAdminProxyHandlerTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/SuperUserAuthedAdminProxyHandlerTest.java @@ -93,11 +93,11 @@ protected void setup() throws Exception { proxyConfig.setBrokerClientTrustCertsFilePath(CA_CERT_FILE_PATH); proxyConfig.setAuthenticationProviders(ImmutableSet.of(AuthenticationProviderTls.class.getName())); - resource = new PulsarResources(new ZKMetadataStore(mockZooKeeper), - new ZKMetadataStore(mockZooKeeperGlobal)); + resource = new PulsarResources(registerCloseable(new ZKMetadataStore(mockZooKeeper)), + registerCloseable(new ZKMetadataStore(mockZooKeeperGlobal))); webServer = new WebServer(proxyConfig, new AuthenticationService( PulsarConfigurationLoader.convertFrom(proxyConfig))); - discoveryProvider = spy(new BrokerDiscoveryProvider(proxyConfig, resource)); + discoveryProvider = spy(registerCloseable(new BrokerDiscoveryProvider(proxyConfig, resource))); LoadManagerReport report = new LoadReport(brokerUrl.toString(), brokerUrlTls.toString(), null, null); doReturn(report).when(discoveryProvider).nextBroker(); diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/UnauthedAdminProxyHandlerTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/UnauthedAdminProxyHandlerTest.java index 14cd9f41d9986..d239815ae81e8 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/UnauthedAdminProxyHandlerTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/UnauthedAdminProxyHandlerTest.java @@ -79,9 +79,9 @@ protected void setup() throws Exception { webServer = new WebServer(proxyConfig, new AuthenticationService( PulsarConfigurationLoader.convertFrom(proxyConfig))); - resource = new PulsarResources(new ZKMetadataStore(mockZooKeeper), - new ZKMetadataStore(mockZooKeeperGlobal)); - discoveryProvider = spy(new BrokerDiscoveryProvider(proxyConfig, resource)); + resource = new PulsarResources(registerCloseable(new ZKMetadataStore(mockZooKeeper)), + registerCloseable(new ZKMetadataStore(mockZooKeeperGlobal))); + discoveryProvider = spy(registerCloseable(new BrokerDiscoveryProvider(proxyConfig, resource))); adminProxyHandler = new AdminProxyWrapper(proxyConfig, discoveryProvider); ServletHolder servletHolder = new ServletHolder(adminProxyHandler); webServer.addServlet("/admin", servletHolder);