Persist demoted and removed brokers #2115

Status: Open. Wants to merge 5 commits into base branch main.
Changes shown from 2 commits.
config/cruisecontrol.properties (46 additions, 0 deletions)

@@ -198,6 +198,52 @@
num.concurrent.leader.movements=1000
execution.progress.check.interval.ms=10000


# Configurations for persistent data
# =======================================

# The method used to durably store persisted data. Set this config first, as its value
# determines which of the other "persisted.data" configs apply. The available options are:
# "kafka",
# "memory"
# The default is "memory", which doesn't durably persist any runtime data.
#persisted.data.persist.method=kafka

# The name of the kafka topic to use to persist data when "persisted.data.persist.method" is set to
# "kafka". If the topic is not present, then it will be created.
#persisted.data.kafka.topic.name=__CruiseControlPersistentData

# The number of partitions to ensure are present for the kafka topic. Only applies when
# "persisted.data.persist.method" is set to "kafka". If the topic has fewer than this number of
# partitions, then partitions will be added.
#persisted.data.kafka.topic.partition.count=2

# The replication factor to use for the kafka topic. Only applies when
# "persisted.data.persist.method" is set to "kafka". Multiple partition replicas are desirable to
# ensure the topic is reasonably available.
#persisted.data.kafka.topic.replication.factor=2

# The configs to apply to the kafka topic used to persist Cruise Control data. Only applies if
# "persisted.data.persist.method" is set to "kafka". This "list" should be a semicolon separated
# string of 'key=value' pairs. The keys and values need to be valid Kafka Topic configs.
Review comment (Contributor):
I think it is unnecessary to separate the pairs with semicolons; prefixing should be enough. My problem with semicolon separation is that with enough configs the list becomes a little hard to interpret.

Reply (Author):
OK, I can switch the semicolons to commas and provide an example that demonstrates tidy formatting. In case I've misunderstood your request: the reason for the separator character is to give users a mechanism to add any number of key=value pairs to the list, rather than mandating which configs are permitted.

Reply (Author):
Sorry, I can't switch the semicolons to commas after all, because individual values can themselves contain commas. I can make sure the docs make it clear that readable formatting is easy and desirable.

# See: https://kafka.apache.org/documentation/#topicconfigs
# e.g. persisted.data.kafka.topic.additional.configs.map=max.compaction.lag.ms=600000;min.insync.replicas=2
#persisted.data.kafka.topic.additional.configs.map=

# The additional configs to use when creating the kafka producer to persist Cruise Control data.
# Only applies if "persisted.data.persist.method" is set to "kafka". This "list" should be a
# semicolon separated string of 'key=value' pairs. The keys and values need to be valid Kafka
# Producer configs. See: https://kafka.apache.org/documentation/#producerconfigs
# e.g. persisted.data.kafka.producer.additional.configs.map=buffer.memory=100000000;batch.size=900000
#persisted.data.kafka.producer.additional.configs.map=

# The additional configs to use when creating the kafka consumer to read persisted Cruise Control
# data. Only applies if "persisted.data.persist.method" is set to "kafka". This "list" should be a
# semicolon separated string of 'key=value' pairs. The keys and values need to be valid Kafka
# Consumer configs. See: https://kafka.apache.org/documentation/#consumerconfigs
# e.g. persisted.data.kafka.consumer.additional.configs.map=group.id=my-group-id;fetch.max.bytes=2000000
#persisted.data.kafka.consumer.additional.configs.map=
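As the review thread above notes, semicolons separate the pairs because individual Kafka config values can themselves contain commas (for example, list-valued configs). A minimal parsing sketch of such a string; the helper name `parseConfigMap` is hypothetical and not the PR's actual code:

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class ConfigMapParser {
    // Hypothetical helper: split "k1=v1;k2=v2" into an ordered map.
    // The pair separator is a semicolon rather than a comma because a
    // value may itself contain commas (e.g. a list-valued Kafka config).
    static Map<String, String> parseConfigMap(String value) {
        Map<String, String> configs = new LinkedHashMap<>();
        if (value == null || value.isEmpty()) {
            return configs;
        }
        for (String pair : value.split(";")) {
            int eq = pair.indexOf('=');
            if (eq < 0) {
                throw new IllegalArgumentException("Expected key=value, got: " + pair);
            }
            // Only the first '=' delimits the key; the value keeps any later '='.
            configs.put(pair.substring(0, eq).trim(), pair.substring(eq + 1).trim());
        }
        return configs;
    }
}
```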


# Configurations for anomaly detector
# =======================================

KafkaCruiseControl.java

@@ -41,6 +41,8 @@
import com.linkedin.kafka.cruisecontrol.monitor.MonitorUtils;
import com.linkedin.kafka.cruisecontrol.monitor.metricdefinition.KafkaMetricDef;
import com.linkedin.kafka.cruisecontrol.monitor.task.LoadMonitorTaskRunner;
import com.linkedin.kafka.cruisecontrol.persisteddata.PersistedMapFactory;
import com.linkedin.kafka.cruisecontrol.persisteddata.namespace.ExecutorPersistedData;
import com.linkedin.kafka.cruisecontrol.servlet.UserTaskManager;
import com.linkedin.kafka.cruisecontrol.servlet.response.stats.BrokerStats;
import java.io.InputStream;
@@ -82,6 +84,7 @@ public class KafkaCruiseControl {
private final GoalOptimizer _goalOptimizer;
private final ExecutorService _goalOptimizerExecutor;
private final Executor _executor;
private final PersistedMapFactory _persistedMapFactory;
private final AnomalyDetectorManager _anomalyDetectorManager;
private final Time _time;
private final AdminClient _adminClient;
@@ -122,7 +125,9 @@ public KafkaCruiseControl(KafkaCruiseControlConfig config, MetricRegistry dropwi
Provisioner.class,
Collections.singletonMap(KAFKA_CRUISE_CONTROL_OBJECT_CONFIG, this));
_anomalyDetectorManager = new AnomalyDetectorManager(this, _time, dropwizardMetricRegistry);
- _executor = new Executor(config, _time, dropwizardMetricRegistry, _anomalyDetectorManager);
+ _persistedMapFactory = new PersistedMapFactory(config, _adminClient);
+ _executor = new Executor(config, _time, dropwizardMetricRegistry, _anomalyDetectorManager,
+     new ExecutorPersistedData(_persistedMapFactory.instance()));
_loadMonitor = new LoadMonitor(config, _time, dropwizardMetricRegistry, KafkaMetricDef.commonMetricDef());
_goalOptimizerExecutor = Executors.newSingleThreadExecutor(new KafkaCruiseControlThreadFactory("GoalOptimizerExecutor"));
_goalOptimizer = new GoalOptimizer(config, _loadMonitor, _time, dropwizardMetricRegistry, _executor, _adminClient);
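The wiring above suggests that `PersistedMapFactory` selects the backing store based on the `persisted.data.persist.method` config ("kafka" or "memory"). The class name comes from the diff, but its internals here are an assumption, not the PR's code; a speculative sketch of that selection logic:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class PersistedMapFactorySketch {
    // Assumed behavior: "memory" returns a plain in-process map that is lost on
    // restart, while "kafka" would return a topic-backed implementation (not
    // shown here, since it needs an AdminClient plus producer/consumer wiring).
    static Map<String, String> instance(String persistMethod) {
        switch (persistMethod) {
            case "memory":
                return new ConcurrentHashMap<>();
            case "kafka":
                throw new UnsupportedOperationException(
                        "Kafka-backed map requires broker connectivity");
            default:
                throw new IllegalArgumentException(
                        "Unknown persist method: " + persistMethod);
        }
    }
}
```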
@@ -138,11 +143,13 @@ public KafkaCruiseControl(KafkaCruiseControlConfig config, MetricRegistry dropwi
LoadMonitor loadMonitor,
ExecutorService goalOptimizerExecutor,
GoalOptimizer goalOptimizer,
PersistedMapFactory persistedMapFactory,
Provisioner provisioner) {
_config = config;
_time = time;
_adminClient = createAdminClient(KafkaCruiseControlUtils.parseAdminClientConfigs(config));
_anomalyDetectorManager = anomalyDetectorManager;
_persistedMapFactory = persistedMapFactory;
_executor = executor;
_loadMonitor = loadMonitor;
_goalOptimizerExecutor = goalOptimizerExecutor;
@@ -468,9 +475,9 @@ public boolean dropRecentBrokers(Set&lt;Integer&gt; brokersToDrop, boolean isRemoved)
*/
public void addRecentBrokersPermanently(Set<Integer> brokersToAdd, boolean isRemoved) {
if (isRemoved) {
- _executor.addRecentlyRemovedBrokers(brokersToAdd);
+ _executor.addRecentlyRemovedBrokersPermanently(brokersToAdd);
} else {
- _executor.addRecentlyDemotedBrokers(brokersToAdd);
+ _executor.addRecentlyDemotedBrokersPermanently(brokersToAdd);
}
}
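The renamed `...Permanently` methods presumably write the removed/demoted broker sets through the persisted map so they survive a restart, which is the point of this PR. A hedged sketch of round-tripping a broker-ID set through a string-valued store; the key name and comma-separated encoding are assumptions, not the PR's actual format:

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.stream.Collectors;

public class BrokerSetCodec {
    // Hypothetical key; the PR's real namespace/key layout may differ.
    static final String REMOVED_BROKERS_KEY = "removed.brokers";

    // Encode the broker IDs as a sorted, comma-separated string value.
    static void store(Map<String, String> persisted, Set<Integer> brokers) {
        persisted.put(REMOVED_BROKERS_KEY, new TreeSet<>(brokers).stream()
                .map(String::valueOf)
                .collect(Collectors.joining(",")));
    }

    // Decode the stored value back into a set, tolerating a missing key.
    static Set<Integer> load(Map<String, String> persisted) {
        Set<Integer> brokers = new TreeSet<>();
        String value = persisted.get(REMOVED_BROKERS_KEY);
        if (value != null && !value.isEmpty()) {
            for (String id : value.split(",")) {
                brokers.add(Integer.parseInt(id));
            }
        }
        return brokers;
    }
}
```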
