Skip to content
This repository has been archived by the owner on Oct 10, 2023. It is now read-only.

[WIP] New node API: Venv support #59

Merged
merged 99 commits into from
Aug 1, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
99 commits
Select commit Hold shift + click to select a range
3662162
Don't load parameter manifest for param types
smahmed776 Jun 21, 2023
b3108f0
some formatting
smahmed776 Jun 21, 2023
de8d1aa
format codes
smahmed776 Jun 21, 2023
8d9e948
Add inherited datacontainer types
39bytes Jun 26, 2023
1b2a5cb
Update decorator to accept deps kwarg
39bytes Jun 26, 2023
4593b5c
Forgot to remove comment
39bytes Jun 26, 2023
7493007
Bump version
39bytes Jun 26, 2023
15a5932
Export DataContainer types
39bytes Jun 26, 2023
0c82c42
Fix decorator default param
39bytes Jun 26, 2023
925c0be
Small fixes
39bytes Jun 26, 2023
d1e98ae
Merge branch 'main' into jeff-autogen-manifest
39bytes Jun 26, 2023
7070fe4
Fix
39bytes Jun 26, 2023
9e4110f
Format
39bytes Jun 26, 2023
8204dd4
Fix type errors
39bytes Jun 26, 2023
849c543
Merge branch 'generate-command-section' into jeff-autogen-manifest
39bytes Jun 26, 2023
0fb9d14
attempt 3 times for fetching previous job result
smahmed776 Jun 26, 2023
51cbfe9
Merge branch 'main' into generate-command-section
smahmed776 Jun 26, 2023
e8189ab
Add node type to decorator
39bytes Jun 28, 2023
f6714c5
Let studio backend decide what to do when directions are not specified
39bytes Jun 28, 2023
dde6c93
LOOP and CONDITIONAL working (#48)
smahmed776 Jul 5, 2023
38cee77
Support multi inputs (#49)
39bytes Jul 5, 2023
62dae20
Add method for parsing all list types (#51)
smahmed776 Jul 6, 2023
e3e8587
migrate node_sdk and dao to flojoy package
izi-on Jul 6, 2023
877f489
migrate job service
izi-on Jul 6, 2023
47ea45f
fix imports
izi-on Jul 6, 2023
e05918c
Merge branch 'migrate-node-sdk' into generate-command-section
izi-on Jul 6, 2023
5ccc983
reduce waiting time for fetching job input
izi-on Jul 6, 2023
9e91000
added extra param for all dc types
youngsun4786 Jul 7, 2023
0b53327
fixed default input
youngsun4786 Jul 7, 2023
040c019
Merge pull request #53 from flojoy-io/extra-parameter-nick
smahmed776 Jul 7, 2023
584d052
checkpoint
izi-on Jul 7, 2023
b7f7ae2
add missing locks
izi-on Jul 7, 2023
e09355b
rewrite job service
izi-on Jul 8, 2023
6ca4410
no redis version
izi-on Jul 9, 2023
ec3a3d9
removed some useless print statements
izi-on Jul 9, 2023
58a9231
small tweaks to memory
izi-on Jul 9, 2023
4bfd3bb
more adjustments to memory
izi-on Jul 9, 2023
1534bec
raise exception when no object found from datastore
izi-on Jul 9, 2023
6dd7826
fix return type of get_job_function
izi-on Jul 10, 2023
e59c8e8
get rid of redis entirely
izi-on Jul 10, 2023
327f5e0
Merge branch 'generate-command-section' into new-node-api
39bytes Jul 11, 2023
2b1e23a
export smallmemory from package and update some functions
smahmed776 Jul 12, 2023
4200414
node init implemented
izi-on Jul 13, 2023
b24f52f
add @run_in_venv decorator
Roulbac Jul 14, 2023
66e55d6
add cloudpickle to reqs
Roulbac Jul 14, 2023
74a67f0
implement node init
izi-on Jul 14, 2023
873fa81
pin flojoy and cloudpickle versions to those of parent process
Roulbac Jul 14, 2023
8cf6136
delete venv on failed install
Roulbac Jul 14, 2023
6cf4b56
merge new-node api
izi-on Jul 14, 2023
1961168
add multiplatform CI for flojoy_node_env
Roulbac Jul 14, 2023
e8381fb
fix CI pytest
Roulbac Jul 14, 2023
b4cb69b
fix on.push.branches
Roulbac Jul 14, 2023
255bd61
fix wf name
Roulbac Jul 14, 2023
86201cd
update test_flojoy_node_env.py
Roulbac Jul 14, 2023
c84c628
empty commit to trigger wf
Roulbac Jul 14, 2023
32d987c
fix ci
Roulbac Jul 14, 2023
dcc7f59
fix ci
Roulbac Jul 14, 2023
624658d
fix tempfile.gettempdir patch
Roulbac Jul 14, 2023
27a6158
fix tempfile.gettempdir patch
Roulbac Jul 14, 2023
d75f94c
fix test
Roulbac Jul 14, 2023
bb03b53
add start_method override to MultiprocessingExecutableContextManager
Roulbac Jul 14, 2023
4b6c930
force set_start_method
Roulbac Jul 14, 2023
6fd4d26
add support for win32 python executable path
Roulbac Jul 14, 2023
e30f1a2
fix mock_tempdir for windows
Roulbac Jul 14, 2023
356178f
add 2 more tests
Roulbac Jul 14, 2023
18fd20a
add --runslow option for pytest
Roulbac Jul 14, 2023
f9356a7
fix automated test
Roulbac Jul 14, 2023
4914fc5
merge new-node-api in current
Roulbac Jul 14, 2023
6786755
merge new-node-api in current
Roulbac Jul 14, 2023
4c29ed4
cache in FLOJOY_CACHE_DIR
Roulbac Jul 14, 2023
a2a41c6
add docstr to run_in_venv
Roulbac Jul 14, 2023
dedb2b8
Merge pull request #56 from flojoy-io/reda-pip-env
jackparmer Jul 14, 2023
c77b704
Update flojoy_node_venv.py
Roulbac Jul 14, 2023
4f2e3ea
publish from current branch
smahmed776 Jul 14, 2023
b0a033e
Merge branch 'new-node-api' of github.com:flojoy-io/python into new-n…
smahmed776 Jul 14, 2023
07c68c0
add vector type here
39bytes Jul 14, 2023
6c55750
add flojoy hf_hub_download
Roulbac Jul 15, 2023
08748c7
change default cache path
Roulbac Jul 15, 2023
9f59c5c
bump pypi version to dev6
Roulbac Jul 15, 2023
41d79df
Merge pull request #60 from flojoy-io/reda-hf-hub-download
Roulbac Jul 15, 2023
763abf0
include typestub in pypi dist and clean __init__.pyi
Roulbac Jul 17, 2023
7014d3c
up version to dev7
Roulbac Jul 17, 2023
31c3094
fix stubs
Roulbac Jul 17, 2023
fe69544
add missing imports to pyi
Roulbac Jul 17, 2023
b6a150b
add snapshot_download
Roulbac Jul 17, 2023
e6e6321
Merge branch 'main' into new-node-api
smahmed776 Jul 17, 2023
9f8525f
bump version to dev11
smahmed776 Jul 17, 2023
7cd5d54
add surface case to plotly utils
smahmed776 Jul 17, 2023
126b1d1
add error handing from run_in_venv
Roulbac Jul 18, 2023
87fb27f
fix typo
Roulbac Jul 18, 2023
9e45426
add hfhub to reqs
Roulbac Jul 18, 2023
e0ed1dd
install flojoy in automated test
Roulbac Jul 18, 2023
4d08e7b
move flytekit to 1.6.2
Roulbac Jul 18, 2023
260f422
omit flyte test because of a bug
Roulbac Jul 18, 2023
d428115
Merge pull request #61 from flojoy-io/reda-fix-run-in-venv-hang
Roulbac Jul 19, 2023
d06e291
Update setup.py
Roulbac Jul 19, 2023
a31b83c
replace run_in_venv recv_bytes by recv
Roulbac Jul 31, 2023
e492415
align run_in_venv pipe IO to read/send bytes
Roulbac Jul 31, 2023
0604f4c
Add new field in result object called "text_blob" for TextBlob DC typ…
smahmed776 Aug 1, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/python-publish.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ on:
push:
branches:
- main
- new-node-api

permissions:
contents: read
Expand Down
83 changes: 83 additions & 0 deletions .github/workflows/test-flojoy-node-env.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
name: Test flojoy_node_env

on:
push:
branches:
- "main"
paths:
- "flojoy/flojoy_node_env.py"
- "tests/flojoy_node_venv_test_.py"

pull_request:
paths:
- "flojoy/flojoy_node_env.py"
- "tests/flojoy_node_venv_test_.py"

workflow_dispatch:

jobs:
ubuntu:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
with:
submodules: recursive

- uses: actions/setup-python@v4
with:
python-version: "3.10"
cache: "pip"
- name: Install pip dependencies
run: |
pip install ruff pytest
pip install -r requirements.txt
pip install -e .

- name: Run python tests
run: python -m pytest -vv tests/flojoy_node_venv_test_.py --runslow

macos:
runs-on: macos-latest
steps:
- uses: actions/checkout@v3
with:
submodules: recursive

- uses: actions/setup-python@v4
with:
python-version: "3.10"
cache: "pip"
- name: Install pip dependencies
run: |
pip install ruff pytest
pip install -r requirements.txt
pip install -e .

- name: Run python tests
run: python -m pytest -vv tests/flojoy_node_venv_test_.py --runslow

windows:
runs-on: windows-latest
steps:
- uses: actions/checkout@v3
with:
submodules: recursive

- uses: actions/setup-python@v4
with:
python-version: "3.10"
cache: "pip"

- name: Install pip dependencies
run: |
pip install ruff pytest
pip install -r requirements.txt
pip install -e .
shell: powershell

- name: Run python tests
run: python -m pytest -vv tests/flojoy_node_venv_test_.py --runslow
shell: powershell



2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ from flojoy import flojoy, DataContainer
def BUTTER(v, params):
''' Apply a butterworth filter to an input vector '''

print('Butterworth inputs:', v)
logger.debug('Butterworth inputs:', v)

x = v[0].x
sig = v[0].y
Expand Down
4 changes: 4 additions & 0 deletions flojoy/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,7 @@
from .utils import *
from .parameter_types import *
from .small_memory import *
from .flojoy_node_venv import *
from .job_service import *
from .node_init import *
from .config import *
281 changes: 278 additions & 3 deletions flojoy/__init__.pyi

Large diffs are not rendered by default.

19 changes: 19 additions & 0 deletions flojoy/config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
import logging

LOGGER_NAME = "flojoy"


class FlojoyConfig:
_instance = None

@classmethod
def get_instance(cls):
if cls._instance is None:
cls._instance = FlojoyConfig()
return cls._instance

def __init__(self):
self.is_offline = False


logger = logging.getLogger(LOGGER_NAME)
123 changes: 92 additions & 31 deletions flojoy/dao.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,17 @@
import numpy as np
import pandas as pd
from typing import Any
from typing import Any, Callable
from threading import Lock
from .data_container import DCNpArrayType

MAX_LIST_SIZE = 1000

_dict_sm_lock = Lock() # dict small memory lock
_dict_job_lock = Lock() # dict job lock
_dict_node_init_container_lock = Lock()
_dict_node_init_func_lock = Lock()
_init_lock = Lock()

"""
Used by clients to create a new instance of the datastorage
"""
Expand All @@ -25,121 +31,176 @@ def create_storage():

class Dao:
_instance = None
_init_lock = Lock()
_dict_sm_lock = Lock() # dict small memory lock
_dict_job_lock = Lock() # dict job lock

@classmethod
def get_instance(cls):
with cls._init_lock:
with _init_lock:
if Dao._instance is None:
Dao._instance = Dao()
return Dao._instance

def __init__(self):
self.storage: dict[str, Any] = {} # small memory
self.job_results: dict[str, Any] = {}
self.storage = {} # small memory
self.job_results = {}
self.node_init_container = {}
self.node_init_func = {}

"""
METHODS FOR JOB RESULTS
"""

def get_job_result(self, job_id: str) -> Any | None:
res = self.job_results.get(job_id, None)
with _dict_job_lock:
res = self.job_results.get(job_id, None)
if res is None:
raise ValueError(f"Job result with id {job_id} does not exist")
return res

def post_job_result(self, job_id: str, result: Any):
with self._dict_job_lock:
with _dict_job_lock:
self.job_results[job_id] = result

def clear_job_results(self):
with self._dict_job_lock:
with _dict_job_lock:
self.job_results.clear()

def job_exists(self, job_id: str) -> bool:
return job_id in self.job_results.keys()
with _dict_job_lock:
return job_id in self.job_results.keys()

def delete_job(self, job_id: str):
with self._dict_job_lock:
with _dict_job_lock:
self.job_results.pop(job_id, None)

"""
METHODS FOR SMALL MEMORY
"""

def clear_small_memory(self):
with self._dict_sm_lock:
with _dict_sm_lock:
self.storage.clear()

def check_if_valid(self, result: Any | None, expected_type: Any):
if result is not None and not isinstance(result, expected_type):
raise ValueError(
f"Expected {expected_type} type, but got {type(result)} instead!"
)
with _dict_sm_lock:
if result is not None and not isinstance(result, expected_type):
raise ValueError(
f"Expected {expected_type} type, but got {type(result)} instead!"
)

def set_np_array(self, memo_key: str, value: DCNpArrayType):
with self._dict_sm_lock:
with _dict_sm_lock:
self.storage[memo_key] = value

def set_pandas_dataframe(self, key: str, dframe: pd.DataFrame):
with self._dict_sm_lock:
with _dict_sm_lock:
self.storage[key] = dframe

def set_str(self, key: str, value: str):
with self._dict_sm_lock:
with _dict_sm_lock:
self.storage[key] = value

def get_pd_dataframe(self, key: str) -> pd.DataFrame | None:
encoded = self.storage.get(key, None)
with _dict_sm_lock:
encoded = self.storage.get(key, None)
self.check_if_valid(encoded, pd.DataFrame)
return encoded

def get_np_array(self, memo_key: str) -> DCNpArrayType | None:
encoded = self.storage.get(memo_key, None)
with _dict_sm_lock:
encoded = self.storage.get(memo_key, None)
self.check_if_valid(encoded, np.ndarray)
return encoded

def get_str(self, key: str) -> str | None:
encoded = self.storage.get(key, None)
with _dict_sm_lock:
encoded = self.storage.get(key, None)
return encoded

def get_obj(self, key: str) -> dict[str, Any] | None:
r_obj = self.storage.get(key, None)
with _dict_sm_lock:
r_obj = self.storage.get(key, None)
self.check_if_valid(r_obj, dict)
return r_obj

def set_obj(self, key: str, value: dict[str, Any]):
with self._dict_sm_lock:
with _dict_sm_lock:
self.storage[key] = value

def delete_object(self, key: str):
with self._dict_sm_lock:
with _dict_sm_lock:
self.storage.pop(key)

def remove_item_from_set(self, key: str, item: Any):
res = self.storage.get(key, None)
with _dict_sm_lock:
res = self.storage.get(key, None)
self.check_if_valid(res, set)
if not res:
return
with self._dict_sm_lock:
with _dict_sm_lock:
res.remove(item)

def add_to_set(self, key: str, value: Any):
res: set[Any] | None = self.storage.get(key, None)
with _dict_sm_lock:
res: set[Any] | None = self.storage.get(key, None)
if res is None:
res = set()
res.add(value)
self.storage[key] = res
return
self.check_if_valid(res, set)
with self._dict_sm_lock:
with _dict_sm_lock:
res.add(value)

def get_set_list(self, key: str) -> list[Any] | None:
res = self.storage.get(key, None)
with _dict_sm_lock:
res = self.storage.get(key, None)
if res is None:
return None
self.check_if_valid(res, set)
return list(res)

"""
METHODS FOR NODE INIT
"""

# -- for node container --
def clear_node_init_containers(self):
with _dict_node_init_container_lock:
self.node_init_container.clear()

def set_init_container(self, node_id: str, value):
with _dict_node_init_container_lock:
self.node_init_container[node_id] = value

def get_init_container(self, node_id: str):
with _dict_node_init_container_lock:
res = self.node_init_container.get(node_id, None)
from .node_init import NodeInitContainer # avoid circular import

self.check_if_valid(res, NodeInitContainer)
return res

def has_init_container(self, node_id: str) -> bool:
with _dict_node_init_container_lock:
return node_id in self.node_init_container.keys()

# ------------------------

# -- for node init function --
def set_init_function(self, node_func, node_init_func):
with _dict_node_init_func_lock:
self.node_init_func[node_func] = node_init_func

def get_init_function(self, node_func: Callable):
with _dict_node_init_func_lock:
res = self.node_init_func.get(node_func, None)
from .node_init import NodeInit # avoid circular import

self.check_if_valid(res, NodeInit)
return res

def has_init_function(self, node_func) -> bool:
with _dict_node_init_func_lock:
return node_func in self.node_init_func.keys()

# ----------------------------
Loading