Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support polaris service registration discovery #415

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions contrib/registry/polaris/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
module github.com/TarsCloud/TarsGo/contrib/registry/polaris

go 1.16

require (
github.com/TarsCloud/TarsGo v1.4.4
github.com/polarismesh/polaris-go v1.5.0
)

replace github.com/TarsCloud/TarsGo v1.4.4 => ../../../
975 changes: 975 additions & 0 deletions contrib/registry/polaris/go.sum

Large diffs are not rendered by default.

172 changes: 172 additions & 0 deletions contrib/registry/polaris/registry.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,172 @@
package polaris

import (
"context"
"time"

"github.com/TarsCloud/TarsGo/tars/registry"
"github.com/TarsCloud/TarsGo/tars/util/endpoint"
"github.com/polarismesh/polaris-go"
"github.com/polarismesh/polaris-go/pkg/model"
)

const (
endpointMeta = "endpoint"
)

type polarisRegistry struct {
namespace string
provider polaris.ProviderAPI
consumer polaris.ConsumerAPI
}

type RegistryOption func(pr *polarisRegistry)

func WithNamespace(namespace string) RegistryOption {
return func(pr *polarisRegistry) {
pr.namespace = namespace
}
}

func New(provider polaris.ProviderAPI, opts ...RegistryOption) registry.Registrar {
consumer := polaris.NewConsumerAPIByContext(provider.SDKContext())
pr := &polarisRegistry{namespace: "tars", provider: provider, consumer: consumer}
for _, opt := range opts {
opt(pr)
}
//pr.addMiddleware()
return pr
}

/*func (pr *polarisRegistry) addMiddleware() {
tars.UseClientFilterMiddleware(func(next tars.ClientFilter) tars.ClientFilter {
return func(ctx context.Context, msg *tars.Message, invoke tars.Invoke, timeout time.Duration) (err error) {
start := time.Now()
defer func() {
delay := time.Since(start)
retStatus := model.RetSuccess
if msg.Resp.IRet != 0 {
retStatus = model.RetFail
}
ret := &polaris.ServiceCallResult{
ServiceCallResult: model.ServiceCallResult{
EmptyInstanceGauge: model.EmptyInstanceGauge{},
CalledInstance: nil, // todo: 怎么获取到或构造 Instance
Method: msg.Req.SServantName + "." + msg.Req.SFuncName,
RetStatus: retStatus,
},
}
ret.SetDelay(delay)
ret.SetRetCode(msg.Resp.IRet)
if er := pr.consumer.UpdateServiceCallResult(ret); er != nil {
TLOG.Errorf("do report service call result : %+v", er)
}
}()
return next(ctx, msg, invoke, timeout)
}
})
}*/

func (pr *polarisRegistry) Registry(_ context.Context, servant *registry.ServantInstance) error {
instance := &polaris.InstanceRegisterRequest{}
instance.Service = servant.Servant
instance.Namespace = pr.namespace
instance.Host = servant.Endpoint.Host
instance.Port = int(servant.Endpoint.Port)
instance.Protocol = &servant.Protocol
if servant.Endpoint.Weight > 0 {
weight := int(servant.Endpoint.Weight)
instance.Weight = &weight
}
if servant.Endpoint.Timeout > 0 {
timeout := time.Duration(servant.Endpoint.Timeout) * time.Millisecond
instance.Timeout = &timeout
}
instance.Version = &servant.TarsVersion
instance.Metadata = createMetadata(servant)
_, err := pr.provider.RegisterInstance(instance)
return err
}

func (pr *polarisRegistry) Deregister(_ context.Context, servant *registry.ServantInstance) error {
instance := &polaris.InstanceDeRegisterRequest{}
instance.Service = servant.Servant
instance.Namespace = pr.namespace
instance.Host = servant.Endpoint.Host
instance.Port = int(servant.Endpoint.Port)
if servant.Endpoint.Timeout > 0 {
timeout := time.Duration(servant.Endpoint.Timeout) * time.Millisecond
instance.Timeout = &timeout
}
err := pr.provider.Deregister(instance)
return err
}

