Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DRAFT Extract an api interface from ClusterInstance #18697

Open
wants to merge 1 commit into
base: trunk
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions checkstyle/import-control-test-common-internal-api.xml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@
<!-- things from server-common -->
<allow pkg="org.apache.kafka.server.common" />

<!-- things from clients -->
<allow pkg="org.apache.kafka.clients" />

<allow pkg="java" />
<allow pkg="javax.security" />
<allow pkg="org.junit" />
Expand Down
6 changes: 3 additions & 3 deletions core/src/test/java/kafka/admin/AdminFenceProducersTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
import org.apache.kafka.common.errors.InvalidProducerEpochException;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.test.ClusterInstance;
import org.apache.kafka.common.test.api.Cluster;
import org.apache.kafka.common.test.api.ClusterConfigProperty;
import org.apache.kafka.common.test.api.ClusterTest;
import org.apache.kafka.common.test.api.ClusterTestDefaults;
Expand Down Expand Up @@ -56,9 +56,9 @@ public class AdminFenceProducersTest {
private static final String TXN_ID = "mytxnid";
private static final String INCORRECT_BROKER_PORT = "225";
private static final ProducerRecord<byte[], byte[]> RECORD = new ProducerRecord<>(TOPIC_NAME, null, new byte[1]);
private final ClusterInstance clusterInstance;
private final Cluster clusterInstance;

AdminFenceProducersTest(ClusterInstance clusterInstance) {
AdminFenceProducersTest(Cluster clusterInstance) {
this.clusterInstance = clusterInstance;
}

Expand Down
8 changes: 4 additions & 4 deletions core/src/test/java/kafka/admin/ClientTelemetryTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
import org.apache.kafka.common.metrics.MetricsReporter;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.test.ClusterInstance;
import org.apache.kafka.common.test.api.Cluster;
import org.apache.kafka.common.test.api.ClusterConfigProperty;
import org.apache.kafka.common.test.api.ClusterTest;
import org.apache.kafka.common.test.api.Type;
Expand Down Expand Up @@ -73,7 +73,7 @@ public class ClientTelemetryTest {
serverProperties = {
@ClusterConfigProperty(key = METRIC_REPORTER_CLASSES_CONFIG, value = "kafka.admin.ClientTelemetryTest$GetIdClientTelemetry"),
})
public void testClientInstanceId(ClusterInstance clusterInstance) throws InterruptedException, ExecutionException {
public void testClientInstanceId(Cluster clusterInstance) throws InterruptedException, ExecutionException {
Map<String, Object> configs = new HashMap<>();
configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, clusterInstance.bootstrapServers());
configs.put(AdminClientConfig.ENABLE_METRICS_PUSH_CONFIG, true);
Expand Down Expand Up @@ -125,7 +125,7 @@ public void testClientInstanceId(ClusterInstance clusterInstance) throws Interru
}

@ClusterTest(types = {Type.CO_KRAFT, Type.KRAFT})
public void testIntervalMsParser(ClusterInstance clusterInstance) {
public void testIntervalMsParser(Cluster clusterInstance) {
List<String> alterOpts = asList("--bootstrap-server", clusterInstance.bootstrapServers(),
"--alter", "--entity-type", "client-metrics", "--entity-name", "test", "--add-config", "interval.ms=bbb");
try (Admin client = clusterInstance.admin()) {
Expand All @@ -137,7 +137,7 @@ public void testIntervalMsParser(ClusterInstance clusterInstance) {
}

@ClusterTest(types = Type.KRAFT)
public void testMetrics(ClusterInstance clusterInstance) {
public void testMetrics(Cluster clusterInstance) {
Map<String, Object> configs = new HashMap<>();
configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, clusterInstance.bootstrapServers());
List<String> expectedMetricsName = Arrays.asList("request-size-max", "io-wait-ratio", "response-total",
Expand Down
5 changes: 3 additions & 2 deletions core/src/test/java/kafka/admin/DeleteTopicTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.kafka.common.record.SimpleRecord;
import org.apache.kafka.common.test.ClusterInstance;
import org.apache.kafka.common.test.TestUtils;
import org.apache.kafka.common.test.api.Cluster;
import org.apache.kafka.common.test.api.ClusterConfigProperty;
import org.apache.kafka.common.test.api.ClusterTest;
import org.apache.kafka.common.test.api.ClusterTestDefaults;
Expand Down Expand Up @@ -74,7 +75,7 @@ public class DeleteTopicTest {
private final Map<Integer, List<Integer>> expectedReplicaAssignment = Map.of(0, List.of(0, 1, 2));

@ClusterTest
public void testDeleteTopicWithAllAliveReplicas(ClusterInstance cluster) throws Exception {
public void testDeleteTopicWithAllAliveReplicas(Cluster cluster) throws Exception {
try (Admin admin = cluster.admin()) {
admin.createTopics(List.of(new NewTopic(DEFAULT_TOPIC, expectedReplicaAssignment))).all().get();
admin.deleteTopics(List.of(DEFAULT_TOPIC)).all().get();
Expand Down Expand Up @@ -258,7 +259,7 @@ public void testDeleteTopicWithCleaner(ClusterInstance cluster) throws Exception
}

@ClusterTest
public void testDeleteTopicAlreadyMarkedAsDeleted(ClusterInstance cluster) throws Exception {
public void testDeleteTopicAlreadyMarkedAsDeleted(Cluster cluster) throws Exception {
try (Admin admin = cluster.admin()) {
admin.createTopics(List.of(new NewTopic(DEFAULT_TOPIC, expectedReplicaAssignment))).all().get();
admin.deleteTopics(List.of(DEFAULT_TOPIC)).all().get();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
*/
package kafka.admin;

import org.apache.kafka.common.test.ClusterInstance;
import org.apache.kafka.common.test.api.Cluster;
import org.apache.kafka.common.test.api.ClusterTest;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.test.NoRetryException;
Expand All @@ -38,9 +38,9 @@ public class UserScramCredentialsCommandTest {
private static final String USER1 = "user1";
private static final String USER2 = "user2";

private final ClusterInstance cluster;
private final Cluster cluster;

public UserScramCredentialsCommandTest(ClusterInstance cluster) {
public UserScramCredentialsCommandTest(Cluster cluster) {
this.cluster = cluster;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.test.ClusterInstance;
import org.apache.kafka.common.test.TestUtils;
import org.apache.kafka.common.test.api.Cluster;
import org.apache.kafka.common.test.api.ClusterConfigProperty;
import org.apache.kafka.common.test.api.ClusterTest;
import org.apache.kafka.common.test.api.ClusterTests;
Expand Down Expand Up @@ -61,7 +61,7 @@ public class ConsumerIntegrationTest {
@ClusterConfigProperty(key = "group.coordinator.rebalance.protocols", value = "classic")
})
})
public void testAsyncConsumerWithOldGroupCoordinator(ClusterInstance clusterInstance) throws Exception {
public void testAsyncConsumerWithOldGroupCoordinator(Cluster clusterInstance) throws Exception {
String topic = "test-topic";
clusterInstance.createTopic(topic, 1, (short) 1);
try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(Map.of(
Expand All @@ -86,7 +86,7 @@ public void testAsyncConsumerWithOldGroupCoordinator(ClusterInstance clusterInst
@ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"),
@ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1"),
})
public void testFetchPartitionsAfterFailedListenerWithGroupProtocolClassic(ClusterInstance clusterInstance)
public void testFetchPartitionsAfterFailedListenerWithGroupProtocolClassic(Cluster clusterInstance)
throws InterruptedException {
testFetchPartitionsAfterFailedListener(clusterInstance, GroupProtocol.CLASSIC);
}
Expand All @@ -95,12 +95,12 @@ public void testFetchPartitionsAfterFailedListenerWithGroupProtocolClassic(Clust
@ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"),
@ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1"),
})
public void testFetchPartitionsAfterFailedListenerWithGroupProtocolConsumer(ClusterInstance clusterInstance)
public void testFetchPartitionsAfterFailedListenerWithGroupProtocolConsumer(Cluster clusterInstance)
throws InterruptedException {
testFetchPartitionsAfterFailedListener(clusterInstance, GroupProtocol.CONSUMER);
}

private static void testFetchPartitionsAfterFailedListener(ClusterInstance clusterInstance, GroupProtocol groupProtocol)
private static void testFetchPartitionsAfterFailedListener(Cluster clusterInstance, GroupProtocol groupProtocol)
throws InterruptedException {
var topic = "topic";
try (var producer = clusterInstance.producer(Map.of(
Expand Down Expand Up @@ -134,7 +134,7 @@ public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
@ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"),
@ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1"),
})
public void testFetchPartitionsWithAlwaysFailedListenerWithGroupProtocolClassic(ClusterInstance clusterInstance)
public void testFetchPartitionsWithAlwaysFailedListenerWithGroupProtocolClassic(Cluster clusterInstance)
throws InterruptedException {
testFetchPartitionsWithAlwaysFailedListener(clusterInstance, GroupProtocol.CLASSIC);
}
Expand All @@ -143,12 +143,12 @@ public void testFetchPartitionsWithAlwaysFailedListenerWithGroupProtocolClassic(
@ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"),
@ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1"),
})
public void testFetchPartitionsWithAlwaysFailedListenerWithGroupProtocolConsumer(ClusterInstance clusterInstance)
public void testFetchPartitionsWithAlwaysFailedListenerWithGroupProtocolConsumer(Cluster clusterInstance)
throws InterruptedException {
testFetchPartitionsWithAlwaysFailedListener(clusterInstance, GroupProtocol.CONSUMER);
}

private static void testFetchPartitionsWithAlwaysFailedListener(ClusterInstance clusterInstance, GroupProtocol groupProtocol)
private static void testFetchPartitionsWithAlwaysFailedListener(Cluster clusterInstance, GroupProtocol groupProtocol)
throws InterruptedException {
var topic = "topic";
try (var producer = clusterInstance.producer(Map.of(
Expand Down Expand Up @@ -188,7 +188,7 @@ public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
}

@ClusterTest(types = {Type.KRAFT}, brokers = 3)
public void testLeaderEpoch(ClusterInstance clusterInstance) throws Exception {
public void testLeaderEpoch(Cluster clusterInstance) throws Exception {
String topic = "test-topic";
clusterInstance.createTopic(topic, 1, (short) 2);
var msgNum = 10;
Expand Down Expand Up @@ -227,7 +227,7 @@ public void testLeaderEpoch(ClusterInstance clusterInstance) throws Exception {
}
}

private void sendMsg(ClusterInstance clusterInstance, String topic, int sendMsgNum) {
private void sendMsg(Cluster clusterInstance, String topic, int sendMsgNum) {
try (var producer = clusterInstance.producer(Map.of(
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class,
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import org.apache.kafka.common.resource.ResourcePattern;
import org.apache.kafka.common.resource.ResourceType;
import org.apache.kafka.common.test.ClusterInstance;
import org.apache.kafka.common.test.api.Cluster;
import org.apache.kafka.common.test.api.ClusterConfigProperty;
import org.apache.kafka.common.test.api.ClusterTest;
import org.apache.kafka.common.test.api.ClusterTestDefaults;
Expand Down Expand Up @@ -88,14 +89,14 @@
@Timeout(120)
@ClusterTestDefaults(types = {Type.KRAFT})
public class BootstrapControllersIntegrationTest {
private Map<String, Object> adminConfig(ClusterInstance clusterInstance, boolean usingBootstrapControllers) {
private Map<String, Object> adminConfig(Cluster clusterInstance, boolean usingBootstrapControllers) {
return usingBootstrapControllers ?
Collections.singletonMap(BOOTSTRAP_CONTROLLERS_CONFIG, clusterInstance.bootstrapControllers()) :
Collections.singletonMap(BOOTSTRAP_SERVERS_CONFIG, clusterInstance.bootstrapServers());
}

@ClusterTest
public void testPutBrokersInBootstrapControllersConfig(ClusterInstance clusterInstance) {
public void testPutBrokersInBootstrapControllersConfig(Cluster clusterInstance) {
Map<String, Object> config = Collections.singletonMap(BOOTSTRAP_CONTROLLERS_CONFIG, clusterInstance.bootstrapServers());
try (Admin admin = Admin.create(config)) {
ExecutionException exception = assertThrows(ExecutionException.class,
Expand All @@ -108,7 +109,7 @@ public void testPutBrokersInBootstrapControllersConfig(ClusterInstance clusterIn
}

@ClusterTest
public void testPutControllersInBootstrapBrokersConfig(ClusterInstance clusterInstance) {
public void testPutControllersInBootstrapBrokersConfig(Cluster clusterInstance) {
Map<String, Object> config = Collections.singletonMap(BOOTSTRAP_SERVERS_CONFIG, clusterInstance.bootstrapControllers());
try (Admin admin = Admin.create(config)) {
ExecutionException exception = assertThrows(ExecutionException.class,
Expand All @@ -120,16 +121,16 @@ public void testPutControllersInBootstrapBrokersConfig(ClusterInstance clusterIn
}

@ClusterTest
public void testDescribeClusterByControllers(ClusterInstance clusterInstance) throws Exception {
public void testDescribeClusterByControllers(Cluster clusterInstance) throws Exception {
testDescribeCluster(clusterInstance, true);
}

@ClusterTest
public void testDescribeCluster(ClusterInstance clusterInstance) throws Exception {
public void testDescribeCluster(Cluster clusterInstance) throws Exception {
testDescribeCluster(clusterInstance, false);
}

private void testDescribeCluster(ClusterInstance clusterInstance, boolean usingBootstrapControllers) throws Exception {
private void testDescribeCluster(Cluster clusterInstance, boolean usingBootstrapControllers) throws Exception {
try (Admin admin = Admin.create(adminConfig(clusterInstance, usingBootstrapControllers))) {
DescribeClusterResult result = admin.describeCluster();
assertEquals(clusterInstance.clusterId(), result.clusterId().get(1, TimeUnit.MINUTES));
Expand All @@ -140,16 +141,16 @@ private void testDescribeCluster(ClusterInstance clusterInstance, boolean usingB
}

@ClusterTest
public void testDescribeFeaturesByControllers(ClusterInstance clusterInstance) throws Exception {
public void testDescribeFeaturesByControllers(Cluster clusterInstance) throws Exception {
testDescribeFeatures(clusterInstance, true);
}

@ClusterTest
public void testDescribeFeatures(ClusterInstance clusterInstance) throws Exception {
public void testDescribeFeatures(Cluster clusterInstance) throws Exception {
testDescribeFeatures(clusterInstance, false);
}

private void testDescribeFeatures(ClusterInstance clusterInstance, boolean usingBootstrapControllers) throws Exception {
private void testDescribeFeatures(Cluster clusterInstance, boolean usingBootstrapControllers) throws Exception {
try (Admin admin = Admin.create(adminConfig(clusterInstance, usingBootstrapControllers))) {
DescribeFeaturesResult result = admin.describeFeatures();
short metadataVersion = clusterInstance.config().metadataVersion().featureLevel();
Expand All @@ -160,16 +161,16 @@ private void testDescribeFeatures(ClusterInstance clusterInstance, boolean using
}

@ClusterTest
public void testUpdateFeaturesByControllers(ClusterInstance clusterInstance) {
public void testUpdateFeaturesByControllers(Cluster clusterInstance) {
testUpdateFeatures(clusterInstance, true);
}

@ClusterTest
public void testUpdateFeatures(ClusterInstance clusterInstance) {
public void testUpdateFeatures(Cluster clusterInstance) {
testUpdateFeatures(clusterInstance, false);
}

private void testUpdateFeatures(ClusterInstance clusterInstance, boolean usingBootstrapControllers) {
private void testUpdateFeatures(Cluster clusterInstance, boolean usingBootstrapControllers) {
try (Admin admin = Admin.create(adminConfig(clusterInstance, usingBootstrapControllers))) {
UpdateFeaturesResult result = admin.updateFeatures(Collections.singletonMap("foo.bar.feature",
new FeatureUpdate((short) 1, FeatureUpdate.UpgradeType.UPGRADE)),
Expand All @@ -186,16 +187,16 @@ private void testUpdateFeatures(ClusterInstance clusterInstance, boolean usingBo
}

@ClusterTest
public void testDescribeMetadataQuorumByControllers(ClusterInstance clusterInstance) throws Exception {
public void testDescribeMetadataQuorumByControllers(Cluster clusterInstance) throws Exception {
testDescribeMetadataQuorum(clusterInstance, true);
}

@ClusterTest
public void testDescribeMetadataQuorum(ClusterInstance clusterInstance) throws Exception {
public void testDescribeMetadataQuorum(Cluster clusterInstance) throws Exception {
testDescribeMetadataQuorum(clusterInstance, false);
}

private void testDescribeMetadataQuorum(ClusterInstance clusterInstance, boolean usingBootstrapControllers) throws Exception {
private void testDescribeMetadataQuorum(Cluster clusterInstance, boolean usingBootstrapControllers) throws Exception {
try (Admin admin = Admin.create(adminConfig(clusterInstance, usingBootstrapControllers))) {
DescribeMetadataQuorumResult result = admin.describeMetadataQuorum();
assertTrue(clusterInstance.controllerIds().contains(
Expand All @@ -204,7 +205,7 @@ private void testDescribeMetadataQuorum(ClusterInstance clusterInstance, boolean
}

@ClusterTest
public void testUsingBootstrapControllersOnUnsupportedAdminApi(ClusterInstance clusterInstance) {
public void testUsingBootstrapControllersOnUnsupportedAdminApi(Cluster clusterInstance) {
try (Admin admin = Admin.create(adminConfig(clusterInstance, true))) {
ListOffsetsResult result = admin.listOffsets(Collections.singletonMap(
new TopicPartition("foo", 0), OffsetSpec.earliest()));
Expand Down Expand Up @@ -257,7 +258,7 @@ private void testIncrementalAlterConfigs(ClusterInstance clusterInstance, boolea
}

@ClusterTest(brokers = 3)
public void testAlterReassignmentsWithBootstrapControllers(ClusterInstance clusterInstance) throws ExecutionException, InterruptedException {
public void testAlterReassignmentsWithBootstrapControllers(Cluster clusterInstance) throws ExecutionException, InterruptedException {
String topicName = "foo";
try (Admin admin = Admin.create(adminConfig(clusterInstance, false))) {
Map<Integer, List<Integer>> assignments = new HashMap<>();
Expand Down
Loading
Loading