From b1ac99fead6b44555f74b440323a547937a33035 Mon Sep 17 00:00:00 2001 From: bit-aloo Date: Thu, 30 Jan 2025 12:43:53 +0530 Subject: [PATCH 1/2] remove async-std from translator --- roles/Cargo.lock | 1 - roles/translator/Cargo.toml | 3 +- .../src/lib/downstream_sv1/downstream.rs | 107 +++++++++--------- roles/translator/src/lib/mod.rs | 2 +- .../src/lib/upstream_sv2/diff_management.rs | 2 +- .../src/lib/upstream_sv2/upstream.rs | 6 +- 6 files changed, 57 insertions(+), 64 deletions(-) diff --git a/roles/Cargo.lock b/roles/Cargo.lock index 21a79749ef..2d557bae26 100644 --- a/roles/Cargo.lock +++ b/roles/Cargo.lock @@ -2743,7 +2743,6 @@ dependencies = [ "async-channel 1.9.0", "async-compat", "async-recursion 0.3.2", - "async-std", "binary_sv2", "buffer_sv2", "codec_sv2", diff --git a/roles/translator/Cargo.toml b/roles/translator/Cargo.toml index 95b3463969..5bd0d702c7 100644 --- a/roles/translator/Cargo.toml +++ b/roles/translator/Cargo.toml @@ -23,12 +23,11 @@ path = "src/main.rs" stratum-common = { path = "../../common" } async-channel = "1.5.1" async-recursion = "0.3.2" -async-std = { version = "1.12.0", features = ["attributes"] } binary_sv2 = { path = "../../protocols/v2/binary-sv2/binary-sv2" } buffer_sv2 = { path = "../../utils/buffer" } codec_sv2 = { path = "../../protocols/v2/codec-sv2", features = ["noise_sv2", "with_buffer_pool"] } framing_sv2 = { path = "../../protocols/v2/framing-sv2" } -network_helpers_sv2 = { path = "../roles-utils/network-helpers", features=["async_std", "with_buffer_pool"] } +network_helpers_sv2 = { path = "../roles-utils/network-helpers", features=["with_tokio", "with_buffer_pool"] } once_cell = "1.12.0" roles_logic_sv2 = { path = "../../protocols/v2/roles-logic-sv2" } serde = { version = "1.0.89", default-features = false, features = ["derive", "alloc"] } diff --git a/roles/translator/src/lib/downstream_sv1/downstream.rs b/roles/translator/src/lib/downstream_sv1/downstream.rs index 5dfa33a3b9..d23d2159d4 100644 --- a/roles/translator/src/lib/downstream_sv1/downstream.rs +++ b/roles/translator/src/lib/downstream_sv1/downstream.rs @@ -5,15 +5,14 @@ use crate::{ status, }; use async_channel::{bounded, Receiver, Sender}; -use async_std::{ - io::BufReader, +use error_handling::handle_result; +use futures::{FutureExt, StreamExt}; +use tokio::{ + io::{AsyncWriteExt, BufReader}, net::{TcpListener, TcpStream}, - prelude::*, - task, + sync::broadcast, + task::AbortHandle, }; -use error_handling::handle_result; -use futures::FutureExt; -use tokio::{sync::broadcast, task::AbortHandle}; use super::{kill, DownstreamMessages, SubmitShareWithChannelId, SUBSCRIBE_TIMEOUT_SECS}; @@ -112,16 +111,10 @@ impl Downstream { upstream_difficulty_config: Arc>, task_collector: Arc>>, ) { - let stream = std::sync::Arc::new(stream); - // Reads and writes from Downstream SV1 Mining Device Client - let (socket_reader, socket_writer) = (stream.clone(), stream); + let (socket_reader, mut socket_writer) = stream.into_split(); let (tx_outgoing, receiver_outgoing) = bounded(10); - let socket_writer_clone = socket_writer.clone(); - // Used to send SV1 `mining.notify` messages to the Downstreams - let _socket_writer_notify = socket_writer; - let downstream = Arc::new(Mutex::new(Downstream { connection_id, authorized_names: vec![], @@ -160,11 +153,9 @@ impl Downstream { // role, or the message is sent upwards to the Bridge for translation into a SV2 message // and then sent to the SV2 Upstream role. let socket_reader_task = tokio::task::spawn(async move { - let reader = BufReader::new(&*socket_reader); - let mut messages = FramedRead::new( - async_compat::Compat::new(reader), - LinesCodec::new_with_max_length(MAX_LINE_LENGTH), - ); + let reader = BufReader::new(socket_reader); + let mut messages = + FramedRead::new(reader, LinesCodec::new_with_max_length(MAX_LINE_LENGTH)); loop { // Read message from SV1 Mining Device Client socket // On message receive, parse to `json_rpc:Message` and send to Upstream @@ -238,7 +229,7 @@ impl Downstream { } }; debug!("Sending to Mining Device: {} - {:?}", &host_, &to_send); - let res = (&*socket_writer_clone) + let res = socket_writer .write_all(to_send.as_bytes()) .await; handle_result!(tx_status_writer, res); @@ -341,7 +332,7 @@ impl Downstream { ); break; } - task::sleep(std::time::Duration::from_secs(1)).await; + tokio::time::sleep(std::time::Duration::from_secs(1)).await; } } let _ = Self::remove_miner_hashrate_from_channel(self_); @@ -369,41 +360,45 @@ impl Downstream { upstream_difficulty_config: Arc>, task_collector: Arc>>, ) { - let task_collector_downstream = task_collector.clone(); - - let accept_connections = tokio::task::spawn(async move { - let downstream_listener = TcpListener::bind(downstream_addr).await.unwrap(); - let mut downstream_incoming = downstream_listener.incoming(); - - while let Some(stream) = downstream_incoming.next().await { - let stream = stream.expect("Err on SV1 Downstream connection stream"); - let expected_hash_rate = downstream_difficulty_config.min_individual_miner_hashrate; - let open_sv1_downstream = bridge - .safe_lock(|s| s.on_new_sv1_connection(expected_hash_rate)) - .unwrap(); - - let host = stream.peer_addr().unwrap().to_string(); - match open_sv1_downstream { - Ok(opened) => { - info!("PROXY SERVER - ACCEPTING FROM DOWNSTREAM: {}", host); - Downstream::new_downstream( - stream, - opened.channel_id, - tx_sv1_submit.clone(), - tx_mining_notify.subscribe(), - tx_status.listener_to_connection(), - opened.extranonce, - opened.last_notify, - opened.extranonce2_len as usize, - host, - downstream_difficulty_config.clone(), - upstream_difficulty_config.clone(), - task_collector_downstream.clone(), - ) - .await; - } - Err(e) => { - tracing::error!("Failed to create a new downstream connection: {:?}", e); + let accept_connections = tokio::task::spawn({ + let task_collector = task_collector.clone(); + async move { + let listener = TcpListener::bind(downstream_addr).await.unwrap(); + + while let Ok((stream, _)) = listener.accept().await { + let expected_hash_rate = + downstream_difficulty_config.min_individual_miner_hashrate; + let open_sv1_downstream = bridge + .safe_lock(|s| s.on_new_sv1_connection(expected_hash_rate)) + .unwrap(); + + let host = stream.peer_addr().unwrap().to_string(); + + match open_sv1_downstream { + Ok(opened) => { + info!("PROXY SERVER - ACCEPTING FROM DOWNSTREAM: {}", host); + Downstream::new_downstream( + stream, + opened.channel_id, + tx_sv1_submit.clone(), + tx_mining_notify.subscribe(), + tx_status.listener_to_connection(), + opened.extranonce, + opened.last_notify, + opened.extranonce2_len as usize, + host, + downstream_difficulty_config.clone(), + upstream_difficulty_config.clone(), + task_collector.clone(), + ) + .await; + } + Err(e) => { + tracing::error!( + "Failed to create a new downstream connection: {:?}", + e + ); + } } } } diff --git a/roles/translator/src/lib/mod.rs b/roles/translator/src/lib/mod.rs index d0ae2eca8f..5618d98728 100644 --- a/roles/translator/src/lib/mod.rs +++ b/roles/translator/src/lib/mod.rs @@ -247,7 +247,7 @@ impl TranslatorSv2 { if target != [0; 32] { break; }; - async_std::task::sleep(std::time::Duration::from_millis(100)).await; + tokio::time::sleep(std::time::Duration::from_millis(100)).await; } let task_collector_bridge = task_collector_init_task.clone(); diff --git a/roles/translator/src/lib/upstream_sv2/diff_management.rs b/roles/translator/src/lib/upstream_sv2/diff_management.rs index 830bf28d8f..69eec25117 100644 --- a/roles/translator/src/lib/upstream_sv2/diff_management.rs +++ b/roles/translator/src/lib/upstream_sv2/diff_management.rs @@ -43,7 +43,7 @@ impl Upstream { super::super::error::ChannelSendError::General(e.to_string()), ) })?; - async_std::task::sleep(Duration::from_secs(timeout as u64)).await; + tokio::time::sleep(Duration::from_secs(timeout as u64)).await; Ok(()) } } diff --git a/roles/translator/src/lib/upstream_sv2/upstream.rs b/roles/translator/src/lib/upstream_sv2/upstream.rs index c025c14071..3f2acfe025 100644 --- a/roles/translator/src/lib/upstream_sv2/upstream.rs +++ b/roles/translator/src/lib/upstream_sv2/upstream.rs @@ -9,12 +9,11 @@ use crate::{ upstream_sv2::{EitherFrame, Message, StdFrame, UpstreamConnection}, }; use async_channel::{Receiver, Sender}; -use async_std::net::TcpStream; use binary_sv2::u256_from_int; use codec_sv2::{HandshakeRole, Initiator}; use error_handling::handle_result; use key_utils::Secp256k1PublicKey; -use network_helpers_sv2::Connection; +use network_helpers_sv2::noise_connection_tokio::Connection; use roles_logic_sv2::{ common_messages_sv2::{Protocol, SetupConnection}, common_properties::{IsMiningUpstream, IsUpstream}, @@ -38,6 +37,7 @@ use std::{ sync::{atomic::AtomicBool, Arc}, }; use tokio::{ + net::TcpStream, task::AbortHandle, time::{sleep, Duration}, }; @@ -154,7 +154,7 @@ impl Upstream { ); // Channel to send and receive messages to the SV2 Upstream role - let (receiver, sender) = Connection::new(socket, HandshakeRole::Initiator(initiator), 10) + let (receiver, sender, _, _) = Connection::new(socket, HandshakeRole::Initiator(initiator)) .await .unwrap(); // Initialize `UpstreamConnection` with channel for SV2 Upstream role communication and From 933495b30b75aef3aace19ce6078f3f91a2e2db2 Mon Sep 17 00:00:00 2001 From: bit-aloo Date: Thu, 30 Jan 2025 12:45:20 +0530 Subject: [PATCH 2/2] remove async-compact --- roles/Cargo.lock | 14 -------------- roles/translator/Cargo.toml | 1 - 2 files changed, 15 deletions(-) diff --git a/roles/Cargo.lock b/roles/Cargo.lock index 2d557bae26..2329b56c12 100644 --- a/roles/Cargo.lock +++ b/roles/Cargo.lock @@ -181,19 +181,6 @@ dependencies = [ "pin-project-lite", ] -[[package]] -name = "async-compat" -version = "0.2.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7bab94bde396a3f7b4962e396fdad640e241ed797d4d8d77fc8c237d14c58fc0" -dependencies = [ - "futures-core", - "futures-io", - "once_cell", - "pin-project-lite", - "tokio", -] - [[package]] name = "async-executor" version = "1.13.1" @@ -2741,7 +2728,6 @@ name = "translator_sv2" version = "0.1.3" dependencies = [ "async-channel 1.9.0", - "async-compat", "async-recursion 0.3.2", "binary_sv2", "buffer_sv2", diff --git a/roles/translator/Cargo.toml b/roles/translator/Cargo.toml index 5bd0d702c7..11f75be772 100644 --- a/roles/translator/Cargo.toml +++ b/roles/translator/Cargo.toml @@ -41,7 +41,6 @@ v1 = { path = "../../protocols/v1", package="sv1_api" } error_handling = { path = "../../utils/error-handling" } key-utils = { path = "../../utils/key-utils" } tokio-util = { version = "0.7.10", features = ["codec"] } -async-compat = "0.2.1" rand = "0.8.4"