diff --git a/checkstyle/import_control.xml b/checkstyle/import_control.xml
index 951f47c64f..e2554f1521 100644
--- a/checkstyle/import_control.xml
+++ b/checkstyle/import_control.xml
@@ -134,14 +134,25 @@
+
+
+
+
+
+
+
+
+
+
+
diff --git a/kafka-rest/src/test/java/io/confluent/kafkarest/integration/AuthorizationErrorTest.java b/kafka-rest/src/test/java/io/confluent/kafkarest/integration/AuthorizationErrorTest.java
index 45fcef6d22..deebe7eefe 100644
--- a/kafka-rest/src/test/java/io/confluent/kafkarest/integration/AuthorizationErrorTest.java
+++ b/kafka-rest/src/test/java/io/confluent/kafkarest/integration/AuthorizationErrorTest.java
@@ -97,7 +97,7 @@ protected Properties getBrokerProperties(int i) {
saslProps.setProperty("sasl.mechanism.inter.broker.protocol", "PLAIN");
Option saslProperties = Option.apply(saslProps);
Properties brokerProps =
- kafka.utils.TestUtils.createBrokerConfig(
+ createBrokerConfig(
0,
false,
false,
diff --git a/kafka-rest/src/test/java/io/confluent/kafkarest/integration/ClusterTestHarness.java b/kafka-rest/src/test/java/io/confluent/kafkarest/integration/ClusterTestHarness.java
index 2dd14ac2f4..b383244361 100644
--- a/kafka-rest/src/test/java/io/confluent/kafkarest/integration/ClusterTestHarness.java
+++ b/kafka-rest/src/test/java/io/confluent/kafkarest/integration/ClusterTestHarness.java
@@ -34,11 +34,13 @@
import io.confluent.kafkarest.KafkaRestConfig;
import io.confluent.kafkarest.common.CompletableFutures;
import io.confluent.rest.RestConfig;
+import java.io.File;
import java.io.IOException;
import java.net.ServerSocket;
import java.net.URI;
import java.net.URISyntaxException;
import java.time.Duration;
+import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -60,6 +62,7 @@
import javax.ws.rs.client.ClientBuilder;
import javax.ws.rs.client.Invocation;
import javax.ws.rs.client.WebTarget;
+import kafka.security.JaasTestUtils;
import kafka.server.KafkaBroker;
import kafka.server.KafkaConfig;
import kafka.server.QuorumTestHarness;
@@ -81,11 +84,21 @@
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.network.ConnectionMode;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.coordinator.group.GroupCoordinatorConfig;
+import org.apache.kafka.network.SocketServerConfigs;
+import org.apache.kafka.raft.QuorumConfig;
+import org.apache.kafka.server.config.DelegationTokenManagerConfigs;
+import org.apache.kafka.server.config.KRaftConfigs;
+import org.apache.kafka.server.config.ReplicationConfigs;
+import org.apache.kafka.server.config.ServerConfigs;
+import org.apache.kafka.server.config.ServerLogConfigs;
+import org.apache.kafka.storage.internals.log.CleanerConfig;
import org.eclipse.jetty.server.RequestLog;
import org.eclipse.jetty.server.Server;
import org.glassfish.jersey.apache.connector.ApacheConnectorProvider;
@@ -396,7 +409,7 @@ protected void overrideKafkaRestConfigs(Properties restProperties) {}
protected Properties getBrokerProperties(int i) {
Properties props =
- TestUtils.createBrokerConfig(
+ createBrokerConfig(
i,
false,
false,
@@ -424,6 +437,165 @@ protected Properties getBrokerProperties(int i) {
return props;
}
+ private static boolean shouldEnable(
+ Option interBrokerSecurityProtocol, SecurityProtocol protocol) {
+ if (interBrokerSecurityProtocol.isDefined()) {
+ return interBrokerSecurityProtocol.get() == protocol;
+ }
+ return false;
+ }
+
+ /**
+ * Taken from the same function name in kafka.utils.TestUtils of AK
+ * https://github.com/confluentinc/kafka/blob/0ce5fb0dbb87661e794cdfc40badbe3b91d8d825/core/src/test/scala
+ * /unit/kafka/utils/TestUtils.scala#L228
+ */
+ public static Properties createBrokerConfig(
+ int nodeId,
+ boolean enableControlledShutdown,
+ boolean enableDeleteTopic,
+ int port,
+ Option interBrokerSecurityProtocol,
+ Option trustStoreFile,
+ Option saslProperties,
+ boolean enablePlaintext,
+ boolean enableSaslPlaintext,
+ int saslPlaintextPort,
+ boolean enableSsl,
+ int sslPort,
+ boolean enableSaslSsl,
+ int saslSslPort,
+ Option rack,
+ int logDirCount,
+ boolean enableToken,
+ int numPartitions,
+ short defaultReplicationFactor,
+ boolean enableFetchFromFollower) {
+ List> protocolAndPorts = new ArrayList<>();
+ if (enablePlaintext || shouldEnable(interBrokerSecurityProtocol, SecurityProtocol.PLAINTEXT)) {
+ protocolAndPorts.add(new SimpleEntry<>(SecurityProtocol.PLAINTEXT, port));
+ }
+ if (enableSsl || shouldEnable(interBrokerSecurityProtocol, SecurityProtocol.SSL)) {
+ protocolAndPorts.add(new SimpleEntry<>(SecurityProtocol.SSL, sslPort));
+ }
+ if (enableSaslPlaintext
+ || shouldEnable(interBrokerSecurityProtocol, SecurityProtocol.SASL_PLAINTEXT)) {
+ protocolAndPorts.add(new SimpleEntry<>(SecurityProtocol.SASL_PLAINTEXT, saslPlaintextPort));
+ }
+ if (enableSaslSsl || shouldEnable(interBrokerSecurityProtocol, SecurityProtocol.SASL_SSL)) {
+ protocolAndPorts.add(new SimpleEntry<>(SecurityProtocol.SASL_SSL, saslSslPort));
+ }
+
+ String listeners =
+ protocolAndPorts.stream()
+ .map(a -> String.format("%s://localhost:%d", a.getKey().name, a.getValue()))
+ .collect(Collectors.joining(","));
+
+ Properties props = new Properties();
+ props.put(ServerConfigs.UNSTABLE_FEATURE_VERSIONS_ENABLE_CONFIG, "true");
+ props.put(ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG, "true");
+ props.setProperty(
+ KRaftConfigs.SERVER_MAX_STARTUP_TIME_MS_CONFIG,
+ String.valueOf(TimeUnit.MINUTES.toMillis(10)));
+ props.put(KRaftConfigs.NODE_ID_CONFIG, String.valueOf(nodeId));
+ props.put(ServerConfigs.BROKER_ID_CONFIG, String.valueOf(nodeId));
+ props.put(SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG, listeners);
+ props.put(SocketServerConfigs.LISTENERS_CONFIG, listeners);
+ props.put(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, "CONTROLLER");
+ props.put(
+ SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG,
+ protocolAndPorts.stream()
+ .map(p -> String.format("%s:%s", p.getKey(), p.getKey()))
+ .collect(Collectors.joining(","))
+ + ",CONTROLLER:PLAINTEXT");
+
+ if (logDirCount > 1) {
+ String logDirs =
+ IntStream.rangeClosed(1, logDirCount)
+ .mapToObj(
+ i -> {
+ // We would like to allow user to specify both relative path and absolute path
+ // as log directory for backward-compatibility reason
+ // We can verify this by using a mixture of relative path and absolute path as
+ // log directories in the test
+ return (i % 2 == 0)
+ ? TestUtils.tempDir().getAbsolutePath()
+ : TestUtils.tempRelativeDir("data").getAbsolutePath();
+ })
+ .collect(Collectors.joining(","));
+ props.put(ServerLogConfigs.LOG_DIRS_CONFIG, logDirs);
+ } else {
+ props.put(ServerLogConfigs.LOG_DIR_CONFIG, TestUtils.tempDir().getAbsolutePath());
+ }
+ props.put(KRaftConfigs.PROCESS_ROLES_CONFIG, "broker");
+ // Note: this is just a placeholder value for controller.quorum.voters. JUnit
+ // tests use random port assignment, so the controller ports are not known ahead of
+ // time. Therefore, we ignore controller.quorum.voters and use
+ // controllerQuorumVotersFuture instead.
+ props.put(QuorumConfig.QUORUM_VOTERS_CONFIG, "1000@localhost:0");
+ props.put(ReplicationConfigs.REPLICA_SOCKET_TIMEOUT_MS_CONFIG, "1500");
+ props.put(ReplicationConfigs.CONTROLLER_SOCKET_TIMEOUT_MS_CONFIG, "1500");
+ props.put(ServerConfigs.CONTROLLED_SHUTDOWN_ENABLE_CONFIG, enableControlledShutdown);
+ props.put(ServerConfigs.DELETE_TOPIC_ENABLE_CONFIG, enableDeleteTopic);
+ props.put(ServerLogConfigs.LOG_DELETE_DELAY_MS_CONFIG, "1000");
+ props.put(CleanerConfig.LOG_CLEANER_DEDUPE_BUFFER_SIZE_PROP, "2097152");
+ props.put(GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, "1");
+ props.put(ServerLogConfigs.LOG_INITIAL_TASK_DELAY_MS_CONFIG, "100");
+ if (!props.containsKey(GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG)) {
+ props.put(GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, "5");
+ }
+ if (!props.containsKey(GroupCoordinatorConfig.GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG)) {
+ props.put(GroupCoordinatorConfig.GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, "0");
+ }
+ if (rack.isDefined()) {
+ props.put(ServerConfigs.BROKER_RACK_CONFIG, rack.get());
+ }
+ // Reduce number of threads per broker
+ props.put(SocketServerConfigs.NUM_NETWORK_THREADS_CONFIG, "2");
+ props.put(ServerConfigs.BACKGROUND_THREADS_CONFIG, "2");
+
+ if (protocolAndPorts.stream().anyMatch(p -> JaasTestUtils.usesSslTransportLayer(p.getKey()))) {
+ try {
+ props.putAll(
+ JaasTestUtils.sslConfigs(
+ ConnectionMode.SERVER,
+ false,
+ trustStoreFile.isEmpty() ? Optional.empty() : Optional.of(trustStoreFile.get()),
+ String.format("server%d", nodeId)));
+ } catch (Exception e) {
+ fail("Failed to create SSL configs", e);
+ }
+ }
+
+ if (protocolAndPorts.stream().anyMatch(p -> JaasTestUtils.usesSaslAuthentication(p.getKey()))) {
+ props.putAll(
+ JaasTestUtils.saslConfigs(
+ saslProperties.isEmpty() ? Optional.empty() : Optional.of(saslProperties.get())));
+ }
+
+ if (interBrokerSecurityProtocol.isDefined()) {
+ props.put(
+ ReplicationConfigs.INTER_BROKER_SECURITY_PROTOCOL_CONFIG,
+ interBrokerSecurityProtocol.get().name);
+ }
+ if (enableToken) {
+ props.put(DelegationTokenManagerConfigs.DELEGATION_TOKEN_SECRET_KEY_CONFIG, "secretkey");
+ }
+
+ props.put(ServerLogConfigs.NUM_PARTITIONS_CONFIG, String.valueOf(numPartitions));
+ props.put(
+ ReplicationConfigs.DEFAULT_REPLICATION_FACTOR_CONFIG,
+ String.valueOf(defaultReplicationFactor));
+
+ if (enableFetchFromFollower) {
+ props.put(ServerConfigs.BROKER_RACK_CONFIG, String.valueOf(nodeId));
+ props.put(
+ ReplicationConfigs.REPLICA_SELECTOR_CLASS_CONFIG,
+ "org.apache.kafka.common.replica.RackAwareReplicaSelector");
+ }
+ return props;
+ }
+
@AfterEach
public void tearDown() throws Exception {
log.info("Starting teardown of {}", getClass().getSimpleName());
diff --git a/kafka-rest/src/test/java/io/confluent/kafkarest/integration/v3/ConsumerGroupsResourceIntegrationTest.java b/kafka-rest/src/test/java/io/confluent/kafkarest/integration/v3/ConsumerGroupsResourceIntegrationTest.java
index 1da562670f..f3b0e6778f 100644
--- a/kafka-rest/src/test/java/io/confluent/kafkarest/integration/v3/ConsumerGroupsResourceIntegrationTest.java
+++ b/kafka-rest/src/test/java/io/confluent/kafkarest/integration/v3/ConsumerGroupsResourceIntegrationTest.java
@@ -40,6 +40,7 @@
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.BytesDeserializer;
+import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
@@ -49,6 +50,7 @@ public ConsumerGroupsResourceIntegrationTest() {
super(/* numBrokers= */ 1, /* withSchemaRegistry= */ false);
}
+ @Disabled("KNET-17421 to fix this")
@ParameterizedTest(name = TEST_WITH_PARAMETERIZED_QUORUM_NAME)
@ValueSource(strings = {"kraft"})
public void listConsumerGroups_returnsConsumerGroups(String quorum) {
@@ -105,6 +107,7 @@ public void listConsumerGroups_nonExistingCluster_returnsNotFound(String quorum)
assertEquals(Status.NOT_FOUND.getStatusCode(), response.getStatus());
}
+ @Disabled("KNET-17421 to fix this")
@ParameterizedTest(name = TEST_WITH_PARAMETERIZED_QUORUM_NAME)
@ValueSource(strings = {"kraft"})
public void getConsumerGroup_returnsConsumerGroup(String quorum) {
diff --git a/kafka-rest/src/test/java/io/confluent/kafkarest/testing/KafkaBrokerFixture.java b/kafka-rest/src/test/java/io/confluent/kafkarest/testing/KafkaBrokerFixture.java
index c0989fc2de..c3805c2e8d 100644
--- a/kafka-rest/src/test/java/io/confluent/kafkarest/testing/KafkaBrokerFixture.java
+++ b/kafka-rest/src/test/java/io/confluent/kafkarest/testing/KafkaBrokerFixture.java
@@ -21,6 +21,7 @@
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
+import io.confluent.kafkarest.integration.ClusterTestHarness;
import io.confluent.kafkarest.testing.QuorumControllerFixture.DefaultTestInfo;
import java.io.File;
import java.io.IOException;
@@ -106,7 +107,7 @@ public void beforeEach(ExtensionContext extensionContext) throws Exception {
private Properties getBrokerConfigs() {
checkState(logDir != null);
Properties properties =
- TestUtils.createBrokerConfig(
+ ClusterTestHarness.createBrokerConfig(
brokerId,
false,
false,