Add async iterator over bytes in result of get (#11)
* Add async iterator over bytes in result of `get`

* Add synchronous iteration

* Implement `aiter` and `iter` on `GetResult`

* Chunk size

* Add python test
kylebarron authored Oct 18, 2024
1 parent 922b58f commit c10f3fe
Showing 6 changed files with 262 additions and 13 deletions.
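
Taken together, the changes below let a `GetResult` be consumed as a chunked byte stream, synchronously or asynchronously. A minimal usage sketch, assuming the `MemoryStore` and `put_file` helpers exercised in the new tests:

```py
import object_store_rs as obs
from object_store_rs.store import MemoryStore

store = MemoryStore()
obs.put_file(store, "data.txt", b"hello object store " * 100_000)

# __iter__ is an alias for stream(), so the result is directly iterable
resp = obs.get(store, "data.txt")
total = sum(len(chunk) for chunk in resp)
print(total)  # 1_900_000
```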
68 changes: 67 additions & 1 deletion object-store-rs/python/object_store_rs/_get.pyi
@@ -82,7 +82,34 @@ class GetOptions(TypedDict):
"""

class GetResult:
    """Result for a get request"""
    """Result for a get request.

    You can materialize the entire buffer by using either `bytes` or `bytes_async`, or
    you can stream the result using `stream`. `__iter__` and `__aiter__` are implemented
    as aliases to `stream`, so you can alternatively call `iter()` or `aiter()` on
    `GetResult` to start an iterator.

    Using as an async iterator:

    ```py
    resp = await obs.get_async(store, path)
    # 5MB chunk size in stream
    stream = resp.stream(min_chunk_size=5 * 1024 * 1024)
    async for buf in stream:
        print(len(buf))
    ```

    Using as a sync iterator:

    ```py
    resp = obs.get(store, path)
    # 20MB chunk size in stream
    stream = resp.stream(min_chunk_size=20 * 1024 * 1024)
    for buf in stream:
        print(len(buf))
    ```

    Note that after calling `bytes`, `bytes_async`, or `stream`, you will no longer be
    able to access other methods or attributes on this object, such as `meta`.
    """

    def bytes(self) -> bytes:
        """
@@ -98,6 +125,45 @@ class GetResult:
    def meta(self) -> ObjectMeta:
        """The ObjectMeta for this object"""

    def stream(self, min_chunk_size: int = 10 * 1024 * 1024) -> BytesStream:
        """Return a chunked stream over the result's bytes.

        Args:
            min_chunk_size: The minimum size in bytes for each chunk in the returned
                `BytesStream`. All chunks except for the last chunk will be at least
                this size. Defaults to 10*1024*1024 (10MB).

        Returns:
            A chunked stream
        """

    def __aiter__(self) -> BytesStream:
        """
        Return a chunked stream over the result's bytes with the default (10MB) chunk
        size.
        """

    def __iter__(self) -> BytesStream:
        """
        Return a chunked stream over the result's bytes with the default (10MB) chunk
        size.
        """

class BytesStream:
    """A stream of bytes that can be iterated over synchronously or asynchronously."""

    def __aiter__(self) -> BytesStream:
        """Return `Self` as an async iterator."""

    def __iter__(self) -> BytesStream:
        """Return `Self` as a sync iterator."""

    async def __anext__(self) -> bytes:
        """Return the next chunk of bytes in the stream."""

    def __next__(self) -> bytes:
        """Return the next chunk of bytes in the stream."""

def get(
    store: ObjectStore, location: str, *, options: GetOptions | None = None
) -> GetResult:
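
Besides `for`/`async for`, the `BytesStream` protocol methods declared above can be driven manually with the builtins. A minimal sketch, assuming a `resp` obtained from `obs.get` as in the docstring examples:

```py
stream = resp.stream(min_chunk_size=1024 * 1024)
chunk = next(stream)  # one sync step via BytesStream.__next__
print(len(chunk))

# In async code, a single async step goes through BytesStream.__anext__:
#     chunk = await anext(stream)  # anext() is a builtin since Python 3.10
```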
144 changes: 134 additions & 10 deletions object-store-rs/src/get.rs
@@ -1,14 +1,23 @@
use std::sync::Arc;

use bytes::Bytes;
use chrono::{DateTime, Utc};
use futures::stream::BoxStream;
use futures::StreamExt;
use object_store::{GetOptions, GetResult, ObjectStore};
use pyo3::exceptions::PyValueError;
use pyo3::exceptions::{PyStopAsyncIteration, PyStopIteration, PyValueError};
use pyo3::prelude::*;
use pyo3::types::PyBytes;
use pyo3_object_store::error::{PyObjectStoreError, PyObjectStoreResult};
use pyo3_object_store::PyObjectStore;
use tokio::sync::Mutex;

use crate::list::PyObjectMeta;
use crate::runtime::get_runtime;

/// 10MB default chunk size
const DEFAULT_BYTES_CHUNK_SIZE: usize = 10 * 1024 * 1024;

#[derive(FromPyObject)]
pub(crate) struct PyGetOptions {
    if_match: Option<String>,
@@ -54,7 +63,7 @@ impl PyGetResult {
        let runtime = get_runtime(py)?;
        py.allow_threads(|| {
            let bytes = runtime.block_on(get_result.bytes())?;
            Ok::<_, PyObjectStoreError>(PyBytesWrapper(bytes))
            Ok::<_, PyObjectStoreError>(PyBytesWrapper::new(bytes))
        })
    }

@@ -68,7 +77,7 @@ impl PyGetResult {
                .bytes()
                .await
                .map_err(PyObjectStoreError::ObjectStoreError)?;
            Ok(PyBytesWrapper(bytes))
            Ok(PyBytesWrapper::new(bytes))
        })
    }

@@ -80,14 +89,129 @@ impl PyGetResult {
            .ok_or(PyValueError::new_err("Result has already been disposed."))?;
        Ok(PyObjectMeta::new(inner.meta.clone()))
    }

    #[pyo3(signature = (min_chunk_size = DEFAULT_BYTES_CHUNK_SIZE))]
    fn stream(&mut self, min_chunk_size: usize) -> PyResult<PyBytesStream> {
        let get_result = self
            .0
            .take()
            .ok_or(PyValueError::new_err("Result has already been disposed."))?;
        Ok(PyBytesStream::new(get_result.into_stream(), min_chunk_size))
    }

    fn __aiter__(&mut self) -> PyResult<PyBytesStream> {
        self.stream(DEFAULT_BYTES_CHUNK_SIZE)
    }

    fn __iter__(&mut self) -> PyResult<PyBytesStream> {
        self.stream(DEFAULT_BYTES_CHUNK_SIZE)
    }
}
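
On the Python side, the `take()` of the inner `Option` above is what makes a `GetResult` single-use. A minimal sketch of the observable behavior, assuming `store` and `path` as in the docstring examples and that `meta` is exposed as an attribute (per the class docstring):

```py
resp = obs.get(store, path)
stream = resp.stream()

# stream() consumed the result, so further access raises
try:
    resp.meta
except ValueError as e:
    print(e)  # "Result has already been disposed."
```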

#[pyclass(name = "BytesStream")]
pub struct PyBytesStream {
    stream: Arc<Mutex<BoxStream<'static, object_store::Result<Bytes>>>>,
    min_chunk_size: usize,
}

impl PyBytesStream {
    fn new(stream: BoxStream<'static, object_store::Result<Bytes>>, min_chunk_size: usize) -> Self {
        Self {
            stream: Arc::new(Mutex::new(stream)),
            min_chunk_size,
        }
    }
}

async fn next_stream(
    stream: Arc<Mutex<BoxStream<'static, object_store::Result<Bytes>>>>,
    min_chunk_size: usize,
    sync: bool,
) -> PyResult<PyBytesWrapper> {
    let mut stream = stream.lock().await;
    let mut buffers: Vec<Bytes> = vec![];
    loop {
        match stream.next().await {
            Some(Ok(bytes)) => {
                buffers.push(bytes);
                let total_buffer_len = buffers.iter().fold(0, |acc, buf| acc + buf.len());
                if total_buffer_len >= min_chunk_size {
                    return Ok(PyBytesWrapper::new_multiple(buffers));
                }
            }
            Some(Err(e)) => return Err(PyObjectStoreError::from(e).into()),
            None => {
                if buffers.is_empty() {
                    // Depending on whether the iteration is sync or not, we raise either a
                    // StopIteration or a StopAsyncIteration
                    if sync {
                        return Err(PyStopIteration::new_err("stream exhausted"));
                    } else {
                        return Err(PyStopAsyncIteration::new_err("stream exhausted"));
                    }
                } else {
                    return Ok(PyBytesWrapper::new_multiple(buffers));
                }
            }
        };
    }
}
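
`next_stream` coalesces upstream chunks until the `min_chunk_size` floor is met, and flushes whatever remains at end of stream. In Python terms, the loop behaves roughly like this sketch, where a hypothetical `source` iterator of `bytes` stands in for the inner stream:

```py
def coalesce(source, min_chunk_size):
    # Accumulate raw chunks until their combined size reaches the floor
    buffers = []
    for chunk in source:
        buffers.append(chunk)
        if sum(len(b) for b in buffers) >= min_chunk_size:
            yield b"".join(buffers)
            buffers = []
    # Flush the remainder; only this final chunk may be smaller than the floor
    if buffers:
        yield b"".join(buffers)
```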

#[pymethods]
impl PyBytesStream {
    fn __aiter__(slf: Py<Self>) -> Py<Self> {
        slf
    }

    fn __iter__(slf: Py<Self>) -> Py<Self> {
        slf
    }

    fn __anext__<'py>(&'py self, py: Python<'py>) -> PyResult<Bound<PyAny>> {
        let stream = self.stream.clone();
        pyo3_async_runtimes::tokio::future_into_py(
            py,
            next_stream(stream, self.min_chunk_size, false),
        )
    }

    fn __next__<'py>(&'py self, py: Python<'py>) -> PyResult<PyBytesWrapper> {
        let runtime = get_runtime(py)?;
        let stream = self.stream.clone();
        runtime.block_on(next_stream(stream, self.min_chunk_size, true))
    }
}

pub(crate) struct PyBytesWrapper(bytes::Bytes);
pub(crate) struct PyBytesWrapper(Vec<Bytes>);

impl PyBytesWrapper {
    pub fn new(buf: Bytes) -> Self {
        Self(vec![buf])
    }

    // TODO: return buffer protocol object
    pub fn new_multiple(buffers: Vec<Bytes>) -> Self {
        Self(buffers)
    }
}

// TODO: return buffer protocol object? This isn't possible on an array of Bytes, so if you want to
// support the buffer protocol in the future (e.g. for get_range) you may need to have a separate
// wrapper of Bytes
impl IntoPy<PyObject> for PyBytesWrapper {
    fn into_py(self, py: Python<'_>) -> PyObject {
        PyBytes::new_bound(py, &self.0).into_py(py)
        let total_len = self.0.iter().fold(0, |acc, buf| acc + buf.len());
        // Copy all internal Bytes objects into a single PyBytes
        // Since our inner callback is infallible, this will only panic on out of memory
        PyBytes::new_bound_with(py, total_len, |target| {
            let mut offset = 0;
            for buf in self.0.iter() {
                target[offset..offset + buf.len()].copy_from_slice(buf);
                offset += buf.len();
            }
            Ok(())
        })
        .unwrap()
        .into_py(py)
    }
}

@@ -144,7 +268,7 @@ pub(crate) fn get_range(
    let range = offset..offset + length;
    py.allow_threads(|| {
        let out = runtime.block_on(store.as_ref().get_range(&location.into(), range))?;
        Ok::<_, PyObjectStoreError>(PyBytesWrapper(out))
        Ok::<_, PyObjectStoreError>(PyBytesWrapper::new(out))
    })
}

@@ -163,7 +287,7 @@ pub(crate) fn get_range_async(
            .get_range(&location.into(), range)
            .await
            .map_err(PyObjectStoreError::ObjectStoreError)?;
        Ok(PyBytesWrapper(out))
        Ok(PyBytesWrapper::new(out))
    })
}

@@ -183,7 +307,7 @@ pub(crate) fn get_ranges(
        .collect::<Vec<_>>();
    py.allow_threads(|| {
        let out = runtime.block_on(store.as_ref().get_ranges(&location.into(), &ranges))?;
        Ok::<_, PyObjectStoreError>(out.into_iter().map(PyBytesWrapper).collect())
        Ok::<_, PyObjectStoreError>(out.into_iter().map(PyBytesWrapper::new).collect())
    })
}

@@ -206,6 +330,6 @@ pub(crate) fn get_ranges_async(
            .get_ranges(&location.into(), &ranges)
            .await
            .map_err(PyObjectStoreError::ObjectStoreError)?;
        Ok(out.into_iter().map(PyBytesWrapper).collect::<Vec<_>>())
        Ok(out.into_iter().map(PyBytesWrapper::new).collect::<Vec<_>>())
    })
}
1 change: 1 addition & 0 deletions pyproject.toml
@@ -19,6 +19,7 @@ dev-dependencies = [
"mkdocstrings[python]>=0.26.1",
"pandas>=2.2.3",
"pip>=24.2",
"pytest-asyncio>=0.24.0",
"pytest>=8.3.3",
]

46 changes: 46 additions & 0 deletions tests/test_get.py
@@ -0,0 +1,46 @@
import object_store_rs as obs
import pytest
from object_store_rs.store import MemoryStore


def test_stream_sync():
    store = MemoryStore()

    data = b"the quick brown fox jumps over the lazy dog," * 5000
    path = "big-data.txt"

    obs.put_file(store, path, data)
    resp = obs.get(store, path)
    stream = resp.stream(min_chunk_size=0)

    # Note: from manual testing, it looks like this store returns only a single
    # chunk, so the chunk sizing isn't exercised here.
    pos = 0
    for chunk in stream:
        size = len(chunk)
        assert chunk == data[pos : pos + size]
        pos += size

    assert pos == len(data)


@pytest.mark.asyncio
async def test_stream_async():
    store = MemoryStore()

    data = b"the quick brown fox jumps over the lazy dog," * 5000
    path = "big-data.txt"

    await obs.put_file_async(store, path, data)
    resp = await obs.get_async(store, path)
    stream = resp.stream(min_chunk_size=0)

    # Note: from manual testing, it looks like this store returns only a single
    # chunk, so the chunk sizing isn't exercised here.
    pos = 0
    async for chunk in stream:
        size = len(chunk)
        assert chunk == data[pos : pos + size]
        pos += size

    assert pos == len(data)
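
If a store ever yields multiple upstream chunks, the `min_chunk_size` floor could be asserted along these lines. A hypothetical test, not part of this commit, reusing the helpers above:

```py
def test_min_chunk_size_floor():
    store = MemoryStore()
    data = b"x" * (1024 * 1024)
    obs.put_file(store, "data.bin", data)

    chunks = list(obs.get(store, "data.bin").stream(min_chunk_size=64 * 1024))

    # Every chunk except possibly the last must be at least the floor,
    # and concatenating all chunks must reproduce the payload
    assert all(len(c) >= 64 * 1024 for c in chunks[:-1])
    assert b"".join(chunks) == data
```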
2 changes: 0 additions & 2 deletions tests/test_hello.py

This file was deleted.

14 changes: 14 additions & 0 deletions uv.lock
