# tosfs/core.py -- method of the filesystem class (``self`` is the filesystem).
def rm(
    self, path: str, recursive: bool = False, maxdepth: Optional[int] = None
) -> None:
    """Delete files.

    Parameters
    ----------
    path: str or list of str
        File(s) to delete.
    recursive: bool
        If file(s) are directories, recursively delete contents and then
        also remove the directory
    maxdepth: int or None
        Depth to pass to walk for finding files to delete, if recursive.
        If None, there will be no limit and infinite recursion may be
        possible.

    Raises
    ------
    FileNotFoundError
        If a given path does not exist.
    TosfsError
        If a path names a bucket only, or if the batch deletion fails with
        an error that is not a TOS client/server error.

    """
    if not isinstance(path, str):
        # The documented contract accepts several paths; each one goes through
        # the full single-path validation below instead of being passed to
        # ``exists`` / ``_split_path`` as a list.
        for single_path in path:
            self.rm(single_path, recursive=recursive, maxdepth=maxdepth)
        return None

    if not self.exists(path):
        raise FileNotFoundError(path)

    bucket, key, _ = self._split_path(path)
    if not key:
        raise TosfsError(f"Cannot remove a bucket {bucket} using rm api.")

    # Non-recursive deletes and depth-limited walks keep the generic
    # implementation of the parent class; only the unbounded recursive case
    # is served by the batch-delete fast path.
    if not recursive or maxdepth:
        return super().rm(path, recursive=recursive, maxdepth=maxdepth)

    if self.isfile(path):
        # A plain object has nothing under the "<key>/" prefix, so the batch
        # helper would list zero objects and leave the file in place.
        self.rm_file(path)
        return None

    try:
        self._list_and_batch_delete_objects(bucket, key)
    except (tos.exceptions.TosClientError, tos.exceptions.TosServerError):
        # SDK errors propagate unchanged (bare raise keeps the traceback).
        raise
    except Exception as e:
        raise TosfsError(f"Tosfs failed with unknown error: {e}") from e
    return None
# tosfs/core.py -- private helper of the filesystem class, used by ``rm``.
def _list_and_batch_delete_objects(self, bucket: str, key: str) -> None:
    """Delete every object stored under the directory ``key`` of ``bucket``.

    Objects are listed page by page under the prefix ``key.rstrip("/") + "/"``
    and each page is removed with one ``delete_multi_objects`` request.

    Parameters
    ----------
    bucket: str
        Name of the bucket that holds the directory.
    key: str
        Key of the directory; a trailing ``/`` is optional.

    Notes
    -----
    Objects the service reports as failed are logged as warnings and are not
    raised, so the call is best-effort.

    """

    class DeletingObject:
        """Key/version pair in the shape handed to ``delete_multi_objects``."""

        def __init__(self, key: str, version_id: Optional[str] = None):
            self.key = key
            self.version_id = version_id

    prefix = key.rstrip("/") + "/"
    is_truncated = True
    continuation_token = ""

    while is_truncated:
        # No delimiter on purpose: with delimiter="/" deeper keys are folded
        # into common prefixes and the objects below them would never be
        # listed, hence never deleted.
        resp = self.tos_client.list_objects_type2(
            bucket,
            prefix=prefix,
            max_keys=LS_OPERATION_DEFAULT_MAX_ITEMS,
            continuation_token=continuation_token,
        )
        is_truncated = resp.is_truncated
        continuation_token = resp.next_continuation_token

        deleting_objects = [DeletingObject(o.key) for o in resp.contents]
        if not deleting_objects:
            # Nothing on this page; do not send an empty delete request.
            continue

        # Deleting per page keeps each request no larger than one listing
        # page instead of growing with the size of the directory.
        # NOTE(review): assumes LS_OPERATION_DEFAULT_MAX_ITEMS does not exceed
        # the service's per-request multi-delete limit -- confirm.
        delete_resp = self.tos_client.delete_multi_objects(
            bucket, deleting_objects, quiet=True
        )
        if delete_resp.error:
            for d in delete_resp.error:
                logger.warning("Deleted object: %s failed", d)
