Skip to content

Commit

Permalink
Merge pull request #1003 from Yelp/luisp/TRON-2213-tsc
Browse files Browse the repository at this point in the history
Add TSC support to Tron
  • Loading branch information
KaspariK authored Oct 29, 2024
2 parents 2528880 + c1b001f commit 01fc582
Show file tree
Hide file tree
Showing 9 changed files with 71 additions and 1 deletion.
3 changes: 3 additions & 0 deletions requirements-minimal.txt
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
addict # not sure why check-requirements is not picking this up from task_processing[mesos_executor]
argcomplete
boto3
bsddb3
cryptography
dataclasses
ecdsa>=0.13.3
http-parser # not sure why check-requirements is not picking this up from task_processing[mesos_executor]
humanize
ipdb
ipython
Expand All @@ -15,6 +17,7 @@ psutil
py-bcrypt
pyasn1
pyformance
pymesos # not sure why check-requirements is not picking this up from task_processing[mesos_executor]
pysensu-yelp
PyStaticConfiguration
pytimeparse
Expand Down
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ setuptools==65.5.1
six==1.15.0
sshpubkeys==3.1.0
stack-data==0.6.2
task-processing==1.2.0
task-processing==1.3.0
traitlets==5.0.0
Twisted==22.10.0
typing-extensions==4.5.0
Expand Down
1 change: 1 addition & 0 deletions tests/core/actionrun_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -1900,6 +1900,7 @@ def test_recover(self, mock_cluster_repo, mock_filehandler, mock_k8s_action_run)
task_id=last_attempt.kubernetes_task_id,
node_selectors=mock_k8s_action_run.command_config.node_selectors,
node_affinities=mock_k8s_action_run.command_config.node_affinities,
topology_spread_constraints=mock_k8s_action_run.command_config.topology_spread_constraints,
pod_labels=mock_k8s_action_run.command_config.labels,
pod_annotations=mock_k8s_action_run.command_config.annotations,
service_account_name=mock_k8s_action_run.command_config.service_account_name,
Expand Down
6 changes: 6 additions & 0 deletions tests/kubernetes_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -474,6 +474,7 @@ def test_create_task_disabled():
cap_drop=[],
node_selectors={"yelp.com/pool": "default"},
node_affinities=[],
topology_spread_constraints=[],
pod_labels={},
pod_annotations={},
service_account_name=None,
Expand Down Expand Up @@ -504,6 +505,7 @@ def test_create_task(mock_kubernetes_cluster):
cap_drop=[],
node_selectors={"yelp.com/pool": "default"},
node_affinities=[],
topology_spread_constraints=[],
pod_labels={},
pod_annotations={},
service_account_name=None,
Expand Down Expand Up @@ -535,6 +537,7 @@ def test_create_task_with_task_id(mock_kubernetes_cluster):
cap_drop=[],
node_selectors={"yelp.com/pool": "default"},
node_affinities=[],
topology_spread_constraints=[],
pod_labels={},
pod_annotations={},
service_account_name=None,
Expand Down Expand Up @@ -569,6 +572,7 @@ def test_create_task_with_invalid_task_id(mock_kubernetes_cluster):
cap_drop=[],
node_selectors={"yelp.com/pool": "default"},
node_affinities=[],
topology_spread_constraints=[],
pod_labels={},
pod_annotations={},
service_account_name=None,
Expand Down Expand Up @@ -616,6 +620,7 @@ def test_create_task_with_config(mock_kubernetes_cluster):
"cap_drop": ["KILL", "CHOWN"],
"node_selectors": {"yelp.com/pool": "default"},
"node_affinities": [],
"topology_spread_constraints": [],
"labels": {},
"annotations": {},
"service_account_name": None,
Expand All @@ -641,6 +646,7 @@ def test_create_task_with_config(mock_kubernetes_cluster):
cap_drop=["KILL", "CHOWN"],
node_selectors={"yelp.com/pool": "default"},
node_affinities=[],
topology_spread_constraints=[],
pod_labels={},
pod_annotations={},
service_account_name=None,
Expand Down
44 changes: 44 additions & 0 deletions tron/config/config_parse.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
from tron.config.schema import ConfigSecretVolumeItem
from tron.config.schema import ConfigSSHOptions
from tron.config.schema import ConfigState
from tron.config.schema import ConfigTopologySpreadConstraints
from tron.config.schema import ConfigVolume
from tron.config.schema import MASTER_NAMESPACE
from tron.config.schema import NamedTronConfig
Expand Down Expand Up @@ -394,6 +395,41 @@ class ValidateNodeAffinity(Validator):
valid_node_affinity = ValidateNodeAffinity()


