Skip to content

Commit

Permalink
first commit
Browse files Browse the repository at this point in the history
  • Loading branch information
liormizr committed Nov 17, 2022
1 parent c85c025 commit b5525a6
Show file tree
Hide file tree
Showing 5 changed files with 290 additions and 14 deletions.
14 changes: 14 additions & 0 deletions docs/advance.rst
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,20 @@ This example show how to specify default AWS S3 parameters, a `LocalStack`_ Buck
>>> register_configuration_parameter(minio_bucket_path, resource=minio_resource)
s3path library general options:
-------------------------------

In Version 0.4.0 we added a new algorithm for the r/glob methods.
To enable the old (pathlib common) Algorithm you can configure it like this:

.. code:: python
>>> from s3path import PureS3Path, register_configuration_parameter
>>> # Define path's for configuration
>>> path = PureS3Path('/')
>>> register_configuration_parameter(path, glob_new_algorithm=False)
.. _pathlib : https://docs.python.org/3/library/pathlib.html
.. _boto3 : https://github.com/boto/boto3
.. _configuration: https://boto3.amazonaws.com/v1/documentation/api/latest/guide/configuration.html
Expand Down
19 changes: 18 additions & 1 deletion docs/interface.rst
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,19 @@ In other words, it enables recursive globbing:
S3Path('/pypi-proxy/boto3/index.html'),
S3Path('/pypi-proxy/botocore/index.html')]
**NOTE:** Using the "**" pattern in large Buckets may consume an inordinate amount of time.
New in version 0.4.0:
New Algorithm that better suited to s3 API.
Especially for recursive searches.

To enable the old (pathlib common) Algorithm you can configure it like this:

.. code:: python
register_configuration_parameter(path, glob_new_algorithm=False)
For more configuration details please see this `Advanced S3Path configuration`_

**NOTE:** Using the "**" pattern in large Buckets may consume an inordinate amount of time in the old algorithm.

S3Path.is_dir()
^^^^^^^^^^^^^^^
Expand Down Expand Up @@ -258,6 +270,10 @@ This is like calling S3Path.glob_ with ``"**/"`` added in front of the given rel
S3Path('/pypi-proxy/index.html'),
S3Path('/pypi-proxy/botocore/index.html')]
New in version 0.4.0:
New Algorithm that better suited to s3 API.
Especially for recursive searches.

S3Path.rmdir()
^^^^^^^^^^^^^^

Expand Down Expand Up @@ -491,3 +507,4 @@ Changes in PureS3Path:
.. _IsADirectoryError : https://docs.python.org/3/library/exceptions.html#IsADirectoryError
.. _NotImplementedError : https://docs.python.org/3/library/exceptions.html#NotImplementedError
.. _ObjectSummary : https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html#objectsummary
.. _Abstract pathlib interface: https://github.com/liormizr/s3path/blob/master/docs/interface.rst
246 changes: 236 additions & 10 deletions s3path.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,19 @@
"""
s3path provides a Pythonic API to S3 by wrapping boto3 with pathlib interface
"""
import re
import sys
import fnmatch
from os import stat_result
from threading import Lock
from itertools import chain
from functools import lru_cache
from contextlib import suppress
from collections import namedtuple
from platform import python_version
from collections import namedtuple, deque
from distutils.version import StrictVersion
from io import DEFAULT_BUFFER_SIZE, UnsupportedOperation
from pathlib import _PosixFlavour, _Accessor, PurePath, Path
from threading import Lock
from pathlib import _PosixFlavour, _Accessor, _is_wildcard_pattern, PurePath, Path

try:
import boto3
Expand All @@ -26,7 +29,7 @@
smart_open = None
ALLOWED_COPY_ARGS = []

__version__ = '0.3.4'
__version__ = '0.4.0'
__all__ = (
'register_configuration_parameter',
'S3Path',
Expand All @@ -52,13 +55,34 @@ def make_uri(self, path):
uri = super().make_uri(path)
return uri.replace('file:///', 's3://')

def compile_pattern_parts(self, prefix, pattern, bucket):
pattern = self.sep.join((
'',
bucket,
prefix,
pattern,
))

*_, pattern_parts = self.parse_parts((pattern,))
new_regex_pattern = ''
for part in pattern_parts:
if part == self.sep:
continue
if '**' in part:
new_regex_pattern += f'{self.sep}*(?s:{part.replace("**", ".*")})'
continue
new_regex_pattern += f'{self.sep}{fnmatch.translate(part)[:-2]}'
new_regex_pattern += '/*\Z'
return re.compile(new_regex_pattern).fullmatch


class _S3ConfigurationMap:
def __init__(self, default_resource_kwargs, **default_arguments):
self.default_resource_kwargs = default_resource_kwargs
self.default_arguments = default_arguments
self.arguments = None
self.resources = None
self.general_options = None
self.setup_lock = Lock()
self.is_setup = False

Expand All @@ -72,6 +96,7 @@ def _delayed_setup(self):
if not self.is_setup:
self.arguments = {PureS3Path('/'): self.default_arguments}
self.resources = {PureS3Path('/'): self.default_resource}
self.general_options = {PureS3Path('/'): {'glob_new_algorithm': True}}
self.is_setup = True

def __repr__(self):
Expand All @@ -81,12 +106,14 @@ def __repr__(self):
resources=self.resources,
is_setup=self.is_setup)

