Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

TRON-2208: Add toggle in tron config to disable retries on LOST k8s jobs #988

Merged
merged 15 commits into from
Aug 28, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions tron/config/config_parse.py
Original file line number Diff line number Diff line change
Expand Up @@ -866,12 +866,14 @@ class ValidateKubernetes(Validator):
defaults = {
"kubeconfig_path": None,
"enabled": False,
"disable_retries_on_lost": False,
"default_volumes": (),
}

validators = {
"kubeconfig_path": valid_string,
"enabled": valid_bool,
"disable_retries_on_lost": valid_bool,
"default_volumes": build_list_of_type_validator(valid_volume, allow_empty=True),
}

Expand Down
1 change: 1 addition & 0 deletions tron/config/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ def config_object_factory(name, required=None, optional=None):
optional=[
"kubeconfig_path",
"enabled",
"disable_retries_on_lost",
"default_volumes",
],
)
Expand Down
34 changes: 32 additions & 2 deletions tron/core/actionrun.py
Original file line number Diff line number Diff line change
Expand Up @@ -1314,8 +1314,38 @@ def kill(self, final: bool = True) -> Optional[str]:

return "\n".join(msgs)

def _exit_unsuccessful(
    self, exit_status=None, retry_original_command=True, is_lost_task=False
) -> Optional[Union[bool, ActionCommand]]:
    """Record an unsuccessful exit and either retry the run or mark it failed.

    Args:
        exit_status: Exit status reported for the attempt; ``None`` leads to
            the ``"fail_unknown"`` terminal transition instead of ``"fail"``.
        retry_original_command: Forwarded to ``self.restart`` as
            ``original_command`` when a retry is issued.
        is_lost_task: Whether the caller flagged the task as lost. Combined
            with the cluster's ``disable_retries_on_lost`` setting, this
            suppresses automatic retries.

    Returns:
        ``None`` if the run is already done; the result of ``self.restart``
        when a retry is consumed; otherwise the result of ``self._done``.
    """
    # The toggle lives on the cluster object; with no cluster it is off.
    cluster = KubernetesClusterRepository.get_cluster()
    disable_retries_on_lost = cluster.disable_retries_on_lost if cluster else False

    # A run that is already done is left untouched.
    if self.is_done:
        log.info(
            f"{self} got exit code {exit_status} but already in terminal "
            f'state "{self.state}", not retrying',
        )
        return None

    latest_attempt = self.last_attempt
    if latest_attempt is not None:
        latest_attempt.exit(exit_status)

    # retries_remaining of None skips the retry logic entirely.
    if self.retries_remaining is not None:
        if disable_retries_on_lost and is_lost_task:
            log.info(f"{self} skipping auto-retries due to disable_retries_on_lost being enabled.")
        elif self.retries_remaining > 0:
            self.retries_remaining -= 1
            return self.restart(original_command=retry_original_command)
        else:
            log.info(
                f"Reached maximum number of retries: {len(self.attempts)}",
            )

    # No retry was issued: finish in the appropriate failure state.
    terminal_target = "fail_unknown" if exit_status is None else "fail"
    return self._done(terminal_target, exit_status)

def handle_action_command_state_change(
self, action_command: ActionCommand, event: str, event_data=None
self, action_command: KubernetesTask, event: str, event_data=None
) -> Optional[Union[bool, ActionCommand]]:
"""
Observe ActionCommand state changes and transition the ActionCommand state machine to a new state.
Expand All @@ -1331,7 +1361,7 @@ def handle_action_command_state_change(
if event == ActionCommand.EXITING:
if action_command.exit_status is None:
# This is different from SSHActionRun - allows retries to happen, if configured
return self._exit_unsuccessful(None)
return self._exit_unsuccessful(None, is_lost_task=action_command.is_lost_task)

if not action_command.exit_status:
return self.success()
Expand Down
7 changes: 7 additions & 0 deletions tron/kubernetes.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ def __init__(self, action_run_id: str, task_config: KubernetesTaskConfig, serial

self.log.info(f"Kubernetes task {self.get_kubernetes_id()} created with config {self.get_config()}")

self.is_lost_task = False

def get_event_logger(self) -> Logger:
"""
Get or create a logger for the action run associated with this task.
Expand Down Expand Up @@ -252,6 +254,7 @@ def handle_event(self, event: Event) -> None:
self.log.warning(f" tronctl skip {self.id}")
self.log.warning("If you want Tron to NOT run it and consider it as a failure, fail it with:")
self.log.warning(f" tronctl fail {self.id}")
self.is_lost_task = True
cuza marked this conversation as resolved.
Show resolved Hide resolved
self.exited(None)
else:
self.log.info(
Expand Down Expand Up @@ -280,10 +283,12 @@ def __init__(
enabled: bool = True,
default_volumes: Optional[List[ConfigVolume]] = None,
pod_launch_timeout: Optional[int] = None,
disable_retries_on_lost: bool = False,
):
# general k8s config
self.kubeconfig_path = kubeconfig_path
self.enabled = enabled
self.disable_retries_on_lost = disable_retries_on_lost
self.default_volumes: Optional[List[ConfigVolume]] = default_volumes or []
self.pod_launch_timeout = pod_launch_timeout or DEFAULT_POD_LAUNCH_TIMEOUT_S
# creating a task_proc executor has a couple steps:
Expand Down Expand Up @@ -618,6 +623,7 @@ def recover(self, task: KubernetesTask) -> None:
class KubernetesClusterRepository:
# Kubernetes config
kubernetes_enabled: bool = False
kubernetes_disable_retries_on_lost: bool = False
kubeconfig_path: Optional[str] = None
pod_launch_timeout: Optional[int] = None
default_volumes: Optional[List[ConfigVolume]] = None
Expand Down Expand Up @@ -658,6 +664,7 @@ def shutdown(cls) -> None:
def configure(cls, kubernetes_options: ConfigKubernetes) -> None:
cls.kubeconfig_path = kubernetes_options.kubeconfig_path
cls.kubernetes_enabled = kubernetes_options.enabled
cls.kubernetes_disable_retries_on_lost = kubernetes_options.disable_retries_on_lost
cls.default_volumes = kubernetes_options.default_volumes

for cluster in cls.clusters.values():
Expand Down
Loading