diff --git a/kannon/master.py b/kannon/master.py index ca32dd9..587bb9b 100644 --- a/kannon/master.py +++ b/kannon/master.py @@ -5,6 +5,7 @@ from collections import deque from copy import deepcopy from time import sleep +from typing import List, Optional import gokart from gokart.target import make_target @@ -27,7 +28,9 @@ def __init__( # kannon resources job_prefix: str, path_child_script: str = "./run_child.py", - env_to_inherit: list[str] | None = None, + env_to_inherit: Optional[List[str]] = None, + master_pod_name: Optional[str] = None, + master_pod_uid: Optional[str] = None, max_child_jobs: int | None = None, ) -> None: # validation @@ -42,6 +45,10 @@ def __init__( if env_to_inherit is None: env_to_inherit = ["TASK_WORKSPACE_DIRECTORY"] self.env_to_inherit = env_to_inherit + + self.master_pod_name = master_pod_name + self.master_pod_uid = master_pod_uid + if max_child_jobs is not None and max_child_jobs <= 0: raise ValueError(f"max_child_jobs must be positive integer, but got {max_child_jobs}") self.max_child_jobs = max_child_jobs @@ -164,6 +171,20 @@ def _create_child_job_object(self, job_name: str, task_pkl_path: str) -> client. job.spec.template.spec.containers[0].env = child_envs # replace job name job.metadata.name = job_name + # add owner reference from child to parent if master pod info is available + if self.master_pod_name and self.master_pod_uid: + owner_reference = client.V1OwnerReference( + api_version="batch/v1", + kind="Pod", + name=self.master_pod_name, # owner pod name + uid=self.master_pod_uid, # owner pod uid + ) + if job.metadata.owner_references: + job.metadata.owner_references.append(owner_reference) + else: + job.metadata.owner_references = [owner_reference] + else: + logger.warning("Owner reference is not set because master pod info is not provided.") return job diff --git a/test/unit_test/test_master.py b/test/unit_test/test_master.py index a6c8c31..207cf0f 100644 --- a/test/unit_test/test_master.py +++ b/test/unit_test/test_master.py @@ -172,6 +172,58 @@ class Example(gokart.TaskOnKart): with self.assertRaises(ValueError): master._create_child_job_object("test-job", path_to_pkl) + def test_owner_reference_set(self) -> None: + + class Example(gokart.TaskOnKart): + pass + + path_to_pkl = "path/to/obj" + template_job = self._get_template_job() + master_pod_name = "dummy-master-pod-name" + master_pod_uid = "dummy-master-pod-uid" + master = Kannon( + api_instance=None, + template_job=template_job, + job_prefix="", + path_child_script=__file__, # just pass any existing file as dummy + env_to_inherit=["TASK_WORKSPACE_DIRECTORY"], + master_pod_name=master_pod_name, + master_pod_uid=master_pod_uid, + ) + # set os env + os.environ.update({"TASK_WORKSPACE_DIRECTORY": "/cache"}) + child_job_name = "test-job" + child_job = master._create_child_job_object(child_job_name, path_to_pkl) + + owner_references = child_job.metadata.owner_references + self.assertEqual(len(owner_references), 1) + owner_reference = owner_references[0] + self.assertEqual(owner_reference.name, master_pod_name) + self.assertEqual(owner_reference.uid, master_pod_uid) + + def test_owner_reference_not_set(self) -> None: + + class Example(gokart.TaskOnKart): + pass + + path_to_pkl = "path/to/obj" + template_job = self._get_template_job() + master = Kannon( + api_instance=None, + template_job=template_job, + job_prefix="", + path_child_script=__file__, # just pass any existing file as dummy + env_to_inherit=["TASK_WORKSPACE_DIRECTORY"], + ) + # set os env + os.environ.update({"TASK_WORKSPACE_DIRECTORY": "/cache"}) + child_job_name = "test-job" + with self.assertLogs() as cm: + child_job = master._create_child_job_object(child_job_name, path_to_pkl) + + self.assertEqual(cm.output, ['WARNING:kannon.master:Owner reference is not set because master pod info is not provided.']) + self.assertTrue(child_job.metadata.owner_references is None) + if __name__ == '__main__': unittest.main()