Skip to content

Commit

Permalink
topic: preliminary refactoring in the restore tool code
Browse files Browse the repository at this point in the history
  • Loading branch information
jepett0 committed Jan 31, 2025
1 parent 0c9e55a commit 79b9420
Show file tree
Hide file tree
Showing 3 changed files with 170 additions and 32 deletions.
66 changes: 36 additions & 30 deletions ydb/public/lib/ydb_cli/dump/restore_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,29 @@ bool IsDatabase(TSchemeClient& client, const TString& path) {
return result.GetStatus() == EStatus::SUCCESS && result.GetEntry().Type == ESchemeEntryType::SubDomain;
}

TMaybe<TRestoreResult> ErrorOnIncomplete(const TFsPath& fsPath) {
if (fsPath.Child(NFiles::Incomplete().FileName).Exists()) {
return Result<TRestoreResult>(EStatus::BAD_REQUEST,
TStringBuilder() << "There is incomplete file in folder: " << fsPath.GetPath().Quote()
);
}
return Nothing();
}

TRestoreResult CheckExistenceAndType(TSchemeClient& client, const TString& dbPath, NScheme::ESchemeEntryType expectedType) {
auto pathDescription = DescribePath(client, dbPath);
if (!pathDescription.IsSuccess()) {
return Result<TRestoreResult>(dbPath, std::move(pathDescription));
}
if (pathDescription.GetEntry().Type != expectedType) {
return Result<TRestoreResult>(dbPath, EStatus::SCHEME_ERROR,
TStringBuilder() << "Expected a " << expectedType << ", but got: " << pathDescription.GetEntry().Type
);
}

return Result<TRestoreResult>();
}

} // anonymous

namespace NPrivate {
Expand Down Expand Up @@ -349,9 +372,8 @@ TRestoreResult TRestoreClient::RestoreFolder(const TFsPath& fsPath, const TStrin
TStringBuilder() << "Specified folder is not a directory: " << fsPath.GetPath());
}

