Skip to content

Commit

Permalink
Support generated usernames (#341)
Browse files Browse the repository at this point in the history
* Support generated usernames

Some platforms do not allow the administrator to control / choose the name of the principle. Instead, it is a generated name.

For example, Confluent Cloud when using Service Accounts, uses the system generated account id as the principle name, e.g. `sa-l35n4kd`

Specmesh now supports tweaking the ACLs created to use the system generated principle name via:

1. the `Provisioner.builder().domainUserAlias("bob")` method.
2. the `--domain-user` or `-du` command line parameters to `Provision` command.


---------

Co-authored-by: Andrew Coates <[email protected]>
  • Loading branch information
big-andy-coates and Andrew Coates authored Jun 20, 2024
1 parent 4ce4874 commit 746207a
Show file tree
Hide file tree
Showing 10 changed files with 539 additions and 203 deletions.
28 changes: 19 additions & 9 deletions cli/src/main/java/io/specmesh/cli/Provision.java
Original file line number Diff line number Diff line change
Expand Up @@ -64,14 +64,8 @@ public static void main(final String[] args) {
+ "` from:"
+ new File(propertyFilename).getAbsolutePath());
properties.load(fis);
properties
.entrySet()
.forEach(
entry -> {
properties.put(
entry.getKey().toString().replace(".", "-"),
entry.getValue());
});
properties.forEach(
(key, value) -> properties.put(key.toString().replace(".", "-"), value));
System.out.println(
"Loaded `properties` from cwd:" + new File(propertyFilename).getAbsolutePath());
} catch (IOException e) {
Expand All @@ -82,7 +76,7 @@ public static void main(final String[] args) {
+ new File(propertyFilename).getAbsolutePath()
+ "\nERROR:"
+ e);
e.printStackTrace();
throw new RuntimeException(e);
}

final var provider = new CommandLine.PropertiesDefaultProvider(properties);
Expand Down Expand Up @@ -146,6 +140,22 @@ public void spec(final String path) {
builder.specPath(path);
}

@Option(
names = {"-du", "--domain-user"},
description =
"optional custom domain user, to be used when creating ACLs. By default,"
+ " specmesh expects the principle used to authenticate with Kafka to have"
+ " the same name as the domain id. For example, given a domain id of"
+ " 'urn:acme.products', specmesh expects the user to be called"
+ " 'acme.products', and creates ACLs accordingly. In some situations, e.g."
+ " Confluent Cloud Service Accounts, the username is system generated or"
+ " outside control of administrators. In these situations, use this"
+ " option to provide the generated username and specmesh will provision"
+ " ACLs accordingly.")
public void domainUserAlias(final String alias) {
builder.domainUserAlias(alias);
}

