Skip to content

Commit

Permalink
#58 Topics: remove topics that aren't in the spec.
Browse files Browse the repository at this point in the history
note on 'Provision' command updated flags:
- use -dry or --dry-run to check action (inc cleaning)
- use -clean or --clean-unspecified to clean resources
  • Loading branch information
bluemonk3y committed Sep 30, 2023
1 parent 718413c commit 04bde95
Show file tree
Hide file tree
Showing 13 changed files with 251 additions and 69 deletions.
20 changes: 15 additions & 5 deletions cli/src/main/java/io/specmesh/cli/Provision.java
Original file line number Diff line number Diff line change
Expand Up @@ -90,13 +90,22 @@ public static void main(final String[] args) {
@Option(
names = {"-dry", "--dry-run"},
description =
"Compares the cluster against the spec, outputting proposed changes if"
+ " compatible.If the spec incompatible with the cluster (not sure how it"
+ " could be) then will fail with a descriptive error message.A return"
+ " value of 0=indicates no changes needed; 1=changes needed; -1=not"
+ " compatible, blah blah")
"Compares the cluster resources against the spec, outputting proposed changes"
+ " if compatible. If the spec incompatible with the cluster (not sure how"
+ " it could be) then will fail with a descriptive error message. A return"
+ " value of '0' = indicates no changes needed; '1' = changes needed; '-1'"
+ " not compatible")
private boolean dryRun;

@Option(
names = {"-clean", "--clean-unspecified"},
description =
"Compares the cluster resources against the spec, outputting proposed set of"
+ " resources that are unexpected (not specified). Use with '-dry-run' for"
+ " non-destructive checks. This operation will not create resources, it"
+ " will only remove unspecified resources")
private boolean cleanUnspecified;

