Skip to content

Commit

Permalink
Performance: Use iterator to optimize large dir list
Browse files Browse the repository at this point in the history
  • Loading branch information
yanghua committed Sep 19, 2024
1 parent c2b7978 commit 643c6cb
Show file tree
Hide file tree
Showing 2 changed files with 127 additions and 73 deletions.
158 changes: 85 additions & 73 deletions tosfs/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,85 @@ def ls(

return files if detail else sorted([o["name"] for o in files])

def ls_iterate(
    self,
    path: str,
    detail: bool = False,
    versions: bool = False,
    batch_size: int = LS_OPERATION_DEFAULT_MAX_ITEMS,
    **kwargs: Union[str, bool, float, None],
) -> Generator[Union[dict, str], None, None]:
    """Lazily list the entries directly under a path, one page at a time.

    Each page is requested from the service only when the consumer has
    drained the previous one, so the whole listing is never held in memory.

    Parameters
    ----------
    path : str
        The path to list.
    detail : bool, optional
        If True, yield the info dict of every entry; otherwise yield only
        its ``"name"`` value (default is False).
    versions : bool, optional
        Forwarded to ``_fill_file_info`` for file entries (default is False).
    batch_size : int, optional
        Sent as ``max_keys`` with every list request
        (default is ``LS_OPERATION_DEFAULT_MAX_ITEMS``).
    **kwargs : dict, optional
        Additional arguments; not used by this method.

    Yields
    ------
    Union[dict, str]
        One entry per object or common prefix found under the given path.

    Raises
    ------
    ValueError
        If versions is specified but the filesystem is not version aware.
        Because this is a generator, the error surfaces on the first
        iteration rather than at call time.

    """
    if versions and not self.version_aware:
        raise ValueError(
            "versions cannot be specified if the filesystem "
            "is not version aware."
        )

    bucket, key, _ = self._split_path(self._strip_protocol(path).rstrip("/"))
    prefix = key.lstrip("/") + "/" if key else ""

    def _fetch_page(token: str) -> ListObjectType2Output:
        # A single page request; the retry wrapper re-invokes this as a
        # whole with the same token on failure.
        return self.tos_client.list_objects_type2(
            bucket,
            prefix,
            start_after=prefix,
            delimiter="/",
            max_keys=batch_size,
            continuation_token=token,
        )

    # An empty token requests the first page.
    next_token = ""
    while True:
        page = retryable_func_executor(
            _fetch_page,
            args=(next_token,),
            max_retry_num=self.max_retry_num,
        )
        more_pages = page.is_truncated
        next_token = page.next_continuation_token

        for item in page.contents + page.common_prefixes:
            if isinstance(item, CommonPrefixInfo):
                entry = self._fill_dir_info(bucket, item)
            elif item.key.endswith("/"):
                # An object whose key ends with "/" is reported as a directory.
                entry = self._fill_dir_info(bucket, None, item.key)
            else:
                entry = self._fill_file_info(item, bucket, versions)

            if detail:
                yield entry
            else:
                yield entry["name"]

        if not more_pages:
            break

def info(
self,
path: str,
Expand Down Expand Up @@ -392,7 +471,10 @@ def rmdir(self, path: str) -> None:
if not self.isdir(path):
raise NotADirectoryError(f"{path} is not a directory.")

if len(self._listdir(bucket, max_items=1, prefix=key.rstrip("/") + "/")) > 0:
if (
len(self._listobjects(bucket, max_items=1, prefix=key.rstrip("/") + "/"))
> 0
):
raise TosfsError(f"Directory {path} is not empty.")

retryable_func_executor(
Expand Down Expand Up @@ -1683,40 +1765,6 @@ def _lsdir(
include_self: bool = False,
versions: bool = False,
) -> List[dict]:
"""List objects in a directory.
Parameters
----------
path : str
The path to list.
max_items : int, optional
The maximum number of items to return (default is 1000).
delimiter : str, optional
The delimiter to use for grouping objects (default is '/').
prefix : str, optional
The prefix to use for filtering objects (default is '').
include_self : bool, optional
Whether to include the directory itself in the listing (default is False).
versions : bool, optional
Whether to list object versions (default is False).
Returns
-------
List[dict]
A list of objects in the directory.
Raises
------
ValueError
If `versions` is specified but the filesystem is not version aware.
tos.exceptions.TosClientError
If there is a client error while listing the objects.
tos.exceptions.TosServerError
If there is a server error while listing the objects.
TosfsError
If there is an unknown error while listing the objects.
"""
bucket, key, _ = self._split_path(path)
if not prefix:
prefix = ""
Expand All @@ -1726,7 +1774,7 @@ def _lsdir(
logger.debug("Get directory listing for %s", path)
dirs = []
files = []
for obj in self._listdir(
for obj in self._listobjects(
bucket,
max_items=max_items,
delimiter=delimiter,
Expand All @@ -1744,7 +1792,7 @@ def _lsdir(

return files

def _listdir(
def _listobjects(
self,
bucket: str,
max_items: int = LS_OPERATION_DEFAULT_MAX_ITEMS,
Expand All @@ -1753,42 +1801,6 @@ def _listdir(
include_self: bool = False,
versions: bool = False,
) -> List[Union[CommonPrefixInfo, ListedObject, ListedObjectVersion]]:
"""List objects in a bucket.
Parameters
----------
bucket : str
The bucket name.
max_items : int, optional
The maximum number of items to return (default is 1000).
delimiter : str, optional
The delimiter to use for grouping objects (default is '/').
prefix : str, optional
The prefix to use for filtering objects (default is '').
include_self : bool, optional
Whether to include the bucket itself in the listing (default is False).
versions : bool, optional
Whether to list object versions (default is False).
Returns
-------
List[Union[CommonPrefixInfo, ListedObject, ListedObjectVersion]]
A list of objects in the bucket.
The list may contain `CommonPrefixInfo` for directories,
`ListedObject` for files, and `ListedObjectVersion` for versioned objects.
Raises
------
ValueError
If `versions` is specified but the filesystem is not version aware.
tos.exceptions.TosClientError
If there is a client error while listing the objects.
tos.exceptions.TosServerError
If there is a server error while listing the objects.
TosfsError
If there is an unknown error while listing the objects.
"""
if versions and not self.version_aware:
raise ValueError(
"versions cannot be specified if the filesystem is "
Expand Down
42 changes: 42 additions & 0 deletions tosfs/tests/test_tosfs.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,48 @@ def test_ls_dir(tosfs: TosFileSystem, bucket: str, temporary_workspace: str) ->
assert tosfs.ls(f"{bucket}/{temporary_workspace}/nonexistent", detail=False) == []


def test_ls_iterate(
    tosfs: TosFileSystem, bucket: str, temporary_workspace: str
) -> None:
    """Check ``ls_iterate`` in name mode, detail mode and with one-item pages."""
    dir_name = random_str()
    another_dir_name = random_str()
    sub_dir_name = random_str()
    file_name = random_str()
    sub_file_name = random_str()

    workspace = f"{bucket}/{temporary_workspace}"
    dir_path = f"{workspace}/{dir_name}"
    another_dir_path = f"{workspace}/{another_dir_name}"
    sub_dir_path = f"{dir_path}/{sub_dir_name}"

    # Layout: two top-level directories, one of which holds a file and a
    # sub-directory that holds another file.
    tosfs.makedirs(sub_dir_path)
    tosfs.makedirs(another_dir_path)
    tosfs.touch(f"{dir_path}/{file_name}")
    tosfs.touch(f"{sub_dir_path}/{sub_file_name}")

    # Names only.
    names = list(tosfs.ls_iterate(workspace))
    assert dir_path in names

    # Detailed entries.
    detailed = list(tosfs.ls_iterate(workspace, detail=True))
    assert any(entry["name"] == dir_path for entry in detailed)

    # Consuming the generator directly: only the two top-level directories
    # may appear.
    expected = sorted([dir_path, another_dir_path])
    for entry in tosfs.ls_iterate(workspace, detail=True):
        assert entry["name"] in expected

    # A batch size of 1 forces the listing loop to request more than one page.
    paged = list(tosfs.ls_iterate(workspace, batch_size=1))
    assert len(paged) == len(expected)


def test_inner_rm(tosfs: TosFileSystem, bucket: str, temporary_workspace: str) -> None:
file_name = random_str()
tosfs.tos_client.put_object(bucket=bucket, key=f"{temporary_workspace}/{file_name}")
Expand Down

0 comments on commit 643c6cb

Please sign in to comment.