Feature rescheduleJob method #72
Merged
chrisburr
merged 14 commits into
DIRACGrid:main
from
natthan-pigoux:feature/npigoux-reschedule
Oct 19, 2023
Changes from 6 commits
26f89bf
Start adding reschedule method
npigouxCTA cf05880
Continue reschedule method
npigouxCTA dabe619
add rescheduleJob method
npigouxCTA 0e1945b
add test for reschedule router
npigouxCTA c113ac4
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] a24eb55
Remove OwnerDN and DiracSetup
natthan-pigoux 9559dab
merge main and resolve conflicts
natthan-pigoux c7046b3
Remove JobParameters and OptimizerParameters reference
natthan-pigoux e1582e8
Add MAXRESCHEDULING in JobStatus
natthan-pigoux 42c6d81
fix jobstatus.FAILED
natthan-pigoux 6d81526
Create MinroStatus
natthan-pigoux 9849186
Add JobMinorStatus enum
natthan-pigoux e321e41
Merge main and resolve conflicts
natthan-pigoux 1d21729
Update rescheduleJob with JobLoggingDB
natthan-pigoux
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,16 +1,18 @@ | ||
from __future__ import annotations | ||
|
||
import logging | ||
from datetime import datetime, timezone | ||
from typing import Any | ||
|
||
from sqlalchemy import func, insert, select, update | ||
from sqlalchemy import delete, func, insert, select, update | ||
|
||
from diracx.core.exceptions import InvalidQueryError | ||
from diracx.core.models import ScalarSearchOperator, ScalarSearchSpec | ||
from diracx.core.utils import JobStatus | ||
|
||
from ..utils import BaseDB, apply_search_filters | ||
from .schema import Base as JobDBBase | ||
from .schema import InputData, JobJDLs, Jobs | ||
from .schema import InputData, JobJDLs, JobParameters, Jobs, OptimizerParameters | ||
|
||
|
||
class JobDB(BaseDB): | ||
|
@@ -22,6 +24,11 @@ | |
# to find a way to make it dynamic | ||
jdl2DBParameters = ["JobName", "JobType", "JobGroup"] | ||
|
||
# TODO: set maxRescheduling value from CS | ||
# maxRescheduling = self.getCSOption("MaxRescheduling", 3) | ||
# For now: | ||
maxRescheduling = 3 | ||
|
||
async def summary(self, group_by, search) -> list[dict[str, str | int]]: | ||
columns = [Jobs.__table__.columns[x] for x in group_by] | ||
|
||
|
@@ -141,6 +148,20 @@ | |
) | ||
await self.conn.execute(stmt) | ||
|
||
async def getJobJDL(self, job_id: int, original: bool = False) -> str: | ||
TBH I am not sure we have decided if there will be JDLs in DiracX... |
||
from DIRAC.WorkloadManagementSystem.DB.JobDBUtils import extractJDL | ||
|
||
if original: | ||
stmt = select(JobJDLs.OriginalJDL).where(JobJDLs.JobID == job_id) | ||
else: | ||
stmt = select(JobJDLs.JDL).where(JobJDLs.JobID == job_id) | ||
|
||
jdl = (await self.conn.execute(stmt)).scalar_one() | ||
if jdl: | ||
jdl = extractJDL(jdl) | ||
|
||
return jdl | ||
|
||
async def insert( | ||
self, | ||
jdl, | ||
|
@@ -193,7 +214,7 @@ | |
|
||
return { | ||
"JobID": job_id, | ||
"Status": JobStatus.FAILED, | ||
"Status": JobStatus.Failed, | ||
fstagni marked this conversation as resolved.
|
||
"MinorStatus": "Error in JDL syntax", | ||
} | ||
|
||
|
@@ -242,3 +263,149 @@ | |
"MinorStatus": initial_minor_status, | ||
"TimeStamp": datetime.now(tz=timezone.utc), | ||
} | ||
|
||
async def delete_job_parameters(self, job_id: int): | ||
stmt = delete(JobParameters).where(JobParameters.JobID == job_id) | ||
await self.conn.execute(stmt) | ||
|
||
async def delete_job_optimizer_parameters(self, job_id: int): | ||
stmt = delete(OptimizerParameters).where(OptimizerParameters.JobID == job_id) | ||
await self.conn.execute(stmt) | ||
|
||
async def rescheduleJob(self, job_id) -> dict[str, Any]: | ||
"""Reschedule given job""" | ||
from DIRAC.Core.Utilities.ClassAd.ClassAdLight import ClassAd | ||
from DIRAC.Core.Utilities.ReturnValues import SErrorException | ||
from DIRAC.WorkloadManagementSystem.Client import JobMinorStatus | ||
|
||
result = await self.search( | ||
parameters=[ | ||
"Status", | ||
"MinorStatus", | ||
"VerifiedFlag", | ||
"RescheduleCounter", | ||
"Owner", | ||
"OwnerGroup", | ||
], | ||
search=[ | ||
ScalarSearchSpec( | ||
parameter="JobID", operator=ScalarSearchOperator.EQUAL, value=job_id | ||
) | ||
], | ||
sorts=[], | ||
) | ||
if not result: | ||
raise ValueError(f"Job {job_id} not found.") | ||
|
||
jobAttrs = result[0] | ||
|
||
if "VerifiedFlag" not in jobAttrs: | ||
raise ValueError(f"Job {job_id} not found in the system") | ||
|
||
if not jobAttrs["VerifiedFlag"]: | ||
raise ValueError( | ||
f"Job {job_id} not Verified: Status {jobAttrs['Status']}, Minor Status: {jobAttrs['MinorStatus']}" | ||
) | ||
|
||
reschedule_counter = int(jobAttrs["RescheduleCounter"]) + 1 | ||
|
||
# TODO: update maxRescheduling: | ||
# self.maxRescheduling = self.getCSOption("MaxRescheduling", self.maxRescheduling) | ||
|
||
if reschedule_counter > self.maxRescheduling: | ||
logging.warn(f"Job {job_id}: Maximum number of reschedulings is reached.") | ||
self.setJobAttributes( | ||
job_id, | ||
{ | ||
"Status": JobStatus.Failed, | ||
fstagni marked this conversation as resolved.
|
||
"MinorStatus": "Maximum of reschedulings reached", | ||
The cleaner way would be to add this to |
||
}, | ||
) | ||
raise ValueError( | ||
f"Maximum number of reschedulings is reached: {self.maxRescheduling}" | ||
) | ||
|
||
new_job_attributes = {"RescheduleCounter": reschedule_counter} | ||
|
||
# TODO: get the job parameters from JobMonitoringClient | ||
# result = JobMonitoringClient().getJobParameters(jobID) | ||
# if result["OK"]: | ||
# parDict = result["Value"] | ||
# for key, value in parDict.get(jobID, {}).items(): | ||
# result = self.setAtticJobParameter(jobID, key, value, rescheduleCounter - 1) | ||
# if not result["OK"]: | ||
# break | ||
|
||
# Delete job in JobParameters and OptimizerParameters: | ||
await self.delete_job_parameters(job_id) | ||
await self.delete_job_optimizer_parameters(job_id) | ||
|
||
job_jdl = await self.getJobJDL(job_id, original=True) | ||
if not job_jdl.strip().startswith("["): | ||
job_jdl = f"[{job_jdl}]" | ||
|
||
classAdJob = ClassAd(job_jdl) | ||
classAdReq = ClassAd("[]") | ||
retVal = {} | ||
retVal["JobID"] = job_id | ||
|
||
classAdJob.insertAttributeInt("JobID", job_id) | ||
|
||
try: | ||
result = self._checkAndPrepareJob( | ||
job_id, | ||
classAdJob, | ||
classAdReq, | ||
jobAttrs["Owner"], | ||
jobAttrs["OwnerGroup"], | ||
new_job_attributes, | ||
classAdJob.getAttributeString("VirtualOrganization"), | ||
) | ||
except SErrorException as e: | ||
raise ValueError(e) from e | ||
|
||
priority = classAdJob.getAttributeInt("Priority") | ||
if priority is None: | ||
priority = 0 | ||
jobAttrs["UserPriority"] = priority | ||
|
||
siteList = classAdJob.getListFromExpression("Site") | ||
if not siteList: | ||
site = "ANY" | ||
elif len(siteList) > 1: | ||
site = "Multiple" | ||
else: | ||
site = siteList[0] | ||
|
||
jobAttrs["Site"] = site | ||
|
||
jobAttrs["Status"] = JobStatus.RECEIVED | ||
|
||
jobAttrs["MinorStatus"] = JobMinorStatus.RESCHEDULED | ||
|
||
jobAttrs["ApplicationStatus"] = "Unknown" | ||
|
||
jobAttrs["ApplicationNumStatus"] = 0 | ||
|
||
jobAttrs["LastUpdateTime"] = str(datetime.utcnow()) | ||
|
||
jobAttrs["RescheduleTime"] = str(datetime.utcnow()) | ||
|
||
reqJDL = classAdReq.asJDL() | ||
classAdJob.insertAttributeInt("JobRequirements", reqJDL) | ||
|
||
jobJDL = classAdJob.asJDL() | ||
|
||
# Replace the JobID placeholder if any | ||
jobJDL = jobJDL.replace("%j", str(job_id)) | ||
|
||
result = self.setJobJDL(job_id, jobJDL) | ||
|
||
result = self.setJobAttributes(job_id, jobAttrs) | ||
|
||
retVal["InputData"] = classAdJob.lookupAttribute("InputData") | ||
retVal["RescheduleCounter"] = reschedule_counter | ||
retVal["Status"] = JobStatus.RECEIVED | ||
retVal["MinorStatus"] = JobMinorStatus.RESCHEDULED | ||
|
||
return retVal | ||
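The two pure decisions inside `rescheduleJob` (the reschedule-counter limit and the site selection) can be sketched in isolation, without DIRAC or a database. This is a minimal illustration, not the PR's code: `next_reschedule_counter`, `select_site` and `MAX_RESCHEDULING` are hypothetical names introduced here, and the limit of 3 simply mirrors the hard-coded `maxRescheduling` in the diff.

```python
from __future__ import annotations

# Assumption: mirrors the hard-coded maxRescheduling = 3 in the diff.
MAX_RESCHEDULING = 3


def next_reschedule_counter(current: int, max_rescheduling: int = MAX_RESCHEDULING) -> int:
    """Return the incremented counter, or raise once the limit is exceeded.

    Hypothetical helper: the diff performs this check inline and also marks
    the job as Failed before raising.
    """
    counter = int(current) + 1
    if counter > max_rescheduling:
        raise ValueError(
            f"Maximum number of reschedulings is reached: {max_rescheduling}"
        )
    return counter


def select_site(site_list: list[str] | None) -> str:
    """Collapse the JDL site list into the single value stored on the job."""
    if not site_list:
        return "ANY"  # no site constraint in the JDL
    if len(site_list) > 1:
        return "Multiple"  # more than one candidate site
    return site_list[0]
```

Pulling these checks out into small functions like this would make them unit-testable without a `JobDB` connection; whether that split suits DiracX is a design choice for the maintainers.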
In #29 (and at the last hackathon) we argued against adding MySQL support for `JobParameters`. Before we ratify this decision I would, at a minimum, avoid adding this here.

Also, I am TBH not sure that `OptimizerParameters` will be something that we'll need in DiracX. So, avoid adding code for it atm.
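One further observation on the diff itself: `rescheduleJob` calls `self.setJobAttributes(...)` and `self.setJobJDL(...)` without `await`. Those methods are not shown in this diff, so whether they are coroutines is an assumption; if they are, like the other database methods here, the calls would only create coroutine objects and never execute. The sketch below demonstrates that behaviour with a hypothetical stand-in, `set_job_attributes`, writing to an in-memory dict rather than a real database.

```python
from __future__ import annotations

import asyncio


async def set_job_attributes(store: dict, job_id: int, attrs: dict) -> None:
    """Hypothetical stand-in for a coroutine-style DB method."""
    store.setdefault(job_id, {}).update(attrs)


async def demo() -> tuple[dict, dict]:
    store: dict = {}

    # Without await, this only builds a coroutine object; the body never runs.
    pending = set_job_attributes(store, 1, {"Status": "Received"})
    before = dict(store)
    pending.close()  # discard explicitly to avoid a "never awaited" warning

    # With await, the update is actually applied.
    await set_job_attributes(store, 1, {"Status": "Received"})
    after = dict(store)
    return before, after


before, after = asyncio.run(demo())
```

Here `before` stays empty while `after` contains the update, which is why a missing `await` in an async method can silently skip a write.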