Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add "retention" feature allowing idle metrics to expire #204

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 35 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,8 @@ For details of each metric type, see [Prometheus documentation](http://prometheu
- `type`: metric type (required)
- `desc`: description of this metric (required)
- `key`: key name of record for instrumentation (**optional**)
- `retention`: time in seconds to remove a metric after not being updated (optional). See [Retention](#retention)
- `retention_check_interval`: time in seconds to check for expired metrics (optional). Has no effect when `retention` not set. See [Retention](#retention)
- `<labels>`: additional labels for this metric (optional). See [Labels](#labels)

If key is empty, the metric values is treated as 1, so the counter increments by 1 on each record regardless of contents of the record.
Expand All @@ -310,6 +312,8 @@ If key is empty, the metric values is treated as 1, so the counter increments by
- `type`: metric type (required)
- `desc`: description of metric (required)
- `key`: key name of record for instrumentation (required)
- `retention`: time in seconds to remove a metric after not being updated (optional). See [Retention](#retention)
- `retention_check_interval`: time in seconds to check for expired metrics (optional). Has no effect when `retention` not set. See [Retention](#retention)
- `<labels>`: additional labels for this metric (optional). See [Labels](#labels)

### summary type
Expand All @@ -332,6 +336,8 @@ If key is empty, the metric values is treated as 1, so the counter increments by
- `type`: metric type (required)
- `desc`: description of metric (required)
- `key`: key name of record for instrumentation (required)
- `retention`: time in seconds to remove a metric after not being updated (optional). See [Retention](#retention)
- `retention_check_interval`: time in seconds to check for expired metrics (optional). Has no effect when `retention` not set. See [Retention](#retention)
- `<labels>`: additional labels for this metric (optional). See [Labels](#labels)

### histogram type
Expand All @@ -356,6 +362,8 @@ If key is empty, the metric values is treated as 1, so the counter increments by
- `desc`: description of metric (required)
- `key`: key name of record for instrumentation (required)
- `buckets`: buckets of record for instrumentation (optional)
- `retention`: time in seconds to remove a metric after not being updated (optional). See [Retention](#retention)
- `retention_check_interval`: time in seconds to check for expired metrics (optional). Has no effect when `retention` not set. See [Retention](#retention)
- `<labels>`: additional labels for this metric (optional). See [Labels](#labels)

## Labels
Expand Down Expand Up @@ -430,6 +438,33 @@ Prometheus output/filter plugin can have multiple metric section. Top-level labe

In this case, `message_foo_counter` has `tag`, `hostname`, `key` and `data_type` labels.

## Retention

By default metrics with all encountered label combinations are preserved until the next restart of fluentd.
Even if a label combination did not receive any update for a long time.
That behavior is not always desirable e.g. when the contents of of fields change for good and the metric becomes idle.
For these metrics you can set `retention` and `retention_check_interval` like this:

```
<metric>
name message_foo_counter
type counter
desc The total number of foo in message.
key foo
retention 3600 # 1h
retention_check_interval 1800 # 30m
<labels>
bar ${bar}
</labels>
</metric>
```

If `${bar}` was `baz` one time but after that no records with that value were processed, then after one hour the metric
`foo{bar="baz"}` might be removed.
When this actually happens depends on `retention_check_interval` (default 60).
It causes a background thread to check every 30 minutes for expired metrics.
So worst case the metrics are removed 30 minutes after expiration.
You can set this value as low as `1`, but that may put more stress on your CPU.

## Try plugin with nginx

Expand Down
13 changes: 13 additions & 0 deletions lib/fluent/plugin/filter_prometheus.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ class PrometheusFilter < Fluent::Plugin::Filter
include Fluent::Plugin::PrometheusLabelParser
include Fluent::Plugin::Prometheus

helpers :thread

def initialize
super
@registry = ::Prometheus::Client.registry
Expand All @@ -22,6 +24,17 @@ def configure(conf)
@metrics = Fluent::Plugin::Prometheus.parse_metrics_elements(conf, @registry, labels)
end

def start
super
Fluent::Plugin::Prometheus.start_retention_threads(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it really need to use thread?
I'm not sure but timer_execute (run it in plugin thread) might be enough to do it.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@phihos any update?

@metrics,
@registry,
method(:thread_create),
method(:thread_current_running?),
@log
)
end

def filter(tag, time, record)
instrument_single(tag, time, record, @metrics)
record
Expand Down
13 changes: 13 additions & 0 deletions lib/fluent/plugin/out_prometheus.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ class PrometheusOutput < Fluent::Plugin::Output
include Fluent::Plugin::PrometheusLabelParser
include Fluent::Plugin::Prometheus

helpers :thread

def initialize
super
@registry = ::Prometheus::Client.registry
Expand All @@ -22,6 +24,17 @@ def configure(conf)
@metrics = Fluent::Plugin::Prometheus.parse_metrics_elements(conf, @registry, labels)
end

def start
super
Fluent::Plugin::Prometheus.start_retention_threads(
@metrics,
@registry,
method(:thread_create),
method(:thread_current_running?),
@log
)
end

def process(tag, es)
instrument(tag, es, @metrics)
end
Expand Down
149 changes: 123 additions & 26 deletions lib/fluent/plugin/prometheus.rb
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
require 'prometheus/client'
require 'prometheus/client/formats/text'
require 'fluent/plugin/prometheus/placeholder_expander'
require 'fluent/plugin/prometheus/data_store'

module Fluent
module Plugin
Expand Down Expand Up @@ -81,6 +82,17 @@ def self.parse_metrics_elements(conf, registry, labels = {})
metrics
end

def self.start_retention_threads(metrics, registry, thread_create, thread_running, log)
metrics.select { |metric| metric.has_retention? }.each do |metric|
thread_create.call("prometheus_retention_#{metric.name}".to_sym) do
while thread_running.call()
metric.remove_expired_metrics(registry, log)
sleep(metric.retention_check_interval)
end
end
end
end

def self.placeholder_expander(log)
Fluent::Plugin::Prometheus::ExpandBuilder.new(log: log)
end
Expand All @@ -97,6 +109,11 @@ def stringify_keys(hash_to_stringify)
end.to_h
end

def initialize
super
::Prometheus::Client.config.data_store = Fluent::Plugin::Prometheus::DataStore.new
end

def configure(conf)
super
@placeholder_values = {}
Expand Down Expand Up @@ -151,6 +168,8 @@ class Metric
attr_reader :name
attr_reader :key
attr_reader :desc
attr_reader :retention
attr_reader :retention_check_interval

def initialize(element, registry, labels)
['name', 'desc'].each do |key|
Expand All @@ -162,6 +181,11 @@ def initialize(element, registry, labels)
@name = element['name']
@key = element['key']
@desc = element['desc']
@retention = element['retention'].to_i
@retention_check_interval = element.fetch('retention_check_interval', 60).to_i
if has_retention?
@last_modified_store = LastModifiedStore.new
end

@base_labels = Fluent::Plugin::Prometheus.parse_labels_elements(element)
@base_labels = labels.merge(@base_labels)
Expand Down Expand Up @@ -192,6 +216,74 @@ def self.get(registry, name, type, docstring)

metric
end

def set_value?(value)
if value
return true
end
false
end

def instrument(record, expander)
value = self.value(record)
if self.set_value?(value)
labels = labels(record, expander)
set_value(value, labels)
if has_retention?
@last_modified_store.set_last_updated(labels)
end
end
end

def has_retention?
@retention > 0
end

def remove_expired_metrics(registry, log)
if has_retention?
metric = registry.get(@name)

expiration_time = Time.now - @retention
expired_label_sets = @last_modified_store.get_labels_not_modified_since(expiration_time)

expired_label_sets.each { |expired_label_set|
log.debug "Metric #{@name} with labels #{expired_label_set} expired. Removing..."
metric.remove(expired_label_set) # this method is supplied by the require at the top of this method
@last_modified_store.remove(expired_label_set)
}
else
log.warn('remove_expired_metrics should not be called when retention is not set for this metric!')
end
end

class LastModifiedStore
def initialize
@internal_store = Hash.new
@lock = Monitor.new
end

def synchronize
@lock.synchronize { yield }
end

def set_last_updated(labels)
synchronize do
@internal_store[labels] = Time.now
end
end

def remove(labels)
synchronize do
@internal_store.delete(labels)
end
end

def get_labels_not_modified_since(time)
synchronize do
@internal_store.select { |k, v| v < time }.keys
end
end
end
end

class Gauge < Metric
Expand All @@ -208,16 +300,17 @@ def initialize(element, registry, labels)
end
end

def instrument(record, expander)
def value(record)
if @key.is_a?(String)
value = record[@key]
record[@key]
else
value = @key.call(record)
end
if value
@gauge.set(value, labels: labels(record, expander))
@key.call(record)
end
end

def set_value(value, labels)
@gauge.set(value, labels: labels)
end
end

class Counter < Metric
Expand All @@ -230,20 +323,22 @@ def initialize(element, registry, labels)
end
end

def instrument(record, expander)
# use record value of the key if key is specified, otherwise just increment
def value(record)
if @key.nil?
value = 1
1
elsif @key.is_a?(String)
value = record[@key]
record[@key]
else
value = @key.call(record)
@key.call(record)
end
end

# ignore if record value is nil
return if value.nil?
def set_value?(value)
!value.nil?
end

@counter.increment(by: value, labels: labels(record, expander))
def set_value(value, labels)
@counter.increment(by: value, labels: labels)
end
end

Expand All @@ -261,16 +356,17 @@ def initialize(element, registry, labels)
end
end

def instrument(record, expander)
def value(record)
if @key.is_a?(String)
value = record[@key]
record[@key]
else
value = @key.call(record)
end
if value
@summary.observe(value, labels: labels(record, expander))
@key.call(record)
end
end

def set_value(value, labels)
@summary.observe(value, labels: labels)
end
end

class Histogram < Metric
Expand All @@ -294,16 +390,17 @@ def initialize(element, registry, labels)
end
end

def instrument(record, expander)
def value(record)
if @key.is_a?(String)
value = record[@key]
record[@key]
else
value = @key.call(record)
end
if value
@histogram.observe(value, labels: labels(record, expander))
@key.call(record)
end
end

def set_value(value, labels)
@histogram.observe(value, labels: labels)
end
end
end
end
Expand Down
Loading