Skip to content

Commit

Permalink
fix buffer full deadlock
Browse files Browse the repository at this point in the history
  • Loading branch information
Adam-D-Lewis committed Jan 29, 2025
1 parent f4ca133 commit 2d62890
Showing 1 changed file with 70 additions and 18 deletions.
88 changes: 70 additions & 18 deletions src/_nebari/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import os
import re
import secrets
import selectors
import signal
import string
import subprocess
Expand Down Expand Up @@ -46,6 +47,67 @@ def change_directory(directory):
os.chdir(current_directory)


def strip_ansi_errors(line):
"""Strips red ANSI escape code from a string."""
ansi_escape = re.compile(r"\x1b\[31m")
stripped_line = ansi_escape.sub("", line.decode("utf-8"))
return stripped_line.encode("utf-8")


def process_streams(
process, line_prefix, strip_errors, print_stdout=True, print_stderr=True
):
sel = selectors.DefaultSelector()
sel.register(process.stdout, selectors.EVENT_READ, data="stdout")
if process.stderr and process.stderr != process.stdout:
sel.register(process.stderr, selectors.EVENT_READ, data="stderr")

outputs = {"stdout": [], "stderr": []}
partial = {"stdout": b"", "stderr": b""}

try:
while True:
events = sel.select(timeout=0.1)
if not events and process.poll() is not None:
break

for key, _ in events:
data = key.fileobj.read1(8192)
if not data:
sel.unregister(key.fileobj)
continue

stream_name = key.data
chunk = partial[stream_name] + data
lines = chunk.split(b"\n")
partial[stream_name] = lines[-1]

for line in lines[:-1]:
line_w_newline = line + b"\n"
if strip_errors:
line_w_newline = strip_ansi_errors(line_w_newline)

# Handle stdout
if stream_name == "stdout":
if print_stdout:
sys.stdout.buffer.write(line_prefix + line_w_newline)
sys.stdout.flush()
else:
outputs["stdout"].append(line_w_newline)

# Handle stderr
if stream_name == "stderr":
if print_stderr:
sys.stderr.buffer.write(line_prefix + line_w_newline)
sys.stderr.flush()
else:
outputs["stderr"].append(line_w_newline)
finally:
sel.close()

return outputs["stdout"], outputs["stderr"]


def run_subprocess_cmd(processargs, prefix=b"", capture_output=False, **kwargs):
"""Runs subprocess command with realtime stdout logging with optional line prefix."""
if prefix:
Expand All @@ -71,6 +133,7 @@ def run_subprocess_cmd(processargs, prefix=b"", capture_output=False, **kwargs):
stderr=stderr_stream,
preexec_fn=os.setsid,
)

# Set timeout thread
timeout_timer = None
if timeout > 0:
Expand All @@ -84,25 +147,14 @@ def kill_process():
timeout_timer = threading.Timer(timeout, kill_process)
timeout_timer.start()

print_stream = process.stderr if capture_output else process.stdout
for line in iter(lambda: print_stream.readline(), b""):
full_line = line_prefix + line
if strip_errors:
full_line = full_line.decode("utf-8")
full_line = re.sub(
r"\x1b\[31m", "", full_line
) # Remove red ANSI escape code
full_line = full_line.encode("utf-8")

sys.stdout.buffer.write(full_line)
sys.stdout.flush()
print_stream.close()

output = []
if capture_output:
for line in iter(lambda: process.stdout.readline(), b""):
output.append(line)
process.stdout.close()
output, _ = process_streams(
process, line_prefix, strip_errors, print_stdout=False, print_stderr=True
)
else:
process_streams(
process, line_prefix, strip_errors, print_stdout=True, print_stderr=True
)

if timeout_timer is not None:
timeout_timer.cancel()
Expand Down

0 comments on commit 2d62890

Please sign in to comment.