Skip to content

Commit

Permalink
Feature: Better handling for stop signal
Browse files Browse the repository at this point in the history
* call stop function when getting stop signal and added on_stop function to workers

* log level change - error -> warning

* Handling WorkerStop exception together with WorkerInterrupt handling

* docs fixes

* doc fix
  • Loading branch information
noam-shakuri authored Aug 17, 2022
1 parent c99c768 commit dc1f061
Show file tree
Hide file tree
Showing 8 changed files with 196 additions and 3 deletions.
1 change: 1 addition & 0 deletions docs/worker/config/logging.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ There are the following configuration options:
- `on_retry` [True] - Whenever a task asks for retry.
- `on_max_retries` [True] - Each time a task asks for a retry beyond the maximum number of tries.
- `on_requeue` [True] - Every time a task requeues.
- `on_stop` [True] - Every time a task calls `stop` or is interrupted by a `SIGTERM`.
- `handlers` - List of handlers [logging.Handler] attached to the `logging.Logger` object.


Expand Down
18 changes: 18 additions & 0 deletions docs/worker/handlers/on_stop.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# on_stop

The `on_stop` handler is invoked when a task calls the stop method or receives a `SIGTERM` signal.

## Definition

```python
def on_stop(
self,
task: sergeant.objects.Task,
) -> None
```

Possible use cases include:

- Send a log message
- Create a metrics collector
- Close open sessions when the worker is being stopped
10 changes: 9 additions & 1 deletion docs/worker/methods/stop.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

It is possible to stop the worker from running by using the `stop` method. The supervisor receives the request and will not spawn a new worker in its place. A worker uses this method when there is no solution to a problem. Using this method in conjunction with `on_starvation` is a good example of how it should be used. If a worker is starving and there are not enough tasks available to consume, it can be intentionally stopped to reduce the load on the queue. Calling `stop` stops all the other workers under the same supervisor as well.

When the worker is interrupted by a `SIGTERM`, the signal handler invokes the `stop` function to interrupt the worker's workflow.


## Definition

