diff --git a/api/transfer.py b/api/transfer.py index 5194015d..72eae0db 100644 --- a/api/transfer.py +++ b/api/transfer.py @@ -36,7 +36,8 @@ from util.notification import create_notification from submodules.model.enums import NotificationType from submodules.model.models import UploadTask -from util import daemon, notification +from util import notification +from submodules.model import daemon from controller.transfer.cognition.minio_upload import handle_cognition_file_upload from controller.task_master import manager as task_master_manager @@ -232,7 +233,7 @@ def put(self, request) -> PlainTextResponse: return PlainTextResponse("Bad project id", status_code=400) task_id = request.path_params["task_id"] - daemon.run( + daemon.run_without_db_token( cognition_import_wizard.prepare_and_finalize_setup, cognition_project_id=cognition_project_id, task_id=task_id, @@ -302,7 +303,7 @@ def init_file_import(task: UploadTask, project_id: str, is_global_update: bool) cognition_preparator.prepare_cognition_import(project_id, task) else: transfer_manager.import_records_from_file(project_id, task) - daemon.run( + daemon.run_with_db_token( __recalculate_missing_attributes_and_embeddings, project_id, str(task.user_id), @@ -378,7 +379,6 @@ def __recalculate_missing_attributes_and_embeddings( def __calculate_missing_attributes(project_id: str, user_id: str) -> None: # wait a second to ensure that the process is started in the tokenization service time.sleep(5) - ctx_token = general.get_ctx_token() attributes_usable = attribute.get_all_ordered( project_id, True, @@ -387,7 +387,6 @@ def __calculate_missing_attributes(project_id: str, user_id: str) -> None: ], ) if len(attributes_usable) == 0: - general.remove_and_refresh_session(ctx_token, False) return # stored as list so connection results do not affect @@ -405,7 +404,7 @@ def __calculate_missing_attributes(project_id: str, user_id: str) -> None: i += 1 if i >= 60: i = 0 - ctx_token = general.remove_and_refresh_session(ctx_token, True) + daemon.reset_session_token_in_thread() if tokenization.is_doc_bin_creation_running_or_queued(project_id): time.sleep(2) continue @@ -420,7 +419,7 @@ def __calculate_missing_attributes(project_id: str, user_id: str) -> None: break if i >= 60: i = 0 - ctx_token = general.remove_and_refresh_session(ctx_token, True) + daemon.reset_session_token_in_thread() current_att_id = attribute_ids[0] current_att = attribute.get(project_id, current_att_id) @@ -468,4 +467,3 @@ def __calculate_missing_attributes(project_id: str, user_id: str) -> None: project_id=project_id, message="calculate_attribute:finished:all", ) - general.remove_and_refresh_session(ctx_token, False) diff --git a/controller/attribute/manager.py b/controller/attribute/manager.py index a0da8663..928149de 100644 --- a/controller/attribute/manager.py +++ b/controller/attribute/manager.py @@ -16,7 +16,9 @@ RecordTokenizationScope, AttributeVisibility, ) -from util import daemon, notification +from util import notification + +from submodules.model import daemon from controller.task_master import manager as task_master_manager from submodules.model.enums import TaskType @@ -246,7 +248,7 @@ def calculate_user_attribute_all_records( notification.send_organization_update( project_id=project_id, message=f"calculate_attribute:started:{attribute_id}" ) - daemon.run( + daemon.run_without_db_token( __calculate_user_attribute_all_records, project_id, org_id, diff --git a/controller/attribute/util.py b/controller/attribute/util.py index 3436c64e..48729d17 100644 --- a/controller/attribute/util.py +++ b/controller/attribute/util.py @@ -18,9 +18,10 @@ ) from submodules.model.models import Attribute from submodules.s3 import controller as s3 -from util import daemon, notification +from util import notification from controller.knowledge_base import util as knowledge_base from submodules.model import enums +from submodules.model import daemon client = docker.from_env() image = os.getenv("AC_EXEC_ENV_IMAGE") @@ -118,7 +119,7 @@ def run_attribute_calculation_exec_env( ) set_progress(project_id, attribute_item, 0.05) __containers_running[container_name] = True - daemon.run( + daemon.run_without_db_token( read_container_logs_thread, project_id, container_name, @@ -163,7 +164,7 @@ def extend_logs( if not attribute.logs: attribute.logs = logs else: - all_logs = [l for l in attribute.logs] + all_logs = [ll for ll in attribute.logs] all_logs += logs attribute.logs = all_logs general.commit() @@ -195,7 +196,7 @@ def read_container_logs_thread( break if attribute_item.state == enums.AttributeState.FAILED.value: break - if not name in __containers_running: + if name not in __containers_running: break try: # timestamps included to filter out logs that have already been read @@ -205,11 +206,13 @@ def read_container_logs_thread( timestamps=True, since=last_timestamp, ) - except: + except Exception: # failsafe for containers that shut down during the read break current_logs = [ - l for l in str(log_lines.decode("utf-8")).split("\n") if len(l.strip()) > 0 + ll + for ll in str(log_lines.decode("utf-8")).split("\n") + if len(ll.strip()) > 0 ] if len(current_logs) == 0: continue @@ -218,8 +221,8 @@ def read_container_logs_thread( last_timestamp = parser.parse(last_timestamp_str).replace( tzinfo=None ) + datetime.timedelta(seconds=1) - non_progress_logs = [l for l in current_logs if "progress" not in l] - progress_logs = [l for l in current_logs if "progress" in l] + non_progress_logs = [ll for ll in current_logs if "progress" not in ll] + progress_logs = [ll for ll in current_logs if "progress" in ll] if len(non_progress_logs) > 0: extend_logs(project_id, attribute_item, non_progress_logs) if len(progress_logs) == 0: diff --git a/controller/embedding/manager.py b/controller/embedding/manager.py index ab639f20..8183459a 100644 --- a/controller/embedding/manager.py +++ b/controller/embedding/manager.py @@ -4,7 +4,7 @@ from submodules.model import enums from submodules.model.models import Embedding -from util import daemon, notification +from util import notification from . import util from . import connector from .terms import TERMS_INFO @@ -16,6 +16,7 @@ general, project, ) +from submodules.model import daemon from submodules.model.util import sql_alchemy_to_dict from controller.embedding.connector import collection_on_qdrant @@ -74,14 +75,14 @@ def get_recommended_encoders(is_managed: bool) -> List[Any]: def create_embedding(project_id: str, embedding_id: str) -> None: - daemon.run(connector.request_embedding, project_id, embedding_id) + daemon.run_without_db_token(connector.request_embedding, project_id, embedding_id) def create_embeddings_one_by_one( project_id: str, embeddings_ids: List[str], ) -> None: - daemon.run(__embed_one_by_one_helper, project_id, embeddings_ids) + daemon.run_without_db_token(__embed_one_by_one_helper, project_id, embeddings_ids) def request_tensor_upload(project_id: str, embedding_id: str) -> Any: @@ -319,7 +320,9 @@ def __recreate_embedding(project_id: str, embedding_id: str) -> Embedding: general.commit() connector.request_deleting_embedding(project_id, old_id) - daemon.run(connector.request_embedding, project_id, new_embedding_item.id) + daemon.run_without_db_token( + connector.request_embedding, project_id, new_embedding_item.id + ) return new_embedding_item diff --git a/controller/embedding/util.py b/controller/embedding/util.py index 90e7d87a..a7cb733f 100644 --- a/controller/embedding/util.py +++ b/controller/embedding/util.py @@ -1,8 +1,4 @@ -from controller.embedding import connector -from submodules.model import enums -from submodules.model.business_objects import agreement, embedding, general -from submodules.model.models import Embedding -from util import daemon +from submodules.model.business_objects import embedding def has_encoder_running(project_id: str) -> bool: diff --git a/controller/information_source/manager.py b/controller/information_source/manager.py index 4a70f039..5ff81be6 100644 --- a/controller/information_source/manager.py +++ b/controller/information_source/manager.py @@ -1,6 +1,5 @@ -import json import os -from typing import List, Optional +from typing import List from controller.information_source.util import resolve_source_return_type from submodules.model import InformationSource, LabelingTask, enums from submodules.model.business_objects import ( @@ -10,8 +9,7 @@ ) from controller.misc import config_service from controller.labeling_access_link import manager as link_manager -from controller.record_label_association import manager as rla_manager -from util import daemon +from submodules.model import daemon def get_information_source(project_id: str, source_id: str) -> InformationSource: @@ -65,11 +63,7 @@ def update_information_source( ) -> None: labeling_task_item: LabelingTask = labeling_task.get(project_id, labeling_task_id) return_type: str = resolve_source_return_type(labeling_task_item) - item = information_source.get(project_id, source_id) - new_payload_needed = ( - str(item.source_code) != code or str(item.labeling_task_id) != labeling_task_id - ) - item = information_source.update( + information_source.update( project_id, source_id, labeling_task_id=labeling_task_id, @@ -94,7 +88,9 @@ def delete_information_source(project_id: str, source_id: str) -> None: == enums.InformationSourceType.ACTIVE_LEARNING.value and config_service.get_config_value("is_managed") ): - daemon.run(__delete_active_learner_from_inference_dir, project_id, source_id) + daemon.run_without_db_token( + __delete_active_learner_from_inference_dir, project_id, source_id + ) information_source.delete(project_id, source_id, with_commit=True) diff --git a/controller/misc/config_service.py b/controller/misc/config_service.py index 2a3d2da4..a215fb4f 100644 --- a/controller/misc/config_service.py +++ b/controller/misc/config_service.py @@ -1,8 +1,7 @@ from typing import Dict, Any, Optional, Union import requests -import json import time -from util import daemon +from submodules.model import daemon from util import service_requests __config = None @@ -28,7 +27,7 @@ def refresh_config(): ) global __config __config = response.json() - daemon.run(invalidate_after, 3600) # one hour as failsave + daemon.run_with_db_token(invalidate_after, 3600) # one hour as failsave def get_config_value( diff --git a/controller/payload/payload_scheduler.py b/controller/payload/payload_scheduler.py index 4a17c440..5d1fc3ec 100644 --- a/controller/payload/payload_scheduler.py +++ b/controller/payload/payload_scheduler.py @@ -6,7 +6,6 @@ import pytz import json import docker -import timeit import traceback # from datetime import datetime @@ -14,7 +13,7 @@ import datetime from exceptions.exceptions import PayloadSchedulerError -from submodules.model import enums, events +from submodules.model import enums from submodules.model.business_objects import ( information_source, embedding, @@ -26,6 +25,7 @@ project, organization, ) +from submodules.model import daemon from submodules.model.business_objects.embedding import get_embedding_record_ids from submodules.model.business_objects.information_source import ( get_exclusion_record_ids, @@ -46,7 +46,7 @@ RecordLabelAssociation, InformationSourcePayload, ) -from util import daemon, notification +from util import notification from submodules.s3 import controller as s3 from controller.knowledge_base import util as knowledge_base from controller.misc import config_service @@ -232,7 +232,6 @@ def execution_pipeline( project_id, information_source_item.name, ) - start = timeit.default_timer() run_container( payload_item, project_id, @@ -289,7 +288,6 @@ def execution_pipeline( project_id, f"payload_failed:{information_source_item.id}:{payload_item.id}:{information_source_item.type}", ) - stop = timeit.default_timer() general.commit() org_id = organization.get_id_by_project_id(project_id) @@ -309,7 +307,7 @@ def execution_pipeline( print(traceback.format_exc()) if asynchronous: - daemon.run( + daemon.run_without_db_token( prepare_and_run_execution_pipeline, str(payload.id), project_id, @@ -386,7 +384,7 @@ def run_container( ) set_payload_progress(project_id, information_source_payload, 0.05) __containers_running[container_name] = True - daemon.run( + daemon.run_without_db_token( read_container_logs_thread, project_id, container_name, diff --git a/controller/project/manager.py b/controller/project/manager.py index 89377191..7afb7f69 100644 --- a/controller/project/manager.py +++ b/controller/project/manager.py @@ -17,8 +17,8 @@ information_source, general, ) +from submodules.model import daemon from fast_api.types import HuddleData, ProjectSize -from util import daemon from controller.task_master import manager as task_master_manager from submodules.model.enums import TaskType, RecordTokenizationScope from submodules.model.business_objects import util as db_util @@ -139,7 +139,7 @@ def delete_project(project_id: str) -> None: org_id = organization.get_id_by_project_id(project_id) project.delete_by_id(project_id, with_commit=True) - daemon.run(__background_cleanup, org_id, project_id) + daemon.run_without_db_token(__background_cleanup, org_id, project_id) def __background_cleanup(org_id: str, project_id: str) -> None: @@ -295,7 +295,7 @@ def __get_first_data_id(project_id: str, user_id: str, huddle_type: str) -> str: def check_in_deletion_projects() -> None: # this is only supposed to be called during startup of the application - daemon.run(__check_in_deletion_projects) + daemon.run_without_db_token(__check_in_deletion_projects) def __check_in_deletion_projects() -> None: diff --git a/controller/record/manager.py b/controller/record/manager.py index 4e71ae4e..a02a7704 100644 --- a/controller/record/manager.py +++ b/controller/record/manager.py @@ -15,12 +15,12 @@ ) from service.search import search from submodules.model import enums +from submodules.model import daemon from controller.embedding import connector as embedding_connector from controller.record import neural_search_connector from controller.embedding import manager as embedding_manager from controller.tokenization import tokenization_service -from util import daemon from util.miscellaneous_functions import chunk_list import time import traceback @@ -109,7 +109,7 @@ def get_records_by_extended_search( def delete_record(project_id: str, record_id: str) -> None: record.delete(project_id, record_id, with_commit=True) - daemon.run(__reupload_embeddings, project_id) + daemon.run_without_db_token(__reupload_embeddings, project_id) def delete_all_records(project_id: str) -> None: @@ -251,7 +251,7 @@ def __check_and_prep_edit_records( f"can't find embedding PCA for {embedding_item.name}. Try rebuilding or removing the embeddings on settings page." ) continue - if not embedding_item.attribute_id in useable_embeddings: + if embedding_item.attribute_id not in useable_embeddings: useable_embeddings[embedding_item.attribute_id] = [] useable_embeddings[embedding_item.attribute_id].append(embedding_item) diff --git a/controller/record_label_association/manager.py b/controller/record_label_association/manager.py index 9180ffeb..4827a03b 100644 --- a/controller/record_label_association/manager.py +++ b/controller/record_label_association/manager.py @@ -19,7 +19,7 @@ update_is_relevant_manual_label, update_is_valid_manual_label_for_project, ) -from util import daemon +from submodules.model import daemon from controller.weak_supervision import weak_supervision_service as weak_supervision from controller.knowledge_term import manager as term_manager from controller.information_source import manager as information_source_manager @@ -125,7 +125,7 @@ def create_manual_classification_label( ) if label_source_type == enums.LabelSource.INFORMATION_SOURCE.value: update_annotator_progress(project_id, source_id, user_id) - daemon.run( + daemon.run_without_db_token( weak_supervision.calculate_quality_after_labeling, project_id, labeling_task_id, @@ -137,14 +137,14 @@ def create_manual_classification_label( ) if not as_gold_star: label_ids = [str(row.id) for row in label_ids.all()] - daemon.run( + daemon.run_without_db_token( __check_label_duplication_classification_and_react, project_id, record_id, user_id, label_ids, ) - daemon.run( + daemon.run_with_db_token( __update_label_payloads_for_neural_search, project_id, record_id, @@ -216,7 +216,7 @@ def create_manual_extraction_label( ) if label_source_type == enums.LabelSource.INFORMATION_SOURCE.value: update_annotator_progress(project_id, source_id, user_id) - daemon.run( + daemon.run_without_db_token( weak_supervision.calculate_quality_after_labeling, project_id, labeling_task_id, @@ -267,7 +267,7 @@ def create_gold_star_association( update_is_relevant_manual_label( project_id, labeling_task_id, record_id, with_commit=True ) - daemon.run( + daemon.run_with_db_token( __update_label_payloads_for_neural_search, project_id, record_id, @@ -298,7 +298,7 @@ def delete_record_label_association( if source_ids: for s_id in source_ids: update_annotator_progress(project_id, s_id, user_id) - daemon.run( + daemon.run_with_db_token( __update_label_payloads_for_neural_search, project_id, record_id, @@ -315,7 +315,7 @@ def delete_gold_star_association( update_is_relevant_manual_label( project_id, labeling_task_id, record_id, with_commit=True ) - daemon.run( + daemon.run_with_db_token( __update_label_payloads_for_neural_search, project_id, record_id, diff --git a/controller/tokenization/manager.py b/controller/tokenization/manager.py index 61cb4072..ba598f4c 100644 --- a/controller/tokenization/manager.py +++ b/controller/tokenization/manager.py @@ -9,7 +9,7 @@ get_tokenized_record_from_db, get_tokenized_records_from_db, ) -from util import daemon +from submodules.model import daemon from controller.tokenization import tokenization_service from controller.tokenization.tokenization_service import ( request_tokenize_record, @@ -84,7 +84,7 @@ def delete_docbins(project_id: str, records: List[Record]) -> None: def start_record_tokenization(project_id: str, record_id: str) -> None: - daemon.run( + daemon.run_without_db_token( request_tokenize_record, project_id, record_id, diff --git a/controller/transfer/project_transfer_manager.py b/controller/transfer/project_transfer_manager.py index 67eea9dd..8212af5f 100644 --- a/controller/transfer/project_transfer_manager.py +++ b/controller/transfer/project_transfer_manager.py @@ -28,7 +28,8 @@ ) from submodules.model.enums import NotificationType from controller.labeling_access_link import manager as link_manager -from util import daemon, notification, file, security +from util import notification, file, security +from submodules.model import daemon from util.decorator import param_throttle from controller.embedding import manager as embedding_manager from util.notification import create_notification @@ -899,7 +900,7 @@ def __replace_embedding_name( ) general.commit() - daemon.run( + daemon.run_without_db_token( __post_processing_import_threaded, project_id, task_id, diff --git a/controller/weak_supervision/manager.py b/controller/weak_supervision/manager.py index 06b7b044..6171bf2e 100644 --- a/controller/weak_supervision/manager.py +++ b/controller/weak_supervision/manager.py @@ -19,7 +19,8 @@ get_selected_labeling_task_names, ) from submodules.model.enums import NotificationType -from util import daemon, notification +from util import notification +from submodules.model import daemon from controller.weak_supervision.weak_supervision_service import ( initiate_weak_supervision, ) @@ -102,7 +103,7 @@ def execution_pipeline( finally: general.reset_ctx_token(ctx_token) - daemon.run( + daemon.run_without_db_token( execution_pipeline, project_id, str(user_id), diff --git a/fast_api/routes/misc.py b/fast_api/routes/misc.py index 9df4df18..5bfbea02 100644 --- a/fast_api/routes/misc.py +++ b/fast_api/routes/misc.py @@ -314,3 +314,31 @@ def update_customer_buttons( update_request.visible, ) ) + + +@router.get("/dummy/create/wrong/session") +def dummy(): + + def something(): + from submodules.model.business_objects import general + + # general.get_ctx_token() + from submodules.model.business_objects import organization + + print("organization", organization.get_all(), flush=True) + import json + + print( + json.dumps( + general.get_session_lookup(exclude_last_x_seconds=-1), + indent=4, + default=str, + ), + flush=True, + ) + + from submodules.model import daemon + + daemon.run_with_db_token(something) + + return SILENT_SUCCESS_RESPONSE diff --git a/fast_api/routes/task_execution.py b/fast_api/routes/task_execution.py index ee2736b3..6b2c49d2 100644 --- a/fast_api/routes/task_execution.py +++ b/fast_api/routes/task_execution.py @@ -9,8 +9,7 @@ WeakSupervisionActionExecutionBody, ) from fastapi import APIRouter -from util import daemon -from submodules.model.enums import InformationSourceType +from submodules.model import daemon from fast_api.routes.client_response import pack_json_result, SILENT_SUCCESS_RESPONSE router = APIRouter() @@ -22,7 +21,7 @@ def calculate_attributes( attribute_calculation_task_execution: AttributeCalculationTaskExecutionBody, ): - daemon.run( + daemon.run_with_db_token( attribute_manager.calculate_user_attribute_all_records, attribute_calculation_task_execution.project_id, attribute_calculation_task_execution.organization_id, @@ -58,7 +57,7 @@ def data_slice( data_slice_action_execution: DataSliceActionExecutionBody, ): - daemon.run( + daemon.run_with_db_token( data_slice_manager.create_outlier_slice, data_slice_action_execution.project_id, data_slice_action_execution.user_id, @@ -75,7 +74,7 @@ def weak_supervision( weak_supervision_action_execution: WeakSupervisionActionExecutionBody, ): - daemon.run( + daemon.run_with_db_token( weak_supervision_manager.run_weak_supervision, weak_supervision_action_execution.project_id, weak_supervision_action_execution.user_id, diff --git a/middleware/log_storage.py b/middleware/log_storage.py index 57e29872..66327cd4 100644 --- a/middleware/log_storage.py +++ b/middleware/log_storage.py @@ -5,7 +5,7 @@ import traceback from time import sleep from threading import Lock -from util import daemon +from submodules.model import daemon from fastapi import Request from datetime import datetime from submodules.model.enums import AdminLogLevel, try_parse_enum_value @@ -31,7 +31,7 @@ def add_to_persist_queue(log_path: str, data: Dict[str, Any]): def start_persist_thread(): - daemon.run(__persist_log_loop) + daemon.run_without_db_token(__persist_log_loop) def __persist_log_loop(): diff --git a/submodules/model b/submodules/model index cab584a7..23a32ff5 160000 --- a/submodules/model +++ b/submodules/model @@ -1 +1 @@ -Subproject commit cab584a7a4dae6e2bf8abc21cd31692d2e31c542 +Subproject commit 23a32ff563e6ce8782b6df1d838330d526ee28a7 diff --git a/util/daemon.py b/util/daemon.py deleted file mode 100644 index 5807041d..00000000 --- a/util/daemon.py +++ /dev/null @@ -1,19 +0,0 @@ -import threading - - -def run(target, *args, **kwargs): - threading.Thread( - target=target, - args=args, - kwargs=kwargs, - daemon=True, - ).start() - - -def prepare_thread(target, *args, **kwargs) -> threading.Thread: - return threading.Thread( - target=target, - args=args, - kwargs=kwargs, - daemon=True, - ) diff --git a/util/record_ide.py b/util/record_ide.py index dcbd593b..2e3905b1 100644 --- a/util/record_ide.py +++ b/util/record_ide.py @@ -6,13 +6,12 @@ from controller.tokenization import manager as tokenization_manager import pickle import tarfile -from submodules.model.business_objects import attribute, tokenization from submodules.model.business_objects import record from submodules.model.business_objects.record import get_tokenized_record_from_db import time import uuid -from util import daemon +from submodules.model import daemon client = docker.from_env() image = os.getenv("RECORD_IDE_IMAGE") @@ -70,7 +69,7 @@ def run_record_ide( f"{container.name}:/{knowledge_base_bytes_path}", knowledge_base_tar_path, ) - daemon.run(cancel_container, container_name, container) + daemon.run_without_db_token(cancel_container, container_name, container) __containers_running[container_name] = True container.start() logs_arr = [ diff --git a/util/user_activity.py b/util/user_activity.py index fccf956a..c85f526b 100644 --- a/util/user_activity.py +++ b/util/user_activity.py @@ -1,7 +1,7 @@ import time from typing import Dict, Union, Any, List from submodules.model.business_objects import user_activity, general -from util import daemon +from submodules.model import daemon import os from datetime import datetime import json @@ -23,7 +23,7 @@ def add_user_activity_entry( global __thread_running if not __thread_running: __thread_running = True - daemon.run(__start_thread_db_write) + daemon.run_without_db_token(__start_thread_db_write) activity_set = [user_id, activity, datetime.now(), False] __write_backup_file(activity_set)