diff --git a/Cargo.lock b/Cargo.lock index 10847a510..6a2563ea7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -382,6 +382,16 @@ version = "1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9b34d609dfbaf33d6889b2b7106d3ca345eacad44200913df5ba02bfd31d2ba9" +[[package]] +name = "async-attributes" +version = "1.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a3203e79f4dd9bdda415ed03cf14dae5a2bf775c683a00f94e9cd1faf0f596e5" +dependencies = [ + "quote", + "syn 1.0.109", +] + [[package]] name = "async-channel" version = "1.9.0" @@ -393,6 +403,35 @@ dependencies = [ "futures-core", ] +[[package]] +name = "async-executor" +version = "1.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4b0c4a4f319e45986f347ee47fef8bf5e81c9abc3f6f58dc2391439f30df65f0" +dependencies = [ + "async-lock", + "async-task", + "concurrent-queue", + "fastrand 2.0.0", + "futures-lite", + "slab", +] + +[[package]] +name = "async-global-executor" +version = "2.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f1b6f5d7df27bd294849f8eec66ecfc63d11814df7a4f5d74168a2394467b776" +dependencies = [ + "async-channel", + "async-executor", + "async-io", + "async-lock", + "blocking", + "futures-lite", + "once_cell", +] + [[package]] name = "async-io" version = "1.13.0" @@ -422,6 +461,39 @@ dependencies = [ "event-listener", ] +[[package]] +name = "async-std" +version = "1.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "62565bb4402e926b29953c785397c6dc0391b7b446e45008b0049eb43cec6f5d" +dependencies = [ + "async-attributes", + "async-channel", + "async-global-executor", + "async-io", + "async-lock", + "crossbeam-utils", + "futures-channel", + "futures-core", + "futures-io", + "futures-lite", + "gloo-timers", + "kv-log-macro", + "log", + "memchr", + "once_cell", + "pin-project-lite 0.2.10", + "pin-utils", + "slab", + "wasm-bindgen-futures", +] + +[[package]] +name = "async-task" +version = "4.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fbb36e985947064623dbd357f727af08ffd077f93d696782f3c56365fa2e2799" + [[package]] name = "async-trait" version = "0.1.72" @@ -695,6 +767,22 @@ version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8d696c370c750c948ada61c69a0ee2cbbb9c50b1019ddb86d9317157a99c2cae" +[[package]] +name = "blocking" +version = "1.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8c36a4d0d48574b3dd360b4b7d95cc651d2b6557b6402848a27d4b228a473e2a" +dependencies = [ + "async-channel", + "async-lock", + "async-task", + "fastrand 2.0.0", + "futures-io", + "futures-lite", + "piper", + "tracing", +] + [[package]] name = "bounded-collections" version = "0.1.8" @@ -3826,6 +3914,15 @@ dependencies = [ "cpufeatures", ] +[[package]] +name = "kv-log-macro" +version = "1.0.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0de8b303297635ad57c9f5059fd9cee7a47f8e8daa09df0fcd07dd39fb22977f" +dependencies = [ + "log", +] + [[package]] name = "kvdb" version = "0.13.0" @@ -4495,6 +4592,9 @@ name = "log" version = "0.4.19" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b06a4cde4c0f271a446782e3eff8de789548ce57dbc8eca9292c27f4a42004b4" +dependencies = [ + "value-bag", +] [[package]] name = "lru" @@ -6022,6 +6122,17 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" +[[package]] +name = "piper" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "668d31b1c4eba19242f2088b2bf3316b82ca31082a8335764db4e083db7485d4" +dependencies = [ + "atomic-waker", + "fastrand 2.0.0", + "futures-io", +] + [[package]] name = "pkcs8" version = "0.9.0" @@ -9965,6 +10076,7 @@ dependencies = [ name = "stellar-relay-lib" version = "1.0.3" dependencies = [ + "async-std", "base64 0.13.1", "env_logger 0.9.3", "err-derive", @@ -11119,10 +11231,17 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "830b7e5d4d90034032940e4ace0d9a9a057e7a45cd94e6c007832e39edb82f6d" +[[package]] +name = "value-bag" +version = "1.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7cdbaf5e132e593e9fc1de6a15bbec912395b11fb9719e061cf64f804524c503" + [[package]] name = "vault" version = "1.0.3" dependencies = [ + "async-std", "async-trait", "base64 0.13.1", "bincode", diff --git a/clients/stellar-relay-lib/Cargo.toml b/clients/stellar-relay-lib/Cargo.toml index 50bb38442..5446a6534 100644 --- a/clients/stellar-relay-lib/Cargo.toml +++ b/clients/stellar-relay-lib/Cargo.toml @@ -39,6 +39,7 @@ tokio = { version = "1.0", features = [ "sync", # to make channels available "time" # for timeouts and sleep, when reconnecting ] } +async-std = { version = "1.12.0", features = ["attributes"] } [features] std = [ diff --git a/clients/stellar-relay-lib/src/connection/connector/connector.rs b/clients/stellar-relay-lib/src/connection/connector/connector.rs index f9bb8d56a..a588a46de 100644 --- a/clients/stellar-relay-lib/src/connection/connector/connector.rs +++ b/clients/stellar-relay-lib/src/connection/connector/connector.rs @@ -1,8 +1,7 @@ +use async_std::net::TcpStream; use std::{ fmt::{Debug, Formatter}, - net::TcpStream, - sync::{Arc, Mutex}, - time::Duration, + net::Shutdown, }; use substrate_stellar_sdk::{ types::{AuthenticatedMessageV0, Curve25519Public, HmacSha256Mac, MessageType}, @@ -37,7 +36,7 @@ pub struct Connector { flow_controller: FlowController, /// for writing/reading xdr messages to/from Stellar Node. - pub(crate) tcp_stream: Arc>, + pub(crate) tcp_stream: TcpStream, } impl Debug for Connector { @@ -53,10 +52,32 @@ impl Debug for Connector { .field("receive_scp_messages", &self.receive_scp_messages) .field("handshake_state", &self.handshake_state) .field("flow_controller", &self.flow_controller) + .field( + "local_addr", + &self + .tcp_stream + .local_addr() + .map(|addr| addr.to_string()) + .unwrap_or("cannot provide".to_string()), + ) + .field( + "peer_addr", + &self + .tcp_stream + .peer_addr() + .map(|addr| addr.to_string()) + .unwrap_or("cannot provide".to_string()), + ) .finish() } } +impl Drop for Connector { + fn drop(&mut self) { + self.stop(); + } +} + impl Connector { /// Verifies the AuthenticatedMessage, received from the Stellar Node pub(super) fn verify_auth( @@ -115,22 +136,17 @@ impl Connector { /// returns a Connector and starts creating a connection to Stellar pub async fn start(local_node: NodeInfo, conn_info: ConnectionInfo) -> Result { + // Create the stream + let tcp_stream = TcpStream::connect(conn_info.address()) + .await + .map_err(|e| Error::ConnectionFailed(e.to_string()))?; + let connection_auth = ConnectionAuth::new( &local_node.network_id, conn_info.keypair(), conn_info.auth_cert_expiration, ); - // Create the stream - let tcp_stream = TcpStream::connect(conn_info.address()) - .map_err(|e| Error::ConnectionFailed(e.to_string()))?; - - if let Err(e) = - tcp_stream.set_read_timeout(Some(Duration::from_secs(conn_info.timeout_in_secs))) - { - log::warn!("start(): failed to set read timeout for the stream: {e:?}"); - } - let mut connector = Connector { local: LocalInfo::new(local_node), remote_info: None, @@ -142,7 +158,7 @@ impl Connector { receive_scp_messages: conn_info.recv_scp_msgs, handshake_state: HandshakeState::Connecting, flow_controller: FlowController::default(), - tcp_stream: Arc::new(Mutex::new(tcp_stream)), + tcp_stream, }; // To start the handshake, send a hello message to Stellar @@ -150,6 +166,12 @@ impl Connector { Ok(connector) } + + pub fn stop(&mut self) { + if let Err(e) = self.tcp_stream.shutdown(Shutdown::Both) { + log::error!("stop(): failed to shutdown tcp stream: {}", e); + } + } } // getters setters @@ -231,7 +253,6 @@ impl Connector { mod test { use crate::{connection::hmac::HMacKeys, node::RemoteInfo, StellarOverlayConfig}; use serial_test::serial; - use std::net::Shutdown; use substrate_stellar_sdk::{ compound_types::LimitedString, @@ -263,16 +284,6 @@ mod test { new_auth_cert } - impl Connector { - fn shutdown(&mut self) { - self.tcp_stream - .lock() - .unwrap() - .shutdown(Shutdown::Both) - .expect("should shutdown both read and write of stream"); - } - } - async fn create_connector() -> (NodeInfo, ConnectionInfo, Connector) { let cfg_file_path = "./resources/config/testnet/stellar_relay_config_sdftest1.json"; let secret_key_path = "./resources/secretkey/stellar_secretkey_testnet"; @@ -294,7 +305,7 @@ mod test { #[tokio::test] #[serial] async fn create_new_connector_works() { - let (node_info, _, mut connector) = create_connector().await; + let (node_info, _, connector) = create_connector().await; let connector_local_node = connector.local.node(); @@ -303,8 +314,6 @@ mod test { assert_eq!(connector_local_node.overlay_min_version, node_info.overlay_min_version); assert_eq!(connector_local_node.version_str, node_info.version_str); assert_eq!(connector_local_node.network_id, node_info.network_id); - - connector.shutdown(); } #[tokio::test] @@ -314,8 +323,6 @@ mod test { assert_eq!(connector.local_sequence(), 0); connector.increment_local_sequence(); assert_eq!(connector.local_sequence(), 1); - - connector.shutdown(); } #[tokio::test] @@ -340,8 +347,6 @@ mod test { connector.set_remote(RemoteInfo::new(&hello)); assert!(connector.remote().is_some()); - - connector.shutdown(); } #[tokio::test] @@ -370,8 +375,6 @@ mod test { connector.increment_remote_sequence().unwrap(); connector.increment_remote_sequence().unwrap(); assert_eq!(connector.remote().unwrap().sequence(), 3); - - connector.shutdown(); } #[tokio::test] @@ -408,8 +411,6 @@ mod test { )); //assert assert!(connector.hmac_keys().is_some()); - - connector.shutdown(); } #[tokio::test] @@ -426,8 +427,6 @@ mod test { connector.handshake_completed(); assert!(connector.is_handshake_created()); - - connector.shutdown(); } #[tokio::test] @@ -437,7 +436,5 @@ mod test { assert!(!connector.inner_check_to_send_more(MessageType::ScpMessage)); connector.enable_flow_controller(node_info.overlay_version, node_info.overlay_version); - - connector.shutdown(); } } diff --git a/clients/stellar-relay-lib/src/connection/connector/message_handler.rs b/clients/stellar-relay-lib/src/connection/connector/message_handler.rs index 87d2c2fde..b4101ea71 100644 --- a/clients/stellar-relay-lib/src/connection/connector/message_handler.rs +++ b/clients/stellar-relay-lib/src/connection/connector/message_handler.rs @@ -103,7 +103,6 @@ impl Connector { String::from_utf8(other.to_base64_xdr()) .unwrap_or(format!("{:?}", other.to_base64_xdr())) ); - self.check_to_send_more(msg_type).await?; return Ok(Some(other)) }, @@ -124,6 +123,8 @@ impl Connector { self.local().node().overlay_version, remote.node().overlay_version, ); + } else { + log::warn!("process_auth_message(): No remote overlay version after handshake."); } self.check_to_send_more(MessageType::Auth).await diff --git a/clients/stellar-relay-lib/src/connection/connector/message_reader.rs b/clients/stellar-relay-lib/src/connection/connector/message_reader.rs index 7b48daf99..4e308c7ad 100644 --- a/clients/stellar-relay-lib/src/connection/connector/message_reader.rs +++ b/clients/stellar-relay-lib/src/connection/connector/message_reader.rs @@ -1,11 +1,6 @@ use crate::connection::{xdr_converter::get_xdr_message_length, Connector, Error, Xdr}; -use std::{ - io::Read, - net::{Shutdown, TcpStream}, - sync::{Arc, Mutex}, -}; +use async_std::io::ReadExt; use substrate_stellar_sdk::{types::StellarMessage, XdrCodec}; - use tokio::sync::{mpsc, mpsc::error::TryRecvError}; /// Polls for messages coming from the Stellar Node and communicates it back to the user @@ -21,7 +16,7 @@ pub(crate) async fn poll_messages_from_stellar( mut send_to_node_receiver: mpsc::Receiver, ) { log::info!("poll_messages_from_stellar(): started."); - // clone the stream to perform a read operation on the next function calls + loop { if send_to_user_sender.is_closed() { log::info!("poll_messages_from_stellar(): closing receiver during disconnection"); @@ -35,20 +30,12 @@ pub(crate) async fn poll_messages_from_stellar( if let Err(e) = connector.send_to_node(msg).await { log::error!("poll_messages_from_stellar(): Error occurred during sending message to node: {e:?}"); }, - Err(TryRecvError::Disconnected) => { - log::trace!("poll_messages_from_stellar(): Recv channel (for sending message to node) got disconnected."); - break - }, + Err(TryRecvError::Disconnected) => break, Err(TryRecvError::Empty) => {}, } // check for messages from Stellar Node. - let stream_clone = connector.tcp_stream.clone(); - // Spawn a blocking task to read from the stream - let xdr_result = read_message_from_stellar(stream_clone).await; - - // Check the result of the blocking task - let xdr = match xdr_result { + let xdr = match read_message_from_stellar(&mut connector).await { Err(e) => { log::error!("poll_messages_from_stellar(): {e:?}"); break @@ -72,18 +59,17 @@ pub(crate) async fn poll_messages_from_stellar( }, } } - // make sure to shutdown the stream - if let Err(e) = connector.tcp_stream.clone().lock().unwrap().shutdown(Shutdown::Both) { - log::error!("poll_messages_from_stellar(): Failed to shutdown the tcp stream: {e:?}"); - }; + + // make sure to shutdown the connector + connector.stop(); send_to_node_receiver.close(); drop(send_to_user_sender); - log::info!("poll_messages_from_stellar(): stopped."); + log::debug!("poll_messages_from_stellar(): stopped."); } /// Returns Xdr format of the `StellarMessage` sent from the Stellar Node -async fn read_message_from_stellar(stream_clone: Arc>) -> Result { +async fn read_message_from_stellar(connector: &mut Connector) -> Result { // holds the number of bytes that were missing from the previous stellar message. let mut lack_bytes_from_prev = 0; let mut readbuf: Vec = vec![]; @@ -93,18 +79,8 @@ async fn read_message_from_stellar(stream_clone: Arc>) -> Resul // check whether or not we should read the bytes as: // 1. the length of the next stellar message // 2. the remaining bytes of the previous stellar message - // Temporary scope for locking - let result = { - let mut stream = stream_clone.lock().unwrap(); - stream.read(&mut buff_for_reading) - }; - - match result { - Ok(size) if size == 0 => { - // No data available to read - tokio::task::yield_now().await; - continue - }, + match connector.tcp_stream.read(&mut buff_for_reading).await { + Ok(size) if size == 0 => continue, Ok(_) if lack_bytes_from_prev == 0 => { // if there are no more bytes lacking from the previous message, // then check the size of next stellar message. @@ -113,23 +89,22 @@ async fn read_message_from_stellar(stream_clone: Arc>) -> Resul // If it's not enough, skip it. if expect_msg_len == 0 { // there's nothing to read; wait for the next iteration - log::trace!( - "read_message_from_stellar(): Nothing left to read; waiting for next loop" - ); - tokio::task::yield_now().await; + log::trace!("read_message_from_stellar(): expect_msg_len == 0"); continue } + + // let's start reading the actual stellar message. readbuf = vec![0; expect_msg_len]; + match read_message( - stream_clone.clone(), + connector, &mut lack_bytes_from_prev, &mut readbuf, expect_msg_len, - ) { - Ok(None) => { - tokio::task::yield_now().await; - continue - }, + ) + .await + { + Ok(None) => continue, Ok(Some(xdr)) => return Ok(xdr), Err(e) => { log::trace!("read_message_from_stellar(): ERROR: {e:?}"); @@ -141,17 +116,14 @@ async fn read_message_from_stellar(stream_clone: Arc>) -> Resul // The next few bytes was read. Add it to the readbuf. lack_bytes_from_prev -= size; readbuf.append(&mut buff_for_reading); + // make sure to cleanup the buffer + buff_for_reading = vec![0; 4]; // let's read the continuation number of bytes from the previous message. - match read_unfinished_message( - stream_clone.clone(), - &mut lack_bytes_from_prev, - &mut readbuf, - ) { - Ok(None) => { - tokio::task::yield_now().await; - continue - }, + match read_unfinished_message(connector, &mut lack_bytes_from_prev, &mut readbuf) + .await + { + Ok(None) => continue, Ok(Some(xdr)) => return Ok(xdr), Err(e) => { log::trace!("read_message_from_stellar(): ERROR: {e:?}"); @@ -172,18 +144,22 @@ async fn read_message_from_stellar(stream_clone: Arc>) -> Resul /// This reads a number of bytes based on the expected message length. /// /// # Arguments -/// * `stream` - the TcpStream for reading the xdr stellar message +/// * `connector` - a ref struct that contains the config and necessary info for connecting to +/// Stellar Node /// * `lack_bytes_from_prev` - the number of bytes remaining, to complete the previous message /// * `readbuf` - the buffer that holds the bytes of the previous and incomplete message /// * `xpect_msg_len` - the expected # of bytes of the Stellar message -fn read_message( - stream: Arc>, +async fn read_message( + connector: &mut Connector, lack_bytes_from_prev: &mut usize, readbuf: &mut Vec, xpect_msg_len: usize, ) -> Result, Error> { - let mut stream = stream.lock().unwrap(); - let actual_msg_len = stream.read(readbuf).map_err(|e| Error::ReadFailed(e.to_string()))?; + let actual_msg_len = connector + .tcp_stream + .read(readbuf) + .await + .map_err(|e| Error::ReadFailed(e.to_string()))?; // only when the message has the exact expected size bytes, should we send to user. if actual_msg_len == xpect_msg_len { @@ -205,21 +181,22 @@ fn read_message( /// Reads a continuation of bytes that belong to the previous message /// /// # Arguments -/// * `stream` - the TcpStream for reading the xdr stellar message +/// * `connector` - a ref struct that contains the config and necessary info for connecting to +/// Stellar Node /// * `lack_bytes_from_prev` - the number of bytes remaining, to complete the previous message /// * `readbuf` - the buffer that holds the bytes of the previous and incomplete message -fn read_unfinished_message( - stream: Arc>, +async fn read_unfinished_message( + connector: &mut Connector, lack_bytes_from_prev: &mut usize, readbuf: &mut Vec, ) -> Result, Error> { // let's read the continuation number of bytes from the previous message. let mut cont_buf = vec![0; *lack_bytes_from_prev]; - let actual_msg_len = stream - .lock() - .unwrap() + let actual_msg_len = connector + .tcp_stream .read(&mut cont_buf) + .await .map_err(|e| Error::ReadFailed(e.to_string()))?; // this partial message completes the previous message. diff --git a/clients/stellar-relay-lib/src/connection/connector/message_sender.rs b/clients/stellar-relay-lib/src/connection/connector/message_sender.rs index 1f1a0916a..1145dddb6 100644 --- a/clients/stellar-relay-lib/src/connection/connector/message_sender.rs +++ b/clients/stellar-relay-lib/src/connection/connector/message_sender.rs @@ -1,5 +1,7 @@ -use std::io::Write; +use async_std::io::WriteExt; +use std::time::Duration; use substrate_stellar_sdk::types::{MessageType, SendMore, StellarMessage}; +use tokio::time::timeout; use crate::connection::{ flow_controller::MAX_FLOOD_MSG_CAP, @@ -13,25 +15,14 @@ impl Connector { // Create the XDR message outside the closure let xdr_msg = self.create_xdr_message(msg)?; - // Clone the TcpStream (or its Arc> wrapper) - let stream_clone = self.tcp_stream.clone(); - - // this may really not be necessary - let write_result = tokio::task::spawn_blocking(move || { - let mut stream = stream_clone.lock().unwrap(); - stream.write_all(&xdr_msg).map_err(|e| { - log::error!("send_to_node(): Failed to send message to node: {e:?}"); - Error::WriteFailed(e.to_string()) - }) - }); - - // Await the result of the blocking task - match write_result.await { - Ok(result) => result, - Err(e) => { - log::error!("send_to_node(): Error occurred in blocking task: {e:?}"); - Err(Error::WriteFailed(e.to_string())) - }, + match timeout( + Duration::from_secs(self.timeout_in_secs), + self.tcp_stream.write_all(&xdr_msg), + ) + .await + { + Ok(res) => res.map_err(|e| Error::WriteFailed(e.to_string())), + Err(_) => Err(Error::Timeout), } } diff --git a/clients/stellar-relay-lib/src/connection/error.rs b/clients/stellar-relay-lib/src/connection/error.rs index ba3438e71..334600383 100644 --- a/clients/stellar-relay-lib/src/connection/error.rs +++ b/clients/stellar-relay-lib/src/connection/error.rs @@ -51,9 +51,6 @@ pub enum Error { #[error(display = "{:?}", _0)] XDRConversionError(XDRError), - #[error(display = "{:?}", _0)] - StdIOError(std::io::Error), - #[error(display = "{:?}", _0)] StellarSdkError(StellarSdkError), @@ -73,12 +70,6 @@ pub enum Error { VersionStrTooLong, } -impl From for Error { - fn from(e: std::io::Error) -> Self { - Error::StdIOError(e) - } -} - impl From for Error { fn from(e: XDRError) -> Self { Error::XDRConversionError(e) diff --git a/clients/stellar-relay-lib/src/overlay.rs b/clients/stellar-relay-lib/src/overlay.rs index fcf0e440d..c311fa29e 100644 --- a/clients/stellar-relay-lib/src/overlay.rs +++ b/clients/stellar-relay-lib/src/overlay.rs @@ -58,7 +58,6 @@ impl StellarOverlayConnection { pub fn listen(&mut self) -> Result, Error> { loop { if !self.is_alive() { - self.disconnect(); return Err(Error::Disconnected) } @@ -82,20 +81,20 @@ impl StellarOverlayConnection { let is_closed = self.sender.is_closed(); if is_closed { - self.disconnect(); + self.stop(); } !is_closed } - pub fn disconnect(&mut self) { - log::info!("disconnect(): closing connection to overlay network"); + pub fn stop(&mut self) { + log::info!("stop(): closing connection to overlay network"); self.receiver.close(); } } impl Drop for StellarOverlayConnection { fn drop(&mut self) { - self.disconnect(); + self.stop(); } } diff --git a/clients/stellar-relay-lib/src/tests/mod.rs b/clients/stellar-relay-lib/src/tests/mod.rs index 0bd51ff45..26e77bf72 100644 --- a/clients/stellar-relay-lib/src/tests/mod.rs +++ b/clients/stellar-relay-lib/src/tests/mod.rs @@ -1,13 +1,13 @@ use crate::{ connection::ConnectionInfo, node::NodeInfo, StellarOverlayConfig, StellarOverlayConnection, }; +use async_std::{future::timeout, sync::Mutex}; use serial_test::serial; use std::{sync::Arc, thread::sleep, time::Duration}; use substrate_stellar_sdk::{ types::{ScpStatementExternalize, ScpStatementPledges, StellarMessage}, Hash, IntoHash, }; -use tokio::{sync::Mutex, time::timeout}; fn secret_key(is_mainnet: bool) -> String { let path = if is_mainnet { @@ -59,7 +59,7 @@ async fn stellar_overlay_should_receive_scp_messages() { if let Ok(Some(msg)) = ov_conn_locked.listen() { scps_vec_clone.lock().await.push(msg); - ov_conn_locked.disconnect(); + ov_conn_locked.stop(); } }) .await @@ -107,15 +107,11 @@ async fn stellar_overlay_should_receive_tx_set() { StellarMessage::TxSet(set) => { let tx_set_hash = set.into_hash().expect("should return a hash"); actual_tx_set_hashes_clone.lock().await.push(tx_set_hash); - - ov_conn_locked.disconnect(); break }, StellarMessage::GeneralizedTxSet(set) => { let tx_set_hash = set.into_hash().expect("should return a hash"); actual_tx_set_hashes_clone.lock().await.push(tx_set_hash); - - ov_conn_locked.disconnect(); break }, _ => {}, @@ -145,7 +141,7 @@ async fn stellar_overlay_disconnect_works() { // let it run for a second, before disconnecting. sleep(Duration::from_secs(1)); - overlay_connection.disconnect(); + overlay_connection.stop(); // let the disconnection call pass for a few seconds, before checking its status. sleep(Duration::from_secs(3)); diff --git a/clients/vault/Cargo.toml b/clients/vault/Cargo.toml index f91f12b06..4dfd5c6a7 100644 --- a/clients/vault/Cargo.toml +++ b/clients/vault/Cargo.toml @@ -20,6 +20,7 @@ parachain-metadata-foucoco = ["runtime/parachain-metadata-foucoco"] integration-test = ["runtime/standalone-metadata", "integration"] [dependencies] +async-std = "1.12.0" async-trait = "0.1.40" base64 = { version = '0.13.0', default-features = false, features = ['alloc'] } bincode = "1.3.3" diff --git a/clients/vault/resources/config/testnet/stellar_relay_config_sdftest1.json b/clients/vault/resources/config/testnet/stellar_relay_config_sdftest1.json index cee8b5f6b..d82db5ad8 100644 --- a/clients/vault/resources/config/testnet/stellar_relay_config_sdftest1.json +++ b/clients/vault/resources/config/testnet/stellar_relay_config_sdftest1.json @@ -7,7 +7,7 @@ "ledger_version": 20, "overlay_version": 31, "overlay_min_version": 27, - "version_str": "stellar-core 20.1.0 (114b833e755400178a57142f45b7fb892ddb034f)", + "version_str": "stellar-core 20.2.0.rc1 (3076c138d77735c6ce8230886a540f4d54d85c59)", "is_pub_net": false }, "stellar_history_archive_urls": [] diff --git a/clients/vault/resources/config/testnet/stellar_relay_config_sdftest2.json b/clients/vault/resources/config/testnet/stellar_relay_config_sdftest2.json index 894a57110..057dd44cf 100644 --- a/clients/vault/resources/config/testnet/stellar_relay_config_sdftest2.json +++ b/clients/vault/resources/config/testnet/stellar_relay_config_sdftest2.json @@ -7,7 +7,7 @@ "ledger_version": 20, "overlay_version": 31, "overlay_min_version": 27, - "version_str": "stellar-core 20.1.0 (114b833e755400178a57142f45b7fb892ddb034f)", + "version_str": "stellar-core 20.2.0.rc1 (3076c138d77735c6ce8230886a540f4d54d85c59)", "is_pub_net": false }, "stellar_history_archive_urls": [] diff --git a/clients/vault/resources/config/testnet/stellar_relay_config_sdftest3.json b/clients/vault/resources/config/testnet/stellar_relay_config_sdftest3.json index 1f0a7291f..c9334f817 100644 --- a/clients/vault/resources/config/testnet/stellar_relay_config_sdftest3.json +++ b/clients/vault/resources/config/testnet/stellar_relay_config_sdftest3.json @@ -7,7 +7,7 @@ "ledger_version": 20, "overlay_version": 31, "overlay_min_version": 27, - "version_str": "stellar-core 20.1.0 (114b833e755400178a57142f45b7fb892ddb034f)", + "version_str": "stellar-core 20.2.0.rc1 (3076c138d77735c6ce8230886a540f4d54d85c59)", "is_pub_net": false }, "stellar_history_archive_urls": [] diff --git a/clients/vault/src/oracle/agent.rs b/clients/vault/src/oracle/agent.rs index 8945bf878..77ff5fff5 100644 --- a/clients/vault/src/oracle/agent.rs +++ b/clients/vault/src/oracle/agent.rs @@ -91,7 +91,6 @@ pub async fn start_oracle_agent( // if a disconnect signal was sent, disconnect from Stellar. Ok(_) | Err(TryRecvError::Disconnected) => { tracing::info!("start_oracle_agent(): disconnect overlay..."); - overlay_conn.disconnect(); break }, Err(TryRecvError::Empty) => {}, @@ -112,7 +111,6 @@ pub async fn start_oracle_agent( Ok(None) => {}, // connection got lost Err(e) => { - overlay_conn.disconnect(); tracing::error!("start_oracle_agent(): encounter error in overlay: {e:?}"); if let Err(e) = shutdown_sender_clone2.send(()) { @@ -124,10 +122,13 @@ pub async fn start_oracle_agent( }, } } + + // shutdown the overlay connection + overlay_conn.stop(); }); tokio::spawn(on_shutdown(shutdown_sender.clone(), async move { - tracing::info!("start_oracle_agent(): sending signal to shutdown overlay connection..."); + tracing::debug!("start_oracle_agent(): sending signal to shutdown overlay connection..."); if let Err(e) = disconnect_signal_sender.send(()).await { tracing::warn!("start_oracle_agent(): failed to send disconnect signal: {e:?}"); } @@ -141,6 +142,12 @@ pub async fn start_oracle_agent( }) } +impl Drop for OracleAgent { + fn drop(&mut self) { + self.stop(); + } +} + impl OracleAgent { /// This method returns the proof for a given slot or an error if the proof cannot be provided. /// The agent will try every possible way to get the proof before returning an error. @@ -192,19 +199,18 @@ impl OracleAgent { } /// Stops listening for new SCP messages. - pub fn stop(&self) -> Result<(), Error> { - tracing::info!("stop(): Shutting down OracleAgent..."); + pub fn stop(&self) { + tracing::debug!("stop(): Shutting down OracleAgent..."); if let Err(e) = self.shutdown_sender.send(()) { tracing::error!("stop(): Failed to send shutdown signal in OracleAgent: {:?}", e); } - Ok(()) } } #[cfg(test)] mod tests { use crate::oracle::{ - get_random_secret_key, get_test_secret_key, get_test_stellar_relay_config, + get_random_secret_key, get_test_secret_key, specific_stellar_relay_config, traits::ArchiveStorage, ScpArchiveStorage, TransactionsArchiveStorage, }; @@ -215,11 +221,15 @@ mod tests { #[ntest::timeout(1_800_000)] // timeout at 30 minutes #[serial] async fn test_get_proof_for_current_slot() { + // let it run for a few seconds, making sure that the other tests have successfully shutdown + // their connection to Stellar Node + sleep(Duration::from_secs(2)).await; + let shutdown_sender = ShutdownSender::new(); // We use a random secret key to avoid conflicts with other tests. let agent = start_oracle_agent( - get_test_stellar_relay_config(true), + specific_stellar_relay_config(true, 0), &get_random_secret_key(), shutdown_sender, ) @@ -244,12 +254,16 @@ mod tests { #[tokio::test(flavor = "multi_thread")] #[serial] async fn test_get_proof_for_archived_slot() { + // let it run for a few seconds, making sure that the other tests have successfully shutdown + // their connection to Stellar Node + sleep(Duration::from_secs(2)).await; + let scp_archive_storage = ScpArchiveStorage::default(); let tx_archive_storage = TransactionsArchiveStorage::default(); let shutdown_sender = ShutdownSender::new(); let agent = start_oracle_agent( - get_test_stellar_relay_config(true), + specific_stellar_relay_config(true, 1), &get_test_secret_key(true), shutdown_sender, ) @@ -266,17 +280,19 @@ mod tests { // These might return an error if the file does not exist, but that's fine. let _ = scp_archive_storage.remove_file(target_slot); let _ = tx_archive_storage.remove_file(target_slot); - - agent.stop().expect("Failed to stop the agent"); } #[tokio::test(flavor = "multi_thread")] #[serial] async fn test_get_proof_for_archived_slot_with_fallback() { + // let it run for a few seconds, making sure that the other tests have successfully shutdown + // their connection to Stellar Node + sleep(Duration::from_secs(2)).await; + let scp_archive_storage = ScpArchiveStorage::default(); let tx_archive_storage = TransactionsArchiveStorage::default(); - let base_config = get_test_stellar_relay_config(true); + let base_config = specific_stellar_relay_config(true, 2); // We add two fake archive urls to the config to make sure that the agent will actually fall // back to other archives. let mut archive_urls = base_config.stellar_history_archive_urls().clone(); @@ -302,18 +318,15 @@ mod tests { // These might return an error if the file does not exist, but that's fine. let _ = scp_archive_storage.remove_file(target_slot); let _ = tx_archive_storage.remove_file(target_slot); - - agent.stop().expect("Failed to stop the agent"); } #[tokio::test(flavor = "multi_thread")] #[serial] async fn test_get_proof_for_archived_slot_fails_without_archives() { - env_logger::init(); let scp_archive_storage = ScpArchiveStorage::default(); let tx_archive_storage = TransactionsArchiveStorage::default(); - let base_config = get_test_stellar_relay_config(true); + let base_config = specific_stellar_relay_config(true, 0); let modified_config: StellarOverlayConfig = StellarOverlayConfig { stellar_history_archive_urls: vec![], ..base_config }; @@ -324,6 +337,8 @@ mod tests { // This slot should be archived on the public network let target_slot = 44041116; + tracing::info!("let's sleep for 3 seconds,to get the network up and running"); + sleep(Duration::from_secs(3)).await; let proof_result = agent.get_proof(target_slot).await; assert!(matches!(proof_result, Err(Error::ProofTimeout(_)))); @@ -331,8 +346,5 @@ mod tests { // These might return an error if the file does not exist, but that's fine. let _ = scp_archive_storage.remove_file(target_slot); let _ = tx_archive_storage.remove_file(target_slot); - - println!("HOY PLEAAASE"); - agent.stop().expect("Failed to stop the agent"); } } diff --git a/clients/vault/src/oracle/collector/collector.rs b/clients/vault/src/oracle/collector/collector.rs index 3a9e3aac9..97f014c98 100644 --- a/clients/vault/src/oracle/collector/collector.rs +++ b/clients/vault/src/oracle/collector/collector.rs @@ -234,7 +234,7 @@ mod test { use crate::oracle::{ collector::{collector::AddTxSet, ScpMessageCollector}, - get_test_stellar_relay_config, + random_stellar_relay_config, traits::FileHandler, EnvelopesFileHandler, }; @@ -273,7 +273,7 @@ mod test { } fn stellar_history_archive_urls() -> Vec { - get_test_stellar_relay_config(true).stellar_history_archive_urls() + random_stellar_relay_config(true).stellar_history_archive_urls() } #[test] diff --git a/clients/vault/src/oracle/storage/impls.rs b/clients/vault/src/oracle/storage/impls.rs index 28be05d8a..e4dd33677 100644 --- a/clients/vault/src/oracle/storage/impls.rs +++ b/clients/vault/src/oracle/storage/impls.rs @@ -160,8 +160,8 @@ mod test { use crate::oracle::{ constants::MAX_SLOTS_PER_FILE, errors::Error, - get_test_stellar_relay_config, impls::ArchiveStorage, + random_stellar_relay_config, storage::{ traits::{FileHandler, FileHandlerExt}, EnvelopesFileHandler, @@ -174,7 +174,7 @@ mod test { impl Default for ScpArchiveStorage { fn default() -> Self { - let cfg = get_test_stellar_relay_config(true); + let cfg = random_stellar_relay_config(true); let archive_urls = cfg.stellar_history_archive_urls(); let archive_url = archive_urls.first().expect("should have an archive url"); ScpArchiveStorage(archive_url.clone()) @@ -183,7 +183,7 @@ mod test { impl Default for TransactionsArchiveStorage { fn default() -> Self { - let cfg = get_test_stellar_relay_config(true); + let cfg = random_stellar_relay_config(true); let archive_urls = cfg.stellar_history_archive_urls(); let archive_url = archive_urls.first().expect("should have an archive url"); TransactionsArchiveStorage(archive_url.clone()) diff --git a/clients/vault/src/oracle/testing_utils.rs b/clients/vault/src/oracle/testing_utils.rs index e5e10760b..6f245cd6e 100644 --- a/clients/vault/src/oracle/testing_utils.rs +++ b/clients/vault/src/oracle/testing_utils.rs @@ -1,19 +1,43 @@ use stellar_relay_lib::sdk::SecretKey; -pub fn get_test_stellar_relay_config(is_mainnet: bool) -> stellar_relay_lib::StellarOverlayConfig { +pub fn random_stellar_relay_config(is_mainnet: bool) -> stellar_relay_lib::StellarOverlayConfig { use rand::seq::SliceRandom; - let stellar_node_points: Vec<&str> = if is_mainnet { + let (stellar_node_points, dir) = stellar_relay_config_choices(is_mainnet); + + let node_point = stellar_node_points + .choose(&mut rand::thread_rng()) + .expect("should return a value"); + + stellar_relay_config_abs_path(dir, node_point) +} + +pub fn specific_stellar_relay_config( + is_mainnet: bool, + index: usize, +) -> stellar_relay_lib::StellarOverlayConfig { + let (stellar_node_points, dir) = stellar_relay_config_choices(is_mainnet); + + let node_point = stellar_node_points.get(index).expect("should return a value"); + + stellar_relay_config_abs_path(dir, node_point) +} + +fn stellar_relay_config_choices(is_mainnet: bool) -> (Vec<&'static str>, &'static str) { + let node_points = if is_mainnet { vec!["frankfurt", "iowa", "singapore"] } else { vec!["sdftest1", "sdftest2", "sdftest3"] }; - let dir = if is_mainnet { "mainnet" } else { "testnet" }; - let res = stellar_node_points - .choose(&mut rand::thread_rng()) - .expect("should return a value"); - let path_string = format!("./resources/config/{dir}/stellar_relay_config_{res}.json"); + let dir = if is_mainnet { "mainnet" } else { "testnet" }; + (node_points, dir) +} +fn stellar_relay_config_abs_path( + dir: &str, + node_point: &str, +) -> stellar_relay_lib::StellarOverlayConfig { + let path_string = format!("./resources/config/{dir}/stellar_relay_config_{node_point}.json"); stellar_relay_lib::StellarOverlayConfig::try_from_path(path_string.as_str()) .expect("should be able to extract config") diff --git a/clients/vault/tests/helper/mod.rs b/clients/vault/tests/helper/mod.rs index 7747f0b10..caa17e9b5 100644 --- a/clients/vault/tests/helper/mod.rs +++ b/clients/vault/tests/helper/mod.rs @@ -20,7 +20,7 @@ use std::{future::Future, sync::Arc}; use stellar_relay_lib::StellarOverlayConfig; use tokio::sync::RwLock; use vault::{ - oracle::{get_test_secret_key, get_test_stellar_relay_config, start_oracle_agent, OracleAgent}, + oracle::{get_test_secret_key, random_stellar_relay_config, start_oracle_agent, OracleAgent}, ArcRwLock, }; use wallet::StellarWallet; @@ -28,7 +28,7 @@ use wallet::StellarWallet; pub type StellarPublicKey = [u8; 32]; lazy_static! { - pub static ref CFG: StellarOverlayConfig = get_test_stellar_relay_config(false); + pub static ref CFG: StellarOverlayConfig = random_stellar_relay_config(false); pub static ref SECRET_KEY: String = get_test_secret_key(false); // TODO clean this up by extending the `get_test_secret_key()` function pub static ref DESTINATION_SECRET_KEY: String = "SDNQJEIRSA6YF5JNS6LQLCBF2XVWZ2NJV3YLC322RGIBJIJRIRGWKLEF".to_string(); diff --git a/clients/wallet/src/horizon/tests.rs b/clients/wallet/src/horizon/tests.rs index 348de6ef4..dfbd9e845 100644 --- a/clients/wallet/src/horizon/tests.rs +++ b/clients/wallet/src/horizon/tests.rs @@ -20,7 +20,9 @@ use crate::types::FilterWith; use super::*; -const SECRET: &str = "SBLI7RKEJAEFGLZUBSCOFJHQBPFYIIPLBCKN7WVCWT4NEG2UJEW33N73"; +// const SECRET: &str = "SBLI7RKEJAEFGLZUBSCOFJHQBPFYIIPLBCKN7WVCWT4NEG2UJEW33N73"; + +const SECRET: &str = "SB6WHKIU2HGVBRNKNOEOQUY4GFC4ZLG5XPGWLEAHTIZXBXXYACC76VSQ"; #[derive(Clone)] struct MockFilter;