dbench workflow
Signed-off-by: Manimaran M <[email protected]>
Manimaran-MM committed Jan 29, 2025
1 parent 61a480b commit 3601117
Showing 2 changed files with 132 additions and 0 deletions.
68 changes: 68 additions & 0 deletions tests/cephfs/cephfs_utilsV1.py
@@ -105,6 +105,7 @@ def prepare_clients(self, clients, build):
"gcc",
"python3-devel",
"git",
"postgresql postgresql-server postgresql-contrib",
]
if build.endswith("7") or build.startswith("3"):
pkgs.extend(
@@ -126,6 +127,23 @@ def prepare_clients(self, clients, build):
client.node.exec_command(
cmd="git clone https://github.com/bengland2/smallfile.git"
)

out, rc = client.node.exec_command(
sudo=True, cmd="rpm -qa | grep -w 'dbench'", check_ec=False
)
if "dbench" not in out:
log.info("Installing dbench")
client.node.exec_command(
sudo=True,
cmd=(
"dnf config-manager "
"--add-repo=https://download.fedoraproject.org/pub/epel/9/Everything/x86_64/"
),
)
client.node.exec_command(
sudo=True,
cmd="dnf install dbench -y --nogpgcheck",
)
if (
hasattr(clients[0].node, "vm_node")
and clients[0].node.vm_node.node_type == "baremetal"
@@ -3180,11 +3198,44 @@ def dd():
long_running=True,
)

def dbench():
log.info("IO tool scheduled : dbench")
io_params = {
"clients": random.choice(
range(8, 33, 8) # Randomly selects 8, 16, 24, or 32 for clients
),
"duration": random.choice(
range(60, 601, 60)
), # Randomly selects 60, 120, ..., 600 seconds
"testdir_prefix": "dbench_io_dir",
}
if kwargs.get("dbench_params"):
dbench_params = kwargs.get("dbench_params")
for io_param in io_params:
if dbench_params.get(io_param):
io_params[io_param] = dbench_params[io_param]

dir_suffix = "".join(
[
random.choice(string.ascii_lowercase + string.digits)
for _ in range(4)
]
)
io_path = f"{mounting_dir}/{io_params['testdir_prefix']}_{dir_suffix}"
client.exec_command(sudo=True, cmd=f"mkdir {io_path}")

client.exec_command(
sudo=True,
cmd=f"dbench {io_params['clients']} -t {io_params['duration']} -D {io_path}",
long_running=True,
)

io_tool_map = {
"dd": dd,
"smallfile": smallfile,
"wget": wget,
"file_extract": file_extract,
"dbench": dbench,
}

log.info(f"IO tools planned to run : {io_tools}")
@@ -3657,6 +3708,23 @@ def get_ceph_health_status(self, client, validate=True, status=["HEALTH_OK"]):
log.info("Ceph Cluster is Healthy")
return health_status

def monitor_ceph_health(self, client, retry, interval):
"""
Monitors Ceph health and logs the FS status at regular intervals
Args:
client : client node.
retry : Number of health-check iterations to run
interval : Time to wait between iterations (in seconds)
Return:
None. Logs the Ceph health and FS status on each iteration.
"""
for i in range(1, retry + 1):
log.info(f"Running health status: Iteration: {i}")
self.get_ceph_health_status(client)
fs_status_info = self.get_fs_status_dump(client)
log.info(f"FS Status: {fs_status_info}")
time.sleep(interval)

@retry(CommandFailed, tries=10, delay=30)
def wait_for_host_online(self, client1, node):
out, rc = client1.exec_command(sudo=True, cmd="ceph orch host ls -f json")
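
The new monitor_ceph_health helper is driven from the second changed file in this commit (the FS lifecycle test below); a minimal standalone sketch of the call, assuming an FsUtils instance (fs_util) and a connected client node (client1) as used throughout this suite:

# Hypothetical standalone usage; fs_util and client1 are assumed fixtures,
# matching what the test below sets up. Checks cluster health and logs
# "ceph fs status" four times, waiting 10 seconds between iterations.
fs_util.monitor_ceph_health(client=client1, retry=4, interval=10)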
@@ -4,6 +4,7 @@
import traceback

from ceph.ceph import CommandFailed
from ceph.parallel import parallel
from tests.cephfs.cephfs_utilsV1 import FsUtils
from tests.cephfs.cephfs_volume_management import wait_for_process
from utility.log import Log
@@ -68,6 +69,11 @@ def run(ceph_cluster, **kw):
"subvol_name": f"{subvolume_name}_3",
"group_name": subvolume_group_name,
},
{
"vol_name": fs_name,
"subvol_name": f"{subvolume_name}_4",
"group_name": subvolume_group_name,
},
]

# Run the FS lifecycle for 5 iterations
@@ -202,6 +208,58 @@ def run(ceph_cluster, **kw):
long_running=True,
)

# Mount subvolume_4 on the kernel client and run dbench
log.info(f"Mount subvolume {subvolume_name}_4 on Kernel Client")
kernel_mount_dir_dbench = f"/mnt/cephfs_kernel{mounting_dir}_4/"
mon_node_ips = fs_util.get_mon_node_ips()
subvol_path_kernel, rc = client1.exec_command(
sudo=True,
cmd=f"ceph fs subvolume getpath {fs_name} {subvolume_name}_4 {subvolume_group_name}",
)
fs_util.kernel_mount(
[client1],
kernel_mount_dir_dbench,
",".join(mon_node_ips),
sub_dir=f"{subvol_path_kernel.strip()}",
extra_params=f",fs={fs_name}",
)

with parallel() as p:
log.info("Start Writing IO on the subvolume using dbench")

p.spawn(
fs_util.run_ios_V1,
client=client1,
mounting_dir=kernel_mount_dir_dbench,
io_tools=["dbench"],
dbench_params={"duration": 40},
)

# Spawn the health monitoring task
p.spawn(
fs_util.monitor_ceph_health,
client=client1,
retry=4,
interval=10, # Set the interval for health checks during parallel operation
)

# Get ceph fs dump output
fs_dump_dict = fs_util.collect_fs_dump_for_validation(client1, fs_name)
log.debug(f"Output of FS dump: {fs_dump_dict}")

# Get ceph fs get of specific volume
fs_get_dict = fs_util.collect_fs_get_for_validation(client1, fs_name)
log.debug(f"Output of FS get: {fs_get_dict}")

# Get ceph fs status
fs_status_dict = fs_util.collect_fs_status_data_for_validation(
client1, fs_name
)
log.debug(f"Output of FS status: {fs_status_dict}")

fs_health_status = fs_util.get_ceph_health_status(client1)
log.debug(f"Output of FS Health status: {fs_health_status}")

for subvolume in subvolume_list:
subvolume_size_subvol = fs_util.get_subvolume_info(
client=client1, **subvolume
@@ -224,6 +282,12 @@ def run(ceph_cluster, **kw):
mounting_dir=kernel_mount_dir,
)

fs_util.client_clean_up(
"umount",
kernel_clients=[client1],
mounting_dir=kernel_mount_dir_dbench,
)

client1.exec_command(sudo=True, cmd=f"rm -rf {nfs_mounting_dir}/*")
client1.exec_command(sudo=True, cmd=f"umount {nfs_mounting_dir}")
client1.exec_command(
