RHOAIENG-18848: chore(tests/containers): initial test that runs container on openshift/kubernetes

* install local-path provisioner on kubernetes in github actions
* more careful printing of pod status in case `containerStatuses == None`
* sort out how we want to work with privileged/unprivileged client
* only run the new test if we have kubernetes around
* add pod waiting and port forwarding utils
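
The new helpers are intended to be used together the way the new `test_image_run_on_openshift` test in the diff uses them. A minimal usage sketch, assuming a reachable cluster in the current kubeconfig and a placeholder image reference (not a real tag from this commit):

```
# Sketch only: exercises the new kubernetes_utils helpers added in this commit.
from tests.containers import kubernetes_utils

client = kubernetes_utils.get_client()                 # DynamicClient built from the active kubeconfig
print(kubernetes_utils.get_username(client))           # sanity check via SelfSubjectReview ("auth whoami")

# creates a namespace, PVC, and Deployment, waits for the pod, then port-forwards port 8888 locally
with kubernetes_utils.ImageDeployment(client, "quay.io/example/workbench:latest") as deployment:
    deployment.deploy(container_name="notebook-tests-pod")
```
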
diff --git c/.github/workflows/build-notebooks-TEMPLATE.yaml i/.github/workflows/build-notebooks-TEMPLATE.yaml
index 8a98aa21..13507b78 100644
--- c/.github/workflows/build-notebooks-TEMPLATE.yaml
+++ i/.github/workflows/build-notebooks-TEMPLATE.yaml
@@ -290,10 +290,10 @@ jobs:
       - name: Install deps
         run: poetry install --sync

-      - name: Run container tests (in PyTest)
+      - name: Run Testcontainers container tests (in PyTest)
         run: |
           set -Eeuxo pipefail
-          poetry run pytest --capture=fd tests/containers --image="${{ steps.calculated_vars.outputs.OUTPUT_IMAGE }}"
+          poetry run pytest --capture=fd tests/containers -m 'not openshift' --image="${{ steps.calculated_vars.outputs.OUTPUT_IMAGE }}"
         env:
           DOCKER_HOST: "unix:///var/run/podman/podman.sock"
           TESTCONTAINERS_DOCKER_SOCKET_OVERRIDE: "/var/run/podman/podman.sock"
@@ -439,6 +439,16 @@ jobs:
           kubectl wait deployments --all --all-namespaces --for=condition=Available --timeout=100s
           kubectl wait pods --all --all-namespaces --for=condition=Ready --timeout=100s

+      - name: "Install local-path provisioner"
+        if: ${{ steps.have-tests.outputs.tests == 'true' }}
+        run: |
+          set -Eeuxo pipefail
+          kubectl apply -f https://raw.githubusercontent.com/rancher/local-path-provisioner/v0.0.31/deploy/local-path-storage.yaml
+          kubectl wait deployments --all --namespace=local-path-storage --for=condition=Available --timeout=100s
+          # https://kubernetes.io/docs/tasks/administer-cluster/change-default-storage-class/
+          kubectl get storageclass
+          kubectl patch storageclass local-path -p '{"metadata": {"annotations":{"storageclass.kubernetes.io/is-default-class":"true"}}}'
+
       - name: "Run image tests"
         if: ${{ steps.have-tests.outputs.tests == 'true' }}
         run: python3 ci/cached-builds/make_test.py --target ${{ inputs.target }}
@@ -449,6 +459,18 @@ jobs:

       # endregion

+      - name: Run OpenShift container tests (in PyTest)
+        if: ${{ steps.have-tests.outputs.tests == 'true' }}
+        run: |
+          set -Eeuxo pipefail
+          poetry run pytest --capture=fd tests/containers -m 'openshift' --image="${{ steps.calculated_vars.outputs.OUTPUT_IMAGE }}"
+        env:
+          # TODO(jdanek): this Testcontainers stuff should not be necessary but currently it has to be there
+          DOCKER_HOST: "unix:///var/run/podman/podman.sock"
+          TESTCONTAINERS_DOCKER_SOCKET_OVERRIDE: "/var/run/podman/podman.sock"
+          # pulling the Ryuk container from docker.io introduces CI flakiness
+          TESTCONTAINERS_RYUK_DISABLED: "true"
+
       # region Trivy vulnerability scan

       - name: Run Trivy vulnerability scanner
diff --git c/README.md i/README.md
index 961e5907..22703ac0 100644
--- c/README.md
+++ i/README.md
@@ -105,7 +105,7 @@ sudo dnf install podman
 systemctl --user start podman.service
 systemctl --user status podman.service
 systemctl --user status podman.socket
-DOCKER_HOST=unix:///run/user/$UID/podman/podman.sock poetry run pytest tests/containers --image quay.io/opendatahub/workbench-images@sha256:e98d19df346e7abb1fa3053f6d41f0d1fa9bab39e49b4cb90b510ca33452c2e4
+DOCKER_HOST=unix:///run/user/$UID/podman/podman.sock poetry run pytest tests/containers -m 'not openshift' --image quay.io/opendatahub/workbench-images@sha256:e98d19df346e7abb1fa3053f6d41f0d1fa9bab39e49b4cb90b510ca33452c2e4

 # Mac OS
 brew install podman
@@ -113,7 +113,7 @@
 podman machine init
 podman machine set --rootful
 sudo podman-mac-helper install
 podman machine start
-poetry run pytest tests/containers --image quay.io/opendatahub/workbench-images@sha256:e98d19df346e7abb1fa3053f6d41f0d1fa9bab39e49b4cb90b510ca33452c2e4
+poetry run pytest tests/containers -m 'not openshift' --image quay.io/opendatahub/workbench-images@sha256:e98d19df346e7abb1fa3053f6d41f0d1fa9bab39e49b4cb90b510ca33452c2e4
 ```

 When using lima on macOS, it might be useful to give yourself access to rootful podman socket
diff --git c/pyproject.toml i/pyproject.toml
index 9271a4c3..6440b123 100644
--- c/pyproject.toml
+++ i/pyproject.toml
@@ -8,6 +8,7 @@ package-mode = false

 [tool.poetry.dependencies]
 python = "~3.12"
+requests = "^2.32.3"

 [tool.poetry.group.dev.dependencies]
diff --git c/pytest.ini i/pytest.ini
index 2b320d7a..aff25089 100644
--- c/pytest.ini
+++ i/pytest.ini
@@ -15,3 +15,5 @@
 log_cli_level = INFO
 log_file = logs/pytest-logs.txt
 log_file_level = DEBUG
+
+markers = openshift
diff --git c/tests/containers/base_image_test.py i/tests/containers/base_image_test.py
index 03f3d9ae..b7e00498 100644
--- c/tests/containers/base_image_test.py
+++ i/tests/containers/base_image_test.py
@@ -11,12 +11,13 @@ import tempfile
 import textwrap
 from typing import TYPE_CHECKING, Any, Callable

-import pytest
 import testcontainers.core.container
 import testcontainers.core.waiting_utils

 from tests.containers import docker_utils

+import pytest
+
 logging.basicConfig(level=logging.DEBUG)
 LOGGER = logging.getLogger(__name__)
@@ -72,7 +73,8 @@ class TestBaseImage:
                 if "not found" in line:
                     unsatisfied_deps.append((dlib, line.strip()))
         assert output
-        print("OUTPUT>", json.dumps({"dir": path, "count_scanned": count_scanned, "unsatisfied": unsatisfied_deps}))
+        print("OUTPUT>",
+              json.dumps({"dir": path, "count_scanned": count_scanned, "unsatisfied": unsatisfied_deps}))

         try:
             container.start()
@@ -105,18 +107,7 @@ class TestBaseImage:
                 with subtests.test(f"{dlib=}"):
                     pytest.fail(f"{dlib=} has unsatisfied dependencies {deps=}")

-    def test_oc_command_runs(self, image: str):
-        container = testcontainers.core.container.DockerContainer(image=image, user=23456, group_add=[0])
-        container.with_command("/bin/sh -c 'sleep infinity'")
-        try:
-            container.start()
-            ecode, output = container.exec(["/bin/sh", "-c", "oc version"])
-        finally:
-            docker_utils.NotebookContainer(container).stop(timeout=0)
-
-        logging.debug(output.decode())
-        assert ecode == 0
-
+    # @pytest.mark.environmentss("docker")
     def test_oc_command_runs_fake_fips(self, image: str, subtests: pytest_subtests.SubTests):
         """Establishes a best-effort fake FIPS environment and attempts to execute `oc` binary in it.
@@ -190,7 +181,8 @@ class TestBaseImage:
             docker_utils.NotebookContainer(container).stop(timeout=0)


-def encode_python_function_execution_command_interpreter(python: str, function: Callable[..., Any], *args: list[Any]) -> list[str]:
+def encode_python_function_execution_command_interpreter(python: str, function: Callable[..., Any], *args: list[Any]) -> \
+        list[str]:
     """Returns a cli command that will run the given Python function encoded inline. All dependencies (imports, ...)
     must be part of function body."""
     code = textwrap.dedent(inspect.getsource(function))
diff --git c/tests/containers/cancellation_token.py i/tests/containers/cancellation_token.py
new file mode 100644
index 00000000..d7d62603
--- /dev/null
+++ i/tests/containers/cancellation_token.py
@@ -0,0 +1,37 @@
+import os
+import threading
+
+
+class CancellationToken:
+    """Flag to signal a thread that it should cancel itself.
+
+    This cooperative cancellation pattern is commonly used in C# and Go.
+    See https://learn.microsoft.com/en-us/dotnet/api/system.threading.cancellationtoken?view=net-9.0
+    """
+
+    def __init__(self):
+        # consider using the wrapt.synchronized decorator
+        # https://github.com/GrahamDumpleton/wrapt/blob/develop/blog/07-the-missing-synchronized-decorator.md
+        self._lock = threading.Lock()
+        self._canceled = False
+        # something selectable avoids having to use short timeout in select
+        self._read_fd, self._write_fd = os.pipe()
+
+    def fileno(self):
+        """This lets us use the token in select() calls"""
+        return self._read_fd
+
+    @property
+    def cancelled(self):
+        with self._lock:
+            return self._canceled
+
+    def cancel(self):
+        with self._lock:
+            os.write(self._write_fd, b'x')
+            self._canceled = True
+
+    def __del__(self):
+        # consider https://docs.python.org/3/library/weakref.html#weakref.finalize
+        with self._lock:
+            os.close(self._read_fd)
+            os.close(self._write_fd)
diff --git c/tests/containers/kubernetes_utils.py i/tests/containers/kubernetes_utils.py
new file mode 100644
index 00000000..66829829
--- /dev/null
+++ i/tests/containers/kubernetes_utils.py
@@ -0,0 +1,473 @@
+from __future__ import annotations
+
+import contextlib
+import functools
+import logging
+import threading
+import time
+import traceback
+import typing
+import socket
+from socket import socket
+from typing import Any, Callable, Generator
+
+import requests
+
+import kubernetes
+import kubernetes.dynamic.exceptions
+import kubernetes.stream.ws_client
+import kubernetes.client.api.core_v1_api
+from kubernetes.dynamic import DynamicClient, ResourceField
+
+import ocp_resources.pod
+import ocp_resources.deployment
+import ocp_resources.service
+import ocp_resources.persistent_volume_claim
+import ocp_resources.project_request
+import ocp_resources.namespace
+import ocp_resources.project_project_openshift_io
+import ocp_resources.resource
+
+from tests.containers import socket_proxy
+
+
+class TestFrameConstants:
+    GLOBAL_POLL_INTERVAL_MEDIUM = 10
+    TIMEOUT_2MIN = 2 * 60
+
+
+logging.basicConfig(level=logging.DEBUG)
+LOGGER = logging.getLogger(__name__)
+
+
+# https://github.com/RedHatQE/openshift-python-wrapper/tree/main/examples
+
+def get_client() -> kubernetes.dynamic.DynamicClient:
+    try:
+        # client = kubernetes.dynamic.DynamicClient(client=kubernetes.config.new_client_from_config())
+        # probably same as above
+        client = ocp_resources.resource.get_client()
+        return client
+    except kubernetes.config.ConfigException as e:
+        # probably bad config
+        logging.error(e)
+    except kubernetes.dynamic.exceptions.UnauthorizedError as e:
+        # wrong or expired credentials
+        logging.error(e)
+    except kubernetes.client.ApiException as e:
+        # unexpected, we catch unauthorized above
+        logging.error(e)
+    except Exception as e:
+        # unexpected error, assert here
+        logging.error(e)
+
+    raise RuntimeError("Failed to instantiate client")
+
+
+def get_username(client: kubernetes.dynamic.DynamicClient) -> str:
+    # can't just access
+    # > client.configuration.username
+    # because we normally auth using tokens, not username and password
+
+    # this is what kubectl does (see kubectl -v8 auth whoami)
+    self_subject_review_resource: kubernetes.dynamic.Resource = client.resources.get(
+        api_version="authentication.k8s.io/v1", kind="SelfSubjectReview"
+    )
+    self_subject_review: kubernetes.dynamic.ResourceInstance = client.create(self_subject_review_resource)
+    username: str = self_subject_review.status.userInfo.username
+    return username
+
+
+class TestKubernetesUtils:
+    def test_get_username(self):
+        client = get_client()
+        username = get_username(client)
+        assert username is not None and len(username) > 0
+
+
+class TestFrame:
+    def __init__[T](self):
+        self.stack: list[tuple[T, Callable[[T], None] | None]] = []
+
+    def defer_resource[T: ocp_resources.resource.Resource](self, resource: T, wait=False,
+                                                           destructor: Callable[[T], None] | None = None) -> T:
+        result = resource.deploy(wait=wait)
+        self.defer(resource, destructor)
+        return result
+
+    def add[T](self, resource: T, destructor: Callable[[T], None] = None) -> T:
+        self.defer(resource, destructor)
+        return resource
+
+    def defer[T](self, resource: T, destructor: Callable[[T], None] = None) -> T:
+        self.stack.append((resource, destructor))
+
+    def destroy(self, wait=False):
+        while self.stack:
+            resource, destructor = self.stack.pop()
+            if destructor is not None:
+                destructor(resource)
+            else:
+                resource.clean_up(wait=wait)
+
+    def __enter__(self) -> TestFrame:
+        return self
+
+    def __exit__(self, exc_type, exc_val, exc_tb):
+        self.destroy(wait=True)
+
+
+class ImageDeployment:
+    def __init__(self, client: kubernetes.dynamic.DynamicClient, image: str):
+        self.client = client
+        self.image = image
+        self.tf = TestFrame()
+
+    def __enter__(self) -> ImageDeployment:
+        return self
+
+    def __exit__(self, exc_type, exc_val, exc_tb):
+        self.tf.destroy()
+
+    def deploy(self, container_name: str) -> None:
+        LOGGER.debug(f"Deploying {self.image}")
+        # custom namespace is necessary, because we cannot assign a SCC to pods created in one of the default namespaces:
+        # default, kube-system, kube-public, openshift-node, openshift-infra, openshift.
+        # https://docs.openshift.com/container-platform/4.17/authentication/managing-security-context-constraints.html#role-based-access-to-ssc_configuring-internal-oauth
+
+        # TODO(jdanek): sort out how we want to work with privileged/unprivileged client
+        # take inspiration from odh-tests
+        ns = create_namespace(privileged_client=True, name=f"test-ns-{container_name}")
+        self.tf.defer_resource(ns)
+
+        pvc = ocp_resources.persistent_volume_claim.PersistentVolumeClaim(
+            name=container_name,
+            namespace=ns.name,
+            accessmodes=ocp_resources.persistent_volume_claim.PersistentVolumeClaim.AccessMode.RWO,
+            volume_mode=ocp_resources.persistent_volume_claim.PersistentVolumeClaim.VolumeMode.FILE,
+            size="1Gi",
+        )
+        self.tf.defer_resource(pvc, wait=True)
+        deployment = ocp_resources.deployment.Deployment(
+            client=self.client,
+            name=container_name,
+            namespace=ns.name,
+            selector={"matchLabels": {"app": container_name}},
+            replicas=1,
+            template={
+                "metadata": {
+                    "annotations": {
+                        # This will result in the container spec having something like below,
+                        # regardless of what kind of namespace this is being run in.
+                        # Keep in mind that `default` is a privileged namespace and this annotation has no effect there.
+                        # ```
+                        # spec:
+                        #   securityContext:
+                        #     seLinuxOptions:
+                        #       level: 's0:c34,c4'
+                        #     fsGroup: 1001130000
+                        #     seccompProfile:
+                        #       type: RuntimeDefault
+                        # ```
+                        "openshift.io/scc": "restricted-v2"
+                    },
+                    "labels": {
+                        "app": container_name,
+                    }
+                },
+                "spec": {
+                    "containers": [
+                        {
+                            "name": container_name,
+                            "image": self.image,
+                            # "command": ["/bin/sh", "-c", "while true ; do date; sleep 5; done;"],
+                            "ports": [
+                                {
+                                    "containerPort": 8888,
+                                    "name": "notebook-port",
+                                    "protocol": "TCP",
+                                }
+                            ],
+                            # rstudio will not start without its volume mount and it does not log the error for it
+                            # See the testcontainers implementation of this (the tty=True part)
+                            "volumeMounts": [
+                                {
+                                    "mountPath": "/opt/app-root/src",
+                                    "name": "my-workbench"
+                                }
+                            ],
+                        },
+                    ],
+                    "volumes": [
+                        {
+                            "name": "my-workbench",
+                            "persistentVolumeClaim": {
+                                "claimName": container_name,
+                            }
+                        }
+                    ]
+                }
+            }
+        )
+        self.tf.defer_resource(deployment)
+        LOGGER.debug(f"Waiting for pods to become ready...")
+        PodUtils.wait_for_pods_ready(self.client, namespace_name=ns.name, label_selector=f"app={container_name}",
+                                     expect_pods_count=1)
+
+        core_v1_api = kubernetes.client.api.core_v1_api.CoreV1Api(api_client=self.client.client)
+        pod_name: kubernetes.client.models.v1_pod_list.V1PodList = core_v1_api.list_namespaced_pod(
+            namespace=ns.name,
+            label_selector=f"app={container_name}"
+        )
+        assert len(pod_name.items) == 1
+        pod: kubernetes.client.models.v1_pod.V1Pod = pod_name.items[0]
+
+        p = socket_proxy.SocketProxy(exposing_contextmanager(core_v1_api, pod), "localhost", 0)
+        t = threading.Thread(target=p.listen_and_serve_until_canceled)
+        t.start()
+        self.tf.defer(t, lambda thread: thread.join())
+        self.tf.defer(p.cancellation_token, lambda token: token.cancel())
+
+        self.port = p.get_actual_port()
+        LOGGER.debug(f"Listening on port {self.port}")
+        resp = requests.get(f"http://localhost:{self.port}")
+        assert resp.status_code == 200
+        LOGGER.debug(f"Done with portforward")
+
+
+class PodUtils:
+    READINESS_TIMEOUT = TestFrameConstants.TIMEOUT_2MIN
+
+    # consider using timeout_sampler
+    @staticmethod
+    def wait_for_pods_ready(
+            client: DynamicClient, namespace_name: str, label_selector: str, expect_pods_count: int
+    ) -> None:
+        """Wait for all pods in namespace to be ready
+        :param client:
+        :param namespace_name: name of the namespace
+        :param label_selector:
+        :param expect_pods_count:
+        """
+
+        # it's a dynamic client with the `resource` parameter already filled in
+        class ResourceType(kubernetes.dynamic.Resource, kubernetes.dynamic.DynamicClient):
+            pass
+
+        resource: ResourceType = client.resources.get(
+            kind=ocp_resources.pod.Pod.kind,
+            api_version=ocp_resources.pod.Pod.api_version,
+        )
+
+        def ready() -> bool:
+            pods = resource.get(namespace=namespace_name, label_selector=label_selector).items
+            if not pods and expect_pods_count == 0:
+                logging.debug("All expected Pods %s in Namespace %s are ready", label_selector, namespace_name)
+                return True
+            if not pods:
+                logging.debug("Pods matching %s/%s are not ready", namespace_name, label_selector)
+                return False
+            if len(pods) != expect_pods_count:
+                logging.debug("Expected Pods %s/%s are not ready", namespace_name, label_selector)
+                return False
+            pod: ResourceField
+            for pod in pods:
+                if not Readiness.is_pod_ready(pod) and not Readiness.is_pod_succeeded(pod):
+                    if not pod.status.containerStatuses:
+                        pod_status = pod.status
+                    else:
+                        pod_status = {cs.name: cs.state for cs in pod.status.containerStatuses}
+
+                    logging.debug("Pod is not ready: %s/%s (%s)",
+                                  namespace_name, pod.metadata.name, pod_status)
+                    return False
+                else:
+                    # check all containers in pods are ready
+                    for cs in pod.status.containerStatuses:
+                        if not (cs.ready or cs.state.get("terminated", {}).get("reason", "") == "Completed"):
+                            logging.debug(
+                                f"Container {cs.name} of Pod {namespace_name}/{pod.metadata.name} not ready ({cs.state=})"
+                            )
+                            return False
+            logging.info("Pods matching %s/%s are ready", namespace_name, label_selector)
+            return True
+
+        Wait.until(
+            description=f"readiness of all Pods matching {label_selector} in Namespace {namespace_name}",
+            poll_interval=TestFrameConstants.GLOBAL_POLL_INTERVAL_MEDIUM,
+            timeout=PodUtils.READINESS_TIMEOUT,
+            ready=ready,
+        )
+
+
+class Wait:
+    @staticmethod
+    def until(
+            description: str,
+            poll_interval: float,
+            timeout: float,
+            ready: Callable[[], bool],
+            on_timeout: Callable[[], None] | None = None,
+    ) -> None:
+        """For every poll (happening once each {@code pollIntervalMs}) checks if supplier {@code ready} is true.
+
+        If yes, the wait is closed. Otherwise, waits another {@code pollIntervalMs} and tries again.
+        Once the wait timeout (specified by {@code timeoutMs}) is reached and the supplier wasn't true until that time,
+        runs the {@code onTimeout} (f.e. print of logs, showing the actual value that was checked inside {@code ready}),
+        and finally throws {@link WaitException}.
+        @param description information about what we are waiting for
+        @param pollIntervalMs poll interval in milliseconds
+        @param timeoutMs timeout specified in milliseconds
+        @param ready {@link BooleanSupplier} containing code, which should be executed each poll,
+            verifying readiness of the particular thing
+        @param onTimeout {@link Runnable} executed once timeout is reached and
+            before the {@link WaitException} is thrown."""
+        logging.info("Waiting for: %s", description)
+        deadline = time.monotonic() + timeout
+
+        exception_message: str | None = None
+        previous_exception_message: str | None = None
+
+        # in case we are polling every 1s, we want to print exception after x tries, not on the first try
+        # for a minutes-long poll interval, 2 will be enough
+        exception_appearance_count: int = 2 if (poll_interval // 60) > 0 else max(int(timeout // poll_interval // 4), 2)
+        exception_count: int = 0
+        new_exception_appearance: int = 0
+
+        stack_trace_error: str | None = None
+
+        while True:
+            try:
+                result: bool = ready()
+            except KeyboardInterrupt:
+                raise  # quick exit if the user gets tired of waiting
+            except Exception as e:
+                exception_message = str(e)
+
+                exception_count += 1
+                new_exception_appearance += 1
+                if (
+                        exception_count == exception_appearance_count
+                        and exception_message is not None
+                        and exception_message == previous_exception_message
+                ):
+                    logging.info(f"While waiting for: {description} exception occurred: {exception_message}")
+                    # log the stacktrace
+                    stack_trace_error = traceback.format_exc()
+                elif (
+                        exception_message is not None
+                        and exception_message != previous_exception_message
+                        and new_exception_appearance == 2
+                ):
+                    previous_exception_message = exception_message
+
+                result = False
+
+            time_left: float = deadline - time.monotonic()
+            if result:
+                return
+            if time_left <= 0:
+                if exception_count > 1:
+                    logging.error("Exception waiting for: %s, %s", description, exception_message)
+
+                if stack_trace_error is not None:
+                    # printing handled stacktrace
+                    logging.error(stack_trace_error)
+                if on_timeout is not None:
+                    on_timeout()
+                wait_exception: WaitException = WaitException(f"Timeout after {timeout} s waiting for {description}")
+                logging.error(wait_exception)
+                raise wait_exception
+
+            sleep_time: float = min(poll_interval, time_left)
+            time.sleep(sleep_time)  # noqa: FCN001
+
+
+class WaitException(Exception):
+    pass
+
+
+class Readiness:
+    @staticmethod
+    def is_pod_ready(pod: ResourceField) -> bool:
+        Utils.check_not_none(value=pod, message="Pod can't be null.")
+
+        condition = ocp_resources.pod.Pod.Condition.READY
+        status = ocp_resources.pod.Pod.Condition.Status.TRUE
+        for cond in pod.get("status", {}).get("conditions", []):
+            if cond["type"] == condition and cond["status"].casefold() == status.casefold():
+                return True
+        return False
+
+    @staticmethod
+    def is_pod_succeeded(pod: ResourceField) -> bool:
+        Utils.check_not_none(value=pod, message="Pod can't be null.")
+        return pod.status is not None and "Succeeded" == pod.status.phase
+
+
+class Utils:
+    @staticmethod
+    def check_not_none(value: Any, message: str) -> None:
+        if value is None:
+            raise ValueError(message)
+
+
+@contextlib.contextmanager
+def exposing_contextmanager(
+        core_v1_api: kubernetes.client.CoreV1Api,
+        pod: kubernetes.client.models.V1Pod
+) -> Generator[socket, None, None]:
+    # If we e.g., specify the wrong port, the pf = portforward() call succeeds,
+    # but pf.connected will later flip to False
+    # we need to check that _everything_ works before moving on
+    pf = None
+    s = None
+    while not pf or not pf.connected or not s:
+        pf: kubernetes.stream.ws_client.PortForward = kubernetes.stream.portforward(
+            api_method=core_v1_api.connect_get_namespaced_pod_portforward,
+            name=pod.metadata.name,
+            namespace=pod.metadata.namespace,
+            ports=",".join(str(p) for p in [8888]),
+        )
+        s: typing.Union[kubernetes.stream.ws_client.PortForward._Port._Socket, socket.socket] | None = pf.socket(8888)
+    assert s, "Failed to establish connection"
+
+    try:
+        yield s
+    finally:
+        s.close()
+        pf.close()
+
+
+@functools.wraps(ocp_resources.namespace.Namespace.__init__)
+def create_namespace(privileged_client: bool = False, *args,
+                     **kwargs) -> ocp_resources.project_project_openshift_io.Project:
+    if not privileged_client:
+        with ocp_resources.project_request.ProjectRequest(*args, **kwargs):
+            project = ocp_resources.project_project_openshift_io.Project(*args, **kwargs)
+            project.wait_for_status(status=project.Status.ACTIVE, timeout=TestFrameConstants.TIMEOUT_2MIN)
+            return project
+    else:
+        with ocp_resources.namespace.Namespace(*args, **kwargs) as ns:
+            ns.wait_for_status(status=ocp_resources.namespace.Namespace.Status.ACTIVE,
+                               timeout=TestFrameConstants.TIMEOUT_2MIN)
+            return ns
+
+
+__all__ = [
+    "get_client",
+    "get_username",
+    "exposing_contextmanager",
+    "create_namespace",
+    "PodUtils",
+    "TestFrame",
+    "TestFrameConstants",
+    "ImageDeployment",
+]
diff --git c/tests/containers/socket_proxy.py i/tests/containers/socket_proxy.py
new file mode 100644
index 00000000..77fc3485
--- /dev/null
+++ i/tests/containers/socket_proxy.py
@@ -0,0 +1,173 @@
+from __future__ import annotations
+
+import contextlib
+import logging
+import socket
+import select
+import threading
+import subprocess
+import typing
+
+from tests.containers.cancellation_token import CancellationToken
+
+"""Proxies kubernetes portforwards to a local port.
+
+This is implemented as a thread running a select() loop and managing the sockets.
+
+There are alternative implementations for this.
+
+1) Run oc port-forward in a subprocess
+   * There isn't a nice way for kubectl to report the port number in a machine-readable way,
+     kubernetes/kubectl#1190 (comment)
+2) Use the socket as is, mount a custom adaptor to the requests library
+   * The code to do this is weird. This is what docker-py does w.r.t. the docker socket.
+     It defines a custom 'http+docker://' protocol, and an adaptor for it, that uses the docker socket.
+3) Implement the proxy using asyncio
+   * There are advantages to asyncio, but since we don't have Python asyncio anywhere else yet,
+     it is probably best to avoid using asyncio.
+
+Out of these, the oc port-forward subprocess is a decent alternative solution.
+"""
+
+
+class SubprocessProxy:
+    #
+    def __init__(self, namespace: str, name: str, port: int):
+        self.namespace = namespace
+        self.name = name
+        self.port = port
+
+    def start(self):
+        self.forwarder = subprocess.Popen(
+            ["kubectl", "port-forward", self.namespace, self.name],
+            text=True,
+        )
+        self.forwarder.communicate()
+
+    def stop(self):
+        self.forwarder.terminate()
+
+
+class SocketProxy:
+    def __init__(
+            self,
+            remote_socket_factory: typing.ContextManager[socket.socket],
+            local_host: str = "localhost",
+            local_port: int = 0,
+            buffer_size: int = 4096
+    ) -> None:
+        """
+        :param local_host: probably "localhost" would make most sense here
+        :param local_port: usually leave at 0, which will make the OS choose a free port
+        :param remote_socket_factory: this is a context manager for kubernetes port forwarding
+        :param buffer_size: do not poke it, leave this at the default value
+        """
+        self.local_host = local_host
+        self.local_port = local_port
+        self.buffer_size = buffer_size
+        self.remote_socket_factory = remote_socket_factory
+
+        self.cancellation_token = CancellationToken()
+
+        self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+        self.server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+        self.server_socket.bind((self.local_host, self.local_port))
+        self.server_socket.listen(1)
+        logging.info(f"Proxy listening on {self.local_host}:{self.local_port}")
+
+    def listen_and_serve_until_canceled(self):
+        """Accepts the client, creates a new socket to the remote, and proxies the data.
+
+        Handles at most one client at a time."""
+        try:
+            while not self.cancellation_token.cancelled:
+                client_socket, addr = self.server_socket.accept()
+                logging.info(f"Accepted connection from {addr[0]}:{addr[1]}")
+                self._handle_client(client_socket)
+        except Exception as e:
+            logging.exception("Proxying failed to listen", exc_info=e)
+            raise
+        finally:
+            self.server_socket.close()
+
+    def get_actual_port(self) -> int:
+        """Returns the port that the proxy is listening on.
+        When port number 0 was passed in, this will return the actual randomly assigned port."""
+        return self.server_socket.getsockname()[1]
+
+    def _handle_client(self, client_socket):
+        with client_socket as _, self.remote_socket_factory as remote_socket:
+            while True:
+                readable, _, _ = select.select([client_socket, remote_socket, self.cancellation_token], [], [])
+
+                if self.cancellation_token.cancelled:
+                    break
+
+                if client_socket in readable:
+                    data = client_socket.recv(self.buffer_size)
+                    if not data:
+                        break
+                    remote_socket.send(data)
+
+                if remote_socket in readable:
+                    data = remote_socket.recv(self.buffer_size)
+                    if not data:
+                        break
+                    client_socket.send(data)
+
+
+if __name__ == "__main__":
+    """Sample application to show how this can work."""
+
+
+    @contextlib.contextmanager
+    def remote_socket_factory():
+        class MockServer(threading.Thread):
+            def __init__(self, local_host: str = "localhost", local_port: int = 0):
+                self.local_host = local_host
+                self.local_port = local_port
+
+                self.is_socket_bound = threading.Event()
+
+                super().__init__()
+
+            def run(self):
+                self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+                self.server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+                self.server_socket.bind((self.local_host, self.local_port))
+                self.server_socket.listen(1)
+                print(f"MockServer listening on {self.local_host}:{self.local_port}")
+                self.is_socket_bound.set()
+
+                client_socket, addr = self.server_socket.accept()
+                logging.info(f"MockServer accepted connection from {addr[0]}:{addr[1]}")
+
+                client_socket.send(b"Hello World\n")
+                client_socket.close()
+
+            def get_actual_port(self):
+                self.is_socket_bound.wait()
+                return self.server_socket.getsockname()[1]
+
+        server = MockServer()
+        server.start()
+
+        client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+        client_socket.connect(("localhost", server.get_actual_port()))
+
+        yield client_socket
+
+        client_socket.close()
+        server.join()
+
+
+    proxy = SocketProxy(remote_socket_factory(), "localhost", 0)
+    thread = threading.Thread(target=proxy.listen_and_serve_until_canceled)
+    thread.start()
+
+    client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+    client_socket.connect(("localhost", proxy.get_actual_port()))
+
+    print(client_socket.recv(1024))  # prints Hello World
+
+    thread.join()
diff --git c/tests/containers/workbenches/workbench_image_test.py i/tests/containers/workbenches/workbench_image_test.py
index cbfb7dae..892f775c 100644
--- c/tests/containers/workbenches/workbench_image_test.py
+++ i/tests/containers/workbenches/workbench_image_test.py
@@ -21,7 +21,7 @@ import testcontainers.core.waiting_utils
 import pytest
 import pytest_subtests

-from tests.containers import docker_utils, podman_machine_utils
+from tests.containers import docker_utils, podman_machine_utils, kubernetes_utils


 class TestWorkbenchImage:
@@ -108,6 +108,18 @@ class TestWorkbenchImage:
         finally:
             docker_utils.NotebookContainer(container).stop(timeout=0)

+    @pytest.mark.openshift
+    def test_image_run_on_openshift(self, image: str):
+        skip_if_not_workbench_image(image)
+
+        client = kubernetes_utils.get_client()
+        print(client)
+
+        username = kubernetes_utils.get_username(client)
+        print(username)
+
+        with kubernetes_utils.ImageDeployment(client, image) as image:
+            image.deploy(container_name="notebook-tests-pod")

 class WorkbenchContainer(testcontainers.core.container.DockerContainer):
     @functools.wraps(testcontainers.core.container.DockerContainer.__init__)
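
A note on the design of the port-forward proxy above: `CancellationToken` implements `fileno()`, so it can be passed straight to `select()` alongside the proxied sockets, and `cancel()` wakes the blocked `select()` immediately by writing to the pipe, instead of the loop polling on a short timeout. A standalone sketch of that pattern, independent of Kubernetes (timings and the listening socket here are illustrative only):

```
# Sketch only: demonstrates the select()-able CancellationToken added in this commit.
import select
import socket
import threading
import time

from tests.containers.cancellation_token import CancellationToken

token = CancellationToken()
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.bind(("localhost", 0))
server.listen(1)

def serve():
    while not token.cancelled:
        # token.fileno() becomes readable as soon as cancel() writes to the pipe,
        # so select() returns immediately on cancellation
        readable, _, _ = select.select([server, token], [], [])
        if token.cancelled:
            break
        if server in readable:
            conn, _addr = server.accept()
            conn.close()

t = threading.Thread(target=serve)
t.start()
time.sleep(0.1)
token.cancel()  # wakes the select() and ends the loop
t.join()
server.close()
```
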