Skip to content
This repository has been archived by the owner on Oct 10, 2023. It is now read-only.

Commit

Permalink
Add new field in result object called "text_blob" for TextBlob DC type (#64)
Browse files Browse the repository at this point in the history

* bytes type and base64 encoder

* fix support for old node params typing

* text_blob dtype

* add bytes class

* add textblob class support

* offline mode for precompilation

* merge node-init into main

* get rid of useless var

* add lock for read operations in dao

* fix/hacky invalid value plotly error

* style

* option to toggle print

* switch to logging instead

* style

* add new field in result object called text_blob

* return text_blob for both text_blob and bytes type

---------

Co-authored-by: Vinicius <[email protected]>
Co-authored-by: Luiz Tauffer <[email protected]>
Co-authored-by: Hristo <[email protected]>
Co-authored-by: Joey Yu <[email protected]>
  • Loading branch information
5 people authored Aug 1, 2023
1 parent e492415 commit 0604f4c
Show file tree
Hide file tree
Showing 19 changed files with 402 additions and 229 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ from flojoy import flojoy, DataContainer
def BUTTER(v, params):
''' Apply a butterworth filter to an input vector '''

print('Butterworth inputs:', v)
logger.debug('Butterworth inputs: %s', v)

x = v[0].x
sig = v[0].y
Expand Down
1 change: 1 addition & 0 deletions flojoy/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,4 @@
from .flojoy_node_venv import *
from .job_service import *
from .node_init import *
from .config import *
4 changes: 2 additions & 2 deletions flojoy/__init__.pyi
Original file line number Diff line number Diff line change
@@ -1,20 +1,20 @@
from typing import Optional, Union, Any, Literal, TypedDict
from pathlib import Path

from .data_container import *
from .flojoy_python import *
from .job_result_builder import *
from .flojoy_instruction import *
from .plotly_utils import *
from .module_scraper import *
from .job_result_utils import *
from .data_container import *
from .utils import *
from .parameter_types import *
from .small_memory import *
from .flojoy_node_venv import *
from .job_service import *
from .node_init import *
from .data_container import *
from .config import *

def hf_hub_download(
repo_id: str,
Expand Down
19 changes: 19 additions & 0 deletions flojoy/config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
import logging

# Name passed to logging.getLogger() when the module-level `logger` is created.
LOGGER_NAME = "flojoy"


class FlojoyConfig:
    """Configuration holder exposed through a lazily created shared instance.

    Call ``FlojoyConfig.get_instance()`` to obtain the shared object;
    constructing the class directly yields an independent instance.
    """

    # Cached shared instance; stays None until get_instance() is first called.
    _instance = None

    def __init__(self):
        """Initialise the configuration with the offline flag turned off."""
        self.is_offline = False

    @classmethod
    def get_instance(cls):
        """Return the shared configuration object, building it on first use."""
        if cls._instance is not None:
            return cls._instance
        cls._instance = FlojoyConfig()
        return cls._instance


# Module-level logger obtained from the standard logging registry under LOGGER_NAME.
logger = logging.getLogger(LOGGER_NAME)
157 changes: 76 additions & 81 deletions flojoy/dao.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,17 @@
import numpy as np
import pandas as pd
from typing import Any, Callable, cast
from typing import Any, Callable
from threading import Lock
from .data_container import DCNpArrayType

MAX_LIST_SIZE = 1000

# Module-level locks used by Dao. Each one is taken around accesses to a
# single Dao dictionary (or, for _init_lock, around singleton creation).
# Guards Dao.storage.
_dict_sm_lock = Lock() # dict small memory lock
# Guards Dao.job_results.
_dict_job_lock = Lock() # dict job lock
# Guards Dao.node_init_container.
_dict_node_init_container_lock = Lock()
# Guards Dao.node_init_func.
_dict_node_init_func_lock = Lock()
# Taken in Dao.get_instance() while the singleton is looked up / created.
_init_lock = Lock()

"""
Used by clients to create a new instance of the datastorage
"""
Expand All @@ -24,188 +31,176 @@ def create_storage():

class Dao:
    """In-memory data access object shared through a lazily created singleton.

    The instance owns four dictionaries:

    * ``storage`` - "small memory" key/value store,
    * ``job_results`` - results keyed by job id,
    * ``node_init_container`` - init containers keyed by node id,
    * ``node_init_func`` - init functions keyed by node function.

    Every access to one of these dictionaries is made while holding the
    matching module-level lock (``_dict_sm_lock``, ``_dict_job_lock``,
    ``_dict_node_init_container_lock``, ``_dict_node_init_func_lock``).
    These are plain, non-reentrant ``threading.Lock`` objects, so no method
    that takes a lock may be called while that same lock is already held.
    """

    # Shared instance; created on the first get_instance() call.
    _instance = None

    @classmethod
    def get_instance(cls):
        """Return the shared ``Dao``, creating it on first use.

        The lookup and creation happen under ``_init_lock`` so concurrent
        callers cannot each build their own instance.
        """
        with _init_lock:
            if Dao._instance is None:
                Dao._instance = Dao()
            return Dao._instance

    def __init__(self):
        """Create the four empty backing dictionaries."""
        self.storage = {}  # small memory
        self.job_results = {}
        self.node_init_container = {}
        self.node_init_func = {}

    # ------------------------------------------------------------------
    # METHODS FOR JOB RESULTS
    # ------------------------------------------------------------------

    def get_job_result(self, job_id: str) -> Any | None:
        """Return the stored result for ``job_id``.

        Raises:
            ValueError: if no result is stored under ``job_id`` (a stored
                ``None`` is indistinguishable from a missing entry).
        """
        with _dict_job_lock:
            res = self.job_results.get(job_id, None)
        if res is None:
            raise ValueError(f"Job result with id {job_id} does not exist")
        return res

    def post_job_result(self, job_id: str, result: Any):
        """Store ``result`` under ``job_id``, replacing any previous value."""
        with _dict_job_lock:
            self.job_results[job_id] = result

    def clear_job_results(self):
        """Remove every stored job result."""
        with _dict_job_lock:
            self.job_results.clear()

    def job_exists(self, job_id: str) -> bool:
        """Return True when a result is stored under ``job_id``."""
        with _dict_job_lock:
            return job_id in self.job_results

    def delete_job(self, job_id: str):
        """Remove the result for ``job_id``; a missing id is ignored."""
        with _dict_job_lock:
            self.job_results.pop(job_id, None)

    # ------------------------------------------------------------------
    # METHODS FOR SMALL MEMORY
    # ------------------------------------------------------------------

    def clear_small_memory(self):
        """Remove every entry from the small-memory store."""
        with _dict_sm_lock:
            self.storage.clear()

    def check_if_valid(self, result: Any | None, expected_type: Any):
        """Validate that ``result`` is ``None`` or an ``expected_type`` instance.

        This is a pure check on its arguments and deliberately takes no
        lock: the module locks are non-reentrant, and this helper is called
        from code paths that already hold one of them.

        Raises:
            ValueError: if ``result`` is not ``None`` and not an instance of
                ``expected_type``.
        """
        if result is not None and not isinstance(result, expected_type):
            raise ValueError(
                f"Expected {expected_type} type, but got {type(result)} instead!"
            )

    def set_np_array(self, memo_key: str, value: DCNpArrayType):
        """Store the array ``value`` under ``memo_key``."""
        with _dict_sm_lock:
            self.storage[memo_key] = value

    def set_pandas_dataframe(self, key: str, dframe: pd.DataFrame):
        """Store the dataframe ``dframe`` under ``key``."""
        with _dict_sm_lock:
            self.storage[key] = dframe

    def set_str(self, key: str, value: str):
        """Store the string ``value`` under ``key``."""
        with _dict_sm_lock:
            self.storage[key] = value

    def get_pd_dataframe(self, key: str) -> pd.DataFrame | None:
        """Return the dataframe stored under ``key``, or ``None`` if absent.

        Raises:
            ValueError: if the stored value is not a ``pd.DataFrame``.
        """
        with _dict_sm_lock:
            encoded = self.storage.get(key, None)
        self.check_if_valid(encoded, pd.DataFrame)
        return encoded

    def get_np_array(self, memo_key: str) -> DCNpArrayType | None:
        """Return the array stored under ``memo_key``, or ``None`` if absent.

        Raises:
            ValueError: if the stored value is not an ``np.ndarray``.
        """
        with _dict_sm_lock:
            encoded = self.storage.get(memo_key, None)
        self.check_if_valid(encoded, np.ndarray)
        return encoded

    def get_str(self, key: str) -> str | None:
        """Return the value stored under ``key``, or ``None`` if absent.

        No type validation is performed on the stored value.
        """
        with _dict_sm_lock:
            return self.storage.get(key, None)

    def get_obj(self, key: str) -> dict[str, Any] | None:
        """Return the dict stored under ``key``, or ``None`` if absent.

        Raises:
            ValueError: if the stored value is not a ``dict``.
        """
        with _dict_sm_lock:
            r_obj = self.storage.get(key, None)
        self.check_if_valid(r_obj, dict)
        return r_obj

    def set_obj(self, key: str, value: dict[str, Any]):
        """Store the dict ``value`` under ``key``."""
        with _dict_sm_lock:
            self.storage[key] = value

    def delete_object(self, key: str):
        """Remove ``key`` from the small-memory store.

        Raises:
            KeyError: if ``key`` is not present.
        """
        with _dict_sm_lock:
            self.storage.pop(key)

    def remove_item_from_set(self, key: str, item: Any):
        """Remove ``item`` from the set stored under ``key``.

        Does nothing when no set is stored under ``key`` or the set is
        empty. Lookup, validation and removal share one critical section so
        another thread cannot replace or mutate the set in between.

        Raises:
            ValueError: if the stored value is not a ``set``.
            KeyError: if the set is non-empty and does not contain ``item``.
        """
        with _dict_sm_lock:
            res = self.storage.get(key, None)
            self.check_if_valid(res, set)
            if not res:
                return
            res.remove(item)

    def add_to_set(self, key: str, value: Any):
        """Add ``value`` to the set stored under ``key``, creating the set if needed.

        Lookup, creation and insertion share one critical section so two
        threads adding to a missing key cannot overwrite each other's set.

        Raises:
            ValueError: if a non-set value is already stored under ``key``.
        """
        with _dict_sm_lock:
            res = self.storage.get(key, None)
            if res is None:
                self.storage[key] = {value}
                return
            self.check_if_valid(res, set)
            res.add(value)

    def get_set_list(self, key: str) -> list[Any] | None:
        """Return the members of the set stored under ``key`` as a new list.

        Returns ``None`` when nothing is stored under ``key``. The copy is
        made while the lock is held so a concurrent add/remove cannot change
        the set during iteration.

        Raises:
            ValueError: if the stored value is not a ``set``.
        """
        with _dict_sm_lock:
            res = self.storage.get(key, None)
            if res is None:
                return None
            self.check_if_valid(res, set)
            return list(res)

    # ------------------------------------------------------------------
    # METHODS FOR NODE INIT
    # ------------------------------------------------------------------

    # -- for node container --
    def clear_node_init_containers(self):
        """Remove every stored node init container."""
        with _dict_node_init_container_lock:
            self.node_init_container.clear()

    def set_init_container(self, node_id: str, value):
        """Store the init container ``value`` under ``node_id``."""
        with _dict_node_init_container_lock:
            self.node_init_container[node_id] = value

    def get_init_container(self, node_id: str):
        """Return the init container for ``node_id``, or ``None`` if absent.

        Raises:
            ValueError: if the stored value is not a ``NodeInitContainer``.
        """
        # Imported lazily to avoid a circular import, and before taking the
        # lock so the import never runs inside the critical section.
        from .node_init import NodeInitContainer

        with _dict_node_init_container_lock:
            res = self.node_init_container.get(node_id, None)
        self.check_if_valid(res, NodeInitContainer)
        return res

    def has_init_container(self, node_id: str) -> bool:
        """Return True when an init container is stored for ``node_id``."""
        with _dict_node_init_container_lock:
            return node_id in self.node_init_container

    # ------------------------

    # -- for node init function --
    def set_init_function(self, node_func, node_init_func):
        """Register ``node_init_func`` as the init function of ``node_func``."""
        with _dict_node_init_func_lock:
            self.node_init_func[node_func] = node_init_func

    def get_init_function(self, node_func: Callable):
        """Return the init function registered for ``node_func``, or ``None``.

        Raises:
            ValueError: if the stored value is not a ``NodeInit``.
        """
        # Imported lazily to avoid a circular import, and before taking the
        # lock so the import never runs inside the critical section.
        from .node_init import NodeInit

        with _dict_node_init_func_lock:
            res = self.node_init_func.get(node_func, None)
        self.check_if_valid(res, NodeInit)
        return res

    def has_init_function(self, node_func) -> bool:
        """Return True when an init function is registered for ``node_func``."""
        with _dict_node_init_func_lock:
            return node_func in self.node_init_func

    # ----------------------------
Loading

0 comments on commit 0604f4c

Please sign in to comment.