Skip to content

Commit

Permalink
Allow more control over Prometheus metrics collection
Browse files Browse the repository at this point in the history
  • Loading branch information
mateka committed Jan 13, 2025
1 parent b2b3b58 commit 6cba1e0
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 13 deletions.
12 changes: 12 additions & 0 deletions doc/configuration.rst
Original file line number Diff line number Diff line change
Expand Up @@ -1045,6 +1045,18 @@ metric_namespace
Optional prefix to add to the beginning of every metric sent to Datadog.
Default value is "luigi".

[prometheus]
---------

use_task_family
Should task family be used as a prometheus bucket label.
Default value is true.

task_parameters
List of task arguments' names used as additional prometheus bucket labels.
Passed in a form of a json list.


Per Task Retry-Policy
---------------------

Expand Down
44 changes: 31 additions & 13 deletions luigi/contrib/prometheus_metric.py
Original file line number Diff line number Diff line change
@@ -1,63 +1,81 @@
from prometheus_client import CollectorRegistry, Counter, Gauge, generate_latest, CONTENT_TYPE_LATEST
from luigi import parameter
from luigi.metrics import MetricsCollector
from luigi.task import Config

class prometheus(Config):
use_task_family = parameter.BoolParameter(default=True, parsing=parameter.BoolParameter.EXPLICIT_PARSING)
task_parameters = parameter.ListParameter(default=[])


class PrometheusMetricsCollector(MetricsCollector):

def _generate_task_labels(self, task):
return {
label: task.family if label == "family" else task.params.get(label)
for label in self.labels
}

def __init__(self):
super(PrometheusMetricsCollector, self).__init__()
self.registry = CollectorRegistry()
config = prometheus()
self.labels = list(config.task_parameters)
if config.use_task_family:
self.labels += ["family"]
if not self.labels:
raise ValueError("Prometheus labels cannot be empty (see prometheus configuration)")
self.task_started_counter = Counter(
'luigi_task_started_total',
'number of started luigi tasks',
['family'],
self.labels,
registry=self.registry
)
self.task_failed_counter = Counter(
'luigi_task_failed_total',
'number of failed luigi tasks',
['family'],
self.labels,
registry=self.registry
)
self.task_disabled_counter = Counter(
'luigi_task_disabled_total',
'number of disabled luigi tasks',
['family'],
self.labels,
registry=self.registry
)
self.task_done_counter = Counter(
'luigi_task_done_total',
'number of done luigi tasks',
['family'],
self.labels,
registry=self.registry
)
self.task_execution_time = Gauge(
'luigi_task_execution_time_seconds',
'luigi task execution time in seconds',
['family'],
self.labels,
registry=self.registry
)

def generate_latest(self):
return generate_latest(self.registry)

def handle_task_started(self, task):
self.task_started_counter.labels(family=task.family).inc()
self.task_execution_time.labels(family=task.family)
self.task_started_counter.labels(**self._generate_task_labels(task)).inc()
self.task_execution_time.labels(**self._generate_task_labels(task))

def handle_task_failed(self, task):
self.task_failed_counter.labels(family=task.family).inc()
self.task_execution_time.labels(family=task.family).set(task.updated - task.time_running)
self.task_failed_counter.labels(**self._generate_task_labels(task)).inc()
self.task_execution_time.labels(**self._generate_task_labels(task)).set(task.updated - task.time_running)

def handle_task_disabled(self, task, config):
self.task_disabled_counter.labels(family=task.family).inc()
self.task_execution_time.labels(family=task.family).set(task.updated - task.time_running)
self.task_disabled_counter.labels(**self._generate_task_labels(task)).inc()
self.task_execution_time.labels(**self._generate_task_labels(task)).set(task.updated - task.time_running)

def handle_task_done(self, task):
self.task_done_counter.labels(family=task.family).inc()
self.task_done_counter.labels(**self._generate_task_labels(task)).inc()
# time_running can be `None` if task was already complete
if task.time_running is not None:
self.task_execution_time.labels(family=task.family).set(task.updated - task.time_running)
self.task_execution_time.labels(**self._generate_task_labels(task)).set(task.updated - task.time_running)

def configure_http_handler(self, http_handler):
http_handler.set_header('Content-Type', CONTENT_TYPE_LATEST)

0 comments on commit 6cba1e0

Please sign in to comment.