Skip to content

Commit

Permalink
feat: update dummy plugin to handle concurrent queries
Browse files Browse the repository at this point in the history
  • Loading branch information
j-lanson authored and alilleybrinker committed Aug 26, 2024
1 parent 88bb6d5 commit c999d29
Show file tree
Hide file tree
Showing 6 changed files with 295 additions and 80 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

58 changes: 56 additions & 2 deletions hipcheck/src/engine.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
#![allow(unused)]

use crate::plugin::{ActivePlugin, HcPluginCore, PluginExecutor, PluginResponse, PluginWithConfig};
use crate::plugin::{ActivePlugin, PluginResponse};
pub use crate::plugin::{HcPluginCore, PluginExecutor, PluginWithConfig};
use crate::{hc_error, Result};
use futures::future::{BoxFuture, FutureExt};
use serde_json::Value;
use std::sync::{Arc, LazyLock};
use tokio::runtime::Runtime;
use tokio::runtime::{Handle, Runtime};

// Salsa doesn't natively support async functions, so our recursive `query()` function that
// interacts with plugins (which use async) has to get a handle to the underlying runtime,
Expand Down Expand Up @@ -62,6 +64,54 @@ fn query(
}
}

// Demonstration of how the above `query()` function would be implemented as async
pub fn async_query(
core: Arc<HcPluginCore>,
publisher: String,
plugin: String,
query: String,
key: Value,
) -> BoxFuture<'static, Result<Value>> {
async move {
// Find the plugin
let Some(p_handle) = core.plugins.get(&plugin) else {
return Err(hc_error!("No such plugin {}::{}", publisher, plugin));
};
// Initiate the query. If remote closed or we got our response immediately,
// return
let mut ar = match p_handle.query(query, key).await? {
PluginResponse::RemoteClosed => {
return Err(hc_error!("Plugin channel closed unexpected"));
}
PluginResponse::Completed(v) => {
return Ok(v);
}
PluginResponse::AwaitingResult(a) => a,
};
// Otherwise, the plugin needs more data to continue. Recursively query
// (with salsa memo-ization) to get the needed data, and resume our
// current query by providing the plugin the answer.
loop {
let answer = async_query(
Arc::clone(&core),
ar.publisher.clone(),
ar.plugin.clone(),
ar.query.clone(),
ar.key.clone(),
)
.await?;
ar = match p_handle.resume_query(ar, answer).await? {
PluginResponse::RemoteClosed => {
return Err(hc_error!("Plugin channel closed unexpected"));
}
PluginResponse::Completed(v) => return Ok(v),
PluginResponse::AwaitingResult(a) => a,
};
}
}
.boxed()
}

#[salsa::database(HcEngineStorage)]
pub struct HcEngineImpl {
// Query storage
Expand All @@ -77,13 +127,17 @@ impl HcEngineImpl {
// independent of Salsa.
pub fn new(executor: PluginExecutor, plugins: Vec<(PluginWithConfig)>) -> Result<Self> {
let runtime = RUNTIME.handle();
println!("Starting HcPluginCore");
let core = runtime.block_on(HcPluginCore::new(executor, plugins))?;
let mut engine = HcEngineImpl {
storage: Default::default(),
};
engine.set_core(Arc::new(core));
Ok(engine)
}
pub fn runtime() -> &'static Handle {
RUNTIME.handle()
}
// TODO - "run" function that takes analysis heirarchy and target, and queries each
// analysis plugin to kick off the execution
}
59 changes: 46 additions & 13 deletions hipcheck/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ use crate::analysis::report_builder::Report;
use crate::analysis::score::score_results;
use crate::cache::HcCache;
use crate::context::Context as _;
use crate::engine::{HcEngine, HcEngineImpl};
use crate::error::Error;
use crate::error::Result;
use crate::plugin::{Plugin, PluginExecutor, PluginWithConfig};
Expand Down Expand Up @@ -641,6 +640,10 @@ fn check_github_token() -> StdResult<(), EnvVarCheckError> {
}

fn cmd_plugin() {
use crate::engine::{async_query, HcEngine, HcEngineImpl};
use std::sync::Arc;
use tokio::task::JoinSet;

let tgt_dir = "./target/debug";
let entrypoint = pathbuf![tgt_dir, "dummy_rand_data"];
let plugin = Plugin {
Expand All @@ -665,19 +668,49 @@ fn cmd_plugin() {
return;
}
};
let res = match engine.query(
"MITRE".to_owned(),
"rand_data".to_owned(),
"rand_data".to_owned(),
serde_json::json!(7),
) {
Ok(r) => r,
Err(e) => {
println!("Query failed: {e}");
return;
let core = engine.core();
let handle = HcEngineImpl::runtime();
// @Note - how to initiate multiple queries with async calls
handle.block_on(async move {
let mut futs = JoinSet::new();
for i in 1..10 {
let arc_core = Arc::clone(&core);
println!("Spawning");
futs.spawn(async_query(
arc_core,
"MITRE".to_owned(),
"rand_data".to_owned(),
"rand_data".to_owned(),
serde_json::json!(i),
));
}
};
println!("Result: {res}");
while let Some(res) = futs.join_next().await {
println!("res: {res:?}");
}
});
// @Note - how to initiate multiple queries with sync calls
// let conc: Vec<thread::JoinHandle<()>> = vec![];
// for i in 0..10 {
// let fut = thread::spawn(|| {
// let res = match engine.query(
// "MITRE".to_owned(),
// "rand_data".to_owned(),
// "rand_data".to_owned(),
// serde_json::json!(i),
// ) {
// Ok(r) => r,
// Err(e) => {
// println!("{i}: Query failed: {e}");
// return;
// }
// };
// println!("{i}: Result: {res}");
// });
// conc.push(fut);
// }
// while let Some(x) = conc.pop() {
// x.join().unwrap();
// }
}

fn cmd_ready(config: &CliConfig) {
Expand Down
2 changes: 2 additions & 0 deletions plugins/dummy_rand_data/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ edition = "2021"
publish = false

[dependencies]
anyhow = "1.0.86"
clap = { version = "4.5.16", features = ["derive"] }
indexmap = "2.4.0"
prost = "0.13.1"
rand = "0.8.5"
serde_json = "1.0.125"
Expand Down
Loading

0 comments on commit c999d29

Please sign in to comment.