Skip to content

Commit

Permalink
134: add inventory service and clients (sans auth) (#86)
Browse files Browse the repository at this point in the history
* add add inventory service and clients

* update to db field to i64; update db query

* impl feedback:  persist inventory cache; remove unauthenticated inventory service call; logs
  • Loading branch information
JettTech authored Mar 5, 2025
1 parent d057e66 commit d9ff222
Show file tree
Hide file tree
Showing 26 changed files with 746 additions and 166 deletions.
22 changes: 22 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ members = [
"rust/hpos-hal",
"rust/clients/host_agent",
"rust/clients/orchestrator",
"rust/services/inventory",
"rust/services/workload",
"rust/util_libs",
"rust/netdiag",
Expand Down
1 change: 1 addition & 0 deletions rust/clients/host_agent/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,5 +31,6 @@ tempfile = "3.15.0"
machineid-rs = "1.2.4"
util_libs = { path = "../../util_libs" }
workload = { path = "../../services/workload" }
inventory = { path = "../../services/inventory" }
hpos-hal = { path = "../../hpos-hal" }
netdiag = { path = "../../netdiag" }
6 changes: 6 additions & 0 deletions rust/clients/host_agent/src/agent_cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,12 @@ pub struct DaemonzeArgs {
default_value = "30"
)]
pub(crate) nats_connect_timeout_secs: u64,

#[arg(long, short, help = "host agent inventory check interval (in seconds)")]
pub(crate) host_inventory_check_interval_sec: Option<u64>,

#[arg(long, help = "host agent inventory file path")]
pub(crate) host_inventory_file_path: Option<String>,
}

/// A set of commands for being able to manage the local host. We may (later) want to gate some
Expand Down
35 changes: 35 additions & 0 deletions rust/clients/host_agent/src/hostd/host_client.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
use std::{path::PathBuf, time::Duration};
use util_libs::nats::{
jetstream_client::{get_event_listeners, get_nats_url, with_event_listeners, JsClient},
types::JsClientBuilder,
};

const HOST_AGENT_CLIENT_NAME: &str = "Host Agent";
const HOST_AGENT_INBOX_PREFIX: &str = "_WORKLOAD_INBOX";

pub async fn run(host_pubkey: &str, host_creds_path: &Option<PathBuf>) -> anyhow::Result<JsClient> {
let nats_url = get_nats_url();
log::info!("nats_url : {}", nats_url);
log::info!("host_creds_path : {:?}", host_creds_path);
log::info!("host_pubkey : {}", host_pubkey);

let pubkey_lowercase: String = host_pubkey.to_string().to_lowercase();

let creds = host_creds_path
.as_ref()
.map(|path| path.to_string_lossy().to_string());

let host_client = JsClient::new(JsClientBuilder {
nats_url: nats_url.clone(),
name: HOST_AGENT_CLIENT_NAME.to_string(),
inbox_prefix: format!("{}.{}", pubkey_lowercase, HOST_AGENT_INBOX_PREFIX),
credentials_path: creds.clone(),
ping_interval: Some(Duration::from_secs(10)),
request_timeout: Some(Duration::from_secs(29)),
listeners: vec![with_event_listeners(get_event_listeners())],
})
.await
.map_err(|e| anyhow::anyhow!("connecting to NATS via {nats_url}: {e}"))?;

Ok(host_client)
}
76 changes: 76 additions & 0 deletions rust/clients/host_agent/src/hostd/inventory.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
This client is associated with the:
- WORKLOAD account
- host user
This client is responsible for subscribing to workload streams that handle:
- installing new workloads onto the hosting device
- removing workloads from the hosting device
- sending workload status upon request
- sending out active periodic workload reports
*/

use anyhow::Result;
use hpos_hal::inventory::HoloInventory;
use inventory::INVENTORY_UPDATE_SUBJECT;
use tokio::time::sleep;
use util_libs::nats::{jetstream_client::JsClient, types::PublishInfo};

pub fn should_check_inventory(
start: chrono::DateTime<chrono::Utc>,
check_interval_duration: chrono::TimeDelta,
) -> bool {
let now = chrono::Utc::now();
now.signed_duration_since(start) > check_interval_duration
}

