Skip to content

Commit

Permalink
disk import: Prioritize returning upload errors (#992)
Browse files Browse the repository at this point in the history
When sending chunks of the file to upload tasks, we return an error if
the channel to that task is closed. This situation will happen if the
upload task has panicked or already returned due to an error. However,
we check for an error from the sending task first, and the generic
`sending chunk to thread failed` error will be surfaced to the user,
rather than the true problem that occurred in the upload task.

In the inverse situation, where the reader fails, the uploaders will
return gracefully when their channels are closed, so we do not need to
worry about a problem reading the file showing up as an upload error.

Update our error handling to check for an error from the upload tasks,
then the reader if those returned cleanly.
  • Loading branch information
wfchandler authored Feb 3, 2025
1 parent ef819f4 commit 521ff90
Show file tree
Hide file tree
Showing 2 changed files with 100 additions and 9 deletions.
73 changes: 73 additions & 0 deletions cli/tests/test_disk_import.rs
Original file line number Diff line number Diff line change
Expand Up @@ -529,6 +529,79 @@ fn test_disk_import_bad_file_size() {
.failure();
}

// A disk import where an upload task on write_import fails.
//
// The bulk-write (chunk upload) endpoint returns a 503 for every chunk;
// the CLI must surface those upload errors to the user rather than the
// generic "sending chunk to thread failed" message from the reader side.
#[test]
fn test_disk_write_import_fail() {
    // Seeded RNG so mock values are deterministic across runs.
    let mut src = rand::rngs::SmallRng::seed_from_u64(425);
    let server = MockServer::start();

    // The disk must not already exist; a 404 here lets the import proceed.
    let disk_view_mock = server.disk_view(|when, then| {
        when.into_inner().any_request();
        then.client_error(
            404,
            &oxide::types::Error {
                error_code: None,
                message: "disk not found".into(),
                request_id: Uuid::mock_value(&mut src).unwrap().to_string(),
            },
        );
    });

    let disk_create_mock = server.disk_create(|when, then| {
        when.into_inner().any_request();
        then.created(&Disk {
            name: "test-import".parse().unwrap(),
            ..Disk::mock_value(&mut src).unwrap()
        });
    });

    let start_bulk_write_mock = server.disk_bulk_write_import_start(|when, then| {
        when.into_inner().any_request();
        then.no_content();
    });

    // Every chunk upload fails with a 503 — these are the errors we
    // expect to be reported, one per upload task.
    let disk_bulk_write_mock = server.disk_bulk_write_import(|when, then| {
        when.into_inner().any_request();
        then.server_error(
            503,
            &oxide::types::Error {
                error_code: None,
                message: "I can't do that Dave".into(),
                request_id: Uuid::mock_value(&mut src).unwrap().to_string(),
            },
        );
    });

    // Two chunks with --parallelism 2 => one failing upload per task,
    // so stderr must list both 503 errors under the aggregate header.
    let test_file = Testfile::new_random(CHUNK_SIZE * 2).unwrap();
    let output = r#"(?m)\AErrors while uploading the disk image:\n \* Error Response: status: 503 Service Unavailable;.*$\n \* Error Response: status: 503 Service Unavailable;.*"#;

    Command::cargo_bin("oxide")
        .unwrap()
        .env("RUST_BACKTRACE", "1")
        .env("OXIDE_HOST", server.url(""))
        // Token and disk name previously copy-pasted from
        // test_disk_import_bulk_import_start_fail; mocks match any
        // request, so only the labels change — not behavior.
        .env("OXIDE_TOKEN", "test_disk_write_import_fail")
        .arg("disk")
        .arg("import")
        .arg("--project")
        .arg("myproj")
        .arg("--description")
        .arg("disk description")
        .arg("--path")
        .arg(test_file.path())
        .arg("--disk")
        .arg("test-disk-write-import-fail")
        .arg("--parallelism")
        .arg("2")
        .assert()
        .failure()
        .stderr(predicate::str::is_match(output).unwrap());

    // Hit counts mirror the sibling import tests; disk_view is polled
    // twice during the flow. NOTE(review): confirm against CLI behavior.
    disk_view_mock.assert_hits(2);
    disk_create_mock.assert();
    start_bulk_write_mock.assert();
    disk_bulk_write_mock.assert_hits(2);
}

// Test for required parameters being supplied
#[test]
fn test_disk_import_required_parameters() {
Expand Down
36 changes: 27 additions & 9 deletions sdk/src/extras/disk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -392,13 +392,14 @@ pub mod types {
if !chunk.iter().all(|x| *x == 0) {
let encoded = base64::engine::general_purpose::STANDARD.encode(&chunk[0..n]);

if let Err(e) = senders[i % self.upload_thread_ct]
if senders[i % self.upload_thread_ct]
.send((offset, encoded, n as u64))
.await
.is_err()
{
break Err(DiskImportError::other(format!(
"sending chunk to thread failed: {e}"
)));
// Failure to send indicates that the upload task exited early
// due to an error on its end. We will return that error below.
break Ok(());
}
} else {
// Bump the progress bar here to make it consistent
Expand All @@ -415,16 +416,33 @@ pub mod types {
drop(tx);
}

read_result?;
let mut errors = Vec::new();
if let Err(e) = read_result {
errors.push(e);
}

let mut results = Vec::with_capacity(handles.len());
for handle in handles {
let result = handle.await.map_err(DiskImportError::other)?;
results.push(result);
if let Err(err) = result {
errors.push(err);
}
}

if results.iter().any(|x| x.is_err()) {
return Err(DiskImportError::other("one of the upload threads failed"));
match errors.len() {
1 => {
return Err(DiskImportError::context(
"Error while uploading the disk image",
errors.remove(0),
))
}
2.. => {
let mut msg = String::from("Errors while uploading the disk image:");
for err in errors {
msg += &format!("\n * {err}");
}
return Err(DiskImportError::Other(msg.into()));
}
0 => {}
}

// Stop the bulk write process
Expand Down

0 comments on commit 521ff90

Please sign in to comment.