func (pr *polarisRegistry) QueryServant(_ context.Context, id string) (activeEp []registry.Endpoint, inactiveEp []registry.Endpoint, err error) {
req := &polaris.GetAllInstancesRequest{}
req.Namespace = pr.namespace
req.Service = id
resp, err := pr.consumer.GetAllInstances(req)
if err != nil {
return nil, nil, err
}
instances := resp.GetInstances()
for _, ins := range instances {
epf := instanceToEndpoint(ins)
if ins.IsHealthy() {
activeEp = append(activeEp, epf)
} else {
inactiveEp = append(inactiveEp, epf)
}
}
return activeEp, inactiveEp, err
}

func (pr *polarisRegistry) QueryServantBySet(_ context.Context, id, setId string) (activeEp []registry.Endpoint, inactiveEp []registry.Endpoint, err error) {
req := &polaris.GetInstancesRequest{}
req.Namespace = pr.namespace
req.Service = id
req.Metadata = map[string]string{
"internal-enable-set": "Y",
"internal-set-name": setId,
}
resp, err := pr.consumer.GetInstances(req)
if err != nil {
return nil, nil, err
}
instances := resp.GetInstances()
for _, ins := range instances {
epf := instanceToEndpoint(ins)
if ins.IsHealthy() {
activeEp = append(activeEp, epf)
} else {
inactiveEp = append(inactiveEp, epf)
}
}
return activeEp, inactiveEp, err
}

func createMetadata(servant *registry.ServantInstance) map[string]string {
metadata := make(map[string]string)
metadata["tarsVersion"] = servant.TarsVersion
metadata["app"] = servant.App
metadata["server"] = servant.Server
ep := endpoint.Tars2endpoint(servant.Endpoint)
metadata[endpointMeta] = ep.String()
// polaris plugin
metadata["internal-enable-set"] = "N"
if servant.EnableSet {
metadata["internal-enable-set"] = "Y"
metadata["internal-set-name"] = servant.SetDivision
}
return metadata
}

func instanceToEndpoint(instance model.Instance) registry.Endpoint {
md := instance.GetMetadata()
ep := endpoint.Parse(instance.GetMetadata()[endpointMeta])
ep.Host = instance.GetHost()
ep.Port = int32(instance.GetPort())
ep.SetId = md["internal-set-name"]
return endpoint.Endpoint2tars(ep)
}
2 changes: 2 additions & 0 deletions examples/PolarisServer/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
PolarisServer
*.log
8 changes: 8 additions & 0 deletions examples/PolarisServer/HelloObj.tars
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
module TestApp
{
interface HelloObj
{
int Add(int a,int b,out int c); // Some example function
int Sub(int a,int b,out int c); // Some example function
};
};
33 changes: 33 additions & 0 deletions examples/PolarisServer/HelloObj_imp.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package main

import (
"context"
)

// HelloObjImp servant implementation
type HelloObjImp struct {
}

// Init servant init
func (imp *HelloObjImp) Init() error {
//initialize servant here:
//...
return nil
}

// Destroy servant destroy
func (imp *HelloObjImp) Destroy() {
//destroy servant here:
//...
}

func (imp *HelloObjImp) Add(ctx context.Context, a int32, b int32, c *int32) (int32, error) {
//Doing something in your function
//...
return 0, nil
}
func (imp *HelloObjImp) Sub(ctx context.Context, a int32, b int32, c *int32) (int32, error) {
//Doing something in your function
//...
return 0, nil
}
36 changes: 36 additions & 0 deletions examples/PolarisServer/client/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package main

import (
"fmt"
"log"

"github.com/polarismesh/polaris-go"

pr "github.com/TarsCloud/TarsGo/contrib/registry/polaris"
"github.com/TarsCloud/TarsGo/tars"

"polarisserver/tars-protocol/TestApp"
)

