From 2cc34afc0320e66a996b9517d6729a495563fc54 Mon Sep 17 00:00:00 2001 From: massakam Date: Tue, 23 Jul 2019 14:40:48 +0900 Subject: [PATCH] Process requests asynchronously on some REST APIs (2) (#4778) Master Issue: #4756 ### Motivation This is a continuation of https://github.com/apache/pulsar/pull/4765. ### Modifications Added async rest handlers to the following APIs: ``` DELETE /admin/namespaces/{tenant}/{cluster}/{namespace} PUT /admin/namespaces/{tenant}/{cluster}/{namespace}/unload POST /admin/namespaces/{tenant}/{cluster}/{namespace}/clearBacklog POST /admin/namespaces/{tenant}/{cluster}/{namespace}/clearBacklog/{subscription} POST /admin/namespaces/{tenant}/{cluster}/{namespace}/unsubscribe/{subscription} DELETE /admin/v2/namespaces/{tenant}/{namespace} PUT /admin/v2/namespaces/{tenant}/{namespace}/unload POST /admin/v2/namespaces/{tenant}/{namespace}/clearBacklog POST /admin/v2/namespaces/{tenant}/{namespace}/clearBacklog/{subscription} POST /admin/v2/namespaces/{tenant}/{namespace}/unsubscribe/{subscription} ``` --- .../broker/admin/impl/NamespacesBase.java | 260 +++++++++++------- .../pulsar/broker/admin/v1/Namespaces.java | 73 +++-- .../broker/admin/v1/NonPersistentTopics.java | 4 + .../broker/admin/v1/PersistentTopics.java | 9 +- .../pulsar/broker/admin/v2/Namespaces.java | 72 +++-- .../broker/admin/v2/NonPersistentTopics.java | 4 + .../broker/admin/v2/PersistentTopics.java | 9 +- .../apache/pulsar/broker/admin/AdminTest.java | 10 +- .../pulsar/broker/admin/NamespacesTest.java | 139 ++++++---- .../pulsar/client/admin/Namespaces.java | 62 ++++- .../client/admin/internal/NamespacesImpl.java | 94 +++++-- 11 files changed, 511 insertions(+), 225 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java index c6b7806556e13..fac78a0858f2d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java @@ -47,6 +47,7 @@ import java.util.stream.Collectors; import javax.ws.rs.WebApplicationException; +import javax.ws.rs.container.AsyncResponse; import javax.ws.rs.core.Response; import javax.ws.rs.core.Response.Status; import javax.ws.rs.core.UriBuilder; @@ -129,7 +130,7 @@ protected void internalCreateNamespace(Policies policies) { } @SuppressWarnings("deprecation") - protected void internalDeleteNamespace(boolean authoritative) { + protected void internalDeleteNamespace(AsyncResponse asyncResponse, boolean authoritative) { validateAdminAccessForTenant(namespaceName.getTenant()); validatePoliciesReadOnlyAccess(); @@ -179,9 +180,11 @@ protected void internalDeleteNamespace(boolean authoritative) { } } } catch (WebApplicationException wae) { - throw wae; + asyncResponse.resume(wae); + return; } catch (Exception e) { - throw new RestException(e); + asyncResponse.resume(new RestException(e)); + return; } boolean isEmpty; @@ -190,12 +193,16 @@ protected void internalDeleteNamespace(boolean authoritative) { && getPartitionedTopicList(TopicDomain.persistent).isEmpty() && getPartitionedTopicList(TopicDomain.non_persistent).isEmpty(); } catch (Exception e) { - throw new RestException(e); + asyncResponse.resume(new RestException(e)); + return; } if (!isEmpty) { - log.debug("Found topics on namespace {}", namespaceName); - throw new RestException(Status.CONFLICT, "Cannot delete non empty namespace"); + if (log.isDebugEnabled()) { + log.debug("Found topics on namespace {}", namespaceName); + } + asyncResponse.resume(new RestException(Status.CONFLICT, "Cannot delete non empty namespace")); + return; } // set the policies to deleted so that somebody else cannot acquire this namespace @@ -206,35 +213,58 @@ && getPartitionedTopicList(TopicDomain.persistent).isEmpty() policiesCache().invalidate(path(POLICIES, namespaceName.toString())); } catch (Exception e) { log.error("[{}] Failed to delete namespace on global ZK {}", clientAppId(), namespaceName, e); - throw new RestException(e); + asyncResponse.resume(new RestException(e)); + return; } // remove from owned namespace map and ephemeral node from ZK + final List> futures = Lists.newArrayList(); try { NamespaceBundles bundles = pulsar().getNamespaceService().getNamespaceBundleFactory() .getBundles(namespaceName); for (NamespaceBundle bundle : bundles.getBundles()) { // check if the bundle is owned by any broker, if not then we do not need to delete the bundle if (pulsar().getNamespaceService().getOwner(bundle).isPresent()) { - pulsar().getAdminClient().namespaces().deleteNamespaceBundle(namespaceName.toString(), - bundle.getBundleRange()); + futures.add(pulsar().getAdminClient().namespaces() + .deleteNamespaceBundleAsync(namespaceName.toString(), bundle.getBundleRange())); } } - - // we have successfully removed all the ownership for the namespace, the policies znode can be deleted now - final String globalZkPolicyPath = path(POLICIES, namespaceName.toString()); - final String lcaolZkPolicyPath = joinPath(LOCAL_POLICIES_ROOT, namespaceName.toString()); - globalZk().delete(globalZkPolicyPath, -1); - localZk().delete(lcaolZkPolicyPath, -1); - policiesCache().invalidate(globalZkPolicyPath); - localCacheService().policiesCache().invalidate(lcaolZkPolicyPath); - } catch (PulsarAdminException cae) { - throw new RestException(cae); } catch (Exception e) { log.error("[{}] Failed to remove owned namespace {}", clientAppId(), namespaceName, e); - // avoid throwing exception in case of the second failure + asyncResponse.resume(new RestException(e)); + return; } + FutureUtil.waitForAll(futures).handle((result, exception) -> { + if (exception != null) { + if (exception.getCause() instanceof PulsarAdminException) { + asyncResponse.resume(new RestException((PulsarAdminException) exception.getCause())); + return null; + } else { + log.error("[{}] Failed to remove owned namespace {}", clientAppId(), namespaceName, exception); + asyncResponse.resume(new RestException(exception.getCause())); + return null; + } + } + + try { + // we have successfully removed all the ownership for the namespace, the policies znode can be deleted + // now + final String globalZkPolicyPath = path(POLICIES, namespaceName.toString()); + final String lcaolZkPolicyPath = joinPath(LOCAL_POLICIES_ROOT, namespaceName.toString()); + globalZk().delete(globalZkPolicyPath, -1); + localZk().delete(lcaolZkPolicyPath, -1); + policiesCache().invalidate(globalZkPolicyPath); + localCacheService().policiesCache().invalidate(lcaolZkPolicyPath); + } catch (Exception e) { + log.error("[{}] Failed to remove owned namespace {} from ZK", clientAppId(), namespaceName, e); + asyncResponse.resume(new RestException(e)); + return null; + } + + asyncResponse.resume(Response.ok().build()); + return null; + }); } @SuppressWarnings("deprecation") @@ -274,7 +304,9 @@ protected void internalDeleteNamespaceBundle(String bundleRange, boolean authori } URI redirect = UriBuilder.fromUri(uri.getRequestUri()).host(replClusterUrl.getHost()) .port(replClusterUrl.getPort()).replaceQueryParam("authoritative", false).build(); - log.debug("[{}] Redirecting the rest call to {}: cluster={}", clientAppId(), redirect, replCluster); + if(log.isDebugEnabled()) { + log.debug("[{}] Redirecting the rest call to {}: cluster={}", clientAppId(), redirect, replCluster); + } throw new WebApplicationException(Response.temporaryRedirect(redirect).build()); } } @@ -554,7 +586,7 @@ protected void internalModifyDeduplication(boolean enableDeduplication) { } @SuppressWarnings("deprecation") - protected void internalUnloadNamespace() { + protected void internalUnloadNamespace(AsyncResponse asyncResponse) { log.info("[{}] Unloading namespace {}", clientAppId(), namespaceName); validateSuperUserAccess(); @@ -569,18 +601,35 @@ protected void internalUnloadNamespace() { Policies policies = getNamespacePolicies(namespaceName); + final List> futures = Lists.newArrayList(); List boundaries = policies.bundles.getBoundaries(); for (int i = 0; i < boundaries.size() - 1; i++) { String bundle = String.format("%s_%s", boundaries.get(i), boundaries.get(i + 1)); try { - pulsar().getAdminClient().namespaces().unloadNamespaceBundle(namespaceName.toString(), bundle); - } catch (PulsarServerException | PulsarAdminException e) { - log.error(String.format("[%s] Failed to unload namespace %s", clientAppId(), namespaceName), e); - throw new RestException(e); + futures.add(pulsar().getAdminClient().namespaces().unloadNamespaceBundleAsync(namespaceName.toString(), + bundle)); + } catch (PulsarServerException e) { + log.error("[{}] Failed to unload namespace {}", clientAppId(), namespaceName, e); + asyncResponse.resume(new RestException(e)); + return; } } - log.info("[{}] Successfully unloaded all the bundles in namespace {}", clientAppId(), namespaceName); + FutureUtil.waitForAll(futures).handle((result, exception) -> { + if (exception != null) { + log.error("[{}] Failed to unload namespace {}", clientAppId(), namespaceName, exception); + if (exception.getCause() instanceof PulsarAdminException) { + asyncResponse.resume(new RestException((PulsarAdminException) exception.getCause())); + return null; + } else { + asyncResponse.resume(new RestException(exception.getCause())); + return null; + } + } + log.info("[{}] Successfully unloaded all the bundles in namespace {}", clientAppId(), namespaceName); + asyncResponse.resume(Response.ok().build()); + return null; + }); } @@ -1114,41 +1163,45 @@ protected PersistencePolicies internalGetPersistence() { } } - protected void internalClearNamespaceBacklog(boolean authoritative) { + protected void internalClearNamespaceBacklog(AsyncResponse asyncResponse, boolean authoritative) { validateAdminAccessForTenant(namespaceName.getTenant()); + final List> futures = Lists.newArrayList(); try { NamespaceBundles bundles = pulsar().getNamespaceService().getNamespaceBundleFactory() .getBundles(namespaceName); - Exception exception = null; for (NamespaceBundle nsBundle : bundles.getBundles()) { - try { - // check if the bundle is owned by any broker, if not then there is no backlog on this bundle to - // clear - if (pulsar().getNamespaceService().getOwner(nsBundle).isPresent()) { - // TODO: make this admin call asynchronous - pulsar().getAdminClient().namespaces().clearNamespaceBundleBacklog(namespaceName.toString(), - nsBundle.getBundleRange()); - } - } catch (Exception e) { - if (exception == null) { - exception = e; - } + // check if the bundle is owned by any broker, if not then there is no backlog on this bundle to clear + if (pulsar().getNamespaceService().getOwner(nsBundle).isPresent()) { + futures.add(pulsar().getAdminClient().namespaces() + .clearNamespaceBundleBacklogAsync(namespaceName.toString(), nsBundle.getBundleRange())); } } + } catch (WebApplicationException wae) { + asyncResponse.resume(wae); + return; + } catch (Exception e) { + asyncResponse.resume(new RestException(e)); + return; + } + + FutureUtil.waitForAll(futures).handle((result, exception) -> { if (exception != null) { - if (exception instanceof PulsarAdminException) { - throw new RestException((PulsarAdminException) exception); + log.warn("[{}] Failed to clear backlog on the bundles for namespace {}: {}", clientAppId(), + namespaceName, exception.getCause().getMessage()); + if (exception.getCause() instanceof PulsarAdminException) { + asyncResponse.resume(new RestException((PulsarAdminException) exception.getCause())); + return null; } else { - throw new RestException(exception.getCause()); + asyncResponse.resume(new RestException(exception.getCause())); + return null; } } - } catch (WebApplicationException wae) { - throw wae; - } catch (Exception e) { - throw new RestException(e); - } - log.info("[{}] Successfully cleared backlog on all the bundles for namespace {}", clientAppId(), namespaceName); + log.info("[{}] Successfully cleared backlog on all the bundles for namespace {}", clientAppId(), + namespaceName); + asyncResponse.resume(Response.ok().build()); + return null; + }); } @SuppressWarnings("deprecation") @@ -1172,42 +1225,46 @@ protected void internalClearNamespaceBundleBacklog(String bundleRange, boolean a bundleRange); } - protected void internalClearNamespaceBacklogForSubscription(String subscription, boolean authoritative) { + protected void internalClearNamespaceBacklogForSubscription(AsyncResponse asyncResponse, String subscription, + boolean authoritative) { validateAdminAccessForTenant(namespaceName.getTenant()); + final List> futures = Lists.newArrayList(); try { NamespaceBundles bundles = pulsar().getNamespaceService().getNamespaceBundleFactory() .getBundles(namespaceName); - Exception exception = null; for (NamespaceBundle nsBundle : bundles.getBundles()) { - try { - // check if the bundle is owned by any broker, if not then there is no backlog on this bundle to - // clear - if (pulsar().getNamespaceService().getOwner(nsBundle).isPresent()) { - // TODO: make this admin call asynchronous - pulsar().getAdminClient().namespaces().clearNamespaceBundleBacklogForSubscription( - namespaceName.toString(), nsBundle.getBundleRange(), subscription); - } - } catch (Exception e) { - if (exception == null) { - exception = e; - } + // check if the bundle is owned by any broker, if not then there is no backlog on this bundle to clear + if (pulsar().getNamespaceService().getOwner(nsBundle).isPresent()) { + futures.add(pulsar().getAdminClient().namespaces().clearNamespaceBundleBacklogForSubscriptionAsync( + namespaceName.toString(), nsBundle.getBundleRange(), subscription)); } } + } catch (WebApplicationException wae) { + asyncResponse.resume(wae); + return; + } catch (Exception e) { + asyncResponse.resume(new RestException(e)); + return; + } + + FutureUtil.waitForAll(futures).handle((result, exception) -> { if (exception != null) { - if (exception instanceof PulsarAdminException) { - throw new RestException((PulsarAdminException) exception); + log.warn("[{}] Failed to clear backlog for subscription {} on the bundles for namespace {}: {}", + clientAppId(), subscription, namespaceName, exception.getCause().getMessage()); + if (exception.getCause() instanceof PulsarAdminException) { + asyncResponse.resume(new RestException((PulsarAdminException) exception.getCause())); + return null; } else { - throw new RestException(exception.getCause()); + asyncResponse.resume(new RestException(exception.getCause())); + return null; } } - } catch (WebApplicationException wae) { - throw wae; - } catch (Exception e) { - throw new RestException(e); - } - log.info("[{}] Successfully cleared backlog for subscription {} on all the bundles for namespace {}", - clientAppId(), subscription, namespaceName); + log.info("[{}] Successfully cleared backlog for subscription {} on all the bundles for namespace {}", + clientAppId(), subscription, namespaceName); + asyncResponse.resume(Response.ok().build()); + return null; + }); } @SuppressWarnings("deprecation") @@ -1232,41 +1289,46 @@ protected void internalClearNamespaceBundleBacklogForSubscription(String subscri subscription, namespaceName, bundleRange); } - protected void internalUnsubscribeNamespace(String subscription, boolean authoritative) { + protected void internalUnsubscribeNamespace(AsyncResponse asyncResponse, String subscription, + boolean authoritative) { validateAdminAccessForTenant(namespaceName.getTenant()); + final List> futures = Lists.newArrayList(); try { NamespaceBundles bundles = pulsar().getNamespaceService().getNamespaceBundleFactory() .getBundles(namespaceName); - Exception exception = null; for (NamespaceBundle nsBundle : bundles.getBundles()) { - try { - // check if the bundle is owned by any broker, if not then there are no subscriptions - if (pulsar().getNamespaceService().getOwner(nsBundle).isPresent()) { - // TODO: make this admin call asynchronous - pulsar().getAdminClient().namespaces().unsubscribeNamespaceBundle(namespaceName.toString(), - nsBundle.getBundleRange(), subscription); - } - } catch (Exception e) { - if (exception == null) { - exception = e; - } + // check if the bundle is owned by any broker, if not then there are no subscriptions + if (pulsar().getNamespaceService().getOwner(nsBundle).isPresent()) { + futures.add(pulsar().getAdminClient().namespaces().unsubscribeNamespaceBundleAsync( + namespaceName.toString(), nsBundle.getBundleRange(), subscription)); } } + } catch (WebApplicationException wae) { + asyncResponse.resume(wae); + return; + } catch (Exception e) { + asyncResponse.resume(new RestException(e)); + return; + } + + FutureUtil.waitForAll(futures).handle((result, exception) -> { if (exception != null) { - if (exception instanceof PulsarAdminException) { - throw new RestException((PulsarAdminException) exception); + log.warn("[{}] Failed to unsubscribe {} on the bundles for namespace {}: {}", clientAppId(), + subscription, namespaceName, exception.getCause().getMessage()); + if (exception.getCause() instanceof PulsarAdminException) { + asyncResponse.resume(new RestException((PulsarAdminException) exception.getCause())); + return null; } else { - throw new RestException(exception.getCause()); + asyncResponse.resume(new RestException(exception.getCause())); + return null; } } - } catch (WebApplicationException wae) { - throw wae; - } catch (Exception e) { - throw new RestException(e); - } - log.info("[{}] Successfully unsubscribed {} on all the bundles for namespace {}", clientAppId(), subscription, - namespaceName); + log.info("[{}] Successfully unsubscribed {} on all the bundles for namespace {}", clientAppId(), + subscription, namespaceName); + asyncResponse.resume(Response.ok().build()); + return null; + }); } @SuppressWarnings("deprecation") @@ -1619,7 +1681,9 @@ protected BundlesData validateBundlesData(BundlesData initialBundles) { partitions.add(String.format("0x%08x", partBoundary)); } if (partitions.size() != initialBundles.getBoundaries().size()) { - log.debug("Input bundles included repeated partition points. Ignored."); + if (log.isDebugEnabled()) { + log.debug("Input bundles included repeated partition points. Ignored."); + } } try { NamespaceBundleFactory.validateFullRange(partitions); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java index 0fec21dc5a744..a47bc23168cfa 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java @@ -51,6 +51,9 @@ import javax.ws.rs.PathParam; import javax.ws.rs.Produces; import javax.ws.rs.QueryParam; +import javax.ws.rs.WebApplicationException; +import javax.ws.rs.container.AsyncResponse; +import javax.ws.rs.container.Suspended; import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response.Status; import java.util.List; @@ -178,11 +181,17 @@ public void createNamespace(@PathParam("property") String property, @PathParam(" @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Property or cluster or namespace doesn't exist"), @ApiResponse(code = 409, message = "Namespace is not empty") }) - public void deleteNamespace(@PathParam("property") String property, @PathParam("cluster") String cluster, - @PathParam("namespace") String namespace, + public void deleteNamespace(@Suspended final AsyncResponse asyncResponse, @PathParam("property") String property, + @PathParam("cluster") String cluster, @PathParam("namespace") String namespace, @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { - validateNamespaceName(property, cluster, namespace); - internalDeleteNamespace(authoritative); + try { + validateNamespaceName(property, cluster, namespace); + internalDeleteNamespace(asyncResponse, authoritative); + } catch (WebApplicationException wae) { + asyncResponse.resume(wae); + } catch (Exception e) { + asyncResponse.resume(new RestException(e)); + } } @DELETE @@ -400,10 +409,16 @@ public BundlesData getBundlesData(@PathParam("property") String property, @PathP @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Property or cluster or namespace doesn't exist"), @ApiResponse(code = 412, message = "Namespace is already unloaded or Namespace has bundles activated") }) - public void unloadNamespace(@PathParam("property") String property, @PathParam("cluster") String cluster, - @PathParam("namespace") String namespace) { - validateNamespaceName(property, cluster, namespace); - internalUnloadNamespace(); + public void unloadNamespace(@Suspended final AsyncResponse asyncResponse, @PathParam("property") String property, + @PathParam("cluster") String cluster, @PathParam("namespace") String namespace) { + try { + validateNamespaceName(property, cluster, namespace); + internalUnloadNamespace(asyncResponse); + } catch (WebApplicationException wae) { + asyncResponse.resume(wae); + } catch (Exception e) { + asyncResponse.resume(new RestException(e)); + } } @PUT @@ -605,11 +620,18 @@ public PersistencePolicies getPersistence(@PathParam("property") String property @ApiOperation(hidden = true, value = "Clear backlog for all topics on a namespace.") @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Namespace does not exist") }) - public void clearNamespaceBacklog(@PathParam("property") String property, @PathParam("cluster") String cluster, + public void clearNamespaceBacklog(@Suspended final AsyncResponse asyncResponse, + @PathParam("property") String property, @PathParam("cluster") String cluster, @PathParam("namespace") String namespace, @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { - validateNamespaceName(property, cluster, namespace); - internalClearNamespaceBacklog(authoritative); + try { + validateNamespaceName(property, cluster, namespace); + internalClearNamespaceBacklog(asyncResponse, authoritative); + } catch (WebApplicationException wae) { + asyncResponse.resume(wae); + } catch (Exception e) { + asyncResponse.resume(new RestException(e)); + } } @POST @@ -630,12 +652,18 @@ public void clearNamespaceBundleBacklog(@PathParam("property") String property, @ApiOperation(hidden = true, value = "Clear backlog for a given subscription on all topics on a namespace.") @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Namespace does not exist") }) - public void clearNamespaceBacklogForSubscription(@PathParam("property") String property, - @PathParam("cluster") String cluster, @PathParam("namespace") String namespace, - @PathParam("subscription") String subscription, + public void clearNamespaceBacklogForSubscription(@Suspended final AsyncResponse asyncResponse, + @PathParam("property") String property, @PathParam("cluster") String cluster, + @PathParam("namespace") String namespace, @PathParam("subscription") String subscription, @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { - validateNamespaceName(property, cluster, namespace); - internalClearNamespaceBacklogForSubscription(subscription, authoritative); + try { + validateNamespaceName(property, cluster, namespace); + internalClearNamespaceBacklogForSubscription(asyncResponse, subscription, authoritative); + } catch (WebApplicationException wae) { + asyncResponse.resume(wae); + } catch (Exception e) { + asyncResponse.resume(new RestException(e)); + } } @POST @@ -656,11 +684,18 @@ public void clearNamespaceBundleBacklogForSubscription(@PathParam("property") St @ApiOperation(hidden = true, value = "Unsubscribes the given subscription on all topics on a namespace.") @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Namespace does not exist") }) - public void unsubscribeNamespace(@PathParam("property") String property, @PathParam("cluster") String cluster, + public void unsubscribeNamespace(@Suspended final AsyncResponse asyncResponse, + @PathParam("property") String property, @PathParam("cluster") String cluster, @PathParam("namespace") String namespace, @PathParam("subscription") String subscription, @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { - validateNamespaceName(property, cluster, namespace); - internalUnsubscribeNamespace(subscription, authoritative); + try { + validateNamespaceName(property, cluster, namespace); + internalUnsubscribeNamespace(asyncResponse, subscription, authoritative); + } catch (WebApplicationException wae) { + asyncResponse.resume(wae); + } catch (Exception e) { + asyncResponse.resume(new RestException(e)); + } } @POST diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/NonPersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/NonPersistentTopics.java index ced620acb7c1b..f1347e342ed4f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/NonPersistentTopics.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/NonPersistentTopics.java @@ -37,6 +37,7 @@ import javax.ws.rs.PathParam; import javax.ws.rs.Produces; import javax.ws.rs.QueryParam; +import javax.ws.rs.WebApplicationException; import javax.ws.rs.container.AsyncResponse; import javax.ws.rs.container.Suspended; import javax.ws.rs.core.MediaType; @@ -180,6 +181,9 @@ public void getList(@Suspended final AsyncResponse asyncResponse, @PathParam("pr // check cluster ownership for a given global namespace: redirect if peer-cluster owns it validateGlobalNamespaceOwnership(nsName); } + } catch (WebApplicationException wae) { + asyncResponse.resume(wae); + return; } catch (Exception e) { asyncResponse.resume(new RestException(e)); return; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java index 78efca2012054..87df5899c792e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java @@ -32,6 +32,7 @@ import javax.ws.rs.PathParam; import javax.ws.rs.Produces; import javax.ws.rs.QueryParam; +import javax.ws.rs.WebApplicationException; import javax.ws.rs.container.AsyncResponse; import javax.ws.rs.container.Suspended; import javax.ws.rs.core.MediaType; @@ -76,8 +77,10 @@ public void getList(@Suspended final AsyncResponse asyncResponse, @PathParam("pr try { validateNamespaceName(property, cluster, namespace); asyncResponse.resume(internalGetList()); + } catch (WebApplicationException wae) { + asyncResponse.resume(wae); } catch (Exception e) { - asyncResponse.resume(e instanceof RestException ? e : new RestException(e)); + asyncResponse.resume(new RestException(e)); } } @@ -286,6 +289,8 @@ public void getPartitionedStats(@Suspended final AsyncResponse asyncResponse, try { validateTopicName(property, cluster, namespace, encodedTopic); internalGetPartitionedStats(asyncResponse, authoritative, perPartition); + } catch (WebApplicationException wae) { + asyncResponse.resume(wae); } catch (Exception e) { asyncResponse.resume(new RestException(e)); } @@ -306,6 +311,8 @@ public void getPartitionedStatsInternal( try { validateTopicName(property, cluster, namespace, encodedTopic); internalGetPartitionedStatsInternal(asyncResponse, authoritative); + } catch (WebApplicationException wae) { + asyncResponse.resume(wae); } catch (Exception e) { asyncResponse.resume(new RestException(e)); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java index d29972e66398e..435892eefb262 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java @@ -32,6 +32,9 @@ import javax.ws.rs.PathParam; import javax.ws.rs.Produces; import javax.ws.rs.QueryParam; +import javax.ws.rs.WebApplicationException; +import javax.ws.rs.container.AsyncResponse; +import javax.ws.rs.container.Suspended; import javax.ws.rs.core.MediaType; import org.apache.pulsar.broker.admin.impl.NamespacesBase; @@ -126,10 +129,17 @@ public void createNamespace(@PathParam("tenant") String tenant, @PathParam("name @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Tenant or cluster or namespace doesn't exist"), @ApiResponse(code = 409, message = "Namespace is not empty") }) - public void deleteNamespace(@PathParam("tenant") String tenant, @PathParam("namespace") String namespace, + public void deleteNamespace(@Suspended final AsyncResponse asyncResponse, @PathParam("tenant") String tenant, + @PathParam("namespace") String namespace, @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { - validateNamespaceName(tenant, namespace); - internalDeleteNamespace(authoritative); + try { + validateNamespaceName(tenant, namespace); + internalDeleteNamespace(asyncResponse, authoritative); + } catch (WebApplicationException wae) { + asyncResponse.resume(wae); + } catch (Exception e) { + asyncResponse.resume(new RestException(e)); + } } @DELETE @@ -302,9 +312,16 @@ public BundlesData getBundlesData(@PathParam("tenant") String tenant, @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Tenant or namespace doesn't exist"), @ApiResponse(code = 412, message = "Namespace is already unloaded or Namespace has bundles activated") }) - public void unloadNamespace(@PathParam("tenant") String tenant, @PathParam("namespace") String namespace) { - validateNamespaceName(tenant, namespace); - internalUnloadNamespace(); + public void unloadNamespace(@Suspended final AsyncResponse asyncResponse, @PathParam("tenant") String tenant, + @PathParam("namespace") String namespace) { + try { + validateNamespaceName(tenant, namespace); + internalUnloadNamespace(asyncResponse); + } catch (WebApplicationException wae) { + asyncResponse.resume(wae); + } catch (Exception e) { + asyncResponse.resume(new RestException(e)); + } } @PUT @@ -545,10 +562,17 @@ public PersistencePolicies getPersistence(@PathParam("tenant") String tenant, @ApiOperation(value = "Clear backlog for all topics on a namespace.") @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Namespace does not exist") }) - public void clearNamespaceBacklog(@PathParam("tenant") String tenant, @PathParam("namespace") String namespace, + public void clearNamespaceBacklog(@Suspended final AsyncResponse asyncResponse, @PathParam("tenant") String tenant, + @PathParam("namespace") String namespace, @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { - validateNamespaceName(tenant, namespace); - internalClearNamespaceBacklog(authoritative); + try { + validateNamespaceName(tenant, namespace); + internalClearNamespaceBacklog(asyncResponse, authoritative); + } catch (WebApplicationException wae) { + asyncResponse.resume(wae); + } catch (Exception e) { + asyncResponse.resume(new RestException(e)); + } } @POST @@ -568,11 +592,18 @@ public void clearNamespaceBundleBacklog(@PathParam("tenant") String tenant, @ApiOperation(value = "Clear backlog for a given subscription on all topics on a namespace.") @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Namespace does not exist") }) - public void clearNamespaceBacklogForSubscription(@PathParam("tenant") String tenant, - @PathParam("namespace") String namespace, @PathParam("subscription") String subscription, + public void clearNamespaceBacklogForSubscription(@Suspended final AsyncResponse asyncResponse, + @PathParam("tenant") String tenant, @PathParam("namespace") String namespace, + @PathParam("subscription") String subscription, @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { - validateNamespaceName(tenant, namespace); - internalClearNamespaceBacklogForSubscription(subscription, authoritative); + try { + validateNamespaceName(tenant, namespace); + internalClearNamespaceBacklogForSubscription(asyncResponse, subscription, authoritative); + } catch (WebApplicationException wae) { + asyncResponse.resume(wae); + } catch (Exception e) { + asyncResponse.resume(new RestException(e)); + } } @POST @@ -593,11 +624,18 @@ public void clearNamespaceBundleBacklogForSubscription(@PathParam("tenant") Stri @ApiOperation(value = "Unsubscribes the given subscription on all topics on a namespace.") @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Namespace does not exist") }) - public void unsubscribeNamespace(@PathParam("tenant") String tenant, @PathParam("cluster") String cluster, - @PathParam("namespace") String namespace, @PathParam("subscription") String subscription, + public void unsubscribeNamespace(@Suspended final AsyncResponse asyncResponse, @PathParam("tenant") String tenant, + @PathParam("cluster") String cluster, @PathParam("namespace") String namespace, + @PathParam("subscription") String subscription, @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { - validateNamespaceName(tenant, namespace); - internalUnsubscribeNamespace(subscription, authoritative); + try { + validateNamespaceName(tenant, namespace); + internalUnsubscribeNamespace(asyncResponse, subscription, authoritative); + } catch (WebApplicationException wae) { + asyncResponse.resume(wae); + } catch (Exception e) { + asyncResponse.resume(new RestException(e)); + } } @POST diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java index 9807c9fdce1a7..8125f5b31b4fa 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java @@ -38,6 +38,7 @@ import javax.ws.rs.PathParam; import javax.ws.rs.Produces; import javax.ws.rs.QueryParam; +import javax.ws.rs.WebApplicationException; import javax.ws.rs.container.AsyncResponse; import javax.ws.rs.container.Suspended; import javax.ws.rs.core.MediaType; @@ -240,6 +241,9 @@ public void getList( // check cluster ownership for a given global namespace: redirect if peer-cluster owns it validateGlobalNamespaceOwnership(namespaceName); + } catch (WebApplicationException wae) { + asyncResponse.resume(wae); + return; } catch (Exception e) { asyncResponse.resume(new RestException(e)); return; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java index 04aa79f959e9b..96a5ca98eed64 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java @@ -32,6 +32,7 @@ import javax.ws.rs.PathParam; import javax.ws.rs.Produces; import javax.ws.rs.QueryParam; +import javax.ws.rs.WebApplicationException; import javax.ws.rs.container.AsyncResponse; import javax.ws.rs.container.Suspended; import javax.ws.rs.core.MediaType; @@ -83,8 +84,10 @@ public void getList( try { validateNamespaceName(tenant, namespace); asyncResponse.resume(internalGetList()); + } catch (WebApplicationException wae) { + asyncResponse.resume(wae); } catch (Exception e) { - asyncResponse.resume(e instanceof RestException ? e : new RestException(e)); + asyncResponse.resume(new RestException(e)); } } @@ -482,6 +485,8 @@ public void getPartitionedStats( try { validatePartitionedTopicName(tenant, namespace, encodedTopic); internalGetPartitionedStats(asyncResponse, authoritative, perPartition); + } catch (WebApplicationException wae) { + asyncResponse.resume(wae); } catch (Exception e) { asyncResponse.resume(new RestException(e)); } @@ -510,6 +515,8 @@ public void getPartitionedStatsInternal( try { validateTopicName(tenant, namespace, encodedTopic); internalGetPartitionedStatsInternal(asyncResponse, authoritative); + } catch (WebApplicationException wae) { + asyncResponse.resume(wae); } catch (Exception e) { asyncResponse.resume(new RestException(e)); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java index 643020d774111..b277ffeb3669f 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java @@ -24,6 +24,7 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.timeout; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.testng.Assert.assertEquals; @@ -47,6 +48,7 @@ import java.util.Set; import java.util.concurrent.CountDownLatch; import javax.ws.rs.container.AsyncResponse; +import javax.ws.rs.core.Response; import javax.ws.rs.core.Response.Status; import javax.ws.rs.core.StreamingOutput; import javax.ws.rs.core.UriInfo; @@ -84,6 +86,7 @@ import org.apache.zookeeper.KeeperException.Code; import org.apache.zookeeper.ZooDefs; import org.apache.zookeeper.ZooDefs.Ids; +import org.mockito.ArgumentCaptor; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; @@ -505,8 +508,11 @@ void properties() throws Exception { properties.createTenant("tenant-config-is-null", null); assertEquals(properties.getTenantAdmin("tenant-config-is-null"), nullTenantInfo); - - namespaces.deleteNamespace("my-tenant", "use", "my-namespace", false); + AsyncResponse response = mock(AsyncResponse.class); + namespaces.deleteNamespace(response, "my-tenant", "use", "my-namespace", false); + ArgumentCaptor captor = ArgumentCaptor.forClass(Response.class); + verify(response, timeout(5000).times(1)).resume(captor.capture()); + assertEquals(captor.getValue().getStatus(), Status.OK.getStatusCode()); properties.deleteTenant("my-tenant"); properties.deleteTenant("tenant-config-is-null"); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java index 138404e127174..763a42c4826d9 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java @@ -25,6 +25,7 @@ import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.timeout; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.testng.Assert.assertEquals; @@ -43,9 +44,12 @@ import java.util.EnumSet; import java.util.List; import java.util.Optional; +import java.util.concurrent.CompletableFuture; import javax.ws.rs.ClientErrorException; import javax.ws.rs.WebApplicationException; +import javax.ws.rs.container.AsyncResponse; +import javax.ws.rs.core.Response; import javax.ws.rs.core.Response.Status; import javax.ws.rs.core.UriBuilder; import javax.ws.rs.core.UriInfo; @@ -79,6 +83,7 @@ import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException.Code; import org.apache.zookeeper.ZooDefs; +import org.mockito.ArgumentCaptor; import org.mockito.ArgumentMatcher; import org.mockito.Mockito; import org.testng.annotations.AfterMethod; @@ -525,16 +530,14 @@ public void testNamespacesApiRedirects() throws Exception { // Trick to force redirection conf.setAuthorizationEnabled(true); - try { - namespaces.deleteNamespace(this.testTenant, this.testOtherCluster, - this.testLocalNamespaces.get(2).getLocalName(), false); - fail("Should have raised exception to redirect request"); - } catch (WebApplicationException wae) { - // OK - assertEquals(wae.getResponse().getStatus(), Status.TEMPORARY_REDIRECT.getStatusCode()); - assertEquals(wae.getResponse().getLocation().toString(), - UriBuilder.fromUri(uri).host("broker-usc.com").port(BROKER_WEBSERVICE_PORT).toString()); - } + AsyncResponse response = mock(AsyncResponse.class); + namespaces.deleteNamespace(response, this.testTenant, this.testOtherCluster, + this.testLocalNamespaces.get(2).getLocalName(), false); + ArgumentCaptor captor = ArgumentCaptor.forClass(WebApplicationException.class); + verify(response, timeout(5000).times(1)).resume(captor.capture()); + assertEquals(captor.getValue().getResponse().getStatus(), Status.TEMPORARY_REDIRECT.getStatusCode()); + assertEquals(captor.getValue().getResponse().getLocation().toString(), + UriBuilder.fromUri(uri).host("broker-usc.com").port(BROKER_WEBSERVICE_PORT).toString()); uri = URI.create("http://localhost" + ":" + BROKER_WEBSERVICE_PORT + "/admin/namespace/" + this.testLocalNamespaces.get(2).toString() + "/unload"); @@ -572,27 +575,23 @@ public boolean matches(NamespaceName nsname) { doReturn(uri).when(uriInfo).getRequestUri(); doReturn(true).when(namespaces).isLeaderBroker(); - try { - namespaces.deleteNamespace(this.testLocalNamespaces.get(2).getTenant(), - this.testLocalNamespaces.get(2).getCluster(), this.testLocalNamespaces.get(2).getLocalName(), - false); - fail("Should have raised exception to redirect request"); - } catch (WebApplicationException wae) { - // OK - assertEquals(wae.getResponse().getStatus(), Status.TEMPORARY_REDIRECT.getStatusCode()); - assertEquals(wae.getResponse().getLocation().toString(), - UriBuilder.fromUri(uri).host("broker-usc.com").port(BROKER_WEBSERVICE_PORT).toString()); - } + response = mock(AsyncResponse.class); + namespaces.deleteNamespace(response, this.testLocalNamespaces.get(2).getTenant(), + this.testLocalNamespaces.get(2).getCluster(), this.testLocalNamespaces.get(2).getLocalName(), false); + captor = ArgumentCaptor.forClass(WebApplicationException.class); + verify(response, timeout(5000).times(1)).resume(captor.capture()); + assertEquals(captor.getValue().getResponse().getStatus(), Status.TEMPORARY_REDIRECT.getStatusCode()); + assertEquals(captor.getValue().getResponse().getLocation().toString(), + UriBuilder.fromUri(uri).host("broker-usc.com").port(BROKER_WEBSERVICE_PORT).toString()); } @Test public void testDeleteNamespaces() throws Exception { - try { - namespaces.deleteNamespace(this.testTenant, this.testLocalCluster, "non-existing-namespace-1", false); - fail("should have failed"); - } catch (RestException e) { - assertEquals(e.getResponse().getStatus(), Status.NOT_FOUND.getStatusCode()); - } + AsyncResponse response = mock(AsyncResponse.class); + namespaces.deleteNamespace(response, this.testTenant, this.testLocalCluster, "non-existing-namespace-1", false); + ArgumentCaptor errorCaptor = ArgumentCaptor.forClass(RestException.class); + verify(response, timeout(5000).times(1)).resume(errorCaptor.capture()); + assertEquals(errorCaptor.getValue().getResponse().getStatus(), Status.NOT_FOUND.getStatusCode()); NamespaceName testNs = this.testLocalNamespaces.get(1); TopicName topicName = TopicName.get(testNs.getPersistentTopicName("my-topic")); @@ -603,39 +602,49 @@ public void testDeleteNamespaces() throws Exception { URL localWebServiceUrl = new URL(pulsar.getSafeWebServiceAddress()); doReturn(Optional.of(localWebServiceUrl)).when(nsSvc).getWebServiceUrl(testNs, false, false, false); doReturn(true).when(nsSvc).isServiceUnitOwned(testNs); - try { - namespaces.deleteNamespace(testNs.getTenant(), testNs.getCluster(), testNs.getLocalName(), false); - fail("should have failed"); - } catch (RestException e) { - // Ok, namespace not empty - assertEquals(e.getResponse().getStatus(), Status.CONFLICT.getStatusCode()); - } + + response = mock(AsyncResponse.class); + namespaces.deleteNamespace(response, testNs.getTenant(), testNs.getCluster(), testNs.getLocalName(), false); + errorCaptor = ArgumentCaptor.forClass(RestException.class); + // Ok, namespace not empty + verify(response, timeout(5000).times(1)).resume(errorCaptor.capture()); + assertEquals(errorCaptor.getValue().getResponse().getStatus(), Status.CONFLICT.getStatusCode()); + // delete the topic from ZK mockZookKeeper.delete("/managed-ledgers/" + topicName.getPersistenceNamingEncoding(), -1); ZkUtils.createFullPathOptimistic(mockZookKeeper, "/admin/partitioned-topics/" + topicName.getPersistenceNamingEncoding(), new byte[0], null, null); - try { - namespaces.deleteNamespace(testNs.getTenant(), testNs.getCluster(), testNs.getLocalName(), false); - fail("should have failed"); - } catch (RestException e) { - // Ok, namespace not empty - assertEquals(e.getResponse().getStatus(), Status.CONFLICT.getStatusCode()); - } + + response = mock(AsyncResponse.class); + namespaces.deleteNamespace(response, testNs.getTenant(), testNs.getCluster(), testNs.getLocalName(), false); + errorCaptor = ArgumentCaptor.forClass(RestException.class); + // Ok, namespace not empty + verify(response, timeout(5000).times(1)).resume(errorCaptor.capture()); + assertEquals(errorCaptor.getValue().getResponse().getStatus(), Status.CONFLICT.getStatusCode()); + mockZookKeeper.delete("/admin/partitioned-topics/" + topicName.getPersistenceNamingEncoding(), -1); testNs = this.testGlobalNamespaces.get(0); // setup ownership to localhost doReturn(Optional.of(localWebServiceUrl)).when(nsSvc).getWebServiceUrl(testNs, false, false, false); doReturn(true).when(nsSvc).isServiceUnitOwned(testNs); - namespaces.deleteNamespace(testNs.getTenant(), testNs.getCluster(), testNs.getLocalName(), false); + response = mock(AsyncResponse.class); + namespaces.deleteNamespace(response, testNs.getTenant(), testNs.getCluster(), testNs.getLocalName(), false); + ArgumentCaptor responseCaptor = ArgumentCaptor.forClass(Response.class); + verify(response, timeout(5000).times(1)).resume(responseCaptor.capture()); + assertEquals(responseCaptor.getValue().getStatus(), Status.OK.getStatusCode()); testNs = this.testLocalNamespaces.get(0); // setup ownership to localhost doReturn(Optional.of(localWebServiceUrl)).when(nsSvc).getWebServiceUrl(testNs, false, false, false); doReturn(true).when(nsSvc).isServiceUnitOwned(testNs); - namespaces.deleteNamespace(testNs.getTenant(), testNs.getCluster(), testNs.getLocalName(), false); + response = mock(AsyncResponse.class); + namespaces.deleteNamespace(response, testNs.getTenant(), testNs.getCluster(), testNs.getLocalName(), false); + responseCaptor = ArgumentCaptor.forClass(Response.class); + verify(response, timeout(5000).times(1)).resume(responseCaptor.capture()); + assertEquals(responseCaptor.getValue().getStatus(), Status.OK.getStatusCode()); List nsList = Lists.newArrayList(this.testLocalNamespaces.get(1).toString(), this.testLocalNamespaces.get(2).toString()); nsList.sort(null); @@ -647,7 +656,11 @@ public void testDeleteNamespaces() throws Exception { // setup ownership to localhost doReturn(Optional.of(localWebServiceUrl)).when(nsSvc).getWebServiceUrl(testNs, false, false, false); doReturn(true).when(nsSvc).isServiceUnitOwned(testNs); - namespaces.deleteNamespace(testNs.getTenant(), testNs.getCluster(), testNs.getLocalName(), false); + response = mock(AsyncResponse.class); + namespaces.deleteNamespace(response, testNs.getTenant(), testNs.getCluster(), testNs.getLocalName(), false); + responseCaptor = ArgumentCaptor.forClass(Response.class); + verify(response, timeout(5000).times(1)).resume(responseCaptor.capture()); + assertEquals(responseCaptor.getValue().getStatus(), Status.OK.getStatusCode()); } @Test @@ -682,9 +695,11 @@ public boolean matches(NamespaceBundle bundle) { } })); - doThrow(new PulsarAdminException.PreconditionFailedException( - new ClientErrorException(Status.PRECONDITION_FAILED))).when(namespacesAdmin) - .deleteNamespaceBundle(Mockito.anyString(), Mockito.anyString()); + CompletableFuture preconditionFailed = new CompletableFuture<>(); + preconditionFailed.completeExceptionally(new PulsarAdminException.PreconditionFailedException( + new ClientErrorException(Status.PRECONDITION_FAILED))); + doReturn(preconditionFailed).when(namespacesAdmin) + .deleteNamespaceBundleAsync(Mockito.anyString(), Mockito.anyString()); try { namespaces.deleteNamespaceBundle(testTenant, testLocalCluster, bundledNsLocal, "0x00000000_0x80000000", @@ -694,19 +709,18 @@ public boolean matches(NamespaceBundle bundle) { assertEquals(re.getResponse().getStatus(), Status.PRECONDITION_FAILED.getStatusCode()); } - try { - namespaces.deleteNamespace(testTenant, testLocalCluster, bundledNsLocal, false); - fail("Should have failed"); - } catch (RestException re) { - assertEquals(re.getResponse().getStatus(), Status.PRECONDITION_FAILED.getStatusCode()); - } + AsyncResponse response = mock(AsyncResponse.class); + namespaces.deleteNamespace(response, testTenant, testLocalCluster, bundledNsLocal, false); + ArgumentCaptor captor = ArgumentCaptor.forClass(RestException.class); + verify(response, timeout(5000).times(1)).resume(captor.capture()); + assertEquals(captor.getValue().getResponse().getStatus(), Status.PRECONDITION_FAILED.getStatusCode()); NamespaceBundles nsBundles = nsSvc.getNamespaceBundleFactory().getBundles(testNs, bundleData); // make one bundle owned doReturn(Optional.of(localWebServiceUrl)).when(nsSvc).getWebServiceUrl(nsBundles.getBundles().get(0), false, true, false); doReturn(true).when(nsSvc).isServiceUnitOwned(nsBundles.getBundles().get(0)); - doNothing().when(namespacesAdmin).deleteNamespaceBundle( + doReturn(CompletableFuture.completedFuture(null)).when(namespacesAdmin).deleteNamespaceBundleAsync( testTenant + "/" + testLocalCluster + "/" + bundledNsLocal, "0x00000000_0x80000000"); try { @@ -717,12 +731,11 @@ public boolean matches(NamespaceBundle bundle) { assertEquals(re.getResponse().getStatus(), Status.PRECONDITION_FAILED.getStatusCode()); } - try { - namespaces.deleteNamespace(testTenant, testLocalCluster, bundledNsLocal, false); - fail("should have failed"); - } catch (RestException re) { - assertEquals(re.getResponse().getStatus(), Status.PRECONDITION_FAILED.getStatusCode()); - } + response = mock(AsyncResponse.class); + namespaces.deleteNamespace(response, testTenant, testLocalCluster, bundledNsLocal, false); + captor = ArgumentCaptor.forClass(RestException.class); + verify(response, timeout(5000).times(1)).resume(captor.capture()); + assertEquals(captor.getValue().getResponse().getStatus(), Status.PRECONDITION_FAILED.getStatusCode()); // ensure all three bundles are owned by the local broker for (NamespaceBundle bundle : nsBundles.getBundles()) { @@ -744,7 +757,11 @@ public void testUnloadNamespaces() throws Exception { doNothing().when(namespaces).validateBundleOwnership(bundle, false, true); // The namespace unload should succeed on all the bundles - namespaces.unloadNamespace(testNs.getTenant(), testNs.getCluster(), testNs.getLocalName()); + AsyncResponse response = mock(AsyncResponse.class); + namespaces.unloadNamespace(response, testNs.getTenant(), testNs.getCluster(), testNs.getLocalName()); + ArgumentCaptor captor = ArgumentCaptor.forClass(Response.class); + verify(response, timeout(5000).times(1)).resume(captor.capture()); + assertEquals(captor.getValue().getStatus(), Status.OK.getStatusCode()); } @Test diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java index a8289938c5f4e..c69a4a078f857 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java @@ -21,6 +21,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import org.apache.pulsar.client.admin.PulsarAdminException.ConflictException; @@ -303,6 +304,20 @@ public interface Namespaces { */ void deleteNamespaceBundle(String namespace, String bundleRange) throws PulsarAdminException; + /** + * Delete an existing bundle in a namespace asynchronously. + *

+ * The bundle needs to be empty. + * + * @param namespace + * Namespace name + * @param bundleRange + * range of the bundle + * + * @return a future that can be used to track when the bundle is deleted + */ + CompletableFuture deleteNamespaceBundleAsync(String namespace, String bundleRange); + /** * Get permissions on a namespace. *

@@ -884,12 +899,24 @@ void setBookieAffinityGroup(String namespace, BookieAffinityGroupData bookieAffi * Unload namespace bundle * * @param namespace - * @bundle range of bundle to unload + * @param bundle + * range of bundle to unload * @throws PulsarAdminException * Unexpected error */ void unloadNamespaceBundle(String namespace, String bundle) throws PulsarAdminException; + /** + * Unload namespace bundle asynchronously + * + * @param namespace + * @param bundle + * range of bundle to unload + * + * @return a future that can be used to track when the bundle is unloaded + */ + CompletableFuture unloadNamespaceBundleAsync(String namespace, String bundle); + /** * Split namespace bundle * @@ -1013,6 +1040,16 @@ void setBookieAffinityGroup(String namespace, BookieAffinityGroupData bookieAffi */ void clearNamespaceBundleBacklog(String namespace, String bundle) throws PulsarAdminException; + /** + * Clear backlog for all topics on a namespace bundle asynchronously + * + * @param namespace + * @param bundle + * + * @return a future that can be used to track when the bundle is cleared + */ + CompletableFuture clearNamespaceBundleBacklogAsync(String namespace, String bundle); + /** * Clear backlog for a given subscription on all topics on a namespace bundle * @@ -1025,6 +1062,18 @@ void setBookieAffinityGroup(String namespace, BookieAffinityGroupData bookieAffi void clearNamespaceBundleBacklogForSubscription(String namespace, String bundle, String subscription) throws PulsarAdminException; + /** + * Clear backlog for a given subscription on all topics on a namespace bundle asynchronously + * + * @param namespace + * @param bundle + * @param subscription + * + * @return a future that can be used to track when the bundle is cleared + */ + CompletableFuture clearNamespaceBundleBacklogForSubscriptionAsync(String namespace, String bundle, + String subscription); + /** * Unsubscribes the given subscription on all topics on a namespace * @@ -1044,6 +1093,17 @@ void clearNamespaceBundleBacklogForSubscription(String namespace, String bundle, */ void unsubscribeNamespaceBundle(String namespace, String bundle, String subscription) throws PulsarAdminException; + /** + * Unsubscribes the given subscription on all topics on a namespace bundle asynchronously + * + * @param namespace + * @param bundle + * @param subscription + * + * @return a future that can be used to track when the subscription is unsubscribed + */ + CompletableFuture unsubscribeNamespaceBundleAsync(String namespace, String bundle, String subscription); + /** * Set the encryption required status for all topics within a namespace. *

diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java index 08f7b872dadf4..a5470a43042b4 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java @@ -23,6 +23,8 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import javax.ws.rs.client.Entity; @@ -192,14 +194,22 @@ public void deleteNamespace(String namespace) throws PulsarAdminException { @Override public void deleteNamespaceBundle(String namespace, String bundleRange) throws PulsarAdminException { try { - NamespaceName ns = NamespaceName.get(namespace); - WebTarget path = namespacePath(ns, bundleRange); - request(path).delete(ErrorData.class); - } catch (Exception e) { - throw getApiException(e); + deleteNamespaceBundleAsync(namespace, bundleRange).get(); + } catch (ExecutionException e) { + throw (PulsarAdminException) e.getCause(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new PulsarAdminException(e); } } + @Override + public CompletableFuture deleteNamespaceBundleAsync(String namespace, String bundleRange) { + NamespaceName ns = NamespaceName.get(namespace); + WebTarget path = namespacePath(ns, bundleRange); + return asyncDeleteRequest(path); + } + @Override public Map> getPermissions(String namespace) throws PulsarAdminException { try { @@ -496,14 +506,22 @@ public String getReplicationConfigVersion(String namespace) throws PulsarAdminEx @Override public void unloadNamespaceBundle(String namespace, String bundle) throws PulsarAdminException { try { - NamespaceName ns = NamespaceName.get(namespace); - WebTarget path = namespacePath(ns, bundle, "unload"); - request(path).put(Entity.entity("", MediaType.APPLICATION_JSON), ErrorData.class); - } catch (Exception e) { - throw getApiException(e); + unloadNamespaceBundleAsync(namespace, bundle).get(); + } catch (ExecutionException e) { + throw (PulsarAdminException) e.getCause(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new PulsarAdminException(e); } } + @Override + public CompletableFuture unloadNamespaceBundleAsync(String namespace, String bundle) { + NamespaceName ns = NamespaceName.get(namespace); + WebTarget path = namespacePath(ns, bundle, "unload"); + return asyncPutRequest(path, Entity.entity("", MediaType.APPLICATION_JSON)); + } + @Override public void splitNamespaceBundle(String namespace, String bundle, boolean unloadSplitBundles) throws PulsarAdminException { @@ -631,26 +649,43 @@ public void clearNamespaceBacklogForSubscription(String namespace, String subscr @Override public void clearNamespaceBundleBacklog(String namespace, String bundle) throws PulsarAdminException { try { - NamespaceName ns = NamespaceName.get(namespace); - WebTarget path = namespacePath(ns, bundle, "clearBacklog"); - request(path).post(Entity.entity("", MediaType.APPLICATION_JSON), ErrorData.class); - } catch (Exception e) { - throw getApiException(e); + clearNamespaceBundleBacklogAsync(namespace, bundle).get(); + } catch (ExecutionException e) { + throw (PulsarAdminException) e.getCause(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new PulsarAdminException(e); } } + @Override + public CompletableFuture clearNamespaceBundleBacklogAsync(String namespace, String bundle) { + NamespaceName ns = NamespaceName.get(namespace); + WebTarget path = namespacePath(ns, bundle, "clearBacklog"); + return asyncPostRequest(path, Entity.entity("", MediaType.APPLICATION_JSON)); + } + @Override public void clearNamespaceBundleBacklogForSubscription(String namespace, String bundle, String subscription) throws PulsarAdminException { try { - NamespaceName ns = NamespaceName.get(namespace); - WebTarget path = namespacePath(ns, bundle, "clearBacklog", subscription); - request(path).post(Entity.entity("", MediaType.APPLICATION_JSON), ErrorData.class); - } catch (Exception e) { - throw getApiException(e); + clearNamespaceBundleBacklogForSubscriptionAsync(namespace, bundle, subscription).get(); + } catch (ExecutionException e) { + throw (PulsarAdminException) e.getCause(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new PulsarAdminException(e); } } + @Override + public CompletableFuture clearNamespaceBundleBacklogForSubscriptionAsync(String namespace, String bundle, + String subscription) { + NamespaceName ns = NamespaceName.get(namespace); + WebTarget path = namespacePath(ns, bundle, "clearBacklog", subscription); + return asyncPostRequest(path, Entity.entity("", MediaType.APPLICATION_JSON)); + } + @Override public void unsubscribeNamespace(String namespace, String subscription) throws PulsarAdminException { try { @@ -666,14 +701,23 @@ public void unsubscribeNamespace(String namespace, String subscription) throws P public void unsubscribeNamespaceBundle(String namespace, String bundle, String subscription) throws PulsarAdminException { try { - NamespaceName ns = NamespaceName.get(namespace); - WebTarget path = namespacePath(ns, bundle, "unsubscribe", subscription); - request(path).post(Entity.entity("", MediaType.APPLICATION_JSON), ErrorData.class); - } catch (Exception e) { - throw getApiException(e); + unsubscribeNamespaceBundleAsync(namespace, bundle, subscription).get(); + } catch (ExecutionException e) { + throw (PulsarAdminException) e.getCause(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new PulsarAdminException(e); } } + @Override + public CompletableFuture unsubscribeNamespaceBundleAsync(String namespace, String bundle, + String subscription) { + NamespaceName ns = NamespaceName.get(namespace); + WebTarget path = namespacePath(ns, bundle, "unsubscribe", subscription); + return asyncPostRequest(path, Entity.entity("", MediaType.APPLICATION_JSON)); + } + @Override public void setSubscriptionAuthMode(String namespace, SubscriptionAuthMode subscriptionAuthMode) throws PulsarAdminException { try {