Skip to content

Commit

Permalink
feat: Allow passing custom HTTP headers (#17197)
Browse files Browse the repository at this point in the history
  • Loading branch information
douglas-raillard-arm committed Jul 12, 2024
1 parent b40be85 commit d69c404
Show file tree
Hide file tree
Showing 5 changed files with 87 additions and 8 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ flate2 = { version = "1", default-features = false }
futures = "0.3.25"
hashbrown = { version = "0.14", features = ["rayon", "ahash", "serde"] }
hex = "0.4.3"
http = { version = "1.1", default-features = false }
indexmap = { version = "2", features = ["std"] }
itoa = "1.0.6"
itoap = { version = "1", features = ["simd"] }
Expand Down
3 changes: 2 additions & 1 deletion crates/polars-io/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ fast-float = { workspace = true, optional = true }
flate2 = { workspace = true, optional = true }
futures = { workspace = true, optional = true }
glob = { version = "0.3" }
http = { workspace = true, optional = true }
itoa = { workspace = true, optional = true }
memchr = { workspace = true }
memmap = { package = "memmap2", version = "0.7" }
Expand Down Expand Up @@ -116,7 +117,7 @@ file_cache = ["async", "dep:blake3", "dep:fs4"]
aws = ["object_store/aws", "cloud", "reqwest"]
azure = ["object_store/azure", "cloud"]
gcp = ["object_store/gcp", "cloud"]
http = ["object_store/http", "cloud"]
http = ["object_store/http", "cloud", "dep:http"]
partition = ["polars-core/partition_by"]
temporal = ["dtype-datetime", "dtype-date", "dtype-time"]
simd = []
Expand Down
2 changes: 1 addition & 1 deletion crates/polars-io/src/cloud/object_store_setup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ pub async fn build_object_store(
{
let store = object_store::http::HttpBuilder::new()
.with_url(url)
.with_client_options(super::get_client_options())
.with_client_options(super::get_client_options(&options)?)
.build()?;
Ok::<_, PolarsError>(Arc::new(store) as Arc<dyn ObjectStore>)
}
Expand Down
88 changes: 82 additions & 6 deletions crates/polars-io/src/cloud/options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ use std::io::Read;
use std::path::Path;
use std::str::FromStr;

#[cfg(feature = "http")]
use http::header::{HeaderName, HeaderValue};
#[cfg(feature = "aws")]
use object_store::aws::AmazonS3Builder;
#[cfg(feature = "aws")]
Expand All @@ -18,6 +20,8 @@ use object_store::gcp::GoogleCloudStorageBuilder;
pub use object_store::gcp::GoogleConfigKey;
#[cfg(any(feature = "aws", feature = "gcp", feature = "azure", feature = "http"))]
use object_store::ClientOptions;
#[cfg(feature = "http")]
use object_store::Error as ObjectStoreError;
#[cfg(any(feature = "aws", feature = "gcp", feature = "azure"))]
use object_store::{BackoffConfig, RetryConfig};
#[cfg(feature = "aws")]
Expand Down Expand Up @@ -45,6 +49,29 @@ use crate::utils::resolve_homedir;
static BUCKET_REGION: Lazy<std::sync::Mutex<FastFixedCache<SmartString, SmartString>>> =
Lazy::new(|| std::sync::Mutex::new(FastFixedCache::new(32)));

/// Configuration keys understood for plain HTTP(S) connections.
///
/// The enum is `#[non_exhaustive]`; `Headers` is the only key defined so far.
// NOTE(review): `dead_code` is presumably allowed because this enum is not gated behind the
// `http` feature while the code that uses it is — confirm.
#[allow(dead_code)]
#[derive(PartialEq, Eq, Hash, Clone, Debug, Copy)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
#[non_exhaustive]
pub enum HttpConfigKey {
/// Custom HTTP header configuration; parsed from the string `"headers"`.
/// `get_client_options` parses the associated value as an HTTP header value.
Headers,
}

#[cfg(feature = "http")]
impl FromStr for HttpConfigKey {
    type Err = ObjectStoreError;

    /// Parse a configuration key name into an [`HttpConfigKey`].
    ///
    /// Only the exact string `"headers"` is recognised. Any other input yields
    /// `ObjectStoreError::UnknownConfigurationKey` carrying the offending key and the
    /// store name `"http"`.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        // Guard clause for the single known key; everything else is an error.
        if s == "headers" {
            return Ok(Self::Headers);
        }

        Err(ObjectStoreError::UnknownConfigurationKey {
            key: s.into(),
            store: "http",
        })
    }
}

/// The type of the config keys must satisfy the following requirements:
/// 1. must be easily collected into a HashMap, the type required by the object_store API.
/// 2. be Serializable, required when the serde-lazy feature is defined.
Expand All @@ -67,6 +94,8 @@ pub struct CloudOptions {
azure: Option<Configs<AzureConfigKey>>,
#[cfg(feature = "gcp")]
gcp: Option<Configs<GoogleConfigKey>>,
#[cfg(feature = "http")]
http: Option<Configs<HttpConfigKey>>,
}

impl Default for CloudOptions {
Expand All @@ -81,6 +110,8 @@ impl Default for CloudOptions {
azure: Default::default(),
#[cfg(feature = "gcp")]
gcp: Default::default(),
#[cfg(feature = "http")]
http: Default::default(),
}
}
}
Expand Down Expand Up @@ -170,15 +201,35 @@ fn get_retry_config(max_retries: usize) -> RetryConfig {
}

