diff --git a/internal/controller/trigger/validation/suscription_test.go b/internal/controller/trigger/validation/suscription_test.go index cbf686b9a..78998d752 100644 --- a/internal/controller/trigger/validation/suscription_test.go +++ b/internal/controller/trigger/validation/suscription_test.go @@ -18,12 +18,10 @@ import ( "context" "testing" - "google.golang.org/protobuf/types/known/structpb" - ctrlpb "github.com/linkall-labs/vanus/proto/pkg/controller" metapb "github.com/linkall-labs/vanus/proto/pkg/meta" - . "github.com/smartystreets/goconvey/convey" + "google.golang.org/protobuf/types/known/structpb" ) func TestSubscriptionRequestValidator(t *testing.T) { diff --git a/internal/primitive/transform/action/action.go b/internal/primitive/transform/action/action.go index d844f17ab..909fe9bec 100644 --- a/internal/primitive/transform/action/action.go +++ b/internal/primitive/transform/action/action.go @@ -143,3 +143,18 @@ var ( ErrExist = fmt.Errorf("action have exist") ErrArgNumber = fmt.Errorf("action arg number invalid") ) + +type NestAction interface { + Action + InitAction(actions []Action) error +} + +type NestActionImpl struct { + CommonAction + Actions []Action +} + +func (c *NestActionImpl) InitAction(actions []Action) error { + c.Actions = actions + return nil +} diff --git a/internal/primitive/transform/action/array/foreach.go b/internal/primitive/transform/action/array/foreach.go new file mode 100644 index 000000000..470626893 --- /dev/null +++ b/internal/primitive/transform/action/array/foreach.go @@ -0,0 +1,64 @@ +// Copyright 2023 Linkall Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package array + +import ( + "github.com/linkall-labs/vanus/internal/primitive/transform/action" + "github.com/linkall-labs/vanus/internal/primitive/transform/arg" + "github.com/linkall-labs/vanus/internal/primitive/transform/common" + "github.com/linkall-labs/vanus/internal/primitive/transform/context" + "github.com/pkg/errors" +) + +// ["array_foreach","array root", function]. +type arrayForeachAction struct { + action.NestActionImpl +} + +func NewArrayForeachAction() action.Action { + a := &arrayForeachAction{} + a.CommonAction = action.CommonAction{ + ActionName: "ARRAY_FOREACH", + FixedArgs: []arg.TypeList{[]arg.Type{arg.EventData}}, + } + return a +} + +func (a *arrayForeachAction) Init(args []arg.Arg) error { + a.TargetArg = args[0] + a.Args = args + a.ArgTypes = []common.Type{common.Array} + return nil +} + +func (a *arrayForeachAction) Execute(ceCtx *context.EventContext) error { + args, err := a.RunArgs(ceCtx) + if err != nil { + return err + } + arrayValue, _ := args[0].([]interface{}) + for i := range arrayValue { + newCtx := &context.EventContext{ + Data: arrayValue[i], + } + for i := range a.Actions { + err = a.Actions[i].Execute(newCtx) + if err != nil { + return errors.Wrapf(err, "action %dst execute error", i+1) + } + } + } + return a.TargetArg.SetValue(ceCtx, arrayValue) +} diff --git a/internal/primitive/transform/action/array/foreach_test.go b/internal/primitive/transform/action/array/foreach_test.go new file mode 100644 index 000000000..59f47d473 --- /dev/null +++ b/internal/primitive/transform/action/array/foreach_test.go @@ -0,0 +1,68 @@ +// Copyright 2023 Linkall Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package array_test + +import ( + stdJson "encoding/json" + "testing" + + cetest "github.com/cloudevents/sdk-go/v2/test" + "github.com/linkall-labs/vanus/internal/primitive/transform/action/array" + "github.com/linkall-labs/vanus/internal/primitive/transform/context" + "github.com/linkall-labs/vanus/internal/primitive/transform/runtime" + . "github.com/smartystreets/goconvey/convey" +) + +func TestReplaceArrayAction(t *testing.T) { + funcName := array.NewArrayForeachAction().Name() + Convey("test replace array valid", t, func() { + jsonStr := `{ + "array": [ + { + "name": "name1", + "number": 1 + }, + { + "name": "name2", + "number": "2" + }, + { + "name": "name3", + "number": "3" + } + ] + }` + Convey("replace valid", func() { + a, err := runtime.NewAction([]interface{}{funcName, "$.data.array", []interface{}{"add_prefix", "@.name", "prefix"}}) + So(err, ShouldBeNil) + + e := cetest.MinEvent() + var data map[string]interface{} + err = stdJson.Unmarshal([]byte(jsonStr), &data) + So(err, ShouldBeNil) + err = a.Execute(&context.EventContext{ + Event: &e, + Data: data, + }) + So(err, ShouldBeNil) + value, exist := data["array"] + So(exist, ShouldBeTrue) + So(len(value.([]interface{})), ShouldEqual, 3) + So(value.([]interface{})[0].(map[string]interface{})["name"], ShouldEqual, "prefixname1") + So(value.([]interface{})[1].(map[string]interface{})["name"], ShouldEqual, "prefixname2") + So(value.([]interface{})[2].(map[string]interface{})["name"], ShouldEqual, "prefixname3") + }) + }) +} diff --git a/internal/primitive/transform/action/render/array.go b/internal/primitive/transform/action/array/render.go similarity index 99% rename from internal/primitive/transform/action/render/array.go rename to internal/primitive/transform/action/array/render.go index 5d34c6290..b7341bd29 100644 --- a/internal/primitive/transform/action/render/array.go +++ b/internal/primitive/transform/action/array/render.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package render +package array import ( "fmt" diff --git a/internal/primitive/transform/action/render/array_test.go b/internal/primitive/transform/action/array/render_test.go similarity index 98% rename from internal/primitive/transform/action/render/array_test.go rename to internal/primitive/transform/action/array/render_test.go index bf6ba7c3f..5ae1090b2 100644 --- a/internal/primitive/transform/action/render/array_test.go +++ b/internal/primitive/transform/action/array/render_test.go @@ -12,21 +12,21 @@ // See the License for the specific language governing permissions and // limitations under the License. -package render_test +package array_test import ( stdJson "encoding/json" "testing" cetest "github.com/cloudevents/sdk-go/v2/test" - "github.com/linkall-labs/vanus/internal/primitive/transform/action/render" + "github.com/linkall-labs/vanus/internal/primitive/transform/action/array" "github.com/linkall-labs/vanus/internal/primitive/transform/context" "github.com/linkall-labs/vanus/internal/primitive/transform/runtime" . "github.com/smartystreets/goconvey/convey" ) func TestRenderArrayAction(t *testing.T) { - funcName := render.NewRenderArrayAction().Name() + funcName := array.NewRenderArrayAction().Name() Convey("test render array invalid", t, func() { jsonStr := `{ "array": [ diff --git a/internal/primitive/transform/arg/arg.go b/internal/primitive/transform/arg/arg.go index a928ed070..3c2a1020e 100644 --- a/internal/primitive/transform/arg/arg.go +++ b/internal/primitive/transform/arg/arg.go @@ -83,6 +83,9 @@ func NewArg(arg interface{}) (Arg, error) { if argLen >= 2 && argName[:2] == EventArgPrefix { return newEventAttribute(argName) } + if argLen >= 2 && argName[:2] == EventDataSubArgPrefix { + return newEventData(EventDataArgPrefix + "." + argName[2:]), nil + } if argLen >= 3 && argName[0] == '<' && argName[argLen-1] == '>' && argName[1] != '@' { return newDefine(argName), nil } @@ -91,6 +94,7 @@ func NewArg(arg interface{}) (Arg, error) { } const ( - EventArgPrefix = "$." - EventDataArgPrefix = EventArgPrefix + "data" + EventArgPrefix = "$." + EventDataArgPrefix = EventArgPrefix + "data" + EventDataSubArgPrefix = "@." ) diff --git a/internal/primitive/transform/arg/event.go b/internal/primitive/transform/arg/event.go index 3c67c1f18..dd43751c7 100644 --- a/internal/primitive/transform/arg/event.go +++ b/internal/primitive/transform/arg/event.go @@ -115,8 +115,7 @@ func (arg eventData) Evaluate(ceCtx *context.EventContext) (interface{}, error) } func (arg eventData) SetValue(ceCtx *context.EventContext, value interface{}) error { - util.SetData(ceCtx.Data, arg.path, value) - return nil + return util.SetData(ceCtx.Data, arg.path, value) } func (arg eventData) DeleteValue(ceCtx *context.EventContext) error { diff --git a/internal/primitive/transform/runtime/action.go b/internal/primitive/transform/runtime/action.go index 12dd0e760..6d4ca0838 100644 --- a/internal/primitive/transform/runtime/action.go +++ b/internal/primitive/transform/runtime/action.go @@ -15,8 +15,7 @@ package runtime import ( - "fmt" - stdStrs "strings" + "strings" "github.com/linkall-labs/vanus/internal/primitive/transform/action" "github.com/linkall-labs/vanus/internal/primitive/transform/arg" @@ -29,9 +28,9 @@ var actionMap = map[string]newAction{} func AddAction(actionFn newAction) error { a := actionFn() - name := stdStrs.ToUpper(a.Name()) + name := strings.ToUpper(a.Name()) if _, exist := actionMap[name]; exist { - return fmt.Errorf("action %s has exist", name) + return errors.Errorf("action %s has exist", name) } actionMap[name] = actionFn return nil @@ -40,36 +39,64 @@ func AddAction(actionFn newAction) error { func NewAction(command []interface{}) (action.Action, error) { funcName, ok := command[0].(string) if !ok { - return nil, fmt.Errorf("command name must be string") + return nil, errors.Errorf("command name must be string") } - actionFn, exist := actionMap[stdStrs.ToUpper(funcName)] + actionFn, exist := actionMap[strings.ToUpper(funcName)] if !exist { - return nil, fmt.Errorf("command %s not exist", funcName) + return nil, errors.Errorf("command %s not exist", funcName) } a := actionFn() argNum := len(command) - 1 if argNum < a.Arity() { - return nil, fmt.Errorf("command %s arg number is not enough, it need %d but only have %d", + return nil, errors.Errorf("command %s arg number is not enough, it need %d but only have %d", funcName, a.Arity(), argNum) } - if argNum > a.Arity() && !a.IsVariadic() { - return nil, fmt.Errorf("command %s arg number is too many, it need %d but have %d", funcName, a.Arity(), argNum) + nestAction, isNestAction := a.(action.NestAction) + if !isNestAction { + if argNum > a.Arity() && !a.IsVariadic() { + return nil, errors.Errorf("command %s arg number is too many, it need %d but have %d", funcName, a.Arity(), argNum) + } + } else { + argNum = a.Arity() } args := make([]arg.Arg, argNum) - for i := 1; i < len(command); i++ { - _arg, err := arg.NewArg(command[i]) + for i := 0; i < len(args); i++ { + index := i + 1 + _arg, err := arg.NewArg(command[index]) if err != nil { - return nil, errors.Wrapf(err, "command %s arg %d is invalid", funcName, i) + return nil, errors.Wrapf(err, "command %s arg %d is invalid", funcName, index) } - argType := a.ArgType(i - 1) + argType := a.ArgType(i) if !argType.Contains(_arg) { - return nil, fmt.Errorf("command %s arg %d not support type %s", funcName, i, _arg.Type()) + return nil, errors.Errorf("command %s arg %d not support type %s", funcName, index, _arg.Type()) } - args[i-1] = _arg + args[i] = _arg } err := a.Init(args) if err != nil { return nil, errors.Wrapf(err, "command %s init error", funcName) } + if isNestAction { + actions := make([]action.Action, len(command)-1-argNum) + if len(actions) == 0 { + return nil, errors.Errorf("command %s arg number is not enough, lost function arg", funcName) + } + for i := 0; i < len(actions); i++ { + index := i + 1 + argNum + if arr, ok := command[index].([]interface{}); ok { + _a, err := NewAction(arr) + if err != nil { + return nil, errors.Wrapf(err, "action %s arg %d new action failed", funcName, index) + } + actions[i] = _a + } else { + return nil, errors.Errorf("arg %d is invalid", index) + } + } + err = nestAction.InitAction(actions) + if err != nil { + return nil, errors.Wrapf(err, "command %s init action error", funcName) + } + } return a, nil } diff --git a/internal/primitive/transform/runtime/action_bench_test.go b/internal/primitive/transform/runtime/action_bench_test.go index 13699d78a..fdcb2e65c 100644 --- a/internal/primitive/transform/runtime/action_bench_test.go +++ b/internal/primitive/transform/runtime/action_bench_test.go @@ -17,10 +17,9 @@ package runtime import ( "testing" - "github.com/linkall-labs/vanus/internal/primitive/transform/context" - ce "github.com/cloudevents/sdk-go/v2" cetest "github.com/cloudevents/sdk-go/v2/test" + "github.com/linkall-labs/vanus/internal/primitive/transform/context" ) func actionBenchmark(command []interface{}) func(b *testing.B) { diff --git a/internal/primitive/transform/runtime/init.go b/internal/primitive/transform/runtime/init.go index 27bf52286..e204edfb3 100644 --- a/internal/primitive/transform/runtime/init.go +++ b/internal/primitive/transform/runtime/init.go @@ -15,11 +15,11 @@ package runtime import ( + "github.com/linkall-labs/vanus/internal/primitive/transform/action/array" "github.com/linkall-labs/vanus/internal/primitive/transform/action/common" "github.com/linkall-labs/vanus/internal/primitive/transform/action/condition" "github.com/linkall-labs/vanus/internal/primitive/transform/action/datetime" "github.com/linkall-labs/vanus/internal/primitive/transform/action/math" - "github.com/linkall-labs/vanus/internal/primitive/transform/action/render" "github.com/linkall-labs/vanus/internal/primitive/transform/action/source" "github.com/linkall-labs/vanus/internal/primitive/transform/action/strings" "github.com/linkall-labs/vanus/internal/primitive/transform/action/structs" @@ -53,8 +53,9 @@ func init() { strings.NewReplaceBetweenPositionsAction, // condition condition.NewConditionIfAction, - // render - render.NewRenderArrayAction, + // array + array.NewRenderArrayAction, + array.NewArrayForeachAction, // common common.NewLengthAction, // source diff --git a/internal/trigger/util/event.go b/internal/trigger/util/event.go index 16e9b181e..114634ccc 100644 --- a/internal/trigger/util/event.go +++ b/internal/trigger/util/event.go @@ -17,12 +17,14 @@ package util import ( "encoding/json" "fmt" + "strconv" "strings" "time" ce "github.com/cloudevents/sdk-go/v2" "github.com/ohler55/ojg/jp" "github.com/ohler55/ojg/oj" + "github.com/pkg/errors" ) // LookupAttribute lookup event attribute value by attribute name. @@ -147,7 +149,7 @@ func SetAttribute(e *ce.Event, attr string, value interface{}) error { } return event.SetDataSchema(v) case "datacontenttype", "specversion": - return fmt.Errorf("attribute %s not support modify", attr) + return errors.Errorf("attribute %s not support modify", attr) default: return event.SetExtension(attr, value) } @@ -166,32 +168,101 @@ func DeleteAttribute(e *ce.Event, attr string) error { } // SetData set value to data path, now data must is map, not support array. -func SetData(data interface{}, path string, value interface{}) { +func SetData(data interface{}, path string, value interface{}) error { paths := strings.Split(path, ".") switch data.(type) { case map[string]interface{}: - setData(data, paths, value) + return setData(data, paths, value) case []interface{}: // todo ,now not support } + return errors.New("not support") } -func setData(data interface{}, paths []string, value interface{}) { +func setData(data interface{}, paths []string, value interface{}) error { + pathType, key, index, err := getPathIndex(paths[0]) + if err != nil { + return err + } switch m := data.(type) { case map[string]interface{}: + switch pathType { + case pathMap: + // key . + if len(paths) == 1 { + m[key] = value + return nil + } + v, ok := m[key] + if !ok || v == nil { + v = map[string]interface{}{} + m[key] = v + } + return setData(v, paths[1:], value) + case pathArray: + // arr[2] . + v, ok := m[key] + if !ok || v == nil { + m[key] = make([]interface{}, index+1) + } else { + vv, ok := v.([]interface{}) + if !ok { + return errors.Errorf("json path %s is array, but value is not array", paths[0]) + } + for i := len(vv); i <= index; i++ { + vv = append(vv, nil) + } + m[key] = vv + } + return setData(m[key], paths, value) + } + case []interface{}: if len(paths) == 1 { - m[paths[0]] = value - return + // arr[2]. + m[index] = value + return nil } - v, ok := m[paths[0]] - if !ok { - v = make(map[string]interface{}) - m[paths[0]] = v + v := m[index] + if v == nil { + m[index] = map[string]interface{}{} + } else { + _, ok := v.(map[string]interface{}) + if !ok { + // todo multidimensional array + return errors.Errorf("json path %s is array, but index %d value is not map", paths[0], index) + } } - setData(v, paths[1:], value) - case []interface{}: - // todo ,now not support + return setData(m[index], paths[1:], value) + } + return errors.New("not support") +} + +type pathType string + +const ( + pathMap pathType = "map" + pathArray pathType = "array" +) + +func getPathIndex(path string) (pathType, string, int, error) { + x := strings.Index(path, "[") + if x <= 0 { + return pathMap, path, 0, nil + } + y := strings.Index(path[x+1:], "]") + if y <= 0 { + return pathMap, path, 0, nil + } + index := path[x+1 : x+1+y] + v, err := strconv.ParseInt(index, 10, 32) + if err != nil { + // todo map or array + return pathMap, path, 0, errors.Wrapf(err, "json path %s get array index error, get a not number value", path) + } + if v < 0 { + return pathArray, path[:x], 0, errors.Errorf("json path %s get array index get a negative number", path) } + return pathArray, path[:x], int(v), nil } func DeleteData(data interface{}, path string) error { diff --git a/internal/trigger/util/event_test.go b/internal/trigger/util/event_test.go index 7165f2acc..cb47b720e 100644 --- a/internal/trigger/util/event_test.go +++ b/internal/trigger/util/event_test.go @@ -151,6 +151,24 @@ func TestSetData(t *testing.T) { }, "str": "stringV", "array": []interface{}{1.1, "str"}, + "arrayObj": []interface{}{ + map[string]interface{}{ + "map": map[string]interface{}{ + "number": 123.4, + "str": "str", + }, + "str": "stringV", + "array": []interface{}{1.1, "str"}, + }, + map[string]interface{}{ + "map": map[string]interface{}{ + "number": 123.4, + "str": "str", + }, + "str": "stringV", + "array": []interface{}{1.1, "str"}, + }, + }, } Convey("test add root key", func() { Convey("add common", func() { @@ -173,13 +191,21 @@ func TestSetData(t *testing.T) { So(exist, ShouldBeTrue) So(v, ShouldEqual, "v") }) - Convey("add array", func() { + Convey("add array value", func() { SetData(data, "addKey", []interface{}{123, "str"}) v, exist := data["addKey"].([]interface{}) So(exist, ShouldBeTrue) So(v[0], ShouldEqual, 123) So(v[1], ShouldEqual, "str") }) + Convey("array value add element", func() { + SetData(data, "array[3]", []interface{}{123, "str"}) + vv, exist := data["array"].([]interface{}) + So(exist, ShouldBeTrue) + v := vv[3].([]interface{}) + So(v[0], ShouldEqual, 123) + So(v[1], ShouldEqual, "str") + }) }) Convey("test replace root key", func() { SetData(data, "str", 1.1) @@ -208,18 +234,41 @@ func TestSetData(t *testing.T) { So(exist, ShouldBeTrue) So(v, ShouldEqual, "v") }) - Convey("add array", func() { + Convey("add value array", func() { SetData(data, "map.addKey", []interface{}{123, "string"}) v, exist := data["map"].(map[string]interface{})["addKey"].([]interface{}) So(exist, ShouldBeTrue) So(v[0], ShouldEqual, 123) So(v[1], ShouldEqual, "string") }) + Convey("add no exist array with object", func() { + SetData(data, "map.addKey[0].array", []interface{}{123, "string"}) + v, exist := data["map"].(map[string]interface{})["addKey"].([]interface{})[0].(map[string]interface{})["array"].([]interface{}) + So(exist, ShouldBeTrue) + So(v[0], ShouldEqual, 123) + So(v[1], ShouldEqual, "string") + }) + Convey("array object value add key", func() { + SetData(data, "arrayObj[0].addArray", []interface{}{123, "string"}) + v, exist := data["arrayObj"].([]interface{})[0].(map[string]interface{})["addArray"].([]interface{}) + So(exist, ShouldBeTrue) + So(v[0], ShouldEqual, 123) + So(v[1], ShouldEqual, "string") + }) }) Convey("test replace second key", func() { - SetData(data, "map.str", map[string]interface{}{"k": "v"}) - _, exist := data["map"].(map[string]interface{})["str"] - So(exist, ShouldBeTrue) + Convey("replace str", func() { + SetData(data, "map.str", map[string]interface{}{"k": "v"}) + v, exist := data["map"].(map[string]interface{})["str"] + So(exist, ShouldBeTrue) + So(v.(map[string]interface{})["k"], ShouldEqual, "v") + }) + Convey("array object value replace key", func() { + SetData(data, "arrayObj[1].str", map[string]interface{}{"k": "v"}) + v, exist := data["arrayObj"].([]interface{})[1].(map[string]interface{})["str"] + So(exist, ShouldBeTrue) + So(v.(map[string]interface{})["k"], ShouldEqual, "v") + }) }) Convey("test add third key", func() { Convey("add map", func() {