Skip to content
This repository has been archived by the owner on Feb 3, 2021. It is now read-only.

Commit

Permalink
Feature: Spark add worker on master option (#415)
Browse files Browse the repository at this point in the history
* Add worker_on_master to ClusterConfiguration

* add worker_on_master to JobConfiguration
  • Loading branch information
jafreck authored Feb 23, 2018
1 parent 17755e0 commit e188170
Show file tree
Hide file tree
Showing 6 changed files with 45 additions and 15 deletions.
8 changes: 5 additions & 3 deletions aztk/spark/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ def create_cluster(self, cluster_conf: models.ClusterConfiguration, wait: bool =
cluster_conf.gpu_enabled(),
cluster_conf.docker_repo,
cluster_conf.file_shares,
cluster_conf.mixed_mode())
cluster_conf.mixed_mode(),
cluster_conf.worker_on_master)

software_metadata_key = "spark"

Expand Down Expand Up @@ -170,7 +171,8 @@ def submit_job(self, job_configuration):
start_task = create_cluster_helper.generate_cluster_start_task(self,
zip_resource_files,
job_configuration.gpu_enabled,
job_configuration.docker_repo)
job_configuration.docker_repo,
worker_on_master=job_configuration.worker_on_master)

application_tasks = []
for application in job_configuration.applications:
Expand Down Expand Up @@ -199,7 +201,7 @@ def submit_job(self, job_configuration):
else:
raise error.AztkError("Jobs do not support both dedicated and low priority nodes." \
" JobConfiguration fields max_dedicated_nodes and max_low_pri_nodes are mutually exclusive values.")

