Skip to content
This repository has been archived by the owner on Feb 3, 2021. It is now read-only.

Commit

Permalink
Feature: New Toolkit configuration (#507)
Browse files Browse the repository at this point in the history
  • Loading branch information
timotheeguerin authored May 1, 2018
1 parent 9bc7639 commit 7a7e63c
Show file tree
Hide file tree
Showing 27 changed files with 512 additions and 106 deletions.
4 changes: 2 additions & 2 deletions .vscode/launch.json
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
"request": "launch",
"stopOnEntry": false,
"pythonPath": "${config:python.pythonPath}",
"program": "${workspaceFolder}/cli/entrypoint.py",
"program": "${workspaceFolder}/aztk_cli/entrypoint.py",
"cwd": "${workspaceFolder}",
"console": "integratedTerminal",
"args": [
Expand All @@ -28,7 +28,7 @@
"request": "launch",
"stopOnEntry": false,
"pythonPath": "${config:python.pythonPath}",
"program": "${workspaceFolder}/cli/entrypoint.py",
"program": "${workspaceFolder}/aztk_cli/entrypoint.py",
"console": "integratedTerminal",
"cwd": "${workspaceFolder}",
"args": [
Expand Down
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@

# Changelog

## Next

**Breaking changes**

- Moved `docker_repo` under a new `toolkit` key. `docker_repo` is now optional if you want to use the default docker images

## 0.6.0 Mixed Mode, Cluster Run & Copy

**Features:**
Expand Down
5 changes: 1 addition & 4 deletions aztk/client.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,15 @@
import asyncio
import concurrent.futures
import sys
import yaml
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime, timedelta, timezone

import aztk.models as models
import azure.batch.models as batch_models
import azure.batch.models.batch_error as batch_error
import aztk.utils.azure_api as azure_api
import aztk.utils.constants as constants
import aztk.utils.get_ssh_key as get_ssh_key
import aztk.utils.helpers as helpers
import aztk.utils.ssh as ssh_lib
import aztk.models as models
import azure.batch.models as batch_models
from azure.batch.models import batch_error
from Crypto.PublicKey import RSA
Expand Down
4 changes: 2 additions & 2 deletions aztk/internal/configuration_base.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import yaml
from aztk.error import AztkError
from aztk.error import AztkError, InvalidModelError

class ConfigurationBase:
"""
Expand Down Expand Up @@ -38,7 +38,7 @@ def valid(self):
def _validate_required(self, attrs):
for attr in attrs:
if not getattr(self, attr):
raise AztkError("{0} missing {1}.".format(self.__class__.__name__, attr))
raise InvalidModelError("{0} missing {1}.".format(self.__class__.__name__, attr))

def _merge_attributes(self, other, attrs):
for attr in attrs:
Expand Down
1 change: 1 addition & 0 deletions aztk/models/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
from .toolkit import Toolkit, TOOLKIT_MAP
from .models import *
35 changes: 26 additions & 9 deletions aztk/models/models.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
import io
from typing import List
from aztk import error
from aztk.utils import constants
import azure.batch.models as batch_models
from aztk import error
from aztk.utils import helpers, deprecate
from aztk.models.plugins import PluginConfiguration
from aztk.internal import ConfigurationBase
import yaml
import logging
from .toolkit import Toolkit


class FileShare:
Expand All @@ -27,7 +26,7 @@ def __init__(self, name: str, payload: io.StringIO):


class CustomScript:
def __init__(self, name: str = None, script = None, run_on=None):
def __init__(self, name: str = None, script=None, run_on=None):
self.name = name
self.script = script
self.run_on = run_on
Expand All @@ -49,32 +48,38 @@ def merge(self, other):
"password",
])

def validate(self):
pass


class ClusterConfiguration(ConfigurationBase):
"""
Cluster configuration model
Args:
toolkit
"""

def __init__(self,
toolkit: Toolkit = None,
custom_scripts: List[CustomScript] = None,
file_shares: List[FileShare] = None,
cluster_id: str = None,
vm_count=0,
vm_low_pri_count=0,
vm_size=None,
subnet_id=None,
docker_repo: str = None,
plugins: List[PluginConfiguration] = None,
user_configuration: UserConfiguration = None):
super().__init__()
self.toolkit = toolkit
self.custom_scripts = custom_scripts
self.file_shares = file_shares
self.cluster_id = cluster_id
self.vm_count = vm_count
self.vm_size = vm_size
self.vm_low_pri_count = vm_low_pri_count
self.subnet_id = subnet_id
self.docker_repo = docker_repo
self.user_configuration = user_configuration
self.plugins = plugins

Expand All @@ -85,12 +90,12 @@ def merge(self, other):
"""

self._merge_attributes(other, [
"toolkit",
"custom_scripts",
"file_shares",
"cluster_id",
"vm_size",
"subnet_id",
"docker_repo",
"vm_count",
"vm_low_pri_count",
"plugins",
Expand All @@ -109,11 +114,23 @@ def merge(self, other):
def mixed_mode(self) -> bool:
return self.vm_count > 0 and self.vm_low_pri_count > 0


def gpu_enabled(self):
return helpers.is_gpu_enabled(self.vm_size)

def get_docker_repo(self):
return self.toolkit.get_docker_repo(self.gpu_enabled())

def validate(self) -> bool:
"""
Validate the config at its current state.
Raises: Error if invalid
"""
if self.toolkit is None:
raise error.InvalidModelError(
"Please supply a toolkit for the cluster")

self.toolkit.validate()

if self.cluster_id is None:
raise error.AztkError(
Expand All @@ -135,7 +152,7 @@ def validate(self) -> bool:
)

if self.custom_scripts:
logging.warning("Custom scripts are DEPRECATED and will be removed in 0.8.0. Use plugins instead See https://aztk.readthedocs.io/en/latest/15-plugins.html")
deprecate("Custom scripts are DEPRECATED and will be removed in 0.8.0. Use plugins instead See https://aztk.readthedocs.io/en/latest/15-plugins.html")


class RemoteLogin:
Expand Down
115 changes: 115 additions & 0 deletions aztk/models/toolkit.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
from aztk.internal import ConfigurationBase
from aztk.error import InvalidModelError
from aztk.utils import constants, deprecate


class ToolkitDefinition:
def __init__(self, versions, environments):
self.versions = versions
self.environments = environments

class ToolkitEnvironmentDefinition:
def __init__(self, versions=None, default=""):
self.versions = versions or [""]
self.default = default

TOOLKIT_MAP = dict(
spark=ToolkitDefinition(
versions=["1.6.3", "2.1.0", "2.2.0", "2.3.0"],
environments=dict(
base=ToolkitEnvironmentDefinition(),
r=ToolkitEnvironmentDefinition(),
miniconda=ToolkitEnvironmentDefinition(),
anaconda=ToolkitEnvironmentDefinition(),
)
),
)


class Toolkit(ConfigurationBase):
"""
Toolkit for a cluster.
This will help pick the docker image needed
Args:
software (str): Name of the toolkit(spark)
version (str): Version of the toolkit
environment (str): Which environment to use for this toolkit
environment_version (str): If there is multiple version for an environment you can specify which one
"""
def __init__(self,
software: str,
version: str,
environment: str = None,
environment_version: str = None,
docker_repo=None):

self.software = software
self.version = str(version)
self.environment = environment
self.environment_version = environment_version
self.docker_repo = docker_repo


def validate(self):
self._validate_required(["software", "version"])

if self.software not in TOOLKIT_MAP:
raise InvalidModelError("Toolkit '{0}' is not in the list of allowed toolkits {1}".format(
self.software, list(TOOLKIT_MAP.keys())))

toolkit_def = TOOLKIT_MAP[self.software]

if self.version not in toolkit_def.versions:
raise InvalidModelError("Toolkit '{0}' with version '{1}' is not available. Use one of: {2}".format(
self.software, self.version, toolkit_def.versions))
if self.version == "1.6":
deprecate("Spark version 1.6 is being deprecated for Aztk. Please use 2.1 and above.")

if self.environment:
if self.environment not in toolkit_def.environments:
raise InvalidModelError("Environment '{0}' for toolkit '{1}' is not available. Use one of: {2}".format(
self.environment, self.software, list(toolkit_def.environments.keys())))

env_def = toolkit_def.environments[self.environment]

if self.environment_version and self.environment_version not in env_def.versions:
raise InvalidModelError(
"Environment '{0}' version '{1}' for toolkit '{2}' is not available. Use one of: {3}".format(
self.environment, self.environment_version, self.software, env_def.versions))


def get_docker_repo(self, gpu: bool):
if self.docker_repo:
return self.docker_repo

repo = "aztk/{0}".format(self.software)

return "{repo}:{tag}".format(
repo=repo,
tag=self._get_docker_tag(gpu),
)

def _get_docker_tag(self, gpu: bool):
environment = self.environment or "base"
environment_def = self._get_environent_definition()
environment_version = self.environment_version or (environment_def and environment_def.default)

array = [
"v{docker_image_version}".format(docker_image_version=constants.DOCKER_IMAGE_VERSION),
"{toolkit}{version}".format(toolkit=self.software, version=self.version),
]
if self.environment:
array.append("{0}{1}".format(environment, environment_version))

array.append("gpu" if gpu else "base")

return '-'.join(array)


def _get_environent_definition(self) -> ToolkitEnvironmentDefinition:
toolkit = TOOLKIT_MAP.get(self.software)

if toolkit:
return toolkit.environments.get(self.environment or "base")
return None
1 change: 1 addition & 0 deletions aztk/node_scripts/setup_host.sh
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
# Entry point for the start task. It will install all dependencies and start docker.
# Usage:
# setup_host.sh [container_name] [docker_repo_name]
set -e

export AZTK_WORKING_DIR=/mnt/batch/tasks/startup/wd
export PYTHONUNBUFFERED=TRUE
Expand Down
13 changes: 10 additions & 3 deletions aztk/spark/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@
from aztk.internal.cluster_data import NodeData


DEFAULT_CLUSTER_CONFIG = models.ClusterConfiguration(
worker_on_master=True,
)

class Client(BaseClient):
"""
Aztk Spark Client
Expand All @@ -25,7 +29,7 @@ class Client(BaseClient):
def __init__(self, secrets_config):
super().__init__(secrets_config)

def create_cluster(self, cluster_conf: models.ClusterConfiguration, wait: bool = False):
def create_cluster(self, configuration: models.ClusterConfiguration, wait: bool = False):
"""
Create a new aztk spark cluster
Expand All @@ -36,6 +40,9 @@ def create_cluster(self, cluster_conf: models.ClusterConfiguration, wait: bool =
Returns:
aztk.spark.models.Cluster
"""
cluster_conf = models.ClusterConfiguration()
cluster_conf.merge(DEFAULT_CLUSTER_CONFIG)
cluster_conf.merge(configuration)
cluster_conf.validate()
cluster_data = self._get_cluster_data(cluster_conf.cluster_id)
try:
Expand All @@ -47,7 +54,7 @@ def create_cluster(self, cluster_conf: models.ClusterConfiguration, wait: bool =
zip_resource_files,
cluster_conf.cluster_id,
cluster_conf.gpu_enabled(),
cluster_conf.docker_repo,
cluster_conf.get_docker_repo(),
cluster_conf.file_shares,
cluster_conf.plugins,
cluster_conf.mixed_mode(),
Expand Down Expand Up @@ -211,7 +218,7 @@ def submit_job(self, job_configuration):
zip_resource_files,
job_configuration.id,
job_configuration.gpu_enabled,
job_configuration.docker_repo,
job_configuration.get_docker_repo(),
mixed_mode=job_configuration.mixed_mode(),
worker_on_master=job_configuration.worker_on_master)

Expand Down
Loading

0 comments on commit 7a7e63c

Please sign in to comment.