Skip to content

Commit

Permalink
Merge #98: Implement retryable calls
Browse files Browse the repository at this point in the history
b96270e feat(async,blocking)!: implement retryable calls (valued mammal)

Pull request description:

  Based on #93 the PR implements retryable calls for request failure due to too many requests (429) or service unavailable (503).

  Inspired by #71
  h/t @e1a0a0ea

  ### Notes

  I've added the field `max_retries` to the `Builder`. `max_retries` is also added to each of `AsyncClient`, `BlockingClient`. I added the dependency `async-std` in order to have async `sleep`.

  Instead of implementing a trait on the `Request` type as in #71, the approach is to add a method on the client that sends a get request to a url and returns the response after allowing for retries. I tested on the bdk `wallet_esplora_*` example crates against https://blockstream.info/testnet/api and it seemed to resolve the 429 issue.

ACKs for top commit:
  oleonardolima:
    ACK b96270e
  notmandatory:
    tACK b96270e

Tree-SHA512: 78124106959ba9a5cce58e343bbf30c29b4bc7e1ac434ba71eeb2e774d73ea003d0520139781062947ed27563748925c24998b2a5be450bc511bb2c7090a6682
  • Loading branch information
notmandatory committed Sep 25, 2024
2 parents ce6a635 + b96270e commit 1542262
Show file tree
Hide file tree
Showing 4 changed files with 116 additions and 25 deletions.
3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ hex = { version = "0.2", package = "hex-conservative" }
log = "^0.4"
minreq = { version = "2.11.0", features = ["json-using-serde"], optional = true }
reqwest = { version = "0.11", features = ["json"], default-features = false, optional = true }
async-std = { version = "1.13.0", optional = true }

[dev-dependencies]
serde_json = "1.0"
Expand All @@ -37,7 +38,7 @@ blocking-https = ["blocking", "minreq/https"]
blocking-https-rustls = ["blocking", "minreq/https-rustls"]
blocking-https-native = ["blocking", "minreq/https-native"]
blocking-https-bundled = ["blocking", "minreq/https-bundled"]
async = ["reqwest", "reqwest/socks"]
async = ["async-std", "reqwest", "reqwest/socks"]
async-https = ["async", "reqwest/default-tls"]
async-https-native = ["async", "reqwest/native-tls"]
async-https-rustls = ["async", "reqwest/rustls-tls"]
Expand Down
52 changes: 44 additions & 8 deletions src/async.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

//! Esplora by way of `reqwest` HTTP client.
use async_std::task;
use std::collections::HashMap;
use std::str::FromStr;