def _valid_when_unsatisfiable(value: str, config_context: ConfigContext) -> str:
valid_values = {"DoNotSchedule", "ScheduleAnyway"}
if value not in valid_values:
raise ConfigError(f"Got {value} as a when_unsatisfiable value, expected one of {valid_values}")

return value


def _valid_topology_spread_label_selector(value: Dict[str, str], config_context: ConfigContext) -> Dict[str, str]:
if not value:
raise ConfigError("TopologySpreadConstraints must have a label_selector")

# XXX: we probably also want to enforce k8s limits for label lengths and whatnot
if not all(isinstance(k, str) for k in value.keys()):
raise ConfigError("TopologySpreadConstraints label_selector keys must be strings")

if not all(isinstance(s, str) for s in value.values()):
raise ConfigError("TopologySpreadConstraints label_selector values must be strings")

return value


class ValidateTopologySpreadConstraints(Validator):
config_class = ConfigTopologySpreadConstraints
validators = {
"max_skew": valid_int,
"when_unsatisfiable": _valid_when_unsatisfiable,
"topology_key": valid_string,
"label_selector": _valid_topology_spread_label_selector,
}


valid_topology_spread_constraints = ValidateTopologySpreadConstraints()


class ValidateSSHOptions(Validator):
"""Validate SSH options."""

Expand Down Expand Up @@ -564,6 +600,7 @@ class ValidateAction(Validator):
"trigger_timeout": None,
"node_selectors": None,
"node_affinities": None,
"topology_spread_constraints": None,
"labels": None,
"annotations": None,
"service_account_name": None,
Expand Down Expand Up @@ -605,6 +642,9 @@ class ValidateAction(Validator):
"trigger_timeout": config_utils.valid_time_delta,
"node_selectors:": valid_dict,
"node_affinities": build_list_of_type_validator(valid_node_affinity, allow_empty=True),
"topology_spread_constraints": build_list_of_type_validator(
valid_topology_spread_constraints, allow_empty=True
),
"labels:": valid_dict,
"annotations": valid_dict,
"service_account_name": valid_string,
Expand Down Expand Up @@ -655,6 +695,7 @@ class ValidateCleanupAction(Validator):
"trigger_timeout": None,
"node_selectors": None,
"node_affinities": None,
"topology_spread_constraints": None,
"labels": None,
"annotations": None,
"service_account_name": None,
Expand Down Expand Up @@ -691,6 +732,9 @@ class ValidateCleanupAction(Validator):
"trigger_timeout": config_utils.valid_time_delta,
"node_selectors:": valid_dict,
"node_affinities": build_list_of_type_validator(valid_node_affinity, allow_empty=True),
"topology_spread_constraints": build_list_of_type_validator(
valid_topology_spread_constraints, allow_empty=True
),
"labels": valid_dict,
"annotations": valid_dict,
"service_account_name": valid_string,
Expand Down
8 changes: 8 additions & 0 deletions tron/config/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,7 @@ def config_object_factory(name, required=None, optional=None):
"trigger_timeout", # datetime.deltatime or None
"node_selectors", # Dict of str, str
"node_affinities", # List of ConfigNodeAffinity
"topology_spread_constraints", # List of ConfigTopologySpreadConstraints
"labels", # Dict of str, str
"annotations", # Dict of str, str
"service_account_name", # str
Expand Down Expand Up @@ -222,6 +223,7 @@ def config_object_factory(name, required=None, optional=None):
"trigger_timeout", # datetime.deltatime or None
"node_selectors", # Dict of str, str
"node_affinities", # List of ConfigNodeAffinity
"topology_spread_constraints", # List of ConfigTopologySpreadConstraints
"labels", # Dict of str, str
"annotations", # Dict of str, str
"service_account_name", # str
Expand Down Expand Up @@ -306,6 +308,12 @@ def _asdict(self) -> dict:
optional=[],
)

