From b591c23c31266d069b1c9f279b0e995e083a41b7 Mon Sep 17 00:00:00 2001 From: Satya Ortiz-Gagne Date: Tue, 17 Sep 2024 21:42:25 -0400 Subject: [PATCH] Add slurm system setup --- config/base.yaml | 7 +- config/cloud-multinodes-system.yaml | 13 + config/cloud-system.yaml | 24 + config/examples/cloud-multinodes-system.yaml | 10 + config/examples/cloud-system.yaml | 11 + docs/usage.rst | 89 +++- milabench/cli/cloud.py | 51 +- milabench/commands/__init__.py | 11 +- milabench/scripts/covalent/__main__.py | 463 ++++++++++++++---- milabench/scripts/covalent/covalent_bashrc.sh | 46 ++ .../scripts/covalent/milabench_bashrc.sh | 31 ++ .../scripts/covalent/python3/__main__.py | 20 + milabench/scripts/covalent/requirements.txt | 3 +- milabench/scripts/utils.py | 15 +- 14 files changed, 669 insertions(+), 125 deletions(-) create mode 100644 milabench/scripts/covalent/covalent_bashrc.sh create mode 100644 milabench/scripts/covalent/milabench_bashrc.sh create mode 100644 milabench/scripts/covalent/python3/__main__.py diff --git a/config/base.yaml b/config/base.yaml index f15333458..2509a5132 100644 --- a/config/base.yaml +++ b/config/base.yaml @@ -28,6 +28,7 @@ _torchvision: --loader: pytorch --data: "{milabench_data}/FakeImageNet" + _torchvision_ddp: inherits: _defaults definition: ../benchmarks/torchvision_ddp @@ -113,6 +114,7 @@ _timm: --dataset: "FakeImageNet" --workers: "auto({n_worker}, 8)" + _accelerate_opt: inherits: _defaults tags: @@ -149,6 +151,7 @@ _accelerate_opt: use_deepspeed: true num_machines: 1 + fp16: inherits: _flops @@ -388,6 +391,7 @@ brax: --num-minibatches: 32 --num-envs: 8192 + _diffusion: inherits: _defaults definition: ../benchmarks/diffusion @@ -530,11 +534,11 @@ _llm: definition: ../benchmarks/llm install_group: torch + llm-lora-single: inherits: _llm plan: method: per_gpu - argv: "{milabench_code}/recipes/lora_finetune_single_device.py": true --config: "{milabench_code}/configs/llama3_8B_lora_single_device.yaml" @@ -596,6 +600,7 @@ llm-lora-ddp-nodes: requires_capabilities: - "len(nodes) >= ${num_machines}" + llm-lora-mp-gpus: inherits: _llm plan: diff --git a/config/cloud-multinodes-system.yaml b/config/cloud-multinodes-system.yaml index 4f7fae391..96cf7c11d 100644 --- a/config/cloud-multinodes-system.yaml +++ b/config/cloud-multinodes-system.yaml @@ -38,3 +38,16 @@ system: size: Standard_NV72ads_A10_v5 location: eastus2 disk_size: 512 + slurm__a100_x2: + address: localhost + bashrc_path: "{bashrc_path}" + remote_workdir: "scratch/cov-{job_uuid}-workdir" + use_srun: null + options: + ntasks-per-node: 1 + gpus-per-task: a100l:2 + cpus-per-task: 12 + time: "3:0:0" + mem: 64000 + partition: short-unkillable + nodelist: cn-g[001-029] diff --git a/config/cloud-system.yaml b/config/cloud-system.yaml index 056ef3640..2bee1ef13 100644 --- a/config/cloud-system.yaml +++ b/config/cloud-system.yaml @@ -38,3 +38,27 @@ system: size: Standard_NV72ads_A10_v5 location: eastus2 disk_size: 512 + slurm__a100_x1: + address: localhost + bashrc_path: "{bashrc_path}" + remote_workdir: "scratch/cov-{job_uuid}-workdir" + use_srun: null + options: + ntasks-per-node: 1 + gpus-per-task: a100l:1 + cpus-per-task: 6 + time: "3:0:0" + mem: 32000 + partition: unkillable + slurm__a100_x4: + address: localhost + bashrc_path: "{bashrc_path}" + remote_workdir: "scratch/cov-{job_uuid}-workdir" + use_srun: null + options: + ntasks-per-node: 1 + gpus-per-task: a100l:4 + cpus-per-task: 24 + time: "3:0:0" + mem: 128000 + partition: short-unkillable diff --git a/config/examples/cloud-multinodes-system.yaml b/config/examples/cloud-multinodes-system.yaml index 5066af5eb..27f1e41b1 100644 --- a/config/examples/cloud-multinodes-system.yaml +++ b/config/examples/cloud-multinodes-system.yaml @@ -35,3 +35,13 @@ system: volume_size: 8 region: us-east-2 state_id: 71669879043a3864225aabb94f91a2d4 + slurm: + address: localhost + bashrc_path: "{bashrc_path}" + remote_workdir: "scratch/cov-{job_uuid}-workdir" + use_srun: null + options: + ntasks-per-node: 1 + cpus-per-task: 1 + time: "0:30:0" + mem: 1000 diff --git a/config/examples/cloud-system.yaml b/config/examples/cloud-system.yaml index b3d1f70aa..d3d3942cc 100644 --- a/config/examples/cloud-system.yaml +++ b/config/examples/cloud-system.yaml @@ -28,3 +28,14 @@ system: instance_type: t2.micro volume_size: 8 region: us-east-2 + slurm: + # covalent-slurm-plugin args + address: localhost + bashrc_path: "{bashrc_path}" + remote_workdir: "scratch/cov-{job_uuid}-workdir" + use_srun: null + options: + ntasks-per-node: 1 + cpus-per-task: 1 + time: "0:30:0" + mem: 1000 diff --git a/docs/usage.rst b/docs/usage.rst index b2a25d85d..4d7012e96 100644 --- a/docs/usage.rst +++ b/docs/usage.rst @@ -102,7 +102,7 @@ Create a cloud system configuration ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ Add a ``cloud_profiles`` section to the ``system`` configuration which lists the -supported cloud profiles. +supported cloud and slurm profiles. .. notes:: @@ -150,14 +150,95 @@ Run milabench on the cloud ^^^^^^^^^^^^^^^^^^^^^^^^^^ 1. | Initialize the cloud instances - | ``milabench cloud --system {{SYSTEM_CONFIG.YAML}} --setup --run-on {{PROFILE}} >{{SYSTEM_CLOUD_CONFIG.YAML}}`` + | ``milabench cloud --setup --system {{SYSTEM_CONFIG.YAML}} --run-on {{PROFILE}} >{{SYSTEM_CLOUD_CONFIG.YAML}}`` 2. | Prepare, install and run milabench | ``milabench [prepare|install|run] --system {{SYSTEM_CLOUD_CONFIG.YAML}}`` 3. | Destroy the cloud instances - | ``milabench teardown --system {{SYSTEM_CLOUD_CONFIG.YAML}} --run-on {{PROFILE}}`` + | ``milabench cloud --teardown --system {{SYSTEM_CLOUD_CONFIG.YAML}} --run-on {{PROFILE}}`` | or - | ``milabench teardown --system {{SYSTEM_CLOUD_CONFIG.YAML}} --run-on {{PLATFORM}} --all`` + | ``milabench cloud --teardown --system {{SYSTEM_CLOUD_CONFIG.YAML}} --run-on {{PLATFORM}} --all`` | to destroy not just a single cloud instance but all instances on a specified platform that were instanced from the current local machine + + +Use milabench on slurm +~~~~~~~~~~~~~~~~~~~~~~ + + +Create a slurm system configuration +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +Add a ``cloud_profiles`` section to the ``system`` configuration which lists the +supported cloud and slurm profiles. + +.. notes:: + + Nodes that should be created on the cloud should have the ``1.1.1.1`` ip + address placeholder. Other ip addresses will be used as-is and no cloud + instance will be created for that node + +.. notes:: + + A cloud profile entry needs to start with a covalent plugin (e.g. `slurm`). To + define multiple profiles on the same cloud platform, use the form + ``{PLATFORM}__{PROFILE_NAME}`` (e.g. ``slurm__profile``). All cloud profile + attributes will be used as is as argument for the target covalent plugin + +.. code-block:: yaml + + system: + nodes: + - name: manager + # Use 1.1.1.1 as an ip placeholder + ip: 1.1.1.1 + main: true + user: + - name: node1 + ip: 1.1.1.1 + main: false + user: + + # Cloud instances profiles + cloud_profiles: + # The cloud platform to use in the form of {PLATFORM} or + # {PLATFORM}__{PROFILE_NAME} + slurm: + username: usename + address: localhost + ssh_key_file: ssh_key_file + # bashrc_path will be replaced by the content of + # milabench/scripts/covalent/covalent_bashrc.sh + bashrc_path: "{bashrc_path}" + # job_uuid will be replaced by the generated job's uuid + remote_workdir: "cov-{job_uuid}-workdir" + use_srun: null + options: + ntasks-per-node: 1 + cpus-per-task: 1 + time: "0:30:0" + mem: 1000 + + +Run milabench on slurm +^^^^^^^^^^^^^^^^^^^^^^ + +1. | Initialize the slurm instances + | ``milabench cloud --setup --system {{SYSTEM_CONFIG.YAML}} --run-on {{PROFILE}} >{{SYSTEM_SLURM_CONFIG.YAML}}`` + +2. | Prepare, install and run milabench + | ``milabench [prepare|install|run] --system {{SYSTEM_SLURM_CONFIG.YAML}}`` + +3. | Destroy the slurm instances + | ``milabench cloud --teardown --system {{SYSTEM_SLURM_CONFIG.YAML}} --run-on {{PROFILE}}`` + +.. notes:: + + Because the milabench's path is expected to be the same on local machine and + the remote machine, it's currently necessary to run the commands from the + slurm cluster. As the ``milabench cloud --[setup|teardown]`` commands requires + a covalent server to run and to avoid overloading the login nodes resources, + it's preferable to request a cpu compute node which will host to the covalent + server. An allocation with minimal resources like ``--nodes 1 --cpus-per-task + 1 --mem 1000`` should be enough. diff --git a/milabench/cli/cloud.py b/milabench/cli/cloud.py index 859cdab87..1d89b69e4 100644 --- a/milabench/cli/cloud.py +++ b/milabench/cli/cloud.py @@ -60,29 +60,33 @@ def manage_cloud(pack, run_on, action="setup"): "private_ip":(lambda v: ("internal_ip",v)), "username":(lambda v: ("user",v)), "ssh_key_file":(lambda v: ("key",v)), - # "env":(lambda v: ("env",[".", v, ";", "conda", "activate", "milabench", "&&"])), + "env":(lambda v: ("env",[".", v, "milabench", "&&"])), + "slurm_job_id":(lambda v: ("slurm_job_id",v)), } - plan_params = deepcopy(pack.config["system"]["cloud_profiles"][run_on]) + plan_params = pack.config["system"]["cloud_profiles"][run_on] run_on, *profile = run_on.split("__") profile = profile[0] if profile else "" default_state_prefix = profile or run_on default_state_id = "_".join((pack.config["hash"][:6], blabla())) - local_base = pack.dirs.base.absolute() - local_data_dir = _get_common_dir(ROOT_FOLDER.parent, local_base.parent) - if local_data_dir is None: - local_data_dir = local_base.parent - remote_data_dir = XPath("/data") / local_data_dir.name + plan_params["state_prefix"] = plan_params.get("state_prefix", default_state_prefix) + plan_params["state_id"] = plan_params.get("state_id", default_state_id) + plan_params["keep_alive"] = None + + # local_base = pack.dirs.base.absolute() + # local_data_dir = _get_common_dir(ROOT_FOLDER.parent, local_base.parent) + # if local_data_dir is None: + # local_data_dir = local_base.parent + # remote_data_dir = XPath("/data") / local_data_dir.name + + plan_params_copy = deepcopy(plan_params) nodes = iter(enumerate(pack.config["system"]["nodes"])) for i, n in nodes: - if n["ip"] != "1.1.1.1": + if n["ip"] != "1.1.1.1" and action == _SETUP: continue - plan_params["state_prefix"] = plan_params.get("state_prefix", default_state_prefix) - plan_params["state_id"] = plan_params.get("state_id", default_state_id) - plan_params["cluster_size"] = max(len(pack.config["system"]["nodes"]), i + 1) - plan_params["keep_alive"] = None + plan_params_copy["cluster_size"] = max(len(pack.config["system"]["nodes"]), i + 1) import milabench.scripts.covalent as cv @@ -101,17 +105,17 @@ def manage_cloud(pack, run_on, action="setup"): "-m", cv.__name__, run_on, f"--{action}", - *_flatten_cli_args(**plan_params) + *_flatten_cli_args(**plan_params_copy) ] - if action == _SETUP: - cmd += [ - "--", - "bash", "-c", - _or_sudo(f"mkdir -p '{local_data_dir.parent}'") + - " && " + _or_sudo(f"chmod a+rwX '{local_data_dir.parent}'") + - f" && mkdir -p '{remote_data_dir}'" - f" && ln -sfT '{remote_data_dir}' '{local_data_dir}'" - ] + # if action == _SETUP: + # cmd += [ + # "--", + # "bash", "-c", + # _or_sudo(f"mkdir -p '{local_data_dir.parent}'") + + # " && " + _or_sudo(f"chmod a+rwX '{local_data_dir.parent}'") + + # f" && mkdir -p '{remote_data_dir}'" + # f" && ln -sfT '{remote_data_dir}' '{local_data_dir}'" + # ] p = subprocess.Popen( cmd, stdout=subprocess.PIPE, @@ -155,6 +159,9 @@ def manage_cloud(pack, run_on, action="setup"): stderr ) + if action == _TEARDOWN: + break + return pack.config["system"] diff --git a/milabench/commands/__init__.py b/milabench/commands/__init__.py index ad281b23b..fcd6ccbf4 100644 --- a/milabench/commands/__init__.py +++ b/milabench/commands/__init__.py @@ -450,6 +450,11 @@ def _find_node_config(self) -> Dict: return n return {} + def _load_env(self, node): + if node.get("env", None): + return node["env"] + return [] + def is_local(self): localnode = self.pack.config["system"]["self"] @@ -484,7 +489,7 @@ def _argv(self, **kwargs) -> List: argv.append(f"-p{self.port}") argv.append(host) - return argv # + ["env", "-i"] + return argv + self._load_env(node) class SCPCommand(SSHCommand, CmdCommand): @@ -505,6 +510,10 @@ def __init__( self.src = src self.dest = dest if dest is not None else self.src + def _load_env(self, node): + del node + return [] + def _argv(self, **kwargs) -> List: argv = super()._argv(**kwargs) diff --git a/milabench/scripts/covalent/__main__.py b/milabench/scripts/covalent/__main__.py index d4de9d932..5ea2661c7 100644 --- a/milabench/scripts/covalent/__main__.py +++ b/milabench/scripts/covalent/__main__.py @@ -1,9 +1,50 @@ import argparse +import ast import os import pathlib import subprocess import sys import tempfile +from time import sleep +import uuid + + +def _arg_pop(args:argparse.Namespace, key:str): + value = args.__getattribute__(key) + args.__delattr__(key) + return value + +ARGS_DEFAULT_SETUP = { + "slurm": { + "state_prefix": {}, + "state_id": {}, + "cluster_size": {}, + "keep_alive": {"action": "store_true"}, + } +} + +ARGS_MAP = { + "slurm": { + "state_prefix": lambda args, k:_arg_pop(args, k), + "state_id": lambda args, k:args.options.setdefault("job-name", _arg_pop(args, k)), + "cluster_size": lambda args, k:args.options.setdefault("nodes", _arg_pop(args, k)), + "keep_alive": lambda args, k:_arg_pop(args, k), + } +} + +_SETUP = {} + +_TEARDOWN = {} + +_CONNECTION_ATTRIBUTES = { + "hostname": None, + "username": None, + "ssh_key_file": None, + "private_ip": None, + "env": None, + "python_path": None, + "slurm_job_id": None +} def serve(*argv): @@ -19,69 +60,328 @@ def _get_executor_kwargs(args): } -def executor(executor_cls, args, *argv): +def _wait_for_any(*dispatch_ids): import covalent as ct - def _popen(cmd, *args, _env=None, **kwargs): - _env = _env if _env is not None else {} + dispatch_ids = set(dispatch_ids) + while True: + for dispatch_id in set(dispatch_ids): + status = ct.get_result( + dispatch_id=dispatch_id, + wait=False, + status_only=True + )["status"] + if status in [ct.status.COMPLETED]: + yield dispatch_id + dispatch_ids.remove(dispatch_id) + elif status in [ct.status.FAILED, ct.status.CANCELLED]: + raise RuntimeError(f"Job {dispatch_id} failed") + sleep(5) + + +def _format(lines:list, **template_kv): + for l in lines: + for k, v in template_kv.items(): + if "{{" + k + "}}" in l: + yield l[:l.find("{{")] + v + l[l.find("}}")+2:] + break + else: + yield l - for envvar in _env.keys(): - envvar_val = _env[envvar] - if not envvar_val: - continue +def _popen(cmd, *args, _env=None, **kwargs): + _env = _env if _env is not None else {} - envvar_val = pathlib.Path(envvar_val).expanduser() - if str(envvar_val) != _env[envvar]: - _env[envvar] = str(envvar_val) + for envvar in _env.keys(): + envvar_val = _env[envvar] - if "MILABENCH_CONFIG_CONTENT" in _env: - _config_dir = pathlib.Path(_env["MILABENCH_CONFIG"]).parent - with tempfile.NamedTemporaryFile("wt", dir=str(_config_dir), suffix=".yaml", delete=False) as _f: - _f.write(_env["MILABENCH_CONFIG_CONTENT"]) - _env["MILABENCH_CONFIG"] = _f.name + if not envvar_val: + continue - try: - cmd = (str(pathlib.Path(cmd[0]).expanduser()), *cmd[1:]) - except IndexError: - pass - - cwd = kwargs.pop("cwd", None) - if cwd is not None: - cwd = str(pathlib.Path(cwd).expanduser()) - kwargs["cwd"] = cwd - - _env = {**os.environ.copy(), **kwargs.pop("env", {}), **_env} - - kwargs = { - **kwargs, - "env": _env, - "stdout": subprocess.PIPE, - "stderr": subprocess.PIPE, - } - p = subprocess.Popen(cmd, *args, **kwargs) + envvar_val = pathlib.Path(envvar_val).expanduser() + if str(envvar_val) != _env[envvar]: + _env[envvar] = str(envvar_val) + + if "MILABENCH_CONFIG_CONTENT" in _env: + _config_dir = pathlib.Path(_env["MILABENCH_CONFIG"]).parent + with tempfile.NamedTemporaryFile("wt", dir=str(_config_dir), suffix=".yaml", delete=False) as _f: + _f.write(_env["MILABENCH_CONFIG_CONTENT"]) + _env["MILABENCH_CONFIG"] = _f.name + + try: + cmd = (str(pathlib.Path(cmd[0]).expanduser()), *cmd[1:]) + except IndexError: + pass + + cwd = kwargs.pop("cwd", None) + if cwd is not None: + cwd = str(pathlib.Path(cwd).expanduser()) + kwargs["cwd"] = cwd + + _env = {**os.environ.copy(), **kwargs.pop("env", {}), **_env} + + kwargs = { + **kwargs, + "env": _env, + "stdout": subprocess.PIPE, + "stderr": subprocess.PIPE, + } + p = subprocess.Popen(cmd, *args, **kwargs) + + stdout_chunks = [] + while True: + line = p.stdout.readline() + if not line: + break + line_str = line.decode("utf-8").strip() + stdout_chunks.append(line_str) + print(line_str) + + _, stderr = p.communicate() + stderr = stderr.decode("utf-8").strip() + stdout = os.linesep.join(stdout_chunks) + + if p.returncode != 0: + raise subprocess.CalledProcessError( + p.returncode, + (cmd, args, kwargs), + stdout, + stderr + ) + return p.returncode, stdout, stderr + + +def _setup_terraform(executor:"ct.executor.BaseExecutor"): + import covalent as ct + + result = ct.dispatch_sync( + ct.lattice(executor.get_connection_attributes) + )().result + + assert result and result[0] + + all_connection_attributes, _ = result + master_host:str = next(iter(all_connection_attributes)) + + if len(all_connection_attributes) > 1: + # Add master node to known host to avoid unknown host error The + # authenticity of host '[hostname] ([IP address])' can't be established. + new_host = subprocess.run( + ["ssh-keyscan", master_host], + stdout=subprocess.PIPE, + check=True + ).stdout.decode("utf8") + known_hosts = pathlib.Path("~/.ssh/known_hosts").expanduser() + with known_hosts.open("at") as _f: + _f.write(new_host) + + # Add ssh file to master node to allow connections to worker nodes + ssh_key_file = all_connection_attributes[master_host]["ssh_key_file"] + fn = pathlib.Path(ssh_key_file) + result = ct.dispatch_sync( + ct.lattice(executor.cp_to_remote) + )(f".ssh/{fn.name.split('.')[0]}", str(fn)) + + assert result.status == ct.status.COMPLETED + + return all_connection_attributes + + +def _teardown_terraform(executor:"ct.executor.BaseExecutor"): + result = executor.stop_cloud_instance().result + assert result is not None + + +def _slurm_executor(executor:"ct.executor.SlurmExecutor", job_uuid:uuid.UUID): + import covalent as ct + + _executor = ct.executor.SlurmExecutor() + _executor.from_dict(executor.to_dict()) + + executor = _executor + executor.conda_env = executor.conda_env or "covalent" + bashrc_path = f"""'' +{pathlib.Path(__file__).with_name("covalent_bashrc.sh").read_text()} +""" + bashrc_path = "\n".join( + _format( + bashrc_path.splitlines(), + conda_env=executor.conda_env, + python_version=f"{sys.version_info.major}.{sys.version_info.minor}", + covalent_version=ct.__version__, + ) + ) + executor.bashrc_path = executor.bashrc_path or "{bashrc_path}" + if "{bashrc_path}" in executor.bashrc_path: + executor.bashrc_path = executor.bashrc_path.format(bashrc_path=bashrc_path) + executor.remote_workdir = executor.remote_workdir or "cov-{job_uuid}-workdir" + executor.remote_workdir = executor.remote_workdir.format(job_uuid=job_uuid) + executor.options["job-name"] = ( + executor.options.get("job-name", None) or f"cov-{job_uuid}" + ) + + return executor - stdout_chunks = [] + +def _setup_slurm(executor:"ct.executor.SlurmExecutor"): + import covalent as ct + + job_uuid = uuid.uuid4() + job_file = f"covalent_job_{job_uuid}" + + _executor = ct.executor.SlurmExecutor() + _executor.from_dict(executor.to_dict()) + + executor = _slurm_executor(executor, job_uuid) + + job_connection_executor = ct.executor.SlurmExecutor() + job_connection_executor.from_dict(executor.to_dict()) + # Store job connection attributes + job_connection_executor.prerun_commands = f""" +# print connection attributes +printenv | + grep -E ".*SLURM.*NODENAME|.*SLURM.*JOB_ID" | + sort -u >>"{job_file}" && +srun printenv | + grep -E ".*SLURM.*NODENAME|.*SLURM.*JOB_ID" | + sort -u >>"{job_file}" && +echo "USERNAME=$USER" >>"{job_file}" && +echo "{job_uuid}" >>"{job_file}" +""".splitlines() + + query_executor = ct.executor.SlurmExecutor() + query_executor.from_dict(executor.to_dict()) + query_executor.options = { + "nodes": 1, + "cpus-per-task": 1, + "mem": 1000, + "job-name": executor.options["job-name"], + } + + @ct.electron() + def _empty(): + pass + + @ct.electron() + def _loop(): while True: - line = p.stdout.readline() - if not line: - break - line_str = line.decode("utf-8").strip() - stdout_chunks.append(line_str) - print(line_str) - - _, stderr = p.communicate() - stderr = stderr.decode("utf-8").strip() - stdout = os.linesep.join(stdout_chunks) - - if p.returncode != 0: - raise subprocess.CalledProcessError( - p.returncode, - (cmd, args, kwargs), - stdout, - stderr + sleep(60) + + @ct.electron() + def _query_connection_attributes(milabench_bashrc:str=""): + _job_file = pathlib.Path(job_file).expanduser() + _job_file.touch() + content = _job_file.read_text().splitlines() + while (not content or content[-1].strip() != f"{job_uuid}"): + sleep(5) + content = _job_file.read_text().splitlines() + + nodes = [] + connection_attributes = _CONNECTION_ATTRIBUTES.copy() + + milabench_bashrc = "\n".join( + _format( + milabench_bashrc.splitlines(), + milabench_env="cov-slurm-milabench", + python_version=f"{sys.version_info.major}.{sys.version_info.minor}", ) - return p.returncode, stdout, stderr + ) + if milabench_bashrc: + milabench_bashrc_file = _job_file.with_name("milabench_bashrc.sh").resolve() + milabench_bashrc_file.write_text(milabench_bashrc) + connection_attributes["env"] = str(milabench_bashrc_file) + + for l in _job_file.read_text().splitlines(): + try: + key, value = l.strip().split("=") + except ValueError: + # end flag + break + if "NODENAME" in key and value not in nodes: + nodes.append(value) + elif "USERNAME" in key: + connection_attributes["username"] = value + elif "JOB_ID" in key: + connection_attributes["slurm_job_id"] = value + + return { + hostname: { + **connection_attributes, + **{ + "hostname": hostname, + "private_ip": hostname, + }, + } + for hostname in nodes + } + + try: + # setup covalent for jobs + next(_wait_for_any(ct.dispatch(ct.lattice(_empty, executor=query_executor))())) + # setup nodes and retrieve connection attributes + job_dispatch_id = ct.dispatch( + ct.lattice( + lambda:_loop(), + executor=job_connection_executor + ), + disable_run=False + )() + query_dispatch_id = ct.dispatch( + ct.lattice( + _query_connection_attributes, + executor=query_executor + ), + disable_run=False + )( + milabench_bashrc=pathlib.Path(__file__).with_name("milabench_bashrc.sh").read_text() + ) + next(_wait_for_any(job_dispatch_id, query_dispatch_id)) + all_connection_attributes = ct.get_result( + dispatch_id=query_dispatch_id, + wait=False + ).result + + assert all_connection_attributes + + except: + _teardown_slurm(query_executor) + raise + + return all_connection_attributes + + +def _teardown_slurm(executor:"ct.executor.SlurmExecutor"): + import covalent as ct + + @ct.electron() + def _empty(): + pass + + assert executor.options["job-name"], "Jobs to teardown must have an explicit name" + + _exec = _slurm_executor(executor, "DELETE") + _exec.options = { + "nodes": 1, + "cpus-per-task": 1, + "mem": 1000, + "job-name": executor.options["job-name"], + } + _exec.prerun_commands = f""" +# cancel jobs +scancel --jobname="{_exec.options['job-name']}" +""".splitlines() + ct.dispatch_sync(ct.lattice(_empty, executor=_exec))() + + +def executor(executor_cls, args:argparse.Namespace, *argv): + import covalent as ct + + _SETUP[ct.executor.AzureExecutor] = _setup_terraform + _SETUP[ct.executor.EC2Executor] = _setup_terraform + _SETUP[ct.executor.SlurmExecutor] = _setup_slurm + _TEARDOWN[ct.executor.AzureExecutor] = _teardown_terraform + _TEARDOWN[ct.executor.EC2Executor] = _teardown_terraform + _TEARDOWN[ct.executor.SlurmExecutor] = _teardown_slurm executor:ct.executor.BaseExecutor = executor_cls( **_get_executor_kwargs(args), @@ -89,45 +389,13 @@ def _popen(cmd, *args, _env=None, **kwargs): return_code = 0 try: if args.setup: - result = ct.dispatch_sync( - ct.lattice(executor.get_connection_attributes) - )().result - - assert result and result[0] - - all_connection_attributes, _ = result - master_host:str = None - for hostname, connection_attributes in all_connection_attributes.items(): + for hostname, connection_attributes in _SETUP[executor_cls](executor).items(): print(f"hostname::>{hostname}") for attribute,value in connection_attributes.items(): - if attribute == "hostname": + if attribute == "hostname" or value is None: continue print(f"{attribute}::>{value}") - master_host = master_host or hostname - - if len(all_connection_attributes) > 1: - # Add master node to known host to avoid unknown host error - # The authenticity of host '[hostname] ([IP address])' can't be established. - new_host = subprocess.run( - ["ssh-keyscan", master_host], - stdout=subprocess.PIPE, - check=True - ).stdout.decode("utf8") - known_hosts = pathlib.Path("~/.ssh/known_hosts").expanduser() - with known_hosts.open("at") as _f: - _f.write(new_host) - - # Add ssh file to master node to allow connections to worker - # nodes - ssh_key_file = all_connection_attributes[master_host]["ssh_key_file"] - fn = pathlib.Path(ssh_key_file) - result = ct.dispatch_sync( - ct.lattice(executor.cp_to_remote) - )(f".ssh/{fn.name.split('.')[0]}", str(fn)) - - assert result.status == ct.status.COMPLETED - if argv: result = ct.dispatch_sync( ct.lattice(executor.list_running_instances) @@ -166,8 +434,7 @@ def _popen(cmd, *args, _env=None, **kwargs): finally: if args.teardown: - result = executor.stop_cloud_instance().result - assert result is not None + _TEARDOWN[executor_cls](executor) return return_code @@ -187,7 +454,7 @@ def main(argv=None): subparsers = parser.add_subparsers() subparser = subparsers.add_parser("serve") subparser.add_argument(f"argv", nargs=argparse.REMAINDER) - for p in ("azure","ec2"): + for p in ("azure", "ec2", "slurm"): try: config = ct.get_config(f"executors.{p}") except KeyError: @@ -201,10 +468,21 @@ def main(argv=None): add_argument_kwargs = {} if isinstance(default, bool): add_argument_kwargs["action"] = "store_false" if default else "store_true" + elif any(isinstance(default, t) for t in [dict, list]): + add_argument_kwargs["type"] = ast.literal_eval + add_argument_kwargs["default"] = str(default) else: add_argument_kwargs["default"] = default subparser.add_argument(f"--{param.replace('_', '-')}", **add_argument_kwargs) + for param, add_argument_kwargs in ARGS_DEFAULT_SETUP.get(p, {}).items(): + if param in config: + raise ValueError( + f"Found existing argument {param} in both {config} and" + f" {ARGS_DEFAULT_SETUP}" + ) + subparser.add_argument(f"--{param.replace('_', '-')}", **add_argument_kwargs) + try: cv_argv, argv = argv[:argv.index("--")], argv[argv.index("--")+1:] except ValueError: @@ -212,6 +490,9 @@ def main(argv=None): args = parser.parse_args(cv_argv) + for arg, _map in ARGS_MAP.get(cv_argv[0], {}).items(): + _map(args, arg) + if cv_argv[0] == "serve": assert not argv return serve(*args.argv) @@ -219,6 +500,8 @@ def main(argv=None): executor_cls = ct.executor.AzureExecutor elif cv_argv[0] == "ec2": executor_cls = ct.executor.EC2Executor + elif cv_argv[0] == "slurm": + executor_cls = ct.executor.SlurmExecutor else: raise diff --git a/milabench/scripts/covalent/covalent_bashrc.sh b/milabench/scripts/covalent/covalent_bashrc.sh new file mode 100644 index 000000000..197620c1c --- /dev/null +++ b/milabench/scripts/covalent/covalent_bashrc.sh @@ -0,0 +1,46 @@ +function _get_options { + set +o | cut -d' ' -f2- | while read set_option + do + echo "${set_option}" + done +} + +function _set_options { + while [[ $# -gt 0 ]] + do + local _arg="$1"; shift + case "${_arg}" in + -o) set -o "$1"; shift ;; + +o) set +o "$1"; shift ;; + -h | --help | *) + exit 1 + ;; + esac + done +} + +_options=$(_get_options) +set -o errexit -o pipefail + +_NAME="{{conda_env}}" +conda --version >&2 2>/dev/null || module load anaconda/3 +# {{python_version}} needs to be formatted +conda activate ${_NAME} || conda create -y -n ${_NAME} "python={{python_version}}" virtualenv +conda activate ${_NAME} + +function conda { + echo "DEACTIVATED conda" "$@" >&2 +} + +if [ ! -f venv/bin/activate ] +then + python3 -m virtualenv venv/ + . venv/bin/activate + # {{covalent_version}} needs to be formatted + python3 -m pip install "covalent=={{covalent_version}}" +else + . venv/bin/activate +fi + +_set_options $_options +unset _options diff --git a/milabench/scripts/covalent/milabench_bashrc.sh b/milabench/scripts/covalent/milabench_bashrc.sh new file mode 100644 index 000000000..1ee96d788 --- /dev/null +++ b/milabench/scripts/covalent/milabench_bashrc.sh @@ -0,0 +1,31 @@ +function _get_options { + set +o | cut -d' ' -f2- | while read set_option + do + echo "${set_option}" + done +} + +function _set_options { + while [[ $# -gt 0 ]] + do + local _arg="$1"; shift + case "${_arg}" in + -o) set -o "$1"; shift ;; + +o) set +o "$1"; shift ;; + -h | --help | *) + exit 1 + ;; + esac + done +} + +_options=$(_get_options) +set -o errexit -o pipefail + +_NAME="{{milabench_env}}" +conda --version >&2 2>/dev/null || module load anaconda/3 +conda activate ${_NAME} || conda create -y -n ${_NAME} "python={{python_version}}" virtualenv +conda activate ${_NAME} + +_set_options $_options +unset _options diff --git a/milabench/scripts/covalent/python3/__main__.py b/milabench/scripts/covalent/python3/__main__.py new file mode 100644 index 000000000..186d9a170 --- /dev/null +++ b/milabench/scripts/covalent/python3/__main__.py @@ -0,0 +1,20 @@ +#!/usr/bin/env python3 + +import subprocess +import sys + + +def main(argv=None): + if argv is None: + argv = sys.argv[1:] + + from ...utils import get_module_venv + from .. import __main__ + check_if_module = "import covalent" + python3, env = get_module_venv(__main__.__file__, check_if_module) + + return subprocess.call([python3, *argv], env=env) + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/milabench/scripts/covalent/requirements.txt b/milabench/scripts/covalent/requirements.txt index b70efc793..4b322dc3f 100644 --- a/milabench/scripts/covalent/requirements.txt +++ b/milabench/scripts/covalent/requirements.txt @@ -1,3 +1,4 @@ covalent==0.232 covalent-ec2-plugin @ git+https://github.com/satyaog/covalent-ec2-plugin.git@feature/milabench -covalent-azure-plugin @ git+https://github.com/satyaog/covalent-azure-plugin.git@feature/milabench \ No newline at end of file +covalent-azure-plugin @ git+https://github.com/satyaog/covalent-azure-plugin.git@feature/milabench +covalent-slurm-plugin diff --git a/milabench/scripts/utils.py b/milabench/scripts/utils.py index 5aec72d06..022481089 100644 --- a/milabench/scripts/utils.py +++ b/milabench/scripts/utils.py @@ -1,3 +1,4 @@ +import getpass import json import pathlib import subprocess @@ -16,9 +17,9 @@ def get_venv(venv:pathlib.Path) -> dict: return json.loads(env) -def run_in_module_venv(module_main:str, check_if_module:str, argv:list=None): +def get_module_venv(module_main:str, check_if_module:str): module = pathlib.Path(module_main).resolve().parent - cache_dir = pathlib.Path(f"/tmp/milabench/{module.name}_venv") + cache_dir = pathlib.Path(f"/tmp/{getpass.getuser()}/milabench/{module.name}_venv") python3 = str(cache_dir / "bin/python3") try: subprocess.run([python3, "-c", check_if_module], check=True, @@ -38,7 +39,9 @@ def run_in_module_venv(module_main:str, check_if_module:str, argv:list=None): str(module / "requirements.txt") ], stdout=sys.stderr, check=True) subprocess.run([python3, "-c", check_if_module], check=True, stdout=sys.stderr) - return subprocess.call( - [python3, module_main, *argv], - env=get_venv(cache_dir) - ) \ No newline at end of file + return python3, get_venv(cache_dir) + + +def run_in_module_venv(module_main:str, check_if_module:str, argv:list=None): + python3, env = get_module_venv(module_main, check_if_module) + return subprocess.call([python3, module_main, *argv], env=env) \ No newline at end of file