Skip to content

Commit

Permalink
Merge pull request #7102 from aldbr/cherry-pick-2-7530e17f8-integration
Browse files Browse the repository at this point in the history
[sweep:integration] AREX fixes: proxy renewal logic + submission with tokens + correctly report aborted pilots
  • Loading branch information
fstagni authored Aug 2, 2023
2 parents d0b65b8 + 42b31e1 commit 7b5fedf
Show file tree
Hide file tree
Showing 2 changed files with 124 additions and 106 deletions.
2 changes: 2 additions & 0 deletions src/DIRAC/Resources/Computing/ARCComputingElement.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
MANDATORY_PARAMETERS = ["Queue"] # Mandatory for ARC CEs in GLUE2?
# See https://www.nordugrid.org/arc/arc6/tech/rest/rest.html#rest-interface-job-states
# We let "Deleted, Hold, Undefined" for the moment as we are not sure whether they are still used
# "None" is a special case: it is returned when the job ID is not found in the system
STATES_MAP = {
"Accepting": PilotStatus.WAITING,
"Accepted": PilotStatus.WAITING,
Expand All @@ -84,6 +85,7 @@
"Wiped": PilotStatus.ABORTED,
"Deleted": PilotStatus.ABORTED,
"Hold": PilotStatus.FAILED,
"None": PilotStatus.ABORTED,
"Undefined": PilotStatus.UNKNOWN,
}

Expand Down
228 changes: 122 additions & 106 deletions src/DIRAC/Resources/Computing/AREXComputingElement.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,16 @@ def __init__(self, ceUniqueID):
self.restVersion = "1.0"
# Time left before proxy renewal: 3 hours is a good default
self.proxyTimeLeftBeforeRenewal = 10800
# Current delegation ID, generated/fetched in submitJob(), renewed in getJobStatus()
self._delegationID = None
# Timeout
self.timeout = 5.0
# Request session
self.session = None
self.headers = {}
self.headers = {
"Accept": "application/json",
"Content-Type": "application/json",
}
# URL used to communicate with the REST interface
self.base_url = ""

Expand Down Expand Up @@ -80,13 +85,6 @@ def _reset(self):
# Set up the request framework
self.session = requests.Session()
self.session.verify = Locations.getCAsLocation()
self.headers = {
"Accept": "application/json",
"Content-Type": "application/json",
}
# Attach the token to the headers if present
if os.environ.get("BEARER_TOKEN"):
self.headers["Authorization"] = "Bearer " + os.environ["BEARER_TOKEN"]

return S_OK()

Expand Down Expand Up @@ -176,11 +174,22 @@ def _checkSession(self):
if not self.session:
return S_ERROR("REST interface not initialised.")

# Get a proxy
# Reinitialize the authentication parameters
self.session.cert = None
self.headers.pop("Authorization", None)

# Get a proxy: still mandatory, even if tokens are used to authenticate
result = self._prepareProxy()
if not result["OK"]:
self.log.error("Failed to set up proxy", result["Message"])
return result

if self.token:
# Attach the token to the headers if present
self.headers["Authorization"] = "Bearer " + self.token["access_token"]
return S_OK()

# Attach the proxy to the session, only if the token is unavailable
self.session.cert = Locations.getProxyLocation()
return S_OK()

Expand Down Expand Up @@ -226,11 +235,15 @@ def __uploadCertificate(self, delegationID, csrContent):

# Get a proxy and sign the CSR
proxy = X509Chain()
result = proxy.loadProxyFromFile(self.session.cert)
proxyFile = Locations.getProxyLocation()
if not proxyFile:
return S_ERROR(f"No proxy available")
result = proxy.loadProxyFromFile(proxyFile)
if not result["OK"]:
return S_ERROR(f"Can't load {self.session.cert}: {result['Message']}")
return S_ERROR(f"Can't load {proxyFile}: {result['Message']}")
result = proxy.generateChainFromRequestString(csrContent)
if not result["OK"]:
self.log.error("Problem with the Certificate Signing Request:", result["Message"])
return S_ERROR("Problem with the Certificate Signing Request")

# Submit the certificate
Expand Down Expand Up @@ -262,38 +275,44 @@ def _prepareDelegation(self):
return result
return S_OK(delegationID)

