Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

134: add inventory service and clients (sans auth) #86

Merged
merged 8 commits into from
Mar 5, 2025
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ members = [
"rust/hpos-hal",
"rust/clients/host_agent",
"rust/clients/orchestrator",
"rust/services/inventory",
"rust/services/workload",
"rust/util_libs",
"rust/netdiag",
Expand Down
1 change: 1 addition & 0 deletions rust/clients/host_agent/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,5 +31,6 @@ tempfile = "3.15.0"
machineid-rs = "1.2.4"
util_libs = { path = "../../util_libs" }
workload = { path = "../../services/workload" }
inventory = { path = "../../services/inventory" }
hpos-hal = { path = "../../hpos-hal" }
netdiag = { path = "../../netdiag" }
35 changes: 35 additions & 0 deletions rust/clients/host_agent/src/hostd/host_client.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
use std::{path::PathBuf, time::Duration};
use util_libs::nats::{
jetstream_client::{get_event_listeners, get_nats_url, with_event_listeners, JsClient},
types::JsClientBuilder,
};

const HOST_AGENT_CLIENT_NAME: &str = "Host Agent";
const HOST_AGENT_INBOX_PREFIX: &str = "_WORKLOAD_INBOX";

pub async fn run(host_pubkey: &str, host_creds_path: &Option<PathBuf>) -> anyhow::Result<JsClient> {
let nats_url = get_nats_url();
log::info!("nats_url : {}", nats_url);
log::info!("host_creds_path : {:?}", host_creds_path);
log::info!("host_pubkey : {}", host_pubkey);

let pubkey_lowercase: String = host_pubkey.to_string().to_lowercase();

let creds = host_creds_path
.as_ref()
.map(|path| path.to_string_lossy().to_string());

let host_client = JsClient::new(JsClientBuilder {
nats_url: nats_url.clone(),
name: HOST_AGENT_CLIENT_NAME.to_string(),
inbox_prefix: format!("{}.{}", pubkey_lowercase, HOST_AGENT_INBOX_PREFIX),
credentials_path: creds.clone(),
ping_interval: Some(Duration::from_secs(10)),
request_timeout: Some(Duration::from_secs(29)),
listeners: vec![with_event_listeners(get_event_listeners())],
})
.await
.map_err(|e| anyhow::anyhow!("connecting to NATS via {nats_url}: {e}"))?;

Ok(host_client)
}
67 changes: 67 additions & 0 deletions rust/clients/host_agent/src/hostd/inventory.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
This client is associated with the:
- WORKLOAD account
- host user

This client is responsible for subscribing to workload streams that handle:
- installing new workloads onto the hosting device
- removing workloads from the hosting device
- sending workload status upon request
- sending out active periodic workload reports
*/

use anyhow::Result;
use hpos_hal::inventory::HoloInventory;
use inventory::HOST_AUTHENTICATED_SUBJECT;
use tokio::time::sleep;
use util_libs::nats::{jetstream_client::JsClient, types::PublishInfo};

pub fn should_check_inventory(
start: chrono::DateTime<chrono::Utc>,
check_interval_duration: chrono::TimeDelta,
) -> bool {
let now = chrono::Utc::now();
now.signed_duration_since(start) > check_interval_duration
}

pub async fn run(host_client: JsClient, host_pubkey: &str) -> Result<(), async_nats::Error> {
log::info!("Host Agent Client: starting Inventory job...");

// Store latest inventory record in memory
let mut in_memory_cache = HoloInventory::from_host();

let one_hour_interval = tokio::time::Duration::from_secs(3600); // 1 hour in seconds
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Holo-Host/developers - I'd like to open a discussion about what duration we think is appropriate for a regular inventory check. I put it to 1hr to start.. but would like to surface this as a question and come to an agreement as a team.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

An hour is fine. It'll surface the changes we're looking for within a reasonable timeframe. We're not looking for realtime updates, and given that we're only sending the inventory when it changes from the last run, we're not wasting any capacity storing redundant copies. Ideally, we'd have a configuration file for the agent and this would be tunable. A constant is fine til we have that I guess.

let check_interval_duration = chrono::TimeDelta::seconds(one_hour_interval.as_secs() as i64);
let mut last_check_time = chrono::Utc::now();

let pubkey_lowercase = host_pubkey.to_string().to_lowercase();

loop {
// Periodically check inventory and compare against latest state (in-memory)
if should_check_inventory(last_check_time, check_interval_duration) {
log::debug!("Host Inventory has changed. About to push update to Orchestrator");
let current_inventory = HoloInventory::from_host();
if in_memory_cache != current_inventory {
let authenticated_user_inventory_subject =
format!("INVENTORY.{HOST_AUTHENTICATED_SUBJECT}.{pubkey_lowercase}.update");

let payload_bytes = serde_json::to_vec(&current_inventory)?;

let payload = PublishInfo {
subject: authenticated_user_inventory_subject,
msg_id: chrono::Utc::now().to_string(),
data: payload_bytes,
headers: None,
};

host_client.publish(payload).await?;
in_memory_cache = current_inventory
}
last_check_time = chrono::Utc::now();
} else {
log::debug!("Host Inventory has not changed.");
}

sleep(one_hour_interval).await;
}
}
2 changes: 2 additions & 0 deletions rust/clients/host_agent/src/hostd/mod.rs
Original file line number Diff line number Diff line change
@@ -1,2 +1,4 @@
pub mod gen_leaf_server;
pub mod host_client;
pub mod inventory;
pub mod workload;
52 changes: 10 additions & 42 deletions rust/clients/host_agent/src/hostd/workload.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,57 +12,27 @@ This client is responsible for subscribing to workload streams that handle:

use anyhow::{anyhow, Result};
use async_nats::Message;
use std::{path::PathBuf, sync::Arc, time::Duration};
use std::sync::Arc;
use util_libs::nats::{
jetstream_client,
types::{ConsumerBuilder, EndpointType, JsClientBuilder, JsServiceBuilder},
jetstream_client::JsClient,
types::{ConsumerBuilder, EndpointType, JsServiceBuilder},
};
use workload::{
host_api::HostWorkloadApi, types::WorkloadServiceSubjects, WorkloadServiceApi,
WORKLOAD_SRV_DESC, WORKLOAD_SRV_NAME, WORKLOAD_SRV_SUBJ, WORKLOAD_SRV_VERSION,
};

const HOST_AGENT_CLIENT_NAME: &str = "Host Agent";
const HOST_AGENT_INBOX_PREFIX: &str = "_WORKLOAD_INBOX";

// TODO: Use _host_creds_path for auth once we add in the more resilient auth pattern.
pub async fn run(
mut host_client: JsClient,
host_pubkey: &str,
host_creds_path: &Option<PathBuf>,
) -> Result<jetstream_client::JsClient, async_nats::Error> {
log::info!("Host Agent Client: Connecting to server...");
log::info!("host_creds_path : {:?}", host_creds_path);
) -> Result<JsClient, async_nats::Error> {
log::info!("Host Agent Client: starting workload service...");
log::info!("host_pubkey : {}", host_pubkey);

let pubkey_lowercase = host_pubkey.to_string().to_lowercase();

// ==================== Setup NATS ====================
// Connect to Nats server
let nats_url = jetstream_client::get_nats_url();
log::info!("nats_url : {}", nats_url);

let event_listeners = jetstream_client::get_event_listeners();

// Spin up Nats Client and loaded in the Js Stream Service
let mut host_workload_client = jetstream_client::JsClient::new(JsClientBuilder {
nats_url: nats_url.clone(),
name: HOST_AGENT_CLIENT_NAME.to_string(),
inbox_prefix: format!("{}.{}", HOST_AGENT_INBOX_PREFIX, pubkey_lowercase),
credentials_path: host_creds_path
.as_ref()
.map(|path| path.to_string_lossy().to_string()),
ping_interval: Some(Duration::from_secs(10)),
request_timeout: Some(Duration::from_secs(29)),
listeners: vec![jetstream_client::with_event_listeners(
event_listeners.clone(),
)],
})
.await
.map_err(|e| anyhow::anyhow!("connecting to NATS via {nats_url}: {e}"))?;

// ==================== Setup JS Stream Service ====================
// Instantiate the Workload API
let workload_api = HostWorkloadApi::default();
let pubkey_lowercase = host_pubkey.to_string().to_lowercase();

// Register Workload Streams for Host Agent to consume
// NB: Subjects are published by orchestrator or nats-db-connector
Expand All @@ -72,11 +42,9 @@ pub async fn run(
version: WORKLOAD_SRV_VERSION.to_string(),
service_subject: WORKLOAD_SRV_SUBJ.to_string(),
};
host_workload_client
.add_js_service(workload_stream_service)
.await?;
host_client.add_js_service(workload_stream_service).await?;

let workload_service = host_workload_client
let workload_service = host_client
.get_js_service(WORKLOAD_SRV_NAME.to_string())
.await
.ok_or(anyhow!(
Expand Down Expand Up @@ -151,5 +119,5 @@ pub async fn run(
})
.await?;

Ok(host_workload_client)
Ok(host_client)
}
15 changes: 10 additions & 5 deletions rust/clients/host_agent/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,17 +60,22 @@ async fn daemonize(args: &DaemonzeArgs) -> Result<(), async_nats::Error> {
// TODO: would it be a good idea to reuse this client in the workload_manager and elsewhere later on?
bare_client.close().await?;

let host_workload_client = hostd::workload::run(
"host_id_placeholder>",
let host_client = hostd::host_client::run(
"host_pubkey_placeholder>",
&args.nats_leafnode_client_creds_path,
)
.await?;

hostd::inventory::run(host_client.clone(), "host_pubkey_placeholder>").await?;

hostd::workload::run(host_client.clone(), "host_pubkey_placeholder>").await?;

// Only exit program when explicitly requested
tokio::signal::ctrl_c().await?;

// Close client and drain internal buffer before exiting to make sure all messages are sent
host_workload_client.close().await?;

// Close host client connection and drain internal buffer before exiting to make sure all messages are sent
// NB: Calling drain/close on any one of the Client instances will close the underlying connection.
// This affects all instances that share the same connection (including clones) because they are all references to the same resource.
host_client.close().await?;
Ok(())
}
1 change: 1 addition & 0 deletions rust/clients/orchestrator/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,3 +30,4 @@ utoipa-swagger-ui = { version = "9", features = [
utoipa = { version = "5", features = ["actix_extras"] }
util_libs = { path = "../../util_libs" }
workload = { path = "../../services/workload" }
inventory = { path = "../../services/inventory" }
33 changes: 33 additions & 0 deletions rust/clients/orchestrator/src/admin_client.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
use std::path::PathBuf;
use std::time::Duration;
use std::vec;
use util_libs::nats::{
jetstream_client::{get_event_listeners, get_nats_url, with_event_listeners, JsClient},
types::JsClientBuilder,
};

const ORCHESTRATOR_ADMIN_CLIENT_NAME: &str = "Orchestrator Admin Client";
const ORCHESTRATOR_ADMIN_CLIENT_INBOX_PREFIX: &str = "ORCHESTRATOR._ADMIN_INBOX";

pub async fn run(admin_creds_path: &Option<PathBuf>) -> anyhow::Result<JsClient> {
let nats_url = get_nats_url();
log::info!("nats_url : {}", nats_url);

let creds = admin_creds_path
.as_ref()
.map(|path| path.to_string_lossy().to_string());

let admin_client = JsClient::new(JsClientBuilder {
nats_url: nats_url.clone(),
name: ORCHESTRATOR_ADMIN_CLIENT_NAME.to_string(),
inbox_prefix: ORCHESTRATOR_ADMIN_CLIENT_INBOX_PREFIX.to_string(),
credentials_path: creds.clone(),
request_timeout: Some(Duration::from_secs(29)),
ping_interval: Some(Duration::from_secs(10)),
listeners: vec![with_event_listeners(get_event_listeners())],
})
.await
.map_err(|e| anyhow::anyhow!("connecting to NATS via {nats_url}: {e}"))?;

Ok(admin_client)
}
Loading