From 977addb5c950498264c2538fa3580c016be4af06 Mon Sep 17 00:00:00 2001 From: Kyle Barron Date: Mon, 4 Nov 2024 14:30:03 -0500 Subject: [PATCH] Attempt storing stream for Python with yoke --- Cargo.lock | 66 +++++++++++++++++++++++++++++++++++- Cargo.toml | 4 +-- obstore/Cargo.toml | 1 + obstore/src/list.rs | 82 ++++++++++++++++++++++++++++++++------------- 4 files changed, 126 insertions(+), 27 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index f0a8932..a36e1af 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1165,7 +1165,8 @@ dependencies = [ [[package]] name = "object_store" version = "0.11.1" -source = "git+https://github.com/kylebarron/arrow-rs?branch=kyle/list-returns-static-stream#33062b14efc66ff35353bf785cbd444056b09b66" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6eb4c22c6154a1e759d7099f9ffad7cc5ef8245f9efbab4a41b92623079c82f3" dependencies = [ "async-trait", "base64", @@ -1211,6 +1212,7 @@ dependencies = [ "reqwest", "tokio", "url", + "yoke", ] [[package]] @@ -1909,6 +1911,12 @@ version = "0.9.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67" +[[package]] +name = "stable_deref_trait" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a8f112729512f8e442d81f95a8a7ddf2b7c6b8a1a6f509a95864142b30cab2d3" + [[package]] name = "static_assertions" version = "1.1.0" @@ -1941,6 +1949,17 @@ dependencies = [ "futures-core", ] +[[package]] +name = "synstructure" +version = "0.13.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c8af7666ab7b6390ab78131fb5b0fce11d6b7a6951602017c35fa82800708971" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "target-lexicon" version = "0.12.16" @@ -2386,6 +2405,30 @@ version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "589f6da84c646204747d1270a2a5661ea66ed1cced2631d546fdfb155959f9ec" +[[package]] +name = "yoke" +version = "0.7.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6c5b1314b079b0930c31e3af543d8ee1757b1951ae1e1565ec704403a7240ca5" +dependencies = [ + "serde", + "stable_deref_trait", + "yoke-derive", + "zerofrom", +] + +[[package]] +name = "yoke-derive" +version = "0.7.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "28cc31741b18cb6f1d5ff12f5b7523e3d6eb0852bbbad19d73905511d9849b95" +dependencies = [ + "proc-macro2", + "quote", + "syn", + "synstructure", +] + [[package]] name = "zerocopy" version = "0.7.35" @@ -2407,6 +2450,27 @@ dependencies = [ "syn", ] +[[package]] +name = "zerofrom" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "91ec111ce797d0e0784a1116d0ddcdbea84322cd79e5d5ad173daeba4f93ab55" +dependencies = [ + "zerofrom-derive", +] + +[[package]] +name = "zerofrom-derive" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0ea7b4a3637ea8669cedf0f1fd5c286a17f3de97b8dd5a70a6c167a1730e63a5" +dependencies = [ + "proc-macro2", + "quote", + "syn", + "synstructure", +] + [[package]] name = "zeroize" version = "1.8.1" diff --git a/Cargo.toml b/Cargo.toml index bc7fe7f..6f7de35 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -34,8 +34,8 @@ thiserror = "1" tokio = "1.40" url = "2" -[patch.crates-io] -object_store = { git = "https://github.com/kylebarron/arrow-rs", branch = "kyle/list-returns-static-stream" } +# [patch.crates-io] +# object_store = { git = "https://github.com/kylebarron/arrow-rs", branch = "kyle/list-returns-static-stream" } [profile.release] lto = true diff --git a/obstore/Cargo.toml b/obstore/Cargo.toml index f38391f..eeb9f83 100644 --- a/obstore/Cargo.toml +++ b/obstore/Cargo.toml @@ -38,6 +38,7 @@ tokio = { workspace = true, features = [ "sync", ] } url = { workspace = true } +yoke = { version = "0.7.4", features = ["derive"] } # We opt-in to using rustls as the TLS provider for reqwest, which is the HTTP # library used by object_store. diff --git a/obstore/src/list.rs b/obstore/src/list.rs index f2e4c2b..8a431c9 100644 --- a/obstore/src/list.rs +++ b/obstore/src/list.rs @@ -16,6 +16,7 @@ use pyo3::prelude::*; use pyo3_arrow::PyRecordBatch; use pyo3_object_store::{PyObjectStore, PyObjectStoreError, PyObjectStoreResult}; use tokio::sync::Mutex; +use yoke::Yoke; use crate::runtime::get_runtime; @@ -47,6 +48,15 @@ impl IntoPy for PyObjectMeta { } } +#[derive(yoke::Yokeable)] +struct ListStreamWrapper<'a>(Fuse>>); + +impl<'a> ListStreamWrapper<'a> { + fn new(stream: BoxStream<'a, Result>) -> Self { + Self(stream.fuse()) + } +} + // Note: we fuse the underlying stream so that we can get `None` multiple times. // // In general, you can't poll an iterator after it's already emitted None. But the issue here is @@ -65,19 +75,19 @@ impl IntoPy for PyObjectMeta { // - https://docs.rs/futures/latest/futures/prelude/stream/trait.StreamExt.html#method.fuse #[pyclass(name = "ListStream")] pub(crate) struct PyListStream { - stream: Arc>>>>, + stream: Arc, Arc>>>, chunk_size: usize, return_arrow: bool, } impl PyListStream { fn new( - stream: BoxStream<'static, object_store::Result>, + stream: Yoke, Arc>, chunk_size: usize, return_arrow: bool, ) -> Self { Self { - stream: Arc::new(Mutex::new(stream.fuse())), + stream: Arc::new(Mutex::new(stream)), chunk_size, return_arrow, } @@ -140,7 +150,7 @@ impl IntoPy for PyListIterResult { } async fn next_stream( - stream: Arc>>>>, + stream: Arc>>, chunk_size: usize, sync: bool, return_arrow: bool, @@ -148,7 +158,7 @@ async fn next_stream( let mut stream = stream.lock().await; let mut metas: Vec = vec![]; loop { - match stream.next().await { + match stream.0.next().await { Some(Ok(meta)) => { metas.push(PyObjectMeta(meta)); if metas.len() >= chunk_size { @@ -188,26 +198,44 @@ async fn next_stream( } async fn collect_stream( - stream: Arc>>>>, + // stream: Arc>>, + stream: Arc, Arc>>>, return_arrow: bool, ) -> PyResult { let mut stream = stream.lock().await; let mut metas: Vec = vec![]; loop { - match stream.next().await { - Some(Ok(meta)) => { - metas.push(PyObjectMeta(meta)); - } - Some(Err(e)) => return Err(PyObjectStoreError::from(e).into()), - None => match return_arrow { - true => { - return Ok(PyListIterResult::Arrow(object_meta_to_arrow(&metas))); - } - false => { - return Ok(PyListIterResult::Native(metas)); + stream.with_mut(|stream_inner| { + match stream_inner.0.next().await { + Some(Ok(meta)) => { + metas.push(PyObjectMeta(meta)); } - }, - }; + Some(Err(e)) => return Err(PyObjectStoreError::from(e).into()), + None => match return_arrow { + true => { + return Ok(PyListIterResult::Arrow(object_meta_to_arrow(&metas))); + } + false => { + return Ok(PyListIterResult::Native(metas)); + } + }, + }; + }); + + // match stream.next().await { + // Some(Ok(meta)) => { + // metas.push(PyObjectMeta(meta)); + // } + // Some(Err(e)) => return Err(PyObjectStoreError::from(e).into()), + // None => match return_arrow { + // true => { + // return Ok(PyListIterResult::Arrow(object_meta_to_arrow(&metas))); + // } + // false => { + // return Ok(PyListIterResult::Native(metas)); + // } + // }, + // }; } } @@ -357,11 +385,17 @@ pub(crate) fn list( let store = store.into_inner().clone(); let prefix = prefix.map(|s| s.into()); - let stream = if let Some(offset) = offset { - store.list_with_offset(prefix.as_ref(), &offset.into()) - } else { - store.list(prefix.as_ref()) - }; + + let stream: Yoke, Arc> = + if let Some(offset) = offset { + Yoke::attach_to_cart(store.clone(), |cart| { + ListStreamWrapper::new(cart.list_with_offset(prefix.as_ref(), &offset.into())) + }) + } else { + Yoke::attach_to_cart(store.clone(), |cart| { + ListStreamWrapper::new(cart.list(prefix.as_ref())) + }) + }; Ok(PyListStream::new(stream, chunk_size, return_arrow)) }