Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add dirac MyProxy support #101

Draft
wants to merge 9 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 8 additions & 6 deletions .devcontainer/devcontainer.json
Original file line number Diff line number Diff line change
Expand Up @@ -9,20 +9,22 @@
"service": "test",
// "shutdownAction": "stopCompose",
"workspaceFolder": "/workspace",

// Features to add to the dev container. More info: https://containers.dev/features.
// "features": {},

// Use 'forwardPorts' to make a list of ports inside the container available locally.
// "forwardPorts": [],

// Use 'postCreateCommand' to run commands after the container is created.
// "postCreateCommand": "uname -a",
"postCreateCommand": "pip install -e ."

// Configure tool-specific properties.
// "customizations": {},

,
"customizations": {
"vscode": {
"extensions": [
"ms-python.python"
]
}
}
// Uncomment to connect as root instead. More info: https://aka.ms/dev-containers-non-root.
// "remoteUser": "root"
}
32 changes: 32 additions & 0 deletions docs/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -381,6 +381,28 @@ The config.yaml in current working directory is used.

</details>

```yaml
destinations:
grid:
scheduler:
type: dirac
storage_element: StorageElementOne
proxy:
# Passphrase for ~/.globus/userkey.pem
password_file: /path/to/passwordfile
filesystem:
type: dirac
lfn_root: /tutoVO/user/c/ciuser/bartenderjobs
storage_element: StorageElementOne
proxy:
password_file: /path/to/passwordfile
```

### Example of running jobs on a DIRAC grid with myproxy

