Skip to content

Commit

Permalink
Support progress stats for workload (#13725)
Browse files Browse the repository at this point in the history
  • Loading branch information
shnikd authored Jan 24, 2025
1 parent 2fdfe89 commit 6a1a387
Show file tree
Hide file tree
Showing 15 changed files with 247 additions and 38 deletions.
1 change: 1 addition & 0 deletions ydb/apps/ydb/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
* Added statistics output on the current progress of the query in `ydb workload` command
* Fixed a bug where arm64 YDB CLI binary was downloading amd64 binary to replace itself during `ydb update`. To update already installed binaries to the latest arm64 version, YDB CLI should be re-installed
* Fixed a bug where `ydb workload tpch import generator` and `ydb workload tpcds import generator` commands were failing because not all tables had been created
* Fixed a bug with backslashes in `ydb workload` benchmark paths on Windows
Expand Down
25 changes: 25 additions & 0 deletions ydb/core/grpc_services/query/rpc_execute_query.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
#include <ydb/core/grpc_services/grpc_integrity_trails.h>
#include <ydb/core/grpc_services/rpc_kqp_base.h>
#include <ydb/core/kqp/executer_actor/kqp_executer.h>
#include <ydb/core/kqp/opt/kqp_query_plan.h>
#include <ydb/library/ydb_issue/issue_helpers.h>
#include <ydb/public/api/protos/ydb_query.pb.h>

Expand Down Expand Up @@ -214,6 +215,7 @@ class TExecuteQueryRPC : public TActorBootstrapped<TExecuteQueryRPC> {
HFunc(TEvents::TEvWakeup, Handle);
HFunc(TRpcServices::TEvGrpcNextReply, Handle);
HFunc(NKqp::TEvKqpExecuter::TEvStreamData, Handle);
hFunc(NKqp::TEvKqpExecuter::TEvExecuterProgress, Handle);
HFunc(NKqp::TEvKqp::TEvQueryResponse, Handle);
hFunc(NKikimr::NGRpcService::TEvSubscribeGrpcCancel, Handle);
default:
Expand Down Expand Up @@ -281,6 +283,8 @@ class TExecuteQueryRPC : public TActorBootstrapped<TExecuteQueryRPC> {
settings,
req->pool_id());

ev->SetProgressStatsPeriod(TDuration::MilliSeconds(req->stats_period_ms()));

if (!ctx.Send(NKqp::MakeKqpProxyID(ctx.SelfID.NodeId()), ev.Release(), 0, 0, Span_.GetTraceId())) {
NYql::TIssues issues;
issues.AddIssue(MakeIssue(NKikimrIssues::TIssuesIds::DEFAULT_ERROR, "Internal error"));
Expand Down Expand Up @@ -353,6 +357,27 @@ class TExecuteQueryRPC : public TActorBootstrapped<TExecuteQueryRPC> {
channel.SendAck(SelfId());
}

// Forwards an executer progress event to the client as a response part with
// SUCCESS status. Execution statistics and the serialized analyze plan are
// attached only when the request asks for stats and the progress record
// carries query stats; otherwise the part contains the status alone.
void Handle(NKqp::TEvKqpExecuter::TEvExecuterProgress::TPtr& ev) {
    Ydb::Query::ExecuteQueryResponsePart progressPart;
    progressPart.set_status(Ydb::StatusIds::SUCCESS);

    auto& progress = ev->Get()->Record;
    if (NeedReportStats(*Request_->GetProtoRequest()) && progress.HasQueryStats()) {
        auto* execStats = progressPart.mutable_exec_stats();
        FillQueryStats(*execStats, progress.GetQueryStats());
        execStats->set_query_plan(NKqp::SerializeAnalyzePlan(progress.GetQueryStats()));
    }

    TString serialized;
    Y_PROTOBUF_SUPPRESS_NODISCARD progressPart.SerializeToString(&serialized);

    // Register the payload size with flow control, then hand the bytes over.
    FlowControl_.PushResponse(serialized.size());
    Request_->SendSerializedResult(std::move(serialized), Ydb::StatusIds::SUCCESS);
}

void Handle(NKqp::TEvKqp::TEvQueryResponse::TPtr& ev, const TActorContext& ctx) {
NDataIntegrity::LogIntegrityTrails(Request_->GetTraceId(), *Request_->GetProtoRequest(), ev, ctx);

Expand Down
4 changes: 4 additions & 0 deletions ydb/core/kqp/executer_actor/kqp_executer_stats.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1144,6 +1144,10 @@ void TQueryExecutionStats::ExportExecStats(NYql::NDqProto::TDqExecutionStats& st
ExportAggStats(a.Rows, *aggrStat.MutableRows());
}
}

for (const auto& [_, tableStats] : TableStats) {
stats.AddTables()->CopyFrom(*tableStats);
}
}

void TQueryExecutionStats::AdjustAsyncAggr(NYql::NDqProto::TDqAsyncStatsAggr& stats) {
Expand Down
8 changes: 7 additions & 1 deletion ydb/public/api/protos/ydb_query.proto
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,13 @@ message ExecuteQueryRequest {
// Allows to set size limitation (in bytes) for one result part
int64 response_part_limit_bytes = 9 [(Ydb.value) = "[0; 33554432]"];

string pool_id = 10; // Workload manager pool id
// Workload manager pool id
string pool_id = 10;

// Time interval for sending periodic query statistics.
// When query statistics are enabled (stats_mode != STATS_MODE_NONE), statistics are by default sent only once, after query execution has finished.
// If stats_period_ms is specified and non-zero, query statistics are additionally sent every stats_period_ms milliseconds, starting from the beginning of query execution.
int64 stats_period_ms = 11 [(Ydb.value) = ">= 0"];
}

message ResultSetMeta {
Expand Down
97 changes: 79 additions & 18 deletions ydb/public/lib/ydb_cli/commands/benchmark_utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,11 @@
#include <ydb-cpp-sdk/client/table/table.h>
#include <ydb/public/lib/ydb_cli/common/pretty_table.h>
#include <ydb/public/lib/yson_value/ydb_yson_value.h>
#include <ydb/public/lib/ydb_cli/common/formats.h>
#include <ydb/public/lib/ydb_cli/common/format.h>
#include <ydb/public/lib/ydb_cli/common/plan2svg.h>
#include <ydb/public/lib/ydb_cli/common/progress_indication.h>
#include <ydb-cpp-sdk/client/proto/accessor.h>

#include <ydb/public/api/protos/ydb_query.pb.h>
#include <yql/essentials/public/decimal/yql_decimal.h>
Expand Down Expand Up @@ -179,9 +184,21 @@ class TQueryResultScanner {
}

template <typename TIterator>
bool Scan(TIterator& it) {
bool Scan(TIterator& it, std::optional<TString> planFileName = std::nullopt) {

TProgressIndication progressIndication(true);
TMaybe<NQuery::TExecStats> execStats;

TString currentPlanFileNameStats;
TString currentPlanWithStatsFileName;
TString currentPlanWithStatsFileNameJson;
if (planFileName) {
currentPlanFileNameStats = TStringBuilder() << *planFileName << ".stats";
currentPlanWithStatsFileName = TStringBuilder() << *planFileName << ".svg";
currentPlanWithStatsFileNameJson = TStringBuilder() << *planFileName << ".json";
}
for (;;) {
auto streamPart = it.ReadNext().GetValueSync();
auto streamPart = it.ReadNext().ExtractValueSync();
ui64 rsIndex = 0;

if constexpr (std::is_same_v<TIterator, NTable::TScanQueryPartIterator>) {
Expand All @@ -191,13 +208,47 @@ class TQueryResultScanner {
PlanAst = streamPart.GetQueryStats().GetAst().value_or("");
}
} else {
const auto& stats = streamPart.GetStats();
rsIndex = streamPart.GetResultSetIndex();
if (stats) {
ServerTiming += stats->GetTotalDuration();
QueryPlan = stats->GetPlan().value_or("");
PlanAst = stats->GetAst().value_or("");
if (streamPart.HasStats()) {
execStats = streamPart.ExtractStats();

if (planFileName) {
TFileOutput out(currentPlanFileNameStats);
out << execStats->ToString();
{
auto plan = execStats->GetPlan();
if (plan) {
{
    // Render the plan carried by the latest stats into the ".svg" file.
    TPlanVisualizer pv;
    TFileOutput out(currentPlanWithStatsFileName);
    try {
        pv.LoadPlans(*execStats->GetPlan());
        out << pv.PrintSvg();
    } catch (const std::exception& e) {
        // Fall back to a placeholder image carrying the error message.
        // The <text> element must be closed with </text>; otherwise the
        // emitted document is not well-formed XML.
        out << "<svg width='1024' height='256' xmlns='http://www.w3.org/2000/svg'><text>" << e.what() << "</text></svg>";
    }
}
{
TFileOutput out(currentPlanWithStatsFileNameJson);
TQueryPlanPrinter queryPlanPrinter(EDataFormat::JsonBase64, true, out, 120);
queryPlanPrinter.Print(*execStats->GetPlan());
}
}
}
}

const auto& protoStats = TProtoAccessor::GetProto(execStats.GetRef());
for (const auto& queryPhase : protoStats.query_phases()) {
for (const auto& tableAccessStats : queryPhase.table_access()) {
progressIndication.UpdateProgress({tableAccessStats.reads().rows(), tableAccessStats.reads().bytes(),
tableAccessStats.updates().rows(), tableAccessStats.updates().bytes(),
tableAccessStats.deletes().rows(), tableAccessStats.deletes().bytes()});
}
}

progressIndication.Render();
}

rsIndex = streamPart.GetResultSetIndex();
}

if (!streamPart.IsSuccess()) {
Expand All @@ -212,6 +263,11 @@ class TQueryResultScanner {
RawResults[rsIndex].emplace_back(streamPart.ExtractResultSet());
}
}
if (execStats) {
ServerTiming += execStats->GetTotalDuration();
QueryPlan = execStats->GetPlan().value_or("");
PlanAst = execStats->GetAst().value_or("");
}
return true;
}
};
Expand Down Expand Up @@ -255,32 +311,35 @@ TQueryBenchmarkResult ExecuteImpl(const TString& query, NTable::TTableClient& cl
}
}

TQueryBenchmarkResult Execute(const TString& query, NTable::TTableClient& client, const TQueryBenchmarkDeadline& deadline) {
return ExecuteImpl(query, client, deadline, false);
TQueryBenchmarkResult Execute(const TString& query, NTable::TTableClient& client, const TQueryBenchmarkSettings& settings) {
return ExecuteImpl(query, client, settings.Deadline, false);
}

// Table-client overload: runs the query through ExecuteImpl with the
// explain-only flag set and returns its result.
TQueryBenchmarkResult Explain(const TString& query, NTable::TTableClient& client, const TQueryBenchmarkDeadline& deadline) {
    constexpr bool explainOnly = true;
    return ExecuteImpl(query, client, deadline, explainOnly);
}

TQueryBenchmarkResult ExecuteImpl(const TString& query, NQuery::TQueryClient& client, const TQueryBenchmarkDeadline& deadline, bool explainOnly) {
TQueryBenchmarkResult ExecuteImpl(const TString& query, NQuery::TQueryClient& client, const TQueryBenchmarkSettings& benchmarkSettings, bool explainOnly) {
NQuery::TExecuteQuerySettings settings;
settings.StatsMode(NQuery::EStatsMode::Full);
settings.ExecMode(explainOnly ? NQuery::EExecMode::Explain : NQuery::EExecMode::Execute);
if (auto error = SetTimeoutSettings(settings, deadline)) {
if (benchmarkSettings.WithProgress) {
settings.StatsCollectPeriod(std::chrono::milliseconds(3000));
}
if (auto error = SetTimeoutSettings(settings, benchmarkSettings.Deadline)) {
return *error;
}
auto it = client.StreamExecuteQuery(
query,
NYdb::NQuery::TTxControl::BeginTx().CommitTx(),
settings).GetValueSync();
if (auto error = ResultByStatus(it, deadline.Name)) {
if (auto error = ResultByStatus(it, benchmarkSettings.Deadline.Name)) {
return *error;
}

TQueryResultScanner composite;
composite.SetDeadlineName(deadline.Name);
if (!composite.Scan(it)) {
composite.SetDeadlineName(benchmarkSettings.Deadline.Name);
if (!composite.Scan(it, benchmarkSettings.PlanFileName)) {
return TQueryBenchmarkResult::Error(
composite.GetErrorInfo(), composite.GetQueryPlan(), composite.GetPlanAst());
} else {
Expand All @@ -293,12 +352,14 @@ TQueryBenchmarkResult ExecuteImpl(const TString& query, NQuery::TQueryClient& cl
}
}

TQueryBenchmarkResult Execute(const TString& query, NQuery::TQueryClient& client, const TQueryBenchmarkDeadline& deadline) {
return ExecuteImpl(query, client, deadline, false);
TQueryBenchmarkResult Execute(const TString& query, NQuery::TQueryClient& client, const TQueryBenchmarkSettings& settings) {
return ExecuteImpl(query, client, settings, false);
}

TQueryBenchmarkResult Explain(const TString& query, NQuery::TQueryClient& client, const TQueryBenchmarkDeadline& deadline) {
return ExecuteImpl(query, client, deadline, true);
TQueryBenchmarkSettings settings;
settings.Deadline = deadline;
return ExecuteImpl(query, client, settings, true);
}

NJson::TJsonValue GetQueryLabels(ui32 queryId) {
Expand Down
10 changes: 8 additions & 2 deletions ydb/public/lib/ydb_cli/commands/benchmark_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,10 +78,16 @@ struct TQueryBenchmarkDeadline {
TString Name;
};

// Per-query options accepted by the benchmark Execute() helpers.
struct TQueryBenchmarkSettings {
    // Deadline applied to the query; its Name is also used when reporting errors.
    TQueryBenchmarkDeadline Deadline;

    // Optional base name for plan artifacts. When set, the query-client result
    // scanner writes "<name>.stats", "<name>.svg" and "<name>.json" files as
    // execution statistics arrive.
    std::optional<TString> PlanFileName;

    // When true, the query-client path requests periodic statistics from the
    // server so that progress can be shown while the query runs.
    bool WithProgress{false};
};

TString FullTablePath(const TString& database, const TString& table);
bool HasCharsInString(const TString& str);
TQueryBenchmarkResult Execute(const TString & query, NTable::TTableClient & client, const TQueryBenchmarkDeadline& deadline);
TQueryBenchmarkResult Execute(const TString & query, NQuery::TQueryClient & client, const TQueryBenchmarkDeadline& deadline);
TQueryBenchmarkResult Execute(const TString & query, NTable::TTableClient & client, const TQueryBenchmarkSettings& settings);
TQueryBenchmarkResult Execute(const TString & query, NQuery::TQueryClient & client, const TQueryBenchmarkSettings& settings);
TQueryBenchmarkResult Explain(const TString & query, NTable::TTableClient & client, const TQueryBenchmarkDeadline& deadline);
TQueryBenchmarkResult Explain(const TString & query, NQuery::TQueryClient & client, const TQueryBenchmarkDeadline& deadline);
NJson::TJsonValue GetQueryLabels(ui32 queryId);
Expand Down
12 changes: 11 additions & 1 deletion ydb/public/lib/ydb_cli/commands/ydb_benchmark.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include <library/cpp/json/json_writer.h>
#include <util/string/printf.h>
#include <util/folder/path.h>
#include <optional>

namespace NYdb::NConsoleClient {
TWorkloadCommandBenchmark::TWorkloadCommandBenchmark(NYdbWorkload::TWorkloadParams& params, const NYdbWorkload::IWorkloadQueryGenerator::TWorkloadType& workload)
Expand Down Expand Up @@ -358,9 +359,18 @@ bool TWorkloadCommandBenchmark::RunBench(TClient* client, NYdbWorkload::IWorkloa
for (ui32 i = 0; i < IterationsCount && Now() < GlobalDeadline; ++i) {
auto t1 = TInstant::Now();
TQueryBenchmarkResult res = TQueryBenchmarkResult::Error("undefined", "undefined", "undefined");

TQueryBenchmarkSettings settings;
settings.Deadline = GetDeadline();
settings.WithProgress = true;

if (PlanFileName) {
settings.PlanFileName = TStringBuilder() << PlanFileName << "." << queryN << "." << ToString(i);
}

try {
if (client) {
res = Execute(query, *client, GetDeadline());
res = Execute(query, *client, settings);
} else {
res = TQueryBenchmarkResult::Result(TQueryBenchmarkResult::TRawResults(), TDuration::Zero(), "", "");
}
Expand Down
22 changes: 7 additions & 15 deletions ydb/public/lib/ydb_cli/common/print_utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#include <google/protobuf/port_def.inc>

#include <util/string/printf.h>
#include <util/stream/format.h>

namespace NYdb {
namespace NConsoleClient {
Expand Down Expand Up @@ -51,23 +52,14 @@ void PrintSchemeEntry(IOutputStream& o, const NScheme::TSchemeEntry& entry, NCol
o << entry.Name << colors.OldColor();
}

TString PrettySize(size_t size) {
TString PrettySize(ui64 size) {
double sizeFormat = size;
TString mod = "b";
const char* mods[] = { "Kb", "Mb", "Gb", "Tb", "Pb", "Eb" };
TString numFormat = "%.0f";

for (const char* nextMod : mods) {
if (sizeFormat > 1024) {
sizeFormat /= 1024;
mod = nextMod;
numFormat = "%.02f";
} else {
break;
}
}
return ToString(HumanReadableSize(sizeFormat, ESizeFormat::SF_QUANTITY)) + " B";
}

return Sprintf((numFormat + " %s").data(), sizeFormat, mod.data());
// Formats a count as a human-readable quantity string by delegating to
// HumanReadableSize with ESizeFormat::SF_QUANTITY.
TString PrettyNumber(ui64 number) {
    const double asDouble = number;
    const auto humanReadable = HumanReadableSize(asDouble, ESizeFormat::SF_QUANTITY);
    return ToString(humanReadable);
}

TString FormatTime(TInstant time) {
Expand Down
3 changes: 2 additions & 1 deletion ydb/public/lib/ydb_cli/common/print_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ namespace NConsoleClient {
void PrintSchemeEntry(IOutputStream& o, const NScheme::TSchemeEntry& entry, NColorizer::TColors colors);
TString FormatTime(TInstant time);
TString FormatDuration(TDuration duration);
TString PrettySize(size_t size);
TString PrettySize(ui64 size);
TString PrettyNumber(ui64 number);
TString EntryTypeToString(NScheme::ESchemeEntryType entry);

int PrintProtoJsonBase64(const google::protobuf::Message& msg);
Expand Down
Loading

0 comments on commit 6a1a387

Please sign in to comment.