-
Notifications
You must be signed in to change notification settings - Fork 20
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Integrate dsub #195
base: master
Are you sure you want to change the base?
Integrate dsub #195
Changes from all commits
2062294
863bdcf
152d762
92c2a97
61976b7
1725d1b
2aac5f2
bb1af98
f0134a3
d702721
616cd67
11d0450
2769ab0
6b2a3c3
4b8a66a
28682a8
2f8723a
8634119
b712711
cea8505
6d6f4bb
816ec07
e9be16b
53eaf35
ce951cd
c225dcf
d0172fb
88105ee
5452b6f
0bf57a3
f80c3fe
5aa72ee
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||||
---|---|---|---|---|---|---|---|---|
@@ -1,6 +1,7 @@ | ||||||||
import os | ||||||||
import platform | ||||||||
import re | ||||||||
import subprocess | ||||||||
from pathlib import Path, PurePath, PurePosixPath | ||||||||
from typing import Any, Dict, List, Optional, Tuple, Union | ||||||||
|
||||||||
|
@@ -41,6 +42,79 @@ def convert_docker_path(src_path: PurePath, dest_path: PurePath, file_path: Unio | |||||||
rel_path = file_path.relative_to(src_path) | ||||||||
return PurePosixPath(dest_path, rel_path) | ||||||||
|
||||||||
def download_gcs(gcs_path: str, local_path: str, is_dir: bool): | ||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. My IDE shows a warning requesting 2 blank lines before these new functions. |
||||||||
# check that output path exists | ||||||||
if not os.path.exists(Path(local_path).parent): | ||||||||
os.makedirs(Path(local_path).parent) | ||||||||
|
||||||||
# build command | ||||||||
cmd = 'gcloud storage' | ||||||||
# rsync with checksums to make file transfer faster for larger files | ||||||||
cmd = cmd + ' rsync --checksums-only' | ||||||||
# check if directory | ||||||||
if is_dir: | ||||||||
cmd = cmd + ' -r' | ||||||||
cmd = cmd + ' ' + gcs_path + ' ' + local_path | ||||||||
|
||||||||
print(cmd) | ||||||||
# run command | ||||||||
subprocess.run(cmd, shell=True) | ||||||||
|
||||||||
if is_dir and Path(Path(local_path)/'gcs_temp.txt').exists(): | ||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @nisthapanda why do we check There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't think we need the |
||||||||
os.remove(Path(Path(local_path)/'gcs_temp.txt')) | ||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
Does this work? It's a bit simpler to me to stick with pathlib operations. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This works. |
||||||||
|
||||||||
def upload_gcs(local_path: str, gcs_path: str, is_dir: bool): | ||||||||
# check if path exists in cloud storage | ||||||||
exists = len(subprocess.run(f'gcloud storage ls {gcs_path}', shell=True, capture_output=True, text=True).stdout) | ||||||||
# if path exists rsyc | ||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||
if exists > 0: | ||||||||
cmd = 'gcloud storage rsync --checksums-only' | ||||||||
# if directory is empty | ||||||||
elif exists == 0 and len(os.listdir(local_path)) == 0: | ||||||||
# create a temporary file because GCS will not recognize empty directories | ||||||||
Path(Path(local_path)/'gcs_temp.txt').touch() | ||||||||
# copy path to cloud storage | ||||||||
cmd = 'gcloud storage cp -c' | ||||||||
# else copy path to cloud storage | ||||||||
else: | ||||||||
cmd = 'gcloud storage cp -c' | ||||||||
# check if directory | ||||||||
if is_dir: | ||||||||
cmd = cmd + ' -r' | ||||||||
cmd = cmd + ' ' + str(Path(local_path).resolve()) + ' ' + gcs_path | ||||||||
|
||||||||
print(cmd) | ||||||||
# run command | ||||||||
subprocess.run(cmd, shell=True) | ||||||||
|
||||||||
def prepare_dsub_cmd(flags: dict): | ||||||||
# set constant flags | ||||||||
dsub_command = 'dsub' | ||||||||
flags['provider'] = 'google-cls-v2' | ||||||||
flags['regions'] = 'us-central1' | ||||||||
flags['user-project'] = os.getenv('GOOGLE_PROJECT') | ||||||||
flags['project'] = os.getenv('GOOGLE_PROJECT') | ||||||||
flags['network'] = 'network' | ||||||||
flags['subnetwork'] = 'subnetwork' | ||||||||
flags['service-account'] = subprocess.run(['gcloud', 'config' ,'get-value' ,'account'], capture_output=True, text=True).stdout.replace('\n', '') | ||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||
|
||||||||
# order flags according to flag_list | ||||||||
flag_list = ["provider", "regions", "zones", "location", "user-project", "project", "network", "subnetwork", "service-account", "image", "env", "logging", "input", "input-recursive", "mount", "output", "output-recursive", "command", "script"] | ||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Wrap the long line |
||||||||
ordered_flags = {f:flags[f] for f in flag_list if f in flags.keys()} | ||||||||
|
||||||||
# iteratively add flags to the command | ||||||||
for flag in ordered_flags.keys(): | ||||||||
if isinstance(ordered_flags.get(flag), list): | ||||||||
for f in ordered_flags.get(flag): | ||||||||
dsub_command = dsub_command + " --" + flag + " " + f | ||||||||
else: | ||||||||
dsub_command = dsub_command + " --" + flag + " " + ordered_flags.get(flag) | ||||||||
|
||||||||
# Wait for dsub job to complegte | ||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||
dsub_command = dsub_command + " --wait" | ||||||||
print(f"Command: {dsub_command}") | ||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||
return dsub_command | ||||||||
|
||||||||
|
||||||||
# TODO consider a better default environment variable | ||||||||
# TODO environment currently a single string (e.g. 'TMPDIR=/OmicsIntegrator1'), should it be a list? | ||||||||
|
@@ -65,6 +139,8 @@ def run_container(framework: str, container_suffix: str, command: List[str], vol | |||||||
return run_container_docker(container, command, volumes, working_dir, environment) | ||||||||
elif normalized_framework == 'singularity': | ||||||||
return run_container_singularity(container, command, volumes, working_dir, environment) | ||||||||
elif normalized_framework == 'dsub': | ||||||||
return run_container_dsub(container, command, volumes, working_dir, environment) | ||||||||
else: | ||||||||
raise ValueError(f'{framework} is not a recognized container framework. Choose "docker" or "singularity".') | ||||||||
|
||||||||
|
@@ -258,3 +334,74 @@ def prepare_volume(filename: Union[str, PurePath], volume_base: Union[str, PureP | |||||||
src = parent | ||||||||
|
||||||||
return (src, dest), container_filename | ||||||||
|
||||||||
def run_container_dsub(container: str, command: List[str], volumes: List[Tuple[PurePath, PurePath]], working_dir: str, environment: str = 'SPRAS=True'): | ||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||
""" | ||||||||
Runs a command in the container using Docker. | ||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||
Attempts to automatically correct file owner and group for new files created by the container, setting them to the | ||||||||
current owner and group IDs. | ||||||||
Does not modify the owner or group for existing files modified by the container. | ||||||||
Comment on lines
+341
to
+343
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||
@param container: name of the container in the Google Cloud Container Registry | ||||||||
@param command: command to run in the container | ||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||
@param volumes: a list of volumes to mount where each item is a (source, destination) tuple | ||||||||
@param working_dir: the working directory in the container | ||||||||
@param environment: environment variables to set in the container | ||||||||
@return: path of output from dsub | ||||||||
""" | ||||||||
# Dictionary of flags for dsub command | ||||||||
flags = dict() | ||||||||
|
||||||||
workspace_bucket = os.getenv('WORKSPACE_BUCKET') | ||||||||
# Add path in the workspace bucket and label for dsub command for each volume | ||||||||
dsub_volumes = [(src, dst, workspace_bucket + str(dst), "INPUT_" + str(i),) for i, (src, dst) in enumerate(volumes)] | ||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||
|
||||||||
# Prepare command that will be run inside the container for dsub | ||||||||
container_command = list() | ||||||||
for item in command: | ||||||||
# Find if item is volume | ||||||||
to_replace = [(str(path[1]), "${"+path[3]+'}') for path in dsub_volumes if str(path[1]) in item] | ||||||||
# Replace volume path with dsub volume path | ||||||||
if len(to_replace) == 1: | ||||||||
# Get path that will be replaced | ||||||||
path = to_replace[0][0] | ||||||||
# Get dsub input variable that will replace path | ||||||||
env_variable = to_replace[0][1] | ||||||||
# Replace path with env_variable | ||||||||
container_path = item.replace(path, env_variable) | ||||||||
# Add / if there is no suffix | ||||||||
if container_path == env_variable: | ||||||||
container_path = container_path + '/' | ||||||||
container_command.append(container_path) | ||||||||
else: | ||||||||
container_command.append(item) | ||||||||
|
||||||||
# Add a command to copy the volumes to the workspace buckets | ||||||||
container_command.append(('; cp -rf ' + f'/mnt/data/input/gs/{workspace_bucket}{working_dir}/*' + ' $OUTPUT').replace('gs://', '')) | ||||||||
|
||||||||
# Make the command into a string | ||||||||
flags['command'] = ' '.join(container_command) | ||||||||
flags['command'] = "'" + flags['command'] + "'" | ||||||||
|
||||||||
## Push volumes to WORKSPACE_BUCKET | ||||||||
for src, _dst, gcs_path, _env in dsub_volumes: | ||||||||
upload_gcs(local_path=str(src), gcs_path=gcs_path, is_dir=True) | ||||||||
|
||||||||
## Prepare flags for dsub command | ||||||||
flags['image'] = container | ||||||||
flags['env'] = environment | ||||||||
flags['input-recursive'] = [vol[3]+'='+vol[2] for vol in dsub_volumes] | ||||||||
flags['output-recursive'] = "OUTPUT=" + workspace_bucket + working_dir | ||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||
flags['logging'] = workspace_bucket + '/dsub/' | ||||||||
|
||||||||
# Create dsub command | ||||||||
dsub_command = prepare_dsub_cmd(flags) | ||||||||
|
||||||||
# Run dsub as subprocess | ||||||||
subprocess.run(dsub_command, shell=True) | ||||||||
|
||||||||
# Pull output volumes from WORKSPACE_BUCKET | ||||||||
for src, _dst, gcs_path, _env in dsub_volumes: | ||||||||
download_gcs(local_path=str(src), gcs_path=gcs_path, is_dir=True) | ||||||||
|
||||||||
# return location of dsub logs in WORKSPACE_BUCKET | ||||||||
return 'dsub logs: {logs}'.format(logs = flags['logging']) | ||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Need to update the options in this line too