From eca2306ba7f2a330b901fb5a498e8bb7d8030574 Mon Sep 17 00:00:00 2001 From: qqwy Date: Sun, 4 Feb 2024 20:08:28 +0100 Subject: [PATCH] Cleanup, and make sure the write_parquet_cloud example can be run again as-is --- .../src/executors/sinks/output/ipc.rs | 38 +++++++------- .../src/executors/sinks/output/parquet.rs | 51 ++++++++++--------- crates/polars-pipe/src/pipeline/convert.rs | 7 +-- examples/write_parquet_cloud/Cargo.toml | 2 +- examples/write_parquet_cloud/src/main.rs | 11 ++-- 5 files changed, 53 insertions(+), 56 deletions(-) diff --git a/crates/polars-pipe/src/executors/sinks/output/ipc.rs b/crates/polars-pipe/src/executors/sinks/output/ipc.rs index 00c23fefb26b..98e33f1d4387 100644 --- a/crates/polars-pipe/src/executors/sinks/output/ipc.rs +++ b/crates/polars-pipe/src/executors/sinks/output/ipc.rs @@ -43,33 +43,35 @@ pub struct IpcCloudSink {} #[cfg(feature = "cloud")] impl IpcCloudSink { #[allow(clippy::new_ret_no_self)] - pub async fn new( + pub fn new( uri: &str, cloud_options: Option<&polars_io::cloud::CloudOptions>, ipc_options: IpcWriterOptions, schema: &Schema, ) -> PolarsResult { - let cloud_writer = polars_io::cloud::CloudWriter::new(uri, cloud_options).await?; - let writer = IpcWriter::new(cloud_writer) - .with_compression(ipc_options.compression) - .batched(schema)?; + polars_io::pl_async::get_runtime().block_on_potential_spawn(async { + let cloud_writer = polars_io::cloud::CloudWriter::new(uri, cloud_options).await?; + let writer = IpcWriter::new(cloud_writer) + .with_compression(ipc_options.compression) + .batched(schema)?; - let writer = Box::new(writer) as Box; + let writer = Box::new(writer) as Box; - let morsels_per_sink = morsels_per_sink(); - let backpressure = morsels_per_sink * 2; - let (sender, receiver) = bounded(backpressure); + let morsels_per_sink = morsels_per_sink(); + let backpressure = morsels_per_sink * 2; + let (sender, receiver) = bounded(backpressure); - let io_thread_handle = Arc::new(Some(init_writer_thread( - receiver, - writer, - ipc_options.maintain_order, - morsels_per_sink, - ))); + let io_thread_handle = Arc::new(Some(init_writer_thread( + receiver, + writer, + ipc_options.maintain_order, + morsels_per_sink, + ))); - Ok(FilesSink { - sender, - io_thread_handle, + Ok(FilesSink { + sender, + io_thread_handle, + }) }) } } diff --git a/crates/polars-pipe/src/executors/sinks/output/parquet.rs b/crates/polars-pipe/src/executors/sinks/output/parquet.rs index c1c79a44e577..3df2a7688026 100644 --- a/crates/polars-pipe/src/executors/sinks/output/parquet.rs +++ b/crates/polars-pipe/src/executors/sinks/output/parquet.rs @@ -52,40 +52,41 @@ pub struct ParquetCloudSink {} #[cfg(feature = "cloud")] impl ParquetCloudSink { #[allow(clippy::new_ret_no_self)] - #[tokio::main(flavor = "current_thread")] - pub async fn new( + pub fn new( uri: &str, cloud_options: Option<&polars_io::cloud::CloudOptions>, parquet_options: ParquetWriteOptions, schema: &Schema, ) -> PolarsResult { - let cloud_writer = polars_io::cloud::CloudWriter::new(uri, cloud_options).await?; - let writer = ParquetWriter::new(cloud_writer) - .with_compression(parquet_options.compression) - .with_data_page_size(parquet_options.data_pagesize_limit) - .with_statistics(parquet_options.statistics) - .with_row_group_size(parquet_options.row_group_size) - // This is important! Otherwise we will deadlock - // See: #7074 - .set_parallel(false) - .batched(schema)?; + polars_io::pl_async::get_runtime().block_on_potential_spawn(async { + let cloud_writer = polars_io::cloud::CloudWriter::new(uri, cloud_options).await?; + let writer = ParquetWriter::new(cloud_writer) + .with_compression(parquet_options.compression) + .with_data_page_size(parquet_options.data_pagesize_limit) + .with_statistics(parquet_options.statistics) + .with_row_group_size(parquet_options.row_group_size) + // This is important! Otherwise we will deadlock + // See: #7074 + .set_parallel(false) + .batched(schema)?; - let writer = Box::new(writer) as Box; + let writer = Box::new(writer) as Box; - let morsels_per_sink = morsels_per_sink(); - let backpressure = morsels_per_sink * 2; - let (sender, receiver) = bounded(backpressure); + let morsels_per_sink = morsels_per_sink(); + let backpressure = morsels_per_sink * 2; + let (sender, receiver) = bounded(backpressure); - let io_thread_handle = Arc::new(Some(init_writer_thread( - receiver, - writer, - parquet_options.maintain_order, - morsels_per_sink, - ))); + let io_thread_handle = Arc::new(Some(init_writer_thread( + receiver, + writer, + parquet_options.maintain_order, + morsels_per_sink, + ))); - Ok(FilesSink { - sender, - io_thread_handle, + Ok(FilesSink { + sender, + io_thread_handle, + }) }) } } diff --git a/crates/polars-pipe/src/pipeline/convert.rs b/crates/polars-pipe/src/pipeline/convert.rs index 6b219aed83ba..f0b83a60fdd7 100644 --- a/crates/polars-pipe/src/pipeline/convert.rs +++ b/crates/polars-pipe/src/pipeline/convert.rs @@ -221,15 +221,12 @@ where )?) as Box, #[cfg(feature = "ipc")] - FileType::Ipc(ipc_options) => Box::new( - polars_io::pl_async::get_runtime().block_on_potential_spawn( - IpcCloudSink::new( + FileType::Ipc(ipc_options) => Box::new(IpcCloudSink::new( uri, cloud_options.as_ref(), *ipc_options, input_schema.as_ref(), - ) - )?) + )?) as Box, #[allow(unreachable_patterns)] other_file_type => todo!("Cloud-sinking of the file type {other_file_type:?} is not (yet) supported."), diff --git a/examples/write_parquet_cloud/Cargo.toml b/examples/write_parquet_cloud/Cargo.toml index fe02ad8f8457..733972cb60cd 100644 --- a/examples/write_parquet_cloud/Cargo.toml +++ b/examples/write_parquet_cloud/Cargo.toml @@ -7,4 +7,4 @@ edition = "2021" [dependencies] aws-creds = "0.36.0" -polars = { path = "../../crates/polars", features = ["lazy", "aws", "parquet", "cloud_write"] } +polars = { path = "../../crates/polars", features = ["lazy", "aws", "parquet", "cloud_write", "streaming"] } diff --git a/examples/write_parquet_cloud/src/main.rs b/examples/write_parquet_cloud/src/main.rs index b253227899e9..19ff7c215980 100644 --- a/examples/write_parquet_cloud/src/main.rs +++ b/examples/write_parquet_cloud/src/main.rs @@ -37,13 +37,10 @@ fn sink_aws() { // Propagate the credentials and other cloud options. let cloud_options = cloud::CloudOptions::default().with_aws([ - // (Key::AccessKeyId, &cred.access_key.unwrap()), - // (Key::SecretAccessKey, &cred.secret_key.unwrap()), - // (Key::Region, &"eu-central-1".into()), - (Key::AccessKeyId, "test".to_string()), - (Key::SecretAccessKey, "test".to_string()), - (Key::Endpoint, "http://localhost:4566".to_string()), - (Key::Region, "us-east-1".to_string()), + (Key::AccessKeyId, &cred.access_key.unwrap()), + (Key::SecretAccessKey, &cred.secret_key.unwrap()), + (Key::Region, &"eu-central-1".into()), + (Key::Endpoint, &"http://localhost:4566".to_string()), ]); let cloud_options = Some(cloud_options);