Requires [diracos](https://github.com/DIRACGrid/DIRACOS2) and dirac.cfg
to be installed and configured.

```yaml
destinations:
grid:
Expand All @@ -389,12 +411,22 @@ destinations:
storage_element: StorageElementOne
proxy:
log_level: DEBUG
myproxy:
username: myusername
password_file: /path/to/passwordfile
proxy_rfc: /tmp/x509up_u1000-rfc
proxy: /tmp/x509up_u1000
filesystem:
type: dirac
lfn_root: /tutoVO/user/c/ciuser/bartenderjobs
storage_element: StorageElementOne
proxy:
log_level: DEBUG
myproxy:
username: myusername
password_file: /path/to/passwordfile
proxy_rfc: /tmp/x509up_u1000-rfc
proxy: /tmp/x509up_u1000
```

### Example of running jobs direct on submission
Expand Down
2 changes: 1 addition & 1 deletion docs/develop.md
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ the `WorkloadManagement_SiteDirector` service starting a pilot.
To look around inside the DIRAC server use

```shell
docker-compose -f tests_dirac/docker-compose.yml exec dirac-tuto bash
docker compose -f tests_dirac/docker-compose.yml exec dirac-tuto bash
```

Sometimes the DIRAC server needs clearing of its state,
Expand Down
2 changes: 1 addition & 1 deletion src/bartender/filesystems/dirac.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ async def upload(self, src: JobDescription, target: JobDescription) -> None:
input_tar_on_grid = target.job_dir / archive_fn.name
logger.warning(
f"Uploading {archive_fn} to {input_tar_on_grid}"
f"on {self.storage_element}",
f" on {self.storage_element}",
)
result = await put(
lfn=str(input_tar_on_grid),
Expand Down
2 changes: 1 addition & 1 deletion src/bartender/filesystems/dirac_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ def _validate_lfn_root(
cls, # noqa: N805 signature of validator
v: str, # noqa: WPS111 signature of validator
) -> str:
pattern = r"^\/\w+\/user\/([a-zA-Z])\/\1\w+\/.*$"
pattern = r"^\/[\w\.]+\/user\/([a-zA-Z])\/\w+\/.*$"
if not re.match(pattern, v):
template = "/<VO>/user/<initial>/<username>/<whatever>"
raise ValueError(
Expand Down
22 changes: 21 additions & 1 deletion src/bartender/schedulers/dirac.py
Original file line number Diff line number Diff line change
Expand Up @@ -249,8 +249,14 @@ def _job_script_content(self, description: JobDescription) -> str:

def _command_script(self, description: JobDescription) -> str:
command = description.command
dl_image = ""
if self.config.apptainer_image:
image = self.config.apptainer_image
image = str(self.config.apptainer_image)
if self.config.apptainer_image.is_relative_to(_lfn_user_home(description)):
image = self.config.apptainer_image.name
# Also exclude sif file from output.tar
lfn_image = self.config.apptainer_image
dl_image = f"dirac-dms-get-file {lfn_image} && echo {image} >> .input_files.txt" # noqa: E501
# TODO if command is complex then quotes are likely needed
command = f"apptainer run {image} {description.command}"
# added echo so DIRAC
Expand All @@ -265,6 +271,7 @@ def _command_script(self, description: JobDescription) -> str:
return dedent(
f"""\
# Run command
{dl_image}
echo 'Running command for {description.job_dir}'
({command}) > stdout.txt 2> stderr.txt
echo -n $? > returncode
Expand Down Expand Up @@ -302,6 +309,19 @@ async def _jdl_script(self, description: JobDescription, scriptdir: Path) -> str
)


def _lfn_user_home(description: JobDescription) -> Path:
    """Derive the user's home directory on grid storage from the job directory.

    The home directory is taken to be the leading five path components
    of `description.job_dir`.

    Args:
        description: Description of job.

    Returns:
        User's home directory on grid storage.
    """
    # presumably the components are "/", <VO>, "user", <initial>, <username>
    # -- verify against the configured lfn_root
    home_depth = 5
    leading_parts = description.job_dir.parts[:home_depth]
    return Path(*leading_parts)


def _relative_output_dir(description: JobDescription) -> Path:
"""Return description.output_dir relative to user's home directory.

Expand Down
6 changes: 4 additions & 2 deletions src/bartender/schedulers/dirac_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,10 @@ class DiracSchedulerConfig(BaseModel):
"""Configuration for DIRAC scheduler.

Args:
apptainer_image: Path on cvmfs to apptainer image.
Will run application command inside apptainer image.
apptainer_image: Path on cvmfs or grid storage to apptainer image.
When set will run application command inside apptainer image.
Image can also be on grid storage,
it will then be downloaded to current directory first.
storage_element: Storage element to upload output files to.
proxy: Proxy configuration.
"""
Expand Down
118 changes: 114 additions & 4 deletions src/bartender/shared/dirac.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import asyncio
import logging
import shutil
from pathlib import Path
from subprocess import ( # noqa: S404 security implications OK
PIPE,
CalledProcessError,
Expand All @@ -11,7 +13,7 @@
from DIRAC.Core.Security.ProxyInfo import getProxyInfo
from DIRAC.Core.Utilities.exceptions import DIRACInitError

from bartender.shared.dirac_config import ProxyConfig
from bartender.shared.dirac_config import MyProxyConfig, ProxyConfig

logger = logging.getLogger(__file__)

Expand Down Expand Up @@ -40,7 +42,12 @@ async def proxy_init(config: ProxyConfig) -> None:

Raises:
CalledProcessError: If failed to create proxy.

Returns:
Nothing
"""
if config.myproxy:
return await myproxy_init(config)
cmd = _proxy_init_command(config)
logger.warning(f"Running command: {cmd}")
process = await asyncio.create_subprocess_exec(
Expand All @@ -49,8 +56,13 @@ async def proxy_init(config: ProxyConfig) -> None:
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
password = None
if config.password:
password = config.password.encode()
elif config.password_file:
password = config.password_file.read_bytes()
stdout, stderr = await process.communicate(
config.password.encode() if config.password else None,
password,
)
if process.returncode:
raise CalledProcessError(process.returncode, cmd, stderr=stderr, output=stdout)
Expand All @@ -66,10 +78,16 @@ def sync_proxy_init(config: ProxyConfig) -> None:
# but dirac-proxy-init script is too long to copy here
# and password would be unpassable so decided to keep calling subprocess.
cmd = _proxy_init_command(config)
password = None
if config.password:
password = config.password.encode()
elif config.password_file:
password = config.password_file.read_bytes()

logger.warning(f"Running command: {cmd}")
run( # noqa: S603 subprocess call OK
cmd,
input=config.password.encode() if config.password else None,
input=password,
stdout=PIPE,
stderr=PIPE,
check=True,
Expand All @@ -86,7 +104,7 @@ def _proxy_init_command(config: ProxyConfig) -> list[str]:
parts.extend(["-K", config.key])
if config.group:
parts.extend(["-g", config.group])
if config.password:
if config.password or config.password_file:
parts.append("-p")
return parts

Expand Down Expand Up @@ -162,3 +180,95 @@ async def teardown_proxy_renewer() -> None:
task.cancel()
await asyncio.gather(task, return_exceptions=True)
renewer = None # noqa: WPS442 simpler then singleton


async def myproxy_init(config: ProxyConfig) -> None:
    """Create or renew proxy using MyProxy server.

    Runs myproxy-logon to write the RFC proxy file, copies that file to
    the configured proxy location, uploads the copy with
    dirac-admin-proxy-upload and finally calls get_time_left_on_proxy().

    Args:
        config: Proxy configuration. Its `myproxy` section must be set.

    Raises:
        ValueError: If no MyProxy configuration is provided.
        CalledProcessError: If myproxy-logon or dirac-admin-proxy-upload
            exits with a non-zero exit code.
    """
    myproxy = config.myproxy
    if myproxy is None:
        raise ValueError("No myproxy configuration")

    # myproxy-logon \
    #   --pshost config.pshost \
    #   --proxy_lifetime config.proxy_lifetime \
    #   --username config.username \
    #   --out tmprfcfile \
    #   --stdin_pass \
    #   < config.password_file
    await myproxy_logon(myproxy)
    # cp tmprfcfile proxyfile
    # shutil.copy is a plain blocking function and its return value is not
    # awaitable, so run it in a worker thread instead of awaiting it directly.
    await asyncio.to_thread(shutil.copy, myproxy.proxy_rfc, myproxy.proxy)
    # dirac-admin-proxy-upload -d -P proxyfile
    await proxy_upload(myproxy.proxy)
    # then check that proxy is valid and has time left
    # NOTE(review): the return value is discarded, so this presumably relies
    # on get_time_left_on_proxy() raising when there is no valid proxy -- confirm.
    get_time_left_on_proxy()


async def myproxy_logon(config: MyProxyConfig) -> None:
    """Run myproxy-logon to retrieve a proxy from the MyProxy server.

    The content of `config.password_file` is fed to the command on stdin
    and the proxy is written to `config.proxy_rfc`.

    Args:
        config: The configuration object containing the necessary parameters.

    Raises:
        CalledProcessError: If the MyProxy logon process returns a non-zero
            exit code. Captured stdout and stderr are attached.
    """
    # Read the password first so a missing file fails before anything is spawned
    passphrase = config.password_file.read_bytes()

    logon_cmd = ["myproxy-logon"]
    logon_cmd += ["--pshost", config.pshost]
    logon_cmd += ["--proxy_lifetime", config.proxy_lifetime]
    logon_cmd += ["--username", config.username]
    logon_cmd += ["--out", str(config.proxy_rfc)]
    logon_cmd.append("--stdin_pass")

    pipe = asyncio.subprocess.PIPE
    logon = await asyncio.create_subprocess_exec(
        *logon_cmd,
        stdout=pipe,
        stderr=pipe,
    )
    captured_out, captured_err = await logon.communicate(passphrase)

    exit_code = logon.returncode
    if not exit_code:
        return
    raise CalledProcessError(
        exit_code,
        logon_cmd,
        stderr=captured_err,
        output=captured_out,
    )


async def proxy_upload(proxy: Path) -> None:
    """Upload a proxy file by running the dirac-admin-proxy-upload command.

    Args:
        proxy: The path to the proxy file.

    Raises:
        CalledProcessError: If the subprocess returns a non-zero exit code.
            Captured stdout and stderr are attached.
    """
    # TODO use Python library to upload proxy
    upload_cmd = ["dirac-admin-proxy-upload", "-d", "-P", str(proxy)]

    pipe = asyncio.subprocess.PIPE
    uploader = await asyncio.create_subprocess_exec(
        *upload_cmd,
        stdout=pipe,
        stderr=pipe,
    )
    captured_out, captured_err = await uploader.communicate()

    exit_code = uploader.returncode
    if not exit_code:
        return
    raise CalledProcessError(
        exit_code,
        upload_cmd,
        stderr=captured_err,
        output=captured_out,
    )
29 changes: 28 additions & 1 deletion src/bartender/shared/dirac_config.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import pkgutil
from pathlib import Path
from typing import Literal, Optional

from pydantic import BaseModel
from pydantic import BaseModel, FilePath

DIRAC_INSTALLED = (
pkgutil.find_loader("DIRAC") is not None
Expand All @@ -23,6 +24,27 @@
]


class MyProxyConfig(BaseModel):
    """Settings for retrieving a proxy from a MyProxy server.

    Args:
        pshost: Hostname of the MyProxy server.
        username: Username for the delegated proxy.
        proxy_lifetime: Lifetime of proxies delegated by the server.
        password_file: Path to the file containing the password for the proxy.
        proxy_rfc: Path where the generated RFC proxy file is written.
        proxy: Path to the generated proxy file.
            This proxy file should be used to submit and manage jobs.
    """

    pshost: str = "px.grid.sara.nl"
    username: str
    password_file: FilePath
    # NOTE(review): one minute short of 7 days if read as HH:MM -- confirm the
    # format that myproxy-logon --proxy_lifetime actually accepts.
    proxy_lifetime: str = "167:59"  # 7 days
    proxy_rfc: Path
    proxy: Path


class ProxyConfig(BaseModel):
"""Configuration for DIRAC proxy.

Expand All @@ -33,6 +55,9 @@ class ProxyConfig(BaseModel):
valid: How long proxy should be valid. Format HH:MM.
By default is 24 hours.
password: The password for the private key file.
password_file: The path to the file containing
the password for the private key file.
Should not end with a newline.
min_life: If proxy has less than this many seconds left, renew it.
Default 30 minutes.
log_level: The log level for the DIRAC logger. Default INFO.
Expand All @@ -43,5 +68,7 @@ class ProxyConfig(BaseModel):
group: Optional[str] = None
valid: Optional[str] = None
password: Optional[str] = None
password_file: Optional[FilePath] = None
min_life: int = 1800
log_level: LogLevel = "INFO"
myproxy: Optional[MyProxyConfig] = None
Loading
Loading