From b4657d51df823fdc6349d0bfbacdaea26067188b Mon Sep 17 00:00:00 2001 From: Stefano Belforte Date: Thu, 9 Jan 2025 15:12:32 +0100 Subject: [PATCH] refactor DAG creation and submission. Fix #8892 --- scripts/CMSRunAnalysis.py | 41 +- scripts/TweakPSet.py | 4 +- .../TaskWorker/Actions/DagmanCreator.py | 621 ++++++++---------- .../TaskWorker/Actions/DagmanResubmitter.py | 3 +- .../TaskWorker/Actions/DagmanSubmitter.py | 344 +++++----- src/python/TaskWorker/Actions/PreJob.py | 9 - 6 files changed, 463 insertions(+), 559 deletions(-) diff --git a/scripts/CMSRunAnalysis.py b/scripts/CMSRunAnalysis.py index e5d88006ca..c99c1796e5 100644 --- a/scripts/CMSRunAnalysis.py +++ b/scripts/CMSRunAnalysis.py @@ -288,10 +288,8 @@ def parseArgs(): parser = PassThroughOptionParser() parser.add_option('--jobId', dest='jobId', type='string') parser.add_option('--json', dest='jsonArgFile', type='string') - parser.add_option('-a', dest='archiveJob', type='string') - parser.add_option('-o', dest='outFiles', type='string') - parser.add_option('--inputFile', dest='inputFile', type='string') - parser.add_option('--sourceURL', dest='sourceURL', type='string') + parser.add_option('--userSandbox', dest='userSandbox', type='string') + parser.add_option('--inputFileList', dest='inputFileList', type='string') parser.add_option('--jobNumber', dest='jobNumber', type='string') parser.add_option('--cmsswVersion', dest='cmsswVersion', type='string') parser.add_option('--scramArch', dest='scramArch', type='string') @@ -334,19 +332,6 @@ def parseArgs(): for key, value in arguments.items(): setattr(opts, key, value) - # remap key in input_args.json to the argument names required by CMSRunAnalysis.py - # use as : value_of_argument_name = inputArgs[argMap[argument_name]] - # to ease transition to cleaner code the new key are only added if missing - argMap = { - 'archiveJob': 'CRAB_Archive', 'outFiles': 'CRAB_AdditionalOutputFiles', - 'sourceURL': 'CRAB_ISB', 'cmsswVersion': 'CRAB_JobSW', - 'scramArch': 'CRAB_JobArch', 'runAndLumis': 'runAndLumiMask', - 'inputFile' : 'inputFiles', 'lheInputFiles': 'lheInputFiles' - } - for key, value in argMap.items(): - if not getattr(opts, key, None): - setattr(opts, key, arguments[value]) # assign to our variables - # allow for most input arguments to be passed via a (job specific) JSON file if getattr(opts, 'jsonArgFile', None): arguments = {} @@ -366,13 +351,11 @@ def parseArgs(): try: print(f"==== Parameters Dump at {UTCNow()} ===") - print("archiveJob: ", opts.archiveJob) - print("sourceURL: ", opts.sourceURL) + print("userSandbox: ", opts.userSandbox) print("jobNumber: ", opts.jobNumber) print("cmsswVersion: ", opts.cmsswVersion) print("scramArch: ", opts.scramArch) - print("inputFile ", opts.inputFile) - print("outFiles: ", opts.outFiles) + print("inputFileList ", opts.inputFileList) print("runAndLumis: ", opts.runAndLumis) print("lheInputFiles: ", opts.lheInputFiles) print("firstEvent: ", opts.firstEvent) @@ -697,7 +680,7 @@ def compareBrachListWithReference(branchList, tier): print(f"==== SCRAM Obj INITIALIZED at {UTCNow()} ====") print("==== Extract user sandbox in top and CMSSW directory ====") - extractUserSandbox(options.archiveJob) + extractUserSandbox(options.userSandbox) #Multi-microarch env: Setting cmssw env after extracting the sandbox print(f"==== SCRAM runtime environment CREATED at {UTCNow()} ====") @@ -901,19 +884,7 @@ def compareBrachListWithReference(branchList, tier): mintime() sys.exit(EC_ReportHandlingErr) - # rename output files. 
Doing this after checksums otherwise outfile is not found. - if jobExitCode == 0: - try: - oldName = 'UNKNOWN' - newName = 'UNKNOWN' - for oldName, newName in literal_eval(options.outFiles).items(): - os.rename(oldName, newName) - except Exception as ex: # pylint: disable=broad-except - handleException("FAILED", EC_MoveOutErr, f"Exception while renaming file {oldName} to {newName}.") - mintime() - sys.exit(EC_MoveOutErr) - else: - mintime() + mintime() print(f"==== CMSRunAnalysis.py FINISHED at {UTCNow()} ====") print(f"Local time : {time.ctime()}") diff --git a/scripts/TweakPSet.py b/scripts/TweakPSet.py index 756a3d38ee..734cb9b8c4 100644 --- a/scripts/TweakPSet.py +++ b/scripts/TweakPSet.py @@ -121,8 +121,8 @@ def createScriptLines(opts, pklIn): if opts.runAndLumis: runAndLumis = readFileFromTarball(opts.runAndLumis, 'run_and_lumis.tar.gz') inputFiles = {} - if opts.inputFile: - inputFiles = readFileFromTarball(opts.inputFile, 'input_files.tar.gz') + if opts.inputFileList: + inputFiles = readFileFromTarball(opts.inputFileList, 'input_files.tar.gz') # build a tweak object with the needed changes to be applied to PSet tweak = PSetTweak() diff --git a/src/python/TaskWorker/Actions/DagmanCreator.py b/src/python/TaskWorker/Actions/DagmanCreator.py index 3d010eb1e1..5bf02b02b4 100644 --- a/src/python/TaskWorker/Actions/DagmanCreator.py +++ b/src/python/TaskWorker/Actions/DagmanCreator.py @@ -28,16 +28,17 @@ from TaskWorker.Actions.Splitter import SplittingSummary from TaskWorker.WorkerExceptions import TaskWorkerException, SubmissionRefusedException from RucioUtils import getWritePFN -from CMSGroupMapper import get_egroup_users +from CMSGroupMapper import get_egroup_users, map_user_to_groups -import WMCore.WMSpec.WMTask from WMCore import Lexicon from WMCore.Services.CRIC.CRIC import CRIC from WMCore.WMRuntime.Tools.Scram import ARCH_TO_OS, SCRAM_TO_ARCH if 'useHtcV2' in os.environ: + import htcondor2 as htcondor import classad2 as classad else: + import htcondor import classad DAG_HEADER = """ @@ -50,6 +51,7 @@ """ + DAG_FRAGMENT = """ JOB Job{count} Job.{count}.submit SCRIPT {prescriptDefer} PRE Job{count} dag_bootstrap.sh PREJOB $RETRY {count} {taskname} {backend} {stage} @@ -65,135 +67,13 @@ ABORT-DAG-ON Job{count} 3 """ + SUBDAG_FRAGMENT = """ SUBDAG EXTERNAL Job{count}SubJobs RunJobs{count}.subdag NOOP SCRIPT DEFER 4 300 PRE Job{count}SubJobs dag_bootstrap.sh PREDAG {stage} {completion} {count} """ -JOB_SUBMIT = \ -""" -+CRAB_ReqName = %(requestname)s -+CRAB_Workflow = %(workflow)s -+CMS_JobType = %(jobtype)s -+CRAB_JobSW = %(jobsw)s -+CRAB_JobArch = %(jobarch)s -+CRAB_DBSURL = %(dbsurl)s -+CRAB_PostJobStatus = "NOT RUN" -+CRAB_PostJobLastUpdate = 0 -+CRAB_PublishName = %(publishname)s -+CRAB_Publish = %(publication)s -+CRAB_PublishDBSURL = %(publishdbsurl)s -+CRAB_ISB = %(cacheurl)s -+CRAB_AdditionalOutputFiles = %(addoutputfiles)s -+CRAB_EDMOutputFiles = %(edmoutfiles)s -+CRAB_TFileOutputFiles = %(tfileoutfiles)s -+CRAB_UserDN = %(userdn)s -+CRAB_UserHN = %(userhn)s -+CRAB_AsyncDest = %(asyncdest)s -+CRAB_StageoutPolicy = %(stageoutpolicy)s -+CRAB_UserRole = %(tm_user_role)s -+CRAB_UserGroup = %(tm_user_group)s -+CRAB_TaskWorker = %(worker_name)s -+CRAB_RetryOnASOFailures = %(retry_aso)s -+CRAB_ASOTimeout = %(aso_timeout)s -+CRAB_RestHost = %(resthost)s -+CRAB_DbInstance = %(dbinstance)s -+CRAB_NumAutomJobRetries = %(numautomjobretries)s -CRAB_Attempt = %(attempt)d -CRAB_ISB = %(cacheurl_flatten)s -CRAB_AdditionalOutputFiles = %(addoutputfiles_flatten)s -CRAB_JobSW = %(jobsw_flatten)s 
-CRAB_JobArch = %(jobarch_flatten)s -CRAB_Archive = %(cachefilename_flatten)s -CRAB_Id = $(count) -+CRAB_Id = "$(count)" -+CRAB_JobCount = %(jobcount)d -+CRAB_OutTempLFNDir = "%(temp_dest)s" -+CRAB_OutLFNDir = "%(output_dest)s" -+CRAB_oneEventMode = %(oneEventMode)s -+CRAB_PrimaryDataset = %(primarydataset)s -+CRAB_DAGType = "Job" -accounting_group = %(accounting_group)s -accounting_group_user = %(accounting_group_user)s -+CRAB_SubmitterIpAddr = %(submitter_ip_addr)s -+CRAB_TaskLifetimeDays = %(task_lifetime_days)s -+CRAB_TaskEndTime = %(task_endtime)s -+CRAB_SplitAlgo = %(splitalgo)s -+CRAB_AlgoArgs = %(algoargs)s -+CMS_WMTool = %(cms_wmtool)s -+CMS_TaskType = %(cms_tasktype)s -+CMS_SubmissionTool = "CRAB" -+CMS_Type = %(cms_type)s - - -# These attributes help gWMS decide what platforms this job can run on; see https://twiki.cern.ch/twiki/bin/view/CMSPublic/CompOpsMatchArchitecture -+REQUIRED_ARCH = %(required_arch)s -+REQUIRED_MINIMUM_MICROARCH = %(required_minimum_microarch)s -+DESIRED_CMSDataset = %(inputdata)s - -+JOBGLIDEIN_CMSSite = "$$([ifThenElse(GLIDEIN_CMSSite is undefined, \\"Unknown\\", GLIDEIN_CMSSite)])" -job_ad_information_attrs = MATCH_EXP_JOBGLIDEIN_CMSSite, JOBGLIDEIN_CMSSite, RemoteSysCpu, RemoteUserCpu - -# Recover job output and logs on eviction events; make sure they aren't spooled -# This requires 8.1.6 or later (https://htcondor-wiki.cs.wisc.edu/index.cgi/tktview?tn=4292) -# This allows us to return stdout to users when they hit memory limits (which triggers PeriodicRemove). -WhenToTransferOutput = ON_EXIT_OR_EVICT -+SpoolOnEvict = false - -# Keep job in the queue upon completion long enough for the postJob to run, -# allowing the monitoring script to fetch the postJob status and job exit-code updated by the postJob -LeaveJobInQueue = ifThenElse((JobStatus=?=4 || JobStatus=?=3) && (time() - EnteredCurrentStatus < 30 * 60*60), true, false) - -universe = vanilla -Executable = gWMS-CMSRunAnalysis.sh -Output = job_out.$(CRAB_Id) -Error = job_err.$(CRAB_Id) -Log = job_log -# args changed... - -Arguments = "--jobId=$(CRAB_Id)" - -transfer_input_files = CMSRunAnalysis.sh, cmscp.py%(additional_input_file)s -transfer_output_files = jobReport.json.$(count), WMArchiveReport.json.$(count) -# make sure coredump (if any) is not added to output files ref: https://lists.cs.wisc.edu/archive/htcondor-users/2022-September/msg00052.shtml -coresize = 0 -# TODO: fold this into the config file instead of hardcoding things. -Environment = "SCRAM_ARCH=$(CRAB_JobArch) %(additional_environment_options)s" -should_transfer_files = YES -#x509userproxy = %(x509up_file)s -use_x509userproxy = true -%(opsys_req)s -Requirements = stringListMember(TARGET.Arch, REQUIRED_ARCH) -# Ref: https://htcondor.readthedocs.io/en/latest/classad-attributes/job-classad-attributes.html#HoldReasonCode -periodic_release = (HoldReasonCode == 28) || (HoldReasonCode == 30) || (HoldReasonCode == 13) || (HoldReasonCode == 6) -# Remove if -# a) job is in the 'held' status for more than 7 minutes -# b) job is idle more than 7 days -# c) job is running and one of: -# 1) Over memory use -# 2) Over wall clock limit -# 3) Over disk usage of N GB, which is set in ServerUtilities -# d) job is idle and users proxy expired 1 day ago. (P.S. why 1 day ago? because there is recurring action which is updating user proxy and lifetime.) -# == If New periodic remove expression is added, also it should have Periodic Remove Reason. 
Otherwise message will not be clear and it is hard to debug -periodic_remove = ((JobStatus =?= 5) && (time() - EnteredCurrentStatus > 7*60)) || \ - ((JobStatus =?= 1) && (time() - EnteredCurrentStatus > 7*24*60*60)) || \ - ((JobStatus =?= 2) && ( \ - (MemoryUsage =!= UNDEFINED && MemoryUsage > RequestMemory) || \ - (MaxWallTimeMinsRun*60 < time() - EnteredCurrentStatus) || \ - (DiskUsage > %(max_disk_space)s))) || \ - (time() > CRAB_TaskEndTime) || \ - ((JobStatus =?= 1) && (time() > (x509UserProxyExpiration + 86400))) -+PeriodicRemoveReason = ifThenElse(time() - EnteredCurrentStatus > 7*24*60*60 && isUndefined(MemoryUsage), "Removed due to idle time limit", \ - ifThenElse(time() > x509UserProxyExpiration, "Removed job due to proxy expiration", \ - ifThenElse(MemoryUsage > RequestMemory, "Removed due to memory use", \ - ifThenElse(MaxWallTimeMinsRun*60 < time() - EnteredCurrentStatus, "Removed due to wall clock limit", \ - ifThenElse(DiskUsage > %(max_disk_space)s, "Removed due to disk usage", \ - ifThenElse(time() > CRAB_TaskEndTime, "Removed due to reached CRAB_TaskEndTime", \ - "Removed due to job being held")))))) -%(accelerator_jdl)s -%(extra_jdl)s -queue -""" + SPLIT_ARG_MAP = {"Automatic": "minutes_per_job", "LumiBased": "lumis_per_job", "EventBased": "events_per_job", @@ -241,6 +121,7 @@ def makeLFNPrefixes(task): return temp_dest, dest + def validateLFNs(path, outputFiles): """ validate against standard Lexicon the LFN's that this task will try to publish in DBS @@ -253,9 +134,9 @@ def validateLFNs(path, outputFiles): dirCounter = '0001' # need to be same length as 'counter' used later in makeDagSpecs for origFile in outputFiles: - info = origFile.rsplit(".", 1) - if len(info) == 2: # filename ends with ., put jobId before the dot - fileName = f"{info[0]}_{jobId}.{info[1]}" + nameParts = origFile.rsplit(".", 1) + if len(nameParts) == 2: # filename ends with ., put jobId before the dot + fileName = f"{nameParts[0]}_{jobId}.{nameParts[1]}" else: fileName = f"{origFile}_{jobId}" testLfn = os.path.join(path, dirCounter, fileName) @@ -267,6 +148,7 @@ def validateLFNs(path, outputFiles): msg += "\n and therefore can not be handled in our DataBase" raise SubmissionRefusedException(msg) + def validateUserLFNs(path, outputFiles): """ validate against standard Lexicon a user-defined LFN which will not go in DBS, but still needs to be sane @@ -279,9 +161,9 @@ def validateUserLFNs(path, outputFiles): dirCounter = '0001' # need to be same length as 'counter' used later in makeDagSpecs for origFile in outputFiles: - info = origFile.rsplit(".", 1) - if len(info) == 2: # filename ends with ., put jobId before the dot - fileName = f"{info[0]}_{jobId}.{info[1]}" + nameParts = origFile.rsplit(".", 1) + if len(nameParts) == 2: # filename ends with ., put jobId before the dot + fileName = f"{nameParts[0]}_{jobId}.{nameParts[1]}" else: fileName = f"{origFile}_{jobId}" testLfn = os.path.join(path, dirCounter, fileName) @@ -293,58 +175,6 @@ def validateUserLFNs(path, outputFiles): msg += "\n and therefore can not be handled in our DataBase" raise SubmissionRefusedException(msg) -def transform_strings(data): - """ - Converts the arguments in the data dictionary to the arguments necessary - for the job submit file string. 
- """ - info = {} - for var in 'workflow', 'jobtype', 'jobsw', 'jobarch', 'inputdata', 'primarydataset', 'splitalgo', 'algoargs', \ - 'cachefilename', 'cacheurl', 'userhn', 'publishname', 'asyncdest', 'dbsurl', 'publishdbsurl', \ - 'userdn', 'requestname', 'oneEventMode', 'tm_user_vo', 'tm_user_role', 'tm_user_group', \ - 'tm_maxmemory', 'tm_numcores', 'tm_maxjobruntime', 'tm_priority', \ - 'stageoutpolicy', 'taskType', 'worker_name', 'cms_wmtool', 'cms_tasktype', 'cms_type', \ - 'required_arch', 'required_minimum_microarch', 'resthost', 'dbinstance', 'submitter_ip_addr', \ - 'task_lifetime_days', 'task_endtime', 'maxproberuntime', 'maxtailruntime': - val = data.get(var, None) - if val is None: - info[var] = 'undefined' - else: - info[var] = json.dumps(val) - - for var in 'accounting_group', 'accounting_group_user': - info[var] = data[var] - - for var in 'savelogsflag', 'blacklistT1', 'retry_aso', 'aso_timeout', 'publication', 'saveoutput', 'numautomjobretries', 'jobcount': - info[var] = int(data[var]) - - for var in 'siteblacklist', 'sitewhitelist', 'addoutputfiles', 'tfileoutfiles', 'edmoutfiles': - val = data[var] - if val is None: - info[var] = "{}" - else: - info[var] = "{" + json.dumps(val)[1:-1] + "}" - - info['lumimask'] = '"' + json.dumps(WMCore.WMSpec.WMTask.buildLumiMask(data['runs'], data['lumis'])).replace(r'"', r'\"') + '"' - - splitArgName = SPLIT_ARG_MAP[data['splitalgo']] - info['algoargs'] = '"' + json.dumps({'halt_job_on_file_boundaries': False, 'splitOnRun': False, splitArgName : data['algoargs']}).replace('"', r'\"') + '"' - info['attempt'] = 0 - - for var in ["cacheurl", "jobsw", "jobarch", "cachefilename", "asyncdest", "requestname"]: - info[var+"_flatten"] = data[var] - - info["addoutputfiles_flatten"] = '{}' - - temp_dest, dest = makeLFNPrefixes(data) - info["temp_dest"] = temp_dest - info["output_dest"] = dest - info['x509up_file'] = os.path.split(data['user_proxy'])[-1] - info['user_proxy'] = data['user_proxy'] - info['scratch'] = data['scratch'] - - return info - def getLocation(default_name): """ Get the location of the runtime code (job wrapper, postjob, anything executed on the schedd @@ -378,11 +208,12 @@ def __init__(self, config, crabserver, procnum=-1, rucioClient=None): self.rucioClient = rucioClient self.runningInTW = crabserver is not None - def populateGlideinMatching(self, info): + def populateGlideinMatching(self, task): """ actually simply set the required arch and microarch """ - scram_arch = info['tm_job_arch'] + matchInfo = {} + scram_arch = task['tm_job_arch'] # required_arch, set default - info['required_arch'] = "X86_64" + matchInfo['required_arch'] = "X86_64" # The following regex matches a scram arch into four groups # for example el9_amd64_gcc10 is matched as (el)(9)_(amd64)_(gcc10) # later, only the third group is returned, the one corresponding to the arch. @@ -392,22 +223,24 @@ def populateGlideinMatching(self, info): if arch not in SCRAM_TO_ARCH: msg = f"Job configured for non-supported ScramArch '{arch}'" raise SubmissionRefusedException(msg) - info['required_arch'] = SCRAM_TO_ARCH.get(arch) + matchInfo['required_arch'] = SCRAM_TO_ARCH.get(arch) # required minimum micro_arch may need to be handled differently in the future (arm, risc, ...) 
# and may need different classAd(s) in the JDL, so try to be general here - min_micro_arch = info['tm_job_min_microarch'] + min_micro_arch = task['tm_job_min_microarch'] if not min_micro_arch: - info['required_minimum_microarch'] = '2' # the current default for CMSSW - return + matchInfo['required_minimum_microarch'] = '2' # the current default for CMSSW + return matchInfo if min_micro_arch == 'any': - info['required_minimum_microarch'] = 0 - return + matchInfo['required_minimum_microarch'] = 0 + return matchInfo if min_micro_arch.startswith('x86-64-v'): - info['required_minimum_microarch'] = int(min_micro_arch.split('v')[-1]) - return + matchInfo['required_minimum_microarch'] = int(min_micro_arch.split('v')[-1]) + return matchInfo self.logger.error(f"Not supported microarch: {min_micro_arch}. Ignore it") - info['required_minimum_microarch'] = 'any' + matchInfo['required_minimum_microarch'] = 'any' + + return matchInfo def getDashboardTaskType(self, task): """ Get the dashboard activity name for the task. @@ -455,133 +288,245 @@ def isGlobalBlacklistIgnored(self, kwargs): def makeJobSubmit(self, task): """ - Create the submit file. This is reused by all jobs in the task; differences - between the jobs are taken care of in the makeDagSpecs. - Any key defined in the dictionary passed to transform_strings - is deleted unless accounted for in the transform_strings method. + Prepare an HTCondor Submit object which will serve as template for + all job submissions. It will be persisted in Job.submiy JDL file and + customized by PreJob.py for each job using info from DAG + Differences between the jobs are taken care of in the makeDagSpecs and + propagated to the scheduler via the DAG description files """ - if os.path.exists("Job.submit"): - info = {'jobcount': int(task['jobcount'])} - return info - - # From here on out, we convert from tm_* names to the DataWorkflow names - info = dict(task) - - info['workflow'] = task['tm_taskname'] - info['jobtype'] = 'Analysis' - info['jobsw'] = info['tm_job_sw'] - info['jobarch'] = info['tm_job_arch'] - info['inputdata'] = info['tm_input_dataset'] - ## The 1st line below is for backward compatibility with entries in the TaskDB - ## made by CRAB server < 3.3.1511, where tm_primary_dataset = null - ## and tm_input_dataset always contains at least one '/'. Once we don't - ## care about backward compatibility anymore, remove the 1st line and - ## uncomment the 2nd line. 
- info['primarydataset'] = info['tm_primary_dataset'] if info['tm_primary_dataset'] else info['tm_input_dataset'].split('/')[1] - #info['primarydataset'] = info['tm_primary_dataset'] - info['splitalgo'] = info['tm_split_algo'] - info['algoargs'] = info['tm_split_args'] - info['cachefilename'] = info['tm_user_sandbox'] - info['cacheurl'] = info['tm_cache_url'] - info['userhn'] = info['tm_username'] - info['publishname'] = info['tm_publish_name'] - info['asyncdest'] = info['tm_asyncdest'] - info['dbsurl'] = info['tm_dbs_url'] - info['publishdbsurl'] = info['tm_publish_dbs_url'] - info['publication'] = 1 if info['tm_publication'] == 'T' else 0 - info['userdn'] = info['tm_user_dn'] - info['requestname'] = task['tm_taskname'].replace('"', '') - info['savelogsflag'] = 1 if info['tm_save_logs'] == 'T' else 0 # Note: this must always be 0 for probe jobs, is taken care of in PostJob.py - info['blacklistT1'] = 0 - info['siteblacklist'] = task['tm_site_blacklist'] - info['sitewhitelist'] = task['tm_site_whitelist'] - info['addoutputfiles'] = task['tm_outfiles'] - info['tfileoutfiles'] = task['tm_tfile_outfiles'] - info['edmoutfiles'] = task['tm_edm_outfiles'] - info['oneEventMode'] = 1 if info['tm_one_event_mode'] == 'T' else 0 - info['taskType'] = self.getDashboardTaskType(task) - info['worker_name'] = getattr(self.config.TaskWorker, 'name', 'unknown') - info['retry_aso'] = 1 if getattr(self.config.TaskWorker, 'retryOnASOFailures', True) else 0 - if task['tm_output_lfn'].startswith('/store/user/rucio') or \ - task['tm_output_lfn'].startswith('/store/group/rucio'): - info['aso_timeout'] = getattr(self.config.TaskWorker, 'ASORucioTimeout', 0) - else: - info['aso_timeout'] = getattr(self.config.TaskWorker, 'ASOTimeout', 0) - info['submitter_ip_addr'] = task['tm_submitter_ip_addr'] - info['cms_wmtool'] = self.setCMS_WMTool(task) - info['cms_tasktype'] = self.setCMS_TaskType(task) - info['cms_type'] = self.setCMS_Type(task) + jobSubmit = htcondor.Submit() - #Classads for task lifetime management, see https://github.com/dmwm/CRABServer/issues/5505 - info['task_lifetime_days'] = TASKLIFETIME // 24 // 60 // 60 - info['task_endtime'] = int(task["tm_start_time"]) + TASKLIFETIME - - self.populateGlideinMatching(info) + if os.path.exists("Job.submit"): + jobSubmit['jobcount'] = str(task['jobcount']) + return jobSubmit + + # these are classAds that we want to be added to each grid job + # in the assignement the RHS has to be a "classAd primitieve type" i.e. + # integer or double-quoted string. If we put a simple string like + # ad = something, HTCondor will look for a variable named something. + # Therefore 3 types are acceptable RHS here: + # - for int variables, convert to str: jobSubmit[] = str(myInt) + # - for string variable, need to quote: jobSubmit[] = classad.quote(myString) + # - if we "type the literal", we can simply use: jobSubmit[]="my text" + # Note that argument to classad.quote can only be string or None + # we prefer to use classad.quote to f'"{myString}"' as it is more clear and robust + # e.g. 
it handles cases where myString contains '"' by inserting proper escaping + + jobSubmit['My.CRAB_Reqname'] = classad.quote(task['tm_taskname']) + jobSubmit['My.CRAB_workflow'] = classad.quote(task['tm_taskname']) + jobSubmit['My.CMS_JobType'] = classad.quote('Analysis') + jobSubmit['My.CRAB_JobSW'] = classad.quote(task['tm_job_sw']) + jobSubmit['My.CRAB_JobArch'] = classad.quote(task['tm_job_arch']) + # Note: next ad must always be 0 for probe jobs, this is taken care of in PreJob.py + jobSubmit['My.CRAB_SaveLogsFlag'] = "1" if task['tm_save_logs'] == 'T' else "0" + jobSubmit['My.CRAB_DBSURL'] = classad.quote(task['tm_dbs_url']) + jobSubmit['My.CRAB_PostJobStatus'] = classad.quote("NOT RUN") + jobSubmit['My.CRAB_PostJobLastUpdate'] = "0" + jobSubmit['My.CRAB_PublishName'] = classad.quote(task['tm_publish_name']) + jobSubmit['My.CRAB_Publish'] = "1" if task['tm_publication'] == 'T' else "0" + jobSubmit['My.CRAB_PublishDBSURL'] = classad.quote(task['tm_publish_dbs_url']) + jobSubmit['My.CRAB_ISB'] = classad.quote(task['tm_cache_url']) + + def pythonListToClassAdValue(aList): + # python lists need special handling to become the string '{"a","b",...,"c"}' + quotedItems = json.dumps(aList) # from [s1, s2] to the string '["s1","s2"]' + quotedItems = quotedItems.lstrip('[').rstrip(']') # remove square brackets [ ] + value = "{" + quotedItems + "}" # make final string adding the curly brackets { } + return value + jobSubmit['My.CRAB_SiteBlacklist'] = pythonListToClassAdValue(task['tm_site_blacklist']) + jobSubmit['My.CRAB_SiteWhitelist'] = pythonListToClassAdValue(task['tm_site_whitelist']) + jobSubmit['My.CRAB_AdditionalOutputFiles'] = pythonListToClassAdValue(task['tm_outfiles']) + jobSubmit['My.CRAB_EDMOutputFiles'] = pythonListToClassAdValue(task['tm_edm_outfiles']) + jobSubmit['My.CRAB_TFileOutputFiles'] = pythonListToClassAdValue(task['tm_outfiles']) + jobSubmit['My.CRAB_UserDN'] = classad.quote(task['tm_user_dn']) + jobSubmit['My.CRAB_UserHN'] = classad.quote(task['tm_username']) + jobSubmit['My.CRAB_AsyncDest'] = classad.quote(task['tm_asyncdest']) + jobSubmit['My.CRAB_StageoutPolicy'] = classad.quote(task['stageoutpolicy']) + # for VOMS role and group, PostJob and RenewRemoteProxies code want the undefined value, not " + userRole = task['tm_user_role'] + jobSubmit['My.CRAB_UserRole'] = classad.quote(userRole) if userRole else 'undefined' + userGroup = task['tm_user_group'] + jobSubmit['My.CRAB_UserGroup'] = classad.quote(userGroup) if userGroup else 'undefined' + jobSubmit['My.CRAB_TaskWorker'] = classad.quote(getattr(self.config.TaskWorker, 'name', 'unknown')) + retry_aso = "1" if getattr(self.config.TaskWorker, 'retryOnASOFailures', True) else "0" + jobSubmit['My.CRAB_RetryOnASOFailures'] = retry_aso + jobSubmit['My.CRAB_ASOTimeout'] = str(getattr(self.config.TaskWorker, 'ASORucioTimeout', 0)) + jobSubmit['My.CRAB_RestHost'] = classad.quote(task['resthost']) + jobSubmit['My.CRAB_DbInstance'] = classad.quote(task['dbinstance']) + jobSubmit['My.CRAB_NumAutomJobRetries'] = str(task['numautomjobretries']) + jobSubmit['My.CRAB_Id'] = "$(count)" # count macro will be defined via VARS line in the DAG file + jobSubmit['My.CRAB_JobCount'] = str(task['jobcount']) + temp_dest, dest = makeLFNPrefixes(task) + jobSubmit['My.CRAB_OutTempLFNDir'] = classad.quote(temp_dest) + jobSubmit['My.CRAB_OutLFNDir'] = classad.quote(dest) + oneEventMode = "1" if task['tm_one_event_mode'] == 'T' else "0" + jobSubmit['My.CRAB_oneEventMode'] = oneEventMode + jobSubmit['My.CRAB_PrimaryDataset'] = 
classad.quote(task['tm_primary_dataset']) + jobSubmit['My.CRAB_DAGType'] = classad.quote("Job") + jobSubmit['My.CRAB_SubmitterIpAddr'] = classad.quote(task['tm_submitter_ip_addr']) + jobSubmit['My.CRAB_TaskLifetimeDays'] = str(TASKLIFETIME // 24 // 60 // 60) + jobSubmit['My.CRAB_TaskEndTime'] = str(int(task["tm_start_time"]) + TASKLIFETIME) + jobSubmit['My.CRAB_SplitAlgo'] = classad.quote(task['tm_split_algo']) + jobSubmit['My.CRAB_AlgoArgs'] = classad.quote(str(task['tm_split_args'])) # from dict to str before quoting + jobSubmit['My.CMS_WMTool'] = classad.quote(self.setCMS_WMTool(task)) + jobSubmit['My.CMS_TaskType'] = classad.quote(self.setCMS_TaskType(task)) + jobSubmit['My.CMS_SubmissionTool'] = classad.quote("CRAB") + jobSubmit['My.CMS_Type'] = classad.quote(self.setCMS_Type(task)) + transferOutputs = "1" if task['tm_transfer_outputs'] == 'T' else "0" # Note: this must always be 0 for probe jobs, is taken care of in PreJob.py + jobSubmit['My.CRAB_TransferOutputs'] = transferOutputs + + # These attributes help gWMS decide what platforms this job can run on; see https://twiki.cern.ch/twiki/bin/view/CMSPublic/CompOpsMatchArchitecture + matchInfo = self.populateGlideinMatching(task) + jobSubmit['My.REQUIRED_ARCH'] = classad.quote(matchInfo['required_arch']) + jobSubmit['My.REQUIRED_MINIMUM_MICROARCH'] = classad.quote(matchInfo['required_minimum_microarch']) + jobSubmit['My.DESIRED_CMSDataset'] = classad.quote(task['tm_input_dataset']) + + ## Add group information (local groups in SITECONF via CMSGroupMapper, VOMS groups via task info in DB) + groups = set.union(map_user_to_groups(task['tm_username']), task['user_groups']) + groups = ','.join(groups) # from the set {'g1','g2'...,'gN'} to the string 'g1,g2,..gN' + jobSubmit['My.CMSGroups'] = classad.quote(groups) + + # do we really need this ? i.e. or can replace it with GLIDEIN_CMSSite where it is used in the code ? + jobSubmit['My.JOBGLIDEIN_CMSSite'] = classad.quote('$$([ifThenElse(GLIDEIN_CMSSite is undefined, "Unknown", GLIDEIN_CMSSite)])') + + # + # now actual HTC Job Submission commands, here right hand side can be simply strings + # - info['runs'] = [] - info['lumis'] = [] - info['saveoutput'] = 1 if info['tm_transfer_outputs'] == 'T' else 0 # Note: this must always be 0 for probe jobs, is taken care of in PostJob.py egroups = getattr(self.config.TaskWorker, 'highPrioEgroups', []) - if egroups and info['userhn'] in self.getHighPrioUsers(info['user_proxy'], info['workflow'], egroups): - info['accounting_group'] = 'highprio' + if egroups and task['tm_username'] in self.getHighPrioUsers(egroups): + jobSubmit['accounting_group'] = 'highprio' + else: + jobSubmit['accounting_group'] = 'analysis' + jobSubmit['accounting_group_user'] = task['tm_username'] + + jobSubmit['job_ad_information_attrs'] = "MATCH_EXP_JOBGLIDEIN_CMSSite, JOBGLIDEIN_CMSSite, RemoteSysCpu, RemoteUserCpu" + + # Recover job output and logs on eviction events; make sure they aren't spooled + # This allows us to return stdout to users when they hit memory limits (which triggers PeriodicRemove). 
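(Aside, not part of the patch: a minimal standalone sketch of the quoting rules described in the comment above, using the same classad bindings this module imports — classad2 when 'useHtcV2' is set — plus a copy of the pythonListToClassAdValue helper; the values in the trailing comments are what these calls are expected to print.)

import json
import classad  # classad2 as classad when 'useHtcV2' is in the environment; quote() behaves the same

print(classad.quote('T2_CH_CERN'))   # -> "T2_CH_CERN"   (double quotes added)
print(classad.quote('say "hi"'))     # -> "say \"hi\""   (embedded quotes escaped)

def pythonListToClassAdValue(aList):  # same helper as in makeJobSubmit above
    quotedItems = json.dumps(aList)                    # ['a', 'b'] -> '["a", "b"]'
    quotedItems = quotedItems.lstrip('[').rstrip(']')  # strip the square brackets
    return "{" + quotedItems + "}"                     # add the curly brackets

print(pythonListToClassAdValue(['out1.root', 'out2.root']))  # -> {"out1.root", "out2.root"}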
+ jobSubmit['WhenToTransferOutput'] = "ON_EXIT_OR_EVICT" + # old code had this line in Job.submit, but I can't find it used nor documented anywhere + # +SpoolOnEvict = false + + # Keep job in the queue upon completion long enough for the postJob to run, + # allowing the monitoring script to fetch the postJob status and job exit-code updated by the postJob + jobSubmit['LeaveJobInQueue'] = "ifThenElse((JobStatus=?=4 || JobStatus=?=3) " + \ + "&& (time() - EnteredCurrentStatus < 30 * 60*60), true, false)" + + jobSubmit['universe'] = "vanilla" + jobSubmit['Executable'] = "gWMS-CMSRunAnalysis.sh" + jobSubmit['Output'] = "job_out.$(count)" + jobSubmit['Error'] = "job_err.$(count)" + jobSubmit['Log'] = "job_log" + + # from https://htcondor.readthedocs.io/en/latest/man-pages/condor_submit.html#submit-description-file-commands + # The entire string representing the command line arguments is surrounded by double quote marks + jobSubmit['Arguments'] = classad.quote("--jobId=$(count)") + jobSubmit['transfer_output_files'] = "jobReport.json.$(count), WMArchiveReport.json.$(count)" + + additional_input_file = "" + additional_environment_options = "" + if os.path.exists("CMSRunAnalysis.tar.gz"): + additional_environment_options += ' CRAB_RUNTIME_TARBALL=local' + additional_input_file += ", CMSRunAnalysis.tar.gz" else: - info['accounting_group'] = 'analysis' - info['accounting_group_user'] = info['userhn'] - info = transform_strings(info) - info['faillimit'] = task['tm_fail_limit'] - # tm_extrajdl and tm_user_config['acceleratorparams'] contain list of k=v - # assignements to be turned into classAds, so here we turn them from a python list of strings to - # a single string with k=v separated by \n which can be pasted into the Job.submit JDL - info['extra_jdl'] = '\n'.join(literal_eval(task['tm_extrajdl'])) + raise TaskWorkerException(f"Cannot find CMSRunAnalysis.tar.gz inside the cwd: {os.getcwd()}") + if os.path.exists("TaskManagerRun.tar.gz"): + additional_environment_options += ' CRAB_TASKMANAGER_TARBALL=local' + else: + raise TaskWorkerException(f"Cannot find TaskManagerRun.tar.gz inside the cwd: {os.getcwd()}") + additional_input_file += ", sandbox.tar.gz" # it will be present on SPOOL_DIR after dab_bootstrap + additional_input_file += ", input_args.json" + additional_input_file += ", run_and_lumis.tar.gz" + additional_input_file += ", input_files.tar.gz" + additional_input_file += ", submit_env.sh" + additional_input_file += ", cmscp.sh" + jobSubmit['transfer_input_files'] = f"CMSRunAnalysis.sh, cmscp.py{additional_input_file}" + # make sure coredump (if any) is not added to output files ref: https://lists.cs.wisc.edu/archive/htcondor-users/2022-September/msg00052.shtml + jobSubmit['coresize'] = "0" + + # we should fold this into the config file instead of hardcoding things. 
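(Aside, not part of the patch: the 'My.' prefix on these keys is the submit-language equivalent of the '+Attr' lines in the removed JOB_SUBMIT template, i.e. a custom attribute added to the job ClassAd, and an htcondor.Submit object renders back to plain submit-description text, which is how Job.submit gets written further down via print(jobSubmit, file=fd). A minimal sketch with illustrative values:)

import classad
import htcondor  # htcondor2 as htcondor when 'useHtcV2' is in the environment

sub = htcondor.Submit()
sub['My.CRAB_DAGType'] = classad.quote("Job")  # same effect as the old '+CRAB_DAGType = "Job"'
sub['universe'] = "vanilla"
sub['Executable'] = "gWMS-CMSRunAnalysis.sh"
with open("Job.submit.example", "w", encoding='utf-8') as fd:
    print(sub, file=fd)  # str(Submit) renders the submit description as 'key = value' lines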
+ jobSubmit['Environment'] = classad.quote(f"SCRAM_ARCH=$(CRAB_JobArch){additional_environment_options}") + jobSubmit['should_transfer_files'] = "YES" + jobSubmit['use_x509userproxy'] = "true" + + arch = task['tm_job_arch'].split("_")[0] # extracts "slc7" from "slc7_amd64_gcc10" + required_os_list = ARCH_TO_OS.get(arch) + if not required_os_list: + raise SubmissionRefusedException(f"Unsupported architecture {arch}") + # ARCH_TO_OS.get("slc7") gives a list with one item only: ['rhel7'] + jobSubmit['My.REQUIRED_OS'] = classad.quote(required_os_list[0]) + jobSubmit['Requirements'] = "stringListMember(TARGET.Arch, REQUIRED_ARCH)" + + # Ref: https://htcondor.readthedocs.io/en/latest/classad-attributes/job-classad-attributes.html#HoldReasonCode + jobSubmit['periodic_release'] = "(HoldReasonCode == 28) || (HoldReasonCode == 30) " + \ + "|| (HoldReasonCode == 13) || (HoldReasonCode == 6)" + + # Remove if + # a) job is in the 'held' status for more than 7 minutes + # b) job is idle more than 7 days + # c) job is running and one of: + # 1) Over memory use + # 2) Over wall clock limit + # 3) Over disk usage of N GB, which is set in ServerUtilities + # d) the taks EndTime has been reached + # e) job is idle and users proxy expired 1 day ago. + # (P.S. why 1 day ago? because there is recurring action which is updating user proxy and lifetime.) + # ** If New periodic remove expression is added, also it should have Periodic Remove Reason. ** + # ** Otherwise message will not be clear and it is hard to debug ** + periodicRemove = "( (JobStatus =?= 5) && (time() - EnteredCurrentStatus > 7*60) )" # a) + periodicRemove += "|| ( (JobStatus =?= 1) && (time() - EnteredCurrentStatus > 7*24*60*60) )" # b) + periodicRemove += "|| ( (JobStatus =?= 2) && ( " # c) + periodicRemove += "(MemoryUsage =!= UNDEFINED && MemoryUsage > RequestMemory)" # c) 1) + periodicRemove += "|| (MaxWallTimeMinsRun * 60 < time() - EnteredCurrentStatus)" # c) 2) + periodicRemove += f"|| (DiskUsage > {MAX_DISK_SPACE})" # c) 3) + periodicRemove += "))" # these parentheses close the "if running" condition, i.e. JobStatus==2 + periodicRemove += "|| (time() > CRAB_TaskEndTime)" # d) + periodicRemove += "|| ( (JobStatus =?= 1) && (time() > (x509UserProxyExpiration + 86400)))""" # e) + jobSubmit['periodic_remove'] = periodicRemove + + # remove reasons are "ordered" in the following big IF starting from the less-conditial ones + # order is relevant and getting it right is "an art" + periodicRemoveReason = "ifThenElse(" + periodicRemoveReason += "time() - EnteredCurrentStatus > 7 * 24 * 60 * 60 && isUndefined(MemoryUsage)," + periodicRemoveReason += "\"Removed due to idle time limit\"," # set this reasons. 
Else + periodicRemoveReason += "ifThenElse(time() > x509UserProxyExpiration, \"Removed job due to proxy expiration\"," + periodicRemoveReason += "ifThenElse(MemoryUsage > RequestMemory, \"Removed due to memory use\"," + periodicRemoveReason += "ifThenElse(MaxWallTimeMinsRun * 60 < time() - EnteredCurrentStatus, \"Removed due to wall clock limit\"," + periodicRemoveReason += f"ifThenElse(DiskUsage > {MAX_DISK_SPACE}, \"Removed due to disk usage\"," + periodicRemoveReason += "ifThenElse(time() > CRAB_TaskEndTime, \"Removed due to reached CRAB_TaskEndTime\"," + periodicRemoveReason += "\"Removed due to job being held\"))))))" # one closed ")" for each "ifThenElse(" + jobSubmit['My.PeriodicRemoveReason'] = periodicRemoveReason + + # tm_extrajdl and tm_user_config['acceleratorparams'] contain list of k=v assignements to be turned into classAds + # also special handling is needed because is retrieved from DB not as a python list, but as a string + # with format "['a=b','c=d'...]" (a change in RESTWorkerWorkflow would be needed to get a python list + # so we use here the same literal_eval which is used in RESTWorkerWorkflow.py ) + for extraJdl in literal_eval(task['tm_extrajdl']): + k,v = extraJdl.split('=',1) + jobSubmit[k] = v + if task['tm_user_config']['requireaccelerator']: # hardcoding accelerator to GPU (SI currently only have nvidia GPU) - info['accelerator_jdl'] = '+RequiresGPU=1\nrequest_GPUs=1' + jobSubmit['My.RequiresGPU'] = "1" + jobSubmit['request_GPUs'] = "1" if task['tm_user_config']['acceleratorparams']: gpuMemoryMB = task['tm_user_config']['acceleratorparams'].get('GPUMemoryMB', None) cudaCapabilities = task['tm_user_config']['acceleratorparams'].get('CUDACapabilities', None) cudaRuntime = task['tm_user_config']['acceleratorparams'].get('CUDARuntime', None) if gpuMemoryMB: - info['accelerator_jdl'] += f"\n+GPUMemoryMB={gpuMemoryMB}" + jobSubmit['My.GPUMemoryMB'] = classad.quote(gpuMemoryMB) if cudaCapabilities: cudaCapability = ','.join(sorted(cudaCapabilities)) - info['accelerator_jdl'] += f"\n+CUDACapability={classad.quote(cudaCapability)}" + jobSubmit['My.CUDACapability'] = classad.quote(cudaCapability) if cudaRuntime: - info['accelerator_jdl'] += f"\n+CUDARuntime={classad.quote(cudaRuntime)}" - else: - info['accelerator_jdl'] = '' - arch = info['jobarch_flatten'].split("_")[0] # extracts "slc7" from "slc7_amd64_gcc10" - required_os_list = ARCH_TO_OS.get(arch) - if not required_os_list: - raise SubmissionRefusedException(f"Unsupported architecture {arch}") - # ARCH_TO_OS.get("slc7") gives a list with one item only: ['rhel7'] - info['opsys_req'] = f'+REQUIRED_OS="{required_os_list[0]}"' - - info.setdefault("additional_environment_options", '') - info.setdefault("additional_input_file", "") - if os.path.exists("CMSRunAnalysis.tar.gz"): - info['additional_environment_options'] += 'CRAB_RUNTIME_TARBALL=local' - info['additional_input_file'] += ", CMSRunAnalysis.tar.gz" - else: - raise TaskWorkerException(f"Cannot find CMSRunAnalysis.tar.gz inside the cwd: {os.getcwd()}") - if os.path.exists("TaskManagerRun.tar.gz"): - info['additional_environment_options'] += ' CRAB_TASKMANAGER_TARBALL=local' - else: - raise TaskWorkerException(f"Cannot find TaskManagerRun.tar.gz inside the cwd: {os.getcwd()}") - info['additional_input_file'] += ", sandbox.tar.gz" # it will be present on SPOOL_DIR after dab_bootstrap - info['additional_input_file'] += ", input_args.json" - info['additional_input_file'] += ", run_and_lumis.tar.gz" - info['additional_input_file'] += ", input_files.tar.gz" - 
info['additional_input_file'] += ", submit_env.sh" - info['additional_input_file'] += ", cmscp.sh" - - info['max_disk_space'] = MAX_DISK_SPACE + jobSubmit['My.CUDARuntime'] = classad.quote(cudaRuntime) with open("Job.submit", "w", encoding='utf-8') as fd: - fd.write(JOB_SUBMIT % info) - - return info + print(jobSubmit, file=fd) + return jobSubmit def getPreScriptDefer(self, task, jobid): """ Return the string to be used for deferring prejobs @@ -602,7 +547,6 @@ def getPreScriptDefer(self, task, jobid): prescriptDeferString = '' return prescriptDeferString - def makeDagSpecs(self, task, siteinfo, jobgroup, block, availablesites, datasites, outfiles, startjobid, parent=None, stage='conventional'): """ need a comment line here """ dagSpecs = [] @@ -651,9 +595,9 @@ def makeDagSpecs(self, task, siteinfo, jobgroup, block, availablesites, datasite remoteOutputFiles = [] localOutputFiles = [] for origFile in outfiles: - info = origFile.rsplit(".", 1) - if len(info) == 2: - fileName = f"{info[0]}_{count}.{info[1]}" + nameParts = origFile.rsplit(".", 1) + if len(nameParts) == 2: # filename ends with ., put jobId before the dot + fileName = f"{nameParts[0]}_{count}.{nameParts[1]}" else: fileName = f"{origFile}_{count}" remoteOutputFiles.append(fileName) @@ -709,7 +653,7 @@ def makeDagSpecs(self, task, siteinfo, jobgroup, block, availablesites, datasite def prepareJobArguments(self, dagSpecs, task): """ Prepare an object with all the input parameters of each jobs. It is a list with a dictionary for each job. The dictionary key/value pairs are the variables needed in CMSRunAnalysis.py - This will be save in "input_args*.json", a differnt json file for the main DAG and each subdags + This will be save in "input_args.json" by the caller, adding to existing list in ther if any Inputs: dagSpecs : list of dictionaries with information for each DAG job task: dictionary, the "standard" task dictionary with info from the DataBase TASK table @@ -720,18 +664,17 @@ def prepareJobArguments(self, dagSpecs, task): argdicts = [] for dagspec in dagSpecs: argDict = {} - argDict['inputFiles'] = f"job_input_file_list_{dagspec['count']}.txt" #'job_input_file_list_1.txt' - argDict['runAndLumiMask'] = f"job_lumis_{dagspec['count']}.json" + argDict['inputFileList'] = f"job_input_file_list_{dagspec['count']}.txt" #'job_input_file_list_1.txt' + argDict['runAndLumis'] = f"job_lumis_{dagspec['count']}.json" argDict['CRAB_Id'] = dagspec['count'] #'1' argDict['lheInputFiles'] = dagspec['lheInputFiles'] # False argDict['firstEvent'] = dagspec['firstEvent'] # 'None' argDict['lastEvent'] = dagspec['lastEvent'] # 'None' argDict['firstLumi'] = dagspec['firstLumi'] # 'None' argDict['firstRun'] = dagspec['firstRun'] # 'None' - argDict['CRAB_Archive'] = task['tm_user_sandbox'] #'sandbox.tar.gz' - argDict['CRAB_ISB'] = task['tm_cache_url'] # 'https://cmsweb.cern.ch/crabcache' - argDict['CRAB_JobSW'] = task['tm_job_sw'] # 'CMSSW_9_2_5' - argDict['CRAB_JobArch'] = task['tm_job_arch'] # 'slc6_amd64_gcc530' + argDict['userSandbox'] = task['tm_user_sandbox'] #'sandbox.tar.gz' + argDict['cmsswVersion'] = task['tm_job_sw'] # 'CMSSW_9_2_5' + argDict['scramArch'] = task['tm_job_arch'] # 'slc6_amd64_gcc530' argDict['seeding'] = 'AutomaticSeeding' argDict['scriptExe'] = task['tm_scriptexe'] # argDict['eventsPerLumi'] = task['tm_events_per_lumi'] # @@ -761,7 +704,7 @@ def createSubdag(self, splitterResult, **kwargs): automatic splitting (multiple subdags which will be added in the scheduler by the PreDag.py script which calls this DagmanCreator 
Returns: - info : dictionary : passes info to next action (DagmanSubmitter) + jobSubmit : HTCondor submit object : passes the Job.submit template to next action (DagmanSubmitter) splitterResult : object : this is the output of previous action (Splitter) and is part of input arguments to DagmanCreator ! As far as Stefano can tell returning it here is a "perverse" way to pass it also to DagmanSubmitter @@ -1129,14 +1072,13 @@ def getBlacklistMsg(): with open("site.ad.json", "w", encoding='utf-8') as fd: json.dump(siteinfo, fd) - ## Save the DAG into a file. with open(dagFileName, "w", encoding='utf-8') as fd: fd.write(dag) kwargs['task']['jobcount'] = len(dagSpecs) - info = self.makeJobSubmit(kwargs['task']) + jobSubmit = self.makeJobSubmit(kwargs['task']) # list of input arguments needed for each jobs argdicts = self.prepareJobArguments(dagSpecs, kwargs['task']) @@ -1150,32 +1092,31 @@ def getBlacklistMsg(): with open(argFileName, 'w', encoding='utf-8') as fd: json.dump(argdicts, fd) + # add maxidle, maxpost and faillimit to the object passed to DagmanSubmitter + # first two be used in the DAG submission and the latter the PostJob maxidle = getattr(self.config.TaskWorker, 'maxIdle', MAX_IDLE_JOBS) if maxidle == -1: - maxidle = info['jobcount'] + maxidle = kwargs['task']['jobcount'] elif maxidle == 0: - maxidle = int(max(MAX_IDLE_JOBS, info['jobcount']*.1)) - info['maxidle'] = maxidle + maxidle = int(max(MAX_IDLE_JOBS, kwargs['task']['jobcount']*.1)) + jobSubmit['My.CRAB_MaxIdle'] = str(maxidle) maxpost = getattr(self.config.TaskWorker, 'maxPost', MAX_POST_JOBS) if maxpost == -1: - maxpost = info['jobcount'] + maxpost = kwargs['task']['jobcount'] elif maxpost == 0: - maxpost = int(max(MAX_POST_JOBS, info['jobcount']*.1)) - info['maxpost'] = maxpost + maxpost = int(max(MAX_POST_JOBS, kwargs['task']['jobcount']*.1)) + jobSubmit['My.CRAB_MaxPost'] = str(maxpost) - if info.get('faillimit') is None: - info['faillimit'] = -1 - #if info['jobcount'] > 200 - # info['faillimit'] = 100 - #else: - # info['faillimit'] = -1 - elif info.get('faillimit') < 0: - info['faillimit'] = -1 + if not 'My.CRAB_FailedNodeLimit' in jobSubmit: + jobSubmit['My.CRAB_FailedNodeLimit'] = "-1" + elif int(jobSubmit['My.CRAB_FailedNodeLimit']) < 0: + jobSubmit['My.CRAB_FailedNodeLimit'] = "-1" - return info, splitterResult, subdags - def getHighPrioUsers(self, userProxy, workflow, egroups): + return jobSubmit, splitterResult, subdags + + def getHighPrioUsers(self, egroups): """ get the list of high priority users """ highPrioUsers = set() @@ -1186,11 +1127,10 @@ def getHighPrioUsers(self, userProxy, workflow, egroups): msg = "Error when getting the high priority users list." \ " Will ignore the high priority list and continue normally." 
\ f" Error reason: {ex}" - self.uploadWarning(msg, userProxy, workflow) + self.logger.error(msg) return [] return highPrioUsers - def executeInternal(self, *args, **kw): """ all real work is done here """ transform_location = getLocation('CMSRunAnalysis.sh') @@ -1253,7 +1193,7 @@ def executeInternal(self, *args, **kw): filesForSched.append("input_dataset_lumis.json") filesForSched.append("input_dataset_duplicate_lumis.json") - info, splitterResult, subdags = self.createSubdag(*args, **kw) + jobSubmit, splitterResult, subdags = self.createSubdag(*args, **kw) # as splitter summary is useful for dryrun, let's add it to the InputFiles tarball jobGroups = splitterResult[0] # the first returned value of Splitter action is the splitterFactory output @@ -1266,15 +1206,14 @@ def executeInternal(self, *args, **kw): self.prepareTarballForSched(filesForSched, subdags) - return info, params, ["InputFiles.tar.gz"], splitterResult - + return jobSubmit, params, ["InputFiles.tar.gz"], splitterResult def execute(self, *args, **kw): """ entry point called by Hanlder """ cwd = os.getcwd() try: os.chdir(kw['tempDir']) - info, params, inputFiles, splitterResult = self.executeInternal(*args, **kw) - return TaskWorker.DataObjects.Result.Result(task=kw['task'], result=(info, params, inputFiles, splitterResult)) + jobSubmit, params, inputFiles, splitterResult = self.executeInternal(*args, **kw) + return TaskWorker.DataObjects.Result.Result(task=kw['task'], result=(jobSubmit, params, inputFiles, splitterResult)) finally: os.chdir(cwd) diff --git a/src/python/TaskWorker/Actions/DagmanResubmitter.py b/src/python/TaskWorker/Actions/DagmanResubmitter.py index 00becaffc2..d2eccc40de 100644 --- a/src/python/TaskWorker/Actions/DagmanResubmitter.py +++ b/src/python/TaskWorker/Actions/DagmanResubmitter.py @@ -61,8 +61,7 @@ def executeInternal(self, *args, **kwargs): #pylint: disable=unused-argument raise TaskWorkerException(msg) from exp # Check memory and walltime - checkMemoryWalltime(None, task, 'resubmit', self.logger, self.uploadWarning) - + checkMemoryWalltime(task, 'resubmit', self.logger, self.uploadWarning) # Find only the originally submitted DAG to hold and release: this # will re-trigger the scripts and adjust retries and other diff --git a/src/python/TaskWorker/Actions/DagmanSubmitter.py b/src/python/TaskWorker/Actions/DagmanSubmitter.py index f0a0458379..3b7baa0109 100644 --- a/src/python/TaskWorker/Actions/DagmanSubmitter.py +++ b/src/python/TaskWorker/Actions/DagmanSubmitter.py @@ -10,7 +10,6 @@ from http.client import HTTPException from urllib.parse import urlencode -import CMSGroupMapper import HTCondorLocator from ServerUtilities import FEEDBACKMAIL @@ -28,80 +27,74 @@ import htcondor import classad -## These are the CRAB attributes that we want to add to the job class ad when -## using the submitDirect() method. I.e. to submit the dagman boostrap job to the scheduler (AP) -# We do not need all of this anymore !! Most of them are in Job.submit created by -# DagmanCreator and are not needed to submit/run the DagMan. 
-SUBMIT_INFO = [ \ - ('+CRAB_ReqName', 'requestname'), - ('+CRAB_Workflow', 'workflow'), - ('+CMS_JobType', 'jobtype'), - ('+CRAB_JobSW', 'jobsw'), - ('+CRAB_JobArch', 'jobarch'), - ('+DESIRED_CMSDataset', 'inputdata'), - ('+CRAB_DBSURL', 'dbsurl'), - ('+CRAB_PublishName', 'publishname'), - ('+CRAB_Publish', 'publication'), - ('+CRAB_PublishDBSURL', 'publishdbsurl'), - ('+CRAB_PrimaryDataset', 'primarydataset'), - ('+CRAB_ISB', 'cacheurl'), - ('+CRAB_AdditionalOutputFiles', 'addoutputfiles'), - ('+CRAB_EDMOutputFiles', 'edmoutfiles'), - ('+CRAB_TFileOutputFiles', 'tfileoutfiles'), - ('+CRAB_TransferOutputs', 'saveoutput'), - ('+CRAB_SaveLogsFlag', 'savelogsflag'), - ('+CRAB_UserDN', 'userdn'), - ('+CRAB_UserHN', 'userhn'), - ('+CRAB_AsyncDest', 'asyncdest'), - #('+CRAB_StageoutPolicy', 'stageoutpolicy'), - ('+CRAB_UserRole', 'tm_user_role'), - ('+CRAB_UserGroup', 'tm_user_group'), - ('+CRAB_TaskWorker', 'worker_name'), - ('+CRAB_RetryOnASOFailures', 'retry_aso'), - ('+CRAB_ASOTimeout', 'aso_timeout'), - ('+CRAB_RestHost', 'resthost'), - ('+CRAB_DbInstance', 'dbinstance'), - ('+CRAB_NumAutomJobRetries', 'numautomjobretries'), - ('+CRAB_SplitAlgo', 'splitalgo'), - ('+CRAB_AlgoArgs', 'algoargs'), - ('+CRAB_LumiMask', 'lumimask'), - ('+CRAB_JobCount', 'jobcount'), - ('+CRAB_UserVO', 'tm_user_vo'), - ('+CRAB_SiteBlacklist', 'siteblacklist'), - ('+CRAB_SiteWhitelist', 'sitewhitelist'), - ('+CRAB_RequestedMemory', 'tm_maxmemory'), - ('+CRAB_RequestedCores', 'tm_numcores'), - ('+MaxWallTimeMins', 'tm_maxjobruntime'), - ('+MaxWallTimeMinsRun', 'tm_maxjobruntime'), - ('+MaxWallTimeMinsProbe', 'maxproberuntime'), - ('+MaxWallTimeMinsTail', 'maxtailruntime'), - ('+CRAB_FailedNodeLimit', 'faillimit'), - ('+CRAB_DashboardTaskType', 'taskType'), - ('+CRAB_MaxIdle', 'maxidle'), - ('+CRAB_MaxPost', 'maxpost'), - ('+CMS_Type', 'cms_type'), - ('+CMS_WMTool', 'cms_wmtool'), - ('+CMS_TaskType', 'cms_tasktype'), - ] - - -def addCRABInfoToJobJDL(jdl, info): + +def addJobSubmitInfoToDagJobJDL(dagJdl, jobSubmit): """ - given a submit objecty, add in the appropriate CRAB_& attributes - from the info directory + given an htondor.Submit objecty, add the appropriate information + from the jobSubmit prepared by DagmanCreator """ - for adName, dictName in SUBMIT_INFO: - if dictName in info and info[dictName] is not None: # 0 or False still is valid info[] - jdl[adName] = str(info[dictName]) - # CRAB_JobReleaseTimeout is passed in the config as an extraJDL, even if - # it is meant to be used in the PreJob which will look in the DAG ads - # so we need to add it to the DAG JDL as well - # Note: extraJDL config. param is a list, DagmanCreator changes it to multiple lines - if 'extra_jdl' in info and info['extra_jdl']: - for ejdl in info['extra_jdl'].split('\n'): - k, v = ejdl.split('=', 1) - if k == "+CRAB_JobReleaseTimeout": - jdl[k] = v + # These are the CRAB attributes that we want to add to the job class ad when + # using the submitDirect() method. I.e. to submit the dagman boostrap job to the scheduler (AP) + # These ads are the way to communivate variable values to the scripts which run in the scheduler, + # Why classAds rather than e.g. a JSON or `setenv` file ? It started this way, and it stuck. + adsToPort = [ + # used in bootstrap script (in other places too !) 
and useful to lookup things with condor_q + 'My.CRAB_RestHost', + 'My.CRAB_DbInstance', + 'My.CRAB_ReqName', + 'My.CRAB_Workflow', + 'My.CRAB_UserDN', + 'My.CRAB_UserHN', + # these are used in Pre/Post scripts + 'My.CMS_JobType', + 'My.CRAB_JobSW', + 'My.CRAB_JobArch', + 'My.DESIRED_CMSDataset', + 'My.CRAB_MaxIdle', + 'My.CRAB_MaxPost', + 'My.CRAB_FailedNodeLimit', + 'My.CRAB_SaveLogsFlag', + 'My.CRAB_TransferOutputs', + 'My.CRAB_SiteBlacklist', + 'My.CRAB_SiteWhitelist', + 'My.CRAB_JobReleaseTimeout', + 'My.CRAB_RequestedMemory', + 'My.CRAB_RequestedCores', + 'My.MaxWallTimeMins', + 'My.MaxWallTimeMinsRun', + 'My.MaxWallTimeMinsProbe', + 'My.MaxWallTimeMinsTail', + # these are used in PostJob only + 'My.CRAB_UserRole', + 'My.CRAB_UserGroup', + 'My.CRAB_DBSURL', + 'My.CRAB_PrimaryDataset', + 'My.CRAB_AdditionalOutputFiles', + 'My.CRAB_EDMOutputFiles', + 'My.CRAB_TFileOutputFiles', + 'My.CRAB_RetryOnASOFailures', + 'My.CRAB_AsyncDest', + 'My.CRAB_ASOTimeout', + 'My.CRAB_PublishName', + 'My.CRAB_Publish', + # usefulness of these is to be be determined + 'My.CRAB_PublishDBSURL', # not used anywhere in our code in GH + 'My.CRAB_TaskWorker', + 'My.CRAB_NumAutomJobRetries', + 'My.CRAB_SplitAlgo', + 'My.CRAB_AlgoArgs', + 'My.CRAB_LumiMask', + 'My.CRAB_JobCount', + 'My.CRAB_UserVO', + 'My.CRAB_DashboardTaskType', + 'My.CMS_Type', + 'My.CMS_WMTool', + 'My.CMS_TaskType', + ] + + for adName in adsToPort: + if adName in jobSubmit: + dagJdl[adName] = jobSubmit[adName] class ScheddStats(dict): @@ -151,10 +144,17 @@ def __str__(self): scheddStats = ScheddStats() -def checkMemoryWalltime(info, task, cmd, logger, warningUploader): +def checkMemoryWalltime(task, cmd, logger, warningUploader): """ Check memory and walltime and if user requires too much: - upload warning back to crabserver - change walltime to max 47h Issue: #4742 + NOTE: this is used also in DagmanResubmitter. + Arguments: + task : dictionary : the Task info from taskdb + cmd: string : "tm" if called by D.Submitter, "resubmit" if callead by D.Resubmitter + logger: logging object : the current logger + warningUploader : should remove this as arg. and use CRABUtils.TaskUtils.uploadWarning + must remove from TW/Actions/TaskAction and cleanup of TW/Actions code """ stdmaxjobruntime = 2750 @@ -169,8 +169,8 @@ def checkMemoryWalltime(info, task, cmd, logger, warningUploader): msg += " Jobs may not find a site where to run." msg += f" CRAB has changed this value to {stdmaxjobruntime} minutes." 
logger.warning(msg) - if info is not None: - info['tm_maxjobruntime'] = str(stdmaxjobruntime) + if cmd == 'tm': # means this was called by DagmanSubmitter, not DagmanResubmitter + task['tm_maxjobruntime'] = str(stdmaxjobruntime) # somehow TaskAction/uploadWaning wants the user proxy to make a POST to task DB warningUploader(msg, task['user_proxy'], task['tm_taskname']) if memory is not None and memory > absmaxmemory: @@ -203,7 +203,6 @@ def sendScheddToREST(self, task, schedd): """ Try to set the schedd to the oracle database in the REST interface Raises TaskWorkerException in case of failure """ - task['tm_schedd'] = schedd configreq = {'workflow':task['tm_taskname'], 'subresource':'updateschedd', 'scheddname':schedd} try: @@ -218,6 +217,7 @@ def sendScheddToREST(self, task, schedd): def pickAndSetSchedd(self, task): """ Pick up a schedd using the correct formula Send it to the REST + Set it in the task{} dictionary If we can't send the schedd to the REST this is considered a permanent error @@ -237,8 +237,10 @@ def pickAndSetSchedd(self, task): schedd = loc.getSchedd(chooserFunction=self.config.TaskWorker.scheddPickerFunction) else: schedd = loc.getSchedd() #uses the default memory stuff + self.logger.debug("Finished picking up scheduler. Sending schedd name (%s) to REST", schedd) - self.sendScheddToREST(task, schedd) + self.sendScheddToREST(task, schedd) # this will raise if it fails + task['tm_schedd'] = schedd return schedd @@ -251,14 +253,20 @@ def execute(self, *args, **kwargs): In case of multiple failures is will set a new schedd and return back to the asction handler for retries. """ task = kwargs['task'] - schedd = task['tm_schedd'] - info = args[0][0] - inputFiles = args[0][2] + schedName = task['tm_schedd'] + # jobSubmit from DagmanCreator is an htcondor.Submit() object with same content as Job.submit file + # with the addition of : CRAB_MaxIdle, CRAB_MaxPost, CRAB_FailedNodeLimit which only make sense for DAG + jobSubmit = args[0][0] + # DagmanCreated also created the list of files to be transferred to the scheduler host + filesForScheduler = args[0][2] - checkMemoryWalltime(info, task, 'tm', self.logger, self.uploadWarning) + cwd = os.getcwd() - if not schedd: - schedd = self.pickAndSetSchedd(task) + checkMemoryWalltime(task, 'tm', self.logger, self.uploadWarning) + + if not schedName: + self.pickAndSetSchedd(task) + schedName = task['tm_schedd'] self.logger.debug("Starting duplicate check") dupRes = self.duplicateCheck(task) @@ -266,29 +274,33 @@ def execute(self, *args, **kwargs): if dupRes is not None: return dupRes - for retry in range(self.config.TaskWorker.max_retry + 1): #max_retry can be 0 - self.logger.debug("Trying to submit task %s to schedd %s for the %s time.", task['tm_taskname'], schedd, str(retry)) + self.logger.debug("Try to submit %s to %s for the %s time.", task['tm_taskname'], schedName, str(retry+1)) + os.chdir(kwargs['tempDir']) try: - execInt = self.executeInternal(info, inputFiles, **kwargs) - scheddStats.success(schedd, self.clusterId) + execInt = self.executeInternal(jobSubmit, filesForScheduler, task) + scheddStats.success(schedName, self.clusterId) + os.chdir(cwd) return execInt except Exception as ex: #pylint: disable=broad-except - scheddStats.failure(schedd) - msg = f"Failed to submit task to: {schedd} . Task: {task['tm_taskname']};\n{ex}" + scheddStats.failure(schedName) + msg = f"Failed to submit task to: {schedName} . 
Task: {task['tm_taskname']};\n{ex}" self.logger.exception(msg) - scheddStats.taskError(schedd, msg) + scheddStats.taskError(schedName, msg) if retry < self.config.TaskWorker.max_retry: #do not sleep on the last retry - self.logger.error("Will retry in %s seconds on %s.", self.config.TaskWorker.retry_interval[retry], schedd) + self.logger.error("Will retry in %s seconds on %s.", + self.config.TaskWorker.retry_interval[retry], schedName) time.sleep(self.config.TaskWorker.retry_interval[retry]) finally: self.logger.info(scheddStats) + os.chdir(cwd) ## All the submission retries to the current schedd have failed. Record the ## failures. ## Returning back to Handler.py for retries, and in case try on a new schedd - self.logger.debug("Choosing a new schedd and then retrying") - schedd = self.pickAndSetSchedd(task) + self.logger.debug("Choosing a new schedd and retry") + self.pickAndSetSchedd(task) + schedName = task['tm_schedd'] ## All the submission retries to this schedd have failed. msg = "The CRAB server backend was not able to submit the jobs to the Grid schedulers." @@ -299,7 +311,7 @@ def execute(self, *args, **kwargs): msg += f" The submission was retried {nTries} times on {nScheds} schedulers." msg += f" These are the failures per Grid scheduler:\n {scheddStats.taskErrors}" - raise TaskWorkerException(msg, retry=(schedd is not None)) + raise TaskWorkerException(msg, retry=(schedName is not None)) def duplicateCheck(self, task): @@ -322,8 +334,7 @@ def duplicateCheck(self, task): schedd, dummyAddress = loc.getScheddObjNew(task['tm_schedd']) self.logger.debug("Got schedd obj for %s ", task['tm_schedd']) - rootConst = f'CRAB_DAGType =?= "BASE" && CRAB_ReqName =?= {classad.quote(workflow)}' \ - '&& (isUndefined(CRAB_Attempt) || CRAB_Attempt == 0)' + rootConst = f'CRAB_DAGType =?= "BASE" && CRAB_ReqName =?= {classad.quote(workflow)}' self.logger.debug("Duplicate check is querying the schedd: %s", rootConst) results = list(schedd.query(rootConst, [])) @@ -374,32 +385,39 @@ def duplicateCheck(self, task): # Note that we don't re-send Dashboard jobs; we assume this is a rare occurrance and # don't want to upset any info already in the Dashboard. 
-        return Result.Result(task=task, result=(-1))
+        return Result.Result(task=task, result=-1)

-    def executeInternal(self, info, inputFiles, **kwargs):
+    def executeInternal(self, jobSubmit, filesForScheduler, task):
        """Internal execution to submit to selected scheduler
            Before submission it does duplicate check to see if
-            task was not submitted by previous time"""
-
-        task = kwargs['task']
-        workflow = task['tm_taskname']
-
-        cwd = os.getcwd()
-        os.chdir(kwargs['tempDir'])
-
-        info['start_time'] = task['tm_start_time']
-        info['inputFilesString'] = ", ".join(inputFiles + ['subdag.jdl'])
+            task was not already submitted in a previous attempt
+        Arguments:
+            jobSubmit : htcondor.Submit object : the content of the Job.submit file
+            filesForScheduler : list : list of files to be spooled for dagman job submission
+            task : dictionary : the Task info from taskdb
+        Returns:
+            if all OK, returns a TaskWorker.DataObjects.Result object
+               Result(task=task, result='OK')
+            otherwise, raises TaskWorkerException
+        Side effects:
+            an HTC job is submitted to the remote scheduler (AP) to execute dag_bootstrap_startup.sh
+            DAGJob.jdl file is written to the tmp directory with the JDL used to submit it
+            subdag.jdl file is written to the tmp directory with a JDL fragment to be included in DAG
+              description files by PreDag.py when creating subdags for automatic splitting;
+              this contains a (very large) subset of DAGJob.jdl
+        """
+
+        # start preparing the JDL which will be used for submitting the Dagman bootstrap job
+        dagJobJDL = htcondor.Submit()
+        addJobSubmitInfoToDagJobJDL(dagJobJDL, jobSubmit)  # start with the Job.submit from DagmanCreator
+
+        dagJobJDL['CRAB_TaskSubmitTime'] = str(task['tm_start_time'])
+        dagJobJDL['transfer_input_files'] = ", ".join(filesForScheduler + ['subdag.jdl'])
        outputFiles = ["RunJobs.dag.dagman.out", "RunJobs.dag.rescue.001"]
-        info['outputFilesString'] = ", ".join(outputFiles)
-        arg = "RunJobs.dag"
-
-        # for uniformity with values prepared in DagmanCreator (in JSON format), add double quotes
-        info['resthost'] = f"\"{self.crabserver.server['host']}\""
-        info['dbinstance'] = f'"{self.crabserver.getDbInstance()}"'
+        dagJobJDL['transfer_output_files'] = ", ".join(outputFiles)

        try:
-            info['remote_condor_setup'] = ''
            if task['tm_collector']:
                self.backendurls['htcondorPool'] = task['tm_collector']
            loc = HTCondorLocator.HTCondorLocator(self.backendurls)
@@ -413,7 +431,7 @@ def executeInternal(self, info, inputFiles, **kwargs):
                msg = "The CRAB server backend was not able to contact the Grid scheduler."
                msg += " Please try again later."
                msg += f" Message from the scheduler: {exp}"
-                self.logger.exception("%s: %s", workflow, msg)
+                self.logger.exception("%s: %s", task['tm_taskname'], msg)
                raise TaskWorkerException(msg, retry=True) from exp

        try:
@@ -423,17 +441,14 @@ def executeInternal(self, info, inputFiles, **kwargs):
            raise TaskWorkerException(msg, retry=True) from ex

        # Get location of schedd-specific environment script from schedd ad.
-        info['remote_condor_setup'] = loc.scheddAd.get("RemoteCondorSetup", "")
-
-        info["CMSGroups"] = set.union(CMSGroupMapper.map_user_to_groups(kwargs['task']['tm_username']), kwargs['task']['user_groups'])
-        self.logger.info("User %s mapped to local groups %s.", kwargs['task']['tm_username'], info["CMSGroups"])
-        if not info["CMSGroups"]:
-            raise TaskWorkerException(f"CMSGroups can not be empty. Failing task {task['tm_taskname']}", retry=True)
+        dagJobJDL['My.RemoteCondorSetup'] = loc.scheddAd.get("RemoteCondorSetup", "")

        self.logger.debug("Finally submitting to the schedd")
        if address:
+            cmd = 'dag_bootstrap_startup.sh'
+            arg = "RunJobs.dag"
            try:
-                self.clusterId = self.submitDirect(schedd, 'dag_bootstrap_startup.sh', arg, info)
+                self.clusterId = self.submitDirect(schedd, cmd, arg, dagJobJDL, task)
            except Exception as submissionError:
                msg = f"Something went wrong: {submissionError} \n"
                if self.clusterId:
@@ -442,7 +457,7 @@ def executeInternal(self, info, inputFiles, **kwargs):
                    msg += 'No clusterId was returned to DagmanSubmitter.'
                msg += " Clean up condor queue before trying again."
                self.logger.error(msg)
-                constrain = f"crab_reqname==\"{kwargs['task']['tm_taskname']}\""
+                constrain = f"crab_reqname==\"{task['tm_taskname']}\""
                constrain = str(constrain)  # beware unicode, it breaks htcondor binding
                self.logger.error("Sending: condor_rm -constrain '%s'", constrain)
                schedd.act(htcondor.JobAction.Remove, constrain)
@@ -452,9 +467,9 @@ def executeInternal(self, info, inputFiles, **kwargs):
            raise TaskWorkerException("Not able to get schedd address.", retry=True)
        self.logger.debug("Submission finished")
        finally:
-            os.chdir(cwd)
+            pass

-        configreq = {'workflow': kwargs['task']['tm_taskname'],
+        configreq = {'workflow': task['tm_taskname'],
                     'status': "SUBMITTED",
                     'subresource': 'success',
                     'clusterid' : self.clusterId}  #that's the condor cluster id of the dag_bootstrap.sh
@@ -462,59 +477,49 @@ def executeInternal(self, info, inputFiles, **kwargs):
        data = urlencode(configreq)
        self.crabserver.post(api='workflowdb', data=data)

-        return Result.Result(task=kwargs['task'], result='OK')
+        return Result.Result(task=task, result='OK')

-    def submitDirect(self, schedd, cmd, arg, info):  #pylint: disable=R0201
+    def submitDirect(self, schedd, cmd, arg, dagJobJDL, task):
        """ Submit directly to the schedd using the HTCondor module
        """
-        jobJDL = htcondor.Submit()
-        addCRABInfoToJobJDL(jobJDL, info)
-
-        if info["CMSGroups"]:
-            jobJDL["+CMSGroups"] = classad.quote(','.join(info["CMSGroups"]))
-        else:
-            jobJDL["+CMSGroups"] = classad.Value.Undefined

        # NOTE: Changes here must be synchronized with the job_submit in DagmanCreator.py in CAFTaskWorker
-        jobJDL["+CRAB_Attempt"] = "0"
-        jobJDL["+CMS_SubmissionTool"] = classad.quote("CRAB")
+        dagJobJDL["+CMS_SubmissionTool"] = classad.quote("CRAB")
        # We switched from local to scheduler universe.  Why?  It seems there's no way in the
        # local universe to change the hold signal at runtime.  That's fairly important for our
        # resubmit implementation.
-        jobJDL["JobUniverse"] = "7"
-        jobJDL["HoldKillSig"] = "SIGUSR1"
-        jobJDL["X509UserProxy"] = info['user_proxy']
+        dagJobJDL["JobUniverse"] = "7"
+        dagJobJDL["HoldKillSig"] = "SIGUSR1"
+        dagJobJDL["X509UserProxy"] = task['user_proxy']
        # submission command "priority" maps to jobAd "JobPrio" !
-        jobJDL["priority"] = info['tm_priority']
-        jobJDL["Requirements"] = "TARGET.Cpus >= 1"  # see https://github.com/dmwm/CRABServer/issues/8456#issuecomment-2145887432
-        jobJDL["Requirements"] = "True"
+        dagJobJDL["priority"] = str(task['tm_priority'])
+        dagJobJDL["Requirements"] = "TARGET.Cpus >= 1"  # see https://github.com/dmwm/CRABServer/issues/8456#issuecomment-2145887432
+        dagJobJDL["Requirements"] = "True"
        environmentString = "PATH=/usr/bin:/bin CRAB3_VERSION=3.3.0-pre1"
        environmentString += " CONDOR_ID=$(ClusterId).$(ProcId)"
-        environmentString += " " + " ".join(info['additional_environment_options'].split(';'))
+        environmentString += " CRAB_RUNTIME_TARBALL=local CRAB_TASKMANAGER_TARBALL=local"
+        ##SB environmentString += " " + " ".join(task['additional_environment_options'].split(';'))
        if 'useHtcV2' in os.environ:
            environmentString += " useHtcV2=True"
        # Environment command in JDL requires proper quotes https://htcondor.readthedocs.io/en/latest/man-pages/condor_submit.html#environment
-        jobJDL["Environment"] = classad.quote(environmentString)
-        jobJDL["+RemoteCondorSetup"] = classad.quote(info['remote_condor_setup'])
-        jobJDL["+CRAB_TaskSubmitTime"] = str(info['start_time'])  # this is an int (seconds from epoch)
-        jobJDL['+CRAB_TaskLifetimeDays'] = str(TASKLIFETIME // 24 // 60 // 60)
-        jobJDL['+CRAB_TaskEndTime'] = str(int(info['start_time']) + TASKLIFETIME)
+        dagJobJDL["Environment"] = classad.quote(environmentString)
+        dagJobJDL['+CRAB_TaskLifetimeDays'] = str(TASKLIFETIME // 24 // 60 // 60)
+        dagJobJDL['+CRAB_TaskEndTime'] = str(int(task['tm_start_time']) + TASKLIFETIME)
        #For task management info see https://github.com/dmwm/CRABServer/issues/4681#issuecomment-302336451
-        jobJDL["LeaveJobInQueue"] = "True"
-        jobJDL["PeriodicHold"] = "time() > CRAB_TaskEndTime"
-        jobJDL["transfer_output_files"] = str(info['outputFilesString'])
-        jobJDL["OnExitHold"] = "(ExitCode =!= UNDEFINED && ExitCode != 0)"
-        jobJDL["OnExitRemove"] = "( ExitSignal =?= 11 || (ExitCode =!= UNDEFINED && ExitCode >=0 && ExitCode <= 2))"
-        # jobJDL["OtherJobRemoveRequirements"] = "DAGManJobId =?= ClusterId"  # appears unused SB
-        jobJDL["RemoveKillSig"] = "SIGUSR1"
-
-        # prepare a jobJDL fragment to be used when running in the scheduler
+        dagJobJDL["LeaveJobInQueue"] = "True"
+        dagJobJDL["PeriodicHold"] = "time() > CRAB_TaskEndTime"
+        dagJobJDL["OnExitHold"] = "(ExitCode =!= UNDEFINED && ExitCode != 0)"
+        dagJobJDL["OnExitRemove"] = "( ExitSignal =?= 11 || (ExitCode =!= UNDEFINED && ExitCode >=0 && ExitCode <= 2))"
+        # dagJobJDL["OtherJobRemoveRequirements"] = "DAGManJobId =?= ClusterId"  # appears unused SB
+        dagJobJDL["RemoveKillSig"] = "SIGUSR1"
+
+        # prepare a dagJobJDL fragment to be used when running in the scheduler
        # to create subdags for automatic splitting. A crucial change is location of the proxy
        subdagJDL = htcondor.Submit()  # submit object does not have a copy method
-        for k,v in jobJDL.items():     # so we have to create a new object and
+        for k,v in dagJobJDL.items():  # so we have to create a new object and
            subdagJDL[k] = v           # fill it one element at a time
-        subdagJDL['X509UserProxy'] = os.path.basename(jobJDL['X509UserProxy'])  # proxy in scheduler will be in cwd
+        subdagJDL['X509UserProxy'] = os.path.basename(dagJobJDL['X509UserProxy'])  # proxy in scheduler will be in cwd
        # make sure that there is no "queue" statement in subdagJDL "jdl fragment" (introduced in v2 HTC bindings)
        # since condor_submit_dag will add one anyhow
@@ -525,27 +530,26 @@ def submitDirect(self, schedd, cmd, arg, info):  #pylint: disable=R0201
        with open('subdag.jdl', 'w', encoding='utf-8') as fd:
            print(subdag, file=fd)

-        jobJDL["+CRAB_DAGType"] = classad.quote("BASE")  # we want the ad value to be "BASE", not BASE
-        jobJDL["output"] = os.path.join(info['scratch'], "request.out")
-        jobJDL["error"] = os.path.join(info['scratch'], "request.err")
-        jobJDL["Cmd"] = cmd
-        jobJDL['Args'] = arg
-        jobJDL["transfer_input_files"] = str(info['inputFilesString'])
+        dagJobJDL["+CRAB_DAGType"] = classad.quote("BASE")  # we want the ad value to be "BASE", not BASE
+        dagJobJDL["output"] = os.path.join(task['scratch'], "request.out")
+        dagJobJDL["error"] = os.path.join(task['scratch'], "request.err")
+        dagJobJDL["Executable"] = cmd
+        dagJobJDL['Arguments'] = arg

        # for debugging purpose
        with open('DAGJob.jdl', 'w', encoding='utf-8') as fd:
-            print(jobJDL, file=fd)
+            print(dagJobJDL, file=fd)

        htcondor.param['DELEGATE_FULL_JOB_GSI_CREDENTIALS'] = 'true'
        htcondor.param['DELEGATE_JOB_GSI_CREDENTIALS_LIFETIME'] = '0'

        try:
-            submitResult = schedd.submit(description=jobJDL, count=1, spool=True)
+            submitResult = schedd.submit(description=dagJobJDL, count=1, spool=True)
            clusterId = submitResult.cluster()
            numProcs = submitResult.num_procs()
            if 'useHtcV2' in os.environ:
                schedd.spool(submitResult)
            else:
-                myjobs = jobJDL.jobs(count=numProcs, clusterid=clusterId)
+                myjobs = dagJobJDL.jobs(count=numProcs, clusterid=clusterId)
                schedd.spool(list(myjobs))
        except Exception as hte:
            raise TaskWorkerException(f"Submission failed with:\n{hte}") from hte
diff --git a/src/python/TaskWorker/Actions/PreJob.py b/src/python/TaskWorker/Actions/PreJob.py
index 6b3f971e48..34ff25a594 100644
--- a/src/python/TaskWorker/Actions/PreJob.py
+++ b/src/python/TaskWorker/Actions/PreJob.py
@@ -351,15 +351,6 @@ def alter_submit(self, crab_retry):
        ## Job..submit content.
        new_submit_text = self.redo_sites(new_submit_text, crab_retry, use_resubmit_info)

-        ## Add group information:
-        username = self.task_ad.get('CRAB_UserHN')
-        if 'CMSGroups' in self.task_ad:
-            new_submit_text += '+CMSGroups = %s\n' % classad.quote(self.task_ad['CMSGroups'])
-        elif username:
-            groups = CMSGroupMapper.map_user_to_groups(username)
-            if groups:
-                new_submit_text += '+CMSGroups = %s\n' % classad.quote(groups)
-
        ## Finally add (copy) all the content of the generic Job.submit file.
        with open("Job.submit", 'r', encoding='utf-8') as fd:
            new_submit_text += fd.read()
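
Note (illustration, not part of the patch): the refactored submitDirect() above boils down to the spooled-submission pattern sketched below. The sketch only assumes the htcondor/classad Python bindings already imported by DagmanSubmitter.py; the helper name submit_dag_bootstrap and the literal JDL values are placeholders, not code from this PR.

# Minimal sketch of the spooled DAG-bootstrap submission, under the assumptions above.
import os
import htcondor
import classad

def submit_dag_bootstrap(schedd, dagJobJDL):
    """Submit the DAGMan bootstrap job and spool its input files to the AP."""
    submitResult = schedd.submit(description=dagJobJDL, count=1, spool=True)
    clusterId = submitResult.cluster()
    if 'useHtcV2' in os.environ:
        schedd.spool(submitResult)   # v2 bindings spool directly from the SubmitResult
    else:
        myjobs = dagJobJDL.jobs(count=submitResult.num_procs(), clusterid=clusterId)
        schedd.spool(list(myjobs))   # v1 bindings need the expanded job ads
    return clusterId

# usage example with placeholder values mirroring the patch
dagJobJDL = htcondor.Submit()
dagJobJDL['Executable'] = 'dag_bootstrap_startup.sh'
dagJobJDL['Arguments'] = 'RunJobs.dag'
dagJobJDL['JobUniverse'] = '7'                      # scheduler universe
dagJobJDL['+CRAB_DAGType'] = classad.quote('BASE')  # quoted so the ad value is "BASE"
clusterId = submit_dag_bootstrap(htcondor.Schedd(), dagJobJDL)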