diff --git a/flojoy/flojoy_node_venv.py b/flojoy/flojoy_node_venv.py index 8c6a1dd..e43db2c 100644 --- a/flojoy/flojoy_node_venv.py +++ b/flojoy/flojoy_node_venv.py @@ -23,6 +23,7 @@ def TORCH_NODE(default: Matrix) -> Matrix: import importlib.metadata import logging import multiprocessing +import multiprocessing.connection import os import shutil import subprocess @@ -90,7 +91,7 @@ def _get_venv_syspath(venv_executable: os.PathLike) -> list[str]: class PickleableFunctionWithPipeIO: """A wrapper for a function that can be pickled and executed in a child process.""" - def __init__(self, func_serialized, child_conn, venv_executable): + def __init__(self, func_serialized: bytes, child_conn: multiprocessing.connection.Connection, venv_executable: str): self._func_serialized = func_serialized self._child_conn = child_conn self._venv_executable = venv_executable @@ -105,7 +106,7 @@ def __call__(self, *args_serialized, **kwargs_serialized): except Exception as e: result = (e, traceback.format_exception(type(e), e, e.__traceback__)) serialized_result = cloudpickle.dumps(result) - self._child_conn.send(serialized_result) + self._child_conn.send_bytes(serialized_result) def _get_venv_executable_path(venv_path: os.PathLike | str) -> os.PathLike | str: """Get the path to the python executable of the virtual environment.""" @@ -187,7 +188,7 @@ def wrapper(*args, **kwargs): # Start the process process.start() # Fetch result from the child process - serialized_result = parent_conn.recv() + serialized_result = parent_conn.recv_bytes() # Wait for the process to finish process.join() # Check if the process sent an exception with a traceback diff --git a/setup.py b/setup.py index b6b1ed3..4b784b3 100644 --- a/setup.py +++ b/setup.py @@ -4,7 +4,7 @@ name="flojoy", packages=find_packages(exclude=["tests"]), package_data={"flojoy": ["__init__.pyi"]}, - version="0.1.5-dev14", + version="0.1.5-dev15", license="MIT", description="Python client library for Flojoy.", author="flojoy",