def set_configuration(self, path, *, resource=None, arguments=None):
def set_configuration(self, path, *, resource=None, arguments=None, glob_new_algorithm=None):
self._delayed_setup()
if arguments is not None:
self.arguments[path] = arguments
if resource is not None:
self.resources[path] = resource
if glob_new_algorithm is not None:
self.general_options[path] = {'glob_new_algorithm': glob_new_algorithm}

@lru_cache()
def get_configuration(self, path):
Expand All @@ -99,6 +126,14 @@ def get_configuration(self, path):
arguments = self.arguments[path]
return resources, arguments

@lru_cache()
def get_general_options(self, path):
self._delayed_setup()
for path in chain([path], path.parents):
if path in self.general_options:
return self.general_options[path]
return


class _S3Scandir:
def __init__(self, *, s3_accessor, path):
Expand Down Expand Up @@ -328,6 +363,43 @@ def unlink(self, path, *args, **kwargs):
except ClientError:
raise OSError("/{0}/{1}".format(bucket_name, key_name))

def iter_keys(self, path, *, prefix=None, full_keys=True):
resource, _ = self.configuration_map.get_configuration(path)
bucket_name = path.bucket

def get_keys():
continuation_token = None
while True:
if continuation_token:
kwargs['ContinuationToken'] = continuation_token
response = resource.meta.client.list_objects_v2(**kwargs)
for file in response.get('Contents', ()):
yield file['Key']
for folder in response.get('CommonPrefixes', ()):
yield folder['Prefix']
if not response.get('IsTruncated'):
break
continuation_token = response.get('NextContinuationToken')

# get buckets
if not bucket_name and not full_keys:
for bucket in resource.buckets.filter():
yield bucket.name
return
# get keys in buckets
if not bucket_name:
for bucket in resource.buckets.filter():
kwargs = {'Bucket': bucket.name}
yield from get_keys()
return
# get keys or part of keys in buckets
kwargs = {'Bucket': bucket_name}
if prefix:
kwargs['Prefix'] = prefix
if not full_keys:
kwargs['Delimiter'] = path._flavour.sep
yield from get_keys()

def _update_kwargs_with_config(self, boto3_method, config, kwargs=None):
kwargs = kwargs or {}
if config is not None:
Expand Down Expand Up @@ -550,18 +622,131 @@ def readlink(self):
raise NotImplementedError(message)


class _Selector:
def __init__(self, path, *, pattern):
self._path = path
self._prefix, pattern = self._prefix_splitter(pattern)
self._full_keys = self._calculate_full_or_just_folder(pattern)
self._target_level = self._calculate_pattern_level(pattern)
self.match = self._path._flavour.compile_pattern_parts(self._prefix, pattern, path.bucket)

def select(self):
for target in self._deep_cached_dir_scan():
target = self._path._flavour.sep.join(('', self._path.bucket, target))
if self.match(target):
yield type(self._path)(target)

def _prefix_splitter(self, pattern):
*_, pattern_parts = self._path._flavour.parse_parts((pattern,))
prefix = ''
key_prefix = self._path.key
for part in pattern_parts:
if _is_wildcard_pattern(part):
break
if prefix:
prefix += f'{self._path._flavour.sep}{part}'
else:
prefix = part
prefix_folder = f'{prefix}{self._path._flavour.sep}'
if prefix_folder == pattern:
prefix = prefix_folder
if key_prefix:
prefix = f'{key_prefix}{self._path._flavour.sep}{prefix}'
if pattern.startswith(prefix):
pattern = pattern.replace(prefix, '', 1)
return prefix, pattern

def _calculate_pattern_level(self, pattern):
if '**' in pattern:
return None
if self._prefix:
pattern = f'{self._prefix}{self._path._flavour.sep}{pattern}'
*_, pattern_parts = self._path._flavour.parse_parts((pattern,))
index = 0
for index, part in enumerate(reversed(pattern_parts), 1):
if not _is_wildcard_pattern(part):
break
return index

def _calculate_full_or_just_folder(self, pattern):
if '**' in pattern:
return True
*_, pattern_parts = self._path._flavour.parse_parts((pattern,))
for part in pattern_parts[:-1]:
if '*' in part:
return True
return False

