Skip to content

Commit

Permalink
Reducing duplication, remove unnecessary rebalance id filter
Browse files Browse the repository at this point in the history
Signed-off-by: Michael Edgar <[email protected]>
  • Loading branch information
MikeEdgar committed Sep 20, 2024
1 parent 212af1b commit 1658a7c
Show file tree
Hide file tree
Showing 5 changed files with 91 additions and 140 deletions.
Original file line number Diff line number Diff line change
@@ -1,16 +1,11 @@
package com.github.streamshub.console.api.model;

import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;

import jakarta.json.Json;
import jakarta.json.JsonObject;
import jakarta.json.JsonObjectBuilder;
import jakarta.json.JsonValue;

import org.eclipse.microprofile.openapi.annotations.media.Schema;

Expand All @@ -24,7 +19,7 @@
import static java.util.Comparator.nullsLast;

@Schema(name = "KafkaCluster")
public class KafkaCluster extends Resource<KafkaCluster.Attributes> {
public class KafkaCluster extends Resource<KafkaCluster.Attributes> implements PaginatedKubeResource {

public static class Fields {
public static final String NAME = "name";
Expand Down Expand Up @@ -166,58 +161,35 @@ public KafkaCluster(String id, List<Node> nodes, Node controller, List<String> a
* of Topic fields used to compare entities for pagination/sorting.
*/
public static KafkaCluster fromCursor(JsonObject cursor) {
if (cursor == null) {
return null;
}

KafkaCluster cluster = new KafkaCluster(cursor.getString("id"), null, null, null);
JsonObject attr = cursor.getJsonObject("attributes");
cluster.name(attr.getString(Fields.NAME, null));
cluster.namespace(attr.getString(Fields.NAMESPACE, null));
cluster.creationTimestamp(attr.getString(Fields.CREATION_TIMESTAMP, null));

return cluster;
}

public String toCursor(List<String> sortFields) {
JsonObjectBuilder cursor = Json.createObjectBuilder()
.add("id", id == null ? Json.createValue("") : Json.createValue(id));

JsonObjectBuilder attrBuilder = Json.createObjectBuilder();
maybeAddAttribute(attrBuilder, sortFields, Fields.NAME, attributes.name);
maybeAddAttribute(attrBuilder, sortFields, Fields.NAMESPACE, attributes.namespace);
maybeAddAttribute(attrBuilder, sortFields, Fields.CREATION_TIMESTAMP, attributes.creationTimestamp);
cursor.add("attributes", attrBuilder.build());

return Base64.getUrlEncoder().encodeToString(cursor.build().toString().getBytes(StandardCharsets.UTF_8));
}

static void maybeAddAttribute(JsonObjectBuilder attrBuilder, List<String> sortFields, String key, String value) {
if (sortFields.contains(key)) {
attrBuilder.add(key, value != null ? Json.createValue(value) : JsonValue.NULL);
}
return PaginatedKubeResource.fromCursor(cursor, id -> new KafkaCluster(id, null, null, null));
}

@Override
public String name() {
return attributes.name;
}

@Override
public void name(String name) {
attributes.name = name;
}

@Override
public String namespace() {
return attributes.namespace;
}

@Override
public void namespace(String namespace) {
attributes.namespace = namespace;
}

@Override
public String creationTimestamp() {
return attributes.creationTimestamp;
}

@Override
public void creationTimestamp(String creationTimestamp) {
attributes.creationTimestamp = creationTimestamp;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,26 +1,21 @@
package com.github.streamshub.console.api.model;

import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import jakarta.json.Json;
import jakarta.json.JsonObject;
import jakarta.json.JsonObjectBuilder;
import jakarta.json.JsonValue;

import org.eclipse.microprofile.openapi.annotations.media.Schema;
import org.eclipse.microprofile.openapi.annotations.media.SchemaProperty;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonFilter;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonInclude.Include;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.github.streamshub.console.api.support.ComparatorBuilder;
import com.github.streamshub.console.api.support.ErrorCategory;
import com.github.streamshub.console.api.support.ListRequestContext;
Expand Down Expand Up @@ -48,7 +43,7 @@
message = "resource type conflicts with operation",
node = "type",
payload = ErrorCategory.ResourceConflict.class)
public class KafkaRebalance extends Resource<KafkaRebalance.Attributes> {
public class KafkaRebalance extends Resource<KafkaRebalance.Attributes> implements PaginatedKubeResource {

public static final String API_TYPE = "kafkaRebalances";
public static final String FIELDS_PARAM = "fields[" + API_TYPE + "]";
Expand Down Expand Up @@ -253,62 +248,39 @@ public KafkaRebalance(String id, String type, KafkaRebalance.Attributes attribut
* of KafkaRebalance fields used to compare entities for pagination/sorting.
*/
public static KafkaRebalance fromCursor(JsonObject cursor) {
if (cursor == null) {
return null;
}

KafkaRebalance rebalance = new KafkaRebalance(cursor.getString("id"));
JsonObject attr = cursor.getJsonObject("attributes");
rebalance.name(attr.getString(Fields.NAME, null));
rebalance.namespace(attr.getString(Fields.NAMESPACE, null));
rebalance.creationTimestamp(attr.getString(Fields.CREATION_TIMESTAMP, null));

return rebalance;
}

public String toCursor(List<String> sortFields) {
JsonObjectBuilder cursor = Json.createObjectBuilder()
.add("id", id == null ? Json.createValue("") : Json.createValue(id));

JsonObjectBuilder attrBuilder = Json.createObjectBuilder();
maybeAddAttribute(attrBuilder, sortFields, Fields.NAME, attributes.name);
maybeAddAttribute(attrBuilder, sortFields, Fields.NAMESPACE, attributes.namespace);
maybeAddAttribute(attrBuilder, sortFields, Fields.CREATION_TIMESTAMP, attributes.creationTimestamp);
cursor.add("attributes", attrBuilder.build());

return Base64.getUrlEncoder().encodeToString(cursor.build().toString().getBytes(StandardCharsets.UTF_8));
}

static void maybeAddAttribute(JsonObjectBuilder attrBuilder, List<String> sortFields, String key, String value) {
if (sortFields.contains(key)) {
attrBuilder.add(key, value != null ? Json.createValue(value) : JsonValue.NULL);
}
return PaginatedKubeResource.fromCursor(cursor, KafkaRebalance::new);
}

public String action() {
return ((Meta) super.getMeta()).action();
}

@Override
public String name() {
return attributes.name;
}

@Override
public void name(String name) {
attributes.name = name;
}

@Override
public String namespace() {
return attributes.namespace;
}

@Override
public void namespace(String namespace) {
attributes.namespace = namespace;
}

@Override
public String creationTimestamp() {
return attributes.creationTimestamp;
}

@Override
public void creationTimestamp(String creationTimestamp) {
attributes.creationTimestamp = creationTimestamp;
}
Expand All @@ -333,82 +305,42 @@ public void mode(String mode) {
attributes.mode = mode;
}

public List<Integer> brokers() {
return attributes.brokers;
}

public void brokers(List<Integer> brokers) {
attributes.brokers = brokers;
}

public List<String> goals() {
return attributes.goals;
}

public void goals(List<String> goals) {
attributes.goals = goals;
}

public boolean skipHardGoalCheck() {
return attributes.skipHardGoalCheck;
}

public void skipHardGoalCheck(boolean skipHardGoalCheck) {
attributes.skipHardGoalCheck = skipHardGoalCheck;
}

public boolean rebalanceDisk() {
return attributes.rebalanceDisk;
}

public void rebalanceDisk(boolean rebalanceDisk) {
attributes.rebalanceDisk = rebalanceDisk;
}

public String excludedTopics() {
return attributes.excludedTopics;
}

public void excludedTopics(String excludedTopics) {
attributes.excludedTopics = excludedTopics;
}

public int concurrentPartitionMovementsPerBroker() {
return attributes.concurrentPartitionMovementsPerBroker;
}

public void concurrentPartitionMovementsPerBroker(int concurrentPartitionMovementsPerBroker) {
attributes.concurrentPartitionMovementsPerBroker = concurrentPartitionMovementsPerBroker;
}

public int concurrentIntraBrokerPartitionMovements() {
return attributes.concurrentIntraBrokerPartitionMovements;
}

public void concurrentIntraBrokerPartitionMovements(int concurrentIntraBrokerPartitionMovements) {
attributes.concurrentIntraBrokerPartitionMovements = concurrentIntraBrokerPartitionMovements;
}

public int concurrentLeaderMovements() {
return attributes.concurrentLeaderMovements;
}

public void concurrentLeaderMovements(int concurrentLeaderMovements) {
attributes.concurrentLeaderMovements = concurrentLeaderMovements;
}

public long replicationThrottle() {
return attributes.replicationThrottle;
}

public void replicationThrottle(long replicationThrottle) {
attributes.replicationThrottle = replicationThrottle;
}

public List<String> replicaMovementStrategies() {
return attributes.replicaMovementStrategies;
}

public void replicaMovementStrategies(List<String> replicaMovementStrategies) {
attributes.replicaMovementStrategies = replicaMovementStrategies;
}
Expand All @@ -417,10 +349,6 @@ public Map<String, Object> optimizationResult() {
return attributes.optimizationResult;
}

public List<Condition> conditions() {
return attributes.conditions;
}

public void conditions(List<Condition> conditions) {
attributes.conditions = conditions;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,25 +17,6 @@

public class KafkaRebalanceFilterParams {

@QueryParam("filter[id]")
@Parameter(
description = "Retrieve only rebalances with an ID matching this parameter",
schema = @Schema(implementation = String[].class, minItems = 2),
explode = Explode.FALSE)
@Expression(
when = "self != null",
value = "self.operator == 'eq' || self.operator == 'in'",
message = "unsupported filter operator, supported values: [ 'eq', 'in' ]",
payload = ErrorCategory.InvalidQueryParameter.class,
node = "filter[id]")
@Expression(
when = "self != null",
value = "self.operands.size() >= 1",
message = "at least 1 operand is required",
payload = ErrorCategory.InvalidQueryParameter.class,
node = "filter[id]")
FetchFilter idFilter;

@QueryParam("filter[name]")
@Parameter(
description = "Retrieve only rebalances with a name matching this parameter",
Expand Down Expand Up @@ -100,10 +81,6 @@ public List<Predicate<KafkaRebalance>> buildPredicates() {
predicates.add(new FetchFilterPredicate<>(nameFilter, KafkaRebalance::name));
}

if (idFilter != null) {
predicates.add(new FetchFilterPredicate<>(idFilter, KafkaRebalance::getId));
}

if (statusFilter != null) {
predicates.add(new FetchFilterPredicate<>(statusFilter, KafkaRebalance::status));
}
Expand Down
Loading

0 comments on commit 1658a7c

Please sign in to comment.