Skip to content
This repository has been archived by the owner on Dec 17, 2024. It is now read-only.

Change tx root folder to txfiles #151

Merged
merged 11 commits into from
Mar 19, 2024
38 changes: 19 additions & 19 deletions crates/node/src/txvalidation/download_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ pub async fn download_asset_file(
tracing::info!("download_file:{asset_file:?} local_directory_path:{local_directory_path:?} local_relative_file_path:{local_relative_file_path:?} http_peer_list:{http_peer_list:?}");

// Detect if the file already exist. If yes don't download.
if asset_file.exist(&local_relative_file_path).await {
if asset_file.exist(local_relative_file_path).await {
tracing::trace!(
"download_asset_file: File already exist, skip download: {:#?}",
asset_file.get_save_path()
Expand All @@ -40,7 +40,7 @@ pub async fn download_asset_file(
let file_uri = asset_file.get_uri();
let mut resp = match tokio::time::timeout(
tokio::time::Duration::from_secs(5),
http_client.get(asset_file.file.url).send(),
http_client.get(&asset_file.url).send(),
)
.await
{
Expand All @@ -50,7 +50,7 @@ pub async fn download_asset_file(
.iter()
.filter_map(|(peer, port)| {
port.map(|port| {
//use parse to create an URL, no new method.
// Use parse to create an URL, no new method.
let mut url =
reqwest::Url::parse(&format!("{HTTP_SERVER_SCHEME}localhost")).unwrap(); //unwrap always succeed
url.set_ip_host(peer.ip()).unwrap(); //unwrap always succeed
Expand All @@ -72,35 +72,35 @@ pub async fn download_asset_file(
}
resp.ok_or(eyre!(
"Download no host found to download the file: {}",
asset_file.file.name
asset_file.name
))?
}
Err(err) => {
return Err(eyre!(
"Download file: {:?}, request send timeout.",
asset_file.file.name
asset_file.name
));
}
};

if resp.status() == reqwest::StatusCode::OK {
let file_path = local_directory_path.join(&local_relative_file_path);
let file_path = local_directory_path.join(local_relative_file_path);
// Ensure any necessary subdirectories exists.
if let Some(parent) = file_path.parent() {
if let Ok(false) = tokio::fs::try_exists(parent).await {
tokio::fs::create_dir_all(parent).await?;
}
}

//create a tmp file during download.
//this way the file won't be available for download from the other nodes
//until it is completely written.
// Create a tmp file during download.
// This way the file won't be available for download from the other nodes
// until it is completely written.
let mut tmp_file_path = file_path.clone();
tmp_file_path.set_extension("tmp");
let fd = tokio::fs::File::create(&tmp_file_path).await?;
let mut fd = tokio::io::BufWriter::new(fd);

//create the Hasher to verify the Hash
// Create the Hasher to verify the Hash
let mut hasher = blake3::Hasher::new();

loop {
Expand All @@ -113,25 +113,25 @@ pub async fn download_asset_file(
Ok(Err(_)) => {
return Err(eyre!(
"Download file: {:?}, connection timeout",
asset_file.file.name
asset_file.name
));
}
Err(err) => {
return Err(eyre!(
"Download file: {:?}, http error:{err}",
asset_file.file.name
asset_file.name
));
}
}
}

fd.flush().await?;
let checksum: crate::types::Hash = (&hasher.finalize()).into();
if checksum != asset_file.file.checksum {
if checksum != asset_file.checksum {
Err(eyre!(
"download_file:{:?} bad checksum checksum:{checksum} set_file.checksum:{}.",
asset_file.file.name,
asset_file.file.checksum
asset_file.name,
asset_file.checksum
))
} else {
//rename to original name
Expand All @@ -140,14 +140,14 @@ pub async fn download_asset_file(
} else {
Err(eyre!(
"failed to download file: {:?} response status: {}",
asset_file.file.name,
asset_file.name,
resp.status()
))
}
}

//start the local server and serve the specified file path.
//Return the server task join handle.
// Start the local server and serve the specified file path.
// Return the server task join handle.
pub async fn serve_files(
mut bind_addr: SocketAddr,
http_download_port: u16,
Expand Down Expand Up @@ -207,7 +207,7 @@ async fn server_process_file(
let file = match tokio::fs::File::open(&file_path).await {
Ok(file) => file,
Err(_) => {
//try to see if the file is currently being updated.
// Try to see if the file is currently being updated.
file_path.set_extension("tmp");
let (status_code, message) = if file_path.as_path().exists() {
(
Expand Down
6 changes: 3 additions & 3 deletions crates/node/src/txvalidation/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ mod download_manager;
mod event;

// `ValidatedTxReceiver` provides a simple trait to decouple event based
// transaction handling from the execution part.
// Transaction handling from the execution part.
#[async_trait::async_trait]
pub trait ValidatedTxReceiver: Send + Sync {
async fn send_new_tx(&mut self, tx: Transaction<Validated>) -> eyre::Result<()>;
Expand Down Expand Up @@ -61,8 +61,8 @@ pub enum EventProcessError {

pub type CallbackSender = oneshot::Sender<Result<(), EventProcessError>>;

//Sending Tx interface.
//Some marker type to define the sender source.
// Sending Tx interface.
// Some marker type to define the sender source.
pub struct RpcSender;
#[derive(Clone)]
pub struct P2pSender;
Expand Down
121 changes: 69 additions & 52 deletions crates/node/src/types/file.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,17 @@
use crate::types::transaction;
use crate::types::transaction::ProgramData;
use crate::types::Hash;
use eyre::Result;
use serde::Deserialize;
use serde::Serialize;
use std::path::Path;
use std::path::PathBuf;

// List of folder where the different file type are stored.
pub const IMAGES_DIR: &str = "images";
pub const TX_FILES_DIR: &str = "txfiles";
pub const VM_FILES_DIR: &str = "vmfiles";

// Describe a file use by an executed task.
#[derive(Clone, Debug)]
pub struct TaskVmFile<E> {
Expand All @@ -22,6 +28,7 @@ impl TaskVmFile<()> {
pub fn get_workspace_path(data_directory: &Path, tx_hash: Hash) -> PathBuf {
PathBuf::new()
.join(data_directory)
.join(TX_FILES_DIR)
.join(tx_hash.to_string())
.join(gevulot_shim::WORKSPACE_NAME)
}
Expand All @@ -46,15 +53,14 @@ impl TaskVmFile<VmInput> {
let to = PathBuf::new()
.join(data_dir)
.join(tx_file.get_relatif_path());
tracing::trace!("TaskVmFile copy_file_for_vm_exe to:{to:?}",);
if !tokio::fs::try_exists(&to).await.unwrap_or(false) {
let from = PathBuf::new().join(data_dir).join(&self.extension.0);
if let Some(parent) = to.parent() {
tokio::fs::create_dir_all(parent)
.await
.map_err(|err| format!("mkdir {parent:?} fail:{err}"))?;
}
tracing::trace!("TaskVmFile copy_file_for_vm_exe from:{from:?} to:{to:?}",);
tracing::debug!("TaskVmFile copy_file_for_vm_exe from:{from:?} to:{to:?}",);
tokio::fs::copy(&from, &to)
.await
.map_err(|err| format!("copy file from:{from:?} to:{to:?} error:{err}"))?;
Expand Down Expand Up @@ -124,7 +130,8 @@ impl TaskVmFile<VmOutput> {
file_path = file_path.strip_prefix("/").unwrap(); // Unwrap tested in `is_absolute()`.
}

let mut path = PathBuf::from(&self.extension.0.to_string());
let mut path = PathBuf::from(TX_FILES_DIR);
path.push(self.extension.0.to_string());
path.push(file_path);
path
}
Expand Down Expand Up @@ -187,42 +194,53 @@ pub struct DbFile {
// AssetFile: Use to download the file asset associated to a Tx.
#[derive(Clone, Debug, Default, Deserialize, Serialize, PartialEq)]
pub struct AssetFile {
tx_hash: String,
pub name: String,
pub file_path: PathBuf,
pub url: String,
pub checksum: Hash,
// Verify_exist: define if the exist() verification do a real file system verification.
// Some file must be download even if there's already present.
verify_exist: bool,
pub file: DbFile,
pub verify_exist: bool,
}

impl AssetFile {
pub fn new(
name: String,
url: String,
checksum: Hash,
tx_hash: String,
verify_exist: bool,
) -> Self {
AssetFile {
tx_hash,
verify_exist,
file: DbFile {
name,
url,
pub fn new_from_program_data(data: &ProgramData, tx_hash: Hash) -> Result<Option<Self>> {
match data {
ProgramData::Input {
file_name,
file_url,
checksum,
},
} => {
//verify the url is valide.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
//verify the url is valide.
// Verify the URL is valid.

reqwest::Url::parse(file_url)?;
let mut file_name_path = Path::new(&file_name);
if file_name_path.is_absolute() {
file_name_path = file_name_path.strip_prefix("/").unwrap(); // Unwrap tested in `is_absolute()`.
}

// Build file path
let mut file_path = PathBuf::from(TX_FILES_DIR);
file_path.push(tx_hash.to_string());
file_path.push(file_name_path);
Ok(Some(AssetFile {
name: file_name.to_string(),
file_path,
url: file_url.to_string(),
checksum: checksum.to_string().into(),
verify_exist: false,
}))
}
ProgramData::Output { .. } => {
/* ProgramData::Output as input means it comes from another
program execution -> skip this branch. */
Ok(None)
}
}
}

// Get relative File path for downloaded files to be saved on the node.
// The path is is <Tx Hash>/<self.name>
pub fn get_save_path(&self) -> PathBuf {
let mut file_path = Path::new(&self.file.name);
if file_path.is_absolute() {
file_path = file_path.strip_prefix("/").unwrap(); // Unwrap tested in `is_absolute()`.
}

let mut path = PathBuf::from(&self.tx_hash);
path.push(file_path);
path
pub fn get_save_path(&self) -> &Path {
&self.file_path
}

// Get relative File path for downloaded files to be saved on the node.
Expand Down Expand Up @@ -271,26 +289,24 @@ impl TxFile<Output> {
}

pub fn into_download_file(self, tx_hash: Hash) -> AssetFile {
let relative_path = self.get_relatif_path(tx_hash).to_str().unwrap().to_string();
let url = format!("{}/{}", self.url, relative_path);
let relative_path = self.get_relatif_path(tx_hash);
let url = format!("{}/{}", self.url, relative_path.to_str().unwrap());

let file_name = Path::new(&self.name).file_name().unwrap_or_default();
let mut path = PathBuf::from(self.checksum.to_string());
path.push(file_name);
AssetFile::new(
path.to_str().unwrap().to_string(),
AssetFile {
name: self.name,
file_path: relative_path,
url,
self.checksum,
tx_hash.to_string(),
true,
)
checksum: self.checksum,
verify_exist: true,
}
}

// Relative File path for Proof or Verify Tx file.
// The path is <Tx Hash>/<self.checksum>/<filename>
pub fn get_relatif_path(&self, tx_hash: Hash) -> PathBuf {
let file_name = Path::new(&self.name).file_name().unwrap_or_default();
let mut path = PathBuf::from(tx_hash.to_string());
let mut path = PathBuf::from(TX_FILES_DIR);
path.push(tx_hash.to_string());
path.push(self.checksum.to_string());
path.push(file_name);
path
Expand Down Expand Up @@ -327,15 +343,16 @@ impl TxFile<Image> {

impl From<TxFile<Image>> for AssetFile {
fn from(file: TxFile<Image>) -> Self {
//image file has the image directory happened at the beginning.
let mut extention = PathBuf::from("images");
extention.push(file.extention.0.to_string());
AssetFile::new(
file.name,
file.url,
file.checksum,
extention.to_str().unwrap().to_string(),
false,
)
let mut file_path = PathBuf::from(IMAGES_DIR);
file_path.push(file.extention.0.to_string()); //Tx hash
file_path.push(&file.name);

AssetFile {
name: file.name,
file_path,
url: file.url,
checksum: file.checksum,
verify_exist: false,
}
}
}
Loading
Loading