Skip to content

Commit

Permalink
Better solution, by using block_on_potential_spawn in all relevant lo…
Browse files Browse the repository at this point in the history
…cations.
  • Loading branch information
Qqwy committed Feb 4, 2024
1 parent e03a835 commit 87af454
Show file tree
Hide file tree
Showing 4 changed files with 22 additions and 24 deletions.
26 changes: 9 additions & 17 deletions crates/polars-io/src/cloud/adaptors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ impl CloudWriter {
Ok((multipart_id, s3_writer))
}

async fn abort(&self) -> PolarsResult<()> {
async fn abort(&mut self) -> PolarsResult<()> {
self.object_store
.abort_multipart(&self.path, &self.multipart_id)
.await
Expand All @@ -197,21 +197,17 @@ impl CloudWriter {

impl std::io::Write for CloudWriter {
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
std::thread::scope(|s| {
s.spawn(|| {
get_runtime().block_on(async {
let res = self.writer.write(buf).await;
if res.is_err() {
let _ = self.abort().await;
}
res
})
}).join().unwrap()
get_runtime().block_on_potential_spawn(async {
let res = self.writer.write(buf).await;
if res.is_err() {
let _ = self.abort().await;
}
res
})
}

fn flush(&mut self) -> std::io::Result<()> {
get_runtime().block_on(async {
get_runtime().block_on_potential_spawn(async {
let res = self.writer.flush().await;
if res.is_err() {
let _ = self.abort().await;
Expand All @@ -223,11 +219,7 @@ impl std::io::Write for CloudWriter {

impl Drop for CloudWriter {
fn drop(&mut self) {
std::thread::scope(|s| {
s.spawn(|| {
let _ = get_runtime().block_on(self.writer.shutdown());
}).join().unwrap()
})
let _ = get_runtime().block_on_potential_spawn(self.writer.shutdown());
}
}

Expand Down
1 change: 0 additions & 1 deletion crates/polars-pipe/src/executors/sinks/output/ipc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ pub struct IpcCloudSink {}
#[cfg(feature = "cloud")]
impl IpcCloudSink {
#[allow(clippy::new_ret_no_self)]
#[tokio::main(flavor = "current_thread")]
pub async fn new(
uri: &str,
cloud_options: Option<&polars_io::cloud::CloudOptions>,
Expand Down
7 changes: 5 additions & 2 deletions crates/polars-pipe/src/pipeline/convert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -221,12 +221,15 @@ where
)?)
as Box<dyn SinkTrait>,
#[cfg(feature = "ipc")]
FileType::Ipc(ipc_options) => Box::new(IpcCloudSink::new(
FileType::Ipc(ipc_options) => Box::new(
polars_io::pl_async::get_runtime().block_on_potential_spawn(
IpcCloudSink::new(
uri,
cloud_options.as_ref(),
*ipc_options,
input_schema.as_ref(),
)?)
)
)?)
as Box<dyn SinkTrait>,
#[allow(unreachable_patterns)]
other_file_type => todo!("Cloud-sinking of the file type {other_file_type:?} is not (yet) supported."),
Expand Down
12 changes: 8 additions & 4 deletions examples/write_parquet_cloud/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use polars::prelude::*;

// Login to your aws account and then copy the ../datasets/foods1.parquet file to your own bucket.
// Adjust the link below.
const TEST_S3_LOCATION: &str = "s3://polarstesting/polars_write_example_cloud.parquet";
const TEST_S3_LOCATION: &str = "s3://test-bucket/test-writes/polars_write_example_cloud.parquet";

fn main() -> PolarsResult<()> {
sink_file();
Expand Down Expand Up @@ -37,9 +37,13 @@ fn sink_aws() {

// Propagate the credentials and other cloud options.
let cloud_options = cloud::CloudOptions::default().with_aws([
(Key::AccessKeyId, &cred.access_key.unwrap()),
(Key::SecretAccessKey, &cred.secret_key.unwrap()),
(Key::Region, &"eu-central-1".into()),
// (Key::AccessKeyId, &cred.access_key.unwrap()),
// (Key::SecretAccessKey, &cred.secret_key.unwrap()),
// (Key::Region, &"eu-central-1".into()),
(Key::AccessKeyId, "test".to_string()),
(Key::SecretAccessKey, "test".to_string()),
(Key::Endpoint, "http://localhost:4566".to_string()),
(Key::Region, "us-east-1".to_string()),
]);
let cloud_options = Some(cloud_options);

Expand Down

0 comments on commit 87af454

Please sign in to comment.