diff --git a/kafka/src/main/java/io/specmesh/kafka/provision/AclChangeSetCalculators.java b/kafka/src/main/java/io/specmesh/kafka/provision/AclChangeSetCalculators.java
index 22a06e42..e07b61a0 100644
--- a/kafka/src/main/java/io/specmesh/kafka/provision/AclChangeSetCalculators.java
+++ b/kafka/src/main/java/io/specmesh/kafka/provision/AclChangeSetCalculators.java
@@ -28,11 +28,29 @@
  */
 public class AclChangeSetCalculators {
 
+    /** Calculates the set of unspecified acls */
+    public static final class CleanUnspecifiedCalculator implements ChangeSetCalculator {
+
+        /**
+         * returns unspecified acls
+         *
+         * @param existingAcls - existing
+         * @param requiredAcls - needed
+         * @return set of acls to remove
+         */
+        @Override
+        public Collection<Acl> calculate(
+                final Collection<Acl> existingAcls, final Collection<Acl> requiredAcls) {
+            existingAcls.removeAll(requiredAcls);
+            return existingAcls;
+        }
+    }
+
     /** Returns those acls to create and ignores existing */
     public static final class CreateOrUpdateCalculator implements ChangeSetCalculator {
 
         /**
-         * Calculate set of Acls that dont already exist
+         * Calculate set of Acls that don't already exist
          *
          * @param existingAcls - existing
         * @param requiredAcls - needed
@@ -85,7 +103,7 @@ interface ChangeSetCalculator {
          *
          * @param existingAcls - existing
          * @param requiredAcls - needed
-         * @return - set of those that dont exist
+         * @return - set of those that don't exist
          */
         Collection<Acl> calculate(Collection<Acl> existingAcls, Collection<Acl> requiredAcls);
     }
@@ -108,10 +126,15 @@ public static ChangeSetBuilder builder() {
         /**
          * build it
          *
+         * @param cleanUnspecified - clean up unspecified acls
          * @return required calculator
          */
-        public ChangeSetCalculator build() {
-            return new CreateOrUpdateCalculator();
+        public ChangeSetCalculator build(final boolean cleanUnspecified) {
+            if (cleanUnspecified) {
+                return new CleanUnspecifiedCalculator();
+            } else {
+                return new CreateOrUpdateCalculator();
+            }
         }
     }
 }
diff --git a/kafka/src/main/java/io/specmesh/kafka/provision/AclWriters.java b/kafka/src/main/java/io/specmesh/kafka/provision/AclMutators.java
similarity index 56%
rename from kafka/src/main/java/io/specmesh/kafka/provision/AclWriters.java
rename to kafka/src/main/java/io/specmesh/kafka/provision/AclMutators.java
index 462f2587..61616a8a 100644
--- a/kafka/src/main/java/io/specmesh/kafka/provision/AclWriters.java
+++ b/kafka/src/main/java/io/specmesh/kafka/provision/AclMutators.java
@@ -26,21 +26,24 @@
 import org.apache.kafka.clients.admin.Admin;
 import org.apache.kafka.common.acl.AclBindingFilter;
 
-/** AclsWriters for writing Acls */
-public class AclWriters {
+/** Mutators for ACLs */
+public class AclMutators {
 
     /** Writes Acls */
-    public static final class SimpleAclWriter implements AclWriter {
+    public static final class AclUnspecCleaner implements AclMutator {
 
         private final Admin adminClient;
+        private final boolean dryRun;
 
         /**
          * defensive
          *
          * @param adminClient - cluster connection
+         * @param dryRun - dryRun
          */
-        private SimpleAclWriter(final Admin adminClient) {
+        private AclUnspecCleaner(final Admin adminClient, final boolean dryRun) {
             this.adminClient = adminClient;
+            this.dryRun = dryRun;
         }
 
         /**
@@ -50,14 +53,29 @@ private SimpleAclWriter(final Admin adminClient) {
          * @return set of ACLs with CREATE or FAILED + Exception
          */
         @Override
-        public Collection<Acl> write(final Collection<Acl> acls) {
+        public Collection<Acl> mutate(final Collection<Acl> acls) {
+
+            final var aclsToDelete =
+                    acls.stream()
+                            .filter(
+                                    acl ->
+                                            !acl.state().equals(STATE.CREATE)
+                                                    && !acl.state().equals(STATE.UPDATE))
+                            .collect(Collectors.toList());
+
+            final var aclBindingFilters =
+                    aclsToDelete.stream()
+                            .map(acl -> acl.aclBinding().toFilter())
+                            .collect(Collectors.toList());
 
-            final var updateBindingsFilters = bindingFiltersForUpdates(acls);
             try {
-                adminClient.deleteAcls(updateBindingsFilters);
+                aclsToDelete.forEach(acl -> acl.state(STATE.DELETE));
+                if (!dryRun) {
+                    adminClient.deleteAcls(aclBindingFilters);
+                    aclsToDelete.forEach(acl -> acl.state(STATE.DELETED));
+                }
             } catch (Exception ex) {
                 acls.stream()
-                        .filter(acl -> acl.state().equals(STATE.UPDATE))
                         .peek(
                                 acl ->
                                         acl.state(STATE.FAILED)
@@ -69,6 +87,36 @@ public Collection write(final Collection acls) {
                                                 .messages(
                                                         acl.messages()
                                                                 + "\n Failed to delete ACLs")
                                                 .exception(
                                                         new ProvisioningException(
                                                                         "Failed to delete ACL", ex)
                                                                 .toString()));
                 return acls;
             }
+            return aclsToDelete;
+        }
+    }
+
+    /** Writes Acls */
+    public static final class AclWriter implements AclMutator {
+
+        private final Admin adminClient;
+
+        /**
+         * defensive
+         *
+         * @param adminClient - cluster connection
+         */
+        private AclWriter(final Admin adminClient) {
+            this.adminClient = adminClient;
+        }
+
+        /**
+         * Write the set of acls and change status to CREATED
+         *
+         * @param acls to write
+         * @return set of ACLs with CREATE or FAILED + Exception
+         */
+        @Override
+        public Collection<Acl> mutate(final Collection<Acl> acls) {
+
+            if (deleteAclsInPrepForUpdate(acls)) {
+                return acls;
+            }
 
             final var createAcls =
                     acls.stream()
@@ -98,8 +146,29 @@ public Collection write(final Collection acls) {
             return acls;
         }
 
+        private boolean deleteAclsInPrepForUpdate(final Collection<Acl> acls) {
+            final var updateBindingsFilters = bindingFiltersForUpdates(acls);
+            try {
+                adminClient.deleteAcls(updateBindingsFilters);
+            } catch (Exception ex) {
+                acls.stream()
+                        .filter(acl -> acl.state().equals(STATE.UPDATE))
+                        .peek(
+                                acl ->
+                                        acl.state(STATE.FAILED)
+                                                .messages(
+                                                        acl.messages() + "\n Failed to delete ACLs")
+                                                .exception(
+                                                        new ProvisioningException(
+                                                                        "Failed to delete ACL", ex)
+                                                                .toString()));
+                return true;
+            }
+            return false;
+        }
+
         /**
-         * Extract the bindingFilter for updates (so they can be deleted)
+         * Extract the bindingFilter for updates - they need to be deleted first
          *
          * @param acls to filter
          * @return bindings
@@ -113,7 +182,7 @@ private Collection bindingFiltersForUpdates(final Collection
-        public Collection<Acl> write(final Collection<Acl> acls) {
+        public Collection<Acl> mutate(final Collection<Acl> acls) {
             return acls;
         }
     }
 
     /** Write Acls API */
-    interface AclWriter {
+    interface AclMutator {
 
         /**
          * Write some acls
          *
          * @param acls to write
          * @return updated status of acls
          */
-        Collection<Acl> write(Collection<Acl> acls);
+        Collection<Acl> mutate(Collection<Acl> acls);
     }
 
-    /** TopicWriter builder */
+    /** Mutator builder */
     @SuppressFBWarnings(
             value = "EI_EXPOSE_REP2",
             justification = "adminClient() passed as param to prevent API pollution")
-    public static final class AclWriterBuilder {
+    public static final class AclMutatorBuilder {
 
         private Admin adminClient;
-        private boolean noopWriter;
+        private boolean dryRun;
+        private boolean cleanUnspecified;
 
         /** defensive */
-        private AclWriterBuilder() {}
+        private AclMutatorBuilder() {}
 
         /**
          * main builder
          *
          * @return builder
          */
-        public static AclWriterBuilder builder() {
-            return new AclWriterBuilder();
+        public static AclMutatorBuilder builder() {
+            return new AclMutatorBuilder();
         }
 
         /**
@@ -164,32 +234,40 @@ public static AclWriterBuilder builder() {
          * @param adminClient - cluster connection
          * @return builder
          */
-        public AclWriterBuilder adminClient(final Admin adminClient) {
+        public AclMutatorBuilder adminClient(final Admin adminClient) {
             this.adminClient = adminClient;
             return this;
         }
 
         /**
-         * use a noop writer
+         * use a noop
          *
          * @param dryRun - true is dry running
          * @return the builder
          */
-        public AclWriterBuilder noop(final boolean dryRun) {
-            noopWriter = dryRun;
+        public AclMutatorBuilder noop(final boolean dryRun) {
+            this.dryRun = dryRun;
+            return this;
+        }
+
+        public AclMutatorBuilder unspecified(final boolean cleanUnspecified) {
+            this.cleanUnspecified = cleanUnspecified;
             return this;
         }
 
         /**
          * build it
          *
-         * @return the specified topic writer impl
+         * @return the required mutator impl
          */
-        public AclWriter build() {
-            if (noopWriter) {
-                return new NoopAclWriter();
+        public AclMutator build() {
+            if (cleanUnspecified) {
+                return new AclUnspecCleaner(adminClient, dryRun);
+            }
+            if (dryRun) {
+                return new NoopAclMutator();
             } else {
-                return new SimpleAclWriter(adminClient);
+                return new AclWriter(adminClient);
             }
         }
     }
 }
diff --git a/kafka/src/main/java/io/specmesh/kafka/provision/AclProvisioner.java b/kafka/src/main/java/io/specmesh/kafka/provision/AclProvisioner.java
index f9770403..d43ae023 100644
--- a/kafka/src/main/java/io/specmesh/kafka/provision/AclProvisioner.java
+++ b/kafka/src/main/java/io/specmesh/kafka/provision/AclProvisioner.java
@@ -19,7 +19,6 @@
 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 import io.specmesh.kafka.KafkaApiSpec;
 import java.util.Collection;
-import java.util.List;
 import java.util.Set;
 import java.util.stream.Collectors;
 import lombok.AccessLevel;
@@ -40,20 +39,24 @@ private AclProvisioner() {}
      * Provision acls in the Kafka cluster
      *
      * @param dryRun for mode of operation
+     * @param cleanUnspecified remove unwanted acls
      * @param apiSpec respect the spec
      * @param adminClient cluster connection
      * @return status of provisioning
      * @throws Provisioner.ProvisioningException on interrupt
      */
     public static Collection<Acl> provision(
-            final boolean dryRun, final KafkaApiSpec apiSpec, final Admin adminClient) {
+            final boolean dryRun,
+            final boolean cleanUnspecified,
+            final KafkaApiSpec apiSpec,
+            final Admin adminClient) {
 
-        final var requiredAcls = apiSpec.requiredAcls();
-        final var existing = reader(adminClient).read(requiredAcls);
+        final var requiredAcls = bindingsToAcls(apiSpec.requiredAcls());
+        final var existing = reader(adminClient).read(apiSpec.id(), requiredAcls);
 
-        final var required = calculator().calculate(existing, bindingsToAcls(requiredAcls));
+        final var required = calculator(cleanUnspecified).calculate(existing, requiredAcls);
 
-        return writer(dryRun, adminClient).write(required);
+        return writer(dryRun, cleanUnspecified, adminClient).mutate(required);
     }
 
     /**
@@ -61,8 +64,9 @@ public static Collection provision(
      *
      * @return calculator
      */
-    private static AclChangeSetCalculators.ChangeSetCalculator calculator() {
-        return AclChangeSetCalculators.ChangeSetBuilder.builder().build();
+    private static AclChangeSetCalculators.ChangeSetCalculator calculator(
+            final boolean cleanUnspecified) {
+        return AclChangeSetCalculators.ChangeSetBuilder.builder().build(cleanUnspecified);
     }
 
     /**
@@ -79,11 +83,17 @@ private static AclReaders.AclReader reader(final Admin adminClient) {
      * acl writer
      *
      * @param dryRun - real or not
+     * @param cleanUnspecified - remove unspecified acls
      * @param adminClient - cluster connection
      * @return - writer instance
      */
-    private static AclWriters.AclWriter writer(final boolean dryRun, final Admin adminClient) {
-        return AclWriters.AclWriterBuilder.builder().noop(dryRun).adminClient(adminClient).build();
+    private static AclMutators.AclMutator writer(
+            final boolean dryRun, final boolean cleanUnspecified, final Admin adminClient) {
+        return AclMutators.AclMutatorBuilder.builder()
+                .noop(dryRun)
+                .unspecified(cleanUnspecified)
+                .adminClient(adminClient)
+                .build();
     }
 
     /**
@@ -92,14 +102,14 @@ private static AclWriters.AclWriter writer(final boolean dryRun, final Admin adm
      * @param allAcls bindings to convert
      * @return - converted set
      */
-    private static List<Acl> bindingsToAcls(final Set<AclBinding> allAcls) {
+    private static Collection<Acl> bindingsToAcls(final Set<AclBinding> allAcls) {
         return allAcls.stream()
                 .map(
                         aclBinding ->
                                 Acl.builder()
                                         .aclBinding(aclBinding)
                                         .name(aclBinding.toString())
-                                        .state(Status.STATE.READ)
+                                        .state(Status.STATE.CREATE)
                                         .build())
                 .collect(Collectors.toList());
     }
diff --git a/kafka/src/main/java/io/specmesh/kafka/provision/AclReaders.java b/kafka/src/main/java/io/specmesh/kafka/provision/AclReaders.java
index 8ed2e17b..58cc57ea 100644
--- a/kafka/src/main/java/io/specmesh/kafka/provision/AclReaders.java
+++ b/kafka/src/main/java/io/specmesh/kafka/provision/AclReaders.java
@@ -21,11 +21,10 @@
 import io.specmesh.kafka.provision.Provisioner.ProvisioningException;
 import java.util.Collection;
 import java.util.List;
-import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 import org.apache.kafka.clients.admin.Admin;
-import org.apache.kafka.common.acl.AclBinding;
+import org.apache.kafka.common.acl.AclBindingFilter;
 
 /** AclReaders for reading Acls */
 public class AclReaders {
@@ -45,53 +44,68 @@ private SimpleAclReader(final Admin adminClient) {
         }
 
         /**
-         * Read set of acls for specAclsBindingsNeeded
+         * Read set of acls that exist for this Spec (and those that don't - i.e. removed)
          *
          * @param specAclsBindingsNeeded to filter against
-         * @return found acls with status set to READ
+         * @return existing ACLs with status set to READ
          */
         @Override
-        public Collection<Acl> read(final Set<AclBinding> specAclsBindingsNeeded) {
-
-            final var aclBindingFilters =
-                    specAclsBindingsNeeded.stream()
-                            .map(AclBinding::toFilter)
-                            .collect(Collectors.toList());
-
-            final var existingAcls =
-                    aclBindingFilters.stream()
-                            .map(
-                                    bindingFilter -> {
-                                        try {
-                                            return adminClient
-                                                    .describeAcls(bindingFilter)
-                                                    .values()
-                                                    .get(
-                                                            Provisioner.REQUEST_TIMEOUT,
-                                                            TimeUnit.SECONDS);
-                                        } catch (Exception e) {
-                                            if (e.getCause()
-                                                    .toString()
-                                                    .contains(
-                                                            "org.apache.kafka.common.errors.SecurityDisabledException")) {
-                                                return List.of();
-                                            }
-                                            throw new ProvisioningException(
-                                                    "Failed to read ACLs", e);
-                                        }
-                                    })
-                            .flatMap(Collection::stream)
-                            .collect(Collectors.toList());
-
-            return existingAcls.stream()
-                    .map(
-                            aclBinding ->
-                                    Acl.builder()
-                                            .name(aclBinding.toString())
-                                            .aclBinding(aclBinding)
-                                            .state(Status.STATE.READ)
-                                            .build())
-                    .collect(Collectors.toList());
+        public Collection<Acl> read(
+                final String prefixPattern, final Collection<Acl> specAclsBindingsNeeded) {
+
+            try {
+                // crappy admin client for reading 'ALL' ACLs to pick up removed ACLs
+                final var allAcls =
+                        adminClient
+                                .describeAcls(AclBindingFilter.ANY)
+                                .values()
+                                .get(Provisioner.REQUEST_TIMEOUT, TimeUnit.SECONDS);
+
+                // will include ACLs that may have been removed
+                final var existingAclsSuperSet =
+                        allAcls.stream()
+                                .filter(
+                                        aclBinding ->
+                                                aclBinding
+                                                        .toFilter()
+                                                        .patternFilter()
+                                                        .name()
+                                                        .contains(prefixPattern))
+                                .collect(Collectors.toSet());
+
+                // now look for any in the spec that didn't get discovered in the 'filtering' - will
+                // be 'CLUSTER' type
+                final var aclNeededAsStringSet =
+                        specAclsBindingsNeeded.stream()
+                                .map(acl -> acl.aclBinding().toString())
+                                .collect(Collectors.toSet());
+                final var allAclsSpecifiedAndFound =
+                        allAcls.stream()
+                                .filter(anAcl -> aclNeededAsStringSet.contains(anAcl.toString()))
+                                .collect(Collectors.toList());
+
+                // add them to the list
+                existingAclsSuperSet.addAll(allAclsSpecifiedAndFound);
+
+                return existingAclsSuperSet.stream()
+                        .map(
+                                aclBinding ->
+                                        Acl.builder()
+                                                .name(aclBinding.toString())
+                                                .aclBinding(aclBinding)
+                                                .state(Status.STATE.READ)
+                                                .build())
+                        .collect(Collectors.toList());
+            } catch (Exception e) {
+                if (e.getCause() != null
+                        && e.getCause()
+                                .toString()
+                                .contains(
+                                        "org.apache.kafka.common.errors.SecurityDisabledException")) {
+                    return List.of();
+                }
+                throw new ProvisioningException("Failed to read ACLs", e);
+            }
         }
     }
 
@@ -100,10 +114,11 @@ interface AclReader {
         /**
          * read some acls
          *
-         * @param prefix to read
+         * @param prefixPattern prefix pattern to filter by
+         * @param acls to read
          * @return updated status of acls
          */
-        Collection<Acl> read(Set<AclBinding> prefix);
+        Collection<Acl> read(String prefixPattern, Collection<Acl> acls);
     }
 
     /** builder */
diff --git a/kafka/src/main/java/io/specmesh/kafka/provision/Provisioner.java b/kafka/src/main/java/io/specmesh/kafka/provision/Provisioner.java
index e87c18cb..933b8de8 100644
--- a/kafka/src/main/java/io/specmesh/kafka/provision/Provisioner.java
+++ b/kafka/src/main/java/io/specmesh/kafka/provision/Provisioner.java
@@ -64,7 +64,7 @@ public static Status provision(
                                 apiSpec,
                                 schemaResources,
                                 registryClient)));
-        status.acls(AclProvisioner.provision(dryRun, apiSpec, adminClient));
+        status.acls(AclProvisioner.provision(dryRun, cleanUnspecified, apiSpec, adminClient));
 
         return status.build();
     }
diff --git a/kafka/src/test/java/io/specmesh/kafka/AclProvisionerUpdateFunctionalTest.java b/kafka/src/test/java/io/specmesh/kafka/AclProvisionerUpdateFunctionalTest.java
new file mode 100644
index 00000000..a0670197
--- /dev/null
+++ b/kafka/src/test/java/io/specmesh/kafka/AclProvisionerUpdateFunctionalTest.java
@@ -0,0 +1,175 @@
+/*
+ * Copyright 2023 SpecMesh Contributors (https://github.com/specmesh)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.specmesh.kafka;
+
+import static org.apache.kafka.common.acl.AclOperation.ALL;
+import static org.apache.kafka.common.acl.AclOperation.IDEMPOTENT_WRITE;
+import static org.apache.kafka.common.acl.AclPermissionType.ALLOW;
+import static org.apache.kafka.common.resource.PatternType.LITERAL;
+import static org.apache.kafka.common.resource.PatternType.PREFIXED;
+import static org.apache.kafka.common.resource.Resource.CLUSTER_NAME;
+import static org.apache.kafka.common.resource.ResourceType.CLUSTER;
+import static org.apache.kafka.common.resource.ResourceType.GROUP;
+import static org.apache.kafka.common.resource.ResourceType.TRANSACTIONAL_ID;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.hasSize;
+import static org.hamcrest.Matchers.is;
+
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+import io.specmesh.kafka.provision.AclProvisioner;
+import io.specmesh.kafka.provision.Provisioner;
+import io.specmesh.kafka.provision.Status.STATE;
+import io.specmesh.test.TestSpecLoader;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.common.acl.AccessControlEntry;
+import org.apache.kafka.common.acl.AclBinding;
+import org.apache.kafka.common.acl.AclBindingFilter;
+import org.apache.kafka.common.resource.PatternType;
+import org.apache.kafka.common.resource.ResourcePattern;
+import org.apache.kafka.common.resource.ResourceType;
+import org.junit.jupiter.api.MethodOrderer;
+import org.junit.jupiter.api.Order;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestMethodOrder;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+/**
+ * Tests execution of DryRuns and UPDATES where the provisioner-functional-test-api.yaml spec is
+ * already provisioned
+ */
+@SuppressFBWarnings(
+        value = "IC_INIT_CIRCULARITY",
+        justification = "shouldHaveInitializedEnumsCorrectly() proves this is false positive")
+@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
+class AclProvisionerUpdateFunctionalTest {
+
+    private static final KafkaApiSpec API_SPEC =
+            TestSpecLoader.loadFromClassPath("provisioner-functional-test-api.yaml");
+
+    private static final KafkaApiSpec API_UPDATE_SPEC =
+            TestSpecLoader.loadFromClassPath("provisioner-update-functional-test-api.yaml");
+
+    private enum Domain {
+        /** The domain associated with the spec. */
+        SELF(API_SPEC.id()),
+        /** An unrelated domain. */
+        UNRELATED("london.hammersmith.transport"),
+        /** A domain granted access to the protected topic. */
+        LIMITED("some.other.domain.root");
+
+        final String domainId;
+
+        Domain(final String name) {
+            this.domainId = name;
+        }
+    }
+
+    private static final String ADMIN_USER = "admin";
+
+    @RegisterExtension
+    private static final KafkaEnvironment KAFKA_ENV =
+            DockerKafkaEnvironment.builder()
+                    .withSaslAuthentication(
+                            ADMIN_USER,
+                            ADMIN_USER + "-secret",
+                            Domain.SELF.domainId,
+                            Domain.SELF.domainId + "-secret")
+                    .withKafkaAcls(aclsForOtherDomain(Domain.LIMITED))
+                    .withKafkaAcls(aclsForOtherDomain(Domain.UNRELATED))
+                    .build();
+
+    @Test
+    @Order(1)
+    void shouldProvisionExistingSpec() {
+        try (Admin adminClient = KAFKA_ENV.adminClient()) {
+            AclProvisioner.provision(false, false, API_SPEC, adminClient);
+        }
+    }
+
+    @Test
+    @Order(2)
+    void shouldPublishUpdatedAcls() {
+        try (Admin adminClient = KAFKA_ENV.adminClient()) {
+            final var dryRunAcls =
+                    AclProvisioner.provision(true, false, API_UPDATE_SPEC, adminClient);
+            assertThat(dryRunAcls, is(hasSize(2)));
+            assertThat(
+                    dryRunAcls.stream().filter(acl -> acl.state().equals(STATE.CREATE)).count(),
+                    is(2L));
+
+            final var createdAcls =
+                    AclProvisioner.provision(false, false, API_UPDATE_SPEC, adminClient);
+
+            assertThat(createdAcls, is(hasSize(2)));
+            assertThat(
+                    createdAcls.stream().filter(acl -> acl.state().equals(STATE.CREATED)).count(),
+                    is(2L));
+        }
+    }
+
+    @Test
+    @Order(3)
+    void shouldCleanUnSpecdAcls()
+            throws ExecutionException, InterruptedException, TimeoutException {
+        try (Admin adminClient = KAFKA_ENV.adminClient()) {
+
+            adminClient.deleteAcls(List.of(AclBindingFilter.ANY));
+
+            // Set up an unexpected ACL
+            final var pattern =
+                    new ResourcePattern(
+                            ResourceType.TOPIC,
+                            API_SPEC.id() + "_public.something.NOT_NEEDED",
+                            PatternType.LITERAL);
+            final var aclBinding1 =
+                    new AclBinding(pattern, API_SPEC.requiredAcls().iterator().next().entry());
+
+            adminClient
+                    .createAcls(List.of(aclBinding1))
+                    .all()
+                    .get(Provisioner.REQUEST_TIMEOUT, TimeUnit.SECONDS);
+
+            final var dryRunAcls = AclProvisioner.provision(true, true, API_SPEC, adminClient);
+            assertThat(dryRunAcls, is(hasSize(1)));
+            assertThat(dryRunAcls.iterator().next().state(), is(STATE.DELETE));
+
+            final var createdAcls = AclProvisioner.provision(false, true, API_SPEC, adminClient);
+
+            assertThat(createdAcls, is(hasSize(1)));
+            assertThat(createdAcls.iterator().next().state(), is(STATE.DELETED));
+        }
+    }
+
+    private static Set<AclBinding> aclsForOtherDomain(final Domain domain) {
+        final String principal = "User:" + domain.domainId;
+        return Set.of(
+                new AclBinding(
+                        new ResourcePattern(CLUSTER, CLUSTER_NAME, LITERAL),
+                        new AccessControlEntry(principal, "*", IDEMPOTENT_WRITE, ALLOW)),
+                new AclBinding(
+                        new ResourcePattern(GROUP, domain.domainId, LITERAL),
+                        new AccessControlEntry(principal, "*", ALL, ALLOW)),
+                new AclBinding(
+                        new ResourcePattern(TRANSACTIONAL_ID, domain.domainId, PREFIXED),
+                        new AccessControlEntry(principal, "*", ALL, ALLOW)));
+    }
+}
diff --git a/kafka/src/test/java/io/specmesh/kafka/ExporterFunctionalTest.java b/kafka/src/test/java/io/specmesh/kafka/ExporterFunctionalTest.java
index 69f41298..98d508b0 100644
--- a/kafka/src/test/java/io/specmesh/kafka/ExporterFunctionalTest.java
+++ b/kafka/src/test/java/io/specmesh/kafka/ExporterFunctionalTest.java
@@ -97,7 +97,7 @@ private enum Domain {
     static void setUp() {
         try (Admin adminClient = KAFKA_ENV.adminClient()) {
             TopicProvisioner.provision(false, false, API_SPEC, adminClient);
-            AclProvisioner.provision(false, API_SPEC, adminClient);
+            AclProvisioner.provision(false, false, API_SPEC, adminClient);
         }
     }
 
diff --git a/kafka/src/test/java/io/specmesh/kafka/KafkaAPISpecFunctionalTest.java b/kafka/src/test/java/io/specmesh/kafka/KafkaAPISpecFunctionalTest.java
index 3045be4f..87bafed5 100644
--- a/kafka/src/test/java/io/specmesh/kafka/KafkaAPISpecFunctionalTest.java
+++ b/kafka/src/test/java/io/specmesh/kafka/KafkaAPISpecFunctionalTest.java
@@ -129,7 +129,7 @@ private enum Domain {
     static void setUp() {
         try (Admin adminClient = KAFKA_ENV.adminClient()) {
             TopicProvisioner.provision(false, false, API_SPEC, adminClient);
-            AclProvisioner.provision(false, API_SPEC, adminClient);
+            AclProvisioner.provision(false, false, API_SPEC, adminClient);
         }
     }
 
diff --git a/kafka/src/test/java/io/specmesh/kafka/ProvisionerFreshStartFunctionalTest.java b/kafka/src/test/java/io/specmesh/kafka/ProvisionerFreshStartFunctionalTest.java
index c79babe4..602129aa 100644
--- a/kafka/src/test/java/io/specmesh/kafka/ProvisionerFreshStartFunctionalTest.java
+++ b/kafka/src/test/java/io/specmesh/kafka/ProvisionerFreshStartFunctionalTest.java
@@ -159,7 +159,7 @@ void shouldDryRunTopicsFromEmptyCluster() {
     void shouldDryRunACLsFromEmptyCluster() {
         try (Admin adminClient = KAFKA_ENV.adminClient()) {
 
-            final var changeset = AclProvisioner.provision(true, API_SPEC, adminClient);
+            final var changeset = AclProvisioner.provision(true, false, API_SPEC, adminClient);
 
             // Verify - 11 created
             assertThat(
@@ -302,7 +302,7 @@ void shouldProvisionTopicsFromEmptyCluster() throws ExecutionException, Interrup
     void shouldDoRealACLsFromEmptyCluster() throws ExecutionException, InterruptedException {
         try (Admin adminClient = KAFKA_ENV.adminClient()) {
 
-            final var changeset = AclProvisioner.provision(false, API_SPEC, adminClient);
+            final var changeset = AclProvisioner.provision(false, false, API_SPEC, adminClient);
 
             // Verify - 11 created
             assertThat(
diff --git a/kafka/src/test/java/io/specmesh/kafka/ProvisionerUpdatingFunctionalTest.java b/kafka/src/test/java/io/specmesh/kafka/TopicProvisionerUpdateFunctionalTest.java
similarity index 90%
rename from kafka/src/test/java/io/specmesh/kafka/ProvisionerUpdatingFunctionalTest.java
rename to kafka/src/test/java/io/specmesh/kafka/TopicProvisionerUpdateFunctionalTest.java
index 72ed9568..635ebe85 100644
--- a/kafka/src/test/java/io/specmesh/kafka/ProvisionerUpdatingFunctionalTest.java
+++ b/kafka/src/test/java/io/specmesh/kafka/TopicProvisionerUpdateFunctionalTest.java
@@ -30,7 +30,6 @@
 import static org.hamcrest.Matchers.is;
 
 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
-import io.specmesh.kafka.provision.AclProvisioner;
 import io.specmesh.kafka.provision.Status.STATE;
 import io.specmesh.kafka.provision.TopicProvisioner;
 import io.specmesh.kafka.provision.TopicProvisioner.Topic;
@@ -62,7 +61,7 @@ value = "IC_INIT_CIRCULARITY",
         justification = "shouldHaveInitializedEnumsCorrectly() proves this is false positive")
 @TestMethodOrder(MethodOrderer.OrderAnnotation.class)
-class ProvisionerUpdatingFunctionalTest {
+class TopicProvisionerUpdateFunctionalTest {
 
     private static final KafkaApiSpec API_SPEC =
             TestSpecLoader.loadFromClassPath("provisioner-functional-test-api.yaml");
@@ -106,7 +105,6 @@ private enum Domain {
     void shouldProvisionExistingSpec() {
         try (Admin adminClient = KAFKA_ENV.adminClient()) {
             TopicProvisioner.provision(false, false, API_SPEC, adminClient);
-            AclProvisioner.provision(false, API_SPEC, adminClient);
         }
     }
 
@@ -143,25 +141,6 @@ void shouldDoTopicUpdates() {
         }
     }
 
-    @Test
-    @Order(4)
-    void shouldPublishUpdatedAcls() {
-        try (Admin adminClient = KAFKA_ENV.adminClient()) {
-            final var dryRunAcls = AclProvisioner.provision(true, API_UPDATE_SPEC, adminClient);
-            assertThat(dryRunAcls, is(hasSize(2)));
-            assertThat(
-                    dryRunAcls.stream().filter(acl -> acl.state().equals(STATE.CREATE)).count(),
-                    is(2L));
-
-            final var createdAcls = AclProvisioner.provision(false, API_UPDATE_SPEC, adminClient);
-
-            assertThat(createdAcls, is(hasSize(2)));
-            assertThat(
-                    createdAcls.stream().filter(acl -> acl.state().equals(STATE.CREATED)).count(),
-                    is(2L));
-        }
-    }
-
     @Test
     @Order(5)
     void shouldCleanupNonSpecTopicsDryRun()