Skip to content

Commit

Permalink
Implement (un)stage/(un)publish volume according csi spec for block m…
Browse files Browse the repository at this point in the history
…ode (#2269)

doc: https://github.com/ydb-platform/nbs/blob/main/cloud/blockstore/tools/csi_driver/stage-publish-unpublish-unstage-flow.md

1. Implement stage/publish volume according csi spec for block mode
2. Split CSI Driver tests into two targets: csi_sanity tests and e2e tests
3. Parametrize e2e tests with block mode
  • Loading branch information
antonmyagkov authored Oct 21, 2024
1 parent 1fee614 commit 046dcf7
Show file tree
Hide file tree
Showing 11 changed files with 585 additions and 397 deletions.
54 changes: 54 additions & 0 deletions cloud/blockstore/tests/csi_driver/csi_sanity_tests/test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
import pytest
import subprocess
import yaml

from pathlib import Path

import yatest.common as common
import cloud.blockstore.tests.csi_driver.lib.csi_runner as csi


@pytest.mark.parametrize('mount_path,volume_access_type,vm_mode',
[("/var/lib/kubelet/pods/123/volumes/kubernetes.io~csi/456/mount", "mount", False),
("/var/lib/kubelet/plugins/kubernetes.io/csi/volumeDevices/publish/123/456", "block", False),
("/var/lib/kubelet/pods/123/volumes/kubernetes.io~csi/456/mount", "mount", True)])
def test_csi_sanity_nbs_backend(mount_path, volume_access_type, vm_mode):
env, run = csi.init(vm_mode)
backend = "nbs"

try:
CSI_SANITY_BINARY_PATH = common.binary_path("cloud/blockstore/tools/testing/csi-sanity/bin/csi-sanity")
mount_dir = Path(mount_path)
mount_dir.parent.mkdir(parents=True, exist_ok=True)

params_file = Path("params.yaml")
params_file.write_text(yaml.safe_dump(
{
"backend": backend,
"instanceId": "test-instance-id",
}
))

skipTests = []
args = [CSI_SANITY_BINARY_PATH,
"-csi.endpoint",
env.csi._endpoint,
"--csi.mountdir",
mount_dir,
"-csi.testvolumeparameters",
params_file,
"-csi.testvolumeaccesstype",
volume_access_type,
"--ginkgo.skip",
'|'.join(skipTests)]
subprocess.run(
args,
check=True,
capture_output=True,
text=True,
)
except subprocess.CalledProcessError as e:
csi.log_called_process_error(e)
raise
finally:
csi.cleanup_after_test(env)
36 changes: 36 additions & 0 deletions cloud/blockstore/tests/csi_driver/csi_sanity_tests/ya.make
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
PY3TEST()

SIZE(MEDIUM)
TIMEOUT(600)

TEST_SRCS(
test.py
)

DEPENDS(
cloud/blockstore/tests/csi_driver/lib
cloud/blockstore/tools/csi_driver/cmd/nbs-csi-driver
cloud/blockstore/tools/csi_driver/client
cloud/blockstore/tools/testing/csi-sanity/bin
cloud/blockstore/apps/client
cloud/blockstore/apps/disk_agent
cloud/blockstore/apps/endpoint_proxy
cloud/blockstore/apps/server
contrib/ydb/apps/ydbd
)

PEERDIR(
cloud/blockstore/tests/csi_driver/lib
cloud/blockstore/config
cloud/blockstore/tests/python/lib
cloud/storage/core/protos
contrib/ydb/core/protos
contrib/ydb/tests/library
)
SET_APPEND(QEMU_INVOKE_TEST YES)
SET_APPEND(QEMU_VIRTIO none)
SET_APPEND(QEMU_ENABLE_KVM True)
SET_APPEND(QEMU_MEM 8G)
INCLUDE(${ARCADIA_ROOT}/cloud/storage/core/tests/recipes/qemu.inc)

END()
197 changes: 197 additions & 0 deletions cloud/blockstore/tests/csi_driver/e2e_tests/test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,197 @@
import logging
import pytest
import subprocess

from pathlib import Path

import cloud.blockstore.tests.csi_driver.lib.csi_runner as csi


def test_nbs_csi_driver_mounted_disk_protected_from_deletion():
env, run = csi.init()
try:
volume_name = "example-disk"
volume_size = 10 * 1024 ** 3
pod_name = "example-pod"
pod_id = "deadbeef"
access_type = "mount"
env.csi.create_volume(name=volume_name, size=volume_size)
env.csi.stage_volume(volume_name, access_type)
env.csi.publish_volume(pod_id, volume_name, pod_name, access_type)
result = run(
"destroyvolume",
"--disk-id",
volume_name,
input=volume_name,
code=1,
)
logging.info("Stdout: %s", result.stdout)
logging.info("Stderr: %s", result.stderr)
if result.returncode != 1:
raise AssertionError("Destroyvolume must return exit code 1")
assert "E_REJECTED" in result.stdout
except subprocess.CalledProcessError as e:
csi.log_called_process_error(e)
raise
finally:
csi.cleanup_after_test(env, volume_name, access_type, [pod_id])


def test_nbs_csi_driver_volume_stat():
# Scenario
# 1. create volume and publish volume
# 2. get volume stats and validate output
# 3. create two files in the mounted directory
# 4. get volume stats again and validate output
# 5. check that the difference between used/available bytes is 2 block sizes
# 6. check that the difference between used/available inodes is 2
env, run = csi.init()
try:
volume_name = "example-disk"
volume_size = 1024 ** 3
pod_name = "example-pod"
pod_id = "deadbeef"
access_type = "mount"
env.csi.create_volume(name=volume_name, size=volume_size)
env.csi.stage_volume(volume_name, access_type)
env.csi.publish_volume(pod_id, volume_name, pod_name, access_type)
stats1 = env.csi.volumestats(pod_id, volume_name)

assert "usage" in stats1
usage_array1 = stats1["usage"]
assert 2 == len(usage_array1)
for usage in usage_array1:
usage = usage_array1[0]
assert {"unit", "total", "available", "used"} == usage.keys()
assert 0 != usage["total"]
assert usage["total"] == usage["available"] + usage["used"]

mount_path = Path("/var/lib/kubelet/pods") / pod_id / "volumes/kubernetes.io~csi" / volume_name / "mount"
(mount_path / "test1.file").write_bytes(b"\0")
(mount_path / "test2.file").write_bytes(b"\0")

stats2 = env.csi.volumestats(pod_id, volume_name)
assert "usage" in stats2
usage_array2 = stats2["usage"]
assert 2 == len(usage_array2)
for usage in usage_array2:
usage = usage_array2[0]
assert {"unit", "total", "available", "used"} == usage.keys()
assert 0 != usage["total"]
assert usage["total"] == usage["available"] + usage["used"]

bytesUsage1 = usage_array1[0]
bytesUsage2 = usage_array2[0]
assert 4096 * 2 == bytesUsage1["available"] - bytesUsage2["available"]
assert 4096 * 2 == bytesUsage2["used"] - bytesUsage1["used"]

nodesUsage1 = usage_array1[1]
nodesUsage2 = usage_array2[1]
assert 2 == nodesUsage1["available"] - nodesUsage2["available"]
assert 2 == nodesUsage2["used"] - nodesUsage1["used"]
except subprocess.CalledProcessError as e:
csi.log_called_process_error(e)
raise
finally:
csi.cleanup_after_test(env, volume_name, access_type, [pod_id])


@pytest.mark.parametrize('fs_type', ["ext4", "xfs"])
def test_node_volume_expand(fs_type):
env, run = csi.init()
try:
volume_name = "example-disk"
volume_size = 1024 ** 3
pod_name = "example-pod"
pod_id = "deadbeef"
access_type = "mount"
env.csi.create_volume(name=volume_name, size=volume_size)

env.csi.stage_volume(volume_name, access_type)
env.csi.publish_volume(pod_id, volume_name, pod_name, fs_type)
env.csi.stage_volume(volume_name, access_type)
env.csi.publish_volume(pod_id, volume_name, pod_name, access_type, fs_type)

new_volume_size = 2 * volume_size
env.csi.expand_volume(pod_id, volume_name, new_volume_size, access_type)

stats = env.csi.volumestats(pod_id, volume_name)
assert "usage" in stats
usage_array = stats["usage"]
assert 2 == len(usage_array)
bytes_usage = usage_array[0]
assert "total" in bytes_usage
# approximate check that total space is around 2GB
assert bytes_usage["total"] // 1000 ** 3 == 2

# check that expand_volume is idempotent method
env.csi.expand_volume(pod_id, volume_name, new_volume_size, access_type)
except subprocess.CalledProcessError as e:
csi.log_called_process_error(e)
raise
finally:
csi.cleanup_after_test(env, volume_name, access_type, [pod_id])


@pytest.mark.parametrize('access_type,vm_mode', [("mount", True), ("mount", False), ("block", False)])
def test_publish_volume_twice_on_the_same_node(access_type, vm_mode):
env, run = csi.init(vm_mode=vm_mode)
try:
volume_name = "example-disk"
volume_size = 1024 ** 3
pod_name1 = "example-pod-1"
pod_name2 = "example-pod-2"
pod_id1 = "deadbeef1"
pod_id2 = "deadbeef2"
env.csi.create_volume(name=volume_name, size=volume_size)
env.csi.stage_volume(volume_name, access_type)
env.csi.publish_volume(pod_id1, volume_name, pod_name1, access_type)
env.csi.publish_volume(pod_id2, volume_name, pod_name2, access_type)
except subprocess.CalledProcessError as e:
csi.log_called_process_error(e)
raise
finally:
csi.cleanup_after_test(env, volume_name, access_type, [pod_id1, pod_id2])


# test can be removed after migration of all endpoints to the new format
@pytest.mark.parametrize('access_type', ["mount", "block"])
def test_restart_kubelet_with_old_format_endpoint(access_type):
env, run = csi.init()
try:
volume_name = "example-disk"
volume_size = 1024 ** 3
pod_name1 = "example-pod-1"
pod_id1 = "deadbeef1"
env.csi.create_volume(name=volume_name, size=volume_size)
# skip stage to create endpoint with old format
env.csi.publish_volume(pod_id1, volume_name, pod_name1, access_type)
# run stage/publish again to simulate kubelet restart
env.csi.stage_volume(volume_name, access_type)
env.csi.publish_volume(pod_id1, volume_name, pod_name1, access_type)
except subprocess.CalledProcessError as e:
csi.log_called_process_error(e)
raise
finally:
csi.cleanup_after_test(env, volume_name, access_type, [pod_id1])


@pytest.mark.parametrize('access_type', ["mount", "block"])
def test_restart_kubelet_with_new_format_endpoint(access_type):
env, run = csi.init()
try:
volume_name = "example-disk"
volume_size = 1024 ** 3
pod_name1 = "example-pod-1"
pod_id1 = "deadbeef1"
env.csi.create_volume(name=volume_name, size=volume_size)
env.csi.stage_volume(volume_name, access_type)
env.csi.publish_volume(pod_id1, volume_name, pod_name1, access_type)
# run stage/publish again to simulate kubelet restart
env.csi.stage_volume(volume_name, access_type)
env.csi.publish_volume(pod_id1, volume_name, pod_name1, access_type)
except subprocess.CalledProcessError as e:
csi.log_called_process_error(e)
raise
finally:
csi.cleanup_after_test(env, volume_name, access_type, [pod_id1])
36 changes: 36 additions & 0 deletions cloud/blockstore/tests/csi_driver/e2e_tests/ya.make
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
PY3TEST()

SIZE(MEDIUM)
TIMEOUT(600)

TEST_SRCS(
test.py
)

DEPENDS(
cloud/blockstore/tools/csi_driver/cmd/nbs-csi-driver
cloud/blockstore/tools/csi_driver/client
cloud/blockstore/tests/csi_driver/lib
cloud/blockstore/tools/testing/csi-sanity/bin
cloud/blockstore/apps/client
cloud/blockstore/apps/disk_agent
cloud/blockstore/apps/endpoint_proxy
cloud/blockstore/apps/server
contrib/ydb/apps/ydbd
)

PEERDIR(
cloud/blockstore/config
cloud/blockstore/tests/csi_driver/lib
cloud/blockstore/tests/python/lib
cloud/storage/core/protos
contrib/ydb/core/protos
contrib/ydb/tests/library
)
SET_APPEND(QEMU_INVOKE_TEST YES)
SET_APPEND(QEMU_VIRTIO none)
SET_APPEND(QEMU_ENABLE_KVM True)
SET_APPEND(QEMU_MEM 8G)
INCLUDE(${ARCADIA_ROOT}/cloud/storage/core/tests/recipes/qemu.inc)

END()
Empty file.
Loading

0 comments on commit 046dcf7

Please sign in to comment.