func main() {
//provider, err := polaris.NewProviderAPI()
// 或者使用以下方法,则不需要创建配置文件
provider, err := polaris.NewProviderAPIByAddress("127.0.0.1:8091")
if err != nil {
log.Fatalf("fail to create providerAPI, err is %v", err)
}
defer provider.Destroy()
// 注册中心
comm := tars.NewCommunicator(tars.WithRegistry(pr.New(provider, pr.WithNamespace("tars"))))
obj := fmt.Sprintf("TestApp.PolarisServer.HelloObj")
app := new(TestApp.HelloObj)
comm.StringToProxy(obj, app)
var out, i int32
i = 123
ret, err := app.Add(i, i*2, &out)
if err != nil {
fmt.Println(err)
return
}
fmt.Println(ret, out)
}
25 changes: 25 additions & 0 deletions examples/PolarisServer/config/config.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
<tars>
<application>
enableset=y
setdivision=public.ali.1
<server>
app=TestApp
server=PolarisServer
local=tcp -h 127.0.0.1 -p 10014 -t 30000
logpath=/tmp
<TestApp.PolarisServer.HelloObjAdapter>
allow
endpoint=tcp -h 127.0.0.1 -p 10015 -t 60000
handlegroup=TestApp.PolarisServer.HelloObjAdapter
maxconns=200000
protocol=tars
queuecap=10000
queuetimeout=60000
servant=TestApp.PolarisServer.HelloObj
shmcap=0
shmkey=0
threads=1
</TestApp.PolarisServer.HelloObjAdapter>
</server>
</application>
</tars>
21 changes: 21 additions & 0 deletions examples/PolarisServer/debugtool/dumpstack.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package main

import (
"fmt"

"github.com/TarsCloud/TarsGo/tars"
"github.com/TarsCloud/TarsGo/tars/protocol/res/adminf"
)

func main() {
comm := tars.NewCommunicator()
obj := "TestApp.PolarisServer.HelloObjObj@tcp -h 127.0.0.1 -p 10014 -t 60000"
app := new(adminf.AdminF)
comm.StringToProxy(obj, app)
ret, err := app.Notify("tars.dumpstack")
if err != nil {
fmt.Println(err)
return
}
fmt.Println(ret)
}
53 changes: 53 additions & 0 deletions examples/PolarisServer/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
module polarisserver

go 1.17

require (
github.com/TarsCloud/TarsGo v1.3.6
github.com/polarismesh/polaris-go v1.2.0-beta.3
)

require (
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.1.2 // indirect
github.com/dlclark/regexp2 v1.7.0 // indirect
github.com/gin-contrib/sse v0.1.0 // indirect
github.com/gin-gonic/gin v1.8.1 // indirect
github.com/go-playground/locales v0.14.0 // indirect
github.com/go-playground/universal-translator v0.18.0 // indirect
github.com/go-playground/validator/v10 v10.10.0 // indirect
github.com/goccy/go-json v0.9.7 // indirect
github.com/golang/protobuf v1.5.2 // indirect
github.com/google/uuid v1.3.0 // indirect
github.com/hashicorp/errwrap v1.0.0 // indirect
github.com/hashicorp/go-multierror v1.1.1 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/leodido/go-urn v1.2.1 // indirect
github.com/mattn/go-isatty v0.0.14 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect
github.com/mitchellh/go-homedir v1.1.0 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/natefinch/lumberjack v2.0.0+incompatible // indirect
github.com/pelletier/go-toml/v2 v2.0.1 // indirect
github.com/prometheus/client_golang v1.12.1 // indirect
github.com/prometheus/client_model v0.2.0 // indirect
github.com/prometheus/common v0.32.1 // indirect
github.com/prometheus/procfs v0.7.3 // indirect
github.com/spaolacci/murmur3 v1.1.0 // indirect
github.com/ugorji/go/codec v1.2.7 // indirect
go.uber.org/atomic v1.7.0 // indirect
go.uber.org/automaxprocs v1.5.1 // indirect
go.uber.org/multierr v1.6.0 // indirect
go.uber.org/zap v1.21.0 // indirect
golang.org/x/crypto v0.0.0-20210920023735-84f357641f63 // indirect
golang.org/x/net v0.0.0-20220425223048-2871e0cb64e4 // indirect
golang.org/x/sys v0.0.0-20220503163025-988cb79eb6c6 // indirect
golang.org/x/text v0.3.7 // indirect
google.golang.org/genproto v0.0.0-20220504150022-98cd25cafc72 // indirect
google.golang.org/grpc v1.46.2 // indirect
google.golang.org/protobuf v1.28.0 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
)

replace github.com/TarsCloud/TarsGo => ../../
Loading