Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add InputInternalAlarms #2061

Merged
merged 7 commits into from
Feb 8, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 0 additions & 4 deletions core/application/Application.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -268,9 +268,6 @@ void Application::Start() { // GCOVR_EXCL_START
LogtailPlugin::GetInstance()->LoadPluginBase();
}

// TODO: this should be refactored to internal pipeline
AlarmManager::GetInstance()->Init();

time_t curTime = 0, lastConfigCheckTime = 0, lastUpdateMetricTime = 0, lastCheckTagsTime = 0, lastQueueGCTime = 0;
#ifndef LOGTAIL_NO_TC_MALLOC
time_t lastTcmallocReleaseMemTime = 0;
Expand Down Expand Up @@ -373,7 +370,6 @@ void Application::Exit() {

LogtailMonitor::GetInstance()->Stop();
LoongCollectorMonitor::GetInstance()->Stop();
AlarmManager::GetInstance()->Stop();
LogtailPlugin::GetInstance()->StopBuiltInModules();
// from now on, alarm should not be used.

Expand Down
1 change: 1 addition & 0 deletions core/collection_pipeline/CollectionPipeline.h
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ class CollectionPipeline {
friend class PipelineUnittest;
friend class InputContainerStdioUnittest;
friend class InputFileUnittest;
friend class InputInternalAlarmsUnittest;
friend class InputInternalMetricsUnittest;
friend class InputPrometheusUnittest;
friend class ProcessorTagNativeUnittest;
Expand Down
2 changes: 2 additions & 0 deletions core/collection_pipeline/plugin/PluginRegistry.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
#include "plugin/input/InputPrometheus.h"
#if defined(__linux__) && !defined(__ANDROID__)
#include "plugin/input/InputFileSecurity.h"
#include "plugin/input/InputInternalAlarms.h"
#include "plugin/input/InputInternalMetrics.h"
#include "plugin/input/InputNetworkObserver.h"
#include "plugin/input/InputNetworkSecurity.h"
Expand Down Expand Up @@ -129,6 +130,7 @@ bool PluginRegistry::IsValidNativeFlusherPlugin(const string& name) const {
void PluginRegistry::LoadStaticPlugins() {
RegisterInputCreator(new StaticInputCreator<InputFile>());
RegisterInputCreator(new StaticInputCreator<InputPrometheus>());
RegisterInputCreator(new StaticInputCreator<InputInternalAlarms>(), true);
RegisterInputCreator(new StaticInputCreator<InputInternalMetrics>(), true);
#if defined(__linux__) && !defined(__ANDROID__)
RegisterInputCreator(new StaticInputCreator<InputContainerStdio>());
Expand Down
3 changes: 3 additions & 0 deletions core/models/PipelineEventGroup.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,9 @@ enum class EventGroupMetaKey {
PROMETHEUS_STREAM_ID,
PROMETHEUS_STREAM_TOTAL,

INTERNAL_DATA_TARGET_REGION,
INTERNAL_DATA_TYPE,

SOURCE_ID
};

Expand Down
230 changes: 70 additions & 160 deletions core/monitor/AlarmManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include "AlarmManager.h"
#include "monitor/AlarmManager.h"

#include "Monitor.h"
#include "app_config/AppConfig.h"
#include "collection_pipeline/queue/QueueKeyManager.h"
#include "collection_pipeline/queue/SenderQueueManager.h"
Expand All @@ -24,6 +23,7 @@
#include "common/TimeUtil.h"
#include "common/version.h"
#include "constants/Constants.h"
#include "monitor/SelfMonitorServer.h"
#include "protobuf/sls/sls_logs.pb.h"
#include "provider/Provider.h"

Expand Down Expand Up @@ -107,178 +107,88 @@ AlarmManager::AlarmManager() {
mMessageType[REGISTER_HANDLERS_TOO_SLOW_ALARM] = "REGISTER_HANDLERS_TOO_SLOW_ALARM";
}

void AlarmManager::Init() {
mThreadRes = async(launch::async, &AlarmManager::SendAlarmLoop, this);
}

void AlarmManager::Stop() {
ForceToSend();
{
lock_guard<mutex> lock(mThreadRunningMux);
mIsThreadRunning = false;
}
mStopCV.notify_one();
if (!mThreadRes.valid()) {
return;
}
future_status s = mThreadRes.wait_for(chrono::seconds(1));
if (s == future_status::ready) {
LOG_INFO(sLogger, ("alarm gathering", "stopped successfully"));
} else {
LOG_WARNING(sLogger, ("alarm gathering", "forced to stopped"));
}
}

bool AlarmManager::SendAlarmLoop() {
LOG_INFO(sLogger, ("alarm gathering", "started"));
{
unique_lock<mutex> lock(mThreadRunningMux);
while (mIsThreadRunning) {
SendAllRegionAlarm();
if (mStopCV.wait_for(lock, std::chrono::seconds(3), [this]() { return !mIsThreadRunning; })) {
break;
}
}
}
SendAllRegionAlarm();
return true;
}

void AlarmManager::SendAllRegionAlarm() {
void AlarmManager::FlushAllRegionAlarm(vector<PipelineEventGroup>& pipelineEventGroupList) {
int32_t currentTime = time(nullptr);
size_t sendRegionIndex = 0;
size_t sendAlarmTypeIndex = 0;
do {
LogGroup logGroup;
string region;
{
PTScopedLock lock(mAlarmBufferMutex);
if (mAllAlarmMap.size() <= sendRegionIndex) {
break;
}
auto allAlarmIter = mAllAlarmMap.begin();
size_t iterIndex = 0;
while (iterIndex != sendRegionIndex) {
++iterIndex;
++allAlarmIter;
}
region = allAlarmIter->first;
// LOG_DEBUG(sLogger, ("1Send Alarm", region)("region", sendRegionIndex));
AlarmVector& alarmBufferVec = *(allAlarmIter->second.first);
std::vector<int32_t>& lastUpdateTimeVec = allAlarmIter->second.second;
// check this region end
if (sendAlarmTypeIndex >= alarmBufferVec.size()) {
// jump this region
++sendRegionIndex;
sendAlarmTypeIndex = 0;
continue;
}
// LOG_DEBUG(sLogger, ("2Send Alarm", region)("region", sendRegionIndex)("alarm index",
// mMessageType[sendAlarmTypeIndex]));
// check valid
if (alarmBufferVec.size() != (size_t)ALL_LOGTAIL_ALARM_NUM
|| lastUpdateTimeVec.size() != (size_t)ALL_LOGTAIL_ALARM_NUM) {
LOG_ERROR(sLogger,
("invalid alarm item", region)("alarm vec", alarmBufferVec.size())("update vec",
lastUpdateTimeVec.size()));
// jump this region
++sendRegionIndex;
sendAlarmTypeIndex = 0;
continue;
}

// LOG_DEBUG(sLogger, ("3Send Alarm", region)("region", sendRegionIndex)("alarm index",
// mMessageType[sendAlarmTypeIndex]));
map<string, unique_ptr<AlarmMessage>>& alarmMap = alarmBufferVec[sendAlarmTypeIndex];
if (alarmMap.size() == 0
|| currentTime - lastUpdateTimeVec[sendAlarmTypeIndex] < INT32_FLAG(logtail_alarm_interval)) {
// go next alarm type
++sendAlarmTypeIndex;
continue;
}
// check sender queue status, if invalid jump this region

string project = GetProfileSender()->GetProfileProjectName(region);
QueueKey alarmPrjLogstoreKey
= QueueKeyManager::GetInstance()->GetKey("-flusher_sls-" + project + "#" + ALARM_SLS_LOGSTORE_NAME);
if (SenderQueueManager::GetInstance()->GetQueue(alarmPrjLogstoreKey) == nullptr) {
CollectionPipelineContext ctx;
SenderQueueManager::GetInstance()->CreateQueue(
alarmPrjLogstoreKey,
"self_monitor",
ctx,
{{"region", FlusherSLS::GetRegionConcurrencyLimiter(region)},
{"project", FlusherSLS::GetProjectConcurrencyLimiter(project)},
{"logstore", FlusherSLS::GetLogstoreConcurrencyLimiter(project, ALARM_SLS_LOGSTORE_NAME)}});
}
if (!SenderQueueManager::GetInstance()->IsValidToPush(alarmPrjLogstoreKey)) {
// jump this region
++sendRegionIndex;
sendAlarmTypeIndex = 0;
continue;
}

// LOG_DEBUG(sLogger, ("4Send Alarm", region)("region", sendRegionIndex)("alarm index",
// mMessageType[sendAlarmTypeIndex]));
logGroup.set_source(LoongCollectorMonitor::mIpAddr);
logGroup.set_category(ALARM_SLS_LOGSTORE_NAME);
auto now = GetCurrentLogtailTime();
for (map<string, unique_ptr<AlarmMessage>>::iterator mapIter = alarmMap.begin(); mapIter != alarmMap.end();
++mapIter) {
auto& messagePtr = mapIter->second;

// LOG_DEBUG(sLogger, ("5Send Alarm", region)("region", sendRegionIndex)("alarm index",
// sendAlarmTypeIndex)("msg", messagePtr->mMessage));

Log* logPtr = logGroup.add_logs();
SetLogTime(logPtr,
AppConfig::GetInstance()->EnableLogTimeAutoAdjust() ? now.tv_sec + GetTimeDelta()
: now.tv_sec);
Log_Content* contentPtr = logPtr->add_contents();
contentPtr->set_key("alarm_type");
contentPtr->set_value(messagePtr->mMessageType);

contentPtr = logPtr->add_contents();
contentPtr->set_key("alarm_message");
contentPtr->set_value(messagePtr->mMessage);

contentPtr = logPtr->add_contents();
contentPtr->set_key("alarm_count");
contentPtr->set_value(ToString(messagePtr->mCount));
PTScopedLock lock(mAlarmBufferMutex);
if (mAllAlarmMap.size() <= sendRegionIndex) {
break;
}
auto allAlarmIter = mAllAlarmMap.begin();
size_t iterIndex = 0;
while (iterIndex != sendRegionIndex) {
++iterIndex;
++allAlarmIter;
}
string region = allAlarmIter->first;

contentPtr = logPtr->add_contents();
contentPtr->set_key("ip");
contentPtr->set_value(LoongCollectorMonitor::mIpAddr);
AlarmVector& alarmBufferVec = *(allAlarmIter->second.first);
std::vector<int32_t>& lastUpdateTimeVec = allAlarmIter->second.second;
// check this region end
if (sendAlarmTypeIndex >= alarmBufferVec.size()) {
// jump this region
++sendRegionIndex;
sendAlarmTypeIndex = 0;
continue;
}

contentPtr = logPtr->add_contents();
contentPtr->set_key("os");
contentPtr->set_value(OS_NAME);
// check valid
if (alarmBufferVec.size() != (size_t)ALL_LOGTAIL_ALARM_NUM
|| lastUpdateTimeVec.size() != (size_t)ALL_LOGTAIL_ALARM_NUM) {
LOG_ERROR(sLogger,
("invalid alarm item", region)("alarm vec", alarmBufferVec.size())("update vec",
lastUpdateTimeVec.size()));
// jump this region
++sendRegionIndex;
sendAlarmTypeIndex = 0;
continue;
}

contentPtr = logPtr->add_contents();
contentPtr->set_key("ver");
contentPtr->set_value(ILOGTAIL_VERSION);
map<string, unique_ptr<AlarmMessage>>& alarmMap = alarmBufferVec[sendAlarmTypeIndex];
if (alarmMap.size() == 0
|| currentTime - lastUpdateTimeVec[sendAlarmTypeIndex] < INT32_FLAG(logtail_alarm_interval)) {
// go next alarm type
++sendAlarmTypeIndex;
continue;
}

if (!messagePtr->mProjectName.empty()) {
contentPtr = logPtr->add_contents();
contentPtr->set_key("project_name");
contentPtr->set_value(messagePtr->mProjectName);
}
PipelineEventGroup pipelineEventGroup(std::make_shared<SourceBuffer>());
pipelineEventGroup.SetTagNoCopy(LOG_RESERVED_KEY_SOURCE, LoongCollectorMonitor::mIpAddr);
pipelineEventGroup.SetMetadata(EventGroupMetaKey::INTERNAL_DATA_TARGET_REGION, region);
pipelineEventGroup.SetMetadata(EventGroupMetaKey::INTERNAL_DATA_TYPE,
SelfMonitorServer::INTERNAL_DATA_TYPE_ALARM);
auto now = GetCurrentLogtailTime();
for (map<string, unique_ptr<AlarmMessage>>::iterator mapIter = alarmMap.begin(); mapIter != alarmMap.end();
++mapIter) {
auto& messagePtr = mapIter->second;

if (!messagePtr->mCategory.empty()) {
contentPtr = logPtr->add_contents();
contentPtr->set_key("category");
contentPtr->set_value(messagePtr->mCategory);
}
LogEvent* logEvent = pipelineEventGroup.AddLogEvent();
logEvent->SetTimestamp(AppConfig::GetInstance()->EnableLogTimeAutoAdjust() ? now.tv_sec + GetTimeDelta()
: now.tv_sec);
logEvent->SetContent("alarm_type", messagePtr->mMessageType);
logEvent->SetContent("alarm_message", messagePtr->mMessage);
logEvent->SetContent("alarm_count", ToString(messagePtr->mCount));
logEvent->SetContent("ip", LoongCollectorMonitor::mIpAddr);
logEvent->SetContent("os", OS_NAME);
logEvent->SetContent("ver", string(ILOGTAIL_VERSION));
if (!messagePtr->mProjectName.empty()) {
logEvent->SetContent("project_name", messagePtr->mProjectName);
}
if (!messagePtr->mCategory.empty()) {
logEvent->SetContent("category", messagePtr->mCategory);
}
lastUpdateTimeVec[sendAlarmTypeIndex] = currentTime;
alarmMap.clear();
++sendAlarmTypeIndex;
}
if (logGroup.logs_size() <= 0) {
lastUpdateTimeVec[sendAlarmTypeIndex] = currentTime;
alarmMap.clear();
++sendAlarmTypeIndex;

if (pipelineEventGroup.GetEvents().size() <= 0) {
continue;
}
// this is an anonymous send and non lock send
GetProfileSender()->SendToProfileProject(region, logGroup);
pipelineEventGroupList.emplace_back(std::move(pipelineEventGroup));
} while (true);
}

Expand Down
18 changes: 7 additions & 11 deletions core/monitor/AlarmManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include <vector>

#include "common/Lock.h"
#include "models/PipelineEventGroup.h"

namespace logtail {

Expand Down Expand Up @@ -122,9 +123,6 @@ class AlarmManager {
return &instance;
}

void Init();
void Stop();

void SendAlarm(const AlarmType alarmType,
const std::string& message,
const std::string& projectName = "",
Expand All @@ -134,29 +132,27 @@ class AlarmManager {
void ForceToSend();
bool IsLowLevelAlarmValid();

void FlushAllRegionAlarm(std::vector<PipelineEventGroup>& pipelineEventGroupList);

private:
using AlarmVector = std::vector<std::map<std::string, std::unique_ptr<AlarmMessage>>>;

AlarmManager();
~AlarmManager() = default;

bool SendAlarmLoop();
// without lock
AlarmVector* MakesureLogtailAlarmMapVecUnlocked(const std::string& region);
void SendAllRegionAlarm();

std::future<bool> mThreadRes;
std::mutex mThreadRunningMux;
bool mIsThreadRunning = true;
std::condition_variable mStopCV;


std::vector<std::string> mMessageType;
std::map<std::string, std::pair<std::shared_ptr<AlarmVector>, std::vector<int32_t>>> mAllAlarmMap;
PTMutex mAlarmBufferMutex;

std::atomic_int mLastLowLevelTime{0};
std::atomic_int mLastLowLevelCount{0};

#ifdef APSARA_UNIT_TEST_MAIN
friend class AlarmManagerUnittest;
#endif
};

} // namespace logtail
Loading
Loading