diff --git a/ydb/core/grpc_services/rpc_ping.cpp b/ydb/core/grpc_services/rpc_ping.cpp index 7198cd7d056d..16309051c903 100644 --- a/ydb/core/grpc_services/rpc_ping.cpp +++ b/ydb/core/grpc_services/rpc_ping.cpp @@ -217,6 +217,205 @@ class TExecuteTxProxyPingRPC : public TActorBootstrapped std::shared_ptr Request_; }; +//////////////////////////////////////////////////////////////////////////////// + +using TEvActorChainRequest = TGrpcRequestNoOperationCall; + +// creates a chain of TChainWorkerActor workers and replies, when they finish +class TExecuteActorChainPingRPC : public TActorBootstrapped { +private: + struct TEvPrivate { + enum EEv { + EvChainItemComplete = EventSpaceBegin(NActors::TEvents::ES_PRIVATE), + EvEnd + }; + + struct TEvChainItemComplete : TEventLocal { + }; + }; + + class TChainWorkerActor : public TActorBootstrapped { + public: + static constexpr NKikimrServices::TActivity::EType ActorActivityType() { + return NKikimrServices::TActivity::OTHER; + } + + TChainWorkerActor(const TActorId& prevActor, size_t chainsLeft, size_t workUsec, bool noTailChain) + : ReplyTo(prevActor) + , ChainsLeft(chainsLeft) + , WorkUsec(workUsec) + , NoTailChain(noTailChain) + {} + + void Bootstrap(const TActorContext &ctx) { + this->Become(&TThis::StateWork); + + Proceed(ctx); + } + + private: + void StateWork(TAutoPtr& ev) { + try { + switch (ev->GetTypeRewrite()) { + HFunc(TEvPrivate::TEvChainItemComplete, Handle); + default: + UnexpectedEvent(__func__, ev); + } + } catch (const yexception& ex) { + InternalError(ex.what()); + } + } + + void Proceed(const TActorContext &ctx) { + LOG_TRACE_S(ctx, NKikimrServices::RPC_REQUEST, SelfId() << " chain worker with " + << ChainsLeft << " next chains and " << ReplyTo << " prev actor bootstrapped"); + + if (ChainsLeft == 0) { + WorkAndDie(); + return; + } + + auto* nextChainActor = new TChainWorkerActor(SelfId(), ChainsLeft - 1, WorkUsec, NoTailChain); + if (NoTailChain) { + RegisterWithSameMailbox(nextChainActor); + } else { + // same mailbox + tail + TlsActivationContext->ExecutorThread.RegisterActor( + nextChainActor, + &TlsActivationContext->Mailbox, + this->SelfId()); + } + } + + void WorkAndDie() { + // assume that 1 iteration is ~ 0.5 ns + size_t iterations = WorkUsec * 1000 / 2; + volatile size_t dummy = 0; + for (size_t i = 0; i < iterations; ++i) { + ++dummy; + } + + if (NoTailChain) { + Send(ReplyTo, new TEvPrivate::TEvChainItemComplete()); + } else { + Send(ReplyTo, new TEvPrivate::TEvChainItemComplete()); + } + + PassAway(); + } + + void Handle(TEvPrivate::TEvChainItemComplete::TPtr&, const TActorContext& ctx) { + LOG_TRACE_S(ctx, NKikimrServices::RPC_REQUEST, SelfId() << " chain worker with " + << ChainsLeft << " next chains and " << ReplyTo << " prev actor got chain reply"); + + WorkAndDie(); + } + + void InternalError(const TString& message) { + ALOG_ERROR(NKikimrServices::RPC_REQUEST, "Internal error, message: " << message); + Send(ReplyTo, new TEvPrivate::TEvChainItemComplete()); + PassAway(); + } + + void UnexpectedEvent(const TString& state, TAutoPtr& ev) { + InternalError(TStringBuilder() << "TChainWorkerActor in state " << state << " received unexpected event " + << ev->GetTypeName() << Sprintf("(0x%08" PRIx32 ")", ev->GetTypeRewrite())); + } + + private: + TActorId ReplyTo; + size_t ChainsLeft; + size_t WorkUsec; + bool NoTailChain; + }; + +public: + static constexpr NKikimrServices::TActivity::EType ActorActivityType() { + return NKikimrServices::TActivity::OTHER; + } + + TExecuteActorChainPingRPC(TEvActorChainRequest* request) + : Request_(request) + {} + + void Bootstrap(const TActorContext &ctx) { + this->Become(&TThis::StateWork); + + Proceed(ctx); + } + +private: + void StateWork(TAutoPtr& ev) { + try { + switch (ev->GetTypeRewrite()) { + HFunc(TEvPrivate::TEvChainItemComplete, Handle); + default: + UnexpectedEvent(__func__, ev); + } + } catch (const yexception& ex) { + InternalError(ex.what()); + } + } + + void Proceed(const TActorContext &ctx) { + const auto* req = Request_->GetProtoRequest(); + size_t chainLength = req->GetChainLength(); + if (chainLength == 0) { + chainLength = 10; + } + size_t workUsec = req->GetWorkUsec(); + if (workUsec == 0) { + workUsec = 5; + } + + bool noTailChain = req->GetNoTailChain(); + + LOG_TRACE_S(ctx, NKikimrServices::RPC_REQUEST, this->SelfId() + << " sending ping to ActorChain of length " << chainLength << " with work " << workUsec << " usec"); + + auto* nextChainActor = new TChainWorkerActor(this->SelfId(), chainLength, workUsec, noTailChain); + TActorId nextChainActorId; + if (noTailChain) { + nextChainActorId = RegisterWithSameMailbox(nextChainActor); + } else { + // same mailbox + tail + nextChainActorId = TlsActivationContext->ExecutorThread.RegisterActor( + nextChainActor, + &TlsActivationContext->Mailbox, + this->SelfId()); + } + + if (!nextChainActorId) { + LOG_ERROR_S(ctx, NKikimrServices::RPC_REQUEST, this->SelfId() << " failed to send ping to ActorChain"); + ReplyWithResult(StatusIds::INTERNAL_ERROR, ctx); + } + } + + void Handle(TEvPrivate::TEvChainItemComplete::TPtr&, const TActorContext& ctx) { + LOG_TRACE_S(ctx, NKikimrServices::RPC_REQUEST, this->SelfId() << " got ping response from full ActorChain"); + ReplyWithResult(StatusIds::SUCCESS, ctx); + } + +private: + void ReplyWithResult(StatusIds::StatusCode status, const TActorContext &ctx) { + Request_->ReplyWithYdbStatus(status); + Die(ctx); + } + + void InternalError(const TString& message) { + ALOG_ERROR(NKikimrServices::RPC_REQUEST, "Internal error, message: " << message); + ReplyWithResult(StatusIds::INTERNAL_ERROR, TActivationContext::AsActorContext()); + } + + void UnexpectedEvent(const TString& state, TAutoPtr& ev) { + InternalError(TStringBuilder() << "TExecuteActorChainPingRPC in state " << state << " received unexpected event " + << ev->GetTypeName() << Sprintf("(0x%08" PRIx32 ")", ev->GetTypeRewrite())); + } + +private: + std::shared_ptr Request_; +}; + } // anonymous //////////////////////////////////////////////////////////////////////////////// @@ -248,4 +447,10 @@ void DoTxProxyPing(std::unique_ptr p, const IFacilityProvider& f.RegisterActor(new TExecuteTxProxyPingRPC(request)); } +void DoActorChainPing(std::unique_ptr p, const IFacilityProvider& f) { + auto* request = dynamic_cast(p.release()); + Y_ABORT_UNLESS(request != nullptr, "Wrong using of TGRpcRequestWrapper in DoActorChainPing"); + f.RegisterActor(new TExecuteActorChainPingRPC(request)); +} + } // namespace NKikimr::NGRpcService diff --git a/ydb/core/grpc_services/service_debug.h b/ydb/core/grpc_services/service_debug.h index df73e870d829..29a0285ee3c2 100644 --- a/ydb/core/grpc_services/service_debug.h +++ b/ydb/core/grpc_services/service_debug.h @@ -12,5 +12,6 @@ void DoGrpcProxyPing(std::unique_ptr p, const IFacilityProvider void DoKqpPing(std::unique_ptr p, const IFacilityProvider& f); void DoSchemeCachePing(std::unique_ptr p, const IFacilityProvider& f); void DoTxProxyPing(std::unique_ptr p, const IFacilityProvider& f); +void DoActorChainPing(std::unique_ptr p, const IFacilityProvider& f); } // namespace NKikimr::NGRpcService diff --git a/ydb/core/jaeger_tracing/request_discriminator.h b/ydb/core/jaeger_tracing/request_discriminator.h index f3249d0d64c7..fac53d9ea391 100644 --- a/ydb/core/jaeger_tracing/request_discriminator.h +++ b/ydb/core/jaeger_tracing/request_discriminator.h @@ -78,6 +78,7 @@ enum class ERequestType: size_t { PING_KQP, PING_SCHEME_CACHE, PING_TX_PROXY, + PING_ACTOR_CHAIN, // Requests inside write session TOPIC_STREAMWRITE, diff --git a/ydb/public/api/grpc/ydb_debug_v1.proto b/ydb/public/api/grpc/ydb_debug_v1.proto index 7d3b414338f5..87692e405260 100644 --- a/ydb/public/api/grpc/ydb_debug_v1.proto +++ b/ydb/public/api/grpc/ydb_debug_v1.proto @@ -11,4 +11,5 @@ service DebugService { rpc PingKqpProxy(Debug.KqpProxyRequest) returns (Debug.KqpProxyResponse); rpc PingSchemeCache(Debug.SchemeCacheRequest) returns (Debug.SchemeCacheResponse); rpc PingTxProxy(Debug.TxProxyRequest) returns (Debug.TxProxyResponse); + rpc PingActorChain(Debug.ActorChainRequest) returns (Debug.ActorChainResponse); } diff --git a/ydb/public/api/protos/ydb_debug.proto b/ydb/public/api/protos/ydb_debug.proto index 163d578e3496..4870923badeb 100644 --- a/ydb/public/api/protos/ydb_debug.proto +++ b/ydb/public/api/protos/ydb_debug.proto @@ -57,3 +57,22 @@ message TxProxyResponse { StatusIds.StatusCode status = 1; repeated Ydb.Issue.IssueMessage issues = 2; } + +// Ping Actor Chain + +message ActorChainRequest { + + // number of actors to be created, default 10 + uint32 ChainLength = 1; + + // immitate work duration for each actor (approximate), default ~ 5 usec + uint32 WorkUsec = 2; + + // don't use tail sends and registrations + bool NoTailChain = 3; +} + +message ActorChainResponse { + StatusIds.StatusCode status = 1; + repeated Ydb.Issue.IssueMessage issues = 2; +} diff --git a/ydb/public/lib/ydb_cli/commands/ydb_latency.cpp b/ydb/public/lib/ydb_cli/commands/ydb_latency.cpp index 6b90f0eece69..98fd96a70b74 100644 --- a/ydb/public/lib/ydb_cli/commands/ydb_latency.cpp +++ b/ydb/public/lib/ydb_cli/commands/ydb_latency.cpp @@ -113,7 +113,10 @@ void Evaluate( } else { ++result.ErrorCount; } - } catch (...) { + } catch (yexception ex) { + TStringStream ss; + ss << "Failed to perform request: " << ex.what() << Endl; + Cerr << ss.Str(); ++result.ErrorCount; } } @@ -143,8 +146,13 @@ TCommandLatency::TCommandLatency() , Format(DEFAULT_FORMAT) , RunKind(DEFAULT_RUN_KIND) , Percentile(DEFAULT_PERCENTILE) + , ChainConfig(new NDebug::TActorChainPingSettings()) {} +TCommandLatency::~TCommandLatency() { + delete ChainConfig; +} + void TCommandLatency::Config(TConfig& config) { TYdbCommand::Config(config); @@ -164,8 +172,19 @@ void TCommandLatency::Config(TConfig& config) { 'f', "format", TStringBuilder() << "Output format. Available options: " << availableFormats) .OptionalArgument("STRING").StoreResult(&Format).DefaultValue(DEFAULT_FORMAT); config.Opts->AddLongOption( - 'k', "kind", TStringBuilder() << "Use only specified ping kind. Available options: "<< availableKinds) + 'k', "kind", TStringBuilder() << "Use only specified ping kind. Available options: " << availableKinds) .OptionalArgument("STRING").StoreResult(&RunKind).DefaultValue(DEFAULT_RUN_KIND); + + // actor chain options + config.Opts->AddLongOption( + "chain-length", TStringBuilder() << "Chain length (ActorChain kind only)") + .OptionalArgument("INT").StoreResult(&ChainConfig->ChainLength).DefaultValue(ChainConfig->ChainLength); + config.Opts->AddLongOption( + "chain-work-usec", TStringBuilder() << "Amount of work for each actor in the chain (ActorChain kind only)") + .OptionalArgument("INT").StoreResult(&ChainConfig->WorkUsec).DefaultValue(ChainConfig->WorkUsec); + config.Opts->AddLongOption( + "no-tail-chain", TStringBuilder() << "Don't use Tail sends and registrations (ActorChain kind only)") + .NoArgument().SetFlag(&ChainConfig->NoTailChain).DefaultValue(ChainConfig->NoTailChain); } void TCommandLatency::Parse(TConfig& config) { @@ -218,6 +237,13 @@ int TCommandLatency::Run(TConfig& config) { }; }; + auto chainConfig = *ChainConfig; + auto txActorChainPingFactory = [debugClient, chainConfig] () { + return [debugClient, chainConfig] () { + return TCommandPing::PingActorChain(*debugClient, chainConfig); + }; + }; + using TTaskPair = std::pair; const std::vector allTasks = { { TCommandPing::EPingKind::PlainGrpc, plainGrpcPingFactory }, @@ -226,6 +252,7 @@ int TCommandLatency::Run(TConfig& config) { { TCommandPing::EPingKind::Select1, select1Factory }, { TCommandPing::EPingKind::SchemeCache, schemeCachePingFactory }, { TCommandPing::EPingKind::TxProxy, txProxyPingFactory }, + { TCommandPing::EPingKind::ActorChain, txActorChainPingFactory }, }; std::vector runTasks; diff --git a/ydb/public/lib/ydb_cli/commands/ydb_latency.h b/ydb/public/lib/ydb_cli/commands/ydb_latency.h index c58b4d3790ee..18fe0d5b05cc 100644 --- a/ydb/public/lib/ydb_cli/commands/ydb_latency.h +++ b/ydb/public/lib/ydb_cli/commands/ydb_latency.h @@ -22,6 +22,7 @@ class TCommandLatency public: TCommandLatency(); + ~TCommandLatency(); virtual void Config(TConfig& config) override; virtual void Parse(TConfig& config) override; @@ -33,6 +34,8 @@ class TCommandLatency EFormat Format; TCommandPing::EPingKind RunKind; double Percentile; + + NDebug::TActorChainPingSettings* ChainConfig; }; } // NYdb::NConsoleClient diff --git a/ydb/public/lib/ydb_cli/commands/ydb_ping.cpp b/ydb/public/lib/ydb_cli/commands/ydb_ping.cpp index 94c0ed9ae7be..9403507e82a2 100644 --- a/ydb/public/lib/ydb_cli/commands/ydb_ping.cpp +++ b/ydb/public/lib/ydb_cli/commands/ydb_ping.cpp @@ -20,6 +20,7 @@ const TVector PingKindDescriptions = { "ping executes a very simple 'SELECT 1;' query", "ping goes through GRPC layer to SchemeCache and returns", "ping goes through GRPC layer to TxProxy and allocates TxId", + "ping goes through GRPC layer including GRPC proxy and creates dummy actors chain to process request" }; } // anonymous @@ -104,6 +105,9 @@ int TCommandPing::RunCommand(TConfig& config) { case EPingKind::TxProxy: isOk = PingTxProxy(pingClient); break; + case EPingKind::ActorChain: + isOk = PingActorChain(pingClient, NDebug::TActorChainPingSettings()); + break; default: std::cerr << "Unknown ping kind" << std::endl; return EXIT_FAILURE; @@ -138,6 +142,18 @@ int TCommandPing::RunCommand(TConfig& config) { return 0; } +bool CheckResult(const TStatus& status) { + if (status.IsSuccess()) { + return true; + } + + for (const auto& issue: status.GetIssues()) { + Cerr << "Error: " << issue.ToString(true) << Endl; + } + + return false; +} + bool TCommandPing::PingPlainGrpc(NDebug::TDebugClient& client) { auto asyncResult = client.PingPlainGrpc(NDebug::TPlainGrpcPingSettings()); asyncResult.GetValueSync(); @@ -149,44 +165,35 @@ bool TCommandPing::PingPlainKqp(NDebug::TDebugClient& client) { auto asyncResult = client.PingKqpProxy(NDebug::TKqpProxyPingSettings()); auto result = asyncResult.GetValueSync(); - if (result.IsSuccess()) { - return true; - } - - return false; + return CheckResult(result); } bool TCommandPing::PingGrpcProxy(NDebug::TDebugClient& client) { auto asyncResult = client.PingGrpcProxy(NDebug::TGrpcProxyPingSettings()); auto result = asyncResult.GetValueSync(); - if (result.IsSuccess()) { - return true; - } - - return false; + return CheckResult(result); } bool TCommandPing::PingSchemeCache(NDebug::TDebugClient& client) { auto asyncResult = client.PingSchemeCache(NDebug::TSchemeCachePingSettings()); auto result = asyncResult.GetValueSync(); - if (result.IsSuccess()) { - return true; - } - - return false; + return CheckResult(result); } bool TCommandPing::PingTxProxy(NDebug::TDebugClient& client) { auto asyncResult = client.PingTxProxy(NDebug::TTxProxyPingSettings()); auto result = asyncResult.GetValueSync(); - if (result.IsSuccess()) { - return true; - } + return CheckResult(result); +} - return false; +bool TCommandPing::PingActorChain(NDebug::TDebugClient& client, const NDebug::TActorChainPingSettings& settings) { + auto asyncResult = client.PingActorChain(settings); + auto result = asyncResult.GetValueSync(); + + return CheckResult(result); } bool TCommandPing::PingKqpSelect1(NQuery::TQueryClient& client, const TString& query) { diff --git a/ydb/public/lib/ydb_cli/commands/ydb_ping.h b/ydb/public/lib/ydb_cli/commands/ydb_ping.h index 27065f8f0dbe..49d554c2d5dc 100644 --- a/ydb/public/lib/ydb_cli/commands/ydb_ping.h +++ b/ydb/public/lib/ydb_cli/commands/ydb_ping.h @@ -14,6 +14,7 @@ namespace NQuery { namespace NDebug { class TDebugClient; + struct TActorChainPingSettings; }; namespace NConsoleClient { @@ -29,6 +30,7 @@ class TCommandPing : public TYdbCommand, public TCommandWithFormat, Select1, SchemeCache, TxProxy, + ActorChain, AllKinds, }; @@ -43,6 +45,7 @@ class TCommandPing : public TYdbCommand, public TCommandWithFormat, static bool PingGrpcProxy(NDebug::TDebugClient& client); static bool PingSchemeCache(NDebug::TDebugClient& client); static bool PingTxProxy(NDebug::TDebugClient& client); + static bool PingActorChain(NDebug::TDebugClient& client, const NDebug::TActorChainPingSettings& settings); static bool PingKqpSelect1(NQuery::TQueryClient& client, const TString& query); static bool PingKqpSelect1(NQuery::TSession& session, const TString& query); diff --git a/ydb/public/sdk/cpp/client/ydb_debug/client.cpp b/ydb/public/sdk/cpp/client/ydb_debug/client.cpp index 0749645dd847..8b14ede26a95 100644 --- a/ydb/public/sdk/cpp/client/ydb_debug/client.cpp +++ b/ydb/public/sdk/cpp/client/ydb_debug/client.cpp @@ -40,6 +40,28 @@ class TDebugClient::TImpl: public TClientImplCommon { return pingPromise; } + auto PingActorChain(const TActorChainPingSettings& settings) { + auto pingPromise = NewPromise(); + auto responseCb = [pingPromise] (Debug::ActorChainResponse*, TPlainStatus status) mutable { + TActorChainPingResult val(TStatus(std::move(status))); + pingPromise.SetValue(std::move(val)); + }; + + Debug::ActorChainRequest request; + request.SetChainLength(settings.ChainLength); + request.SetWorkUsec(settings.WorkUsec); + request.SetNoTailChain(settings.NoTailChain); + + Connections_->Run( + std::move(request), + responseCb, + &Debug::V1::DebugService::Stub::AsyncPingActorChain, + DbDriverState_, + TRpcRequestSettings::Make(settings)); + + return pingPromise; + } + ~TImpl() = default; }; @@ -74,4 +96,8 @@ TAsyncTxProxyPingResult TDebugClient::PingTxProxy(const TTxProxyPingSettings& se settings, &Debug::V1::DebugService::Stub::AsyncPingTxProxy); } +TAsyncActorChainPingResult TDebugClient::PingActorChain(const TActorChainPingSettings& settings) { + return Impl_->PingActorChain(settings); +} + } // namespace NYdb::NDebug \ No newline at end of file diff --git a/ydb/public/sdk/cpp/client/ydb_debug/client.h b/ydb/public/sdk/cpp/client/ydb_debug/client.h index 9e0b4581a7a3..1643fd050bea 100644 --- a/ydb/public/sdk/cpp/client/ydb_debug/client.h +++ b/ydb/public/sdk/cpp/client/ydb_debug/client.h @@ -41,6 +41,13 @@ class TTxProxyPingResult: public TStatus { {} }; +class TActorChainPingResult: public TStatus { +public: + TActorChainPingResult(TStatus&& status) + : TStatus(std::move(status)) + {} +}; + //////////////////////////////////////////////////////////////////////////////// using TAsyncPlainGrpcPingResult = NThreading::TFuture; @@ -48,6 +55,7 @@ using TAsyncGrpcProxyPingResult = NThreading::TFuture; using TAsyncKqpProxyPingResult = NThreading::TFuture; using TAsyncSchemeCachePingResult = NThreading::TFuture; using TAsyncTxProxyPingResult = NThreading::TFuture; +using TAsyncActorChainPingResult = NThreading::TFuture; //////////////////////////////////////////////////////////////////////////////// @@ -57,6 +65,12 @@ struct TKqpProxyPingSettings : public TOperationRequestSettings {}; struct TTxProxyPingSettings : public TOperationRequestSettings {}; +struct TActorChainPingSettings : public TOperationRequestSettings { + size_t ChainLength = 10; + size_t WorkUsec = 5; + bool NoTailChain = false; +}; + //////////////////////////////////////////////////////////////////////////////// struct TClientSettings : public TCommonClientSettingsBase { @@ -75,6 +89,8 @@ class TDebugClient { TAsyncSchemeCachePingResult PingSchemeCache(const TSchemeCachePingSettings& settings); TAsyncTxProxyPingResult PingTxProxy(const TTxProxyPingSettings& settings); + TAsyncActorChainPingResult PingActorChain(const TActorChainPingSettings& settings); + private: class TImpl; std::shared_ptr Impl_; diff --git a/ydb/services/ydb/ydb_debug.cpp b/ydb/services/ydb/ydb_debug.cpp index 194f609b9ece..b4d3b2205162 100644 --- a/ydb/services/ydb/ydb_debug.cpp +++ b/ydb/services/ydb/ydb_debug.cpp @@ -70,6 +70,7 @@ void TGRpcYdbDebugService::SetupIncomingRequests(NYdbGrpc::TLoggerPtr logger) { ADD_REQUEST(PingKqpProxy, KqpProxyRequest, KqpProxyResponse, DoKqpPing, KQP); ADD_REQUEST(PingSchemeCache, SchemeCacheRequest, SchemeCacheResponse, DoSchemeCachePing, SCHEME_CACHE); ADD_REQUEST(PingTxProxy, TxProxyRequest, TxProxyResponse, DoTxProxyPing, TX_PROXY); + ADD_REQUEST(PingActorChain, ActorChainRequest, ActorChainResponse, DoActorChainPing, ACTOR_CHAIN); #undef ADD_REQUEST