Prototype targets.merge function (array.combine_maps) #1826

Merged on Nov 11, 2024 (21 commits)
3 changes: 3 additions & 0 deletions CHANGELOG.md
Expand Up @@ -33,6 +33,7 @@ v1.5.0-rc.0
### Features

- Add the function `path_join` to the stdlib. (@wildum)

- Add `pyroscope.receive_http` component to receive and forward Pyroscope profiles (@marcsanmi)

- Add support to `loki.source.syslog` for the RFC3164 format ("BSD syslog"). (@sushain97)
Expand All @@ -44,6 +45,8 @@ v1.5.0-rc.0
- (_Experimental_) Add a `prometheus.write.queue` component to add an alternative to `prometheus.remote_write`
which allows the writing of metrics to a prometheus endpoint. (@mattdurham)

- (_Experimental_) Add the `array.combine_maps` function to the stdlib. (@ptodev, @wildum)

Review comment (Contributor): I'm adding it directly to the rc changelog to avoid another PR to main to switch it when we patch it.

### Enhancements

- The `mimir.rules.kubernetes` component now supports adding extra label matchers
Expand Down
47 changes: 47 additions & 0 deletions docs/sources/reference/stdlib/array.md
Expand Up @@ -32,3 +32,50 @@ Elements within the list can be any type.
> array.concat([[1, 2], [3, 4]], [[5, 6]])
[[1, 2], [3, 4], [5, 6]]
```

## array.combine_maps

{{< docs/shared lookup="stability/experimental.md" source="alloy" version="<ALLOY_VERSION>" >}}
Review comment (Contributor): Maybe we create a separate shared markdown for "feature" and leave the "component" alone?

Review comment (Contributor): I wanted to avoid having two almost identical shared paragraphs, but I'm ok to have both if "feature" is too vague for "component". @clayton-cornell what do you think?


The `array.combine_maps` function joins two arrays of maps when certain keys have matching values in both maps. It's particularly useful when combining labels of targets coming from different `discovery.*` or `prometheus.exporter.*` components.
It takes three arguments:

* The first two arguments are of type `list(map(string))`. The keys of the map are strings.
  The value for each key can be of any Alloy type, such as a `string`, `integer`, `map`, or `capsule`.
* The third argument is an `array` of strings. The strings are the keys whose values must match for maps to be combined.

Maps that don't contain all the keys provided in the third argument are discarded. When maps are combined and both contain the same key, the value from the map in the second argument is used.

Pseudo function code:
```
for every map in arg1:
  for every map in arg2:
    if the condition key matches in both:
      merge maps and add to result
```
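
The pseudo code above can be sketched in plain Go. This is a simplified illustration, not the stdlib implementation: it assumes string values (`map[string]string`) instead of Alloy values, and the names `combineMaps` and `shouldJoin` are reused here only for readability.

```go
package main

import "fmt"

// shouldJoin reports whether both maps contain every key in keys and hold
// the same value for each of them. A missing key means "do not join".
func shouldJoin(l, r map[string]string, keys []string) bool {
	for _, k := range keys {
		lv, ok := l[k]
		if !ok {
			return false
		}
		rv, ok := r[k]
		if !ok {
			return false
		}
		if lv != rv {
			return false
		}
	}
	return true
}

// combineMaps pairs every map in lhs with every map in rhs and merges the
// pairs that satisfy shouldJoin. On key conflicts the rhs value wins.
func combineMaps(lhs, rhs []map[string]string, keys []string) []map[string]string {
	res := []map[string]string{}
	for _, l := range lhs {
		for _, r := range rhs {
			if !shouldJoin(l, r, keys) {
				continue
			}
			merged := make(map[string]string, len(l)+len(r))
			for k, v := range l {
				merged[k] = v
			}
			for k, v := range r {
				merged[k] = v // value from the second argument overrides
			}
			res = append(res, merged)
		}
	}
	return res
}

func main() {
	lhs := []map[string]string{{"instance": "1.1.1.1", "team": "A"}}
	rhs := []map[string]string{{"instance": "1.1.1.1", "team": "B", "cluster": "prod"}}
	// prints [map[cluster:prod instance:1.1.1.1 team:B]]
	fmt.Println(combineMaps(lhs, rhs, []string{"instance"}))
}
```

Because every left map is compared against every right map, the cost grows with the product of the two array lengths, and several matches on both sides produce every combination.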

### Examples

```alloy
> array.combine_maps([{"instance"="1.1.1.1", "team"="A"}], [{"instance"="1.1.1.1", "cluster"="prod"}], ["instance"])
[{"instance"="1.1.1.1", "team"="A", "cluster"="prod"}]

// Second map overrides the team in the first map
> array.combine_maps([{"instance"="1.1.1.1", "team"="A"}], [{"instance"="1.1.1.1", "team"="B"}], ["instance"])
[{"instance"="1.1.1.1", "team"="B"}]

// If multiple maps from the first argument match with multiple maps from the second argument, different combinations will be created.
> array.combine_maps([{"instance"="1.1.1.1", "team"="A"}, {"instance"="1.1.1.1", "team"="B"}], [{"instance"="1.1.1.1", "cluster"="prod"}, {"instance"="1.1.1.1", "cluster"="ops"}], ["instance"])
[{"instance"="1.1.1.1", "team"="A", "cluster"="prod"}, {"instance"="1.1.1.1", "team"="A", "cluster"="ops"}, {"instance"="1.1.1.1", "team"="B", "cluster"="prod"}, {"instance"="1.1.1.1", "team"="B", "cluster"="ops"}]
```

Examples using discovery and exporter components:
```alloy
> array.combine_maps(discovery.kubernetes.k8s_pods.targets, prometheus.exporter.postgres, ["instance"])

> array.combine_maps(prometheus.exporter.redis.default.targets, [{"instance"="1.1.1.1", "testLabelKey" = "testLabelVal"}], ["instance"])
```

You can find more examples in the [tests][].

[tests]: https://github.com/grafana/alloy/blob/main/syntax/vm/vm_stdlib_test.go
6 changes: 3 additions & 3 deletions docs/sources/shared/stability/experimental.md
Expand Up @@ -4,9 +4,9 @@ description: Shared content, experimental
headless: true
---

> **EXPERIMENTAL**: This is an [experimental][] component. Experimental
> components are subject to frequent breaking changes, and may be removed with
> **EXPERIMENTAL**: This is an [experimental][] feature. Experimental
> features are subject to frequent breaking changes, and may be removed with
> no equivalent replacement. The `stability.level` flag must be set to `experimental`
> to use the component.
> to use the feature.

[experimental]: https://grafana.com/docs/release-life-cycle/
21 changes: 20 additions & 1 deletion internal/runtime/internal/controller/component_references.go
Expand Up @@ -5,6 +5,7 @@ import (
"strings"

"github.com/go-kit/log"
"github.com/grafana/alloy/internal/featuregate"
"github.com/grafana/alloy/internal/runtime/internal/dag"
"github.com/grafana/alloy/internal/runtime/logging/level"
"github.com/grafana/alloy/syntax/ast"
Expand All @@ -29,7 +30,7 @@ type Reference struct {

// ComponentReferences returns the list of references a component is making to
// other components.
func ComponentReferences(cn dag.Node, g *dag.Graph, l log.Logger, scope *vm.Scope) ([]Reference, diag.Diagnostics) {
func ComponentReferences(cn dag.Node, g *dag.Graph, l log.Logger, scope *vm.Scope, minStability featuregate.Stability) ([]Reference, diag.Diagnostics) {
var (
traversals []Traversal

Expand Down Expand Up @@ -63,6 +64,24 @@ func ComponentReferences(cn dag.Node, g *dag.Graph, l log.Logger, scope *vm.Scop
refs = append(refs, ref)
} else if scope.IsStdlibDeprecated(t[0].Name) {
level.Warn(l).Log("msg", "this stdlib function is deprecated; please refer to the documentation for updated usage and alternatives", "function", t[0].Name)
} else {
var tokenNames []string
for _, token := range t {
tokenNames = append(tokenNames, token.Name)
}

funcName := strings.Join(tokenNames, ".")
if scope.IsStdlibExperimental(funcName) {
if err := featuregate.CheckAllowed(featuregate.StabilityExperimental, minStability, funcName); err != nil {
diags = append(diags, diag.Diagnostic{
Severity: diag.SeverityLevelError,
Message: err.Error(),
StartPos: ast.StartPos(t[0]).Position(),
EndPos: ast.StartPos(t[len(t)-1]).Position(),
})
continue
}
}
}
}

Expand Down
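
The gating logic in this hunk can be illustrated with a self-contained sketch. Everything here is an assumption made for illustration: `Stability`, its ordering, `checkAllowed`, and `checkTraversal` are hypothetical stand-ins and not the real `featuregate` or controller API. The idea shown is only that the traversal tokens are joined into a full name, looked up in the experimental set, and rejected when the configured minimum stability is stricter than experimental.

```go
package main

import (
	"fmt"
	"strings"
)

// Stability is a hypothetical stand-in for featuregate.Stability.
// In this sketch a higher value means a more stable feature.
type Stability int

const (
	StabilityExperimental Stability = iota + 1
	StabilityPublicPreview
	StabilityGenerallyAvailable
)

// experimentalIdentifiers mirrors the idea of stdlib.ExperimentalIdentifiers.
var experimentalIdentifiers = map[string]bool{
	"array.combine_maps": true,
}

// checkAllowed is a hypothetical gate: a feature is rejected when its
// stability is lower than the configured minimum.
func checkAllowed(feature, minStability Stability, name string) error {
	if feature < minStability {
		return fmt.Errorf("%q is experimental and not allowed at the configured stability level", name)
	}
	return nil
}

// checkTraversal joins the tokens of a reference such as array.combine_maps
// and applies the gate only if the full name is marked experimental.
func checkTraversal(tokens []string, minStability Stability) error {
	name := strings.Join(tokens, ".")
	if !experimentalIdentifiers[name] {
		return nil
	}
	return checkAllowed(StabilityExperimental, minStability, name)
}

func main() {
	tokens := []string{"array", "combine_maps"}
	fmt.Println(checkTraversal(tokens, StabilityGenerallyAvailable) != nil) // prints true
	fmt.Println(checkTraversal(tokens, StabilityExperimental) != nil)       // prints false
}
```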
2 changes: 1 addition & 1 deletion internal/runtime/internal/controller/loader.go
Expand Up @@ -615,7 +615,7 @@ func (l *Loader) wireGraphEdges(g *dag.Graph) diag.Diagnostics {

// Finally, wire component references.
l.cache.mut.RLock()
refs, nodeDiags := ComponentReferences(n, g, l.log, l.cache.scope)
refs, nodeDiags := ComponentReferences(n, g, l.log, l.cache.scope, l.globals.MinStability)
l.cache.mut.RUnlock()
for _, ref := range refs {
g.AddEdge(dag.Edge{From: n, To: ref.Target})
Expand Down
145 changes: 143 additions & 2 deletions syntax/internal/stdlib/stdlib.go
Expand Up @@ -17,7 +17,15 @@ import (
"gopkg.in/yaml.v3"
)

// There identifiers are deprecated in favour of the namespaced ones.
// TODO: refactor the stdlib to have consistent naming between namespaces and identifiers.

// ExperimentalIdentifiers contains the full name (namespace + identifier's name) of stdlib
// identifiers that are considered "experimental".
var ExperimentalIdentifiers = map[string]bool{
"array.combine_maps": true,
}

// These identifiers are deprecated in favour of the namespaced ones.
var DeprecatedIdentifiers = map[string]interface{}{
"env": os.Getenv,
"nonsensitive": nonSensitive,
Expand Down Expand Up @@ -86,7 +94,8 @@ var str = map[string]interface{}{
}

var array = map[string]interface{}{
"concat": concat,
"concat": concat,
"combine_maps": combineMaps,
}

var convert = map[string]interface{}{
Expand Down Expand Up @@ -146,6 +155,138 @@ var concat = value.RawFunction(func(funcValue value.Value, args ...value.Value)
return value.Array(raw...), nil
})

// This function assumes that the types of the value.Value objects are correct.
func shouldJoin(left value.Value, right value.Value, conditions value.Value) (bool, error) {
for i := 0; i < conditions.Len(); i++ {
condition := conditions.Index(i).Text()

leftVal, ok := left.Key(condition)
if !ok {
return false, nil
}

rightVal, ok := right.Key(condition)
if !ok {
return false, nil
}

if !leftVal.Equal(rightVal) {
return false, nil
}
}
return true, nil
}

// Merge two maps.
// If a key exists in both maps, the value from the right map will be used.
func concatMaps(left, right value.Value) (value.Value, error) {
res := make(map[string]value.Value)

for _, key := range left.Keys() {
val, ok := left.Key(key)
if !ok {
return value.Null, fmt.Errorf("concatMaps: key %s not found in left map while iterating - this should never happen", key)
}
res[key] = val
}

for _, key := range right.Keys() {
val, ok := right.Key(key)
if !ok {
return value.Null, fmt.Errorf("concatMaps: key %s not found in right map while iterating - this should never happen", key)
}
res[key] = val
}

return value.Object(res), nil
}

// Inputs:
// args[0]: []map[string]string: lhs array
// args[1]: []map[string]string: rhs array
// args[2]: []string: merge conditions
var combineMaps = value.RawFunction(func(funcValue value.Value, args ...value.Value) (value.Value, error) {
if len(args) != 3 {
return value.Value{}, fmt.Errorf("combine_maps: expected 3 arguments, got %d", len(args))
}

// Validate args[0] and args[1]
for i := range []int{0, 1} {
if args[i].Type() != value.TypeArray {
return value.Null, value.ArgError{
Function: funcValue,
Argument: args[i],
Index: i,
Inner: value.TypeError{
Value: args[i],
Expected: value.TypeArray,
},
}
}
for j := 0; j < args[i].Len(); j++ {
if args[i].Index(j).Type() != value.TypeObject {
return value.Null, value.ArgError{
Function: funcValue,
Argument: args[i].Index(j),
Index: j,
Inner: value.TypeError{
Value: args[i].Index(j),
Expected: value.TypeObject,
},
}
}
}
}

// Validate args[2]
if args[2].Type() != value.TypeArray {
return value.Null, value.ArgError{
Function: funcValue,
Argument: args[2],
Index: 2,
Inner: value.TypeError{
Value: args[2],
Expected: value.TypeArray,
},
}
}
if args[2].Len() == 0 {
return value.Null, value.ArgError{
Function: funcValue,
Argument: args[2],
Index: 2,
Inner: fmt.Errorf("combine_maps: merge conditions must not be empty"),
}
}

// We cannot preallocate the size of the result array, because we don't know
// how well the merge is going to go. If none of the merge conditions are met,
// the result array will be empty.
res := []value.Value{}

for i := 0; i < args[0].Len(); i++ {
for j := 0; j < args[1].Len(); j++ {
left := args[0].Index(i)
right := args[1].Index(j)

join, err := shouldJoin(left, right, args[2])
if err != nil {
return value.Null, err
}

if join {
val, err := concatMaps(left, right)
if err != nil {
return value.Null, err
}
res = append(res, val)
}
}
}

return value.Array(res...), nil
})

func jsonDecode(in string) (interface{}, error) {
var res interface{}
err := json.Unmarshal([]byte(in), &res)
Expand Down
15 changes: 14 additions & 1 deletion syntax/internal/value/value.go
Expand Up @@ -396,7 +396,7 @@ func (v Value) Key(key string) (index Value, ok bool) {
//
// An ArgError will be returned if one of the arguments is invalid. An Error
// will be returned if the function call returns an error or if the number of
// arguments doesn't match.
// arguments doesn't match
func (v Value) Call(args ...Value) (Value, error) {
if v.ty != TypeFunction {
panic("syntax/value: Call called on non-function type")
Expand Down Expand Up @@ -553,3 +553,16 @@ func convertGoNumber(nval Number, target reflect.Type) reflect.Value {

panic("unsupported number conversion")
}

// Equal will result in panic if the values are funcs, maps or slices
func (v Value) Equal(rhs Value) bool {
if v.Type() != rhs.Type() {
return false
}

if !v.rv.Equal(rhs.rv) {
return false
}

return true
}
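
The panic mentioned in the comment on `Equal` comes from `reflect.Value.Equal`, which is available since Go 1.20 and panics when two values of the same non-comparable type (funcs, maps, slices) are compared. The sketch below shows that behavior on ordinary Go values; `safeEqual` is a hypothetical helper and not part of the `value` package.

```go
package main

import (
	"fmt"
	"reflect"
)

// safeEqual compares two Go values with reflect.Value.Equal and converts the
// panic raised for non-comparable types into an error.
func safeEqual(a, b any) (equal bool, err error) {
	defer func() {
		if r := recover(); r != nil {
			err = fmt.Errorf("values are not comparable: %v", r)
		}
	}()
	return reflect.ValueOf(a).Equal(reflect.ValueOf(b)), nil
}

func main() {
	fmt.Println(safeEqual("aa", "aa")) // prints true <nil>
	fmt.Println(safeEqual("aa", 1))    // prints false <nil>

	// Two maps of the same type are not comparable, so Equal panics and
	// safeEqual reports an error instead.
	_, err := safeEqual(map[string]int{"a": 1}, map[string]int{"a": 1})
	fmt.Println(err != nil) // prints true
}
```

In the PR, the condition values being compared are typically label strings such as `instance`, which is the case covered by the added test.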
6 changes: 6 additions & 0 deletions syntax/internal/value/value_test.go
Expand Up @@ -140,6 +140,12 @@ func TestValue_Call(t *testing.T) {
require.Equal(t, int64(15+43), res.Int())
})

t.Run("equal - string", func(t *testing.T) {
v := value.String("aa")
w := value.String("aa")
require.True(t, v.Equal(w))
})

t.Run("fully variadic", func(t *testing.T) {
add := func(nums ...int) int {
var sum int
Expand Down
3 changes: 3 additions & 0 deletions syntax/vm/error.go
Expand Up @@ -44,6 +44,9 @@ func makeDiagnostic(err error, assoc map[value.Value]ast.Node) error {
case value.FieldError:
fmt.Fprintf(&expr, ".%s", ne.Field)
val = ne.Value
case value.ArgError:
message = ne.Inner.Error()
val = ne.Argument
}

cause = val
Expand Down
6 changes: 6 additions & 0 deletions syntax/vm/vm.go
Expand Up @@ -509,3 +509,9 @@ func (s *Scope) IsStdlibDeprecated(name string) bool {
_, exist := stdlib.DeprecatedIdentifiers[name]
return exist
}

// IsStdlibExperimental returns true if the scoped identifier is experimental.
func (s *Scope) IsStdlibExperimental(fullName string) bool {
_, exist := stdlib.ExperimentalIdentifiers[fullName]
return exist
}