Expand All @@ -24,16 +25,21 @@ use bitcoin::{
#[allow(unused_imports)]
use log::{debug, error, info, trace};

use reqwest::{header, Client};
use reqwest::{header, Client, Response};

use crate::{BlockStatus, BlockSummary, Builder, Error, MerkleProof, OutputStatus, Tx, TxStatus};
use crate::{
BlockStatus, BlockSummary, Builder, Error, MerkleProof, OutputStatus, Tx, TxStatus,
BASE_BACKOFF_MILLIS, RETRYABLE_ERROR_CODES,
};

#[derive(Debug, Clone)]
pub struct AsyncClient {
/// The URL of the Esplora Server.
url: String,
/// The inner [`reqwest::Client`] to make HTTP requests.
client: Client,
/// Number of times to retry a request
max_retries: usize,
}

impl AsyncClient {
Expand Down Expand Up @@ -63,12 +69,20 @@ impl AsyncClient {
client_builder = client_builder.default_headers(headers);
}

Ok(Self::from_client(builder.base_url, client_builder.build()?))
Ok(AsyncClient {
url: builder.base_url,
client: client_builder.build()?,
max_retries: builder.max_retries,
})
}

/// Build an async client from the base url and [`Client`]
pub fn from_client(url: String, client: Client) -> Self {
AsyncClient { url, client }
AsyncClient {
url,
client,
max_retries: crate::DEFAULT_MAX_RETRIES,
}
}

/// Make an HTTP GET request to given URL, deserializing to any `T` that
Expand All @@ -84,7 +98,7 @@ impl AsyncClient {
/// [`bitcoin::consensus::Decodable`] deserialization.
async fn get_response<T: Decodable>(&self, path: &str) -> Result<T, Error> {
let url = format!("{}{}", self.url, path);
let response = self.client.get(url).send().await?;
let response = self.get_with_retry(&url).await?;

if !response.status().is_success() {
return Err(Error::HttpResponse {
Expand Down Expand Up @@ -124,7 +138,7 @@ impl AsyncClient {
path: &str,
) -> Result<T, Error> {
let url = format!("{}{}", self.url, path);
let response = self.client.get(url).send().await?;
let response = self.get_with_retry(&url).await?;

if !response.status().is_success() {
return Err(Error::HttpResponse {
Expand Down Expand Up @@ -166,7 +180,7 @@ impl AsyncClient {
/// [`bitcoin::consensus::Decodable`] deserialization.
async fn get_response_hex<T: Decodable>(&self, path: &str) -> Result<T, Error> {
let url = format!("{}{}", self.url, path);
let response = self.client.get(url).send().await?;
let response = self.get_with_retry(&url).await?;

if !response.status().is_success() {
return Err(Error::HttpResponse {
Expand Down Expand Up @@ -203,7 +217,7 @@ impl AsyncClient {
/// This function will return an error either from the HTTP client.
async fn get_response_text(&self, path: &str) -> Result<String, Error> {
let url = format!("{}{}", self.url, path);
let response = self.client.get(url).send().await?;
let response = self.get_with_retry(&url).await?;

if !response.status().is_success() {
return Err(Error::HttpResponse {
Expand Down Expand Up @@ -410,4 +424,26 @@ impl AsyncClient {
pub fn client(&self) -> &Client {
&self.client
}

/// Sends a GET request to the given `url`, retrying failed attempts
/// for retryable error codes until max retries hit.
async fn get_with_retry(&self, url: &str) -> Result<Response, Error> {
let mut delay = BASE_BACKOFF_MILLIS;
let mut attempts = 0;

loop {
match self.client.get(url).send().await? {
resp if attempts < self.max_retries && is_status_retryable(resp.status()) => {
task::sleep(delay).await;
attempts += 1;
delay *= 2;
}
resp => return Ok(resp),
}
}
}
}

fn is_status_retryable(status: reqwest::StatusCode) -> bool {
RETRYABLE_ERROR_CODES.contains(&status.as_u16())
}
62 changes: 46 additions & 16 deletions src/blocking.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,12 @@
use std::collections::HashMap;
use std::convert::TryFrom;
use std::str::FromStr;
use std::thread;

#[allow(unused_imports)]
use log::{debug, error, info, trace};

use minreq::{Proxy, Request};
use minreq::{Proxy, Request, Response};

use bitcoin::consensus::{deserialize, serialize, Decodable};
use bitcoin::hashes::{sha256, Hash};
Expand All @@ -27,7 +28,10 @@ use bitcoin::{
block::Header as BlockHeader, Block, BlockHash, MerkleBlock, Script, Transaction, Txid,
};

use crate::{BlockStatus, BlockSummary, Builder, Error, MerkleProof, OutputStatus, Tx, TxStatus};
use crate::{
BlockStatus, BlockSummary, Builder, Error, MerkleProof, OutputStatus, Tx, TxStatus,
BASE_BACKOFF_MILLIS, RETRYABLE_ERROR_CODES,
};

#[derive(Debug, Clone)]
pub struct BlockingClient {
Expand All @@ -39,6 +43,8 @@ pub struct BlockingClient {
pub timeout: Option<u64>,
/// HTTP headers to set on every request made to Esplora server
pub headers: HashMap<String, String>,
/// Number of times to retry a request
pub max_retries: usize,
}

impl BlockingClient {
Expand All @@ -49,6 +55,7 @@ impl BlockingClient {
proxy: builder.proxy,
timeout: builder.timeout,
headers: builder.headers,
max_retries: builder.max_retries,
}
}

Expand Down Expand Up @@ -80,20 +87,20 @@ impl BlockingClient {
}

fn get_opt_response<T: Decodable>(&self, path: &str) -> Result<Option<T>, Error> {
match self.get_request(path)?.send() {
match self.get_with_retry(path) {
Ok(resp) if is_status_not_found(resp.status_code) => Ok(None),
Ok(resp) if !is_status_ok(resp.status_code) => {
let status = u16::try_from(resp.status_code).map_err(Error::StatusCode)?;
let message = resp.as_str().unwrap_or_default().to_string();
Err(Error::HttpResponse { status, message })
}
Ok(resp) => Ok(Some(deserialize::<T>(resp.as_bytes())?)),
Err(e) => Err(Error::Minreq(e)),
Err(e) => Err(e),
}
}

fn get_opt_response_txid(&self, path: &str) -> Result<Option<Txid>, Error> {
match self.get_request(path)?.send() {
match self.get_with_retry(path) {
Ok(resp) if is_status_not_found(resp.status_code) => Ok(None),
Ok(resp) if !is_status_ok(resp.status_code) => {
let status = u16::try_from(resp.status_code).map_err(Error::StatusCode)?;
Expand All @@ -103,12 +110,12 @@ impl BlockingClient {
Ok(resp) => Ok(Some(
Txid::from_str(resp.as_str().map_err(Error::Minreq)?).map_err(Error::HexToArray)?,
)),
Err(e) => Err(Error::Minreq(e)),
Err(e) => Err(e),
}
}

fn get_opt_response_hex<T: Decodable>(&self, path: &str) -> Result<Option<T>, Error> {
match self.get_request(path)?.send() {
match self.get_with_retry(path) {
Ok(resp) if is_status_not_found(resp.status_code) => Ok(None),
Ok(resp) if !is_status_ok(resp.status_code) => {
let status = u16::try_from(resp.status_code).map_err(Error::StatusCode)?;
Expand All @@ -122,12 +129,12 @@ impl BlockingClient {
.map_err(Error::BitcoinEncoding)
.map(|r| Some(r))
}
Err(e) => Err(Error::Minreq(e)),
Err(e) => Err(e),
}
}

fn get_response_hex<T: Decodable>(&self, path: &str) -> Result<T, Error> {
match self.get_request(path)?.send() {
match self.get_with_retry(path) {
Ok(resp) if !is_status_ok(resp.status_code) => {
let status = u16::try_from(resp.status_code).map_err(Error::StatusCode)?;
let message = resp.as_str().unwrap_or_default().to_string();
Expand All @@ -138,51 +145,51 @@ impl BlockingClient {
let hex_vec = Vec::from_hex(hex_str).unwrap();
deserialize::<T>(&hex_vec).map_err(Error::BitcoinEncoding)
}
Err(e) => Err(Error::Minreq(e)),
Err(e) => Err(e),
}
}

fn get_response_json<'a, T: serde::de::DeserializeOwned>(
&'a self,
path: &'a str,
) -> Result<T, Error> {
let response = self.get_request(path)?.send();
let response = self.get_with_retry(path);
match response {
Ok(resp) if !is_status_ok(resp.status_code) => {
let status = u16::try_from(resp.status_code).map_err(Error::StatusCode)?;
let message = resp.as_str().unwrap_or_default().to_string();
Err(Error::HttpResponse { status, message })
}
Ok(resp) => Ok(resp.json::<T>().map_err(Error::Minreq)?),
Err(e) => Err(Error::Minreq(e)),
Err(e) => Err(e),
}
}

fn get_opt_response_json<T: serde::de::DeserializeOwned>(
&self,
path: &str,
) -> Result<Option<T>, Error> {
match self.get_request(path)?.send() {
match self.get_with_retry(path) {
Ok(resp) if is_status_not_found(resp.status_code) => Ok(None),
Ok(resp) if !is_status_ok(resp.status_code) => {
let status = u16::try_from(resp.status_code).map_err(Error::StatusCode)?;
let message = resp.as_str().unwrap_or_default().to_string();
Err(Error::HttpResponse { status, message })
}
Ok(resp) => Ok(Some(resp.json::<T>()?)),
Err(e) => Err(Error::Minreq(e)),
Err(e) => Err(e),
}
}

fn get_response_str(&self, path: &str) -> Result<String, Error> {
match self.get_request(path)?.send() {
match self.get_with_retry(path) {
Ok(resp) if !is_status_ok(resp.status_code) => {
let status = u16::try_from(resp.status_code).map_err(Error::StatusCode)?;
let message = resp.as_str().unwrap_or_default().to_string();
Err(Error::HttpResponse { status, message })
}
Ok(resp) => Ok(resp.as_str()?.to_string()),
Err(e) => Err(Error::Minreq(e)),
Err(e) => Err(e),
}
}

Expand Down Expand Up @@ -339,6 +346,24 @@ impl BlockingClient {
};
self.get_response_json(&path)
}

/// Sends a GET request to the given `url`, retrying failed attempts
/// for retryable error codes until max retries hit.
pub fn get_with_retry(&self, url: &str) -> Result<Response, Error> {
let mut delay = BASE_BACKOFF_MILLIS;
let mut attempts = 0;

loop {
match self.get_request(url)?.send()? {
resp if attempts < self.max_retries && is_status_retryable(resp.status_code) => {
thread::sleep(delay);
attempts += 1;
delay *= 2;
}
resp => return Ok(resp),
}
}
}
}

fn is_status_ok(status: i32) -> bool {
Expand All @@ -348,3 +373,8 @@ fn is_status_ok(status: i32) -> bool {
fn is_status_not_found(status: i32) -> bool {
status == 404
}

fn is_status_retryable(status: i32) -> bool {
let status = status as u16;
RETRYABLE_ERROR_CODES.contains(&status)
}
24 changes: 24 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@
use std::collections::HashMap;
use std::fmt;
use std::num::TryFromIntError;
use std::time::Duration;

pub mod api;

Expand All @@ -83,6 +84,19 @@ pub use blocking::BlockingClient;
#[cfg(feature = "async")]
pub use r#async::AsyncClient;

/// Response status codes for which the request may be retried.
const RETRYABLE_ERROR_CODES: [u16; 3] = [
429, // TOO_MANY_REQUESTS
500, // INTERNAL_SERVER_ERROR
503, // SERVICE_UNAVAILABLE
];

/// Base backoff in milliseconds.
const BASE_BACKOFF_MILLIS: Duration = Duration::from_millis(256);

/// Default max retries.
const DEFAULT_MAX_RETRIES: usize = 6;

/// Get a fee value in sats/vbytes from the estimates
/// that matches the confirmation target set as parameter.
///
Expand Down Expand Up @@ -117,6 +131,8 @@ pub struct Builder {
pub timeout: Option<u64>,
/// HTTP headers to set on every request made to Esplora server.
pub headers: HashMap<String, String>,
/// Max retries
pub max_retries: usize,
}

impl Builder {
Expand All @@ -127,6 +143,7 @@ impl Builder {
proxy: None,
timeout: None,
headers: HashMap::new(),
max_retries: DEFAULT_MAX_RETRIES,
}
}

Expand All @@ -148,6 +165,13 @@ impl Builder {
self
}

/// Set the maximum number of times to retry a request if the response status
/// is one of [`RETRYABLE_ERROR_CODES`].
pub fn max_retries(mut self, count: usize) -> Self {
self.max_retries = count;
self
}

/// Build a blocking client from builder
#[cfg(feature = "blocking")]
pub fn build_blocking(self) -> BlockingClient {
Expand Down

0 comments on commit 1542262

Please sign in to comment.