Skip to content

Commit

Permalink
Allow empty PutObject requests (#295)
Browse files Browse the repository at this point in the history
The new streaming PUT implementation in #282 broke empty PutObject requests (i.e. with a 0-byte body). This is because the CRT does not currently support 0-byte meta-requests without Content-Length. Here is the returned error:

```
0 byte meta requests without Content-Length header are currently not supported. Set Content-Length header to 0 to upload empty object 
```

While this limitation is likely to be lifted in the CRT in the future, this change addresses it in `mountpoint-s3-client` by delaying the request until the first write is requested or complete is called, so that the Content-Length=0 header can be set in the latter case.

A minor complication of this change is that `S3PutObjectRequest` now needs to hold on to the client to issue the request at a later time. To allow for it, `S3CrtClient` has been refactored to hold a pointer to a `S3CrtClientInner` which can be passed to the request.

---------

Signed-off-by: Alessandro Passaro <[email protected]>
  • Loading branch information
passaro authored Jun 19, 2023
1 parent be84645 commit 5d6fa05
Show file tree
Hide file tree
Showing 12 changed files with 230 additions and 121 deletions.
4 changes: 2 additions & 2 deletions mountpoint-s3-client/examples/client_benchmark.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,12 +68,12 @@ fn main() {
part_size,
..Default::default()
};
let client = Arc::new(S3CrtClient::new(region, config).expect("couldn't create client"));
let client = S3CrtClient::new(region, config).expect("couldn't create client");

for i in 0..iterations.unwrap_or(1) {
let received_size = Arc::new(AtomicU64::new(0));
let start = Instant::now();
let client = Arc::clone(&client);
let client = client.clone();
let received_size_clone = Arc::clone(&received_size);
futures::executor::block_on(async move {
let mut request = client
Expand Down
2 changes: 1 addition & 1 deletion mountpoint-s3-client/src/object_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ pub enum GetObjectAttributesError {

/// Parameters to a [ObjectClient::put_object] request
/// TODO: Populate this struct with parameters from the S3 API, e.g., storage class, encryption.
#[derive(Debug, Default)]
#[derive(Debug, Default, Clone)]
#[non_exhaustive]
pub struct PutObjectParams {}

Expand Down
72 changes: 26 additions & 46 deletions mountpoint-s3-client/src/s3_crt_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use mountpoint_s3_crt::auth::credentials::{
use mountpoint_s3_crt::common::allocator::Allocator;
use mountpoint_s3_crt::common::uri::Uri;
use mountpoint_s3_crt::http::request_response::{Header, Headers, Message};
use mountpoint_s3_crt::io::async_stream::{AsyncInputStream, AsyncStreamWriter};
use mountpoint_s3_crt::io::async_stream::AsyncInputStream;
use mountpoint_s3_crt::io::channel_bootstrap::{ClientBootstrap, ClientBootstrapOptions};
use mountpoint_s3_crt::io::event_loop::EventLoopGroup;
use mountpoint_s3_crt::io::host_resolver::{HostResolver, HostResolverDefaultOptions};
Expand All @@ -34,7 +34,8 @@ use tracing::{error, trace, Span};
use crate::build_info;
use crate::endpoint::{AddressingStyle, Endpoint, EndpointError};
use crate::object_client::*;
use crate::s3_crt_client::get_object::GetObjectRequest;
use crate::s3_crt_client::get_object::S3GetObjectRequest;
use crate::s3_crt_client::put_object::S3PutObjectRequest;

macro_rules! request_span {
($self:expr, $method:expr) => {{
Expand Down Expand Up @@ -89,8 +90,25 @@ pub enum S3ClientAuthConfig {
Provider(CredentialsProvider),
}

#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct S3CrtClient {
inner: Arc<S3CrtClientInner>,
}

impl S3CrtClient {
pub fn new(region: &str, config: S3ClientConfig) -> Result<Self, NewClientError> {
Ok(Self {
inner: Arc::new(S3CrtClientInner::new(region, config)?),
})
}

pub fn event_loop_group(&self) -> EventLoopGroup {
self.inner.event_loop_group.clone()
}
}

#[derive(Debug)]
struct S3CrtClientInner {
s3_client: Client,
event_loop_group: EventLoopGroup,
endpoint: Endpoint,
Expand All @@ -103,8 +121,8 @@ pub struct S3CrtClient {
part_size: Option<usize>,
}

impl S3CrtClient {
pub fn new(region: &str, config: S3ClientConfig) -> Result<Self, NewClientError> {
impl S3CrtClientInner {
fn new(region: &str, config: S3ClientConfig) -> Result<Self, NewClientError> {
let allocator = Allocator::default();

let mut event_loop_group = EventLoopGroup::new_default(&allocator, None, || {}).unwrap();
Expand Down Expand Up @@ -200,10 +218,6 @@ impl S3CrtClient {
})
}

pub fn event_loop_group(&self) -> EventLoopGroup {
self.event_loop_group.clone()
}

/// Create a new HTTP request template for the given HTTP method and S3 bucket name.
/// Pre-populates common headers used across all requests. Sets the "accept" header assuming the
/// response should be XML; this header should be overwritten for requests like GET that return
Expand Down Expand Up @@ -632,45 +646,9 @@ fn extract_range_header(headers: &Headers) -> Option<Range<u64>> {
Some(start..end + 1)
}

#[derive(Debug)]
pub struct S3PutObjectRequest {
body: S3HttpRequest<Vec<u8>, PutObjectError>,
writer: AsyncStreamWriter,
}

impl S3PutObjectRequest {
fn new(body: S3HttpRequest<Vec<u8>, PutObjectError>, writer: AsyncStreamWriter) -> Self {
Self { body, writer }
}
}

#[async_trait]
impl PutObjectRequest for S3PutObjectRequest {
type ClientError = S3RequestError;

async fn write(&mut self, slice: &[u8]) -> ObjectClientResult<(), PutObjectError, Self::ClientError> {
self.writer
.write(slice)
.await
.map_err(|e| S3RequestError::InternalError(Box::new(e)).into())
}

async fn complete(mut self) -> ObjectClientResult<PutObjectResult, PutObjectError, Self::ClientError> {
let body = {
self.writer
.complete()
.await
.map_err(|e| S3RequestError::InternalError(Box::new(e)))?;
self.body
};
body.await?;
Ok(PutObjectResult {})
}
}

#[async_trait]
impl ObjectClient for S3CrtClient {
type GetObjectResult = GetObjectRequest;
type GetObjectResult = S3GetObjectRequest;
type PutObjectRequest = S3PutObjectRequest;
type ClientError = S3RequestError;

Expand Down Expand Up @@ -755,6 +733,7 @@ mod tests {
let client = S3CrtClient::new("eu-west-1", config).expect("Create test client");

let mut message = client
.inner
.new_request_template("GET", "plutotestankit")
.expect("new request template expected");

Expand All @@ -780,6 +759,7 @@ mod tests {
let client = S3CrtClient::new("eu-west-1", config).expect("Create test client");

let mut message = client
.inner
.new_request_template("GET", "plutotestankit")
.expect("new request template expected");

Expand Down
16 changes: 9 additions & 7 deletions mountpoint-s3-client/src/s3_crt_client/delete_object.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,24 +14,26 @@ impl S3CrtClient {
bucket: &str,
key: &str,
) -> ObjectClientResult<DeleteObjectResult, DeleteObjectError, S3RequestError> {
let span = request_span!(self, "delete_object");
let span = request_span!(self.inner, "delete_object");
span.in_scope(|| debug!(?bucket, ?key, "new request"));

// Scope the endpoint, message, etc. since otherwise rustc thinks we use Message across the await.
let request = {
let mut message = self
.inner
.new_request_template("DELETE", bucket)
.map_err(S3RequestError::construction_failure)?;
message
.set_request_path(format!("/{key}"))
.map_err(S3RequestError::construction_failure)?;

self.make_simple_http_request(message, MetaRequestType::Default, span, |result| {
let parsed = parse_delete_object_error(&result);
parsed
.map(ObjectClientError::ServiceError)
.unwrap_or(ObjectClientError::ClientError(S3RequestError::ResponseError(result)))
})?
self.inner
.make_simple_http_request(message, MetaRequestType::Default, span, |result| {
let parsed = parse_delete_object_error(&result);
parsed
.map(ObjectClientError::ServiceError)
.unwrap_or(ObjectClientError::ClientError(S3RequestError::ResponseError(result)))
})?
};

let _body = request.await?;
Expand Down
15 changes: 8 additions & 7 deletions mountpoint-s3-client/src/s3_crt_client/get_object.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,14 @@ impl S3CrtClient {
key: &str,
range: Option<Range<u64>>,
if_match: Option<ETag>,
) -> Result<GetObjectRequest, ObjectClientError<GetObjectError, S3RequestError>> {
let span = request_span!(self, "get_object");
) -> Result<S3GetObjectRequest, ObjectClientError<GetObjectError, S3RequestError>> {
let span = request_span!(self.inner, "get_object");
span.in_scope(
|| debug!(?bucket, ?key, ?range, ?if_match, size=?range.as_ref().map(|range| range.end - range.start), "new request"),
);

let mut message = self
.inner
.new_request_template("GET", bucket)
.map_err(S3RequestError::construction_failure)?;

Expand Down Expand Up @@ -62,7 +63,7 @@ impl S3CrtClient {
.map_err(S3RequestError::construction_failure)?;

let length = range.end.saturating_sub(range.start);
let part_size = self.part_size.unwrap_or(0) as u64;
let part_size = self.inner.part_size.unwrap_or(0) as u64;
if length >= part_size {
(MetaRequestType::GetObject, 0)
} else {
Expand All @@ -79,7 +80,7 @@ impl S3CrtClient {

let (sender, receiver) = futures::channel::mpsc::unbounded();

let request = self.make_meta_request(
let request = self.inner.make_meta_request(
message,
request_type,
span,
Expand All @@ -99,7 +100,7 @@ impl S3CrtClient {
},
)?;

Ok(GetObjectRequest {
Ok(S3GetObjectRequest {
request,
finish_receiver: receiver,
finished: false,
Expand All @@ -109,15 +110,15 @@ impl S3CrtClient {

#[derive(Debug)]
#[pin_project]
pub struct GetObjectRequest {
pub struct S3GetObjectRequest {
#[pin]
request: S3HttpRequest<(), GetObjectError>,
#[pin]
finish_receiver: UnboundedReceiver<Result<GetBodyPart, Error>>,
finished: bool,
}

impl Stream for GetObjectRequest {
impl Stream for S3GetObjectRequest {
type Item = ObjectClientResult<GetBodyPart, GetObjectError, S3RequestError>;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
Expand Down
16 changes: 9 additions & 7 deletions mountpoint-s3-client/src/s3_crt_client/get_object_attributes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ impl S3CrtClient {
) -> ObjectClientResult<GetObjectAttributesResult, GetObjectAttributesError, S3RequestError> {
let body = {
let mut message = self
.inner
.new_request_template("GET", bucket)
.map_err(S3RequestError::construction_failure)?;

Expand Down Expand Up @@ -139,7 +140,7 @@ impl S3CrtClient {
.add_header(&Header::new("x-amz-object-attributes", object_attributes.join(",")))
.map_err(S3RequestError::construction_failure)?;

let span = request_span!(self, "get_object_attributes");
let span = request_span!(self.inner, "get_object_attributes");
span.in_scope(|| {
debug!(
?bucket,
Expand All @@ -151,12 +152,13 @@ impl S3CrtClient {
)
});

self.make_simple_http_request(message, MetaRequestType::Default, span, |result| {
let parsed = parse_get_object_attributes_error(&result);
parsed
.map(ObjectClientError::ServiceError)
.unwrap_or(ObjectClientError::ClientError(S3RequestError::ResponseError(result)))
})?
self.inner
.make_simple_http_request(message, MetaRequestType::Default, span, |result| {
let parsed = parse_get_object_attributes_error(&result);
parsed
.map(ObjectClientError::ServiceError)
.unwrap_or(ObjectClientError::ClientError(S3RequestError::ResponseError(result)))
})?
};

let body = body.await?;
Expand Down
34 changes: 18 additions & 16 deletions mountpoint-s3-client/src/s3_crt_client/head_bucket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,29 +21,31 @@ impl S3CrtClient {
pub async fn head_bucket(&self, bucket: &str) -> ObjectClientResult<(), HeadBucketError, S3RequestError> {
let body = {
let mut message = self
.inner
.new_request_template("HEAD", bucket)
.map_err(S3RequestError::construction_failure)?;

message
.set_request_path("/")
.map_err(S3RequestError::construction_failure)?;

let span = request_span!(self, "head_bucket");
span.in_scope(|| debug!(?bucket, endpoint = ?self.endpoint, "new request"));

self.make_simple_http_request(message, MetaRequestType::Default, span, |request_result| {
match request_result.response_status {
301 => try_parse_redirect(&request_result)
.map(ObjectClientError::ServiceError)
.unwrap_or(ObjectClientError::ClientError(S3RequestError::ResponseError(
request_result,
))),
// S3 returns 400 for invalid or expired STS tokens
400 | 403 => ObjectClientError::ServiceError(HeadBucketError::PermissionDenied(request_result)),
404 => ObjectClientError::ServiceError(HeadBucketError::NoSuchBucket),
_ => ObjectClientError::ClientError(S3RequestError::ResponseError(request_result)),
}
})?
let span = request_span!(self.inner, "head_bucket");
span.in_scope(|| debug!(?bucket, endpoint = ?self.inner.endpoint, "new request"));

self.inner
.make_simple_http_request(message, MetaRequestType::Default, span, |request_result| {
match request_result.response_status {
301 => try_parse_redirect(&request_result)
.map(ObjectClientError::ServiceError)
.unwrap_or(ObjectClientError::ClientError(S3RequestError::ResponseError(
request_result,
))),
// S3 returns 400 for invalid or expired STS tokens
400 | 403 => ObjectClientError::ServiceError(HeadBucketError::PermissionDenied(request_result)),
404 => ObjectClientError::ServiceError(HeadBucketError::NoSuchBucket),
_ => ObjectClientError::ClientError(S3RequestError::ResponseError(request_result)),
}
})?
};

body.await.map(|_body| ())
Expand Down
5 changes: 3 additions & 2 deletions mountpoint-s3-client/src/s3_crt_client/head_object.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ impl S3CrtClient {
) -> ObjectClientResult<HeadObjectResult, HeadObjectError, S3RequestError> {
let request = {
let mut message = self
.inner
.new_request_template("HEAD", bucket)
.map_err(S3RequestError::construction_failure)?;

Expand All @@ -80,10 +81,10 @@ impl S3CrtClient {
let header: Arc<Mutex<Option<Result<HeadObjectResult, ParseError>>>> = Default::default();
let header1 = header.clone();

let span = request_span!(self, "head_object");
let span = request_span!(self.inner, "head_object");
span.in_scope(|| debug!(?bucket, ?key, "new request"));

self.make_meta_request(
self.inner.make_meta_request(
message,
MetaRequestType::Default,
span,
Expand Down
16 changes: 9 additions & 7 deletions mountpoint-s3-client/src/s3_crt_client/list_objects.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ impl S3CrtClient {
// Scope the endpoint, message, etc. since otherwise rustc thinks we use Message across the await.
let body = {
let mut message = self
.inner
.new_request_template("GET", bucket)
.map_err(S3RequestError::construction_failure)?;

Expand All @@ -158,7 +159,7 @@ impl S3CrtClient {
.set_request_path_and_query("/", query)
.map_err(S3RequestError::construction_failure)?;

let span = request_span!(self, "list_objects");
let span = request_span!(self.inner, "list_objects");
span.in_scope(|| {
debug!(
?bucket,
Expand All @@ -170,12 +171,13 @@ impl S3CrtClient {
)
});

self.make_simple_http_request(message, MetaRequestType::Default, span, |result| {
let parsed = parse_list_objects_error(&result);
parsed
.map(ObjectClientError::ServiceError)
.unwrap_or(ObjectClientError::ClientError(S3RequestError::ResponseError(result)))
})?
self.inner
.make_simple_http_request(message, MetaRequestType::Default, span, |result| {
let parsed = parse_list_objects_error(&result);
parsed
.map(ObjectClientError::ServiceError)
.unwrap_or(ObjectClientError::ClientError(S3RequestError::ResponseError(result)))
})?
};

let body = body.await?;
Expand Down
Loading

0 comments on commit 5d6fa05

Please sign in to comment.