Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

disk import: Prioritize returning upload errors #992

Merged
merged 6 commits into from
Feb 3, 2025
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 73 additions & 0 deletions cli/tests/test_disk_import.rs
Original file line number Diff line number Diff line change
Expand Up @@ -529,6 +529,79 @@ fn test_disk_import_bad_file_size() {
.failure();
}

// A disk import where an upload task on write_import fails
#[test]
fn test_disk_write_import_fail() {
let mut src = rand::rngs::SmallRng::seed_from_u64(425);
let server = MockServer::start();

let disk_view_mock = server.disk_view(|when, then| {
when.into_inner().any_request();
then.client_error(
404,
&oxide::types::Error {
error_code: None,
message: "disk not found".into(),
request_id: Uuid::mock_value(&mut src).unwrap().to_string(),
},
);
});

let disk_create_mock = server.disk_create(|when, then| {
when.into_inner().any_request();
then.created(&Disk {
name: "test-import".parse().unwrap(),
..Disk::mock_value(&mut src).unwrap()
});
});

let start_bulk_write_mock = server.disk_bulk_write_import_start(|when, then| {
when.into_inner().any_request();
then.no_content();
});

let disk_bulk_write_mock = server.disk_bulk_write_import(|when, then| {
when.into_inner().any_request();
then.server_error(
503,
&oxide::types::Error {
error_code: None,
message: "I can't do that Dave".into(),
request_id: Uuid::mock_value(&mut src).unwrap().to_string(),
},
);
});

let test_file = Testfile::new_random(CHUNK_SIZE * 2).unwrap();
let output = "upload task failed: Error Response: status: 503 Service Unavailable;";

Command::cargo_bin("oxide")
.unwrap()
.env("RUST_BACKTRACE", "1")
.env("OXIDE_HOST", server.url(""))
.env("OXIDE_TOKEN", "test_disk_import_bulk_import_start_fail")
.arg("disk")
.arg("import")
.arg("--project")
.arg("myproj")
.arg("--description")
.arg("disk description")
.arg("--path")
.arg(test_file.path())
.arg("--disk")
.arg("test-disk-import-bulk-import-start-fail")
.arg("--parallelism") // Ensure only one write request is sent.
.arg("1")
wfchandler marked this conversation as resolved.
Show resolved Hide resolved
.assert()
.failure()
.stderr(predicate::str::starts_with(output));

disk_view_mock.assert_hits(2);
disk_create_mock.assert();
start_bulk_write_mock.assert();
disk_bulk_write_mock.assert();
}

// Test for required parameters being supplied
#[test]
fn test_disk_import_required_parameters() {
Expand Down
32 changes: 24 additions & 8 deletions sdk/src/extras/disk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -392,13 +392,14 @@ pub mod types {
if !chunk.iter().all(|x| *x == 0) {
let encoded = base64::engine::general_purpose::STANDARD.encode(&chunk[0..n]);

if let Err(e) = senders[i % self.upload_thread_ct]
if senders[i % self.upload_thread_ct]
.send((offset, encoded, n as u64))
.await
.is_err()
{
break Err(DiskImportError::other(format!(
"sending chunk to thread failed: {e}"
)));
// Failure to send indicates that the upload task exited early
// due to an error on its end. We will return that error below.
break Ok(());
}
} else {
// Bump the progress bar here to make it consistent
Expand All @@ -417,14 +418,29 @@ pub mod types {

read_result?;
wfchandler marked this conversation as resolved.
Show resolved Hide resolved

let mut results = Vec::with_capacity(handles.len());
let mut errors = Vec::new();
for handle in handles {
let result = handle.await.map_err(DiskImportError::other)?;
results.push(result);
if let Err(err) = result {
errors.push(err);
}
}

if results.iter().any(|x| x.is_err()) {
return Err(DiskImportError::other("one of the upload threads failed"));
match errors.len() {
1 => {
return Err(DiskImportError::context(
"upload task failed",
wfchandler marked this conversation as resolved.
Show resolved Hide resolved
errors.remove(0),
))
}
2.. => {
let mut msg = String::from("upload tasks failed:");
for err in errors {
msg += &format!("\n * {err}");
}
return Err(DiskImportError::Other(msg.into()));
}
_ => {}
wfchandler marked this conversation as resolved.
Show resolved Hide resolved
}

// Stop the bulk write process
Expand Down