Skip to content

Commit

Permalink
simultaneous writing session for tests (#13187)
Browse files Browse the repository at this point in the history
  • Loading branch information
ivanmorozov333 authored Jan 8, 2025
1 parent 5299845 commit 39e93de
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 16 deletions.
6 changes: 4 additions & 2 deletions ydb/core/kqp/ut/olap/helpers/typed_local.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ void TTypedLocalHelper::GetStats(std::vector<NJson::TJsonValue>& stats, const bo
}
}

void TTypedLocalHelper::TWritingGuard::SendDataViaActorSystem(TString testTable, std::shared_ptr<arrow::RecordBatch> batch,
void TTypedLocalHelper::TSimultaneousWritingSession::SendDataViaActorSystem(TString testTable, std::shared_ptr<arrow::RecordBatch> batch,
const Ydb::StatusIds_StatusCode expectedStatus /*= = Ydb::StatusIds::SUCCESS*/) const {
auto* runtime = KikimrRunner.GetTestServer().GetRuntime();

Expand Down Expand Up @@ -224,7 +224,9 @@ void TTypedLocalHelper::TWritingGuard::SendDataViaActorSystem(TString testTable,
});
}

void TTypedLocalHelper::TWritingGuard::WaitWritings() {
void TTypedLocalHelper::TSimultaneousWritingSession::Finalize() {
AFL_VERIFY(!Finished);
Finished = true;
auto* runtime = KikimrRunner.GetTestServer().GetRuntime();
TDispatchOptions options;
options.CustomFinalCondition = [&]() {
Expand Down
16 changes: 7 additions & 9 deletions ydb/core/kqp/ut/olap/helpers/typed_local.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,25 +37,25 @@ class TTypedLocalHelper: public Tests::NCS::THelper {
SetShardingMethod("HASH_FUNCTION_CONSISTENCY_64");
}

class TWritingGuard {
class TSimultaneousWritingSession {
private:
bool Finished = false;
TKikimrRunner& KikimrRunner;
const TString TablePath;
mutable std::atomic<size_t> Responses = 0;
void SendDataViaActorSystem(TString testTable, std::shared_ptr<arrow::RecordBatch> batch,
const Ydb::StatusIds_StatusCode expectedStatus = Ydb::StatusIds::SUCCESS) const;

void WaitWritings();

public:
TWritingGuard(TKikimrRunner& kikimrRunner, const TString& tablePath)
TSimultaneousWritingSession(TKikimrRunner& kikimrRunner, const TString& tablePath)
: KikimrRunner(kikimrRunner)
, TablePath(tablePath)
{
}

template <class TFiller>
void FillTable(const TFiller& fillPolicy, const double pkKff = 0, const ui32 numRows = 800000) const {
AFL_VERIFY(!Finished);
std::vector<NArrow::NConstruction::IArrayBuilder::TPtr> builders;
builders.emplace_back(
NArrow::NConstruction::TSimpleArrayConstructor<NArrow::NConstruction::TIntSeqFiller<arrow::Int64Type>>::BuildNotNullable(
Expand All @@ -66,13 +66,11 @@ class TTypedLocalHelper: public Tests::NCS::THelper {
SendDataViaActorSystem(TablePath, batch, Ydb::StatusIds::SUCCESS);
}

void Finalize() {
WaitWritings();
}
void Finalize();
};

TWritingGuard StartWriting(const TString& tablePath) {
return TWritingGuard(KikimrRunner, tablePath);
TSimultaneousWritingSession StartWriting(const TString& tablePath) {
return TSimultaneousWritingSession(KikimrRunner, tablePath);
}

void ExecuteSchemeQuery(const TString& alterQuery, const NYdb::EStatus expectedStatus = NYdb::EStatus::SUCCESS) const;
Expand Down
10 changes: 5 additions & 5 deletions ydb/core/kqp/ut/olap/write_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -201,14 +201,14 @@ Y_UNIT_TEST_SUITE(KqpOlapWrite) {
Tests::NCommon::TLoggerInit(kikimr).Initialize();
TTypedLocalHelper helper("Utf8", kikimr);
helper.CreateTestOlapTable();
auto writeGuard = helper.StartWriting("/Root/olapStore/olapTable");
writeGuard.FillTable(NArrow::NConstruction::TStringPoolFiller(1, 1, "aaa", 1), 0, 800000);
auto writeSession = helper.StartWriting("/Root/olapStore/olapTable");
writeSession.FillTable(NArrow::NConstruction::TStringPoolFiller(1, 1, "aaa", 1), 0, 800000);
Sleep(TDuration::Seconds(1));
writeGuard.FillTable(NArrow::NConstruction::TStringPoolFiller(1, 1, "bbb", 1), 0.5, 800000);
writeSession.FillTable(NArrow::NConstruction::TStringPoolFiller(1, 1, "bbb", 1), 0.5, 800000);
Sleep(TDuration::Seconds(1));
writeGuard.FillTable(NArrow::NConstruction::TStringPoolFiller(1, 1, "ccc", 1), 0.75, 800000);
writeSession.FillTable(NArrow::NConstruction::TStringPoolFiller(1, 1, "ccc", 1), 0.75, 800000);
Sleep(TDuration::Seconds(1));
writeGuard.Finalize();
writeSession.Finalize();

auto selectQuery = TString(R"(
SELECT
Expand Down

0 comments on commit 39e93de

Please sign in to comment.