Skip to content

Commit

Permalink
feat: support package loader
Browse files Browse the repository at this point in the history
  • Loading branch information
RobertIndie committed Aug 25, 2024
1 parent 94e81ae commit 69b262a
Show file tree
Hide file tree
Showing 23 changed files with 417 additions and 76 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/license.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,4 @@ jobs:
steps:
- uses: actions/checkout@v3
- name: Run license check
run: ./license-checker/license-checker.sh
run: make license
3 changes: 2 additions & 1 deletion admin/client/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ This API client was generated by the [OpenAPI Generator](https://openapi-generat

- API version: 1.0.0
- Package version: 1.0.0
- Generator version: 7.6.0
- Build package: org.openapitools.codegen.languages.GoClientCodegen
For more information, please visit [https://github.com/FunctionStream](https://github.com/FunctionStream)

Expand All @@ -22,7 +23,7 @@ go get golang.org/x/net/context
Put the package under your project folder and add the following in import:

```go
import adminclient "github.com/functionstream/functionstream/admin/client"
import adminclient "github.com/functionstream/function-stream/admin/client"
```

To use a proxy, set the environment variable `HTTP_PROXY`:
Expand Down
2 changes: 1 addition & 1 deletion admin/client/client.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

23 changes: 22 additions & 1 deletion admin/client/docs/ModelFunction.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ Name | Type | Description | Notes
**Config** | Pointer to **map[string]string** | | [optional]
**Name** | **string** | |
**Namespace** | Pointer to **string** | | [optional]
**Package** | **string** | |
**Replicas** | **int32** | |
**Runtime** | [**ModelRuntimeConfig**](ModelRuntimeConfig.md) | |
**Sink** | [**ModelTubeConfig**](ModelTubeConfig.md) | |
Expand All @@ -16,7 +17,7 @@ Name | Type | Description | Notes

### NewModelFunction

`func NewModelFunction(name string, replicas int32, runtime ModelRuntimeConfig, sink ModelTubeConfig, source []ModelTubeConfig, ) *ModelFunction`
`func NewModelFunction(name string, package_ string, replicas int32, runtime ModelRuntimeConfig, sink ModelTubeConfig, source []ModelTubeConfig, ) *ModelFunction`

NewModelFunction instantiates a new ModelFunction object
This constructor will assign default values to properties that have it defined,
Expand Down Expand Up @@ -101,6 +102,26 @@ SetNamespace sets Namespace field to given value.

HasNamespace returns a boolean if a field has been set.

### GetPackage

`func (o *ModelFunction) GetPackage() string`

GetPackage returns the Package field if non-nil, zero value otherwise.

### GetPackageOk

`func (o *ModelFunction) GetPackageOk() (*string, bool)`

GetPackageOk returns a tuple with the Package field if it's non-nil, zero value otherwise
and a boolean to check if the value has been set.

### SetPackage

`func (o *ModelFunction) SetPackage(v string)`

SetPackage sets Package field to given value.


### GetReplicas

`func (o *ModelFunction) GetReplicas() int32`
Expand Down
30 changes: 29 additions & 1 deletion admin/client/model_model_function.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion admin/client/utils.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions apidocs.json
Original file line number Diff line number Diff line change
Expand Up @@ -336,6 +336,7 @@
"model.Function": {
"required": [
"name",
"package",
"runtime",
"source",
"sink",
Expand All @@ -354,6 +355,9 @@
"namespace": {
"type": "string"
},
"package": {
"type": "string"
},
"replicas": {
"type": "integer",
"format": "int32"
Expand Down
6 changes: 4 additions & 2 deletions benchmark/bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ import (
"testing"
"time"

"github.com/functionstream/function-stream/common/config"

"github.com/functionstream/function-stream/fs/api"
"github.com/functionstream/function-stream/fs/runtime/wazero"

Expand Down Expand Up @@ -117,11 +119,11 @@ func BenchmarkStressForBasicFuncWithMemoryQueue(b *testing.B) {

s, err := server.NewServer(
server.WithRuntimeFactoryBuilder(common.WASMRuntime,
func(configMap common.ConfigMap) (api.FunctionRuntimeFactory, error) {
func(configMap config.ConfigMap) (api.FunctionRuntimeFactory, error) {
return wazero.NewWazeroFunctionRuntimeFactory(), nil
}),
server.WithTubeFactoryBuilder(common.MemoryTubeType,
func(configMap common.ConfigMap) (contube.TubeFactory, error) {
func(configMap config.ConfigMap) (contube.TubeFactory, error) {
return memoryQueueFactory, nil
}),
)
Expand Down
13 changes: 12 additions & 1 deletion common/config.go → common/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* limitations under the License.
*/

package common
package config

// ConfigMap is a custom type that represents a map where keys are strings and values are of any type.
// Since Viper is not case-sensitive, we use '-' to separate words in all field names in the config map.
Expand All @@ -24,3 +24,14 @@ package common
// - `socket-path` refers to the path of the socket.
// - `pulsar-url` refers to the URL of the Pulsar service.
type ConfigMap map[string]interface{}

// MergeConfig merges multiple ConfigMap into one
func MergeConfig(configs ...ConfigMap) ConfigMap {
result := ConfigMap{}
for _, config := range configs {
for k, v := range config {
result[k] = v
}
}
return result
}
10 changes: 6 additions & 4 deletions common/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@ package common
import "fmt"

var (
ErrorFunctionNotFound = fmt.Errorf("function not found")
ErrorFunctionExists = fmt.Errorf("function already exists")
ErrorRuntimeFactoryNotFound = fmt.Errorf("runtime factory not found")
ErrorTubeFactoryNotFound = fmt.Errorf("tube factory not found")
ErrorFunctionNotFound = fmt.Errorf("function not found")
ErrorFunctionExists = fmt.Errorf("function already exists")
ErrorFunctionUnsupportedRuntime = fmt.Errorf("function does not support runtime")
ErrorRuntimeFactoryNotFound = fmt.Errorf("runtime factory not found")
ErrorTubeFactoryNotFound = fmt.Errorf("tube factory not found")
ErrorPackageNoSupportedRuntime = fmt.Errorf("package does not support any runtime")
)
26 changes: 26 additions & 0 deletions common/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,17 @@
package common

import (
"context"

"github.com/go-logr/logr"
"github.com/go-logr/zapr"
"go.uber.org/zap"
)

const (
DebugLevel int = 4
InfoLevel int = 3
WarnLevel int = 2
)

type Logger struct {
Expand Down Expand Up @@ -53,7 +57,29 @@ func (l *Logger) Debug(msg string, keysAndValues ...interface{}) {
}
}

func (l *Logger) Warn(msg string, keysAndValues ...interface{}) {
l.V(WarnLevel).Info(msg, keysAndValues...)
}

func (l *Logger) Info(msg string, keysAndValues ...interface{}) {
l.V(InfoLevel).Info(msg, keysAndValues...)
}

func (l *Logger) SubLogger(keysAndValues ...any) *Logger {
internalLogger := l.WithValues(keysAndValues...)
return &Logger{&internalLogger}
}

type loggerKey struct{}

func WithLogger(ctx context.Context, logger *Logger) context.Context {
return context.WithValue(ctx, loggerKey{}, logger)
}

func GetLogger(ctx context.Context) *Logger {
logger, ok := ctx.Value(loggerKey{}).(*Logger)
if !ok {
return NewDefaultLogger()
}
return logger
}
9 changes: 5 additions & 4 deletions common/model/function.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ package model
import (
"strings"

"github.com/functionstream/function-stream/common/config"

"github.com/functionstream/function-stream/fs/contube"
"github.com/pkg/errors"
)
Expand All @@ -28,16 +30,15 @@ type TubeConfig struct {
Config contube.ConfigMap `json:"config,omitempty"`
}

type ConfigMap map[string]interface{}

type RuntimeConfig struct {
Config ConfigMap `json:"config,omitempty"`
Type string `json:"type"`
Config config.ConfigMap `json:"config,omitempty"`
Type string `json:"type"`
}

type Function struct {
Name string `json:"name"`
Namespace string `json:"namespace,omitempty"`
Package string `json:"package"`
Runtime RuntimeConfig `json:"runtime"`
Sources []TubeConfig `json:"source"`
Sink TubeConfig `json:"sink"`
Expand Down
3 changes: 2 additions & 1 deletion conf/function-stream.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,5 @@ tube-config:
pulsar_url: "pulsar://localhost:6650"
runtime-config:
external:
socket-path: /tmp/fs.sock
socket-path: /tmp/fs.sock
function-store: ./functionsa
27 changes: 27 additions & 0 deletions fs/api/package.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Copyright 2024 Function Stream Org.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package api

import "github.com/functionstream/function-stream/common/model"

type Package interface {
GetSupportedRuntimeConfig() []model.RuntimeConfig
}

type PackageLoader interface {
Load(path string) (Package, error)
}
3 changes: 2 additions & 1 deletion fs/api/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package api

import (
"github.com/functionstream/function-stream/common/model"
"github.com/functionstream/function-stream/fs/contube"
)

Expand All @@ -26,5 +27,5 @@ type FunctionRuntime interface {
}

type FunctionRuntimeFactory interface {
NewFunctionRuntime(instance FunctionInstance) (FunctionRuntime, error)
NewFunctionRuntime(instance FunctionInstance, rc *model.RuntimeConfig) (FunctionRuntime, error)
}
Loading

0 comments on commit 69b262a

Please sign in to comment.