diff --git a/CHANGELOG.md b/CHANGELOG.md
index 92ccc88bcd..ea2b042112 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -14,6 +14,8 @@ Main (unreleased)
- Add the possibility to export span events as logs in `otelcol.connector.spanlogs`. (@steve-hb)
+- Add `pyroscope.relabel` component to modify or filter profiles using Prometheus relabeling rules. (@marcsanmi)
+
### Enhancements
- (_Experimental_) Log instance label key in `database_observability.mysql` (@cristiangreco)
diff --git a/docs/sources/reference/compatibility/_index.md b/docs/sources/reference/compatibility/_index.md
index aa9a3b043c..3a22b11990 100644
--- a/docs/sources/reference/compatibility/_index.md
+++ b/docs/sources/reference/compatibility/_index.md
@@ -392,6 +392,7 @@ The following components, grouped by namespace, _export_ Pyroscope `ProfilesRece
{{< collapse title="pyroscope" >}}
+- [pyroscope.relabel](../components/pyroscope/pyroscope.relabel)
- [pyroscope.write](../components/pyroscope/pyroscope.write)
{{< /collapse >}}
@@ -408,6 +409,7 @@ The following components, grouped by namespace, _consume_ Pyroscope `ProfilesRec
- [pyroscope.ebpf](../components/pyroscope/pyroscope.ebpf)
- [pyroscope.java](../components/pyroscope/pyroscope.java)
- [pyroscope.receive_http](../components/pyroscope/pyroscope.receive_http)
+- [pyroscope.relabel](../components/pyroscope/pyroscope.relabel)
- [pyroscope.scrape](../components/pyroscope/pyroscope.scrape)
{{< /collapse >}}
diff --git a/docs/sources/reference/components/pyroscope/pyroscope.relabel.md b/docs/sources/reference/components/pyroscope/pyroscope.relabel.md
new file mode 100644
index 0000000000..45125c9362
--- /dev/null
+++ b/docs/sources/reference/components/pyroscope/pyroscope.relabel.md
@@ -0,0 +1,137 @@
+---
+canonical: https://grafana.com/docs/alloy/latest/reference/components/pyroscope/pyroscope.relabel/
+aliases:
+ - ../pyroscope.relabel/ # /docs/alloy/latest/reference/components/pyroscope.relabel/
+description: Learn about pyroscope.relabel
+title: pyroscope.relabel
+---
+
+Public preview
+
+# `pyroscope.relabel`
+
+The `pyroscope.relabel` component rewrites the label set of each profile passed to its receiver by applying one or more relabeling rules and forwards the results to the list of receivers.
+
+If no rules are defined or applicable to some profiles, then those profiles are forwarded as-is to each receiver passed in the component's arguments.
+The profile is dropped if no labels remain after the relabeling rules are applied.
+
+The most common use of `pyroscope.relabel` is to filter profiles or standardize the label set that is passed to one or more downstream receivers. The `rule` blocks are applied to the label set of each profile in order of their appearance in the configuration file.
+
+## Usage
+
+```alloy
+pyroscope.relabel "LABEL" {
+ forward_to =
+
+ rule {
+ ...
+ }
+
+ ...
+}
+```
+
+## Arguments
+
+You can use the following arguments with `pyroscope.relabel`:
+
+| Name | Type | Description | Default | Required |
+| ---------------- | ---------------------------- | --------------------------------------------------------- | ------- | -------- |
+| `forward_to` | `list(pyroscope.Appendable)` | List of receivers to forward profiles to after relabeling | | yes |
+| `max_cache_size` | `number` | Maximum number of entries in the label cache | 10000 | no |
+
+## Blocks
+
+You can use the following blocks with `pyroscope.relabel`:
+
+| Name | Description | Required |
+| -------------- | ------------------------------------------------------ | -------- |
+| [`rule`][rule] | Relabeling rules to apply to received profile entries. | no |
+
+[rule]: #rule
+
+### rule
+
+{{< docs/shared lookup="reference/components/rule-block.md" source="alloy" version="" >}}
+
+## Exported fields
+
+The following fields are exported and can be referenced by other components:
+
+| Name | Type | Description |
+| ---------- | ------------------ | ------------------------------------------------ |
+| `receiver` | `ProfilesReceiver` | A receiver that accepts profiles for relabeling. |
+| `rules` | `[]relabel.Config` | The list of relabeling rules. |
+
+## Component health
+
+`pyroscope.relabel` is reported as unhealthy if it is given an invalid configuration.
+
+## Debug metrics
+
+* `pyroscope_relabel_cache_hits` (counter): Total number of cache hits.
+* `pyroscope_relabel_cache_misses` (counter): Total number of cache misses.
+* `pyroscope_relabel_cache_size` (gauge): Total size of relabel cache.
+* `pyroscope_relabel_profiles_dropped` (counter): Total number of profiles dropped by relabeling rules.
+* `pyroscope_relabel_profiles_processed` (counter): Total number of profiles processed.
+* `pyroscope_relabel_profiles_written` (counter): Total number of profiles forwarded.
+
+## Example
+
+```alloy
+pyroscope.receive_http "default" {
+ forward_to = [pyroscope.relabel.filter_profiles.receiver]
+
+ http {
+ listen_address = "0.0.0.0"
+ listen_port = 9999
+ }
+}
+
+pyroscope.relabel "filter_profiles" {
+ forward_to = [pyroscope.write.staging.receiver]
+
+ // This creates a consistent hash value (0 or 1) for each unique combination of labels
+ // Using multiple source labels provides better sampling distribution across your profiles
+ rule {
+ source_labels = ["env"]
+ target_label = "__tmp_hash"
+ action = "hashmod"
+ modulus = 2
+ }
+
+ // This effectively samples ~50% of profile series
+ // The same combination of source label values will always hash to the same number,
+ // ensuring consistent sampling
+ rule {
+ source_labels = ["__tmp_hash"]
+ action = "drop"
+ regex = "^1$"
+ }
+}
+
+pyroscope.write "staging" {
+ endpoint {
+ url = "http://pyroscope-staging:4040"
+ }
+}
+```
+
+
+
+## Compatible components
+
+`pyroscope.relabel` can accept arguments from the following components:
+
+- Components that export [Pyroscope `ProfilesReceiver`](../../../compatibility/#pyroscope-profilesreceiver-exporters)
+
+`pyroscope.relabel` has exports that can be consumed by the following components:
+
+- Components that consume [Pyroscope `ProfilesReceiver`](../../../compatibility/#pyroscope-profilesreceiver-consumers)
+
+{{< admonition type="note" >}}
+Connecting some components may not be sensible or components may require further configuration to make the connection work correctly.
+Refer to the linked documentation for more details.
+{{< /admonition >}}
+
+
diff --git a/internal/component/all/all.go b/internal/component/all/all.go
index d82838a2ab..b8df19f4d1 100644
--- a/internal/component/all/all.go
+++ b/internal/component/all/all.go
@@ -146,6 +146,7 @@ import (
_ "github.com/grafana/alloy/internal/component/pyroscope/ebpf" // Import pyroscope.ebpf
_ "github.com/grafana/alloy/internal/component/pyroscope/java" // Import pyroscope.java
_ "github.com/grafana/alloy/internal/component/pyroscope/receive_http" // Import pyroscope.receive_http
+ _ "github.com/grafana/alloy/internal/component/pyroscope/relabel" // Import pyroscope.relabel
_ "github.com/grafana/alloy/internal/component/pyroscope/scrape" // Import pyroscope.scrape
_ "github.com/grafana/alloy/internal/component/pyroscope/write" // Import pyroscope.write
_ "github.com/grafana/alloy/internal/component/remote/http" // Import remote.http
diff --git a/internal/component/pyroscope/appender.go b/internal/component/pyroscope/appender.go
index a856e80b0b..28eb62795f 100644
--- a/internal/component/pyroscope/appender.go
+++ b/internal/component/pyroscope/appender.go
@@ -37,6 +37,7 @@ type IncomingProfile struct {
Body io.ReadCloser
Headers http.Header
URL *url.URL
+ Labels labels.Labels
}
var _ Appendable = (*Fanout)(nil)
diff --git a/internal/component/pyroscope/receive_http/receive_http.go b/internal/component/pyroscope/receive_http/receive_http.go
index 4e969694dc..8f17b54a85 100644
--- a/internal/component/pyroscope/receive_http/receive_http.go
+++ b/internal/component/pyroscope/receive_http/receive_http.go
@@ -25,6 +25,7 @@ import (
pushv1 "github.com/grafana/pyroscope/api/gen/proto/go/push/v1"
"github.com/grafana/pyroscope/api/gen/proto/go/push/v1/pushv1connect"
typesv1 "github.com/grafana/pyroscope/api/gen/proto/go/types/v1"
+ "github.com/grafana/pyroscope/api/model/labelset"
)
const (
@@ -218,6 +219,26 @@ func (c *Component) getAppendables() []pyroscope.Appendable {
func (c *Component) handleIngest(w http.ResponseWriter, r *http.Request) {
appendables := c.getAppendables()
+ // Parse labels early
+ var lbls labels.Labels
+ if nameParam := r.URL.Query().Get("name"); nameParam != "" {
+ ls, err := labelset.Parse(nameParam)
+ if err != nil {
+ level.Warn(c.opts.Logger).Log(
+ "msg", "Failed to parse labels from name parameter",
+ "name", nameParam,
+ "err", err,
+ )
+ // Continue with empty labels instead of returning an error
+ } else {
+ var labelPairs []labels.Label
+ for k, v := range ls.Labels() {
+ labelPairs = append(labelPairs, labels.Label{Name: k, Value: v})
+ }
+ lbls = labels.New(labelPairs...)
+ }
+ }
+
// Create a pipe for each appendable
pipeWriters := make([]io.Writer, len(appendables))
pipeReaders := make([]io.Reader, len(appendables))
@@ -251,6 +272,7 @@ func (c *Component) handleIngest(w http.ResponseWriter, r *http.Request) {
Body: io.NopCloser(pipeReaders[i]),
Headers: r.Header.Clone(),
URL: r.URL,
+ Labels: lbls,
}
err := appendable.Appender().AppendIngest(ctx, profile)
diff --git a/internal/component/pyroscope/receive_http/receive_http_test.go b/internal/component/pyroscope/receive_http/receive_http_test.go
index 255189d4a2..46a3098124 100644
--- a/internal/component/pyroscope/receive_http/receive_http_test.go
+++ b/internal/component/pyroscope/receive_http/receive_http_test.go
@@ -25,6 +25,7 @@ import (
pushv1 "github.com/grafana/pyroscope/api/gen/proto/go/push/v1"
"github.com/grafana/pyroscope/api/gen/proto/go/push/v1/pushv1connect"
typesv1 "github.com/grafana/pyroscope/api/gen/proto/go/types/v1"
+ "github.com/grafana/pyroscope/api/model/labelset"
)
// TestForwardsProfilesIngest verifies the behavior of the
@@ -44,13 +45,14 @@ func TestForwardsProfilesIngest(t *testing.T) {
appendableErrors []error
expectedStatus int
expectedForwards int
+ expectedLabels map[string]string
}{
{
name: "Small profile",
profileSize: 1024, // 1KB
method: "POST",
path: "/ingest",
- queryParams: "name=test_app_1&from=1234567890&until=1234567900",
+ queryParams: "name=test_app&from=1234567890&until=1234567900",
headers: map[string]string{"Content-Type": "application/octet-stream"},
appendableErrors: []error{nil, nil},
expectedStatus: http.StatusOK,
@@ -61,7 +63,7 @@ func TestForwardsProfilesIngest(t *testing.T) {
profileSize: 1024 * 1024, // 1MB
method: "POST",
path: "/ingest",
- queryParams: "name=test_app_2&from=1234567891&until=1234567901&custom=param1",
+ queryParams: "name=test_app&from=1234567891&until=1234567901&custom=param1",
headers: map[string]string{"X-Scope-OrgID": "1234"},
appendableErrors: []error{nil},
expectedStatus: http.StatusOK,
@@ -72,7 +74,7 @@ func TestForwardsProfilesIngest(t *testing.T) {
profileSize: 1024,
method: "GET",
path: "/ingest",
- queryParams: "name=test_app_3&from=1234567892&until=1234567902",
+ queryParams: "name=test_app&from=1234567892&until=1234567902",
headers: map[string]string{},
appendableErrors: []error{nil, nil},
expectedStatus: http.StatusMethodNotAllowed,
@@ -94,7 +96,7 @@ func TestForwardsProfilesIngest(t *testing.T) {
profileSize: 1024,
method: "POST",
path: "/invalid",
- queryParams: "name=test_app_4&from=1234567893&until=1234567903",
+ queryParams: "name=test_app&from=1234567893&until=1234567903",
headers: map[string]string{"Content-Type": "application/octet-stream"},
appendableErrors: []error{nil, nil},
expectedStatus: http.StatusNotFound,
@@ -105,7 +107,7 @@ func TestForwardsProfilesIngest(t *testing.T) {
profileSize: 2048,
method: "POST",
path: "/ingest",
- queryParams: "name=test_app_5&from=1234567894&until=1234567904&scenario=all_fail",
+ queryParams: "name=test_app&from=1234567894&until=1234567904&scenario=all_fail",
headers: map[string]string{"Content-Type": "application/octet-stream", "X-Test": "fail-all"},
appendableErrors: []error{fmt.Errorf("error1"), fmt.Errorf("error2")},
expectedStatus: http.StatusInternalServerError,
@@ -116,12 +118,42 @@ func TestForwardsProfilesIngest(t *testing.T) {
profileSize: 4096,
method: "POST",
path: "/ingest",
- queryParams: "name=test_app_6&from=1234567895&until=1234567905&scenario=partial_failure",
+ queryParams: "name=test_app&from=1234567895&until=1234567905&scenario=partial_failure",
headers: map[string]string{"X-Custom-ID": "test-6"},
appendableErrors: []error{fmt.Errorf("error"), nil},
expectedStatus: http.StatusInternalServerError,
expectedForwards: 2,
},
+ {
+ name: "Valid labels are parsed and forwarded",
+ profileSize: 1024,
+ method: "POST",
+ path: "/ingest",
+ queryParams: "name=test.app{env=prod,region=us-east}",
+ headers: map[string]string{"Content-Type": "application/octet-stream"},
+ appendableErrors: []error{nil, nil},
+ expectedStatus: http.StatusOK,
+ expectedForwards: 2,
+ expectedLabels: map[string]string{
+ "__name__": "test.app",
+ "env": "prod",
+ "region": "us-east",
+ },
+ },
+ {
+ name: "Invalid labels still forward profile",
+ profileSize: 1024,
+ method: "POST",
+ path: "/ingest",
+ queryParams: "name=test.app{invalid-label-syntax}",
+ headers: map[string]string{"Content-Type": "application/octet-stream"},
+ appendableErrors: []error{nil, nil},
+ expectedStatus: http.StatusOK,
+ expectedForwards: 2,
+ expectedLabels: map[string]string{
+ "__name__": "test.app", // Only __name__ is preserved
+ },
+ },
}
for _, tt := range tests {
@@ -136,7 +168,7 @@ func TestForwardsProfilesIngest(t *testing.T) {
require.Equal(t, tt.expectedForwards, forwardedCount, "Unexpected number of forwards")
if tt.expectedForwards > 0 {
- verifyForwardedProfiles(t, appendables, testProfile, tt.headers, tt.queryParams)
+ verifyForwardedProfiles(t, appendables, testProfile, tt.headers, tt.queryParams, tt.expectedLabels)
}
})
}
@@ -296,11 +328,26 @@ func verifyForwardedProfiles(
expectedProfile []byte,
expectedHeaders map[string]string,
expectedQueryParams string,
+ expectedLabels map[string]string,
) {
for i, app := range appendables {
testApp, ok := app.(*testAppender)
require.True(t, ok, "Appendable is not a testAppender")
+ // Verify labels if name parameter exists and is valid
+ if nameParam := testApp.lastProfile.URL.Query().Get("name"); nameParam != "" {
+ ls, err := labelset.Parse(nameParam)
+ if err == nil {
+ require.Equal(t, ls.Labels(), testApp.lastProfile.Labels.Map(),
+ "Labels mismatch for appendable %d", i)
+ }
+ }
+
+ if expectedLabels != nil {
+ require.Equal(t, expectedLabels, testApp.lastProfile.Labels.Map(),
+ "Labels mismatch for appendable %d", i)
+ }
+
if testApp.lastProfile != nil {
// Verify profile body
body, err := io.ReadAll(testApp.lastProfile.Body)
@@ -446,6 +493,7 @@ func (a *testAppender) AppendIngest(_ context.Context, profile *pyroscope.Incomi
Body: io.NopCloser(&buf),
Headers: profile.Headers,
URL: profile.URL,
+ Labels: profile.Labels,
}
a.lastProfile = newProfile
diff --git a/internal/component/pyroscope/relabel/metrics.go b/internal/component/pyroscope/relabel/metrics.go
new file mode 100644
index 0000000000..f85737a957
--- /dev/null
+++ b/internal/component/pyroscope/relabel/metrics.go
@@ -0,0 +1,52 @@
+package relabel
+
+import "github.com/prometheus/client_golang/prometheus"
+
+type metrics struct {
+ profilesProcessed prometheus.Counter
+ profilesOutgoing prometheus.Counter
+ profilesDropped prometheus.Counter
+ cacheHits prometheus.Counter
+ cacheMisses prometheus.Counter
+ cacheSize prometheus.Gauge
+}
+
+func newMetrics(reg prometheus.Registerer) *metrics {
+ m := &metrics{
+ profilesProcessed: prometheus.NewCounter(prometheus.CounterOpts{
+ Name: "pyroscope_relabel_profiles_processed",
+ Help: "Total number of profiles processed",
+ }),
+ profilesOutgoing: prometheus.NewCounter(prometheus.CounterOpts{
+ Name: "pyroscope_relabel_profiles_written",
+ Help: "Total number of profiles forwarded",
+ }),
+ profilesDropped: prometheus.NewCounter(prometheus.CounterOpts{
+ Name: "pyroscope_relabel_profiles_dropped",
+ Help: "Total number of profiles dropped by relabeling rules",
+ }),
+ cacheHits: prometheus.NewCounter(prometheus.CounterOpts{
+ Name: "pyroscope_relabel_cache_hits",
+ Help: "Total number of cache hits",
+ }),
+ cacheMisses: prometheus.NewCounter(prometheus.CounterOpts{
+ Name: "pyroscope_relabel_cache_misses",
+ Help: "Total number of cache misses",
+ }),
+ cacheSize: prometheus.NewGauge(prometheus.GaugeOpts{
+ Name: "pyroscope_relabel_cache_size",
+ Help: "Total size of relabel cache",
+ }),
+ }
+
+ reg.MustRegister(
+ m.profilesProcessed,
+ m.profilesOutgoing,
+ m.profilesDropped,
+ m.cacheHits,
+ m.cacheMisses,
+ m.cacheSize,
+ )
+
+ return m
+}
diff --git a/internal/component/pyroscope/relabel/relabel.go b/internal/component/pyroscope/relabel/relabel.go
new file mode 100644
index 0000000000..e991c68d33
--- /dev/null
+++ b/internal/component/pyroscope/relabel/relabel.go
@@ -0,0 +1,338 @@
+// Package relabel provides label manipulation for Pyroscope profiles.
+//
+// Label Handling:
+// The component handles two types of label representations:
+// - labels.Labels ([]Label): Used by Pyroscope and relabeling logic
+// - model.LabelSet (map[string]string): Used for efficient fingerprinting and cache lookups
+//
+// Cache Implementation:
+// The cache uses model.LabelSet's fingerprinting to store label sets efficiently.
+// Each cache entry contains both the original and relabeled labels to handle collisions
+// and avoid recomputing relabeling rules.
+package relabel
+
+import (
+ "context"
+ "fmt"
+ "reflect"
+ "sort"
+ "sync"
+ "sync/atomic"
+
+ "github.com/grafana/alloy/internal/component"
+ alloy_relabel "github.com/grafana/alloy/internal/component/common/relabel"
+ "github.com/grafana/alloy/internal/component/pyroscope"
+ "github.com/grafana/alloy/internal/featuregate"
+ "github.com/grafana/alloy/internal/runtime/logging/level"
+ "github.com/grafana/pyroscope/api/model/labelset"
+
+ lru "github.com/hashicorp/golang-lru/v2"
+ "github.com/prometheus/common/model"
+ "github.com/prometheus/prometheus/model/labels"
+ "github.com/prometheus/prometheus/model/relabel"
+)
+
+func init() {
+ component.Register(component.Registration{
+ Name: "pyroscope.relabel",
+ Stability: featuregate.StabilityPublicPreview,
+ Args: Arguments{},
+ Exports: Exports{},
+ Build: func(opts component.Options, args component.Arguments) (component.Component, error) {
+ return New(opts, args.(Arguments))
+ },
+ })
+}
+
+type Arguments struct {
+ // Where the relabeled metrics should be forwarded to.
+ ForwardTo []pyroscope.Appendable `alloy:"forward_to,attr"`
+ // The relabelling rules to apply to each log entry before it's forwarded.
+ RelabelConfigs []*alloy_relabel.Config `alloy:"rule,block,optional"`
+ // The maximum number of items to hold in the component's LRU cache.
+ MaxCacheSize int `alloy:"max_cache_size,attr,optional"`
+}
+
+// DefaultArguments provides the default arguments for the pyroscope.relabel
+// component.
+var DefaultArguments = Arguments{
+ MaxCacheSize: 10_000,
+}
+
+// SetToDefault implements syntax.Defaulter.
+func (a *Arguments) SetToDefault() {
+ *a = DefaultArguments
+}
+
+// Exports holds values which are exported by the pyroscope.relabel component.
+type Exports struct {
+ Receiver pyroscope.Appendable `alloy:"receiver,attr"`
+ Rules alloy_relabel.Rules `alloy:"rules,attr"`
+}
+
+// Component implements the pyroscope.relabel component.
+type Component struct {
+ opts component.Options
+ metrics *metrics
+ mut sync.RWMutex
+ rcs []*relabel.Config
+ fanout *pyroscope.Fanout
+ cache *lru.Cache[model.Fingerprint, []cacheItem]
+ maxCacheSize int
+ exited atomic.Bool
+}
+
+var (
+ _ component.Component = (*Component)(nil)
+)
+
+// New creates a new pyroscope.relabel component.
+func New(o component.Options, args Arguments) (*Component, error) {
+ cache, err := lru.New[model.Fingerprint, []cacheItem](args.MaxCacheSize)
+ if err != nil {
+ return nil, err
+ }
+
+ c := &Component{
+ opts: o,
+ metrics: newMetrics(o.Registerer),
+ cache: cache,
+ maxCacheSize: args.MaxCacheSize,
+ }
+
+ c.fanout = pyroscope.NewFanout(args.ForwardTo, o.ID, o.Registerer)
+
+ o.OnStateChange(Exports{
+ Receiver: c,
+ Rules: args.RelabelConfigs,
+ })
+
+ if err := c.Update(args); err != nil {
+ return nil, err
+ }
+
+ return c, nil
+}
+
+func (c *Component) Run(ctx context.Context) error {
+ defer c.exited.Store(true)
+ <-ctx.Done()
+ return nil
+}
+
+func (c *Component) Update(args component.Arguments) error {
+ c.mut.Lock()
+ defer c.mut.Unlock()
+
+ newArgs := args.(Arguments)
+ newRCS := alloy_relabel.ComponentToPromRelabelConfigs(newArgs.RelabelConfigs)
+
+ // If relabeling rules changed, purge the cache
+ if relabelingChanged(c.rcs, newRCS) {
+ level.Debug(c.opts.Logger).Log("msg", "received new relabel configs, purging cache")
+ c.cache.Purge()
+ c.metrics.cacheSize.Set(0)
+ }
+
+ if newArgs.MaxCacheSize != c.maxCacheSize {
+ evicted := c.cache.Resize(newArgs.MaxCacheSize)
+ if evicted > 0 {
+ level.Debug(c.opts.Logger).Log("msg", "resizing cache led to evicting items", "evicted_count", evicted)
+ }
+ c.maxCacheSize = newArgs.MaxCacheSize
+ }
+
+ c.rcs = newRCS
+ c.fanout.UpdateChildren(newArgs.ForwardTo)
+
+ c.opts.OnStateChange(Exports{
+ Receiver: c,
+ Rules: newArgs.RelabelConfigs,
+ })
+
+ return nil
+}
+
+func (c *Component) Append(ctx context.Context, lbls labels.Labels, samples []*pyroscope.RawSample) error {
+ if c.exited.Load() {
+ return fmt.Errorf("%s has exited", c.opts.ID)
+ }
+
+ c.mut.RLock()
+ defer c.mut.RUnlock()
+
+ newLabels, keep, err := c.relabel(lbls)
+ if err != nil {
+ return err
+ }
+ if !keep {
+ level.Debug(c.opts.Logger).Log("msg", "profile dropped by relabel rules", "labels", lbls.String())
+ return nil
+ }
+
+ return c.fanout.Appender().Append(ctx, newLabels, samples)
+}
+
+func (c *Component) AppendIngest(ctx context.Context, profile *pyroscope.IncomingProfile) error {
+ if c.exited.Load() {
+ return fmt.Errorf("%s has exited", c.opts.ID)
+ }
+
+ c.mut.RLock()
+ defer c.mut.RUnlock()
+
+ c.metrics.profilesProcessed.Inc()
+
+ if profile.Labels.IsEmpty() {
+ c.metrics.profilesOutgoing.Inc()
+ return c.fanout.Appender().AppendIngest(ctx, profile)
+ }
+
+ newLabels, keep, err := c.relabel(profile.Labels)
+ if err != nil {
+ return fmt.Errorf("processing labels: %w", err)
+ }
+ if !keep {
+ c.metrics.profilesDropped.Inc()
+ level.Debug(c.opts.Logger).Log("msg", "profile dropped by relabel rules")
+ return nil
+ }
+
+ profile.Labels = newLabels
+ c.metrics.profilesOutgoing.Inc()
+ return c.fanout.Appender().AppendIngest(ctx, profile)
+}
+
+func (c *Component) Appender() pyroscope.Appender {
+ return c
+}
+
+type cacheItem struct {
+ original model.LabelSet
+ relabeled model.LabelSet
+}
+
+// relabel applies the configured relabeling rules to the input labels
+// Returns the new labels, whether to keep the profile, and any error
+func (c *Component) relabel(lbls labels.Labels) (labels.Labels, bool, error) {
+ labelSet := toModelLabelSet(lbls)
+ hash := labelSet.Fingerprint()
+
+ // Check cache
+ if result, keep, found := c.getCacheEntry(hash, labelSet); found {
+ return result, keep, nil
+ }
+
+ // Apply relabeling
+ builder := labels.NewBuilder(lbls)
+ keep := relabel.ProcessBuilder(builder, c.rcs...)
+ if !keep {
+ c.metrics.profilesDropped.Inc()
+ c.addToCache(hash, labelSet, labels.EmptyLabels())
+ return labels.EmptyLabels(), false, nil
+ }
+
+ newLabels := builder.Labels()
+
+ // Cache result
+ c.addToCache(hash, labelSet, newLabels)
+
+ return newLabels, true, nil
+}
+
+func (c *Component) getCacheEntry(hash model.Fingerprint, labelSet model.LabelSet) (labels.Labels, bool, bool) {
+ if val, ok := c.cache.Get(hash); ok {
+ for _, item := range val {
+ if labelSet.Equal(item.original) {
+ c.metrics.cacheHits.Inc()
+ if len(item.relabeled) == 0 {
+ c.metrics.profilesDropped.Inc()
+ return labels.Labels{}, false, true
+ }
+ return toLabelsLabels(item.relabeled), true, true
+ }
+ }
+ }
+ c.metrics.cacheMisses.Inc()
+ return labels.Labels{}, false, false
+}
+
+func (c *Component) addToCache(hash model.Fingerprint, original model.LabelSet, relabeled labels.Labels) {
+ var cacheValue []cacheItem
+ if val, exists := c.cache.Get(hash); exists {
+ cacheValue = val
+ }
+ cacheValue = append(cacheValue, cacheItem{
+ original: original,
+ relabeled: toModelLabelSet(relabeled),
+ })
+ c.cache.Add(hash, cacheValue)
+ c.metrics.cacheSize.Set(float64(c.cache.Len()))
+}
+
+// extractLabelsFromIncomingProfile converts profile labels to Prometheus labels
+func extractLabelsFromIncomingProfile(profile *pyroscope.IncomingProfile) (labels.Labels, error) {
+ nameParam := profile.URL.Query().Get("name")
+ if nameParam == "" {
+ return labels.Labels{}, nil
+ }
+
+ ls, err := labelset.Parse(nameParam)
+ if err != nil {
+ return labels.Labels{}, fmt.Errorf("parsing labels from name parameter: %w", err)
+ }
+
+ var lbls []labels.Label
+ for k, v := range ls.Labels() {
+ lbls = append(lbls, labels.Label{Name: k, Value: v})
+ }
+ return labels.New(lbls...), nil
+}
+
+// updateProfileLabels converts Prometheus labels back to pyroscope format
+func updateProfileLabels(profile *pyroscope.IncomingProfile, lbls labels.Labels) {
+ newLS := labelset.New(make(map[string]string))
+ for _, l := range lbls {
+ newLS.Add(l.Name, l.Value)
+ }
+
+ query := profile.URL.Query()
+ query.Set("name", newLS.Normalized())
+ profile.URL.RawQuery = query.Encode()
+}
+
+// Helper function to detect relabel config changes
+func relabelingChanged(prev, next []*relabel.Config) bool {
+ if len(prev) != len(next) {
+ return true
+ }
+ for i := range prev {
+ if !reflect.DeepEqual(prev[i], next[i]) {
+ return true
+ }
+ }
+ return false
+}
+
+// toModelLabelSet converts labels.Labels to model.LabelSet
+func toModelLabelSet(lbls labels.Labels) model.LabelSet {
+ labelSet := make(model.LabelSet, lbls.Len())
+ lbls.Range(func(l labels.Label) {
+ labelSet[model.LabelName(l.Name)] = model.LabelValue(l.Value)
+ })
+ return labelSet
+}
+
+// toLabelsLabels converts model.LabelSet to labels.Labels
+func toLabelsLabels(ls model.LabelSet) labels.Labels {
+ result := make(labels.Labels, 0, len(ls))
+ for name, value := range ls {
+ result = append(result, labels.Label{
+ Name: string(name),
+ Value: string(value),
+ })
+ }
+ // Labels need to be sorted
+ sort.Sort(result)
+ return result
+}
diff --git a/internal/component/pyroscope/relabel/relabel_test.go b/internal/component/pyroscope/relabel/relabel_test.go
new file mode 100644
index 0000000000..05fea6f3c3
--- /dev/null
+++ b/internal/component/pyroscope/relabel/relabel_test.go
@@ -0,0 +1,343 @@
+package relabel
+
+import (
+ "context"
+ "errors"
+ "sync"
+ "testing"
+
+ "github.com/grafana/alloy/internal/component"
+ alloy_relabel "github.com/grafana/alloy/internal/component/common/relabel"
+ "github.com/grafana/alloy/internal/component/pyroscope"
+ "github.com/grafana/alloy/internal/util"
+ "github.com/grafana/pyroscope/api/model/labelset"
+ "github.com/grafana/regexp"
+
+ "github.com/prometheus/client_golang/prometheus"
+ "github.com/prometheus/prometheus/model/labels"
+ "github.com/stretchr/testify/require"
+)
+
+func TestRelabeling(t *testing.T) {
+ tests := []struct {
+ name string
+ rules []*alloy_relabel.Config
+ inputLabels labels.Labels
+ wantLabels labels.Labels
+ wantDropped bool
+ }{
+ {
+ name: "basic profile without labels",
+ rules: []*alloy_relabel.Config{
+ {
+ SourceLabels: []string{"foo"},
+ TargetLabel: "bar",
+ Action: "replace",
+ Regex: alloy_relabel.Regexp{Regexp: regexp.MustCompile("(.+)")},
+ Replacement: "$1",
+ },
+ },
+ inputLabels: labels.EmptyLabels(),
+ wantLabels: labels.EmptyLabels(),
+ wantDropped: false,
+ },
+ {
+ name: "rename label",
+ rules: []*alloy_relabel.Config{
+ {
+ SourceLabels: []string{"foo"},
+ TargetLabel: "bar",
+ Action: "replace",
+ Regex: alloy_relabel.Regexp{Regexp: regexp.MustCompile("(.+)")},
+ Replacement: "$1",
+ },
+ {
+ Action: "labeldrop",
+ Regex: alloy_relabel.Regexp{Regexp: regexp.MustCompile("foo")},
+ },
+ },
+ inputLabels: labels.FromStrings("foo", "hello"),
+ wantLabels: labels.FromStrings("bar", "hello"),
+ wantDropped: false,
+ },
+ {
+ name: "drop profile with matching drop label",
+ rules: []*alloy_relabel.Config{{
+ SourceLabels: []string{"env"},
+ Action: "drop",
+ Regex: alloy_relabel.Regexp{Regexp: regexp.MustCompile("dev")},
+ }},
+ inputLabels: labels.FromStrings("env", "dev", "region", "us-1"),
+ wantLabels: labels.EmptyLabels(),
+ wantDropped: true,
+ },
+ {
+ name: "keep profile with matching label",
+ rules: []*alloy_relabel.Config{{
+ SourceLabels: []string{"env"},
+ Action: "keep",
+ Regex: alloy_relabel.Regexp{Regexp: regexp.MustCompile("prod")},
+ }},
+ inputLabels: labels.FromStrings("env", "prod", "region", "us-1"),
+ wantLabels: labels.FromStrings("env", "prod", "region", "us-1"),
+ wantDropped: false,
+ },
+ {
+ name: "drop profile not matching keep",
+ rules: []*alloy_relabel.Config{{
+ SourceLabels: []string{"env"},
+ Action: "keep",
+ Regex: alloy_relabel.Regexp{Regexp: regexp.MustCompile("prod")},
+ }},
+ inputLabels: labels.FromStrings("env", "dev", "region", "us-1"),
+ wantLabels: labels.EmptyLabels(),
+ wantDropped: true,
+ },
+ {
+ name: "drop all labels not dropping profile",
+ rules: []*alloy_relabel.Config{
+ {
+ Action: "labeldrop",
+ Regex: alloy_relabel.Regexp{Regexp: regexp.MustCompile("env|region")},
+ },
+ },
+ inputLabels: labels.FromStrings("env", "prod", "region", "us-1"),
+ wantLabels: labels.EmptyLabels(),
+ wantDropped: false,
+ },
+ {
+ name: "keep profile with no labels when using drop action",
+ rules: []*alloy_relabel.Config{
+ {
+ SourceLabels: []string{"env"},
+ Action: "drop",
+ Regex: alloy_relabel.Regexp{Regexp: regexp.MustCompile("dev")},
+ },
+ },
+ inputLabels: labels.EmptyLabels(),
+ wantLabels: labels.EmptyLabels(),
+ wantDropped: false,
+ },
+ {
+ name: "hashmod sampling with profile that should pass",
+ rules: []*alloy_relabel.Config{
+ {
+ SourceLabels: []string{"env"},
+ Action: "hashmod",
+ Modulus: 2,
+ TargetLabel: "__tmp_hash",
+ },
+ {
+ SourceLabels: []string{"__tmp_hash"},
+ Action: "drop",
+ Regex: alloy_relabel.Regexp{Regexp: regexp.MustCompile("^1$")},
+ },
+ },
+ // Use a value we know will hash to 1
+ inputLabels: labels.FromStrings("env", "prod", "region", "us-1"),
+ wantLabels: labels.EmptyLabels(),
+ wantDropped: true,
+ },
+ {
+ name: "multiple rules",
+ rules: []*alloy_relabel.Config{
+ {
+ SourceLabels: []string{"env"},
+ TargetLabel: "environment",
+ Action: "replace",
+ Regex: alloy_relabel.Regexp{Regexp: regexp.MustCompile("(.+)")},
+ Replacement: "$1",
+ },
+ {
+ Action: "labeldrop",
+ Regex: alloy_relabel.Regexp{Regexp: regexp.MustCompile("^env$")},
+ },
+ {
+ SourceLabels: []string{"region"},
+ TargetLabel: "zone",
+ Action: "replace",
+ Regex: alloy_relabel.Regexp{Regexp: regexp.MustCompile("us-(.+)")},
+ Replacement: "zone-$1",
+ },
+ },
+ inputLabels: labels.FromStrings("env", "prod", "region", "us-1"),
+ wantLabels: labels.FromStrings("environment", "prod", "region", "us-1", "zone", "zone-1"),
+ wantDropped: false,
+ },
+ }
+
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ app := NewTestAppender()
+
+ c, err := New(component.Options{
+ Logger: util.TestLogger(t),
+ Registerer: prometheus.NewRegistry(),
+ OnStateChange: func(e component.Exports) {},
+ }, Arguments{
+ ForwardTo: []pyroscope.Appendable{app},
+ RelabelConfigs: tt.rules,
+ MaxCacheSize: 10,
+ })
+ require.NoError(t, err)
+
+ profile := &pyroscope.IncomingProfile{
+ Labels: tt.inputLabels,
+ }
+
+ err = c.AppendIngest(context.Background(), profile)
+
+ profiles := app.Profiles()
+
+ if tt.wantDropped {
+ if errors.Is(err, labelset.ErrServiceNameIsRequired) {
+ require.Empty(t, profiles, "profile should have been dropped")
+ return
+ }
+ require.NoError(t, err)
+ require.Empty(t, profiles, "profile should have been dropped")
+ return
+ }
+
+ gotProfile := app.Profiles()[0]
+ require.Equal(t, tt.wantLabels, gotProfile.Labels)
+ })
+ }
+}
+
+func TestCache(t *testing.T) {
+ app := NewTestAppender()
+ c, err := New(component.Options{
+ Logger: util.TestLogger(t),
+ Registerer: prometheus.NewRegistry(),
+ OnStateChange: func(e component.Exports) {},
+ }, Arguments{
+ ForwardTo: []pyroscope.Appendable{app},
+ RelabelConfigs: []*alloy_relabel.Config{{
+ SourceLabels: []string{"env"},
+ Action: "replace",
+ TargetLabel: "environment",
+ Regex: alloy_relabel.Regexp{Regexp: regexp.MustCompile("(.+)")},
+ Replacement: "staging",
+ }},
+ MaxCacheSize: 4,
+ })
+ require.NoError(t, err)
+
+ // Test basic cache functionality
+ labels := labels.FromStrings("env", "prod")
+ err = c.AppendIngest(context.Background(), &pyroscope.IncomingProfile{Labels: labels})
+ require.NoError(t, err)
+ require.Equal(t, 1, c.cache.Len(), "cache should have 1 entry")
+
+ // Test cache hit
+ err = c.AppendIngest(context.Background(), &pyroscope.IncomingProfile{Labels: labels})
+ require.NoError(t, err)
+ require.Equal(t, 1, c.cache.Len(), "cache length should not change after hit")
+}
+
+func TestCacheCollisions(t *testing.T) {
+ app := NewTestAppender()
+ c, err := New(component.Options{
+ Logger: util.TestLogger(t),
+ Registerer: prometheus.NewRegistry(),
+ OnStateChange: func(e component.Exports) {},
+ }, Arguments{
+ ForwardTo: []pyroscope.Appendable{app},
+ RelabelConfigs: []*alloy_relabel.Config{},
+ MaxCacheSize: 4,
+ })
+ require.NoError(t, err)
+
+ // These LabelSets are known to collide
+ ls1 := labels.FromStrings("A", "K6sjsNNczPl", "__name__", "app.cpu")
+ ls2 := labels.FromStrings("A", "cswpLMIZpwt", "__name__", "app.cpu")
+
+ // Verify collision
+ require.Equal(t, toModelLabelSet(ls1).Fingerprint(), toModelLabelSet(ls2).Fingerprint(),
+ "expected labelset fingerprints to collide")
+
+ // Add both colliding profiles
+ err = c.AppendIngest(context.Background(), &pyroscope.IncomingProfile{Labels: ls1})
+ require.NoError(t, err)
+ err = c.AppendIngest(context.Background(), &pyroscope.IncomingProfile{Labels: ls2})
+ require.NoError(t, err)
+
+ // Verify both are stored under same hash
+ hash := toModelLabelSet(ls1).Fingerprint()
+ val, ok := c.cache.Get(hash)
+ require.True(t, ok, "colliding entry should be in cache")
+ require.Len(t, val, 2, "should have both colliding items under same key")
+
+ // Verify items are stored correctly
+ require.Equal(t, toModelLabelSet(ls1), val[0].original, "first item should match ls1")
+ require.Equal(t, toModelLabelSet(ls2), val[1].original, "second item should match ls2")
+}
+
+func TestCacheLRU(t *testing.T) {
+ app := NewTestAppender()
+ c, err := New(component.Options{
+ Logger: util.TestLogger(t),
+ Registerer: prometheus.NewRegistry(),
+ OnStateChange: func(e component.Exports) {},
+ }, Arguments{
+ ForwardTo: []pyroscope.Appendable{app},
+ RelabelConfigs: []*alloy_relabel.Config{},
+ MaxCacheSize: 2,
+ })
+ require.NoError(t, err)
+
+ // Add profiles up to cache size
+ labels1 := labels.FromStrings("env", "prod")
+ labels2 := labels.FromStrings("env", "dev")
+ labels3 := labels.FromStrings("env", "stage")
+
+ err = c.AppendIngest(context.Background(), &pyroscope.IncomingProfile{Labels: labels1})
+ require.NoError(t, err)
+ err = c.AppendIngest(context.Background(), &pyroscope.IncomingProfile{Labels: labels2})
+ require.NoError(t, err)
+
+ // Add one more to trigger eviction
+ err = c.AppendIngest(context.Background(), &pyroscope.IncomingProfile{Labels: labels3})
+ require.NoError(t, err)
+
+ // Verify size and that oldest entry was evicted
+ require.Equal(t, 2, c.cache.Len())
+ _, ok := c.cache.Get(toModelLabelSet(labels1).Fingerprint())
+ require.False(t, ok, "oldest entry should have been evicted")
+}
+
+type TestAppender struct {
+ mu sync.Mutex
+ profiles []*pyroscope.IncomingProfile
+}
+
+func NewTestAppender() *TestAppender {
+ return &TestAppender{
+ profiles: make([]*pyroscope.IncomingProfile, 0),
+ }
+}
+
+// Appender implements pyroscope.Appendable
+func (t *TestAppender) Appender() pyroscope.Appender {
+ return t
+}
+
+// Append implements pyroscope.Appender
+func (t *TestAppender) Append(_ context.Context, _ labels.Labels, _ []*pyroscope.RawSample) error {
+ return nil
+}
+
+// AppendIngest implements pyroscope.Appender
+func (t *TestAppender) AppendIngest(_ context.Context, profile *pyroscope.IncomingProfile) error {
+ t.mu.Lock()
+ defer t.mu.Unlock()
+ t.profiles = append(t.profiles, profile)
+ return nil
+}
+
+func (t *TestAppender) Profiles() []*pyroscope.IncomingProfile {
+ t.mu.Lock()
+ defer t.mu.Unlock()
+ return t.profiles
+}
diff --git a/internal/component/pyroscope/write/write.go b/internal/component/pyroscope/write/write.go
index 4e0af355c1..541b61d169 100644
--- a/internal/component/pyroscope/write/write.go
+++ b/internal/component/pyroscope/write/write.go
@@ -394,11 +394,14 @@ func (f *fanOutClient) AppendIngest(ctx context.Context, profile *pyroscope.Inco
// Handle labels
query := profile.URL.Query()
- if nameParam := query.Get("name"); nameParam != "" {
- ls, err := labelset.Parse(nameParam)
- if err != nil {
- return err
- }
+ if !profile.Labels.IsEmpty() {
+ ls := labelset.New(make(map[string]string))
+
+ profile.Labels.Range(func(l labels.Label) {
+ ls.Add(l.Name, l.Value)
+ })
+
+ // Add external labels (which will override any existing ones)
for k, v := range f.config.ExternalLabels {
ls.Add(k, v)
}
diff --git a/internal/component/pyroscope/write/write_test.go b/internal/component/pyroscope/write/write_test.go
index b2c6debdae..17c8d30351 100644
--- a/internal/component/pyroscope/write/write_test.go
+++ b/internal/component/pyroscope/write/write_test.go
@@ -20,6 +20,7 @@ import (
pushv1 "github.com/grafana/pyroscope/api/gen/proto/go/push/v1"
"github.com/grafana/pyroscope/api/gen/proto/go/push/v1/pushv1connect"
typesv1 "github.com/grafana/pyroscope/api/gen/proto/go/types/v1"
+ "github.com/grafana/pyroscope/api/model/labelset"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/prometheus/model/labels"
"github.com/stretchr/testify/require"
@@ -277,17 +278,26 @@ func Test_Write_AppendIngest(t *testing.T) {
return func(w http.ResponseWriter, r *http.Request) {
appendCount.Inc()
require.Equal(t, expectedPath, r.URL.Path, "Unexpected path")
+
+ // Header assertions
require.Equal(t, "endpoint-value", r.Header.Get("X-Test-Header"))
require.Equal(t, []string{"profile-value1", "profile-value2"}, r.Header["X-Profile-Header"])
- query := r.URL.Query()
- name := query.Get("name")
- require.Contains(t, name, "my.awesome.app.cpu", "Base name should be preserved")
- require.Contains(t, name, "env=prod", "External label should override profile label")
- require.Contains(t, name, "cluster=cluster-1", "External label should be added")
- require.Contains(t, name, "region=us-west-1", "Profile-only label should be preserved")
- require.Equal(t, "value", query.Get("key"), "Original query parameter should be preserved")
+ // Label assertions - parse the name parameter once
+ ls, err := labelset.Parse(r.URL.Query().Get("name"))
+ require.NoError(t, err)
+ labels := ls.Labels()
+
+ // Check each label individually
+ require.Equal(t, "my.awesome.app.cpu", labels["__name__"], "Base name should be preserved")
+ require.Equal(t, "prod", labels["env"], "External label should override profile label")
+ require.Equal(t, "cluster-1", labels["cluster"], "External label should be added")
+ require.Equal(t, "us-west-1", labels["region"], "Profile-only label should be preserved")
+
+ // Check non-label query params
+ require.Equal(t, "value", r.URL.Query().Get("key"), "Original query parameter should be preserved")
+ // Body assertion
body, err := io.ReadAll(r.Body)
require.NoError(t, err, "Failed to read request body")
require.Equal(t, testData, body, "Unexpected body content")
@@ -341,8 +351,13 @@ func Test_Write_AppendIngest(t *testing.T) {
},
URL: &url.URL{
Path: "/ingest",
- RawQuery: "name=my.awesome.app.cpu{env=staging,region=us-west-1}&key=value",
+ RawQuery: "key=value",
},
+ Labels: labels.FromMap(map[string]string{
+ "__name__": "my.awesome.app.cpu",
+ "env": "staging",
+ "region": "us-west-1",
+ }),
}
err = export.Receiver.Appender().AppendIngest(context.Background(), incomingProfile)