Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce locality counters for single node tx. (#13415) #13735

Merged
merged 1 commit into from
Jan 23, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions ydb/core/kqp/counters/kqp_counters.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -847,6 +847,9 @@ TKqpCounters::TKqpCounters(const ::NMonitoring::TDynamicCounterPtr& counters, co
ComputeActorDelays = KqpGroup->GetHistogram("NodeScheduler/Delays", NMonitoring::ExponentialHistogram(20, 2, 1));
ThrottledActorsSpuriousActivations = KqpGroup->GetCounter("NodeScheduler/SpuriousActivations", true);
SchedulerDelays = KqpGroup->GetHistogram("NodeScheduler/Delay", NMonitoring::ExponentialHistogram(20, 2, 1));

TotalSingleNodeReqCount = KqpGroup->GetCounter("TotalSingleNodeReqCount", true);
NonLocalSingleNodeReqCount = KqpGroup->GetCounter("NonLocalSingleNodeReqCount", true);
}

::NMonitoring::TDynamicCounterPtr TKqpCounters::GetKqpCounters() const {
Expand Down
4 changes: 4 additions & 0 deletions ydb/core/kqp/counters/kqp_counters.h
Original file line number Diff line number Diff line change
Expand Up @@ -436,6 +436,10 @@ class TKqpCounters : public TKqpCountersBase, public NYql::NDq::TSpillingCounter
NMonitoring::THistogramPtr DataTxTotalTimeHistogram;
NMonitoring::THistogramPtr ScanTxTotalTimeHistogram;

// Locality metrics for request
NMonitoring::TDynamicCounters::TCounterPtr TotalSingleNodeReqCount;
NMonitoring::TDynamicCounters::TCounterPtr NonLocalSingleNodeReqCount;

TAlignedPagePoolCounters AllocCounters;

// db counters
Expand Down
5 changes: 4 additions & 1 deletion ydb/core/kqp/executer_actor/kqp_data_executer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1189,6 +1189,8 @@ class TKqpDataExecuter: public TKqpExecuterBase<TKqpDataExecuter, EExecType::Dat
const ui64 shardId = res->GetOrigin();
LastShard = shardId;

ParticipantNodes.emplace(ev->Sender.NodeId());

TShardState* shardState = ShardStates.FindPtr(shardId);
YQL_ENSURE(shardState);

Expand Down Expand Up @@ -1941,7 +1943,8 @@ class TKqpDataExecuter: public TKqpExecuterBase<TKqpDataExecuter, EExecType::Dat
if (i64 msc = (i64)Request.MaxShardCount; msc > 0) {
shardsLimit = std::min(shardsLimit, (ui32)msc);
}
size_t shards = datashardTasks.size() + sourceScanPartitionsCount;
const size_t shards = datashardTasks.size() + sourceScanPartitionsCount;

if (shardsLimit > 0 && shards > shardsLimit) {
LOG_W("Too many affected shards: datashardTasks=" << shards << ", limit: " << shardsLimit);
Counters->TxProxyMon->TxResultError->Inc();
Expand Down
9 changes: 9 additions & 0 deletions ydb/core/kqp/executer_actor/kqp_executer_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,7 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
ShardIdToNodeId = std::move(reply.ShardNodes);
for (auto& [shardId, nodeId] : ShardIdToNodeId) {
ShardsOnNode[nodeId].push_back(shardId);
ParticipantNodes.emplace(nodeId);
}

if (IsDebugLogEnabled()) {
Expand Down Expand Up @@ -1860,6 +1861,10 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
LOG_N("Full stats: " << response.GetResult().GetStats());
}
}

for (const auto nodeId : ParticipantNodes) {
response.MutableResult()->AddParticipantNodes(nodeId);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

это же локальное сообщение, зачем в протобуф перекладывать?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

По аналогии со статистикой. Вообще там не очевидно как использовать поля в TEvTxResponse

}
}

Request.Transactions.crop(0);
Expand Down Expand Up @@ -1988,6 +1993,10 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
THashMap<NYql::NDq::TStageId, THashMap<ui64, TShardInfo>> SourceScanStageIdToParititions;

ui32 StatementResultIndex;

// Track which nodes have been involved during execution
THashSet<ui32> ParticipantNodes;

bool AlreadyReplied = false;

private:
Expand Down
11 changes: 11 additions & 0 deletions ydb/core/kqp/session_actor/kqp_query_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,17 @@ class TKqpQueryState : public TNonCopyable {
ui32 StatementResultSize = 0;

TMaybe<TString> CommandTagName;
THashSet<uint32_t> ParticipantNodes;

// Tells whether the query was served entirely by the node `nodeId`.
// The result is true only when the request carries a request context and the
// recorded participant set consists of exactly one node equal to `nodeId`.
bool IsLocalExecution(ui32 nodeId) const {
    if (RequestEv->GetRequestCtx() == nullptr || ParticipantNodes.size() != 1) {
        return false;
    }
    // Exactly one participant is recorded: compare it with the given node.
    return *ParticipantNodes.begin() == nodeId;
}

NKikimrKqp::EQueryAction GetAction() const {
return QueryAction;
Expand Down
13 changes: 13 additions & 0 deletions ydb/core/kqp/session_actor/kqp_session_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1444,6 +1444,12 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
QueryState->QueryStats.Executions.back().Swap(executerResults.MutableStats());
}

if (executerResults.ParticipantNodesSize()) {
for (auto nodeId : executerResults.GetParticipantNodes()) {
QueryState->ParticipantNodes.emplace(nodeId);
}
}

if (response->GetStatus() != Ydb::StatusIds::SUCCESS) {
const auto executionType = ev->ExecutionType;

Expand Down Expand Up @@ -2118,6 +2124,13 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
QueryState->PoolHandlerActor = Nothing();
}

if (QueryState && QueryState->ParticipantNodes.size() == 1) {
Counters->TotalSingleNodeReqCount->Inc();
if (!QueryState->IsLocalExecution(SelfId().NodeId())) {
Counters->NonLocalSingleNodeReqCount->Inc();
}
}

LOG_I("Cleanup start, isFinal: " << isFinal << " CleanupCtx: " << bool{CleanupCtx}
<< " TransactionsToBeAborted.size(): " << (CleanupCtx ? CleanupCtx->TransactionsToBeAborted.size() : 0)
<< " WorkerId: " << (workerId ? *workerId : TActorId())
Expand Down
216 changes: 216 additions & 0 deletions ydb/core/kqp/ut/query/kqp_stats_ut.cpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#include <ydb/core/kqp/ut/common/kqp_ut_common.h>
#include <ydb/core/kqp/counters/kqp_counters.h>
#include <ydb/public/sdk/cpp/client/ydb_table/table.h>
#include <ydb/public/sdk/cpp/client/resources/ydb_resources.h>
#include <ydb/public/sdk/cpp/client/ydb_proto/accessor.h>
Expand Down Expand Up @@ -637,6 +638,221 @@ Y_UNIT_TEST(SysViewCancelled) {
}
}

// Issues a keyed SELECT and an UPSERT against `/Root/KeyValue`, first through the
// table client session and then through the query client, on a default
// TKikimrRunner. After every request the TotalSingleNodeReqCount counter is
// compared with a fixed expected value; NonLocalSingleNodeReqCount must be 0 at
// the end.
Y_UNIT_TEST(OneShardLocalExec) {
    TKikimrRunner kikimr;
    auto db = kikimr.GetTableClient();
    auto session = db.CreateSession().GetValueSync().GetSession();

    TKqpCounters counters(kikimr.GetTestServer().GetRuntime()->GetAppData().Counters);

    const char* const selectQuery = "\nSELECT * FROM `/Root/KeyValue` WHERE Key = 1;\n";
    const char* const upsertQuery = "\nUPSERT INTO `/Root/KeyValue` (Key, Value) VALUES (1, \"1\");\n";

    // Runs `text` via the table client session and checks the total counter.
    const auto viaTableClient = [&](const char* text, int expectedTotal) {
        auto result = session.ExecuteDataQuery(text, TTxControl::BeginTx().CommitTx()).ExtractValueSync();
        UNIT_ASSERT(result.IsSuccess());
        UNIT_ASSERT_VALUES_EQUAL(counters.TotalSingleNodeReqCount->Val(), expectedTotal);
    };

    // Runs `text` via a query client obtained for this call and checks the total counter.
    const auto viaQueryClient = [&](const char* text, int expectedTotal) {
        auto result = kikimr.GetQueryClient().ExecuteQuery(text, NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync();
        UNIT_ASSERT(result.IsSuccess());
        UNIT_ASSERT_VALUES_EQUAL(counters.TotalSingleNodeReqCount->Val(), expectedTotal);
    };

    viaTableClient(selectQuery, 2);
    viaTableClient(upsertQuery, 3);
    viaQueryClient(selectQuery, 4);
    viaQueryClient(upsertQuery, 5);

    UNIT_ASSERT_VALUES_EQUAL(counters.NonLocalSingleNodeReqCount->Val(), 0);
}

// Two-node scenario for the single-node request counters.
//   Phase 1: the first node is drained and the test waits until every partition
//            of `Root/EightShard` reports the second node as leader. Six keyed
//            requests are executed; each must bump TotalSingleNodeReqCount by
//            one, and NonLocalSingleNodeReqCount must grow by six in total.
//   Phase 2: the first node is resumed, the second node is drained and the test
//            waits until all partitions are led by the first node. Eight
//            requests are executed; each must bump TotalSingleNodeReqCount by
//            one while NonLocalSingleNodeReqCount must stay unchanged.
Y_UNIT_TEST(OneShardNonLocalExec) {
    TKikimrRunner kikimr(TKikimrSettings().SetNodeCount(2));
    auto db = kikimr.GetTableClient();
    auto session = db.CreateSession().GetValueSync().GetSession();
    auto monPort = kikimr.GetTestServer().GetRuntime()->GetMonPort();

    auto firstNodeId = kikimr.GetTestServer().GetRuntime()->GetFirstNodeId();

    TKqpCounters counters(kikimr.GetTestServer().GetRuntime()->GetAppData().Counters);

    // Counter values observed before the test issues its own requests.
    auto expectedTotal = counters.TotalSingleNodeReqCount->Val();
    auto expectedNonLocal = counters.NonLocalSingleNodeReqCount->Val();

    const char* const selectQuery = "\nSELECT * FROM `/Root/EightShard` WHERE Key = 1;\n";
    const char* const upsertQuery = "\nUPSERT INTO `/Root/EightShard` (Key, Data) VALUES (1, 1);\n";
    const char* const updateQuery = "\nUPDATE `/Root/EightShard` SET Data = 111 WHERE Key = 1;\n";
    const char* const updateThenSelectQuery =
        "\nUPDATE `/Root/EightShard` SET Data = 111 WHERE Key = 1;\n"
        "SELECT * FROM `/Root/EightShard` WHERE Key = 1;\n";

    // Sends an HTTP request to the monitoring port for tablet 72057594037968897:
    // the DrainNode page for `nodeId`, or the SetDown page with down=0 when
    // `undrain` is set. The reply must have HTTP code 200.
    auto drainNode = [monPort](size_t nodeId, bool undrain = false) {
        TNetworkAddress address("localhost", monPort);
        TSocket socket(address);
        TString url;
        url = "/tablets/app?TabletID=72057594037968897&node=" + std::to_string(nodeId)
            + (undrain ? "&page=SetDown&down=0" : "&page=DrainNode");
        SendMinimalHttpRequest(socket, "localhost", url);
        TSocketInput socketInput(socket);
        THttpInput httpInput(&socketInput);
        TString statusLine = httpInput.FirstLine();

        const auto httpCode = ParseHttpRetCode(statusLine);
        UNIT_ASSERT_VALUES_EQUAL(httpCode, 200);
    };

    // Polls DescribeTable up to 10 times, sleeping one second after every
    // unsuccessful attempt, until all 8 partitions report `nodeId` as their
    // leader node. Fails the test if that never happens.
    auto waitTablets = [&session](size_t nodeId) {
        TDescribeTableSettings describeTableSettings =
            TDescribeTableSettings()
                .WithTableStatistics(true)
                .WithPartitionStatistics(true)
                .WithShardNodesInfo(true);

        bool done = false;
        for (int attempt = 0; attempt < 10; ++attempt) {
            auto res = session.DescribeTable("Root/EightShard", describeTableSettings)
                .ExtractValueSync();

            UNIT_ASSERT_EQUAL(res.IsTransportError(), false);
            UNIT_ASSERT_EQUAL(res.GetStatus(), EStatus::SUCCESS);
            UNIT_ASSERT_VALUES_EQUAL(res.GetTableDescription().GetPartitionsCount(), 8);
            UNIT_ASSERT_VALUES_EQUAL(res.GetTableDescription().GetPartitionStats().size(), 8);

            std::unordered_set<ui32> leaderNodes;
            for (const auto& partition : res.GetTableDescription().GetPartitionStats()) {
                leaderNodes.emplace(partition.LeaderNodeId);
            }

            done = leaderNodes.size() == 1 && *leaderNodes.begin() == nodeId;
            if (done) {
                break;
            }
            Sleep(TDuration::Seconds(1));
        }
        UNIT_ASSERT_C(done, "unable to wait tablets move on specific node");
    };

    // Every successful request is expected to raise the total counter by one.
    auto expectTotalBumped = [&] {
        ++expectedTotal;
        UNIT_ASSERT_VALUES_EQUAL(counters.TotalSingleNodeReqCount->Val(), expectedTotal);
    };

    // Executes `text` through the table client session.
    auto viaTableClient = [&](const char* text) {
        auto result = session.ExecuteDataQuery(text, TTxControl::BeginTx().CommitTx()).ExtractValueSync();
        UNIT_ASSERT(result.IsSuccess());
        expectTotalBumped();
    };

    // Executes `text` through a query client obtained for this call.
    auto viaQueryClient = [&](const char* text) {
        auto result = kikimr.GetQueryClient().ExecuteQuery(text, NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync();
        UNIT_ASSERT(result.IsSuccess());
        expectTotalBumped();
    };

    // The six requests shared by both phases, in their original order.
    auto runCommonRequests = [&] {
        viaTableClient(selectQuery);
        viaTableClient(upsertQuery);
        viaQueryClient(selectQuery);
        viaQueryClient(upsertQuery);
        viaTableClient(updateQuery);
        viaQueryClient(updateQuery);
    };

    // Move all tablets to node 2. The gRPC connection goes to node 1,
    // so all sessions will be created on node 1.
    drainNode(firstNodeId);
    waitTablets(firstNodeId + 1);

    runCommonRequests();

    expectedNonLocal += 6;
    UNIT_ASSERT_VALUES_EQUAL(counters.NonLocalSingleNodeReqCount->Val(), expectedNonLocal);

    // Now resume node 1 and move all tablets to it, so that every tablet
    // lives on the same node as the session.
    drainNode(firstNodeId, true);
    drainNode(firstNodeId + 1);
    waitTablets(firstNodeId);

    runCommonRequests();
    viaTableClient(updateThenSelectQuery);
    viaQueryClient(updateThenSelectQuery);

    // All executions are local - the non-local counter keeps its value.
    UNIT_ASSERT_VALUES_EQUAL(counters.NonLocalSingleNodeReqCount->Val(), expectedNonLocal);
}

} // suite

} // namespace NKqp
Expand Down
1 change: 1 addition & 0 deletions ydb/core/protos/kqp.proto
Original file line number Diff line number Diff line change
Expand Up @@ -479,6 +479,7 @@ message TExecuterTxResult {
reserved 5; // (deprecated) Stats
optional NYql.NDqProto.TDqExecutionStats Stats = 6;
reserved 7;
repeated uint32 ParticipantNodes = 8;
};

message TExecuterTxResponse {
Expand Down
Loading