Skip to content

Commit

Permalink
IcingaDB: Process dependencies runtime updates
Browse files Browse the repository at this point in the history
  • Loading branch information
yhabteab committed Feb 12, 2025
1 parent 7d7b4c1 commit e46b25d
Show file tree
Hide file tree
Showing 2 changed files with 174 additions and 32 deletions.
199 changes: 168 additions & 31 deletions lib/icingadb/icingadb-objects.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,9 @@ void IcingaDB::ConfigStaticInitialize()
IcingaDB::VersionChangedHandler(object);
});

DependencyGroup::OnChildRegistered.connect(&IcingaDB::DependencyGroupChildRegisteredHandler);
DependencyGroup::OnChildRemoved.connect(&IcingaDB::DependencyGroupChildRemovedHandler);

/* downtime start */
Downtime::OnDowntimeTriggered.connect(&IcingaDB::DowntimeStartedHandler);
/* fixed/flexible downtime end or remove */
Expand Down Expand Up @@ -1149,16 +1152,20 @@ void IcingaDB::InsertObjectDependencies(const ConfigObject::Ptr& object, const S
* - `icinga:config:dependency:edge:state`: State information for (each) dependency edge. Multiple edges may share the
* same state.
*
* For initial dumps, it shouldn't be necessary to set the `runtimeUpdates` parameter.
* If the `onlyDependencyGroup` parameter is set, only dependencies from this group are processed. This is useful
* when only a specific dependency group should be processed, e.g. during runtime updates. For initial config dumps,
* it shouldn't be necessary to set the `runtimeUpdates` and `onlyDependencyGroup` parameters.
*
* @param checkable The checkable object to extract dependencies from.
* @param hMSets The map of Redis HMSETs to insert the dependency data into.
* @param runtimeUpdates The vector of runtime updates to append the dependency data to.
* @param onlyDependencyGroup If set, only process dependency objects from this group.
*/
void IcingaDB::InsertCheckableDependencies(
const Checkable::Ptr& checkable,
std::map<String, RedisConnection::Query>& hMSets,
std::vector<Dictionary::Ptr>* runtimeUpdates
std::vector<Dictionary::Ptr>* runtimeUpdates,
const DependencyGroup::Ptr& onlyDependencyGroup
)
{
// Only generate a dependency node event if the Checkable is actually part of some dependency graph.
Expand Down Expand Up @@ -1226,7 +1233,12 @@ void IcingaDB::InsertCheckableDependencies(
}
}

for (auto& dependencyGroup : checkable->GetDependencyGroups()) {
std::vector<DependencyGroup::Ptr> dependencyGroups{onlyDependencyGroup};
if (!onlyDependencyGroup) {
dependencyGroups = checkable->GetDependencyGroups();
}

for (auto& dependencyGroup : dependencyGroups) {
String edgeFromNodeId(checkableId);
bool syncSharedEdgeState(false);

Expand Down Expand Up @@ -1493,34 +1505,7 @@ void IcingaDB::SendConfigUpdate(const ConfigObject::Ptr& object, bool runtimeUpd
UpdateState(checkable, runtimeUpdate ? StateUpdate::Full : StateUpdate::Volatile);
}

std::vector<std::vector<String> > transaction = {{"MULTI"}};

for (auto& kv : hMSets) {
if (!kv.second.empty()) {
kv.second.insert(kv.second.begin(), {"HMSET", kv.first});
transaction.emplace_back(std::move(kv.second));
}
}

for (auto& objectAttributes : runtimeUpdates) {
std::vector<String> xAdd({"XADD", "icinga:runtime", "MAXLEN", "~", "1000000", "*"});
ObjectLock olock(objectAttributes);

for (const Dictionary::Pair& kv : objectAttributes) {
String value = IcingaToStreamValue(kv.second);
if (!value.IsEmpty()) {
xAdd.emplace_back(kv.first);
xAdd.emplace_back(value);
}
}

transaction.emplace_back(std::move(xAdd));
}

if (transaction.size() > 1) {
transaction.push_back({"EXEC"});
m_Rcon->FireAndForgetQueries(std::move(transaction), Prio::Config, {1});
}
ExecuteRedisTransaction(hMSets, runtimeUpdates);

if (checkable) {
SendNextUpdate(checkable);
Expand Down Expand Up @@ -2884,6 +2869,102 @@ void IcingaDB::SendCustomVarsChanged(const ConfigObject::Ptr& object, const Dict
}
}

void IcingaDB::SendDependencyGroupChildRegistered(const Checkable::Ptr& child, const DependencyGroup::Ptr& dependencyGroup)
{
if (!m_Rcon || !m_Rcon->IsConnected()) {
return;
}

std::vector<Dictionary::Ptr> runtimeUpdates;
std::map<String, RedisConnection::Query> hMSets;
InsertCheckableDependencies(child, hMSets, &runtimeUpdates, dependencyGroup);
ExecuteRedisTransaction(hMSets, runtimeUpdates);

UpdateState(child, StateUpdate::Full);

std::set<Checkable::Ptr> parents;
dependencyGroup->LoadParents(parents);
for (const auto& parent : parents) {
// The affect{ed,s}_children might now have different outcome, so update the parent Checkable as well.
// The grandparent Checkable may still have wrong numbers of affected children, though it's not worth
// traversing the whole tree way up and sending config updates for each one of them, as the next Redis
// config dump is going to fix it anyway.
SendConfigUpdate(parent, true);
}
}

void IcingaDB::SendDependencyGroupChildRemoved(const DependencyGroup::Ptr& dependencyGroup, const std::vector<Dependency::Ptr>& dependencies)
{
if (!m_Rcon || !m_Rcon->IsConnected() || dependencies.empty()) {
return;
}

RedisConnection::Queries hdels, xAdds;
auto deleteState([this, &hdels, &xAdds](const String& id, const String& redisKey) {
hdels.emplace_back(RedisConnection::Query{"HDEL", m_PrefixConfigObject + redisKey, id});
xAdds.emplace_back(RedisConnection::Query{
"XADD", "icinga:runtime:state", "MAXLEN", "~", "1000000", "*", "runtime_type", "delete",
"redis_key", m_PrefixConfigObject + redisKey, "id", id
});
});

Checkable::Ptr child;
std::set<Checkable*> detachedParents;
for (const auto& dependency : dependencies) {
child = dependency->GetChild(); // All dependencies have the same child.
const auto& parent(dependency->GetParent());
if (auto [_, inserted] = detachedParents.insert(dependency->GetParent().get()); inserted) {
String edgeId;
if (dependencyGroup->IsRedundancyGroup()) {
// If there are no other dependencies in the dependency group that connect the redundancy group with
// the parent Checkable, we've to remove the edge and its state accordingly.
if (dependencyGroup->IsEmpty() || !dependencyGroup->HasParentWithConfig(dependency)) {
auto id(HashValue(new Array{dependencyGroup->GetIcingaDBIdentifier(), GetObjectIdentifier(parent)}));
deleteState(id, "dependency:edge:state");
DeleteRelationship(id, "dependency:edge");
}

// Remove the connection from the child Checkable to the redundancy group.
edgeId = HashValue(new Array{GetObjectIdentifier(child), dependencyGroup->GetIcingaDBIdentifier()});
} else {
// Remove the edge between the parent and child Checkable linked through the removed dependency.
edgeId = HashValue(new Array{GetObjectIdentifier(child), GetObjectIdentifier(parent)});
}

DeleteRelationship(edgeId, "dependency:edge");

// The affect{ed,s}_children might now have different outcome, so update the parent Checkable as well.
// The grandparent Checkable may still have wrong numbers of affected children, though it's not worth
// traversing the whole tree way up and sending config updates for each one of them, as the next Redis
// config dump is going to fix it anyway.
SendConfigUpdate(parent, true);
}
}

if (dependencyGroup->IsRedundancyGroup() && dependencyGroup->IsEmpty()) {
String redundancyGroupId(dependencyGroup->GetIcingaDBIdentifier());
deleteState(redundancyGroupId, "dependency:edge:state");
deleteState(redundancyGroupId, "redundancygroup:state");

DeleteRelationship(redundancyGroupId, "dependency:node");
DeleteRelationship(redundancyGroupId, "redundancygroup");
} else if (dependencyGroup->IsEmpty()) {
// Note: The Icinga DB identifier of a non-redundant dependency group is used as the edge state ID
// and shared by all of its dependency objects. See also SerializeDependencyEdgeState() for details.
deleteState(dependencyGroup->GetIcingaDBIdentifier(), "dependency:edge:state");
}

if (!child->HasAnyDependencies()) {
// If the child Checkable has no parent and reverse dependencies, we can safely remove the dependency node.
DeleteRelationship(GetObjectIdentifier(child), "dependency:node");
}

if (!hdels.empty()) {
m_Rcon->FireAndForgetQueries(std::move(hdels), Prio::RuntimeStateSync);
m_Rcon->FireAndForgetQueries(std::move(xAdds), Prio::RuntimeStateStream, {0, 1});
}
}

Dictionary::Ptr IcingaDB::SerializeState(const Checkable::Ptr& checkable)
{
Dictionary::Ptr attrs = new Dictionary();
Expand Down Expand Up @@ -3161,6 +3242,20 @@ void IcingaDB::NextCheckUpdatedHandler(const Checkable::Ptr& checkable)
}
}

void IcingaDB::DependencyGroupChildRegisteredHandler(const Checkable::Ptr& child, const DependencyGroup::Ptr& dependencyGroup)
{
for (const auto& rw : ConfigType::GetObjectsByType<IcingaDB>()) {
rw->SendDependencyGroupChildRegistered(child, dependencyGroup);
}
}

void IcingaDB::DependencyGroupChildRemovedHandler(const DependencyGroup::Ptr& dependencyGroup, const std::vector<Dependency::Ptr>& dependencies)
{
for (const auto& rw : ConfigType::GetObjectsByType<IcingaDB>()) {
rw->SendDependencyGroupChildRemoved(dependencyGroup, dependencies);
}
}

void IcingaDB::HostProblemChangedHandler(const Service::Ptr& service) {
for (auto& rw : ConfigType::GetObjectsByType<IcingaDB>()) {
/* Host state changes affect is_handled and severity of services. */
Expand Down Expand Up @@ -3278,3 +3373,45 @@ void IcingaDB::DeleteRelationship(const String& id, const String& redisKeyWithou

m_Rcon->FireAndForgetQueries(queries, Prio::Config);
}

/**
* Execute the provided HMSET values and runtime updates in a single Redis transaction.
*
* The HMSETs should just contain the necessary key value pairs to be set in Redis, i.e, without the HMSET command
* itself. This function will then go through each of the map keys and prepend the HMSET command when transforming the
* map into valid Redis queries. Likewise, the runtime updates should just contain the key value pairs to be streamed
* to the icinga:runtime pipeline, and this function will generate a XADD query for each one of the vector elements.
*
* @param hMSets A map of Redis keys and their respective HMSET values.
* @param runtimeUpdates A list of dictionaries to be sent to the icinga:runtime stream.
*/
void IcingaDB::ExecuteRedisTransaction(std::map<String, RedisConnection::Query>& hMSets, const std::vector<Dictionary::Ptr>& runtimeUpdates) const
{
RedisConnection::Queries transaction{{"MULTI"}};
for (auto& [redisKey, query] : hMSets) {
if (!query.empty()) {
RedisConnection::Query hSet{"HMSET", redisKey};
hSet.insert(hSet.end(), std::make_move_iterator(query.begin()), std::make_move_iterator(query.end()));
transaction.emplace_back(std::move(hSet));
}
}

for (auto& attrs : runtimeUpdates) {
RedisConnection::Query xAdd{"XADD", "icinga:runtime", "MAXLEN", "~", "1000000", "*"};

ObjectLock olock(attrs);
for (auto& [key, value]: attrs) {
if (auto streamVal(IcingaToStreamValue(value)); !streamVal.IsEmpty()) {
xAdd.emplace_back(key);
xAdd.emplace_back(std::move(streamVal));
}
}

transaction.emplace_back(std::move(xAdd));
}

if (transaction.size() > 1) {
transaction.emplace_back(RedisConnection::Query{"EXEC"});
m_Rcon->FireAndForgetQueries(std::move(transaction), Prio::Config, {1});
}
}
7 changes: 6 additions & 1 deletion lib/icingadb/icingadb.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ class IcingaDB : public ObjectImpl<IcingaDB>
std::vector<String> GetTypeOverwriteKeys(const String& type);
std::vector<String> GetTypeDumpSignalKeys(const Type::Ptr& type);
void InsertCheckableDependencies(const Checkable::Ptr& checkable, std::map<String, RedisConnection::Query>& hMSets,
std::vector<Dictionary::Ptr>* runtimeUpdates);
std::vector<Dictionary::Ptr>* runtimeUpdates, const DependencyGroup::Ptr& onlyDependencyGroup = nullptr);
void InsertObjectDependencies(const ConfigObject::Ptr& object, const String typeName, std::map<String, std::vector<String>>& hMSets,
std::vector<Dictionary::Ptr>& runtimeUpdates, bool runtimeUpdate);
void UpdateDependenciesState(const Checkable::Ptr& checkable) const;
Expand All @@ -115,6 +115,7 @@ class IcingaDB : public ObjectImpl<IcingaDB>
void AddObjectDataToRuntimeUpdates(std::vector<Dictionary::Ptr>& runtimeUpdates, const String& objectKey,
const String& redisKey, const Dictionary::Ptr& data);
void DeleteRelationship(const String& id, const String& redisKeyWithoutPrefix, bool hasChecksum = false);
void ExecuteRedisTransaction(std::map<String, RedisConnection::Query>& hMSets, const std::vector<Dictionary::Ptr>& runtimeUpdates) const;

void SendSentNotification(
const Notification::Ptr& notification, const Checkable::Ptr& checkable, const std::set<User::Ptr>& users,
Expand All @@ -139,6 +140,8 @@ class IcingaDB : public ObjectImpl<IcingaDB>
void SendCommandEnvChanged(const ConfigObject::Ptr& command, const Dictionary::Ptr& oldValues, const Dictionary::Ptr& newValues);
void SendCommandArgumentsChanged(const ConfigObject::Ptr& command, const Dictionary::Ptr& oldValues, const Dictionary::Ptr& newValues);
void SendCustomVarsChanged(const ConfigObject::Ptr& object, const Dictionary::Ptr& oldValues, const Dictionary::Ptr& newValues);
void SendDependencyGroupChildRegistered(const Checkable::Ptr& child, const DependencyGroup::Ptr& dependencyGroup);
void SendDependencyGroupChildRemoved(const DependencyGroup::Ptr& dependencyGroup, const std::vector<Dependency::Ptr>& dependencies);

void ForwardHistoryEntries();

Expand Down Expand Up @@ -185,6 +188,8 @@ class IcingaDB : public ObjectImpl<IcingaDB>
static void FlappingChangeHandler(const Checkable::Ptr& checkable, double changeTime);
static void NewCheckResultHandler(const Checkable::Ptr& checkable);
static void NextCheckUpdatedHandler(const Checkable::Ptr& checkable);
static void DependencyGroupChildRegisteredHandler(const Checkable::Ptr& child, const DependencyGroup::Ptr& dependencyGroup);
static void DependencyGroupChildRemovedHandler(const DependencyGroup::Ptr& dependencyGroup, const std::vector<Dependency::Ptr>& dependencies);
static void HostProblemChangedHandler(const Service::Ptr& service);
static void AcknowledgementSetHandler(const Checkable::Ptr& checkable, const String& author, const String& comment, AcknowledgementType type, bool persistent, double changeTime, double expiry);
static void AcknowledgementClearedHandler(const Checkable::Ptr& checkable, const String& removedBy, double changeTime);
Expand Down

0 comments on commit e46b25d

Please sign in to comment.