Skip to content
This repository has been archived by the owner on Dec 17, 2024. It is now read-only.

Commit

Permalink
Change Vsock to VirFs to tranfer files between the VM and the node. (#…
Browse files Browse the repository at this point in the history
…136)

* return error if submit file fail

* set virtFs to run the VM. Change VM execution result management

* correct download url

* correct download url

* add waiting received Tx for parent one

* return error if submit file fail

* set virtFs to run the VM. Change VM execution result management

* return error if submit file fail

* Revert "correct download url"

This reverts commit 3b0e66c.

* Revert "add waiting received Tx for parent one"

This reverts commit 5015b61.

* update cargo.lock

* Apply suggestions from code review

Co-authored-by: Tuomas Mäkinen <[email protected]>

* correct build and clippy

---------

Co-authored-by: Tuomas Mäkinen <[email protected]>
  • Loading branch information
musitdev and tuommaki authored Mar 14, 2024
1 parent 2d6a154 commit 8752850
Show file tree
Hide file tree
Showing 11 changed files with 270 additions and 511 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/node/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ clap = { version = "4", features = ["derive", "env", "string"], optional = true
console-subscriber = { version = "0.2", optional = true }
futures = { version = "0.3.30", optional = true }
futures-util = { version = "0.3", features = [ "io" ], optional = true }
gevulot-shim = { path = "../shim", default-features = false }
home = { version = "0.5", optional = true}
http-body-util = { version = "0.1", optional = true }
hyper = { version = "1", features = ["full"], optional = true }
Expand Down
137 changes: 71 additions & 66 deletions crates/node/src/scheduler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -509,16 +509,12 @@ impl TaskManager for Scheduler {

async fn submit_result(
&self,
tx_hash: Hash,
program: Hash,
result: grpc::task_result_request::Result,
) -> Result<(), String> {
tracing::debug!("submit_result result:{result:#?} ");
tracing::debug!("submit_result tx:{tx_hash} result:{result:#?}");

let grpc::task_result_request::Result::Task(result) = result else {
todo!("task failed; handle it correctly")
};

let tx_hash: Hash = (&*result.id).into();
let mut state = self.state.lock().await;
if let Some(running_task) = state.running_tasks.remove(&tx_hash) {
tracing::info!(
Expand All @@ -534,70 +530,79 @@ impl TaskManager for Scheduler {
);
}

// Handle tx execution's result files so that they are available as an input for next task if needed.
let executed_files: Vec<(TaskVmFile<VmOutput>, TxFile<Output>)> = result
.files
.into_iter()
.map(|file| {
let vm_file = TaskVmFile::<VmOutput>::new(file.path.to_string(), tx_hash);
let dest = TxFile::<Output>::new(
file.path,
self.http_download_host.clone(),
file.checksum[..].into(),
);
(vm_file, dest)
})
.collect();

//TODO -Verify that all expected files has been generated.

let new_tx_files: Vec<TxFile<Output>> = executed_files
.iter()
.map(|(_, file)| file)
.cloned()
.collect();
let nonce = rand::thread_rng().next_u64();
let tx = match running_task.task.kind {
TaskKind::Proof => Transaction::new(
Payload::Proof {
parent: running_task.task.tx,
prover: program,
proof: result.data,
files: new_tx_files,
},
&self.node_key,
),
TaskKind::Verification => Transaction::new(
Payload::Verification {
parent: running_task.task.tx,
verifier: program,
verification: result.data,
files: new_tx_files,
},
&self.node_key,
),
TaskKind::PoW => {
todo!("proof of work tasks not implemented yet");
match result {
grpc::task_result_request::Result::Task(result) => {
// Handle tx execution's result files so that they are available as an input for next task if needed.
let executed_files: Vec<(TaskVmFile<VmOutput>, TxFile<Output>)> = result
.files
.into_iter()
.map(|file| {
let vm_file =
TaskVmFile::<VmOutput>::new(file.path.to_string(), tx_hash);
let dest = TxFile::<Output>::new(
file.path,
self.http_download_host.clone(),
file.checksum[..].into(),
);
(vm_file, dest)
})
.collect();

//TODO -Verify that all expected files has been generated.

let new_tx_files: Vec<TxFile<Output>> = executed_files
.iter()
.map(|(_, file)| file)
.cloned()
.collect();
let nonce = rand::thread_rng().next_u64();
let tx = match running_task.task.kind {
TaskKind::Proof => Transaction::new(
Payload::Proof {
parent: running_task.task.tx,
prover: program,
proof: result.data,
files: new_tx_files,
},
&self.node_key,
),
TaskKind::Verification => Transaction::new(
Payload::Verification {
parent: running_task.task.tx,
verifier: program,
verification: result.data,
files: new_tx_files,
},
&self.node_key,
),
TaskKind::PoW => {
todo!("proof of work tasks not implemented yet");
}
TaskKind::Nop => {
panic!(
"impossible to receive result from a task ({}/{}) with task.kind == Nop",
running_task.task.id, running_task.task.tx
);
}
};
tracing::info!("Submit result Tx created:{}", tx.hash.to_string());

// Move tx file from execution Tx path to new Tx path
for (source_file, dest_file) in executed_files {
move_vmfile(&source_file, &dest_file, &self.data_directory, tx.hash)
.await.map_err(|err| format!("failed to move excution file from: {source_file:?} to: {dest_file:?} error: {err}"))?;
}

// Send tx to validation process.
self.tx_sender.send_tx(tx).await.map_err(|err| {
format!("failed to send Tx result to validation process: {}", err)
})?;
}
TaskKind::Nop => {
panic!(
"impossible to receive result from a task ({}/{}) with task.kind == Nop",
running_task.task.id, running_task.task.tx
);
grpc::task_result_request::Result::Error(error) => {
tracing::warn!("Error during Tx:{tx_hash} execution:{error:?}");
}
};
tracing::info!("Submit result Tx created:{}", tx.hash.to_string());

// Move tx file from execution Tx path to new Tx path
for (source_file, dest_file) in executed_files {
move_vmfile(&source_file, &dest_file, &self.data_directory, tx.hash).await.map_err(|err| format!("failed to move excution file from: {source_file:?} to: {dest_file:?} error: {err}"))?;
}

// Send tx to validation process.
self.tx_sender.send_tx(tx).await.map_err(|err| {
format!("failed to send Tx result to validation process: {}", err)
})?;

tracing::debug!("terminating VM running program {}", program);

match state.running_vms.remove(&tx_hash) {
Expand Down
8 changes: 8 additions & 0 deletions crates/node/src/types/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,14 @@ impl<E> TaskVmFile<E> {
&self.vm_file_path
}
}
impl TaskVmFile<()> {
pub fn get_workspace_path(data_directory: &Path, tx_hash: Hash) -> PathBuf {
PathBuf::new()
.join(data_directory)
.join(tx_hash.to_string())
.join(gevulot_shim::WORKSPACE_NAME)
}
}

// Define A task file send to the VM. Extension contains the node file path.
#[derive(Clone, Debug, Default, Deserialize, Serialize, PartialEq)]
Expand Down
60 changes: 33 additions & 27 deletions crates/node/src/vmm/qemu.rs
Original file line number Diff line number Diff line change
@@ -1,22 +1,22 @@
use std::{
any::Any,
collections::HashMap,
fs::File,
path::Path,
process::{Child, Command, Stdio},
sync::Arc,
time::{Duration, Instant},
};

use async_trait::async_trait;
use eyre::Result;
use gevulot_node::types::file::TaskVmFile;
use qapi::{
futures::{QapiStream, QmpStreamTokio},
qmp,
qmp::StatusInfo,
};
use rand::{distributions::Alphanumeric, Rng};
use serde_json::json;
use std::{
any::Any,
collections::HashMap,
fs::File,
path::Path,
process::{Child, Command, Stdio},
sync::Arc,
time::{Duration, Instant},
};
use tokio::{
io::{ReadHalf, WriteHalf},
net::{TcpStream, ToSocketAddrs},
Expand Down Expand Up @@ -162,12 +162,12 @@ impl Provider for Qemu {
.to_lowercase();

// XXX: This isn't async and will call out to `ops` for now.
tracing::debug!("creating workspace volume:{workspace_volume_label:?} for the VM");
let workspace_file =
nanos::volume::create(&self.config.data_directory, &workspace_volume_label, "2g")?
.into_os_string();
let workspace_file = workspace_file.to_str().expect("workspace volume path");
tracing::debug!("workspace volume:{workspace_file:?} created");
// tracing::debug!("creating workspace volume:{workspace_volume_label:?} for the VM");
// let workspace_file =
// nanos::volume::create(&self.config.data_directory, &workspace_volume_label, "2g")?
// .into_os_string();
// let workspace_file = workspace_file.to_str().expect("workspace volume path");
// tracing::debug!("workspace volume:{workspace_file:?} created");

let cpus = req.cpus;
let mem_req = req.mem;
Expand Down Expand Up @@ -197,6 +197,15 @@ impl Provider for Qemu {
let qemu_vm_handle = &mut self.vm_registry.get_mut(&cid).unwrap();
let mut cmd = Command::new("/usr/bin/qemu-system-x86_64");

//define the VirtFs local path and create all necessary folder
let workspace_path = TaskVmFile::get_workspace_path(&self.config.data_directory, tx_hash);
// Ensure any necessary subdirectories exists.
if let Ok(false) = tokio::fs::try_exists(&workspace_path).await {
if let Err(err) = tokio::fs::create_dir_all(&workspace_path).await {
tracing::error!("create_dir_all fail for {workspace_path:?} err:{err}");
}
}

cmd.args(["-machine", "q35"])
.args([
"-device",
Expand Down Expand Up @@ -239,6 +248,14 @@ impl Provider for Qemu {
])*/
.args(["-display", "none"])
.args(["-serial", "stdio"])
//VirtFs
.args([
"-virtfs",
&format!(
"local,path={},mount_tag=1,security_model=none,multidevs=remap,id=hd1",
&workspace_path.to_str().unwrap().to_string()
),
])
// VSOCK
.args(["-device", &format!("vhost-vsock-pci,guest-cid={cid}")])
// QMP
Expand Down Expand Up @@ -301,17 +318,6 @@ impl Provider for Qemu {
client.unwrap()
};

// Attach the workspace volume.
let err_add = qmp_client.blockdev_add("workspace", workspace_file).await;
if err_add.is_err() {
tracing::error!("blockdev_add failed: {:?}", err_add);
}

let err_add = qmp_client.device_add("workspace", 1).await;
if err_add.is_err() {
tracing::error!("device_add failed: {:?}", err_add);
}

qmp_client.system_reset().await?;

Ok(VMHandle {
Expand Down
Loading

0 comments on commit 8752850

Please sign in to comment.