-
Notifications
You must be signed in to change notification settings - Fork 176
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #6208 from martynia/integration_janusz_pilotlogsWr…
…apper_dev [integration] Remote Pilot Logger to Tornado
- Loading branch information
Showing
15 changed files
with
999 additions
and
120 deletions.
There are no files selected for viewing
186 changes: 186 additions & 0 deletions
186
src/DIRAC/WorkloadManagementSystem/Agent/PilotLoggingAgent.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,186 @@ | ||
""" :mod: PilotLoggingAgent | ||
PilotLoggingAgent sends Pilot log files to an SE. | ||
.. literalinclude:: ../ConfigTemplate.cfg | ||
:start-after: ##BEGIN PilotLoggingAgent | ||
:end-before: ##END | ||
:dedent: 2 | ||
:caption: PilotLoggingAgent options | ||
""" | ||
|
||
# # imports | ||
import os | ||
import time | ||
from DIRAC import S_OK, S_ERROR, gConfig | ||
from DIRAC.ConfigurationSystem.Client.Helpers.Operations import Operations | ||
from DIRAC.ConfigurationSystem.Client.Helpers.Registry import getVOs | ||
from DIRAC.Core.Base.AgentModule import AgentModule | ||
from DIRAC.Core.Utilities.Proxy import executeWithUserProxy | ||
from DIRAC.DataManagementSystem.Client.DataManager import DataManager | ||
from DIRAC.WorkloadManagementSystem.Client.TornadoPilotLoggingClient import TornadoPilotLoggingClient | ||
|
||
|
||
class PilotLoggingAgent(AgentModule): | ||
""" | ||
.. class:: PilotLoggingAgent | ||
The agent sends completed pilot log files to permanent storage for analysis. | ||
""" | ||
|
||
def __init__(self, *args, **kwargs): | ||
"""c'tor""" | ||
super().__init__(*args, **kwargs) | ||
self.clearPilotsDelay = 30 | ||
|
||
def initialize(self): | ||
""" | ||
agent's initialisation. Use this agent's CS information to: | ||
Determine what Defaults/Shifter shifter proxy to use., | ||
get the target SE name from the CS. | ||
Obtain log file location from Tornado. | ||
:param self: self reference | ||
""" | ||
# pilot logs lifetime in days | ||
self.clearPilotsDelay = self.am_getOption("ClearPilotsDelay", self.clearPilotsDelay) | ||
# configured VOs and setup | ||
res = getVOs() | ||
if not res["OK"]: | ||
return res | ||
self.voList = res.get("Value", []) | ||
|
||
if isinstance(self.voList, str): | ||
self.voList = [self.voList] | ||
|
||
return S_OK() | ||
|
||
def execute(self): | ||
""" | ||
Execute one agent cycle. Upload log files to the SE and register them in the DFC. | ||
Use a shifter proxy dynamically loaded for every VO | ||
:param self: self reference | ||
""" | ||
voRes = {} | ||
for vo in self.voList: | ||
self.opsHelper = Operations(vo=vo) | ||
# is remote pilot logging enabled for the VO ? | ||
pilotLogging = self.opsHelper.getValue("/Pilot/RemoteLogging", False) | ||
if pilotLogging: | ||
res = self.opsHelper.getOptionsDict("Shifter/DataManager") | ||
if not res["OK"]: | ||
voRes[vo] = "No shifter defined - skipped" | ||
self.log.error(f"No shifter defined for VO: {vo} - skipping ...") | ||
continue | ||
|
||
proxyUser = res["Value"].get("User") | ||
proxyGroup = res["Value"].get("Group") | ||
if proxyGroup is None or proxyUser is None: | ||
self.log.error( | ||
f"No proxy user or group defined for pilot: VO: {vo}, User: {proxyUser}, Group: {proxyGroup}" | ||
) | ||
voRes[vo] = "No proxy user or group defined - skipped" | ||
continue | ||
|
||
self.log.info(f"Proxy used for pilot logging: VO: {vo}, User: {proxyUser}, Group: {proxyGroup}") | ||
res = self.executeForVO( # pylint: disable=unexpected-keyword-arg | ||
vo, proxyUserName=proxyUser, proxyUserGroup=proxyGroup | ||
) | ||
if not res["OK"]: | ||
voRes[vo] = res["Message"] | ||
if voRes: | ||
for key, value in voRes.items(): | ||
self.log.error(f"Error for {key} vo; message: {value}") | ||
voRes.update(S_ERROR("Agent cycle for some VO finished with errors")) | ||
return voRes | ||
return S_OK() | ||
|
||
@executeWithUserProxy | ||
def executeForVO(self, vo): | ||
""" | ||
Execute one agent cycle for a VO. It obtains VO-specific configuration pilot options from the CS: | ||
UploadPath - the path where the VO wants to upload pilot logs. It has to start with a VO name (/vo/path). | ||
UploadSE - Storage element where the logs will be kept. | ||
:param str vo: vo enabled for remote pilot logging | ||
:return: S_OK or S_ERROR | ||
:rtype: dict | ||
""" | ||
|
||
self.log.info(f"Pilot files upload cycle started for VO: {vo}") | ||
res = self.opsHelper.getOptionsDict("Pilot") | ||
if not res["OK"]: | ||
return S_ERROR(f"No pilot section for {vo} vo") | ||
pilotOptions = res["Value"] | ||
uploadSE = pilotOptions.get("UploadSE") | ||
if uploadSE is None: | ||
return S_ERROR("Upload SE not defined") | ||
self.log.info(f"Pilot upload SE: {uploadSE}") | ||
|
||
uploadPath = pilotOptions.get("UploadPath") | ||
if uploadPath is None: | ||
return S_ERROR(f"Upload path on SE {uploadSE} not defined") | ||
self.log.info(f"Pilot upload path: {uploadPath}") | ||
|
||
client = TornadoPilotLoggingClient(useCertificates=True) | ||
resDict = client.getMetadata() | ||
|
||
if not resDict["OK"]: | ||
return resDict | ||
|
||
# vo-specific source log path: | ||
pilotLogPath = os.path.join(resDict["Value"]["LogPath"], vo) | ||
# check for new files and upload them | ||
if not os.path.exists(pilotLogPath): | ||
# not a disaster, the VO is enabled, but no logfiles were ever stored. | ||
return S_OK() | ||
# delete old pilot log files for the vo VO | ||
self.clearOldPilotLogs(pilotLogPath) | ||
|
||
self.log.info(f"Pilot log files location = {pilotLogPath} for VO: {vo}") | ||
|
||
# get finalised (.log) files from Tornado and upload them to the selected SE | ||
|
||
files = [ | ||
f for f in os.listdir(pilotLogPath) if os.path.isfile(os.path.join(pilotLogPath, f)) and f.endswith("log") | ||
] | ||
|
||
if not files: | ||
self.log.info("No files to upload for this cycle") | ||
for elem in files: | ||
lfn = os.path.join(uploadPath, elem) | ||
name = os.path.join(pilotLogPath, elem) | ||
res = DataManager().putAndRegister(lfn=lfn, fileName=name, diracSE=uploadSE, overwrite=True) | ||
if not res["OK"]: | ||
self.log.error("Could not upload", f"to {uploadSE}: {res['Message']}") | ||
else: | ||
self.log.verbose("File uploaded: ", f"LFN = {res['Value']}") | ||
try: | ||
os.remove(name) | ||
except Exception as excp: | ||
self.log.exception("Cannot remove a local file after uploading", lException=excp) | ||
return S_OK() | ||
|
||
def clearOldPilotLogs(self, pilotLogPath): | ||
""" | ||
Delete old pilot log files unconditionally. Assumes that pilotLogPath exists. | ||
:param str pilotLogPath: log files directory | ||
:return: None | ||
:rtype: None | ||
""" | ||
|
||
files = os.listdir(pilotLogPath) | ||
seconds = int(self.clearPilotsDelay) * 86400 | ||
currentTime = time.time() | ||
|
||
for file in files: | ||
fullpath = os.path.join(pilotLogPath, file) | ||
modifTime = os.stat(fullpath).st_mtime | ||
if modifTime < currentTime - seconds: | ||
self.log.debug(f" Deleting old log : {fullpath}") | ||
try: | ||
os.remove(fullpath) | ||
except Exception as excp: | ||
self.log.exception(f"Cannot remove an old log file after {fullpath}", lException=excp) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
189 changes: 189 additions & 0 deletions
189
src/DIRAC/WorkloadManagementSystem/Agent/test/Test_Agent_PilotLoggingAgent.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,189 @@ | ||
""" Test class for PilotLoggingAgent Agent | ||
""" | ||
import os | ||
import time | ||
import tempfile | ||
|
||
import pytest | ||
from unittest.mock import MagicMock, patch | ||
|
||
# DIRAC Components | ||
import DIRAC.WorkloadManagementSystem.Agent.PilotLoggingAgent as plaModule | ||
from DIRAC.WorkloadManagementSystem.Agent.PilotLoggingAgent import PilotLoggingAgent | ||
from DIRAC import gLogger, gConfig, S_OK, S_ERROR | ||
|
||
gLogger.setLevel("DEBUG") | ||
|
||
# Mock Objects | ||
mockReply = MagicMock() | ||
mockReply1 = MagicMock() | ||
mockOperations = MagicMock() | ||
mockTornadoClient = MagicMock() | ||
mockDataManager = MagicMock() | ||
mockAM = MagicMock() | ||
mockNone = MagicMock() | ||
mockNone.return_value = None | ||
|
||
upDict = { | ||
"OK": True, | ||
"Value": {"User": "proxyUser", "Group": "proxyGroup"}, | ||
} | ||
|
||
|
||
@pytest.fixture | ||
def plaBase(mocker): | ||
mocker.patch("DIRAC.WorkloadManagementSystem.Agent.PilotLoggingAgent.AgentModule.__init__") | ||
mocker.patch( | ||
"DIRAC.WorkloadManagementSystem.Agent.JobCleaningAgent.AgentModule._AgentModule__moduleProperties", | ||
side_effect=lambda x, y=None: y, | ||
create=True, | ||
) | ||
mocker.patch("DIRAC.WorkloadManagementSystem.Agent.PilotLoggingAgent.AgentModule.am_getOption", return_value=mockAM) | ||
mocker.patch( | ||
"DIRAC.WorkloadManagementSystem.Agent.PilotLoggingAgent.getVOs", | ||
return_value={"OK": True, "Value": ["gridpp", "lz"]}, | ||
) | ||
mocker.patch("DIRAC.WorkloadManagementSystem.Agent.PilotLoggingAgent.gConfig.getValue", return_value="GridPP") | ||
mocker.patch("DIRAC.WorkloadManagementSystem.Agent.PilotLoggingAgent.Operations.getValue", side_effect=mockReply) | ||
mocker.patch( | ||
"DIRAC.WorkloadManagementSystem.Agent.PilotLoggingAgent.Operations.getOptionsDict", side_effect=mockReply1 | ||
) | ||
pla = PilotLoggingAgent() | ||
pla.log = gLogger | ||
pla._AgentModule__configDefaults = mockAM | ||
return pla | ||
|
||
|
||
@pytest.fixture | ||
def pla_initialised(mocker, plaBase): | ||
mocker.patch("DIRAC.WorkloadManagementSystem.Agent.PilotLoggingAgent.PilotLoggingAgent.executeForVO") | ||
plaBase.initialize() | ||
return plaBase | ||
|
||
|
||
@pytest.fixture | ||
def pla(mocker, plaBase): | ||
mocker.patch( | ||
"DIRAC.WorkloadManagementSystem.Agent.PilotLoggingAgent.TornadoPilotLoggingClient", | ||
side_effect=mockTornadoClient, | ||
) | ||
mocker.patch("DIRAC.WorkloadManagementSystem.Agent.PilotLoggingAgent.Operations", side_effect=mockOperations) | ||
mocker.patch( | ||
"DIRAC.WorkloadManagementSystem.Agent.PilotLoggingAgent.DataManager", | ||
side_effect=mockDataManager, | ||
) | ||
plaBase.initialize() | ||
return plaBase | ||
|
||
|
||
def test_initialize(plaBase): | ||
res = plaBase.initialize() | ||
assert plaBase.voList == plaModule.getVOs()["Value"] | ||
assert res == S_OK() | ||
|
||
|
||
@pytest.mark.parametrize( | ||
"mockReplyInput, expected, expectedExecOut, expected2", | ||
[ | ||
("/Pilot/RemoteLogging", [True, False], S_OK(), upDict), | ||
("/Pilot/RemoteLogging", [False, False], S_OK(), upDict), | ||
("/Pilot/RemoteLogging", [True, False], S_ERROR("Execute for VO failed"), upDict), | ||
], | ||
) | ||
def test_execute(pla_initialised, mockReplyInput, expected, expectedExecOut, expected2): | ||
"""Testing a thin version of execute (executeForVO is mocked)""" | ||
assert pla_initialised.voList == plaModule.getVOs()["Value"] | ||
mockReply.side_effect = expected | ||
mockReply1.return_value = expected2 | ||
# remote pilot logging on (gridpp only) and off. | ||
pla_initialised.executeForVO.return_value = expectedExecOut | ||
res = pla_initialised.execute() | ||
if not any(expected): | ||
pla_initialised.executeForVO.assert_not_called() | ||
else: | ||
assert pla_initialised.executeForVO.called | ||
pla_initialised.executeForVO.assert_called_with( | ||
"gridpp", | ||
proxyUserName=upDict["Value"]["User"], | ||
proxyUserGroup=upDict["Value"]["Group"], | ||
) | ||
assert res["OK"] == expectedExecOut["OK"] | ||
|
||
|
||
@pytest.mark.parametrize( | ||
"ppath, files, result", | ||
[ | ||
("pilot/log/path/", ["file1.log", "file2.log", "file3.log"], S_OK()), | ||
("pilot/log/path/", [], S_OK()), | ||
], | ||
) | ||
def test_executeForVO(pla, ppath, files, result): | ||
opsHelperValues = {"OK": True, "Value": {"UploadSE": "testUploadSE", "UploadPath": "/gridpp/uploadPath"}} | ||
# full local temporary path: | ||
filepath = os.path.join(tempfile.TemporaryDirectory().name, ppath) | ||
# this is what getMetadata returns: | ||
resDict = {"OK": True, "Value": {"LogPath": filepath}} | ||
mockTornadoClient.return_value.getMetadata.return_value = resDict | ||
mockDataManager.return_value.putAndRegister.return_value = result | ||
if files: | ||
os.makedirs(os.path.join(filepath, "gridpp"), exist_ok=True) | ||
for elem in files: | ||
open(os.path.join(filepath, "gridpp", elem), "w") | ||
mockOperations.return_value.getOptionsDict.return_value = opsHelperValues | ||
pla.opsHelper = mockOperations.return_value | ||
# success route | ||
res = pla.executeForVO(vo="gridpp") | ||
mockTornadoClient.assert_called_with(useCertificates=True) | ||
assert mockTornadoClient.return_value.getMetadata.called | ||
# only called with a non-empty file list: | ||
if files: | ||
assert mockDataManager.return_value.putAndRegister.called | ||
assert res == S_OK() | ||
|
||
|
||
def test_executeForVOMetaFails(pla): | ||
opsHelperValues = {"OK": True, "Value": {"UploadSE": "testUploadSE", "UploadPath": "/gridpp/uploadPath"}} | ||
mockOperations.return_value.getOptionsDict.return_value = opsHelperValues | ||
pla.opsHelper = mockOperations.return_value | ||
# getMetadata call fails. | ||
mockTornadoClient.return_value.getMetadata.return_value = {"OK": False, "Message": "Failed, sorry.."} | ||
res = pla.executeForVO(vo="anything") | ||
assert res["OK"] is False | ||
|
||
|
||
@pytest.mark.parametrize( | ||
"opsHelperValues, expectedRes", | ||
[ | ||
({"OK": True, "Value": {"UploadPath": "/gridpp/uploadPath"}}, S_ERROR("Upload SE not defined")), | ||
({"OK": True, "Value": {"UploadSE": "testUploadSE"}}, S_ERROR("Upload path on SE testUploadSE not defined")), | ||
({"OK": False}, S_ERROR(f"No pilot section for gridpp vo")), | ||
], | ||
) | ||
def test_executeForVOBadConfig(pla, opsHelperValues, expectedRes): | ||
"""Testing an incomplete configuration""" | ||
mockOperations.return_value.getOptionsDict.return_value = opsHelperValues | ||
pla.opsHelper = mockOperations.return_value | ||
res = pla.executeForVO(vo="gridpp") | ||
assert res["OK"] is False | ||
assert res["Message"] == expectedRes["Message"] | ||
mockTornadoClient.return_value.getMetadata.reset_mock() | ||
mockTornadoClient.return_value.getMetadata.assert_not_called() | ||
|
||
|
||
@pytest.mark.parametrize( | ||
"filename, fileAge, ageLimit, expectedResult", [("survives.log", 10, 20, True), ("getsdeleted.log", 21, 20, False)] | ||
) | ||
def test_oldLogsCleaner(plaBase, filename, fileAge, ageLimit, expectedResult): | ||
"""Testing old files removal""" | ||
plaBase.clearPilotsDelay = ageLimit | ||
filepath = tempfile.TemporaryDirectory().name | ||
os.makedirs(filepath, exist_ok=True) | ||
testfile = os.path.join(filepath, filename) | ||
fd = open(testfile, "w") | ||
fd.close() | ||
assert os.path.exists(testfile) is True | ||
# cannot patch os.stat globally because os.path.exists uses it ! | ||
with patch("DIRAC.WorkloadManagementSystem.Agent.PilotLoggingAgent.os.stat") as mockOSStat: | ||
mockOSStat.return_value.st_mtime = time.time() - fileAge * 86400 # file older that fileAge in seconds | ||
plaBase.clearOldPilotLogs(filepath) | ||
assert os.path.exists(testfile) is expectedResult |
Oops, something went wrong.