Skip to content

Commit

Permalink
Add check for zero read iterators count after cancelation. (#13135)
Browse files Browse the repository at this point in the history
  • Loading branch information
dcherednik authored Dec 31, 2024
1 parent acaff68 commit ec34bf1
Show file tree
Hide file tree
Showing 7 changed files with 53 additions and 0 deletions.
38 changes: 38 additions & 0 deletions ydb/core/kqp/ut/common/kqp_ut_common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1393,6 +1393,44 @@ void WaitForZeroSessions(const NKqp::TKqpCounters& counters) {
UNIT_ASSERT_C(count, "Unable to wait for proper active session count, it looks like cancelation doesn`t work");
}

void WaitForZeroReadIterators(Tests::TServer& server, const TString& path) {
int iterators = 0;
static const TString counterName = "DataShard/ReadIteratorsCount";

for (int i = 0; i < 10; i++, Sleep(TDuration::Seconds(1))) {
TTestActorRuntime* runtime = server.GetRuntime();
auto sender = runtime->AllocateEdgeActor();
auto shards = GetTableShards(&server, sender, path);
UNIT_ASSERT_C(shards.size() > 0, "Table: " << path << " has no shards");
iterators = 0;
for (auto x : shards) {
runtime->SendToPipe(
x,
sender,
new TEvTablet::TEvGetCounters,
0,
GetPipeConfigWithRetries());

auto ev = runtime->GrabEdgeEvent<TEvTablet::TEvGetCountersResponse>(sender);
UNIT_ASSERT(ev);

const NKikimrTabletBase::TEvGetCountersResponse& resp = ev->Get()->Record;
for (const auto& counter : resp.GetTabletCounters().GetAppCounters().GetSimpleCounters()) {
if (counterName != counter.GetName()) {
continue;
}

iterators += counter.GetValue();
}
}
if (iterators == 0) {
break;
}
}

UNIT_ASSERT_C(iterators == 0, "Unable to wait for proper read iterator count, it looks like cancelation doesn`t work (" << iterators << ")");
}

NJson::TJsonValue SimplifyPlan(NJson::TJsonValue& opt, const TGetPlanParams& params) {
if (auto ops = opt.GetMapSafe().find("Operators"); ops != opt.GetMapSafe().end()) {
auto opName = ops->second.GetArraySafe()[0].GetMapSafe().at("Name").GetStringSafe();
Expand Down
1 change: 1 addition & 0 deletions ydb/core/kqp/ut/common/kqp_ut_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -372,6 +372,7 @@ TVector<ui64> GetTableShards(Tests::TServer::TPtr server, TActorId sender, const
TVector<ui64> GetColumnTableShards(Tests::TServer* server, TActorId sender, const TString &path);

void WaitForZeroSessions(const NKqp::TKqpCounters& counters);
void WaitForZeroReadIterators(Tests::TServer& server, const TString& path);

bool JoinOrderAndAlgosMatch(const TString& optimized, const TString& reference);

Expand Down
6 changes: 6 additions & 0 deletions ydb/core/kqp/ut/query/kqp_limits_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -759,6 +759,7 @@ Y_UNIT_TEST_SUITE(KqpLimits) {
}

WaitForZeroSessions(counters);
WaitForZeroReadIterators(kikimr.GetTestServer(), "/Root/EightShard");
}

void DoCancelAfterRo(bool follower, bool streamLookup, bool dependedRead) {
Expand Down Expand Up @@ -869,6 +870,11 @@ Y_UNIT_TEST_SUITE(KqpLimits) {
UNIT_ASSERT(wasCanceled);
}
WaitForZeroSessions(counters);

WaitForZeroReadIterators(kikimr.GetTestServer(), "/Root/EightShard");
if (follower) {
WaitForZeroReadIterators(kikimr.GetTestServer(), "/Root/OneShardWithFolower");
}
}

Y_UNIT_TEST(CancelAfterRoTx) {
Expand Down
1 change: 1 addition & 0 deletions ydb/core/kqp/ut/scan/kqp_scan_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@ Y_UNIT_TEST_SUITE(KqpScan) {
}

WaitForZeroSessions(counters);
WaitForZeroReadIterators(kikimr.GetTestServer(), "/Root/EightShard");
}

Y_UNIT_TEST(IsNull) {
Expand Down
1 change: 1 addition & 0 deletions ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2351,6 +2351,7 @@ Y_UNIT_TEST_SUITE(KqpQueryService) {
}

WaitForZeroSessions(counters);
WaitForZeroReadIterators(kikimr.GetTestServer(), "/Root/EightShard");

for (const auto& service: kikimr.GetTestServer().GetGRpcServer().GetServices()) {
UNIT_ASSERT_VALUES_EQUAL(service->RequestsInProgress(), 0);
Expand Down
1 change: 1 addition & 0 deletions ydb/core/kqp/ut/service/kqp_service_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,7 @@ Y_UNIT_TEST_SUITE(KqpService) {
tx = result.GetTransaction();
}
}, 0, SessionsCount + 1, NPar::TLocalExecutor::WAIT_COMPLETE | NPar::TLocalExecutor::MED_PRIORITY);
WaitForZeroReadIterators(kikimr->GetTestServer(), "/Root/EightShard");
}

TVector<TAsyncDataQueryResult> simulateSessionBusy(ui32 count, TSession& session) {
Expand Down
5 changes: 5 additions & 0 deletions ydb/core/kqp/ut/yql/kqp_scripting_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -732,6 +732,7 @@ Y_UNIT_TEST_SUITE(KqpScripting) {
}
UNIT_ASSERT(unsuccessStatus);
WaitForZeroSessions(counters);
WaitForZeroReadIterators(kikimr.GetTestServer(), "/Root/EightShard");
}

void DoStreamExecuteYqlScriptTimeoutBruteForce(bool clientTimeout, bool operationTimeout) {
Expand Down Expand Up @@ -774,6 +775,7 @@ Y_UNIT_TEST_SUITE(KqpScripting) {
}

WaitForZeroSessions(counters);
WaitForZeroReadIterators(kikimr.GetTestServer(), "/Root/EightShard");
}

Y_UNIT_TEST(StreamExecuteYqlScriptScanCancelAfterBruteForce) {
Expand Down Expand Up @@ -816,6 +818,7 @@ Y_UNIT_TEST_SUITE(KqpScripting) {
}

WaitForZeroSessions(counters);
WaitForZeroReadIterators(kikimr.GetTestServer(), "/Root/EightShard");
}

// Check in case of CANCELED status we have no made changes in the table
Expand Down Expand Up @@ -866,6 +869,7 @@ Y_UNIT_TEST_SUITE(KqpScripting) {
}

WaitForZeroSessions(counters);
WaitForZeroReadIterators(kikimr.GetTestServer(), "/Root/EightShard");
}

Y_UNIT_TEST(StreamExecuteYqlScriptWriteCancelAfterBruteForced) {
Expand Down Expand Up @@ -905,6 +909,7 @@ Y_UNIT_TEST_SUITE(KqpScripting) {
}

WaitForZeroSessions(counters);
WaitForZeroReadIterators(kikimr.GetTestServer(), "/Root/EightShard");
}

Y_UNIT_TEST(StreamExecuteYqlScriptScanClientTimeoutBruteForce) {
Expand Down

0 comments on commit ec34bf1

Please sign in to comment.