From 125a8b45e66fde8b49c69067ff4692bed16aa3dd Mon Sep 17 00:00:00 2001 From: Ilnaz Nizametdinov Date: Fri, 11 Oct 2024 21:45:39 +0300 Subject: [PATCH] Cosmetic changes & simplification --- ydb/core/wrappers/abstract.cpp | 3 +- ydb/core/wrappers/abstract.h | 8 +- ydb/core/wrappers/events/abstract.h | 48 ++--- ydb/core/wrappers/events/common.h | 122 +++++-------- ydb/core/wrappers/events/delete_objects.h | 32 ++-- ydb/core/wrappers/events/get_object.h | 18 +- ydb/core/wrappers/events/list_objects.h | 32 ++-- ydb/core/wrappers/events/object_exists.h | 18 +- ydb/core/wrappers/events/s3_out.cpp | 6 +- ydb/core/wrappers/events/s3_out.h | 14 +- ydb/core/wrappers/events/ya.make | 2 +- ydb/core/wrappers/fake_storage.cpp | 2 + ydb/core/wrappers/fake_storage.h | 6 +- ydb/core/wrappers/s3_storage.cpp | 206 +++++++++------------- ydb/core/wrappers/s3_storage.h | 68 ++++--- ydb/core/wrappers/s3_storage_config.cpp | 22 ++- ydb/core/wrappers/s3_storage_config.h | 6 +- ydb/core/wrappers/s3_wrapper.cpp | 16 +- ydb/core/wrappers/s3_wrapper.h | 1 + 19 files changed, 266 insertions(+), 364 deletions(-) diff --git a/ydb/core/wrappers/abstract.cpp b/ydb/core/wrappers/abstract.cpp index 0f3681beec77..c13cd9bc6e0a 100644 --- a/ydb/core/wrappers/abstract.cpp +++ b/ydb/core/wrappers/abstract.cpp @@ -2,8 +2,6 @@ #include "fake_storage_config.h" #include "s3_storage_config.h" -#include - namespace NKikimr::NWrappers::NExternalStorage { IExternalStorageOperator::TPtr IExternalStorageConfig::ConstructStorageOperator(bool verbose) const { @@ -17,4 +15,5 @@ IExternalStorageConfig::TPtr IExternalStorageConfig::Construct(const NKikimrSche return std::make_shared(settings); } } + } diff --git a/ydb/core/wrappers/abstract.h b/ydb/core/wrappers/abstract.h index 145bab9d77ba..c8c8adab8c99 100644 --- a/ydb/core/wrappers/abstract.h +++ b/ydb/core/wrappers/abstract.h @@ -1,12 +1,14 @@ #pragma once -#include + +#include #include #include #include +#include #include #include -#include -#include + +#include namespace NKikimr::NWrappers { diff --git a/ydb/core/wrappers/events/abstract.h b/ydb/core/wrappers/events/abstract.h index de4a651dfc74..a07a8b634803 100644 --- a/ydb/core/wrappers/events/abstract.h +++ b/ydb/core/wrappers/events/abstract.h @@ -1,6 +1,8 @@ #pragma once + #include -#include + +#include namespace NKikimr::NWrappers::NExternalStorage { @@ -11,28 +13,30 @@ class IRequestContext { }; #define EV_REQUEST_RESPONSE(name) \ - Ev##name##Request, \ - Ev##name##Response - - enum EEv { - EvBegin = EventSpaceBegin(TKikimrEvents::ES_S3_WRAPPER), - - EV_REQUEST_RESPONSE(GetObject), - EV_REQUEST_RESPONSE(HeadObject), - EV_REQUEST_RESPONSE(PutObject), - EV_REQUEST_RESPONSE(DeleteObject), - EV_REQUEST_RESPONSE(DeleteObjects), - EV_REQUEST_RESPONSE(CreateMultipartUpload), - EV_REQUEST_RESPONSE(UploadPart), - EV_REQUEST_RESPONSE(CompleteMultipartUpload), - EV_REQUEST_RESPONSE(AbortMultipartUpload), - EV_REQUEST_RESPONSE(ListObjects), - EV_REQUEST_RESPONSE(CheckObjectExists), - EV_REQUEST_RESPONSE(UploadPartCopy), - EvEnd, - }; + Ev##name##Request, \ + Ev##name##Response + +enum EEv { + EvBegin = EventSpaceBegin(TKikimrEvents::ES_S3_WRAPPER), + + EV_REQUEST_RESPONSE(GetObject), + EV_REQUEST_RESPONSE(HeadObject), + EV_REQUEST_RESPONSE(PutObject), + EV_REQUEST_RESPONSE(DeleteObject), + EV_REQUEST_RESPONSE(DeleteObjects), + EV_REQUEST_RESPONSE(CreateMultipartUpload), + EV_REQUEST_RESPONSE(UploadPart), + EV_REQUEST_RESPONSE(CompleteMultipartUpload), + EV_REQUEST_RESPONSE(AbortMultipartUpload), + EV_REQUEST_RESPONSE(ListObjects), + EV_REQUEST_RESPONSE(CheckObjectExists), + EV_REQUEST_RESPONSE(UploadPartCopy), + + EvEnd, +}; #undef EV_REQUEST_RESPONSE - static_assert(EvEnd < EventSpaceEnd(TKikimrEvents::ES_S3_WRAPPER), "expect EvEnd < EventSpaceEnd(TKikimrEvents::ES_S3_WRAPPER)"); +static_assert(EvEnd < EventSpaceEnd(TKikimrEvents::ES_S3_WRAPPER), "expect EvEnd < EventSpaceEnd(TKikimrEvents::ES_S3_WRAPPER)"); + } diff --git a/ydb/core/wrappers/events/common.h b/ydb/core/wrappers/events/common.h index a7303c772a55..30c7f97f7b49 100644 --- a/ydb/core/wrappers/events/common.h +++ b/ydb/core/wrappers/events/common.h @@ -1,23 +1,21 @@ #pragma once + #include "abstract.h" #include "s3_out.h" #include -#include -#include - -#include - namespace NKikimr::NWrappers::NExternalStorage { template struct TGenericRequest: public NActors::TEventLocal { private: IRequestContext::TPtr RequestContext; + public: using TRequest = T; TRequest Request; + IRequestContext::TPtr GetRequestContext() const { return RequestContext; } @@ -30,11 +28,7 @@ struct TGenericRequest: public NActors::TEventLocal { return Request; } - explicit TGenericRequest(const TRequest& request) - : Request(request) { - } - - explicit TGenericRequest(const TRequest& request, IRequestContext::TPtr requestContext) + explicit TGenericRequest(const TRequest& request, IRequestContext::TPtr requestContext = nullptr) : RequestContext(requestContext) , Request(request) { @@ -57,7 +51,8 @@ struct TRequestWithBody: public TGenericRequest { explicit TRequestWithBody(const typename TGeneric::TRequest& request, TString&& body) : TGeneric(request) - , Body(std::move(body)) { + , Body(std::move(body)) + { } TString ToString() const override { @@ -70,81 +65,59 @@ struct TRequestWithBody: public TGenericRequest { using TBase = TRequestWithBody; }; -template -struct TBaseGenericResponse: public NActors::TEventLocal { +template +struct TGenericResponse: public NActors::TEventLocal { private: - using TBase = NActors::TEventLocal; IRequestContext::TPtr RequestContext; + public: - using TOutcome = Aws::Utils::Outcome; + using TOutcome = Aws::Utils::Outcome; using TResult = Aws::Utils::Outcome; using TAwsResult = U; - using TAwsOutcome = TResult; using TKey = std::optional; + TKey Key; TResult Result; - explicit TBaseGenericResponse(const TOutcome& outcome) - : Result(TDerived::ResultFromOutcome(outcome)) { + explicit TGenericResponse(const TOutcome& outcome, IRequestContext::TPtr requestContext = nullptr) + : RequestContext(requestContext) + , Result(TDerived::ResultFromOutcome(outcome)) + { } - explicit TBaseGenericResponse(const TOutcome& outcome, IRequestContext::TPtr requestContext) + explicit TGenericResponse(const TKey& key, const TOutcome& outcome, IRequestContext::TPtr requestContext = nullptr) : RequestContext(requestContext) - , Result(TDerived::ResultFromOutcome(outcome)) { + , Key(key) + , Result(TDerived::ResultFromOutcome(outcome)) + { } bool IsSuccess() const { return Result.IsSuccess(); } + const Aws::S3::S3Error& GetError() const { return Result.GetError(); } + const U& GetResult() const { return Result.GetResult(); } - template - std::shared_ptr GetRequestContextAs() const { - return dynamic_pointer_cast(RequestContext); + template + std::shared_ptr GetRequestContextAs() const { + Y_ABORT_UNLESS(RequestContext); + return dynamic_pointer_cast(RequestContext); } static TResult ResultFromOutcome(const TOutcome& outcome) { return outcome; } - TString ToString() const override { - return TStringBuilder() << this->ToStringHeader() << " {" - << " Result: " << Result - << " }"; - } -}; - -template -struct TGenericResponse: public TBaseGenericResponse { -private: - using TBase = TBaseGenericResponse; -public: - using TOutcome = typename TBase::TOutcome; - using TResult = typename TBase::TResult; - using TKey = std::optional; - - TKey Key; - - explicit TGenericResponse(const TKey& key, const TOutcome& outcome) - : TBase(outcome) - , Key(key) { - } - - explicit TGenericResponse(const TKey& key, const TOutcome& outcome, IRequestContext::TPtr requestContext) - : TBase(outcome, requestContext) - , Key(key) - { - } - TString ToString() const override { return TStringBuilder() << this->ToStringHeader() << " {" << " Key: " << (Key ? "null" : *Key) - << " Result: " << TBase::Result + << " Result: " << Result << " }"; } }; @@ -153,18 +126,21 @@ template struct TResponseWithBody: public TGenericResponse { private: using TBase = TGenericResponse; + public: using TKey = typename TBase::TKey; TString Body; explicit TResponseWithBody(const TKey& key, const typename TBase::TOutcome& outcome) - : TBase(key, outcome) { + : TBase(key, outcome) + { } explicit TResponseWithBody(const TKey& key, const typename TBase::TOutcome& outcome, TString&& body) : TBase(key, outcome) - , Body(std::move(body)) { + , Body(std::move(body)) + { } TString ToString() const override { @@ -177,34 +153,34 @@ struct TResponseWithBody: public TGenericResponse { }; #define DEFINE_REQUEST(name, base) \ - struct TEv##name##Request: public base { \ - using TBase::TBase; \ - } + struct TEv##name##Request: public base { \ + using TBase::TBase; \ + } #define DEFINE_GENERIC_REQUEST(name) \ - DEFINE_REQUEST(name, TGenericRequest) + DEFINE_REQUEST(name, TGenericRequest) #define DECLARE_GENERIC_RESPONSE(name) \ - struct TEv##name##Response: public TGenericResponse {\ - private:\ - using TBase = TGenericResponse;\ - public:\ - using TBase::TBase; + struct TEv##name##Response: public TGenericResponse { \ + private: \ + using TBase = TGenericResponse; \ + public: \ + using TBase::TBase; #define DECLARE_RESPONSE_WITH_BODY(name, result_t) \ - struct TEv##name##Response: public TResponseWithBody {\ - private:\ - using TBase = TResponseWithBody;\ - public:\ - using TBase::TBase; + struct TEv##name##Response: public TResponseWithBody { \ + private: \ + using TBase = TResponseWithBody; \ + public: \ + using TBase::TBase; #define DEFINE_GENERIC_RESPONSE(name) \ - DECLARE_GENERIC_RESPONSE(name) \ - } + DECLARE_GENERIC_RESPONSE(name) \ + } #define DEFINE_GENERIC_REQUEST_RESPONSE(name) \ - DEFINE_GENERIC_REQUEST(name); \ - DEFINE_GENERIC_RESPONSE(name) + DEFINE_GENERIC_REQUEST(name); \ + DEFINE_GENERIC_RESPONSE(name) DEFINE_REQUEST(PutObject, TRequestWithBody); DEFINE_GENERIC_RESPONSE(PutObject); diff --git a/ydb/core/wrappers/events/delete_objects.h b/ydb/core/wrappers/events/delete_objects.h index eec58495f0b7..8d3996a5f285 100644 --- a/ydb/core/wrappers/events/delete_objects.h +++ b/ydb/core/wrappers/events/delete_objects.h @@ -1,30 +1,24 @@ #pragma once -#include "abstract.h" -#include "common.h" -#include -#include +#include "common.h" -#include #include #include -#include -#include namespace NKikimr::NWrappers::NExternalStorage { - class TEvDeleteObjectsRequest: public TGenericRequest { - private: - using TBase = TGenericRequest; - public: - using TBase::TBase; - }; +class TEvDeleteObjectsRequest: public TGenericRequest { +private: + using TBase = TGenericRequest; +public: + using TBase::TBase; +}; - class TEvDeleteObjectsResponse: public TBaseGenericResponse { - private: - using TBase = TBaseGenericResponse; - public: - using TBase::TBase; - }; +class TEvDeleteObjectsResponse: public TGenericResponse { +private: + using TBase = TGenericResponse; +public: + using TBase::TBase; +}; } diff --git a/ydb/core/wrappers/events/get_object.h b/ydb/core/wrappers/events/get_object.h index ca1f3d4ddb4a..12ebaa6ba8af 100644 --- a/ydb/core/wrappers/events/get_object.h +++ b/ydb/core/wrappers/events/get_object.h @@ -1,15 +1,9 @@ #pragma once -#include "abstract.h" -#include "common.h" -#include -#include -#include +#include "common.h" -#include -#include -#include -#include +#include +#include namespace NKikimr::NWrappers::NExternalStorage { @@ -24,6 +18,7 @@ class TEvGetObjectResponse: public TResponseWithBody; std::pair ReadInterval; + public: ui32 GetReadIntervalLength() const { return ReadInterval.second - ReadInterval.first + 1; @@ -42,7 +37,8 @@ class TEvGetObjectResponse: public TResponseWithBody -#include +#include "common.h" -#include #include #include -#include -#include namespace NKikimr::NWrappers::NExternalStorage { - class TEvListObjectsRequest: public TGenericRequest { - private: - using TBase = TGenericRequest; - public: - using TBase::TBase; - }; +class TEvListObjectsRequest: public TGenericRequest { +private: + using TBase = TGenericRequest; +public: + using TBase::TBase; +}; - class TEvListObjectsResponse: public TBaseGenericResponse { - private: - using TBase = TBaseGenericResponse; - public: - using TBase::TBase; - }; +class TEvListObjectsResponse: public TGenericResponse { +private: + using TBase = TGenericResponse; +public: + using TBase::TBase; +}; } diff --git a/ydb/core/wrappers/events/object_exists.h b/ydb/core/wrappers/events/object_exists.h index 87b1d571becb..734d4d2e88f2 100644 --- a/ydb/core/wrappers/events/object_exists.h +++ b/ydb/core/wrappers/events/object_exists.h @@ -1,34 +1,28 @@ #pragma once -#include "abstract.h" -#include "common.h" -#include -#include -#include +#include "common.h" #include #include -#include -#include namespace NKikimr::NWrappers::NExternalStorage { -class TEvCheckObjectExistsRequest: public TGenericRequest { +class TEvCheckObjectExistsRequest: public TGenericRequest { private: using TBase = TGenericRequest; public: using TBase::TBase; }; -class TEvCheckObjectExistsResponse: public TBaseGenericResponse { +class TEvCheckObjectExistsResponse: public TGenericResponse { private: - using TBase = TBaseGenericResponse; + using TBase = TGenericResponse; public: using TBase::TBase; + bool IsExists() const { return Result.IsSuccess(); } }; + } diff --git a/ydb/core/wrappers/events/s3_out.cpp b/ydb/core/wrappers/events/s3_out.cpp index 65517ebecb6c..06eac6ab80fe 100644 --- a/ydb/core/wrappers/events/s3_out.cpp +++ b/ydb/core/wrappers/events/s3_out.cpp @@ -7,8 +7,7 @@ #include #include -namespace NKikimr { -namespace NWrappers { +namespace NKikimr::NWrappers { using namespace Aws::S3::Model; @@ -221,5 +220,4 @@ void Out(IOutputStream& out, const TStringOutcome& outcome) { OutOutcome(out, outcome); } -} // NWrappers -} // NKikimr +} // NKikimr::NWrappers diff --git a/ydb/core/wrappers/events/s3_out.h b/ydb/core/wrappers/events/s3_out.h index 24b37943635d..76b46ce3d8f3 100644 --- a/ydb/core/wrappers/events/s3_out.h +++ b/ydb/core/wrappers/events/s3_out.h @@ -3,20 +3,19 @@ #include #include #include +#include +#include #include -#include #include +#include #include -#include -#include -#include #include +#include #include #include -namespace NKikimr { -namespace NWrappers { +namespace NKikimr::NWrappers { void Out(IOutputStream& out, const Aws::S3::Model::GetObjectRequest& request); void Out(IOutputStream& out, const Aws::S3::Model::GetObjectResult& result); @@ -68,8 +67,7 @@ void Out(IOutputStream& out, const Aws::S3::Model::CompletedPart& part); using TStringOutcome = Aws::Utils::Outcome; void Out(IOutputStream& out, const TStringOutcome& outcome); -} // NWrappers -} // NKikimr +} // NKikimr::NWrappers Y_DECLARE_OUT_SPEC(inline, Aws::S3::Model::GetObjectRequest, out, value) { NKikimr::NWrappers::Out(out, value); diff --git a/ydb/core/wrappers/events/ya.make b/ydb/core/wrappers/events/ya.make index 1950191a60d8..c330fb84d4e6 100644 --- a/ydb/core/wrappers/events/ya.make +++ b/ydb/core/wrappers/events/ya.make @@ -17,10 +17,10 @@ ELSE() PEERDIR( contrib/libs/aws-sdk-cpp/aws-cpp-sdk-s3 contrib/libs/curl - ydb/library/actors/core ydb/core/base ydb/core/protos ydb/core/wrappers/ut_helpers + ydb/library/actors/core ) ENDIF() diff --git a/ydb/core/wrappers/fake_storage.cpp b/ydb/core/wrappers/fake_storage.cpp index d978b40cb036..7b755514f752 100644 --- a/ydb/core/wrappers/fake_storage.cpp +++ b/ydb/core/wrappers/fake_storage.cpp @@ -8,7 +8,9 @@ #include #include + #include + #include #ifndef KIKIMR_DISABLE_S3_OPS diff --git a/ydb/core/wrappers/fake_storage.h b/ydb/core/wrappers/fake_storage.h index c672835c9874..fff953f8203f 100644 --- a/ydb/core/wrappers/fake_storage.h +++ b/ydb/core/wrappers/fake_storage.h @@ -5,13 +5,9 @@ #include "abstract.h" #include -#include - +#include #include -#include -#include - namespace NKikimr::NWrappers::NExternalStorage { class TFakeBucketStorage { private: diff --git a/ydb/core/wrappers/s3_storage.cpp b/ydb/core/wrappers/s3_storage.cpp index 22e4e0566cf8..431c35da5d15 100644 --- a/ydb/core/wrappers/s3_storage.cpp +++ b/ydb/core/wrappers/s3_storage.cpp @@ -5,9 +5,9 @@ #include #include #include -#include + #include -#include + #include #ifndef KIKIMR_DISABLE_S3_OPS @@ -23,18 +23,20 @@ using namespace Aws::Utils::Stream; namespace { template -class TCommonContextBase: public AsyncCallerContext { -protected: - using TRequest = typename TEvRequest::TRequest; - using TOutcome = typename TEvResponse::TOutcome; +class TContextBase: public AsyncCallerContext { public: - explicit TCommonContextBase(const TActorSystem* sys, const TActorId& sender, IRequestContext::TPtr requestContext, const Aws::S3::Model::StorageClass storageClass, const TReplyAdapterContainer& replyAdapter) + explicit TContextBase( + const TActorSystem* sys, + const TActorId& sender, + IRequestContext::TPtr requestContext, + const Aws::S3::Model::StorageClass storageClass, + const TReplyAdapterContainer& replyAdapter) : AsyncCallerContext() + , ActorSystem(sys) + , Sender(sender) , RequestContext(requestContext) , StorageClass(storageClass) , ReplyAdapter(replyAdapter) - , ActorSystem(sys) - , Sender(sender) { } @@ -42,7 +44,7 @@ class TCommonContextBase: public AsyncCallerContext { return ActorSystem; } - virtual const TRequest& PrepareRequest(typename TEvRequest::TPtr& ev) { + virtual const typename TEvRequest::TRequest& PrepareRequest(typename TEvRequest::TPtr& ev) { return ev->Get()->GetRequest(); } @@ -55,91 +57,56 @@ class TCommonContextBase: public AsyncCallerContext { Send(Sender, std::move(ev)); } - mutable bool Replied = false; - IRequestContext::TPtr RequestContext; - const Aws::S3::Model::StorageClass StorageClass; - const TReplyAdapterContainer& ReplyAdapter; private: const TActorSystem* ActorSystem; const TActorId Sender; -}; // TCommonContextBase -template -class TContextBase: public TCommonContextBase { -private: - using TBase = TCommonContextBase; protected: - virtual std::unique_ptr MakeResponse(const typename TEvResponse::TKey& key, const typename TBase::TOutcome& outcome) const { - return TBase::ReplyAdapter.RebuildReplyEvent(std::make_unique(key, outcome)); - } + mutable bool Replied = false; + IRequestContext::TPtr RequestContext; + const Aws::S3::Model::StorageClass StorageClass; + const TReplyAdapterContainer& ReplyAdapter; + +}; // TContextBase +template +class TBasicContext: public TContextBase { public: - using TBase::Send; - using TBase::TBase; - void Reply(const typename TBase::TRequest& request, const typename TBase::TOutcome& outcome) const { - Y_ABORT_UNLESS(!std::exchange(TBase::Replied, true), "Double-reply"); + using TContextBase::TContextBase; - typename TEvResponse::TKey key; - if (request.KeyHasBeenSet()) { - key = request.GetKey(); - } - Send(MakeResponse(key, outcome)); + void Reply(const typename TEvRequest::TRequest&, const typename TEvResponse::TOutcome& outcome) const { + Y_ABORT_UNLESS(!std::exchange(this->Replied, true), "Double-reply"); + this->Send(std::make_unique(outcome, this->RequestContext)); } }; -template <> -class TContextBase - : public TCommonContextBase { -private: - using TBase = TCommonContextBase; +template +class TGenericContext: public TContextBase { public: - using TBase::Send; - using TBase::TBase; - void Reply(const typename TBase::TRequest& /*request*/, const typename TBase::TOutcome& outcome) const { - Y_ABORT_UNLESS(!std::exchange(TBase::Replied, true), "Double-reply"); + using TContextBase::TContextBase; - Send(std::make_unique(outcome)); - } -}; + void Reply(const typename TEvRequest::TRequest& request, const typename TEvResponse::TOutcome& outcome) const { + Y_ABORT_UNLESS(!std::exchange(this->Replied, true), "Double-reply"); -template <> -class TContextBase - : public TCommonContextBase { -private: - using TBase = TCommonContextBase; -public: - using TBase::Send; - using TBase::TBase; - void Reply(const typename TBase::TRequest& /*request*/, const typename TBase::TOutcome& outcome) const { - Y_ABORT_UNLESS(!std::exchange(TBase::Replied, true), "Double-reply"); + typename TEvResponse::TKey key; + if (request.KeyHasBeenSet()) { + key = request.GetKey(); + } - Send(std::make_unique(outcome)); + this->Send(MakeResponse(key, outcome)); } -}; -template <> -class TContextBase - : public TCommonContextBase { -private: - using TBase = TCommonContextBase; -public: - using TBase::Send; - using TBase::TBase; - void Reply(const typename TBase::TRequest& /*request*/, const typename TBase::TOutcome& outcome) const { - Y_ABORT_UNLESS(!std::exchange(TBase::Replied, true), "Double-reply"); - Send(std::make_unique(outcome, RequestContext)); +protected: + virtual std::unique_ptr MakeResponse( + const typename TEvResponse::TKey& key, + const typename TEvResponse::TOutcome& outcome) const + { + return this->ReplyAdapter.RebuildReplyEvent(std::make_unique(key, outcome)); } }; template -class TOutputStreamContext: public TContextBase { -private: - using TBase = TContextBase; -protected: - using TRequest = typename TEvRequest::TRequest; - using TOutcome = typename TEvResponse::TOutcome; - -private: +class TOutputStreamContext: public TGenericContext { class TOutputStreamBuf: public PreallocatedStreamBuf { TOutputStreamBuf(char* data, size_t size) : PreallocatedStreamBuf(reinterpret_cast(data), size) @@ -173,11 +140,10 @@ class TOutputStreamContext: public TContextBase { return true; } - std::optional> Range; public: - using TContextBase::TContextBase; + using TGenericContext::TGenericContext; - const TRequest& PrepareRequest(typename TEvRequest::TPtr& ev) override { + const typename TEvRequest::TRequest& PrepareRequest(typename TEvRequest::TPtr& ev) override { auto& request = ev->Get()->Request; std::pair range; @@ -188,13 +154,17 @@ class TOutputStreamContext: public TContextBase { request.SetResponseStreamFactory([this]() { return Aws::New("StreamContext", MakeUnique("StreamContext", Buffer)); - }); + } + ); return request; } protected: - std::unique_ptr MakeResponse(const typename TEvResponse::TKey& key, const TOutcome& outcome) const override { + std::unique_ptr MakeResponse( + const typename TEvResponse::TKey& key, + const typename TEvResponse::TOutcome& outcome) const override + { Y_ABORT_UNLESS(Range); std::unique_ptr response; if (outcome.IsSuccess()) { @@ -202,42 +172,39 @@ class TOutputStreamContext: public TContextBase { } else { response = std::make_unique(key, *Range, outcome); } - return TBase::ReplyAdapter.RebuildReplyEvent(std::move(response)); + return this->ReplyAdapter.RebuildReplyEvent(std::move(response)); } private: + std::optional> Range; mutable TString Buffer; }; // TOutputStreamContext template -class TInputStreamContext: public TContextBase { -private: - using TBase = TContextBase; -protected: - using TRequest = typename TEvRequest::TRequest; - using TOutcome = typename TEvResponse::TOutcome; - -private: +class TInputStreamContext: public TGenericContext { class TInputStreamBuf: public PreallocatedStreamBuf { TInputStreamBuf(char* data, size_t size) - : PreallocatedStreamBuf(reinterpret_cast(data), size) { + : PreallocatedStreamBuf(reinterpret_cast(data), size) + { } TInputStreamBuf(const char* data, size_t size) - : TInputStreamBuf(const_cast(data), size) { + : TInputStreamBuf(const_cast(data), size) + { } public: explicit TInputStreamBuf(const TStringBuf buf) - : TInputStreamBuf(buf.data(), buf.size()) { + : TInputStreamBuf(buf.data(), buf.size()) + { } }; public: - using TBase::TBase; + using TGenericContext::TGenericContext; - const TRequest& PrepareRequest(typename TEvRequest::TPtr& ev) override { + const typename TEvRequest::TRequest& PrepareRequest(typename TEvRequest::TPtr& ev) override { auto& request = ev->Get()->MutableRequest(); Buffer = std::move(ev->Get()->Body); request.SetBody(MakeShared("StreamContext", @@ -251,40 +218,31 @@ class TInputStreamContext: public TContextBase { }; // TInputStreamContext -template typename TContext = TContextBase> -class TContextWithStorageClass : public TContext { -private: - using TBase = TContext; - +template typename TContext = TGenericContext> +class TContextWithStorageClass: public TContext { public: - using TBase::TBase; + using TContext::TContext; - const typename TBase::TRequest& PrepareRequest(typename TEvRequest::TPtr& ev) override { + const typename TEvRequest::TRequest& PrepareRequest(typename TEvRequest::TPtr& ev) override { auto& request = ev->Get()->MutableRequest(); - if (TBase::StorageClass != Aws::S3::Model::StorageClass::NOT_SET) { - request.WithStorageClass(TBase::StorageClass); + if (this->StorageClass != Aws::S3::Model::StorageClass::NOT_SET) { + request.WithStorageClass(this->StorageClass); } - return TBase::PrepareRequest(ev); + return TContext::PrepareRequest(ev); } -}; // TContextWithStorageClass +}; template -class TPutInputStreamContext: public TContextWithStorageClass { -private: - using TBase = TContextWithStorageClass; - +class TPutObjectContext: public TContextWithStorageClass { public: - using TBase::TBase; -}; // TPutInputStreamContext + using TContextWithStorageClass::TContextWithStorageClass; +}; template class TCreateMultipartUploadContext: public TContextWithStorageClass { -private: - using TBase = TContextWithStorageClass; - public: - using TBase::TBase; -}; // TCreateMultipartUploadContext + using TContextWithStorageClass::TContextWithStorageClass; +}; } // anonymous @@ -302,7 +260,7 @@ void TS3ExternalStorage::Execute(TEvGetObjectRequest::TPtr& ev) const { } void TS3ExternalStorage::Execute(TEvCheckObjectExistsRequest::TPtr& ev) const { - Call( + Call( #if AWS_SDK_VERSION_MAJOR == 1 && AWS_SDK_VERSION_MINOR >= 11 ev, &S3Client::HeadObjectAsync<>); #else @@ -311,7 +269,7 @@ void TS3ExternalStorage::Execute(TEvCheckObjectExistsRequest::TPtr& ev) const { } void TS3ExternalStorage::Execute(TEvListObjectsRequest::TPtr& ev) const { - Call( + Call( #if AWS_SDK_VERSION_MAJOR == 1 && AWS_SDK_VERSION_MINOR >= 11 ev, &S3Client::ListObjectsAsync<>); #else @@ -320,7 +278,7 @@ void TS3ExternalStorage::Execute(TEvListObjectsRequest::TPtr& ev) const { } void TS3ExternalStorage::Execute(TEvHeadObjectRequest::TPtr& ev) const { - Call( + Call( #if AWS_SDK_VERSION_MAJOR == 1 && AWS_SDK_VERSION_MINOR >= 11 ev, &S3Client::HeadObjectAsync<>); #else @@ -329,12 +287,12 @@ void TS3ExternalStorage::Execute(TEvHeadObjectRequest::TPtr& ev) const { } void TS3ExternalStorage::Execute(TEvPutObjectRequest::TPtr& ev) const { - Call( + Call( ev, &S3Client::PutObjectAsync); } void TS3ExternalStorage::Execute(TEvDeleteObjectRequest::TPtr& ev) const { - Call( + Call( #if AWS_SDK_VERSION_MAJOR == 1 && AWS_SDK_VERSION_MINOR >= 11 ev, &S3Client::DeleteObjectAsync<>); #else @@ -343,7 +301,7 @@ void TS3ExternalStorage::Execute(TEvDeleteObjectRequest::TPtr& ev) const { } void TS3ExternalStorage::Execute(TEvDeleteObjectsRequest::TPtr& ev) const { - Call( + Call( #if AWS_SDK_VERSION_MAJOR == 1 && AWS_SDK_VERSION_MINOR >= 11 ev, &S3Client::DeleteObjectsAsync<>); #else @@ -370,7 +328,7 @@ void TS3ExternalStorage::Execute(TEvUploadPartRequest::TPtr& ev) const { } void TS3ExternalStorage::Execute(TEvCompleteMultipartUploadRequest::TPtr& ev) const { - Call( + Call( #if AWS_SDK_VERSION_MAJOR == 1 && AWS_SDK_VERSION_MINOR >= 11 ev, &S3Client::CompleteMultipartUploadAsync<>); #else @@ -379,7 +337,7 @@ void TS3ExternalStorage::Execute(TEvCompleteMultipartUploadRequest::TPtr& ev) co } void TS3ExternalStorage::Execute(TEvAbortMultipartUploadRequest::TPtr& ev) const { - Call( + Call( #if AWS_SDK_VERSION_MAJOR == 1 && AWS_SDK_VERSION_MINOR >= 11 ev, &S3Client::AbortMultipartUploadAsync<>); #else @@ -388,7 +346,7 @@ void TS3ExternalStorage::Execute(TEvAbortMultipartUploadRequest::TPtr& ev) const } void TS3ExternalStorage::Execute(TEvUploadPartCopyRequest::TPtr& ev) const { - Call( + Call( #if AWS_SDK_VERSION_MAJOR == 1 && AWS_SDK_VERSION_MINOR >= 11 ev, &S3Client::UploadPartCopyAsync<>); #else diff --git a/ydb/core/wrappers/s3_storage.h b/ydb/core/wrappers/s3_storage.h index 5af392c4f63c..26624a771737 100644 --- a/ydb/core/wrappers/s3_storage.h +++ b/ydb/core/wrappers/s3_storage.h @@ -4,27 +4,22 @@ #include "abstract.h" -#include -#include - -#include -#include #include #include +#include #include #include #include +#include #include #include #include -#include -#include +#include #include +#include #include + #include -#include -#include -#include #include #include @@ -60,27 +55,28 @@ class TS3ExternalStorage: public IExternalStorageOperator { const Aws::S3::S3Client*, const typename TEvRequest::TRequest& request, const typename TEvResponse::TOutcome& outcome, - const std::shared_ptr& context) { - const auto* ctx = static_cast(context.get()); - - Y_DEFER { - std::unique_lock guard(RunningQueriesMutex); - --RunningQueriesCount; - if (RunningQueriesCount == 0) { - RunningQueriesNotifier.notify_all(); - } - }; - - if (Verbose) { - LOG_NOTICE_S(*ctx->GetActorSystem(), NKikimrServices::S3_WRAPPER, "Response" - << ": uuid# " << ctx->GetUUID() - << ", response# " << outcome); - } else { - LOG_INFO_S(*ctx->GetActorSystem(), NKikimrServices::S3_WRAPPER, "Response" - << ": uuid# " << ctx->GetUUID() - << ", response# " << outcome); + const std::shared_ptr& context) + { + const auto* ctx = static_cast(context.get()); + + Y_DEFER { + std::unique_lock guard(RunningQueriesMutex); + --RunningQueriesCount; + if (RunningQueriesCount == 0) { + RunningQueriesNotifier.notify_all(); } - ctx->Reply(request, outcome); + }; + + if (Verbose) { + LOG_NOTICE_S(*ctx->GetActorSystem(), NKikimrServices::S3_WRAPPER, "Response" + << ": uuid# " << ctx->GetUUID() + << ", response# " << outcome); + } else { + LOG_INFO_S(*ctx->GetActorSystem(), NKikimrServices::S3_WRAPPER, "Response" + << ": uuid# " << ctx->GetUUID() + << ", response# " << outcome); + } + ctx->Reply(request, outcome); }; if (Verbose) { @@ -99,11 +95,12 @@ class TS3ExternalStorage: public IExternalStorageOperator { } public: - TS3ExternalStorage(const Aws::Client::ClientConfiguration& config, - const Aws::Auth::AWSCredentials& credentials, - const TString& bucket, const Aws::S3::Model::StorageClass storageClass, - bool verbose = true, - bool useVirtualAdressing = true) + TS3ExternalStorage( + const Aws::Client::ClientConfiguration& config, + const Aws::Auth::AWSCredentials& credentials, + const TString& bucket, const Aws::S3::Model::StorageClass storageClass, + bool verbose = true, + bool useVirtualAdressing = true) : Client(new Aws::S3::S3Client( credentials, config, @@ -132,6 +129,7 @@ class TS3ExternalStorage: public IExternalStorageOperator { virtual void Execute(TEvAbortMultipartUploadRequest::TPtr& ev) const override; virtual void Execute(TEvUploadPartCopyRequest::TPtr& ev) const override; }; + } // NKikimr::NWrappers::NExternalStorage #endif // KIKIMR_DISABLE_S3_OPS diff --git a/ydb/core/wrappers/s3_storage_config.cpp b/ydb/core/wrappers/s3_storage_config.cpp index b8e36f1db910..920cabeea20e 100644 --- a/ydb/core/wrappers/s3_storage_config.cpp +++ b/ydb/core/wrappers/s3_storage_config.cpp @@ -3,8 +3,6 @@ #include -#include - #ifndef KIKIMR_DISABLE_S3_OPS namespace NKikimr::NWrappers::NExternalStorage { @@ -12,7 +10,7 @@ namespace { namespace NPrivate { -template +template Aws::Client::ClientConfiguration ConfigFromSettings(const TSettings& settings) { Aws::Client::ClientConfiguration config; @@ -39,7 +37,7 @@ Aws::Client::ClientConfiguration ConfigFromSettings(const TSettings& settings) { return config; } -template +template Aws::Auth::AWSCredentials CredentialsFromSettings(const TSettings& settings) { return Aws::Auth::AWSCredentials(settings.access_key(), settings.secret_key()); } @@ -50,21 +48,21 @@ Aws::Auth::AWSCredentials CredentialsFromSettings(const TSettings& settings) { class TS3ThreadsPoolByEndpoint { private: - class TPool { public: std::shared_ptr Executor; ui32 ThreadsCount = 0; + TPool(const std::shared_ptr& executor, const ui32 threadsCount) : Executor(executor) , ThreadsCount(threadsCount) { - } }; THashMap Pools; TMutex Mutex; + std::shared_ptr GetPoolImpl(const TString& endpoint, const ui32 threadsCount) { TGuard g(Mutex); auto it = Pools.find(endpoint); @@ -77,6 +75,7 @@ class TS3ThreadsPoolByEndpoint { } return it->second.Executor; } + public: static std::shared_ptr GetPool(const TString& endpoint, const ui32 threadsCount) { return Singleton()->GetPoolImpl(endpoint, threadsCount); @@ -90,15 +89,17 @@ Aws::Client::ClientConfiguration TS3ExternalStorageConfig::ConfigFromSettings(co if (settings.HasConnectionTimeoutMs()) { config.connectTimeoutMs = settings.GetConnectionTimeoutMs(); } + if (settings.HasRequestTimeoutMs()) { config.requestTimeoutMs = settings.GetRequestTimeoutMs(); } + if (settings.HasHttpRequestTimeoutMs()) { config.httpRequestTimeoutMs = settings.GetHttpRequestTimeoutMs(); } + config.executor = TS3ThreadsPoolByEndpoint::GetPool(settings.GetEndpoint(), settings.GetExecutorThreadsCount()); config.enableTcpKeepAlive = true; - // config.lowSpeedLimit = 0; config.maxConnections = settings.HasMaxConnectionsCount() ? settings.GetMaxConnectionsCount() : settings.GetExecutorThreadsCount(); config.caPath = "/etc/ssl/certs"; @@ -184,8 +185,10 @@ TS3ExternalStorageConfig::TS3ExternalStorageConfig(const Ydb::Export::ExportToS3 Bucket = settings.bucket(); } -TS3ExternalStorageConfig::TS3ExternalStorageConfig(const Aws::Auth::AWSCredentials& credentials, - const Aws::Client::ClientConfiguration& config, const TString& bucket) +TS3ExternalStorageConfig::TS3ExternalStorageConfig( + const Aws::Auth::AWSCredentials& credentials, + const Aws::Client::ClientConfiguration& config, + const TString& bucket) : Config(config) , Credentials(credentials) { @@ -220,6 +223,7 @@ Aws::S3::Model::StorageClass TS3ExternalStorageConfig::ConvertStorageClass(const case Ydb::Export::ExportToS3Settings::OUTPOSTS: return Aws::S3::Model::StorageClass::OUTPOSTS; case Ydb::Export::ExportToS3Settings::STORAGE_CLASS_UNSPECIFIED: + [[fallthrough]]; default: return Aws::S3::Model::StorageClass::NOT_SET; } diff --git a/ydb/core/wrappers/s3_storage_config.h b/ydb/core/wrappers/s3_storage_config.h index b276bd79b363..8623f1176038 100644 --- a/ydb/core/wrappers/s3_storage_config.h +++ b/ydb/core/wrappers/s3_storage_config.h @@ -12,9 +12,6 @@ #include -#include -#include - namespace NKikimr::NWrappers::NExternalStorage { class TS3ExternalStorageConfig: public IExternalStorageConfig { @@ -31,9 +28,11 @@ class TS3ExternalStorageConfig: public IExternalStorageConfig { static Aws::Auth::AWSCredentials CredentialsFromSettings(const Ydb::Import::ImportFromS3Settings& settings); static Aws::Client::ClientConfiguration ConfigFromSettings(const Ydb::Export::ExportToS3Settings& settings); static Aws::Auth::AWSCredentials CredentialsFromSettings(const Ydb::Export::ExportToS3Settings& settings); + protected: virtual TString DoGetStorageId() const override; virtual IExternalStorageOperator::TPtr DoConstructStorageOperator(bool verbose) const override; + public: static Aws::S3::Model::StorageClass ConvertStorageClass(const Ydb::Export::ExportToS3Settings::StorageClass storage); @@ -50,6 +49,7 @@ class TS3ExternalStorageConfig: public IExternalStorageConfig { TS3ExternalStorageConfig(const Ydb::Export::ExportToS3Settings& settings); TS3ExternalStorageConfig(const Aws::Auth::AWSCredentials& credentials, const Aws::Client::ClientConfiguration& config, const TString& bucket); }; + } // NKikimr::NWrappers::NExternalStorage #endif // KIKIMR_DISABLE_S3_OPS diff --git a/ydb/core/wrappers/s3_wrapper.cpp b/ydb/core/wrappers/s3_wrapper.cpp index 025fb9eef4b7..dc2fbc4f3a1a 100644 --- a/ydb/core/wrappers/s3_wrapper.cpp +++ b/ydb/core/wrappers/s3_wrapper.cpp @@ -4,27 +4,15 @@ #include "s3_wrapper.h" -#include -#include -#include -#include -#include - #include #include -#include - -#include -#include -#include namespace NKikimr::NWrappers { namespace NExternalStorage { class TS3Wrapper: public TActor { - - template + template void Handle(T& ev) { StorageOperator->Execute(ev); } @@ -54,7 +42,7 @@ class TS3Wrapper: public TActor { hFunc(TEvCheckObjectExistsRequest, Handle); hFunc(TEvUploadPartCopyRequest, Handle); - cFunc(TEvents::TEvPoison::EventType, PassAway); + sFunc(TEvents::TEvPoison, PassAway); } } diff --git a/ydb/core/wrappers/s3_wrapper.h b/ydb/core/wrappers/s3_wrapper.h index aa4c35bb007d..4f2c5a62b3af 100644 --- a/ydb/core/wrappers/s3_wrapper.h +++ b/ydb/core/wrappers/s3_wrapper.h @@ -5,4 +5,5 @@ namespace NKikimr::NWrappers { IActor* CreateS3Wrapper(NExternalStorage::IExternalStorageOperator::TPtr storage); + } // NKikimr::NWrappers