diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml
index 59b697e..04dadf8 100644
--- a/.github/workflows/test.yml
+++ b/.github/workflows/test.yml
@@ -43,7 +43,7 @@ jobs:
       - name: Clippy
         uses: clechasseur/rs-clippy-check@v3
         with:
-          args: --all-features --bins --examples --tests --benches -- -D warnings
+          args: --all-features --bins --examples --tests --benches -- -W clippy::all -W clippy::pedantic -D warnings
 
   tests:
     needs: pre_job
diff --git a/src/commands/clean.rs b/src/commands/clean.rs
index 0f36ebc..5798f45 100644
--- a/src/commands/clean.rs
+++ b/src/commands/clean.rs
@@ -9,7 +9,7 @@ pub fn cmd(data_dir: &Path) -> Result<()> {
             .context(format!("failed to remove directory {}", data_dir.display()))?;
         info!("Successfully removed : {}", data_dir.to_string_lossy());
     } else {
-        warn!("Data directory does not exist")
+        warn!("Data directory does not exist");
    }
 
     Ok(())
diff --git a/src/commands/config/apply.rs b/src/commands/config/apply.rs
index e8d5595..7b456d5 100644
--- a/src/commands/config/apply.rs
+++ b/src/commands/config/apply.rs
@@ -13,12 +13,12 @@ fn apply_service_config(
     plugin_name: &str,
     plugin_version: &str,
     service_name: &str,
-    config: HashMap<String, Value>,
+    config: &HashMap<String, Value>,
     admin_socket: &Path,
 ) -> Result<()> {
     let mut queries: Vec<String> = Vec::new();
 
-    for (key, value) in &config {
+    for (key, value) in config {
         let value = serde_json::to_string(&value)
             .context(format!("failed to serialize the string with key {key}"))?;
         queries.push(format!(
@@ -83,12 +83,12 @@ pub fn cmd(config_path: &Path, data_dir: &Path) -> Result<()> {
             &cargo_manifest.package.name,
             &cargo_manifest.package.version,
             &service_name,
-            service_config,
+            &service_config,
             &admin_socket,
         )
         .context(format!(
             "failed to apply service config for service {service_name}"
-        ))?
+        ))?;
     }
 
     Ok(())
diff --git a/src/commands/lib.rs b/src/commands/lib.rs
index b723e4c..98cfeda 100644
--- a/src/commands/lib.rs
+++ b/src/commands/lib.rs
@@ -6,6 +6,7 @@ pub enum BuildType {
     Debug,
 }
 
+#[allow(clippy::needless_pass_by_value)]
 pub fn cargo_build(build_type: BuildType) -> Result<()> {
     let output = match build_type {
         BuildType::Release => Command::new("cargo")
diff --git a/src/commands/plugin/new.rs b/src/commands/plugin/new.rs
index b34e11e..ad18175 100644
--- a/src/commands/plugin/new.rs
+++ b/src/commands/plugin/new.rs
@@ -4,7 +4,7 @@ use std::{
     ffi::OsStr,
     fs::{self, File},
     io::Write,
-    path::{Path, PathBuf},
+    path::Path,
     process::Command,
 };
 
@@ -71,14 +71,14 @@ where
     Ok(())
 }
 
-fn workspace_init(root_path: &PathBuf, project_name: &str) -> Result<()> {
+fn workspace_init(root_path: &Path, project_name: &str) -> Result<()> {
     let cargo_toml_path = root_path.join("Cargo.toml");
 
     let mut cargo_toml =
         File::create(cargo_toml_path).context("failed to create Cargo.toml for workspace")?;
 
     cargo_toml
-        .write_all(format!("[workspace]\nmembers = [\n \"{}\",\n]", project_name).as_bytes())?;
+        .write_all(format!("[workspace]\nmembers = [\n \"{project_name}\",\n]").as_bytes())?;
 
     fs::copy(
         root_path.join(project_name).join("topology.toml"),
@@ -114,7 +114,7 @@ pub fn cmd(path: Option<&Path>, without_git: bool, init_workspace: bool) -> Resu
     let plugin_path = if init_workspace {
         path.join(project_name)
     } else {
-        path.to_path_buf()
+        path.clone()
     };
 
     std::fs::create_dir_all(&plugin_path)
diff --git a/src/commands/plugin/pack.rs b/src/commands/plugin/pack.rs
index 21402c1..5765f74 100644
--- a/src/commands/plugin/pack.rs
+++ b/src/commands/plugin/pack.rs
@@ -65,7 +65,7 @@ pub fn cmd(pack_debug: bool) -> Result<()> {
     )
     .context("failed to parse Cargo.toml")?;
 
-    let normalized_package_name = cargo_manifest.package.name.replace("-", "_");
+    let normalized_package_name = cargo_manifest.package.name.replace('-', "_");
 
     let compressed_file = File::create(format!(
         "target/{}-{}.tar.gz",
@@ -77,7 +77,7 @@ pub fn cmd(pack_debug: bool) -> Result<()> {
     let lib_name = format!("lib{normalized_package_name}.{LIB_EXT}");
 
     let mut lib_file =
-        File::open(build_dir.join(&lib_name)).context(format!("failed to open {}", lib_name))?;
+        File::open(build_dir.join(&lib_name)).context(format!("failed to open {lib_name}"))?;
 
     let mut manifest_file =
         File::open(build_dir.join("manifest.yaml")).context("failed to open file manifest.yaml")?;
diff --git a/src/commands/run.rs b/src/commands/run.rs
index d15972b..f28d732 100644
--- a/src/commands/run.rs
+++ b/src/commands/run.rs
@@ -58,20 +58,20 @@ fn enable_plugins(
             continue;
         };
         for service in services {
-            let plugin_dir = plugins_dir.join(service.plugin.clone());
+            let current_plugin_dir = plugins_dir.join(service.plugin.clone());
 
-            if !plugin_dir.exists() {
+            if !current_plugin_dir.exists() {
                 bail!(
                     "directory {} does not exist, run \"cargo build\" inside plugin directory",
-                    plugin_dir.display()
+                    current_plugin_dir.display()
                 );
             }
             plugins.entry(service.plugin.clone()).or_insert_with(|| {
-                let mut versions: Vec<_> = fs::read_dir(plugin_dir)
+                let mut versions: Vec<_> = fs::read_dir(current_plugin_dir)
                     .unwrap()
                     .map(|r| r.unwrap())
                     .collect();
-                versions.sort_by_key(|dir| dir.path());
+                versions.sort_by_key(std::fs::DirEntry::path);
                 versions
                     .last()
                     .unwrap()
@@ -85,8 +85,7 @@ fn enable_plugins(
     let mut queries: Vec<String> = Vec::new();
 
     // Queries to set migration variables, order of commands is not important
-    push_migration_envs_queries(&mut queries, topology, &plugins)
.context("failed to push migration variables")?; + push_migration_envs_queries(&mut queries, topology, &plugins); for (plugin, version) in &plugins { queries.push(format!(r#"CREATE PLUGIN "{plugin}" {version};"#)); @@ -134,10 +133,10 @@ fn push_migration_envs_queries( queries: &mut Vec, topology: &Topology, plugins: &HashMap, -) -> Result<()> { +) { info!("setting migration variables"); - for (_, tier) in &topology.tiers { + for tier in topology.tiers.values() { let Some(migration_envs) = &tier.migration_envs else { continue; }; @@ -150,8 +149,6 @@ fn push_migration_envs_queries( } } } - - Ok(()) } fn kill_picodata_instances() -> Result<()> { @@ -169,9 +166,9 @@ pub fn cmd( topology_path: &PathBuf, data_dir: &Path, disable_plugin_install: bool, - base_http_port: &i32, + base_http_port: i32, picodata_path: &PathBuf, - base_pg_port: &i32, + base_pg_port: i32, use_release: bool, ) -> Result<()> { fs::create_dir_all(data_dir).unwrap(); @@ -213,7 +210,7 @@ pub fn cmd( let bin_port = 3000 + instance_id; let http_port = base_http_port + instance_id; let pg_port = base_pg_port + instance_id; - let instance_data_dir = data_dir.join("cluster").join(format!("i_{}", instance_id)); + let instance_data_dir = data_dir.join("cluster").join(format!("i_{instance_id}")); // TODO: make it as child processes with catch output and redirect it to main // output @@ -227,34 +224,31 @@ pub fn cmd( "--plugin-dir", plugins_dir, "--listen", - &format!("127.0.0.1:{}", bin_port), + &format!("127.0.0.1:{bin_port}"), "--peer", - &format!("127.0.0.1:{}", first_instance_bin_port), + &format!("127.0.0.1:{first_instance_bin_port}"), "--init-replication-factor", &tier.replication_factor.to_string(), "--http-listen", - &format!("127.0.0.1:{}", http_port), + &format!("127.0.0.1:{http_port}"), "--pg-listen", - &format!("127.0.0.1:{}", pg_port), + &format!("127.0.0.1:{pg_port}"), "--tier", tier_name, ]) .spawn() - .context(format!( - "failed to start picodata instance: {}", - instance_id - ))?; + .context(format!("failed to start picodata instance: {instance_id}"))?; thread::sleep(Duration::from_secs(5)); // Save pid of picodata process to kill it after let pid = process.id(); let pid_location = instance_data_dir.join("pid"); let mut file = File::create(pid_location)?; - writeln!(file, "{}", pid)?; + writeln!(file, "{pid}")?; let processes_lock = Arc::clone(&get_picodata_processes()); let mut processes = processes_lock.lock().unwrap(); - processes.push(process) + processes.push(process); } } diff --git a/src/main.rs b/src/main.rs index fc511d3..a15e180 100644 --- a/src/main.rs +++ b/src/main.rs @@ -129,21 +129,21 @@ fn main() -> Result<()> { &topology, &data_dir, disable_plugin_install, - &base_http_port, + base_http_port, &picodata_path, - &base_pg_port, + base_pg_port, release, ) .context("failed to execute Run command")?, Command::Stop { data_dir } => { - commands::stop::cmd(&data_dir).context("failed to execute \"stop\" command")? + commands::stop::cmd(&data_dir).context("failed to execute \"stop\" command")?; } Command::Clean { data_dir } => { - commands::clean::cmd(&data_dir).context("failed to execute \"clean\" command")? + commands::clean::cmd(&data_dir).context("failed to execute \"clean\" command")?; } Command::Plugin { command } => match command { Plugin::Pack { debug } => { - commands::plugin::pack::cmd(debug).context("failed to execute \"pack\" command")? 
+                commands::plugin::pack::cmd(debug).context("failed to execute \"pack\" command")?;
             }
             Plugin::New {
                 path,
diff --git a/tests/helpers/helpers.rs b/tests/helpers/helpers.rs
deleted file mode 100644
index 45d342b..0000000
--- a/tests/helpers/helpers.rs
+++ /dev/null
@@ -1,184 +0,0 @@
-use log::info;
-use std::ffi::OsStr;
-use std::io::{BufRead, BufReader, Write};
-use std::thread;
-use std::{
-    fs::{self},
-    io::ErrorKind,
-    path::Path,
-    process::{Child, Command, Stdio},
-    time::{Duration, Instant},
-};
-
-pub const PLUGIN_DIR: &str = "./tests/test_plugin/";
-pub const TESTS_DIR: &str = "./tests";
-
-pub struct Cluster {}
-
-impl Drop for Cluster {
-    fn drop(&mut self) {
-        run_pike(vec!["stop"], PLUGIN_DIR.to_string()).unwrap();
-    }
-}
-
-impl Cluster {
-    pub fn new() -> Cluster {
-        info!("cleaning artefacts from previous run");
-
-        match fs::remove_file(Path::new(TESTS_DIR).join("instance.log")) {
-            Ok(_) => info!("Clearing logs."),
-            Err(e) if e.kind() == ErrorKind::NotFound => {
-                info!("instance.log not found, skipping cleanup")
-            }
-            Err(e) => panic!("failed to delete instance.log: {}", e),
-        }
-
-        match fs::remove_dir_all(PLUGIN_DIR) {
-            Ok(_) => info!("clearing test plugin dir."),
-            Err(e) if e.kind() == ErrorKind::NotFound => {
-                info!("plugin dir not found, skipping cleanup")
-            }
-            Err(e) => panic!("failed to delete plugin_dir: {}", e),
-        }
-
-        Cluster {}
-    }
-}
-
-pub fn run_cluster(timeout: Duration, total_instances: i32) -> Result<Cluster, std::io::Error> {
-    // Set up cleanup function
-    let cluster_handle = Cluster::new();
-
-    // Create plugin from template
-    let mut plugin_creation_proc =
-        run_pike(vec!["plugin", "new", "test_plugin"], TESTS_DIR).unwrap();
-
-    wait_for_proc(&mut plugin_creation_proc, Duration::from_secs(10));
-
-    // Build the plugin
-    Command::new("cargo")
-        .args(vec!["build"])
-        .current_dir(PLUGIN_DIR)
-        .output()?;
-
-    // Setup the cluster
-    run_pike(vec!["run"], PLUGIN_DIR).unwrap();
-
-    let start_time = Instant::now();
-
-    // Run in the loop until we get info about successful plugin installation
-    loop {
-        // Check if cluster set up correctly
-        let mut picodata_admin = await_picodata_admin(Duration::from_secs(60))?;
-        let stdout = picodata_admin
-            .stdout
-            .take()
-            .expect("Failed to capture stdout");
-
-        if start_time.elapsed() >= timeout {
-            panic!("cluster setup timeouted");
-        }
-
-        let queries = vec![
-            r#"SELECT enabled FROM _pico_plugin;"#,
-            r#"SELECT current_state FROM _pico_instance;"#,
-            r#"\help;"#,
-        ];
-
-        // New scope to avoid infinite cycle while reading picodata stdout
-        {
-            let picodata_stdin = picodata_admin.stdin.as_mut().unwrap();
-            for query in queries {
-                picodata_stdin.write_all(query.as_bytes()).unwrap()
-            }
-            picodata_admin.wait().unwrap();
-        }
-
-        let mut plugin_ready = false;
-        let mut can_connect = false;
-        let mut online_instances_counter = 0;
-
-        let reader = BufReader::new(stdout);
-        for line in reader.lines() {
-            let line = line.expect("failed to read picodata stdout");
-            println!("{}", line);
-
-            if line.contains("true") {
-                plugin_ready = true;
-            }
-            if line.contains("Connected to admin console by socket") {
-                can_connect = true;
-            }
-            if line.contains("Online") {
-                online_instances_counter += 1;
-            }
-        }
-
-        picodata_admin.kill().unwrap();
-
-        if can_connect && plugin_ready && online_instances_counter == total_instances {
-            return Ok(cluster_handle);
-        }
-
-        thread::sleep(Duration::from_secs(5));
-    }
-}
-
-pub fn run_pike<A, P>(args: Vec<A>, current_dir: P) -> Result<Child, std::io::Error>
-where
-    A: AsRef<OsStr>,
-    P: AsRef<Path>,
-{
-    let root_dir = std::env::var("CARGO_MANIFEST_DIR").unwrap();
std::env::var("CARGO_MANIFEST_DIR").unwrap(); - Command::new(format!("{root_dir}/target/debug/cargo-pike")) - .arg("pike") - .args(args) - .current_dir(current_dir) - .spawn() -} - -pub fn wait_for_proc(proc: &mut Child, timeout: Duration) { - let start_time = Instant::now(); - - loop { - if start_time.elapsed() >= timeout { - panic!("Process hanging for too long"); - } - - match proc.try_wait().unwrap() { - Some(_) => { - break; - } - None => { - std::thread::sleep(std::time::Duration::from_millis(100)); - } - } - } -} - -pub fn await_picodata_admin(timeout: Duration) -> Result { - let start_time = Instant::now(); - - loop { - if start_time.elapsed() >= timeout { - panic!("process hanging for too long"); - } - - let picodata_admin = Command::new("picodata") - .arg("admin") - .arg("./tests/test_plugin/tmp/cluster/i_1/admin.sock") - .stdin(Stdio::piped()) - .stdout(Stdio::piped()) - .spawn(); - - match picodata_admin { - Ok(process) => { - info!("successfully connected to picodata cluster."); - return Ok(process); - } - Err(_) => { - std::thread::sleep(Duration::from_secs(1)); - } - } - } -} diff --git a/tests/helpers/mod.rs b/tests/helpers/mod.rs index 39ef443..914d22d 100644 --- a/tests/helpers/mod.rs +++ b/tests/helpers/mod.rs @@ -1 +1,184 @@ -pub(crate) mod helpers; +use log::info; +use std::ffi::OsStr; +use std::io::{BufRead, BufReader, Write}; +use std::thread; +use std::{ + fs::{self}, + io::ErrorKind, + path::Path, + process::{Child, Command, Stdio}, + time::{Duration, Instant}, +}; + +pub const PLUGIN_DIR: &str = "./tests/test_plugin/"; +pub const TESTS_DIR: &str = "./tests"; + +pub struct Cluster {} + +impl Drop for Cluster { + fn drop(&mut self) { + run_pike(vec!["stop"], PLUGIN_DIR).unwrap(); + } +} + +impl Cluster { + pub fn new() -> Cluster { + info!("cleaning artefacts from previous run"); + + match fs::remove_file(Path::new(TESTS_DIR).join("instance.log")) { + Ok(()) => info!("Clearing logs."), + Err(e) if e.kind() == ErrorKind::NotFound => { + info!("instance.log not found, skipping cleanup"); + } + Err(e) => panic!("failed to delete instance.log: {e}"), + } + + match fs::remove_dir_all(PLUGIN_DIR) { + Ok(()) => info!("clearing test plugin dir."), + Err(e) if e.kind() == ErrorKind::NotFound => { + info!("plugin dir not found, skipping cleanup"); + } + Err(e) => panic!("failed to delete plugin_dir: {e}"), + } + + Cluster {} + } +} + +pub fn run_cluster(timeout: Duration, total_instances: i32) -> Result { + // Set up cleanup function + let cluster_handle = Cluster::new(); + + // Create plugin from template + let mut plugin_creation_proc = + run_pike(vec!["plugin", "new", "test_plugin"], TESTS_DIR).unwrap(); + + wait_for_proc(&mut plugin_creation_proc, Duration::from_secs(10)); + + // Build the plugin + Command::new("cargo") + .args(vec!["build"]) + .current_dir(PLUGIN_DIR) + .output()?; + + // Setup the cluster + run_pike(vec!["run"], PLUGIN_DIR).unwrap(); + + let start_time = Instant::now(); + + // Run in the loop until we get info about successful plugin installation + loop { + // Check if cluster set up correctly + let mut picodata_admin = await_picodata_admin(Duration::from_secs(60))?; + let stdout = picodata_admin + .stdout + .take() + .expect("Failed to capture stdout"); + + assert!(start_time.elapsed() < timeout, "cluster setup timeouted"); + + let queries = vec![ + r"SELECT enabled FROM _pico_plugin;", + r"SELECT current_state FROM _pico_instance;", + r"\help;", + ]; + + // New scope to avoid infinite cycle while reading picodata stdout + { + let picodata_stdin = 
+            for query in queries {
+                picodata_stdin.write_all(query.as_bytes()).unwrap();
+            }
+            picodata_admin.wait().unwrap();
+        }
+
+        let mut plugin_ready = false;
+        let mut can_connect = false;
+        let mut online_instances_counter = 0;
+
+        let reader = BufReader::new(stdout);
+        for line in reader.lines() {
+            let line = line.expect("failed to read picodata stdout");
+            println!("{line}");
+
+            if line.contains("true") {
+                plugin_ready = true;
+            }
+            if line.contains("Connected to admin console by socket") {
+                can_connect = true;
+            }
+            if line.contains("Online") {
+                online_instances_counter += 1;
+            }
+        }
+
+        picodata_admin.kill().unwrap();
+
+        if can_connect && plugin_ready && online_instances_counter == total_instances {
+            return Ok(cluster_handle);
+        }
+
+        thread::sleep(Duration::from_secs(5));
+    }
+}
+
+pub fn run_pike<A, P>(args: Vec<A>, current_dir: P) -> Result<Child, std::io::Error>
+where
+    A: AsRef<OsStr>,
+    P: AsRef<Path>,
+{
+    let root_dir = std::env::var("CARGO_MANIFEST_DIR").unwrap();
+    Command::new(format!("{root_dir}/target/debug/cargo-pike"))
+        .arg("pike")
+        .args(args)
+        .current_dir(current_dir)
+        .spawn()
+}
+
+pub fn wait_for_proc(proc: &mut Child, timeout: Duration) {
+    let start_time = Instant::now();
+
+    loop {
+        assert!(
+            start_time.elapsed() < timeout,
+            "Process hanging for too long"
+        );
+
+        match proc.try_wait().unwrap() {
+            Some(_) => {
+                break;
+            }
+            None => {
+                std::thread::sleep(std::time::Duration::from_millis(100));
+            }
+        }
+    }
+}
+
+pub fn await_picodata_admin(timeout: Duration) -> Result<Child, std::io::Error> {
+    let start_time = Instant::now();
+
+    loop {
+        assert!(
+            start_time.elapsed() < timeout,
+            "process hanging for too long"
+        );
+
+        let picodata_admin = Command::new("picodata")
+            .arg("admin")
+            .arg("./tests/test_plugin/tmp/cluster/i_1/admin.sock")
+            .stdin(Stdio::piped())
+            .stdout(Stdio::piped())
+            .spawn();
+
+        match picodata_admin {
+            Ok(process) => {
+                info!("successfully connected to picodata cluster.");
+                return Ok(process);
+            }
+            Err(_) => {
+                std::thread::sleep(Duration::from_secs(1));
+            }
+        }
+    }
+}
diff --git a/tests/run.rs b/tests/run.rs
index ef5e6b9..5145a43 100644
--- a/tests/run.rs
+++ b/tests/run.rs
@@ -1,5 +1,6 @@
 mod helpers;
-use helpers::helpers::run_cluster;
+
+use helpers::run_cluster;
 use std::time::Duration;
 
 const TOTAL_INSTANCES: i32 = 4;