Skip to content

Commit

Permalink
pythonGH-128520: Read path metadata from pathlib.types.PathInfo whe…
Browse files Browse the repository at this point in the history
…n copying

Add private `PathInfo._stat()` and `_xattrs()` methods, which are called
when copying metadata to a local path. This removes all need for the
`CopyReader` and `_LocalCopyReader` classes, so we delete them. The
`CopyWriter` and `_LocalCopyWriter` classes are moved into `pathlib._os`,
renamed to `Copier` and `LocalCopier`, and refactored so that only one
copier object is created per copy operation.

This internal refactor shouldn't have any user-facing impact.
  • Loading branch information
barneygale committed Feb 8, 2025
1 parent 718ab66 commit 1fb81f5
Show file tree
Hide file tree
Showing 3 changed files with 271 additions and 363 deletions.
175 changes: 8 additions & 167 deletions Lib/pathlib/_abc.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,9 @@
"""

import functools
import io
import posixpath
from errno import EINVAL
from glob import _PathGlobber, _no_recurse_symlinks
from pathlib._os import copyfileobj
from pathlib._os import Copier, magic_open


@functools.cache
Expand All @@ -41,162 +39,6 @@ def _explode_path(path):
return path, names


def magic_open(path, mode='r', buffering=-1, encoding=None, errors=None,
newline=None):
"""
Open the file pointed to by this path and return a file object, as
the built-in open() function does.
"""
try:
return io.open(path, mode, buffering, encoding, errors, newline)
except TypeError:
pass
cls = type(path)
text = 'b' not in mode
mode = ''.join(sorted(c for c in mode if c not in 'bt'))
if text:
try:
attr = getattr(cls, f'__open_{mode}__')
except AttributeError:
pass
else:
return attr(path, buffering, encoding, errors, newline)

try:
attr = getattr(cls, f'__open_{mode}b__')
except AttributeError:
pass
else:
stream = attr(path, buffering)
if text:
stream = io.TextIOWrapper(stream, encoding, errors, newline)
return stream

raise TypeError(f"{cls.__name__} can't be opened with mode {mode!r}")


class CopyReader:
"""
Class that implements the "read" part of copying between path objects.
An instance of this class is available from the ReadablePath._copy_reader
property.
"""
__slots__ = ('_path',)

def __init__(self, path):
self._path = path

_readable_metakeys = frozenset()

def _read_metadata(self, metakeys, *, follow_symlinks=True):
"""
Returns path metadata as a dict with string keys.
"""
raise NotImplementedError


class CopyWriter:
"""
Class that implements the "write" part of copying between path objects. An
instance of this class is available from the WritablePath._copy_writer
property.
"""
__slots__ = ('_path',)

def __init__(self, path):
self._path = path

_writable_metakeys = frozenset()

def _write_metadata(self, metadata, *, follow_symlinks=True):
"""
Sets path metadata from the given dict with string keys.
"""
raise NotImplementedError

def _create(self, source, follow_symlinks, dirs_exist_ok, preserve_metadata):
self._ensure_distinct_path(source)
if preserve_metadata:
metakeys = self._writable_metakeys & source._copy_reader._readable_metakeys
else:
metakeys = None
if not follow_symlinks and source.is_symlink():
self._create_symlink(source, metakeys)
elif source.is_dir():
self._create_dir(source, metakeys, follow_symlinks, dirs_exist_ok)
else:
self._create_file(source, metakeys)
return self._path

def _create_dir(self, source, metakeys, follow_symlinks, dirs_exist_ok):
"""Copy the given directory to our path."""
children = list(source.iterdir())
self._path.mkdir(exist_ok=dirs_exist_ok)
for src in children:
dst = self._path.joinpath(src.name)
if not follow_symlinks and src.is_symlink():
dst._copy_writer._create_symlink(src, metakeys)
elif src.is_dir():
dst._copy_writer._create_dir(src, metakeys, follow_symlinks, dirs_exist_ok)
else:
dst._copy_writer._create_file(src, metakeys)
if metakeys:
metadata = source._copy_reader._read_metadata(metakeys)
if metadata:
self._write_metadata(metadata)

def _create_file(self, source, metakeys):
"""Copy the given file to our path."""
self._ensure_different_file(source)
with magic_open(source, 'rb') as source_f:
try:
with magic_open(self._path, 'wb') as target_f:
copyfileobj(source_f, target_f)
except IsADirectoryError as e:
if not self._path.exists():
# Raise a less confusing exception.
raise FileNotFoundError(
f'Directory does not exist: {self._path}') from e
raise
if metakeys:
metadata = source._copy_reader._read_metadata(metakeys)
if metadata:
self._write_metadata(metadata)

def _create_symlink(self, source, metakeys):
"""Copy the given symbolic link to our path."""
self._path.symlink_to(source.readlink())
if metakeys:
metadata = source._copy_reader._read_metadata(metakeys, follow_symlinks=False)
if metadata:
self._write_metadata(metadata, follow_symlinks=False)

def _ensure_different_file(self, source):
"""
Raise OSError(EINVAL) if both paths refer to the same file.
"""
pass

def _ensure_distinct_path(self, source):
"""
Raise OSError(EINVAL) if the other path is within this path.
"""
# Note: there is no straightforward, foolproof algorithm to determine
# if one directory is within another (a particularly perverse example
# would be a single network share mounted in one location via NFS, and
# in another location via CIFS), so we simply checks whether the
# other path is lexically equal to, or within, this path.
if source == self._path:
err = OSError(EINVAL, "Source and target are the same path")
elif source in self._path.parents:
err = OSError(EINVAL, "Source path is a parent of target path")
else:
return
err.filename = str(source)
err.filename2 = str(self._path)
raise err


class JoinablePath:
"""Base class for pure path objects.
Expand Down Expand Up @@ -512,22 +354,21 @@ def readlink(self):
"""
raise NotImplementedError

_copy_reader = property(CopyReader)

def copy(self, target, follow_symlinks=True, dirs_exist_ok=False,
preserve_metadata=False):
"""
Recursively copy this file or directory tree to the given destination.
"""
if not hasattr(target, '_copy_writer'):
if not hasattr(target, '_copier'):
target = self.with_segments(target)

# Delegate to the target path's CopyWriter object.
# Delegate to the target path's copier.
try:
create = target._copy_writer._create
copier = target._copier
except AttributeError:
raise TypeError(f"Target is not writable: {target}") from None
return create(self, follow_symlinks, dirs_exist_ok, preserve_metadata)
copier.ensure_distinct_path(self, target)
return copier(preserve_metadata, follow_symlinks, dirs_exist_ok).copy(self, target)

def copy_into(self, target_dir, *, follow_symlinks=True,
dirs_exist_ok=False, preserve_metadata=False):
Expand All @@ -537,7 +378,7 @@ def copy_into(self, target_dir, *, follow_symlinks=True,
name = self.name
if not name:
raise ValueError(f"{self!r} has an empty name")
elif hasattr(target_dir, '_copy_writer'):
elif hasattr(target_dir, '_copier'):
target = target_dir / name
else:
target = self.with_segments(target_dir, name)
Expand Down Expand Up @@ -588,4 +429,4 @@ def write_text(self, data, encoding=None, errors=None, newline=None):
with magic_open(self, mode='w', encoding=encoding, errors=errors, newline=newline) as f:
return f.write(data)

_copy_writer = property(CopyWriter)
_copier = Copier
Loading

0 comments on commit 1fb81f5

Please sign in to comment.