Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature rescheduleJob method #72

Merged
merged 14 commits into from
Oct 19, 2023
173 changes: 170 additions & 3 deletions src/diracx/db/jobs/db.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,18 @@
from __future__ import annotations

import logging
from datetime import datetime, timezone
from typing import Any

from sqlalchemy import func, insert, select, update
from sqlalchemy import delete, func, insert, select, update

from diracx.core.exceptions import InvalidQueryError
from diracx.core.models import ScalarSearchOperator, ScalarSearchSpec
from diracx.core.utils import JobStatus

from ..utils import BaseDB, apply_search_filters
from .schema import Base as JobDBBase
from .schema import InputData, JobJDLs, Jobs
from .schema import InputData, JobJDLs, JobParameters, Jobs, OptimizerParameters
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In #29 (and at the last hackathon) we argued against adding MySQL support for JobParameters. Before we ratify this decision I would, at a minimum, avoid adding this here.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, I am TBH not sure that OptimizerParameters will be something that we'll need in DiracX. So, avoid adding code for it atm.



class JobDB(BaseDB):
Expand All @@ -22,6 +24,11 @@
# to find a way to make it dynamic
jdl2DBParameters = ["JobName", "JobType", "JobGroup"]

# TODO: set maxRescheduling value from CS
# maxRescheduling = self.getCSOption("MaxRescheduling", 3)
# For now:
maxRescheduling = 3

async def summary(self, group_by, search) -> list[dict[str, str | int]]:
columns = [Jobs.__table__.columns[x] for x in group_by]

Expand Down Expand Up @@ -141,6 +148,20 @@
)
await self.conn.execute(stmt)

async def getJobJDL(self, job_id: int, original: bool = False) -> str:
    """Return the JDL stored in the JobJDLs table for ``job_id``.

    :param job_id: value matched against ``JobJDLs.JobID``.
    :param original: when true, read the ``OriginalJDL`` column instead of
        the ``JDL`` column.
    :return: the stored value passed through DIRAC's ``extractJDL``; a falsy
        stored value is returned unchanged.
    :raises sqlalchemy.exc.NoResultFound / MultipleResultsFound: from
        ``scalar_one()`` when the query does not yield exactly one row.
    """
    # NOTE(review): it has not been decided yet whether JDLs will exist in
    # DiracX at all.
    from DIRAC.WorkloadManagementSystem.DB.JobDBUtils import extractJDL

    column = JobJDLs.OriginalJDL if original else JobJDLs.JDL
    query = select(column).where(JobJDLs.JobID == job_id)

    rows = await self.conn.execute(query)
    stored = rows.scalar_one()
    if not stored:
        return stored
    return extractJDL(stored)

Check warning on line 163 in src/diracx/db/jobs/db.py

View check run for this annotation

Codecov / codecov/patch

src/diracx/db/jobs/db.py#L163

Added line #L163 was not covered by tests

async def insert(
self,
jdl,
Expand Down Expand Up @@ -193,7 +214,7 @@

return {
"JobID": job_id,
"Status": JobStatus.FAILED,
"Status": JobStatus.Failed,
fstagni marked this conversation as resolved.
Show resolved Hide resolved
"MinorStatus": "Error in JDL syntax",
}

Expand Down Expand Up @@ -242,3 +263,149 @@
"MinorStatus": initial_minor_status,
"TimeStamp": datetime.now(tz=timezone.utc),
}

async def delete_job_parameters(self, job_id: int):
    """Delete every JobParameters row whose ``JobID`` equals ``job_id``."""
    await self.conn.execute(
        delete(JobParameters).where(JobParameters.JobID == job_id)
    )

Check warning on line 269 in src/diracx/db/jobs/db.py

View check run for this annotation

Codecov / codecov/patch

src/diracx/db/jobs/db.py#L268-L269

Added lines #L268 - L269 were not covered by tests

async def delete_job_optimizer_parameters(self, job_id: int):
    """Delete every OptimizerParameters row whose ``JobID`` equals ``job_id``."""
    await self.conn.execute(
        delete(OptimizerParameters).where(OptimizerParameters.JobID == job_id)
    )

Check warning on line 273 in src/diracx/db/jobs/db.py

View check run for this annotation

Codecov / codecov/patch

src/diracx/db/jobs/db.py#L272-L273

Added lines #L272 - L273 were not covered by tests

