diff --git a/dirac.cfg b/dirac.cfg
index e7ccabbfc1b..d9150d86a3c 100644
--- a/dirac.cfg
+++ b/dirac.cfg
@@ -639,10 +639,6 @@ Resources
       # Default: /cvmfs/cernvm-prod.cern.ch/cvm4
       ContainerRoot = /cvmfs/cernvm-prod.cern.ch/cvm4
 
-      # The binary to start the container
-      # default: singularity
-      ContainerBin = /opt/extras/bin/singularity
-
       # List of directories to bind
       ContainerBind = /etc/grid-security,someDir:::BoundHere
diff --git a/src/DIRAC/Resources/Computing/SingularityComputingElement.py b/src/DIRAC/Resources/Computing/SingularityComputingElement.py
index f0e153d11b4..af95f89df1d 100644
--- a/src/DIRAC/Resources/Computing/SingularityComputingElement.py
+++ b/src/DIRAC/Resources/Computing/SingularityComputingElement.py
@@ -11,14 +11,13 @@
 See the Configuration/Resources/Computing documention for details on
 where to set the option parameters.
 """
-
-import io
 import json
 import os
 import re
 import shutil
 import sys
 import tempfile
+from pathlib import Path
 
 import DIRAC
 from DIRAC import S_OK, S_ERROR, gConfig, gLogger
@@ -64,10 +63,6 @@
 echo "Finishing inner container wrapper scripts at `date`."
 """
 
-# Path to a directory on CVMFS to use as a fallback if no
-# other version found: Only used if node has user namespaces
-FALLBACK_SINGULARITY = "/cvmfs/oasis.opensciencegrid.org/mis/singularity/current/bin"
-
 CONTAINER_WRAPPER_NO_INSTALL = """#!/bin/bash
 
 echo "Starting inner container wrapper scripts (no install) at `date`."
@@ -112,7 +107,6 @@ def __init__(self, ceUniqueID):
         self.__root = self.ceParameters["ContainerRoot"]
         self.__workdir = CONTAINER_WORKDIR
         self.__innerdir = CONTAINER_INNERDIR
-        self.__singularityBin = "singularity"
         self.__installDIRACInContainer = self.ceParameters.get("InstallDIRACInContainer", False)
         if isinstance(self.__installDIRACInContainer, str) and self.__installDIRACInContainer.lower() in (
             "false",
@@ -122,47 +116,6 @@
 
         self.processors = int(self.ceParameters.get("NumberOfProcessors", 1))
 
-    def __hasUserNS(self):
-        """Detect if this node has user namespaces enabled.
-        Returns True if they are enabled, False otherwise.
-        """
-        try:
-            with open("/proc/sys/user/max_user_namespaces") as proc_fd:
-                maxns = int(proc_fd.readline().strip())
-                # Any "reasonable number" of namespaces is sufficient
-                return maxns > 100
-        except Exception:
-            # Any failure, missing file, doesn't contain a number, etc. and we
-            # assume they are disabled.
-            return False
-
-    def __hasSingularity(self):
-        """Search the current PATH for an exectuable named singularity.
-        Returns True if it is found, False otherwise.
-        """
-        if self.ceParameters.get("ContainerBin"):
-            binPath = self.ceParameters["ContainerBin"]
-            if os.path.isfile(binPath) and os.access(binPath, os.X_OK):
-                self.__singularityBin = binPath
-                self.log.debug(f'Use singularity from "{self.__singularityBin}"')
-                return True
-        if "PATH" not in os.environ:
-            return False  # Hmm, PATH not set? How unusual...
-        searchPaths = os.environ["PATH"].split(os.pathsep)
-        # We can use CVMFS as a last resort if userNS is enabled
-        if self.__hasUserNS():
-            searchPaths.append(FALLBACK_SINGULARITY)
-        for searchPath in searchPaths:
-            binPath = os.path.join(searchPath, "singularity")
-            if os.path.isfile(binPath):
-                # File found, check it's executable to be certain:
-                if os.access(binPath, os.X_OK):
-                    self.log.debug(f'Found singularity at "{binPath}"')
-                    self.__singularityBin = binPath
-                    return True
-        # No suitable binaries found
-        return False
-
     @staticmethod
     def __findInstallBaseDir():
         """Find the path to root of the current DIRAC installation"""
@@ -326,11 +279,12 @@ def __getEnv(self):
 
         We blank almost everything to prevent contamination from the host system.
         """
