From f727e9efc2df1725bedfbbf8d062688c9fa55c8e Mon Sep 17 00:00:00 2001
From: Sameer Sharma
Date: Mon, 10 Jun 2024 18:34:20 +0100
Subject: [PATCH 01/28] MLCOMPUTE-1203 | Configure Spark driver pod memory and
 cores based on Spark args

---
 paasta_tools/spark_tools.py | 21 +++++++++++++++++++++
 paasta_tools/tron_tools.py  | 15 +++++++++++++--
 2 files changed, 34 insertions(+), 2 deletions(-)

diff --git a/paasta_tools/spark_tools.py b/paasta_tools/spark_tools.py
index 2bc7ade7ae..3e782c1f8d 100644
--- a/paasta_tools/spark_tools.py
+++ b/paasta_tools/spark_tools.py
@@ -23,6 +23,7 @@
 SPARK_JOB_USER = "TRON"
 SPARK_PROMETHEUS_SHARD = "ml-compute"
 SPARK_DNS_POD_TEMPLATE = "/nail/srv/configs/spark_dns_pod_template.yaml"
+MEM_MULTIPLIER = {'k': 1024, 'm': 1024**2, 'g': 1024**3, 't': 1024**4}
 
 log = logging.getLogger(__name__)
 
@@ -247,3 +248,23 @@ def get_spark_driver_monitoring_labels(
         "spark.yelp.com/driver_ui_port": ui_port_str,
     }
     return labels
+
+
+def get_spark_memory_in_unit(mem: str, unit: str) -> int:
+    """
+    Converts Spark memory to the desired unit.
+    mem is the same format as JVM memory strings: just number or number followed by 'k', 'm', 'g' or 't'.
+    unit can be 'k', 'm', 'g' or 't'.
+    Returns memory as an integer converted to the desired unit.
+    """
+    memory_bytes = 0
+    if mem:
+        if mem[-1] in MEM_MULTIPLIER:
+            memory_bytes = int(mem[:-1]) * MEM_MULTIPLIER[mem[-1]]
+        else:
+            try:
+                memory_bytes = int(mem)
+            except ValueError:
+                memory_bytes = 0
+    memory_unit = memory_bytes/MEM_MULTIPLIER[unit]
+    return memory_unit
diff --git a/paasta_tools/tron_tools.py b/paasta_tools/tron_tools.py
index 68fd2f4a71..0cac7c0d56 100644
--- a/paasta_tools/tron_tools.py
+++ b/paasta_tools/tron_tools.py
@@ -972,7 +972,18 @@ def format_tron_action_dict(action_config: TronActionConfig):
                 "max_runtime", spark_tools.DEFAULT_SPARK_RUNTIME_TIMEOUT
             ),
         )
+        # set Spark driver pod CPU and memory config if it is specified by Spark arguments
+        if "spark.driver.cores" in spark_config:
+            result["cpus"] = spark_config["spark.driver.cores"]
+        if "spark.driver.memory" in spark_config:
+            # need to set mem in MB based on tron schema
+            memory_in_mb = spark_tools.get_spark_memory_in_unit(spark_config["spark.driver.memory"], 'm')
+            if memory_in_mb:
+                result["mem"] = str(memory_in_mb)
+
+        # point to the KUBECONFIG needed by Spark driver
         result["env"]["KUBECONFIG"] = system_paasta_config.get_spark_kubeconfig()
+
         # spark, unlike normal batches, needs to expose several ports for things like the spark
         # ui and for executor->driver communication
         result["ports"] = list(
@@ -1014,8 +1025,8 @@ def format_tron_action_dict(action_config: TronActionConfig):
     # the following config is only valid for k8s/Mesos since we're not running SSH actions
     # in a containerized fashion
     if executor in (KUBERNETES_EXECUTOR_NAMES + MESOS_EXECUTOR_NAMES):
-        result["cpus"] = action_config.get_cpus()
-        result["mem"] = action_config.get_mem()
+        result.setdefault("cpus", action_config.get_cpus())
+        result.setdefault("mem", action_config.get_mem())
         result["disk"] = action_config.get_disk()
         result["extra_volumes"] = format_volumes(extra_volumes)
         result["docker_image"] = action_config.get_docker_url()

From 488b3b359df55b2f641256e52cc3a65801ff26b8 Mon Sep 17 00:00:00 2001
From: Sameer Sharma
Date: Thu, 13 Jun 2024 15:20:27 +0100
Subject: [PATCH 02/28] MLCOMPUTE-1203 | add validation to prevent users from
 specifying cpus and mem configs for Spark drivers on k8s

---
 paasta_tools/tron_tools.py | 14 ++++++++++++++
 1 file changed, 14 insertions(+)

diff --git a/paasta_tools/tron_tools.py b/paasta_tools/tron_tools.py
index 0cac7c0d56..7aaefcc21d 100644
--- a/paasta_tools/tron_tools.py
+++ b/paasta_tools/tron_tools.py
@@ -603,6 +603,20 @@ def validate(self):
             error_msgs.append(
                 f"{self.get_job_name()}.{self.get_action_name()} must have a deploy_group set"
             )
+        # We are not allowing users to specify `cpus` and `mem` configuration if the action is a Spark job
+        # with driver running on k8s (executor: spark), because we derive these values from `spark.driver.cores`
+        # and `spark.driver.memory` in order to avoid confusion.
+        if self.get_executor() == "spark":
+            if "cpus" in self.config_dict:
+                error_msgs.append(
+                    f"{self.get_job_name()}.{self.get_action_name()} is a Spark job. `cpus` config is not allowed. "
+                    f"Please specify the driver cores using `spark.driver.cores`."
+                )
+            if "mem" in self.config_dict:
+                error_msgs.append(
+                    f"{self.get_job_name()}.{self.get_action_name()} is a Spark job. `mem` config is not allowed. "
+                    f"Please specify the driver memory using `spark.driver.memory`."
+                )
         return error_msgs
 
     def get_pool(self) -> str:

From fac144db90e8ce456339faf7b2de8b771dbe0d19 Mon Sep 17 00:00:00 2001
From: Sameer Sharma
Date: Thu, 13 Jun 2024 10:30:00 -0700
Subject: [PATCH 03/28] MLCOMPUTE-1203 | overload instance config methods, add
 tests

---
 paasta_tools/spark_tools.py |  6 +--
 paasta_tools/tron_tools.py  | 55 +++++++++++++++++--------
 tests/test_tron_tools.py    | 76 ++++++++++++++++++++++++++++++++++++-
 3 files changed, 116 insertions(+), 21 deletions(-)

diff --git a/paasta_tools/spark_tools.py b/paasta_tools/spark_tools.py
index 3e782c1f8d..de072e47aa 100644
--- a/paasta_tools/spark_tools.py
+++ b/paasta_tools/spark_tools.py
@@ -23,7 +23,7 @@
 SPARK_JOB_USER = "TRON"
 SPARK_PROMETHEUS_SHARD = "ml-compute"
 SPARK_DNS_POD_TEMPLATE = "/nail/srv/configs/spark_dns_pod_template.yaml"
-MEM_MULTIPLIER = {'k': 1024, 'm': 1024**2, 'g': 1024**3, 't': 1024**4}
+MEM_MULTIPLIER = {"k": 1024, "m": 1024**2, "g": 1024**3, "t": 1024**4}
 
 log = logging.getLogger(__name__)
 
@@ -250,7 +250,7 @@ def get_spark_driver_monitoring_labels(
     return labels
 
 
-def get_spark_memory_in_unit(mem: str, unit: str) -> int:
+def get_spark_memory_in_unit(mem: str, unit: str) -> float:
     """
     Converts Spark memory to the desired unit.
     mem is the same format as JVM memory strings: just number or number followed by 'k', 'm', 'g' or 't'.
@@ -266,5 +266,5 @@ def get_spark_memory_in_unit(mem: str, unit: str) -> int:
                 memory_bytes = int(mem)
             except ValueError:
                 memory_bytes = 0
-    memory_unit = memory_bytes/MEM_MULTIPLIER[unit]
+    memory_unit = memory_bytes / MEM_MULTIPLIER[unit]
     return memory_unit
diff --git a/paasta_tools/tron_tools.py b/paasta_tools/tron_tools.py
index 841f3706d0..0f652bbb85 100644
--- a/paasta_tools/tron_tools.py
+++ b/paasta_tools/tron_tools.py
@@ -282,6 +282,32 @@ def __init__(
         self.job, self.action = decompose_instance(instance)
         # Indicate whether this config object is created for validation
         self.for_validation = for_validation
+        self.action_spark_config = None
+
+    def get_cpus(self) -> float:
+        # set Spark driver pod CPU if it is specified by Spark arguments
+        cpus = 0
+        if (
+            self.action_spark_config
+            and "spark.driver.cores" in self.action_spark_config
+        ):
+            cpus = float(self.action_spark_config["spark.driver.cores"])
+        # use the soa config otherwise
+        return cpus or super().get_cpus()
+
+    def get_mem(self) -> float:
+        # set Spark driver pod memory if it is specified by Spark arguments
+        mem_mb = 0
+        if (
+            self.action_spark_config
+            and "spark.driver.memory" in self.action_spark_config
+        ):
+            # need to set mem in MB based on tron schema
+            mem_mb = spark_tools.get_spark_memory_in_unit(
+                self.action_spark_config["spark.driver.memory"], "m"
+            )
+        # use the soa config otherwise
+        return mem_mb or super().get_mem()
 
     def build_spark_config(self) -> Dict[str, str]:
         system_paasta_config = load_system_paasta_config()
@@ -990,31 +1016,26 @@ def format_tron_action_dict(action_config: TronActionConfig):
         is_mrjob = action_config.config_dict.get("mrjob", False)
         system_paasta_config = load_system_paasta_config()
         # inject spark configs to the original spark-submit command
-        spark_config = action_config.build_spark_config()
+        action_config.action_spark_config = action_config.build_spark_config()
         result["command"] = spark_tools.build_spark_command(
             result["command"],
-            spark_config,
+            action_config.action_spark_config,
             is_mrjob,
             action_config.config_dict.get(
                 "max_runtime", spark_tools.DEFAULT_SPARK_RUNTIME_TIMEOUT
             ),
         )
-        # set Spark driver pod CPU and memory config if it is specified by Spark arguments
-        if "spark.driver.cores" in spark_config:
-            result["cpus"] = spark_config["spark.driver.cores"]
-        if "spark.driver.memory" in spark_config:
-            # need to set mem in MB based on tron schema
-            memory_in_mb = spark_tools.get_spark_memory_in_unit(spark_config["spark.driver.memory"], 'm')
-            if memory_in_mb:
-                result["mem"] = str(memory_in_mb)
-
         # point to the KUBECONFIG needed by Spark driver
         result["env"]["KUBECONFIG"] = system_paasta_config.get_spark_kubeconfig()
 
         # spark, unlike normal batches, needs to expose several ports for things like the spark
         # ui and for executor->driver communication
         result["ports"] = list(
-            set(spark_tools.get_spark_ports_from_config(spark_config))
+            set(
+                spark_tools.get_spark_ports_from_config(
+                    action_config.action_spark_config
+                )
+            )
         )
         # mount KUBECONFIG file for Spark drivers to communicate with EKS cluster
         extra_volumes.append(
@@ -1028,10 +1049,12 @@
         )
         # Add pod annotations and labels for Spark monitoring metrics
         monitoring_annotations = (
-            spark_tools.get_spark_driver_monitoring_annotations(spark_config)
+            spark_tools.get_spark_driver_monitoring_annotations(
+                action_config.action_spark_config
+            )
         )
         monitoring_labels = spark_tools.get_spark_driver_monitoring_labels(
-            spark_config
+            action_config.action_spark_config
         )
result["annotations"].update(monitoring_annotations) result["labels"].update(monitoring_labels) @@ -1052,8 +1075,8 @@ def format_tron_action_dict(action_config: TronActionConfig): # the following config is only valid for k8s/Mesos since we're not running SSH actions # in a containerized fashion if executor in (KUBERNETES_EXECUTOR_NAMES + MESOS_EXECUTOR_NAMES): - result.setdefault("cpus", action_config.get_cpus()) - result.setdefault("mem", action_config.get_mem()) + result["cpus"] = action_config.get_cpus() + result["mem"] = action_config.get_mem() result["disk"] = action_config.get_disk() result["extra_volumes"] = format_volumes(extra_volumes) result["docker_image"] = action_config.get_docker_url() diff --git a/tests/test_tron_tools.py b/tests/test_tron_tools.py index dfa0126c6e..13d562bebc 100644 --- a/tests/test_tron_tools.py +++ b/tests/test_tron_tools.py @@ -637,6 +637,80 @@ def test_validate_action_valid_deploy_group( errors = job_config.validate() assert len(errors) == 0 + @mock.patch("paasta_tools.utils.get_pipeline_deploy_groups", autospec=True) + @mock.patch("paasta_tools.tron_tools.load_system_paasta_config", autospec=True) + def test_validate_invalid_cpus_in_executor_spark_action( + self, mock_load_system_paasta_config, mock_get_pipeline_deploy_groups + ): + job_dict = { + "node": "batch_server", + "schedule": "daily 12:10:00", + "monitoring": {"team": "noop", "page": True}, + "actions": { + "first": { + "cpus": 1, + "command": "echo first", + "deploy_group": "deploy_group_2", + } + }, + } + mock_get_pipeline_deploy_groups.return_value = [ + "deploy_group_1", + "deploy_group_2", + ] + job_config = tron_tools.TronJobConfig("my_job", job_dict, "fake-cluster") + errors = job_config.validate() + assert len(errors) == 1 + + @mock.patch("paasta_tools.utils.get_pipeline_deploy_groups", autospec=True) + @mock.patch("paasta_tools.tron_tools.load_system_paasta_config", autospec=True) + def test_validate_invalid_mem_in_executor_spark_action( + self, mock_load_system_paasta_config, mock_get_pipeline_deploy_groups + ): + job_dict = { + "node": "batch_server", + "schedule": "daily 12:10:00", + "monitoring": {"team": "noop", "page": True}, + "actions": { + "first": { + "mem": 4096, + "command": "echo first", + "deploy_group": "deploy_group_2", + } + }, + } + mock_get_pipeline_deploy_groups.return_value = [ + "deploy_group_1", + "deploy_group_2", + ] + job_config = tron_tools.TronJobConfig("my_job", job_dict, "fake-cluster") + errors = job_config.validate() + assert len(errors) == 1 + + @mock.patch("paasta_tools.utils.get_pipeline_deploy_groups", autospec=True) + @mock.patch("paasta_tools.tron_tools.load_system_paasta_config", autospec=True) + def test_validate_valid_executor_spark_action( + self, mock_load_system_paasta_config, mock_get_pipeline_deploy_groups + ): + job_dict = { + "node": "batch_server", + "schedule": "daily 12:10:00", + "monitoring": {"team": "noop", "page": True}, + "actions": { + "first": { + "command": "echo first", + "deploy_group": "deploy_group_2", + } + }, + } + mock_get_pipeline_deploy_groups.return_value = [ + "deploy_group_1", + "deploy_group_2", + ] + job_config = tron_tools.TronJobConfig("my_job", job_dict, "fake-cluster") + errors = job_config.validate() + assert len(errors) == 0 + @mock.patch("paasta_tools.utils.get_pipeline_deploy_groups", autospec=True) @mock.patch("paasta_tools.tron_tools.load_system_paasta_config", autospec=True) def test_validate_monitoring( @@ -953,8 +1027,6 @@ def test_format_tron_action_dict_spark(self): "service": "my_service", 
"deploy_group": "prod", "executor": "spark", - "cpus": 2, - "mem": 1200, "disk": 42, "pool": "special_pool", "env": {"SHELL": "/bin/bash"}, From 9614c92ff6e40ef091047d4f94a9b62b4c765bb6 Mon Sep 17 00:00:00 2001 From: Sameer Sharma Date: Thu, 13 Jun 2024 11:57:41 -0700 Subject: [PATCH 04/28] MLCOMPUTE-1203 | fix tests --- paasta_tools/tron_tools.py | 7 ++++--- tests/test_tron_tools.py | 11 +++++++---- 2 files changed, 11 insertions(+), 7 deletions(-) diff --git a/paasta_tools/tron_tools.py b/paasta_tools/tron_tools.py index 0f652bbb85..e12f5d8931 100644 --- a/paasta_tools/tron_tools.py +++ b/paasta_tools/tron_tools.py @@ -466,7 +466,6 @@ def get_env( system_paasta_config: Optional["SystemPaastaConfig"] = None, ) -> Dict[str, str]: env = super().get_env(system_paasta_config=system_paasta_config) - if self.get_executor() == "spark": # Required by some sdks like boto3 client. Throws NoRegionError otherwise. # AWS_REGION takes precedence if set. @@ -952,6 +951,9 @@ def format_tron_action_dict(action_config: TronActionConfig): result["executor"] = EXECUTOR_NAME_TO_TRON_EXECUTOR_TYPE.get( executor, "kubernetes" ) + if executor == "spark": + # inject spark configs to the original spark-submit command + action_config.action_spark_config = action_config.build_spark_config() result["secret_env"] = action_config.get_secret_env() result["field_selector_env"] = action_config.get_field_selector_env() @@ -1015,8 +1017,7 @@ def format_tron_action_dict(action_config: TronActionConfig): if executor == "spark": is_mrjob = action_config.config_dict.get("mrjob", False) system_paasta_config = load_system_paasta_config() - # inject spark configs to the original spark-submit command - action_config.action_spark_config = action_config.build_spark_config() + # build the entire spark command by adding additional arguments result["command"] = spark_tools.build_spark_command( result["command"], action_config.action_spark_config, diff --git a/tests/test_tron_tools.py b/tests/test_tron_tools.py index 13d562bebc..4d3c4467fd 100644 --- a/tests/test_tron_tools.py +++ b/tests/test_tron_tools.py @@ -648,6 +648,7 @@ def test_validate_invalid_cpus_in_executor_spark_action( "monitoring": {"team": "noop", "page": True}, "actions": { "first": { + "executor": "spark", "cpus": 1, "command": "echo first", "deploy_group": "deploy_group_2", @@ -673,6 +674,7 @@ def test_validate_invalid_mem_in_executor_spark_action( "monitoring": {"team": "noop", "page": True}, "actions": { "first": { + "executor": "spark", "mem": 4096, "command": "echo first", "deploy_group": "deploy_group_2", @@ -698,6 +700,7 @@ def test_validate_valid_executor_spark_action( "monitoring": {"team": "noop", "page": True}, "actions": { "first": { + "executor": "spark", "command": "echo first", "deploy_group": "deploy_group_2", } @@ -1284,8 +1287,8 @@ def test_format_tron_action_dict_spark(self): "PAASTA_CLUSTER": "test-cluster", "PAASTA_DEPLOY_GROUP": "prod", "PAASTA_DOCKER_IMAGE": "my_service:paasta-123abcde", - "PAASTA_RESOURCE_CPUS": "2", - "PAASTA_RESOURCE_MEM": "1200", + "PAASTA_RESOURCE_CPUS": "1", + "PAASTA_RESOURCE_MEM": "1024.0", "PAASTA_RESOURCE_DISK": "42", "PAASTA_GIT_SHA": "123abcde", "PAASTA_INSTANCE_TYPE": "spark", @@ -1338,8 +1341,8 @@ def test_format_tron_action_dict_spark(self): }, ], "ports": [39091], - "cpus": 2, - "mem": 1200, + "cpus": 1, + "mem": 1024.0, "disk": 42, "docker_image": "docker-registry.com:400/my_service:paasta-123abcde", } From bbd6c59bd286f68a9aed2495966b5297746dcbef Mon Sep 17 00:00:00 2001 From: Sameer Sharma Date: Thu, 13 Jun 2024 
20:09:46 +0100 Subject: [PATCH 05/28] MLCOMPUTE-1203 | fix tests --- paasta_tools/spark_tools.py | 8 ++++---- paasta_tools/tron_tools.py | 4 ++-- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/paasta_tools/spark_tools.py b/paasta_tools/spark_tools.py index de072e47aa..03a56b9928 100644 --- a/paasta_tools/spark_tools.py +++ b/paasta_tools/spark_tools.py @@ -257,14 +257,14 @@ def get_spark_memory_in_unit(mem: str, unit: str) -> float: unit can be 'k', 'm', 'g' or 't'. Returns memory as an integer converted to the desired unit. """ - memory_bytes = 0 + memory_bytes = 0.0 if mem: if mem[-1] in MEM_MULTIPLIER: - memory_bytes = int(mem[:-1]) * MEM_MULTIPLIER[mem[-1]] + memory_bytes = float(mem[:-1]) * MEM_MULTIPLIER[mem[-1]] else: try: - memory_bytes = int(mem) + memory_bytes = float(mem) except ValueError: - memory_bytes = 0 + print(f"Unable to parse memory value {mem}") memory_unit = memory_bytes / MEM_MULTIPLIER[unit] return memory_unit diff --git a/paasta_tools/tron_tools.py b/paasta_tools/tron_tools.py index e12f5d8931..45fc85e738 100644 --- a/paasta_tools/tron_tools.py +++ b/paasta_tools/tron_tools.py @@ -286,7 +286,7 @@ def __init__( def get_cpus(self) -> float: # set Spark driver pod CPU if it is specified by Spark arguments - cpus = 0 + cpus = 0.0 if ( self.action_spark_config and "spark.driver.cores" in self.action_spark_config @@ -297,7 +297,7 @@ def get_cpus(self) -> float: def get_mem(self) -> float: # set Spark driver pod memory if it is specified by Spark arguments - mem_mb = 0 + mem_mb = 0.0 if ( self.action_spark_config and "spark.driver.memory" in self.action_spark_config From 978255877180b836f4cdba81a256770635bf5167 Mon Sep 17 00:00:00 2001 From: Sameer Sharma Date: Thu, 13 Jun 2024 23:57:03 +0100 Subject: [PATCH 06/28] MLCOMPUTE-1203 | minor fixes --- paasta_tools/spark_tools.py | 8 +++++--- paasta_tools/tron_tools.py | 20 +++++++++----------- 2 files changed, 14 insertions(+), 14 deletions(-) diff --git a/paasta_tools/spark_tools.py b/paasta_tools/spark_tools.py index 03a56b9928..94267e6e8b 100644 --- a/paasta_tools/spark_tools.py +++ b/paasta_tools/spark_tools.py @@ -8,6 +8,7 @@ from typing import List from typing import Mapping from typing import Set +from typing import Literal from mypy_extensions import TypedDict @@ -250,12 +251,12 @@ def get_spark_driver_monitoring_labels( return labels -def get_spark_memory_in_unit(mem: str, unit: str) -> float: +def get_spark_memory_in_unit(mem: str, unit: Literal["k", "m", "g", "t"]) -> float: """ Converts Spark memory to the desired unit. mem is the same format as JVM memory strings: just number or number followed by 'k', 'm', 'g' or 't'. unit can be 'k', 'm', 'g' or 't'. - Returns memory as an integer converted to the desired unit. + Returns memory as a float converted to the desired unit. """ memory_bytes = 0.0 if mem: @@ -265,6 +266,7 @@ def get_spark_memory_in_unit(mem: str, unit: str) -> float: try: memory_bytes = float(mem) except ValueError: - print(f"Unable to parse memory value {mem}") + print(f"Unable to parse memory value {mem}. 
Defaulting to 2 GB.") + memory_bytes = 2147483648 # default to 2 GB memory_unit = memory_bytes / MEM_MULTIPLIER[unit] return memory_unit diff --git a/paasta_tools/tron_tools.py b/paasta_tools/tron_tools.py index 45fc85e738..bc49490282 100644 --- a/paasta_tools/tron_tools.py +++ b/paasta_tools/tron_tools.py @@ -286,28 +286,25 @@ def __init__( def get_cpus(self) -> float: # set Spark driver pod CPU if it is specified by Spark arguments - cpus = 0.0 if ( self.action_spark_config and "spark.driver.cores" in self.action_spark_config ): - cpus = float(self.action_spark_config["spark.driver.cores"]) - # use the soa config otherwise - return cpus or super().get_cpus() + return float(self.action_spark_config["spark.driver.cores"]) + # we fall back to this default if there's no spark.driver.cores config + return super().get_cpus() def get_mem(self) -> float: # set Spark driver pod memory if it is specified by Spark arguments - mem_mb = 0.0 if ( self.action_spark_config and "spark.driver.memory" in self.action_spark_config ): - # need to set mem in MB based on tron schema - mem_mb = spark_tools.get_spark_memory_in_unit( + return spark_tools.get_spark_memory_in_unit( self.action_spark_config["spark.driver.memory"], "m" ) - # use the soa config otherwise - return mem_mb or super().get_mem() + # we fall back to this default if there's no spark.driver.memory config + return super().get_mem() def build_spark_config(self) -> Dict[str, str]: system_paasta_config = load_system_paasta_config() @@ -952,7 +949,8 @@ def format_tron_action_dict(action_config: TronActionConfig): executor, "kubernetes" ) if executor == "spark": - # inject spark configs to the original spark-submit command + # build the complete Spark configuration + # TODO: add conditional check for Spark specific commands spark-submit, pyspark etc ? action_config.action_spark_config = action_config.build_spark_config() result["secret_env"] = action_config.get_secret_env() @@ -1017,7 +1015,7 @@ def format_tron_action_dict(action_config: TronActionConfig): if executor == "spark": is_mrjob = action_config.config_dict.get("mrjob", False) system_paasta_config = load_system_paasta_config() - # build the entire spark command by adding additional arguments + # inject additional Spark configs in case of Spark commands result["command"] = spark_tools.build_spark_command( result["command"], action_config.action_spark_config, From 1b458746f02ac7c9f03904a2b17b957ed7419a00 Mon Sep 17 00:00:00 2001 From: Sameer Sharma Date: Fri, 14 Jun 2024 17:49:16 +0100 Subject: [PATCH 07/28] MLCOMPUTE-1203 | move building spark config to constructor --- paasta_tools/spark_tools.py | 16 +++++++--------- paasta_tools/tron_tools.py | 12 ++++++------ 2 files changed, 13 insertions(+), 15 deletions(-) diff --git a/paasta_tools/spark_tools.py b/paasta_tools/spark_tools.py index 94267e6e8b..d7c548637a 100644 --- a/paasta_tools/spark_tools.py +++ b/paasta_tools/spark_tools.py @@ -258,15 +258,13 @@ def get_spark_memory_in_unit(mem: str, unit: Literal["k", "m", "g", "t"]) -> flo unit can be 'k', 'm', 'g' or 't'. Returns memory as a float converted to the desired unit. """ - memory_bytes = 0.0 - if mem: - if mem[-1] in MEM_MULTIPLIER: + try: + memory_bytes = float(mem) + except ValueError: + try: memory_bytes = float(mem[:-1]) * MEM_MULTIPLIER[mem[-1]] - else: - try: - memory_bytes = float(mem) - except ValueError: - print(f"Unable to parse memory value {mem}. 
Defaulting to 2 GB.") - memory_bytes = 2147483648 # default to 2 GB + except (ValueError, IndexError): + print(f"Unable to parse memory value {mem}. Defaulting to 2 GB.") + memory_bytes = 2147483648 # default to 2 GB memory_unit = memory_bytes / MEM_MULTIPLIER[unit] return memory_unit diff --git a/paasta_tools/tron_tools.py b/paasta_tools/tron_tools.py index bc49490282..fd8bd30023 100644 --- a/paasta_tools/tron_tools.py +++ b/paasta_tools/tron_tools.py @@ -283,6 +283,10 @@ def __init__( # Indicate whether this config object is created for validation self.for_validation = for_validation self.action_spark_config = None + if self.get_executor() == "spark": + # build the complete Spark configuration + # TODO: add conditional check for Spark specific commands spark-submit, pyspark etc ? + self.action_spark_config = self.build_spark_config() def get_cpus(self) -> float: # set Spark driver pod CPU if it is specified by Spark arguments @@ -300,9 +304,9 @@ def get_mem(self) -> float: self.action_spark_config and "spark.driver.memory" in self.action_spark_config ): - return spark_tools.get_spark_memory_in_unit( + return int(spark_tools.get_spark_memory_in_unit( self.action_spark_config["spark.driver.memory"], "m" - ) + )) # we fall back to this default if there's no spark.driver.memory config return super().get_mem() @@ -948,10 +952,6 @@ def format_tron_action_dict(action_config: TronActionConfig): result["executor"] = EXECUTOR_NAME_TO_TRON_EXECUTOR_TYPE.get( executor, "kubernetes" ) - if executor == "spark": - # build the complete Spark configuration - # TODO: add conditional check for Spark specific commands spark-submit, pyspark etc ? - action_config.action_spark_config = action_config.build_spark_config() result["secret_env"] = action_config.get_secret_env() result["field_selector_env"] = action_config.get_field_selector_env() From 659b96f1629b7cb74bc84122bea72ed2b91782ab Mon Sep 17 00:00:00 2001 From: Sameer Sharma Date: Mon, 17 Jun 2024 09:28:02 -0700 Subject: [PATCH 08/28] MLCOMPUTE-1203 | fix tests and formatting --- paasta_tools/spark_tools.py | 2 +- paasta_tools/tron_tools.py | 8 +- tests/test_tron_tools.py | 260 ++++++++++++++++++++---------------- 3 files changed, 150 insertions(+), 120 deletions(-) diff --git a/paasta_tools/spark_tools.py b/paasta_tools/spark_tools.py index d7c548637a..335625496f 100644 --- a/paasta_tools/spark_tools.py +++ b/paasta_tools/spark_tools.py @@ -6,9 +6,9 @@ from typing import cast from typing import Dict from typing import List +from typing import Literal from typing import Mapping from typing import Set -from typing import Literal from mypy_extensions import TypedDict diff --git a/paasta_tools/tron_tools.py b/paasta_tools/tron_tools.py index fd8bd30023..04f68fc808 100644 --- a/paasta_tools/tron_tools.py +++ b/paasta_tools/tron_tools.py @@ -304,9 +304,11 @@ def get_mem(self) -> float: self.action_spark_config and "spark.driver.memory" in self.action_spark_config ): - return int(spark_tools.get_spark_memory_in_unit( - self.action_spark_config["spark.driver.memory"], "m" - )) + return int( + spark_tools.get_spark_memory_in_unit( + self.action_spark_config["spark.driver.memory"], "m" + ) + ) # we fall back to this default if there's no spark.driver.memory config return super().get_mem() diff --git a/tests/test_tron_tools.py b/tests/test_tron_tools.py index 4d3c4467fd..1cf35064e6 100644 --- a/tests/test_tron_tools.py +++ b/tests/test_tron_tools.py @@ -637,10 +637,16 @@ def test_validate_action_valid_deploy_group( errors = job_config.validate() assert len(errors) 
== 0 + @mock.patch( + "paasta_tools.tron_tools.TronActionConfig.build_spark_config", autospec=True + ) @mock.patch("paasta_tools.utils.get_pipeline_deploy_groups", autospec=True) @mock.patch("paasta_tools.tron_tools.load_system_paasta_config", autospec=True) def test_validate_invalid_cpus_in_executor_spark_action( - self, mock_load_system_paasta_config, mock_get_pipeline_deploy_groups + self, + mock_load_system_paasta_config, + mock_get_pipeline_deploy_groups, + mock_build_spark_config, ): job_dict = { "node": "batch_server", @@ -663,10 +669,16 @@ def test_validate_invalid_cpus_in_executor_spark_action( errors = job_config.validate() assert len(errors) == 1 + @mock.patch( + "paasta_tools.tron_tools.TronActionConfig.build_spark_config", autospec=True + ) @mock.patch("paasta_tools.utils.get_pipeline_deploy_groups", autospec=True) @mock.patch("paasta_tools.tron_tools.load_system_paasta_config", autospec=True) def test_validate_invalid_mem_in_executor_spark_action( - self, mock_load_system_paasta_config, mock_get_pipeline_deploy_groups + self, + mock_load_system_paasta_config, + mock_get_pipeline_deploy_groups, + mock_build_spark_config, ): job_dict = { "node": "batch_server", @@ -689,10 +701,16 @@ def test_validate_invalid_mem_in_executor_spark_action( errors = job_config.validate() assert len(errors) == 1 + @mock.patch( + "paasta_tools.tron_tools.TronActionConfig.build_spark_config", autospec=True + ) @mock.patch("paasta_tools.utils.get_pipeline_deploy_groups", autospec=True) @mock.patch("paasta_tools.tron_tools.load_system_paasta_config", autospec=True) def test_validate_valid_executor_spark_action( - self, mock_load_system_paasta_config, mock_get_pipeline_deploy_groups + self, + mock_load_system_paasta_config, + mock_get_pipeline_deploy_groups, + mock_build_spark_config, ): job_dict = { "node": "batch_server", @@ -1013,7 +1031,33 @@ def test_format_tron_action_dict_paasta(self): assert result["docker_image"] == expected_docker assert result["env"]["SHELL"] == "/bin/bash" - def test_format_tron_action_dict_spark(self): + @mock.patch( + "paasta_tools.tron_tools.TronActionConfig.get_docker_registry", autospec=True + ) + @mock.patch("paasta_tools.kubernetes_tools.kube_client", autospec=True) + @mock.patch( + "paasta_tools.kubernetes_tools.kube_config.load_kube_config", autospec=True + ) + @mock.patch("paasta_tools.tron_tools.load_system_paasta_config", autospec=True) + @mock.patch("paasta_tools.tron_tools.get_k8s_url_for_cluster", autospec=True) + @mock.patch( + "service_configuration_lib.spark_config._get_k8s_docker_volumes_conf", + autospec=True, + ) + @mock.patch( + "service_configuration_lib.spark_config.utils.load_spark_srv_conf", + autospec=True, + ) + def test_format_tron_action_dict_spark( + self, + mock_load_spark_srv_conf, + mock_get_k8s_docker_volumes_conf, + mock_get_k8s_url_for_cluster, + mock_load_system_paasta_config, + mock_load_kube_config, + mock_kube_client, + mock_get_docker_registry, + ): action_dict = { "iam_role_provider": "aws", "iam_role": "arn:aws:iam::000000000000:role/some_role", @@ -1054,111 +1098,54 @@ def test_format_tron_action_dict_spark(self): "desired_state": "start", "force_bounce": None, } - action_config = tron_tools.TronActionConfig( - service="my_service", - instance=tron_tools.compose_instance("my_job", "do_something"), - config_dict=TronActionConfigDict(action_dict), - branch_dict=utils.BranchDictV2(branch_dict), - cluster="test-cluster", - ) - - with mock.patch.object( - action_config, "get_docker_registry", return_value="docker-registry.com:400" - ), 
mock.patch( - "paasta_tools.utils.InstanceConfig.use_docker_disk_quota", - autospec=True, - return_value=False, - ), mock.patch( - "paasta_tools.kubernetes_tools.kube_config.load_kube_config", autospec=True - ), mock.patch( - "paasta_tools.kubernetes_tools.kube_client", - autospec=True, - ), mock.patch( - "paasta_tools.tron_tools._spark_k8s_role", - autospec=True, - return_value="spark", - ), mock.patch( - "paasta_tools.tron_tools.load_system_paasta_config", - autospec=True, - return_value=MOCK_SYSTEM_PAASTA_CONFIG, - ), mock.patch( - "paasta_tools.tron_tools.add_volumes_for_authenticating_services", - autospec=True, - return_value=[], - ), mock.patch( - "paasta_tools.tron_tools.get_k8s_url_for_cluster", - autospec=True, - return_value="https://k8s.test-cluster.paasta:6443", - ), mock.patch( - "service_configuration_lib.spark_config._get_k8s_docker_volumes_conf", - autospec=True, - return_value={ - "spark.kubernetes.executor.volumes.hostPath.0.mount.path": "/nail/tmp", - "spark.kubernetes.executor.volumes.hostPath.0.options.path": "/nail/tmp", - "spark.kubernetes.executor.volumes.hostPath.0.mount.readOnly": "false", - "spark.kubernetes.executor.volumes.hostPath.1.mount.path": "/etc/pki/spark", - "spark.kubernetes.executor.volumes.hostPath.1.options.path": "/etc/pki/spark", - "spark.kubernetes.executor.volumes.hostPath.1.mount.readOnly": "true", - "spark.kubernetes.executor.volumes.hostPath.2.mount.path": "/etc/passwd", - "spark.kubernetes.executor.volumes.hostPath.2.options.path": "/etc/passwd", - "spark.kubernetes.executor.volumes.hostPath.2.mount.readOnly": "true", - "spark.kubernetes.executor.volumes.hostPath.3.mount.path": "/etc/group", - "spark.kubernetes.executor.volumes.hostPath.3.options.path": "/etc/group", - "spark.kubernetes.executor.volumes.hostPath.3.mount.readOnly": "true", - }, - ), mock.patch( - "service_configuration_lib.spark_config.utils.load_spark_srv_conf", - autospec=True, - return_value=( - {}, - { - "target_mem_cpu_ratio": 7, - "resource_configs": { - "recommended": { - "cpu": 4, - "mem": 28, - }, - "medium": { - "cpu": 8, - "mem": 56, - }, - "max": { - "cpu": 12, - "mem": 110, - }, + mock_get_k8s_docker_volumes_conf.return_value = { + "spark.kubernetes.executor.volumes.hostPath.0.mount.path": "/nail/tmp", + "spark.kubernetes.executor.volumes.hostPath.0.options.path": "/nail/tmp", + "spark.kubernetes.executor.volumes.hostPath.0.mount.readOnly": "false", + "spark.kubernetes.executor.volumes.hostPath.1.mount.path": "/etc/pki/spark", + "spark.kubernetes.executor.volumes.hostPath.1.options.path": "/etc/pki/spark", + "spark.kubernetes.executor.volumes.hostPath.1.mount.readOnly": "true", + "spark.kubernetes.executor.volumes.hostPath.2.mount.path": "/etc/passwd", + "spark.kubernetes.executor.volumes.hostPath.2.options.path": "/etc/passwd", + "spark.kubernetes.executor.volumes.hostPath.2.mount.readOnly": "true", + "spark.kubernetes.executor.volumes.hostPath.3.mount.path": "/etc/group", + "spark.kubernetes.executor.volumes.hostPath.3.options.path": "/etc/group", + "spark.kubernetes.executor.volumes.hostPath.3.mount.readOnly": "true", + } + mock_load_spark_srv_conf.return_value = ( + {}, + { + "target_mem_cpu_ratio": 7, + "resource_configs": { + "recommended": { + "cpu": 4, + "mem": 28, }, - "cost_factor": { - "test-cluster": { - "test-pool": 100, - }, - "spark-pnw-prod": { - "batch": 0.041, - "stable_batch": 0.142, - }, + "medium": { + "cpu": 8, + "mem": 56, }, - "adjust_executor_res_ratio_thresh": 99999, - "default_resources_waiting_time_per_executor": 2, - 
"default_clusterman_observed_scaling_time": 15, - "high_cost_threshold_daily": 500, - "preferred_spark_ui_port_start": 39091, - "preferred_spark_ui_port_end": 39100, - "defaults": { - "spark.executor.cores": 4, - "spark.executor.instances": 2, - "spark.executor.memory": 28, - "spark.task.cpus": 1, - "spark.sql.shuffle.partitions": 128, - "spark.dynamicAllocation.executorAllocationRatio": 0.8, - "spark.dynamicAllocation.cachedExecutorIdleTimeout": "1500s", - "spark.yelp.dra.minExecutorRatio": 0.25, + "max": { + "cpu": 12, + "mem": 110, }, - "mandatory_defaults": { - "spark.kubernetes.allocation.batch.size": 512, - "spark.kubernetes.decommission.script": "/opt/spark/kubernetes/dockerfiles/spark/decom.sh", - "spark.logConf": "true", + }, + "cost_factor": { + "test-cluster": { + "test-pool": 100, + }, + "spark-pnw-prod": { + "batch": 0.041, + "stable_batch": 0.142, }, }, - { + "adjust_executor_res_ratio_thresh": 99999, + "default_resources_waiting_time_per_executor": 2, + "default_clusterman_observed_scaling_time": 15, + "high_cost_threshold_daily": 500, + "preferred_spark_ui_port_start": 39091, + "preferred_spark_ui_port_end": 39100, + "defaults": { "spark.executor.cores": 4, "spark.executor.instances": 2, "spark.executor.memory": 28, @@ -1168,21 +1155,62 @@ def test_format_tron_action_dict_spark(self): "spark.dynamicAllocation.cachedExecutorIdleTimeout": "1500s", "spark.yelp.dra.minExecutorRatio": 0.25, }, - { + "mandatory_defaults": { "spark.kubernetes.allocation.batch.size": 512, "spark.kubernetes.decommission.script": "/opt/spark/kubernetes/dockerfiles/spark/decom.sh", "spark.logConf": "true", }, - { - "test-cluster": { - "test-pool": 100, - }, - "spark-pnw-prod": { - "batch": 0.041, - "stable_batch": 0.142, - }, + }, + { + "spark.executor.cores": 4, + "spark.executor.instances": 2, + "spark.executor.memory": 28, + "spark.task.cpus": 1, + "spark.sql.shuffle.partitions": 128, + "spark.dynamicAllocation.executorAllocationRatio": 0.8, + "spark.dynamicAllocation.cachedExecutorIdleTimeout": "1500s", + "spark.yelp.dra.minExecutorRatio": 0.25, + }, + { + "spark.kubernetes.allocation.batch.size": 512, + "spark.kubernetes.decommission.script": "/opt/spark/kubernetes/dockerfiles/spark/decom.sh", + "spark.logConf": "true", + }, + { + "test-cluster": { + "test-pool": 100, }, - ), + "spark-pnw-prod": { + "batch": 0.041, + "stable_batch": 0.142, + }, + }, + ) + mock_get_k8s_url_for_cluster.return_value = ( + "https://k8s.test-cluster.paasta:6443" + ) + mock_load_system_paasta_config.return_value = MOCK_SYSTEM_PAASTA_CONFIG + mock_get_docker_registry.return_value = "docker-registry.com:400" + action_config = tron_tools.TronActionConfig( + service="my_service", + instance=tron_tools.compose_instance("my_job", "do_something"), + config_dict=TronActionConfigDict(action_dict), + branch_dict=utils.BranchDictV2(branch_dict), + cluster="test-cluster", + ) + + with mock.patch( + "paasta_tools.utils.InstanceConfig.use_docker_disk_quota", + autospec=True, + return_value=False, + ), mock.patch( + "paasta_tools.tron_tools._spark_k8s_role", + autospec=True, + return_value="spark", + ), mock.patch( + "paasta_tools.tron_tools.add_volumes_for_authenticating_services", + autospec=True, + return_value=[], ): result = tron_tools.format_tron_action_dict(action_config) @@ -1288,7 +1316,7 @@ def test_format_tron_action_dict_spark(self): "PAASTA_DEPLOY_GROUP": "prod", "PAASTA_DOCKER_IMAGE": "my_service:paasta-123abcde", "PAASTA_RESOURCE_CPUS": "1", - "PAASTA_RESOURCE_MEM": "1024.0", + "PAASTA_RESOURCE_MEM": "1024", 
"PAASTA_RESOURCE_DISK": "42", "PAASTA_GIT_SHA": "123abcde", "PAASTA_INSTANCE_TYPE": "spark", @@ -1342,7 +1370,7 @@ def test_format_tron_action_dict_spark(self): ], "ports": [39091], "cpus": 1, - "mem": 1024.0, + "mem": 1024, "disk": 42, "docker_image": "docker-registry.com:400/my_service:paasta-123abcde", } From d60db187c0c99e140bbca0fdf6c1ce37c1af70c8 Mon Sep 17 00:00:00 2001 From: Sameer Sharma Date: Thu, 27 Jun 2024 10:56:56 +0100 Subject: [PATCH 09/28] DAR-2360 | move the creation of spark config away from the TronActionConfig constructor --- paasta_tools/tron_tools.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/paasta_tools/tron_tools.py b/paasta_tools/tron_tools.py index 04f68fc808..fe9440a747 100644 --- a/paasta_tools/tron_tools.py +++ b/paasta_tools/tron_tools.py @@ -283,10 +283,6 @@ def __init__( # Indicate whether this config object is created for validation self.for_validation = for_validation self.action_spark_config = None - if self.get_executor() == "spark": - # build the complete Spark configuration - # TODO: add conditional check for Spark specific commands spark-submit, pyspark etc ? - self.action_spark_config = self.build_spark_config() def get_cpus(self) -> float: # set Spark driver pod CPU if it is specified by Spark arguments @@ -926,6 +922,12 @@ def format_tron_action_dict(action_config: TronActionConfig): :param action_config: TronActionConfig """ executor = action_config.get_executor() + # not building the Spark config in TronActionConfig constructor since it needs a KUBECONFIG file and + # fails in yelpsoa configs repo validation while getting the action configs. + if executor == "spark": + # build the complete Spark configuration + # TODO: add conditional check for Spark specific commands spark-submit, pyspark etc ? 
+ action_config.action_spark_config = action_config.build_spark_config() result = { "command": action_config.get_cmd(), "executor": executor, From 01e43d4e032e9e8b787f9e2c57e5dfa0e8199adf Mon Sep 17 00:00:00 2001 From: Sameer Sharma Date: Thu, 27 Jun 2024 18:48:13 +0100 Subject: [PATCH 10/28] MLCOMPUTE-1203 | add validation signature to methods loading instance configs --- paasta_tools/adhoc_tools.py | 2 +- paasta_tools/cassandracluster_tools.py | 3 +++ paasta_tools/cli/cmds/validate.py | 3 ++- paasta_tools/cli/utils.py | 3 +++ paasta_tools/eks_tools.py | 3 +++ paasta_tools/flink_tools.py | 3 +++ paasta_tools/flinkeks_tools.py | 3 +++ paasta_tools/kafkacluster_tools.py | 3 +++ paasta_tools/kubernetes_tools.py | 3 +++ paasta_tools/monkrelaycluster_tools.py | 3 +++ paasta_tools/nrtsearchservice_tools.py | 3 +++ paasta_tools/nrtsearchserviceeks_tools.py | 3 +++ paasta_tools/tron_tools.py | 15 +++++++++------ paasta_tools/vitesscluster_tools.py | 1 + 14 files changed, 43 insertions(+), 8 deletions(-) diff --git a/paasta_tools/adhoc_tools.py b/paasta_tools/adhoc_tools.py index 53f2b882e8..e3d4b9f40f 100644 --- a/paasta_tools/adhoc_tools.py +++ b/paasta_tools/adhoc_tools.py @@ -31,7 +31,7 @@ def load_adhoc_job_config( - service, instance, cluster, load_deployments=True, soa_dir=DEFAULT_SOA_DIR + service, instance, cluster, load_deployments=True, soa_dir=DEFAULT_SOA_DIR, for_validation: bool = False, ): general_config = service_configuration_lib.read_service_configuration( service, soa_dir=soa_dir diff --git a/paasta_tools/cassandracluster_tools.py b/paasta_tools/cassandracluster_tools.py index 5f54c0cae7..33a36455ad 100644 --- a/paasta_tools/cassandracluster_tools.py +++ b/paasta_tools/cassandracluster_tools.py @@ -151,6 +151,7 @@ def load_cassandracluster_instance_config( cluster: str, load_deployments: bool = True, soa_dir: str = DEFAULT_SOA_DIR, + for_validation: bool = False, ) -> CassandraClusterDeploymentConfig: """Read a service instance's configuration for CassandraCluster. 
@@ -163,6 +164,8 @@ def load_cassandracluster_instance_config( :param load_deployments: A boolean indicating if the corresponding deployments.json for this service should also be loaded :param soa_dir: The SOA configuration directory to read from + :param for_validation: currently unused - part of the signature for instance types that cannot be + fully created in places where paasta validate runs without special handling :returns: A dictionary of whatever was in the config for the service instance""" general_config = service_configuration_lib.read_service_configuration( service, soa_dir=soa_dir diff --git a/paasta_tools/cli/cmds/validate.py b/paasta_tools/cli/cmds/validate.py index 447c39d0b2..989017d436 100644 --- a/paasta_tools/cli/cmds/validate.py +++ b/paasta_tools/cli/cmds/validate.py @@ -191,6 +191,7 @@ def load_all_instance_configs_for_service( cluster=cluster, load_deployments=False, soa_dir=soa_dir, + for_validation=True, ) ret.append((instance, instance_config)) @@ -504,7 +505,7 @@ def validate_tron(service_path: str, verbose: bool = False) -> bool: # TODO(TRON-1761): unify tron/paasta validate cron syntax validation service_config = load_tron_service_config( - service=service, cluster=cluster, soa_dir=soa_dir + service=service, cluster=cluster, soa_dir=soa_dir, for_validation=True, ) for config in service_config: cron_expression = config.get_cron_expression() diff --git a/paasta_tools/cli/utils.py b/paasta_tools/cli/utils.py index 082e693d10..13d3052a82 100644 --- a/paasta_tools/cli/utils.py +++ b/paasta_tools/cli/utils.py @@ -607,6 +607,7 @@ def get_jenkins_build_output_url(): NamedArg(str, "cluster"), NamedArg(bool, "load_deployments"), NamedArg(str, "soa_dir"), + NamedArg(str, "for_validation"), ], InstanceConfig, ] @@ -719,6 +720,7 @@ def get_instance_config( soa_dir: str = DEFAULT_SOA_DIR, load_deployments: bool = False, instance_type: Optional[str] = None, + for_validation: bool = False, ) -> InstanceConfig: """Returns the InstanceConfig object for whatever type of instance it is. (kubernetes)""" @@ -740,6 +742,7 @@ def get_instance_config( cluster=cluster, load_deployments=load_deployments, soa_dir=soa_dir, + for_validation=for_validation, ) diff --git a/paasta_tools/eks_tools.py b/paasta_tools/eks_tools.py index 2355c58dc2..d802cd6233 100644 --- a/paasta_tools/eks_tools.py +++ b/paasta_tools/eks_tools.py @@ -42,6 +42,7 @@ def load_eks_service_config_no_cache( cluster: str, load_deployments: bool = True, soa_dir: str = DEFAULT_SOA_DIR, + for_validation: bool = False, ) -> "EksDeploymentConfig": """Read a service instance's configuration for EKS. 
@@ -54,6 +55,8 @@ def load_eks_service_config_no_cache( :param load_deployments: A boolean indicating if the corresponding deployments.json for this service should also be loaded :param soa_dir: The SOA configuration directory to read from + :param for_validation: currently unused - part of the signature for instance types that cannot be + fully created in places where paasta validate runs without special handling :returns: A dictionary of whatever was in the config for the service instance""" general_config = service_configuration_lib.read_service_configuration( service, soa_dir=soa_dir diff --git a/paasta_tools/flink_tools.py b/paasta_tools/flink_tools.py index a944820743..4b93a20320 100644 --- a/paasta_tools/flink_tools.py +++ b/paasta_tools/flink_tools.py @@ -118,6 +118,7 @@ def load_flink_instance_config( cluster: str, load_deployments: bool = True, soa_dir: str = DEFAULT_SOA_DIR, + for_validation: bool = False, ) -> FlinkDeploymentConfig: """Read a service instance's configuration for Flink. @@ -130,6 +131,8 @@ def load_flink_instance_config( :param load_deployments: A boolean indicating if the corresponding deployments.json for this service should also be loaded :param soa_dir: The SOA configuration directory to read from + :param for_validation: currently unused - part of the signature for instance types that cannot be + fully created in places where paasta validate runs without special handling :returns: A dictionary of whatever was in the config for the service instance""" general_config = service_configuration_lib.read_service_configuration( service, soa_dir=soa_dir diff --git a/paasta_tools/flinkeks_tools.py b/paasta_tools/flinkeks_tools.py index 7ca47ee32c..031e55ec9a 100644 --- a/paasta_tools/flinkeks_tools.py +++ b/paasta_tools/flinkeks_tools.py @@ -42,6 +42,7 @@ def load_flinkeks_instance_config( cluster: str, load_deployments: bool = True, soa_dir: str = DEFAULT_SOA_DIR, + for_validation: bool = False, ) -> FlinkEksDeploymentConfig: """Read a service instance's configuration for Flink on EKS. @@ -54,6 +55,8 @@ def load_flinkeks_instance_config( :param load_deployments: A boolean indicating if the corresponding deployments.json for this service should also be loaded :param soa_dir: The SOA configuration directory to read from + :param for_validation: currently unused - part of the signature for instance types that cannot be + fully created in places where paasta validate runs without special handling :returns: A dictionary of whatever was in the config for the service instance""" general_config = service_configuration_lib.read_service_configuration( service, soa_dir=soa_dir diff --git a/paasta_tools/kafkacluster_tools.py b/paasta_tools/kafkacluster_tools.py index ca0e800f9d..f51e68e821 100644 --- a/paasta_tools/kafkacluster_tools.py +++ b/paasta_tools/kafkacluster_tools.py @@ -84,6 +84,7 @@ def load_kafkacluster_instance_config( cluster: str, load_deployments: bool = True, soa_dir: str = DEFAULT_SOA_DIR, + for_validation: bool = False, ) -> KafkaClusterDeploymentConfig: """Read a service instance's configuration for KafkaCluster. 
@@ -96,6 +97,8 @@ def load_kafkacluster_instance_config( :param load_deployments: A boolean indicating if the corresponding deployments.json for this service should also be loaded :param soa_dir: The SOA configuration directory to read from + :param for_validation: currently unused - part of the signature for instance types that cannot be + fully created in places where paasta validate runs without special handling :returns: A dictionary of whatever was in the config for the service instance""" general_config = service_configuration_lib.read_service_configuration( service, soa_dir=soa_dir diff --git a/paasta_tools/kubernetes_tools.py b/paasta_tools/kubernetes_tools.py index 12f5e744b6..454a10a901 100644 --- a/paasta_tools/kubernetes_tools.py +++ b/paasta_tools/kubernetes_tools.py @@ -444,6 +444,7 @@ def load_kubernetes_service_config_no_cache( cluster: str, load_deployments: bool = True, soa_dir: str = DEFAULT_SOA_DIR, + for_validation: bool = False, ) -> "KubernetesDeploymentConfig": """Read a service instance's configuration for kubernetes. @@ -456,6 +457,8 @@ def load_kubernetes_service_config_no_cache( :param load_deployments: A boolean indicating if the corresponding deployments.json for this service should also be loaded :param soa_dir: The SOA configuration directory to read from + :param for_validation: currently unused - part of the signature for instance types that cannot be + fully created in places where paasta validate runs without special handling :returns: A dictionary of whatever was in the config for the service instance""" general_config = service_configuration_lib.read_service_configuration( service, soa_dir=soa_dir diff --git a/paasta_tools/monkrelaycluster_tools.py b/paasta_tools/monkrelaycluster_tools.py index dda2914781..a4eb383722 100644 --- a/paasta_tools/monkrelaycluster_tools.py +++ b/paasta_tools/monkrelaycluster_tools.py @@ -87,6 +87,7 @@ def load_monkrelaycluster_instance_config( cluster: str, load_deployments: bool = True, soa_dir: str = DEFAULT_SOA_DIR, + for_validation: bool = False, ) -> MonkRelayClusterDeploymentConfig: """Read a service instance's configuration for MonkRelayCluster. @@ -99,6 +100,8 @@ def load_monkrelaycluster_instance_config( :param load_deployments: A boolean indicating if the corresponding deployments.json for this service should also be loaded :param soa_dir: The SOA configuration directory to read from + :param for_validation: currently unused - part of the signature for instance types that cannot be + fully created in places where paasta validate runs without special handling :returns: A dictionary of whatever was in the config for the service instance""" general_config = service_configuration_lib.read_service_configuration( service, soa_dir=soa_dir diff --git a/paasta_tools/nrtsearchservice_tools.py b/paasta_tools/nrtsearchservice_tools.py index e604a3d5fc..9d0fc79cfe 100644 --- a/paasta_tools/nrtsearchservice_tools.py +++ b/paasta_tools/nrtsearchservice_tools.py @@ -84,6 +84,7 @@ def load_nrtsearchservice_instance_config( cluster: str, load_deployments: bool = True, soa_dir: str = DEFAULT_SOA_DIR, + for_validation: bool = False, ) -> NrtsearchServiceDeploymentConfig: """Read a service instance's configuration for Nrtsearch. 
@@ -96,6 +97,8 @@ def load_nrtsearchservice_instance_config( :param load_deployments: A boolean indicating if the corresponding deployments.json for this service should also be loaded :param soa_dir: The SOA configuration directory to read from + :param for_validation: currently unused - part of the signature for instance types that cannot be + fully created in places where paasta validate runs without special handling :returns: A dictionary of whatever was in the config for the service instance""" general_config = service_configuration_lib.read_service_configuration( service, soa_dir=soa_dir diff --git a/paasta_tools/nrtsearchserviceeks_tools.py b/paasta_tools/nrtsearchserviceeks_tools.py index 49b6372ea2..db249a6e56 100644 --- a/paasta_tools/nrtsearchserviceeks_tools.py +++ b/paasta_tools/nrtsearchserviceeks_tools.py @@ -20,6 +20,7 @@ def load_nrtsearchserviceeks_instance_config( cluster: str, load_deployments: bool = True, soa_dir: str = DEFAULT_SOA_DIR, + for_validation: bool = False, ) -> NrtsearchServiceEksDeploymentConfig: """Read a service instance's configuration for Nrtsearch. @@ -32,6 +33,8 @@ def load_nrtsearchserviceeks_instance_config( :param load_deployments: A boolean indicating if the corresponding deployments.json for this service should also be loaded :param soa_dir: The SOA configuration directory to read from + :param for_validation: currently unused - part of the signature for instance types that cannot be + fully created in places where paasta validate runs without special handling :returns: A dictionary of whatever was in the config for the service instance""" general_config = service_configuration_lib.read_service_configuration( service, soa_dir=soa_dir diff --git a/paasta_tools/tron_tools.py b/paasta_tools/tron_tools.py index fe9440a747..67f0d36b29 100644 --- a/paasta_tools/tron_tools.py +++ b/paasta_tools/tron_tools.py @@ -283,6 +283,11 @@ def __init__( # Indicate whether this config object is created for validation self.for_validation = for_validation self.action_spark_config = None + # building the Spark config inside the constructor for workflows that might need the final configs. + if self.get_executor() == "spark": + # build the complete Spark configuration + # TODO: add conditional check for Spark specific commands spark-submit, pyspark etc ? + self.action_spark_config = self.build_spark_config() def get_cpus(self) -> float: # set Spark driver pod CPU if it is specified by Spark arguments @@ -922,12 +927,6 @@ def format_tron_action_dict(action_config: TronActionConfig): :param action_config: TronActionConfig """ executor = action_config.get_executor() - # not building the Spark config in TronActionConfig constructor since it needs a KUBECONFIG file and - # fails in yelpsoa configs repo validation while getting the action configs. - if executor == "spark": - # build the complete Spark configuration - # TODO: add conditional check for Spark specific commands spark-submit, pyspark etc ? 
- action_config.action_spark_config = action_config.build_spark_config() result = { "command": action_config.get_cmd(), "executor": executor, @@ -1132,12 +1131,14 @@ def load_tron_instance_config( cluster: str, load_deployments: bool = True, soa_dir: str = DEFAULT_SOA_DIR, + for_validation: bool = False, ) -> TronActionConfig: for action in load_tron_instance_configs( service=service, cluster=cluster, load_deployments=load_deployments, soa_dir=soa_dir, + for_validation=for_validation, ): if action.get_instance() == instance: return action @@ -1152,6 +1153,7 @@ def load_tron_instance_configs( cluster: str, load_deployments: bool = True, soa_dir: str = DEFAULT_SOA_DIR, + for_validation: bool = False, ) -> Tuple[TronActionConfig, ...]: ret: List[TronActionConfig] = [] @@ -1160,6 +1162,7 @@ def load_tron_instance_configs( cluster=cluster, load_deployments=load_deployments, soa_dir=soa_dir, + for_validation=for_validation, ) for job in jobs: diff --git a/paasta_tools/vitesscluster_tools.py b/paasta_tools/vitesscluster_tools.py index 66f809f563..dec79b3cf9 100644 --- a/paasta_tools/vitesscluster_tools.py +++ b/paasta_tools/vitesscluster_tools.py @@ -787,6 +787,7 @@ def load_vitess_instance_config( cluster: str, load_deployments: bool = True, soa_dir: str = DEFAULT_SOA_DIR, + for_validation: bool = False, ) -> VitessDeploymentConfig: general_config = service_configuration_lib.read_service_configuration( service, soa_dir=soa_dir From 45f7b4a1b3a26804fab0703a9331994d3806a816 Mon Sep 17 00:00:00 2001 From: Sameer Sharma Date: Thu, 27 Jun 2024 19:02:34 +0100 Subject: [PATCH 11/28] MLCOMPUTE-1203 | add validation signature to methods loading instance configs --- paasta_tools/eks_tools.py | 3 +++ paasta_tools/kubernetes_tools.py | 3 +++ 2 files changed, 6 insertions(+) diff --git a/paasta_tools/eks_tools.py b/paasta_tools/eks_tools.py index d802cd6233..49e8b173d5 100644 --- a/paasta_tools/eks_tools.py +++ b/paasta_tools/eks_tools.py @@ -100,6 +100,7 @@ def load_eks_service_config( cluster: str, load_deployments: bool = True, soa_dir: str = DEFAULT_SOA_DIR, + for_validation: bool = False, ) -> "EksDeploymentConfig": """Read a service instance's configuration for EKS. @@ -112,6 +113,8 @@ def load_eks_service_config( :param load_deployments: A boolean indicating if the corresponding deployments.json for this service should also be loaded :param soa_dir: The SOA configuration directory to read from + :param for_validation: currently unused - part of the signature for instance types that cannot be + fully created in places where paasta validate runs without special handling :returns: A dictionary of whatever was in the config for the service instance""" return load_eks_service_config_no_cache( service=service, diff --git a/paasta_tools/kubernetes_tools.py b/paasta_tools/kubernetes_tools.py index 454a10a901..6eefea1623 100644 --- a/paasta_tools/kubernetes_tools.py +++ b/paasta_tools/kubernetes_tools.py @@ -502,6 +502,7 @@ def load_kubernetes_service_config( cluster: str, load_deployments: bool = True, soa_dir: str = DEFAULT_SOA_DIR, + for_validation: bool = False, ) -> "KubernetesDeploymentConfig": """Read a service instance's configuration for kubernetes. 
@@ -514,6 +515,8 @@ def load_kubernetes_service_config( :param load_deployments: A boolean indicating if the corresponding deployments.json for this service should also be loaded :param soa_dir: The SOA configuration directory to read from + :param for_validation: currently unused - part of the signature for instance types that cannot be + fully created in places where paasta validate runs without special handling :returns: A dictionary of whatever was in the config for the service instance""" return load_kubernetes_service_config_no_cache( service=service, From 5d27b1b288a257d27ad7f57ee367dd11c02046c9 Mon Sep 17 00:00:00 2001 From: Sameer Sharma Date: Thu, 27 Jun 2024 11:06:00 -0700 Subject: [PATCH 12/28] formatting --- paasta_tools/adhoc_tools.py | 7 ++++++- paasta_tools/cli/cmds/validate.py | 5 ++++- 2 files changed, 10 insertions(+), 2 deletions(-) diff --git a/paasta_tools/adhoc_tools.py b/paasta_tools/adhoc_tools.py index e3d4b9f40f..7f4fb2a3dd 100644 --- a/paasta_tools/adhoc_tools.py +++ b/paasta_tools/adhoc_tools.py @@ -31,7 +31,12 @@ def load_adhoc_job_config( - service, instance, cluster, load_deployments=True, soa_dir=DEFAULT_SOA_DIR, for_validation: bool = False, + service, + instance, + cluster, + load_deployments=True, + soa_dir=DEFAULT_SOA_DIR, + for_validation: bool = False, ): general_config = service_configuration_lib.read_service_configuration( service, soa_dir=soa_dir diff --git a/paasta_tools/cli/cmds/validate.py b/paasta_tools/cli/cmds/validate.py index 989017d436..8869e19bb3 100644 --- a/paasta_tools/cli/cmds/validate.py +++ b/paasta_tools/cli/cmds/validate.py @@ -505,7 +505,10 @@ def validate_tron(service_path: str, verbose: bool = False) -> bool: # TODO(TRON-1761): unify tron/paasta validate cron syntax validation service_config = load_tron_service_config( - service=service, cluster=cluster, soa_dir=soa_dir, for_validation=True, + service=service, + cluster=cluster, + soa_dir=soa_dir, + for_validation=True, ) for config in service_config: cron_expression = config.get_cron_expression() From b94c2ed64bda2ecf75246e90467e6729849a8477 Mon Sep 17 00:00:00 2001 From: Sameer Sharma Date: Thu, 27 Jun 2024 19:10:07 +0100 Subject: [PATCH 13/28] MLCOMPUTE-1203 | fix arg type --- paasta_tools/cli/utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/paasta_tools/cli/utils.py b/paasta_tools/cli/utils.py index 13d3052a82..158cf1ebfe 100644 --- a/paasta_tools/cli/utils.py +++ b/paasta_tools/cli/utils.py @@ -607,7 +607,7 @@ def get_jenkins_build_output_url(): NamedArg(str, "cluster"), NamedArg(bool, "load_deployments"), NamedArg(str, "soa_dir"), - NamedArg(str, "for_validation"), + NamedArg(bool, "for_validation"), ], InstanceConfig, ] From 6d903bea23435dcb9c5b2000acccedd40bbbefdf Mon Sep 17 00:00:00 2001 From: Sameer Sharma Date: Thu, 27 Jun 2024 19:27:26 +0100 Subject: [PATCH 14/28] add missing pos argument --- paasta_tools/cli/utils.py | 1 + 1 file changed, 1 insertion(+) diff --git a/paasta_tools/cli/utils.py b/paasta_tools/cli/utils.py index 158cf1ebfe..4853322907 100644 --- a/paasta_tools/cli/utils.py +++ b/paasta_tools/cli/utils.py @@ -971,6 +971,7 @@ def get_instance_configs_for_service( cluster=cluster, soa_dir=soa_dir, load_deployments=False, + for_validation=False, ) From 2e30fc661ac789a36c0b8b1cc2f628c1a17fefa9 Mon Sep 17 00:00:00 2001 From: Luis Perez Date: Fri, 28 Jun 2024 12:07:57 -0700 Subject: [PATCH 15/28] Revert "add missing pos argument" This reverts commit 6d903bea23435dcb9c5b2000acccedd40bbbefdf. 
--- paasta_tools/cli/utils.py | 1 - 1 file changed, 1 deletion(-) diff --git a/paasta_tools/cli/utils.py b/paasta_tools/cli/utils.py index 4853322907..158cf1ebfe 100644 --- a/paasta_tools/cli/utils.py +++ b/paasta_tools/cli/utils.py @@ -971,7 +971,6 @@ def get_instance_configs_for_service( cluster=cluster, soa_dir=soa_dir, load_deployments=False, - for_validation=False, ) From db49af6e071d08108eaf0f91c1544b012d0dc7fc Mon Sep 17 00:00:00 2001 From: Luis Perez Date: Fri, 28 Jun 2024 12:08:00 -0700 Subject: [PATCH 16/28] Revert "MLCOMPUTE-1203 | fix arg type" This reverts commit b94c2ed64bda2ecf75246e90467e6729849a8477. --- paasta_tools/cli/utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/paasta_tools/cli/utils.py b/paasta_tools/cli/utils.py index 158cf1ebfe..13d3052a82 100644 --- a/paasta_tools/cli/utils.py +++ b/paasta_tools/cli/utils.py @@ -607,7 +607,7 @@ def get_jenkins_build_output_url(): NamedArg(str, "cluster"), NamedArg(bool, "load_deployments"), NamedArg(str, "soa_dir"), - NamedArg(bool, "for_validation"), + NamedArg(str, "for_validation"), ], InstanceConfig, ] From dae03fa0070771d968b986081767656887fbfde9 Mon Sep 17 00:00:00 2001 From: Luis Perez Date: Fri, 28 Jun 2024 12:08:01 -0700 Subject: [PATCH 17/28] Revert "formatting" This reverts commit 5d27b1b288a257d27ad7f57ee367dd11c02046c9. --- paasta_tools/adhoc_tools.py | 7 +------ paasta_tools/cli/cmds/validate.py | 5 +---- 2 files changed, 2 insertions(+), 10 deletions(-) diff --git a/paasta_tools/adhoc_tools.py b/paasta_tools/adhoc_tools.py index 7f4fb2a3dd..e3d4b9f40f 100644 --- a/paasta_tools/adhoc_tools.py +++ b/paasta_tools/adhoc_tools.py @@ -31,12 +31,7 @@ def load_adhoc_job_config( - service, - instance, - cluster, - load_deployments=True, - soa_dir=DEFAULT_SOA_DIR, - for_validation: bool = False, + service, instance, cluster, load_deployments=True, soa_dir=DEFAULT_SOA_DIR, for_validation: bool = False, ): general_config = service_configuration_lib.read_service_configuration( service, soa_dir=soa_dir diff --git a/paasta_tools/cli/cmds/validate.py b/paasta_tools/cli/cmds/validate.py index 8869e19bb3..989017d436 100644 --- a/paasta_tools/cli/cmds/validate.py +++ b/paasta_tools/cli/cmds/validate.py @@ -505,10 +505,7 @@ def validate_tron(service_path: str, verbose: bool = False) -> bool: # TODO(TRON-1761): unify tron/paasta validate cron syntax validation service_config = load_tron_service_config( - service=service, - cluster=cluster, - soa_dir=soa_dir, - for_validation=True, + service=service, cluster=cluster, soa_dir=soa_dir, for_validation=True, ) for config in service_config: cron_expression = config.get_cron_expression() From 7790790fc69fa68817cba0e52374b568ec426094 Mon Sep 17 00:00:00 2001 From: Luis Perez Date: Fri, 28 Jun 2024 12:08:03 -0700 Subject: [PATCH 18/28] Revert "MLCOMPUTE-1203 | add validation signature to methods loading instance configs" This reverts commit 45f7b4a1b3a26804fab0703a9331994d3806a816. --- paasta_tools/eks_tools.py | 3 --- paasta_tools/kubernetes_tools.py | 3 --- 2 files changed, 6 deletions(-) diff --git a/paasta_tools/eks_tools.py b/paasta_tools/eks_tools.py index 49e8b173d5..d802cd6233 100644 --- a/paasta_tools/eks_tools.py +++ b/paasta_tools/eks_tools.py @@ -100,7 +100,6 @@ def load_eks_service_config( cluster: str, load_deployments: bool = True, soa_dir: str = DEFAULT_SOA_DIR, - for_validation: bool = False, ) -> "EksDeploymentConfig": """Read a service instance's configuration for EKS. 
@@ -113,8 +112,6 @@ def load_eks_service_config( :param load_deployments: A boolean indicating if the corresponding deployments.json for this service should also be loaded :param soa_dir: The SOA configuration directory to read from - :param for_validation: currently unused - part of the signature for instance types that cannot be - fully created in places where paasta validate runs without special handling :returns: A dictionary of whatever was in the config for the service instance""" return load_eks_service_config_no_cache( service=service, diff --git a/paasta_tools/kubernetes_tools.py b/paasta_tools/kubernetes_tools.py index 6eefea1623..454a10a901 100644 --- a/paasta_tools/kubernetes_tools.py +++ b/paasta_tools/kubernetes_tools.py @@ -502,7 +502,6 @@ def load_kubernetes_service_config( cluster: str, load_deployments: bool = True, soa_dir: str = DEFAULT_SOA_DIR, - for_validation: bool = False, ) -> "KubernetesDeploymentConfig": """Read a service instance's configuration for kubernetes. @@ -515,8 +514,6 @@ def load_kubernetes_service_config( :param load_deployments: A boolean indicating if the corresponding deployments.json for this service should also be loaded :param soa_dir: The SOA configuration directory to read from - :param for_validation: currently unused - part of the signature for instance types that cannot be - fully created in places where paasta validate runs without special handling :returns: A dictionary of whatever was in the config for the service instance""" return load_kubernetes_service_config_no_cache( service=service, From 2217de81c7cc205cd2423b1a238d68c157a789fe Mon Sep 17 00:00:00 2001 From: Luis Perez Date: Fri, 28 Jun 2024 12:08:25 -0700 Subject: [PATCH 19/28] Revert "MLCOMPUTE-1203 | add validation signature to methods loading instance configs" This reverts commit 01e43d4e032e9e8b787f9e2c57e5dfa0e8199adf. --- paasta_tools/adhoc_tools.py | 2 +- paasta_tools/cassandracluster_tools.py | 3 --- paasta_tools/cli/cmds/validate.py | 3 +-- paasta_tools/cli/utils.py | 3 --- paasta_tools/eks_tools.py | 3 --- paasta_tools/flink_tools.py | 3 --- paasta_tools/flinkeks_tools.py | 3 --- paasta_tools/kafkacluster_tools.py | 3 --- paasta_tools/kubernetes_tools.py | 3 --- paasta_tools/monkrelaycluster_tools.py | 3 --- paasta_tools/nrtsearchservice_tools.py | 3 --- paasta_tools/nrtsearchserviceeks_tools.py | 3 --- paasta_tools/tron_tools.py | 15 ++++++--------- paasta_tools/vitesscluster_tools.py | 1 - 14 files changed, 8 insertions(+), 43 deletions(-) diff --git a/paasta_tools/adhoc_tools.py b/paasta_tools/adhoc_tools.py index e3d4b9f40f..53f2b882e8 100644 --- a/paasta_tools/adhoc_tools.py +++ b/paasta_tools/adhoc_tools.py @@ -31,7 +31,7 @@ def load_adhoc_job_config( - service, instance, cluster, load_deployments=True, soa_dir=DEFAULT_SOA_DIR, for_validation: bool = False, + service, instance, cluster, load_deployments=True, soa_dir=DEFAULT_SOA_DIR ): general_config = service_configuration_lib.read_service_configuration( service, soa_dir=soa_dir diff --git a/paasta_tools/cassandracluster_tools.py b/paasta_tools/cassandracluster_tools.py index 33a36455ad..5f54c0cae7 100644 --- a/paasta_tools/cassandracluster_tools.py +++ b/paasta_tools/cassandracluster_tools.py @@ -151,7 +151,6 @@ def load_cassandracluster_instance_config( cluster: str, load_deployments: bool = True, soa_dir: str = DEFAULT_SOA_DIR, - for_validation: bool = False, ) -> CassandraClusterDeploymentConfig: """Read a service instance's configuration for CassandraCluster. 
@@ -164,8 +163,6 @@ def load_cassandracluster_instance_config( :param load_deployments: A boolean indicating if the corresponding deployments.json for this service should also be loaded :param soa_dir: The SOA configuration directory to read from - :param for_validation: currently unused - part of the signature for instance types that cannot be - fully created in places where paasta validate runs without special handling :returns: A dictionary of whatever was in the config for the service instance""" general_config = service_configuration_lib.read_service_configuration( service, soa_dir=soa_dir diff --git a/paasta_tools/cli/cmds/validate.py b/paasta_tools/cli/cmds/validate.py index 989017d436..447c39d0b2 100644 --- a/paasta_tools/cli/cmds/validate.py +++ b/paasta_tools/cli/cmds/validate.py @@ -191,7 +191,6 @@ def load_all_instance_configs_for_service( cluster=cluster, load_deployments=False, soa_dir=soa_dir, - for_validation=True, ) ret.append((instance, instance_config)) @@ -505,7 +504,7 @@ def validate_tron(service_path: str, verbose: bool = False) -> bool: # TODO(TRON-1761): unify tron/paasta validate cron syntax validation service_config = load_tron_service_config( - service=service, cluster=cluster, soa_dir=soa_dir, for_validation=True, + service=service, cluster=cluster, soa_dir=soa_dir ) for config in service_config: cron_expression = config.get_cron_expression() diff --git a/paasta_tools/cli/utils.py b/paasta_tools/cli/utils.py index 13d3052a82..082e693d10 100644 --- a/paasta_tools/cli/utils.py +++ b/paasta_tools/cli/utils.py @@ -607,7 +607,6 @@ def get_jenkins_build_output_url(): NamedArg(str, "cluster"), NamedArg(bool, "load_deployments"), NamedArg(str, "soa_dir"), - NamedArg(str, "for_validation"), ], InstanceConfig, ] @@ -720,7 +719,6 @@ def get_instance_config( soa_dir: str = DEFAULT_SOA_DIR, load_deployments: bool = False, instance_type: Optional[str] = None, - for_validation: bool = False, ) -> InstanceConfig: """Returns the InstanceConfig object for whatever type of instance it is. (kubernetes)""" @@ -742,7 +740,6 @@ def get_instance_config( cluster=cluster, load_deployments=load_deployments, soa_dir=soa_dir, - for_validation=for_validation, ) diff --git a/paasta_tools/eks_tools.py b/paasta_tools/eks_tools.py index d802cd6233..2355c58dc2 100644 --- a/paasta_tools/eks_tools.py +++ b/paasta_tools/eks_tools.py @@ -42,7 +42,6 @@ def load_eks_service_config_no_cache( cluster: str, load_deployments: bool = True, soa_dir: str = DEFAULT_SOA_DIR, - for_validation: bool = False, ) -> "EksDeploymentConfig": """Read a service instance's configuration for EKS. 
@@ -55,8 +54,6 @@ def load_eks_service_config_no_cache( :param load_deployments: A boolean indicating if the corresponding deployments.json for this service should also be loaded :param soa_dir: The SOA configuration directory to read from - :param for_validation: currently unused - part of the signature for instance types that cannot be - fully created in places where paasta validate runs without special handling :returns: A dictionary of whatever was in the config for the service instance""" general_config = service_configuration_lib.read_service_configuration( service, soa_dir=soa_dir diff --git a/paasta_tools/flink_tools.py b/paasta_tools/flink_tools.py index 4b93a20320..a944820743 100644 --- a/paasta_tools/flink_tools.py +++ b/paasta_tools/flink_tools.py @@ -118,7 +118,6 @@ def load_flink_instance_config( cluster: str, load_deployments: bool = True, soa_dir: str = DEFAULT_SOA_DIR, - for_validation: bool = False, ) -> FlinkDeploymentConfig: """Read a service instance's configuration for Flink. @@ -131,8 +130,6 @@ def load_flink_instance_config( :param load_deployments: A boolean indicating if the corresponding deployments.json for this service should also be loaded :param soa_dir: The SOA configuration directory to read from - :param for_validation: currently unused - part of the signature for instance types that cannot be - fully created in places where paasta validate runs without special handling :returns: A dictionary of whatever was in the config for the service instance""" general_config = service_configuration_lib.read_service_configuration( service, soa_dir=soa_dir diff --git a/paasta_tools/flinkeks_tools.py b/paasta_tools/flinkeks_tools.py index 031e55ec9a..7ca47ee32c 100644 --- a/paasta_tools/flinkeks_tools.py +++ b/paasta_tools/flinkeks_tools.py @@ -42,7 +42,6 @@ def load_flinkeks_instance_config( cluster: str, load_deployments: bool = True, soa_dir: str = DEFAULT_SOA_DIR, - for_validation: bool = False, ) -> FlinkEksDeploymentConfig: """Read a service instance's configuration for Flink on EKS. @@ -55,8 +54,6 @@ def load_flinkeks_instance_config( :param load_deployments: A boolean indicating if the corresponding deployments.json for this service should also be loaded :param soa_dir: The SOA configuration directory to read from - :param for_validation: currently unused - part of the signature for instance types that cannot be - fully created in places where paasta validate runs without special handling :returns: A dictionary of whatever was in the config for the service instance""" general_config = service_configuration_lib.read_service_configuration( service, soa_dir=soa_dir diff --git a/paasta_tools/kafkacluster_tools.py b/paasta_tools/kafkacluster_tools.py index f51e68e821..ca0e800f9d 100644 --- a/paasta_tools/kafkacluster_tools.py +++ b/paasta_tools/kafkacluster_tools.py @@ -84,7 +84,6 @@ def load_kafkacluster_instance_config( cluster: str, load_deployments: bool = True, soa_dir: str = DEFAULT_SOA_DIR, - for_validation: bool = False, ) -> KafkaClusterDeploymentConfig: """Read a service instance's configuration for KafkaCluster. 
@@ -97,8 +96,6 @@ def load_kafkacluster_instance_config( :param load_deployments: A boolean indicating if the corresponding deployments.json for this service should also be loaded :param soa_dir: The SOA configuration directory to read from - :param for_validation: currently unused - part of the signature for instance types that cannot be - fully created in places where paasta validate runs without special handling :returns: A dictionary of whatever was in the config for the service instance""" general_config = service_configuration_lib.read_service_configuration( service, soa_dir=soa_dir diff --git a/paasta_tools/kubernetes_tools.py b/paasta_tools/kubernetes_tools.py index 454a10a901..12f5e744b6 100644 --- a/paasta_tools/kubernetes_tools.py +++ b/paasta_tools/kubernetes_tools.py @@ -444,7 +444,6 @@ def load_kubernetes_service_config_no_cache( cluster: str, load_deployments: bool = True, soa_dir: str = DEFAULT_SOA_DIR, - for_validation: bool = False, ) -> "KubernetesDeploymentConfig": """Read a service instance's configuration for kubernetes. @@ -457,8 +456,6 @@ def load_kubernetes_service_config_no_cache( :param load_deployments: A boolean indicating if the corresponding deployments.json for this service should also be loaded :param soa_dir: The SOA configuration directory to read from - :param for_validation: currently unused - part of the signature for instance types that cannot be - fully created in places where paasta validate runs without special handling :returns: A dictionary of whatever was in the config for the service instance""" general_config = service_configuration_lib.read_service_configuration( service, soa_dir=soa_dir diff --git a/paasta_tools/monkrelaycluster_tools.py b/paasta_tools/monkrelaycluster_tools.py index a4eb383722..dda2914781 100644 --- a/paasta_tools/monkrelaycluster_tools.py +++ b/paasta_tools/monkrelaycluster_tools.py @@ -87,7 +87,6 @@ def load_monkrelaycluster_instance_config( cluster: str, load_deployments: bool = True, soa_dir: str = DEFAULT_SOA_DIR, - for_validation: bool = False, ) -> MonkRelayClusterDeploymentConfig: """Read a service instance's configuration for MonkRelayCluster. @@ -100,8 +99,6 @@ def load_monkrelaycluster_instance_config( :param load_deployments: A boolean indicating if the corresponding deployments.json for this service should also be loaded :param soa_dir: The SOA configuration directory to read from - :param for_validation: currently unused - part of the signature for instance types that cannot be - fully created in places where paasta validate runs without special handling :returns: A dictionary of whatever was in the config for the service instance""" general_config = service_configuration_lib.read_service_configuration( service, soa_dir=soa_dir diff --git a/paasta_tools/nrtsearchservice_tools.py b/paasta_tools/nrtsearchservice_tools.py index 9d0fc79cfe..e604a3d5fc 100644 --- a/paasta_tools/nrtsearchservice_tools.py +++ b/paasta_tools/nrtsearchservice_tools.py @@ -84,7 +84,6 @@ def load_nrtsearchservice_instance_config( cluster: str, load_deployments: bool = True, soa_dir: str = DEFAULT_SOA_DIR, - for_validation: bool = False, ) -> NrtsearchServiceDeploymentConfig: """Read a service instance's configuration for Nrtsearch. 
@@ -97,8 +96,6 @@ def load_nrtsearchservice_instance_config( :param load_deployments: A boolean indicating if the corresponding deployments.json for this service should also be loaded :param soa_dir: The SOA configuration directory to read from - :param for_validation: currently unused - part of the signature for instance types that cannot be - fully created in places where paasta validate runs without special handling :returns: A dictionary of whatever was in the config for the service instance""" general_config = service_configuration_lib.read_service_configuration( service, soa_dir=soa_dir diff --git a/paasta_tools/nrtsearchserviceeks_tools.py b/paasta_tools/nrtsearchserviceeks_tools.py index db249a6e56..49b6372ea2 100644 --- a/paasta_tools/nrtsearchserviceeks_tools.py +++ b/paasta_tools/nrtsearchserviceeks_tools.py @@ -20,7 +20,6 @@ def load_nrtsearchserviceeks_instance_config( cluster: str, load_deployments: bool = True, soa_dir: str = DEFAULT_SOA_DIR, - for_validation: bool = False, ) -> NrtsearchServiceEksDeploymentConfig: """Read a service instance's configuration for Nrtsearch. @@ -33,8 +32,6 @@ def load_nrtsearchserviceeks_instance_config( :param load_deployments: A boolean indicating if the corresponding deployments.json for this service should also be loaded :param soa_dir: The SOA configuration directory to read from - :param for_validation: currently unused - part of the signature for instance types that cannot be - fully created in places where paasta validate runs without special handling :returns: A dictionary of whatever was in the config for the service instance""" general_config = service_configuration_lib.read_service_configuration( service, soa_dir=soa_dir diff --git a/paasta_tools/tron_tools.py b/paasta_tools/tron_tools.py index 67f0d36b29..fe9440a747 100644 --- a/paasta_tools/tron_tools.py +++ b/paasta_tools/tron_tools.py @@ -283,11 +283,6 @@ def __init__( # Indicate whether this config object is created for validation self.for_validation = for_validation self.action_spark_config = None - # building the Spark config inside the constructor for workflows that might need the final configs. - if self.get_executor() == "spark": - # build the complete Spark configuration - # TODO: add conditional check for Spark specific commands spark-submit, pyspark etc ? - self.action_spark_config = self.build_spark_config() def get_cpus(self) -> float: # set Spark driver pod CPU if it is specified by Spark arguments @@ -927,6 +922,12 @@ def format_tron_action_dict(action_config: TronActionConfig): :param action_config: TronActionConfig """ executor = action_config.get_executor() + # not building the Spark config in TronActionConfig constructor since it needs a KUBECONFIG file and + # fails in yelpsoa configs repo validation while getting the action configs. + if executor == "spark": + # build the complete Spark configuration + # TODO: add conditional check for Spark specific commands spark-submit, pyspark etc ? 
+ action_config.action_spark_config = action_config.build_spark_config() result = { "command": action_config.get_cmd(), "executor": executor, @@ -1131,14 +1132,12 @@ def load_tron_instance_config( cluster: str, load_deployments: bool = True, soa_dir: str = DEFAULT_SOA_DIR, - for_validation: bool = False, ) -> TronActionConfig: for action in load_tron_instance_configs( service=service, cluster=cluster, load_deployments=load_deployments, soa_dir=soa_dir, - for_validation=for_validation, ): if action.get_instance() == instance: return action @@ -1153,7 +1152,6 @@ def load_tron_instance_configs( cluster: str, load_deployments: bool = True, soa_dir: str = DEFAULT_SOA_DIR, - for_validation: bool = False, ) -> Tuple[TronActionConfig, ...]: ret: List[TronActionConfig] = [] @@ -1162,7 +1160,6 @@ cluster=cluster, load_deployments=load_deployments, soa_dir=soa_dir, - for_validation=for_validation, ) for job in jobs: diff --git a/paasta_tools/vitesscluster_tools.py b/paasta_tools/vitesscluster_tools.py index 51914a15ab..1655febd6c 100644 --- a/paasta_tools/vitesscluster_tools.py +++ b/paasta_tools/vitesscluster_tools.py @@ -796,7 +796,6 @@ def load_vitess_instance_config( cluster: str, load_deployments: bool = True, soa_dir: str = DEFAULT_SOA_DIR, - for_validation: bool = False, ) -> VitessDeploymentConfig: general_config = service_configuration_lib.read_service_configuration( service, soa_dir=soa_dir From 800f0121a5284f95153c5a5e5c7bf61b3a90420f Mon Sep 17 00:00:00 2001 From: Luis Perez Date: Fri, 28 Jun 2024 13:00:15 -0700 Subject: [PATCH 20/28] Start splitting SA creation and validation (for tron) Depending on merge order, we'll want to slightly refactor this after https://github.com/Yelp/paasta/pull/3906 if this goes out first (or refactor this before merging if #3906 goes out first) - but this makes it so that we never create SAs in Tron outside of setup_tron_namespace (which will get called from a host that has the correct permissions). This avoids needing to pass around any dry-run/validation flags :) --- paasta_tools/setup_tron_namespace.py | 27 +++++++++++++++++++++++++++ paasta_tools/tron_tools.py | 4 ++-- 2 files changed, 29 insertions(+), 2 deletions(-) diff --git a/paasta_tools/setup_tron_namespace.py b/paasta_tools/setup_tron_namespace.py index 3653282cd5..8eca53a4ee 100755 --- a/paasta_tools/setup_tron_namespace.py +++ b/paasta_tools/setup_tron_namespace.py @@ -25,8 +25,11 @@ import sys import ruamel.yaml as yaml +import spark_tools from paasta_tools import tron_tools +from paasta_tools.kubernetes_tools import create_or_find_service_account_name +from paasta_tools.tron_tools import KUBERNETES_NAMESPACE from paasta_tools.tron_tools import MASTER_NAMESPACE log = logging.getLogger(__name__) @@ -63,6 +66,26 @@ def parse_args(): return args +def ensure_service_accounts(config: dict) -> None: + for job in config.get("jobs", []): + for action in job.get("actions", []): + if action.get("service_account_name") is not None: + create_or_find_service_account_name( + action["service_account_name"], + namespace=KUBERNETES_NAMESPACE, + dry_run=False, + ) + # spark executors are special in that we want the SA to exist in two namespaces: + # the tron namespace - for the spark driver + # and the spark namespace - for the spark executor + if action.get("executor") == "spark": + create_or_find_service_account_name( + action["service_account_name"], + namespace=spark_tools.SPARK_EXECUTOR_NAMESPACE, + dry_run=False, + ) + + def main(): args = parse_args() log_level = 
logging.DEBUG if args.verbose else logging.INFO @@ -133,6 +156,10 @@ def main(): log.info(f"{new_config}") updated.append(service) else: + # PaaSTA will not necessarily have created the SAs we want to use + # ...so let's go ahead and create them! + ensure_service_accounts(new_config) + if client.update_namespace(service, new_config): updated.append(service) log.debug(f"Updated {service}") diff --git a/paasta_tools/tron_tools.py b/paasta_tools/tron_tools.py index fe9440a747..9902905c78 100644 --- a/paasta_tools/tron_tools.py +++ b/paasta_tools/tron_tools.py @@ -387,7 +387,7 @@ def build_spark_config(self) -> Dict[str, str]: iam_role=self.get_spark_executor_iam_role(), namespace=spark_tools.SPARK_EXECUTOR_NAMESPACE, kubeconfig_file=system_paasta_config.get_spark_kubeconfig(), - dry_run=self.for_validation, + dry_run=True, ) return spark_conf @@ -1009,7 +1009,7 @@ def format_tron_action_dict(action_config: TronActionConfig): iam_role=action_config.get_iam_role(), namespace=EXECUTOR_TYPE_TO_NAMESPACE[executor], k8s_role=None, - dry_run=action_config.for_validation, + dry_run=True, ) # service account token volumes for service authentication From fbae67a4dee1792c6ca121e14d4af3409e8cd1b1 Mon Sep 17 00:00:00 2001 From: Luis Perez Date: Fri, 28 Jun 2024 13:20:16 -0700 Subject: [PATCH 21/28] whoops --- paasta_tools/setup_tron_namespace.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/paasta_tools/setup_tron_namespace.py b/paasta_tools/setup_tron_namespace.py index 8eca53a4ee..69c937330d 100755 --- a/paasta_tools/setup_tron_namespace.py +++ b/paasta_tools/setup_tron_namespace.py @@ -25,8 +25,8 @@ import sys import ruamel.yaml as yaml -import spark_tools +from paasta_tools import spark_tools from paasta_tools import tron_tools from paasta_tools.kubernetes_tools import create_or_find_service_account_name from paasta_tools.tron_tools import KUBERNETES_NAMESPACE from paasta_tools.tron_tools import MASTER_NAMESPACE From d154f24626dc600ab838346942a881ff7442ae2a Mon Sep 17 00:00:00 2001 From: Luis Perez Date: Fri, 28 Jun 2024 13:31:23 -0700 Subject: [PATCH 22/28] correctly iterate over dicts I blame copilot for letting me turn off my brain --- paasta_tools/setup_tron_namespace.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/paasta_tools/setup_tron_namespace.py b/paasta_tools/setup_tron_namespace.py index 69c937330d..a43ec34b4e 100755 --- a/paasta_tools/setup_tron_namespace.py +++ b/paasta_tools/setup_tron_namespace.py @@ -67,8 +67,8 @@ def parse_args(): def ensure_service_accounts(config: dict) -> None: - for job in config.get("jobs", []): - for action in job.get("actions", []): + for _, job in config.get("jobs", {}).items(): + for _, action in job.get("actions", {}).items(): if action.get("service_account_name") is not None: create_or_find_service_account_name( action["service_account_name"], From 784856ff19c563197c61e5654bc93a3d8fb2b3cc Mon Sep 17 00:00:00 2001 From: Luis Perez Date: Fri, 28 Jun 2024 13:45:26 -0700 Subject: [PATCH 23/28] correctly parse yaml I wish that we didn't pass around a huge string --- paasta_tools/setup_tron_namespace.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/paasta_tools/setup_tron_namespace.py b/paasta_tools/setup_tron_namespace.py index a43ec34b4e..dddb279b54 100755 --- a/paasta_tools/setup_tron_namespace.py +++ b/paasta_tools/setup_tron_namespace.py @@ -66,7 +66,11 @@ def parse_args(): return args -def ensure_service_accounts(config: dict) -> None: +def ensure_service_accounts(raw_config: str) -> None: + # this is kinda silly, but the tron 
create_config functions return strings + # we should refactor to pass the dicts around until we're going to send the config to tron + # (where we can finally convert it to a string) + config = yaml.safe_load(raw_config) for _, job in config.get("jobs", {}).items(): for _, action in job.get("actions", {}).items(): if action.get("service_account_name") is not None: @@ -151,6 +155,7 @@ def main(): k8s_enabled=k8s_enabled_for_cluster, dry_run=args.dry_run, ) + ensure_service_accounts(new_config) if args.dry_run: log.info(f"Would update {service} to:") log.info(f"{new_config}") updated.append(service) From 8f555e39395e36365fa4c9326a771e20582ce8ab Mon Sep 17 00:00:00 2001 From: Luis Perez Date: Fri, 28 Jun 2024 13:49:17 -0700 Subject: [PATCH 24/28] remove test call --- paasta_tools/setup_tron_namespace.py | 1 - 1 file changed, 1 deletion(-) diff --git a/paasta_tools/setup_tron_namespace.py b/paasta_tools/setup_tron_namespace.py index dddb279b54..e7160e7bf5 100755 --- a/paasta_tools/setup_tron_namespace.py +++ b/paasta_tools/setup_tron_namespace.py @@ -155,7 +155,6 @@ def main(): k8s_enabled=k8s_enabled_for_cluster, dry_run=args.dry_run, ) - ensure_service_accounts(new_config) if args.dry_run: log.info(f"Would update {service} to:") log.info(f"{new_config}") From 36dc8d36121ef615b8aedfce2b4f348004dc2ac8 Mon Sep 17 00:00:00 2001 From: Luis Perez Date: Fri, 28 Jun 2024 14:00:24 -0700 Subject: [PATCH 25/28] use spark kubeconfig --- paasta_tools/setup_tron_namespace.py | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/paasta_tools/setup_tron_namespace.py b/paasta_tools/setup_tron_namespace.py index e7160e7bf5..9f3e33cc4f 100755 --- a/paasta_tools/setup_tron_namespace.py +++ b/paasta_tools/setup_tron_namespace.py @@ -31,6 +31,8 @@ from paasta_tools.kubernetes_tools import create_or_find_service_account_name from paasta_tools.tron_tools import KUBERNETES_NAMESPACE from paasta_tools.tron_tools import MASTER_NAMESPACE +from paasta_tools.utils import load_system_paasta_config +from paasta_tools.utils import SystemPaastaConfig log = logging.getLogger(__name__) @@ -66,7 +68,9 @@ def parse_args(): return args -def ensure_service_accounts(raw_config: str) -> None: +def ensure_service_accounts( + raw_config: str, system_paasta_config: SystemPaastaConfig +) -> None: # this is kinda silly, but the tron create_config functions return strings # we should refactor to pass the dicts around until we're going to send the config to tron (where we can finally convert it to a string) config = yaml.safe_load(raw_config) for _, job in config.get("jobs", {}).items(): for _, action in job.get("actions", {}).items(): if action.get("service_account_name") is not None: create_or_find_service_account_name( action["service_account_name"], namespace=spark_tools.SPARK_EXECUTOR_NAMESPACE, + kubeconfig_file=system_paasta_config.get_spark_kubeconfig(), dry_run=False, ) @@ -146,6 +151,7 @@ def main(): yaml.safe_load(master_config).get("k8s_options", {}).get("enabled", False) ) + system_paasta_config = load_system_paasta_config() for service in sorted(services): try: new_config = tron_tools.create_complete_config( @@ -162,7 +168,7 @@ def main(): log.info(f"Would update {service} to:") log.info(f"{new_config}") updated.append(service) else: # PaaSTA will not necessarily have created the SAs we want to use # ...so let's go ahead and create them! 
- ensure_service_accounts(new_config) + ensure_service_accounts(new_config, system_paasta_config) if client.update_namespace(service, new_config): updated.append(service) From 6a4f3b83d8b3f8e6c5a269baaea0b7b147cf2712 Mon Sep 17 00:00:00 2001 From: Luis Perez Date: Fri, 28 Jun 2024 14:20:01 -0700 Subject: [PATCH 26/28] Use krall's latest changes --- paasta_tools/setup_tron_namespace.py | 21 ++++++++++++--------- 1 file changed, 12 insertions(+), 9 deletions(-) diff --git a/paasta_tools/setup_tron_namespace.py b/paasta_tools/setup_tron_namespace.py index 9f3e33cc4f..e285c05f53 100755 --- a/paasta_tools/setup_tron_namespace.py +++ b/paasta_tools/setup_tron_namespace.py @@ -28,11 +28,11 @@ from paasta_tools import spark_tools from paasta_tools import tron_tools -from paasta_tools.kubernetes_tools import create_or_find_service_account_name +from paasta_tools.kubernetes_tools import ensure_service_account +from paasta_tools.kubernetes_tools import KubeClient from paasta_tools.tron_tools import KUBERNETES_NAMESPACE from paasta_tools.tron_tools import MASTER_NAMESPACE from paasta_tools.utils import load_system_paasta_config -from paasta_tools.utils import SystemPaastaConfig log = logging.getLogger(__name__) @@ -69,7 +69,7 @@ def parse_args(): def ensure_service_accounts( - raw_config: str, system_paasta_config: SystemPaastaConfig + raw_config: str, kube_client: KubeClient, spark_kube_client: KubeClient ) -> None: # this is kinda silly, but the tron create_config functions return strings # we should refactor to pass the dicts around until we're going to send the config to tron (where we can finally convert it to a string) config = yaml.safe_load(raw_config) for _, job in config.get("jobs", {}).items(): for _, action in job.get("actions", {}).items(): if action.get("service_account_name") is not None: - create_or_find_service_account_name( + ensure_service_account( action["service_account_name"], namespace=KUBERNETES_NAMESPACE, - dry_run=False, + kube_client=kube_client, ) # spark executors are special in that we want the SA to exist in two namespaces: # the tron namespace - for the spark driver # and the spark namespace - for the spark executor if action.get("executor") == "spark": - create_or_find_service_account_name( + ensure_service_account( action["service_account_name"], namespace=spark_tools.SPARK_EXECUTOR_NAMESPACE, - kubeconfig_file=system_paasta_config.get_spark_kubeconfig(), - dry_run=False, + kube_client=spark_kube_client, ) @@ -152,6 +151,10 @@ def main(): yaml.safe_load(master_config).get("k8s_options", {}).get("enabled", False) ) system_paasta_config = load_system_paasta_config() + kube_client = KubeClient() + spark_kube_client = KubeClient( + config_file=system_paasta_config.get_spark_kubeconfig() + ) for service in sorted(services): try: new_config = tron_tools.create_complete_config( @@ -168,7 +171,7 @@ def main(): log.info(f"{new_config}") updated.append(service) else: # PaaSTA will not necessarily have created the SAs we want to use # ...so let's go ahead and create them! 
- ensure_service_accounts(new_config, system_paasta_config) + ensure_service_accounts(new_config, kube_client, spark_kube_client) if client.update_namespace(service, new_config): updated.append(service) From b0081462377137b1dccec578770665f307e616f9 Mon Sep 17 00:00:00 2001 From: Luis Perez Date: Fri, 28 Jun 2024 14:22:40 -0700 Subject: [PATCH 27/28] don't make kubeclients outside of dry-run --- paasta_tools/setup_tron_namespace.py | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/paasta_tools/setup_tron_namespace.py b/paasta_tools/setup_tron_namespace.py index e285c05f53..3752baeb2f 100755 --- a/paasta_tools/setup_tron_namespace.py +++ b/paasta_tools/setup_tron_namespace.py @@ -150,11 +150,6 @@ def main(): k8s_enabled_for_cluster = ( yaml.safe_load(master_config).get("k8s_options", {}).get("enabled", False) ) - system_paasta_config = load_system_paasta_config() - kube_client = KubeClient() - spark_kube_client = KubeClient( - config_file=system_paasta_config.get_spark_kubeconfig() - ) for service in sorted(services): try: new_config = tron_tools.create_complete_config( @@ -169,6 +164,12 @@ def main(): log.info(f"{new_config}") updated.append(service) else: + # NOTE: these are all lru_cache'd so it should be fine to call these for every service + system_paasta_config = load_system_paasta_config() + kube_client = KubeClient() + spark_kube_client = KubeClient( + config_file=system_paasta_config.get_spark_kubeconfig() + ) # PaaSTA will not necessarily have created the SAs we want to use # ...so let's go ahead and create them! ensure_service_accounts(new_config, kube_client, spark_kube_client) From 7de6ee684a9394686d08b167ebb57c69fab1ee5f Mon Sep 17 00:00:00 2001 From: Luis Perez Date: Fri, 28 Jun 2024 14:33:36 -0700 Subject: [PATCH 28/28] build spark config in constructor --- paasta_tools/tron_tools.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/paasta_tools/tron_tools.py b/paasta_tools/tron_tools.py index 1f443fe7fd..b3a94f798c 100644 --- a/paasta_tools/tron_tools.py +++ b/paasta_tools/tron_tools.py @@ -280,9 +280,15 @@ def __init__( soa_dir=soa_dir, ) self.job, self.action = decompose_instance(instance) + # Indicate whether this config object is created for validation self.for_validation = for_validation + self.action_spark_config = None + if self.get_executor() == "spark": + # build the complete Spark configuration + # TODO: add conditional check for Spark specific commands spark-submit, pyspark etc ? + self.action_spark_config = self.build_spark_config() def get_cpus(self) -> float: # set Spark driver pod CPU if it is specified by Spark arguments @@ -918,12 +924,6 @@ def format_tron_action_dict(action_config: TronActionConfig): :param action_config: TronActionConfig """ executor = action_config.get_executor() - # not building the Spark config in TronActionConfig constructor since it needs a KUBECONFIG file and - # fails in yelpsoa configs repo validation while getting the action configs. - if executor == "spark": - # build the complete Spark configuration - # TODO: add conditional check for Spark specific commands spark-submit, pyspark etc ? - action_config.action_spark_config = action_config.build_spark_config() result = { "command": action_config.get_cmd(), "executor": executor,
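
A minimal runnable sketch of where the series lands: setup_tron_namespace parses the rendered Tron config (a YAML string, per PATCH 23/28), walks the jobs/actions mappings with .items() (per PATCH 22/28), and ensures one service account per action, plus a second one in the Spark executor namespace for Spark actions (PATCH 26/28). This is an illustration only: the job and action names below are hypothetical, PyYAML is assumed to be installed, and the kube calls are stubbed with prints rather than the real ensure_service_account/KubeClient helpers from paasta_tools.kubernetes_tools.

import yaml  # PyYAML, assumed available

# Hypothetical rendered Tron config; "jobs" and "actions" are mappings keyed
# by name, which is why the traversal below iterates with .items().
RAW_CONFIG = """
jobs:
  example_job:
    actions:
      example_spark_action:
        executor: spark
        service_account_name: paasta--example-service
"""


def ensure_service_accounts_demo(raw_config: str) -> None:
    config = yaml.safe_load(raw_config)
    for _, job in config.get("jobs", {}).items():
        for _, action in job.get("actions", {}).items():
            sa = action.get("service_account_name")
            if sa is None:
                continue
            # stand-in for: ensure_service_account(sa, namespace=KUBERNETES_NAMESPACE, kube_client=kube_client)
            print(f"would ensure {sa} in the tron namespace")
            if action.get("executor") == "spark":
                # stand-in for: ensure_service_account(sa, namespace=spark_tools.SPARK_EXECUTOR_NAMESPACE, kube_client=spark_kube_client)
                print(f"would ensure {sa} in the spark executor namespace")


ensure_service_accounts_demo(RAW_CONFIG)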