[API] Add endpoints to list KafkaRebalances and patch action annotation (#1060)

* Add endpoints to list KafkaRebalances and patch action annotation
* Add meta model, return rebalance conditions
* Add rebalance ITs, use latest Strimzi Kafka image for tests
* Reduce duplication, remove unnecessary rebalance id filter
* Do not suppress state conditions from response array
* Add sessionId and autoApproval to KafkaRebalance

---------

Signed-off-by: Michael Edgar <[email protected]>
MikeEdgar authored Oct 1, 2024
1 parent 73c9b56 commit 3454a0a
Showing 23 changed files with 1,425 additions and 72 deletions.
4 changes: 2 additions & 2 deletions api/pom.xml
@@ -21,7 +21,7 @@

<!-- System test image dependencies -->
<keycloak.image>quay.io/keycloak/keycloak:21.1</keycloak.image>
-<strimzi-kafka.tag>quay.io/strimzi/kafka:0.41.0-kafka-3.7.0</strimzi-kafka.tag>
+<strimzi-kafka.tag>quay.io/strimzi/kafka:0.43.0-kafka-3.8.0</strimzi-kafka.tag>
</properties>

<dependencies>
@@ -357,7 +357,7 @@
<redirectTestOutputToFile>true</redirectTestOutputToFile>
<systemProperties>
<keycloak.image>${keycloak.image}</keycloak.image>
-<strimzi-kafka.tag>${strimzi-kafka.tag}</strimzi-kafka.tag>
+<strimzi.test-container.kafka.custom.image>${strimzi-kafka.tag}</strimzi.test-container.kafka.custom.image>
<java.util.logging.manager>org.jboss.logmanager.LogManager</java.util.logging.manager>
<maven.home>${maven.home}</maven.home>
<quarkus.jacoco.reuse-data-file>true</quarkus.jacoco.reuse-data-file>
@@ -73,6 +73,7 @@ public Response listClusters(
KafkaCluster.Fields.STATUS,
KafkaCluster.Fields.CONDITIONS,
KafkaCluster.Fields.NODE_POOLS,
+KafkaCluster.Fields.CRUISE_CONTROL_ENABLED,
},
message = "list contains a value that is not valid or not available for the operation",
payload = ErrorCategory.InvalidQueryParameter.class)
@@ -92,6 +93,7 @@
KafkaCluster.Fields.STATUS,
KafkaCluster.Fields.CONDITIONS,
KafkaCluster.Fields.NODE_POOLS,
+KafkaCluster.Fields.CRUISE_CONTROL_ENABLED,
}))
List<String> fields,

@@ -137,6 +139,7 @@ public CompletionStage<Response> describeCluster(
KafkaCluster.Fields.STATUS,
KafkaCluster.Fields.CONDITIONS,
KafkaCluster.Fields.NODE_POOLS,
+KafkaCluster.Fields.CRUISE_CONTROL_ENABLED,
},
payload = ErrorCategory.InvalidQueryParameter.class)
@Parameter(
@@ -159,6 +162,7 @@ public CompletionStage<Response> describeCluster(
KafkaCluster.Fields.STATUS,
KafkaCluster.Fields.CONDITIONS,
KafkaCluster.Fields.NODE_POOLS,
+KafkaCluster.Fields.CRUISE_CONTROL_ENABLED,
}))
List<String> fields) {

KafkaRebalancesResource.java (new file)
@@ -0,0 +1,198 @@
package com.github.streamshub.console.api;

import java.util.List;
import java.util.function.Consumer;

import jakarta.inject.Inject;
import jakarta.inject.Named;
import jakarta.validation.ConstraintTarget;
import jakarta.validation.Valid;
import jakarta.ws.rs.BeanParam;
import jakarta.ws.rs.Consumes;
import jakarta.ws.rs.DefaultValue;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.PATCH;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.PathParam;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.QueryParam;
import jakarta.ws.rs.core.MediaType;
import jakarta.ws.rs.core.Response;
import jakarta.ws.rs.core.UriInfo;

import org.eclipse.microprofile.openapi.annotations.enums.Explode;
import org.eclipse.microprofile.openapi.annotations.enums.SchemaType;
import org.eclipse.microprofile.openapi.annotations.media.Schema;
import org.eclipse.microprofile.openapi.annotations.parameters.Parameter;
import org.eclipse.microprofile.openapi.annotations.responses.APIResponse;
import org.eclipse.microprofile.openapi.annotations.responses.APIResponseSchema;
import org.eclipse.microprofile.openapi.annotations.tags.Tag;

import com.github.streamshub.console.api.model.KafkaRebalance;
import com.github.streamshub.console.api.model.KafkaRebalanceFilterParams;
import com.github.streamshub.console.api.model.ListFetchParams;
import com.github.streamshub.console.api.service.KafkaRebalanceService;
import com.github.streamshub.console.api.support.ErrorCategory;
import com.github.streamshub.console.api.support.FieldFilter;
import com.github.streamshub.console.api.support.ListRequestContext;
import com.github.streamshub.console.api.support.StringEnumeration;

import io.xlate.validation.constraints.Expression;

@Path("/api/kafkas/{clusterId}/rebalances")
@Tag(name = "Kafka Cluster Resources")
public class KafkaRebalancesResource {

@Inject
UriInfo uriInfo;

@Inject
KafkaRebalanceService rebalanceService;

/**
* Allows the value of {@link FieldFilter#requestedFields} to be set for
* the request.
*/
@Inject
@Named("requestedFields")
Consumer<List<String>> requestedFields;

@GET
@Produces(MediaType.APPLICATION_JSON)
@APIResponseSchema(KafkaRebalance.RebalanceDataList.class)
@APIResponse(responseCode = "500", ref = "ServerError")
@APIResponse(responseCode = "504", ref = "ServerTimeout")
public Response listRebalances(
@Parameter(description = "Cluster identifier")
@PathParam("clusterId")
String clusterId,

@QueryParam(KafkaRebalance.FIELDS_PARAM)
@DefaultValue(KafkaRebalance.Fields.LIST_DEFAULT)
@StringEnumeration(
source = KafkaRebalance.FIELDS_PARAM,
allowedValues = {
KafkaRebalance.Fields.NAME,
KafkaRebalance.Fields.NAMESPACE,
KafkaRebalance.Fields.CREATION_TIMESTAMP,
KafkaRebalance.Fields.STATUS,
KafkaRebalance.Fields.MODE,
KafkaRebalance.Fields.BROKERS,
KafkaRebalance.Fields.GOALS,
KafkaRebalance.Fields.SKIP_HARD_GOAL_CHECK,
KafkaRebalance.Fields.REBALANCE_DISK,
KafkaRebalance.Fields.EXCLUDED_TOPICS,
KafkaRebalance.Fields.CONCURRENT_PARTITION_MOVEMENTS_PER_BROKER,
KafkaRebalance.Fields.CONCURRENT_INTRABROKER_PARTITION_MOVEMENTS,
KafkaRebalance.Fields.CONCURRENT_LEADER_MOVEMENTS,
KafkaRebalance.Fields.REPLICATION_THROTTLE,
KafkaRebalance.Fields.REPLICA_MOVEMENT_STRATEGIES,
KafkaRebalance.Fields.SESSION_ID,
KafkaRebalance.Fields.OPTIMIZATION_RESULT,
KafkaRebalance.Fields.CONDITIONS,
},
payload = ErrorCategory.InvalidQueryParameter.class)
@Parameter(
description = FieldFilter.FIELDS_DESCR,
explode = Explode.FALSE,
schema = @Schema(
type = SchemaType.ARRAY,
implementation = String.class,
enumeration = {
KafkaRebalance.Fields.NAME,
KafkaRebalance.Fields.NAMESPACE,
KafkaRebalance.Fields.CREATION_TIMESTAMP,
KafkaRebalance.Fields.STATUS,
KafkaRebalance.Fields.MODE,
KafkaRebalance.Fields.BROKERS,
KafkaRebalance.Fields.GOALS,
KafkaRebalance.Fields.SKIP_HARD_GOAL_CHECK,
KafkaRebalance.Fields.REBALANCE_DISK,
KafkaRebalance.Fields.EXCLUDED_TOPICS,
KafkaRebalance.Fields.CONCURRENT_PARTITION_MOVEMENTS_PER_BROKER,
KafkaRebalance.Fields.CONCURRENT_INTRABROKER_PARTITION_MOVEMENTS,
KafkaRebalance.Fields.CONCURRENT_LEADER_MOVEMENTS,
KafkaRebalance.Fields.REPLICATION_THROTTLE,
KafkaRebalance.Fields.REPLICA_MOVEMENT_STRATEGIES,
KafkaRebalance.Fields.SESSION_ID,
KafkaRebalance.Fields.OPTIMIZATION_RESULT,
KafkaRebalance.Fields.CONDITIONS,
}))
List<String> fields,

@BeanParam
@Valid
ListFetchParams listParams,

@BeanParam
@Valid
KafkaRebalanceFilterParams filters) {

requestedFields.accept(fields);

ListRequestContext<KafkaRebalance> listSupport = new ListRequestContext<>(
filters.buildPredicates(),
KafkaRebalance.Fields.COMPARATOR_BUILDER,
uriInfo.getRequestUri(),
listParams,
KafkaRebalance::fromCursor);

var rebalanceList = rebalanceService.listRebalances(listSupport);
var responseEntity = new KafkaRebalance.RebalanceDataList(rebalanceList, listSupport);

return Response.ok(responseEntity).build();
}

@Path("{rebalanceId}")
@PATCH
@Consumes(MediaType.APPLICATION_JSON)
@Produces(MediaType.APPLICATION_JSON)
@APIResponseSchema(responseCode = "200", value = KafkaRebalance.RebalanceData.class)
@Expression(
targetName = "args",
// Only check when the request body Id is present (separately checked for @NotNull)
when = "args[2].data.id != null",
// Verify the Id in the request body matches the Id in the URL
value = "args[1].equals(args[2].data.id)",
message = "resource ID conflicts with operation URL",
node = { "data", "id" },
payload = ErrorCategory.InvalidResource.class,
validationAppliesTo = ConstraintTarget.PARAMETERS)
public Response patchRebalance(
@Parameter(description = "Cluster identifier")
@PathParam("clusterId")
String clusterId,

@PathParam("rebalanceId")
@Parameter(description = "Rebalance identifier")
String rebalanceId,

@Valid
KafkaRebalance.RebalanceData rebalance) {

requestedFields.accept(List.of(
KafkaRebalance.Fields.NAME,
KafkaRebalance.Fields.NAMESPACE,
KafkaRebalance.Fields.CREATION_TIMESTAMP,
KafkaRebalance.Fields.STATUS,
KafkaRebalance.Fields.MODE,
KafkaRebalance.Fields.BROKERS,
KafkaRebalance.Fields.GOALS,
KafkaRebalance.Fields.SKIP_HARD_GOAL_CHECK,
KafkaRebalance.Fields.REBALANCE_DISK,
KafkaRebalance.Fields.EXCLUDED_TOPICS,
KafkaRebalance.Fields.CONCURRENT_PARTITION_MOVEMENTS_PER_BROKER,
KafkaRebalance.Fields.CONCURRENT_INTRABROKER_PARTITION_MOVEMENTS,
KafkaRebalance.Fields.CONCURRENT_LEADER_MOVEMENTS,
KafkaRebalance.Fields.REPLICATION_THROTTLE,
KafkaRebalance.Fields.REPLICA_MOVEMENT_STRATEGIES,
KafkaRebalance.Fields.SESSION_ID,
KafkaRebalance.Fields.OPTIMIZATION_RESULT));

var result = rebalanceService.patchRebalance(rebalanceId, rebalance.getData());
var responseEntity = new KafkaRebalance.RebalanceData(result);

return Response.ok(responseEntity).build();
}

}
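For reference, a minimal sketch of exercising the two new endpoints with the standard JAX-RS client API. The base URL, cluster and rebalance identifiers, the fields[kafkaRebalances] query parameter name, the lowercase field-name strings, the resource type value, and the "approve" action in the PATCH body's meta are assumptions for illustration; the paths, media types, and the rule that data.id must match the URL's rebalance ID follow from the resource class above.

import jakarta.ws.rs.client.Client;
import jakarta.ws.rs.client.ClientBuilder;
import jakarta.ws.rs.client.Entity;
import jakarta.ws.rs.core.MediaType;
import jakarta.ws.rs.core.Response;

public class RebalanceClientSketch {

    public static void main(String[] args) {
        Client client = ClientBuilder.newClient();
        // Hypothetical base URL and cluster identifier
        String base = "http://localhost:8080/api/kafkas/example-cluster-id";

        // GET /api/kafkas/{clusterId}/rebalances, requesting a subset of fields
        Response list = client.target(base + "/rebalances")
                .queryParam("fields[kafkaRebalances]", "name,status,mode,conditions")
                .request(MediaType.APPLICATION_JSON)
                .get();
        System.out.println(list.readEntity(String.class));

        // PATCH /api/kafkas/{clusterId}/rebalances/{rebalanceId}; data.id must
        // match the rebalanceId in the URL, per the @Expression constraint.
        String rebalanceId = "example-rebalance-id"; // hypothetical
        String patchBody = """
                {
                  "data": {
                    "type": "kafkaRebalances",
                    "id": "%s",
                    "meta": { "action": "approve" }
                  }
                }
                """.formatted(rebalanceId);
        Response patched = client.target(base + "/rebalances/" + rebalanceId)
                .request(MediaType.APPLICATION_JSON)
                .method("PATCH", Entity.entity(patchBody, MediaType.APPLICATION_JSON));
        System.out.println(patched.getStatus());

        client.close();
    }
}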
UnwrappingExceptionHandler.java
@@ -3,6 +3,7 @@
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
+import java.util.Optional;

import jakarta.enterprise.inject.Instance;
import jakarta.inject.Inject;
@@ -35,7 +36,7 @@ abstract class UnwrappingExceptionHandler<T extends Throwable> implements Except

@Override
public Response toResponse(T exception) {
-Throwable cause = exception.getCause();
+Throwable cause = Optional.ofNullable(exception.getCause()).orElse(exception);
List<Throwable> suppressed = Arrays.asList(exception.getSuppressed());

if (suppressed.isEmpty()) {
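The effect of the one-line change above: when a wrapper exception carries no cause, the handler now maps from the exception itself instead of proceeding with a null cause. A minimal sketch of the fallback:

import java.util.Optional;

public class CauseFallbackSketch {
    public static void main(String[] args) {
        // Wrapper exception with no cause set
        Exception wrapper = new RuntimeException("boom");

        // getCause() returns null here, so the Optional falls back to the wrapper
        Throwable cause = Optional.ofNullable(wrapper.getCause()).orElse(wrapper);
        System.out.println(cause.getMessage()); // prints "boom"
    }
}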
NotAllowedExceptionHandler.java (new file)
@@ -0,0 +1,21 @@
package com.github.streamshub.console.api.errors.client;

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.ws.rs.NotAllowedException;
import jakarta.ws.rs.ext.Provider;

import com.github.streamshub.console.api.support.ErrorCategory;

@Provider
@ApplicationScoped
public class NotAllowedExceptionHandler extends AbstractClientExceptionHandler<NotAllowedException> {

public NotAllowedExceptionHandler() {
super(ErrorCategory.MethodNotAllowed.class, "HTTP method not allowed for this resource", (String) null);
}

@Override
public boolean handlesException(Throwable thrown) {
return thrown instanceof NotAllowedException;
}
}
Error.java
@@ -12,8 +12,10 @@
@JsonInclude(value = Include.NON_NULL)
public class Error {

@Schema(description = "A meta object containing non-standard meta-information about the error")
Map<String, Object> meta;
@Schema(
description = "A meta object containing non-standard meta-information about the error",
implementation = JsonApiMeta.class)
JsonApiMeta meta;

@Schema(description = """
a links object that MAY contain the following members:
@@ -58,15 +60,12 @@ public Error(String title, String detail, Throwable cause) {
this.cause = cause;
}

-public Map<String, Object> getMeta() {
+public JsonApiMeta getMeta() {
return meta;
}

public Error addMeta(String key, Object value) {
-if (meta == null) {
-meta = new LinkedHashMap<>();
-}
-meta.put(key, value);
+meta = JsonApiMeta.put(meta, key, value);
return this;
}

JsonApiDocument.java
@@ -15,7 +15,7 @@
@JsonInclude(value = Include.NON_NULL)
public abstract class JsonApiDocument {

-private Map<String, Object> meta;
+private JsonApiMeta meta;
private Map<String, String> links;

static <K, V> Map<K, V> addEntry(Map<K, V> map, K key, V value) {
@@ -27,7 +27,7 @@ static <K, V> Map<K, V> addEntry(Map<K, V> map, K key, V value) {
}

@JsonProperty
-public Map<String, Object> meta() {
+public JsonApiMeta meta() {
return meta;
}

@@ -36,7 +36,7 @@ public Object meta(String key) {
}

public JsonApiDocument addMeta(String key, Object value) {
-meta = addEntry(meta, key, value);
+meta = JsonApiMeta.put(meta, key, value);
return this;
}

JsonApiMeta.java (new file)
@@ -0,0 +1,44 @@
package com.github.streamshub.console.api.model;

import java.util.LinkedHashMap;
import java.util.Map;

import org.eclipse.microprofile.openapi.annotations.media.Schema;

import com.fasterxml.jackson.annotation.JsonAnyGetter;
import com.fasterxml.jackson.annotation.JsonAnySetter;
import com.fasterxml.jackson.annotation.JsonIgnore;

@Schema(additionalProperties = Object.class)
public class JsonApiMeta {

public static JsonApiMeta put(JsonApiMeta meta, String key, Object value) {
if (meta == null) {
meta = new JsonApiMeta();
}
meta.put(key, value);
return meta;
}

@JsonIgnore
private Map<String, Object> meta;

@JsonAnyGetter
public Map<String, Object> get() {
return meta;
}

public Object get(String key) {
return meta != null ? meta.get(key) : null;
}

@JsonAnySetter
public JsonApiMeta put(String key, Object value) {
if (meta == null) {
meta = new LinkedHashMap<>();
}
meta.put(key, value);
return this;
}

}
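A small sketch of how the new meta type serializes: because get() is annotated with @JsonAnyGetter, Jackson flattens the backing map into the enclosing JSON object, so a JsonApiMeta value renders as a plain object of arbitrary key/value pairs. The ObjectMapper usage below is illustrative:

import com.fasterxml.jackson.databind.ObjectMapper;

public class JsonApiMetaSketch {
    public static void main(String[] args) throws Exception {
        JsonApiMeta meta = JsonApiMeta.put(null, "page", 1);
        meta.put("managed", true);

        // The backing map is flattened by @JsonAnyGetter, printing:
        // {"page":1,"managed":true}
        System.out.println(new ObjectMapper().writeValueAsString(meta));
    }
}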