pub async fn run(
host_client: JsClient,
host_pubkey: &str,
inventory_file_path: &str,
host_inventory_check_interval_sec: u64,
) -> Result<(), async_nats::Error> {
log::info!("Host Agent Client: starting Inventory job...");

// Store latest inventory record in memory
let starting_inventory = HoloInventory::from_host();
starting_inventory.save_to_file(inventory_file_path)?;

let one_hour_interval = tokio::time::Duration::from_secs(host_inventory_check_interval_sec);
let check_interval_duration = chrono::TimeDelta::seconds(one_hour_interval.as_secs() as i64);
let mut last_check_time = chrono::Utc::now();

let pubkey_lowercase = host_pubkey.to_string().to_lowercase();

loop {
// Periodically check inventory and compare against latest state (in-memory)
if should_check_inventory(last_check_time, check_interval_duration) {
log::debug!("Checking Host inventory...");

let current_inventory = HoloInventory::from_host();
if HoloInventory::load_from_file(inventory_file_path)? != current_inventory {
log::debug!("Host Inventory has changed. About to push update to Orchestrator");
let authenticated_user_inventory_subject =
format!("INVENTORY.{pubkey_lowercase}.{INVENTORY_UPDATE_SUBJECT}");

let payload_bytes = serde_json::to_vec(&current_inventory)?;

let payload = PublishInfo {
subject: authenticated_user_inventory_subject,
msg_id: chrono::Utc::now().to_string(),
data: payload_bytes,
headers: None,
};

host_client.publish(payload).await?;
current_inventory.save_to_file(inventory_file_path)?;
} else {
log::debug!("Host Inventory has not changed.");
}

last_check_time = chrono::Utc::now();
}

sleep(one_hour_interval).await;
}
}
2 changes: 2 additions & 0 deletions rust/clients/host_agent/src/hostd/mod.rs
Original file line number Diff line number Diff line change
@@ -1,2 +1,4 @@
pub mod gen_leaf_server;
pub mod host_client;
pub mod inventory;
pub mod workload;
52 changes: 10 additions & 42 deletions rust/clients/host_agent/src/hostd/workload.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,57 +12,27 @@ This client is responsible for subscribing to workload streams that handle:

use anyhow::{anyhow, Result};
use async_nats::Message;
use std::{path::PathBuf, sync::Arc, time::Duration};
use std::sync::Arc;
use util_libs::nats::{
jetstream_client,
types::{ConsumerBuilder, EndpointType, JsClientBuilder, JsServiceBuilder},
jetstream_client::JsClient,
types::{ConsumerBuilder, EndpointType, JsServiceBuilder},
};
use workload::{
host_api::HostWorkloadApi, types::WorkloadServiceSubjects, WorkloadServiceApi,
WORKLOAD_SRV_DESC, WORKLOAD_SRV_NAME, WORKLOAD_SRV_SUBJ, WORKLOAD_SRV_VERSION,
};

const HOST_AGENT_CLIENT_NAME: &str = "Host Agent";
const HOST_AGENT_INBOX_PREFIX: &str = "_WORKLOAD_INBOX";

