diff --git a/core/application/Application.cpp b/core/application/Application.cpp index a88f9a4e1b..f104eb12a4 100644 --- a/core/application/Application.cpp +++ b/core/application/Application.cpp @@ -268,9 +268,6 @@ void Application::Start() { // GCOVR_EXCL_START LogtailPlugin::GetInstance()->LoadPluginBase(); } - // TODO: this should be refactored to internal pipeline - AlarmManager::GetInstance()->Init(); - time_t curTime = 0, lastConfigCheckTime = 0, lastUpdateMetricTime = 0, lastCheckTagsTime = 0, lastQueueGCTime = 0; #ifndef LOGTAIL_NO_TC_MALLOC time_t lastTcmallocReleaseMemTime = 0; @@ -373,7 +370,6 @@ void Application::Exit() { LogtailMonitor::GetInstance()->Stop(); LoongCollectorMonitor::GetInstance()->Stop(); - AlarmManager::GetInstance()->Stop(); LogtailPlugin::GetInstance()->StopBuiltInModules(); // from now on, alarm should not be used. diff --git a/core/collection_pipeline/CollectionPipeline.h b/core/collection_pipeline/CollectionPipeline.h index 657fe95447..24ab795d61 100644 --- a/core/collection_pipeline/CollectionPipeline.h +++ b/core/collection_pipeline/CollectionPipeline.h @@ -123,6 +123,7 @@ class CollectionPipeline { friend class PipelineUnittest; friend class InputContainerStdioUnittest; friend class InputFileUnittest; + friend class InputInternalAlarmsUnittest; friend class InputInternalMetricsUnittest; friend class InputPrometheusUnittest; friend class ProcessorTagNativeUnittest; diff --git a/core/collection_pipeline/plugin/PluginRegistry.cpp b/core/collection_pipeline/plugin/PluginRegistry.cpp index ae67384fa2..85c652eff1 100644 --- a/core/collection_pipeline/plugin/PluginRegistry.cpp +++ b/core/collection_pipeline/plugin/PluginRegistry.cpp @@ -33,6 +33,7 @@ #include "plugin/input/InputPrometheus.h" #if defined(__linux__) && !defined(__ANDROID__) #include "plugin/input/InputFileSecurity.h" +#include "plugin/input/InputInternalAlarms.h" #include "plugin/input/InputInternalMetrics.h" #include "plugin/input/InputNetworkObserver.h" #include "plugin/input/InputNetworkSecurity.h" @@ -129,6 +130,7 @@ bool PluginRegistry::IsValidNativeFlusherPlugin(const string& name) const { void PluginRegistry::LoadStaticPlugins() { RegisterInputCreator(new StaticInputCreator()); RegisterInputCreator(new StaticInputCreator()); + RegisterInputCreator(new StaticInputCreator(), true); RegisterInputCreator(new StaticInputCreator(), true); #if defined(__linux__) && !defined(__ANDROID__) RegisterInputCreator(new StaticInputCreator()); diff --git a/core/models/PipelineEventGroup.h b/core/models/PipelineEventGroup.h index 929c6e1ebc..46aad28159 100644 --- a/core/models/PipelineEventGroup.h +++ b/core/models/PipelineEventGroup.h @@ -60,6 +60,9 @@ enum class EventGroupMetaKey { PROMETHEUS_STREAM_ID, PROMETHEUS_STREAM_TOTAL, + INTERNAL_DATA_TARGET_REGION, + INTERNAL_DATA_TYPE, + SOURCE_ID }; diff --git a/core/monitor/AlarmManager.cpp b/core/monitor/AlarmManager.cpp index 58dfdcc7c5..4f6c1a446d 100644 --- a/core/monitor/AlarmManager.cpp +++ b/core/monitor/AlarmManager.cpp @@ -12,9 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. -#include "AlarmManager.h" +#include "monitor/AlarmManager.h" -#include "Monitor.h" #include "app_config/AppConfig.h" #include "collection_pipeline/queue/QueueKeyManager.h" #include "collection_pipeline/queue/SenderQueueManager.h" @@ -24,6 +23,7 @@ #include "common/TimeUtil.h" #include "common/version.h" #include "constants/Constants.h" +#include "monitor/SelfMonitorServer.h" #include "protobuf/sls/sls_logs.pb.h" #include "provider/Provider.h" @@ -107,178 +107,88 @@ AlarmManager::AlarmManager() { mMessageType[REGISTER_HANDLERS_TOO_SLOW_ALARM] = "REGISTER_HANDLERS_TOO_SLOW_ALARM"; } -void AlarmManager::Init() { - mThreadRes = async(launch::async, &AlarmManager::SendAlarmLoop, this); -} - -void AlarmManager::Stop() { - ForceToSend(); - { - lock_guard lock(mThreadRunningMux); - mIsThreadRunning = false; - } - mStopCV.notify_one(); - if (!mThreadRes.valid()) { - return; - } - future_status s = mThreadRes.wait_for(chrono::seconds(1)); - if (s == future_status::ready) { - LOG_INFO(sLogger, ("alarm gathering", "stopped successfully")); - } else { - LOG_WARNING(sLogger, ("alarm gathering", "forced to stopped")); - } -} - -bool AlarmManager::SendAlarmLoop() { - LOG_INFO(sLogger, ("alarm gathering", "started")); - { - unique_lock lock(mThreadRunningMux); - while (mIsThreadRunning) { - SendAllRegionAlarm(); - if (mStopCV.wait_for(lock, std::chrono::seconds(3), [this]() { return !mIsThreadRunning; })) { - break; - } - } - } - SendAllRegionAlarm(); - return true; -} - -void AlarmManager::SendAllRegionAlarm() { +void AlarmManager::FlushAllRegionAlarm(vector& pipelineEventGroupList) { int32_t currentTime = time(nullptr); size_t sendRegionIndex = 0; size_t sendAlarmTypeIndex = 0; do { - LogGroup logGroup; - string region; - { - PTScopedLock lock(mAlarmBufferMutex); - if (mAllAlarmMap.size() <= sendRegionIndex) { - break; - } - auto allAlarmIter = mAllAlarmMap.begin(); - size_t iterIndex = 0; - while (iterIndex != sendRegionIndex) { - ++iterIndex; - ++allAlarmIter; - } - region = allAlarmIter->first; - // LOG_DEBUG(sLogger, ("1Send Alarm", region)("region", sendRegionIndex)); - AlarmVector& alarmBufferVec = *(allAlarmIter->second.first); - std::vector& lastUpdateTimeVec = allAlarmIter->second.second; - // check this region end - if (sendAlarmTypeIndex >= alarmBufferVec.size()) { - // jump this region - ++sendRegionIndex; - sendAlarmTypeIndex = 0; - continue; - } - // LOG_DEBUG(sLogger, ("2Send Alarm", region)("region", sendRegionIndex)("alarm index", - // mMessageType[sendAlarmTypeIndex])); - // check valid - if (alarmBufferVec.size() != (size_t)ALL_LOGTAIL_ALARM_NUM - || lastUpdateTimeVec.size() != (size_t)ALL_LOGTAIL_ALARM_NUM) { - LOG_ERROR(sLogger, - ("invalid alarm item", region)("alarm vec", alarmBufferVec.size())("update vec", - lastUpdateTimeVec.size())); - // jump this region - ++sendRegionIndex; - sendAlarmTypeIndex = 0; - continue; - } - - // LOG_DEBUG(sLogger, ("3Send Alarm", region)("region", sendRegionIndex)("alarm index", - // mMessageType[sendAlarmTypeIndex])); - map>& alarmMap = alarmBufferVec[sendAlarmTypeIndex]; - if (alarmMap.size() == 0 - || currentTime - lastUpdateTimeVec[sendAlarmTypeIndex] < INT32_FLAG(logtail_alarm_interval)) { - // go next alarm type - ++sendAlarmTypeIndex; - continue; - } - // check sender queue status, if invalid jump this region - - string project = GetProfileSender()->GetProfileProjectName(region); - QueueKey alarmPrjLogstoreKey - = QueueKeyManager::GetInstance()->GetKey("-flusher_sls-" + project + "#" + ALARM_SLS_LOGSTORE_NAME); - if (SenderQueueManager::GetInstance()->GetQueue(alarmPrjLogstoreKey) == nullptr) { - CollectionPipelineContext ctx; - SenderQueueManager::GetInstance()->CreateQueue( - alarmPrjLogstoreKey, - "self_monitor", - ctx, - {{"region", FlusherSLS::GetRegionConcurrencyLimiter(region)}, - {"project", FlusherSLS::GetProjectConcurrencyLimiter(project)}, - {"logstore", FlusherSLS::GetLogstoreConcurrencyLimiter(project, ALARM_SLS_LOGSTORE_NAME)}}); - } - if (!SenderQueueManager::GetInstance()->IsValidToPush(alarmPrjLogstoreKey)) { - // jump this region - ++sendRegionIndex; - sendAlarmTypeIndex = 0; - continue; - } - - // LOG_DEBUG(sLogger, ("4Send Alarm", region)("region", sendRegionIndex)("alarm index", - // mMessageType[sendAlarmTypeIndex])); - logGroup.set_source(LoongCollectorMonitor::mIpAddr); - logGroup.set_category(ALARM_SLS_LOGSTORE_NAME); - auto now = GetCurrentLogtailTime(); - for (map>::iterator mapIter = alarmMap.begin(); mapIter != alarmMap.end(); - ++mapIter) { - auto& messagePtr = mapIter->second; - - // LOG_DEBUG(sLogger, ("5Send Alarm", region)("region", sendRegionIndex)("alarm index", - // sendAlarmTypeIndex)("msg", messagePtr->mMessage)); - - Log* logPtr = logGroup.add_logs(); - SetLogTime(logPtr, - AppConfig::GetInstance()->EnableLogTimeAutoAdjust() ? now.tv_sec + GetTimeDelta() - : now.tv_sec); - Log_Content* contentPtr = logPtr->add_contents(); - contentPtr->set_key("alarm_type"); - contentPtr->set_value(messagePtr->mMessageType); - - contentPtr = logPtr->add_contents(); - contentPtr->set_key("alarm_message"); - contentPtr->set_value(messagePtr->mMessage); - - contentPtr = logPtr->add_contents(); - contentPtr->set_key("alarm_count"); - contentPtr->set_value(ToString(messagePtr->mCount)); + PTScopedLock lock(mAlarmBufferMutex); + if (mAllAlarmMap.size() <= sendRegionIndex) { + break; + } + auto allAlarmIter = mAllAlarmMap.begin(); + size_t iterIndex = 0; + while (iterIndex != sendRegionIndex) { + ++iterIndex; + ++allAlarmIter; + } + string region = allAlarmIter->first; - contentPtr = logPtr->add_contents(); - contentPtr->set_key("ip"); - contentPtr->set_value(LoongCollectorMonitor::mIpAddr); + AlarmVector& alarmBufferVec = *(allAlarmIter->second.first); + std::vector& lastUpdateTimeVec = allAlarmIter->second.second; + // check this region end + if (sendAlarmTypeIndex >= alarmBufferVec.size()) { + // jump this region + ++sendRegionIndex; + sendAlarmTypeIndex = 0; + continue; + } - contentPtr = logPtr->add_contents(); - contentPtr->set_key("os"); - contentPtr->set_value(OS_NAME); + // check valid + if (alarmBufferVec.size() != (size_t)ALL_LOGTAIL_ALARM_NUM + || lastUpdateTimeVec.size() != (size_t)ALL_LOGTAIL_ALARM_NUM) { + LOG_ERROR(sLogger, + ("invalid alarm item", region)("alarm vec", alarmBufferVec.size())("update vec", + lastUpdateTimeVec.size())); + // jump this region + ++sendRegionIndex; + sendAlarmTypeIndex = 0; + continue; + } - contentPtr = logPtr->add_contents(); - contentPtr->set_key("ver"); - contentPtr->set_value(ILOGTAIL_VERSION); + map>& alarmMap = alarmBufferVec[sendAlarmTypeIndex]; + if (alarmMap.size() == 0 + || currentTime - lastUpdateTimeVec[sendAlarmTypeIndex] < INT32_FLAG(logtail_alarm_interval)) { + // go next alarm type + ++sendAlarmTypeIndex; + continue; + } - if (!messagePtr->mProjectName.empty()) { - contentPtr = logPtr->add_contents(); - contentPtr->set_key("project_name"); - contentPtr->set_value(messagePtr->mProjectName); - } + PipelineEventGroup pipelineEventGroup(std::make_shared()); + pipelineEventGroup.SetTagNoCopy(LOG_RESERVED_KEY_SOURCE, LoongCollectorMonitor::mIpAddr); + pipelineEventGroup.SetMetadata(EventGroupMetaKey::INTERNAL_DATA_TARGET_REGION, region); + pipelineEventGroup.SetMetadata(EventGroupMetaKey::INTERNAL_DATA_TYPE, + SelfMonitorServer::INTERNAL_DATA_TYPE_ALARM); + auto now = GetCurrentLogtailTime(); + for (map>::iterator mapIter = alarmMap.begin(); mapIter != alarmMap.end(); + ++mapIter) { + auto& messagePtr = mapIter->second; - if (!messagePtr->mCategory.empty()) { - contentPtr = logPtr->add_contents(); - contentPtr->set_key("category"); - contentPtr->set_value(messagePtr->mCategory); - } + LogEvent* logEvent = pipelineEventGroup.AddLogEvent(); + logEvent->SetTimestamp(AppConfig::GetInstance()->EnableLogTimeAutoAdjust() ? now.tv_sec + GetTimeDelta() + : now.tv_sec); + logEvent->SetContent("alarm_type", messagePtr->mMessageType); + logEvent->SetContent("alarm_message", messagePtr->mMessage); + logEvent->SetContent("alarm_count", ToString(messagePtr->mCount)); + logEvent->SetContent("ip", LoongCollectorMonitor::mIpAddr); + logEvent->SetContent("os", OS_NAME); + logEvent->SetContent("ver", string(ILOGTAIL_VERSION)); + if (!messagePtr->mProjectName.empty()) { + logEvent->SetContent("project_name", messagePtr->mProjectName); + } + if (!messagePtr->mCategory.empty()) { + logEvent->SetContent("category", messagePtr->mCategory); } - lastUpdateTimeVec[sendAlarmTypeIndex] = currentTime; - alarmMap.clear(); - ++sendAlarmTypeIndex; } - if (logGroup.logs_size() <= 0) { + lastUpdateTimeVec[sendAlarmTypeIndex] = currentTime; + alarmMap.clear(); + ++sendAlarmTypeIndex; + + if (pipelineEventGroup.GetEvents().size() <= 0) { continue; } // this is an anonymous send and non lock send - GetProfileSender()->SendToProfileProject(region, logGroup); + pipelineEventGroupList.emplace_back(std::move(pipelineEventGroup)); } while (true); } diff --git a/core/monitor/AlarmManager.h b/core/monitor/AlarmManager.h index 6357097262..b45f4727a6 100644 --- a/core/monitor/AlarmManager.h +++ b/core/monitor/AlarmManager.h @@ -27,6 +27,7 @@ #include #include "common/Lock.h" +#include "models/PipelineEventGroup.h" namespace logtail { @@ -122,9 +123,6 @@ class AlarmManager { return &instance; } - void Init(); - void Stop(); - void SendAlarm(const AlarmType alarmType, const std::string& message, const std::string& projectName = "", @@ -134,22 +132,16 @@ class AlarmManager { void ForceToSend(); bool IsLowLevelAlarmValid(); + void FlushAllRegionAlarm(std::vector& pipelineEventGroupList); + private: using AlarmVector = std::vector>>; AlarmManager(); ~AlarmManager() = default; - bool SendAlarmLoop(); // without lock AlarmVector* MakesureLogtailAlarmMapVecUnlocked(const std::string& region); - void SendAllRegionAlarm(); - - std::future mThreadRes; - std::mutex mThreadRunningMux; - bool mIsThreadRunning = true; - std::condition_variable mStopCV; - std::vector mMessageType; std::map, std::vector>> mAllAlarmMap; @@ -157,6 +149,10 @@ class AlarmManager { std::atomic_int mLastLowLevelTime{0}; std::atomic_int mLastLowLevelCount{0}; + +#ifdef APSARA_UNIT_TEST_MAIN + friend class AlarmManagerUnittest; +#endif }; } // namespace logtail diff --git a/core/monitor/SelfMonitorServer.cpp b/core/monitor/SelfMonitorServer.cpp index 5e56a955a3..c7fd5692ae 100644 --- a/core/monitor/SelfMonitorServer.cpp +++ b/core/monitor/SelfMonitorServer.cpp @@ -16,15 +16,16 @@ #include "monitor/SelfMonitorServer.h" -#include "collection_pipeline/CollectionPipelineManager.h" #include "common/LogtailCommonFlags.h" -#include "monitor/Monitor.h" #include "runner/ProcessorRunner.h" using namespace std; namespace logtail { +const string SelfMonitorServer::INTERNAL_DATA_TYPE_ALARM = "__metric__"; +const string SelfMonitorServer::INTERNAL_DATA_TYPE_METRIC = "__alarm__"; + SelfMonitorServer::SelfMonitorServer() { } @@ -40,16 +41,20 @@ void SelfMonitorServer::Init() { void SelfMonitorServer::Monitor() { LOG_INFO(sLogger, ("self-monitor", "started")); int32_t lastMonitorTime = time(NULL); + int32_t lastAlarmTime = time(NULL); { unique_lock lock(mThreadRunningMux); while (mIsThreadRunning) { if (mStopCV.wait_for(lock, std::chrono::seconds(1), [this]() { return !mIsThreadRunning; })) { break; } - int32_t monitorTime = time(NULL); - if ((monitorTime - lastMonitorTime) >= 60) { // 60s - lastMonitorTime = monitorTime; + int32_t nowTime = time(NULL); + if ((nowTime - lastMonitorTime) >= 60) { // 60s + lastMonitorTime = nowTime; SendMetrics(); + } + if ((nowTime - lastAlarmTime) >= 3) { // 3s + lastAlarmTime = nowTime; SendAlarms(); } } @@ -59,6 +64,7 @@ void SelfMonitorServer::Monitor() { } void SelfMonitorServer::Stop() { + AlarmManager::GetInstance()->ForceToSend(); { lock_guard lock(mThreadRunningMux); mIsThreadRunning = false; @@ -75,9 +81,12 @@ void SelfMonitorServer::Stop() { } } -void SelfMonitorServer::UpdateMetricPipeline(CollectionPipelineContext* ctx, SelfMonitorMetricRules* rules) { +void SelfMonitorServer::UpdateMetricPipeline(CollectionPipelineContext* ctx, + size_t inputIndex, + SelfMonitorMetricRules* rules) { WriteLock lock(mMetricPipelineLock); mMetricPipelineCtx = ctx; + mMetricInputIndex = inputIndex; mSelfMonitorMetricRules = rules; LOG_INFO(sLogger, ("self-monitor metrics pipeline", "updated")); } @@ -85,6 +94,7 @@ void SelfMonitorServer::UpdateMetricPipeline(CollectionPipelineContext* ctx, Sel void SelfMonitorServer::RemoveMetricPipeline() { WriteLock lock(mMetricPipelineLock); mMetricPipelineCtx = nullptr; + mMetricInputIndex = 0; mSelfMonitorMetricRules = nullptr; LOG_INFO(sLogger, ("self-monitor metrics pipeline", "removed")); } @@ -103,16 +113,13 @@ void SelfMonitorServer::SendMetrics() { PipelineEventGroup pipelineEventGroup(std::make_shared()); pipelineEventGroup.SetTagNoCopy(LOG_RESERVED_KEY_SOURCE, LoongCollectorMonitor::mIpAddr); - pipelineEventGroup.SetTag(LOG_RESERVED_KEY_TOPIC, "__metric__"); + pipelineEventGroup.SetTag(LOG_RESERVED_KEY_TOPIC, INTERNAL_DATA_TYPE_METRIC); // todo: delete this tag + pipelineEventGroup.SetMetadata(EventGroupMetaKey::INTERNAL_DATA_TYPE, INTERNAL_DATA_TYPE_METRIC); ReadAsPipelineEventGroup(pipelineEventGroup); - shared_ptr pipeline - = CollectionPipelineManager::GetInstance()->FindConfigByName(mMetricPipelineCtx->GetConfigName()); - if (pipeline.get() != nullptr) { - if (pipelineEventGroup.GetEvents().size() > 0) { - ProcessorRunner::GetInstance()->PushQueue( - pipeline->GetContext().GetProcessQueueKey(), 0, std::move(pipelineEventGroup)); - } + if (pipelineEventGroup.GetEvents().size() > 0) { + ProcessorRunner::GetInstance()->PushQueue( + mMetricPipelineCtx->GetProcessQueueKey(), mMetricInputIndex, std::move(pipelineEventGroup)); } } @@ -169,12 +176,38 @@ void SelfMonitorServer::ReadAsPipelineEventGroup(PipelineEventGroup& pipelineEve } } -void SelfMonitorServer::UpdateAlarmPipeline(CollectionPipelineContext* ctx) { - lock_guard lock(mAlarmPipelineMux); +void SelfMonitorServer::UpdateAlarmPipeline(CollectionPipelineContext* ctx, size_t inputIndex) { + WriteLock lock(mAlarmPipelineMux); mAlarmPipelineCtx = ctx; + mAlarmInputIndex = inputIndex; + LOG_INFO(sLogger, ("self-monitor alarms pipeline", "updated")); +} + +void SelfMonitorServer::RemoveAlarmPipeline() { + WriteLock lock(mAlarmPipelineMux); + mAlarmPipelineCtx = nullptr; + mAlarmInputIndex = 0; + LOG_INFO(sLogger, ("self-monitor alarms pipeline", "removed")); } void SelfMonitorServer::SendAlarms() { + // metadata: + // INTERNAL_DATA_TARGET_REGION:${region} + // INTERNAL_DATA_TYPE:__alarm__ + vector pipelineEventGroupList; + AlarmManager::GetInstance()->FlushAllRegionAlarm(pipelineEventGroupList); + + ReadLock lock(mAlarmPipelineMux); + if (mAlarmPipelineCtx == nullptr) { + return; + } + + for (auto& pipelineEventGroup : pipelineEventGroupList) { + if (pipelineEventGroup.GetEvents().size() > 0) { + ProcessorRunner::GetInstance()->PushQueue( + mAlarmPipelineCtx->GetProcessQueueKey(), mAlarmInputIndex, std::move(pipelineEventGroup)); + } + } } } // namespace logtail diff --git a/core/monitor/SelfMonitorServer.h b/core/monitor/SelfMonitorServer.h index 6bb9dd64a1..e6cd6a402c 100644 --- a/core/monitor/SelfMonitorServer.h +++ b/core/monitor/SelfMonitorServer.h @@ -19,6 +19,7 @@ #include #include "collection_pipeline/CollectionPipeline.h" +#include "monitor/Monitor.h" namespace logtail { @@ -32,9 +33,14 @@ class SelfMonitorServer { void Monitor(); void Stop(); - void UpdateMetricPipeline(CollectionPipelineContext* ctx, SelfMonitorMetricRules* rules); + void UpdateMetricPipeline(CollectionPipelineContext* ctx, size_t inputIndex, SelfMonitorMetricRules* rules); void RemoveMetricPipeline(); - void UpdateAlarmPipeline(CollectionPipelineContext* ctx); // Todo + void UpdateAlarmPipeline(CollectionPipelineContext* ctx, size_t inputIndex); + void RemoveAlarmPipeline(); + + static const std::string INTERNAL_DATA_TYPE_ALARM; + static const std::string INTERNAL_DATA_TYPE_METRIC; + private: SelfMonitorServer(); ~SelfMonitorServer() = default; @@ -44,21 +50,26 @@ class SelfMonitorServer { bool mIsThreadRunning = true; std::condition_variable mStopCV; + // metrics void SendMetrics(); bool ProcessSelfMonitorMetricEvent(SelfMonitorMetricEvent& event, const SelfMonitorMetricRule& rule); void PushSelfMonitorMetricEvents(std::vector& events); void ReadAsPipelineEventGroup(PipelineEventGroup& pipelineEventGroup); + mutable ReadWriteLock mMetricPipelineLock; CollectionPipelineContext* mMetricPipelineCtx = nullptr; + size_t mMetricInputIndex = 0; SelfMonitorMetricRules* mSelfMonitorMetricRules = nullptr; SelfMonitorMetricEventMap mSelfMonitorMetricEventMap; - mutable ReadWriteLock mMetricPipelineLock; + // alarms void SendAlarms(); - CollectionPipelineContext* mAlarmPipelineCtx; - std::mutex mAlarmPipelineMux; + mutable ReadWriteLock mAlarmPipelineMux; + CollectionPipelineContext* mAlarmPipelineCtx = nullptr; + size_t mAlarmInputIndex = 0; #ifdef APSARA_UNIT_TEST_MAIN + friend class InputInternalAlarmsUnittest; friend class InputInternalMetricsUnittest; #endif }; diff --git a/core/plugin/input/InputInternalAlarms.cpp b/core/plugin/input/InputInternalAlarms.cpp new file mode 100644 index 0000000000..4df7b14fb0 --- /dev/null +++ b/core/plugin/input/InputInternalAlarms.cpp @@ -0,0 +1,41 @@ +/* + * Copyright 2024 iLogtail Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "plugin/input/InputInternalAlarms.h" + +#include "monitor/SelfMonitorServer.h" + +namespace logtail { + +const std::string InputInternalAlarms::sName = "input_internal_alarms"; + +bool InputInternalAlarms::Init(const Json::Value& config, Json::Value& optionalGoPipeline) { + return true; +} + +bool InputInternalAlarms::Start() { + SelfMonitorServer::GetInstance()->UpdateAlarmPipeline(mContext, mIndex); + return true; +} + +bool InputInternalAlarms::Stop(bool isPipelineRemoving) { + if (isPipelineRemoving) { + SelfMonitorServer::GetInstance()->RemoveAlarmPipeline(); + } + return true; +} + +} // namespace logtail diff --git a/core/plugin/input/InputInternalAlarms.h b/core/plugin/input/InputInternalAlarms.h new file mode 100644 index 0000000000..b1477ab7d5 --- /dev/null +++ b/core/plugin/input/InputInternalAlarms.h @@ -0,0 +1,34 @@ +/* + * Copyright 2024 iLogtail Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include "collection_pipeline/plugin/interface/Input.h" + +namespace logtail { + +class InputInternalAlarms : public Input { +public: + static const std::string sName; + + const std::string& Name() const override { return sName; } + bool Init(const Json::Value& config, Json::Value& optionalGoPipeline) override; + bool Start() override; + bool Stop(bool isPipelineRemoving) override; + bool SupportAck() const override { return true; } +}; + +} // namespace logtail diff --git a/core/plugin/input/InputInternalMetrics.cpp b/core/plugin/input/InputInternalMetrics.cpp index f0c110ac7e..9d13ba6ef8 100644 --- a/core/plugin/input/InputInternalMetrics.cpp +++ b/core/plugin/input/InputInternalMetrics.cpp @@ -50,7 +50,7 @@ bool InputInternalMetrics::Init(const Json::Value& config, Json::Value& optional } bool InputInternalMetrics::Start() { - SelfMonitorServer::GetInstance()->UpdateMetricPipeline(mContext, &mSelfMonitorMetricRules); + SelfMonitorServer::GetInstance()->UpdateMetricPipeline(mContext, mIndex, &mSelfMonitorMetricRules); return true; } diff --git a/core/unittest/input/CMakeLists.txt b/core/unittest/input/CMakeLists.txt index 4544e875bf..0e63b54d8f 100644 --- a/core/unittest/input/CMakeLists.txt +++ b/core/unittest/input/CMakeLists.txt @@ -36,6 +36,9 @@ target_link_libraries(input_ebpf_network_security_unittest unittest_base) add_executable(input_ebpf_network_observer_unittest InputNetworkObserverUnittest.cpp) target_link_libraries(input_ebpf_network_observer_unittest unittest_base) +add_executable(input_internal_alarms_unittest InputInternalAlarmsUnittest.cpp) +target_link_libraries(input_internal_alarms_unittest ${UT_BASE_TARGET}) + add_executable(input_internal_metrics_unittest InputInternalMetricsUnittest.cpp) target_link_libraries(input_internal_metrics_unittest ${UT_BASE_TARGET}) @@ -47,4 +50,5 @@ gtest_discover_tests(input_ebpf_file_security_unittest) gtest_discover_tests(input_ebpf_process_security_unittest) gtest_discover_tests(input_ebpf_network_security_unittest) gtest_discover_tests(input_ebpf_network_observer_unittest) +gtest_discover_tests(input_internal_alarms_unittest) gtest_discover_tests(input_internal_metrics_unittest) diff --git a/core/unittest/input/InputInternalAlarmsUnittest.cpp b/core/unittest/input/InputInternalAlarmsUnittest.cpp new file mode 100644 index 0000000000..8576c77265 --- /dev/null +++ b/core/unittest/input/InputInternalAlarmsUnittest.cpp @@ -0,0 +1,90 @@ +// Copyright 2023 iLogtail Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include +#include + +#include "json/json.h" + +#include "app_config/AppConfig.h" +#include "collection_pipeline/CollectionPipeline.h" +#include "collection_pipeline/CollectionPipelineContext.h" +#include "collection_pipeline/plugin/PluginRegistry.h" +#include "common/JsonUtil.h" +#include "monitor/SelfMonitorServer.h" +#include "plugin/input/InputInternalAlarms.h" +#include "unittest/Unittest.h" + +DECLARE_FLAG_INT32(default_plugin_log_queue_size); + +using namespace std; + +namespace logtail { + +class InputInternalAlarmsUnittest : public testing::Test { +public: + void OnPipelineUpdate(); + +protected: + static void SetUpTestCase() { + LoongCollectorMonitor::GetInstance()->Init(); + PluginRegistry::GetInstance()->LoadPlugins(); + } + + static void TearDownTestCase() { + PluginRegistry::GetInstance()->UnloadPlugins(); + LoongCollectorMonitor::GetInstance()->Stop(); + } + + void SetUp() override { + p.mName = "test_config"; + ctx.SetConfigName("test_config"); + p.mPluginID.store(0); + ctx.SetPipeline(p); + } + +private: + CollectionPipeline p; + CollectionPipelineContext ctx; +}; + +void InputInternalAlarmsUnittest::OnPipelineUpdate() { + Json::Value configJson, optionalGoPipeline; + InputInternalAlarms input; + input.SetContext(ctx); + string configStr, errorMsg; + + configStr = R"( + { + "Type": "input_internal_alarms" + } + )"; + APSARA_TEST_TRUE(ParseJsonTable(configStr, configJson, errorMsg)); + input.SetContext(ctx); + input.SetMetricsRecordRef(InputInternalAlarms::sName, "1"); + APSARA_TEST_TRUE(input.Init(configJson, optionalGoPipeline)); + + APSARA_TEST_EQUAL(nullptr, SelfMonitorServer::GetInstance()->mAlarmPipelineCtx); + APSARA_TEST_TRUE(input.Start()); + APSARA_TEST_NOT_EQUAL(nullptr, SelfMonitorServer::GetInstance()->mAlarmPipelineCtx); + APSARA_TEST_TRUE(input.Stop(true)); + APSARA_TEST_EQUAL(nullptr, SelfMonitorServer::GetInstance()->mAlarmPipelineCtx); +} + +UNIT_TEST_CASE(InputInternalAlarmsUnittest, OnPipelineUpdate) + +} // namespace logtail + +UNIT_TEST_MAIN diff --git a/core/unittest/monitor/AlarmManagerUnittest.cpp b/core/unittest/monitor/AlarmManagerUnittest.cpp new file mode 100644 index 0000000000..5f573431bc --- /dev/null +++ b/core/unittest/monitor/AlarmManagerUnittest.cpp @@ -0,0 +1,86 @@ +// Copyright 2025 iLogtail Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include +#include +#include + +#include "json/json.h" + +#include "AlarmManager.h" +#include "unittest/Unittest.h" + +namespace logtail { + +static std::atomic_bool running(true); + +class AlarmManagerUnittest : public ::testing::Test { +public: + void SetUp() {} + + void TearDown() {} + + void TestSendAlarm(); + void TestFlushAllRegionAlarm(); +}; + +APSARA_UNIT_TEST_CASE(AlarmManagerUnittest, TestSendAlarm, 0); +APSARA_UNIT_TEST_CASE(AlarmManagerUnittest, TestFlushAllRegionAlarm, 1); + +void AlarmManagerUnittest::TestSendAlarm() { + { + std::string message = "Test Alarm Message"; + std::string projectName = "TestProject"; + std::string category = "TestCategory"; + std::string region = "TestRegion"; + AlarmType alarmType = USER_CONFIG_ALARM; // Assuming USER_CONFIG_ALARM is valid + + AlarmManager::GetInstance()->SendAlarm(alarmType, message, projectName, category, region); + // Assuming we have a method to retrieve alarms for testing + AlarmManager::AlarmVector& alarmBufferVec + = *AlarmManager::GetInstance()->MakesureLogtailAlarmMapVecUnlocked(region); + + std::string key = projectName + "_" + category; + APSARA_TEST_EQUAL(1U, alarmBufferVec[alarmType].size()); + APSARA_TEST_EQUAL(true, alarmBufferVec[alarmType].find(key) != alarmBufferVec[alarmType].end()); + APSARA_TEST_EQUAL(category, alarmBufferVec[alarmType][key]->mCategory); + APSARA_TEST_EQUAL(1, alarmBufferVec[alarmType][key]->mCount); + APSARA_TEST_EQUAL(message, alarmBufferVec[alarmType][key]->mMessage); + APSARA_TEST_EQUAL(AlarmManager::GetInstance()->mMessageType[alarmType], + alarmBufferVec[alarmType][key]->mMessageType); + APSARA_TEST_EQUAL(projectName, alarmBufferVec[alarmType][key]->mProjectName); + } +} + +void AlarmManagerUnittest::TestFlushAllRegionAlarm() { + AlarmManager::GetInstance()->mAllAlarmMap.clear(); + // Simulate adding some alarms + AlarmManager::GetInstance()->SendAlarm(USER_CONFIG_ALARM, "Test1", "Project1", "Cat1", "Region1"); + AlarmManager::GetInstance()->SendAlarm(GLOBAL_CONFIG_ALARM, "Test2", "Project2", "Cat2", "Region2"); + + std::vector pipelineEventGroupList; + AlarmManager::GetInstance()->FlushAllRegionAlarm(pipelineEventGroupList); + + // Assuming each alarm results in a PipelineEventGroup + APSARA_TEST_EQUAL(2U, pipelineEventGroupList.size()); +} + +} // namespace logtail + +int main(int argc, char** argv) { + logtail::Logger::Instance().InitGlobalLoggers(); + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/core/unittest/monitor/CMakeLists.txt b/core/unittest/monitor/CMakeLists.txt index d3e7c760f2..c24003b6f9 100644 --- a/core/unittest/monitor/CMakeLists.txt +++ b/core/unittest/monitor/CMakeLists.txt @@ -13,7 +13,10 @@ # limitations under the License. cmake_minimum_required(VERSION 3.22) -project(metric_manager_unittest) +project(self_monitor_unittest) + +add_executable(alarm_manager_unittest AlarmManagerUnittest.cpp) +target_link_libraries(alarm_manager_unittest ${UT_BASE_TARGET}) add_executable(metric_manager_unittest MetricManagerUnittest.cpp) target_link_libraries(metric_manager_unittest ${UT_BASE_TARGET}) @@ -25,6 +28,7 @@ add_executable(self_monitor_metric_event_unittest SelfMonitorMetricEventUnittest target_link_libraries(self_monitor_metric_event_unittest ${UT_BASE_TARGET}) include(GoogleTest) +gtest_discover_tests(alarm_manager_unittest) gtest_discover_tests(metric_manager_unittest) gtest_discover_tests(plugin_metric_manager_unittest) gtest_discover_tests(self_monitor_metric_event_unittest)