
MLCOMPUTE-2001 | Cleanup spark related logs from setup_tron_namespace (#3979)

* Cleanup spark related logs from setup_tron_namespace

* Fix driver on k8s pod template overwriting
chi-yelp authored Oct 14, 2024
1 parent 613d3d9 commit fd8188b
Showing 6 changed files with 36 additions and 26 deletions.
4 changes: 2 additions & 2 deletions paasta_tools/cli/cmds/spark_run.py
@@ -239,9 +239,9 @@ def add_subparser(subparsers):
"default_pool"
)
except PaastaNotConfiguredError:
default_spark_cluster = "pnw-devc"
default_spark_cluster = "pnw-devc-spark"
default_spark_pool = "batch"
valid_clusters = ["spark-pnw-prod", "pnw-devc"]
valid_clusters = ["pnw-devc-spark", "pnw-prod-spark"]

list_parser.add_argument(
"-c",
2 changes: 1 addition & 1 deletion paasta_tools/setup_tron_namespace.py
@@ -224,7 +224,7 @@ def main():
             # since we need to print out what failed in either case
             failed.append(service)

-    if args.bulk_config_fetch:
+    if args.dry_run and args.bulk_config_fetch:
         updated_namespaces = client.update_namespaces(new_configs)

         if updated_namespaces:
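With this change the bulk `update_namespaces` call runs only when both flags are set. A minimal sketch of the flag wiring, assuming conventional argparse spellings for the `args.dry_run` and `args.bulk_config_fetch` attributes used above (the actual option definitions are not part of this hunk):

```python
import argparse

parser = argparse.ArgumentParser()
# Assumed flag names; argparse maps --dry-run -> args.dry_run, etc.
parser.add_argument("--dry-run", action="store_true")
parser.add_argument("--bulk-config-fetch", action="store_true")
args = parser.parse_args(["--dry-run", "--bulk-config-fetch"])

if args.dry_run and args.bulk_config_fetch:
    # Stand-in for: updated_namespaces = client.update_namespaces(new_configs)
    print("would bulk-update Tron namespaces")
```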
33 changes: 20 additions & 13 deletions paasta_tools/spark_tools.py
@@ -20,7 +20,7 @@
SPARK_AWS_CREDS_PROVIDER = "com.amazonaws.auth.WebIdentityTokenCredentialsProvider"
SPARK_EXECUTOR_NAMESPACE = "paasta-spark"
SPARK_DRIVER_POOL = "stable"
-SPARK_JOB_USER = "TRON"
+SPARK_TRON_JOB_USER = "TRON"
SPARK_PROMETHEUS_SHARD = "ml-compute"
SPARK_DNS_POD_TEMPLATE = "/nail/srv/configs/spark_dns_pod_template.yaml"
MEM_MULTIPLIER = {"k": 1024, "m": 1024**2, "g": 1024**3, "t": 1024**4}
@@ -176,7 +176,9 @@ def inject_spark_conf_str(original_cmd: str, spark_conf_str: str) -> str:
return original_cmd


-def auto_add_timeout_for_spark_job(cmd: str, timeout_job_runtime: str) -> str:
+def auto_add_timeout_for_spark_job(
+    cmd: str, timeout_job_runtime: str, silent: bool = False
+) -> str:
# Timeout only to be added for spark-submit commands
# TODO: Add timeout for jobs using mrjob with spark-runner
if "spark-submit" not in cmd:
@@ -189,16 +191,17 @@ def auto_add_timeout_for_spark_job(cmd: str, timeout_job_runtime: str) -> str:
split_cmd = cmd.split("spark-submit")
# split_cmd[0] will always be an empty string or end with a space
cmd = f"{split_cmd[0]}timeout {timeout_job_runtime} spark-submit{split_cmd[1]}"
-        log.info(
-            PaastaColors.blue(
-                f"NOTE: Job will exit in given time {timeout_job_runtime}. "
-                f"Adjust timeout value using --timeout-job-timeout. "
-                f"New Updated Command with timeout: {cmd}"
-            ),
-        )
+        if not silent:
+            log.info(
+                PaastaColors.blue(
+                    f"NOTE: Job will exit in given time {timeout_job_runtime}. "
+                    f"Adjust timeout value using --timeout-job-runtime. "
+                    f"New Updated Command with timeout: {cmd}"
+                ),
+            )
     except Exception as e:
         err_msg = (
-            f"'timeout' could not be added to command: '{cmd}' due to error '{e}'. "
+            f"'timeout' could not be added to spark command: '{cmd}' due to error '{e}'. "
             "Please report to #spark."
         )
         log.warn(err_msg)
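For reference, a quick sketch of the new `silent` flag in use; the command string is illustrative, and the result assumes the happy path through the `spark-submit` split above:

```python
from paasta_tools.spark_tools import auto_add_timeout_for_spark_job

cmd = "spark-submit --conf spark.executor.cores=4 job.py"
# silent=True suppresses the log.info note; the command is still wrapped.
wrapped = auto_add_timeout_for_spark_job(cmd, "12h", silent=True)
print(wrapped)  # timeout 12h spark-submit --conf spark.executor.cores=4 job.py
```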
@@ -211,9 +214,12 @@ def build_spark_command(
spark_config_dict: Dict[str, Any],
is_mrjob: bool,
timeout_job_runtime: str,
+    silent: bool = False,
) -> str:
-    command = f"{inject_spark_conf_str(original_cmd, create_spark_config_str(spark_config_dict, is_mrjob=is_mrjob))}"
-    return auto_add_timeout_for_spark_job(command, timeout_job_runtime)
+    command = inject_spark_conf_str(
+        original_cmd, create_spark_config_str(spark_config_dict, is_mrjob=is_mrjob)
+    )
+    return auto_add_timeout_for_spark_job(command, timeout_job_runtime, silent=silent)
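And the same flag one level up, with illustrative arguments; only `silent` is new in this signature:

```python
from paasta_tools.spark_tools import build_spark_command

command = build_spark_command(
    "spark-submit job.py",  # original_cmd (illustrative)
    spark_config_dict={"spark.ui.port": "39091"},
    is_mrjob=False,
    timeout_job_runtime="12h",
    silent=True,  # forwarded to auto_add_timeout_for_spark_job
)
```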


def get_spark_ports_from_config(spark_conf: Dict[str, str]) -> List[int]:
Expand All @@ -238,14 +244,15 @@ def get_spark_driver_monitoring_annotations(

def get_spark_driver_monitoring_labels(
spark_config: Dict[str, str],
+    user: str,
) -> Dict[str, str]:
"""
Returns Spark driver pod labels - generally for Prometheus metric relabeling.
"""
ui_port_str = str(spark_config.get("spark.ui.port", ""))
labels = {
"paasta.yelp.com/prometheus_shard": SPARK_PROMETHEUS_SHARD,
"spark.yelp.com/user": SPARK_JOB_USER,
"spark.yelp.com/user": user,
"spark.yelp.com/driver_ui_port": ui_port_str,
}
return labels
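Since the full function body is visible above, the effect of the new `user` parameter is easy to check; for a Tron-driven job the labels come out as:

```python
from paasta_tools.spark_tools import (
    SPARK_TRON_JOB_USER,
    get_spark_driver_monitoring_labels,
)

labels = get_spark_driver_monitoring_labels(
    {"spark.ui.port": "39091"},  # illustrative spark_config
    user=SPARK_TRON_JOB_USER,
)
assert labels == {
    "paasta.yelp.com/prometheus_shard": "ml-compute",
    "spark.yelp.com/user": "TRON",
    "spark.yelp.com/driver_ui_port": "39091",
}
```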
19 changes: 11 additions & 8 deletions paasta_tools/tron_tools.py
@@ -347,7 +347,7 @@ def build_spark_config(self) -> Dict[str, str]:

docker_img_url = self.get_docker_url(system_paasta_config)

-        spark_conf_builder = SparkConfBuilder()
+        spark_conf_builder = SparkConfBuilder(is_driver_on_k8s_tron=True)
spark_conf = spark_conf_builder.get_spark_conf(
cluster_manager="kubernetes",
spark_app_base_name=spark_app_name,
@@ -366,7 +366,7 @@ def build_spark_config(self) -> Dict[str, str]:
force_spark_resource_configs=self.config_dict.get(
"force_spark_resource_configs", False
),
-            user=spark_tools.SPARK_JOB_USER,
+            user=spark_tools.SPARK_TRON_JOB_USER,
)
# delete the dynamically generated spark.app.id to prevent frequent config updates in Tron.
# spark.app.id will be generated later by yelp spark-submit wrapper or Spark itself.
@@ -380,16 +380,17 @@ def build_spark_config(self) -> Dict[str, str]:
if "spark.app.name" not in stringified_spark_args
else stringified_spark_args["spark.app.name"]
)
-        # TODO: Remove this once dynamic pod template is generated inside the driver using spark-submit wrapper
+
+        # TODO(MLCOMPUTE-1220): Remove this once dynamic pod template is generated inside the driver using spark-submit wrapper
if "spark.kubernetes.executor.podTemplateFile" in spark_conf:
-            print(
+            log.info(
f"Replacing spark.kubernetes.executor.podTemplateFile="
f"{spark_conf['spark.kubernetes.executor.podTemplateFile']} with "
f"spark.kubernetes.executor.podTemplateFile={spark_tools.SPARK_DNS_POD_TEMPLATE}"
)
-        spark_conf[
-            "spark.kubernetes.executor.podTemplateFile"
-        ] = spark_tools.SPARK_DNS_POD_TEMPLATE
+            spark_conf[
+                "spark.kubernetes.executor.podTemplateFile"
+            ] = spark_tools.SPARK_DNS_POD_TEMPLATE
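This hunk is the "Fix driver on k8s pod template overwriting" half of the commit, and in the flattened diff it is easy to miss that the assignment only changed indentation, moving inside the `if`. A before/after sketch reconstructed from the hunk, with the log call elided:

```python
# Before: the DNS pod template was applied unconditionally, clobbering
# spark.kubernetes.executor.podTemplateFile even when Spark never set one.
if "spark.kubernetes.executor.podTemplateFile" in spark_conf:
    log.info("Replacing spark.kubernetes.executor.podTemplateFile ...")
spark_conf["spark.kubernetes.executor.podTemplateFile"] = spark_tools.SPARK_DNS_POD_TEMPLATE

# After: only an existing executor pod template is replaced; otherwise
# spark_conf is left untouched.
if "spark.kubernetes.executor.podTemplateFile" in spark_conf:
    log.info("Replacing spark.kubernetes.executor.podTemplateFile ...")
    spark_conf["spark.kubernetes.executor.podTemplateFile"] = spark_tools.SPARK_DNS_POD_TEMPLATE
```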

spark_conf.update(
{
@@ -1044,6 +1045,7 @@ def format_tron_action_dict(action_config: TronActionConfig):
action_config.config_dict.get(
"max_runtime", spark_tools.DEFAULT_SPARK_RUNTIME_TIMEOUT
),
+        silent=True,
)
# point to the KUBECONFIG needed by Spark driver
result["env"]["KUBECONFIG"] = system_paasta_config.get_spark_kubeconfig()
@@ -1074,7 +1076,8 @@ def format_tron_action_dict(action_config: TronActionConfig):
)
)
monitoring_labels = spark_tools.get_spark_driver_monitoring_labels(
-            action_config.action_spark_config
+            action_config.action_spark_config,
+            user=spark_tools.SPARK_TRON_JOB_USER,
)
result["annotations"].update(monitoring_annotations)
result["labels"].update(monitoring_labels)
2 changes: 1 addition & 1 deletion requirements.txt
@@ -91,7 +91,7 @@ rsa==4.7.2
ruamel.yaml==0.15.96
s3transfer==0.10.0
sensu-plugin==0.3.1
-service-configuration-lib==2.18.21
+service-configuration-lib==2.18.24
setuptools==39.0.1
signalfx==1.0.17
simplejson==3.10.0
2 changes: 1 addition & 1 deletion tests/test_tron_tools.py
@@ -1275,7 +1275,6 @@ def test_format_tron_action_dict_spark(
"--conf spark.kubernetes.executor.label.yelp.com/pool=special_pool "
"--conf spark.kubernetes.executor.label.paasta.yelp.com/pool=special_pool "
"--conf spark.kubernetes.executor.label.yelp.com/owner=core_ml "
"--conf spark.kubernetes.executor.podTemplateFile=/nail/srv/configs/spark_dns_pod_template.yaml "
"--conf spark.kubernetes.executor.volumes.hostPath.0.mount.path=/nail/bulkdata "
"--conf spark.kubernetes.executor.volumes.hostPath.0.options.path=/nail/bulkdata "
"--conf spark.kubernetes.executor.volumes.hostPath.0.mount.readOnly=true "
@@ -1307,6 +1306,7 @@ def test_format_tron_action_dict_spark(
"--conf spark.kubernetes.allocation.batch.size=512 "
"--conf spark.kubernetes.decommission.script=/opt/spark/kubernetes/dockerfiles/spark/decom.sh "
"--conf spark.logConf=true "
"--conf spark.kubernetes.executor.podTemplateFile=/nail/srv/configs/spark_dns_pod_template.yaml "
"--conf spark.hadoop.fs.s3a.aws.credentials.provider=com.amazonaws.auth.WebIdentityTokenCredentialsProvider "
"--conf spark.driver.host=$PAASTA_POD_IP "
"--conf spark.kubernetes.authenticate.executor.serviceAccountName=paasta--arn-aws-iam-000000000000-role-some-role "
