Skip to content

Commit

Permalink
Prototype targets.merge function (array.combine_maps) (grafana#1826)
Browse files Browse the repository at this point in the history
* Prototype map.inner_join function

* Rename to "targets.merge", remove unnecessary params

* Add failure tests

* Delete incorrect comment

* rename targets.merge to array.combine_maps, mark it as experimental, make is more permissive and add tests

* update changelog

* Update CHANGELOG.md

Co-authored-by: Piotr <[email protected]>

* Update docs/sources/reference/stdlib/array.md

Co-authored-by: Piotr <[email protected]>

* Update docs/sources/reference/stdlib/array.md

Co-authored-by: Piotr <[email protected]>

* Update docs/sources/reference/stdlib/array.md

Co-authored-by: Piotr <[email protected]>

* rename mapCombine to combineMaps

* document panic

* add equal test

* add more tests

* Update syntax/internal/stdlib/stdlib.go

Co-authored-by: Piotr <[email protected]>

* Update syntax/internal/stdlib/stdlib.go

Co-authored-by: Piotr <[email protected]>

* add examples in doc

* fix error propagation

* remove value nul on len function

* refactor code into a traversal function

* update doc to avoid modifying the experimental shared doc

---------

Co-authored-by: William Dumont <[email protected]>
Co-authored-by: Piotr <[email protected]>
  • Loading branch information
3 people authored and vaxvms committed Nov 15, 2024
1 parent 9ed8fc8 commit 03b13c5
Show file tree
Hide file tree
Showing 10 changed files with 383 additions and 5 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ v1.5.0-rc.0
### Features

- Add the function `path_join` to the stdlib. (@wildum)

- Add `pyroscope.receive_http` component to receive and forward Pyroscope profiles (@marcsanmi)

- Add support to `loki.source.syslog` for the RFC3164 format ("BSD syslog"). (@sushain97)
Expand All @@ -47,6 +48,8 @@ v1.5.0-rc.0
- (_Experimental_) Add a `prometheus.write.queue` component to add an alternative to `prometheus.remote_write`
which allowing the writing of metrics to a prometheus endpoint. (@mattdurham)

- (_Experimental_) Add the `arrary.combine_maps` function to the stdlib. (@ptodev, @wildum)

### Enhancements

- The `mimir.rules.kubernetes` component now supports adding extra label matchers
Expand Down
51 changes: 51 additions & 0 deletions docs/sources/reference/stdlib/array.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,3 +32,54 @@ Elements within the list can be any type.
> array.concat([[1, 2], [3, 4]], [[5, 6]])
[[1, 2], [3, 4], [5, 6]]
```

## array.combine_maps

> **EXPERIMENTAL**: This is an [experimental][] feature. Experimental
> features are subject to frequent breaking changes, and may be removed with
> no equivalent replacement. The `stability.level` flag must be set to `experimental`
> to use the feature.
The `array.combine_maps` function allows you to join two arrays of maps if certain keys have matching values in both maps. It's particularly useful when combining labels of targets coming from different `prometheus.discovery.*` or `prometheus.exporter.*` components.
It takes three arguments:

* The first two arguments are a of type `list(map(string))`. The keys of the map are strings.
The value for each key could be of any Alloy type such as a `string`, `integer`, `map`, or a `capsule`.
* The third input is an `array` containing strings. The strings are the keys whose value has to match for maps to be combined.

The maps that don't contain all the keys provided in the third argument will be discarded. When maps are combined and both contain the same keys, the last value from the second argument will be used.

Pseudo function code:
```
for every map in arg1:
for every map in arg2:
if the condition key matches in both:
merge maps and add to result
```

### Examples

```alloy
> array.combine_maps([{"instance"="1.1.1.1", "team"="A"}], [{"instance"="1.1.1.1", "cluster"="prod"}], ["instance"])
[{"instance"="1.1.1.1", "team"="A", "cluster"="prod"}]
// Second map overrides the team in the first map
> array.combine_maps([{"instance"="1.1.1.1", "team"="A"}], [{"instance"="1.1.1.1", "team"="B"}], ["instance"])
[{"instance"="1.1.1.1", "team"="B"}]
// If multiple maps from the first argument match with multiple maps from the second argument, different combinations will be created.
> array.combine_maps([{"instance"="1.1.1.1", "team"="A"}, {"instance"="1.1.1.1", "team"="B"}], [{"instance"="1.1.1.1", "cluster"="prod"}, {"instance"="1.1.1.1", "cluster"="ops"}], ["instance"])
[{"instance"="1.1.1.1", "team"="A", "cluster"="prod"}, {"instance"="1.1.1.1", "team"="A", "cluster"="ops"}, {"instance"="1.1.1.1", "team"="B", "cluster"="prod"}, {"instance"="1.1.1.1", "team"="B", "cluster"="ops"}]
```

Examples using discovery and exporter components:
```alloy
> array.combine_maps(discovery.kubernetes.k8s_pods.targets, prometheus.exporter.postgres, ["instance"])
> array.combine_maps(prometheus.exporter.redis.default.targets, [{"instance"="1.1.1.1", "testLabelKey" = "testLabelVal"}], ["instance"])
```

You can find more examples in the [tests][].

[tests]: https://github.com/grafana/alloy/blob/main/syntax/vm/vm_stdlib_test.go
[experimental]: https://grafana.com/docs/release-life-cycle/
24 changes: 23 additions & 1 deletion internal/runtime/internal/controller/component_references.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"strings"

"github.com/go-kit/log"
"github.com/grafana/alloy/internal/featuregate"
"github.com/grafana/alloy/internal/runtime/internal/dag"
"github.com/grafana/alloy/internal/runtime/logging/level"
"github.com/grafana/alloy/syntax/ast"
Expand All @@ -18,6 +19,17 @@ import (
// will be (field_a, field_b, field_c).
type Traversal []*ast.Ident

// String returns a dot-separated string representation of the field names in the traversal.
// For example, a traversal of fields [field_a, field_b, field_c] returns "field_a.field_b.field_c".
// Returns an empty string if the traversal contains no fields.
func (t Traversal) String() string {
var fieldNames []string
for _, field := range t {
fieldNames = append(fieldNames, field.Name)
}
return strings.Join(fieldNames, ".")
}

// Reference describes an Alloy expression reference to a BlockNode.
type Reference struct {
Target BlockNode // BlockNode being referenced
Expand All @@ -29,7 +41,7 @@ type Reference struct {

// ComponentReferences returns the list of references a component is making to
// other components.
func ComponentReferences(cn dag.Node, g *dag.Graph, l log.Logger, scope *vm.Scope) ([]Reference, diag.Diagnostics) {
func ComponentReferences(cn dag.Node, g *dag.Graph, l log.Logger, scope *vm.Scope, minStability featuregate.Stability) ([]Reference, diag.Diagnostics) {
var (
traversals []Traversal

Expand Down Expand Up @@ -63,6 +75,16 @@ func ComponentReferences(cn dag.Node, g *dag.Graph, l log.Logger, scope *vm.Scop
refs = append(refs, ref)
} else if scope.IsStdlibDeprecated(t[0].Name) {
level.Warn(l).Log("msg", "this stdlib function is deprecated; please refer to the documentation for updated usage and alternatives", "function", t[0].Name)
} else if funcName := t.String(); scope.IsStdlibExperimental(funcName) {
if err := featuregate.CheckAllowed(featuregate.StabilityExperimental, minStability, funcName); err != nil {
diags = append(diags, diag.Diagnostic{
Severity: diag.SeverityLevelError,
Message: err.Error(),
StartPos: ast.StartPos(t[0]).Position(),
EndPos: ast.StartPos(t[len(t)-1]).Position(),
})
continue
}
}
}

Expand Down
2 changes: 1 addition & 1 deletion internal/runtime/internal/controller/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -615,7 +615,7 @@ func (l *Loader) wireGraphEdges(g *dag.Graph) diag.Diagnostics {

// Finally, wire component references.
l.cache.mut.RLock()
refs, nodeDiags := ComponentReferences(n, g, l.log, l.cache.scope)
refs, nodeDiags := ComponentReferences(n, g, l.log, l.cache.scope, l.globals.MinStability)
l.cache.mut.RUnlock()
for _, ref := range refs {
g.AddEdge(dag.Edge{From: n, To: ref.Target})
Expand Down
145 changes: 143 additions & 2 deletions syntax/internal/stdlib/stdlib.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,15 @@ import (
"gopkg.in/yaml.v3"
)

// There identifiers are deprecated in favour of the namespaced ones.
// TODO: refactor the stdlib to have consistent naming between namespaces and identifiers.

// ExperimentalIdentifiers contains the full name (namespace + identifier's name) of stdlib
// identifiers that are considered "experimental".
var ExperimentalIdentifiers = map[string]bool{
"array.combine_maps": true,
}

// These identifiers are deprecated in favour of the namespaced ones.
var DeprecatedIdentifiers = map[string]interface{}{
"env": os.Getenv,
"nonsensitive": nonSensitive,
Expand Down Expand Up @@ -86,7 +94,8 @@ var str = map[string]interface{}{
}

var array = map[string]interface{}{
"concat": concat,
"concat": concat,
"combine_maps": combineMaps,
}

var convert = map[string]interface{}{
Expand Down Expand Up @@ -146,6 +155,138 @@ var concat = value.RawFunction(func(funcValue value.Value, args ...value.Value)
return value.Array(raw...), nil
})

// This function assumes that the types of the value.Value objects are correct.
func shouldJoin(left value.Value, right value.Value, conditions value.Value) (bool, error) {
for i := 0; i < conditions.Len(); i++ {
condition := conditions.Index(i).Text()

leftVal, ok := left.Key(condition)
if !ok {
return false, nil
}

rightVal, ok := right.Key(condition)
if !ok {
return false, nil
}

if !leftVal.Equal(rightVal) {
return false, nil
}
}
return true, nil
}

// Merge two maps.
// If a key exists in both maps, the value from the right map will be used.
func concatMaps(left, right value.Value) (value.Value, error) {
res := make(map[string]value.Value)

for _, key := range left.Keys() {
val, ok := left.Key(key)
if !ok {
return value.Null, fmt.Errorf("concatMaps: key %s not found in left map while iterating - this should never happen", key)
}
res[key] = val
}

for _, key := range right.Keys() {
val, ok := right.Key(key)
if !ok {
return value.Null, fmt.Errorf("concatMaps: key %s not found in right map while iterating - this should never happen", key)
}
res[key] = val
}

return value.Object(res), nil
}

// Inputs:
// args[0]: []map[string]string: lhs array
// args[1]: []map[string]string: rhs array
// args[2]: []string: merge conditions
var combineMaps = value.RawFunction(func(funcValue value.Value, args ...value.Value) (value.Value, error) {
if len(args) != 3 {
return value.Value{}, fmt.Errorf("combine_maps: expected 3 arguments, got %d", len(args))
}

// Validate args[0] and args[1]
for i := range []int{0, 1} {
if args[i].Type() != value.TypeArray {
return value.Null, value.ArgError{
Function: funcValue,
Argument: args[i],
Index: i,
Inner: value.TypeError{
Value: args[i],
Expected: value.TypeArray,
},
}
}
for j := 0; j < args[i].Len(); j++ {
if args[i].Index(j).Type() != value.TypeObject {
return value.Null, value.ArgError{
Function: funcValue,
Argument: args[i].Index(j),
Index: j,
Inner: value.TypeError{
Value: args[i].Index(j),
Expected: value.TypeObject,
},
}
}
}
}

// Validate args[2]
if args[2].Type() != value.TypeArray {
return value.Null, value.ArgError{
Function: funcValue,
Argument: args[2],
Index: 2,
Inner: value.TypeError{
Value: args[2],
Expected: value.TypeArray,
},
}
}
if args[2].Len() == 0 {
return value.Null, value.ArgError{
Function: funcValue,
Argument: args[2],
Index: 2,
Inner: fmt.Errorf("combine_maps: merge conditions must not be empty"),
}
}

// We cannot preallocate the size of the result array, because we don't know
// how well the merge is going to go. If none of the merge conditions are met,
// the result array will be empty.
res := []value.Value{}

for i := 0; i < args[0].Len(); i++ {
for j := 0; j < args[1].Len(); j++ {
left := args[0].Index(i)
right := args[1].Index(j)

join, err := shouldJoin(left, right, args[2])
if err != nil {
return value.Null, err
}

if join {
val, err := concatMaps(left, right)
if err != nil {
return value.Null, err
}
res = append(res, val)
}
}
}

return value.Array(res...), nil
})

func jsonDecode(in string) (interface{}, error) {
var res interface{}
err := json.Unmarshal([]byte(in), &res)
Expand Down
15 changes: 14 additions & 1 deletion syntax/internal/value/value.go
Original file line number Diff line number Diff line change
Expand Up @@ -396,7 +396,7 @@ func (v Value) Key(key string) (index Value, ok bool) {
//
// An ArgError will be returned if one of the arguments is invalid. An Error
// will be returned if the function call returns an error or if the number of
// arguments doesn't match.
// arguments doesn't match
func (v Value) Call(args ...Value) (Value, error) {
if v.ty != TypeFunction {
panic("syntax/value: Call called on non-function type")
Expand Down Expand Up @@ -553,3 +553,16 @@ func convertGoNumber(nval Number, target reflect.Type) reflect.Value {

panic("unsupported number conversion")
}

// Equal will result in panic if the values are funcs, maps or slices
func (v Value) Equal(rhs Value) bool {
if v.Type() != rhs.Type() {
return false
}

if !v.rv.Equal(rhs.rv) {
return false
}

return true
}
6 changes: 6 additions & 0 deletions syntax/internal/value/value_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,12 @@ func TestValue_Call(t *testing.T) {
require.Equal(t, int64(15+43), res.Int())
})

t.Run("equal - string", func(t *testing.T) {
v := value.String("aa")
w := value.String("aa")
require.True(t, v.Equal(w))
})

t.Run("fully variadic", func(t *testing.T) {
add := func(nums ...int) int {
var sum int
Expand Down
3 changes: 3 additions & 0 deletions syntax/vm/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ func makeDiagnostic(err error, assoc map[value.Value]ast.Node) error {
case value.FieldError:
fmt.Fprintf(&expr, ".%s", ne.Field)
val = ne.Value
case value.ArgError:
message = ne.Inner.Error()
val = ne.Argument
}

cause = val
Expand Down
6 changes: 6 additions & 0 deletions syntax/vm/vm.go
Original file line number Diff line number Diff line change
Expand Up @@ -509,3 +509,9 @@ func (s *Scope) IsStdlibDeprecated(name string) bool {
_, exist := stdlib.DeprecatedIdentifiers[name]
return exist
}

// IsStdlibExperimental returns true if the scoped identifier is experimental.
func (s *Scope) IsStdlibExperimental(fullName string) bool {
_, exist := stdlib.ExperimentalIdentifiers[fullName]
return exist
}
Loading

0 comments on commit 03b13c5

Please sign in to comment.