Skip to content

Commit

Permalink
feat: moving mini k8s and k8s job
Browse files Browse the repository at this point in the history
* perf: type checking imports to increase perf

* feat: moving mini k8s and k8s job

* fix: lesser sensitive git errors
  • Loading branch information
vijayvammi authored Jan 14, 2025
1 parent 6dc0264 commit cea2470
Show file tree
Hide file tree
Showing 13 changed files with 207 additions and 73 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/pr.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ jobs:
run: uv python install

- name: Install the project
run: uv sync --all-extras --dev
run: uv sync --all-extras --dev --frozen

- name: Run lint
# For example, using `flake8`
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/release.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ jobs:
- name: "Set up Python"
run: uv python install
- run: |
uv sync --only-group release
uv sync --only-group release --frozen
- name: Figure version
continue-on-error: true
env:
Expand Down
7 changes: 5 additions & 2 deletions examples/11-jobs/k8s-job.yaml
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
job-executor:
type: "k8s-job"
config:
pvc_claim_name: runnable
config_path:
mock: false
namespace: enterprise-mlops
jobSpec:
# activeDeadlineSeconds: Optional[int]
# selector: Optional[LabelSelector]
Expand All @@ -9,7 +13,6 @@ job-executor:
# metadata:
# annotations: Optional[Dict[str, str]]
# generate_name: Optional[str] = run_id
# namespace: Optional[str] = "default"
spec:
# activeDeadlineSeconds: Optional[int]
# nodeSelector: Optional[Dict[str, str]]
Expand All @@ -25,7 +28,7 @@ job-executor:
# env:
# - name: str
# value: str
image: runnable-m1
image: harbor.csis.astrazeneca.net/mlops/runnable:latest
# imagePullPolicy: Optional[str] = choose from [Always, Never, IfNotPresent]
# resources:
# limits:
Expand Down
42 changes: 42 additions & 0 deletions examples/11-jobs/mini-k8s-job.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
# Example job-executor configuration for the minikube executor
# (examples/11-jobs/mini-k8s-job.yaml). Commented keys document the optional
# fields of the job spec and their accepted types.
job-executor:
  type: "mini-k8s-job"
  config:
    mock: true
    jobSpec:
      # activeDeadlineSeconds: Optional[int]
      # selector: Optional[LabelSelector]
      # ttlSecondsAfterFinished: Optional[int]
      template:
        # metadata:
        #   annotations: Optional[Dict[str, str]]
        #   generate_name: Optional[str] = run_id
        #   namespace: Optional[str] = "default"
        spec:
          # activeDeadlineSeconds: Optional[int]
          # nodeSelector: Optional[Dict[str, str]]
          # tolerations: Optional[List[Toleration]]
          # volumes:
          #   - name: str
          #     hostPath:
          #       path: str
          # serviceAccountName: Optional[str]
          # restartPolicy: Optional[str] = Choose from [Always, OnFailure, Never]
          container:
            # command: List[str]
            # env:
            #   - name: str
            #     value: str
            image: runnable-m1
            # imagePullPolicy: Optional[str] = choose from [Always, Never, IfNotPresent]
            # resources:
            #   limits:
            #     cpu: str
            #     memory: str
            #     gpu: str
            #   requests:
            #     cpu: str
            #     memory: str
            #     gpu: str
            # volumeMounts:
            #   - name: str
            #     mountPath: str
174 changes: 148 additions & 26 deletions extensions/job_executor/k8s.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,56 +101,66 @@ class HostPath(BaseModel):
path: str


class Volume(BaseModel):
class HostPathVolume(BaseModel):
    """A pod volume backed by a directory on the node (Kubernetes ``hostPath``).

    Used by the minikube executor, where host folders are mounted into the
    cluster VM and then exposed to the job container.
    """

    name: str  # volume name referenced by a matching VolumeMount
    host_path: HostPath  # path on the node to expose


class TemplateSpec(BaseModel):
class PVCClaim(BaseModel):
    """Reference to an existing PersistentVolumeClaim by name.

    Serialized with camelCase aliases (``claimName``) to match the
    Kubernetes API schema.
    """

    claim_name: str  # name of the pre-provisioned PVC in the target namespace

    model_config = ConfigDict(
        alias_generator=to_camel,
        populate_by_name=True,
        from_attributes=True,
    )


