Skip to content

Commit

Permalink
Add InputInternalAlarms (#2061)
Browse files Browse the repository at this point in the history
* init

* polish

* polish

* polish

* polish

* polish

* polish
  • Loading branch information
Takuka0311 authored Feb 8, 2025
1 parent 6b6cc66 commit 8a3227b
Show file tree
Hide file tree
Showing 15 changed files with 409 additions and 198 deletions.
4 changes: 0 additions & 4 deletions core/application/Application.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -268,9 +268,6 @@ void Application::Start() { // GCOVR_EXCL_START
LogtailPlugin::GetInstance()->LoadPluginBase();
}

// TODO: this should be refactored to internal pipeline
AlarmManager::GetInstance()->Init();

time_t curTime = 0, lastConfigCheckTime = 0, lastUpdateMetricTime = 0, lastCheckTagsTime = 0, lastQueueGCTime = 0;
#ifndef LOGTAIL_NO_TC_MALLOC
time_t lastTcmallocReleaseMemTime = 0;
Expand Down Expand Up @@ -373,7 +370,6 @@ void Application::Exit() {

LogtailMonitor::GetInstance()->Stop();
LoongCollectorMonitor::GetInstance()->Stop();
AlarmManager::GetInstance()->Stop();
LogtailPlugin::GetInstance()->StopBuiltInModules();
// from now on, alarm should not be used.

Expand Down
1 change: 1 addition & 0 deletions core/collection_pipeline/CollectionPipeline.h
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ class CollectionPipeline {
friend class PipelineUnittest;
friend class InputContainerStdioUnittest;
friend class InputFileUnittest;
friend class InputInternalAlarmsUnittest;
friend class InputInternalMetricsUnittest;
friend class InputPrometheusUnittest;
friend class ProcessorTagNativeUnittest;
Expand Down
2 changes: 2 additions & 0 deletions core/collection_pipeline/plugin/PluginRegistry.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
#include "plugin/input/InputPrometheus.h"
#if defined(__linux__) && !defined(__ANDROID__)
#include "plugin/input/InputFileSecurity.h"
#include "plugin/input/InputInternalAlarms.h"
#include "plugin/input/InputInternalMetrics.h"
#include "plugin/input/InputNetworkObserver.h"
#include "plugin/input/InputNetworkSecurity.h"
Expand Down Expand Up @@ -134,6 +135,7 @@ bool PluginRegistry::IsValidNativeFlusherPlugin(const string& name) const {
void PluginRegistry::LoadStaticPlugins() {
RegisterInputCreator(new StaticInputCreator<InputFile>());
RegisterInputCreator(new StaticInputCreator<InputPrometheus>());
RegisterInputCreator(new StaticInputCreator<InputInternalAlarms>(), true);
RegisterInputCreator(new StaticInputCreator<InputInternalMetrics>(), true);
#if defined(__linux__) && !defined(__ANDROID__)
RegisterInputCreator(new StaticInputCreator<InputContainerStdio>());
Expand Down
3 changes: 3 additions & 0 deletions core/models/PipelineEventGroup.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,9 @@ enum class EventGroupMetaKey {
PROMETHEUS_STREAM_ID,
PROMETHEUS_STREAM_TOTAL,

INTERNAL_DATA_TARGET_REGION,
INTERNAL_DATA_TYPE,

SOURCE_ID
};

Expand Down
230 changes: 70 additions & 160 deletions core/monitor/AlarmManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include "AlarmManager.h"
#include "monitor/AlarmManager.h"

#include "Monitor.h"
#include "app_config/AppConfig.h"
#include "collection_pipeline/queue/QueueKeyManager.h"
#include "collection_pipeline/queue/SenderQueueManager.h"
Expand All @@ -24,6 +23,7 @@
#include "common/TimeUtil.h"
#include "common/version.h"
#include "constants/Constants.h"
#include "monitor/SelfMonitorServer.h"
#include "protobuf/sls/sls_logs.pb.h"
#include "provider/Provider.h"

Expand Down Expand Up @@ -107,178 +107,88 @@ AlarmManager::AlarmManager() {
mMessageType[REGISTER_HANDLERS_TOO_SLOW_ALARM] = "REGISTER_HANDLERS_TOO_SLOW_ALARM";
}

void AlarmManager::Init() {
mThreadRes = async(launch::async, &AlarmManager::SendAlarmLoop, this);
}

void AlarmManager::Stop() {
ForceToSend();
{
lock_guard<mutex> lock(mThreadRunningMux);
mIsThreadRunning = false;
}
mStopCV.notify_one();
if (!mThreadRes.valid()) {
return;
}
future_status s = mThreadRes.wait_for(chrono::seconds(1));
if (s == future_status::ready) {
LOG_INFO(sLogger, ("alarm gathering", "stopped successfully"));
} else {
LOG_WARNING(sLogger, ("alarm gathering", "forced to stopped"));
}
}

bool AlarmManager::SendAlarmLoop() {
LOG_INFO(sLogger, ("alarm gathering", "started"));
{
unique_lock<mutex> lock(mThreadRunningMux);
while (mIsThreadRunning) {
SendAllRegionAlarm();
if (mStopCV.wait_for(lock, std::chrono::seconds(3), [this]() { return !mIsThreadRunning; })) {
break;
}
}
}
SendAllRegionAlarm();
return true;
}

void AlarmManager::SendAllRegionAlarm() {
void AlarmManager::FlushAllRegionAlarm(vector<PipelineEventGroup>& pipelineEventGroupList) {
int32_t currentTime = time(nullptr);
size_t sendRegionIndex = 0;
size_t sendAlarmTypeIndex = 0;
do {
LogGroup logGroup;
string region;
{
PTScopedLock lock(mAlarmBufferMutex);
if (mAllAlarmMap.size() <= sendRegionIndex) {
break;
}
auto allAlarmIter = mAllAlarmMap.begin();
size_t iterIndex = 0;
while (iterIndex != sendRegionIndex) {
++iterIndex;
++allAlarmIter;
}
region = allAlarmIter->first;
// LOG_DEBUG(sLogger, ("1Send Alarm", region)("region", sendRegionIndex));
AlarmVector& alarmBufferVec = *(allAlarmIter->second.first);
std::vector<int32_t>& lastUpdateTimeVec = allAlarmIter->second.second;
// check this region end
if (sendAlarmTypeIndex >= alarmBufferVec.size()) {
// jump this region
++sendRegionIndex;
sendAlarmTypeIndex = 0;
continue;
}
// LOG_DEBUG(sLogger, ("2Send Alarm", region)("region", sendRegionIndex)("alarm index",
// mMessageType[sendAlarmTypeIndex]));
// check valid
if (alarmBufferVec.size() != (size_t)ALL_LOGTAIL_ALARM_NUM
|| lastUpdateTimeVec.size() != (size_t)ALL_LOGTAIL_ALARM_NUM) {
LOG_ERROR(sLogger,
("invalid alarm item", region)("alarm vec", alarmBufferVec.size())("update vec",
lastUpdateTimeVec.size()));
// jump this region
++sendRegionIndex;
sendAlarmTypeIndex = 0;
continue;
}

// LOG_DEBUG(sLogger, ("3Send Alarm", region)("region", sendRegionIndex)("alarm index",
// mMessageType[sendAlarmTypeIndex]));
map<string, unique_ptr<AlarmMessage>>& alarmMap = alarmBufferVec[sendAlarmTypeIndex];
if (alarmMap.size() == 0
|| currentTime - lastUpdateTimeVec[sendAlarmTypeIndex] < INT32_FLAG(logtail_alarm_interval)) {
// go next alarm type
++sendAlarmTypeIndex;
continue;
}
// check sender queue status, if invalid jump this region

string project = GetProfileSender()->GetProfileProjectName(region);
QueueKey alarmPrjLogstoreKey
= QueueKeyManager::GetInstance()->GetKey("-flusher_sls-" + project + "#" + ALARM_SLS_LOGSTORE_NAME);
if (SenderQueueManager::GetInstance()->GetQueue(alarmPrjLogstoreKey) == nullptr) {
CollectionPipelineContext ctx;
SenderQueueManager::GetInstance()->CreateQueue(
alarmPrjLogstoreKey,
"self_monitor",
ctx,
{{"region", FlusherSLS::GetRegionConcurrencyLimiter(region)},
{"project", FlusherSLS::GetProjectConcurrencyLimiter(project)},
{"logstore", FlusherSLS::GetLogstoreConcurrencyLimiter(project, ALARM_SLS_LOGSTORE_NAME)}});
}
if (!SenderQueueManager::GetInstance()->IsValidToPush(alarmPrjLogstoreKey)) {
// jump this region
++sendRegionIndex;
sendAlarmTypeIndex = 0;
continue;
}

// LOG_DEBUG(sLogger, ("4Send Alarm", region)("region", sendRegionIndex)("alarm index",
// mMessageType[sendAlarmTypeIndex]));
logGroup.set_source(LoongCollectorMonitor::mIpAddr);
logGroup.set_category(ALARM_SLS_LOGSTORE_NAME);
auto now = GetCurrentLogtailTime();
for (map<string, unique_ptr<AlarmMessage>>::iterator mapIter = alarmMap.begin(); mapIter != alarmMap.end();
++mapIter) {
auto& messagePtr = mapIter->second;

// LOG_DEBUG(sLogger, ("5Send Alarm", region)("region", sendRegionIndex)("alarm index",
// sendAlarmTypeIndex)("msg", messagePtr->mMessage));

Log* logPtr = logGroup.add_logs();
SetLogTime(logPtr,
AppConfig::GetInstance()->EnableLogTimeAutoAdjust() ? now.tv_sec + GetTimeDelta()
: now.tv_sec);
Log_Content* contentPtr = logPtr->add_contents();
contentPtr->set_key("alarm_type");
contentPtr->set_value(messagePtr->mMessageType);

contentPtr = logPtr->add_contents();
contentPtr->set_key("alarm_message");
contentPtr->set_value(messagePtr->mMessage);

contentPtr = logPtr->add_contents();
contentPtr->set_key("alarm_count");
contentPtr->set_value(ToString(messagePtr->mCount));
PTScopedLock lock(mAlarmBufferMutex);
if (mAllAlarmMap.size() <= sendRegionIndex) {
break;
}
auto allAlarmIter = mAllAlarmMap.begin();
size_t iterIndex = 0;
while (iterIndex != sendRegionIndex) {
++iterIndex;
++allAlarmIter;
}
string region = allAlarmIter->first;

contentPtr = logPtr->add_contents();
contentPtr->set_key("ip");
contentPtr->set_value(LoongCollectorMonitor::mIpAddr);
AlarmVector& alarmBufferVec = *(allAlarmIter->second.first);
std::vector<int32_t>& lastUpdateTimeVec = allAlarmIter->second.second;
// check this region end
if (sendAlarmTypeIndex >= alarmBufferVec.size()) {
// jump this region
++sendRegionIndex;
sendAlarmTypeIndex = 0;
continue;
}

contentPtr = logPtr->add_contents();
contentPtr->set_key("os");
contentPtr->set_value(OS_NAME);
// check valid
if (alarmBufferVec.size() != (size_t)ALL_LOGTAIL_ALARM_NUM
|| lastUpdateTimeVec.size() != (size_t)ALL_LOGTAIL_ALARM_NUM) {
LOG_ERROR(sLogger,
("invalid alarm item", region)("alarm vec", alarmBufferVec.size())("update vec",
lastUpdateTimeVec.size()));
// jump this region
++sendRegionIndex;
sendAlarmTypeIndex = 0;
continue;
}

contentPtr = logPtr->add_contents();
contentPtr->set_key("ver");
contentPtr->set_value(ILOGTAIL_VERSION);
map<string, unique_ptr<AlarmMessage>>& alarmMap = alarmBufferVec[sendAlarmTypeIndex];
if (alarmMap.size() == 0
|| currentTime - lastUpdateTimeVec[sendAlarmTypeIndex] < INT32_FLAG(logtail_alarm_interval)) {
// go next alarm type
++sendAlarmTypeIndex;
continue;
}

if (!messagePtr->mProjectName.empty()) {
contentPtr = logPtr->add_contents();
contentPtr->set_key("project_name");
contentPtr->set_value(messagePtr->mProjectName);
}
PipelineEventGroup pipelineEventGroup(std::make_shared<SourceBuffer>());
pipelineEventGroup.SetTagNoCopy(LOG_RESERVED_KEY_SOURCE, LoongCollectorMonitor::mIpAddr);
pipelineEventGroup.SetMetadata(EventGroupMetaKey::INTERNAL_DATA_TARGET_REGION, region);
pipelineEventGroup.SetMetadata(EventGroupMetaKey::INTERNAL_DATA_TYPE,
SelfMonitorServer::INTERNAL_DATA_TYPE_ALARM);
auto now = GetCurrentLogtailTime();
for (map<string, unique_ptr<AlarmMessage>>::iterator mapIter = alarmMap.begin(); mapIter != alarmMap.end();
++mapIter) {
auto& messagePtr = mapIter->second;

if (!messagePtr->mCategory.empty()) {
contentPtr = logPtr->add_contents();
contentPtr->set_key("category");
contentPtr->set_value(messagePtr->mCategory);
}
LogEvent* logEvent = pipelineEventGroup.AddLogEvent();
logEvent->SetTimestamp(AppConfig::GetInstance()->EnableLogTimeAutoAdjust() ? now.tv_sec + GetTimeDelta()
: now.tv_sec);
logEvent->SetContent("alarm_type", messagePtr->mMessageType);
logEvent->SetContent("alarm_message", messagePtr->mMessage);
logEvent->SetContent("alarm_count", ToString(messagePtr->mCount));
logEvent->SetContent("ip", LoongCollectorMonitor::mIpAddr);
logEvent->SetContent("os", OS_NAME);
logEvent->SetContent("ver", string(ILOGTAIL_VERSION));
if (!messagePtr->mProjectName.empty()) {
logEvent->SetContent("project_name", messagePtr->mProjectName);
}
if (!messagePtr->mCategory.empty()) {
logEvent->SetContent("category", messagePtr->mCategory);
}
lastUpdateTimeVec[sendAlarmTypeIndex] = currentTime;
alarmMap.clear();
++sendAlarmTypeIndex;
}
if (logGroup.logs_size() <= 0) {
lastUpdateTimeVec[sendAlarmTypeIndex] = currentTime;
alarmMap.clear();
++sendAlarmTypeIndex;

if (pipelineEventGroup.GetEvents().size() <= 0) {
continue;
}
// this is an anonymous send and non lock send
GetProfileSender()->SendToProfileProject(region, logGroup);
pipelineEventGroupList.emplace_back(std::move(pipelineEventGroup));
} while (true);
}

Expand Down
18 changes: 7 additions & 11 deletions core/monitor/AlarmManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include <vector>

#include "common/Lock.h"
#include "models/PipelineEventGroup.h"

namespace logtail {

Expand Down Expand Up @@ -122,9 +123,6 @@ class AlarmManager {
return &instance;
}

void Init();
void Stop();

void SendAlarm(const AlarmType alarmType,
const std::string& message,
const std::string& projectName = "",
Expand All @@ -134,29 +132,27 @@ class AlarmManager {
void ForceToSend();
bool IsLowLevelAlarmValid();

void FlushAllRegionAlarm(std::vector<PipelineEventGroup>& pipelineEventGroupList);

private:
using AlarmVector = std::vector<std::map<std::string, std::unique_ptr<AlarmMessage>>>;

AlarmManager();
~AlarmManager() = default;

bool SendAlarmLoop();
// without lock
AlarmVector* MakesureLogtailAlarmMapVecUnlocked(const std::string& region);
void SendAllRegionAlarm();

std::future<bool> mThreadRes;
std::mutex mThreadRunningMux;
bool mIsThreadRunning = true;
std::condition_variable mStopCV;


std::vector<std::string> mMessageType;
std::map<std::string, std::pair<std::shared_ptr<AlarmVector>, std::vector<int32_t>>> mAllAlarmMap;
PTMutex mAlarmBufferMutex;

std::atomic_int mLastLowLevelTime{0};
std::atomic_int mLastLowLevelCount{0};

#ifdef APSARA_UNIT_TEST_MAIN
friend class AlarmManagerUnittest;
#endif
};

} // namespace logtail
Loading

0 comments on commit 8a3227b

Please sign in to comment.