From 046dcf7b20a852584247aa256a0baec915f75441 Mon Sep 17 00:00:00 2001
From: Anton Myagkov
Date: Mon, 21 Oct 2024 17:26:08 +0200
Subject: [PATCH] Implement (un)stage/(un)publish volume according to the CSI spec for block mode (#2269)

doc: https://github.com/ydb-platform/nbs/blob/main/cloud/blockstore/tools/csi_driver/stage-publish-unpublish-unstage-flow.md

1. Implement stage/publish volume according to the CSI spec for block mode
2. Split CSI Driver tests into two targets: csi_sanity tests and e2e tests
3. Parametrize e2e tests with block mode
---
 .../tests/csi_driver/csi_sanity_tests/test.py |  54 +++
 .../tests/csi_driver/csi_sanity_tests/ya.make |  36 ++
 .../tests/csi_driver/e2e_tests/test.py        | 197 +++++++++
 .../tests/csi_driver/e2e_tests/ya.make        |  36 ++
 .../tests/csi_driver/lib/__init__.py          |   0
 .../csi_driver/{test.py => lib/csi_runner.py} | 377 ++++--------
 cloud/blockstore/tests/csi_driver/lib/ya.make |  16 +
 cloud/blockstore/tests/csi_driver/ya.make     |  42 +-
 .../tools/csi_driver/client/main.go           | 115 +++---
 .../tools/csi_driver/internal/driver/node.go  |  63 ++-
 .../csi_driver/internal/driver/node_test.go   |  46 ++-
 11 files changed, 585 insertions(+), 397 deletions(-)
 create mode 100644 cloud/blockstore/tests/csi_driver/csi_sanity_tests/test.py
 create mode 100644 cloud/blockstore/tests/csi_driver/csi_sanity_tests/ya.make
 create mode 100644 cloud/blockstore/tests/csi_driver/e2e_tests/test.py
 create mode 100644 cloud/blockstore/tests/csi_driver/e2e_tests/ya.make
 create mode 100644 cloud/blockstore/tests/csi_driver/lib/__init__.py
 rename cloud/blockstore/tests/csi_driver/{test.py => lib/csi_runner.py} (50%)
 create mode 100644 cloud/blockstore/tests/csi_driver/lib/ya.make

diff --git a/cloud/blockstore/tests/csi_driver/csi_sanity_tests/test.py b/cloud/blockstore/tests/csi_driver/csi_sanity_tests/test.py
new file mode 100644
index 00000000000..31b2b76f59c
--- /dev/null
+++ b/cloud/blockstore/tests/csi_driver/csi_sanity_tests/test.py
@@ -0,0 +1,54 @@
+import pytest
+import subprocess
+import yaml
+
+from pathlib import Path
+
+import yatest.common as common
+import cloud.blockstore.tests.csi_driver.lib.csi_runner as csi
+
+
+@pytest.mark.parametrize('mount_path,volume_access_type,vm_mode',
+                         [("/var/lib/kubelet/pods/123/volumes/kubernetes.io~csi/456/mount", "mount", False),
+                          ("/var/lib/kubelet/plugins/kubernetes.io/csi/volumeDevices/publish/123/456", "block", False),
+                          ("/var/lib/kubelet/pods/123/volumes/kubernetes.io~csi/456/mount", "mount", True)])
+def test_csi_sanity_nbs_backend(mount_path, volume_access_type, vm_mode):
+    env, run = csi.init(vm_mode)
+    backend = "nbs"
+
+    try:
+        CSI_SANITY_BINARY_PATH = common.binary_path("cloud/blockstore/tools/testing/csi-sanity/bin/csi-sanity")
+        mount_dir = Path(mount_path)
+        mount_dir.parent.mkdir(parents=True, exist_ok=True)
+
+        params_file = Path("params.yaml")
+        params_file.write_text(yaml.safe_dump(
+            {
+                "backend": backend,
+                "instanceId": "test-instance-id",
+            }
+        ))
+
+        skipTests = []
+        args = [CSI_SANITY_BINARY_PATH,
+                "-csi.endpoint",
+                env.csi._endpoint,
+                "--csi.mountdir",
+                mount_dir,
+                "-csi.testvolumeparameters",
+                params_file,
+                "-csi.testvolumeaccesstype",
+                volume_access_type,
+                "--ginkgo.skip",
+                '|'.join(skipTests)]
+        subprocess.run(
+            args,
+            check=True,
+            capture_output=True,
+            text=True,
+        )
+    except subprocess.CalledProcessError as e:
+        csi.log_called_process_error(e)
+        raise
+    finally:
+        csi.cleanup_after_test(env)
diff --git a/cloud/blockstore/tests/csi_driver/csi_sanity_tests/ya.make
b/cloud/blockstore/tests/csi_driver/csi_sanity_tests/ya.make new file mode 100644 index 00000000000..fcaf412c0fb --- /dev/null +++ b/cloud/blockstore/tests/csi_driver/csi_sanity_tests/ya.make @@ -0,0 +1,36 @@ +PY3TEST() + +SIZE(MEDIUM) +TIMEOUT(600) + +TEST_SRCS( + test.py +) + +DEPENDS( + cloud/blockstore/tests/csi_driver/lib + cloud/blockstore/tools/csi_driver/cmd/nbs-csi-driver + cloud/blockstore/tools/csi_driver/client + cloud/blockstore/tools/testing/csi-sanity/bin + cloud/blockstore/apps/client + cloud/blockstore/apps/disk_agent + cloud/blockstore/apps/endpoint_proxy + cloud/blockstore/apps/server + contrib/ydb/apps/ydbd +) + +PEERDIR( + cloud/blockstore/tests/csi_driver/lib + cloud/blockstore/config + cloud/blockstore/tests/python/lib + cloud/storage/core/protos + contrib/ydb/core/protos + contrib/ydb/tests/library +) +SET_APPEND(QEMU_INVOKE_TEST YES) +SET_APPEND(QEMU_VIRTIO none) +SET_APPEND(QEMU_ENABLE_KVM True) +SET_APPEND(QEMU_MEM 8G) +INCLUDE(${ARCADIA_ROOT}/cloud/storage/core/tests/recipes/qemu.inc) + +END() diff --git a/cloud/blockstore/tests/csi_driver/e2e_tests/test.py b/cloud/blockstore/tests/csi_driver/e2e_tests/test.py new file mode 100644 index 00000000000..a48d1d5ede8 --- /dev/null +++ b/cloud/blockstore/tests/csi_driver/e2e_tests/test.py @@ -0,0 +1,197 @@ +import logging +import pytest +import subprocess + +from pathlib import Path + +import cloud.blockstore.tests.csi_driver.lib.csi_runner as csi + + +def test_nbs_csi_driver_mounted_disk_protected_from_deletion(): + env, run = csi.init() + try: + volume_name = "example-disk" + volume_size = 10 * 1024 ** 3 + pod_name = "example-pod" + pod_id = "deadbeef" + access_type = "mount" + env.csi.create_volume(name=volume_name, size=volume_size) + env.csi.stage_volume(volume_name, access_type) + env.csi.publish_volume(pod_id, volume_name, pod_name, access_type) + result = run( + "destroyvolume", + "--disk-id", + volume_name, + input=volume_name, + code=1, + ) + logging.info("Stdout: %s", result.stdout) + logging.info("Stderr: %s", result.stderr) + if result.returncode != 1: + raise AssertionError("Destroyvolume must return exit code 1") + assert "E_REJECTED" in result.stdout + except subprocess.CalledProcessError as e: + csi.log_called_process_error(e) + raise + finally: + csi.cleanup_after_test(env, volume_name, access_type, [pod_id]) + + +def test_nbs_csi_driver_volume_stat(): + # Scenario + # 1. create volume and publish volume + # 2. get volume stats and validate output + # 3. create two files in the mounted directory + # 4. get volume stats again and validate output + # 5. check that the difference between used/available bytes is 2 block sizes + # 6. 
check that the difference between used/available inodes is 2
+    env, run = csi.init()
+    try:
+        volume_name = "example-disk"
+        volume_size = 1024 ** 3
+        pod_name = "example-pod"
+        pod_id = "deadbeef"
+        access_type = "mount"
+        env.csi.create_volume(name=volume_name, size=volume_size)
+        env.csi.stage_volume(volume_name, access_type)
+        env.csi.publish_volume(pod_id, volume_name, pod_name, access_type)
+        stats1 = env.csi.volumestats(pod_id, volume_name)
+
+        assert "usage" in stats1
+        usage_array1 = stats1["usage"]
+        assert 2 == len(usage_array1)
+        for usage in usage_array1:
+            usage = usage_array1[0]
+            assert {"unit", "total", "available", "used"} == usage.keys()
+            assert 0 != usage["total"]
+            assert usage["total"] == usage["available"] + usage["used"]
+
+        mount_path = Path("/var/lib/kubelet/pods") / pod_id / "volumes/kubernetes.io~csi" / volume_name / "mount"
+        (mount_path / "test1.file").write_bytes(b"\0")
+        (mount_path / "test2.file").write_bytes(b"\0")
+
+        stats2 = env.csi.volumestats(pod_id, volume_name)
+        assert "usage" in stats2
+        usage_array2 = stats2["usage"]
+        assert 2 == len(usage_array2)
+        for usage in usage_array2:
+            usage = usage_array2[0]
+            assert {"unit", "total", "available", "used"} == usage.keys()
+            assert 0 != usage["total"]
+            assert usage["total"] == usage["available"] + usage["used"]
+
+        bytesUsage1 = usage_array1[0]
+        bytesUsage2 = usage_array2[0]
+        assert 4096 * 2 == bytesUsage1["available"] - bytesUsage2["available"]
+        assert 4096 * 2 == bytesUsage2["used"] - bytesUsage1["used"]
+
+        nodesUsage1 = usage_array1[1]
+        nodesUsage2 = usage_array2[1]
+        assert 2 == nodesUsage1["available"] - nodesUsage2["available"]
+        assert 2 == nodesUsage2["used"] - nodesUsage1["used"]
+    except subprocess.CalledProcessError as e:
+        csi.log_called_process_error(e)
+        raise
+    finally:
+        csi.cleanup_after_test(env, volume_name, access_type, [pod_id])
+
+
+@pytest.mark.parametrize('fs_type', ["ext4", "xfs"])
+def test_node_volume_expand(fs_type):
+    env, run = csi.init()
+    try:
+        volume_name = "example-disk"
+        volume_size = 1024 ** 3
+        pod_name = "example-pod"
+        pod_id = "deadbeef"
+        access_type = "mount"
+        env.csi.create_volume(name=volume_name, size=volume_size)
+
+        env.csi.stage_volume(volume_name, access_type)
+        env.csi.publish_volume(pod_id, volume_name, pod_name, access_type, fs_type)
+
+        new_volume_size = 2 * volume_size
+        env.csi.expand_volume(pod_id, volume_name, new_volume_size, access_type)
+
+        stats = env.csi.volumestats(pod_id, volume_name)
+        assert "usage" in stats
+        usage_array = stats["usage"]
+        assert 2 == len(usage_array)
+        bytes_usage = usage_array[0]
+        assert "total" in bytes_usage
+        # approximate check that total space is around 2GB
+        assert bytes_usage["total"] // 1000 ** 3 == 2
+
+        # check that expand_volume is an idempotent method
+        env.csi.expand_volume(pod_id, volume_name, new_volume_size, access_type)
+    except subprocess.CalledProcessError as e:
+        csi.log_called_process_error(e)
+        raise
+    finally:
+        csi.cleanup_after_test(env, volume_name, access_type, [pod_id])
+
+
+@pytest.mark.parametrize('access_type,vm_mode', [("mount", True), ("mount", False), ("block", False)])
+def test_publish_volume_twice_on_the_same_node(access_type, vm_mode):
+    env, run = csi.init(vm_mode=vm_mode)
+    try:
+        volume_name = "example-disk"
+        volume_size = 1024 ** 3
+        pod_name1 = "example-pod-1"
+        pod_name2 = "example-pod-2"
+        pod_id1 = "deadbeef1"
+        pod_id2 = "deadbeef2"
+        
env.csi.create_volume(name=volume_name, size=volume_size) + env.csi.stage_volume(volume_name, access_type) + env.csi.publish_volume(pod_id1, volume_name, pod_name1, access_type) + env.csi.publish_volume(pod_id2, volume_name, pod_name2, access_type) + except subprocess.CalledProcessError as e: + csi.log_called_process_error(e) + raise + finally: + csi.cleanup_after_test(env, volume_name, access_type, [pod_id1, pod_id2]) + + +# test can be removed after migration of all endpoints to the new format +@pytest.mark.parametrize('access_type', ["mount", "block"]) +def test_restart_kubelet_with_old_format_endpoint(access_type): + env, run = csi.init() + try: + volume_name = "example-disk" + volume_size = 1024 ** 3 + pod_name1 = "example-pod-1" + pod_id1 = "deadbeef1" + env.csi.create_volume(name=volume_name, size=volume_size) + # skip stage to create endpoint with old format + env.csi.publish_volume(pod_id1, volume_name, pod_name1, access_type) + # run stage/publish again to simulate kubelet restart + env.csi.stage_volume(volume_name, access_type) + env.csi.publish_volume(pod_id1, volume_name, pod_name1, access_type) + except subprocess.CalledProcessError as e: + csi.log_called_process_error(e) + raise + finally: + csi.cleanup_after_test(env, volume_name, access_type, [pod_id1]) + + +@pytest.mark.parametrize('access_type', ["mount", "block"]) +def test_restart_kubelet_with_new_format_endpoint(access_type): + env, run = csi.init() + try: + volume_name = "example-disk" + volume_size = 1024 ** 3 + pod_name1 = "example-pod-1" + pod_id1 = "deadbeef1" + env.csi.create_volume(name=volume_name, size=volume_size) + env.csi.stage_volume(volume_name, access_type) + env.csi.publish_volume(pod_id1, volume_name, pod_name1, access_type) + # run stage/publish again to simulate kubelet restart + env.csi.stage_volume(volume_name, access_type) + env.csi.publish_volume(pod_id1, volume_name, pod_name1, access_type) + except subprocess.CalledProcessError as e: + csi.log_called_process_error(e) + raise + finally: + csi.cleanup_after_test(env, volume_name, access_type, [pod_id1]) diff --git a/cloud/blockstore/tests/csi_driver/e2e_tests/ya.make b/cloud/blockstore/tests/csi_driver/e2e_tests/ya.make new file mode 100644 index 00000000000..0c67aa28eda --- /dev/null +++ b/cloud/blockstore/tests/csi_driver/e2e_tests/ya.make @@ -0,0 +1,36 @@ +PY3TEST() + +SIZE(MEDIUM) +TIMEOUT(600) + +TEST_SRCS( + test.py +) + +DEPENDS( + cloud/blockstore/tools/csi_driver/cmd/nbs-csi-driver + cloud/blockstore/tools/csi_driver/client + cloud/blockstore/tests/csi_driver/lib + cloud/blockstore/tools/testing/csi-sanity/bin + cloud/blockstore/apps/client + cloud/blockstore/apps/disk_agent + cloud/blockstore/apps/endpoint_proxy + cloud/blockstore/apps/server + contrib/ydb/apps/ydbd +) + +PEERDIR( + cloud/blockstore/config + cloud/blockstore/tests/csi_driver/lib + cloud/blockstore/tests/python/lib + cloud/storage/core/protos + contrib/ydb/core/protos + contrib/ydb/tests/library +) +SET_APPEND(QEMU_INVOKE_TEST YES) +SET_APPEND(QEMU_VIRTIO none) +SET_APPEND(QEMU_ENABLE_KVM True) +SET_APPEND(QEMU_MEM 8G) +INCLUDE(${ARCADIA_ROOT}/cloud/storage/core/tests/recipes/qemu.inc) + +END() diff --git a/cloud/blockstore/tests/csi_driver/lib/__init__.py b/cloud/blockstore/tests/csi_driver/lib/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/cloud/blockstore/tests/csi_driver/test.py b/cloud/blockstore/tests/csi_driver/lib/csi_runner.py similarity index 50% rename from cloud/blockstore/tests/csi_driver/test.py rename to 
cloud/blockstore/tests/csi_driver/lib/csi_runner.py index 85a51f807a1..fce7f34d8c5 100644 --- a/cloud/blockstore/tests/csi_driver/test.py +++ b/cloud/blockstore/tests/csi_driver/lib/csi_runner.py @@ -1,12 +1,10 @@ import contextlib import logging import os -import pytest import subprocess import tempfile import time import json -import yaml from pathlib import Path @@ -52,66 +50,6 @@ def tear_down(self): self.sockets_temporary_directory.cleanup() -def init(vm_mode: bool = False): - server_config_patch = TServerConfig() - server_config_patch.NbdEnabled = True - endpoints_dir = Path(common.output_path()) / f"endpoints-{hash(common.context.test_name)}" - endpoints_dir.mkdir(exist_ok=True) - server_config_patch.EndpointStorageType = EEndpointStorageType.ENDPOINT_STORAGE_FILE - server_config_patch.EndpointStorageDir = str(endpoints_dir) - server_config_patch.AllowAllRequestsViaUDS = True - # We run inside qemu, so do not need to cleanup - temp_dir = tempfile.TemporaryDirectory(dir="/tmp") - logging.info("Created temporary dir %s", temp_dir.name) - sockets_dir = Path(temp_dir.name) - server_config_patch.UnixSocketPath = str(sockets_dir / "grpc.sock") - server_config_patch.VhostEnabled = True - server_config_patch.NbdDevicePrefix = "/dev/nbd" - server = TServerAppConfig() - server.ServerConfig.CopyFrom(server_config_patch) - server.ServerConfig.ThreadsCount = thread_count() - server.ServerConfig.StrictContractValidation = True - server.KikimrServiceConfig.CopyFrom(TKikimrServiceConfig()) - subprocess.check_call(["modprobe", "nbd"], timeout=20) - env = CsiLoadTest( - sockets_dir=str(sockets_dir), - grpc_unix_socket_path=server_config_patch.UnixSocketPath, - sockets_temporary_directory=temp_dir, - vm_mode=vm_mode, - endpoint="", - server_app_config=server, - storage_config_patches=None, - use_in_memory_pdisks=True, - with_endpoint_proxy=True, - with_netlink=True) - - client_config_path = Path(yatest_common.output_path()) / "client-config.txt" - client_config = TClientAppConfig() - client_config.ClientConfig.CopyFrom(TClientConfig()) - client_config.ClientConfig.RetryTimeout = 1 - client_config.ClientConfig.Host = "localhost" - client_config.ClientConfig.InsecurePort = env.nbs_port - client_config_path.write_text(MessageToString(client_config)) - - def run(*args, **kwargs): - args = [BINARY_PATH, *args, "--config", str(client_config_path)] - script_input = kwargs.get("input") - if script_input is not None: - script_input = script_input + "\n" - - logging.info("running command: %s" % args) - result = subprocess.run( - args, - cwd=kwargs.get("cwd"), - check=False, - capture_output=True, - input=script_input, - text=True, - ) - return result - return env, run - - class NbsCsiDriverRunner: def __init__(self, sockets_dir: str, grpc_unix_socket_path: str, vm_mode: bool): @@ -191,11 +129,13 @@ def create_volume(self, name: str, size: int): def delete_volume(self, name: str): return self._controller_run("deletevolume", "--id", name) - def stage_volume(self, volume_id: str): + def stage_volume(self, volume_id: str, access_type: str): return self._node_run( "stagevolume", "--volume-id", volume_id, + "--access-type", + access_type, ) def unstage_volume(self, volume_id: str): @@ -205,7 +145,13 @@ def unstage_volume(self, volume_id: str): volume_id, ) - def publish_volume(self, pod_id: str, volume_id: str, pod_name: str, fs_type: str = ""): + def publish_volume( + self, + pod_id: str, + volume_id: str, + pod_name: str, + access_type: str, + fs_type: str = ""): return self._node_run( "publishvolume", "--pod-id", 
@@ -216,15 +162,19 @@ def publish_volume(self, pod_id: str, volume_id: str, pod_name: str, fs_type: st pod_name, "--fs-type", fs_type, + "--access-type", + access_type, ) - def unpublish_volume(self, pod_id: str, volume_id: str): + def unpublish_volume(self, pod_id: str, volume_id: str, access_type: str): return self._node_run( "unpublishvolume", "--pod-id", pod_id, "--volume-id", volume_id, + "--access-type", + access_type, ) def stop(self): @@ -244,7 +194,7 @@ def volumestats(self, pod_id: str, volume_id: str): ) return json.loads(ret) - def expand_volume(self, pod_id: str, volume_id: str, size: int): + def expand_volume(self, pod_id: str, volume_id: str, size: int, access_type: str): return self._node_run( "expandvolume", "--pod-id", @@ -253,9 +203,19 @@ def expand_volume(self, pod_id: str, volume_id: str, size: int): volume_id, "--size", str(size), + "--access-type", + access_type, ) +@contextlib.contextmanager +def called_process_error_logged(): + try: + yield + except subprocess.CalledProcessError as e: + log_called_process_error(e) + + def log_called_process_error(exc): logging.error( "Failed %s, stdout: %s, stderr: %s", @@ -266,7 +226,11 @@ def log_called_process_error(exc): ) -def cleanup_after_test(env: CsiLoadTest, volume_name: str = "", pods: list[str] = []): +def cleanup_after_test( + env: CsiLoadTest, + volume_name: str = "", + access_type: str = "mount", + pods: list[str] = []): if env is None: return @@ -275,7 +239,7 @@ def cleanup_after_test(env: CsiLoadTest, volume_name: str = "", pods: list[str] for pod_id in pods: with called_process_error_logged(): - env.csi.unpublish_volume(pod_id, volume_name) + env.csi.unpublish_volume(pod_id, volume_name, access_type) with called_process_error_logged(): env.csi.unstage_volume(volume_name) @@ -285,236 +249,61 @@ def cleanup_after_test(env: CsiLoadTest, volume_name: str = "", pods: list[str] env.tear_down() -@contextlib.contextmanager -def called_process_error_logged(): - try: - yield - except subprocess.CalledProcessError as e: - log_called_process_error(e) - - -def test_nbs_csi_driver_mounted_disk_protected_from_deletion(): - env, run = init() - try: - volume_name = "example-disk" - volume_size = 10 * 1024 ** 3 - pod_name = "example-pod" - pod_id = "deadbeef" - env.csi.create_volume(name=volume_name, size=volume_size) - env.csi.stage_volume(volume_name) - env.csi.publish_volume(pod_id, volume_name, pod_name) - result = run( - "destroyvolume", - "--disk-id", - volume_name, - input=volume_name, - code=1, - ) - logging.info("Stdout: %s", result.stdout) - logging.info("Stderr: %s", result.stderr) - if result.returncode != 1: - raise AssertionError("Destroyvolume must return exit code 1") - assert "E_REJECTED" in result.stdout - except subprocess.CalledProcessError as e: - log_called_process_error(e) - raise - finally: - cleanup_after_test(env, volume_name, [pod_id]) - - -def test_nbs_csi_driver_volume_stat(): - # Scenario - # 1. create volume and publish volume - # 2. get volume stats and validate output - # 3. create two files in the mounted directory - # 4. get volume stats again and validate output - # 5. check that the difference between used/available bytes is 2 block sizes - # 6. 
check that the difference between used/available inodes is 2 - env, run = init() - try: - volume_name = "example-disk" - volume_size = 1024 ** 3 - pod_name = "example-pod" - pod_id = "deadbeef" - env.csi.create_volume(name=volume_name, size=volume_size) - env.csi.stage_volume(volume_name) - env.csi.publish_volume(pod_id, volume_name, pod_name) - stats1 = env.csi.volumestats(pod_id, volume_name) - - assert "usage" in stats1 - usage_array1 = stats1["usage"] - assert 2 == len(usage_array1) - for usage in usage_array1: - usage = usage_array1[0] - assert {"unit", "total", "available", "used"} == usage.keys() - assert 0 != usage["total"] - assert usage["total"] == usage["available"] + usage["used"] - - mount_path = Path("/var/lib/kubelet/pods") / pod_id / "volumes/kubernetes.io~csi" / volume_name / "mount" - (mount_path / "test1.file").write_bytes(b"\0") - (mount_path / "test2.file").write_bytes(b"\0") - - stats2 = env.csi.volumestats(pod_id, volume_name) - assert "usage" in stats2 - usage_array2 = stats2["usage"] - assert 2 == len(usage_array2) - for usage in usage_array2: - usage = usage_array2[0] - assert {"unit", "total", "available", "used"} == usage.keys() - assert 0 != usage["total"] - assert usage["total"] == usage["available"] + usage["used"] - - bytesUsage1 = usage_array1[0] - bytesUsage2 = usage_array2[0] - assert 4096 * 2 == bytesUsage1["available"] - bytesUsage2["available"] - assert 4096 * 2 == bytesUsage2["used"] - bytesUsage1["used"] - - nodesUsage1 = usage_array1[1] - nodesUsage2 = usage_array2[1] - assert 2 == nodesUsage1["available"] - nodesUsage2["available"] - assert 2 == nodesUsage2["used"] - nodesUsage1["used"] - except subprocess.CalledProcessError as e: - log_called_process_error(e) - raise - finally: - cleanup_after_test(env, volume_name, [pod_id]) +def init(vm_mode: bool = False): + server_config_patch = TServerConfig() + server_config_patch.NbdEnabled = True + endpoints_dir = Path(common.output_path()) / f"endpoints-{hash(common.context.test_name)}" + endpoints_dir.mkdir(exist_ok=True) + server_config_patch.EndpointStorageType = EEndpointStorageType.ENDPOINT_STORAGE_FILE + server_config_patch.EndpointStorageDir = str(endpoints_dir) + server_config_patch.AllowAllRequestsViaUDS = True + # We run inside qemu, so do not need to cleanup + temp_dir = tempfile.TemporaryDirectory(dir="/tmp") + logging.info("Created temporary dir %s", temp_dir.name) + sockets_dir = Path(temp_dir.name) + server_config_patch.UnixSocketPath = str(sockets_dir / "grpc.sock") + server_config_patch.VhostEnabled = True + server_config_patch.NbdDevicePrefix = "/dev/nbd" + server = TServerAppConfig() + server.ServerConfig.CopyFrom(server_config_patch) + server.ServerConfig.ThreadsCount = thread_count() + server.ServerConfig.StrictContractValidation = True + server.KikimrServiceConfig.CopyFrom(TKikimrServiceConfig()) + subprocess.check_call(["modprobe", "nbd"], timeout=20) + env = CsiLoadTest( + sockets_dir=str(sockets_dir), + grpc_unix_socket_path=server_config_patch.UnixSocketPath, + sockets_temporary_directory=temp_dir, + vm_mode=vm_mode, + endpoint="", + server_app_config=server, + storage_config_patches=None, + use_in_memory_pdisks=True, + with_endpoint_proxy=True, + with_netlink=True) + client_config_path = Path(yatest_common.output_path()) / "client-config.txt" + client_config = TClientAppConfig() + client_config.ClientConfig.CopyFrom(TClientConfig()) + client_config.ClientConfig.RetryTimeout = 1 + client_config.ClientConfig.Host = "localhost" + client_config.ClientConfig.InsecurePort = 
env.nbs_port + client_config_path.write_text(MessageToString(client_config)) -@pytest.mark.parametrize('mount_path,volume_access_type,vm_mode', - [("/var/lib/kubelet/pods/123/volumes/kubernetes.io~csi/456/mount", "mount", False), - ("/var/lib/kubelet/plugins/kubernetes.io/csi/volumeDevices/publish/123/456", "block", False), - ("/var/lib/kubelet/pods/123/volumes/kubernetes.io~csi/456/mount", "mount", True)]) -def test_csi_sanity_nbs_backend(mount_path, volume_access_type, vm_mode): - env, run = init(vm_mode) - backend = "nbs" + def run(*args, **kwargs): + args = [BINARY_PATH, *args, "--config", str(client_config_path)] + script_input = kwargs.get("input") + if script_input is not None: + script_input = script_input + "\n" - try: - CSI_SANITY_BINARY_PATH = common.binary_path("cloud/blockstore/tools/testing/csi-sanity/bin/csi-sanity") - mount_dir = Path(mount_path) - mount_dir.parent.mkdir(parents=True, exist_ok=True) - - params_file = Path("params.yaml") - params_file.write_text(yaml.safe_dump( - { - "backend": backend, - "instanceId": "test-instance-id", - } - )) - - skipTests = [] - args = [CSI_SANITY_BINARY_PATH, - "-csi.endpoint", - env.csi._endpoint, - "--csi.mountdir", - mount_dir, - "-csi.testvolumeparameters", - params_file, - "-csi.testvolumeaccesstype", - volume_access_type, - "--ginkgo.skip", - '|'.join(skipTests)] - subprocess.run( + logging.info("running command: %s" % args) + result = subprocess.run( args, - check=True, + cwd=kwargs.get("cwd"), + check=False, capture_output=True, + input=script_input, text=True, ) - except subprocess.CalledProcessError as e: - log_called_process_error(e) - raise - finally: - cleanup_after_test(env) - - -@pytest.mark.parametrize('fs_type', ["ext4", "xfs"]) -def test_node_volume_expand(fs_type): - env, run = init() - try: - volume_name = "example-disk" - volume_size = 1024 ** 3 - pod_name = "example-pod" - pod_id = "deadbeef" - env.csi.create_volume(name=volume_name, size=volume_size) - env.csi.stage_volume(volume_name) - env.csi.publish_volume(pod_id, volume_name, pod_name, fs_type) - - new_volume_size = 2 * volume_size - env.csi.expand_volume(pod_id, volume_name, new_volume_size) - - stats = env.csi.volumestats(pod_id, volume_name) - assert "usage" in stats - usage_array = stats["usage"] - assert 2 == len(usage_array) - bytes_usage = usage_array[0] - assert "total" in bytes_usage - # approximate check that total space is around 2GB - assert bytes_usage["total"] // 1000 ** 3 == 2 - - # check that expand_volume is idempotent method - env.csi.expand_volume(pod_id, volume_name, new_volume_size) - except subprocess.CalledProcessError as e: - log_called_process_error(e) - raise - finally: - cleanup_after_test(env, volume_name, [pod_id]) - - -@pytest.mark.parametrize('vm_mode', [True, False]) -def test_publish_volume_twice_on_the_same_node(vm_mode): - env, run = init(vm_mode=vm_mode) - try: - volume_name = "example-disk" - volume_size = 1024 ** 3 - pod_name1 = "example-pod-1" - pod_name2 = "example-pod-2" - pod_id1 = "deadbeef1" - pod_id2 = "deadbeef2" - env.csi.create_volume(name=volume_name, size=volume_size) - env.csi.stage_volume(volume_name) - env.csi.publish_volume(pod_id1, volume_name, pod_name1) - env.csi.publish_volume(pod_id2, volume_name, pod_name2) - except subprocess.CalledProcessError as e: - log_called_process_error(e) - raise - finally: - cleanup_after_test(env, volume_name, [pod_id1, pod_id2]) - - -def test_restart_kubelet_with_old_format_endpoint(): - env, run = init() - try: - volume_name = "example-disk" - volume_size = 1024 ** 3 - 
pod_name1 = "example-pod-1" - pod_id1 = "deadbeef1" - env.csi.create_volume(name=volume_name, size=volume_size) - # skip stage to create endpoint with old format - env.csi.publish_volume(pod_id1, volume_name, pod_name1) - # run stage/publish again to simulate kubelet restart - env.csi.stage_volume(volume_name) - env.csi.publish_volume(pod_id1, volume_name, pod_name1) - except subprocess.CalledProcessError as e: - log_called_process_error(e) - raise - finally: - cleanup_after_test(env, volume_name, [pod_id1]) - - -def test_restart_kubelet_with_new_format_endpoint(): - env, run = init() - try: - volume_name = "example-disk" - volume_size = 1024 ** 3 - pod_name1 = "example-pod-1" - pod_id1 = "deadbeef1" - env.csi.create_volume(name=volume_name, size=volume_size) - env.csi.stage_volume(volume_name) - env.csi.publish_volume(pod_id1, volume_name, pod_name1) - # run stage/publish again to simulate kubelet restart - env.csi.stage_volume(volume_name) - env.csi.publish_volume(pod_id1, volume_name, pod_name1) - except subprocess.CalledProcessError as e: - log_called_process_error(e) - raise - finally: - cleanup_after_test(env, volume_name, [pod_id1]) + return result + return env, run diff --git a/cloud/blockstore/tests/csi_driver/lib/ya.make b/cloud/blockstore/tests/csi_driver/lib/ya.make new file mode 100644 index 00000000000..a61405758d6 --- /dev/null +++ b/cloud/blockstore/tests/csi_driver/lib/ya.make @@ -0,0 +1,16 @@ +PY3_LIBRARY() + +PEERDIR( + cloud/blockstore/config + cloud/blockstore/tests/python/lib + cloud/storage/core/protos + contrib/ydb/core/protos + contrib/ydb/tests/library +) + +PY_SRCS( + __init__.py + csi_runner.py +) + +END() diff --git a/cloud/blockstore/tests/csi_driver/ya.make b/cloud/blockstore/tests/csi_driver/ya.make index 7281364969a..116e71b2516 100644 --- a/cloud/blockstore/tests/csi_driver/ya.make +++ b/cloud/blockstore/tests/csi_driver/ya.make @@ -1,43 +1,9 @@ IF (BUILD_CSI_DRIVER) -PY3TEST() - -SIZE(MEDIUM) -TIMEOUT(600) -REQUIREMENTS( - cpu:4 - ram:16 +RECURSE( + lib + csi_sanity_tests + e2e_tests ) - -TEST_SRCS( - test.py -) - -DEPENDS( - cloud/blockstore/tools/csi_driver/cmd/nbs-csi-driver - cloud/blockstore/tools/csi_driver/client - cloud/blockstore/tools/testing/csi-sanity/bin - cloud/blockstore/apps/client - cloud/blockstore/apps/disk_agent - cloud/blockstore/apps/endpoint_proxy - cloud/blockstore/apps/server - contrib/ydb/apps/ydbd -) - -PEERDIR( - cloud/blockstore/config - cloud/blockstore/tests/python/lib - cloud/storage/core/protos - contrib/ydb/core/protos - contrib/ydb/tests/library -) -SET_APPEND(QEMU_INVOKE_TEST YES) -SET_APPEND(QEMU_VIRTIO none) -SET_APPEND(QEMU_ENABLE_KVM True) -SET_APPEND(QEMU_MEM 8G) -INCLUDE(${ARCADIA_ROOT}/cloud/storage/core/tests/recipes/qemu.inc) - -END() - ENDIF() diff --git a/cloud/blockstore/tools/csi_driver/client/main.go b/cloud/blockstore/tools/csi_driver/client/main.go index 79542a3d836..14230ecd336 100644 --- a/cloud/blockstore/tools/csi_driver/client/main.go +++ b/cloud/blockstore/tools/csi_driver/client/main.go @@ -189,8 +189,39 @@ func newDeleteVolumeCommand(endpoint *string) *cobra.Command { return &cmd } +func createVolumeCapability( + accessMode csi.VolumeCapability_AccessMode_Mode, + accessType string) *csi.VolumeCapability { + volumeCapability := &csi.VolumeCapability{ + AccessMode: &csi.VolumeCapability_AccessMode{Mode: accessMode}, + AccessType: &csi.VolumeCapability_Mount{ + Mount: &csi.VolumeCapability_MountVolume{}, + }, + } + if accessType == "block" { + volumeCapability.AccessType = 
&csi.VolumeCapability_Block{ + Block: &csi.VolumeCapability_BlockVolume{}, + } + } + + return volumeCapability +} + +func getTargetPath(podId string, volumeId string, accessType string) string { + targetPathPattern := "/var/lib/kubelet/pods/%s/volumes/kubernetes.io~csi/%s/mount" + if accessType == "block" { + targetPathPattern = "/var/lib/kubelet/plugins/kubernetes.io/csi/volumeDevices/publish/%s/%s" + } + targetPath := fmt.Sprintf( + targetPathPattern, + podId, + volumeId, + ) + return targetPath +} + func newNodeStageVolumeCommand(endpoint *string) *cobra.Command { - var volumeId, stagingTargetPath string + var volumeId, stagingTargetPath, accessType string cmd := cobra.Command{ Use: "stagevolume", Short: "Send stage volume request to the CSI node", @@ -214,15 +245,8 @@ func newNodeStageVolumeCommand(endpoint *string) *cobra.Command { &csi.NodeStageVolumeRequest{ VolumeId: volumeId, StagingTargetPath: stagingTargetPath, - VolumeCapability: &csi.VolumeCapability{ - AccessType: &csi.VolumeCapability_Mount{ - Mount: nil, - }, - AccessMode: &csi.VolumeCapability_AccessMode{ - Mode: accessMode, - }, - }, - VolumeContext: volumeContext, + VolumeCapability: createVolumeCapability(accessMode, accessType), + VolumeContext: volumeContext, }, ) if err != nil { @@ -245,6 +269,12 @@ func newNodeStageVolumeCommand(endpoint *string) *cobra.Command { "a/globalmount", "staging target path", ) + cmd.Flags().StringVar( + &accessType, + "access-type", + "mount", + "mount or block access type", + ) err := cmd.MarkFlagRequired("volume-id") if err != nil { log.Fatal(err) @@ -254,6 +284,7 @@ func newNodeStageVolumeCommand(endpoint *string) *cobra.Command { func newPublishVolumeCommand(endpoint *string) *cobra.Command { var volumeId, podId, stagingTargetPath, podName, fsType string + var accessType string var readOnly bool cmd := cobra.Command{ Use: "publishvolume", @@ -269,12 +300,6 @@ func newPublishVolumeCommand(endpoint *string) *cobra.Command { log.Fatal(err) } - targetPath := fmt.Sprintf( - "/var/lib/kubelet/pods/%s/volumes/kubernetes.io~csi/"+ - "%s/mount", - podId, - volumeId, - ) volumeContext := map[string]string{ "csi.storage.k8s.io/pod.uid": podId, "csi.storage.k8s.io/serviceAccount.name": "default", @@ -293,18 +318,11 @@ func newPublishVolumeCommand(endpoint *string) *cobra.Command { VolumeId: volumeId, PublishContext: nil, StagingTargetPath: stagingTargetPath, - TargetPath: targetPath, - VolumeCapability: &csi.VolumeCapability{ - AccessType: &csi.VolumeCapability_Mount{ - Mount: nil, - }, - AccessMode: &csi.VolumeCapability_AccessMode{ - Mode: accessMode, - }, - }, - Readonly: false, - Secrets: nil, - VolumeContext: volumeContext, + TargetPath: getTargetPath(podId, volumeId, accessType), + VolumeCapability: createVolumeCapability(accessMode, accessType), + Readonly: false, + Secrets: nil, + VolumeContext: volumeContext, }, ) if err != nil { @@ -346,6 +364,12 @@ func newPublishVolumeCommand(endpoint *string) *cobra.Command { "", "filesystem type: ext4, xfs", ) + cmd.Flags().StringVar( + &accessType, + "access-type", + "mount", + "mount or block access type", + ) err := cmd.MarkFlagRequired("volume-id") if err != nil { @@ -411,7 +435,7 @@ func newNodeUnstageVolumeCommand(endpoint *string) *cobra.Command { } func newUnpublishVolumeCommand(endpoint *string) *cobra.Command { - var volumeId, podId string + var volumeId, podId, accessType string cmd := cobra.Command{ Use: "unpublishvolume", Short: "Send unpublish volume request to the CSI node", @@ -423,17 +447,11 @@ func 
newUnpublishVolumeCommand(endpoint *string) *cobra.Command { defer cancelFunc() client, err := newNodeClient(ctx, *endpoint) - targetPath := fmt.Sprintf( - "/var/lib/kubelet/pods/%s/volumes/kubernetes.io~csi/"+ - "%s/mount", - podId, - volumeId, - ) response, err := client.NodeUnpublishVolume( ctx, &csi.NodeUnpublishVolumeRequest{ VolumeId: volumeId, - TargetPath: targetPath, + TargetPath: getTargetPath(podId, volumeId, accessType), }, ) if err != nil { @@ -450,6 +468,12 @@ func newUnpublishVolumeCommand(endpoint *string) *cobra.Command { "volume id", ) cmd.Flags().StringVar(&podId, "pod-id", "", "pod id") + cmd.Flags().StringVar( + &accessType, + "access-type", + "mount", + "mount or block access type", + ) err := cmd.MarkFlagRequired("volume-id") if err != nil { log.Fatal(err) @@ -520,7 +544,7 @@ func newNodeGetVolumeStatsCommand(endpoint *string) *cobra.Command { //////////////////////////////////////////////////////////////////////////////// func newNodeExpandVolumeCommand(endpoint *string) *cobra.Command { - var volumeId, podId string + var volumeId, podId, accessType string var size int64 cmd := cobra.Command{ Use: "expandvolume", @@ -532,21 +556,16 @@ func newNodeExpandVolumeCommand(endpoint *string) *cobra.Command { ) defer cancelFunc() client, err := newNodeClient(ctx, *endpoint) - - volumePath := fmt.Sprintf( - "/var/lib/kubelet/pods/%s/volumes/kubernetes.io~csi/"+ - "%s/mount", - podId, - volumeId, - ) + accessMode := csi.VolumeCapability_AccessMode_SINGLE_NODE_WRITER _, err = client.NodeExpandVolume( ctx, &csi.NodeExpandVolumeRequest{ VolumeId: volumeId, - VolumePath: volumePath, + VolumePath: getTargetPath(podId, volumeId, accessType), CapacityRange: &csi.CapacityRange{ RequiredBytes: size, }, + VolumeCapability: createVolumeCapability(accessMode, accessType), }, ) if err != nil { @@ -566,6 +585,12 @@ func newNodeExpandVolumeCommand(endpoint *string) *cobra.Command { "size", 0, "The new size of the disk in bytes") + cmd.Flags().StringVar( + &accessType, + "access-type", + "mount", + "mount or block access type", + ) err := cmd.MarkFlagRequired("volume-id") if err != nil { diff --git a/cloud/blockstore/tools/csi_driver/internal/driver/node.go b/cloud/blockstore/tools/csi_driver/internal/driver/node.go index 94acf1f953d..f5bf8e25f3c 100644 --- a/cloud/blockstore/tools/csi_driver/internal/driver/node.go +++ b/cloud/blockstore/tools/csi_driver/internal/driver/node.go @@ -212,6 +212,12 @@ func (s *nodeService) NodeStageVolume( } } case *csi.VolumeCapability_Block: + if nfsBackend { + return nil, s.statusError(codes.InvalidArgument, + "'Block' volume mode is not supported with nfs backend") + } else { + err = s.nodeStageDiskAsBlockDevice(ctx, req) + } default: return nil, s.statusError(codes.InvalidArgument, "Unknown access type") } @@ -771,7 +777,35 @@ func (s *nodeService) nodeStageDiskAsFilesystem( return nil } -func (s *nodeService) nodePublishDiskAsBlockDevice( +func (s *nodeService) nodeStageDiskAsBlockDevice( + ctx context.Context, + req *csi.NodeStageVolumeRequest) error { + + resp, err := s.startNbsEndpointForNBD(ctx, "", req.VolumeId, req.VolumeContext) + if err != nil { + if s.IsMountConflictError(err) { + localEndpoint, err := s.hasLocalEndpoint(ctx, req.VolumeId) + if err != nil { + return err + } + if localEndpoint { + return nil + } + } + return fmt.Errorf("failed to start NBS endpoint: %w", err) + } + + if resp.NbdDeviceFile == "" { + return fmt.Errorf("NbdDeviceFile shouldn't be empty") + } + + logVolume(req.VolumeId, "endpoint started with device: %q", 
resp.NbdDeviceFile)
+
+	devicePath := filepath.Join(req.StagingTargetPath, req.VolumeId)
+	return s.mountBlockDevice(req.VolumeId, resp.NbdDeviceFile, devicePath)
+}
+
+func (s *nodeService) nodePublishDiskAsBlockDeviceDeprecated(
 	ctx context.Context,
 	req *csi.NodePublishVolumeRequest) error {

@@ -788,6 +822,20 @@ func (s *nodeService) nodePublishDiskAsBlockDevice(
 	return s.mountBlockDevice(req.VolumeId, resp.NbdDeviceFile, req.TargetPath)
 }

+func (s *nodeService) nodePublishDiskAsBlockDevice(
+	ctx context.Context,
+	req *csi.NodePublishVolumeRequest) error {
+
+	devicePath := filepath.Join(req.StagingTargetPath, req.VolumeId)
+	mounted, _ := s.mounter.IsMountPoint(devicePath)
+	if !mounted {
+		// Fallback to previous implementation for already staged volumes
+		return s.nodePublishDiskAsBlockDeviceDeprecated(ctx, req)
+	}
+
+	return s.mountBlockDevice(req.VolumeId, devicePath, req.TargetPath)
+}
+
 func (s *nodeService) startNbsEndpointForNBD(
 	ctx context.Context,
 	instanceId string,
@@ -905,7 +953,16 @@ func (s *nodeService) nodeUnstageVolume(
 	ctx context.Context,
 	req *csi.NodeUnstageVolumeRequest) error {

-	mounted, _ := s.mounter.IsMountPoint(req.StagingTargetPath)
+	// Check mount points for both StagingTargetPath and
+	// StagingTargetPath/VolumeId, as it's not possible to distinguish
+	// between mount and block mode from the request parameters
+	mountPoint := req.StagingTargetPath
+	mounted, _ := s.mounter.IsMountPoint(mountPoint)
+	if !mounted {
+		mountPoint = filepath.Join(req.StagingTargetPath, req.VolumeId)
+		mounted, _ = s.mounter.IsMountPoint(mountPoint)
+	}
+
 	if !mounted {
 		// Fallback to previous implementation for already mounted volumes to
 		// stop endpoint in nodeUnpublishVolume
@@ -913,7 +970,7 @@ func (s *nodeService) nodeUnstageVolume(
 		return nil
 	}

-	if err := s.mounter.CleanupMountPoint(req.StagingTargetPath); err != nil {
+	if err := s.mounter.CleanupMountPoint(mountPoint); err != nil {
 		return err
 	}

diff --git a/cloud/blockstore/tools/csi_driver/internal/driver/node_test.go b/cloud/blockstore/tools/csi_driver/internal/driver/node_test.go
index fe0a5be6e2e..4b804d9954c 100644
--- a/cloud/blockstore/tools/csi_driver/internal/driver/node_test.go
+++ b/cloud/blockstore/tools/csi_driver/internal/driver/node_test.go
@@ -549,13 +549,15 @@ func TestPublishUnpublishDeviceForInfrakuber(t *testing.T) {
 	clientID := "testClientId"
 	podID := "test-pod-id-13"
 	diskID := "test-disk-id-42"
-	actualClientId := "testClientId-test-pod-id-13"
+	actualClientId := "testClientId-testNodeId"
 	targetPath := filepath.Join(tempDir, "volumeDevices", "publish", diskID, podID)
 	targetBlkPathPattern := filepath.Join(tempDir, "volumeDevices/publish/([a-z0-9-]+)/([a-z0-9-]+)")
 	stagingTargetPath := "testStagingTargetPath"
+	stagingDevicePath := filepath.Join(stagingTargetPath, diskID)
 	socketsDir := filepath.Join(tempDir, "sockets")
-	sourcePath := filepath.Join(socketsDir, podID, diskID)
-	socketPath := filepath.Join(sourcePath, "nbs.sock")
+	sourcePath := filepath.Join(socketsDir, diskID)
+	socketPath := filepath.Join(socketsDir, diskID, "nbs.sock")
+	deprecatedSocketPath := filepath.Join(socketsDir, podID, diskID, "nbs.sock")

 	nodeService := newNodeService(
 		nodeID,
@@ -580,19 +582,11 @@ func TestPublishUnpublishDeviceForInfrakuber(t *testing.T) {

 	volumeContext := map[string]string{}

-	_, err = nodeService.NodeStageVolume(ctx, &csi.NodeStageVolumeRequest{
-		VolumeId:          diskID,
-		StagingTargetPath: stagingTargetPath,
-		VolumeCapability:  &volumeCapability,
-		VolumeContext:     volumeContext,
-	})
-	require.NoError(t, err)
-
 	hostType
:= nbs.EHostType_HOST_TYPE_DEFAULT nbsClient.On("StartEndpoint", ctx, &nbs.TStartEndpointRequest{ UnixSocketPath: socketPath, DiskId: diskID, - InstanceId: podID, + InstanceId: nodeID, ClientId: actualClientId, DeviceName: diskID, IpcType: ipcType, @@ -610,9 +604,20 @@ func TestPublishUnpublishDeviceForInfrakuber(t *testing.T) { NbdDeviceFile: nbdDeviceFile, }, nil) - mockCallIsMountPoint := mounter.On("IsMountPoint", targetPath).Return(false, nil) + mockCallIsMountPointStagingPath := mounter.On("IsMountPoint", stagingDevicePath).Return(true, nil) + mockCallMount := mounter.On("Mount", nbdDeviceFile, stagingDevicePath, "", []string{"bind"}).Return(nil) + + _, err = nodeService.NodeStageVolume(ctx, &csi.NodeStageVolumeRequest{ + VolumeId: diskID, + StagingTargetPath: stagingTargetPath, + VolumeCapability: &volumeCapability, + VolumeContext: volumeContext, + }) + require.NoError(t, err) + mockCallMount.Unset() - mounter.On("Mount", nbdDeviceFile, targetPath, "", []string{"bind"}).Return(nil) + mockCallIsMountPointTargetPath := mounter.On("IsMountPoint", targetPath).Return(false, nil) + mounter.On("Mount", stagingDevicePath, targetPath, "", []string{"bind"}).Return(nil) _, err = nodeService.NodePublishVolume(ctx, &csi.NodePublishVolumeRequest{ VolumeId: diskID, @@ -623,6 +628,9 @@ func TestPublishUnpublishDeviceForInfrakuber(t *testing.T) { }) require.NoError(t, err) + mockCallIsMountPointStagingPath.Unset() + mockCallIsMountPointTargetPath.Unset() + fileInfo, err := os.Stat(sourcePath) assert.False(t, os.IsNotExist(err)) assert.True(t, fileInfo.IsDir()) @@ -641,7 +649,7 @@ func TestPublishUnpublishDeviceForInfrakuber(t *testing.T) { mounter.On("CleanupMountPoint", targetPath).Return(nil) nbsClient.On("StopEndpoint", ctx, &nbs.TStopEndpointRequest{ - UnixSocketPath: socketPath, + UnixSocketPath: deprecatedSocketPath, }).Return(&nbs.TStopEndpointResponse{}, nil) _, err = nodeService.NodeUnpublishVolume(ctx, &csi.NodeUnpublishVolumeRequest{ @@ -650,12 +658,16 @@ func TestPublishUnpublishDeviceForInfrakuber(t *testing.T) { }) require.NoError(t, err) - mockCallIsMountPoint.Unset() - _, err = os.Stat(filepath.Join(socketsDir, podID)) assert.True(t, os.IsNotExist(err)) mounter.On("IsMountPoint", stagingTargetPath).Return(false, nil) + mounter.On("IsMountPoint", stagingDevicePath).Return(true, nil) + mounter.On("CleanupMountPoint", stagingDevicePath).Return(nil) + + nbsClient.On("StopEndpoint", ctx, &nbs.TStopEndpointRequest{ + UnixSocketPath: socketPath, + }).Return(&nbs.TStopEndpointResponse{}, nil) _, err = nodeService.NodeUnstageVolume(ctx, &csi.NodeUnstageVolumeRequest{ VolumeId: diskID,