Skip to content

Commit

Permalink
Better solution, by using block_on_potential_spawn in all relevant lo…
Browse files Browse the repository at this point in the history
…cations.
  • Loading branch information
Qqwy committed Feb 4, 2024
1 parent e03a835 commit ef4ae59
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 20 deletions.
26 changes: 9 additions & 17 deletions crates/polars-io/src/cloud/adaptors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ impl CloudWriter {
Ok((multipart_id, s3_writer))
}

async fn abort(&self) -> PolarsResult<()> {
async fn abort(&mut self) -> PolarsResult<()> {
self.object_store
.abort_multipart(&self.path, &self.multipart_id)
.await
Expand All @@ -197,21 +197,17 @@ impl CloudWriter {

impl std::io::Write for CloudWriter {
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
std::thread::scope(|s| {
s.spawn(|| {
get_runtime().block_on(async {
let res = self.writer.write(buf).await;
if res.is_err() {
let _ = self.abort().await;
}
res
})
}).join().unwrap()
get_runtime().block_on_potential_spawn(async {
let res = self.writer.write(buf).await;
if res.is_err() {
let _ = self.abort().await;
}
res
})
}

fn flush(&mut self) -> std::io::Result<()> {
get_runtime().block_on(async {
get_runtime().block_on_potential_spawn(async {
let res = self.writer.flush().await;
if res.is_err() {
let _ = self.abort().await;
Expand All @@ -223,11 +219,7 @@ impl std::io::Write for CloudWriter {

impl Drop for CloudWriter {
fn drop(&mut self) {
std::thread::scope(|s| {
s.spawn(|| {
let _ = get_runtime().block_on(self.writer.shutdown());
}).join().unwrap()
})
let _ = get_runtime().block_on_potential_spawn(self.writer.shutdown());
}
}

Expand Down
1 change: 0 additions & 1 deletion crates/polars-pipe/src/executors/sinks/output/ipc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ pub struct IpcCloudSink {}
#[cfg(feature = "cloud")]
impl IpcCloudSink {
#[allow(clippy::new_ret_no_self)]
#[tokio::main(flavor = "current_thread")]
pub async fn new(
uri: &str,
cloud_options: Option<&polars_io::cloud::CloudOptions>,
Expand Down
5 changes: 3 additions & 2 deletions crates/polars-pipe/src/pipeline/convert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -221,12 +221,13 @@ where
)?)
as Box<dyn SinkTrait>,
#[cfg(feature = "ipc")]
FileType::Ipc(ipc_options) => Box::new(IpcCloudSink::new(
FileType::Ipc(ipc_options) => Box::new(polars_io::pl_async::get_runtime().block_on_potential_spawn(
IpcCloudSink::new(
uri,
cloud_options.as_ref(),
*ipc_options,
input_schema.as_ref(),
)?)
))?)
as Box<dyn SinkTrait>,
#[allow(unreachable_patterns)]
other_file_type => todo!("Cloud-sinking of the file type {other_file_type:?} is not (yet) supported."),
Expand Down

0 comments on commit ef4ae59

Please sign in to comment.