From fae18c8337ea5e59f3a99cd7d89e8af299aee578 Mon Sep 17 00:00:00 2001 From: anmarhindi Date: Tue, 15 Oct 2024 14:43:34 +0200 Subject: [PATCH] Add session cleanup + rename daemon run --- api/transfer.py | 2 +- app.py | 2 ++ controller/attribute/manager.py | 2 +- controller/attribute/util.py | 2 +- controller/embedding/manager.py | 8 +++----- controller/information_source/manager.py | 4 +--- controller/payload/payload_scheduler.py | 4 ++-- controller/project/manager.py | 4 ++-- controller/record/manager.py | 2 +- controller/record_label_association/manager.py | 6 +++--- controller/tokenization/manager.py | 2 +- controller/transfer/project_transfer_manager.py | 2 +- controller/weak_supervision/manager.py | 2 +- middleware/log_storage.py | 2 +- submodules/model | 2 +- util/record_ide.py | 2 +- util/user_activity.py | 2 +- 17 files changed, 24 insertions(+), 26 deletions(-) diff --git a/api/transfer.py b/api/transfer.py index 72eae0db..861f7dff 100644 --- a/api/transfer.py +++ b/api/transfer.py @@ -233,7 +233,7 @@ def put(self, request) -> PlainTextResponse: return PlainTextResponse("Bad project id", status_code=400) task_id = request.path_params["task_id"] - daemon.run_without_db_token( + daemon.run( cognition_import_wizard.prepare_and_finalize_setup, cognition_project_id=cognition_project_id, task_id=task_id, diff --git a/app.py b/app.py index eed4e3d0..6271906a 100644 --- a/app.py +++ b/app.py @@ -62,6 +62,7 @@ ) from util import security, clean_up from middleware import log_storage +from submodules.model import session logging.basicConfig(level=logging.DEBUG) logger = logging.getLogger(__name__) @@ -154,3 +155,4 @@ clean_up.clean_up_disk() log_storage.start_persist_thread() +session.start_session_cleanup_thread() diff --git a/controller/attribute/manager.py b/controller/attribute/manager.py index 928149de..9b10e4c9 100644 --- a/controller/attribute/manager.py +++ b/controller/attribute/manager.py @@ -248,7 +248,7 @@ def calculate_user_attribute_all_records( notification.send_organization_update( project_id=project_id, message=f"calculate_attribute:started:{attribute_id}" ) - daemon.run_without_db_token( + daemon.run( __calculate_user_attribute_all_records, project_id, org_id, diff --git a/controller/attribute/util.py b/controller/attribute/util.py index 48729d17..ad7c8352 100644 --- a/controller/attribute/util.py +++ b/controller/attribute/util.py @@ -119,7 +119,7 @@ def run_attribute_calculation_exec_env( ) set_progress(project_id, attribute_item, 0.05) __containers_running[container_name] = True - daemon.run_without_db_token( + daemon.run( read_container_logs_thread, project_id, container_name, diff --git a/controller/embedding/manager.py b/controller/embedding/manager.py index 8183459a..d075d1b9 100644 --- a/controller/embedding/manager.py +++ b/controller/embedding/manager.py @@ -75,14 +75,14 @@ def get_recommended_encoders(is_managed: bool) -> List[Any]: def create_embedding(project_id: str, embedding_id: str) -> None: - daemon.run_without_db_token(connector.request_embedding, project_id, embedding_id) + daemon.run(connector.request_embedding, project_id, embedding_id) def create_embeddings_one_by_one( project_id: str, embeddings_ids: List[str], ) -> None: - daemon.run_without_db_token(__embed_one_by_one_helper, project_id, embeddings_ids) + daemon.run(__embed_one_by_one_helper, project_id, embeddings_ids) def request_tensor_upload(project_id: str, embedding_id: str) -> Any: @@ -320,9 +320,7 @@ def __recreate_embedding(project_id: str, embedding_id: str) -> Embedding: general.commit() connector.request_deleting_embedding(project_id, old_id) - daemon.run_without_db_token( - connector.request_embedding, project_id, new_embedding_item.id - ) + daemon.run(connector.request_embedding, project_id, new_embedding_item.id) return new_embedding_item diff --git a/controller/information_source/manager.py b/controller/information_source/manager.py index 5ff81be6..3f2d5ac1 100644 --- a/controller/information_source/manager.py +++ b/controller/information_source/manager.py @@ -88,9 +88,7 @@ def delete_information_source(project_id: str, source_id: str) -> None: == enums.InformationSourceType.ACTIVE_LEARNING.value and config_service.get_config_value("is_managed") ): - daemon.run_without_db_token( - __delete_active_learner_from_inference_dir, project_id, source_id - ) + daemon.run(__delete_active_learner_from_inference_dir, project_id, source_id) information_source.delete(project_id, source_id, with_commit=True) diff --git a/controller/payload/payload_scheduler.py b/controller/payload/payload_scheduler.py index 5d1fc3ec..14923a7c 100644 --- a/controller/payload/payload_scheduler.py +++ b/controller/payload/payload_scheduler.py @@ -307,7 +307,7 @@ def execution_pipeline( print(traceback.format_exc()) if asynchronous: - daemon.run_without_db_token( + daemon.run( prepare_and_run_execution_pipeline, str(payload.id), project_id, @@ -384,7 +384,7 @@ def run_container( ) set_payload_progress(project_id, information_source_payload, 0.05) __containers_running[container_name] = True - daemon.run_without_db_token( + daemon.run( read_container_logs_thread, project_id, container_name, diff --git a/controller/project/manager.py b/controller/project/manager.py index 7afb7f69..d77cacd7 100644 --- a/controller/project/manager.py +++ b/controller/project/manager.py @@ -139,7 +139,7 @@ def delete_project(project_id: str) -> None: org_id = organization.get_id_by_project_id(project_id) project.delete_by_id(project_id, with_commit=True) - daemon.run_without_db_token(__background_cleanup, org_id, project_id) + daemon.run(__background_cleanup, org_id, project_id) def __background_cleanup(org_id: str, project_id: str) -> None: @@ -295,7 +295,7 @@ def __get_first_data_id(project_id: str, user_id: str, huddle_type: str) -> str: def check_in_deletion_projects() -> None: # this is only supposed to be called during startup of the application - daemon.run_without_db_token(__check_in_deletion_projects) + daemon.run(__check_in_deletion_projects) def __check_in_deletion_projects() -> None: diff --git a/controller/record/manager.py b/controller/record/manager.py index a02a7704..ec1fddea 100644 --- a/controller/record/manager.py +++ b/controller/record/manager.py @@ -109,7 +109,7 @@ def get_records_by_extended_search( def delete_record(project_id: str, record_id: str) -> None: record.delete(project_id, record_id, with_commit=True) - daemon.run_without_db_token(__reupload_embeddings, project_id) + daemon.run(__reupload_embeddings, project_id) def delete_all_records(project_id: str) -> None: diff --git a/controller/record_label_association/manager.py b/controller/record_label_association/manager.py index 4827a03b..421d789a 100644 --- a/controller/record_label_association/manager.py +++ b/controller/record_label_association/manager.py @@ -125,7 +125,7 @@ def create_manual_classification_label( ) if label_source_type == enums.LabelSource.INFORMATION_SOURCE.value: update_annotator_progress(project_id, source_id, user_id) - daemon.run_without_db_token( + daemon.run( weak_supervision.calculate_quality_after_labeling, project_id, labeling_task_id, @@ -137,7 +137,7 @@ def create_manual_classification_label( ) if not as_gold_star: label_ids = [str(row.id) for row in label_ids.all()] - daemon.run_without_db_token( + daemon.run( __check_label_duplication_classification_and_react, project_id, record_id, @@ -216,7 +216,7 @@ def create_manual_extraction_label( ) if label_source_type == enums.LabelSource.INFORMATION_SOURCE.value: update_annotator_progress(project_id, source_id, user_id) - daemon.run_without_db_token( + daemon.run( weak_supervision.calculate_quality_after_labeling, project_id, labeling_task_id, diff --git a/controller/tokenization/manager.py b/controller/tokenization/manager.py index ba598f4c..a258601d 100644 --- a/controller/tokenization/manager.py +++ b/controller/tokenization/manager.py @@ -84,7 +84,7 @@ def delete_docbins(project_id: str, records: List[Record]) -> None: def start_record_tokenization(project_id: str, record_id: str) -> None: - daemon.run_without_db_token( + daemon.run( request_tokenize_record, project_id, record_id, diff --git a/controller/transfer/project_transfer_manager.py b/controller/transfer/project_transfer_manager.py index 8212af5f..90053668 100644 --- a/controller/transfer/project_transfer_manager.py +++ b/controller/transfer/project_transfer_manager.py @@ -900,7 +900,7 @@ def __replace_embedding_name( ) general.commit() - daemon.run_without_db_token( + daemon.run( __post_processing_import_threaded, project_id, task_id, diff --git a/controller/weak_supervision/manager.py b/controller/weak_supervision/manager.py index 6171bf2e..7ed43b75 100644 --- a/controller/weak_supervision/manager.py +++ b/controller/weak_supervision/manager.py @@ -103,7 +103,7 @@ def execution_pipeline( finally: general.reset_ctx_token(ctx_token) - daemon.run_without_db_token( + daemon.run( execution_pipeline, project_id, str(user_id), diff --git a/middleware/log_storage.py b/middleware/log_storage.py index 66327cd4..c187b7da 100644 --- a/middleware/log_storage.py +++ b/middleware/log_storage.py @@ -31,7 +31,7 @@ def add_to_persist_queue(log_path: str, data: Dict[str, Any]): def start_persist_thread(): - daemon.run_without_db_token(__persist_log_loop) + daemon.run(__persist_log_loop) def __persist_log_loop(): diff --git a/submodules/model b/submodules/model index 23a32ff5..d37a44bf 160000 --- a/submodules/model +++ b/submodules/model @@ -1 +1 @@ -Subproject commit 23a32ff563e6ce8782b6df1d838330d526ee28a7 +Subproject commit d37a44bf46ab71d71fba2896482f002bc4d90056 diff --git a/util/record_ide.py b/util/record_ide.py index 2e3905b1..7c34d31f 100644 --- a/util/record_ide.py +++ b/util/record_ide.py @@ -69,7 +69,7 @@ def run_record_ide( f"{container.name}:/{knowledge_base_bytes_path}", knowledge_base_tar_path, ) - daemon.run_without_db_token(cancel_container, container_name, container) + daemon.run(cancel_container, container_name, container) __containers_running[container_name] = True container.start() logs_arr = [ diff --git a/util/user_activity.py b/util/user_activity.py index c85f526b..8262502a 100644 --- a/util/user_activity.py +++ b/util/user_activity.py @@ -23,7 +23,7 @@ def add_user_activity_entry( global __thread_running if not __thread_running: __thread_running = True - daemon.run_without_db_token(__start_thread_db_write) + daemon.run(__start_thread_db_write) activity_set = [user_id, activity, datetime.now(), False] __write_backup_file(activity_set)