Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Upgrade Kafka to 3.8.0 #2180

Merged
merged 6 commits into from
Jan 13, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,8 @@ project(':cruise-control') {
implementation "io.netty:netty-handler:${nettyVersion}"
implementation "io.netty:netty-transport-native-epoll:${nettyVersion}"
api "org.apache.kafka:kafka_$scalaBinaryVersion:$kafkaVersion"
api "org.apache.kafka:kafka-server:$kafkaVersion"
api "org.apache.kafka:kafka-server-common:$kafkaVersion"
api "org.apache.kafka:kafka-clients:$kafkaVersion"
// Add following dependency when upgrading to Kafka 3.5
api "org.apache.kafka:kafka-storage:$kafkaVersion"
Expand Down Expand Up @@ -446,6 +448,7 @@ project(':cruise-control-metrics-reporter') {
implementation "com.yammer.metrics:metrics-core:2.2.0"
implementation "org.apache.logging.log4j:log4j-slf4j-impl:2.17.2"
implementation "org.apache.kafka:kafka_$scalaBinaryVersion:$kafkaVersion"
implementation "org.apache.kafka:kafka-server:$kafkaVersion"
akatona84 marked this conversation as resolved.
Show resolved Hide resolved
implementation "org.apache.kafka:kafka-clients:$kafkaVersion"
implementation 'com.google.code.findbugs:jsr305:3.0.2'
// Temporary pin for vulnerability
Expand All @@ -456,7 +459,9 @@ project(':cruise-control-metrics-reporter') {
testImplementation 'org.powermock:powermock-module-junit4:2.0.9'
testImplementation 'org.powermock:powermock-api-easymock:2.0.9'
testImplementation "org.apache.kafka:kafka-clients:$kafkaVersion:test"
testImplementation "org.apache.kafka:kafka-clients:$kafkaVersion"
testImplementation "org.apache.kafka:kafka-server-common:$kafkaVersion"
testImplementation "org.apache.kafka:kafka-group-coordinator:$kafkaVersion"
testImplementation "org.apache.kafka:kafka-storage:$kafkaVersion"
testImplementation 'commons-io:commons-io:2.11.0'
testImplementation "org.apache.zookeeper:zookeeper:${zookeeperVersion}"
testOutput sourceSets.test.output
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@

import com.linkedin.kafka.cruisecontrol.metricsreporter.utils.CCKafkaClientsIntegrationTestHarness;
import com.linkedin.kafka.cruisecontrol.metricsreporter.utils.CCKafkaTestUtils;
import kafka.server.KafkaConfig;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.CreateTopicsResult;
Expand All @@ -16,6 +15,10 @@
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.coordinator.group.GroupCoordinatorConfig;
import org.apache.kafka.network.SocketServerConfigs;
import org.apache.kafka.server.config.ReplicationConfigs;
import org.apache.kafka.server.config.ServerLogConfigs;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
Expand Down Expand Up @@ -79,7 +82,7 @@ public Properties overridingProps() {
Properties props = new Properties();
int port = CCKafkaTestUtils.findLocalPort();
props.setProperty(CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG, CruiseControlMetricsReporter.class.getName());
props.setProperty(KafkaConfig.ListenersProp(), "PLAINTEXT://127.0.0.1:" + port);
props.setProperty(SocketServerConfigs.LISTENERS_CONFIG, "PLAINTEXT://127.0.0.1:" + port);
props.setProperty(CruiseControlMetricsReporterConfig.config(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG),
"127.0.0.1:" + port);
props.setProperty(CruiseControlMetricsReporterConfig.CRUISE_CONTROL_METRICS_REPORTER_INTERVAL_MS_CONFIG, "100");
Expand All @@ -91,11 +94,11 @@ public Properties overridingProps() {
props.setProperty(CruiseControlMetricsReporterConfig.CRUISE_CONTROL_METRICS_TOPIC_NUM_PARTITIONS_CONFIG, "1");
props.setProperty(CruiseControlMetricsReporterConfig.CRUISE_CONTROL_METRICS_TOPIC_REPLICATION_FACTOR_CONFIG, "1");
// disable topic auto-creation to leave the metrics reporter to create the metrics topic
props.setProperty(KafkaConfig.AutoCreateTopicsEnableProp(), "false");
props.setProperty(KafkaConfig.LogFlushIntervalMessagesProp(), "1");
props.setProperty(KafkaConfig.OffsetsTopicReplicationFactorProp(), "1");
props.setProperty(KafkaConfig.DefaultReplicationFactorProp(), "2");
props.setProperty(KafkaConfig.NumPartitionsProp(), "2");
props.setProperty(ServerLogConfigs.AUTO_CREATE_TOPICS_ENABLE_CONFIG, "false");
props.setProperty(ServerLogConfigs.LOG_FLUSH_INTERVAL_MESSAGES_CONFIG, "1");
props.setProperty(GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, "1");
props.setProperty(ReplicationConfigs.DEFAULT_REPLICATION_FACTOR_CONFIG, "2");
props.setProperty(ServerLogConfigs.NUM_PARTITIONS_CONFIG, "2");
return props;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,13 @@
import java.io.IOException;
import java.util.Properties;
import com.linkedin.kafka.cruisecontrol.metricsreporter.utils.CCKafkaTestUtils;
import kafka.server.KafkaConfig;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.coordinator.group.GroupCoordinatorConfig;
import org.apache.kafka.network.SocketServerConfigs;
import org.apache.kafka.server.config.ReplicationConfigs;
import org.apache.kafka.server.config.ServerLogConfigs;
import org.junit.Assert;

import static com.linkedin.kafka.cruisecontrol.metricsreporter.CruiseControlMetricsReporterConfig.CRUISE_CONTROL_METRICS_REPORTER_INTERVAL_MS_CONFIG;
Expand Down Expand Up @@ -45,14 +48,14 @@ public Properties overridingProps() {
}
}
props.setProperty(CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG, CruiseControlMetricsReporter.class.getName());
props.setProperty(KafkaConfig.ListenersProp(), "SSL://127.0.0.1:" + port);
props.setProperty(SocketServerConfigs.LISTENERS_CONFIG, "SSL://127.0.0.1:" + port);
props.setProperty(CruiseControlMetricsReporterConfig.config(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG), "127.0.0.1:" + port);
props.setProperty(CruiseControlMetricsReporterConfig.config(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG), SecurityProtocol.SSL.name);
props.setProperty(CRUISE_CONTROL_METRICS_REPORTER_INTERVAL_MS_CONFIG, "100");
props.setProperty(CRUISE_CONTROL_METRICS_TOPIC_CONFIG, TOPIC);
props.setProperty(KafkaConfig.LogFlushIntervalMessagesProp(), "1");
props.setProperty(KafkaConfig.OffsetsTopicReplicationFactorProp(), "1");
props.setProperty(KafkaConfig.DefaultReplicationFactorProp(), "2");
props.setProperty(ServerLogConfigs.LOG_FLUSH_INTERVAL_MESSAGES_CONFIG, "1");
props.setProperty(GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, "1");
props.setProperty(ReplicationConfigs.DEFAULT_REPLICATION_FACTOR_CONFIG, "2");
return props;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Pattern;
import com.linkedin.kafka.cruisecontrol.metricsreporter.utils.CCKafkaTestUtils;
import kafka.server.KafkaConfig;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.TopicDescription;
Expand All @@ -35,6 +34,10 @@
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.coordinator.group.GroupCoordinatorConfig;
import org.apache.kafka.network.SocketServerConfigs;
import org.apache.kafka.server.config.ReplicationConfigs;
import org.apache.kafka.server.config.ServerLogConfigs;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
Expand Down Expand Up @@ -86,13 +89,13 @@ public Properties overridingProps() {
Properties props = new Properties();
int port = CCKafkaTestUtils.findLocalPort();
props.setProperty(CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG, CruiseControlMetricsReporter.class.getName());
props.setProperty(KafkaConfig.ListenersProp(), "PLAINTEXT://" + HOST + ":" + port);
props.setProperty(SocketServerConfigs.LISTENERS_CONFIG, "PLAINTEXT://" + HOST + ":" + port);
props.setProperty(CruiseControlMetricsReporterConfig.config(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG), HOST + ":" + port);
props.setProperty(CruiseControlMetricsReporterConfig.CRUISE_CONTROL_METRICS_REPORTER_INTERVAL_MS_CONFIG, "100");
props.setProperty(CruiseControlMetricsReporterConfig.CRUISE_CONTROL_METRICS_TOPIC_CONFIG, TOPIC);
props.setProperty(KafkaConfig.LogFlushIntervalMessagesProp(), "1");
props.setProperty(KafkaConfig.OffsetsTopicReplicationFactorProp(), "1");
props.setProperty(KafkaConfig.DefaultReplicationFactorProp(), "2");
props.setProperty(ServerLogConfigs.LOG_FLUSH_INTERVAL_MESSAGES_CONFIG, "1");
props.setProperty(GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, "1");
props.setProperty(ReplicationConfigs.DEFAULT_REPLICATION_FACTOR_CONFIG, "2");
return props;
}

Expand Down Expand Up @@ -210,7 +213,8 @@ public void testUpdatingMetricsTopicConfig() throws ExecutionException, Interrup
public void testGetKafkaBootstrapServersConfigure() {
// Test with a "listeners" config with a host
Map<Object, Object> brokerConfig = buildBrokerConfigs().get(0);
Map<String, Object> listenersMap = Collections.singletonMap(KafkaConfig.ListenersProp(), brokerConfig.get(KafkaConfig.ListenersProp()));
Map<String, Object> listenersMap = Collections.singletonMap(
SocketServerConfigs.LISTENERS_CONFIG, brokerConfig.get(SocketServerConfigs.LISTENERS_CONFIG));
String bootstrapServers = CruiseControlMetricsReporter.getBootstrapServers(listenersMap);
String urlParse = "\\[?([0-9a-zA-Z\\-%._:]*)]?:(-?[0-9]+)";
Pattern urlParsePattern = Pattern.compile(urlParse);
Expand All @@ -219,15 +223,15 @@ public void testGetKafkaBootstrapServersConfigure() {

// Test with a "listeners" config without a host in the first listener.
String listeners = "SSL://:1234,PLAINTEXT://myhost:4321";
listenersMap = Collections.singletonMap(KafkaConfig.ListenersProp(), listeners);
listenersMap = Collections.singletonMap(SocketServerConfigs.LISTENERS_CONFIG, listeners);
bootstrapServers = CruiseControlMetricsReporter.getBootstrapServers(listenersMap);
assertTrue(urlParsePattern.matcher(bootstrapServers).matches());
assertEquals(DEFAULT_BOOTSTRAP_SERVERS_HOST, bootstrapServers.split(":")[0]);
assertEquals("1234", bootstrapServers.split(":")[1]);

// Test with "listeners" and "port" config together.
listenersMap = new HashMap<>();
listenersMap.put(KafkaConfig.ListenersProp(), listeners);
listenersMap.put(SocketServerConfigs.LISTENERS_CONFIG, listeners);
listenersMap.put("port", "43");
bootstrapServers = CruiseControlMetricsReporter.getBootstrapServers(listenersMap);
assertTrue(urlParsePattern.matcher(bootstrapServers).matches());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.network.SocketServerConfigs;
import org.apache.kafka.server.config.ServerConfigs;
import org.apache.kafka.server.config.ServerLogConfigs;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Option;
Expand Down Expand Up @@ -94,11 +97,11 @@ private static KafkaServer createKafkaServer(KafkaConfig kafkaConfig) throws Cla
}

private void parseConfigs(Map<Object, Object> config) {
_id = Integer.parseInt((String) config.get(KafkaConfig.BrokerIdProp()));
_logDir = new File((String) config.get(KafkaConfig.LogDirProp()));
_id = Integer.parseInt((String) config.get(ServerConfigs.BROKER_ID_CONFIG));
_logDir = new File((String) config.get(ServerLogConfigs.LOG_DIR_CONFIG));

// Bind addresses
String listenersString = (String) config.get(KafkaConfig.ListenersProp());
String listenersString = (String) config.get(SocketServerConfigs.LISTENERS_CONFIG);
for (String protocolAddr : listenersString.split("\\s*,\\s*")) {
try {
URI uri = new URI(protocolAddr.trim());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,16 @@
import java.util.Map;
import java.util.StringJoiner;
import java.util.concurrent.atomic.AtomicInteger;
import kafka.server.KafkaConfig;
import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.network.Mode;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.coordinator.group.GroupCoordinatorConfig;
import org.apache.kafka.network.SocketServerConfigs;
import org.apache.kafka.server.config.ReplicationConfigs;
import org.apache.kafka.server.config.ServerConfigs;
import org.apache.kafka.server.config.ServerLogConfigs;
import org.apache.kafka.server.config.ZkConfigs;
import org.apache.kafka.storage.internals.log.CleanerConfig;
import org.apache.kafka.test.TestSslUtils;


Expand Down Expand Up @@ -257,27 +264,27 @@ public Map<Object, Object> buildConfig() {
if (_sslPort >= 0) {
csvJoiner.add(SecurityProtocol.SSL.name + "://localhost:" + _sslPort);
}
props.put(KafkaConfig.BrokerIdProp(), Integer.toString(_nodeId));
props.put(KafkaConfig.ListenersProp(), csvJoiner.toString());
props.put(KafkaConfig.LogDirProp(), _logDirectory.getAbsolutePath());
props.put(KafkaConfig.ZkConnectProp(), _zkConnect);
props.put(KafkaConfig.ReplicaSocketTimeoutMsProp(), Long.toString(_socketTimeoutMs));
props.put(KafkaConfig.ControllerSocketTimeoutMsProp(), Long.toString(_socketTimeoutMs));
props.put(KafkaConfig.ControlledShutdownEnableProp(), Boolean.toString(_enableControlledShutdown));
props.put(KafkaConfig.DeleteTopicEnableProp(), Boolean.toString(_enableDeleteTopic));
props.put(KafkaConfig.ControlledShutdownRetryBackoffMsProp(), Long.toString(_controlledShutdownRetryBackoff));
props.put(KafkaConfig.LogCleanerDedupeBufferSizeProp(), Long.toString(_logCleanerDedupBufferSize));
props.put(KafkaConfig.LogCleanerEnableProp(), Boolean.toString(_enableLogCleaner));
props.put(KafkaConfig.OffsetsTopicReplicationFactorProp(), "1");
props.put(KafkaConfig.SslEndpointIdentificationAlgorithmProp(), "");
props.put(ServerConfigs.BROKER_ID_CONFIG, Integer.toString(_nodeId));
props.put(SocketServerConfigs.LISTENERS_CONFIG, csvJoiner.toString());
props.put(ServerLogConfigs.LOG_DIR_CONFIG, _logDirectory.getAbsolutePath());
props.put(ZkConfigs.ZK_CONNECT_CONFIG, _zkConnect);
props.put(ReplicationConfigs.REPLICA_SOCKET_TIMEOUT_MS_CONFIG, Long.toString(_socketTimeoutMs));
props.put(ReplicationConfigs.CONTROLLER_SOCKET_TIMEOUT_MS_CONFIG, Long.toString(_socketTimeoutMs));
props.put(ServerConfigs.CONTROLLED_SHUTDOWN_ENABLE_CONFIG, Boolean.toString(_enableControlledShutdown));
props.put(ServerConfigs.DELETE_TOPIC_ENABLE_CONFIG, Boolean.toString(_enableDeleteTopic));
props.put(ServerConfigs.CONTROLLED_SHUTDOWN_RETRY_BACKOFF_MS_CONFIG, Long.toString(_controlledShutdownRetryBackoff));
props.put(CleanerConfig.LOG_CLEANER_DEDUPE_BUFFER_SIZE_PROP, Long.toString(_logCleanerDedupBufferSize));
props.put(CleanerConfig.LOG_CLEANER_ENABLE_PROP, Boolean.toString(_enableLogCleaner));
props.put(GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, "1");
props.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");
if (_rack != null) {
props.put(KafkaConfig.RackProp(), _rack);
props.put(ServerConfigs.BROKER_RACK_CONFIG, _rack);
}
if (_trustStore != null || _sslPort > 0) {
try {
props.putAll(TestSslUtils.createSslConfig(false, true, Mode.SERVER, _trustStore, "server" + _nodeId));
// Switch interbroker to ssl
props.put(KafkaConfig.InterBrokerSecurityProtocolProp(), SecurityProtocol.SSL.name);
props.put(ReplicationConfigs.INTER_BROKER_SECURITY_PROTOCOL_CONFIG, SecurityProtocol.SSL.name);
} catch (Exception e) {
throw new IllegalStateException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,11 @@

import java.io.File;
import java.util.Properties;
import kafka.server.KafkaConfig;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.network.Mode;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.serialization.StringSerializer;
Expand Down Expand Up @@ -56,7 +56,7 @@ protected void setSecurityConfigs(Properties clientProps, String certAlias) {
throw new AssertionError("ssl set but no trust store provided");
}
clientProps.setProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, protocol.name);
clientProps.setProperty(KafkaConfig.SslEndpointIdentificationAlgorithmProp(), "");
clientProps.setProperty(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");
try {
clientProps.putAll(TestSslUtils.createSslConfig(true, true, Mode.CLIENT, trustStoreFile, certAlias));
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,12 @@
import com.linkedin.kafka.cruisecontrol.detector.AnomalyState;
import com.linkedin.kafka.cruisecontrol.detector.TopicReplicationFactorAnomalyFinder;
import com.linkedin.kafka.cruisecontrol.metricsreporter.utils.CCKafkaTestUtils;
import kafka.server.KafkaConfig;
import net.minidev.json.JSONArray;
import org.apache.commons.io.FileUtils;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.server.config.ServerLogConfigs;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
Expand Down Expand Up @@ -97,7 +97,7 @@ public Map<Object, Object> overridingProps() {
Map<Object, Object> props = KafkaCruiseControlIntegrationTestUtils.createBrokerProps();
Entry<File, File> logFolders = Map.entry(CCKafkaTestUtils.newTempDir(), CCKafkaTestUtils.newTempDir());
_brokerLogDirs.add(logFolders);
props.put(KafkaConfig.LogDirsProp(), logFolders.getKey().getAbsolutePath() + "," + logFolders.getValue().getAbsolutePath());
props.put(ServerLogConfigs.LOG_DIR_CONFIG, logFolders.getKey().getAbsolutePath() + "," + logFolders.getValue().getAbsolutePath());
return props;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
import com.linkedin.kafka.cruisecontrol.metricsreporter.CruiseControlMetricsReporterConfig;
import com.linkedin.kafka.cruisecontrol.monitor.sampling.CruiseControlMetricsReporterSampler;
import com.linkedin.kafka.cruisecontrol.monitor.sampling.KafkaSampleStore;
import kafka.server.KafkaConfig;
import org.apache.commons.io.IOUtils;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
Expand All @@ -45,6 +44,7 @@
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.network.SocketServerConfigs;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -168,7 +168,7 @@ public static Map<Object, Object> createBrokerProps() {
StringJoiner csvJoiner = new StringJoiner(",");
csvJoiner.add(SecurityProtocol.PLAINTEXT.name + "://localhost:"
+ KafkaCruiseControlIntegrationTestUtils.findRandomOpenPortOnAllLocalInterfaces());
props.put(KafkaConfig.ListenersProp(), csvJoiner.toString());
props.put(SocketServerConfigs.LISTENERS_CONFIG, csvJoiner.toString());
props.put(CruiseControlMetricsReporterConfig.CRUISE_CONTROL_METRICS_TOPIC_AUTO_CREATE_CONFIG, "true");
props.put(CruiseControlMetricsReporterConfig.CRUISE_CONTROL_METRICS_TOPIC_REPLICATION_FACTOR_CONFIG, "2");
props.put(CruiseControlMetricsReporterConfig.CRUISE_CONTROL_METRICS_TOPIC_NUM_PARTITIONS_CONFIG, "1");
Expand Down
Loading