From 140d7076cb9d2b3cf1356ea5e86c3472d2ff0ba0 Mon Sep 17 00:00:00 2001 From: randy Date: Tue, 4 May 2021 12:55:04 +0800 Subject: [PATCH] Etcd Client Improve (#51) * bulk insertion * bulk insertion * do with revision * NewConfigClientWithErr return err * fix error * fix ci error * add ut * code block format and modify comment * code block format and modify comment * check suffix * Update client.go Co-authored-by: Xin.Zh --- database/kv/etcd/v3/client.go | 178 ++++++++++++++++++++++------- database/kv/etcd/v3/client_test.go | 101 ++++++++++++++-- 2 files changed, 225 insertions(+), 54 deletions(-) diff --git a/database/kv/etcd/v3/client.go b/database/kv/etcd/v3/client.go index 254b1e9..dd475fb 100644 --- a/database/kv/etcd/v3/client.go +++ b/database/kv/etcd/v3/client.go @@ -20,6 +20,7 @@ package gxetcd import ( "context" "log" + "strings" "sync" "time" ) @@ -36,10 +37,26 @@ var ( ErrNilETCDV3Client = perrors.New("etcd raw client is nil") // full describe the ERR // ErrKVPairNotFound not found key ErrKVPairNotFound = perrors.New("k/v pair not found") + // ErrKVListSizeIllegal k/v list empty or not equal size + ErrKVListSizeIllegal = perrors.New("k/v List is empty or kList's size is not equal to the size of vList") + // ErrCompareFail txn compare fail + ErrCompareFail = perrors.New("txn compare fail") + // ErrRevision revision when error + ErrRevision int64 = -1 ) // NewConfigClient create new Client func NewConfigClient(opts ...Option) *Client { + newClient, err := NewConfigClientWithErr(opts...) + + if err != nil { + log.Printf("new etcd client = error{%v}", err) + } + return newClient +} + +// NewConfigClientWithErr create new Client,error +func NewConfigClientWithErr(opts ...Option) (*Client, error) { options := &Options{ Heartbeat: 1, // default Heartbeat } @@ -52,7 +69,8 @@ func NewConfigClient(opts ...Option) *Client { log.Printf("new etcd client (Name{%s}, etcd addresses{%v}, Timeout{%d}) = error{%v}", options.Name, options.Endpoints, options.Timeout, err) } - return newClient + + return newClient, err } // Client represents etcd client Configuration @@ -211,40 +229,97 @@ func (c *Client) GetEndPoints() []string { return c.endpoints } -// if k not exist will put k/v in etcd, otherwise return nil -func (c *Client) put(k string, v string, opts ...clientv3.OpOption) error { +// if k not exist will put k/v in etcd, otherwise return ErrCompareFail +func (c *Client) create(k string, v string, opts ...clientv3.OpOption) error { rawClient := c.GetRawClient() - if rawClient == nil { return ErrNilETCDV3Client } - _, err := rawClient.Txn(c.ctx). - If(clientv3.Compare(clientv3.Version(k), "<", 1)). + resp, err := rawClient.Txn(c.ctx). + If(clientv3.Compare(clientv3.CreateRevision(k), "=", 0)). Then(clientv3.OpPut(k, v, opts...)). Commit() - return err + if err != nil { + return err + } + if !resp.Succeeded { + return ErrCompareFail + } + + return nil } -// if k not exist will put k/v in etcd -// if k is already exist in etcd, replace it -func (c *Client) update(k string, v string, opts ...clientv3.OpOption) error { +// if k in bulk insertion not exist all, then put all k/v in etcd, otherwise return error +func (c *Client) batchCreate(kList []string, vList []string, opts ...clientv3.OpOption) error { rawClient := c.GetRawClient() + if rawClient == nil { + return ErrNilETCDV3Client + } + + kLen := len(kList) + vLen := len(vList) + if kLen == 0 || vLen == 0 || kLen != vLen { + return ErrKVListSizeIllegal + } + + var cs []clientv3.Cmp + var ops []clientv3.Op + + for i, k := range kList { + v := vList[i] + cs = append(cs, clientv3.Compare(clientv3.CreateRevision(k), "=", 0)) + ops = append(ops, clientv3.OpPut(k, v, opts...)) + } + + resp, err := rawClient.Txn(c.ctx). + If(cs...). + Then(ops...). + Commit() + if err != nil { + return err + } + if !resp.Succeeded { + return ErrCompareFail + } + + return nil +} + +// put k/v in etcd, if fail return error +func (c *Client) put(k string, v string, opts ...clientv3.OpOption) error { + rawClient := c.GetRawClient() + if rawClient == nil { + return ErrNilETCDV3Client + } + + _, err := rawClient.Put(c.ctx, k, v, opts...) + return err +} +// put k/v in etcd when ModRevision equal with rev, if not return ErrCompareFail or other err +func (c *Client) updateWithRev(k string, v string, rev int64, opts ...clientv3.OpOption) error { + rawClient := c.GetRawClient() if rawClient == nil { return ErrNilETCDV3Client } - _, err := rawClient.Txn(c.ctx). - If(clientv3.Compare(clientv3.Version(k), "!=", -1)). + resp, err := rawClient.Txn(c.ctx). + If(clientv3.Compare(clientv3.ModRevision(k), "=", rev)). Then(clientv3.OpPut(k, v, opts...)). Commit() - return err + if err != nil { + return err + } + if !resp.Succeeded { + return ErrCompareFail + } + + return nil } func (c *Client) delete(k string) error { rawClient := c.GetRawClient() - if rawClient == nil { return ErrNilETCDV3Client } @@ -253,29 +328,28 @@ func (c *Client) delete(k string) error { return err } -func (c *Client) get(k string) (string, error) { +// getValAndRev get value and revision +func (c *Client) getValAndRev(k string) (string, int64, error) { rawClient := c.GetRawClient() - if rawClient == nil { - return "", ErrNilETCDV3Client + return "", ErrRevision, ErrNilETCDV3Client } resp, err := rawClient.Get(c.ctx, k) if err != nil { - return "", err + return "", ErrRevision, err } if len(resp.Kvs) == 0 { - return "", ErrKVPairNotFound + return "", ErrRevision, ErrKVPairNotFound } - return string(resp.Kvs[0].Value), nil + return string(resp.Kvs[0].Value), resp.Header.Revision, nil } // CleanKV delete all key and value func (c *Client) CleanKV() error { rawClient := c.GetRawClient() - if rawClient == nil { return ErrNilETCDV3Client } @@ -287,11 +361,14 @@ func (c *Client) CleanKV() error { // GetChildren return node children func (c *Client) GetChildren(k string) ([]string, []string, error) { rawClient := c.GetRawClient() - if rawClient == nil { return nil, nil, ErrNilETCDV3Client } + if !strings.HasSuffix(k, "/") { + k += "/" + } + resp, err := rawClient.Get(c.ctx, k, clientv3.WithPrefix()) if err != nil { return nil, nil, err @@ -310,29 +387,18 @@ func (c *Client) GetChildren(k string) ([]string, []string, error) { return kList, vList, nil } -func (c *Client) watchWithPrefix(prefix string) (clientv3.WatchChan, error) { +// watchWithOption watch +func (c *Client) watchWithOption(k string, opts ...clientv3.OpOption) (clientv3.WatchChan, error) { rawClient := c.GetRawClient() - if rawClient == nil { return nil, ErrNilETCDV3Client } - return rawClient.Watch(c.ctx, prefix, clientv3.WithPrefix()), nil -} - -func (c *Client) watch(k string) (clientv3.WatchChan, error) { - rawClient := c.GetRawClient() - - if rawClient == nil { - return nil, ErrNilETCDV3Client - } - - return rawClient.Watch(c.ctx, k), nil + return rawClient.Watch(c.ctx, k, opts...), nil } func (c *Client) keepAliveKV(k string, v string) error { rawClient := c.GetRawClient() - if rawClient == nil { return ErrNilETCDV3Client } @@ -376,13 +442,25 @@ func (c *Client) Valid() bool { // Create key value ... func (c *Client) Create(k string, v string) error { - err := c.put(k, v) + err := c.create(k, v) return perrors.WithMessagef(err, "put k/v (key: %s value %s)", k, v) } +// BatchCreate bulk insertion +func (c *Client) BatchCreate(kList []string, vList []string) error { + err := c.batchCreate(kList, vList) + return perrors.WithMessagef(err, "batch put k/v error ") +} + // Update key value ... func (c *Client) Update(k, v string) error { - err := c.update(k, v) + err := c.put(k, v) + return perrors.WithMessagef(err, "Update k/v (key: %s value %s)", k, v) +} + +// Update key value ... +func (c *Client) UpdateWithRev(k, v string, rev int64, opts ...clientv3.OpOption) error { + err := c.updateWithRev(k, v, rev, opts...) return perrors.WithMessagef(err, "Update k/v (key: %s value %s)", k, v) } @@ -404,20 +482,36 @@ func (c *Client) GetChildrenKVList(k string) ([]string, []string, error) { return kList, vList, perrors.WithMessagef(err, "get key children (key %s)", k) } +// GetValAndRev gets value and revision by @k +func (c *Client) GetValAndRev(k string) (string, int64, error) { + v, rev, err := c.getValAndRev(k) + return v, rev, perrors.WithMessagef(err, "get key value (key %s)", k) +} + // Get gets value by @k func (c *Client) Get(k string) (string, error) { - v, err := c.get(k) + v, _, err := c.getValAndRev(k) return v, perrors.WithMessagef(err, "get key value (key %s)", k) } // Watch watches on spec key func (c *Client) Watch(k string) (clientv3.WatchChan, error) { - wc, err := c.watch(k) - return wc, perrors.WithMessagef(err, "watch prefix (key %s)", k) + wc, err := c.watchWithOption(k) + return wc, perrors.WithMessagef(err, "watch (key %s)", k) } // WatchWithPrefix watches on spec prefix func (c *Client) WatchWithPrefix(prefix string) (clientv3.WatchChan, error) { - wc, err := c.watchWithPrefix(prefix) + if !strings.HasSuffix(prefix, "/") { + prefix += "/" + } + + wc, err := c.watchWithOption(prefix, clientv3.WithPrefix()) return wc, perrors.WithMessagef(err, "watch prefix (key %s)", prefix) } + +// Watch watches on spc key with OpOption +func (c *Client) WatchWithOption(k string, opts ...clientv3.OpOption) (clientv3.WatchChan, error) { + wc, err := c.watchWithOption(k, opts...) + return wc, perrors.WithMessagef(err, "watch (key %s)", k) +} diff --git a/database/kv/etcd/v3/client_test.go b/database/kv/etcd/v3/client_test.go index 9830ee6..6feefc3 100644 --- a/database/kv/etcd/v3/client_test.go +++ b/database/kv/etcd/v3/client_test.go @@ -48,15 +48,15 @@ var tests = []struct { {input: struct { k string v string - }{k: "name", v: "scott.wang"}}, + }{k: "name/name", v: "scott.wang"}}, {input: struct { k string v string - }{k: "namePrefix", v: "prefix.scott.wang"}}, + }{k: "name/namePrefix", v: "prefix.scott.wang"}}, {input: struct { k string v string - }{k: "namePrefix1", v: "prefix1.scott.wang"}}, + }{k: "name/namePrefix1", v: "prefix1.scott.wang"}}, {input: struct { k string v string @@ -64,7 +64,8 @@ var tests = []struct { } // test dataset prefix -const prefix = "name" +const prefixKey = "name/" +const keyPrefix = "name/name" type ClientTestSuite struct { suite.Suite @@ -118,10 +119,10 @@ func (suite *ClientTestSuite) TearDownSuite() { } func (suite *ClientTestSuite) setUpClient() *Client { - c, err := NewClient(suite.etcdConfig.name, - suite.etcdConfig.endpoints, - suite.etcdConfig.timeout, - suite.etcdConfig.heartbeat) + c, err := NewConfigClientWithErr(WithName(suite.etcdConfig.name), + WithEndpoints(suite.etcdConfig.endpoints...), + WithTimeout(suite.etcdConfig.timeout), + WithHeartbeat(suite.etcdConfig.heartbeat)) if err != nil { suite.T().Fatal(err) } @@ -204,6 +205,82 @@ func (suite *ClientTestSuite) TestClientCreateKV() { } } +func (suite *ClientTestSuite) TestBatchClientCreateKV() { + tests := tests + + c := suite.client + t := suite.T() + + defer suite.client.Close() + + for _, tc := range tests { + + k := tc.input.k + v := tc.input.v + expect := tc.input.v + kList := make([]string, 0, 1) + vList := make([]string, 0, 1) + kList = append(kList, k) + vList = append(vList, v) + + if err := c.BatchCreate(kList, vList); err != nil { + t.Fatal(err) + } + + value, err := c.Get(k) + if err != nil { + t.Fatal(err) + } + + if value != expect { + t.Fatalf("expect %v but get %v", expect, value) + } + } +} + +func (suite *ClientTestSuite) TestBatchClientGetValAndRevKV() { + tests := tests + + c := suite.client + t := suite.T() + + defer suite.client.Close() + + for _, tc := range tests { + + k := tc.input.k + v := tc.input.v + expect := tc.input.v + kList := make([]string, 0, 1) + vList := make([]string, 0, 1) + kList = append(kList, k) + vList = append(vList, v) + + if err := c.BatchCreate(kList, vList); err != nil { + t.Fatal(err) + } + + value, revision, err := c.getValAndRev(k) + if err != nil { + t.Fatal(err) + } + + err = c.UpdateWithRev(k, k, revision) + if err != nil { + t.Fatal(err) + } + + err = c.Update(k, k) + if err != nil { + t.Fatal(err) + } + + if value != expect { + t.Fatalf("expect %v but get %v", expect, value) + } + } +} + func (suite *ClientTestSuite) TestClientDeleteKV() { tests := tests c := suite.client @@ -250,7 +327,7 @@ func (suite *ClientTestSuite) TestClientGetChildrenKVList() { k := tc.input.k v := tc.input.v - if strings.Contains(k, prefix) { + if strings.Contains(k, prefixKey) { expectKList = append(expectKList, k) expectVList = append(expectVList, v) } @@ -260,7 +337,7 @@ func (suite *ClientTestSuite) TestClientGetChildrenKVList() { } } - kList, vList, err := c.GetChildrenKVList(prefix) + kList, vList, err := c.GetChildrenKVList(prefixKey) if err != nil { t.Fatal(err) } @@ -297,7 +374,7 @@ func (suite *ClientTestSuite) TestClientWatch() { c.Close() }() - wc, err := c.watch(prefix) + wc, err := c.WatchWithOption(keyPrefix) if err != nil { assert.Error(t, err) } @@ -338,7 +415,7 @@ func (suite *ClientTestSuite) TestClientRegisterTemp() { }() completePath := path.Join("scott", "wang") - wc, err := observeC.watch(completePath) + wc, err := observeC.watchWithOption(completePath) if err != nil { assert.Error(t, err) }