class PVCVolume(BaseModel):
    """A pod volume backed by a PersistentVolumeClaim (``persistentVolumeClaim``).

    Used by the cluster executor, which mounts a shared PVC for run logs
    and the catalog.
    """

    name: str  # volume name referenced by a matching VolumeMount
    persistent_volume_claim: PVCClaim  # the claim supplying the storage


class K8sTemplateSpec(BaseModel):
    """Pod spec section of the job template (subset of Kubernetes V1PodSpec).

    Only the fields this executor actually sets are modeled; everything else
    is left to cluster defaults.
    """

    # Kill the pod if it runs longer than this; defaults to 2 hours.
    active_deadline_seconds: int = Field(default=60 * 60 * 2)
    node_selector: Optional[dict[str, str]] = None
    tolerations: Optional[list[dict[str, str]]] = None
    # default_factory=list (instead of a lambda) — same behavior, idiomatic,
    # and avoids any shared-mutable-default pitfall.
    volumes: Optional[list[HostPathVolume | PVCVolume]] = Field(default_factory=list)
    service_account_name: Optional[str] = "default"
    # NEVER: a failed job pod is not restarted in place; retries are governed
    # by the Job's backoff_limit.
    restart_policy: RestartPolicy = RestartPolicy.NEVER
    container: Container


class Template(BaseModel):
spec: TemplateSpec
class K8sTemplate(BaseModel):
    """Pod template of the job: the pod spec plus optional object metadata."""

    spec: K8sTemplateSpec  # the pod spec to run
    metadata: Optional[ObjectMetaData] = None  # annotations / generate_name etc.


class Spec(BaseModel):
    """Kubernetes Job spec (subset of V1JobSpec)."""

    # Kill the whole job if it runs longer than this; defaults to 2 hours.
    active_deadline_seconds: Optional[int] = Field(default=60 * 60 * 2)
    # Number of pod retries before the job is marked failed (k8s default is 6).
    backoff_limit: int = 6
    selector: Optional[LabelSelector] = None
    template: K8sTemplate
    # Garbage-collect finished jobs after 24 hours.
    ttl_seconds_after_finished: Optional[int] = Field(default=60 * 60 * 24)  # 24 hours


class K8sJobExecutor(GenericJobExecutor):
class GenericK8sJobExecutor(GenericJobExecutor):
service_name: str = "k8s-job"
config_path: Optional[str] = None
job_spec: Spec
mock: bool = False

# The location the mount of .run_log_store is mounted to in minikube
# ensure that minikube mount $HOME/workspace/runnable/.run_log_store:/volume/run_logs is executed first
# $HOME/workspace/runnable/.catalog:/volume/catalog
# Ensure that the docker build is done with eval $(minikube docker-env)
mini_k8s_run_log_location: str = Field(default="/volume/run_logs/")
mini_k8s_catalog_location: str = Field(default="/volume/catalog/")
namespace: str = Field(default="default")

_is_local: bool = PrivateAttr(default=False)
_volume_mounts: list[VolumeMount] = PrivateAttr(default_factory=lambda: [])
_volumes: list[HostPathVolume | PVCVolume] = PrivateAttr(default_factory=lambda: [])

_container_log_location: str = PrivateAttr(default="/tmp/run_logs/")
_container_catalog_location: str = PrivateAttr(default="/tmp/catalog/")
_container_secrets_location: str = PrivateAttr(default="/tmp/dotenv")

_volumes: list[Volume] = []
_volume_mounts: list[VolumeMount] = []

model_config = ConfigDict(
alias_generator=to_camel,
populate_by_name=True,
Expand Down Expand Up @@ -287,21 +297,61 @@ def submit_k8s_job(self, task: BaseTaskType):
)

logger.info(f"Submitting job: {job.__dict__}")
if self.mock:
print(job.__dict__)
return

try:
k8s_batch = self._client.BatchV1Api()
response = k8s_batch.create_namespaced_job(
body=job,
namespace="default",
_preload_content=False,
pretty=True,
namespace=self.namespace,
)
logger.debug(f"Kubernetes job response: {response}")
except Exception as e:
logger.exception(e)
print(e)
raise

