From 16bf2595340c8950844b2c1a62a2b072add64867 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Sun, 17 Nov 2024 12:22:46 -0700 Subject: [PATCH] save progress --- src/context.rs | 3 +-- src/shuffle/reader.rs | 36 ++++++++++++++++++++---------------- 2 files changed, 21 insertions(+), 18 deletions(-) diff --git a/src/context.rs b/src/context.rs index 86c1b3a..1e063ee 100644 --- a/src/context.rs +++ b/src/context.rs @@ -129,8 +129,7 @@ pub fn execute_partition( py: Python, ) -> PyResult { let plan = deserialize_execution_plan(plan_bytes)?; - _execute_partition(plan, part) - .unwrap() + _execute_partition(plan, part)? .into_iter() .map(|batch| batch.to_pyarrow(py)) .collect() diff --git a/src/shuffle/reader.rs b/src/shuffle/reader.rs index 6de3598..7d3af2e 100644 --- a/src/shuffle/reader.rs +++ b/src/shuffle/reader.rs @@ -116,10 +116,7 @@ impl ExecutionPlan for ShuffleReaderExec { ) -> Result { let mut streams: Vec = vec![]; - // TODO - let num_input_parts = 8; - - for input_part in 0..num_input_parts { + for input_part in 0..self.properties.partitioning.partition_count() { let file = format!( "/{}/shuffle_{}_{}_{partition}.arrow", self.shuffle_dir, self.stage_id, input_part @@ -186,22 +183,30 @@ impl Stream for LocalShuffleStream { if self.reader.is_none() { // download the file from object storage println!("Downloading {} from object storage", self.file.display()); - let object_path = ObjectStorePath::from_filesystem_path(&self.file)?; - let mut file2 = File::create(&self.file)?; + let path_str = format!("{}", self.file.display()); + let object_path = ObjectStorePath::from(path_str); + let mut local_file = File::create(&self.file)?; let object_store = create_object_store()?; - let foo: Result<()> = Handle::current().block_on(async move { - let mut stream = object_store.get(&object_path).await?.into_stream(); - while let Some(chunk) = stream.next().await { - let bytes = chunk?; - file2.write_all(&bytes)?; + let result: Result<()> = Handle::current().block_on(async move { + match object_store.get(&object_path).await { + Ok(get_result) => { + let mut stream = get_result.into_stream(); + while let Some(chunk) = stream.next().await { + let bytes = chunk?; + local_file.write_all(&bytes)?; + } + println!("Deleting {} from object storage", object_path); + object_store.delete(&object_path).await?; + } + Err(e) => { + // TODO need to distinguish between file not existing and other errors + println!("Download failed: {}", e); + } } Ok(()) }); - foo?; - - // println!("Deleting {} from object store", &object_path); - //TODO object_store.delete(&object_path)?; + result?; let file = File::open(&self.file)?; self.reader = Some(FileReader::try_new(file, None)?); @@ -212,7 +217,6 @@ impl Stream for LocalShuffleStream { // "Not all shuffle files have the same schema".to_string(), // )); // } - } if let Some(reader) = self.reader.as_mut() { if let Some(batch) = reader.next() {