From 7a7e63c54f9bee3f6ae4eec874071e924a9dc741 Mon Sep 17 00:00:00 2001 From: Timothee Guerin Date: Tue, 1 May 2018 16:36:44 -0700 Subject: [PATCH] Feature: New Toolkit configuration (#507) --- .vscode/launch.json | 4 +- CHANGELOG.md | 7 ++ aztk/client.py | 5 +- aztk/internal/configuration_base.py | 4 +- aztk/models/__init__.py | 1 + aztk/models/models.py | 35 ++++-- aztk/models/toolkit.py | 115 ++++++++++++++++++ aztk/node_scripts/setup_host.sh | 1 + aztk/spark/client.py | 13 +- aztk/spark/models/models.py | 41 ++++--- aztk/utils/__init__.py | 1 + aztk/utils/constants.py | 1 + aztk/utils/deprecation.py | 53 ++++++++ aztk/version.py | 12 +- aztk_cli/config.py | 9 +- aztk_cli/config/cluster.yaml | 18 ++- aztk_cli/config/job.yaml | 88 ++++++++------ aztk_cli/entrypoint.py | 14 ++- .../spark/endpoints/cluster/cluster_create.py | 3 +- aztk_cli/spark/endpoints/init.py | 27 ++-- aztk_cli/spark/endpoints/job/submit.py | 2 +- aztk_cli/toolkit.py | 61 ++++++++++ aztk_cli/utils.py | 10 +- docs/13-configuration.md | 15 ++- docs/70-jobs.md | 4 +- requirements.txt | 2 +- tests/models/test_toolkit.py | 72 +++++++++++ 27 files changed, 512 insertions(+), 106 deletions(-) create mode 100644 aztk/models/toolkit.py create mode 100644 aztk/utils/deprecation.py create mode 100644 aztk_cli/toolkit.py create mode 100644 tests/models/test_toolkit.py diff --git a/.vscode/launch.json b/.vscode/launch.json index 6baa77f5..6ea3a340 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -10,7 +10,7 @@ "request": "launch", "stopOnEntry": false, "pythonPath": "${config:python.pythonPath}", - "program": "${workspaceFolder}/cli/entrypoint.py", + "program": "${workspaceFolder}/aztk_cli/entrypoint.py", "cwd": "${workspaceFolder}", "console": "integratedTerminal", "args": [ @@ -28,7 +28,7 @@ "request": "launch", "stopOnEntry": false, "pythonPath": "${config:python.pythonPath}", - "program": "${workspaceFolder}/cli/entrypoint.py", + "program": "${workspaceFolder}/aztk_cli/entrypoint.py", "console": "integratedTerminal", "cwd": "${workspaceFolder}", "args": [ diff --git a/CHANGELOG.md b/CHANGELOG.md index a872c199..a481b371 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,12 @@ + # Changelog +## Next + +**Breaking changes** + +- Moved `docker_repo` under a new `toolkit` key. 
`docker_repo` is now optional; omit it to use the default Docker images.
+
 ## 0.6.0 Mixed Mode, Cluster Run & Copy
 
 **Features:**
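
To make the breaking change concrete, here is a minimal before/after sketch of a `.aztk/cluster.yaml` (values are illustrative; the full template ships in `aztk_cli/config/cluster.yaml` further down this patch):

    # Before (0.6.x): the image was pinned directly
    docker_repo: aztk/spark:v0.1.0-spark2.2.0-base

    # After: describe the toolkit and let aztk pick the image
    toolkit:
      software: spark
      version: 2.2.0
      # environment: miniconda        # optional
      # docker_repo: my/spark:custom  # still supported for custom images
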
diff --git a/aztk/client.py b/aztk/client.py
index b3a96c15..94231bf6 100644
--- a/aztk/client.py
+++ b/aztk/client.py
@@ -1,11 +1,7 @@
 import asyncio
 import concurrent.futures
-import sys
-import yaml
-from concurrent.futures import ThreadPoolExecutor
 from datetime import datetime, timedelta, timezone
-import aztk.models as models
 import azure.batch.models as batch_models
 import azure.batch.models.batch_error as batch_error
 import aztk.utils.azure_api as azure_api
@@ -13,6 +9,7 @@ import aztk.utils.get_ssh_key as get_ssh_key
 import aztk.utils.helpers as helpers
 import aztk.utils.ssh as ssh_lib
+import aztk.models as models
 import azure.batch.models as batch_models
 from azure.batch.models import batch_error
 from Crypto.PublicKey import RSA
diff --git a/aztk/internal/configuration_base.py b/aztk/internal/configuration_base.py
index 8e3424fe..7b8e7766 100644
--- a/aztk/internal/configuration_base.py
+++ b/aztk/internal/configuration_base.py
@@ -1,5 +1,5 @@
 import yaml
-from aztk.error import AztkError
+from aztk.error import AztkError, InvalidModelError
 
 
 class ConfigurationBase:
     """
@@ -38,7 +38,7 @@ def valid(self):
     def _validate_required(self, attrs):
         for attr in attrs:
             if not getattr(self, attr):
-                raise AztkError("{0} missing {1}.".format(self.__class__.__name__, attr))
+                raise InvalidModelError("{0} missing {1}.".format(self.__class__.__name__, attr))
 
     def _merge_attributes(self, other, attrs):
         for attr in attrs:
diff --git a/aztk/models/__init__.py b/aztk/models/__init__.py
index aed4fa32..3fa7f261 100644
--- a/aztk/models/__init__.py
+++ b/aztk/models/__init__.py
@@ -1 +1,2 @@
+from .toolkit import Toolkit, TOOLKIT_MAP
 from .models import *
diff --git a/aztk/models/models.py b/aztk/models/models.py
index ebdbe11c..70d2f123 100644
--- a/aztk/models/models.py
+++ b/aztk/models/models.py
@@ -1,12 +1,11 @@
 import io
 from typing import List
-from aztk import error
-from aztk.utils import constants
 import azure.batch.models as batch_models
+from aztk import error
+from aztk.utils import helpers, deprecate
 from aztk.models.plugins import PluginConfiguration
 from aztk.internal import ConfigurationBase
-import yaml
-import logging
+from .toolkit import Toolkit
 
 
 class FileShare:
@@ -27,7 +26,7 @@ def __init__(self, name: str, payload: io.StringIO):
 
 
 class CustomScript:
-    def __init__(self, name: str = None, script = None, run_on=None):
+    def __init__(self, name: str = None, script=None, run_on=None):
         self.name = name
         self.script = script
         self.run_on = run_on
@@ -49,13 +48,20 @@ def merge(self, other):
             "password",
         ])
 
+    def validate(self):
+        pass
+
 
 class ClusterConfiguration(ConfigurationBase):
     """
     Cluster configuration model
+
+    Args:
+        toolkit (Toolkit): Toolkit used to pick the docker image for this cluster
     """
 
     def __init__(self,
+                 toolkit: Toolkit = None,
                  custom_scripts: List[CustomScript] = None,
                  file_shares: List[FileShare] = None,
                  cluster_id: str = None,
@@ -63,10 +69,10 @@ def __init__(self,
                 vm_low_pri_count=0,
                 vm_size=None,
                 subnet_id=None,
-                docker_repo: str = None,
                 plugins: List[PluginConfiguration] = None,
                 user_configuration: UserConfiguration = None):
         super().__init__()
+        self.toolkit = toolkit
         self.custom_scripts = custom_scripts
         self.file_shares = file_shares
         self.cluster_id = cluster_id
@@ -74,7 +80,6 @@
         self.vm_size = vm_size
         self.vm_low_pri_count = vm_low_pri_count
         self.subnet_id = subnet_id
-        self.docker_repo = docker_repo
         self.user_configuration = user_configuration
         self.plugins = plugins
 
@@ -85,12 +90,12 @@ def merge(self, other):
         """
         self._merge_attributes(other, [
+            "toolkit",
             "custom_scripts",
             "file_shares",
             "cluster_id",
             "vm_size",
             "subnet_id",
-            "docker_repo",
            "vm_count",
            "vm_low_pri_count",
            "plugins",
@@ -109,11 +114,23 @@ def merge(self, other):
     def mixed_mode(self) -> bool:
         return self.vm_count > 0 and self.vm_low_pri_count > 0
 
+    def gpu_enabled(self):
+        return helpers.is_gpu_enabled(self.vm_size)
+
+    def get_docker_repo(self):
+        return self.toolkit.get_docker_repo(self.gpu_enabled())
+
     def validate(self) -> bool:
         """
         Validate the config at its current state.
         Raises: Error if invalid
         """
+        if self.toolkit is None:
+            raise error.InvalidModelError(
+                "Please supply a toolkit for the cluster")
+
+        self.toolkit.validate()
 
         if self.cluster_id is None:
             raise error.AztkError(
@@ -135,7 +152,7 @@ def validate(self) -> bool:
             )
 
         if self.custom_scripts:
-            logging.warning("Custom scripts are DEPRECATED and will be removed in 0.8.0. Use plugins instead See https://aztk.readthedocs.io/en/latest/15-plugins.html")
+            deprecate("Custom scripts are DEPRECATED and will be removed in 0.8.0. Use plugins instead. See https://aztk.readthedocs.io/en/latest/15-plugins.html")
 
 
 class RemoteLogin:
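
With the model change above, image selection now flows through the toolkit rather than a raw `docker_repo` string. A minimal sketch of the new plumbing (assuming `ClusterConfiguration` is re-exported from `aztk.models` via the `from .models import *` above; id and vm values are illustrative):

    from aztk.models import ClusterConfiguration, Toolkit

    config = ClusterConfiguration(
        cluster_id="demo",
        vm_size="standard_a2",
        vm_count=2,
        toolkit=Toolkit(software="spark", version="2.2.0"),
    )

    config.validate()                # raises InvalidModelError when the toolkit is missing or invalid
    print(config.get_docker_repo())  # delegates to toolkit.get_docker_repo(config.gpu_enabled())
    # aztk/spark:v0.1.0-spark2.2.0-base
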
diff --git a/aztk/models/toolkit.py b/aztk/models/toolkit.py
new file mode 100644
index 00000000..9032f095
--- /dev/null
+++ b/aztk/models/toolkit.py
@@ -0,0 +1,115 @@
+from aztk.internal import ConfigurationBase
+from aztk.error import InvalidModelError
+from aztk.utils import constants, deprecate
+
+
+class ToolkitDefinition:
+    def __init__(self, versions, environments):
+        self.versions = versions
+        self.environments = environments
+
+
+class ToolkitEnvironmentDefinition:
+    def __init__(self, versions=None, default=""):
+        self.versions = versions or [""]
+        self.default = default
+
+
+TOOLKIT_MAP = dict(
+    spark=ToolkitDefinition(
+        versions=["1.6.3", "2.1.0", "2.2.0", "2.3.0"],
+        environments=dict(
+            base=ToolkitEnvironmentDefinition(),
+            r=ToolkitEnvironmentDefinition(),
+            miniconda=ToolkitEnvironmentDefinition(),
+            anaconda=ToolkitEnvironmentDefinition(),
+        )
+    ),
+)
+
+
+class Toolkit(ConfigurationBase):
+    """
+    Toolkit for a cluster.
+    This is used to pick the docker image needed.
+
+    Args:
+        software (str): Name of the toolkit (e.g. spark)
+        version (str): Version of the toolkit
+        environment (str): Which environment to use for this toolkit
+        environment_version (str): If there are multiple versions for an environment, which one to use
+    """
+
+    def __init__(self,
+                 software: str,
+                 version: str,
+                 environment: str = None,
+                 environment_version: str = None,
+                 docker_repo=None):
+
+        self.software = software
+        self.version = str(version)
+        self.environment = environment
+        self.environment_version = environment_version
+        self.docker_repo = docker_repo
+
+    def validate(self):
+        self._validate_required(["software", "version"])
+
+        if self.software not in TOOLKIT_MAP:
+            raise InvalidModelError("Toolkit '{0}' is not in the list of allowed toolkits {1}".format(
+                self.software, list(TOOLKIT_MAP.keys())))
+
+        toolkit_def = TOOLKIT_MAP[self.software]
+
+        if self.version not in toolkit_def.versions:
+            raise InvalidModelError("Toolkit '{0}' with version '{1}' is not available. Use one of: {2}".format(
+                self.software, self.version, toolkit_def.versions))
+        if self.version.startswith("1.6"):
+            deprecate("Spark version 1.6 is being deprecated for Aztk. Please use 2.1 and above.")
+
+        if self.environment:
+            if self.environment not in toolkit_def.environments:
+                raise InvalidModelError("Environment '{0}' for toolkit '{1}' is not available. Use one of: {2}".format(
+                    self.environment, self.software, list(toolkit_def.environments.keys())))
+
+            env_def = toolkit_def.environments[self.environment]
+
+            if self.environment_version and self.environment_version not in env_def.versions:
+                raise InvalidModelError(
+                    "Environment '{0}' version '{1}' for toolkit '{2}' is not available. Use one of: {3}".format(
+                        self.environment, self.environment_version, self.software, env_def.versions))
+
+    def get_docker_repo(self, gpu: bool):
+        if self.docker_repo:
+            return self.docker_repo
+
+        repo = "aztk/{0}".format(self.software)
+
+        return "{repo}:{tag}".format(
+            repo=repo,
+            tag=self._get_docker_tag(gpu),
+        )
+
+    def _get_docker_tag(self, gpu: bool):
+        environment = self.environment or "base"
+        environment_def = self._get_environment_definition()
+        environment_version = self.environment_version or (environment_def and environment_def.default)
+
+        array = [
+            "v{docker_image_version}".format(docker_image_version=constants.DOCKER_IMAGE_VERSION),
+            "{toolkit}{version}".format(toolkit=self.software, version=self.version),
+        ]
+        if self.environment:
+            array.append("{0}{1}".format(environment, environment_version))
+
+        array.append("gpu" if gpu else "base")
+
+        return '-'.join(array)
+
+    def _get_environment_definition(self) -> ToolkitEnvironmentDefinition:
+        toolkit = TOOLKIT_MAP.get(self.software)
+
+        if toolkit:
+            return toolkit.environments.get(self.environment or "base")
+        return None
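
The tag grammar implemented by `_get_docker_tag` is `v<image version>-<software><version>[-<environment><environment version>]-<base|gpu>`. A quick sketch of the repos it yields (expected strings match the unit tests at the end of this patch):

    from aztk.models import Toolkit

    Toolkit(software="spark", version="2.2.0").get_docker_repo(False)
    # 'aztk/spark:v0.1.0-spark2.2.0-base'

    Toolkit(software="spark", version="2.1.0").get_docker_repo(True)
    # 'aztk/spark:v0.1.0-spark2.1.0-gpu'

    Toolkit(software="spark", version="2.2.0", environment="miniconda").get_docker_repo(False)
    # 'aztk/spark:v0.1.0-spark2.2.0-miniconda-base'

    # An explicit docker_repo always wins over the computed tag:
    Toolkit(software="spark", version="2.2.0", docker_repo="me/spark:custom").get_docker_repo(False)
    # 'me/spark:custom'
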
diff --git a/aztk/node_scripts/setup_host.sh b/aztk/node_scripts/setup_host.sh
index 4bb93410..3228b931 100644
--- a/aztk/node_scripts/setup_host.sh
+++ b/aztk/node_scripts/setup_host.sh
@@ -3,6 +3,7 @@
 # Entry point for the start task. It will install all dependencies and start docker.
 # Usage:
 # setup_host.sh [container_name] [docker_repo_name]
+set -e
 
 export AZTK_WORKING_DIR=/mnt/batch/tasks/startup/wd
 export PYTHONUNBUFFERED=TRUE
diff --git a/aztk/spark/client.py b/aztk/spark/client.py
index 0d47b7c7..4557e17b 100644
--- a/aztk/spark/client.py
+++ b/aztk/spark/client.py
@@ -14,6 +14,10 @@
 from aztk.internal.cluster_data import NodeData
 
+DEFAULT_CLUSTER_CONFIG = models.ClusterConfiguration(
+    worker_on_master=True,
+)
+
 
 class Client(BaseClient):
     """
     Aztk Spark Client
@@ -25,7 +29,7 @@ class Client(BaseClient):
     def __init__(self, secrets_config):
         super().__init__(secrets_config)
 
-    def create_cluster(self, cluster_conf: models.ClusterConfiguration, wait: bool = False):
+    def create_cluster(self, configuration: models.ClusterConfiguration, wait: bool = False):
         """
         Create a new aztk spark cluster
 
@@ -36,6 +40,9 @@ def create_cluster(self, cluster_conf: models.ClusterConfiguration, wait: bool =
         Returns:
             aztk.spark.models.Cluster
         """
+        cluster_conf = models.ClusterConfiguration()
+        cluster_conf.merge(DEFAULT_CLUSTER_CONFIG)
+        cluster_conf.merge(configuration)
         cluster_conf.validate()
         cluster_data = self._get_cluster_data(cluster_conf.cluster_id)
         try:
@@ -47,7 +54,7 @@ def create_cluster(self, cluster_conf: models.ClusterConfiguration, wait: bool =
                 zip_resource_files,
                 cluster_conf.cluster_id,
                 cluster_conf.gpu_enabled(),
-                cluster_conf.docker_repo,
+                cluster_conf.get_docker_repo(),
                 cluster_conf.file_shares,
                 cluster_conf.plugins,
                 cluster_conf.mixed_mode(),
@@ -211,7 +218,7 @@ def submit_job(self, job_configuration):
                 zip_resource_files,
                 job_configuration.id,
                 job_configuration.gpu_enabled,
-                job_configuration.docker_repo,
+                job_configuration.get_docker_repo(),
                 mixed_mode=job_configuration.mixed_mode(),
                 worker_on_master=job_configuration.worker_on_master)
 
diff --git a/aztk/spark/models/models.py b/aztk/spark/models/models.py
index d0c20173..d234d753 100644
--- a/aztk/spark/models/models.py
+++ b/aztk/spark/models/models.py
@@ -1,10 +1,17 @@
-import io
-from Crypto.PublicKey import RSA
 from typing import List
+from Crypto.PublicKey import RSA
+import azure.batch.models as batch_models
 import aztk.models
 from aztk import error
 from aztk.utils import constants, helpers
-import azure.batch.models as batch_models
+
+
+class SparkToolkit(aztk.models.Toolkit):
+    def __init__(self, version: str, environment: str = None, environment_version: str = None):
+        super().__init__(
+            software="spark",
+            version=version,
+            environment=environment,
+            environment_version=environment_version,
+        )
 
 
 class Cluster(aztk.models.Cluster):
@@ -97,7 +104,7 @@ def __init__(
             vm_low_pri_count=0,
             vm_size=None,
             subnet_id=None,
-            docker_repo: str = None,
+            toolkit: SparkToolkit = None,
             user_configuration: UserConfiguration = None,
             spark_configuration: SparkConfiguration = None,
             worker_on_master: bool = None):
@@ -107,7 +114,7 @@ def __init__(
             vm_count=vm_count,
             vm_low_pri_count=vm_low_pri_count,
             vm_size=vm_size,
-            docker_repo=docker_repo,
+            toolkit=toolkit,
             subnet_id=subnet_id,
             file_shares=file_shares,
             user_configuration=user_configuration,
@@ -115,12 +122,9 @@ def __init__(
         self.spark_configuration = spark_configuration
         self.worker_on_master = worker_on_master
 
-    def gpu_enabled(self):
-        return helpers.is_gpu_enabled(self.vm_size)
-
     def merge(self, other):
         super().merge(other)
-        self._merge_attributes(other, ["worker_on_master"])
+        self._merge_attributes(other, ["spark_configuration", "worker_on_master"])
 
 
 class SecretsConfiguration(aztk.models.SecretsConfiguration):
@@ -206,7 +210,7 @@ def __init__(
             vm_size,
             custom_scripts=None,
             spark_configuration=None,
-            docker_repo=None,
+            toolkit=None,
             max_dedicated_nodes=0,
             max_low_pri_nodes=0,
             subnet_id=None,
@@ -217,7 +221,7 @@ def __init__(
         self.spark_configuration = spark_configuration
         self.vm_size = vm_size
         self.gpu_enabled = helpers.is_gpu_enabled(vm_size)
-        self.docker_repo = docker_repo
+        self.toolkit = toolkit
         self.max_dedicated_nodes = max_dedicated_nodes
         self.max_low_pri_nodes = max_low_pri_nodes
         self.subnet_id = subnet_id
@@ -225,9 +229,9 @@ def __init__(
 
     def to_cluster_config(self):
         return ClusterConfiguration(
-            cluster_id = self.id,
-            custom_scripts = self.custom_scripts,
-            docker_repo=self.docker_repo,
+            cluster_id=self.id,
+            custom_scripts=self.custom_scripts,
+            toolkit=self.toolkit,
             vm_size=self.vm_size,
             vm_count=self.max_dedicated_nodes,
             vm_low_pri_count=self.max_low_pri_nodes,
@@ -239,11 +243,20 @@ def to_cluster_config(self):
     def mixed_mode(self) -> bool:
         return self.max_dedicated_nodes > 0 and self.max_low_pri_nodes > 0
 
+    def get_docker_repo(self) -> str:
+        return self.toolkit.get_docker_repo(self.gpu_enabled)
+
     def validate(self) -> bool:
         """
         Validate the config at its current state.
         Raises: Error if invalid
         """
+        if self.toolkit is None:
+            raise error.InvalidModelError(
+                "Please supply a toolkit in the cluster configuration")
+
+        self.toolkit.validate()
+
         if self.id is None:
             raise error.AztkError("Please supply an ID for the Job in your configuration.")
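
Two things worth noting in the spark layer: `create_cluster` now merges the caller's configuration over `DEFAULT_CLUSTER_CONFIG` (so `worker_on_master` defaults to `True` unless overridden), and `SparkToolkit` is simply a `Toolkit` pinned to `software="spark"`. A sketch, assuming `aztk.spark.models` re-exports these classes (values illustrative):

    from aztk.spark import models

    conf = models.ClusterConfiguration(
        cluster_id="demo",
        vm_size="standard_a2",
        vm_count=2,
        toolkit=models.SparkToolkit(version="2.2.0", environment="miniconda"),
    )
    # client.create_cluster(conf) validates the merged result before submitting
    print(conf.toolkit.get_docker_repo(False))
    # aztk/spark:v0.1.0-spark2.2.0-miniconda-base
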
diff --git a/aztk/utils/__init__.py b/aztk/utils/__init__.py
index 11ed3ffb..295f5c40 100644
--- a/aztk/utils/__init__.py
+++ b/aztk/utils/__init__.py
@@ -1,3 +1,4 @@
+from .deprecation import deprecated, deprecate
 from . import azure_api
 from . import command_builder
 from . import constants
diff --git a/aztk/utils/constants.py b/aztk/utils/constants.py
index afca3777..55e69c03 100644
--- a/aztk/utils/constants.py
+++ b/aztk/utils/constants.py
@@ -2,6 +2,7 @@
 """
     DOCKER
 """
+DOCKER_IMAGE_VERSION = "0.1.0"
 DEFAULT_DOCKER_REPO = "aztk/spark:v0.1.0-spark2.3.0-base"
 DEFAULT_DOCKER_REPO_GPU = "aztk/spark:v0.1.0-spark2.3.0-gpu"
 DEFAULT_SPARK_PYTHON_DOCKER_REPO = "aztk/spark:v0.1.0-spark2.3.0-miniconda-base"
diff --git a/aztk/utils/deprecation.py b/aztk/utils/deprecation.py
new file mode 100644
index 00000000..5a7a2bbf
--- /dev/null
+++ b/aztk/utils/deprecation.py
@@ -0,0 +1,53 @@
+import warnings
+import functools
+import inspect
+import aztk.version as version
+
+
+def deprecated(reason: str = None):
+    """
+    This is a decorator which can be used to mark functions
+    as deprecated. It will result in a warning being emitted
+    when the function is used.
+
+    Args:
+        reason (str): Reason why this class or function is being deprecated
+    """
+
+    def decorator(func):
+        if inspect.isclass(func):
+            msg = "Call to deprecated class {name} ({reason})."
+        else:
+            msg = "Call to deprecated function {name} ({reason})."
+
+        @functools.wraps(func)
+        def new_func(*args, **kwargs):
+            deprecate(msg.format(name=func.__name__, reason=reason))
+            return func(*args, **kwargs)
+        return new_func
+
+    return decorator
+
+
+def deprecate(message: str):
+    """
+    Print a deprecation warning.
+
+    Args:
+        message (str): Message to print
+    """
+
+    deprecated_version = _get_deprecated_version()
+    warnings.simplefilter('always', DeprecationWarning)  # turn off filter
+    warnings.warn("{0} It will be removed in Aztk version {1}".format(message, deprecated_version),
+                  category=DeprecationWarning,
+                  stacklevel=2)
+    warnings.simplefilter('default', DeprecationWarning)  # reset filter
+
+
+def _get_deprecated_version():
+    """
+    Returns the next version where the deprecated functionality will be removed
+    """
+    if version.major == 0:
+        return "0.{minor}.0".format(minor=version.minor + 1)
+    return "{major}.0.0".format(major=version.major + 1)
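
A quick sketch of what the new helpers emit; the removal version is computed from `aztk.version`, so with the 0.7.x line below it reads 0.8.0:

    from aztk.utils import deprecated, deprecate

    @deprecated("use submit instead")
    def run():
        pass

    run()
    # DeprecationWarning: Call to deprecated function run (use submit instead).
    # It will be removed in Aztk version 0.8.0

    deprecate("Custom scripts are DEPRECATED.")
    # DeprecationWarning: Custom scripts are DEPRECATED. It will be removed in Aztk version 0.8.0
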
diff --git a/aztk/version.py b/aztk/version.py
index 31b82af4..32dd4af6 100644
--- a/aztk/version.py
+++ b/aztk/version.py
@@ -21,5 +21,15 @@
 # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
 # FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
 # DEALINGS IN THE SOFTWARE.
+major = 0
+minor = 7
+patch = 0
 
-__version__ = '0.7.0b20'
+suffix = 'b20'
+
+__version__ = "{major}.{minor}.{patch}{suffix}".format(
+    major=major,
+    minor=minor,
+    patch=patch,
+    suffix=suffix,
+)
diff --git a/aztk_cli/config.py b/aztk_cli/config.py
index 69650c58..2ee3cc17 100644
--- a/aztk_cli/config.py
+++ b/aztk_cli/config.py
@@ -10,6 +10,7 @@
     ClusterConfiguration,
     UserConfiguration,
 )
+from aztk.models import Toolkit
 from aztk.models.plugins.internal import PluginReference
 
 
 def load_aztk_secrets() -> SecretsConfiguration:
@@ -185,8 +186,8 @@ def cluster_config_from_dict(config: dict):
                 mount_path=file_share['mount_path'],
             ))
 
-    if config.get('docker_repo') is not None:
-        output.docker_repo = config['docker_repo']
+    if config.get('toolkit') is not None:
+        output.toolkit = Toolkit.from_dict(config['toolkit'])
 
     if config.get('plugins') not in [[None], None]:
         output.plugins = []
@@ -299,7 +300,7 @@ def __init__(self):
         self.custom_scripts = None
         self.spark_configuration = None
         self.vm_size = None
-        self.docker_repo = None
+        self.toolkit = None
         self.max_dedicated_nodes = 0
         self.max_low_pri_nodes = 0
         self.spark_defaults_conf = None
@@ -317,7 +318,8 @@ def _merge_dict(self, config):
         cluster_configuration = config.get('cluster_configuration')
         if cluster_configuration:
             self.vm_size = cluster_configuration.get('vm_size')
-            self.docker_repo = cluster_configuration.get('docker_repo')
+            if cluster_configuration.get('toolkit') is not None:
+                self.toolkit = Toolkit.from_dict(cluster_configuration.get('toolkit'))
             if cluster_configuration.get('size') is not None:
                 self.max_dedicated_nodes = cluster_configuration.get('size')
             if cluster_configuration.get('size_low_pri') is not None:
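
`Toolkit.from_dict` (inherited from `ConfigurationBase`; the usage here suggests it maps dict keys straight onto constructor arguments) is the bridge between the YAML templates below and the model, so the CLI path is roughly this sketch:

    from aztk.models import Toolkit

    toolkit = Toolkit.from_dict({
        "software": "spark",
        "version": "2.2.0",
        "environment": "miniconda",
    })
    toolkit.validate()
    print(toolkit.get_docker_repo(False))
    # aztk/spark:v0.1.0-spark2.2.0-miniconda-base
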
diff --git a/aztk_cli/config/cluster.yaml b/aztk_cli/config/cluster.yaml
index ad2b0283..0c1432a1 100644
--- a/aztk_cli/config/cluster.yaml
+++ b/aztk_cli/config/cluster.yaml
@@ -2,6 +2,19 @@
 # id:
 
+# Toolkit configuration [Required] You can use the `aztk toolkit` command to find the available toolkits
+toolkit:
+  software: spark
+  version: 2.2.0
+  # Which environment to use for spark: anaconda, r, miniconda
+  environment: {environment}
+  # Optional version for the environment
+  # environment_version:
+
+  # Optional docker repository (to bring your own custom docker image; just specify the toolkit software, version and environment if using the default images)
+  # docker_repo:
+
+
 # vm_size:
 vm_size: standard_a2
 
@@ -14,10 +27,7 @@
 size: 2
 
 # username: (optional)
 username: spark
 
-# docker_repo:
-docker_repo:
-
-# # optional custom scripts to run on the Spark master, Spark worker or all nodes in the cluster
+# **DEPRECATED** Use plugins instead
 # custom_scripts:
 #   - script:
 #     runOn:
diff --git a/aztk_cli/config/job.yaml b/aztk_cli/config/job.yaml
index ce0397ab..1e8ad8d3 100644
--- a/aztk_cli/config/job.yaml
+++ b/aztk_cli/config/job.yaml
@@ -4,16 +4,24 @@
 # For more information see the documentation at: https://github.com/Azure/aztk/blob/v0.7.0/docs/70-jobs.md
 
 job:
-  id: 
+  id:
   cluster_configuration:
     vm_size: standard_d2_v2
     size: 2
     size_low_pri: 0
-    subnet_id: 
-    docker_repo: # defaults to aztk/base:spark2.2.0
-    # custom_scripts:
-    #   - script:
-    #     runOn:
+    subnet_id:
+    # Toolkit configuration [Required]
+    toolkit:
+      software: spark
+      version: 2.2.0
+      # Which environment to use for spark: anaconda, r, miniconda
+      # environment:
+      # Optional version for the environment
+      # environment_version:
+
+      # Optional docker repository (to bring your own custom docker image; just specify the toolkit software, version and environment if using the default images)
+      # docker_repo:
 
   spark_configuration:
     spark_defaults_conf: .aztk/spark-defaults.conf
@@ -24,54 +32,54 @@ job:
   applications:
     - name: # unique ID
       application: # /path/to/application
-      application_args: 
-        - 
-      main_class: 
-      jars: 
-        - 
-      py_files: 
-        - 
+      application_args:
+        -
+      main_class:
+      jars:
+        -
+      py_files:
+        -
       files:
-        - 
-      driver_java_options: 
-      driver_library_path: 
-      driver_class_path: 
-      driver_memory: 
-      executor_memory: 
-      driver_cores: 
-      executor_cores: 
-
+        -
+      driver_java_options:
+      driver_library_path:
+      driver_class_path:
+      driver_memory:
+      executor_memory:
+      driver_cores:
+      executor_cores:
+
     - name: # unique ID
       application: # /path/to/application
-      application_args: 
-        - 
-      main_class: 
-      jars: 
-        - 
-      py_files: 
-        - 
+      application_args:
+        -
+      main_class:
+      jars:
+        -
+      py_files:
+        -
       files:
-        - 
-      driver_java_options: 
-      driver_library_path: 
-      driver_class_path: 
-      driver_memory: 
-      executor_memory: 
-      driver_cores: 
-      executor_cores: 
+        -
+      driver_java_options:
+      driver_library_path:
+      driver_class_path:
+      driver_memory:
+      executor_memory:
+      driver_cores:
+      executor_cores:
 
 # # Functioning example, execute from the root of the repo
 # job:
 #     cluster_configuration:
 #         vm_size: standard_f2
 #         size: 3
-
+
 # applications:
 #   - name: pipy100
 #     application: ./examples/src/main/python/pi.py
-#     application_args: 
+#     application_args:
 #       - 100
 #   - name: pipy200
 #     application: ./examples/src/main/python/pi.py
-#     application_args: 
+#     application_args:
 #       - 200
diff --git a/aztk_cli/entrypoint.py b/aztk_cli/entrypoint.py
index 1779f38a..ccc656fa 100644
--- a/aztk_cli/entrypoint.py
+++ b/aztk_cli/entrypoint.py
@@ -5,12 +5,20 @@
     pip install -e .
 """
 import argparse
+import warnings
 from typing import NamedTuple
 
 import azure.batch.models.batch_error as batch_error
 
 import aztk
 from aztk_cli import logger, log, utils, constants
 from aztk_cli.spark.endpoints import spark
-from . import plugins
+from . import plugins, toolkit
+
+
+# Makes sure the warnings are displayed nicely in the CLI without a stacktrace
+def _show_warn(message, *_args):
+    log.warning(message)
+
+
+warnings.showwarning = _show_warn
 
 
 def main():
     parser = argparse.ArgumentParser(prog=constants.CLI_EXE)
@@ -24,9 +32,12 @@ def main():
         "spark", help="Commands to run spark jobs")
     plugins_parser = subparsers.add_parser(
         "plugins", help="Commands to list and view plugins")
+    toolkit_parser = subparsers.add_parser(
+        "toolkit", help="List current toolkit information and browse available ones")
 
     spark.setup_parser(spark_parser)
     plugins.setup_parser(plugins_parser)
+    toolkit.setup_parser(toolkit_parser)
     args = parser.parse_args()
 
     parse_common_args(args)
@@ -58,6 +69,7 @@ def run_software(args: NamedTuple):
     softwares = {}
     softwares[aztk.models.Software.spark] = spark.execute
     softwares["plugins"] = plugins.execute
+    softwares["toolkit"] = toolkit.execute
     func = softwares[args.software]
     func(args)
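
Because `warnings.showwarning` is swapped out above, a `deprecate(...)` call triggered by, say, a legacy `custom_scripts` section surfaces as a single log line instead of a Python traceback. A standalone sketch of the mechanism (`print` stands in for the CLI's `log.warning`):

    import warnings

    def _show_warn(message, *_args):
        print("WARNING:", message)  # stand-in for aztk_cli's log.warning

    warnings.showwarning = _show_warn
    warnings.warn("Custom scripts are DEPRECATED.", DeprecationWarning)
    # WARNING: Custom scripts are DEPRECATED.
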
diff --git a/aztk_cli/spark/endpoints/cluster/cluster_create.py b/aztk_cli/spark/endpoints/cluster/cluster_create.py
index 061a25ab..348727be 100644
--- a/aztk_cli/spark/endpoints/cluster/cluster_create.py
+++ b/aztk_cli/spark/endpoints/cluster/cluster_create.py
@@ -49,8 +49,7 @@ def execute(args: typing.NamedTuple):
             user_configuration=UserConfiguration(
                 username=args.username,
                 password=args.password,
-            ),
-            docker_repo=args.docker_repo))
+            )))
 
     wait = wait if args.wait is None else args.wait
     user_configuration = cluster_conf.user_configuration
diff --git a/aztk_cli/spark/endpoints/init.py b/aztk_cli/spark/endpoints/init.py
index 04313553..851f6593 100644
--- a/aztk_cli/spark/endpoints/init.py
+++ b/aztk_cli/spark/endpoints/init.py
@@ -2,7 +2,7 @@
 import os
 import typing
 from distutils.dir_util import copy_tree
-
+from aztk_cli import log
 import aztk.utils.constants as constants
 
 
@@ -10,7 +10,8 @@ def setup_parser(parser: argparse.ArgumentParser):
     parser.add_argument('--global', dest='global_flag', action='store_true',
                         help="Create a .aztk/ folder in your home directory for global configurations.")
     software_parser = parser.add_mutually_exclusive_group()
-    software_parser.add_argument('--python', action="store_true", required=False)
+    software_parser.add_argument('--miniconda', action="store_true", required=False)
+    software_parser.add_argument('--anaconda', action="store_true", required=False)
     software_parser.add_argument('--r', '--R', action="store_true", required=False)
     software_parser.add_argument('--java', action="store_true", required=False)
     software_parser.add_argument('--scala', action="store_true", required=False)
@@ -18,23 +19,29 @@ def setup_parser(parser: argparse.ArgumentParser):
 
 def execute(args: typing.NamedTuple):
     # software_specific init
-    if args.python:
-        docker_repo = constants.DEFAULT_SPARK_PYTHON_DOCKER_REPO
+    if args.miniconda:
+        environment = "miniconda"
+    elif args.anaconda:
+        environment = "anaconda"
     elif args.r:
-        docker_repo = constants.DEFAULT_SPARK_R_BASE_DOCKER_REPO
+        environment = "r"
     else:
-        docker_repo = constants.DEFAULT_DOCKER_REPO
+        environment = ""
 
     if args.global_flag:
-        create_directory(constants.GLOBAL_INIT_DIRECTORY_DEST, docker_repo)
+        create_directory(constants.GLOBAL_INIT_DIRECTORY_DEST, environment)
     else:
-        create_directory(constants.LOCAL_INIT_DIRECTORY_DEST, docker_repo)
+        create_directory(constants.LOCAL_INIT_DIRECTORY_DEST, environment)
 
 
-def create_directory(dest_path: str, docker_repo: str):
+def create_directory(dest_path: str, environment: str):
     config_src_path = constants.INIT_DIRECTORY_SOURCE
     config_dest_path = dest_path
 
+    if os.path.isdir(config_dest_path):
+        log.warning("This directory has already been initialized.")
+        return
+
     copy_tree(config_src_path, config_dest_path, update=1)
 
     secrets_template_path = os.path.join(dest_path, 'secrets.yaml.template')
@@ -51,6 +58,6 @@ def create_directory(dest_path: str, docker_repo: str):
     if os.path.isfile(cluster_path):
         with open(cluster_path, 'r', encoding='UTF-8') as stream:
             cluster_yaml = stream.read()
-        cluster_yaml = cluster_yaml.replace("docker_repo: \n", "docker_repo: {}\n".format(docker_repo))
+        cluster_yaml = cluster_yaml.replace("{environment}", environment)
         with open(cluster_path, 'w', encoding='UTF-8') as file:
             file.write(cluster_yaml)
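
End to end, `init` now pre-fills the toolkit environment instead of pinning a docker repo. A sketch of the flow (assuming the local destination is `./.aztk`):

    $ aztk spark init --miniconda
    # .aztk/cluster.yaml now starts with (comments elided):
    toolkit:
      software: spark
      version: 2.2.0
      environment: miniconda
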
diff --git a/aztk_cli/spark/endpoints/job/submit.py b/aztk_cli/spark/endpoints/job/submit.py
index c0b8999a..2cd0a3c6 100644
--- a/aztk_cli/spark/endpoints/job/submit.py
+++ b/aztk_cli/spark/endpoints/job/submit.py
@@ -40,7 +40,7 @@ def execute(args: typing.NamedTuple):
         custom_scripts=job_conf.custom_scripts,
         spark_configuration=spark_configuration,
         vm_size=job_conf.vm_size,
-        docker_repo=job_conf.docker_repo,
+        toolkit=job_conf.toolkit,
         max_dedicated_nodes=job_conf.max_dedicated_nodes,
         max_low_pri_nodes=job_conf.max_low_pri_nodes,
         subnet_id=job_conf.subnet_id,
diff --git a/aztk_cli/toolkit.py b/aztk_cli/toolkit.py
new file mode 100644
index 00000000..7d962675
--- /dev/null
+++ b/aztk_cli/toolkit.py
@@ -0,0 +1,61 @@
+import argparse
+import typing
+
+from aztk.models import TOOLKIT_MAP, Toolkit
+from aztk_cli import log
+
+
+def setup_parser(parser: argparse.ArgumentParser):
+    parser.add_argument('toolkit_software', nargs='?')
+    parser.add_argument('version', nargs='?')
+    parser.add_argument('environment', nargs='?')
+    parser.add_argument('--gpu', action='store_true')
+
+
+def execute(args: typing.NamedTuple):
+    if not args.toolkit_software:
+        return print_available_softwares()
+
+    if not validate_software(args.toolkit_software):
+        return None
+
+    if not args.version:
+        return print_available_software_version(args.toolkit_software)
+    if not args.environment:
+        print_available_environments(args.toolkit_software)
+
+    toolkit = Toolkit(
+        software=args.toolkit_software,
+        version=args.version,
+        environment=args.environment,
+    )
+
+    toolkit.validate()
+    log.info('Docker image picked for this toolkit: %s', toolkit.get_docker_repo(args.gpu))
+    return None
+
+
+def print_available_softwares():
+    log.info("Available toolkits: ")
+    for toolkit in TOOLKIT_MAP:
+        log.info("  - %s", toolkit)
+
+
+def validate_software(software: str):
+    if software not in TOOLKIT_MAP:
+        log.error("Software '%s' is not supported.", software)
+        print_available_softwares()
+        return False
+    return True
+
+
+def print_available_software_version(software: str):
+    toolkit_def = TOOLKIT_MAP.get(software)
+
+    log.info("Available versions for %s: ", software)
+    for version in toolkit_def.versions:
+        log.info("  - %s", version)
+
+
+def print_available_environments(software: str):
+    toolkit_def = TOOLKIT_MAP.get(software)
+
+    log.info("Available environments for %s: ", software)
+    for env in toolkit_def.environments:
+        log.info("  - %s", env)
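
A sketch of the resulting CLI session; the image names follow from `TOOLKIT_MAP` and `DOCKER_IMAGE_VERSION = "0.1.0"` above:

    $ aztk toolkit
    Available toolkits:
      - spark

    $ aztk toolkit spark
    Available versions for spark:
      - 1.6.3
      - 2.1.0
      - 2.2.0
      - 2.3.0

    $ aztk toolkit spark 2.2.0 miniconda --gpu
    Docker image picked for this toolkit: aztk/spark:v0.1.0-spark2.2.0-miniconda-gpu
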
%s", cluster_conf.cluster_id) - log.info("spark cluster size: %s", + log.info("cluster id: %s", cluster_conf.cluster_id) + log.info("cluster toolkit: %s %s", cluster_conf.toolkit.software, cluster_conf.toolkit.version) + log.info("cluster size: %s", cluster_conf.vm_count + cluster_conf.vm_low_pri_count) log.info("> dedicated: %s", cluster_conf.vm_count) log.info("> low priority: %s", cluster_conf.vm_low_pri_count) - log.info("spark cluster vm size: %s", cluster_conf.vm_size) + log.info("cluster vm size: %s", cluster_conf.vm_size) log.info("custom scripts: %s", len(cluster_conf.custom_scripts) if cluster_conf.custom_scripts else 0) log.info("subnet ID: %s", cluster_conf.subnet_id) log.info("file shares: %s", len(cluster_conf.file_shares) if cluster_conf.file_shares is not None else 0) - log.info("docker repo name: %s", cluster_conf.docker_repo) + log.info("gpu enabled: %s", str(cluster_conf.gpu_enabled())) + log.info("docker repo name: %s", cluster_conf.get_docker_repo()) log.info("wait for cluster: %s", wait) log.info("username: %s", user_configuration.username) if user_configuration.password: diff --git a/docs/13-configuration.md b/docs/13-configuration.md index 28e8a6d7..51fa28d9 100644 --- a/docs/13-configuration.md +++ b/docs/13-configuration.md @@ -11,6 +11,18 @@ This is the default cluster configuration: # id: id: spark_cluster +# Toolkit configuration [Required] You can use `aztk toolkit` command to find which are the available tookits +toolkit: + software: spark + version: 2.2 + # environment: python + # Optional version for the environment + # environment_version: + + # Optional docker repository(To bring your custom docker image. Just specify the Toolkit software, version and environemnt if using default images) + # docker_repo: + + # vm_size: vm_size: standard_a2 @@ -22,9 +34,6 @@ size: 2 # username: (optional) username: spark -# docker_repo: -docker_repo: aztk/base:v0.1.0-spark2.3.0-base - # Enable plugins plugins: # - name: spark_ui_proxy diff --git a/docs/70-jobs.md b/docs/70-jobs.md index 00f07ff6..2400c614 100644 --- a/docs/70-jobs.md +++ b/docs/70-jobs.md @@ -42,7 +42,9 @@ Jobs also require a definition of the cluster on which the Applications will run cluster_configuration: vm_size: size: - docker_repo: + toolkit: + software: spark + version: 2.2 subnet_id: custom_scripts: - List diff --git a/requirements.txt b/requirements.txt index a066369a..2a52ccd8 100644 --- a/requirements.txt +++ b/requirements.txt @@ -9,7 +9,7 @@ paramiko==2.4.0 # Development yapf==0.20.1 -pylint==1.8.2 +pylint==1.8.4 pytest==3.1.3 pytest-xdist==1.22.0 twine==1.11.0 diff --git a/tests/models/test_toolkit.py b/tests/models/test_toolkit.py new file mode 100644 index 00000000..76c9d88a --- /dev/null +++ b/tests/models/test_toolkit.py @@ -0,0 +1,72 @@ +import pytest + +from aztk.error import InvalidModelError +from aztk.models import Toolkit +from aztk.utils import constants + +docker_image_version = constants.DOCKER_IMAGE_VERSION + +def test_basic_toolkit(): + toolkit = Toolkit(software="spark", version="2.2.0") + assert toolkit.software == "spark" + assert toolkit.version == "2.2.0" + +def test_environment(): + toolkit = Toolkit(software="spark", version="2.2.0", environment="miniconda") + assert toolkit.software == "spark" + assert toolkit.version == "2.2.0" + assert toolkit.environment == "miniconda" + +# Test validation +def test_valid_software_and_version(): + Toolkit(software="spark", version="2.2.0").validate() + +def test_valid_software_version_and_environment(): + Toolkit(software="spark", 
version="2.2.0", environment="miniconda").validate() + +def test_missing_software_raise_error(): + with pytest.raises(InvalidModelError): + Toolkit(software=None, version="2.2.0").validate() + +def test_invalid_software_raise_error(): + with pytest.raises(InvalidModelError): + Toolkit(software="non-supported", version="2.2.0").validate() + +def test_missing_version_raise_error(): + with pytest.raises(InvalidModelError): + Toolkit(software="spark", version=None).validate() + +def test_invalid_version_raise_error(): + with pytest.raises(InvalidModelError): + Toolkit(software="spark", version="780.0").validate() + +def test_invalid_environment_raise_error(): + with pytest.raises(InvalidModelError): + Toolkit(software="spark", version="2.2.0", environment="dos").validate() + +def test_invalid_environment_version_raise_error(): + with pytest.raises(InvalidModelError): + Toolkit(software="spark", version="2.2.0", environment="miniconda", environment_version="7.1.9").validate() + + +## Test get docker image +def test_get_right_docker_repo(): + repo = Toolkit(software="spark", version="2.2.0").get_docker_repo(False) + + assert repo == "aztk/spark:v{0}-spark2.2.0-base".format(docker_image_version) + +def test_get_right_docker_repo_for_gpu(): + repo = Toolkit(software="spark", version="2.1.0").get_docker_repo(True) + + assert repo == "aztk/spark:v{0}-spark2.1.0-gpu".format(docker_image_version) + + +def test_get_right_docker_repo_with_env(): + repo = Toolkit(software="spark", version="2.2.0", environment="miniconda").get_docker_repo(False) + + assert repo == "aztk/spark:v{0}-spark2.2.0-miniconda-base".format(docker_image_version) + +def test_get_right_docker_repo_with_env_for_gpu(): + repo = Toolkit(software="spark", version="2.2.0", environment="miniconda").get_docker_repo(True) + + assert repo == "aztk/spark:v{0}-spark2.2.0-miniconda-gpu".format(docker_image_version)