Skip to content

Commit

Permalink
Prometheus configurable queries
Browse files Browse the repository at this point in the history
  allow externalized prometheus queries in a properties file

This resolves linkedin#1540
  • Loading branch information
g.chouet committed Feb 23, 2024
1 parent dcdb4b1 commit 7417b28
Show file tree
Hide file tree
Showing 7 changed files with 125 additions and 7 deletions.
10 changes: 10 additions & 0 deletions config/prometheus-query.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
# This file contains all the custom prometheus queries
# This is an example property file for Kafka Cruise Control configurable prometheus queries.


# =======================================
# This file must define a query for every RawMetricType that is needed,
# one per line, using the format <RAW_METRIC_TYPE>=<query>

BROKER_CPU_UTIL=1 - avg by (instance) (irate(node_cpu_seconds_total{mode="idle"}[2m]))
# ..
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package com.linkedin.kafka.cruisecontrol.monitor.sampling.prometheus;

import com.linkedin.cruisecontrol.common.CruiseControlConfigurable;
import com.linkedin.kafka.cruisecontrol.metricsreporter.metric.RawMetricType;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import static com.linkedin.cruisecontrol.common.utils.Utils.validateNotNull;
import static com.linkedin.kafka.cruisecontrol.monitor.sampling.prometheus.PrometheusMetricSampler.PROMETHEUS_QUERY_FILE_CONFIG;

/**
 * Configurable prometheus query supplier. The queries are read from a Java properties file whose path is
 * given by the {@link PrometheusMetricSampler#PROMETHEUS_QUERY_FILE_CONFIG} config; each entry of that file
 * has the form {@code <RAW_METRIC_TYPE>=<query>}.
 * <p>
 * See {@link PrometheusQuerySupplier}
 */
public class ConfigurablePrometheusQuerySupplier implements CruiseControlConfigurable, PrometheusQuerySupplier {
  // Kept per instance (not static) so that two suppliers configured from different files, or the same
  // supplier configured twice, never see each other's entries.
  private final Map<RawMetricType, String> _typeToQuery = new HashMap<>();

  /**
   * @return a copy of the queries loaded by the last successful {@link #configure(Map)} call, keyed by raw
   *         metric type; empty if the supplier has not been configured yet.
   */
  @Override
  public Map<RawMetricType, String> get() {
    // Hand out a copy so that callers cannot modify the supplier's internal state.
    return new HashMap<>(_typeToQuery);
  }

  /**
   * Load the queries from the properties file named by {@link PrometheusMetricSampler#PROMETHEUS_QUERY_FILE_CONFIG}.
   *
   * @param configs the configs; must contain the path of the query file.
   * @throws IllegalArgumentException if the file cannot be read or contains a key that is not a
   *                                  {@link RawMetricType} name.
   */
  @Override
  public void configure(Map<String, ?> configs) {
    String configFileName = validateNotNull((String) configs.get(PROMETHEUS_QUERY_FILE_CONFIG),
                                            "Prometheus configuration file is missing.");

    // Parse into a temporary map first: a file that fails half-way must not leave partial state behind.
    Map<RawMetricType, String> parsedQueries = internalParse(configFileName);
    _typeToQuery.clear();
    _typeToQuery.putAll(parsedQueries);
  }

  /**
   * Parse the config file into a map of raw metric type to query.
   *
   * @param configFileName the name of the input config file
   * @return the queries defined in the file, keyed by raw metric type.
   * @throws IllegalArgumentException if the file cannot be read or contains an unknown raw metric type.
   */
  private static Map<RawMetricType, String> internalParse(String configFileName) {
    Properties props = new Properties();
    try (InputStream propStream = new FileInputStream(configFileName)) {
      props.load(propStream);
    } catch (IOException e) {
      throw new IllegalArgumentException("Unable to read Prometheus query file " + configFileName, e);
    }

    // load each entry of the properties into the result map
    Map<RawMetricType, String> typeToQuery = new HashMap<>();
    for (String rawMetricTypeName : props.stringPropertyNames()) {
      typeToQuery.put(toRawMetricType(rawMetricTypeName, configFileName), props.getProperty(rawMetricTypeName));
    }
    return typeToQuery;
  }