def _create_volumes(self):
    # No-op hook: concrete executors (MiniK8sJobExecutor, K8sJobExecutor)
    # override this to populate self._volumes and self._volume_mounts
    # before the job spec is built.
    ...

def _use_volumes(self):
    """Repoint file-based run-log and catalog stores at the folders where
    their volumes are mounted inside the job container."""
    run_log_store = self._context.run_log_store
    # Both file-backed stores write to the same in-container log folder.
    if run_log_store.service_name in ("file-system", "chunked-fs"):
        run_log_store.log_folder = self._container_log_location

    catalog = self._context.catalog_handler
    if catalog.service_name == "file-system":
        catalog.catalog_location = self._container_catalog_location


class MiniK8sJobExecutor(GenericK8sJobExecutor):
    """Job executor targeting a local minikube cluster using hostPath volumes.

    Fix: ``service_name`` was ``"k8s-job"`` (copied from K8sJobExecutor), which
    is inconsistent with the ``mini-k8s-job`` entry point this class is
    registered under and with ``type: "mini-k8s-job"`` in the example config.
    """

    service_name: str = "mini-k8s-job"
    config_path: Optional[str] = None
    job_spec: Spec
    mock: bool = False

    # The location the mount of .run_log_store is mounted to in minikube
    # ensure that minikube mount $HOME/workspace/runnable/.run_log_store:/volume/run_logs is executed first
    # $HOME/workspace/runnable/.catalog:/volume/catalog
    # Ensure that the docker build is done with eval $(minikube docker-env)
    mini_k8s_run_log_location: str = Field(default="/volume/run_logs/")
    mini_k8s_catalog_location: str = Field(default="/volume/catalog/")

    # Executes in-cluster, not on the invoking machine.
    _is_local: bool = PrivateAttr(default=False)

    model_config = ConfigDict(
        alias_generator=to_camel,
        populate_by_name=True,
        from_attributes=True,
    )

