From 9a3edf2e66e4a72e7bfc761b6b70bc6ed09252b9 Mon Sep 17 00:00:00 2001 From: yutaro-oguri Date: Tue, 21 Nov 2023 15:33:26 +0900 Subject: [PATCH 1/6] Add additional waiting --- kannon/master.py | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/kannon/master.py b/kannon/master.py index 5b71618..7313dfc 100644 --- a/kannon/master.py +++ b/kannon/master.py @@ -49,16 +49,24 @@ def build(self, root_task: gokart.TaskOnKart) -> None: task_queue = self._create_task_queue(root_task) # consume task queue - launched_task_ids: Set[str] = set() logger.info("Consuming task queue...") while task_queue: task = task_queue.popleft() if task.complete(): logger.info(f"Task {self._gen_task_info(task)} is already completed.") continue - if task.make_unique_id() in launched_task_ids: + if task.make_unique_id() in self.task_id_to_job_name: + # check if task is still running on child job + job_name = self.task_id_to_job_name[task.make_unique_id()] + job_status = get_job_status( + self.api_instance, + job_name, + self.namespace, + ) + if job_status == JobStatus.FAILED: + raise RuntimeError(f"Task {self._gen_task_info(task)} on job {job_name} has failed.") logger.info(f"Task {self._gen_task_info(task)} is still running on child job.") - task_queue.append(task) + task_queue.append(task) # re-enqueue task to check if it is done continue # TODO: enable user to specify duration to sleep for each task @@ -72,7 +80,6 @@ def build(self, root_task: gokart.TaskOnKart) -> None: if isinstance(task, TaskOnBullet): logger.info(f"Trying to run task {self._gen_task_info(task)} on child job...") self._exec_bullet_task(task) - launched_task_ids.add(task.make_unique_id()) # mark as already launched task task_queue.append(task) # re-enqueue task to check if it is done elif isinstance(task, gokart.TaskOnKart): logger.info(f"Executing task {self._gen_task_info(task)} on master job...") @@ -125,8 +132,7 @@ def _exec_bullet_task(self, task: TaskOnBullet) -> None: ) create_job(self.api_instance, job, self.namespace) logger.info(f"Created child job {job_name} with task {self._gen_task_info(task)}") - task_unique_id = task.make_unique_id() - self.task_id_to_job_name[task_unique_id] = job_name + self.task_id_to_job_name[task.make_unique_id()] = job_name def _create_child_job_object(self, job_name: str, task_pkl_path: str) -> client.V1Job: # TODO: use python -c to avoid dependency to execute_task.py From c38520b50a22a0fd6210cc34c801e2e7b58d0fce Mon Sep 17 00:00:00 2001 From: yutaro-oguri Date: Tue, 21 Nov 2023 16:40:16 +0900 Subject: [PATCH 2/6] fix mock of kube api --- test/integration_test/test_master_build.py | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/test/integration_test/test_master_build.py b/test/integration_test/test_master_build.py index b824a56..0534f04 100644 --- a/test/integration_test/test_master_build.py +++ b/test/integration_test/test_master_build.py @@ -7,6 +7,7 @@ import gokart import luigi from kubernetes import client +from luigi.task import flatten from kannon import Kannon, TaskOnBullet @@ -54,8 +55,19 @@ def _exec_gokart_task(self, task: MockTaskOnKart) -> None: task.run() def _exec_bullet_task(self, task: MockTaskOnBullet) -> None: + self.task_id_to_job_name[task.make_unique_id()] = "dummy_job_name" task.run() + def _check_child_task_status(self, task: MockTaskOnBullet) -> bool: + return True + + def _is_executable(self, task: MockTaskOnKart) -> bool: + children = flatten(task.requires()) + for child in children: + if not child.complete(): + return False + return True + class TestConsumeTaskQueue(unittest.TestCase): From 72275a0e4514989d5b85e661a1938729ea197037 Mon Sep 17 00:00:00 2001 From: yutaro-oguri Date: Tue, 21 Nov 2023 16:41:00 +0900 Subject: [PATCH 3/6] add util method to check child task --- kannon/master.py | 24 +++++++++++++++--------- 1 file changed, 15 insertions(+), 9 deletions(-) diff --git a/kannon/master.py b/kannon/master.py index 7313dfc..90a0c25 100644 --- a/kannon/master.py +++ b/kannon/master.py @@ -57,16 +57,9 @@ def build(self, root_task: gokart.TaskOnKart) -> None: continue if task.make_unique_id() in self.task_id_to_job_name: # check if task is still running on child job - job_name = self.task_id_to_job_name[task.make_unique_id()] - job_status = get_job_status( - self.api_instance, - job_name, - self.namespace, - ) - if job_status == JobStatus.FAILED: - raise RuntimeError(f"Task {self._gen_task_info(task)} on job {job_name} has failed.") + assert self._check_child_task_status(task), f"Child task {self._gen_task_info(task)} failed." logger.info(f"Task {self._gen_task_info(task)} is still running on child job.") - task_queue.append(task) # re-enqueue task to check if it is done + task_queue.append(task) # re-enqueue task to check if it is done continue # TODO: enable user to specify duration to sleep for each task @@ -169,6 +162,19 @@ def _gen_task_info(task: gokart.TaskOnKart) -> str: def _gen_pkl_path(task: gokart.TaskOnKart) -> str: return os.path.join(task.workspace_directory, 'kannon', f'task_obj_{task.make_unique_id()}.pkl') + def _check_child_task_status(self, task: TaskOnBullet) -> bool: + if task.make_unique_id() not in self.task_id_to_job_name: + raise f"Task {self._gen_task_info(task)} is not found in `task_id_to_job_name`" + job_name = self.task_id_to_job_name[task.make_unique_id()] + job_status = get_job_status( + self.api_instance, + job_name, + self.namespace, + ) + if job_status == JobStatus.FAILED: + raise RuntimeError(f"Task {self._gen_task_info(task)} on job {job_name} has failed.") + return True + def _is_executable(self, task: gokart.TaskOnKart) -> bool: children = flatten(task.requires()) From eb30b12d78e995c0bffec85d0ce034399a82a7db Mon Sep 17 00:00:00 2001 From: yutaro-oguri Date: Tue, 21 Nov 2023 16:45:53 +0900 Subject: [PATCH 4/6] fix exception --- kannon/master.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kannon/master.py b/kannon/master.py index 90a0c25..da9e60f 100644 --- a/kannon/master.py +++ b/kannon/master.py @@ -164,7 +164,7 @@ def _gen_pkl_path(task: gokart.TaskOnKart) -> str: def _check_child_task_status(self, task: TaskOnBullet) -> bool: if task.make_unique_id() not in self.task_id_to_job_name: - raise f"Task {self._gen_task_info(task)} is not found in `task_id_to_job_name`" + raise ValueError(f"Task {self._gen_task_info(task)} is not found in `task_id_to_job_name`") job_name = self.task_id_to_job_name[task.make_unique_id()] job_status = get_job_status( self.api_instance, From 90bbb91254dc60146e79fe556a8f8a962af86527 Mon Sep 17 00:00:00 2001 From: yutaro-oguri Date: Tue, 21 Nov 2023 17:36:35 +0900 Subject: [PATCH 5/6] Add owner reference from child job to parent pod --- kannon/master.py | 18 ++++++++++++ test/unit_test/test_master.py | 52 +++++++++++++++++++++++++++++++++++ 2 files changed, 70 insertions(+) diff --git a/kannon/master.py b/kannon/master.py index da9e60f..6d675cf 100644 --- a/kannon/master.py +++ b/kannon/master.py @@ -27,6 +27,8 @@ def __init__( job_prefix: str, path_child_script: str = "./run_child.py", env_to_inherit: Optional[List[str]] = None, + master_pod_name: Optional[str] = None, + master_pod_uid: Optional[str] = None, ) -> None: # validation if not os.path.exists(path_child_script): @@ -40,6 +42,8 @@ def __init__( if env_to_inherit is None: env_to_inherit = ["TASK_WORKSPACE_DIRECTORY"] self.env_to_inherit = env_to_inherit + self.master_pod_name = master_pod_name + self.master_pod_uid = master_pod_uid self.task_id_to_job_name: Dict[str, str] = dict() @@ -151,6 +155,20 @@ def _create_child_job_object(self, job_name: str, task_pkl_path: str) -> client. job.spec.template.spec.containers[0].env = child_envs # replace job name job.metadata.name = job_name + # add owner reference from child to parent if master pod info is available + if self.master_pod_name and self.master_pod_uid: + owner_reference = client.V1OwnerReference( + api_version="batch/v1", + kind="Pod", + name=self.master_pod_name, # owner pod name + uid=self.master_pod_uid, # owner pod uid + ) + if job.metadata.owner_references: + job.metadata.owner_references.append(owner_reference) + else: + job.metadata.owner_references = [owner_reference] + else: + logger.warning("Owner reference is not set because master pod info is not provided.") return job diff --git a/test/unit_test/test_master.py b/test/unit_test/test_master.py index a6c8c31..207cf0f 100644 --- a/test/unit_test/test_master.py +++ b/test/unit_test/test_master.py @@ -172,6 +172,58 @@ class Example(gokart.TaskOnKart): with self.assertRaises(ValueError): master._create_child_job_object("test-job", path_to_pkl) + def test_owner_reference_set(self) -> None: + + class Example(gokart.TaskOnKart): + pass + + path_to_pkl = "path/to/obj" + template_job = self._get_template_job() + master_pod_name = "dummy-master-pod-name" + master_pod_uid = "dummy-master-pod-uid" + master = Kannon( + api_instance=None, + template_job=template_job, + job_prefix="", + path_child_script=__file__, # just pass any existing file as dummy + env_to_inherit=["TASK_WORKSPACE_DIRECTORY"], + master_pod_name=master_pod_name, + master_pod_uid=master_pod_uid, + ) + # set os env + os.environ.update({"TASK_WORKSPACE_DIRECTORY": "/cache"}) + child_job_name = "test-job" + child_job = master._create_child_job_object(child_job_name, path_to_pkl) + + owner_references = child_job.metadata.owner_references + self.assertEqual(len(owner_references), 1) + owner_reference = owner_references[0] + self.assertEqual(owner_reference.name, master_pod_name) + self.assertEqual(owner_reference.uid, master_pod_uid) + + def test_owner_reference_not_set(self) -> None: + + class Example(gokart.TaskOnKart): + pass + + path_to_pkl = "path/to/obj" + template_job = self._get_template_job() + master = Kannon( + api_instance=None, + template_job=template_job, + job_prefix="", + path_child_script=__file__, # just pass any existing file as dummy + env_to_inherit=["TASK_WORKSPACE_DIRECTORY"], + ) + # set os env + os.environ.update({"TASK_WORKSPACE_DIRECTORY": "/cache"}) + child_job_name = "test-job" + with self.assertLogs() as cm: + child_job = master._create_child_job_object(child_job_name, path_to_pkl) + + self.assertEqual(cm.output, ['WARNING:kannon.master:Owner reference is not set because master pod info is not provided.']) + self.assertTrue(child_job.metadata.owner_references is None) + if __name__ == '__main__': unittest.main() From 07010528b7904f18bc392da3f91980d589f9d9d3 Mon Sep 17 00:00:00 2001 From: yutaro-oguri Date: Tue, 5 Dec 2023 19:34:59 +0900 Subject: [PATCH 6/6] apply isort --- kannon/master.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kannon/master.py b/kannon/master.py index fc42067..587bb9b 100644 --- a/kannon/master.py +++ b/kannon/master.py @@ -5,7 +5,7 @@ from collections import deque from copy import deepcopy from time import sleep -from typing import Optional, List +from typing import List, Optional import gokart from gokart.target import make_target