Skip to content

Commit

Permalink
Add aws instance type to affinity terms in the pod template
Browse files Browse the repository at this point in the history
  • Loading branch information
austinzh committed Jan 19, 2024
1 parent 5e12922 commit cecffa3
Showing 1 changed file with 67 additions and 38 deletions.
105 changes: 67 additions & 38 deletions paasta_tools/cli/cmds/spark_run.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import argparse
import copy
import json
import logging
import os
Expand All @@ -14,6 +15,7 @@
from typing import Optional
from typing import Tuple
from typing import Union
from collections import defaultdict

import yaml
from boto3.exceptions import Boto3Error
Expand Down Expand Up @@ -75,27 +77,9 @@
POD_TEMPLATE_PATH = "/nail/tmp/spark-pt-{file_uuid}.yaml"
DEFAULT_RUNTIME_TIMEOUT = "12h"

POD_TEMPLATE = """
apiVersion: v1
kind: Pod
metadata:
labels:
spark: {spark_pod_label}
spec:
dnsPolicy: Default
affinity:
podAffinity:
preferredDuringSchedulingIgnoredDuringExecution:
- weight: 95
podAffinityTerm:
labelSelector:
matchExpressions:
- key: spark
operator: In
values:
- {spark_pod_label}
topologyKey: topology.kubernetes.io/hostname
"""
# Base executor pod template. Callers deep-copy this and let the affinity
# helpers add the nested sections they need before dumping it to YAML.
#
# This is deliberately a plain dict rather than a defaultdict:
#   * defaultdict(dict) only auto-creates ONE level, so lookups such as
#     template["metadata"]["labels"] still raise KeyError;
#   * copy.deepcopy() preserves the defaultdict type and yaml.dump() then
#     serializes it with a Python-specific tag instead of a plain mapping,
#     which is not a valid Kubernetes pod template.
POD_TEMPLATE = {
    "apiVersion": "v1",
    "kind": "Pod",
    "spec": {"dnsPolicy": "Default"},
}

deprecated_opts = {
"j": "spark.jars",
Expand Down Expand Up @@ -265,6 +249,11 @@ def add_subparser(subparsers):
default=default_spark_pool,
)

list_parser.add_argument(
"--aws-instance-types",
help="AWS instance types for executor, seperate by comma(,)",
)

