Skip to content

Commit

Permalink
Cleanup, and make sure the write_parquet_cloud example can be run aga…
Browse files Browse the repository at this point in the history
…in as-is
  • Loading branch information
Qqwy committed Feb 4, 2024
1 parent 87af454 commit eca2306
Show file tree
Hide file tree
Showing 5 changed files with 53 additions and 56 deletions.
38 changes: 20 additions & 18 deletions crates/polars-pipe/src/executors/sinks/output/ipc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,33 +43,35 @@ pub struct IpcCloudSink {}
#[cfg(feature = "cloud")]
impl IpcCloudSink {
#[allow(clippy::new_ret_no_self)]
pub async fn new(
pub fn new(
uri: &str,
cloud_options: Option<&polars_io::cloud::CloudOptions>,
ipc_options: IpcWriterOptions,
schema: &Schema,
) -> PolarsResult<FilesSink> {
let cloud_writer = polars_io::cloud::CloudWriter::new(uri, cloud_options).await?;
let writer = IpcWriter::new(cloud_writer)
.with_compression(ipc_options.compression)
.batched(schema)?;
polars_io::pl_async::get_runtime().block_on_potential_spawn(async {
let cloud_writer = polars_io::cloud::CloudWriter::new(uri, cloud_options).await?;
let writer = IpcWriter::new(cloud_writer)
.with_compression(ipc_options.compression)
.batched(schema)?;

let writer = Box::new(writer) as Box<dyn SinkWriter + Send>;
let writer = Box::new(writer) as Box<dyn SinkWriter + Send>;

let morsels_per_sink = morsels_per_sink();
let backpressure = morsels_per_sink * 2;
let (sender, receiver) = bounded(backpressure);
let morsels_per_sink = morsels_per_sink();
let backpressure = morsels_per_sink * 2;
let (sender, receiver) = bounded(backpressure);

let io_thread_handle = Arc::new(Some(init_writer_thread(
receiver,
writer,
ipc_options.maintain_order,
morsels_per_sink,
)));
let io_thread_handle = Arc::new(Some(init_writer_thread(
receiver,
writer,
ipc_options.maintain_order,
morsels_per_sink,
)));

Ok(FilesSink {
sender,
io_thread_handle,
Ok(FilesSink {
sender,
io_thread_handle,
})
})
}
}
Expand Down
51 changes: 26 additions & 25 deletions crates/polars-pipe/src/executors/sinks/output/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,40 +52,41 @@ pub struct ParquetCloudSink {}
#[cfg(feature = "cloud")]
impl ParquetCloudSink {
#[allow(clippy::new_ret_no_self)]
#[tokio::main(flavor = "current_thread")]
pub async fn new(
pub fn new(
uri: &str,
cloud_options: Option<&polars_io::cloud::CloudOptions>,
parquet_options: ParquetWriteOptions,
schema: &Schema,
) -> PolarsResult<FilesSink> {
let cloud_writer = polars_io::cloud::CloudWriter::new(uri, cloud_options).await?;
let writer = ParquetWriter::new(cloud_writer)
.with_compression(parquet_options.compression)
.with_data_page_size(parquet_options.data_pagesize_limit)
.with_statistics(parquet_options.statistics)
.with_row_group_size(parquet_options.row_group_size)
// This is important! Otherwise we will deadlock
// See: #7074
.set_parallel(false)
.batched(schema)?;
polars_io::pl_async::get_runtime().block_on_potential_spawn(async {
let cloud_writer = polars_io::cloud::CloudWriter::new(uri, cloud_options).await?;
let writer = ParquetWriter::new(cloud_writer)
.with_compression(parquet_options.compression)
.with_data_page_size(parquet_options.data_pagesize_limit)
.with_statistics(parquet_options.statistics)
.with_row_group_size(parquet_options.row_group_size)
// This is important! Otherwise we will deadlock
// See: #7074
.set_parallel(false)
.batched(schema)?;

let writer = Box::new(writer) as Box<dyn SinkWriter + Send>;
let writer = Box::new(writer) as Box<dyn SinkWriter + Send>;

let morsels_per_sink = morsels_per_sink();
let backpressure = morsels_per_sink * 2;
let (sender, receiver) = bounded(backpressure);
let morsels_per_sink = morsels_per_sink();
let backpressure = morsels_per_sink * 2;
let (sender, receiver) = bounded(backpressure);

let io_thread_handle = Arc::new(Some(init_writer_thread(
receiver,
writer,
parquet_options.maintain_order,
morsels_per_sink,
)));
let io_thread_handle = Arc::new(Some(init_writer_thread(
receiver,
writer,
parquet_options.maintain_order,
morsels_per_sink,
)));

Ok(FilesSink {
sender,
io_thread_handle,
Ok(FilesSink {
sender,
io_thread_handle,
})
})
}
}
Expand Down
7 changes: 2 additions & 5 deletions crates/polars-pipe/src/pipeline/convert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -221,15 +221,12 @@ where
)?)
as Box<dyn SinkTrait>,
#[cfg(feature = "ipc")]
FileType::Ipc(ipc_options) => Box::new(
polars_io::pl_async::get_runtime().block_on_potential_spawn(
IpcCloudSink::new(
FileType::Ipc(ipc_options) => Box::new(IpcCloudSink::new(
uri,
cloud_options.as_ref(),
*ipc_options,
input_schema.as_ref(),
)
)?)
)?)
as Box<dyn SinkTrait>,
#[allow(unreachable_patterns)]
other_file_type => todo!("Cloud-sinking of the file type {other_file_type:?} is not (yet) supported."),
Expand Down
2 changes: 1 addition & 1 deletion examples/write_parquet_cloud/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,4 @@ edition = "2021"

[dependencies]
aws-creds = "0.36.0"
polars = { path = "../../crates/polars", features = ["lazy", "aws", "parquet", "cloud_write"] }
polars = { path = "../../crates/polars", features = ["lazy", "aws", "parquet", "cloud_write", "streaming"] }
11 changes: 4 additions & 7 deletions examples/write_parquet_cloud/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,10 @@ fn sink_aws() {

// Propagate the credentials and other cloud options.
let cloud_options = cloud::CloudOptions::default().with_aws([
// (Key::AccessKeyId, &cred.access_key.unwrap()),
// (Key::SecretAccessKey, &cred.secret_key.unwrap()),
// (Key::Region, &"eu-central-1".into()),
(Key::AccessKeyId, "test".to_string()),
(Key::SecretAccessKey, "test".to_string()),
(Key::Endpoint, "http://localhost:4566".to_string()),
(Key::Region, "us-east-1".to_string()),
(Key::AccessKeyId, &cred.access_key.unwrap()),
(Key::SecretAccessKey, &cred.secret_key.unwrap()),
(Key::Region, &"eu-central-1".into()),
(Key::Endpoint, &"http://localhost:4566".to_string()),
]);
let cloud_options = Some(cloud_options);

Expand Down

0 comments on commit eca2306

Please sign in to comment.