Skip to content

Commit

Permalink
Add async wrapper for sync FS (#1745)
Browse files Browse the repository at this point in the history

Co-authored-by: Martin Durant <[email protected]>
  • Loading branch information
moradology and martindurant authored Nov 12, 2024
1 parent 9a16171 commit 4cb98ab
Show file tree
Hide file tree
Showing 4 changed files with 272 additions and 1 deletion.
1 change: 0 additions & 1 deletion .github/workflows/main.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ jobs:
fail-fast: false
matrix:
PY:
- "3.8"
- "3.9"
- "3.10"
- "3.11"
Expand Down
34 changes: 34 additions & 0 deletions docs/source/async.rst
Original file line number Diff line number Diff line change
Expand Up @@ -152,3 +152,37 @@ available as the attribute ``.loop``.

<script data-goatcounter="https://fsspec.goatcounter.com/count"
async src="//gc.zgo.at/count.js"></script>

AsyncFileSystemWrapper
----------------------

The `AsyncFileSystemWrapper` class is an experimental feature that allows you to convert
a synchronous filesystem into an asynchronous one. This is useful for quickly integrating
synchronous filesystems into workflows that may expect `AsyncFileSystem` instances.

Basic Usage
~~~~~~~~~~~

To use `AsyncFileSystemWrapper`, wrap any synchronous filesystem to work in an asynchronous context.
In this example, the synchronous `LocalFileSystem` is wrapped, creating an `AsyncFileSystem` instance
backed by the normal, synchronous methods of `LocalFileSystem`:

.. code-block:: python
import asyncio
import fsspec
from fsspec.implementations.asyn_wrapper import AsyncFileSystemWrapper
async def async_copy_file():
sync_fs = fsspec.filesystem('file') # by-default synchronous, local filesystem
async_fs = AsyncFileSystemWrapper(sync_fs)
return await async_fs._copy('/source/file.txt', '/destination/file.txt')
asyncio.run(async_copy_file())
Limitations
-----------

This is experimental. Users should not expect this wrapper to magically make things faster.
It is primarily provided to allow usage of synchronous filesystems with interfaces that expect
`AsyncFileSystem` instances.
96 changes: 96 additions & 0 deletions fsspec/implementations/asyn_wrapper.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
import asyncio
import inspect
import functools
from fsspec.asyn import AsyncFileSystem


def async_wrapper(func, obj=None):
"""
Wraps a synchronous function to make it awaitable.
Parameters
----------
func : callable
The synchronous function to wrap.
obj : object, optional
The instance to bind the function to, if applicable.
Returns
-------
coroutine
An awaitable version of the function.
"""

@functools.wraps(func)
async def wrapper(*args, **kwargs):
return await asyncio.to_thread(func, *args, **kwargs)

return wrapper


class AsyncFileSystemWrapper(AsyncFileSystem):
"""
A wrapper class to convert a synchronous filesystem into an asynchronous one.
This class takes an existing synchronous filesystem implementation and wraps all
its methods to provide an asynchronous interface.
Parameters
----------
sync_fs : AbstractFileSystem
The synchronous filesystem instance to wrap.
"""

def __init__(self, sync_fs, *args, **kwargs):
super().__init__(*args, **kwargs)
self.asynchronous = True
self.fs = sync_fs
self._wrap_all_sync_methods()

@property
def fsid(self):
return f"async_{self.fs.fsid}"

def _wrap_all_sync_methods(self):
"""
Wrap all synchronous methods of the underlying filesystem with asynchronous versions.
"""
for method_name in dir(self.fs):
if method_name.startswith("_"):
continue

attr = inspect.getattr_static(self.fs, method_name)
if isinstance(attr, property):
continue

method = getattr(self.fs, method_name)
if callable(method) and not asyncio.iscoroutinefunction(method):
async_method = async_wrapper(method, obj=self)
setattr(self, f"_{method_name}", async_method)

@classmethod
def wrap_class(cls, sync_fs_class):
"""
Create a new class that can be used to instantiate an AsyncFileSystemWrapper
with lazy instantiation of the underlying synchronous filesystem.
Parameters
----------
sync_fs_class : type
The class of the synchronous filesystem to wrap.
Returns
-------
type
A new class that wraps the provided synchronous filesystem class.
"""

class GeneratedAsyncFileSystemWrapper(cls):
def __init__(self, *args, **kwargs):
sync_fs = sync_fs_class(*args, **kwargs)
super().__init__(sync_fs)

GeneratedAsyncFileSystemWrapper.__name__ = (
f"Async{sync_fs_class.__name__}Wrapper"
)
return GeneratedAsyncFileSystemWrapper
142 changes: 142 additions & 0 deletions fsspec/implementations/tests/test_asyn_wrapper.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
import asyncio
import pytest
import os

import fsspec
from fsspec.implementations.asyn_wrapper import AsyncFileSystemWrapper
from fsspec.implementations.local import LocalFileSystem
from .test_local import csv_files, filetexts


def test_is_async():
fs = fsspec.filesystem("file")
async_fs = AsyncFileSystemWrapper(fs)
assert async_fs.async_impl


def test_class_wrapper():
fs_cls = LocalFileSystem
async_fs_cls = AsyncFileSystemWrapper.wrap_class(fs_cls)
assert async_fs_cls.__name__ == "AsyncLocalFileSystemWrapper"
async_fs = async_fs_cls()
assert async_fs.async_impl


@pytest.mark.asyncio
async def test_cats():
with filetexts(csv_files, mode="b"):
fs = fsspec.filesystem("file")
async_fs = AsyncFileSystemWrapper(fs)

result = await async_fs._cat(".test.fakedata.1.csv")
assert result == b"a,b\n1,2\n"

out = set(
(
await async_fs._cat([".test.fakedata.1.csv", ".test.fakedata.2.csv"])
).values()
)
assert out == {b"a,b\n1,2\n", b"a,b\n3,4\n"}

result = await async_fs._cat(".test.fakedata.1.csv", None, None)
assert result == b"a,b\n1,2\n"

result = await async_fs._cat(".test.fakedata.1.csv", start=1, end=6)
assert result == b"a,b\n1,2\n"[1:6]

result = await async_fs._cat(".test.fakedata.1.csv", start=-1)
assert result == b"a,b\n1,2\n"[-1:]

result = await async_fs._cat(".test.fakedata.1.csv", start=1, end=-2)
assert result == b"a,b\n1,2\n"[1:-2]

# test synchronous API is available as expected
result = async_fs.cat(".test.fakedata.1.csv", start=1, end=-2)
assert result == b"a,b\n1,2\n"[1:-2]

out = set(
(
await async_fs._cat(
[".test.fakedata.1.csv", ".test.fakedata.2.csv"], start=1, end=-1
)
).values()
)
assert out == {b"a,b\n1,2\n"[1:-1], b"a,b\n3,4\n"[1:-1]}


@pytest.mark.asyncio
async def test_basic_crud_operations():
with filetexts(csv_files, mode="b"):
fs = fsspec.filesystem("file")
async_fs = AsyncFileSystemWrapper(fs)

await async_fs._touch(".test.fakedata.3.csv")
assert await async_fs._exists(".test.fakedata.3.csv")

data = await async_fs._cat(".test.fakedata.1.csv")
assert data == b"a,b\n1,2\n"

await async_fs._pipe(".test.fakedata.1.csv", b"a,b\n5,6\n")
data = await async_fs._cat(".test.fakedata.1.csv")
assert data == b"a,b\n5,6\n"

await async_fs._rm(".test.fakedata.1.csv")
assert not await async_fs._exists(".test.fakedata.1.csv")


@pytest.mark.asyncio
async def test_error_handling():
fs = fsspec.filesystem("file")
async_fs = AsyncFileSystemWrapper(fs)

with pytest.raises(FileNotFoundError):
await async_fs._cat(".test.non_existent.csv")

with pytest.raises(FileNotFoundError):
await async_fs._rm(".test.non_existent.csv")


@pytest.mark.asyncio
async def test_concurrent_operations():
with filetexts(csv_files, mode="b"):
fs = fsspec.filesystem("file")
async_fs = AsyncFileSystemWrapper(fs)

async def read_file(file_path):
return await async_fs._cat(file_path)

results = await asyncio.gather(
read_file(".test.fakedata.1.csv"),
read_file(".test.fakedata.2.csv"),
read_file(".test.fakedata.1.csv"),
)

assert results == [b"a,b\n1,2\n", b"a,b\n3,4\n", b"a,b\n1,2\n"]


@pytest.mark.asyncio
async def test_directory_operations():
with filetexts(csv_files, mode="b"):
fs = fsspec.filesystem("file")
async_fs = AsyncFileSystemWrapper(fs)

await async_fs._makedirs("new_directory")
assert await async_fs._isdir("new_directory")

files = await async_fs._ls(".")
filenames = [os.path.basename(file) for file in files]

assert ".test.fakedata.1.csv" in filenames
assert ".test.fakedata.2.csv" in filenames
assert "new_directory" in filenames


@pytest.mark.asyncio
async def test_batch_operations():
with filetexts(csv_files, mode="b"):
fs = fsspec.filesystem("file")
async_fs = AsyncFileSystemWrapper(fs)

await async_fs._rm([".test.fakedata.1.csv", ".test.fakedata.2.csv"])
assert not await async_fs._exists(".test.fakedata.1.csv")
assert not await async_fs._exists(".test.fakedata.2.csv")

0 comments on commit 4cb98ab

Please sign in to comment.