Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Core: Optimize rm API via batch delete #72

Merged
merged 3 commits into from
Sep 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 78 additions & 0 deletions tosfs/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
from fsspec import AbstractFileSystem
from fsspec.spec import AbstractBufferedFile
from fsspec.utils import setup_logging as setup_logger
from tos.exceptions import TosClientError, TosServerError
from tos.models import CommonPrefixInfo
from tos.models2 import (
CreateMultipartUploadOutput,
Expand Down Expand Up @@ -394,6 +395,48 @@ def rmdir(self, path: str) -> None:
except Exception as e:
raise TosfsError(f"Tosfs failed with unknown error: {e}") from e

def rm(
self, path: str, recursive: bool = False, maxdepth: Optional[int] = None
) -> None:
"""Delete files.

Parameters
----------
path: str or list of str
File(s) to delete.
recursive: bool
If file(s) are directories, recursively delete contents and then
also remove the directory
maxdepth: int or None
Depth to pass to walk for finding files to delete, if recursive.
If None, there will be no limit and infinite recursion may be
possible.

"""
if isinstance(path, str):
if not self.exists(path):
raise FileNotFoundError(path)

bucket, key, _ = self._split_path(path)
if not key:
raise TosfsError(f"Cannot remove a bucket {bucket} using rm api.")

if not recursive or maxdepth:
return super().rm(path, recursive=recursive, maxdepth=maxdepth)

if self.isfile(path):
self.rm_file(path)
else:
try:
self._list_and_batch_delete_objects(bucket, key)
except (TosClientError, TosServerError) as e:
raise e
except Exception as e:
raise TosfsError(f"Tosfs failed with unknown error: {e}") from e
else:
for single_path in path:
self.rm(single_path, recursive=recursive, maxdepth=maxdepth)

def mkdir(self, path: str, create_parents: bool = True, **kwargs: Any) -> None:
"""Create directory entry at path.

Expand Down Expand Up @@ -1064,6 +1107,41 @@ def glob(

return out if detail else list(out)

def _list_and_batch_delete_objects(self, bucket: str, key: str) -> None:
is_truncated = True
continuation_token = ""
all_results = []

class DeletingObject:
def __init__(self, key: str, version_id: Optional[str] = None):
self.key = key
self.version_id = version_id

while is_truncated:
resp = self.tos_client.list_objects_type2(
bucket,
prefix=key.rstrip("/") + "/",
delimiter="/",
max_keys=LS_OPERATION_DEFAULT_MAX_ITEMS,
continuation_token=continuation_token,
)
is_truncated = resp.is_truncated
continuation_token = resp.next_continuation_token
all_results.extend(resp.contents + resp.common_prefixes)

deleting_objects = [
DeletingObject(o.key if hasattr(o, "key") else o.prefix)
for o in all_results
]

if deleting_objects:
delete_resp = self.tos_client.delete_multi_objects(
bucket, deleting_objects, quiet=True
)
if delete_resp.error:
for d in delete_resp.error:
logger.warning("Deleted object: %s failed", d)

def _copy_basic(self, path1: str, path2: str, **kwargs: Any) -> None:
"""Copy file between locations on tos.

Expand Down
35 changes: 35 additions & 0 deletions tosfs/tests/test_tosfs.py
Original file line number Diff line number Diff line change
Expand Up @@ -628,6 +628,41 @@ def test_glob(tosfs: TosFileSystem, bucket: str, temporary_workspace: str) -> No
)


def test_rm(tosfs: TosFileSystem, bucket: str, temporary_workspace: str) -> None:
file_name = random_str()
dir_name = random_str()
sub_dir_name = random_str()
sub_file_name = random_str()

# Test Non-Recursive Deletion
tosfs.touch(f"{bucket}/{temporary_workspace}/{file_name}")
assert tosfs.exists(f"{bucket}/{temporary_workspace}/{file_name}")
tosfs.rm(f"{bucket}/{temporary_workspace}/{file_name}")
assert not tosfs.exists(f"{bucket}/{temporary_workspace}/{file_name}")

# Test Recursive Deletion
tosfs.makedirs(f"{bucket}/{temporary_workspace}/{dir_name}/{sub_dir_name}")
tosfs.touch(f"{bucket}/{temporary_workspace}/{dir_name}/{sub_file_name}")
tosfs.touch(
f"{bucket}/{temporary_workspace}/{dir_name}/{sub_dir_name}/{sub_file_name}"
)
assert tosfs.exists(f"{bucket}/{temporary_workspace}/{dir_name}/{sub_file_name}")
assert tosfs.exists(
f"{bucket}/{temporary_workspace}/{dir_name}/{sub_dir_name}/{sub_file_name}"
)
tosfs.rm(f"{bucket}/{temporary_workspace}/{dir_name}", recursive=True)
assert not tosfs.exists(f"{bucket}/{temporary_workspace}/{dir_name}/{sub_dir_name}")
assert not tosfs.exists(f"{bucket}/{temporary_workspace}/{dir_name}")

# Test Deletion of Non-Existent Path
with pytest.raises(FileNotFoundError):
tosfs.rm(f"{bucket}/{temporary_workspace}/nonexistent")

# Test Deletion of Bucket
with pytest.raises(TosfsError):
tosfs.rm(bucket)


###########################################################
# File operation tests #
###########################################################
Expand Down