From 8e64b651bcb8fcd49d7b3b543f31f3e019caff12 Mon Sep 17 00:00:00 2001 From: jlanson Date: Mon, 12 Aug 2024 14:40:41 -0400 Subject: [PATCH] feat: initial handling of plugin startup and context management --- Cargo.lock | 2 + hipcheck/Cargo.toml | 2 + hipcheck/src/main.rs | 2 + hipcheck/src/plugin/manager.rs | 125 +++++++++++++++++++++++++++ hipcheck/src/plugin/mod.rs | 19 +++++ hipcheck/src/plugin/types.rs | 149 +++++++++++++++++++++++++++++++++ 6 files changed, 299 insertions(+) create mode 100644 hipcheck/src/plugin/manager.rs create mode 100644 hipcheck/src/plugin/mod.rs create mode 100644 hipcheck/src/plugin/types.rs diff --git a/Cargo.lock b/Cargo.lock index e9620bd6..78267926 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1061,6 +1061,7 @@ dependencies = [ "pathbuf", "petgraph", "prost", + "rand", "rayon", "regex", "rustls", @@ -1077,6 +1078,7 @@ dependencies = [ "tar", "tempfile", "term_size", + "tokio", "toml", "tonic", "tonic-build", diff --git a/hipcheck/Cargo.toml b/hipcheck/Cargo.toml index 296995d0..7648bbd7 100644 --- a/hipcheck/Cargo.toml +++ b/hipcheck/Cargo.toml @@ -88,6 +88,8 @@ tabled = "0.15.0" fs_extra = "1.3.0" tonic = "0.12.1" prost = "0.13.1" +rand = "0.8.5" +tokio = { version = "1.39.2", features = ["time"] } # Exactly matching the version of rustls used by ureq # Get rid of default features since we don't use the AWS backed crypto provider (we use ring). 
diff --git a/hipcheck/src/main.rs b/hipcheck/src/main.rs index 777d20af..dd2b65d1 100644 --- a/hipcheck/src/main.rs +++ b/hipcheck/src/main.rs @@ -15,6 +15,8 @@ mod git2_log_shim; mod git2_rustls_transport; mod log_bridge; mod metric; +#[allow(unused)] +mod plugin; mod report; mod session; mod setup; diff --git a/hipcheck/src/plugin/manager.rs b/hipcheck/src/plugin/manager.rs new file mode 100644 index 00000000..2e699932 --- /dev/null +++ b/hipcheck/src/plugin/manager.rs @@ -0,0 +1,125 @@ +use crate::hipcheck::plugin_client::PluginClient; +use crate::plugin::{HcPluginClient, Plugin, PluginContext}; +use crate::{hc_error, Result, F64}; +use rand::Rng; +use std::collections::HashSet; +use std::ops::Range; +use std::process::Command; +use tokio::time::{sleep_until, Duration, Instant}; + +pub struct PluginExecutor { + max_spawn_attempts: usize, + max_conn_attempts: usize, + port_range: Range, + backoff_interval: Duration, + jitter_percent: u8, + est_ports: HashSet, +} +impl PluginExecutor { + pub fn new( + max_spawn_attempts: usize, + max_conn_attempts: usize, + port_range: Range, + backoff_interval_micros: u64, + jitter_percent: u8, + ) -> Result { + if jitter_percent > 100 { + return Err(hc_error!( + "jitter_percent must be <= 100, got {}", + jitter_percent + )); + } + let backoff_interval = Duration::from_micros(backoff_interval_micros); + Ok(PluginExecutor { + max_spawn_attempts, + max_conn_attempts, + port_range, + backoff_interval, + jitter_percent, + est_ports: HashSet::new(), + }) + } + fn get_available_port(&mut self) -> Result { + for i in self.port_range.start..self.port_range.end { + if !self.est_ports.contains(&i) + && std::net::TcpListener::bind(format!("127.0.0.1:{i}")).is_ok() + { + return Ok(i); + } + } + Err(hc_error!("Failed to find available port")) + } + pub async fn start_plugin(&mut self, plugin: &Plugin) -> Result { + let mut rng = rand::thread_rng(); + // Plugin startup design has inherent TOCTOU flaws since we tell the plugin + // which port 
we expect it to bind to. We can try to ensure the port we pass + // on the cmdline is not already in use, but it is still possible for that + // port to become unavailable between our check and the plugin's bind attempt. + // Hence the need for subsequent attempts if we get unlucky + let mut spawn_attempts: usize = 0; + while spawn_attempts < self.max_spawn_attempts { + // Find free port for process. Don't retry if we fail since this means all + // ports in the desired range are already bound + let port = self.get_available_port()?; + let port_str = port.to_string(); + // Spawn plugin process + let Ok(mut proc) = Command::new(&plugin.entrypoint) + .args(["--port", port_str.as_str()]) + .spawn() + else { + spawn_attempts += 1; + continue; + }; + // Attempt to connect to the plugin's gRPC server up to N times, using + // linear backoff with a percentage jitter. + let mut conn_attempts = 0; + let mut opt_grpc: Option = None; + while conn_attempts < self.max_conn_attempts { + // Jitter could be positive or negative, so mult by 2 to cover both sides + let jitter: i32 = rng.gen_range(0..(2 * self.jitter_percent)) as i32; + // Then subtract by self.jitter_percent to center around 0, and add to 100% + let jitter_percent = 1.0 + ((jitter - (self.jitter_percent as i32)) as f64 / 100.0); + // Once we are confident this math works, we can remove this + if !(0.0..=2.0).contains(&jitter_percent) { + panic!("Math error! 
We should have better guardrails around PluginExecutor field values."); + } + // sleep_duration = (backoff * conn_attempts) * (1.0 +/- jitter_percent) + let mut sleep_duration: Duration = self + .backoff_interval + .saturating_mul(conn_attempts as u32) + .mul_f64(jitter_percent); + sleep_until(Instant::now() + sleep_duration).await; + if let Ok(grpc) = + PluginClient::connect(format!("http://127.0.0.1:{port_str}")).await + { + opt_grpc = Some(grpc); + break; + } else { + conn_attempts += 1; + } + } + // If opt_grpc is None, we did not manage to connect to the plugin. Kill it + // and try again + let Some(grpc) = opt_grpc else { + if let Err(e) = proc.kill() { + println!("Failed to kill child process for plugin: {e}"); + } + spawn_attempts += 1; + continue; + }; + self.est_ports.insert(port); + // We now have an open gRPC connection to our plugin process + return Ok(PluginContext { + plugin: plugin.clone(), + port, + grpc, + proc, + channel: None, + }); + } + Err(hc_error!( + "Reached max spawn attempts for plugin {}", + plugin.name + )) + } +} diff --git a/hipcheck/src/plugin/mod.rs b/hipcheck/src/plugin/mod.rs new file mode 100644 index 00000000..b39a80f7 --- /dev/null +++ b/hipcheck/src/plugin/mod.rs @@ -0,0 +1,19 @@ +mod manager; +mod types; + +use crate::plugin::manager::*; +pub use crate::plugin::types::*; + +pub fn dummy() { + let plugin = Plugin { + name: "dummy".to_owned(), + entrypoint: "./dummy".to_owned(), + }; + let manager = PluginExecutor::new( + /* max_spawn_attempts */ 3, + /* max_conn_attempts */ 5, + /* port_range */ 40000..u16::MAX, + /* backoff_interval_micros */ 1000, + /* jitter_percent */ 10, + ); +} diff --git a/hipcheck/src/plugin/types.rs b/hipcheck/src/plugin/types.rs new file mode 100644 index 00000000..d1cda2fd --- /dev/null +++ b/hipcheck/src/plugin/types.rs @@ -0,0 +1,149 @@ +use crate::hipcheck::plugin_client::PluginClient; +use crate::hipcheck::{ + Configuration, ConfigurationResult as PluginConfigResult, ConfigurationStatus, Empty, 
+ Schema as PluginSchema, +}; +use crate::{hc_error, Result}; +use serde_json::Value; +use std::collections::HashMap; +use std::ops::Not; +use std::process::Child; +use tonic::transport::Channel; + +pub type HcPluginClient = PluginClient; + +#[derive(Clone, Debug)] +pub struct Plugin { + pub name: String, + pub entrypoint: String, +} + +// Hipcheck-facing version of struct from crate::hipcheck +pub struct Schema { + pub query_name: String, + pub key_schema: Value, + pub output_schema: Value, +} +impl TryFrom for Schema { + type Error = crate::error::Error; + fn try_from(value: PluginSchema) -> Result { + let key_schema: Value = serde_json::from_str(value.key_schema.as_str())?; + let output_schema: Value = serde_json::from_str(value.output_schema.as_str())?; + Ok(Schema { + query_name: value.query_name, + key_schema, + output_schema, + }) + } +} + +// Hipcheck-facing version of struct from crate::hipcheck +pub struct ConfigurationResult { + pub status: ConfigurationStatus, + pub message: Option, +} +impl TryFrom for ConfigurationResult { + type Error = crate::error::Error; + fn try_from(value: PluginConfigResult) -> Result { + let status: ConfigurationStatus = value.status.try_into()?; + let message = value.message.is_empty().not().then_some(value.message); + Ok(ConfigurationResult { status, message }) + } +} +// hipcheck::ConfigurationStatus has an enum that captures both error and success +// scenarios. The below code allows interpreting the struct as a Rust Result. If +// the success variant was the status, Ok(()) is returned, otherwise the code +// is stuffed into a custom error type enum that equals the protoc-generated one +// minus the success variant. 
+impl ConfigurationResult { + pub fn as_result(&self) -> std::result::Result<(), ConfigError> { + let Ok(error) = self.status.try_into() else { + return Ok(()); + }; + Err(ConfigError::new(error, self.message.clone())) + } +} +pub enum ConfigErrorType { + Unknown = 0, + MissingRequiredConfig = 2, + UnrecognizedConfig = 3, + InvalidConfigValue = 4, +} +impl TryFrom for ConfigErrorType { + type Error = crate::error::Error; + fn try_from(value: ConfigurationStatus) -> Result { + use ConfigErrorType::*; + use ConfigurationStatus::*; + Ok(match value as i32 { + x if x == ErrorUnknown as i32 => Unknown, + x if x == ErrorMissingRequiredConfiguration as i32 => MissingRequiredConfig, + x if x == ErrorUnrecognizedConfiguration as i32 => UnrecognizedConfig, + x if x == ErrorInvalidConfigurationValue as i32 => InvalidConfigValue, + x => { + return Err(hc_error!("status value '{}' is not an error", x)); + } + }) + } +} +pub struct ConfigError { + error: ConfigErrorType, + message: Option, +} +impl ConfigError { + pub fn new(error: ConfigErrorType, message: Option) -> Self { + ConfigError { error, message } + } +} + +// State for managing an actively running plugin process +pub struct PluginContext { + pub plugin: Plugin, + pub port: u16, + pub grpc: HcPluginClient, + pub proc: Child, + pub channel: Option, +} +// Redefinition of `grpc` field's functions with more useful types, additional +// error & sanity checking +impl PluginContext { + pub async fn get_query_schemas(&mut self) -> Result> { + let mut res = self.grpc.get_query_schemas(Empty {}).await?; + let stream = res.get_mut(); + let mut schema_builder: HashMap = HashMap::new(); + while let Some(msg) = stream.message().await? 
{ + // If we received a PluginSchema msg with this query name before, + // treat as a chunked msg and append its strings to existing entry + if let Some(existing) = schema_builder.get_mut(&msg.query_name) { + existing.key_schema.push_str(msg.key_schema.as_str()); + existing.output_schema.push_str(msg.output_schema.as_str()); + } else { + schema_builder.insert(msg.query_name.clone(), msg); + } + } + // Convert the aggregated PluginSchemas to Schema objects + schema_builder + .into_values() + .map(TryInto::try_into) + .collect() + } + pub async fn set_configuration(&mut self, conf: &Value) -> Result { + let conf_query = Configuration { + configuration: serde_json::to_string(&conf)?, + }; + let res = self.grpc.set_configuration(conf_query).await?; + res.into_inner().try_into() + } + // TODO - the String in the result should be replaced with a structured + // type once the policy expression code is integrated + pub async fn get_default_policy_expression(&mut self) -> Result { + let mut res = self.grpc.get_default_policy_expression(Empty {}).await?; + Ok(res.get_ref().policy_expression.to_owned()) + } +} +impl Drop for PluginContext { + fn drop(&mut self) { + if let Err(e) = self.proc.kill() { + println!("Failed to kill child: {e}"); + } + } +}