diff --git a/CHANGELOG.md b/CHANGELOG.md index 92ccc88bcd..ea2b042112 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,6 +14,8 @@ Main (unreleased) - Add the possibility to export span events as logs in `otelcol.connector.spanlogs`. (@steve-hb) +- Add `pyroscope.relabel` component to modify or filter profiles using Prometheus relabeling rules. (@marcsanmi) + ### Enhancements - (_Experimental_) Log instance label key in `database_observability.mysql` (@cristiangreco) diff --git a/docs/sources/reference/compatibility/_index.md b/docs/sources/reference/compatibility/_index.md index aa9a3b043c..3a22b11990 100644 --- a/docs/sources/reference/compatibility/_index.md +++ b/docs/sources/reference/compatibility/_index.md @@ -392,6 +392,7 @@ The following components, grouped by namespace, _export_ Pyroscope `ProfilesRece {{< collapse title="pyroscope" >}} +- [pyroscope.relabel](../components/pyroscope/pyroscope.relabel) - [pyroscope.write](../components/pyroscope/pyroscope.write) {{< /collapse >}} @@ -408,6 +409,7 @@ The following components, grouped by namespace, _consume_ Pyroscope `ProfilesRec - [pyroscope.ebpf](../components/pyroscope/pyroscope.ebpf) - [pyroscope.java](../components/pyroscope/pyroscope.java) - [pyroscope.receive_http](../components/pyroscope/pyroscope.receive_http) +- [pyroscope.relabel](../components/pyroscope/pyroscope.relabel) - [pyroscope.scrape](../components/pyroscope/pyroscope.scrape) {{< /collapse >}} diff --git a/docs/sources/reference/components/pyroscope/pyroscope.relabel.md b/docs/sources/reference/components/pyroscope/pyroscope.relabel.md new file mode 100644 index 0000000000..45125c9362 --- /dev/null +++ b/docs/sources/reference/components/pyroscope/pyroscope.relabel.md @@ -0,0 +1,137 @@ +--- +canonical: https://grafana.com/docs/alloy/latest/reference/components/pyroscope/pyroscope.relabel/ +aliases: + - ../pyroscope.relabel/ # /docs/alloy/latest/reference/components/pyroscope.relabel/ +description: Learn about pyroscope.relabel +title: 
pyroscope.relabel +--- + +Public preview + +# `pyroscope.relabel` + +The `pyroscope.relabel` component rewrites the label set of each profile passed to its receiver by applying one or more relabeling rules and forwards the results to the list of receivers. + +If no rules are defined or applicable to some profiles, then those profiles are forwarded as-is to each receiver passed in the component's arguments. +The profile is dropped if no labels remain after the relabeling rules are applied. + +The most common use of `pyroscope.relabel` is to filter profiles or standardize the label set that is passed to one or more downstream receivers. The `rule` blocks are applied to the label set of each profile in order of their appearance in the configuration file. + +## Usage + +```alloy +pyroscope.relabel "LABEL" { + forward_to = <RECEIVER_LIST> + + rule { + ... + } + + ... +} +``` + +## Arguments + +You can use the following arguments with `pyroscope.relabel`: + +| Name | Type | Description | Default | Required | +| ---------------- | ---------------------------- | --------------------------------------------------------- | ------- | -------- | +| `forward_to` | `list(pyroscope.Appendable)` | List of receivers to forward profiles to after relabeling | | yes | +| `max_cache_size` | `number` | Maximum number of entries in the label cache | 10000 | no | + +## Blocks + +You can use the following blocks with `pyroscope.relabel`: + +| Name | Description | Required | +| -------------- | ------------------------------------------------------ | -------- | +| [`rule`][rule] | Relabeling rules to apply to received profile entries. 
| no | + +[rule]: #rule + +### rule + +{{< docs/shared lookup="reference/components/rule-block.md" source="alloy" version="<ALLOY_VERSION>" >}} + +## Exported fields + +The following fields are exported and can be referenced by other components: + +| Name | Type | Description | +| ---------- | ------------------ | ------------------------------------------------ | +| `receiver` | `ProfilesReceiver` | A receiver that accepts profiles for relabeling. | +| `rules` | `[]relabel.Config` | The list of relabeling rules. | + +## Component health + +`pyroscope.relabel` is reported as unhealthy if it is given an invalid configuration. + +## Debug metrics + +* `pyroscope_relabel_cache_hits` (counter): Total number of cache hits. +* `pyroscope_relabel_cache_misses` (counter): Total number of cache misses. +* `pyroscope_relabel_cache_size` (gauge): Current number of entries in the relabel cache. +* `pyroscope_relabel_profiles_dropped` (counter): Total number of profiles dropped by relabeling rules. +* `pyroscope_relabel_profiles_processed` (counter): Total number of profiles processed. +* `pyroscope_relabel_profiles_written` (counter): Total number of profiles forwarded. 
+ +## Example + +```alloy +pyroscope.receive_http "default" { + forward_to = [pyroscope.relabel.filter_profiles.receiver] + + http { + listen_address = "0.0.0.0" + listen_port = 9999 + } +} + +pyroscope.relabel "filter_profiles" { + forward_to = [pyroscope.write.staging.receiver] + + // Hash the value of the `env` label into one of two buckets (0 or 1) and store it in a temporary label. + // Add more source labels to spread profiles more evenly across the buckets. + rule { + source_labels = ["env"] + target_label = "__tmp_hash" + action = "hashmod" + modulus = 2 + } + + // Drop every profile that landed in bucket 1, which keeps roughly half of the distinct source label values. + // The same combination of source label values will always hash to the same number, + // ensuring consistent sampling + rule { + source_labels = ["__tmp_hash"] + action = "drop" + regex = "^1$" + } +} + +pyroscope.write "staging" { + endpoint { + url = "http://pyroscope-staging:4040" + } +} +``` + + + +## Compatible components + +`pyroscope.relabel` can accept arguments from the following components: + +- Components that export [Pyroscope `ProfilesReceiver`](../../../compatibility/#pyroscope-profilesreceiver-exporters) + +`pyroscope.relabel` has exports that can be consumed by the following components: + +- Components that consume [Pyroscope `ProfilesReceiver`](../../../compatibility/#pyroscope-profilesreceiver-consumers) + +{{< admonition type="note" >}} +Connecting some components may not be sensible or components may require further configuration to make the connection work correctly. +Refer to the linked documentation for more details. 
+{{< /admonition >}} + + diff --git a/internal/component/all/all.go b/internal/component/all/all.go index d82838a2ab..b8df19f4d1 100644 --- a/internal/component/all/all.go +++ b/internal/component/all/all.go @@ -146,6 +146,7 @@ import ( _ "github.com/grafana/alloy/internal/component/pyroscope/ebpf" // Import pyroscope.ebpf _ "github.com/grafana/alloy/internal/component/pyroscope/java" // Import pyroscope.java _ "github.com/grafana/alloy/internal/component/pyroscope/receive_http" // Import pyroscope.receive_http + _ "github.com/grafana/alloy/internal/component/pyroscope/relabel" // Import pyroscope.relabel _ "github.com/grafana/alloy/internal/component/pyroscope/scrape" // Import pyroscope.scrape _ "github.com/grafana/alloy/internal/component/pyroscope/write" // Import pyroscope.write _ "github.com/grafana/alloy/internal/component/remote/http" // Import remote.http diff --git a/internal/component/pyroscope/appender.go b/internal/component/pyroscope/appender.go index a856e80b0b..28eb62795f 100644 --- a/internal/component/pyroscope/appender.go +++ b/internal/component/pyroscope/appender.go @@ -37,6 +37,7 @@ type IncomingProfile struct { Body io.ReadCloser Headers http.Header URL *url.URL + Labels labels.Labels } var _ Appendable = (*Fanout)(nil) diff --git a/internal/component/pyroscope/receive_http/receive_http.go b/internal/component/pyroscope/receive_http/receive_http.go index 4e969694dc..8f17b54a85 100644 --- a/internal/component/pyroscope/receive_http/receive_http.go +++ b/internal/component/pyroscope/receive_http/receive_http.go @@ -25,6 +25,7 @@ import ( pushv1 "github.com/grafana/pyroscope/api/gen/proto/go/push/v1" "github.com/grafana/pyroscope/api/gen/proto/go/push/v1/pushv1connect" typesv1 "github.com/grafana/pyroscope/api/gen/proto/go/types/v1" + "github.com/grafana/pyroscope/api/model/labelset" ) const ( @@ -218,6 +219,26 @@ func (c *Component) getAppendables() []pyroscope.Appendable { func (c *Component) handleIngest(w http.ResponseWriter, r 
*http.Request) { appendables := c.getAppendables() + // Parse labels early + var lbls labels.Labels + if nameParam := r.URL.Query().Get("name"); nameParam != "" { + ls, err := labelset.Parse(nameParam) + if err != nil { + level.Warn(c.opts.Logger).Log( + "msg", "Failed to parse labels from name parameter", + "name", nameParam, + "err", err, + ) + // Continue with empty labels instead of returning an error + } else { + var labelPairs []labels.Label + for k, v := range ls.Labels() { + labelPairs = append(labelPairs, labels.Label{Name: k, Value: v}) + } + lbls = labels.New(labelPairs...) + } + } + // Create a pipe for each appendable pipeWriters := make([]io.Writer, len(appendables)) pipeReaders := make([]io.Reader, len(appendables)) @@ -251,6 +272,7 @@ func (c *Component) handleIngest(w http.ResponseWriter, r *http.Request) { Body: io.NopCloser(pipeReaders[i]), Headers: r.Header.Clone(), URL: r.URL, + Labels: lbls, } err := appendable.Appender().AppendIngest(ctx, profile) diff --git a/internal/component/pyroscope/receive_http/receive_http_test.go b/internal/component/pyroscope/receive_http/receive_http_test.go index 255189d4a2..46a3098124 100644 --- a/internal/component/pyroscope/receive_http/receive_http_test.go +++ b/internal/component/pyroscope/receive_http/receive_http_test.go @@ -25,6 +25,7 @@ import ( pushv1 "github.com/grafana/pyroscope/api/gen/proto/go/push/v1" "github.com/grafana/pyroscope/api/gen/proto/go/push/v1/pushv1connect" typesv1 "github.com/grafana/pyroscope/api/gen/proto/go/types/v1" + "github.com/grafana/pyroscope/api/model/labelset" ) // TestForwardsProfilesIngest verifies the behavior of the @@ -44,13 +45,14 @@ func TestForwardsProfilesIngest(t *testing.T) { appendableErrors []error expectedStatus int expectedForwards int + expectedLabels map[string]string }{ { name: "Small profile", profileSize: 1024, // 1KB method: "POST", path: "/ingest", - queryParams: "name=test_app_1&from=1234567890&until=1234567900", + queryParams: 
"name=test_app&from=1234567890&until=1234567900", headers: map[string]string{"Content-Type": "application/octet-stream"}, appendableErrors: []error{nil, nil}, expectedStatus: http.StatusOK, @@ -61,7 +63,7 @@ func TestForwardsProfilesIngest(t *testing.T) { profileSize: 1024 * 1024, // 1MB method: "POST", path: "/ingest", - queryParams: "name=test_app_2&from=1234567891&until=1234567901&custom=param1", + queryParams: "name=test_app&from=1234567891&until=1234567901&custom=param1", headers: map[string]string{"X-Scope-OrgID": "1234"}, appendableErrors: []error{nil}, expectedStatus: http.StatusOK, @@ -72,7 +74,7 @@ func TestForwardsProfilesIngest(t *testing.T) { profileSize: 1024, method: "GET", path: "/ingest", - queryParams: "name=test_app_3&from=1234567892&until=1234567902", + queryParams: "name=test_app&from=1234567892&until=1234567902", headers: map[string]string{}, appendableErrors: []error{nil, nil}, expectedStatus: http.StatusMethodNotAllowed, @@ -94,7 +96,7 @@ func TestForwardsProfilesIngest(t *testing.T) { profileSize: 1024, method: "POST", path: "/invalid", - queryParams: "name=test_app_4&from=1234567893&until=1234567903", + queryParams: "name=test_app&from=1234567893&until=1234567903", headers: map[string]string{"Content-Type": "application/octet-stream"}, appendableErrors: []error{nil, nil}, expectedStatus: http.StatusNotFound, @@ -105,7 +107,7 @@ func TestForwardsProfilesIngest(t *testing.T) { profileSize: 2048, method: "POST", path: "/ingest", - queryParams: "name=test_app_5&from=1234567894&until=1234567904&scenario=all_fail", + queryParams: "name=test_app&from=1234567894&until=1234567904&scenario=all_fail", headers: map[string]string{"Content-Type": "application/octet-stream", "X-Test": "fail-all"}, appendableErrors: []error{fmt.Errorf("error1"), fmt.Errorf("error2")}, expectedStatus: http.StatusInternalServerError, @@ -116,12 +118,42 @@ func TestForwardsProfilesIngest(t *testing.T) { profileSize: 4096, method: "POST", path: "/ingest", - queryParams: 
"name=test_app_6&from=1234567895&until=1234567905&scenario=partial_failure", + queryParams: "name=test_app&from=1234567895&until=1234567905&scenario=partial_failure", headers: map[string]string{"X-Custom-ID": "test-6"}, appendableErrors: []error{fmt.Errorf("error"), nil}, expectedStatus: http.StatusInternalServerError, expectedForwards: 2, }, + { + name: "Valid labels are parsed and forwarded", + profileSize: 1024, + method: "POST", + path: "/ingest", + queryParams: "name=test.app{env=prod,region=us-east}", + headers: map[string]string{"Content-Type": "application/octet-stream"}, + appendableErrors: []error{nil, nil}, + expectedStatus: http.StatusOK, + expectedForwards: 2, + expectedLabels: map[string]string{ + "__name__": "test.app", + "env": "prod", + "region": "us-east", + }, + }, + { + name: "Invalid labels still forward profile", + profileSize: 1024, + method: "POST", + path: "/ingest", + queryParams: "name=test.app{invalid-label-syntax}", + headers: map[string]string{"Content-Type": "application/octet-stream"}, + appendableErrors: []error{nil, nil}, + expectedStatus: http.StatusOK, + expectedForwards: 2, + expectedLabels: map[string]string{ + "__name__": "test.app", // Only __name__ is preserved + }, + }, } for _, tt := range tests { @@ -136,7 +168,7 @@ func TestForwardsProfilesIngest(t *testing.T) { require.Equal(t, tt.expectedForwards, forwardedCount, "Unexpected number of forwards") if tt.expectedForwards > 0 { - verifyForwardedProfiles(t, appendables, testProfile, tt.headers, tt.queryParams) + verifyForwardedProfiles(t, appendables, testProfile, tt.headers, tt.queryParams, tt.expectedLabels) } }) } @@ -296,11 +328,26 @@ func verifyForwardedProfiles( expectedProfile []byte, expectedHeaders map[string]string, expectedQueryParams string, + expectedLabels map[string]string, ) { for i, app := range appendables { testApp, ok := app.(*testAppender) require.True(t, ok, "Appendable is not a testAppender") + // Verify labels if name parameter exists and is valid + 
if nameParam := testApp.lastProfile.URL.Query().Get("name"); nameParam != "" { + ls, err := labelset.Parse(nameParam) + if err == nil { + require.Equal(t, ls.Labels(), testApp.lastProfile.Labels.Map(), + "Labels mismatch for appendable %d", i) + } + } + + if expectedLabels != nil { + require.Equal(t, expectedLabels, testApp.lastProfile.Labels.Map(), + "Labels mismatch for appendable %d", i) + } + if testApp.lastProfile != nil { // Verify profile body body, err := io.ReadAll(testApp.lastProfile.Body) @@ -446,6 +493,7 @@ func (a *testAppender) AppendIngest(_ context.Context, profile *pyroscope.Incomi Body: io.NopCloser(&buf), Headers: profile.Headers, URL: profile.URL, + Labels: profile.Labels, } a.lastProfile = newProfile diff --git a/internal/component/pyroscope/relabel/metrics.go b/internal/component/pyroscope/relabel/metrics.go new file mode 100644 index 0000000000..f85737a957 --- /dev/null +++ b/internal/component/pyroscope/relabel/metrics.go @@ -0,0 +1,52 @@ +package relabel + +import "github.com/prometheus/client_golang/prometheus" + +type metrics struct { + profilesProcessed prometheus.Counter + profilesOutgoing prometheus.Counter + profilesDropped prometheus.Counter + cacheHits prometheus.Counter + cacheMisses prometheus.Counter + cacheSize prometheus.Gauge +} + +func newMetrics(reg prometheus.Registerer) *metrics { + m := &metrics{ + profilesProcessed: prometheus.NewCounter(prometheus.CounterOpts{ + Name: "pyroscope_relabel_profiles_processed", + Help: "Total number of profiles processed", + }), + profilesOutgoing: prometheus.NewCounter(prometheus.CounterOpts{ + Name: "pyroscope_relabel_profiles_written", + Help: "Total number of profiles forwarded", + }), + profilesDropped: prometheus.NewCounter(prometheus.CounterOpts{ + Name: "pyroscope_relabel_profiles_dropped", + Help: "Total number of profiles dropped by relabeling rules", + }), + cacheHits: prometheus.NewCounter(prometheus.CounterOpts{ + Name: "pyroscope_relabel_cache_hits", + Help: "Total number of 
cache hits", + }), + cacheMisses: prometheus.NewCounter(prometheus.CounterOpts{ + Name: "pyroscope_relabel_cache_misses", + Help: "Total number of cache misses", + }), + cacheSize: prometheus.NewGauge(prometheus.GaugeOpts{ + Name: "pyroscope_relabel_cache_size", + Help: "Total size of relabel cache", + }), + } + + reg.MustRegister( + m.profilesProcessed, + m.profilesOutgoing, + m.profilesDropped, + m.cacheHits, + m.cacheMisses, + m.cacheSize, + ) + + return m +} diff --git a/internal/component/pyroscope/relabel/relabel.go b/internal/component/pyroscope/relabel/relabel.go new file mode 100644 index 0000000000..e991c68d33 --- /dev/null +++ b/internal/component/pyroscope/relabel/relabel.go @@ -0,0 +1,338 @@ +// Package relabel provides label manipulation for Pyroscope profiles. +// +// Label Handling: +// The component handles two types of label representations: +// - labels.Labels ([]Label): Used by Pyroscope and relabeling logic +// - model.LabelSet (map[string]string): Used for efficient fingerprinting and cache lookups +// +// Cache Implementation: +// The cache uses model.LabelSet's fingerprinting to store label sets efficiently. +// Each cache entry contains both the original and relabeled labels to handle collisions +// and avoid recomputing relabeling rules. 
+package relabel
+
+import (
+	"context"
+	"fmt"
+	"reflect"
+	"sort"
+	"sync"
+	"sync/atomic"
+
+	"github.com/grafana/alloy/internal/component"
+	alloy_relabel "github.com/grafana/alloy/internal/component/common/relabel"
+	"github.com/grafana/alloy/internal/component/pyroscope"
+	"github.com/grafana/alloy/internal/featuregate"
+	"github.com/grafana/alloy/internal/runtime/logging/level"
+	"github.com/grafana/pyroscope/api/model/labelset"
+
+	lru "github.com/hashicorp/golang-lru/v2"
+	"github.com/prometheus/common/model"
+	"github.com/prometheus/prometheus/model/labels"
+	"github.com/prometheus/prometheus/model/relabel"
+)
+
+// init registers the pyroscope.relabel component at public-preview stability;
+// the Build callback constructs instances through New.
+func init() {
+	component.Register(component.Registration{
+		Name:      "pyroscope.relabel",
+		Stability: featuregate.StabilityPublicPreview,
+		Args:      Arguments{},
+		Exports:   Exports{},
+		Build: func(opts component.Options, args component.Arguments) (component.Component, error) {
+			return New(opts, args.(Arguments))
+		},
+	})
+}
+
+// Arguments holds the configuration accepted by the pyroscope.relabel
+// component.
+type Arguments struct {
+	// Where the relabeled profiles should be forwarded to.
+	ForwardTo []pyroscope.Appendable `alloy:"forward_to,attr"`
+	// The relabeling rules to apply to each profile's label set before it's forwarded.
+	RelabelConfigs []*alloy_relabel.Config `alloy:"rule,block,optional"`
+	// The maximum number of items to hold in the component's LRU cache.
+	MaxCacheSize int `alloy:"max_cache_size,attr,optional"`
+}
+
+// DefaultArguments provides the default arguments for the pyroscope.relabel
+// component.
+var DefaultArguments = Arguments{
+	MaxCacheSize: 10_000,
+}
+
+// SetToDefault implements syntax.Defaulter.
+func (a *Arguments) SetToDefault() {
+	*a = DefaultArguments
+}
+
+// Exports holds values which are exported by the pyroscope.relabel component.
+type Exports struct {
+	// Receiver is the appendable that other components send profiles to.
+	Receiver pyroscope.Appendable `alloy:"receiver,attr"`
+	// Rules re-exports the configured relabeling rules.
+	Rules alloy_relabel.Rules `alloy:"rules,attr"`
+}
+
+// Component implements the pyroscope.relabel component.
+type Component struct {
+	opts    component.Options
+	metrics *metrics
+	// mut is taken exclusively by Update and shared by Append/AppendIngest.
+	mut    sync.RWMutex
+	rcs    []*relabel.Config
+	fanout *pyroscope.Fanout
+	cache  *lru.Cache[model.Fingerprint, []cacheItem]
+	// cacheMut serializes the read-modify-write of a cache bucket. Appenders
+	// only hold mut for reading, so several of them can miss the cache at
+	// the same time.
+	cacheMut     sync.Mutex
+	maxCacheSize int
+	exited       atomic.Bool
+}
+
+var (
+	_ component.Component = (*Component)(nil)
+)
+
+// New creates a new pyroscope.relabel component.
+//
+// It returns an error when the LRU cache cannot be created (lru.New rejects
+// non-positive sizes) or when the initial Update fails.
+func New(o component.Options, args Arguments) (*Component, error) {
+	cache, err := lru.New[model.Fingerprint, []cacheItem](args.MaxCacheSize)
+	if err != nil {
+		return nil, err
+	}
+
+	c := &Component{
+		opts:         o,
+		metrics:      newMetrics(o.Registerer),
+		cache:        cache,
+		maxCacheSize: args.MaxCacheSize,
+	}
+
+	c.fanout = pyroscope.NewFanout(args.ForwardTo, o.ID, o.Registerer)
+
+	o.OnStateChange(Exports{
+		Receiver: c,
+		Rules:    args.RelabelConfigs,
+	})
+
+	if err := c.Update(args); err != nil {
+		return nil, err
+	}
+
+	return c, nil
+}
+
+// Run blocks until ctx is done and then marks the component as exited, after
+// which Append and AppendIngest reject new profiles.
+func (c *Component) Run(ctx context.Context) error {
+	defer c.exited.Store(true)
+	<-ctx.Done()
+	return nil
+}
+
+// Update applies new arguments: it swaps the relabel rules (purging the cache
+// when they changed), resizes the cache when max_cache_size changed, updates
+// the downstream receivers and re-publishes the exports.
+func (c *Component) Update(args component.Arguments) error {
+	c.mut.Lock()
+	defer c.mut.Unlock()
+
+	newArgs := args.(Arguments)
+	newRCS := alloy_relabel.ComponentToPromRelabelConfigs(newArgs.RelabelConfigs)
+
+	// If relabeling rules changed, purge the cache
+	if relabelingChanged(c.rcs, newRCS) {
+		level.Debug(c.opts.Logger).Log("msg", "received new relabel configs, purging cache")
+		c.cache.Purge()
+	}
+
+	if newArgs.MaxCacheSize != c.maxCacheSize {
+		evicted := c.cache.Resize(newArgs.MaxCacheSize)
+		if evicted > 0 {
+			level.Debug(c.opts.Logger).Log("msg", "resizing cache led to evicting items", "evicted_count", evicted)
+		}
+		c.maxCacheSize = newArgs.MaxCacheSize
+	}
+
+	// Both a purge and a shrinking resize change the number of cached
+	// entries, so refresh the gauge after either of them may have run.
+	c.metrics.cacheSize.Set(float64(c.cache.Len()))
+
+	c.rcs = newRCS
+	c.fanout.UpdateChildren(newArgs.ForwardTo)
+
+	c.opts.OnStateChange(Exports{
+		Receiver: c,
+		Rules:    newArgs.RelabelConfigs,
+	})
+
+	return nil
+}
+
+// Append relabels lbls and forwards samples with the resulting label set.
+// It returns nil without forwarding when the rules drop the profile, and an
+// error when the component has already exited.
+func (c *Component) Append(ctx context.Context, lbls labels.Labels, samples []*pyroscope.RawSample) error {
+	if c.exited.Load() {
+		return fmt.Errorf("%s has exited", c.opts.ID)
+	}
+
+	c.mut.RLock()
+	defer c.mut.RUnlock()
+
+	c.metrics.profilesProcessed.Inc()
+
+	newLabels, keep, err := c.relabel(lbls)
+	if err != nil {
+		return err
+	}
+	if !keep {
+		c.metrics.profilesDropped.Inc()
+		level.Debug(c.opts.Logger).Log("msg", "profile dropped by relabel rules", "labels", lbls.String())
+		return nil
+	}
+
+	c.metrics.profilesOutgoing.Inc()
+	return c.fanout.Appender().Append(ctx, newLabels, samples)
+}
+
+// AppendIngest relabels profile.Labels and forwards the profile. Profiles
+// that arrive without labels are forwarded untouched. It returns nil without
+// forwarding when the rules drop the profile, and an error when the component
+// has already exited.
+func (c *Component) AppendIngest(ctx context.Context, profile *pyroscope.IncomingProfile) error {
+	if c.exited.Load() {
+		return fmt.Errorf("%s has exited", c.opts.ID)
+	}
+
+	c.mut.RLock()
+	defer c.mut.RUnlock()
+
+	c.metrics.profilesProcessed.Inc()
+
+	if profile.Labels.IsEmpty() {
+		c.metrics.profilesOutgoing.Inc()
+		return c.fanout.Appender().AppendIngest(ctx, profile)
+	}
+
+	newLabels, keep, err := c.relabel(profile.Labels)
+	if err != nil {
+		return fmt.Errorf("processing labels: %w", err)
+	}
+	if !keep {
+		c.metrics.profilesDropped.Inc()
+		level.Debug(c.opts.Logger).Log("msg", "profile dropped by relabel rules")
+		return nil
+	}
+
+	profile.Labels = newLabels
+	c.metrics.profilesOutgoing.Inc()
+	return c.fanout.Appender().AppendIngest(ctx, profile)
+}
+
+// Appender returns the component itself, which implements pyroscope.Appender.
+func (c *Component) Appender() pyroscope.Appender {
+	return c
+}
+
+// cacheItem is one memoized relabeling result. Several items can share a
+// cache key when their fingerprints collide, so original is kept to tell
+// them apart.
+type cacheItem struct {
+	original  model.LabelSet
+	relabeled model.LabelSet
+	// dropped records that a rule rejected the profile. It is stored
+	// explicitly because an empty relabeled set alone is ambiguous: labeldrop
+	// can remove every label from a profile that is still kept.
+	dropped bool
+}
+
+// relabel applies the configured relabeling rules to the input labels.
+// It returns the new labels, whether to keep the profile, and any error
+// (currently always nil). Results are memoized per input label set; counting
+// dropped profiles is left to the callers so each drop is counted once.
+func (c *Component) relabel(lbls labels.Labels) (labels.Labels, bool, error) {
+	labelSet := toModelLabelSet(lbls)
+	hash := labelSet.Fingerprint()
+
+	// Check cache
+	if result, keep, found := c.getCacheEntry(hash, labelSet); found {
+		return result, keep, nil
+	}
+
+	// Apply relabeling
+	builder := labels.NewBuilder(lbls)
+	if keep := relabel.ProcessBuilder(builder, c.rcs...); !keep {
+		c.storeCacheItem(hash, cacheItem{
+			original:  labelSet,
+			relabeled: toModelLabelSet(labels.EmptyLabels()),
+			dropped:   true,
+		})
+		return labels.EmptyLabels(), false, nil
+	}
+
+	newLabels := builder.Labels()
+
+	// Cache result
+	c.addToCache(hash, labelSet, newLabels)
+
+	return newLabels, true, nil
+}
+
+// getCacheEntry looks up a memoized result for labelSet under hash. It
+// returns the relabeled labels, the keep decision, and whether an entry was
+// found, and updates the cache hit/miss counters.
+func (c *Component) getCacheEntry(hash model.Fingerprint, labelSet model.LabelSet) (labels.Labels, bool, bool) {
+	if val, ok := c.cache.Get(hash); ok {
+		for _, item := range val {
+			if labelSet.Equal(item.original) {
+				c.metrics.cacheHits.Inc()
+				if item.dropped {
+					return labels.EmptyLabels(), false, true
+				}
+				return toLabelsLabels(item.relabeled), true, true
+			}
+		}
+	}
+	c.metrics.cacheMisses.Inc()
+	return labels.EmptyLabels(), false, false
+}
+
+// addToCache memoizes a kept relabeling result for original under hash.
+func (c *Component) addToCache(hash model.Fingerprint, original model.LabelSet, relabeled labels.Labels) {
+	c.storeCacheItem(hash, cacheItem{
+		original:  original,
+		relabeled: toModelLabelSet(relabeled),
+	})
+}
+
+// storeCacheItem appends item to the bucket stored under hash and refreshes
+// the cache size gauge. It is a no-op when the bucket already holds an item
+// for the same original label set.
+func (c *Component) storeCacheItem(hash model.Fingerprint, item cacheItem) {
+	c.cacheMut.Lock()
+	defer c.cacheMut.Unlock()
+
+	bucket, _ := c.cache.Get(hash)
+	for _, existing := range bucket {
+		if existing.original.Equal(item.original) {
+			// A concurrent appender cached this label set first.
+			return
+		}
+	}
+	// The three-index slice forces append to allocate a new backing array,
+	// so readers that already fetched the old bucket never see it change.
+	bucket = append(bucket[:len(bucket):len(bucket)], item)
+	c.cache.Add(hash, bucket)
+	c.metrics.cacheSize.Set(float64(c.cache.Len()))
+}
+
+// extractLabelsFromIncomingProfile converts profile labels to Prometheus labels.
+// It parses the "name" query parameter of the profile URL and returns empty
+// labels when the parameter is absent.
+func extractLabelsFromIncomingProfile(profile *pyroscope.IncomingProfile) (labels.Labels, error) {
+	nameParam := profile.URL.Query().Get("name")
+	if nameParam == "" {
+		return labels.Labels{}, nil
+	}
+
+	ls, err := labelset.Parse(nameParam)
+	if err != nil {
+		return labels.Labels{}, fmt.Errorf("parsing labels from name parameter: %w", err)
+	}
+
+	var lbls []labels.Label
+	for k, v := range ls.Labels() {
+		lbls = append(lbls, labels.Label{Name: k, Value: v})
+	}
+	return labels.New(lbls...), nil
+}
+
+// updateProfileLabels converts Prometheus labels back to pyroscope format
+func updateProfileLabels(profile *pyroscope.IncomingProfile, lbls labels.Labels) {
+	newLS := labelset.New(make(map[string]string))
+	for _, l := 
range lbls { + newLS.Add(l.Name, l.Value) + } + + query := profile.URL.Query() + query.Set("name", newLS.Normalized()) + profile.URL.RawQuery = query.Encode() +} + +// Helper function to detect relabel config changes +func relabelingChanged(prev, next []*relabel.Config) bool { + if len(prev) != len(next) { + return true + } + for i := range prev { + if !reflect.DeepEqual(prev[i], next[i]) { + return true + } + } + return false +} + +// toModelLabelSet converts labels.Labels to model.LabelSet +func toModelLabelSet(lbls labels.Labels) model.LabelSet { + labelSet := make(model.LabelSet, lbls.Len()) + lbls.Range(func(l labels.Label) { + labelSet[model.LabelName(l.Name)] = model.LabelValue(l.Value) + }) + return labelSet +} + +// toLabelsLabels converts model.LabelSet to labels.Labels +func toLabelsLabels(ls model.LabelSet) labels.Labels { + result := make(labels.Labels, 0, len(ls)) + for name, value := range ls { + result = append(result, labels.Label{ + Name: string(name), + Value: string(value), + }) + } + // Labels need to be sorted + sort.Sort(result) + return result +} diff --git a/internal/component/pyroscope/relabel/relabel_test.go b/internal/component/pyroscope/relabel/relabel_test.go new file mode 100644 index 0000000000..05fea6f3c3 --- /dev/null +++ b/internal/component/pyroscope/relabel/relabel_test.go @@ -0,0 +1,343 @@ +package relabel + +import ( + "context" + "errors" + "sync" + "testing" + + "github.com/grafana/alloy/internal/component" + alloy_relabel "github.com/grafana/alloy/internal/component/common/relabel" + "github.com/grafana/alloy/internal/component/pyroscope" + "github.com/grafana/alloy/internal/util" + "github.com/grafana/pyroscope/api/model/labelset" + "github.com/grafana/regexp" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/prometheus/model/labels" + "github.com/stretchr/testify/require" +) + +func TestRelabeling(t *testing.T) { + tests := []struct { + name string + rules []*alloy_relabel.Config + inputLabels 
labels.Labels + wantLabels labels.Labels + wantDropped bool + }{ + { + name: "basic profile without labels", + rules: []*alloy_relabel.Config{ + { + SourceLabels: []string{"foo"}, + TargetLabel: "bar", + Action: "replace", + Regex: alloy_relabel.Regexp{Regexp: regexp.MustCompile("(.+)")}, + Replacement: "$1", + }, + }, + inputLabels: labels.EmptyLabels(), + wantLabels: labels.EmptyLabels(), + wantDropped: false, + }, + { + name: "rename label", + rules: []*alloy_relabel.Config{ + { + SourceLabels: []string{"foo"}, + TargetLabel: "bar", + Action: "replace", + Regex: alloy_relabel.Regexp{Regexp: regexp.MustCompile("(.+)")}, + Replacement: "$1", + }, + { + Action: "labeldrop", + Regex: alloy_relabel.Regexp{Regexp: regexp.MustCompile("foo")}, + }, + }, + inputLabels: labels.FromStrings("foo", "hello"), + wantLabels: labels.FromStrings("bar", "hello"), + wantDropped: false, + }, + { + name: "drop profile with matching drop label", + rules: []*alloy_relabel.Config{{ + SourceLabels: []string{"env"}, + Action: "drop", + Regex: alloy_relabel.Regexp{Regexp: regexp.MustCompile("dev")}, + }}, + inputLabels: labels.FromStrings("env", "dev", "region", "us-1"), + wantLabels: labels.EmptyLabels(), + wantDropped: true, + }, + { + name: "keep profile with matching label", + rules: []*alloy_relabel.Config{{ + SourceLabels: []string{"env"}, + Action: "keep", + Regex: alloy_relabel.Regexp{Regexp: regexp.MustCompile("prod")}, + }}, + inputLabels: labels.FromStrings("env", "prod", "region", "us-1"), + wantLabels: labels.FromStrings("env", "prod", "region", "us-1"), + wantDropped: false, + }, + { + name: "drop profile not matching keep", + rules: []*alloy_relabel.Config{{ + SourceLabels: []string{"env"}, + Action: "keep", + Regex: alloy_relabel.Regexp{Regexp: regexp.MustCompile("prod")}, + }}, + inputLabels: labels.FromStrings("env", "dev", "region", "us-1"), + wantLabels: labels.EmptyLabels(), + wantDropped: true, + }, + { + name: "drop all labels not dropping profile", + rules: 
[]*alloy_relabel.Config{ + { + Action: "labeldrop", + Regex: alloy_relabel.Regexp{Regexp: regexp.MustCompile("env|region")}, + }, + }, + inputLabels: labels.FromStrings("env", "prod", "region", "us-1"), + wantLabels: labels.EmptyLabels(), + wantDropped: false, + }, + { + name: "keep profile with no labels when using drop action", + rules: []*alloy_relabel.Config{ + { + SourceLabels: []string{"env"}, + Action: "drop", + Regex: alloy_relabel.Regexp{Regexp: regexp.MustCompile("dev")}, + }, + }, + inputLabels: labels.EmptyLabels(), + wantLabels: labels.EmptyLabels(), + wantDropped: false, + }, + { + name: "hashmod sampling with profile that should pass", + rules: []*alloy_relabel.Config{ + { + SourceLabels: []string{"env"}, + Action: "hashmod", + Modulus: 2, + TargetLabel: "__tmp_hash", + }, + { + SourceLabels: []string{"__tmp_hash"}, + Action: "drop", + Regex: alloy_relabel.Regexp{Regexp: regexp.MustCompile("^1$")}, + }, + }, + // Use a value we know will hash to 1 + inputLabels: labels.FromStrings("env", "prod", "region", "us-1"), + wantLabels: labels.EmptyLabels(), + wantDropped: true, + }, + { + name: "multiple rules", + rules: []*alloy_relabel.Config{ + { + SourceLabels: []string{"env"}, + TargetLabel: "environment", + Action: "replace", + Regex: alloy_relabel.Regexp{Regexp: regexp.MustCompile("(.+)")}, + Replacement: "$1", + }, + { + Action: "labeldrop", + Regex: alloy_relabel.Regexp{Regexp: regexp.MustCompile("^env$")}, + }, + { + SourceLabels: []string{"region"}, + TargetLabel: "zone", + Action: "replace", + Regex: alloy_relabel.Regexp{Regexp: regexp.MustCompile("us-(.+)")}, + Replacement: "zone-$1", + }, + }, + inputLabels: labels.FromStrings("env", "prod", "region", "us-1"), + wantLabels: labels.FromStrings("environment", "prod", "region", "us-1", "zone", "zone-1"), + wantDropped: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + app := NewTestAppender() + + c, err := New(component.Options{ + Logger: util.TestLogger(t), + 
Registerer: prometheus.NewRegistry(), + OnStateChange: func(e component.Exports) {}, + }, Arguments{ + ForwardTo: []pyroscope.Appendable{app}, + RelabelConfigs: tt.rules, + MaxCacheSize: 10, + }) + require.NoError(t, err) + + profile := &pyroscope.IncomingProfile{ + Labels: tt.inputLabels, + } + + err = c.AppendIngest(context.Background(), profile) + + profiles := app.Profiles() + + if tt.wantDropped { + if errors.Is(err, labelset.ErrServiceNameIsRequired) { + require.Empty(t, profiles, "profile should have been dropped") + return + } + require.NoError(t, err) + require.Empty(t, profiles, "profile should have been dropped") + return + } + + gotProfile := app.Profiles()[0] + require.Equal(t, tt.wantLabels, gotProfile.Labels) + }) + } +} + +func TestCache(t *testing.T) { + app := NewTestAppender() + c, err := New(component.Options{ + Logger: util.TestLogger(t), + Registerer: prometheus.NewRegistry(), + OnStateChange: func(e component.Exports) {}, + }, Arguments{ + ForwardTo: []pyroscope.Appendable{app}, + RelabelConfigs: []*alloy_relabel.Config{{ + SourceLabels: []string{"env"}, + Action: "replace", + TargetLabel: "environment", + Regex: alloy_relabel.Regexp{Regexp: regexp.MustCompile("(.+)")}, + Replacement: "staging", + }}, + MaxCacheSize: 4, + }) + require.NoError(t, err) + + // Test basic cache functionality + labels := labels.FromStrings("env", "prod") + err = c.AppendIngest(context.Background(), &pyroscope.IncomingProfile{Labels: labels}) + require.NoError(t, err) + require.Equal(t, 1, c.cache.Len(), "cache should have 1 entry") + + // Test cache hit + err = c.AppendIngest(context.Background(), &pyroscope.IncomingProfile{Labels: labels}) + require.NoError(t, err) + require.Equal(t, 1, c.cache.Len(), "cache length should not change after hit") +} + +func TestCacheCollisions(t *testing.T) { + app := NewTestAppender() + c, err := New(component.Options{ + Logger: util.TestLogger(t), + Registerer: prometheus.NewRegistry(), + OnStateChange: func(e component.Exports) 
{}, + }, Arguments{ + ForwardTo: []pyroscope.Appendable{app}, + RelabelConfigs: []*alloy_relabel.Config{}, + MaxCacheSize: 4, + }) + require.NoError(t, err) + + // These LabelSets are known to collide + ls1 := labels.FromStrings("A", "K6sjsNNczPl", "__name__", "app.cpu") + ls2 := labels.FromStrings("A", "cswpLMIZpwt", "__name__", "app.cpu") + + // Verify collision + require.Equal(t, toModelLabelSet(ls1).Fingerprint(), toModelLabelSet(ls2).Fingerprint(), + "expected labelset fingerprints to collide") + + // Add both colliding profiles + err = c.AppendIngest(context.Background(), &pyroscope.IncomingProfile{Labels: ls1}) + require.NoError(t, err) + err = c.AppendIngest(context.Background(), &pyroscope.IncomingProfile{Labels: ls2}) + require.NoError(t, err) + + // Verify both are stored under same hash + hash := toModelLabelSet(ls1).Fingerprint() + val, ok := c.cache.Get(hash) + require.True(t, ok, "colliding entry should be in cache") + require.Len(t, val, 2, "should have both colliding items under same key") + + // Verify items are stored correctly + require.Equal(t, toModelLabelSet(ls1), val[0].original, "first item should match ls1") + require.Equal(t, toModelLabelSet(ls2), val[1].original, "second item should match ls2") +} + +func TestCacheLRU(t *testing.T) { + app := NewTestAppender() + c, err := New(component.Options{ + Logger: util.TestLogger(t), + Registerer: prometheus.NewRegistry(), + OnStateChange: func(e component.Exports) {}, + }, Arguments{ + ForwardTo: []pyroscope.Appendable{app}, + RelabelConfigs: []*alloy_relabel.Config{}, + MaxCacheSize: 2, + }) + require.NoError(t, err) + + // Add profiles up to cache size + labels1 := labels.FromStrings("env", "prod") + labels2 := labels.FromStrings("env", "dev") + labels3 := labels.FromStrings("env", "stage") + + err = c.AppendIngest(context.Background(), &pyroscope.IncomingProfile{Labels: labels1}) + require.NoError(t, err) + err = c.AppendIngest(context.Background(), &pyroscope.IncomingProfile{Labels: labels2}) 
+ require.NoError(t, err) + + // Add one more to trigger eviction + err = c.AppendIngest(context.Background(), &pyroscope.IncomingProfile{Labels: labels3}) + require.NoError(t, err) + + // Verify size and that oldest entry was evicted + require.Equal(t, 2, c.cache.Len()) + _, ok := c.cache.Get(toModelLabelSet(labels1).Fingerprint()) + require.False(t, ok, "oldest entry should have been evicted") +} + +type TestAppender struct { + mu sync.Mutex + profiles []*pyroscope.IncomingProfile +} + +func NewTestAppender() *TestAppender { + return &TestAppender{ + profiles: make([]*pyroscope.IncomingProfile, 0), + } +} + +// Appender implements pyroscope.Appendable +func (t *TestAppender) Appender() pyroscope.Appender { + return t +} + +// Append implements pyroscope.Appender +func (t *TestAppender) Append(_ context.Context, _ labels.Labels, _ []*pyroscope.RawSample) error { + return nil +} + +// AppendIngest implements pyroscope.Appender +func (t *TestAppender) AppendIngest(_ context.Context, profile *pyroscope.IncomingProfile) error { + t.mu.Lock() + defer t.mu.Unlock() + t.profiles = append(t.profiles, profile) + return nil +} + +func (t *TestAppender) Profiles() []*pyroscope.IncomingProfile { + t.mu.Lock() + defer t.mu.Unlock() + return t.profiles +} diff --git a/internal/component/pyroscope/write/write.go b/internal/component/pyroscope/write/write.go index 4e0af355c1..541b61d169 100644 --- a/internal/component/pyroscope/write/write.go +++ b/internal/component/pyroscope/write/write.go @@ -394,11 +394,14 @@ func (f *fanOutClient) AppendIngest(ctx context.Context, profile *pyroscope.Inco // Handle labels query := profile.URL.Query() - if nameParam := query.Get("name"); nameParam != "" { - ls, err := labelset.Parse(nameParam) - if err != nil { - return err - } + if !profile.Labels.IsEmpty() { + ls := labelset.New(make(map[string]string)) + + profile.Labels.Range(func(l labels.Label) { + ls.Add(l.Name, l.Value) + }) + + // Add external labels (which will override any existing 
ones) for k, v := range f.config.ExternalLabels { ls.Add(k, v) } diff --git a/internal/component/pyroscope/write/write_test.go b/internal/component/pyroscope/write/write_test.go index b2c6debdae..17c8d30351 100644 --- a/internal/component/pyroscope/write/write_test.go +++ b/internal/component/pyroscope/write/write_test.go @@ -20,6 +20,7 @@ import ( pushv1 "github.com/grafana/pyroscope/api/gen/proto/go/push/v1" "github.com/grafana/pyroscope/api/gen/proto/go/push/v1/pushv1connect" typesv1 "github.com/grafana/pyroscope/api/gen/proto/go/types/v1" + "github.com/grafana/pyroscope/api/model/labelset" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/prometheus/model/labels" "github.com/stretchr/testify/require" @@ -277,17 +278,26 @@ func Test_Write_AppendIngest(t *testing.T) { return func(w http.ResponseWriter, r *http.Request) { appendCount.Inc() require.Equal(t, expectedPath, r.URL.Path, "Unexpected path") + + // Header assertions require.Equal(t, "endpoint-value", r.Header.Get("X-Test-Header")) require.Equal(t, []string{"profile-value1", "profile-value2"}, r.Header["X-Profile-Header"]) - query := r.URL.Query() - name := query.Get("name") - require.Contains(t, name, "my.awesome.app.cpu", "Base name should be preserved") - require.Contains(t, name, "env=prod", "External label should override profile label") - require.Contains(t, name, "cluster=cluster-1", "External label should be added") - require.Contains(t, name, "region=us-west-1", "Profile-only label should be preserved") - require.Equal(t, "value", query.Get("key"), "Original query parameter should be preserved") + // Label assertions - parse the name parameter once + ls, err := labelset.Parse(r.URL.Query().Get("name")) + require.NoError(t, err) + labels := ls.Labels() + + // Check each label individually + require.Equal(t, "my.awesome.app.cpu", labels["__name__"], "Base name should be preserved") + require.Equal(t, "prod", labels["env"], "External label should override profile label") + 
require.Equal(t, "cluster-1", labels["cluster"], "External label should be added") + require.Equal(t, "us-west-1", labels["region"], "Profile-only label should be preserved") + + // Check non-label query params + require.Equal(t, "value", r.URL.Query().Get("key"), "Original query parameter should be preserved") + // Body assertion body, err := io.ReadAll(r.Body) require.NoError(t, err, "Failed to read request body") require.Equal(t, testData, body, "Unexpected body content") @@ -341,8 +351,13 @@ func Test_Write_AppendIngest(t *testing.T) { }, URL: &url.URL{ Path: "/ingest", - RawQuery: "name=my.awesome.app.cpu{env=staging,region=us-west-1}&key=value", + RawQuery: "key=value", }, + Labels: labels.FromMap(map[string]string{ + "__name__": "my.awesome.app.cpu", + "env": "staging", + "region": "us-west-1", + }), } err = export.Receiver.Appender().AppendIngest(context.Background(), incomingProfile)