Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RHOAIENG-18848: chore(tests/containers): initial kubernetes/openshift deployment support #892

Merged
merged 6 commits into from
Feb 25, 2025

Conversation

jiridanek
Copy link
Member

@jiridanek jiridanek commented Feb 10, 2025

https://issues.redhat.com/browse/RHOAIENG-18848

Description

Test that deploys image on kubernetes/openshift and connects to it to check it started.

How Has This Been Tested?

Merge criteria:

  • The commits are squashed in a cohesive manner and have meaningful messages.
  • Testing instructions have been added in the PR body (for PRs involving changes that are not immediately obvious).
  • The developer has manually tested the changes and verified that the changes work

@openshift-ci openshift-ci bot added size/xxl and removed size/xxl labels Feb 10, 2025
@openshift-ci openshift-ci bot added size/xxl and removed size/xxl labels Feb 10, 2025
@openshift-ci openshift-ci bot added size/xxl and removed size/xxl labels Feb 23, 2025
@jiridanek jiridanek changed the base branch from refactor/konflux-migration to main February 23, 2025 10:21
@openshift-ci openshift-ci bot removed the size/xxl label Feb 23, 2025
@openshift-ci openshift-ci bot added size/xxl and removed size/xxl labels Feb 24, 2025
…iner on openshift/kubernetes

* install local-path provisioner on kubernetes in github actions
* more careful printing of pod status in case `containerStatuses == None`
* sort out how we want to work with privileged/unprivileged client
* only run the new test if we have kubernetes around
* add pod waiting and port forwarding utils

diff --git c/.github/workflows/build-notebooks-TEMPLATE.yaml i/.github/workflows/build-notebooks-TEMPLATE.yaml
index 8a98aa21..13507b78 100644
--- c/.github/workflows/build-notebooks-TEMPLATE.yaml
+++ i/.github/workflows/build-notebooks-TEMPLATE.yaml
@@ -290,10 +290,10 @@ jobs:
       - name: Install deps
         run: poetry install --sync

-      - name: Run container tests (in PyTest)
+      - name: Run Testcontainers container tests (in PyTest)
         run: |
           set -Eeuxo pipefail
-          poetry run pytest --capture=fd tests/containers --image="${{ steps.calculated_vars.outputs.OUTPUT_IMAGE }}"
+          poetry run pytest --capture=fd tests/containers -m 'not openshift' --image="${{ steps.calculated_vars.outputs.OUTPUT_IMAGE }}"
         env:
           DOCKER_HOST: "unix:///var/run/podman/podman.sock"
           TESTCONTAINERS_DOCKER_SOCKET_OVERRIDE: "/var/run/podman/podman.sock"
@@ -439,6 +439,16 @@ jobs:
           kubectl wait deployments --all --all-namespaces --for=condition=Available --timeout=100s
           kubectl wait pods --all --all-namespaces --for=condition=Ready --timeout=100s

+      - name: "Install local-path provisioner"
+        if: ${{ steps.have-tests.outputs.tests == 'true' }}
+        run: |
+          set -Eeuxo pipefail
+          kubectl apply -f https://raw.githubusercontent.com/rancher/local-path-provisioner/v0.0.31/deploy/local-path-storage.yaml
+          kubectl wait deployments --all --namespace=local-path-storage --for=condition=Available --timeout=100s
+          # https://kubernetes.io/docs/tasks/administer-cluster/change-default-storage-class/
+          kubectl get storageclass
+          kubectl patch storageclass local-path -p '{"metadata": {"annotations":{"storageclass.kubernetes.io/is-default-class":"true"}}}'
+
       - name: "Run image tests"
         if: ${{ steps.have-tests.outputs.tests == 'true' }}
         run: python3 ci/cached-builds/make_test.py --target ${{ inputs.target }}
@@ -449,6 +459,18 @@ jobs:

       # endregion

+      - name: Run OpenShift container tests (in PyTest)
+        if: ${{ steps.have-tests.outputs.tests == 'true' }}
+        run: |
+          set -Eeuxo pipefail
+          poetry run pytest --capture=fd tests/containers -m 'openshift' --image="${{ steps.calculated_vars.outputs.OUTPUT_IMAGE }}"
+        env:
+          # TODO(jdanek): this Testcontainers stuff should not be necessary but currently it has to be there
+          DOCKER_HOST: "unix:///var/run/podman/podman.sock"
+          TESTCONTAINERS_DOCKER_SOCKET_OVERRIDE: "/var/run/podman/podman.sock"
+          # pulling the Ryuk container from docker.io introduces CI flakiness
+          TESTCONTAINERS_RYUK_DISABLED: "true"
+
       # region Trivy vulnerability scan

       - name: Run Trivy vulnerability scanner
