Skip to content

Commit

Permalink
Merge pull request #7316 from fstagni/cherry-pick-2-71c456a1b-integra…
Browse files Browse the repository at this point in the history
…tion

[sweep:integration] ElasticJobParametersDB: do not configure the IndexPrefix name
  • Loading branch information
fstagni authored Nov 28, 2023
2 parents af7ab51 + 66d4307 commit 2acd553
Show file tree
Hide file tree
Showing 4 changed files with 45 additions and 62 deletions.
18 changes: 11 additions & 7 deletions src/DIRAC/Core/Utilities/ElasticSearchDB.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,21 +15,25 @@
try:
from opensearchpy import OpenSearch as Elasticsearch
from opensearchpy.exceptions import (
ConnectionError as ElasticConnectionError,
TransportError,
ConflictError,
NotFoundError,
RequestError,
ConflictError,
TransportError,
)
from opensearchpy.exceptions import (
ConnectionError as ElasticConnectionError,
)
from opensearchpy.helpers import BulkIndexError, bulk
except ImportError:
from elasticsearch import Elasticsearch
from elasticsearch.exceptions import (
ConnectionError as ElasticConnectionError,
TransportError,
ConflictError,
NotFoundError,
RequestError,
ConflictError,
TransportError,
)
from elasticsearch.exceptions import (
ConnectionError as ElasticConnectionError,
)
from elasticsearch.helpers import BulkIndexError, bulk

Expand All @@ -39,7 +43,7 @@
except ImportError:
from opensearch_dsl import A, Q, Search
except ImportError:
from elasticsearch_dsl import Search, Q, A
from elasticsearch_dsl import A, Q, Search


