From 38a018184f789db456794302b1391eed2bc2472a Mon Sep 17 00:00:00 2001 From: Paulo Bressan Date: Wed, 28 Feb 2024 11:59:07 -0300 Subject: [PATCH] Implement k8s event to remove auth key inside the ogmios proxy (#26) * chore: adjusted handlers * chore: added update keys for k8s events --- proxy/src/auth.rs | 49 ++++++++++++++++++++++++++++++++--------------- proxy/src/main.rs | 17 ---------------- 2 files changed, 34 insertions(+), 32 deletions(-) diff --git a/proxy/src/auth.rs b/proxy/src/auth.rs index 6aa62fd..3856a04 100644 --- a/proxy/src/auth.rs +++ b/proxy/src/auth.rs @@ -1,4 +1,4 @@ -use std::sync::Arc; +use std::{collections::HashMap, sync::Arc}; use futures_util::TryStreamExt; use operator::{ @@ -8,14 +8,14 @@ use operator::{ watcher::{self, Config}, WatchStreamExt, }, - Api, Client, + Api, Client, ResourceExt, }, OgmiosPort, }; use tokio::{pin, sync::RwLock}; use tracing::error; -use crate::State; +use crate::{Consumer, State}; pub async fn start(state: Arc>) { let client = Client::try_default() @@ -23,17 +23,9 @@ pub async fn start(state: Arc>) { .expect("failed to create kube client"); let api = Api::::all(client.clone()); - let result = api.list(&ListParams::default()).await; - if let Err(err) = result { - error!(error = err.to_string(), "error to get crds"); - std::process::exit(1); - } + update_auth(state.clone(), api.clone()).await; - for crd in result.unwrap().items.iter() { - state.write().await.add_auth_token(crd); - } - - let stream = watcher::watcher(api, Config::default()).applied_objects(); + let stream = watcher::watcher(api.clone(), Config::default()).touched_objects(); pin!(stream); loop { @@ -42,8 +34,35 @@ pub async fn start(state: Arc>) { error!(error = err.to_string(), "fail crd auth watcher"); continue; } - if let Some(crd) = result.unwrap() { - state.write().await.add_auth_token(&crd); + + update_auth(state.clone(), api.clone()).await; + } +} + +async fn update_auth(state: Arc>, api: Api) { + let result = api.list(&ListParams::default()).await; + if let Err(err) = result { + error!( + error = err.to_string(), + "error to get crds while updating auth keys" + ); + return; + } + + let mut consumers = HashMap::new(); + for crd in result.unwrap().items.iter() { + if crd.status.is_some() { + let network = crd.spec.network.to_string(); + let version = crd.spec.version; + let auth_token = crd.status.as_ref().unwrap().auth_token.clone(); + let namespace = crd.metadata.namespace.as_ref().unwrap().clone(); + let port_name = crd.name_any(); + + let hash_key = format!("{}.{}.{}", network, version, auth_token); + let consumer = Consumer::new(namespace, port_name); + + consumers.insert(hash_key, consumer); } } + state.write().await.consumers = consumers; } diff --git a/proxy/src/main.rs b/proxy/src/main.rs index d37d87c..7471d25 100644 --- a/proxy/src/main.rs +++ b/proxy/src/main.rs @@ -1,8 +1,6 @@ use config::Config; use dotenv::dotenv; use metrics::Metrics; -use operator::kube::ResourceExt; -use operator::OgmiosPort; use prometheus::Registry; use regex::Regex; use std::collections::HashMap; @@ -76,21 +74,6 @@ impl State { }) } - pub fn add_auth_token(&mut self, crd: &OgmiosPort) { - if crd.status.is_some() { - let network = crd.spec.network.to_string(); - let version = crd.spec.version; - let auth_token = crd.status.as_ref().unwrap().auth_token.clone(); - let namespace = crd.metadata.namespace.as_ref().unwrap().clone(); - let port_name = crd.name_any(); - - let hash_key = format!("{}.{}.{}", network, version, auth_token); - let consumer = Consumer::new(namespace, port_name); - - self.consumers.insert(hash_key, consumer); - } - } - pub fn get_auth_token(&self, network: &str, version: &str, token: &str) -> Option { let hash_key = format!("{}.{}.{}", network, version, token); self.consumers.get(&hash_key).cloned()