diff --git a/ydb/public/lib/ydb_cli/commands/ya.make b/ydb/public/lib/ydb_cli/commands/ya.make index 8d97f61bb8e1..cccbd4fc5e70 100644 --- a/ydb/public/lib/ydb_cli/commands/ya.make +++ b/ydb/public/lib/ydb_cli/commands/ya.make @@ -12,6 +12,7 @@ SRCS( ydb_cluster.cpp ydb_debug.cpp ydb_dynamic_config.cpp + ydb_latency.cpp ydb_ping.cpp ydb_profile.cpp ydb_root_common.cpp @@ -72,6 +73,7 @@ PEERDIR( ) GENERATE_ENUM_SERIALIZATION(ydb_ping.h) +GENERATE_ENUM_SERIALIZATION(ydb_latency.h) END() diff --git a/ydb/public/lib/ydb_cli/commands/ydb_debug.cpp b/ydb/public/lib/ydb_cli/commands/ydb_debug.cpp index a78e0226c1e0..c2160b320f35 100644 --- a/ydb/public/lib/ydb_cli/commands/ydb_debug.cpp +++ b/ydb/public/lib/ydb_cli/commands/ydb_debug.cpp @@ -1,5 +1,6 @@ #include "ydb_debug.h" +#include "ydb_latency.h" #include "ydb_ping.h" namespace NYdb::NConsoleClient { @@ -7,6 +8,7 @@ namespace NYdb::NConsoleClient { TCommandDebug::TCommandDebug() : TClientCommandTree("debug", {}, "YDB cluster debug operations") { + AddCommand(std::make_unique()); AddCommand(std::make_unique()); } diff --git a/ydb/public/lib/ydb_cli/commands/ydb_latency.cpp b/ydb/public/lib/ydb_cli/commands/ydb_latency.cpp new file mode 100644 index 000000000000..6b90f0eece69 --- /dev/null +++ b/ydb/public/lib/ydb_cli/commands/ydb_latency.cpp @@ -0,0 +1,343 @@ +#include "ydb_latency.h" + +#include +#include + +#include + +#include +#include + +#include +#include +#include +#include + +namespace NYdb::NConsoleClient { + +namespace { + +constexpr int DEFAULT_WARMUP_SECONDS = 1; +constexpr int DEFAULT_INTERVAL_SECONDS = 5; +constexpr int DEFAULT_MAX_INFLIGHT = 128; +constexpr int DEFAULT_PERCENTILE = 99.0; + +constexpr TCommandLatency::EFormat DEFAULT_FORMAT = TCommandLatency::EFormat::Plain; +constexpr TCommandPing::EPingKind DEFAULT_RUN_KIND = TCommandPing::EPingKind::AllKinds; + +// 1-16, 32, 64, ... +constexpr int INCREMENT_UNTIL_THREAD_COUNT = 16; + +const TString QUERY = "SELECT 1;"; + +// Factory returns callable, which makes requests to the DB. +// In case of QueryService, this callable contains associated session. +using TRequestMaker = std::function; +using TCallableFactory = std::function; + +struct TResult { + TCommandPing::EPingKind Kind; + int ThreadCount = 0; + int LatencyUs = 0; + int Throughput = 0; +}; + +struct alignas(64) TEvaluateResult { + TEvaluateResult() + : LatencyHistogramUs(1, 1024, 5) + { + } + + ui64 OkCount = 0; + ui64 ErrorCount = 0; + int LatencyUs = 0; + + NHdr::THistogram LatencyHistogramUs; +}; + +void Evaluate( + TEvaluateResult& total, + ui64 warmupSeconds, + ui64 intervalSeconds, + int threadCount, + TCallableFactory factory, + double percentile) +{ + std::atomic startMeasure{false}; + std::atomic stop{false}; + + // May delay for ~50ms to compute frequency, better to delay here + volatile auto clockRate = NHPTimer::GetClockRate(); + Y_UNUSED(clockRate); + + auto timer = std::thread([&startMeasure, &stop, warmupSeconds, intervalSeconds]() { + std::this_thread::sleep_for(std::chrono::seconds(warmupSeconds)); + startMeasure.store(true, std::memory_order_relaxed); + + std::this_thread::sleep_for(std::chrono::seconds(intervalSeconds)); + stop.store(true, std::memory_order_relaxed); + }); + + std::vector results(threadCount); + std::vector threads; + + for (int i = 0; i < threadCount; ++i) { + threads.emplace_back([i, &results, &startMeasure, &stop, &factory]() { + auto& result = results[i]; + + THPTimer timer; + + TRequestMaker requester; + try { + requester = factory(); + } catch (yexception ex) { + Cerr << "Failed to create request maker: " << ex.what() << Endl; + return; + } + + while (!startMeasure.load(std::memory_order_relaxed)) { + try { + requester(); + } catch (...) { + continue; + } + } + + while (!stop.load(std::memory_order_relaxed)) { + try { + timer.Reset(); + if (requester()) { + int usecPassed = static_cast(timer.Passed() * 1'000'000); + result.LatencyHistogramUs.RecordValue(usecPassed); + ++result.OkCount; + } else { + ++result.ErrorCount; + } + } catch (...) { + ++result.ErrorCount; + } + } + }); + } + + timer.join(); + for (auto& thread : threads) { + thread.join(); + } + + for (const auto& result: results) { + total.OkCount += result.OkCount; + total.ErrorCount += result.ErrorCount; + total.LatencyHistogramUs.Add(result.LatencyHistogramUs); + } + + total.LatencyUs = total.LatencyHistogramUs.GetValueAtPercentile(percentile); +} + +} // anonymous + +TCommandLatency::TCommandLatency() + : TYdbCommand("latency", {}, "Check basic latency with variable inflight") + , IntervalSeconds(DEFAULT_INTERVAL_SECONDS) + , MaxInflight(DEFAULT_MAX_INFLIGHT) + , Format(DEFAULT_FORMAT) + , RunKind(DEFAULT_RUN_KIND) + , Percentile(DEFAULT_PERCENTILE) +{} + +void TCommandLatency::Config(TConfig& config) { + TYdbCommand::Config(config); + + const TString& availableKinds = GetEnumAllNames(); + const TString& availableFormats = GetEnumAllNames(); + + config.Opts->AddLongOption( + 'i', "interval", TStringBuilder() << "Seconds for each latency kind") + .RequiredArgument("INT").StoreResult(&IntervalSeconds).DefaultValue(DEFAULT_INTERVAL_SECONDS); + config.Opts->AddLongOption( + 'm', "max-inflight", TStringBuilder() << "Max inflight") + .RequiredArgument("INT").StoreResult(&MaxInflight).DefaultValue(DEFAULT_MAX_INFLIGHT); + config.Opts->AddLongOption( + 'p', "percentile", TStringBuilder() << "Latency percentile") + .RequiredArgument("DOUBLE").StoreResult(&Percentile).DefaultValue(DEFAULT_PERCENTILE); + config.Opts->AddLongOption( + 'f', "format", TStringBuilder() << "Output format. Available options: " << availableFormats) + .OptionalArgument("STRING").StoreResult(&Format).DefaultValue(DEFAULT_FORMAT); + config.Opts->AddLongOption( + 'k', "kind", TStringBuilder() << "Use only specified ping kind. Available options: "<< availableKinds) + .OptionalArgument("STRING").StoreResult(&RunKind).DefaultValue(DEFAULT_RUN_KIND); +} + +void TCommandLatency::Parse(TConfig& config) { + TClientCommand::Parse(config); +} + +int TCommandLatency::Run(TConfig& config) { + TDriver driver = CreateDriver(config); + + SetInterruptHandlers(); + + auto debugClient = std::make_shared(driver); + auto queryClient = std::make_shared(driver); + + auto plainGrpcPingFactory = [debugClient] () { + return [debugClient] () { + return TCommandPing::PingPlainGrpc(*debugClient); + }; + }; + + auto grpcPingFactory = [debugClient] () { + return [debugClient] () { + return TCommandPing::PingGrpcProxy(*debugClient); + }; + }; + + auto plainKqpPingFactory = [debugClient] () { + return [debugClient] () { + return TCommandPing::PingPlainKqp(*debugClient); + }; + }; + + auto schemeCachePingFactory = [debugClient] () { + return [debugClient] () { + return TCommandPing::PingSchemeCache(*debugClient); + }; + }; + + auto txProxyPingFactory = [debugClient] () { + return [debugClient] () { + return TCommandPing::PingTxProxy(*debugClient); + }; + }; + + auto select1Factory = [queryClient] () { + // note, that each thread has own session + auto session = std::make_shared(queryClient->GetSession().GetValueSync().GetSession()); + return [session] () { + return TCommandPing::PingKqpSelect1(*session, QUERY); + }; + }; + + using TTaskPair = std::pair; + const std::vector allTasks = { + { TCommandPing::EPingKind::PlainGrpc, plainGrpcPingFactory }, + { TCommandPing::EPingKind::GrpcProxy, grpcPingFactory }, + { TCommandPing::EPingKind::PlainKqp, plainKqpPingFactory }, + { TCommandPing::EPingKind::Select1, select1Factory }, + { TCommandPing::EPingKind::SchemeCache, schemeCachePingFactory }, + { TCommandPing::EPingKind::TxProxy, txProxyPingFactory }, + }; + + std::vector runTasks; + if (RunKind == TCommandPing::EPingKind::AllKinds) { + runTasks = allTasks; + } else { + for (size_t i = 0; i < allTasks.size(); ++i) { + if (allTasks[i].first == RunKind) { + runTasks = { allTasks[i] }; + break; + } + } + } + + if (runTasks.empty()) { + return -1; // sanity check, never happens + } + + std::vector results; + for (const auto& [taskKind, factory]: runTasks) { + for (int threadCount = 1; threadCount <= MaxInflight && !IsInterrupted(); ) { + TEvaluateResult result; + Evaluate(result, DEFAULT_WARMUP_SECONDS, IntervalSeconds, threadCount, factory, Percentile); + + bool skip = false; + if (result.ErrorCount) { + auto totalRequests = result.ErrorCount + result.OkCount; + double errorsPercent = 100.0 * result.ErrorCount / totalRequests; + if (errorsPercent >= 1) { + Cerr << "Skipping " << taskKind << ", threads=" << threadCount + << ": error rate=" << errorsPercent << "%" << Endl; + skip = true; + } + } + + if (!skip) { + ui64 throughput = result.OkCount / IntervalSeconds; + ui64 throughputPerThread = throughput / threadCount; + ui64 latencyUsec = result.LatencyUs; + + results.emplace_back(taskKind, threadCount, latencyUsec, throughput); + + if (Format == EFormat::Plain) { + Cout << taskKind << " threads=" << threadCount + << ", throughput: " << throughput + << ", per thread: " << throughputPerThread + << ", latency p" << Percentile << " usec: " << latencyUsec + << ", ok: " << result.OkCount + << ", error: " << result.ErrorCount << Endl; + } + } + + if (threadCount < INCREMENT_UNTIL_THREAD_COUNT) { + ++threadCount; + } else { + threadCount *= 2; + } + } + } + + if (Format == EFormat::Plain) { + return 0; + } + + TMap> latencies; + TMap> throughputs; + for (const auto& result: results) { + latencies[result.Kind].push_back(result.LatencyUs); + throughputs[result.Kind].push_back(result.Throughput); + } + + if (Format == EFormat::CSV) { + const int maxThreadsMeasured = results.back().ThreadCount; + + Cout << Endl; + Cout << "Latencies" << Endl; + + TStringStream ss; + ss << "Kind"; + for (int i = 1; i <= maxThreadsMeasured;) { + ss << "," << i; + if (i < INCREMENT_UNTIL_THREAD_COUNT) { + ++i; + } else { + i *= 2; + } + } + ss << Endl; + TString header = ss.Str(); + + Cout << header; + for (const auto& [kind, vec]: latencies) { + Cout << kind; + for (auto value: vec) { + Cout << "," << value; + } + Cout << Endl; + } + Cout << Endl; + + Cout << "Througputs" << Endl; + Cout << header; + for (const auto& [kind, vec]: throughputs) { + Cout << kind; + for (auto value: vec) { + Cout << "," << value; + } + Cout << Endl; + } + } + + return 0; +} + +} // NYdb::NConsoleClient diff --git a/ydb/public/lib/ydb_cli/commands/ydb_latency.h b/ydb/public/lib/ydb_cli/commands/ydb_latency.h new file mode 100644 index 000000000000..c58b4d3790ee --- /dev/null +++ b/ydb/public/lib/ydb_cli/commands/ydb_latency.h @@ -0,0 +1,38 @@ +#pragma once + +#include "ydb_command.h" + +#include "ydb_ping.h" + +#include +#include + +namespace NYdb::NConsoleClient { + +class TCommandLatency + : public TYdbCommand + , public TCommandWithFormat + , public TInterruptibleCommand +{ +public: + enum class EFormat { + Plain = 0, + CSV, + }; + +public: + TCommandLatency(); + + virtual void Config(TConfig& config) override; + virtual void Parse(TConfig& config) override; + virtual int Run(TConfig& config) override; + +private: + int IntervalSeconds; + int MaxInflight; + EFormat Format; + TCommandPing::EPingKind RunKind; + double Percentile; +}; + +} // NYdb::NConsoleClient diff --git a/ydb/public/lib/ydb_cli/commands/ydb_ping.cpp b/ydb/public/lib/ydb_cli/commands/ydb_ping.cpp index 81d24b93ac14..94c0ed9ae7be 100644 --- a/ydb/public/lib/ydb_cli/commands/ydb_ping.cpp +++ b/ydb/public/lib/ydb_cli/commands/ydb_ping.cpp @@ -224,4 +224,24 @@ bool TCommandPing::PingKqpSelect1(NQuery::TQueryClient& client, const TString& q return false; } +bool TCommandPing::PingKqpSelect1(NQuery::TSession& session, const TString& query) { + NQuery::TExecuteQuerySettings settings; + + // Execute query + settings.ExecMode(NQuery::EExecMode::Execute); + settings.StatsMode(NQuery::EStatsMode::None); + + settings.Syntax(NQuery::ESyntax::YqlV1); + + // Execute query without parameters + auto asyncResult = session.ExecuteQuery( + query, + NQuery::TTxControl::NoTx(), + settings + ); + + auto result = asyncResult.GetValueSync(); + return result.IsSuccess(); +} + } // NYdb::NConsoleClient diff --git a/ydb/public/lib/ydb_cli/commands/ydb_ping.h b/ydb/public/lib/ydb_cli/commands/ydb_ping.h index 557ee0c2ddd5..27065f8f0dbe 100644 --- a/ydb/public/lib/ydb_cli/commands/ydb_ping.h +++ b/ydb/public/lib/ydb_cli/commands/ydb_ping.h @@ -1,7 +1,5 @@ #pragma once -#pragma once - #include "ydb_command.h" #include @@ -10,8 +8,8 @@ namespace NYdb { namespace NQuery { - class TExecuteQueryIterator; class TQueryClient; + class TSession; } // namespace NQuery namespace NDebug { @@ -31,6 +29,7 @@ class TCommandPing : public TYdbCommand, public TCommandWithFormat, Select1, SchemeCache, TxProxy, + AllKinds, }; public: @@ -39,17 +38,17 @@ class TCommandPing : public TYdbCommand, public TCommandWithFormat, virtual void Parse(TConfig& config) override; virtual int Run(TConfig& config) override; -private: - int RunCommand(TConfig& config); - int PrintResponse(NQuery::TExecuteQueryIterator& result); + static bool PingPlainGrpc(NDebug::TDebugClient& client); + static bool PingPlainKqp(NDebug::TDebugClient& client); + static bool PingGrpcProxy(NDebug::TDebugClient& client); + static bool PingSchemeCache(NDebug::TDebugClient& client); + static bool PingTxProxy(NDebug::TDebugClient& client); - bool PingPlainGrpc(NDebug::TDebugClient& client); - bool PingPlainKqp(NDebug::TDebugClient& client); - bool PingGrpcProxy(NDebug::TDebugClient& client); - bool PingSchemeCache(NDebug::TDebugClient& client); - bool PingTxProxy(NDebug::TDebugClient& client); + static bool PingKqpSelect1(NQuery::TQueryClient& client, const TString& query); + static bool PingKqpSelect1(NQuery::TSession& session, const TString& query); - bool PingKqpSelect1(NQuery::TQueryClient& client, const TString& query); +private: + int RunCommand(TConfig& config); private: int Count;