feat: prom agent info for HPA and Rebalance #2028

Open · wants to merge 58 commits into base: main

Changes from 41 commits

Commits (58)
7af6044
feat: prom agent basic info and targets info
catdogpandas Jan 6, 2025
39c7284
feat: prom agent info basic health value
catdogpandas Jan 6, 2025
27a1849
chore: remove unnecessory logs
catdogpandas Jan 6, 2025
bb81d0e
feat: use response size instead of series
catdogpandas Jan 8, 2025
78c78a8
chore: remove job name in targets info
catdogpandas Jan 8, 2025
08e51b9
Merge branch 'main' into feat/prom-agent-info
catdogpandas Jan 8, 2025
4399c2b
feat: update labels hash calc
catdogpandas Jan 8, 2025
51fe67d
Merge branch 'main' into feat/prom-agent-info
catdogpandas Jan 8, 2025
f479d2d
feat: update instance calc
catdogpandas Jan 9, 2025
f9ac8bc
Revert "feat: update instance calc"
catdogpandas Jan 9, 2025
bd68622
Merge branch 'main' into feat/prom-agent-info
catdogpandas Jan 9, 2025
32f3a57
chore: update instance
catdogpandas Jan 9, 2025
d811148
chore: update
catdogpandas Jan 9, 2025
c74b96c
feat: calc hash
catdogpandas Jan 9, 2025
5ca13cb
chore: update
catdogpandas Jan 9, 2025
2cb6eb3
update
catdogpandas Jan 13, 2025
0874105
update
catdogpandas Jan 13, 2025
60d1a82
update
catdogpandas Jan 14, 2025
3c43d20
feat: update health calc
catdogpandas Jan 14, 2025
502b9d6
update
catdogpandas Jan 14, 2025
6c9d94c
update
catdogpandas Jan 14, 2025
1a63a12
update
catdogpandas Jan 14, 2025
9f15143
update
catdogpandas Jan 14, 2025
dbdc8c6
update
catdogpandas Jan 14, 2025
7eea950
feat: add lc_target_hash
catdogpandas Jan 14, 2025
eea0acd
Merge branch 'main' into feat/prom-agent-info
catdogpandas Jan 14, 2025
46cfaf4
update
catdogpandas Jan 15, 2025
728a34d
update
catdogpandas Jan 15, 2025
4db3f54
prom job
catdogpandas Jan 15, 2025
0a6d8c8
update
catdogpandas Jan 15, 2025
7ff69db
update
catdogpandas Jan 15, 2025
58d713c
chore: update ut
catdogpandas Jan 17, 2025
f89dc8c
chore: update ut
catdogpandas Jan 17, 2025
574fd92
update
catdogpandas Jan 17, 2025
640729b
update
catdogpandas Jan 17, 2025
b67091a
chore: update code style
catdogpandas Jan 23, 2025
9eacb7b
Merge branch 'main' into feat/prom-agent-info
catdogpandas Jan 23, 2025
b24f0f6
update code style
catdogpandas Jan 23, 2025
bcb3c42
update code style
catdogpandas Jan 23, 2025
123ff49
update code style
catdogpandas Jan 23, 2025
ecb992d
chore: update
catdogpandas Jan 24, 2025
3c61cf7
chore: update
catdogpandas Jan 24, 2025
3b82b6d
chore: update code style
catdogpandas Jan 24, 2025
408cf35
Merge branch 'main' into feat/prom-agent-info
catdogpandas Feb 11, 2025
8e02f74
chore: remove health ut
catdogpandas Feb 11, 2025
94558d0
Merge branch 'main' into feat/prom-agent-info
catdogpandas Feb 19, 2025
6d508cd
chore: update
catdogpandas Feb 19, 2025
315925f
Merge branch 'main' into feat/prom-agent-info
catdogpandas Feb 25, 2025
ccb5f20
chore: fix ut
catdogpandas Feb 25, 2025
cb0d2ea
chore: add ut
catdogpandas Feb 25, 2025
0c0acba
Merge branch 'main' into feat/prom-agent-info
catdogpandas Feb 27, 2025
690e009
chore: update
catdogpandas Feb 27, 2025
2794943
feat: add http sink info
catdogpandas Feb 27, 2025
a9711a3
chore: update
catdogpandas Feb 27, 2025
c444ee8
Merge branch 'main' into feat/prom-agent-info
catdogpandas Feb 27, 2025
baa25ae
chore: update
catdogpandas Feb 27, 2025
2718e8e
chore: update
catdogpandas Feb 27, 2025
a643527
chore: update
catdogpandas Feb 27, 2025
2 changes: 2 additions & 0 deletions core/monitor/Monitor.cpp
@@ -177,6 +177,8 @@ void LogtailMonitor::Monitor() {

GetMemStat();
CalCpuStat(curCpuStat, mCpuStat);
mCpuUsage.store(mCpuStat.mCpuUsage);
mMemoryUsage.store(mMemStat.mRss);
if (CheckHardMemLimit()) {
LOG_ERROR(sLogger,
("Resource used by program exceeds hard limit",
5 changes: 5 additions & 0 deletions core/monitor/Monitor.h
@@ -91,6 +91,8 @@ class LogtailMonitor {
// GetRealtimeCpuLevel return a value to indicates current CPU usage level.
// LogInput use it to do flow control.
float GetRealtimeCpuLevel() { return mRealtimeCpuStat.mCpuUsage / mScaledCpuUsageUpLimit; }
[[nodiscard]] float GetCpuUsage() const { return mCpuUsage.load(); }
[[nodiscard]] float GetMemoryUsage() const { return mMemoryUsage.load(); }

private:
LogtailMonitor();
@@ -160,6 +162,9 @@
// Memory usage statistics.
MemStat mMemStat;

std::atomic<float> mCpuUsage = 0;
std::atomic<float> mMemoryUsage = 0;

// Current scale up level, updated by CheckScaledCpuUsageUpLimit.
float mScaledCpuUsageUpLimit;
#if defined(__linux__)
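The two atomics added to LogtailMonitor let other threads read the latest sampled CPU and memory usage without taking a lock. A minimal standalone sketch of the same pattern follows; the class and member names are illustrative, not the project's actual API.

```cpp
// Sketch of a lock-free usage snapshot; names are illustrative only.
#include <atomic>
#include <iostream>

class UsageSnapshot {
public:
    // Called periodically from the monitor thread.
    void Update(float cpu, float memRss) {
        mCpuUsage.store(cpu, std::memory_order_relaxed);
        mMemoryUsage.store(memRss, std::memory_order_relaxed);
    }
    // Safe to call from any thread (e.g. the Prometheus input runner).
    float GetCpuUsage() const { return mCpuUsage.load(std::memory_order_relaxed); }
    float GetMemoryUsage() const { return mMemoryUsage.load(std::memory_order_relaxed); }

private:
    std::atomic<float> mCpuUsage{0.0F};
    std::atomic<float> mMemoryUsage{0.0F};
};

int main() {
    UsageSnapshot s;
    s.Update(0.42F, 356.0F);
    std::cout << s.GetCpuUsage() << " " << s.GetMemoryUsage() << "\n";
}
```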
15 changes: 11 additions & 4 deletions core/plugin/processor/inner/ProcessorPromRelabelMetricNative.cpp
@@ -16,8 +16,7 @@
#include "plugin/processor/inner/ProcessorPromRelabelMetricNative.h"

#include <cstddef>

#include "json/json.h"
#include <json/json.h>

#include "common/Flags.h"
#include "common/StringTools.h"
@@ -186,18 +185,26 @@ void ProcessorPromRelabelMetricNative::UpdateAutoMetrics(const PipelineEventGrou

void ProcessorPromRelabelMetricNative::AddAutoMetrics(PipelineEventGroup& eGroup,
const prom::AutoMetric& autoMetric) const {
auto targetTags = eGroup.GetTags();
if (!eGroup.HasMetadata(EventGroupMetaKey::PROMETHEUS_SCRAPE_TIMESTAMP_MILLISEC)) {
LOG_ERROR(sLogger, ("scrape_timestamp_milliseconds is not set", ""));
return;
}
auto targetTags = eGroup.GetTags();
auto toDelete = GetToDeleteTargetLabels(targetTags);
for (const auto& item : toDelete) {
targetTags.erase(item);
}
if (!eGroup.HasMetadata(EventGroupMetaKey::PROMETHEUS_STREAM_ID)) {
LOG_ERROR(sLogger, ("prometheus stream id", ""));
return;
}
targetTags[prometheus::LC_TARGET_HASH] = eGroup.GetMetadata(EventGroupMetaKey::PROMETHEUS_STREAM_ID);

StringView scrapeTimestampMilliSecStr = eGroup.GetMetadata(EventGroupMetaKey::PROMETHEUS_SCRAPE_TIMESTAMP_MILLISEC);
auto timestampMilliSec = StringTo<uint64_t>(scrapeTimestampMilliSecStr.to_string());
auto timestamp = timestampMilliSec / 1000;
auto nanoSec = timestampMilliSec % 1000 * 1000000;


AddMetric(
eGroup, prometheus::SCRAPE_DURATION_SECONDS, autoMetric.mScrapeDurationSeconds, timestamp, nanoSec, targetTags);

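The hunk above splits the scrape-time milliseconds into a seconds part and a nanosecond remainder before stamping the auto metrics. A minimal standalone sketch of that conversion (not the processor's actual code path):

```cpp
#include <cstdint>
#include <iostream>

int main() {
    // Example scrape timestamp in milliseconds since the epoch.
    uint64_t timestampMilliSec = 1736412345678ULL;
    uint64_t timestamp = timestampMilliSec / 1000;          // whole seconds
    uint64_t nanoSec = timestampMilliSec % 1000 * 1000000;  // remainder as nanoseconds
    std::cout << timestamp << "s + " << nanoSec << "ns\n";  // 1736412345s + 678000000ns
}
```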
13 changes: 13 additions & 0 deletions core/prometheus/Constants.h
@@ -13,6 +13,7 @@ const uint64_t RefeshIntervalSeconds = 5;
const char* const META = "__meta_";
const char* const UNDEFINED = "undefined";
const std::string PROMETHEUS = "prometheus";
const char* const LC_TARGET_HASH = "lc_target_hash";

// relabel config
const char* const SOURCE_LABELS = "source_labels";
@@ -100,6 +101,8 @@ const char* const SCHEME_LABEL_NAME = "__scheme__";
const char* const METRICS_PATH_LABEL_NAME = "__metrics_path__";
const char* const PARAM_LABEL_NAME = "__param_";
const char* const LABELS = "labels";
const char* const TARGET_HASH = "hash";
const char* const REBALANCE_MS = "rebalance_ms";

// auto metrics
const char* const SCRAPE_STATE = "scrape_state";
@@ -119,4 +122,14 @@ const char* const ACCEPT_ENCODING = "Accept-Encoding";
const char* const GZIP = "gzip";
const char* const IDENTITY = "identity";

const char* const AGENT_INFO = "AgentInfo";
const char* const TARGETS_INFO = "TargetsInfo";
const char* const CPU_LIMIT = "CpuLimit";
const char* const CPU_USAGE = "CpuUsage";
const char* const MEM_LIMIT = "MemLimit";
const char* const MEM_USAGE = "MemUsage";
const char* const HASH = "Hash";
const char* const SIZE = "Size";
const char* const SCRAPE_DELAY_SECONDS = "ScrapeDelaySeconds";

} // namespace logtail::prometheus
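The new constants suggest a status payload describing the agent and its scrape targets, used for HPA and rebalancing decisions. The exact structure is not shown in this diff, so the field nesting below is an assumption; it only illustrates how these keys might combine.

```cpp
// Hypothetical payload assembled from the constants above; the real
// serialization code is not part of this diff, so treat this as a sketch.
#include <json/json.h>
#include <iostream>

int main() {
    Json::Value root;
    root["AgentInfo"]["CpuUsage"] = 0.35;   // current usage sampled by the monitor
    root["AgentInfo"]["CpuLimit"] = 2.0;    // configured limit
    root["AgentInfo"]["MemUsage"] = 512.0;
    root["AgentInfo"]["MemLimit"] = 2048.0;

    Json::Value target;
    target["Hash"] = "lc_target_hash_value";  // stable target identity
    target["Size"] = 10240;                   // last scrape response size in bytes
    target["ScrapeDelaySeconds"] = 1;         // observed scheduling delay
    root["TargetsInfo"].append(target);

    std::cout << root.toStyledString();
}
```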
35 changes: 35 additions & 0 deletions core/prometheus/PrometheusInputRunner.cpp
@@ -22,6 +22,7 @@
#include <string>

#include "application/Application.h"
#include "collection_pipeline/queue/ProcessQueueManager.h"
#include "common/Flags.h"
#include "common/JsonUtil.h"
#include "common/StringTools.h"
@@ -31,6 +32,7 @@
#include "common/http/Curl.h"
#include "common/timer/Timer.h"
#include "logger/Logger.h"
#include "monitor/Monitor.h"
#include "monitor/metric_constants/MetricConstants.h"
#include "plugin/flusher/sls/FlusherSLS.h"
#include "prometheus/Constants.h"
@@ -51,6 +53,7 @@ PrometheusInputRunner::PrometheusInputRunner()
mEventPool(true),
mUnRegisterMs(0) {
mTimer = std::make_shared<Timer>();
mLastUpdateTime = std::chrono::steady_clock::now();

// self monitor
MetricLabels labels;
@@ -175,6 +178,8 @@ void PrometheusInputRunner::Init() {
mUnRegisterMs = 0;
} else {
mUnRegisterMs.store(StringTo<uint64_t>(tmpStr));
// adjust unRegisterMs to scrape targets for zero-cost
mUnRegisterMs -= 1000;
LOG_INFO(sLogger, ("unRegisterMs", ToString(mUnRegisterMs)));
}
}
@@ -293,4 +298,34 @@ string PrometheusInputRunner::GetAllProjects() {
void PrometheusInputRunner::CheckGC() {
mEventPool.CheckGC();
}

PromAgentInfo PrometheusInputRunner::GetAgentInfo() {
std::lock_guard<mutex> lock(mAgentInfoMutex);
auto curTime = std::chrono::steady_clock::now();
#ifdef APSARA_UNIT_TEST_MAIN
curTime += std::chrono::seconds(prometheus::RefeshIntervalSeconds);
#endif
if (curTime - mLastUpdateTime >= std::chrono::seconds(prometheus::RefeshIntervalSeconds)) {
mLastUpdateTime = curTime;
mAgentInfo.mCpuUsage = LogtailMonitor::GetInstance()->GetCpuUsage();
mAgentInfo.mMemUsage = LogtailMonitor::GetInstance()->GetMemoryUsage();
mAgentInfo.mCpuLimit = AppConfig::GetInstance()->GetCpuUsageUpLimit();
mAgentInfo.mMemLimit = AppConfig::GetInstance()->GetMemUsageUpLimit();

int queueNums = 0;
int validToPushNums = 0;

{
ReadLock lock(mSubscriberMapRWLock);
queueNums = mTargetSubscriberSchedulerMap.size();
for (auto& [k, v] : mTargetSubscriberSchedulerMap) {
if (ProcessQueueManager::GetInstance()->IsValidToPush(v->mQueueKey)) {
validToPushNums++;
}
}
}
}

return mAgentInfo;
}
}; // namespace logtail
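GetAgentInfo refreshes its cached snapshot at most once per RefeshIntervalSeconds and serves the cached values in between. A self-contained sketch of that throttled-refresh pattern follows; the refresh body is a stand-in, not the runner's real sampling of LogtailMonitor and AppConfig.

```cpp
#include <chrono>
#include <mutex>

struct Snapshot { float cpuUsage = 0; float memUsage = 0; };

class ThrottledCache {
public:
    Snapshot Get() {
        std::lock_guard<std::mutex> lock(mMutex);
        auto now = std::chrono::steady_clock::now();
        if (now - mLastUpdate >= std::chrono::seconds(5)) {  // refresh at most every 5s
            mLastUpdate = now;
            // Stand-in for the real sampling (CPU/memory reads, queue checks).
            mSnapshot.cpuUsage = 0.35F;
            mSnapshot.memUsage = 512.0F;
        }
        return mSnapshot;  // callers between refreshes get the cached values
    }

private:
    std::mutex mMutex;
    Snapshot mSnapshot;
    std::chrono::steady_clock::time_point mLastUpdate{};
};

int main() { ThrottledCache c; (void)c.Get(); }
```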
12 changes: 12 additions & 0 deletions core/prometheus/PrometheusInputRunner.h
@@ -30,6 +30,13 @@

namespace logtail {

struct PromAgentInfo {
float mCpuUsage;
float mMemUsage;
float mCpuLimit;
float mMemLimit;
};

class PrometheusInputRunner : public InputRunner {
public:
PrometheusInputRunner(const PrometheusInputRunner&) = delete;
@@ -42,6 +49,7 @@ class PrometheusInputRunner : public InputRunner {
return &sInstance;
}
void CheckGC();
PromAgentInfo GetAgentInfo();

// input plugin update
void UpdateScrapeInput(std::shared_ptr<TargetSubscriberScheduler> targetSubscriber,
@@ -74,6 +82,10 @@
int32_t mServicePort;
std::string mPodName;

std::mutex mAgentInfoMutex;
PromAgentInfo mAgentInfo{0.0F, 0.0F, 0.0F, 0.0F};
std::chrono::steady_clock::time_point mLastUpdateTime;

std::shared_ptr<Timer> mTimer;
EventPool mEventPool;

4 changes: 2 additions & 2 deletions core/prometheus/component/StreamScraper.cpp
@@ -102,7 +102,7 @@ void StreamScraper::PushEventGroup(PipelineEventGroup&& eGroup) const {
void StreamScraper::SendMetrics() {
mEventGroup.SetMetadata(EventGroupMetaKey::PROMETHEUS_SCRAPE_TIMESTAMP_MILLISEC,
ToString(mScrapeTimestampMilliSec));
mEventGroup.SetMetadata(EventGroupMetaKey::PROMETHEUS_STREAM_ID, GetId() + ToString(mScrapeTimestampMilliSec));
mEventGroup.SetMetadata(EventGroupMetaKey::PROMETHEUS_STREAM_ID, GetId());

SetTargetLabels(mEventGroup);
PushEventGroup(std::move(mEventGroup));
@@ -127,7 +127,7 @@ void StreamScraper::SetAutoMetricMeta(double scrapeDurationSeconds, bool upStat
mEventGroup.SetMetadata(EventGroupMetaKey::PROMETHEUS_SCRAPE_DURATION, ToString(scrapeDurationSeconds));
mEventGroup.SetMetadata(EventGroupMetaKey::PROMETHEUS_SCRAPE_RESPONSE_SIZE, ToString(mRawSize));
mEventGroup.SetMetadata(EventGroupMetaKey::PROMETHEUS_UP_STATE, ToString(upState));
mEventGroup.SetMetadata(EventGroupMetaKey::PROMETHEUS_STREAM_ID, GetId() + ToString(mScrapeTimestampMilliSec));
mEventGroup.SetMetadata(EventGroupMetaKey::PROMETHEUS_STREAM_ID, GetId());
mEventGroup.SetMetadata(EventGroupMetaKey::PROMETHEUS_STREAM_TOTAL, ToString(mStreamIndex));
}
std::string StreamScraper::GetId() {
Expand Down
24 changes: 20 additions & 4 deletions core/prometheus/labels/Labels.cpp
@@ -111,10 +111,26 @@ void Labels::Range(const std::function<void(const string& k, const string& v)>&
uint64_t Labels::Hash() {
string hash;
uint64_t sum = prometheus::OFFSET64;
Range([&hash](const string& k, const string& v) { hash += k + "\xff" + v + "\xff"; });
for (auto i : hash) {
sum ^= (uint64_t)i;
sum *= prometheus::PRIME64;
vector<string> names;
Range([&names](const string& k, const string&) { names.push_back(k); });
sort(names.begin(), names.end());
auto calc = [](uint64_t h, uint64_t c) {
h ^= (uint64_t)c;
h *= prometheus::PRIME64;
return h;
};
auto calcString = [](uint64_t h, const string& s) {
for (auto c : s) {
h ^= (uint64_t)c;
h *= prometheus::PRIME64;
}
return h;
};
for (const auto& name : names) {
sum = calcString(sum, name);
sum = calc(sum, 255);
sum = calcString(sum, Get(name));
sum = calc(sum, 255);
}
return sum;
}
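The rewritten Hash sorts the label names and folds each name and value, separated by a 0xff byte, through an FNV-1a style loop. An equivalent standalone sketch is below; the FNV-1a constants are spelled out on the assumption that prometheus::OFFSET64/PRIME64 hold the standard values.

```cpp
#include <cstdint>
#include <iostream>
#include <map>
#include <string>

// Standard FNV-1a 64-bit constants; assumed to match OFFSET64/PRIME64 in the diff.
constexpr uint64_t kOffset64 = 14695981039346656037ULL;
constexpr uint64_t kPrime64 = 1099511628211ULL;

uint64_t HashLabels(const std::map<std::string, std::string>& labels) {
    uint64_t sum = kOffset64;
    auto mix = [&](uint64_t c) { sum ^= c; sum *= kPrime64; };
    // std::map iterates in sorted key order, mirroring the explicit sort in the diff.
    for (const auto& [name, value] : labels) {
        for (unsigned char c : name) mix(c);
        mix(0xff);  // separator byte between name and value
        for (unsigned char c : value) mix(c);
        mix(0xff);  // separator byte after the value
    }
    return sum;
}

int main() {
    std::cout << HashLabels({{"job", "node"}, {"instance", "10.0.0.1:9100"}}) << "\n";
}
```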
4 changes: 4 additions & 0 deletions core/prometheus/schedulers/BaseScheduler.cpp
@@ -8,6 +8,10 @@ using namespace std;
namespace logtail {
void BaseScheduler::ExecDone() {
mExecCount++;
while (mLatestExecTime > mFirstExecTime + chrono::seconds(mExecCount * mInterval)) {
mExecCount++;
}

mLatestExecTime = mFirstExecTime + chrono::seconds(mExecCount * mInterval);
mLatestScrapeTime = mFirstScrapeTime + chrono::seconds(mExecCount * mInterval);
}
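The added loop fast-forwards mExecCount past any intervals that were already missed, so the next scheduled time never lands in the past. A minimal sketch of the catch-up arithmetic (illustrative variables, not the scheduler itself):

```cpp
#include <chrono>
#include <cstdint>
#include <iostream>

int main() {
    using namespace std::chrono;
    auto firstExec = steady_clock::now();
    auto latestExec = firstExec + seconds(95);  // pretend the last run finished 95s in
    int64_t intervalSec = 30;
    int64_t execCount = 1;

    execCount++;  // the normal increment on completion
    // Catch up: skip scrape windows that have already passed.
    while (latestExec > firstExec + seconds(execCount * intervalSec)) {
        execCount++;
    }
    // Next execution is scheduled at firstExec + execCount * interval (here: +120s).
    std::cout << "execCount=" << execCount << "\n";  // prints execCount=4
}
```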
35 changes: 16 additions & 19 deletions core/prometheus/schedulers/ScrapeScheduler.cpp
@@ -40,22 +40,18 @@
namespace logtail {

ScrapeScheduler::ScrapeScheduler(std::shared_ptr<ScrapeConfig> scrapeConfigPtr,
std::string host,
string host,
int32_t port,
Labels labels,
QueueKey queueKey,
size_t inputIndex)
size_t inputIndex,
const PromTargetInfo& targetInfo)
: mScrapeConfigPtr(std::move(scrapeConfigPtr)),
mHost(std::move(host)),
mPort(port),
mTargetInfo(targetInfo),
mQueueKey(queueKey),
mInputIndex(inputIndex),
mTargetLabels(labels) {
string tmpTargetURL = mScrapeConfigPtr->mScheme + "://" + mHost + ":" + ToString(mPort)
+ mScrapeConfigPtr->mMetricsPath
+ (mScrapeConfigPtr->mQueryString.empty() ? "" : "?" + mScrapeConfigPtr->mQueryString);
mHash = mScrapeConfigPtr->mJobName + tmpTargetURL + ToString(labels.Hash());
mInstance = mHost + ":" + ToString(mPort);
mScrapeResponseSizeBytes(-1) {
mInterval = mScrapeConfigPtr->mScrapeIntervalSeconds;
}

@@ -83,28 +79,27 @@ void ScrapeScheduler::OnMetricResult(HttpResponse& response, uint64_t) {
scrapeState = prom::NetworkCodeToState(NetworkCode::Ok);
}

mScrapeDurationSeconds = scrapeDurationMilliSeconds * sRate;
mUpState = response.GetStatusCode() == 200;
if (response.GetStatusCode() != 200) {
LOG_WARNING(sLogger,
("scrape failed, status code",
response.GetStatusCode())("target", mHash)("curl msg", response.GetNetworkStatus().mMessage));
("scrape failed, status code", response.GetStatusCode())("target", mTargetInfo.mHash)(
"curl msg", response.GetNetworkStatus().mMessage));
}

auto mScrapeDurationSeconds = scrapeDurationMilliSeconds * sRate;
auto mUpState = response.GetStatusCode() == 200;
auto scrapeDurationSeconds = scrapeDurationMilliSeconds * sRate;
auto upState = response.GetStatusCode() == 200;
streamScraper->mStreamIndex++;
streamScraper->FlushCache();
streamScraper->SetAutoMetricMeta(mScrapeDurationSeconds, mUpState, scrapeState);
streamScraper->SetAutoMetricMeta(scrapeDurationSeconds, upState, scrapeState);
streamScraper->SendMetrics();
mScrapeResponseSizeBytes = streamScraper->mRawSize;
streamScraper->Reset();

mPluginTotalDelayMs->Add(scrapeDurationMilliSeconds);
}


string ScrapeScheduler::GetId() const {
return mHash;
return mTargetInfo.mHash;
}

void ScrapeScheduler::SetComponent(shared_ptr<Timer> timer, EventPool* eventPool) {
@@ -126,6 +121,7 @@ void ScrapeScheduler::ScheduleNext() {
return true;
}
this->DelayExecTime(1);
this->mExecDelayCount++;
this->mPromDelayTotal->Add(1);
this->ScheduleNext();
return false;
@@ -176,7 +172,8 @@ std::unique_ptr<TimerEvent> ScrapeScheduler::BuildScrapeTimerEvent(std::chrono::
mScrapeConfigPtr->mRequestHeaders,
"",
HttpResponse(
new prom::StreamScraper(mTargetLabels, mQueueKey, mInputIndex, mHash, mEventPool, mLatestScrapeTime),
new prom::StreamScraper(
mTargetInfo.mLabels, mQueueKey, mInputIndex, mTargetInfo.mHash, mEventPool, mLatestScrapeTime),
[](void* p) { delete static_cast<prom::StreamScraper*>(p); },
prom::StreamScraper::MetricWriteCallback),
mScrapeConfigPtr->mScrapeTimeoutSeconds,
@@ -206,7 +203,7 @@ void ScrapeScheduler::Cancel() {
void ScrapeScheduler::InitSelfMonitor(const MetricLabels& defaultLabels) {
mSelfMonitor = std::make_shared<PromSelfMonitorUnsafe>();
MetricLabels labels = defaultLabels;
labels.emplace_back(METRIC_LABEL_KEY_INSTANCE, mInstance);
labels.emplace_back(METRIC_LABEL_KEY_INSTANCE, mTargetInfo.mInstance);

static const std::unordered_map<std::string, MetricType> sScrapeMetricKeys
= {{METRIC_PLUGIN_OUT_EVENTS_TOTAL, MetricType::METRIC_TYPE_COUNTER},
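The ScrapeScheduler constructor now receives a PromTargetInfo instead of computing the target hash and instance string itself. That struct's definition is not part of this diff, so the sketch below is only an assumption of its likely fields, inferred from the members accessed above (mLabels, mHash, mInstance).

```cpp
// Hypothetical shape of PromTargetInfo; inferred, not the actual definition.
#include <map>
#include <string>

struct PromTargetInfo {
    std::map<std::string, std::string> mLabels;  // target labels after relabeling
    std::string mHash;                           // stable target identity (lc_target_hash)
    std::string mInstance;                       // host:port of the scrape target
};
```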