Skip to content

Commit

Permalink
Cleanup, and make sure the write_parquet_cloud example can be run again as-is
Browse files Browse the repository at this point in the history
  • Loading branch information
Qqwy committed Feb 4, 2024
1 parent 87af454 commit e8f3e67
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 24 deletions.
38 changes: 20 additions & 18 deletions crates/polars-pipe/src/executors/sinks/output/ipc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,33 +43,35 @@ pub struct IpcCloudSink {}
#[cfg(feature = "cloud")]
impl IpcCloudSink {
#[allow(clippy::new_ret_no_self)]
pub async fn new(
pub fn new(
uri: &str,
cloud_options: Option<&polars_io::cloud::CloudOptions>,
ipc_options: IpcWriterOptions,
schema: &Schema,
) -> PolarsResult<FilesSink> {
let cloud_writer = polars_io::cloud::CloudWriter::new(uri, cloud_options).await?;
let writer = IpcWriter::new(cloud_writer)
.with_compression(ipc_options.compression)
.batched(schema)?;
polars_io::pl_async::get_runtime().block_on_potential_spawn(async {
let cloud_writer = polars_io::cloud::CloudWriter::new(uri, cloud_options).await?;
let writer = IpcWriter::new(cloud_writer)
.with_compression(ipc_options.compression)
.batched(schema)?;

let writer = Box::new(writer) as Box<dyn SinkWriter + Send>;
let writer = Box::new(writer) as Box<dyn SinkWriter + Send>;

let morsels_per_sink = morsels_per_sink();
let backpressure = morsels_per_sink * 2;
let (sender, receiver) = bounded(backpressure);
let morsels_per_sink = morsels_per_sink();
let backpressure = morsels_per_sink * 2;
let (sender, receiver) = bounded(backpressure);

let io_thread_handle = Arc::new(Some(init_writer_thread(
receiver,
writer,
ipc_options.maintain_order,
morsels_per_sink,
)));
let io_thread_handle = Arc::new(Some(init_writer_thread(
receiver,
writer,
ipc_options.maintain_order,
morsels_per_sink,
)));

Ok(FilesSink {
sender,
io_thread_handle,
Ok(FilesSink {
sender,
io_thread_handle,
})
})
}
}
Expand Down
7 changes: 2 additions & 5 deletions crates/polars-pipe/src/pipeline/convert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -221,15 +221,12 @@ where
)?)
as Box<dyn SinkTrait>,
#[cfg(feature = "ipc")]
FileType::Ipc(ipc_options) => Box::new(
polars_io::pl_async::get_runtime().block_on_potential_spawn(
IpcCloudSink::new(
FileType::Ipc(ipc_options) => Box::new(IpcCloudSink::new(
uri,
cloud_options.as_ref(),
*ipc_options,
input_schema.as_ref(),
)
)?)
)?)
as Box<dyn SinkTrait>,
#[allow(unreachable_patterns)]
other_file_type => todo!("Cloud-sinking of the file type {other_file_type:?} is not (yet) supported."),
Expand Down
2 changes: 1 addition & 1 deletion examples/write_parquet_cloud/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,4 @@ edition = "2021"

[dependencies]
aws-creds = "0.36.0"
polars = { path = "../../crates/polars", features = ["lazy", "aws", "parquet", "cloud_write"] }
polars = { path = "../../crates/polars", features = ["lazy", "aws", "parquet", "cloud_write", "streaming"] }

0 comments on commit e8f3e67

Please sign in to comment.