from DIRAC import S_ERROR, S_OK, gLogger
Expand Down
52 changes: 26 additions & 26 deletions src/DIRAC/Interfaces/API/Dirac.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ def submitJob(self, job, mode="wms"):
Example usage:
>>> print dirac.submitJob(job)
>>> print(dirac.submitJob(job))
{'OK': True, 'Value': '12345'}
:param job: Instance of Job class or JDL string
Expand Down Expand Up @@ -218,7 +218,7 @@ def getInputDataCatalog(self, lfns, siteName="", fileName="pool_xml_catalog.xml"
Example usage:
>>> print print d.getInputDataCatalog('/lhcb/a/b/c/00001680_00000490_5.dst',None,'myCat.xml')
>>> print(getInputDataCatalog('/lhcb/a/b/c/00001680_00000490_5.dst',None,'myCat.xml'))
{'Successful': {'<LFN>': {'pfntype': 'ROOT_All', 'protocol': 'SRM2',
'pfn': '<PFN>', 'turl': '<TURL>', 'guid': '3E3E097D-0AC0-DB11-9C0A-00188B770645',
'se': 'CERN-disk'}}, 'Failed': [], 'OK': True, 'Value': ''}
Expand Down Expand Up @@ -677,7 +677,7 @@ def getReplicas(self, lfns, active=True, preferDisk=False, diskOnly=False, print
Example usage:
>>> print dirac.getReplicas('/lhcb/data/CCRC08/RDST/00000106/0000/00000106_00006321_1.rdst')
>>> print(dirac.getReplicas('/lhcb/data/CCRC08/RDST/00000106/0000/00000106_00006321_1.rdst'))
{'OK': True, 'Value': {'Successful': {'/lhcb/data/CCRC08/RDST/00000106/0000/00000106_00006321_1.rdst':
{'CERN-RDST':
'srm://srm-lhcb.cern.ch/castor/cern.ch/grid/lhcb/data/CCRC08/RDST/00000106/0000/00000106_00006321_1.rdst'}},
Expand Down Expand Up @@ -733,7 +733,7 @@ def getReplicasForJobs(self, lfns, diskOnly=False, printOutput=False):
Example usage:
>>> print dirac.getReplicasForJobs('/lhcb/data/CCRC08/RDST/00000106/0000/00000106_00006321_1.rdst')
>>> print(dirac.getReplicasForJobs('/lhcb/data/CCRC08/RDST/00000106/0000/00000106_00006321_1.rdst'))
{'OK': True, 'Value': {'Successful': {'/lhcb/data/CCRC08/RDST/00000106/0000/00000106_00006321_1.rdst':
{'CERN-RDST':
'srm://srm-lhcb.cern.ch/castor/cern.ch/grid/lhcb/data/CCRC08/RDST/00000106/0000/00000106_00006321_1.rdst'}},
Expand Down Expand Up @@ -786,7 +786,7 @@ def getAllReplicas(self, lfns, printOutput=False):
Example usage:
>>> print dirac.getAllReplicas('/lhcb/data/CCRC08/RDST/00000106/0000/00000106_00006321_1.rdst')
>>> print(dirac.getAllReplicas('/lhcb/data/CCRC08/RDST/00000106/0000/00000106_00006321_1.rdst'))
{'OK': True, 'Value': {'Successful': {'/lhcb/data/CCRC08/RDST/00000106/0000/00000106_00006321_1.rdst':
{'CERN-RDST':
'srm://srm-lhcb.cern.ch/castor/cern.ch/grid/lhcb/data/CCRC08/RDST/00000106/0000/00000106_00006321_1.rdst'}},
Expand Down Expand Up @@ -908,7 +908,7 @@ def getLfnMetadata(self, lfns, printOutput=False):
Example usage:
>>> print dirac.getLfnMetadata('/lhcb/data/CCRC08/RDST/00000106/0000/00000106_00006321_1.rdst')
>>> print(dirac.getLfnMetadata('/lhcb/data/CCRC08/RDST/00000106/0000/00000106_00006321_1.rdst'))
{'OK': True, 'Value': {'Successful': {'/lhcb/data/CCRC08/RDST/00000106/0000/00000106_00006321_1.rdst':
{'Status': '-', 'Size': 619475828L, 'GUID': 'E871FBA6-71EA-DC11-8F0C-000E0C4DEB4B', 'ChecksumType': 'AD',
'CheckSumValue': ''}}, 'Failed': {}}}
Expand Down Expand Up @@ -962,7 +962,7 @@ def addFile(self, lfn, fullPath, diracSE, fileGuid=None, printOutput=False):
Example Usage:
>>> print dirac.addFile('/lhcb/user/p/paterson/myFile.tar.gz','myFile.tar.gz','CERN-USER')
>>> print(dirac.addFile('/lhcb/user/p/paterson/myFile.tar.gz','myFile.tar.gz','CERN-USER'))
{'OK': True, 'Value':{'Failed': {},
'Successful': {'/lhcb/user/p/paterson/test/myFile.tar.gz': {'put': 64.246301889419556,
'register': 1.1102778911590576}}}}
Expand Down Expand Up @@ -1003,7 +1003,7 @@ def getFile(self, lfn, destDir="", printOutput=False):
Example Usage:
>>> print dirac.getFile('/lhcb/user/p/paterson/myFile.tar.gz')
>>> print(dirac.getFile('/lhcb/user/p/paterson/myFile.tar.gz'))
{'OK': True, 'Value':{'Failed': {},
'Successful': {'/lhcb/user/p/paterson/test/myFile.tar.gz': '/afs/cern.ch/user/p/paterson/myFile.tar.gz'}}}
Expand Down Expand Up @@ -1042,7 +1042,7 @@ def replicateFile(self, lfn, destinationSE, sourceSE="", localCache="", printOut
Example Usage:
>>> print dirac.replicateFile('/lhcb/user/p/paterson/myFile.tar.gz','CNAF-USER')
>>> print(dirac.replicateFile('/lhcb/user/p/paterson/myFile.tar.gz','CNAF-USER'))
{'OK': True, 'Value':{'Failed': {},
'Successful': {'/lhcb/user/p/paterson/test/myFile.tar.gz': {'register': 0.44766902923583984,
'replicate': 56.42345404624939}}}}
Expand Down Expand Up @@ -1105,7 +1105,7 @@ def replicate(self, lfn, destinationSE, sourceSE="", printOutput=False):
Example Usage:
>>> print dirac.replicate('/lhcb/user/p/paterson/myFile.tar.gz','CNAF-USER')
>>> print(dirac.replicate('/lhcb/user/p/paterson/myFile.tar.gz','CNAF-USER'))
{'OK': True, 'Value':{'Failed': {},
'Successful': {'/lhcb/user/p/paterson/test/myFile.tar.gz': {'register': 0.44766902923583984}}}}
Expand Down Expand Up @@ -1145,7 +1145,7 @@ def getAccessURL(self, lfn, storageElement, printOutput=False, protocol=False):
Example Usage:
>>> print dirac.getAccessURL('/lhcb/data/CCRC08/DST/00000151/0000/00000151_00004848_2.dst','CERN-RAW')
>>> print(dirac.getAccessURL('/lhcb/data/CCRC08/DST/00000151/0000/00000151_00004848_2.dst','CERN-RAW'))
{'OK': True, 'Value': {'Successful': {'srm://...': {'SRM2': 'rfio://...'}}, 'Failed': {}}}
:param lfn: Logical File Name (LFN)
Expand Down Expand Up @@ -1178,7 +1178,7 @@ def getPhysicalFileAccessURL(self, pfn, storageElement, printOutput=False):
Example Usage:
>>> print dirac.getPhysicalFileAccessURL('srm://srm-lhcb.cern.ch/castor/cern.ch/grid/lhcb/data/CCRC08/DST/00000151/0000/00000151_00004848_2.dst','CERN_M-DST')
>>> print(dirac.getPhysicalFileAccessURL('srm://srm-lhcb.cern.ch/castor/cern.ch/grid/lhcb/data/CCRC08/DST/00000151/0000/00000151_00004848_2.dst','CERN_M-DST'))
{'OK': True, 'Value':{'Failed': {},
'Successful': {'srm://srm-lhcb.cern.ch/castor/cern.ch/grid/lhcb/data/CCRC08/DST/00000151/0000/00000151_00004848_2.dst': {'RFIO': 'castor://...'}}}}
Expand Down Expand Up @@ -1210,7 +1210,7 @@ def getPhysicalFileMetadata(self, pfn, storageElement, printOutput=False):
Example Usage:
>>> print dirac.getPhysicalFileMetadata('srm://srm.grid.sara.nl/pnfs/grid.sara.nl/data
>>> print(dirac.getPhysicalFileMetadata('srm://srm.grid.sara.nl/pnfs/grid.sara.nl/data)
/lhcb/data/CCRC08/RAW/LHCb/CCRC/23341/023341_0000039571.raw','NIKHEF-RAW')
{'OK': True, 'Value': {'Successful': {'srm://...': {'SRM2': 'rfio://...'}}, 'Failed': {}}}
Expand Down Expand Up @@ -1241,7 +1241,7 @@ def removeFile(self, lfn, printOutput=False):
Example Usage:
>>> print dirac.removeFile('LFN:/lhcb/data/CCRC08/RAW/LHCb/CCRC/22808/022808_0000018443.raw')
>>> print(dirac.removeFile('LFN:/lhcb/data/CCRC08/RAW/LHCb/CCRC/22808/022808_0000018443.raw'))
{'OK': True, 'Value':...}
:param lfn: Logical File Name (LFN)
Expand Down Expand Up @@ -1269,7 +1269,7 @@ def removeReplica(self, lfn, storageElement, printOutput=False):
Example Usage:
>>> print dirac.removeReplica('LFN:/lhcb/user/p/paterson/myDST.dst','CERN-USER')
>>> print(dirac.removeReplica('LFN:/lhcb/user/p/paterson/myDST.dst','CERN-USER'))
{'OK': True, 'Value':...}
:param lfn: Logical File Name (LFN)
Expand Down Expand Up @@ -1300,7 +1300,7 @@ def getInputSandbox(self, jobID, outputDir=None):
Example Usage:
>>> print dirac.getInputSandbox(12345)
>>> print(dirac.getInputSandbox(12345))
{'OK': True, 'Value': ['Job__Sandbox__.tar.bz2']}
:param jobID: JobID
Expand Down Expand Up @@ -1348,7 +1348,7 @@ def getOutputSandbox(self, jobID, outputDir=None, oversized=True, noJobDir=False
Example Usage:
>>> print dirac.getOutputSandbox(12345)
>>> print(dirac.getOutputSandbox(12345))
{'OK': True, 'Value': ['Job__Sandbox__.tar.bz2']}
:param jobID: JobID
Expand Down Expand Up @@ -1436,7 +1436,7 @@ def deleteJob(self, jobID):
Example Usage:
>>> print dirac.deleteJob(12345)
>>> print(dirac.deleteJob(12345))
{'OK': True, 'Value': [12345]}
:param jobID: JobID
Expand Down Expand Up @@ -1468,7 +1468,7 @@ def rescheduleJob(self, jobID):
Example Usage:
>>> print dirac.rescheduleJob(12345)
>>> print(dirac.rescheduleJob(12345))
{'OK': True, 'Value': [12345]}
:param jobID: JobID
Expand Down Expand Up @@ -1524,7 +1524,7 @@ def getJobStatus(self, jobID):
Example Usage:
>>> print dirac.getJobStatus(79241)
>>> print(dirac.getJobStatus(79241))
{79241: {'Status': 'Done',
'MinorStatus': 'Execution Complete',
'ApplicationStatus': 'some app status'
Expand Down Expand Up @@ -2030,7 +2030,7 @@ def getJobAttributes(self, jobID, printOutput=False):
Example Usage:
>>> print dirac.getJobAttributes(79241)
>>> print(dirac.getJobAttributes(79241))
{'AccountedFlag': 'False','ApplicationNumStatus': '0',
'ApplicationStatus': 'Job Finished Successfully',
'CPUTime': '0.0'}
Expand Down Expand Up @@ -2067,7 +2067,7 @@ def getJobParameters(self, jobID, printOutput=False):
Example Usage:
>>> print dirac.getJobParameters(79241)
>>> print(dirac.getJobParameters(79241))
{'OK': True, 'Value': {'JobPath': 'JobPath,JobSanity,JobPolicy,InputData,JobScheduling,TaskQueue',
'JobSanityCheck': 'Job: 768 JDL: OK, InputData: 2 LFNs OK, '}
Expand Down Expand Up @@ -2106,7 +2106,7 @@ def getJobLoggingInfo(self, jobID, printOutput=False):
Example Usage:
>>> print dirac.getJobLoggingInfo(79241)
>>> print(dirac.getJobLoggingInfo(79241))
{'OK': True, 'Value': [('Received', 'JobPath', 'Unknown', '2008-01-29 15:37:09', 'JobPathAgent'),
('Checking', 'JobSanity', 'Unknown', '2008-01-29 15:37:14', 'JobSanityAgent')]}
Expand Down Expand Up @@ -2149,7 +2149,7 @@ def peekJob(self, jobID, printOutput=False):
Example Usage:
>>> print dirac.peekJob(1484)
>>> print(dirac.peekJob(1484))
{'OK': True, 'Value': 'Job peek result'}
:param jobID: JobID
Expand Down Expand Up @@ -2187,7 +2187,7 @@ def pingService(self, system, service, printOutput=False, url=None):
Example Usage:
>>> print dirac.pingService('WorkloadManagement','JobManager')
>>> print(dirac.pingService('WorkloadManagement','JobManager'))
{'OK': True, 'Value': 'Job ping result'}
:param system: system
Expand Down Expand Up @@ -2235,7 +2235,7 @@ def getJobJDL(self, jobID, original=False, printOutput=False):
Example Usage:
>>> print dirac.getJobJDL(12345)
>>> print(dirac.getJobJDL(12345))
{'Arguments': 'jobDescription.xml',...}
:param jobID: JobID
Expand Down
29 changes: 5 additions & 24 deletions src/DIRAC/WorkloadManagementSystem/DB/ElasticJobParametersDB.py
Original file line number Diff line number Diff line change
@@ -1,37 +1,19 @@
""" Module containing a front-end to the ElasticSearch-based ElasticJobParametersDB.
This module interacts with one ES index: "ElasticJobParametersDB",
which is a drop-in replacement for MySQL-based table JobDB.JobParameters.
While JobDB.JobParameters in MySQL is defined as::
CREATE TABLE `JobParameters` (
`JobID` INT(11) UNSIGNED NOT NULL,
`Name` VARCHAR(100) NOT NULL,
`Value` TEXT NOT NULL,
PRIMARY KEY (`JobID`,`Name`),
FOREIGN KEY (`JobID`) REFERENCES `Jobs`(`JobID`)
) ENGINE=InnoDB DEFAULT CHARSET=latin1;
Here we define a dynamic mapping with the constant fields::
"JobID": {"type": "long"},
"timestamp": {"type": "date"},
and all other custom fields added dynamically.
This is a drop-in replacement for MySQL-based table JobDB.JobParameters.
The reason for switching to a ES-based JobParameters lies in the extended searching
capabilities of ES..
capabilities of ES.
This results in higher traceability for DIRAC jobs.
The following class methods are provided for public usage
- getJobParameters()
- setJobParameter()
- deleteJobParameters()
"""
from DIRAC import S_OK, S_ERROR, gConfig
from DIRAC.Core.Utilities import TimeUtilities
from DIRAC.ConfigurationSystem.Client.PathFinder import getDatabaseSection
from DIRAC import S_ERROR, S_OK
from DIRAC.ConfigurationSystem.Client.Helpers import CSGlobals
from DIRAC.Core.Base.ElasticDB import ElasticDB
from DIRAC.Core.Utilities import TimeUtilities


mapping = {
Expand All @@ -57,8 +39,7 @@ def __init__(self, parentLogger=None):
"""Standard Constructor"""

try:
section = getDatabaseSection("WorkloadManagement/ElasticJobParametersDB")
indexPrefix = gConfig.getValue(f"{section}/IndexPrefix", CSGlobals.getSetup()).lower()
indexPrefix = CSGlobals.getSetup().lower()

# Connecting to the ES cluster
super().__init__("WorkloadManagement/ElasticJobParametersDB", indexPrefix, parentLogger=parentLogger)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,10 @@
"""
from pydantic import ValidationError

from DIRAC import S_OK, S_ERROR
from DIRAC.ConfigurationSystem.Client.Helpers.Registry import getDNForUsername
from DIRAC.ConfigurationSystem.Client.Helpers.Registry import getVOForGroup
from DIRAC.Core.DISET.RequestHandler import RequestHandler
from DIRAC import S_ERROR, S_OK
from DIRAC.ConfigurationSystem.Client.Helpers.Registry import getDNForUsername, getVOForGroup
from DIRAC.Core.DISET.MessageClient import MessageClient
from DIRAC.Core.DISET.RequestHandler import RequestHandler
from DIRAC.Core.Utilities.ClassAd.ClassAdLight import ClassAd
from DIRAC.Core.Utilities.DErrno import EWMSJDL, EWMSSUBM
from DIRAC.Core.Utilities.JDL import jdlToBaseJobDescriptionModel
Expand Down Expand Up @@ -352,7 +351,6 @@ def export_rescheduleJob(self, jobIDs):
)
for jobID in validJobList:
self.taskQueueDB.deleteJob(jobID)
# gJobDB.deleteJobFromQueue(jobID)
result = self.jobDB.rescheduleJob(jobID)
self.log.debug(str(result))
if not result["OK"]:
Expand Down

0 comments on commit 2acd553

Please sign in to comment.