Skip to content

Commit

Permalink
Merge pull request #7187 from fstagni/sb_simplifications
Browse files Browse the repository at this point in the history
[8.1] Small simplifications of SandboxStoreClient
  • Loading branch information
fstagni authored Sep 26, 2023
2 parents d2a29a7 + 7c9ff20 commit ed8ec00
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 49 deletions.
69 changes: 23 additions & 46 deletions src/DIRAC/WorkloadManagementSystem/Client/SandboxStoreClient.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,20 @@
Will connect to the WorkloadManagement/SandboxStore service.
"""

import hashlib
import os
import re
import tarfile
import hashlib
import tempfile
import re
from io import BytesIO, StringIO

from DIRAC import gLogger, S_OK, S_ERROR

from DIRAC.Core.Tornado.Client.ClientSelector import TransferClientSelector as TransferClient
from DIRAC import S_ERROR, S_OK, gLogger
from DIRAC.ConfigurationSystem.Client.Helpers.Registry import getVOForGroup
from DIRAC.Core.Base.Client import Client
from DIRAC.Core.Utilities.File import mkDir
from DIRAC.Resources.Storage.StorageElement import StorageElement
from DIRAC.Core.Tornado.Client.ClientSelector import TransferClientSelector as TransferClient
from DIRAC.Core.Utilities.File import getGlobbedTotalSize, mkDir
from DIRAC.Core.Utilities.ReturnValues import returnSingleResult
from DIRAC.Core.Utilities.File import getGlobbedTotalSize
from DIRAC.ConfigurationSystem.Client.Helpers.Registry import getVOForGroup
from DIRAC.Resources.Storage.StorageElement import StorageElement


class SandboxStoreClient:
Expand Down Expand Up @@ -65,8 +63,7 @@ def __getTransferClient(self):
"""Get RPC client for TransferClient"""
if self.__transferClient:
return self.__transferClient
else:
return TransferClient(self.__serviceName, **self.__kwargs)
return TransferClient(self.__serviceName, **self.__kwargs)

# Upload sandbox to jobs and pilots

Expand Down Expand Up @@ -239,8 +236,18 @@ def downloadSandbox(self, sbLocation, destinationDir="", inMemory=False, unpack=
# Jobs

def assignSandboxesToJob(self, jobId, sbList, ownerName="", ownerGroup=""):
"""Assign SB to a job"""
return self.__assignSandboxesToEntity(f"Job:{jobId}", sbList, ownerName, ownerGroup)
"""
Assign sandboxes to a job.
sbList must be a list of sandboxes and relation types
sbList = [ ( "SB:SEName|SEPFN", "Input" ), ( "SB:SEName|SEPFN", "Output" ) ]
"""
eId = f"Job:{jobId}"
for sbT in sbList:
if sbT[1] not in self.__validSandboxTypes:
return S_ERROR(f"Invalid Sandbox type {sbT[1]}")
if SandboxStoreClient.__smdb and ownerName and ownerGroup:
return SandboxStoreClient.__smdb.assignSandboxesToEntities({eId: sbList}, ownerName, ownerGroup)
return self.__getRPCClient().assignSandboxesToEntities({eId: sbList}, ownerName, ownerGroup)

def unassignJobs(self, jobIdList):
"""Unassign SB to a job"""
Expand All @@ -249,11 +256,12 @@ def unassignJobs(self, jobIdList):
entitiesList = []
for jobId in jobIdList:
entitiesList.append(f"Job:{jobId}")
return self.__unassignEntities(entitiesList)
return self.__getRPCClient().unassignEntities(entitiesList)

def downloadSandboxForJob(self, jobId, sbType, destinationPath="", inMemory=False, unpack=True):
"""Download SB for a job"""
result = self.__getSandboxesForEntity(f"Job:{jobId}")
result = self.__getRPCClient().getSandboxesAssignedToEntity(f"Job:{jobId}")

if not result["OK"]:
return result
sbDict = result["Value"]
Expand All @@ -276,34 +284,3 @@ def downloadSandboxForJob(self, jobId, sbType, destinationPath="", inMemory=Fals
return result
downloadedSandboxesLoc.append(result["Value"])
return S_OK(downloadedSandboxesLoc)

##############
# Entities

def __getSandboxesForEntity(self, eId):
"""
Get the sandboxes assigned to jobs and the relation type
"""
rpcClient = self.__getRPCClient()
return rpcClient.getSandboxesAssignedToEntity(eId)

def __assignSandboxesToEntity(self, eId, sbList, ownerName="", ownerGroup=""):
"""
Assign sandboxes to a job.
sbList must be a list of sandboxes and relation types
sbList = [ ( "SB:SEName|SEPFN", "Input" ), ( "SB:SEName|SEPFN", "Output" ) ]
"""
for sbT in sbList:
if sbT[1] not in self.__validSandboxTypes:
return S_ERROR(f"Invalid Sandbox type {sbT[1]}")
if SandboxStoreClient.__smdb and ownerName and ownerGroup:
return SandboxStoreClient.__smdb.assignSandboxesToEntities({eId: sbList}, ownerName, ownerGroup)
rpcClient = self.__getRPCClient()
return rpcClient.assignSandboxesToEntities({eId: sbList}, ownerName, ownerGroup)

def __unassignEntities(self, eIdList):
"""
Unassign a list of jobs of their respective sandboxes
"""
rpcClient = self.__getRPCClient()
return rpcClient.unassignEntities(eIdList)
6 changes: 3 additions & 3 deletions src/DIRAC/WorkloadManagementSystem/JobWrapper/JobWrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -789,9 +789,9 @@ def processJobOutputs(self):
self.outputSandboxSize = getGlobbedTotalSize(fileList)
self.log.info("Attempting to upload Sandbox with limit:", self.sandboxSizeLimit)
sandboxClient = SandboxStoreClient()
result_sbUpload = sandboxClient.uploadFilesAsSandboxForJob(
fileList, self.jobID, "Output", self.sandboxSizeLimit
) # 1024*1024*10
result_sbUpload = sandboxClient.uploadFilesAsSandbox(
fileList, self.sandboxSizeLimit, assignTo={f"Job:{self.jobID}": "Output"}
)
if not result_sbUpload["OK"]:
self.log.error("Output sandbox upload failed with message", result_sbUpload["Message"])
outputSandboxData = result_sbUpload.get("SandboxFileName")
Expand Down

0 comments on commit ed8ec00

Please sign in to comment.