Skip to content
This repository has been archived by the owner on Jan 7, 2022. It is now read-only.

Commit

Permalink
Ensure LCM receives ServerConfig updates
Browse files Browse the repository at this point in the history
Summary: Internal logs config is available from two places - ServerConfig and LocalLogsConfig. Configuration.cpp used to look up the ServerConfig where as a number other callsites look up on the LocalLogsConfig. These versions could diverge since LocalLogsConfig is only updated whenever LogsConfigStateMachine publishes a new state. This divergence could lead to number of problems if one subcomponent sees a log and another subcomponent does not. For example, recovery of a new internal log could get stuck, replication property changes to internal logs are not propogated, writes to metadata log of a newly added log failing etc. This diff fixes the problem by ensuring that LCM subscribes to ServerConfig updates and publishes a new LocalLogsConfig with updated internal_logs if a change is detected. Also updates Configuration.cpp to always lookup log properties using LocalLogsConfig irrespective of whether log id is internal or not.

Reviewed By: AhmedSoliman

Differential Revision: D17752931

fbshipit-source-id: 1f96c99fb6866e044518acd9a703216f69a642c2
  • Loading branch information
Vijaykumar Padmanaban authored and facebook-github-bot committed Oct 10, 2019
1 parent 0d994d1 commit 98bb864
Show file tree
Hide file tree
Showing 9 changed files with 157 additions and 15 deletions.
4 changes: 4 additions & 0 deletions logdevice/common/Worker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,10 @@ void Worker::onServerConfigUpdated() {

sender().noteConfigurationChanged(getNodesConfiguration());

if (logsconfig_manager_) {
logsconfig_manager_->onServerConfigUpdated();
}

clientReadStreams().noteConfigurationChanged();
// propagate the config change to metadata sequencer
runningWriteMetaDataRecords().noteConfigurationChanged();
Expand Down
12 changes: 0 additions & 12 deletions logdevice/common/configuration/Configuration.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,6 @@ std::shared_ptr<LogsConfig::LogGroupNode>
Configuration::getLogGroupByIDShared(logid_t id) const {
if (MetaDataLog::isMetaDataLog(id)) {
return server_config_->getMetaDataLogGroup();
} else if (configuration::InternalLogs::isInternal(id)) {
const auto raw_directory =
server_config_->getInternalLogsConfig().getLogGroupByID(id);
return raw_directory != nullptr ? raw_directory->log_group : nullptr;
} else {
return logs_config_->getLogGroupByIDShared(id);
}
Expand All @@ -60,8 +56,6 @@ Configuration::getLogGroupInDirectoryByIDRaw(logid_t id) const {
ld_check(logs_config_->isLocal());
if (MetaDataLog::isMetaDataLog(id)) {
return &server_config_->getMetaDataLogGroupInDir();
} else if (configuration::InternalLogs::isInternal(id)) {
return server_config_->getInternalLogsConfig().getLogGroupByID(id);
} else {
return localLogsConfig()->getLogGroupInDirectoryByIDRaw(id);
}
Expand All @@ -72,12 +66,6 @@ void Configuration::getLogGroupByIDAsync(
std::function<void(std::shared_ptr<LogsConfig::LogGroupNode>)> cb) const {
if (MetaDataLog::isMetaDataLog(id)) {
cb(server_config_->getMetaDataLogGroup());
return;
} else if (configuration::InternalLogs::isInternal(id)) {
const auto raw_directory =
server_config_->getInternalLogsConfig().getLogGroupByID(id);
cb(raw_directory != nullptr ? raw_directory->log_group : nullptr);
return;
} else {
logs_config_->getLogGroupByIDAsync(id, cb);
}
Expand Down
35 changes: 35 additions & 0 deletions logdevice/common/configuration/InternalLogs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -176,4 +176,39 @@ folly::dynamic InternalLogs::toDynamic() const {
return logs;
}

bool InternalLogs::operator!=(const InternalLogs& other) const {
return !(*this == other);
}

bool InternalLogs::operator==(const InternalLogs& other) const {
// If sizes don't match, they are not equal
if (size() != other.size()) {
return false;
}

for (const auto& kv : nameLookup()) {
auto log_id = kv.second;
// Check if the log exists in both configs and that their attrs match
auto this_config_attr =
logExists(log_id) ? getLogGroupByID(log_id)->log_group : nullptr;
auto other_config_attr = other.logExists(log_id)
? other.getLogGroupByID(log_id)->log_group
: nullptr;
// Log not configured in both configs.
if (this_config_attr == nullptr && other_config_attr == nullptr) {
continue;
}
// Log is present in one config but not the other
if (this_config_attr == nullptr || other_config_attr == nullptr) {
return false;
}
// Log id present in both configs but the attributes don't match
if (!(*this_config_attr == *other_config_attr)) {
return false;
}
}

return true;
}

}}} // namespace facebook::logdevice::configuration
3 changes: 3 additions & 0 deletions logdevice/common/configuration/InternalLogs.h
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,9 @@ class InternalLogs {
*/
folly::dynamic toDynamic() const;

bool operator!=(const InternalLogs& other) const;
bool operator==(const InternalLogs& other) const;

private:
void reset();

Expand Down
28 changes: 28 additions & 0 deletions logdevice/common/configuration/logs/LogsConfigManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,34 @@ void LogsConfigManager::onSettingsUpdated() {
}
}

void LogsConfigManager::onServerConfigUpdated() {
auto server_config = updateable_config_->getServerConfig();
const auto& server_config_internal_log =
server_config->getInternalLogsConfig();
auto logs_config = updateable_config_->getLocalLogsConfig();
const auto& logs_config_internal_log = logs_config->getInternalLogs();

if (server_config_internal_log != logs_config_internal_log) {
// Make a new copy of the existing config
std::shared_ptr<LocalLogsConfig> new_logs_config =
std::make_shared<LocalLogsConfig>(
*updateable_config_->getLocalLogsConfig());
// Setting the InternalLogs from ServerConfig
new_logs_config->setInternalLogsConfig(server_config_internal_log);
// We want the latest namespace delimiter to be set for this config.
new_logs_config->setNamespaceDelimiter(
server_config->getNamespaceDelimiter());
updateable_config_->updateableLogsConfig()->update(new_logs_config);
ld_info("Published new LogsConfig (fully loaded? %s) version (%lu) from "
"LogsConfigManager because ServerConfig was updated to version:%s",
new_logs_config->isFullyLoaded() ? "yes" : "no",
new_logs_config->getVersion(),
toString(server_config->getVersion()).c_str());
// increment the counter of number of published updates
STAT_INCR(getStats(), logsconfig_manager_published_server_config_update);
}
}

void LogsConfigManager::start() {
if (is_running_) {
return;
Expand Down
7 changes: 7 additions & 0 deletions logdevice/common/configuration/logs/LogsConfigManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,13 @@ class LogsConfigManager {
*/
void onSettingsUpdated();

/**
* Callback that gets called when the ServerConfig is updated. Must be called
* on the same worker on which this manager is running as a new
* LocalLogsConfig may be published
*/
void onServerConfigUpdated();

/**
* Decides on which worker the LogsConfigManager and LogsConfigStateMachine
* should bind to.
Expand Down
8 changes: 5 additions & 3 deletions logdevice/common/stats/common_stats.inc
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@
/* can be included multiple times */

#ifndef STAT_DEFINE
#error STAT_DEFINE() macro not defined
#define STAT_DEFINE(...)
#error STAT_DEFINE() macro not defined
#define STAT_DEFINE(...)
#endif

// Stats shared between server and client
Expand Down Expand Up @@ -52,7 +52,6 @@ STAT_DEFINE(metadata_log_read_failed_corruption, SUM)
STAT_DEFINE(metadata_log_read_dataloss, SUM)
STAT_DEFINE(metadata_log_read_failed_other, SUM)


// Number of CONFIG_CHANGED_Messages received with Action::Reload.
STAT_DEFINE(config_changed_reload, SUM)
// Number of CONFIG_CHANGED_Messages ignored because the config is already
Expand Down Expand Up @@ -92,6 +91,9 @@ STAT_DEFINE(nodeset_finder_fallback_to_metadata_log, SUM)
// LogsConfigManager
// Number of updates sent to UpdateableLogsConfig by LogsConfigManager
STAT_DEFINE(logsconfig_manager_published_update, SUM)
// Number of times a new LocalLogsConfig was published because
// server config was updated
STAT_DEFINE(logsconfig_manager_published_server_config_update, SUM)
// LogsConfig manager receiving updates from the state machine
STAT_DEFINE(logsconfig_manager_received_update, SUM)
// The version of the last processed delta/snapshot on this node
Expand Down
2 changes: 2 additions & 0 deletions logdevice/common/test/RandomNodeSetSelectorTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -770,6 +770,7 @@ TEST(WeightAwareNodeSetSelectorTest, InternalLogs) {
ASSERT_NE(nullptr, log_group_node);
log_group_node = il.insert("maintenance_log_deltas", log_attrs);
ASSERT_NE(nullptr, log_group_node);
logs_config->setInternalLogsConfig(il);

ShapingConfig shaping_cfg(
std::set<NodeLocationScope>{NodeLocationScope::NODE},
Expand Down Expand Up @@ -1108,6 +1109,7 @@ TEST(WeightAwareNodeSetSelectorTest, InternalLogsConfiguredTooSmall) {
log_attrs.set_replicateAcross(replication.getDistinctReplicationFactors());
auto log_group_node = il.insert("event_log_snapshots", log_attrs);
ASSERT_NE(nullptr, log_group_node);
logs_config->setInternalLogsConfig(il);

ShapingConfig shaping_cfg(
std::set<NodeLocationScope>{NodeLocationScope::NODE},
Expand Down
73 changes: 73 additions & 0 deletions logdevice/test/ServerConfigSourceIntegrationTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -210,3 +210,76 @@ TEST_F(ServerConfigSourceIntegrationTest, StaleServerConfigFetchFromSource) {
EXPECT_LT(
0, cluster->getNode(0).stats()["config_changed_ignored_not_trusted"]);
}

TEST_F(ServerConfigSourceIntegrationTest, ServerConfigInternalLogUpdate) {
auto cluster =
IntegrationTestUtils::ClusterFactory()
.enableLogsConfigManager()
.setInternalLogsReplicationFactor(1)
.eventLogMode(
IntegrationTestUtils::ClusterFactory::EventLogMode::SNAPSHOTTED)
.create(1);

// The default version for the cluster is 1
std::shared_ptr<Configuration> cluster_config = cluster->getConfig()->get();
EXPECT_EQ(config_version_t(1), cluster_config->serverConfig()->getVersion());

// Bump version of server config
auto new_server_config =
cluster_config->serverConfig()->withVersion(config_version_t(2));
cluster->writeConfig(
new_server_config.get(), cluster_config->logsConfig().get());

// Wait until the node picks up the updated config
wait_until([&]() -> bool {
std::string reply = cluster->getNode(0).sendCommand("info config");
auto updated_config = Configuration::fromJson(reply, nullptr, nullptr);
ld_check(updated_config);

return new_server_config->getVersion() ==
updated_config->serverConfig()->getVersion();
});

// We should not have published a new LogsConfig
EXPECT_EQ(0,
cluster->getNode(0)
.stats()["logsconfig_manager_published_server_config_update"]);

// Now update the internal logs section of the config.
logsconfig::LogAttributes log_attrs;
log_attrs.set_replicationFactor(1);
log_attrs.set_extraCopies(0);
log_attrs.set_syncedCopies(0);
log_attrs.set_maxWritesInFlight(2);
configuration::InternalLogs internalLogs;
internalLogs.insert("config_log_deltas", log_attrs);
internalLogs.insert("config_log_snapshots", log_attrs);
internalLogs.insert("event_log_deltas", log_attrs);
internalLogs.insert("event_log_snapshots", log_attrs);
internalLogs.insert("maintenance_log_deltas", log_attrs);
internalLogs.insert("maintenance_log_snapshots", log_attrs);

auto server_config_updated_internal_logs =
ServerConfig::fromDataTest(new_server_config->getClusterName(),
new_server_config->getNodesConfig(),
new_server_config->getMetaDataLogsConfig(),
ServerConfig::PrincipalsConfig(),
new_server_config->getSecurityConfig(),
ServerConfig::TraceLoggerConfig(),
new_server_config->getTrafficShapingConfig(),
new_server_config->getReadIOShapingConfig(),
new_server_config->getServerSettingsConfig(),
new_server_config->getClientSettingsConfig(),
internalLogs);
cluster->writeConfig(
server_config_updated_internal_logs->withVersion(config_version_t(3))
.get(),
cluster_config->logsConfig().get());

// Wait till logs config gets updated
wait_until([&]() -> bool {
return cluster->getNode(0)
.stats()["logsconfig_manager_published_server_config_update"] ==
1;
});
}

0 comments on commit 98bb864

Please sign in to comment.