Skip to content
This repository has been archived by the owner on Apr 1, 2024. It is now read-only.

Commit

Permalink
Process requests asynchronously on some REST APIs (2) (#4778)
Browse files Browse the repository at this point in the history
Master Issue: #4756

### Motivation

This is a continuation of apache#4765.

### Modifications

Added async rest handlers to the following APIs:
```
DELETE /admin/namespaces/{tenant}/{cluster}/{namespace}
PUT    /admin/namespaces/{tenant}/{cluster}/{namespace}/unload
POST   /admin/namespaces/{tenant}/{cluster}/{namespace}/clearBacklog
POST   /admin/namespaces/{tenant}/{cluster}/{namespace}/clearBacklog/{subscription}
POST   /admin/namespaces/{tenant}/{cluster}/{namespace}/unsubscribe/{subscription}

DELETE /admin/v2/namespaces/{tenant}/{namespace}
PUT    /admin/v2/namespaces/{tenant}/{namespace}/unload
POST   /admin/v2/namespaces/{tenant}/{namespace}/clearBacklog
POST   /admin/v2/namespaces/{tenant}/{namespace}/clearBacklog/{subscription}
POST   /admin/v2/namespaces/{tenant}/{namespace}/unsubscribe/{subscription}
```
  • Loading branch information
massakam authored and sijie committed Jul 23, 2019
1 parent 8e1d00f commit 2cc34af
Show file tree
Hide file tree
Showing 11 changed files with 511 additions and 225 deletions.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,9 @@
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.container.Suspended;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response.Status;
import java.util.List;
Expand Down Expand Up @@ -178,11 +181,17 @@ public void createNamespace(@PathParam("property") String property, @PathParam("
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Property or cluster or namespace doesn't exist"),
@ApiResponse(code = 409, message = "Namespace is not empty") })
public void deleteNamespace(@PathParam("property") String property, @PathParam("cluster") String cluster,
@PathParam("namespace") String namespace,
public void deleteNamespace(@Suspended final AsyncResponse asyncResponse, @PathParam("property") String property,
@PathParam("cluster") String cluster, @PathParam("namespace") String namespace,
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateNamespaceName(property, cluster, namespace);
internalDeleteNamespace(authoritative);
try {
validateNamespaceName(property, cluster, namespace);
internalDeleteNamespace(asyncResponse, authoritative);
} catch (WebApplicationException wae) {
asyncResponse.resume(wae);
} catch (Exception e) {
asyncResponse.resume(new RestException(e));
}
}

@DELETE
Expand Down Expand Up @@ -400,10 +409,16 @@ public BundlesData getBundlesData(@PathParam("property") String property, @PathP
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Property or cluster or namespace doesn't exist"),
@ApiResponse(code = 412, message = "Namespace is already unloaded or Namespace has bundles activated") })
public void unloadNamespace(@PathParam("property") String property, @PathParam("cluster") String cluster,
@PathParam("namespace") String namespace) {
validateNamespaceName(property, cluster, namespace);
internalUnloadNamespace();
public void unloadNamespace(@Suspended final AsyncResponse asyncResponse, @PathParam("property") String property,
@PathParam("cluster") String cluster, @PathParam("namespace") String namespace) {
try {
validateNamespaceName(property, cluster, namespace);
internalUnloadNamespace(asyncResponse);
} catch (WebApplicationException wae) {
asyncResponse.resume(wae);
} catch (Exception e) {
asyncResponse.resume(new RestException(e));
}
}

@PUT
Expand Down Expand Up @@ -605,11 +620,18 @@ public PersistencePolicies getPersistence(@PathParam("property") String property
@ApiOperation(hidden = true, value = "Clear backlog for all topics on a namespace.")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Namespace does not exist") })
public void clearNamespaceBacklog(@PathParam("property") String property, @PathParam("cluster") String cluster,
public void clearNamespaceBacklog(@Suspended final AsyncResponse asyncResponse,
@PathParam("property") String property, @PathParam("cluster") String cluster,
@PathParam("namespace") String namespace,
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateNamespaceName(property, cluster, namespace);
internalClearNamespaceBacklog(authoritative);
try {
validateNamespaceName(property, cluster, namespace);
internalClearNamespaceBacklog(asyncResponse, authoritative);
} catch (WebApplicationException wae) {
asyncResponse.resume(wae);
} catch (Exception e) {
asyncResponse.resume(new RestException(e));
}
}

@POST
Expand All @@ -630,12 +652,18 @@ public void clearNamespaceBundleBacklog(@PathParam("property") String property,
@ApiOperation(hidden = true, value = "Clear backlog for a given subscription on all topics on a namespace.")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Namespace does not exist") })
public void clearNamespaceBacklogForSubscription(@PathParam("property") String property,
@PathParam("cluster") String cluster, @PathParam("namespace") String namespace,
@PathParam("subscription") String subscription,
public void clearNamespaceBacklogForSubscription(@Suspended final AsyncResponse asyncResponse,
@PathParam("property") String property, @PathParam("cluster") String cluster,
@PathParam("namespace") String namespace, @PathParam("subscription") String subscription,
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateNamespaceName(property, cluster, namespace);
internalClearNamespaceBacklogForSubscription(subscription, authoritative);
try {
validateNamespaceName(property, cluster, namespace);
internalClearNamespaceBacklogForSubscription(asyncResponse, subscription, authoritative);
} catch (WebApplicationException wae) {
asyncResponse.resume(wae);
} catch (Exception e) {
asyncResponse.resume(new RestException(e));
}
}

@POST
Expand All @@ -656,11 +684,18 @@ public void clearNamespaceBundleBacklogForSubscription(@PathParam("property") St
@ApiOperation(hidden = true, value = "Unsubscribes the given subscription on all topics on a namespace.")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Namespace does not exist") })
public void unsubscribeNamespace(@PathParam("property") String property, @PathParam("cluster") String cluster,
public void unsubscribeNamespace(@Suspended final AsyncResponse asyncResponse,
@PathParam("property") String property, @PathParam("cluster") String cluster,
@PathParam("namespace") String namespace, @PathParam("subscription") String subscription,
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateNamespaceName(property, cluster, namespace);
internalUnsubscribeNamespace(subscription, authoritative);
try {
validateNamespaceName(property, cluster, namespace);
internalUnsubscribeNamespace(asyncResponse, subscription, authoritative);
} catch (WebApplicationException wae) {
asyncResponse.resume(wae);
} catch (Exception e) {
asyncResponse.resume(new RestException(e));
}
}

@POST
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.container.Suspended;
import javax.ws.rs.core.MediaType;
Expand Down Expand Up @@ -180,6 +181,9 @@ public void getList(@Suspended final AsyncResponse asyncResponse, @PathParam("pr
// check cluster ownership for a given global namespace: redirect if peer-cluster owns it
validateGlobalNamespaceOwnership(nsName);
}
} catch (WebApplicationException wae) {
asyncResponse.resume(wae);
return;
} catch (Exception e) {
asyncResponse.resume(new RestException(e));
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.container.Suspended;
import javax.ws.rs.core.MediaType;
Expand Down Expand Up @@ -76,8 +77,10 @@ public void getList(@Suspended final AsyncResponse asyncResponse, @PathParam("pr
try {
validateNamespaceName(property, cluster, namespace);
asyncResponse.resume(internalGetList());
} catch (WebApplicationException wae) {
asyncResponse.resume(wae);
} catch (Exception e) {
asyncResponse.resume(e instanceof RestException ? e : new RestException(e));
asyncResponse.resume(new RestException(e));
}
}

Expand Down Expand Up @@ -286,6 +289,8 @@ public void getPartitionedStats(@Suspended final AsyncResponse asyncResponse,
try {
validateTopicName(property, cluster, namespace, encodedTopic);
internalGetPartitionedStats(asyncResponse, authoritative, perPartition);
} catch (WebApplicationException wae) {
asyncResponse.resume(wae);
} catch (Exception e) {
asyncResponse.resume(new RestException(e));
}
Expand All @@ -306,6 +311,8 @@ public void getPartitionedStatsInternal(
try {
validateTopicName(property, cluster, namespace, encodedTopic);
internalGetPartitionedStatsInternal(asyncResponse, authoritative);
} catch (WebApplicationException wae) {
asyncResponse.resume(wae);
} catch (Exception e) {
asyncResponse.resume(new RestException(e));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.container.Suspended;
import javax.ws.rs.core.MediaType;

import org.apache.pulsar.broker.admin.impl.NamespacesBase;
Expand Down Expand Up @@ -126,10 +129,17 @@ public void createNamespace(@PathParam("tenant") String tenant, @PathParam("name
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Tenant or cluster or namespace doesn't exist"),
@ApiResponse(code = 409, message = "Namespace is not empty") })
public void deleteNamespace(@PathParam("tenant") String tenant, @PathParam("namespace") String namespace,
public void deleteNamespace(@Suspended final AsyncResponse asyncResponse, @PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateNamespaceName(tenant, namespace);
internalDeleteNamespace(authoritative);
try {
validateNamespaceName(tenant, namespace);
internalDeleteNamespace(asyncResponse, authoritative);
} catch (WebApplicationException wae) {
asyncResponse.resume(wae);
} catch (Exception e) {
asyncResponse.resume(new RestException(e));
}
}

@DELETE
Expand Down Expand Up @@ -302,9 +312,16 @@ public BundlesData getBundlesData(@PathParam("tenant") String tenant,
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Tenant or namespace doesn't exist"),
@ApiResponse(code = 412, message = "Namespace is already unloaded or Namespace has bundles activated") })
public void unloadNamespace(@PathParam("tenant") String tenant, @PathParam("namespace") String namespace) {
validateNamespaceName(tenant, namespace);
internalUnloadNamespace();
public void unloadNamespace(@Suspended final AsyncResponse asyncResponse, @PathParam("tenant") String tenant,
@PathParam("namespace") String namespace) {
try {
validateNamespaceName(tenant, namespace);
internalUnloadNamespace(asyncResponse);
} catch (WebApplicationException wae) {
asyncResponse.resume(wae);
} catch (Exception e) {
asyncResponse.resume(new RestException(e));
}
}

@PUT
Expand Down Expand Up @@ -545,10 +562,17 @@ public PersistencePolicies getPersistence(@PathParam("tenant") String tenant,
@ApiOperation(value = "Clear backlog for all topics on a namespace.")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Namespace does not exist") })
public void clearNamespaceBacklog(@PathParam("tenant") String tenant, @PathParam("namespace") String namespace,
public void clearNamespaceBacklog(@Suspended final AsyncResponse asyncResponse, @PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateNamespaceName(tenant, namespace);
internalClearNamespaceBacklog(authoritative);
try {
validateNamespaceName(tenant, namespace);
internalClearNamespaceBacklog(asyncResponse, authoritative);
} catch (WebApplicationException wae) {
asyncResponse.resume(wae);
} catch (Exception e) {
asyncResponse.resume(new RestException(e));
}
}

@POST
Expand All @@ -568,11 +592,18 @@ public void clearNamespaceBundleBacklog(@PathParam("tenant") String tenant,
@ApiOperation(value = "Clear backlog for a given subscription on all topics on a namespace.")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Namespace does not exist") })
public void clearNamespaceBacklogForSubscription(@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace, @PathParam("subscription") String subscription,
public void clearNamespaceBacklogForSubscription(@Suspended final AsyncResponse asyncResponse,
@PathParam("tenant") String tenant, @PathParam("namespace") String namespace,
@PathParam("subscription") String subscription,
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateNamespaceName(tenant, namespace);
internalClearNamespaceBacklogForSubscription(subscription, authoritative);
try {
validateNamespaceName(tenant, namespace);
internalClearNamespaceBacklogForSubscription(asyncResponse, subscription, authoritative);
} catch (WebApplicationException wae) {
asyncResponse.resume(wae);
} catch (Exception e) {
asyncResponse.resume(new RestException(e));
}
}

@POST
Expand All @@ -593,11 +624,18 @@ public void clearNamespaceBundleBacklogForSubscription(@PathParam("tenant") Stri
@ApiOperation(value = "Unsubscribes the given subscription on all topics on a namespace.")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Namespace does not exist") })
public void unsubscribeNamespace(@PathParam("tenant") String tenant, @PathParam("cluster") String cluster,
@PathParam("namespace") String namespace, @PathParam("subscription") String subscription,
public void unsubscribeNamespace(@Suspended final AsyncResponse asyncResponse, @PathParam("tenant") String tenant,
@PathParam("cluster") String cluster, @PathParam("namespace") String namespace,
@PathParam("subscription") String subscription,
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateNamespaceName(tenant, namespace);
internalUnsubscribeNamespace(subscription, authoritative);
try {
validateNamespaceName(tenant, namespace);
internalUnsubscribeNamespace(asyncResponse, subscription, authoritative);
} catch (WebApplicationException wae) {
asyncResponse.resume(wae);
} catch (Exception e) {
asyncResponse.resume(new RestException(e));
}
}

@POST
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.container.Suspended;
import javax.ws.rs.core.MediaType;
Expand Down Expand Up @@ -240,6 +241,9 @@ public void getList(

// check cluster ownership for a given global namespace: redirect if peer-cluster owns it
validateGlobalNamespaceOwnership(namespaceName);
} catch (WebApplicationException wae) {
asyncResponse.resume(wae);
return;
} catch (Exception e) {
asyncResponse.resume(new RestException(e));
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.container.Suspended;
import javax.ws.rs.core.MediaType;
Expand Down Expand Up @@ -83,8 +84,10 @@ public void getList(
try {
validateNamespaceName(tenant, namespace);
asyncResponse.resume(internalGetList());
} catch (WebApplicationException wae) {
asyncResponse.resume(wae);
} catch (Exception e) {
asyncResponse.resume(e instanceof RestException ? e : new RestException(e));
asyncResponse.resume(new RestException(e));
}
}

Expand Down Expand Up @@ -482,6 +485,8 @@ public void getPartitionedStats(
try {
validatePartitionedTopicName(tenant, namespace, encodedTopic);
internalGetPartitionedStats(asyncResponse, authoritative, perPartition);
} catch (WebApplicationException wae) {
asyncResponse.resume(wae);
} catch (Exception e) {
asyncResponse.resume(new RestException(e));
}
Expand Down Expand Up @@ -510,6 +515,8 @@ public void getPartitionedStatsInternal(
try {
validateTopicName(tenant, namespace, encodedTopic);
internalGetPartitionedStatsInternal(asyncResponse, authoritative);
} catch (WebApplicationException wae) {
asyncResponse.resume(wae);
} catch (Exception e) {
asyncResponse.resume(new RestException(e));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.testng.Assert.assertEquals;
Expand All @@ -47,6 +48,7 @@
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status;
import javax.ws.rs.core.StreamingOutput;
import javax.ws.rs.core.UriInfo;
Expand Down Expand Up @@ -84,6 +86,7 @@
import org.apache.zookeeper.KeeperException.Code;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooDefs.Ids;
import org.mockito.ArgumentCaptor;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
Expand Down Expand Up @@ -505,8 +508,11 @@ void properties() throws Exception {
properties.createTenant("tenant-config-is-null", null);
assertEquals(properties.getTenantAdmin("tenant-config-is-null"), nullTenantInfo);


namespaces.deleteNamespace("my-tenant", "use", "my-namespace", false);
AsyncResponse response = mock(AsyncResponse.class);
namespaces.deleteNamespace(response, "my-tenant", "use", "my-namespace", false);
ArgumentCaptor<Response> captor = ArgumentCaptor.forClass(Response.class);
verify(response, timeout(5000).times(1)).resume(captor.capture());
assertEquals(captor.getValue().getStatus(), Status.OK.getStatusCode());
properties.deleteTenant("my-tenant");
properties.deleteTenant("tenant-config-is-null");
}
Expand Down
Loading

0 comments on commit 2cc34af

Please sign in to comment.