Skip to content

Commit

Permalink
refactored the timeouts mechanism
Browse files Browse the repository at this point in the history
  • Loading branch information
Gal Ben David committed Jan 13, 2021
1 parent 7d7c6ed commit 4794693
Show file tree
Hide file tree
Showing 14 changed files with 97 additions and 421 deletions.
16 changes: 5 additions & 11 deletions docs/worker/config/timeouts.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,18 +10,14 @@ The `timeouts` parameter controls the killer timeouts for the worker.
frozen=True,
)
class Timeouts:
soft_timeout: float = 0.0
hard_timeout: float = 0.0
critical_timeout: float = 0.0
timeout: float = 0.0
grace_period: float = 10.0
```

The `timeouts` parameter defines how much time the process should run before the killer must try to kill it.

The following timeouts can be configured:

- `soft_timeout` [both] - On `serial` executor, by the time this timeout is reached, a `SIGINT` would be sent to the worker. On `threaded` worker, an exception would be raised inside the thread.
- `hard_timeout` [serial] - By the time this timeout is reached, a `SIGABRT` would be sent to the worker.
- `critical_timeout` [serial] - By the time this timeout is reached, a `SIGKILL` would be sent to the worker.
- `timeout` - On `serial` executor, by the time this timeout is reached, a `SIGTERM` is sent to the worker. On `threaded` worker, an exception would be raised inside the thread.
- `grace_period` [serial] - If the worker will not become responding after the SIGTERM is sent, it will be escalated to a SIGKILL signal.

By default, no timeouts are applied. It means that the tasks will never timeout. One should use timeouts wisely and set them according to the expected type of the task. If the task, in its ordinary case, should run for no longer than 30s, you can set the timeout to 1m and keep the task from being stuck forever.

Expand All @@ -30,8 +26,6 @@ By default, no timeouts are applied. It means that the tasks will never timeout.

```python
sergeant.config.Timeouts(
soft_timeout=10.0,
hard_timeout=15.0,
critical_timeout=20.0,
timeout=10.0,
)
```
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,14 @@ docker run \
### Spawn a supervisor to spawn the consumers
```shell
python3 -m sergeant.supervisor \
--worker-module=examples.4_soft_timeout.consumer \
--worker-module=examples.4_timeout.consumer \
--worker-class=Worker \
--concurrent-worker=1
```

### Produce the tasks via the producer
```shell
python3 -m examples.4_soft_timeout.producer
python3 -m examples.4_timeout.producer
```

### Output
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,7 @@ class Worker(
log_to_stdout=True,
),
timeouts=sergeant.config.Timeouts(
soft_timeout=1.0,
hard_timeout=0.0,
critical_timeout=0.0,
timeout=1.0,
),
)

Expand All @@ -40,8 +38,8 @@ def work(
task,
):
if task.kwargs['timeout']:
self.logger.info(f'Going to timeout')
self.logger.info('Going to timeout')
time.sleep(2)
self.logger.info(f'You won\'t see this print')
self.logger.info('You won\'t see this print')
else:
self.logger.info(f'Not going to timeout')
self.logger.info('Not going to timeout')
File renamed without changes.
5 changes: 2 additions & 3 deletions sergeant/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,8 @@ class Connector:
frozen=True,
)
class Timeouts:
soft_timeout: float = 0.0
hard_timeout: float = 0.0
critical_timeout: float = 0.0
timeout: float = 0.0
grace_period: float = 10.0


@dataclasses.dataclass(
Expand Down
32 changes: 8 additions & 24 deletions sergeant/executor/serial.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,41 +18,26 @@ def __init__(
) -> None:
self.worker_object = worker_object
self.currently_working = False
self.original_int = signal.getsignal(signal.SIGINT)
self.original_abrt = signal.getsignal(signal.SIGABRT)
self.original_term = signal.getsignal(signal.SIGTERM)

has_soft_timeout = self.worker_object.config.timeouts.soft_timeout > 0
has_hard_timeout = self.worker_object.config.timeouts.hard_timeout > 0
has_critical_timeout = self.worker_object.config.timeouts.critical_timeout > 0

self.should_use_a_killer = has_soft_timeout or has_hard_timeout or has_critical_timeout
self.should_use_a_killer = self.worker_object.config.timeouts.timeout > 0
if self.should_use_a_killer:
self.killer = killer.process.Killer(
pid_to_kill=os.getpid(),
sleep_interval=0.1,
soft_timeout=self.worker_object.config.timeouts.soft_timeout,
hard_timeout=self.worker_object.config.timeouts.hard_timeout,
critical_timeout=self.worker_object.config.timeouts.critical_timeout,
timeout=self.worker_object.config.timeouts.timeout,
grace_period=self.worker_object.config.timeouts.grace_period,
)

signal.signal(signal.SIGABRT, self.sigabrt_handler)
signal.signal(signal.SIGINT, self.sigint_handler)

def sigabrt_handler(
self,
signal_num: int,
frame: types.FrameType,
) -> None:
if self.currently_working:
raise worker.WorkerHardTimedout()
signal.signal(signal.SIGTERM, self.sigterm_handler)

def sigint_handler(
def sigterm_handler(
self,
signal_num: int,
frame: types.FrameType,
) -> None:
if self.currently_working:
raise worker.WorkerSoftTimedout()
raise worker.WorkerTimedout()

def execute_tasks(
self,
Expand Down Expand Up @@ -200,7 +185,6 @@ def __del__(
try:
self.killer.kill()

signal.signal(signal.SIGABRT, self.original_abrt)
signal.signal(signal.SIGINT, self.original_int)
signal.signal(signal.SIGTERM, self.original_term)
except Exception:
pass
7 changes: 3 additions & 4 deletions sergeant/executor/threaded.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,9 @@ def __init__(
self.worker_object = worker_object
self.number_of_threads = number_of_threads

has_soft_timeout = self.worker_object.config.timeouts.soft_timeout > 0
self.should_use_a_killer = has_soft_timeout
self.should_use_a_killer = self.worker_object.config.timeouts.timeout > 0
self.thread_killer = killer.thread.Killer(
exception=worker.WorkerSoftTimedout,
exception=worker.WorkerTimedout,
sleep_interval=0.1,
)
self.interrupt_exception: typing.Optional[Exception] = None
Expand Down Expand Up @@ -161,7 +160,7 @@ def pre_work(
if self.should_use_a_killer:
self.thread_killer.add(
thread_id=threading.get_ident(),
timeout=self.worker_object.config.timeouts.soft_timeout,
timeout=self.worker_object.config.timeouts.timeout,
)

def post_work(
Expand Down
103 changes: 28 additions & 75 deletions sergeant/killer/process.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,8 @@ def __init__(
pipe: multiprocessing.connection.Connection,
pid_to_kill: int,
sleep_interval: float,
soft_timeout: float,
hard_timeout: float,
critical_timeout: float,
timeout: float,
grace_period: float,
) -> None:
self.init_logger()

Expand All @@ -31,12 +30,10 @@ def __init__(
self.sleep_interval = sleep_interval
self.time_elapsed = 0.0

self.soft_timeout = soft_timeout
self.hard_timeout = hard_timeout
self.critical_timeout = critical_timeout
self.timeout = timeout
self.critical_timeout = timeout + grace_period

self.soft_timeout_raised = False
self.hard_timeout_raised = False
self.timeout_raised = False
self.critical_timeout_raised = False

self.running = False
Expand Down Expand Up @@ -95,17 +92,15 @@ def process_requests(
msg='reset request was received',
)
self.time_elapsed = 0.0
self.soft_timeout_raised = False
self.hard_timeout_raised = False
self.timeout_raised = False
self.critical_timeout_raised = False
elif data == b'stop_and_reset':
self.logger.info(
msg='stop_and_reset request was received',
)
self.running = False
self.time_elapsed = 0.0
self.soft_timeout_raised = False
self.hard_timeout_raised = False
self.timeout_raised = False
self.critical_timeout_raised = False
elif data == b'kill':
self.logger.info(
Expand Down Expand Up @@ -133,14 +128,13 @@ def kill_loop(
break

if self.running:
self.check_and_process_soft_timeout()
self.check_and_process_hard_timeout()
self.check_and_process_timeout()
self.check_and_process_critical_timeout()

after = time.time()
self.time_elapsed += after - before

if self.kill_received is False:
if not self.kill_received:
try:
self.logger.info(
msg='waiting for process to terminate',
Expand Down Expand Up @@ -193,56 +187,25 @@ def is_process_alive(

return True

def check_and_process_soft_timeout(
def check_and_process_timeout(
self,
) -> None:
if not self.soft_timeout:
return

if self.soft_timeout_raised:
return

if self.time_elapsed >= self.soft_timeout:
self.logger.info(
msg='raising soft timeout',
)
self.soft_timeout_raised = True
self.kill_process(
process=self.process_to_kill,
signal_code=signal.SIGINT,
)

def check_and_process_hard_timeout(
self,
) -> None:
if not self.hard_timeout:
return

if self.hard_timeout_raised:
return

if self.time_elapsed >= self.hard_timeout:
if not self.timeout_raised and self.time_elapsed >= self.timeout:
self.logger.info(
msg='raising hard timeout',
msg='sending timeout signal',
)
self.hard_timeout_raised = True
self.timeout_raised = True
self.kill_process(
process=self.process_to_kill,
signal_code=signal.SIGABRT,
signal_code=signal.SIGTERM,
)

def check_and_process_critical_timeout(
self,
) -> None:
if not self.critical_timeout:
return

if self.critical_timeout_raised:
return

if self.time_elapsed >= self.critical_timeout:
if not self.critical_timeout_raised and self.time_elapsed >= self.critical_timeout:
self.logger.info(
msg='raising critical timeout',
msg='sending critical timeout signal',
)
self.critical_timeout_raised = True
self.kill_process(
Expand Down Expand Up @@ -282,9 +245,8 @@ def __init__(
self,
pid_to_kill: int,
sleep_interval: float,
soft_timeout: float,
hard_timeout: float,
critical_timeout: float,
timeout: float,
grace_period: float,
) -> None:
self.parent_pipe: multiprocessing.connection.Connection
self.child_pipe: multiprocessing.connection.Connection
Expand All @@ -296,9 +258,8 @@ def __init__(
f'{sys.executable} {os.path.relpath(__file__)} '
f'--pid-to-kill {pid_to_kill} '
f'--sleep-interval {sleep_interval} '
f'--soft-timeout {soft_timeout} '
f'--hard-timeout {hard_timeout} '
f'--critical-timeout {critical_timeout} '
f'--timeout {timeout} '
f'--grace-period {grace_period} '
f'--pipe-fd {self.child_pipe.fileno()} '
),
),
Expand Down Expand Up @@ -369,25 +330,18 @@ def main():
dest='sleep_interval',
)
parser.add_argument(
'--soft-timeout',
help='soft timeout',
type=float,
required=True,
dest='soft_timeout',
)
parser.add_argument(
'--hard-timeout',
help='hard timeout',
'--timeout',
help='timeout',
type=float,
required=True,
dest='hard_timeout',
dest='timeout',
)
parser.add_argument(
'--critical-timeout',
help='critical timeout',
'--grace-period',
help='grace period',
type=float,
required=True,
dest='critical_timeout',
dest='grace_period',
)
parser.add_argument(
'--pipe-fd',
Expand All @@ -410,9 +364,8 @@ def main():
pipe=pipe,
pid_to_kill=args.pid_to_kill,
sleep_interval=args.sleep_interval,
soft_timeout=args.soft_timeout,
hard_timeout=args.hard_timeout,
critical_timeout=args.critical_timeout,
timeout=args.timeout,
grace_period=args.grace_period,
)
killer_server.kill_loop()

Expand Down
2 changes: 1 addition & 1 deletion sergeant/supervisor.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ def kill(
pass

try:
self.process.terminate()
self.process.kill()
except Exception:
pass

Expand Down
14 changes: 1 addition & 13 deletions sergeant/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -675,7 +675,7 @@ def on_starvation(


class WorkerException(
Exception,
BaseException,
):
pass

Expand All @@ -686,18 +686,6 @@ class WorkerTimedout(
pass


class WorkerHardTimedout(
WorkerTimedout,
):
pass


class WorkerSoftTimedout(
WorkerTimedout,
):
pass


class WorkerRequeue(
WorkerException,
):
Expand Down
Loading

0 comments on commit 4794693

Please sign in to comment.