From badd9ddc0bcf1b8db69a79880fc13eca3e6575b0 Mon Sep 17 00:00:00 2001 From: Ryo Abe <30817722+rabe1028@users.noreply.github.com> Date: Fri, 29 Sep 2023 01:41:53 +0900 Subject: [PATCH] Implement heartbeat --- src/controllers/public/nodex_receive.rs | 4 +++ src/handlers/heartbeat.rs | 38 +++++++++++++++++++++++++ src/handlers/mod.rs | 1 + src/main.rs | 6 ++++ src/nodex/utils/hub_client.rs | 17 +++++++++++ src/services/hub.rs | 22 ++++++++++++++ 6 files changed, 88 insertions(+) create mode 100644 src/handlers/heartbeat.rs diff --git a/src/controllers/public/nodex_receive.rs b/src/controllers/public/nodex_receive.rs index a7ef4a30..c9f8b304 100644 --- a/src/controllers/public/nodex_receive.rs +++ b/src/controllers/public/nodex_receive.rs @@ -28,6 +28,10 @@ impl ConnectionRepository { } } + pub fn connection_count(&self) -> usize { + self.connections.read().unwrap().len() + } + pub fn insert(&mut self, addr: Addr) -> bool { self.connections.write().unwrap().insert(addr) } diff --git a/src/handlers/heartbeat.rs b/src/handlers/heartbeat.rs new file mode 100644 index 00000000..2496907e --- /dev/null +++ b/src/handlers/heartbeat.rs @@ -0,0 +1,38 @@ +use crate::{ + controllers::public::nodex_receive::ConnectionRepository, network::Network, services::hub::Hub, +}; +use std::sync::{atomic::AtomicBool, Arc}; + +pub async fn handler( + shutdown_marker: Arc, + connection_repository: ConnectionRepository, +) { + let network = Network::new(); + + let heartbeat_interval_sec = if let Some(heartbeat_interval_sec) = network.get_heartbeat() { + heartbeat_interval_sec + } else { + log::info!("heartbeat is disabled"); + return; + }; + + let project_did = network.root.project_did.expect("project_did is not set"); + + log::info!("heartbeat task is started"); + + let mut interval = + tokio::time::interval(std::time::Duration::from_secs(heartbeat_interval_sec)); + let hub = Hub::new(); + + while !shutdown_marker.load(std::sync::atomic::Ordering::SeqCst) { + interval.tick().await; + + let is_active = connection_repository.connection_count() > 0; + + if let Err(e) = hub.heartbeat(&project_did, is_active).await { + log::error!("failed to send heartbeat: {:?}", e); + } + } + + log::info!("heartbeat task is stopped"); +} diff --git a/src/handlers/mod.rs b/src/handlers/mod.rs index bf7a36ff..a04b5bdf 100644 --- a/src/handlers/mod.rs +++ b/src/handlers/mod.rs @@ -1,6 +1,7 @@ use serde_json::Value; use tokio::sync::oneshot; +pub mod heartbeat; pub mod receiver; pub mod sender; diff --git a/src/main.rs b/src/main.rs index d7128ea8..0df38974 100644 --- a/src/main.rs +++ b/src/main.rs @@ -223,6 +223,11 @@ async fn main() -> std::io::Result<()> { connection_repository.clone(), )); + let heartbeat_task = tokio::spawn(handlers::heartbeat::handler( + Arc::clone(&shutdown_marker), + connection_repository.clone(), + )); + let server_task = tokio::spawn(server); let sender_task = tokio::spawn(handlers::sender::handler( rx, @@ -250,6 +255,7 @@ async fn main() -> std::io::Result<()> { sender_task, receiver_task, message_polling_task, + heartbeat_task, shutdown ) { Ok(_) => Ok(()), diff --git a/src/nodex/utils/hub_client.rs b/src/nodex/utils/hub_client.rs index 009f7468..10294c81 100644 --- a/src/nodex/utils/hub_client.rs +++ b/src/nodex/utils/hub_client.rs @@ -183,6 +183,23 @@ impl HubClient { self.post(url.unwrap().as_ref(), &payload).await } + pub async fn heartbeat( + &self, + path: &str, + project_did: &str, + is_active: bool, + ) -> Result { + let url = self.base_url.join(path); + let payload = json!({ + "is_health": is_active, + }); + let payload = DIDCommEncryptedService::generate(project_did, &payload, None) + .await? + .to_string(); + + self.post(url.unwrap().as_ref(), &payload).await + } + #[allow(dead_code)] pub async fn put(&self, _path: &str) -> Result { let url = self.base_url.join(_path); diff --git a/src/services/hub.rs b/src/services/hub.rs index 2b2fffd9..8d16e85e 100644 --- a/src/services/hub.rs +++ b/src/services/hub.rs @@ -174,4 +174,26 @@ impl Hub { } } } + + pub async fn heartbeat(&self, project_did: &str, is_active: bool) -> Result<(), NodeXError> { + let res = match self + .http_client + .heartbeat("/v1/heartbeat", project_did, is_active) + .await + { + Ok(v) => v, + Err(e) => { + log::error!("{:?}", e); + return Err(NodeXError {}); + } + }; + + match res.json::().await { + Ok(_) => Ok(()), + Err(e) => { + log::error!("{:?}", e); + Err(NodeXError {}) + } + } + } }