diff --git a/.github/workflows/main.yaml b/.github/workflows/main.yaml index 653eb8084..f9e5ed98c 100644 --- a/.github/workflows/main.yaml +++ b/.github/workflows/main.yaml @@ -14,7 +14,6 @@ jobs: fail-fast: false matrix: PY: - - "3.8" - "3.9" - "3.10" - "3.11" diff --git a/docs/source/async.rst b/docs/source/async.rst index 58334b333..dc44df381 100644 --- a/docs/source/async.rst +++ b/docs/source/async.rst @@ -152,3 +152,37 @@ available as the attribute ``.loop``. + +AsyncFileSystemWrapper +---------------------- + +The ``AsyncFileSystemWrapper`` class is an experimental feature that allows you to convert +a synchronous filesystem into an asynchronous one. This is useful for quickly integrating +synchronous filesystems into workflows that may expect ``AsyncFileSystem`` instances. + +Basic Usage +~~~~~~~~~~~ + +To use ``AsyncFileSystemWrapper``, wrap any synchronous filesystem to work in an asynchronous context. +In this example, the synchronous ``LocalFileSystem`` is wrapped, creating an ``AsyncFileSystem`` instance +backed by the normal, synchronous methods of ``LocalFileSystem``: + +.. code-block:: python + + import asyncio + import fsspec + from fsspec.implementations.asyn_wrapper import AsyncFileSystemWrapper + + async def async_copy_file(): + sync_fs = fsspec.filesystem('file') # local filesystem, synchronous by default + async_fs = AsyncFileSystemWrapper(sync_fs) + return await async_fs._copy('/source/file.txt', '/destination/file.txt') + + asyncio.run(async_copy_file()) + +Limitations +~~~~~~~~~~~ + +This wrapper is experimental. Do not expect it to make operations faster: each call still runs the underlying synchronous method. +It is primarily provided to allow usage of synchronous filesystems with interfaces that expect +``AsyncFileSystem`` instances. 
diff --git a/fsspec/implementations/asyn_wrapper.py b/fsspec/implementations/asyn_wrapper.py
new file mode 100644
index 000000000..9ba7811ce
--- /dev/null
+++ b/fsspec/implementations/asyn_wrapper.py
@@ -0,0 +1,118 @@
+import asyncio
+import functools
+import inspect
+
+from fsspec.asyn import AsyncFileSystem
+
+
+def async_wrapper(func, obj=None):
+    """
+    Wrap a synchronous function so that it can be awaited.
+
+    The returned coroutine function runs ``func`` in a worker thread via
+    ``asyncio.to_thread``, so the calling event loop is not blocked while
+    ``func`` executes.
+
+    Parameters
+    ----------
+    func : callable
+        The synchronous function to wrap.
+    obj : object, optional
+        Currently unused. ``func`` is called exactly as given, so pass an
+        already-bound method if it needs an instance.
+
+    Returns
+    -------
+    coroutine function
+        An ``async def`` function which, when awaited, returns the result of
+        ``func(*args, **kwargs)``.
+    """
+
+    @functools.wraps(func)
+    async def wrapper(*args, **kwargs):
+        return await asyncio.to_thread(func, *args, **kwargs)
+
+    return wrapper
+
+
+class AsyncFileSystemWrapper(AsyncFileSystem):
+    """
+    A wrapper class to convert a synchronous filesystem into an asynchronous one.
+
+    This class takes an existing synchronous filesystem implementation and wraps
+    its public methods to provide an asynchronous interface: each public,
+    synchronous callable ``name`` of the wrapped filesystem is made available on
+    the wrapper as the coroutine function ``_name``.
+
+    Parameters
+    ----------
+    sync_fs : AbstractFileSystem
+        The synchronous filesystem instance to wrap.
+    *args, **kwargs
+        Passed through to ``AsyncFileSystem.__init__``.
+    """
+
+    def __init__(self, sync_fs, *args, **kwargs):
+        super().__init__(*args, **kwargs)
+        self.asynchronous = True
+        self.fs = sync_fs
+        self._wrap_all_sync_methods()
+
+    @property
+    def fsid(self):
+        """The wrapped filesystem's ``fsid`` prefixed with ``async_``."""
+        return f"async_{self.fs.fsid}"
+
+    def _wrap_all_sync_methods(self):
+        """
+        Wrap all synchronous methods of the underlying filesystem with
+        asynchronous versions.
+
+        Each public, callable, non-coroutine attribute ``name`` of ``self.fs``
+        is attached to this instance as the coroutine function ``_name``.
+        """
+        for method_name in dir(self.fs):
+            if method_name.startswith("_"):
+                continue
+
+            # Inspect the attribute statically first so that properties are
+            # skipped without running their getters.
+            attr = inspect.getattr_static(self.fs, method_name)
+            if isinstance(attr, property):
+                continue
+
+            method = getattr(self.fs, method_name)
+            # inspect.iscoroutinefunction replaces asyncio.iscoroutinefunction,
+            # which is deprecated as of Python 3.14.
+            if callable(method) and not inspect.iscoroutinefunction(method):
+                async_method = async_wrapper(method, obj=self)
+                setattr(self, f"_{method_name}", async_method)
+
+    @classmethod
+    def wrap_class(cls, sync_fs_class):
+        """
+        Create a new class that can be used to instantiate an AsyncFileSystemWrapper
+        with lazy instantiation of the underlying synchronous filesystem.
+
+        Parameters
+        ----------
+        sync_fs_class : type
+            The class of the synchronous filesystem to wrap.
+
+        Returns
+        -------
+        type
+            A new class that wraps the provided synchronous filesystem class.
+            Its constructor arguments are forwarded to ``sync_fs_class``.
+        """
+
+        class GeneratedAsyncFileSystemWrapper(cls):
+            def __init__(self, *args, **kwargs):
+                sync_fs = sync_fs_class(*args, **kwargs)
+                super().__init__(sync_fs)
+
+        generated_name = f"Async{sync_fs_class.__name__}Wrapper"
+        GeneratedAsyncFileSystemWrapper.__name__ = generated_name
+        # Keep the qualified name in step so repr() shows the generated name.
+        GeneratedAsyncFileSystemWrapper.__qualname__ = generated_name
+        return GeneratedAsyncFileSystemWrapper
diff --git a/fsspec/implementations/tests/test_asyn_wrapper.py b/fsspec/implementations/tests/test_asyn_wrapper.py
new file mode 100644
index 000000000..d29202003
--- /dev/null
+++ b/fsspec/implementations/tests/test_asyn_wrapper.py
@@ -0,0 +1,152 @@
+import asyncio
+import os
+
+import pytest
+
+import fsspec
+from fsspec.implementations.asyn_wrapper import AsyncFileSystemWrapper
+from fsspec.implementations.local import LocalFileSystem
+from .test_local import csv_files, filetexts
+
+
+def test_is_async():
+    """A wrapped filesystem reports ``async_impl`` as truthy."""
+    fs = fsspec.filesystem("file")
+    async_fs = AsyncFileSystemWrapper(fs)
+    assert async_fs.async_impl
+
+
+def test_class_wrapper():
+    """``wrap_class`` returns a named wrapper class that can be instantiated."""
+    fs_cls = LocalFileSystem
+    async_fs_cls = AsyncFileSystemWrapper.wrap_class(fs_cls)
+    assert async_fs_cls.__name__ == "AsyncLocalFileSystemWrapper"
+    assert async_fs_cls.__qualname__ == "AsyncLocalFileSystemWrapper"
+    async_fs = async_fs_cls()
+    assert async_fs.async_impl
+
+
+@pytest.mark.asyncio
+async def test_cats():
+    """``_cat`` supports single paths, lists of paths and start/end ranges."""
+    with filetexts(csv_files, mode="b"):
+        fs = fsspec.filesystem("file")
+        async_fs = AsyncFileSystemWrapper(fs)
+
+        result = await async_fs._cat(".test.fakedata.1.csv")
+        assert result == b"a,b\n1,2\n"
+
+        out = set(
+            (
+                await async_fs._cat([".test.fakedata.1.csv", ".test.fakedata.2.csv"])
+            ).values()
+        )
+        assert out == {b"a,b\n1,2\n", b"a,b\n3,4\n"}
+
+        result = await async_fs._cat(".test.fakedata.1.csv", None, None)
+        assert result == b"a,b\n1,2\n"
+
+        result = await async_fs._cat(".test.fakedata.1.csv", start=1, end=6)
+        assert result == b"a,b\n1,2\n"[1:6]
+
+        result = await async_fs._cat(".test.fakedata.1.csv", start=-1)
+        assert result == b"a,b\n1,2\n"[-1:]
+
+        result = await async_fs._cat(".test.fakedata.1.csv", start=1, end=-2)
+        assert result == b"a,b\n1,2\n"[1:-2]
+
+        # test synchronous API is available as expected
+        result = async_fs.cat(".test.fakedata.1.csv", start=1, end=-2)
+        assert result == b"a,b\n1,2\n"[1:-2]
+
+        out = set(
+            (
+                await async_fs._cat(
+                    [".test.fakedata.1.csv", ".test.fakedata.2.csv"], start=1, end=-1
+                )
+            ).values()
+        )
+        assert out == {b"a,b\n1,2\n"[1:-1], b"a,b\n3,4\n"[1:-1]}
+
+
+@pytest.mark.asyncio
+async def test_basic_crud_operations():
+    """Touch, read, overwrite and remove files through the async methods."""
+    with filetexts(csv_files, mode="b"):
+        fs = fsspec.filesystem("file")
+        async_fs = AsyncFileSystemWrapper(fs)
+
+        await async_fs._touch(".test.fakedata.3.csv")
+        assert await async_fs._exists(".test.fakedata.3.csv")
+
+        data = await async_fs._cat(".test.fakedata.1.csv")
+        assert data == b"a,b\n1,2\n"
+
+        await async_fs._pipe(".test.fakedata.1.csv", b"a,b\n5,6\n")
+        data = await async_fs._cat(".test.fakedata.1.csv")
+        assert data == b"a,b\n5,6\n"
+
+        await async_fs._rm(".test.fakedata.1.csv")
+        assert not await async_fs._exists(".test.fakedata.1.csv")
+
+
+@pytest.mark.asyncio
+async def test_error_handling():
+    """``FileNotFoundError`` from the wrapped filesystem reaches the awaiter."""
+    fs = fsspec.filesystem("file")
+    async_fs = AsyncFileSystemWrapper(fs)
+
+    with pytest.raises(FileNotFoundError):
+        await async_fs._cat(".test.non_existent.csv")
+
+    with pytest.raises(FileNotFoundError):
+        await async_fs._rm(".test.non_existent.csv")
+
+
+@pytest.mark.asyncio
+async def test_concurrent_operations():
+    """Several ``_cat`` calls can be awaited together with ``asyncio.gather``."""
+    with filetexts(csv_files, mode="b"):
+        fs = fsspec.filesystem("file")
+        async_fs = AsyncFileSystemWrapper(fs)
+
+        async def read_file(file_path):
+            return await async_fs._cat(file_path)
+
+        results = await asyncio.gather(
+            read_file(".test.fakedata.1.csv"),
+            read_file(".test.fakedata.2.csv"),
+            read_file(".test.fakedata.1.csv"),
+        )
+
+        assert results == [b"a,b\n1,2\n", b"a,b\n3,4\n", b"a,b\n1,2\n"]
+
+
+@pytest.mark.asyncio
+async def test_directory_operations():
+    """Create a directory and find it, and the files, in a listing."""
+    with filetexts(csv_files, mode="b"):
+        fs = fsspec.filesystem("file")
+        async_fs = AsyncFileSystemWrapper(fs)
+
+        await async_fs._makedirs("new_directory")
+        assert await async_fs._isdir("new_directory")
+
+        files = await async_fs._ls(".")
+        filenames = [os.path.basename(file) for file in files]
+
+        assert ".test.fakedata.1.csv" in filenames
+        assert ".test.fakedata.2.csv" in filenames
+        assert "new_directory" in filenames
+
+
+@pytest.mark.asyncio
+async def test_batch_operations():
+    """``_rm`` accepts a list of paths and removes each of them."""
+    with filetexts(csv_files, mode="b"):
+        fs = fsspec.filesystem("file")
+        async_fs = AsyncFileSystemWrapper(fs)
+
+        await async_fs._rm([".test.fakedata.1.csv", ".test.fakedata.2.csv"])
+        assert not await async_fs._exists(".test.fakedata.1.csv")
+        assert not await async_fs._exists(".test.fakedata.2.csv")