Expand Down Expand Up @@ -32,7 +34,7 @@ def stop(
self.stop()
```

=== "OnFailure"
=== "OnFailure | OnStop"
```python
def work(
self,
Expand All @@ -53,4 +55,10 @@ def stop(
):
if isinstance(exception, pymongo.errors.ServerSelectionTimeoutError):
self.stop()

def on_stop(
self,
task,
):
self.database.close()
```
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ build-backend = "poetry.core.masonry.api"

[tool.poetry]
name = "sergeant"
version = "0.25.0"
version = "0.26.0"
readme = "README.md"
homepage = "https://github.com/Intsights/sergeant"
repository = "https://github.com/Intsights/sergeant"
Expand Down
1 change: 1 addition & 0 deletions sergeant/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ class LoggingEvents:
on_max_retries: bool = True
on_requeue: bool = True
on_starvation: bool = True
on_stop: bool = True


@dataclasses.dataclass(
Expand Down
8 changes: 8 additions & 0 deletions sergeant/executor/threaded.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,14 @@ def execute_task(
killer_object=killer_object,
)

if isinstance(
exception,
worker.WorkerStop,
):
self.worker_object.handle_stop(
task=task,
)

self.interrupt_exception = exception
except Exception as exception:
self.post_work(
Expand Down
42 changes: 41 additions & 1 deletion sergeant/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ def __init__(
)
self.executor_obj: typing.Optional[executor._executor.Executor] = None

self.received_stop_signal = False

signal.signal(signal.SIGTERM, self.stop_signal_handler)

def generate_config(
Expand All @@ -55,8 +57,9 @@ def stop_signal_handler(
msg='stop signal has been received',
)
signal.signal(signal.SIGTERM, signal.SIG_DFL)
self.received_stop_signal = True

raise WorkerStop()
self.stop()

def init_worker(
self,
Expand Down Expand Up @@ -259,6 +262,9 @@ def iterate_tasks(

yield task
finally:
if self.received_stop_signal:
iterated_tasks -= 1

if iterated_tasks < len(tasks):
self.push_tasks(
kwargs_list=[
Expand Down Expand Up @@ -615,6 +621,34 @@ def handle_starvation(
},
)

def handle_stop(
    self,
    task: objects.Task,
) -> None:
    """Run the stop flow for ``task``: optional log entry, then ``on_stop``.

    A warning is logged when ``config.logging.events.on_stop`` is truthy,
    after which the ``on_stop`` hook is invoked with the task.

    ``WorkerInterrupt`` raised along the way is re-raised untouched. Any
    other ``Exception`` is reported through ``logger.error`` and swallowed.
    """
    def build_extra() -> dict:
        # Built per call so the flag value is read at logging time.
        return {
            'task': task,
            'received_stop_signal': self.received_stop_signal,
        }

    try:
        logging_events = self.config.logging.events
        if logging_events.on_stop:
            self.logger.warning(
                msg='task has stopped',
                extra=build_extra(),
            )

        self.on_stop(
            task=task,
        )
    except WorkerInterrupt:
        # Worker control-flow exceptions must reach the caller.
        raise
    except Exception as handler_error:
        self.logger.error(
            msg=f'on_stop handler has failed: {handler_error}',
            extra=build_extra(),
        )

def initialize(
self,
) -> None:
Expand Down Expand Up @@ -689,6 +723,12 @@ def on_starvation(
) -> None:
pass

def on_stop(
    self,
    task: objects.Task,
) -> None:
    """Hook called by ``handle_stop`` with the task being stopped.

    The default implementation does nothing; subclasses override it to
    react to a stop (for example, to release resources).
    """
    return None


class WorkerException(
BaseException,
Expand Down
117 changes: 117 additions & 0 deletions tests/worker/test_handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -697,6 +697,123 @@ def test_on_requeue(
},
)

def test_on_stop(
    self,
):
    """Exercise ``Worker.handle_stop`` across logging and failure scenarios.

    Covers: logging disabled, logging enabled, ``on_stop`` raising a plain
    ``Exception`` (logged and swallowed), and ``on_stop`` raising
    ``WorkerStop`` / ``WorkerRespawn`` (propagated to the caller).
    """
    worker = sergeant.worker.Worker()
    worker.config = sergeant.config.WorkerConfig(
        name='some_worker',
        connector=sergeant.config.Connector(
            type='redis',
            params={
                'nodes': [
                    {
                        'host': 'localhost',
                        'port': 6379,
                        'password': None,
                        'database': 0,
                    },
                ],
            },
        ),
        max_retries=3,
        logging=sergeant.config.Logging(
            events=sergeant.config.LoggingEvents(
                on_stop=False,
            ),
        ),
    )
    worker.init_broker()

    task = sergeant.objects.Task()

    worker.on_stop = unittest.mock.MagicMock()
    worker.logger = unittest.mock.MagicMock()

    # Logging disabled: the handler still runs, but nothing is logged.
    # handle_stop logs the stop event at WARNING level, so that is the
    # method that must be checked (asserting on `info` would pass vacuously).
    worker.handle_stop(
        task=task,
    )
    worker.on_stop.assert_called_once()
    worker.logger.warning.assert_not_called()

    # Logging enabled: one warning carrying the task and the signal flag.
    worker.on_stop.reset_mock()
    worker.logger.reset_mock()
    worker.config = worker.config.replace(
        logging=sergeant.config.Logging(
            events=sergeant.config.LoggingEvents(
                on_stop=True,
            ),
        ),
    )
    worker.handle_stop(
        task=task,
    )
    worker.on_stop.assert_called_once()
    worker.logger.warning.assert_called_once_with(
        msg='task has stopped',
        extra={
            'task': task,
            'received_stop_signal': False,
        },
    )

    # A regular exception from on_stop is logged as an error and swallowed.
    worker.on_stop.reset_mock()
    worker.logger.reset_mock()
    worker.on_stop.side_effect = Exception('exception message')
    worker.handle_stop(
        task=task,
    )
    worker.on_stop.assert_called_once()
    worker.logger.warning.assert_called_once_with(
        msg='task has stopped',
        extra={
            'task': task,
            'received_stop_signal': False,
        },
    )
    # handle_stop attaches `received_stop_signal` to the error log as well.
    worker.logger.error.assert_called_once_with(
        msg='on_stop handler has failed: exception message',
        extra={
            'task': task,
            'received_stop_signal': False,
        },
    )

    # WorkerStop raised by on_stop must propagate to the caller.
    worker.on_stop.reset_mock()
    worker.logger.reset_mock()
    worker.on_stop.side_effect = sergeant.worker.WorkerStop()
    with self.assertRaises(
        expected_exception=sergeant.worker.WorkerStop,
    ):
        worker.handle_stop(
            task=task,
        )
    worker.on_stop.assert_called_once()
    worker.logger.warning.assert_called_once_with(
        msg='task has stopped',
        extra={
            'task': task,
            'received_stop_signal': False,
        },
    )

    # WorkerRespawn raised by on_stop must propagate to the caller.
    worker.on_stop.reset_mock()
    worker.logger.reset_mock()
    worker.on_stop.side_effect = sergeant.worker.WorkerRespawn()
    with self.assertRaises(
        expected_exception=sergeant.worker.WorkerRespawn,
    ):
        worker.handle_stop(
            task=task,
        )
    worker.on_stop.assert_called_once()
    worker.logger.warning.assert_called_once_with(
        msg='task has stopped',
        extra={
            'task': task,
            'received_stop_signal': False,
        },
    )

def test_on_starvation(
self,
):
Expand Down

0 comments on commit dc1f061

Please sign in to comment.