Skip to content

Commit

Permalink
KIKIMR-22049: fix signed headers for S3 multipart upload (#10333)
Browse files Browse the repository at this point in the history
  • Loading branch information
pixcc authored Oct 11, 2024
1 parent 0eb2fea commit 7401315
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 25 deletions.
15 changes: 5 additions & 10 deletions ydb/core/tx/datashard/export_s3_uploader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -182,8 +182,7 @@ class TS3Uploader: public TActorBootstrapped<TS3Uploader> {
google::protobuf::TextFormat::PrintToString(Scheme.GetRef(), &Buffer);

auto request = Aws::S3::Model::PutObjectRequest()
.WithKey(Settings.GetSchemeKey())
.WithStorageClass(Settings.GetStorageClass());
.WithKey(Settings.GetSchemeKey());
this->Send(Client, new TEvExternalStorage::TEvPutObjectRequest(request, std::move(Buffer)));

this->Become(&TThis::StateUploadScheme);
Expand All @@ -199,8 +198,7 @@ class TS3Uploader: public TActorBootstrapped<TS3Uploader> {
google::protobuf::TextFormat::PrintToString(Permissions.GetRef(), &Buffer);

auto request = Aws::S3::Model::PutObjectRequest()
.WithKey(Settings.GetPermissionsKey())
.WithStorageClass(Settings.GetStorageClass());
.WithKey(Settings.GetPermissionsKey());
this->Send(Client, new TEvExternalStorage::TEvPutObjectRequest(request, std::move(Buffer)));

this->Become(&TThis::StateUploadPermissions);
Expand All @@ -212,8 +210,7 @@ class TS3Uploader: public TActorBootstrapped<TS3Uploader> {
Buffer = std::move(Metadata);

auto request = Aws::S3::Model::PutObjectRequest()
.WithKey(Settings.GetMetadataKey())
.WithStorageClass(Settings.GetStorageClass());
.WithKey(Settings.GetMetadataKey());
this->Send(Client, new TEvExternalStorage::TEvPutObjectRequest(request, std::move(Buffer)));

this->Become(&TThis::StateUploadMetadata);
Expand Down Expand Up @@ -311,8 +308,7 @@ class TS3Uploader: public TActorBootstrapped<TS3Uploader> {
void UploadData() {
if (!MultiPart) {
auto request = Aws::S3::Model::PutObjectRequest()
.WithKey(Settings.GetDataKey(DataFormat, CompressionCodec))
.WithStorageClass(Settings.GetStorageClass());
.WithKey(Settings.GetDataKey(DataFormat, CompressionCodec));
this->Send(Client, new TEvExternalStorage::TEvPutObjectRequest(request, std::move(Buffer)));
} else {
if (!UploadId) {
Expand Down Expand Up @@ -351,8 +347,7 @@ class TS3Uploader: public TActorBootstrapped<TS3Uploader> {

if (!upload) {
auto request = Aws::S3::Model::CreateMultipartUploadRequest()
.WithKey(Settings.GetDataKey(DataFormat, CompressionCodec))
.WithStorageClass(Settings.GetStorageClass());
.WithKey(Settings.GetDataKey(DataFormat, CompressionCodec));
this->Send(Client, new TEvExternalStorage::TEvCreateMultipartUploadRequest(request));
} else {
UploadId = upload->Id;
Expand Down
40 changes: 25 additions & 15 deletions ydb/core/wrappers/s3_storage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -251,31 +251,41 @@ class TInputStreamContext: public TContextBase<TEvRequest, TEvResponse> {

}; // TInputStreamContext

template <typename TEvRequest, typename TEvResponse>
class TPutInputStreamContext: public TInputStreamContext<TEvRequest, TEvResponse> {
template <typename TEvRequest, typename TEvResponse, template <typename, typename> typename TContext = TContextBase>
class TContextWithStorageClass : public TContext<TEvRequest, TEvResponse> {
private:
using TBase = TInputStreamContext<TEvRequest, TEvResponse>;
using TBase = TContext<TEvRequest, TEvResponse>;

public:
using TBase::TBase;

const typename TBase::TRequest& PrepareRequest(typename TEvRequest::TPtr& ev) override {
auto& request = ev->Get()->MutableRequest();
auto storageClass = TBase::StorageClass;

// workaround for minio.
// aws s3 treats NOT_SET as STANDARD
// but internally sdk just doesn't set corresponding header, while adds it to SignedHeaders
// and minio implementation treats it as error, returning to client error
// which literally can't be debugged e.g. "There were headers present in the request which were not signed"
if (storageClass == Aws::S3::Model::StorageClass::NOT_SET) {
storageClass = Aws::S3::Model::StorageClass::STANDARD;
if (TBase::StorageClass != Aws::S3::Model::StorageClass::NOT_SET) {
request.WithStorageClass(TBase::StorageClass);
}

request.WithStorageClass(storageClass);
return TBase::PrepareRequest(ev);
}
}; // TContextWithStorageClass

template <typename TEvRequest, typename TEvResponse>
class TPutInputStreamContext: public TContextWithStorageClass<TEvRequest, TEvResponse, TInputStreamContext> {
private:
using TBase = TContextWithStorageClass<TEvRequest, TEvResponse, TInputStreamContext>;

public:
using TBase::TBase;
}; // TPutInputStreamContext

template <typename TEvRequest, typename TEvResponse>
class TCreateMultipartUploadContext: public TContextWithStorageClass<TEvRequest, TEvResponse> {
private:
using TBase = TContextWithStorageClass<TEvRequest, TEvResponse>;

public:
using TBase::TBase;
}; // TCreateMultipartUploadContext

} // anonymous

TS3ExternalStorage::~TS3ExternalStorage() {
Expand Down Expand Up @@ -342,7 +352,7 @@ void TS3ExternalStorage::Execute(TEvDeleteObjectsRequest::TPtr& ev) const {
}

void TS3ExternalStorage::Execute(TEvCreateMultipartUploadRequest::TPtr& ev) const {
Call<TEvCreateMultipartUploadRequest, TEvCreateMultipartUploadResponse, TContextBase>(
Call<TEvCreateMultipartUploadRequest, TEvCreateMultipartUploadResponse, TCreateMultipartUploadContext>(
#if AWS_SDK_VERSION_MAJOR == 1 && AWS_SDK_VERSION_MINOR >= 11
ev, &S3Client::CreateMultipartUploadAsync<>);
#else
Expand Down

0 comments on commit 7401315

Please sign in to comment.