[8.0] Use apptainer for SingularityComputingElement and enhance debugging #8000

Open · wants to merge 1 commit into rel-v8r0
4 changes: 0 additions & 4 deletions dirac.cfg
@@ -639,10 +639,6 @@ Resources
# Default: /cvmfs/cernvm-prod.cern.ch/cvm4
ContainerRoot = /cvmfs/cernvm-prod.cern.ch/cvm4

# The binary to start the container
# default: singularity
ContainerBin = /opt/extras/bin/singularity

# List of directories to bind
ContainerBind = /etc/grid-security,someDir:::BoundHere
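For reference, the ContainerBind syntax shown above maps onto apptainer --bind arguments roughly as follows. This is a sketch of the parsing done later in SingularityComputingElement.submitJob, not part of the diff; the helper name bindArgs is made up for illustration:

def bindArgs(containerBind):
    """Turn a ContainerBind value into apptainer --bind arguments (illustrative)."""
    args = []
    for bindPath in containerBind.split(",") if containerBind else []:
        parts = [part.strip() for part in bindPath.split(":::")]
        if len(parts) in (1, 2, 3):
            args += ["--bind", ":".join(parts)]
    return args

# bindArgs("/etc/grid-security,someDir:::BoundHere")
# -> ['--bind', '/etc/grid-security', '--bind', 'someDir:BoundHere']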

99 changes: 28 additions & 71 deletions src/DIRAC/Resources/Computing/SingularityComputingElement.py
Expand Up @@ -11,14 +11,13 @@
See the Configuration/Resources/Computing documentation for details on
where to set the option parameters.
"""

import io
import json
import os
import re
import shutil
import sys
import tempfile
from pathlib import Path

import DIRAC
from DIRAC import S_OK, S_ERROR, gConfig, gLogger
@@ -64,10 +63,6 @@
echo "Finishing inner container wrapper scripts at `date`."

"""
# Path to a directory on CVMFS to use as a fallback if no
# other version found: Only used if node has user namespaces
FALLBACK_SINGULARITY = "/cvmfs/oasis.opensciencegrid.org/mis/singularity/current/bin"

CONTAINER_WRAPPER_NO_INSTALL = """#!/bin/bash

echo "Starting inner container wrapper scripts (no install) at `date`."
@@ -112,7 +107,6 @@ def __init__(self, ceUniqueID):
self.__root = self.ceParameters["ContainerRoot"]
self.__workdir = CONTAINER_WORKDIR
self.__innerdir = CONTAINER_INNERDIR
self.__singularityBin = "singularity"
self.__installDIRACInContainer = self.ceParameters.get("InstallDIRACInContainer", False)
if isinstance(self.__installDIRACInContainer, str) and self.__installDIRACInContainer.lower() in (
"false",
@@ -122,47 +116,6 @@

self.processors = int(self.ceParameters.get("NumberOfProcessors", 1))

def __hasUserNS(self):
"""Detect if this node has user namespaces enabled.
Returns True if they are enabled, False otherwise.
"""
try:
with open("/proc/sys/user/max_user_namespaces") as proc_fd:
maxns = int(proc_fd.readline().strip())
# Any "reasonable number" of namespaces is sufficient
return maxns > 100
except Exception:
# Any failure, missing file, doesn't contain a number, etc. and we
# assume they are disabled.
return False

def __hasSingularity(self):
"""Search the current PATH for an exectuable named singularity.
Returns True if it is found, False otherwise.
"""
if self.ceParameters.get("ContainerBin"):
binPath = self.ceParameters["ContainerBin"]
if os.path.isfile(binPath) and os.access(binPath, os.X_OK):
self.__singularityBin = binPath
self.log.debug(f'Use singularity from "{self.__singularityBin}"')
return True
if "PATH" not in os.environ:
return False # Hmm, PATH not set? How unusual...
searchPaths = os.environ["PATH"].split(os.pathsep)
# We can use CVMFS as a last resort if userNS is enabled
if self.__hasUserNS():
searchPaths.append(FALLBACK_SINGULARITY)
for searchPath in searchPaths:
binPath = os.path.join(searchPath, "singularity")
if os.path.isfile(binPath):
# File found, check it's executable to be certain:
if os.access(binPath, os.X_OK):
self.log.debug(f'Found singularity at "{binPath}"')
self.__singularityBin = binPath
return True
# No suitable binaries found
return False

@staticmethod
def __findInstallBaseDir():
"""Find the path to root of the current DIRAC installation"""
@@ -326,11 +279,12 @@ def __getEnv(self):
We blank almost everything to prevent contamination from the host system.
"""

if not self.__installDIRACInContainer:
payloadEnv = {k: v for k, v in os.environ.items() if ENV_VAR_WHITELIST.match(k)}
else:
if self.__installDIRACInContainer:
payloadEnv = {}
else:
payloadEnv = {k: v for k, v in os.environ.items() if ENV_VAR_WHITELIST.match(k)}

payloadEnv["PATH"] = str(Path(sys.executable).parent)
payloadEnv["TMP"] = "/tmp"
payloadEnv["TMPDIR"] = "/tmp"
payloadEnv["X509_USER_PROXY"] = os.path.join(self.__innerdir, "proxy")
@@ -361,10 +315,6 @@ def submitJob(self, executableFile, proxy=None, **kwargs):
"""
rootImage = self.__root
renewTask = None
# Check that singularity is available
if not self.__hasSingularity():
self.log.error("Singularity is not installed on PATH.")
return S_ERROR("Failed to find singularity")

self.log.info("Creating singularity container")

@@ -396,19 +346,19 @@ def submitJob(self, executableFile, proxy=None, **kwargs):
# Mount /cvmfs in if it exists on the host
withCVMFS = os.path.isdir("/cvmfs")
innerCmd = os.path.join(self.__innerdir, "dirac_container.sh")
cmd = [self.__singularityBin, "exec"]
cmd.extend(["--contain"]) # use minimal /dev and empty other directories (e.g. /tmp and $HOME)
cmd.extend(["--ipc"]) # run container in a new IPC namespace
cmd.extend(["--workdir", baseDir]) # working directory to be used for /tmp, /var/tmp and $HOME
cmd.extend(["--home", "/tmp"]) # Avoid using small tmpfs for default $HOME and use scratch /tmp instead
if self.__hasUserNS():
cmd.append("--userns")
outerCmd = ["apptainer", "exec"]
outerCmd.extend(["--contain"]) # use minimal /dev and empty other directories (e.g. /tmp and $HOME)
outerCmd.extend(["--ipc"]) # run container in a new IPC namespace
outerCmd.extend(["--workdir", baseDir]) # working directory to be used for /tmp, /var/tmp and $HOME
outerCmd.extend(["--home", "/tmp"]) # Avoid using small tmpfs for default $HOME and use scratch /tmp instead
outerCmd.append("--userns")
if withCVMFS:
cmd.extend(["--bind", "/cvmfs"])
outerCmd.extend(["--bind", "/cvmfs"])
if not self.__installDIRACInContainer:
cmd.extend(["--bind", "{0}:{0}:ro".format(self.__findInstallBaseDir())])
outerCmd.extend(["--bind", "{0}:{0}:ro".format(self.__findInstallBaseDir())])

bindPaths = self.ceParameters.get("ContainerBind", "").split(",")
rawBindPaths = self.ceParameters.get("ContainerBind", "")
bindPaths = rawBindPaths.split(",") if rawBindPaths else []
siteName = gConfig.getValue("/LocalSite/Site", "")
ceName = gConfig.getValue("/LocalSite/GridCE", "")
if siteName and ceName:
@@ -441,20 +391,20 @@ def submitJob(self, executableFile, proxy=None, **kwargs):

for bindPath in bindPaths:
if len(bindPath.split(":::")) == 1:
cmd.extend(["--bind", bindPath.strip()])
outerCmd.extend(["--bind", bindPath.strip()])
elif len(bindPath.split(":::")) in [2, 3]:
cmd.extend(["--bind", ":".join([bp.strip() for bp in bindPath.split(":::")])])
outerCmd.extend(["--bind", ":".join([bp.strip() for bp in bindPath.split(":::")])])

if "ContainerOptions" in self.ceParameters:
containerOpts = self.ceParameters["ContainerOptions"].split(",")
for opt in containerOpts:
cmd.extend([opt.strip()])
if os.path.isdir(rootImage) or os.path.isfile(rootImage):
cmd.extend([rootImage, innerCmd])
else:
outerCmd.extend([opt.strip()])
if not (os.path.isdir(rootImage) or os.path.isfile(rootImage)):
# if we are here is because there's no image, or it is not accessible (e.g. not on CVMFS)
self.log.error("Singularity image to exec not found: ", rootImage)
return S_ERROR("Failed to find singularity image to exec")
outerCmd.append(rootImage)
cmd = outerCmd + [innerCmd]

self.log.debug(f"Execute singularity command: {cmd}")
self.log.debug(f"Execute singularity env: {self.__getEnv()}")
@@ -464,6 +414,13 @@

if not result["OK"]:
self.log.error("Fail to run Singularity", result["Message"])
# If we fail to run the container try to run it again with verbose output
# to help with debugging.
self.log.error("Singularity command was: ", cmd)
self.log.error(f"Singularity env was: {self.__getEnv()}")
debugCmd = [outerCmd[0], "--debug"] + outerCmd[1:] + ["echo", "All okay"]
self.log.error("Running with debug output to facilitate debugging", debugCmd)
result = systemCall(0, debugCmd, callbackFunction=self.sendOutput, env=self.__getEnv())
if proxy and renewTask:
gThreadScheduler.removeTask(renewTask)
self.__deleteWorkArea(baseDir)
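To make the new behaviour concrete, this is roughly the kind of command the CE now assembles with the defaults shown in dirac.cfg above. It is a sketch, not the verbatim code; the work-directory and inner-script paths are made-up examples:

# Illustrative outer command (paths are examples only)
outerCmd = [
    "apptainer", "exec",
    "--contain", "--ipc",
    "--workdir", "/scratch/dirac_job",   # baseDir on the host
    "--home", "/tmp",
    "--userns",
    "--bind", "/cvmfs",
    "--bind", "/etc/grid-security",
    "/cvmfs/cernvm-prod.cern.ch/cvm4",   # ContainerRoot image
]
cmd = outerCmd + ["/scratch/dirac_job/inner/dirac_container.sh"]  # wrapper script path, illustrative

# On failure the same command is replayed with apptainer's global --debug flag
# and a trivial payload, so the verbose container set-up output ends up in the logs:
debugCmd = [outerCmd[0], "--debug"] + outerCmd[1:] + ["echo", "All okay"]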
@@ -1,9 +1,11 @@
""" Test class for Job Agent
"""
import multiprocessing
import os
import pytest
import time
from unittest.mock import MagicMock
from concurrent.futures import ProcessPoolExecutor
from functools import partial

from DIRAC import gLogger, S_OK, S_ERROR
from DIRAC.Core.Security.X509Chain import X509Chain # pylint: disable=import-error
@@ -587,7 +589,7 @@ def test_submitJob(mocker, mockJWInput, expected):
("Pool/Singularity", jobScript % "1", (["Failed to find singularity"], []), ([], [])),
],
)
def test_submitAndCheckJob(mocker, localCE, job, expectedResult1, expectedResult2):
def test_submitAndCheckJob(monkeypatch, mocker, localCE, job, expectedResult1, expectedResult2):
"""Test the submission and the management of the job status."""
jobName = "testJob.py"
with open(jobName, "w") as execFile:
@@ -606,8 +608,14 @@ def test_submitAndCheckJob(mocker, localCE, job, expectedResult1, expectedResult
mocker.patch("DIRAC.WorkloadManagementSystem.Agent.JobAgent.JobAgent._sendFailoverRequest", return_value=S_OK())
mocker.patch("DIRAC.Core.Security.X509Chain.X509Chain.dumpAllToString", return_value=S_OK())
mocker.patch(
"DIRAC.Resources.Computing.SingularityComputingElement.SingularityComputingElement._SingularityComputingElement__hasSingularity",
return_value=False,
"DIRAC.Resources.Computing.SingularityComputingElement.SingularityComputingElement.submitJob",
return_value=S_ERROR("Failed to find singularity"),
)
# We need to force ProcessPoolExecutor to use the fork context to enable the
# mocks to propagate to the subprocesses used by PoolComputingElement
mocker.patch(
"concurrent.futures.ProcessPoolExecutor",
side_effect=partial(ProcessPoolExecutor, mp_context=multiprocessing.get_context("fork")),
)

jobAgent = JobAgent("JobAgent", "Test")
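The fork-context patch in the hunk above works because workers started with the "fork" method inherit the parent process memory, including any unittest.mock patches, whereas the "spawn" start method (the default on some platforms and newer Python versions) re-imports modules in the child and loses them. A minimal standalone illustration, not DIRAC code; any picklable top-level function and patched module attribute would do:

import multiprocessing
import socket
from concurrent.futures import ProcessPoolExecutor
from unittest import mock


def worker():
    # Calls whatever socket.gethostname is bound to in this process
    return socket.gethostname()


if __name__ == "__main__":
    with mock.patch("socket.gethostname", return_value="patched-host"):
        ctx_fork = multiprocessing.get_context("fork")
        with ProcessPoolExecutor(max_workers=1, mp_context=ctx_fork) as pool:
            print(pool.submit(worker).result())   # "patched-host": patch inherited by the forked child
        ctx_spawn = multiprocessing.get_context("spawn")
        with ProcessPoolExecutor(max_workers=1, mp_context=ctx_spawn) as pool:
            print(pool.submit(worker).result())   # real hostname: spawn re-imports socket, losing the patch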