Skip to content

Commit

Permalink
Attempt storing stream for Python with yoke
Browse files Browse the repository at this point in the history
  • Loading branch information
kylebarron committed Nov 4, 2024
1 parent 3f1bc05 commit 977addb
Show file tree
Hide file tree
Showing 4 changed files with 126 additions and 27 deletions.
66 changes: 65 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ thiserror = "1"
tokio = "1.40"
url = "2"

[patch.crates-io]
object_store = { git = "https://github.com/kylebarron/arrow-rs", branch = "kyle/list-returns-static-stream" }
# [patch.crates-io]
# object_store = { git = "https://github.com/kylebarron/arrow-rs", branch = "kyle/list-returns-static-stream" }

[profile.release]
lto = true
Expand Down
1 change: 1 addition & 0 deletions obstore/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ tokio = { workspace = true, features = [
"sync",
] }
url = { workspace = true }
yoke = { version = "0.7.4", features = ["derive"] }

# We opt-in to using rustls as the TLS provider for reqwest, which is the HTTP
# library used by object_store.
Expand Down
82 changes: 58 additions & 24 deletions obstore/src/list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use pyo3::prelude::*;
use pyo3_arrow::PyRecordBatch;
use pyo3_object_store::{PyObjectStore, PyObjectStoreError, PyObjectStoreResult};
use tokio::sync::Mutex;
use yoke::Yoke;

use crate::runtime::get_runtime;

Expand Down Expand Up @@ -47,6 +48,15 @@ impl IntoPy<PyObject> for PyObjectMeta {
}
}

#[derive(yoke::Yokeable)]
struct ListStreamWrapper<'a>(Fuse<BoxStream<'a, Result<ObjectMeta, object_store::Error>>>);

impl<'a> ListStreamWrapper<'a> {
fn new(stream: BoxStream<'a, Result<ObjectMeta, object_store::Error>>) -> Self {
Self(stream.fuse())
}
}

// Note: we fuse the underlying stream so that we can get `None` multiple times.
//
// In general, you can't poll an iterator after it's already emitted None. But the issue here is
Expand All @@ -65,19 +75,19 @@ impl IntoPy<PyObject> for PyObjectMeta {
// - https://docs.rs/futures/latest/futures/prelude/stream/trait.StreamExt.html#method.fuse
#[pyclass(name = "ListStream")]
pub(crate) struct PyListStream {
stream: Arc<Mutex<Fuse<BoxStream<'static, object_store::Result<ObjectMeta>>>>>,
stream: Arc<Mutex<Yoke<ListStreamWrapper<'static>, Arc<dyn ObjectStore>>>>,
chunk_size: usize,
return_arrow: bool,
}

impl PyListStream {
fn new(
stream: BoxStream<'static, object_store::Result<ObjectMeta>>,
stream: Yoke<ListStreamWrapper<'static>, Arc<dyn ObjectStore>>,
chunk_size: usize,
return_arrow: bool,
) -> Self {
Self {
stream: Arc::new(Mutex::new(stream.fuse())),
stream: Arc::new(Mutex::new(stream)),
chunk_size,
return_arrow,
}
Expand Down Expand Up @@ -140,15 +150,15 @@ impl IntoPy<PyObject> for PyListIterResult {
}

async fn next_stream(
stream: Arc<Mutex<Fuse<BoxStream<'static, object_store::Result<ObjectMeta>>>>>,
stream: Arc<Mutex<ListStreamWrapper<'static>>>,
chunk_size: usize,
sync: bool,
return_arrow: bool,
) -> PyResult<PyListIterResult> {
let mut stream = stream.lock().await;
let mut metas: Vec<PyObjectMeta> = vec![];
loop {
match stream.next().await {
match stream.0.next().await {
Some(Ok(meta)) => {
metas.push(PyObjectMeta(meta));
if metas.len() >= chunk_size {
Expand Down Expand Up @@ -188,26 +198,44 @@ async fn next_stream(
}

async fn collect_stream(
stream: Arc<Mutex<Fuse<BoxStream<'static, object_store::Result<ObjectMeta>>>>>,
// stream: Arc<Mutex<ListStreamWrapper<'static>>>,
stream: Arc<Mutex<Yoke<ListStreamWrapper<'static>, Arc<dyn ObjectStore>>>>,
return_arrow: bool,
) -> PyResult<PyListIterResult> {
let mut stream = stream.lock().await;
let mut metas: Vec<PyObjectMeta> = vec![];
loop {
match stream.next().await {
Some(Ok(meta)) => {
metas.push(PyObjectMeta(meta));
}
Some(Err(e)) => return Err(PyObjectStoreError::from(e).into()),
None => match return_arrow {
true => {
return Ok(PyListIterResult::Arrow(object_meta_to_arrow(&metas)));
}
false => {
return Ok(PyListIterResult::Native(metas));
stream.with_mut(|stream_inner| {
match stream_inner.0.next().await {
Some(Ok(meta)) => {
metas.push(PyObjectMeta(meta));
}
},
};
Some(Err(e)) => return Err(PyObjectStoreError::from(e).into()),
None => match return_arrow {
true => {
return Ok(PyListIterResult::Arrow(object_meta_to_arrow(&metas)));
}
false => {
return Ok(PyListIterResult::Native(metas));
}
},
};
});

// match stream.next().await {
// Some(Ok(meta)) => {
// metas.push(PyObjectMeta(meta));
// }
// Some(Err(e)) => return Err(PyObjectStoreError::from(e).into()),
// None => match return_arrow {
// true => {
// return Ok(PyListIterResult::Arrow(object_meta_to_arrow(&metas)));
// }
// false => {
// return Ok(PyListIterResult::Native(metas));
// }
// },
// };
}
}

Expand Down Expand Up @@ -357,11 +385,17 @@ pub(crate) fn list(

let store = store.into_inner().clone();
let prefix = prefix.map(|s| s.into());
let stream = if let Some(offset) = offset {
store.list_with_offset(prefix.as_ref(), &offset.into())
} else {
store.list(prefix.as_ref())
};

let stream: Yoke<ListStreamWrapper<'static>, Arc<dyn ObjectStore>> =
if let Some(offset) = offset {
Yoke::attach_to_cart(store.clone(), |cart| {
ListStreamWrapper::new(cart.list_with_offset(prefix.as_ref(), &offset.into()))
})
} else {
Yoke::attach_to_cart(store.clone(), |cart| {
ListStreamWrapper::new(cart.list(prefix.as_ref()))
})
};
Ok(PyListStream::new(stream, chunk_size, return_arrow))
}

Expand Down

0 comments on commit 977addb

Please sign in to comment.