diff --git c/README.md i/README.md
index 961e5907..22703ac0 100644
--- c/README.md
+++ i/README.md
@@ -105,7 +105,7 @@ sudo dnf install podman
 systemctl --user start podman.service
 systemctl --user status podman.service
 systemctl --user status podman.socket
-DOCKER_HOST=unix:///run/user/$UID/podman/podman.sock poetry run pytest tests/containers --image quay.io/opendatahub/workbench-images@sha256:e98d19df346e7abb1fa3053f6d41f0d1fa9bab39e49b4cb90b510ca33452c2e4
+DOCKER_HOST=unix:///run/user/$UID/podman/podman.sock poetry run pytest tests/containers -m 'not openshift' --image quay.io/opendatahub/workbench-images@sha256:e98d19df346e7abb1fa3053f6d41f0d1fa9bab39e49b4cb90b510ca33452c2e4

 # Mac OS
 brew install podman
@@ -113,7 +113,7 @@ podman machine init
 podman machine set --rootful
 sudo podman-mac-helper install
 podman machine start
-poetry run pytest tests/containers --image quay.io/opendatahub/workbench-images@sha256:e98d19df346e7abb1fa3053f6d41f0d1fa9bab39e49b4cb90b510ca33452c2e4
+poetry run pytest tests/containers -m 'not openshift' --image quay.io/opendatahub/workbench-images@sha256:e98d19df346e7abb1fa3053f6d41f0d1fa9bab39e49b4cb90b510ca33452c2e4
 ```

 When using lima on macOS, it might be useful to give yourself access to rootful podman socket
diff --git c/pyproject.toml i/pyproject.toml
index 9271a4c3..6440b123 100644
--- c/pyproject.toml
+++ i/pyproject.toml
@@ -8,6 +8,7 @@ package-mode = false

 [tool.poetry.dependencies]
 python = "~3.12"
+requests = "^2.32.3"

 [tool.poetry.group.dev.dependencies]
diff --git c/pytest.ini i/pytest.ini
index 2b320d7a..aff25089 100644
--- c/pytest.ini
+++ i/pytest.ini
@@ -15,3 +15,5 @@ log_cli_level = INFO

 log_file = logs/pytest-logs.txt
 log_file_level = DEBUG
+
+markers = openshift
diff --git c/tests/containers/base_image_test.py i/tests/containers/base_image_test.py
index 03f3d9ae..b7e00498 100644
--- c/tests/containers/base_image_test.py
+++ i/tests/containers/base_image_test.py
@@ -11,12 +11,13 @@ import tempfile
 import textwrap
 from typing import TYPE_CHECKING, Any, Callable

-import pytest
 import testcontainers.core.container
 import testcontainers.core.waiting_utils

 from tests.containers import docker_utils

+import pytest
+
 logging.basicConfig(level=logging.DEBUG)
 LOGGER = logging.getLogger(__name__)

@@ -72,7 +73,8 @@ class TestBaseImage:
                         if "not found" in line:
                             unsatisfied_deps.append((dlib, line.strip()))
                     assert output
-                print("OUTPUT>", json.dumps({"dir": path, "count_scanned": count_scanned, "unsatisfied": unsatisfied_deps}))
+                print("OUTPUT>",
+                      json.dumps({"dir": path, "count_scanned": count_scanned, "unsatisfied": unsatisfied_deps}))

         try:
             container.start()
@@ -105,18 +107,7 @@ class TestBaseImage:
                 with subtests.test(f"{dlib=}"):
                     pytest.fail(f"{dlib=} has unsatisfied dependencies {deps=}")

-    def test_oc_command_runs(self, image: str):
-        container = testcontainers.core.container.DockerContainer(image=image, user=23456, group_add=[0])
-        container.with_command("/bin/sh -c 'sleep infinity'")
-        try:
-            container.start()
-            ecode, output = container.exec(["/bin/sh", "-c", "oc version"])
-        finally:
-            docker_utils.NotebookContainer(container).stop(timeout=0)
-
-        logging.debug(output.decode())
-        assert ecode == 0
-
+    # @pytest.mark.environmentss("docker")
     def test_oc_command_runs_fake_fips(self, image: str, subtests: pytest_subtests.SubTests):
         """Establishes a best-effort fake FIPS environment and attempts to execute `oc` binary in it.

@@ -190,7 +181,8 @@ class TestBaseImage:
             docker_utils.NotebookContainer(container).stop(timeout=0)

-def encode_python_function_execution_command_interpreter(python: str, function: Callable[..., Any], *args: list[Any]) -> list[str]:
+def encode_python_function_execution_command_interpreter(python: str, function: Callable[..., Any], *args: list[Any]) -> \
+        list[str]:
     """Returns a cli command that will run the given Python function encoded inline.
     All dependencies (imports, ...) must be part of function body."""
     code = textwrap.dedent(inspect.getsource(function))
diff --git c/tests/containers/cancellation_token.py i/tests/containers/cancellation_token.py
new file mode 100644
index 00000000..d7d62603
--- /dev/null
+++ i/tests/containers/cancellation_token.py
@@ -0,0 +1,37 @@
+import os
+import threading
+
+
+class CancellationToken:
+    """Flag to signal a thread it should cancel itself.
+    This cooperative cancellation pattern is commonly used in c# and go
+    See https://learn.microsoft.com/en-us/dotnet/api/system.threading.cancellationtoken?view=net-9.0
+    """
+
+    def __init__(self):
+        # consider using the wrapt.synchronized decorator
+        # https://github.com/GrahamDumpleton/wrapt/blob/develop/blog/07-the-missing-synchronized-decorator.md
+        self._lock = threading.Lock()
+        self._canceled = False
+        # something selectable avoids having to use short timeout in select
+        self._read_fd, self._write_fd = os.pipe()
+
+    def fileno(self):
+        """This lets us use the token in select() calls"""
+        return self._read_fd
+
+    @Property
+    def cancelled(self):
+        with self._lock:
+            return self._canceled
+
+    def cancel(self):
+        with self._lock:
+            os.write(self._write_fd, b'x')
+            self._canceled = True
+
+    def __del__(self):
+        # consider https://docs.python.org/3/library/weakref.html#weakref.finalize
+        with self._lock:
+            os.close(self._read_fd)
+            os.close(self._write_fd)
diff --git c/tests/containers/kubernetes_utils.py i/tests/containers/kubernetes_utils.py
new file mode 100644
index 00000000..66829829
--- /dev/null
+++ i/tests/containers/kubernetes_utils.py
@@ -0,0 +1,473 @@
+from __future__ import annotations
+
+import contextlib
+import functools
+import logging
+import threading
+import time
+import traceback
+import typing
+import socket
+from socket import socket
+from typing import Any, Callable, Generator
+
+import requests
+
+import kubernetes
+import kubernetes.dynamic.exceptions
+import kubernetes.stream.ws_client
+import kubernetes.dynamic.exceptions
+import kubernetes.stream.ws_client
+import kubernetes.client.api.core_v1_api
+from kubernetes.dynamic import DynamicClient, ResourceField
+
+import ocp_resources.pod
+import ocp_resources.deployment
+import ocp_resources.service
+import ocp_resources.persistent_volume_claim
+import ocp_resources.project_request
+import ocp_resources.namespace
+import ocp_resources.project_project_openshift_io
+import ocp_resources.deployment
+import ocp_resources.resource
+import ocp_resources.pod
+import ocp_resources.namespace
+import ocp_resources.project_project_openshift_io
+import ocp_resources.project_request
+
+from tests.containers import socket_proxy
+
+
+class TestFrameConstants:
+    GLOBAL_POLL_INTERVAL_MEDIUM = 10
+    TIMEOUT_2MIN = 2 * 60
+
+
+logging.basicConfig(level=logging.DEBUG)
+LOGGER = logging.getLogger(__name__)
+
+
+# https://github.com/RedHatQE/openshift-python-wrapper/tree/main/examples
+
+def get_client() -> kubernetes.dynamic.DynamicClient:
+    try:
+        # client = kubernetes.dynamic.DynamicClient(client=kubernetes.config.new_client_from_config())
+        # probably same as above
+        client = ocp_resources.resource.get_client()
+        return client
+    except kubernetes.config.ConfigException as e:
+        # probably bad config
+        logging.error(e)
+    except kubernetes.dynamic.exceptions.UnauthorizedError as e:
+        # wrong or expired credentials
+        logging.error(e)
+    except kubernetes.client.ApiException as e:
+        # unexpected, we catch unauthorized above
+        logging.error(e)
+    except Exception as e:
+        # unexpected error, assert here
+        logging.error(e)
+
+    raise RuntimeError("Failed to instantiate client")
+
+
+def get_username(client: kubernetes.dynamic.DynamicClient) -> str:
+    # can't just access
+    # > client.configuration.username
+    # because we normally auth using tokens, not username and password
+
+    # this is what kubectl does (see kubectl -v8 auth whoami)
+    self_subject_review_resource: kubernetes.dynamic.Resource = client.resources.get(
+        api_version="authentication.k8s.io/v1", kind="SelfSubjectReview"
+    )
+    self_subject_review: kubernetes.dynamic.ResourceInstance = client.create(self_subject_review_resource)
+    username: str = self_subject_review.status.userInfo.username
+    return username
+
+
+class TestKubernetesUtils:
+    def test_get_username(self):
+        client = get_client()
+        username = get_username(client)
+        assert username is not None and len(username) > 0
+
+
+class TestFrame:
+    def __init__[T](self):
+        self.stack: list[tuple[T, Callable[[T], None] | None]] = []
+
+    def defer_resource[T: ocp_resources.resource.Resource](self, resource: T, wait=False,
+                                                           destructor: Callable[[T], None] | None = None) -> T:
+        result = resource.deploy(wait=wait)
+        self.defer(resource, destructor)
+        return result
+
+    def add[T](self, resource: T, destructor: Callable[[T], None] = None) -> T:
+        self.defer(resource, destructor)
+        return resource
+
+    def defer[T](self, resource: T, destructor: Callable[[T], None] = None) -> T:
+        self.stack.append((resource, destructor))
+
+    def destroy(self, wait=False):
+        while self.stack:
+            resource, destructor = self.stack.pop()
+            if destructor is not None:
+                destructor(resource)
+            else:
+                resource.clean_up(wait=wait)
+
+    def __enter__(self) -> TestFrame:
+        return self
+
+    def __exit__(self, exc_type, exc_val, exc_tb):
+        self.destroy(wait=True)
+
+
+class ImageDeployment:
+    def __init__(self, client: kubernetes.dynamic.DynamicClient, image: str):
+        self.client = client
+        self.image = image
+        self.tf = TestFrame()
+
+    def __enter__(self) -> ImageDeployment:
+        return self
+
+    def __exit__(self, exc_type, exc_val, exc_tb):
+        self.tf.destroy()
+
+    def deploy(self, container_name: str) -> None:
+        LOGGER.debug(f"Deploying {self.image}")
+        # custom namespace is necessary, because we cannot assign a SCC to pods created in one of the default namespaces:
+        #  default, kube-system, kube-public, openshift-node, openshift-infra, openshift.
+        # https://docs.openshift.com/container-platform/4.17/authentication/managing-security-context-constraints.html#role-based-access-to-ssc_configuring-internal-oauth
+
+        # TODO(jdanek): sort out how we want to work with privileged/unprivileged client
+        #  take inspiration from odh-tests
+        ns = create_namespace(privileged_client=True, name=f"test-ns-{container_name}")
+        self.tf.defer_resource(ns)
+
+        pvc = ocp_resources.persistent_volume_claim.PersistentVolumeClaim(
+            name=container_name,
+            namespace=ns.name,
+            accessmodes=ocp_resources.persistent_volume_claim.PersistentVolumeClaim.AccessMode.RWO,
+            volume_mode=ocp_resources.persistent_volume_claim.PersistentVolumeClaim.VolumeMode.FILE,
+            size="1Gi",
+        )
+        self.tf.defer_resource(pvc, wait=True)
+        deployment = ocp_resources.deployment.Deployment(
+            client=self.client,
+            name=container_name,
+            namespace=ns.name,
+            selector={"matchLabels": {"app": container_name}},
+            replicas=1,
+            template={
+                "metadata": {
+                    "annotations": {
+                        # This will result in the container spec having something like below,
+                        # regardless of what kind of namespace this is being run in.
+                        # Keep in mind that `default` is a privileged namespace and this annotation has no effect there.
+                        # ```
+                        # spec:
+                        #   securityContext:
+                        #     seLinuxOptions:
+                        #       level: 's0:c34,c4'
+                        #     fsGroup: 1001130000
+                        #     seccompProfile:
+                        #       type: RuntimeDefault
+                        # ```
+                        "openshift.io/scc": "restricted-v2"
+                    },
+                    "labels": {
+                        "app": container_name,
+                    }
+                },
+                "spec": {
+                    "containers": [
+                        {
+                            "name": container_name,
+                            "image": self.image,
+                            # "command": ["/bin/sh", "-c", "while true ; do date; sleep 5; done;"],
+                            "ports": [
+                                {
+                                    "containerPort": 8888,
+                                    "name": "notebook-port",
+                                    "protocol": "TCP",
+                                }
+                            ],
+                            # rstudio will not start without its volume mount and it does not log the error for it
+                            # See the testcontainers implementation of this (the tty=True part)
+                            "volumeMounts": [
+                                {
+                                    "mountPath": "/opt/app-root/src",
+                                    "name": "my-workbench"
+                                }
+                            ],
+                        },
+                    ],
+                    "volumes": [
+                        {
+                            "name": "my-workbench",
+                            "persistentVolumeClaim": {
+                                "claimName": container_name,
+                            }
+                        }
+                    ]
+                }
+            }
+        )
+        self.tf.defer_resource(deployment)
+        LOGGER.debug(f"Waiting for pods to become ready...")
+        PodUtils.wait_for_pods_ready(self.client, namespace_name=ns.name, label_selector=f"app={container_name}",
+                                     expect_pods_count=1)
+
+        core_v1_api = kubernetes.client.api.core_v1_api.CoreV1Api(api_client=self.client.client)
+        pod_name: kubernetes.client.models.v1_pod_list.V1PodList = core_v1_api.list_namespaced_pod(
+            namespace=ns.name,
+            label_selector=f"app={container_name}"
+        )
+        assert len(pod_name.items) == 1
+        pod: kubernetes.client.models.v1_pod.V1Pod = pod_name.items[0]
+
+        p = socket_proxy.SocketProxy(exposing_contextmanager(core_v1_api, pod), "localhost", 0)
+        t = threading.Thread(target=p.listen_and_serve_until_canceled)
+        t.start()
+        self.tf.defer(t, lambda thread: thread.join())
+        self.tf.defer(p.cancellation_token, lambda token: token.cancel())
+
+        self.port = p.get_actual_port()
+        LOGGER.debug(f"Listening on port {self.port}")
+        resp = requests.get(f"http://localhost:{self.port}")
+        assert resp.status_code == 200
+        LOGGER.debug(f"Done with portforward")
+
+
+class PodUtils:
+    READINESS_TIMEOUT = TestFrameConstants.TIMEOUT_2MIN
+
+    # consider using timeout_sampler
+    @staticmethod
+    def wait_for_pods_ready(
+            client: DynamicClient, namespace_name: str, label_selector: str, expect_pods_count: int
+    ) -> None:
+        """Wait for all pods in namespace to be ready
+        :param client:
+        :param namespace_name: name of the namespace
+        :param label_selector:
+        :param expect_pods_count:
+        """
+
+        # it's a dynamic client with the `resource` parameter already filled in
+        class ResourceType(kubernetes.dynamic.Resource, kubernetes.dynamic.DynamicClient):
+            pass
+
+        resource: ResourceType = client.resources.get(
+            kind=ocp_resources.pod.Pod.kind,
+            api_version=ocp_resources.pod.Pod.api_version,
+        )
+
+        def ready() -> bool:
+            pods = resource.get(namespace=namespace_name, label_selector=label_selector).items
+            if not pods and expect_pods_count == 0:
+                logging.debug("All expected Pods %s in Namespace %s are ready", label_selector, namespace_name)
+                return True
+            if not pods:
+                logging.debug("Pods matching %s/%s are not ready", namespace_name, label_selector)
+                return False
+            if len(pods) != expect_pods_count:
+                logging.debug("Expected Pods %s/%s are not ready", namespace_name, label_selector)
+                return False
+            pod: ResourceField
+            for pod in pods:
+                if not Readiness.is_pod_ready(pod) and not Readiness.is_pod_succeeded(pod):
+                    if not pod.status.containerStatuses:
+                        pod_status = pod.status
+                    else:
+                        pod_status = {cs.name: cs.state for cs in pod.status.containerStatuses}
+
+                    logging.debug("Pod is not ready: %s/%s (%s)",
+                                  namespace_name, pod.metadata.name, pod_status)
+                    return False
+                else:
+                    # check all containers in pods are ready
+                    for cs in pod.status.containerStatuses:
+                        if not (cs.ready or cs.state.get("terminated", {}).get("reason", "") == "Completed"):
+                            logging.debug(
+                                f"Container {cs.getName()} of Pod {namespace_name}/{pod.metadata.name} not ready ({cs.state=})"
+                            )
+                            return False
+            logging.info("Pods matching %s/%s are ready", namespace_name, label_selector)
+            return True
+
+        Wait.until(
+            description=f"readiness of all Pods matching {label_selector} in Namespace {namespace_name}",
+            poll_interval=TestFrameConstants.GLOBAL_POLL_INTERVAL_MEDIUM,
+            timeout=PodUtils.READINESS_TIMEOUT,
+            ready=ready,
+        )
+
+
+class Wait:
+    @staticmethod
+    def until(
+            description: str,
+            poll_interval: float,
+            timeout: float,
+            ready: Callable[[], bool],
+            on_timeout: Callable[[], None] | None = None,
+    ) -> None:
+        """For every poll (happening once each {@code pollIntervalMs}) checks if supplier {@code ready} is true.
+
+        If yes, the wait is closed. Otherwise, waits another {@code pollIntervalMs} and tries again.
+        Once the wait timeout (specified by {@code timeoutMs} is reached and supplier wasn't true until that time,
+        runs the {@code onTimeout} (f.e. print of logs, showing the actual value that was checked inside {@code ready}),
+        and finally throws {@link WaitException}.
+        @param description    information about on what we are waiting
+        @param pollIntervalMs poll interval in milliseconds
+        @param timeoutMs      timeout specified in milliseconds
+        @param ready          {@link BooleanSupplier} containing code, which should be executed each poll,
+                               verifying readiness of the particular thing
+        @param onTimeout      {@link Runnable} executed once timeout is reached and
+                               before the {@link WaitException} is thrown."""
+        logging.info("Waiting for: %s", description)
+        deadline = time.monotonic() + timeout
+
+        exception_message: str | None = None
+        previous_exception_message: str | None = None
+
+        # in case we are polling every 1s, we want to print exception after x tries, not on the first try
+        # for minutes poll interval will 2 be enough
+        exception_appearance_count: int = 2 if (poll_interval // 60) > 0 else max(int(timeout // poll_interval // 4), 2)
+        exception_count: int = 0
+        new_exception_appearance: int = 0
+
+        stack_trace_error: str | None = None
+
+        while True:
+            try:
+                result: bool = ready()
+            except KeyboardInterrupt:
+                raise  # quick exit if the user gets tired of waiting
+            except Exception as e:
+                exception_message = str(e)
+
+                exception_count += 1
+                new_exception_appearance += 1
+                if (
+                        exception_count == exception_appearance_count
+                        and exception_message is not None
+                        and exception_message == previous_exception_message
+                ):
+                    logging.info(f"While waiting for: {description} exception occurred: {exception_message}")
+                    # log the stacktrace
+                    stack_trace_error = traceback.format_exc()
+                elif (
+                        exception_message is not None
+                        and exception_message != previous_exception_message
+                        and new_exception_appearance == 2
+                ):
+                    previous_exception_message = exception_message
+
+                result = False
+
+            time_left: float = deadline - time.monotonic()
+            if result:
+                return
+            if time_left <= 0:
+                if exception_count > 1:
+                    logging.error("Exception waiting for: %s, %s", description, exception_message)
+
+                    if stack_trace_error is not None:
+                        # printing handled stacktrace
+                        logging.error(stack_trace_error)
+                if on_timeout is not None:
+                    on_timeout()
+                wait_exception: WaitException = WaitException(f"Timeout after {timeout} s waiting for {description}")
+                logging.error(wait_exception)
+                raise wait_exception
+
+            sleep_time: float = min(poll_interval, time_left)
+            time.sleep(sleep_time)  # noqa: FCN001
+
+
+class WaitException(Exception):
+    pass
+
+
+class Readiness:
+    @staticmethod
+    def is_pod_ready(pod: ResourceField) -> bool:
+        Utils.check_not_none(value=pod, message="Pod can't be null.")
+
+        condition = ocp_resources.pod.Pod.Condition.READY
+        status = ocp_resources.pod.Pod.Condition.Status.TRUE
+        for cond in pod.get("status", {}).get("conditions", []):
+            if cond["type"] == condition and cond["status"].casefold() == status.casefold():
+                return True
+        return False
+
+    @staticmethod
+    def is_pod_succeeded(pod: ResourceField) -> bool:
+        Utils.check_not_none(value=pod, message="Pod can't be null.")
+        return pod.status is not None and "Succeeded" == pod.status.phase
+
+
+class Utils:
+    @staticmethod
+    def check_not_none(value: Any, message: str) -> None:
+        if value is None:
+            raise ValueError(message)
+
+
[email protected]
+def exposing_contextmanager(
+        core_v1_api: kubernetes.client.CoreV1Api,
+        pod: kubernetes.client.models.V1Pod
+) -> Generator[socket, None, None]:
+    # If we e.g., specify the wrong port, the pf = portforward() call succeeds,
+    # but pf.connected will later flip to False
+    # we need to check that _everything_ works before moving on
+    pf = None
+    s = None
+    while not pf or not pf.connected or not s:
+        pf: kubernetes.stream.ws_client.PortForward = kubernetes.stream.portforward(
+            api_method=core_v1_api.connect_get_namespaced_pod_portforward,
+            name=pod.metadata.name,
+            namespace=pod.metadata.namespace,
+            ports=",".join(str(p) for p in [8888]),
+        )
+        s: typing.Union[kubernetes.stream.ws_client.PortForward._Port._Socket, socket.socket] | None = pf.socket(8888)
+    assert s, "Failed to establish connection"
+
+    try:
+        yield s
+    finally:
+        s.close()
+        pf.close()
+
+
[email protected](ocp_resources.namespace.Namespace.__init__)
+def create_namespace(privileged_client: bool = False, *args,
+                     **kwargs) -> ocp_resources.project_project_openshift_io.Project:
+    if not privileged_client:
+        with ocp_resources.project_request.ProjectRequest(*args, **kwargs):
+            project = ocp_resources.project_project_openshift_io.Project(*args, **kwargs)
+            project.wait_for_status(status=project.Status.ACTIVE, timeout=TestFrameConstants.TIMEOUT_2MIN)
+            return project
+    else:
+        with ocp_resources.namespace.Namespace(*args, **kwargs) as ns:
+            ns.wait_for_status(status=ocp_resources.namespace.Namespace.Status.ACTIVE,
+                               timeout=TestFrameConstants.TIMEOUT_2MIN)
+            return ns
+
+
+__all__ = [
+    get_client,
+    get_username,
+    exposing_contextmanager,
+    create_namespace,
+    PodUtils,
+    TestFrame,
+    TestFrameConstants,
+    ImageDeployment,
+]
diff --git c/tests/containers/socket_proxy.py i/tests/containers/socket_proxy.py
new file mode 100644
index 00000000..77fc3485
--- /dev/null
+++ i/tests/containers/socket_proxy.py
@@ -0,0 +1,173 @@
+from __future__ import annotations
+
+import contextlib
+import logging
+import socket
+import select
+import threading
+import subprocess
+import typing
+
+from tests.containers.cancellation_token import CancellationToken
+
+"""Proxies kubernetes portforwards to a local port.
+
+This is implemented as a thread running select() loop and managing the sockets.
+
+There are alternative implementations for this.
+
+1) Run oc port-forward in a subprocess
+* There isn't a nice way where kubectl would report in machine-readable way the
+  port number, kubernetes/kubectl#1190 (comment)
+2) Use the socket as is, mount a custom adaptor to the requests library
+* The code to do this is weird. This is what docker-py does w.r.t. the docker socket.
+  It defines a custom 'http+docker://' protocol, and an adaptor for it, that uses the docker socket.
+3) Implement proxy using asyncio
+* There are advantages to asyncio, but since we don't have Python asyncio anywhere else yet,
+  it is probably best to avoid using asyncio.
+
+Out of these, the oc port-forward subprocess is a decent alternative solution.
+"""
+
+class SubprocessProxy:
+    #
+    def __init__(self, namespace: str, name: str, port: int):
+        self.namespace = namespace
+        self.name = name
+        self.port = port
+
+    def start(self):
+        self.forwarder = subprocess.Popen(
+            ["kubectl", "port-forward", self.namespace, self.name],
+            text=True,
+        )
+        self.forwarder.communicate()
+
+    def stop(self):
+        self.forwarder.terminate()
+
+
+class SocketProxy:
+    def __init__(
+            self,
+            remote_socket_factory: typing.ContextManager[socket.socket],
+            local_host: str = "localhost",
+            local_port: int = 0,
+            buffer_size: int = 4096
+    ) -> None:
+        """
+
+        :param local_host: probably "localhost" would make most sense here
+        :param local_port: usually leave as to 0, which will make the OS choose a free port
+        :param remote_socket_factory: this is a context manager for kubernetes port forwarding
+        :param buffer_size: do not poke it, leave this at the default value
+        """
+        self.local_host = local_host
+        self.local_port = local_port
+        self.buffer_size = buffer_size
+        self.remote_socket_factory = remote_socket_factory
+
+        self.cancellation_token = CancellationToken()
+
+        self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+        self.server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+        self.server_socket.bind((self.local_host, self.local_port))
+        self.server_socket.listen(1)
+        logging.info(f"Proxy listening on {self.local_host}:{self.local_port}")
+
+    def listen_and_serve_until_canceled(self):
+        """Accepts the client, creates a new socket to the remote, and proxies the data.
+
+        Handles at most one client at a time. """
+        try:
+            while not self.cancellation_token.cancelled:
+                client_socket, addr = self.server_socket.accept()
+                logging.info(f"Accepted connection from {addr[0]}:{addr[1]}")
+                self._handle_client(client_socket)
+        except Exception as e:
+            logging.exception(f"Proxying failed to listen", exc_info=e)
+            raise
+        finally:
+            self.server_socket.close()
+
+    def get_actual_port(self) -> int:
+        """Returns the port that the proxy is listening on.
+        When port number 0 was passed in, this will return the actual randomly assigned port."""
+        return self.server_socket.getsockname()[1]
+
+    def _handle_client(self, client_socket):
+        with client_socket as _, self.remote_socket_factory as remote_socket:
+            while True:
+                readable, _, _ = select.select([client_socket, remote_socket, self.cancellation_token], [], [])
+
+                if self.cancellation_token.cancelled:
+                    break
+
+                if client_socket in readable:
+                    data = client_socket.recv(self.buffer_size)
+                    if not data:
+                        break
+                    remote_socket.send(data)
+
+                if remote_socket in readable:
+                    data = remote_socket.recv(self.buffer_size)
+                    if not data:
+                        break
+                    client_socket.send(data)
+
+
+if __name__ == "__main__":
+    """Sample application to show how this can work."""
+
+
+    @contextlib.contextmanager
+    def remote_socket_factory():
+        class MockServer(threading.Thread):
+            def __init__(self, local_host: str = "localhost", local_port: int = 0):
+                self.local_host = local_host
+                self.local_port = local_port
+
+                self.is_socket_bound = threading.Event()
+
+                super().__init__()
+
+            def run(self):
+                self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+                self.server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+                self.server_socket.bind((self.local_host, self.local_port))
+                self.server_socket.listen(1)
+                print(f"MockServer listening on {self.local_host}:{self.local_port}")
+                self.is_socket_bound.set()
+
+                client_socket, addr = self.server_socket.accept()
+                logging.info(f"MockServer accepted connection from {addr[0]}:{addr[1]}")
+
+                client_socket.send(b"Hello World\n")
+                client_socket.close()
+
+            def get_actual_port(self):
+                self.is_socket_bound.wait()
+                return self.server_socket.getsockname()[1]
+
+        server = MockServer()
+        server.start()
+
+        client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+        client_socket.connect(("localhost", server.get_actual_port()))
+
+        yield client_socket
+
+        client_socket.close()
+        server.join()
+
+
+    proxy = SocketProxy(remote_socket_factory(), "localhost", 0)
+    thread = threading.Thread(target=proxy.listen_and_serve_until_canceled)
+    thread.start()
+
+    client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+    client_socket.connect(("localhost", proxy.get_actual_port()))
+
+    print(client_socket.recv(1024))  # prints Hello World
+
+    thread.join()
diff --git c/tests/containers/workbenches/workbench_image_test.py i/tests/containers/workbenches/workbench_image_test.py
index cbfb7dae..892f775c 100644
--- c/tests/containers/workbenches/workbench_image_test.py
+++ i/tests/containers/workbenches/workbench_image_test.py
@@ -21,7 +21,7 @@ import testcontainers.core.waiting_utils
 import pytest
 import pytest_subtests

-from tests.containers import docker_utils, podman_machine_utils
+from tests.containers import docker_utils, podman_machine_utils, kubernetes_utils

 class TestWorkbenchImage:
@@ -108,6 +108,18 @@ class TestWorkbenchImage:
         finally:
             docker_utils.NotebookContainer(container).stop(timeout=0)

+    @pytest.mark.openshift
+    def test_image_run_on_openshift(self, image: str):
+        skip_if_not_workbench_image(image)
+
+        client = kubernetes_utils.get_client()
+        print(client)
+
+        username = kubernetes_utils.get_username(client)
+        print(username)
+
+        with kubernetes_utils.ImageDeployment(client, image) as image:
+            image.deploy(container_name="notebook-tests-pod")

 class WorkbenchContainer(testcontainers.core.container.DockerContainer):
     @functools.wraps(testcontainers.core.container.DockerContainer.__init__)
@openshift-ci openshift-ci bot added size/xxl and removed size/xxl labels Feb 25, 2025
@jiridanek jiridanek added the tide/merge-method-squash Denotes a PR that should be squashed by tide when it merges. label Feb 25, 2025
@openshift-ci openshift-ci bot added size/xxl and removed size/xxl labels Feb 25, 2025
@jstourac
Copy link
Member

/lgtm

@jiridanek
Copy link
Member Author

thanks for your help
/override ci/prow/images
/approve

Copy link
Contributor

openshift-ci bot commented Feb 25, 2025

@jiridanek: Overrode contexts on behalf of jiridanek: ci/prow/images

In response to this:

thanks for your help
/override ci/prow/images
/approve

Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes-sigs/prow repository.

Copy link
Contributor

openshift-ci bot commented Feb 25, 2025

[APPROVALNOTIFIER] This PR is APPROVED

This pull-request has been approved by: jiridanek

The full list of commands accepted by this bot can be found here.

The pull request process is described here

Needs approval from an approver in each of these files:

Approvers can indicate their approval by writing /approve in a comment
Approvers can cancel approval by writing /approve cancel in a comment

@openshift-merge-bot openshift-merge-bot bot merged commit d954ab6 into opendatahub-io:main Feb 25, 2025
7 checks passed
@jiridanek jiridanek deleted the jd_ocp_pytest branch February 25, 2025 15:08
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
approved lgtm ok-to-test size/xxl tide/merge-method-squash Denotes a PR that should be squashed by tide when it merges.
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants