diff --git a/CHANGELOG.md b/CHANGELOG.md index 947e80198..e6beb922b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,11 +6,22 @@ * remove AutoRuleCorpusTester ### Features + +* adds health check endpoint to metrics on path `/health` +* changes helm chart to use new readiness check +* adds `health_timeout` option to all components to tweak the timeout of healthchecks +* adds `desired_cluster_status` option to opensearch output to signal healthy cluster status +* initially run health checks on setup for every configured component +* make `imagePullPolicy` configurable for helm chart deployments + + ### Improvements * remove AutoRuleCorpusTester * adds support for rust extension development * adds prebuilt wheels for architectures `x86_64` and `aarch64` on `manylinux` and `musllinux` based linux platforms to releases +* adds manual on how to use local images with minikube example setup to documentation +* move `Configuration` to top level of documentation ### Bugfix diff --git a/charts/logprep/Chart.yaml b/charts/logprep/Chart.yaml index ca6b1fdd9..ebf1f33bf 100644 --- a/charts/logprep/Chart.yaml +++ b/charts/logprep/Chart.yaml @@ -6,7 +6,7 @@ type: application # This is the chart version. This version number should be incremented each time you make changes # to the chart and its templates, including the app version. # Versions are expected to follow Semantic Versioning (https://semver.org/) -version: "13.2.3" +version: "13.3.0" # This is the version number of the application being deployed. This version number should be # incremented each time you make changes to the application. 
Versions are not expected to diff --git a/charts/logprep/templates/deployment.yaml b/charts/logprep/templates/deployment.yaml index 0df9e711d..64f1b7cfc 100644 --- a/charts/logprep/templates/deployment.yaml +++ b/charts/logprep/templates/deployment.yaml @@ -34,7 +34,7 @@ spec: resources: {{- toYaml .Values.resources | nindent 12 }} image: {{ .Values.image.registry }}/{{ .Values.image.repository }}:{{ .Values.image.tag }} - imagePullPolicy: Always + imagePullPolicy: {{ .Values.image.pullPolicy }} ports: {{- if .Values.exporter.enabled }} - name: exporter @@ -106,28 +106,15 @@ spec: {{- if .Values.extraMounts }} {{- toYaml .Values.extraMounts | nindent 12 }} {{- end }} - {{- if or .Values.exporter.enabled (eq .Values.input.type "http_input") }} - {{- if eq .Values.input.type "http_input" }} + {{- if .Values.exporter.enabled }} readinessProbe: httpGet: path: /health - port: {{ .Values.input.uvicorn_config.port }} - initialDelaySeconds: 5 - timeoutSeconds: 10 - periodSeconds: 5 - failureThreshold: 3 - {{- else }} - readinessProbe: - httpGet: - path: /metrics port: {{ .Values.exporter.port }} initialDelaySeconds: 5 timeoutSeconds: 10 periodSeconds: 5 failureThreshold: 3 - {{- end }} - {{- end }} - {{- if .Values.exporter.enabled }} startupProbe: httpGet: path: /metrics diff --git a/charts/logprep/values.yaml b/charts/logprep/values.yaml index 011405993..1d2638a62 100644 --- a/charts/logprep/values.yaml +++ b/charts/logprep/values.yaml @@ -7,6 +7,7 @@ image: registry: ghcr.io repository: fkie-cad/logprep tag: py3.11-stable + pullPolicy: Always # The pod resources resources: diff --git a/doc/source/user_manual/configuration/getter.rst b/doc/source/configuration/getter.rst similarity index 100% rename from doc/source/user_manual/configuration/getter.rst rename to doc/source/configuration/getter.rst diff --git a/doc/source/user_manual/configuration/index.rst b/doc/source/configuration/index.rst similarity index 100% rename from doc/source/user_manual/configuration/index.rst 
rename to doc/source/configuration/index.rst diff --git a/doc/source/user_manual/configuration/input.rst b/doc/source/configuration/input.rst similarity index 100% rename from doc/source/user_manual/configuration/input.rst rename to doc/source/configuration/input.rst diff --git a/doc/source/user_manual/configuration/metrics.rst b/doc/source/configuration/metrics.rst similarity index 100% rename from doc/source/user_manual/configuration/metrics.rst rename to doc/source/configuration/metrics.rst diff --git a/doc/source/user_manual/configuration/output.rst b/doc/source/configuration/output.rst similarity index 100% rename from doc/source/user_manual/configuration/output.rst rename to doc/source/configuration/output.rst diff --git a/doc/source/user_manual/configuration/processor.rst b/doc/source/configuration/processor.rst similarity index 100% rename from doc/source/user_manual/configuration/processor.rst rename to doc/source/configuration/processor.rst diff --git a/doc/source/user_manual/configuration/rules.rst b/doc/source/configuration/rules.rst similarity index 100% rename from doc/source/user_manual/configuration/rules.rst rename to doc/source/configuration/rules.rst diff --git a/doc/source/examples/minikube.rst b/doc/source/examples/minikube.rst index ec9bb7e95..0c0b385ac 100644 --- a/doc/source/examples/minikube.rst +++ b/doc/source/examples/minikube.rst @@ -15,25 +15,19 @@ with the following commands: .. code-block:: bash :caption: Install package prerequisites - sudo apt-get install -y \ - apt-transport-https \ - ca-certificates \ - curl \ - software-properties-common + sudo apt-get install -y apt-transport-https ca-certificates curl software-properties-common .. 
code-block:: bash :caption: Install minikube - sudo curl -Lo /usr/local/bin/minikube \ - https://storage.googleapis.com/minikube/releases/latest/minikube-linux-amd64 + sudo curl -Lo /usr/local/bin/minikube https://storage.googleapis.com/minikube/releases/latest/minikube-linux-amd64 sudo chmod +x /usr/local/bin/minikube .. code-block:: bash :caption: Install kubectl - sudo curl -Lo /usr/local/bin/kubectl \ - "https://dl.k8s.io/release/$(curl -L -s https://dl.k8s.io/release/stable.txt)/bin/linux/amd64/kubectl" + sudo curl -Lo /usr/local/bin/kubectl "https://dl.k8s.io/release/$(curl -L -s https://dl.k8s.io/release/stable.txt)/bin/linux/amd64/kubectl" sudo chmod +x /usr/local/bin/kubectl @@ -56,8 +50,8 @@ with the following commands: minikube config set driver docker minikube config set cpus 16 minikube config set memory 16GB - minikube addons enable ingress minikube start + minikube addons enable ingress Deploy the example ------------------ @@ -125,4 +119,36 @@ Test the opensiem connector: } 2024-07-17 11:15:35 301643 Generator INFO : Execution time: 0.067013 seconds -open your browser and go to `http://dashboards.opensiem`_ to see the generated data in the opensearch dashboards. +open your browser and go to `opensearch dashboard <http://dashboards.opensiem>`_ to see the generated data in the opensearch dashboards. + + +Use local container images +-------------------------- + +If you want to use local logprep container images, you can build the images with the following commands: + +.. code-block:: bash + :caption: switch docker context to minikube in bash + + eval $(minikube docker-env) + +for powershell: + +.. code-block:: powershell + :caption: switch docker context to minikube in powershell + + (minikube docker-env).replace("export ", '$env:') | out-string | Invoke-Expression + +Then build the logprep image with the following command: + +.. 
code-block:: bash + :caption: build this image using the Dockerfile in the root of the repository + + docker buildx build -t local/logprep:latest --build-arg PYTHON_VERSION=3.11 --build-arg LOGPREP_VERSION=dev . + +Then install the opensiem example using the local logprep image: + +.. code-block:: bash + :caption: use the local values file to deploy the opensiem example + + helm install opensiem examples/k8s --values examples/k8s/values-dev.yaml diff --git a/doc/source/index.rst b/doc/source/index.rst index e7c94d165..a057c6fa8 100644 --- a/doc/source/index.rst +++ b/doc/source/index.rst @@ -1,19 +1,27 @@ -======================================== -Welcome to the Documentation of Logprep! -======================================== +====================================== +Logprep: The swiss army knife for logs +====================================== + +This is the documentation for Logprep. The swiss army knife for logs. +It provides tools for: + +* **collection** of logs from various sources +* **normalization** via different processors +* **shipping** to different datalake targets +* **generation** of events for load testing +* **pseudonymization** and **depseudonymization** of fields in log data to comply with GDPR + +and it is written in **Python**! .. toctree:: - :maxdepth: 2 - :caption: Content: + :maxdepth: 3 installation user_manual/index + configuration/index development/index examples/index -================== -Indices and Tables -================== * :ref:`genindex` * :ref:`modindex` \ No newline at end of file diff --git a/doc/source/user_manual/execution.rst b/doc/source/user_manual/execution.rst index ee0a27b40..64915eac2 100644 --- a/doc/source/user_manual/execution.rst +++ b/doc/source/user_manual/execution.rst @@ -195,3 +195,24 @@ Exit Codes :undoc-members: :inherited-members: :noindex: + + +Healthchecks +------------ + +Logprep provides a health endpoint which can be used to check the health of all components. 
+The asgi app for the healthcheck endpoint is implemented in :code:`logprep.metrics.exporter.make_patched_asgi_app` and +will be recreated on every restart of logprep (e.g. after a configuration change) or on creation of the first pipeline process. +The healthcheck endpoint is available at :code:`/health` if metrics are enabled and can be accessed via HTTP GET. + +* On success, the healthcheck endpoint will return a :code:`200` status code and a payload :code:`OK`. +* On failure, the healthcheck endpoint will return a :code:`503` status code and a payload :code:`FAIL`. + +Healthchecks are implemented in components via the :code:`health()` method. You have to make sure to call +the :code:`super().health()` method in newly implemented health checks. +The health is checked for the first time after the first pipeline process is started and then every 5 seconds. +You can configure the healthcheck timeout on component level with the parameter :code:`health_timeout`. +The default value is 1 second. + +Healthchecks are used in the provided helm charts as default for readiness probes. 
+ diff --git a/doc/source/user_manual/index.rst b/doc/source/user_manual/index.rst index b8da91383..1b8cf9bae 100644 --- a/doc/source/user_manual/index.rst +++ b/doc/source/user_manual/index.rst @@ -9,5 +9,4 @@ User Manual execution verification testing_rules - configuration/index security_best_practices diff --git a/examples/exampledata/config/pipeline.yml b/examples/exampledata/config/pipeline.yml index b45ff72f2..b9e209c99 100644 --- a/examples/exampledata/config/pipeline.yml +++ b/examples/exampledata/config/pipeline.yml @@ -114,13 +114,13 @@ output: - 127.0.0.1:9200 default_index: processed error_index: errors - message_backlog_size: 10000 + message_backlog_size: 2500 timeout: 10000 flush_timeout: 60 max_retries: 3 - parallel_bulk: false user: admin secret: admin + desired_cluster_status: ["green", "yellow"] kafka: type: confluentkafka_output default: false diff --git a/examples/k8s/values-dev.yaml b/examples/k8s/values-dev.yaml new file mode 100644 index 000000000..1c4cb7247 --- /dev/null +++ b/examples/k8s/values-dev.yaml @@ -0,0 +1,126 @@ +connector: + image: + # point your docker context to minikube and build the image + # `eval $(minikube docker-env)` or `(minikube docker-env).replace("export ", '$env:') | out-string | Invoke-Expression` + # build this image using the Dockerfile in the root of the repository + # `docker buildx build -t local/logprep:latest --build-arg PYTHON_VERSION=3.11 --build-arg LOGPREP_VERSION=dev .` + registry: local + repository: logprep + tag: latest + pullPolicy: IfNotPresent + replicas: 1 + secrets: {} + logger: + level: INFO + input: + type: http_input + message_backlog_size: 150000 + collect_meta: True + metafield_name: "@metadata" + uvicorn_config: + host: 0.0.0.0 + port: 9000 + workers: 1 + access_log: true + server_header: false + date_header: false + ws: none + interface: asgi3 + backlog: 16384 + timeout_keep_alive: 65 + endpoints: + /auth-json: json + /json: json + /lab/123/(ABC|DEF)/pl.*: plaintext + 
/lab/123/ABC/auditlog: jsonl + /health: plaintext + output: + type: confluentkafka_output + topic: consumer + error_topic: errors + flush_timeout: 300 + send_timeout: 0 + kafka_config: + bootstrap.servers: opensiem-kafka:9092 + compression.type: none + statistics.interval.ms: "60000" + queue.buffering.max.messages: "100000000" + queue.buffering.max.kbytes: "1048576" + queue.buffering.max.ms: "10000" + batch.size: "1000000" + request.required.acks: "-1" + ingress: + enabled: true + +## for additional configurations see: `https://github.com/bitnami/charts/blob/main/bitnami/kafka/values.yaml` +kafka: + listeners: + client: + protocol: PLAINTEXT + controller: + replicaCount: 3 + metrics: + jmx: + enabled: true + provisioning: + enabled: true + replicationFactor: 1 + numPartitions: 10 + topics: + - name: consumer + +logprep: + image: + registry: local + repository: logprep + tag: latest + pullPolicy: IfNotPresent + logger: + level: INFO + input: + type: confluentkafka_input + topic: consumer + kafka_config: + bootstrap.servers: opensiem-kafka:9092 + group.id: cgroup3 + enable.auto.commit: "true" + auto.commit.interval.ms: "10000" + enable.auto.offset.store: "false" + queued.min.messages: "100000" + queued.max.messages.kbytes: "65536" + statistics.interval.ms: "60000" + preprocessing: + version_info_target_field: Logprep_version_info + log_arrival_time_target_field: event.ingested + hmac: + target: + key: "thisisasecureandrandomkey" + output_field: Full_event + output: + type: opensearch_output + hosts: + - opensiem-opensearch:9200 + default_index: processed + error_index: errors + timeout: 10000 + message_backlog_size: 2500 + parallel_bulk: true + flush_timeout: 60 + max_retries: 3 + chunk_size: 500 + thread_count: 5 + user: admin + secret: admin + desired_cluster_status: ["green", "yellow"] +## for additional configurations see: `https://github.com/bitnami/charts/blob/main/bitnami/opensearch/values.yaml` +opensearch: + dashboards: + enabled: true + ingest: + 
replicaCount: 1 + master: + replicaCount: 1 + data: + replicaCount: 1 + coordinating: + replicaCount: 1 diff --git a/logprep/abc/component.py b/logprep/abc/component.py index 27e4097c3..adb3a72a7 100644 --- a/logprep/abc/component.py +++ b/logprep/abc/component.py @@ -2,6 +2,9 @@ import functools import inspect +import logging +import sys +import time from abc import ABC from functools import cached_property from typing import Callable @@ -12,8 +15,11 @@ from schedule import Scheduler from logprep.metrics.metrics import Metric +from logprep.util.defaults import DEFAULT_HEALTH_TIMEOUT, EXITCODES from logprep.util.helper import camel_to_snake +logger = logging.getLogger("Component") + class Component(ABC): """Abstract Component Class to define the Interface""" @@ -28,6 +34,13 @@ class Config: type: str = field(validator=validators.instance_of(str)) """Type of the component""" + health_timeout: float = field( + validator=validators.instance_of(float), + default=DEFAULT_HEALTH_TIMEOUT, + converter=float, + ) + """Timeout in seconds for health check: Default is 1 seconds""" + @define(kw_only=True) class Metrics: """Base Metric class to track and expose statistics about logprep""" @@ -86,6 +99,25 @@ def describe(self) -> str: def setup(self): """Set the component up.""" self._populate_cached_properties() + if not "http" in self._config.type: + # HTTP input connector spins up an http server + # only on the first pipeline process + # but this runs on all pipeline processes which leads to never + # completing the setup phase + self._wait_for_health() + + def _wait_for_health(self) -> None: + """Wait for the component to be healthy. + if the component is not healthy after a period of time, the process will exit. 
+ """ + for i in range(3): + if self.health(): + break + logger.info("Wait for %s initially becoming healthy: %s/3", self.name, i + 1) + time.sleep(1 + i) + else: + logger.error("Component '%s' did not become healthy", self.name) + sys.exit(EXITCODES.PIPELINE_ERROR.value) def _populate_cached_properties(self): _ = [ @@ -103,10 +135,22 @@ def shut_down(self): if hasattr(self, "__dict__"): self.__dict__.clear() + def health(self) -> bool: + """Check the health of the component. + + Returns + ------- + bool + True if the component is healthy, False otherwise. + + """ + logger.debug("Checking health of %s", self.name) + return True + def _schedule_task( self, task: Callable, seconds: int, args: tuple = None, kwargs: dict = None ) -> None: - """Schedule a task to run periodicly during pipeline run. + """Schedule a task to run periodically during pipeline run. The task is run in :code:`pipeline.py` in the :code:`process_pipeline` method. Parameters diff --git a/logprep/abc/connector.py b/logprep/abc/connector.py index cf89fa0bb..28e03feec 100644 --- a/logprep/abc/connector.py +++ b/logprep/abc/connector.py @@ -15,11 +15,11 @@ class Metrics(Component.Metrics): number_of_processed_events: CounterMetric = field( factory=lambda: CounterMetric( - description="Number of successfull events", + description="Number of successful events", name="number_of_processed_events", ) ) - """Number of successfull events""" + """Number of successful events""" number_of_failed_events: CounterMetric = field( factory=lambda: CounterMetric( diff --git a/logprep/connector/confluent_kafka/input.py b/logprep/connector/confluent_kafka/input.py index 9bd5e144a..0816ce08e 100644 --- a/logprep/connector/confluent_kafka/input.py +++ b/logprep/connector/confluent_kafka/input.py @@ -31,6 +31,7 @@ import logging from functools import cached_property, partial from socket import getfqdn +from types import MappingProxyType from typing import Callable, Optional, Tuple, Union import msgspec @@ -211,15 +212,16 
@@ class Config(Input.Config): topic: str = field(validator=validators.instance_of(str)) """The topic from which new log messages will be fetched.""" - kafka_config: Optional[dict] = field( + kafka_config: Optional[MappingProxyType] = field( validator=[ - validators.instance_of(dict), + validators.instance_of(MappingProxyType), validators.deep_mapping( key_validator=validators.instance_of(str), value_validator=validators.instance_of(str), ), partial(keys_in_validator, expected_keys=["bootstrap.servers", "group.id"]), - ] + ], + converter=MappingProxyType, ) """ Kafka configuration for the kafka client. At minimum the following keys must be set: @@ -504,13 +506,31 @@ def _lost_callback(self, consumer, topic_partitions): topic_partition.offset = OFFSET_STORED self._consumer.assign(topic_partitions) - def setup(self) -> None: - try: - super().setup() - except (KafkaException, ValueError) as error: - raise FatalInputError(self, str(error)) from error - def shut_down(self) -> None: """Close consumer, which also commits kafka offsets.""" self._consumer.close() super().shut_down() + + def health(self) -> bool: + """Check the health of the component. + + Returns + ------- + bool + True if the component is healthy, False otherwise. 
+ """ + + try: + self._consumer.list_topics(timeout=self._config.health_timeout) + except KafkaException as error: + logger.error("Health check failed: %s", error) + self.metrics.number_of_errors += 1 + return False + return super().health() + + def setup(self) -> None: + """Set the component up.""" + try: + super().setup() + except KafkaException as error: + raise FatalInputError(self, f"Could not setup kafka consumer: {error}") from error diff --git a/logprep/connector/confluent_kafka/output.py b/logprep/connector/confluent_kafka/output.py index 96a1b6028..99da911fa 100644 --- a/logprep/connector/confluent_kafka/output.py +++ b/logprep/connector/confluent_kafka/output.py @@ -29,6 +29,7 @@ from datetime import datetime from functools import cached_property, partial from socket import getfqdn +from types import MappingProxyType from typing import Optional from attrs import define, field, validators @@ -151,16 +152,17 @@ class Config(Output.Config): """The topic into which events should be written that couldn't be processed successfully.""" flush_timeout: float send_timeout: int = field(validator=validators.instance_of(int), default=0) - kafka_config: Optional[dict] = field( + kafka_config: Optional[MappingProxyType] = field( validator=[ - validators.instance_of(dict), + validators.instance_of(MappingProxyType), validators.deep_mapping( key_validator=validators.instance_of(str), value_validator=validators.instance_of((str, dict)), ), partial(keys_in_validator, expected_keys=["bootstrap.servers"]), ], - factory=dict, + factory=MappingProxyType, + converter=MappingProxyType, ) """ Kafka configuration for the kafka client. 
At minimum the following keys must be set: @@ -193,7 +195,7 @@ def _kafka_config(self) -> dict: return DEFAULTS | self._config.kafka_config | injected_config @cached_property - def _producer(self): + def _producer(self) -> Producer: return Producer(self._kafka_config) def _error_callback(self, error: KafkaException): @@ -324,13 +326,24 @@ def store_failed( # block program until buffer is empty self._producer.flush(timeout=self._config.flush_timeout) - def setup(self): - try: - super().setup() - except (KafkaException, ValueError) as error: - raise FatalOutputError(self, str(error)) from error - def shut_down(self) -> None: """ensures that all messages are flushed""" if self._producer is not None: self._producer.flush(self._config.flush_timeout) + + def health(self) -> bool: + """Check the health of kafka producer.""" + try: + self._producer.list_topics(timeout=self._config.health_timeout) + except KafkaException as error: + logger.error("Health check failed: %s", error) + self.metrics.number_of_errors += 1 + return False + return super().health() + + def setup(self) -> None: + """Set the component up.""" + try: + super().setup() + except KafkaException as error: + raise FatalOutputError(self, f"Could not setup kafka producer: {error}") from error diff --git a/logprep/connector/http/input.py b/logprep/connector/http/input.py index 1317e796a..87b4bd86e 100644 --- a/logprep/connector/http/input.py +++ b/logprep/connector/http/input.py @@ -84,10 +84,13 @@ import zlib from abc import ABC from base64 import b64encode -from typing import Callable, Mapping, Tuple, Union +from functools import cached_property +from typing import Callable, List, Mapping, Tuple, Union import falcon.asgi import msgspec +import requests +import rstr from attrs import define, field, validators from falcon import ( # pylint: disable=no-name-in-module HTTP_200, @@ -138,7 +141,7 @@ async def func_wrapper(*args, **kwargs): resp = args[2] resp.status = HTTP_200 if endpoint.messages.full(): - raise 
HTTPTooManyRequests(description="Logprep Message Queue is full.") + raise queue.Full() return else: raise HTTPMethodNotAllowed(["POST"]) @@ -314,11 +317,11 @@ class Metrics(Input.Metrics): number_of_http_requests: CounterMetric = field( factory=lambda: CounterMetric( - description="Number of incomming requests", + description="Number of incoming requests", name="number_of_http_requests", ) ) - """Number of incomming requests""" + """Number of incoming requests""" message_backlog_size: GaugeMetric = field( factory=lambda: GaugeMetric( @@ -351,7 +354,7 @@ class Config(Input.Config): :location: uvicorn_config :suggested-value: uvicorn_config.access_log: true, uvicorn_config.server_header: false, uvicorn_config.data_header: false - Additionaly to the below it is recommended to configure `ssl on the metrics server endpoint + Additionally to the below it is recommended to configure `ssl on the metrics server endpoint `_ .. code-block:: yaml @@ -374,7 +377,7 @@ class Config(Input.Config): ) """Configure endpoint routes with a Mapping of a path to an endpoint. Possible endpoints are: :code:`json`, :code:`jsonl`, :code:`plaintext`. It's possible to use wildcards and - regexes for pattern matching. + regexps for pattern matching. .. autoclass:: logprep.connector.http.input.PlaintextHttpEndpoint @@ -490,3 +493,35 @@ def shut_down(self): if self.http_server is None: return self.http_server.shut_down() + + @cached_property + def health_endpoints(self) -> List[str]: + """Returns a list of endpoints for internal healthcheck + the endpoints are examples to match against the configured regex enabled + endpoints. 
The endpoints are normalized to match the regex patterns and this ensures that the endpoints should not be too long + """ + normalized_endpoints = (endpoint.replace(".*", "b") for endpoint in self._config.endpoints) + normalized_endpoints = (endpoint.replace(".+", "b") for endpoint in normalized_endpoints) + normalized_endpoints = (endpoint.replace("+", "{5}") for endpoint in normalized_endpoints) + normalized_endpoints = (endpoint.replace("*", "{5}") for endpoint in normalized_endpoints) + return [rstr.xeger(endpoint) for endpoint in normalized_endpoints] + + def health(self) -> bool: + """Health check for the HTTP Input Connector + + Returns + ------- + bool + :code:`True` if all endpoints can be called without error + """ + for endpoint in self.health_endpoints: + try: + requests.get( + f"{self.target}{endpoint}", timeout=self._config.health_timeout + ).raise_for_status() + except (requests.exceptions.RequestException, requests.exceptions.Timeout) as error: + logger.error("Health check failed for endpoint: %s due to %s", endpoint, str(error)) + self.metrics.number_of_errors += 1 + return False + + return super().health() diff --git a/logprep/connector/opensearch/output.py b/logprep/connector/opensearch/output.py index effe3e1ea..dd65b242f 100644 --- a/logprep/connector/opensearch/output.py +++ b/logprep/connector/opensearch/output.py @@ -69,8 +69,8 @@ def dumps(self, data): except (ValueError, TypeError) as e: raise search.exceptions.SerializationError(data, e) - def loads(self, data): - return self._decoder.decode(data) + def loads(self, s): + return self._decoder.decode(s) class OpensearchOutput(Output): @@ -144,6 +144,10 @@ class Config(Output.Config): default=500, validator=[validators.instance_of(int), validators.gt(1)] ) """Chunk size to use for bulk requests.""" + desired_cluster_status: list = field( + default=["green"], validator=validators.instance_of(list) + ) + """Desired cluster status for health check as list of strings. 
Default is ["green"]""" __slots__ = ["_message_backlog", "_size_error_pattern"] @@ -219,10 +223,6 @@ def setup(self): super().setup() flush_timeout = self._config.flush_timeout self._schedule_task(task=self._write_backlog, seconds=flush_timeout) - try: - self._search_context.info() - except (OpenSearchException, TimeoutError) as error: - raise FatalOutputError(self, error) from error def describe(self) -> str: """Get name of Opensearch endpoint with the host. @@ -554,3 +554,15 @@ def _split_message_backlog_by_size_limit(self): else: messages_over_size_limit.append((message, message_size)) return messages_under_size_limit, messages_over_size_limit + + def health(self) -> bool: + """Check the health of the component.""" + try: + resp = self._search_context.cluster.health( + params={"timeout": self._config.health_timeout} + ) + except (OpenSearchException, ConnectionError) as error: + logger.error("Health check failed: %s", error) + self.metrics.number_of_errors += 1 + return False + return super().health() and resp.get("status") in self._config.desired_cluster_status diff --git a/logprep/framework/pipeline.py b/logprep/framework/pipeline.py index 06d1894c4..c20a211d0 100644 --- a/logprep/framework/pipeline.py +++ b/logprep/framework/pipeline.py @@ -217,15 +217,13 @@ def _setup(self): output.input_connector = self._input if output.default: self._input.output_connector = output + output.setup() self.logger.debug( f"Created connectors -> input: '{self._input.describe()}'," f" output -> '{[output.describe() for _, output in self._output.items()]}'" ) self._input.pipeline_index = self.pipeline_index self._input.setup() - for _, output in self._output.items(): - output.setup() - self.logger.debug("Finished creating connectors") self.logger.info("Start building pipeline") _ = self._pipeline @@ -341,3 +339,16 @@ def stop(self) -> None: self.logger.debug(f"Stopping pipeline ({self._process_name})") with self._continue_iterating.get_lock(): self._continue_iterating.value = 
False + + def get_health_functions(self) -> Tuple[bool]: + """Return health function of components""" + output_health_functions = [] + if self._output: + output_health_functions = [output.health for output in self._output.values()] + return tuple( + itertools.chain( + [self._input.health], + [processor.health for processor in self._pipeline], + output_health_functions, + ) + ) diff --git a/logprep/framework/pipeline_manager.py b/logprep/framework/pipeline_manager.py index 0c27d7348..5889c87b0 100644 --- a/logprep/framework/pipeline_manager.py +++ b/logprep/framework/pipeline_manager.py @@ -8,6 +8,7 @@ import multiprocessing.managers import multiprocessing.queues import random +import signal import time from attr import define, field @@ -182,23 +183,24 @@ def stop(self): """Stop processing any pipelines by reducing the pipeline count to zero.""" self._decrease_to_count(0) if self.prometheus_exporter: + self.prometheus_exporter.server.server.handle_exit(signal.SIGTERM, None) self.prometheus_exporter.cleanup_prometheus_multiprocess_dir() - def restart(self): + def restart(self, daemon=True): """Restarts all pipelines""" + if self.prometheus_exporter: + self.prometheus_exporter.run(daemon=daemon) self.set_count(0) self.set_count(self._configuration.process_count) - if not self.prometheus_exporter: - return - if not self.prometheus_exporter.is_running: - self.prometheus_exporter.run() def _create_pipeline(self, index) -> multiprocessing.Process: pipeline = Pipeline(pipeline_index=index, config=self._configuration) - logger.info("Created new pipeline") + if pipeline.pipeline_index == 1 and self.prometheus_exporter: + self.prometheus_exporter.update_healthchecks(pipeline.get_health_functions()) process = multiprocessing.Process( target=pipeline.run, daemon=True, name=f"Pipeline-{index}" ) process.stop = pipeline.stop process.start() + logger.info("Created new pipeline") return process diff --git a/logprep/metrics/exporter.py b/logprep/metrics/exporter.py index 
69fa8a08f..1cdaa7c71 100644 --- a/logprep/metrics/exporter.py +++ b/logprep/metrics/exporter.py @@ -3,37 +3,65 @@ import os import shutil from logging import getLogger +from typing import Callable, Iterable from prometheus_client import REGISTRY, make_asgi_app, multiprocess from logprep.util import http from logprep.util.configuration import MetricsConfig +from logprep.util.defaults import DEFAULT_HEALTH_STATE + +logger = getLogger("Exporter") + + +def make_patched_asgi_app(functions: Iterable[Callable] | None) -> Callable: + """Creates an ASGI app that includes health check and metrics handling""" + + prometheus_app = make_asgi_app(REGISTRY) + + functions = functions if functions else [lambda: DEFAULT_HEALTH_STATE] + + response_start_ok = {"type": "http.response.start", "status": 200} + response_body_ok = {"type": "http.response.body", "body": b"OK"} + response_start_fail = {"type": "http.response.start", "status": 503} + response_body_fail = {"type": "http.response.body", "body": b"FAIL"} + + async def asgi_app(scope, receive, send): + """asgi app with health check and metrics handling""" + if scope["type"] == "http" and scope["path"] == "/health": + success = all(f() for f in functions) + await send(response_start_ok if success else response_start_fail) + await send(response_body_ok if success else response_body_fail) + else: + await prometheus_app(scope, receive, send) + + return asgi_app class PrometheusExporter: """Used to control the prometheus exporter and to manage the metrics""" + @property + def is_running(self) -> bool: + """Returns whether the exporter is running""" + return self.server and self.server.thread and self.server.thread.is_alive() + def __init__(self, configuration: MetricsConfig): - self.is_running = False - logger_name = "Exporter" - self._logger = getLogger(logger_name) - self._logger.debug("Initializing Prometheus Exporter") + logger.debug("Initializing Prometheus Exporter") self.configuration = configuration - self._port = 
configuration.port - self._app = make_asgi_app(REGISTRY) - self._server = http.ThreadingHTTPServer( - configuration.uvicorn_config | {"port": self._port, "host": "0.0.0.0"}, - self._app, - daemon=True, - logger_name=logger_name, - ) + self.server = None + self.healthcheck_functions = None + self._multiprocessing_prepared = False def _prepare_multiprocessing(self): """ Sets up the proper metric registry for multiprocessing and handles the necessary temporary multiprocessing directory that the prometheus client expects. """ + if self._multiprocessing_prepared: + return multiprocess.MultiProcessCollector(REGISTRY) + self._multiprocessing_prepared = True def cleanup_prometheus_multiprocess_dir(self): """removes the prometheus multiprocessing directory""" @@ -45,7 +73,7 @@ def cleanup_prometheus_multiprocess_dir(self): os.remove(os.path.join(root, file)) for directory in dirs: shutil.rmtree(os.path.join(root, directory), ignore_errors=True) - self._logger.info("Cleaned up %s", multiprocess_dir) + logger.info("Cleaned up %s", multiprocess_dir) def mark_process_dead(self, pid): """ @@ -59,9 +87,35 @@ def mark_process_dead(self, pid): """ multiprocess.mark_process_dead(pid) - def run(self): + def run(self, daemon=True): """Starts the default prometheus http endpoint""" + if self.is_running: + return + port = self.configuration.port + self.init_server(daemon=daemon) self._prepare_multiprocessing() - self._server.start() - self._logger.info("Prometheus Exporter started on port %s", self._port) - self.is_running = True + self.server.start() + logger.info("Prometheus Exporter started on port %s", port) + + def init_server(self, daemon=True) -> None: + """Initializes the server""" + port = self.configuration.port + self.server = http.ThreadingHTTPServer( + self.configuration.uvicorn_config | {"port": port, "host": "0.0.0.0"}, + make_patched_asgi_app(self.healthcheck_functions), + daemon=daemon, + logger_name="Exporter", + ) + + def restart(self): + """Restarts the exporter""" + 
if self.server and self.server.thread and self.server.thread.is_alive(): + self.server.shut_down() + self.run() + + def update_healthchecks(self, healthcheck_functions: Iterable[Callable], daemon=True) -> None: + """Updates the healthcheck functions""" + self.healthcheck_functions = healthcheck_functions + self.server.shut_down() + self.init_server(daemon=daemon) + self.run() diff --git a/logprep/processor/amides/processor.py b/logprep/processor/amides/processor.py index c059d98ee..5a257091a 100644 --- a/logprep/processor/amides/processor.py +++ b/logprep/processor/amides/processor.py @@ -9,7 +9,7 @@ SIEM rule was tried to be evaded. An overview of AMIDES is depicted in the figure below. -.. figure:: ../../_images/amides.svg +.. figure:: ../_images/amides.svg :align: center Overview of the AMIDES architecture. diff --git a/logprep/processor/labeler/rule.py b/logprep/processor/labeler/rule.py index 910cc3265..804e1f109 100644 --- a/logprep/processor/labeler/rule.py +++ b/logprep/processor/labeler/rule.py @@ -27,6 +27,8 @@ :noindex: """ +from typing import Iterable + from attrs import define, field, validators from logprep.processor.field_manager.rule import FieldManagerRule @@ -45,7 +47,7 @@ class Config(FieldManagerRule.Config): validators.instance_of(dict), validators.deep_mapping( key_validator=validators.instance_of(str), - value_validator=validators.instance_of(list), + value_validator=validators.instance_of(Iterable), ), ] ) diff --git a/logprep/processor/pseudonymizer/rule.py b/logprep/processor/pseudonymizer/rule.py index 2aa1b096b..947951d18 100644 --- a/logprep/processor/pseudonymizer/rule.py +++ b/logprep/processor/pseudonymizer/rule.py @@ -63,7 +63,7 @@ class Config(FieldManagerRule.Config): validator=[ validators.deep_mapping( key_validator=validators.instance_of(str), - value_validator=validators.instance_of(str), + value_validator=validators.instance_of((str, re.Pattern)), ), validators.min_len(1), ] diff --git a/logprep/util/credentials.py 
b/logprep/util/credentials.py index 54ce955b7..0ce425d17 100644 --- a/logprep/util/credentials.py +++ b/logprep/util/credentials.py @@ -91,7 +91,7 @@ Authentication Process: ^^^^^^^^^^^^^^^^^^^^^^^ -.. figure:: ../../_images/Credentials.svg +.. figure:: ../_images/Credentials.svg :align: left """ diff --git a/logprep/util/defaults.py b/logprep/util/defaults.py index 33dbb5b29..0d87c687b 100644 --- a/logprep/util/defaults.py +++ b/logprep/util/defaults.py @@ -56,3 +56,7 @@ class EXITCODES(Enum): "disable_existing_loggers": False, } ENV_NAME_LOGPREP_CREDENTIALS_FILE = "LOGPREP_CREDENTIALS_FILE" + +DEFAULT_HEALTH_STATE = False # unhealthy + +DEFAULT_HEALTH_TIMEOUT = 1 # seconds diff --git a/logprep/util/http.py b/logprep/util/http.py index a96a3bbc6..22afe9328 100644 --- a/logprep/util/http.py +++ b/logprep/util/http.py @@ -5,6 +5,7 @@ import logging import os import threading +import time import uvicorn @@ -54,6 +55,7 @@ def __init__( if ( hasattr(self, "thread") + and self.thread is not None and self.thread.is_alive() # pylint: disable=access-member-before-definition ): self.shut_down() @@ -69,28 +71,42 @@ def __init__( logprep_log_config = json.loads( os.environ.get("LOGPREP_LOG_CONFIG", json.dumps(DEFAULT_LOG_CONFIG)) ) - uvicorn_config = uvicorn.Config(**uvicorn_config, app=app, log_config=logprep_log_config) + self.uvicorn_config = uvicorn.Config( + **uvicorn_config, app=app, log_config=logprep_log_config + ) logging.getLogger("uvicorn.access").name = self._logger_name logging.getLogger("uvicorn.error").name = self._logger_name - self.server = uvicorn.Server(uvicorn_config) - self.thread = threading.Thread(daemon=daemon, target=self.server.run) + self.server = None + self.thread = None + self.daemon = daemon def start(self): """Collect all configs, initiate application server and webserver and run thread with uvicorn+falcon http server and wait until it is up (started)""" - + self.server = uvicorn.Server(self.uvicorn_config) + self.thread = 
threading.Thread(daemon=self.daemon, target=self.server.run) self.thread.start() while not self.server.started: continue - def shut_down(self): + def shut_down(self, wait: float = 1) -> None: """Stop thread with uvicorn+falcon http server, wait for uvicorn to exit gracefully and join the thread""" - if not self.thread.is_alive(): + if self.thread is None or self.server is None: return self.server.should_exit = True - while self.thread.is_alive(): + while 1: self._logger.debug("Wait for server to exit gracefully...") - continue + if not self.thread.is_alive(): + time.sleep(wait) + if not self.thread.is_alive(): # we have to double check if it is really dead + break + time.sleep(wait) self.thread.join() + + def restart(self, wait: float = 1) -> None: + """Restart the server by shutting down the existing server and + starting a new one""" + self.shut_down(wait=wait) + self.start() diff --git a/pyproject.toml b/pyproject.toml index c1f8799fe..f15f330b4 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -97,6 +97,7 @@ dependencies = [ "falcon==3.1.3", "uvloop", "httptools", + "rstr", ] [project.optional-dependencies] diff --git a/tests/acceptance/test_amides.py b/tests/acceptance/test_amides.py index 14ababacf..57ad2a4a0 100644 --- a/tests/acceptance/test_amides.py +++ b/tests/acceptance/test_amides.py @@ -20,7 +20,7 @@ def config(): config_dict = { "process_count": 1, "timeout": 0.1, - "profile_pipelines": True, + "profile_pipelines": False, "pipeline": [ { "amides": { diff --git a/tests/unit/charts/test_exporter_config.py b/tests/unit/charts/test_exporter_config.py index 6d454e78e..476dfa09b 100644 --- a/tests/unit/charts/test_exporter_config.py +++ b/tests/unit/charts/test_exporter_config.py @@ -210,7 +210,7 @@ def test_probe_ports_are_populated_from_values(self): assert liveness_probe["httpGet"]["path"] == "/metrics" readiness_probe = self.deployment["spec.template.spec.containers.0.readinessProbe"] assert readiness_probe["httpGet"]["port"] == 1337 - assert 
readiness_probe["httpGet"]["path"] == "/metrics" + assert readiness_probe["httpGet"]["path"] == "/health" startup_probe = self.deployment["spec.template.spec.containers.0.startupProbe"] assert startup_probe["httpGet"]["port"] == 1337 assert startup_probe["httpGet"]["path"] == "/metrics" diff --git a/tests/unit/charts/test_input_config.py b/tests/unit/charts/test_input_config.py index f5e60556b..9e30a3770 100644 --- a/tests/unit/charts/test_input_config.py +++ b/tests/unit/charts/test_input_config.py @@ -97,13 +97,3 @@ def test_http_input_config_sets_deployment_port(self): break else: assert False, "http-input port not found" - - def test_probe_ports_are_populated_from_values(self): - self.manifests = self.render_chart("logprep", {"input": http_input_config}) - liveness_probe = self.deployment["spec.template.spec.containers.0.livenessProbe"] - assert liveness_probe["httpGet"]["port"] == 8000 - readiness_probe = self.deployment["spec.template.spec.containers.0.readinessProbe"] - assert readiness_probe["httpGet"]["port"] == 9999 - assert readiness_probe["httpGet"]["path"] == "/health" - startup_probe = self.deployment["spec.template.spec.containers.0.startupProbe"] - assert startup_probe["httpGet"]["port"] == 8000 diff --git a/tests/unit/component/base.py b/tests/unit/component/base.py index a09acae1d..c6cbe87fa 100644 --- a/tests/unit/component/base.py +++ b/tests/unit/component/base.py @@ -34,6 +34,7 @@ class BaseComponentTestCase(ABC): def setup_method(self) -> None: config = {"Test Instance Name": self.CONFIG} self.object = Factory.create(configuration=config) + self.object._wait_for_health = mock.MagicMock() assert "metrics" not in self.object.__dict__, "metrics should be a cached_property" self.metric_attributes = asdict( self.object.metrics, @@ -125,6 +126,13 @@ def test_setup_populates_cached_properties(self, mock_getmembers): self.object.setup() mock_getmembers.assert_called_with(self.object) + def test_setup_calls_wait_for_health(self): + self.object.setup() 
+ self.object._wait_for_health.assert_called() + def test_config_is_immutable(self): with pytest.raises(FrozenInstanceError): self.object._config.type = "new_type" + + def test_health_returns_bool(self): + assert isinstance(self.object.health(), bool) diff --git a/tests/unit/connector/test_confluent_kafka_common.py b/tests/unit/connector/test_confluent_kafka_common.py index ea8409482..25ef76dcb 100644 --- a/tests/unit/connector/test_confluent_kafka_common.py +++ b/tests/unit/connector/test_confluent_kafka_common.py @@ -56,3 +56,8 @@ def test_stats_set_age_metric_explicitly(self): json_string = Path(KAFKA_STATS_JSON_PATH).read_text("utf8") self.object._stats_callback(json_string) assert self.object.metrics.librdkafka_age == 1337 + + def test_kafka_config_is_immutable(self): + self.object.setup() + with pytest.raises(TypeError): + self.object._config.kafka_config["client.id"] = "test" diff --git a/tests/unit/connector/test_confluent_kafka_input.py b/tests/unit/connector/test_confluent_kafka_input.py index e5fa3850e..c190fdff1 100644 --- a/tests/unit/connector/test_confluent_kafka_input.py +++ b/tests/unit/connector/test_confluent_kafka_input.py @@ -32,6 +32,7 @@ class TestConfluentKafkaInput(BaseInputTestCase, CommonConfluentKafkaTestCase): "type": "confluentkafka_input", "kafka_config": {"bootstrap.servers": "testserver:9092", "group.id": "testgroup"}, "topic": "test_input_raw", + "health_timeout": 0.1, } expected_metrics = [ @@ -115,8 +116,8 @@ def test_shut_down_calls_consumer_close(self, _): @mock.patch("logprep.connector.confluent_kafka.input.Consumer") def test_batch_finished_callback_calls_offsets_handler_for_setting(self, _, settings, handlers): input_config = deepcopy(self.CONFIG) + input_config["kafka_config"] |= settings kafka_input = Factory.create({"test": input_config}) - kafka_input._config.kafka_config.update(settings) kafka_consumer = kafka_input._consumer message = "test message" kafka_input._last_valid_records = {0: message} @@ -141,8 +142,8 @@ def 
test_batch_finished_callback_raises_input_warning_on_kafka_exception( self, _, settings, handler ): input_config = deepcopy(self.CONFIG) + input_config["kafka_config"] |= settings kafka_input = Factory.create({"test": input_config}) - kafka_input._config.kafka_config.update(settings) kafka_consumer = kafka_input._consumer return_sequence = [KafkaException("test error"), None] @@ -288,8 +289,8 @@ def test_default_config_is_injected(self, mock_consumer): @mock.patch("logprep.connector.confluent_kafka.input.Consumer") def test_client_id_can_be_overwritten(self, mock_consumer): input_config = deepcopy(self.CONFIG) + input_config["kafka_config"]["client.id"] = "thisclientid" kafka_input = Factory.create({"test": input_config}) - kafka_input._config.kafka_config["client.id"] = "thisclientid" kafka_input.setup() mock_consumer.assert_called() assert mock_consumer.call_args[0][0].get("client.id") == "thisclientid" @@ -297,8 +298,9 @@ def test_client_id_can_be_overwritten(self, mock_consumer): @mock.patch("logprep.connector.confluent_kafka.input.Consumer") def test_statistics_interval_can_be_overwritten(self, mock_consumer): - kafka_input = Factory.create({"test": self.CONFIG}) - kafka_input._config.kafka_config["statistics.interval.ms"] = "999999999" + input_config = deepcopy(self.CONFIG) + input_config["kafka_config"]["statistics.interval.ms"] = "999999999" + kafka_input = Factory.create({"test": input_config}) kafka_input.setup() mock_consumer.assert_called() assert mock_consumer.call_args[0][0].get("statistics.interval.ms") == "999999999" @@ -389,3 +391,26 @@ def test_revoke_callback_writes_output_backlog_and_calls_batch_finished_callback self.object._revoke_callback(mock_consumer, mock_partitions) self.object.output_connector._write_backlog.assert_called() self.object.batch_finished_callback.assert_called() + + def test_health_returns_true_if_no_error(self): + with mock.patch("logprep.connector.confluent_kafka.input.Consumer"): + assert self.object.health() + + def 
test_health_returns_false_on_kafka_exception(self): + self.object._consumer = mock.MagicMock() + self.object._consumer.list_topics.side_effect = KafkaException("test error") + assert not self.object.health() + + def test_health_logs_error_on_kafka_exception(self): + self.object._consumer = mock.MagicMock() + self.object._consumer.list_topics.side_effect = KafkaException("test error") + with mock.patch("logging.Logger.error") as mock_error: + self.object.health() + mock_error.assert_called() + + def test_health_counts_metrics_on_kafka_exception(self): + self.object.metrics.number_of_errors = 0 + self.object._consumer = mock.MagicMock() + self.object._consumer.list_topics.side_effect = KafkaException("test error") + assert not self.object.health() + assert self.object.metrics.number_of_errors == 1 diff --git a/tests/unit/connector/test_confluent_kafka_output.py b/tests/unit/connector/test_confluent_kafka_output.py index 10582ec91..385b0435a 100644 --- a/tests/unit/connector/test_confluent_kafka_output.py +++ b/tests/unit/connector/test_confluent_kafka_output.py @@ -10,6 +10,7 @@ from unittest import mock import pytest +from confluent_kafka.error import KafkaException from logprep.abc.output import CriticalOutputError, FatalOutputError from logprep.factory import Factory @@ -161,3 +162,26 @@ def test_raises_value_error_if_mandatory_parameters_not_set(self): expected_error_message = r"keys are missing: {'bootstrap.servers'}" with pytest.raises(InvalidConfigurationError, match=expected_error_message): Factory.create({"test": config}) + + def test_health_returns_true_if_no_error(self): + self.object._producer = mock.MagicMock() + assert self.object.health() + + def test_health_returns_false_on_kafka_exception(self): + self.object._producer = mock.MagicMock() + self.object._producer.list_topics.side_effect = KafkaException("test error") + assert not self.object.health() + + def test_health_logs_error_on_kafka_exception(self): + self.object._producer = mock.MagicMock() + 
self.object._producer.list_topics.side_effect = KafkaException("test error") + with mock.patch("logging.Logger.error") as mock_error: + self.object.health() + mock_error.assert_called() + + def test_health_counts_metrics_on_kafka_exception(self): + self.object.metrics.number_of_errors = 0 + self.object._producer = mock.MagicMock() + self.object._producer.list_topics.side_effect = KafkaException("test error") + assert not self.object.health() + assert self.object.metrics.number_of_errors == 1 diff --git a/tests/unit/connector/test_http_input.py b/tests/unit/connector/test_http_input.py index b2e9c88b7..c094cbf26 100644 --- a/tests/unit/connector/test_http_input.py +++ b/tests/unit/connector/test_http_input.py @@ -12,6 +12,7 @@ import pytest import requests +import responses import uvicorn from requests.auth import HTTPBasicAuth @@ -73,7 +74,7 @@ def setup_method(self): "/plaintext": "plaintext", "/auth-json-secret": "json", "/auth-json-file": "json", - "/.*/[A-Z]{2}/json$": "json", + "/[A-Za-z0-9]*/[A-Z]{2}/json$": "json", }, } @@ -115,6 +116,12 @@ def test_get_method_returns_200_with_authentication(self): resp = requests.get(url=f"{self.target}/auth-json-secret", timeout=0.5) assert resp.status_code == 200 + def test_get_method_returns_429_if_queue_is_full(self): + self.object.messages.full = mock.MagicMock() + self.object.messages.full.return_value = True + resp = requests.get(url=f"{self.target}/json", timeout=20) + assert resp.status_code == 429 + def test_get_error_code_too_many_requests(self): data = {"message": "my log message"} self.object.messages.put = mock.MagicMock() @@ -454,3 +461,83 @@ def test_raises_http_bad_request_on_decode_error(self, endpoint): data = "this is not a valid json nor jsonl" resp = requests.post(url=f"{self.target}/{endpoint}", data=data, timeout=0.5) assert resp.status_code == 400 + + @responses.activate + def test_health_endpoint_is_ready_if_all_endpoints_are_successful(self): + for endpoint in self.object.health_endpoints: + 
responses.get(f"http://127.0.0.1:9000{endpoint}", status=200) + assert self.object.health(), "Health endpoint should be ready" + + @responses.activate + def test_health_endpoint_is_not_ready_if_one_endpoint_has_status_429(self): + for endpoint in self.object.health_endpoints[0:-2]: + responses.get(f"http://127.0.0.1:9000{endpoint}", status=200) + endpoint = self.object.health_endpoints[-1] + responses.get(f"http://127.0.0.1:9000{endpoint}", status=429) # bad + assert not self.object.health(), "Health endpoint should not be ready" + + @responses.activate + def test_health_endpoint_is_not_ready_if_one_endpoint_has_status_500(self): + for endpoint in self.object.health_endpoints[1:-1]: + responses.get(f"http://127.0.0.1:9000{endpoint}", status=200) + endpoint = self.object.health_endpoints[0] + responses.get(f"http://127.0.0.1:9000{endpoint}", status=500) # bad + assert not self.object.health(), "Health endpoint should not be ready" + + @responses.activate + def test_health_endpoint_is_not_ready_on_connection_error(self): + for endpoint in self.object.health_endpoints[1:-1]: + responses.get(f"http://127.0.0.1:9000{endpoint}", status=200) + endpoint = self.object.health_endpoints[0] + responses.get(f"http://127.0.0.1:9000{endpoint}", body=requests.ConnectionError("bad")) + assert not self.object.health(), "Health endpoint should not be ready" + + @responses.activate + def test_health_endpoint_is_not_ready_if_one_endpoint_has_read_timeout(self): + for endpoint in self.object.health_endpoints[1:-1]: + responses.get(f"http://127.0.0.1:9000{endpoint}", status=200) + endpoint = self.object.health_endpoints[0] + responses.get(f"http://127.0.0.1:9000{endpoint}", body=requests.Timeout("bad")) + assert not self.object.health(), "Health endpoint should not be ready" + + @responses.activate + def test_health_check_logs_error(self): + endpoint = self.object.health_endpoints[0] + responses.get(f"http://127.0.0.1:9000{endpoint}", body=requests.Timeout("bad")) + with 
mock.patch("logging.Logger.error") as mock_logger: + assert not self.object.health(), "Health endpoint should not be ready" + mock_logger.assert_called() + + @responses.activate + def test_health_counts_errors(self): + self.object.metrics.number_of_errors = 0 + endpoint = self.object.health_endpoints[0] + responses.get(f"http://127.0.0.1:9000{endpoint}", status=500) # bad + assert not self.object.health() + assert self.object.metrics.number_of_errors == 1 + + def test_health_endpoints_are_shortened(self): + config = deepcopy(self.CONFIG) + endpoints = { + "/json": "json", + "/jsonl$": "jsonl", + "/.*/blah$": "json", + "/fooo.*/.+": "json", + "/[A-Za-z0-9]*/[A-Z]{2}/json$": "json", + } + expected_matching_regexes = ( + "/json", + "/jsonl$", + "/b/blah$", + "/fooob/b", + "/[A-Za-z0-9]{5}/[A-Z]{2}/json$", + ) + config["endpoints"] = endpoints + connector = Factory.create({"test connector": config}) + health_endpoints = connector.health_endpoints + for endpoint, expected in zip(health_endpoints, expected_matching_regexes): + assert re.match(expected, endpoint) + + @pytest.mark.skip("Not implemented") + def test_setup_calls_wait_for_health(self): + pass diff --git a/tests/unit/connector/test_http_output.py b/tests/unit/connector/test_http_output.py index e1bfb19ab..686aec190 100644 --- a/tests/unit/connector/test_http_output.py +++ b/tests/unit/connector/test_http_output.py @@ -149,3 +149,7 @@ def test_store_counts_processed_events(self): self.object.metrics.number_of_processed_events = 0 self.object.store({"message": "my event message"}) assert self.object.metrics.number_of_processed_events == 1 + + @pytest.mark.skip(reason="not implemented") + def test_setup_calls_wait_for_health(self): + pass diff --git a/tests/unit/connector/test_json_input.py b/tests/unit/connector/test_json_input.py index 92ac8907b..5b850045c 100644 --- a/tests/unit/connector/test_json_input.py +++ b/tests/unit/connector/test_json_input.py @@ -67,3 +67,7 @@ def 
test_repeat_documents_repeats_documents(self, mock_parse): for order in range(0, 9): event, _ = object.get_next(self.timeout) assert event.get("order") == order % 3 + + @pytest.mark.skip(reason="not implemented") + def test_setup_calls_wait_for_health(self): + pass diff --git a/tests/unit/connector/test_jsonl_input.py b/tests/unit/connector/test_jsonl_input.py index fe928c651..a8e3a234f 100644 --- a/tests/unit/connector/test_jsonl_input.py +++ b/tests/unit/connector/test_jsonl_input.py @@ -61,3 +61,7 @@ def test_repeat_documents_repeats_documents(self, mock_parse): for order in range(0, 9): event, _ = object.get_next(self.timeout) assert event.get("order") == order % 3 + + @pytest.mark.skip(reason="not implemented") + def test_setup_calls_wait_for_health(self): + pass diff --git a/tests/unit/connector/test_opensearch_output.py b/tests/unit/connector/test_opensearch_output.py index 60a348cd6..78bb6713c 100644 --- a/tests/unit/connector/test_opensearch_output.py +++ b/tests/unit/connector/test_opensearch_output.py @@ -362,12 +362,6 @@ def test_handle_serialization_error_raises_fatal_output_error(self): with pytest.raises(FatalOutputError): self.object._handle_serialization_error(mock.MagicMock()) - def test_setup_raises_fatal_output_error_if_opensearch_error_is_raised(self): - self.object._search_context.info = mock.MagicMock() - self.object._search_context.info.side_effect = SearchException - with pytest.raises(FatalOutputError): - self.object.setup() - def test_setup_registers_flush_timout_tasks(self): job_count = len(Component._scheduler.jobs) with mock.patch.object(self.object, "_search_context", new=mock.MagicMock()): @@ -459,3 +453,33 @@ def test_write_backlog_is_successful_after_two_retries(self, mock_sleep): self.object._write_backlog() assert mock_sleep.call_count == 2 assert self.object._message_backlog == [] + + def test_health_returns_true_on_success(self): + self.object._search_context = mock.MagicMock() + 
self.object._search_context.cluster.health.return_value = {"status": "green"} + assert self.object.health() + + @pytest.mark.parametrize("exception", [SearchException, ConnectionError]) + def test_health_returns_false_on_failure(self, exception): + self.object._search_context = mock.MagicMock() + self.object._search_context.cluster.health.side_effect = exception + assert not self.object.health() + + def test_health_logs_on_failure(self): + self.object._search_context = mock.MagicMock() + self.object._search_context.cluster.health.side_effect = SearchException + with mock.patch("logging.Logger.error") as mock_error: + assert not self.object.health() + mock_error.assert_called() + + def test_health_counts_metrics_on_failure(self): + self.object.metrics.number_of_errors = 0 + self.object._search_context = mock.MagicMock() + self.object._search_context.cluster.health.side_effect = SearchException + assert not self.object.health() + assert self.object.metrics.number_of_errors == 1 + + def test_health_returns_false_on_cluster_status_not_green(self): + self.object._search_context = mock.MagicMock() + self.object._search_context.cluster.health.return_value = {"status": "yellow"} + assert not self.object.health() diff --git a/tests/unit/connector/test_real_kafka.py b/tests/unit/connector/test_real_kafka.py index a35d00f8b..404816593 100644 --- a/tests/unit/connector/test_real_kafka.py +++ b/tests/unit/connector/test_real_kafka.py @@ -4,7 +4,6 @@ # pylint: disable=protected-access import logging import os -import re import subprocess import time import uuid @@ -110,7 +109,7 @@ def test_librdkafka_logs_forwarded_to_logprep_logger(self, caplog): }, } kafka_input = Factory.create({"librdkafkatest": input_config}) - kafka_input.get_next(10) + kafka_input.get_next(20) assert "Failed to resolve 'notexisting:9092'" in caplog.text @pytest.mark.skip(reason="is only for debugging") diff --git a/tests/unit/connector/test_s3_output.py b/tests/unit/connector/test_s3_output.py index 
8cf474cab..58b54949d 100644 --- a/tests/unit/connector/test_s3_output.py +++ b/tests/unit/connector/test_s3_output.py @@ -325,3 +325,7 @@ def _calculate_backlog_size(s3_output): def test_setup_populates_cached_properties(self, mock_getmembers): self.object.setup() mock_getmembers.assert_called_with(self.object) + + @pytest.mark.skip(reason="Not implemented yet") + def test_setup_calls_wait_for_health(self): + pass diff --git a/tests/unit/framework/test_pipeline.py b/tests/unit/framework/test_pipeline.py index 4ffb1db36..70f6b944b 100644 --- a/tests/unit/framework/test_pipeline.py +++ b/tests/unit/framework/test_pipeline.py @@ -5,6 +5,8 @@ import multiprocessing from copy import deepcopy from logging import DEBUG +from multiprocessing import Lock +from typing import Iterable, Tuple from unittest import mock import pytest @@ -710,3 +712,34 @@ def test_stop_sets_continue_iterating_to_false(self): pipeline._continue_iterating.value = True pipeline.stop() assert not pipeline._continue_iterating.value + + def test_health_returns_health_functions_without_output(self): + pipeline = Pipeline(config=self.config) + assert pipeline._output is None + health = pipeline.get_health_functions() + assert isinstance(health, Tuple) + assert len(health) > 0 + assert all(callable(health_function) for health_function in health) + + def test_health_returns_health_functions_with_output(self): + self.config.output = { + "dummy_output": {"type": "dummy_output"}, + } + pipeline = Pipeline(config=self.config) + assert pipeline._output is not None + health = pipeline.get_health_functions() + assert isinstance(health, Tuple) + assert len(health) > 0 + assert all(callable(health_function) for health_function in health) + + def test_health_returns_health_functions_with_multiple_outputs(self): + self.config.output = { + "dummy_output1": {"type": "dummy_output"}, + "dummy_output2": {"type": "dummy_output"}, + } + pipeline = Pipeline(config=self.config) + assert pipeline._output is not None + health 
= pipeline.get_health_functions() + assert isinstance(health, Tuple) + assert len(health) > 0 + assert all(callable(health_function) for health_function in health) diff --git a/tests/unit/framework/test_pipeline_manager.py b/tests/unit/framework/test_pipeline_manager.py index 36b474394..e2f552b7d 100644 --- a/tests/unit/framework/test_pipeline_manager.py +++ b/tests/unit/framework/test_pipeline_manager.py @@ -171,10 +171,9 @@ def test_restart_calls_prometheus_exporter_run(self): config = deepcopy(self.config) config.metrics = MetricsConfig(enabled=True, port=666) pipeline_manager = PipelineManager(config) - pipeline_manager.prometheus_exporter.is_running = False - with mock.patch.object(pipeline_manager.prometheus_exporter, "run") as mock_run: - pipeline_manager.restart() - mock_run.assert_called() + pipeline_manager.prometheus_exporter = mock.MagicMock() + pipeline_manager.restart() + pipeline_manager.prometheus_exporter.run.assert_called() def test_restart_sets_deterministic_pipline_index(self): config = deepcopy(self.config) @@ -256,6 +255,22 @@ def test_restart_failed_pipeline_restarts_immediately_on_negative_restart_count_ pipeline_manager.restart_failed_pipeline() mock_time_sleep.assert_not_called() + def test_restart_injects_healthcheck_functions(self): + pipeline_manager = PipelineManager(self.config) + pipeline_manager.prometheus_exporter = mock.MagicMock() + pipeline_manager._pipelines = [mock.MagicMock()] + pipeline_manager.restart() + pipeline_manager.prometheus_exporter.update_healthchecks.assert_called() + + def test_restart_ensures_prometheus_exporter_is_running(self): + config = deepcopy(self.config) + config.metrics = MetricsConfig(enabled=True, port=666) + pipeline_manager = PipelineManager(config) + pipeline_manager.prometheus_exporter._prepare_multiprocessing = mock.MagicMock() + with mock.patch("logprep.util.http.ThreadingHTTPServer"): + pipeline_manager.restart() + pipeline_manager.prometheus_exporter.server.start.assert_called() + class 
TestThrottlingQueue: diff --git a/tests/unit/generator/http/test_controller.py b/tests/unit/generator/http/test_controller.py index ee04694df..7d21cc380 100644 --- a/tests/unit/generator/http/test_controller.py +++ b/tests/unit/generator/http/test_controller.py @@ -4,6 +4,7 @@ import os from unittest import mock +import pytest import responses from logprep.generator.http.controller import Controller @@ -26,6 +27,7 @@ def setup_method(self): thread_count=1, ) + # @pytest.mark.skip(reason="This test blocks and has to be fixed") # TODO: Fix this test @responses.activate def test_run(self, tmp_path): dataset_path = tmp_path / "dataset" diff --git a/tests/unit/metrics/test_exporter.py b/tests/unit/metrics/test_exporter.py index 44097a4a8..742b8edc5 100644 --- a/tests/unit/metrics/test_exporter.py +++ b/tests/unit/metrics/test_exporter.py @@ -3,14 +3,15 @@ # pylint: disable=attribute-defined-outside-init # pylint: disable=line-too-long import os.path -from logging.config import dictConfig from unittest import mock +import pytest +import requests from prometheus_client import REGISTRY from logprep.metrics.exporter import PrometheusExporter +from logprep.util import http from logprep.util.configuration import MetricsConfig -from logprep.util.defaults import DEFAULT_LOG_CONFIG @mock.patch( @@ -20,16 +21,16 @@ class TestPrometheusExporter: def setup_method(self): REGISTRY.__init__() - self.metrics_config = MetricsConfig(enabled=True, port=80) + self.metrics_config = MetricsConfig(enabled=True, port=8000) def test_correct_setup(self): exporter = PrometheusExporter(self.metrics_config) - assert exporter._port == self.metrics_config.port + assert exporter.configuration.port == self.metrics_config.port def test_default_port_if_missing_in_config(self): metrics_config = MetricsConfig(enabled=True) exporter = PrometheusExporter(metrics_config) - assert exporter._port == 8000 + assert exporter.configuration.port == 8000 @mock.patch("logprep.util.http.ThreadingHTTPServer.start") def 
test_run_starts_http_server(self, mock_http_server_start): @@ -78,4 +79,107 @@ def test_mark_process_dead_calls_multiprocess_mark_dead(self, mock_multiprocess) def test_exporter_spawns_server_on_all_interfaces(self): exporter = PrometheusExporter(self.metrics_config) - assert exporter._server.server.config.host == "0.0.0.0" + exporter.init_server() + assert exporter.server.uvicorn_config.host == "0.0.0.0" + + def test_is_running_returns_false_when_server_not_set(self): + exporter = PrometheusExporter(self.metrics_config) + assert not exporter.is_running + + def test_is_running_returns_false_when_server_thread_not_set(self): + exporter = PrometheusExporter(self.metrics_config) + exporter.server = http.ThreadingHTTPServer({}, None, False) + assert not exporter.is_running + + def test_is_running_returns_false_when_server_thread_is_not_alive(self): + exporter = PrometheusExporter(self.metrics_config) + exporter.server = http.ThreadingHTTPServer({}, None, False) + exporter.server.thread = mock.Mock() + exporter.server.thread.is_alive.return_value = False + assert not exporter.is_running + + def test_is_running_returns_true_when_server_thread_is_alive(self): + exporter = PrometheusExporter(self.metrics_config) + exporter.server = http.ThreadingHTTPServer({}, None, False) + exporter.server.thread = mock.Mock() + exporter.server.thread.is_alive.return_value = True + assert exporter.is_running + + +@mock.patch( + "logprep.metrics.exporter.PrometheusExporter._prepare_multiprocessing", + new=lambda *args, **kwargs: None, +) +class TestHealthEndpoint: + def setup_method(self): + REGISTRY.__init__() + self.metrics_config = MetricsConfig(enabled=True, port=8000) + + def test_health_endpoint_returns_503_as_default_health_state(self): + exporter = PrometheusExporter(self.metrics_config) + exporter.run(daemon=False) + resp = requests.get("http://localhost:8000/health", timeout=0.5) + assert resp.status_code == 503 + exporter.server.shut_down() + + def 
test_health_endpoint_calls_health_check_functions(self): + exporter = PrometheusExporter(self.metrics_config) + function_mock = mock.Mock(return_value=True) + exporter.healthcheck_functions = [function_mock] + exporter.run(daemon=False) + resp = requests.get("http://localhost:8000/health", timeout=0.5) + assert resp.status_code == 200 + assert function_mock.call_count == 1 + + exporter.server.shut_down() + + def test_health_endpoint_calls_updated_functions(self): + exporter = PrometheusExporter(self.metrics_config) + function_mock = mock.Mock(return_value=True) + exporter.healthcheck_functions = [function_mock] + exporter.run(daemon=False) + requests.get("http://localhost:8000/health", timeout=0.5) + assert function_mock.call_count == 1, "initial function should be called" + new_function_mock = mock.Mock(return_value=True) + exporter.update_healthchecks([new_function_mock]) + requests.get("http://localhost:8000/health", timeout=0.5) + assert new_function_mock.call_count == 1, "New function should be called" + assert function_mock.call_count == 1, "Old function should not be called" + + exporter.server.shut_down() + + @pytest.mark.parametrize( + "functions, expected", + [ + ([lambda: True], 200), + ([lambda: True, lambda: True], 200), + ([lambda: False], 503), + ([lambda: False, lambda: False], 503), + ([lambda: False, lambda: True, lambda: True], 503), + ], + ) + def test_health_check_returns_status_code(self, functions, expected): + exporter = PrometheusExporter(self.metrics_config) + exporter.run(daemon=False) + exporter.update_healthchecks(functions) + resp = requests.get("http://localhost:8000/health", timeout=0.5) + assert resp.status_code == expected + exporter.server.shut_down() + + @pytest.mark.parametrize( + "functions, expected", + [ + ([lambda: True], "OK"), + ([lambda: True, lambda: True], "OK"), + ([lambda: False], "FAIL"), + ([lambda: False, lambda: False], "FAIL"), + ([lambda: False, lambda: True, lambda: True], "FAIL"), + ], + ) + def 
test_health_check_returns_body(self, functions, expected): + exporter = PrometheusExporter(self.metrics_config) + exporter.run(daemon=False) + exporter.update_healthchecks(functions) + resp = requests.get("http://localhost:8000/health", timeout=0.5) + assert resp.content.decode() == expected + exporter.server.shut_down() diff --git a/tests/unit/processor/base.py b/tests/unit/processor/base.py index d0adc908b..9f1c4225b 100644 --- a/tests/unit/processor/base.py +++ b/tests/unit/processor/base.py @@ -95,8 +95,6 @@ def setup_method(self) -> None: patcher.start() self.patchers.append(patcher) super().setup_method() - config = {"Test Instance Name": self.CONFIG} - self.object = Factory.create(configuration=config) self.specific_rules = self.set_rules(self.specific_rules_dirs) self.generic_rules = self.set_rules(self.generic_rules_dirs) self.match_all_event = { diff --git a/tests/unit/util/test_http.py b/tests/unit/util/test_http.py new file mode 100644 index 000000000..d1a306d7b --- /dev/null +++ b/tests/unit/util/test_http.py @@ -0,0 +1,46 @@ +# pylint: disable=missing-docstring +from unittest import mock + +from logprep.util.http import ThreadingHTTPServer + + +class TestThreadingHTTPServer: + + def test_start_server(self): + uvicorn_config = {} + app = None + server = ThreadingHTTPServer(uvicorn_config, app, False) + server.start() + assert server.thread.is_alive() + server.shut_down(0.1) + + def test_shutdown_server(self): + uvicorn_config = {} + app = None + server = ThreadingHTTPServer(uvicorn_config, app, False) + server.start() + thread = server.thread + uvicorn_server = server.server + server.shut_down(0.1) + assert not thread.is_alive() + assert uvicorn_server.should_exit + + def test_shutdown_server_double_checks_thread_is_dead(self): + uvicorn_config = {} + app = None + server = ThreadingHTTPServer(uvicorn_config, app, False) + server.thread = mock.MagicMock() + server.server = mock.MagicMock() + server.thread.is_alive.return_value = False + 
server.shut_down(0.1) + assert server.thread.is_alive.call_count == 2 + + def test_restart_server(self): + uvicorn_config = {} + app = None + server = ThreadingHTTPServer(uvicorn_config, app, False) + server.start() + thread = server.thread + server.restart(0.1) + assert server.thread is not thread + server.shut_down(0.1)