diff --git a/CHANGELOG.md b/CHANGELOG.md
index c10d73e72..c744c6be8 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -18,7 +18,7 @@
 * `timestamper` now writes `_timestamper_missing_field_warning` tag to event tags instead of `_timestamper_failure` in case of missing fields
 * rename `--thread_count` parameter to `--thread-count` in http generator
 * removed `--report` parameter and feature from http generator
-* when using `extend_target_list` in the `field manager`the ordering of the given source fields is now preserved
+* when using `extend_target_list` in the `field manager` the ordering of the given source fields is now preserved
 * logprep now exits with a negative exit code if pipeline restart fails 5 times
   * this was implemented because further restart behavior should be configured on level of a system init service or container orchestrating service like k8s
   * the `restart_count` parameter is configurable. If you want the old behavior back, you can set this parameter to a negative number
@@ -31,12 +31,13 @@
 * add pseudonymization tools to logprep -> see: `logprep pseudo --help`
 * add `restart_count` parameter to configuration
 * add option `mode` to `pseudonymizer` processor and to pseudonymization tools to chose the AES Mode for encryption and decryption
+* add retry mechanism to opensearch parallel bulk if opensearch returns a 429 `rejected_execution_exception`
 
 ### Improvements
 
 * remove logger from Components and Factory signatures
 * align processor architecture to use methods like `write_to_target`, `add_field_to` and `get_dotted_field_value` when reading and writing from and to events
-  * required substantial refactoring of the `hyperscan_resolver`, `generic_resolver` and `template_replacer`
+  * required substantial refactoring of the `hyperscan_resolver`, `generic_resolver` and `template_replacer`
 * change `pseudonymizer`, `pre_detector`, `selective_extractor` processors and `pipeline` to handle `extra_data` the same way
 * refactor `clusterer`, `pre_detector` and `pseudonymizer` processors and change `rule_tree` so that the processor do not require `process` override
   * required substantial refactoring of the `clusterer`
diff --git a/logprep/abc/output.py b/logprep/abc/output.py
index fd9d128c6..1cc595572 100644
--- a/logprep/abc/output.py
+++ b/logprep/abc/output.py
@@ -3,7 +3,6 @@
 """
 
 from abc import abstractmethod
-from logging import Logger
 from typing import Optional
 
 from attrs import define, field, validators
@@ -39,7 +38,7 @@ def __init__(self, output, message, raw_input):
 
 
 class FatalOutputError(OutputError):
-    """Must not be catched."""
+    """Must not be caught."""
 
 
 class Output(Connector):
diff --git a/logprep/connector/elasticsearch/output.py b/logprep/connector/elasticsearch/output.py
index c18ba530e..2355c7130 100644
--- a/logprep/connector/elasticsearch/output.py
+++ b/logprep/connector/elasticsearch/output.py
@@ -95,7 +95,9 @@ class Config(Output.Config):
         """(Optional) Timeout for the connection (default is 500ms)."""
         max_retries: int = field(validator=validators.instance_of(int), default=0)
         """(Optional) Maximum number of retries for documents rejected with code 429 (default is 0).
-        Increases backoff time by 2 seconds per try, but never exceeds 600 seconds."""
+        Increases backoff time by 2 seconds per try, but never exceeds 600 seconds. When using
+        parallel_bulk in the opensearch connector, the backoff time starts at 1 second. With
+        each consecutive retry, 500 to 1000 ms are added to the delay, chosen randomly."""
         user: Optional[str] = field(validator=validators.instance_of(str), default="")
         """(Optional) User used for authentication."""
         secret: Optional[str] = field(validator=validators.instance_of(str), default="")
@@ -439,7 +441,7 @@ def _message_exceeds_max_size_error(self, error):
             return True
 
         if error.error == "rejected_execution_exception":
-            reason = error.info.get("error", {}).get("reason", {})
+            reason = error.info.get("error", {}).get("reason", "")
             match = self._size_error_pattern.match(reason)
             if match and int(match.group("size")) >= int(match.group("max_size")):
                 return True
diff --git a/logprep/connector/opensearch/output.py b/logprep/connector/opensearch/output.py
index 3d1568401..7ea404347 100644
--- a/logprep/connector/opensearch/output.py
+++ b/logprep/connector/opensearch/output.py
@@ -31,6 +31,8 @@
 """
 
 import logging
+import random
+import time
 from functools import cached_property
 
 import opensearchpy as search
@@ -38,7 +40,7 @@
 from opensearchpy import helpers
 from opensearchpy.serializer import JSONSerializer
 
-from logprep.abc.output import Output
+from logprep.abc.output import Output, FatalOutputError
 from logprep.connector.elasticsearch.output import ElasticsearchOutput
 
 logger = logging.getLogger("OpenSearchOutput")
@@ -87,12 +89,10 @@ class Config(ElasticsearchOutput.Config):
             default=4, validator=[validators.instance_of(int), validators.gt(1)]
         )
         """Number of threads to use for bulk requests."""
-
         queue_size: int = field(
             default=4, validator=[validators.instance_of(int), validators.gt(1)]
         )
         """Number of queue size to use for bulk requests."""
-
         chunk_size: int = field(
             default=500, validator=[validators.instance_of(int), validators.gt(1)]
         )
@@ -137,15 +137,30 @@ def _bulk(self, client, actions, *args, **kwargs):
             self._handle_transport_error(error)
 
     def _parallel_bulk(self, client, actions, *args, **kwargs):
-        for success, item in helpers.parallel_bulk(
-            client,
-            actions=actions,
-            chunk_size=self._config.chunk_size,
-            queue_size=self._config.queue_size,
-            raise_on_error=True,
-            raise_on_exception=True,
-        ):
-            if not success:
-                result = item[list(item.keys())[0]]
-                if "error" in result:
-                    raise result.get("error")
+        bulk_delays = 1
+        for _ in range(self._config.max_retries + 1):
+            try:
+                for success, item in helpers.parallel_bulk(
+                    client,
+                    actions=actions,
+                    chunk_size=self._config.chunk_size,
+                    queue_size=self._config.queue_size,
+                    raise_on_error=True,
+                    raise_on_exception=True,
+                ):
+                    if not success:
+                        result = item[list(item.keys())[0]]
+                        if "error" in result:
+                            raise result.get("error")
+                break
+            except search.ConnectionError as error:
+                raise error
+            except search.exceptions.TransportError as error:
+                if self._message_exceeds_max_size_error(error):
+                    raise error
+                time.sleep(bulk_delays)
+                bulk_delays += random.randint(500, 1000) / 1000
+        else:
+            raise FatalOutputError(
+                self, "Opensearch too many requests, all parallel bulk retries failed"
+            )
diff --git a/tests/unit/connector/test_elasticsearch_output.py b/tests/unit/connector/test_elasticsearch_output.py
index 182293606..ef8c7fdfa 100644
--- a/tests/unit/connector/test_elasticsearch_output.py
+++ b/tests/unit/connector/test_elasticsearch_output.py
@@ -256,7 +256,7 @@ def test_handle_bulk_index_error_calls_bulk_with_error_documents(self, fake_bulk
                 {"invalid": "error"},
                 [{"foo": "*" * 500}],
                 1,
-                TypeError,
+                FatalOutputError,
             ),
             (
                 429,
diff --git a/tests/unit/connector/test_opensearch_output.py b/tests/unit/connector/test_opensearch_output.py
index ece59af4e..1621f2721 100644
--- a/tests/unit/connector/test_opensearch_output.py
+++ b/tests/unit/connector/test_opensearch_output.py
@@ -158,8 +158,9 @@ def test_write_to_search_context_calls_handle_serialization_error_if_serializati
 
     @mock.patch(
         "opensearchpy.helpers.parallel_bulk",
-        side_effect=search.ConnectionError,
+        side_effect=search.ConnectionError(-1),
     )
+    @mock.patch("time.sleep", mock.MagicMock())  # to speed up test execution
     def test_write_to_search_context_calls_handle_connection_error_if_connection_error(self, _):
         self.object._config.message_backlog_size = 1
         self.object._handle_connection_error = mock.MagicMock()
@@ -269,7 +270,7 @@ def test_handle_bulk_index_error_calls_bulk_with_error_documents(self, fake_bulk
                 {"invalid": "error"},
                 [{"foo": "*" * 500}],
                 1,
-                TypeError,
+                FatalOutputError,
             ),
             (
                 429,
@@ -404,3 +405,34 @@ def test_opensearch_parallel_bulk(self):
     def test_setup_populates_cached_properties(self, mock_getmembers):
         self.object.setup()
         mock_getmembers.assert_called_with(self.object)
+
+    @mock.patch(
+        "opensearchpy.helpers.parallel_bulk",
+        side_effect=search.TransportError(429, "rejected_execution_exception", {}),
+    )
+    @mock.patch("time.sleep")
+    def test_write_backlog_fails_if_all_retries_are_exceeded(self, _, mock_sleep):
+        self.object._config.maximum_message_size_mb = 1
+        self.object._config.max_retries = 5
+        self.object._message_backlog = [{"some": "event"}]
+        with pytest.raises(
+            FatalOutputError, match="Opensearch too many requests, all parallel bulk retries failed"
+        ):
+            self.object._write_backlog()
+        assert mock_sleep.call_count == 6  # one initial try + 5 retries
+        assert self.object._message_backlog == [{"some": "event"}]
+
+    @mock.patch("time.sleep")
+    def test_write_backlog_is_successful_after_two_retries(self, mock_sleep):
+        side_effects = [
+            search.TransportError(429, "rejected_execution_exception", {}),
+            search.TransportError(429, "rejected_execution_exception", {}),
+            [],
+        ]
+        with mock.patch("opensearchpy.helpers.parallel_bulk", side_effect=side_effects):
+            self.object._config.maximum_message_size_mb = 1
+            self.object._config.max_retries = 5
+            self.object._message_backlog = [{"some": "event"}]
+            self.object._write_backlog()
+        assert mock_sleep.call_count == 2
+        assert self.object._message_backlog == []
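
For reviewers, here is a minimal standalone sketch of the backoff policy that the new `_parallel_bulk` loop implements. The names `send_with_backoff`, `send_chunk`, and `TooManyRequests` are hypothetical placeholders for illustration only, not logprep or opensearchpy APIs:

```python
import random
import time


class TooManyRequests(Exception):
    """Placeholder for a 429 rejected_execution_exception raised by the bulk call."""


def send_with_backoff(send_chunk, max_retries=5):
    """Retry send_chunk with the schedule from the docstring change above:
    sleep 1 second after the first rejection, then add a random 0.5-1.0 s
    to the delay after each further rejected attempt."""
    delay = 1
    for _ in range(max_retries + 1):
        try:
            send_chunk()  # stand-in for helpers.parallel_bulk(...)
            return
        except TooManyRequests:
            time.sleep(delay)
            delay += random.randint(500, 1000) / 1000
    # mirrors the for/else branch in the diff that raises FatalOutputError
    raise RuntimeError("too many requests, all retries failed")
```

With `max_retries: 5` this sleeps six times before giving up, for a worst-case total wait of roughly 21 seconds; in the real connector, connection errors and oversized messages are re-raised immediately instead of being retried.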