Changes to new deamon structure (#258)
* Changes to new deamon structure

* Remove old daemon file

* fix import

* Submodule dev change
JWittmeyer authored Oct 9, 2024
1 parent 9ddadfd commit 2b20c2f
Showing 21 changed files with 103 additions and 99 deletions.
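
Taken together, the changes below apply one pattern: the daemon helpers move from util.daemon into submodules.model.daemon, and the old generic daemon.run(...) is split into daemon.run_without_db_token(...) for plain fire-and-forget threads and daemon.run_with_db_token(...) for jobs that need a managed database session. Long-running loops that previously juggled an explicit ctx_token via general.get_ctx_token() and general.remove_and_refresh_session(...) now call daemon.reset_session_token_in_thread() instead. As orientation, here is a minimal hypothetical sketch of what the new module plausibly provides — inferred purely from the call sites in this commit, not from the submodule's actual source:

# Hypothetical sketch of submodules/model/daemon.py, reconstructed from the
# call sites in this diff; the internals and names here are assumptions.
import threading

from submodules.model.business_objects import general

_thread_token = threading.local()


def run_without_db_token(target, *args, **kwargs) -> None:
    # Fire-and-forget worker thread; the target manages (or does not need)
    # its own database session.
    threading.Thread(target=target, args=args, kwargs=kwargs, daemon=True).start()


def run_with_db_token(target, *args, **kwargs) -> None:
    # Same, but a session token is acquired before the target runs and
    # released afterwards, so the job body no longer handles ctx_token itself.
    def wrapped():
        _thread_token.token = general.get_ctx_token()
        try:
            target(*args, **kwargs)
        finally:
            general.remove_and_refresh_session(_thread_token.token, False)

    threading.Thread(target=wrapped, daemon=True).start()


def reset_session_token_in_thread() -> None:
    # Swap the current worker thread's token for a fresh one. Long loops call
    # this periodically instead of reassigning an explicit ctx_token by hand.
    token = getattr(_thread_token, "token", None)
    if token is not None:
        _thread_token.token = general.remove_and_refresh_session(token, True)

The point of the split is that callers no longer decide token lifetimes inside the job body: the runner owns setup and teardown, and jobs only signal when a refresh is due.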
14 changes: 6 additions & 8 deletions api/transfer.py
@@ -36,7 +36,8 @@
 from util.notification import create_notification
 from submodules.model.enums import NotificationType
 from submodules.model.models import UploadTask
-from util import daemon, notification
+from util import notification
+from submodules.model import daemon
 from controller.transfer.cognition.minio_upload import handle_cognition_file_upload

 from controller.task_master import manager as task_master_manager
@@ -232,7 +233,7 @@ def put(self, request) -> PlainTextResponse:
            return PlainTextResponse("Bad project id", status_code=400)
        task_id = request.path_params["task_id"]

-        daemon.run(
+        daemon.run_without_db_token(
            cognition_import_wizard.prepare_and_finalize_setup,
            cognition_project_id=cognition_project_id,
            task_id=task_id,
@@ -302,7 +303,7 @@ def init_file_import(task: UploadTask, project_id: str, is_global_update: bool)
        cognition_preparator.prepare_cognition_import(project_id, task)
    else:
        transfer_manager.import_records_from_file(project_id, task)
-    daemon.run(
+    daemon.run_with_db_token(
        __recalculate_missing_attributes_and_embeddings,
        project_id,
        str(task.user_id),
@@ -378,7 +379,6 @@ def __recalculate_missing_attributes_and_embeddings(
 def __calculate_missing_attributes(project_id: str, user_id: str) -> None:
    # wait a second to ensure that the process is started in the tokenization service
    time.sleep(5)
-    ctx_token = general.get_ctx_token()
    attributes_usable = attribute.get_all_ordered(
        project_id,
        True,
@@ -387,7 +387,6 @@ def __calculate_missing_attributes(project_id: str, user_id: str) -> None:
        ],
    )
    if len(attributes_usable) == 0:
-        general.remove_and_refresh_session(ctx_token, False)
        return

    # stored as list so connection results do not affect
@@ -405,7 +404,7 @@ def __calculate_missing_attributes(project_id: str, user_id: str) -> None:
        i += 1
        if i >= 60:
            i = 0
-            ctx_token = general.remove_and_refresh_session(ctx_token, True)
+            daemon.reset_session_token_in_thread()
        if tokenization.is_doc_bin_creation_running_or_queued(project_id):
            time.sleep(2)
            continue
@@ -420,7 +419,7 @@ def __calculate_missing_attributes(project_id: str, user_id: str) -> None:
            break
        if i >= 60:
            i = 0
-            ctx_token = general.remove_and_refresh_session(ctx_token, True)
+            daemon.reset_session_token_in_thread()

    current_att_id = attribute_ids[0]
    current_att = attribute.get(project_id, current_att_id)
@@ -468,4 +467,3 @@ def __calculate_missing_attributes(project_id: str, user_id: str) -> None:
        project_id=project_id,
        message="calculate_attribute:finished:all",
    )
-    general.remove_and_refresh_session(ctx_token, False)
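
The api/transfer.py hunks above are the clearest before/after of the new session handling: the runner now owns the token via run_with_db_token, and __calculate_missing_attributes only requests a periodic refresh instead of threading ctx_token through every exit path. A condensed usage sketch, assuming the hypothetical API sketched earlier (the job function and project id are placeholders, not code from this repository):

import time

from submodules.model import daemon


def _example_job(project_id: str) -> None:
    # Placeholder long-running job: no explicit ctx_token bookkeeping; the
    # runner created the session, the loop only refreshes it periodically.
    for i in range(1, 181):
        if i % 60 == 0:
            daemon.reset_session_token_in_thread()
        time.sleep(1)


daemon.run_with_db_token(_example_job, "some-project-id")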
6 changes: 4 additions & 2 deletions controller/attribute/manager.py
@@ -16,7 +16,9 @@
     RecordTokenizationScope,
     AttributeVisibility,
 )
-from util import daemon, notification
+from util import notification
+
+from submodules.model import daemon

 from controller.task_master import manager as task_master_manager
 from submodules.model.enums import TaskType
@@ -246,7 +248,7 @@ def calculate_user_attribute_all_records(
    notification.send_organization_update(
        project_id=project_id, message=f"calculate_attribute:started:{attribute_id}"
    )
-    daemon.run(
+    daemon.run_without_db_token(
        __calculate_user_attribute_all_records,
        project_id,
        org_id,
19 changes: 11 additions & 8 deletions controller/attribute/util.py
@@ -18,9 +18,10 @@
 )
 from submodules.model.models import Attribute
 from submodules.s3 import controller as s3
-from util import daemon, notification
+from util import notification
 from controller.knowledge_base import util as knowledge_base
 from submodules.model import enums
+from submodules.model import daemon

 client = docker.from_env()
 image = os.getenv("AC_EXEC_ENV_IMAGE")
@@ -118,7 +119,7 @@ def run_attribute_calculation_exec_env(
    )
    set_progress(project_id, attribute_item, 0.05)
    __containers_running[container_name] = True
-    daemon.run(
+    daemon.run_without_db_token(
        read_container_logs_thread,
        project_id,
        container_name,
@@ -163,7 +164,7 @@ def extend_logs(
    if not attribute.logs:
        attribute.logs = logs
    else:
-        all_logs = [l for l in attribute.logs]
+        all_logs = [ll for ll in attribute.logs]
        all_logs += logs
        attribute.logs = all_logs
    general.commit()
Expand Down Expand Up @@ -195,7 +196,7 @@ def read_container_logs_thread(
break
if attribute_item.state == enums.AttributeState.FAILED.value:
break
if not name in __containers_running:
if name not in __containers_running:
break
try:
# timestamps included to filter out logs that have already been read
@@ -205,11 +206,13 @@
                timestamps=True,
                since=last_timestamp,
            )
-        except:
+        except Exception:
            # failsafe for containers that shut down during the read
            break
        current_logs = [
-            l for l in str(log_lines.decode("utf-8")).split("\n") if len(l.strip()) > 0
+            ll
+            for ll in str(log_lines.decode("utf-8")).split("\n")
+            if len(ll.strip()) > 0
        ]
        if len(current_logs) == 0:
            continue
@@ -218,8 +221,8 @@
            last_timestamp = parser.parse(last_timestamp_str).replace(
                tzinfo=None
            ) + datetime.timedelta(seconds=1)
-        non_progress_logs = [l for l in current_logs if "progress" not in l]
-        progress_logs = [l for l in current_logs if "progress" in l]
+        non_progress_logs = [ll for ll in current_logs if "progress" not in ll]
+        progress_logs = [ll for ll in current_logs if "progress" in ll]
        if len(non_progress_logs) > 0:
            extend_logs(project_id, attribute_item, non_progress_logs)
        if len(progress_logs) == 0:
11 changes: 7 additions & 4 deletions controller/embedding/manager.py
@@ -4,7 +4,7 @@

 from submodules.model import enums
 from submodules.model.models import Embedding
-from util import daemon, notification
+from util import notification
 from . import util
 from . import connector
 from .terms import TERMS_INFO
@@ -16,6 +16,7 @@
     general,
     project,
 )
+from submodules.model import daemon
 from submodules.model.util import sql_alchemy_to_dict
 from controller.embedding.connector import collection_on_qdrant

@@ -74,14 +75,14 @@ def get_recommended_encoders(is_managed: bool) -> List[Any]:


 def create_embedding(project_id: str, embedding_id: str) -> None:
-    daemon.run(connector.request_embedding, project_id, embedding_id)
+    daemon.run_without_db_token(connector.request_embedding, project_id, embedding_id)


 def create_embeddings_one_by_one(
     project_id: str,
     embeddings_ids: List[str],
 ) -> None:
-    daemon.run(__embed_one_by_one_helper, project_id, embeddings_ids)
+    daemon.run_without_db_token(__embed_one_by_one_helper, project_id, embeddings_ids)


 def request_tensor_upload(project_id: str, embedding_id: str) -> Any:
general.commit()

connector.request_deleting_embedding(project_id, old_id)
daemon.run(connector.request_embedding, project_id, new_embedding_item.id)
daemon.run_without_db_token(
connector.request_embedding, project_id, new_embedding_item.id
)
return new_embedding_item


6 changes: 1 addition & 5 deletions controller/embedding/util.py
@@ -1,8 +1,4 @@
 from controller.embedding import connector
-from submodules.model import enums
-from submodules.model.business_objects import agreement, embedding, general
-from submodules.model.models import Embedding
-from util import daemon
+from submodules.model.business_objects import embedding


 def has_encoder_running(project_id: str) -> bool:
16 changes: 6 additions & 10 deletions controller/information_source/manager.py
@@ -1,6 +1,5 @@
 import json
 import os
-from typing import List, Optional
+from typing import List
 from controller.information_source.util import resolve_source_return_type
 from submodules.model import InformationSource, LabelingTask, enums
 from submodules.model.business_objects import (
@@ -10,8 +9,7 @@
 )
 from controller.misc import config_service
 from controller.labeling_access_link import manager as link_manager
-from controller.record_label_association import manager as rla_manager
-from util import daemon
+from submodules.model import daemon


 def get_information_source(project_id: str, source_id: str) -> InformationSource:
@@ -65,11 +63,7 @@ def update_information_source(
 ) -> None:
    labeling_task_item: LabelingTask = labeling_task.get(project_id, labeling_task_id)
    return_type: str = resolve_source_return_type(labeling_task_item)
-    item = information_source.get(project_id, source_id)
-    new_payload_needed = (
-        str(item.source_code) != code or str(item.labeling_task_id) != labeling_task_id
-    )
-    item = information_source.update(
+    information_source.update(
        project_id,
        source_id,
        labeling_task_id=labeling_task_id,
@@ -94,7 +88,9 @@ def delete_information_source(project_id: str, source_id: str) -> None:
        == enums.InformationSourceType.ACTIVE_LEARNING.value
        and config_service.get_config_value("is_managed")
    ):
-        daemon.run(__delete_active_learner_from_inference_dir, project_id, source_id)
+        daemon.run_without_db_token(
+            __delete_active_learner_from_inference_dir, project_id, source_id
+        )

    information_source.delete(project_id, source_id, with_commit=True)

5 changes: 2 additions & 3 deletions controller/misc/config_service.py
@@ -1,8 +1,7 @@
 from typing import Dict, Any, Optional, Union
 import requests
 import json
 import time
-from util import daemon
+from submodules.model import daemon
 from util import service_requests

 __config = None
@@ -28,7 +27,7 @@ def refresh_config():
    )
    global __config
    __config = response.json()
-    daemon.run(invalidate_after, 3600)  # one hour as failsave
+    daemon.run_with_db_token(invalidate_after, 3600)  # one hour as failsave


def get_config_value(
12 changes: 5 additions & 7 deletions controller/payload/payload_scheduler.py
@@ -6,15 +6,14 @@
 import pytz
 import json
 import docker
-import timeit
 import traceback

 # from datetime import datetime
 from dateutil import parser
 import datetime

 from exceptions.exceptions import PayloadSchedulerError
-from submodules.model import enums, events
+from submodules.model import enums
 from submodules.model.business_objects import (
     information_source,
     embedding,
@@ -26,6 +25,7 @@
     project,
     organization,
 )
+from submodules.model import daemon
 from submodules.model.business_objects.embedding import get_embedding_record_ids
 from submodules.model.business_objects.information_source import (
     get_exclusion_record_ids,
@@ -46,7 +46,7 @@
     RecordLabelAssociation,
     InformationSourcePayload,
 )
-from util import daemon, notification
+from util import notification
 from submodules.s3 import controller as s3
 from controller.knowledge_base import util as knowledge_base
 from controller.misc import config_service
@@ -232,7 +232,6 @@ def execution_pipeline(
        project_id,
        information_source_item.name,
    )
-    start = timeit.default_timer()
    run_container(
        payload_item,
        project_id,
@@ -289,7 +288,6 @@ def execution_pipeline(
                project_id,
                f"payload_failed:{information_source_item.id}:{payload_item.id}:{information_source_item.type}",
            )
-        stop = timeit.default_timer()
        general.commit()

        org_id = organization.get_id_by_project_id(project_id)
@@ -309,7 +307,7 @@ def execution_pipeline(
            print(traceback.format_exc())

    if asynchronous:
-        daemon.run(
+        daemon.run_without_db_token(
            prepare_and_run_execution_pipeline,
            str(payload.id),
            project_id,
@@ -386,7 +384,7 @@ def run_container(
    )
    set_payload_progress(project_id, information_source_payload, 0.05)
    __containers_running[container_name] = True
-    daemon.run(
+    daemon.run_without_db_token(
        read_container_logs_thread,
        project_id,
        container_name,
6 changes: 3 additions & 3 deletions controller/project/manager.py
@@ -17,8 +17,8 @@
     information_source,
     general,
 )
+from submodules.model import daemon
 from fast_api.types import HuddleData, ProjectSize
-from util import daemon
 from controller.task_master import manager as task_master_manager
 from submodules.model.enums import TaskType, RecordTokenizationScope
 from submodules.model.business_objects import util as db_util
@@ -139,7 +139,7 @@ def delete_project(project_id: str) -> None:
    org_id = organization.get_id_by_project_id(project_id)
    project.delete_by_id(project_id, with_commit=True)

-    daemon.run(__background_cleanup, org_id, project_id)
+    daemon.run_without_db_token(__background_cleanup, org_id, project_id)


def __background_cleanup(org_id: str, project_id: str) -> None:
@@ -295,7 +295,7 @@ def __get_first_data_id(project_id: str, user_id: str, huddle_type: str) -> str:

def check_in_deletion_projects() -> None:
    # this is only supposed to be called during startup of the application
-    daemon.run(__check_in_deletion_projects)
+    daemon.run_without_db_token(__check_in_deletion_projects)


def __check_in_deletion_projects() -> None:
6 changes: 3 additions & 3 deletions controller/record/manager.py
@@ -15,12 +15,12 @@
 )
 from service.search import search
 from submodules.model import enums
+from submodules.model import daemon

 from controller.embedding import connector as embedding_connector
 from controller.record import neural_search_connector
 from controller.embedding import manager as embedding_manager
 from controller.tokenization import tokenization_service
-from util import daemon
 from util.miscellaneous_functions import chunk_list
 import time
 import traceback
@@ -109,7 +109,7 @@ def get_records_by_extended_search(

def delete_record(project_id: str, record_id: str) -> None:
    record.delete(project_id, record_id, with_commit=True)
-    daemon.run(__reupload_embeddings, project_id)
+    daemon.run_without_db_token(__reupload_embeddings, project_id)


def delete_all_records(project_id: str) -> None:
@@ -251,7 +251,7 @@ def __check_and_prep_edit_records(
                f"can't find embedding PCA for {embedding_item.name}. Try rebuilding or removing the embeddings on settings page."
            )
            continue
-        if not embedding_item.attribute_id in useable_embeddings:
+        if embedding_item.attribute_id not in useable_embeddings:
            useable_embeddings[embedding_item.attribute_id] = []
        useable_embeddings[embedding_item.attribute_id].append(embedding_item)

(Diffs for the remaining changed files were not loaded in this view.)
