From 36dcd209db133b89d4bd49d347bc9059a91f228d Mon Sep 17 00:00:00 2001 From: Christophe Haen Date: Tue, 28 Nov 2023 15:07:35 +0100 Subject: [PATCH 1/5] fix (diracx): clearer error message --- src/DIRAC/Core/Security/DiracX.py | 2 ++ src/DIRAC/FrameworkSystem/scripts/dirac_diracx_whoami.py | 9 ++++++--- 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/src/DIRAC/Core/Security/DiracX.py b/src/DIRAC/Core/Security/DiracX.py index d5f8214436d..3d0f726d08e 100644 --- a/src/DIRAC/Core/Security/DiracX.py +++ b/src/DIRAC/Core/Security/DiracX.py @@ -74,6 +74,8 @@ def DiracXClient() -> _DiracClient: proxyLocation = getDefaultProxyLocation() diracxToken = diracxTokenFromPEM(proxyLocation) + if not diracxToken: + raise ValueError(f"No dirax token in the proxy file {proxyLocation}") with NamedTemporaryFile(mode="wt") as token_file: token_file.write(json.dumps(diracxToken)) diff --git a/src/DIRAC/FrameworkSystem/scripts/dirac_diracx_whoami.py b/src/DIRAC/FrameworkSystem/scripts/dirac_diracx_whoami.py index 7f425a879ea..c24c4e6dc8c 100644 --- a/src/DIRAC/FrameworkSystem/scripts/dirac_diracx_whoami.py +++ b/src/DIRAC/FrameworkSystem/scripts/dirac_diracx_whoami.py @@ -13,9 +13,12 @@ def main(): Script.parseCommandLine() - with DiracXClient() as api: - user_info = api.auth.userinfo() - print(json.dumps(user_info.as_dict(), indent=2)) + try: + with DiracXClient() as api: + user_info = api.auth.userinfo() + print(json.dumps(user_info.as_dict(), indent=2)) + except Exception as e: + print(f"Failed to access DiracX: {e}") if __name__ == "__main__": From f10524523a29d27b5b4cc2278d6a0e8c1fc01a74 Mon Sep 17 00:00:00 2001 From: Chris Burr Date: Tue, 10 Oct 2023 12:03:55 +0200 Subject: [PATCH 2/5] test: Add future client tests --- .../WorkloadManagement/Test_JobMonitoring.py | 215 ++++++++++++++++++ .../WorkloadManagement/__init__.py | 0 tests/Integration/FutureClient/__init__.py | 0 .../FutureClient/generate-test-file.py | 65 ++++++ tests/Integration/FutureClient/utils.py | 20 ++ 5 files changed, 300 insertions(+) create mode 100644 tests/Integration/FutureClient/WorkloadManagement/Test_JobMonitoring.py create mode 100644 tests/Integration/FutureClient/WorkloadManagement/__init__.py create mode 100644 tests/Integration/FutureClient/__init__.py create mode 100755 tests/Integration/FutureClient/generate-test-file.py create mode 100644 tests/Integration/FutureClient/utils.py diff --git a/tests/Integration/FutureClient/WorkloadManagement/Test_JobMonitoring.py b/tests/Integration/FutureClient/WorkloadManagement/Test_JobMonitoring.py new file mode 100644 index 00000000000..3014534ad11 --- /dev/null +++ b/tests/Integration/FutureClient/WorkloadManagement/Test_JobMonitoring.py @@ -0,0 +1,215 @@ +from functools import partial + +import pytest + +import DIRAC + +DIRAC.initialize() +from DIRAC.WorkloadManagementSystem.Client.JobMonitoringClient import JobMonitoringClient +from ..utils import compare_results + +TEST_JOBS = [7470, 7471, 7469] +TEST_JOB_IDS = [TEST_JOBS] + TEST_JOBS + [str(x) for x in TEST_JOBS] + + +def test_getApplicationStates(): + # JobMonitoringClient().getApplicationStates(condDict = None, older = None, newer = None) + method = JobMonitoringClient().getApplicationStates + pytest.skip() + + +def test_getAtticJobParameters(): + # JobMonitoringClient().getAtticJobParameters(jobID: int, parameters = None, rescheduleCycle = -1) + method = JobMonitoringClient().getAtticJobParameters + pytest.skip() + + +def test_getCounters(): + # JobMonitoringClient().getCounters(attrList: list, attrDict = None, cutDate = ) + method = JobMonitoringClient().getCounters + pytest.skip() + + +def test_getInputData(): + # JobMonitoringClient().getInputData(jobID: int) + method = JobMonitoringClient().getInputData + pytest.skip() + + +def test_getJobAttribute(): + # JobMonitoringClient().getJobAttribute(jobID: int, attribute: str) + method = JobMonitoringClient().getJobAttribute + pytest.skip() + + +def test_getJobAttributes(): + # JobMonitoringClient().getJobAttributes(jobID: int, attrList = None) + method = JobMonitoringClient().getJobAttributes + pytest.skip() + + +def test_getJobGroups(): + # JobMonitoringClient().getJobGroups(condDict = None, older = None, cutDate = None) + method = JobMonitoringClient().getJobGroups + pytest.skip() + + +def test_getJobHeartBeatData(): + # JobMonitoringClient().getJobHeartBeatData(jobID: int) + method = JobMonitoringClient().getJobHeartBeatData + pytest.skip() + + +def test_getJobJDL(): + # JobMonitoringClient().getJobJDL(jobID: int, original: bool) + method = JobMonitoringClient().getJobJDL + pytest.skip() + + +def test_getJobLoggingInfo(): + # JobMonitoringClient().getJobLoggingInfo(jobID: int) + method = JobMonitoringClient().getJobLoggingInfo + pytest.skip() + + +def test_getJobOptParameters(): + # JobMonitoringClient().getJobOptParameters(jobID: int) + method = JobMonitoringClient().getJobOptParameters + pytest.skip() + + +def test_getJobOwner(): + # JobMonitoringClient().getJobOwner(jobID: int) + method = JobMonitoringClient().getJobOwner + pytest.skip() + + +def test_getJobPageSummaryWeb(): + # JobMonitoringClient().getJobPageSummaryWeb(self: dict, selectDict: list, sortList: int, startItem: int, maxItems, selectJobs = True) + method = JobMonitoringClient().getJobPageSummaryWeb + pytest.skip() + + +def test_getJobParameter(): + # JobMonitoringClient().getJobParameter(jobID: str | int, parName: str) + method = JobMonitoringClient().getJobParameter + pytest.skip() + + +def test_getJobParameters(): + # JobMonitoringClient().getJobParameters(jobIDs: str | int | list, parName = None) + method = JobMonitoringClient().getJobParameters + pytest.skip() + + +def test_getJobSite(): + # JobMonitoringClient().getJobSite(jobID: int) + method = JobMonitoringClient().getJobSite + pytest.skip() + + +def test_getJobStats(): + # JobMonitoringClient().getJobStats(attribute: str, selectDict: dict) + method = JobMonitoringClient().getJobStats + pytest.skip() + + +def test_getJobSummary(): + # JobMonitoringClient().getJobSummary(jobID: int) + method = JobMonitoringClient().getJobSummary + pytest.skip() + + +def test_getJobTypes(): + # JobMonitoringClient().getJobTypes(condDict = None, older = None, newer = None) + method = JobMonitoringClient().getJobTypes + pytest.skip() + + +def test_getJobs(): + # JobMonitoringClient().getJobs(attrDict = None, cutDate = None) + method = JobMonitoringClient().getJobs + pytest.skip() + + +@pytest.mark.parametrize("jobIDs", TEST_JOB_IDS) +def test_getJobsApplicationStatus(jobIDs): + # JobMonitoringClient().getJobsApplicationStatus(jobIDs: str | int | list) + method = JobMonitoringClient().getJobsApplicationStatus + compare_results(partial(method, jobIDs)) + + +@pytest.mark.parametrize("jobIDs", TEST_JOB_IDS) +def test_getJobsMinorStatus(jobIDs): + # JobMonitoringClient().getJobsMinorStatus(jobIDs: str | int | list) + method = JobMonitoringClient().getJobsMinorStatus + compare_results(partial(method, jobIDs)) + + +def test_getJobsParameters(): + # JobMonitoringClient().getJobsParameters(jobIDs: str | int | list, parameters: list) + method = JobMonitoringClient().getJobsParameters + pytest.skip() + + +@pytest.mark.parametrize("jobIDs", TEST_JOB_IDS) +def test_getJobsSites(jobIDs): + # JobMonitoringClient().getJobsSites(jobIDs: str | int | list) + method = JobMonitoringClient().getJobsSites + compare_results(partial(method, jobIDs)) + + +@pytest.mark.parametrize("jobIDs", TEST_JOB_IDS) +def test_getJobsStates(jobIDs): + # JobMonitoringClient().getJobsStates(jobIDs: str | int | list) + method = JobMonitoringClient().getJobsStates + compare_results(partial(method, jobIDs)) + + +@pytest.mark.parametrize("jobIDs", TEST_JOB_IDS) +def test_getJobsStatus(jobIDs): + # JobMonitoringClient().getJobsStatus(jobIDs: str | int | list) + method = JobMonitoringClient().getJobsStatus + compare_results(partial(method, jobIDs)) + + +def test_getJobsSummary(): + # JobMonitoringClient().getJobsSummary(jobIDs: list) + method = JobMonitoringClient().getJobsSummary + pytest.skip() + + +def test_getMinorStates(): + # JobMonitoringClient().getMinorStates(condDict = None, older = None, newer = None) + method = JobMonitoringClient().getMinorStates + pytest.skip() + + +def test_getOwnerGroup(): + # JobMonitoringClient().getOwnerGroup() + method = JobMonitoringClient().getOwnerGroup + pytest.skip() + + +def test_getOwners(): + # JobMonitoringClient().getOwners(condDict = None, older = None, newer = None) + method = JobMonitoringClient().getOwners + pytest.skip() + + +def test_getSiteSummary(): + # JobMonitoringClient().getSiteSummary() + method = JobMonitoringClient().getSiteSummary + pytest.skip() + + +def test_getSites(): + # JobMonitoringClient().getSites(condDict = None, older = None, newer = None) + method = JobMonitoringClient().getSites + pytest.skip() + + +def test_getStates(): + # JobMonitoringClient().getStates(condDict = None, older = None, newer = None) + method = JobMonitoringClient().getStates + pytest.skip() diff --git a/tests/Integration/FutureClient/WorkloadManagement/__init__.py b/tests/Integration/FutureClient/WorkloadManagement/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/tests/Integration/FutureClient/__init__.py b/tests/Integration/FutureClient/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/tests/Integration/FutureClient/generate-test-file.py b/tests/Integration/FutureClient/generate-test-file.py new file mode 100755 index 00000000000..6ec2bf200f1 --- /dev/null +++ b/tests/Integration/FutureClient/generate-test-file.py @@ -0,0 +1,65 @@ +#!/usr/bin/env python +import argparse +import inspect +import importlib +from itertools import zip_longest + +BASE_EXPORTS = {"export_whoami", "export_refreshConfiguration", "export_ping", "export_echo"} + + +def parse_args(): + parser = argparse.ArgumentParser( + description="Generate pytest stubs for testing the future client of a DIRAC system component" + ) + parser.add_argument("system", help="DIRAC system name") + parser.add_argument("component", help="DIRAC component name") + args = parser.parse_args() + main(args.system, args.component) + + +def main(system, component): + client_name = f"{component}Client" + handler = getattr( + importlib.import_module(f"DIRAC.{system}System.Service.{component}Handler"), f"{component}Handler" + ) + + print("from functools import partial") + print() + print("import pytest") + print() + print("import DIRAC") + print("DIRAC.initialize()") + print(f"from DIRAC.{system}System.Client.{client_name} import {client_name}") + print("from ..utils import compare_results") + print() + print() + + for export in dir(handler): + if not export.startswith("export_") or export in BASE_EXPORTS: + continue + method_name = export[len("export_") :] + types = getattr(handler, f"types_{method_name}") + signature = inspect.signature(getattr(handler, export)) + type_infos = [] + for parameter, dtype in zip_longest(signature.parameters.values(), types): + if dtype is None: + dtype = "" + elif isinstance(dtype, (list, tuple)): + dtype = f": {' | '.join([d.__name__ for d in dtype])}" + else: + dtype = f": {dtype.__name__}" + if parameter.default == inspect._empty: + default = "" + else: + default = f" = {parameter.default}" + type_infos += [f"{parameter.name}{dtype}{default}"] + print(f"def test_{method_name}(monkeypatch):") + print(f" # {client_name}().{method_name}({', '.join(type_infos)})") + print(f" method = {client_name}().{method_name}") + print(f" pytest.skip()") + print() + print() + + +if __name__ == "__main__": + parse_args() diff --git a/tests/Integration/FutureClient/utils.py b/tests/Integration/FutureClient/utils.py new file mode 100644 index 00000000000..9fb9afdb2b5 --- /dev/null +++ b/tests/Integration/FutureClient/utils.py @@ -0,0 +1,20 @@ +def compare_results(test_func): + """Compare the results from DIRAC and DiracX based services for a reentrant function.""" + ClientClass = test_func.func.__self__ + assert ClientClass.diracxClient, "FutureClient is not set up!" + + # Get the result from the diracx-based handler + future_result = test_func() + + # Get the result from the DIRAC-based handler + diracxClient = ClientClass.diracxClient + ClientClass.diracxClient = None + try: + old_result = test_func() + finally: + ClientClass.diracxClient = diracxClient + # We don't care about the rpcStub + old_result.pop("rpcStub") + + # Ensure the results match + assert old_result == future_result From 8110a738f712615e72889595ebfc828923d84b99 Mon Sep 17 00:00:00 2001 From: Christophe Haen Date: Tue, 28 Nov 2023 18:07:16 +0100 Subject: [PATCH 3/5] feat (diracx): start JobStateUpdateClient --- .../Client/JobStateUpdateClient.py | 5 ++ .../FutureClient/JobStateUpdateClient.py | 50 ++++++++++++ .../WorkloadManagement/Test_JobStateUpdate.py | 81 +++++++++++++++++++ 3 files changed, 136 insertions(+) create mode 100644 src/DIRAC/WorkloadManagementSystem/FutureClient/JobStateUpdateClient.py create mode 100644 tests/Integration/FutureClient/WorkloadManagement/Test_JobStateUpdate.py diff --git a/src/DIRAC/WorkloadManagementSystem/Client/JobStateUpdateClient.py b/src/DIRAC/WorkloadManagementSystem/Client/JobStateUpdateClient.py index d49468f8c6f..87072b9a337 100644 --- a/src/DIRAC/WorkloadManagementSystem/Client/JobStateUpdateClient.py +++ b/src/DIRAC/WorkloadManagementSystem/Client/JobStateUpdateClient.py @@ -1,12 +1,17 @@ """ Class that contains client access to the JobStateUpdate handler. """ from DIRAC.Core.Base.Client import Client, createClient +from DIRAC.WorkloadManagementSystem.FutureClient.JobStateUpdateClient import ( + JobStateUpdateClient as futureJobStateUpdateClient, +) @createClient("WorkloadManagement/JobStateUpdate") class JobStateUpdateClient(Client): """JobStateUpdateClient sets url for the JobStateUpdateHandler.""" + diracxClient = futureJobStateUpdateClient + def __init__(self, url=None, **kwargs): """ Sets URL for JobStateUpdate handler diff --git a/src/DIRAC/WorkloadManagementSystem/FutureClient/JobStateUpdateClient.py b/src/DIRAC/WorkloadManagementSystem/FutureClient/JobStateUpdateClient.py new file mode 100644 index 00000000000..f8f662d7b30 --- /dev/null +++ b/src/DIRAC/WorkloadManagementSystem/FutureClient/JobStateUpdateClient.py @@ -0,0 +1,50 @@ +from DIRAC.Core.Security.DiracX import DiracXClient +from DIRAC.Core.Utilities.ReturnValues import convertToReturnValue + + +class JobStateUpdateClient: + def sendHeartBeat(self, jobID: str | int, dynamicData: dict, staticData: dict): + raise NotImplementedError("TODO") + + def setJobApplicationStatus(self, jobID: str | int, appStatus: str, source: str = "Unknown"): + raise NotImplementedError("TODO") + + def setJobAttribute(self, jobID: str | int, attribute: str, value: str): + with DiracXClient() as api: + api.jobs.set_single_job_properties(jobID, "need to [patch the client to have a nice summer body ?") + raise NotImplementedError("TODO") + + def setJobFlag(self, jobID: str | int, flag: str): + raise NotImplementedError("TODO") + + def setJobParameter(self, jobID: str | int, name: str, value: str): + raise NotImplementedError("TODO") + + def setJobParameters(self, jobID: str | int, parameters: list): + raise NotImplementedError("TODO") + + def setJobSite(self, jobID: str | int, site: str): + raise NotImplementedError("TODO") + + def setJobStatus( + self, + jobID: str | int, + status: str = "", + minorStatus: str = "", + source: str = "Unknown", + datetime=None, + force=False, + ): + raise NotImplementedError("TODO") + + def setJobStatusBulk(self, jobID: str | int, statusDict: dict, force=False): + raise NotImplementedError("TODO") + + def setJobsParameter(self, jobsParameterDict: dict): + raise NotImplementedError("TODO") + + def unsetJobFlag(self, jobID: str | int, flag: str): + raise NotImplementedError("TODO") + + def updateJobFromStager(self, jobID: str | int, status: str): + raise NotImplementedError("TODO") diff --git a/tests/Integration/FutureClient/WorkloadManagement/Test_JobStateUpdate.py b/tests/Integration/FutureClient/WorkloadManagement/Test_JobStateUpdate.py new file mode 100644 index 00000000000..c9fbf920367 --- /dev/null +++ b/tests/Integration/FutureClient/WorkloadManagement/Test_JobStateUpdate.py @@ -0,0 +1,81 @@ +from functools import partial + +import pytest + +import DIRAC + +DIRAC.initialize() +from DIRAC.WorkloadManagementSystem.Client.JobStateUpdateClient import JobStateUpdateClient +from ..utils import compare_results + + +def test_sendHeartBeat(monkeypatch): + # JobStateUpdateClient().sendHeartBeat(jobID: str | int, dynamicData: dict, staticData: dict) + method = JobStateUpdateClient().sendHeartBeat + pytest.skip() + + +def test_setJobApplicationStatus(monkeypatch): + # JobStateUpdateClient().setJobApplicationStatus(jobID: str | int, appStatus: str, source: str = Unknown) + method = JobStateUpdateClient().setJobApplicationStatus + pytest.skip() + + +def test_setJobAttribute(monkeypatch): + # JobStateUpdateClient().setJobAttribute(jobID: str | int, attribute: str, value: str) + method = JobStateUpdateClient().setJobAttribute + pytest.skip() + + +def test_setJobFlag(monkeypatch): + # JobStateUpdateClient().setJobFlag(jobID: str | int, flag: str) + method = JobStateUpdateClient().setJobFlag + pytest.skip() + + +def test_setJobParameter(monkeypatch): + # JobStateUpdateClient().setJobParameter(jobID: str | int, name: str, value: str) + method = JobStateUpdateClient().setJobParameter + pytest.skip() + + +def test_setJobParameters(monkeypatch): + # JobStateUpdateClient().setJobParameters(jobID: str | int, parameters: list) + method = JobStateUpdateClient().setJobParameters + pytest.skip() + + +def test_setJobSite(monkeypatch): + # JobStateUpdateClient().setJobSite(jobID: str | int, site: str) + method = JobStateUpdateClient().setJobSite + pytest.skip() + + +def test_setJobStatus(monkeypatch): + # JobStateUpdateClient().setJobStatus(jobID: str | int, status: str = , minorStatus: str = , source: str = Unknown, datetime = None, force = False) + method = JobStateUpdateClient().setJobStatus + pytest.skip() + + +def test_setJobStatusBulk(monkeypatch): + # JobStateUpdateClient().setJobStatusBulk(jobID: str | int, statusDict: dict, force = False) + method = JobStateUpdateClient().setJobStatusBulk + pytest.skip() + + +def test_setJobsParameter(monkeypatch): + # JobStateUpdateClient().setJobsParameter(jobsParameterDict: dict) + method = JobStateUpdateClient().setJobsParameter + pytest.skip() + + +def test_unsetJobFlag(monkeypatch): + # JobStateUpdateClient().unsetJobFlag(jobID: str | int, flag: str) + method = JobStateUpdateClient().unsetJobFlag + pytest.skip() + + +def test_updateJobFromStager(monkeypatch): + # JobStateUpdateClient().updateJobFromStager(jobID: str | int, status: str) + method = JobStateUpdateClient().updateJobFromStager + pytest.skip() From 3d7633188a9a3f9e3a26672f086d597d9e3e0fb4 Mon Sep 17 00:00:00 2001 From: Christophe Haen Date: Wed, 29 Nov 2023 11:23:37 +0100 Subject: [PATCH 4/5] test (diracx): generate-test-file warns about potentially unused methods --- tests/Integration/FutureClient/generate-test-file.py | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/tests/Integration/FutureClient/generate-test-file.py b/tests/Integration/FutureClient/generate-test-file.py index 6ec2bf200f1..e9e888bc4ad 100755 --- a/tests/Integration/FutureClient/generate-test-file.py +++ b/tests/Integration/FutureClient/generate-test-file.py @@ -3,6 +3,7 @@ import inspect import importlib from itertools import zip_longest +import subprocess BASE_EXPORTS = {"export_whoami", "export_refreshConfiguration", "export_ping", "export_echo"} @@ -23,6 +24,8 @@ def main(system, component): importlib.import_module(f"DIRAC.{system}System.Service.{component}Handler"), f"{component}Handler" ) + dirac_location = importlib.import_module("DIRAC").__path__[0] + print("from functools import partial") print() print("import pytest") @@ -53,7 +56,15 @@ def main(system, component): else: default = f" = {parameter.default}" type_infos += [f"{parameter.name}{dtype}{default}"] + + cmd_check_used = ["grep", "-R", rf"\.{method_name}(", dirac_location] + + res = subprocess.run(cmd_check_used, capture_output=True) + is_method_used = bool(res.stdout) + print(f"def test_{method_name}(monkeypatch):") + if not is_method_used: + print(f" # WARNING: possibly unused") print(f" # {client_name}().{method_name}({', '.join(type_infos)})") print(f" method = {client_name}().{method_name}") print(f" pytest.skip()") From e2a2f91c76980fc9227fbd5abc527cf9310f775a Mon Sep 17 00:00:00 2001 From: Chris Burr Date: Thu, 30 Nov 2023 08:31:37 +0100 Subject: [PATCH 5/5] feat: Partially implement FutureClient/JobStateUpdat --- .../FutureClient/JobStateUpdateClient.py | 108 +++++++++++++++--- .../WorkloadManagement/Test_JobStateUpdate.py | 76 ++++++++++-- tests/Integration/FutureClient/utils.py | 53 ++++++--- 3 files changed, 200 insertions(+), 37 deletions(-) diff --git a/src/DIRAC/WorkloadManagementSystem/FutureClient/JobStateUpdateClient.py b/src/DIRAC/WorkloadManagementSystem/FutureClient/JobStateUpdateClient.py index f8f662d7b30..201376014bd 100644 --- a/src/DIRAC/WorkloadManagementSystem/FutureClient/JobStateUpdateClient.py +++ b/src/DIRAC/WorkloadManagementSystem/FutureClient/JobStateUpdateClient.py @@ -1,50 +1,132 @@ +import functools +from datetime import datetime, timezone + + from DIRAC.Core.Security.DiracX import DiracXClient from DIRAC.Core.Utilities.ReturnValues import convertToReturnValue +from DIRAC.Core.Utilities.TimeUtilities import fromString + + +def stripValueIfOK(func): + """Decorator to remove S_OK["Value"] from the return value of a function if it is OK. + + This is done as some update functions return the number of modified rows in + the database. This likely not actually useful so it isn't supported in + DiracX. Stripping the "Value" key of the dictionary means that we should + get a fairly straight forward error if the assumption is incorrect. + """ + + @functools.wraps(func) + def wrapper(*args, **kwargs): + result = func(*args, **kwargs) + if result.get("OK"): + assert result.pop("Value") is None, "Value should be None if OK" + return result + + return wrapper class JobStateUpdateClient: + @stripValueIfOK + @convertToReturnValue def sendHeartBeat(self, jobID: str | int, dynamicData: dict, staticData: dict): - raise NotImplementedError("TODO") + print("HACK: This is a no-op until we decide what to do") + @stripValueIfOK + @convertToReturnValue def setJobApplicationStatus(self, jobID: str | int, appStatus: str, source: str = "Unknown"): - raise NotImplementedError("TODO") + statusDict = { + "application_status": appStatus, + } + if source: + statusDict["Source"] = source + with DiracXClient() as api: + api.jobs.set_single_job_status( + jobID, + {datetime.now(tz=timezone.utc): statusDict}, + ) + @stripValueIfOK + @convertToReturnValue def setJobAttribute(self, jobID: str | int, attribute: str, value: str): with DiracXClient() as api: - api.jobs.set_single_job_properties(jobID, "need to [patch the client to have a nice summer body ?") - raise NotImplementedError("TODO") + if attribute == "Status": + api.jobs.set_single_job_status( + jobID, + {datetime.now(tz=timezone.utc): {"status": value}}, + ) + else: + api.jobs.set_single_job_properties(jobID, {attribute: value}) + @stripValueIfOK + @convertToReturnValue def setJobFlag(self, jobID: str | int, flag: str): - raise NotImplementedError("TODO") + with DiracXClient() as api: + api.jobs.set_single_job_properties(jobID, {flag: True}) + @stripValueIfOK + @convertToReturnValue def setJobParameter(self, jobID: str | int, name: str, value: str): - raise NotImplementedError("TODO") + print("HACK: This is a no-op until we decide what to do") + @stripValueIfOK + @convertToReturnValue def setJobParameters(self, jobID: str | int, parameters: list): - raise NotImplementedError("TODO") + print("HACK: This is a no-op until we decide what to do") + @stripValueIfOK + @convertToReturnValue def setJobSite(self, jobID: str | int, site: str): - raise NotImplementedError("TODO") + with DiracXClient() as api: + api.jobs.set_single_job_properties(jobID, {"Site": site}) + @stripValueIfOK + @convertToReturnValue def setJobStatus( self, jobID: str | int, status: str = "", minorStatus: str = "", source: str = "Unknown", - datetime=None, + datetime_=None, force=False, ): - raise NotImplementedError("TODO") + statusDict = {} + if status: + statusDict["Status"] = status + if minorStatus: + statusDict["MinorStatus"] = minorStatus + if source: + statusDict["Source"] = source + if datetime_ is None: + datetime_ = datetime.utcnow() + with DiracXClient() as api: + api.jobs.set_single_job_status( + jobID, + {fromString(datetime_).replace(tzinfo=timezone.utc): statusDict}, + force=force, + ) + @stripValueIfOK + @convertToReturnValue def setJobStatusBulk(self, jobID: str | int, statusDict: dict, force=False): - raise NotImplementedError("TODO") + statusDict = {fromString(k).replace(tzinfo=timezone.utc): v for k, v in statusDict.items()} + with DiracXClient() as api: + api.jobs.set_job_status_bulk( + {jobID: statusDict}, + force=force, + ) + @stripValueIfOK + @convertToReturnValue def setJobsParameter(self, jobsParameterDict: dict): - raise NotImplementedError("TODO") + print("HACK: This is a no-op until we decide what to do") + @stripValueIfOK + @convertToReturnValue def unsetJobFlag(self, jobID: str | int, flag: str): - raise NotImplementedError("TODO") + with DiracXClient() as api: + api.jobs.set_single_job_properties(jobID, {flag: False}) def updateJobFromStager(self, jobID: str | int, status: str): raise NotImplementedError("TODO") diff --git a/tests/Integration/FutureClient/WorkloadManagement/Test_JobStateUpdate.py b/tests/Integration/FutureClient/WorkloadManagement/Test_JobStateUpdate.py index c9fbf920367..2ddebce69d3 100644 --- a/tests/Integration/FutureClient/WorkloadManagement/Test_JobStateUpdate.py +++ b/tests/Integration/FutureClient/WorkloadManagement/Test_JobStateUpdate.py @@ -1,12 +1,45 @@ +from datetime import datetime from functools import partial +from textwrap import dedent import pytest import DIRAC DIRAC.initialize() +from DIRAC.Core.Security.DiracX import DiracXClient from DIRAC.WorkloadManagementSystem.Client.JobStateUpdateClient import JobStateUpdateClient -from ..utils import compare_results +from ..utils import compare_results2 + +test_jdl = """ +Arguments = "Hello world from DiracX"; +Executable = "echo"; +JobGroup = jobGroup; +JobName = jobName; +JobType = User; +LogLevel = INFO; +MinNumberOfProcessors = 1000; +OutputSandbox = + { + std.err, + std.out + }; +Priority = 1; +Sites = ANY; +StdError = std.err; +StdOutput = std.out; +""" + + +@pytest.fixture() +def example_jobids(): + from DIRAC.Interfaces.API.Dirac import Dirac + from DIRAC.Core.Utilities.ReturnValues import returnValueOrRaise + + d = Dirac() + job_id_1 = returnValueOrRaise(d.submitJob(test_jdl)) + job_id_2 = returnValueOrRaise(d.submitJob(test_jdl)) + return job_id_1, job_id_2 def test_sendHeartBeat(monkeypatch): @@ -15,16 +48,22 @@ def test_sendHeartBeat(monkeypatch): pytest.skip() -def test_setJobApplicationStatus(monkeypatch): +def test_setJobApplicationStatus(monkeypatch, example_jobids): # JobStateUpdateClient().setJobApplicationStatus(jobID: str | int, appStatus: str, source: str = Unknown) method = JobStateUpdateClient().setJobApplicationStatus - pytest.skip() + args = ["MyApplicationStatus"] + test_func1 = partial(method, example_jobids[0], *args) + test_func2 = partial(method, example_jobids[1], *args) + compare_results2(monkeypatch, test_func1, test_func2) -def test_setJobAttribute(monkeypatch): +@pytest.mark.parametrize("args", [["Status", "Killed"], ["JobGroup", "newJobGroup"]]) +def test_setJobAttribute(monkeypatch, example_jobids, args): # JobStateUpdateClient().setJobAttribute(jobID: str | int, attribute: str, value: str) method = JobStateUpdateClient().setJobAttribute - pytest.skip() + test_func1 = partial(method, example_jobids[0], *args) + test_func2 = partial(method, example_jobids[1], *args) + compare_results2(monkeypatch, test_func1, test_func2) def test_setJobFlag(monkeypatch): @@ -45,22 +84,37 @@ def test_setJobParameters(monkeypatch): pytest.skip() -def test_setJobSite(monkeypatch): +@pytest.mark.parametrize("jobid_type", [int, str]) +def test_setJobSite(monkeypatch, example_jobids, jobid_type): # JobStateUpdateClient().setJobSite(jobID: str | int, site: str) method = JobStateUpdateClient().setJobSite - pytest.skip() + args = ["LCG.CERN.ch"] + test_func1 = partial(method, jobid_type(example_jobids[0]), *args) + test_func2 = partial(method, jobid_type(example_jobids[1]), *args) + compare_results2(monkeypatch, test_func1, test_func2) -def test_setJobStatus(monkeypatch): +def test_setJobStatus(monkeypatch, example_jobids): # JobStateUpdateClient().setJobStatus(jobID: str | int, status: str = , minorStatus: str = , source: str = Unknown, datetime = None, force = False) method = JobStateUpdateClient().setJobStatus - pytest.skip() + args = ["", "My Minor"] + test_func1 = partial(method, example_jobids[0], *args) + test_func2 = partial(method, example_jobids[1], *args) + compare_results2(monkeypatch, test_func1, test_func2) -def test_setJobStatusBulk(monkeypatch): +def test_setJobStatusBulk(monkeypatch, example_jobids): # JobStateUpdateClient().setJobStatusBulk(jobID: str | int, statusDict: dict, force = False) method = JobStateUpdateClient().setJobStatusBulk - pytest.skip() + args = [ + { + datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S.%f"): {"ApplicationStatus": "SomethingElse"}, + datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S.%f"): {"ApplicationStatus": "Something"}, + } + ] + test_func1 = partial(method, example_jobids[0], *args) + test_func2 = partial(method, example_jobids[1], *args) + compare_results2(monkeypatch, test_func1, test_func2) def test_setJobsParameter(monkeypatch): diff --git a/tests/Integration/FutureClient/utils.py b/tests/Integration/FutureClient/utils.py index 9fb9afdb2b5..fe1b08636b9 100644 --- a/tests/Integration/FutureClient/utils.py +++ b/tests/Integration/FutureClient/utils.py @@ -1,20 +1,47 @@ -def compare_results(test_func): +import time + + +def compare_results(monkeypatch, test_func): """Compare the results from DIRAC and DiracX based services for a reentrant function.""" - ClientClass = test_func.func.__self__ - assert ClientClass.diracxClient, "FutureClient is not set up!" + compare_results2(monkeypatch, test_func, test_func) + +def compare_results2(monkeypatch, test_func1, test_func2): + """Compare the results from DIRAC and DiracX based services for two functions which should behave identically.""" # Get the result from the diracx-based handler - future_result = test_func() + start = time.monotonic() + with monkeypatch.context() as m: + m.setattr("DIRAC.Core.Tornado.Client.ClientSelector.useLegacyAdapter", lambda *_: True) + try: + future_result = test_func1() + except Exception as e: + future_result = e + else: + assert "rpcStub" not in future_result, "rpcStub should never be present when using DiracX!" + diracx_duration = time.monotonic() - start # Get the result from the DIRAC-based handler - diracxClient = ClientClass.diracxClient - ClientClass.diracxClient = None - try: - old_result = test_func() - finally: - ClientClass.diracxClient = diracxClient - # We don't care about the rpcStub + start = time.monotonic() + with monkeypatch.context() as m: + m.setattr("DIRAC.Core.Tornado.Client.ClientSelector.useLegacyAdapter", lambda *_: False) + old_result = test_func2() + assert "rpcStub" in old_result, "rpcStub should always be present when using legacy DIRAC!" + legacy_duration = time.monotonic() - start + + # We don't care about the rpcStub or Errno old_result.pop("rpcStub") + old_result.pop("Errno", None) + + if not old_result["OK"]: + assert not future_result["OK"], "FutureClient should have failed too!" + elif "Value" in future_result: + # Ensure the results match exactly + assert old_result == future_result + else: + # See the "stripValueIfOK" decorator for explanation + assert old_result["OK"] == future_result["OK"] + # assert isinstance(old_result["Value"], int) - # Ensure the results match - assert old_result == future_result + # if 3 * legacy_duration < diracx_duration: + # print(f"Legacy DIRAC took {legacy_duration:.3f}s, FutureClient took {diracx_duration:.3f}s") + # assert False, "FutureClient should be faster than legacy DIRAC!"