Skip to content
This repository has been archived by the owner on Oct 10, 2023. It is now read-only.

Fix exception handling for run_in_venv #66

Merged
merged 25 commits into from
Aug 14, 2023
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
6de42b9
scope the pickling into the exception handling
Roulbac Aug 2, 2023
a950121
fix typo in CI paths for flojoy_node_venv
Roulbac Aug 2, 2023
cd64171
add clarity in error logs for run_in_venv
Roulbac Aug 2, 2023
db69473
remove trailing log line
Roulbac Aug 2, 2023
e69da43
throw childprocesserror when run_in_venv function fails
Roulbac Aug 2, 2023
70730e2
minor logging clarification
Roulbac Aug 2, 2023
1be1c19
bump version to dev17
Roulbac Aug 2, 2023
c660dfd
apply black formatter
Roulbac Aug 2, 2023
ea44bc0
move func serialization within PickleableFunctionWithPipeIO
Roulbac Aug 2, 2023
4afa182
bump to dev18
Roulbac Aug 2, 2023
e9baee0
run black formatter
Roulbac Aug 2, 2023
646a7b9
fix test for sys_path in run_in_venv
Roulbac Aug 2, 2023
d419d62
add test to call a run_in_venv decorated function within a Thread
Roulbac Aug 2, 2023
76af006
parametrize the thread test with daemon
Roulbac Aug 2, 2023
2cdea65
run black formatter
Roulbac Aug 2, 2023
4a90fe0
use realpath for flojoy cache
Roulbac Aug 3, 2023
d5db7e8
Merge remote-tracking branch 'origin/main' into reda-fix-run-in-venv-…
Roulbac Aug 7, 2023
9a144c4
bump to dev20
Roulbac Aug 7, 2023
b75d8f9
apply black formatter
Roulbac Aug 7, 2023
3d921b8
hash the pip venv to be shorter
Roulbac Aug 8, 2023
67ed13e
bump to dev21
Roulbac Aug 8, 2023
7479d31
add todo
Roulbac Aug 13, 2023
e00577c
Merge remote-tracking branch 'origin/main' into reda-fix-run-in-venv-…
Roulbac Aug 13, 2023
1234be0
Update flojoy/flojoy_node_venv.py
Roulbac Aug 14, 2023
ae12533
style
itsjoeoui Aug 14, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 8 additions & 50 deletions .github/workflows/test-flojoy-node-env.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,22 @@ on:
branches:
- "main"
paths:
- "flojoy/flojoy_node_env.py"
- "flojoy/flojoy_node_venv.py"
- "tests/flojoy_node_venv_test_.py"

pull_request:
paths:
- "flojoy/flojoy_node_env.py"
- "flojoy/flojoy_node_venv.py"
- "tests/flojoy_node_venv_test_.py"

workflow_dispatch:

jobs:
ubuntu:
runs-on: ubuntu-latest
pytest:
strategy:
matrix:
os: [ubuntu, macos, windows]
runs-on: ${{ matrix.os }}-latest
steps:
- uses: actions/checkout@v3
with:
Expand All @@ -34,50 +37,5 @@ jobs:
pip install -e .

- name: Run python tests
run: python -m pytest -vv tests/flojoy_node_venv_test_.py --runslow

macos:
runs-on: macos-latest
steps:
- uses: actions/checkout@v3
with:
submodules: recursive

- uses: actions/setup-python@v4
with:
python-version: "3.10"
cache: "pip"
- name: Install pip dependencies
run: |
pip install ruff pytest
pip install -r requirements.txt
pip install -e .

- name: Run python tests
run: python -m pytest -vv tests/flojoy_node_venv_test_.py --runslow

windows:
runs-on: windows-latest
steps:
- uses: actions/checkout@v3
with:
submodules: recursive

- uses: actions/setup-python@v4
with:
python-version: "3.10"
cache: "pip"

- name: Install pip dependencies
run: |
pip install ruff pytest
pip install -r requirements.txt
pip install -e .
shell: powershell

