Skip to content

Commit

Permalink
Consistency option (core part) (#11970)
Browse files Browse the repository at this point in the history
  • Loading branch information
CyberROFL authored Nov 25, 2024
1 parent a328b30 commit 7644097
Show file tree
Hide file tree
Showing 11 changed files with 340 additions and 46 deletions.
14 changes: 13 additions & 1 deletion ydb/core/protos/replication.proto
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,19 @@ message TReplicationConfig {
TTargetSpecific Specific = 5;
}

optional bool InitialSync = 6;
reserved 6; // InitialSync

message TWeakConsistency {
}

message TStrongConsistency {
optional uint64 CommitIntervalMilliSeconds = 1;
}

oneof Consistency {
TWeakConsistency WeakConsistency = 7;
TStrongConsistency StrongConsistency = 8;
}
}

message TReplicationState {
Expand Down
93 changes: 69 additions & 24 deletions ydb/core/tx/replication/controller/dst_creator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#include <ydb/core/base/tablet_pipecache.h>
#include <ydb/core/cms/console/configs_dispatcher.h>
#include <ydb/core/protos/console_config.pb.h>
#include <ydb/core/protos/replication.pb.h>
#include <ydb/core/protos/schemeshard/operations.pb.h>
#include <ydb/core/tx/replication/ydb_proxy/ydb_proxy.h>
#include <ydb/core/tx/scheme_board/events.h>
Expand Down Expand Up @@ -226,10 +227,8 @@ class TDstCreator: public TActorBootstrapped<TDstCreator> {
AllocateTxId();
}

static void FillReplicationConfig(NKikimrSchemeOp::TTableReplicationConfig& replicationConfig) {
// TODO: support other modes
replicationConfig.SetMode(NKikimrSchemeOp::TTableReplicationConfig::REPLICATION_MODE_READ_ONLY);
replicationConfig.SetConsistency(NKikimrSchemeOp::TTableReplicationConfig::CONSISTENCY_WEAK);
void FillReplicationConfig(NKikimrSchemeOp::TTableReplicationConfig& replicationConfig) const {
NController::FillReplicationConfig(replicationConfig, Mode, Consistency);
}

void AllocateTxId() {
Expand Down Expand Up @@ -375,22 +374,7 @@ class TDstCreator: public TActorBootstrapped<TDstCreator> {
return false;
}

const auto& replicationConfig = got.GetReplicationConfig();

switch (replicationConfig.GetMode()) {
case NKikimrSchemeOp::TTableReplicationConfig::REPLICATION_MODE_READ_ONLY:
break;
default:
error = "Unsupported replication mode";
return false;
}

switch (replicationConfig.GetConsistency()) {
case NKikimrSchemeOp::TTableReplicationConfig::CONSISTENCY_WEAK:
break;
default:
error = TStringBuilder() << "Unsupported replication consistency"
<< ": " << static_cast<int>(replicationConfig.GetConsistency());
if (!CheckReplicationConfig(got.GetReplicationConfig(), Mode, Consistency, error)) {
return false;
}

Expand Down Expand Up @@ -623,7 +607,9 @@ class TDstCreator: public TActorBootstrapped<TDstCreator> {
ui64 tid,
TReplication::ETargetKind kind,
const TString& srcPath,
const TString& dstPath)
const TString& dstPath,
EReplicationMode mode,
EReplicaConsistency consistency)
: Parent(parent)
, SchemeShardId(schemeShardId)
, YdbProxy(proxy)
Expand All @@ -633,6 +619,8 @@ class TDstCreator: public TActorBootstrapped<TDstCreator> {
, Kind(kind)
, SrcPath(srcPath)
, DstPath(dstPath)
, Mode(mode)
, Consistency(consistency)
, LogPrefix("DstCreator", ReplicationId, TargetId)
{
}
Expand Down Expand Up @@ -665,6 +653,8 @@ class TDstCreator: public TActorBootstrapped<TDstCreator> {
const TReplication::ETargetKind Kind;
const TString SrcPath;
const TString DstPath;
const EReplicationMode Mode;
const EReplicaConsistency Consistency;
const TActorLogPrefix LogPrefix;

TPathId DomainKey;
Expand All @@ -680,17 +670,72 @@ class TDstCreator: public TActorBootstrapped<TDstCreator> {

}; // TDstCreator

static NKikimrSchemeOp::TTableReplicationConfig::EConsistency ConvertConsistency(EReplicaConsistency value) {
switch (value) {
case EReplicaConsistency::Weak:
return NKikimrSchemeOp::TTableReplicationConfig::CONSISTENCY_WEAK;
case EReplicaConsistency::Strong:
return NKikimrSchemeOp::TTableReplicationConfig::CONSISTENCY_STRONG;
}
}

static NKikimrSchemeOp::TTableReplicationConfig::EReplicationMode ConvertMode(EReplicationMode value) {
switch (value) {
case EReplicationMode::ReadOnly:
return NKikimrSchemeOp::TTableReplicationConfig::REPLICATION_MODE_READ_ONLY;
}
}

void FillReplicationConfig(
NKikimrSchemeOp::TTableReplicationConfig& out,
EReplicationMode mode,
EReplicaConsistency consistency
) {
out.SetMode(ConvertMode(mode));
out.SetConsistency(ConvertConsistency(consistency));
}

bool CheckReplicationConfig(
const NKikimrSchemeOp::TTableReplicationConfig& in,
EReplicationMode mode,
EReplicaConsistency consistency,
TString& error
) {
if (in.GetMode() != ConvertMode(mode)) {
error = TStringBuilder() << "Replication mode mismatch"
<< ": expected: " << ConvertMode(mode)
<< ", got: " << static_cast<int>(in.GetMode());
return false;
}

if (in.GetConsistency() != ConvertConsistency(consistency)) {
error = TStringBuilder() << "Replication consistency mismatch"
<< ": expected: " << ConvertConsistency(consistency)
<< ", got: " << static_cast<int>(in.GetConsistency());
return false;
}

return true;
}

IActor* CreateDstCreator(TReplication* replication, ui64 targetId, const TActorContext& ctx) {
const auto* target = replication->FindTarget(targetId);
Y_ABORT_UNLESS(target);

const auto consistency = replication->GetConfig().HasStrongConsistency()
? EReplicaConsistency::Strong
: EReplicaConsistency::Weak;

return CreateDstCreator(ctx.SelfID, replication->GetSchemeShardId(), replication->GetYdbProxy(), replication->GetPathId(),
replication->GetId(), target->GetId(), target->GetKind(), target->GetSrcPath(), target->GetDstPath());
replication->GetId(), target->GetId(), target->GetKind(), target->GetSrcPath(), target->GetDstPath(),
EReplicationMode::ReadOnly, consistency);
}

IActor* CreateDstCreator(const TActorId& parent, ui64 schemeShardId, const TActorId& proxy, const TPathId& pathId,
ui64 rid, ui64 tid, TReplication::ETargetKind kind, const TString& srcPath, const TString& dstPath)
ui64 rid, ui64 tid, TReplication::ETargetKind kind, const TString& srcPath, const TString& dstPath,
EReplicationMode mode, EReplicaConsistency consistency)
{
return new TDstCreator(parent, schemeShardId, proxy, pathId, rid, tid, kind, srcPath, dstPath);
return new TDstCreator(parent, schemeShardId, proxy, pathId, rid, tid, kind, srcPath, dstPath, mode, consistency);
}

}
27 changes: 26 additions & 1 deletion ydb/core/tx/replication/controller/dst_creator.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,35 @@

#include "replication.h"

namespace NKikimrSchemeOp {
class TTableReplicationConfig;
}

namespace NKikimr::NReplication::NController {

enum class EReplicationMode {
ReadOnly,
};

enum class EReplicaConsistency {
Weak,
Strong,
};

void FillReplicationConfig(
NKikimrSchemeOp::TTableReplicationConfig& out,
EReplicationMode mode,
EReplicaConsistency consistency);
bool CheckReplicationConfig(
const NKikimrSchemeOp::TTableReplicationConfig& in,
EReplicationMode mode,
EReplicaConsistency consistency,
TString& error);

IActor* CreateDstCreator(TReplication* replication, ui64 targetId, const TActorContext& ctx);
IActor* CreateDstCreator(const TActorId& parent, ui64 schemeShardId, const TActorId& proxy, const TPathId& pathId,
ui64 rid, ui64 tid, TReplication::ETargetKind kind, const TString& srcPath, const TString& dstPath);
ui64 rid, ui64 tid, TReplication::ETargetKind kind, const TString& srcPath, const TString& dstPath,
EReplicationMode mode = EReplicationMode::ReadOnly,
EReplicaConsistency consistency = EReplicaConsistency::Weak);

}
38 changes: 27 additions & 11 deletions ydb/core/tx/replication/controller/dst_creator_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,12 @@ namespace NKikimr::NReplication::NController {
Y_UNIT_TEST_SUITE(DstCreator) {
using namespace NTestHelpers;

void CheckTableReplica(const TTestTableDescription& tableDesc, const NKikimrSchemeOp::TTableDescription& replicatedDesc) {
void CheckTableReplica(
const TTestTableDescription& tableDesc,
const NKikimrSchemeOp::TTableDescription& replicatedDesc,
EReplicationMode mode = EReplicationMode::ReadOnly,
EReplicaConsistency consistency = EReplicaConsistency::Weak
) {
UNIT_ASSERT_VALUES_EQUAL(replicatedDesc.KeyColumnNamesSize(), tableDesc.KeyColumns.size());
for (ui32 i = 0; i < replicatedDesc.KeyColumnNamesSize(); ++i) {
UNIT_ASSERT_VALUES_EQUAL(replicatedDesc.GetKeyColumnNames(i), tableDesc.KeyColumns[i]);
Expand All @@ -28,12 +33,15 @@ Y_UNIT_TEST_SUITE(DstCreator) {
UNIT_ASSERT(FindIfPtr(tableDesc.Columns, pred));
}

const auto& replCfg = replicatedDesc.GetReplicationConfig();
UNIT_ASSERT_VALUES_EQUAL(replCfg.GetMode(), NKikimrSchemeOp::TTableReplicationConfig::REPLICATION_MODE_READ_ONLY);
UNIT_ASSERT_VALUES_EQUAL(replCfg.GetConsistency(), NKikimrSchemeOp::TTableReplicationConfig::CONSISTENCY_WEAK);
TString error;
UNIT_ASSERT_C(CheckReplicationConfig(replicatedDesc.GetReplicationConfig(), mode, consistency, error), error);
}

void Basic(const TString& replicatedPath) {
void Basic(
const TString& replicatedPath,
EReplicationMode mode = EReplicationMode::ReadOnly,
EReplicaConsistency consistency = EReplicaConsistency::Weak
) {
TEnv env;
env.GetRuntime().SetLogPriority(NKikimrServices::REPLICATION_CONTROLLER, NLog::PRI_TRACE);

Expand All @@ -50,7 +58,7 @@ Y_UNIT_TEST_SUITE(DstCreator) {
env.CreateTable("/Root", *MakeTableDescription(tableDesc));
env.GetRuntime().Register(CreateDstCreator(
env.GetSender(), env.GetSchemeshardId("/Root/Table"), env.GetYdbProxy(), env.GetPathId("/Root"),
1 /* rid */, 1 /* tid */, TReplication::ETargetKind::Table, "/Root/Table", replicatedPath
1 /* rid */, 1 /* tid */, TReplication::ETargetKind::Table, "/Root/Table", replicatedPath, mode, consistency
));

auto ev = env.GetRuntime().GrabEdgeEvent<TEvPrivate::TEvCreateDstResult>(env.GetSender());
Expand All @@ -59,7 +67,7 @@ Y_UNIT_TEST_SUITE(DstCreator) {
auto desc = env.GetDescription(replicatedPath);
const auto& replicatedDesc = desc.GetPathDescription().GetTable();

CheckTableReplica(tableDesc, replicatedDesc);
CheckTableReplica(tableDesc, replicatedDesc, mode, consistency);
}

Y_UNIT_TEST(Basic) {
Expand All @@ -70,6 +78,10 @@ Y_UNIT_TEST_SUITE(DstCreator) {
Basic("/Root/Dir/Replicated");
}

Y_UNIT_TEST(StrongConsistency) {
Basic("/Root/Replicated", EReplicationMode::ReadOnly, EReplicaConsistency::Strong);
}

void WithIndex(const TString& replicatedPath, NKikimrSchemeOp::EIndexType indexType) {
TEnv env(TFeatureFlags().SetEnableChangefeedsOnIndexTables(true));
env.GetRuntime().SetLogPriority(NKikimrServices::REPLICATION_CONTROLLER, NLog::PRI_TRACE);
Expand Down Expand Up @@ -370,15 +382,15 @@ Y_UNIT_TEST_SUITE(DstCreator) {
});
}

Y_UNIT_TEST(UnsupportedReplicationMode) {
Y_UNIT_TEST(ReplicationModeMismatch) {
auto changeMode = [](const TTestTableDescription& desc) {
auto copy = desc;
copy.ReplicationConfig->Mode = TTestTableDescription::TReplicationConfig::MODE_NONE;
copy.ReplicationConfig->Consistency = TTestTableDescription::TReplicationConfig::CONSISTENCY_UNKNOWN;
return copy;
};

ExistingDst(NKikimrScheme::StatusSchemeError, "Unsupported replication mode", changeMode, TTestTableDescription{
ExistingDst(NKikimrScheme::StatusSchemeError, "Replication mode mismatch", changeMode, TTestTableDescription{
.Name = "Table",
.KeyColumns = {"key"},
.Columns = {
Expand All @@ -388,20 +400,24 @@ Y_UNIT_TEST_SUITE(DstCreator) {
});
}

Y_UNIT_TEST(UnsupportedReplicationConsistency) {
Y_UNIT_TEST(ReplicationConsistencyMismatch) {
auto changeConsistency = [](const TTestTableDescription& desc) {
auto copy = desc;
copy.ReplicationConfig->Consistency = TTestTableDescription::TReplicationConfig::CONSISTENCY_STRONG;
return copy;
};

ExistingDst(NKikimrScheme::StatusSchemeError, "Unsupported replication consistency", changeConsistency, TTestTableDescription{
ExistingDst(NKikimrScheme::StatusSchemeError, "Replication consistency mismatch", changeConsistency, TTestTableDescription{
.Name = "Table",
.KeyColumns = {"key"},
.Columns = {
{.Name = "key", .Type = "Uint32"},
{.Name = "value", .Type = "Utf8"},
},
.ReplicationConfig = TTestTableDescription::TReplicationConfig{
.Mode = TTestTableDescription::TReplicationConfig::MODE_READ_ONLY,
.Consistency = TTestTableDescription::TReplicationConfig::CONSISTENCY_WEAK,
},
});
}
}
Expand Down
40 changes: 32 additions & 8 deletions ydb/core/tx/replication/controller/stream_creator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,25 @@ namespace NKikimr::NReplication::NController {

class TStreamCreator: public TActorBootstrapped<TStreamCreator> {
static NYdb::NTable::TChangefeedDescription MakeChangefeed(
const TString& name, const TDuration& retentionPeriod, const NJson::TJsonMap& attrs)
const TString& name,
const TDuration& retentionPeriod,
const std::optional<TDuration>& resolvedTimestamps,
const NJson::TJsonMap& attrs)
{
using namespace NYdb::NTable;
return TChangefeedDescription(name, EChangefeedMode::Updates, EChangefeedFormat::Json)

auto desc = TChangefeedDescription(name, EChangefeedMode::Updates, EChangefeedFormat::Json)
.WithRetentionPeriod(retentionPeriod)
.WithInitialScan()
.AddAttribute("__async_replication", NJson::WriteJson(attrs, false));

if (resolvedTimestamps) {
desc
.WithVirtualTimestamps()
.WithResolvedTimestamps(*resolvedTimestamps);
}

return desc;
}

void RequestPermission() {
Expand Down Expand Up @@ -161,17 +173,19 @@ class TStreamCreator: public TActorBootstrapped<TStreamCreator> {
const TString& srcPath,
const TString& dstPath,
const TString& streamName,
const TDuration& streamRetentionPeriod)
const TDuration& retentionPeriod,
const std::optional<TDuration>& resolvedTimestamps,
bool supportsTopicAutopartitioning)
: Parent(parent)
, YdbProxy(proxy)
, ReplicationId(rid)
, TargetId(tid)
, Kind(kind)
, SrcPath(srcPath)
, Changefeed(MakeChangefeed(streamName, streamRetentionPeriod, NJson::TJsonMap{
, Changefeed(MakeChangefeed(streamName, retentionPeriod, resolvedTimestamps, NJson::TJsonMap{
{"path", dstPath},
{"id", ToString(rid)},
{"supports_topic_autopartitioning", AppData()->FeatureFlags.GetEnableTopicAutopartitioningForReplication()},
{"supports_topic_autopartitioning", supportsTopicAutopartitioning},
}))
, LogPrefix("StreamCreator", ReplicationId, TargetId)
{
Expand Down Expand Up @@ -202,17 +216,27 @@ class TStreamCreator: public TActorBootstrapped<TStreamCreator> {
IActor* CreateStreamCreator(TReplication* replication, ui64 targetId, const TActorContext& ctx) {
const auto* target = replication->FindTarget(targetId);
Y_ABORT_UNLESS(target);

const auto& config = replication->GetConfig();
const auto resolvedTimestamps = config.HasStrongConsistency()
? std::make_optional(TDuration::MilliSeconds(config.GetStrongConsistency().GetCommitIntervalMilliSeconds()))
: std::nullopt;

return CreateStreamCreator(ctx.SelfID, replication->GetYdbProxy(),
replication->GetId(), target->GetId(), target->GetKind(),
target->GetSrcPath(), target->GetDstPath(), target->GetStreamName(),
TDuration::Seconds(AppData()->ReplicationConfig.GetRetentionPeriodSeconds()));
TDuration::Seconds(AppData()->ReplicationConfig.GetRetentionPeriodSeconds()), resolvedTimestamps,
AppData()->FeatureFlags.GetEnableTopicAutopartitioningForReplication());
}

IActor* CreateStreamCreator(const TActorId& parent, const TActorId& proxy, ui64 rid, ui64 tid,
TReplication::ETargetKind kind, const TString& srcPath, const TString& dstPath,
const TString& streamName, const TDuration& streamRetentionPeriod)
const TString& streamName, const TDuration& retentionPeriod,
const std::optional<TDuration>& resolvedTimestamps,
bool supportsTopicAutopartitioning)
{
return new TStreamCreator(parent, proxy, rid, tid, kind, srcPath, dstPath, streamName, streamRetentionPeriod);
return new TStreamCreator(parent, proxy, rid, tid, kind, srcPath, dstPath,
streamName, retentionPeriod, resolvedTimestamps, supportsTopicAutopartitioning);
}

}
Loading

0 comments on commit 7644097

Please sign in to comment.