Add async iterator over bytes in result of get #11

Merged · 6 commits · Oct 18, 2024
68 changes: 67 additions & 1 deletion object-store-rs/python/object_store_rs/_get.pyi
@@ -82,7 +82,34 @@ class GetOptions(TypedDict):
"""

class GetResult:
"""Result for a get request"""
"""Result for a get request.

You can materialize the entire buffer by using either `bytes` or `bytes_async`, or
you can stream the result using `stream`. `__iter__` and `__aiter__` are implemented
as aliases to `stream`, so you can alternatively call `iter()` or `aiter()` on
`GetResult` to start an iterator.

Using as an async iterator:
```py
resp = await obs.get_async(store, path)
# 5MB chunk size in stream
stream = resp.stream(min_chunk_size=5 * 1024 * 1024)
async for buf in stream:
print(len(buf))
```

Using as a sync iterator:
```py
resp = obs.get(store, path)
# 20MB chunk size in stream
stream = resp.stream(min_chunk_size=20 * 1024 * 1024)
for buf in stream:
print(len(buf))
```
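
Because `__iter__` and `__aiter__` are aliases of `stream`, you can also
iterate over the result directly, using the default (10MB) chunk size:
```py
resp = obs.get(store, path)
for buf in resp:
    print(len(buf))
```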

Note that after calling `bytes`, `bytes_async`, or `stream`, you will no longer
be able to access other methods or attributes on this object, such as `meta`.
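
For example (a minimal sketch of the disposal behavior):
```py
resp = obs.get(store, path)
data = resp.bytes()
resp.meta  # raises ValueError: Result has already been disposed.
```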
"""

def bytes(self) -> bytes:
"""
@@ -98,6 +125,45 @@ class GetResult:
def meta(self) -> ObjectMeta:
"""The ObjectMeta for this object"""

def stream(self, min_chunk_size: int = 10 * 1024 * 1024) -> BytesStream:
"""Return a chunked stream over the result's bytes.

Args:
min_chunk_size: The minimum size in bytes for each chunk in the returned
`BytesStream`. All chunks except for the last chunk will be at least
this size. Defaults to 10*1024*1024 (10MB).

Returns:
A chunked stream
"""

def __aiter__(self) -> BytesStream:
"""
Return a chunked stream over the result's bytes with the default (10MB) chunk
size.
"""

def __iter__(self) -> BytesStream:
"""
Return a chunked stream over the result's bytes with the default (10MB) chunk
size.
"""

class BytesStream:
"""An async stream of bytes."""

def __aiter__(self) -> BytesStream:
"""Return `Self` as an async iterator."""

def __iter__(self) -> BytesStream:
"""Return `Self` as an async iterator."""

async def __anext__(self) -> bytes:
"""Return the next chunk of bytes in the stream."""

def __next__(self) -> bytes:
"""Return the next chunk of bytes in the stream."""

def get(
store: ObjectStore, location: str, *, options: GetOptions | None = None
) -> GetResult:
144 changes: 134 additions & 10 deletions object-store-rs/src/get.rs
@@ -1,14 +1,23 @@
use std::sync::Arc;

use bytes::Bytes;
use chrono::{DateTime, Utc};
use futures::stream::BoxStream;
use futures::StreamExt;
use object_store::{GetOptions, GetResult, ObjectStore};
use pyo3::exceptions::PyValueError;
use pyo3::exceptions::{PyStopAsyncIteration, PyStopIteration, PyValueError};
use pyo3::prelude::*;
use pyo3::types::PyBytes;
use pyo3_object_store::error::{PyObjectStoreError, PyObjectStoreResult};
use pyo3_object_store::PyObjectStore;
use tokio::sync::Mutex;

use crate::list::PyObjectMeta;
use crate::runtime::get_runtime;

/// 10MB default chunk size
const DEFAULT_BYTES_CHUNK_SIZE: usize = 10 * 1024 * 1024;

#[derive(FromPyObject)]
pub(crate) struct PyGetOptions {
if_match: Option<String>,
@@ -54,7 +63,7 @@ impl PyGetResult {
let runtime = get_runtime(py)?;
py.allow_threads(|| {
let bytes = runtime.block_on(get_result.bytes())?;
Ok::<_, PyObjectStoreError>(PyBytesWrapper(bytes))
Ok::<_, PyObjectStoreError>(PyBytesWrapper::new(bytes))
})
}

@@ -68,7 +77,7 @@ impl PyGetResult {
.bytes()
.await
.map_err(PyObjectStoreError::ObjectStoreError)?;
Ok(PyBytesWrapper(bytes))
Ok(PyBytesWrapper::new(bytes))
})
}

@@ -80,14 +89,129 @@ impl PyGetResult {
.ok_or(PyValueError::new_err("Result has already been disposed."))?;
Ok(PyObjectMeta::new(inner.meta.clone()))
}

#[pyo3(signature = (min_chunk_size = DEFAULT_BYTES_CHUNK_SIZE))]
fn stream(&mut self, min_chunk_size: usize) -> PyResult<PyBytesStream> {
let get_result = self
.0
.take()
.ok_or(PyValueError::new_err("Result has already been disposed."))?;
Ok(PyBytesStream::new(get_result.into_stream(), min_chunk_size))
}

fn __aiter__(&mut self) -> PyResult<PyBytesStream> {
self.stream(DEFAULT_BYTES_CHUNK_SIZE)
}

fn __iter__(&mut self) -> PyResult<PyBytesStream> {
self.stream(DEFAULT_BYTES_CHUNK_SIZE)
}
}

#[pyclass(name = "BytesStream")]
pub struct PyBytesStream {
stream: Arc<Mutex<BoxStream<'static, object_store::Result<Bytes>>>>,
min_chunk_size: usize,
}

impl PyBytesStream {
fn new(stream: BoxStream<'static, object_store::Result<Bytes>>, min_chunk_size: usize) -> Self {
Self {
stream: Arc::new(Mutex::new(stream)),
min_chunk_size,
}
}
}

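/// Poll the shared stream, accumulating chunks until at least `min_chunk_size`
/// bytes have been collected or the stream is exhausted. The `sync` flag only
/// selects which exception signals exhaustion: `StopIteration` for `__next__`
/// and `StopAsyncIteration` for `__anext__`.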
async fn next_stream(
stream: Arc<Mutex<BoxStream<'static, object_store::Result<Bytes>>>>,
min_chunk_size: usize,
sync: bool,
) -> PyResult<PyBytesWrapper> {
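    // Hold the async lock for the whole poll so that concurrent `__next__` /
    // `__anext__` calls cannot interleave reads from the underlying stream.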
let mut stream = stream.lock().await;
let mut buffers: Vec<Bytes> = vec![];
loop {
match stream.next().await {
Some(Ok(bytes)) => {
buffers.push(bytes);
let total_buffer_len = buffers.iter().fold(0, |acc, buf| acc + buf.len());
if total_buffer_len >= min_chunk_size {
return Ok(PyBytesWrapper::new_multiple(buffers));
}
}
Some(Err(e)) => return Err(PyObjectStoreError::from(e).into()),
None => {
if buffers.is_empty() {
// Depending on whether the iteration is sync or not, we raise either a
// StopIteration or a StopAsyncIteration
if sync {
return Err(PyStopIteration::new_err("stream exhausted"));
} else {
return Err(PyStopAsyncIteration::new_err("stream exhausted"));
}
} else {
return Ok(PyBytesWrapper::new_multiple(buffers));
}
}
};
}
}

#[pymethods]
impl PyBytesStream {
fn __aiter__(slf: Py<Self>) -> Py<Self> {
slf
}

fn __iter__(slf: Py<Self>) -> Py<Self> {
slf
}

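    // `__anext__` must return an awaitable, so the Rust future is converted
    // into a Python awaitable with `future_into_py` rather than polled here.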
fn __anext__<'py>(&'py self, py: Python<'py>) -> PyResult<Bound<PyAny>> {
let stream = self.stream.clone();
pyo3_async_runtimes::tokio::future_into_py(
py,
next_stream(stream, self.min_chunk_size, false),
)
}

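    // The sync variant instead blocks the calling thread on the shared tokio
    // runtime until the next chunk is ready.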
fn __next__<'py>(&'py self, py: Python<'py>) -> PyResult<PyBytesWrapper> {
let runtime = get_runtime(py)?;
let stream = self.stream.clone();
runtime.block_on(next_stream(stream, self.min_chunk_size, true))
}
}

pub(crate) struct PyBytesWrapper(bytes::Bytes);
pub(crate) struct PyBytesWrapper(Vec<Bytes>);

impl PyBytesWrapper {
pub fn new(buf: Bytes) -> Self {
Self(vec![buf])
}

// TODO: return buffer protocol object
pub fn new_multiple(buffers: Vec<Bytes>) -> Self {
Self(buffers)
}
}

// TODO: return buffer protocol object? This isn't possible on an array of Bytes, so if you want to
// support the buffer protocol in the future (e.g. for get_range) you may need to have a separate
// wrapper of Bytes
impl IntoPy<PyObject> for PyBytesWrapper {
fn into_py(self, py: Python<'_>) -> PyObject {
PyBytes::new_bound(py, &self.0).into_py(py)
let total_len = self.0.iter().fold(0, |acc, buf| acc + buf.len());
// Copy all internal Bytes objects into a single PyBytes
// Since our inner callback is infallible, this will only panic on out of memory
PyBytes::new_bound_with(py, total_len, |target| {
let mut offset = 0;
for buf in self.0.iter() {
target[offset..offset + buf.len()].copy_from_slice(buf);
offset += buf.len();
}
Ok(())
})
.unwrap()
.into_py(py)
}
}

@@ -144,7 +268,7 @@ pub(crate) fn get_range(
let range = offset..offset + length;
py.allow_threads(|| {
let out = runtime.block_on(store.as_ref().get_range(&location.into(), range))?;
Ok::<_, PyObjectStoreError>(PyBytesWrapper(out))
Ok::<_, PyObjectStoreError>(PyBytesWrapper::new(out))
})
}

@@ -163,7 +287,7 @@ pub(crate) fn get_range_async(
.get_range(&location.into(), range)
.await
.map_err(PyObjectStoreError::ObjectStoreError)?;
Ok(PyBytesWrapper(out))
Ok(PyBytesWrapper::new(out))
})
}

@@ -183,7 +307,7 @@ pub(crate) fn get_ranges(
.collect::<Vec<_>>();
py.allow_threads(|| {
let out = runtime.block_on(store.as_ref().get_ranges(&location.into(), &ranges))?;
Ok::<_, PyObjectStoreError>(out.into_iter().map(PyBytesWrapper).collect())
Ok::<_, PyObjectStoreError>(out.into_iter().map(PyBytesWrapper::new).collect())
})
}

@@ -206,6 +330,6 @@ pub(crate) fn get_ranges_async(
.get_ranges(&location.into(), &ranges)
.await
.map_err(PyObjectStoreError::ObjectStoreError)?;
Ok(out.into_iter().map(PyBytesWrapper).collect::<Vec<_>>())
Ok(out.into_iter().map(PyBytesWrapper::new).collect::<Vec<_>>())
})
}
1 change: 1 addition & 0 deletions pyproject.toml
@@ -19,6 +19,7 @@ dev-dependencies = [
"mkdocstrings[python]>=0.26.1",
"pandas>=2.2.3",
"pip>=24.2",
"pytest-asyncio>=0.24.0",
"pytest>=8.3.3",
]

46 changes: 46 additions & 0 deletions tests/test_get.py
@@ -0,0 +1,46 @@
import object_store_rs as obs
import pytest
from object_store_rs.store import MemoryStore


def test_stream_sync():
store = MemoryStore()

data = b"the quick brown fox jumps over the lazy dog," * 5000
path = "big-data.txt"

obs.put_file(store, path, data)
resp = obs.get(store, path)
stream = resp.stream(min_chunk_size=0)

# Note: from manual testing, it looks like we only get one chunk with the local
# store, so the chunk sizing isn't exercised here.
pos = 0
for chunk in stream:
size = len(chunk)
assert chunk == data[pos : pos + size]
pos += size

assert pos == len(data)


@pytest.mark.asyncio
async def test_stream_async():
store = MemoryStore()

data = b"the quick brown fox jumps over the lazy dog," * 5000
path = "big-data.txt"

await obs.put_file_async(store, path, data)
resp = await obs.get_async(store, path)
stream = resp.stream(min_chunk_size=0)

# Note: from manual testing, it looks like we only get one chunk with the local
# store, so the chunk sizing isn't exercised here.
pos = 0
async for chunk in stream:
size = len(chunk)
assert chunk == data[pos : pos + size]
pos += size

assert pos == len(data)
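
A further test could exercise the iterator aliases directly (a sketch; with the
default 10MB `min_chunk_size`, this small payload should arrive as one chunk):

```py
def test_iter_alias_sync():
    store = MemoryStore()

    data = b"the quick brown fox jumps over the lazy dog," * 5000
    path = "big-data.txt"

    obs.put_file(store, path, data)
    resp = obs.get(store, path)

    # Iterating the GetResult directly uses the default (10MB) chunk size.
    received = b"".join(resp)
    assert received == data
```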
2 changes: 0 additions & 2 deletions tests/test_hello.py

This file was deleted.

14 changes: 14 additions & 0 deletions uv.lock