async def rescheduleJob(self, job_id) -> dict[str, Any]:
    """Reschedule the given job.

    Looks the job up, checks that it is verified and below
    ``self.maxRescheduling``, clears its job/optimizer parameters, rebuilds
    the JDL from the original JDL and resets the job attributes.

    :param job_id: ID of the job to reschedule.
    :return: dict with the keys ``JobID``, ``InputData``,
        ``RescheduleCounter``, ``Status`` and ``MinorStatus``.
    :raises ValueError: if the job is not found, is not verified, has reached
        the maximum number of reschedulings, or if preparing the job raises
        DIRAC's ``SErrorException``.
    """
    from DIRAC.Core.Utilities.ClassAd.ClassAdLight import ClassAd
    from DIRAC.Core.Utilities.ReturnValues import SErrorException
    from DIRAC.WorkloadManagementSystem.Client import JobMinorStatus

    result = await self.search(
        parameters=[
            "Status",
            "MinorStatus",
            "VerifiedFlag",
            "RescheduleCounter",
            "Owner",
            "OwnerGroup",
        ],
        search=[
            ScalarSearchSpec(
                parameter="JobID", operator=ScalarSearchOperator.EQUAL, value=job_id
            )
        ],
        sorts=[],
    )
    if not result:
        raise ValueError(f"Job {job_id} not found.")

    jobAttrs = result[0]

    if "VerifiedFlag" not in jobAttrs:
        raise ValueError(f"Job {job_id} not found in the system")

    if not jobAttrs["VerifiedFlag"]:
        raise ValueError(
            f"Job {job_id} not Verified: Status {jobAttrs['Status']}, Minor Status: {jobAttrs['MinorStatus']}"
        )

    reschedule_counter = int(jobAttrs["RescheduleCounter"]) + 1

    # TODO: update maxRescheduling:
    # self.maxRescheduling = self.getCSOption("MaxRescheduling", self.maxRescheduling)

    # NOTE(review): setJobAttributes, _checkAndPrepareJob and setJobJDL are
    # assumed to be ``async def`` like the other JobDB methods in this file
    # (their definitions are not visible here) -- confirm. Without ``await``
    # the calls only create coroutine objects and never touch the database.
    if reschedule_counter > self.maxRescheduling:
        logging.warning(f"Job {job_id}: Maximum number of reschedulings is reached.")
        await self.setJobAttributes(
            job_id,
            {
                "Status": JobStatus.Failed,
                # NOTE(review): the cleaner way would be to add this string
                # to the JobMinorStatus module.
                "MinorStatus": "Maximum of reschedulings reached",
            },
        )
        raise ValueError(
            f"Maximum number of reschedulings is reached: {self.maxRescheduling}"
        )

    new_job_attributes = {"RescheduleCounter": reschedule_counter}
    # jobAttrs still holds the old counter read by search(); persist the
    # incremented value, otherwise the limit above can never be reached.
    jobAttrs["RescheduleCounter"] = reschedule_counter

    # TODO: get the job parameters from JobMonitoringClient
    # result = JobMonitoringClient().getJobParameters(jobID)
    # if result["OK"]:
    #     parDict = result["Value"]
    #     for key, value in parDict.get(jobID, {}).items():
    #         result = self.setAtticJobParameter(jobID, key, value, rescheduleCounter - 1)
    #         if not result["OK"]:
    #             break

    # Delete job in JobParameters and OptimizerParameters:
    await self.delete_job_parameters(job_id)
    await self.delete_job_optimizer_parameters(job_id)

    job_jdl = await self.getJobJDL(job_id, original=True)
    # ClassAd is given a bracketed JDL; wrap it when the brackets are missing.
    if not job_jdl.strip().startswith("["):
        job_jdl = f"[{job_jdl}]"

    classAdJob = ClassAd(job_jdl)
    classAdReq = ClassAd("[]")
    retVal = {}
    retVal["JobID"] = job_id

    classAdJob.insertAttributeInt("JobID", job_id)

    try:
        await self._checkAndPrepareJob(
            job_id,
            classAdJob,
            classAdReq,
            jobAttrs["Owner"],
            jobAttrs["OwnerGroup"],
            new_job_attributes,
            classAdJob.getAttributeString("VirtualOrganization"),
        )
    except SErrorException as e:
        raise ValueError(e) from e

    priority = classAdJob.getAttributeInt("Priority")
    if priority is None:
        priority = 0
    jobAttrs["UserPriority"] = priority

    # Collapse the JDL site list into a single Site attribute.
    siteList = classAdJob.getListFromExpression("Site")
    if not siteList:
        site = "ANY"
    elif len(siteList) > 1:
        site = "Multiple"
    else:
        site = siteList[0]

    jobAttrs["Site"] = site

    # NOTE(review): member spelling differs from JobStatus.Failed used above;
    # confirm both against the JobStatus definition.
    jobAttrs["Status"] = JobStatus.RECEIVED

    jobAttrs["MinorStatus"] = JobMinorStatus.RESCHEDULED

    jobAttrs["ApplicationStatus"] = "Unknown"

    jobAttrs["ApplicationNumStatus"] = 0

    jobAttrs["LastUpdateTime"] = str(datetime.utcnow())

    jobAttrs["RescheduleTime"] = str(datetime.utcnow())

    reqJDL = classAdReq.asJDL()
    classAdJob.insertAttributeInt("JobRequirements", reqJDL)

    jobJDL = classAdJob.asJDL()

    # Replace the JobID placeholder if any
    jobJDL = jobJDL.replace("%j", str(job_id))

    await self.setJobJDL(job_id, jobJDL)

    await self.setJobAttributes(job_id, jobAttrs)

    retVal["InputData"] = classAdJob.lookupAttribute("InputData")
    retVal["RescheduleCounter"] = reschedule_counter
    retVal["Status"] = JobStatus.RECEIVED
    retVal["MinorStatus"] = JobMinorStatus.RESCHEDULED

    return retVal

