From 41e5066adcf0cee25e2bc6f2042acdcee8a725e3 Mon Sep 17 00:00:00 2001 From: Truc Nguyen Date: Fri, 10 Jan 2025 12:50:57 +0000 Subject: [PATCH 1/6] Copy createBrokerConfig from AK to kafka rest to avoid depending on upstream --- .../integration/AuthorizationErrorTest.java | 2 +- .../integration/ClusterTestHarness.java | 168 +++++++++++++++++- .../kafkarest/testing/KafkaBrokerFixture.java | 3 +- 3 files changed, 170 insertions(+), 3 deletions(-) diff --git a/kafka-rest/src/test/java/io/confluent/kafkarest/integration/AuthorizationErrorTest.java b/kafka-rest/src/test/java/io/confluent/kafkarest/integration/AuthorizationErrorTest.java index 45fcef6d22..deebe7eefe 100644 --- a/kafka-rest/src/test/java/io/confluent/kafkarest/integration/AuthorizationErrorTest.java +++ b/kafka-rest/src/test/java/io/confluent/kafkarest/integration/AuthorizationErrorTest.java @@ -97,7 +97,7 @@ protected Properties getBrokerProperties(int i) { saslProps.setProperty("sasl.mechanism.inter.broker.protocol", "PLAIN"); Option saslProperties = Option.apply(saslProps); Properties brokerProps = - kafka.utils.TestUtils.createBrokerConfig( + createBrokerConfig( 0, false, false, diff --git a/kafka-rest/src/test/java/io/confluent/kafkarest/integration/ClusterTestHarness.java b/kafka-rest/src/test/java/io/confluent/kafkarest/integration/ClusterTestHarness.java index 2dd14ac2f4..6388075c17 100644 --- a/kafka-rest/src/test/java/io/confluent/kafkarest/integration/ClusterTestHarness.java +++ b/kafka-rest/src/test/java/io/confluent/kafkarest/integration/ClusterTestHarness.java @@ -34,11 +34,13 @@ import io.confluent.kafkarest.KafkaRestConfig; import io.confluent.kafkarest.common.CompletableFutures; import io.confluent.rest.RestConfig; +import java.io.File; import java.io.IOException; import java.net.ServerSocket; import java.net.URI; import java.net.URISyntaxException; import java.time.Duration; +import java.util.AbstractMap.SimpleEntry; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -60,6 +62,7 @@ import javax.ws.rs.client.ClientBuilder; import javax.ws.rs.client.Invocation; import javax.ws.rs.client.WebTarget; +import kafka.security.JaasTestUtils; import kafka.server.KafkaBroker; import kafka.server.KafkaConfig; import kafka.server.QuorumTestHarness; @@ -81,11 +84,21 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.acl.AclBinding; import org.apache.kafka.common.config.ConfigResource; +import org.apache.kafka.common.network.ConnectionMode; import org.apache.kafka.common.network.ListenerName; import org.apache.kafka.common.security.auth.SecurityProtocol; import org.apache.kafka.common.serialization.ByteArraySerializer; import org.apache.kafka.common.serialization.Deserializer; import org.apache.kafka.common.utils.Time; +import org.apache.kafka.coordinator.group.GroupCoordinatorConfig; +import org.apache.kafka.network.SocketServerConfigs; +import org.apache.kafka.raft.QuorumConfig; +import org.apache.kafka.server.config.DelegationTokenManagerConfigs; +import org.apache.kafka.server.config.KRaftConfigs; +import org.apache.kafka.server.config.ReplicationConfigs; +import org.apache.kafka.server.config.ServerConfigs; +import org.apache.kafka.server.config.ServerLogConfigs; +import org.apache.kafka.storage.internals.log.CleanerConfig; import org.eclipse.jetty.server.RequestLog; import org.eclipse.jetty.server.Server; import org.glassfish.jersey.apache.connector.ApacheConnectorProvider; @@ -100,6 +113,7 @@ import scala.Option; import scala.collection.JavaConverters; import scala.collection.Seq; +import scala.jdk.javaapi.OptionConverters; /** * Test harness to run against a real, local Kafka cluster and REST proxy. This is essentially @@ -396,7 +410,7 @@ protected void overrideKafkaRestConfigs(Properties restProperties) {} protected Properties getBrokerProperties(int i) { Properties props = - TestUtils.createBrokerConfig( + createBrokerConfig( i, false, false, @@ -424,6 +438,158 @@ protected Properties getBrokerProperties(int i) { return props; } + private static boolean shouldEnable( + Option interBrokerSecurityProtocol, SecurityProtocol protocol) { + if (interBrokerSecurityProtocol.isDefined()) { + return interBrokerSecurityProtocol.get() == protocol; + } + return false; + } + + public static Properties createBrokerConfig( + int nodeId, + boolean enableControlledShutdown, + boolean enableDeleteTopic, + int port, + Option interBrokerSecurityProtocol, + Option trustStoreFile, + Option saslProperties, + boolean enablePlaintext, + boolean enableSaslPlaintext, + int saslPlaintextPort, + boolean enableSsl, + int sslPort, + boolean enableSaslSsl, + int saslSslPort, + Option rack, + int logDirCount, + boolean enableToken, + int numPartitions, + short defaultReplicationFactor, + boolean enableFetchFromFollower) { + List> protocolAndPorts = new ArrayList<>(); + if (enablePlaintext || shouldEnable(interBrokerSecurityProtocol, SecurityProtocol.PLAINTEXT)) { + protocolAndPorts.add(new SimpleEntry<>(SecurityProtocol.PLAINTEXT, port)); + } + if (enableSsl || shouldEnable(interBrokerSecurityProtocol, SecurityProtocol.SSL)) { + protocolAndPorts.add(new SimpleEntry<>(SecurityProtocol.SSL, sslPort)); + } + if (enableSaslPlaintext + || shouldEnable(interBrokerSecurityProtocol, SecurityProtocol.SASL_PLAINTEXT)) { + protocolAndPorts.add(new SimpleEntry<>(SecurityProtocol.SASL_PLAINTEXT, saslPlaintextPort)); + } + if (enableSaslSsl || shouldEnable(interBrokerSecurityProtocol, SecurityProtocol.SASL_SSL)) { + protocolAndPorts.add(new SimpleEntry<>(SecurityProtocol.SASL_SSL, saslSslPort)); + } + + String listeners = + protocolAndPorts.stream() + .map(a -> String.format("%s://localhost:%d", a.getKey().name, a.getValue())) + .collect(Collectors.joining(",")); + + Properties props = new Properties(); + props.put(ServerConfigs.UNSTABLE_FEATURE_VERSIONS_ENABLE_CONFIG, "true"); + props.put(ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG, "true"); + props.setProperty( + KRaftConfigs.SERVER_MAX_STARTUP_TIME_MS_CONFIG, + String.valueOf(TimeUnit.MINUTES.toMillis(10))); + props.put(KRaftConfigs.NODE_ID_CONFIG, nodeId); + props.put(ServerConfigs.BROKER_ID_CONFIG, nodeId); + props.put(SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG, listeners); + props.put(SocketServerConfigs.LISTENERS_CONFIG, listeners); + props.put(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, "CONTROLLER"); + props.put( + SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG, + protocolAndPorts.stream() + .map(p -> String.format("%s:%s", p.getKey(), p.getKey())) + .collect(Collectors.joining(",")) + + ",CONTROLLER:PLAINTEXT"); + + if (logDirCount > 1) { + String logDirs = + IntStream.rangeClosed(1, logDirCount) + .mapToObj( + i -> { + // We would like to allow user to specify both relative path and absolute path + // as log directory for backward-compatibility reason + // We can verify this by using a mixture of relative path and absolute path as + // log directories in the test + return (i % 2 == 0) + ? TestUtils.tempDir().getAbsolutePath() + : TestUtils.tempRelativeDir("data").getAbsolutePath(); + }) + .collect(Collectors.joining(",")); + props.put(ServerLogConfigs.LOG_DIRS_CONFIG, logDirs); + } else { + props.put(ServerLogConfigs.LOG_DIR_CONFIG, TestUtils.tempDir().getAbsolutePath()); + } + props.put(KRaftConfigs.PROCESS_ROLES_CONFIG, "broker"); + // Note: this is just a placeholder value for controller.quorum.voters. JUnit + // tests use random port assignment, so the controller ports are not known ahead of + // time. Therefore, we ignore controller.quorum.voters and use + // controllerQuorumVotersFuture instead. + props.put(QuorumConfig.QUORUM_VOTERS_CONFIG, "1000@localhost:0"); + props.put(ReplicationConfigs.REPLICA_SOCKET_TIMEOUT_MS_CONFIG, "1500"); + props.put(ReplicationConfigs.CONTROLLER_SOCKET_TIMEOUT_MS_CONFIG, "1500"); + props.put(ServerConfigs.CONTROLLED_SHUTDOWN_ENABLE_CONFIG, enableControlledShutdown); + props.put(ServerConfigs.DELETE_TOPIC_ENABLE_CONFIG, enableDeleteTopic); + props.put(ServerLogConfigs.LOG_DELETE_DELAY_MS_CONFIG, "1000"); + props.put(CleanerConfig.LOG_CLEANER_DEDUPE_BUFFER_SIZE_PROP, "2097152"); + props.put(GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, "1"); + props.put(ServerLogConfigs.LOG_INITIAL_TASK_DELAY_MS_CONFIG, "100"); + if (!props.containsKey(GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG)) { + props.put(GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, "5"); + } + if (!props.containsKey(GroupCoordinatorConfig.GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG)) { + props.put(GroupCoordinatorConfig.GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, "0"); + } + if (rack.isDefined()) { + props.put(ServerConfigs.BROKER_RACK_CONFIG, rack.get()); + } + // Reduce number of threads per broker + props.put(SocketServerConfigs.NUM_NETWORK_THREADS_CONFIG, "2"); + props.put(ServerConfigs.BACKGROUND_THREADS_CONFIG, "2"); + + if (protocolAndPorts.stream().anyMatch(p -> JaasTestUtils.usesSslTransportLayer(p.getKey()))) { + try { + props.putAll( + JaasTestUtils.sslConfigs( + ConnectionMode.SERVER, + false, + OptionConverters.toJava(trustStoreFile), + String.format("server%d", nodeId))); + } catch (Exception e) { + fail("Failed to create SSL configs", e); + } + } + + if (protocolAndPorts.stream().anyMatch(p -> JaasTestUtils.usesSaslAuthentication(p.getKey()))) { + props.putAll( + JaasTestUtils.saslConfigs( + saslProperties.isEmpty() ? Optional.empty() : Optional.of(saslProperties.get()))); + } + + if (interBrokerSecurityProtocol.isDefined()) { + props.put( + ReplicationConfigs.INTER_BROKER_SECURITY_PROTOCOL_CONFIG, + interBrokerSecurityProtocol.get().name); + } + if (enableToken) { + props.put(DelegationTokenManagerConfigs.DELEGATION_TOKEN_SECRET_KEY_CONFIG, "secretkey"); + } + + props.put(ServerLogConfigs.NUM_PARTITIONS_CONFIG, numPartitions); + props.put(ReplicationConfigs.DEFAULT_REPLICATION_FACTOR_CONFIG, defaultReplicationFactor); + + if (enableFetchFromFollower) { + props.put(ServerConfigs.BROKER_RACK_CONFIG, nodeId); + props.put( + ReplicationConfigs.REPLICA_SELECTOR_CLASS_CONFIG, + "org.apache.kafka.common.replica.RackAwareReplicaSelector"); + } + return props; + } + @AfterEach public void tearDown() throws Exception { log.info("Starting teardown of {}", getClass().getSimpleName()); diff --git a/kafka-rest/src/test/java/io/confluent/kafkarest/testing/KafkaBrokerFixture.java b/kafka-rest/src/test/java/io/confluent/kafkarest/testing/KafkaBrokerFixture.java index c0989fc2de..c3805c2e8d 100644 --- a/kafka-rest/src/test/java/io/confluent/kafkarest/testing/KafkaBrokerFixture.java +++ b/kafka-rest/src/test/java/io/confluent/kafkarest/testing/KafkaBrokerFixture.java @@ -21,6 +21,7 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; +import io.confluent.kafkarest.integration.ClusterTestHarness; import io.confluent.kafkarest.testing.QuorumControllerFixture.DefaultTestInfo; import java.io.File; import java.io.IOException; @@ -106,7 +107,7 @@ public void beforeEach(ExtensionContext extensionContext) throws Exception { private Properties getBrokerConfigs() { checkState(logDir != null); Properties properties = - TestUtils.createBrokerConfig( + ClusterTestHarness.createBrokerConfig( brokerId, false, false, From 511c1c6ad75c1086a2b9b401911e0b13c71bc262 Mon Sep 17 00:00:00 2001 From: Truc Nguyen Date: Fri, 10 Jan 2025 14:38:58 +0000 Subject: [PATCH 2/6] Fix import control --- checkstyle/import_control.xml | 11 +++++++++++ .../kafkarest/integration/ClusterTestHarness.java | 3 +-- 2 files changed, 12 insertions(+), 2 deletions(-) diff --git a/checkstyle/import_control.xml b/checkstyle/import_control.xml index 951f47c64f..e2554f1521 100644 --- a/checkstyle/import_control.xml +++ b/checkstyle/import_control.xml @@ -134,14 +134,25 @@ + + + + + + + + + + + diff --git a/kafka-rest/src/test/java/io/confluent/kafkarest/integration/ClusterTestHarness.java b/kafka-rest/src/test/java/io/confluent/kafkarest/integration/ClusterTestHarness.java index 6388075c17..b9e64876ab 100644 --- a/kafka-rest/src/test/java/io/confluent/kafkarest/integration/ClusterTestHarness.java +++ b/kafka-rest/src/test/java/io/confluent/kafkarest/integration/ClusterTestHarness.java @@ -113,7 +113,6 @@ import scala.Option; import scala.collection.JavaConverters; import scala.collection.Seq; -import scala.jdk.javaapi.OptionConverters; /** * Test harness to run against a real, local Kafka cluster and REST proxy. This is essentially @@ -556,7 +555,7 @@ public static Properties createBrokerConfig( JaasTestUtils.sslConfigs( ConnectionMode.SERVER, false, - OptionConverters.toJava(trustStoreFile), + trustStoreFile.isEmpty() ? Optional.empty() : Optional.of(trustStoreFile.get()), String.format("server%d", nodeId))); } catch (Exception e) { fail("Failed to create SSL configs", e); From 825fc877a66275fbe8430c300921353f5c2bb1c3 Mon Sep 17 00:00:00 2001 From: Truc Nguyen Date: Fri, 10 Jan 2025 16:20:12 +0000 Subject: [PATCH 3/6] Use string values --- .../kafkarest/integration/ClusterTestHarness.java | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/kafka-rest/src/test/java/io/confluent/kafkarest/integration/ClusterTestHarness.java b/kafka-rest/src/test/java/io/confluent/kafkarest/integration/ClusterTestHarness.java index b9e64876ab..2e35fca4e2 100644 --- a/kafka-rest/src/test/java/io/confluent/kafkarest/integration/ClusterTestHarness.java +++ b/kafka-rest/src/test/java/io/confluent/kafkarest/integration/ClusterTestHarness.java @@ -492,8 +492,8 @@ public static Properties createBrokerConfig( props.setProperty( KRaftConfigs.SERVER_MAX_STARTUP_TIME_MS_CONFIG, String.valueOf(TimeUnit.MINUTES.toMillis(10))); - props.put(KRaftConfigs.NODE_ID_CONFIG, nodeId); - props.put(ServerConfigs.BROKER_ID_CONFIG, nodeId); + props.put(KRaftConfigs.NODE_ID_CONFIG, String.valueOf(nodeId)); + props.put(ServerConfigs.BROKER_ID_CONFIG, String.valueOf(nodeId)); props.put(SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG, listeners); props.put(SocketServerConfigs.LISTENERS_CONFIG, listeners); props.put(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, "CONTROLLER"); @@ -577,11 +577,11 @@ public static Properties createBrokerConfig( props.put(DelegationTokenManagerConfigs.DELEGATION_TOKEN_SECRET_KEY_CONFIG, "secretkey"); } - props.put(ServerLogConfigs.NUM_PARTITIONS_CONFIG, numPartitions); - props.put(ReplicationConfigs.DEFAULT_REPLICATION_FACTOR_CONFIG, defaultReplicationFactor); + props.put(ServerLogConfigs.NUM_PARTITIONS_CONFIG, String.valueOf(numPartitions)); + props.put(ReplicationConfigs.DEFAULT_REPLICATION_FACTOR_CONFIG, String.valueOf(defaultReplicationFactor)); if (enableFetchFromFollower) { - props.put(ServerConfigs.BROKER_RACK_CONFIG, nodeId); + props.put(ServerConfigs.BROKER_RACK_CONFIG, String.valueOf(nodeId)); props.put( ReplicationConfigs.REPLICA_SELECTOR_CLASS_CONFIG, "org.apache.kafka.common.replica.RackAwareReplicaSelector"); From 5664f6efb8933fb02c4726338d36b8011c0f95e4 Mon Sep 17 00:00:00 2001 From: Truc Nguyen Date: Fri, 10 Jan 2025 16:56:41 +0000 Subject: [PATCH 4/6] Fix checkstyle --- .../confluent/kafkarest/integration/ClusterTestHarness.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/kafka-rest/src/test/java/io/confluent/kafkarest/integration/ClusterTestHarness.java b/kafka-rest/src/test/java/io/confluent/kafkarest/integration/ClusterTestHarness.java index 2e35fca4e2..2c5958b7f4 100644 --- a/kafka-rest/src/test/java/io/confluent/kafkarest/integration/ClusterTestHarness.java +++ b/kafka-rest/src/test/java/io/confluent/kafkarest/integration/ClusterTestHarness.java @@ -578,7 +578,9 @@ public static Properties createBrokerConfig( } props.put(ServerLogConfigs.NUM_PARTITIONS_CONFIG, String.valueOf(numPartitions)); - props.put(ReplicationConfigs.DEFAULT_REPLICATION_FACTOR_CONFIG, String.valueOf(defaultReplicationFactor)); + props.put( + ReplicationConfigs.DEFAULT_REPLICATION_FACTOR_CONFIG, + String.valueOf(defaultReplicationFactor)); if (enableFetchFromFollower) { props.put(ServerConfigs.BROKER_RACK_CONFIG, String.valueOf(nodeId)); From 238d332744fff8d713dea33f3e7e1ad39b70c6dc Mon Sep 17 00:00:00 2001 From: Truc Nguyen Date: Tue, 4 Feb 2025 15:18:55 +0000 Subject: [PATCH 5/6] Add link to source --- .../confluent/kafkarest/integration/ClusterTestHarness.java | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/kafka-rest/src/test/java/io/confluent/kafkarest/integration/ClusterTestHarness.java b/kafka-rest/src/test/java/io/confluent/kafkarest/integration/ClusterTestHarness.java index 2c5958b7f4..b383244361 100644 --- a/kafka-rest/src/test/java/io/confluent/kafkarest/integration/ClusterTestHarness.java +++ b/kafka-rest/src/test/java/io/confluent/kafkarest/integration/ClusterTestHarness.java @@ -445,6 +445,11 @@ private static boolean shouldEnable( return false; } + /** + * Taken from the same function name in kafka.utils.TestUtils of AK + * https://github.com/confluentinc/kafka/blob/0ce5fb0dbb87661e794cdfc40badbe3b91d8d825/core/src/test/scala + * /unit/kafka/utils/TestUtils.scala#L228 + */ public static Properties createBrokerConfig( int nodeId, boolean enableControlledShutdown, From 6bcf25d30ab600caa40ca17e49497ed45fe6c8f5 Mon Sep 17 00:00:00 2001 From: Truc Nguyen Date: Tue, 4 Feb 2025 16:16:30 +0000 Subject: [PATCH 6/6] Disable consumergroup tests --- .../integration/v3/ConsumerGroupsResourceIntegrationTest.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/kafka-rest/src/test/java/io/confluent/kafkarest/integration/v3/ConsumerGroupsResourceIntegrationTest.java b/kafka-rest/src/test/java/io/confluent/kafkarest/integration/v3/ConsumerGroupsResourceIntegrationTest.java index 1da562670f..f3b0e6778f 100644 --- a/kafka-rest/src/test/java/io/confluent/kafkarest/integration/v3/ConsumerGroupsResourceIntegrationTest.java +++ b/kafka-rest/src/test/java/io/confluent/kafkarest/integration/v3/ConsumerGroupsResourceIntegrationTest.java @@ -40,6 +40,7 @@ import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.serialization.BytesDeserializer; +import org.junit.jupiter.api.Disabled; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; @@ -49,6 +50,7 @@ public ConsumerGroupsResourceIntegrationTest() { super(/* numBrokers= */ 1, /* withSchemaRegistry= */ false); } + @Disabled("KNET-17421 to fix this") @ParameterizedTest(name = TEST_WITH_PARAMETERIZED_QUORUM_NAME) @ValueSource(strings = {"kraft"}) public void listConsumerGroups_returnsConsumerGroups(String quorum) { @@ -105,6 +107,7 @@ public void listConsumerGroups_nonExistingCluster_returnsNotFound(String quorum) assertEquals(Status.NOT_FOUND.getStatusCode(), response.getStatus()); } + @Disabled("KNET-17421 to fix this") @ParameterizedTest(name = TEST_WITH_PARAMETERIZED_QUORUM_NAME) @ValueSource(strings = {"kraft"}) public void getConsumerGroup_returnsConsumerGroup(String quorum) {