diff --git a/.github/workflows/test-flojoy-node-env.yaml b/.github/workflows/test-flojoy-node-env.yaml index d4e5d1c..26c464b 100644 --- a/.github/workflows/test-flojoy-node-env.yaml +++ b/.github/workflows/test-flojoy-node-env.yaml @@ -5,19 +5,22 @@ on: branches: - "main" paths: - - "flojoy/flojoy_node_env.py" + - "flojoy/flojoy_node_venv.py" - "tests/flojoy_node_venv_test_.py" pull_request: paths: - - "flojoy/flojoy_node_env.py" + - "flojoy/flojoy_node_venv.py" - "tests/flojoy_node_venv_test_.py" workflow_dispatch: jobs: - ubuntu: - runs-on: ubuntu-latest + pytest: + strategy: + matrix: + os: [ubuntu, macos, windows] + runs-on: ${{ matrix.os }}-latest steps: - uses: actions/checkout@v3 with: @@ -34,50 +37,5 @@ jobs: pip install -e . - name: Run python tests - run: python -m pytest -vv tests/flojoy_node_venv_test_.py --runslow - - macos: - runs-on: macos-latest - steps: - - uses: actions/checkout@v3 - with: - submodules: recursive - - - uses: actions/setup-python@v4 - with: - python-version: "3.10" - cache: "pip" - - name: Install pip dependencies run: | - pip install ruff pytest - pip install -r requirements.txt - pip install -e . - - - name: Run python tests - run: python -m pytest -vv tests/flojoy_node_venv_test_.py --runslow - - windows: - runs-on: windows-latest - steps: - - uses: actions/checkout@v3 - with: - submodules: recursive - - - uses: actions/setup-python@v4 - with: - python-version: "3.10" - cache: "pip" - - - name: Install pip dependencies - run: | - pip install ruff pytest - pip install -r requirements.txt - pip install -e . - shell: powershell - - - name: Run python tests - run: python -m pytest -vv tests/flojoy_node_venv_test_.py --runslow - shell: powershell - - - + python -m pytest -vv tests/flojoy_node_venv_test_.py --runslow \ No newline at end of file diff --git a/flojoy/flojoy_node_venv.py b/flojoy/flojoy_node_venv.py index 69c3b3e..84c5cf5 100644 --- a/flojoy/flojoy_node_venv.py +++ b/flojoy/flojoy_node_venv.py @@ -19,8 +19,11 @@ def TORCH_NODE(default: Matrix) -> Matrix: return Matrix(...) """ +from typing import Callable + import hashlib import importlib.metadata +import inspect import logging import multiprocessing import multiprocessing.connection @@ -62,13 +65,14 @@ def __exit__(self, exc_type, exc_val, exc_tb): class SwapSysPath: """Temporarily swap the sys.path of the child process with the sys.path of the parent process.""" - def __init__(self, venv_executable): + def __init__(self, venv_executable, extra_sys_path): self.new_path = _get_venv_syspath(venv_executable) + self.extra_sys_path = [] if extra_sys_path is None else extra_sys_path self.old_path = None def __enter__(self): self.old_path = sys.path - sys.path = self.new_path + sys.path = self.new_path + self.extra_sys_path def __exit__(self, exc_type, exc_val, exc_tb): sys.path = self.old_path @@ -78,11 +82,18 @@ def _install_pip_dependencies( venv_executable: os.PathLike, pip_dependencies: tuple[str], verbose: bool = False ): """Install pip dependencies into the virtual environment.""" + # TODO(roulbac): Stream logs from pip install command = [venv_executable, "-m", "pip", "install"] if not verbose: command += ["-q", "-q"] command += list(pip_dependencies) - subprocess.check_call(command) + result = subprocess.run( + command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, check=True + ) + if verbose: + # Log every line if verbose, prefix with [pip] + for line in result.stdout.decode().splitlines(): + logging.info(f"[ _install_pip_dependencies ] {line}") def _get_venv_syspath(venv_executable: os.PathLike) -> list[str]: @@ -97,26 +108,40 @@ class PickleableFunctionWithPipeIO: def __init__( self, - func_serialized: bytes, + func: Callable, child_conn: multiprocessing.connection.Connection, venv_executable: str, ): - self._func_serialized = func_serialized + self._func_serialized = cloudpickle.dumps(func) + func_module_path = os.path.dirname(os.path.realpath(inspect.getabsfile(func))) + # Check that the function is in a directory indeed + self._extra_sys_path = ( + [func_module_path] if os.path.isdir(func_module_path) else None + ) self._child_conn = child_conn self._venv_executable = venv_executable def __call__(self, *args_serialized, **kwargs_serialized): - fn = cloudpickle.loads(self._func_serialized) - args = [cloudpickle.loads(arg) for arg in args_serialized] - kwargs = { - key: cloudpickle.loads(value) for key, value in kwargs_serialized.items() - } - with SwapSysPath(venv_executable=self._venv_executable): + with SwapSysPath( + venv_executable=self._venv_executable, extra_sys_path=self._extra_sys_path + ): try: - result = fn(*args, **kwargs) + fn = cloudpickle.loads(self._func_serialized) + args = [cloudpickle.loads(arg) for arg in args_serialized] + kwargs = { + key: cloudpickle.loads(value) + for key, value in kwargs_serialized.items() + } + serialized_result = cloudpickle.dumps(fn(*args, **kwargs)) except Exception as e: - result = (e, traceback.format_exception(type(e), e, e.__traceback__)) - serialized_result = cloudpickle.dumps(result) + # Not all exceptions are expected to be picklable + # so we clone their traceback and send our own custom type of exception + exc = ChildProcessError( + f"Child process failed with an exception of type {type(e)}." + ).with_traceback(e.__traceback__) + serialized_result = cloudpickle.dumps( + (exc, traceback.format_exception(type(e), e, e.__traceback__)) + ) self._child_conn.send_bytes(serialized_result) @@ -170,9 +195,9 @@ def TORCH_NODE(default: Matrix) -> Matrix: os.makedirs(venv_cache_dir, exist_ok=True) # Generate a path-safe hash of the pip dependencies # this prevents the duplication of virtual environments - pip_dependencies_hash = hashlib.sha256( - "".join(pip_dependencies).encode() - ).hexdigest() + pip_dependencies_hash = hashlib.md5( + "".join(sorted(pip_dependencies)).encode() + ).hexdigest()[:8] venv_path = os.path.join(venv_cache_dir, f"{pip_dependencies_hash}") venv_executable = _get_venv_executable_path(venv_path) # Create the node_env virtual environment if it does not exist @@ -189,8 +214,11 @@ def TORCH_NODE(default: Matrix) -> Matrix: except subprocess.CalledProcessError as e: shutil.rmtree(venv_path) logging.error( - f"Failed to install pip dependencies into virtual environment from the provided list: {pip_dependencies}" + f"[ _install_pip_dependencies ] Failed to install pip dependencies into virtual environment from the provided list: {pip_dependencies}. The virtual environment under {venv_path} has been deleted." ) + # Log every line of e.stderr + for line in e.stderr.decode().splitlines(): + logging.error(f"[ _install_pip_dependencies ] {line}") raise e # Define the decorator @@ -203,9 +231,8 @@ def wrapper(*args, **kwargs): kwargs_serialized = { key: cloudpickle.dumps(value) for key, value in kwargs.items() } - func_serialized = cloudpickle.dumps(func) pickleable_func_with_pipe = PickleableFunctionWithPipeIO( - func_serialized, child_conn, venv_executable + func, child_conn, venv_executable ) # Start the context manager that will change the executable used by multiprocessing with MultiprocessingExecutableContextManager(venv_executable): @@ -227,7 +254,9 @@ def wrapper(*args, **kwargs): # Fetch exception and formatted traceback (list[str]) exception, tcb = result # Reraise an exception with the same class - logging.error(f"Error in child process \n{''.join(tcb)}") + logging.error( + f"[ run_in_venv ] Error in child process with the following traceback:\n{''.join(tcb)}" + ) raise exception return result diff --git a/flojoy/utils.py b/flojoy/utils.py index 6f3da39..29101e5 100644 --- a/flojoy/utils.py +++ b/flojoy/utils.py @@ -39,9 +39,9 @@ if sys.platform == "win32": - FLOJOY_CACHE_DIR = os.path.join(os.environ["APPDATA"], FLOJOY_DIR) + FLOJOY_CACHE_DIR = os.path.realpath(os.path.join(os.environ["APPDATA"], FLOJOY_DIR)) else: - FLOJOY_CACHE_DIR = os.path.join(os.environ["HOME"], FLOJOY_DIR) + FLOJOY_CACHE_DIR = os.path.realpath(os.path.join(os.environ["HOME"], FLOJOY_DIR)) # Make as a function to mock at test-time diff --git a/setup.py b/setup.py index fe837f4..3f2b6de 100644 --- a/setup.py +++ b/setup.py @@ -4,7 +4,7 @@ name="flojoy", packages=find_packages(exclude=["tests"]), package_data={"flojoy": ["__init__.pyi"]}, - version="0.1.5-dev15", + version="0.1.5-dev21", license="MIT", description="Python client library for Flojoy.", author="flojoy", diff --git a/tests/flojoy_node_venv_test_.py b/tests/flojoy_node_venv_test_.py index 95302bb..35dc2d3 100644 --- a/tests/flojoy_node_venv_test_.py +++ b/tests/flojoy_node_venv_test_.py @@ -11,7 +11,9 @@ # Define a fixture to patch tempfile.tempdir @pytest.fixture def mock_venv_cache_dir(): - _test_tempdir = os.path.join(tempfile.gettempdir(), "test_flojoy_node_venv") + _test_tempdir = os.path.realpath( + os.path.join(tempfile.gettempdir(), "test_flojoy_node_venv") + ) # Wipe the directory to be patched if it exists shutil.rmtree(_test_tempdir, ignore_errors=True) os.makedirs(_test_tempdir) @@ -48,7 +50,8 @@ def empty_function_with_jax(): # Test for executable assert sys_executable.startswith(mock_venv_cache_dir) # Test for sys.path - assert sys_path[-1].startswith(mock_venv_cache_dir) + assert sys_path[-1].startswith(os.path.dirname(__file__)) + assert sys_path[-2].startswith(mock_venv_cache_dir) # Test for package version assert packages_dict["jax"] == "0.4.13" @@ -74,7 +77,8 @@ def empty_function_with_flytekit(): # Test for executable assert sys_executable.startswith(mock_venv_cache_dir) # Test for sys.path - assert sys_path[-1].startswith(mock_venv_cache_dir) + assert sys_path[-1].startswith(os.path.dirname(__file__)) + assert sys_path[-2].startswith(mock_venv_cache_dir) # Test for package version assert packages_dict["flytekit"] == "1.8.2" @@ -101,7 +105,8 @@ def empty_function_with_opencv(): # Test for executable assert sys_executable.startswith(mock_venv_cache_dir) # Test for sys.path - assert sys_path[-1].startswith(mock_venv_cache_dir) + assert sys_path[-1].startswith(os.path.dirname(__file__)) + assert sys_path[-2].startswith(mock_venv_cache_dir) # Test for package version assert packages_dict["opencv-python-headless"] == "4.7.0.72" @@ -116,5 +121,35 @@ def empty_function_with_error(): return 1 / 0 # Run the function and expect an error - with pytest.raises(ZeroDivisionError): + with pytest.raises(ChildProcessError): empty_function_with_error() + + +@pytest.mark.parametrize("daemon", [True, False]) +def test_run_in_venv_runs_within_thread(mock_venv_cache_dir, daemon): + from threading import Thread + from queue import Queue + + def function_to_run_within_thread(queue): + from flojoy import run_in_venv + + @run_in_venv(pip_dependencies=["numpy==1.23.0"]) + def func_with_venv(): + import numpy as np + + return 42 + + # Run the function + queue.put(func_with_venv()) + + # Run the function in a thread + queue = Queue() + thread = Thread(target=function_to_run_within_thread, args=(queue,), daemon=daemon) + thread.start() + thread.join() + # Check that the thread has finished + assert not thread.is_alive() + # Check that there is something in the queue + assert not queue.empty() + # Check that the function has returned + assert queue.get(timeout=60) == 42