Check warning on line 411 in src/diracx/db/jobs/db.py

View check run for this annotation

Codecov / codecov/patch

src/diracx/db/jobs/db.py#L411

Added line #L411 was not covered by tests
53 changes: 53 additions & 0 deletions src/diracx/routers/job_manager/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,59 @@
return [{"job_id": job.job_id, "status": job.status} for job in job_update]


@router.post("/reschedule")
async def reschedule_bulk_jobs(
    job_ids: Annotated[list[int], Query()],
    job_db: JobDB,
    user_info: Annotated[UserInfo, Depends(verify_dirac_token)],
):
    """Reschedule each job in ``job_ids`` and return the IDs that were rescheduled.

    Every job is currently treated as valid; the job-policy check is still a
    TODO. ``JobDB.rescheduleJob`` raises ``ValueError`` for jobs it refuses to
    reschedule; that exception is not caught here and aborts the request.
    """
    rescheduled_jobs = []
    # TODO:
    # validJobList, invalidJobList, nonauthJobList, ownerJobList = self.jobPolicy.evaluateJobRights(
    #    jobList, RIGHT_RESCHEDULE
    # )
    # For the moment all jobs are valid:
    valid_job_list = job_ids
    for job_id in valid_job_list:
        # TODO: delete job in TaskQueueDB
        # self.taskQueueDB.deleteJob(jobID)
        # rescheduleJob is a coroutine function: it must be awaited, otherwise
        # nothing runs and the (always truthy) coroutine object would mark
        # every job as rescheduled.
        result = await job_db.rescheduleJob(job_id)
        # TODO: add reschedule logging in JobLoggingDB
        # self.jobLoggingDB.addLoggingRecord(
        #     result["JobID"],
        #     status=result["Status"],
        #     minorStatus=result["MinorStatus"],
        #     applicationStatus="Unknown",
        #     source="JobManager",
        # )
        if result:
            rescheduled_jobs.append(job_id)
    # To uncomment when jobPolicy is setup:
    # if invalid_job_list or non_auth_job_list:
    #     logging.error("Some jobs failed to reschedule")
    #     if invalid_job_list:
    #         logging.info(f"Invalid jobs: {invalid_job_list}")
    #     if non_auth_job_list:
    #         logging.info(f"Non authorized jobs: {non_auth_job_list}")

    # TODO: send jobs to OptimizationMind
    # self.__sendJobsToOptimizationMind(validJobList)
    return rescheduled_jobs


@router.post("/{job_id}/reschedule")
async def reschedule_single_job(
    job_id: int,
    job_db: JobDB,
    user_info: Annotated[UserInfo, Depends(verify_dirac_token)],
):
    """Reschedule one job and return what ``JobDB.rescheduleJob`` returns.

    Any ``ValueError`` raised by ``rescheduleJob`` is reported to the client
    as HTTP 404 with the exception text as the detail.
    """
    try:
        return await job_db.rescheduleJob(job_id)
    except ValueError as err:
        raise HTTPException(
            status_code=HTTPStatus.NOT_FOUND, detail=str(err)
        ) from err

Check warning on line 326 in src/diracx/routers/job_manager/__init__.py

View check run for this annotation

Codecov / codecov/patch

src/diracx/routers/job_manager/__init__.py#L322-L326

Added lines #L322 - L326 were not covered by tests


EXAMPLE_SEARCHES = {
"Show all": {
"summary": "Show all",
Expand Down
17 changes: 17 additions & 0 deletions tests/routers/test_job_manager.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from http import HTTPStatus

import pytest
from fastapi.testclient import TestClient

TEST_JDL = """
Arguments = "jobDescription.xml -o LogLevel=INFO";
Expand Down Expand Up @@ -208,3 +209,19 @@ def test_user_cannot_submit_multiple_jdl_if_at_least_one_of_them_is_parametric(
def test_user_without_the_normal_user_property_cannot_submit_job(admin_user_client):
res = admin_user_client.post("/jobs/", json=[TEST_JDL])
assert res.status_code == HTTPStatus.FORBIDDEN, res.json()


def test_insert_and_reschedule(normal_user_client: TestClient):
    """Submit one job, then check the bulk reschedule endpoint answers 200."""
    jdls = [TEST_JDL]
    submission = normal_user_client.post("/jobs/", json=jdls)
    assert submission.status_code == 200, submission.json()
    assert len(submission.json()) == len(jdls)

    submitted_job_ids = sorted(entry["JobID"] for entry in submission.json())

    # Test /jobs/reschedule
    response = normal_user_client.post(
        "/jobs/reschedule",
        params={"job_ids": submitted_job_ids},
    )
    assert response.status_code == 200, response.json()