Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sequencer zookeeper #156

Merged
merged 12 commits into from
Aug 12, 2021
4 changes: 4 additions & 0 deletions cmd/layotto/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ import (
// Sequencer
sequencer_etcd "mosn.io/layotto/components/sequencer/etcd"
sequencer_redis "mosn.io/layotto/components/sequencer/redis"
sequencer_zookeeper "mosn.io/layotto/components/sequencer/zookeeper"
// Actuator
_ "mosn.io/layotto/pkg/actuator"
"mosn.io/layotto/pkg/actuator/health"
Expand Down Expand Up @@ -266,6 +267,9 @@ func NewRuntimeGrpcServer(data json.RawMessage, opts ...grpc.ServerOption) (mgrp
runtime_sequencer.NewFactory("redis", func() sequencer.Store {
return sequencer_redis.NewStandaloneRedisSequencer(log.DefaultLogger)
}),
runtime_sequencer.NewFactory("zookeeper", func() sequencer.Store {
return sequencer_zookeeper.NewZookeeperSequencer(log.DefaultLogger)
}),
))
// 4. check if unhealthy
if err != nil {
Expand Down
90 changes: 9 additions & 81 deletions components/lock/zookeeper/zookeeper_lock.go
Original file line number Diff line number Diff line change
@@ -1,53 +1,20 @@
package zookeeper

import (
"errors"
"fmt"
"github.com/go-zookeeper/zk"
"mosn.io/layotto/components/lock"
"mosn.io/layotto/components/pkg/utils"
"mosn.io/pkg/log"
"mosn.io/pkg/utils"
"strconv"
"strings"
util "mosn.io/pkg/utils"
"time"
)

const (
host = "zookeeperHosts"
password = "zookeeperPassword"
sessionTimeout = "sessionTimeout"
logInfo = "logInfo"
defaultSessionTimeout = 5 * time.Second
)

type ConnectionFactory interface {
NewConnection(expire time.Duration, meta metadata) (ZKConnection, error)
}

type ConnectionFactoryImpl struct {
}

func (c *ConnectionFactoryImpl) NewConnection(expire time.Duration, meta metadata) (ZKConnection, error) {
conn, _, err := zk.Connect(meta.hosts, expire, zk.WithLogInfo(meta.logInfo))
if err != nil {
return nil, err
}
return conn, nil
}

type ZKConnection interface {
Get(path string) ([]byte, *zk.Stat, error)
Delete(path string, version int32) error
Create(path string, data []byte, flags int32, acl []zk.ACL) (string, error)
Close()
}

type ZookeeperLock struct {
//trylock reestablish connection every time
factory ConnectionFactory
factory utils.ConnectionFactory
//unlock reuse this conneciton
unlockConn ZKConnection
metadata metadata
unlockConn utils.ZKConnection
metadata utils.ZookeeperMetadata
logger log.ErrorLogger
}

Expand All @@ -60,16 +27,16 @@ func NewZookeeperLock(logger log.ErrorLogger) *ZookeeperLock {

func (p *ZookeeperLock) Init(metadata lock.Metadata) error {

m, err := parseZookeeperMetadata(metadata)
m, err := utils.ParseZookeeperMetadata(metadata.Properties)
if err != nil {
return err
}

p.metadata = m
p.factory = &ConnectionFactoryImpl{}
p.factory = &utils.ConnectionFactoryImpl{}

//init unlock connection
zkConn, err := p.factory.NewConnection(p.metadata.sessionTimeout, p.metadata)
zkConn, err := p.factory.NewConnection(p.metadata.SessionTimeout, p.metadata)
if err != nil {
return err
}
Expand Down Expand Up @@ -103,7 +70,7 @@ func (p *ZookeeperLock) TryLock(req *lock.TryLockRequest) (*lock.TryLockResponse
}

//2.2 create node success, asyn to make sure zkclient alive for need time
utils.GoWithRecover(func() {
util.GoWithRecover(func() {
//can also
//time.Sleep(time.Second * time.Duration(req.Expire))
timeAfterTrigger := time.After(time.Second * time.Duration(req.Expire))
Expand Down Expand Up @@ -153,42 +120,3 @@ func (p *ZookeeperLock) Unlock(req *lock.UnlockRequest) (*lock.UnlockResponse, e
//delete success, unlock success
return &lock.UnlockResponse{Status: lock.SUCCESS}, nil
}

type metadata struct {
hosts []string
password string
sessionTimeout time.Duration
logInfo bool
}

func parseZookeeperMetadata(meta lock.Metadata) (metadata, error) {
m := metadata{}
if val, ok := meta.Properties[host]; ok && val != "" {
split := strings.Split(val, ";")
m.hosts = append(m.hosts, split...)
} else {
return m, errors.New("zookeeper store error: missing host address")
}

if val, ok := meta.Properties[password]; ok && val != "" {
m.password = val
}

m.sessionTimeout = defaultSessionTimeout
if val, ok := meta.Properties[sessionTimeout]; ok && val != "" {
parsedVal, err := strconv.Atoi(val)
if err != nil {
return m, fmt.Errorf("zookeeper store error: can't parse sessionTimeout field: %s", err)
}
m.sessionTimeout = time.Duration(parsedVal) * time.Second
}

if val, ok := meta.Properties[logInfo]; ok && val != "" {
b, err := strconv.ParseBool(val)
if err != nil {
return metadata{}, err
}
m.logInfo = b
}
return m, nil
}
19 changes: 10 additions & 9 deletions components/lock/zookeeper/zookeeper_lock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"github.com/golang/mock/gomock"
"github.com/stretchr/testify/assert"
"mosn.io/layotto/components/lock"
"mosn.io/layotto/components/pkg/utils"
"mosn.io/pkg/log"
"testing"
"time"
Expand Down Expand Up @@ -35,9 +36,9 @@ func TestZookeeperLock_ALock_AUnlock(t *testing.T) {

//mock
ctrl := gomock.NewController(t)
unlockConn := NewMockZKConnection(ctrl)
lockConn := NewMockZKConnection(ctrl)
factory := NewMockConnectionFactory(ctrl)
unlockConn := utils.NewMockZKConnection(ctrl)
lockConn := utils.NewMockZKConnection(ctrl)
factory := utils.NewMockConnectionFactory(ctrl)
path := "/" + resouseId
factory.EXPECT().NewConnection(time.Duration(expireTime)*time.Second, comp.metadata).Return(lockConn, nil).Times(2)

Expand Down Expand Up @@ -74,9 +75,9 @@ func TestZookeeperLock_ALock_BUnlock(t *testing.T) {

//mock
ctrl := gomock.NewController(t)
unlockConn := NewMockZKConnection(ctrl)
lockConn := NewMockZKConnection(ctrl)
factory := NewMockConnectionFactory(ctrl)
unlockConn := utils.NewMockZKConnection(ctrl)
lockConn := utils.NewMockZKConnection(ctrl)
factory := utils.NewMockConnectionFactory(ctrl)
path := "/" + resouseId
factory.EXPECT().NewConnection(time.Duration(expireTime)*time.Second, comp.metadata).Return(lockConn, nil).Times(2)

Expand Down Expand Up @@ -113,9 +114,9 @@ func TestZookeeperLock_ALock_BLock_AUnlock_BLock_BUnlock(t *testing.T) {

//mock
ctrl := gomock.NewController(t)
unlockConn := NewMockZKConnection(ctrl)
lockConn := NewMockZKConnection(ctrl)
factory := NewMockConnectionFactory(ctrl)
unlockConn := utils.NewMockZKConnection(ctrl)
lockConn := utils.NewMockZKConnection(ctrl)
factory := utils.NewMockConnectionFactory(ctrl)
path := "/" + resouseId

factory.EXPECT().NewConnection(time.Duration(expireTime)*time.Second, comp.metadata).Return(lockConn, nil).Times(3)
Expand Down
85 changes: 85 additions & 0 deletions components/pkg/utils/zookeeper.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
package utils

import (
"errors"
"fmt"
"github.com/go-zookeeper/zk"
"strconv"
"strings"
"time"
)

const (
zkHost = "zookeeperHosts"
zkPassword = "zookeeperPassword"
sessionTimeout = "SessionTimeout"
logInfo = "LogInfo"
defaultSessionTimeout = 5 * time.Second
)

type ConnectionFactory interface {
NewConnection(expire time.Duration, meta ZookeeperMetadata) (ZKConnection, error)
}

type ConnectionFactoryImpl struct {
}

func (c *ConnectionFactoryImpl) NewConnection(expire time.Duration, meta ZookeeperMetadata) (ZKConnection, error) {

if expire == 0 {
expire = meta.SessionTimeout
}

conn, _, err := zk.Connect(meta.Hosts, expire, zk.WithLogInfo(meta.LogInfo))
if err != nil {
return nil, err
}
return conn, nil
}

type ZKConnection interface {
Get(path string) ([]byte, *zk.Stat, error)
Set(path string, data []byte, version int32) (*zk.Stat, error)
Delete(path string, version int32) error
Create(path string, data []byte, flags int32, acl []zk.ACL) (string, error)
Close()
}

type ZookeeperMetadata struct {
Hosts []string
Password string
SessionTimeout time.Duration
LogInfo bool
}

func ParseZookeeperMetadata(properties map[string]string) (ZookeeperMetadata, error) {
m := ZookeeperMetadata{}
if val, ok := properties[zkHost]; ok && val != "" {
split := strings.Split(val, ";")
m.Hosts = append(m.Hosts, split...)
} else {
return m, errors.New("zookeeper store error: missing zkHost address")
}

if val, ok := properties[zkPassword]; ok && val != "" {
m.Password = val
}

m.SessionTimeout = defaultSessionTimeout
if val, ok := properties[sessionTimeout]; ok && val != "" {
parsedVal, err := strconv.Atoi(val)
if err != nil {
return m, fmt.Errorf("zookeeper store error: can't parse SessionTimeout field: %s", err)
}
m.SessionTimeout = time.Duration(parsedVal) * time.Second
}

if val, ok := properties[logInfo]; ok && val != "" {
b, err := strconv.ParseBool(val)
if err != nil {
return ZookeeperMetadata{}, err
}
m.LogInfo = b
}
return m, nil
}

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading