Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[sweep:integration] getTasksToSubmit consider tasks inserted by 30 seconds or more #7865

Merged
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
""" Service for interacting with TransformationDB
"""
import datetime

from DIRAC import S_ERROR, S_OK
from DIRAC.ConfigurationSystem.Client.Helpers.Operations import Operations
Expand Down Expand Up @@ -393,25 +394,45 @@ def export_extendTransformation(self, transName, nTasks):
types_getTasksToSubmit = [[int, str], int]

def export_getTasksToSubmit(self, transName, numTasks, site=""):
"""Get information necessary for submission for a given number of tasks for a given transformation"""
"""
Retrieve the necessary information for the submission of a specified number of tasks
for a given transformation. This includes reserving tasks to avoid race conditions.

:param int | str transName: Name of the transformation
:param int numTasks: Number of tasks to retrieve for submission
:param str site: Optional site specification
:return: S_OK Dictionary containing transformation and task submission details
"""
# Get the transformation details
res = self.transformationDB.getTransformation(transName)
if not res["OK"]:
return res
transDict = res["Value"]

submitDict = {}

# Apply a delay to avoid race conditions
older = datetime.datetime.now() - datetime.timedelta(seconds=30)

# Retrieve tasks that are ready for submission
res = self.transformationDB.getTasksForSubmission(
transName, numTasks=numTasks, site=site, statusList=["Created"]
transName, numTasks=numTasks, site=site, statusList=["Created"], older=older
)
if not res["OK"]:
return res
tasksDict = res["Value"]

# Reserve each task for submission
for taskID, taskDict in tasksDict.items():
res = self.transformationDB.reserveTask(transName, int(taskID))
if not res["OK"]:
return res
else:
submitDict[taskID] = taskDict
# Add reserved task to the submission dictionary
submitDict[taskID] = taskDict

# Add the job dictionary to the transformation details
transDict["JobDictionary"] = submitDict

return S_OK(transDict)

####################################################################
Expand Down
Loading