Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sequencer zookeeper #156

Merged
merged 12 commits into from
Aug 12, 2021
5 changes: 4 additions & 1 deletion cmd/layotto/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ import (

// Sequencer
sequencer_etcd "mosn.io/layotto/components/sequencer/etcd"

sequencer_zookeeper "mosn.io/layotto/components/sequencer/zookeeper"
// Actuator
_ "mosn.io/layotto/pkg/actuator"
"mosn.io/layotto/pkg/actuator/health"
Expand Down Expand Up @@ -263,6 +263,9 @@ func NewRuntimeGrpcServer(data json.RawMessage, opts ...grpc.ServerOption) (mgrp
runtime_sequencer.NewFactory("etcd", func() sequencer.Store {
return sequencer_etcd.NewEtcdSequencer(log.DefaultLogger)
}),
runtime_sequencer.NewFactory("zookeeper", func() sequencer.Store {
return sequencer_zookeeper.NewZookeeperSequencer(log.DefaultLogger)
}),
))
// 4. check if unhealthy
if err != nil {
Expand Down
164 changes: 164 additions & 0 deletions components/sequencer/zookeeper/zookeeper_sequencer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
package zookeeper

import (
"context"
"errors"
"fmt"
"github.com/go-zookeeper/zk"
"mosn.io/layotto/components/sequencer"
"mosn.io/pkg/log"
"strconv"
"strings"
"time"
)

const (
host = "zookeeperHosts"
password = "zookeeperPassword"
sessionTimeout = "sessionTimeout"
logInfo = "logInfo"
defaultSessionTimeout = 5 * time.Second
)

type ConnectionFactory interface {
NewConnection(meta metadata) (ZKConnection, error)
}

type ConnectionFactoryImpl struct {
}

func (c *ConnectionFactoryImpl) NewConnection(meta metadata) (ZKConnection, error) {
conn, _, err := zk.Connect(meta.hosts, meta.sessionTimeout, zk.WithLogInfo(meta.logInfo))
if err != nil {
return nil, err
}
return conn, nil
}

type ZKConnection interface {
Set(path string, data []byte, version int32) (*zk.Stat, error)
Get(path string) ([]byte, *zk.Stat, error)
Close()
}

type ZookeeperSequencer struct {
client ZKConnection
metadata metadata
logger log.ErrorLogger
factory ConnectionFactory
ctx context.Context
cancel context.CancelFunc
}

// NewZookeeperSequencer returns a new zookeeper sequencer
func NewZookeeperSequencer(logger log.ErrorLogger) *ZookeeperSequencer {
s := &ZookeeperSequencer{
logger: logger,
}

return s
}

func (s *ZookeeperSequencer) Init(config sequencer.Configuration) error {
m, err := parseRedisMetadata(config)
if err != nil {
return err
}
//init
s.metadata = m
s.factory = &ConnectionFactoryImpl{}
connection, err := s.factory.NewConnection(m)
if err != nil {
return err
}
s.client = connection
s.ctx, s.cancel = context.WithCancel(context.Background())

//check biggerThan
for k, needV := range s.metadata.biggerThan {
if needV <= 0 {
continue
}
_, stat, err := s.client.Get("/" + k)
ZLBer marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
//key not exist
if err == zk.ErrNoNode {
return fmt.Errorf("zookeeper sequencer error: can not satisfy biggerThan guarantee.key: %s, current key does not exist", k)
}
//other error
return err
}
realV := int64(stat.Version)

if realV < needV {
return fmt.Errorf("zookeeper sequencer error: can not satisfy biggerThan guarantee.key: %s,current id:%v", k, realV)
}

}
return err

}

func (s *ZookeeperSequencer) GetNextId(req *sequencer.GetNextIdRequest) (*sequencer.GetNextIdResponse, error) {

stat, err := s.client.Set("/"+req.Key, []byte(""), -1)

if err != nil {
return nil, err
}
return &sequencer.GetNextIdResponse{
ZLBer marked this conversation as resolved.
Show resolved Hide resolved
NextId: int64(stat.Version),
}, nil
}

func (s *ZookeeperSequencer) GetSegment(req *sequencer.GetSegmentRequest) (support bool, result *sequencer.GetSegmentResponse, err error) {
return false, nil, nil
}
func (s *ZookeeperSequencer) Close() error {
s.cancel()
s.client.Close()
return nil
}
func parseRedisMetadata(config sequencer.Configuration) (metadata, error) {
ZLBer marked this conversation as resolved.
Show resolved Hide resolved
m := metadata{}

m.biggerThan = config.BiggerThan

if val, ok := config.Properties[host]; ok && val != "" {
split := strings.Split(val, ";")
m.hosts = append(m.hosts, split...)
} else {
return m, errors.New("zookeeper store error: missing host address")
}

if val, ok := config.Properties[password]; ok && val != "" {
m.password = val
}

m.sessionTimeout = defaultSessionTimeout
if val, ok := config.Properties[sessionTimeout]; ok && val != "" {
parsedVal, err := strconv.Atoi(val)
if err != nil {
return m, fmt.Errorf("zookeeper store error: can't parse sessionTimeout field: %s", err)
}
m.sessionTimeout = time.Duration(parsedVal) * time.Second
}

if val, ok := config.Properties[logInfo]; ok && val != "" {
b, err := strconv.ParseBool(val)
if err != nil {
return metadata{}, err
}
m.logInfo = b
}
return m, nil
}

type metadata struct {
hosts []string
password string
sessionTimeout time.Duration
logInfo bool
keyPrefix string
biggerThan map[string]int64
}
116 changes: 116 additions & 0 deletions components/sequencer/zookeeper/zookeeper_sequencer_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

49 changes: 49 additions & 0 deletions components/sequencer/zookeeper/zookeeper_sequencer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package zookeeper

import (
"github.com/go-zookeeper/zk"
"github.com/golang/mock/gomock"
"github.com/stretchr/testify/assert"
"mosn.io/layotto/components/sequencer"
"mosn.io/pkg/log"
"testing"
)

const key = "resoure_1"

func TestZookeeperSequencer_GetNextId(t *testing.T) {
cfg := sequencer.Configuration{
BiggerThan: nil,
Properties: map[string]string{
"zookeeperHosts": "127.0.0.1",
},
}

comp := NewZookeeperSequencer(log.DefaultLogger)
comp.Init(cfg)

//mock
ctrl := gomock.NewController(t)
client := NewMockZKConnection(ctrl)

path := "/" + key
var val int32 = 1
client.EXPECT().Set(path, []byte(""), int32(-1)).Return(&zk.Stat{Version: val}, nil).Times(1)
val++
client.EXPECT().Set(path, []byte(""), int32(-1)).Return(&zk.Stat{Version: val}, nil).Times(1)
comp.client = client
//first
resp, err := comp.GetNextId(&sequencer.GetNextIdRequest{
Key: key,
})
assert.NoError(t, err)
assert.Equal(t, int64(1), resp.NextId)

//repeat
resp, err = comp.GetNextId(&sequencer.GetNextIdRequest{
Key: key,
})
assert.NoError(t, err)
assert.Equal(t, int64(2), resp.NextId)

}
Loading