diff --git a/logprep/abc/component.py b/logprep/abc/component.py index d7ad756fb..3d10a80b7 100644 --- a/logprep/abc/component.py +++ b/logprep/abc/component.py @@ -15,7 +15,7 @@ from schedule import Scheduler from logprep.metrics.metrics import Metric -from logprep.util.defaults import DEFAULT_HEALTH_TIMEOUT +from logprep.util.defaults import DEFAULT_HEALTH_TIMEOUT, EXITCODES from logprep.util.helper import camel_to_snake logger = logging.getLogger("Component") @@ -99,6 +99,21 @@ def describe(self) -> str: def setup(self): """Set the component up.""" self._populate_cached_properties() + if not "http" in self._config.type: + self._wait_for_health() + + def _wait_for_health(self) -> None: + """Wait for the component to be healthy. + if the component is not healthy after a period of time, the process will exit. + """ + for i in range(3): + if self.health(): + break + logger.info("Wait for %s initially becoming healthy: %s/3", self.name, i + 1) + time.sleep(1 + i) + else: + logger.error("Component '%s' did not become healthy", self.name) + sys.exit(EXITCODES.PIPELINE_ERROR.value) def _populate_cached_properties(self): _ = [ diff --git a/logprep/framework/pipeline.py b/logprep/framework/pipeline.py index 6c8c85203..c20a211d0 100644 --- a/logprep/framework/pipeline.py +++ b/logprep/framework/pipeline.py @@ -217,15 +217,13 @@ def _setup(self): output.input_connector = self._input if output.default: self._input.output_connector = output + output.setup() self.logger.debug( f"Created connectors -> input: '{self._input.describe()}'," f" output -> '{[output.describe() for _, output in self._output.items()]}'" ) self._input.pipeline_index = self.pipeline_index self._input.setup() - for _, output in self._output.items(): - output.setup() - self.logger.debug("Finished creating connectors") self.logger.info("Start building pipeline") _ = self._pipeline diff --git a/logprep/util/http.py b/logprep/util/http.py index 1e452c8b5..22afe9328 100644 --- a/logprep/util/http.py +++ b/logprep/util/http.py @@ -90,21 +90,23 @@ def start(self): while not self.server.started: continue - def shut_down(self): + def shut_down(self, wait: float = 1) -> None: """Stop thread with uvicorn+falcon http server, wait for uvicorn to exit gracefully and join the thread""" - if not self.thread.is_alive() or self.server is None: + if self.thread is None or self.server is None: return - while self.thread.is_alive(): - self.server.should_exit = True + self.server.should_exit = True + while 1: self._logger.debug("Wait for server to exit gracefully...") - continue + if not self.thread.is_alive(): + time.sleep(wait) + if not self.thread.is_alive(): # we have to double check if it is really dead + break + time.sleep(wait) self.thread.join() - self.server = None - self.thread = None - def restart(self): + def restart(self, wait: float = 1) -> None: """Restart the server by shutting down the existing server and starting a new one""" - self.shut_down() + self.shut_down(wait=wait) self.start() diff --git a/tests/unit/util/test_http.py b/tests/unit/util/test_http.py index 0fbaf7be1..d1a306d7b 100644 --- a/tests/unit/util/test_http.py +++ b/tests/unit/util/test_http.py @@ -1,4 +1,6 @@ # pylint: disable=missing-docstring +from unittest import mock + from logprep.util.http import ThreadingHTTPServer @@ -10,6 +12,7 @@ def test_start_server(self): server = ThreadingHTTPServer(uvicorn_config, app, False) server.start() assert server.thread.is_alive() + server.shut_down(0.1) def test_shutdown_server(self): uvicorn_config = {} @@ -18,11 +21,19 @@ def test_shutdown_server(self): server.start() thread = server.thread uvicorn_server = server.server - server.shut_down() + server.shut_down(0.1) assert not thread.is_alive() assert uvicorn_server.should_exit - assert server.thread is None - assert server.server is None + + def test_shutdown_server_double_checks_thread_is_dead(self): + uvicorn_config = {} + app = None + server = ThreadingHTTPServer(uvicorn_config, app, False) + server.thread = mock.MagicMock() + server.server = mock.MagicMock() + server.thread.is_alive.return_value = False + server.shut_down(0.1) + assert server.thread.is_alive.call_count == 2 def test_restart_server(self): uvicorn_config = {} @@ -30,7 +41,6 @@ def test_restart_server(self): server = ThreadingHTTPServer(uvicorn_config, app, False) server.start() thread = server.thread - server.restart() + server.restart(0.1) assert server.thread is not thread - server.shut_down() - assert server.thread is None + server.shut_down(0.1)