-
Notifications
You must be signed in to change notification settings - Fork 102
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Don't print full KeyboardInterrupt Exceptions #43
base: master
Are you sure you want to change the base?
Conversation
There's a unfortunately a third place:
I'm wondering if we can handle CTRL+C in a similar way as other interrupts. The doc at https://docs.python.org/3.8/library/signal.html says "SIGINT is translated into a KeyboardInterrupt exception if the parent process has not changed it." So I'd say we try to "change it". |
the problem is that other signals are even worse to handle directly in the code. In the signal handler we are using now I basically translate any received signal to SIGINT and then handle that as the rest of the mara code has already some places where KeyboardInterrupt is handled. Some context: Adding this PR (should) make all stacktraces for ctrl+c go away. Adding a signal handler in flask is not possible, at least not in all processes as a) we run in a thread at the point we can install one in run_pipeline and you cannot install a signal handler in a non-main thread and b) if you try to do it on import time, this interferes with flasks own shutdown sequence which expects a KeyboardInterrupt. For the usecase of the commandline pipeline runner, we can install it in run_pipeline (and we do now install the below in root_pipeline() which is run in that main process). If you want to shutdown "cleanly" (= kill all tasks, register the bad output in the inner run() process, send it as events to the event handler in run_pipeline() and persist/forward to frontend), the the below does what's needed for that. It basically first kills the tasks with SIGINT (nice because psql cancels the current query before exiting), then waits a few seconds, kills all otherchildren with SIGINT and then raises a SIGINT so that the main run_pipeline loop also cleanly exits. It also makes sure that the signal handler in run_pipeline orchestrates this be making any other process forward the signal to the run_pipeline process. it also makes now sure that we close all open runs/node_runs thanks to the KeyboardInterrupt handler and atexit code. I'm not really happy with the code as it become bigger with each missing piece we found, but it does cleanly shut down our system so that we now have at least some debugging output whats going on in case our nice and shiny k8s environment decides to kill us for some (still unknown) reason... import signal
import os
import sys
import time
import atexit
import psutil
_ALL_SIGNALS = [signal.SIGTERM, signal.SIGINT, signal.SIGQUIT]
def _signal_children(signum: int, parent_pid: int = None, only_tasks=False):
"""Signals all children, starting with the youngest ones"""
if parent_pid is None:
parent_pid = os.getpid()
parent = psutil.Process(parent_pid)
processes = parent.children(recursive=True)
for child in reversed(processes):
if only_tasks:
if _name_process(child.pid) in ("Main", "Executor"):
# protect these two so events are handled
continue
try:
os.kill(child.pid, signum)
except :
# e.g. when the process is already gone as we already killed the child of it
pass
def _log(msg: str):
print(msg, flush=True, file=sys.stderr)
def _name_process(pid: int):
"""Names the process of the pid"""
# The idea is that we start with a main process (run_pipeline), which starts the Executor (run())
# which starts the system stats process and the individual tasks. All of these python processes share the same
# "name" (the commandline). Within the tasks it's different and the process parent of the main process is different.
try:
p = psutil.Process(pid)
p_name = p.name()
except psutil.NoSuchProcess:
# process died already...
return
python_name = psutil.Process(os.getpid()).name()
name_for_level = {
1: "Main",
2: "Executor",
3: "Task/SystemStats",
4: "Task Child"
}
if p_name != python_name:
# this assumes we only get children as pid arg!
return name_for_level[4]
for level in [1, 2, 3]:
if p.parent().name() != p_name:
return name_for_level[level]
p = p.parent()
# this can happen for python function tasks which open a new process
return name_for_level[4]
def install_gracefully_shutdown_signal_handler():
# gracefully shutting down:
# by installing this signal handler, all children also get it. This goes down to the process which runs
# run_pipeline, the executor started within that function and the TaskProcess started by the executor.
# Installing a signal handler in the webserver turned out to be totally unreliable which is why we only install
# this on a best effort base.
# if the "highest parent" (which in a normal exec is the process which runs run_pipeline) get a SIGTERM/SIGINT:
# - set 'shutdown_flag' (and return if it is already set to not do the work twice)
# - signal SIGINT to all tasks
# - schedule a SIGINT in 2 sec for all children (so including the executor)
# - schedule a SIGKILL in 10 sec for all children and raise a KeyboardInterupt() to shutdown the run_pipeline()
# - return to let the run_pipeline receive the events for all the killed children
# if any children TaskProcess or the stats process gets a SIGTERM/SIGINT:
# - set a "shutdown_flag" (and return if it is already set to not do the work twice)
# - signal SIGINT to the main process (which should be the executor)
# - return to let the TaskProcess and the executor send the right events
installing_process_pid = os.getpid()
def _SIGKILL_all_children_handler(signum, frame):
_log(f"Signaling SIGKILL to all children (PID: {os.getpid()})")
_signal_children(signal.SIGKILL)
raise KeyboardInterrupt()
def _SIGINT_all_children_handler(signum, frame):
_log(f"Signaling SIGINT to all children (PID: {os.getpid()})")
_signal_children(signal.SIGINT)
_log(f"Scheduling SIGKILL to all children in 10 sec (PID: {os.getpid()})")
signal.signal(signal.SIGALRM, _SIGKILL_all_children_handler)
signal.alarm(10)
def _main_signal_handler(signum, frame):
"""kills children and signals the parent (up to the executor) to shutdown"""
if hasattr(_main_signal_handler, 'shutdown_flag'):
# already did the work
return
_main_signal_handler.shutdown_flag = True
_sig_name = signal.Signals(signum).name
_p_name = _name_process(os.getpid())
_log(f"Received shutdown signal {_sig_name} in {_p_name} process (PID: {os.getpid()})")
if installing_process_pid != os.getpid():
# we are in a child -> bubble up the signal to the parent and the
# main pid (which should be the executor itself)
os.kill(installing_process_pid, signal.SIGINT)
# no other action to not die in the executor where events are generated and the TaskProcess
# both will die naturally when all children have died
else:
# we are in the main process, usually whatever runs run_pipeline
_log(f"Shutdown all tasks (PID: {os.getpid()})")
_signal_children(signal.SIGINT, installing_process_pid, only_tasks=True)
# send another shutdown signal after 1 second to make sure that any newly started child processes
# also get killed. this happens because we cannot prevent the executor from starting new ones unless
# the executor already has seen an error from the same pipeline. It would be nicer if we could do it
# directly in the executor...
# While we sleep, no event handling happens!
time.sleep(1)
_signal_children(signal.SIGINT, installing_process_pid, only_tasks=True)
_log(f"Scheduling SIGNINT to all children in 3 sec (PID: {os.getpid()})")
signal.signal(signal.SIGALRM, _SIGINT_all_children_handler)
signal.alarm(3)
# return to let the process receive/send events -> if nothing is there anymore, the process dies in the end...
return
def cancel_alarm():
# if we are about to shutdown because all processes already exited without a SIGKILL, we have to disable
# the alarm which would sent this SIGKILL, otherwise we get a bad exit
signal.alarm(0)
# on fork, the signal handler stays on children unless overwritten
try:
for sig_no in _ALL_SIGNALS:
signal.signal(sig_no, _main_signal_handler)
atexit.register(cancel_alarm)
_log("Installed signal handler to gracefully shutdown the ETL")
except ValueError:
# we are in a webserver, you are on your own...
_log("Not in main thread, not adding signal handler") |
Hm, i thought i had that covered by the if instanceof() in the print? The stacktrace doesn't really make sense, it looks liek the exception is raised during the handling of the first exception (do press ctrl+c twice?) but that one should still end up in a single line of |
1f736a5
to
38eb8bb
Compare
@jankatins are you running that in production? The previous changes you made around cancelling runs really improved things a lot, but here I'm afraid we add a lot of complexity for little gains |
The two commits in this PR only do not print stacktraces in two places and were only to clean up such stacktraces as requested in #42. The change looks big because of the indention (wrapping something in The code example for the signal handler here on the other hand is run in production (installed in |
I found two places where such things showed up:
This should cover the most likely cases where this can occur.
Closes: #42