-
Notifications
You must be signed in to change notification settings - Fork 13
/
Copy pathlua_filter.go
142 lines (128 loc) · 4.24 KB
/
lua_filter.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
/*
* Copyright 2024 The RuleGo Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package filter
import (
"encoding/json"
"fmt"
"github.com/rulego/rulego"
"github.com/rulego/rulego-components/pkg/lua_engine"
"github.com/rulego/rulego/api/types"
"github.com/rulego/rulego/utils/maps"
lua "github.com/yuin/gopher-lua"
"strings"
)
// init registers the component to rulego
func init() {
_ = rulego.Registry.Register(&LuaFilter{})
}
// LuaFilterConfiguration node configuration
type LuaFilterConfiguration struct {
//Script configures the function body content or the script file path with `.lua` as the suffix
//Only need to provide the function body content, if it is a file path, then need to provide the complete script function:
//function Filter(msg, metadata, msgType) ${Script} \n end
//return bool
//The parameter msg, if the data type of msg is JSON, then it will be converted to the Lua table type before calling the function
Script string
}
// LuaFilter is a component that filters messages based on Lua scripts.
type LuaFilter struct {
Config LuaFilterConfiguration
// pool is a sync.Pool of *lua.LState
pool *luaEngine.LStatePool
}
// New creates a new instance of LuaFilter
func (x *LuaFilter) New() types.Node {
return &LuaFilter{Config: LuaFilterConfiguration{
Script: "return msg.temperature > 50",
}}
}
// Type returns the type of the component
func (x *LuaFilter) Type() string {
return "x/luaFilter"
}
// Init initializes the component
func (x *LuaFilter) Init(ruleConfig types.Config, configuration types.Configuration) error {
err := maps.Map2Struct(configuration, &x.Config)
if err == nil {
if strings.HasSuffix(x.Config.Script, ".lua") {
if err = luaEngine.ValidateLua(x.Config.Script); err != nil {
return err
}
// create a new LStatePool from file
x.pool = luaEngine.NewFileLStatePool(ruleConfig, x.Config.Script)
} else {
script := fmt.Sprintf("function Filter(msg, metadata, msgType) %s \nend", x.Config.Script)
if err = luaEngine.ValidateLua(script); err != nil {
return err
}
// create a new LStatePool from script
x.pool = luaEngine.NewStringLStatePool(ruleConfig, script)
}
}
return err
}
// OnMsg handles the message
func (x *LuaFilter) OnMsg(ctx types.RuleContext, msg types.RuleMsg) {
// get a *lua.LState from the pool
L := x.pool.Get()
if L == nil {
// if there is no available *lua.LState, tell the next node to fail
ctx.TellFailure(msg, fmt.Errorf("x/luaFilter lua.LState nil error"))
return
}
// defer putting back the *lua.LState to the pool
defer x.pool.Put(L)
//var data interface{} = msg.Data
var dataMap map[string]interface{}
if msg.DataType == types.JSON {
_ = json.Unmarshal([]byte(msg.Data), &dataMap)
}
var err error
filter := L.GetGlobal("Filter")
p := lua.P{
Fn: filter,
NRet: 1,
Protect: true,
}
if dataMap != nil {
// Call the Filter function, passing in msg, metadata, msgType as arguments.
err = L.CallByParam(p, luaEngine.MapToLTable(L, dataMap), luaEngine.StringMapToLTable(L, msg.Metadata), lua.LString(msg.Type))
} else {
// Call the Filter function, passing in msg, metadata, msgType as arguments.
err = L.CallByParam(p, lua.LString(msg.Data), luaEngine.StringMapToLTable(L, msg.Metadata), lua.LString(msg.Type))
}
if err != nil {
// if there is an error, tell the next node to fail
ctx.TellFailure(msg, err)
return
}
// get the return value from the script
ret := L.Get(-1)
// pop the value from the stack
L.Pop(1)
// check if the return value is a boolean
if ret.Type() == lua.LTBool && ret == lua.LTrue {
ctx.TellNext(msg, types.True)
} else {
ctx.TellNext(msg, types.False)
}
}
// Destroy releases the resources of the component
func (x *LuaFilter) Destroy() {
if x.pool != nil {
x.pool.Shutdown()
}
}