/// Build the [`ClientOptions`] handed to the object store builders.
///
/// The request timeout and the connect timeout are disabled and plain `http://` URLs are
/// allowed. When the `http` feature is enabled, every entry of `options.http` is additionally
/// turned into a default header on the returned options.
///
/// # Errors
///
/// Returns a compute error when a header name or header value cannot be parsed.
#[cfg(any(feature = "aws", feature = "gcp", feature = "azure", feature = "http"))]
pub(super) fn get_client_options(options: &CloudOptions) -> PolarsResult<ClientOptions> {
    let client_options = ClientOptions::default()
        // We set request timeout super high as the timeout isn't reset at ACK,
        // but starts from the moment we start downloading a body.
        // https://docs.rs/reqwest/latest/reqwest/struct.ClientBuilder.html#method.timeout
        .with_timeout_disabled()
        // Concurrency can increase connection latency, so set to None, similar to default.
        .with_connect_timeout_disabled()
        .with_allow_http(true);

    // `CloudOptions::http`, `HeaderName` and `HeaderValue` only exist when the `http` feature
    // is enabled, while this function is also compiled for aws/gcp/azure-only builds. The
    // header handling therefore has to be gated on `http` itself.
    #[cfg(feature = "http")]
    let client_options = client_options.with_default_headers({
        match &options.http {
            Some(configs) => configs
                .iter()
                .map(|(key, value)| {
                    match key {
                        HttpConfigKey::Headers => {
                            // TODO: find a way to encode a map in the value so we can pass
                            // multiple headers
                            // NOTE(review): the header name is a hard-coded placeholder, so
                            // every configured value is sent under the name "Foobar".
                            let name = "Foobar";
                            let name: HeaderName = name.parse().map_err(to_compute_err)?;
                            let value: HeaderValue = value.parse().map_err(to_compute_err)?;
                            Ok((name, value))
                        },
                    }
                })
                .collect::<PolarsResult<_>>()?,
            None => Default::default(),
        }
    });
    // Without the `http` feature there is nothing to read from `options`; mark it as used so
    // the build stays warning-free.
    #[cfg(not(feature = "http"))]
    let _ = options;

    Ok(client_options)
}

#[cfg(feature = "aws")]
Expand Down Expand Up @@ -316,7 +367,7 @@ impl CloudOptions {
};

builder
.with_client_options(get_client_options())
.with_client_options(get_client_options(self)?)
.with_retry(get_retry_config(self.max_retries))
.build()
.map_err(to_compute_err)
Expand Down Expand Up @@ -349,13 +400,28 @@ impl CloudOptions {
}

builder
.with_client_options(get_client_options())
.with_client_options(get_client_options(self)?)
.with_url(url)
.with_retry(get_retry_config(self.max_retries))
.build()
.map_err(to_compute_err)
}

/// Set the configuration for HTTP connections. This is the preferred API from rust.
///
/// Each `(key, value)` pair from `configs` is stored with its value converted into a
/// `String`. Any HTTP configuration previously set on `self` is replaced.
///
/// Returns the updated options so calls can be chained.
// Gated on `http` (not `gcp`): the `http` field written here only exists under that feature.
#[cfg(feature = "http")]
pub fn with_http<I: IntoIterator<Item = (HttpConfigKey, impl Into<String>)>>(
    mut self,
    configs: I,
) -> Self {
    self.http = Some(
        configs
            .into_iter()
            .map(|(k, v)| (k, v.into()))
            .collect::<Configs<HttpConfigKey>>(),
    );
    self
}

/// Set the configuration for GCP connections. This is the preferred API from rust.
#[cfg(feature = "gcp")]
pub fn with_gcp<I: IntoIterator<Item = (GoogleConfigKey, impl Into<String>)>>(
Expand Down Expand Up @@ -383,7 +449,7 @@ impl CloudOptions {
}

builder
.with_client_options(get_client_options())
.with_client_options(get_client_options(self)?)
.with_url(url)
.with_retry(get_retry_config(self.max_retries))
.build()
Expand Down Expand Up @@ -420,7 +486,17 @@ impl CloudOptions {
}
},
CloudType::File => Ok(Self::default()),
CloudType::Http => Ok(Self::default()),
CloudType::Http => {
#[cfg(feature = "http")]
{
parsed_untyped_config::<HttpConfigKey, _>(config)
.map(|http| Self::default().with_http(http))
}
#[cfg(not(feature = "http"))]
{
polars_bail!(ComputeError: "'http' feature is not enabled");
}
},
CloudType::Gcp => {
#[cfg(feature = "gcp")]
{
Expand Down

0 comments on commit d69c404

Please sign in to comment.