Skip to content

Commit

Permalink
Implement heartbeat
Browse files Browse the repository at this point in the history
  • Loading branch information
rabe1028 committed Sep 28, 2023
1 parent 670aac4 commit badd9dd
Show file tree
Hide file tree
Showing 6 changed files with 88 additions and 0 deletions.
4 changes: 4 additions & 0 deletions src/controllers/public/nodex_receive.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@ impl ConnectionRepository {
}
}

pub fn connection_count(&self) -> usize {
self.connections.read().unwrap().len()
}

pub fn insert(&mut self, addr: Addr<MessageReceiveActor>) -> bool {
self.connections.write().unwrap().insert(addr)
}
Expand Down
38 changes: 38 additions & 0 deletions src/handlers/heartbeat.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
use crate::{
controllers::public::nodex_receive::ConnectionRepository, network::Network, services::hub::Hub,
};
use std::sync::{atomic::AtomicBool, Arc};

pub async fn handler(
shutdown_marker: Arc<AtomicBool>,
connection_repository: ConnectionRepository,
) {
let network = Network::new();

let heartbeat_interval_sec = if let Some(heartbeat_interval_sec) = network.get_heartbeat() {
heartbeat_interval_sec
} else {
log::info!("heartbeat is disabled");
return;
};

let project_did = network.root.project_did.expect("project_did is not set");

log::info!("heartbeat task is started");

let mut interval =
tokio::time::interval(std::time::Duration::from_secs(heartbeat_interval_sec));
let hub = Hub::new();

while !shutdown_marker.load(std::sync::atomic::Ordering::SeqCst) {
interval.tick().await;

let is_active = connection_repository.connection_count() > 0;

if let Err(e) = hub.heartbeat(&project_did, is_active).await {
log::error!("failed to send heartbeat: {:?}", e);
}
}

log::info!("heartbeat task is stopped");
}
1 change: 1 addition & 0 deletions src/handlers/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use serde_json::Value;
use tokio::sync::oneshot;

pub mod heartbeat;
pub mod receiver;
pub mod sender;

Expand Down
6 changes: 6 additions & 0 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,11 @@ async fn main() -> std::io::Result<()> {
connection_repository.clone(),
));

let heartbeat_task = tokio::spawn(handlers::heartbeat::handler(
Arc::clone(&shutdown_marker),
connection_repository.clone(),
));

let server_task = tokio::spawn(server);
let sender_task = tokio::spawn(handlers::sender::handler(
rx,
Expand Down Expand Up @@ -250,6 +255,7 @@ async fn main() -> std::io::Result<()> {
sender_task,
receiver_task,
message_polling_task,
heartbeat_task,
shutdown
) {
Ok(_) => Ok(()),
Expand Down
17 changes: 17 additions & 0 deletions src/nodex/utils/hub_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,23 @@ impl HubClient {
self.post(url.unwrap().as_ref(), &payload).await
}

pub async fn heartbeat(
&self,
path: &str,
project_did: &str,
is_active: bool,
) -> Result<reqwest::Response, NodeXError> {
let url = self.base_url.join(path);
let payload = json!({
"is_health": is_active,
});
let payload = DIDCommEncryptedService::generate(project_did, &payload, None)
.await?
.to_string();

self.post(url.unwrap().as_ref(), &payload).await
}

#[allow(dead_code)]
pub async fn put(&self, _path: &str) -> Result<reqwest::Response, NodeXError> {
let url = self.base_url.join(_path);
Expand Down
22 changes: 22 additions & 0 deletions src/services/hub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -174,4 +174,26 @@ impl Hub {
}
}
}

pub async fn heartbeat(&self, project_did: &str, is_active: bool) -> Result<(), NodeXError> {
let res = match self
.http_client
.heartbeat("/v1/heartbeat", project_did, is_active)
.await
{
Ok(v) => v,
Err(e) => {
log::error!("{:?}", e);
return Err(NodeXError {});
}
};

match res.json::<EmptyResponse>().await {
Ok(_) => Ok(()),
Err(e) => {
log::error!("{:?}", e);
Err(NodeXError {})
}
}
}
}

0 comments on commit badd9dd

Please sign in to comment.