diff --git a/cli/src/main/java/io/specmesh/cli/Provision.java b/cli/src/main/java/io/specmesh/cli/Provision.java index 0c12b852..3e2a3e46 100644 --- a/cli/src/main/java/io/specmesh/cli/Provision.java +++ b/cli/src/main/java/io/specmesh/cli/Provision.java @@ -64,14 +64,8 @@ public static void main(final String[] args) { + "` from:" + new File(propertyFilename).getAbsolutePath()); properties.load(fis); - properties - .entrySet() - .forEach( - entry -> { - properties.put( - entry.getKey().toString().replace(".", "-"), - entry.getValue()); - }); + properties.forEach( + (key, value) -> properties.put(key.toString().replace(".", "-"), value)); System.out.println( "Loaded `properties` from cwd:" + new File(propertyFilename).getAbsolutePath()); } catch (IOException e) { @@ -82,7 +76,7 @@ public static void main(final String[] args) { + new File(propertyFilename).getAbsolutePath() + "\nERROR:" + e); - e.printStackTrace(); + throw new RuntimeException(e); } final var provider = new CommandLine.PropertiesDefaultProvider(properties); @@ -146,6 +140,22 @@ public void spec(final String path) { builder.specPath(path); } + @Option( + names = {"-du", "--domain-user"}, + description = + "optional custom domain user, to be used when creating ACLs. By default," + + " specmesh expects the principle used to authenticate with Kafka to have" + + " the same name as the domain id. For example, given a domain id of" + + " 'urn:acme.products', specmesh expects the user to be called" + + " 'acme.products', and creates ACLs accordingly. In some situations, e.g." + + " Confluent Cloud Service Accounts, the username is system generated or" + + " outside control of administrators. In these situations, use this" + + " option to provide the generated username and specmesh will provision" + + " ACLs accordingly.") + public void domainUserAlias(final String alias) { + builder.domainUserAlias(alias); + } + @Option( names = {"-u", "--username"}, description = "username or api key for the Kafka cluster connection") diff --git a/kafka/src/main/java/io/specmesh/kafka/Clients.java b/kafka/src/main/java/io/specmesh/kafka/Clients.java index 8e607fbb..4d7a79f5 100644 --- a/kafka/src/main/java/io/specmesh/kafka/Clients.java +++ b/kafka/src/main/java/io/specmesh/kafka/Clients.java @@ -147,7 +147,7 @@ public static Map clientSaslAuthProperties( * @return true if principal was set */ private static boolean isPrincipalSpecified(final String principal) { - return principal != null && principal.length() > 0; + return principal != null && !principal.isEmpty(); } private static String buildJaasConfig(final String userName, final String password) { @@ -226,7 +226,7 @@ public static Map producerProperties( final Class valueSerializerClass, final boolean acksAll, final Map... additionalProperties) { - final Map props = clientProperties(domainId, bootstrapServers); + final Map props = clientProperties(bootstrapServers); props.putAll( Map.of( AdminClientConfig.CLIENT_ID_CONFIG, @@ -279,7 +279,7 @@ public static Map kstreamsProperties( final boolean acksAll, final Map... additionalProperties) { - final Map props = clientProperties(domainId, bootstrapServers); + final Map props = clientProperties(bootstrapServers); props.putAll( Map.of( StreamsConfig.APPLICATION_ID_CONFIG, @@ -354,7 +354,7 @@ public static Map consumerProperties( final Class valueDeserializerClass, final boolean autoOffsetResetEarliest, final Map... additionalProperties) { - final Map props = clientProperties(domainId, bootstrapServers); + final Map props = clientProperties(bootstrapServers); props.putAll( Map.of( ConsumerConfig.CLIENT_ID_CONFIG, @@ -375,10 +375,8 @@ public static Map consumerProperties( return props; } - private static Map clientProperties( - final String domainId, final String bootstrapServers) { + private static Map clientProperties(final String bootstrapServers) { final Map basicProps = new HashMap<>(); - basicProps.put(AdminClientConfig.CLIENT_ID_CONFIG, domainId); basicProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); return basicProps; } diff --git a/kafka/src/main/java/io/specmesh/kafka/KafkaApiSpec.java b/kafka/src/main/java/io/specmesh/kafka/KafkaApiSpec.java index 4722ecd3..eb69245a 100644 --- a/kafka/src/main/java/io/specmesh/kafka/KafkaApiSpec.java +++ b/kafka/src/main/java/io/specmesh/kafka/KafkaApiSpec.java @@ -113,15 +113,19 @@ public List listDomainOwnedTopics() { */ @Deprecated public List listACLsForDomainOwnedTopics() { + return listACLsForDomainOwnedTopics(id()); + } + + private List listACLsForDomainOwnedTopics(final String user) { validateTopicConfig(); final List topicAcls = new ArrayList<>(); - topicAcls.addAll(ownTopicAcls()); - topicAcls.addAll(ownTransactionIdsAcls()); + topicAcls.addAll(ownTopicAcls(user)); + topicAcls.addAll(ownTransactionIdsAcls(user)); topicAcls.addAll(publicTopicAcls()); topicAcls.addAll(protectedTopicAcls()); - topicAcls.addAll(privateTopicAcls()); - topicAcls.addAll(prefixedAcls(CLUSTER, CLUSTER_NAME, principal(), IDEMPOTENT_WRITE)); + topicAcls.addAll(privateTopicAcls(user)); + topicAcls.addAll(prefixedAcls(CLUSTER, CLUSTER_NAME, principal(user), IDEMPOTENT_WRITE)); return topicAcls; } @@ -142,9 +146,33 @@ public List listACLsForDomainOwnedTopics() { * @return returns the set of required acls. */ public Set requiredAcls() { + return requiredAcls(id()); + } + + /** + * Get the set of required ACLs for this domain spec, supplying a custom username. + * + *

Standard convention is to use the domain id for the username. Only use an alternative name + * if required. + * + *

This includes {@code ALLOW} ACLs for: + * + *

    + *
  • Everyone to consume the spec's public topics + *
  • Specifically configured domains to consume the spec's protected topics + *
  • The spec's domain to be able to create ad-hoc private topics + *
  • The spec's domain to produce and consume its topics + *
  • The spec's domain to use its own consumer groups + *
  • The spec's domain to use its own transaction ids + *
+ * + * @param userName the username to use as the principal in the acl bindings. + * @return returns the set of required acls. + */ + public Set requiredAcls(final String userName) { final Set acls = new HashSet<>(); - acls.addAll(ownGroupAcls()); - acls.addAll(listACLsForDomainOwnedTopics()); + acls.addAll(ownGroupAcls(userName)); + acls.addAll(listACLsForDomainOwnedTopics(userName)); acls.addAll(grantAccessControlUsingGrantTagOnly()); return acls; } @@ -199,13 +227,7 @@ public Stream topicSchemas(final String topicName) { .flatMap(Optional::stream); } - /** - * Format the principal - * - * @param domainIdAsUsername the domain id - * @return the principal - */ - public static String formatPrincipal(final String domainIdAsUsername) { + private static String formatPrincipal(final String domainIdAsUsername) { return domainIdAsUsername.equals(PUBLIC) ? "User:*" : "User:" + domainIdAsUsername; } @@ -229,20 +251,20 @@ private void validateTopicConfig() { }); } - private String principal() { - return formatPrincipal(id()); + private String principal(final String user) { + return formatPrincipal(user); } - private Set ownGroupAcls() { - return prefixedAcls(GROUP, id(), principal(), READ); + private Set ownGroupAcls(final String user) { + return prefixedAcls(GROUP, id(), principal(user), READ); } - private Set ownTopicAcls() { - return prefixedAcls(TOPIC, id(), principal(), DESCRIBE, READ, WRITE); + private Set ownTopicAcls(final String user) { + return prefixedAcls(TOPIC, id(), principal(user), DESCRIBE, READ, WRITE); } - private Set ownTransactionIdsAcls() { - return prefixedAcls(TRANSACTIONAL_ID, id(), principal(), DESCRIBE, WRITE); + private Set ownTransactionIdsAcls(final String user) { + return prefixedAcls(TRANSACTIONAL_ID, id(), principal(user), DESCRIBE, WRITE); } private Set publicTopicAcls() { @@ -310,7 +332,7 @@ private List grantAccessControlUsingGrantTagOnly() { /** * the path is using public,private explicit based control * - * @param key + * @param key resource name * @return true if it is */ private boolean isUsingPathPerms(final String key) { @@ -319,8 +341,8 @@ private boolean isUsingPathPerms(final String key) { || key.startsWith(id() + DELIMITER + PUBLIC + DELIMITER); } - private Set privateTopicAcls() { - return prefixedAcls(TOPIC, id() + DELIMITER + PRIVATE, principal(), CREATE); + private Set privateTopicAcls(final String user) { + return prefixedAcls(TOPIC, id() + DELIMITER + PRIVATE, principal(user), CREATE); } private static Set literalAcls( diff --git a/kafka/src/main/java/io/specmesh/kafka/provision/AclProvisioner.java b/kafka/src/main/java/io/specmesh/kafka/provision/AclProvisioner.java index 8b65ead4..eb93f8cb 100644 --- a/kafka/src/main/java/io/specmesh/kafka/provision/AclProvisioner.java +++ b/kafka/src/main/java/io/specmesh/kafka/provision/AclProvisioner.java @@ -35,12 +35,36 @@ public final class AclProvisioner { /** defensive */ private AclProvisioner() {} + + /** + * Provision acls in the Kafka cluster + * + * @param dryRun for mode of operation + * @param cleanUnspecified remove unwanted + * @param apiSpec respect the spec + * @param adminClient cluster connection + * @return status of provisioning + * @throws ProvisioningException on interrupt + * @deprecated use {@link AclProvisioner#provision(boolean, boolean, KafkaApiSpec, String, + * Admin)} + */ + @Deprecated(forRemoval = true, since = "0.10.1") + public static Collection provision( + final boolean dryRun, + final boolean cleanUnspecified, + final KafkaApiSpec apiSpec, + final Admin adminClient) { + + return provision(dryRun, cleanUnspecified, apiSpec, apiSpec.id(), adminClient); + } + /** * Provision acls in the Kafka cluster * * @param dryRun for mode of operation * @param cleanUnspecified remove unwanted * @param apiSpec respect the spec + * @param userName user name * @param adminClient cluster connection * @return status of provisioning * @throws ProvisioningException on interrupt @@ -49,9 +73,10 @@ public static Collection provision( final boolean dryRun, final boolean cleanUnspecified, final KafkaApiSpec apiSpec, + final String userName, final Admin adminClient) { - final var requiredAcls = bindingsToAcls(apiSpec.requiredAcls()); + final var requiredAcls = bindingsToAcls(apiSpec.requiredAcls(userName)); final var existing = reader(adminClient).read(apiSpec.id(), requiredAcls); final var required = calculator(cleanUnspecified).calculate(existing, requiredAcls); diff --git a/kafka/src/main/java/io/specmesh/kafka/provision/Provisioner.java b/kafka/src/main/java/io/specmesh/kafka/provision/Provisioner.java index 307af405..d2502912 100644 --- a/kafka/src/main/java/io/specmesh/kafka/provision/Provisioner.java +++ b/kafka/src/main/java/io/specmesh/kafka/provision/Provisioner.java @@ -22,7 +22,7 @@ import io.specmesh.kafka.Clients; import io.specmesh.kafka.KafkaApiSpec; import io.specmesh.kafka.provision.schema.SchemaProvisioner; -import java.util.Optional; +import java.util.Collection; import lombok.Builder; import lombok.Getter; import lombok.experimental.Accessors; @@ -49,6 +49,7 @@ public final class Provisioner { @Builder.Default private boolean closeSchemaClient = false; private String schemaPath; @Builder.Default private String specPath = ""; + @Builder.Default private String domainUserAlias = ""; private KafkaApiSpec apiSpec; private String username; private String secret; @@ -59,34 +60,30 @@ public final class Provisioner { public Status provision() { return provision( - Provisioner::provision, Clients::adminClient, Clients::schemaRegistryClient, - KafkaApiSpec::loadFromClassPath); + KafkaApiSpec::loadFromClassPath, + TopicProvisioner::provision, + SchemaProvisioner::provision, + AclProvisioner::provision); } @VisibleForTesting Status provision( - final ProvisionerMethod method, final AdminFactory adminFactory, final SrClientFactory srClientFactory, - final SpecLoader specLoader) { + final SpecLoader specLoader, + final TopicProvision topicProvision, + final SchemaProvision schemaProvision, + final AclProvision aclProvision) { try { ensureApiSpec(specLoader); ensureAdminClient(adminFactory); ensureSrClient(srClientFactory); - final var status = - method.provision( - !aclDisabled, - dryRun, - cleanUnspecified, - apiSpec, - schemaPath, - adminClient, - srDisabled ? Optional.empty() : Optional.of(schemaRegistryClient)); - - System.out.println(status.toString()); + final Status status = provision(topicProvision, schemaProvision, aclProvision); + + System.out.println(status); this.state = status; return status; } finally { @@ -110,6 +107,33 @@ Status provision( } } + private Status provision( + final TopicProvision topicProvision, + final SchemaProvision schemaProvision, + final AclProvision aclProvision) { + final String userName = domainUserAlias.isBlank() ? apiSpec.id() : domainUserAlias; + + apiSpec.apiSpec().validate(); + + final Status.StatusBuilder status = + Status.builder() + .topics( + topicProvision.provision( + dryRun, cleanUnspecified, apiSpec, adminClient)); + if (!srDisabled) { + status.schemas( + schemaProvision.provision( + dryRun, cleanUnspecified, apiSpec, schemaPath, schemaRegistryClient)); + } + + if (!aclDisabled) { + status.acls( + aclProvision.provision( + dryRun, cleanUnspecified, apiSpec, userName, adminClient)); + } + return status.build(); + } + private void ensureSrClient(final SrClientFactory srClientFactory) { if (srDisabled || schemaRegistryClient != null) { return; @@ -149,51 +173,6 @@ private void ensureApiSpec(final SpecLoader specLoader) { apiSpec = specLoader.loadFromClassPath(specPath, Provisioner.class.getClassLoader()); } - @SuppressWarnings("OptionalUsedAsFieldOrParameterType") - private static Status provision( - final boolean aclEnabled, - final boolean dryRun, - final boolean cleanUnspecified, - final KafkaApiSpec apiSpec, - final String schemaResources, - final Admin adminClient, - final Optional schemaRegistryClient) { - - apiSpec.apiSpec().validate(); - - final var status = - Status.builder() - .topics( - TopicProvisioner.provision( - dryRun, cleanUnspecified, apiSpec, adminClient)); - schemaRegistryClient.ifPresent( - registryClient -> - status.schemas( - SchemaProvisioner.provision( - dryRun, - cleanUnspecified, - apiSpec, - schemaResources, - registryClient))); - if (aclEnabled) { - status.acls(AclProvisioner.provision(dryRun, cleanUnspecified, apiSpec, adminClient)); - } - return status.build(); - } - - @SuppressWarnings("OptionalUsedAsFieldOrParameterType") - @VisibleForTesting - interface ProvisionerMethod { - Status provision( - boolean aclEnabled, - boolean dryRun, - boolean cleanUnspecified, - KafkaApiSpec apiSpec, - String schemaResources, - Admin adminClient, - Optional schemaRegistryClient); - } - @VisibleForTesting interface AdminFactory { Admin adminClient(String brokerUrl, String username, String secret); @@ -209,4 +188,30 @@ SchemaRegistryClient schemaRegistryClient( interface SpecLoader { KafkaApiSpec loadFromClassPath(String spec, ClassLoader classLoader); } + + @VisibleForTesting + interface TopicProvision { + Collection provision( + boolean dryRun, boolean cleanUnspecified, KafkaApiSpec apiSpec, Admin adminClient); + } + + @VisibleForTesting + interface SchemaProvision { + Collection provision( + boolean dryRun, + boolean cleanUnspecified, + KafkaApiSpec apiSpec, + String baseResourcePath, + SchemaRegistryClient client); + } + + @VisibleForTesting + interface AclProvision { + Collection provision( + boolean dryRun, + boolean cleanUnspecified, + KafkaApiSpec apiSpec, + String userName, + Admin adminClient); + } } diff --git a/kafka/src/test/java/io/specmesh/kafka/KafkaAPISpecFunctionalTest.java b/kafka/src/test/java/io/specmesh/kafka/KafkaAPISpecFunctionalTest.java index 87bafed5..ba808e98 100644 --- a/kafka/src/test/java/io/specmesh/kafka/KafkaAPISpecFunctionalTest.java +++ b/kafka/src/test/java/io/specmesh/kafka/KafkaAPISpecFunctionalTest.java @@ -129,7 +129,7 @@ private enum Domain { static void setUp() { try (Admin adminClient = KAFKA_ENV.adminClient()) { TopicProvisioner.provision(false, false, API_SPEC, adminClient); - AclProvisioner.provision(false, false, API_SPEC, adminClient); + AclProvisioner.provision(false, false, API_SPEC, API_SPEC.id(), adminClient); } } diff --git a/kafka/src/test/java/io/specmesh/kafka/KafkaAPISpecTest.java b/kafka/src/test/java/io/specmesh/kafka/KafkaAPISpecTest.java index 853ffd47..083999bc 100644 --- a/kafka/src/test/java/io/specmesh/kafka/KafkaAPISpecTest.java +++ b/kafka/src/test/java/io/specmesh/kafka/KafkaAPISpecTest.java @@ -17,14 +17,12 @@ package io.specmesh.kafka; import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.hasItems; import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.is; import io.specmesh.apiparser.model.SchemaInfo; import io.specmesh.test.TestSpecLoader; -import java.util.Arrays; import java.util.List; import java.util.Optional; import java.util.Set; @@ -32,6 +30,8 @@ import org.apache.kafka.clients.admin.NewTopic; import org.apache.kafka.common.acl.AclBinding; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; public class KafkaAPISpecTest { @@ -118,9 +118,11 @@ public void shouldListAppOwnedTopics() { assertThat(newTopics, hasSize(3)); } - @Test - public void shouldGenerateAclToAllowAnyOneToConsumePublicTopics() { - final Set acls = API_SPEC.requiredAcls(); + @ParameterizedTest + @ValueSource(strings = {"bob", ""}) + public void shouldGenerateAclToAllowAnyOneToConsumePublicTopics(final String username) { + final Set acls = + username.isBlank() ? API_SPEC.requiredAcls() : API_SPEC.requiredAcls(username); assertThat( acls.stream().map(Object::toString).collect(Collectors.toSet()), @@ -129,9 +131,12 @@ public void shouldGenerateAclToAllowAnyOneToConsumePublicTopics() { ExpectedAcl.DESCRIBE_PUBLIC_TOPICS.text)); } - @Test - public void shouldGenerateAclToAllowSpecificUsersToConsumeProtectedTopics() { - final Set acls = API_SPEC.requiredAcls(); + @ParameterizedTest + @ValueSource(strings = {"bob", ""}) + public void shouldGenerateAclToAllowSpecificUsersToConsumeProtectedTopics( + final String username) { + final Set acls = + username.isBlank() ? API_SPEC.requiredAcls() : API_SPEC.requiredAcls(username); assertThat( acls.stream().map(Object::toString).collect(Collectors.toSet()), @@ -140,53 +145,63 @@ public void shouldGenerateAclToAllowSpecificUsersToConsumeProtectedTopics() { ExpectedAcl.DESCRIBE_PROTECTED_TOPICS.text)); } - @Test - public void shouldGenerateAclToAllowControlOfOwnPrivateTopics() { - final Set acls = API_SPEC.requiredAcls(); + @ParameterizedTest + @ValueSource(strings = {"bob", ""}) + public void shouldGenerateAclToAllowControlOfOwnPrivateTopics(final String username) { + final Set acls = + username.isBlank() ? API_SPEC.requiredAcls() : API_SPEC.requiredAcls(username); assertThat( acls.stream().map(Object::toString).collect(Collectors.toSet()), hasItems( - ExpectedAcl.DESCRIBE_OWN_TOPICS.text, - ExpectedAcl.READ_OWN_TOPICS.text, - ExpectedAcl.WRITE_OWN_TOPICS.text, - ExpectedAcl.CREATE_OWN_PRIVATE_TOPICS.text)); + adjustAcl(ExpectedAcl.DESCRIBE_OWN_TOPICS.text, username), + adjustAcl(ExpectedAcl.READ_OWN_TOPICS.text, username), + adjustAcl(ExpectedAcl.WRITE_OWN_TOPICS.text, username), + adjustAcl(ExpectedAcl.CREATE_OWN_PRIVATE_TOPICS.text, username))); } - @Test - public void shouldGenerateAclsToAllowToUseOwnConsumerGroups() { - final Set acls = API_SPEC.requiredAcls(); + @ParameterizedTest + @ValueSource(strings = {"bob", ""}) + public void shouldGenerateAclsToAllowToUseOwnConsumerGroups(final String username) { + final Set acls = + username.isBlank() ? API_SPEC.requiredAcls() : API_SPEC.requiredAcls(username); assertThat( acls.stream().map(Object::toString).collect(Collectors.toSet()), - hasItems(ExpectedAcl.READ_OWN_GROUPS.text)); + hasItems(adjustAcl(ExpectedAcl.READ_OWN_GROUPS.text, username))); } - @Test - public void shouldGenerateAclsToAllowToUseOwnTransactionId() { - final Set acls = API_SPEC.requiredAcls(); + @ParameterizedTest + @ValueSource(strings = {"bob", ""}) + public void shouldGenerateAclsToAllowToUseOwnTransactionId(final String username) { + final Set acls = + username.isBlank() ? API_SPEC.requiredAcls() : API_SPEC.requiredAcls(username); assertThat( acls.stream().map(Object::toString).collect(Collectors.toSet()), - hasItems(ExpectedAcl.WRITE_OWN_TX_IDS.text, ExpectedAcl.DESCRIBE_OWN_TX_IDS.text)); + hasItems( + adjustAcl(ExpectedAcl.WRITE_OWN_TX_IDS.text, username), + adjustAcl(ExpectedAcl.DESCRIBE_OWN_TX_IDS.text, username))); } - @Test - public void shouldGenerateAclsToAllowIdempotentWriteOnOlderClusters() { - final Set acls = API_SPEC.requiredAcls(); + @ParameterizedTest + @ValueSource(strings = {"bob", ""}) + public void shouldGenerateAclsToAllowIdempotentWriteOnOlderClusters(final String username) { + final Set acls = + username.isBlank() ? API_SPEC.requiredAcls() : API_SPEC.requiredAcls(username); assertThat( acls.stream().map(Object::toString).collect(Collectors.toSet()), - hasItems(ExpectedAcl.OWN_IDEMPOTENT_WRITE.text)); + hasItems(adjustAcl(ExpectedAcl.OWN_IDEMPOTENT_WRITE.text, username))); } - @Test - void shouldNotHaveAnyAdditionalAcls() { - final Set acls = API_SPEC.requiredAcls(); + @ParameterizedTest + @ValueSource(strings = {"bob", ""}) + void shouldNotHaveAnyAdditionalAcls(final String username) { + final Set acls = + username.isBlank() ? API_SPEC.requiredAcls() : API_SPEC.requiredAcls(username); - assertThat( - acls.stream().map(Object::toString).collect(Collectors.toSet()), - containsInAnyOrder(Arrays.stream(ExpectedAcl.values()).map(e -> e.text).toArray())); + assertThat(acls, hasSize(ExpectedAcl.values().length)); } @Test @@ -196,4 +211,10 @@ public void shouldGetSchemaInfoForOwnedTopics() { "london.hammersmith.olympia.bigdatalondon._public.attendee"); assertThat(schema.flatMap(SchemaInfo::schemaIdLocation), is(Optional.of("header"))); } + + private String adjustAcl(final String acl, final String username) { + return username.isBlank() + ? acl + : acl.replaceAll("User:" + API_SPEC.id(), "User:" + username); + } } diff --git a/kafka/src/test/java/io/specmesh/kafka/ProvisionerFreshStartFunctionalTest.java b/kafka/src/test/java/io/specmesh/kafka/ProvisionerFreshStartFunctionalTest.java index 01e3eb77..72d7d411 100644 --- a/kafka/src/test/java/io/specmesh/kafka/ProvisionerFreshStartFunctionalTest.java +++ b/kafka/src/test/java/io/specmesh/kafka/ProvisionerFreshStartFunctionalTest.java @@ -62,6 +62,7 @@ * Tests execution DryRuns and UPDATES where the provisioner-functional-test-api.yml is NOT * provisioned */ +@SuppressWarnings("OptionalGetWithoutIsPresent") @SuppressFBWarnings( value = "IC_INIT_CIRCULARITY", justification = "shouldHaveInitializedEnumsCorrectly() proves this is false positive") @@ -153,7 +154,8 @@ void shouldDryRunTopicsFromEmptyCluster() { void shouldDryRunACLsFromEmptyCluster() { try (Admin adminClient = KAFKA_ENV.adminClient()) { - final var changeset = AclProvisioner.provision(true, false, API_SPEC, adminClient); + final var changeset = + AclProvisioner.provision(true, false, API_SPEC, API_SPEC.id(), adminClient); // Verify - 11 created assertThat( @@ -286,7 +288,8 @@ void shouldProvisionTopicsFromEmptyCluster() throws ExecutionException, Interrup void shouldDoRealACLsFromEmptyCluster() throws ExecutionException, InterruptedException { try (Admin adminClient = KAFKA_ENV.adminClient()) { - final var changeset = AclProvisioner.provision(false, false, API_SPEC, adminClient); + final var changeset = + AclProvisioner.provision(false, false, API_SPEC, API_SPEC.id(), adminClient); // Verify - 11 created assertThat( diff --git a/kafka/src/test/java/io/specmesh/kafka/provision/AclProvisionerUpdateFunctionalTest.java b/kafka/src/test/java/io/specmesh/kafka/provision/AclProvisionerUpdateFunctionalTest.java index 2d6025b5..5259f492 100644 --- a/kafka/src/test/java/io/specmesh/kafka/provision/AclProvisionerUpdateFunctionalTest.java +++ b/kafka/src/test/java/io/specmesh/kafka/provision/AclProvisionerUpdateFunctionalTest.java @@ -102,7 +102,7 @@ private enum Domain { @Order(1) void shouldProvisionExistingSpec() { try (Admin adminClient = KAFKA_ENV.adminClient()) { - AclProvisioner.provision(false, false, API_SPEC, adminClient); + AclProvisioner.provision(false, false, API_SPEC, API_SPEC.id(), adminClient); } } @@ -111,14 +111,16 @@ void shouldProvisionExistingSpec() { void shouldPublishUpdatedAcls() { try (Admin adminClient = KAFKA_ENV.adminClient()) { final var dryRunAcls = - AclProvisioner.provision(true, false, API_UPDATE_SPEC, adminClient); + AclProvisioner.provision( + true, false, API_UPDATE_SPEC, API_UPDATE_SPEC.id(), adminClient); assertThat(dryRunAcls, is(hasSize(2))); assertThat( dryRunAcls.stream().filter(acl -> acl.state().equals(STATE.CREATE)).count(), is(2L)); final var createdAcls = - AclProvisioner.provision(false, false, API_UPDATE_SPEC, adminClient); + AclProvisioner.provision( + false, false, API_UPDATE_SPEC, API_UPDATE_SPEC.id(), adminClient); assertThat(createdAcls, is(hasSize(2))); assertThat( @@ -149,11 +151,13 @@ void shouldCleanUnSpecdAcls() .all() .get(Provisioner.REQUEST_TIMEOUT, TimeUnit.SECONDS); - final var dryRunAcls = AclProvisioner.provision(true, true, API_SPEC, adminClient); + final var dryRunAcls = + AclProvisioner.provision(true, true, API_SPEC, API_SPEC.id(), adminClient); assertThat(dryRunAcls, is(hasSize(1))); assertThat(dryRunAcls.iterator().next().state(), is(STATE.DELETE)); - final var createdAcls = AclProvisioner.provision(false, true, API_SPEC, adminClient); + final var createdAcls = + AclProvisioner.provision(false, true, API_SPEC, API_SPEC.id(), adminClient); assertThat(createdAcls, is(hasSize(1))); assertThat(createdAcls.iterator().next().state(), is(STATE.DELETED)); diff --git a/kafka/src/test/java/io/specmesh/kafka/provision/ProvisionerTest.java b/kafka/src/test/java/io/specmesh/kafka/provision/ProvisionerTest.java index e7f84b94..0a4f7512 100644 --- a/kafka/src/test/java/io/specmesh/kafka/provision/ProvisionerTest.java +++ b/kafka/src/test/java/io/specmesh/kafka/provision/ProvisionerTest.java @@ -22,17 +22,20 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.RETURNS_DEEP_STUBS; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import static org.mockito.Mockito.withSettings; import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient; import io.specmesh.kafka.KafkaApiSpec; -import java.util.Optional; import org.apache.kafka.clients.admin.Admin; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Answers; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; import org.mockito.junit.jupiter.MockitoSettings; @@ -43,24 +46,27 @@ @MockitoSettings(strictness = Strictness.LENIENT) class ProvisionerTest { - @Mock private Provisioner.ProvisionerMethod provisionMethod; + private static final String DOMAIN_ID = "mktx.something"; + @Mock private Provisioner.AdminFactory adminFactory; @Mock private Provisioner.SrClientFactory srClientFactory; @Mock private Provisioner.SpecLoader specLoader; - @Mock private Status status; - @Mock private KafkaApiSpec spec; + @Mock private Provisioner.TopicProvision topicProvisioner; + @Mock private Provisioner.SchemaProvision schemaProvisioner; + + @Mock(answer = Answers.RETURNS_DEEP_STUBS) + private KafkaApiSpec spec; + @Mock private SchemaRegistryClient srClient; @Mock private Admin adminClient; + @Mock private Provisioner.AclProvision aclProvisioner; @BeforeEach void setUp() { - when(provisionMethod.provision( - anyBoolean(), anyBoolean(), anyBoolean(), any(), any(), any(), any())) - .thenReturn(status); - when(adminFactory.adminClient(any(), any(), any())).thenReturn(adminClient); when(srClientFactory.schemaRegistryClient(any(), any(), any())).thenReturn(srClient); when(specLoader.loadFromClassPath(any(), any())).thenReturn(spec); + when(spec.id()).thenReturn(DOMAIN_ID); } @Test @@ -89,10 +95,12 @@ void shouldThrowIfBrokerUrlNotProvided() { IllegalStateException.class, () -> provisioner.provision( - provisionMethod, adminFactory, srClientFactory, - specLoader)); + specLoader, + topicProvisioner, + schemaProvisioner, + aclProvisioner)); // Then: assertThat(e.getMessage(), is("Please set a broker url")); @@ -114,10 +122,12 @@ void shouldThrowIfSchemaRegistryUrlNotProvided() { IllegalStateException.class, () -> provisioner.provision( - provisionMethod, adminFactory, srClientFactory, - specLoader)); + specLoader, + topicProvisioner, + schemaProvisioner, + aclProvisioner)); // Then: assertThat(e.getMessage(), is("Please set a schema registry url")); @@ -135,7 +145,13 @@ void shouldNotThrowIfSchemaRegistryUrlNotProvidedWhenSchemasAreDisabled() { .build(); // When: - provisioner.provision(provisionMethod, adminFactory, srClientFactory, specLoader); + provisioner.provision( + adminFactory, + srClientFactory, + specLoader, + topicProvisioner, + schemaProvisioner, + aclProvisioner); // Then: did not throw } @@ -156,10 +172,12 @@ void shouldThrowIfSpecPathNotProvided() { IllegalStateException.class, () -> provisioner.provision( - provisionMethod, adminFactory, srClientFactory, - specLoader)); + specLoader, + topicProvisioner, + schemaProvisioner, + aclProvisioner)); // Then: assertThat(e.getMessage(), is("Please set the path to the specification file")); @@ -168,7 +186,8 @@ void shouldThrowIfSpecPathNotProvided() { @Test void shouldNotThrowIfSpecPathNotProvidedButApiSpecIs() { // Given: - final KafkaApiSpec explicitSpec = mock(KafkaApiSpec.class); + final KafkaApiSpec explicitSpec = + mock(KafkaApiSpec.class, withSettings().defaultAnswer(RETURNS_DEEP_STUBS)); final Provisioner provisioner = Provisioner.builder() .brokerUrl("something") @@ -178,18 +197,21 @@ void shouldNotThrowIfSpecPathNotProvidedButApiSpecIs() { .build(); // When: - provisioner.provision(provisionMethod, adminFactory, srClientFactory, specLoader); + provisioner.provision( + adminFactory, + srClientFactory, + specLoader, + topicProvisioner, + schemaProvisioner, + aclProvisioner); // Then: did not throw, and - verify(provisionMethod) - .provision( - anyBoolean(), - anyBoolean(), - anyBoolean(), - eq(explicitSpec), - any(), - any(), - any()); + verify(topicProvisioner).provision(anyBoolean(), anyBoolean(), eq(explicitSpec), any()); + verify(schemaProvisioner) + .provision(anyBoolean(), anyBoolean(), eq(explicitSpec), any(), any()); + verify(aclProvisioner) + .provision(anyBoolean(), anyBoolean(), eq(explicitSpec), any(), any()); + verify(explicitSpec.apiSpec()).validate(); } @Test @@ -198,14 +220,22 @@ void shouldWorkWithMinimalConfig() { final Provisioner provisioner = minimalBuilder().build(); // When: - provisioner.provision(provisionMethod, adminFactory, srClientFactory, specLoader); + provisioner.provision( + adminFactory, + srClientFactory, + specLoader, + topicProvisioner, + schemaProvisioner, + aclProvisioner); // Then: verify(adminFactory).adminClient("kafka-url", null, null); verify(specLoader).loadFromClassPath("spec-path", Provisioner.class.getClassLoader()); verify(srClientFactory).schemaRegistryClient("sr-url", null, null); - verify(provisionMethod) - .provision(true, false, false, spec, null, adminClient, Optional.of(srClient)); + + verify(topicProvisioner).provision(false, false, spec, adminClient); + verify(schemaProvisioner).provision(false, false, spec, null, srClient); + verify(aclProvisioner).provision(false, false, spec, DOMAIN_ID, adminClient); } @Test @@ -214,7 +244,13 @@ void shouldUseAuthToCreateAdmin() { final Provisioner provisioner = minimalBuilder().username("bob").secret("shhhh!").build(); // When: - provisioner.provision(provisionMethod, adminFactory, srClientFactory, specLoader); + provisioner.provision( + adminFactory, + srClientFactory, + specLoader, + topicProvisioner, + schemaProvisioner, + aclProvisioner); // Then: verify(adminFactory).adminClient("kafka-url", "bob", "shhhh!"); @@ -227,7 +263,13 @@ void shouldUseAuthToCreateSrClient() { minimalBuilder().srApiKey("bob").srApiSecret("shhhh!").build(); // When: - provisioner.provision(provisionMethod, adminFactory, srClientFactory, specLoader); + provisioner.provision( + adminFactory, + srClientFactory, + specLoader, + topicProvisioner, + schemaProvisioner, + aclProvisioner); // Then: verify(srClientFactory).schemaRegistryClient("sr-url", "bob", "shhhh!"); @@ -239,11 +281,16 @@ void shouldSupportDisablingAcls() { final Provisioner provisioner = minimalBuilder().aclDisabled(true).build(); // When: - provisioner.provision(provisionMethod, adminFactory, srClientFactory, specLoader); + provisioner.provision( + adminFactory, + srClientFactory, + specLoader, + topicProvisioner, + schemaProvisioner, + aclProvisioner); // Then: - verify(provisionMethod) - .provision(eq(false), anyBoolean(), anyBoolean(), any(), any(), any(), any()); + verify(aclProvisioner, never()).provision(anyBoolean(), anyBoolean(), any(), any(), any()); } @Test @@ -252,18 +299,17 @@ void shouldSupportDisablingSchemas() { final Provisioner provisioner = minimalBuilder().srDisabled(true).build(); // When: - provisioner.provision(provisionMethod, adminFactory, srClientFactory, specLoader); + provisioner.provision( + adminFactory, + srClientFactory, + specLoader, + topicProvisioner, + schemaProvisioner, + aclProvisioner); // Then: - verify(provisionMethod) - .provision( - anyBoolean(), - anyBoolean(), - anyBoolean(), - any(), - any(), - any(), - eq(Optional.empty())); + verify(schemaProvisioner, never()) + .provision(anyBoolean(), anyBoolean(), any(), any(), any()); } @Test @@ -272,11 +318,18 @@ void shouldSupportDryRun() { final Provisioner provisioner = minimalBuilder().dryRun(true).build(); // When: - provisioner.provision(provisionMethod, adminFactory, srClientFactory, specLoader); + provisioner.provision( + adminFactory, + srClientFactory, + specLoader, + topicProvisioner, + schemaProvisioner, + aclProvisioner); // Then: - verify(provisionMethod) - .provision(anyBoolean(), eq(true), anyBoolean(), any(), any(), any(), any()); + verify(topicProvisioner).provision(eq(true), anyBoolean(), any(), any()); + verify(schemaProvisioner).provision(eq(true), anyBoolean(), any(), any(), any()); + verify(aclProvisioner).provision(eq(true), anyBoolean(), any(), any(), any()); } @Test @@ -285,11 +338,18 @@ void shouldSupportCleaningUnspecific() { final Provisioner provisioner = minimalBuilder().cleanUnspecified(true).build(); // When: - provisioner.provision(provisionMethod, adminFactory, srClientFactory, specLoader); + provisioner.provision( + adminFactory, + srClientFactory, + specLoader, + topicProvisioner, + schemaProvisioner, + aclProvisioner); // Then: - verify(provisionMethod) - .provision(anyBoolean(), anyBoolean(), eq(true), any(), any(), any(), any()); + verify(topicProvisioner).provision(anyBoolean(), eq(true), any(), any()); + verify(schemaProvisioner).provision(anyBoolean(), eq(true), any(), any(), any()); + verify(aclProvisioner).provision(anyBoolean(), eq(true), any(), any(), any()); } @Test @@ -298,18 +358,206 @@ void shouldSupportCustomSchemaRoot() { final Provisioner provisioner = minimalBuilder().schemaPath("schema-path").build(); // When: - provisioner.provision(provisionMethod, adminFactory, srClientFactory, specLoader); + provisioner.provision( + adminFactory, + srClientFactory, + specLoader, + topicProvisioner, + schemaProvisioner, + aclProvisioner); + + // Then: + verify(schemaProvisioner) + .provision(anyBoolean(), anyBoolean(), any(), eq("schema-path"), any()); + } + + @Test + void shouldSupportUserAlias() { + // Given: + final Provisioner provisioner = minimalBuilder().domainUserAlias("bob").build(); + + // When: + provisioner.provision( + adminFactory, + srClientFactory, + specLoader, + topicProvisioner, + schemaProvisioner, + aclProvisioner); + + // Then: + verify(aclProvisioner).provision(anyBoolean(), anyBoolean(), any(), eq("bob"), any()); + } + + @Test + void shouldValidateSpec() { + // Given: + final Provisioner provisioner = minimalBuilder().build(); + + // When: + provisioner.provision( + adminFactory, + srClientFactory, + specLoader, + topicProvisioner, + schemaProvisioner, + aclProvisioner); + + // Then: + verify(spec.apiSpec()).validate(); + } + + @Test + void shouldSupportExplicitAdminClient() { + // Given: + final Admin userAdmin = mock(); + final Provisioner provisioner = minimalBuilder().adminClient(userAdmin).build(); + + // When: + provisioner.provision( + adminFactory, + srClientFactory, + specLoader, + topicProvisioner, + schemaProvisioner, + aclProvisioner); + + // Then: + verify(topicProvisioner).provision(anyBoolean(), anyBoolean(), any(), eq(userAdmin)); + verify(aclProvisioner).provision(anyBoolean(), anyBoolean(), any(), any(), eq(userAdmin)); + } + + @Test + void shouldCloseAdminClient() { + // Given: + final Provisioner provisioner = minimalBuilder().build(); + + // When: + provisioner.provision( + adminFactory, + srClientFactory, + specLoader, + topicProvisioner, + schemaProvisioner, + aclProvisioner); + + // Then: + verify(adminClient).close(); + } + + @Test + void shouldNotCloseUserSuppliedAdminClientByDefault() { + // Given: + final Admin userAdmin = mock(); + final Provisioner provisioner = minimalBuilder().adminClient(userAdmin).build(); + + // When: + provisioner.provision( + adminFactory, + srClientFactory, + specLoader, + topicProvisioner, + schemaProvisioner, + aclProvisioner); + + // Then: + verify(userAdmin, never()).close(); + } + + @Test + void shouldCloseUserSuppliedAdminClient() { + // Given: + final Admin userAdmin = mock(); + final Provisioner provisioner = + minimalBuilder().adminClient(userAdmin).closeAdminClient(true).build(); + + // When: + provisioner.provision( + adminFactory, + srClientFactory, + specLoader, + topicProvisioner, + schemaProvisioner, + aclProvisioner); + + // Then: + verify(userAdmin).close(); + } + + @Test + void shouldSupportExplicitSrClient() { + // Given: + final SchemaRegistryClient client = mock(); + final Provisioner provisioner = minimalBuilder().schemaRegistryClient(client).build(); + + // When: + provisioner.provision( + adminFactory, + srClientFactory, + specLoader, + topicProvisioner, + schemaProvisioner, + aclProvisioner); + + // Then: + verify(schemaProvisioner).provision(anyBoolean(), anyBoolean(), any(), any(), eq(client)); + } + + @Test + void shouldCloseSrClient() throws Exception { + // Given: + final Provisioner provisioner = minimalBuilder().build(); + + // When: + provisioner.provision( + adminFactory, + srClientFactory, + specLoader, + topicProvisioner, + schemaProvisioner, + aclProvisioner); + + // Then: + verify(srClient).close(); + } + + @Test + void shouldNotCloseUserSuppliedSrClientByDefault() throws Exception { + // Given: + final SchemaRegistryClient client = mock(); + final Provisioner provisioner = minimalBuilder().schemaRegistryClient(client).build(); + + // When: + provisioner.provision( + adminFactory, + srClientFactory, + specLoader, + topicProvisioner, + schemaProvisioner, + aclProvisioner); + + // Then: + verify(client, never()).close(); + } + + @Test + void shouldCloseUserSuppliedSrClient() throws Exception { + // Given: + final SchemaRegistryClient client = mock(); + final Provisioner provisioner = + minimalBuilder().schemaRegistryClient(client).closeSchemaClient(true).build(); + + // When: + provisioner.provision( + adminFactory, + srClientFactory, + specLoader, + topicProvisioner, + schemaProvisioner, + aclProvisioner); // Then: - verify(provisionMethod) - .provision( - anyBoolean(), - anyBoolean(), - anyBoolean(), - any(), - eq("schema-path"), - any(), - any()); + verify(client).close(); } private static Provisioner.ProvisionerBuilder minimalBuilder() {