// TODO: Use _host_creds_path for auth once we add in the more resilient auth pattern.
pub async fn run(
mut host_client: JsClient,
host_pubkey: &str,
host_creds_path: &Option<PathBuf>,
) -> Result<jetstream_client::JsClient, async_nats::Error> {
log::info!("Host Agent Client: Connecting to server...");
log::info!("host_creds_path : {:?}", host_creds_path);
) -> Result<JsClient, async_nats::Error> {
log::info!("Host Agent Client: starting workload service...");
log::info!("host_pubkey : {}", host_pubkey);

let pubkey_lowercase = host_pubkey.to_string().to_lowercase();

// ==================== Setup NATS ====================
// Connect to Nats server
let nats_url = jetstream_client::get_nats_url();
log::info!("nats_url : {}", nats_url);

let event_listeners = jetstream_client::get_event_listeners();

// Spin up Nats Client and loaded in the Js Stream Service
let mut host_workload_client = jetstream_client::JsClient::new(JsClientBuilder {
nats_url: nats_url.clone(),
name: HOST_AGENT_CLIENT_NAME.to_string(),
inbox_prefix: format!("{}.{}", HOST_AGENT_INBOX_PREFIX, pubkey_lowercase),
credentials_path: host_creds_path
.as_ref()
.map(|path| path.to_string_lossy().to_string()),
ping_interval: Some(Duration::from_secs(10)),
request_timeout: Some(Duration::from_secs(29)),
listeners: vec![jetstream_client::with_event_listeners(
event_listeners.clone(),
)],
})
.await
.map_err(|e| anyhow::anyhow!("connecting to NATS via {nats_url}: {e}"))?;

// ==================== Setup JS Stream Service ====================
// Instantiate the Workload API
let workload_api = HostWorkloadApi::default();
let pubkey_lowercase = host_pubkey.to_string().to_lowercase();

// Register Workload Streams for Host Agent to consume
// NB: Subjects are published by orchestrator or nats-db-connector
Expand All @@ -72,11 +42,9 @@ pub async fn run(
version: WORKLOAD_SRV_VERSION.to_string(),
service_subject: WORKLOAD_SRV_SUBJ.to_string(),
};
host_workload_client
.add_js_service(workload_stream_service)
.await?;
host_client.add_js_service(workload_stream_service).await?;

let workload_service = host_workload_client
let workload_service = host_client
.get_js_service(WORKLOAD_SRV_NAME.to_string())
.await
.ok_or(anyhow!(
Expand Down Expand Up @@ -151,5 +119,5 @@ pub async fn run(
})
.await?;

Ok(host_workload_client)
Ok(host_client)
}
41 changes: 36 additions & 5 deletions rust/clients/host_agent/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,17 +60,48 @@ async fn daemonize(args: &DaemonzeArgs) -> Result<(), async_nats::Error> {
// TODO: would it be a good idea to reuse this client in the workload_manager and elsewhere later on?
bare_client.close().await?;

let host_workload_client = hostd::workload::run(
"host_id_placeholder>",
let host_client = hostd::host_client::run(
"host_pubkey_placeholder>",
&args.nats_leafnode_client_creds_path,
)
.await?;

// Get Host Agent inventory check duration env var..
// If none exists, default to 1 hour
let host_inventory_check_interval_sec =
&args.host_inventory_check_interval_sec.unwrap_or_else(|| {
std::env::var("HOST_INVENTORY_CHECK_DURATION")
.unwrap_or_else(|_| "3600".to_string())
.parse::<u64>()
.unwrap_or(3600) // 3600 seconds = 1 hour
});

// Get Host Agent inventory storage file path
// If none exists, default to "/var/lib/holo_inventory.json"
let inventory_file_path = args.host_inventory_file_path.as_ref().map_or_else(
|| {
std::env::var("HOST_INVENTORY_FILE_PATH")
.unwrap_or("/var/lib/holo_inventory.json".to_string())
},
|s| s.to_owned(),
);

hostd::inventory::run(
host_client.clone(),
"host_pubkey_placeholder>",
&inventory_file_path,
host_inventory_check_interval_sec.to_owned(),
)
.await?;

hostd::workload::run(host_client.clone(), "host_pubkey_placeholder>").await?;

// Only exit program when explicitly requested
tokio::signal::ctrl_c().await?;

// Close client and drain internal buffer before exiting to make sure all messages are sent
host_workload_client.close().await?;

// Close host client connection and drain internal buffer before exiting to make sure all messages are sent
// NB: Calling drain/close on any one of the Client instances will close the underlying connection.
// This affects all instances that share the same connection (including clones) because they are all references to the same resource.
host_client.close().await?;
Ok(())
}
1 change: 1 addition & 0 deletions rust/clients/orchestrator/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,3 +30,4 @@ utoipa-swagger-ui = { version = "9", features = [
utoipa = { version = "5", features = ["actix_extras"] }
util_libs = { path = "../../util_libs" }
workload = { path = "../../services/workload" }
inventory = { path = "../../services/inventory" }
33 changes: 33 additions & 0 deletions rust/clients/orchestrator/src/admin_client.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
use std::path::PathBuf;
use std::time::Duration;
use std::vec;
use util_libs::nats::{
jetstream_client::{get_event_listeners, get_nats_url, with_event_listeners, JsClient},
types::JsClientBuilder,
};

const ORCHESTRATOR_ADMIN_CLIENT_NAME: &str = "Orchestrator Admin Client";
const ORCHESTRATOR_ADMIN_CLIENT_INBOX_PREFIX: &str = "ORCHESTRATOR._ADMIN_INBOX";

pub async fn run(admin_creds_path: &Option<PathBuf>) -> anyhow::Result<JsClient> {
let nats_url = get_nats_url();
log::info!("nats_url : {}", nats_url);

let creds = admin_creds_path
.as_ref()
.map(|path| path.to_string_lossy().to_string());

let admin_client = JsClient::new(JsClientBuilder {
nats_url: nats_url.clone(),
name: ORCHESTRATOR_ADMIN_CLIENT_NAME.to_string(),
inbox_prefix: ORCHESTRATOR_ADMIN_CLIENT_INBOX_PREFIX.to_string(),
credentials_path: creds.clone(),
request_timeout: Some(Duration::from_secs(29)),
ping_interval: Some(Duration::from_secs(10)),
listeners: vec![with_event_listeners(get_event_listeners())],
})
.await
.map_err(|e| anyhow::anyhow!("connecting to NATS via {nats_url}: {e}"))?;

Ok(admin_client)
}
Loading

0 comments on commit d9ff222

Please sign in to comment.