Skip to content

Commit

Permalink
Add session cleanup + rename daemon run
Browse files Browse the repository at this point in the history
  • Loading branch information
anmarhindi committed Oct 15, 2024
1 parent 2b20c2f commit fae18c8
Show file tree
Hide file tree
Showing 17 changed files with 24 additions and 26 deletions.
2 changes: 1 addition & 1 deletion api/transfer.py
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ def put(self, request) -> PlainTextResponse:
return PlainTextResponse("Bad project id", status_code=400)
task_id = request.path_params["task_id"]

daemon.run_without_db_token(
daemon.run(
cognition_import_wizard.prepare_and_finalize_setup,
cognition_project_id=cognition_project_id,
task_id=task_id,
Expand Down
2 changes: 2 additions & 0 deletions app.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
)
from util import security, clean_up
from middleware import log_storage
from submodules.model import session

logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -154,3 +155,4 @@
clean_up.clean_up_disk()

log_storage.start_persist_thread()
session.start_session_cleanup_thread()
2 changes: 1 addition & 1 deletion controller/attribute/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ def calculate_user_attribute_all_records(
notification.send_organization_update(
project_id=project_id, message=f"calculate_attribute:started:{attribute_id}"
)
daemon.run_without_db_token(
daemon.run(
__calculate_user_attribute_all_records,
project_id,
org_id,
Expand Down
2 changes: 1 addition & 1 deletion controller/attribute/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ def run_attribute_calculation_exec_env(
)
set_progress(project_id, attribute_item, 0.05)
__containers_running[container_name] = True
daemon.run_without_db_token(
daemon.run(
read_container_logs_thread,
project_id,
container_name,
Expand Down
8 changes: 3 additions & 5 deletions controller/embedding/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,14 +75,14 @@ def get_recommended_encoders(is_managed: bool) -> List[Any]:


def create_embedding(project_id: str, embedding_id: str) -> None:
daemon.run_without_db_token(connector.request_embedding, project_id, embedding_id)
daemon.run(connector.request_embedding, project_id, embedding_id)


def create_embeddings_one_by_one(
project_id: str,
embeddings_ids: List[str],
) -> None:
daemon.run_without_db_token(__embed_one_by_one_helper, project_id, embeddings_ids)
daemon.run(__embed_one_by_one_helper, project_id, embeddings_ids)


def request_tensor_upload(project_id: str, embedding_id: str) -> Any:
Expand Down Expand Up @@ -320,9 +320,7 @@ def __recreate_embedding(project_id: str, embedding_id: str) -> Embedding:
general.commit()

connector.request_deleting_embedding(project_id, old_id)
daemon.run_without_db_token(
connector.request_embedding, project_id, new_embedding_item.id
)
daemon.run(connector.request_embedding, project_id, new_embedding_item.id)
return new_embedding_item


Expand Down
4 changes: 1 addition & 3 deletions controller/information_source/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,9 +88,7 @@ def delete_information_source(project_id: str, source_id: str) -> None:
== enums.InformationSourceType.ACTIVE_LEARNING.value
and config_service.get_config_value("is_managed")
):
daemon.run_without_db_token(
__delete_active_learner_from_inference_dir, project_id, source_id
)
daemon.run(__delete_active_learner_from_inference_dir, project_id, source_id)

information_source.delete(project_id, source_id, with_commit=True)

Expand Down
4 changes: 2 additions & 2 deletions controller/payload/payload_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,7 @@ def execution_pipeline(
print(traceback.format_exc())

if asynchronous:
daemon.run_without_db_token(
daemon.run(
prepare_and_run_execution_pipeline,
str(payload.id),
project_id,
Expand Down Expand Up @@ -384,7 +384,7 @@ def run_container(
)
set_payload_progress(project_id, information_source_payload, 0.05)
__containers_running[container_name] = True
daemon.run_without_db_token(
daemon.run(
read_container_logs_thread,
project_id,
container_name,
Expand Down
4 changes: 2 additions & 2 deletions controller/project/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ def delete_project(project_id: str) -> None:
org_id = organization.get_id_by_project_id(project_id)
project.delete_by_id(project_id, with_commit=True)

daemon.run_without_db_token(__background_cleanup, org_id, project_id)
daemon.run(__background_cleanup, org_id, project_id)


def __background_cleanup(org_id: str, project_id: str) -> None:
Expand Down Expand Up @@ -295,7 +295,7 @@ def __get_first_data_id(project_id: str, user_id: str, huddle_type: str) -> str:

def check_in_deletion_projects() -> None:
# this is only supposed to be called during startup of the application
daemon.run_without_db_token(__check_in_deletion_projects)
daemon.run(__check_in_deletion_projects)


def __check_in_deletion_projects() -> None:
Expand Down
2 changes: 1 addition & 1 deletion controller/record/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ def get_records_by_extended_search(

def delete_record(project_id: str, record_id: str) -> None:
record.delete(project_id, record_id, with_commit=True)
daemon.run_without_db_token(__reupload_embeddings, project_id)
daemon.run(__reupload_embeddings, project_id)


def delete_all_records(project_id: str) -> None:
Expand Down
6 changes: 3 additions & 3 deletions controller/record_label_association/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ def create_manual_classification_label(
)
if label_source_type == enums.LabelSource.INFORMATION_SOURCE.value:
update_annotator_progress(project_id, source_id, user_id)
daemon.run_without_db_token(
daemon.run(
weak_supervision.calculate_quality_after_labeling,
project_id,
labeling_task_id,
Expand All @@ -137,7 +137,7 @@ def create_manual_classification_label(
)
if not as_gold_star:
label_ids = [str(row.id) for row in label_ids.all()]
daemon.run_without_db_token(
daemon.run(
__check_label_duplication_classification_and_react,
project_id,
record_id,
Expand Down Expand Up @@ -216,7 +216,7 @@ def create_manual_extraction_label(
)
if label_source_type == enums.LabelSource.INFORMATION_SOURCE.value:
update_annotator_progress(project_id, source_id, user_id)
daemon.run_without_db_token(
daemon.run(
weak_supervision.calculate_quality_after_labeling,
project_id,
labeling_task_id,
Expand Down
2 changes: 1 addition & 1 deletion controller/tokenization/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ def delete_docbins(project_id: str, records: List[Record]) -> None:


def start_record_tokenization(project_id: str, record_id: str) -> None:
daemon.run_without_db_token(
daemon.run(
request_tokenize_record,
project_id,
record_id,
Expand Down
2 changes: 1 addition & 1 deletion controller/transfer/project_transfer_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -900,7 +900,7 @@ def __replace_embedding_name(
)

general.commit()
daemon.run_without_db_token(
daemon.run(
__post_processing_import_threaded,
project_id,
task_id,
Expand Down
2 changes: 1 addition & 1 deletion controller/weak_supervision/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ def execution_pipeline(
finally:
general.reset_ctx_token(ctx_token)

daemon.run_without_db_token(
daemon.run(
execution_pipeline,
project_id,
str(user_id),
Expand Down
2 changes: 1 addition & 1 deletion middleware/log_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ def add_to_persist_queue(log_path: str, data: Dict[str, Any]):


def start_persist_thread():
daemon.run_without_db_token(__persist_log_loop)
daemon.run(__persist_log_loop)


def __persist_log_loop():
Expand Down
2 changes: 1 addition & 1 deletion submodules/model
2 changes: 1 addition & 1 deletion util/record_ide.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ def run_record_ide(
f"{container.name}:/{knowledge_base_bytes_path}",
knowledge_base_tar_path,
)
daemon.run_without_db_token(cancel_container, container_name, container)
daemon.run(cancel_container, container_name, container)
__containers_running[container_name] = True
container.start()
logs_arr = [
Expand Down
2 changes: 1 addition & 1 deletion util/user_activity.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ def add_user_activity_entry(
global __thread_running
if not __thread_running:
__thread_running = True
daemon.run_without_db_token(__start_thread_db_write)
daemon.run(__start_thread_db_write)

activity_set = [user_id, activity, datetime.now(), False]
__write_backup_file(activity_set)
Expand Down

0 comments on commit fae18c8

Please sign in to comment.