Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow passing custom HTTP headers #17219

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ flate2 = { version = "1", default-features = false }
futures = "0.3.25"
hashbrown = { version = "0.14", features = ["rayon", "ahash", "serde"] }
hex = "0.4.3"
http = { version = "1.1", default-features = false }
indexmap = { version = "2", features = ["std"] }
itoa = "1.0.6"
itoap = { version = "1", features = ["simd"] }
Expand Down
3 changes: 2 additions & 1 deletion crates/polars-io/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ fast-float = { workspace = true, optional = true }
flate2 = { workspace = true, optional = true }
futures = { workspace = true, optional = true }
glob = { version = "0.3" }
http = { workspace = true, optional = true }
itoa = { workspace = true, optional = true }
memchr = { workspace = true }
memmap = { package = "memmap2", version = "0.7" }
Expand Down Expand Up @@ -116,7 +117,7 @@ file_cache = ["async", "dep:blake3", "dep:fs4"]
aws = ["object_store/aws", "cloud", "reqwest"]
azure = ["object_store/azure", "cloud"]
gcp = ["object_store/gcp", "cloud"]
http = ["object_store/http", "cloud"]
http = ["object_store/http", "cloud", "dep:http"]
partition = ["polars-core/partition_by"]
temporal = ["dtype-datetime", "dtype-date", "dtype-time"]
simd = []
Expand Down
2 changes: 1 addition & 1 deletion crates/polars-io/src/cloud/object_store_setup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ pub async fn build_object_store(
{
let store = object_store::http::HttpBuilder::new()
.with_url(url)
.with_client_options(super::get_client_options())
.with_client_options(super::get_client_options(&options)?)
.build()?;
Ok::<_, PolarsError>(Arc::new(store) as Arc<dyn ObjectStore>)
}
Expand Down
88 changes: 82 additions & 6 deletions crates/polars-io/src/cloud/options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ use std::io::Read;
use std::path::Path;
use std::str::FromStr;

#[cfg(feature = "http")]
use http::header::{HeaderName, HeaderValue};
#[cfg(feature = "aws")]
use object_store::aws::AmazonS3Builder;
#[cfg(feature = "aws")]
Expand All @@ -18,6 +20,8 @@ use object_store::gcp::GoogleCloudStorageBuilder;
pub use object_store::gcp::GoogleConfigKey;
#[cfg(any(feature = "aws", feature = "gcp", feature = "azure", feature = "http"))]
use object_store::ClientOptions;
#[cfg(feature = "http")]
use object_store::Error as ObjectStoreError;
#[cfg(any(feature = "aws", feature = "gcp", feature = "azure"))]
use object_store::{BackoffConfig, RetryConfig};
#[cfg(feature = "aws")]
Expand Down Expand Up @@ -45,6 +49,29 @@ use crate::utils::resolve_homedir;
static BUCKET_REGION: Lazy<std::sync::Mutex<FastFixedCache<SmartString, SmartString>>> =
Lazy::new(|| std::sync::Mutex::new(FastFixedCache::new(32)));

#[allow(dead_code)]
#[derive(PartialEq, Eq, Hash, Clone, Debug, Copy)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
#[non_exhaustive]
pub enum HttpConfigKey {
Headers,
}

#[cfg(feature = "http")]
impl FromStr for HttpConfigKey {
type Err = ObjectStoreError;

fn from_str(s: &str) -> Result<Self, Self::Err> {
match s {
"headers" => Ok(HttpConfigKey::Headers),
_ => Err(ObjectStoreError::UnknownConfigurationKey {
key: s.into(),
store: "http",
}),
}
}
}

/// The type of the config keys must satisfy the following requirements:
/// 1. must be easily collected into a HashMap, the type required by the object_crate API.
/// 2. be Serializable, required when the serde-lazy feature is defined.
Expand All @@ -67,6 +94,8 @@ pub struct CloudOptions {
azure: Option<Configs<AzureConfigKey>>,
#[cfg(feature = "gcp")]
gcp: Option<Configs<GoogleConfigKey>>,
#[cfg(feature = "http")]
http: Option<Configs<HttpConfigKey>>,
}

impl Default for CloudOptions {
Expand All @@ -81,6 +110,8 @@ impl Default for CloudOptions {
azure: Default::default(),
#[cfg(feature = "gcp")]
gcp: Default::default(),
#[cfg(feature = "http")]
http: Default::default(),
}
}
}
Expand Down Expand Up @@ -170,15 +201,35 @@ fn get_retry_config(max_retries: usize) -> RetryConfig {
}

