Skip to content

Commit

Permalink
fix thread does not terminate sometimes
Browse files Browse the repository at this point in the history
  • Loading branch information
ekneg54 committed Sep 13, 2024
1 parent 9534a4c commit 185e86d
Show file tree
Hide file tree
Showing 4 changed files with 44 additions and 19 deletions.
17 changes: 16 additions & 1 deletion logprep/abc/component.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
from schedule import Scheduler

from logprep.metrics.metrics import Metric
from logprep.util.defaults import DEFAULT_HEALTH_TIMEOUT
from logprep.util.defaults import DEFAULT_HEALTH_TIMEOUT, EXITCODES
from logprep.util.helper import camel_to_snake

logger = logging.getLogger("Component")
Expand Down Expand Up @@ -99,6 +99,21 @@ def describe(self) -> str:
def setup(self):
"""Set the component up."""
self._populate_cached_properties()
if not "http" in self._config.type:
self._wait_for_health()

def _wait_for_health(self) -> None:
"""Wait for the component to be healthy.
if the component is not healthy after a period of time, the process will exit.
"""
for i in range(3):
if self.health():
break
logger.info("Wait for %s initially becoming healthy: %s/3", self.name, i + 1)
time.sleep(1 + i)
else:
logger.error("Component '%s' did not become healthy", self.name)
sys.exit(EXITCODES.PIPELINE_ERROR.value)

def _populate_cached_properties(self):
_ = [
Expand Down
4 changes: 1 addition & 3 deletions logprep/framework/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -217,15 +217,13 @@ def _setup(self):
output.input_connector = self._input
if output.default:
self._input.output_connector = output
output.setup()
self.logger.debug(
f"Created connectors -> input: '{self._input.describe()}',"
f" output -> '{[output.describe() for _, output in self._output.items()]}'"
)
self._input.pipeline_index = self.pipeline_index
self._input.setup()
for _, output in self._output.items():
output.setup()

self.logger.debug("Finished creating connectors")
self.logger.info("Start building pipeline")
_ = self._pipeline
Expand Down
20 changes: 11 additions & 9 deletions logprep/util/http.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,21 +90,23 @@ def start(self):
while not self.server.started:
continue

def shut_down(self):
def shut_down(self, wait: float = 1) -> None:
"""Stop thread with uvicorn+falcon http server, wait for uvicorn
to exit gracefully and join the thread"""
if not self.thread.is_alive() or self.server is None:
if self.thread is None or self.server is None:
return
while self.thread.is_alive():
self.server.should_exit = True
self.server.should_exit = True
while 1:
self._logger.debug("Wait for server to exit gracefully...")
continue
if not self.thread.is_alive():
time.sleep(wait)
if not self.thread.is_alive(): # we have to double check if it is really dead
break
time.sleep(wait)
self.thread.join()
self.server = None
self.thread = None

def restart(self):
def restart(self, wait: float = 1) -> None:
"""Restart the server by shutting down the existing server and
starting a new one"""
self.shut_down()
self.shut_down(wait=wait)
self.start()
22 changes: 16 additions & 6 deletions tests/unit/util/test_http.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
# pylint: disable=missing-docstring
from unittest import mock

from logprep.util.http import ThreadingHTTPServer


Expand All @@ -10,6 +12,7 @@ def test_start_server(self):
server = ThreadingHTTPServer(uvicorn_config, app, False)
server.start()
assert server.thread.is_alive()
server.shut_down(0.1)

def test_shutdown_server(self):
uvicorn_config = {}
Expand All @@ -18,19 +21,26 @@ def test_shutdown_server(self):
server.start()
thread = server.thread
uvicorn_server = server.server
server.shut_down()
server.shut_down(0.1)
assert not thread.is_alive()
assert uvicorn_server.should_exit
assert server.thread is None
assert server.server is None

def test_shutdown_server_double_checks_thread_is_dead(self):
uvicorn_config = {}
app = None
server = ThreadingHTTPServer(uvicorn_config, app, False)
server.thread = mock.MagicMock()
server.server = mock.MagicMock()
server.thread.is_alive.return_value = False
server.shut_down(0.1)
assert server.thread.is_alive.call_count == 2

def test_restart_server(self):
uvicorn_config = {}
app = None
server = ThreadingHTTPServer(uvicorn_config, app, False)
server.start()
thread = server.thread
server.restart()
server.restart(0.1)
assert server.thread is not thread
server.shut_down()
assert server.thread is None
server.shut_down(0.1)

0 comments on commit 185e86d

Please sign in to comment.