-        if not self.__installDIRACInContainer:
-            payloadEnv = {k: v for k, v in os.environ.items() if ENV_VAR_WHITELIST.match(k)}
-        else:
+        if self.__installDIRACInContainer:
             payloadEnv = {}
+        else:
+            payloadEnv = {k: v for k, v in os.environ.items() if ENV_VAR_WHITELIST.match(k)}
+            payloadEnv["PATH"] = str(Path(sys.executable).parent)
 
         payloadEnv["TMP"] = "/tmp"
         payloadEnv["TMPDIR"] = "/tmp"
         payloadEnv["X509_USER_PROXY"] = os.path.join(self.__innerdir, "proxy")
@@ -361,10 +315,6 @@ def submitJob(self, executableFile, proxy=None, **kwargs):
         """
         rootImage = self.__root
        renewTask = None
-        # Check that singularity is available
-        if not self.__hasSingularity():
-            self.log.error("Singularity is not installed on PATH.")
-            return S_ERROR("Failed to find singularity")
 
         self.log.info("Creating singularity container")
 
@@ -396,19 +346,19 @@ def submitJob(self, executableFile, proxy=None, **kwargs):
         # Mount /cvmfs in if it exists on the host
         withCVMFS = os.path.isdir("/cvmfs")
         innerCmd = os.path.join(self.__innerdir, "dirac_container.sh")
-        cmd = [self.__singularityBin, "exec"]
-        cmd.extend(["--contain"])  # use minimal /dev and empty other directories (e.g. /tmp and $HOME)
-        cmd.extend(["--ipc"])  # run container in a new IPC namespace
-        cmd.extend(["--workdir", baseDir])  # working directory to be used for /tmp, /var/tmp and $HOME
-        cmd.extend(["--home", "/tmp"])  # Avoid using small tmpfs for default $HOME and use scratch /tmp instead
-        if self.__hasUserNS():
-            cmd.append("--userns")
+        outerCmd = ["apptainer", "exec"]
+        outerCmd.extend(["--contain"])  # use minimal /dev and empty other directories (e.g. /tmp and $HOME)
+        outerCmd.extend(["--ipc"])  # run container in a new IPC namespace
+        outerCmd.extend(["--workdir", baseDir])  # working directory to be used for /tmp, /var/tmp and $HOME
+        outerCmd.extend(["--home", "/tmp"])  # Avoid using small tmpfs for default $HOME and use scratch /tmp instead
+        outerCmd.append("--userns")
         if withCVMFS:
-            cmd.extend(["--bind", "/cvmfs"])
+            outerCmd.extend(["--bind", "/cvmfs"])
         if not self.__installDIRACInContainer:
-            cmd.extend(["--bind", "{0}:{0}:ro".format(self.__findInstallBaseDir())])
+            outerCmd.extend(["--bind", "{0}:{0}:ro".format(self.__findInstallBaseDir())])
 
-        bindPaths = self.ceParameters.get("ContainerBind", "").split(",")
+        rawBindPaths = self.ceParameters.get("ContainerBind", "")
+        bindPaths = rawBindPaths.split(",") if rawBindPaths else []
         siteName = gConfig.getValue("/LocalSite/Site", "")
         ceName = gConfig.getValue("/LocalSite/GridCE", "")
         if siteName and ceName:
@@ -441,20 +391,20 @@ def submitJob(self, executableFile, proxy=None, **kwargs):
 
         for bindPath in bindPaths:
             if len(bindPath.split(":::")) == 1:
-                cmd.extend(["--bind", bindPath.strip()])
+                outerCmd.extend(["--bind", bindPath.strip()])
             elif len(bindPath.split(":::")) in [2, 3]:
-                cmd.extend(["--bind", ":".join([bp.strip() for bp in bindPath.split(":::")])])
+                outerCmd.extend(["--bind", ":".join([bp.strip() for bp in bindPath.split(":::")])])
 
         if "ContainerOptions" in self.ceParameters:
             containerOpts = self.ceParameters["ContainerOptions"].split(",")
             for opt in containerOpts:
-                cmd.extend([opt.strip()])
-        if os.path.isdir(rootImage) or os.path.isfile(rootImage):
-            cmd.extend([rootImage, innerCmd])
-        else:
+                outerCmd.extend([opt.strip()])
+        if not (os.path.isdir(rootImage) or os.path.isfile(rootImage)):
             # if we are here is because there's no image, or it is not accessible (e.g. not on CVMFS)
             self.log.error("Singularity image to exec not found: ", rootImage)
             return S_ERROR("Failed to find singularity image to exec")
+        outerCmd.append(rootImage)
+        cmd = outerCmd + [innerCmd]
 
         self.log.debug(f"Execute singularity command: {cmd}")
         self.log.debug(f"Execute singularity env: {self.__getEnv()}")
@@ -464,6 +414,13 @@ def submitJob(self, executableFile, proxy=None, **kwargs):
 
         if not result["OK"]:
             self.log.error("Fail to run Singularity", result["Message"])
+            # If we fail to run the container try to run it again with verbose output
+            # to help with debugging.
+            self.log.error("Singularity command was: ", cmd)
+            self.log.error(f"Singularity env was: {self.__getEnv()}")
+            debugCmd = [outerCmd[0], "--debug"] + outerCmd[1:] + ["echo", "All okay"]
+            self.log.error("Running with debug output to facilitate debugging", debugCmd)
+            result = systemCall(0, debugCmd, callbackFunction=self.sendOutput, env=self.__getEnv())
         if proxy and renewTask:
             gThreadScheduler.removeTask(renewTask)
         self.__deleteWorkArea(baseDir)
diff --git a/src/DIRAC/WorkloadManagementSystem/Agent/test/Test_Agent_JobAgent.py b/src/DIRAC/WorkloadManagementSystem/Agent/test/Test_Agent_JobAgent.py
index 20c6a542f17..e6b15df5d9a 100644
--- a/src/DIRAC/WorkloadManagementSystem/Agent/test/Test_Agent_JobAgent.py
+++ b/src/DIRAC/WorkloadManagementSystem/Agent/test/Test_Agent_JobAgent.py
@@ -1,9 +1,11 @@
 """ Test class for Job Agent
 """
+import multiprocessing
 import os
 import pytest
 import time
-from unittest.mock import MagicMock
+from concurrent.futures import ProcessPoolExecutor
+from functools import partial
 
 from DIRAC import gLogger, S_OK, S_ERROR
 from DIRAC.Core.Security.X509Chain import X509Chain  # pylint: disable=import-error
@@ -587,7 +589,7 @@ def test_submitJob(mocker, mockJWInput, expected):
         ("Pool/Singularity", jobScript % "1", (["Failed to find singularity"], []), ([], [])),
     ],
 )
-def test_submitAndCheckJob(mocker, localCE, job, expectedResult1, expectedResult2):
+def test_submitAndCheckJob(monkeypatch, mocker, localCE, job, expectedResult1, expectedResult2):
     """Test the submission and the management of the job status."""
     jobName = "testJob.py"
     with open(jobName, "w") as execFile:
@@ -606,8 +608,14 @@ def test_submitAndCheckJob(mocker, localCE, job, expectedResult1, expectedResult2):
     mocker.patch("DIRAC.WorkloadManagementSystem.Agent.JobAgent.JobAgent._sendFailoverRequest", return_value=S_OK())
     mocker.patch("DIRAC.Core.Security.X509Chain.X509Chain.dumpAllToString", return_value=S_OK())
     mocker.patch(
-        "DIRAC.Resources.Computing.SingularityComputingElement.SingularityComputingElement._SingularityComputingElement__hasSingularity",
-        return_value=False,
+        "DIRAC.Resources.Computing.SingularityComputingElement.SingularityComputingElement.submitJob",
+        return_value=S_ERROR("Failed to find singularity"),
+    )
+    # We need to force ProcessPoolExecutor to use the fork context to enable the
+    # mocks to propagate to the subprocesses used by PoolComputingElement
+    mocker.patch(
+        "concurrent.futures.ProcessPoolExecutor",
+        side_effect=partial(ProcessPoolExecutor, mp_context=multiprocessing.get_context("fork")),
     )
 
     jobAgent = JobAgent("JobAgent", "Test")