Skip to content

Commit

Permalink
feat: add Extensions to object store PutOptions
Browse files Browse the repository at this point in the history
Follow-up to apache#7170.
  • Loading branch information
crepererum committed Feb 27, 2025
1 parent 8df892b commit 14dfccf
Show file tree
Hide file tree
Showing 5 changed files with 77 additions and 10 deletions.
5 changes: 5 additions & 0 deletions object_store/src/aws/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -389,6 +389,11 @@ impl Request<'_> {
Self { builder, ..self }
}

pub(crate) fn with_extensions(self, extensions: ::http::Extensions) -> Self {
let builder = self.builder.extensions(extensions);
Self { builder, ..self }
}

pub(crate) fn with_payload(mut self, payload: PutPayload) -> Self {
if (!self.config.skip_signature && self.config.sign_payload)
|| self.config.checksum.is_some()
Expand Down
14 changes: 11 additions & 3 deletions object_store/src/aws/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,15 +159,23 @@ impl ObjectStore for AmazonS3 {
payload: PutPayload,
opts: PutOptions,
) -> Result<PutResult> {
let PutOptions {
mode,
tags,
attributes,
extensions,
} = opts;

let request = self
.client
.request(Method::PUT, location)
.with_payload(payload)
.with_attributes(opts.attributes)
.with_tags(opts.tags)
.with_attributes(attributes)
.with_tags(tags)
.with_extensions(extensions)
.with_encryption_headers();

match (opts.mode, &self.client.config.conditional_put) {
match (mode, &self.client.config.conditional_put) {
(PutMode::Overwrite, _) => request.idempotent(true).do_put().await,
(PutMode::Create | PutMode::Update(_), None) => Err(Error::NotImplemented),
(PutMode::Create, Some(S3ConditionalPut::ETagMatch)) => {
Expand Down
19 changes: 16 additions & 3 deletions object_store/src/azure/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,11 @@ impl PutRequest<'_> {
Self { builder, ..self }
}

fn with_extensions(self, extensions: ::http::Extensions) -> Self {
let builder = self.builder.extensions(extensions);
Self { builder, ..self }
}

async fn send(self) -> Result<HttpResponse> {
let credential = self.config.get_credential().await?;
let sensitive = credential
Expand Down Expand Up @@ -540,12 +545,20 @@ impl AzureClient {
payload: PutPayload,
opts: PutOptions,
) -> Result<PutResult> {
let PutOptions {
mode,
tags,
attributes,
extensions,
} = opts;

let builder = self
.put_request(path, payload)
.with_attributes(opts.attributes)
.with_tags(opts.tags);
.with_attributes(attributes)
.with_extensions(extensions)
.with_tags(tags);

let builder = match &opts.mode {
let builder = match &mode {
PutMode::Overwrite => builder.idempotent(true),
PutMode::Create => builder.header(&IF_NONE_MATCH, "*"),
PutMode::Update(v) => {
Expand Down
20 changes: 17 additions & 3 deletions object_store/src/gcp/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,11 @@ impl Request<'_> {
}
}

fn with_extensions(self, extensions: ::http::Extensions) -> Self {
let builder = self.builder.extensions(extensions);
Self { builder, ..self }
}

async fn send(self) -> Result<HttpResponse> {
let credential = self.config.credentials.get_credential().await?;
let resp = self
Expand Down Expand Up @@ -384,12 +389,21 @@ impl GoogleCloudStorageClient {
payload: PutPayload,
opts: PutOptions,
) -> Result<PutResult> {
let PutOptions {
mode,
// not supported by GCP
tags: _,
attributes,
extensions,
} = opts;

let builder = self
.request(Method::PUT, path)
.with_payload(payload)
.with_attributes(opts.attributes);
.with_attributes(attributes)
.with_extensions(extensions);

let builder = match &opts.mode {
let builder = match &mode {
PutMode::Overwrite => builder.idempotent(true),
PutMode::Create => builder.header(&VERSION_MATCH, "0"),
PutMode::Update(v) => {
Expand All @@ -398,7 +412,7 @@ impl GoogleCloudStorageClient {
}
};

match (opts.mode, builder.do_put().await) {
match (mode, builder.do_put().await) {
(PutMode::Create, Err(crate::Error::Precondition { path, source })) => {
Err(crate::Error::AlreadyExists { path, source })
}
Expand Down
29 changes: 28 additions & 1 deletion object_store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1154,7 +1154,7 @@ impl From<PutResult> for UpdateVersion {
}

/// Options for a put request
#[derive(Debug, Clone, PartialEq, Eq, Default)]
#[derive(Debug, Clone, Default)]
pub struct PutOptions {
/// Configure the [`PutMode`] for this operation
pub mode: PutMode,
Expand All @@ -1166,8 +1166,35 @@ pub struct PutOptions {
///
/// Implementations that don't support an attribute should return an error
pub attributes: Attributes,
/// Implementation-specific extensions. Intended for use by [`ObjectStore`] implementations
/// that need to pass context-specific information (like tracing spans) via trait methods.
///
/// These extensions are ignored entirely by backends offered through this crate.
///
/// They are also eclused from [`PartialEq`] and [`Eq`].
pub extensions: ::http::Extensions,
}

impl PartialEq<Self> for PutOptions {
fn eq(&self, other: &Self) -> bool {
let Self {
mode,
tags,
attributes,
extensions: _,
} = self;
let Self {
mode: other_mode,
tags: other_tags,
attributes: other_attributes,
extensions: _,
} = other;
(mode == other_mode) && (tags == other_tags) && (attributes == other_attributes)
}
}

impl Eq for PutOptions {}

impl From<PutMode> for PutOptions {
fn from(mode: PutMode) -> Self {
Self {
Expand Down

0 comments on commit 14dfccf

Please sign in to comment.