diff --git a/README.md b/README.md
index ecfdf15b..4b9bed8d 100644
--- a/README.md
+++ b/README.md
@@ -14,6 +14,12 @@ Currently, this toolkit is designed to run batch Spark jobs that require additio
 ## Setup
 1. Clone the repo
+```bash
+    git clone -b stable https://www.github.com/azure/aztk
+
+    # You can also clone directly from master to get the latest bits
+    git clone https://www.github.com/azure/aztk
+```
 2. Use pip to install required packages (requires python 3.5+ and pip 9.0.1+)
 ```bash
 pip install -r requirements.txt
diff --git a/aztk/aztklib.py b/aztk/aztklib.py
new file mode 100644
index 00000000..df345806
--- /dev/null
+++ b/aztk/aztklib.py
@@ -0,0 +1,37 @@
+from aztk.clusterlib import Cluster
+from aztk.joblib import Job
+import aztk.azure_api as azure_api
+import aztk.config as config
+import aztk.error as error
+
+
+class Aztk:
+    def __init__(self):
+        secrets_config = config.SecretsConfig()
+        secrets_config.load_secrets_config()
+
+        blob_config = azure_api.BlobConfig(
+            account_key=secrets_config.storage_account_key,
+            account_name=secrets_config.storage_account_name,
+            account_suffix=secrets_config.storage_account_suffix
+        )
+        batch_config = azure_api.BatchConfig(
+            account_key=secrets_config.batch_account_key,
+            account_name=secrets_config.batch_account_name,
+            account_url=secrets_config.batch_service_url
+        )
+
+        self.batch_client = azure_api.make_batch_client(batch_config)
+        self.blob_client = azure_api.make_blob_client(blob_config)
+
+        self.cluster = Cluster(
+            batch_client=self.batch_client,
+            blob_client=self.blob_client,
+            batch_config=batch_config,
+            blob_config=blob_config,
+            secrets_config=secrets_config
+        )
+        self.job = Job(
+            batch_client=self.batch_client,
+            blob_client=self.blob_client
+        )
diff --git a/aztk/azure_api.py b/aztk/azure_api.py
index eed11a75..f358ea25 100644
--- a/aztk/azure_api.py
+++ b/aztk/azure_api.py
@@ -6,14 +6,6 @@
 from .version import __version__
 
-global_config = None
-
-batch_client = None
-batch_config = None
-blob_config = None
-blob_client = None
-
-
 class BatchConfig:
     def __init__(self, account_key: str, account_name: str, account_url: str):
         self.account_key = account_key
@@ -28,43 +20,35 @@ def __init__(self, account_key: str, account_name: str, account_suffix: str):
         self.account_suffix = account_suffix
 
-def get_batch_client():
-    """
-    :returns: the batch client singleton
-    """
-    if not batch_client:
-        __load_batch_client()
-    return batch_client
-
-
-def get_blob_client():
-    """
-    :returns: the blob client singleton
-    """
-    if not blob_client:
-        __load_blob_client()
-    return blob_client
+def _validate_batch_config(batch_config: BatchConfig):
+
+    if batch_config.account_key is None:
+        raise error.AzureApiInitError("Batch account key is not set in secrets.yaml config")
+    if batch_config.account_name is None:
+        raise error.AzureApiInitError("Batch account name is not set in secrets.yaml config")
+    if batch_config.account_url is None:
+        raise error.AzureApiInitError("Batch service url is not set in secrets.yaml config")
 
-def create_batch_client(
-        batch_account_key: str,
-        batch_account_name: str,
-        batch_service_url: str):
+def make_batch_client(batch_config: BatchConfig):
     """
     Creates a batch client object
     :param str batch_account_key: batch account key
     :param str batch_account_name: batch account name
     :param str batch_service_url: batch service url
     """
+    # Validate the given config
+    _validate_batch_config(batch_config)
+
     # Set up SharedKeyCredentials
     credentials = batch_auth.SharedKeyCredentials(
-        batch_account_name,
-        batch_account_key)
+ batch_config.account_name, + batch_config.account_key) # Set up Batch Client batch_client = batch.BatchServiceClient( credentials, - base_url=batch_service_url) + base_url=batch_config.account_url) # Set retry policy batch_client.config.retry_policy.retries = 5 @@ -73,105 +57,29 @@ def create_batch_client( return batch_client -def create_blob_client( - storage_account_key: str, - storage_account_name: str, - storage_account_suffix: str): +def _validate_blob_config(blob_config: BlobConfig): + if blob_config.account_key is None: + raise error.AzureApiInitError("Storage account key is not set in secrets.yaml config") + if blob_config.account_name is None: + raise error.AzureApiInitError("Storage account name is not set in secrets.yaml config") + if blob_config.account_suffix is None: + raise error.AzureApiInitError("Storage account suffix is not set in secrets.yaml config") + + +def make_blob_client(blob_config: BlobConfig): """ Creates a blob client object :param str storage_account_key: storage account key :param str storage_account_name: storage account name :param str storage_account_suffix: storage account suffix """ + # Validate Blob config + _validate_blob_config(blob_config) + # Set up BlockBlobStorage blob_client = blob.BlockBlobService( - account_name=storage_account_name, - account_key=storage_account_key, - endpoint_suffix=storage_account_suffix) - - return blob_client - - -def get_batch_config() -> BatchConfig: - if not batch_config: - __load_batch_config() - - return batch_config - - -def __load_batch_config(): - secrets_config = config.SecretsConfig() - secrets_config.load_secrets_config() - - global batch_config - - if secrets_config.batch_account_key is None: - raise error.AzureApiInitError("Batch account key is not set in secrets.yaml config") - if secrets_config.batch_account_name is None: - raise error.AzureApiInitError("Batch account name is not set in secrets.yaml config") - if secrets_config.batch_service_url is None: - raise error.AzureApiInitError("Batch service url is not set in secrets.yaml config") - - # Get configuration - account_key = secrets_config.batch_account_key - account_name = secrets_config.batch_account_name - account_url = secrets_config.batch_service_url - - batch_config = BatchConfig( - account_key=account_key, account_name=account_name, account_url=account_url) - - -def __load_batch_client(): - global batch_client - - config = get_batch_config() - - # create batch client - batch_client = create_batch_client( - config.account_key, - config.account_name, - config.account_url) - - -def get_blob_config() -> BlobConfig: - if not blob_config: - __load_blob_config() - - return blob_config - -def __load_blob_config(): - secrets_config = config.SecretsConfig() - secrets_config.load_secrets_config() - - global blob_config - - if secrets_config.storage_account_key is None: - raise error.AzureApiInitError("Storage account key is not set in secrets.yaml config") - if secrets_config.storage_account_name is None: - raise error.AzureApiInitError("Storage account name is not set in secrets.yaml config") - if secrets_config.storage_account_suffix is None: - raise error.AzureApiInitError("Storage account suffix is not set in secrets.yaml config") - - # Get configuration - storage_account_key = secrets_config.storage_account_key - storage_account_name = secrets_config.storage_account_name - storage_account_suffix = secrets_config.storage_account_suffix - - blob_config = BlobConfig( - account_key=storage_account_key, - account_name=storage_account_name, - 
account_suffix=storage_account_suffix) - - -def __load_blob_client(): - global blob_client - - blob_config = get_blob_config() - - # create storage client - blob_client = create_blob_client( - blob_config.account_key, - blob_config.account_name, - blob_config.account_suffix) - + account_name=blob_config.account_name, + account_key=blob_config.account_key, + endpoint_suffix=blob_config.account_suffix) + return blob_client \ No newline at end of file diff --git a/aztk/clusterlib.py b/aztk/clusterlib.py index 3ca034dc..bf85925b 100644 --- a/aztk/clusterlib.py +++ b/aztk/clusterlib.py @@ -10,541 +10,529 @@ from aztk.models import Software import aztk.error as error import azure.batch.models as batch_models -from . import azure_api, constants, upload_node_scripts, util, log +from . import constants, upload_node_scripts, util, log from aztk.error import ClusterNotReadyError, AztkError from collections import namedtuple import aztk.config as config import getpass - POOL_ADMIN_USER_IDENTITY = batch_models.UserIdentity( auto_user=batch_models.AutoUserSpecification( scope=batch_models.AutoUserScope.pool, elevation_level=batch_models.ElevationLevel.admin)) -def cluster_install_cmd( - zip_resource_file: batch_models.ResourceFile, - docker_repo: str = None): - """ - For Docker on ubuntu 16.04 - return the command line - to be run on the start task of the pool to setup spark. - """ - - docker_repo = docker_repo or constants.DEFAULT_DOCKER_REPO - - ret = [ - 'apt-get -y clean', - 'apt-get -y update', - 'apt-get install --fix-missing', - 'apt-get -y install unzip', - 'unzip $AZ_BATCH_TASK_WORKING_DIR/{0}'.format( - zip_resource_file.file_path), - 'chmod 777 $AZ_BATCH_TASK_WORKING_DIR/setup_node.sh', - '/bin/bash $AZ_BATCH_TASK_WORKING_DIR/setup_node.sh {0} {1} "{2}"'.format( - constants.DOCKER_SPARK_CONTAINER_NAME, - docker_repo, - docker_run_cmd(docker_repo)), - ] - - return ret - - -def docker_run_cmd(docker_repo: str = None) -> str: - """ - Build the docker run command by setting up the environment variables - """ - - cmd = CommandBuilder('docker run') - cmd.add_option('--net', 'host') - cmd.add_option('--name', constants.DOCKER_SPARK_CONTAINER_NAME) - cmd.add_option('-v', '/mnt/batch/tasks:/batch') - - cmd.add_option('-e', 'DOCKER_WORKING_DIR=/batch/startup/wd') - cmd.add_option('-e', 'AZ_BATCH_ACCOUNT_NAME=$AZ_BATCH_ACCOUNT_NAME') - cmd.add_option('-e', 'BATCH_ACCOUNT_KEY=$BATCH_ACCOUNT_KEY') - cmd.add_option('-e', 'BATCH_ACCOUNT_URL=$BATCH_ACCOUNT_URL') - cmd.add_option('-e', 'STORAGE_ACCOUNT_NAME=$STORAGE_ACCOUNT_NAME') - cmd.add_option('-e', 'STORAGE_ACCOUNT_KEY=$STORAGE_ACCOUNT_KEY') - cmd.add_option('-e', 'STORAGE_ACCOUNT_SUFFIX=$STORAGE_ACCOUNT_SUFFIX') - cmd.add_option('-e', 'AZ_BATCH_POOL_ID=$AZ_BATCH_POOL_ID') - cmd.add_option('-e', 'AZ_BATCH_NODE_ID=$AZ_BATCH_NODE_ID') - cmd.add_option( - '-e', 'AZ_BATCH_NODE_IS_DEDICATED=$AZ_BATCH_NODE_IS_DEDICATED') - cmd.add_option('-e', 'SPARK_MASTER_UI_PORT=$SPARK_MASTER_UI_PORT') - cmd.add_option('-e', 'SPARK_WORKER_UI_PORT=$SPARK_WORKER_UI_PORT') - cmd.add_option('-e', 'SPARK_JUPYTER_PORT=$SPARK_JUPYTER_PORT') - cmd.add_option('-e', 'SPARK_WEB_UI_PORT=$SPARK_WEB_UI_PORT') - cmd.add_option('-p', '8080:8080') - cmd.add_option('-p', '7077:7077') - cmd.add_option('-p', '4040:4040') - cmd.add_option('-p', '8888:8888') - cmd.add_option('-d', docker_repo) - cmd.add_argument('/bin/bash /batch/startup/wd/docker_main.sh') - return cmd.to_str() - - -def generate_cluster_start_task( - zip_resource_file: batch_models.ResourceFile, - docker_repo: str = None): - 
""" - This will return the start task object for the pool to be created. - :param cluster_id str: Id of the cluster(Used for uploading the resource files) - :param zip_resource_file: Resource file object pointing to the zip file containing scripts to run on the node - :param custom_script str: List of paths to local files to be uploaded to storage and run after spark started. - """ - - resource_files = [zip_resource_file] - - spark_master_ui_port = constants.DOCKER_SPARK_MASTER_UI_PORT - spark_worker_ui_port = constants.DOCKER_SPARK_WORKER_UI_PORT - spark_jupyter_port = constants.DOCKER_SPARK_JUPYTER_PORT - spark_web_ui_port = constants.DOCKER_SPARK_WEB_UI_PORT - - # TODO use certificate - batch_config = azure_api.get_batch_config() - blob_config = azure_api.get_blob_config() - - environment_settings = [ - batch_models.EnvironmentSetting( - name="BATCH_ACCOUNT_KEY", value=batch_config.account_key), - batch_models.EnvironmentSetting( - name="BATCH_ACCOUNT_URL", value=batch_config.account_url), - batch_models.EnvironmentSetting( - name="STORAGE_ACCOUNT_NAME", value=blob_config.account_name), - batch_models.EnvironmentSetting( - name="STORAGE_ACCOUNT_KEY", value=blob_config.account_key), - batch_models.EnvironmentSetting( - name="STORAGE_ACCOUNT_SUFFIX", value=blob_config.account_suffix), - batch_models.EnvironmentSetting( - name="SPARK_MASTER_UI_PORT", value=spark_master_ui_port), - batch_models.EnvironmentSetting( - name="SPARK_WORKER_UI_PORT", value=spark_worker_ui_port), - batch_models.EnvironmentSetting( - name="SPARK_JUPYTER_PORT", value=spark_jupyter_port), - batch_models.EnvironmentSetting( - name="SPARK_WEB_UI_PORT", value=spark_web_ui_port), - ] + get_docker_credentials() - - # start task command - command = cluster_install_cmd(zip_resource_file, docker_repo) - - return batch_models.StartTask( - command_line=util.wrap_commands_in_shell(command), - resource_files=resource_files, - environment_settings=environment_settings, - user_identity=POOL_ADMIN_USER_IDENTITY, - wait_for_success=True) - -def upload_custom_script_config(custom_scripts=None): - with open(os.path.join(constants.CUSTOM_SCRIPTS_DEST, 'custom-scripts.yaml'), 'w+') as f: - f.write(yaml.dump(custom_scripts, default_flow_style = False)) - - -def move_custom_scripts(custom_scripts=None): - if custom_scripts is None: - return - - # remove lingering custom_scripts from a previous cluster create - clean_up_custom_scripts() - - os.mkdir(os.path.join(constants.ROOT_PATH, 'node_scripts', 'custom-scripts')) - custom_scripts_dir = os.path.join(constants.ROOT_PATH, 'node_scripts', 'custom-scripts') - - for index, custom_script in enumerate(custom_scripts): - ''' - custom_script: {script: str, runOn: str} - ''' - path = pathlib.Path(custom_script['script']) - dest_file = pathlib.Path(custom_scripts_dir, path.name) - new_file_name = str(index) + '_' + dest_file.name - src = str(path.absolute()) - dest = str(pathlib.Path(dest_file.with_name(new_file_name)).absolute()) - - if path.is_dir(): - copy_tree(src, dest) - else: - copy(src, dest) - - custom_scripts[index]['script'] = dest_file.with_name(new_file_name).name - - upload_custom_script_config(custom_scripts) - - -def clean_up_custom_scripts(): - if os.path.exists(os.path.join(constants.ROOT_PATH, constants.CUSTOM_SCRIPTS_DEST)): - rmtree(constants.CUSTOM_SCRIPTS_DEST) - -def get_docker_credentials(): - creds = [] - - secrets_config = config.SecretsConfig() - secrets_config.load_secrets_config() - - if secrets_config.docker_endpoint: - creds.append(batch_models.EnvironmentSetting( - 
name="DOCKER_ENDPOINT", value=secrets_config.docker_endpoint)) - if secrets_config.docker_username: - creds.append(batch_models.EnvironmentSetting( - name="DOCKER_USERNAME", value=secrets_config.docker_username)) - if secrets_config.docker_password: - creds.append(batch_models.EnvironmentSetting( - name="DOCKER_PASSWORD", value=secrets_config.docker_password)) - - return creds - - -def create_cluster( - custom_scripts: List[object], - cluster_id: str, - vm_count, - vm_low_pri_count, - vm_size, - username: str, - password: str = None, - ssh_key: str = None, - docker_repo: str = None, - wait=True): - """ - Create a spark cluster - :param custom_scripts: List of objects containing each scripts path and execution location (master/worker/all-nodes) - :parm cluster_id: Id of the cluster - :param vm_count: Number of node in the cluster - :param vm_low_pri_count: Number of low pri node in the cluster - :param vm_size: Tier of the node(standard_a2, standard_g2, etc.) - :param username: Optional username of user to add to the pool when ready(Need wait to be True) - :param password: Optional password of user to add to the pool when ready(Need wait to be True) - :param wait: If this function should wait for the cluster to be ready(Master and all slave booted) - """ - # move custom scripts to node_scripts/ for upload - move_custom_scripts(custom_scripts) - - # Upload start task scripts - zip_resource_file = upload_node_scripts.zip_and_upload() - - # clean up custom scripts - clean_up_custom_scripts() - - batch_client = azure_api.get_batch_client() - - # vm image - publisher = 'Canonical' - offer = 'UbuntuServer' - sku = '16.04' - - # reuse pool_id as job_id - pool_id = cluster_id - job_id = cluster_id - - # Get a verified node agent sku - sku_to_use, image_ref_to_use = \ - util.select_latest_verified_vm_image_with_node_agent_sku( - publisher, offer, sku) - - # Confiure the pool - pool = batch_models.PoolAddParameter( - id=pool_id, - virtual_machine_configuration=batch_models.VirtualMachineConfiguration( - image_reference=image_ref_to_use, - node_agent_sku_id=sku_to_use), - vm_size=vm_size, - target_dedicated_nodes=vm_count, - target_low_priority_nodes=vm_low_pri_count, - start_task=generate_cluster_start_task( - zip_resource_file, docker_repo), - enable_inter_node_communication=True, - max_tasks_per_node=1, - metadata=[ - batch_models.MetadataItem( - name=constants.AZTK_SOFTWARE_METADATA_KEY, value=Software.spark), - ]) - - # Get user ssh key, prompt for password if necessary - ssh_key = ssh.get_user_public_key(ssh_key) - if username is not None and password is None and ssh_key is None: - password = getpass.getpass("Please input a password for user '{0}': ".format(username)) - confirm_password = getpass.getpass("Please confirm your password for user '{0}': ".format(username)) - if password != confirm_password: - raise AztkError("Password confirmation did not match, please try again.") - if not password: - raise AztkError( - "Password is empty, cannot add user to cluster. Provide a ssh public key in .aztk/secrets.yaml. 
Or provide an ssh-key or password with commnad line parameters (--ssh-key or --password).") - - # Create the pool + create user for the pool - util.create_pool_if_not_exist(pool) - - # Create job - job = batch_models.JobAddParameter( - id=job_id, - pool_info=batch_models.PoolInformation(pool_id=pool_id)) - - # Add job to batch - batch_client.job.add(job) - - # Wait for the app to finish - if wait: - util.wait_for_master_to_be_ready(pool_id) - - if username is not None: - create_user(pool_id, username, password, ssh_key) - - -def create_user( - cluster_id: str, - username: str, - password: str = None, - ssh_key: str = None) -> str: - """ - Create a cluster user - :param cluster_id: id of the spark cluster - :param username: username of the user to add - :param password: password of the user to add - """ - batch_client = azure_api.get_batch_client() - - # Create new ssh user for the master node - batch_client.compute_node.add_user( - cluster_id, - util.get_master_node_id(cluster_id), - batch_models.ComputeNodeUser( - username, - is_admin=True, - password=password, - ssh_public_key=ssh_key, - expiry_time=datetime.now() + timedelta(days=365))) - - return ( - '*' * len(password) if password else None, - ssh_key, - ) - - class Cluster: - def __init__(self, pool, nodes=None): - self.id = pool.id - self.pool = pool - self.nodes = nodes - self.master_node_id = util.get_master_node_id_from_pool(pool) - if pool.state.value is batch_models.PoolState.active: - self.visible_state = pool.allocation_state.value - else: - self.visible_state = pool.state.value - self.vm_size = pool.vm_size - self.total_current_nodes = pool.current_dedicated_nodes + \ - pool.current_low_priority_nodes - self.total_target_nodes = pool.target_dedicated_nodes + \ - pool.target_low_priority_nodes - self.dedicated_nodes = pool.current_dedicated_nodes - self.low_pri_nodes = pool.current_low_priority_nodes - self.target_dedicated_nodes = pool.target_dedicated_nodes - self.target_low_pri_nodes = pool.target_low_priority_nodes - - -def pretty_node_count(cluster: Cluster) -> str: - if cluster.pool.allocation_state is batch_models.AllocationState.resizing: - return '{} -> {}'.format( - cluster.total_current_nodes, - cluster.total_target_nodes) - else: - return '{}'.format(cluster.total_current_nodes) - - -def pretty_dedicated_node_count(cluster: Cluster)-> str: - if (cluster.pool.allocation_state is batch_models.AllocationState.resizing - or cluster.pool.state is batch_models.PoolState.deleting)\ - and cluster.dedicated_nodes != cluster.target_dedicated_nodes: - return '{} -> {}'.format( - cluster.dedicated_nodes, - cluster.target_dedicated_nodes) - else: - return '{}'.format(cluster.dedicated_nodes) - - -def pretty_low_pri_node_count(cluster: Cluster)-> str: - if (cluster.pool.allocation_state is batch_models.AllocationState.resizing - or cluster.pool.state is batch_models.PoolState.deleting)\ - and cluster.low_pri_nodes != cluster.target_low_pri_nodes: - return '{} -> {}'.format( - cluster.low_pri_nodes, - cluster.target_low_pri_nodes) - else: - return '{}'.format(cluster.low_pri_nodes) - - -def print_cluster(cluster: Cluster): - node_count = pretty_node_count(cluster) - - log.info("") - log.info("Cluster %s", cluster.id) - log.info("------------------------------------------") - log.info("State: %s", cluster.visible_state) - log.info("Node Size: %s", cluster.vm_size) - log.info("Nodes: %s", node_count) - log.info("| Dedicated: %s", pretty_dedicated_node_count(cluster)) - log.info("| Low priority: %s", 
pretty_low_pri_node_count(cluster)) - log.info("") - - print_format = '{:<36}| {:<15} | {:<21}| {:<8}' - print_format_underline = '{:-<36}|{:-<17}|{:-<22}|{:-<8}' - log.info(print_format.format("Nodes", "State", "IP:Port", "Master")) - log.info(print_format_underline.format('', '', '', '')) - - if not cluster.nodes: - return - for node in cluster.nodes: - ip, port = util.get_connection_info(cluster.id, node.id) - log.info(print_format.format(node.id, node.state.value, '{}:{}'.format(ip, port), - '*' if node.id == cluster.master_node_id else '')) - log.info('') - - -def get_cluster(cluster_id: str) -> Cluster: - """ - Print the information for the given cluster - :param cluster_id: Id of the cluster - """ - batch_client = azure_api.get_batch_client() - - pool = batch_client.pool.get(cluster_id) - if pool.state is batch_models.PoolState.deleting: - return Cluster(pool) - - nodes = batch_client.compute_node.list(pool_id=cluster_id) - return Cluster(pool, nodes) - - -def is_pool_running_spark(pool: batch_models.CloudPool): - if pool.metadata is None: - return False - - for metadata in pool.metadata: - if metadata.name == constants.AZTK_SOFTWARE_METADATA_KEY: - return metadata.value == Software.spark - - return False - - -def list_clusters() -> List[Cluster]: - """ - List all the cluster on your account. - """ - batch_client = azure_api.get_batch_client() - - pools = batch_client.pool.list() - - return [Cluster(pool) for pool in pools if is_pool_running_spark(pool)] - - -def print_clusters(clusters: List[Cluster]): - print_format = '{:<34}| {:<10}| {:<20}| {:<7}' - print_format_underline = '{:-<34}|{:-<11}|{:-<21}|{:-<7}' - - log.info(print_format.format('Cluster', 'State', 'VM Size', 'Nodes')) - log.info(print_format_underline.format('', '', '', '')) - for cluster in clusters: - node_count = pretty_node_count(cluster) - - log.info(print_format.format(cluster.id, - cluster.visible_state, - cluster.vm_size, - node_count)) - - -def delete_cluster(cluster_id: str) -> bool: - """ - Delete a spark cluster - :param cluster_id: Id of the cluster to delete - """ - batch_client = azure_api.get_batch_client() - - # delete pool by id - pool_id = cluster_id - # job id is equal to pool id - job_id = pool_id - job_exists = True - - try: - batch_client.job.get(job_id) - except: - job_exists = False - - pool_exists = batch_client.pool.exists(pool_id) - - if job_exists: - batch_client.job.delete(job_id) - - if pool_exists: - batch_client.pool.delete(pool_id) - - return job_exists or pool_exists - - -def ssh_in_master( - cluster_id: str, - username: str=None, - webui: str=None, - jobui: str=None, - jupyter: str=None, - ports=None, - connect: bool=True): - """ - SSH into head node of spark-app - :param cluster_id: Id of the cluster to ssh in - :param username: Username to use to ssh - :param webui: Port for the spark master web ui (Local port) - :param jobui: Port for the job web ui (Local port) - :param jupyter: Port for jupyter(Local port) - :param ports: an list of local and remote ports - :type ports: [[, ]] - """ - batch_client = azure_api.get_batch_client() - - # Get master node id from task (job and task are both named pool_id) - master_node_id = util.get_master_node_id(cluster_id) - - if master_node_id is None: - raise ClusterNotReadyError("Master node has not yet been picked!") - - # get remote login settings for the user - remote_login_settings = batch_client.compute_node.get_remote_login_settings( - cluster_id, master_node_id) - - master_node_ip = remote_login_settings.remote_login_ip_address - 
master_node_port = remote_login_settings.remote_login_port - - pool = batch_client.pool.get(cluster_id) - - spark_master_ui_port = constants.DOCKER_SPARK_MASTER_UI_PORT - spark_worker_ui_port = constants.DOCKER_SPARK_WORKER_UI_PORT - spark_jupyter_port = constants.DOCKER_SPARK_JUPYTER_PORT - spark_web_ui_port = constants.DOCKER_SPARK_WEB_UI_PORT - - ssh_command = CommandBuilder('ssh') + def __init__(self, batch_client, blob_client, batch_config, blob_config, secrets_config): + self.batch_client = batch_client + self.blob_client = blob_client + self.batch_config = batch_config + self.blob_config = blob_config + self.secrets_config = secrets_config + + def cluster_install_cmd(self, + zip_resource_file: batch_models.ResourceFile, + docker_repo: str = None): + """ + For Docker on ubuntu 16.04 - return the command line + to be run on the start task of the pool to setup spark. + """ + docker_repo = docker_repo or constants.DEFAULT_DOCKER_REPO + + ret = [ + 'apt-get -y clean', + 'apt-get -y update', + 'apt-get install --fix-missing', + 'apt-get -y install unzip', + 'unzip $AZ_BATCH_TASK_WORKING_DIR/{0}'.format( + zip_resource_file.file_path), + 'chmod 777 $AZ_BATCH_TASK_WORKING_DIR/setup_node.sh', + '/bin/bash $AZ_BATCH_TASK_WORKING_DIR/setup_node.sh {0} {1} "{2}"'.format( + constants.DOCKER_SPARK_CONTAINER_NAME, + docker_repo, + self.docker_run_cmd(docker_repo)), + ] + + return ret + + def docker_run_cmd(self, docker_repo: str = None) -> str: + """ + Build the docker run command by setting up the environment variables + """ + + cmd = CommandBuilder('docker run') + cmd.add_option('--net', 'host') + cmd.add_option('--name', constants.DOCKER_SPARK_CONTAINER_NAME) + cmd.add_option('-v', '/mnt/batch/tasks:/batch') + + cmd.add_option('-e', 'DOCKER_WORKING_DIR=/batch/startup/wd') + cmd.add_option('-e', 'AZ_BATCH_ACCOUNT_NAME=$AZ_BATCH_ACCOUNT_NAME') + cmd.add_option('-e', 'BATCH_ACCOUNT_KEY=$BATCH_ACCOUNT_KEY') + cmd.add_option('-e', 'BATCH_ACCOUNT_URL=$BATCH_ACCOUNT_URL') + cmd.add_option('-e', 'STORAGE_ACCOUNT_NAME=$STORAGE_ACCOUNT_NAME') + cmd.add_option('-e', 'STORAGE_ACCOUNT_KEY=$STORAGE_ACCOUNT_KEY') + cmd.add_option('-e', 'STORAGE_ACCOUNT_SUFFIX=$STORAGE_ACCOUNT_SUFFIX') + cmd.add_option('-e', 'AZ_BATCH_POOL_ID=$AZ_BATCH_POOL_ID') + cmd.add_option('-e', 'AZ_BATCH_NODE_ID=$AZ_BATCH_NODE_ID') + cmd.add_option( + '-e', 'AZ_BATCH_NODE_IS_DEDICATED=$AZ_BATCH_NODE_IS_DEDICATED') + cmd.add_option('-e', 'SPARK_MASTER_UI_PORT=$SPARK_MASTER_UI_PORT') + cmd.add_option('-e', 'SPARK_WORKER_UI_PORT=$SPARK_WORKER_UI_PORT') + cmd.add_option('-e', 'SPARK_JUPYTER_PORT=$SPARK_JUPYTER_PORT') + cmd.add_option('-e', 'SPARK_WEB_UI_PORT=$SPARK_WEB_UI_PORT') + cmd.add_option('-p', '8080:8080') + cmd.add_option('-p', '7077:7077') + cmd.add_option('-p', '4040:4040') + cmd.add_option('-p', '8888:8888') + cmd.add_option('-d', docker_repo) + cmd.add_argument('/bin/bash /batch/startup/wd/docker_main.sh') + return cmd.to_str() + + def generate_cluster_start_task( + self, + zip_resource_file: batch_models.ResourceFile, + docker_repo: str = None): + """ + This will return the start task object for the pool to be created. 
+ :param cluster_id str: Id of the cluster(Used for uploading the resource files) + :param zip_resource_file: Resource file object pointing to the zip file containing scripts to run on the node + """ + + resource_files = [zip_resource_file] + + spark_master_ui_port = constants.DOCKER_SPARK_MASTER_UI_PORT + spark_worker_ui_port = constants.DOCKER_SPARK_WORKER_UI_PORT + spark_jupyter_port = constants.DOCKER_SPARK_JUPYTER_PORT + spark_web_ui_port = constants.DOCKER_SPARK_WEB_UI_PORT + + # TODO use certificate + environment_settings = [ + batch_models.EnvironmentSetting( + name="BATCH_ACCOUNT_KEY", value=self.batch_config.account_key), + batch_models.EnvironmentSetting( + name="BATCH_ACCOUNT_URL", value=self.batch_config.account_url), + batch_models.EnvironmentSetting( + name="STORAGE_ACCOUNT_NAME", value=self.blob_config.account_name), + batch_models.EnvironmentSetting( + name="STORAGE_ACCOUNT_KEY", value=self.blob_config.account_key), + batch_models.EnvironmentSetting( + name="STORAGE_ACCOUNT_SUFFIX", value=self.blob_config.account_suffix), + batch_models.EnvironmentSetting( + name="SPARK_MASTER_UI_PORT", value=spark_master_ui_port), + batch_models.EnvironmentSetting( + name="SPARK_WORKER_UI_PORT", value=spark_worker_ui_port), + batch_models.EnvironmentSetting( + name="SPARK_JUPYTER_PORT", value=spark_jupyter_port), + batch_models.EnvironmentSetting( + name="SPARK_WEB_UI_PORT", value=spark_web_ui_port), + ] + self.get_docker_credentials() + + # start task command + command = self.cluster_install_cmd(zip_resource_file, docker_repo) + + return batch_models.StartTask( + command_line=util.wrap_commands_in_shell(command), + resource_files=resource_files, + environment_settings=environment_settings, + user_identity=POOL_ADMIN_USER_IDENTITY, + wait_for_success=True) + + def upload_custom_script_config(self, custom_scripts=None): + with open(os.path.join(constants.CUSTOM_SCRIPTS_DEST, 'custom-scripts.yaml'), 'w+') as f: + f.write(yaml.dump(custom_scripts, default_flow_style=False)) + + def move_custom_scripts(self, custom_scripts=None): + if custom_scripts is None: + return + + # remove lingering custom_scripts from a previous cluster create + self.clean_up_custom_scripts() + + os.mkdir(os.path.join(constants.ROOT_PATH, 'node_scripts', 'custom-scripts')) + custom_scripts_dir = os.path.join(constants.ROOT_PATH, 'node_scripts', 'custom-scripts') + + for index, custom_script in enumerate(custom_scripts): + ''' + custom_script: {script: str, runOn: str} + ''' + path = pathlib.Path(custom_script['script']) + dest_file = pathlib.Path(custom_scripts_dir, path.name) + new_file_name = str(index) + '_' + dest_file.name + src = str(path.absolute()) + dest = str(pathlib.Path(dest_file.with_name(new_file_name)).absolute()) + + if path.is_dir(): + copy_tree(src, dest) + else: + copy(src, dest) + + custom_scripts[index]['script'] = dest_file.with_name(new_file_name).name + + self.upload_custom_script_config(custom_scripts) + + def clean_up_custom_scripts(self): + if os.path.exists(os.path.join(constants.ROOT_PATH, constants.CUSTOM_SCRIPTS_DEST)): + rmtree(constants.CUSTOM_SCRIPTS_DEST) + + def get_docker_credentials(self): + creds = [] + + secrets_config = config.SecretsConfig() + secrets_config.load_secrets_config() + + if secrets_config.docker_endpoint: + creds.append(batch_models.EnvironmentSetting( + name="DOCKER_ENDPOINT", value=secrets_config.docker_endpoint)) + if secrets_config.docker_username: + creds.append(batch_models.EnvironmentSetting( + name="DOCKER_USERNAME", value=secrets_config.docker_username)) 
+ if secrets_config.docker_password: + creds.append(batch_models.EnvironmentSetting( + name="DOCKER_PASSWORD", value=secrets_config.docker_password)) + + return creds + + def create_cluster( + self, + custom_scripts: List[object], + cluster_id: str, + vm_count, + vm_low_pri_count, + vm_size, + username: str, + password: str = None, + ssh_key: str = None, + docker_repo: str = None, + wait=True): + """ + Create a spark cluster + :param custom_scripts: List of objects containing each scripts path and execution location (master/worker/all-nodes) + :parm cluster_id: Id of the cluster + :param vm_count: Number of node in the cluster + :param vm_low_pri_count: Number of low pri node in the cluster + :param vm_size: Tier of the node(standard_a2, standard_g2, etc.) + :param username: Optional username of user to add to the pool when ready(Need wait to be True) + :param password: Optional password of user to add to the pool when ready(Need wait to be True) + :param wait: If this function should wait for the cluster to be ready(Master and all slave booted) + """ + # copy spark conf files if they exist + config.load_spark_config() + + # move custom scripts to node_scripts/ for upload + self.move_custom_scripts(custom_scripts) + + # upload start task scripts + zip_resource_file = upload_node_scripts.zip_and_upload(self.blob_client) + + # clean up spark conf files + config.cleanup_spark_config() + + # Clean up custom scripts + self.clean_up_custom_scripts() + + # vm image + publisher = 'Canonical' + offer = 'UbuntuServer' + sku = '16.04' + + # reuse pool_id as job_id + pool_id = cluster_id + job_id = cluster_id + + # Get a verified node agent sku + sku_to_use, image_ref_to_use = \ + util.select_latest_verified_vm_image_with_node_agent_sku( + publisher, offer, sku, self.batch_client) + + # Confiure the pool + pool = batch_models.PoolAddParameter( + id=pool_id, + virtual_machine_configuration=batch_models.VirtualMachineConfiguration( + image_reference=image_ref_to_use, + node_agent_sku_id=sku_to_use), + vm_size=vm_size, + target_dedicated_nodes=vm_count, + target_low_priority_nodes=vm_low_pri_count, + start_task=self.generate_cluster_start_task( + zip_resource_file, docker_repo), + enable_inter_node_communication=True, + max_tasks_per_node=1, + metadata=[ + batch_models.MetadataItem( + name=constants.AZTK_SOFTWARE_METADATA_KEY, value=Software.spark), + ]) + + # Get user ssh key, prompt for password if necessary + ssh_key = ssh.get_user_public_key(ssh_key, self.secrets_config) + if username is not None and password is None and ssh_key is None: + password = getpass.getpass("Please input a password for user '{0}': ".format(username)) + confirm_password = getpass.getpass("Please confirm your password for user '{0}': ".format(username)) + if password != confirm_password: + raise AztkError("Password confirmation did not match, please try again.") + if not password: + raise AztkError( + "Password is empty, cannot add user to cluster. Provide a ssh public key in .aztk/secrets.yaml. 
Or provide an ssh-key or password with commnad line parameters (--ssh-key or --password).") + + # Create the pool + create user for the pool + util.create_pool_if_not_exist(pool, self.batch_client) + + # Create job + job = batch_models.JobAddParameter( + id=job_id, + pool_info=batch_models.PoolInformation(pool_id=pool_id)) + + # Add job to batch + self.batch_client.job.add(job) + + # Wait for the app to finish + if wait: + util.wait_for_master_to_be_ready(pool_id, self.batch_client) + + if username is not None: + self.create_user(pool_id, username, password, ssh_key) + + def create_user( + self, + cluster_id: str, + username: str, + password: str = None, + ssh_key: str = None) -> str: + """ + Create a cluster user + :param cluster_id: id of the spark cluster + :param username: username of the user to add + :param password: password of the user to add + """ + + # Create new ssh user for the master node + self.batch_client.compute_node.add_user( + cluster_id, + util.get_master_node_id(cluster_id, self.batch_client), + batch_models.ComputeNodeUser( + username, + is_admin=True, + password=password, + ssh_public_key=ssh_key, + expiry_time=datetime.now() + timedelta(days=365))) + + return ( + '*' * len(password) if password else None, + ssh_key, + ) + + class ClusterModel: + def __init__(self, pool, nodes=None): + self.id = pool.id + self.pool = pool + self.nodes = nodes + self.master_node_id = util.get_master_node_id_from_pool(pool) + if pool.state.value is batch_models.PoolState.active: + self.visible_state = pool.allocation_state.value + else: + self.visible_state = pool.state.value + self.vm_size = pool.vm_size + self.total_current_nodes = pool.current_dedicated_nodes + \ + pool.current_low_priority_nodes + self.total_target_nodes = pool.target_dedicated_nodes + \ + pool.target_low_priority_nodes + self.dedicated_nodes = pool.current_dedicated_nodes + self.low_pri_nodes = pool.current_low_priority_nodes + self.target_dedicated_nodes = pool.target_dedicated_nodes + self.target_low_pri_nodes = pool.target_low_priority_nodes + + def pretty_node_count(self, cluster: ClusterModel) -> str: + if cluster.pool.allocation_state is batch_models.AllocationState.resizing: + return '{} -> {}'.format( + cluster.total_current_nodes, + cluster.total_target_nodes) + else: + return '{}'.format(cluster.total_current_nodes) + + def pretty_dedicated_node_count(self, cluster: ClusterModel)-> str: + if (cluster.pool.allocation_state is batch_models.AllocationState.resizing + or cluster.pool.state is batch_models.PoolState.deleting)\ + and cluster.dedicated_nodes != cluster.target_dedicated_nodes: + return '{} -> {}'.format( + cluster.dedicated_nodes, + cluster.target_dedicated_nodes) + else: + return '{}'.format(cluster.dedicated_nodes) + + def pretty_low_pri_node_count(self, cluster: ClusterModel)-> str: + if (cluster.pool.allocation_state is batch_models.AllocationState.resizing\ + or cluster.pool.state is batch_models.PoolState.deleting)\ + and cluster.low_pri_nodes != cluster.target_low_pri_nodes: + return '{} -> {}'.format( + cluster.low_pri_nodes, + cluster.target_low_pri_nodes) + else: + return '{}'.format(cluster.low_pri_nodes) + + def print_cluster(self, cluster: ClusterModel): + node_count = self.pretty_node_count(cluster) + + log.info("") + log.info("Cluster %s", cluster.id) + log.info("------------------------------------------") + log.info("State: %s", cluster.visible_state) + log.info("Node Size: %s", cluster.vm_size) + log.info("Nodes: %s", node_count) + log.info("| Dedicated: %s", 
self.pretty_dedicated_node_count(cluster)) + log.info("| Low priority: %s", self.pretty_low_pri_node_count(cluster)) + log.info("") + + print_format = '{:<36}| {:<15} | {:<21}| {:<8}' + print_format_underline = '{:-<36}|{:-<17}|{:-<22}|{:-<8}' + log.info(print_format.format("Nodes", "State", "IP:Port", "Master")) + log.info(print_format_underline.format('', '', '', '')) + + if not cluster.nodes: + return + for node in cluster.nodes: + ip, port = util.get_connection_info(cluster.id, node.id, self.batch_client) + log.info(print_format.format(node.id, node.state.value, '{}:{}'.format(ip, port), + '*' if node.id == cluster.master_node_id else '')) + log.info('') + + def get_cluster(self, cluster_id: str): + """ + Print the information for the given cluster + :param cluster_id: Id of the cluster + """ + + pool = self.batch_client.pool.get(cluster_id) + if pool.state is batch_models.PoolState.deleting: + return self.ClusterModel(pool) + + nodes = self.batch_client.compute_node.list(pool_id=cluster_id) + return self.ClusterModel(pool, nodes) + + def is_pool_running_spark(self, pool: batch_models.CloudPool): + if pool.metadata is None: + return False + + for metadata in pool.metadata: + if metadata.name == constants.AZTK_SOFTWARE_METADATA_KEY: + return metadata.value == Software.spark - # get ssh private key path if specified - ssh_priv_key = ssh.get_user_private_key_path() - if ssh_priv_key is not None: - ssh_command.add_option("-i", ssh_priv_key) + return False - ssh_command.add_option("-L", "{0}:localhost:{1}".format( - webui, spark_master_ui_port), enable=bool(webui)) - ssh_command.add_option("-L", "{0}:localhost:{1}".format( - jobui, spark_web_ui_port), enable=bool(jobui)) - ssh_command.add_option("-L", "{0}:localhost:{1}".format( - jupyter, spark_jupyter_port), enable=bool(jupyter)) - - if ports is not None: - for port in ports: - ssh_command.add_option( - "-L", "{0}:localhost:{1}".format(port[0], port[1])) - - user = username if username is not None else '' - ssh_command.add_argument( - "{0}@{1} -p {2}".format(user, master_node_ip, master_node_port)) - - command = ssh_command.to_str() - ssh_command_array = command.split() - - if connect: - call(ssh_command_array) - return '\n\t{}\n'.format(command) + def list_clusters(self): + """ + List all the cluster on your account. 
+ """ + + pools = self.batch_client.pool.list() + + return [self.ClusterModel(pool) for pool in pools if self.is_pool_running_spark(pool)] + + def print_clusters(self, clusters: List[ClusterModel]): + print_format = '{:<34}| {:<10}| {:<20}| {:<7}' + print_format_underline = '{:-<34}|{:-<11}|{:-<21}|{:-<7}' + + log.info(print_format.format('Cluster', 'State', 'VM Size', 'Nodes')) + log.info(print_format_underline.format('', '', '', '')) + for cluster in clusters: + node_count = self.pretty_node_count(cluster) + + log.info(print_format.format(cluster.id, + cluster.visible_state, + cluster.vm_size, + node_count)) + + def delete_cluster(self, cluster_id: str) -> bool: + """ + Delete a spark cluster + :param cluster_id: Id of the cluster to delete + """ + # delete pool by id + pool_id = cluster_id + + # job id is equal to pool id + job_id = pool_id + job_exists = True + + try: + self.batch_client.job.get(job_id) + except: + job_exists = False + + pool_exists = self.batch_client.pool.exists(pool_id) + + if job_exists: + self.batch_client.job.delete(job_id) + + if pool_exists: + self.batch_client.pool.delete(pool_id) + + return job_exists or pool_exists + + def ssh_in_master( + self, + cluster_id: str, + username: str=None, + webui: str=None, + jobui: str=None, + jupyter: str=None, + ports=None, + connect: bool=True): + """ + SSH into head node of spark-app + :param cluster_id: Id of the cluster to ssh in + :param username: Username to use to ssh + :param webui: Port for the spark master web ui (Local port) + :param jobui: Port for the job web ui (Local port) + :param jupyter: Port for jupyter(Local port) + :param ports: an list of local and remote ports + :type ports: [[, ]] + """ + + # Get master node id from task (job and task are both named pool_id) + master_node_id = util.get_master_node_id(cluster_id, self.batch_client) + + if master_node_id is None: + raise ClusterNotReadyError("Master node has not yet been picked!") + + # get remote login settings for the user + remote_login_settings = self.batch_client.compute_node.get_remote_login_settings( + cluster_id, master_node_id) + + master_node_ip = remote_login_settings.remote_login_ip_address + master_node_port = remote_login_settings.remote_login_port + + pool = self.batch_client.pool.get(cluster_id) + + spark_master_ui_port = constants.DOCKER_SPARK_MASTER_UI_PORT + spark_worker_ui_port = constants.DOCKER_SPARK_WORKER_UI_PORT + spark_jupyter_port = constants.DOCKER_SPARK_JUPYTER_PORT + spark_web_ui_port = constants.DOCKER_SPARK_WEB_UI_PORT + + ssh_command = CommandBuilder('ssh') + + # get ssh private key path if specified + ssh_priv_key = self.secrets_config.ssh_priv_key + if ssh_priv_key is not None: + ssh_command.add_option("-i", ssh_priv_key) + + ssh_command.add_option("-L", "{0}:localhost:{1}".format( + webui, spark_master_ui_port), enable=bool(webui)) + ssh_command.add_option("-L", "{0}:localhost:{1}".format( + jobui, spark_web_ui_port), enable=bool(jobui)) + ssh_command.add_option("-L", "{0}:localhost:{1}".format( + jupyter, spark_jupyter_port), enable=bool(jupyter)) + + if ports is not None: + for port in ports: + ssh_command.add_option( + "-L", "{0}:localhost:{1}".format(port[0], port[1])) + + user = username if username is not None else '' + ssh_command.add_argument( + "{0}@{1} -p {2}".format(user, master_node_ip, master_node_port)) + + command = ssh_command.to_str() + ssh_command_array = command.split() + + if connect: + call(ssh_command_array) + return '\n\t{}\n'.format(command) diff --git a/aztk/core/ssh.py b/aztk/core/ssh.py 
index 72dd8cd6..c923173c 100644 --- a/aztk/core/ssh.py +++ b/aztk/core/ssh.py @@ -1,20 +1,18 @@ import os from aztk.config import SecretsConfig -def get_user_public_key(key_or_path: str = None): +def get_user_public_key(key_or_path: str = None, secrets_config: SecretsConfig = None): """ Return the ssh key. It will first check if the given argument is a ssh key or a path to one otherwise will check the configuration file. """ if not key_or_path: - secrets_conf = SecretsConfig() - secrets_conf.load_secrets_config() - - if not secrets_conf.ssh_pub_key: + if not secrets_config.ssh_pub_key: return None - key_or_path = secrets_conf.ssh_pub_key + key_or_path = secrets_config.ssh_pub_key + if not key_or_path: return None @@ -26,15 +24,6 @@ def get_user_public_key(key_or_path: str = None): return key -def get_user_private_key_path(): - """ - Return the path to the ssh private key if given. - It check the configuration file secrets.yaml. - """ - secrets_conf = SecretsConfig() - secrets_conf.load_secrets_config() - return secrets_conf.ssh_priv_key - def __read_ssh_key_from_file(path: str) -> str: """ diff --git a/aztk/joblib.py b/aztk/joblib.py index 0d779174..929f35ae 100644 --- a/aztk/joblib.py +++ b/aztk/joblib.py @@ -4,244 +4,238 @@ from aztk.core import CommandBuilder import azure.batch.models as batch_models import azure.batch.models.batch_error as batch_error -from . import azure_api, util, log, constants - -output_file = constants.TASK_WORKING_DIR + \ - "/" + constants.SPARK_SUBMIT_LOGS_FILE - - -def get_node(node_id: str, cluster_id: str) -> batch_models.ComputeNode: - batch_client = azure_api.get_batch_client() - return batch_client.compute_node.get(cluster_id, node_id) - - -def app_submit_cmd( - cluster_id: str, - name: str, - app: str, - app_args: str, - main_class: str, - jars: List[str], - py_files: List[str], - files: List[str], - driver_java_options: str, - driver_library_path: str, - driver_class_path: str, - driver_memory: str, - executor_memory: str, - driver_cores: str, - executor_cores: str): - master_id = util.get_master_node_id(cluster_id) - master_ip = get_node(master_id, cluster_id).ip_address - - # get pool data from pool meta key/value store - batch_client = azure_api.get_batch_client() - pool = batch_client.pool.get(cluster_id) - - spark_home = constants.DOCKER_SPARK_HOME - - # 2>&1 redirect stdout and stderr to be in the same file - spark_submit_cmd = CommandBuilder( - '{0}/bin/spark-submit >> {1} 2>&1'.format(spark_home, constants.SPARK_SUBMIT_LOGS_FILE)) - spark_submit_cmd.add_option( - '--master', 'spark://{0}:7077'.format(master_ip)) - spark_submit_cmd.add_option('--name', name) - spark_submit_cmd.add_option('--class', main_class) - spark_submit_cmd.add_option('--jars', jars and ','.join(jars)) - spark_submit_cmd.add_option('--py-files', py_files and ','.join(py_files)) - spark_submit_cmd.add_option('--jars', files and ','.join(files)) - spark_submit_cmd.add_option('--driver-java-options', driver_java_options) - spark_submit_cmd.add_option('--driver-library-path', driver_library_path) - spark_submit_cmd.add_option('--driver-class-path', driver_class_path) - spark_submit_cmd.add_option('--driver-memory', driver_memory) - spark_submit_cmd.add_option('--executor-memory', executor_memory) - spark_submit_cmd.add_option('--driver-cores', driver_cores) - spark_submit_cmd.add_option('--executor-cores', executor_cores) - - spark_submit_cmd.add_argument( - '/batch/workitems/{0}/{1}/{2}/wd/'.format(cluster_id, "job-1", name) + - app + ' ' + ' '.join(app_args)) - - 
docker_exec_cmd = CommandBuilder('sudo docker exec') - docker_exec_cmd.add_option('-e', 'PYSPARK_PYTHON=/usr/bin/python3') - docker_exec_cmd.add_option('-i', constants.DOCKER_SPARK_CONTAINER_NAME) - docker_exec_cmd.add_argument(spark_submit_cmd.to_str()) - - return [ - docker_exec_cmd.to_str() - ] - - -def submit_app( - cluster_id: str, - name: str, - app: str, - app_args: List[str], - wait: bool, - main_class: str, - jars: List[str], - py_files: List[str], - files: List[str], - driver_java_options: str, - driver_library_path: str, - driver_class_path: str, - driver_memory: str, - executor_memory: str, - driver_cores: str, - executor_cores: str): - """ - Submit a spark app - """ - batch_client = azure_api.get_batch_client() - - resource_files = [] - - app_resource_file = util.upload_file_to_container(container_name=name, file_path=app, use_full_path=True) - # Upload application file - resource_files.append(app_resource_file) - - # Upload dependent JARS - for jar in jars: - resource_files.append( - util.upload_file_to_container(container_name=name, file_path=jar, use_full_path=True)) - - # Upload dependent python files - for py_file in py_files: - resource_files.append( - util.upload_file_to_container(container_name=name, file_path=py_file, use_full_path=True)) - - # Upload other dependent files - for file in files: - resource_files.append( - util.upload_file_to_container(container_name=name, file_path=file, use_full_path=True)) - - # create command to submit task - cmd = app_submit_cmd( - cluster_id=cluster_id, - name=name, - app=app_resource_file.file_path, - app_args=app_args, - main_class=main_class, - jars=jars, - py_files=py_files, - files=files, - driver_java_options=driver_java_options, - driver_library_path=driver_library_path, - driver_class_path=driver_class_path, - driver_memory=driver_memory, - executor_memory=executor_memory, - driver_cores=driver_cores, - executor_cores=executor_cores) - - # Get pool size - pool = batch_client.pool.get(cluster_id) - pool_size = util.get_cluster_total_target_nodes(pool) - - # Affinitize task to master node - master_node_affinity_id = util.get_master_node_id(cluster_id) - - # Create task - task = batch_models.TaskAddParameter( - id=name, - affinity_info=batch_models.AffinityInformation( - affinity_id=master_node_affinity_id), - command_line=util.wrap_commands_in_shell(cmd), - resource_files=resource_files, - user_identity=batch_models.UserIdentity( - auto_user=batch_models.AutoUserSpecification( - scope=batch_models.AutoUserScope.task, - elevation_level=batch_models.ElevationLevel.admin)) - ) - - # Add task to batch job (which has the same name as cluster_id) - job_id = cluster_id - batch_client.task.add(job_id=job_id, task=task) - - # Wait for the app to finish - if wait: - read_log(cluster_id, name, tail=True) - - -def wait_for_app_to_be_running(cluster_id: str, app_name: str) -> batch_models.CloudTask: - """ - Wait for the batch task to leave the waiting state into running(or completed if it was fast enough) - """ - batch_client = azure_api.get_batch_client() - - while True: - task = batch_client.task.get(cluster_id, app_name) - - if task.state is batch_models.TaskState.active or task.state is batch_models.TaskState.preparing: - log.info("Task is waiting to be scheduled.") - time.sleep(5) - else: - return task - - -def check_task_node_exist(cluster_id: str, task: batch_models.CloudTask) -> bool: - batch_client = azure_api.get_batch_client() - - try: - batch_client.compute_node.get( - cluster_id, task.node_info.node_id) - return True - except 
batch_error.BatchErrorException:
-        return False
-
-
-def get_output_file_properties(cluster_id: str, app_name: str):
-    while True:
-        try:
-            file = util.get_file_properties(
-                cluster_id, app_name, output_file)
-            return file
-        except batch_error.BatchErrorException as e:
-            if e.response.status_code == 404:
-                log.info("Output file hasn't been created yet")
+from . import util, log, constants
+
+
+class Job():
+
+    def __init__(self, batch_client, blob_client):
+        self.batch_client = batch_client
+        self.blob_client = blob_client
+
+    output_file = constants.TASK_WORKING_DIR + \
+        "/" + constants.SPARK_SUBMIT_LOGS_FILE
+
+    def get_node(self, node_id: str, cluster_id: str) -> batch_models.ComputeNode:
+        return self.batch_client.compute_node.get(cluster_id, node_id)
+
+    def app_submit_cmd(
+            self,
+            cluster_id: str,
+            name: str,
+            app: str,
+            app_args: str,
+            main_class: str,
+            jars: List[str],
+            py_files: List[str],
+            files: List[str],
+            driver_java_options: str,
+            driver_library_path: str,
+            driver_class_path: str,
+            driver_memory: str,
+            executor_memory: str,
+            driver_cores: str,
+            executor_cores: str):
+        master_id = util.get_master_node_id(cluster_id, self.batch_client)
+        master_ip = self.get_node(master_id, cluster_id).ip_address
+
+        # get pool data from pool meta key/value store
+        pool = self.batch_client.pool.get(cluster_id)
+
+        spark_home = constants.DOCKER_SPARK_HOME
+
+        # 2>&1 redirects stdout and stderr into the same file
+        spark_submit_cmd = CommandBuilder(
+            '{0}/bin/spark-submit >> {1} 2>&1'.format(spark_home, constants.SPARK_SUBMIT_LOGS_FILE))
+        spark_submit_cmd.add_option(
+            '--master', 'spark://{0}:7077'.format(master_ip))
+        spark_submit_cmd.add_option('--name', name)
+        spark_submit_cmd.add_option('--class', main_class)
+        spark_submit_cmd.add_option('--jars', jars and ','.join(jars))
+        spark_submit_cmd.add_option('--py-files', py_files and ','.join(py_files))
+        spark_submit_cmd.add_option('--files', files and ','.join(files))
+        spark_submit_cmd.add_option('--driver-java-options', driver_java_options)
+        spark_submit_cmd.add_option('--driver-library-path', driver_library_path)
+        spark_submit_cmd.add_option('--driver-class-path', driver_class_path)
+        spark_submit_cmd.add_option('--driver-memory', driver_memory)
+        spark_submit_cmd.add_option('--executor-memory', executor_memory)
+        spark_submit_cmd.add_option('--driver-cores', driver_cores)
+        spark_submit_cmd.add_option('--executor-cores', executor_cores)
+
+        spark_submit_cmd.add_argument(
+            '/batch/workitems/{0}/{1}/{2}/wd/'.format(cluster_id, "job-1", name) +
+            app + ' ' + ' '.join(app_args))
+
+        docker_exec_cmd = CommandBuilder('sudo docker exec')
+        docker_exec_cmd.add_option('-e', 'PYSPARK_PYTHON=/usr/bin/python3')
+        docker_exec_cmd.add_option('-i', constants.DOCKER_SPARK_CONTAINER_NAME)
+        docker_exec_cmd.add_argument(spark_submit_cmd.to_str())
+
+        return [
+            docker_exec_cmd.to_str()
+        ]
+
+    def submit_app(
+            self,
+            cluster_id: str,
+            name: str,
+            app: str,
+            app_args: List[str],
+            wait: bool,
+            main_class: str,
+            jars: List[str],
+            py_files: List[str],
+            files: List[str],
+            driver_java_options: str,
+            driver_library_path: str,
+            driver_class_path: str,
+            driver_memory: str,
+            executor_memory: str,
+            driver_cores: str,
+            executor_cores: str):
+        """
+        Submit a Spark app
+        """
+
+        resource_files = []
+
+        # Upload the application file
+        app_resource_file = util.upload_file_to_container(container_name=name, file_path=app, blob_client=self.blob_client, use_full_path=True)
+        resource_files.append(app_resource_file)
+
+        # Upload dependent JARs
+        for jar in jars:
+            resource_files.append(
+                util.upload_file_to_container(container_name=name, file_path=jar, blob_client=self.blob_client, use_full_path=True))
+
+        # Upload dependent python files
+        for py_file in py_files:
+            resource_files.append(
+                util.upload_file_to_container(container_name=name, file_path=py_file, blob_client=self.blob_client, use_full_path=True))
+
+        # Upload other dependent files
+        for file in files:
+            resource_files.append(
+                util.upload_file_to_container(container_name=name, file_path=file, blob_client=self.blob_client, use_full_path=True))
+
+        # create command to submit task
+        cmd = self.app_submit_cmd(
+            cluster_id=cluster_id,
+            name=name,
+            app=app_resource_file.file_path,
+            app_args=app_args,
+            main_class=main_class,
+            jars=jars,
+            py_files=py_files,
+            files=files,
+            driver_java_options=driver_java_options,
+            driver_library_path=driver_library_path,
+            driver_class_path=driver_class_path,
+            driver_memory=driver_memory,
+            executor_memory=executor_memory,
+            driver_cores=driver_cores,
+            executor_cores=executor_cores)
+
+        # Get pool size
+        pool = self.batch_client.pool.get(cluster_id)
+        pool_size = util.get_cluster_total_target_nodes(pool)
+
+        # Affinitize task to master node
+        master_node_affinity_id = util.get_master_node_id(cluster_id, self.batch_client)
+
+        # Create task
+        task = batch_models.TaskAddParameter(
+            id=name,
+            affinity_info=batch_models.AffinityInformation(
+                affinity_id=master_node_affinity_id),
+            command_line=util.wrap_commands_in_shell(cmd),
+            resource_files=resource_files,
+            user_identity=batch_models.UserIdentity(
+                auto_user=batch_models.AutoUserSpecification(
+                    scope=batch_models.AutoUserScope.task,
+                    elevation_level=batch_models.ElevationLevel.admin))
+        )
+
+        # Add task to batch job (which has the same name as cluster_id)
+        job_id = cluster_id
+        self.batch_client.task.add(job_id=job_id, task=task)
+
+        # Wait for the app to finish
+        if wait:
+            self.read_log(cluster_id, name, tail=True)
+
+    def wait_for_app_to_be_running(self, cluster_id: str, app_name: str) -> batch_models.CloudTask:
+        """
+        Wait for the batch task to move out of the waiting state into running (or completed, if it finished quickly)
+        """
+        while True:
+            task = self.batch_client.task.get(cluster_id, app_name)
+
+            if task.state is batch_models.TaskState.active or task.state is batch_models.TaskState.preparing:
+                log.info("Task is waiting to be scheduled.")
                 time.sleep(5)
-                continue
             else:
-                raise e
-
+                return task
-def read_log(cluster_id: str, app_name: str, tail=False):
-    job_id = cluster_id
-    task_id = app_name
-
-    batch_client = azure_api.get_batch_client()
-    current_bytes = 0
-
-    task = wait_for_app_to_be_running(cluster_id, app_name)
+    def check_task_node_exist(self, cluster_id: str, task: batch_models.CloudTask) -> bool:
+        try:
+            self.batch_client.compute_node.get(
+                cluster_id, task.node_info.node_id)
+            return True
+        except batch_error.BatchErrorException:
+            return False
+
+    def get_output_file_properties(self, cluster_id: str, app_name: str):
+        while True:
+            try:
+                file = util.get_file_properties(
+                    cluster_id, app_name, self.output_file, self.batch_client)
+                return file
+            except batch_error.BatchErrorException as e:
+                if e.response.status_code == 404:
+                    log.info("Output file hasn't been created yet")
+                    time.sleep(5)
+                    continue
+                else:
+                    raise e
+
+    def read_log(self, cluster_id: str, app_name: str, tail=False):
+        job_id = cluster_id
+        task_id = app_name
+
+        current_bytes = 0
+
+        task = self.wait_for_app_to_be_running(cluster_id, app_name)
+
+        if not self.check_task_node_exist(cluster_id, task):
+            log.error("The node the app ran on doesn't exist anymore (node id: %s)!",
+                      task.node_info.node_id)
+            return
-    if not check_task_node_exist(cluster_id, task):
-        log.error("The app ran on doesn't exists anymore(Node id: %s)!",
-                  task.node_info.node_id)
-        return
+        while True:
+            file = self.get_output_file_properties(cluster_id, app_name)
+            target_bytes = file.content_length
-    while True:
-        file = get_output_file_properties(cluster_id, app_name)
-        target_bytes = file.content_length
+            if target_bytes != current_bytes:
+                ocp_range = None
-        if target_bytes != current_bytes:
-            ocp_range = None
+                if tail:
+                    ocp_range = "bytes={0}-{1}".format(
+                        current_bytes, target_bytes - 1)
-            if tail:
-                ocp_range = "bytes={0}-{1}".format(
-                    current_bytes, target_bytes - 1)
+                stream = self.batch_client.file.get_from_task(
+                    job_id, task_id, self.output_file, batch_models.FileGetFromTaskOptions(ocp_range=ocp_range))
+                content = util.read_stream_as_string(stream)
-            stream = batch_client.file.get_from_task(
-                job_id, task_id, output_file, batch_models.FileGetFromTaskOptions(ocp_range=ocp_range))
-            content = util.read_stream_as_string(stream)
+                print(content, end="")
+                current_bytes = target_bytes
-            print(content, end="")
-            current_bytes = target_bytes
+                if not tail:
+                    return
-            if not tail:
+            if task.state is batch_models.TaskState.completed:
+                log.info("Spark application is completed!")
                 return
+            task = self.batch_client.task.get(cluster_id, app_name)
-        if task.state is batch_models.TaskState.completed:
-            log.info("Spark application is completed!")
-            return
-        task = batch_client.task.get(cluster_id, app_name)
-
-        time.sleep(5)
+            time.sleep(5)
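For reference, a minimal sketch of driving the refactored `Job` class through the `Aztk` facade. The cluster id, script path, and resource settings below are made up, and it assumes `CommandBuilder` drops options whose value is `None` or empty, which is what the `jars and ','.join(jars)` pattern above relies on:

```python
from aztk.aztklib import Aztk

aztk = Aztk()  # builds its Batch and Blob clients from the secrets configuration

# Submit a (hypothetical) PySpark script to an existing cluster and tail its log
aztk.job.submit_app(
    cluster_id="my-spark-cluster",
    name="pi-estimate",
    app="./examples/pi.py",
    app_args=["100"],
    wait=True,                  # block and stream the spark-submit output
    main_class=None,            # only needed for JVM apps
    jars=[],
    py_files=[],
    files=[],
    driver_java_options=None,
    driver_library_path=None,
    driver_class_path=None,
    driver_memory="1g",
    executor_memory="1g",
    driver_cores="1",
    executor_cores="1")
```

This is the same call the `cluster_submit.py` CLI below ends up making.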
diff --git a/aztk/spark/cli/cluster_add_user.py b/aztk/spark/cli/cluster_add_user.py
index 4f621ad1..121df0ad 100644
--- a/aztk/spark/cli/cluster_add_user.py
+++ b/aztk/spark/cli/cluster_add_user.py
@@ -1,7 +1,7 @@
 import argparse
 import typing
-from aztk import clusterlib, log
-
+from aztk import log
+from aztk.aztklib import Aztk
 def setup_parser(parser: argparse.ArgumentParser):
     parser.add_argument('--id', dest='cluster_id', required=True,
@@ -18,11 +18,12 @@ def setup_parser(parser: argparse.ArgumentParser):
 def execute(args: typing.NamedTuple):
+    aztk = Aztk()
     log.info('-------------------------------------------')
     log.info('spark cluster id: {}'.format(args.cluster_id))
     log.info('username: {}'.format(args.username))
     log.info('-------------------------------------------')
-    password, ssh_key = clusterlib.create_user(
+    password, ssh_key = aztk.cluster.create_user(
         args.cluster_id,
         args.username,
         args.password,
diff --git a/aztk/spark/cli/cluster_app_logs.py b/aztk/spark/cli/cluster_app_logs.py
index f5291f88..f15bac26 100644
--- a/aztk/spark/cli/cluster_app_logs.py
+++ b/aztk/spark/cli/cluster_app_logs.py
@@ -1,6 +1,6 @@
 import argparse
 import typing
-from aztk import joblib
+from aztk.aztklib import Aztk
 
 def setup_parser(parser: argparse.ArgumentParser):
@@ -17,7 +17,8 @@ def setup_parser(parser: argparse.ArgumentParser):
 def execute(args: typing.NamedTuple):
+    aztk = Aztk()
     cluster_id = args.cluster_id
     app_name = args.app_name
     tail = args.tail
-    joblib.read_log(cluster_id, app_name, tail=tail)
+    aztk.job.read_log(cluster_id, app_name, tail=tail)
diff --git a/aztk/spark/cli/cluster_create.py b/aztk/spark/cli/cluster_create.py
index ae1ef5ef..f6d2a0d7 100644
--- a/aztk/spark/cli/cluster_create.py
+++ b/aztk/spark/cli/cluster_create.py
@@ -1,8 +1,11 @@
 import argparse
 import typing
 from aztk.config import load_spark_config, cleanup_spark_config
-from aztk import clusterlib, log
+from aztk import log
 from aztk.config import ClusterConfig
+from aztk.aztklib import Aztk
+
+
 def setup_parser(parser: argparse.ArgumentParser):
     parser.add_argument('--id', dest='cluster_id',
@@ -35,19 +38,21 @@ def setup_parser(parser: argparse.ArgumentParser):
 def execute(args: typing.NamedTuple):
+    aztk = Aztk()
+
     # read the cluster.yaml configuration file, then overwrite values with args
     cluster_conf = ClusterConfig()
     cluster_conf.merge(
-        uid = args.cluster_id,
-        size = args.size,
-        size_low_pri = args.size_low_pri,
-        vm_size = args.vm_size,
-        wait = args.wait,
-        username = args.username,
-        password = args.password,
-        ssh_key = args.ssh_key,
-        docker_repo = args.docker_repo)
+        uid=args.cluster_id,
+        size=args.size,
+        size_low_pri=args.size_low_pri,
+        vm_size=args.vm_size,
+        wait=args.wait,
+        username=args.username,
+        password=args.password,
+        ssh_key=args.ssh_key,
+        docker_repo=args.docker_repo)
 
     log.info("-------------------------------------------")
     log.info("spark cluster id: %s", cluster_conf.uid)
@@ -64,7 +69,7 @@ def execute(args: typing.NamedTuple):
     log.info("-------------------------------------------")
 
     # create spark cluster
-    clusterlib.create_cluster(
+    aztk.cluster.create_cluster(
         cluster_conf.custom_scripts,
         cluster_conf.uid,
         cluster_conf.size,
@@ -76,4 +81,7 @@ def execute(args: typing.NamedTuple):
         cluster_conf.docker_repo,
         cluster_conf.wait)
 
-    log.info("Cluster created successfully.")
+    if cluster_conf.wait:
+        log.info("Cluster %s created successfully.", cluster_conf.uid)
+    else:
+        log.info("Cluster %s is being provisioned.", cluster_conf.uid)
diff --git a/aztk/spark/cli/cluster_delete.py b/aztk/spark/cli/cluster_delete.py
index 3a65afa2..d8617f56 100644
--- a/aztk/spark/cli/cluster_delete.py
+++ b/aztk/spark/cli/cluster_delete.py
@@ -1,6 +1,7 @@
 import argparse
 import typing
-from aztk import clusterlib, log
+from aztk import log
+from aztk.aztklib import Aztk
 
 def setup_parser(parser: argparse.ArgumentParser):
@@ -15,7 +16,9 @@ def setup_parser(parser: argparse.ArgumentParser):
         help='Do not prompt for confirmation, force deletion of cluster.')
     parser.set_defaults(force=False)
 
+
 def execute(args: typing.NamedTuple):
+    aztk = Aztk()
     cluster_id = args.cluster_id
 
     if not args.force:
@@ -25,7 +28,7 @@ def execute(args: typing.NamedTuple):
             log.error("Confirmation cluster id does not match. Please try again.")
             return
 
-    if clusterlib.delete_cluster(cluster_id):
+    if aztk.cluster.delete_cluster(cluster_id):
         log.info("Deleting cluster %s", cluster_id)
     else:
         log.error("Cluster with id '%s' doesn't exist or was already deleted.", cluster_id)
diff --git a/aztk/spark/cli/cluster_get.py b/aztk/spark/cli/cluster_get.py
index 5adfb16a..ac73e6a1 100644
--- a/aztk/spark/cli/cluster_get.py
+++ b/aztk/spark/cli/cluster_get.py
@@ -1,7 +1,6 @@
 import argparse
 import typing
-from aztk import clusterlib
-
+from aztk.aztklib import Aztk
 def setup_parser(parser: argparse.ArgumentParser):
     parser.add_argument('--id',
@@ -11,6 +10,7 @@ def setup_parser(parser: argparse.ArgumentParser):
 def execute(args: typing.NamedTuple):
+    aztk = Aztk()
     cluster_id = args.cluster_id
-    cluster = clusterlib.get_cluster(cluster_id)
-    clusterlib.print_cluster(cluster)
+    cluster = aztk.cluster.get_cluster(cluster_id)
+    aztk.cluster.print_cluster(cluster)
diff --git a/aztk/spark/cli/cluster_list.py b/aztk/spark/cli/cluster_list.py
index 637c84f5..4ed2d6fd 100644
--- a/aztk/spark/cli/cluster_list.py
+++ b/aztk/spark/cli/cluster_list.py
@@ -1,6 +1,6 @@
 import argparse
 import typing
-from aztk import clusterlib
+from aztk.aztklib import Aztk
 
 def setup_parser(_: argparse.ArgumentParser):
@@ -9,5 +9,6 @@ def setup_parser(_: argparse.ArgumentParser):
 
 def execute(_: typing.NamedTuple):
-    clusters = clusterlib.list_clusters()
-    clusterlib.print_clusters(clusters)
+    aztk = Aztk()
+    clusters = aztk.cluster.list_clusters()
+    aztk.cluster.print_clusters(clusters)
diff --git a/aztk/spark/cli/cluster_ssh.py b/aztk/spark/cli/cluster_ssh.py
index 8e42eeb1..856be383 100644
--- a/aztk/spark/cli/cluster_ssh.py
+++ b/aztk/spark/cli/cluster_ssh.py
@@ -1,8 +1,8 @@
 import argparse
 import typing
-from aztk import clusterlib, log
+from aztk import log
 from aztk.config import SshConfig
-
+from aztk.aztklib import Aztk
 def setup_parser(parser: argparse.ArgumentParser):
     parser.add_argument('--id', dest="cluster_id",
@@ -24,6 +24,7 @@ def setup_parser(parser: argparse.ArgumentParser):
 def execute(args: typing.NamedTuple):
+    aztk = Aztk()
     ssh_conf = SshConfig()
 
     ssh_conf.merge(
@@ -45,7 +46,7 @@ def execute(args: typing.NamedTuple):
     log.info("-------------------------------------------")
 
     # get ssh command
-    ssh_cmd = clusterlib.ssh_in_master(
+    ssh_cmd = aztk.cluster.ssh_in_master(
         cluster_id=ssh_conf.cluster_id,
         webui=ssh_conf.web_ui_port,
         jobui=ssh_conf.job_ui_port,
diff --git a/aztk/spark/cli/cluster_submit.py b/aztk/spark/cli/cluster_submit.py
index d68c9358..4ae308b4 100644
--- a/aztk/spark/cli/cluster_submit.py
+++ b/aztk/spark/cli/cluster_submit.py
@@ -1,7 +1,7 @@
 import argparse
 import typing
-from aztk import joblib, log
-
+from aztk import log
+from aztk.aztklib import Aztk
 def setup_parser(parser: argparse.ArgumentParser):
     parser.add_argument('--id', dest='cluster_id', required=True,
@@ -67,6 +67,7 @@ def setup_parser(parser: argparse.ArgumentParser):
 def execute(args: typing.NamedTuple):
+    aztk = Aztk()
     jars = []
     py_files = []
     files = []
@@ -110,7 +111,7 @@ def execute(args: typing.NamedTuple):
     log.info("Application arguments: %s", args.app_args)
     log.info("-------------------------------------------")
 
-    joblib.submit_app(
+    aztk.job.submit_app(
         cluster_id=args.cluster_id,
         name=args.name,
         app=args.app,
diff --git a/aztk/upload_node_scripts.py b/aztk/upload_node_scripts.py
index 36ed5d10..91d9ac80 100644
--- a/aztk/upload_node_scripts.py
+++ b/aztk/upload_node_scripts.py
@@ -43,14 +43,15 @@ def __create_zip():
     logging.debug("Ziped file")
 
 
-def __upload():
+def __upload(blob_client):
     logging.debug("Uploading node scripts...")
 
     return util.upload_file_to_container(
         container_name="spark-node-scripts",
         file_path=local_tmp_zipfile,
+        blob_client=blob_client,
         use_full_path=False)
 
-def zip_and_upload():
+def zip_and_upload(blob_client):
     __create_zip()
-    return __upload()
+    return __upload(blob_client)
diff --git a/aztk/util.py b/aztk/util.py
index ebb0b90c..16ce7511 100644
--- a/aztk/util.py
+++ b/aztk/util.py
@@ -8,14 +8,14 @@ import azure.batch.models as batch_models
 import azure.storage.blob as blob
 from .version import __version__
-from . import azure_api, constants, log
+from . import constants, log
 
 _STANDARD_OUT_FILE_NAME = 'stdout.txt'
 _STANDARD_ERROR_FILE_NAME = 'stderr.txt'
 
-def wait_for_tasks_to_complete(job_id, timeout):
+def wait_for_tasks_to_complete(job_id, timeout, batch_client):
     """
     Waits for all the tasks in a particular job to complete.
     :param batch_client: The batch client to use.
@@ -24,8 +24,6 @@ def wait_for_tasks_to_complete(job_id, timeout):
     :param timeout: The maximum amount of time to wait.
     :type timeout: `datetime.timedelta`
     """
-    batch_client = azure_api.get_batch_client()
-
     time_to_timeout_at = datetime.datetime.now() + timeout
 
     while datetime.datetime.now() < time_to_timeout_at:
@@ -44,14 +42,13 @@ class MasterInvalidStateError(Exception):
     pass
 
-def wait_for_master_to_be_ready(cluster_id: str):
-    batch_client = azure_api.get_batch_client()
+def wait_for_master_to_be_ready(cluster_id: str, batch_client):
     master_node_id = None
     log.info("Waiting for spark master to be ready")
     start_time = datetime.datetime.now()
     while True:
         if not master_node_id:
-            master_node_id = get_master_node_id(cluster_id)
+            master_node_id = get_master_node_id(cluster_id, batch_client)
             if not master_node_id:
                 time.sleep(5)
                 continue
@@ -76,11 +73,11 @@ def wait_for_master_to_be_ready(cluster_id: str):
         time.sleep(5)
 
-def upload_file_to_container(container_name, file_path, use_full_path=False, node_path=None) -> batch_models.ResourceFile:
+def upload_file_to_container(container_name, file_path, blob_client=None, use_full_path=False, node_path=None) -> batch_models.ResourceFile:
     """
     Uploads a local file to an Azure Blob storage container.
-    :param block_blob_client: A blob service client.
-    :type block_blob_client: `azure.storage.blob.BlockBlobService`
+    :param blob_client: A blob service client.
+    :type blob_client: `azure.storage.blob.BlockBlobService`
     :param str container_name: The name of the Azure Blob storage container.
     :param str file_path: The local path to the file.
     :param str node_path: Path on the local node. By default will be the same as file_path
@@ -88,7 +85,6 @@ def upload_file_to_container(container_name, file_path, use_full_path=False, nod
     :return: A ResourceFile initialized with a SAS URL appropriate for Batch tasks.
     """
-    block_blob_client = azure_api.get_blob_client()
     file_path = normalize_path(file_path)
     blob_name = None
     if use_full_path:
@@ -99,20 +95,20 @@ def upload_file_to_container(container_name, file_path, use_full_path=False, nod
     if not node_path:
         node_path = blob_name
 
-    block_blob_client.create_container(container_name,
+    blob_client.create_container(container_name,
                                        fail_on_exist=False)
 
-    block_blob_client.create_blob_from_path(container_name,
+    blob_client.create_blob_from_path(container_name,
                                             blob_name,
                                             file_path)
 
-    sas_token = block_blob_client.generate_blob_shared_access_signature(
+    sas_token = blob_client.generate_blob_shared_access_signature(
         container_name,
         blob_name,
         permission=blob.BlobPermissions.READ,
         expiry=datetime.datetime.utcnow() + datetime.timedelta(hours=2))
 
-    sas_url = block_blob_client.make_blob_url(container_name,
+    sas_url = blob_client.make_blob_url(container_name,
                                               blob_name,
                                               sas_token=sas_token)
 
@@ -148,12 +144,11 @@ def get_master_node_id_from_pool(pool: batch_models.CloudPool):
     return None
 
-def get_master_node_id(pool_id):
-    batch_client = azure_api.get_batch_client()
+def get_master_node_id(pool_id, batch_client):
     return get_master_node_id_from_pool(batch_client.pool.get(pool_id))
 
-def create_pool_if_not_exist(pool):
+def create_pool_if_not_exist(pool, batch_client):
     """
     Creates the specified pool if it doesn't already exist
     :param batch_client: The batch client to use.
@@ -161,9 +156,6 @@ def create_pool_if_not_exist(pool):
     :param pool: The pool to create.
     :type pool: `batchserviceclient.models.PoolAddParameter`
     """
-
-    batch_client = azure_api.get_batch_client()
-
     try:
         batch_client.pool.add(pool)
         return True
@@ -174,7 +166,7 @@ def create_pool_if_not_exist(pool):
         return False
 
-def wait_for_all_nodes_state(pool, node_state):
+def wait_for_all_nodes_state(pool, node_state, batch_client):
     """
     Waits for all nodes in pool to reach any specified state in set
     :param batch_client: The batch client to use.
@@ -185,8 +177,6 @@ def wait_for_all_nodes_state(pool, node_state):
     :rtype: list
     :return: list of `batchserviceclient.models.ComputeNode`
     """
-    batch_client = azure_api.get_batch_client()
-
     log.info('Waiting for all nodes in pool %s to reach desired state...', pool.id)
     while True:
         # refresh pool to ensure that there is no resize error
@@ -205,7 +195,7 @@ def wait_for_all_nodes_state(pool, node_state):
 
 def select_latest_verified_vm_image_with_node_agent_sku(
-        publisher, offer, sku_starts_with):
+        publisher, offer, sku_starts_with, batch_client):
     """
     Select the latest verified image that Azure Batch supports given
     a publisher, offer and sku (starts with filter).
@@ -217,8 +207,6 @@ def select_latest_verified_vm_image_with_node_agent_sku(
     :rtype: tuple
     :return: (node agent sku id to use, vm image ref to use)
     """
-    batch_client = azure_api.get_batch_client()
-
     # get verified vm image list and node agent sku ids from service
     node_agent_skus = batch_client.account.list_node_agent_skus()
 
@@ -237,12 +225,12 @@ def select_latest_verified_vm_image_with_node_agent_sku(
 
 def create_sas_token(
-        container_name, blob_name, permission, expiry=None,
+        container_name, blob_name, permission, blob_client, expiry=None,
         timeout=None):
     """
     Create a blob sas token
-    :param block_blob_client: The storage block blob client to use.
-    :type block_blob_client: `azure.storage.blob.BlockBlobService`
+    :param blob_client: The storage block blob client to use.
+    :type blob_client: `azure.storage.blob.BlockBlobService`
     :param str container_name: The name of the container to upload the blob to.
     :param str blob_name: The name of the blob to upload the local file to.
     :param expiry: The SAS expiry time.
@@ -252,24 +240,22 @@ def create_sas_token(
     :return: A SAS token
     :rtype: str
     """
-    block_blob_client = azure_api.get_blob_client()
-
     if expiry is None:
         if timeout is None:
             timeout = 30
         expiry = datetime.datetime.utcnow() + datetime.timedelta(
             minutes=timeout)
-    return block_blob_client.generate_blob_shared_access_signature(
+    return blob_client.generate_blob_shared_access_signature(
         container_name, blob_name, permission=permission, expiry=expiry)
 
 def upload_blob_and_create_sas(
-        container_name, blob_name, file_name, expiry,
+        container_name, blob_name, file_name, expiry, blob_client,
         timeout=None):
     """
     Uploads a file from local disk to Azure Storage and creates a SAS for it.
-    :param block_blob_client: The storage block blob client to use.
-    :type block_blob_client: `azure.storage.blob.BlockBlobService`
+    :param blob_client: The storage block blob client to use.
+    :type blob_client: `azure.storage.blob.BlockBlobService`
     :param str container_name: The name of the container to upload the blob to.
     :param str blob_name: The name of the blob to upload the local file to.
     :param str file_name: The name of the local file to upload.
@@ -280,13 +266,11 @@ def upload_blob_and_create_sas(
     :return: A SAS URL to the blob with the specified expiry time.
     :rtype: str
     """
-    block_blob_client = azure_api.get_blob_client()
-
-    block_blob_client.create_container(
+    blob_client.create_container(
         container_name,
         fail_on_exist=False)
 
-    block_blob_client.create_blob_from_path(
+    blob_client.create_blob_from_path(
         container_name,
         blob_name,
         file_name)
@@ -295,10 +279,11 @@ def upload_blob_and_create_sas(
         container_name,
         blob_name,
         permission=blob.BlobPermissions.READ,
+        blob_client=blob_client,
         expiry=expiry,
         timeout=timeout)
 
-    sas_url = block_blob_client.make_blob_url(
+    sas_url = blob_client.make_blob_url(
         container_name,
         blob_name,
         sas_token=sas_token)
@@ -318,7 +303,7 @@ def wrap_commands_in_shell(commands):
         ';'.join(commands))
 
-def get_connection_info(pool_id, node_id):
+def get_connection_info(pool_id, node_id, batch_client):
     """
     Get connection info of specified node in pool
     :param batch_client: The batch client to use.
@@ -326,8 +311,6 @@ def get_connection_info(pool_id, node_id):
     :param str pool_id: The pool id to look up
     :param str node_id: The node id to look up
     """
-    batch_client = azure_api.get_batch_client()
-
     rls = batch_client.compute_node.get_remote_login_settings(
         pool_id, node_id)
     remote_ip = rls.remote_login_ip_address
@@ -380,9 +363,7 @@ def normalize_path(path: str)-> str:
         return path
 
-def get_file_properties(job_id: str, task_id: str, file_path: str):
-    batch_client = azure_api.get_batch_client()
-
+def get_file_properties(job_id: str, task_id: str, file_path: str, batch_client):
     raw = batch_client.file.get_properties_from_task(
         job_id, task_id, file_path, raw=True)
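With the singleton accessors removed, every helper in `util.py` now takes its client explicitly. A sketch of calling `upload_file_to_container` with a hand-built client; the account name, key, and file path are placeholders, and in the toolkit itself the client comes from the secrets configuration:

```python
import azure.storage.blob as blob
from aztk import util

# Placeholder credentials; normally these come from secrets.yaml
blob_client = blob.BlockBlobService(
    account_name="mystorageaccount",
    account_key="<storage-account-key>")

# Returns a batch_models.ResourceFile whose SAS URL is read-only and valid for 2 hours
resource_file = util.upload_file_to_container(
    container_name="spark-node-scripts",
    file_path="./node_scripts.zip",
    blob_client=blob_client,
    use_full_path=False)
```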
Ex: "batchaccountname: mybatchaccount" batch: - batchaccountname: - batchaccountkey: - batchserviceurl: + batchaccountname: + batchaccountkey: + batchserviceurl: storage: - storageaccountname: - storageaccountkey: + storageaccountname: + storageaccountkey: storageaccountsuffix: core.windows.net # Configuration for private docker repositories. If using public containers you do not need to provide authentification diff --git a/tests/test_clusterlib.py b/tests/test_clusterlib.py index d9326f45..9539402e 100644 --- a/tests/test_clusterlib.py +++ b/tests/test_clusterlib.py @@ -1,6 +1,7 @@ -from aztk import clusterlib +from aztk.clusterlib import Cluster import azure.batch.models as batch_models +clusterlib = Cluster(None, None, None, None, None) def create_mock_cluster( pool_id='test', @@ -22,7 +23,7 @@ def create_mock_cluster( state=state, allocation_state=allocation_state ) - cluster = clusterlib.Cluster(pool) + cluster = clusterlib.ClusterModel(pool) return cluster