Skip to content

Commit

Permalink
improve device resolution
Browse files Browse the repository at this point in the history
Signed-off-by: Zen <[email protected]>
  • Loading branch information
desultory committed Jan 12, 2025
1 parent e6d995d commit 8560fda
Showing 1 changed file with 29 additions and 11 deletions.
40 changes: 29 additions & 11 deletions src/ugrd/fs/mounts.py
Original file line number Diff line number Diff line change
Expand Up @@ -520,19 +520,20 @@ def _resolve_dev(self, device_path) -> str:
Takes the device path, such as /dev/root, and resolves it to a device indexed in blkid.
If the device is an overlayfs, resolves the lowerdir device.
"""
if device_path in self["_blkid_info"]:
if str(device_path) in self["_blkid_info"]:
self.logger.debug("Device already resolved to blkid indexed device: %s" % device_path)
return device_path

while self["_mounts"][device_path]["fstype"] == "overlay":
device_path = _resolve_overlay_lower_device(self, device_path)
self.logger.debug("Resolving device: %s" % device_path)
mountpoint = _resolve_device_mountpoint(self, device_path)
device_path = _resolve_overlay_lower_device(self, mountpoint)

major, minor = _get_device_id(self["_mounts"][device_path]["device"])
major, minor = _get_device_id(self["_mounts"][mountpoint]["device"])
for device in self["_blkid_info"]:
check_major, check_minor = _get_device_id(device)
if (major, minor) == (check_major, check_minor):
self.logger.info(
"Resolved device: %s -> %s" % (self["_mounts"][device_path]["device"], colorize(device, "cyan"))
"Resolved device: %s -> %s" % (device_path, colorize(device, "cyan"))
)
return device
self.logger.critical("Failed to resolve device: %s" % colorize(device_path, "red", bold=True))
Expand All @@ -541,6 +542,13 @@ def _resolve_dev(self, device_path) -> str:
return device_path


def _resolve_device_mountpoint(self, device) -> str:
""" Gets the mountpoint of a device based on the device path."""
for mountpoint, mount_info in self["_mounts"].items():
if str(device) == mount_info["device"]:
return mountpoint
raise AutodetectError("Device mountpoint not found: %s" % device)

def _resolve_overlay_lower_dir(self, mountpoint) -> str:
for option in self["_mounts"][mountpoint]["options"]:
if option.startswith("lowerdir="):
Expand All @@ -551,12 +559,24 @@ def _resolve_overlay_lower_dir(self, mountpoint) -> str:


def _resolve_overlay_lower_device(self, mountpoint) -> dict:
"""Returns device for the lower overlayfs mountpoint."""
"""Returns device for the lower overlayfs mountpoint.
If it's not an overlayfs, returns the device for the mountpoint.
If it is, iterate through the lowerdir devices until a non-overlayfs mount is found.
"""
if self["_mounts"][mountpoint]["fstype"] != "overlay":
return self["_mounts"][mountpoint]["device"]

lowerdir = _resolve_overlay_lower_dir(self, mountpoint)
return self["_mounts"][lowerdir]["device"]
while self["_mounts"][mountpoint]["fstype"] == "overlay":
lowerdir = _resolve_overlay_lower_dir(self, mountpoint)
lower_path = Path(lowerdir)
while str(lower_path) not in self["_mounts"]:
lower_path = lower_path.parent
if lower_path == Path("/"):
raise AutodetectError("Lowerdir mount not found: %s" % lowerdir)
mountpoint = lower_path

return self["_mounts"][mountpoint]["device"]


@contains("autodetect_root", "Skipping root autodetection, autodetect_root is disabled.", log_level=30)
Expand Down Expand Up @@ -710,16 +730,14 @@ def _validate_host_mount(self, mount, destination_path=None) -> bool:
"""Checks if a defined mount exists on the host."""
if mount.get("no_validate"):
return self.logger.warning("Skipping host mount validation for config:\n%s" % pretty_print(mount))

if mount.get("base_mount"):
return self.logger.debug("Skipping host mount validation for base mount: %s" % mount)

mount_type, mount_val = _get_mount_source_type(self, mount, with_val=True)
# If a destination path is passed, like for /, use that instead of the mount's destination
destination_path = str(mount["destination"]) if destination_path is None else destination_path

# Using the mount path, get relevant host mount info
host_source_dev = _resolve_dev(self, destination_path)
host_source_dev = _resolve_dev(self, self["_mounts"][destination_path]["device"])
if ":" in host_source_dev: # Handle bcachefs
host_source_dev = host_source_dev.split(":")[0]

Expand Down

0 comments on commit 8560fda

Please sign in to comment.