#[cfg(any(feature = "aws", feature = "gcp", feature = "azure", feature = "http"))]
pub(super) fn get_client_options() -> ClientOptions {
ClientOptions::default()
pub(super) fn get_client_options(options: &CloudOptions) -> PolarsResult<ClientOptions> {
Ok(ClientOptions::default()
// We set request timeout super high as the timeout isn't reset at ACK,
// but starts from the moment we start downloading a body.
// https://docs.rs/reqwest/latest/reqwest/struct.ClientBuilder.html#method.timeout
.with_timeout_disabled()
// Concurrency can increase connection latency, so set to None, similar to default.
.with_connect_timeout_disabled()
.with_allow_http(true)
.with_default_headers({
match &options.http {
Some(options) => options
.iter()
.map(|(opt, value)| {
match opt {
HttpConfigKey::Headers => {
// TODO: find a way to encode a map in the value so we can pass
// multiple headers
let name = "Foobar";
let name: HeaderName = name.parse().map_err(to_compute_err)?;
let value: HeaderValue = value.parse().map_err(to_compute_err)?;
Ok((name, value))
},
}
})
.collect::<PolarsResult<_>>()?,
_ => Default::default(),
}
}))
}

#[cfg(feature = "aws")]
Expand Down Expand Up @@ -316,7 +367,7 @@ impl CloudOptions {
};

builder
.with_client_options(get_client_options())
.with_client_options(get_client_options(self)?)
.with_retry(get_retry_config(self.max_retries))
.build()
.map_err(to_compute_err)
Expand Down Expand Up @@ -349,13 +400,28 @@ impl CloudOptions {
}

builder
.with_client_options(get_client_options())
.with_client_options(get_client_options(self)?)
.with_url(url)
.with_retry(get_retry_config(self.max_retries))
.build()
.map_err(to_compute_err)
}

/// Set the configuration for HTTP connections. This is the preferred API from rust.
#[cfg(feature = "gcp")]
pub fn with_http<I: IntoIterator<Item = (HttpConfigKey, impl Into<String>)>>(
mut self,
configs: I,
) -> Self {
self.http = Some(
configs
.into_iter()
.map(|(k, v)| (k, v.into()))
.collect::<Configs<HttpConfigKey>>(),
);
self
}

/// Set the configuration for GCP connections. This is the preferred API from rust.
#[cfg(feature = "gcp")]
pub fn with_gcp<I: IntoIterator<Item = (GoogleConfigKey, impl Into<String>)>>(
Expand Down Expand Up @@ -383,7 +449,7 @@ impl CloudOptions {
}

builder
.with_client_options(get_client_options())
.with_client_options(get_client_options(self)?)
.with_url(url)
.with_retry(get_retry_config(self.max_retries))
.build()
Expand Down Expand Up @@ -420,7 +486,17 @@ impl CloudOptions {
}
},
CloudType::File => Ok(Self::default()),
CloudType::Http => Ok(Self::default()),
CloudType::Http => {
#[cfg(feature = "http")]
{
parsed_untyped_config::<HttpConfigKey, _>(config)
.map(|http| Self::default().with_http(http))
}
#[cfg(not(feature = "http"))]
{
polars_bail!(ComputeError: "'http' feature is not enabled");
}
},
CloudType::Gcp => {
#[cfg(feature = "gcp")]
{
Expand Down
Loading