diff --git a/.github/workflows/testing.yml b/.github/workflows/testing.yml
index 89cf535..6b43e22 100644
--- a/.github/workflows/testing.yml
+++ b/.github/workflows/testing.yml
@@ -11,7 +11,7 @@ jobs:
     runs-on: ubuntu-latest
     strategy:
       matrix:
-        python-version: [3.8, 3.9, "3.10", "3.11"]
+        python-version: [3.8, 3.9, "3.10", "3.11", "3.12"]
     steps:
     - uses: actions/checkout@v2
diff --git a/s3path.py b/_old_s3path.py
similarity index 100%
rename from s3path.py
rename to _old_s3path.py
diff --git a/s3path/__init__.py b/s3path/__init__.py
new file mode 100644
index 0000000..590d250
--- /dev/null
+++ b/s3path/__init__.py
@@ -0,0 +1,787 @@
+"""
+s3path provides a Pythonic API to S3 by wrapping boto3 with a pathlib-compatible interface
+"""
+from __future__ import annotations
+
+import sys
+from collections import deque
+from datetime import timedelta
+from contextlib import suppress
+from pathlib import PurePath, Path
+from typing import Union, Literal, Optional
+from io import DEFAULT_BUFFER_SIZE, TextIOWrapper
+
+import smart_open
+from botocore.exceptions import ClientError
+from boto3.resources.factory import ServiceResource
+
+from . import flavour
+from . import accessor
+from .accessor import StatResult
+
+__version__ = '0.5.1'
+__all__ = (
+    'register_configuration_parameter',
+    'S3Path',
+    'VersionedS3Path',
+    'PureS3Path',
+    'PureVersionedS3Path',
+    'StatResult',
+)
+
+
+def register_configuration_parameter(
+        path: PureS3Path,
+        *,
+        parameters: Optional[dict] = None,
+        resource: Optional[ServiceResource] = None,
+        glob_new_algorithm: Optional[bool] = None):
+    if not isinstance(path, PureS3Path):
+        raise TypeError(f'path argument has to be a {PureS3Path} type. got {type(path)}')
+    if parameters and not isinstance(parameters, dict):
+        raise TypeError(f'parameters argument has to be a dict type. got {type(parameters)}')
+    if parameters is None and resource is None and glob_new_algorithm is None:
+        raise ValueError('you have to specify at least one of the parameters, resource or glob_new_algorithm arguments')
+    accessor.configuration_map.set_configuration(
+        path,
+        resource=resource,
+        arguments=parameters,
+        glob_new_algorithm=glob_new_algorithm)
+
+
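This configuration entry point accepts a per-path boto3 resource, extra API parameters, and the new-glob toggle. A minimal usage sketch (illustrative only, not part of the diff; the MinIO endpoint and bucket name are hypothetical):

```python
import boto3
from botocore.client import Config
from s3path import PureS3Path, register_configuration_parameter

# Hypothetical MinIO deployment -- adjust endpoint and credentials to your setup.
minio = boto3.resource(
    's3',
    endpoint_url='http://localhost:9000',
    config=Config(signature_version='s3v4'),
)
# Route everything under /my-bucket through the MinIO resource,
# and opt in to the new glob algorithm for the whole tree.
register_configuration_parameter(PureS3Path('/my-bucket'), resource=minio)
register_configuration_parameter(PureS3Path('/'), glob_new_algorithm=True)
```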
+ """ + _flavour = flavour + __slots__ = () + + def __init__(self, *args): + super().__init__(*args) + + new_parts = list(self.parts) + for part in new_parts[1:]: + if part == '..': + index = new_parts.index(part) + new_parts.pop(index - 1) + new_parts.remove(part) + + self._raw_paths = new_parts + self._load_parts() + + @classmethod + def from_uri(cls, uri: str): + """ + from_uri class method create a class instance from url + + >> from s3path import PureS3Path + >> PureS3Path.from_uri('s3:///') + << PureS3Path('//') + """ + if not uri.startswith('s3://'): + raise ValueError('Provided uri seems to be no S3 URI!') + return cls(uri[4:]) + + @classmethod + def from_bucket_key(cls, bucket: str, key: str): + """ + from_bucket_key class method create a class instance from bucket, key pair's + + >> from s3path import PureS3Path + >> PureS3Path.from_bucket_key(bucket='', key='') + << PureS3Path('//') + """ + bucket = cls(cls._flavour.sep, bucket) + if len(bucket.parts) != 2: + raise ValueError(f'bucket argument contains more then one path element: {bucket}') + key = cls(key) + if key.is_absolute(): + key = key.relative_to('/') + return bucket / key + + @property + def bucket(self) -> str: + """ + The AWS S3 Bucket name, or '' + """ + self._absolute_path_validation() + with suppress(ValueError): + _, bucket, *_ = self.parts + return bucket + return '' + + @property + def is_bucket(self) -> bool: + """ + Check if Path is a bucket + """ + return self.is_absolute() and self == PureS3Path(f"/{self.bucket}") + + @property + def key(self) -> str: + """ + The AWS S3 Key name, or '' + """ + self._absolute_path_validation() + key = self._flavour.sep.join(self.parts[2:]) + return key + + def as_uri(self) -> str: + """ + Return the path as a 's3' URI. + """ + uri = super().as_uri() + return uri.replace('file:///', 's3://') + + def _absolute_path_validation(self): + if not self.is_absolute(): + raise ValueError('relative path have no bucket, key specification') + + +class _PathNotSupportedMixin: + _NOT_SUPPORTED_MESSAGE = '{method} is unsupported on S3 service' + + @classmethod + def cwd(cls): + """ + cwd class method is unsupported on S3 service + AWS S3 don't have this file system action concept + """ + message = cls._NOT_SUPPORTED_MESSAGE.format(method=cls.cwd.__qualname__) + raise NotImplementedError(message) + + @classmethod + def home(cls): + """ + home class method is unsupported on S3 service + AWS S3 don't have this file system action concept + """ + message = cls._NOT_SUPPORTED_MESSAGE.format(method=cls.home.__qualname__) + raise NotImplementedError(message) + + def chmod(self, mode, *, follow_symlinks=True): + """ + chmod method is unsupported on S3 service + AWS S3 don't have this file system action concept + """ + message = self._NOT_SUPPORTED_MESSAGE.format(method=self.chmod.__qualname__) + raise NotImplementedError(message) + + def expanduser(self): + """ + expanduser method is unsupported on S3 service + AWS S3 don't have this file system action concept + """ + message = self._NOT_SUPPORTED_MESSAGE.format(method=self.expanduser.__qualname__) + raise NotImplementedError(message) + + def lchmod(self, mode): + """ + lchmod method is unsupported on S3 service + AWS S3 don't have this file system action concept + """ + message = self._NOT_SUPPORTED_MESSAGE.format(method=self.lchmod.__qualname__) + raise NotImplementedError(message) + + def group(self): + """ + group method is unsupported on S3 service + AWS S3 don't have this file system action concept + """ + message = 
+class _PathNotSupportedMixin:
+    _NOT_SUPPORTED_MESSAGE = '{method} is unsupported on S3 service'
+
+    @classmethod
+    def cwd(cls):
+        """
+        cwd class method is unsupported on S3 service
+        AWS S3 doesn't have this file-system concept
+        """
+        message = cls._NOT_SUPPORTED_MESSAGE.format(method=cls.cwd.__qualname__)
+        raise NotImplementedError(message)
+
+    @classmethod
+    def home(cls):
+        """
+        home class method is unsupported on S3 service
+        AWS S3 doesn't have this file-system concept
+        """
+        message = cls._NOT_SUPPORTED_MESSAGE.format(method=cls.home.__qualname__)
+        raise NotImplementedError(message)
+
+    def chmod(self, mode, *, follow_symlinks=True):
+        """
+        chmod method is unsupported on S3 service
+        AWS S3 doesn't have this file-system concept
+        """
+        message = self._NOT_SUPPORTED_MESSAGE.format(method=self.chmod.__qualname__)
+        raise NotImplementedError(message)
+
+    def expanduser(self):
+        """
+        expanduser method is unsupported on S3 service
+        AWS S3 doesn't have this file-system concept
+        """
+        message = self._NOT_SUPPORTED_MESSAGE.format(method=self.expanduser.__qualname__)
+        raise NotImplementedError(message)
+
+    def lchmod(self, mode):
+        """
+        lchmod method is unsupported on S3 service
+        AWS S3 doesn't have this file-system concept
+        """
+        message = self._NOT_SUPPORTED_MESSAGE.format(method=self.lchmod.__qualname__)
+        raise NotImplementedError(message)
+
+    def group(self):
+        """
+        group method is unsupported on S3 service
+        AWS S3 doesn't have this file-system concept
+        """
+        message = self._NOT_SUPPORTED_MESSAGE.format(method=self.group.__qualname__)
+        raise NotImplementedError(message)
+
+    def is_block_device(self):
+        """
+        is_block_device method is unsupported on S3 service
+        AWS S3 doesn't have this file-system concept
+        """
+        message = self._NOT_SUPPORTED_MESSAGE.format(method=self.is_block_device.__qualname__)
+        raise NotImplementedError(message)
+
+    def is_char_device(self):
+        """
+        is_char_device method is unsupported on S3 service
+        AWS S3 doesn't have this file-system concept
+        """
+        message = self._NOT_SUPPORTED_MESSAGE.format(method=self.is_char_device.__qualname__)
+        raise NotImplementedError(message)
+
+    def lstat(self):
+        """
+        lstat method is unsupported on S3 service
+        AWS S3 doesn't have this file-system concept
+        """
+        message = self._NOT_SUPPORTED_MESSAGE.format(method=self.lstat.__qualname__)
+        raise NotImplementedError(message)
+
+    def resolve(self):
+        """
+        resolve method is unsupported on S3 service
+        AWS S3 doesn't have this file-system concept
+        """
+        message = self._NOT_SUPPORTED_MESSAGE.format(method=self.resolve.__qualname__)
+        raise NotImplementedError(message)
+
+    def symlink_to(self, *args, **kwargs):
+        """
+        symlink_to method is unsupported on S3 service
+        AWS S3 doesn't have this file-system concept
+        """
+        message = self._NOT_SUPPORTED_MESSAGE.format(method=self.symlink_to.__qualname__)
+        raise NotImplementedError(message)
+
+    def hardlink_to(self, *args, **kwargs):
+        """
+        hardlink_to method is unsupported on S3 service
+        AWS S3 doesn't have this file-system concept
+        """
+        message = self._NOT_SUPPORTED_MESSAGE.format(method=self.hardlink_to.__qualname__)
+        raise NotImplementedError(message)
+
+    def readlink(self):
+        """
+        readlink method is unsupported on S3 service
+        AWS S3 doesn't have this file-system concept
+        """
+        message = self._NOT_SUPPORTED_MESSAGE.format(method=self.readlink.__qualname__)
+        raise NotImplementedError(message)
+
+    def is_symlink(self) -> Literal[False]:
+        """
+        AWS S3 Service doesn't have a symlink feature, therefore this method always returns False
+        """
+        return False
+
+    def is_socket(self) -> Literal[False]:
+        """
+        AWS S3 Service doesn't have a sockets feature, therefore this method always returns False
+        """
+        return False
+
+    def is_fifo(self) -> Literal[False]:
+        """
+        AWS S3 Service doesn't have a fifo feature, therefore this method always returns False
+        """
+        return False
+
+    def is_mount(self) -> Literal[False]:
+        """
+        AWS S3 Service doesn't have a mounting feature, therefore this method always returns False
+        """
+        return False
+
+
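Everything in the mixin fails fast rather than silently misbehaving. A short sketch of what callers can expect (hypothetical path, illustrative only):

```python
from s3path import S3Path

path = S3Path('/my-bucket/some/key')   # hypothetical bucket / key
print(path.is_symlink())               # False -- S3 has no symlinks
try:
    path.chmod(0o644)
except NotImplementedError as error:
    # e.g. '_PathNotSupportedMixin.chmod is unsupported on S3 service'
    print(error)
```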
+class S3Path(_PathNotSupportedMixin, Path, PureS3Path):
+    def __new__(cls, *args, **kwargs):
+        if not cls._flavour.is_supported:
+            raise NotImplementedError(f'cannot instantiate {cls.__name__!r} on your system')
+        return super().__new__(cls, *args, **kwargs)
+
+    def stat(self, *, follow_symlinks: bool = True) -> accessor.StatResult:
+        """
+        Returns information about this path (similarly to boto3's ObjectSummary).
+        For compatibility with pathlib, the returned object has some attributes similar to os.stat_result's.
+        The result is looked up at each call to this method.
+        """
+        if not follow_symlinks:
+            raise NotImplementedError(
+                f'Setting follow_symlinks to {follow_symlinks} is unsupported on S3 service.')
+
+        self._absolute_path_validation()
+        if not self.key:
+            return None
+        return accessor.stat(self, follow_symlinks=follow_symlinks)
+
+    def absolute(self) -> S3Path:
+        """
+        Handle absolute method only if the path is already an absolute one
+        since we have no way to compute an absolute path from a relative one in S3.
+        """
+        if self.is_absolute():
+            return self
+        # We can't compute the absolute path from a relative one
+        raise ValueError("Absolute path can't be determined for relative S3Path objects")
+
+    def owner(self) -> str:
+        """
+        Returns the name of the user owning the Bucket or key,
+        similar to boto3's ObjectSummary owner attribute
+        """
+        self._absolute_path_validation()
+        if not self.is_file():
+            raise KeyError('file not found')
+        return accessor.owner(self)
+
+    def rename(self, target):  # todo: Union[str, S3Path]) -> S3Path:
+        """
+        Renames this file or Bucket / key prefix / key to the given target.
+        If target exists and is a file, it will be replaced silently if the user has permission.
+        If path is a key prefix, all keys sharing that prefix will be moved to the new target prefix.
+        Target can be either a string or another S3Path object.
+        """
+        self._absolute_path_validation()
+        if not isinstance(target, type(self)):
+            target = type(self)(target)
+        target._absolute_path_validation()
+        accessor.rename(self, target)
+        return type(self)(target)
+
+    def replace(self, target):  # todo: Union[str, S3Path]) -> S3Path:
+        """
+        Renames this Bucket / key prefix / key to the given target.
+        If target points to an existing Bucket / key prefix / key, it will be unconditionally replaced.
+        """
+        return self.rename(target)
+
+    def rmdir(self):
+        """
+        Removes this Bucket / key prefix. The Bucket / key prefix must be empty
+        """
+        self._absolute_path_validation()
+        if self.is_file():
+            raise NotADirectoryError()
+        if not self.is_dir():
+            raise FileNotFoundError()
+        accessor.rmdir(self)
+
+    def samefile(self, other_path: Union[str, S3Path]) -> bool:
+        """
+        Returns whether this path points to the same Bucket key as other_path,
+        which can be either a Path object or a string
+        """
+        self._absolute_path_validation()
+        if not isinstance(other_path, Path):
+            other_path = type(self)(other_path)
+        return self.bucket == other_path.bucket and self.key == other_path.key and self.is_file()
+
+    def touch(self, mode: int = 0o666, exist_ok: bool = True):
+        """
+        Creates a key at this given path.
+        If the key already exists,
+        the function succeeds if exist_ok is true (and its modification time is updated to the current time),
+        otherwise FileExistsError is raised
+        """
+        if self.exists() and not exist_ok:
+            raise FileExistsError()
+        self.write_text('')
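A sketch of the create-and-rename flow these methods give you (hypothetical keys, illustrative only; note that rename is implemented in the accessor as a server-side copy followed by a delete):

```python
from s3path import S3Path

source = S3Path('/my-bucket/drafts/report.txt')   # hypothetical key
source.touch()                                    # creates an empty key
source.rename('/my-bucket/final/report.txt')      # copy + delete under the hood
```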
+ """ + try: + if not self.bucket: + raise FileNotFoundError(f'No bucket in {type(self)} {self}') + if self.key and not parents: + raise FileNotFoundError(f'Only bucket path can be created, got {self}') + if type(self)(self._flavour.sep, self.bucket).exists(): + raise FileExistsError(f'Bucket {self.bucket} already exists') + accessor.mkdir(self, mode) + except OSError: + if not exist_ok: + raise + + def is_dir(self) -> bool: + """ + Returns True if the path points to a Bucket or a key prefix, False if it points to a full key path. + False is also returned if the path doesn’t exist. + Other errors (such as permission errors) are propagated. + """ + self._absolute_path_validation() + if self.bucket and not self.key: + return True + return accessor.is_dir(self) + + def is_file(self) -> bool: + """ + Returns True if the path points to a Bucket key, False if it points to Bucket or a key prefix. + False is also returned if the path doesn’t exist. + Other errors (such as permission errors) are propagated. + """ + self._absolute_path_validation() + if not self.bucket or not self.key: + return False + try: + return bool(self.stat()) + except ClientError: + return False + + def exists(self) -> bool: + """ + Whether the path points to an existing Bucket, key or key prefix. + """ + self._absolute_path_validation() + if not self.bucket: + return True + return accessor.exists(self) + + def iterdir(self): # todo: -> Generator[S3Path, None, None]: + """ + When the path points to a Bucket or a key prefix, yield path objects of the directory contents + """ + self._absolute_path_validation() + for name in accessor.listdir(self): + yield self._make_child_relpath(name) + + def open( + self, + mode: Literal["r", "w", "rb", "wb"] = 'r', + buffering: int = DEFAULT_BUFFER_SIZE, + encoding: Optional[str] = None, + errors: Optional[str] = None, + newline: Optional[str] = None + ) -> Union[TextIOWrapper, smart_open.s3.Reader, smart_open.s3.MultipartWriter]: + """ + Opens the Bucket key pointed to by the path, returns a Key file object that you can read/write with + """ + self._absolute_path_validation() + if smart_open.__version__ < '4.0.0' and mode.startswith('b'): + mode = ''.join(reversed(mode)) + return accessor.open( + self, + mode=mode, + buffering=buffering, + encoding=encoding, + errors=errors, + newline=newline) + + def glob(self, pattern: str): # todo: -> Generator[S3Path, None, None]: + """ + Glob the given relative pattern in the Bucket / key prefix represented by this path, + yielding all matching files (of any kind) + """ + self._absolute_path_validation() + general_options = accessor.configuration_map.get_general_options(self) + glob_new_algorithm = general_options['glob_new_algorithm'] + if not glob_new_algorithm: + yield from super().glob(pattern) + return + yield from self._glob(pattern) + + def rglob(self, pattern: str): # todo: -> Generator[S3Path, None, None]: + """ + This is like calling S3Path.glob with "**/" added in front of the given relative pattern + """ + self._absolute_path_validation() + general_options = accessor.configuration_map.get_general_options(self) + glob_new_algorithm = general_options['glob_new_algorithm'] + if not glob_new_algorithm: + yield from super().rglob(pattern) + return + yield from self._rglob(pattern) + + def get_presigned_url(self, expire_in: Union[timedelta, int] = 3600) -> str: + """ + Returns a pre-signed url. Anyone with the url can make a GET request to get the file. + You can set an expiration date with the expire_in argument (integer or timedelta object). 
+    def get_presigned_url(self, expire_in: Union[timedelta, int] = 3600) -> str:
+        """
+        Returns a pre-signed URL. Anyone with the URL can make a GET request to get the file.
+        You can set an expiration date with the expire_in argument (integer or timedelta object).
+
+        Note that generating a presigned URL may require more information or setup than other
+        S3Path functions. This is because it needs to know the exact AWS region and to use s3v4
+        as the signature version, meaning you may have to do this:
+
+        ```python
+        import boto3
+        from botocore.config import Config
+        from s3path import S3Path, register_configuration_parameter
+
+        resource = boto3.resource(
+            "s3",
+            config=Config(signature_version="s3v4"),
+            region_name="the aws region name"
+        )
+        register_configuration_parameter(S3Path("/"), resource=resource)
+        ```
+
+        A simple example:
+        ```python
+        from s3path import S3Path
+        import requests
+
+        file = S3Path("/my-bucket/toto.txt")
+        file.write_text("hello world")
+
+        presigned_url = file.get_presigned_url()
+        print(requests.get(presigned_url).content)
+        # b"hello world"
+        ```
+        """
+        self._absolute_path_validation()
+        if isinstance(expire_in, timedelta):
+            expire_in = int(expire_in.total_seconds())
+        if expire_in <= 0:
+            raise ValueError(
+                f"The expire_in argument can't represent a negative or null time delta. "
+                f'You provided expire_in = {expire_in} seconds which is below or equal to 0 seconds.')
+        return accessor.get_presigned_url(self, expire_in)
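One detail the docstring only hints at: expire_in also accepts a timedelta, which often reads better than raw seconds. A tiny sketch (hypothetical key, illustrative only):

```python
from datetime import timedelta
from s3path import S3Path

url = S3Path('/my-bucket/toto.txt').get_presigned_url(expire_in=timedelta(minutes=15))
print(url)  # anyone holding this URL can GET the object for 15 minutes
```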
+    def unlink(self, missing_ok: bool = False):
+        """
+        Remove this key from its bucket.
+        """
+        self._absolute_path_validation()
+        # S3 doesn't care if you remove full prefixes or buckets with its delete API,
+        # so unless we manually check, this call would be passed through without any
+        # validation and could result in data loss
+        try:
+            if self.is_dir():
+                raise IsADirectoryError(str(self))
+            if not self.is_file():
+                raise FileNotFoundError(str(self))
+        except (IsADirectoryError, FileNotFoundError):
+            if missing_ok:
+                return
+            raise
+        try:
+            # XXX: Note: If we don't check if the file exists here, S3 will always return
+            # success even if we try to delete a key that doesn't exist. So, if we want
+            # to raise a `FileNotFoundError`, we need to manually check if the file exists
+            # before we make the API call -- since we want to delete the file anyway,
+            # we can just ignore this for now and be satisfied that the file will be removed
+            accessor.unlink(self)
+        except FileNotFoundError:
+            if not missing_ok:
+                raise
+
+    def _scandir(self):
+        """
+        Override _scandir so _Selector will rely on an S3 compliant implementation
+        """
+        return accessor.scandir(self)
+
+    def _glob(self, pattern):
+        """ Glob with the new algorithm that better fits the S3 API """
+        sys.audit("pathlib.Path.glob", self, pattern)
+        if not pattern:
+            raise ValueError(f'Unacceptable pattern: {pattern}')
+        drv, root, pattern_parts = self._parse_path(pattern)
+        if drv or root:
+            raise NotImplementedError("Non-relative patterns are unsupported")
+        for part in pattern_parts:
+            if part != '**' and '**' in part:
+                raise ValueError("Invalid pattern: '**' can only be an entire path component")
+        selector = _Selector(self, pattern=pattern)
+        yield from selector.select()
+
+    def _rglob(self, pattern):
+        """ RGlob with the new algorithm that better fits the S3 API """
+        sys.audit("pathlib.Path.rglob", self, pattern)
+        if not pattern:
+            raise ValueError(f'Unacceptable pattern: {pattern}')
+        drv, root, pattern_parts = self._parse_path(pattern)
+        if drv or root:
+            raise NotImplementedError("Non-relative patterns are unsupported")
+        for part in pattern_parts:
+            if part != '**' and '**' in part:
+                raise ValueError("Invalid pattern: '**' can only be an entire path component")
+        pattern = f'**{self._flavour.sep}{pattern}'
+        selector = _Selector(self, pattern=pattern)
+        yield from selector.select()
+
+
+class PureVersionedS3Path(PureS3Path):
+    """
+    PurePath subclass for AWS S3 service Keys with Versions.
+
+    S3 is not a file-system, but we can look at it like a POSIX system.
+    """
+
+    def __new__(cls, *args, version_id: str):
+        self = super().__new__(cls, *args)
+        self.version_id = version_id
+        return self
+
+    def __repr__(self) -> str:
+        return f'{type(self).__name__}({self.as_posix()}, version_id={self.version_id})'
+
+    def __truediv__(self, key):
+        if not isinstance(key, (PureS3Path, str)):
+            return NotImplemented
+
+        key = S3Path(key) if isinstance(key, str) else key
+        return key.__rtruediv__(self)
+
+    def __rtruediv__(self, key):
+        if not isinstance(key, (PureS3Path, str)):
+            return NotImplemented
+
+        new_path = super().__rtruediv__(key)
+        new_path.version_id = self.version_id
+        return new_path
+
+    @classmethod
+    def from_uri(cls, uri: str, *, version_id: str):
+        """
+        from_uri class method creates a class instance from a URI and a version id
+
+        >> from s3path import VersionedS3Path
+        >> VersionedS3Path.from_uri('s3://<bucket>/<key>', version_id='<version_id>')
+        << VersionedS3Path('/<bucket>/<key>', version_id='<version_id>')
+        """
+
+        self = PureS3Path.from_uri(uri)
+        return cls(self, version_id=version_id)
+
+    @classmethod
+    def from_bucket_key(cls, bucket: str, key: str, *, version_id: str):
+        """
+        from_bucket_key class method creates a class instance from bucket, key and version id
+
+        >> from s3path import VersionedS3Path
+        >> VersionedS3Path.from_bucket_key('<bucket>', '<key>', version_id='<version_id>')
+        << VersionedS3Path('/<bucket>/<key>', version_id='<version_id>')
+        """
+
+        self = PureS3Path.from_bucket_key(bucket=bucket, key=key)
+        return cls(self, version_id=version_id)
+
+    def with_segments(self, *pathsegments):
+        """
+        Construct a new path object from any number of path-like objects.
+        Subclasses may override this method to customize how new path objects
+        are created from methods like `iterdir()`.
+        """
+        return type(self)(*pathsegments, version_id=self.version_id)
+
+    def joinpath(self, *args):
+        if not args:
+            return self
+
+        new_path = super().joinpath(*args)
+
+        if isinstance(args[-1], PureVersionedS3Path):
+            new_path.version_id = args[-1].version_id
+        else:
+            new_path = S3Path(new_path)
+
+        return new_path
+
+
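The versioned pure path carries version_id through constructor round-trips and joins. A usage sketch (bucket, key and version id are hypothetical, illustrative only):

```python
from s3path import VersionedS3Path

path = VersionedS3Path.from_uri('s3://my-bucket/data.json', version_id='abc123')
print(path)             # VersionedS3Path(/my-bucket/data.json, version_id=abc123)
print(path.version_id)  # 'abc123'
# IO methods on the concrete subclass below target exactly that object version:
print(path.read_text())  # contents of that specific version
```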
+ """ + return type(self)(*pathsegments, version_id=self.version_id) + + def joinpath(self, *args): + if not args: + return self + + new_path = super().joinpath(*args) + + if isinstance(args[-1], PureVersionedS3Path): + new_path.version_id = args[-1].version_id + else: + new_path = S3Path(new_path) + + return new_path + + +class VersionedS3Path(PureVersionedS3Path, S3Path): + """ + S3Path subclass for AWS S3 service Keys with Versions. + + >> from s3path import VersionedS3Path + >> VersionedS3Path('//', version_id='') + << VersionedS3Path('//', version_id='') + """ + + +def _is_wildcard_pattern(pat): + # Whether this pattern needs actual matching using fnmatch, or can + # be looked up directly as a file. + return "*" in pat or "?" in pat or "[" in pat + + +class _Selector: + def __init__(self, path, *, pattern): + self._path = path + self._prefix, pattern = self._prefix_splitter(pattern) + self._full_keys = self._calculate_full_or_just_folder(pattern) + self._target_level = self._calculate_pattern_level(pattern) + self.match = self._path._flavour.compile_pattern_parts(self._prefix, pattern, path.bucket) + + def select(self): + for target in self._deep_cached_dir_scan(): + target = self._path._flavour.sep.join(('', self._path.bucket, target)) + if self.match(target): + yield type(self._path)(target) + + def _prefix_splitter(self, pattern): + if not _is_wildcard_pattern(pattern): + if self._path.key: + return f'{self._path.key}{self._path._flavour.sep}{pattern}', '' + return pattern, '' + + *_, pattern_parts = self._path._parse_path(pattern) + prefix = '' + for index, part in enumerate(pattern_parts): + if _is_wildcard_pattern(part): + break + prefix += f'{part}{self._path._flavour.sep}' + + if pattern.startswith(prefix): + pattern = pattern.replace(prefix, '', 1) + + key_prefix = self._path.key + if key_prefix: + prefix = self._path._flavour.sep.join((key_prefix, prefix)) + return prefix, pattern + + def _calculate_pattern_level(self, pattern): + if '**' in pattern: + return None + if self._prefix: + pattern = f'{self._prefix}{self._path._flavour.sep}{pattern}' + *_, pattern_parts = self._path._parse_path(pattern) + return len(pattern_parts) + + def _calculate_full_or_just_folder(self, pattern): + if '**' in pattern: + return True + *_, pattern_parts = self._path._parse_path(pattern) + for part in pattern_parts[:-1]: + if '*' in part: + return True + return False + + def _deep_cached_dir_scan(self): + cache = _DeepDirCache() + prefix_sep_count = self._prefix.count(self._path._flavour.sep) + for key in accessor.iter_keys(self._path, prefix=self._prefix, full_keys=self._full_keys): + key_sep_count = key.count(self._path._flavour.sep) + 1 + key_parts = key.rsplit(self._path._flavour.sep, maxsplit=key_sep_count - prefix_sep_count) + target_path_parts = key_parts[:self._target_level] + target_path = (self._path._flavour.sep).join(target_path_parts) + if cache.in_cache(target_path): + continue + yield target_path + cache.add(target_path_parts) + + +class _DeepDirCache: + def __init__(self): + self._queue = deque() + self._tree = {} + + def __repr__(self): + return f'{type(self).__name__}({self._tree, self._queue})' + + def in_cache(self, target_path: str) -> bool: + return target_path in self._queue + + def add(self, directory_parts): + tree = self._tree + for part in directory_parts: + if part in tree: + tree = tree[part] + continue + if tree: + deep_count = self._deep_count(tree) + tree.clear() + for _ in range(deep_count): + with suppress(IndexError): + self._queue.pop() + tree[part] = {} + 
+class _DeepDirCache:
+    def __init__(self):
+        self._queue = deque()
+        self._tree = {}
+
+    def __repr__(self):
+        return f'{type(self).__name__}({self._tree}, {self._queue})'
+
+    def in_cache(self, target_path: str) -> bool:
+        return target_path in self._queue
+
+    def add(self, directory_parts):
+        tree = self._tree
+        for part in directory_parts:
+            if part in tree:
+                tree = tree[part]
+                continue
+            if tree:
+                deep_count = self._deep_count(tree)
+                tree.clear()
+                for _ in range(deep_count):
+                    with suppress(IndexError):
+                        self._queue.pop()
+            tree[part] = {}
+        directory = '/'.join(directory_parts)
+        self._queue.append(directory)
+
+    def _deep_count(self, tree):
+        count = 0
+        while True:
+            try:
+                tree = next(iter(tree.values()))
+            except StopIteration:
+                return count
+            count += 1
diff --git a/s3path/accessor.py b/s3path/accessor.py
new file mode 100644
index 0000000..775b91e
--- /dev/null
+++ b/s3path/accessor.py
@@ -0,0 +1,494 @@
+from os import stat_result
+from functools import lru_cache
+from collections import namedtuple
+from io import UnsupportedOperation
+
+import boto3
+from boto3.s3.transfer import TransferManager
+from botocore.exceptions import ClientError
+from botocore.docs.docstring import LazyLoadedDocstring
+import smart_open
+
+from .config import S3ConfigurationMap
+
+
+class StatResult(namedtuple('BaseStatResult', 'size, last_modified, version_id', defaults=(None,))):
+    """
+    Based on os.stat_result, but with boto3 S3 features
+    """
+
+    def __getattr__(self, item):
+        if item in vars(stat_result):
+            raise UnsupportedOperation(f'{type(self).__name__} does not support {item} attribute')
+        return super().__getattribute__(item)
+
+    @property
+    def st_size(self) -> int:
+        return self.size
+
+    @property
+    def st_mtime(self) -> float:
+        return self.last_modified.timestamp()
+
+    @property
+    def st_version_id(self) -> str:
+        return self.version_id
+
+
+configuration_map = S3ConfigurationMap()
+
+
+def stat(path, *, follow_symlinks=True):
+    if not follow_symlinks:
+        raise NotImplementedError(
+            f'Setting follow_symlinks to {follow_symlinks} is unsupported on S3 service.')
+    resource, _ = configuration_map.get_configuration(path)
+    if _is_versioned_path(path):
+        object_summary = resource.ObjectVersion(path.bucket, path.key, path.version_id).get()
+        return StatResult(
+            size=object_summary.get('ContentLength'),
+            last_modified=object_summary.get('LastModified'),
+            version_id=object_summary.get('VersionId'),
+        )
+    object_summary = resource.ObjectSummary(path.bucket, path.key)
+    return StatResult(
+        size=object_summary.size,
+        last_modified=object_summary.last_modified,
+    )
+
+
+def owner(path):
+    bucket_name = path.bucket
+    key_name = path.key
+    resource, _ = configuration_map.get_configuration(path)
+    object_summary = resource.ObjectSummary(bucket_name, key_name)
+    # return object_summary.owner['DisplayName']
+    # This is a hack till boto3 resolves this issue:
+    # https://github.com/boto/boto3/issues/1950
+    response = object_summary.meta.client.list_objects_v2(
+        Bucket=object_summary.bucket_name,
+        Prefix=object_summary.key,
+        FetchOwner=True)
+    return response['Contents'][0]['Owner']['DisplayName']
+
+
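StatResult above deliberately answers only the os.stat_result attributes S3 can honor; anything else raises io.UnsupportedOperation. A small sketch (illustrative only):

```python
from datetime import datetime, timezone
from io import UnsupportedOperation
from s3path import StatResult

info = StatResult(size=42, last_modified=datetime.now(timezone.utc))
print(info.st_size)    # 42
print(info.st_mtime)   # POSIX timestamp derived from last_modified
try:
    info.st_uid        # a stat_result attribute S3 cannot provide
except UnsupportedOperation as error:
    print(error)
```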
+def rename(path, target):
+    source_bucket_name = path.bucket
+    source_key_name = path.key
+    target_bucket_name = target.bucket
+    target_key_name = target.key
+
+    resource, config = configuration_map.get_configuration(path)
+
+    if not is_dir(path):
+        target_bucket = resource.Bucket(target_bucket_name)
+        object_summary = resource.ObjectSummary(source_bucket_name, source_key_name)
+        old_source = {'Bucket': object_summary.bucket_name, 'Key': object_summary.key}
+        _boto3_method_with_extraargs(
+            target_bucket.copy,
+            config=config,
+            args=(old_source, target_key_name),
+            allowed_extra_args=TransferManager.ALLOWED_COPY_ARGS,
+        )
+        _boto3_method_with_parameters(object_summary.delete)
+        return
+    bucket = resource.Bucket(source_bucket_name)
+    target_bucket = resource.Bucket(target_bucket_name)
+    for object_summary in bucket.objects.filter(Prefix=source_key_name):
+        old_source = {'Bucket': object_summary.bucket_name, 'Key': object_summary.key}
+        new_key = object_summary.key.replace(source_key_name, target_key_name)
+        _, config = configuration_map.get_configuration(type(path)(target_bucket_name, new_key))
+        _boto3_method_with_extraargs(
+            target_bucket.copy,
+            config=config,
+            args=(old_source, new_key),
+            allowed_extra_args=TransferManager.ALLOWED_COPY_ARGS,
+        )
+        _boto3_method_with_parameters(object_summary.delete)
+
+
+replace = rename
+
+
+def rmdir(path):
+    bucket_name = path.bucket
+    key_name = path.key
+    resource, config = configuration_map.get_configuration(path)
+    bucket = resource.Bucket(bucket_name)
+    for object_summary in bucket.objects.filter(Prefix=key_name):
+        _boto3_method_with_parameters(object_summary.delete, config=config)
+    if path.is_bucket:
+        _boto3_method_with_parameters(bucket.delete, config=config)
+
+
+def mkdir(path, mode):
+    resource, config = configuration_map.get_configuration(path)
+    _boto3_method_with_parameters(
+        resource.create_bucket,
+        config=config,
+        kwargs={'Bucket': path.bucket},
+    )
+
+
+def is_dir(path):
+    if str(path) == path.root:
+        return True
+    resource, _ = configuration_map.get_configuration(path)
+    bucket = resource.Bucket(path.bucket)
+    return any(bucket.objects.filter(Prefix=generate_prefix(path)))
+
+
+def exists(path):
+    bucket_name = path.bucket
+    resource, _ = configuration_map.get_configuration(path)
+
+    if not path.key:
+        # Check whether the bucket exists.
+        # See https://stackoverflow.com/questions/26871884
+        try:
+            resource.meta.client.head_bucket(Bucket=bucket_name)
+            return True
+        except ClientError as e:
+            error_code = e.response['Error']['Code']
+            if error_code == '404':
+                # Not found
+                return False
+            raise
+
+    bucket = resource.Bucket(bucket_name)
+    key_name = str(path.key)
+
+    if _is_versioned_path(path):
+        for object in bucket.object_versions.filter(Prefix=key_name):
+            if object.version_id != path.version_id:
+                continue
+            if object.key == key_name:
+                return True
+            if object.key.startswith(key_name + path._flavour.sep):
+                return True
+        return False
+
+    for object in bucket.objects.filter(Prefix=key_name):
+        if object.key == key_name:
+            return True
+        if object.key.startswith(key_name + path._flavour.sep):
+            return True
+    return False
+
+
+def iter_keys(path, *, prefix=None, full_keys=True):
+    resource, _ = configuration_map.get_configuration(path)
+    bucket_name = path.bucket
+
+    def get_keys():
+        continuation_token = None
+        while True:
+            if continuation_token:
+                kwargs['ContinuationToken'] = continuation_token
+            response = resource.meta.client.list_objects_v2(**kwargs)
+            for file in response.get('Contents', ()):
+                yield file['Key']
+            for folder in response.get('CommonPrefixes', ()):
+                yield folder['Prefix']
+            if not response.get('IsTruncated'):
+                break
+            continuation_token = response.get('NextContinuationToken')
+
+    # get buckets
+    if not bucket_name and not full_keys:
+        for bucket in resource.buckets.filter():
+            yield bucket.name
+        return
+    # get keys in buckets
+    if not bucket_name:
+        for bucket in resource.buckets.filter():
+            kwargs = {'Bucket': bucket.name}
+            yield from get_keys()
+        return
+    # get keys or part of keys in buckets
+    kwargs = {'Bucket': bucket_name}
+    if prefix:
+        kwargs['Prefix'] = prefix
+    if not full_keys:
+        kwargs['Delimiter'] = path._flavour.sep
+    yield from get_keys()
+
+
+def scandir(path):
+    return _S3Scandir(path=path)
+
+
+def listdir(path):
+    with scandir(path) as scandir_iter:
+        return [entry.name for entry in scandir_iter]
+
+
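scandir and listdir are what back S3Path.iterdir; when the path has no bucket yet, they enumerate buckets instead of keys. From the user's side (hypothetical names, illustrative only):

```python
from s3path import S3Path

for bucket in S3Path('/').iterdir():          # lists the account's buckets
    print(bucket)
for entry in S3Path('/my-bucket').iterdir():  # top-level keys and key prefixes
    print(entry)
```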
+def open(path, *, mode='r', buffering=-1, encoding=None, errors=None, newline=None):
+    resource, config = configuration_map.get_configuration(path)
+
+    if smart_open.__version__ < '4.0.0' and mode.startswith('b'):
+        mode = ''.join(reversed(mode))
+    smart_open_kwargs = {
+        'uri': "s3:/" + str(path),
+        'mode': mode,
+        'buffering': buffering,
+        'encoding': encoding,
+        'errors': errors,
+        'newline': newline,
+    }
+    transport_params = {'defer_seek': True}
+    if _is_versioned_path(path):
+        transport_params['version_id'] = path.version_id
+    dummy_object = resource.Object('bucket', 'key')
+
+    if smart_open.__version__ >= '5.1.0':
+        _smart_open_new_version_kwargs(
+            dummy_object,
+            resource,
+            config,
+            transport_params,
+            smart_open_kwargs)
+    else:
+        _smart_open_old_version_kwargs(
+            dummy_object,
+            resource,
+            config,
+            transport_params,
+            smart_open_kwargs)
+
+    file_object = smart_open.open(**smart_open_kwargs)
+    return file_object
+
+
+def get_presigned_url(path, expire_in: int) -> str:
+    resource, config = configuration_map.get_configuration(path)
+    return _boto3_method_with_parameters(
+        resource.meta.client.generate_presigned_url,
+        config=config,
+        kwargs={
+            'ClientMethod': 'get_object',
+            'Params': {'Bucket': path.bucket, 'Key': path.key},
+            'ExpiresIn': expire_in,
+        }
+    )
+
+
+def generate_prefix(path):
+    sep = path._flavour.sep
+    if not path.key:
+        return ''
+    key_name = path.key
+    if not key_name.endswith(sep):
+        return key_name + sep
+    return key_name
+
+
+def unlink(path, *args, **kwargs):
+    bucket_name = path.bucket
+    key_name = path.key
+    resource, config = configuration_map.get_configuration(path)
+    bucket = resource.Bucket(bucket_name)
+    try:
+        _boto3_method_with_parameters(
+            bucket.meta.client.delete_object,
+            config=config,
+            kwargs={"Bucket": bucket_name, "Key": key_name}
+        )
+    except ClientError:
+        raise OSError(f'/{bucket_name}/{key_name}')
+
+
+def _is_versioned_path(path):
+    return hasattr(path, 'version_id') and bool(path.version_id)
+
+
+def _smart_open_new_version_kwargs(
+        dummy_object,
+        resource,
+        config,
+        transport_params,
+        smart_open_kwargs):
+    """
+    New smart_open API
+    Doc: https://github.com/RaRe-Technologies/smart_open/blob/develop/MIGRATING_FROM_OLDER_VERSIONS.rst
+    """
+    get_object_kwargs = _update_kwargs_with_config(
+        dummy_object.meta.client.get_object, config=config)
+    create_multipart_upload_kwargs = _update_kwargs_with_config(
+        dummy_object.meta.client.create_multipart_upload, config=config)
+    transport_params.update(
+        client=resource.meta.client,
+        client_kwargs={
+            'S3.Client.create_multipart_upload': create_multipart_upload_kwargs,
+            'S3.Client.get_object': get_object_kwargs
+        },
+    )
+    smart_open_kwargs.update(
+        compression='disable',
+        transport_params=transport_params,
+    )
+
+
+def _smart_open_old_version_kwargs(
+        dummy_object,
+        resource,
+        config,
+        transport_params,
+        smart_open_kwargs):
+    """
+    Old smart_open API (<5.0.0)
+    """
+    def get_resource_kwargs():
+        # This is a good example of the complexity of boto3 and botocore:
+        # we have to dig the resource arguments back out of the resource object :-/
+        # very annoying...
+ + try: + access_key = resource.meta.client._request_signer._credentials.access_key + secret_key = resource.meta.client._request_signer._credentials.secret_key + token = resource.meta.client._request_signer._credentials.token + except AttributeError: + access_key = secret_key = token = None + return { + 'endpoint_url': resource.meta.client.meta._endpoint_url, + 'config': resource.meta.client._client_config, + 'region_name': resource.meta.client._client_config.region_name, + 'use_ssl': resource.meta.client._endpoint.host.startswith('https'), + 'verify': resource.meta.client._endpoint.http_session._verify, + 'aws_access_key_id': access_key, + 'aws_secret_access_key': secret_key, + 'aws_session_token': token, + } + + initiate_multipart_upload_kwargs = _update_kwargs_with_config( + dummy_object.initiate_multipart_upload, config=config) + object_kwargs = _update_kwargs_with_config(dummy_object.get, config=config) + transport_params.update( + multipart_upload_kwargs=initiate_multipart_upload_kwargs, + object_kwargs=object_kwargs, + resource_kwargs=get_resource_kwargs(), + session=boto3.DEFAULT_SESSION, + ) + smart_open_kwargs.update( + ignore_ext=True, + transport_params=transport_params, + ) + + +def _update_kwargs_with_config(boto3_method, config, kwargs=None): + kwargs = kwargs or {} + if config is not None: + kwargs.update({ + key: value + for key, value in config.items() + if key in _get_action_arguments(boto3_method) + }) + return kwargs + + +def _boto3_method_with_parameters(boto3_method, config=None, args=(), kwargs=None): + kwargs = _update_kwargs_with_config(boto3_method, config, kwargs) + return boto3_method(*args, **kwargs) + + +def _boto3_method_with_extraargs( + boto3_method, + config=None, + args=(), + kwargs=None, + extra_args=None, + allowed_extra_args=()): + kwargs = kwargs or {} + extra_args = extra_args or {} + if config is not None: + extra_args.update({ + key: value + for key, value in config.items() + if key in allowed_extra_args + }) + kwargs["ExtraArgs"] = extra_args + return boto3_method(*args, **kwargs) + +@lru_cache() +def _get_action_arguments(action): + if isinstance(action.__doc__, LazyLoadedDocstring): + docs = action.__doc__._generate() + else: + docs = action.__doc__ + return set( + line.replace(':param ', '').strip().strip(':') + for line in docs.splitlines() + if line.startswith(':param ') + ) + + +class _S3Scandir: + def __init__(self, *, path): + self._path = path + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + return + + def __iter__(self): + bucket_name = self._path.bucket + resource, _ = configuration_map.get_configuration(self._path) + if not bucket_name: + for bucket in resource.buckets.filter(Prefix=str(self._path)): + yield _S3DirEntry(bucket.name, is_dir=True) + return + bucket = resource.Bucket(bucket_name) + sep = self._path._flavour.sep + + kwargs = { + 'Bucket': bucket.name, + 'Prefix': generate_prefix(self._path), + 'Delimiter': sep} + + continuation_token = None + while True: + if continuation_token: + kwargs['ContinuationToken'] = continuation_token + response = bucket.meta.client.list_objects_v2(**kwargs) + for folder in response.get('CommonPrefixes', ()): + full_name = folder['Prefix'][:-1] if folder['Prefix'].endswith(sep) else folder['Prefix'] + name = full_name.split(sep)[-1] + yield _S3DirEntry(name, is_dir=True) + for file in response.get('Contents', ()): + if file['Key'] == response['Prefix']: + continue + name = file['Key'].split(sep)[-1] + yield _S3DirEntry(name=name, is_dir=False, 
size=file['Size'], last_modified=file['LastModified']) + if not response.get('IsTruncated'): + break + continuation_token = response.get('NextContinuationToken') + + +class _S3DirEntry: + def __init__(self, name, is_dir, size=None, last_modified=None): + self.name = name + self._is_dir = is_dir + self._stat = StatResult(size=size, last_modified=last_modified) + + def __repr__(self): + return f'{type(self).__name__}(name={self.name}, is_dir={self._is_dir}, stat={self._stat})' + + def inode(self, *args, **kwargs): + return None + + def is_dir(self, follow_symlinks=False): + if follow_symlinks: + raise TypeError('AWS S3 Service does not have symlink feature') + return self._is_dir + + def is_file(self): + return not self._is_dir + + def is_symlink(self, *args, **kwargs): + return False + + def stat(self): + return self._stat diff --git a/s3path/config.py b/s3path/config.py new file mode 100644 index 0000000..9c676b8 --- /dev/null +++ b/s3path/config.py @@ -0,0 +1,63 @@ +from threading import Lock +from itertools import chain +from functools import lru_cache + +import boto3 + + +class S3ConfigurationMap: + def __init__(self): + self.arguments = None + self.resources = None + self.general_options = None + self.setup_lock = Lock() + self.is_setup = False + + def __repr__(self): + return f'{type(self).__name__}' \ + f'(arguments={self.arguments}, resources={self.resources}, is_setup={self.is_setup})' + + @property + def default_resource(self): + return boto3.resource('s3') + + def set_configuration(self, path, *, resource=None, arguments=None, glob_new_algorithm=None): + self._delayed_setup() + path_name = str(path) + if arguments is not None: + self.arguments[path_name] = arguments + if resource is not None: + self.resources[path_name] = resource + if glob_new_algorithm is not None: + self.general_options[path_name] = {'glob_new_algorithm': glob_new_algorithm} + self.get_configuration.cache_clear() + + @lru_cache() + def get_configuration(self, path): + self._delayed_setup() + resources = arguments = None + for path in chain([path], path.parents): + path_name = str(path) + if resources is None and path_name in self.resources: + resources = self.resources[path_name] + if arguments is None and path_name in self.arguments: + arguments = self.arguments[path_name] + return resources, arguments + + @lru_cache() + def get_general_options(self, path): + self._delayed_setup() + for path in chain([path], path.parents): + path_name = str(path) + if path_name in self.general_options: + return self.general_options[path_name] + return + + def _delayed_setup(self): + """ Resolves a circular dependency between us and PureS3Path """ + with self.setup_lock: + if not self.is_setup: + self.arguments = {'/': {}} + self.resources = {'/': self.default_resource} + self.general_options = {'/': {'glob_new_algorithm': True}} + self.is_setup = True diff --git a/s3path/flavour.py b/s3path/flavour.py new file mode 100644 index 0000000..34779f9 --- /dev/null +++ b/s3path/flavour.py @@ -0,0 +1,60 @@ +import re +import sys +import fnmatch +import posixpath +from pathlib import PurePath + +try: + import boto3 + import smart_open + + is_supported = True +except ImportError: + boto3 = smart_open = None + + is_supported = False + + +if sys.version_info >= (3, 12): + def __getattr__(name): + return getattr(posixpath, name) +else: + from pathlib import _posix_flavour + def __getattr__(name): + return getattr(_posix_flavour, name) + + + def parse_parts(parts): + drv, root, parsed = _posix_flavour.parse_parts(parts) + for part in 
parsed[1:]:
+            if part == '..':
+                index = parsed.index(part)
+                parsed.pop(index - 1)
+                parsed.remove(part)
+        return drv, root, parsed
+
+
+    def make_uri(path):
+        uri = _posix_flavour.make_uri(path)
+        return uri.replace('file:///', 's3://')
+
+
+def compile_pattern_parts(prefix, pattern, bucket):
+    pattern = posixpath.sep.join((
+        '',
+        bucket,
+        prefix,
+        pattern,
+    ))
+
+    *_, pattern_parts = PurePath._parse_path(pattern)
+    new_regex_pattern = ''
+    for part in pattern_parts:
+        if part == posixpath.sep:
+            continue
+        if '**' in part:
+            new_regex_pattern += f'{posixpath.sep}*(?s:{part.replace("**", ".*")})'
+            continue
+        new_regex_pattern += f'{posixpath.sep}{fnmatch.translate(part)[:-2]}'
+    new_regex_pattern += r'/*\Z'
+    return re.compile(new_regex_pattern).fullmatch
diff --git a/setup.py b/setup.py
index 4fd9d46..6d6c3a6 100644
--- a/setup.py
+++ b/setup.py
@@ -5,7 +5,7 @@ long_description = fh.read()
 setup(
     name='s3path',
-    version='0.5.0',
+    version='0.5.1',
     url='https://github.com/liormizr/s3path',
     author='Lior Mizrahi',
     author_email='li.mizr@gmail.com',
@@ -30,5 +30,6 @@
         'Programming Language :: Python :: 3.9',
         'Programming Language :: Python :: 3.10',
         'Programming Language :: Python :: 3.11',
+        'Programming Language :: Python :: 3.12',
     ],
 )
diff --git a/tests/conftest.py b/tests/conftest.py
index dcab6e1..e487f60 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -2,13 +2,13 @@
 import pytest
 from moto import mock_s3
 
-from s3path import register_configuration_parameter, PureS3Path, _s3_accessor
+from s3path import register_configuration_parameter, PureS3Path, accessor
 
 
 def _cleanup():
-    _s3_accessor.configuration_map.get_configuration.cache_clear()
-    _s3_accessor.configuration_map.get_general_options.cache_clear()
-    _s3_accessor.configuration_map.is_setup = False
+    accessor.configuration_map.get_configuration.cache_clear()
+    accessor.configuration_map.get_general_options.cache_clear()
+    accessor.configuration_map.is_setup = False
 
 
 @pytest.fixture()
diff --git a/tests/test_pure_path_operations.py b/tests/test_pure_path_operations.py
index 684c5c9..f1a1390 100644
--- a/tests/test_pure_path_operations.py
+++ b/tests/test_pure_path_operations.py
@@ -1,5 +1,4 @@
 import os
-import sys
 import pytest
 from pathlib import Path, PurePosixPath, PureWindowsPath
 from s3path import PureS3Path
diff --git a/tests/test_s3path_configuration.py b/tests/test_s3path_configuration.py
index 5dd5223..64f03a3 100644
--- a/tests/test_s3path_configuration.py
+++ b/tests/test_s3path_configuration.py
@@ -7,25 +7,25 @@
 import boto3
 from botocore.client import Config
 
-from s3path import S3Path, PureS3Path, register_configuration_parameter, _s3_accessor
+from s3path import S3Path, PureS3Path, register_configuration_parameter, accessor
 
 
 def test_s3_configuration_map_repr():
-    assert repr(_s3_accessor.configuration_map)
+    assert repr(accessor.configuration_map)
 
 
 def test_basic_configuration(reset_configuration_cache):
     path = S3Path('/foo/')
-    _s3_accessor.configuration_map.arguments = _s3_accessor.configuration_map.resources = None
+    accessor.configuration_map.arguments = accessor.configuration_map.resources = None
 
-    assert path not in (_s3_accessor.configuration_map.arguments or ())
-    assert path not in (_s3_accessor.configuration_map.resources or ())
-    assert _s3_accessor.configuration_map.get_configuration(path) == (
-        _s3_accessor.configuration_map.default_resource, {})
+    assert str(path) not in (accessor.configuration_map.arguments or ())
+    assert str(path) not in (accessor.configuration_map.resources or ())
+    
assert accessor.configuration_map.get_configuration(path) == ( + accessor.configuration_map.default_resource, {}) - assert (_s3_accessor.configuration_map.get_configuration(S3Path('/foo/')) - == _s3_accessor.configuration_map.get_configuration(PureS3Path('/foo/'))) + assert (accessor.configuration_map.get_configuration(S3Path('/foo/')) + == accessor.configuration_map.get_configuration(PureS3Path('/foo/'))) def test_register_configuration_exceptions(reset_configuration_cache): @@ -42,13 +42,13 @@ def test_register_configuration_exceptions(reset_configuration_cache): def test_hierarchical_configuration(reset_configuration_cache): path = S3Path('/foo/') register_configuration_parameter(path, parameters={'ContentType': 'text/html'}) - assert path in _s3_accessor.configuration_map.arguments - assert path not in _s3_accessor.configuration_map.resources - assert _s3_accessor.configuration_map.get_configuration(path) == ( - _s3_accessor.configuration_map.default_resource, {'ContentType': 'text/html'}) + assert str(path) in accessor.configuration_map.arguments + assert str(path) not in accessor.configuration_map.resources + assert accessor.configuration_map.get_configuration(path) == ( + accessor.configuration_map.default_resource, {'ContentType': 'text/html'}) - assert (_s3_accessor.configuration_map.get_configuration(S3Path('/foo/')) - == _s3_accessor.configuration_map.get_configuration(PureS3Path('/foo/'))) + assert (accessor.configuration_map.get_configuration(S3Path('/foo/')) + == accessor.configuration_map.get_configuration(PureS3Path('/foo/'))) def test_boto_methods_with_configuration(s3_mock, reset_configuration_cache): @@ -84,26 +84,26 @@ def test_configuration_per_bucket(reset_configuration_cache): config=Config(signature_version='s3v4'), region_name='us-east-1')) - assert _s3_accessor.configuration_map.get_configuration(PureS3Path('/')) == ( - _s3_accessor.configuration_map.default_resource, {'ContentType': 'text/html'}) - assert _s3_accessor.configuration_map.get_configuration(PureS3Path('/some_bucket')) == ( - _s3_accessor.configuration_map.default_resource, {'ContentType': 'text/html'}) - assert _s3_accessor.configuration_map.get_configuration(PureS3Path('/some_bucket')) == ( - _s3_accessor.configuration_map.default_resource, {'ContentType': 'text/html'}) + assert accessor.configuration_map.get_configuration(PureS3Path('/')) == ( + accessor.configuration_map.default_resource, {'ContentType': 'text/html'}) + assert accessor.configuration_map.get_configuration(PureS3Path('/some_bucket')) == ( + accessor.configuration_map.default_resource, {'ContentType': 'text/html'}) + assert accessor.configuration_map.get_configuration(PureS3Path('/some_bucket')) == ( + accessor.configuration_map.default_resource, {'ContentType': 'text/html'}) - resources, arguments = _s3_accessor.configuration_map.get_configuration(minio_bucket_path) + resources, arguments = accessor.configuration_map.get_configuration(minio_bucket_path) assert arguments == {'OutputSerialization': {'CSV': {}}} assert resources.meta.client._endpoint.host == 'http://localhost:9000' - resources, arguments = _s3_accessor.configuration_map.get_configuration(minio_bucket_path / 'some_key') + resources, arguments = accessor.configuration_map.get_configuration(minio_bucket_path / 'some_key') assert arguments == {'OutputSerialization': {'CSV': {}}} assert resources.meta.client._endpoint.host == 'http://localhost:9000' - resources, arguments = _s3_accessor.configuration_map.get_configuration(local_stack_bucket_path) + resources, arguments = 
accessor.configuration_map.get_configuration(local_stack_bucket_path) assert arguments == {} assert resources.meta.client._endpoint.host == 'http://localhost:4566' - resources, arguments = _s3_accessor.configuration_map.get_configuration(local_stack_bucket_path / 'some_key') + resources, arguments = accessor.configuration_map.get_configuration(local_stack_bucket_path / 'some_key') assert arguments == {} assert resources.meta.client._endpoint.host == 'http://localhost:4566' @@ -124,12 +124,12 @@ def test_open_method_with_custom_endpoint_url(): def test_issue_123(): path = S3Path('/bucket') - old_resource, _ = path._accessor.configuration_map.get_configuration(path) + old_resource, _ = accessor.configuration_map.get_configuration(path) boto3.setup_default_session() s3 = boto3.resource('s3') register_configuration_parameter(path, resource=s3) - new_resource, _ = path._accessor.configuration_map.get_configuration(path) + new_resource, _ = accessor.configuration_map.get_configuration(path) assert new_resource is s3 assert new_resource is not old_resource