def _deep_cached_dir_scan(self):
cache = _DeepDirCache()
prefix_sep_count = self._prefix.count(self._path._flavour.sep)
for key in self._path._accessor.iter_keys(self._path, prefix=self._prefix, full_keys=self._full_keys):
key_sep_count = key.count(self._path._flavour.sep) + 1
key_parts = key.rsplit(self._path._flavour.sep, maxsplit=key_sep_count - prefix_sep_count)
key_parts_count = sum(1 for _ in key.split(self._path._flavour.sep) if _)
for index in range(prefix_sep_count, key_parts_count + 1):
if self._target_level and self._target_level < index:
break
target_path_parts = key_parts[:index]
target_path = (self._path._flavour.sep).join(target_path_parts)
if cache.in_cache(target_path):
continue
if self._target_level is None or self._target_level == index:
yield target_path
cache.add(target_path_parts, target_path)


class _DeepDirCache:
def __init__(self):
self._queue = deque()
self._tree = {}

def __repr__(self):
return f'{type(self).__name__}{self._tree, self._queue}'

def in_cache(self, directory):
return directory in self._queue

def add(self, directory_parts, directory):
tree = self._tree
for part in directory_parts:
if part in tree:
tree = tree[part]
continue
if tree:
deep_count = self._deep_count(tree)
tree.clear()
for _ in range(deep_count):
self._queue.pop()
tree[part] = {}
self._queue.append(directory)

def _deep_count(self, tree):
count = 0
while True:
try:
tree = next(iter(tree.values()))
except StopIteration:
return count
count += 1


_s3_flavour = _S3Flavour()
_s3_accessor = _S3Accessor()


def register_configuration_parameter(path, *, parameters=None, resource=None):
def register_configuration_parameter(path, *, parameters=None, resource=None, glob_new_algorithm=None):
if not isinstance(path, PureS3Path):
raise TypeError('path argument have to be a {} type. got {}'.format(PurePath, type(path)))
if parameters and not isinstance(parameters, dict):
raise TypeError('parameters argument have to be a dict type. got {}'.format(type(path)))
if parameters is None and resource is None:
if parameters is None and resource is None and glob_new_algorithm is None:
raise ValueError('user have to specify parameters or resource arguments')
_s3_accessor.configuration_map.set_configuration(path, resource=resource, arguments=parameters)
_s3_accessor.configuration_map.set_configuration(
path,
resource=resource,
arguments=parameters,
glob_new_algorithm=glob_new_algorithm)


class PureS3Path(PurePath):
Expand Down Expand Up @@ -709,13 +894,54 @@ def glob(self, pattern):
Glob the given relative pattern in the Bucket / key prefix represented by this path,
yielding all matching files (of any kind)
"""
yield from super().glob(pattern)
self._absolute_path_validation()
general_options = self._accessor.configuration_map.get_general_options(self)
glob_new_algorithm = general_options['glob_new_algorithm']
if not glob_new_algorithm:
yield from super().glob(pattern)
return
yield from self._glob(pattern)

def _glob(self, pattern):
""" Glob with new Algorithm that better fit S3 API """
sys.audit("pathlib.Path.glob", self, pattern)
if not pattern:
raise ValueError("Unacceptable pattern: {!r}".format(pattern))
drv, root, pattern_parts = self._flavour.parse_parts((pattern,))
if drv or root:
raise NotImplementedError("Non-relative patterns are unsupported")
for part in pattern_parts:
if part != '**' and '**' in part:
raise ValueError("Invalid pattern: '**' can only be an entire path component")
selector = _Selector(self, pattern=pattern)
yield from selector.select()

def rglob(self, pattern):
"""
This is like calling S3Path.glob with "**/" added in front of the given relative pattern
"""
yield from super().rglob(pattern)
self._absolute_path_validation()
general_options = self._accessor.configuration_map.get_general_options(self)
glob_new_algorithm = general_options['glob_new_algorithm']
if not glob_new_algorithm:
yield from super().rglob(pattern)
return
yield from self._rglob(pattern)

def _rglob(self, pattern):
""" RGlob with new Algorithm that better fit S3 API """
sys.audit("pathlib.Path.rglob", self, pattern)
if not pattern:
raise ValueError("Unacceptable pattern: {!r}".format(pattern))
drv, root, pattern_parts = self._flavour.parse_parts((pattern,))
if drv or root:
raise NotImplementedError("Non-relative patterns are unsupported")
for part in pattern_parts:
if part != '**' and '**' in part:
raise ValueError("Invalid pattern: '**' can only be an entire path component")
pattern = f'**{self._flavour.sep}{pattern}'
selector = _Selector(self, pattern=pattern)
yield from selector.select()

def open(self, mode='r', buffering=DEFAULT_BUFFER_SIZE, encoding=None, errors=None, newline=None):
"""
Expand Down
Loading

0 comments on commit b5525a6

Please sign in to comment.