  /**
   * Resolve a property key to its {@link RawMetricType}.
   *
   * @param rawMetricTypeName the property key.
   * @param configFileName the file the key was read from, used only for the error message.
   * @return the matching raw metric type.
   * @throws IllegalArgumentException if the name is unknown in the {@link RawMetricType} enum.
   */
  private static RawMetricType toRawMetricType(String rawMetricTypeName, String configFileName) {
    try {
      return RawMetricType.valueOf(rawMetricTypeName);
    } catch (IllegalArgumentException e) {
      // Same exception type as before, but tell the operator which key of which file is at fault.
      throw new IllegalArgumentException(String.format("Unknown raw metric type '%s' in Prometheus query file %s",
                                                       rawMetricTypeName, configFileName), e);
    }
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,10 @@ public class PrometheusMetricSampler extends AbstractMetricSampler {

// Config name visible to tests
static final String PROMETHEUS_QUERY_SUPPLIER_CONFIG = "prometheus.query.supplier";
static final String PROMETHEUS_QUERY_FILE_CONFIG = "prometheus.query.file";

private static final Class<?> DEFAULT_PROMETHEUS_QUERY_SUPPLIER = DefaultPrometheusQuerySupplier.class;
private static final Class<?> PROMETHEUS_QUERY_FILE_SUPPLIER = ConfigurablePrometheusQuerySupplier.class;

private static final Logger LOG = LoggerFactory.getLogger(PrometheusMetricSampler.class);

Expand Down Expand Up @@ -120,15 +123,21 @@ private void configurePrometheusAdapter(Map<String, ?> configs) {
}

private void configureQueryMap(Map<String, ?> configs) {
String prometheusQueryFileName = (String) configs.get(PROMETHEUS_QUERY_FILE_CONFIG);
String prometheusQuerySupplierClassName = (String) configs.get(PROMETHEUS_QUERY_SUPPLIER_CONFIG);
Class<?> prometheusQuerySupplierClass = DEFAULT_PROMETHEUS_QUERY_SUPPLIER;
if (prometheusQuerySupplierClassName != null) {
prometheusQuerySupplierClass = (Class<?>) ConfigDef.parseType(PROMETHEUS_QUERY_SUPPLIER_CONFIG,
prometheusQuerySupplierClassName, CLASS);
if (!PrometheusQuerySupplier.class.isAssignableFrom(prometheusQuerySupplierClass)) {
throw new ConfigException(String.format(
"Invalid %s is provided to prometheus metric sampler, provided %s",
PROMETHEUS_QUERY_SUPPLIER_CONFIG, prometheusQuerySupplierClass));
// the prometheus query file, when configured, takes precedence over any custom or default query supplier class
if (null != prometheusQueryFileName) {
prometheusQuerySupplierClass = PROMETHEUS_QUERY_FILE_SUPPLIER;
} else {
if (prometheusQuerySupplierClassName != null) {
prometheusQuerySupplierClass = (Class<?>) ConfigDef.parseType(PROMETHEUS_QUERY_SUPPLIER_CONFIG,
prometheusQuerySupplierClassName, CLASS);
if (!PrometheusQuerySupplier.class.isAssignableFrom(prometheusQuerySupplierClass)) {
throw new ConfigException(String.format(
"Invalid %s is provided to prometheus metric sampler, provided %s",
PROMETHEUS_QUERY_SUPPLIER_CONFIG, prometheusQuerySupplierClass));
}
}
}
PrometheusQuerySupplier prometheusQuerySupplier = KafkaCruiseControlConfigUtils.getConfiguredInstance(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,44 @@ public void testGetSamplesPrometheusQuerySupplierInvalidClass() throws Exception
_prometheusMetricSampler.configure(config);
}

/**
 * Configures the sampler with a query file taken from the test resources and checks that sampling issues
 * the query expected by the mocked Prometheus adapter.
 */
@Test
public void testGetSamplesCustomPrometheusQueryFile() throws Exception {
  // URL#getFile() returns the percent-encoded path (e.g. "%20" for a space), which is not a valid file
  // system path when the checkout directory contains such characters; convert through a URI instead.
  String queryFile = java.nio.file.Paths.get(
      this.getClass().getClassLoader().getResource("prometheusQueriesTest.properties").toURI()).toString();

  Map<String, Object> config = new HashMap<>();
  config.put(PROMETHEUS_SERVER_ENDPOINT_CONFIG, "http://kafka-cluster-1.org:9090");
  addCapacityConfig(config);
  config.put(PROMETHEUS_QUERY_FILE_CONFIG, queryFile);
  _prometheusMetricSampler.configure(config);

  MetricSamplerOptions metricSamplerOptions = buildMetricSamplerOptions(TEST_TOPIC);
  // Swap in the mock adapter after configure() so that no real Prometheus server is contacted.
  _prometheusMetricSampler._prometheusAdapter = _prometheusAdapter;

  expect(_prometheusAdapter.queryMetric(eq(TestQuerySupplier.TEST_QUERY), anyLong(), anyLong()))
      .andReturn(buildBrokerResults());
  replay(_prometheusAdapter);

  _prometheusMetricSampler.getSamples(metricSamplerOptions);

  verify(_prometheusAdapter);
}

/**
 * Configuring the sampler with a query file path that is expected not to exist must fail with an
 * {@link IllegalArgumentException}.
 */
@Test(expected = IllegalArgumentException.class)
public void testGetSamplesCustomPrometheusQueryFileNotFoundFile() throws Exception {
  final String missingQueryFile = "/a/b/file.properties";

  Map<String, Object> samplerConfig = new HashMap<>();
  samplerConfig.put(PROMETHEUS_SERVER_ENDPOINT_CONFIG, "http://kafka-cluster-1.org:9090");
  addCapacityConfig(samplerConfig);
  samplerConfig.put(PROMETHEUS_QUERY_FILE_CONFIG, missingQueryFile);

  // Expected to throw while loading the query file.
  _prometheusMetricSampler.configure(samplerConfig);
}

/**
 * Configuring the sampler with a query file that contains a key which is not a raw metric type name must
 * fail with an {@link IllegalArgumentException}.
 */
@Test(expected = IllegalArgumentException.class)
public void testGetSamplesCustomPrometheusQueryFileUnknownMetricType() throws Exception {
  // URL#getFile() returns the percent-encoded path (e.g. "%20" for a space), which is not a valid file
  // system path when the checkout directory contains such characters; convert through a URI instead.
  String queryFile = java.nio.file.Paths.get(
      this.getClass().getClassLoader().getResource("prometheusQueriesTestFailing.properties").toURI()).toString();

  Map<String, Object> config = new HashMap<>();
  config.put(PROMETHEUS_SERVER_ENDPOINT_CONFIG, "http://kafka-cluster-1.org:9090");
  addCapacityConfig(config);
  config.put(PROMETHEUS_QUERY_FILE_CONFIG, queryFile);
  _prometheusMetricSampler.configure(config);
}

private static MetricSamplerOptions buildMetricSamplerOptions(String topic) {

return new MetricSamplerOptions(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
ALL_TOPIC_BYTES_IN=test_query
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
ALL_TOPIC_BYTES_IN=test_query
Unknown="I don't know"
1 change: 1 addition & 0 deletions docs/wiki/User Guide/Configurations.md
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,7 @@ We are still trying to improve cruise control. And following are some configurat
| prometheus.server.endpoint | String | Y | | The HTTP endpoint of the Prometheus server which is to be used as a source for sampling metrics. |
| prometheus.query.resolution.step.ms | Integer | N | 60,000 | The resolution of the Prometheus query made to the server. If this is set to 30 seconds for a 2 minutes query interval, the query returns with 4 values, which are then aggregated into the metric sample. |
| prometheus.query.supplier | Class | N | com.linkedin.kafka.cruisecontrol.monitor.sampling.prometheus.DefaultPrometheusQuerySupplier | The class that supplies the Prometheus queries corresponding to Kafka raw metrics. If there are no customizations done when configuring Prometheus node exporter, the default class should work fine. |
| prometheus.query.file | String | N | | The configuration file supplying the custom Prometheus queries corresponding to Kafka raw metrics. Takes precedence over prometheus.query.supplier. |
| prometheus.broker.metrics.scraping.frequency.seconds | Integer | N | 60 | The scraping frequency with which Prometheus scrapes metrics from Kafka brokers. This value is used by DefaultPrometheusQuerySupplier to construct the iRate query that is used to get broker cpu metrics. |
### KafkaSampleStore configurations
| Name | Type | Required? | Default Value | Description |
Expand Down

0 comments on commit 7417b28

Please sign in to comment.