diff --git a/Cargo.toml b/Cargo.toml index 8963fdf..c2f01b4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -23,6 +23,7 @@ hex = { version = "0.2", package = "hex-conservative" } log = "^0.4" minreq = { version = "2.11.0", features = ["json-using-serde"], optional = true } reqwest = { version = "0.11", features = ["json"], default-features = false, optional = true } +async-std = { version = "1.13.0", optional = true } [dev-dependencies] serde_json = "1.0" @@ -37,7 +38,7 @@ blocking-https = ["blocking", "minreq/https"] blocking-https-rustls = ["blocking", "minreq/https-rustls"] blocking-https-native = ["blocking", "minreq/https-native"] blocking-https-bundled = ["blocking", "minreq/https-bundled"] -async = ["reqwest", "reqwest/socks"] +async = ["async-std", "reqwest", "reqwest/socks"] async-https = ["async", "reqwest/default-tls"] async-https-native = ["async", "reqwest/native-tls"] async-https-rustls = ["async", "reqwest/rustls-tls"] diff --git a/src/async.rs b/src/async.rs index ac0a761..93e4449 100644 --- a/src/async.rs +++ b/src/async.rs @@ -11,6 +11,7 @@ //! Esplora by way of `reqwest` HTTP client. +use async_std::task; use std::collections::HashMap; use std::str::FromStr; @@ -24,9 +25,12 @@ use bitcoin::{ #[allow(unused_imports)] use log::{debug, error, info, trace}; -use reqwest::{header, Client}; +use reqwest::{header, Client, Response}; -use crate::{BlockStatus, BlockSummary, Builder, Error, MerkleProof, OutputStatus, Tx, TxStatus}; +use crate::{ + BlockStatus, BlockSummary, Builder, Error, MerkleProof, OutputStatus, Tx, TxStatus, + BASE_BACKOFF_MILLIS, RETRYABLE_ERROR_CODES, +}; #[derive(Debug, Clone)] pub struct AsyncClient { @@ -34,6 +38,8 @@ pub struct AsyncClient { url: String, /// The inner [`reqwest::Client`] to make HTTP requests. client: Client, + /// Number of times to retry a request + max_retries: usize, } impl AsyncClient { @@ -63,12 +69,20 @@ impl AsyncClient { client_builder = client_builder.default_headers(headers); } - Ok(Self::from_client(builder.base_url, client_builder.build()?)) + Ok(AsyncClient { + url: builder.base_url, + client: client_builder.build()?, + max_retries: builder.max_retries, + }) } /// Build an async client from the base url and [`Client`] pub fn from_client(url: String, client: Client) -> Self { - AsyncClient { url, client } + AsyncClient { + url, + client, + max_retries: crate::DEFAULT_MAX_RETRIES, + } } /// Make an HTTP GET request to given URL, deserializing to any `T` that @@ -84,7 +98,7 @@ impl AsyncClient { /// [`bitcoin::consensus::Decodable`] deserialization. async fn get_response(&self, path: &str) -> Result { let url = format!("{}{}", self.url, path); - let response = self.client.get(url).send().await?; + let response = self.get_with_retry(&url).await?; if !response.status().is_success() { return Err(Error::HttpResponse { @@ -124,7 +138,7 @@ impl AsyncClient { path: &str, ) -> Result { let url = format!("{}{}", self.url, path); - let response = self.client.get(url).send().await?; + let response = self.get_with_retry(&url).await?; if !response.status().is_success() { return Err(Error::HttpResponse { @@ -166,7 +180,7 @@ impl AsyncClient { /// [`bitcoin::consensus::Decodable`] deserialization. async fn get_response_hex(&self, path: &str) -> Result { let url = format!("{}{}", self.url, path); - let response = self.client.get(url).send().await?; + let response = self.get_with_retry(&url).await?; if !response.status().is_success() { return Err(Error::HttpResponse { @@ -203,7 +217,7 @@ impl AsyncClient { /// This function will return an error either from the HTTP client. async fn get_response_text(&self, path: &str) -> Result { let url = format!("{}{}", self.url, path); - let response = self.client.get(url).send().await?; + let response = self.get_with_retry(&url).await?; if !response.status().is_success() { return Err(Error::HttpResponse { @@ -410,4 +424,26 @@ impl AsyncClient { pub fn client(&self) -> &Client { &self.client } + + /// Sends a GET request to the given `url`, retrying failed attempts + /// for retryable error codes until max retries hit. + async fn get_with_retry(&self, url: &str) -> Result { + let mut delay = BASE_BACKOFF_MILLIS; + let mut attempts = 0; + + loop { + match self.client.get(url).send().await? { + resp if attempts < self.max_retries && is_status_retryable(resp.status()) => { + task::sleep(delay).await; + attempts += 1; + delay *= 2; + } + resp => return Ok(resp), + } + } + } +} + +fn is_status_retryable(status: reqwest::StatusCode) -> bool { + RETRYABLE_ERROR_CODES.contains(&status.as_u16()) } diff --git a/src/blocking.rs b/src/blocking.rs index be1cf1e..e9f51b7 100644 --- a/src/blocking.rs +++ b/src/blocking.rs @@ -14,11 +14,12 @@ use std::collections::HashMap; use std::convert::TryFrom; use std::str::FromStr; +use std::thread; #[allow(unused_imports)] use log::{debug, error, info, trace}; -use minreq::{Proxy, Request}; +use minreq::{Proxy, Request, Response}; use bitcoin::consensus::{deserialize, serialize, Decodable}; use bitcoin::hashes::{sha256, Hash}; @@ -27,7 +28,10 @@ use bitcoin::{ block::Header as BlockHeader, Block, BlockHash, MerkleBlock, Script, Transaction, Txid, }; -use crate::{BlockStatus, BlockSummary, Builder, Error, MerkleProof, OutputStatus, Tx, TxStatus}; +use crate::{ + BlockStatus, BlockSummary, Builder, Error, MerkleProof, OutputStatus, Tx, TxStatus, + BASE_BACKOFF_MILLIS, RETRYABLE_ERROR_CODES, +}; #[derive(Debug, Clone)] pub struct BlockingClient { @@ -39,6 +43,8 @@ pub struct BlockingClient { pub timeout: Option, /// HTTP headers to set on every request made to Esplora server pub headers: HashMap, + /// Number of times to retry a request + pub max_retries: usize, } impl BlockingClient { @@ -49,6 +55,7 @@ impl BlockingClient { proxy: builder.proxy, timeout: builder.timeout, headers: builder.headers, + max_retries: builder.max_retries, } } @@ -80,7 +87,7 @@ impl BlockingClient { } fn get_opt_response(&self, path: &str) -> Result, Error> { - match self.get_request(path)?.send() { + match self.get_with_retry(path) { Ok(resp) if is_status_not_found(resp.status_code) => Ok(None), Ok(resp) if !is_status_ok(resp.status_code) => { let status = u16::try_from(resp.status_code).map_err(Error::StatusCode)?; @@ -88,12 +95,12 @@ impl BlockingClient { Err(Error::HttpResponse { status, message }) } Ok(resp) => Ok(Some(deserialize::(resp.as_bytes())?)), - Err(e) => Err(Error::Minreq(e)), + Err(e) => Err(e), } } fn get_opt_response_txid(&self, path: &str) -> Result, Error> { - match self.get_request(path)?.send() { + match self.get_with_retry(path) { Ok(resp) if is_status_not_found(resp.status_code) => Ok(None), Ok(resp) if !is_status_ok(resp.status_code) => { let status = u16::try_from(resp.status_code).map_err(Error::StatusCode)?; @@ -103,12 +110,12 @@ impl BlockingClient { Ok(resp) => Ok(Some( Txid::from_str(resp.as_str().map_err(Error::Minreq)?).map_err(Error::HexToArray)?, )), - Err(e) => Err(Error::Minreq(e)), + Err(e) => Err(e), } } fn get_opt_response_hex(&self, path: &str) -> Result, Error> { - match self.get_request(path)?.send() { + match self.get_with_retry(path) { Ok(resp) if is_status_not_found(resp.status_code) => Ok(None), Ok(resp) if !is_status_ok(resp.status_code) => { let status = u16::try_from(resp.status_code).map_err(Error::StatusCode)?; @@ -122,12 +129,12 @@ impl BlockingClient { .map_err(Error::BitcoinEncoding) .map(|r| Some(r)) } - Err(e) => Err(Error::Minreq(e)), + Err(e) => Err(e), } } fn get_response_hex(&self, path: &str) -> Result { - match self.get_request(path)?.send() { + match self.get_with_retry(path) { Ok(resp) if !is_status_ok(resp.status_code) => { let status = u16::try_from(resp.status_code).map_err(Error::StatusCode)?; let message = resp.as_str().unwrap_or_default().to_string(); @@ -138,7 +145,7 @@ impl BlockingClient { let hex_vec = Vec::from_hex(hex_str).unwrap(); deserialize::(&hex_vec).map_err(Error::BitcoinEncoding) } - Err(e) => Err(Error::Minreq(e)), + Err(e) => Err(e), } } @@ -146,7 +153,7 @@ impl BlockingClient { &'a self, path: &'a str, ) -> Result { - let response = self.get_request(path)?.send(); + let response = self.get_with_retry(path); match response { Ok(resp) if !is_status_ok(resp.status_code) => { let status = u16::try_from(resp.status_code).map_err(Error::StatusCode)?; @@ -154,7 +161,7 @@ impl BlockingClient { Err(Error::HttpResponse { status, message }) } Ok(resp) => Ok(resp.json::().map_err(Error::Minreq)?), - Err(e) => Err(Error::Minreq(e)), + Err(e) => Err(e), } } @@ -162,7 +169,7 @@ impl BlockingClient { &self, path: &str, ) -> Result, Error> { - match self.get_request(path)?.send() { + match self.get_with_retry(path) { Ok(resp) if is_status_not_found(resp.status_code) => Ok(None), Ok(resp) if !is_status_ok(resp.status_code) => { let status = u16::try_from(resp.status_code).map_err(Error::StatusCode)?; @@ -170,19 +177,19 @@ impl BlockingClient { Err(Error::HttpResponse { status, message }) } Ok(resp) => Ok(Some(resp.json::()?)), - Err(e) => Err(Error::Minreq(e)), + Err(e) => Err(e), } } fn get_response_str(&self, path: &str) -> Result { - match self.get_request(path)?.send() { + match self.get_with_retry(path) { Ok(resp) if !is_status_ok(resp.status_code) => { let status = u16::try_from(resp.status_code).map_err(Error::StatusCode)?; let message = resp.as_str().unwrap_or_default().to_string(); Err(Error::HttpResponse { status, message }) } Ok(resp) => Ok(resp.as_str()?.to_string()), - Err(e) => Err(Error::Minreq(e)), + Err(e) => Err(e), } } @@ -339,6 +346,24 @@ impl BlockingClient { }; self.get_response_json(&path) } + + /// Sends a GET request to the given `url`, retrying failed attempts + /// for retryable error codes until max retries hit. + pub fn get_with_retry(&self, url: &str) -> Result { + let mut delay = BASE_BACKOFF_MILLIS; + let mut attempts = 0; + + loop { + match self.get_request(url)?.send()? { + resp if attempts < self.max_retries && is_status_retryable(resp.status_code) => { + thread::sleep(delay); + attempts += 1; + delay *= 2; + } + resp => return Ok(resp), + } + } + } } fn is_status_ok(status: i32) -> bool { @@ -348,3 +373,8 @@ fn is_status_ok(status: i32) -> bool { fn is_status_not_found(status: i32) -> bool { status == 404 } + +fn is_status_retryable(status: i32) -> bool { + let status = status as u16; + RETRYABLE_ERROR_CODES.contains(&status) +} diff --git a/src/lib.rs b/src/lib.rs index 20722be..75b4730 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -69,6 +69,7 @@ use std::collections::HashMap; use std::fmt; use std::num::TryFromIntError; +use std::time::Duration; pub mod api; @@ -83,6 +84,19 @@ pub use blocking::BlockingClient; #[cfg(feature = "async")] pub use r#async::AsyncClient; +/// Response status codes for which the request may be retried. +const RETRYABLE_ERROR_CODES: [u16; 3] = [ + 429, // TOO_MANY_REQUESTS + 500, // INTERNAL_SERVER_ERROR + 503, // SERVICE_UNAVAILABLE +]; + +/// Base backoff in milliseconds. +const BASE_BACKOFF_MILLIS: Duration = Duration::from_millis(256); + +/// Default max retries. +const DEFAULT_MAX_RETRIES: usize = 6; + /// Get a fee value in sats/vbytes from the estimates /// that matches the confirmation target set as parameter. /// @@ -117,6 +131,8 @@ pub struct Builder { pub timeout: Option, /// HTTP headers to set on every request made to Esplora server. pub headers: HashMap, + /// Max retries + pub max_retries: usize, } impl Builder { @@ -127,6 +143,7 @@ impl Builder { proxy: None, timeout: None, headers: HashMap::new(), + max_retries: DEFAULT_MAX_RETRIES, } } @@ -148,6 +165,13 @@ impl Builder { self } + /// Set the maximum number of times to retry a request if the response status + /// is one of [`RETRYABLE_ERROR_CODES`]. + pub fn max_retries(mut self, count: usize) -> Self { + self.max_retries = count; + self + } + /// Build a blocking client from builder #[cfg(feature = "blocking")] pub fn build_blocking(self) -> BlockingClient {