Skip to content

Commit

Permalink
Sasl docker network (#387)
Browse files Browse the repository at this point in the history
* Enable SASL on internal listener when enabling SASL.

The broker created by the `DockerKafkaEnvironment` test utility has two listeners: one used by other containers within the Docker network (`BROKER`) and one used by test code (`PLAINTEXT`: a terribly confusing name!).

When SASL is enabled, previously only the `PLAINTEXT` was being switched to have SASL enabled with `SASL_PLAINTEXT`. The docker network listener, `BROKER`, was left as `PLAINTEXT`.

This meant that any other Docker container, including Schema Registry, would connect over an unauthenticated connection and be assigned the `ANONYMOUS` user, which was added as a super-user.

This is bad!

If people want to test that their services and components work and they're running them in Docker, then those components need to connect to an endpoint that requires authentication and, hence, has ACLs applied.

This PR enables SASL on the docker network listener `BROKER` when SASL is enabled, and only adds the `ANONYMOUS` super-user if SASL is not enabled.

* Fix warning

* Fix spelling of schema registry

---------

Co-authored-by: Andy Coates <[email protected]>
  • Loading branch information
big-andy-coates and Andy Coates authored Sep 19, 2024
1 parent 72bc79e commit b475dbf
Show file tree
Hide file tree
Showing 7 changed files with 112 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ void shouldProvisionTopicsAndAclResourcesWithSchemas() throws Exception {
+ " --username admin"
+ " --secret admin-secret"
+ " --schema-registry "
+ KAFKA_ENV.schemeRegistryServer()
+ KAFKA_ENV.schemaRegistryServer()
+ " --schema-path ./resources"
+ " -Dsome.property=testOne"
+ " -Dsome.other.property=testTwo")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -341,7 +341,7 @@ private static <V> Consumer<Long, V> consumer(
API_SPEC.id(),
"unique-service-id",
KAFKA_ENV.kafkaBootstrapServers(),
KAFKA_ENV.schemeRegistryServer(),
KAFKA_ENV.schemaRegistryServer(),
LongDeserializer.class,
valueDeserializer,
true,
Expand Down Expand Up @@ -369,7 +369,7 @@ private static <V> Producer<Long, V> producer(
API_SPEC.id(),
UUID.randomUUID().toString(),
KAFKA_ENV.kafkaBootstrapServers(),
KAFKA_ENV.schemeRegistryServer(),
KAFKA_ENV.schemaRegistryServer(),
LongSerializer.class,
valueSerializer,
false,
Expand Down
110 changes: 94 additions & 16 deletions kafka-test/src/main/java/io/specmesh/kafka/DockerKafkaEnvironment.java
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,13 @@
* .build();
* }</pre>
*
* The `KAFKA_ENV` can then be queried for the {@link #kafkaBootstrapServers() Kafka endpoint} and
* ths {@link #schemeRegistryServer() Schema Registry endpoint}.
* <p>The `KAFKA_ENV` can then be queried for the {@link #kafkaBootstrapServers() Kafka endpoint}
 * and the {@link #schemaRegistryServer() Schema Registry endpoint}, which can be used to
* communicate with the services from within test code.
*
* <p>Use {@link #dockerNetworkKafkaBootstrapServers()} and {@link
* #dockerNetworkSchemaRegistryServer()} to configure other Docker instances to communicate with
* Kafka and the Schema Registry, respectively.
*/
@SuppressWarnings("OptionalUsedAsFieldOrParameterType")
public final class DockerKafkaEnvironment
Expand All @@ -77,6 +82,9 @@ public final class DockerKafkaEnvironment
AfterEachCallback,
AfterAllCallback {

/** The port to use when connecting to Kafka from inside the Docker network */
public static final int KAFKA_DOCKER_NETWORK_PORT = 9092;

private final int startUpAttempts;
private final Duration startUpTimeout;
private final DockerImageName kafkaDockerImage;
Expand All @@ -91,7 +99,7 @@ public final class DockerKafkaEnvironment
private KafkaContainer kafkaBroker;
private SchemaRegistryContainer schemaRegistry;
private boolean invokedStatically = false;
private AtomicInteger setUpCount = new AtomicInteger(1);
private final AtomicInteger setUpCount = new AtomicInteger(1);

/**
* @return returns a {@link Builder} instance to allow customisation of the environment.
Expand Down Expand Up @@ -152,16 +160,57 @@ public void afterAll(final ExtensionContext context) {
tearDown();
}

/**
 * Gets the Kafka bootstrap servers for use by test code running on the host.
 *
 * @return bootstrap servers for connecting to Kafka from outside the Docker network, i.e. from
 *     test code
 */
@Override
public String kafkaBootstrapServers() {
return kafkaBroker.getBootstrapServers();
}

/**
 * Gets the Kafka bootstrap servers to use from other containers on the same Docker network.
 *
 * @return bootstrap servers for connecting to Kafka from inside the Docker network
 */
public String dockerNetworkKafkaBootstrapServers() {
    // The docker-network listener runs SASL whenever an admin user has been configured:
    final boolean saslEnabled = adminUser.isPresent();
    final String networkAlias = kafkaBroker.getNetworkAliases().get(0);
    return (saslEnabled ? "SASL_PLAINTEXT" : "PLAINTEXT")
            + "://"
            + networkAlias
            + ":"
            + KAFKA_DOCKER_NETWORK_PORT;
}

/**
 * Gets the Schema Registry endpoint for use by test code running on the host.
 *
 * @return URL for connecting to Schema Registry from outside the Docker network, i.e. from
 *     test code
 * @deprecated use {@link #schemaRegistryServer()} instead.
 */
@SuppressWarnings("deprecation")
@Deprecated
@Override
public String schemeRegistryServer() {
// Delegate to the correctly-spelled replacement:
return schemaRegistryServer();
}

/**
 * Gets the Schema Registry endpoint for use by test code running on the host.
 *
 * @return URL for connecting to Schema Registry from outside the Docker network,
 *     i.e. from test code
 */
@Override
public String schemaRegistryServer() {
return schemaRegistry.hostNetworkUrl().toString();
}

/**
 * Gets the Schema Registry endpoint to use from other containers on the same Docker network.
 *
 * @return URL for connecting to Schema Registry from inside the Docker network
 */
public String dockerNetworkSchemaRegistryServer() {
    final String networkAlias = schemaRegistry.getNetworkAliases().get(0);
    return "http://" + networkAlias + ":" + SchemaRegistryContainer.SCHEMA_REGISTRY_PORT;
}

@Override
public Admin adminClient() {
final Map<String, Object> properties = new HashMap<>();
Expand Down Expand Up @@ -206,10 +255,14 @@ private void setUp() {
image ->
schemaRegistry =
new SchemaRegistryContainer(srDockerImage.get())
.withKafka(kafkaBroker)
.withNetwork(network)
.dependsOn(kafkaBroker)
.withNetworkAliases("schema-registry")
.withStartupAttempts(startUpAttempts)
.withStartupTimeout(startUpTimeout)
.withEnv(
"SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS",
dockerNetworkKafkaBootstrapServers())
.withEnv(srEnv))
.map(container -> (Startable) container)
.orElse(kafkaBroker);
Expand Down Expand Up @@ -252,15 +305,22 @@ private void installAcls() {
}
}

/**
 * Gets the Kafka bootstrap servers to use from other containers on the same Docker network.
 *
 * @deprecated use {@link #dockerNetworkKafkaBootstrapServers}
 * @return docker network bootstrap servers
 */
@Deprecated
public String testNetworkKafkaBootstrapServers() {
// Delegate so the returned protocol correctly reflects whether SASL is enabled,
// rather than hard-coding PLAINTEXT:
return dockerNetworkKafkaBootstrapServers();
}

/**
 * Gets the Schema Registry endpoint to use from other containers on the same Docker network.
 *
 * @deprecated use {@link #dockerNetworkSchemaRegistryServer}
 * @return docker network schema registry endpoint
 */
@Deprecated
public String testNetworkSchemeRegistryServer() {
// Delegate to the replacement method to keep a single source of truth for the URL:
return dockerNetworkSchemaRegistryServer();
}

/** Builder of {@link DockerKafkaEnvironment}. */
Expand Down Expand Up @@ -401,6 +461,11 @@ public Builder withSchemaRegistryEnv(final Map<String, String> env) {
*
* <p>An {@code admin} user will be created
*
* <p>Enables SASL for both host and docker listeners, i.e. both for the listener test code
* will connect to and other Docker containers will connect to, including Schema Registry
* and other Kafka brokers. Schema Registry is correctly configured to connect to Kafka
* using the supplied {@code adminUser} and {@code adminPassword}
*
* @param adminUser name of the admin user.
* @param adminPassword password for the admin user.
* @param additionalUsers additional usernames and passwords or api-keys and tokens.
Expand Down Expand Up @@ -476,8 +541,9 @@ private void maybeEnableAcls() {
return;
}

final String adminUser = adminUser().map(u -> "User:" + u.userName + ";").orElse("");
withKafkaEnv("KAFKA_SUPER_USERS", adminUser + "User:ANONYMOUS");
final String adminUser =
adminUser().map(u -> "User:" + u.userName).orElse("User:ANONYMOUS");
withKafkaEnv("KAFKA_SUPER_USERS", adminUser);
withKafkaEnv("KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND", "false");
withKafkaEnv("KAFKA_AUTHORIZER_CLASS_NAME", "kafka.security.authorizer.AclAuthorizer");
}
Expand All @@ -488,13 +554,25 @@ private void maybeEnableSasl() {
return;
}

final String jaasConfig = buildJaasConfig(adminUser.get());

withKafkaEnv(
"KAFKA_LISTENER_SECURITY_PROTOCOL_MAP",
"BROKER:PLAINTEXT,PLAINTEXT:SASL_PLAINTEXT");
withKafkaEnv(
"KAFKA_LISTENER_NAME_PLAINTEXT_PLAIN_SASL_JAAS_CONFIG",
buildJaasConfig(adminUser.get()));
"BROKER:SASL_PLAINTEXT,PLAINTEXT:SASL_PLAINTEXT");
withKafkaEnv("KAFKA_LISTENER_NAME_BROKER_SASL_ENABLED_MECHANISMS", "PLAIN");
withKafkaEnv("KAFKA_LISTENER_NAME_PLAINTEXT_SASL_ENABLED_MECHANISMS", "PLAIN");
withKafkaEnv("KAFKA_LISTENER_NAME_BROKER_PLAIN_SASL_JAAS_CONFIG", jaasConfig);
withKafkaEnv("KAFKA_LISTENER_NAME_PLAINTEXT_PLAIN_SASL_JAAS_CONFIG", jaasConfig);
withKafkaEnv("KAFKA_SASL_MECHANISM_INTER_BROKER_PROTOCOL", "PLAIN");

Clients.clientSaslAuthProperties(adminUser.get().userName, adminUser.get().password)
.forEach(
(key, value) -> {
withSchemaRegistryEnv(
"SCHEMA_REGISTRY_KAFKASTORE_"
+ key.toUpperCase().replaceAll("\\.", "_"),
value.toString());
});
}

private String buildJaasConfig(final Credentials adminUser) {
Expand Down Expand Up @@ -522,7 +600,7 @@ private static class Credentials {

public CachedSchemaRegistryClient srClient() {
return new CachedSchemaRegistryClient(
schemeRegistryServer(),
schemaRegistryServer(),
5,
List.of(
new ProtobufSchemaProvider(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,16 @@ public interface KafkaEnvironment extends Extension {

/**
* @return Connection string for connecting to Schema Registry.
* @deprecated use {@link #schemaRegistryServer}
*/
@Deprecated
String schemeRegistryServer();

/**
* @return Connection string for connecting to Schema Registry.
*/
String schemaRegistryServer();

/**
* @return Returns an admin client for the Kafka cluster. Caller is responsible for closing.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

package io.specmesh.kafka.schema;

import static io.specmesh.kafka.DockerKafkaEnvironment.KAFKA_DOCKER_NETWORK_PORT;

import java.net.MalformedURLException;
import java.net.URL;
import org.testcontainers.containers.GenericContainer;
Expand All @@ -25,7 +27,6 @@
/** Test container for the Schema Registry */
public final class SchemaRegistryContainer extends GenericContainer<SchemaRegistryContainer> {

private static final int KAFKA_INSECURE_BROKER_CONNECTION = 9092;
private static final DockerImageName DEFAULT_IMAGE_NAME =
DockerImageName.parse("confluentinc/cp-schema-registry:7.5.3");

Expand Down Expand Up @@ -73,7 +74,7 @@ public SchemaRegistryContainer withKafka(final KafkaContainer kafka) {
"PLAINTEXT://"
+ kafka.getNetworkAliases().get(0)
+ ":"
+ KAFKA_INSECURE_BROKER_CONNECTION);
+ KAFKA_DOCKER_NETWORK_PORT);
dependsOn(kafka);
return this;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,7 @@ private static <V> Consumer<Long, V> consumer(
API_SPEC.id(),
UUID.randomUUID().toString(),
KAFKA_ENV.kafkaBootstrapServers(),
KAFKA_ENV.schemeRegistryServer(),
KAFKA_ENV.schemaRegistryServer(),
LongDeserializer.class,
valueDeserializer,
false,
Expand Down Expand Up @@ -332,7 +332,7 @@ private static <V> Producer<Long, V> producer(
API_SPEC.id(),
UUID.randomUUID().toString(),
KAFKA_ENV.kafkaBootstrapServers(),
KAFKA_ENV.schemeRegistryServer(),
KAFKA_ENV.schemaRegistryServer(),
LongSerializer.class,
valueSerializer,
false,
Expand Down Expand Up @@ -382,7 +382,7 @@ private AutoCloseable streamsApp(
API_SPEC.id(),
"streams-appid-service-thing",
KAFKA_ENV.kafkaBootstrapServers(),
KAFKA_ENV.schemeRegistryServer(),
KAFKA_ENV.schemaRegistryServer(),
Serdes.LongSerde.class,
KafkaProtobufSerde.class,
false,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ private static <V> Consumer<Long, V> consumer(
API_SPEC.id(),
UUID.randomUUID().toString(),
KAFKA_ENV.kafkaBootstrapServers(),
KAFKA_ENV.schemeRegistryServer(),
KAFKA_ENV.schemaRegistryServer(),
LongDeserializer.class,
valueDeserializer,
true,
Expand Down Expand Up @@ -168,7 +168,7 @@ private static <V> Producer<Long, V> producer(
API_SPEC.id(),
UUID.randomUUID().toString(),
KAFKA_ENV.kafkaBootstrapServers(),
KAFKA_ENV.schemeRegistryServer(),
KAFKA_ENV.schemaRegistryServer(),
LongSerializer.class,
valueSerializer,
false,
Expand Down

0 comments on commit b475dbf

Please sign in to comment.