diff --git a/src/DIRAC/Resources/Computing/ARCComputingElement.py b/src/DIRAC/Resources/Computing/ARCComputingElement.py index ef8234b6061..bcb4c9f72fc 100755 --- a/src/DIRAC/Resources/Computing/ARCComputingElement.py +++ b/src/DIRAC/Resources/Computing/ARCComputingElement.py @@ -64,6 +64,7 @@ MANDATORY_PARAMETERS = ["Queue"] # Mandatory for ARC CEs in GLUE2? # See https://www.nordugrid.org/arc/arc6/tech/rest/rest.html#rest-interface-job-states # We let "Deleted, Hold, Undefined" for the moment as we are not sure whether they are still used +# "None" is a special case: it is returned when the job ID is not found in the system STATES_MAP = { "Accepting": PilotStatus.WAITING, "Accepted": PilotStatus.WAITING, @@ -84,6 +85,7 @@ "Wiped": PilotStatus.ABORTED, "Deleted": PilotStatus.ABORTED, "Hold": PilotStatus.FAILED, + "None": PilotStatus.ABORTED, "Undefined": PilotStatus.UNKNOWN, } diff --git a/src/DIRAC/Resources/Computing/AREXComputingElement.py b/src/DIRAC/Resources/Computing/AREXComputingElement.py index d09f3216e22..4f25130951a 100755 --- a/src/DIRAC/Resources/Computing/AREXComputingElement.py +++ b/src/DIRAC/Resources/Computing/AREXComputingElement.py @@ -46,11 +46,16 @@ def __init__(self, ceUniqueID): self.restVersion = "1.0" # Time left before proxy renewal: 3 hours is a good default self.proxyTimeLeftBeforeRenewal = 10800 + # Current delegation ID, generated/fetched in submitJob(), renewed in getJobStatus() + self._delegationID = None # Timeout self.timeout = 5.0 # Request session self.session = None - self.headers = {} + self.headers = { + "Accept": "application/json", + "Content-Type": "application/json", + } # URL used to communicate with the REST interface self.base_url = "" @@ -80,13 +85,6 @@ def _reset(self): # Set up the request framework self.session = requests.Session() self.session.verify = Locations.getCAsLocation() - self.headers = { - "Accept": "application/json", - "Content-Type": "application/json", - } - # Attach the token to the headers if present - if os.environ.get("BEARER_TOKEN"): - self.headers["Authorization"] = "Bearer " + os.environ["BEARER_TOKEN"] return S_OK() @@ -176,11 +174,22 @@ def _checkSession(self): if not self.session: return S_ERROR("REST interface not initialised.") - # Get a proxy + # Reinitialize the authentication parameters + self.session.cert = None + self.headers.pop("Authorization", None) + + # Get a proxy: still mandatory, even if tokens are used to authenticate result = self._prepareProxy() if not result["OK"]: self.log.error("Failed to set up proxy", result["Message"]) return result + + if self.token: + # Attach the token to the headers if present + self.headers["Authorization"] = "Bearer " + self.token["access_token"] + return S_OK() + + # Attach the proxy to the session, only if the token is unavailable self.session.cert = Locations.getProxyLocation() return S_OK() @@ -226,11 +235,15 @@ def __uploadCertificate(self, delegationID, csrContent): # Get a proxy and sign the CSR proxy = X509Chain() - result = proxy.loadProxyFromFile(self.session.cert) + proxyFile = Locations.getProxyLocation() + if not proxyFile: + return S_ERROR(f"No proxy available") + result = proxy.loadProxyFromFile(proxyFile) if not result["OK"]: - return S_ERROR(f"Can't load {self.session.cert}: {result['Message']}") + return S_ERROR(f"Can't load {proxyFile}: {result['Message']}") result = proxy.generateChainFromRequestString(csrContent) if not result["OK"]: + self.log.error("Problem with the Certificate Signing Request:", result["Message"]) return S_ERROR("Problem with the Certificate Signing Request") # Submit the certificate @@ -262,38 +275,44 @@ def _prepareDelegation(self): return result return S_OK(delegationID) - def _getDelegationID(self, arcJobID): - """Query and return the delegation ID of the given job. + def _getDelegationIDs(self): + """Query and return the delegation IDs. - This happens when the call is from self.renewJobs. This function needs to know the - delegation associated to the job + This happens when the call is from self.renewDelegations. More info at https://www.nordugrid.org/arc/arc6/tech/rest/rest.html#jobs-management https://www.nordugrid.org/arc/arc6/tech/rest/rest.html#delegations-management - :param str jobID: ARC job ID - :return: delegation ID + :return: list of delegation IDs """ - params = {"action": "delegations"} - query = self._urlJoin("jobs") + query = self._urlJoin("delegations") # Submit the POST request to get the delegation - jobsJson = {"job": [{"id": arcJobID}]} - result = self._request("post", query, params=params, data=json.dumps(jobsJson)) + result = self._request("get", query) if not result["OK"]: - self.log.error("Issue while interacting with the delegation.", result["Message"]) - return S_ERROR("Issue while interacting with the delegation") + self.log.error("Issue while interacting with the delegations.", result["Message"]) + return S_ERROR("Issue while interacting with the delegations") response = result["Value"] - responseDelegation = response.json() - if "delegation_id" not in responseDelegation["job"]: - return S_ERROR(f"Cannot find the Delegation ID for Job {arcJobID}") + # If there is no delegation, response.json is expected to return an exception + try: + responseDelegation = response.json() + except requests.JSONDecodeError: + return S_OK([]) + + # This is not expected + if "delegation" not in responseDelegation: + return S_OK([]) + + # If there is a single delegationID, then we get an str instead of a list + # Not specified in the documentation + delegations = responseDelegation["delegation"] + if isinstance(delegations, dict): + delegations = [delegations] - delegationIDs = responseDelegation["job"]["delegation_id"] - # Documentation says "Array", but a single string is returned if there is only one - if not isinstance(delegationIDs, list): - delegationIDs = [delegationIDs] - return S_OK(delegationIDs[0]) + # responseDelegation should be {'delegation': [{'id': }, ...]} + delegationIDs = [delegationContent["id"] for delegationContent in delegations] + return S_OK(delegationIDs) ############################################################################# @@ -374,14 +393,23 @@ def submitJob(self, executableFile, proxy, numberOfJobs=1, inputs=None, outputs= self.log.verbose(f"Executable file path: {executableFile}") - # Get a "delegation" and use the same delegation for all the jobs - delegation = "" - result = self._prepareDelegation() + # Get a delegation and use the same delegation for all the jobs + result = self._getDelegationIDs() if not result["OK"]: - self.log.warn("Could not get a delegation", f"For CE {self.ceHost}") - self.log.warn("Continue without a delegation") + self.log.error("Could not get delegation IDs.", result["Message"]) + return S_ERROR("Could not get delegation IDs") + + delegationIDs = result["Value"] + if not delegationIDs: + # No existing delegation, we need to prepare one + result = self._prepareDelegation() + if not result["OK"]: + self.log.warn("Could not get a new delegation", f"for CE {self.ceHost}") + return S_ERROR("Could not get a new delegation") + self._delegationID = result["Value"] else: - delegation = f"\n(delegationid={result['Value']})" + self._delegationID = delegationIDs[0] + delegation = f"\n(delegationid={self._delegationID})" if not inputs: inputs = [] @@ -563,73 +591,66 @@ def getCEStatus(self): ############################################################################# - def _renewJobs(self, arcJobList): - """Written for the REST interface - jobList is already in the ARC format - - :param list arcJobList: list of ARC Job ID - """ - # Renew the jobs - for arcJob in arcJobList: - # First get the delegation (proxy) - result = self._getDelegationID(arcJob) - if not result["OK"]: - self.log.warn("Could not get a delegation from", f"Job {arcJob}") - continue - delegationID = result["Value"] - - # Prepare the command - params = {"action": "get"} - query = self._urlJoin(os.path.join("delegations", delegationID)) + def _renewDelegation(self): + """Renew the delegations""" + # Prepare the command + params = {"action": "get"} + query = self._urlJoin(os.path.join("delegations", self._delegationID)) - # Submit the POST request to get the proxy - result = self._request("post", query, params=params) - if not result["OK"]: - self.log.debug("Could not get a proxy for", f"job {arcJob}: {result['Message']}") - continue - response = result["Value"] + # Submit the POST request to get the proxy + result = self._request("post", query, params=params) + if not result["OK"]: + self.log.error("Could not get a proxy for", f"delegation {self._delegationID}: {result['Message']}") + return S_ERROR(f"Could not get a proxy for delegation {self._delegationID}") + response = result["Value"] - proxy = X509Chain() - result = proxy.loadChainFromString(response.text) - if not result["OK"]: - continue + proxy = X509Chain() + result = proxy.loadChainFromString(response.text) + if not result["OK"]: + self.log.error("Could not load proxy for", f"delegation {self._delegationID}: {result['Message']}") + return S_ERROR(f"Could not load proxy for delegation {self._delegationID}") - # Now test and renew the proxy - result = proxy.getRemainingSecs() - if not result["OK"]: - continue - timeLeft = result["Value"] + # Now test and renew the proxy + result = proxy.getRemainingSecs() + if not result["OK"]: + self.log.error( + "Could not get remaining time from the proxy for", + f"delegation {self._delegationID}: {result['Message']}", + ) + return S_ERROR(f"Could not get remaining time from the proxy for delegation {self._delegationID}") + timeLeft = result["Value"] - if timeLeft >= self.proxyTimeLeftBeforeRenewal: - # No need to renew. Proxy is long enough - continue + if timeLeft >= self.proxyTimeLeftBeforeRenewal: + # No need to renew. Proxy is long enough + return S_OK() - self.log.debug( - "Renewing proxy for job", - f"{arcJob} whose proxy expires at {timeLeft}", + self.log.verbose( + "Renewing delegation", + f"{self._delegationID} whose proxy expires at {timeLeft}", + ) + # Proxy needs to be renewed - try to renew it + # First, get a new CSR from the delegation + params = {"action": "renew"} + query = self._urlJoin(os.path.join("delegations", self._delegationID)) + result = self._request("post", query, params=params) + if not result["OK"]: + self.log.error( + "Proxy not renewed, failed to get CSR", + f"for delegation {self._delegationID}", ) - # Proxy needs to be renewed - try to renew it - # First, get a new CSR from the delegation - params = {"action": "renew"} - query = self._urlJoin(os.path.join("delegations", delegationID)) - result = self._request("post", query, params=params) + return S_ERROR(f"Proxy not renewed, failed to get CSR for delegation {self._delegationID}") + response = result["Value"] - if not response.ok: - self.log.debug( - "Proxy not renewed, failed to get CSR", - f"for job {arcJob} with delegation {delegationID}", - ) - continue - - # Then, sign and upload the certificate - result = self.__uploadCertificate(delegationID, response.text) - if not result["OK"]: - self.log.debug( - "Proxy not renewed, failed to send renewed proxy", - f"for job {arcJob} with delegation {delegationID}: {result['Message']}", - ) - continue + # Then, sign and upload the certificate + result = self.__uploadCertificate(self._delegationID, response.text) + if not result["OK"]: + self.log.error( + "Proxy not renewed, failed to send renewed proxy", + f"delegation {self._delegationID}: {result['Message']}", + ) + return S_ERROR(f"Proxy not renewed, failed to send renewed proxy for delegation {self._delegationID}") - self.log.debug("Proxy successfully renewed", f"for job {arcJob}") + self.log.verbose("Proxy successfully renewed", f"for delegation {self._delegationID}") return S_OK() @@ -665,7 +686,6 @@ def getJobStatus(self, jobIDList): response = result["Value"] resultDict = {} - jobsToRenew = [] jobsToCancel = [] # A single job is returned in a dict, while multiple jobs are returned in a list @@ -681,23 +701,19 @@ def getJobStatus(self, jobIDList): self.log.debug("REST ARC status", f"for job {jobID} is {arcState}") resultDict[jobID] = self.mapStates[arcState] - # Renew proxy only of jobs which are running or queuing - if arcState in ("Running", "Queuing"): - jobsToRenew.append(arcJob["id"]) # Cancel held jobs so they don't sit in the queue forever if arcState == "Hold": jobsToCancel.append(arcJob["id"]) self.log.debug(f"Killing held job {jobID}") - # Renew jobs to be renewed - # Does not work at present - wait for a new release of ARC CEs for this. - if jobsToRenew: - result = self._renewJobs(jobsToRenew) + # Renew delegation to renew the proxies of the jobs + if self._delegationID: + result = self._renewDelegation() if not result["OK"]: # Only log here as we still want to return statuses - self.log.warn("Failed to renew job proxies:", result["Message"]) + self.log.warn("Failed to renew delegation", f"{self._delegationID}: {result['Message']}") - # Kill jobs to be killed + # Kill held jobs if jobsToCancel: result = self._killJob(jobsToCancel) if not result["OK"]: