Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Copy TestUtils#createBrokerConfig to avoid depending on upstream internal API #1341

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions checkstyle/import_control.xml
Original file line number Diff line number Diff line change
Expand Up @@ -134,14 +134,25 @@
<!-- b. Non-public Apache Kafka APIs -->
<allow class="org.apache.kafka.clients.CommonClientConfigs" />
<allow class="org.apache.kafka.common.config.types.Password" />
<allow class="org.apache.kafka.common.network.ConnectionMode" />
<allow class="org.apache.kafka.common.network.ListenerName" />
<allow class="org.apache.kafka.common.protocol.Errors" />
<allow class="org.apache.kafka.common.security.JaasUtils" />
<allow class="org.apache.kafka.coordinator.group.GroupCoordinatorConfig" />
<allow class="org.apache.kafka.metadata.authorizer.StandardAuthorizer" />
<allow class="org.apache.kafka.network.SocketServerConfigs" />
<allow class="org.apache.kafka.raft.QuorumConfig" />
<allow class="org.apache.kafka.server.config.DelegationTokenManagerConfigs" />
<allow class="org.apache.kafka.server.config.KRaftConfigs" />
<allow class="org.apache.kafka.server.config.ReplicationConfigs" />
<allow class="org.apache.kafka.server.config.ServerConfigs" />
<allow class="org.apache.kafka.server.config.ServerLogConfigs" />
<allow class="org.apache.kafka.storage.internals.log.CleanerConfig" />
<allow class="org.apache.kafka.test.TestSslUtils" />
<!-- c. Non-public Confluent Community Kafka APIs -->
<allow class="kafka.admin.AclCommand" />
<allow class="kafka.security.authorizer.AclAuthorizer" />
<allow class="kafka.security.JaasTestUtils" />
<allow class="kafka.server.KafkaConfig" />
<allow class="kafka.server.KafkaServer" />
<allow class="kafka.server.KafkaBroker" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ protected Properties getBrokerProperties(int i) {
saslProps.setProperty("sasl.mechanism.inter.broker.protocol", "PLAIN");
Option<Properties> saslProperties = Option.apply(saslProps);
Properties brokerProps =
kafka.utils.TestUtils.createBrokerConfig(
createBrokerConfig(
0,
false,
false,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,13 @@
import io.confluent.kafkarest.KafkaRestConfig;
import io.confluent.kafkarest.common.CompletableFutures;
import io.confluent.rest.RestConfig;
import java.io.File;
import java.io.IOException;
import java.net.ServerSocket;
import java.net.URI;
import java.net.URISyntaxException;
import java.time.Duration;
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
Expand All @@ -60,6 +62,7 @@
import javax.ws.rs.client.ClientBuilder;
import javax.ws.rs.client.Invocation;
import javax.ws.rs.client.WebTarget;
import kafka.security.JaasTestUtils;
import kafka.server.KafkaBroker;
import kafka.server.KafkaConfig;
import kafka.server.QuorumTestHarness;
Expand All @@ -81,11 +84,21 @@
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.network.ConnectionMode;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.coordinator.group.GroupCoordinatorConfig;
import org.apache.kafka.network.SocketServerConfigs;
import org.apache.kafka.raft.QuorumConfig;
import org.apache.kafka.server.config.DelegationTokenManagerConfigs;
import org.apache.kafka.server.config.KRaftConfigs;
import org.apache.kafka.server.config.ReplicationConfigs;
import org.apache.kafka.server.config.ServerConfigs;
import org.apache.kafka.server.config.ServerLogConfigs;
import org.apache.kafka.storage.internals.log.CleanerConfig;
import org.eclipse.jetty.server.RequestLog;
import org.eclipse.jetty.server.Server;
import org.glassfish.jersey.apache.connector.ApacheConnectorProvider;
Expand Down Expand Up @@ -396,7 +409,7 @@ protected void overrideKafkaRestConfigs(Properties restProperties) {}

protected Properties getBrokerProperties(int i) {
Properties props =
TestUtils.createBrokerConfig(
createBrokerConfig(
i,
false,
false,
Expand Down Expand Up @@ -424,6 +437,165 @@ protected Properties getBrokerProperties(int i) {
return props;
}

private static boolean shouldEnable(
Option<SecurityProtocol> interBrokerSecurityProtocol, SecurityProtocol protocol) {
if (interBrokerSecurityProtocol.isDefined()) {
return interBrokerSecurityProtocol.get() == protocol;
}
return false;
}

/**
* Taken from the same function name in kafka.utils.TestUtils of AK
* https://github.com/confluentinc/kafka/blob/0ce5fb0dbb87661e794cdfc40badbe3b91d8d825/core/src/test/scala
* /unit/kafka/utils/TestUtils.scala#L228
*/
public static Properties createBrokerConfig(
trnguyencflt marked this conversation as resolved.
Show resolved Hide resolved
int nodeId,
boolean enableControlledShutdown,
boolean enableDeleteTopic,
int port,
Option<SecurityProtocol> interBrokerSecurityProtocol,
Option<File> trustStoreFile,
Option<Properties> saslProperties,
boolean enablePlaintext,
boolean enableSaslPlaintext,
int saslPlaintextPort,
boolean enableSsl,
int sslPort,
boolean enableSaslSsl,
int saslSslPort,
Option<String> rack,
int logDirCount,
boolean enableToken,
int numPartitions,
short defaultReplicationFactor,
boolean enableFetchFromFollower) {
List<Map.Entry<SecurityProtocol, Integer>> protocolAndPorts = new ArrayList<>();
if (enablePlaintext || shouldEnable(interBrokerSecurityProtocol, SecurityProtocol.PLAINTEXT)) {
protocolAndPorts.add(new SimpleEntry<>(SecurityProtocol.PLAINTEXT, port));
}
if (enableSsl || shouldEnable(interBrokerSecurityProtocol, SecurityProtocol.SSL)) {
protocolAndPorts.add(new SimpleEntry<>(SecurityProtocol.SSL, sslPort));
}
if (enableSaslPlaintext
|| shouldEnable(interBrokerSecurityProtocol, SecurityProtocol.SASL_PLAINTEXT)) {
protocolAndPorts.add(new SimpleEntry<>(SecurityProtocol.SASL_PLAINTEXT, saslPlaintextPort));
}
if (enableSaslSsl || shouldEnable(interBrokerSecurityProtocol, SecurityProtocol.SASL_SSL)) {
protocolAndPorts.add(new SimpleEntry<>(SecurityProtocol.SASL_SSL, saslSslPort));
}

String listeners =
protocolAndPorts.stream()
.map(a -> String.format("%s://localhost:%d", a.getKey().name, a.getValue()))
.collect(Collectors.joining(","));

Properties props = new Properties();
props.put(ServerConfigs.UNSTABLE_FEATURE_VERSIONS_ENABLE_CONFIG, "true");
props.put(ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG, "true");
props.setProperty(
KRaftConfigs.SERVER_MAX_STARTUP_TIME_MS_CONFIG,
String.valueOf(TimeUnit.MINUTES.toMillis(10)));
props.put(KRaftConfigs.NODE_ID_CONFIG, String.valueOf(nodeId));
props.put(ServerConfigs.BROKER_ID_CONFIG, String.valueOf(nodeId));
props.put(SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG, listeners);
props.put(SocketServerConfigs.LISTENERS_CONFIG, listeners);
props.put(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, "CONTROLLER");
props.put(
SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG,
protocolAndPorts.stream()
.map(p -> String.format("%s:%s", p.getKey(), p.getKey()))
.collect(Collectors.joining(","))
+ ",CONTROLLER:PLAINTEXT");

if (logDirCount > 1) {
String logDirs =
IntStream.rangeClosed(1, logDirCount)
.mapToObj(
i -> {
// We would like to allow user to specify both relative path and absolute path
// as log directory for backward-compatibility reason
// We can verify this by using a mixture of relative path and absolute path as
// log directories in the test
return (i % 2 == 0)
? TestUtils.tempDir().getAbsolutePath()
: TestUtils.tempRelativeDir("data").getAbsolutePath();
})
.collect(Collectors.joining(","));
props.put(ServerLogConfigs.LOG_DIRS_CONFIG, logDirs);
} else {
props.put(ServerLogConfigs.LOG_DIR_CONFIG, TestUtils.tempDir().getAbsolutePath());
}
props.put(KRaftConfigs.PROCESS_ROLES_CONFIG, "broker");
// Note: this is just a placeholder value for controller.quorum.voters. JUnit
// tests use random port assignment, so the controller ports are not known ahead of
// time. Therefore, we ignore controller.quorum.voters and use
// controllerQuorumVotersFuture instead.
props.put(QuorumConfig.QUORUM_VOTERS_CONFIG, "1000@localhost:0");
props.put(ReplicationConfigs.REPLICA_SOCKET_TIMEOUT_MS_CONFIG, "1500");
props.put(ReplicationConfigs.CONTROLLER_SOCKET_TIMEOUT_MS_CONFIG, "1500");
props.put(ServerConfigs.CONTROLLED_SHUTDOWN_ENABLE_CONFIG, enableControlledShutdown);
props.put(ServerConfigs.DELETE_TOPIC_ENABLE_CONFIG, enableDeleteTopic);
props.put(ServerLogConfigs.LOG_DELETE_DELAY_MS_CONFIG, "1000");
props.put(CleanerConfig.LOG_CLEANER_DEDUPE_BUFFER_SIZE_PROP, "2097152");
props.put(GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, "1");
props.put(ServerLogConfigs.LOG_INITIAL_TASK_DELAY_MS_CONFIG, "100");
if (!props.containsKey(GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG)) {
props.put(GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, "5");
}
if (!props.containsKey(GroupCoordinatorConfig.GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG)) {
props.put(GroupCoordinatorConfig.GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, "0");
}
if (rack.isDefined()) {
props.put(ServerConfigs.BROKER_RACK_CONFIG, rack.get());
}
// Reduce number of threads per broker
props.put(SocketServerConfigs.NUM_NETWORK_THREADS_CONFIG, "2");
props.put(ServerConfigs.BACKGROUND_THREADS_CONFIG, "2");

if (protocolAndPorts.stream().anyMatch(p -> JaasTestUtils.usesSslTransportLayer(p.getKey()))) {
try {
props.putAll(
JaasTestUtils.sslConfigs(
ConnectionMode.SERVER,
false,
trustStoreFile.isEmpty() ? Optional.empty() : Optional.of(trustStoreFile.get()),
String.format("server%d", nodeId)));
} catch (Exception e) {
fail("Failed to create SSL configs", e);
}
}

if (protocolAndPorts.stream().anyMatch(p -> JaasTestUtils.usesSaslAuthentication(p.getKey()))) {
props.putAll(
JaasTestUtils.saslConfigs(
saslProperties.isEmpty() ? Optional.empty() : Optional.of(saslProperties.get())));
}

if (interBrokerSecurityProtocol.isDefined()) {
props.put(
ReplicationConfigs.INTER_BROKER_SECURITY_PROTOCOL_CONFIG,
interBrokerSecurityProtocol.get().name);
}
if (enableToken) {
props.put(DelegationTokenManagerConfigs.DELEGATION_TOKEN_SECRET_KEY_CONFIG, "secretkey");
}

props.put(ServerLogConfigs.NUM_PARTITIONS_CONFIG, String.valueOf(numPartitions));
props.put(
ReplicationConfigs.DEFAULT_REPLICATION_FACTOR_CONFIG,
String.valueOf(defaultReplicationFactor));

if (enableFetchFromFollower) {
props.put(ServerConfigs.BROKER_RACK_CONFIG, String.valueOf(nodeId));
props.put(
ReplicationConfigs.REPLICA_SELECTOR_CLASS_CONFIG,
"org.apache.kafka.common.replica.RackAwareReplicaSelector");
}
return props;
}

@AfterEach
public void tearDown() throws Exception {
log.info("Starting teardown of {}", getClass().getSimpleName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.BytesDeserializer;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

Expand All @@ -49,6 +50,7 @@ public ConsumerGroupsResourceIntegrationTest() {
super(/* numBrokers= */ 1, /* withSchemaRegistry= */ false);
}

@Disabled("KNET-17421 to fix this")
@ParameterizedTest(name = TEST_WITH_PARAMETERIZED_QUORUM_NAME)
@ValueSource(strings = {"kraft"})
public void listConsumerGroups_returnsConsumerGroups(String quorum) {
Expand Down Expand Up @@ -105,6 +107,7 @@ public void listConsumerGroups_nonExistingCluster_returnsNotFound(String quorum)
assertEquals(Status.NOT_FOUND.getStatusCode(), response.getStatus());
}

@Disabled("KNET-17421 to fix this")
@ParameterizedTest(name = TEST_WITH_PARAMETERIZED_QUORUM_NAME)
@ValueSource(strings = {"kraft"})
public void getConsumerGroup_returnsConsumerGroup(String quorum) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import io.confluent.kafkarest.integration.ClusterTestHarness;
import io.confluent.kafkarest.testing.QuorumControllerFixture.DefaultTestInfo;
import java.io.File;
import java.io.IOException;
Expand Down Expand Up @@ -106,7 +107,7 @@ public void beforeEach(ExtensionContext extensionContext) throws Exception {
private Properties getBrokerConfigs() {
checkState(logDir != null);
Properties properties =
TestUtils.createBrokerConfig(
ClusterTestHarness.createBrokerConfig(
brokerId,
false,
false,
Expand Down