Skip to content

Commit

Permalink
handle inputdata
Browse files Browse the repository at this point in the history
  • Loading branch information
ekneg54 committed May 16, 2024
1 parent 0f6a6b9 commit c8eba7b
Show file tree
Hide file tree
Showing 2 changed files with 91 additions and 11 deletions.
32 changes: 27 additions & 5 deletions logprep/connector/http/output.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,13 @@

import logging
from functools import cached_property
from typing import Iterable

import requests
from attrs import define, field, validators

from logprep.abc.output import Output
from logprep.metrics.metrics import CounterMetric, GaugeMetric

logger = logging.getLogger("HttpOutput")

Expand All @@ -17,6 +19,18 @@ class HttpOutput(Output):
"""Output that sends http post requests to paths under a given endpoint
with configured credentials"""

@define(kw_only=True)
class Metrics(Output.Metrics):
"""Tracks statistics about this connector"""

number_of_http_requests: CounterMetric = field(
factory=lambda: CounterMetric(
description="Number of incomming requests",
name="number_of_http_requests",
)
)
"""Number of outgoing requests"""

@define(kw_only=True)
class Config(Output.Config):
"""Configuration for the HttpOutput."""
Expand All @@ -42,37 +56,45 @@ def password(self):
def _headers(self):
return {"Content-Type": "application/x-ndjson; charset=utf-8"}

def store_custom(self, document: dict, target: str):
def store_custom(self, document: dict | tuple | list, target: str) -> None:
"""
Send a batch of events to an endpoint and return the received status code times the number
of events.
"""
self._send_post_request(target, document)
self.metrics.number_of_processed_events += 1
if isinstance(document, dict):
self._send_post_request(target, self._encoder.encode(document))
self.metrics.number_of_processed_events += 1
elif isinstance(document, (tuple, list)):
self._send_post_request(target, self._encoder.encode_lines(document))
self.metrics.number_of_processed_events += len(document)
else:
error = TypeError(f"Document type {type(document)} is not supported")
self.store_failed(str(error), document, document)

def store(self, document: tuple[str, dict] | dict) -> dict:
if isinstance(document, tuple):
target, document = document
target = f"{self._config.target_url}{target}"
else:
target = self._config.target_url
self.store_custom(document, target)

def store_failed(self, error_message: str, document_received: dict, document_processed: dict):
self.metrics.number_of_failed_events += 1

def _send_post_request(self, event_target: str, request_data: str) -> dict:
def _send_post_request(self, event_target: str, request_data: bytes) -> dict:
"""Send a post request with given data to the specified endpoint"""
try:
try:
response = requests.post(
f"{event_target}",
headers=self._headers,
data=request_data,
verify=False,
auth=(self.user, self.password),
timeout=2,
)
logger.debug("Servers response code is: %i", response.status_code)
self.metrics.number_of_http_requests += 1
response.raise_for_status()
if self.input_connector is not None:
self.input_connector.batch_finished_callback()
Expand Down
70 changes: 64 additions & 6 deletions tests/unit/connector/test_http_output.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
import pytest
import responses

from logprep.connector.http.output import HttpOutput
from tests.unit.connector.base import BaseOutputTestCase

TARGET_URL = "https://www.test.de"
Expand All @@ -19,21 +18,26 @@ class TestOutput(BaseOutputTestCase):
"password": "password",
}

expected_metrics = [
*BaseOutputTestCase.expected_metrics,
"logprep_number_of_http_requests",
]

@responses.activate
def test_one_repeat(self):
self.object.metrics.number_of_processed_events = 0
responses.add(responses.POST, f"{TARGET_URL}/123", status=200)
events_string = """[{"event1_key": "event1_value"}\n{"event2_key": "event2_value"}]"""
batch = (f"{TARGET_URL}/123", events_string)
events = [{"event1_key": "event1_value"}, {"event2_key": "event2_value"}]
batch = (f"{TARGET_URL}/123", events)
self.object.store(batch)
assert self.object.metrics.number_of_processed_events == 1
assert self.object.metrics.number_of_processed_events == 2

@responses.activate
def test_404_status_code(self):
self.object.metrics.number_of_failed_events = 0
responses.add(responses.POST, f"{TARGET_URL}/123", status=404)
events_string = """[{"event1_key": "event1_value"}\n{"event2_key": "event2_value"}]"""
batch = (f"{TARGET_URL}/123", events_string)
events = [{"event1_key": "event1_value"}, {"event2_key": "event2_value"}]
batch = (f"{TARGET_URL}/123", events)
self.object.store(batch)
assert self.object.metrics.number_of_failed_events == 1

Expand All @@ -43,3 +47,57 @@ def test_store_calls_batch_finished_callback(self):
self.object.input_connector = mock.MagicMock()
self.object.store({"message": "my event message"})
self.object.input_connector.batch_finished_callback.assert_called()

@pytest.mark.parametrize(
"testcase, input_data, number_of_expected_requests, number_of_expeted_events",
[
("dict to root", {"message": "my event message"}, 1, 1),
("dict to target", ("/abc", {"message": "my event message"}), 1, 1),
(
"list to target",
("/abc", [{"message": "my event message"}, {"message": "my event message"}]),
1,
2,
),
(
"tuple to target",
("/abc", ({"message": "my event message"}, {"message": "my event message"})),
1,
2,
),
],
)
@responses.activate
def test_store_counts_number_of_requests_and_events(
self, testcase, input_data, number_of_expected_requests, number_of_expeted_events
):
if isinstance(input_data, tuple):
target_url = input_data[0]
target_url = f"{TARGET_URL}{target_url}"
else:
target_url = TARGET_URL
responses.add(responses.POST, f"{target_url}", status=200)
self.object.metrics.number_of_processed_events = 0
self.object.metrics.number_of_http_requests = 0
self.object.metrics.number_of_failed_events = 0
self.object.store(input_data)
assert (
self.object.metrics.number_of_failed_events == 0
), f"no failed events for input {testcase}"
assert self.object.metrics.number_of_processed_events == number_of_expeted_events
assert self.object.metrics.number_of_http_requests == number_of_expected_requests

@pytest.mark.parametrize(
"testcase, input_data",
[
("generator", (a for a in range(10))),
("set", {1, 2, 3}),
("int", 123),
("float", 123.123),
("str", "123"),
],
)
def test_send_not_supported_input_data(self, testcase, input_data):
self.object.metrics.number_of_failed_events = 0
self.object.store_custom(123, input_data)
assert self.object.metrics.number_of_failed_events == 1, testcase

0 comments on commit c8eba7b

Please sign in to comment.