- name: Run python tests
run: python -m pytest -vv tests/flojoy_node_venv_test_.py --runslow
shell: powershell



python -m pytest -vv tests/flojoy_node_venv_test_.py --runslow
64 changes: 46 additions & 18 deletions flojoy/flojoy_node_venv.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,11 @@ def TORCH_NODE(default: Matrix) -> Matrix:
return Matrix(...)

"""
from typing import Callable

import hashlib
import importlib.metadata
import inspect
import logging
import multiprocessing
import multiprocessing.connection
Expand Down Expand Up @@ -62,13 +65,14 @@ def __exit__(self, exc_type, exc_val, exc_tb):
class SwapSysPath:
"""Temporarily swap the sys.path of the child process with the sys.path of the parent process."""

def __init__(self, venv_executable):
def __init__(self, venv_executable, extra_sys_path):
self.new_path = _get_venv_syspath(venv_executable)
self.extra_sys_path = [] if extra_sys_path is None else extra_sys_path
self.old_path = None

def __enter__(self):
self.old_path = sys.path
sys.path = self.new_path
sys.path = self.new_path + self.extra_sys_path

def __exit__(self, exc_type, exc_val, exc_tb):
sys.path = self.old_path
Expand All @@ -82,7 +86,13 @@ def _install_pip_dependencies(
if not verbose:
command += ["-q", "-q"]
command += list(pip_dependencies)
subprocess.check_call(command)
result = subprocess.run(
command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, check=True
)
if verbose:
# Log every line if verbose, prefix with [pip]
for line in result.stdout.decode().splitlines():
logging.info(f"[ _install_pip_dependencies ] {line}")


def _get_venv_syspath(venv_executable: os.PathLike) -> list[str]:
Expand All @@ -97,26 +107,40 @@ class PickleableFunctionWithPipeIO:

def __init__(
self,
func_serialized: bytes,
func: Callable,
child_conn: multiprocessing.connection.Connection,
venv_executable: str,
):
self._func_serialized = func_serialized
self._func_serialized = cloudpickle.dumps(func)
func_module_path = os.path.dirname(os.path.realpath(inspect.getabsfile(func)))
# Check that the function is in a directory indeed
self._extra_sys_path = (
[func_module_path] if os.path.isdir(func_module_path) else None
)
self._child_conn = child_conn
self._venv_executable = venv_executable

def __call__(self, *args_serialized, **kwargs_serialized):
fn = cloudpickle.loads(self._func_serialized)
args = [cloudpickle.loads(arg) for arg in args_serialized]
kwargs = {
key: cloudpickle.loads(value) for key, value in kwargs_serialized.items()
}
with SwapSysPath(venv_executable=self._venv_executable):
with SwapSysPath(
venv_executable=self._venv_executable, extra_sys_path=self._extra_sys_path
):
try:
result = fn(*args, **kwargs)
fn = cloudpickle.loads(self._func_serialized)
args = [cloudpickle.loads(arg) for arg in args_serialized]
kwargs = {
key: cloudpickle.loads(value)
for key, value in kwargs_serialized.items()
}
serialized_result = cloudpickle.dumps(fn(*args, **kwargs))
except Exception as e:
result = (e, traceback.format_exception(type(e), e, e.__traceback__))
serialized_result = cloudpickle.dumps(result)
# Not all exceptions are expected to be picklable
# so we clone their traceback and send our own custom type of exception
exc = ChildProcessError(
f"Child process failed with an exception of type {type(e)}."
).with_traceback(e.__traceback__)
serialized_result = cloudpickle.dumps(
(exc, traceback.format_exception(type(e), e, e.__traceback__))
)
self._child_conn.send_bytes(serialized_result)


Expand Down Expand Up @@ -189,8 +213,11 @@ def TORCH_NODE(default: Matrix) -> Matrix:
except subprocess.CalledProcessError as e:
shutil.rmtree(venv_path)
logging.error(
f"Failed to install pip dependencies into virtual environment from the provided list: {pip_dependencies}"
f"[ _install_pip_dependencies ] Failed to install pip dependencies into virtual environment from the provided list: {pip_dependencies}. The virtual environment under {venv_path} has been deleted."
)
# Log every line of e.stderr
for line in e.stderr.decode().splitlines():
logging.error(f"[ _install_pip_dependencies ] {line}")
raise e

# Define the decorator
Expand All @@ -203,9 +230,8 @@ def wrapper(*args, **kwargs):
kwargs_serialized = {
key: cloudpickle.dumps(value) for key, value in kwargs.items()
}
func_serialized = cloudpickle.dumps(func)
pickleable_func_with_pipe = PickleableFunctionWithPipeIO(
func_serialized, child_conn, venv_executable
func, child_conn, venv_executable
)
# Start the context manager that will change the executable used by multiprocessing
with MultiprocessingExecutableContextManager(venv_executable):
Expand All @@ -227,7 +253,9 @@ def wrapper(*args, **kwargs):
# Fetch exception and formatted traceback (list[str])
exception, tcb = result
# Reraise an exception with the same class
logging.error(f"Error in child process \n{''.join(tcb)}")
logging.error(
f"[ run_in_venv ] Error in child process with the following traceback:\n{''.join(tcb)}"
)
raise exception
return result

Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
name="flojoy",
packages=find_packages(exclude=["tests"]),
package_data={"flojoy": ["__init__.pyi"]},
version="0.1.5-dev15",
version="0.1.5-dev18",
license="MIT",
description="Python client library for Flojoy.",
author="flojoy",
Expand Down
45 changes: 41 additions & 4 deletions tests/flojoy_node_venv_test_.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ def empty_function_with_jax():
# Test for executable
assert sys_executable.startswith(mock_venv_cache_dir)
# Test for sys.path
assert sys_path[-1].startswith(mock_venv_cache_dir)
assert sys_path[-1].startswith(os.path.dirname(__file__))
assert sys_path[-2].startswith(mock_venv_cache_dir)
# Test for package version
assert packages_dict["jax"] == "0.4.13"

Expand All @@ -76,7 +77,8 @@ def empty_function_with_flytekit():
# Test for executable
assert sys_executable.startswith(mock_venv_cache_dir)
# Test for sys.path
assert sys_path[-1].startswith(mock_venv_cache_dir)
assert sys_path[-1].startswith(os.path.dirname(__file__))
assert sys_path[-2].startswith(mock_venv_cache_dir)
# Test for package version
assert packages_dict["flytekit"] == "1.8.2"

Expand Down Expand Up @@ -104,7 +106,8 @@ def empty_function_with_opencv():
# Test for executable
assert sys_executable.startswith(mock_venv_cache_dir)
# Test for sys.path
assert sys_path[-1].startswith(mock_venv_cache_dir)
assert sys_path[-1].startswith(os.path.dirname(__file__))
assert sys_path[-2].startswith(mock_venv_cache_dir)
# Test for package version
assert packages_dict["opencv-python-headless"] == "4.7.0.72"

Expand All @@ -119,5 +122,39 @@ def empty_function_with_error():
return 1 / 0

# Run the function and expect an error
with pytest.raises(ZeroDivisionError):
with pytest.raises(ChildProcessError):
empty_function_with_error()


def test_run_in_venv_runs_within_thread(mock_venv_cache_dir):
Roulbac marked this conversation as resolved.
Show resolved Hide resolved

from threading import Thread
from queue import Queue

def function_to_run_within_thread(queue):

from flojoy import run_in_venv

@run_in_venv(
pip_dependencies=["numpy==1.23.0"]
)
def func_with_venv():
import numpy as np

return 42

# Run the function
queue.put(func_with_venv())

# Run the function in a thread
queue = Queue()
thread = Thread(target=function_to_run_within_thread, args=(queue,))
thread.start()
thread.join()
# Check that the thread has finished
assert not thread.is_alive()
# Check that there is something in the queue
assert not queue.empty()
# Check that the function has returned
assert queue.get(timeout=60) == 42

Loading