ConfigTopologySpreadConstraints = config_object_factory(
name="ConfigTopologySpreadConstraints",
required=["max_skew", "label_selector", "topology_key", "when_unsatisfiable"],
optional=[],
)

ConfigParameter = config_object_factory(
name="ConfigParameter",
required=[
Expand Down
3 changes: 3 additions & 0 deletions tron/core/action.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from tron.config.schema import ConfigNodeAffinity
from tron.config.schema import ConfigProjectedSAVolume
from tron.config.schema import ConfigSecretVolume
from tron.config.schema import ConfigTopologySpreadConstraints

log = logging.getLogger(__name__)

Expand All @@ -39,6 +40,7 @@ class ActionCommandConfig:
extra_volumes: set = field(default_factory=set)
node_selectors: dict = field(default_factory=dict)
node_affinities: List[ConfigNodeAffinity] = field(default_factory=list)
topology_spread_constraints: List[ConfigTopologySpreadConstraints] = field(default_factory=list)
labels: dict = field(default_factory=dict)
annotations: dict = field(default_factory=dict)
service_account_name: Optional[str] = None
Expand Down Expand Up @@ -98,6 +100,7 @@ def from_config(cls, config: ConfigAction) -> "Action":
cap_drop=config.cap_drop or [],
node_selectors=config.node_selectors or {},
node_affinities=config.node_affinities or [],
topology_spread_constraints=config.topology_spread_constraints or [],
labels=config.labels or {},
annotations=config.annotations or {},
service_account_name=config.service_account_name or None,
Expand Down
2 changes: 2 additions & 0 deletions tron/core/actionrun.py
Original file line number Diff line number Diff line change
Expand Up @@ -1178,6 +1178,7 @@ def submit_command(self, attempt: ActionRunAttempt) -> Optional[KubernetesTask]:
cap_drop=attempt.command_config.cap_drop,
node_selectors=attempt.command_config.node_selectors,
node_affinities=attempt.command_config.node_affinities,
topology_spread_constraints=attempt.command_config.topology_spread_constraints,
pod_labels=build_labels(run_id=self.id, original_labels=attempt.command_config.labels),
pod_annotations=attempt.command_config.annotations,
service_account_name=attempt.command_config.service_account_name,
Expand Down Expand Up @@ -1253,6 +1254,7 @@ def recover(self) -> Optional[KubernetesTask]:
task_id=last_attempt.kubernetes_task_id,
node_selectors=last_attempt.command_config.node_selectors,
node_affinities=last_attempt.command_config.node_affinities,
topology_spread_constraints=last_attempt.command_config.topology_spread_constraints,
pod_labels=build_labels(run_id=self.id, original_labels=last_attempt.command_config.labels),
pod_annotations=last_attempt.command_config.annotations,
service_account_name=last_attempt.command_config.service_account_name,
Expand Down
3 changes: 3 additions & 0 deletions tron/kubernetes.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
from tron.config.schema import ConfigProjectedSAVolume
from tron.config.schema import ConfigSecretSource
from tron.config.schema import ConfigSecretVolume
from tron.config.schema import ConfigTopologySpreadConstraints
from tron.config.schema import ConfigVolume
from tron.serialize.filehandler import OutputStreamSerializer
from tron.utils import exitcode
Expand Down Expand Up @@ -490,6 +491,7 @@ def create_task(
cap_drop: Collection[str],
node_selectors: Dict[str, str],
node_affinities: List[ConfigNodeAffinity],
topology_spread_constraints: List[ConfigTopologySpreadConstraints],
pod_labels: Dict[str, str],
pod_annotations: Dict[str, str],
service_account_name: Optional[str],
Expand Down Expand Up @@ -529,6 +531,7 @@ def create_task(
],
node_selectors=node_selectors,
node_affinities=[affinity._asdict() for affinity in node_affinities],
topology_spread_constraints=[tsc._asdict() for tsc in topology_spread_constraints],
labels=pod_labels,
annotations=pod_annotations,
service_account_name=service_account_name,
Expand Down

0 comments on commit 01fc582

Please sign in to comment.