diff --git a/.github/workflows/license.yaml b/.github/workflows/license.yaml index 865f8e25..17e995c8 100644 --- a/.github/workflows/license.yaml +++ b/.github/workflows/license.yaml @@ -27,4 +27,4 @@ jobs: steps: - uses: actions/checkout@v3 - name: Run license check - run: ./license-checker/license-checker.sh \ No newline at end of file + run: make license \ No newline at end of file diff --git a/admin/client/README.md b/admin/client/README.md index 3cb0214f..e67832d6 100644 --- a/admin/client/README.md +++ b/admin/client/README.md @@ -7,6 +7,7 @@ This API client was generated by the [OpenAPI Generator](https://openapi-generat - API version: 1.0.0 - Package version: 1.0.0 +- Generator version: 7.6.0 - Build package: org.openapitools.codegen.languages.GoClientCodegen For more information, please visit [https://github.com/FunctionStream](https://github.com/FunctionStream) @@ -22,7 +23,7 @@ go get golang.org/x/net/context Put the package under your project folder and add the following in import: ```go -import adminclient "github.com/functionstream/functionstream/admin/client" +import adminclient "github.com/functionstream/function-stream/admin/client" ``` To use a proxy, set the environment variable `HTTP_PROXY`: diff --git a/admin/client/client.go b/admin/client/client.go index e211c48e..88da64e2 100644 --- a/admin/client/client.go +++ b/admin/client/client.go @@ -176,7 +176,7 @@ func parameterAddToHeaderOrQuery(headerOrQueryParams interface{}, keyPrefix stri return } if t, ok := obj.(time.Time); ok { - parameterAddToHeaderOrQuery(headerOrQueryParams, keyPrefix, t.Format(time.RFC3339), collectionType) + parameterAddToHeaderOrQuery(headerOrQueryParams, keyPrefix, t.Format(time.RFC3339Nano), collectionType) return } value = v.Type().String() + " value" diff --git a/admin/client/docs/ModelFunction.md b/admin/client/docs/ModelFunction.md index 8306182f..46a729c3 100644 --- a/admin/client/docs/ModelFunction.md +++ b/admin/client/docs/ModelFunction.md @@ -7,6 +7,7 @@ Name | Type | Description | Notes **Config** | Pointer to **map[string]string** | | [optional] **Name** | **string** | | **Namespace** | Pointer to **string** | | [optional] +**Package** | **string** | | **Replicas** | **int32** | | **Runtime** | [**ModelRuntimeConfig**](ModelRuntimeConfig.md) | | **Sink** | [**ModelTubeConfig**](ModelTubeConfig.md) | | @@ -16,7 +17,7 @@ Name | Type | Description | Notes ### NewModelFunction -`func NewModelFunction(name string, replicas int32, runtime ModelRuntimeConfig, sink ModelTubeConfig, source []ModelTubeConfig, ) *ModelFunction` +`func NewModelFunction(name string, package_ string, replicas int32, runtime ModelRuntimeConfig, sink ModelTubeConfig, source []ModelTubeConfig, ) *ModelFunction` NewModelFunction instantiates a new ModelFunction object This constructor will assign default values to properties that have it defined, @@ -101,6 +102,26 @@ SetNamespace sets Namespace field to given value. HasNamespace returns a boolean if a field has been set. +### GetPackage + +`func (o *ModelFunction) GetPackage() string` + +GetPackage returns the Package field if non-nil, zero value otherwise. + +### GetPackageOk + +`func (o *ModelFunction) GetPackageOk() (*string, bool)` + +GetPackageOk returns a tuple with the Package field if it's non-nil, zero value otherwise +and a boolean to check if the value has been set. + +### SetPackage + +`func (o *ModelFunction) SetPackage(v string)` + +SetPackage sets Package field to given value. + + ### GetReplicas `func (o *ModelFunction) GetReplicas() int32` diff --git a/admin/client/model_model_function.go b/admin/client/model_model_function.go index eb5f1dbc..85922ee5 100644 --- a/admin/client/model_model_function.go +++ b/admin/client/model_model_function.go @@ -24,6 +24,7 @@ type ModelFunction struct { Config *map[string]string `json:"config,omitempty"` Name string `json:"name"` Namespace *string `json:"namespace,omitempty"` + Package string `json:"package"` Replicas int32 `json:"replicas"` Runtime ModelRuntimeConfig `json:"runtime"` Sink ModelTubeConfig `json:"sink"` @@ -36,9 +37,10 @@ type _ModelFunction ModelFunction // This constructor will assign default values to properties that have it defined, // and makes sure properties required by API are set, but the set of arguments // will change when the set of required properties is changed -func NewModelFunction(name string, replicas int32, runtime ModelRuntimeConfig, sink ModelTubeConfig, source []ModelTubeConfig) *ModelFunction { +func NewModelFunction(name string, package_ string, replicas int32, runtime ModelRuntimeConfig, sink ModelTubeConfig, source []ModelTubeConfig) *ModelFunction { this := ModelFunction{} this.Name = name + this.Package = package_ this.Replicas = replicas this.Runtime = runtime this.Sink = sink @@ -142,6 +144,30 @@ func (o *ModelFunction) SetNamespace(v string) { o.Namespace = &v } +// GetPackage returns the Package field value +func (o *ModelFunction) GetPackage() string { + if o == nil { + var ret string + return ret + } + + return o.Package +} + +// GetPackageOk returns a tuple with the Package field value +// and a boolean to check if the value has been set. +func (o *ModelFunction) GetPackageOk() (*string, bool) { + if o == nil { + return nil, false + } + return &o.Package, true +} + +// SetPackage sets field value +func (o *ModelFunction) SetPackage(v string) { + o.Package = v +} + // GetReplicas returns the Replicas field value func (o *ModelFunction) GetReplicas() int32 { if o == nil { @@ -255,6 +281,7 @@ func (o ModelFunction) ToMap() (map[string]interface{}, error) { if !IsNil(o.Namespace) { toSerialize["namespace"] = o.Namespace } + toSerialize["package"] = o.Package toSerialize["replicas"] = o.Replicas toSerialize["runtime"] = o.Runtime toSerialize["sink"] = o.Sink @@ -268,6 +295,7 @@ func (o *ModelFunction) UnmarshalJSON(data []byte) (err error) { // that every required field exists as a key in the generic map. requiredProperties := []string{ "name", + "package", "replicas", "runtime", "sink", diff --git a/admin/client/utils.go b/admin/client/utils.go index adf098e7..be8dcda5 100644 --- a/admin/client/utils.go +++ b/admin/client/utils.go @@ -320,7 +320,7 @@ func NewNullableTime(val *time.Time) *NullableTime { } func (v NullableTime) MarshalJSON() ([]byte, error) { - return v.value.MarshalJSON() + return json.Marshal(v.value) } func (v *NullableTime) UnmarshalJSON(src []byte) error { diff --git a/apidocs.json b/apidocs.json index 8c3d0b4d..50c6fb4c 100644 --- a/apidocs.json +++ b/apidocs.json @@ -336,6 +336,7 @@ "model.Function": { "required": [ "name", + "package", "runtime", "source", "sink", @@ -354,6 +355,9 @@ "namespace": { "type": "string" }, + "package": { + "type": "string" + }, "replicas": { "type": "integer", "format": "int32" diff --git a/benchmark/bench_test.go b/benchmark/bench_test.go index d4dc104c..ed4fd491 100644 --- a/benchmark/bench_test.go +++ b/benchmark/bench_test.go @@ -25,6 +25,8 @@ import ( "testing" "time" + "github.com/functionstream/function-stream/common/config" + "github.com/functionstream/function-stream/fs/api" "github.com/functionstream/function-stream/fs/runtime/wazero" @@ -117,11 +119,11 @@ func BenchmarkStressForBasicFuncWithMemoryQueue(b *testing.B) { s, err := server.NewServer( server.WithRuntimeFactoryBuilder(common.WASMRuntime, - func(configMap common.ConfigMap) (api.FunctionRuntimeFactory, error) { + func(configMap config.ConfigMap) (api.FunctionRuntimeFactory, error) { return wazero.NewWazeroFunctionRuntimeFactory(), nil }), server.WithTubeFactoryBuilder(common.MemoryTubeType, - func(configMap common.ConfigMap) (contube.TubeFactory, error) { + func(configMap config.ConfigMap) (contube.TubeFactory, error) { return memoryQueueFactory, nil }), ) diff --git a/common/config.go b/common/config/config.go similarity index 81% rename from common/config.go rename to common/config/config.go index 20ba3d29..a6232d66 100644 --- a/common/config.go +++ b/common/config/config.go @@ -14,7 +14,7 @@ * limitations under the License. */ -package common +package config // ConfigMap is a custom type that represents a map where keys are strings and values are of any type. // Since Viper is not case-sensitive, we use '-' to separate words in all field names in the config map. @@ -24,3 +24,14 @@ package common // - `socket-path` refers to the path of the socket. // - `pulsar-url` refers to the URL of the Pulsar service. type ConfigMap map[string]interface{} + +// MergeConfig merges multiple ConfigMap into one +func MergeConfig(configs ...ConfigMap) ConfigMap { + result := ConfigMap{} + for _, config := range configs { + for k, v := range config { + result[k] = v + } + } + return result +} diff --git a/common/errors.go b/common/errors.go index b3853f16..582049f2 100644 --- a/common/errors.go +++ b/common/errors.go @@ -19,8 +19,10 @@ package common import "fmt" var ( - ErrorFunctionNotFound = fmt.Errorf("function not found") - ErrorFunctionExists = fmt.Errorf("function already exists") - ErrorRuntimeFactoryNotFound = fmt.Errorf("runtime factory not found") - ErrorTubeFactoryNotFound = fmt.Errorf("tube factory not found") + ErrorFunctionNotFound = fmt.Errorf("function not found") + ErrorFunctionExists = fmt.Errorf("function already exists") + ErrorFunctionUnsupportedRuntime = fmt.Errorf("function does not support runtime") + ErrorRuntimeFactoryNotFound = fmt.Errorf("runtime factory not found") + ErrorTubeFactoryNotFound = fmt.Errorf("tube factory not found") + ErrorPackageNoSupportedRuntime = fmt.Errorf("package does not support any runtime") ) diff --git a/common/log.go b/common/log.go index b7195350..fe7d9537 100644 --- a/common/log.go +++ b/common/log.go @@ -17,6 +17,8 @@ package common import ( + "context" + "github.com/go-logr/logr" "github.com/go-logr/zapr" "go.uber.org/zap" @@ -24,6 +26,8 @@ import ( const ( DebugLevel int = 4 + InfoLevel int = 3 + WarnLevel int = 2 ) type Logger struct { @@ -53,7 +57,29 @@ func (l *Logger) Debug(msg string, keysAndValues ...interface{}) { } } +func (l *Logger) Warn(msg string, keysAndValues ...interface{}) { + l.V(WarnLevel).Info(msg, keysAndValues...) +} + +func (l *Logger) Info(msg string, keysAndValues ...interface{}) { + l.V(InfoLevel).Info(msg, keysAndValues...) +} + func (l *Logger) SubLogger(keysAndValues ...any) *Logger { internalLogger := l.WithValues(keysAndValues...) return &Logger{&internalLogger} } + +type loggerKey struct{} + +func WithLogger(ctx context.Context, logger *Logger) context.Context { + return context.WithValue(ctx, loggerKey{}, logger) +} + +func GetLogger(ctx context.Context) *Logger { + logger, ok := ctx.Value(loggerKey{}).(*Logger) + if !ok { + return NewDefaultLogger() + } + return logger +} diff --git a/common/model/function.go b/common/model/function.go index 5d0511df..c4d1f360 100644 --- a/common/model/function.go +++ b/common/model/function.go @@ -19,6 +19,8 @@ package model import ( "strings" + "github.com/functionstream/function-stream/common/config" + "github.com/functionstream/function-stream/fs/contube" "github.com/pkg/errors" ) @@ -28,16 +30,15 @@ type TubeConfig struct { Config contube.ConfigMap `json:"config,omitempty"` } -type ConfigMap map[string]interface{} - type RuntimeConfig struct { - Config ConfigMap `json:"config,omitempty"` - Type string `json:"type"` + Config config.ConfigMap `json:"config,omitempty"` + Type string `json:"type"` } type Function struct { Name string `json:"name"` Namespace string `json:"namespace,omitempty"` + Package string `json:"package"` Runtime RuntimeConfig `json:"runtime"` Sources []TubeConfig `json:"source"` Sink TubeConfig `json:"sink"` diff --git a/conf/function-stream.yaml b/conf/function-stream.yaml index af9a0345..30b28f3f 100644 --- a/conf/function-stream.yaml +++ b/conf/function-stream.yaml @@ -22,4 +22,5 @@ tube-config: pulsar_url: "pulsar://localhost:6650" runtime-config: external: - socket-path: /tmp/fs.sock \ No newline at end of file + socket-path: /tmp/fs.sock +function-store: ./functionsa \ No newline at end of file diff --git a/fs/api/package.go b/fs/api/package.go new file mode 100644 index 00000000..b5d5ed33 --- /dev/null +++ b/fs/api/package.go @@ -0,0 +1,27 @@ +/* + * Copyright 2024 Function Stream Org. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package api + +import "github.com/functionstream/function-stream/common/model" + +type Package interface { + GetSupportedRuntimeConfig() []model.RuntimeConfig +} + +type PackageLoader interface { + Load(path string) (Package, error) +} diff --git a/fs/api/runtime.go b/fs/api/runtime.go index 482440e1..03cde4cf 100644 --- a/fs/api/runtime.go +++ b/fs/api/runtime.go @@ -17,6 +17,7 @@ package api import ( + "github.com/functionstream/function-stream/common/model" "github.com/functionstream/function-stream/fs/contube" ) @@ -26,5 +27,5 @@ type FunctionRuntime interface { } type FunctionRuntimeFactory interface { - NewFunctionRuntime(instance FunctionInstance) (FunctionRuntime, error) + NewFunctionRuntime(instance FunctionInstance, rc *model.RuntimeConfig) (FunctionRuntime, error) } diff --git a/fs/manager.go b/fs/manager.go index 6d46115d..247651ec 100644 --- a/fs/manager.go +++ b/fs/manager.go @@ -23,6 +23,9 @@ import ( "strconv" "sync" + "github.com/functionstream/function-stream/common/config" + _package "github.com/functionstream/function-stream/fs/package" + "github.com/functionstream/function-stream/common" "github.com/functionstream/function-stream/common/model" "github.com/functionstream/function-stream/fs/api" @@ -56,6 +59,7 @@ type managerOptions struct { stateStore api.StateStore dontUseDefaultStateStore bool queueFactory contube.TubeFactory + packageLoader api.PackageLoader // TODO: Need to set it log *logr.Logger } @@ -111,6 +115,13 @@ func WithLogger(log *logr.Logger) ManagerOption { }) } +func WithPackageLoader(loader api.PackageLoader) ManagerOption { + return managerOptionFunc(func(c *managerOptions) (*managerOptions, error) { + c.packageLoader = loader + return c, nil + }) +} + func NewFunctionManager(opts ...ManagerOption) (FunctionManager, error) { options := &managerOptions{ tubeFactoryMap: make(map[string]contube.TubeFactory), @@ -144,6 +155,9 @@ func NewFunctionManager(opts ...ManagerOption) (FunctionManager, error) { for k := range options.tubeFactoryMap { loadedTubeFact = append(loadedTubeFact, k) } + if options.packageLoader == nil { + options.packageLoader = _package.NewDefaultPackageLoader() + } log.Info("Function manager created", "runtime-factories", loadedRuntimeFact, "tube-factories", loadedTubeFact) return &functionManagerImpl{ @@ -173,6 +187,40 @@ func (fm *functionManagerImpl) createFuncCtx() *funcCtxImpl { return newFuncCtxImpl(fm.options.stateStore) } +func generateRuntimeConfig(ctx context.Context, p api.Package, f *model.Function) (*model.RuntimeConfig, error) { + log := common.GetLogger(ctx) + rc := &model.RuntimeConfig{} + if p == _package.EmptyPackage { + return &f.Runtime, nil + } + supportedRuntimeConf := p.GetSupportedRuntimeConfig() + rcMap := map[string]*model.RuntimeConfig{} + for k, v := range supportedRuntimeConf { + if v.Type == "" { + log.Warn("Package supported runtime type is empty. Ignore it.", "index", k, "package", f.Package) + continue + } + vCopy := v + rcMap[v.Type] = &vCopy + } + if len(rcMap) == 0 { + return nil, common.ErrorPackageNoSupportedRuntime + } + defaultRC := &supportedRuntimeConf[0] + if f.Runtime.Type == "" { + rc.Type = defaultRC.Type + } else { + if r, exist := rcMap[f.Runtime.Type]; exist { + defaultRC = r + } else { + return nil, fmt.Errorf("runtime type '%s' is not supported by package '%s'", f.Runtime.Type, f.Package) + } + rc.Type = f.Runtime.Type + } + rc.Config = config.MergeConfig(defaultRC.Config, f.Runtime.Config) + return rc, nil +} + func (fm *functionManagerImpl) StartFunction(f *model.Function) error { // TODO: Shouldn't use pointer here if err := f.Validate(); err != nil { return err @@ -186,15 +234,22 @@ func (fm *functionManagerImpl) StartFunction(f *model.Function) error { // TODO: fm.functionsLock.Unlock() for i := int32(0); i < f.Replicas; i++ { - runtimeType := f.Runtime.Type + p, err := fm.options.packageLoader.Load(f.Package) + if err != nil { + return err + } + runtimeConfig, err := generateRuntimeConfig(context.Background(), p, f) + if err != nil { + return fmt.Errorf("failed to generate runtime config: %v", err) + } funcCtx := fm.createFuncCtx() - instanceLogger := fm.log.SubLogger("functionName", f.Name, "instanceIndex", int(i), "runtimeType", runtimeType) + instanceLogger := fm.log.SubLogger("functionName", f.Name, "instanceIndex", int(i), "runtimeType", runtimeConfig.Type) instance := fm.options.instanceFactory.NewFunctionInstance(f, funcCtx, i, instanceLogger) fm.functionsLock.Lock() fm.functions[common.GetNamespacedName(f.Namespace, f.Name)][i] = instance fm.functionsLock.Unlock() - runtimeFactory, err := fm.getRuntimeFactory(runtimeType) + runtimeFactory, err := fm.getRuntimeFactory(runtimeConfig.Type) if err != nil { return err } @@ -220,7 +275,7 @@ func (fm *functionManagerImpl) StartFunction(f *model.Function) error { // TODO: } funcCtx.setSink(sink) - runtime, err := runtimeFactory.NewFunctionRuntime(instance) + runtime, err := runtimeFactory.NewFunctionRuntime(instance, runtimeConfig) if err != nil { return fmt.Errorf("failed to create runtime: %w", err) } diff --git a/fs/manager_test.go b/fs/manager_test.go new file mode 100644 index 00000000..6f0619d6 --- /dev/null +++ b/fs/manager_test.go @@ -0,0 +1,97 @@ +/* + * Copyright 2024 Function Stream Org. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package fs + +import ( + "context" + "testing" + + "github.com/functionstream/function-stream/common" + "github.com/functionstream/function-stream/common/model" + "github.com/stretchr/testify/assert" +) + +// Mock implementations of the interfaces and structs +type MockPackage struct { + runtimeConfigs []model.RuntimeConfig +} + +func (m *MockPackage) GetSupportedRuntimeConfig() []model.RuntimeConfig { + return m.runtimeConfigs +} + +func TestGenerateRuntimeConfig_EmptySupportedRuntimeConfig(t *testing.T) { + ctx := context.Background() + p := &MockPackage{runtimeConfigs: []model.RuntimeConfig{}} + f := &model.Function{} + + _, err := generateRuntimeConfig(ctx, p, f) + assert.NotNil(t, err) + assert.Equal(t, common.ErrorPackageNoSupportedRuntime, err) +} + +func TestGenerateRuntimeConfig_EmptyFunctionRuntimeType(t *testing.T) { + ctx := context.Background() + p := &MockPackage{ + runtimeConfigs: []model.RuntimeConfig{ + {Type: "runtime1", Config: map[string]interface{}{"key1": "value1"}}, + }, + } + f := &model.Function{ + Runtime: model.RuntimeConfig{}, + } + + rc, err := generateRuntimeConfig(ctx, p, f) + assert.Nil(t, err) + assert.Equal(t, "runtime1", rc.Type) + assert.Equal(t, "value1", rc.Config["key1"]) +} + +func TestGenerateRuntimeConfig_UnsupportedFunctionRuntimeType(t *testing.T) { + ctx := context.Background() + p := &MockPackage{ + runtimeConfigs: []model.RuntimeConfig{ + {Type: "runtime1", Config: map[string]interface{}{"key1": "value1"}}, + }, + } + f := &model.Function{ + Runtime: model.RuntimeConfig{Type: "unsupported_runtime"}, + } + + _, err := generateRuntimeConfig(ctx, p, f) + assert.NotNil(t, err) + assert.Equal(t, "runtime type 'unsupported_runtime' is not supported by package ''", err.Error()) +} + +func TestGenerateRuntimeConfig_SupportedFunctionRuntimeType(t *testing.T) { + ctx := context.Background() + p := &MockPackage{ + runtimeConfigs: []model.RuntimeConfig{ + {Type: "runtime1", Config: map[string]interface{}{"key1": "value1"}}, + {Type: "runtime2", Config: map[string]interface{}{"key2": "value2"}}, + }, + } + f := &model.Function{ + Runtime: model.RuntimeConfig{Type: "runtime2", Config: map[string]interface{}{"key3": "value3"}}, + } + + rc, err := generateRuntimeConfig(ctx, p, f) + assert.Nil(t, err) + assert.Equal(t, "runtime2", rc.Type) + assert.Equal(t, "value2", rc.Config["key2"]) + assert.Equal(t, "value3", rc.Config["key3"]) +} diff --git a/fs/package/package_loader.go b/fs/package/package_loader.go new file mode 100644 index 00000000..41213531 --- /dev/null +++ b/fs/package/package_loader.go @@ -0,0 +1,61 @@ +/* + * Copyright 2024 Function Stream Org. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package _package + +import ( + "github.com/functionstream/function-stream/common" + "github.com/functionstream/function-stream/common/model" + "github.com/functionstream/function-stream/fs/api" +) + +type WasmPackage struct { + api.Package + path string +} + +type emptyPackage struct{} + +func (p *emptyPackage) GetSupportedRuntimeConfig() []model.RuntimeConfig { + return nil +} + +var EmptyPackage = &emptyPackage{} + +func (p *WasmPackage) GetSupportedRuntimeConfig() []model.RuntimeConfig { + return []model.RuntimeConfig{ + { + Type: common.WASMRuntime, + Config: map[string]interface{}{ + "archive": p.path, + }, + }, + } +} + +type DefaultPackageLoader struct { +} + +func (p DefaultPackageLoader) Load(path string) (api.Package, error) { + if path == "" { + return EmptyPackage, nil + } + return &WasmPackage{path: path}, nil +} + +func NewDefaultPackageLoader() api.PackageLoader { + return &DefaultPackageLoader{} +} diff --git a/fs/runtime/external/runtime.go b/fs/runtime/external/runtime.go index 61b2c73d..ff28be95 100644 --- a/fs/runtime/external/runtime.go +++ b/fs/runtime/external/runtime.go @@ -23,6 +23,9 @@ import ( "os" "sync" + "github.com/functionstream/function-stream/common/config" + funcModel "github.com/functionstream/function-stream/common/model" + "github.com/functionstream/function-stream/common" "github.com/functionstream/function-stream/fs/api" "github.com/functionstream/function-stream/fs/contube" @@ -101,7 +104,8 @@ type Factory struct { log *common.Logger } -func (f *Factory) NewFunctionRuntime(instance api.FunctionInstance) (api.FunctionRuntime, error) { +func (f *Factory) NewFunctionRuntime(instance api.FunctionInstance, + _ *funcModel.RuntimeConfig) (api.FunctionRuntime, error) { def := instance.Definition() r := &runtime{ inputCh: make(chan contube.Record), @@ -139,7 +143,7 @@ const ( DefaultSocketPath = "/tmp/fs.sock" ) -func NewFactoryWithConfig(configMap common.ConfigMap) (api.FunctionRuntimeFactory, error) { +func NewFactoryWithConfig(configMap config.ConfigMap) (api.FunctionRuntimeFactory, error) { socketPath := "" if v, ok := configMap["socket-path"].(string); ok { socketPath = v diff --git a/fs/runtime/wazero/wazero_runtime.go b/fs/runtime/wazero/wazero_runtime.go index e9b7d24a..20a8ac79 100644 --- a/fs/runtime/wazero/wazero_runtime.go +++ b/fs/runtime/wazero/wazero_runtime.go @@ -18,9 +18,12 @@ package wazero import ( "context" + "errors" "fmt" "os" + "github.com/functionstream/function-stream/common/model" + "github.com/functionstream/function-stream/common" "github.com/functionstream/function-stream/fs/api" "github.com/functionstream/function-stream/fs/contube" @@ -71,7 +74,8 @@ func WithWASMFetcher(fetcher WASMFetcher) func(*options) { } } -func (f *WazeroFunctionRuntimeFactory) NewFunctionRuntime(instance api.FunctionInstance) (api.FunctionRuntime, error) { +func (f *WazeroFunctionRuntimeFactory) NewFunctionRuntime(instance api.FunctionInstance, + rc *model.RuntimeConfig) (api.FunctionRuntime, error) { log := instance.Logger() r := wazero.NewRuntime(instance.Context()) _, err := r.NewHostModuleBuilder("env").NewFunctionBuilder().WithFunc(func(ctx context.Context, @@ -97,10 +101,10 @@ func (f *WazeroFunctionRuntimeFactory) NewFunctionRuntime(instance api.FunctionI wasi_snapshot_preview1.MustInstantiate(instance.Context(), r) - if instance.Definition().Runtime.Config == nil { + if rc.Config == nil { return nil, fmt.Errorf("no runtime config found") } - path, exist := instance.Definition().Runtime.Config["archive"] + path, exist := rc.Config["archive"] if !exist { return nil, fmt.Errorf("no wasm archive found") } @@ -114,10 +118,9 @@ func (f *WazeroFunctionRuntimeFactory) NewFunctionRuntime(instance api.FunctionI } mod, err := r.InstantiateWithConfig(instance.Context(), wasmBytes, config) if err != nil { - if exitErr, ok := err.(*sys.ExitError); ok && exitErr.ExitCode() != 0 { + var exitErr *sys.ExitError + if errors.As(err, &exitErr) && exitErr.ExitCode() != 0 { return nil, fmt.Errorf("failed to instantiate function, function exit with code %d", exitErr.ExitCode()) - } else if !ok { - return nil, fmt.Errorf("failed to instantiate function: %w", err) } } if err != nil { diff --git a/server/config.go b/server/config.go index fb458d79..75bc26dd 100644 --- a/server/config.go +++ b/server/config.go @@ -21,9 +21,10 @@ import ( "os" "strings" + "github.com/functionstream/function-stream/common/config" + "github.com/go-playground/validator/v10" - "github.com/functionstream/function-stream/common" "github.com/spf13/viper" ) @@ -31,17 +32,17 @@ type FactoryConfig struct { // Deprecate Ref *string `mapstructure:"ref"` Type *string `mapstructure:"type"` - Config *common.ConfigMap `mapstructure:"config"` + Config *config.ConfigMap `mapstructure:"config"` } type StateStoreConfig struct { Type *string `mapstructure:"type"` - Config *common.ConfigMap `mapstructure:"config"` + Config *config.ConfigMap `mapstructure:"config"` } type QueueConfig struct { Type string `mapstructure:"type"` - Config common.ConfigMap `mapstructure:"config"` + Config config.ConfigMap `mapstructure:"config"` } type Config struct { @@ -50,9 +51,9 @@ type Config struct { Queue QueueConfig `mapstructure:"queue"` - TubeConfig map[string]common.ConfigMap `mapstructure:"tube-config"` + TubeConfig map[string]config.ConfigMap `mapstructure:"tube-config"` - RuntimeConfig map[string]common.ConfigMap `mapstructure:"runtime-config"` + RuntimeConfig map[string]config.ConfigMap `mapstructure:"runtime-config"` // StateStore is the configuration for the state store that the function stream server will use. // Optional diff --git a/server/server.go b/server/server.go index 5f45bf24..431e5251 100644 --- a/server/server.go +++ b/server/server.go @@ -26,6 +26,8 @@ import ( "sync/atomic" "time" + "github.com/functionstream/function-stream/common/config" + "github.com/functionstream/function-stream/fs/runtime/external" "github.com/go-logr/logr" @@ -68,10 +70,10 @@ type serverOptions struct { enableTls bool tlsCertFile string tlsKeyFile string - tubeFactoryBuilders map[string]func(configMap common.ConfigMap) (contube.TubeFactory, error) - tubeConfig map[string]common.ConfigMap - runtimeFactoryBuilders map[string]func(configMap common.ConfigMap) (api.FunctionRuntimeFactory, error) - runtimeConfig map[string]common.ConfigMap + tubeFactoryBuilders map[string]func(configMap config.ConfigMap) (contube.TubeFactory, error) + tubeConfig map[string]config.ConfigMap + runtimeFactoryBuilders map[string]func(configMap config.ConfigMap) (api.FunctionRuntimeFactory, error) + runtimeConfig map[string]config.ConfigMap queueConfig QueueConfig log *logr.Logger } @@ -113,7 +115,7 @@ func WithQueueConfig(config QueueConfig) ServerOption { func WithTubeFactoryBuilder( name string, - builder func(configMap common.ConfigMap) (contube.TubeFactory, error), + builder func(configMap config.ConfigMap) (contube.TubeFactory, error), ) ServerOption { return serverOptionFunc(func(o *serverOptions) (*serverOptions, error) { o.tubeFactoryBuilders[name] = builder @@ -122,7 +124,7 @@ func WithTubeFactoryBuilder( } func WithTubeFactoryBuilders( - builder map[string]func(configMap common.ConfigMap, + builder map[string]func(configMap config.ConfigMap, ) (contube.TubeFactory, error)) ServerOption { return serverOptionFunc(func(o *serverOptions) (*serverOptions, error) { for n, b := range builder { @@ -134,7 +136,7 @@ func WithTubeFactoryBuilders( func WithRuntimeFactoryBuilder( name string, - builder func(configMap common.ConfigMap) (api.FunctionRuntimeFactory, error), + builder func(configMap config.ConfigMap) (api.FunctionRuntimeFactory, error), ) ServerOption { return serverOptionFunc(func(o *serverOptions) (*serverOptions, error) { o.runtimeFactoryBuilders[name] = builder @@ -143,7 +145,7 @@ func WithRuntimeFactoryBuilder( } func WithRuntimeFactoryBuilders( - builder map[string]func(configMap common.ConfigMap) (api.FunctionRuntimeFactory, error), + builder map[string]func(configMap config.ConfigMap) (api.FunctionRuntimeFactory, error), ) ServerOption { return serverOptionFunc(func(o *serverOptions) (*serverOptions, error) { for n, b := range builder { @@ -167,32 +169,32 @@ func WithLogger(log *logr.Logger) ServerOption { }) } -func GetBuiltinTubeFactoryBuilder() map[string]func(configMap common.ConfigMap) (contube.TubeFactory, error) { - return map[string]func(configMap common.ConfigMap) (contube.TubeFactory, error){ - common.PulsarTubeType: func(configMap common.ConfigMap) (contube.TubeFactory, error) { +func GetBuiltinTubeFactoryBuilder() map[string]func(configMap config.ConfigMap) (contube.TubeFactory, error) { + return map[string]func(configMap config.ConfigMap) (contube.TubeFactory, error){ + common.PulsarTubeType: func(configMap config.ConfigMap) (contube.TubeFactory, error) { return contube.NewPulsarEventQueueFactory(context.Background(), contube.ConfigMap(configMap)) }, //nolint:unparam - common.MemoryTubeType: func(_ common.ConfigMap) (contube.TubeFactory, error) { + common.MemoryTubeType: func(_ config.ConfigMap) (contube.TubeFactory, error) { return contube.NewMemoryQueueFactory(context.Background()), nil }, } } -func GetBuiltinRuntimeFactoryBuilder() map[string]func(configMap common.ConfigMap) (api.FunctionRuntimeFactory, error) { - return map[string]func(configMap common.ConfigMap) (api.FunctionRuntimeFactory, error){ +func GetBuiltinRuntimeFactoryBuilder() map[string]func(configMap config.ConfigMap) (api.FunctionRuntimeFactory, error) { + return map[string]func(configMap config.ConfigMap) (api.FunctionRuntimeFactory, error){ //nolint:unparam - common.WASMRuntime: func(configMap common.ConfigMap) (api.FunctionRuntimeFactory, error) { + common.WASMRuntime: func(configMap config.ConfigMap) (api.FunctionRuntimeFactory, error) { return wazero.NewWazeroFunctionRuntimeFactory(), nil }, - common.ExternalRuntime: func(configMap common.ConfigMap) (api.FunctionRuntimeFactory, error) { + common.ExternalRuntime: func(configMap config.ConfigMap) (api.FunctionRuntimeFactory, error) { return external.NewFactoryWithConfig(configMap) }, } } -func setupFactories[T any](factoryBuilder map[string]func(configMap common.ConfigMap) (T, error), - config map[string]common.ConfigMap, +func setupFactories[T any](factoryBuilder map[string]func(configMap config.ConfigMap) (T, error), + config map[string]config.ConfigMap, ) (map[string]T, error) { factories := make(map[string]T) for name, builder := range factoryBuilder { @@ -245,10 +247,10 @@ func WithConfig(config *Config) ServerOption { func NewServer(opts ...ServerOption) (*Server, error) { options := &serverOptions{} - options.tubeFactoryBuilders = make(map[string]func(configMap common.ConfigMap) (contube.TubeFactory, error)) - options.tubeConfig = make(map[string]common.ConfigMap) - options.runtimeFactoryBuilders = make(map[string]func(configMap common.ConfigMap) (api.FunctionRuntimeFactory, error)) - options.runtimeConfig = make(map[string]common.ConfigMap) + options.tubeFactoryBuilders = make(map[string]func(configMap config.ConfigMap) (contube.TubeFactory, error)) + options.tubeConfig = make(map[string]config.ConfigMap) + options.runtimeFactoryBuilders = make(map[string]func(configMap config.ConfigMap) (api.FunctionRuntimeFactory, error)) + options.runtimeConfig = make(map[string]config.ConfigMap) options.stateStoreLoader = DefaultStateStoreLoader for _, o := range opts { if o == nil { @@ -342,14 +344,14 @@ func NewDefaultServer() (*Server, error) { ListenAddr: ":7300", Queue: QueueConfig{ Type: common.MemoryTubeType, - Config: common.ConfigMap{}, + Config: config.ConfigMap{}, }, - TubeConfig: map[string]common.ConfigMap{ + TubeConfig: map[string]config.ConfigMap{ common.PulsarTubeType: { contube.PulsarURLKey: "pulsar://localhost:6650", }, }, - RuntimeConfig: map[string]common.ConfigMap{}, + RuntimeConfig: map[string]config.ConfigMap{}, } return NewServer( WithTubeFactoryBuilders(GetBuiltinTubeFactoryBuilder()), diff --git a/server/server_test.go b/server/server_test.go index 4e34085f..0ed0f4ca 100644 --- a/server/server_test.go +++ b/server/server_test.go @@ -24,6 +24,8 @@ import ( "strconv" "testing" + "github.com/functionstream/function-stream/common/config" + adminclient "github.com/functionstream/function-stream/admin/client" "github.com/functionstream/function-stream/common" "github.com/functionstream/function-stream/common/model" @@ -73,12 +75,7 @@ func TestStandaloneBasicFunction(t *testing.T) { outputTopic := "test-output-" + strconv.Itoa(rand.Int()) funcConf := &model.Function{ - Runtime: model.RuntimeConfig{ - Type: common.WASMRuntime, - Config: map[string]interface{}{ - common.RuntimeArchiveConfigKey: "../bin/example_basic.wasm", - }, - }, + Package: "../bin/example_basic.wasm", Sources: []model.TubeConfig{ { Type: common.MemoryTubeType, @@ -140,12 +137,7 @@ func TestHttpTube(t *testing.T) { endpoint := "test-endpoint" funcConf := &model.Function{ - Runtime: model.RuntimeConfig{ - Type: common.WASMRuntime, - Config: map[string]interface{}{ - common.RuntimeArchiveConfigKey: "../bin/example_basic.wasm", - }, - }, + Package: "../bin/example_basic.wasm", Sources: []model.TubeConfig{{ Type: common.HttpTubeType, Config: map[string]interface{}{ @@ -200,7 +192,8 @@ func TestHttpTube(t *testing.T) { type MockRuntimeFactory struct { } -func (f *MockRuntimeFactory) NewFunctionRuntime(instance api.FunctionInstance) (api.FunctionRuntime, error) { +func (f *MockRuntimeFactory) NewFunctionRuntime(instance api.FunctionInstance, + _ *model.RuntimeConfig) (api.FunctionRuntime, error) { return &MockRuntime{ funcCtx: instance.FunctionContext(), }, nil @@ -238,7 +231,7 @@ func TestStatefulFunction(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() s, httpAddr := startStandaloneSvr(t, ctx, - WithRuntimeFactoryBuilder("mock", func(configMap common.ConfigMap) (api.FunctionRuntimeFactory, error) { + WithRuntimeFactoryBuilder("mock", func(configMap config.ConfigMap) (api.FunctionRuntimeFactory, error) { return &MockRuntimeFactory{}, nil }))