Skip to content

Commit

Permalink
Bug: Fix rm api and walk api bug (#137)
Browse files Browse the repository at this point in the history
* Bug: Fix rm api add wrong objects bug

* Bug: Fix rm api and walk api bug
  • Loading branch information
yanghua authored Sep 26, 2024
1 parent 1a76263 commit 490311a
Show file tree
Hide file tree
Showing 5 changed files with 219 additions and 54 deletions.
5 changes: 2 additions & 3 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
.ONESHELL:
ENV_PREFIX=$(shell poetry env info -p 2>/dev/null)/bin/
TEST_DIR?="tosfs/tests/"

.PHONY: help
help: ## Show the help.
Expand Down Expand Up @@ -48,11 +47,11 @@ lint: ## Run pep8, black, mypy linters.

.PHONY: test
test: ## Run tests and generate coverage report.
$(ENV_PREFIX)pytest -vv -s --cov-config .coveragerc --cov=tosfs -l --tb=short --maxfail=1 ${TEST_DIR} --ignore=${TEST_DIR}/test_stability.py
$(ENV_PREFIX)pytest -vv -s --cov-config .coveragerc --cov=tosfs -l --tb=short --maxfail=1 tosfs/tests/ --ignore=tosfs/tests/test_stability.py

.PHONY: test_stability
test_stability: ## Run stability tests.
$(ENV_PREFIX)pytest -vv -s --cov-config .coveragerc --cov=tosfs -l --tb=short --maxfail=1 ${TEST_DIR}/test_stability.py
$(ENV_PREFIX)pytest -vv -s --cov-config .coveragerc --cov=tosfs -l --tb=short --maxfail=1 tosfs/tests/test_stability.py

.PHONY: watch
watch: ## Run tests on every change.
Expand Down
16 changes: 6 additions & 10 deletions poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ readme = "README.md"

[tool.poetry.dependencies]
python = "^3.9"
fsspec = ">=2023.5.0"
fsspec = "==2023.5.0"
tos = ">=2.7.0"

[tool.poetry.group.dev.dependencies]
Expand Down
204 changes: 185 additions & 19 deletions tosfs/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -936,10 +936,117 @@ def walk(
if path in ["", "*"] + ["{}://".format(p) for p in self.protocol]:
raise ValueError("Cannot access all of TOS via path {}.".format(path))

return super().walk(
return self._fsspec_walk(
path, maxdepth=maxdepth, topdown=topdown, on_error=on_error, **kwargs
)

def _fsspec_walk( # noqa
self,
path: str,
maxdepth: Optional[int] = None,
topdown: bool = True,
on_error: str = "omit",
**kwargs: Any,
) -> Any:
"""Return all files belows path.
Copied from fsspec(2024.9.0) to fix fsspec(2023.5.0.)
List all files, recursing into subdirectories; output is iterator-style,
like ``os.walk()``. For a simple list of files, ``find()`` is available.
When topdown is True, the caller can modify the dirnames list in-place (perhaps
using del or slice assignment), and walk() will
only recurse into the subdirectories whose names remain in dirnames;
this can be used to prune the search, impose a specific order of visiting,
or even to inform walk() about directories the caller creates or renames before
it resumes walk() again.
Modifying dirnames when topdown is False has no effect. (see os.walk)
Note that the "files" outputted will include anything that is not
a directory, such as links.
Parameters
----------
path: str
Root to recurse into
maxdepth: int
Maximum recursion depth. None means limitless, but not recommended
on link-based file-systems.
topdown: bool (True)
Whether to walk the directory tree from the top downwards or from
the bottom upwards.
on_error: "omit", "raise", a collable
if omit (default), path with exception will simply be empty;
If raise, an underlying exception will be raised;
if callable, it will be called with a single OSError instance as argument
kwargs: passed to ``ls``
"""
# type: ignore
if maxdepth is not None and maxdepth < 1:
raise ValueError("maxdepth must be at least 1")

path = self._strip_protocol(path)
full_dirs = {}
dirs = {}
files = {}

detail = kwargs.pop("detail", False)
try:
listing = self.ls(path, detail=True, **kwargs)
except (FileNotFoundError, OSError) as e:
if on_error == "raise":
raise
elif callable(on_error):
on_error(e)
if detail:
return path, {}, {} # type: ignore
return path, [], [] # type: ignore

for info in listing:
# each info name must be at least [path]/part , but here
# we check also for names like [path]/part/
pathname = info["name"].rstrip("/") # type: ignore
name = pathname.rsplit("/", 1)[-1]
if info["type"] == "directory" and pathname != path: # type: ignore
# do not include "self" path
full_dirs[name] = pathname
dirs[name] = info
elif pathname == path:
# file-like with same name as give path
files[""] = info
else:
files[name] = info

if not detail:
dirs = list(dirs) # type: ignore
files = list(files) # type: ignore

if topdown:
# Yield before recursion if walking top down
yield path, dirs, files

if maxdepth is not None:
maxdepth -= 1
if maxdepth < 1:
if not topdown:
yield path, dirs, files
return

for d in dirs:
yield from self.walk(
full_dirs[d],
maxdepth=maxdepth,
detail=detail,
topdown=topdown,
**kwargs,
)

if not topdown:
# Yield after recursion if walking bottom up
yield path, dirs, files

def find(
self,
path: str,
Expand Down Expand Up @@ -984,7 +1091,7 @@ def find(
"Can not specify 'prefix' option alongside 'maxdepth' options."
)
if maxdepth:
return super().find(
return self._fsspec_find(
bucket + "/" + key,
maxdepth=maxdepth,
withdirs=withdirs,
Expand All @@ -999,6 +1106,54 @@ def find(
else:
return [o["name"] for o in out]

def _fsspec_find( # noqa #
self,
path: str,
maxdepth: Optional[int] = None,
withdirs: bool = False,
detail: bool = False,
**kwargs: Any, # type: ignore
) -> Any:
"""List all files below path.
Copied from fsspec(2024.9.0) to fix fsspec(2023.5.0.)
Like posix ``find`` command without conditions
Parameters
----------
path : str
maxdepth: int or None
If not None, the maximum number of levels to descend
withdirs: bool
Whether to include directory paths in the output. This is True
when used by glob, but users usually only want files.
kwargs are passed to ``ls``.
"""
# TODO: allow equivalent of -name parameter
path = self._strip_protocol(path)
out = {}

# Add the root directory if withdirs is requested
# This is needed for posix glob compliance
if withdirs and path != "" and self.isdir(path):
out[path] = self.info(path)

for _, dirs, files in self._fsspec_walk(path, maxdepth, detail=True, **kwargs):
if withdirs:
files.update(dirs)
out.update({info["name"]: info for name, info in files.items()})
if not out and self.isfile(path):
# walk works on directories, but find should also return [path]
# when path happens to be a file
out[path] = {}
names = sorted(out)
if not detail:
return names
else:
return {name: out[name] for name in names}

def expand_path(
self,
path: Union[str, List[str]],
Expand Down Expand Up @@ -1219,6 +1374,17 @@ def __init__(self, key: str, version_id: Optional[str] = None):
self.key = key
self.version_id = version_id

def delete_objects(deleting_objects: List[DeletingObject]) -> None:
delete_resp = retryable_func_executor(
lambda: self.tos_client.delete_multi_objects(
bucket, deleting_objects, quiet=True
),
max_retry_num=self.max_retry_num,
)
if delete_resp.error:
for d in delete_resp.error:
logger.warning("Deleted object: %s failed", d)

while is_truncated:

def _call_list_objects_type2(
Expand All @@ -1227,7 +1393,6 @@ def _call_list_objects_type2(
return self.tos_client.list_objects_type2(
bucket,
prefix=key.rstrip("/") + "/",
delimiter="/",
max_keys=LS_OPERATION_DEFAULT_MAX_ITEMS,
continuation_token=continuation_token,
)
Expand All @@ -1239,23 +1404,15 @@ def _call_list_objects_type2(
)
is_truncated = resp.is_truncated
continuation_token = resp.next_continuation_token
all_results.extend(resp.contents + resp.common_prefixes)
all_results = resp.contents

deleting_objects = [
DeletingObject(o.key if hasattr(o, "key") else o.prefix)
for o in all_results
]
deleting_objects = [
DeletingObject(o.key if hasattr(o, "key") else o.prefix)
for o in all_results
]

if deleting_objects:
delete_resp = retryable_func_executor(
lambda: self.tos_client.delete_multi_objects(
bucket, deleting_objects, quiet=True
),
max_retry_num=self.max_retry_num,
)
if delete_resp.error:
for d in delete_resp.error:
logger.warning("Deleted object: %s failed", d)
if deleting_objects:
delete_objects(deleting_objects)

def _copy_basic(self, path1: str, path2: str, **kwargs: Any) -> None:
"""Copy file between locations on tos.
Expand Down Expand Up @@ -1683,7 +1840,16 @@ def exists(self, path: str, **kwargs: Any) -> bool:
)
except TosServerError as ex:
if e.status_code == TOS_SERVER_STATUS_CODE_NOT_FOUND:
return False
resp = retryable_func_executor(
lambda: self.tos_client.list_objects_type2(
bucket,
key.rstrip("/") + "/",
start_after=key.rstrip("/") + "/",
max_keys=1,
),
max_retry_num=self.max_retry_num,
)
return len(resp.contents) > 0
else:
raise ex
else:
Expand Down
Loading

0 comments on commit 490311a

Please sign in to comment.