From 31dda6a27c9719dc33ab0d8e9f19c1e1f49972f8 Mon Sep 17 00:00:00 2001 From: seeflood <349895584@qq.com> Date: Fri, 1 Apr 2022 10:51:42 +0800 Subject: [PATCH] feat: Add startup hooks (#405) --- cmd/layotto/main.go | 1 + .../helloworld/component/helloworld.go | 23 +++ .../helloworld/component/in_memory.go | 38 ++++ .../helloworld/component/say_goodbye.go | 38 ++++ .../helloworld/grpc_api.go | 65 +++++- cmd/layotto_multiple_api/main.go | 13 +- components/custom/component.go | 20 ++ components/custom/config.go | 19 ++ components/custom/registry.go | 76 +++++++ components/custom/registry_test.go | 41 ++++ components/pkg/mock/custom_component_mock.go | 47 +++++ configs/config_in_memory.json | 7 + docs/en/start/api_plugin/helloworld.md | 13 +- docs/zh/_sidebar.md | 1 + docs/zh/component_specs/custom/common.md | 50 +++++ docs/zh/design/api_plugin/design.md | 94 ++++++++- docs/zh/start/api_plugin/helloworld.md | 29 +-- pkg/grpc/dapr/dapr_api.go | 7 +- pkg/grpc/dapr/dapr_api_secret_test.go | 8 +- pkg/grpc/dapr/dapr_api_test.go | 9 +- pkg/grpc/default_api/api.go | 17 +- pkg/grpc/default_api/api_pubsub.go | 1 + pkg/grpc/default_api/api_state.go | 1 + pkg/grpc/default_api/api_test.go | 15 +- pkg/grpc/grpc.go | 12 +- pkg/grpc/grpc_api.go | 5 +- pkg/grpc/options.go | 1 + pkg/runtime/config.go | 5 + pkg/runtime/options.go | 21 +- pkg/runtime/options_test.go | 8 +- pkg/runtime/runtime.go | 189 +++++++++++++----- pkg/runtime/runtime_test.go | 155 +++++++++++++- 32 files changed, 899 insertions(+), 130 deletions(-) create mode 100644 cmd/layotto_multiple_api/helloworld/component/helloworld.go create mode 100644 cmd/layotto_multiple_api/helloworld/component/in_memory.go create mode 100644 cmd/layotto_multiple_api/helloworld/component/say_goodbye.go create mode 100644 components/custom/component.go create mode 100644 components/custom/config.go create mode 100644 components/custom/registry.go create mode 100644 components/custom/registry_test.go create mode 100644 components/pkg/mock/custom_component_mock.go create mode 100644 docs/zh/component_specs/custom/common.md diff --git a/cmd/layotto/main.go b/cmd/layotto/main.go index 68cf14cbf5..a78967bf30 100644 --- a/cmd/layotto/main.go +++ b/cmd/layotto/main.go @@ -197,6 +197,7 @@ func NewRuntimeGrpcServer(data json.RawMessage, opts ...grpc.ServerOption) (mgrp } // 2. new instance rt := runtime.NewMosnRuntime(cfg) + rt.AppendInitRuntimeStage(runtime.DefaultInitRuntimeStage) // 3. run server, err := rt.Run( runtime.WithGrpcOptions(opts...), diff --git a/cmd/layotto_multiple_api/helloworld/component/helloworld.go b/cmd/layotto_multiple_api/helloworld/component/helloworld.go new file mode 100644 index 0000000000..fef68dcce8 --- /dev/null +++ b/cmd/layotto_multiple_api/helloworld/component/helloworld.go @@ -0,0 +1,23 @@ +// +// Copyright 2021 Layotto Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package component + +import ( + "mosn.io/layotto/components/custom" +) + +type HelloWorld interface { + custom.Component + SayHello(name string) (string, error) +} diff --git a/cmd/layotto_multiple_api/helloworld/component/in_memory.go b/cmd/layotto_multiple_api/helloworld/component/in_memory.go new file mode 100644 index 0000000000..ee15d1295d --- /dev/null +++ b/cmd/layotto_multiple_api/helloworld/component/in_memory.go @@ -0,0 +1,38 @@ +// +// Copyright 2021 Layotto Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package component + +import ( + "context" + "mosn.io/layotto/components/custom" +) + +type inMemoryHelloWorld struct { + ctx context.Context + config *custom.Config +} + +func (i *inMemoryHelloWorld) Initialize(ctx context.Context, config custom.Config) error { + i.ctx = ctx + i.config = &config + return nil +} + +func (i *inMemoryHelloWorld) SayHello(name string) (string, error) { + return "Hello " + name, nil +} + +func NewInMemoryHelloWorld() custom.Component { + return &inMemoryHelloWorld{} +} diff --git a/cmd/layotto_multiple_api/helloworld/component/say_goodbye.go b/cmd/layotto_multiple_api/helloworld/component/say_goodbye.go new file mode 100644 index 0000000000..00a4770e78 --- /dev/null +++ b/cmd/layotto_multiple_api/helloworld/component/say_goodbye.go @@ -0,0 +1,38 @@ +// +// Copyright 2021 Layotto Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package component + +import ( + "context" + "mosn.io/layotto/components/custom" +) + +type sayGoodbyeHelloWorld struct { + ctx context.Context + config *custom.Config +} + +func (s *sayGoodbyeHelloWorld) Initialize(ctx context.Context, config custom.Config) error { + s.ctx = ctx + s.config = &config + return nil +} + +func (s *sayGoodbyeHelloWorld) SayHello(name string) (string, error) { + return "Goodbye " + name, nil +} + +func NewSayGoodbyeHelloWorld() custom.Component { + return &sayGoodbyeHelloWorld{} +} diff --git a/cmd/layotto_multiple_api/helloworld/grpc_api.go b/cmd/layotto_multiple_api/helloworld/grpc_api.go index f12bb1a65d..7a345066ea 100644 --- a/cmd/layotto_multiple_api/helloworld/grpc_api.go +++ b/cmd/layotto_multiple_api/helloworld/grpc_api.go @@ -18,32 +18,77 @@ package helloworld import ( "context" + "fmt" rawGRPC "google.golang.org/grpc" pb "google.golang.org/grpc/examples/helloworld/helloworld" + "mosn.io/layotto/cmd/layotto_multiple_api/helloworld/component" + "mosn.io/layotto/components/lock" "mosn.io/layotto/pkg/grpc" grpc_api "mosn.io/layotto/pkg/grpc" - mgrpc "mosn.io/mosn/pkg/filter/network/grpc" + "mosn.io/pkg/log" ) +const componentType = "helloworld" + +// This demo will always use this component name. +const componentName = "in-memory" + func NewHelloWorldAPI(ac *grpc_api.ApplicationContext) grpc.GrpcAPI { - return &server{} + // 1. convert custom components + name2component := make(map[string]component.HelloWorld) + if len(ac.CustomComponent) != 0 { + // we only care about those components of type "helloworld" + name2comp, ok := ac.CustomComponent[componentType] + if ok && len(name2comp) > 0 { + for name, v := range name2comp { + // convert them using type assertion + comp, ok := v.(component.HelloWorld) + if !ok { + errMsg := fmt.Sprintf("custom component %s does not implement HelloWorld interface", name) + log.DefaultLogger.Errorf(errMsg) + } + name2component[name] = comp + } + } + } + // 2. construct your API implementation + return &server{ + appId: ac.AppId, + // Your API plugin can store and use all the components. + // For example,this demo set all the LockStore components here. + name2LockStore: ac.LockStores, + // Custom components of type "helloworld" + name2component: name2component, + } } // server is used to implement helloworld.GreeterServer. type server struct { + appId string + // custom components which implements the `HelloWorld` interface + name2component map[string]component.HelloWorld + // LockStore components. They are not used in this demo, we put them here as a demo. + name2LockStore map[string]lock.LockStore pb.UnimplementedGreeterServer } -func (s *server) Init(conn *rawGRPC.ClientConn) error { - return nil +// SayHello implements helloworld.GreeterServer.SayHello +func (s *server) SayHello(ctx context.Context, in *pb.HelloRequest) (*pb.HelloReply, error) { + if _, ok := s.name2component[componentName]; !ok { + return &pb.HelloReply{Message: "We don't want to talk with you!"}, nil + } + message, err := s.name2component[componentName].SayHello(in.GetName()) + if err != nil { + return nil, err + } + return &pb.HelloReply{Message: message}, nil } -func (s *server) Register(grpcServer *rawGRPC.Server, registeredServer mgrpc.RegisteredServer) (mgrpc.RegisteredServer, error) { - pb.RegisterGreeterServer(grpcServer, s) - return registeredServer, nil +func (s *server) Init(conn *rawGRPC.ClientConn) error { + return nil } -// SayHello implements helloworld.GreeterServer -func (s *server) SayHello(ctx context.Context, in *pb.HelloRequest) (*pb.HelloReply, error) { - return &pb.HelloReply{Message: "Hello " + in.GetName()}, nil +func (s *server) Register(rawGrpcServer *rawGRPC.Server) error { + pb.RegisterGreeterServer(rawGrpcServer, s) + return nil } diff --git a/cmd/layotto_multiple_api/main.go b/cmd/layotto_multiple_api/main.go index b2b4d4d408..5b16a2d2f9 100644 --- a/cmd/layotto_multiple_api/main.go +++ b/cmd/layotto_multiple_api/main.go @@ -20,6 +20,8 @@ import ( "encoding/json" "mosn.io/api" helloworld_api "mosn.io/layotto/cmd/layotto_multiple_api/helloworld" + "mosn.io/layotto/cmd/layotto_multiple_api/helloworld/component" + "mosn.io/layotto/components/custom" component_actuators "mosn.io/layotto/components/pkg/actuators" l8_grpc "mosn.io/layotto/pkg/grpc" "mosn.io/layotto/pkg/grpc/dapr" @@ -188,6 +190,7 @@ func NewRuntimeGrpcServer(data json.RawMessage, opts ...grpc.ServerOption) (mgrp } // 2. new instance rt := runtime.NewMosnRuntime(cfg) + rt.AppendInitRuntimeStage(runtime.DefaultInitRuntimeStage) // 3. run server, err := rt.Run( runtime.WithGrpcOptions(opts...), @@ -206,7 +209,7 @@ func NewRuntimeGrpcServer(data json.RawMessage, opts ...grpc.ServerOption) (mgrp // a demo to show how to register your own gRPC API helloworld_api.NewHelloWorldAPI, // support Dapr API - // Currently it only support Dapr's InvokeService and InvokeBinding API. + // Currently it only support Dapr's InvokeService,secret API,state API and InvokeBinding API. // Note: this feature is still in Alpha state and we don't recommend that you use it in your production environment. dapr.NewDaprAPI_Alpha, ), @@ -378,7 +381,13 @@ func NewRuntimeGrpcServer(data json.RawMessage, opts ...grpc.ServerOption) (mgrp runtime_sequencer.NewFactory("in-memory", func() sequencer.Store { return sequencer_inmemory.NewInMemorySequencer() }), - )) + ), + // Custom components + runtime.WithCustomComponentFactory("helloworld", + custom.NewComponentFactory("in-memory", component.NewInMemoryHelloWorld), + custom.NewComponentFactory("goodbye", component.NewSayGoodbyeHelloWorld), + ), + ) return server, err } diff --git a/components/custom/component.go b/components/custom/component.go new file mode 100644 index 0000000000..2b76f67ec1 --- /dev/null +++ b/components/custom/component.go @@ -0,0 +1,20 @@ +// +// Copyright 2021 Layotto Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package custom + +import "context" + +type Component interface { + Initialize(ctx context.Context, config Config) error +} diff --git a/components/custom/config.go b/components/custom/config.go new file mode 100644 index 0000000000..a4959b778b --- /dev/null +++ b/components/custom/config.go @@ -0,0 +1,19 @@ +// +// Copyright 2021 Layotto Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package custom + +type Config struct { + Version string `json:"version"` + Metadata map[string]string `json:"metadata"` +} diff --git a/components/custom/registry.go b/components/custom/registry.go new file mode 100644 index 0000000000..31359d99b2 --- /dev/null +++ b/components/custom/registry.go @@ -0,0 +1,76 @@ +// +// Copyright 2021 Layotto Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package custom + +import ( + "fmt" + "mosn.io/layotto/components/pkg/info" +) + +type Registry interface { + Register(componentType string, factorys ...*ComponentFactory) + Create(componentType, name string) (Component, error) +} + +type ComponentFactory struct { + Name string + FactoryMethod func() Component +} + +func NewComponentFactory(name string, f func() Component) *ComponentFactory { + return &ComponentFactory{ + Name: name, + FactoryMethod: f, + } +} + +type componentRegistry struct { + stores map[string]map[string]func() Component + info *info.RuntimeInfo +} + +func NewRegistry(info *info.RuntimeInfo) Registry { + return &componentRegistry{ + stores: make(map[string]map[string]func() Component), + info: info, + } +} + +func (r *componentRegistry) Register(componentType string, fs ...*ComponentFactory) { + if len(fs) == 0 { + return + } + r.info.AddService(componentType) + // lazy init + if _, ok := r.stores[componentType]; !ok { + r.stores[componentType] = make(map[string]func() Component) + } + // register FactoryMethod + for _, f := range fs { + r.stores[componentType][f.Name] = f.FactoryMethod + r.info.RegisterComponent(componentType, f.Name) + } +} + +func (r *componentRegistry) Create(componentType, name string) (Component, error) { + store, ok := r.stores[componentType] + if !ok { + return nil, fmt.Errorf("custom component type %s is not regsitered", componentType) + } + if f, ok := store[name]; ok { + r.info.LoadComponent(componentType, name) + return f(), nil + } + return nil, fmt.Errorf("custom component %s is not regsitered", name) +} diff --git a/components/custom/registry_test.go b/components/custom/registry_test.go new file mode 100644 index 0000000000..84d443dcf3 --- /dev/null +++ b/components/custom/registry_test.go @@ -0,0 +1,41 @@ +/* + * Copyright 2021 Layotto Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package custom + +import ( + "mosn.io/layotto/components/pkg/info" + "strings" + "testing" +) + +func TestNewRegistry(t *testing.T) { + r := NewRegistry(info.NewRuntimeInfo()) + compType := "my_component" + compName := "etcd" + r.Register(compType, + NewComponentFactory(compName, func() Component { + return nil + }), + ) + _, err := r.Create(compType, compName) + if err != nil { + t.Fatalf("create mock store failed: %v", err) + } + if _, err := r.Create(compType, "not exists"); !strings.Contains(err.Error(), "not regsitered") { + t.Fatalf("create mock store failed: %v", err) + } +} diff --git a/components/pkg/mock/custom_component_mock.go b/components/pkg/mock/custom_component_mock.go new file mode 100644 index 0000000000..7a26367fc6 --- /dev/null +++ b/components/pkg/mock/custom_component_mock.go @@ -0,0 +1,47 @@ +// +// Copyright 2021 Layotto Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package mock + +import ( + "context" + "mosn.io/layotto/components/custom" +) + +type CustomComponentMock struct { + ctx context.Context + config *custom.Config + initTimes int +} + +func NewCustomComponentMock() custom.Component { + return &CustomComponentMock{} +} + +func (c *CustomComponentMock) InitTimes() int { + return c.initTimes +} + +func (c *CustomComponentMock) Initialize(ctx context.Context, config custom.Config) error { + c.ctx = ctx + c.config = &config + c.initTimes++ + return nil +} + +func (c *CustomComponentMock) GetReceivedConfig() *custom.Config { + return c.config +} +func (c *CustomComponentMock) GetReceivedCtx() context.Context { + return c.ctx +} diff --git a/configs/config_in_memory.json b/configs/config_in_memory.json index 32c8ef30c2..983ff8d6e7 100644 --- a/configs/config_in_memory.json +++ b/configs/config_in_memory.json @@ -50,6 +50,13 @@ "metadata": {} } }, + "custom_component": { + "helloworld": { + "in-memory": { + "metadata": {} + } + } + }, "app": { "app_id": "app1", "grpc_callback_port": 9999 diff --git a/docs/en/start/api_plugin/helloworld.md b/docs/en/start/api_plugin/helloworld.md index 560887503a..4c7d768b2e 100644 --- a/docs/en/start/api_plugin/helloworld.md +++ b/docs/en/start/api_plugin/helloworld.md @@ -3,15 +3,13 @@ This is a demo to show you how to register your own API. Layotto has the api-plugin feature to let you add your own API based on your need. -## step 0. change directory -```shell -cd ${projectpath}/cmd/layotto_multiple_api -``` - ## step 1. start Layotto with a new helloworld API Build and run Layotto : ```shell +# change directory +cd ${projectpath}/cmd/layotto_multiple_api +# build it go build -o layotto # run it ./layotto start -c ../../configs/config_in_memory.json @@ -19,7 +17,7 @@ go build -o layotto Q: What happened? -Check the code in `main.go` and you will find a new API was registered during startup: +Check the code in [`main.go`](https://github.com/mosn/layotto/blob/d74ff0e8940e0eb9c73b1d3275a17d29be36bd5c/cmd/layotto_multiple_api/main.go#L203) and you will find a new API was registered during startup: ```go // register your grpc API here @@ -33,6 +31,9 @@ Check the code in `main.go` and you will find a new API was registered during st ## step 2. invoke the helloworld API ```shell +# change directory +cd ${projectpath}/cmd/layotto_multiple_api +# run demo client go run client/main.go ``` The result will be: diff --git a/docs/zh/_sidebar.md b/docs/zh/_sidebar.md index bf3275fe60..bc53a848bc 100644 --- a/docs/zh/_sidebar.md +++ b/docs/zh/_sidebar.md @@ -71,6 +71,7 @@ - [Redis](zh/component_specs/sequencer/redis.md) - [Zookeeper](zh/component_specs/sequencer/zookeeper.md) - [MongoDB](zh/component_specs/sequencer/mongo.md) + - [自定义组件](zh/component_specs/custom/common.md) - [如何部署、升级 Layotto](zh/operation/) - 设计文档 - [Actuator设计文档](zh/design/actuator/actuator-design-doc.md) diff --git a/docs/zh/component_specs/custom/common.md b/docs/zh/component_specs/custom/common.md new file mode 100644 index 0000000000..dfc96bf786 --- /dev/null +++ b/docs/zh/component_specs/custom/common.md @@ -0,0 +1,50 @@ +# 自定义组件 +## 什么是自定义组件? + +Layotto 中的组件分为两种: +- 预置组件 + +比如 `PubSub` 组件,比如 `state.Store` 组件 + +- 自定义组件 + +允许您自己扩展自己的组件,比如[使用指南](zh/design/api_plugin/design?id=_24-使用指南) 中的 `HelloWorld` 组件。 + +## 配置文件结构 +```json + "custom_component": { + "": { + "": { + "metadata": { + "": "", + "": "" + } + }, + "": { + "metadata": { + "": "", + "": "" + } + } + } + }, +``` + +您可以在metadata里配置组件关心的key/value配置。 + +## 示例 +例如,在`configs/config_in_memory.json` 中,配置了类型是`helloworld` 的 `CustomComponent`,只有一个组件,其组件名是 `in-memory`: + +```json + "custom_component": { + "helloworld": { + "in-memory": { + "metadata": {} + } + } + }, +``` + + +## 如何使用"自定义组件"? +详见 [使用指南](zh/design/api_plugin/design?id=_24-使用指南) \ No newline at end of file diff --git a/docs/zh/design/api_plugin/design.md b/docs/zh/design/api_plugin/design.md index d4e5eca6af..234f38ee24 100644 --- a/docs/zh/design/api_plugin/design.md +++ b/docs/zh/design/api_plugin/design.md @@ -133,12 +133,24 @@ protoc 编译工具会根据 proto 文件帮你编译出 go 语言的 interface ```go // server is used to implement helloworld.GreeterServer. type server struct { - pb.UnimplementedGreeterServer + appId string + // custom components which implements the `HelloWorld` interface + name2component map[string]component.HelloWorld + // LockStore components. They are not used in this demo, we put them here as a demo. + name2LockStore map[string]lock.LockStore + pb.UnimplementedGreeterServer } -// SayHello implements helloworld.GreeterServer +// SayHello implements helloworld.GreeterServer.SayHello func (s *server) SayHello(ctx context.Context, in *pb.HelloRequest) (*pb.HelloReply, error) { - return &pb.HelloReply{Message: "Hello " + in.GetName()}, nil + if _, ok := s.name2component[componentName]; !ok { + return &pb.HelloReply{Message: "We don't want to talk with you!"}, nil + } + message, err := s.name2component[componentName].SayHello(in.GetName()) + if err != nil { + return nil, err + } + return &pb.HelloReply{Message: message}, nil } ``` @@ -194,9 +206,55 @@ type ApplicationContext struct { Sequencers map[string]sequencer.Store SendToOutputBindingFn func(name string, req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) SecretStores map[string]secretstores.SecretStore + CustomComponent map[string]map[string]custom.Component } ``` +##### 解释:`CustomComponent`是什么? +是"自定义组件"。 + +Layotto 中的组件分为两种: +- 预置组件 + +比如 `pubsub` 组件,比如 `state` 组件 + +- 自定义组件 + +允许您自己扩展自己的组件,比如下面示例中的 `HelloWorld` 组件。 + +##### 解释:如何配置自定义组件? +在 json 配置文件中按以下格式配置: +```json + "custom_component": { + "": { + "": { + "metadata": { + "": "", + "": "" + } + }, + "": { + "metadata": { + "": "", + "": "" + } + } + } + }, +``` + +例如,在`configs/config_in_memory.json` 中,配置了类型是`helloworld` 的 `CustomComponent`,只有一个组件,其组件名是 `in-memory`: +```json + "custom_component": { + "helloworld": { + "in-memory": { + "metadata": {} + } + } + }, +``` + +##### 看个例子 看个具体的例子,在[helloworld 示例中](https://github.com/mosn/layotto/blob/main/cmd/layotto_multiple_api/helloworld/grpc_api.go), `*server` 实现了 `Init` 和 `Register` 方法: @@ -215,14 +273,38 @@ func (s *server) Register(grpcServer *rawGRPC.Server, registeredServer mgrpc.Reg ```go func NewHelloWorldAPI(ac *grpc_api.ApplicationContext) grpc.GrpcAPI { - return &server{} + // 1. convert custom components + name2component := make(map[string]component.HelloWorld) + if len(ac.CustomComponent) != 0 { + // we only care about those components of type "helloworld" + name2comp, ok := ac.CustomComponent[componentType] + if ok && len(name2comp) > 0 { + for name, v := range name2comp { + // convert them using type assertion + comp, ok := v.(component.HelloWorld) + if !ok { + errMsg := fmt.Sprintf("custom component %s does not implement HelloWorld interface", name) + log.DefaultLogger.Errorf(errMsg) + } + name2component[name] = comp + } + } + } + // 2. construct your API implementation + return &server{ + appId: ac.AppId, + // Your API plugin can store and use all the components. + // For example,this demo set all the LockStore components here. + name2LockStore: ac.LockStores, + // Custom components of type "helloworld" + name2component: name2component, + } } ``` +##### 解释:这些回调函数、构造函数是干嘛的? 看了这个例子,你也许会问:这些回调函数、构造函数是干嘛的? -##### `GrpcAPI` 的生命周期 - 上述钩子用于给用户扩展自定义启动逻辑。Layotto 会在启动过程中回调上述生命周期钩子和构造函数。调用顺序大致为: `Layotto 初始化好所有组件` ---> 调用`NewGrpcAPI`构造函数 ---> `GrpcAPI.Init` ---> `Layotto 创建 grpc 服务器` ---> `GrpcAPI.Register` diff --git a/docs/zh/start/api_plugin/helloworld.md b/docs/zh/start/api_plugin/helloworld.md index b71ebc4fb6..7e80332bfd 100644 --- a/docs/zh/start/api_plugin/helloworld.md +++ b/docs/zh/start/api_plugin/helloworld.md @@ -3,15 +3,14 @@ This is a demo to show you how to register your own API. Layotto has the api-plugin feature to let you add your own API based on your need. -## step 0. change directory -```shell -cd ${projectpath}/cmd/layotto_multiple_api -``` -## step 1. start Layotto with a new helloworld API +## step 1. start Layotto with a helloworld API-Plugin Build and run Layotto : ```shell +# change directory +cd ${projectpath}/cmd/layotto_multiple_api +# build it go build -o layotto # run it ./layotto start -c ../../configs/config_in_memory.json @@ -19,22 +18,26 @@ go build -o layotto Q: What happened? -Check the code in `main.go` and you will find a new API was registered during startup: +Check the code in [`main.go`](https://github.com/mosn/layotto/blob/d74ff0e8940e0eb9c73b1d3275a17d29be36bd5c/cmd/layotto_multiple_api/main.go#L203) and you will find a new API was registered during startup: ```go - // register your grpc API here - runtime.WithGrpcAPI( - // default grpc API - default_api.NewGrpcAPI, - // a demo to show how to register your own API - helloworld_api.NewHelloWorldAPI, - ), + // register your grpc API here + runtime.WithGrpcAPI( + // default grpc API + default_api.NewGrpcAPI, + // a demo to show how to register your own API + helloworld_api.NewHelloWorldAPI, + ), ``` ## step 2. invoke the helloworld API ```shell +# change directory +cd ${projectpath}/cmd/layotto_multiple_api +# run demo client go run client/main.go ``` + The result will be: ```shell diff --git a/pkg/grpc/dapr/dapr_api.go b/pkg/grpc/dapr/dapr_api.go index 1b72902773..4c50196457 100644 --- a/pkg/grpc/dapr/dapr_api.go +++ b/pkg/grpc/dapr/dapr_api.go @@ -42,7 +42,6 @@ import ( "mosn.io/layotto/pkg/grpc/dapr/proto/runtime/v1" dapr_v1pb "mosn.io/layotto/pkg/grpc/dapr/proto/runtime/v1" "mosn.io/layotto/pkg/messages" - mgrpc "mosn.io/mosn/pkg/filter/network/grpc" "mosn.io/pkg/log" "strings" ) @@ -82,9 +81,9 @@ func (d *daprGrpcAPI) startSubscribing() error { return nil } -func (d *daprGrpcAPI) Register(s *grpc.Server, registeredServer mgrpc.RegisteredServer) (mgrpc.RegisteredServer, error) { - dapr_v1pb.RegisterDaprServer(s, d) - return registeredServer, nil +func (d *daprGrpcAPI) Register(rawGrpcServer *grpc.Server) error { + dapr_v1pb.RegisterDaprServer(rawGrpcServer, d) + return nil } func (d *daprGrpcAPI) InvokeService(ctx context.Context, in *runtime.InvokeServiceRequest) (*dapr_common_v1pb.InvokeResponse, error) { diff --git a/pkg/grpc/dapr/dapr_api_secret_test.go b/pkg/grpc/dapr/dapr_api_secret_test.go index 0f0b23cafc..4d910da02c 100644 --- a/pkg/grpc/dapr/dapr_api_secret_test.go +++ b/pkg/grpc/dapr/dapr_api_secret_test.go @@ -116,9 +116,7 @@ func TestNewDaprAPI_GetSecretStores(t *testing.T) { } // Setup Dapr API server grpcAPI := NewDaprAPI_Alpha(&grpc_api.ApplicationContext{ - "", nil, nil, nil, nil, - nil, nil, nil, nil, - nil, fakeStores}) + SecretStores: fakeStores}) err := grpcAPI.Init(nil) if err != nil { t.Errorf("grpcAPI.Init error") @@ -190,9 +188,7 @@ func TestGetBulkSecret(t *testing.T) { // Setup Dapr API server // Setup Dapr API server grpcAPI := NewDaprAPI_Alpha(&grpc_api.ApplicationContext{ - "", nil, nil, nil, nil, - nil, nil, nil, nil, - nil, fakeStores}) + SecretStores: fakeStores}) // Run test server err := grpcAPI.Init(nil) if err != nil { diff --git a/pkg/grpc/dapr/dapr_api_test.go b/pkg/grpc/dapr/dapr_api_test.go index 4bd211ae07..650f5678f1 100644 --- a/pkg/grpc/dapr/dapr_api_test.go +++ b/pkg/grpc/dapr/dapr_api_test.go @@ -65,14 +65,13 @@ func TestNewDaprAPI_Alpha(t *testing.T) { } // construct API grpcAPI := NewDaprAPI_Alpha(&grpc_api.ApplicationContext{ - "", nil, nil, nil, nil, - map[string]state.Store{"mock": store}, nil, nil, nil, - func(name string, req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) { + StateStores: map[string]state.Store{"mock": store}, + SendToOutputBindingFn: func(name string, req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) { if name == "error-binding" { return nil, errors.New("error when invoke binding") } return &bindings.InvokeResponse{Data: []byte("ok")}, nil - }, nil}) + }}) err := grpcAPI.Init(nil) if err != nil { t.Errorf("grpcAPI.Init error") @@ -108,7 +107,7 @@ func startDaprServerForTest(port int, srv DaprGrpcAPI) *grpc.Server { server := grpc.NewServer() go func() { - srv.Register(server, server) + srv.Register(server) if err := server.Serve(lis); err != nil { panic(err) } diff --git a/pkg/grpc/default_api/api.go b/pkg/grpc/default_api/api.go index e42b6baa90..0aa0d50468 100644 --- a/pkg/grpc/default_api/api.go +++ b/pkg/grpc/default_api/api.go @@ -23,18 +23,16 @@ import ( "sync" "github.com/dapr/components-contrib/bindings" - grpc_api "mosn.io/layotto/pkg/grpc" - "mosn.io/layotto/pkg/grpc/dapr" - dapr_common_v1pb "mosn.io/layotto/pkg/grpc/dapr/proto/common/v1" - dapr_v1pb "mosn.io/layotto/pkg/grpc/dapr/proto/runtime/v1" - mgrpc "mosn.io/mosn/pkg/filter/network/grpc" - "github.com/dapr/components-contrib/pubsub" "github.com/dapr/components-contrib/state" jsoniter "github.com/json-iterator/go" "google.golang.org/grpc" "google.golang.org/protobuf/types/known/emptypb" "mosn.io/layotto/components/file" + grpc_api "mosn.io/layotto/pkg/grpc" + "mosn.io/layotto/pkg/grpc/dapr" + dapr_common_v1pb "mosn.io/layotto/pkg/grpc/dapr/proto/common/v1" + dapr_v1pb "mosn.io/layotto/pkg/grpc/dapr/proto/runtime/v1" "mosn.io/layotto/components/configstores" "mosn.io/layotto/components/hello" @@ -78,6 +76,7 @@ type API interface { PublishEvent(context.Context, *runtimev1pb.PublishEventRequest) (*emptypb.Empty, error) // State GetState(ctx context.Context, in *runtimev1pb.GetStateRequest) (*runtimev1pb.GetStateResponse, error) + // Get a batch of state data GetBulkState(ctx context.Context, in *runtimev1pb.GetBulkStateRequest) (*runtimev1pb.GetBulkStateResponse, error) SaveState(ctx context.Context, in *runtimev1pb.SaveStateRequest) (*emptypb.Empty, error) DeleteState(ctx context.Context, in *runtimev1pb.DeleteStateRequest) (*emptypb.Empty, error) @@ -136,10 +135,10 @@ func (a *api) Init(conn *grpc.ClientConn) error { return a.startSubscribing() } -func (a *api) Register(s *grpc.Server, registeredServer mgrpc.RegisteredServer) (mgrpc.RegisteredServer, error) { +func (a *api) Register(rawGrpcServer *grpc.Server) error { LayottoAPISingleton = a - runtimev1pb.RegisterRuntimeServer(s, a) - return registeredServer, nil + runtimev1pb.RegisterRuntimeServer(rawGrpcServer, a) + return nil } func NewGrpcAPI(ac *grpc_api.ApplicationContext) grpc_api.GrpcAPI { diff --git a/pkg/grpc/default_api/api_pubsub.go b/pkg/grpc/default_api/api_pubsub.go index 8a202252f9..dc8b7cc7b0 100644 --- a/pkg/grpc/default_api/api_pubsub.go +++ b/pkg/grpc/default_api/api_pubsub.go @@ -37,6 +37,7 @@ import ( "mosn.io/pkg/log" ) +// Publishes events to the specific topic. func (a *api) PublishEvent(ctx context.Context, in *runtimev1pb.PublishEventRequest) (*emptypb.Empty, error) { result, err := a.doPublishEvent(ctx, in.PubsubName, in.Topic, in.Data, in.DataContentType, in.Metadata) if err != nil { diff --git a/pkg/grpc/default_api/api_state.go b/pkg/grpc/default_api/api_state.go index 6e5df4fa23..deda130158 100644 --- a/pkg/grpc/default_api/api_state.go +++ b/pkg/grpc/default_api/api_state.go @@ -62,6 +62,7 @@ func (a *api) SaveState(ctx context.Context, in *runtimev1pb.SaveStateRequest) ( return a.daprAPI.SaveState(ctx, daprReq) } +// GetBulkState gets a batch of state data func (a *api) GetBulkState(ctx context.Context, in *runtimev1pb.GetBulkStateRequest) (*runtimev1pb.GetBulkStateResponse, error) { if in == nil { return &runtimev1pb.GetBulkStateResponse{}, status.Error(codes.InvalidArgument, "GetBulkStateRequest is nil") diff --git a/pkg/grpc/default_api/api_test.go b/pkg/grpc/default_api/api_test.go index 21e66b08d2..86e71bbc65 100644 --- a/pkg/grpc/default_api/api_test.go +++ b/pkg/grpc/default_api/api_test.go @@ -168,10 +168,21 @@ func TestInvokeService(t *testing.T) { }, } - a := NewAPI("", nil, nil, + a := NewAPI( + "", + nil, + nil, map[string]rpc.Invoker{ mosninvoker.Name: mockInvoker, - }, nil, nil, nil, nil, nil, nil, nil) + }, + nil, + nil, + nil, + nil, + nil, + nil, + nil, + ) _, err := a.InvokeService(context.Background(), in) assert.Nil(t, err) diff --git a/pkg/grpc/grpc.go b/pkg/grpc/grpc.go index e17b661f63..60324af117 100644 --- a/pkg/grpc/grpc.go +++ b/pkg/grpc/grpc.go @@ -37,16 +37,18 @@ func NewGrpcServer(opts ...Option) (mgrpc.RegisteredServer, error) { } func NewDefaultServer(apis []GrpcAPI, opts ...grpc.ServerOption) (mgrpc.RegisteredServer, error) { + return NewRawGrpcServer(apis, opts...) +} + +func NewRawGrpcServer(apis []GrpcAPI, opts ...grpc.ServerOption) (*grpc.Server, error) { s := grpc.NewServer(opts...) - // create registeredServer to manage lifecycle of the grpc server - var registeredServer mgrpc.RegisteredServer = s var err error = nil // loop registering grpc api for _, grpcAPI := range apis { - registeredServer, err = grpcAPI.Register(s, registeredServer) + err = grpcAPI.Register(s) if err != nil { - return registeredServer, err + return s, err } } - return registeredServer, nil + return s, nil } diff --git a/pkg/grpc/grpc_api.go b/pkg/grpc/grpc_api.go index 23bc4baee7..6158927e29 100644 --- a/pkg/grpc/grpc_api.go +++ b/pkg/grpc/grpc_api.go @@ -23,12 +23,12 @@ import ( "github.com/dapr/components-contrib/state" "google.golang.org/grpc" "mosn.io/layotto/components/configstores" + "mosn.io/layotto/components/custom" "mosn.io/layotto/components/file" "mosn.io/layotto/components/hello" "mosn.io/layotto/components/lock" "mosn.io/layotto/components/rpc" "mosn.io/layotto/components/sequencer" - mgrpc "mosn.io/mosn/pkg/filter/network/grpc" ) // GrpcAPI is the interface of API plugin. It has lifecycle related methods @@ -38,7 +38,7 @@ type GrpcAPI interface { Init(conn *grpc.ClientConn) error // Bind this API to the grpc server - Register(s *grpc.Server, registeredServer mgrpc.RegisteredServer) (mgrpc.RegisteredServer, error) + Register(rawGrpcServer *grpc.Server) error } // NewGrpcAPI is the constructor of GrpcAPI @@ -58,4 +58,5 @@ type ApplicationContext struct { Sequencers map[string]sequencer.Store SendToOutputBindingFn func(name string, req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) SecretStores map[string]secretstores.SecretStore + CustomComponent map[string]map[string]custom.Component } diff --git a/pkg/grpc/options.go b/pkg/grpc/options.go index 7120635324..474a08ae5c 100644 --- a/pkg/grpc/options.go +++ b/pkg/grpc/options.go @@ -29,6 +29,7 @@ type grpcOptions struct { type Option func(o *grpcOptions) +// apis CANNOT be nil. func WithGrpcAPIs(apis []GrpcAPI) Option { return func(o *grpcOptions) { o.apis = apis diff --git a/pkg/runtime/config.go b/pkg/runtime/config.go index 0e43157bbf..6625d0d80b 100644 --- a/pkg/runtime/config.go +++ b/pkg/runtime/config.go @@ -18,6 +18,7 @@ package runtime import ( "encoding/json" + "mosn.io/layotto/components/custom" "mosn.io/layotto/pkg/runtime/bindings" @@ -49,6 +50,10 @@ type MosnRuntimeConfig struct { SequencerManagement map[string]sequencer.Config `json:"sequencer"` Bindings map[string]bindings.Metadata `json:"bindings"` SecretStoresManagement map[string]bindings.Metadata `json:"secretStores"` + // + // e.g. <"super_pubsub","etcd",config> + CustomComponent map[string]map[string]custom.Config `json:"custom_component,omitempty"` + Extends map[string]json.RawMessage `json:"extends,omitempty"` // extend config } func ParseRuntimeConfig(data json.RawMessage) (*MosnRuntimeConfig, error) { diff --git a/pkg/runtime/options.go b/pkg/runtime/options.go index 18f402416c..76fe77286d 100644 --- a/pkg/runtime/options.go +++ b/pkg/runtime/options.go @@ -19,6 +19,7 @@ package runtime import ( "google.golang.org/grpc" "mosn.io/layotto/components/configstores" + "mosn.io/layotto/components/custom" "mosn.io/layotto/components/file" "mosn.io/layotto/components/hello" "mosn.io/layotto/components/rpc" @@ -45,6 +46,9 @@ type services struct { outputBinding []*mbindings.OutputBindingFactory inputBinding []*mbindings.InputBindingFactory secretStores []*msecretstores.SecretStoresFactory + // Custom components. + // The key is component type + custom map[string][]*custom.ComponentFactory } type runtimeOptions struct { @@ -58,6 +62,14 @@ type runtimeOptions struct { apiFactorys []rgrpc.NewGrpcAPI } +func newRuntimeOptions() *runtimeOptions { + return &runtimeOptions{ + services: services{ + custom: make(map[string][]*custom.ComponentFactory), + }, + } +} + type Option func(o *runtimeOptions) func WithNewServer(f rgrpc.NewServer) Option { @@ -89,7 +101,14 @@ func WithErrInterceptor(i ErrInterceptor) Option { } } -// services options +func WithCustomComponentFactory(componentType string, factorys ...*custom.ComponentFactory) Option { + return func(o *runtimeOptions) { + if len(factorys) == 0 { + return + } + o.services.custom[componentType] = append(o.services.custom[componentType], factorys...) + } +} func WithHelloFactory(hellos ...*hello.HelloFactory) Option { return func(o *runtimeOptions) { diff --git a/pkg/runtime/options_test.go b/pkg/runtime/options_test.go index c648403792..c9157c57b7 100644 --- a/pkg/runtime/options_test.go +++ b/pkg/runtime/options_test.go @@ -26,13 +26,7 @@ func TestWithErrInterceptor(t *testing.T) { var f ErrInterceptor = func(err error, format string, args ...interface{}) { cnt++ } - rt := &runtimeOptions{ - services: services{}, - srvMaker: nil, - errInt: nil, - options: nil, - apiFactorys: nil, - } + rt := newRuntimeOptions() WithErrInterceptor(f)(rt) rt.errInt(nil, "") assert.True(t, cnt == 1) diff --git a/pkg/runtime/runtime.go b/pkg/runtime/runtime.go index cbd679aac7..ae86b22e81 100644 --- a/pkg/runtime/runtime.go +++ b/pkg/runtime/runtime.go @@ -21,8 +21,10 @@ import ( "errors" "fmt" "github.com/dapr/components-contrib/secretstores" + "mosn.io/layotto/components/custom" msecretstores "mosn.io/layotto/pkg/runtime/secretstores" "strings" + "time" "github.com/dapr/components-contrib/bindings" mbindings "mosn.io/layotto/pkg/runtime/bindings" @@ -53,16 +55,17 @@ type MosnRuntime struct { info *info.RuntimeInfo srv mgrpc.RegisteredServer // component registry - helloRegistry hello.Registry - configStoreRegistry configstores.Registry - rpcRegistry rpc.Registry - pubSubRegistry runtime_pubsub.Registry - stateRegistry runtime_state.Registry - lockRegistry runtime_lock.Registry - sequencerRegistry runtime_sequencer.Registry - fileRegistry file.Registry - bindingsRegistry mbindings.Registry - secretStoresRegistry msecretstores.Registry + helloRegistry hello.Registry + configStoreRegistry configstores.Registry + rpcRegistry rpc.Registry + pubSubRegistry runtime_pubsub.Registry + stateRegistry runtime_state.Registry + lockRegistry runtime_lock.Registry + sequencerRegistry runtime_sequencer.Registry + fileRegistry file.Registry + bindingsRegistry mbindings.Registry + secretStoresRegistry msecretstores.Registry + customComponentRegistry custom.Registry // component pool hellos map[string]hello.HelloService // config management system component @@ -70,43 +73,55 @@ type MosnRuntime struct { rpcs map[string]rpc.Invoker pubSubs map[string]pubsub.PubSub // state implementations store here are already initialized - states map[string]state.Store - files map[string]file.File - locks map[string]lock.LockStore - sequencers map[string]sequencer.Store - outputBindings map[string]bindings.OutputBinding - secretStores map[string]secretstores.SecretStore + states map[string]state.Store + files map[string]file.File + locks map[string]lock.LockStore + sequencers map[string]sequencer.Store + outputBindings map[string]bindings.OutputBinding + secretStores map[string]secretstores.SecretStore + customComponent map[string]map[string]custom.Component // app callback AppCallbackConn *rawGRPC.ClientConn // extends - errInt ErrInterceptor + errInt ErrInterceptor + started bool + initRuntimeStages []initRuntimeStage } +func (m *MosnRuntime) RuntimeConfig() *MosnRuntimeConfig { + return m.runtimeConfig +} + +type initRuntimeStage func(o *runtimeOptions, m *MosnRuntime) error + func NewMosnRuntime(runtimeConfig *MosnRuntimeConfig) *MosnRuntime { info := info.NewRuntimeInfo() return &MosnRuntime{ - runtimeConfig: runtimeConfig, - info: info, - helloRegistry: hello.NewRegistry(info), - configStoreRegistry: configstores.NewRegistry(info), - rpcRegistry: rpc.NewRegistry(info), - pubSubRegistry: runtime_pubsub.NewRegistry(info), - stateRegistry: runtime_state.NewRegistry(info), - bindingsRegistry: mbindings.NewRegistry(info), - fileRegistry: file.NewRegistry(info), - lockRegistry: runtime_lock.NewRegistry(info), - sequencerRegistry: runtime_sequencer.NewRegistry(info), - secretStoresRegistry: msecretstores.NewRegistry(info), - hellos: make(map[string]hello.HelloService), - configStores: make(map[string]configstores.Store), - rpcs: make(map[string]rpc.Invoker), - pubSubs: make(map[string]pubsub.PubSub), - states: make(map[string]state.Store), - files: make(map[string]file.File), - locks: make(map[string]lock.LockStore), - sequencers: make(map[string]sequencer.Store), - outputBindings: make(map[string]bindings.OutputBinding), - secretStores: make(map[string]secretstores.SecretStore), + runtimeConfig: runtimeConfig, + info: info, + helloRegistry: hello.NewRegistry(info), + configStoreRegistry: configstores.NewRegistry(info), + rpcRegistry: rpc.NewRegistry(info), + pubSubRegistry: runtime_pubsub.NewRegistry(info), + stateRegistry: runtime_state.NewRegistry(info), + bindingsRegistry: mbindings.NewRegistry(info), + fileRegistry: file.NewRegistry(info), + lockRegistry: runtime_lock.NewRegistry(info), + sequencerRegistry: runtime_sequencer.NewRegistry(info), + secretStoresRegistry: msecretstores.NewRegistry(info), + customComponentRegistry: custom.NewRegistry(info), + hellos: make(map[string]hello.HelloService), + configStores: make(map[string]configstores.Store), + rpcs: make(map[string]rpc.Invoker), + pubSubs: make(map[string]pubsub.PubSub), + states: make(map[string]state.Store), + files: make(map[string]file.File), + locks: make(map[string]lock.LockStore), + sequencers: make(map[string]sequencer.Store), + outputBindings: make(map[string]bindings.OutputBinding), + secretStores: make(map[string]secretstores.SecretStore), + customComponent: make(map[string]map[string]custom.Component), + started: false, } } @@ -136,10 +151,13 @@ func (m *MosnRuntime) sendToOutputBinding(name string, req *bindings.InvokeReque } func (m *MosnRuntime) Run(opts ...Option) (mgrpc.RegisteredServer, error) { + // 0. mark already started + m.started = true + // 1. init runtime stage // prepare runtimeOptions - var o runtimeOptions + o := newRuntimeOptions() for _, opt := range opts { - opt(&o) + opt(o) } // set ErrInterceptor if o.errInt != nil { @@ -150,7 +168,7 @@ func (m *MosnRuntime) Run(opts ...Option) (mgrpc.RegisteredServer, error) { } } // init runtime with runtimeOptions - if err := m.initRuntime(&o); err != nil { + if err := m.initRuntime(o); err != nil { return nil, err } // prepare grpcOpts @@ -158,7 +176,7 @@ func (m *MosnRuntime) Run(opts ...Option) (mgrpc.RegisteredServer, error) { if o.srvMaker != nil { grpcOpts = append(grpcOpts, grpc.WithNewServer(o.srvMaker)) } - // create GrpcAPIs + // 2. init GrpcAPI stage var apis []grpc.GrpcAPI ac := &grpc.ApplicationContext{ m.runtimeConfig.AppManagement.AppId, @@ -172,6 +190,7 @@ func (m *MosnRuntime) Run(opts ...Option) (mgrpc.RegisteredServer, error) { m.sequencers, m.sendToOutputBinding, m.secretStores, + m.customComponent, } for _, apiFactory := range o.apiFactorys { @@ -187,7 +206,7 @@ func (m *MosnRuntime) Run(opts ...Option) (mgrpc.RegisteredServer, error) { grpc.WithGrpcOptions(o.options...), grpc.WithGrpcAPIs(apis), ) - // create grpc server + // 3. create grpc server var err error = nil m.srv, err = grpc.NewGrpcServer(grpcOpts...) return m.srv, err @@ -199,7 +218,7 @@ func (m *MosnRuntime) Stop() { } } -func (m *MosnRuntime) initRuntime(o *runtimeOptions) error { +func DefaultInitRuntimeStage(o *runtimeOptions, m *MosnRuntime) error { if m.runtimeConfig == nil { return errors.New("[runtime] init error:no runtimeConfig") } @@ -208,6 +227,9 @@ func (m *MosnRuntime) initRuntime(o *runtimeOptions) error { return err } // init all kinds of components with config + if err := m.initCustomComponents(o.services.custom); err != nil { + return err + } if err := m.initHellos(o.services.hellos...); err != nil { return err } @@ -507,3 +529,80 @@ func (m *MosnRuntime) initSecretStores(factorys ...*msecretstores.SecretStoresFa } return nil } + +func (m *MosnRuntime) AppendInitRuntimeStage(f initRuntimeStage) { + if f == nil || m.started { + log.DefaultLogger.Errorf("[runtime] invalid initRuntimeStage or already started") + return + } + m.initRuntimeStages = append(m.initRuntimeStages, f) +} + +func (m *MosnRuntime) initRuntime(r *runtimeOptions) error { + st := time.Now() + // check default handler + if len(m.initRuntimeStages) == 0 { + m.initRuntimeStages = append(m.initRuntimeStages, DefaultInitRuntimeStage) + } + // do initialization + for _, f := range m.initRuntimeStages { + err := f(r, m) + if err != nil { + return err + } + } + + log.DefaultLogger.Infof("[runtime] initRuntime stages cost: %v", time.Since(st)) + return nil +} + +func (m *MosnRuntime) SetCustomComponent(componentType string, name string, component custom.Component) { + if _, ok := m.customComponent[componentType]; !ok { + m.customComponent[componentType] = make(map[string]custom.Component) + } + m.customComponent[componentType][name] = component +} + +func (m *MosnRuntime) initCustomComponents(type2factorys map[string][]*custom.ComponentFactory) error { + log.DefaultLogger.Infof("[runtime] start initializing custom components") + // 1. validation + if len(type2factorys) == 0 { + log.DefaultLogger.Infof("[runtime] no custom component factorys compiled") + return nil + } + if len(m.runtimeConfig.CustomComponent) == 0 { + log.DefaultLogger.Infof("[runtime] no custom components in configuration") + return nil + } + // 2. loop registering all types of components. + for compType, factorys := range type2factorys { + // 2.0. check empty + if len(factorys) == 0 { + continue + } + name2Config, ok := m.runtimeConfig.CustomComponent[compType] + if !ok { + log.DefaultLogger.Errorf("[runtime] Your required component type %s is not supported.Please check your configuration", compType) + continue + } + // 2.1. register all the factorys + m.customComponentRegistry.Register(compType, factorys...) + // 2.2. loop initializing component instances + for name, config := range name2Config { + // create the component + comp, err := m.customComponentRegistry.Create(compType, name) + if err != nil { + m.errInt(err, "create custom component %s failed", name) + return err + } + // init + if err := comp.Initialize(context.TODO(), config); err != nil { + m.errInt(err, "init custom component %s failed", name) + return err + } + // initialization finish + m.SetCustomComponent(compType, name, comp) + } + } + return nil +} diff --git a/pkg/runtime/runtime_test.go b/pkg/runtime/runtime_test.go index 3d1b090ac2..8e10df7568 100644 --- a/pkg/runtime/runtime_test.go +++ b/pkg/runtime/runtime_test.go @@ -23,10 +23,12 @@ import ( "fmt" "github.com/dapr/components-contrib/bindings" "google.golang.org/grpc/test/bufconn" + "mosn.io/layotto/components/custom" "mosn.io/layotto/components/hello/helloworld" sequencer_etcd "mosn.io/layotto/components/sequencer/etcd" sequencer_redis "mosn.io/layotto/components/sequencer/redis" sequencer_zookeeper "mosn.io/layotto/components/sequencer/zookeeper" + "mosn.io/layotto/pkg/grpc" "mosn.io/layotto/pkg/grpc/default_api" mock_appcallback "mosn.io/layotto/pkg/mock/runtime/appcallback" mbindings "mosn.io/layotto/pkg/runtime/bindings" @@ -45,6 +47,7 @@ import ( "mosn.io/layotto/components/configstores" "mosn.io/layotto/components/hello" "mosn.io/layotto/components/lock" + mock_component "mosn.io/layotto/components/pkg/mock" "mosn.io/layotto/components/rpc" "mosn.io/layotto/components/sequencer" "mosn.io/layotto/pkg/mock" @@ -74,6 +77,39 @@ func TestMosnRuntime_GetInfo(t *testing.T) { rt.Stop() } +type superPubsub interface { + custom.Component + sayGoodBye() string +} + +type superPubsubImpl struct { + custom.Component +} + +func (s *superPubsubImpl) sayGoodBye() string { + return "good bye!" +} + +func newSuperPubsub() custom.Component { + return &superPubsubImpl{mock_component.NewCustomComponentMock()} +} + +type mockGrpcAPI struct { + comp superPubsub +} + +func (m *mockGrpcAPI) Init(conn *rawGRPC.ClientConn) error { + return nil +} + +func (m mockGrpcAPI) Register(rawGrpcServer *rawGRPC.Server) error { + return nil +} + +func (m *mockGrpcAPI) sayGoodBye() string { + return m.comp.sayGoodBye() +} + func TestMosnRuntime_Run(t *testing.T) { t.Run("run succ", func(t *testing.T) { runtimeConfig := &MosnRuntimeConfig{} @@ -88,6 +124,50 @@ func TestMosnRuntime_Run(t *testing.T) { assert.NotNil(t, server) rt.Stop() }) + t.Run("run succesfully with initRuntimeStage", func(t *testing.T) { + runtimeConfig := &MosnRuntimeConfig{} + rt := NewMosnRuntime(runtimeConfig) + etcdCustomComponent := mock_component.NewCustomComponentMock() + compType := "xxx_store" + compName := "etcd" + rt.AppendInitRuntimeStage(func(o *runtimeOptions, m *MosnRuntime) error { + m.SetCustomComponent(compType, compName, etcdCustomComponent) + return nil + }) + expect := false + server, err := rt.Run( + // register your grpc API here + WithGrpcAPI( + default_api.NewGrpcAPI, + func(ac *grpc.ApplicationContext) grpc.GrpcAPI { + if ac.CustomComponent[compType][compName] == etcdCustomComponent { + expect = true + } + return &mockGrpcAPI{} + }, + ), + ) + assert.True(t, expect) + assert.Nil(t, err) + assert.NotNil(t, server) + rt.Stop() + }) + t.Run("run with initRuntimeStage error", func(t *testing.T) { + runtimeConfig := &MosnRuntimeConfig{} + rt := NewMosnRuntime(runtimeConfig) + rt.AppendInitRuntimeStage(nil) + var expectErr error = errors.New("expected") + rt.AppendInitRuntimeStage(func(o *runtimeOptions, m *MosnRuntime) error { + return expectErr + }) + _, err := rt.Run( + // register your grpc API here + WithGrpcAPI( + default_api.NewGrpcAPI, + ), + ) + assert.Equal(t, err, expectErr) + }) t.Run("no runtime config", func(t *testing.T) { rt := NewMosnRuntime(nil) @@ -123,9 +203,6 @@ func TestMosnRuntime_Run(t *testing.T) { } rt := NewMosnRuntime(cfg) - rt.errInt = func(err error, format string, args ...interface{}) { - panic(err) - } // 3. Run _, err := rt.Run( // Hello @@ -385,7 +462,7 @@ func TestMosnRuntime_initOutputBinding(t *testing.T) { return &MockBindings{} }) mdata := make(map[string]string) - m.runtimeConfig.Bindings = make(map[string]mbindings.Metadata) + m.RuntimeConfig().Bindings = make(map[string]mbindings.Metadata) m.runtimeConfig.Bindings["mockOutbindings"] = mbindings.Metadata{ Metadata: mdata, } @@ -393,6 +470,70 @@ func TestMosnRuntime_initOutputBinding(t *testing.T) { assert.NotNil(t, m.outputBindings["mockOutbindings"]) } +func TestMosnRuntime_runWithCustomComponentAndAPI(t *testing.T) { + t.Run("normal", func(t *testing.T) { + compType := "super_pubsub" + compName := "etcd" + // 1. construct config + cfg := &MosnRuntimeConfig{ + CustomComponent: map[string]map[string]custom.Config{ + compType: { + compName: custom.Config{ + Version: "", + Metadata: nil, + }, + }, + }, + } + // 2. construct runtime + rt := NewMosnRuntime(cfg) + var customAPI *mockGrpcAPI + // 3. Run + server, err := rt.Run( + WithErrInterceptor(func(err error, format string, args ...interface{}) { + panic(err) + }), + // register your grpc API here + WithGrpcAPI( + default_api.NewGrpcAPI, + func(ac *grpc.ApplicationContext) grpc.GrpcAPI { + comp := ac.CustomComponent[compType][compName].(superPubsub) + customAPI = &mockGrpcAPI{comp: comp} + return customAPI + }, + ), + // Custom components + WithCustomComponentFactory(compType, + custom.NewComponentFactory(compName, newSuperPubsub), + ), + // Hello + WithHelloFactory( + hello.NewHelloFactory("helloworld", helloworld.NewHelloWorld), + ), + // Sequencer + WithSequencerFactory( + runtime_sequencer.NewFactory(compName, func() sequencer.Store { + return sequencer_etcd.NewEtcdSequencer(log.DefaultLogger) + }), + runtime_sequencer.NewFactory("redis", func() sequencer.Store { + return sequencer_redis.NewStandaloneRedisSequencer(log.DefaultLogger) + }), + runtime_sequencer.NewFactory("zookeeper", func() sequencer.Store { + return sequencer_zookeeper.NewZookeeperSequencer(log.DefaultLogger) + }), + ), + ) + // 4. assert + assert.Nil(t, err) + assert.NotNil(t, server) + // 5. invoke customAPI + bye := customAPI.sayGoodBye() + assert.Equal(t, bye, "good bye!") + // 6. stop + rt.Stop() + }) +} + func TestMosnRuntime_runWithPubsub(t *testing.T) { t.Run("normal", func(t *testing.T) { // mock pubsub component @@ -406,11 +547,11 @@ func TestMosnRuntime_runWithPubsub(t *testing.T) { // 2. construct runtime rt, _ := runtimeWithCallbackConnection(t) - rt.errInt = func(err error, format string, args ...interface{}) { - panic(err) - } // 3. Run server, err := rt.Run( + WithErrInterceptor(func(err error, format string, args ...interface{}) { + panic(err) + }), // Hello WithHelloFactory( hello.NewHelloFactory("helloworld", helloworld.NewHelloWorld),