# tosfs/tests/test_tosfs.py
def test_rm(tosfs: TosFileSystem, bucket: str, temporary_workspace: str) -> None:
    """Check ``rm`` for a single file, a directory tree, and two error cases."""
    file_name = random_str()
    dir_name = random_str()
    sub_dir_name = random_str()
    sub_file_name = random_str()

    # Every path used below, spelled out once.
    workspace = f"{bucket}/{temporary_workspace}"
    plain_file = f"{workspace}/{file_name}"
    top_dir = f"{workspace}/{dir_name}"
    nested_dir = f"{top_dir}/{sub_dir_name}"
    top_level_file = f"{top_dir}/{sub_file_name}"
    nested_file = f"{nested_dir}/{sub_file_name}"

    # Non-recursive deletion of a single file.
    tosfs.touch(plain_file)
    assert tosfs.exists(plain_file)
    tosfs.rm(plain_file)
    assert not tosfs.exists(plain_file)

    # Recursive deletion of a directory holding a file and a sub-directory.
    tosfs.makedirs(nested_dir)
    tosfs.touch(top_level_file)
    tosfs.touch(nested_file)
    assert tosfs.exists(top_level_file)
    assert tosfs.exists(nested_file)
    tosfs.rm(top_dir, recursive=True)
    assert not tosfs.exists(nested_dir)
    assert not tosfs.exists(top_dir)

    # A path that does not exist is reported as FileNotFoundError.
    with pytest.raises(FileNotFoundError):
        tosfs.rm(f"{workspace}/nonexistent")

    # A bare bucket is rejected by rm.
    with pytest.raises(TosfsError):
        tosfs.rm(bucket)
--- tosfs/core.py | 46 +++++++++++++++++++++++++++------------------- 1 file changed, 27 insertions(+), 19 deletions(-) diff --git a/tosfs/core.py b/tosfs/core.py index 3ed0919..535fb5b 100644 --- a/tosfs/core.py +++ b/tosfs/core.py @@ -412,22 +412,29 @@ def rm( possible. """ - if not self.exists(path): - raise FileNotFoundError(path) + if isinstance(path, str): + if not self.exists(path): + raise FileNotFoundError(path) - bucket, key, _ = self._split_path(path) - if not key: - raise TosfsError(f"Cannot remove a bucket {bucket} using rm api.") + bucket, key, _ = self._split_path(path) + if not key: + raise TosfsError(f"Cannot remove a bucket {bucket} using rm api.") - if not recursive or maxdepth: - return super().rm(path, recursive=recursive, maxdepth=maxdepth) + if not recursive or maxdepth: + return super().rm(path, recursive=recursive, maxdepth=maxdepth) - try: - self._list_and_batch_delete_objects(bucket, key) - except (tos.exceptions.TosClientError, tos.exceptions.TosServerError) as e: - raise e - except Exception as e: - raise TosfsError(f"Tosfs failed with unknown error: {e}") from e + if self.isfile(path): + self.rm_file(path) + else: + try: + self._list_and_batch_delete_objects(bucket, key) + except (tos.exceptions.TosClientError, tos.exceptions.TosServerError) as e: + raise e + except Exception as e: + raise TosfsError(f"Tosfs failed with unknown error: {e}") from e + else: + for single_path in path: + self.rm(single_path, recursive=recursive, maxdepth=maxdepth) def mkdir(self, path: str, create_parents: bool = True, **kwargs: Any) -> None: """Create directory entry at path. 
@@ -1126,12 +1133,13 @@ def __init__(self, key: str, version_id: Optional[str] = None): for o in all_results ] - delete_resp = self.tos_client.delete_multi_objects( - bucket, deleting_objects, quiet=True - ) - if delete_resp.error: - for d in delete_resp.error: - logger.warning("Deleted object: %s failed", d) + if deleting_objects: + delete_resp = self.tos_client.delete_multi_objects( + bucket, deleting_objects, quiet=True + ) + if delete_resp.error: + for d in delete_resp.error: + logger.warning("Deleted object: %s failed", d) def _copy_basic(self, path1: str, path2: str, **kwargs: Any) -> None: """Copy file between locations on tos. From 12bda51c86fc6038fea0e4385e5b28ef531ade88 Mon Sep 17 00:00:00 2001 From: yanghua Date: Wed, 11 Sep 2024 12:40:21 +0800 Subject: [PATCH 3/3] Fix code style issue --- tosfs/core.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tosfs/core.py b/tosfs/core.py index 535fb5b..0634f5e 100644 --- a/tosfs/core.py +++ b/tosfs/core.py @@ -25,6 +25,7 @@ from fsspec import AbstractFileSystem from fsspec.spec import AbstractBufferedFile from fsspec.utils import setup_logging as setup_logger +from tos.exceptions import TosClientError, TosServerError from tos.models import CommonPrefixInfo from tos.models2 import ( CreateMultipartUploadOutput, @@ -428,7 +429,7 @@ def rm( else: try: self._list_and_batch_delete_objects(bucket, key) - except (tos.exceptions.TosClientError, tos.exceptions.TosServerError) as e: + except (TosClientError, TosServerError) as e: raise e except Exception as e: raise TosfsError(f"Tosfs failed with unknown error: {e}") from e