Skip to content

Commit

Permalink
1. support honor_timestamps, 2. support honor_labels, apply global mT…
Browse files Browse the repository at this point in the history
…ags to MetricEvent in relabel phase 3. optimization relabel, manage MetricEvent tags directly (#1742)
  • Loading branch information
catdogpandas authored Oct 8, 2024
1 parent eb19fdf commit bf7de40
Show file tree
Hide file tree
Showing 30 changed files with 946 additions and 802 deletions.
1 change: 0 additions & 1 deletion core/models/PipelineEventGroup.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@ enum class EventGroupMetaKey {
PROMETHEUS_SCRAPE_RESPONSE_SIZE,
PROMETHEUS_SAMPLES_SCRAPED,
PROMETHEUS_SCRAPE_TIMESTAMP_MILLISEC,
PROMETHEUS_INSTANCE,
PROMETHEUS_UP_STATE,

SOURCE_ID
Expand Down
19 changes: 14 additions & 5 deletions core/plugin/processor/inner/ProcessorPromParseMetricNative.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,11 @@ namespace logtail {
const string ProcessorPromParseMetricNative::sName = "processor_prom_parse_metric_native";

// only for inner processor
bool ProcessorPromParseMetricNative::Init(const Json::Value&) {
bool ProcessorPromParseMetricNative::Init(const Json::Value& config) {
mScrapeConfigPtr = std::make_unique<ScrapeConfig>();
if (!mScrapeConfigPtr->InitStaticConfig(config)) {
return false;
}
return true;
}

Expand All @@ -27,9 +31,11 @@ void ProcessorPromParseMetricNative::Process(PipelineEventGroup& eGroup) {
auto timestampMilliSec = StringTo<uint64_t>(scrapeTimestampMilliSecStr.to_string());
auto timestamp = timestampMilliSec / 1000;
auto nanoSec = timestampMilliSec % 1000 * 1000000;
TextParser parser(mScrapeConfigPtr->mHonorTimestamps);
parser.SetDefaultTimestamp(timestamp, nanoSec);

for (auto& e : events) {
ProcessEvent(e, newEvents, eGroup, timestamp, nanoSec);
ProcessEvent(e, newEvents, eGroup, parser);
}
events.swap(newEvents);
eGroup.SetMetadata(EventGroupMetaKey::PROMETHEUS_SAMPLES_SCRAPED, ToString(events.size()));
Expand All @@ -39,14 +45,17 @@ bool ProcessorPromParseMetricNative::IsSupportedEvent(const PipelineEventPtr& e)
return e.Is<LogEvent>();
}

bool ProcessorPromParseMetricNative::ProcessEvent(
PipelineEventPtr& e, EventsContainer& newEvents, PipelineEventGroup& eGroup, uint64_t timestamp, uint32_t nanoSec) {
bool ProcessorPromParseMetricNative::ProcessEvent(PipelineEventPtr& e,
EventsContainer& newEvents,
PipelineEventGroup& eGroup,
TextParser& parser) {
if (!IsSupportedEvent(e)) {
return false;
}
auto& sourceEvent = e.Cast<LogEvent>();
std::unique_ptr<MetricEvent> metricEvent = eGroup.CreateMetricEvent();
if (mParser.ParseLine(sourceEvent.GetContent(prometheus::PROMETHEUS), timestamp, nanoSec, *metricEvent)) {
if (parser.ParseLine(sourceEvent.GetContent(prometheus::PROMETHEUS), *metricEvent)) {
metricEvent->SetTag(string(prometheus::NAME), metricEvent->GetName());
newEvents.emplace_back(std::move(metricEvent));
}
return true;
Expand Down
5 changes: 3 additions & 2 deletions core/plugin/processor/inner/ProcessorPromParseMetricNative.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include "models/PipelineEventPtr.h"
#include "pipeline/plugin/interface/Processor.h"
#include "prometheus/labels/TextParser.h"
#include "prometheus/schedulers/ScrapeConfig.h"

namespace logtail {
class ProcessorPromParseMetricNative : public Processor {
Expand All @@ -20,8 +21,8 @@ class ProcessorPromParseMetricNative : public Processor {
bool IsSupportedEvent(const PipelineEventPtr&) const override;

private:
bool ProcessEvent(PipelineEventPtr&, EventsContainer&, PipelineEventGroup&, uint64_t timestamp, uint32_t nanoSec);
TextParser mParser;
bool ProcessEvent(PipelineEventPtr&, EventsContainer&, PipelineEventGroup&, TextParser& parser);
std::unique_ptr<ScrapeConfig> mScrapeConfigPtr;

#ifdef APSARA_UNIT_TEST_MAIN
friend class InputPrometheusUnittest;
Expand Down
178 changes: 96 additions & 82 deletions core/plugin/processor/inner/ProcessorPromRelabelMetricNative.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,116 +19,117 @@

#include <cstddef>

#include "common/Flags.h"
#include "common/StringTools.h"
#include "models/MetricEvent.h"
#include "models/PipelineEventGroup.h"
#include "models/PipelineEventPtr.h"
#include "prometheus/Constants.h"
#include "prometheus/Utils.h"

using namespace std;
DECLARE_FLAG_STRING(_pod_name_);
namespace logtail {

const string ProcessorPromRelabelMetricNative::sName = "processor_prom_relabel_metric_native";

// only for inner processor
bool ProcessorPromRelabelMetricNative::Init(const Json::Value& config) {
std::string errorMsg;
if (config.isMember(prometheus::METRIC_RELABEL_CONFIGS) && config[prometheus::METRIC_RELABEL_CONFIGS].isArray()
&& config[prometheus::METRIC_RELABEL_CONFIGS].size() > 0) {
for (const auto& item : config[prometheus::METRIC_RELABEL_CONFIGS]) {
mRelabelConfigs.emplace_back(item);
if (!mRelabelConfigs.back().Validate()) {
errorMsg = "metric_relabel_configs is invalid";
LOG_ERROR(sLogger, ("init prometheus processor failed", errorMsg));
return false;
}
}
}


if (config.isMember(prometheus::JOB_NAME) && config[prometheus::JOB_NAME].isString()) {
mJobName = config[prometheus::JOB_NAME].asString();
} else {
mScrapeConfigPtr = std::make_unique<ScrapeConfig>();
if (!mScrapeConfigPtr->InitStaticConfig(config)) {
return false;
}
if (config.isMember(prometheus::SCRAPE_TIMEOUT) && config[prometheus::SCRAPE_TIMEOUT].isString()) {
string tmpScrapeTimeoutString = config[prometheus::SCRAPE_TIMEOUT].asString();
mScrapeTimeoutSeconds = DurationToSecond(tmpScrapeTimeoutString);
} else {
mScrapeTimeoutSeconds = 10;
}
if (config.isMember(prometheus::SAMPLE_LIMIT) && config[prometheus::SAMPLE_LIMIT].isInt64()) {
mSampleLimit = config[prometheus::SAMPLE_LIMIT].asInt64();
} else {
mSampleLimit = -1;
}
if (config.isMember(prometheus::SERIES_LIMIT) && config[prometheus::SERIES_LIMIT].isInt64()) {
mSeriesLimit = config[prometheus::SERIES_LIMIT].asInt64();
} else {
mSeriesLimit = -1;
}

mLoongCollectorScraper = STRING_FLAG(_pod_name_);

return true;
}

void ProcessorPromRelabelMetricNative::Process(PipelineEventGroup& metricGroup) {
auto instance = metricGroup.GetMetadata(EventGroupMetaKey::PROMETHEUS_INSTANCE);

EventsContainer& events = metricGroup.MutableEvents();

size_t wIdx = 0;
for (size_t rIdx = 0; rIdx < events.size(); ++rIdx) {
if (ProcessEvent(events[rIdx], instance)) {
if (wIdx != rIdx) {
events[wIdx] = std::move(events[rIdx]);
// if mMetricRelabelConfigs is empty and honor_labels is true, skip it
auto targetTags = metricGroup.GetTags();
if (!mScrapeConfigPtr->mMetricRelabelConfigs.Empty() || !targetTags.empty()) {
EventsContainer& events = metricGroup.MutableEvents();
size_t wIdx = 0;
for (size_t rIdx = 0; rIdx < events.size(); ++rIdx) {
if (ProcessEvent(events[rIdx], targetTags)) {
if (wIdx != rIdx) {
events[wIdx] = std::move(events[rIdx]);
}
++wIdx;
}
++wIdx;
}
events.resize(wIdx);
}

// delete mTags when key starts with __
for (const auto& [k, v] : targetTags) {
if (k.starts_with("__")) {
metricGroup.DelTag(k);
}
}
events.resize(wIdx);

AddAutoMetrics(metricGroup);

// delete all tags
for (const auto& [k, v] : targetTags) {
metricGroup.DelTag(k);
}
}

bool ProcessorPromRelabelMetricNative::IsSupportedEvent(const PipelineEventPtr& e) const {
return e.Is<MetricEvent>();
}

bool ProcessorPromRelabelMetricNative::ProcessEvent(PipelineEventPtr& e, StringView instance) {
bool ProcessorPromRelabelMetricNative::ProcessEvent(PipelineEventPtr& e, const GroupTags& targetTags) {
if (!IsSupportedEvent(e)) {
return false;
}
auto& sourceEvent = e.Cast<MetricEvent>();

Labels labels;

labels.Reset(&sourceEvent);
Labels result;

// if keep this sourceEvent
if (prometheus::Process(labels, mRelabelConfigs, result)) {
// if k/v in labels by not result, then delete it
labels.Range([&result, &sourceEvent](const Label& label) {
if (result.Get(label.name).empty()) {
sourceEvent.DelTag(StringView(label.name));
if (!mScrapeConfigPtr->mHonorLabels) {
// metric event labels is secondary
// if confiliction, then rename it exported_<label_name>
for (const auto& [k, v] : targetTags) {
if (sourceEvent.HasTag(k)) {
auto key = prometheus::EXPORTED_PREFIX + k.to_string();
sourceEvent.SetTag(key, sourceEvent.GetTag(k).to_string());
sourceEvent.DelTag(k);
} else {
sourceEvent.SetTag(k, v);
}
}
} else {
// if mHonorLabels is true, then keep sourceEvent labels
for (const auto& [k, v] : targetTags) {
if (!sourceEvent.HasTag(k)) {
sourceEvent.SetTag(k, v);
}
});
}
}

// for each k/v in result, set it to sourceEvent
result.Range([&sourceEvent](const Label& label) { sourceEvent.SetTag(label.name, label.value); });
if (!mScrapeConfigPtr->mMetricRelabelConfigs.Empty()
&& !mScrapeConfigPtr->mMetricRelabelConfigs.Process(sourceEvent)) {
return false;
}
// set metricEvent name
sourceEvent.SetNameNoCopy(sourceEvent.GetTag(prometheus::NAME));

// set metricEvent name
if (!result.Get(prometheus::NAME).empty()) {
sourceEvent.SetName(result.Get(prometheus::NAME));

// delete tag __<label_name>
vector<StringView> toDelete;
for (auto it = sourceEvent.TagsBegin(); it != sourceEvent.TagsEnd(); ++it) {
if (it->first.starts_with("__")) {
toDelete.push_back(it->first);
}
}
for (const auto& k : toDelete) {
sourceEvent.DelTag(k);
}

sourceEvent.SetTag(prometheus::JOB, mJobName);
sourceEvent.SetTag(prometheus::INSTANCE, instance);
// set metricEvent name
sourceEvent.SetTag(prometheus::NAME, sourceEvent.GetName());

return true;
}
return false;
return true;
}

void ProcessorPromRelabelMetricNative::AddAutoMetrics(PipelineEventGroup& metricGroup) {
Expand All @@ -137,61 +138,74 @@ void ProcessorPromRelabelMetricNative::AddAutoMetrics(PipelineEventGroup& metric
return;
}

auto targetTags = metricGroup.GetTags();

StringView scrapeTimestampMilliSecStr
= metricGroup.GetMetadata(EventGroupMetaKey::PROMETHEUS_SCRAPE_TIMESTAMP_MILLISEC);
auto timestampMilliSec = StringTo<uint64_t>(scrapeTimestampMilliSecStr.to_string());
auto timestamp = timestampMilliSec / 1000;
auto nanoSec = timestampMilliSec % 1000 * 1000000;

auto instance = metricGroup.GetMetadata(EventGroupMetaKey::PROMETHEUS_INSTANCE);

uint64_t samplesPostMetricRelabel = metricGroup.GetEvents().size();

auto scrapeDurationSeconds
= StringTo<double>(metricGroup.GetMetadata(EventGroupMetaKey::PROMETHEUS_SCRAPE_DURATION).to_string());

AddMetric(metricGroup, prometheus::SCRAPE_DURATION_SECONDS, scrapeDurationSeconds, timestamp, nanoSec, instance);
AddMetric(metricGroup, prometheus::SCRAPE_DURATION_SECONDS, scrapeDurationSeconds, timestamp, nanoSec, targetTags);

auto scrapeResponseSize
= StringTo<uint64_t>(metricGroup.GetMetadata(EventGroupMetaKey::PROMETHEUS_SCRAPE_RESPONSE_SIZE).to_string());
AddMetric(metricGroup, prometheus::SCRAPE_RESPONSE_SIZE_BYTES, scrapeResponseSize, timestamp, nanoSec, instance);

if (mSampleLimit > 0) {
AddMetric(metricGroup, prometheus::SCRAPE_SAMPLES_LIMIT, mSampleLimit, timestamp, nanoSec, instance);
AddMetric(metricGroup, prometheus::SCRAPE_RESPONSE_SIZE_BYTES, scrapeResponseSize, timestamp, nanoSec, targetTags);

if (mScrapeConfigPtr->mSampleLimit > 0) {
AddMetric(metricGroup,
prometheus::SCRAPE_SAMPLES_LIMIT,
mScrapeConfigPtr->mSampleLimit,
timestamp,
nanoSec,
targetTags);
}

AddMetric(metricGroup,
prometheus::SCRAPE_SAMPLES_POST_METRIC_RELABELING,
samplesPostMetricRelabel,
timestamp,
nanoSec,
instance);
targetTags);

auto samplesScraped
= StringTo<uint64_t>(metricGroup.GetMetadata(EventGroupMetaKey::PROMETHEUS_SAMPLES_SCRAPED).to_string());

AddMetric(metricGroup, prometheus::SCRAPE_SAMPLES_SCRAPED, samplesScraped, timestamp, nanoSec, instance);
AddMetric(metricGroup, prometheus::SCRAPE_SAMPLES_SCRAPED, samplesScraped, timestamp, nanoSec, targetTags);

AddMetric(metricGroup, prometheus::SCRAPE_TIMEOUT_SECONDS, mScrapeTimeoutSeconds, timestamp, nanoSec, instance);
AddMetric(metricGroup,
prometheus::SCRAPE_TIMEOUT_SECONDS,
mScrapeConfigPtr->mScrapeTimeoutSeconds,
timestamp,
nanoSec,
targetTags);

// up metric must be the last one
bool upState = StringTo<bool>(metricGroup.GetMetadata(EventGroupMetaKey::PROMETHEUS_UP_STATE).to_string());

AddMetric(metricGroup, prometheus::UP, 1.0 * upState, timestamp, nanoSec, instance);
AddMetric(metricGroup, prometheus::UP, 1.0 * upState, timestamp, nanoSec, targetTags);
}

void ProcessorPromRelabelMetricNative::AddMetric(PipelineEventGroup& metricGroup,
const string& name,
double value,
time_t timestamp,
uint32_t nanoSec,
StringView instance) {
const GroupTags& targetTags) {
auto* metricEvent = metricGroup.AddMetricEvent();
metricEvent->SetName(name);
metricEvent->SetValue<UntypedSingleValue>(value);
metricEvent->SetTimestamp(timestamp, nanoSec);
metricEvent->SetTag(prometheus::JOB, mJobName);
metricEvent->SetTag(prometheus::INSTANCE, instance);
metricEvent->SetTag(prometheus::NAME, name);
metricEvent->SetTag(prometheus::LC_SCRAPER, mLoongCollectorScraper);
for (const auto& [k, v] : targetTags) {
metricEvent->SetTag(k, v);
}
}

} // namespace logtail
15 changes: 5 additions & 10 deletions core/plugin/processor/inner/ProcessorPromRelabelMetricNative.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
#include "models/PipelineEventGroup.h"
#include "models/PipelineEventPtr.h"
#include "pipeline/plugin/interface/Processor.h"
#include "prometheus/labels/Relabel.h"
#include "prometheus/schedulers/ScrapeConfig.h"

namespace logtail {
class ProcessorPromRelabelMetricNative : public Processor {
Expand All @@ -36,23 +36,18 @@ class ProcessorPromRelabelMetricNative : public Processor {
bool IsSupportedEvent(const PipelineEventPtr& e) const override;

private:
bool ProcessEvent(PipelineEventPtr& e, StringView instance);
bool ProcessEvent(PipelineEventPtr& e, const GroupTags& targetTags);

void AddAutoMetrics(PipelineEventGroup& metricGroup);
void AddMetric(PipelineEventGroup& metricGroup,
const std::string& name,
double value,
time_t timestamp,
uint32_t nanoSec,
StringView instance);
const GroupTags& targetTags);

std::vector<RelabelConfig> mRelabelConfigs;

// from config
std::string mJobName;
int64_t mScrapeTimeoutSeconds;
int64_t mSampleLimit;
int64_t mSeriesLimit;
std::unique_ptr<ScrapeConfig> mScrapeConfigPtr;
std::string mLoongCollectorScraper;

#ifdef APSARA_UNIT_TEST_MAIN
friend class ProcessorPromRelabelMetricNativeUnittest;
Expand Down
Loading

0 comments on commit bf7de40

Please sign in to comment.