if (IsFileExists(fsPath.Child(NFiles::Incomplete().FileName))) {
return Result<TRestoreResult>(EStatus::BAD_REQUEST,
TStringBuilder() << "There is incomplete file in folder: " << fsPath.GetPath());
if (auto error = ErrorOnIncomplete(fsPath)) {
return *error;
}

const TString objectDbPath = Join('/', dbPath, fsPath.GetName());
Expand Down Expand Up @@ -410,10 +432,8 @@ TRestoreResult TRestoreClient::RestoreView(
) {
LOG_D("Process " << fsPath.GetPath().Quote());

if (fsPath.Child(NFiles::Incomplete().FileName).Exists()) {
return Result<TRestoreResult>(EStatus::BAD_REQUEST,
TStringBuilder() << "There is incomplete file in folder: " << fsPath.GetPath().Quote()
);
if (auto error = ErrorOnIncomplete(fsPath)) {
return *error;
}

const TString dbPath = dbRestoreRoot + dbPathRelativeToRestoreRoot;
Expand All @@ -430,17 +450,7 @@ TRestoreResult TRestoreClient::RestoreView(
}

if (settings.DryRun_) {
auto pathDescription = DescribePath(SchemeClient, dbPath);
if (!pathDescription.IsSuccess()) {
return Result<TRestoreResult>(dbPath, std::move(pathDescription));
}
if (pathDescription.GetEntry().Type != NScheme::ESchemeEntryType::View) {
return Result<TRestoreResult>(dbPath, EStatus::SCHEME_ERROR,
TStringBuilder() << "expected a view, got: " << pathDescription.GetEntry().Type
);
}

return Result<TRestoreResult>();
return CheckExistenceAndType(SchemeClient, dbPath, NScheme::ESchemeEntryType::View);
}

LOG_D("Executing view creation query: " << query.Quote());
Expand Down Expand Up @@ -472,9 +482,8 @@ TRestoreResult TRestoreClient::RestoreTable(const TFsPath& fsPath, const TString
{
LOG_D("Process " << fsPath.GetPath().Quote());

if (fsPath.Child(NFiles::Incomplete().FileName).Exists()) {
return Result<TRestoreResult>(EStatus::BAD_REQUEST,
TStringBuilder() << "There is incomplete file in folder: " << fsPath.GetPath());
if (auto error = ErrorOnIncomplete(fsPath)) {
return *error;
}

auto scheme = ReadTableScheme(fsPath, Log.get());
Expand Down Expand Up @@ -802,9 +811,8 @@ TRestoreResult TRestoreClient::RestoreIndexes(const TString& dbPath, const TTabl

TRestoreResult TRestoreClient::RestoreChangefeeds(const TFsPath& fsPath, const TString& dbPath) {
LOG_D("Process " << fsPath.GetPath().Quote());
if (fsPath.Child(NFiles::Incomplete().FileName).Exists()) {
return Result<TRestoreResult>(EStatus::BAD_REQUEST,
TStringBuilder() << "There is incomplete file in folder: " << fsPath.GetPath());
if (auto error = ErrorOnIncomplete(fsPath)) {
return *error;
}

auto changefeedProto = ReadChangefeedDescription(fsPath, Log.get());
Expand Down Expand Up @@ -851,9 +859,8 @@ TRestoreResult TRestoreClient::RestoreConsumers(const TString& topicPath, const
TRestoreResult TRestoreClient::RestorePermissions(const TFsPath& fsPath, const TString& dbPath,
const TRestoreSettings& settings, bool isAlreadyExisting)
{
if (fsPath.Child(NFiles::Incomplete().FileName).Exists()) {
return Result<TRestoreResult>(EStatus::BAD_REQUEST,
TStringBuilder() << "There is incomplete file in folder: " << fsPath.GetPath());
if (auto error = ErrorOnIncomplete(fsPath)) {
return *error;
}

if (!settings.RestoreACL_) {
Expand All @@ -879,9 +886,8 @@ TRestoreResult TRestoreClient::RestoreEmptyDir(const TFsPath& fsPath, const TStr
{
LOG_D("Process " << fsPath.GetPath().Quote());

if (fsPath.Child(NFiles::Incomplete().FileName).Exists()) {
return Result<TRestoreResult>(EStatus::BAD_REQUEST,
TStringBuilder() << "There is incomplete file in folder: " << fsPath.GetPath());
if (auto error = ErrorOnIncomplete(fsPath)) {
return *error;
}

LOG_I("Restore empty directory " << fsPath.GetPath().Quote() << " to " << dbPath.Quote());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ namespace NYdb::inline V3 {
}

namespace NYdb::inline V3::NTopic {

enum class EMeteringMode : uint32_t {
Unspecified = 0,
ReservedCapacity = 1,
Expand Down Expand Up @@ -187,6 +187,8 @@ friend struct TAutoPartitioningSettingsBuilder;
, DownUtilizationPercent_(downUtilizationPercent)
, UpUtilizationPercent_(upUtilizationPercent) {}

void SerializeTo(Ydb::Topic::AutoPartitioningSettings& proto) const;

EAutoPartitioningStrategy GetStrategy() const;
TDuration GetStabilizationWindow() const;
ui32 GetDownUtilizationPercent() const;
Expand Down Expand Up @@ -228,6 +230,8 @@ class TPartitioningSettings {
{
}

void SerializeTo(Ydb::Topic::PartitioningSettings& proto) const;

uint64_t GetMinActivePartitions() const;
uint64_t GetMaxActivePartitions() const;
uint64_t GetPartitionCountLimit() const;
Expand Down Expand Up @@ -437,8 +441,11 @@ struct TConsumerSettings {

using TAttributes = std::map<std::string, std::string>;

TConsumerSettings(TSettings& parent): Parent_(parent) {}
TConsumerSettings(TSettings& parent) : Parent_(parent) {}
TConsumerSettings(TSettings& parent, const std::string& name) : ConsumerName_(name), Parent_(parent) {}
TConsumerSettings(TSettings& parent, const Ydb::Topic::Consumer& proto);

void SerializeTo(Ydb::Topic::Consumer& proto) const;

FLUENT_SETTING(std::string, ConsumerName);
FLUENT_SETTING_DEFAULT(bool, Important, false);
Expand Down Expand Up @@ -526,6 +533,11 @@ struct TCreateTopicSettings : public TOperationRequestSettings<TCreateTopicSetti
using TSelf = TCreateTopicSettings;
using TAttributes = std::map<std::string, std::string>;

TCreateTopicSettings() = default;
TCreateTopicSettings(const Ydb::Topic::CreateTopicRequest& proto);

void SerializeTo(Ydb::Topic::CreateTopicRequest& proto) const;

FLUENT_SETTING(TPartitioningSettings, PartitioningSettings);

FLUENT_SETTING_DEFAULT(TDuration, RetentionPeriod, TDuration::Hours(24));
Expand Down
120 changes: 120 additions & 0 deletions ydb/public/sdk/cpp/src/client/topic/impl/topic.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,13 @@ TPartitioningSettings::TPartitioningSettings(const Ydb::Topic::PartitioningSetti
, AutoPartitioningSettings_(settings.auto_partitioning_settings())
{}

void TPartitioningSettings::SerializeTo(Ydb::Topic::PartitioningSettings& proto) const {
proto.set_min_active_partitions(MinActivePartitions_);
proto.set_max_active_partitions(MaxActivePartitions_);
proto.set_partition_count_limit(PartitionCountLimit_);
AutoPartitioningSettings_.SerializeTo(*proto.mutable_auto_partitioning_settings());
}

uint64_t TPartitioningSettings::GetMinActivePartitions() const {
return MinActivePartitions_;
}
Expand All @@ -250,6 +257,14 @@ TAutoPartitioningSettings::TAutoPartitioningSettings(const Ydb::Topic::AutoParti
, UpUtilizationPercent_(settings.partition_write_speed().up_utilization_percent())
{}

void TAutoPartitioningSettings::SerializeTo(Ydb::Topic::AutoPartitioningSettings& proto) const {
proto.set_strategy(static_cast<Ydb::Topic::AutoPartitioningStrategy>(Strategy_));
auto& writeSpeed = *proto.mutable_partition_write_speed();
writeSpeed.mutable_stabilization_window()->set_seconds(StabilizationWindow_.Seconds());
writeSpeed.set_down_utilization_percent(DownUtilizationPercent_);
writeSpeed.set_up_utilization_percent(UpUtilizationPercent_);
}

EAutoPartitioningStrategy TAutoPartitioningSettings::GetStrategy() const {
return Strategy_;
}
Expand Down Expand Up @@ -535,4 +550,109 @@ TAsyncStatus TTopicClient::CommitOffset(const std::string& path, uint64_t partit
return Impl_->CommitOffset(path, partitionId, consumerName, offset, settings);
}

namespace {

Ydb::Topic::SupportedCodecs SerializeCodecs(const std::vector<ECodec>& codecs) {
Ydb::Topic::SupportedCodecs proto;
for (ECodec codec : codecs) {
proto.add_codecs(static_cast<Ydb::Topic::Codec>(codec));
}
return proto;
}

std::vector<ECodec> DeserializeCodecs(const Ydb::Topic::SupportedCodecs& proto) {
std::vector<ECodec> codecs;
codecs.reserve(proto.codecs_size());
for (int codec : proto.codecs()) {
codecs.emplace_back(static_cast<ECodec>(codec));
}
return codecs;
}

google::protobuf::Map<TProtoStringType, TProtoStringType> SerializeAttributes(const std::map<std::string, std::string>& attributes) {
google::protobuf::Map<TProtoStringType, TProtoStringType> proto;
for (const auto& [key, value] : attributes) {
proto.emplace(key, value);
}
return proto;
}

std::map<std::string, std::string> DeserializeAttributes(const google::protobuf::Map<TProtoStringType, TProtoStringType>& proto) {
std::map<std::string, std::string> attributes;
for (const auto& [key, value] : proto) {
attributes.emplace(key, value);
}
return attributes;
}

template <typename TSettings>
google::protobuf::RepeatedPtrField<Ydb::Topic::Consumer> SerializeConsumers(const std::vector<TConsumerSettings<TSettings>>& consumers) {
google::protobuf::RepeatedPtrField<Ydb::Topic::Consumer> proto;
proto.Reserve(consumers.size());
for (const auto& consumer : consumers) {
consumer.SerializeTo(*proto.Add());
}
return proto;
}

template <typename TSettings>
std::vector<TConsumerSettings<TSettings>> DeserializeConsumers(TSettings& parent, const google::protobuf::RepeatedPtrField<Ydb::Topic::Consumer>& proto) {
std::vector<TConsumerSettings<TSettings>> consumers;
consumers.reserve(proto.size());
for (const auto& consumer : proto) {
consumers.emplace_back(TConsumerSettings<TSettings>(parent, consumer));
}
return consumers;
}

}

template <typename TSettings>
TConsumerSettings<TSettings>::TConsumerSettings(TSettings& parent, const Ydb::Topic::Consumer& proto)
: ConsumerName_(proto.name())
, Important_(proto.important())
, ReadFrom_(TInstant::Seconds(proto.read_from().seconds()))
, SupportedCodecs_(DeserializeCodecs(proto.supported_codecs()))
, Attributes_(DeserializeAttributes(proto.attributes()))
, Parent_(parent)
{
}

template <typename TSettings>
void TConsumerSettings<TSettings>::SerializeTo(Ydb::Topic::Consumer& proto) const {
proto.set_name(ConsumerName_);
proto.set_important(Important_);
proto.mutable_read_from()->set_seconds(ReadFrom_.Seconds());
*proto.mutable_supported_codecs() = SerializeCodecs(SupportedCodecs_);
*proto.mutable_attributes() = SerializeAttributes(Attributes_);
}

template class TConsumerSettings<TCreateTopicSettings>;
template class TConsumerSettings<TAlterTopicSettings>;

TCreateTopicSettings::TCreateTopicSettings(const Ydb::Topic::CreateTopicRequest& proto)
: PartitioningSettings_(TPartitioningSettings(proto.partitioning_settings()))
, RetentionPeriod_(TDuration::Seconds(proto.retention_period().seconds()))
, SupportedCodecs_(DeserializeCodecs(proto.supported_codecs()))
, RetentionStorageMb_(proto.retention_storage_mb())
, MeteringMode_(TProtoAccessor::FromProto(proto.metering_mode()))
, PartitionWriteSpeedBytesPerSecond_(proto.partition_write_speed_bytes_per_second())
, PartitionWriteBurstBytes_(proto.partition_write_burst_bytes())
, Attributes_(DeserializeAttributes(proto.attributes()))
{
Consumers_ = DeserializeConsumers(*this, proto.consumers());
}

void TCreateTopicSettings::SerializeTo(Ydb::Topic::CreateTopicRequest& request) const {
PartitioningSettings_.SerializeTo(*request.mutable_partitioning_settings());
request.mutable_retention_period()->set_seconds(RetentionPeriod_.Seconds());
*request.mutable_supported_codecs() = SerializeCodecs(SupportedCodecs_);
request.set_retention_storage_mb(RetentionStorageMb_);
request.set_metering_mode(TProtoAccessor::GetProto(MeteringMode_));
request.set_partition_write_speed_bytes_per_second(PartitionWriteSpeedBytesPerSecond_);
request.set_partition_write_burst_bytes(PartitionWriteBurstBytes_);
*request.mutable_consumers() = SerializeConsumers(Consumers_);
*request.mutable_attributes() = SerializeAttributes(Attributes_);
}

} // namespace NYdb::NTopic

0 comments on commit 79b9420

Please sign in to comment.