Skip to content

Commit

Permalink
tests and docstrings
Browse files Browse the repository at this point in the history
  • Loading branch information
martindurant committed Nov 1, 2024
1 parent 0a0a2fc commit 032c976
Show file tree
Hide file tree
Showing 3 changed files with 108 additions and 7 deletions.
36 changes: 30 additions & 6 deletions obstore/python/obstore/fsspec.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,30 @@


class AsyncFsspecStore(fsspec.asyn.AsyncFileSystem):
"""An fsspec implementation based on a obstore Store"""

def __init__(
self,
store,
store: obs.store.ObjectStore,
*args,
asynchronous=False,
asynchronous: bool = False,
loop=None,
batch_size=None,
batch_size: int | None = None,
**kwargs,
):
"""
store: a configured instance of one of the store classes in obstore.store
asynchronous: is this instance meant to be called using the async API? This
should only be set to true when running within a coroutine
loop: since both fsspec/python and tokio/rust may be using loops, this should
be kept None for now
batch_size: some operations on many files will batch their requests; if you
are seeing timeouts, you may want to set this number smaller than the defaults,
which are determined in fsspec.asyn._get_batch_size
kwargs: not currently supported; extra configuration for the backend should be
done to the Store passed in the first argument.
"""

self.store = store
super().__init__(
*args, asynchronous=asynchronous, loop=loop, batch_size=batch_size, **kwargs
Expand Down Expand Up @@ -74,7 +89,9 @@ async def _cat_ranges(
on_error="return",
**kwargs,
):
# TODO: need to go through this again and test it
if not len(paths) == len(starts) == len(ends):
raise ValueError

per_file_requests: Dict[str, List[Tuple[int, int, int]]] = defaultdict(list)
for idx, (path, start, end) in enumerate(zip(paths, starts, ends)):
per_file_requests[path].append((start, end, idx))
Expand All @@ -95,7 +112,7 @@ async def _cat_ranges(
path, ranges = per_file_request
for buffer, ranges_ in zip(buffers, ranges):
initial_index = ranges_[2]
output_buffers[initial_index] = buffer
output_buffers[initial_index] = buffer.as_bytes()

return output_buffers

Expand Down Expand Up @@ -147,9 +164,16 @@ def _open(self, path, mode="rb", **kwargs):

class BufferedFileSimple(fsspec.spec.AbstractBufferedFile):
def __init__(self, fs, path, mode="rb", cache_type="none", **kwargs):
    """Open a read-only buffered file over an obstore-backed filesystem.

    fs: the AsyncFsspecStore instance this file belongs to
    path: path of the remote file within the store
    mode: only "rb" is supported; any other value raises ValueError
    cache_type: fsspec read-cache strategy; "none" disables caching
    kwargs: forwarded to AbstractBufferedFile
    """
    if mode != "rb":
        raise ValueError("Only 'rb' mode is currently supported")
    # Bug fix: `mode` was previously passed twice positionally, so the second
    # copy ("rb") landed in AbstractBufferedFile's `block_size` parameter.
    super().__init__(fs, path, mode, cache_type=cache_type, **kwargs)

def read(self, length=-1):
def read(self, length: int = -1):
"""Return bytes from the remote file
length: if positive, returns up to this many bytes; if negative, return all
remaining bytes.
"""
if length < 0:
data = self.fs.cat_file(self.path, self.loc, self.size)
self.loc = self.size
Expand Down
4 changes: 3 additions & 1 deletion tests/conftest.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import boto3
import pytest
import urllib3
from botocore import UNSIGNED
from botocore.client import Config
from moto.moto_server.threaded_moto_server import ThreadedMotoServer
Expand Down Expand Up @@ -35,7 +36,8 @@ def s3(moto_server_uri: str):
)
client.create_bucket(Bucket=TEST_BUCKET_NAME, ACL="public-read")
client.put_object(Bucket=TEST_BUCKET_NAME, Key="afile", Body=b"hello world")
return moto_server_uri
yield moto_server_uri
urllib3.request(method="post", url=f"{moto_server_uri}/moto-api/reset")


@pytest.fixture()
Expand Down
75 changes: 75 additions & 0 deletions tests/test_fsspec.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import os

import pyarrow.parquet as pq
import pytest

Expand All @@ -21,8 +23,81 @@ def test_list(fs):
assert out[1]["type"] == "directory"


@pytest.mark.asyncio
async def test_list_async(s3_store):
    """Exercise the async listing API directly on a store-backed filesystem."""
    fs = AsyncFsspecStore(s3_store, asynchronous=True)

    listing = await fs._ls("", detail=False)
    assert listing == ["afile"]

    # Writing into a sub-path should surface the parent as a directory entry.
    await fs._pipe_file("dir/bfile", b"data")
    assert await fs._ls("", detail=False) == ["afile", "dir"]

    detailed = await fs._ls("", detail=True)
    assert detailed[0]["type"] == "file"
    assert detailed[1]["type"] == "directory"


@pytest.mark.network
def test_remote_parquet():
    """Read parquet metadata over HTTP via the fsspec wrapper (needs network)."""
    fs = AsyncFsspecStore(obs.store.HTTPStore.from_url("https://github.com"))
    path = "opengeospatial/geoparquet/raw/refs/heads/main/examples/example.parquet"
    # Succeeds iff range requests through the filesystem work end-to-end.
    pq.read_metadata(path, filesystem=fs)


def test_multi_file_ops(fs):
    """Batch pipe/cat/copy/find/rm operations across multiple files."""
    payload = {"dir/test1": b"test data1", "dir/test2": b"test data2"}
    fs.pipe(payload)

    # Both explicit-path and recursive cat should round-trip the payload.
    assert fs.cat(list(payload)) == payload
    assert fs.cat("dir", recursive=True) == payload

    fs.cp("dir", "dir2", recursive=True)
    found = fs.find("", detail=False)
    assert found == ["afile", "dir/test1", "dir/test2", "dir2/test1", "dir2/test2"]

    fs.rm(["dir", "dir2"], recursive=True)
    assert fs.find("", detail=False) == ["afile"]


def test_cat_ranges_one(fs):
    """Byte-range reads against a single file, including overlapping ranges."""
    blob = os.urandom(10000)
    fs.pipe_file("data1", blob)

    # single range
    assert fs.cat_ranges(["data1"], [10], [20]) == [blob[10:20]]

    # range end past EOF: clamped to the file's actual size
    assert fs.cat_ranges(["data1"], [0], [11000]) == [blob]

    # two-range requests over the same file:
    # disjoint, adjoining, overlapping, and fully contained
    two_range_cases = [
        ((10, 40), (20, 60)),
        ((10, 30), (20, 60)),
        ((10, 15), (20, 60)),
        ((10, 0), (20, 60)),
    ]
    for starts, ends in two_range_cases:
        expected = [blob[s:e] for s, e in zip(starts, ends)]
        got = fs.cat_ranges(["data1", "data1"], list(starts), list(ends))
        assert got == expected


def test_cat_ranges_two(fs):
    """One byte range from each of two distinct files in a single call."""
    blobs = {"data1": os.urandom(10000), "data2": os.urandom(10000)}
    fs.pipe(blobs)

    result = fs.cat_ranges(["data1", "data2"], [10, 10], [20, 20])
    assert result == [blobs["data1"][10:20], blobs["data2"][10:20]]


def test_cat_ranges_error(fs):
    """Mismatched paths/starts/ends lengths must raise ValueError."""
    with pytest.raises(ValueError):
        # one path but zero starts/ends
        fs.cat_ranges(["path"], [], [])

0 comments on commit 032c976

Please sign in to comment.