diff --git a/src/DIRAC/TransformationSystem/Service/TransformationManagerHandler.py b/src/DIRAC/TransformationSystem/Service/TransformationManagerHandler.py index 0f0e313e728..8bb87544f56 100644 --- a/src/DIRAC/TransformationSystem/Service/TransformationManagerHandler.py +++ b/src/DIRAC/TransformationSystem/Service/TransformationManagerHandler.py @@ -1,5 +1,6 @@ """ Service for interacting with TransformationDB """ +import datetime from DIRAC import S_ERROR, S_OK from DIRAC.ConfigurationSystem.Client.Helpers.Operations import Operations @@ -393,25 +394,45 @@ def export_extendTransformation(self, transName, nTasks): types_getTasksToSubmit = [[int, str], int] def export_getTasksToSubmit(self, transName, numTasks, site=""): - """Get information necessary for submission for a given number of tasks for a given transformation""" + """ + Retrieve the necessary information for the submission of a specified number of tasks + for a given transformation. This includes reserving tasks to avoid race conditions. + + :param int | str transName: Name of the transformation + :param int numTasks: Number of tasks to retrieve for submission + :param str site: Optional site specification + :return: S_OK Dictionary containing transformation and task submission details + """ + # Get the transformation details res = self.transformationDB.getTransformation(transName) if not res["OK"]: return res transDict = res["Value"] + submitDict = {} + + # Apply a delay to avoid race conditions + older = datetime.datetime.now() - datetime.timedelta(seconds=30) + + # Retrieve tasks that are ready for submission res = self.transformationDB.getTasksForSubmission( - transName, numTasks=numTasks, site=site, statusList=["Created"] + transName, numTasks=numTasks, site=site, statusList=["Created"], older=older ) if not res["OK"]: return res tasksDict = res["Value"] + + # Reserve each task for submission for taskID, taskDict in tasksDict.items(): res = self.transformationDB.reserveTask(transName, int(taskID)) if not res["OK"]: return res - else: - submitDict[taskID] = taskDict + # Add reserved task to the submission dictionary + submitDict[taskID] = taskDict + + # Add the job dictionary to the transformation details transDict["JobDictionary"] = submitDict + return S_OK(transDict) ####################################################################