Skip to content

Commit

Permalink
added killer shutdown call on worker's exit to make sure the worker i…
Browse files Browse the repository at this point in the history
…s able to shutdown properly, and if not, it will get killed by the killer
  • Loading branch information
Gal Ben David committed Jan 21, 2021
1 parent 8df7bec commit fc65802
Show file tree
Hide file tree
Showing 6 changed files with 37 additions and 5 deletions.
9 changes: 7 additions & 2 deletions sergeant/executor/serial.py
Original file line number Diff line number Diff line change
Expand Up @@ -178,13 +178,18 @@ def post_work(
},
)

def __del__(
def shutdown(
self,
) -> None:
if self.should_use_a_killer:
try:
self.killer.kill()
self.killer.shutdown()

signal.signal(signal.SIGTERM, self.original_term)
except Exception:
pass

def __del__(
self,
) -> None:
self.shutdown()
10 changes: 10 additions & 0 deletions sergeant/executor/threaded.py
Original file line number Diff line number Diff line change
Expand Up @@ -189,3 +189,13 @@ def post_work(
'exception': exception,
},
)

def shutdown(
self,
) -> None:
self.thread_killer.stop()

def __del__(
self,
) -> None:
self.shutdown()
15 changes: 13 additions & 2 deletions sergeant/killer/process.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ def __init__(

self.running = False
self.kill_received = False
self.shutdown_received = False
self.pipe_was_closed = False

def init_logger(
Expand Down Expand Up @@ -95,6 +96,11 @@ def process_requests(
self.time_elapsed = 0.0
self.timeout_raised = False
self.critical_timeout_raised = False
elif data == b'shutdown':
self.logger.info(
msg='shutdown request was received',
)
self.shutdown_received = True
elif data == b'kill':
self.logger.info(
msg='kill request was received',
Expand Down Expand Up @@ -122,7 +128,7 @@ def kill_loop(

if self.kill_received:
break
elif self.pipe_was_closed:
elif self.pipe_was_closed or self.shutdown_received:
self.logger.info(
msg='waiting for process to terminate',
)
Expand Down Expand Up @@ -276,14 +282,19 @@ def stop_and_reset(
) -> None:
self.parent_pipe.send_bytes(b'stop_and_reset')

def shutdown(
self,
) -> None:
self.parent_pipe.send_bytes(b'shutdown')

def kill(
self,
) -> None:
self.parent_pipe.send_bytes(b'kill')

try:
self.killer_process.wait(
timeout=1.0,
timeout=2.0,
)
except Exception:
pass
Expand Down
4 changes: 3 additions & 1 deletion sergeant/killer/thread.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@ def __init__(
exception: typing.Type[BaseException],
sleep_interval: float = 0.1,
) -> None:
super().__init__()
super().__init__(
daemon=True,
)

self.exception = exception
self.sleep_interval = sleep_interval
Expand Down
2 changes: 2 additions & 0 deletions sergeant/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -344,6 +344,8 @@ def work_loop(
'total_cpu_time': total_cpu_time,
}

self.executor_obj.shutdown()

return summary

def retry(
Expand Down
2 changes: 2 additions & 0 deletions tests/killer/test_process.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,8 @@ def test_timeouts_killer(
expr=self.sigterm_fired,
)

killer.kill()

def test_sleep_case_killer(
self,
):
Expand Down

0 comments on commit fc65802

Please sign in to comment.