@Option(
names = "-D",
mapFallbackValue = "",
Expand All @@ -113,6 +122,7 @@ public Integer call() throws Exception {
final var status =
Provisioner.provision(
dryRun,
cleanUnspecified,
specMeshSpec(),
schemaPath,
Clients.adminClient(brokerUrl, username, secret),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ class StorageConsumptionFunctionalTest {
void shouldGetStorageAndConsumptionMetrics() throws Exception {

Provisioner.provision(
false,
false,
API_SPEC,
"./build/resources/test",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ public static void provision() {
final SchemaRegistryClient schemaRegistryClient =
new CachedSchemaRegistryClient(KAFKA_ENV.schemeRegistryServer(), 5);
Provisioner.provision(
false,
false,
API_SPEC,
"./build/resources/test",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ private Provisioner() {}
* Provision Topics, ACLS and schemas
*
* @param dryRun test or execute
* @param cleanUnspecified cleanup
* @param apiSpec given spec
* @param schemaResources schema path
* @param adminClient kafka admin client
Expand All @@ -41,6 +42,7 @@ private Provisioner() {}
*/
public static Status provision(
final boolean dryRun,
final boolean cleanUnspecified,
final KafkaApiSpec apiSpec,
final String schemaResources,
final Admin adminClient,
Expand All @@ -49,7 +51,10 @@ public static Status provision(
apiSpec.apiSpec().validate();

final var status =
Status.builder().topics(TopicProvisioner.provision(dryRun, apiSpec, adminClient));
Status.builder()
.topics(
TopicProvisioner.provision(
dryRun, cleanUnspecified, apiSpec, adminClient));
schemaRegistryClient.ifPresent(
registryClient ->
status.schemas(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,22 @@ public Collection<Topic> calculate(
.collect(Collectors.toList());
}
}
/** Returns those topics to create and ignores existing topics */
public static final class UnspecifiedCalculator implements ChangeSetCalculator {

/**
* Calculate changes of topics that are unspecified
*
* @param existingTopics - existing
* @param requiredTopics - set of topics that should exist
* @return list of new topics with 'CREATE' flag
*/
public Collection<Topic> calculate(
final Collection<Topic> existingTopics, final Collection<Topic> requiredTopics) {
existingTopics.removeAll(requiredTopics);
return existingTopics;
}
}
/** Main API */
interface ChangeSetCalculator {
/**
Expand Down Expand Up @@ -182,10 +198,15 @@ public static ChangeSetBuilder builder() {
/**
* build it
*
* @param cleanUnspecified - to remove unspecified resources
* @return required calculator
*/
public ChangeSetCalculator build() {
return new CollectiveCalculator(new CreateCalculator(), new UpdateCalculator());
public ChangeSetCalculator build(final boolean cleanUnspecified) {
if (cleanUnspecified) {
return new UnspecifiedCalculator();
} else {
return new CollectiveCalculator(new CreateCalculator(), new UpdateCalculator());
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,19 +42,19 @@
import org.apache.kafka.common.config.ConfigResource;

/** Write topics using provided input set */
public class TopicWriters {
public class TopicMutators {

/** Collection based */
public static final class CollectiveWriter implements TopicWriter {
/** collection based */
public static final class CollectiveMutator implements TopicMutator {

private final Stream<TopicWriter> writers;
private final Stream<TopicMutator> writers;

/**
* iterate over the writers
*
* @param writers to iterate
*/
private CollectiveWriter(final TopicWriter... writers) {
private CollectiveMutator(final TopicMutator... writers) {
this.writers = Arrays.stream(writers);
}

Expand All @@ -65,16 +65,16 @@ private CollectiveWriter(final TopicWriter... writers) {
* @return updated status
*/
@Override
public Collection<Topic> write(final Collection<Topic> topics) {
public Collection<Topic> mutate(final Collection<Topic> topics) {
return this.writers
.map(writer -> writer.write(topics))
.map(writer -> writer.mutate(topics))
.flatMap(Collection::stream)
.collect(Collectors.toList());
}
}

/** only handles update requests */
public static final class UpdateWriter implements TopicWriter {
/** updates */
public static final class UpdateMutator implements TopicMutator {

private final Admin adminClient;

Expand All @@ -83,7 +83,7 @@ public static final class UpdateWriter implements TopicWriter {
*
* @param adminClient - cluster connection
*/
UpdateWriter(final Admin adminClient) {
UpdateMutator(final Admin adminClient) {
this.adminClient = adminClient;
}

Expand All @@ -94,7 +94,7 @@ public static final class UpdateWriter implements TopicWriter {
* @return topics with updated flag
* @throws ProvisioningException when things break
*/
public Collection<Topic> write(final Collection<Topic> topics)
public Collection<Topic> mutate(final Collection<Topic> topics)
throws ProvisioningException {

final var topicsToUpdate =
Expand Down Expand Up @@ -200,17 +200,77 @@ private void updatePartitions(final Topic topic, final TopicDescription descript
}
}

/** creates the topic */
public static final class CreateWriter implements TopicWriter {
/** delete non-spec resources */
public static final class CleanUnspecifiedMutator implements TopicMutator {

private final boolean dryRun;
private final Admin adminClient;

/**
* Needs the admin client
*
* @param dryRun - test or execute flag
* @param adminClient - cluster connection
*/
private CreateWriter(final Admin adminClient) {
CleanUnspecifiedMutator(final boolean dryRun, final Admin adminClient) {
this.dryRun = dryRun;
this.adminClient = adminClient;
}

/**
* Remove topics that are not CREATE or UPSDATE (i.e. not in the spec)
*
* @param topics to write
* @return topics with updated flag
* @throws ProvisioningException when things break
*/
public Collection<Topic> mutate(final Collection<Topic> topics)
throws ProvisioningException {

// spec topics will have CREATE or UPDATE status - remove the others (unwanted)
final var unwanted =
topics.stream()
.filter(
topic ->
!topic.state().equals(STATE.CREATE)
&& !topic.state().equals(STATE.UPDATE))
.collect(Collectors.toList());

try {
if (!dryRun) {
adminClient
.deleteTopics(toTopicNames(unwanted))
.all()
.get(Provisioner.REQUEST_TIMEOUT, TimeUnit.SECONDS);
}
} catch (InterruptedException | ExecutionException | TimeoutException ex) {
throw new ProvisioningException("Failed to cleanup unwanted topics", ex);
}
return unwanted;
}

/**
* convert to names
*
* @param topicsToUpdate source list
* @return just the names
*/
private List<String> toTopicNames(final List<Topic> topicsToUpdate) {
return topicsToUpdate.stream().map(Topic::name).collect(Collectors.toList());
}
}

/** creations */
public static final class CreateMutator implements TopicMutator {

private final Admin adminClient;

/**
* Needs the admin client
*
* @param adminClient - cluster connection
*/
private CreateMutator(final Admin adminClient) {
this.adminClient = adminClient;
}

Expand All @@ -221,7 +281,7 @@ private CreateWriter(final Admin adminClient) {
* @return topics with updated flag
* @throws ProvisioningException when things break
*/
public Collection<Topic> write(final Collection<Topic> topics)
public Collection<Topic> mutate(final Collection<Topic> topics)
throws ProvisioningException {

final var topicsToCreate =
Expand Down Expand Up @@ -266,62 +326,77 @@ private Collection<NewTopic> asNewTopic(final Collection<Topic> topics) {
}
}

/** Noop write that does nada */
public static final class NoopWriter implements TopicWriter {
/** Noopper that does nada */
public static final class NoopMutator implements TopicMutator {
/**
* Do nothing write
*
* @param topics to ignore
* @return unmodified list
* @throws ProvisioningException when things go wrong
*/
public Collection<Topic> write(final Collection<Topic> topics)
public Collection<Topic> mutate(final Collection<Topic> topics)
throws ProvisioningException {
return topics;
}
}

/** Interface for writing topics to kafka */
interface TopicWriter {
/** Interface for writing/mutating topics to kafka */
interface TopicMutator {
/**
* Api for writing
*
* @param topics to write
* @param topics to do stuff against
* @return updated state of topics written
*/
Collection<Topic> write(Collection<Topic> topics);
Collection<Topic> mutate(Collection<Topic> topics);
}

/** TopicWriter builder */
/** TopicMutator builder */
@SuppressFBWarnings(
value = "EI_EXPOSE_REP2",
justification = "adminClient() passed as param to prevent API pollution")
public static final class TopicWriterBuilder {
public static final class TopicMutatorBuilder {
private Admin adminClient;
private boolean noopWriter;
private boolean noop;
private boolean cleanUnspecified;
private boolean dryRun;

/** defensive */
private TopicWriterBuilder() {}
private TopicMutatorBuilder() {}

/**
* add the adminClient
*
* @param adminClient - cluster connection
* @return builder
*/
public TopicWriterBuilder adminClient(final Admin adminClient) {
public TopicMutatorBuilder adminClient(final Admin adminClient) {
this.adminClient = adminClient;
return this;
}

/**
* use a noop writer
* use a noop mutator
*
* @param dryRun - true is dry running
* @return the builder
*/
public TopicWriterBuilder noopWriter(final boolean dryRun) {
this.noopWriter = dryRun;
public TopicMutatorBuilder noopMutator(final boolean dryRun) {
this.noop = dryRun;
return this;
}
/**
* use the delete mutator
*
* @param cleanUnspecified - to cleanup resources
* @param dryRun - test.validate the proposed operation
* @return the builder
*/
public TopicMutatorBuilder cleanUnspecified(
final boolean cleanUnspecified, final boolean dryRun) {
this.cleanUnspecified = cleanUnspecified;
this.dryRun = dryRun;
return this;
}

Expand All @@ -330,21 +405,23 @@ public TopicWriterBuilder noopWriter(final boolean dryRun) {
*
* @return builder
*/
public static TopicWriterBuilder builder() {
return new TopicWriterBuilder();
public static TopicMutatorBuilder builder() {
return new TopicMutatorBuilder();
}

/**
* build it
*
* @return the specified topic writer impl
* @return the specified topic mutator impl
*/
public TopicWriter build() {
if (noopWriter) {
return new NoopWriter();
public TopicMutator build() {
if (cleanUnspecified) {
return new CleanUnspecifiedMutator(dryRun, adminClient);
} else if (noop) {
return new NoopMutator();
} else {
return new CollectiveWriter(
new CreateWriter(adminClient), new UpdateWriter(adminClient));
return new CollectiveMutator(
new CreateMutator(adminClient), new UpdateMutator(adminClient));
}
}
}
Expand Down
Loading

0 comments on commit 04bde95

Please sign in to comment.