From e58d838c5a2591f2684727a656828bb62ae512ca Mon Sep 17 00:00:00 2001 From: henryzhx8 Date: Wed, 22 Jan 2025 08:46:50 +0000 Subject: [PATCH] polish pipeline watcher log --- core/config/watcher/PipelineConfigWatcher.cpp | 65 +++++++++---------- core/config/watcher/PipelineConfigWatcher.h | 5 +- 2 files changed, 33 insertions(+), 37 deletions(-) diff --git a/core/config/watcher/PipelineConfigWatcher.cpp b/core/config/watcher/PipelineConfigWatcher.cpp index 5d42905834..f2da1c0ecc 100644 --- a/core/config/watcher/PipelineConfigWatcher.cpp +++ b/core/config/watcher/PipelineConfigWatcher.cpp @@ -33,7 +33,7 @@ namespace logtail { PipelineConfigWatcher::PipelineConfigWatcher() : ConfigWatcher(), - mPipelineManager(CollectionPipelineManager::GetInstance()), + mCollectionPipelineManager(CollectionPipelineManager::GetInstance()), mTaskPipelineManager(TaskPipelineManager::GetInstance()) { } @@ -48,7 +48,7 @@ pair PipelineConfigWatcher::CheckConfigDif InsertPipelines(pDiff, tDiff, configSet, singletonCache); CheckSingletonInput(pDiff, singletonCache); - for (const auto& name : mPipelineManager->GetAllConfigNames()) { + for (const auto& name : mCollectionPipelineManager->GetAllConfigNames()) { if (configSet.find(name) == configSet.end()) { pDiff.mRemoved.push_back(name); LOG_INFO(sLogger, @@ -94,7 +94,7 @@ void PipelineConfigWatcher::InsertBuiltInPipelines(CollectionConfigDiff& pDiff, unordered_set& configSet, SingletonConfigCache& singletonCache) { #ifdef __ENTERPRISE__ - const std::map& builtInPipelines + const map& builtInPipelines = EnterpriseConfigProvider::GetInstance()->GetAllBuiltInPipelineConfigs(); for (const auto& pipeline : builtInPipelines) { @@ -138,7 +138,7 @@ void PipelineConfigWatcher::InsertBuiltInPipelines(CollectionConfigDiff& pDiff, if (!IsConfigEnabled(pipelineName, *detail)) { switch (GetConfigType(*detail)) { case ConfigType::Collection: - if (mPipelineManager->FindConfigByName(pipelineName)) { + if (mCollectionPipelineManager->FindConfigByName(pipelineName)) { pDiff.mRemoved.push_back(pipelineName); LOG_INFO(sLogger, ("existing valid config modified and disabled", @@ -178,7 +178,7 @@ void PipelineConfigWatcher::InsertBuiltInPipelines(CollectionConfigDiff& pDiff, void PipelineConfigWatcher::InsertPipelines(CollectionConfigDiff& pDiff, TaskConfigDiff& tDiff, - std::unordered_set& configSet, + unordered_set& configSet, SingletonConfigCache& singletonCache) { for (const auto& dir : mSourceDir) { error_code ec; @@ -248,7 +248,7 @@ void PipelineConfigWatcher::InsertPipelines(CollectionConfigDiff& pDiff, if (!IsConfigEnabled(configName, *detail)) { switch (GetConfigType(*detail)) { case ConfigType::Collection: - if (mPipelineManager->FindConfigByName(configName)) { + if (mCollectionPipelineManager->FindConfigByName(configName)) { pDiff.mRemoved.push_back(configName); LOG_INFO(sLogger, ("existing valid config modified and disabled", @@ -278,8 +278,8 @@ void PipelineConfigWatcher::InsertPipelines(CollectionConfigDiff& pDiff, continue; } } else { - LOG_DEBUG(sLogger, ("existing config file unchanged", "skip current object")); - CheckUnchangedConfig(configName, path, pDiff, tDiff, singletonCache); + // check unchanged config just for singleton input + CheckUnchangedConfig(configName, path, pDiff, singletonCache); } } } @@ -332,7 +332,7 @@ bool PipelineConfigWatcher::CheckModifiedConfig(const string& configName, SingletonConfigCache& singletonCache) { switch (GetConfigType(*configDetail)) { case ConfigType::Collection: { - shared_ptr p = mPipelineManager->FindConfigByName(configName); + shared_ptr p = mCollectionPipelineManager->FindConfigByName(configName); if (!p) { CollectionConfig config(configName, std::move(configDetail)); if (!config.Parse()) { @@ -419,39 +419,36 @@ bool PipelineConfigWatcher::CheckModifiedConfig(const string& configName, return true; } -bool PipelineConfigWatcher::CheckUnchangedConfig(const std::string& configName, +bool PipelineConfigWatcher::CheckUnchangedConfig(const string& configName, const filesystem::path& path, CollectionConfigDiff& pDiff, - TaskConfigDiff& tDiff, SingletonConfigCache& singletonCache) { - auto pipeline = mPipelineManager->FindConfigByName(configName); - auto task = mTaskPipelineManager->FindPipelineByName(configName).get(); - if (task) { + if (mTaskPipelineManager->FindPipelineByName(configName)) { return true; - } else if (pipeline) { // running pipeline in last config update - std::unique_ptr configDetail = make_unique(); + } + + const auto& pipeline = mCollectionPipelineManager->FindConfigByName(configName); + if (pipeline) { + // if this pipeline is selected in the end, we simply pass it, thus, the config here is just a dummy + unique_ptr configDetail = make_unique(); CollectionConfig config(configName, std::move(configDetail)); config.mCreateTime = pipeline->GetContext().GetCreateTime(); config.mSingletonInput = pipeline->GetSingletonInput(); PushPipelineConfig(std::move(config), ConfigDiffEnum::Unchanged, pDiff, singletonCache); - } else { // low priority singleton input in last config update, sort it again + } else { + // low priority singleton input in last config update, sort it again unique_ptr detail = make_unique(); if (!LoadConfigDetailFromFile(path, *detail)) { return false; } if (!IsConfigEnabled(configName, *detail)) { - LOG_DEBUG(sLogger, ("unchanged config found and disabled", "skip current object")("config", configName)); + LOG_DEBUG(sLogger, + ("existing disabled config file unchanged", "skip current object")("config", configName)); return false; } CollectionConfig config(configName, std::move(detail)); if (!config.Parse()) { - LOG_ERROR(sLogger, ("new config found but invalid", "skip current object")("config", configName)); - AlarmManager::GetInstance()->SendAlarm(CATEGORY_CONFIG_ALARM, - "new config found but invalid: skip current object, config: " - + configName, - config.mProject, - config.mLogstore, - config.mRegion); + LOG_DEBUG(sLogger, ("existing invalid config file unchanged", "skip current object")("config", configName)); return false; } if (config.mSingletonInput) { @@ -492,15 +489,15 @@ void PipelineConfigWatcher::PushPipelineConfig(CollectionConfig&& config, void PipelineConfigWatcher::CheckSingletonInput(CollectionConfigDiff& pDiff, SingletonConfigCache& singletonCache) { for (auto& [name, configs] : singletonCache) { - std::sort(configs.begin(), - configs.end(), - [](const std::shared_ptr& a, - const std::shared_ptr& b) -> bool { - if (a->config.mCreateTime == b->config.mCreateTime) { - return a->config.mName < b->config.mName; - } - return a->config.mCreateTime < b->config.mCreateTime; - }); + sort(configs.begin(), + configs.end(), + [](const shared_ptr& a, + const shared_ptr& b) -> bool { + if (a->config.mCreateTime == b->config.mCreateTime) { + return a->config.mName < b->config.mName; + } + return a->config.mCreateTime < b->config.mCreateTime; + }); for (size_t i = 0; i < configs.size(); ++i) { const auto& diffEnum = configs[i]->diffEnum; const auto& configName = configs[i]->config.mName; diff --git a/core/config/watcher/PipelineConfigWatcher.h b/core/config/watcher/PipelineConfigWatcher.h index ac17df8023..66030e79f7 100644 --- a/core/config/watcher/PipelineConfigWatcher.h +++ b/core/config/watcher/PipelineConfigWatcher.h @@ -50,7 +50,7 @@ class PipelineConfigWatcher : public ConfigWatcher { std::pair CheckConfigDiff(); #ifdef APSARA_UNIT_TEST_MAIN - void SetPipelineManager(const CollectionPipelineManager* pm) { mPipelineManager = pm; } + void SetPipelineManager(const CollectionPipelineManager* pm) { mCollectionPipelineManager = pm; } #endif private: @@ -78,7 +78,6 @@ class PipelineConfigWatcher : public ConfigWatcher { bool CheckUnchangedConfig(const std::string& configName, const std::filesystem::path& path, CollectionConfigDiff& pDiff, - TaskConfigDiff& tDiff, SingletonConfigCache& singletonCache); void PushPipelineConfig(CollectionConfig&& config, ConfigDiffEnum diffEnum, @@ -86,7 +85,7 @@ class PipelineConfigWatcher : public ConfigWatcher { SingletonConfigCache& singletonCache); void CheckSingletonInput(CollectionConfigDiff& pDiff, SingletonConfigCache& singletonCache); - const CollectionPipelineManager* mPipelineManager = nullptr; + const CollectionPipelineManager* mCollectionPipelineManager = nullptr; const TaskPipelineManager* mTaskPipelineManager = nullptr; #ifdef APSARA_UNIT_TEST_MAIN