From cea247002273aa571117f10562fb1dfa4c1687d5 Mon Sep 17 00:00:00 2001
From: Vijay Vammi
Date: Tue, 14 Jan 2025 00:41:22 +0000
Subject: [PATCH] feat: split the mini k8s and k8s job executors

* perf: move type-checking-only imports under TYPE_CHECKING

* feat: split the mini k8s and k8s job executors

* fix: gentler errors when git metadata is unavailable

---
 .github/workflows/pr.yaml                |   2 +-
 .github/workflows/release.yaml           |   2 +-
 examples/11-jobs/k8s-job.yaml            |   7 +-
 examples/11-jobs/mini-k8s-job.yaml       |  42 +++++
 extensions/job_executor/k8s.py           | 174 +++++++++++++++---
 .../pipeline_executor/local_container.py |   1 -
 pyproject.toml                           |   1 +
 runnable/__init__.py                     |   1 +
 runnable/catalog.py                      |   2 --
 runnable/entrypoints.py                  |   6 +-
 runnable/executor.py                     |   2 +-
 runnable/parameters.py                   |   9 -
 runnable/utils.py                        |  30 +--
 13 files changed, 206 insertions(+), 73 deletions(-)
 create mode 100644 examples/11-jobs/mini-k8s-job.yaml

diff --git a/.github/workflows/pr.yaml b/.github/workflows/pr.yaml
index a59f6f2a..ede78fe9 100644
--- a/.github/workflows/pr.yaml
+++ b/.github/workflows/pr.yaml
@@ -38,7 +38,7 @@ jobs:
         run: uv python install

       - name: Install the project
-        run: uv sync --all-extras --dev
+        run: uv sync --all-extras --dev --frozen

       - name: Run lint
         # For example, using `flake8`
diff --git a/.github/workflows/release.yaml b/.github/workflows/release.yaml
index 0ad4bb61..67aa2128 100644
--- a/.github/workflows/release.yaml
+++ b/.github/workflows/release.yaml
@@ -25,7 +25,7 @@ jobs:
       - name: "Set up Python"
         run: uv python install
       - run: |
-          uv sync --only-group release
+          uv sync --only-group release --frozen
       - name: Figure version
         continue-on-error: true
         env:
diff --git a/examples/11-jobs/k8s-job.yaml b/examples/11-jobs/k8s-job.yaml
index 7386eca1..aae76527 100644
--- a/examples/11-jobs/k8s-job.yaml
+++ b/examples/11-jobs/k8s-job.yaml
@@ -1,6 +1,10 @@
 job-executor:
   type: "k8s-job"
   config:
+    pvc_claim_name: runnable
+    config_path:
+    mock: false
+    namespace: enterprise-mlops
     jobSpec:
       # activeDeadlineSeconds: Optional[int]
       # selector: Optional[LabelSelector]
@@ -9,7 +13,6 @@ job-executor:
         # metadata:
         #   annotations: Optional[Dict[str, str]]
         #   generate_name: Optional[str] = run_id
-        #   namespace: Optional[str] = "default"
         spec:
           # activeDeadlineSeconds: Optional[int]
           # nodeSelector: Optional[Dict[str, str]]
@@ -25,7 +28,7 @@ job-executor:
             # env:
             #   - name: str
             #     value: str
-            image: runnable-m1
+            image: harbor.csis.astrazeneca.net/mlops/runnable:latest
             # imagePullPolicy: Optional[str] = choose from [Always, Never, IfNotPresent]
             # resources:
             #   limits:
diff --git a/examples/11-jobs/mini-k8s-job.yaml b/examples/11-jobs/mini-k8s-job.yaml
new file mode 100644
index 00000000..4e191fe7
--- /dev/null
+++ b/examples/11-jobs/mini-k8s-job.yaml
@@ -0,0 +1,42 @@
+job-executor:
+  type: "mini-k8s-job"
+  config:
+    mock: true
+    jobSpec:
+      # activeDeadlineSeconds: Optional[int]
+      # selector: Optional[LabelSelector]
+      # ttlSecondsAfterFinished: Optional[int]
+      template:
+        # metadata:
+        #   annotations: Optional[Dict[str, str]]
+        #   generate_name: Optional[str] = run_id
+        #   namespace: Optional[str] = "default"
+        spec:
+          # activeDeadlineSeconds: Optional[int]
+          # nodeSelector: Optional[Dict[str, str]]
+          # tolerations: Optional[List[Toleration]]
+          # volumes:
+          #   - name: str
+          #     hostPath:
+          #       path: str
+          # serviceAccountName: Optional[str]
+          # restartPolicy: Optional[str] = Choose from [Always, OnFailure, Never]
+          container:
+            # command: List[str]
+            # env:
+            #   - name: str
+            #     value: str
+            image: runnable-m1
+            # imagePullPolicy: Optional[str] = choose from [Always, Never, IfNotPresent]
+            # resources:
+            #   limits:
+            #     cpu: str
+            #     memory: str
+            #     gpu: str
+            #   requests:
+            #     cpu: str
+            #     memory: str
+            #     gpu: str
+            # volumeMounts:
+            #   - name: str
+            #     mountPath: str
diff --git a/extensions/job_executor/k8s.py b/extensions/job_executor/k8s.py
index c91fd5e2..51e2a3bf 100644
--- a/extensions/job_executor/k8s.py
+++ b/extensions/job_executor/k8s.py
@@ -101,23 +101,40 @@ class HostPath(BaseModel):
     path: str


-class Volume(BaseModel):
+class HostPathVolume(BaseModel):
     name: str
     host_path: HostPath


-class TemplateSpec(BaseModel):
+class PVCClaim(BaseModel):
+    claim_name: str
+
+    model_config = ConfigDict(
+        alias_generator=to_camel,
+        populate_by_name=True,
+        from_attributes=True,
+    )
+
+
+class PVCVolume(BaseModel):
+    name: str
+    persistent_volume_claim: PVCClaim
+
+
+class K8sTemplateSpec(BaseModel):
     active_deadline_seconds: int = Field(default=60 * 60 * 2)  # 2 hours
     node_selector: Optional[dict[str, str]] = None
     tolerations: Optional[list[dict[str, str]]] = None
-    volumes: Optional[list[Volume]] = Field(default_factory=lambda: [])
+    volumes: Optional[list[HostPathVolume | PVCVolume]] = Field(
+        default_factory=lambda: []
+    )
     service_account_name: Optional[str] = "default"
     restart_policy: RestartPolicy = RestartPolicy.NEVER
     container: Container


-class Template(BaseModel):
-    spec: TemplateSpec
+class K8sTemplate(BaseModel):
+    spec: K8sTemplateSpec
     metadata: Optional[ObjectMetaData] = None


@@ -125,32 +142,25 @@ class Spec(BaseModel):
     active_deadline_seconds: Optional[int] = Field(default=60 * 60 * 2)  # 2 hours
     backoff_limit: int = 6
     selector: Optional[LabelSelector] = None
-    template: Template
+    template: K8sTemplate
     ttl_seconds_after_finished: Optional[int] = Field(default=60 * 60 * 24)  # 24 hours


-class K8sJobExecutor(GenericJobExecutor):
+class GenericK8sJobExecutor(GenericJobExecutor):
     service_name: str = "k8s-job"
     config_path: Optional[str] = None
     job_spec: Spec
     mock: bool = False
-
-    # The location the mount of .run_log_store is mounted to in minikube
-    # ensure that minikube mount $HOME/workspace/runnable/.run_log_store:/volume/run_logs is executed first
-    # $HOME/workspace/runnable/.catalog:/volume/catalog
-    # Ensure that the docker build is done with eval $(minikube docker-env)
-    mini_k8s_run_log_location: str = Field(default="/volume/run_logs/")
-    mini_k8s_catalog_location: str = Field(default="/volume/catalog/")
+    namespace: str = Field(default="default")

     _is_local: bool = PrivateAttr(default=False)
+    _volume_mounts: list[VolumeMount] = PrivateAttr(default_factory=lambda: [])
+    _volumes: list[HostPathVolume | PVCVolume] = PrivateAttr(default_factory=lambda: [])

     _container_log_location: str = PrivateAttr(default="/tmp/run_logs/")
     _container_catalog_location: str = PrivateAttr(default="/tmp/catalog/")
     _container_secrets_location: str = PrivateAttr(default="/tmp/dotenv")

-    _volumes: list[Volume] = []
-    _volume_mounts: list[VolumeMount] = []
-
     model_config = ConfigDict(
         alias_generator=to_camel,
         populate_by_name=True,
@@ -287,14 +297,17 @@ def submit_k8s_job(self, task: BaseTaskType):
         )

         logger.info(f"Submitting job: {job.__dict__}")
+        if self.mock:
+            print(job.__dict__)
+            return

         try:
             k8s_batch = self._client.BatchV1Api()
             response = k8s_batch.create_namespaced_job(
                 body=job,
-                namespace="default",
                 _preload_content=False,
                 pretty=True,
+                namespace=self.namespace,
             )
             logger.debug(f"Kubernetes job response: {response}")
         except Exception as e:
@@ -302,6 +315,43 @@ def submit_k8s_job(self, task: BaseTaskType):
             print(e)
             raise

+    def _create_volumes(self): ...
+
+    def _use_volumes(self):
+        match self._context.run_log_store.service_name:
+            case "file-system":
+                self._context.run_log_store.log_folder = self._container_log_location
+            case "chunked-fs":
+                self._context.run_log_store.log_folder = self._container_log_location
+
+        match self._context.catalog_handler.service_name:
+            case "file-system":
+                self._context.catalog_handler.catalog_location = (
+                    self._container_catalog_location
+                )
+
+
+class MiniK8sJobExecutor(GenericK8sJobExecutor):
+    service_name: str = "mini-k8s-job"
+    config_path: Optional[str] = None
+    job_spec: Spec
+    mock: bool = False
+
+    # The location the mount of .run_log_store is mounted to in minikube
+    # ensure that minikube mount $HOME/workspace/runnable/.run_log_store:/volume/run_logs is executed first
+    # $HOME/workspace/runnable/.catalog:/volume/catalog
+    # Ensure that the docker build is done with eval $(minikube docker-env)
+    mini_k8s_run_log_location: str = Field(default="/volume/run_logs/")
+    mini_k8s_catalog_location: str = Field(default="/volume/catalog/")
+
+    _is_local: bool = PrivateAttr(default=False)
+
+    model_config = ConfigDict(
+        alias_generator=to_camel,
+        populate_by_name=True,
+        from_attributes=True,
+    )
+
     def _create_volumes(self):
         match self._context.run_log_store.service_name:
             case "file-system":
@@ -311,7 +361,7 @@ def _create_volumes(self):
                     # You then are creating a volume that is mounted to /tmp/run_logs in the container
                     # You are then referring to it.
                     # https://stackoverflow.com/questions/57411456/minikube-mounted-host-folders-are-not-working
-                    Volume(
+                    HostPathVolume(
                         name="run-logs",
                         host_path=HostPath(path=self.mini_k8s_run_log_location),
                     )
@@ -323,7 +373,7 @@ def _create_volumes(self):
                 )
             case "chunked-fs":
                 self._volumes.append(
-                    Volume(
+                    HostPathVolume(
                         name="run-logs",
                         host_path=HostPath(path=self.mini_k8s_run_log_location),
                     )
@@ -337,7 +387,7 @@ def _create_volumes(self):
         match self._context.catalog_handler.service_name:
             case "file-system":
                 self._volumes.append(
-                    Volume(
+                    HostPathVolume(
                         name="catalog",
                         host_path=HostPath(path=self.mini_k8s_catalog_location),
                     )
@@ -348,15 +398,87 @@ def _create_volumes(self):
                 )
             )

-    def _use_volumes(self):
+
+class K8sJobExecutor(GenericK8sJobExecutor):
+    service_name: str = "k8s-job"
+    config_path: Optional[str] = None
+    job_spec: Spec
+    mock: bool = False
+    pvc_claim_name: str
+
+    # override the job spec to always pull the image
+    def model_post_init(self, __context):
+        self.job_spec.template.spec.container.image_pull_policy = ImagePullPolicy.ALWAYS
+
+    _is_local: bool = PrivateAttr(default=False)
+
+    model_config = ConfigDict(
+        alias_generator=to_camel,
+        populate_by_name=True,
+        from_attributes=True,
+    )
+
+    def execute_job(self, job: BaseTaskType, catalog_settings: Optional[List[str]] = None):
+        self._use_volumes()
+        self._set_up_run_log()
+
+        job_log = self._context.run_log_store.create_job_log()
+        self._context.run_log_store.add_job_log(
+            run_id=self._context.run_id, job_log=job_log
+        )
+
+        job_log = self._context.run_log_store.get_job_log(run_id=self._context.run_id)
+
+        attempt_log = job.execute_command(
+            attempt_number=self.step_attempt_number,
+            mock=self.mock,
+        )
+
+        job_log.status = attempt_log.status
+        job_log.attempts.append(attempt_log)
+
+        data_catalogs_put: Optional[List[DataCatalog]] = self._sync_catalog(
+            catalog_settings=catalog_settings
+        )
+        logger.debug(f"data_catalogs_put: {data_catalogs_put}")
+
+        job_log.add_data_catalogs(data_catalogs_put or [])
+
+        console.print("Summary of job")
+        console.print(job_log.get_summary())
+
+        self._context.run_log_store.add_job_log(
+            run_id=self._context.run_id, job_log=job_log
+        )
+
+    def _create_volumes(self):
+        self._volumes.append(
+            PVCVolume(
+                name=self.pvc_claim_name,
+                persistent_volume_claim=PVCClaim(claim_name=self.pvc_claim_name),
+            )
+        )
         match self._context.run_log_store.service_name:
             case "file-system":
-                self._context.run_log_store.log_folder = self._container_log_location
+                self._volume_mounts.append(
+                    VolumeMount(
+                        name=self.pvc_claim_name,
+                        mount_path=self._container_log_location,
+                    )
+                )
             case "chunked-fs":
-                self._context.run_log_store.log_folder = self._container_log_location
+                self._volume_mounts.append(
+                    VolumeMount(
+                        name=self.pvc_claim_name,
+                        mount_path=self._container_log_location,
+                    )
+                )

         match self._context.catalog_handler.service_name:
             case "file-system":
-                self._context.catalog_handler.catalog_location = (
-                    self._container_catalog_location
+                self._volume_mounts.append(
+                    VolumeMount(
+                        name=self.pvc_claim_name,
+                        mount_path=self._container_catalog_location,
+                    )
                 )
diff --git a/extensions/pipeline_executor/local_container.py b/extensions/pipeline_executor/local_container.py
index 3bd401fb..4659bbf2 100644
--- a/extensions/pipeline_executor/local_container.py
+++ b/extensions/pipeline_executor/local_container.py
@@ -268,7 +268,6 @@ def _spin_container(
                 f"Please provide a docker_image using executor_config of the step {node.name} or at global config"
             )

-        # TODO: Should consider using getpass.getuser() when running the docker container? Volume permissions
         container = client.containers.create(
             image=docker_image,
             command=command,
diff --git a/pyproject.toml b/pyproject.toml
index 884ac954..6bb16604 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -98,6 +98,7 @@ include = [
 [project.entry-points.'job_executor']
 "local" = "extensions.job_executor.local:LocalJobExecutor"
 "local-container" = "extensions.job_executor.local_container:LocalContainerJobExecutor"
+"mini-k8s-job" = "extensions.job_executor.k8s:MiniK8sJobExecutor"
 "k8s-job" = "extensions.job_executor.k8s:K8sJobExecutor"

 # "argo" = "extensions.pipeline_executor.argo:ArgoExecutor"
 # "mocked" = "extensions.pipeline_executor.mocked:MockedExecutor"
diff --git a/runnable/__init__.py b/runnable/__init__.py
index d8922c58..408e49e1 100644
--- a/runnable/__init__.py
+++ b/runnable/__init__.py
@@ -1,5 +1,6 @@
 # ruff: noqa

+
 import logging
 import os
 from logging.config import dictConfig
diff --git a/runnable/catalog.py b/runnable/catalog.py
index a98b1456..f8c8f220 100644
--- a/runnable/catalog.py
+++ b/runnable/catalog.py
@@ -10,8 +10,6 @@

 logger = logging.getLogger(defaults.LOGGER_NAME)

-# TODO: Should ** be allowed as glob pattern as it can potentially copy everything to catalog
-

 def is_catalog_out_of_sync(
     catalog, synced_catalogs=Optional[List[DataCatalog]]
diff --git a/runnable/entrypoints.py b/runnable/entrypoints.py
index 5d02492e..a9de6fe4 100644
--- a/runnable/entrypoints.py
+++ b/runnable/entrypoints.py
@@ -16,9 +16,6 @@

 logger = logging.getLogger(defaults.LOGGER_NAME)

-print("")  # removes the buffer print
-
-
 def get_default_configs() -> RunnableConfig:
     """
     User can provide extensions as part of their code base, runnable-config.yaml provides the place to put them.
@@ -128,11 +125,10 @@ def prepare_configurations(
         "job-executor", None
     )  # type: ignore
     if not job_executor_config:
-        executor_config = cast(
+        job_executor_config = cast(
             ServiceConfig,
             runnable_defaults.get("job-executor", defaults.DEFAULT_JOB_EXECUTOR),
         )
-    assert job_executor_config, "Job executor is not provided"

     configured_executor = utils.get_provider_by_name_and_type(
         "job_executor", job_executor_config
diff --git a/runnable/executor.py b/runnable/executor.py
index 0e255669..c6516903 100644
--- a/runnable/executor.py
+++ b/runnable/executor.py
@@ -11,9 +11,9 @@
 from runnable import defaults
 from runnable.datastore import DataCatalog, JobLog, StepLog
 from runnable.defaults import TypeMapVariable
-from runnable.graph import Graph

 if TYPE_CHECKING:  # pragma: no cover
+    from runnable.graph import Graph
     from runnable.nodes import BaseNode
     from runnable.tasks import BaseTaskType
diff --git a/runnable/parameters.py b/runnable/parameters.py
index 801dc88d..3cfbf317 100644
--- a/runnable/parameters.py
+++ b/runnable/parameters.py
@@ -15,8 +15,6 @@

 logger = logging.getLogger(defaults.LOGGER_NAME)

-# TODO: Revisit this, it might be a bit too complicated than required
-

 def get_user_set_parameters(remove: bool = False) -> Dict[str, JsonParameter]:
     """
@@ -50,13 +48,6 @@ def get_user_set_parameters(remove: bool = False) -> Dict[str, JsonParameter]:
     return parameters


-def serialize_parameter_as_str(value: Any) -> str:
-    if isinstance(value, BaseModel):
-        return json.dumps(value.model_dump())
-
-    return json.dumps(value)
-
-
 def filter_arguments_for_func(
     func: Callable[..., Any],
     params: Dict[str, Any],
diff --git a/runnable/utils.py b/runnable/utils.py
index ff56c255..40ce9b6b 100644
--- a/runnable/utils.py
+++ b/runnable/utils.py
@@ -17,7 +17,7 @@
 from stevedore import driver

 import runnable.context as context
-from runnable import defaults, names
+from runnable import console, defaults, names
 from runnable.defaults import TypeMapVariable

 if TYPE_CHECKING:  # pragma: no cover
@@ -176,7 +176,7 @@ def is_a_git_repo() -> bool:
         logger.info("Found the code to be git versioned")
         return True
     except BaseException:  # pylint: disable=W0702
-        logger.error("No git repo found, unsafe hash")
+        console.print("Not a git repo", style="bold red")

     return False

@@ -195,27 +195,7 @@ def get_current_code_commit() -> Union[str, None]:
         logger.info("Found the git commit to be: %s", label)
         return label
     except BaseException:  # pylint: disable=W0702
-        logger.exception("Error getting git hash")
-        raise
-
-
-def archive_git_tracked(name: str):
-    """Generate a git archive of the tracked files.
-
-    Args:
-        name (str): The name to give the archive
-
-    Raises:
-        Exception: If its not a git repo
-    """
-    command = f"git archive -v -o {name}.tar.gz --format=tar.gz HEAD"
-
-    if not is_a_git_repo():
-        raise Exception("Not a git repo")
-    try:
-        subprocess.check_output(command.split()).strip().decode("utf-8")
-    except BaseException:  # pylint: disable=W0702
-        logger.exception("Error archiving repo")
+        console.print("Not a git repo, error getting hash", style="bold red")
         raise


@@ -234,7 +214,7 @@ def is_git_clean() -> Tuple[bool, Union[None, str]]:
             return True, None
         return False, label
     except BaseException:  # pylint: disable=W0702
-        logger.exception("Error checking if the code is git clean")
+        console.print("Not a git repo, not clean", style="bold red")
         return False, None


@@ -253,7 +233,7 @@ def get_git_remote() -> Union[str, None]:
         logger.info("Found the git remote to be: %s", label)
         return label
     except BaseException:  # pylint: disable=W0702
-        logger.exception("Error getting git remote")
+        console.print("Not a git repo, no remote", style="bold red")
         raise
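
Reviewer note: the new K8sJobExecutor mounts a single PersistentVolumeClaim
(pvc_claim_name) for both the run log store and the catalog, so the claim must
already exist in the target namespace before a job is submitted. A minimal
sketch of such a claim, reusing the "runnable" claim name and "enterprise-mlops"
namespace from examples/11-jobs/k8s-job.yaml; the access mode, storage class,
and size are assumptions, not taken from this patch:

apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: runnable              # matches pvc_claim_name in the example config
  namespace: enterprise-mlops # matches namespace in the example config
spec:
  accessModes:
    - ReadWriteOnce           # assumption; pick what your provisioner supports
  resources:
    requests:
      storage: 1Gi            # placeholder size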
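Reviewer note: for the mini-k8s path, MiniK8sJobExecutor._create_volumes maps
the local run log store and catalog onto hostPath volumes, which only works if
the minikube mounts named in the class comments are set up first
(minikube mount $HOME/workspace/runnable/.run_log_store:/volume/run_logs,
likewise for .catalog, with the image built under eval $(minikube docker-env)).
With the defaults in this patch, the submitted Job should carry roughly the
volume wiring below; this is a hand-written sketch, not generated output, and
the container name is illustrative:

apiVersion: batch/v1
kind: Job
spec:
  template:
    spec:
      restartPolicy: Never                # RestartPolicy.NEVER default
      containers:
        - name: runnable-job              # illustrative name
          image: runnable-m1              # from examples/11-jobs/mini-k8s-job.yaml
          volumeMounts:
            - name: run-logs
              mountPath: /tmp/run_logs/   # _container_log_location
            - name: catalog
              mountPath: /tmp/catalog/    # _container_catalog_location
      volumes:
        - name: run-logs
          hostPath:
            path: /volume/run_logs/       # mini_k8s_run_log_location
        - name: catalog
          hostPath:
            path: /volume/catalog/        # mini_k8s_catalog_location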