Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

remove async std from translator #1424

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 0 additions & 15 deletions roles/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 1 addition & 3 deletions roles/translator/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,11 @@ path = "src/main.rs"
stratum-common = { path = "../../common" }
async-channel = "1.5.1"
async-recursion = "0.3.2"
async-std = { version = "1.12.0", features = ["attributes"] }
binary_sv2 = { path = "../../protocols/v2/binary-sv2/binary-sv2" }
buffer_sv2 = { path = "../../utils/buffer" }
codec_sv2 = { path = "../../protocols/v2/codec-sv2", features = ["noise_sv2", "with_buffer_pool"] }
framing_sv2 = { path = "../../protocols/v2/framing-sv2" }
network_helpers_sv2 = { path = "../roles-utils/network-helpers", features=["async_std", "with_buffer_pool"] }
network_helpers_sv2 = { path = "../roles-utils/network-helpers", features=["with_tokio", "with_buffer_pool"] }
once_cell = "1.12.0"
roles_logic_sv2 = { path = "../../protocols/v2/roles-logic-sv2" }
serde = { version = "1.0.89", default-features = false, features = ["derive", "alloc"] }
Expand All @@ -42,7 +41,6 @@ v1 = { path = "../../protocols/v1", package="sv1_api" }
error_handling = { path = "../../utils/error-handling" }
key-utils = { path = "../../utils/key-utils" }
tokio-util = { version = "0.7.10", features = ["codec"] }
async-compat = "0.2.1"
rand = "0.8.4"


