gen_to_unit incompatible with multiprocessing #85

Open
cboulay opened this issue Jan 26, 2024 · 3 comments

Comments

@cboulay
Collaborator

cboulay commented Jan 26, 2024

File "/Users/me/mambaforge/envs/ez/lib/python3.11/multiprocessing/reduction.py", line 60, in dump
ForkingPickler(file, protocol).dump(obj)
_pickle.PicklingError: Can't pickle <class 'abc.DummyUnit'>: attribute lookup DummyUnit on abc failed

But it works fine if I fully define DummySettings(ez.Settings) and Dummy(Gen).

I tried setting __module__ on the dynamic type created at the bottom of gen_to_unit:

        type(
            f"{func.__name__.capitalize()}Unit",
            (ez.Unit,),
            {
                ...,
                "__module__": getmodule(func),
                ...
            },
        ),

This gets past the first error and might be a good idea anyway, but a different error remains:

_pickle.PicklingError: Can't pickle <class 'DummyUnit'>: import of module <module 'ezmsg.chad.dummy' from '/Users/me/Code/tmp/ezmsg/ezmsg-chad/src/ezmsg/chad/dummy.py'> failed

Apparently we also (or only) need to provide __reduce__. Relevant SO answer.
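
A minimal sketch of what the two errors above may be pointing at, using only the standard library. pickle stores a class by reference (module name plus class name), so the class has to be importable at that path when it is pickled and unpickled. The second traceback prints the module as a module repr rather than a dotted name, which would be consistent with __module__ receiving the module object returned by getmodule(func) instead of a string; that is a reading of the message, not something confirmed in this thread. The names make_unit_class, can_pickle and dyn_units_demo are hypothetical stand-ins, not part of ezmsg.

```python
import pickle
import sys
import types

# Hypothetical stand-in module so the sketch does not depend on ezmsg.
demo_module = types.ModuleType("dyn_units_demo")
sys.modules[demo_module.__name__] = demo_module


def make_unit_class(name, module):
    # Mimics a dynamic type(...) call; `module` is stored as __module__ as given.
    return type(name, (object,), {"__module__": module})


def can_pickle(obj):
    try:
        pickle.dumps(obj)
        return True
    except Exception:  # the exact exception type varies between Python versions
        return False


# 1. __module__ is a module object rather than a string: pickle cannot import it.
with_module_object = can_pickle(make_unit_class("DummyUnit", demo_module))

# 2. __module__ is a string, but the module has no attribute named "DummyUnit".
DummyUnit = make_unit_class("DummyUnit", demo_module.__name__)
unbound = can_pickle(DummyUnit)

# 3. Same class, now bound in the module under its own __name__.
setattr(demo_module, DummyUnit.__name__, DummyUnit)
restored = pickle.loads(pickle.dumps(DummyUnit))

print(with_module_object, unbound, restored is DummyUnit)  # False False True
```

If this reading holds, a string __module__ alone would probably not be enough: going by the f-string in the snippet above, the generated class is named after the function plus "Unit", so it would also have to be reachable in that module under the generated name, or be pickled through a custom reducer such as __reduce__.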

@cboulay
Collaborator Author

cboulay commented Jan 26, 2024

I pip installed multiprocess, which is meant to be a drop-in replacement for multiprocessing that serializes with dill instead of pickle. I then changed the import statements in ezmsg.core.backendprocess to use multiprocess instead of multiprocessing. This works!

I don't know if this is the right solution for this project though.

@cboulay
Collaborator Author

cboulay commented Feb 2, 2024

@pperanich, minimal repro (it needs #90 to show the working case):

import asyncio
import ezmsg.core as ez
import numpy as np
from dataclasses import replace
from typing import Any, Generator
from ezmsg.util.messages.axisarray import AxisArray
from ezmsg.util.debuglog import DebugLog
from ezmsg.util.gen_to_unit import gen_to_unit
from ezmsg.util.generator import consumer


@consumer
def pow(n: float) -> Generator[AxisArray, AxisArray, None]:
    axis_arr_in = AxisArray(np.array([]), dims=[""])
    axis_arr_out = AxisArray(np.array([]), dims=[""])
    while True:
        axis_arr_in = yield axis_arr_out
        axis_arr_out = replace(axis_arr_in, data=axis_arr_in.data**n)


AutoPowSettings, AutoPow = gen_to_unit(pow)


class MessageSender(ez.Unit):
    OUTPUT = ez.OutputStream(Any)

    @ez.publisher(OUTPUT)
    async def send_data(self):
        yield self.OUTPUT, AxisArray(np.arange(100), dims=["data"])
        await asyncio.sleep(1)
        raise ez.NormalTermination


if __name__ == "__main__":
    comps = {
        "SEND": MessageSender(),
        "POW": AutoPow(3),
        "LOG": DebugLog()
    }
    conns = (
        (comps["SEND"].OUTPUT, comps["POW"].INPUT_SIGNAL),
        (comps["POW"].OUTPUT_SIGNAL, comps["LOG"].INPUT)
    )

    ez.run(
        components=comps,
        connections=conns,
        process_components=(comps["POW"],)  # Comment out this line and it works.
    )

@cboulay cboulay closed this as completed Feb 2, 2024
@cboulay cboulay reopened this Feb 2, 2024
@griffinmilsap
Collaborator

I can verify this is still an issue. Maybe the best path forward is to add multiprocess to the optional dependencies and use it if it is installed. We can add an environment variable to force use of the stdlib multiprocessing in case multiprocess was installed by some other package.
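
A rough sketch of how that optional-dependency fallback could look, not the project's actual implementation. The environment variable name EZMSG_FORCE_STDLIB_MULTIPROCESSING and the helper select_multiprocessing are hypothetical; the thread does not name either.

```python
import importlib
import os

# Hypothetical variable name; the thread does not specify one.
FORCE_STDLIB_ENV = "EZMSG_FORCE_STDLIB_MULTIPROCESSING"


def select_multiprocessing(environ=os.environ):
    """Return multiprocess if it is installed and not overridden, else the stdlib module."""
    force_stdlib = environ.get(FORCE_STDLIB_ENV, "").lower() in ("1", "true", "yes")
    if not force_stdlib:
        try:
            # Optional dependency: dill-based fork of multiprocessing.
            return importlib.import_module("multiprocess")
        except ImportError:
            pass  # not installed, fall through to the stdlib module
    return importlib.import_module("multiprocessing")


mp = select_multiprocessing()
print(mp.__name__)
```

Code that currently does "import multiprocessing" would then use the selected module object instead, so the choice is made in one place.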
