diff --git a/src/DIRAC/WorkloadManagementSystem/Agent/PilotLoggingAgent.py b/src/DIRAC/WorkloadManagementSystem/Agent/PilotLoggingAgent.py new file mode 100644 index 00000000000..22b661ca13d --- /dev/null +++ b/src/DIRAC/WorkloadManagementSystem/Agent/PilotLoggingAgent.py @@ -0,0 +1,186 @@ +""" :mod: PilotLoggingAgent + + PilotLoggingAgent sends Pilot log files to an SE. + +.. literalinclude:: ../ConfigTemplate.cfg + :start-after: ##BEGIN PilotLoggingAgent + :end-before: ##END + :dedent: 2 + :caption: PilotLoggingAgent options +""" + +# # imports +import os +import time +from DIRAC import S_OK, S_ERROR, gConfig +from DIRAC.ConfigurationSystem.Client.Helpers.Operations import Operations +from DIRAC.ConfigurationSystem.Client.Helpers.Registry import getVOs +from DIRAC.Core.Base.AgentModule import AgentModule +from DIRAC.Core.Utilities.Proxy import executeWithUserProxy +from DIRAC.DataManagementSystem.Client.DataManager import DataManager +from DIRAC.WorkloadManagementSystem.Client.TornadoPilotLoggingClient import TornadoPilotLoggingClient + + +class PilotLoggingAgent(AgentModule): + """ + .. class:: PilotLoggingAgent + + The agent sends completed pilot log files to permanent storage for analysis. + """ + + def __init__(self, *args, **kwargs): + """c'tor""" + super().__init__(*args, **kwargs) + self.clearPilotsDelay = 30 + + def initialize(self): + """ + agent's initialisation. Use this agent's CS information to: + Determine what Defaults/Shifter shifter proxy to use., + get the target SE name from the CS. + Obtain log file location from Tornado. + + :param self: self reference + """ + # pilot logs lifetime in days + self.clearPilotsDelay = self.am_getOption("ClearPilotsDelay", self.clearPilotsDelay) + # configured VOs and setup + res = getVOs() + if not res["OK"]: + return res + self.voList = res.get("Value", []) + + if isinstance(self.voList, str): + self.voList = [self.voList] + + return S_OK() + + def execute(self): + """ + Execute one agent cycle. 
Upload log files to the SE and register them in the DFC. + Use a shifter proxy dynamically loaded for every VO + + :param self: self reference + """ + voRes = {} + for vo in self.voList: + self.opsHelper = Operations(vo=vo) + # is remote pilot logging enabled for the VO ? + pilotLogging = self.opsHelper.getValue("/Pilot/RemoteLogging", False) + if pilotLogging: + res = self.opsHelper.getOptionsDict("Shifter/DataManager") + if not res["OK"]: + voRes[vo] = "No shifter defined - skipped" + self.log.error(f"No shifter defined for VO: {vo} - skipping ...") + continue + + proxyUser = res["Value"].get("User") + proxyGroup = res["Value"].get("Group") + if proxyGroup is None or proxyUser is None: + self.log.error( + f"No proxy user or group defined for pilot: VO: {vo}, User: {proxyUser}, Group: {proxyGroup}" + ) + voRes[vo] = "No proxy user or group defined - skipped" + continue + + self.log.info(f"Proxy used for pilot logging: VO: {vo}, User: {proxyUser}, Group: {proxyGroup}") + res = self.executeForVO( # pylint: disable=unexpected-keyword-arg + vo, proxyUserName=proxyUser, proxyUserGroup=proxyGroup + ) + if not res["OK"]: + voRes[vo] = res["Message"] + if voRes: + for key, value in voRes.items(): + self.log.error(f"Error for {key} vo; message: {value}") + voRes.update(S_ERROR("Agent cycle for some VO finished with errors")) + return voRes + return S_OK() + + @executeWithUserProxy + def executeForVO(self, vo): + """ + Execute one agent cycle for a VO. It obtains VO-specific configuration pilot options from the CS: + UploadPath - the path where the VO wants to upload pilot logs. It has to start with a VO name (/vo/path). + UploadSE - Storage element where the logs will be kept. 
+ + :param str vo: vo enabled for remote pilot logging + :return: S_OK or S_ERROR + :rtype: dict + """ + + self.log.info(f"Pilot files upload cycle started for VO: {vo}") + res = self.opsHelper.getOptionsDict("Pilot") + if not res["OK"]: + return S_ERROR(f"No pilot section for {vo} vo") + pilotOptions = res["Value"] + uploadSE = pilotOptions.get("UploadSE") + if uploadSE is None: + return S_ERROR("Upload SE not defined") + self.log.info(f"Pilot upload SE: {uploadSE}") + + uploadPath = pilotOptions.get("UploadPath") + if uploadPath is None: + return S_ERROR(f"Upload path on SE {uploadSE} not defined") + self.log.info(f"Pilot upload path: {uploadPath}") + + client = TornadoPilotLoggingClient(useCertificates=True) + resDict = client.getMetadata() + + if not resDict["OK"]: + return resDict + + # vo-specific source log path: + pilotLogPath = os.path.join(resDict["Value"]["LogPath"], vo) + # check for new files and upload them + if not os.path.exists(pilotLogPath): + # not a disaster, the VO is enabled, but no logfiles were ever stored. 
+ return S_OK() + # delete old pilot log files for the vo VO + self.clearOldPilotLogs(pilotLogPath) + + self.log.info(f"Pilot log files location = {pilotLogPath} for VO: {vo}") + + # get finalised (.log) files from Tornado and upload them to the selected SE + + files = [ + f for f in os.listdir(pilotLogPath) if os.path.isfile(os.path.join(pilotLogPath, f)) and f.endswith("log") + ] + + if not files: + self.log.info("No files to upload for this cycle") + for elem in files: + lfn = os.path.join(uploadPath, elem) + name = os.path.join(pilotLogPath, elem) + res = DataManager().putAndRegister(lfn=lfn, fileName=name, diracSE=uploadSE, overwrite=True) + if not res["OK"]: + self.log.error("Could not upload", f"to {uploadSE}: {res['Message']}") + else: + self.log.verbose("File uploaded: ", f"LFN = {res['Value']}") + try: + os.remove(name) + except Exception as excp: + self.log.exception("Cannot remove a local file after uploading", lException=excp) + return S_OK() + + def clearOldPilotLogs(self, pilotLogPath): + """ + Delete old pilot log files unconditionally. Assumes that pilotLogPath exists. 
+ + :param str pilotLogPath: log files directory + :return: None + :rtype: None + """ + + files = os.listdir(pilotLogPath) + seconds = int(self.clearPilotsDelay) * 86400 + currentTime = time.time() + + for file in files: + fullpath = os.path.join(pilotLogPath, file) + modifTime = os.stat(fullpath).st_mtime + if modifTime < currentTime - seconds: + self.log.debug(f" Deleting old log : {fullpath}") + try: + os.remove(fullpath) + except Exception as excp: + self.log.exception(f"Cannot remove an old log file after {fullpath}", lException=excp) diff --git a/src/DIRAC/WorkloadManagementSystem/Agent/SiteDirector.py b/src/DIRAC/WorkloadManagementSystem/Agent/SiteDirector.py index b8947afb4b1..279410393bc 100644 --- a/src/DIRAC/WorkloadManagementSystem/Agent/SiteDirector.py +++ b/src/DIRAC/WorkloadManagementSystem/Agent/SiteDirector.py @@ -1017,10 +1017,6 @@ def _getPilotOptions(self, queue, **kwargs): pilotOptions.append("--pythonVersion=3") - # Debug - if self.pilotLogLevel.lower() == "debug": - pilotOptions.append("-ddd") - # DIRAC Extensions to be used in pilots pilotExtensionsList = opsHelper.getValue("Pilot/Extensions", []) extensionsList = [] diff --git a/src/DIRAC/WorkloadManagementSystem/Agent/test/Test_Agent_PilotLoggingAgent.py b/src/DIRAC/WorkloadManagementSystem/Agent/test/Test_Agent_PilotLoggingAgent.py new file mode 100644 index 00000000000..2e7939e48b6 --- /dev/null +++ b/src/DIRAC/WorkloadManagementSystem/Agent/test/Test_Agent_PilotLoggingAgent.py @@ -0,0 +1,189 @@ +""" Test class for PilotLoggingAgent Agent +""" +import os +import time +import tempfile + +import pytest +from unittest.mock import MagicMock, patch + +# DIRAC Components +import DIRAC.WorkloadManagementSystem.Agent.PilotLoggingAgent as plaModule +from DIRAC.WorkloadManagementSystem.Agent.PilotLoggingAgent import PilotLoggingAgent +from DIRAC import gLogger, gConfig, S_OK, S_ERROR + +gLogger.setLevel("DEBUG") + +# Mock Objects +mockReply = MagicMock() +mockReply1 = MagicMock() +mockOperations = 
MagicMock() +mockTornadoClient = MagicMock() +mockDataManager = MagicMock() +mockAM = MagicMock() +mockNone = MagicMock() +mockNone.return_value = None + +upDict = { + "OK": True, + "Value": {"User": "proxyUser", "Group": "proxyGroup"}, +} + + +@pytest.fixture +def plaBase(mocker): + mocker.patch("DIRAC.WorkloadManagementSystem.Agent.PilotLoggingAgent.AgentModule.__init__") + mocker.patch( + "DIRAC.WorkloadManagementSystem.Agent.JobCleaningAgent.AgentModule._AgentModule__moduleProperties", + side_effect=lambda x, y=None: y, + create=True, + ) + mocker.patch("DIRAC.WorkloadManagementSystem.Agent.PilotLoggingAgent.AgentModule.am_getOption", return_value=mockAM) + mocker.patch( + "DIRAC.WorkloadManagementSystem.Agent.PilotLoggingAgent.getVOs", + return_value={"OK": True, "Value": ["gridpp", "lz"]}, + ) + mocker.patch("DIRAC.WorkloadManagementSystem.Agent.PilotLoggingAgent.gConfig.getValue", return_value="GridPP") + mocker.patch("DIRAC.WorkloadManagementSystem.Agent.PilotLoggingAgent.Operations.getValue", side_effect=mockReply) + mocker.patch( + "DIRAC.WorkloadManagementSystem.Agent.PilotLoggingAgent.Operations.getOptionsDict", side_effect=mockReply1 + ) + pla = PilotLoggingAgent() + pla.log = gLogger + pla._AgentModule__configDefaults = mockAM + return pla + + +@pytest.fixture +def pla_initialised(mocker, plaBase): + mocker.patch("DIRAC.WorkloadManagementSystem.Agent.PilotLoggingAgent.PilotLoggingAgent.executeForVO") + plaBase.initialize() + return plaBase + + +@pytest.fixture +def pla(mocker, plaBase): + mocker.patch( + "DIRAC.WorkloadManagementSystem.Agent.PilotLoggingAgent.TornadoPilotLoggingClient", + side_effect=mockTornadoClient, + ) + mocker.patch("DIRAC.WorkloadManagementSystem.Agent.PilotLoggingAgent.Operations", side_effect=mockOperations) + mocker.patch( + "DIRAC.WorkloadManagementSystem.Agent.PilotLoggingAgent.DataManager", + side_effect=mockDataManager, + ) + plaBase.initialize() + return plaBase + + +def test_initialize(plaBase): + res = 
plaBase.initialize() + assert plaBase.voList == plaModule.getVOs()["Value"] + assert res == S_OK() + + +@pytest.mark.parametrize( + "mockReplyInput, expected, expectedExecOut, expected2", + [ + ("/Pilot/RemoteLogging", [True, False], S_OK(), upDict), + ("/Pilot/RemoteLogging", [False, False], S_OK(), upDict), + ("/Pilot/RemoteLogging", [True, False], S_ERROR("Execute for VO failed"), upDict), + ], +) +def test_execute(pla_initialised, mockReplyInput, expected, expectedExecOut, expected2): + """Testing a thin version of execute (executeForVO is mocked)""" + assert pla_initialised.voList == plaModule.getVOs()["Value"] + mockReply.side_effect = expected + mockReply1.return_value = expected2 + # remote pilot logging on (gridpp only) and off. + pla_initialised.executeForVO.return_value = expectedExecOut + res = pla_initialised.execute() + if not any(expected): + pla_initialised.executeForVO.assert_not_called() + else: + assert pla_initialised.executeForVO.called + pla_initialised.executeForVO.assert_called_with( + "gridpp", + proxyUserName=upDict["Value"]["User"], + proxyUserGroup=upDict["Value"]["Group"], + ) + assert res["OK"] == expectedExecOut["OK"] + + +@pytest.mark.parametrize( + "ppath, files, result", + [ + ("pilot/log/path/", ["file1.log", "file2.log", "file3.log"], S_OK()), + ("pilot/log/path/", [], S_OK()), + ], +) +def test_executeForVO(pla, ppath, files, result): + opsHelperValues = {"OK": True, "Value": {"UploadSE": "testUploadSE", "UploadPath": "/gridpp/uploadPath"}} + # full local temporary path: + filepath = os.path.join(tempfile.TemporaryDirectory().name, ppath) + # this is what getMetadata returns: + resDict = {"OK": True, "Value": {"LogPath": filepath}} + mockTornadoClient.return_value.getMetadata.return_value = resDict + mockDataManager.return_value.putAndRegister.return_value = result + if files: + os.makedirs(os.path.join(filepath, "gridpp"), exist_ok=True) + for elem in files: + open(os.path.join(filepath, "gridpp", elem), "w") + 
mockOperations.return_value.getOptionsDict.return_value = opsHelperValues + pla.opsHelper = mockOperations.return_value + # success route + res = pla.executeForVO(vo="gridpp") + mockTornadoClient.assert_called_with(useCertificates=True) + assert mockTornadoClient.return_value.getMetadata.called + # only called with a non-empty file list: + if files: + assert mockDataManager.return_value.putAndRegister.called + assert res == S_OK() + + +def test_executeForVOMetaFails(pla): + opsHelperValues = {"OK": True, "Value": {"UploadSE": "testUploadSE", "UploadPath": "/gridpp/uploadPath"}} + mockOperations.return_value.getOptionsDict.return_value = opsHelperValues + pla.opsHelper = mockOperations.return_value + # getMetadata call fails. + mockTornadoClient.return_value.getMetadata.return_value = {"OK": False, "Message": "Failed, sorry.."} + res = pla.executeForVO(vo="anything") + assert res["OK"] is False + + +@pytest.mark.parametrize( + "opsHelperValues, expectedRes", + [ + ({"OK": True, "Value": {"UploadPath": "/gridpp/uploadPath"}}, S_ERROR("Upload SE not defined")), + ({"OK": True, "Value": {"UploadSE": "testUploadSE"}}, S_ERROR("Upload path on SE testUploadSE not defined")), + ({"OK": False}, S_ERROR(f"No pilot section for gridpp vo")), + ], +) +def test_executeForVOBadConfig(pla, opsHelperValues, expectedRes): + """Testing an incomplete configuration""" + mockOperations.return_value.getOptionsDict.return_value = opsHelperValues + pla.opsHelper = mockOperations.return_value + res = pla.executeForVO(vo="gridpp") + assert res["OK"] is False + assert res["Message"] == expectedRes["Message"] + mockTornadoClient.return_value.getMetadata.reset_mock() + mockTornadoClient.return_value.getMetadata.assert_not_called() + + +@pytest.mark.parametrize( + "filename, fileAge, ageLimit, expectedResult", [("survives.log", 10, 20, True), ("getsdeleted.log", 21, 20, False)] +) +def test_oldLogsCleaner(plaBase, filename, fileAge, ageLimit, expectedResult): + """Testing old files removal""" + 
plaBase.clearPilotsDelay = ageLimit + filepath = tempfile.TemporaryDirectory().name + os.makedirs(filepath, exist_ok=True) + testfile = os.path.join(filepath, filename) + fd = open(testfile, "w") + fd.close() + assert os.path.exists(testfile) is True + # cannot patch os.stat globally because os.path.exists uses it ! + with patch("DIRAC.WorkloadManagementSystem.Agent.PilotLoggingAgent.os.stat") as mockOSStat: + mockOSStat.return_value.st_mtime = time.time() - fileAge * 86400 # file older that fileAge in seconds + plaBase.clearOldPilotLogs(filepath) + assert os.path.exists(testfile) is expectedResult diff --git a/src/DIRAC/WorkloadManagementSystem/Client/PilotLoggingPlugins/FileCacheLoggingPlugin.py b/src/DIRAC/WorkloadManagementSystem/Client/PilotLoggingPlugins/FileCacheLoggingPlugin.py new file mode 100644 index 00000000000..32ca1eb65cb --- /dev/null +++ b/src/DIRAC/WorkloadManagementSystem/Client/PilotLoggingPlugins/FileCacheLoggingPlugin.py @@ -0,0 +1,124 @@ +""" +File cache logging plugin. +""" +import os +import json +import re +from DIRAC import S_OK, S_ERROR, gLogger +from DIRAC.WorkloadManagementSystem.Client.PilotLoggingPlugins.PilotLoggingPlugin import PilotLoggingPlugin + +sLog = gLogger.getSubLogger(__name__) + + +class FileCacheLoggingPlugin(PilotLoggingPlugin): + """ + File cache logging. Log records are appended to a file, one for each pilot. + It is assumed that an agent will be installed together with this plugin, which will copy + the files to a safe place and clear the cache. + """ + + def __init__(self): + """ + Sets the pilot log files location for a WebServer. 
+ + """ + # UUID pattern + self.pattern = re.compile(r"^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$") + # pilot stamp pattern + self.stamppattern = re.compile(r"^[0-9a-f]{32}$") + self.meta = {} + logPath = os.path.join(os.getcwd(), "pilotlogs") + self.meta["LogPath"] = logPath + if not os.path.exists(logPath): + os.makedirs(logPath) + sLog.verbose("Pilot logging directory:", logPath) + + def sendMessage(self, message, pilotUUID, vo): + """ + File cache sendMessage method. Write the log message to a file line by line. + + :param str message: text to log in json format + :param str pilotUUID: pilot id. Optimally it should be a pilot stamp if available, otherwise a generated UUID. + :param str vo: VO name of a pilot which sent the message. + :return: S_OK or S_ERROR + :rtype: dict + """ + + if not self._verifyUUIDPattern(pilotUUID): + return S_ERROR("Pilot UUID is invalid") + + dirname = os.path.join(self.meta["LogPath"], vo) + try: + if not os.path.exists(dirname): + os.mkdir(dirname) + with open(os.path.join(dirname, pilotUUID), "a") as pilotLog: + try: + messageContent = json.loads(message) + if isinstance(messageContent, list): + for elem in messageContent: + pilotLog.write(elem + "\n") + else: + # it could be a string, if emitted by pilot logger StringIO handler + pilotLog.write(messageContent) + except OSError as oserr: + sLog.error("Error writing to log file:", repr(oserr)) + return S_ERROR(repr(oserr)) + except OSError as err: + sLog.exception("Error opening a pilot log file", lException=err) + return S_ERROR(repr(err)) + return S_OK(f"Message logged successfully for pilot: {pilotUUID} and {vo}") + + def finaliseLogs(self, payload, logfile, vo): + """ + Finalise a log file. Finalised logfile can be copied to a secure location. + + :param dict payload: additional info, a plugin might want to use (i.e. the system return code of a pilot script) + :param str logfile: log filename (pilotUUID). 
+
+        :param str vo: VO name of a pilot which sent the message.
+        :return: S_OK or S_ERROR
+        :rtype: dict
+        """
+
+        returnCode = json.loads(payload).get("retCode", 0)
+
+        if not self._verifyUUIDPattern(logfile):
+            return S_ERROR("Pilot UUID is invalid")
+
+        try:
+            filepath = self.meta["LogPath"]
+            os.rename(os.path.join(filepath, vo, logfile), os.path.join(filepath, vo, logfile + ".log"))
+            sLog.info(f"Log file {logfile} finalised for pilot: (return code: {returnCode})")
+            return S_OK()
+        except Exception as err:
+            sLog.exception("Exception when finalising log")
+            return S_ERROR(repr(err))
+
+    def getMeta(self):
+        """
+        Return any metadata related to this plugin. The "LogPath" is the minimum requirement for the dict to contain.
+
+        :return: Dirac S_OK containing the metadata or S_ERROR if the LogPath is not defined.
+        :rtype: dict
+        """
+        if "LogPath" in self.meta:
+            return S_OK(self.meta)
+        return S_ERROR("No Pilot logging directory defined")
+
+    def _verifyUUIDPattern(self, logfile):
+        """
+        Verify if the name of the log file matches the required pattern.
+
+        :param str logfile: file name
+        :return: re.match result
+        :rtype: re.Match object or None.
+        """
+
+        res = self.stamppattern.match(logfile)
+        if not res:
+            res = self.pattern.match(logfile)
+        if not res:
+            sLog.error(
+                "Pilot UUID does not match the UUID nor the stamp pattern. ",
+                f"UUID: {logfile}, pilot stamp pattern {self.stamppattern}, UUID pattern {self.pattern}",
+            )
+        return res
diff --git a/src/DIRAC/WorkloadManagementSystem/Client/PilotLoggingPlugins/MQPilotLoggingPlugin.py b/src/DIRAC/WorkloadManagementSystem/Client/PilotLoggingPlugins/MQPilotLoggingPlugin.py
new file mode 100644
index 00000000000..6e878b642be
--- /dev/null
+++ b/src/DIRAC/WorkloadManagementSystem/Client/PilotLoggingPlugins/MQPilotLoggingPlugin.py
@@ -0,0 +1,57 @@
+"""
+MessageQueue Pilot logging plugin. Just log messages. 
+""" +import re +from DIRAC import S_OK, S_ERROR, gLogger +from DIRAC.WorkloadManagementSystem.Client.PilotLoggingPlugins.PilotLoggingPlugin import PilotLoggingPlugin + +sLog = gLogger.getSubLogger(__name__) + + +class MQPilotLoggingPlugin(PilotLoggingPlugin): + """ + A template of a MQ logging plugin. + It gets the message and converts it to a list of Dictionaries to be shipped to a remote MQ service + """ + + def __init__(self): + sLog.warning("MQPilotLoggingPlugin skeleton is being used. NO-op") + self.rcompiled = re.compile( + r"(?P[0-9-]+)T(?P