diff --git a/docs/worker/config/logging.md b/docs/worker/config/logging.md index 99d29bc..2c995ed 100644 --- a/docs/worker/config/logging.md +++ b/docs/worker/config/logging.md @@ -43,6 +43,7 @@ There are the following configuration options: - `on_retry` [True] - Whenever a task asks for retry. - `on_max_retries` [True] - Each time a task asks for a retry beyond the maximum number of tries. - `on_requeue` [True] - Every time a task requeues. + - `on_stop` [True] - Every time a task calls `stop` or is interrupted by a `SIGTERM`. - `handlers` - List of handlers [logging.Handler] attached to the `logging.Logger` object. diff --git a/docs/worker/handlers/on_stop.md b/docs/worker/handlers/on_stop.md new file mode 100644 index 0000000..3143b65 --- /dev/null +++ b/docs/worker/handlers/on_stop.md @@ -0,0 +1,18 @@ +# on_stop + +The `on_stop` handler is invoked when a task calls the stop method or receives a `SIGTERM` signal. + +## Definition + +```python +def on_stop( + self, + task: sergeant.objects.Task, +) -> None +``` + +Possible use cases include: + +- Send a log message +- Create a metrics collector +- Close open sessions when the worker is being stopped \ No newline at end of file diff --git a/docs/worker/methods/stop.md b/docs/worker/methods/stop.md index d2dddc3..4b61183 100644 --- a/docs/worker/methods/stop.md +++ b/docs/worker/methods/stop.md @@ -2,6 +2,8 @@ It is possible to stop the worker from running by using the `stop` method. The supervisor receives the request, and will never spawn a new worker instead. A worker uses this method when there is no solution to a problem. Using this method in conjunction with `on_starvation` is a good example of how it should be used. If a worker is starving and there are not enough tasks available to consume, it can be intentionally stopped to reduce the load on the queue. By calling `stop`, all the workers under a specific supervisor will stop as well. +When the worker is interrupted by a `SIGTERM`, the signal handler invokes the `stop` function to interrupt the worker's workflow. + ## Definition @@ -32,7 +34,7 @@ def stop( self.stop() ``` -=== "OnFailure" +=== "OnFailure | OnStop" ```python def work( self, @@ -53,4 +55,10 @@ def stop( ): if isinstance(exception, pymongo.errors.ServerSelectionTimeoutError): self.stop() + + def on_stop( + self, + task, + ): + self.database.close() ``` diff --git a/pyproject.toml b/pyproject.toml index 5680751..7054c05 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "poetry.core.masonry.api" [tool.poetry] name = "sergeant" -version = "0.25.0" +version = "0.26.0" readme = "README.md" homepage = "https://github.com/Intsights/sergeant" repository = "https://github.com/Intsights/sergeant" diff --git a/sergeant/config.py b/sergeant/config.py index 2bf31c8..9547ca8 100644 --- a/sergeant/config.py +++ b/sergeant/config.py @@ -17,6 +17,7 @@ class LoggingEvents: on_max_retries: bool = True on_requeue: bool = True on_starvation: bool = True + on_stop: bool = True @dataclasses.dataclass( diff --git a/sergeant/executor/threaded.py b/sergeant/executor/threaded.py index 950cea8..4746cc3 100644 --- a/sergeant/executor/threaded.py +++ b/sergeant/executor/threaded.py @@ -141,6 +141,14 @@ def execute_task( killer_object=killer_object, ) + if isinstance( + exception, + worker.WorkerStop, + ): + self.worker_object.handle_stop( + task=task, + ) + self.interrupt_exception = exception except Exception as exception: self.post_work( diff --git a/sergeant/worker.py b/sergeant/worker.py index 52c367c..7eed046 100644 --- a/sergeant/worker.py +++ b/sergeant/worker.py @@ -37,6 +37,8 @@ def __init__( ) self.executor_obj: typing.Optional[executor._executor.Executor] = None + self.received_stop_signal = False + signal.signal(signal.SIGTERM, self.stop_signal_handler) def generate_config( @@ -55,8 +57,9 @@ def stop_signal_handler( msg='stop signal has been received', ) signal.signal(signal.SIGTERM, signal.SIG_DFL) + self.received_stop_signal = True - raise WorkerStop() + self.stop() def init_worker( self, @@ -259,6 +262,9 @@ def iterate_tasks( yield task finally: + if self.received_stop_signal: + iterated_tasks -= 1 + if iterated_tasks < len(tasks): self.push_tasks( kwargs_list=[ @@ -615,6 +621,34 @@ def handle_starvation( }, ) + def handle_stop( + self, + task: objects.Task, + ) -> None: + try: + if self.config.logging.events.on_stop: + self.logger.warning( + msg='task has stopped', + extra={ + 'task': task, + 'received_stop_signal': self.received_stop_signal, + }, + ) + + self.on_stop( + task=task, + ) + except WorkerInterrupt: + raise + except Exception as exception: + self.logger.error( + msg=f'on_stop handler has failed: {exception}', + extra={ + 'task': task, + 'received_stop_signal': self.received_stop_signal, + }, + ) + def initialize( self, ) -> None: @@ -689,6 +723,12 @@ def on_starvation( ) -> None: pass + def on_stop( + self, + task: objects.Task, + ) -> None: + pass + class WorkerException( BaseException, diff --git a/tests/worker/test_handlers.py b/tests/worker/test_handlers.py index 8d52bdc..e0f7735 100644 --- a/tests/worker/test_handlers.py +++ b/tests/worker/test_handlers.py @@ -697,6 +697,123 @@ def test_on_requeue( }, ) + def test_on_stop( + self, + ): + worker = sergeant.worker.Worker() + worker.config = sergeant.config.WorkerConfig( + name='some_worker', + connector=sergeant.config.Connector( + type='redis', + params={ + 'nodes': [ + { + 'host': 'localhost', + 'port': 6379, + 'password': None, + 'database': 0, + }, + ], + }, + ), + max_retries=3, + logging=sergeant.config.Logging( + events=sergeant.config.LoggingEvents( + on_stop=False, + ), + ), + ) + worker.init_broker() + + task = sergeant.objects.Task() + + worker.on_stop = unittest.mock.MagicMock() + worker.logger = unittest.mock.MagicMock() + + worker.handle_stop( + task=task, + ) + worker.on_stop.assert_called_once() + worker.logger.info.assert_not_called() + + worker.on_stop.reset_mock() + worker.logger.reset_mock() + worker.config = worker.config.replace( + logging=sergeant.config.Logging( + events=sergeant.config.LoggingEvents( + on_stop=True, + ), + ), + ) + worker.handle_stop( + task=task, + ) + worker.on_stop.assert_called_once() + worker.logger.info.assert_called_once_with( + msg='task has stopped', + extra={ + 'task': task, + 'received_stop_signal': False, + }, + ) + + worker.on_stop.reset_mock() + worker.logger.reset_mock() + worker.on_stop.side_effect = Exception('exception message') + worker.handle_stop( + task=task, + ) + worker.on_stop.assert_called_once() + worker.logger.info.assert_called_once_with( + msg='task has stopped', + extra={ + 'task': task, + 'received_stop_signal': False, + }, + ) + worker.logger.error.assert_called_once_with( + msg='on_stop handler has failed: exception message', + extra={ + 'task': task, + }, + ) + + worker.on_stop.reset_mock() + worker.logger.reset_mock() + worker.on_stop.side_effect = sergeant.worker.WorkerStop() + with self.assertRaises( + expected_exception=sergeant.worker.WorkerStop, + ): + worker.handle_stop( + task=task, + ) + worker.on_stop.assert_called_once() + worker.logger.info.assert_called_once_with( + msg='task has stopped', + extra={ + 'task': task, + 'received_stop_signal': False, + }, + ) + + worker.on_stop.reset_mock() + worker.logger.reset_mock() + worker.on_stop.side_effect = sergeant.worker.WorkerRespawn() + with self.assertRaises( + expected_exception=sergeant.worker.WorkerRespawn, + ): + worker.handle_stop( + task=task, + ) + worker.on_stop.assert_called_once() + worker.logger.info.assert_called_once_with( + msg='task has stopped', + extra={ + 'task': task, + 'received_stop_signal': False, + }, + ) + def test_on_starvation( self, ):