Skip to content
This repository has been archived by the owner on Oct 10, 2023. It is now read-only.

Commit

Permalink
use fstring instead of multiple arg for logging and some refactoring (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
smahmed776 authored Oct 3, 2023
1 parent 1f5a25d commit fb878d9
Show file tree
Hide file tree
Showing 5 changed files with 41 additions and 54 deletions.
6 changes: 6 additions & 0 deletions .vscode/settings.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"[python]": {
"editor.defaultFormatter": "ms-python.black-formatter"
},
"python.formatting.provider": "none"
}
8 changes: 8 additions & 0 deletions flojoy/CONSTANTS.py
Original file line number Diff line number Diff line change
@@ -1,2 +1,10 @@
import sys, os

KEY_WORKER_JOBS = "WORKER_JOBS"
KEY_ALL_JOBEST_IDS = "ALL_JOBSET_IDS"
FLOJOY_DIR = ".flojoy"
CREDENTIAL_FILE= "credentials.txt"
if sys.platform == "win32":
FLOJOY_CACHE_DIR = os.path.realpath(os.path.join(os.environ["APPDATA"], FLOJOY_DIR))
else:
FLOJOY_CACHE_DIR = os.path.realpath(os.path.join(os.environ["HOME"], FLOJOY_DIR))
9 changes: 3 additions & 6 deletions flojoy/flojoy_node_venv.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,9 @@ def TORCH_NODE(default: Matrix) -> Matrix:
"""
from collections.abc import Callable, Iterable, Mapping
from concurrent.futures import Future, ThreadPoolExecutor
import threading
from time import sleep
from typing import Any, Callable, Optional
from typing import Any, Callable

import hashlib
from contextlib import ExitStack, contextmanager
Expand All @@ -42,7 +41,7 @@ def TORCH_NODE(default: Matrix) -> Matrix:
from functools import wraps
import cloudpickle

from .utils import FLOJOY_CACHE_DIR
from .CONSTANTS import FLOJOY_CACHE_DIR
from .logging import LogPipe, LogPipeMode, StreamEnum

__all__ = ["run_in_venv"]
Expand Down Expand Up @@ -100,7 +99,7 @@ def _bootstrap_venv(
# Acquire a lock on the virtual environment to ensure no other process is using it
with portalocker.Lock(
lockfile_path, mode="ab", fail_when_locked=False, flags=portalocker.LOCK_EX
) as lock:
):
logger.info(f"Acquired lock on {lockfile_path}...")
# Check if the virtual environment is complete, i.e. it contains a .venv_is_complete file
venv_is_complete_path = os.path.realpath(
Expand Down Expand Up @@ -183,8 +182,6 @@ def _bootstrap_venv(
with open(venv_is_complete_path, "w") as f:
f.write("")

# Leaved the portalocker.Lock on the virtual environment directory.
return


class PipInstallThread(threading.Thread):
Expand Down
41 changes: 20 additions & 21 deletions flojoy/flojoy_python.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
from contextlib import ContextDecorator
import inspect
import os
import traceback
from contextlib import ContextDecorator
from functools import wraps
from inspect import signature
from typing import Any, Callable, Optional

from .config import logger
Expand All @@ -14,8 +13,15 @@
from .models.JobResults.JobFailure import JobFailure
from .models.JobResults.JobSuccess import JobSuccess
from .node_init import NodeInitService
from .parameter_types import format_param_value
from .data_container import DataContainer, Stateful
from typing import Callable, Any, Optional
from .job_result_utils import get_frontend_res_obj_from_result, get_dc_from_result
from .utils import get_hf_hub_cache_path
from .config import logger
from .parameter_types import format_param_value
from inspect import signature
from .job_service import JobService
from .connection_manager import DeviceConnectionManager

__all__ = ["flojoy", "DefaultParams", "display"]

Expand Down Expand Up @@ -44,12 +50,8 @@ def fetch_inputs(previous_jobs: list[dict[str, str]]):
edge = prev_job.get("edge", "")

logger.debug(
"fetching input from prev job id:",
prev_job_id,
" for input:",
input_name,
"edge: ",
edge,
f"fetching input from prev job id: {prev_job_id}"
+ f"for input: {input_name} edge: {edge}"
)

job_result = JobService().get_job_result(prev_job_id)
Expand All @@ -73,8 +75,8 @@ def fetch_inputs(previous_jobs: list[dict[str, str]]):
else:
dict_inputs[input_name] = result

except Exception:
logger.debug(traceback.format_exc())
except Exception as e:
logger.debug(f"{e} {traceback.format_exc()}")

return dict_inputs

Expand Down Expand Up @@ -176,7 +178,7 @@ def wrapper(
ctrls: dict[str, Any] | None = None,
):
try:
logger.debug("previous jobs:", previous_jobs)
logger.debug(f"previous jobs: {previous_jobs}")
# Get command parameters set by the user through the control panel
func_params: dict[str, Any] = {}
if ctrls is not None:
Expand All @@ -187,10 +189,7 @@ def wrapper(
func_params["type"] = "default"

logger.debug(
"executing node_id:",
node_id,
"previous_jobs:",
previous_jobs,
f"executing node_id: {node_id} previous_jobs: {previous_jobs}"
)
dict_inputs = fetch_inputs(previous_jobs)

Expand All @@ -213,18 +212,18 @@ def wrapper(
node_type="default",
)

logger.debug(node_id, " params: ", args.keys())
logger.debug(f"{node_id} params: {args.keys()}")

# check if node has an init container and if so, inject it
if NodeInitService().has_init_store(node_id):
args["init_container"] = NodeInitService().get_init_store(node_id)

if inject_connection:
print("injecting connection", flush=True)
logger.debug("injecting connection")
device = args["connection"]
print(f"device handle: {device}", flush=True)
id = device.get_id()
connection = DeviceConnectionManager.get_connection(id)
logger.debug(f"device handle: {device}")
_id = device.get_id()
connection = DeviceConnectionManager.get_connection(_id)
args["connection"] = connection

# This fixes when people forget to add `= None` in
Expand Down
31 changes: 4 additions & 27 deletions flojoy/utils.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,13 @@
import decimal
import json as _json
import os
import sys
from pathlib import Path
from typing import Any, Callable, Optional

import logging
import numpy as np
import pandas as pd
import yaml
import requests
from dotenv import dotenv_values # type:ignore

# TODO(roulbac): Remove these imports once the nodes using them have been
# tested and updated to use huggingface_hub directly
Expand All @@ -24,29 +21,21 @@
from .node_init import NodeInit, NodeInitService
import keyring
import base64
from .CONSTANTS import FLOJOY_DIR, FLOJOY_CACHE_DIR, CREDENTIAL_FILE


__all__ = [
"send_to_socket",
"get_env_var",
"set_env_var",
"delete_env_var",
"get_credentials",
"hf_hub_download",
"snapshot_download",
"send_to_socket",
"hf_hub_download",
"snapshot_download",
"clear_flojoy_memory",
]

FLOJOY_DIR = ".flojoy"


if sys.platform == "win32":
FLOJOY_CACHE_DIR = os.path.realpath(os.path.join(os.environ["APPDATA"], FLOJOY_DIR))
else:
FLOJOY_CACHE_DIR = os.path.realpath(os.path.join(os.environ["HOME"], FLOJOY_DIR))

# # package result
# def package_result(result: dict | None, fn: str, node_id: str, jobset_id: str) -> dict:
Expand Down Expand Up @@ -75,11 +64,6 @@ def get_hf_hub_cache_path() -> str:
return os.path.join(FLOJOY_CACHE_DIR, "cache", "huggingface")


env_vars = dotenv_values("../.env")
port = env_vars.get("VITE_BACKEND_PORT", "5392")
BACKEND_URL = os.environ.get("BACKEND_URL", f"http://127.0.0.1:{port}")


def set_offline():
"""
Sets the is_offline flag to True, which means that results will not be sent to the backend via HTTP.
Expand Down Expand Up @@ -116,13 +100,6 @@ def clear_flojoy_memory():
DeviceConnectionManager.clear()


