diff --git a/aztk/spark/helpers/submit.py b/aztk/spark/helpers/submit.py
index 0feee70f..8e01b39b 100644
--- a/aztk/spark/helpers/submit.py
+++ b/aztk/spark/helpers/submit.py
@@ -101,11 +101,20 @@ def generate_task(spark_client, container_id, application):
     return task
 
 
+def affinitize_task_to_master(spark_client, cluster_id, task):
+    # Pin the submit task to the cluster's master node so the Spark driver
+    # always runs where the master services live.
+    cluster = spark_client.get_cluster(cluster_id)
+    master_node = spark_client.batch_client.compute_node.get(pool_id=cluster_id, node_id=cluster.master_node_id)
+    task.affinity_info = batch_models.AffinityInformation(affinity_id=master_node.affinity_id)
+    return task
+
+
 def submit_application(spark_client, cluster_id, application, wait: bool = False):
     """
     Submit a spark app
     """
     task = generate_task(spark_client, cluster_id, application)
+    task = affinitize_task_to_master(spark_client, cluster_id, task)
+
     # Add task to batch job (which has the same name as cluster_id)
     job_id = cluster_id
diff --git a/node_scripts/job_submission.py b/node_scripts/job_submission.py
index 1fa9db6e..31ffddce 100644
--- a/node_scripts/job_submission.py
+++ b/node_scripts/job_submission.py
@@ -1,13 +1,22 @@
-import sys
+import datetime
 import os
-import yaml
 import subprocess
-import datetime
+import sys
 from typing import List
-import azure.storage.blob as blob
 import azure.batch.models as batch_models
+import azure.storage.blob as blob
+import yaml
 from command_builder import CommandBuilder
 from core import config
+from install.pick_master import get_master_node_id
+
+
+def affinitize_task_to_master(batch_client, cluster_id, task):
+    # Look the pool up by the cluster_id parameter (cluster id == pool id)
+    # rather than the global config, so the helper honors its signature.
+    pool = batch_client.pool.get(cluster_id)
+    master_node_id = get_master_node_id(pool)
+    master_node = batch_client.compute_node.get(pool_id=cluster_id, node_id=master_node_id)
+    task.affinity_info = batch_models.AffinityInformation(affinity_id=master_node.affinity_id)
+    return task
 
 
 def schedule_tasks(tasks_path):
@@ -16,7 +25,7 @@ def schedule_tasks(tasks_path):
     '''
     batch_client = config.batch_client
     blob_client = config.blob_client
-
+
     for task_definition in tasks_path:
         with open(task_definition, 'r') as stream:
             try:
@@ -24,6 +33,8 @@ def schedule_tasks(tasks_path):
             except yaml.YAMLError as exc:
                 print(exc)
 
+        # affinitize task to master
+        task = affinitize_task_to_master(batch_client, os.environ["AZ_BATCH_POOL_ID"], task)
         # schedule the task
         batch_client.task.add(job_id=os.environ['AZ_BATCH_JOB_ID'], task=task)
 