From 03b13c5d9b7eccee668791d04b00dcb1a0e9bc65 Mon Sep 17 00:00:00 2001 From: Paulin Todev Date: Mon, 11 Nov 2024 11:35:26 +0000 Subject: [PATCH] Prototype targets.merge function (array.combine_maps) (#1826) * Prototype map.inner_join function * Rename to "targets.merge", remove unnecessary params * Add failure tests * Delete incorrect comment * rename targets.merge to array.combine_maps, mark it as experimental, make is more permissive and add tests * update changelog * Update CHANGELOG.md Co-authored-by: Piotr <17101802+thampiotr@users.noreply.github.com> * Update docs/sources/reference/stdlib/array.md Co-authored-by: Piotr <17101802+thampiotr@users.noreply.github.com> * Update docs/sources/reference/stdlib/array.md Co-authored-by: Piotr <17101802+thampiotr@users.noreply.github.com> * Update docs/sources/reference/stdlib/array.md Co-authored-by: Piotr <17101802+thampiotr@users.noreply.github.com> * rename mapCombine to combineMaps * document panic * add equal test * add more tests * Update syntax/internal/stdlib/stdlib.go Co-authored-by: Piotr <17101802+thampiotr@users.noreply.github.com> * Update syntax/internal/stdlib/stdlib.go Co-authored-by: Piotr <17101802+thampiotr@users.noreply.github.com> * add examples in doc * fix error propagation * remove value nul on len function * refactor code into a traversal function * update doc to avoid modifying the experimental shared doc --------- Co-authored-by: William Dumont Co-authored-by: Piotr <17101802+thampiotr@users.noreply.github.com> --- CHANGELOG.md | 3 + docs/sources/reference/stdlib/array.md | 51 ++++++ .../controller/component_references.go | 24 ++- .../runtime/internal/controller/loader.go | 2 +- syntax/internal/stdlib/stdlib.go | 145 +++++++++++++++++- syntax/internal/value/value.go | 15 +- syntax/internal/value/value_test.go | 6 + syntax/vm/error.go | 3 + syntax/vm/vm.go | 6 + syntax/vm/vm_stdlib_test.go | 133 ++++++++++++++++ 10 files changed, 383 insertions(+), 5 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 0c8019e348..b2b15938fb 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -33,6 +33,7 @@ v1.5.0-rc.0 ### Features - Add the function `path_join` to the stdlib. (@wildum) + - Add `pyroscope.receive_http` component to receive and forward Pyroscope profiles (@marcsanmi) - Add support to `loki.source.syslog` for the RFC3164 format ("BSD syslog"). (@sushain97) @@ -47,6 +48,8 @@ v1.5.0-rc.0 - (_Experimental_) Add a `prometheus.write.queue` component to add an alternative to `prometheus.remote_write` which allowing the writing of metrics to a prometheus endpoint. (@mattdurham) +- (_Experimental_) Add the `arrary.combine_maps` function to the stdlib. (@ptodev, @wildum) + ### Enhancements - The `mimir.rules.kubernetes` component now supports adding extra label matchers diff --git a/docs/sources/reference/stdlib/array.md b/docs/sources/reference/stdlib/array.md index 482cc60e65..b9fb947bdc 100644 --- a/docs/sources/reference/stdlib/array.md +++ b/docs/sources/reference/stdlib/array.md @@ -32,3 +32,54 @@ Elements within the list can be any type. > array.concat([[1, 2], [3, 4]], [[5, 6]]) [[1, 2], [3, 4], [5, 6]] ``` + +## array.combine_maps + +> **EXPERIMENTAL**: This is an [experimental][] feature. Experimental +> features are subject to frequent breaking changes, and may be removed with +> no equivalent replacement. The `stability.level` flag must be set to `experimental` +> to use the feature. + +The `array.combine_maps` function allows you to join two arrays of maps if certain keys have matching values in both maps. It's particularly useful when combining labels of targets coming from different `prometheus.discovery.*` or `prometheus.exporter.*` components. +It takes three arguments: + +* The first two arguments are a of type `list(map(string))`. The keys of the map are strings. + The value for each key could be of any Alloy type such as a `string`, `integer`, `map`, or a `capsule`. +* The third input is an `array` containing strings. The strings are the keys whose value has to match for maps to be combined. + +The maps that don't contain all the keys provided in the third argument will be discarded. When maps are combined and both contain the same keys, the last value from the second argument will be used. + +Pseudo function code: +``` +for every map in arg1: + for every map in arg2: + if the condition key matches in both: + merge maps and add to result +``` + +### Examples + +```alloy +> array.combine_maps([{"instance"="1.1.1.1", "team"="A"}], [{"instance"="1.1.1.1", "cluster"="prod"}], ["instance"]) +[{"instance"="1.1.1.1", "team"="A", "cluster"="prod"}] + +// Second map overrides the team in the first map +> array.combine_maps([{"instance"="1.1.1.1", "team"="A"}], [{"instance"="1.1.1.1", "team"="B"}], ["instance"]) +[{"instance"="1.1.1.1", "team"="B"}] + +// If multiple maps from the first argument match with multiple maps from the second argument, different combinations will be created. +> array.combine_maps([{"instance"="1.1.1.1", "team"="A"}, {"instance"="1.1.1.1", "team"="B"}], [{"instance"="1.1.1.1", "cluster"="prod"}, {"instance"="1.1.1.1", "cluster"="ops"}], ["instance"]) +[{"instance"="1.1.1.1", "team"="A", "cluster"="prod"}, {"instance"="1.1.1.1", "team"="A", "cluster"="ops"}, {"instance"="1.1.1.1", "team"="B", "cluster"="prod"}, {"instance"="1.1.1.1", "team"="B", "cluster"="ops"}] +``` + +Examples using discovery and exporter components: +```alloy +> array.combine_maps(discovery.kubernetes.k8s_pods.targets, prometheus.exporter.postgres, ["instance"]) + +> array.combine_maps(prometheus.exporter.redis.default.targets, [{"instance"="1.1.1.1", "testLabelKey" = "testLabelVal"}], ["instance"]) +``` + +You can find more examples in the [tests][]. + +[tests]: https://github.com/grafana/alloy/blob/main/syntax/vm/vm_stdlib_test.go +[experimental]: https://grafana.com/docs/release-life-cycle/ \ No newline at end of file diff --git a/internal/runtime/internal/controller/component_references.go b/internal/runtime/internal/controller/component_references.go index cc5205dfdc..5754878042 100644 --- a/internal/runtime/internal/controller/component_references.go +++ b/internal/runtime/internal/controller/component_references.go @@ -5,6 +5,7 @@ import ( "strings" "github.com/go-kit/log" + "github.com/grafana/alloy/internal/featuregate" "github.com/grafana/alloy/internal/runtime/internal/dag" "github.com/grafana/alloy/internal/runtime/logging/level" "github.com/grafana/alloy/syntax/ast" @@ -18,6 +19,17 @@ import ( // will be (field_a, field_b, field_c). type Traversal []*ast.Ident +// String returns a dot-separated string representation of the field names in the traversal. +// For example, a traversal of fields [field_a, field_b, field_c] returns "field_a.field_b.field_c". +// Returns an empty string if the traversal contains no fields. +func (t Traversal) String() string { + var fieldNames []string + for _, field := range t { + fieldNames = append(fieldNames, field.Name) + } + return strings.Join(fieldNames, ".") +} + // Reference describes an Alloy expression reference to a BlockNode. type Reference struct { Target BlockNode // BlockNode being referenced @@ -29,7 +41,7 @@ type Reference struct { // ComponentReferences returns the list of references a component is making to // other components. -func ComponentReferences(cn dag.Node, g *dag.Graph, l log.Logger, scope *vm.Scope) ([]Reference, diag.Diagnostics) { +func ComponentReferences(cn dag.Node, g *dag.Graph, l log.Logger, scope *vm.Scope, minStability featuregate.Stability) ([]Reference, diag.Diagnostics) { var ( traversals []Traversal @@ -63,6 +75,16 @@ func ComponentReferences(cn dag.Node, g *dag.Graph, l log.Logger, scope *vm.Scop refs = append(refs, ref) } else if scope.IsStdlibDeprecated(t[0].Name) { level.Warn(l).Log("msg", "this stdlib function is deprecated; please refer to the documentation for updated usage and alternatives", "function", t[0].Name) + } else if funcName := t.String(); scope.IsStdlibExperimental(funcName) { + if err := featuregate.CheckAllowed(featuregate.StabilityExperimental, minStability, funcName); err != nil { + diags = append(diags, diag.Diagnostic{ + Severity: diag.SeverityLevelError, + Message: err.Error(), + StartPos: ast.StartPos(t[0]).Position(), + EndPos: ast.StartPos(t[len(t)-1]).Position(), + }) + continue + } } } diff --git a/internal/runtime/internal/controller/loader.go b/internal/runtime/internal/controller/loader.go index 8cbe0061fe..fae75f5865 100644 --- a/internal/runtime/internal/controller/loader.go +++ b/internal/runtime/internal/controller/loader.go @@ -615,7 +615,7 @@ func (l *Loader) wireGraphEdges(g *dag.Graph) diag.Diagnostics { // Finally, wire component references. l.cache.mut.RLock() - refs, nodeDiags := ComponentReferences(n, g, l.log, l.cache.scope) + refs, nodeDiags := ComponentReferences(n, g, l.log, l.cache.scope, l.globals.MinStability) l.cache.mut.RUnlock() for _, ref := range refs { g.AddEdge(dag.Edge{From: n, To: ref.Target}) diff --git a/syntax/internal/stdlib/stdlib.go b/syntax/internal/stdlib/stdlib.go index 995e538d4b..85dadd9c67 100644 --- a/syntax/internal/stdlib/stdlib.go +++ b/syntax/internal/stdlib/stdlib.go @@ -17,7 +17,15 @@ import ( "gopkg.in/yaml.v3" ) -// There identifiers are deprecated in favour of the namespaced ones. +// TODO: refactor the stdlib to have consistent naming between namespaces and identifiers. + +// ExperimentalIdentifiers contains the full name (namespace + identifier's name) of stdlib +// identifiers that are considered "experimental". +var ExperimentalIdentifiers = map[string]bool{ + "array.combine_maps": true, +} + +// These identifiers are deprecated in favour of the namespaced ones. var DeprecatedIdentifiers = map[string]interface{}{ "env": os.Getenv, "nonsensitive": nonSensitive, @@ -86,7 +94,8 @@ var str = map[string]interface{}{ } var array = map[string]interface{}{ - "concat": concat, + "concat": concat, + "combine_maps": combineMaps, } var convert = map[string]interface{}{ @@ -146,6 +155,138 @@ var concat = value.RawFunction(func(funcValue value.Value, args ...value.Value) return value.Array(raw...), nil }) +// This function assumes that the types of the value.Value objects are correct. +func shouldJoin(left value.Value, right value.Value, conditions value.Value) (bool, error) { + for i := 0; i < conditions.Len(); i++ { + condition := conditions.Index(i).Text() + + leftVal, ok := left.Key(condition) + if !ok { + return false, nil + } + + rightVal, ok := right.Key(condition) + if !ok { + return false, nil + } + + if !leftVal.Equal(rightVal) { + return false, nil + } + } + return true, nil +} + +// Merge two maps. +// If a key exists in both maps, the value from the right map will be used. +func concatMaps(left, right value.Value) (value.Value, error) { + res := make(map[string]value.Value) + + for _, key := range left.Keys() { + val, ok := left.Key(key) + if !ok { + return value.Null, fmt.Errorf("concatMaps: key %s not found in left map while iterating - this should never happen", key) + } + res[key] = val + } + + for _, key := range right.Keys() { + val, ok := right.Key(key) + if !ok { + return value.Null, fmt.Errorf("concatMaps: key %s not found in right map while iterating - this should never happen", key) + } + res[key] = val + } + + return value.Object(res), nil +} + +// Inputs: +// args[0]: []map[string]string: lhs array +// args[1]: []map[string]string: rhs array +// args[2]: []string: merge conditions +var combineMaps = value.RawFunction(func(funcValue value.Value, args ...value.Value) (value.Value, error) { + if len(args) != 3 { + return value.Value{}, fmt.Errorf("combine_maps: expected 3 arguments, got %d", len(args)) + } + + // Validate args[0] and args[1] + for i := range []int{0, 1} { + if args[i].Type() != value.TypeArray { + return value.Null, value.ArgError{ + Function: funcValue, + Argument: args[i], + Index: i, + Inner: value.TypeError{ + Value: args[i], + Expected: value.TypeArray, + }, + } + } + for j := 0; j < args[i].Len(); j++ { + if args[i].Index(j).Type() != value.TypeObject { + return value.Null, value.ArgError{ + Function: funcValue, + Argument: args[i].Index(j), + Index: j, + Inner: value.TypeError{ + Value: args[i].Index(j), + Expected: value.TypeObject, + }, + } + } + } + } + + // Validate args[2] + if args[2].Type() != value.TypeArray { + return value.Null, value.ArgError{ + Function: funcValue, + Argument: args[2], + Index: 2, + Inner: value.TypeError{ + Value: args[2], + Expected: value.TypeArray, + }, + } + } + if args[2].Len() == 0 { + return value.Null, value.ArgError{ + Function: funcValue, + Argument: args[2], + Index: 2, + Inner: fmt.Errorf("combine_maps: merge conditions must not be empty"), + } + } + + // We cannot preallocate the size of the result array, because we don't know + // how well the merge is going to go. If none of the merge conditions are met, + // the result array will be empty. + res := []value.Value{} + + for i := 0; i < args[0].Len(); i++ { + for j := 0; j < args[1].Len(); j++ { + left := args[0].Index(i) + right := args[1].Index(j) + + join, err := shouldJoin(left, right, args[2]) + if err != nil { + return value.Null, err + } + + if join { + val, err := concatMaps(left, right) + if err != nil { + return value.Null, err + } + res = append(res, val) + } + } + } + + return value.Array(res...), nil +}) + func jsonDecode(in string) (interface{}, error) { var res interface{} err := json.Unmarshal([]byte(in), &res) diff --git a/syntax/internal/value/value.go b/syntax/internal/value/value.go index 3c8554b88c..829449370e 100644 --- a/syntax/internal/value/value.go +++ b/syntax/internal/value/value.go @@ -396,7 +396,7 @@ func (v Value) Key(key string) (index Value, ok bool) { // // An ArgError will be returned if one of the arguments is invalid. An Error // will be returned if the function call returns an error or if the number of -// arguments doesn't match. +// arguments doesn't match func (v Value) Call(args ...Value) (Value, error) { if v.ty != TypeFunction { panic("syntax/value: Call called on non-function type") @@ -553,3 +553,16 @@ func convertGoNumber(nval Number, target reflect.Type) reflect.Value { panic("unsupported number conversion") } + +// Equal will result in panic if the values are funcs, maps or slices +func (v Value) Equal(rhs Value) bool { + if v.Type() != rhs.Type() { + return false + } + + if !v.rv.Equal(rhs.rv) { + return false + } + + return true +} diff --git a/syntax/internal/value/value_test.go b/syntax/internal/value/value_test.go index fbebcabdd7..ec52f192c9 100644 --- a/syntax/internal/value/value_test.go +++ b/syntax/internal/value/value_test.go @@ -140,6 +140,12 @@ func TestValue_Call(t *testing.T) { require.Equal(t, int64(15+43), res.Int()) }) + t.Run("equal - string", func(t *testing.T) { + v := value.String("aa") + w := value.String("aa") + require.True(t, v.Equal(w)) + }) + t.Run("fully variadic", func(t *testing.T) { add := func(nums ...int) int { var sum int diff --git a/syntax/vm/error.go b/syntax/vm/error.go index 7f3ada3d5a..38d9528a5e 100644 --- a/syntax/vm/error.go +++ b/syntax/vm/error.go @@ -44,6 +44,9 @@ func makeDiagnostic(err error, assoc map[value.Value]ast.Node) error { case value.FieldError: fmt.Fprintf(&expr, ".%s", ne.Field) val = ne.Value + case value.ArgError: + message = ne.Inner.Error() + val = ne.Argument } cause = val diff --git a/syntax/vm/vm.go b/syntax/vm/vm.go index 71b2893ccc..f8d0e44341 100644 --- a/syntax/vm/vm.go +++ b/syntax/vm/vm.go @@ -509,3 +509,9 @@ func (s *Scope) IsStdlibDeprecated(name string) bool { _, exist := stdlib.DeprecatedIdentifiers[name] return exist } + +// IsStdlibExperimental returns true if the scoped identifier is experimental. +func (s *Scope) IsStdlibExperimental(fullName string) bool { + _, exist := stdlib.ExperimentalIdentifiers[fullName] + return exist +} diff --git a/syntax/vm/vm_stdlib_test.go b/syntax/vm/vm_stdlib_test.go index e8009ae157..36a74c29ab 100644 --- a/syntax/vm/vm_stdlib_test.go +++ b/syntax/vm/vm_stdlib_test.go @@ -39,6 +39,99 @@ func TestVM_Stdlib(t *testing.T) { {"encoding.from_yaml nil field", "encoding.from_yaml(`foo: null`)", map[string]interface{}{"foo": nil}}, {"encoding.from_yaml nil array element", `encoding.from_yaml("[0, null]")`, []interface{}{0, nil}}, {"encoding.from_base64", `encoding.from_base64("Zm9vYmFyMTIzIT8kKiYoKSctPUB+")`, string(`foobar123!?$*&()'-=@~`)}, + + // Map tests + { + // Basic case. No conflicting key/val pairs. + "array.combine_maps", + `array.combine_maps([{"a" = "a1", "b" = "b1"}], [{"a" = "a1", "c" = "c1"}], ["a"])`, + []map[string]interface{}{{"a": "a1", "b": "b1", "c": "c1"}}, + }, + { + // The first array has 2 maps, each with the same key/val pairs. + "array.combine_maps", + `array.combine_maps([{"a" = "a1", "b" = "b1"}, {"a" = "a1", "b" = "b1"}], [{"a" = "a1", "c" = "c1"}], ["a"])`, + []map[string]interface{}{{"a": "a1", "b": "b1", "c": "c1"}, {"a": "a1", "b": "b1", "c": "c1"}}, + }, + { + // Non-unique merge criteria. + "array.combine_maps", + `array.combine_maps([{"pod" = "a", "lbl" = "q"}, {"pod" = "b", "lbl" = "q"}], [{"pod" = "c", "lbl" = "q"}, {"pod" = "d", "lbl" = "q"}], ["lbl"])`, + []map[string]interface{}{{"lbl": "q", "pod": "c"}, {"lbl": "q", "pod": "d"}, {"lbl": "q", "pod": "c"}, {"lbl": "q", "pod": "d"}}, + }, + { + // Basic case. Integer and string values. + "array.combine_maps", + `array.combine_maps([{"a" = 1, "b" = 2.2}], [{"a" = 1, "c" = "c1"}], ["a"])`, + []map[string]interface{}{{"a": 1, "b": 2.2, "c": "c1"}}, + }, + { + // The second map will override a value from the first. + "array.combine_maps", + `array.combine_maps([{"a" = 1, "b" = 2.2}], [{"a" = 1, "b" = "3.3"}], ["a"])`, + []map[string]interface{}{{"a": 1, "b": "3.3"}}, + }, + { + // Not enough matches for a join. + "array.combine_maps", + `array.combine_maps([{"a" = 1, "b" = 2.2}], [{"a" = 2, "b" = "3.3"}], ["a"])`, + []map[string]interface{}{}, + }, + { + // Not enough matches for a join. + // The "a" value has differing types. + "array.combine_maps", + `array.combine_maps([{"a" = 1, "b" = 2.2}], [{"a" = "1", "b" = "3.3"}], ["a"])`, + []map[string]interface{}{}, + }, + { + // Basic case. Some values are arrays and maps. + "array.combine_maps", + `array.combine_maps([{"a" = 1, "b" = [1,2,3]}], [{"a" = 1, "c" = {"d" = {"e" = 10}}}], ["a"])`, + []map[string]interface{}{{"a": 1, "b": []interface{}{1, 2, 3}, "c": map[string]interface{}{"d": map[string]interface{}{"e": 10}}}}, + }, + { + // Join key not present in ARG2 + "array.combine_maps", + `array.combine_maps([{"a" = 1, "n" = 1.1}], [{"a" = 1, "n" = 2.1}, {"n" = 2.2}], ["a"])`, + []map[string]interface{}{{"a": 1, "n": 2.1}}, + }, + { + // Join key not present in ARG1 + "array.combine_maps", + `array.combine_maps([{"a" = 1, "n" = 1.1}, {"n" = 1.2}], [{"a" = 1, "n" = 2.1}], ["a"])`, + []map[string]interface{}{{"a": 1, "n": 2.1}}, + }, + { + // Join with multiple keys + "array.combine_maps", + `array.combine_maps([{"a" = 1, "b" = 3, "n" = 1.1}], [{"a" = 1, "b" = 3, "n" = 2.1}], ["a", "b"])`, + []map[string]interface{}{{"a": 1, "b": 3, "n": 2.1}}, + }, + { + // Join with multiple keys + // Some maps don't match all keys + "array.combine_maps", + `array.combine_maps([{"a" = 1, "n" = 1.1}, {"a" = 1, "b" = 3, "n" = 1.1}, {"b" = 3, "n" = 1.1}], [{"a" = 1, "n" = 2.3}, {"b" = 1, "n" = 2.3}, {"a" = 1, "b" = 3, "n" = 2.1}], ["a", "b"])`, + []map[string]interface{}{{"a": 1, "b": 3, "n": 2.1}}, + }, + { + // Join with multiple keys + // No match because one key is missing + "array.combine_maps", + `array.combine_maps([{"a" = 1, "n" = 1.1}, {"a" = 1, "b" = 3, "n" = 1.1}, {"b" = 3, "n" = 1.1}], [{"a" = 1, "n" = 2.3}, {"b" = 1, "n" = 2.3}, {"a" = 1, "b" = 3, "n" = 2.1}], ["a", "b", "c"])`, + []map[string]interface{}{}, + }, + { + // Multi match ends up with len(ARG1) * len(ARG2) maps + "array.combine_maps", + `array.combine_maps([{"a" = 1, "n" = 1.1}, {"a" = 1, "n" = 1.2}, {"a" = 1, "n" = 1.3}], [{"a" = 1, "n" = 2.1}, {"a" = 1, "n" = 2.2}, {"a" = 1, "n" = 2.3}], ["a"])`, + []map[string]interface{}{ + {"a": 1, "n": 2.1}, {"a": 1, "n": 2.2}, {"a": 1, "n": 2.3}, + {"a": 1, "n": 2.1}, {"a": 1, "n": 2.2}, {"a": 1, "n": 2.3}, + {"a": 1, "n": 2.1}, {"a": 1, "n": 2.2}, {"a": 1, "n": 2.3}, + }, + }, } for _, tc := range tt { @@ -55,6 +148,46 @@ func TestVM_Stdlib(t *testing.T) { } } +func TestVM_Stdlib_Errors(t *testing.T) { + tt := []struct { + name string + input string + expectedErr string + }{ + // Map tests + { + // Error: invalid RHS type - string. + "array.combine_maps", + `array.combine_maps([{"a" = "a1", "b" = "b1"}], "a", ["a"])`, + `"a" should be array, got string`, + }, + { + // Error: invalid RHS type - an array with strings. + "array.combine_maps", + `array.combine_maps([{"a" = "a1", "b" = "b1"}], ["a"], ["a"])`, + `"a" should be object, got string`, + }, + { + "array.combine_maps", + `array.combine_maps([{"a" = "a1", "b" = "b1"}], [{"a" = "a1", "c" = "b1"}], [])`, + `combine_maps: merge conditions must not be empty`, + }, + } + + for _, tc := range tt { + t.Run(tc.name, func(t *testing.T) { + expr, err := parser.ParseExpression(tc.input) + require.NoError(t, err) + + eval := vm.New(expr) + + rv := reflect.New(reflect.TypeOf([]map[string]interface{}{})) + err = eval.Evaluate(nil, rv.Interface()) + require.ErrorContains(t, err, tc.expectedErr) + }) + } +} + func TestStdlibCoalesce(t *testing.T) { t.Setenv("TEST_VAR2", "Hello!")