Skip to content

Commit

Permalink
Fix bug with fill tables in root directory and partially created tabl…
Browse files Browse the repository at this point in the history
…es in tpch and tpcds workloads (#13577)
  • Loading branch information
iddqdex authored Jan 20, 2025
1 parent c23916f commit c84691a
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 17 deletions.
1 change: 1 addition & 0 deletions ydb/apps/ydb/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
* Fixed a bug where the `ydb workload tpch import generator` and `ydb workload tpcds import generator` commands failed because not all tables had been created
* Fixed a bug with backslashes in `ydb workload` benchmark paths on Windows
* Added CREATE TABLE text suggestion on scheme error during `ydb import file csv`
* Backup and restore of changefeeds has been added to `ydb tools dump` and `ydb tools restore`. As a result, there are changes in the backup file structure: for tables with changefeeds, a subdirectory is created for each changefeed, named after the changefeed. This subdirectory contains two files: `changefeed_description.pb`, which contains the changefeed description, and `topic_description.pb`, which contains information about the underlying topic.
Expand Down
49 changes: 32 additions & 17 deletions ydb/public/lib/ydb_cli/commands/ydb_workload_import.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,14 +47,14 @@ TWorkloadCommandImport::TUploadCommand::TUploadCommand(NYdbWorkload::TWorkloadPa
, Initializer(initializer)
{}

int TWorkloadCommandImport::TUploadCommand::DoRun(NYdbWorkload::IWorkloadQueryGenerator& workloadGen, TConfig& config) {
int TWorkloadCommandImport::TUploadCommand::DoRun(NYdbWorkload::IWorkloadQueryGenerator& /*workloadGen*/, TConfig& /*config*/) {
auto dataGeneratorList = Initializer->GetBulkInitialData();
AtomicSet(ErrorsCount, 0);
InFlightSemaphore = MakeHolder<TFastSemaphore>(UploadParams.MaxInFlight);
if (UploadParams.FileOutputPath.IsDefined()) {
Writer = MakeHolder<TFileWriter>(*this);
} else {
Writer = MakeHolder<TDbWriter>(*this, workloadGen, config);
Writer = MakeHolder<TDbWriter>(*this);
}
for (auto dataGen : dataGeneratorList) {
TThreadPoolParams params;
Expand All @@ -81,21 +81,11 @@ int TWorkloadCommandImport::TUploadCommand::DoRun(NYdbWorkload::IWorkloadQueryGe
}
class TWorkloadCommandImport::TUploadCommand::TDbWriter: public IWriter {
public:
TDbWriter(TWorkloadCommandImport::TUploadCommand& owner, NYdbWorkload::IWorkloadQueryGenerator& workloadGen, TConfig& config)
TDbWriter(TWorkloadCommandImport::TUploadCommand& owner)
: IWriter(owner)
{
RetrySettings.RetryUndefined(true);
RetrySettings.MaxRetries(30);
for(const auto& path: workloadGen.GetCleanPaths()) {
const auto list = NConsoleClient::RecursiveList(*owner.SchemeClient, config.Database + "/" + path.c_str());
for (const auto& entry : list.Entries) {
if (entry.Type == NScheme::ESchemeEntryType::ColumnTable || entry.Type == NScheme::ESchemeEntryType::Table) {
const auto tableDescr = owner.TableClient->GetSession(NTable::TCreateSessionSettings()).GetValueSync().GetSession().DescribeTable(entry.Name).ExtractValueSync().GetTableDescription();
auto& params = ArrowCsvParams[entry.Name];
params.Columns = tableDescr.GetTableColumns();
}
}
}
}

TAsyncStatus WriteDataPortion(NYdbWorkload::IBulkDataGenerator::TDataPortionPtr portion) override {
Expand All @@ -120,11 +110,11 @@ class TWorkloadCommandImport::TUploadCommand::TDbWriter: public IWriter {
private:
TAsyncStatus WriteCsv(NYdbWorkload::IBulkDataGenerator::TDataPortionPtr portion) {
const auto* value = std::get_if<NYdbWorkload::IBulkDataGenerator::TDataPortion::TCsv>(&portion->MutableData());
const auto* param = MapFindPtr(ArrowCsvParams, portion->GetTable());
if (!param) {
return NThreading::MakeFuture(TStatus(EStatus::INTERNAL_ERROR, NYql::TIssues({NYql::TIssue("Table does not exist: " + portion->GetTable())})));
const auto param = GetCSVParams(portion->GetTable());
if (!param.Status.IsSuccess()) {
return NThreading::MakeFuture(param.Status);
}
auto arrowCsv = NKikimr::NFormats::TArrowCSVTable::Create(param->Columns, true);
auto arrowCsv = NKikimr::NFormats::TArrowCSVTable::Create(param.Columns, true);
if (!arrowCsv.ok()) {
return NThreading::MakeFuture(TStatus(EStatus::INTERNAL_ERROR, NYql::TIssues({NYql::TIssue(arrowCsv.status().ToString())})));
}
Expand Down Expand Up @@ -160,11 +150,36 @@ class TWorkloadCommandImport::TUploadCommand::TDbWriter: public IWriter {
}

// Per-table data needed to build the Arrow CSV parser, bundled with the
// outcome of obtaining it (see GetCSVParams, which fills both fields).
struct TArrowCSVParams {
// Result of fetching the parameters; starts as SUCCESS with no issues and is
// overwritten by GetCSVParams when session creation or DescribeTable fails.
TStatus Status = TStatus(EStatus::SUCCESS, NYql::TIssues());
// Columns taken from the table description; WriteCsv passes them to
// TArrowCSVTable::Create. Left empty when Status is not successful.
TVector<NYdb::NTable::TTableColumn> Columns;
};

// Returns the CSV parsing parameters for `table`, describing the table on
// first use and serving every later request from the ArrowCsvParams cache.
//
// The returned value is a copy of the cached entry. Its Status is:
//   * SUCCESS        - Columns holds the table's column list;
//   * anything else  - session creation or DescribeTable failed; the original
//                      status code and issues are preserved and one extra
//                      issue naming the failed step is appended.
//
// Failures are cached as well: once an entry has been inserted for `table`,
// no further describe attempt is made for it and the stored Status is
// returned on every subsequent call.
//
// CsvParamsLock is held for the whole call, including the synchronous
// session/describe requests, so concurrent callers wait instead of describing
// the same table twice.
TArrowCSVParams GetCSVParams(const TString& table) {
    auto g = Guard(CsvParamsLock);

    auto result = ArrowCsvParams.emplace(table, TArrowCSVParams());
    auto& params = result.first->second;
    if (!result.second) {
        // Already resolved (successfully or not) by an earlier call.
        return params;
    }

    auto session = Owner.TableClient->GetSession(NTable::TCreateSessionSettings()).ExtractValueSync();
    if (!session.IsSuccess()) {
        auto issues = session.GetIssues();
        issues.AddIssue("Cannot create session");
        params.Status = TStatus(session.GetStatus(), std::move(issues));
        return params;
    }

    const auto tableDescr = session.GetSession().DescribeTable(table).ExtractValueSync();
    if (!tableDescr.IsSuccess()) {
        auto issues = tableDescr.GetIssues();
        issues.AddIssue("Cannot describe table " + table);
        params.Status = TStatus(tableDescr.GetStatus(), std::move(issues));
        return params;
    }

    params.Columns = tableDescr.GetTableDescription().GetTableColumns();
    return params;
}

// Cache of per-table CSV parameters keyed by table name; entries are created
// on first request inside GetCSVParams.
TMap<TString, TArrowCSVParams> ArrowCsvParams;
// Retry policy set up in the constructor (RetryUndefined(true), MaxRetries(30)).
NRetry::TRetryOperationSettings RetrySettings;
// Taken by GetCSVParams for the duration of each lookup/insert in ArrowCsvParams.
TAdaptiveLock CsvParamsLock;
};

class TWorkloadCommandImport::TUploadCommand::TFileWriter: public IWriter {
Expand Down

0 comments on commit c84691a

Please sign in to comment.