Skip to content

Commit

Permalink
save progress
Browse files — browse the repository at this point in the history
  • Loading branch information
andygrove committed Nov 17, 2024
1 parent 50b6715 commit 75d9a50
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 10 deletions.
17 changes: 10 additions & 7 deletions src/shuffle/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ impl ExecutionPlan for ShuffleReaderExec {

for input_part in 0..self.properties.partitioning.partition_count() {
let file = format!(
"/{}/shuffle_{}_{}_{partition}.arrow",
"{}/shuffle_{}_{}_{partition}.arrow",
self.shuffle_dir, self.stage_id, input_part
);
debug!(
Expand All @@ -132,7 +132,7 @@ impl ExecutionPlan for ShuffleReaderExec {
let mut local_file = File::create(&file)?;
let object_store = create_object_store()?;

let result: Result<()> = block_in_place(move || {
let result: Result<Option<LocalShuffleStream>> = block_in_place(move || {
Handle::current().block_on(async move {
match object_store.get(&object_path).await {
Ok(get_result) => {
Expand All @@ -143,20 +143,23 @@ impl ExecutionPlan for ShuffleReaderExec {
}
println!("Deleting {} from object storage", object_path);
object_store.delete(&object_path).await?;
Ok(Some(LocalShuffleStream::new(
PathBuf::from(&file),
self.schema.clone(),
)))
}
Err(e) => {
// TODO need to distinguish between file not existing and other errors
println!("Download failed: {}", e);
Ok(None)
}
}
Ok(())
})
});

result?;

let stream = LocalShuffleStream::new(PathBuf::from(&file), self.schema.clone());
streams.push(Box::pin(stream));
if let Some(stream) = result? {
streams.push(Box::pin(stream));
}
}
Ok(Box::pin(CombinedRecordBatchStream::new(
self.schema.clone(),
Expand Down
9 changes: 6 additions & 3 deletions src/shuffle/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ impl ExecutionPlan for ShuffleWriterExec {
Partitioning::UnknownPartitioning(_) => {
// stream the results from the query, preserving the input partitioning
let path =
format!("/{shuffle_dir}/shuffle_{stage_id}_{input_partition}_0.arrow");
format!("{shuffle_dir}/shuffle_{stage_id}_{input_partition}_0.arrow");
let path = Path::new(&path);
debug!(
"ShuffleWriterExec[stage={}] Writing results to {:?}",
Expand Down Expand Up @@ -197,7 +197,7 @@ impl ExecutionPlan for ShuffleWriterExec {
}
None => {
let path = format!(
"/{shuffle_dir}/shuffle_{stage_id}_{input_partition}_{output_partition}.arrow",
"{shuffle_dir}/shuffle_{stage_id}_{input_partition}_{output_partition}.arrow",
);
let path = Path::new(&path);
debug!("ShuffleWriterExec[stage={}] Writing results to {:?}", stage_id, path);
Expand Down Expand Up @@ -337,8 +337,11 @@ impl IPCWriter {

let object_store_path = ObjectStorePath::from_filesystem_path(&self.path)?;

// disabled due to error "Your proposed upload is smaller than the minimum allowed object size"
let multi_part_enabled = false;

// TODO make threshold configurable
if self.num_bytes > 10 * 1024 * 1024 {
if multi_part_enabled && self.num_bytes > 10 * 1024 * 1024 {
println!(
"Uploading shuffle file {} containing {} bytes (put_multipart)",
&self.path.display(),
Expand Down

0 comments on commit 75d9a50

Please sign in to comment.