def _getDelegationID(self, arcJobID):
"""Query and return the delegation ID of the given job.
def _getDelegationIDs(self):
"""Query and return the delegation IDs.
This happens when the call is from self.renewJobs. This function needs to know the
delegation associated to the job
This happens when the call is from self.renewDelegations.
More info at
https://www.nordugrid.org/arc/arc6/tech/rest/rest.html#jobs-management
https://www.nordugrid.org/arc/arc6/tech/rest/rest.html#delegations-management
:param str jobID: ARC job ID
:return: delegation ID
:return: list of delegation IDs
"""
params = {"action": "delegations"}
query = self._urlJoin("jobs")
query = self._urlJoin("delegations")

# Submit the POST request to get the delegation
jobsJson = {"job": [{"id": arcJobID}]}
result = self._request("post", query, params=params, data=json.dumps(jobsJson))
result = self._request("get", query)
if not result["OK"]:
self.log.error("Issue while interacting with the delegation.", result["Message"])
return S_ERROR("Issue while interacting with the delegation")
self.log.error("Issue while interacting with the delegations.", result["Message"])
return S_ERROR("Issue while interacting with the delegations")
response = result["Value"]

responseDelegation = response.json()
if "delegation_id" not in responseDelegation["job"]:
return S_ERROR(f"Cannot find the Delegation ID for Job {arcJobID}")
# If there is no delegation, response.json is expected to return an exception
try:
responseDelegation = response.json()
except requests.JSONDecodeError:
return S_OK([])

# This is not expected
if "delegation" not in responseDelegation:
return S_OK([])

# If there is a single delegationID, then we get an str instead of a list
# Not specified in the documentation
delegations = responseDelegation["delegation"]
if isinstance(delegations, dict):
delegations = [delegations]

delegationIDs = responseDelegation["job"]["delegation_id"]
# Documentation says "Array", but a single string is returned if there is only one
if not isinstance(delegationIDs, list):
delegationIDs = [delegationIDs]
return S_OK(delegationIDs[0])
# responseDelegation should be {'delegation': [{'id': <delegationID>}, ...]}
delegationIDs = [delegationContent["id"] for delegationContent in delegations]
return S_OK(delegationIDs)

#############################################################################

Expand Down Expand Up @@ -374,14 +393,23 @@ def submitJob(self, executableFile, proxy, numberOfJobs=1, inputs=None, outputs=

self.log.verbose(f"Executable file path: {executableFile}")

# Get a "delegation" and use the same delegation for all the jobs
delegation = ""
result = self._prepareDelegation()
# Get a delegation and use the same delegation for all the jobs
result = self._getDelegationIDs()
if not result["OK"]:
self.log.warn("Could not get a delegation", f"For CE {self.ceHost}")
self.log.warn("Continue without a delegation")
self.log.error("Could not get delegation IDs.", result["Message"])
return S_ERROR("Could not get delegation IDs")

delegationIDs = result["Value"]
if not delegationIDs:
# No existing delegation, we need to prepare one
result = self._prepareDelegation()
if not result["OK"]:
self.log.warn("Could not get a new delegation", f"for CE {self.ceHost}")
return S_ERROR("Could not get a new delegation")
self._delegationID = result["Value"]
else:
delegation = f"\n(delegationid={result['Value']})"
self._delegationID = delegationIDs[0]
delegation = f"\n(delegationid={self._delegationID})"

if not inputs:
inputs = []
Expand Down Expand Up @@ -563,73 +591,66 @@ def getCEStatus(self):

#############################################################################

def _renewJobs(self, arcJobList):
"""Written for the REST interface - jobList is already in the ARC format
:param list arcJobList: list of ARC Job ID
"""
# Renew the jobs
for arcJob in arcJobList:
# First get the delegation (proxy)
result = self._getDelegationID(arcJob)
if not result["OK"]:
self.log.warn("Could not get a delegation from", f"Job {arcJob}")
continue
delegationID = result["Value"]

# Prepare the command
params = {"action": "get"}
query = self._urlJoin(os.path.join("delegations", delegationID))
def _renewDelegation(self):
"""Renew the delegations"""
# Prepare the command
params = {"action": "get"}
query = self._urlJoin(os.path.join("delegations", self._delegationID))

# Submit the POST request to get the proxy
result = self._request("post", query, params=params)
if not result["OK"]:
self.log.debug("Could not get a proxy for", f"job {arcJob}: {result['Message']}")
continue
response = result["Value"]
# Submit the POST request to get the proxy
result = self._request("post", query, params=params)
if not result["OK"]:
self.log.error("Could not get a proxy for", f"delegation {self._delegationID}: {result['Message']}")
return S_ERROR(f"Could not get a proxy for delegation {self._delegationID}")
response = result["Value"]

proxy = X509Chain()
result = proxy.loadChainFromString(response.text)
if not result["OK"]:
continue
proxy = X509Chain()
result = proxy.loadChainFromString(response.text)
if not result["OK"]:
self.log.error("Could not load proxy for", f"delegation {self._delegationID}: {result['Message']}")
return S_ERROR(f"Could not load proxy for delegation {self._delegationID}")

# Now test and renew the proxy
result = proxy.getRemainingSecs()
if not result["OK"]:
continue
timeLeft = result["Value"]
# Now test and renew the proxy
result = proxy.getRemainingSecs()
if not result["OK"]:
self.log.error(
"Could not get remaining time from the proxy for",
f"delegation {self._delegationID}: {result['Message']}",
)
return S_ERROR(f"Could not get remaining time from the proxy for delegation {self._delegationID}")
timeLeft = result["Value"]

if timeLeft >= self.proxyTimeLeftBeforeRenewal:
# No need to renew. Proxy is long enough
continue
if timeLeft >= self.proxyTimeLeftBeforeRenewal:
# No need to renew. Proxy is long enough
return S_OK()

self.log.debug(
"Renewing proxy for job",
f"{arcJob} whose proxy expires at {timeLeft}",
self.log.verbose(
"Renewing delegation",
f"{self._delegationID} whose proxy expires at {timeLeft}",
)
# Proxy needs to be renewed - try to renew it
# First, get a new CSR from the delegation
params = {"action": "renew"}
query = self._urlJoin(os.path.join("delegations", self._delegationID))
result = self._request("post", query, params=params)
if not result["OK"]:
self.log.error(
"Proxy not renewed, failed to get CSR",
f"for delegation {self._delegationID}",
)
# Proxy needs to be renewed - try to renew it
# First, get a new CSR from the delegation
params = {"action": "renew"}
query = self._urlJoin(os.path.join("delegations", delegationID))
result = self._request("post", query, params=params)
return S_ERROR(f"Proxy not renewed, failed to get CSR for delegation {self._delegationID}")
response = result["Value"]

if not response.ok:
self.log.debug(
"Proxy not renewed, failed to get CSR",
f"for job {arcJob} with delegation {delegationID}",
)
continue

# Then, sign and upload the certificate
result = self.__uploadCertificate(delegationID, response.text)
if not result["OK"]:
self.log.debug(
"Proxy not renewed, failed to send renewed proxy",
f"for job {arcJob} with delegation {delegationID}: {result['Message']}",
)
continue
# Then, sign and upload the certificate
result = self.__uploadCertificate(self._delegationID, response.text)
if not result["OK"]:
self.log.error(
"Proxy not renewed, failed to send renewed proxy",
f"delegation {self._delegationID}: {result['Message']}",
)
return S_ERROR(f"Proxy not renewed, failed to send renewed proxy for delegation {self._delegationID}")

self.log.debug("Proxy successfully renewed", f"for job {arcJob}")
self.log.verbose("Proxy successfully renewed", f"for delegation {self._delegationID}")

return S_OK()

Expand Down Expand Up @@ -665,7 +686,6 @@ def getJobStatus(self, jobIDList):
response = result["Value"]

resultDict = {}
jobsToRenew = []
jobsToCancel = []

# A single job is returned in a dict, while multiple jobs are returned in a list
Expand All @@ -681,23 +701,19 @@ def getJobStatus(self, jobIDList):
self.log.debug("REST ARC status", f"for job {jobID} is {arcState}")
resultDict[jobID] = self.mapStates[arcState]

# Renew proxy only of jobs which are running or queuing
if arcState in ("Running", "Queuing"):
jobsToRenew.append(arcJob["id"])
# Cancel held jobs so they don't sit in the queue forever
if arcState == "Hold":
jobsToCancel.append(arcJob["id"])
self.log.debug(f"Killing held job {jobID}")

# Renew jobs to be renewed
# Does not work at present - wait for a new release of ARC CEs for this.
if jobsToRenew:
result = self._renewJobs(jobsToRenew)
# Renew delegation to renew the proxies of the jobs
if self._delegationID:
result = self._renewDelegation()
if not result["OK"]:
# Only log here as we still want to return statuses
self.log.warn("Failed to renew job proxies:", result["Message"])
self.log.warn("Failed to renew delegation", f"{self._delegationID}: {result['Message']}")

# Kill jobs to be killed
# Kill held jobs
if jobsToCancel:
result = self._killJob(jobsToCancel)
if not result["OK"]:
Expand Down

0 comments on commit 7b5fedf

Please sign in to comment.