MLCOMPUTE-1497 | include spark driver memory overhead in driver pod memory #3926

Merged · 5 commits · Aug 8, 2024
2 changes: 1 addition & 1 deletion paasta_tools/cli/schemas/tron_schema.json
@@ -445,7 +445,7 @@
"type": "boolean"
},
"spark.app.name": {
"type": "boolean"
"type": "string"
Review comment (Member):
nice-to-have suggestion: might be worth adding some additional constraints here too re: min/max length, since (IIRC) this gets turned into a k8s label and those values have a max length (and if there are any characters that are not allowed, there's a field for adding a pattern that these strings need to match to pass validation)

},
"spark.task.maxFailures": {
"type": "integer",
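To make the reviewer's suggestion concrete, here is a minimal sketch (not part of this PR) of how the `spark.app.name` schema entry could be tightened, assuming the value is ultimately used as a Kubernetes label value, which is capped at 63 characters and must start and end with an alphanumeric, with `-`, `_`, and `.` allowed in between. The schema fragment and the `jsonschema` check below are illustrative only.

```python
# Illustrative only: a possible tightening of the "spark.app.name" entry,
# assuming the value ends up as a Kubernetes label value (max 63 characters,
# alphanumeric at both ends, with '-', '_', '.' allowed in between).
from jsonschema import ValidationError, validate

SPARK_APP_NAME_SCHEMA = {
    "type": "string",
    "minLength": 1,
    "maxLength": 63,
    "pattern": r"^[a-zA-Z0-9]([-a-zA-Z0-9_.]*[a-zA-Z0-9])?$",
}

for candidate in ("my-spark-app", "x" * 64, "-leading-dash"):
    try:
        validate(instance=candidate, schema=SPARK_APP_NAME_SCHEMA)
        print(f"{candidate!r}: valid")
    except ValidationError as err:
        print(f"{candidate!r}: rejected ({err.message})")
```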
21 changes: 1 addition & 20 deletions paasta_tools/spark_tools.py
@@ -6,7 +6,6 @@
from typing import cast
from typing import Dict
from typing import List
-from typing import Literal
from typing import Mapping
from typing import Set

@@ -25,6 +24,7 @@
SPARK_PROMETHEUS_SHARD = "ml-compute"
SPARK_DNS_POD_TEMPLATE = "/nail/srv/configs/spark_dns_pod_template.yaml"
MEM_MULTIPLIER = {"k": 1024, "m": 1024**2, "g": 1024**3, "t": 1024**4}
+SPARK_DRIVER_DEFAULT_DISK_MB = 5120  # 5GB

log = logging.getLogger(__name__)

@@ -249,22 +249,3 @@ def get_spark_driver_monitoring_labels(
"spark.yelp.com/driver_ui_port": ui_port_str,
}
return labels


-def get_spark_memory_in_unit(mem: str, unit: Literal["k", "m", "g", "t"]) -> float:
-    """
-    Converts Spark memory to the desired unit.
-    mem is the same format as JVM memory strings: just number or number followed by 'k', 'm', 'g' or 't'.
-    unit can be 'k', 'm', 'g' or 't'.
-    Returns memory as a float converted to the desired unit.
-    """
-    try:
-        memory_bytes = float(mem)
-    except ValueError:
-        try:
-            memory_bytes = float(mem[:-1]) * MEM_MULTIPLIER[mem[-1]]
-        except (ValueError, IndexError):
-            print(f"Unable to parse memory value {mem}. Defaulting to 2 GB.")
-            memory_bytes = 2147483648  # default to 2 GB
-    memory_unit = memory_bytes / MEM_MULTIPLIER[unit]
-    return memory_unit
23 changes: 12 additions & 11 deletions paasta_tools/tron_tools.py
Original file line number Diff line number Diff line change
@@ -29,6 +29,7 @@
from mypy_extensions import TypedDict
from service_configuration_lib import read_extra_service_information
from service_configuration_lib import read_yaml_file
+from service_configuration_lib.spark_config import get_total_driver_memory_mb
from service_configuration_lib.spark_config import SparkConfBuilder

from paasta_tools.mesos_tools import mesos_services_running_here
@@ -301,19 +302,19 @@ def get_cpus(self) -> float:
        return super().get_cpus()

    def get_mem(self) -> float:
-        # set Spark driver pod memory if it is specified by Spark arguments
-        if (
-            self.action_spark_config
-            and "spark.driver.memory" in self.action_spark_config
-        ):
-            return int(
-                spark_tools.get_spark_memory_in_unit(
-                    self.action_spark_config["spark.driver.memory"], "m"
-                )
-            )
-        # we fall back to this default if there's no spark.driver.memory config
+        # get Spark driver pod memory specified by Spark arguments
+        if self.action_spark_config:
+            return get_total_driver_memory_mb(self.action_spark_config)
+        # we fall back to this default if there's no Spark config
        return super().get_mem()

+    def get_disk(self, default: float = 1024) -> float:
+        # increase default threshold for Spark driver pod memory because 1G is too low
+        if self.action_spark_config and "disk" not in self.config_dict:
+            return spark_tools.SPARK_DRIVER_DEFAULT_DISK_MB
+        # we fall back to this default if there's no Spark config
+        return super().get_disk()

    def build_spark_config(self) -> Dict[str, str]:
        system_paasta_config = load_system_paasta_config()
        resolved_cluster = system_paasta_config.get_eks_cluster_aliases().get(
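For context, `get_total_driver_memory_mb` comes from `service_configuration_lib` (bumped to 2.18.20 below), so its implementation is not part of this diff. A minimal sketch of what it presumably computes (driver memory plus driver memory overhead), assuming a default 10% overhead factor that is consistent with the 1024 MB to 1126 MB change in the test expectations, could look like the following; the helper and constant names are hypothetical:

```python
from typing import Mapping

MEM_MULTIPLIER = {"k": 1024, "m": 1024**2, "g": 1024**3, "t": 1024**4}
# Assumed default overhead factor; matches the 1024 -> 1126 MB change in the tests (1024 * 1.1 = 1126.4).
DEFAULT_DRIVER_MEMORY_OVERHEAD_FACTOR = 0.1


def _memory_string_to_mb(mem: str) -> float:
    """Convert a JVM-style memory string ('512m', '2g', ...) to MiB."""
    try:
        memory_bytes = float(mem)
    except ValueError:
        memory_bytes = float(mem[:-1]) * MEM_MULTIPLIER[mem[-1].lower()]
    return memory_bytes / MEM_MULTIPLIER["m"]


def total_driver_memory_mb(spark_conf: Mapping[str, str]) -> int:
    """Driver pod memory: spark.driver.memory plus spark.driver.memoryOverhead (or a default factor)."""
    driver_memory_mb = _memory_string_to_mb(spark_conf.get("spark.driver.memory", "2g"))
    overhead = spark_conf.get("spark.driver.memoryOverhead")
    overhead_mb = (
        _memory_string_to_mb(overhead)
        if overhead is not None
        else driver_memory_mb * DEFAULT_DRIVER_MEMORY_OVERHEAD_FACTOR
    )
    return int(driver_memory_mb + overhead_mb)


# Example: 1g of driver memory with the assumed 10% default overhead -> 1126 MB pod request.
print(total_driver_memory_mb({"spark.driver.memory": "1g"}))  # 1126
```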
2 changes: 1 addition & 1 deletion requirements-minimal.txt
@@ -54,7 +54,7 @@ requests-cache >= 0.4.10
retry
ruamel.yaml
sensu-plugin
-service-configuration-lib >= 2.18.19
+service-configuration-lib >= 2.18.20
signalfx
slackclient >= 1.2.1
sticht >= 1.1.0
2 changes: 1 addition & 1 deletion requirements.txt
@@ -89,7 +89,7 @@ rsa==4.7.2
ruamel.yaml==0.15.96
s3transfer==0.10.0
sensu-plugin==0.3.1
-service-configuration-lib==2.18.19
+service-configuration-lib==2.18.20
setuptools==39.0.1
signalfx==1.0.17
simplejson==3.10.0
4 changes: 2 additions & 2 deletions tests/test_tron_tools.py
@@ -1312,7 +1312,7 @@ def test_format_tron_action_dict_spark(
"PAASTA_DEPLOY_GROUP": "prod",
"PAASTA_DOCKER_IMAGE": "my_service:paasta-123abcde",
"PAASTA_RESOURCE_CPUS": "1",
"PAASTA_RESOURCE_MEM": "1024",
"PAASTA_RESOURCE_MEM": "1126",
"PAASTA_RESOURCE_DISK": "42",
"PAASTA_GIT_SHA": "123abcde",
"PAASTA_INSTANCE_TYPE": "spark",
@@ -1366,7 +1366,7 @@
],
"ports": [39091],
"cpus": 1,
"mem": 1024,
"mem": 1126,
"disk": 42,
"docker_image": "docker-registry.com:400/my_service:paasta-123abcde",
}
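Assuming the driver memory in this test resolves to 1024 MB and the default 10% overhead sketched above, `int(1024 * 1.1) == 1126`, which is why both the `PAASTA_RESOURCE_MEM` environment variable and the pod `mem` field move from 1024 to 1126 while `cpus` and `disk` stay unchanged.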