From 52c5f8456a7a3828dc3ec52fb86f565f7bcaceb4 Mon Sep 17 00:00:00 2001 From: hewigovens <360470+hewigovens@users.noreply.github.com> Date: Thu, 4 Apr 2024 17:46:29 +0900 Subject: [PATCH] try with out [async_trait] --- Cargo.lock | 51 ++++++++++++++++++++++ Cargo.toml | 1 + examples/ethereum-rpc/src/main.rs | 10 +++-- reqwest-enum/Cargo.toml | 3 +- reqwest-enum/src/jsonrpc.rs | 2 + reqwest-enum/src/provider.rs | 72 +++++++++++++++++++++++++++++++ 6 files changed, 135 insertions(+), 4 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 852ffbe..abc34c3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -202,6 +202,21 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "futures" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "645c6916888f6cb6350d2550b80fb63e734897a8498abe35cfb732b6487804b0" +dependencies = [ + "futures-channel", + "futures-core", + "futures-executor", + "futures-io", + "futures-sink", + "futures-task", + "futures-util", +] + [[package]] name = "futures-channel" version = "0.3.30" @@ -209,6 +224,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "eac8f7d7865dcb88bd4373ab671c8cf4508703796caa2b1985a9ca867b3fcb78" dependencies = [ "futures-core", + "futures-sink", ] [[package]] @@ -217,6 +233,34 @@ version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dfc6580bb841c5a68e9ef15c77ccc837b40a7504914d52e47b8b0e9bbda25a1d" +[[package]] +name = "futures-executor" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a576fc72ae164fca6b9db127eaa9a9dda0d61316034f33a0a0d4eda41f02b01d" +dependencies = [ + "futures-core", + "futures-task", + "futures-util", +] + +[[package]] +name = "futures-io" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a44623e20b9681a318efdd71c299b6b222ed6f231972bfe2f224ebad6311f0c1" + +[[package]] +name = "futures-macro" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "87750cf4b7a4c0625b1529e4c543c2182106e4dedc60a2a6455e00d212c489ac" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "futures-sink" version = "0.3.30" @@ -235,10 +279,16 @@ version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3d6401deb83407ab3da39eba7e33987a73c3df0c82b4bb5813ee871c19c41d48" dependencies = [ + "futures-channel", "futures-core", + "futures-io", + "futures-macro", + "futures-sink", "futures-task", + "memchr", "pin-project-lite", "pin-utils", + "slab", ] [[package]] @@ -709,6 +759,7 @@ name = "reqwest-enum" version = "0.1.0" dependencies = [ "async-trait", + "futures", "reqwest", "serde", "serde_json", diff --git a/Cargo.toml b/Cargo.toml index 5693056..aa9acc4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -17,3 +17,4 @@ serde = { version = "^1.0.0", features = ["derive"] } serde_json = "^1.0.0" async-trait = "^0.1.0" +futures = "^0.3.0" diff --git a/examples/ethereum-rpc/src/main.rs b/examples/ethereum-rpc/src/main.rs index d0db046..aa3276a 100644 --- a/examples/ethereum-rpc/src/main.rs +++ b/examples/ethereum-rpc/src/main.rs @@ -1,14 +1,18 @@ extern crate reqwest_enum; use ethereum_rpc::EthereumRPC; use reqwest_enum::jsonrpc::JsonRpcResult; -use reqwest_enum::provider::{JsonRpcProviderType, Provider}; +use reqwest_enum::provider::{JsonRpcProviderType, JsonRpcProviderType2, Provider}; #[tokio::main] async fn main() -> Result<(), Box> { let provider = Provider::::default(); - let targets = vec![EthereumRPC::ChainId, EthereumRPC::GasPrice]; - let results: Vec> = provider.batch(targets).await?; + let targets = vec![ + EthereumRPC::ChainId, + EthereumRPC::GasPrice, + EthereumRPC::BlockNumber, + ]; + let results: Vec> = provider.batch_chunk_by(targets, 2).await?; for result in results { match result { JsonRpcResult::Value(response) => { diff --git a/reqwest-enum/Cargo.toml b/reqwest-enum/Cargo.toml index 81b13fd..2909cc6 100644 --- a/reqwest-enum/Cargo.toml +++ b/reqwest-enum/Cargo.toml @@ -13,10 +13,11 @@ edition = { workspace = true } [features] default = ["jsonrpc"] -jsonrpc = [] +jsonrpc = ["dep:futures"] [dependencies] reqwest = { workspace = true } serde = { workspace = true } serde_json = { workspace = true } async-trait = { workspace = true } +futures = { workspace = true, optional = true } diff --git a/reqwest-enum/src/jsonrpc.rs b/reqwest-enum/src/jsonrpc.rs index 6bc468f..6c61569 100644 --- a/reqwest-enum/src/jsonrpc.rs +++ b/reqwest-enum/src/jsonrpc.rs @@ -60,8 +60,10 @@ pub struct JsonRpcError { pub message: String, } +#[cfg(feature = "jsonrpc")] impl std::error::Error for JsonRpcError {} +#[cfg(feature = "jsonrpc")] impl From for JsonRpcError { fn from(err: reqwest::Error) -> Self { JsonRpcError { diff --git a/reqwest-enum/src/provider.rs b/reqwest-enum/src/provider.rs index e03b2d4..59daec6 100644 --- a/reqwest-enum/src/provider.rs +++ b/reqwest-enum/src/provider.rs @@ -31,6 +31,16 @@ pub trait JsonRpcProviderType: ProviderType { ) -> Result>, JsonRpcError>; } +use core::future::Future; +use futures::future::join_all; +pub trait JsonRpcProviderType2: ProviderType { + fn batch_chunk_by( + &self, + targets: Vec, + chunk_size: usize, + ) -> impl Future>, JsonRpcError>>; +} + pub type EndpointFn = fn(target: &T) -> String; pub struct Provider { /// endpoint closure to customize the endpoint (url / path) @@ -94,6 +104,68 @@ where } } +impl JsonRpcProviderType2 for Provider +where + T: JsonRpcTarget + Send, +{ + async fn batch_chunk_by( + &self, + targets: Vec, + chunk_size: usize, + ) -> Result>, JsonRpcError> { + if targets.is_empty() || chunk_size == 0 { + return Err(JsonRpcError { + code: -32600, + message: "Invalid Request".into(), + }); + } + + let chunk_targets = targets.chunks(chunk_size).collect::>(); + let mut rpc_requests = Vec::::new(); + + for (chunk_idx, chunk) in chunk_targets.into_iter().enumerate() { + let target = &chunk[0]; + let mut request = self.request_builder(target); + let mut requests = Vec::::new(); + for (k, v) in chunk.iter().enumerate() { + let request = JsonRpcRequest::new( + v.method_name(), + v.params(), + (chunk_idx * chunk_size + k) as u64, + ); + requests.push(request); + } + + request = request.body(HTTPBody::from_array(&requests).inner); + rpc_requests.push(request); + } + let bodies = join_all(rpc_requests.into_iter().map(|request| async move { + let response = request.send().await?; + let body = response.json::>>().await?; + Ok(body) + })) + .await; + + let mut results = Vec::>::new(); + let mut error: Option = None; + + for result in bodies { + match result { + Ok(body) => { + results.extend(body); + } + Err(err) => { + error = Some(err); + } + } + } + if let Some(err) = error { + return Err(err); + } + Ok(results) + } +} + impl Provider where T: Target,