Skip to content
This repository has been archived by the owner on Oct 10, 2023. It is now read-only.

Stream logs from run_in_venv to the logging module at the pip install step #76

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/test-flojoy-node-env.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -38,4 +38,4 @@ jobs:

- name: Run python tests
run: |
python -m pytest -vv tests/flojoy_node_venv_test_.py --runslow
python -m pytest -vv -s tests/flojoy_node_venv_test_.py --runslow
19 changes: 11 additions & 8 deletions flojoy/flojoy_node_venv.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ def TORCH_NODE(default: Matrix) -> Matrix:
import cloudpickle

from .utils import FLOJOY_CACHE_DIR
from .logging import LogPipe

__all__ = ["run_in_venv"]

Expand All @@ -59,18 +60,20 @@ def _install_pip_dependencies(
venv_executable: os.PathLike, pip_dependencies: tuple[str], verbose: bool = False
):
"""Install pip dependencies into the virtual environment."""
# TODO(roulbac): Stream logs from pip install
command = [venv_executable, "-m", "pip", "install"]
if not verbose:
command += ["-q", "-q"]
command += list(pip_dependencies)
result = subprocess.run(
command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, check=True
)
if verbose:
# Log every line if verbose, prefix with [pip]
for line in result.stdout.decode().splitlines():
logging.info(f"[ _install_pip_dependencies ] {line}")
with LogPipe(logging.INFO) as pipe_stdout, LogPipe(logging.ERROR) as pipe_stderr:
proc = subprocess.Popen(command, stdout=pipe_stdout, stderr=pipe_stderr)
proc.wait()
if proc.returncode != 0:
raise subprocess.CalledProcessError(
proc.returncode,
command,
output=pipe_stdout.buffer.getvalue(),
stderr=pipe_stderr.buffer.getvalue(),
)


def _get_venv_syspath(venv_executable: os.PathLike) -> list[str]:
Expand Down
70 changes: 70 additions & 0 deletions flojoy/logging.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
import logging
import io
import threading
import os
from time import sleep


class LogPipe:
"""A context manager that creates a pipe which can be written to by the subprocessing
module and read from by the logging module. This is intended to capture and redirect logs
from a subprocess to the logging module.

Example usage:
```
with logpipe.LogPipe(logging.INFO) as logpipe_stdout, logpipe.LogPipe(logging.ERROR) as logpipe_stderr:
with subprocess.Popen(
command=["python", "-m", "pip", "install", "flytekit"],
stdout=logpipe_stdout,
stderr=logpipe_stderr
) as proc:
pass
# Here the logs are available in the logpipe_stdout.buffer and logpipe_stderr.buffer.
# This is useful for exception handling, so that we can accompany a raised exception with the logs that led to it.
captured_stdout = logpipe_stdout.buffer.getvalue()
captured_stderr = logpipe_stderr.buffer.getvalue()
```
"""

def __init__(self, level: int):
"""Setup the object with a logger and a log level.

Args:
level: The log level to use for the captured logs.
"""
self.level = level
self.fdRead, self.fdWrite = os.pipe()
self.pipeReader = os.fdopen(self.fdRead, mode="rb")
self.thread = threading.Thread(target=self.run, daemon=True)
self.buffer = io.BytesIO()
self.closed = False

def __enter__(self):
"""Start the thread when entering the context."""
self.thread.start()
return self

def __exit__(self, exc_type, exc_value, traceback):
"""Ensure everything is closed and terminated when exiting the context."""
self.close()
self.thread.join(2.0)

def fileno(self):
"""Return the write file descriptor of the pipe."""
return self.fdWrite

def run(self):
"""Log everything that comes from the pipe."""
while not self.closed or self.pipeReader.peek():
byte_data = self.pipeReader.readline()
if byte_data:
logging.log(self.level, byte_data.decode("utf-8").rstrip("\n"))
self.buffer.write(byte_data)
else:
sleep(0.1)
self.pipeReader.close()

def close(self):
"""Close the write end of the pipe."""
os.close(self.fdWrite)
self.closed = True
30 changes: 20 additions & 10 deletions tests/flojoy_node_venv_test_.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,21 @@
import shutil
from unittest.mock import patch
import tempfile
import logging


pytestmark = pytest.mark.slow


@pytest.fixture(scope="module")
def logging_debug():
logging.basicConfig(
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
level=logging.DEBUG,
force=True,
)


# Define a fixture to patch tempfile.tempdir
@pytest.fixture
def mock_venv_cache_dir():
Expand All @@ -26,12 +36,12 @@ def mock_venv_cache_dir():
shutil.rmtree(_test_tempdir)


def test_run_in_venv_imports_jax_properly(mock_venv_cache_dir):
def test_run_in_venv_imports_jax_properly(mock_venv_cache_dir, logging_debug):
"""Test that run_in_venv imports properly jax for example"""

from flojoy import run_in_venv

@run_in_venv(pip_dependencies=["jax[cpu]==0.4.13"])
@run_in_venv(pip_dependencies=["jax[cpu]==0.4.13"], verbose=True)
def empty_function_with_jax():
# Import jax to check if it is installed
# Fetch the list of installed packages
Expand All @@ -56,11 +66,11 @@ def empty_function_with_jax():
assert packages_dict["jax"] == "0.4.13"


def test_run_in_venv_imports_flytekit_properly(mock_venv_cache_dir):
def test_run_in_venv_imports_flytekit_properly(mock_venv_cache_dir, logging_debug):
from flojoy import run_in_venv

# Define a function that imports flytekit and returns its version
@run_in_venv(pip_dependencies=["flytekit==1.8.2"])
@run_in_venv(pip_dependencies=["flytekit==1.8.2"], verbose=True)
def empty_function_with_flytekit():
import sys
import importlib.metadata
Expand All @@ -83,12 +93,12 @@ def empty_function_with_flytekit():
assert packages_dict["flytekit"] == "1.8.2"


def test_run_in_venv_imports_opencv_properly(mock_venv_cache_dir):
def test_run_in_venv_imports_opencv_properly(mock_venv_cache_dir, logging_debug):
# Define a function that imports opencv-python-headless and returns its version

from flojoy import flojoy, run_in_venv
from flojoy import run_in_venv

@run_in_venv(pip_dependencies=["opencv-python-headless==4.7.0.72"])
@run_in_venv(pip_dependencies=["opencv-python-headless==4.7.0.72"], verbose=True)
def empty_function_with_opencv():
import sys
import importlib.metadata
Expand All @@ -111,7 +121,7 @@ def empty_function_with_opencv():
assert packages_dict["opencv-python-headless"] == "4.7.0.72"


def test_run_in_venv_does_not_hang_on_error(mock_venv_cache_dir):
def test_run_in_venv_does_not_hang_on_error(mock_venv_cache_dir, logging_debug):
"""Test that run_in_venv imports properly jax for example"""

from flojoy import run_in_venv
Expand All @@ -126,14 +136,14 @@ def empty_function_with_error():


@pytest.mark.parametrize("daemon", [True, False])
def test_run_in_venv_runs_within_thread(mock_venv_cache_dir, daemon):
def test_run_in_venv_runs_within_thread(mock_venv_cache_dir, logging_debug, daemon):
from threading import Thread
from queue import Queue

def function_to_run_within_thread(queue):
from flojoy import run_in_venv

@run_in_venv(pip_dependencies=["numpy==1.23.0"])
@run_in_venv(pip_dependencies=["numpy==1.23.0"], verbose=True)
def func_with_venv():
import numpy as np

Expand Down