From 68241da7c33fdc6d3d3b0fb21465911943414171 Mon Sep 17 00:00:00 2001 From: ndefokou Date: Tue, 3 Dec 2024 16:01:35 +0100 Subject: [PATCH 01/14] adding the retry logic on the database --- Cargo.toml | 1 + crates/database/Cargo.toml | 2 + crates/database/src/lib.rs | 130 +++++++++++++++++++++-------------- crates/retry_util/Cargo.toml | 8 +++ crates/retry_util/src/lib.rs | 15 ++++ 5 files changed, 103 insertions(+), 53 deletions(-) create mode 100644 crates/retry_util/Cargo.toml create mode 100644 crates/retry_util/src/lib.rs diff --git a/Cargo.toml b/Cargo.toml index 16637fd2..194e0286 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -24,6 +24,7 @@ members = [ "crates/web-plugins/didcomm-messaging/did-utils", "crates/web-plugins/didcomm-messaging/shared", "crates/web-plugins/didcomm-messaging/protocols/*", + "crates/retry_util", ] [workspace.dependencies] diff --git a/crates/database/Cargo.toml b/crates/database/Cargo.toml index dd42696b..27550f8a 100644 --- a/crates/database/Cargo.toml +++ b/crates/database/Cargo.toml @@ -10,3 +10,5 @@ async-trait.workspace = true serde.workspace = true thiserror.workspace = true tokio.workspace = true + +retry_util = { path = "../retry_util" } \ No newline at end of file diff --git a/crates/database/src/lib.rs b/crates/database/src/lib.rs index 98d415de..34273545 100644 --- a/crates/database/src/lib.rs +++ b/crates/database/src/lib.rs @@ -6,6 +6,7 @@ use mongodb::{ Client, Collection, Database, }; use once_cell::sync::OnceCell; +use retry_util::retry_async_operation; use serde::{Deserialize, Serialize}; use std::sync::Arc; use thiserror::Error; @@ -71,18 +72,23 @@ where fn get_collection(&self) -> Arc>>; async fn find_all(&self) -> Result, RepositoryError> { - let mut entities = Vec::new(); - let collection = self.get_collection(); - - // Lock the Mutex and get the Collection - let mut cursor = collection.read().await.find(None, None).await?; - while cursor.advance().await? { - entities.push(cursor.deserialize_current()?); - } - - Ok(entities) + let operation = || { + let collection = self.get_collection(); + async move { + let collection = collection.read().await; + let mut entities = Vec::new(); + let mut cursor = collection.find(None, None).await?; + while cursor.advance().await? { + entities.push(cursor.deserialize_current()?); + } + Ok(entities) + } + }; + + retry_async_operation(operation, 3).await } + /// Counts all entities by filter. /// Counts all entities by filter. async fn count_by(&self, filter: BsonDocument) -> Result { let collection = self.get_collection(); @@ -96,7 +102,18 @@ where } async fn find_one(&self, id: ObjectId) -> Result, RepositoryError> { - self.find_one_by(doc! {"_id": id}).await + let operation = || { + let collection = self.get_collection(); + async move { + let collection = collection.read().await; + collection + .find_one(doc! {"_id": id}, None) + .await + .map_err(|err| RepositoryError::from(err)) + } + }; + + retry_async_operation(operation, 3).await } async fn find_one_by(&self, filter: BsonDocument) -> Result, RepositoryError> { @@ -108,21 +125,21 @@ where } /// Stores a new entity. - async fn store(&self, mut entity: Entity) -> Result { - let collection = self.get_collection(); - - // Lock the Mutex and get the Collection - let collection = collection.read().await; - - // Insert the new entity into the database - let metadata = collection.insert_one(entity.clone(), None).await?; - - // Set the ID if it was inserted and return the updated entity - if let Bson::ObjectId(oid) = metadata.inserted_id { - entity.set_id(oid); - } - - Ok(entity) + async fn store(&self, entity: Entity) -> Result { + let operation = move || { + let collection = self.get_collection(); + let mut entity = entity.clone(); + async move { + let collection = collection.read().await; + let metadata = collection.insert_one(entity.clone(), None).await?; + if let Bson::ObjectId(oid) = metadata.inserted_id { + entity.set_id(oid); + } + Ok(entity) + } + }; + + retry_async_operation(operation, 3).await } async fn find_all_by( @@ -147,40 +164,47 @@ where } async fn delete_one(&self, id: ObjectId) -> Result<(), RepositoryError> { - let collection = self.get_collection(); - - // Lock the Mutex and get the Collection - let collection = collection.read().await; - - // Delete the entity from the database - collection.delete_one(doc! {"_id": id}, None).await?; - - Ok(()) + let operation = || { + let collection = self.get_collection(); + async move { + let collection = collection.read().await; + collection + .delete_one(doc! {"_id": id}, None) + .await + .map(|_| ()) + .map_err(|err| RepositoryError::from(err)) + } + }; + + retry_async_operation(operation, 3).await } async fn update(&self, entity: Entity) -> Result { if entity.id().is_none() { return Err(RepositoryError::MissingIdentifier); } - let collection = self.get_collection(); - // Lock the Mutex and get the Collection - let collection = collection.read().await; - - // Update the entity in the database - let metadata = collection - .update_one( - doc! {"_id": entity.id().unwrap()}, - doc! {"$set": bson::to_document(&entity).map_err(|_| RepositoryError::BsonConversionError)?}, - None, - ) - .await?; - - if metadata.matched_count > 0 { - Ok(entity) - } else { - Err(RepositoryError::TargetNotFound) - } + let operation = move || { + let collection = self.get_collection(); + let entity = entity.clone(); + async move { + let collection = collection.read().await; + let metadata = collection + .update_one( + doc! {"_id": entity.id().unwrap()}, + doc! {"$set": bson::to_document(&entity).map_err(|_| RepositoryError::BsonConversionError)?}, + None, + ) + .await?; + if metadata.matched_count > 0 { + Ok(entity.clone()) + } else { + Err(RepositoryError::TargetNotFound) + } + } + }; + + retry_async_operation(operation, 3).await } } diff --git a/crates/retry_util/Cargo.toml b/crates/retry_util/Cargo.toml new file mode 100644 index 00000000..67ebd85f --- /dev/null +++ b/crates/retry_util/Cargo.toml @@ -0,0 +1,8 @@ +[package] +name = "retry_util" +version = "0.1.0" +edition = "2021" + +[dependencies] +tokio = { version = "1", features = ["full"] } +tokio-retry = "0.3.0" diff --git a/crates/retry_util/src/lib.rs b/crates/retry_util/src/lib.rs new file mode 100644 index 00000000..469652eb --- /dev/null +++ b/crates/retry_util/src/lib.rs @@ -0,0 +1,15 @@ +use std::future::Future; +use tokio_retry::{strategy::ExponentialBackoff, Retry}; + +pub async fn retry_async_operation(operation: F, max_retries: usize) -> Result +where + F: Fn() -> Fut, + Fut: Future>, +{ + let retry_strategy = ExponentialBackoff::from_millis(10) + .factor(2) + .max_delay(std::time::Duration::from_secs(2)) + .take(max_retries); + + Retry::spawn(retry_strategy, operation).await +} From be8b0643e726f8dfd386b30c952f7c85bf455bf1 Mon Sep 17 00:00:00 2001 From: ndefokou Date: Wed, 4 Dec 2024 17:18:58 +0100 Subject: [PATCH 02/14] adding the retry logic on the different protocols --- Cargo.toml | 1 - crates/database/Cargo.toml | 2 - crates/database/src/lib.rs | 132 +++++++----------- crates/retry_util/Cargo.toml | 8 -- crates/retry_util/src/lib.rs | 15 -- .../protocols/forward/src/handler.rs | 32 +++-- .../src/handler/stateful.rs | 108 ++++++++++---- .../protocols/pickup/src/handler.rs | 109 +++++++++------ .../didcomm-messaging/shared/src/lib.rs | 1 + .../didcomm-messaging/shared/src/retry.rs | 75 ++++++++++ 10 files changed, 308 insertions(+), 175 deletions(-) delete mode 100644 crates/retry_util/Cargo.toml delete mode 100644 crates/retry_util/src/lib.rs create mode 100644 crates/web-plugins/didcomm-messaging/shared/src/retry.rs diff --git a/Cargo.toml b/Cargo.toml index 194e0286..16637fd2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -24,7 +24,6 @@ members = [ "crates/web-plugins/didcomm-messaging/did-utils", "crates/web-plugins/didcomm-messaging/shared", "crates/web-plugins/didcomm-messaging/protocols/*", - "crates/retry_util", ] [workspace.dependencies] diff --git a/crates/database/Cargo.toml b/crates/database/Cargo.toml index 27550f8a..dd42696b 100644 --- a/crates/database/Cargo.toml +++ b/crates/database/Cargo.toml @@ -10,5 +10,3 @@ async-trait.workspace = true serde.workspace = true thiserror.workspace = true tokio.workspace = true - -retry_util = { path = "../retry_util" } \ No newline at end of file diff --git a/crates/database/src/lib.rs b/crates/database/src/lib.rs index 34273545..8a1ed214 100644 --- a/crates/database/src/lib.rs +++ b/crates/database/src/lib.rs @@ -6,7 +6,6 @@ use mongodb::{ Client, Collection, Database, }; use once_cell::sync::OnceCell; -use retry_util::retry_async_operation; use serde::{Deserialize, Serialize}; use std::sync::Arc; use thiserror::Error; @@ -72,23 +71,18 @@ where fn get_collection(&self) -> Arc>>; async fn find_all(&self) -> Result, RepositoryError> { - let operation = || { - let collection = self.get_collection(); - async move { - let collection = collection.read().await; - let mut entities = Vec::new(); - let mut cursor = collection.find(None, None).await?; - while cursor.advance().await? { - entities.push(cursor.deserialize_current()?); - } - Ok(entities) - } - }; - - retry_async_operation(operation, 3).await + let mut entities = Vec::new(); + let collection = self.get_collection(); + + // Lock the Mutex and get the Collection + let mut cursor = collection.read().await.find(None, None).await?; + while cursor.advance().await? { + entities.push(cursor.deserialize_current()?); + } + + Ok(entities) } - /// Counts all entities by filter. /// Counts all entities by filter. async fn count_by(&self, filter: BsonDocument) -> Result { let collection = self.get_collection(); @@ -102,18 +96,7 @@ where } async fn find_one(&self, id: ObjectId) -> Result, RepositoryError> { - let operation = || { - let collection = self.get_collection(); - async move { - let collection = collection.read().await; - collection - .find_one(doc! {"_id": id}, None) - .await - .map_err(|err| RepositoryError::from(err)) - } - }; - - retry_async_operation(operation, 3).await + self.find_one_by(doc! {"_id": id}).await } async fn find_one_by(&self, filter: BsonDocument) -> Result, RepositoryError> { @@ -125,21 +108,21 @@ where } /// Stores a new entity. - async fn store(&self, entity: Entity) -> Result { - let operation = move || { - let collection = self.get_collection(); - let mut entity = entity.clone(); - async move { - let collection = collection.read().await; - let metadata = collection.insert_one(entity.clone(), None).await?; - if let Bson::ObjectId(oid) = metadata.inserted_id { - entity.set_id(oid); - } - Ok(entity) - } - }; - - retry_async_operation(operation, 3).await + async fn store(&self, mut entity: Entity) -> Result { + let collection = self.get_collection(); + + // Lock the Mutex and get the Collection + let collection = collection.read().await; + + // Insert the new entity into the database + let metadata = collection.insert_one(entity.clone(), None).await?; + + // Set the ID if it was inserted and return the updated entity + if let Bson::ObjectId(oid) = metadata.inserted_id { + entity.set_id(oid); + } + + Ok(entity) } async fn find_all_by( @@ -164,47 +147,40 @@ where } async fn delete_one(&self, id: ObjectId) -> Result<(), RepositoryError> { - let operation = || { - let collection = self.get_collection(); - async move { - let collection = collection.read().await; - collection - .delete_one(doc! {"_id": id}, None) - .await - .map(|_| ()) - .map_err(|err| RepositoryError::from(err)) - } - }; - - retry_async_operation(operation, 3).await + let collection = self.get_collection(); + + // Lock the Mutex and get the Collection + let collection = collection.read().await; + + // Delete the entity from the database + collection.delete_one(doc! {"_id": id}, None).await?; + + Ok(()) } async fn update(&self, entity: Entity) -> Result { if entity.id().is_none() { return Err(RepositoryError::MissingIdentifier); } + let collection = self.get_collection(); - let operation = move || { - let collection = self.get_collection(); - let entity = entity.clone(); - async move { - let collection = collection.read().await; - let metadata = collection - .update_one( - doc! {"_id": entity.id().unwrap()}, - doc! {"$set": bson::to_document(&entity).map_err(|_| RepositoryError::BsonConversionError)?}, - None, - ) - .await?; - if metadata.matched_count > 0 { - Ok(entity.clone()) - } else { - Err(RepositoryError::TargetNotFound) - } - } - }; - - retry_async_operation(operation, 3).await + // Lock the Mutex and get the Collection + let collection = collection.read().await; + + // Update the entity in the database + let metadata = collection + .update_one( + doc! {"_id": entity.id().unwrap()}, + doc! {"$set": bson::to_document(&entity).map_err(|_| RepositoryError::BsonConversionError)?}, + None, + ) + .await?; + + if metadata.matched_count > 0 { + Ok(entity) + } else { + Err(RepositoryError::TargetNotFound) + } } } @@ -212,4 +188,4 @@ impl From for RepositoryError { fn from(error: MongoError) -> Self { RepositoryError::Generic(error.to_string()) } -} +} \ No newline at end of file diff --git a/crates/retry_util/Cargo.toml b/crates/retry_util/Cargo.toml deleted file mode 100644 index 67ebd85f..00000000 --- a/crates/retry_util/Cargo.toml +++ /dev/null @@ -1,8 +0,0 @@ -[package] -name = "retry_util" -version = "0.1.0" -edition = "2021" - -[dependencies] -tokio = { version = "1", features = ["full"] } -tokio-retry = "0.3.0" diff --git a/crates/retry_util/src/lib.rs b/crates/retry_util/src/lib.rs deleted file mode 100644 index 469652eb..00000000 --- a/crates/retry_util/src/lib.rs +++ /dev/null @@ -1,15 +0,0 @@ -use std::future::Future; -use tokio_retry::{strategy::ExponentialBackoff, Retry}; - -pub async fn retry_async_operation(operation: F, max_retries: usize) -> Result -where - F: Fn() -> Fut, - Fut: Future>, -{ - let retry_strategy = ExponentialBackoff::from_millis(10) - .factor(2) - .max_delay(std::time::Duration::from_secs(2)) - .take(max_retries); - - Retry::spawn(retry_strategy, operation).await -} diff --git a/crates/web-plugins/didcomm-messaging/protocols/forward/src/handler.rs b/crates/web-plugins/didcomm-messaging/protocols/forward/src/handler.rs index cc64de99..1cb3aed6 100644 --- a/crates/web-plugins/didcomm-messaging/protocols/forward/src/handler.rs +++ b/crates/web-plugins/didcomm-messaging/protocols/forward/src/handler.rs @@ -5,9 +5,11 @@ use mongodb::bson::doc; use serde_json::{json, Value}; use shared::{ repository::entity::{Connection, RoutedMessage}, + retry::{retry_async, RetryOptions}, state::{AppState, AppStateRepository}, }; use std::sync::Arc; +use std::time::Duration; /// Mediator receives forwarded messages, extract the next field in the message body, and the attachments in the message /// then stores the attachment with the next field as key for pickup @@ -36,14 +38,28 @@ pub(crate) async fn mediator_forward_process( AttachmentData::Base64 { value: data } => json!(data.base64), AttachmentData::Links { value: data } => json!(data.links), }; - message_repository - .store(RoutedMessage { - id: None, - message: attached, - recipient_did: next.as_ref().unwrap().to_owned(), - }) - .await - .map_err(|_| ForwardError::InternalServerError)?; + retry_async( + || { + let attached = attached.clone(); + let recipient_did = next.as_ref().unwrap().to_owned(); + + async move { + message_repository + .store(RoutedMessage { + id: None, + message: attached, + recipient_did, + }) + .await + } + }, + RetryOptions::new() + .retries(5) + .exponential_backoff(Duration::from_millis(100)) + .max_delay(Duration::from_secs(1)), + ) + .await + .map_err(|_| ForwardError::InternalServerError)?; } Ok(None) } diff --git a/crates/web-plugins/didcomm-messaging/protocols/mediator-coordination/src/handler/stateful.rs b/crates/web-plugins/didcomm-messaging/protocols/mediator-coordination/src/handler/stateful.rs index 027e89b9..ce5a54fb 100644 --- a/crates/web-plugins/didcomm-messaging/protocols/mediator-coordination/src/handler/stateful.rs +++ b/crates/web-plugins/didcomm-messaging/protocols/mediator-coordination/src/handler/stateful.rs @@ -21,9 +21,10 @@ use serde_json::json; use shared::{ midlw::ensure_transport_return_route_is_decorated_all, repository::entity::Connection, + retry::{retry_async, RetryOptions}, state::{AppState, AppStateRepository}, }; -use std::sync::Arc; +use std::{sync::Arc, time::Duration}; use uuid::Uuid; /// Process a DIDComm mediate request @@ -53,10 +54,24 @@ pub(crate) async fn process_mediate_request( .ok_or(MediationError::InternalServerError)?; // If there is already mediation, send mediate deny - if let Some(_connection) = connection_repository - .find_one_by(doc! { "client_did": sender_did}) - .await - .map_err(|_| MediationError::InternalServerError)? + if let Some(_connection) = retry_async( + || { + let sender_did = sender_did.clone(); + let connection_repository = connection_repository.clone(); + + async move { + connection_repository + .find_one_by(doc! { "client_did": sender_did }) + .await + } + }, + RetryOptions::new() + .retries(5) + .exponential_backoff(Duration::from_millis(100)) + .max_delay(Duration::from_secs(1)), + ) + .await + .map_err(|_| MediationError::InternalServerError)? { tracing::info!("Sending mediate deny."); return Ok(Some( @@ -85,15 +100,24 @@ pub(crate) async fn process_mediate_request( .as_ref() .ok_or(MediationError::InternalServerError)?; - let diddoc = state - .did_resolver - .resolve(&routing_did) - .await - .map_err(|err| { - tracing::error!("Failed to resolve DID: {:?}", err); - MediationError::InternalServerError - })? - .ok_or(MediationError::InternalServerError)?; + let diddoc = retry_async( + || { + let did_resolver = state.did_resolver.clone(); + let routing_did = routing_did.clone(); + + async move { did_resolver.resolve(&routing_did).await.map_err(|_| ()) } + }, + RetryOptions::new() + .retries(5) + .exponential_backoff(Duration::from_millis(100)) + .max_delay(Duration::from_secs(1)), + ) + .await + .map_err(|err| { + tracing::error!("Failed to resolve DID: {:?}", err); + MediationError::InternalServerError + })? + .ok_or(MediationError::InternalServerError)?; let agreem_keys_jwk: Jwk = agreem_keys.try_into().unwrap(); @@ -229,11 +253,29 @@ pub(crate) async fn process_plain_keylist_update_message( // Find connection for this keylist update - let connection = connection_repository - .find_one_by(doc! { "client_did": &sender }) - .await - .unwrap() - .ok_or_else(|| MediationError::UncoordinatedSender)?; + let connection = retry_async( + || { + let connection_repository = connection_repository.clone(); + let sender = sender.clone(); + + async move { + connection_repository + .find_one_by(doc! { "client_did": &sender }) + .await + .map_err(|_| ()) + } + }, + RetryOptions::new() + .retries(5) + .exponential_backoff(Duration::from_millis(100)) + .max_delay(Duration::from_secs(1)), + ) + .await + .map_err(|err| { + tracing::error!("Failed to find connection after retries: {:?}", err); + MediationError::InternalServerError + })? + .ok_or_else(|| MediationError::UncoordinatedSender)?; // Prepare handles to relevant collections @@ -347,11 +389,29 @@ pub(crate) async fn process_plain_keylist_query_message( .as_ref() .ok_or(MediationError::InternalServerError)?; - let connection = connection_repository - .find_one_by(doc! { "client_did": &sender }) - .await - .unwrap() - .ok_or_else(|| MediationError::UncoordinatedSender)?; + let connection = retry_async( + || { + let connection_repository = connection_repository.clone(); + let sender = sender.clone(); + + async move { + connection_repository + .find_one_by(doc! { "client_did": &sender }) + .await + .map_err(|_| ()) + } + }, + RetryOptions::new() + .retries(5) + .exponential_backoff(Duration::from_millis(100)) + .max_delay(Duration::from_secs(1)), + ) + .await + .map_err(|err| { + tracing::error!("Failed to find connection after retries: {:?}", err); + MediationError::InternalServerError + })? + .ok_or_else(|| MediationError::UncoordinatedSender)?; println!("keylist: {:?}", connection); diff --git a/crates/web-plugins/didcomm-messaging/protocols/pickup/src/handler.rs b/crates/web-plugins/didcomm-messaging/protocols/pickup/src/handler.rs index 87c0a2b5..d0d4bbc0 100644 --- a/crates/web-plugins/didcomm-messaging/protocols/pickup/src/handler.rs +++ b/crates/web-plugins/didcomm-messaging/protocols/pickup/src/handler.rs @@ -12,9 +12,10 @@ use serde_json::Value; use shared::{ midlw::ensure_transport_return_route_is_decorated_all, repository::entity::{Connection, RoutedMessage}, + retry::{retry_async, RetryOptions}, state::{AppState, AppStateRepository}, }; -use std::{str::FromStr, sync::Arc}; +use std::{str::FromStr, sync::Arc, time::Duration}; use uuid::Uuid; // Process pickup status request @@ -160,21 +161,27 @@ pub(crate) async fn handle_message_acknowledgement( })?; for id in message_id_list { - let msg_id = ObjectId::from_str(id); - if msg_id.is_err() { - return Err(PickupError::MalformedRequest(format!( - "Invalid message id: {id}" - ))); - } - repository - .message_repository - .delete_one(msg_id.unwrap()) - .await - .map_err(|_| { - PickupError::InternalError( - "Failed to process the request. Please try again later.".to_owned(), - ) - })?; + let msg_id = ObjectId::from_str(id) + .map_err(|_| PickupError::MalformedRequest(format!("Invalid message id: {id}")))?; + + retry_async( + || { + let message_repository = repository.message_repository.clone(); + let msg_id = msg_id.clone(); + + async move { message_repository.delete_one(msg_id).await.map_err(|_| ()) } + }, + RetryOptions::new() + .retries(5) + .exponential_backoff(Duration::from_millis(100)) + .max_delay(Duration::from_secs(1)), + ) + .await + .map_err(|_| { + PickupError::InternalError( + "Failed to process the request. Please try again later.".to_owned(), + ) + })?; } let message_count = count_messages(repository, None, connection).await?; @@ -247,17 +254,29 @@ async fn count_messages( ) -> Result { let recipients = recipients(recipient_did, &connection); - let count = repository - .message_repository - .count_by(doc! { "recipient_did": { "$in": recipients } }) - .await - .map_err(|_| { - PickupError::InternalError( - "Failed to process the request. Please try again later.".to_owned(), - ) - })?; + retry_async( + || { + let message_repository = repository.message_repository.clone(); + let recipients = recipients.clone(); - Ok(count) + async move { + message_repository + .count_by(doc! { "recipient_did": { "$in": recipients } }) + .await + .map_err(|_| ()) + } + }, + RetryOptions::new() + .retries(5) + .exponential_backoff(Duration::from_millis(100)) + .max_delay(Duration::from_secs(1)), + ) + .await + .map_err(|_| { + PickupError::InternalError( + "Failed to process the request. Please try again later.".to_owned(), + ) + }) } async fn messages( @@ -268,20 +287,32 @@ async fn messages( ) -> Result, PickupError> { let recipients = recipients(recipient_did, &connection); - let routed_messages = repository - .message_repository - .find_all_by( - doc! { "recipient_did": { "$in": recipients } }, - Some(limit as i64), - ) - .await - .map_err(|_| { - PickupError::InternalError( - "Failed to process the request. Please try again later.".to_owned(), - ) - })?; + retry_async( + || { + let message_repository = repository.message_repository.clone(); + let recipients = recipients.clone(); - Ok(routed_messages) + async move { + message_repository + .find_all_by( + doc! { "recipient_did": { "$in": recipients } }, + Some(limit as i64), + ) + .await + .map_err(|_| ()) + } + }, + RetryOptions::new() + .retries(5) + .exponential_backoff(Duration::from_millis(100)) + .max_delay(Duration::from_secs(1)), + ) + .await + .map_err(|_| { + PickupError::InternalError( + "Failed to process the request. Please try again later.".to_owned(), + ) + }) } #[inline] diff --git a/crates/web-plugins/didcomm-messaging/shared/src/lib.rs b/crates/web-plugins/didcomm-messaging/shared/src/lib.rs index 530e4728..51a5f522 100644 --- a/crates/web-plugins/didcomm-messaging/shared/src/lib.rs +++ b/crates/web-plugins/didcomm-messaging/shared/src/lib.rs @@ -3,3 +3,4 @@ pub mod midlw; pub mod repository; pub mod state; pub mod utils; +pub mod retry; \ No newline at end of file diff --git a/crates/web-plugins/didcomm-messaging/shared/src/retry.rs b/crates/web-plugins/didcomm-messaging/shared/src/retry.rs new file mode 100644 index 00000000..117f5fa3 --- /dev/null +++ b/crates/web-plugins/didcomm-messaging/shared/src/retry.rs @@ -0,0 +1,75 @@ +use std::time::Duration; +use tokio::time::sleep; + +pub struct RetryOptions { + retries: usize, + fixed_backoff: Option, + exponential_backoff: Option, + max_delay: Option, +} + +impl RetryOptions { + pub fn new() -> Self { + Self { + retries: 3, + fixed_backoff: None, + exponential_backoff: None, + max_delay: None, + } + } + + pub fn retries(mut self, count: usize) -> Self { + self.retries = count; + self + } + + pub fn fixed_backoff(mut self, delay: Duration) -> Self { + self.fixed_backoff = Some(delay); + self + } + + pub fn exponential_backoff(mut self, initial_delay: Duration) -> Self { + self.exponential_backoff = Some(initial_delay); + self + } + + pub fn max_delay(mut self, delay: Duration) -> Self { + self.max_delay = Some(delay); + self + } +} + +pub async fn retry_async(mut operation: F, options: RetryOptions) -> Result +where + F: FnMut() -> Fut, + Fut: std::future::Future>, +{ + let RetryOptions { + retries, + fixed_backoff, + exponential_backoff, + max_delay, + } = options; + + let mut attempt = 0; + let mut delay = exponential_backoff.unwrap_or_default(); + let max_delay = max_delay.unwrap_or_else(|| Duration::from_secs(60)); // Default max delay of 60 seconds + + loop { + attempt += 1; + + match operation().await { + Ok(result) => return Ok(result), + Err(err) if attempt <= retries => { + if let Some(fixed) = fixed_backoff { + sleep(fixed).await; + } else if delay > Duration::ZERO { + let next_delay = delay.min(max_delay); + sleep(next_delay).await; + delay = (delay * 2).min(max_delay); + } + } + Err(err) => return Err(err), + } + } +} From e472be99628d8f22b949f9dcc04a6429737c0457 Mon Sep 17 00:00:00 2001 From: ndefokou Date: Thu, 5 Dec 2024 07:58:06 +0100 Subject: [PATCH 03/14] fixe(): formating files --- crates/database/src/lib.rs | 2 +- crates/web-plugins/didcomm-messaging/shared/src/lib.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/crates/database/src/lib.rs b/crates/database/src/lib.rs index 8a1ed214..98d415de 100644 --- a/crates/database/src/lib.rs +++ b/crates/database/src/lib.rs @@ -188,4 +188,4 @@ impl From for RepositoryError { fn from(error: MongoError) -> Self { RepositoryError::Generic(error.to_string()) } -} \ No newline at end of file +} diff --git a/crates/web-plugins/didcomm-messaging/shared/src/lib.rs b/crates/web-plugins/didcomm-messaging/shared/src/lib.rs index 51a5f522..ec524725 100644 --- a/crates/web-plugins/didcomm-messaging/shared/src/lib.rs +++ b/crates/web-plugins/didcomm-messaging/shared/src/lib.rs @@ -1,6 +1,6 @@ pub mod errors; pub mod midlw; pub mod repository; +pub mod retry; pub mod state; pub mod utils; -pub mod retry; \ No newline at end of file From 3384fc3c8c3f48020225668071d0b84d49da4169 Mon Sep 17 00:00:00 2001 From: ndefokou Date: Wed, 11 Dec 2024 17:18:01 +0100 Subject: [PATCH 04/14] implementing the circuit breaker --- crates/circuit-breaker/Cargo.toml | 6 ++ crates/circuit-breaker/src/lib.rs | 14 +++ .../protocols/forward/src/error.rs | 3 + .../protocols/forward/src/handler.rs | 20 +++- .../protocols/pickup/src/handler.rs | 53 +++++++-- .../shared/src/CircuitBreaker.rs | 102 ++++++++++++++++++ .../didcomm-messaging/shared/src/lib.rs | 1 + .../didcomm-messaging/shared/src/state.rs | 7 +- 8 files changed, 193 insertions(+), 13 deletions(-) create mode 100644 crates/circuit-breaker/Cargo.toml create mode 100644 crates/circuit-breaker/src/lib.rs create mode 100644 crates/web-plugins/didcomm-messaging/shared/src/CircuitBreaker.rs diff --git a/crates/circuit-breaker/Cargo.toml b/crates/circuit-breaker/Cargo.toml new file mode 100644 index 00000000..905af9b2 --- /dev/null +++ b/crates/circuit-breaker/Cargo.toml @@ -0,0 +1,6 @@ +[package] +name = "circuit-breaker" +version = "0.1.0" +edition = "2021" + +[dependencies] diff --git a/crates/circuit-breaker/src/lib.rs b/crates/circuit-breaker/src/lib.rs new file mode 100644 index 00000000..7d12d9af --- /dev/null +++ b/crates/circuit-breaker/src/lib.rs @@ -0,0 +1,14 @@ +pub fn add(left: usize, right: usize) -> usize { + left + right +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn it_works() { + let result = add(2, 2); + assert_eq!(result, 4); + } +} diff --git a/crates/web-plugins/didcomm-messaging/protocols/forward/src/error.rs b/crates/web-plugins/didcomm-messaging/protocols/forward/src/error.rs index 5076f3bb..376c2247 100644 --- a/crates/web-plugins/didcomm-messaging/protocols/forward/src/error.rs +++ b/crates/web-plugins/didcomm-messaging/protocols/forward/src/error.rs @@ -10,6 +10,8 @@ pub(crate) enum ForwardError { UncoordinatedSender, #[error("Internal server error")] InternalServerError, + #[error("Service unavailable")] + CircuitOpen, } impl IntoResponse for ForwardError { @@ -18,6 +20,7 @@ impl IntoResponse for ForwardError { ForwardError::MalformedBody => StatusCode::BAD_REQUEST, ForwardError::UncoordinatedSender => StatusCode::UNAUTHORIZED, ForwardError::InternalServerError => StatusCode::INTERNAL_SERVER_ERROR, + ForwardError::CircuitOpen => StatusCode::SERVICE_UNAVAILABLE, }; let body = Json(serde_json::json!({ diff --git a/crates/web-plugins/didcomm-messaging/protocols/forward/src/handler.rs b/crates/web-plugins/didcomm-messaging/protocols/forward/src/handler.rs index 1cb3aed6..0b260527 100644 --- a/crates/web-plugins/didcomm-messaging/protocols/forward/src/handler.rs +++ b/crates/web-plugins/didcomm-messaging/protocols/forward/src/handler.rs @@ -26,6 +26,11 @@ pub(crate) async fn mediator_forward_process( .as_ref() .ok_or_else(|| ForwardError::InternalServerError)?; + let circuit_breaker = state.circuit_breaker.clone(); + if circuit_breaker.is_open() { + return Err(ForwardError::CircuitOpen); + } + let next = match checks(&message, connection_repository).await.ok() { Some(next) => Ok(next), None => Err(ForwardError::InternalServerError), @@ -38,7 +43,8 @@ pub(crate) async fn mediator_forward_process( AttachmentData::Base64 { value: data } => json!(data.base64), AttachmentData::Links { value: data } => json!(data.links), }; - retry_async( + + let result = retry_async( || { let attached = attached.clone(); let recipient_did = next.as_ref().unwrap().to_owned(); @@ -58,9 +64,17 @@ pub(crate) async fn mediator_forward_process( .exponential_backoff(Duration::from_millis(100)) .max_delay(Duration::from_secs(1)), ) - .await - .map_err(|_| ForwardError::InternalServerError)?; + .await; + + match result { + Ok(_) => circuit_breaker.record_success(), + Err(_) => { + circuit_breaker.record_failure(); + return Err(ForwardError::InternalServerError); + } + }; } + Ok(None) } diff --git a/crates/web-plugins/didcomm-messaging/protocols/pickup/src/handler.rs b/crates/web-plugins/didcomm-messaging/protocols/pickup/src/handler.rs index d0d4bbc0..7ec7b3cb 100644 --- a/crates/web-plugins/didcomm-messaging/protocols/pickup/src/handler.rs +++ b/crates/web-plugins/didcomm-messaging/protocols/pickup/src/handler.rs @@ -35,7 +35,19 @@ pub(crate) async fn handle_status_request( let sender_did = sender_did(&message)?; let repository = repository(state.clone())?; - let connection = client_connection(&repository, sender_did).await?; + + let connection = retry_async( + || { + let repository = repository.clone(); + async move { client_connection(&repository, sender_did).await } + }, + RetryOptions::new() + .retries(5) + .exponential_backoff(Duration::from_millis(100)) + .max_delay(Duration::from_secs(2)), + ) + .await + .map_err(|_| PickupError::InternalError("Failed to retrieve client connection".to_owned()))?; let message_count = count_messages(repository, recipient_did, connection).await?; @@ -76,15 +88,40 @@ pub(crate) async fn handle_delivery_request( .and_then(|val| val.as_str()); let sender_did = sender_did(&message)?; - // Get the messages limit - let limit = message - .body - .get("limit") - .and_then(Value::as_u64) - .ok_or_else(|| PickupError::MalformedRequest("Invalid \"limit\" specifier".to_owned()))?; + let message_body = message.body.clone(); + + let limit = retry_async( + || { + let message_body = message_body.clone(); + async move { + message_body + .get("limit") + .and_then(Value::as_u64) + .ok_or_else(|| { + PickupError::MalformedRequest("Invalid \"limit\" specifier".to_owned()) + }) + } + }, + RetryOptions::new() + .retries(5) + .exponential_backoff(Duration::from_millis(100)) + .max_delay(Duration::from_secs(1)), + ) + .await?; let repository = repository(state.clone())?; - let connection = client_connection(&repository, sender_did).await?; + let connection = retry_async( + || { + let repository = repository.clone(); + async move { client_connection(&repository, sender_did).await } + }, + RetryOptions::new() + .retries(5) + .exponential_backoff(Duration::from_millis(100)) + .max_delay(Duration::from_secs(2)), + ) + .await + .map_err(|_| PickupError::InternalError("Failed to retrieve client connection".to_owned()))?; let messages = messages(repository, recipient_did, connection, limit as usize).await?; diff --git a/crates/web-plugins/didcomm-messaging/shared/src/CircuitBreaker.rs b/crates/web-plugins/didcomm-messaging/shared/src/CircuitBreaker.rs new file mode 100644 index 00000000..cc05079c --- /dev/null +++ b/crates/web-plugins/didcomm-messaging/shared/src/CircuitBreaker.rs @@ -0,0 +1,102 @@ +use std::sync::{ + atomic::{AtomicBool, AtomicUsize, Ordering}, + Mutex, +}; +use std::time::{Duration, Instant}; + +#[derive(Debug)] +pub struct CircuitBreaker { + state: AtomicBool, // true = Open, false = Closed + failure_count: AtomicUsize, + last_failure_time: Mutex>, + threshold: usize, + reset_timeout: Duration, +} + +impl CircuitBreaker { + /// Creating a new CircuitBreaker with the given failure threshold and reset timeout. + pub fn new(threshold: usize, reset_timeout: Duration) -> Self { + Self { + state: AtomicBool::new(false), + failure_count: AtomicUsize::new(0), + last_failure_time: Mutex::new(None), + threshold, + reset_timeout, + } + } + + pub fn is_open(&self) -> bool { + if self.state.load(Ordering::Relaxed) { + let mut last_failure_time = self.last_failure_time.lock().unwrap(); + if let Some(last_time) = *last_failure_time { + if last_time.elapsed() > self.reset_timeout { + self.state.store(false, Ordering::Relaxed); + self.failure_count.store(0, Ordering::Relaxed); + *last_failure_time = None; + return false; + } + } + true + } else { + false + } + } + + pub fn record_failure(&self) { + let failures = self.failure_count.fetch_add(1, Ordering::Relaxed) + 1; + if failures >= self.threshold { + self.state.store(true, Ordering::Relaxed); + let mut last_failure_time = self.last_failure_time.lock().unwrap(); + *last_failure_time = Some(Instant::now()); + } + } + + pub fn record_success(&self) { + self.failure_count.store(0, Ordering::Relaxed); + self.state.store(false, Ordering::Relaxed); + } + + pub fn call(&self, f: F) -> Result>, String> + where + F: FnOnce() -> Result, + { + if self.is_open() { + return Ok(None); + } + + let result = f(); + match result { + Ok(_) => self.record_success(), + Err(_) => self.record_failure(), + } + + Ok(Some(result)) + } +} + +// pub fn request(dice: u32) -> Result { +// if dice > 6 { +// Err("400: Bad request.".to_string()) +// } else { +// Ok(dice) +// } +// } + +// // Example usage +// fn main() { +// let breaker = CircuitBreaker::new(3, Duration::from_secs(5)); + +// for i in 1..=10 { +// let result = breaker.call(|| request(i)); + +// match result { +// Ok(Some(Ok(value))) => println!("Request succeeded with value: {}", value), +// Ok(Some(Err(err))) => println!("Request failed: {}", err), +// Ok(None) => println!("Circuit breaker is open."), +// Err(err) => println!("Error: {}", err), +// } + +// // Simulate a delay between requests +// thread::sleep(Duration::from_millis(500)); +// } +// } diff --git a/crates/web-plugins/didcomm-messaging/shared/src/lib.rs b/crates/web-plugins/didcomm-messaging/shared/src/lib.rs index ec524725..852a5c24 100644 --- a/crates/web-plugins/didcomm-messaging/shared/src/lib.rs +++ b/crates/web-plugins/didcomm-messaging/shared/src/lib.rs @@ -4,3 +4,4 @@ pub mod repository; pub mod retry; pub mod state; pub mod utils; +pub mod CircuitBreaker; \ No newline at end of file diff --git a/crates/web-plugins/didcomm-messaging/shared/src/state.rs b/crates/web-plugins/didcomm-messaging/shared/src/state.rs index 17aa6b02..40dadce9 100644 --- a/crates/web-plugins/didcomm-messaging/shared/src/state.rs +++ b/crates/web-plugins/didcomm-messaging/shared/src/state.rs @@ -1,11 +1,11 @@ use database::Repository; use did_utils::didcore::Document; use keystore::Secrets; -use std::sync::Arc; +use std::{sync::Arc, time::Duration}; use crate::{ repository::entity::{Connection, RoutedMessage}, - utils::resolvers::{LocalDIDResolver, LocalSecretsResolver}, + utils::resolvers::{LocalDIDResolver, LocalSecretsResolver}, CircuitBreaker::CircuitBreaker, }; #[derive(Clone)] @@ -23,6 +23,8 @@ pub struct AppState { // Persistence layer pub repository: Option, + pub circuit_breaker: Arc, + // disclosed protocols `https://org.didcomm.com/{protocol-name}/{version}/{request-type}`` pub supported_protocols: Option>, } @@ -55,6 +57,7 @@ impl AppState { did_resolver, secrets_resolver, repository, + circuit_breaker: Arc::new(CircuitBreaker::new(3, Duration::from_secs(10))), supported_protocols: disclose_protocols, }) } From ffc396b798d737be8061abddcf238dac581830d1 Mon Sep 17 00:00:00 2001 From: ndefokou Date: Wed, 11 Dec 2024 17:20:24 +0100 Subject: [PATCH 05/14] implementing the circuit breaker --- crates/web-plugins/didcomm-messaging/shared/src/lib.rs | 2 +- crates/web-plugins/didcomm-messaging/shared/src/state.rs | 5 +++-- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/crates/web-plugins/didcomm-messaging/shared/src/lib.rs b/crates/web-plugins/didcomm-messaging/shared/src/lib.rs index 852a5c24..a777ec3a 100644 --- a/crates/web-plugins/didcomm-messaging/shared/src/lib.rs +++ b/crates/web-plugins/didcomm-messaging/shared/src/lib.rs @@ -1,7 +1,7 @@ +pub mod CircuitBreaker; pub mod errors; pub mod midlw; pub mod repository; pub mod retry; pub mod state; pub mod utils; -pub mod CircuitBreaker; \ No newline at end of file diff --git a/crates/web-plugins/didcomm-messaging/shared/src/state.rs b/crates/web-plugins/didcomm-messaging/shared/src/state.rs index 40dadce9..38e221fa 100644 --- a/crates/web-plugins/didcomm-messaging/shared/src/state.rs +++ b/crates/web-plugins/didcomm-messaging/shared/src/state.rs @@ -5,7 +5,8 @@ use std::{sync::Arc, time::Duration}; use crate::{ repository::entity::{Connection, RoutedMessage}, - utils::resolvers::{LocalDIDResolver, LocalSecretsResolver}, CircuitBreaker::CircuitBreaker, + utils::resolvers::{LocalDIDResolver, LocalSecretsResolver}, + CircuitBreaker::CircuitBreaker, }; #[derive(Clone)] @@ -23,7 +24,7 @@ pub struct AppState { // Persistence layer pub repository: Option, - pub circuit_breaker: Arc, + pub circuit_breaker: Arc, // disclosed protocols `https://org.didcomm.com/{protocol-name}/{version}/{request-type}`` pub supported_protocols: Option>, From 3539283686f6b5a2ef85675bf108b2c18dc75fcb Mon Sep 17 00:00:00 2001 From: ndefokou Date: Wed, 11 Dec 2024 17:42:19 +0100 Subject: [PATCH 06/14] implementing the circuit breaker --- Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index 16637fd2..2945e3c2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -23,7 +23,7 @@ members = [ "crates/web-plugins/*", "crates/web-plugins/didcomm-messaging/did-utils", "crates/web-plugins/didcomm-messaging/shared", - "crates/web-plugins/didcomm-messaging/protocols/*", + "crates/web-plugins/didcomm-messaging/protocols/*", "crates/circuit-breaker", ] [workspace.dependencies] From 124547cfcc0ba3f5d67bd373d24f326f2b6cbb7b Mon Sep 17 00:00:00 2001 From: ndefokou Date: Wed, 11 Dec 2024 17:58:18 +0100 Subject: [PATCH 07/14] fixe() test --- .../didcomm-messaging/protocols/pickup/src/handler.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/crates/web-plugins/didcomm-messaging/protocols/pickup/src/handler.rs b/crates/web-plugins/didcomm-messaging/protocols/pickup/src/handler.rs index 7ec7b3cb..fa332148 100644 --- a/crates/web-plugins/didcomm-messaging/protocols/pickup/src/handler.rs +++ b/crates/web-plugins/didcomm-messaging/protocols/pickup/src/handler.rs @@ -556,9 +556,9 @@ mod tests { let error = handle_status_request(state, invalid_request) .await .unwrap_err(); - - assert_eq!(error, PickupError::MissingClientConnection); - } + + assert_eq!(error.to_string(), "Failed to retrieve client connection"); + } #[tokio::test] async fn test_handle_delivery_request_with_recipient_did_in_keylist() { From ae52335eb386f61b41fed9f503461b5de3ef7027 Mon Sep 17 00:00:00 2001 From: ndefokou Date: Wed, 11 Dec 2024 18:00:03 +0100 Subject: [PATCH 08/14] fixe() file formating --- .../didcomm-messaging/protocols/pickup/src/handler.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/crates/web-plugins/didcomm-messaging/protocols/pickup/src/handler.rs b/crates/web-plugins/didcomm-messaging/protocols/pickup/src/handler.rs index fa332148..cee169ad 100644 --- a/crates/web-plugins/didcomm-messaging/protocols/pickup/src/handler.rs +++ b/crates/web-plugins/didcomm-messaging/protocols/pickup/src/handler.rs @@ -556,9 +556,9 @@ mod tests { let error = handle_status_request(state, invalid_request) .await .unwrap_err(); - + assert_eq!(error.to_string(), "Failed to retrieve client connection"); - } + } #[tokio::test] async fn test_handle_delivery_request_with_recipient_did_in_keylist() { From 6c1b4be714f8f09792ea9c777420434cc9c62852 Mon Sep 17 00:00:00 2001 From: ndefokou Date: Thu, 12 Dec 2024 12:32:58 +0100 Subject: [PATCH 09/14] fixe() removing tuples --- .../src/handler/stateful.rs | 7 ++--- .../protocols/pickup/src/error.rs | 4 +++ .../protocols/pickup/src/handler.rs | 4 +-- .../shared/src/CircuitBreaker.rs | 27 ------------------- 4 files changed, 10 insertions(+), 32 deletions(-) diff --git a/crates/web-plugins/didcomm-messaging/protocols/mediator-coordination/src/handler/stateful.rs b/crates/web-plugins/didcomm-messaging/protocols/mediator-coordination/src/handler/stateful.rs index ce5a54fb..1ac5b86e 100644 --- a/crates/web-plugins/didcomm-messaging/protocols/mediator-coordination/src/handler/stateful.rs +++ b/crates/web-plugins/didcomm-messaging/protocols/mediator-coordination/src/handler/stateful.rs @@ -23,6 +23,7 @@ use shared::{ repository::entity::Connection, retry::{retry_async, RetryOptions}, state::{AppState, AppStateRepository}, + CircuitBreaker::CircuitBreaker, }; use std::{sync::Arc, time::Duration}; use uuid::Uuid; @@ -110,7 +111,7 @@ pub(crate) async fn process_mediate_request( RetryOptions::new() .retries(5) .exponential_backoff(Duration::from_millis(100)) - .max_delay(Duration::from_secs(1)), + .max_delay(Duration::from_secs(2)), ) .await .map_err(|err| { @@ -268,7 +269,7 @@ pub(crate) async fn process_plain_keylist_update_message( RetryOptions::new() .retries(5) .exponential_backoff(Duration::from_millis(100)) - .max_delay(Duration::from_secs(1)), + .max_delay(Duration::from_secs(2)), ) .await .map_err(|err| { @@ -404,7 +405,7 @@ pub(crate) async fn process_plain_keylist_query_message( RetryOptions::new() .retries(5) .exponential_backoff(Duration::from_millis(100)) - .max_delay(Duration::from_secs(1)), + .max_delay(Duration::from_secs(2)), ) .await .map_err(|err| { diff --git a/crates/web-plugins/didcomm-messaging/protocols/pickup/src/error.rs b/crates/web-plugins/didcomm-messaging/protocols/pickup/src/error.rs index 4fd25b89..da67b383 100644 --- a/crates/web-plugins/didcomm-messaging/protocols/pickup/src/error.rs +++ b/crates/web-plugins/didcomm-messaging/protocols/pickup/src/error.rs @@ -14,6 +14,9 @@ pub(crate) enum PickupError { #[error("Malformed request. {0}")] MalformedRequest(String), + + #[error("Service unavailable")] + CircuitOpen, } impl IntoResponse for PickupError { @@ -22,6 +25,7 @@ impl IntoResponse for PickupError { PickupError::MissingSenderDID | PickupError::MalformedRequest(_) => { StatusCode::BAD_REQUEST } + PickupError::CircuitOpen => StatusCode::SERVICE_UNAVAILABLE, PickupError::InternalError(_) => StatusCode::INTERNAL_SERVER_ERROR, PickupError::MissingClientConnection => StatusCode::UNAUTHORIZED, }; diff --git a/crates/web-plugins/didcomm-messaging/protocols/pickup/src/handler.rs b/crates/web-plugins/didcomm-messaging/protocols/pickup/src/handler.rs index cee169ad..e5f281fd 100644 --- a/crates/web-plugins/didcomm-messaging/protocols/pickup/src/handler.rs +++ b/crates/web-plugins/didcomm-messaging/protocols/pickup/src/handler.rs @@ -105,7 +105,7 @@ pub(crate) async fn handle_delivery_request( RetryOptions::new() .retries(5) .exponential_backoff(Duration::from_millis(100)) - .max_delay(Duration::from_secs(1)), + .max_delay(Duration::from_secs(2)), ) .await?; @@ -211,7 +211,7 @@ pub(crate) async fn handle_message_acknowledgement( RetryOptions::new() .retries(5) .exponential_backoff(Duration::from_millis(100)) - .max_delay(Duration::from_secs(1)), + .max_delay(Duration::from_secs(2)), ) .await .map_err(|_| { diff --git a/crates/web-plugins/didcomm-messaging/shared/src/CircuitBreaker.rs b/crates/web-plugins/didcomm-messaging/shared/src/CircuitBreaker.rs index cc05079c..836a11d3 100644 --- a/crates/web-plugins/didcomm-messaging/shared/src/CircuitBreaker.rs +++ b/crates/web-plugins/didcomm-messaging/shared/src/CircuitBreaker.rs @@ -73,30 +73,3 @@ impl CircuitBreaker { Ok(Some(result)) } } - -// pub fn request(dice: u32) -> Result { -// if dice > 6 { -// Err("400: Bad request.".to_string()) -// } else { -// Ok(dice) -// } -// } - -// // Example usage -// fn main() { -// let breaker = CircuitBreaker::new(3, Duration::from_secs(5)); - -// for i in 1..=10 { -// let result = breaker.call(|| request(i)); - -// match result { -// Ok(Some(Ok(value))) => println!("Request succeeded with value: {}", value), -// Ok(Some(Err(err))) => println!("Request failed: {}", err), -// Ok(None) => println!("Circuit breaker is open."), -// Err(err) => println!("Error: {}", err), -// } - -// // Simulate a delay between requests -// thread::sleep(Duration::from_millis(500)); -// } -// } From 6bc341b0000d659a7fe3024f42414d29b0385bf8 Mon Sep 17 00:00:00 2001 From: ndefokou Date: Mon, 16 Dec 2024 11:18:19 +0100 Subject: [PATCH 10/14] fixe() removing un use crate --- Cargo.toml | 2 +- crates/circuit-breaker/Cargo.toml | 6 ------ crates/circuit-breaker/src/lib.rs | 14 -------------- 3 files changed, 1 insertion(+), 21 deletions(-) delete mode 100644 crates/circuit-breaker/Cargo.toml delete mode 100644 crates/circuit-breaker/src/lib.rs diff --git a/Cargo.toml b/Cargo.toml index 2945e3c2..16637fd2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -23,7 +23,7 @@ members = [ "crates/web-plugins/*", "crates/web-plugins/didcomm-messaging/did-utils", "crates/web-plugins/didcomm-messaging/shared", - "crates/web-plugins/didcomm-messaging/protocols/*", "crates/circuit-breaker", + "crates/web-plugins/didcomm-messaging/protocols/*", ] [workspace.dependencies] diff --git a/crates/circuit-breaker/Cargo.toml b/crates/circuit-breaker/Cargo.toml deleted file mode 100644 index 905af9b2..00000000 --- a/crates/circuit-breaker/Cargo.toml +++ /dev/null @@ -1,6 +0,0 @@ -[package] -name = "circuit-breaker" -version = "0.1.0" -edition = "2021" - -[dependencies] diff --git a/crates/circuit-breaker/src/lib.rs b/crates/circuit-breaker/src/lib.rs deleted file mode 100644 index 7d12d9af..00000000 --- a/crates/circuit-breaker/src/lib.rs +++ /dev/null @@ -1,14 +0,0 @@ -pub fn add(left: usize, right: usize) -> usize { - left + right -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn it_works() { - let result = add(2, 2); - assert_eq!(result, 4); - } -} From 3885322844df02cffd4bc7493a6d8409bb1e815f Mon Sep 17 00:00:00 2001 From: ndefokou Date: Wed, 18 Dec 2024 16:34:31 +0100 Subject: [PATCH 11/14] fixe()integrating the circuit breaker on the different protocols --- .../protocols/forward/Cargo.toml | 1 + .../protocols/forward/src/handler.rs | 130 +-- .../protocols/forward/src/plugin.rs | 13 +- .../mediator-coordination/src/errors.rs | 3 + .../src/handler/stateful.rs | 774 ++++++++++-------- .../mediator-coordination/src/plugin.rs | 26 +- .../protocols/pickup/Cargo.toml | 1 + .../protocols/pickup/src/handler.rs | 635 ++++++++------ .../protocols/pickup/src/plugin.rs | 26 +- .../shared/src/CircuitBreaker.rs | 75 -- .../shared/src/circuit_breaker.rs | 90 ++ .../didcomm-messaging/shared/src/lib.rs | 2 +- .../didcomm-messaging/shared/src/state.rs | 4 +- 13 files changed, 1056 insertions(+), 724 deletions(-) delete mode 100644 crates/web-plugins/didcomm-messaging/shared/src/CircuitBreaker.rs create mode 100644 crates/web-plugins/didcomm-messaging/shared/src/circuit_breaker.rs diff --git a/crates/web-plugins/didcomm-messaging/protocols/forward/Cargo.toml b/crates/web-plugins/didcomm-messaging/protocols/forward/Cargo.toml index 85278439..043bf1aa 100644 --- a/crates/web-plugins/didcomm-messaging/protocols/forward/Cargo.toml +++ b/crates/web-plugins/didcomm-messaging/protocols/forward/Cargo.toml @@ -17,6 +17,7 @@ thiserror.workspace = true didcomm = { workspace = true, features = ["uniffi"] } hyper = { workspace = true, features = ["full"] } axum = { workspace = true, features = ["macros"] } +tokio = "1.27.0" [dev-dependencies] keystore = { workspace = true, features = ["test-utils"] } diff --git a/crates/web-plugins/didcomm-messaging/protocols/forward/src/handler.rs b/crates/web-plugins/didcomm-messaging/protocols/forward/src/handler.rs index 0b260527..7cb1aafd 100644 --- a/crates/web-plugins/didcomm-messaging/protocols/forward/src/handler.rs +++ b/crates/web-plugins/didcomm-messaging/protocols/forward/src/handler.rs @@ -4,78 +4,84 @@ use didcomm::{AttachmentData, Message}; use mongodb::bson::doc; use serde_json::{json, Value}; use shared::{ + circuit_breaker::CircuitBreaker, repository::entity::{Connection, RoutedMessage}, retry::{retry_async, RetryOptions}, state::{AppState, AppStateRepository}, }; use std::sync::Arc; use std::time::Duration; +use tokio::sync::Mutex; /// Mediator receives forwarded messages, extract the next field in the message body, and the attachments in the message /// then stores the attachment with the next field as key for pickup pub(crate) async fn mediator_forward_process( state: Arc, message: Message, + circuit_breaker: Arc>, ) -> Result, ForwardError> { - let AppStateRepository { - message_repository, - connection_repository, - .. - } = state - .repository - .as_ref() - .ok_or_else(|| ForwardError::InternalServerError)?; - - let circuit_breaker = state.circuit_breaker.clone(); - if circuit_breaker.is_open() { - return Err(ForwardError::CircuitOpen); - } - - let next = match checks(&message, connection_repository).await.ok() { - Some(next) => Ok(next), - None => Err(ForwardError::InternalServerError), - }; - - let attachments = message.attachments.unwrap_or_default(); - for attachment in attachments { - let attached = match attachment.data { - AttachmentData::Json { value: data } => data.json, - AttachmentData::Base64 { value: data } => json!(data.base64), - AttachmentData::Links { value: data } => json!(data.links), - }; - - let result = retry_async( - || { - let attached = attached.clone(); - let recipient_did = next.as_ref().unwrap().to_owned(); - - async move { - message_repository - .store(RoutedMessage { - id: None, - message: attached, - recipient_did, - }) - .await + let mut cb = circuit_breaker.lock().await; + + let result = cb + .call_async(|| { + let state = Arc::clone(&state); + let message = message.clone(); + async move { + let AppStateRepository { + message_repository, + connection_repository, + .. + } = state + .repository + .as_ref() + .ok_or_else(|| ForwardError::InternalServerError)?; + + let next = match checks(&message, connection_repository).await.ok() { + Some(next) => Ok(next), + None => Err(ForwardError::InternalServerError), + }?; + + let attachments = message.attachments.unwrap_or_default(); + for attachment in attachments { + let attached = match attachment.data { + AttachmentData::Json { value: data } => data.json, + AttachmentData::Base64 { value: data } => json!(data.base64), + AttachmentData::Links { value: data } => json!(data.links), + }; + retry_async( + || { + let attached = attached.clone(); + let recipient_did = next.to_owned(); + + async move { + message_repository + .store(RoutedMessage { + id: None, + message: attached, + recipient_did, + }) + .await + } + }, + RetryOptions::new() + .retries(5) + .exponential_backoff(Duration::from_millis(100)) + .max_delay(Duration::from_secs(1)), + ) + .await + .map_err(|_| ForwardError::InternalServerError)?; } - }, - RetryOptions::new() - .retries(5) - .exponential_backoff(Duration::from_millis(100)) - .max_delay(Duration::from_secs(1)), - ) + Ok::, ForwardError>(None) + } + }) .await; - match result { - Ok(_) => circuit_breaker.record_success(), - Err(_) => { - circuit_breaker.record_failure(); - return Err(ForwardError::InternalServerError); - } - }; + match result { + Some(Ok(None)) => Ok(None), + Some(Ok(Some(_))) => Err(ForwardError::InternalServerError), + Some(Err(err)) => Err(err), + None => Err(ForwardError::CircuitOpen), } - - Ok(None) } async fn checks( @@ -113,6 +119,7 @@ mod test { use keystore::Secrets; use serde_json::json; use shared::{ + circuit_breaker, repository::{ entity::Connection, tests::{MockConnectionRepository, MockMessagesRepository}, @@ -196,9 +203,16 @@ mod test { .await .expect("Unable unpack"); - let msg = mediator_forward_process(Arc::new(state.clone()), msg) - .await - .unwrap(); + // Wrap the CircuitBreaker in Arc and Mutex + let circuit_breaker = circuit_breaker::CircuitBreaker::new(3, Duration::from_secs(3)); + + let msg: Option = mediator_forward_process( + Arc::new(state.clone()), + msg, + Arc::new(circuit_breaker.into()), + ) + .await + .unwrap(); println!("Mediator1 is forwarding message \n{:?}\n", msg); } diff --git a/crates/web-plugins/didcomm-messaging/protocols/forward/src/plugin.rs b/crates/web-plugins/didcomm-messaging/protocols/forward/src/plugin.rs index 75744505..86325bc2 100644 --- a/crates/web-plugins/didcomm-messaging/protocols/forward/src/plugin.rs +++ b/crates/web-plugins/didcomm-messaging/protocols/forward/src/plugin.rs @@ -3,8 +3,9 @@ use async_trait::async_trait; use axum::response::{IntoResponse, Response}; use didcomm::Message; use message_api::{MessageHandler, MessagePlugin, MessageRouter}; -use shared::state::AppState; -use std::sync::Arc; +use shared::{circuit_breaker::CircuitBreaker, state::AppState}; +use std::{sync::Arc, time::Duration}; +use tokio::sync::Mutex; pub struct RoutingProtocol; @@ -17,7 +18,13 @@ impl MessageHandler for ForwardHandler { state: Arc, msg: Message, ) -> Result, Response> { - crate::handler::mediator_forward_process(state, msg) + let circuit_breaker = Arc::new(Mutex::new(CircuitBreaker::new( + 2, + Duration::from_millis(5000), + ))); + + // Pass the state, msg, and the circuit_breaker as arguments + crate::handler::mediator_forward_process(state, msg, circuit_breaker) .await .map_err(|e| e.into_response()) } diff --git a/crates/web-plugins/didcomm-messaging/protocols/mediator-coordination/src/errors.rs b/crates/web-plugins/didcomm-messaging/protocols/mediator-coordination/src/errors.rs index 7e01a08f..2a15b917 100644 --- a/crates/web-plugins/didcomm-messaging/protocols/mediator-coordination/src/errors.rs +++ b/crates/web-plugins/didcomm-messaging/protocols/mediator-coordination/src/errors.rs @@ -15,6 +15,8 @@ pub(crate) enum MediationError { UnexpectedMessageFormat, #[error("internal server error")] InternalServerError, + #[error("service unavailable")] + CircuitOpen, } impl IntoResponse for MediationError { @@ -26,6 +28,7 @@ impl IntoResponse for MediationError { MediationError::UncoordinatedSender => StatusCode::UNAUTHORIZED, MediationError::UnexpectedMessageFormat => StatusCode::BAD_REQUEST, MediationError::InternalServerError => StatusCode::INTERNAL_SERVER_ERROR, + MediationError::CircuitOpen => StatusCode::SERVICE_UNAVAILABLE, }; let body = Json(serde_json::json!({ diff --git a/crates/web-plugins/didcomm-messaging/protocols/mediator-coordination/src/handler/stateful.rs b/crates/web-plugins/didcomm-messaging/protocols/mediator-coordination/src/handler/stateful.rs index 1ac5b86e..e02e718c 100644 --- a/crates/web-plugins/didcomm-messaging/protocols/mediator-coordination/src/handler/stateful.rs +++ b/crates/web-plugins/didcomm-messaging/protocols/mediator-coordination/src/handler/stateful.rs @@ -19,19 +19,21 @@ use keystore::Secrets; use mongodb::bson::doc; use serde_json::json; use shared::{ + circuit_breaker::CircuitBreaker, midlw::ensure_transport_return_route_is_decorated_all, repository::entity::Connection, retry::{retry_async, RetryOptions}, state::{AppState, AppStateRepository}, - CircuitBreaker::CircuitBreaker, }; use std::{sync::Arc, time::Duration}; +use tokio::sync::Mutex; use uuid::Uuid; /// Process a DIDComm mediate request pub(crate) async fn process_mediate_request( state: Arc, plain_message: Message, + circuit_breaker: Arc>, ) -> Result, MediationError> { // This is to Check message type compliance ensure_jwm_type_is_mediation_request(&plain_message)?; @@ -44,140 +46,165 @@ pub(crate) async fn process_mediate_request( let sender_did = plain_message.from.as_ref().unwrap(); - // Retrieve repository to connection entities + // Acquire the CircuitBreaker lock + let mut cb = circuit_breaker.lock().await; - let AppStateRepository { - connection_repository, - .. - } = state - .repository - .as_ref() - .ok_or(MediationError::InternalServerError)?; - - // If there is already mediation, send mediate deny - if let Some(_connection) = retry_async( - || { + // Wrap the process logic in the CircuitBreaker call + let result = cb + .call_async(|| { + let state = state.clone(); let sender_did = sender_did.clone(); - let connection_repository = connection_repository.clone(); + let mediator_did = mediator_did.clone(); async move { - connection_repository - .find_one_by(doc! { "client_did": sender_did }) + // Retrieve repository to connection entities + // Retrieve repository to connection entities + + // Retrieve repository to connection entities + + let AppStateRepository { + connection_repository, + .. + } = state + .repository + .as_ref() + .ok_or(MediationError::InternalServerError)?; + + // If there is already mediation, send mediate deny + if let Some(_connection) = retry_async( + || { + let sender_did = sender_did.clone(); + let connection_repository = connection_repository.clone(); + + async move { + connection_repository + .find_one_by(doc! { "client_did": sender_did }) + .await + } + }, + RetryOptions::new() + .retries(5) + .exponential_backoff(Duration::from_millis(100)) + .max_delay(Duration::from_secs(1)), + ) + .await + .map_err(|_| MediationError::InternalServerError)? + { + tracing::info!("Sending mediate deny."); + return Ok(Some( + Message::build( + format!("urn:uuid:{}", Uuid::new_v4()), + MEDIATE_DENY_2_0.to_string(), + json!(MediationDeny { + id: format!("urn:uuid:{}", Uuid::new_v4()), + message_type: MEDIATE_DENY_2_0.to_string(), + ..Default::default() + }), + ) + .to(sender_did.clone()) + .from(mediator_did.clone()) + .finalize(), + )); + } else { + // Issue mediate grant response + tracing::info!("Sending mediate grant."); + // Create routing, store it and send mediation grant + let (routing_did, auth_keys, agreem_keys) = + generate_did_peer(state.public_domain.to_string()); + + let AppStateRepository { keystore, .. } = state + .repository + .as_ref() + .ok_or(MediationError::InternalServerError)?; + + let diddoc = retry_async( + || { + let did_resolver = state.did_resolver.clone(); + let routing_did = routing_did.clone(); + + async move { did_resolver.resolve(&routing_did).await.map_err(|_| ()) } + }, + RetryOptions::new() + .retries(5) + .exponential_backoff(Duration::from_millis(100)) + .max_delay(Duration::from_secs(2)), + ) .await - } - }, - RetryOptions::new() - .retries(5) - .exponential_backoff(Duration::from_millis(100)) - .max_delay(Duration::from_secs(1)), - ) - .await - .map_err(|_| MediationError::InternalServerError)? - { - tracing::info!("Sending mediate deny."); - return Ok(Some( - Message::build( - format!("urn:uuid:{}", Uuid::new_v4()), - MEDIATE_DENY_2_0.to_string(), - json!(MediationDeny { - id: format!("urn:uuid:{}", Uuid::new_v4()), - message_type: MEDIATE_DENY_2_0.to_string(), - ..Default::default() - }), - ) - .to(sender_did.clone()) - .from(mediator_did.clone()) - .finalize(), - )); - } else { - /* Issue mediate grant response */ - tracing::info!("Sending mediate grant."); - // Create routing, store it and send mediation grant - let (routing_did, auth_keys, agreem_keys) = - generate_did_peer(state.public_domain.to_string()); - - let AppStateRepository { keystore, .. } = state - .repository - .as_ref() - .ok_or(MediationError::InternalServerError)?; - - let diddoc = retry_async( - || { - let did_resolver = state.did_resolver.clone(); - let routing_did = routing_did.clone(); - - async move { did_resolver.resolve(&routing_did).await.map_err(|_| ()) } - }, - RetryOptions::new() - .retries(5) - .exponential_backoff(Duration::from_millis(100)) - .max_delay(Duration::from_secs(2)), - ) - .await - .map_err(|err| { - tracing::error!("Failed to resolve DID: {:?}", err); - MediationError::InternalServerError - })? - .ok_or(MediationError::InternalServerError)?; - - let agreem_keys_jwk: Jwk = agreem_keys.try_into().unwrap(); - - let agreem_keys_secret = Secrets { - id: None, - kid: diddoc.key_agreement.get(0).unwrap().clone(), - secret_material: agreem_keys_jwk, - }; - - match keystore.store(agreem_keys_secret).await { - Ok(_stored_connection) => { - tracing::info!("Successfully stored agreement keys.") - } - Err(error) => tracing::error!("Error storing agreement keys: {:?}", error), - } + .map_err(|err| { + tracing::error!("Failed to resolve DID: {:?}", err); + MediationError::InternalServerError + })? + .ok_or(MediationError::InternalServerError)?; + + let agreem_keys_jwk: Jwk = agreem_keys.try_into().unwrap(); + + let agreem_keys_secret = Secrets { + id: None, + kid: diddoc.key_agreement.get(0).unwrap().clone(), + secret_material: agreem_keys_jwk, + }; + + match keystore.store(agreem_keys_secret).await { + Ok(_stored_connection) => { + tracing::info!("Successfully stored agreement keys.") + } + Err(error) => tracing::error!("Error storing agreement keys: {:?}", error), + } - let auth_keys_jwk: Jwk = auth_keys.try_into().unwrap(); + let auth_keys_jwk: Jwk = auth_keys.try_into().unwrap(); + + let auth_keys_secret = Secrets { + id: None, + kid: diddoc.authentication.get(0).unwrap().clone(), + secret_material: auth_keys_jwk, + }; + + match keystore.store(auth_keys_secret).await { + Ok(_stored_connection) => { + tracing::info!("Successfully stored authentication keys.") + } + Err(error) => { + tracing::error!("Error storing authentication keys: {:?}", error) + } + } - let auth_keys_secret = Secrets { - id: None, - kid: diddoc.authentication.get(0).unwrap().clone(), - secret_material: auth_keys_jwk, - }; + let mediation_grant = create_mediation_grant(&routing_did); + + let new_connection = Connection { + id: None, + client_did: sender_did.to_string(), + mediator_did: mediator_did.to_string(), + keylist: vec!["".to_string()], + routing_did: routing_did, + }; + + // Use store_one to store the sample connection + match connection_repository.store(new_connection).await { + Ok(_stored_connection) => { + tracing::info!("Successfully stored connection: ") + } + Err(error) => tracing::error!("Error storing connection: {:?}", error), + } - match keystore.store(auth_keys_secret).await { - Ok(_stored_connection) => { - tracing::info!("Successfully stored authentication keys.") + Ok(Some( + Message::build( + format!("urn:uuid:{}", Uuid::new_v4()), + mediation_grant.message_type.clone(), + json!(mediation_grant), + ) + .to(sender_did.clone()) + .from(mediator_did.clone()) + .finalize(), + )) + } } - Err(error) => tracing::error!("Error storing authentication keys: {:?}", error), - } - - let mediation_grant = create_mediation_grant(&routing_did); - - let new_connection = Connection { - id: None, - client_did: sender_did.to_string(), - mediator_did: mediator_did.to_string(), - keylist: vec!["".to_string()], - routing_did: routing_did, - }; + }) + .await; - // Use store_one to store the sample connection - match connection_repository.store(new_connection).await { - Ok(_stored_connection) => { - tracing::info!("Successfully stored connection: ") - } - Err(error) => tracing::error!("Error storing connection: {:?}", error), - } - - Ok(Some( - Message::build( - format!("urn:uuid:{}", Uuid::new_v4()), - mediation_grant.message_type.clone(), - json!(mediation_grant), - ) - .to(sender_did.clone()) - .from(mediator_did.clone()) - .finalize(), - )) + match result { + Some(Ok(response)) => Ok(response), + Some(Err(err)) => Err(err), + None => Err(MediationError::CircuitOpen), } } @@ -230,6 +257,7 @@ fn generate_did_peer(service_endpoint: String) -> (String, Ed25519KeyPair, X2551 pub(crate) async fn process_plain_keylist_update_message( state: Arc, message: Message, + circuit_breaker: Arc>, ) -> Result, MediationError> { // Extract message sender @@ -242,214 +270,247 @@ pub(crate) async fn process_plain_keylist_update_message( let keylist_update_body: KeylistUpdateBody = serde_json::from_value(message.body) .map_err(|_| MediationError::UnexpectedMessageFormat)?; - // Retrieve repository to connection entities - - let AppStateRepository { - connection_repository, - .. - } = state - .repository - .as_ref() - .ok_or(MediationError::InternalServerError)?; + let mut cb = circuit_breaker.lock().await; - // Find connection for this keylist update - - let connection = retry_async( - || { - let connection_repository = connection_repository.clone(); + let result = cb + .call_async(|| { + let state = state.clone(); let sender = sender.clone(); + let keylist_update_body = keylist_update_body.clone(); async move { - connection_repository - .find_one_by(doc! { "client_did": &sender }) - .await - .map_err(|_| ()) - } - }, - RetryOptions::new() - .retries(5) - .exponential_backoff(Duration::from_millis(100)) - .max_delay(Duration::from_secs(2)), - ) - .await - .map_err(|err| { - tracing::error!("Failed to find connection after retries: {:?}", err); - MediationError::InternalServerError - })? - .ok_or_else(|| MediationError::UncoordinatedSender)?; - - // Prepare handles to relevant collections - - let mut updated_keylist = connection.keylist.clone(); - let updates = keylist_update_body.updates; - - // Closure to check if a specific key is duplicated across commands - - let key_is_duplicate = |recipient_did| { - updates - .iter() - .filter(|e| &e.recipient_did == recipient_did) - .count() - > 1 - }; - - // Perform updates to persist - - let confirmations: Vec<_> = updates - .iter() - .map(|update| KeylistUpdateConfirmation { - recipient_did: update.recipient_did.clone(), - action: update.action.clone(), - result: { - if let KeylistUpdateAction::Unknown(_) = &update.action { - KeylistUpdateResult::ClientError - } else if key_is_duplicate(&update.recipient_did) { - KeylistUpdateResult::ClientError - } else { - match connection - .keylist + let AppStateRepository { + connection_repository, + .. + } = state + .repository + .as_ref() + .ok_or(MediationError::InternalServerError)?; + + // Find connection for this keylist update + + let connection = retry_async( + || { + let connection_repository = connection_repository.clone(); + let sender = sender.clone(); + + async move { + connection_repository + .find_one_by(doc! { "client_did": &sender }) + .await + .map_err(|_| ()) + } + }, + RetryOptions::new() + .retries(5) + .exponential_backoff(Duration::from_millis(100)) + .max_delay(Duration::from_secs(2)), + ) + .await + .map_err(|err| { + tracing::error!("Failed to find connection after retries: {:?}", err); + MediationError::InternalServerError + })? + .ok_or_else(|| MediationError::UncoordinatedSender)?; + + // Prepare handles to relevant collections + + let mut updated_keylist = connection.keylist.clone(); + let updates = keylist_update_body.updates; + + // Closure to check if a specific key is duplicated across commands + + let key_is_duplicate = |recipient_did| { + updates .iter() - .position(|x| x == &update.recipient_did) - { - Some(index) => match &update.action { - KeylistUpdateAction::Add => KeylistUpdateResult::NoChange, - KeylistUpdateAction::Remove => { - updated_keylist.swap_remove(index); - KeylistUpdateResult::Success + .filter(|e| &e.recipient_did == recipient_did) + .count() + > 1 + }; + + // Process keylist updates + let confirmations: Vec<_> = updates + .iter() + .map(|update| KeylistUpdateConfirmation { + recipient_did: update.recipient_did.clone(), + action: update.action.clone(), + result: { + if let KeylistUpdateAction::Unknown(_) = &update.action { + KeylistUpdateResult::ClientError + } else if key_is_duplicate(&update.recipient_did) { + KeylistUpdateResult::ClientError + } else { + match connection + .keylist + .iter() + .position(|x| x == &update.recipient_did) + { + Some(index) => match &update.action { + KeylistUpdateAction::Add => KeylistUpdateResult::NoChange, + KeylistUpdateAction::Remove => { + updated_keylist.swap_remove(index); + KeylistUpdateResult::Success + } + KeylistUpdateAction::Unknown(_) => unreachable!(), + }, + None => match &update.action { + KeylistUpdateAction::Add => { + updated_keylist.push(update.recipient_did.clone()); + KeylistUpdateResult::Success + } + KeylistUpdateAction::Remove => { + KeylistUpdateResult::NoChange + } + KeylistUpdateAction::Unknown(_) => unreachable!(), + }, + } } - KeylistUpdateAction::Unknown(_) => unreachable!(), }, - None => match &update.action { - KeylistUpdateAction::Add => { - updated_keylist.push(update.recipient_did.clone()); - KeylistUpdateResult::Success + }) + .collect(); + + let confirmations = match connection_repository + .update(Connection { + keylist: updated_keylist, + ..connection + }) + .await + { + Ok(_) => confirmations, + Err(_) => confirmations + .into_iter() + .map(|mut confirmation| { + if confirmation.result != KeylistUpdateResult::ClientError { + confirmation.result = KeylistUpdateResult::ServerError } - KeylistUpdateAction::Remove => KeylistUpdateResult::NoChange, - KeylistUpdateAction::Unknown(_) => unreachable!(), - }, - } - } - }, - }) - .collect(); - - // Persist updated keylist, update confirmations if server error - let confirmations = match connection_repository - .update(Connection { - keylist: updated_keylist, - ..connection + confirmation + }) + .collect(), + }; + + // Build response + + let mediator_did = &state.diddoc.id; + + Ok(Some( + Message::build( + format!("urn:uuid:{}", Uuid::new_v4()), + KEYLIST_UPDATE_RESPONSE_2_0.to_string(), + json!(KeylistUpdateResponseBody { + updated: confirmations + }), + ) + .to(sender) + .from(mediator_did.to_owned()) + .finalize(), + )) + } }) - .await - { - Ok(_) => confirmations, - Err(_) => confirmations - .into_iter() - .map(|mut confirmation| { - if confirmation.result != KeylistUpdateResult::ClientError { - confirmation.result = KeylistUpdateResult::ServerError - } - - confirmation - }) - .collect(), - }; - - // Build response - - let mediator_did = &state.diddoc.id; + .await; - Ok(Some( - Message::build( - format!("urn:uuid:{}", Uuid::new_v4()), - KEYLIST_UPDATE_RESPONSE_2_0.to_string(), - json!(KeylistUpdateResponseBody { - updated: confirmations - }), - ) - .to(sender) - .from(mediator_did.to_owned()) - .finalize(), - )) + match result { + Some(Ok(response)) => Ok(response), + Some(Err(err)) => Err(err), + None => Err(MediationError::CircuitOpen), + } } pub(crate) async fn process_plain_keylist_query_message( state: Arc, message: Message, + circuit_breaker: Arc>, ) -> Result, MediationError> { println!("Processing keylist query..."); let sender = message .from .expect("unpacking middleware failed to prevent anonymous senders"); - let AppStateRepository { - connection_repository, - .. - } = state - .repository - .as_ref() - .ok_or(MediationError::InternalServerError)?; - - let connection = retry_async( - || { - let connection_repository = connection_repository.clone(); + let mut cb = circuit_breaker.lock().await; + + let result = cb + .call_async(|| { + let state = state.clone(); let sender = sender.clone(); async move { - connection_repository - .find_one_by(doc! { "client_did": &sender }) - .await - .map_err(|_| ()) - } - }, - RetryOptions::new() - .retries(5) - .exponential_backoff(Duration::from_millis(100)) - .max_delay(Duration::from_secs(2)), - ) - .await - .map_err(|err| { - tracing::error!("Failed to find connection after retries: {:?}", err); - MediationError::InternalServerError - })? - .ok_or_else(|| MediationError::UncoordinatedSender)?; - - println!("keylist: {:?}", connection); - - let keylist_entries = connection - .keylist - .iter() - .map(|key| KeylistEntry { - recipient_did: key.clone(), - }) - .collect::>(); - - let body = KeylistBody { - keys: keylist_entries, - pagination: None, - }; + let AppStateRepository { + connection_repository, + .. + } = state + .repository + .as_ref() + .ok_or(MediationError::InternalServerError)?; + + let connection = retry_async( + || { + let connection_repository = connection_repository.clone(); + let sender = sender.clone(); + + async move { + connection_repository + .find_one_by(doc! { "client_did": &sender }) + .await + .map_err(|_| ()) + } + }, + RetryOptions::new() + .retries(5) + .exponential_backoff(Duration::from_millis(100)) + .max_delay(Duration::from_secs(2)), + ) + .await + .map_err(|err| { + tracing::error!("Failed to find connection after retries: {:?}", err); + MediationError::InternalServerError + })? + .ok_or_else(|| MediationError::UncoordinatedSender)?; + + println!("keylist: {:?}", connection); + + let keylist_entries = connection + .keylist + .iter() + .map(|key| KeylistEntry { + recipient_did: key.clone(), + }) + .collect::>(); + + let body = KeylistBody { + keys: keylist_entries, + pagination: None, + }; + + let keylist_object = Keylist { + id: format!("urn:uuid:{}", Uuid::new_v4()), + message_type: KEYLIST_2_0.to_string(), + body: body, + additional_properties: None, + }; - let keylist_object = Keylist { - id: format!("urn:uuid:{}", Uuid::new_v4()), - message_type: KEYLIST_2_0.to_string(), - body: body, - additional_properties: None, - }; + let mediator_did = &state.diddoc.id; - let mediator_did = &state.diddoc.id; + let message = Message::build( + format!("urn:uuid:{}", Uuid::new_v4()), + KEYLIST_2_0.to_string(), + json!(keylist_object), + ) + .to(sender.clone()) + .from(mediator_did.clone()) + .finalize(); - let message = Message::build( - format!("urn:uuid:{}", Uuid::new_v4()), - KEYLIST_2_0.to_string(), - json!(keylist_object), - ) - .to(sender.clone()) - .from(mediator_did.clone()) - .finalize(); + println!("message: {:?}", message); - println!("message: {:?}", message); + Ok(Some(message)) + } + }) + .await; - Ok(Some(message)) + match result { + Some(Ok(response)) => Ok(response), + Some(Err(err)) => Err(err), + None => Err(MediationError::CircuitOpen), + } } #[cfg(test)] @@ -457,7 +518,8 @@ mod tests { use super::*; use shared::{ - repository::tests::MockConnectionRepository, utils::tests_utils::tests as global, + circuit_breaker, repository::tests::MockConnectionRepository, + utils::tests_utils::tests as global, }; #[allow(clippy::needless_update)] @@ -491,11 +553,17 @@ mod tests { .from(global::_edge_did()) .finalize(); + let circuit_breaker = circuit_breaker::CircuitBreaker::new(3, Duration::from_secs(3)); + // Process request - let response = process_plain_keylist_query_message(Arc::clone(&state), message) - .await - .unwrap() - .expect("Response should not be None"); + let response = process_plain_keylist_query_message( + Arc::clone(&state), + message, + Arc::new(circuit_breaker.into()), + ) + .await + .unwrap() + .expect("Response should not be None"); assert_eq!(response.type_, KEYLIST_2_0); assert_eq!(response.from.unwrap(), global::_mediator_did(&state)); @@ -515,10 +583,16 @@ mod tests { .from("did:example:uncoordinated_sender".to_string()) .finalize(); + let circuit_breaker = circuit_breaker::CircuitBreaker::new(3, Duration::from_secs(3)); + // Process request - let err = process_plain_keylist_query_message(Arc::clone(&state), message) - .await - .unwrap_err(); + let err = process_plain_keylist_query_message( + Arc::clone(&state), + message, + Arc::new(circuit_breaker.into()), + ) + .await + .unwrap_err(); // Assert issued error for uncoordinated sender assert_eq!(err, MediationError::UncoordinatedSender,); } @@ -551,10 +625,16 @@ mod tests { // Process request - let response = process_plain_keylist_update_message(Arc::clone(&state), message) - .await - .unwrap() - .expect("Response should not be None"); + let circuit_breaker = circuit_breaker::CircuitBreaker::new(3, Duration::from_secs(3)); + + let response = process_plain_keylist_update_message( + Arc::clone(&state), + message, + Arc::new(circuit_breaker.into()), + ) + .await + .unwrap() + .expect("Response should not be None"); let response = response; // Assert metadata @@ -650,10 +730,16 @@ mod tests { // Process request - let response = process_plain_keylist_update_message(Arc::clone(&state), message) - .await - .unwrap() - .expect("Response should not be None"); + let circuit_breaker = circuit_breaker::CircuitBreaker::new(3, Duration::from_secs(3)); + + let response = process_plain_keylist_update_message( + Arc::clone(&state), + message, + Arc::new(circuit_breaker.into()), + ) + .await + .unwrap() + .expect("Response should not be None"); // Assert updates assert_eq!( @@ -713,11 +799,16 @@ mod tests { .finalize(); // Process request + let circuit_breaker = circuit_breaker::CircuitBreaker::new(3, Duration::from_secs(3)); - let response = process_plain_keylist_update_message(Arc::clone(&state), message) - .await - .unwrap() - .expect("Response should not be None"); + let response = process_plain_keylist_update_message( + Arc::clone(&state), + message, + Arc::new(circuit_breaker.into()), + ) + .await + .unwrap() + .expect("Response should not be None"); // Assert updates assert_eq!( @@ -773,11 +864,16 @@ mod tests { .finalize(); // Process request + let circuit_breaker = circuit_breaker::CircuitBreaker::new(3, Duration::from_secs(3)); - let response = process_plain_keylist_update_message(Arc::clone(&state), message) - .await - .unwrap() - .expect("Response should not be None"); + let response = process_plain_keylist_update_message( + Arc::clone(&state), + message, + Arc::new(circuit_breaker.into()), + ) + .await + .unwrap() + .expect("Response should not be None"); // Assert updates @@ -811,9 +907,15 @@ mod tests { .finalize(); // Process request - let err = process_plain_keylist_update_message(Arc::clone(&state), message) - .await - .unwrap_err(); + let circuit_breaker = circuit_breaker::CircuitBreaker::new(3, Duration::from_secs(3)); + + let err = process_plain_keylist_update_message( + Arc::clone(&state), + message, + Arc::new(circuit_breaker.into()), + ) + .await + .unwrap_err(); // Assert issued error assert_eq!(err, MediationError::UnexpectedMessageFormat,); @@ -857,9 +959,15 @@ mod tests { .finalize(); // Process request - let err = process_plain_keylist_update_message(Arc::clone(&state), message) - .await - .unwrap_err(); + let circuit_breaker = circuit_breaker::CircuitBreaker::new(3, Duration::from_secs(3)); + + let err = process_plain_keylist_update_message( + Arc::clone(&state), + message, + Arc::new(circuit_breaker.into()), + ) + .await + .unwrap_err(); // Assert issued error assert_eq!(err, MediationError::UncoordinatedSender,); diff --git a/crates/web-plugins/didcomm-messaging/protocols/mediator-coordination/src/plugin.rs b/crates/web-plugins/didcomm-messaging/protocols/mediator-coordination/src/plugin.rs index 3aab4d69..978da015 100644 --- a/crates/web-plugins/didcomm-messaging/protocols/mediator-coordination/src/plugin.rs +++ b/crates/web-plugins/didcomm-messaging/protocols/mediator-coordination/src/plugin.rs @@ -3,8 +3,9 @@ use async_trait::async_trait; use axum::response::{IntoResponse, Response}; use didcomm::Message; use message_api::{MessageHandler, MessagePlugin, MessageRouter}; -use shared::state::AppState; -use std::sync::Arc; +use shared::{circuit_breaker::CircuitBreaker, state::AppState}; +use std::{sync::Arc, time::Duration}; +use tokio::sync::Mutex; pub struct MediatorCoordinationProtocol; @@ -19,7 +20,12 @@ impl MessageHandler for MediateRequestHandler { state: Arc, msg: Message, ) -> Result, Response> { - crate::handler::stateful::process_mediate_request(state, msg) + let circuit_breaker = Arc::new(Mutex::new(CircuitBreaker::new( + 2, + Duration::from_millis(5000), + ))); + + crate::handler::stateful::process_mediate_request(state, msg, circuit_breaker) .await .map_err(|e| e.into_response()) } @@ -32,7 +38,12 @@ impl MessageHandler for KeylistUpdateHandler { state: Arc, msg: Message, ) -> Result, Response> { - crate::handler::stateful::process_plain_keylist_update_message(state, msg) + let circuit_breaker = Arc::new(Mutex::new(CircuitBreaker::new( + 2, + Duration::from_millis(5000), + ))); + + crate::handler::stateful::process_plain_keylist_update_message(state, msg, circuit_breaker) .await .map_err(|e| e.into_response()) } @@ -45,7 +56,12 @@ impl MessageHandler for KeylistQueryHandler { state: Arc, msg: Message, ) -> Result, Response> { - crate::handler::stateful::process_plain_keylist_query_message(state, msg) + let circuit_breaker = Arc::new(Mutex::new(CircuitBreaker::new( + 2, + Duration::from_millis(5000), + ))); + + crate::handler::stateful::process_plain_keylist_query_message(state, msg, circuit_breaker) .await .map_err(|e| e.into_response()) } diff --git a/crates/web-plugins/didcomm-messaging/protocols/pickup/Cargo.toml b/crates/web-plugins/didcomm-messaging/protocols/pickup/Cargo.toml index 97694183..5b8fd91c 100644 --- a/crates/web-plugins/didcomm-messaging/protocols/pickup/Cargo.toml +++ b/crates/web-plugins/didcomm-messaging/protocols/pickup/Cargo.toml @@ -18,6 +18,7 @@ thiserror.workspace = true async-trait.workspace = true uuid = { workspace = true, features = ["v4"] } axum = { workspace = true, features = ["macros"] } +tokio = "1.27.0" [dev-dependencies] hyper = "0.14.27" diff --git a/crates/web-plugins/didcomm-messaging/protocols/pickup/src/handler.rs b/crates/web-plugins/didcomm-messaging/protocols/pickup/src/handler.rs index e5f281fd..36778e81 100644 --- a/crates/web-plugins/didcomm-messaging/protocols/pickup/src/handler.rs +++ b/crates/web-plugins/didcomm-messaging/protocols/pickup/src/handler.rs @@ -10,237 +10,320 @@ use didcomm::{Attachment, Message, MessageBuilder}; use mongodb::bson::{doc, oid::ObjectId}; use serde_json::Value; use shared::{ + circuit_breaker::CircuitBreaker, midlw::ensure_transport_return_route_is_decorated_all, repository::entity::{Connection, RoutedMessage}, retry::{retry_async, RetryOptions}, state::{AppState, AppStateRepository}, }; use std::{str::FromStr, sync::Arc, time::Duration}; +use tokio::sync::Mutex; use uuid::Uuid; // Process pickup status request pub(crate) async fn handle_status_request( state: Arc, message: Message, + circuit_breaker: Arc>, ) -> Result, PickupError> { // Validate the return_route header ensure_transport_return_route_is_decorated_all(&message) .map_err(|_| PickupError::MalformedRequest("Missing return_route header".to_owned()))?; let mediator_did = &state.diddoc.id; - let recipient_did = message - .body - .get("recipient_did") - .and_then(|val| val.as_str()); let sender_did = sender_did(&message)?; - let repository = repository(state.clone())?; - - let connection = retry_async( - || { - let repository = repository.clone(); - async move { client_connection(&repository, sender_did).await } - }, - RetryOptions::new() - .retries(5) - .exponential_backoff(Duration::from_millis(100)) - .max_delay(Duration::from_secs(2)), - ) - .await - .map_err(|_| PickupError::InternalError("Failed to retrieve client connection".to_owned()))?; - - let message_count = count_messages(repository, recipient_did, connection).await?; - - let id = Uuid::new_v4().urn().to_string(); - let response_builder: MessageBuilder = StatusResponse { - id: id.as_str(), - type_: STATUS_RESPONSE_3_0, - body: BodyStatusResponse { - recipient_did, - message_count, - live_delivery: Some(false), - ..Default::default() - }, - } - .into(); + let mut cb = circuit_breaker.lock().await; - let response = response_builder - .to(sender_did.to_owned()) - .from(mediator_did.to_owned()) - .finalize(); + let result = cb + .call_async(|| { + let state = state.clone(); + let message = message.clone(); + let circuit_breaker = circuit_breaker.clone(); + async move { + let recipient_did = message + .body + .get("recipient_did") + .and_then(|val| val.as_str()); + + let repository = repository(state.clone())?; + + let connection = retry_async( + || { + let repository = repository.clone(); + async move { client_connection(&repository, sender_did).await } + }, + RetryOptions::new() + .retries(5) + .exponential_backoff(Duration::from_millis(100)) + .max_delay(Duration::from_secs(2)), + ) + .await + .map_err(|_| { + PickupError::InternalError("Failed to retrieve client connection".to_owned()) + })?; + + // Pass `recipient_did` to count_messages, allowing it to handle `None` + let message_count = count_messages( + repository, + recipient_did, + connection, + circuit_breaker.clone(), + ) + .await?; + + let id = Uuid::new_v4().urn().to_string(); + let response_builder: MessageBuilder = StatusResponse { + id: id.as_str(), + type_: STATUS_RESPONSE_3_0, + body: BodyStatusResponse { + recipient_did: recipient_did.to_owned(), + message_count, + live_delivery: Some(false), + ..Default::default() + }, + } + .into(); + + let response = response_builder + .to(sender_did.to_owned()) + .from(mediator_did.to_owned()) + .finalize(); + + Ok(Some(response)) + } + }) + .await; - Ok(Some(response)) + match result { + Some(Ok(response)) => Ok(response), + Some(Err(err)) => Err(err), + None => Err(PickupError::CircuitOpen), + } } // Process pickup delivery request pub(crate) async fn handle_delivery_request( state: Arc, message: Message, + circuit_breaker: Arc>, ) -> Result, PickupError> { // Validate the return_route header ensure_transport_return_route_is_decorated_all(&message) .map_err(|_| PickupError::MalformedRequest("Missing return_route header".to_owned()))?; let mediator_did = &state.diddoc.id; - let recipient_did = message - .body - .get("recipient_did") - .and_then(|val| val.as_str()); + let sender_did = sender_did(&message)?; let message_body = message.body.clone(); - let limit = retry_async( - || { + let mut cb = circuit_breaker.lock().await; + + let result = cb + .call_async(|| { + let state = state.clone(); let message_body = message_body.clone(); + let circuit_breaker = circuit_breaker.clone(); async move { - message_body - .get("limit") - .and_then(Value::as_u64) - .ok_or_else(|| { - PickupError::MalformedRequest("Invalid \"limit\" specifier".to_owned()) - }) + let recipient_did = message_body.get("recipient_did").and_then(Value::as_str); + + let limit = retry_async( + || { + let message_body = message_body.clone(); + async move { + message_body + .get("limit") + .and_then(Value::as_u64) + .ok_or_else(|| { + PickupError::MalformedRequest( + "Invalid \"limit\" specifier".to_owned(), + ) + }) + } + }, + RetryOptions::new() + .retries(5) + .exponential_backoff(Duration::from_millis(100)) + .max_delay(Duration::from_secs(2)), + ) + .await?; + + let repository = repository(state.clone())?; + let connection = retry_async( + || { + let repository = repository.clone(); + async move { client_connection(&repository, sender_did).await } + }, + RetryOptions::new() + .retries(5) + .exponential_backoff(Duration::from_millis(100)) + .max_delay(Duration::from_secs(2)), + ) + .await + .map_err(|_| { + PickupError::InternalError("Failed to retrieve client connection".to_owned()) + })?; + + let messages = messages( + repository, + recipient_did, + connection, + limit as usize, + circuit_breaker.clone(), + ) + .await?; + + let response_builder: MessageBuilder; + let id = Uuid::new_v4().urn().to_string(); + + if messages.is_empty() { + response_builder = StatusResponse { + id: id.as_str(), + type_: STATUS_RESPONSE_3_0, + body: BodyStatusResponse { + recipient_did, + message_count: 0, + live_delivery: Some(false), + ..Default::default() + }, + } + .into(); + } else { + let mut attachments: Vec = Vec::with_capacity(messages.len()); + + for message in messages { + let attached = Attachment::json(message.message) + .id(message.id.map(|id| id.to_string()).ok_or_else(|| { + PickupError::InternalError( + "Failed to load requested messages. Please try again later." + .to_owned(), + ) + })?) + .finalize(); + + attachments.push(attached); + } + + response_builder = DeliveryResponse { + id: id.as_str(), + thid: id.as_str(), + type_: MESSAGE_DELIVERY_3_0, + body: BodyDeliveryResponse { recipient_did }, + attachments, + } + .into(); + } + + let response = response_builder + .to(sender_did.to_owned()) + .from(mediator_did.to_owned()) + .finalize(); + + Ok(Some(response)) } - }, - RetryOptions::new() - .retries(5) - .exponential_backoff(Duration::from_millis(100)) - .max_delay(Duration::from_secs(2)), - ) - .await?; - - let repository = repository(state.clone())?; - let connection = retry_async( - || { - let repository = repository.clone(); - async move { client_connection(&repository, sender_did).await } - }, - RetryOptions::new() - .retries(5) - .exponential_backoff(Duration::from_millis(100)) - .max_delay(Duration::from_secs(2)), - ) - .await - .map_err(|_| PickupError::InternalError("Failed to retrieve client connection".to_owned()))?; - - let messages = messages(repository, recipient_did, connection, limit as usize).await?; - - let response_builder: MessageBuilder; - let id = Uuid::new_v4().urn().to_string(); - - if messages.is_empty() { - response_builder = StatusResponse { - id: id.as_str(), - type_: STATUS_RESPONSE_3_0, - body: BodyStatusResponse { - recipient_did, - message_count: 0, - live_delivery: Some(false), - ..Default::default() - }, - } - .into(); - } else { - let mut attachments: Vec = Vec::with_capacity(messages.len()); - - for message in messages { - let attached = Attachment::json(message.message) - .id(message.id.map(|id| id.to_string()).ok_or_else(|| { - PickupError::InternalError( - "Failed to load requested messages. Please try again later.".to_owned(), - ) - })?) - .finalize(); - - attachments.push(attached); - } + }) + .await; - response_builder = DeliveryResponse { - id: id.as_str(), - thid: id.as_str(), - type_: MESSAGE_DELIVERY_3_0, - body: BodyDeliveryResponse { recipient_did }, - attachments, - } - .into(); + match result { + Some(Ok(response)) => Ok(response), + Some(Err(err)) => Err(err), + None => Err(PickupError::CircuitOpen), } - - let response = response_builder - .to(sender_did.to_owned()) - .from(mediator_did.to_owned()) - .finalize(); - - Ok(Some(response)) } // Process pickup messages acknowledgement pub(crate) async fn handle_message_acknowledgement( state: Arc, message: Message, + circuit_breaker: Arc>, ) -> Result, PickupError> { // Validate the return_route header ensure_transport_return_route_is_decorated_all(&message) .map_err(|_| PickupError::MalformedRequest("Missing return_route header".to_owned()))?; - let mediator_did = &state.diddoc.id; - let repository = repository(state.clone())?; - let sender_did = sender_did(&message)?; - let connection = client_connection(&repository, sender_did).await?; - - // Get the message id list - let message_id_list = message - .body - .get("message_id_list") - .and_then(|v| v.as_array()) - .map(|a| a.iter().filter_map(|v| v.as_str()).collect::>()) - .ok_or_else(|| { - PickupError::MalformedRequest("Invalid \"message_id_list\" specifier".to_owned()) - })?; - - for id in message_id_list { - let msg_id = ObjectId::from_str(id) - .map_err(|_| PickupError::MalformedRequest(format!("Invalid message id: {id}")))?; - - retry_async( - || { - let message_repository = repository.message_repository.clone(); - let msg_id = msg_id.clone(); - - async move { message_repository.delete_one(msg_id).await.map_err(|_| ()) } - }, - RetryOptions::new() - .retries(5) - .exponential_backoff(Duration::from_millis(100)) - .max_delay(Duration::from_secs(2)), - ) - .await - .map_err(|_| { - PickupError::InternalError( - "Failed to process the request. Please try again later.".to_owned(), - ) - })?; - } - - let message_count = count_messages(repository, None, connection).await?; - - let id = Uuid::new_v4().urn().to_string(); - let response_builder: MessageBuilder = StatusResponse { - id: id.as_str(), - type_: STATUS_RESPONSE_3_0, - body: BodyStatusResponse { - message_count, - live_delivery: Some(false), - ..Default::default() - }, - } - .into(); + // Acquire the CircuitBreaker lock + let mut cb = circuit_breaker.lock().await; - let response = response_builder - .to(sender_did.to_owned()) - .from(mediator_did.to_owned()) - .finalize(); + // Wrap the message acknowledgement logic in the CircuitBreaker call + let result = cb + .call_async(|| { + let state = state.clone(); + let message = message.clone(); + let circuit_breaker = circuit_breaker.clone(); + async move { + let mediator_did = &state.diddoc.id; + let repository = repository(state.clone())?; + let sender_did = sender_did(&message)?; + let connection = client_connection(&repository, sender_did).await?; + + // Get the message ID list + let message_id_list = message + .body + .get("message_id_list") + .and_then(|v| v.as_array()) + .map(|a| a.iter().filter_map(|v| v.as_str()).collect::>()) + .ok_or_else(|| { + PickupError::MalformedRequest( + "Invalid \"message_id_list\" specifier".to_owned(), + ) + })?; + + for id in message_id_list { + let msg_id = ObjectId::from_str(id).map_err(|_| { + PickupError::MalformedRequest(format!("Invalid message id: {id}")) + })?; + + retry_async( + || { + let message_repository = repository.message_repository.clone(); + let msg_id = msg_id.clone(); + + async move { message_repository.delete_one(msg_id).await.map_err(|_| ()) } + }, + RetryOptions::new() + .retries(5) + .exponential_backoff(Duration::from_millis(100)) + .max_delay(Duration::from_secs(2)), + ) + .await + .map_err(|_| { + PickupError::InternalError( + "Failed to process the request. Please try again later.".to_owned(), + ) + })?; + } + + let message_count = + count_messages(repository, None, connection, circuit_breaker).await?; + + let id = Uuid::new_v4().urn().to_string(); + let response_builder: MessageBuilder = StatusResponse { + id: id.as_str(), + type_: STATUS_RESPONSE_3_0, + body: BodyStatusResponse { + message_count, + live_delivery: Some(false), + ..Default::default() + }, + } + .into(); + + let response = response_builder + .to(sender_did.to_owned()) + .from(mediator_did.to_owned()) + .finalize(); + + Ok(Some(response)) + } + }) + .await; - Ok(Some(response)) + match result { + Some(Ok(response)) => Ok(response), + Some(Err(err)) => Err(err), + None => Err(PickupError::CircuitOpen), + } } // Process live delivery change request @@ -288,32 +371,50 @@ async fn count_messages( repository: AppStateRepository, recipient_did: Option<&str>, connection: Connection, + circuit_breaker: Arc>, ) -> Result { let recipients = recipients(recipient_did, &connection); - retry_async( - || { + let mut cb = circuit_breaker.lock().await; + + let result = cb + .call_async(|| { let message_repository = repository.message_repository.clone(); let recipients = recipients.clone(); async move { - message_repository - .count_by(doc! { "recipient_did": { "$in": recipients } }) - .await - .map_err(|_| ()) + retry_async( + || { + let message_repository = message_repository.clone(); + let recipients = recipients.clone(); + + async move { + message_repository + .count_by(doc! { "recipient_did": { "$in": recipients } }) + .await + .map_err(|_| ()) + } + }, + RetryOptions::new() + .retries(5) + .exponential_backoff(Duration::from_millis(100)) + .max_delay(Duration::from_secs(2)), + ) + .await + .map_err(|_| { + PickupError::InternalError( + "Failed to process the request. Please try again later.".to_owned(), + ) + }) } - }, - RetryOptions::new() - .retries(5) - .exponential_backoff(Duration::from_millis(100)) - .max_delay(Duration::from_secs(1)), - ) - .await - .map_err(|_| { - PickupError::InternalError( - "Failed to process the request. Please try again later.".to_owned(), - ) - }) + }) + .await; + + match result { + Some(Ok(count)) => Ok(count), + Some(Err(err)) => Err(err), + None => Err(PickupError::CircuitOpen), + } } async fn messages( @@ -321,35 +422,53 @@ async fn messages( recipient_did: Option<&str>, connection: Connection, limit: usize, + circuit_breaker: Arc>, ) -> Result, PickupError> { let recipients = recipients(recipient_did, &connection); - retry_async( - || { + let mut cb = circuit_breaker.lock().await; + + let result = cb + .call_async(|| { let message_repository = repository.message_repository.clone(); let recipients = recipients.clone(); async move { - message_repository - .find_all_by( - doc! { "recipient_did": { "$in": recipients } }, - Some(limit as i64), + retry_async( + || { + let message_repository = message_repository.clone(); + let recipients = recipients.clone(); + + async move { + message_repository + .find_all_by( + doc! { "recipient_did": { "$in": recipients } }, + Some(limit as i64), + ) + .await + .map_err(|_| ()) + } + }, + RetryOptions::new() + .retries(5) + .exponential_backoff(Duration::from_millis(100)) + .max_delay(Duration::from_secs(1)), + ) + .await + .map_err(|_| { + PickupError::InternalError( + "Failed to retrieve messages. Please try again later.".to_owned(), ) - .await - .map_err(|_| ()) + }) } - }, - RetryOptions::new() - .retries(5) - .exponential_backoff(Duration::from_millis(100)) - .max_delay(Duration::from_secs(1)), - ) - .await - .map_err(|_| { - PickupError::InternalError( - "Failed to process the request. Please try again later.".to_owned(), - ) - }) + }) + .await; + + match result { + Some(Ok(messages)) => Ok(messages), + Some(Err(err)) => Err(err), + None => Err(PickupError::CircuitOpen), + } } #[inline] @@ -407,6 +526,7 @@ mod tests { }; use serde_json::json; use shared::{ + circuit_breaker, repository::tests::{MockConnectionRepository, MockMessagesRepository}, utils::tests_utils::tests as global, }; @@ -471,10 +591,15 @@ mod tests { .from(global::_edge_did()) .finalize(); - let response = handle_status_request(Arc::clone(&state), request) - .await - .unwrap() - .expect("Response should not be None"); + let circuit_breaker = circuit_breaker::CircuitBreaker::new(3, Duration::from_secs(3)); + let response = handle_status_request( + Arc::clone(&state), + request, + Arc::new(circuit_breaker.into()), + ) + .await + .unwrap() + .expect("Response should not be None"); assert_eq!(response.type_, STATUS_RESPONSE_3_0); assert_eq!(response.from.unwrap(), global::_mediator_did(&state)); @@ -500,10 +625,16 @@ mod tests { .from(global::_edge_did()) .finalize(); - let response = handle_status_request(Arc::clone(&state), request) - .await - .unwrap() - .expect("Response should not be None"); + let circuit_breaker = circuit_breaker::CircuitBreaker::new(3, Duration::from_secs(3)); + + let response = handle_status_request( + Arc::clone(&state), + request, + Arc::new(circuit_breaker.into()), + ) + .await + .unwrap() + .expect("Response should not be None"); assert_eq!( response.body, @@ -527,10 +658,15 @@ mod tests { .from(global::_edge_did()) .finalize(); - let response = handle_status_request(Arc::clone(&state), request) - .await - .unwrap() - .expect("Response should not be None"); + let circuit_breaker = circuit_breaker::CircuitBreaker::new(3, Duration::from_secs(3)); + let response = handle_status_request( + Arc::clone(&state), + request, + Arc::new(circuit_breaker.into()), + ) + .await + .unwrap() + .expect("Response should not be None"); assert_eq!( response.body, @@ -553,11 +689,13 @@ mod tests { .from("did:key:invalid".to_owned()) .finalize(); - let error = handle_status_request(state, invalid_request) + let circuit_breaker = circuit_breaker::CircuitBreaker::new(3, Duration::from_secs(3)); + let error = handle_status_request(state, invalid_request, Arc::new(circuit_breaker.into())) .await .unwrap_err(); assert_eq!(error.to_string(), "Failed to retrieve client connection"); + // assert_eq!(error, PickupError::MissingClientConnection); } #[tokio::test] @@ -574,10 +712,15 @@ mod tests { .from(global::_edge_did()) .finalize(); - let response = handle_delivery_request(Arc::clone(&state), request) - .await - .unwrap() - .expect("Response should not be None"); + let circuit_breaker = circuit_breaker::CircuitBreaker::new(3, Duration::from_secs(3)); + let response = handle_delivery_request( + Arc::clone(&state), + request, + Arc::new(circuit_breaker.into()), + ) + .await + .unwrap() + .expect("Response should not be None"); let expected_attachments = vec![ Attachment::json(json!("test1")) @@ -619,7 +762,8 @@ mod tests { // When the specified recipient did is not in the keylist, // it should return a status response with a message count of 0 - let response = handle_delivery_request(state, request) + let circuit_breaker = circuit_breaker::CircuitBreaker::new(3, Duration::from_secs(3)); + let response = handle_delivery_request(state, request, Arc::new(circuit_breaker.into())) .await .unwrap() .expect("Response should not be None"); @@ -648,7 +792,9 @@ mod tests { // When the limit is set to 0, it should return all the messages in the queue // and since the recipient did is not specified, it should return the messages // for all the dids in the keylist for that sender connection - let response = handle_delivery_request(state, request) + + let circuit_breaker = circuit_breaker::CircuitBreaker::new(3, Duration::from_secs(3)); + let response = handle_delivery_request(state, request, Arc::new(circuit_breaker.into())) .await .unwrap() .expect("Response should not be None"); @@ -688,7 +834,9 @@ mod tests { // Since the recipient did is not specified, it should return the messages // for all the dids in the keylist for that sender connection (2 in this case) // The limit is set to 1 so it should return the first message in the queue - let response = handle_delivery_request(state, request) + + let circuit_breaker = circuit_breaker::CircuitBreaker::new(3, Duration::from_secs(3)); + let response = handle_delivery_request(state, request, Arc::new(circuit_breaker.into())) .await .unwrap() .expect("Response should not be None"); @@ -719,10 +867,13 @@ mod tests { .finalize(); // Should return 2 since these ids are not associated with any message - let response = handle_message_acknowledgement(state, request) - .await - .unwrap() - .expect("Response should not be None"); + + let circuit_breaker = circuit_breaker::CircuitBreaker::new(3, Duration::from_secs(3)); + let response = + handle_message_acknowledgement(state, request, Arc::new(circuit_breaker.into())) + .await + .unwrap() + .expect("Response should not be None"); assert_eq!(response.type_, STATUS_RESPONSE_3_0); assert_eq!( @@ -747,10 +898,12 @@ mod tests { // Should return 1 since one id in the list is associated // to the first message in the queue and then will be deleted - let response = handle_message_acknowledgement(state, request) - .await - .unwrap() - .expect("Response should not be None"); + let circuit_breaker = circuit_breaker::CircuitBreaker::new(3, Duration::from_secs(3)); + let response = + handle_message_acknowledgement(state, request, Arc::new(circuit_breaker.into())) + .await + .unwrap() + .expect("Response should not be None"); assert_eq!(response.type_, STATUS_RESPONSE_3_0); assert_eq!( diff --git a/crates/web-plugins/didcomm-messaging/protocols/pickup/src/plugin.rs b/crates/web-plugins/didcomm-messaging/protocols/pickup/src/plugin.rs index c40df56e..1d312070 100644 --- a/crates/web-plugins/didcomm-messaging/protocols/pickup/src/plugin.rs +++ b/crates/web-plugins/didcomm-messaging/protocols/pickup/src/plugin.rs @@ -5,8 +5,9 @@ use async_trait::async_trait; use axum::response::{IntoResponse, Response}; use didcomm::Message; use message_api::{MessageHandler, MessagePlugin, MessageRouter}; -use shared::state::AppState; -use std::sync::Arc; +use shared::{circuit_breaker::CircuitBreaker, state::AppState}; +use std::{sync::Arc, time::Duration}; +use tokio::sync::Mutex; pub struct PickupProtocol; @@ -22,7 +23,12 @@ impl MessageHandler for StatusRequestHandler { state: Arc, msg: Message, ) -> Result, Response> { - crate::handler::handle_status_request(state, msg) + let circuit_breaker = Arc::new(Mutex::new(CircuitBreaker::new( + 2, + Duration::from_millis(5000), + ))); + + crate::handler::handle_status_request(state, msg, circuit_breaker) .await .map_err(|e| e.into_response()) } @@ -35,7 +41,12 @@ impl MessageHandler for DeliveryRequestHandler { state: Arc, msg: Message, ) -> Result, Response> { - crate::handler::handle_delivery_request(state, msg) + let circuit_breaker = Arc::new(Mutex::new(CircuitBreaker::new( + 2, + Duration::from_millis(5000), + ))); + + crate::handler::handle_delivery_request(state, msg, circuit_breaker) .await .map_err(|e| e.into_response()) } @@ -48,7 +59,12 @@ impl MessageHandler for MessageReceivedHandler { state: Arc, msg: Message, ) -> Result, Response> { - crate::handler::handle_message_acknowledgement(state, msg) + let circuit_breaker = Arc::new(Mutex::new(CircuitBreaker::new( + 2, + Duration::from_millis(5000), + ))); + + crate::handler::handle_message_acknowledgement(state, msg, circuit_breaker) .await .map_err(|e| e.into_response()) } diff --git a/crates/web-plugins/didcomm-messaging/shared/src/CircuitBreaker.rs b/crates/web-plugins/didcomm-messaging/shared/src/CircuitBreaker.rs deleted file mode 100644 index 836a11d3..00000000 --- a/crates/web-plugins/didcomm-messaging/shared/src/CircuitBreaker.rs +++ /dev/null @@ -1,75 +0,0 @@ -use std::sync::{ - atomic::{AtomicBool, AtomicUsize, Ordering}, - Mutex, -}; -use std::time::{Duration, Instant}; - -#[derive(Debug)] -pub struct CircuitBreaker { - state: AtomicBool, // true = Open, false = Closed - failure_count: AtomicUsize, - last_failure_time: Mutex>, - threshold: usize, - reset_timeout: Duration, -} - -impl CircuitBreaker { - /// Creating a new CircuitBreaker with the given failure threshold and reset timeout. - pub fn new(threshold: usize, reset_timeout: Duration) -> Self { - Self { - state: AtomicBool::new(false), - failure_count: AtomicUsize::new(0), - last_failure_time: Mutex::new(None), - threshold, - reset_timeout, - } - } - - pub fn is_open(&self) -> bool { - if self.state.load(Ordering::Relaxed) { - let mut last_failure_time = self.last_failure_time.lock().unwrap(); - if let Some(last_time) = *last_failure_time { - if last_time.elapsed() > self.reset_timeout { - self.state.store(false, Ordering::Relaxed); - self.failure_count.store(0, Ordering::Relaxed); - *last_failure_time = None; - return false; - } - } - true - } else { - false - } - } - - pub fn record_failure(&self) { - let failures = self.failure_count.fetch_add(1, Ordering::Relaxed) + 1; - if failures >= self.threshold { - self.state.store(true, Ordering::Relaxed); - let mut last_failure_time = self.last_failure_time.lock().unwrap(); - *last_failure_time = Some(Instant::now()); - } - } - - pub fn record_success(&self) { - self.failure_count.store(0, Ordering::Relaxed); - self.state.store(false, Ordering::Relaxed); - } - - pub fn call(&self, f: F) -> Result>, String> - where - F: FnOnce() -> Result, - { - if self.is_open() { - return Ok(None); - } - - let result = f(); - match result { - Ok(_) => self.record_success(), - Err(_) => self.record_failure(), - } - - Ok(Some(result)) - } -} diff --git a/crates/web-plugins/didcomm-messaging/shared/src/circuit_breaker.rs b/crates/web-plugins/didcomm-messaging/shared/src/circuit_breaker.rs new file mode 100644 index 00000000..2238cd16 --- /dev/null +++ b/crates/web-plugins/didcomm-messaging/shared/src/circuit_breaker.rs @@ -0,0 +1,90 @@ +use std::time::{Duration, Instant}; + +#[derive(Debug)] +enum State { + // The circuit breaker is closed and allowing requests + // to pass through + Closed, + // The circuit breaker is open and blocking requests + Open, + // The circuit breaker is half-open and allowing a limited + // number of requests to pass through + HalfOpen, +} + +pub struct CircuitBreaker { + state: State, + // The duration to wait before transitioning from the + // open state to the half-open state + trip_timeout: Duration, + // The maximum number of requests allowed through in + // the closed state + max_failures: usize, + // The number of consecutive failures in the closed + // state + consecutive_failures: usize, + // The time when the circuit breaker transitioned to the + // open state + opened_at: Option, +} + +impl CircuitBreaker { + pub fn new(max_failures: usize, trip_timeout: Duration) -> CircuitBreaker { + CircuitBreaker { + state: State::Closed, + max_failures, + trip_timeout, + consecutive_failures: 0, + opened_at: None, + } + } + + pub async fn call_async(&mut self, f: F) -> Option> + where + F: FnOnce() -> Fut, + Fut: std::future::Future>, + { + match self.state { + State::Closed => { + if self.consecutive_failures < self.max_failures { + let result = f().await; + if result.is_err() { + self.record_failure(); + } + Some(result) + } else { + self.opened_at = Some(Instant::now()); + self.state = State::Open; + self.consecutive_failures = 0; + None + } + } + State::Open => { + if let Some(opened_at) = self.opened_at { + if Instant::now().duration_since(opened_at) >= self.trip_timeout { + self.state = State::HalfOpen; + self.opened_at = None; + } + } + None + } + State::HalfOpen => { + let result = f().await; + if result.is_err() { + self.state = State::Open; + } else { + self.state = State::Closed; + } + Some(result) + } + } + } + + fn record_failure(&mut self) { + match self.state { + State::Closed => self.consecutive_failures += 1, + State::Open => (), + State::HalfOpen => self.consecutive_failures += 1, + } + } +} diff --git a/crates/web-plugins/didcomm-messaging/shared/src/lib.rs b/crates/web-plugins/didcomm-messaging/shared/src/lib.rs index a777ec3a..77ffa0c8 100644 --- a/crates/web-plugins/didcomm-messaging/shared/src/lib.rs +++ b/crates/web-plugins/didcomm-messaging/shared/src/lib.rs @@ -1,4 +1,4 @@ -pub mod CircuitBreaker; +pub mod circuit_breaker; pub mod errors; pub mod midlw; pub mod repository; diff --git a/crates/web-plugins/didcomm-messaging/shared/src/state.rs b/crates/web-plugins/didcomm-messaging/shared/src/state.rs index 38e221fa..34a3f389 100644 --- a/crates/web-plugins/didcomm-messaging/shared/src/state.rs +++ b/crates/web-plugins/didcomm-messaging/shared/src/state.rs @@ -4,9 +4,7 @@ use keystore::Secrets; use std::{sync::Arc, time::Duration}; use crate::{ - repository::entity::{Connection, RoutedMessage}, - utils::resolvers::{LocalDIDResolver, LocalSecretsResolver}, - CircuitBreaker::CircuitBreaker, + circuit_breaker::CircuitBreaker, repository::entity::{Connection, RoutedMessage}, utils::resolvers::{LocalDIDResolver, LocalSecretsResolver} }; #[derive(Clone)] From 8bf79d9c9c9b93fe5a7cde6a18172a2dd2a7b64b Mon Sep 17 00:00:00 2001 From: ndefokou Date: Wed, 18 Dec 2024 16:56:24 +0100 Subject: [PATCH 12/14] fixe()formating files --- crates/web-plugins/didcomm-messaging/shared/src/state.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/crates/web-plugins/didcomm-messaging/shared/src/state.rs b/crates/web-plugins/didcomm-messaging/shared/src/state.rs index 34a3f389..6f498b17 100644 --- a/crates/web-plugins/didcomm-messaging/shared/src/state.rs +++ b/crates/web-plugins/didcomm-messaging/shared/src/state.rs @@ -4,7 +4,9 @@ use keystore::Secrets; use std::{sync::Arc, time::Duration}; use crate::{ - circuit_breaker::CircuitBreaker, repository::entity::{Connection, RoutedMessage}, utils::resolvers::{LocalDIDResolver, LocalSecretsResolver} + circuit_breaker::CircuitBreaker, + repository::entity::{Connection, RoutedMessage}, + utils::resolvers::{LocalDIDResolver, LocalSecretsResolver}, }; #[derive(Clone)] From 01d5d3632ccf5f235abdce2e1ce50a80c5229605 Mon Sep 17 00:00:00 2001 From: ndefokou Date: Thu, 19 Dec 2024 15:43:32 +0100 Subject: [PATCH 13/14] fixe()Removing code duplication --- .../protocols/pickup/src/handler.rs | 132 ++++-------------- .../didcomm-messaging/shared/src/retry.rs | 4 +- 2 files changed, 31 insertions(+), 105 deletions(-) diff --git a/crates/web-plugins/didcomm-messaging/protocols/pickup/src/handler.rs b/crates/web-plugins/didcomm-messaging/protocols/pickup/src/handler.rs index 36778e81..823857d2 100644 --- a/crates/web-plugins/didcomm-messaging/protocols/pickup/src/handler.rs +++ b/crates/web-plugins/didcomm-messaging/protocols/pickup/src/handler.rs @@ -39,7 +39,6 @@ pub(crate) async fn handle_status_request( .call_async(|| { let state = state.clone(); let message = message.clone(); - let circuit_breaker = circuit_breaker.clone(); async move { let recipient_did = message .body @@ -64,13 +63,7 @@ pub(crate) async fn handle_status_request( })?; // Pass `recipient_did` to count_messages, allowing it to handle `None` - let message_count = count_messages( - repository, - recipient_did, - connection, - circuit_breaker.clone(), - ) - .await?; + let message_count = count_messages(repository, recipient_did, connection).await?; let id = Uuid::new_v4().urn().to_string(); let response_builder: MessageBuilder = StatusResponse { @@ -124,7 +117,6 @@ pub(crate) async fn handle_delivery_request( .call_async(|| { let state = state.clone(); let message_body = message_body.clone(); - let circuit_breaker = circuit_breaker.clone(); async move { let recipient_did = message_body.get("recipient_did").and_then(Value::as_str); @@ -165,14 +157,8 @@ pub(crate) async fn handle_delivery_request( PickupError::InternalError("Failed to retrieve client connection".to_owned()) })?; - let messages = messages( - repository, - recipient_did, - connection, - limit as usize, - circuit_breaker.clone(), - ) - .await?; + let messages = + messages(repository, recipient_did, connection, limit as usize).await?; let response_builder: MessageBuilder; let id = Uuid::new_v4().urn().to_string(); @@ -250,7 +236,6 @@ pub(crate) async fn handle_message_acknowledgement( .call_async(|| { let state = state.clone(); let message = message.clone(); - let circuit_breaker = circuit_breaker.clone(); async move { let mediator_did = &state.diddoc.id; let repository = repository(state.clone())?; @@ -294,8 +279,7 @@ pub(crate) async fn handle_message_acknowledgement( })?; } - let message_count = - count_messages(repository, None, connection, circuit_breaker).await?; + let message_count = count_messages(repository, None, connection).await?; let id = Uuid::new_v4().urn().to_string(); let response_builder: MessageBuilder = StatusResponse { @@ -371,50 +355,21 @@ async fn count_messages( repository: AppStateRepository, recipient_did: Option<&str>, connection: Connection, - circuit_breaker: Arc>, + // circuit_breaker: Arc>, ) -> Result { let recipients = recipients(recipient_did, &connection); - let mut cb = circuit_breaker.lock().await; - - let result = cb - .call_async(|| { - let message_repository = repository.message_repository.clone(); - let recipients = recipients.clone(); - - async move { - retry_async( - || { - let message_repository = message_repository.clone(); - let recipients = recipients.clone(); - - async move { - message_repository - .count_by(doc! { "recipient_did": { "$in": recipients } }) - .await - .map_err(|_| ()) - } - }, - RetryOptions::new() - .retries(5) - .exponential_backoff(Duration::from_millis(100)) - .max_delay(Duration::from_secs(2)), - ) - .await - .map_err(|_| { - PickupError::InternalError( - "Failed to process the request. Please try again later.".to_owned(), - ) - }) - } - }) - .await; + let count = repository + .message_repository + .count_by(doc! { "recipient_did": { "$in": recipients } }) + .await + .map_err(|_| { + PickupError::InternalError( + "Failed to process the request. Please try again later.".to_owned(), + ) + })?; - match result { - Some(Ok(count)) => Ok(count), - Some(Err(err)) => Err(err), - None => Err(PickupError::CircuitOpen), - } + Ok(count) } async fn messages( @@ -422,53 +377,24 @@ async fn messages( recipient_did: Option<&str>, connection: Connection, limit: usize, - circuit_breaker: Arc>, + // circuit_breaker: Arc>, ) -> Result, PickupError> { let recipients = recipients(recipient_did, &connection); - let mut cb = circuit_breaker.lock().await; - - let result = cb - .call_async(|| { - let message_repository = repository.message_repository.clone(); - let recipients = recipients.clone(); - - async move { - retry_async( - || { - let message_repository = message_repository.clone(); - let recipients = recipients.clone(); - - async move { - message_repository - .find_all_by( - doc! { "recipient_did": { "$in": recipients } }, - Some(limit as i64), - ) - .await - .map_err(|_| ()) - } - }, - RetryOptions::new() - .retries(5) - .exponential_backoff(Duration::from_millis(100)) - .max_delay(Duration::from_secs(1)), - ) - .await - .map_err(|_| { - PickupError::InternalError( - "Failed to retrieve messages. Please try again later.".to_owned(), - ) - }) - } - }) - .await; + let routed_messages = repository + .message_repository + .find_all_by( + doc! { "recipient_did": { "$in": recipients } }, + Some(limit as i64), + ) + .await + .map_err(|_| { + PickupError::InternalError( + "Failed to process the request. Please try again later.".to_owned(), + ) + })?; - match result { - Some(Ok(messages)) => Ok(messages), - Some(Err(err)) => Err(err), - None => Err(PickupError::CircuitOpen), - } + Ok(routed_messages) } #[inline] diff --git a/crates/web-plugins/didcomm-messaging/shared/src/retry.rs b/crates/web-plugins/didcomm-messaging/shared/src/retry.rs index 117f5fa3..3eab1a0a 100644 --- a/crates/web-plugins/didcomm-messaging/shared/src/retry.rs +++ b/crates/web-plugins/didcomm-messaging/shared/src/retry.rs @@ -60,7 +60,7 @@ where match operation().await { Ok(result) => return Ok(result), - Err(err) if attempt <= retries => { + Err(_err) if attempt <= retries => { if let Some(fixed) = fixed_backoff { sleep(fixed).await; } else if delay > Duration::ZERO { @@ -69,7 +69,7 @@ where delay = (delay * 2).min(max_delay); } } - Err(err) => return Err(err), + Err(_err) => return Err(_err), } } } From 9eff984d58376afdb9cd02fd404db12e6abd0b0d Mon Sep 17 00:00:00 2001 From: ndefokou Date: Thu, 9 Jan 2025 16:27:40 +0100 Subject: [PATCH 14/14] fixe():forward protocol --- crates/web-plugins/didcomm-messaging/shared/src/retry.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/web-plugins/didcomm-messaging/shared/src/retry.rs b/crates/web-plugins/didcomm-messaging/shared/src/retry.rs index 3eab1a0a..dd96dc7b 100644 --- a/crates/web-plugins/didcomm-messaging/shared/src/retry.rs +++ b/crates/web-plugins/didcomm-messaging/shared/src/retry.rs @@ -53,7 +53,7 @@ where let mut attempt = 0; let mut delay = exponential_backoff.unwrap_or_default(); - let max_delay = max_delay.unwrap_or_else(|| Duration::from_secs(60)); // Default max delay of 60 seconds + let max_delay = max_delay.unwrap_or_else(|| Duration::from_secs(60)); loop { attempt += 1;