diff --git a/logprep/connector/http/output.py b/logprep/connector/http/output.py index c6b6af943..85284a576 100644 --- a/logprep/connector/http/output.py +++ b/logprep/connector/http/output.py @@ -4,11 +4,13 @@ import logging from functools import cached_property +from typing import Iterable import requests from attrs import define, field, validators from logprep.abc.output import Output +from logprep.metrics.metrics import CounterMetric, GaugeMetric logger = logging.getLogger("HttpOutput") @@ -17,6 +19,18 @@ class HttpOutput(Output): """Output that sends http post requests to paths under a given endpoint with configured credentials""" + @define(kw_only=True) + class Metrics(Output.Metrics): + """Tracks statistics about this connector""" + + number_of_http_requests: CounterMetric = field( + factory=lambda: CounterMetric( + description="Number of incomming requests", + name="number_of_http_requests", + ) + ) + """Number of outgoing requests""" + @define(kw_only=True) class Config(Output.Config): """Configuration for the HttpOutput.""" @@ -42,17 +56,25 @@ def password(self): def _headers(self): return {"Content-Type": "application/x-ndjson; charset=utf-8"} - def store_custom(self, document: dict, target: str): + def store_custom(self, document: dict | tuple | list, target: str) -> None: """ Send a batch of events to an endpoint and return the received status code times the number of events. """ - self._send_post_request(target, document) - self.metrics.number_of_processed_events += 1 + if isinstance(document, dict): + self._send_post_request(target, self._encoder.encode(document)) + self.metrics.number_of_processed_events += 1 + elif isinstance(document, (tuple, list)): + self._send_post_request(target, self._encoder.encode_lines(document)) + self.metrics.number_of_processed_events += len(document) + else: + error = TypeError(f"Document type {type(document)} is not supported") + self.store_failed(str(error), document, document) def store(self, document: tuple[str, dict] | dict) -> dict: if isinstance(document, tuple): target, document = document + target = f"{self._config.target_url}{target}" else: target = self._config.target_url self.store_custom(document, target) @@ -60,19 +82,19 @@ def store(self, document: tuple[str, dict] | dict) -> dict: def store_failed(self, error_message: str, document_received: dict, document_processed: dict): self.metrics.number_of_failed_events += 1 - def _send_post_request(self, event_target: str, request_data: str) -> dict: + def _send_post_request(self, event_target: str, request_data: bytes) -> dict: """Send a post request with given data to the specified endpoint""" try: try: response = requests.post( f"{event_target}", headers=self._headers, - data=request_data, verify=False, auth=(self.user, self.password), timeout=2, ) logger.debug("Servers response code is: %i", response.status_code) + self.metrics.number_of_http_requests += 1 response.raise_for_status() if self.input_connector is not None: self.input_connector.batch_finished_callback() diff --git a/tests/unit/connector/test_http_output.py b/tests/unit/connector/test_http_output.py index c971652d4..19cdcf0ba 100644 --- a/tests/unit/connector/test_http_output.py +++ b/tests/unit/connector/test_http_output.py @@ -4,7 +4,6 @@ import pytest import responses -from logprep.connector.http.output import HttpOutput from tests.unit.connector.base import BaseOutputTestCase TARGET_URL = "https://www.test.de" @@ -19,21 +18,26 @@ class TestOutput(BaseOutputTestCase): "password": "password", } + expected_metrics = [ + *BaseOutputTestCase.expected_metrics, + "logprep_number_of_http_requests", + ] + @responses.activate def test_one_repeat(self): self.object.metrics.number_of_processed_events = 0 responses.add(responses.POST, f"{TARGET_URL}/123", status=200) - events_string = """[{"event1_key": "event1_value"}\n{"event2_key": "event2_value"}]""" - batch = (f"{TARGET_URL}/123", events_string) + events = [{"event1_key": "event1_value"}, {"event2_key": "event2_value"}] + batch = (f"{TARGET_URL}/123", events) self.object.store(batch) - assert self.object.metrics.number_of_processed_events == 1 + assert self.object.metrics.number_of_processed_events == 2 @responses.activate def test_404_status_code(self): self.object.metrics.number_of_failed_events = 0 responses.add(responses.POST, f"{TARGET_URL}/123", status=404) - events_string = """[{"event1_key": "event1_value"}\n{"event2_key": "event2_value"}]""" - batch = (f"{TARGET_URL}/123", events_string) + events = [{"event1_key": "event1_value"}, {"event2_key": "event2_value"}] + batch = (f"{TARGET_URL}/123", events) self.object.store(batch) assert self.object.metrics.number_of_failed_events == 1 @@ -43,3 +47,57 @@ def test_store_calls_batch_finished_callback(self): self.object.input_connector = mock.MagicMock() self.object.store({"message": "my event message"}) self.object.input_connector.batch_finished_callback.assert_called() + + @pytest.mark.parametrize( + "testcase, input_data, number_of_expected_requests, number_of_expeted_events", + [ + ("dict to root", {"message": "my event message"}, 1, 1), + ("dict to target", ("/abc", {"message": "my event message"}), 1, 1), + ( + "list to target", + ("/abc", [{"message": "my event message"}, {"message": "my event message"}]), + 1, + 2, + ), + ( + "tuple to target", + ("/abc", ({"message": "my event message"}, {"message": "my event message"})), + 1, + 2, + ), + ], + ) + @responses.activate + def test_store_counts_number_of_requests_and_events( + self, testcase, input_data, number_of_expected_requests, number_of_expeted_events + ): + if isinstance(input_data, tuple): + target_url = input_data[0] + target_url = f"{TARGET_URL}{target_url}" + else: + target_url = TARGET_URL + responses.add(responses.POST, f"{target_url}", status=200) + self.object.metrics.number_of_processed_events = 0 + self.object.metrics.number_of_http_requests = 0 + self.object.metrics.number_of_failed_events = 0 + self.object.store(input_data) + assert ( + self.object.metrics.number_of_failed_events == 0 + ), f"no failed events for input {testcase}" + assert self.object.metrics.number_of_processed_events == number_of_expeted_events + assert self.object.metrics.number_of_http_requests == number_of_expected_requests + + @pytest.mark.parametrize( + "testcase, input_data", + [ + ("generator", (a for a in range(10))), + ("set", {1, 2, 3}), + ("int", 123), + ("float", 123.123), + ("str", "123"), + ], + ) + def test_send_not_supported_input_data(self, testcase, input_data): + self.object.metrics.number_of_failed_events = 0 + self.object.store_custom(123, input_data) + assert self.object.metrics.number_of_failed_events == 1, testcase