diff --git a/requirements-minimal.txt b/requirements-minimal.txt index 11684cc6a..ae05a9829 100644 --- a/requirements-minimal.txt +++ b/requirements-minimal.txt @@ -1,9 +1,11 @@ +addict # not sure why check-requirements is not picking this up from task_processing[mesos_executor] argcomplete boto3 bsddb3 cryptography dataclasses ecdsa>=0.13.3 +http-parser # not sure why check-requirements is not picking this up from task_processing[mesos_executor] humanize ipdb ipython @@ -15,6 +17,7 @@ psutil py-bcrypt pyasn1 pyformance +pymesos # not sure why check-requirements is not picking this up from task_processing[mesos_executor] pysensu-yelp PyStaticConfiguration pytimeparse diff --git a/requirements.txt b/requirements.txt index b070dfe99..76725833c 100644 --- a/requirements.txt +++ b/requirements.txt @@ -80,7 +80,7 @@ setuptools==65.5.1 six==1.15.0 sshpubkeys==3.1.0 stack-data==0.6.2 -task-processing==1.2.0 +task-processing==1.3.0 traitlets==5.0.0 Twisted==22.10.0 typing-extensions==4.5.0 diff --git a/tests/core/actionrun_test.py b/tests/core/actionrun_test.py index 5917efcfe..cfa0afc49 100644 --- a/tests/core/actionrun_test.py +++ b/tests/core/actionrun_test.py @@ -1900,6 +1900,7 @@ def test_recover(self, mock_cluster_repo, mock_filehandler, mock_k8s_action_run) task_id=last_attempt.kubernetes_task_id, node_selectors=mock_k8s_action_run.command_config.node_selectors, node_affinities=mock_k8s_action_run.command_config.node_affinities, + topology_spread_constraints=mock_k8s_action_run.command_config.topology_spread_constraints, pod_labels=mock_k8s_action_run.command_config.labels, pod_annotations=mock_k8s_action_run.command_config.annotations, service_account_name=mock_k8s_action_run.command_config.service_account_name, diff --git a/tests/kubernetes_test.py b/tests/kubernetes_test.py index 59c3d56db..6e13c31bb 100644 --- a/tests/kubernetes_test.py +++ b/tests/kubernetes_test.py @@ -474,6 +474,7 @@ def test_create_task_disabled(): cap_drop=[], node_selectors={"yelp.com/pool": "default"}, node_affinities=[], + topology_spread_constraints=[], pod_labels={}, pod_annotations={}, service_account_name=None, @@ -504,6 +505,7 @@ def test_create_task(mock_kubernetes_cluster): cap_drop=[], node_selectors={"yelp.com/pool": "default"}, node_affinities=[], + topology_spread_constraints=[], pod_labels={}, pod_annotations={}, service_account_name=None, @@ -535,6 +537,7 @@ def test_create_task_with_task_id(mock_kubernetes_cluster): cap_drop=[], node_selectors={"yelp.com/pool": "default"}, node_affinities=[], + topology_spread_constraints=[], pod_labels={}, pod_annotations={}, service_account_name=None, @@ -569,6 +572,7 @@ def test_create_task_with_invalid_task_id(mock_kubernetes_cluster): cap_drop=[], node_selectors={"yelp.com/pool": "default"}, node_affinities=[], + topology_spread_constraints=[], pod_labels={}, pod_annotations={}, service_account_name=None, @@ -616,6 +620,7 @@ def test_create_task_with_config(mock_kubernetes_cluster): "cap_drop": ["KILL", "CHOWN"], "node_selectors": {"yelp.com/pool": "default"}, "node_affinities": [], + "topology_spread_constraints": [], "labels": {}, "annotations": {}, "service_account_name": None, @@ -641,6 +646,7 @@ def test_create_task_with_config(mock_kubernetes_cluster): cap_drop=["KILL", "CHOWN"], node_selectors={"yelp.com/pool": "default"}, node_affinities=[], + topology_spread_constraints=[], pod_labels={}, pod_annotations={}, service_account_name=None, diff --git a/tron/config/config_parse.py b/tron/config/config_parse.py index 347402c22..04e66a599 100644 --- a/tron/config/config_parse.py +++ b/tron/config/config_parse.py @@ -59,6 +59,7 @@ from tron.config.schema import ConfigSecretVolumeItem from tron.config.schema import ConfigSSHOptions from tron.config.schema import ConfigState +from tron.config.schema import ConfigTopologySpreadConstraints from tron.config.schema import ConfigVolume from tron.config.schema import MASTER_NAMESPACE from tron.config.schema import NamedTronConfig @@ -394,6 +395,41 @@ class ValidateNodeAffinity(Validator): valid_node_affinity = ValidateNodeAffinity() +def _valid_when_unsatisfiable(value: str, config_context: ConfigContext) -> str: + valid_values = {"DoNotSchedule", "ScheduleAnyway"} + if value not in valid_values: + raise ConfigError(f"Got {value} as a when_unsatisfiable value, expected one of {valid_values}") + + return value + + +def _valid_topology_spread_label_selector(value: Dict[str, str], config_context: ConfigContext) -> Dict[str, str]: + if not value: + raise ConfigError("TopologySpreadConstraints must have a label_selector") + + # XXX: we probably also want to enforce k8s limits for label lengths and whatnot + if not all(isinstance(k, str) for k in value.keys()): + raise ConfigError("TopologySpreadConstraints label_selector keys must be strings") + + if not all(isinstance(s, str) for s in value.values()): + raise ConfigError("TopologySpreadConstraints label_selector values must be strings") + + return value + + +class ValidateTopologySpreadConstraints(Validator): + config_class = ConfigTopologySpreadConstraints + validators = { + "max_skew": valid_int, + "when_unsatisfiable": _valid_when_unsatisfiable, + "topology_key": valid_string, + "label_selector": _valid_topology_spread_label_selector, + } + + +valid_topology_spread_constraints = ValidateTopologySpreadConstraints() + + class ValidateSSHOptions(Validator): """Validate SSH options.""" @@ -564,6 +600,7 @@ class ValidateAction(Validator): "trigger_timeout": None, "node_selectors": None, "node_affinities": None, + "topology_spread_constraints": None, "labels": None, "annotations": None, "service_account_name": None, @@ -605,6 +642,9 @@ class ValidateAction(Validator): "trigger_timeout": config_utils.valid_time_delta, "node_selectors:": valid_dict, "node_affinities": build_list_of_type_validator(valid_node_affinity, allow_empty=True), + "topology_spread_constraints": build_list_of_type_validator( + valid_topology_spread_constraints, allow_empty=True + ), "labels:": valid_dict, "annotations": valid_dict, "service_account_name": valid_string, @@ -655,6 +695,7 @@ class ValidateCleanupAction(Validator): "trigger_timeout": None, "node_selectors": None, "node_affinities": None, + "topology_spread_constraints": None, "labels": None, "annotations": None, "service_account_name": None, @@ -691,6 +732,9 @@ class ValidateCleanupAction(Validator): "trigger_timeout": config_utils.valid_time_delta, "node_selectors:": valid_dict, "node_affinities": build_list_of_type_validator(valid_node_affinity, allow_empty=True), + "topology_spread_constraints": build_list_of_type_validator( + valid_topology_spread_constraints, allow_empty=True + ), "labels": valid_dict, "annotations": valid_dict, "service_account_name": valid_string, diff --git a/tron/config/schema.py b/tron/config/schema.py index 948a65110..8630828e1 100644 --- a/tron/config/schema.py +++ b/tron/config/schema.py @@ -183,6 +183,7 @@ def config_object_factory(name, required=None, optional=None): "trigger_timeout", # datetime.deltatime or None "node_selectors", # Dict of str, str "node_affinities", # List of ConfigNodeAffinity + "topology_spread_constraints", # List of ConfigTopologySpreadConstraints "labels", # Dict of str, str "annotations", # Dict of str, str "service_account_name", # str @@ -222,6 +223,7 @@ def config_object_factory(name, required=None, optional=None): "trigger_timeout", # datetime.deltatime or None "node_selectors", # Dict of str, str "node_affinities", # List of ConfigNodeAffinity + "topology_spread_constraints", # List of ConfigTopologySpreadConstraints "labels", # Dict of str, str "annotations", # Dict of str, str "service_account_name", # str @@ -306,6 +308,12 @@ def _asdict(self) -> dict: optional=[], ) +ConfigTopologySpreadConstraints = config_object_factory( + name="ConfigTopologySpreadConstraints", + required=["max_skew", "label_selector", "topology_key", "when_unsatisfiable"], + optional=[], +) + ConfigParameter = config_object_factory( name="ConfigParameter", required=[ diff --git a/tron/core/action.py b/tron/core/action.py index 7b24efe3c..becbafcca 100644 --- a/tron/core/action.py +++ b/tron/core/action.py @@ -13,6 +13,7 @@ from tron.config.schema import ConfigNodeAffinity from tron.config.schema import ConfigProjectedSAVolume from tron.config.schema import ConfigSecretVolume +from tron.config.schema import ConfigTopologySpreadConstraints log = logging.getLogger(__name__) @@ -39,6 +40,7 @@ class ActionCommandConfig: extra_volumes: set = field(default_factory=set) node_selectors: dict = field(default_factory=dict) node_affinities: List[ConfigNodeAffinity] = field(default_factory=list) + topology_spread_constraints: List[ConfigTopologySpreadConstraints] = field(default_factory=list) labels: dict = field(default_factory=dict) annotations: dict = field(default_factory=dict) service_account_name: Optional[str] = None @@ -98,6 +100,7 @@ def from_config(cls, config: ConfigAction) -> "Action": cap_drop=config.cap_drop or [], node_selectors=config.node_selectors or {}, node_affinities=config.node_affinities or [], + topology_spread_constraints=config.topology_spread_constraints or [], labels=config.labels or {}, annotations=config.annotations or {}, service_account_name=config.service_account_name or None, diff --git a/tron/core/actionrun.py b/tron/core/actionrun.py index cc2a25bb2..b6a07df69 100644 --- a/tron/core/actionrun.py +++ b/tron/core/actionrun.py @@ -1178,6 +1178,7 @@ def submit_command(self, attempt: ActionRunAttempt) -> Optional[KubernetesTask]: cap_drop=attempt.command_config.cap_drop, node_selectors=attempt.command_config.node_selectors, node_affinities=attempt.command_config.node_affinities, + topology_spread_constraints=attempt.command_config.topology_spread_constraints, pod_labels=build_labels(run_id=self.id, original_labels=attempt.command_config.labels), pod_annotations=attempt.command_config.annotations, service_account_name=attempt.command_config.service_account_name, @@ -1253,6 +1254,7 @@ def recover(self) -> Optional[KubernetesTask]: task_id=last_attempt.kubernetes_task_id, node_selectors=last_attempt.command_config.node_selectors, node_affinities=last_attempt.command_config.node_affinities, + topology_spread_constraints=last_attempt.command_config.topology_spread_constraints, pod_labels=build_labels(run_id=self.id, original_labels=last_attempt.command_config.labels), pod_annotations=last_attempt.command_config.annotations, service_account_name=last_attempt.command_config.service_account_name, diff --git a/tron/kubernetes.py b/tron/kubernetes.py index afd5cfd53..f0c5f4db8 100644 --- a/tron/kubernetes.py +++ b/tron/kubernetes.py @@ -24,6 +24,7 @@ from tron.config.schema import ConfigProjectedSAVolume from tron.config.schema import ConfigSecretSource from tron.config.schema import ConfigSecretVolume +from tron.config.schema import ConfigTopologySpreadConstraints from tron.config.schema import ConfigVolume from tron.serialize.filehandler import OutputStreamSerializer from tron.utils import exitcode @@ -490,6 +491,7 @@ def create_task( cap_drop: Collection[str], node_selectors: Dict[str, str], node_affinities: List[ConfigNodeAffinity], + topology_spread_constraints: List[ConfigTopologySpreadConstraints], pod_labels: Dict[str, str], pod_annotations: Dict[str, str], service_account_name: Optional[str], @@ -529,6 +531,7 @@ def create_task( ], node_selectors=node_selectors, node_affinities=[affinity._asdict() for affinity in node_affinities], + topology_spread_constraints=[tsc._asdict() for tsc in topology_spread_constraints], labels=pod_labels, annotations=pod_annotations, service_account_name=service_account_name,