Skip to content

Commit

Permalink
fix storage
Browse files Browse the repository at this point in the history
Signed-off-by: dierbei <[email protected]>
  • Loading branch information
dierbei committed Aug 15, 2024
1 parent 85ec6c2 commit 774a9e4
Show file tree
Hide file tree
Showing 8 changed files with 531 additions and 191 deletions.
16 changes: 8 additions & 8 deletions examples/simple_run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,6 @@ use slog::{error, info};
use std::collections::HashMap;
use std::net::SocketAddr;
use std::str::FromStr;
use std::thread;
use tokio::runtime::Runtime;
use tokio::time::Duration;

use raft_rs::network::{NetworkLayer, TCPManager};
Expand All @@ -34,7 +32,7 @@ async fn main() {
.clone()
.iter()
.map(|n| ServerConfig {
election_timeout: Duration::from_millis(200),
election_timeout: Duration::from_millis(1000),
address: n.address,
default_leader: Some(1u32),
leadership_preferences: HashMap::new(),
Expand All @@ -47,10 +45,11 @@ async fn main() {
for (i, config) in configs.into_iter().enumerate() {
let id = cluster_nodes[i];
let cc = cluster_config.clone();
handles.push(thread::spawn(move || {
let rt = Runtime::new().unwrap();
let mut server = Server::new(id, config, cc);
rt.block_on(server.start());
handles.push(tokio::spawn(async move {
// let rt = Runtime::new().unwrap();
let mut server = Server::new(id, config, cc).await;
server.start().await;
// rt.block_on(server.start());
}));
}

Expand All @@ -60,7 +59,8 @@ async fn main() {
tokio::time::sleep(Duration::from_secs(2)).await;
// Join all server threads
for handle in handles {
handle.join().unwrap();
// handle.join().unwrap();
handle.await.unwrap();
}
}

Expand Down
38 changes: 22 additions & 16 deletions examples/simulate_add_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@ use slog::error;
use std::collections::HashMap;
use std::net::SocketAddr;
use std::str::FromStr;
use std::thread;
use tokio::runtime::Runtime;
use tokio::time::Duration;

use raft_rs::network::{NetworkLayer, TCPManager};
Expand All @@ -17,15 +15,15 @@ use raft_rs::server::{Server, ServerConfig};
#[tokio::main]
async fn main() {
// Define cluster configuration
let mut cluster_nodes = vec![1, 2, 3, 4, 5];
let cluster_nodes = vec![1, 2, 3, 4, 5];
let peers = vec![
NodeMeta::from((1, SocketAddr::from_str("127.0.0.1:5001").unwrap())),
NodeMeta::from((2, SocketAddr::from_str("127.0.0.1:5002").unwrap())),
NodeMeta::from((3, SocketAddr::from_str("127.0.0.1:5003").unwrap())),
NodeMeta::from((4, SocketAddr::from_str("127.0.0.1:5004").unwrap())),
NodeMeta::from((5, SocketAddr::from_str("127.0.0.1:5005").unwrap())),
];
let mut cluster_config = ClusterConfig::new(peers.clone());
let cluster_config = ClusterConfig::new(peers.clone());
// Create server configs
let configs: Vec<_> = peers
.clone()
Expand All @@ -44,10 +42,11 @@ async fn main() {
for (i, config) in configs.into_iter().enumerate() {
let id = cluster_nodes[i];
let cc = cluster_config.clone();
handles.push(thread::spawn(move || {
let rt = Runtime::new().unwrap();
let mut server = Server::new(id, config, cc);
rt.block_on(server.start());
handles.push(tokio::spawn(async move {
// let rt = Runtime::new().unwrap();
let mut server = Server::new(id, config, cc).await;
server.start().await;
// rt.block_on(server.start());
}));
}

Expand All @@ -66,10 +65,11 @@ async fn main() {
};

// Launching a new node
handles.push(thread::spawn(move || {
let rt = Runtime::new().unwrap();
let mut server = Server::new(new_node_id, new_node_conf, cluster_config);
rt.block_on(server.start());
handles.push(tokio::spawn(async move {
// let rt = Runtime::new().unwrap();
let mut server = Server::new(new_node_id, new_node_conf, cluster_config).await;
server.start().await;
// rt.block_on(server.start());
}));

// Simulate sending a Raft Join request after a few seconds
Expand All @@ -79,15 +79,15 @@ async fn main() {

// Wait for all servers to finish
for handle in handles {
handle.join().unwrap();
// handle.join().unwrap();
handle.await.unwrap();
}
}

async fn add_node_request(new_node_id: u32, addr: SocketAddr) {
let log = get_logger();

let server_address = addr;
let network_manager = TCPManager::new(server_address);
let network_manager = TCPManager::new(addr);

let request_data = vec![
new_node_id.to_be_bytes().to_vec(),
Expand All @@ -98,7 +98,13 @@ async fn add_node_request(new_node_id: u32, addr: SocketAddr) {
.concat();

// Let's assume that 5001 is the port of the leader node.
if let Err(e) = network_manager.send(&server_address, &request_data).await {
if let Err(e) = network_manager
.send(
&SocketAddr::from_str("127.0.0.1:5001").unwrap(),
&request_data,
)
.await
{
error!(log, "Failed to send client request: {}", e);
}
}
4 changes: 2 additions & 2 deletions examples/simulate_node_failure.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ async fn main() {
let id = cluster_nodes[i];
let cc = cluster_config.clone();
let server_handle = tokio::spawn(async move {
let mut server = Server::new(id, config, cc);
let mut server = Server::new(id, config, cc).await;
server.start().await;
});
server_handles.push(server_handle);
Expand Down Expand Up @@ -77,7 +77,7 @@ async fn main() {
};
let cc = cluster_config.clone();
let server_handle = tokio::spawn(async move {
let mut server = Server::new(server_to_stop.try_into().unwrap(), config, cc);
let mut server = Server::new(server_to_stop.try_into().unwrap(), config, cc).await;
server.start().await;
});
server_handles[server_to_stop - 1] = server_handle;
Expand Down
148 changes: 148 additions & 0 deletions src/c.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
fn get_random_file_path() -> std::io::Result<PathBuf> {
let filename: String = rand::thread_rng()
.sample_iter(&Alphanumeric)
.take(10)
.map(char::from)
.collect();

let filepath = PathBuf::from(format!("/tmp/.tmp{}", filename));

Ok(filepath)
}

// fn remove_random_file(path: PathBuf) -> std::io::Result<()> {
// std::fs::remove_file(path)?;

// Ok(())
// }

fn file_exists(path: &PathBuf) -> bool {
std::path::Path::new(path).exists()
}

#[test]
fn test_retrieve_checksum() {
let data_str = "Some data followed by a checksum".as_bytes();
let calculated_checksum = calculate_checksum(data_str);

let data = [data_str, calculated_checksum.as_slice()].concat();

let retrieved_checksum = retrieve_checksum(&data);
assert_eq!(calculated_checksum, retrieved_checksum);
}

#[tokio::test]
async fn test_store_async() {
let random_path = get_random_file_path().unwrap();

let storage: Box<dyn Storage> = Box::new(LocalStorage::new_from_path(&random_path).await);
let payload_data = "Some data to test raft".as_bytes();
let store_result = storage.store(payload_data).await;
assert!(store_result.is_ok());

let disk_data = storage.retrieve().await.unwrap();
assert_eq!(payload_data.len() + CHECKSUM_LEN, disk_data.len());

let data = &disk_data[..disk_data.len() - CHECKSUM_LEN];
assert_eq!(payload_data, data);

storage.delete().await.unwrap();
}

#[tokio::test]
async fn test_delete() {
let random_path = get_random_file_path().unwrap();

let storage: Box<dyn Storage> = Box::new(LocalStorage::new_from_path(&random_path).await);
let delete_result = storage.delete().await;
assert!(delete_result.is_ok());
assert!(!file_exists(&random_path));
}

#[tokio::test]
async fn test_compaction_file_lt_max_file_size() {
let random_path = get_random_file_path().unwrap();

let storage: Box<dyn Storage> = Box::new(LocalStorage::new_from_path(&random_path).await);
let mock_data = vec![0u8; 1_000_000 /*1 MB*/ - 500];
let store_result = storage.store(&mock_data).await;
assert!(store_result.is_ok());

let compaction_result = storage.compaction().await;
assert!(compaction_result.is_ok());
assert!(file_exists(&random_path));

storage.delete().await.unwrap();
}

#[tokio::test]
async fn test_compaction_file_gt_max_file_size() {
let random_path = get_random_file_path().unwrap();

let storage: Box<dyn Storage> = Box::new(LocalStorage::new_from_path(&random_path).await);
let mock_data = vec![0u8; 1_000_000 /*1 MB*/];
let store_result = storage.store(&mock_data).await;
assert!(store_result.is_ok());

let compaction_result = storage.compaction().await;
assert!(compaction_result.is_ok());

assert!(!file_exists(&random_path));
}

#[tokio::test]
async fn test_retrieve_data() {
let random_path = get_random_file_path().unwrap();

let storage: Box<dyn Storage> = Box::new(LocalStorage::new_from_path(&random_path).await);
let log_entry_size = std::mem::size_of::<LogEntry>();

// Insert the first data first
let entry1 = LogEntry {
leader_id: 1,
server_id: 1,
term: 1,
command: LogCommand::Set,
data: 1,
};
let serialize_data = bincode::serialize(&entry1).unwrap();
storage.store(&serialize_data).await.unwrap();
let disk_data = storage.retrieve().await.unwrap();
let log_entry_bytes = &disk_data[0..log_entry_size];
let disk_entry: LogEntry = bincode::deserialize(log_entry_bytes).unwrap();
assert_eq!(entry1, disk_entry);

// Then insert the second data
let entry2 = LogEntry {
leader_id: 2,
server_id: 2,
term: 2,
command: LogCommand::Set,
data: 2,
};
let serialize_data = bincode::serialize(&entry2).unwrap();
storage.store(&serialize_data).await.unwrap();
let disk_data = storage.retrieve().await.unwrap();

// Try to read two pieces of data and sit down to compare
let mut log_entrys = vec![];
let mut cursor = Cursor::new(&disk_data);
loop {
let mut bytes_data = vec![0u8; log_entry_size];
if cursor.read_exact(&mut bytes_data).is_err() {
break;
}
let struct_data: LogEntry = bincode::deserialize(&bytes_data).unwrap();

let mut checksum = [0u8; CHECKSUM_LEN];
if cursor.read_exact(&mut checksum).is_err() {
break;
}

log_entrys.push(struct_data);
}

assert_eq!(vec![entry1, entry2], log_entrys);

storage.delete().await.unwrap();
}
4 changes: 2 additions & 2 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ pub enum NetworkError {
ConnectError(SocketAddr),
#[error("Failed binding to {0}")]
BindError(SocketAddr),
#[error("Broadcast failed")]
BroadcastError,
#[error("Broadcast failed, errmsg: {0}")]
BroadcastError(String),
}

#[derive(Error, Debug)]
Expand Down
2 changes: 1 addition & 1 deletion src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ impl NetworkLayer for TCPManager {
.into_iter()
.collect::<std::result::Result<_, _>>()
// FIXME: We should let client decide what to do with the errors
.map_err(|_e| NetworkError::BroadcastError)?;
.map_err(|e| NetworkError::BroadcastError(e.to_string()))?;
Ok(())
}

Expand Down
Loading

0 comments on commit 774a9e4

Please sign in to comment.