-
Notifications
You must be signed in to change notification settings - Fork 61
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
TRON-2210: Add a tool that can sync from pod state to tron w/ tronctl #985
Merged
Merged
Changes from 1 commit
Commits
Show all changes
4 commits
Select commit
Hold shift + click to select a range
93bf504
TRON-2210: Add a tool that can sync from pod state to tron w/ tronctl
jfongatyelp 03b7958
Update sync to handle in-progress retries and don't tronctl yet; add …
jfongatyelp 64db0ab
Update sync_tron_from_k8s to match sanitized instance labels, cleanup…
jfongatyelp 5eb6ba8
Update tools/sync_tron_state_from_k8s.py
jfongatyelp File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,180 @@ | ||
""" | ||
Update tron state from k8s api if tron has not yet updated correctly | ||
|
||
Usage: | ||
python tools/sync_tron_state_from_k8s.py -c <kubeconfig_path> (--do-work|--num-runs N|--tronctl-wrapper tronctl-pnw-devc) | ||
|
||
This will search for completed pods in the cluster specified in the kubeconfig in the `tron` namespace and use tronctl to transition any whose states do not match. | ||
""" | ||
import argparse | ||
import subprocess | ||
from typing import Any | ||
from typing import Dict | ||
from typing import List | ||
from typing import Optional | ||
|
||
from kubernetes.client import V1Pod | ||
from task_processing.plugins.kubernetes.kube_client import KubeClient | ||
|
||
from tron.commands.client import Client | ||
from tron.commands.cmd_utils import get_client_config | ||
|
||
POD_STATUS_TO_TRON_STATE = { | ||
"Succeeded": "success", | ||
"Failed": "fail", | ||
"Unknown": "Unknown", # This should never really happen | ||
} | ||
|
||
TRON_MODIFIABLE_STATES = [ | ||
"starting", # stuck jobs | ||
"running", # stuck jobs | ||
"unknown", | ||
"lost", | ||
] | ||
|
||
|
||
def parse_args(): | ||
parser = argparse.ArgumentParser() | ||
parser.add_argument("--kubeconfig-path", dest="kubeconfig_path", help="KUBECONFIG path") | ||
parser.add_argument( | ||
"--do-work", | ||
dest="do_work", | ||
action="store_true", | ||
default=False, | ||
help="Actually modify tron actions that need updating; without this flag we will only print those that would be updated", | ||
) | ||
parser.add_argument("--tron-url", default=None, help="Tron url (default will read from paasta tron config)") | ||
parser.add_argument( | ||
"--tronctl-wrapper", | ||
default="tronctl", | ||
dest="tronctl_wrapper", | ||
help="Tronctl wrapper to use (will not use wrapper by default)", | ||
) | ||
parser.add_argument("-n", "--num-runs", dest="num_runs", default=100, help="Maximum number of job runs to retrieve") | ||
args = parser.parse_args() | ||
|
||
return args | ||
|
||
|
||
def fetch_completed_pods(kubeconfig_path: str) -> Dict[str, V1Pod]: | ||
kube_client = KubeClient(kubeconfig_path=kubeconfig_path, user_agent="sync_tron_state_from_k8s") | ||
|
||
# Bit of a hack, no helper to fetch pods so reach into core api | ||
completed_pod_list = kube_client.core.list_namespaced_pod( | ||
namespace="tron", field_selector="status.phase!=Running,status.phase!=Pending" | ||
) | ||
|
||
return {pod.metadata.name: pod for pod in completed_pod_list.items} | ||
|
||
|
||
def get_tron_state_from_api(tron_server: str, num_runs: int = 100) -> Dict[str, Dict[any, any]]: | ||
if not tron_server: | ||
client_config = get_client_config() | ||
tron_server = client_config.get("server", "http://localhost:8089") | ||
client = Client(tron_server) | ||
# /jobs returns only the latest 5 runs, we'll need to request all runs instead ourselves | ||
jobs = client.jobs( | ||
include_job_runs=False, | ||
include_action_runs=False, | ||
include_action_graph=False, | ||
include_node_pool=False, | ||
) | ||
|
||
for job in jobs: | ||
# What am I doing wrong here, why do I have to append /api | ||
url = f'/api{job["url"]}' | ||
print(f'Fetching job {job["name"]} at {url}') | ||
try: | ||
job_runs = client.job( | ||
url, | ||
include_action_runs=True, # action runs | ||
count=num_runs, # TODO: fetch job run_limit and use that for count ? | ||
) | ||
job["runs"] = job_runs["runs"] | ||
except Exception as e: | ||
print(f"Hit exception: {e}") | ||
return jobs | ||
|
||
|
||
def get_matching_pod(action_run: Dict[str, any], pods: Dict[str, V1Pod]) -> Optional[V1Pod]: | ||
"""Given a tron action_run, try to find the right pod that matches.""" | ||
action_name = action_run["action_name"] | ||
job_name = action_run["job_name"] | ||
run_num = action_run["run_num"] | ||
|
||
service, job = job_name.split(".") | ||
# TODO: how to fetch k8s shortened instance name to match labels? | ||
jfongatyelp marked this conversation as resolved.
Show resolved
Hide resolved
|
||
instance_name = f"{job}.{action_name}" | ||
# If action has retries, there will be multiple pods w/ same job_run; we only want the latest | ||
matching_pods = sorted( | ||
[ | ||
pod | ||
for pod in pods.values() | ||
if pod.metadata.labels["paasta.yelp.com/service"] == service | ||
and pod.metadata.labels["paasta.yelp.com/instance"] == instance_name | ||
and pod.metadata.labels["tron.yelp.com/run_num"] == run_num | ||
], | ||
key=lambda pod: pod.metadata.creation_timestamp, | ||
reverse=True, | ||
) | ||
return matching_pods[0] if matching_pods else None | ||
|
||
|
||
def get_desired_state_from_pod(pod: V1Pod) -> str: | ||
k8s_state = pod.status.phase | ||
return POD_STATUS_TO_TRON_STATE.get(k8s_state, "NoMatch") | ||
|
||
|
||
def update_tron_from_pods( | ||
jobs: List[Dict[str, Any]], pods: Dict[str, V1Pod], tronctl_wrapper: str = "tronctl", do_work: bool = True | ||
jfongatyelp marked this conversation as resolved.
Show resolved
Hide resolved
|
||
): | ||
updated = [] | ||
error = [] | ||
# todo: calculate whether there are more jobs in pnw-prod than completed pods | ||
for job in jobs: | ||
if job["runs"]: | ||
# job_runs | ||
for job_run in job["runs"]: | ||
for action in job_run.get("runs", []): | ||
action_run_id = action["id"] | ||
if action["state"] in TRON_MODIFIABLE_STATES: | ||
pod = get_matching_pod(action, pods) | ||
if pod: | ||
desired_state = get_desired_state_from_pod(pod) | ||
if action["state"] != desired_state: | ||
print(f'{action_run_id} state {action["state"]} needs updating to {desired_state}') | ||
cmd = [tronctl_wrapper, desired_state, action_run_id] | ||
if do_work: | ||
# tronctl-$cluster success/fail svc.job.run.action | ||
try: | ||
print(f"Running {cmd}") | ||
proc = subprocess.run(cmd, capture_output=True, text=True) | ||
if proc.returncode != 0: | ||
print(f"Got non-zero exit code: {proc.returncode}") | ||
print(f"\t{proc.stderr}") | ||
error.append(action_run_id) | ||
updated.append(action_run_id) | ||
except Exception as e: | ||
print(f"ERROR: Hit exception: {repr(e)}") | ||
error.append(action_run_id) | ||
else: | ||
print(f"Dry-Run: Would run {cmd}") | ||
updated.append(action_run_id) | ||
else: | ||
print(f"action run {action_run_id} not found in list of finished pods, no action taken") | ||
else: | ||
print(f'Action state {action["state"]} for {action_run_id} not modifiable, no action taken') | ||
print(f"Updated {len(updated)} actions: {','.join(updated)}") | ||
print(f"Hit {len(error)} errors on actions: {','.join(error)}") | ||
return {"updated": updated, "error": error} | ||
|
||
|
||
if __name__ == "__main__": | ||
args = parse_args() | ||
|
||
jobs = get_tron_state_from_api(args.tron_url, args.num_runs) | ||
print(f"Found {len(jobs)} jobs.") | ||
|
||
pods = fetch_completed_pods(args.kubeconfig_path) | ||
|
||
update_tron_from_pods(jobs, pods, args.tronctl_wrapper, args.do_work) |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
not doing anything wrong - i'm not quite sure why this method doesn't automatically add in /api for us