Skip to content

Commit

Permalink
part 3 - ACLs for unspec cleanup #58 (#211)
Browse files Browse the repository at this point in the history
  • Loading branch information
bluemonk3y authored Oct 2, 2023
1 parent 953e273 commit d45e432
Show file tree
Hide file tree
Showing 10 changed files with 398 additions and 118 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,29 @@
*/
public class AclChangeSetCalculators {

/** Calc unspecified acls */
public static final class CleanUnspecifiedCalculator implements ChangeSetCalculator {

/**
* returns unspec'd acls
*
* @param existingAcls - existing
* @param requiredAcls - needed
* @return set of acls to remove
*/
@Override
public Collection<Acl> calculate(
final Collection<Acl> existingAcls, final Collection<Acl> requiredAcls) {
existingAcls.removeAll(requiredAcls);
return existingAcls;
}
}

/** Returns those acls to create and ignores existing */
public static final class CreateOrUpdateCalculator implements ChangeSetCalculator {

/**
* Calculate set of Acls that dont already exist
* Calculate set of Acls that don't already exist
*
* @param existingAcls - existing
* @param requiredAcls - needed
Expand Down Expand Up @@ -85,7 +103,7 @@ interface ChangeSetCalculator {
*
* @param existingAcls - existing
* @param requiredAcls - needed
* @return - set of those that dont exist
* @return - set of those that don't exist
*/
Collection<Acl> calculate(Collection<Acl> existingAcls, Collection<Acl> requiredAcls);
}
Expand All @@ -108,10 +126,15 @@ public static ChangeSetBuilder builder() {
/**
* build it
*
* @param cleanUnspecified - clean up
* @return required calculator
*/
public ChangeSetCalculator build() {
return new CreateOrUpdateCalculator();
public ChangeSetCalculator build(final boolean cleanUnspecified) {
if (cleanUnspecified) {
return new CleanUnspecifiedCalculator();
} else {
return new CreateOrUpdateCalculator();
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,21 +26,24 @@
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.common.acl.AclBindingFilter;

/** AclsWriters for writing Acls */
public class AclWriters {
/** Mutant Acls */
public class AclMutators {

/** Writes Acls */
public static final class SimpleAclWriter implements AclWriter {
public static final class AclUnspecCleaner implements AclMutator {

private final Admin adminClient;
private final boolean dryRun;

/**
* defensive
*
* @param adminClient - cluster connection
* @param dryRun - dryRun
*/
private SimpleAclWriter(final Admin adminClient) {
private AclUnspecCleaner(final Admin adminClient, final boolean dryRun) {
this.adminClient = adminClient;
this.dryRun = dryRun;
}

/**
Expand All @@ -50,14 +53,29 @@ private SimpleAclWriter(final Admin adminClient) {
* @return set of ACLs with CREATE or FAILED + Exception
*/
@Override
public Collection<Acl> write(final Collection<Acl> acls) {
public Collection<Acl> mutate(final Collection<Acl> acls) {

final var aclsToDelete =
acls.stream()
.filter(
acl ->
!acl.state().equals(STATE.CREATE)
&& !acl.state().equals(STATE.UPDATE))
.collect(Collectors.toList());

final var aclBindingFilters =
aclsToDelete.stream()
.map(acl -> acl.aclBinding().toFilter())
.collect(Collectors.toList());

final var updateBindingsFilters = bindingFiltersForUpdates(acls);
try {
adminClient.deleteAcls(updateBindingsFilters);
aclsToDelete.forEach(acl -> acl.state(STATE.DELETE));
if (!dryRun) {
adminClient.deleteAcls(aclBindingFilters);
aclsToDelete.forEach(acl -> acl.state(STATE.DELETED));
}
} catch (Exception ex) {
acls.stream()
.filter(acl -> acl.state().equals(STATE.UPDATE))
.peek(
acl ->
acl.state(STATE.FAILED)
Expand All @@ -69,6 +87,36 @@ public Collection<Acl> write(final Collection<Acl> acls) {
.toString()));
return acls;
}
return aclsToDelete;
}
}

/** Writes Acls */
public static final class AclWriter implements AclMutator {

private final Admin adminClient;

/**
* defensive
*
* @param adminClient - cluster connection
*/
private AclWriter(final Admin adminClient) {
this.adminClient = adminClient;
}

/**
* Write the set of acls and change status to CREATED
*
* @param acls to write
* @return set of ACLs with CREATE or FAILED + Exception
*/
@Override
public Collection<Acl> mutate(final Collection<Acl> acls) {

if (deleteAclsInPrepForUpdate(acls)) {
return acls;
}

final var createAcls =
acls.stream()
Expand Down Expand Up @@ -98,8 +146,29 @@ public Collection<Acl> write(final Collection<Acl> acls) {
return acls;
}

private boolean deleteAclsInPrepForUpdate(final Collection<Acl> acls) {
final var updateBindingsFilters = bindingFiltersForUpdates(acls);
try {
adminClient.deleteAcls(updateBindingsFilters);
} catch (Exception ex) {
acls.stream()
.filter(acl -> acl.state().equals(STATE.UPDATE))
.peek(
acl ->
acl.state(STATE.FAILED)
.messages(
acl.messages() + "\n Failed to delete ACLs")
.exception(
new ProvisioningException(
"Failed to delete ACL", ex)
.toString()));
return true;
}
return false;
}

/**
* Extract the bindingFilter for updates (so they can be deleted)
* Extract the bindingFilter for updates - need to deleted first
*
* @param acls to filter
* @return bindings
Expand All @@ -113,7 +182,7 @@ private Collection<AclBindingFilter> bindingFiltersForUpdates(final Collection<A
}

/** Do nothing writer */
public static final class NoopAclWriter implements AclWriter {
public static final class NoopAclMutator implements AclMutator {

/**
* Do nothing
Expand All @@ -122,40 +191,41 @@ public static final class NoopAclWriter implements AclWriter {
* @return acls with status set to CREATED or FAILED
*/
@Override
public Collection<Acl> write(final Collection<Acl> acls) {
public Collection<Acl> mutate(final Collection<Acl> acls) {
return acls;
}
}

/** Write Acls API */
interface AclWriter {
interface AclMutator {
/**
* Write some acls
*
* @param acls to write
* @return updated status of acls
*/
Collection<Acl> write(Collection<Acl> acls);
Collection<Acl> mutate(Collection<Acl> acls);
}

/** TopicWriter builder */
/** Mutator builder */
@SuppressFBWarnings(
value = "EI_EXPOSE_REP2",
justification = "adminClient() passed as param to prevent API pollution")
public static final class AclWriterBuilder {
public static final class AclMutatorBuilder {
private Admin adminClient;
private boolean noopWriter;
private boolean dryRun;
private boolean cleanUnspecified;

/** defensive */
private AclWriterBuilder() {}
private AclMutatorBuilder() {}

/**
* main builder
*
* @return builder
*/
public static AclWriterBuilder builder() {
return new AclWriterBuilder();
public static AclMutatorBuilder builder() {
return new AclMutatorBuilder();
}

/**
Expand All @@ -164,32 +234,40 @@ public static AclWriterBuilder builder() {
* @param adminClient - cluster connection
* @return builder
*/
public AclWriterBuilder adminClient(final Admin adminClient) {
public AclMutatorBuilder adminClient(final Admin adminClient) {
this.adminClient = adminClient;
return this;
}

/**
* use a noop writer
* use a noop
*
* @param dryRun - true is dry running
* @return the builder
*/
public AclWriterBuilder noop(final boolean dryRun) {
noopWriter = dryRun;
public AclMutatorBuilder noop(final boolean dryRun) {
this.dryRun = dryRun;
return this;
}

public AclMutatorBuilder unspecified(final boolean cleanUnspecified) {
this.cleanUnspecified = cleanUnspecified;
return this;
}

/**
* build it
*
* @return the specified topic writer impl
* @return the required mutator impl
*/
public AclWriter build() {
if (noopWriter) {
return new NoopAclWriter();
public AclMutator build() {
if (cleanUnspecified) {
return new AclUnspecCleaner(adminClient, dryRun);
}
if (dryRun) {
return new NoopAclMutator();
} else {
return new SimpleAclWriter(adminClient);
return new AclWriter(adminClient);
}
}
}
Expand Down
34 changes: 22 additions & 12 deletions kafka/src/main/java/io/specmesh/kafka/provision/AclProvisioner.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.specmesh.kafka.KafkaApiSpec;
import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import lombok.AccessLevel;
Expand All @@ -40,29 +39,34 @@ private AclProvisioner() {}
* Provision acls in the Kafka cluster
*
* @param dryRun for mode of operation
* @param cleanUnspecified remove unwanted
* @param apiSpec respect the spec
* @param adminClient cluster connection
* @return status of provisioning
* @throws Provisioner.ProvisioningException on interrupt
*/
public static Collection<Acl> provision(
final boolean dryRun, final KafkaApiSpec apiSpec, final Admin adminClient) {
final boolean dryRun,
final boolean cleanUnspecified,
final KafkaApiSpec apiSpec,
final Admin adminClient) {

final var requiredAcls = apiSpec.requiredAcls();
final var existing = reader(adminClient).read(requiredAcls);
final var requiredAcls = bindingsToAcls(apiSpec.requiredAcls());
final var existing = reader(adminClient).read(apiSpec.id(), requiredAcls);

final var required = calculator().calculate(existing, bindingsToAcls(requiredAcls));
final var required = calculator(cleanUnspecified).calculate(existing, requiredAcls);

return writer(dryRun, adminClient).write(required);
return writer(dryRun, cleanUnspecified, adminClient).mutate(required);
}

/**
* changeset calculator
*
* @return calculator
*/
private static AclChangeSetCalculators.ChangeSetCalculator calculator() {
return AclChangeSetCalculators.ChangeSetBuilder.builder().build();
private static AclChangeSetCalculators.ChangeSetCalculator calculator(
final boolean cleanUnspecified) {
return AclChangeSetCalculators.ChangeSetBuilder.builder().build(cleanUnspecified);
}

/**
Expand All @@ -79,11 +83,17 @@ private static AclReaders.AclReader reader(final Admin adminClient) {
* acl writer
*
* @param dryRun - real or not
* @param cleanUnspecified - remove unspecd
* @param adminClient - cluster connection
* @return - writer instance
*/
private static AclWriters.AclWriter writer(final boolean dryRun, final Admin adminClient) {
return AclWriters.AclWriterBuilder.builder().noop(dryRun).adminClient(adminClient).build();
private static AclMutators.AclMutator writer(
final boolean dryRun, final boolean cleanUnspecified, final Admin adminClient) {
return AclMutators.AclMutatorBuilder.builder()
.noop(dryRun)
.unspecified(cleanUnspecified)
.adminClient(adminClient)
.build();
}

/**
Expand All @@ -92,14 +102,14 @@ private static AclWriters.AclWriter writer(final boolean dryRun, final Admin adm
* @param allAcls bindings to convert
* @return - converted set
*/
private static List<Acl> bindingsToAcls(final Set<AclBinding> allAcls) {
private static Collection<Acl> bindingsToAcls(final Set<AclBinding> allAcls) {
return allAcls.stream()
.map(
aclBinding ->
Acl.builder()
.aclBinding(aclBinding)
.name(aclBinding.toString())
.state(Status.STATE.READ)
.state(Status.STATE.CREATE)
.build())
.collect(Collectors.toList());
}
Expand Down
Loading

0 comments on commit d45e432

Please sign in to comment.