From 643c6cb1d7af3848420184b10146bf17360a1d90 Mon Sep 17 00:00:00 2001
From: yanghua
Date: Thu, 19 Sep 2024 18:52:36 +0800
Subject: [PATCH] Performance: Use iterator to optimize large dir list

---
Review notes (text between the "---" line and the first "diff --git" line
is ignored by git am):

* Line breaks and indentation of this patch were restored from a
  whitespace-collapsed copy. The restored hunks agree with every hunk
  header count and with the diffstat below, but context-line indentation
  and blank context lines are inferred from Python syntax. If a hunk fails
  to apply, retry with --ignore-whitespace.
* ls_iterate() contains a yield, so it is a generator function: the
  ValueError for versions=True on a non-version-aware filesystem is raised
  when iteration starts, not when ls_iterate() is called.
* _listdir is renamed to _listobjects; the call sites in rmdir and _lsdir
  are updated by this patch.
* The docstrings of _lsdir and _listobjects are deleted by this patch.

 tosfs/core.py             | 158 ++++++++++++++++++++------------------
 tosfs/tests/test_tosfs.py |  42 ++++++++++
 2 files changed, 127 insertions(+), 73 deletions(-)

diff --git a/tosfs/core.py b/tosfs/core.py
index 4e7c06a..c22279a 100644
--- a/tosfs/core.py
+++ b/tosfs/core.py
@@ -312,6 +312,85 @@ def ls(
 
         return files if detail else sorted([o["name"] for o in files])
 
+    def ls_iterate(
+        self,
+        path: str,
+        detail: bool = False,
+        versions: bool = False,
+        batch_size: int = LS_OPERATION_DEFAULT_MAX_ITEMS,
+        **kwargs: Union[str, bool, float, None],
+    ) -> Generator[Union[dict, str], None, None]:
+        """List objects under the given path in batches then returns an iterator.
+
+        Parameters
+        ----------
+        path : str
+            The path to list.
+        detail : bool, optional
+            Whether to return detailed information (default is False).
+        versions : bool, optional
+            Whether to list object versions (default is False).
+        batch_size : int, optional
+            The number of items to fetch in each batch (default is 1000).
+        **kwargs : dict, optional
+            Additional arguments.
+
+        Returns
+        -------
+        Generator[Union[dict, str], None, None]
+            An iterator that yields objects under the given path.
+
+        Raises
+        ------
+        ValueError
+            If versions is specified but the filesystem is not version aware.
+
+        """
+        if versions and not self.version_aware:
+            raise ValueError(
+                "versions cannot be specified if the filesystem "
+                "is not version aware."
+            )
+
+        path = self._strip_protocol(path).rstrip("/")
+        bucket, key, _ = self._split_path(path)
+        prefix = key.lstrip("/") + "/" if key else ""
+        continuation_token = ""
+        is_truncated = True
+
+        while is_truncated:
+
+            def _call_list_objects_type2(
+                continuation_token: str = continuation_token,
+            ) -> ListObjectType2Output:
+                return self.tos_client.list_objects_type2(
+                    bucket,
+                    prefix,
+                    start_after=prefix,
+                    delimiter="/",
+                    max_keys=batch_size,
+                    continuation_token=continuation_token,
+                )
+
+            resp = retryable_func_executor(
+                _call_list_objects_type2,
+                args=(continuation_token,),
+                max_retry_num=self.max_retry_num,
+            )
+            is_truncated = resp.is_truncated
+            continuation_token = resp.next_continuation_token
+            results = resp.contents + resp.common_prefixes
+
+            for obj in results:
+                if isinstance(obj, CommonPrefixInfo):
+                    info = self._fill_dir_info(bucket, obj)
+                elif obj.key.endswith("/"):
+                    info = self._fill_dir_info(bucket, None, obj.key)
+                else:
+                    info = self._fill_file_info(obj, bucket, versions)
+
+                yield info if detail else info["name"]
+
     def info(
         self,
         path: str,
@@ -392,7 +471,10 @@ def rmdir(self, path: str) -> None:
         if not self.isdir(path):
             raise NotADirectoryError(f"{path} is not a directory.")
 
-        if len(self._listdir(bucket, max_items=1, prefix=key.rstrip("/") + "/")) > 0:
+        if (
+            len(self._listobjects(bucket, max_items=1, prefix=key.rstrip("/") + "/"))
+            > 0
+        ):
             raise TosfsError(f"Directory {path} is not empty.")
 
         retryable_func_executor(
@@ -1683,40 +1765,6 @@ def _lsdir(
         include_self: bool = False,
         versions: bool = False,
     ) -> List[dict]:
-        """List objects in a directory.
-
-        Parameters
-        ----------
-        path : str
-            The path to list.
-        max_items : int, optional
-            The maximum number of items to return (default is 1000).
-        delimiter : str, optional
-            The delimiter to use for grouping objects (default is '/').
-        prefix : str, optional
-            The prefix to use for filtering objects (default is '').
-        include_self : bool, optional
-            Whether to include the directory itself in the listing (default is False).
-        versions : bool, optional
-            Whether to list object versions (default is False).
-
-        Returns
-        -------
-        List[dict]
-            A list of objects in the directory.
-
-        Raises
-        ------
-        ValueError
-            If `versions` is specified but the filesystem is not version aware.
-        tos.exceptions.TosClientError
-            If there is a client error while listing the objects.
-        tos.exceptions.TosServerError
-            If there is a server error while listing the objects.
-        TosfsError
-            If there is an unknown error while listing the objects.
-
-        """
         bucket, key, _ = self._split_path(path)
         if not prefix:
             prefix = ""
@@ -1726,7 +1774,7 @@ def _lsdir(
         logger.debug("Get directory listing for %s", path)
         dirs = []
         files = []
-        for obj in self._listdir(
+        for obj in self._listobjects(
             bucket,
             max_items=max_items,
             delimiter=delimiter,
@@ -1744,7 +1792,7 @@ def _lsdir(
 
         return files
 
-    def _listdir(
+    def _listobjects(
         self,
         bucket: str,
         max_items: int = LS_OPERATION_DEFAULT_MAX_ITEMS,
@@ -1753,42 +1801,6 @@ def _listdir(
         include_self: bool = False,
         versions: bool = False,
     ) -> List[Union[CommonPrefixInfo, ListedObject, ListedObjectVersion]]:
-        """List objects in a bucket.
-
-        Parameters
-        ----------
-        bucket : str
-            The bucket name.
-        max_items : int, optional
-            The maximum number of items to return (default is 1000).
-        delimiter : str, optional
-            The delimiter to use for grouping objects (default is '/').
-        prefix : str, optional
-            The prefix to use for filtering objects (default is '').
-        include_self : bool, optional
-            Whether to include the bucket itself in the listing (default is False).
-        versions : bool, optional
-            Whether to list object versions (default is False).
-
-        Returns
-        -------
-        List[Union[CommonPrefixInfo, ListedObject, ListedObjectVersion]]
-            A list of objects in the bucket.
-            The list may contain `CommonPrefixInfo` for directories,
-            `ListedObject` for files, and `ListedObjectVersion` for versioned objects.
-
-        Raises
-        ------
-        ValueError
-            If `versions` is specified but the filesystem is not version aware.
-        tos.exceptions.TosClientError
-            If there is a client error while listing the objects.
-        tos.exceptions.TosServerError
-            If there is a server error while listing the objects.
-        TosfsError
-            If there is an unknown error while listing the objects.
-
-        """
         if versions and not self.version_aware:
             raise ValueError(
                 "versions cannot be specified if the filesystem is "
diff --git a/tosfs/tests/test_tosfs.py b/tosfs/tests/test_tosfs.py
index 75ce1a2..809a045 100644
--- a/tosfs/tests/test_tosfs.py
+++ b/tosfs/tests/test_tosfs.py
@@ -55,6 +55,48 @@ def test_ls_dir(tosfs: TosFileSystem, bucket: str, temporary_workspace: str) ->
     assert tosfs.ls(f"{bucket}/{temporary_workspace}/nonexistent", detail=False) == []
 
 
+def test_ls_iterate(
+    tosfs: TosFileSystem, bucket: str, temporary_workspace: str
+) -> None:
+    dir_name = random_str()
+    another_dir_name = random_str()
+    sub_dir_name = random_str()
+    file_name = random_str()
+    sub_file_name = random_str()
+
+    tosfs.makedirs(f"{bucket}/{temporary_workspace}/{dir_name}/{sub_dir_name}")
+    tosfs.makedirs(f"{bucket}/{temporary_workspace}/{another_dir_name}")
+    tosfs.touch(f"{bucket}/{temporary_workspace}/{dir_name}/{file_name}")
+    tosfs.touch(
+        f"{bucket}/{temporary_workspace}/{dir_name}/{sub_dir_name}/{sub_file_name}"
+    )
+
+    # Test listing without detail
+    result = list(tosfs.ls_iterate(f"{bucket}/{temporary_workspace}"))
+    assert f"{bucket}/{temporary_workspace}/{dir_name}" in result
+
+    # Test listing with detail
+    result = list(tosfs.ls_iterate(f"{bucket}/{temporary_workspace}", detail=True))
+    assert any(
+        item["name"] == f"{bucket}/{temporary_workspace}/{dir_name}" for item in result
+    )
+
+    # Test list with iterate
+    for item in tosfs.ls_iterate(f"{bucket}/{temporary_workspace}", detail=True):
+        assert item["name"] in sorted(
+            [
+                f"{bucket}/{temporary_workspace}/{dir_name}",
+                f"{bucket}/{temporary_workspace}/{another_dir_name}",
+            ]
+        )
+
+    # Test listing with batch size and while loop more than one time
+    result = []
+    for batch in tosfs.ls_iterate(f"{bucket}/{temporary_workspace}", batch_size=1):
+        result.append(batch)
+    assert len(result) == len([dir_name, another_dir_name])
+
+
 def test_inner_rm(tosfs: TosFileSystem, bucket: str, temporary_workspace: str) -> None:
     file_name = random_str()
     tosfs.tos_client.put_object(bucket=bucket, key=f"{temporary_workspace}/{file_name}")