def _create_volumes(self):
match self._context.run_log_store.service_name:
case "file-system":
Expand All @@ -311,7 +361,7 @@ def _create_volumes(self):
# You then are creating a volume that is mounted to /tmp/run_logs in the container
# You are then referring to it.
# https://stackoverflow.com/questions/57411456/minikube-mounted-host-folders-are-not-working
Volume(
HostPathVolume(
name="run-logs",
host_path=HostPath(path=self.mini_k8s_run_log_location),
)
Expand All @@ -323,7 +373,7 @@ def _create_volumes(self):
)
case "chunked-fs":
self._volumes.append(
Volume(
HostPathVolume(
name="run-logs",
host_path=HostPath(path=self.mini_k8s_run_log_location),
)
Expand All @@ -337,7 +387,7 @@ def _create_volumes(self):
match self._context.catalog_handler.service_name:
case "file-system":
self._volumes.append(
Volume(
HostPathVolume(
name="catalog",
host_path=HostPath(path=self.mini_k8s_catalog_location),
)
Expand All @@ -348,15 +398,87 @@ def _create_volumes(self):
)
)

def _use_volumes(self):

class K8sJobExecutor(GenericK8sJobExecutor):
    """Job executor for a real Kubernetes cluster.

    Mounts a single pre-provisioned PersistentVolumeClaim (``pvc_claim_name``)
    into the job pod for run logs and the catalog.
    """

    service_name: str = "k8s-job"
    config_path: Optional[str] = None  # kubeconfig path; None uses the default resolution
    job_spec: Spec
    mock: bool = False  # when True, the job is printed instead of submitted
    # Name of an existing PVC; also reused as the pod volume name.
    pvc_claim_name: str

    # change the spec to pull image if not present
    def model_post_init(self, __context):
        # Force Always so the cluster pulls the current image tag rather than
        # a stale cached one — NOTE(review): this overrides any
        # imagePullPolicy set in user config; confirm that is intended.
        self.job_spec.template.spec.container.image_pull_policy = ImagePullPolicy.ALWAYS

    # Executes in-cluster, not on the invoking machine.
    _is_local: bool = PrivateAttr(default=False)

    model_config = ConfigDict(
        alias_generator=to_camel,
        populate_by_name=True,
        from_attributes=True,
    )

def execute_job(self, job: BaseTaskType, catalog_settings: Optional[List[str]] = None):
    """Run the task in-pod, record the attempt, and sync the catalog.

    Args:
        job: the task to execute.
        catalog_settings: glob patterns of artifacts to put into the catalog
            after execution; ``None`` syncs nothing.

    Fix: the original signature read ``catalog_settings=Optional[List[str]]``,
    which made the *typing object* the default value instead of declaring an
    annotation — now a proper annotation with a ``None`` default.
    """
    self._use_volumes()
    self._set_up_run_log()

    job_log = self._context.run_log_store.create_job_log()
    self._context.run_log_store.add_job_log(
        run_id=self._context.run_id, job_log=job_log
    )

    # Re-fetch so the attempt is appended to the persisted record.
    job_log = self._context.run_log_store.get_job_log(run_id=self._context.run_id)

    attempt_log = job.execute_command(
        attempt_number=self.step_attempt_number,
        mock=self.mock,
    )

    job_log.status = attempt_log.status
    job_log.attempts.append(attempt_log)

    data_catalogs_put: Optional[List[DataCatalog]] = self._sync_catalog(
        catalog_settings=catalog_settings
    )
    logger.debug(f"data_catalogs_put: {data_catalogs_put}")

    job_log.add_data_catalogs(data_catalogs_put or [])

    console.print("Summary of job")
    console.print(job_log.get_summary())

    self._context.run_log_store.add_job_log(
        run_id=self._context.run_id, job_log=job_log
    )

def _create_volumes(self):
self._volumes.append(
PVCVolume(
name=self.pvc_claim_name,
persistent_volume_claim=PVCClaim(claim_name=self.pvc_claim_name),
)
)
match self._context.run_log_store.service_name:
case "file-system":
self._context.run_log_store.log_folder = self._container_log_location
self._volume_mounts.append(
VolumeMount(
name=self.pvc_claim_name,
mount_path=self._container_log_location,
)
)
case "chunked-fs":
self._context.run_log_store.log_folder = self._container_log_location
self._volume_mounts.append(
VolumeMount(
name=self.pvc_claim_name,
mount_path=self._container_log_location,
)
)

match self._context.catalog_handler.service_name:
case "file-system":
self._context.catalog_handler.catalog_location = (
self._container_catalog_location
self._volume_mounts.append(
VolumeMount(
name=self.pvc_claim_name,
mount_path=self._container_catalog_location,
)
)
1 change: 0 additions & 1 deletion extensions/pipeline_executor/local_container.py
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,6 @@ def _spin_container(
f"Please provide a docker_image using executor_config of the step {node.name} or at global config"
)

# TODO: Should consider using getpass.getuser() when running the docker container? Volume permissions
container = client.containers.create(
image=docker_image,
command=command,
Expand Down
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ include = [
[project.entry-points.'job_executor']
"local" = "extensions.job_executor.local:LocalJobExecutor"
"local-container" = "extensions.job_executor.local_container:LocalContainerJobExecutor"
"mini-k8s-job" = "extensions.job_executor.k8s:MiniK8sJobExecutor"
"k8s-job" = "extensions.job_executor.k8s:K8sJobExecutor"
# "argo" = "extensions.pipeline_executor.argo:ArgoExecutor"
# "mocked" = "extensions.pipeline_executor.mocked:MockedExecutor"
Expand Down
1 change: 1 addition & 0 deletions runnable/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
# ruff: noqa


import logging
import os
from logging.config import dictConfig
Expand Down
3 changes: 1 addition & 2 deletions runnable/catalog.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,6 @@

logger = logging.getLogger(defaults.LOGGER_NAME)

# TODO: Should ** be allowed as glob pattern as it can potentially copy everything to catalog


def is_catalog_out_of_sync(
catalog, synced_catalogs=Optional[List[DataCatalog]]
Expand Down Expand Up @@ -170,3 +168,4 @@ def sync_between_runs(self, previous_run_id: str, run_id: str):
Does nothing
"""
logger.info("Using a do-nothing catalog, doing nothing while sync between runs")
logger.info("Using a do-nothing catalog, doing nothing while sync between runs")
Loading

0 comments on commit cea2470

Please sign in to comment.