Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add async wrapper for sync FS #1745

Merged
merged 5 commits into from
Nov 12, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 90 additions & 0 deletions fsspec/implementations/asyn_wrapper.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
import asyncio
import inspect
import functools
from fsspec.asyn import AsyncFileSystem


def async_wrapper(func, obj=None):
    """
    Wraps a synchronous function to make it awaitable.

    The call is dispatched with ``asyncio.to_thread``, so ``func`` runs in a
    worker thread while the calling coroutine awaits its result.

    Parameters
    ----------
    func : callable
        The synchronous function to wrap.
    obj : object, optional
        The instance to bind the function to, if applicable. It is accepted
        for interface compatibility only; the wrapper never reads it.

    Returns
    -------
    coroutine function
        An ``async def`` wrapper carrying ``func``'s metadata (via
        ``functools.wraps``); awaiting a call to it yields ``func``'s result.
    """

    @functools.wraps(func)
    async def wrapper(*args, **kwargs):
        # Arguments are forwarded untouched. Nothing is read from ``args``
        # here, so keyword-only invocations work even when ``obj`` is None.
        return await asyncio.to_thread(func, *args, **kwargs)

    return wrapper


class AsyncFileSystemWrapper(AsyncFileSystem):
    """
    A wrapper class to convert a synchronous filesystem into an asynchronous one.

    This class takes an existing synchronous filesystem instance and, for each
    of its public, callable, non-coroutine attributes ``name``, installs an
    awaitable counterpart on the wrapper under ``_name``.

    Parameters
    ----------
    sync_fs : AbstractFileSystem
        The synchronous filesystem instance to wrap.
    *args, **kwargs
        Forwarded unchanged to ``AsyncFileSystem.__init__``.
    """

    def __init__(self, sync_fs, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.fs = sync_fs
        self._wrap_all_sync_methods()

    @property
    def fsid(self):
        """Identifier derived from the wrapped filesystem's ``fsid``."""
        return f"async_{self.fs.fsid}"

    def _wrap_all_sync_methods(self):
        """
        Wrap all synchronous methods of the underlying filesystem with asynchronous versions.

        Names starting with an underscore, properties, non-callables and
        attributes that are already coroutine functions are skipped.
        """
        for method_name in dir(self.fs):
            if method_name.startswith("_"):
                continue

            # Static lookup so that properties are detected without
            # evaluating them on the wrapped filesystem.
            attr = inspect.getattr_static(self.fs, method_name)
            if isinstance(attr, property):
                continue

            method = getattr(self.fs, method_name)
            if callable(method) and not asyncio.iscoroutinefunction(method):
                async_method = async_wrapper(method, obj=self)
                # Stored on the instance as ``_<name>``, e.g. ``cat`` -> ``_cat``.
                setattr(self, f"_{method_name}", async_method)

    @classmethod
    def wrap_class(cls, sync_fs_class):
        """
        Create a new class that can be used to instantiate an AsyncFileSystemWrapper
        with lazy instantiation of the underlying synchronous filesystem.

        The synchronous filesystem is only constructed when the returned class
        is itself instantiated; the constructor arguments are passed to
        ``sync_fs_class``.

        Parameters
        ----------
        sync_fs_class : type
            The class of the synchronous filesystem to wrap.

        Returns
        -------
        type
            A new class that wraps the provided synchronous filesystem class,
            named ``Async<sync_fs_class.__name__>Wrapper``.
        """

        class GeneratedAsyncFileSystemWrapper(cls):
            def __init__(self, *args, **kwargs):
                sync_fs = sync_fs_class(*args, **kwargs)
                super().__init__(sync_fs)

        generated_name = f"Async{sync_fs_class.__name__}Wrapper"
        GeneratedAsyncFileSystemWrapper.__name__ = generated_name
        # Keep the qualified name in step so repr()/tracebacks do not show the
        # ``wrap_class.<locals>.GeneratedAsyncFileSystemWrapper`` placeholder.
        GeneratedAsyncFileSystemWrapper.__qualname__ = generated_name
        return GeneratedAsyncFileSystemWrapper
131 changes: 131 additions & 0 deletions fsspec/implementations/tests/test_asyn_wrapper.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
import asyncio
import pytest
import os

import fsspec
from fsspec.implementations.asyn_wrapper import AsyncFileSystemWrapper
from fsspec.implementations.local import LocalFileSystem
from .test_local import csv_files, filetexts


def test_is_async():
    """A wrapped local filesystem reports itself as an async implementation."""
    fs = fsspec.filesystem("file")
    async_fs = AsyncFileSystemWrapper(fs)
    # Assert the flag directly rather than comparing against True.
    assert async_fs.async_impl


def test_class_wrapper():
    """``wrap_class`` yields a named class whose instances are async."""
    fs_cls = LocalFileSystem
    async_fs_cls = AsyncFileSystemWrapper.wrap_class(fs_cls)
    assert async_fs_cls.__name__ == "AsyncLocalFileSystemWrapper"
    async_fs = async_fs_cls()
    # Assert the flag directly rather than comparing against True.
    assert async_fs.async_impl


@pytest.mark.asyncio
async def test_cats():
    """Exercise ``_cat`` on single paths, lists of paths and byte ranges."""
    first = ".test.fakedata.1.csv"
    second = ".test.fakedata.2.csv"
    first_bytes = b"a,b\n1,2\n"
    second_bytes = b"a,b\n3,4\n"

    with filetexts(csv_files, mode="b"):
        async_fs = AsyncFileSystemWrapper(fsspec.filesystem("file"))

        assert await async_fs._cat(first) == first_bytes

        both = await async_fs._cat([first, second])
        assert set(both.values()) == {first_bytes, second_bytes}

        assert await async_fs._cat(first, None, None) == first_bytes
        assert await async_fs._cat(first, start=1, end=6) == first_bytes[1:6]
        assert await async_fs._cat(first, start=-1) == first_bytes[-1:]
        assert await async_fs._cat(first, start=1, end=-2) == first_bytes[1:-2]

        # test synchronous API is available as expected
        assert async_fs.cat(first, start=1, end=-2) == first_bytes[1:-2]

        ranged = await async_fs._cat([first, second], start=1, end=-1)
        assert set(ranged.values()) == {first_bytes[1:-1], second_bytes[1:-1]}

@pytest.mark.asyncio
async def test_basic_crud_operations():
    """Touch, read, overwrite and remove files through the async wrapper."""
    existing = ".test.fakedata.1.csv"
    created = ".test.fakedata.3.csv"
    replacement = b"a,b\n5,6\n"

    with filetexts(csv_files, mode="b"):
        async_fs = AsyncFileSystemWrapper(fsspec.filesystem("file"))

        # create
        await async_fs._touch(created)
        assert await async_fs._exists(created)

        # read
        assert await async_fs._cat(existing) == b"a,b\n1,2\n"

        # update
        await async_fs._pipe(existing, replacement)
        assert await async_fs._cat(existing) == replacement

        # delete
        await async_fs._rm(existing)
        still_present = await async_fs._exists(existing)
        assert not still_present

@pytest.mark.asyncio
async def test_error_handling():
    """``_cat`` and ``_rm`` on a missing path both raise FileNotFoundError."""
    async_fs = AsyncFileSystemWrapper(fsspec.filesystem("file"))
    missing = ".test.non_existent.csv"

    # Same order as before: read first, then remove.
    for operation in (async_fs._cat, async_fs._rm):
        with pytest.raises(FileNotFoundError):
            await operation(missing)

@pytest.mark.asyncio
async def test_concurrent_operations():
    """Several ``_cat`` calls gathered together return results in call order."""
    paths = (
        ".test.fakedata.1.csv",
        ".test.fakedata.2.csv",
        ".test.fakedata.1.csv",
    )

    with filetexts(csv_files, mode="b"):
        async_fs = AsyncFileSystemWrapper(fsspec.filesystem("file"))

        async def read_file(file_path):
            return await async_fs._cat(file_path)

        pending = [read_file(path) for path in paths]
        results = await asyncio.gather(*pending)

        assert results == [b"a,b\n1,2\n", b"a,b\n3,4\n", b"a,b\n1,2\n"]

@pytest.mark.asyncio
async def test_directory_operations():
    """A directory made via ``_makedirs`` is a dir and shows up in ``_ls``."""
    with filetexts(csv_files, mode="b"):
        async_fs = AsyncFileSystemWrapper(fsspec.filesystem("file"))

        await async_fs._makedirs("new_directory")
        assert await async_fs._isdir("new_directory")

        listing = await async_fs._ls(".")
        filenames = []
        for entry in listing:
            filenames.append(os.path.basename(entry))

        for expected in (
            ".test.fakedata.1.csv",
            ".test.fakedata.2.csv",
            "new_directory",
        ):
            assert expected in filenames

@pytest.mark.asyncio
async def test_batch_operations():
    """Passing a list of paths to ``_rm`` removes every listed file."""
    targets = [".test.fakedata.1.csv", ".test.fakedata.2.csv"]

    with filetexts(csv_files, mode="b"):
        async_fs = AsyncFileSystemWrapper(fsspec.filesystem("file"))

        await async_fs._rm(targets)

        for removed in targets:
            assert not await async_fs._exists(removed)
Loading