job = self.__submit_job(
job_configuration=job_configuration,
start_task=start_task,
Expand Down
24 changes: 18 additions & 6 deletions aztk/spark/helpers/create_cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from aztk.utils import helpers
from aztk.utils import constants
from aztk import models as aztk_models
from aztk.spark.models import ClusterConfiguration
import azure.batch.models as batch_models

POOL_ADMIN_USER_IDENTITY = batch_models.UserIdentity(
Expand All @@ -13,7 +14,11 @@
'''
Cluster create helper methods
'''
def __docker_run_cmd(docker_repo: str = None, gpu_enabled: bool = False, file_mounts = None, mixed_mode = False) -> str:
def __docker_run_cmd(docker_repo: str = None,
gpu_enabled: bool = False,
worker_on_master: bool = True,
file_mounts = None,
mixed_mode = False) -> str:
"""
Build the docker run command by setting up the environment variables
"""
Expand Down Expand Up @@ -45,6 +50,11 @@ def __docker_run_cmd(docker_repo: str = None, gpu_enabled: bool = False, file_mo
cmd.add_option('-e', 'AZ_BATCH_NODE_ID=$AZ_BATCH_NODE_ID')
cmd.add_option(
'-e', 'AZ_BATCH_NODE_IS_DEDICATED=$AZ_BATCH_NODE_IS_DEDICATED')
if worker_on_master is not None:
cmd.add_option('-e', 'WORKER_ON_MASTER={}'.format(worker_on_master))
else:
# default to True if not specified
cmd.add_option('-e', 'WORKER_ON_MASTER={}'.format(True))
cmd.add_option('-e', 'MIXED_MODE={}'.format(mixed_mode))
cmd.add_option('-e', 'SPARK_WEB_UI_PORT=$SPARK_WEB_UI_PORT')
cmd.add_option('-e', 'SPARK_WORKER_UI_PORT=$SPARK_WORKER_UI_PORT')
Expand All @@ -69,7 +79,7 @@ def __docker_run_cmd(docker_repo: str = None, gpu_enabled: bool = False, file_mo
cmd.add_option('-p', '50090:50090') # Secondary NameNode http address
cmd.add_option('-d', docker_repo)
cmd.add_argument('/bin/bash /mnt/batch/tasks/startup/wd/docker_main.sh')

return cmd.to_str()

def __get_docker_credentials(spark_client):
Expand Down Expand Up @@ -123,6 +133,7 @@ def __get_secrets_env(spark_client):
def __cluster_install_cmd(zip_resource_file: batch_models.ResourceFile,
gpu_enabled: bool,
docker_repo: str = None,
worker_on_master: bool = True,
file_mounts = None,
mixed_mode: bool = False):
"""
Expand Down Expand Up @@ -159,7 +170,7 @@ def __cluster_install_cmd(zip_resource_file: batch_models.ResourceFile,
constants.DOCKER_SPARK_CONTAINER_NAME,
gpu_enabled,
docker_repo,
__docker_run_cmd(docker_repo, gpu_enabled, file_mounts, mixed_mode)),
__docker_run_cmd(docker_repo, gpu_enabled, worker_on_master, file_mounts, mixed_mode)),
]

commands = shares + setup
Expand All @@ -171,7 +182,8 @@ def generate_cluster_start_task(
gpu_enabled: bool,
docker_repo: str = None,
file_shares: List[aztk_models.FileShare] = None,
mixed_mode: bool = False):
mixed_mode: bool = False,
worker_on_master: bool = True):
"""
This will return the start task object for the pool to be created.
:param cluster_id str: Id of the cluster(Used for uploading the resource files)
Expand All @@ -184,7 +196,7 @@ def generate_cluster_start_task(
spark_jupyter_port = constants.DOCKER_SPARK_JUPYTER_PORT
spark_job_ui_port = constants.DOCKER_SPARK_JOB_UI_PORT
spark_rstudio_server_port = constants.DOCKER_SPARK_RSTUDIO_SERVER_PORT

spark_container_name = constants.DOCKER_SPARK_CONTAINER_NAME
spark_submit_logs_file = constants.SPARK_SUBMIT_LOGS_FILE

Expand All @@ -207,7 +219,7 @@ def generate_cluster_start_task(
] + __get_docker_credentials(spark_client)

# start task command
command = __cluster_install_cmd(zip_resource_file, gpu_enabled, docker_repo, file_shares, mixed_mode)
command = __cluster_install_cmd(zip_resource_file, gpu_enabled, docker_repo, worker_on_master, file_shares, mixed_mode)

return batch_models.StartTask(
command_line=helpers.wrap_commands_in_shell(command),
Expand Down
17 changes: 13 additions & 4 deletions aztk/spark/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,9 +93,10 @@ def __init__(
vm_low_pri_count=0,
vm_size=None,
subnet_id=None,
docker_repo: str=None,
user_configuration: UserConfiguration=None,
spark_configuration: SparkConfiguration = None):
docker_repo: str = None,
user_configuration: UserConfiguration = None,
spark_configuration: SparkConfiguration = None,
worker_on_master: bool = None):
super().__init__(
custom_scripts=custom_scripts,
cluster_id=cluster_id,
Expand All @@ -108,10 +109,16 @@ def __init__(
user_configuration=user_configuration,
)
self.spark_configuration = spark_configuration
self.worker_on_master = worker_on_master

def gpu_enabled(self):
    """Return whether the cluster's configured VM size is GPU-capable.

    Delegates the actual check to ``helpers.is_gpu_enabled`` — presumably
    a lookup on the Azure VM size name; confirm in ``aztk.utils.helpers``.
    """
    vm_size = self.vm_size
    return helpers.is_gpu_enabled(vm_size)

def merge(self, other):
    """Merge settings from ``other`` into this cluster configuration.

    Shared base-class fields are merged by the parent implementation;
    the Spark-specific ``worker_on_master`` flag is then folded in via
    ``_merge_attributes`` (inherited helper — presumably copies the
    listed attribute from ``other`` when set; verify in aztk.models).
    """
    super().merge(other)
    self._merge_attributes(other, ["worker_on_master"])


class SecretsConfiguration(aztk.models.SecretsConfiguration):
pass

Expand Down Expand Up @@ -198,7 +205,8 @@ def __init__(
docker_repo=None,
max_dedicated_nodes=None,
max_low_pri_nodes=None,
subnet_id=None):
subnet_id=None,
worker_on_master=None):
self.id = id
self.applications = applications
self.custom_scripts = custom_scripts
Expand All @@ -209,6 +217,7 @@ def __init__(
self.max_dedicated_nodes = max_dedicated_nodes
self.max_low_pri_nodes = max_low_pri_nodes
self.subnet_id = subnet_id
self.worker_on_master = worker_on_master


class JobState():
Expand Down
5 changes: 5 additions & 0 deletions cli/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,9 @@ def cluster_config_from_dict(config: dict):
if config.get('docker_repo') is not None:
output.docker_repo = config['docker_repo']

if config.get('worker_on_master') is not None:
output.worker_on_master = config['worker_on_master']

if config.get('wait') is not None:
wait = config['wait']

Expand Down Expand Up @@ -302,6 +305,7 @@ def __init__(self):
self.spark_env_sh = None
self.core_site_xml = None
self.subnet_id = None
self.worker_on_master = None

def _merge_dict(self, config):
config = config.get('job')
Expand All @@ -317,6 +321,7 @@ def _merge_dict(self, config):
self.max_low_pri_nodes = cluster_configuration.get('size_low_pri')
self.custom_scripts = cluster_configuration.get('custom_scripts')
self.subnet_id = cluster_configuration.get('subnet_id')
self.worker_on_master = cluster_configuration.get("worker_on_master")

self.applications = config.get('applications')

Expand Down
3 changes: 2 additions & 1 deletion cli/spark/endpoints/job/submit.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,8 @@ def execute(args: typing.NamedTuple):
docker_repo=job_conf.docker_repo,
max_dedicated_nodes=job_conf.max_dedicated_nodes,
max_low_pri_nodes=job_conf.max_low_pri_nodes,
subnet_id=job_conf.subnet_id
subnet_id=job_conf.subnet_id,
worker_on_master=job_conf.worker_on_master
)

#TODO: utils.print_job_conf(job_configuration)
Expand Down
3 changes: 2 additions & 1 deletion node_scripts/install/install.py
Original file line number Diff line number Diff line change
def setup_as_master():
    """Configure this node as the Spark master.

    Connects, starts the Spark master process, and — unless the
    WORKER_ON_MASTER environment variable is set to something other
    than "True" — also starts a worker on the same node.
    """
    print("Setting up as master.")
    spark.setup_connection()
    spark.start_spark_master()
    # The cluster start task exports WORKER_ON_MASTER into the container
    # (see __docker_run_cmd, which defaults it to True).  Default to
    # "True" here as well so a missing variable preserves the historical
    # run-a-worker-on-the-master behavior instead of raising KeyError.
    if os.environ.get("WORKER_ON_MASTER", "True") == "True":
        spark.start_spark_worker()


def setup_as_worker():
Expand Down

0 comments on commit e188170

Please sign in to comment.