diff --git a/ydb/library/backup/backup.cpp b/ydb/library/backup/backup.cpp index 2905284f8069..38da7fd28a5d 100644 --- a/ydb/library/backup/backup.cpp +++ b/ydb/library/backup/backup.cpp @@ -4,12 +4,15 @@ #include #include +#include #include #include #include +#include #include #include #include +#include #include #include @@ -475,6 +478,36 @@ void BackupPermissions(TDriver driver, const TString& dbPrefix, const TString& p WriteProtoToFile(proto, folderPath, NDump::NFiles::Permissions()); } +Ydb::Table::ChangefeedDescription ProtoFromChangefeedDesc(const NTable::TChangefeedDescription& changefeedDesc) { + Ydb::Table::ChangefeedDescription protoChangeFeedDesc; + changefeedDesc.SerializeTo(protoChangeFeedDesc); + return protoChangeFeedDesc; +} + +NTopic::TDescribeTopicResult DescribeTopic(TDriver driver, const TString& path) { + NYdb::NTopic::TTopicClient client(driver); + return NConsoleClient::RetryFunction([&]() { + return client.DescribeTopic(path).GetValueSync(); + }); +} + +void BackupChangefeeds(TDriver driver, const TString& tablePath, const TFsPath& folderPath) { + auto desc = DescribeTable(driver, tablePath); + + for (const auto& changefeedDesc : desc.GetChangefeedDescriptions()) { + TFsPath changefeedDirPath = CreateDirectory(folderPath, changefeedDesc.GetName()); + + auto protoChangeFeedDesc = ProtoFromChangefeedDesc(changefeedDesc); + const auto descTopicResult = DescribeTopic(driver, JoinDatabasePath(tablePath, changefeedDesc.GetName())); + VerifyStatus(descTopicResult); + const auto& topicDescription = descTopicResult.GetTopicDescription(); + const auto protoTopicDescription = NYdb::TProtoAccessor::GetProto(topicDescription); + + WriteProtoToFile(protoChangeFeedDesc, changefeedDirPath, NDump::NFiles::Changefeed()); + WriteProtoToFile(protoTopicDescription, changefeedDirPath, NDump::NFiles::Topic()); + } +} + void BackupTable(TDriver driver, const TString& dbPrefix, const TString& backupPrefix, const TString& path, const TFsPath& folderPath, bool schemaOnly, bool preservePoolKinds, bool ordered) { Y_ENSURE(!path.empty()); @@ -486,8 +519,9 @@ void BackupTable(TDriver driver, const TString& dbPrefix, const TString& backupP auto desc = DescribeTable(driver, fullPath); auto proto = ProtoFromTableDescription(desc, preservePoolKinds); - WriteProtoToFile(proto, folderPath, NDump::NFiles::TableScheme()); + + BackupChangefeeds(driver, JoinDatabasePath(dbPrefix, path), folderPath); BackupPermissions(driver, dbPrefix, path, folderPath); if (!schemaOnly) { diff --git a/ydb/library/backup/ya.make b/ydb/library/backup/ya.make index 6a0a2f1e94a2..24a074d7443c 100644 --- a/ydb/library/backup/ya.make +++ b/ydb/library/backup/ya.make @@ -12,8 +12,10 @@ PEERDIR( ydb/public/lib/yson_value ydb/public/lib/ydb_cli/dump/files ydb/public/sdk/cpp/client/ydb_driver + ydb/public/sdk/cpp/client/ydb_proto ydb/public/sdk/cpp/client/ydb_result ydb/public/sdk/cpp/client/ydb_table + ydb/public/sdk/cpp/client/ydb_topic ydb/public/sdk/cpp/client/ydb_value ) diff --git a/ydb/public/sdk/cpp/client/ydb_table/table.cpp b/ydb/public/sdk/cpp/client/ydb_table/table.cpp index 03ddeb2990a8..7efe6a271d39 100644 --- a/ydb/public/sdk/cpp/client/ydb_table/table.cpp +++ b/ydb/public/sdk/cpp/client/ydb_table/table.cpp @@ -2816,10 +2816,10 @@ TChangefeedDescription TChangefeedDescription::FromProto(const TProto& proto) { return ret; } -void TChangefeedDescription::SerializeTo(Ydb::Table::Changefeed& proto) const { +template +void TChangefeedDescription::SerializeCommonFields(TProto& proto) const { proto.set_name(Name_); proto.set_virtual_timestamps(VirtualTimestamps_); - proto.set_initial_scan(InitialScan_); proto.set_aws_region(AwsRegion_); switch (Mode_) { @@ -2860,12 +2860,35 @@ void TChangefeedDescription::SerializeTo(Ydb::Table::Changefeed& proto) const { SetDuration(*ResolvedTimestamps_, *proto.mutable_resolved_timestamps_interval()); } + for (const auto& [key, value] : Attributes_) { + (*proto.mutable_attributes())[key] = value; + } +} + +void TChangefeedDescription::SerializeTo(Ydb::Table::Changefeed& proto) const { + SerializeCommonFields(proto); + proto.set_initial_scan(InitialScan_); + if (RetentionPeriod_) { SetDuration(*RetentionPeriod_, *proto.mutable_retention_period()); } +} - for (const auto& [key, value] : Attributes_) { - (*proto.mutable_attributes())[key] = value; +void TChangefeedDescription::SerializeTo(Ydb::Table::ChangefeedDescription& proto) const { + SerializeCommonFields(proto); + + switch (State_) { + case EChangefeedState::Enabled: + proto.set_state(Ydb::Table::ChangefeedDescription_State::ChangefeedDescription_State_STATE_ENABLED); + break; + case EChangefeedState::Disabled: + proto.set_state(Ydb::Table::ChangefeedDescription_State::ChangefeedDescription_State_STATE_DISABLED); + break; + case EChangefeedState::InitialScan: + proto.set_state(Ydb::Table::ChangefeedDescription_State::ChangefeedDescription_State_STATE_INITIAL_SCAN); + break; + case EChangefeedState::Unknown: + break; } } diff --git a/ydb/public/sdk/cpp/client/ydb_table/table.h b/ydb/public/sdk/cpp/client/ydb_table/table.h index ef0060886e9d..9be6449dc31b 100644 --- a/ydb/public/sdk/cpp/client/ydb_table/table.h +++ b/ydb/public/sdk/cpp/client/ydb_table/table.h @@ -389,6 +389,7 @@ class TChangefeedDescription { const std::optional& GetInitialScanProgress() const; void SerializeTo(Ydb::Table::Changefeed& proto) const; + void SerializeTo(Ydb::Table::ChangefeedDescription& proto) const; TString ToString() const; void Out(IOutputStream& o) const; @@ -399,6 +400,9 @@ class TChangefeedDescription { template static TChangefeedDescription FromProto(const TProto& proto); + template + void SerializeCommonFields(TProto& proto) const; + private: TString Name_; EChangefeedMode Mode_;