diff --git a/qcfractal/qcfractal/components/tasks/socket.py b/qcfractal/qcfractal/components/tasks/socket.py index 21c4889ef..85fb6d847 100644 --- a/qcfractal/qcfractal/components/tasks/socket.py +++ b/qcfractal/qcfractal/components/tasks/socket.py @@ -190,6 +190,9 @@ def update_finished( manager.failures += len(tasks_failures) manager.rejected += len(tasks_rejected) + # Mark that we have heard from the manager + manager.modified_on = now_at_utc() + # Automatically reset ones that should be reset if self.root_socket.qcf_config.auto_reset.enabled and to_be_reset: self._logger.info(f"Auto resetting {len(to_be_reset)} records") @@ -341,6 +344,9 @@ def claim_tasks( manager.claimed += len(found) + # Mark that we have heard from the manager + manager.modified_on = now_at_utc() + self._logger.info(f"Manager {manager_name} has claimed {len(found)} new tasks") return found diff --git a/qcfractal/qcfractal/components/tasks/test_socket_claim.py b/qcfractal/qcfractal/components/tasks/test_socket_claim.py index 86d6bdbc3..440cdb1d0 100644 --- a/qcfractal/qcfractal/components/tasks/test_socket_claim.py +++ b/qcfractal/qcfractal/components/tasks/test_socket_claim.py @@ -4,11 +4,11 @@ from __future__ import annotations -from qcarchivetesting.testing_classes import QCATestingSnowflake from typing import TYPE_CHECKING import pytest +from qcarchivetesting.testing_classes import QCATestingSnowflake from qcfractal.components.managers.db_models import ComputeManagerORM from qcfractal.components.optimization.testing_helpers import load_test_data as load_opt_test_data from qcfractal.components.record_db_models import BaseRecordORM @@ -18,9 +18,11 @@ generate_task_key as generate_td_task_key, ) from qcfractal.testing_helpers import run_service +from qcfractalcompute.compress import compress_result from qcportal.exceptions import ComputeManagerError from qcportal.managers import ManagerName from qcportal.record_models import PriorityEnum +from qcportal.utils import now_at_utc if TYPE_CHECKING: from qcfractal.db_socket import SQLAlchemySocket @@ -35,6 +37,64 @@ input_spec_7, molecule_7, result_data_7 = load_sp_test_data("sp_rdkit_benzene_energy") +def test_task_socket_update_manager_time(storage_socket: SQLAlchemySocket, session: Session): + # Test that claiming & returning updates the modified_on time of the manager + mname1 = ManagerName(cluster="test_cluster", hostname="a_host1", uuid="1234-5678-1234-5678") + + mprog1 = {"qcengine": ["unknown"], "psi4": ["unknown"], "geometric": ["v3.0"]} + + mid_1 = storage_socket.managers.activate( + name_data=mname1, + manager_version="v2.0", + username="bill", + programs=mprog1, + tags=["tag1"], + ) + + meta, id_1 = storage_socket.records.singlepoint.add( + [molecule_1], input_spec_1, "tag1", PriorityEnum.low, None, None, True + ) + + # claim up to two tasks + t0 = now_at_utc() + tasks = storage_socket.tasks.claim_tasks(mname1.fullname, mprog1, ["tag1"], 2) + t1 = now_at_utc() + assert len(tasks) == 1 + + # Check assignments + session.expire_all() + + m1 = session.get(ComputeManagerORM, mid_1) + assert m1.claimed == 1 + assert t0 < m1.modified_on < t1 + + # Claiming but getting nothing also updates the time + t0 = now_at_utc() + tasks0 = storage_socket.tasks.claim_tasks(mname1.fullname, mprog1, ["tag1"], 2) + t1 = now_at_utc() + assert len(tasks0) == 0 + + session.expire_all() + m1 = session.get(ComputeManagerORM, mid_1) + assert m1.claimed == 1 + assert t0 < m1.modified_on < t1 + + # Return the task data + t0 = now_at_utc() + rmeta = storage_socket.tasks.update_finished( + mname1.fullname, + { + tasks[0]["id"]: compress_result(result_data_1.dict()), + }, + ) + t1 = now_at_utc() + + assert rmeta.n_accepted == 1 + session.expire_all() + m1 = session.get(ComputeManagerORM, mid_1) + assert t0 < m1.modified_on < t1 + + def test_task_socket_claim_mixed(storage_socket: SQLAlchemySocket, session: Session): mname1 = ManagerName(cluster="test_cluster", hostname="a_host1", uuid="1234-5678-1234-5678") mname2 = ManagerName(cluster="test_cluster", hostname="a_host2", uuid="2234-5678-1234-5678")