Expand Down
107 changes: 51 additions & 56 deletions roles/translator/src/lib/downstream_sv1/downstream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,14 @@ use crate::{
status,
};
use async_channel::{bounded, Receiver, Sender};
use async_std::{
io::BufReader,
use error_handling::handle_result;
use futures::{FutureExt, StreamExt};
use tokio::{
io::{AsyncWriteExt, BufReader},
net::{TcpListener, TcpStream},
prelude::*,
task,
sync::broadcast,
task::AbortHandle,
};
use error_handling::handle_result;
use futures::FutureExt;
use tokio::{sync::broadcast, task::AbortHandle};

use super::{kill, DownstreamMessages, SubmitShareWithChannelId, SUBSCRIBE_TIMEOUT_SECS};

Expand Down Expand Up @@ -112,16 +111,10 @@ impl Downstream {
upstream_difficulty_config: Arc<Mutex<UpstreamDifficultyConfig>>,
task_collector: Arc<Mutex<Vec<(AbortHandle, String)>>>,
) {
let stream = std::sync::Arc::new(stream);

// Reads and writes from Downstream SV1 Mining Device Client
let (socket_reader, socket_writer) = (stream.clone(), stream);
let (socket_reader, mut socket_writer) = stream.into_split();
let (tx_outgoing, receiver_outgoing) = bounded(10);

let socket_writer_clone = socket_writer.clone();
// Used to send SV1 `mining.notify` messages to the Downstreams
let _socket_writer_notify = socket_writer;

let downstream = Arc::new(Mutex::new(Downstream {
connection_id,
authorized_names: vec![],
Expand Down Expand Up @@ -160,11 +153,9 @@ impl Downstream {
// role, or the message is sent upwards to the Bridge for translation into a SV2 message
// and then sent to the SV2 Upstream role.
let socket_reader_task = tokio::task::spawn(async move {
let reader = BufReader::new(&*socket_reader);
let mut messages = FramedRead::new(
async_compat::Compat::new(reader),
LinesCodec::new_with_max_length(MAX_LINE_LENGTH),
);
let reader = BufReader::new(socket_reader);
let mut messages =
FramedRead::new(reader, LinesCodec::new_with_max_length(MAX_LINE_LENGTH));
Shourya742 marked this conversation as resolved.
Show resolved Hide resolved
loop {
// Read message from SV1 Mining Device Client socket
// On message receive, parse to `json_rpc:Message` and send to Upstream
Expand Down Expand Up @@ -238,7 +229,7 @@ impl Downstream {
}
};
debug!("Sending to Mining Device: {} - {:?}", &host_, &to_send);
let res = (&*socket_writer_clone)
let res = socket_writer
.write_all(to_send.as_bytes())
.await;
handle_result!(tx_status_writer, res);
Expand Down Expand Up @@ -341,7 +332,7 @@ impl Downstream {
);
break;
}
task::sleep(std::time::Duration::from_secs(1)).await;
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
}
}
let _ = Self::remove_miner_hashrate_from_channel(self_);
Expand Down Expand Up @@ -369,41 +360,45 @@ impl Downstream {
upstream_difficulty_config: Arc<Mutex<UpstreamDifficultyConfig>>,
task_collector: Arc<Mutex<Vec<(AbortHandle, String)>>>,
) {
let task_collector_downstream = task_collector.clone();

let accept_connections = tokio::task::spawn(async move {
let downstream_listener = TcpListener::bind(downstream_addr).await.unwrap();
let mut downstream_incoming = downstream_listener.incoming();

while let Some(stream) = downstream_incoming.next().await {
let stream = stream.expect("Err on SV1 Downstream connection stream");
let expected_hash_rate = downstream_difficulty_config.min_individual_miner_hashrate;
let open_sv1_downstream = bridge
.safe_lock(|s| s.on_new_sv1_connection(expected_hash_rate))
.unwrap();

let host = stream.peer_addr().unwrap().to_string();
match open_sv1_downstream {
Ok(opened) => {
info!("PROXY SERVER - ACCEPTING FROM DOWNSTREAM: {}", host);
Downstream::new_downstream(
stream,
opened.channel_id,
tx_sv1_submit.clone(),
tx_mining_notify.subscribe(),
tx_status.listener_to_connection(),
opened.extranonce,
opened.last_notify,
opened.extranonce2_len as usize,
host,
downstream_difficulty_config.clone(),
upstream_difficulty_config.clone(),
task_collector_downstream.clone(),
)
.await;
}
Err(e) => {
tracing::error!("Failed to create a new downstream connection: {:?}", e);
let accept_connections = tokio::task::spawn({
let task_collector = task_collector.clone();
async move {
plebhash marked this conversation as resolved.
Show resolved Hide resolved
let listener = TcpListener::bind(downstream_addr).await.unwrap();

while let Ok((stream, _)) = listener.accept().await {
let expected_hash_rate =
downstream_difficulty_config.min_individual_miner_hashrate;
let open_sv1_downstream = bridge
.safe_lock(|s| s.on_new_sv1_connection(expected_hash_rate))
.unwrap();

let host = stream.peer_addr().unwrap().to_string();

match open_sv1_downstream {
Ok(opened) => {
info!("PROXY SERVER - ACCEPTING FROM DOWNSTREAM: {}", host);
Downstream::new_downstream(
stream,
opened.channel_id,
tx_sv1_submit.clone(),
tx_mining_notify.subscribe(),
tx_status.listener_to_connection(),
opened.extranonce,
opened.last_notify,
opened.extranonce2_len as usize,
host,
downstream_difficulty_config.clone(),
upstream_difficulty_config.clone(),
task_collector.clone(),
)
.await;
}
Err(e) => {
tracing::error!(
"Failed to create a new downstream connection: {:?}",
e
);
}
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion roles/translator/src/lib/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@ impl TranslatorSv2 {
if target != [0; 32] {
break;
};
async_std::task::sleep(std::time::Duration::from_millis(100)).await;
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
}

let task_collector_bridge = task_collector_init_task.clone();
Expand Down
2 changes: 1 addition & 1 deletion roles/translator/src/lib/upstream_sv2/diff_management.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ impl Upstream {
super::super::error::ChannelSendError::General(e.to_string()),
)
})?;
async_std::task::sleep(Duration::from_secs(timeout as u64)).await;
tokio::time::sleep(Duration::from_secs(timeout as u64)).await;
Ok(())
}
}
6 changes: 3 additions & 3 deletions roles/translator/src/lib/upstream_sv2/upstream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,11 @@ use crate::{
upstream_sv2::{EitherFrame, Message, StdFrame, UpstreamConnection},
};
use async_channel::{Receiver, Sender};
use async_std::net::TcpStream;
use binary_sv2::u256_from_int;
use codec_sv2::{HandshakeRole, Initiator};
use error_handling::handle_result;
use key_utils::Secp256k1PublicKey;
use network_helpers_sv2::Connection;
use network_helpers_sv2::noise_connection_tokio::Connection;
Shourya742 marked this conversation as resolved.
Show resolved Hide resolved
use roles_logic_sv2::{
common_messages_sv2::{Protocol, SetupConnection},
common_properties::{IsMiningUpstream, IsUpstream},
Expand All @@ -38,6 +37,7 @@ use std::{
sync::{atomic::AtomicBool, Arc},
};
use tokio::{
net::TcpStream,
task::AbortHandle,
time::{sleep, Duration},
};
Expand Down Expand Up @@ -154,7 +154,7 @@ impl Upstream {
);

// Channel to send and receive messages to the SV2 Upstream role
let (receiver, sender) = Connection::new(socket, HandshakeRole::Initiator(initiator), 10)
let (receiver, sender, _, _) = Connection::new(socket, HandshakeRole::Initiator(initiator))
Shourya742 marked this conversation as resolved.
Show resolved Hide resolved
.await
.unwrap();
// Initialize `UpstreamConnection` with channel for SV2 Upstream role communication and
Expand Down
Loading