Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

134: add inventory service and clients (sans auth) #86

Merged
merged 8 commits into from
Mar 5, 2025
135 changes: 67 additions & 68 deletions rust/services/inventory/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,7 @@ pub mod types;
use anyhow::Result;
use async_nats::jetstream::ErrorCode;
use async_nats::Message;
use bson::oid::ObjectId;
use bson::{self, doc, DateTime};
use bson::{self, doc, oid::ObjectId, Bson, DateTime};
use hpos_hal::inventory::HoloInventory;
use mongodb::results::UpdateResult;
use mongodb::{options::UpdateModifications, Client as MongoDBClient};
Expand Down Expand Up @@ -87,57 +86,14 @@ impl InventoryServiceApi {
.host_collection
.get_one_from(doc! { "device_id": &host_pubkey })
.await?
.ok_or(ServiceError::Internal(format!(
"Failed to fetch Host. host_pubkey={}",
host_pubkey
)))?;

let host_id = host._id.ok_or(ServiceError::Internal(format!(
"Failed to fetch Host. host_pubkey={}",
host_pubkey
)))?;

// Ensure assigned host *still* has enough capacity for assigned workload(s)
// ..and if no, remove host from workload and create collection of all ineligible workloads
let mut ineligible_assigned_workloads: Vec<ObjectId> = vec![];
for workload_id in host.assigned_workloads {
let workload = self
.workload_collection
.get_one_from(doc! { "_id": workload_id })
.await?
.ok_or(ServiceError::Internal(format!(
"Failed to fetch Workload. workload_id={}",
workload_id
)))?;

if !self.verify_host_meets_workload_criteria(&host_inventory, &workload) {
ineligible_assigned_workloads.push(workload_id);

self.host_collection
.update_one_within(
doc! { "_id": host_id },
UpdateModifications::Document(doc! {
"$pull": {
"assigned_workloads": workload_id
}
}),
)
.await?;
};
}
// ...and remove host from all ineligible workloads
if !ineligible_assigned_workloads.is_empty() {
self.workload_collection
.update_many_within(
doc! { "_id": { "$in": ineligible_assigned_workloads } },
UpdateModifications::Document(doc! {
"$pull": {
"assigned_hosts": host_id
}
}),
)
.await?;
}
.ok_or_else(|| {
ServiceError::Internal(format!(
"Failed to fetch Host. host_pubkey={}",
host_pubkey
))
})?;

self.handle_ineligible_host_workloads(host).await?;
}
InventoryPayloadType::Unauthenticated(host_inventory) => {
log::debug!(
Expand Down Expand Up @@ -216,25 +172,68 @@ impl InventoryServiceApi {
.await
}

// Verifies that a host meets the workload criteria
pub fn verify_host_meets_workload_criteria(
&self,
host_inventory: &HoloInventory,
workload: &Workload,
) -> bool {
let host_drive_capacity = host_inventory.drives.iter().fold(0, |mut acc, d| {
fn calculate_host_drive_capacity(&self, host_inventory: &HoloInventory) -> i64 {
host_inventory.drives.iter().fold(0 as i64, |mut acc, d| {
if let Some(capacity) = d.capacity_bytes {
acc += capacity;
acc += capacity as i64;
}
acc
});
if host_drive_capacity < workload.system_specs.capacity.drive {
return false;
}
if host_inventory.cpus.len() < workload.system_specs.capacity.cores as usize {
return false;
})
}

async fn handle_ineligible_host_workloads(&self, host: Host) -> Result<(), ServiceError> {
let host_id = host._id.ok_or_else(|| {
ServiceError::Internal(format!(
"Host is missing '_id' field. host_pubkey={}",
host.device_id
))
})?;

// Fetch all assigned workloads that exceed the host's capcity in a single query
let ineligible_assigned_workloads: Vec<ObjectId> = self
.workload_collection
.get_many_from(doc! {
"_id": { "$in": &host.assigned_workloads },
"$expr": {
"$and": [
{ "$gt": ["$workload.system_specs.capacity.drive", Bson::Int64(self.calculate_host_drive_capacity(&host.inventory))] },
{ "$gt": ["$workload.system_specs.capacity.cores", Bson::Int64( host.inventory.cpus.len() as i64)] }
]
}
})
.await?
.into_iter()
.map(|workload| workload._id.ok_or_else(|| {
ServiceError::Internal(format!(
"Host is missing '_id' field. host_pubkey={}",
host.device_id
))
}))
.collect::<Result<Vec<ObjectId>, _>>()?;

// Update database only if there are ineligible workloads
if !ineligible_assigned_workloads.is_empty() {
// Remove ineligible workloads from host
self.host_collection
.update_one_within(
doc! { "_id": host_id },
UpdateModifications::Document(doc! {
"$pull": { "assigned_workloads": { "$in": &ineligible_assigned_workloads } }
}),
)
.await?;

// Remove host from ineligible workloads
self.workload_collection
.update_many_within(
doc! { "_id": { "$in": &ineligible_assigned_workloads } },
UpdateModifications::Document(doc! {
"$pull": { "assigned_hosts": host_id }
}),
)
.await?;
}

true
Ok(())
}
}