Integrate dsub #195

Open
wants to merge 32 commits into base: master
Commits
2062294
Add dsub to conda environment
nisthapanda Sep 26, 2023
863bdcf
Add functions for running SPRAS with dsub in All of Us
nisthapanda Sep 26, 2023
152d762
Merge remote-tracking branch 'upstream/master' into dsub
nisthapanda Sep 26, 2023
92c2a97
Add functions for running with dsub in All of Us
nisthapanda Sep 27, 2023
61976b7
Resolved volume mapping issues
nisthapanda Sep 27, 2023
1725d1b
Change location of dsub logs in Google Cloud Storage
nisthapanda Sep 27, 2023
2aac5f2
Merge branch 'master' into dsub
nisthapanda Nov 13, 2023
bb1af98
Merge pull request #1 from nisthapanda/dsub
nisthapanda Nov 13, 2023
f0134a3
Merge remote-tracking branch 'spras/direction' into dsub
nisthapanda Nov 21, 2023
d702721
Merge SPRAS updates
nisthapanda Jan 17, 2024
616cd67
Move run_container_dsub and helper functions to contianers.py, add te…
nisthapanda Jan 17, 2024
11d0450
Update dsub version in environment.yml
nisthapanda Jan 17, 2024
2769ab0
Update dsub version in environment.yml
nisthapanda Jan 17, 2024
6b2a3c3
Update sphinx-rtd-theme version in environment.yml
nisthapanda Jan 17, 2024
4b8a66a
Merge branch 'Reed-CompBio:master' into dsub
nisthapanda Jun 13, 2024
28682a8
Change to use gcloud storage rsync to copy files for faster transfer …
nisthapanda Jun 13, 2024
2f8723a
Change the the container tag to match container in GCR
nisthapanda Jun 13, 2024
8634119
Merge branch 'Reed-CompBio:master' into dsub
nisthapanda Jun 19, 2024
b712711
Merge branch 'Reed-CompBio:master' into dsub
nisthapanda Jun 25, 2024
cea8505
Remove temporary solution for running OI1 with dsub
nisthapanda Jun 25, 2024
6d6f4bb
Add dsub as a possible framework
nisthapanda Jun 25, 2024
816ec07
Create workaround for uploading empty directories to GCS, Change how …
nisthapanda Jun 25, 2024
e9be16b
Add dsub volume mount prefix to paths in meo-properties.txt file
nisthapanda Jun 25, 2024
53eaf35
Remove unused package and extra spaces
nisthapanda Jun 25, 2024
ce951cd
Remove print statement, remove trailing spaces
nisthapanda Jun 25, 2024
c225dcf
Remove trailing spaces, code formatting
nisthapanda Jun 25, 2024
d0172fb
Merge branch 'Reed-CompBio:master' into dsub
nisthapanda Jul 1, 2024
88105ee
Merge SPRAS master
nisthapanda Aug 16, 2024
5452b6f
Add dependencies from dsub to conda environment
nisthapanda Aug 21, 2024
0bf57a3
Merge branch 'Reed-CompBio:master' into dsub
nisthapanda Aug 28, 2024
f80c3fe
Add minimal `dsub` framework documentation in config.yaml
jhiemstrawisc Dec 10, 2024
5aa72ee
Add runtime warning if user selects `dsub` framework
jhiemstrawisc Dec 10, 2024
6 changes: 4 additions & 2 deletions config/config.yaml
@@ -3,8 +3,10 @@
# The length of the hash used to identify a parameter combination
hash_length: 7

# Specify the container framework. Current supported versions include 'docker' and
# 'singularity'. If container_framework is not specified, SPRAS will default to docker.
# Specify the container framework used by each PRM wrapper. Valid options include:
# - docker (default if not specified)
# - singularity -- Also known as apptainer, useful in HPC/HTC environments where docker isn't allowed
# - dsub -- experimental with limited support, used for running on Google Cloud
container_framework: docker

# Only used if container_framework is set to singularity, this will unpack the singularity containers
7 changes: 7 additions & 0 deletions environment.yml
@@ -17,6 +17,12 @@ dependencies:
- scikit-learn=1.2
- seaborn=0.12
- spython=0.2
# for dsub
- python-dateutil<=2.9.0
- pytz<=2024.1
- pyyaml<=6.0.1
- tenacity<=8.2.3
- tabulate<=0.9.0
# Only required for GraphSpace
- commonmark=0.9
- docutils=0.19
@@ -27,3 +33,4 @@ dependencies:
- pip:
- graphspace_python==1.3.1
- sphinx-rtd-theme==2.0.0
- dsub==0.4.13
6 changes: 4 additions & 2 deletions spras/config.py
@@ -62,7 +62,7 @@ def __init__(self, raw_config):
# __init__ makes clear exactly what is being configured.
# Directory used for storing output
self.out_dir = None
# Container framework used by PRMs. Valid options are "docker" and "singularity"
# Container framework used by PRMs. Valid options are "docker", "dsub", and "singularity"
self.container_framework = None
# The container prefix (host and organization) to use for images. Default is "docker.io/reedcompbio"
self.container_prefix = DEFAULT_CONTAINER_PREFIX
@@ -114,9 +114,11 @@ def process_config(self, raw_config):
# However, if we get a bad value, we raise an exception.
if "container_framework" in raw_config:
container_framework = raw_config["container_framework"].lower()
if container_framework not in ("docker", "singularity"):
if container_framework not in ("docker", "singularity", "dsub"):
msg = "SPRAS was configured to run with an unknown container framework: '" + raw_config["container_framework"] + "'. Accepted values are 'docker' or 'singularity'."
Collaborator:
Need to update the options in this line too
raise ValueError(msg)
if container_framework == "dsub":
print("Warning: 'dsub' framework integration is experimental and may not be fully supported.")
self.container_framework = container_framework
else:
self.container_framework = "docker"
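The validation added above can be exercised in isolation. The sketch below uses a hypothetical helper, `normalize_framework`, that is not part of the SPRAS API; it only mirrors the lowercase-then-membership check shown in the diff:

```python
VALID_FRAMEWORKS = ("docker", "singularity", "dsub")

def normalize_framework(raw: str) -> str:
    # Lowercase the configured value, then check it against the accepted set,
    # mirroring the membership test in process_config
    framework = raw.lower()
    if framework not in VALID_FRAMEWORKS:
        raise ValueError(f"Unknown container framework: '{raw}'. "
                         f"Accepted values are {', '.join(VALID_FRAMEWORKS)}.")
    if framework == "dsub":
        # Matches the runtime warning added in this PR
        print("Warning: 'dsub' framework integration is experimental.")
    return framework
```

Capitalized values such as `Docker` normalize cleanly, while unrecognized frameworks fail fast with the accepted options listed.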
147 changes: 147 additions & 0 deletions spras/containers.py
@@ -1,6 +1,7 @@
import os
import platform
import re
import subprocess
from pathlib import Path, PurePath, PurePosixPath
from typing import Any, Dict, List, Optional, Tuple, Union

@@ -41,6 +42,79 @@ def convert_docker_path(src_path: PurePath, dest_path: PurePath, file_path: Unio
rel_path = file_path.relative_to(src_path)
return PurePosixPath(dest_path, rel_path)

def download_gcs(gcs_path: str, local_path: str, is_dir: bool):
Collaborator:
My IDE shows a warning requesting 2 blank lines before these new functions.
# check that output path exists
if not os.path.exists(Path(local_path).parent):
os.makedirs(Path(local_path).parent)

# build command
cmd = 'gcloud storage'
# rsync with checksums to make file transfer faster for larger files
cmd = cmd + ' rsync --checksums-only'
# check if directory
if is_dir:
cmd = cmd + ' -r'
cmd = cmd + ' ' + gcs_path + ' ' + local_path

print(cmd)
# run command
subprocess.run(cmd, shell=True)

if is_dir and Path(Path(local_path)/'gcs_temp.txt').exists():
Collaborator:
@nisthapanda why do we check is_dir here?

Reply:
I don't think we need the is_dir here.
os.remove(Path(Path(local_path)/'gcs_temp.txt'))
Collaborator:
Suggested change
os.remove(Path(Path(local_path)/'gcs_temp.txt'))
Path(Path(local_path)/'gcs_temp.txt').unlink()
Does this work? It's a bit simpler to me to stick with pathlib operations.

Reply:
This works.
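The command construction in `download_gcs` can be sketched as a pure function. `build_download_command` is a hypothetical helper used only to illustrate the `gcloud storage rsync --checksums-only` invocation the diff builds:

```python
def build_download_command(gcs_path: str, local_path: str, is_dir: bool) -> str:
    # rsync with checksums makes repeated transfers of large files faster
    cmd = "gcloud storage rsync --checksums-only"
    if is_dir:
        cmd += " -r"  # recurse into directories
    return f"{cmd} {gcs_path} {local_path}"
```

Keeping the string assembly separate from the `subprocess.run` call like this would also make the download logic testable without touching Google Cloud.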


def upload_gcs(local_path: str, gcs_path: str, is_dir: bool):
# check if path exists in cloud storage
exists = len(subprocess.run(f'gcloud storage ls {gcs_path}', shell=True, capture_output=True, text=True).stdout)
# if path exists rsyc
Collaborator:
Suggested change
# if path exists rsyc
# if path exists rsync
if exists > 0:
cmd = 'gcloud storage rsync --checksums-only'
# if directory is empty
elif exists == 0 and len(os.listdir(local_path)) == 0:
# create a temporary file because GCS will not recognize empty directories
Path(Path(local_path)/'gcs_temp.txt').touch()
# copy path to cloud storage
cmd = 'gcloud storage cp -c'
# else copy path to cloud storage
else:
cmd = 'gcloud storage cp -c'
# check if directory
if is_dir:
cmd = cmd + ' -r'
cmd = cmd + ' ' + str(Path(local_path).resolve()) + ' ' + gcs_path

print(cmd)
# run command
subprocess.run(cmd, shell=True)
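The branching in `upload_gcs` reduces to one decision: rsync when the destination already exists in GCS, otherwise copy, with a placeholder file created first when the local directory is empty (GCS does not represent empty directories). A sketch of that decision, using a hypothetical helper `choose_upload_strategy`:

```python
def choose_upload_strategy(exists_in_gcs: bool, local_is_empty: bool):
    """Return (command_prefix, needs_placeholder) for an upload (sketch)."""
    if exists_in_gcs:
        # Destination exists: sync only changed files
        return "gcloud storage rsync --checksums-only", False
    if local_is_empty:
        # GCS has no empty directories; touch gcs_temp.txt before copying
        return "gcloud storage cp -c", True
    return "gcloud storage cp -c", False
```

The placeholder is the `gcs_temp.txt` file that `download_gcs` later removes after pulling results back.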

def prepare_dsub_cmd(flags: dict):
# set constant flags
dsub_command = 'dsub'
flags['provider'] = 'google-cls-v2'
flags['regions'] = 'us-central1'
flags['user-project'] = os.getenv('GOOGLE_PROJECT')
flags['project'] = os.getenv('GOOGLE_PROJECT')
flags['network'] = 'network'
flags['subnetwork'] = 'subnetwork'
flags['service-account'] = subprocess.run(['gcloud', 'config' ,'get-value' ,'account'], capture_output=True, text=True).stdout.replace('\n', '')
Collaborator:
Suggested change
flags['service-account'] = subprocess.run(['gcloud', 'config' ,'get-value' ,'account'], capture_output=True, text=True).stdout.replace('\n', '')
flags['service-account'] = subprocess.run(['gcloud', 'config', 'get-value', 'account'], capture_output=True, text=True).stdout.replace('\n', '')


# order flags according to flag_list
flag_list = ["provider", "regions", "zones", "location", "user-project", "project", "network", "subnetwork", "service-account", "image", "env", "logging", "input", "input-recursive", "mount", "output", "output-recursive", "command", "script"]
Collaborator:
Wrap the long line
ordered_flags = {f:flags[f] for f in flag_list if f in flags.keys()}

# iteratively add flags to the command
for flag in ordered_flags.keys():
if isinstance(ordered_flags.get(flag), list):
for f in ordered_flags.get(flag):
dsub_command = dsub_command + " --" + flag + " " + f
else:
dsub_command = dsub_command + " --" + flag + " " + ordered_flags.get(flag)

# Wait for dsub job to complegte
Collaborator:
Suggested change
# Wait for dsub job to complegte
# Wait for dsub job to complete
dsub_command = dsub_command + " --wait"
print(f"Command: {dsub_command}")
Collaborator:
Suggested change
print(f"Command: {dsub_command}")
print(f"dsub command: {dsub_command}")

return dsub_command
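The flag-ordering loop above can be exercised standalone. This sketch, `build_dsub_command`, is a hypothetical stand-in with a shortened flag list; it shows how list-valued flags such as `--input-recursive` are repeated once per element:

```python
def build_dsub_command(flags: dict) -> str:
    # Emit flags in a fixed order; list values repeat the flag
    flag_order = ["provider", "regions", "project", "image", "env",
                  "logging", "input-recursive", "output-recursive", "command"]
    cmd = "dsub"
    for flag in flag_order:
        if flag not in flags:
            continue
        values = flags[flag] if isinstance(flags[flag], list) else [flags[flag]]
        for value in values:
            cmd += f" --{flag} {value}"
    return cmd + " --wait"  # block until the dsub job finishes
```

Iterating over a fixed `flag_order` rather than the dict keys keeps the generated command deterministic, which makes the printed command reproducible across runs.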


# TODO consider a better default environment variable
# TODO environment currently a single string (e.g. 'TMPDIR=/OmicsIntegrator1'), should it be a list?
@@ -65,6 +139,8 @@ def run_container(framework: str, container_suffix: str, command: List[str], vol
return run_container_docker(container, command, volumes, working_dir, environment)
elif normalized_framework == 'singularity':
return run_container_singularity(container, command, volumes, working_dir, environment)
elif normalized_framework == 'dsub':
return run_container_dsub(container, command, volumes, working_dir, environment)
else:
raise ValueError(f'{framework} is not a recognized container framework. Choose "docker" or "singularity".')

@@ -258,3 +334,74 @@ def prepare_volume(filename: Union[str, PurePath], volume_base: Union[str, PureP
src = parent

return (src, dest), container_filename

def run_container_dsub(container: str, command: List[str], volumes: List[Tuple[PurePath, PurePath]], working_dir: str, environment: str = 'SPRAS=True'):
Collaborator:
Suggested change
def run_container_dsub(container: str, command: List[str], volumes: List[Tuple[PurePath, PurePath]], working_dir: str, environment: str = 'SPRAS=True'):
def run_container_dsub(container: str, command: List[str], volumes: List[Tuple[PurePath, PurePath]], working_dir: str, environment: str = 'SPRAS=True') -> str:
"""
Runs a command in the container using Docker.
Collaborator:
Suggested change
Runs a command in the container using Docker.
Runs a command in Google Cloud using dsub.
Attempts to automatically correct file owner and group for new files created by the container, setting them to the
current owner and group IDs.
Does not modify the owner or group for existing files modified by the container.
Comment on lines +341 to +343
Collaborator:
Suggested change
Attempts to automatically correct file owner and group for new files created by the container, setting them to the
current owner and group IDs.
Does not modify the owner or group for existing files modified by the container.
@param container: name of the container in the Google Cloud Container Registry
@param command: command to run in the container
Collaborator:
Suggested change
@param command: command to run in the container
@param command: command to run
@param volumes: a list of volumes to mount where each item is a (source, destination) tuple
@param working_dir: the working directory in the container
@param environment: environment variables to set in the container
@return: path of output from dsub
"""
# Dictionary of flags for dsub command
flags = dict()

workspace_bucket = os.getenv('WORKSPACE_BUCKET')
# Add path in the workspace bucket and label for dsub command for each volume
dsub_volumes = [(src, dst, workspace_bucket + str(dst), "INPUT_" + str(i),) for i, (src, dst) in enumerate(volumes)]
Collaborator:
Suggested change
dsub_volumes = [(src, dst, workspace_bucket + str(dst), "INPUT_" + str(i),) for i, (src, dst) in enumerate(volumes)]
dsub_volumes = [(src, dst, workspace_bucket + str(dst), "INPUT_" + str(i),) for i, (src, dst) in enumerate(volumes)]

# Prepare command that will be run inside the container for dsub
container_command = list()
for item in command:
# Find if item is volume
to_replace = [(str(path[1]), "${"+path[3]+'}') for path in dsub_volumes if str(path[1]) in item]
# Replace volume path with dsub volume path
if len(to_replace) == 1:
# Get path that will be replaced
path = to_replace[0][0]
# Get dsub input variable that will replace path
env_variable = to_replace[0][1]
# Replace path with env_variable
container_path = item.replace(path, env_variable)
# Add / if there is no suffix
if container_path == env_variable:
container_path = container_path + '/'
container_command.append(container_path)
else:
container_command.append(item)

# Add a command to copy the volumes to the workspace buckets
container_command.append(('; cp -rf ' + f'/mnt/data/input/gs/{workspace_bucket}{working_dir}/*' + ' $OUTPUT').replace('gs://', ''))

# Make the command into a string
flags['command'] = ' '.join(container_command)
flags['command'] = "'" + flags['command'] + "'"

## Push volumes to WORKSPACE_BUCKET
for src, _dst, gcs_path, _env in dsub_volumes:
upload_gcs(local_path=str(src), gcs_path=gcs_path, is_dir=True)

## Prepare flags for dsub command
flags['image'] = container
flags['env'] = environment
flags['input-recursive'] = [vol[3]+'='+vol[2] for vol in dsub_volumes]
flags['output-recursive'] = "OUTPUT=" + workspace_bucket + working_dir
Collaborator:
Suggested change
flags['output-recursive'] = "OUTPUT=" + workspace_bucket + working_dir
flags['output-recursive'] = "OUTPUT=" + workspace_bucket + working_dir
flags['logging'] = workspace_bucket + '/dsub/'

# Create dsub command
dsub_command = prepare_dsub_cmd(flags)

# Run dsub as subprocess
subprocess.run(dsub_command, shell=True)

# Pull output volumes from WORKSPACE_BUCKET
for src, _dst, gcs_path, _env in dsub_volumes:
download_gcs(local_path=str(src), gcs_path=gcs_path, is_dir=True)

# return location of dsub logs in WORKSPACE_BUCKET
return 'dsub logs: {logs}'.format(logs = flags['logging'])
Collaborator:
Suggested change
return 'dsub logs: {logs}'.format(logs = flags['logging'])
return 'dsub logs: {logs}'.format(logs=flags['logging'])
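The volume-to-environment-variable substitution at the heart of `run_container_dsub` can be sketched separately. `substitute_volume_paths` is a hypothetical helper, with volume destinations given as plain strings rather than `PurePath`s:

```python
def substitute_volume_paths(command, volumes):
    # Map each volume destination to a dsub input variable ${INPUT_i}
    env_map = [(dst, "${INPUT_%d}" % i) for i, (_src, dst) in enumerate(volumes)]
    result = []
    for item in command:
        matches = [(path, var) for path, var in env_map if path in item]
        if len(matches) == 1:
            path, var = matches[0]
            replaced = item.replace(path, var)
            # A bare mount path needs a trailing slash to resolve as a directory
            if replaced == var:
                replaced += "/"
            result.append(replaced)
        else:
            result.append(item)
    return result
```

Arguments that match no volume (or, ambiguously, more than one) pass through unchanged, which is the same conservative behavior the diff implements.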

16 changes: 14 additions & 2 deletions spras/meo.py
@@ -1,3 +1,4 @@
import os
from pathlib import Path

from spras.containers import prepare_volume, run_container
@@ -15,7 +16,7 @@
# Does not support MINSAT or MAXCSP
# TODO add parameter validation
def write_properties(filename=Path('properties.txt'), edges=None, sources=None, targets=None, edge_output=None,
path_output=None, max_path_length=None, local_search=None, rand_restarts=None):
path_output=None, max_path_length=None, local_search=None, rand_restarts=None, framework='docker'):
"""
Write the properties file for Maximum Edge Orientation
See https://github.com/agitter/meo/blob/master/sample.props for property descriptions and the default values at
@@ -27,6 +28,17 @@ def write_properties(filename=Path('properties.txt'), edges=None, sources=None,
if edges is None or sources is None or targets is None or edge_output is None or path_output is None:
raise ValueError('Required Maximum Edge Orientation properties file arguments are missing')

if framework == 'dsub':
# Get path inside dsub container
workspace_bucket = os.getenv('WORKSPACE_BUCKET')
input_prefix = f'/mnt/data/input/gs/{workspace_bucket}'.replace('gs://', '')
# Add input prefix to all MEO paths
edges = input_prefix + edges
sources = input_prefix + sources
targets = input_prefix + targets
edge_output = input_prefix + edge_output
path_output = input_prefix + path_output

with open(filename, 'w') as f:
# Write the required properties
f.write(f'edges.file = {Path(edges).as_posix()}\n')
@@ -148,7 +160,7 @@ def run(edges=None, sources=None, targets=None, output_file=None, max_path_lengt
properties_file_local = Path(out_dir, properties_file)
write_properties(filename=properties_file_local, edges=edge_file, sources=source_file, targets=target_file,
edge_output=mapped_output_file, path_output=mapped_path_output,
max_path_length=max_path_length, local_search=local_search, rand_restarts=rand_restarts)
max_path_length=max_path_length, local_search=local_search, rand_restarts=rand_restarts, framework=container_framework)
bind_path, properties_file = prepare_volume(str(properties_file_local), work_dir)
volumes.append(bind_path)

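The path prefixing `write_properties` applies for the `dsub` framework amounts to a single string transformation. A sketch, using a hypothetical helper `dsub_prefix` and assuming the `/mnt/data/input/gs/` mount convention shown in the diff:

```python
def dsub_prefix(path: str, workspace_bucket: str) -> str:
    # dsub mounts gs://<bucket> at /mnt/data/input/gs/<bucket>,
    # so strip the gs:// scheme and prepend the mount point
    prefix = f"/mnt/data/input/gs/{workspace_bucket}".replace("gs://", "")
    return prefix + path
```

Every MEO path (edges, sources, targets, and both outputs) gets the same prefix, so the properties file refers to locations that exist inside the dsub container rather than on the local filesystem.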