diff --git a/Cargo.lock b/Cargo.lock index 3dd0cbb20640..92246398039c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3111,6 +3111,7 @@ dependencies = [ "futures", "glob", "home", + "http 1.1.0", "itoa", "memchr", "memmap2", diff --git a/Cargo.toml b/Cargo.toml index b996a7877bfe..c72e1eb572c8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -51,6 +51,7 @@ flate2 = { version = "1", default-features = false } futures = "0.3.25" hashbrown = { version = "0.14", features = ["rayon", "ahash", "serde"] } hex = "0.4.3" +http = { version = "1.1", default-features = false } indexmap = { version = "2", features = ["std"] } itoa = "1.0.6" itoap = { version = "1", features = ["simd"] } diff --git a/crates/polars-io/Cargo.toml b/crates/polars-io/Cargo.toml index 99602fc968e8..a5ad43e36534 100644 --- a/crates/polars-io/Cargo.toml +++ b/crates/polars-io/Cargo.toml @@ -28,6 +28,7 @@ fast-float = { workspace = true, optional = true } flate2 = { workspace = true, optional = true } futures = { workspace = true, optional = true } glob = { version = "0.3" } +http = { workspace = true, optional = true } itoa = { workspace = true, optional = true } memchr = { workspace = true } memmap = { package = "memmap2", version = "0.7" } @@ -116,7 +117,7 @@ file_cache = ["async", "dep:blake3", "dep:fs4"] aws = ["object_store/aws", "cloud", "reqwest"] azure = ["object_store/azure", "cloud"] gcp = ["object_store/gcp", "cloud"] -http = ["object_store/http", "cloud"] +http = ["object_store/http", "cloud", "dep:http"] partition = ["polars-core/partition_by"] temporal = ["dtype-datetime", "dtype-date", "dtype-time"] simd = [] diff --git a/crates/polars-io/src/cloud/object_store_setup.rs b/crates/polars-io/src/cloud/object_store_setup.rs index d70abb421005..3bf5634136d4 100644 --- a/crates/polars-io/src/cloud/object_store_setup.rs +++ b/crates/polars-io/src/cloud/object_store_setup.rs @@ -113,7 +113,7 @@ pub async fn build_object_store( { let store = object_store::http::HttpBuilder::new() .with_url(url) - .with_client_options(super::get_client_options()) + .with_client_options(super::get_client_options(&options)?) .build()?; Ok::<_, PolarsError>(Arc::new(store) as Arc) } diff --git a/crates/polars-io/src/cloud/options.rs b/crates/polars-io/src/cloud/options.rs index c175f09829a1..28a12a2740ab 100644 --- a/crates/polars-io/src/cloud/options.rs +++ b/crates/polars-io/src/cloud/options.rs @@ -4,6 +4,8 @@ use std::io::Read; use std::path::Path; use std::str::FromStr; +#[cfg(feature = "http")] +use http::header::{HeaderName, HeaderValue}; #[cfg(feature = "aws")] use object_store::aws::AmazonS3Builder; #[cfg(feature = "aws")] @@ -18,6 +20,8 @@ use object_store::gcp::GoogleCloudStorageBuilder; pub use object_store::gcp::GoogleConfigKey; #[cfg(any(feature = "aws", feature = "gcp", feature = "azure", feature = "http"))] use object_store::ClientOptions; +#[cfg(feature = "http")] +use object_store::Error as ObjectStoreError; #[cfg(any(feature = "aws", feature = "gcp", feature = "azure"))] use object_store::{BackoffConfig, RetryConfig}; #[cfg(feature = "aws")] @@ -45,6 +49,29 @@ use crate::utils::resolve_homedir; static BUCKET_REGION: Lazy>> = Lazy::new(|| std::sync::Mutex::new(FastFixedCache::new(32))); +#[allow(dead_code)] +#[derive(PartialEq, Eq, Hash, Clone, Debug, Copy)] +#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] +#[non_exhaustive] +pub enum HttpConfigKey { + Headers, +} + +#[cfg(feature = "http")] +impl FromStr for HttpConfigKey { + type Err = ObjectStoreError; + + fn from_str(s: &str) -> Result { + match s { + "headers" => Ok(HttpConfigKey::Headers), + _ => Err(ObjectStoreError::UnknownConfigurationKey { + key: s.into(), + store: "http", + }), + } + } +} + /// The type of the config keys must satisfy the following requirements: /// 1. must be easily collected into a HashMap, the type required by the object_crate API. /// 2. be Serializable, required when the serde-lazy feature is defined. @@ -67,6 +94,8 @@ pub struct CloudOptions { azure: Option>, #[cfg(feature = "gcp")] gcp: Option>, + #[cfg(feature = "http")] + http: Option>, } impl Default for CloudOptions { @@ -81,6 +110,8 @@ impl Default for CloudOptions { azure: Default::default(), #[cfg(feature = "gcp")] gcp: Default::default(), + #[cfg(feature = "http")] + http: Default::default(), } } } @@ -170,8 +201,8 @@ fn get_retry_config(max_retries: usize) -> RetryConfig { } #[cfg(any(feature = "aws", feature = "gcp", feature = "azure", feature = "http"))] -pub(super) fn get_client_options() -> ClientOptions { - ClientOptions::default() +pub(super) fn get_client_options(options: &CloudOptions) -> PolarsResult { + Ok(ClientOptions::default() // We set request timeout super high as the timeout isn't reset at ACK, // but starts from the moment we start downloading a body. // https://docs.rs/reqwest/latest/reqwest/struct.ClientBuilder.html#method.timeout @@ -179,6 +210,26 @@ pub(super) fn get_client_options() -> ClientOptions { // Concurrency can increase connection latency, so set to None, similar to default. .with_connect_timeout_disabled() .with_allow_http(true) + .with_default_headers({ + match &options.http { + Some(options) => options + .iter() + .map(|(opt, value)| { + match opt { + HttpConfigKey::Headers => { + // TODO: find a way to encode a map in the value so we can pass + // multiple headers + let name = "Foobar"; + let name: HeaderName = name.parse().map_err(to_compute_err)?; + let value: HeaderValue = value.parse().map_err(to_compute_err)?; + Ok((name, value)) + }, + } + }) + .collect::>()?, + _ => Default::default(), + } + })) } #[cfg(feature = "aws")] @@ -316,7 +367,7 @@ impl CloudOptions { }; builder - .with_client_options(get_client_options()) + .with_client_options(get_client_options(self)?) .with_retry(get_retry_config(self.max_retries)) .build() .map_err(to_compute_err) @@ -349,13 +400,28 @@ impl CloudOptions { } builder - .with_client_options(get_client_options()) + .with_client_options(get_client_options(self)?) .with_url(url) .with_retry(get_retry_config(self.max_retries)) .build() .map_err(to_compute_err) } + /// Set the configuration for HTTP connections. This is the preferred API from rust. + #[cfg(feature = "gcp")] + pub fn with_http)>>( + mut self, + configs: I, + ) -> Self { + self.http = Some( + configs + .into_iter() + .map(|(k, v)| (k, v.into())) + .collect::>(), + ); + self + } + /// Set the configuration for GCP connections. This is the preferred API from rust. #[cfg(feature = "gcp")] pub fn with_gcp)>>( @@ -383,7 +449,7 @@ impl CloudOptions { } builder - .with_client_options(get_client_options()) + .with_client_options(get_client_options(self)?) .with_url(url) .with_retry(get_retry_config(self.max_retries)) .build() @@ -420,7 +486,17 @@ impl CloudOptions { } }, CloudType::File => Ok(Self::default()), - CloudType::Http => Ok(Self::default()), + CloudType::Http => { + #[cfg(feature = "http")] + { + parsed_untyped_config::(config) + .map(|http| Self::default().with_http(http)) + } + #[cfg(not(feature = "http"))] + { + polars_bail!(ComputeError: "'http' feature is not enabled"); + } + }, CloudType::Gcp => { #[cfg(feature = "gcp")] {