Skip to content

Commit

Permalink
save progress
Browse files Browse the repository at this point in the history
  • Loading branch information
andygrove committed Nov 17, 2024
1 parent 9d5297f commit 16bf259
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 18 deletions.
3 changes: 1 addition & 2 deletions src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,8 +129,7 @@ pub fn execute_partition(
py: Python,
) -> PyResult<PyResultSet> {
let plan = deserialize_execution_plan(plan_bytes)?;
_execute_partition(plan, part)
.unwrap()
_execute_partition(plan, part)?
.into_iter()
.map(|batch| batch.to_pyarrow(py))
.collect()
Expand Down
36 changes: 20 additions & 16 deletions src/shuffle/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,10 +116,7 @@ impl ExecutionPlan for ShuffleReaderExec {
) -> Result<SendableRecordBatchStream> {
let mut streams: Vec<SendableRecordBatchStream> = vec![];

// TODO
let num_input_parts = 8;

for input_part in 0..num_input_parts {
for input_part in 0..self.properties.partitioning.partition_count() {
let file = format!(
"/{}/shuffle_{}_{}_{partition}.arrow",
self.shuffle_dir, self.stage_id, input_part
Expand Down Expand Up @@ -186,22 +183,30 @@ impl Stream for LocalShuffleStream {
if self.reader.is_none() {
// download the file from object storage
println!("Downloading {} from object storage", self.file.display());
let object_path = ObjectStorePath::from_filesystem_path(&self.file)?;
let mut file2 = File::create(&self.file)?;
let path_str = format!("{}", self.file.display());
let object_path = ObjectStorePath::from(path_str);
let mut local_file = File::create(&self.file)?;
let object_store = create_object_store()?;
let foo: Result<()> = Handle::current().block_on(async move {
let mut stream = object_store.get(&object_path).await?.into_stream();
while let Some(chunk) = stream.next().await {
let bytes = chunk?;
file2.write_all(&bytes)?;
let result: Result<()> = Handle::current().block_on(async move {
match object_store.get(&object_path).await {
Ok(get_result) => {
let mut stream = get_result.into_stream();
while let Some(chunk) = stream.next().await {
let bytes = chunk?;
local_file.write_all(&bytes)?;
}
println!("Deleting {} from object storage", object_path);
object_store.delete(&object_path).await?;
}
Err(e) => {
// TODO need to distinguish between file not existing and other errors
println!("Download failed: {}", e);
}
}
Ok(())
});

foo?;

// println!("Deleting {} from object store", &object_path);
//TODO object_store.delete(&object_path)?;
result?;

let file = File::open(&self.file)?;
self.reader = Some(FileReader::try_new(file, None)?);
Expand All @@ -212,7 +217,6 @@ impl Stream for LocalShuffleStream {
// "Not all shuffle files have the same schema".to_string(),
// ));
// }

}
if let Some(reader) = self.reader.as_mut() {
if let Some(batch) = reader.next() {
Expand Down

0 comments on commit 16bf259

Please sign in to comment.