list_parser.add_argument(
"-w",
"--work-dir",
Expand Down Expand Up @@ -522,6 +511,47 @@ def should_enable_compact_bin_packing(disable_compact_bin_packing, cluster_manag
return True


# inplace add a low priority podAffinityTerm for compact bin packing
def add_compact_bin_packing_affinity_term(pod: Dict, spark_pod_label: str) -> Dict:
    """Label the pod and add a preferred pod-affinity term for that label.

    The pod gets ``metadata.labels.spark = spark_pod_label`` and a
    ``preferredDuringSchedulingIgnoredDuringExecution`` pod-affinity term
    (weight 95) that selects pods carrying the same label.

    Missing intermediate mappings (``metadata``, ``labels``, ``affinity``,
    ``podAffinity``) are created on demand, so this works on a minimal
    template that only contains ``spec``.

    :param pod: pod template mapping; modified in place.
    :param spark_pod_label: value used for the ``spark`` label and selector.
    :returns: the same ``pod`` object, for convenience.
    """
    pod.setdefault("metadata", {}).setdefault("labels", {})["spark"] = spark_pod_label

    pod_affinity = (
        pod.setdefault("spec", {})
        .setdefault("affinity", {})
        .setdefault("podAffinity", {})
    )
    pod_affinity.setdefault(
        "preferredDuringSchedulingIgnoredDuringExecution", []
    ).append(
        {
            "weight": 95,
            "podAffinityTerm": {
                "labelSelector": {
                    "matchExpressions": [
                        {
                            "key": "spark",
                            "operator": "In",
                            # Must be a list of plain strings: the selector
                            # matches against the label value set above.
                            "values": [spark_pod_label],
                        }
                    ]
                },
                # NOTE(review): kept from the previous string template;
                # confirm this is the node label used for hostnames here.
                "topologyKey": "topology.kubernetes.io/hostname",
            },
        }
    )
    return pod


# inplace add nodeAffinity for node selection
def add_node_affinity_terms(pod: Dict, pool: str, instance_types: str) -> Dict:
    """Require the pod to be scheduled on one of the given instance types.

    Adds an ``In`` match expression on ``node.kubernetes.io/instance-type``
    under ``spec.affinity.nodeAffinity.
    requiredDuringSchedulingIgnoredDuringExecution.nodeSelectorTerms``.
    Missing intermediate mappings are created on demand.

    Node selector terms are ORed while the match expressions inside one term
    are ANDed, so the expression is added to every existing term (or to a
    single new term when there is none) to keep it a hard restriction.

    :param pod: pod template mapping; modified in place.
    :param pool: currently unused; kept so the signature stays unchanged.
    :param instance_types: comma-separated instance types, e.g.
        ``"m5.xlarge,r5.2xlarge"``. Surrounding whitespace and empty
        entries are ignored.
    :returns: the same ``pod`` object, for convenience.
    """
    wanted_types = [
        name.strip() for name in instance_types.split(",") if name.strip()
    ]
    if not wanted_types:
        # An "In" expression with no values is not a valid selector.
        return pod

    required = (
        pod.setdefault("spec", {})
        .setdefault("affinity", {})
        .setdefault("nodeAffinity", {})
        .setdefault("requiredDuringSchedulingIgnoredDuringExecution", {})
    )
    selector_terms = required.setdefault("nodeSelectorTerms", [])
    if not selector_terms:
        selector_terms.append({})

    for term in selector_terms:
        # Build a fresh expression per term so no object is shared between
        # terms (shared objects would be dumped as YAML anchors/aliases).
        term.setdefault("matchExpressions", []).append(
            {
                "key": "node.kubernetes.io/instance-type",
                "operator": "In",
                "values": list(wanted_types),
            }
        )
    return pod


def get_docker_run_cmd(
container_name,
volumes,
Expand Down Expand Up @@ -694,11 +724,8 @@ def get_spark_env(

def _parse_user_spark_args(
spark_args: Optional[str],
pod_template_path: str,
enable_compact_bin_packing: bool = False,
enable_spark_dra: bool = False,
) -> Dict[str, str]:

user_spark_opts = {}
if spark_args:
for spark_arg in spark_args.split():
Expand All @@ -713,9 +740,6 @@ def _parse_user_spark_args(
sys.exit(1)
user_spark_opts[fields[0]] = fields[1]

if enable_compact_bin_packing:
user_spark_opts["spark.kubernetes.executor.podTemplateFile"] = pod_template_path

if enable_spark_dra:
if (
"spark.dynamicAllocation.enabled" in user_spark_opts
Expand Down Expand Up @@ -1286,21 +1310,26 @@ def paasta_spark_run(args):

volumes = instance_config.get_volumes(system_paasta_config.get_volumes())
app_base_name = get_spark_app_name(args.cmd or instance_config.get_cmd())

if args.enable_compact_bin_packing:
document = POD_TEMPLATE.format(
spark_pod_label=limit_size_with_hash(f"exec-{app_base_name}"),
)
parsed_pod_template = yaml.safe_load(document)
with open(pod_template_path, "w") as f:
yaml.dump(parsed_pod_template, f)


user_spark_opts = _parse_user_spark_args(
args.spark_args,
pod_template_path,
args.enable_compact_bin_packing,
args.enable_dra,
)

if "spark.kubernetes.executor.podTemplateFile" not in user_spark_opts:
# update pod_template only if the user does not provide one
pod_template = copy.deepcopy(POD_TEMPLATE)

spark_pod_label = limit_size_with_hash(f"exec-{app_base_name}")
if args.enable_compact_bin_packing:
add_compact_bin_packing_affinity_term(pod_template, spark_pod_label)
if args.aws_instance_types:
add_node_affinity_terms(pod_template, args.aws_instance_types)

with open(pod_template_path, "w") as f:
yaml.dump(pod_template, f)
user_spark_opts["spark.kubernetes.executor.podTemplateFile"] = pod_template_path


args.cmd = _auto_add_timeout_for_job(args.cmd, args.timeout_job_runtime)

Expand Down

0 comments on commit cecffa3

Please sign in to comment.