Commit

Merge branch 'red-hat-storage:main' into earmark_feature
Merge branch 'red-hat-storage:main' into earmark_feature
Harishacharya-redhat committed Jan 15, 2025
2 parents 2e9f216 + 8adee02 commit 4e25f0a
Showing 38 changed files with 2,026 additions and 128 deletions.
4 changes: 2 additions & 2 deletions .github/mergify.yml
@@ -9,8 +9,8 @@ pull_request_rules:
       - "check-success=tox (3.9.18)"
       - check-success=WIP
     actions:
-      queue:
-        merge_method: merge
+      merge:
+        method: merge
   - name: ask to resolve conflict
     conditions:
       - conflict
27 changes: 20 additions & 7 deletions ceph/ceph.py
@@ -1719,13 +1719,26 @@ def reconnect(self):

     def __getstate__(self):
         d = dict(self.__dict__)
-        del d["vm_node"]
-        del d["rssh"]
-        del d["ssh"]
-        del d["rssh_transport"]
-        del d["ssh_transport"]
-        del d["root_connection"]
-        del d["connection"]
+        if d.get("vm_node"):
+            del d["vm_node"]
+
+        if d.get("rssh"):
+            del d["rssh"]
+
+        if d.get("ssh"):
+            del d["ssh"]
+
+        if d.get("rssh_transport"):
+            del d["rssh_transport"]
+
+        if d.get("ssh_transport"):
+            del d["ssh_transport"]
+
+        if d.get("root_connection"):
+            del d["root_connection"]
+
+        if d.get("connection"):
+            del d["connection"]
 
         return d

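Below is a minimal, self-contained sketch of the pattern this change adopts, not the cephci class itself; the Node class and its attributes are illustrative assumptions. It shows why the conditional deletes matter: an object that never opened a connection lacks those attributes, and the old unconditional del raised KeyError when such an object was pickled.

import pickle


class Node:
    """Illustrative stand-in for a node object carrying an unpicklable transport."""

    def __init__(self, hostname, connected=False):
        self.hostname = hostname
        if connected:
            self.connection = object()  # stand-in for a live SSH transport

    def __getstate__(self):
        d = dict(self.__dict__)
        # Strip the transport only when it exists, mirroring the change above.
        if d.get("connection"):
            del d["connection"]
        return d


pickle.dumps(Node("osd-1"))  # with an unconditional del, this raised KeyError
pickle.dumps(Node("osd-1", connected=True))  # transport is stripped before pickling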
89 changes: 61 additions & 28 deletions ceph/parallel.py
@@ -39,12 +39,14 @@
 If one of the spawned functions throws an exception, it will be thrown
 when iterating over the results, or when the with block ends.
-At the end of the with block, the main thread waits until all
-spawned functions have completed, or, if one exited with an exception,
-kills the rest and raises the exception.
+When the scope of the with block changes, the main thread waits until all
+spawned functions have completed within the given timeout. On timeout,
+all pending threads/processes are issued a shutdown command.
 """
 import logging
-from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor, as_completed
+from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
+from datetime import datetime, timedelta
+from time import sleep
 
 logger = logging.getLogger(__name__)
 
@@ -56,24 +58,29 @@ def __init__(
        self,
        thread_pool=True,
        timeout=None,
        shutdown_wait=True,
        shutdown_cancel_pending=False,
    ):
        """Object initialization method.
        Args:
            thread_pool (bool) Whether to use threads or processes.
            timeout (int | float) Maximum allowed time.
            shutdown_wait (bool) If disabled, it will not wait for executing
                threads/processes to complete.
            shutdown_cancel_pending (bool) If enabled, it will cancel pending tasks.
        """
        self._executor = ThreadPoolExecutor() if thread_pool else ProcessPoolExecutor()
        self._timeout = timeout
        self._shutdown_wait = shutdown_wait
        self._cancel_pending = shutdown_cancel_pending
        self._futures = list()
        self._results = list()
        self._iter_index = 0

    @property
    def count(self):
        return len(self._futures)

    @property
    def results(self):
        return self._results

    def spawn(self, fun, *args, **kwargs):
        """Triggers the first class method.
@@ -93,30 +100,56 @@ def __enter__(self):
         return self
 
     def __exit__(self, exc_type, exc_value, trackback):
-        _exceptions = []
-        exception_count = 0
-
-        for _f in as_completed(self._futures, timeout=self._timeout):
-            try:
-                self._results.append(_f.result())
-            except Exception as e:
-                logger.exception(e)
-                _exceptions.append(e)
-                exception_count += 1
-
-        if exception_count > 0 and not self._shutdown_wait:
-            # At this point we are ignoring results
-            self._executor.shutdown(wait=False, cancel_futures=self._cancel_pending)
-            raise _exceptions[0]
-
-        if len(_exceptions) > 0:
-            raise _exceptions[0]
-
-        return False if exception_count == 0 else True
+        try:
+            _not_done = self._futures[:]
+            _end_time = datetime.now() + timedelta(
+                seconds=self._timeout if self._timeout else 3600
+            )
+
+            # Wait for all futures to complete within the given time or 1 hour.
+            while datetime.now() < _end_time:
+                # if the list is empty break
+                if len(_not_done) == 0:
+                    break
+
+                sleep(2.0)
+                for _f in _not_done:
+                    if _f.done():
+                        _not_done.remove(_f)
+
+            # Graceful shutdown of running threads
+            if _not_done:
+                self._executor.shutdown(wait=False, cancel_futures=self._cancel_pending)
+
+            if exc_value is not None:
+                logger.exception(trackback)
+                return False
+
+            # Check for any exceptions and raise
+            # At this point, all threads/processes should have completed or cancelled
+            for _f in self._futures:
+                self._results.append(_f.result())
+        except Exception:
+            logger.exception("Encountered an exception during parallel execution.")
+            raise
+
+        return True
 
     def __iter__(self):
         return self
 
     def __next__(self):
-        for r in self._results:
-            yield r
+        if self.count == 0 or self._iter_index == self.count:
+            self._iter_index = 0  # reset the counter
+            raise StopIteration()
+
+        try:
+            # Keeping timeout consistent when called within the context
+            _timeout = self._timeout if self._timeout else 3600
+            out = self._futures[self._iter_index].result(timeout=_timeout)
+        except Exception as e:
+            logger.exception(e)
+            out = e
+
+        self._iter_index += 1
+        return out
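A minimal usage sketch of the reworked module, assuming a cephci checkout and that the context manager class in ceph/parallel.py is named parallel, the name cephci callers import; the worker function is illustrative only.

from time import sleep

from ceph.parallel import parallel


def restart_service(node_name):
    sleep(1)  # stand-in for real work against a node
    return f"{node_name}: restarted"


with parallel(thread_pool=True, timeout=60) as p:
    for name in ("node1", "node2", "node3"):
        p.spawn(restart_service, name)

# After the with block, iteration walks the futures via the new __next__,
# yielding each result, or the exception a task raised, in spawn order.
for outcome in p:
    print(outcome)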
40 changes: 40 additions & 0 deletions cli/ceph/fs/sub_volume.py
@@ -83,3 +83,43 @@ def getpath(self, volume, subvolume, **kwargs):
         if isinstance(out, tuple):
             return out[0].strip()
         return out
+
+    def set_subvolume_earmark(self, volume, subvolume, earmark, **kwargs):
+        """
+        Set an earmark on the subvolume.
+        Args:
+            volume (str): Name of the volume where the subvolume is present
+            subvolume (str): Subvolume name
+            earmark (str): Earmark name
+        """
+        cmd = f"{self.base_cmd} earmark set {volume} {subvolume} --earmark {earmark} {build_cmd_from_args(**kwargs)}"
+        out = self.execute(sudo=True, cmd=cmd)
+        if isinstance(out, tuple):
+            return out[0].strip()
+        return out
+
+    def get_subvolume_earmark(self, volume, subvolume, **kwargs):
+        """
+        Get the earmark from the subvolume, if one is present.
+        Args:
+            volume (str): Name of the volume where the subvolume is present
+            subvolume (str): Subvolume name
+        """
+        cmd = f"{self.base_cmd} earmark get {volume} {subvolume} {build_cmd_from_args(**kwargs)}"
+        out = self.execute(sudo=True, cmd=cmd)
+        if isinstance(out, tuple):
+            return out[0].strip()
+        return out
+
+    def remove_subvolume_earmark(self, volume, subvolume, **kwargs):
+        """
+        Remove the earmark from the subvolume.
+        Args:
+            volume (str): Name of the volume where the subvolume is present
+            subvolume (str): Subvolume name
+        """
+        cmd = f"{self.base_cmd} earmark rm {volume} {subvolume} {build_cmd_from_args(**kwargs)}"
+        out = self.execute(sudo=True, cmd=cmd)
+        if isinstance(out, tuple):
+            return out[0].strip()
+        return out
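A hypothetical driver for the new helpers. The import path matches this file, but the constructor argument and the volume, subvolume, and earmark values ("cephfs", "subvol1", "nfs") are illustrative assumptions about how a test would wire this up.

from cli.ceph.fs.sub_volume import SubVolume

# `client` is assumed to be a cephci node object that can run `ceph` commands.
subvolume = SubVolume(client)
subvolume.set_subvolume_earmark("cephfs", "subvol1", "nfs")  # tag subvol1 for NFS use
print(subvolume.get_subvolume_earmark("cephfs", "subvol1"))  # prints the earmark, e.g. "nfs"
subvolume.remove_subvolume_earmark("cephfs", "subvol1")  # clear the earmark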
43 changes: 43 additions & 0 deletions conf/squid/common/3node-1client.yaml
@@ -0,0 +1,43 @@
globals:
  - ceph-cluster:
      name: ceph
      node1:
        role:
          - _admin
          - installer
          - mon
          - mgr
          - osd
          - crash
          - grafana
          - prometheus
          - node-exporter
          - alertmanager
        no-of-volumes: 6
        disk-size: 20
      node2:
        role:
          - mon
          - mgr
          - mds
          - osd
          - rgw
          - crash
          - node-exporter
          - alertmanager
        no-of-volumes: 6
        disk-size: 20
      node3:
        role:
          - mon
          - mgr
          - mds
          - osd
          - rgw
          - crash
          - node-exporter
        no-of-volumes: 6
        disk-size: 20
      node4:
        role:
          - client
58 changes: 58 additions & 0 deletions conf/squid/common/5node-2client-rh.yaml
@@ -0,0 +1,58 @@
globals:
  - ceph-cluster:
      name: ceph
      node1:
        role:
          - _admin
          - installer
          - mon
          - mgr
          - crash
          - grafana
          - prometheus
          - alertmanager
      node2:
        role:
          - mon
          - mgr
          - osd
          - rgw
          - nfs
          - crash
        no-of-volumes: 6
        disk-size: 40
      node3:
        role:
          - mon
          - mgr
          - osd
          - rgw
          - crash
        no-of-volumes: 6
        disk-size: 40
      node4:
        role:
          - mon
          - mgr
          - mds
          - osd
          - crash
        no-of-volumes: 6
        disk-size: 40
      node5:
        role:
          - mon
          - mgr
          - mds
          - osd
          - crash
        no-of-volumes: 6
        disk-size: 40
      node6:
        id: node6
        role:
          - client
      node7:
        id: node7
        role:
          - client
70 changes: 70 additions & 0 deletions conf/squid/common/7node-2client-ibm.yaml
@@ -0,0 +1,70 @@
globals:
  - ceph-cluster:
      name: ceph
      node1:
        role:
          - _admin
          - installer
          - mon
          - mgr
          - crash
          - grafana
          - prometheus
          - alertmanager
      node2:
        role:
          - mon
          - mgr
          - osd
          - rgw
          - nfs
          - crash
        no-of-volumes: 6
        disk-size: 40
      node3:
        role:
          - mon
          - mgr
          - osd
          - rgw
          - crash
        no-of-volumes: 6
        disk-size: 40
      node4:
        role:
          - mon
          - mgr
          - mds
          - osd
          - crash
        no-of-volumes: 6
        disk-size: 40
      node5:
        role:
          - mon
          - mgr
          - mds
          - osd
          - crash
        no-of-volumes: 6
        disk-size: 40
      node6:
        role:
          - mon
          - mgr
          - crash
          - nvmeof-gw
      node7:
        role:
          - mon
          - mgr
          - crash
          - nvmeof-gw
      node8:
        id: node8
        role:
          - client
      node9:
        id: node9
        role:
          - client
1 change: 1 addition & 0 deletions suites/reef/cephfs/tier-4_cephfs_recovery.yaml
@@ -233,3 +233,4 @@ tests:
       module: cephfs_journal_tool.cephfs_journal_tool_event_mode.py
       name: journal_tool_event_mode
       polarion-id: "CEPH-83595482"
+      comments: "BZ 2335321"