@Option(
names = {"-u", "--username"},
description = "username or api key for the Kafka cluster connection")
Expand Down
12 changes: 5 additions & 7 deletions kafka/src/main/java/io/specmesh/kafka/Clients.java
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ public static Map<String, Object> clientSaslAuthProperties(
* @return true if principal was set
*/
private static boolean isPrincipalSpecified(final String principal) {
return principal != null && principal.length() > 0;
return principal != null && !principal.isEmpty();
}

private static String buildJaasConfig(final String userName, final String password) {
Expand Down Expand Up @@ -226,7 +226,7 @@ public static Map<String, Object> producerProperties(
final Class<?> valueSerializerClass,
final boolean acksAll,
final Map<String, Object>... additionalProperties) {
final Map<String, Object> props = clientProperties(domainId, bootstrapServers);
final Map<String, Object> props = clientProperties(bootstrapServers);
props.putAll(
Map.of(
AdminClientConfig.CLIENT_ID_CONFIG,
Expand Down Expand Up @@ -279,7 +279,7 @@ public static Map<String, Object> kstreamsProperties(
final boolean acksAll,
final Map<String, Object>... additionalProperties) {

final Map<String, Object> props = clientProperties(domainId, bootstrapServers);
final Map<String, Object> props = clientProperties(bootstrapServers);
props.putAll(
Map.of(
StreamsConfig.APPLICATION_ID_CONFIG,
Expand Down Expand Up @@ -354,7 +354,7 @@ public static Map<String, Object> consumerProperties(
final Class<?> valueDeserializerClass,
final boolean autoOffsetResetEarliest,
final Map<String, Object>... additionalProperties) {
final Map<String, Object> props = clientProperties(domainId, bootstrapServers);
final Map<String, Object> props = clientProperties(bootstrapServers);
props.putAll(
Map.of(
ConsumerConfig.CLIENT_ID_CONFIG,
Expand All @@ -375,10 +375,8 @@ public static Map<String, Object> consumerProperties(
return props;
}

private static Map<String, Object> clientProperties(
final String domainId, final String bootstrapServers) {
private static Map<String, Object> clientProperties(final String bootstrapServers) {
final Map<String, Object> basicProps = new HashMap<>();
basicProps.put(AdminClientConfig.CLIENT_ID_CONFIG, domainId);
basicProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
return basicProps;
}
Expand Down
70 changes: 46 additions & 24 deletions kafka/src/main/java/io/specmesh/kafka/KafkaApiSpec.java
Original file line number Diff line number Diff line change
Expand Up @@ -113,15 +113,19 @@ public List<NewTopic> listDomainOwnedTopics() {
*/
@Deprecated
public List<AclBinding> listACLsForDomainOwnedTopics() {
return listACLsForDomainOwnedTopics(id());
}

private List<AclBinding> listACLsForDomainOwnedTopics(final String user) {
validateTopicConfig();

final List<AclBinding> topicAcls = new ArrayList<>();
topicAcls.addAll(ownTopicAcls());
topicAcls.addAll(ownTransactionIdsAcls());
topicAcls.addAll(ownTopicAcls(user));
topicAcls.addAll(ownTransactionIdsAcls(user));
topicAcls.addAll(publicTopicAcls());
topicAcls.addAll(protectedTopicAcls());
topicAcls.addAll(privateTopicAcls());
topicAcls.addAll(prefixedAcls(CLUSTER, CLUSTER_NAME, principal(), IDEMPOTENT_WRITE));
topicAcls.addAll(privateTopicAcls(user));
topicAcls.addAll(prefixedAcls(CLUSTER, CLUSTER_NAME, principal(user), IDEMPOTENT_WRITE));
return topicAcls;
}

Expand All @@ -142,9 +146,33 @@ public List<AclBinding> listACLsForDomainOwnedTopics() {
* @return returns the set of required acls.
*/
public Set<AclBinding> requiredAcls() {
return requiredAcls(id());
}

/**
* Get the set of required ACLs for this domain spec, supplying a custom username.
*
* <p>Standard convention is to use the domain id for the username. Only use an alternative name
* if required.
*
* <p>This includes {@code ALLOW} ACLs for:
*
* <ul>
* <li>Everyone to consume the spec's public topics
* <li>Specifically configured domains to consume the spec's protected topics
* <li>The spec's domain to be able to create ad-hoc private topics
* <li>The spec's domain to produce and consume its topics
* <li>The spec's domain to use its own consumer groups
* <li>The spec's domain to use its own transaction ids
* </ul>
*
* @param userName the username to use as the principal in the acl bindings.
* @return returns the set of required acls.
*/
public Set<AclBinding> requiredAcls(final String userName) {
final Set<AclBinding> acls = new HashSet<>();
acls.addAll(ownGroupAcls());
acls.addAll(listACLsForDomainOwnedTopics());
acls.addAll(ownGroupAcls(userName));
acls.addAll(listACLsForDomainOwnedTopics(userName));
acls.addAll(grantAccessControlUsingGrantTagOnly());
return acls;
}
Expand Down Expand Up @@ -199,13 +227,7 @@ public Stream<SchemaInfo> topicSchemas(final String topicName) {
.flatMap(Optional::stream);
}

/**
* Format the principal
*
* @param domainIdAsUsername the domain id
* @return the principal
*/
public static String formatPrincipal(final String domainIdAsUsername) {
private static String formatPrincipal(final String domainIdAsUsername) {
return domainIdAsUsername.equals(PUBLIC) ? "User:*" : "User:" + domainIdAsUsername;
}

Expand All @@ -229,20 +251,20 @@ private void validateTopicConfig() {
});
}

private String principal() {
return formatPrincipal(id());
private String principal(final String user) {
return formatPrincipal(user);
}

private Set<AclBinding> ownGroupAcls() {
return prefixedAcls(GROUP, id(), principal(), READ);
private Set<AclBinding> ownGroupAcls(final String user) {
return prefixedAcls(GROUP, id(), principal(user), READ);
}

private Set<AclBinding> ownTopicAcls() {
return prefixedAcls(TOPIC, id(), principal(), DESCRIBE, READ, WRITE);
private Set<AclBinding> ownTopicAcls(final String user) {
return prefixedAcls(TOPIC, id(), principal(user), DESCRIBE, READ, WRITE);
}

private Set<AclBinding> ownTransactionIdsAcls() {
return prefixedAcls(TRANSACTIONAL_ID, id(), principal(), DESCRIBE, WRITE);
private Set<AclBinding> ownTransactionIdsAcls(final String user) {
return prefixedAcls(TRANSACTIONAL_ID, id(), principal(user), DESCRIBE, WRITE);
}

private Set<AclBinding> publicTopicAcls() {
Expand Down Expand Up @@ -310,7 +332,7 @@ private List<AclBinding> grantAccessControlUsingGrantTagOnly() {
/**
* the path is using public,private explicit based control
*
* @param key
* @param key resource name
* @return true if it is
*/
private boolean isUsingPathPerms(final String key) {
Expand All @@ -319,8 +341,8 @@ private boolean isUsingPathPerms(final String key) {
|| key.startsWith(id() + DELIMITER + PUBLIC + DELIMITER);
}

private Set<AclBinding> privateTopicAcls() {
return prefixedAcls(TOPIC, id() + DELIMITER + PRIVATE, principal(), CREATE);
private Set<AclBinding> privateTopicAcls(final String user) {
return prefixedAcls(TOPIC, id() + DELIMITER + PRIVATE, principal(user), CREATE);
}

private static Set<AclBinding> literalAcls(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,36 @@ public final class AclProvisioner {

/** defensive */
private AclProvisioner() {}

/**
* Provision acls in the Kafka cluster
*
* @param dryRun for mode of operation
* @param cleanUnspecified remove unwanted
* @param apiSpec respect the spec
* @param adminClient cluster connection
* @return status of provisioning
* @throws ProvisioningException on interrupt
* @deprecated use {@link AclProvisioner#provision(boolean, boolean, KafkaApiSpec, String,
* Admin)}
*/
@Deprecated(forRemoval = true, since = "0.10.1")
public static Collection<Acl> provision(
final boolean dryRun,
final boolean cleanUnspecified,
final KafkaApiSpec apiSpec,
final Admin adminClient) {

return provision(dryRun, cleanUnspecified, apiSpec, apiSpec.id(), adminClient);
}

/**
* Provision acls in the Kafka cluster
*
* @param dryRun for mode of operation
* @param cleanUnspecified remove unwanted
* @param apiSpec respect the spec
* @param userName user name
* @param adminClient cluster connection
* @return status of provisioning
* @throws ProvisioningException on interrupt
Expand All @@ -49,9 +73,10 @@ public static Collection<Acl> provision(
final boolean dryRun,
final boolean cleanUnspecified,
final KafkaApiSpec apiSpec,
final String userName,
final Admin adminClient) {

final var requiredAcls = bindingsToAcls(apiSpec.requiredAcls());
final var requiredAcls = bindingsToAcls(apiSpec.requiredAcls(userName));
final var existing = reader(adminClient).read(apiSpec.id(), requiredAcls);

final var required = calculator(cleanUnspecified).calculate(existing, requiredAcls);
Expand Down
Loading

0 comments on commit 746207a

Please sign in to comment.