Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add owner reference from child job to master pod #38

Merged
merged 7 commits into from
Dec 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 22 additions & 1 deletion kannon/master.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from collections import deque
from copy import deepcopy
from time import sleep
from typing import List, Optional

import gokart
from gokart.target import make_target
Expand All @@ -27,7 +28,9 @@ def __init__(
# kannon resources
job_prefix: str,
path_child_script: str = "./run_child.py",
env_to_inherit: list[str] | None = None,
env_to_inherit: Optional[List[str]] = None,
master_pod_name: Optional[str] = None,
master_pod_uid: Optional[str] = None,
max_child_jobs: int | None = None,
) -> None:
# validation
Expand All @@ -42,6 +45,10 @@ def __init__(
if env_to_inherit is None:
env_to_inherit = ["TASK_WORKSPACE_DIRECTORY"]
self.env_to_inherit = env_to_inherit

self.master_pod_name = master_pod_name
self.master_pod_uid = master_pod_uid

if max_child_jobs is not None and max_child_jobs <= 0:
raise ValueError(f"max_child_jobs must be positive integer, but got {max_child_jobs}")
self.max_child_jobs = max_child_jobs
Expand Down Expand Up @@ -164,6 +171,20 @@ def _create_child_job_object(self, job_name: str, task_pkl_path: str) -> client.
job.spec.template.spec.containers[0].env = child_envs
# replace job name
job.metadata.name = job_name
# Add an owner reference from the child job to the master pod, but only
# when both the master pod's name and uid were supplied to the constructor.
if self.master_pod_name and self.master_pod_uid:
    owner_reference = client.V1OwnerReference(
        # The owner is a Pod, which lives in the core API group, so its
        # apiVersion is "v1". "batch/v1" is the Job API group and does not
        # define a Pod kind.
        api_version="v1",
        kind="Pod",
        name=self.master_pod_name,  # owner pod name
        uid=self.master_pod_uid,  # owner pod uid
    )
    # Keep any owner references already present on the template job.
    if job.metadata.owner_references:
        job.metadata.owner_references.append(owner_reference)
    else:
        job.metadata.owner_references = [owner_reference]
else:
    # Without both name and uid no owner reference is built; the job is
    # left as-is and only a warning is emitted.
    logger.warning("Owner reference is not set because master pod info is not provided.")

return job

Expand Down
52 changes: 52 additions & 0 deletions test/unit_test/test_master.py
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,58 @@ class Example(gokart.TaskOnKart):
with self.assertRaises(ValueError):
master._create_child_job_object("test-job", path_to_pkl)

def test_owner_reference_set(self) -> None:
    """Check the child job carries one owner reference naming the master pod.

    A Kannon instance built with both ``master_pod_name`` and
    ``master_pod_uid`` must produce a child job whose single owner
    reference has that name and uid.
    """

    class Example(gokart.TaskOnKart):
        pass

    pkl_path = "path/to/obj"
    job_template = self._get_template_job()
    expected_name = "dummy-master-pod-name"
    expected_uid = "dummy-master-pod-uid"
    kannon = Kannon(
        api_instance=None,
        template_job=job_template,
        job_prefix="",
        path_child_script=__file__,  # any existing file works as a dummy
        env_to_inherit=["TASK_WORKSPACE_DIRECTORY"],
        master_pod_name=expected_name,
        master_pod_uid=expected_uid,
    )
    # The inherited env var has to exist in the process environment.
    os.environ["TASK_WORKSPACE_DIRECTORY"] = "/cache"

    job = kannon._create_child_job_object("test-job", pkl_path)

    refs = job.metadata.owner_references
    self.assertEqual(len(refs), 1)
    ref = refs[0]
    self.assertEqual(ref.name, expected_name)
    self.assertEqual(ref.uid, expected_uid)

def test_owner_reference_not_set(self) -> None:
    """Check that no owner reference is added without master pod info.

    When Kannon is built without ``master_pod_name``/``master_pod_uid``,
    creating a child job must log exactly one warning and leave
    ``metadata.owner_references`` as ``None``.
    """

    class Example(gokart.TaskOnKart):
        pass

    pkl_path = "path/to/obj"
    job_template = self._get_template_job()
    kannon = Kannon(
        api_instance=None,
        template_job=job_template,
        job_prefix="",
        path_child_script=__file__,  # any existing file works as a dummy
        env_to_inherit=["TASK_WORKSPACE_DIRECTORY"],
    )
    # The inherited env var has to exist in the process environment.
    os.environ["TASK_WORKSPACE_DIRECTORY"] = "/cache"

    with self.assertLogs() as captured:
        job = kannon._create_child_job_object("test-job", pkl_path)

    expected_logs = ['WARNING:kannon.master:Owner reference is not set because master pod info is not provided.']
    self.assertEqual(captured.output, expected_logs)
    self.assertIsNone(job.metadata.owner_references)


if __name__ == '__main__':
unittest.main()