Skip to content

Commit

Permalink
Etcd Client Improve (#51)
Browse files Browse the repository at this point in the history
* bulk insertion

* bulk insertion

* do with revision

* NewConfigClientWithErr return err

* fix error

* fix ci error

* add ut

* code block format and modify comment

* code block format and modify comment

* check suffix

* Update client.go

Co-authored-by: Xin.Zh <[email protected]>
  • Loading branch information
ztelur and AlexStocks authored May 4, 2021
1 parent 0091e2a commit 140d707
Show file tree
Hide file tree
Showing 2 changed files with 225 additions and 54 deletions.
178 changes: 136 additions & 42 deletions database/kv/etcd/v3/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package gxetcd
import (
"context"
"log"
"strings"
"sync"
"time"
)
Expand All @@ -36,10 +37,26 @@ var (
ErrNilETCDV3Client = perrors.New("etcd raw client is nil") // full describe the ERR
// ErrKVPairNotFound not found key
ErrKVPairNotFound = perrors.New("k/v pair not found")
// ErrKVListSizeIllegal k/v list empty or not equal size
ErrKVListSizeIllegal = perrors.New("k/v List is empty or kList's size is not equal to the size of vList")
// ErrCompareFail txn compare fail
ErrCompareFail = perrors.New("txn compare fail")
// ErrRevision revision when error
ErrRevision int64 = -1
)

// NewConfigClient create new Client
func NewConfigClient(opts ...Option) *Client {
newClient, err := NewConfigClientWithErr(opts...)

if err != nil {
log.Printf("new etcd client = error{%v}", err)
}
return newClient
}

// NewConfigClientWithErr create new Client,error
func NewConfigClientWithErr(opts ...Option) (*Client, error) {
options := &Options{
Heartbeat: 1, // default Heartbeat
}
Expand All @@ -52,7 +69,8 @@ func NewConfigClient(opts ...Option) *Client {
log.Printf("new etcd client (Name{%s}, etcd addresses{%v}, Timeout{%d}) = error{%v}",
options.Name, options.Endpoints, options.Timeout, err)
}
return newClient

return newClient, err
}

// Client represents etcd client Configuration
Expand Down Expand Up @@ -211,40 +229,97 @@ func (c *Client) GetEndPoints() []string {
return c.endpoints
}

// if k not exist will put k/v in etcd, otherwise return nil
func (c *Client) put(k string, v string, opts ...clientv3.OpOption) error {
// if k not exist will put k/v in etcd, otherwise return ErrCompareFail
func (c *Client) create(k string, v string, opts ...clientv3.OpOption) error {
rawClient := c.GetRawClient()

if rawClient == nil {
return ErrNilETCDV3Client
}

_, err := rawClient.Txn(c.ctx).
If(clientv3.Compare(clientv3.Version(k), "<", 1)).
resp, err := rawClient.Txn(c.ctx).
If(clientv3.Compare(clientv3.CreateRevision(k), "=", 0)).
Then(clientv3.OpPut(k, v, opts...)).
Commit()
return err
if err != nil {
return err
}
if !resp.Succeeded {
return ErrCompareFail
}

return nil
}

// if k not exist will put k/v in etcd
// if k is already exist in etcd, replace it
func (c *Client) update(k string, v string, opts ...clientv3.OpOption) error {
// if k in bulk insertion not exist all, then put all k/v in etcd, otherwise return error
func (c *Client) batchCreate(kList []string, vList []string, opts ...clientv3.OpOption) error {
rawClient := c.GetRawClient()
if rawClient == nil {
return ErrNilETCDV3Client
}

kLen := len(kList)
vLen := len(vList)
if kLen == 0 || vLen == 0 || kLen != vLen {
return ErrKVListSizeIllegal
}

var cs []clientv3.Cmp
var ops []clientv3.Op

for i, k := range kList {
v := vList[i]
cs = append(cs, clientv3.Compare(clientv3.CreateRevision(k), "=", 0))
ops = append(ops, clientv3.OpPut(k, v, opts...))
}

resp, err := rawClient.Txn(c.ctx).
If(cs...).
Then(ops...).
Commit()
if err != nil {
return err
}
if !resp.Succeeded {
return ErrCompareFail
}

return nil
}

// put k/v in etcd, if fail return error
func (c *Client) put(k string, v string, opts ...clientv3.OpOption) error {
rawClient := c.GetRawClient()
if rawClient == nil {
return ErrNilETCDV3Client
}

_, err := rawClient.Put(c.ctx, k, v, opts...)
return err
}

// put k/v in etcd when ModRevision equal with rev, if not return ErrCompareFail or other err
func (c *Client) updateWithRev(k string, v string, rev int64, opts ...clientv3.OpOption) error {
rawClient := c.GetRawClient()
if rawClient == nil {
return ErrNilETCDV3Client
}

_, err := rawClient.Txn(c.ctx).
If(clientv3.Compare(clientv3.Version(k), "!=", -1)).
resp, err := rawClient.Txn(c.ctx).
If(clientv3.Compare(clientv3.ModRevision(k), "=", rev)).
Then(clientv3.OpPut(k, v, opts...)).
Commit()
return err
if err != nil {
return err
}
if !resp.Succeeded {
return ErrCompareFail
}

return nil
}

func (c *Client) delete(k string) error {
rawClient := c.GetRawClient()

if rawClient == nil {
return ErrNilETCDV3Client
}
Expand All @@ -253,29 +328,28 @@ func (c *Client) delete(k string) error {
return err
}

func (c *Client) get(k string) (string, error) {
// getValAndRev get value and revision
func (c *Client) getValAndRev(k string) (string, int64, error) {
rawClient := c.GetRawClient()

if rawClient == nil {
return "", ErrNilETCDV3Client
return "", ErrRevision, ErrNilETCDV3Client
}

resp, err := rawClient.Get(c.ctx, k)
if err != nil {
return "", err
return "", ErrRevision, err
}

if len(resp.Kvs) == 0 {
return "", ErrKVPairNotFound
return "", ErrRevision, ErrKVPairNotFound
}

return string(resp.Kvs[0].Value), nil
return string(resp.Kvs[0].Value), resp.Header.Revision, nil
}

// CleanKV delete all key and value
func (c *Client) CleanKV() error {
rawClient := c.GetRawClient()

if rawClient == nil {
return ErrNilETCDV3Client
}
Expand All @@ -287,11 +361,14 @@ func (c *Client) CleanKV() error {
// GetChildren return node children
func (c *Client) GetChildren(k string) ([]string, []string, error) {
rawClient := c.GetRawClient()

if rawClient == nil {
return nil, nil, ErrNilETCDV3Client
}

if !strings.HasSuffix(k, "/") {
k += "/"
}

resp, err := rawClient.Get(c.ctx, k, clientv3.WithPrefix())
if err != nil {
return nil, nil, err
Expand All @@ -310,29 +387,18 @@ func (c *Client) GetChildren(k string) ([]string, []string, error) {
return kList, vList, nil
}

func (c *Client) watchWithPrefix(prefix string) (clientv3.WatchChan, error) {
// watchWithOption watch
func (c *Client) watchWithOption(k string, opts ...clientv3.OpOption) (clientv3.WatchChan, error) {
rawClient := c.GetRawClient()

if rawClient == nil {
return nil, ErrNilETCDV3Client
}

return rawClient.Watch(c.ctx, prefix, clientv3.WithPrefix()), nil
}

func (c *Client) watch(k string) (clientv3.WatchChan, error) {
rawClient := c.GetRawClient()

if rawClient == nil {
return nil, ErrNilETCDV3Client
}

return rawClient.Watch(c.ctx, k), nil
return rawClient.Watch(c.ctx, k, opts...), nil
}

func (c *Client) keepAliveKV(k string, v string) error {
rawClient := c.GetRawClient()

if rawClient == nil {
return ErrNilETCDV3Client
}
Expand Down Expand Up @@ -376,13 +442,25 @@ func (c *Client) Valid() bool {

// Create key value ...
func (c *Client) Create(k string, v string) error {
err := c.put(k, v)
err := c.create(k, v)
return perrors.WithMessagef(err, "put k/v (key: %s value %s)", k, v)
}

// BatchCreate bulk insertion
func (c *Client) BatchCreate(kList []string, vList []string) error {
err := c.batchCreate(kList, vList)
return perrors.WithMessagef(err, "batch put k/v error ")
}

// Update key value ...
func (c *Client) Update(k, v string) error {
err := c.update(k, v)
err := c.put(k, v)
return perrors.WithMessagef(err, "Update k/v (key: %s value %s)", k, v)
}

// Update key value ...
func (c *Client) UpdateWithRev(k, v string, rev int64, opts ...clientv3.OpOption) error {
err := c.updateWithRev(k, v, rev, opts...)
return perrors.WithMessagef(err, "Update k/v (key: %s value %s)", k, v)
}

Expand All @@ -404,20 +482,36 @@ func (c *Client) GetChildrenKVList(k string) ([]string, []string, error) {
return kList, vList, perrors.WithMessagef(err, "get key children (key %s)", k)
}

// GetValAndRev gets value and revision by @k
func (c *Client) GetValAndRev(k string) (string, int64, error) {
v, rev, err := c.getValAndRev(k)
return v, rev, perrors.WithMessagef(err, "get key value (key %s)", k)
}

// Get gets value by @k
func (c *Client) Get(k string) (string, error) {
v, err := c.get(k)
v, _, err := c.getValAndRev(k)
return v, perrors.WithMessagef(err, "get key value (key %s)", k)
}

// Watch watches on spec key
func (c *Client) Watch(k string) (clientv3.WatchChan, error) {
wc, err := c.watch(k)
return wc, perrors.WithMessagef(err, "watch prefix (key %s)", k)
wc, err := c.watchWithOption(k)
return wc, perrors.WithMessagef(err, "watch (key %s)", k)
}

// WatchWithPrefix watches on spec prefix
func (c *Client) WatchWithPrefix(prefix string) (clientv3.WatchChan, error) {
wc, err := c.watchWithPrefix(prefix)
if !strings.HasSuffix(prefix, "/") {
prefix += "/"
}

wc, err := c.watchWithOption(prefix, clientv3.WithPrefix())
return wc, perrors.WithMessagef(err, "watch prefix (key %s)", prefix)
}

// Watch watches on spc key with OpOption
func (c *Client) WatchWithOption(k string, opts ...clientv3.OpOption) (clientv3.WatchChan, error) {
wc, err := c.watchWithOption(k, opts...)
return wc, perrors.WithMessagef(err, "watch (key %s)", k)
}
Loading

0 comments on commit 140d707

Please sign in to comment.