def send_to_socket(data: str):
if FlojoyConfig.get_instance().is_offline:
return
logger.debug("posting data to socket:", f"{BACKEND_URL}/worker_response")
requests.post(f"{BACKEND_URL}/worker_response", json=data)


class PlotlyJSONEncoder(_json.JSONEncoder):
"""
Meant to be passed as the `cls` kwarg to json.dumps(obj, cls=..)
Expand Down Expand Up @@ -333,7 +310,7 @@ def get_env_var(key: str) -> Optional[str]:
def set_env_var(key: str, value: str):
keyring.set_password("flojoy", key, value)
home = str(Path.home())
file_path = os.path.join(home, os.path.join(FLOJOY_DIR, "credentials.txt"))
file_path = os.path.join(home, os.path.join(FLOJOY_DIR, CREDENTIAL_FILE))

if not os.path.exists(file_path):
with open(file_path, "w") as f:
Expand All @@ -351,7 +328,7 @@ def set_env_var(key: str, value: str):

def delete_env_var(key: str):
home = str(Path.home())
file_path = os.path.join(home, os.path.join(FLOJOY_DIR, "credentials.txt"))
file_path = os.path.join(home, os.path.join(FLOJOY_DIR, CREDENTIAL_FILE))

if not os.path.exists(file_path):
return
Expand All @@ -372,7 +349,7 @@ def delete_env_var(key: str):

def get_credentials() -> list[dict[str, str]]:
home = str(Path.home())
file_path = os.path.join(home, os.path.join(FLOJOY_DIR, "credentials.txt"))
file_path = os.path.join(home, os.path.join(FLOJOY_DIR, CREDENTIAL_FILE))

if not os.path.exists(file_path):
return []
Expand Down

0 comments on commit fb878d9

Please sign in to comment.