diff --git a/Makefile b/Makefile index 29f96e9..b354fcf 100644 --- a/Makefile +++ b/Makefile @@ -1,6 +1,5 @@ .ONESHELL: ENV_PREFIX=$(shell poetry env info -p 2>/dev/null)/bin/ -TEST_DIR?="tosfs/tests/" .PHONY: help help: ## Show the help. @@ -48,11 +47,11 @@ lint: ## Run pep8, black, mypy linters. .PHONY: test test: ## Run tests and generate coverage report. - $(ENV_PREFIX)pytest -vv -s --cov-config .coveragerc --cov=tosfs -l --tb=short --maxfail=1 ${TEST_DIR} --ignore=${TEST_DIR}/test_stability.py + $(ENV_PREFIX)pytest -vv -s --cov-config .coveragerc --cov=tosfs -l --tb=short --maxfail=1 tosfs/tests/ --ignore=tosfs/tests/test_stability.py .PHONY: test_stability test_stability: ## Run stability tests. - $(ENV_PREFIX)pytest -vv -s --cov-config .coveragerc --cov=tosfs -l --tb=short --maxfail=1 ${TEST_DIR}/test_stability.py + $(ENV_PREFIX)pytest -vv -s --cov-config .coveragerc --cov=tosfs -l --tb=short --maxfail=1 tosfs/tests/test_stability.py .PHONY: watch watch: ## Run tests on every change. diff --git a/poetry.lock b/poetry.lock index 4b17890..b779fe6 100644 --- a/poetry.lock +++ b/poetry.lock @@ -291,13 +291,13 @@ test = ["pytest (>=6)"] [[package]] name = "fsspec" -version = "2024.6.1" +version = "2023.5.0" description = "File-system specification" optional = false python-versions = ">=3.8" files = [ - {file = "fsspec-2024.6.1-py3-none-any.whl", hash = "sha256:3cb443f8bcd2efb31295a5b9fdb02aee81d8452c80d28f97a6d0959e6cee101e"}, - {file = "fsspec-2024.6.1.tar.gz", hash = "sha256:fad7d7e209dd4c1208e3bbfda706620e0da5142bebbd9c384afb95b07e798e49"}, + {file = "fsspec-2023.5.0-py3-none-any.whl", hash = "sha256:51a4ad01a5bb66fcc58036e288c0d53d3975a0df2a5dc59a93b59bade0391f2a"}, + {file = "fsspec-2023.5.0.tar.gz", hash = "sha256:b3b56e00fb93ea321bc9e5d9cf6f8522a0198b20eb24e02774d329e9c6fb84ce"}, ] [package.extras] @@ -305,8 +305,7 @@ abfs = ["adlfs"] adl = ["adlfs"] arrow = ["pyarrow (>=1)"] dask = ["dask", "distributed"] -dev = ["pre-commit", "ruff"] -doc = ["numpydoc", "sphinx", "sphinx-design", "sphinx-rtd-theme", "yarl"] +devel = ["pytest", "pytest-cov"] dropbox = ["dropbox", "dropboxdrivefs", "requests"] full = ["adlfs", "aiohttp (!=4.0.0a0,!=4.0.0a1)", "dask", "distributed", "dropbox", "dropboxdrivefs", "fusepy", "gcsfs", "libarchive-c", "ocifs", "panel", "paramiko", "pyarrow (>=1)", "pygit2", "requests", "s3fs", "smbprotocol", "tqdm"] fuse = ["fusepy"] @@ -316,16 +315,13 @@ github = ["requests"] gs = ["gcsfs"] gui = ["panel"] hdfs = ["pyarrow (>=1)"] -http = ["aiohttp (!=4.0.0a0,!=4.0.0a1)"] +http = ["aiohttp (!=4.0.0a0,!=4.0.0a1)", "requests"] libarchive = ["libarchive-c"] oci = ["ocifs"] s3 = ["s3fs"] sftp = ["paramiko"] smb = ["smbprotocol"] ssh = ["paramiko"] -test = ["aiohttp (!=4.0.0a0,!=4.0.0a1)", "numpy", "pytest", "pytest-asyncio (!=0.22.0)", "pytest-benchmark", "pytest-cov", "pytest-mock", "pytest-recording", "pytest-rerunfailures", "requests"] -test-downstream = ["aiobotocore (>=2.5.4,<3.0.0)", "dask-expr", "dask[dataframe,test]", "moto[server] (>4,<5)", "pytest-timeout", "xarray"] -test-full = ["adlfs", "aiohttp (!=4.0.0a0,!=4.0.0a1)", "cloudpickle", "dask", "distributed", "dropbox", "dropboxdrivefs", "fastparquet", "fusepy", "gcsfs", "jinja2", "kerchunk", "libarchive-c", "lz4", "notebook", "numpy", "ocifs", "pandas", "panel", "paramiko", "pyarrow", "pyarrow (>=1)", "pyftpdlib", "pygit2", "pytest", "pytest-asyncio (!=0.22.0)", "pytest-benchmark", "pytest-cov", "pytest-mock", "pytest-recording", "pytest-rerunfailures", "python-snappy", "requests", "smbprotocol", "tqdm", "urllib3", "zarr", "zstandard"] tqdm = ["tqdm"] [[package]] @@ -737,4 +733,4 @@ files = [ [metadata] lock-version = "2.0" python-versions = "^3.9" -content-hash = "2afca8d4ce9d407ec9c308bfb562ae0c046061066ad9c7a115a215e1c7941238" +content-hash = "1bb1712f54089469cbb3c278bad0114a8104a28f988d82ff8d87bf88aa5d0fa5" diff --git a/pyproject.toml b/pyproject.toml index db1a095..3d59f1f 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -8,7 +8,7 @@ readme = "README.md" [tool.poetry.dependencies] python = "^3.9" -fsspec = ">=2023.5.0" +fsspec = "==2023.5.0" tos = ">=2.7.0" [tool.poetry.group.dev.dependencies] diff --git a/tosfs/core.py b/tosfs/core.py index db21471..807c1bb 100644 --- a/tosfs/core.py +++ b/tosfs/core.py @@ -936,10 +936,117 @@ def walk( if path in ["", "*"] + ["{}://".format(p) for p in self.protocol]: raise ValueError("Cannot access all of TOS via path {}.".format(path)) - return super().walk( + return self._fsspec_walk( path, maxdepth=maxdepth, topdown=topdown, on_error=on_error, **kwargs ) + def _fsspec_walk( # noqa + self, + path: str, + maxdepth: Optional[int] = None, + topdown: bool = True, + on_error: str = "omit", + **kwargs: Any, + ) -> Any: + """Return all files belows path. + + Copied from fsspec(2024.9.0) to fix fsspec(2023.5.0.) + + List all files, recursing into subdirectories; output is iterator-style, + like ``os.walk()``. For a simple list of files, ``find()`` is available. + + When topdown is True, the caller can modify the dirnames list in-place (perhaps + using del or slice assignment), and walk() will + only recurse into the subdirectories whose names remain in dirnames; + this can be used to prune the search, impose a specific order of visiting, + or even to inform walk() about directories the caller creates or renames before + it resumes walk() again. + Modifying dirnames when topdown is False has no effect. (see os.walk) + + Note that the "files" outputted will include anything that is not + a directory, such as links. + + Parameters + ---------- + path: str + Root to recurse into + maxdepth: int + Maximum recursion depth. None means limitless, but not recommended + on link-based file-systems. + topdown: bool (True) + Whether to walk the directory tree from the top downwards or from + the bottom upwards. + on_error: "omit", "raise", a collable + if omit (default), path with exception will simply be empty; + If raise, an underlying exception will be raised; + if callable, it will be called with a single OSError instance as argument + kwargs: passed to ``ls`` + + """ + # type: ignore + if maxdepth is not None and maxdepth < 1: + raise ValueError("maxdepth must be at least 1") + + path = self._strip_protocol(path) + full_dirs = {} + dirs = {} + files = {} + + detail = kwargs.pop("detail", False) + try: + listing = self.ls(path, detail=True, **kwargs) + except (FileNotFoundError, OSError) as e: + if on_error == "raise": + raise + elif callable(on_error): + on_error(e) + if detail: + return path, {}, {} # type: ignore + return path, [], [] # type: ignore + + for info in listing: + # each info name must be at least [path]/part , but here + # we check also for names like [path]/part/ + pathname = info["name"].rstrip("/") # type: ignore + name = pathname.rsplit("/", 1)[-1] + if info["type"] == "directory" and pathname != path: # type: ignore + # do not include "self" path + full_dirs[name] = pathname + dirs[name] = info + elif pathname == path: + # file-like with same name as give path + files[""] = info + else: + files[name] = info + + if not detail: + dirs = list(dirs) # type: ignore + files = list(files) # type: ignore + + if topdown: + # Yield before recursion if walking top down + yield path, dirs, files + + if maxdepth is not None: + maxdepth -= 1 + if maxdepth < 1: + if not topdown: + yield path, dirs, files + return + + for d in dirs: + yield from self.walk( + full_dirs[d], + maxdepth=maxdepth, + detail=detail, + topdown=topdown, + **kwargs, + ) + + if not topdown: + # Yield after recursion if walking bottom up + yield path, dirs, files + def find( self, path: str, @@ -984,7 +1091,7 @@ def find( "Can not specify 'prefix' option alongside 'maxdepth' options." ) if maxdepth: - return super().find( + return self._fsspec_find( bucket + "/" + key, maxdepth=maxdepth, withdirs=withdirs, @@ -999,6 +1106,54 @@ def find( else: return [o["name"] for o in out] + def _fsspec_find( # noqa # + self, + path: str, + maxdepth: Optional[int] = None, + withdirs: bool = False, + detail: bool = False, + **kwargs: Any, # type: ignore + ) -> Any: + """List all files below path. + + Copied from fsspec(2024.9.0) to fix fsspec(2023.5.0.) + + Like posix ``find`` command without conditions + + Parameters + ---------- + path : str + maxdepth: int or None + If not None, the maximum number of levels to descend + withdirs: bool + Whether to include directory paths in the output. This is True + when used by glob, but users usually only want files. + kwargs are passed to ``ls``. + + """ + # TODO: allow equivalent of -name parameter + path = self._strip_protocol(path) + out = {} + + # Add the root directory if withdirs is requested + # This is needed for posix glob compliance + if withdirs and path != "" and self.isdir(path): + out[path] = self.info(path) + + for _, dirs, files in self._fsspec_walk(path, maxdepth, detail=True, **kwargs): + if withdirs: + files.update(dirs) + out.update({info["name"]: info for name, info in files.items()}) + if not out and self.isfile(path): + # walk works on directories, but find should also return [path] + # when path happens to be a file + out[path] = {} + names = sorted(out) + if not detail: + return names + else: + return {name: out[name] for name in names} + def expand_path( self, path: Union[str, List[str]], @@ -1219,6 +1374,17 @@ def __init__(self, key: str, version_id: Optional[str] = None): self.key = key self.version_id = version_id + def delete_objects(deleting_objects: List[DeletingObject]) -> None: + delete_resp = retryable_func_executor( + lambda: self.tos_client.delete_multi_objects( + bucket, deleting_objects, quiet=True + ), + max_retry_num=self.max_retry_num, + ) + if delete_resp.error: + for d in delete_resp.error: + logger.warning("Deleted object: %s failed", d) + while is_truncated: def _call_list_objects_type2( @@ -1227,7 +1393,6 @@ def _call_list_objects_type2( return self.tos_client.list_objects_type2( bucket, prefix=key.rstrip("/") + "/", - delimiter="/", max_keys=LS_OPERATION_DEFAULT_MAX_ITEMS, continuation_token=continuation_token, ) @@ -1239,23 +1404,15 @@ def _call_list_objects_type2( ) is_truncated = resp.is_truncated continuation_token = resp.next_continuation_token - all_results.extend(resp.contents + resp.common_prefixes) + all_results = resp.contents - deleting_objects = [ - DeletingObject(o.key if hasattr(o, "key") else o.prefix) - for o in all_results - ] + deleting_objects = [ + DeletingObject(o.key if hasattr(o, "key") else o.prefix) + for o in all_results + ] - if deleting_objects: - delete_resp = retryable_func_executor( - lambda: self.tos_client.delete_multi_objects( - bucket, deleting_objects, quiet=True - ), - max_retry_num=self.max_retry_num, - ) - if delete_resp.error: - for d in delete_resp.error: - logger.warning("Deleted object: %s failed", d) + if deleting_objects: + delete_objects(deleting_objects) def _copy_basic(self, path1: str, path2: str, **kwargs: Any) -> None: """Copy file between locations on tos. @@ -1683,7 +1840,16 @@ def exists(self, path: str, **kwargs: Any) -> bool: ) except TosServerError as ex: if e.status_code == TOS_SERVER_STATUS_CODE_NOT_FOUND: - return False + resp = retryable_func_executor( + lambda: self.tos_client.list_objects_type2( + bucket, + key.rstrip("/") + "/", + start_after=key.rstrip("/") + "/", + max_keys=1, + ), + max_retry_num=self.max_retry_num, + ) + return len(resp.contents) > 0 else: raise ex else: diff --git a/tosfs/tests/test_tosfs.py b/tosfs/tests/test_tosfs.py index f24f8e1..7fb9bbe 100644 --- a/tosfs/tests/test_tosfs.py +++ b/tosfs/tests/test_tosfs.py @@ -14,7 +14,6 @@ import os.path import tempfile -import fsspec import pytest from tos.exceptions import TosServerError @@ -22,8 +21,6 @@ from tosfs.exceptions import TosfsError from tosfs.utils import create_temp_dir, random_str -fsspec_version = fsspec.__version__ - def test_ls_bucket(tosfs: TosFileSystem, bucket: str) -> None: assert bucket in tosfs.ls("", detail=False) @@ -226,6 +223,11 @@ def test_exists_object( tosfs.rm_file(f"{bucket}/{temporary_workspace}/{file_name}") assert not tosfs.exists(f"{bucket}/{temporary_workspace}/{file_name}") + tosfs.touch(f"{bucket}/{temporary_workspace}/a/b/c/d.txt") + assert tosfs.exists(f"{bucket}/{temporary_workspace}/a") + assert tosfs.exists(f"{bucket}/{temporary_workspace}/a/b") + assert tosfs.exists(f"{bucket}/{temporary_workspace}/a/b/c") + def test_mkdir(tosfs: TosFileSystem, bucket: str, temporary_workspace: str) -> None: dir_name = random_str() @@ -462,6 +464,11 @@ def test_find(tosfs: TosFileSystem, bucket: str, temporary_workspace: str) -> No ) assert result[f"{bucket}/{temporary_workspace}"]["type"] == "directory" + result = tosfs.find( + f"{bucket}/{temporary_workspace}/", withdirs=True, maxdepth=1, detail=True + ) + assert len(result) == 1 + dir_name = random_str() sub_dir_name = random_str() file_name = random_str() @@ -630,24 +637,15 @@ def test_glob(tosfs: TosFileSystem, bucket: str, temporary_workspace: str) -> No ) # Test with maxdepth - assert ( - sorted(tosfs.glob(f"{bucket}/{temporary_workspace}/**", maxdepth=2)) - == sorted( - [ - f"{bucket}/{temporary_workspace}/{dir_name}", - f"{bucket}/{temporary_workspace}/{dir_name}/{file_name}", - f"{bucket}/{temporary_workspace}/{dir_name}/{sub_dir_name}", - ] - ) - if fsspec_version == "2023.5.0" - else sorted( - [ - f"{bucket}/{temporary_workspace}", - f"{bucket}/{temporary_workspace}/{dir_name}", - f"{bucket}/{temporary_workspace}/{dir_name}/{file_name}", - f"{bucket}/{temporary_workspace}/{dir_name}/{sub_dir_name}", - ] - ) + assert sorted( + tosfs.glob(f"{bucket}/{temporary_workspace}/**", maxdepth=2) + ) == sorted( + [ + f"{bucket}/{temporary_workspace}", + f"{bucket}/{temporary_workspace}/{dir_name}", + f"{bucket}/{temporary_workspace}/{dir_name}/{file_name}", + f"{bucket}/{temporary_workspace}/{dir_name}/{sub_dir_name}", + ] ) # Test with detail @@ -683,7 +681,13 @@ def test_rm(tosfs: TosFileSystem, bucket: str, temporary_workspace: str) -> None f"{bucket}/{temporary_workspace}/{dir_name}/{sub_dir_name}/{sub_file_name}" ) tosfs.rm(f"{bucket}/{temporary_workspace}/{dir_name}", recursive=True) + assert not tosfs.exists( + f"{bucket}/{temporary_workspace}/{dir_name}/{sub_file_name}" + ) assert not tosfs.exists(f"{bucket}/{temporary_workspace}/{dir_name}/{sub_dir_name}") + assert not tosfs.exists( + f"{bucket}/{temporary_workspace}/{dir_name}/{sub_dir_name}/{sub_file_name}" + ) assert not tosfs.exists(f"{bucket}/{temporary_workspace}/{dir_name}") # Test Deletion of Non-Existent Path