Skip to content

Commit

Permalink
Add a prometheus label mapping component
Browse files Browse the repository at this point in the history
  • Loading branch information
vaxvms committed Nov 4, 2024
1 parent 0b41558 commit 9f21a6a
Show file tree
Hide file tree
Showing 5 changed files with 451 additions and 0 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ Main (unreleased)
- (_Experimental_) Add a `prometheus.write.queue` component to add an alternative to `prometheus.remote_write`
which allowing the writing of metrics to a prometheus endpoint. (@mattdurham)

- Add a `prometheus.mapping` component to add a label based on a source_label and a mapping table. (@vaxvms)

### Enhancements

- The `mimir.rules.kubernetes` component now supports adding extra label matchers
Expand Down
139 changes: 139 additions & 0 deletions docs/sources/reference/components/prometheus/prometheus.mapping.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
---
canonical: https://grafana.com/docs/alloy/latest/reference/components/prometheus/prometheus.mapping/
aliases:
- ../prometheus.mapping/ # /docs/alloy/latest/reference/components/prometheus.mapping/
description: Learn about prometheus.mapping
title: prometheus.mapping
---

# prometheus.mapping

Prometheus metrics follow the [OpenMetrics](https://openmetrics.io/) format.
Each time series is uniquely identified by its metric name, plus optional
key-value pairs called labels. Each sample represents a datapoint in the
time series and contains a value and an optional timestamp.
```
<metric name>{<label_1>=<label_val_1>, <label_2>=<label_val_2> ...} <value> [timestamp]
```

The `prometheus.mapping` component create a new label on each metric passed
along to the exported receiver by applying a mapping table to a label value.

The most common use of `prometheus.mapping` is to create a new label based filter Prometheus metrics or
standardize the label set that is passed to one or more downstream
receivers. The `rule` blocks are applied to the label set of each metric in
order of their appearance in the configuration file. The configured rules can
be retrieved by calling the function in the `rules` export field.

Multiple `prometheus.mapping` components can be specified by giving them
different labels.

## Usage

```alloy
prometheus.mapping "LABEL" {
forward_to = RECEIVER_LIST
source_label = "labelA"
target_label = "labelB"
mapping = {
"from" = "to",
...
}
}
```

## Arguments

The following arguments are supported:

Name | Type | Description | Default | Required
---- | ---- | ----------- | ------- | --------
`forward_to` | `list(MetricsReceiver)` | Where the metrics should be forwarded to, after relabeling takes place. | | yes
`source_label` | `string` | Name of the source label to use for mapping. | | yes
`target_label` | `string` | Name of the target label to use for mapping. | | yes
`mapping` | `map(string,string)` | Mapping from source label value to target labek value. | | yes

## Exported fields

The following fields are exported and can be referenced by other components:

Name | Type | Description
---- | ---- | -----------
`receiver` | `MetricsReceiver` | The input receiver where samples are sent to be relabeled.

## Component health

`prometheus.mapping` is only reported as unhealthy if given an invalid
configuration. In those cases, exported fields are kept at their last healthy
values.

## Debug information

`prometheus.mapping` does not expose any component-specific debug information.

## Debug metrics

* `prometheus_mapping_metrics_processed` (counter): Total number of metrics processed.
* `prometheus_mapping_metrics_written` (counter): Total number of metrics written.

## Example

Let's create an instance of a see `prometheus.mapping` component and see how
it acts on the following metrics.

```alloy
prometheus.mapping "keep_backend_only" {
forward_to = [prometheus.remote_write.onprem.receiver]
source_label = "app"
target_label = "team"
mapping = {
"frontend" = "teamA"
"backend" = "teamB"
"database" = "teamC"
}
}
```

```
metric_a{__address__ = "localhost", instance = "development", app = "frontend"} 10
metric_a{__address__ = "localhost", instance = "development", app = "backend"} 2
metric_a{__address__ = "cluster_a", instance = "production", app = "frontend"} 7
metric_a{__address__ = "cluster_a", instance = "production", app = "backend"} 9
metric_a{__address__ = "cluster_b", instance = "production", app = "database"} 4
```

After applying the mapping a new label `team` is created based on mapping table
and `app` label value.

```
metric_a{team = "teamA", __address__ = "localhost", instance = "development", app = "frontend"} 10
metric_a{team = "teamB", __address__ = "localhost", instance = "development", app = "backend"} 2
metric_a{host = "teamA", __address__ = "cluster_a", instance = "production", app = "frontend"} 7
metric_a{host = "teamA", __address__ = "cluster_a", instance = "production", app = "backend"} 9
metric_a{host = "teamC", __address__ = "cluster_a", instance = "production", app = "database"} 4
```

The resulting metrics are then propagated to each receiver defined in the
`forward_to` argument.
<!-- START GENERATED COMPATIBLE COMPONENTS -->

## Compatible components

`prometheus.mapping` can accept arguments from the following components:

- Components that export [Prometheus `MetricsReceiver`](../../../compatibility/#prometheus-metricsreceiver-exporters)

`prometheus.mapping` has exports that can be consumed by the following components:

- Components that consume [Prometheus `MetricsReceiver`](../../../compatibility/#prometheus-metricsreceiver-consumers)

{{< admonition type="note" >}}
Connecting some components may not be sensible or components may require further configuration to make the connection work correctly.
Refer to the linked documentation for more details.
{{< /admonition >}}

<!-- END GENERATED COMPATIBLE COMPONENTS -->
1 change: 1 addition & 0 deletions internal/component/all/all.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ import (
_ "github.com/grafana/alloy/internal/component/prometheus/exporter/statsd" // Import prometheus.exporter.statsd
_ "github.com/grafana/alloy/internal/component/prometheus/exporter/unix" // Import prometheus.exporter.unix
_ "github.com/grafana/alloy/internal/component/prometheus/exporter/windows" // Import prometheus.exporter.windows
_ "github.com/grafana/alloy/internal/component/prometheus/mapping" // Import prometheus.mapping
_ "github.com/grafana/alloy/internal/component/prometheus/operator/podmonitors" // Import prometheus.operator.podmonitors
_ "github.com/grafana/alloy/internal/component/prometheus/operator/probes" // Import prometheus.operator.probes
_ "github.com/grafana/alloy/internal/component/prometheus/operator/servicemonitors" // Import prometheus.operator.servicemonitors
Expand Down
227 changes: 227 additions & 0 deletions internal/component/prometheus/mapping/mapping.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,227 @@
package mapping

import (
"context"
"fmt"
"sync"

"github.com/grafana/alloy/internal/component"
"github.com/grafana/alloy/internal/component/prometheus"
"github.com/grafana/alloy/internal/featuregate"
"github.com/grafana/alloy/internal/service/labelstore"
"github.com/grafana/alloy/internal/service/livedebugging"
prometheus_client "github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/prometheus/model/exemplar"
"github.com/prometheus/prometheus/model/histogram"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/metadata"
"github.com/prometheus/prometheus/storage"
"go.uber.org/atomic"
)

const name = "prometheus.mapping"

func init() {
component.Register(component.Registration{
Name: name,
Stability: featuregate.StabilityGenerallyAvailable,
Args: Arguments{},
Exports: Exports{},

Build: func(opts component.Options, args component.Arguments) (component.Component, error) {
return New(opts, args.(Arguments))
},
})
}

// Arguments holds values which are used to configure the prometheus.relabel
// component.
type Arguments struct {
// Where the relabelled metrics should be forwarded to.
ForwardTo []storage.Appendable `alloy:"forward_to,attr"`

// Labels to use for mapping
SourceLabel string `alloy:"source_label,attr"`
TargetLabel string `alloy:"target_label,attr"`

// Mapping
LabelValuesMapping map[string]string `alloy:"mapping,attr"`
}

// SetToDefault implements syntax.Defaulter.
func (arg *Arguments) SetToDefault() {
}

// Validate implements syntax.Validator.
func (arg *Arguments) Validate() error {
return nil
}

// Exports holds values which are exported by the prometheus.relabel component.
type Exports struct {
Receiver storage.Appendable `alloy:"receiver,attr"`
}

// Component implements the prometheus.mapping component.
type Component struct {
sourceLabel string
targetLabel string

mappings map[string]string
mut sync.RWMutex
opts component.Options
receiver *prometheus.Interceptor
metricsProcessed prometheus_client.Counter
metricsOutgoing prometheus_client.Counter
fanout *prometheus.Fanout
exited atomic.Bool
ls labelstore.LabelStore

debugDataPublisher livedebugging.DebugDataPublisher
}

var (
_ component.Component = (*Component)(nil)
_ component.LiveDebugging = (*Component)(nil)
)

// New creates a new prometheus.relabel component.
func New(o component.Options, args Arguments) (*Component, error) {
debugDataPublisher, err := o.GetServiceData(livedebugging.ServiceName)
if err != nil {
return nil, err
}

data, err := o.GetServiceData(labelstore.ServiceName)
if err != nil {
return nil, err
}
c := &Component{
opts: o,
ls: data.(labelstore.LabelStore),
sourceLabel: args.SourceLabel,
targetLabel: args.TargetLabel,
mappings: args.LabelValuesMapping,
debugDataPublisher: debugDataPublisher.(livedebugging.DebugDataPublisher),
}
c.metricsProcessed = prometheus_client.NewCounter(prometheus_client.CounterOpts{
Name: "alloy_prometheus_mapping_metrics_processed",
Help: "Total number of metrics processed",
})
c.metricsOutgoing = prometheus_client.NewCounter(prometheus_client.CounterOpts{
Name: "alloy_prometheus_mapping_metrics_written",
Help: "Total number of metrics written",
})

for _, metric := range []prometheus_client.Collector{c.metricsProcessed, c.metricsOutgoing} {
err = o.Registerer.Register(metric)
if err != nil {
return nil, err
}
}

c.fanout = prometheus.NewFanout(args.ForwardTo, o.ID, o.Registerer, c.ls)
c.receiver = prometheus.NewInterceptor(
c.fanout,
c.ls,
prometheus.WithAppendHook(func(_ storage.SeriesRef, l labels.Labels, t int64, v float64, next storage.Appender) (storage.SeriesRef, error) {
if c.exited.Load() {
return 0, fmt.Errorf("%s has exited", o.ID)
}

newLbl := c.mapping(v, l)
if newLbl.IsEmpty() {
return 0, nil
}
c.metricsOutgoing.Inc()
return next.Append(0, newLbl, t, v)
}),
prometheus.WithExemplarHook(func(_ storage.SeriesRef, l labels.Labels, e exemplar.Exemplar, next storage.Appender) (storage.SeriesRef, error) {
if c.exited.Load() {
return 0, fmt.Errorf("%s has exited", o.ID)
}

newLbl := c.mapping(0, l)
if newLbl.IsEmpty() {
return 0, nil
}
return next.AppendExemplar(0, newLbl, e)
}),
prometheus.WithMetadataHook(func(_ storage.SeriesRef, l labels.Labels, m metadata.Metadata, next storage.Appender) (storage.SeriesRef, error) {
if c.exited.Load() {
return 0, fmt.Errorf("%s has exited", o.ID)
}

newLbl := c.mapping(0, l)
if newLbl.IsEmpty() {
return 0, nil
}
return next.UpdateMetadata(0, newLbl, m)
}),
prometheus.WithHistogramHook(func(_ storage.SeriesRef, l labels.Labels, t int64, h *histogram.Histogram, fh *histogram.FloatHistogram, next storage.Appender) (storage.SeriesRef, error) {
if c.exited.Load() {
return 0, fmt.Errorf("%s has exited", o.ID)
}

newLbl := c.mapping(0, l)
if newLbl.IsEmpty() {
return 0, nil
}
return next.AppendHistogram(0, newLbl, t, h, fh)
}),
)

// Immediately export the receiver which remains the same for the component
// lifetime.
o.OnStateChange(Exports{Receiver: c.receiver})

// Call to Update() to set the relabelling rules once at the start.
if err = c.Update(args); err != nil {
return nil, err
}

return c, nil
}

// Run implements component.Component.
func (c *Component) Run(ctx context.Context) error {
defer c.exited.Store(true)

<-ctx.Done()
return nil
}

// Update implements component.Component.
func (c *Component) Update(args component.Arguments) error {
c.mut.Lock()
defer c.mut.Unlock()

newArgs := args.(Arguments)
c.sourceLabel = newArgs.SourceLabel
c.targetLabel = newArgs.TargetLabel
c.mappings = newArgs.LabelValuesMapping
c.fanout.UpdateChildren(newArgs.ForwardTo)

c.opts.OnStateChange(Exports{Receiver: c.receiver})

return nil
}

func (c *Component) mapping(val float64, lbls labels.Labels) labels.Labels {
// Relabel against a copy of the labels to prevent modifying the original
// slice.
lb := labels.NewBuilder(lbls.Copy())
sourceValue := lb.Get(c.sourceLabel)
targetValue := c.mappings[sourceValue]
lb.Set(c.targetLabel, targetValue)
newLabels := lb.Labels()

componentID := livedebugging.ComponentID(c.opts.ID)
if c.debugDataPublisher.IsActive(componentID) {
c.debugDataPublisher.Publish(componentID, fmt.Sprintf("%s => %s", lbls.String(), newLabels.String()))
}

return newLabels
}

func (c *Component) LiveDebugging(_ int) {}
Loading

0 comments on commit 9f21a6a

Please sign in to comment.