diff --git a/charms/kfp-api/lib/charms/loki_k8s/v1/loki_push_api.py b/charms/kfp-api/lib/charms/loki_k8s/v1/loki_push_api.py
new file mode 100644
index 00000000..c3c1d086
--- /dev/null
+++ b/charms/kfp-api/lib/charms/loki_k8s/v1/loki_push_api.py
@@ -0,0 +1,2750 @@
+#!/usr/bin/env python3
+# Copyright 2023 Canonical Ltd.
+# See LICENSE file for licensing details.
+# Learn more at: https://juju.is/docs/sdk
+r"""## Overview.
+This document explains how to use the two principal objects this library provides:
+- `LokiPushApiProvider`: This object is meant to be used by any Charmed Operator that needs to
+implement the provider side of the `loki_push_api` relation interface. For instance, a Loki charm.
+The provider side of the relation represents the server side, to which logs are being pushed.
+- `LokiPushApiConsumer`: This object is meant to be used by any Charmed Operator that needs to
+send log to Loki by implementing the consumer side of the `loki_push_api` relation interface.
+For instance, a Promtail or Grafana agent charm which needs to send logs to Loki.
+- `LogProxyConsumer`: DEPRECATED.
+This object can be used by any Charmed Operator which needs to send telemetry, such as logs, to
+Loki through a Log Proxy by implementing the consumer side of the `loki_push_api` relation
+In order to be able to control the labels on the logs pushed this object adds a Pebble layer
+that runs Promtail in the workload container, injecting Juju topology labels into the
+logs on the fly.
+This object is deprecated. Consider migrating to LogForwarder with the release of Juju 3.6 LTS.
+- `LogForwarder`: This object can be used by any Charmed Operator which needs to send the workload
+standard output (stdout) through Pebble's log forwarding mechanism, to Loki endpoints through the
+`loki_push_api` relation interface.
+In order to be able to control the labels on the logs pushed this object updates the pebble layer's
+"log-targets" section with Juju topology.
+Filtering logs in Loki is largely performed on the basis of labels. In the Juju ecosystem, Juju
+topology labels are used to uniquely identify the workload which generates telemetry like logs.
+## LokiPushApiProvider Library Usage
+This object may be used by any Charmed Operator which implements the `loki_push_api` interface.
+For instance, Loki or Grafana Agent.
+For this purpose a charm needs to instantiate the `LokiPushApiProvider` object with one mandatory
+and three optional arguments.
+- `charm`: A reference to the parent (Loki) charm.
+- `relation_name`: The name of the relation that the charm uses to interact
+ with its clients, which implement `LokiPushApiConsumer` `LogForwarder`, or `LogProxyConsumer`
+ (note that LogProxyConsumer is deprecated).
+ If provided, this relation name must match a provided relation in metadata.yaml with the
+ `loki_push_api` interface.
+ The default relation name is "logging" for `LokiPushApiConsumer` and `LogForwarder`, and
+ "log-proxy" for `LogProxyConsumer` (note that LogProxyConsumer is deprecated).
+ For example, a provider's `metadata.yaml` file may look as follows:
+ ```yaml
+ provides:
+ logging:
+ interface: loki_push_api
+ ```
+ Subsequently, a Loki charm may instantiate the `LokiPushApiProvider` in its constructor as
+ follows:
+ from charms.loki_k8s.v1.loki_push_api import LokiPushApiProvider
+ from loki_server import LokiServer
+ ...
+ class LokiOperatorCharm(CharmBase):
+ ...
+ def __init__(self, *args):
+ super().__init__(*args)
+ ...
+ external_url = urlparse(self._external_url)
+ self.loki_provider = LokiPushApiProvider(
+ self,
+ address=external_url.hostname or self.hostname,
+ port=external_url.port or 80,
+ scheme=external_url.scheme,
+ path=f"{external_url.path}/loki/api/v1/push",
+ )
+ ...
+ - `port`: Loki Push Api endpoint port. Default value: `3100`.
+ - `scheme`: Loki Push Api endpoint scheme (`HTTP` or `HTTPS`). Default value: `HTTP`
+ - `address`: Loki Push Api endpoint address. Default value: `localhost`
+ - `path`: Loki Push Api endpoint path. Default value: `loki/api/v1/push`
+The `LokiPushApiProvider` object has several responsibilities:
+1. Set the URL of the Loki Push API in the relation application data bag; the URL
+ must be unique to all instances (e.g. using a load balancer).
+2. Set the Promtail binary URL (`promtail_binary_zip_url`) so clients that use
+ `LogProxyConsumer` object could download and configure it.
+3. Process the metadata of the consumer application, provided via the
+ "metadata" field of the consumer data bag, which are used to annotate the
+ alert rules (see next point). An example for "metadata" is the following:
+ {'model': 'loki',
+ 'model_uuid': '0b7d1071-ded2-4bf5-80a3-10a81aeb1386',
+ 'application': 'promtail-k8s'
+ }
+4. Process alert rules set into the relation by the `LokiPushApiConsumer`
+ objects, e.g.:
+ '{
+ "groups": [{
+ "name": "loki_0b7d1071-ded2-4bf5-80a3-10a81aeb1386_promtail-k8s_alerts",
+ "rules": [{
+ "alert": "HighPercentageError",
+ "expr": "sum(rate({app=\\"foo\\", env=\\"production\\"} |= \\"error\\" [5m]))
+ by (job) \\n /\\nsum(rate({app=\\"foo\\", env=\\"production\\"}[5m]))
+ by (job)\\n > 0.05
+ \\n", "for": "10m",
+ "labels": {
+ "severity": "page",
+ "juju_model": "loki",
+ "juju_model_uuid": "0b7d1071-ded2-4bf5-80a3-10a81aeb1386",
+ "juju_application": "promtail-k8s"
+ },
+ "annotations": {
+ "summary": "High request latency"
+ }
+ }]
+ }]
+ }'
+Once these alert rules are sent over relation data, the `LokiPushApiProvider` object
+stores these files in the directory `/loki/rules` inside the Loki charm container. After
+storing alert rules files, the object will check alert rules by querying Loki API
+endpoint: [`loki/api/v1/rules`](https://grafana.com/docs/loki/latest/api/#list-rule-groups).
+If there are changes in the alert rules a `loki_push_api_alert_rules_changed` event will
+be emitted with details about the `RelationEvent` which triggered it.
+This events should be observed in the charm that uses `LokiPushApiProvider`:
+ def __init__(self, *args):
+ super().__init__(*args)
+ ...
+ self.loki_provider = LokiPushApiProvider(self)
+ self.framework.observe(
+ self.loki_provider.on.loki_push_api_alert_rules_changed,
+ self._loki_push_api_alert_rules_changed,
+ )
+## LokiPushApiConsumer Library Usage
+This Loki charm interacts with its clients using the Loki charm library. Charms
+seeking to send log to Loki, must do so using the `LokiPushApiConsumer` object from
+this charm library.
+> **NOTE**: `LokiPushApiConsumer` also depends on an additional charm library.
+> Ensure sure you `charmcraft fetch-lib charms.observability_libs.v0.juju_topology`
+> when using this library.
+For the simplest use cases, using the `LokiPushApiConsumer` object only requires
+instantiating it, typically in the constructor of your charm (the one which
+sends logs).
+from charms.loki_k8s.v1.loki_push_api import LokiPushApiConsumer
+class LokiClientCharm(CharmBase):
+ def __init__(self, *args):
+ super().__init__(*args)
+ ...
+ self._loki_consumer = LokiPushApiConsumer(self)
+The `LokiPushApiConsumer` constructor requires two things:
+- A reference to the parent (LokiClientCharm) charm.
+- Optionally, the name of the relation that the Loki charm uses to interact
+ with its clients. If provided, this relation name must match a required
+ relation in metadata.yaml with the `loki_push_api` interface.
+ This argument is not required if your metadata.yaml has precisely one
+ required relation in metadata.yaml with the `loki_push_api` interface, as the
+ lib will automatically resolve the relation name inspecting the using the
+ meta information of the charm
+Any time the relation between a Loki provider charm and a Loki consumer charm is
+established, a `LokiPushApiEndpointJoined` event is fired. In the consumer side
+is it possible to observe this event with:
+ self._loki_consumer.on.loki_push_api_endpoint_joined,
+ self._on_loki_push_api_endpoint_joined,
+Any time there are departures in relations between the consumer charm and Loki
+the consumer charm is informed, through a `LokiPushApiEndpointDeparted` event, for instance:
+ self._loki_consumer.on.loki_push_api_endpoint_departed,
+ self._on_loki_push_api_endpoint_departed,
+The consumer charm can then choose to update its configuration in both situations.
+Note that LokiPushApiConsumer does not add any labels automatically on its own. In
+order to better integrate with the Canonical Observability Stack, you may want to configure your
+software to add Juju topology labels. The
+[observability-libs](https://charmhub.io/observability-libs) library can be used to get topology
+labels in charm code. See :func:`LogProxyConsumer._scrape_configs` for an example of how
+to do this with promtail.
+## LogProxyConsumer Library Usage
+> Note: This object is deprecated. Consider migrating to LogForwarder with the release of Juju 3.6
+> LTS.
+Let's say that we have a workload charm that produces logs, and we need to send those logs to a
+workload implementing the `loki_push_api` interface, such as `Loki` or `Grafana Agent`.
+Adopting this object in a Charmed Operator consist of two steps:
+1. Use the `LogProxyConsumer` class by instantiating it in the `__init__` method of the charmed
+ operator. There are two ways to get logs in to promtail. You can give it a list of files to
+ read, or you can write to it using the syslog protocol.
+ For example:
+ ```python
+ from charms.loki_k8s.v1.loki_push_api import LogProxyConsumer
+ ...
+ def __init__(self, *args):
+ ...
+ self._log_proxy = LogProxyConsumer(
+ self,
+ logs_scheme={
+ "workload-a": {
+ "log-files": ["/tmp/worload-a-1.log", "/tmp/worload-a-2.log"],
+ "syslog-port": 1514,
+ },
+ "workload-b": {"log-files": ["/tmp/worload-b.log"], "syslog-port": 1515},
+ },
+ relation_name="log-proxy",
+ )
+ self.framework.observe(
+ self._log_proxy.on.promtail_digest_error,
+ self._promtail_error,
+ )
+ def _promtail_error(self, event):
+ logger.error(event.message)
+ self.unit.status = BlockedStatus(event.message)
+ ```
+ Any time the relation between a provider charm and a LogProxy consumer charm is
+ established, a `LogProxyEndpointJoined` event is fired. In the consumer side is it
+ possible to observe this event with:
+ ```python
+ self.framework.observe(
+ self._log_proxy.on.log_proxy_endpoint_joined,
+ self._on_log_proxy_endpoint_joined,
+ )
+ ```
+ Any time there are departures in relations between the consumer charm and the provider
+ the consumer charm is informed, through a `LogProxyEndpointDeparted` event, for instance:
+ ```python
+ self.framework.observe(
+ self._log_proxy.on.log_proxy_endpoint_departed,
+ self._on_log_proxy_endpoint_departed,
+ )
+ ```
+ The consumer charm can then choose to update its configuration in both situations.
+ Note that:
+ - You can configure your syslog software using `localhost` as the address and the method
+ `LogProxyConsumer.syslog_port("container_name")` to get the port, or, alternatively, if you are using rsyslog
+ you may use the method `LogProxyConsumer.rsyslog_config("container_name")`.
+2. Modify the `metadata.yaml` file to add:
+ - The `log-proxy` relation in the `requires` section:
+ ```yaml
+ requires:
+ log-proxy:
+ interface: loki_push_api
+ optional: true
+ ```
+Once the library is implemented in a Charmed Operator and a relation is established with
+the charm that implements the `loki_push_api` interface, the library will inject a
+Pebble layer that runs Promtail in the workload container to send logs.
+By default, the promtail binary injected into the container will be downloaded from the internet.
+If, for any reason, the container has limited network access, you may allow charm administrators
+to provide their own promtail binary at runtime by adding the following snippet to your charm
+ promtail-bin:
+ type: file
+ description: Promtail binary for logging
+ filename: promtail-linux
+Which would then allow operators to deploy the charm this way:
+juju deploy \
+ ./your_charm.charm \
+ --resource promtail-bin=/tmp/promtail-linux-amd64
+If a different resource name is used, it can be specified with the `promtail_resource_name`
+argument to the `LogProxyConsumer` constructor.
+The object can emit a `PromtailDigestError` event:
+- Promtail binary cannot be downloaded.
+- The sha256 sum mismatch for promtail binary.
+The object can raise a `ContainerNotFoundError` event:
+- No `container_name` parameter has been specified and the Pod has more than 1 container.
+These can be monitored via the PromtailDigestError events via:
+ self.framework.observe(
+ self._loki_consumer.on.promtail_digest_error,
+ self._promtail_error,
+ )
+ def _promtail_error(self, event):
+ logger.error(msg)
+ self.unit.status = BlockedStatus(event.message)
+ )
+## LogForwarder class Usage
+Let's say that we have a charm's workload that writes logs to the standard output (stdout),
+and we need to send those logs to a workload implementing the `loki_push_api` interface,
+such as `Loki` or `Grafana Agent`. To know how to reach a Loki instance, a charm would
+typically use the `loki_push_api` interface.
+Use the `LogForwarder` class by instantiating it in the `__init__` method of the charm:
+from charms.loki_k8s.v1.loki_push_api import LogForwarder
+ def __init__(self, *args):
+ ...
+ self._log_forwarder = LogForwarder(
+ self,
+ relation_name="logging" # optional, defaults to `logging`
+ )
+The `LogForwarder` by default will observe relation events on the `logging` endpoint and
+enable/disable log forwarding automatically.
+Next, modify the `metadata.yaml` file to add:
+The `log-forwarding` relation in the `requires` section:
+ logging:
+ interface: loki_push_api
+ optional: true
+Once the LogForwader class is implemented in your charm and the relation (implementing the
+`loki_push_api` interface) is active and healthy, the library will inject a Pebble layer in
+each workload container the charm has access to, to configure Pebble's log forwarding
+feature and start sending logs to Loki.
+## Alerting Rules
+This charm library also supports gathering alerting rules from all related Loki client
+charms and enabling corresponding alerts within the Loki charm. Alert rules are
+automatically gathered by `LokiPushApiConsumer` object from a directory conventionally
+named `loki_alert_rules`.
+This directory must reside at the top level in the `src` folder of the
+consumer charm. Each file in this directory is assumed to be a single alert rule
+in YAML format. The file name must have the `.rule` extension.
+The format of this alert rule conforms to the
+[Loki docs](https://grafana.com/docs/loki/latest/rules/#alerting-rules).
+An example of the contents of one such file is shown below.
+alert: HighPercentageError
+expr: |
+ sum(rate({%%juju_topology%%} |= "error" [5m])) by (job)
+ /
+ sum(rate({%%juju_topology%%}[5m])) by (job)
+ > 0.05
+for: 10m
+ severity: page
+ summary: High request latency
+It is **critical** to use the `%%juju_topology%%` filter in the expression for the alert
+rule shown above. This filter is a stub that is automatically replaced by the
+`LokiPushApiConsumer` following Loki Client's Juju topology (application, model and its
+UUID). Such a topology filter is essential to ensure that alert rules submitted by one
+provider charm generates alerts only for that same charm.
+The Loki charm may be related to multiple Loki client charms. Without this, filter
+rules submitted by one provider charm will also result in corresponding alerts for other
+provider charms. Hence, every alert rule expression must include such a topology filter stub.
+Gathering alert rules and generating rule files within the Loki charm is easily done using
+the `alerts()` method of `LokiPushApiProvider`. Alerts generated by Loki will automatically
+include Juju topology labels in the alerts. These labels indicate the source of the alert.
+The following labels are automatically added to every alert
+- `juju_model`
+- `juju_model_uuid`
+- `juju_application`
+Whether alert rules files does not contain the keys `alert` or `expr` or there is no alert
+rules file in `alert_rules_path` a `loki_push_api_alert_rules_error` event is emitted.
+To handle these situations the event must be observed in the `LokiClientCharm` charm.py file:
+class LokiClientCharm(CharmBase):
+ def __init__(self, *args):
+ super().__init__(*args)
+ ...
+ self._loki_consumer = LokiPushApiConsumer(self)
+ self.framework.observe(
+ self._loki_consumer.on.loki_push_api_alert_rules_error,
+ self._alert_rules_error
+ )
+ def _alert_rules_error(self, event):
+ self.unit.status = BlockedStatus(event.message)
+## Relation Data
+The Loki charm uses both application and unit relation data to obtain information regarding
+Loki Push API and alert rules.
+Units of consumer charm send their alert rules over app relation data using the `alert_rules`
+import json
+import logging
+import os
+import platform
+import re
+import socket
+import subprocess
+import tempfile
+import typing
+from copy import deepcopy
+from gzip import GzipFile
+from hashlib import sha256
+from io import BytesIO
+from pathlib import Path
+from typing import Any, Dict, List, Optional, Tuple, Union
+from urllib import request
+from urllib.error import URLError
+import yaml
+from cosl import JujuTopology
+from ops.charm import (
+ CharmBase,
+ HookEvent,
+ PebbleReadyEvent,
+ RelationBrokenEvent,
+ RelationCreatedEvent,
+ RelationDepartedEvent,
+ RelationEvent,
+ RelationJoinedEvent,
+ RelationRole,
+ WorkloadEvent,
+from ops.framework import EventBase, EventSource, Object, ObjectEvents
+from ops.jujuversion import JujuVersion
+from ops.model import Container, ModelError, Relation
+from ops.pebble import APIError, ChangeError, Layer, PathError, ProtocolError
+# The unique Charmhub library identifier, never change it
+LIBID = "bf76f23cdd03464b877c52bd1d2f563e"
+# Increment this major API version when introducing breaking changes
+# Increment this PATCH version before using `charmcraft publish-lib` or reset
+# to 0 if you are raising the major API version
+PYDEPS = ["cosl"]
+logger = logging.getLogger(__name__)
+RELATION_INTERFACE_NAME = "loki_push_api"
+DEFAULT_ALERT_RULES_RELATIVE_PATH = "./src/loki_alert_rules"
+PROMTAIL_BASE_URL = "https://github.com/canonical/loki-k8s-operator/releases/download"
+# To update Promtail version you only need to change the PROMTAIL_VERSION and
+# update all sha256 sums in PROMTAIL_BINARIES. To support a new architecture
+# you only need to add a new key value pair for the architecture in PROMTAIL_BINARIES.
+ "filename": "promtail-static-arm64",
+ "zipsha": "c083fdb45e5c794103f974eeb426489b4142438d9e10d0ae272b2aff886e249b",
+ "binsha": "4cd055c477a301c0bdfdbcea514e6e93f6df5d57425ce10ffc77f3e16fec1ddf",
+ "amd64": {
+ "filename": "promtail-static-amd64",
+ "zipsha": "6873cbdabf23062aeefed6de5f00ff382710332af3ab90a48c253ea17e08f465",
+ "binsha": "28da9b99f81296fe297831f3bc9d92aea43b4a92826b8ff04ba433b8cb92fb50",
+ },
+# Paths in `charm` container
+BINARY_DIR = "/tmp"
+# Paths in `workload` container
+WORKLOAD_BINARY_DIR = "/opt/promtail"
+WORKLOAD_CONFIG_DIR = "/etc/promtail"
+WORKLOAD_CONFIG_FILE_NAME = "promtail_config.yaml"
+# These are the initial port values. As we can have more than one container,
+# we use odd and even numbers to avoid collisions.
+# Each new container adds 2 to the previous value.
+HTTP_LISTEN_PORT_START = 9080 # even start port
+GRPC_LISTEN_PORT_START = 9095 # odd start port
+class RelationNotFoundError(ValueError):
+ """Raised if there is no relation with the given name."""
+ def __init__(self, relation_name: str):
+ self.relation_name = relation_name
+ self.message = "No relation named '{}' found".format(relation_name)
+ super().__init__(self.message)
+class RelationInterfaceMismatchError(Exception):
+ """Raised if the relation with the given name has a different interface."""
+ def __init__(
+ self,
+ relation_name: str,
+ expected_relation_interface: str,
+ actual_relation_interface: str,
+ ):
+ self.relation_name = relation_name
+ self.expected_relation_interface = expected_relation_interface
+ self.actual_relation_interface = actual_relation_interface
+ self.message = (
+ "The '{}' relation has '{}' as interface rather than the expected '{}'".format(
+ relation_name, actual_relation_interface, expected_relation_interface
+ )
+ )
+ super().__init__(self.message)
+class RelationRoleMismatchError(Exception):
+ """Raised if the relation with the given name has a different direction."""
+ def __init__(
+ self,
+ relation_name: str,
+ expected_relation_role: RelationRole,
+ actual_relation_role: RelationRole,
+ ):
+ self.relation_name = relation_name
+ self.expected_relation_interface = expected_relation_role
+ self.actual_relation_role = actual_relation_role
+ self.message = "The '{}' relation has role '{}' rather than the expected '{}'".format(
+ relation_name, repr(actual_relation_role), repr(expected_relation_role)
+ )
+ super().__init__(self.message)
+def _validate_relation_by_interface_and_direction(
+ charm: CharmBase,
+ relation_name: str,
+ expected_relation_interface: str,
+ expected_relation_role: RelationRole,
+ """Verifies that a relation has the necessary characteristics.
+ Verifies that the `relation_name` provided: (1) exists in metadata.yaml,
+ (2) declares as interface the interface name passed as `relation_interface`
+ and (3) has the right "direction", i.e., it is a relation that `charm`
+ provides or requires.
+ Args:
+ charm: a `CharmBase` object to scan for the matching relation.
+ relation_name: the name of the relation to be verified.
+ expected_relation_interface: the interface name to be matched by the
+ relation named `relation_name`.
+ expected_relation_role: whether the `relation_name` must be either
+ provided or required by `charm`.
+ Raises:
+ RelationNotFoundError: If there is no relation in the charm's metadata.yaml
+ with the same name as provided via `relation_name` argument.
+ RelationInterfaceMismatchError: The relation with the same name as provided
+ via `relation_name` argument does not have the same relation interface
+ as specified via the `expected_relation_interface` argument.
+ RelationRoleMismatchError: If the relation with the same name as provided
+ via `relation_name` argument does not have the same role as specified
+ via the `expected_relation_role` argument.
+ """
+ if relation_name not in charm.meta.relations:
+ raise RelationNotFoundError(relation_name)
+ relation = charm.meta.relations[relation_name]
+ actual_relation_interface = relation.interface_name
+ if actual_relation_interface != expected_relation_interface:
+ raise RelationInterfaceMismatchError(
+ relation_name,
+ expected_relation_interface,
+ actual_relation_interface, # pyright: ignore
+ )
+ if expected_relation_role == RelationRole.provides:
+ if relation_name not in charm.meta.provides:
+ raise RelationRoleMismatchError(
+ relation_name, RelationRole.provides, RelationRole.requires
+ )
+ elif expected_relation_role == RelationRole.requires:
+ if relation_name not in charm.meta.requires:
+ raise RelationRoleMismatchError(
+ relation_name, RelationRole.requires, RelationRole.provides
+ )
+ else:
+ raise Exception("Unexpected RelationDirection: {}".format(expected_relation_role))
+class InvalidAlertRulePathError(Exception):
+ """Raised if the alert rules folder cannot be found or is otherwise invalid."""
+ def __init__(
+ self,
+ alert_rules_absolute_path: Path,
+ message: str,
+ ):
+ self.alert_rules_absolute_path = alert_rules_absolute_path
+ self.message = message
+ super().__init__(self.message)
+def _is_official_alert_rule_format(rules_dict: dict) -> bool:
+ """Are alert rules in the upstream format as supported by Loki.
+ Alert rules in dictionary format are in "official" form if they
+ contain a "groups" key, since this implies they contain a list of
+ alert rule groups.
+ Args:
+ rules_dict: a set of alert rules in Python dictionary format
+ Returns:
+ True if alert rules are in official Loki file format.
+ """
+ return "groups" in rules_dict
+def _is_single_alert_rule_format(rules_dict: dict) -> bool:
+ """Are alert rules in single rule format.
+ The Loki charm library supports reading of alert rules in a
+ custom format that consists of a single alert rule per file. This
+ does not conform to the official Loki alert rule file format
+ which requires that each alert rules file consists of a list of
+ alert rule groups and each group consists of a list of alert
+ rules.
+ Alert rules in dictionary form are considered to be in single rule
+ format if in the least it contains two keys corresponding to the
+ alert rule name and alert expression.
+ Returns:
+ True if alert rule is in single rule file format.
+ """
+ # one alert rule per file
+ return set(rules_dict) >= {"alert", "expr"}
+class AlertRules:
+ """Utility class for amalgamating Loki alert rule files and injecting juju topology.
+ An `AlertRules` object supports aggregating alert rules from files and directories in both
+ official and single rule file formats using the `add_path()` method. All the alert rules
+ read are annotated with Juju topology labels and amalgamated into a single data structure
+ in the form of a Python dictionary using the `as_dict()` method. Such a dictionary can be
+ easily dumped into JSON format and exchanged over relation data. The dictionary can also
+ be dumped into YAML format and written directly into an alert rules file that is read by
+ Loki. Note that multiple `AlertRules` objects must not be written into the same file,
+ since Loki allows only a single list of alert rule groups per alert rules file.
+ The official Loki format is a YAML file conforming to the Loki documentation
+ (https://grafana.com/docs/loki/latest/api/#list-rule-groups).
+ The custom single rule format is a subsection of the official YAML, having a single alert
+ rule, effectively "one alert per file".
+ """
+ # This class uses the following terminology for the various parts of a rule file:
+ # - alert rules file: the entire groups[] yaml, including the "groups:" key.
+ # - alert groups (plural): the list of groups[] (a list, i.e. no "groups:" key) - it is a list
+ # of dictionaries that have the "name" and "rules" keys.
+ # - alert group (singular): a single dictionary that has the "name" and "rules" keys.
+ # - alert rules (plural): all the alerts in a given alert group - a list of dictionaries with
+ # the "alert" and "expr" keys.
+ # - alert rule (singular): a single dictionary that has the "alert" and "expr" keys.
+ def __init__(self, topology: Optional[JujuTopology] = None):
+ """Build and alert rule object.
+ Args:
+ topology: a `JujuTopology` instance that is used to annotate all alert rules.
+ """
+ self.topology = topology
+ self.tool = CosTool(None)
+ self.alert_groups = [] # type: List[dict]
+ def _from_file(self, root_path: Path, file_path: Path) -> List[dict]:
+ """Read a rules file from path, injecting juju topology.
+ Args:
+ root_path: full path to the root rules folder (used only for generating group name)
+ file_path: full path to a *.rule file.
+ Returns:
+ A list of dictionaries representing the rules file, if file is valid (the structure is
+ formed by `yaml.safe_load` of the file); an empty list otherwise.
+ """
+ with file_path.open() as rf:
+ # Load a list of rules from file then add labels and filters
+ try:
+ rule_file = yaml.safe_load(rf) or {}
+ except Exception as e:
+ logger.error("Failed to read alert rules from %s: %s", file_path.name, e)
+ return []
+ if _is_official_alert_rule_format(rule_file):
+ alert_groups = rule_file["groups"]
+ elif _is_single_alert_rule_format(rule_file):
+ # convert to list of alert groups
+ # group name is made up from the file name
+ alert_groups = [{"name": file_path.stem, "rules": [rule_file]}]
+ else:
+ # invalid/unsupported
+ reason = "file is empty" if not rule_file else "unexpected file structure"
+ logger.error("Invalid rules file (%s): %s", reason, file_path.name)
+ return []
+ # update rules with additional metadata
+ for alert_group in alert_groups:
+ # update group name with topology and sub-path
+ alert_group["name"] = self._group_name(
+ str(root_path),
+ str(file_path),
+ alert_group["name"],
+ )
+ # add "juju_" topology labels
+ for alert_rule in alert_group["rules"]:
+ if "labels" not in alert_rule:
+ alert_rule["labels"] = {}
+ if self.topology:
+ # only insert labels that do not already exist
+ for label, val in self.topology.label_matcher_dict.items():
+ if label not in alert_rule["labels"]:
+ alert_rule["labels"][label] = val
+ # insert juju topology filters into a prometheus alert rule
+ # logql doesn't like empty matchers, so add a job matcher which hits
+ # any string as a "wildcard" which the topology labels will
+ # filter down
+ alert_rule["expr"] = self.tool.inject_label_matchers(
+ re.sub(r"%%juju_topology%%", r'job=~".+"', alert_rule["expr"]),
+ self.topology.label_matcher_dict,
+ )
+ return alert_groups
+ def _group_name(
+ self,
+ root_path: typing.Union[Path, str],
+ file_path: typing.Union[Path, str],
+ group_name: str,
+ ) -> str:
+ """Generate group name from path and topology.
+ The group name is made up of the relative path between the root dir_path, the file path,
+ and topology identifier.
+ Args:
+ root_path: path to the root rules dir.
+ file_path: path to rule file.
+ group_name: original group name to keep as part of the new augmented group name
+ Returns:
+ New group name, augmented by juju topology and relative path.
+ """
+ file_path = Path(file_path) if not isinstance(file_path, Path) else file_path
+ root_path = Path(root_path) if not isinstance(root_path, Path) else root_path
+ rel_path = file_path.parent.relative_to(root_path.as_posix())
+ # We should account for both absolute paths and Windows paths. Convert it to a POSIX
+ # string, strip off any leading /, then join it
+ path_str = ""
+ if not rel_path == Path("."):
+ # Get rid of leading / and optionally drive letters so they don't muck up
+ # the template later, since Path.parts returns them. The 'if relpath.is_absolute ...'
+ # isn't even needed since re.sub doesn't throw exceptions if it doesn't match, so it's
+ # optional, but it makes it clear what we're doing.
+ # Note that Path doesn't actually care whether the path is valid just to instantiate
+ # the object, so we can happily strip that stuff out to make templating nicer
+ rel_path = Path(
+ re.sub(r"^([A-Za-z]+:)?/", "", rel_path.as_posix())
+ if rel_path.is_absolute()
+ else str(rel_path)
+ )
+ # Get rid of relative path characters in the middle which both os.path and pathlib
+ # leave hanging around. We could use path.resolve(), but that would lead to very
+ # long template strings when rules come from pods and/or other deeply nested charm
+ # paths
+ path_str = "_".join(filter(lambda x: x not in ["..", "/"], rel_path.parts))
+ # Generate group name:
+ # - name, from juju topology
+ # - suffix, from the relative path of the rule file;
+ group_name_parts = [self.topology.identifier] if self.topology else []
+ group_name_parts.extend([path_str, group_name, "alerts"])
+ # filter to remove empty strings
+ return "_".join(filter(lambda x: x, group_name_parts))
+ @classmethod
+ def _multi_suffix_glob(
+ cls, dir_path: Path, suffixes: List[str], recursive: bool = True
+ ) -> list:
+ """Helper function for getting all files in a directory that have a matching suffix.
+ Args:
+ dir_path: path to the directory to glob from.
+ suffixes: list of suffixes to include in the glob (items should begin with a period).
+ recursive: a flag indicating whether a glob is recursive (nested) or not.
+ Returns:
+ List of files in `dir_path` that have one of the suffixes specified in `suffixes`.
+ """
+ all_files_in_dir = dir_path.glob("**/*" if recursive else "*")
+ return list(filter(lambda f: f.is_file() and f.suffix in suffixes, all_files_in_dir))
+ def _from_dir(self, dir_path: Path, recursive: bool) -> List[dict]:
+ """Read all rule files in a directory.
+ All rules from files for the same directory are loaded into a single
+ group. The generated name of this group includes juju topology.
+ By default, only the top directory is scanned; for nested scanning, pass `recursive=True`.
+ Args:
+ dir_path: directory containing *.rule files (alert rules without groups).
+ recursive: flag indicating whether to scan for rule files recursively.
+ Returns:
+ a list of dictionaries representing prometheus alert rule groups, each dictionary
+ representing an alert group (structure determined by `yaml.safe_load`).
+ """
+ alert_groups = [] # type: List[dict]
+ # Gather all alerts into a list of groups
+ for file_path in self._multi_suffix_glob(dir_path, [".rule", ".rules"], recursive):
+ alert_groups_from_file = self._from_file(dir_path, file_path)
+ if alert_groups_from_file:
+ logger.debug("Reading alert rule from %s", file_path)
+ alert_groups.extend(alert_groups_from_file)
+ return alert_groups
+ def add_path(self, path_str: str, *, recursive: bool = False):
+ """Add rules from a dir path.
+ All rules from files are aggregated into a data structure representing a single rule file.
+ All group names are augmented with juju topology.
+ Args:
+ path_str: either a rules file or a dir of rules files.
+ recursive: whether to read files recursively or not (no impact if `path` is a file).
+ Raises:
+ InvalidAlertRulePathError: if the provided path is invalid.
+ """
+ path = Path(path_str) # type: Path
+ if path.is_dir():
+ self.alert_groups.extend(self._from_dir(path, recursive))
+ elif path.is_file():
+ self.alert_groups.extend(self._from_file(path.parent, path))
+ else:
+ logger.debug("The alerts file does not exist: %s", path)
+ def as_dict(self) -> dict:
+ """Return standard alert rules file in dict representation.
+ Returns:
+ a dictionary containing a single list of alert rule groups.
+ The list of alert rule groups is provided as value of the
+ "groups" dictionary key.
+ """
+ return {"groups": self.alert_groups} if self.alert_groups else {}
+def _resolve_dir_against_charm_path(charm: CharmBase, *path_elements: str) -> str:
+ """Resolve the provided path items against the directory of the main file.
+ Look up the directory of the `main.py` file being executed. This is normally
+ going to be the charm.py file of the charm including this library. Then, resolve
+ the provided path elements and, if the result path exists and is a directory,
+ return its absolute path; otherwise, raise en exception.
+ Raises:
+ InvalidAlertRulePathError, if the path does not exist or is not a directory.
+ """
+ charm_dir = Path(str(charm.charm_dir))
+ if not charm_dir.exists() or not charm_dir.is_dir():
+ # Operator Framework does not currently expose a robust
+ # way to determine the top level charm source directory
+ # that is consistent across deployed charms and unit tests
+ # Hence for unit tests the current working directory is used
+ # TODO: updated this logic when the following ticket is resolved
+ # https://github.com/canonical/operator/issues/643
+ charm_dir = Path(os.getcwd())
+ alerts_dir_path = charm_dir.absolute().joinpath(*path_elements)
+ if not alerts_dir_path.exists():
+ raise InvalidAlertRulePathError(alerts_dir_path, "directory does not exist")
+ if not alerts_dir_path.is_dir():
+ raise InvalidAlertRulePathError(alerts_dir_path, "is not a directory")
+ return str(alerts_dir_path)
+class NoRelationWithInterfaceFoundError(Exception):
+ """No relations with the given interface are found in the charm meta."""
+ def __init__(self, charm: CharmBase, relation_interface: Optional[str] = None):
+ self.charm = charm
+ self.relation_interface = relation_interface
+ self.message = (
+ "No relations with interface '{}' found in the meta of the '{}' charm".format(
+ relation_interface, charm.meta.name
+ )
+ )
+ super().__init__(self.message)
+class MultipleRelationsWithInterfaceFoundError(Exception):
+ """Multiple relations with the given interface are found in the charm meta."""
+ def __init__(self, charm: CharmBase, relation_interface: str, relations: list):
+ self.charm = charm
+ self.relation_interface = relation_interface
+ self.relations = relations
+ self.message = (
+ "Multiple relations with interface '{}' found in the meta of the '{}' charm.".format(
+ relation_interface, charm.meta.name
+ )
+ )
+ super().__init__(self.message)
+class LokiPushApiEndpointDeparted(EventBase):
+ """Event emitted when Loki departed."""
+class LokiPushApiEndpointJoined(EventBase):
+ """Event emitted when Loki joined."""
+class LokiPushApiAlertRulesChanged(EventBase):
+ """Event emitted if there is a change in the alert rules."""
+ def __init__(self, handle, relation, relation_id, app=None, unit=None):
+ """Pretend we are almost like a RelationEvent.
+ Fields to serialize:
+ {
+ "relation_name": ,
+ "relation_id": ,
+ "app_name": ,
+ "unit_name":
+ }
+ In this way, we can transparently use `RelationEvent.snapshot()` to pass
+ it back if we need to log it.
+ """
+ super().__init__(handle)
+ self.relation = relation
+ self.relation_id = relation_id
+ self.app = app
+ self.unit = unit
+ def snapshot(self) -> Dict:
+ """Save event information."""
+ if not self.relation:
+ return {}
+ snapshot = {"relation_name": self.relation.name, "relation_id": self.relation.id}
+ if self.app:
+ snapshot["app_name"] = self.app.name
+ if self.unit:
+ snapshot["unit_name"] = self.unit.name
+ return snapshot
+ def restore(self, snapshot: dict):
+ """Restore event information."""
+ self.relation = self.framework.model.get_relation(
+ snapshot["relation_name"], snapshot["relation_id"]
+ )
+ app_name = snapshot.get("app_name")
+ if app_name:
+ self.app = self.framework.model.get_app(app_name)
+ else:
+ self.app = None
+ unit_name = snapshot.get("unit_name")
+ if unit_name:
+ self.unit = self.framework.model.get_unit(unit_name)
+ else:
+ self.unit = None
+class InvalidAlertRuleEvent(EventBase):
+ """Event emitted when alert rule files are not parsable.
+ Enables us to set a clear status on the provider.
+ """
+ def __init__(self, handle, errors: str = "", valid: bool = False):
+ super().__init__(handle)
+ self.errors = errors
+ self.valid = valid
+ def snapshot(self) -> Dict:
+ """Save alert rule information."""
+ return {
+ "valid": self.valid,
+ "errors": self.errors,
+ }
+ def restore(self, snapshot):
+ """Restore alert rule information."""
+ self.valid = snapshot["valid"]
+ self.errors = snapshot["errors"]
+class LokiPushApiEvents(ObjectEvents):
+ """Event descriptor for events raised by `LokiPushApiProvider`."""
+ loki_push_api_endpoint_departed = EventSource(LokiPushApiEndpointDeparted)
+ loki_push_api_endpoint_joined = EventSource(LokiPushApiEndpointJoined)
+ loki_push_api_alert_rules_changed = EventSource(LokiPushApiAlertRulesChanged)
+ alert_rule_status_changed = EventSource(InvalidAlertRuleEvent)
+class LokiPushApiProvider(Object):
+ """A LokiPushApiProvider class."""
+ on = LokiPushApiEvents() # pyright: ignore
+ def __init__(
+ self,
+ charm,
+ relation_name: str = DEFAULT_RELATION_NAME,
+ *,
+ port: Union[str, int] = 3100,
+ scheme: str = "http",
+ address: str = "localhost",
+ path: str = "loki/api/v1/push",
+ ):
+ """A Loki service provider.
+ Args:
+ charm: a `CharmBase` instance that manages this
+ instance of the Loki service.
+ relation_name: an optional string name of the relation between `charm`
+ and the Loki charmed service. The default is "logging".
+ It is strongly advised not to change the default, so that people
+ deploying your charm will have a consistent experience with all
+ other charms that consume metrics endpoints.
+ port: an optional port of the Loki service (default is "3100").
+ scheme: an optional scheme of the Loki API URL (default is "http").
+ address: an optional address of the Loki service (default is "localhost").
+ path: an optional path of the Loki API URL (default is "loki/api/v1/push")
+ Raises:
+ RelationNotFoundError: If there is no relation in the charm's metadata.yaml
+ with the same name as provided via `relation_name` argument.
+ RelationInterfaceMismatchError: The relation with the same name as provided
+ via `relation_name` argument does not have the `loki_push_api` relation
+ interface.
+ RelationRoleMismatchError: If the relation with the same name as provided
+ via `relation_name` argument does not have the `RelationRole.requires`
+ role.
+ """
+ _validate_relation_by_interface_and_direction(
+ charm, relation_name, RELATION_INTERFACE_NAME, RelationRole.provides
+ )
+ super().__init__(charm, relation_name)
+ self._charm = charm
+ self._relation_name = relation_name
+ self._tool = CosTool(self)
+ self.port = int(port)
+ self.scheme = scheme
+ self.address = address
+ self.path = path
+ events = self._charm.on[relation_name]
+ self.framework.observe(self._charm.on.upgrade_charm, self._on_lifecycle_event)
+ self.framework.observe(events.relation_joined, self._on_logging_relation_joined)
+ self.framework.observe(events.relation_changed, self._on_logging_relation_changed)
+ self.framework.observe(events.relation_departed, self._on_logging_relation_departed)
+ self.framework.observe(events.relation_broken, self._on_logging_relation_broken)
+ def _on_lifecycle_event(self, _):
+ # Upgrade event or other charm-level event
+ should_update = False
+ for relation in self._charm.model.relations[self._relation_name]:
+ # Don't accidentally flip a True result back.
+ should_update = should_update or self._process_logging_relation_changed(relation)
+ if should_update:
+ # We don't have a RelationEvent, so build it up by hand
+ first_rel = self._charm.model.relations[self._relation_name][0]
+ self.on.loki_push_api_alert_rules_changed.emit(
+ relation=first_rel,
+ relation_id=first_rel.id,
+ )
+ def _on_logging_relation_joined(self, event: RelationJoinedEvent):
+ """Set basic data on relation joins.
+ Set the promtail binary URL location, which will not change, and anything
+ else which may be required, but is static..
+ Args:
+ event: a `CharmEvent` in response to which the consumer
+ charm must set its relation data.
+ """
+ if self._charm.unit.is_leader():
+ event.relation.data[self._charm.app].update(self._promtail_binary_url)
+ logger.debug("Saved promtail binary url: %s", self._promtail_binary_url)
+ def _on_logging_relation_changed(self, event: HookEvent):
+ """Handle changes in related consumers.
+ Anytime there are changes in the relation between Loki
+ and its consumers charms.
+ Args:
+ event: a `CharmEvent` in response to which the consumer
+ charm must update its relation data.
+ """
+ should_update = self._process_logging_relation_changed(event.relation) # pyright: ignore
+ if should_update:
+ self.on.loki_push_api_alert_rules_changed.emit(
+ relation=event.relation, # pyright: ignore
+ relation_id=event.relation.id, # pyright: ignore
+ app=self._charm.app,
+ unit=self._charm.unit,
+ )
+ def _on_logging_relation_broken(self, event: RelationBrokenEvent):
+ """Removes alert rules files when consumer charms left the relation with Loki.
+ Args:
+ event: a `CharmEvent` in response to which the Loki
+ charm must update its relation data.
+ """
+ self.on.loki_push_api_alert_rules_changed.emit(
+ relation=event.relation,
+ relation_id=event.relation.id,
+ app=self._charm.app,
+ unit=self._charm.unit,
+ )
+ def _on_logging_relation_departed(self, event: RelationDepartedEvent):
+ """Removes alert rules files when consumer charms left the relation with Loki.
+ Args:
+ event: a `CharmEvent` in response to which the Loki
+ charm must update its relation data.
+ """
+ self.on.loki_push_api_alert_rules_changed.emit(
+ relation=event.relation,
+ relation_id=event.relation.id,
+ app=self._charm.app,
+ unit=self._charm.unit,
+ )
+ def _should_update_alert_rules(self, relation) -> bool:
+ """Determine whether alert rules should be regenerated.
+ If there are alert rules in the relation data bag, tell the charm
+ whether to regenerate them based on the boolean returned here.
+ """
+ if relation.data.get(relation.app).get("alert_rules", None) is not None:
+ return True
+ return False
+ def _process_logging_relation_changed(self, relation: Relation) -> bool:
+ """Handle changes in related consumers.
+ Anytime there are changes in relations between Loki
+ and its consumers charms, Loki set the `loki_push_api`
+ into the relation data. Set the endpoint building
+ appropriately, and if there are alert rules present in
+ the relation, let the caller know.
+ Besides Loki generates alert rules files based what
+ consumer charms forwards,
+ Args:
+ relation: the `Relation` instance to update.
+ Returns:
+ A boolean indicating whether an event should be emitted, so we
+ only emit one on lifecycle events
+ """
+ relation.data[self._charm.unit]["public_address"] = socket.getfqdn() or ""
+ self.update_endpoint(relation=relation)
+ return self._should_update_alert_rules(relation)
+ @property
+ def _promtail_binary_url(self) -> dict:
+ """URL from which Promtail binary can be downloaded."""
+ # construct promtail binary url paths from parts
+ promtail_binaries = {}
+ for arch, info in PROMTAIL_BINARIES.items():
+ info["url"] = "{}/promtail-{}/{}.gz".format(
+ )
+ promtail_binaries[arch] = info
+ return {"promtail_binary_zip_url": json.dumps(promtail_binaries)}
+ def update_endpoint(self, url: str = "", relation: Optional[Relation] = None) -> None:
+ """Triggers programmatically the update of endpoint in unit relation data.
+ This method should be used when the charm relying on this library needs
+ to update the relation data in response to something occurring outside
+ the `logging` relation lifecycle, e.g., in case of a
+ host address change because the charmed operator becomes connected to an
+ Ingress after the `logging` relation is established.
+ Args:
+ url: An optional url value to update relation data.
+ relation: An optional instance of `class:ops.model.Relation` to update.
+ """
+ # if no relation is specified update all of them
+ if not relation:
+ if not self._charm.model.relations.get(self._relation_name):
+ return
+ relations_list = self._charm.model.relations.get(self._relation_name)
+ else:
+ relations_list = [relation]
+ endpoint = self._endpoint(url or self._url)
+ for relation in relations_list:
+ relation.data[self._charm.unit].update({"endpoint": json.dumps(endpoint)})
+ logger.debug("Saved endpoint in unit relation data")
+ @property
+ def _url(self) -> str:
+ """Get local Loki Push API url.
+ Return url to loki, including port number, but without the endpoint subpath.
+ """
+ return "http://{}:{}".format(socket.getfqdn(), self.port)
+ def _endpoint(self, url) -> dict:
+ """Get Loki push API endpoint for a given url.
+ Args:
+ url: A loki unit URL.
+ Returns: str
+ """
+ endpoint = "/loki/api/v1/push"
+ return {"url": url.rstrip("/") + endpoint}
+ @property
+ def alerts(self) -> dict: # noqa: C901
+ """Fetch alerts for all relations.
+ A Loki alert rules file consists of a list of "groups". Each
+ group consists of a list of alerts (`rules`) that are sequentially
+ executed. This method returns all the alert rules provided by each
+ related metrics provider charm. These rules may be used to generate a
+ separate alert rules file for each relation since the returned list
+ of alert groups are indexed by relation ID. Also for each relation ID
+ associated scrape metadata such as Juju model, UUID and application
+ name are provided so a unique name may be generated for the rules
+ file. For each relation the structure of data returned is a dictionary
+ with four keys
+ - groups
+ - model
+ - model_uuid
+ - application
+ The value of the `groups` key is such that it may be used to generate
+ a Loki alert rules file directly using `yaml.dump` but the
+ `groups` key itself must be included as this is required by Loki,
+ for example as in `yaml.dump({"groups": alerts["groups"]})`.
+ Currently only accepts a list of rules and these
+ rules are all placed into a single group, even though Loki itself
+ allows for multiple groups within a single alert rules file.
+ Returns:
+ a dictionary of alert rule groups and associated scrape
+ metadata indexed by relation ID.
+ """
+ alerts = {} # type: Dict[str, dict] # mapping b/w juju identifiers and alert rule files
+ for relation in self._charm.model.relations[self._relation_name]:
+ if not relation.units or not relation.app:
+ continue
+ alert_rules = json.loads(relation.data[relation.app].get("alert_rules", "{}"))
+ if not alert_rules:
+ continue
+ alert_rules = self._inject_alert_expr_labels(alert_rules)
+ identifier, topology = self._get_identifier_by_alert_rules(alert_rules)
+ if not topology:
+ try:
+ metadata = json.loads(relation.data[relation.app]["metadata"])
+ identifier = JujuTopology.from_dict(metadata).identifier
+ alerts[identifier] = self._tool.apply_label_matchers(alert_rules) # type: ignore
+ except KeyError as e:
+ logger.debug(
+ "Relation %s has no 'metadata': %s",
+ relation.id,
+ e,
+ )
+ if not identifier:
+ logger.error(
+ "Alert rules were found but no usable group or identifier was present."
+ )
+ continue
+ _, errmsg = self._tool.validate_alert_rules(alert_rules)
+ if errmsg:
+ relation.data[self._charm.app]["event"] = json.dumps({"errors": errmsg})
+ continue
+ alerts[identifier] = alert_rules
+ return alerts
+ def _get_identifier_by_alert_rules(
+ self, rules: dict
+ ) -> Tuple[Union[str, None], Union[JujuTopology, None]]:
+ """Determine an appropriate dict key for alert rules.
+ The key is used as the filename when writing alerts to disk, so the structure
+ and uniqueness is important.
+ Args:
+ rules: a dict of alert rules
+ Returns:
+ A tuple containing an identifier, if found, and a JujuTopology, if it could
+ be constructed.
+ """
+ if "groups" not in rules:
+ logger.debug("No alert groups were found in relation data")
+ return None, None
+ # Construct an ID based on what's in the alert rules if they have labels
+ for group in rules["groups"]:
+ try:
+ labels = group["rules"][0]["labels"]
+ topology = JujuTopology(
+ # Don't try to safely get required constructor fields. There's already
+ # a handler for KeyErrors
+ model_uuid=labels["juju_model_uuid"],
+ model=labels["juju_model"],
+ application=labels["juju_application"],
+ unit=labels.get("juju_unit", ""),
+ charm_name=labels.get("juju_charm", ""),
+ )
+ return topology.identifier, topology
+ except KeyError:
+ logger.debug("Alert rules were found but no usable labels were present")
+ continue
+ logger.warning(
+ "No labeled alert rules were found, and no 'scrape_metadata' "
+ "was available. Using the alert group name as filename."
+ )
+ try:
+ for group in rules["groups"]:
+ return group["name"], None
+ except KeyError:
+ logger.debug("No group name was found to use as identifier")
+ return None, None
+ def _inject_alert_expr_labels(self, rules: Dict[str, Any]) -> Dict[str, Any]:
+ """Iterate through alert rules and inject topology into expressions.
+ Args:
+ rules: a dict of alert rules
+ """
+ if "groups" not in rules:
+ return rules
+ modified_groups = []
+ for group in rules["groups"]:
+ # Copy off rules, so we don't modify an object we're iterating over
+ rules_copy = group["rules"]
+ for idx, rule in enumerate(rules_copy):
+ labels = rule.get("labels")
+ if labels:
+ try:
+ topology = JujuTopology(
+ # Don't try to safely get required constructor fields. There's already
+ # a handler for KeyErrors
+ model_uuid=labels["juju_model_uuid"],
+ model=labels["juju_model"],
+ application=labels["juju_application"],
+ unit=labels.get("juju_unit", ""),
+ charm_name=labels.get("juju_charm", ""),
+ )
+ # Inject topology and put it back in the list
+ rule["expr"] = self._tool.inject_label_matchers(
+ re.sub(r"%%juju_topology%%,?", "", rule["expr"]),
+ topology.label_matcher_dict,
+ )
+ except KeyError:
+ # Some required JujuTopology key is missing. Just move on.
+ pass
+ group["rules"][idx] = rule
+ modified_groups.append(group)
+ rules["groups"] = modified_groups
+ return rules
+class ConsumerBase(Object):
+ """Consumer's base class."""
+ def __init__(
+ self,
+ charm: CharmBase,
+ relation_name: str = DEFAULT_RELATION_NAME,
+ alert_rules_path: str = DEFAULT_ALERT_RULES_RELATIVE_PATH,
+ recursive: bool = False,
+ skip_alert_topology_labeling: bool = False,
+ ):
+ super().__init__(charm, relation_name)
+ self._charm = charm
+ self._relation_name = relation_name
+ self.topology = JujuTopology.from_charm(charm)
+ try:
+ alert_rules_path = _resolve_dir_against_charm_path(charm, alert_rules_path)
+ except InvalidAlertRulePathError as e:
+ logger.debug(
+ "Invalid Loki alert rules folder at %s: %s",
+ e.alert_rules_absolute_path,
+ e.message,
+ )
+ self._alert_rules_path = alert_rules_path
+ self._skip_alert_topology_labeling = skip_alert_topology_labeling
+ self._recursive = recursive
+ def _handle_alert_rules(self, relation):
+ if not self._charm.unit.is_leader():
+ return
+ alert_rules = (
+ AlertRules(None) if self._skip_alert_topology_labeling else AlertRules(self.topology)
+ )
+ alert_rules.add_path(self._alert_rules_path, recursive=self._recursive)
+ alert_rules_as_dict = alert_rules.as_dict()
+ relation.data[self._charm.app]["metadata"] = json.dumps(self.topology.as_dict())
+ relation.data[self._charm.app]["alert_rules"] = json.dumps(
+ alert_rules_as_dict,
+ sort_keys=True, # sort, to prevent unnecessary relation_changed events
+ )
+ @property
+ def loki_endpoints(self) -> List[dict]:
+ """Fetch Loki Push API endpoints sent from LokiPushApiProvider through relation data.
+ Returns:
+ A list of dictionaries with Loki Push API endpoints, for instance:
+ [
+ {"url": "http://loki1:3100/loki/api/v1/push"},
+ {"url": "http://loki2:3100/loki/api/v1/push"},
+ ]
+ """
+ endpoints = [] # type: list
+ for relation in self._charm.model.relations[self._relation_name]:
+ for unit in relation.units:
+ if unit.app == self._charm.app:
+ # This is a peer unit
+ continue
+ endpoint = relation.data[unit].get("endpoint")
+ if endpoint:
+ deserialized_endpoint = json.loads(endpoint)
+ endpoints.append(deserialized_endpoint)
+ return endpoints
+class LokiPushApiConsumer(ConsumerBase):
+ """Loki Consumer class."""
+ on = LokiPushApiEvents() # pyright: ignore
+ def __init__(
+ self,
+ charm: CharmBase,
+ relation_name: str = DEFAULT_RELATION_NAME,
+ alert_rules_path: str = DEFAULT_ALERT_RULES_RELATIVE_PATH,
+ recursive: bool = True,
+ skip_alert_topology_labeling: bool = False,
+ ):
+ """Construct a Loki charm client.
+ The `LokiPushApiConsumer` object provides configurations to a Loki client charm, such as
+ the Loki API endpoint to push logs. It is intended for workloads that can speak
+ loki_push_api (https://grafana.com/docs/loki/latest/api/#push-log-entries-to-loki), such
+ as grafana-agent.
+ (If you need to forward workload stdout logs, then use LogForwarder; if you need to forward
+ log files, then use LogProxyConsumer.)
+ `LokiPushApiConsumer` can be instantiated as follows:
+ self._loki_consumer = LokiPushApiConsumer(self)
+ Args:
+ charm: a `CharmBase` object that manages this `LokiPushApiConsumer` object.
+ Typically, this is `self` in the instantiating class.
+ relation_name: the string name of the relation interface to look up.
+ If `charm` has exactly one relation with this interface, the relation's
+ name is returned. If none or multiple relations with the provided interface
+ are found, this method will raise either a NoRelationWithInterfaceFoundError or
+ MultipleRelationsWithInterfaceFoundError exception, respectively.
+ alert_rules_path: a string indicating a path where alert rules can be found
+ recursive: Whether to scan for rule files recursively.
+ skip_alert_topology_labeling: whether to skip the alert topology labeling.
+ Raises:
+ RelationNotFoundError: If there is no relation in the charm's metadata.yaml
+ with the same name as provided via `relation_name` argument.
+ RelationInterfaceMismatchError: The relation with the same name as provided
+ via `relation_name` argument does not have the `loki_push_api` relation
+ interface.
+ RelationRoleMismatchError: If the relation with the same name as provided
+ via `relation_name` argument does not have the `RelationRole.provides`
+ role.
+ Emits:
+ loki_push_api_endpoint_joined: This event is emitted when the relation between the
+ Charmed Operator that instantiates `LokiPushApiProvider` (Loki charm for instance)
+ and the Charmed Operator that instantiates `LokiPushApiConsumer` is established.
+ loki_push_api_endpoint_departed: This event is emitted when the relation between the
+ Charmed Operator that implements `LokiPushApiProvider` (Loki charm for instance)
+ and the Charmed Operator that implements `LokiPushApiConsumer` is removed.
+ loki_push_api_alert_rules_error: This event is emitted when an invalid alert rules
+ file is encountered or if `alert_rules_path` is empty.
+ """
+ _validate_relation_by_interface_and_direction(
+ charm, relation_name, RELATION_INTERFACE_NAME, RelationRole.requires
+ )
+ super().__init__(
+ charm, relation_name, alert_rules_path, recursive, skip_alert_topology_labeling
+ )
+ events = self._charm.on[relation_name]
+ self.framework.observe(self._charm.on.upgrade_charm, self._on_lifecycle_event)
+ self.framework.observe(events.relation_joined, self._on_logging_relation_joined)
+ self.framework.observe(events.relation_changed, self._on_logging_relation_changed)
+ self.framework.observe(events.relation_departed, self._on_logging_relation_departed)
+ def _on_lifecycle_event(self, _: HookEvent):
+ """Update require relation data on charm upgrades and other lifecycle events.
+ Args:
+ event: a `CharmEvent` in response to which the consumer
+ charm must update its relation data.
+ """
+ # Upgrade event or other charm-level event
+ self._reinitialize_alert_rules()
+ self.on.loki_push_api_endpoint_joined.emit()
+ def _on_logging_relation_joined(self, event: RelationJoinedEvent):
+ """Handle changes in related consumers.
+ Update relation data and emit events when a relation is established.
+ Args:
+ event: a `CharmEvent` in response to which the consumer
+ charm must update its relation data.
+ Emits:
+ loki_push_api_endpoint_joined: Once the relation is established, this event is emitted.
+ loki_push_api_alert_rules_error: This event is emitted when an invalid alert rules
+ file is encountered or if `alert_rules_path` is empty.
+ """
+ # Alert rules will not change over the lifecycle of a charm, and do not need to be
+ # constantly set on every relation_changed event. Leave them here.
+ self._handle_alert_rules(event.relation)
+ self.on.loki_push_api_endpoint_joined.emit()
+ def _on_logging_relation_changed(self, event: RelationEvent):
+ """Handle changes in related consumers.
+ Anytime there are changes in the relation between Loki
+ and its consumers charms.
+ Args:
+ event: a `CharmEvent` in response to which the consumer
+ charm must update its relation data.
+ Emits:
+ loki_push_api_endpoint_joined: Once the relation is established, this event is emitted.
+ loki_push_api_alert_rules_error: This event is emitted when an invalid alert rules
+ file is encountered or if `alert_rules_path` is empty.
+ """
+ if self._charm.unit.is_leader():
+ ev = json.loads(event.relation.data[event.app].get("event", "{}"))
+ if ev:
+ valid = bool(ev.get("valid", True))
+ errors = ev.get("errors", "")
+ if valid and not errors:
+ self.on.alert_rule_status_changed.emit(valid=valid)
+ else:
+ self.on.alert_rule_status_changed.emit(valid=valid, errors=errors)
+ self.on.loki_push_api_endpoint_joined.emit()
+ def _reinitialize_alert_rules(self):
+ """Reloads alert rules and updates all relations."""
+ for relation in self._charm.model.relations[self._relation_name]:
+ self._handle_alert_rules(relation)
+ def _process_logging_relation_changed(self, relation: Relation):
+ self._handle_alert_rules(relation)
+ self.on.loki_push_api_endpoint_joined.emit()
+ def _on_logging_relation_departed(self, _: RelationEvent):
+ """Handle departures in related providers.
+ Anytime there are departures in relations between the consumer charm and Loki
+ the consumer charm is informed, through a `LokiPushApiEndpointDeparted` event.
+ The consumer charm can then choose to update its configuration.
+ """
+ # Provide default to avoid throwing, as in some complicated scenarios with
+ # upgrades and hook failures we might not have data in the storage
+ self.on.loki_push_api_endpoint_departed.emit()
+class ContainerNotFoundError(Exception):
+ """Raised if the specified container does not exist."""
+ def __init__(self):
+ msg = "The specified container does not exist."
+ self.message = msg
+ super().__init__(self.message)
+class PromtailDigestError(EventBase):
+ """Event emitted when there is an error with Promtail initialization."""
+ def __init__(self, handle, message):
+ super().__init__(handle)
+ self.message = message
+ def snapshot(self):
+ """Save message information."""
+ return {"message": self.message}
+ def restore(self, snapshot):
+ """Restore message information."""
+ self.message = snapshot["message"]
+class LogProxyEndpointDeparted(EventBase):
+ """Event emitted when a Log Proxy has departed."""
+class LogProxyEndpointJoined(EventBase):
+ """Event emitted when a Log Proxy joins."""
+class LogProxyEvents(ObjectEvents):
+ """Event descriptor for events raised by `LogProxyConsumer`."""
+ promtail_digest_error = EventSource(PromtailDigestError)
+ log_proxy_endpoint_departed = EventSource(LogProxyEndpointDeparted)
+ log_proxy_endpoint_joined = EventSource(LogProxyEndpointJoined)
+class LogProxyConsumer(ConsumerBase):
+ """LogProxyConsumer class.
+ > Note: This object is deprecated. Consider migrating to LogForwarder with the release of Juju
+ > 3.6 LTS.
+ The `LogProxyConsumer` object provides a method for attaching `promtail` to
+ a workload in order to generate structured logging data from applications
+ which traditionally log to syslog or do not have native Loki integration.
+ The `LogProxyConsumer` can be instantiated as follows:
+ self._log_proxy = LogProxyConsumer(
+ self,
+ logs_scheme={
+ "workload-a": {
+ "log-files": ["/tmp/worload-a-1.log", "/tmp/worload-a-2.log"],
+ "syslog-port": 1514,
+ },
+ "workload-b": {"log-files": ["/tmp/worload-b.log"], "syslog-port": 1515},
+ },
+ relation_name="log-proxy",
+ )
+ Args:
+ charm: a `CharmBase` object that manages this `LokiPushApiConsumer` object.
+ Typically, this is `self` in the instantiating class.
+ logs_scheme: a dict which maps containers and a list of log files and syslog port.
+ relation_name: the string name of the relation interface to look up.
+ If `charm` has exactly one relation with this interface, the relation's
+ name is returned. If none or multiple relations with the provided interface
+ are found, this method will raise either a NoRelationWithInterfaceFoundError or
+ MultipleRelationsWithInterfaceFoundError exception, respectively.
+ containers_syslog_port: a dict which maps (and enable) containers and syslog port.
+ alert_rules_path: an optional path for the location of alert rules
+ files. Defaults to "./src/loki_alert_rules",
+ resolved from the directory hosting the charm entry file.
+ The alert rules are automatically updated on charm upgrade.
+ recursive: Whether to scan for rule files recursively.
+ promtail_resource_name: An optional promtail resource name from metadata
+ if it has been modified and attached
+ insecure_skip_verify: skip SSL verification.
+ Raises:
+ RelationNotFoundError: If there is no relation in the charm's metadata.yaml
+ with the same name as provided via `relation_name` argument.
+ RelationInterfaceMismatchError: The relation with the same name as provided
+ via `relation_name` argument does not have the `loki_push_api` relation
+ interface.
+ RelationRoleMismatchError: If the relation with the same name as provided
+ via `relation_name` argument does not have the `RelationRole.provides`
+ role.
+ """
+ on = LogProxyEvents() # pyright: ignore
+ def __init__(
+ self,
+ charm,
+ *,
+ logs_scheme=None,
+ relation_name: str = DEFAULT_LOG_PROXY_RELATION_NAME,
+ alert_rules_path: str = DEFAULT_ALERT_RULES_RELATIVE_PATH,
+ recursive: bool = False,
+ promtail_resource_name: Optional[str] = None,
+ insecure_skip_verify: bool = False,
+ ):
+ super().__init__(charm, relation_name, alert_rules_path, recursive)
+ self._charm = charm
+ self._logs_scheme = logs_scheme or {}
+ self._relation_name = relation_name
+ self.topology = JujuTopology.from_charm(charm)
+ self._promtail_resource_name = promtail_resource_name or "promtail-bin"
+ self.insecure_skip_verify = insecure_skip_verify
+ self._promtails_ports = self._generate_promtails_ports(logs_scheme)
+ # architecture used for promtail binary
+ arch = platform.processor()
+ if arch in ["x86_64", "amd64"]:
+ self._arch = "amd64"
+ elif arch in ["aarch64", "arm64", "armv8b", "armv8l"]:
+ self._arch = "arm64"
+ else:
+ self._arch = arch
+ events = self._charm.on[relation_name]
+ self.framework.observe(events.relation_created, self._on_relation_created)
+ self.framework.observe(events.relation_changed, self._on_relation_changed)
+ self.framework.observe(events.relation_departed, self._on_relation_departed)
+ self._observe_pebble_ready()
+ def _observe_pebble_ready(self):
+ for container in self._containers.keys():
+ snake_case_container_name = container.replace("-", "_")
+ self.framework.observe(
+ getattr(self._charm.on, f"{snake_case_container_name}_pebble_ready"),
+ self._on_pebble_ready,
+ )
+ def _on_pebble_ready(self, event: WorkloadEvent):
+ """Event handler for `pebble_ready`."""
+ if self.model.relations[self._relation_name]:
+ self._setup_promtail(event.workload)
+ def _on_relation_created(self, _: RelationCreatedEvent) -> None:
+ """Event handler for `relation_created`."""
+ for container in self._containers.values():
+ if container.can_connect():
+ self._setup_promtail(container)
+ def _on_relation_changed(self, event: RelationEvent) -> None:
+ """Event handler for `relation_changed`.
+ Args:
+ event: The event object `RelationChangedEvent`.
+ """
+ self._handle_alert_rules(event.relation)
+ if self._charm.unit.is_leader():
+ ev = json.loads(event.relation.data[event.app].get("event", "{}"))
+ if ev:
+ valid = bool(ev.get("valid", True))
+ errors = ev.get("errors", "")
+ if valid and not errors:
+ self.on.alert_rule_status_changed.emit(valid=valid)
+ else:
+ self.on.alert_rule_status_changed.emit(valid=valid, errors=errors)
+ for container in self._containers.values():
+ if not container.can_connect():
+ continue
+ if self.model.relations[self._relation_name]:
+ if "promtail" not in container.get_plan().services:
+ self._setup_promtail(container)
+ continue
+ new_config = self._promtail_config(container.name)
+ if new_config != self._current_config(container):
+ container.push(
+ WORKLOAD_CONFIG_PATH, yaml.safe_dump(new_config), make_dirs=True
+ )
+ # Loki may send endpoints late. Don't necessarily start, there may be
+ # no clients
+ if new_config["clients"]:
+ container.restart(WORKLOAD_SERVICE_NAME)
+ self.on.log_proxy_endpoint_joined.emit()
+ else:
+ self.on.promtail_digest_error.emit("No promtail client endpoints available!")
+ def _on_relation_departed(self, _: RelationEvent) -> None:
+ """Event handler for `relation_departed`.
+ Args:
+ event: The event object `RelationDepartedEvent`.
+ """
+ for container in self._containers.values():
+ if not container.can_connect():
+ continue
+ if not self._charm.model.relations[self._relation_name]:
+ container.stop(WORKLOAD_SERVICE_NAME)
+ continue
+ new_config = self._promtail_config(container.name)
+ if new_config != self._current_config(container):
+ container.push(WORKLOAD_CONFIG_PATH, yaml.safe_dump(new_config), make_dirs=True)
+ if new_config["clients"]:
+ container.restart(WORKLOAD_SERVICE_NAME)
+ else:
+ container.stop(WORKLOAD_SERVICE_NAME)
+ self.on.log_proxy_endpoint_departed.emit()
+ def _add_pebble_layer(self, workload_binary_path: str, container: Container) -> None:
+ """Adds Pebble layer that manages Promtail service in Workload container.
+ Args:
+ workload_binary_path: string providing path to promtail binary in workload container.
+ container: container into which the layer is to be added.
+ """
+ pebble_layer = Layer(
+ {
+ "summary": "promtail layer",
+ "description": "pebble config layer for promtail",
+ "services": {
+ "override": "replace",
+ "command": f"{workload_binary_path} {self._cli_args}",
+ "startup": "disabled",
+ }
+ },
+ }
+ )
+ container.add_layer(container.name, pebble_layer, combine=True)
+ def _create_directories(self, container: Container) -> None:
+ """Creates the directories for Promtail binary and config file."""
+ container.make_dir(path=WORKLOAD_BINARY_DIR, make_parents=True)
+ container.make_dir(path=WORKLOAD_CONFIG_DIR, make_parents=True)
+ def _obtain_promtail(self, promtail_info: dict, container: Container) -> None:
+ """Obtain promtail binary from an attached resource or download it.
+ Args:
+ promtail_info: dictionary containing information about promtail binary
+ that must be used. The dictionary must have three keys
+ - "filename": filename of promtail binary
+ - "zipsha": sha256 sum of zip file of promtail binary
+ - "binsha": sha256 sum of unpacked promtail binary
+ container: container into which promtail is to be obtained.
+ """
+ workload_binary_path = os.path.join(WORKLOAD_BINARY_DIR, promtail_info["filename"])
+ if self._promtail_attached_as_resource:
+ self._push_promtail_if_attached(container, workload_binary_path)
+ return
+ if self._promtail_must_be_downloaded(promtail_info):
+ self._download_and_push_promtail_to_workload(container, promtail_info)
+ else:
+ binary_path = os.path.join(BINARY_DIR, promtail_info["filename"])
+ self._push_binary_to_workload(container, binary_path, workload_binary_path)
+ def _push_binary_to_workload(
+ self, container: Container, binary_path: str, workload_binary_path: str
+ ) -> None:
+ """Push promtail binary into workload container.
+ Args:
+ binary_path: path in charm container from which promtail binary is read.
+ workload_binary_path: path in workload container to which promtail binary is pushed.
+ container: container into which promtail is to be uploaded.
+ """
+ with open(binary_path, "rb") as f:
+ container.push(workload_binary_path, f, permissions=0o755, make_dirs=True)
+ logger.debug("The promtail binary file has been pushed to the workload container.")
+ @property
+ def _promtail_attached_as_resource(self) -> bool:
+ """Checks whether Promtail binary is attached to the charm or not.
+ Returns:
+ a boolean representing whether Promtail binary is attached as a resource or not.
+ """
+ try:
+ self._charm.model.resources.fetch(self._promtail_resource_name)
+ return True
+ except ModelError:
+ return False
+ except NameError as e:
+ if "invalid resource name" in str(e):
+ return False
+ raise
+ def _push_promtail_if_attached(self, container: Container, workload_binary_path: str) -> bool:
+ """Checks whether Promtail binary is attached to the charm or not.
+ Args:
+ workload_binary_path: string specifying expected path of promtail
+ in workload container
+ container: container into which promtail is to be pushed.
+ Returns:
+ a boolean representing whether Promtail binary is attached or not.
+ """
+ logger.info("Promtail binary file has been obtained from an attached resource.")
+ resource_path = self._charm.model.resources.fetch(self._promtail_resource_name)
+ self._push_binary_to_workload(container, resource_path, workload_binary_path)
+ return True
+ def _promtail_must_be_downloaded(self, promtail_info: dict) -> bool:
+ """Checks whether promtail binary must be downloaded or not.
+ Args:
+ promtail_info: dictionary containing information about promtail binary
+ that must be used. The dictionary must have three keys
+ - "filename": filename of promtail binary
+ - "zipsha": sha256 sum of zip file of promtail binary
+ - "binsha": sha256 sum of unpacked promtail binary
+ Returns:
+ a boolean representing whether Promtail binary must be downloaded or not.
+ """
+ binary_path = os.path.join(BINARY_DIR, promtail_info["filename"])
+ if not self._is_promtail_binary_in_charm(binary_path):
+ return True
+ if not self._sha256sums_matches(binary_path, promtail_info["binsha"]):
+ return True
+ logger.debug("Promtail binary file is already in the the charm container.")
+ return False
+ def _sha256sums_matches(self, file_path: str, sha256sum: str) -> bool:
+ """Checks whether a file's sha256sum matches or not with a specific sha256sum.
+ Args:
+ file_path: A string representing the files' patch.
+ sha256sum: The sha256sum against which we want to verify.
+ Returns:
+ a boolean representing whether a file's sha256sum matches or not with
+ a specific sha256sum.
+ """
+ try:
+ with open(file_path, "rb") as f:
+ file_bytes = f.read()
+ result = sha256(file_bytes).hexdigest()
+ if result != sha256sum:
+ msg = "File sha256sum mismatch, expected:'{}' but got '{}'".format(
+ sha256sum, result
+ )
+ logger.debug(msg)
+ return False
+ return True
+ except (APIError, FileNotFoundError):
+ msg = "File: '{}' could not be opened".format(file_path)
+ logger.error(msg)
+ return False
+ def _is_promtail_binary_in_charm(self, binary_path: str) -> bool:
+ """Check if Promtail binary is already stored in charm container.
+ Args:
+ binary_path: string path of promtail binary to check
+ Returns:
+ a boolean representing whether Promtail is present or not.
+ """
+ return True if Path(binary_path).is_file() else False
+ def _download_and_push_promtail_to_workload(
+ self, container: Container, promtail_info: dict
+ ) -> None:
+ """Downloads a Promtail zip file and pushes the binary to the workload.
+ Args:
+ promtail_info: dictionary containing information about promtail binary
+ that must be used. The dictionary must have three keys
+ - "filename": filename of promtail binary
+ - "zipsha": sha256 sum of zip file of promtail binary
+ - "binsha": sha256 sum of unpacked promtail binary
+ container: container into which promtail is to be uploaded.
+ """
+ # Check for Juju proxy variables and fall back to standard ones if not set
+ # If no Juju proxy variable was set, we set proxies to None to let the ProxyHandler get
+ # the proxy env variables from the environment
+ proxies = {
+ # The ProxyHandler uses only the protocol names as keys
+ # https://docs.python.org/3/library/urllib.request.html#urllib.request.ProxyHandler
+ "https": os.environ.get("JUJU_CHARM_HTTPS_PROXY", ""),
+ "http": os.environ.get("JUJU_CHARM_HTTP_PROXY", ""),
+ # The ProxyHandler uses `no` for the no_proxy key
+ # https://github.com/python/cpython/blob/3.12/Lib/urllib/request.py#L2553
+ "no": os.environ.get("JUJU_CHARM_NO_PROXY", ""),
+ }
+ proxies = {k: v for k, v in proxies.items() if v != ""} or None
+ proxy_handler = request.ProxyHandler(proxies)
+ opener = request.build_opener(proxy_handler)
+ with opener.open(promtail_info["url"]) as r:
+ file_bytes = r.read()
+ file_path = os.path.join(BINARY_DIR, promtail_info["filename"] + ".gz")
+ with open(file_path, "wb") as f:
+ f.write(file_bytes)
+ logger.info(
+ "Promtail binary zip file has been downloaded and stored in: %s",
+ file_path,
+ )
+ decompressed_file = GzipFile(fileobj=BytesIO(file_bytes))
+ binary_path = os.path.join(BINARY_DIR, promtail_info["filename"])
+ with open(binary_path, "wb") as outfile:
+ outfile.write(decompressed_file.read())
+ logger.debug("Promtail binary file has been downloaded.")
+ workload_binary_path = os.path.join(WORKLOAD_BINARY_DIR, promtail_info["filename"])
+ self._push_binary_to_workload(container, binary_path, workload_binary_path)
+ @property
+ def _cli_args(self) -> str:
+ """Return the cli arguments to pass to promtail.
+ Returns:
+ The arguments as a string
+ """
+ return "-config.file={}".format(WORKLOAD_CONFIG_PATH)
+ def _current_config(self, container) -> dict:
+ """Property that returns the current Promtail configuration.
+ Returns:
+ A dict containing Promtail configuration.
+ """
+ if not container.can_connect():
+ logger.debug("Could not connect to promtail container!")
+ return {}
+ try:
+ raw_current = container.pull(WORKLOAD_CONFIG_PATH).read()
+ return yaml.safe_load(raw_current)
+ except (ProtocolError, PathError) as e:
+ logger.warning(
+ "Could not check the current promtail configuration due to "
+ "a failure in retrieving the file: %s",
+ e,
+ )
+ return {}
+ def _promtail_config(self, container_name: str) -> dict:
+ """Generates the config file for Promtail.
+ Reference: https://grafana.com/docs/loki/latest/send-data/promtail/configuration
+ """
+ config = {"clients": self._clients_list()}
+ if self.insecure_skip_verify:
+ for client in config["clients"]:
+ client["tls_config"] = {"insecure_skip_verify": True}
+ config.update(self._server_config(container_name))
+ config.update(self._positions)
+ config.update(self._scrape_configs(container_name))
+ return config
+ def _clients_list(self) -> list:
+ """Generates a list of clients for use in the promtail config.
+ Returns:
+ A list of endpoints
+ """
+ return self.loki_endpoints
+ def _server_config(self, container_name: str) -> dict:
+ """Generates the server section of the Promtail config file.
+ Returns:
+ A dict representing the `server` section.
+ """
+ return {
+ "server": {
+ "http_listen_port": self._promtails_ports[container_name]["http_listen_port"],
+ "grpc_listen_port": self._promtails_ports[container_name]["grpc_listen_port"],
+ }
+ }
+ @property
+ def _positions(self) -> dict:
+ """Generates the positions section of the Promtail config file.
+ Returns:
+ A dict representing the `positions` section.
+ """
+ return {"positions": {"filename": WORKLOAD_POSITIONS_PATH}}
+ def _scrape_configs(self, container_name: str) -> dict:
+ """Generates the scrape_configs section of the Promtail config file.
+ Returns:
+ A dict representing the `scrape_configs` section.
+ """
+ job_name = f"juju_{self.topology.identifier}"
+ # The new JujuTopology doesn't include unit, but LogProxyConsumer should have it
+ common_labels = {
+ f"juju_{k}": v
+ for k, v in self.topology.as_dict(remapped_keys={"charm_name": "charm"}).items()
+ }
+ common_labels["container"] = container_name
+ scrape_configs = []
+ # Files config
+ labels = common_labels.copy()
+ labels.update(
+ {
+ "job": job_name,
+ "__path__": "",
+ }
+ )
+ config = {"targets": ["localhost"], "labels": labels}
+ scrape_config = {
+ "job_name": "system",
+ "static_configs": self._generate_static_configs(config, container_name),
+ }
+ scrape_configs.append(scrape_config)
+ # Syslog config
+ syslog_port = self._logs_scheme.get(container_name, {}).get("syslog-port")
+ if syslog_port:
+ relabel_mappings = [
+ "severity",
+ "facility",
+ "hostname",
+ "app_name",
+ "proc_id",
+ "msg_id",
+ ]
+ syslog_labels = common_labels.copy()
+ syslog_labels.update({"job": f"{job_name}_syslog"})
+ syslog_config = {
+ "job_name": "syslog",
+ "syslog": {
+ "listen_address": f"{syslog_port}",
+ "label_structured_data": True,
+ "labels": syslog_labels,
+ },
+ "relabel_configs": [
+ {"source_labels": [f"__syslog_message_{val}"], "target_label": val}
+ for val in relabel_mappings
+ ]
+ + [{"action": "labelmap", "regex": "__syslog_message_sd_(.+)"}],
+ }
+ scrape_configs.append(syslog_config) # type: ignore
+ return {"scrape_configs": scrape_configs}
+ def _generate_static_configs(self, config: dict, container_name: str) -> list:
+ """Generates static_configs section.
+ Returns:
+ - a list of dictionaries representing static_configs section
+ """
+ static_configs = []
+ for _file in self._logs_scheme.get(container_name, {}).get("log-files", []):
+ conf = deepcopy(config)
+ conf["labels"]["__path__"] = _file
+ static_configs.append(conf)
+ return static_configs
+ def _setup_promtail(self, container: Container) -> None:
+ # Use the first
+ relations = self._charm.model.relations[self._relation_name]
+ if len(relations) > 1:
+ logger.debug(
+ "Multiple log_proxy relations. Getting Promtail from application {}".format(
+ relations[0].app.name
+ )
+ )
+ relation = relations[0]
+ promtail_binaries = json.loads(
+ relation.data[relation.app].get("promtail_binary_zip_url", "{}")
+ )
+ if not promtail_binaries:
+ return
+ self._create_directories(container)
+ self._ensure_promtail_binary(promtail_binaries, container)
+ container.push(
+ yaml.safe_dump(self._promtail_config(container.name)),
+ make_dirs=True,
+ )
+ workload_binary_path = os.path.join(
+ WORKLOAD_BINARY_DIR, promtail_binaries[self._arch]["filename"]
+ )
+ self._add_pebble_layer(workload_binary_path, container)
+ if self._current_config(container).get("clients"):
+ try:
+ container.restart(WORKLOAD_SERVICE_NAME)
+ except ChangeError as e:
+ self.on.promtail_digest_error.emit(str(e))
+ else:
+ self.on.log_proxy_endpoint_joined.emit()
+ else:
+ self.on.promtail_digest_error.emit("No promtail client endpoints available!")
+ def _ensure_promtail_binary(self, promtail_binaries: dict, container: Container):
+ if self._is_promtail_installed(promtail_binaries[self._arch], container):
+ return
+ try:
+ self._obtain_promtail(promtail_binaries[self._arch], container)
+ except URLError as e:
+ msg = f"Promtail binary couldn't be downloaded - {str(e)}"
+ logger.warning(msg)
+ self.on.promtail_digest_error.emit(msg)
+ def _is_promtail_installed(self, promtail_info: dict, container: Container) -> bool:
+ """Determine if promtail has already been installed to the container.
+ Args:
+ promtail_info: dictionary containing information about promtail binary
+ that must be used. The dictionary must at least contain a key
+ "filename" giving the name of promtail binary
+ container: container in which to check whether promtail is installed.
+ """
+ workload_binary_path = f"{WORKLOAD_BINARY_DIR}/{promtail_info['filename']}"
+ try:
+ container.list_files(workload_binary_path)
+ except (APIError, FileNotFoundError):
+ return False
+ return True
+ def _generate_promtails_ports(self, logs_scheme) -> dict:
+ return {
+ container: {
+ "http_listen_port": HTTP_LISTEN_PORT_START + 2 * i,
+ "grpc_listen_port": GRPC_LISTEN_PORT_START + 2 * i,
+ }
+ for i, container in enumerate(logs_scheme.keys())
+ }
+ def syslog_port(self, container_name: str) -> str:
+ """Gets the port on which promtail is listening for syslog in this container.
+ Returns:
+ A str representing the port
+ """
+ return str(self._logs_scheme.get(container_name, {}).get("syslog-port"))
+ def rsyslog_config(self, container_name: str) -> str:
+ """Generates a config line for use with rsyslog.
+ Returns:
+ The rsyslog config line as a string
+ """
+ return 'action(type="omfwd" protocol="tcp" target="" port="{}" Template="RSYSLOG_SyslogProtocol23Format" TCP_Framing="octet-counted")'.format(
+ self._logs_scheme.get(container_name, {}).get("syslog-port")
+ )
+ @property
+ def _containers(self) -> Dict[str, Container]:
+ return {cont: self._charm.unit.get_container(cont) for cont in self._logs_scheme.keys()}
+class _PebbleLogClient:
+ @staticmethod
+ def check_juju_version() -> bool:
+ """Make sure the Juju version supports Log Forwarding."""
+ juju_version = JujuVersion.from_environ()
+ if not juju_version > JujuVersion(version=str("3.3")):
+ msg = f"Juju version {juju_version} does not support Pebble log forwarding. Juju >= 3.4 is needed."
+ logger.warning(msg)
+ return False
+ return True
+ @staticmethod
+ def _build_log_target(
+ unit_name: str, loki_endpoint: str, topology: JujuTopology, enable: bool
+ ) -> Dict:
+ """Build a log target for the log forwarding Pebble layer.
+ Log target's syntax for enabling/disabling forwarding is explained here:
+ https://github.com/canonical/pebble?tab=readme-ov-file#log-forwarding
+ """
+ services_value = ["all"] if enable else ["-all"]
+ log_target = {
+ "override": "replace",
+ "services": services_value,
+ "type": "loki",
+ "location": loki_endpoint,
+ }
+ if enable:
+ log_target.update(
+ {
+ "labels": {
+ "product": "Juju",
+ "charm": topology._charm_name,
+ "juju_model": topology._model,
+ "juju_model_uuid": topology._model_uuid,
+ "juju_application": topology._application,
+ "juju_unit": topology._unit,
+ },
+ }
+ )
+ return {unit_name: log_target}
+ @staticmethod
+ def _build_log_targets(
+ loki_endpoints: Optional[Dict[str, str]], topology: JujuTopology, enable: bool
+ ):
+ """Build all the targets for the log forwarding Pebble layer."""
+ targets = {}
+ if not loki_endpoints:
+ return targets
+ for unit_name, endpoint in loki_endpoints.items():
+ targets.update(
+ _PebbleLogClient._build_log_target(
+ unit_name=unit_name,
+ loki_endpoint=endpoint,
+ topology=topology,
+ enable=enable,
+ )
+ )
+ return targets
+ @staticmethod
+ def disable_inactive_endpoints(
+ container: Container, active_endpoints: Dict[str, str], topology: JujuTopology
+ ):
+ """Disable forwarding for inactive endpoints by checking against the Pebble plan."""
+ pebble_layer = container.get_plan().to_dict().get("log-targets", None)
+ if not pebble_layer:
+ return
+ for unit_name, target in pebble_layer.items():
+ # If the layer is a disabled log forwarding endpoint, skip it
+ if "-all" in target["services"]: # pyright: ignore
+ continue
+ if unit_name not in active_endpoints:
+ layer = Layer(
+ { # pyright: ignore
+ "log-targets": _PebbleLogClient._build_log_targets(
+ loki_endpoints={unit_name: "(removed)"},
+ topology=topology,
+ enable=False,
+ )
+ }
+ )
+ container.add_layer(f"{container.name}-log-forwarding", layer=layer, combine=True)
+ @staticmethod
+ def enable_endpoints(
+ container: Container, active_endpoints: Dict[str, str], topology: JujuTopology
+ ):
+ """Enable forwarding for the specified Loki endpoints."""
+ layer = Layer(
+ { # pyright: ignore
+ "log-targets": _PebbleLogClient._build_log_targets(
+ loki_endpoints=active_endpoints,
+ topology=topology,
+ enable=True,
+ )
+ }
+ )
+ container.add_layer(f"{container.name}-log-forwarding", layer, combine=True)
+class LogForwarder(ConsumerBase):
+ """Forward the standard outputs of all workloads operated by a charm to one or multiple Loki endpoints.
+ This class implements Pebble log forwarding. Juju >= 3.4 is needed.
+ """
+ def __init__(
+ self,
+ charm: CharmBase,
+ *,
+ relation_name: str = DEFAULT_RELATION_NAME,
+ alert_rules_path: str = DEFAULT_ALERT_RULES_RELATIVE_PATH,
+ recursive: bool = True,
+ skip_alert_topology_labeling: bool = False,
+ ):
+ _PebbleLogClient.check_juju_version()
+ super().__init__(
+ charm, relation_name, alert_rules_path, recursive, skip_alert_topology_labeling
+ )
+ self._charm = charm
+ self._relation_name = relation_name
+ on = self._charm.on[self._relation_name]
+ self.framework.observe(on.relation_joined, self._update_logging)
+ self.framework.observe(on.relation_changed, self._update_logging)
+ self.framework.observe(on.relation_departed, self._update_logging)
+ self.framework.observe(on.relation_broken, self._update_logging)
+ for container_name in self._charm.meta.containers.keys():
+ snake_case_container_name = container_name.replace("-", "_")
+ self.framework.observe(
+ getattr(self._charm.on, f"{snake_case_container_name}_pebble_ready"),
+ self._on_pebble_ready,
+ )
+ def _on_pebble_ready(self, event: PebbleReadyEvent):
+ if not (loki_endpoints := self._retrieve_endpoints_from_relation()):
+ logger.warning("No Loki endpoints available")
+ return
+ self._update_endpoints(event.workload, loki_endpoints)
+ def _update_logging(self, _):
+ """Update the log forwarding to match the active Loki endpoints."""
+ if not (loki_endpoints := self._retrieve_endpoints_from_relation()):
+ logger.warning("No Loki endpoints available")
+ return
+ for container in self._charm.unit.containers.values():
+ self._update_endpoints(container, loki_endpoints)
+ def _retrieve_endpoints_from_relation(self) -> dict:
+ loki_endpoints = {}
+ # Get the endpoints from relation data
+ for relation in self._charm.model.relations[self._relation_name]:
+ loki_endpoints.update(self._fetch_endpoints(relation))
+ return loki_endpoints
+ def _update_endpoints(self, container: Container, loki_endpoints: dict):
+ _PebbleLogClient.disable_inactive_endpoints(
+ container=container,
+ active_endpoints=loki_endpoints,
+ topology=self.topology,
+ )
+ _PebbleLogClient.enable_endpoints(
+ container=container, active_endpoints=loki_endpoints, topology=self.topology
+ )
+ def is_ready(self, relation: Optional[Relation] = None):
+ """Check if the relation is active and healthy."""
+ if not relation:
+ relations = self._charm.model.relations[self._relation_name]
+ if not relations:
+ return False
+ return all(self.is_ready(relation) for relation in relations)
+ try:
+ if self._extract_urls(relation):
+ return True
+ return False
+ except (KeyError, json.JSONDecodeError):
+ return False
+ def _extract_urls(self, relation: Relation) -> Dict[str, str]:
+ """Default getter function to extract Loki endpoints from a relation.
+ Returns:
+ A dictionary of remote units and the respective Loki endpoint.
+ {
+ "loki/0": "http://loki:3100/loki/api/v1/push",
+ "another-loki/0": "http://another-loki:3100/loki/api/v1/push",
+ }
+ """
+ endpoints: Dict = {}
+ for unit in relation.units:
+ endpoint = relation.data[unit]["endpoint"]
+ deserialized_endpoint = json.loads(endpoint)
+ url = deserialized_endpoint["url"]
+ endpoints[unit.name] = url
+ return endpoints
+ def _fetch_endpoints(self, relation: Relation) -> Dict[str, str]:
+ """Fetch Loki Push API endpoints from relation data using the endpoints getter."""
+ endpoints: Dict = {}
+ if not self.is_ready(relation):
+ logger.warning(f"The relation '{relation.name}' is not ready yet.")
+ return endpoints
+ # if the code gets here, the function won't raise anymore because it's
+ # also called in is_ready()
+ endpoints = self._extract_urls(relation)
+ return endpoints
+class CosTool:
+ """Uses cos-tool to inject label matchers into alert rule expressions and validate rules."""
+ _path = None
+ _disabled = False
+ def __init__(self, charm):
+ self._charm = charm
+ @property
+ def path(self):
+ """Lazy lookup of the path of cos-tool."""
+ if self._disabled:
+ return None
+ if not self._path:
+ self._path = self._get_tool_path()
+ if not self._path:
+ logger.debug("Skipping injection of juju topology as label matchers")
+ self._disabled = True
+ return self._path
+ def apply_label_matchers(self, rules) -> dict:
+ """Will apply label matchers to the expression of all alerts in all supplied groups."""
+ if not self.path:
+ return rules
+ for group in rules["groups"]:
+ rules_in_group = group.get("rules", [])
+ for rule in rules_in_group:
+ topology = {}
+ # if the user for some reason has provided juju_unit, we'll need to honor it
+ # in most cases, however, this will be empty
+ for label in [
+ "juju_model",
+ "juju_model_uuid",
+ "juju_application",
+ "juju_charm",
+ "juju_unit",
+ ]:
+ if label in rule["labels"]:
+ topology[label] = rule["labels"][label]
+ rule["expr"] = self.inject_label_matchers(rule["expr"], topology)
+ return rules
+ def validate_alert_rules(self, rules: dict) -> Tuple[bool, str]:
+ """Will validate correctness of alert rules, returning a boolean and any errors."""
+ if not self.path:
+ logger.debug("`cos-tool` unavailable. Not validating alert correctness.")
+ return True, ""
+ with tempfile.TemporaryDirectory() as tmpdir:
+ rule_path = Path(tmpdir + "/validate_rule.yaml")
+ # Smash "our" rules format into what upstream actually uses, which is more like:
+ #
+ # groups:
+ # - name: foo
+ # rules:
+ # - alert: SomeAlert
+ # expr: up
+ # - alert: OtherAlert
+ # expr: up
+ transformed_rules = {"groups": []} # type: ignore
+ for rule in rules["groups"]:
+ transformed_rules["groups"].append(rule)
+ rule_path.write_text(yaml.dump(transformed_rules))
+ args = [str(self.path), "--format", "logql", "validate", str(rule_path)]
+ # noinspection PyBroadException
+ try:
+ self._exec(args)
+ return True, ""
+ except subprocess.CalledProcessError as e:
+ logger.debug("Validating the rules failed: %s", e.output)
+ return False, ", ".join([line for line in e.output if "error validating" in line])
+ def inject_label_matchers(self, expression, topology) -> str:
+ """Add label matchers to an expression."""
+ if not topology:
+ return expression
+ if not self.path:
+ logger.debug("`cos-tool` unavailable. Leaving expression unchanged: %s", expression)
+ return expression
+ args = [str(self.path), "--format", "logql", "transform"]
+ args.extend(
+ ["--label-matcher={}={}".format(key, value) for key, value in topology.items()]
+ )
+ args.extend(["{}".format(expression)])
+ # noinspection PyBroadException
+ try:
+ return self._exec(args)
+ except subprocess.CalledProcessError as e:
+ logger.debug('Applying the expression failed: "%s", falling back to the original', e)
+ print('Applying the expression failed: "{}", falling back to the original'.format(e))
+ return expression
+ def _get_tool_path(self) -> Optional[Path]:
+ arch = platform.processor()
+ arch = "amd64" if arch == "x86_64" else arch
+ res = "cos-tool-{}".format(arch)
+ try:
+ path = Path(res).resolve()
+ path.chmod(0o777)
+ return path
+ except NotImplementedError:
+ logger.debug("System lacks support for chmod")
+ except FileNotFoundError:
+ logger.debug('Could not locate cos-tool at: "{}"'.format(res))
+ return None
+ def _exec(self, cmd) -> str:
+ result = subprocess.run(cmd, check=True, stdout=subprocess.PIPE)
+ output = result.stdout.decode("utf-8").strip()
+ return output
diff --git a/charms/kfp-api/metadata.yaml b/charms/kfp-api/metadata.yaml
index 3442bcd3..110e0ec6 100755
--- a/charms/kfp-api/metadata.yaml
+++ b/charms/kfp-api/metadata.yaml
@@ -68,6 +68,9 @@ requires:
- service-port
versions: [v1]
__schema_source: https://raw.githubusercontent.com/canonical/operator-schemas/master/k8s-service.yaml
+ logging:
+ interface: loki_push_api
+ optional: true
interface: k8s-service
diff --git a/charms/kfp-api/requirements-integration.in b/charms/kfp-api/requirements-integration.in
index 512c9a8c..686cf140 100644
--- a/charms/kfp-api/requirements-integration.in
+++ b/charms/kfp-api/requirements-integration.in
@@ -4,3 +4,5 @@ pytest-operator
+# This is required due to the abstraction of cos integration
diff --git a/charms/kfp-api/requirements-integration.txt b/charms/kfp-api/requirements-integration.txt
index eafe65a2..ed7b23f1 100644
--- a/charms/kfp-api/requirements-integration.txt
+++ b/charms/kfp-api/requirements-integration.txt
@@ -4,35 +4,42 @@
# pip-compile requirements-integration.in
+ # via httpx
# via stack-data
# via
+ # jsonschema
# outcome
# trio
# via ipython
# via paramiko
# via selenium-wire
# via selenium-wire
# via google-auth
# via
+ # httpcore
+ # httpx
# kubernetes
# requests
# selenium
# selenium-wire
# via
# cryptography
# pynacl
+ # via -r requirements-integration.in
# via requests
# via
# paramiko
# pyopenssl
@@ -40,54 +47,76 @@ decorator==5.1.1
# via
# ipdb
# ipython
+ # via charmed-kubeflow-chisme
# via
+ # anyio
# pytest
# trio
# trio-websocket
# via stack-data
# via kubernetes
- # via wsproto
+ # via
+ # httpcore
+ # wsproto
# via selenium-wire
# via h2
+ # via httpx
+ # via lightkube
# via juju
# via
# h2
# selenium-wire
# via
+ # anyio
+ # httpx
# requests
# trio
+ # via jsonschema
# via pytest
# via pytest-operator
# via ipdb
# via ipython
- # via pytest-operator
+ # via
+ # charmed-kubeflow-chisme
+ # pytest-operator
+ # via serialized-data-interface
# via
# -r requirements-integration.in
+ # charmed-kubeflow-chisme
# pytest-operator
# via selenium-wire
# via juju
+ # via charmed-kubeflow-chisme
+ # via lightkube
# via juju
# via jinja2
# via ipython
# via typing-inspect
@@ -95,42 +124,50 @@ oauthlib==3.2.2
# via
# kubernetes
# requests-oauthlib
+ # via
+ # charmed-kubeflow-chisme
+ # serialized-data-interface
+ # via deepdiff
# via trio
- # via pytest
+ # via
+ # juju
+ # pytest
# via juju
# via jedi
# via ipython
# via ipython
+ # via jsonschema
# via pytest
# via ipython
# via macaroonbakery
# via pexpect
# via stack-data
# via
# juju
# pyasn1-modules
# rsa
# selenium-wire
# via google-auth
# via cffi
# via ipython
- # via hvac
# via macaroonbakery
@@ -138,104 +175,125 @@ pynacl==1.5.0
# macaroonbakery
# paramiko
# pymacaroons
# via selenium-wire
# via selenium-wire
# via
# juju
# macaroonbakery
+ # via jsonschema
# via
# selenium-wire
# urllib3
# via
# pytest-asyncio
# pytest-operator
# via pytest-operator
# via -r requirements-integration.in
# via kubernetes
# via pyrfc3339
# via
# juju
# kubernetes
+ # lightkube
+ # ops
# pytest-operator
+ # serialized-data-interface
# via
# hvac
# kubernetes
# macaroonbakery
# requests-oauthlib
+ # serialized-data-interface
# via kubernetes
# via google-auth
+ # via charmed-kubeflow-chisme
+ # via ruamel-yaml
# via
# -r requirements-integration.in
# selenium-wire
# via -r requirements-integration.in
+ # via charmed-kubeflow-chisme
# via
# asttokens
- # google-auth
# kubernetes
# macaroonbakery
- # paramiko
# pymacaroons
# python-dateutil
- # via trio
+ # via
+ # anyio
+ # httpx
+ # trio
# via trio
# via ipython
- # via -r requirements-integration.in
+ # via
+ # -r requirements-integration.in
+ # charmed-kubeflow-chisme
# via
# ipdb
# pytest
# via juju
# via
# ipython
# matplotlib-inline
# via
# selenium
# trio-websocket
# via selenium
# via
+ # anyio
# ipython
+ # selenium
# typing-inspect
# via juju
# via
# kubernetes
# requests
# selenium
# via prompt-toolkit
- # via kubernetes
+ # via
+ # kubernetes
+ # ops
# via juju
# via
# selenium-wire
# trio-websocket
+ # via importlib-resources
# via selenium-wire
diff --git a/charms/kfp-api/requirements-unit.txt b/charms/kfp-api/requirements-unit.txt
index b3af24a5..3637a8d0 100644
--- a/charms/kfp-api/requirements-unit.txt
+++ b/charms/kfp-api/requirements-unit.txt
@@ -93,7 +93,7 @@ requests==2.31.0
# via serialized-data-interface
# via charmed-kubeflow-chisme
# via ruamel-yaml
# via
diff --git a/charms/kfp-api/requirements.txt b/charms/kfp-api/requirements.txt
index e38480f0..e08458ca 100644
--- a/charms/kfp-api/requirements.txt
+++ b/charms/kfp-api/requirements.txt
@@ -50,7 +50,7 @@ markupsafe==2.1.3
# via jinja2
# via -r requirements.in
# via
# -r requirements.in
# charmed-kubeflow-chisme
diff --git a/charms/kfp-api/src/charm.py b/charms/kfp-api/src/charm.py
index 9dc05ca5..13532970 100755
--- a/charms/kfp-api/src/charm.py
+++ b/charms/kfp-api/src/charm.py
@@ -17,6 +17,7 @@
from charmed_kubeflow_chisme.pebble import update_layer
from charms.data_platform_libs.v0.data_interfaces import DatabaseRequires
+from charms.loki_k8s.v1.loki_push_api import LogForwarder
from charms.observability_libs.v1.kubernetes_service_patch import KubernetesServicePatch
from charms.prometheus_k8s.v0.prometheus_scrape import MetricsEndpointProvider
from jsonschema import ValidationError
@@ -138,6 +139,7 @@ def __init__(self, *args):
+ self._logging = LogForwarder(charm=self)
def _charm_default_kubernetes_labels(self):
diff --git a/charms/kfp-api/tests/integration/test_charm.py b/charms/kfp-api/tests/integration/test_charm.py
index e84604c6..22e4374d 100644
--- a/charms/kfp-api/tests/integration/test_charm.py
+++ b/charms/kfp-api/tests/integration/test_charm.py
@@ -1,15 +1,20 @@
# Copyright 2023 Canonical Ltd.
# See LICENSE file for licensing details.
-import json
import logging
from pathlib import Path
import pytest
-import requests
import yaml
+from charmed_kubeflow_chisme.testing import (
+ assert_alert_rules,
+ assert_logging,
+ assert_metrics_endpoint,
+ deploy_and_assert_grafana_agent,
+ get_alert_rules,
from pytest_operator.plugin import OpsTest
-from tenacity import Retrying, stop_after_attempt, stop_after_delay, wait_exponential
logger = logging.getLogger(__name__)
@@ -32,16 +37,6 @@
MYSQL_CHANNEL = "8.0/stable"
MYSQL_CONFIG = {"profile": "testing"}
-PROMETHEUS_K8S = "prometheus-k8s"
-PROMETHEUS_K8S_CHANNEL = "latest/stable"
-GRAFANA_K8S = "grafana-k8s"
-GRAFANA_K8S_CHANNEL = "latest/stable"
-PROMETHEUS_SCRAPE_K8S = "prometheus-scrape-config-k8s"
-PROMETHEUS_SCRAPE_CONFIG = {"scrape_interval": "30s"}
class TestCharm:
@@ -96,6 +91,11 @@ async def test_build_and_deploy(self, ops_test: OpsTest):
timeout=90 * 20,
+ # Deploying grafana-agent-k8s and add all relations
+ await deploy_and_assert_grafana_agent(
+ ops_test.model, APP_NAME, metrics=True, dashboard=True, logging=True
+ )
# FIXME: this test case belongs in unit tests as it is asserting the status of the
# unit under a certain condition, we don't actually need the presence of any deployed
# charm to test this.
@@ -203,74 +203,27 @@ async def test_msql_relation_with_relational_db_relation(self, ops_test: OpsTest
# remove redundant relation
await ops_test.juju("remove-relation", f"{APP_NAME}:mysql", f"{KFP_DB}:mysql")
- async def test_prometheus_grafana_integration(self, ops_test: OpsTest):
- """Deploy prometheus, grafana and required relations, then test the metrics."""
- # Deploy and relate prometheus
- await ops_test.model.deploy(
- )
- await ops_test.model.deploy(
- )
- await ops_test.model.deploy(
- )
- await ops_test.model.add_relation(APP_NAME, PROMETHEUS_SCRAPE_K8S)
- await ops_test.model.add_relation(
- f"{PROMETHEUS_K8S}:grafana-dashboard", f"{GRAFANA_K8S}:grafana-dashboard"
- )
- await ops_test.model.add_relation(
- f"{APP_NAME}:grafana-dashboard", f"{GRAFANA_K8S}:grafana-dashboard"
- )
- await ops_test.model.add_relation(
- f"{PROMETHEUS_K8S}:metrics-endpoint", f"{PROMETHEUS_SCRAPE_K8S}:metrics-endpoint"
- )
- # prometheus-k8s needs a significant amount of time to deploy in GH runners,
- # please make sure the timeout stays above or equal to 90*20
- await ops_test.model.wait_for_idle(
- apps=["grafana-k8s", "prometheus-k8s", "prometheus-scrape-config-k8s"],
- status="active",
- raise_on_blocked=False,
- raise_on_error=False,
- timeout=90 * 20,
- idle_period=20,
- )
- status = await ops_test.model.get_status()
- prometheus_unit_ip = status["applications"][PROMETHEUS_K8S]["units"][
- ]["address"]
- logger.info(f"Prometheus available at http://{prometheus_unit_ip}:9090")
- for attempt in self.retry_for_5_attempts:
- logger.info(
- f"Testing prometheus deployment (attempt " f"{attempt.retry_state.attempt_number})"
- )
- with attempt:
- r = requests.get(
- f"http://{prometheus_unit_ip}:9090/api/v1/query?"
- f'query=up{{juju_application="{APP_NAME}"}}'
- )
- response = json.loads(r.content.decode("utf-8"))
- response_status = response["status"]
- logger.info(f"Response status is {response_status}")
- assert response_status == "success"
- response_metric = response["data"]["result"][0]["metric"]
- assert response_metric["juju_application"] == APP_NAME
- assert response_metric["juju_model"] == ops_test.model_name
- # Helper to retry calling a function over 30 seconds or 5 attempts
- retry_for_5_attempts = Retrying(
- stop=(stop_after_attempt(5) | stop_after_delay(30)),
- wait=wait_exponential(multiplier=1, min=1, max=10),
- reraise=True,
- )
+ async def test_alert_rules(self, ops_test: OpsTest):
+ """Test check charm alert rules and rules defined in relation data bag."""
+ app = ops_test.model.applications[APP_NAME]
+ alert_rules = get_alert_rules()
+ logger.info("found alert_rules: %s", alert_rules)
+ await assert_alert_rules(app, alert_rules)
+ async def test_metrics_enpoint(self, ops_test: OpsTest):
+ """Test metrics_endpoints are defined in relation data bag and their accessibility.
+ This function gets all the metrics_endpoints from the relation data bag, checks if
+ they are available from the grafana-agent-k8s charm and finally compares them with the
+ ones provided to the function.
+ """
+ app = ops_test.model.applications[APP_NAME]
+ await assert_metrics_endpoint(app, metrics_port=8888, metrics_path="/metrics")
+ async def test_logging(self, ops_test: OpsTest):
+ """Test logging is defined in relation data bag."""
+ app = ops_test.model.applications[GRAFANA_AGENT_APP]
+ await assert_logging(app)
async def test_remove_application(self, ops_test: OpsTest):
"""Test that the application can be removed successfully."""
diff --git a/charms/kfp-api/tests/unit/test_operator.py b/charms/kfp-api/tests/unit/test_operator.py
index d7ae61b4..6015e2fb 100644
--- a/charms/kfp-api/tests/unit/test_operator.py
+++ b/charms/kfp-api/tests/unit/test_operator.py
@@ -53,6 +53,13 @@ def harness() -> Harness:
class TestCharm:
"""Test class for KfamApiOperator."""
+ @patch("charm.KubernetesServicePatch", lambda x, y: None)
+ @patch("charm.KfpApiOperator.k8s_resource_handler")
+ def test_log_forwarding(self, k8s_resource_handler: MagicMock, harness: Harness):
+ with patch("charm.LogForwarder") as mock_logging:
+ harness.begin()
+ mock_logging.assert_called_once_with(charm=harness.charm)
@patch("charm.KubernetesServicePatch", lambda x, y: None)
def test_not_leader(self, k8s_resource_handler: MagicMock, harness: Harness):