From 2cd42574dbac83290bd12459a25297d473823d96 Mon Sep 17 00:00:00 2001 From: vkropotko Date: Wed, 7 Apr 2021 18:02:55 +0200 Subject: [PATCH 1/4] feat(esClientDrain): enhance Drain ES Client function Add the ability to set parameters for the retry logic of the Resty lib Add handling for unfinished shard migration Signed-off-by: vkropotko --- main.go | 36 ++++++++----- operator/elasticsearch.go | 16 ++++-- operator/elasticsearch_test.go | 12 ++++- operator/es_client.go | 90 +++++++++++++++++++++++++------ operator/es_client_test.go | 98 ++++++++++++++++++++++++++++++++-- operator/operator.go | 7 +-- operator/operator_test.go | 12 ++--- 7 files changed, 227 insertions(+), 44 deletions(-) diff --git a/main.go b/main.go index b3cc23cf..b2555ff9 100644 --- a/main.go +++ b/main.go @@ -26,22 +26,28 @@ const ( defaultMetricsAddress = ":7979" defaultClientGoTimeout = 30 * time.Second defaultClusterDNSZone = "cluster.local." + defaultRetryCount = "999" + defaultRetryWaitTime = "10s" + defaultRetryMaxWaitTime = "30s" ) var ( config struct { - Interval time.Duration - AutoscalerInterval time.Duration - APIServer *url.URL - PodSelectors Labels - PriorityNodeSelectors Labels - MetricsAddress string - ClientGoTimeout time.Duration - Debug bool - OperatorID string - Namespace string - ClusterDNSZone string - ElasticsearchEndpoint *url.URL + Interval time.Duration + AutoscalerInterval time.Duration + APIServer *url.URL + PodSelectors Labels + PriorityNodeSelectors Labels + MetricsAddress string + ClientGoTimeout time.Duration + Debug bool + OperatorID string + Namespace string + ClusterDNSZone string + ElasticsearchEndpoint *url.URL + EsClientRetryCount int + EsClientRetryWaitTime time.Duration + EsClientRetryMaxWaitTime time.Duration } ) @@ -71,6 +77,9 @@ func main() { URLVar(&config.ElasticsearchEndpoint) kingpin.Flag("namespace", "Limit operator to a certain namespace"). Default(v1.NamespaceAll).StringVar(&config.Namespace) + kingpin.Flag("esclient-retry-count", "Count of retry operations conducted by EsClient. Used when the operator is waiting for an ES cluster condition, e.g. shards relocation.").Default(defaultRetryCount).IntVar(&config.EsClientRetryCount) + kingpin.Flag("esclient-retry-waittime", "Wait time between two operations of EsClient. Used when the operator is waiting for an ES cluster condition, e.g. shards relocation.").Default(defaultRetryWaitTime).DurationVar(&config.EsClientRetryWaitTime) + kingpin.Flag("esclient-retry-max-waittime", "Max wait time between two operations of EsClient. Used when the operator is waiting for an ES cluster condition, e.g.
shards relocation.").Default(defaultRetryMaxWaitTime).DurationVar(&config.EsClientRetryMaxWaitTime) kingpin.Parse() @@ -98,6 +107,9 @@ func main() { config.Namespace, config.ClusterDNSZone, config.ElasticsearchEndpoint, + config.EsClientRetryCount, + config.EsClientRetryWaitTime, + config.EsClientRetryMaxWaitTime, ) go handleSigterm(cancel) diff --git a/operator/elasticsearch.go b/operator/elasticsearch.go index e983acb6..5bab858e 100644 --- a/operator/elasticsearch.go +++ b/operator/elasticsearch.go @@ -47,7 +47,8 @@ type ElasticsearchOperator struct { elasticsearchEndpoint *url.URL operating map[types.UID]operatingEntry sync.Mutex - recorder kube_record.EventRecorder + recorder kube_record.EventRecorder + esClientRestyConfig *RestyConfig } type operatingEntry struct { @@ -66,6 +67,9 @@ func NewElasticsearchOperator( namespace, clusterDNSZone string, elasticsearchEndpoint *url.URL, + esClientRetryCount int, + esClientRetryWaitTime, + esClientRetryMaxWaitTime time.Duration, ) *ElasticsearchOperator { return &ElasticsearchOperator{ logger: log.WithFields( @@ -84,6 +88,11 @@ func NewElasticsearchOperator( elasticsearchEndpoint: elasticsearchEndpoint, operating: make(map[types.UID]operatingEntry), recorder: createEventRecorder(client), + esClientRestyConfig: &RestyConfig{ + ClientRetryCount: esClientRetryCount, + ClientRetryWaitTime: esClientRetryWaitTime, + ClientRetryMaxWaitTime: esClientRetryMaxWaitTime, + }, } } @@ -497,8 +506,8 @@ func (r *EDSResource) ensureService(ctx context.Context) error { } // Drain drains a pod for Elasticsearch data. -func (r *EDSResource) Drain(ctx context.Context, pod *v1.Pod) error { - return r.esClient.Drain(ctx, pod) +func (r *EDSResource) Drain(ctx context.Context, pod *v1.Pod, config *RestyConfig) error { + return r.esClient.Drain(ctx, pod, config) } // PreScaleDownHook ensures that the IndexReplicas is set as defined in the EDS @@ -641,6 +650,7 @@ func (o *ElasticsearchOperator) operateEDS(eds *zv1.ElasticsearchDataSet, delete interval: o.interval, logger: logger, recorder: o.recorder, + esClientRestyConfig: o.esClientRestyConfig, } rs := &EDSResource{ diff --git a/operator/elasticsearch_test.go b/operator/elasticsearch_test.go index 9e085db9..d2641691 100644 --- a/operator/elasticsearch_test.go +++ b/operator/elasticsearch_test.go @@ -15,6 +15,12 @@ import ( "k8s.io/apimachinery/pkg/types" ) +const ( + defaultRetryCount = 999 + defaultRetryWaitTime = 10 * time.Second + defaultRetryMaxWaitTime = 30 * time.Second +) + func TestHasOwnership(t *testing.T) { eds := &zv1.ElasticsearchDataSet{ ObjectMeta: metav1.ObjectMeta{ @@ -43,7 +49,8 @@ func TestGetElasticsearchEndpoint(t *testing.T) { faker := &clientset.Clientset{ Interface: fake.NewSimpleClientset(), } - esOperator := NewElasticsearchOperator(faker, nil, 1*time.Second, 1*time.Second, "", "", "cluster.local.", nil) + esOperator := NewElasticsearchOperator(faker, nil, 1*time.Second, 1*time.Second, "", "", "cluster.local.", nil, + defaultRetryCount, defaultRetryWaitTime, defaultRetryMaxWaitTime) eds := &zv1.ElasticsearchDataSet{ ObjectMeta: metav1.ObjectMeta{ @@ -59,7 +66,8 @@ func TestGetElasticsearchEndpoint(t *testing.T) { customEndpoint, err := url.Parse(customURL) assert.NoError(t, err) - esOperator = NewElasticsearchOperator(faker, nil, 1*time.Second, 1*time.Second, "", "", ".cluster.local.", customEndpoint) + esOperator = NewElasticsearchOperator(faker, nil, 1*time.Second, 1*time.Second, "", "", ".cluster.local.", customEndpoint, + defaultRetryCount, defaultRetryWaitTime, defaultRetryMaxWaitTime) 
url = esOperator.getElasticsearchEndpoint(eds) assert.Equal(t, customURL, url.String()) } diff --git a/operator/es_client.go b/operator/es_client.go index cb987ba1..9dfe254b 100644 --- a/operator/es_client.go +++ b/operator/es_client.go @@ -17,12 +17,12 @@ import ( v1 "k8s.io/api/core/v1" ) -// TODO make configurable as flags. -var ( - defaultRetryCount = 999 - defaultRetryWaitTime = 10 * time.Second - defaultRetryMaxWaitTime = 30 * time.Second -) +// Resty retry configuration +type RestyConfig struct { + ClientRetryCount int + ClientRetryWaitTime time.Duration + ClientRetryMaxWaitTime time.Duration +} // ESClient is a pod drainer which can drain data from Elasticsearch pods. type ESClient struct { @@ -91,7 +91,7 @@ func (c *ESClient) logger() *log.Entry { } // Drain drains data from an Elasticsearch pod. -func (c *ESClient) Drain(ctx context.Context, pod *v1.Pod) error { +func (c *ESClient) Drain(ctx context.Context, pod *v1.Pod, config *RestyConfig) error { c.logger().Info("Ensuring cluster is in green state") @@ -111,7 +111,7 @@ func (c *ESClient) Drain(ctx context.Context, pod *v1.Pod) error { } c.logger().Info("Waiting for draining to finish") - return c.waitForEmptyEsNode(ctx, pod) + return c.waitForEmptyEsNode(ctx, pod, config) } func (c *ESClient) Cleanup(ctx context.Context) error { @@ -220,6 +220,7 @@ func (c *ESClient) excludePodIP(pod *v1.Pod) error { if excludeString != "" { ips = strings.Split(excludeString, ",") } + var foundPodIP bool for _, ip := range ips { if ip == podIP { @@ -256,6 +257,42 @@ func (c *ESClient) setExcludeIPs(ips string) error { return nil } +// remove the podIP from Elasticsearch exclude._ip list +func (c *ESClient) removeFromExcludeIPList(pod *v1.Pod) error { + + c.mux.Lock() + defer c.mux.Unlock() + + podIP := pod.Status.PodIP + + esSettings, err := c.getClusterSettings() + if err != nil { + return err + } + + excludedIPsString := esSettings.Transient.Cluster.Routing.Allocation.Exclude.IP + excludedIPs := strings.Split(excludedIPsString, ",") + var newExcludedIPs []string + for _, excludeIP := range excludedIPs { + if excludeIP != podIP { + newExcludedIPs = append(newExcludedIPs, excludeIP) + sort.Strings(newExcludedIPs) + } + } + + newExcludedIPsString := strings.Join(newExcludedIPs, ",") + if newExcludedIPsString != excludedIPsString { + c.logger().Infof("Setting exclude list to '%s'", newExcludedIPsString) + + err = c.setExcludeIPs(newExcludedIPsString) + if err != nil { + return err + } + } + + return nil +} + func (c *ESClient) updateAutoRebalance(value string) error { resp, err := resty.New().R(). SetHeader("Content-Type", "application/json"). @@ -276,13 +313,13 @@ func (c *ESClient) updateAutoRebalance(value string) error { } // repeatedly query shard allocations to ensure success of drain operation. -func (c *ESClient) waitForEmptyEsNode(ctx context.Context, pod *v1.Pod) error { +func (c *ESClient) waitForEmptyEsNode(ctx context.Context, pod *v1.Pod, config *RestyConfig) error { // TODO: implement context handling podIP := pod.Status.PodIP - _, err := resty.New(). - SetRetryCount(defaultRetryCount). - SetRetryWaitTime(defaultRetryWaitTime). - SetRetryMaxWaitTime(defaultRetryMaxWaitTime). + resp, err := resty.New(). + SetRetryCount(config.ClientRetryCount). + SetRetryWaitTime(config.ClientRetryWaitTime). + SetRetryMaxWaitTime(config.ClientRetryMaxWaitTime). AddRetryCondition( // It is expected to return (bool, error) pair. Resty will retry // in case condition returns true or non nil error.
@@ -292,7 +329,6 @@ func (c *ESClient) waitForEmptyEsNode(ctx context.Context, pod *v1.Pod) error { if err != nil { return true, err } - // shardIP := make(map[string]bool) remainingShards := 0 for _, shard := range shards { if shard.IP == podIP { @@ -312,9 +348,33 @@ func (c *ESClient) waitForEmptyEsNode(ctx context.Context, pod *v1.Pod) error { }, ).R(). Get(c.Endpoint.String() + "/_cat/shards?h=index,ip&format=json") + if err != nil { return err } + + // make sure the IP is still excluded, this could have been updated in the meantime. + if err = c.excludePodIP(pod); err != nil { + return err + } + + var shards []ESShard + err = json.Unmarshal(resp.Body(), &shards) + if err != nil { + return err + } + + for _, shard := range shards { + if shard.IP == podIP { + err = fmt.Errorf("cannot migrate shards from pod '%s' with IP '%s' within the configured retries", pod.ObjectMeta.Name, pod.Status.PodIP) + // if we cannot drain the node, return it back to the pool of active nodes + if errExclude := c.removeFromExcludeIPList(pod); errExclude != nil { + return fmt.Errorf("while handling error '%v', another error occurred: '%v'", err, errExclude) + } + return err + } + } + return nil } @@ -452,7 +512,7 @@ func (c *ESClient) CreateIndex(indexName, groupName string, shards, replicas int SetHeader("Content-Type", "application/json"). SetBody([]byte( fmt.Sprintf( - `{"settings": {"index" : {"number_of_replicas" : "%d", "number_of_shards": "%d", + `{"settings": {"index" : {"number_of_replicas" : "%d", "number_of_shards": "%d", "routing.allocation.include.group": "%s"}}}`, replicas, shards, diff --git a/operator/es_client_test.go b/operator/es_client_test.go index be73a37b..358d7aa5 100644 --- a/operator/es_client_test.go +++ b/operator/es_client_test.go @@ -2,8 +2,11 @@ package operator import ( "context" + "encoding/json" + "net/http" "net/url" "testing" + "time" "github.com/jarcoal/httpmock" "github.com/stretchr/testify/assert" @@ -28,18 +31,25 @@ func TestDrain(t *testing.T) { systemUnderTest := &ESClient{ Endpoint: url, } + err := systemUnderTest.Drain(context.TODO(), &v1.Pod{ Status: v1.PodStatus{ PodIP: "1.2.3.4", }, - }) + }, + &RestyConfig{ + ClientRetryCount: 999, + ClientRetryWaitTime: 10 * time.Second, + ClientRetryMaxWaitTime: 30 * time.Second, + }, + ) assert.NoError(t, err) info := httpmock.GetCallCountInfo() require.EqualValues(t, 1, info["GET http://elasticsearch:9200/_cluster/health"]) - require.EqualValues(t, 2, info["PUT http://elasticsearch:9200/_cluster/settings"]) - require.EqualValues(t, 1, info["GET http://elasticsearch:9200/_cluster/settings"]) + require.EqualValues(t, 3, info["PUT http://elasticsearch:9200/_cluster/settings"]) + require.EqualValues(t, 2, info["GET http://elasticsearch:9200/_cluster/settings"]) require.EqualValues(t, 1, info["GET http://elasticsearch:9200/_cat/shards"]) } @@ -239,3 +249,85 @@ func TestEnsureGreenClusterState(t *testing.T) { assert.Error(t, err) } + +func TestExcludeIP(t *testing.T) { + httpmock.Activate() + defer httpmock.DeactivateAndReset() + + httpmock.RegisterResponder("GET", "http://elasticsearch:9200/_cluster/settings", + httpmock.NewStringResponder(200, `{"persistent":{},"transient":{"cluster":{"routing":{"rebalance":{"enable":"all"},"allocation":{"exclude":{"_ip":"192.168.1.1,192.168.1.3"}}}}}}`)) + + httpmock.RegisterResponder("PUT", "http://elasticsearch:9200/_cluster/settings", + func(req *http.Request) (*http.Response, error) { + + update := make(map[string]map[string]string) + if err := json.NewDecoder(req.Body).Decode(&update);
err != nil { + return httpmock.NewStringResponse(400, ""), nil + } + + if update["transient"]["cluster.routing.allocation.exclude._ip"] != "192.168.1.1,192.168.1.2,192.168.1.3" { + return httpmock.NewStringResponse(400, ""), nil + } + + resp, err := httpmock.NewJsonResponse(200, "") + if err != nil { + return httpmock.NewStringResponse(500, ""), nil + } + return resp, nil + }, + ) + + url, _ := url.Parse("http://elasticsearch:9200") + systemUnderTest := &ESClient{ + Endpoint: url, + } + + err := systemUnderTest.excludePodIP(&v1.Pod{ + Status: v1.PodStatus{ + PodIP: "192.168.1.2", + }, + }) + + assert.NoError(t, err) +} + +func TestRemoveFromExcludeIPList(t *testing.T) { + httpmock.Activate() + defer httpmock.DeactivateAndReset() + + httpmock.RegisterResponder("GET", "http://elasticsearch:9200/_cluster/settings", + httpmock.NewStringResponder(200, `{"persistent":{},"transient":{"cluster":{"routing":{"rebalance":{"enable":"all"},"allocation":{"exclude":{"_ip":"192.168.1.1,192.168.1.2,192.168.1.3"}}}}}}`)) + + httpmock.RegisterResponder("PUT", "http://elasticsearch:9200/_cluster/settings", + func(req *http.Request) (*http.Response, error) { + + update := make(map[string]map[string]string) + if err := json.NewDecoder(req.Body).Decode(&update); err != nil { + return httpmock.NewStringResponse(400, ""), nil + } + + if update["transient"]["cluster.routing.allocation.exclude._ip"] != "192.168.1.1,192.168.1.3" { + return httpmock.NewStringResponse(400, ""), nil + } + + resp, err := httpmock.NewJsonResponse(200, "") + if err != nil { + return httpmock.NewStringResponse(500, ""), nil + } + return resp, nil + }, + ) + + url, _ := url.Parse("http://elasticsearch:9200") + systemUnderTest := &ESClient{ + Endpoint: url, + } + + err := systemUnderTest.removeFromExcludeIPList(&v1.Pod{ + Status: v1.PodStatus{ + PodIP: "192.168.1.2", + }, + }) + + assert.NoError(t, err) +} diff --git a/operator/operator.go b/operator/operator.go index f65d4abb..254d6765 100644 --- a/operator/operator.go +++ b/operator/operator.go @@ -86,7 +86,7 @@ type StatefulResource interface { // Drain drains a pod for data. It's expected that the method only // returns after the pod has been drained. - Drain(ctx context.Context, pod *v1.Pod) error + Drain(ctx context.Context, pod *v1.Pod, config *RestyConfig) error } // Operator is a generic operator that can manage Pods filtered by a selector. 
@@ -96,6 +96,7 @@ type Operator struct { interval time.Duration logger *log.Entry recorder kube_record.EventRecorder + esClientRestyConfig *RestyConfig } func (o *Operator) Run(ctx context.Context, done chan<- struct{}, sr StatefulResource) { @@ -353,7 +354,7 @@ func (o *Operator) operatePods(ctx context.Context, sts *appsv1.StatefulSet, sr // drain Pod o.recorder.Event(sr.Self(), v1.EventTypeNormal, "DrainingPod", fmt.Sprintf("Draining Pod '%s/%s'", pod.Namespace, pod.Name)) - err = sr.Drain(ctx, pod) + err = sr.Drain(ctx, pod, o.esClientRestyConfig) if err != nil { return fmt.Errorf("failed to drain Pod %s/%s: %v", pod.Namespace, pod.Name, err) } @@ -452,7 +453,7 @@ func (o *Operator) rescaleStatefulSet(ctx context.Context, sts *appsv1.StatefulS } log.Infof("Draining Pod %s/%s for scaledown", pod.Namespace, pod.Name) - err := sr.Drain(ctx, &pod) + err := sr.Drain(ctx, &pod, o.esClientRestyConfig) if err != nil { return fmt.Errorf("failed to drain pod %s/%s: %v", pod.Namespace, pod.Name, err) } diff --git a/operator/operator_test.go b/operator/operator_test.go index 26e44ccc..fbf50ca9 100644 --- a/operator/operator_test.go +++ b/operator/operator_test.go @@ -43,12 +43,12 @@ func (r *mockResource) PodTemplateSpec() *v1.PodTemplateSpec { return r.podTempl func (r *mockResource) VolumeClaimTemplates() []v1.PersistentVolumeClaim { return r.volumeClaimTemplates } -func (r *mockResource) Self() runtime.Object { return r.eds } -func (r *mockResource) EnsureResources(ctx context.Context) error { return nil } -func (r *mockResource) UpdateStatus(ctx context.Context, sts *appsv1.StatefulSet) error { return nil } -func (r *mockResource) PreScaleDownHook(ctx context.Context) error { return nil } -func (r *mockResource) OnStableReplicasHook(ctx context.Context) error { return nil } -func (r *mockResource) Drain(ctx context.Context, pod *v1.Pod) error { return nil } +func (r *mockResource) Self() runtime.Object { return r.eds } +func (r *mockResource) EnsureResources(ctx context.Context) error { return nil } +func (r *mockResource) UpdateStatus(ctx context.Context, sts *appsv1.StatefulSet) error { return nil } +func (r *mockResource) PreScaleDownHook(ctx context.Context) error { return nil } +func (r *mockResource) OnStableReplicasHook(ctx context.Context) error { return nil } +func (r *mockResource) Drain(ctx context.Context, pod *v1.Pod, config *RestyConfig) error { return nil } func TestPrioritizePodsForUpdate(t *testing.T) { updatingPod := v1.Pod{ From ece1c4fbc8728f06337af543178a42c24baec062 Mon Sep 17 00:00:00 2001 From: vkropotko Date: Fri, 9 Apr 2021 11:03:21 +0200 Subject: [PATCH 2/4] feat(drain): improve http error handling Signed-off-by: vkropotko --- operator/es_client.go | 8 ++++++++ operator/es_client_test.go | 25 +++++++++++++++++++++++++ 2 files changed, 33 insertions(+) diff --git a/operator/es_client.go b/operator/es_client.go index 9dfe254b..9184b50e 100644 --- a/operator/es_client.go +++ b/operator/es_client.go @@ -324,6 +324,10 @@ func (c *ESClient) waitForEmptyEsNode(ctx context.Context, pod *v1.Pod, config * // It is expected to return (bool, error) pair. Resty will retry // in case condition returns true or non nil error. 
func(r *resty.Response) (bool, error) { + if !r.IsSuccess() { + return true, nil + } + var shards []ESShard err := json.Unmarshal(r.Body(), &shards) if err != nil { @@ -353,6 +357,10 @@ func (c *ESClient) waitForEmptyEsNode(ctx context.Context, pod *v1.Pod, config * return err } + if !resp.IsSuccess() { + return fmt.Errorf("HTTP endpoint responded with unexpected status code %d", resp.StatusCode()) + } + // make sure the IP is still excluded, this could have been updated in the meantime. if err = c.excludePodIP(pod); err != nil { return err diff --git a/operator/es_client_test.go b/operator/es_client_test.go index 358d7aa5..8ebf5716 100644 --- a/operator/es_client_test.go +++ b/operator/es_client_test.go @@ -51,6 +51,31 @@ func TestDrain(t *testing.T) { require.EqualValues(t, 3, info["PUT http://elasticsearch:9200/_cluster/settings"]) require.EqualValues(t, 2, info["GET http://elasticsearch:9200/_cluster/settings"]) require.EqualValues(t, 1, info["GET http://elasticsearch:9200/_cat/shards"]) + + // Test that if ES endpoint stops responding as expected Drain will return an error + httpmock.Reset() + httpmock.RegisterResponder("GET", "http://elasticsearch:9200/_cluster/settings", + httpmock.NewStringResponder(200, `{"transient":{"cluster":{"routing":{"rebalance":{"enable":"all"}}}}}`)) + httpmock.RegisterResponder("PUT", "http://elasticsearch:9200/_cluster/settings", + httpmock.NewStringResponder(200, `{}`)) + httpmock.RegisterResponder("GET", "http://elasticsearch:9200/_cluster/health", + httpmock.NewStringResponder(200, `{"status":"green"}`)) + httpmock.RegisterResponder("GET", "http://elasticsearch:9200/_cat/shards", + httpmock.NewStringResponder(404, ``)) + + err = systemUnderTest.Drain(context.TODO(), &v1.Pod{ + Status: v1.PodStatus{ + PodIP: "1.2.3.4", + }, + }, + &RestyConfig{ + ClientRetryCount: 1, + ClientRetryWaitTime: 1 * time.Second, + ClientRetryMaxWaitTime: 1 * time.Second, + }, + ) + + assert.NotNil(t, err) } func TestCleanup(t *testing.T) { From 559e2872d25091fa2911193b52e01fc069393c2f Mon Sep 17 00:00:00 2001 From: vkropotko Date: Wed, 14 Apr 2021 20:21:40 +0200 Subject: [PATCH 3/4] fix: update naming and add mutex lock to cleanup func Signed-off-by: vkropotko --- operator/elasticsearch.go | 8 ++++---- operator/es_client.go | 26 ++++++++++++++------------ operator/es_client_test.go | 8 ++++---- operator/operator.go | 8 ++++---- operator/operator_test.go | 2 +- 5 files changed, 27 insertions(+), 25 deletions(-) diff --git a/operator/elasticsearch.go b/operator/elasticsearch.go index 5bab858e..f2a41740 100644 --- a/operator/elasticsearch.go +++ b/operator/elasticsearch.go @@ -48,7 +48,7 @@ type ElasticsearchOperator struct { operating map[types.UID]operatingEntry sync.Mutex recorder kube_record.EventRecorder - esClientRestyConfig *RestyConfig + esClientRetryConfig *RetryConfig } type operatingEntry struct { @@ -88,7 +88,7 @@ func NewElasticsearchOperator( elasticsearchEndpoint: elasticsearchEndpoint, operating: make(map[types.UID]operatingEntry), recorder: createEventRecorder(client), - esClientRestyConfig: &RestyConfig{ + esClientRetryConfig: &RetryConfig{ ClientRetryCount: esClientRetryCount, ClientRetryWaitTime: esClientRetryWaitTime, ClientRetryMaxWaitTime: esClientRetryMaxWaitTime, }, @@ -506,7 +506,7 @@ func (r *EDSResource) ensureService(ctx context.Context) error { } // Drain drains a pod for Elasticsearch data.
-func (r *EDSResource) Drain(ctx context.Context, pod *v1.Pod, config *RestyConfig) error { +func (r *EDSResource) Drain(ctx context.Context, pod *v1.Pod, config *RetryConfig) error { return r.esClient.Drain(ctx, pod, config) } @@ -650,7 +650,7 @@ func (o *ElasticsearchOperator) operateEDS(eds *zv1.ElasticsearchDataSet, delete interval: o.interval, logger: logger, recorder: o.recorder, - esClientRestyConfig: o.esClientRestyConfig, + esClientRetryConfig: o.esClientRetryConfig, } rs := &EDSResource{ diff --git a/operator/es_client.go b/operator/es_client.go index 9184b50e..e0f70fd3 100644 --- a/operator/es_client.go +++ b/operator/es_client.go @@ -18,7 +18,7 @@ import ( ) // Resty retry configuration -type RestyConfig struct { +type RetryConfig struct { ClientRetryCount int ClientRetryWaitTime time.Duration ClientRetryMaxWaitTime time.Duration } @@ -91,7 +91,7 @@ func (c *ESClient) logger() *log.Entry { } // Drain drains data from an Elasticsearch pod. -func (c *ESClient) Drain(ctx context.Context, pod *v1.Pod, config *RestyConfig) error { +func (c *ESClient) Drain(ctx context.Context, pod *v1.Pod, config *RetryConfig) error { c.logger().Info("Ensuring cluster is in green state") @@ -116,6 +116,10 @@ func (c *ESClient) Drain(ctx context.Context, pod *v1.Pod, config *RestyConfig) func (c *ESClient) Cleanup(ctx context.Context) error { + // prevent ESClient from executing other operations on the exclude._ip list in ES + c.mux.Lock() + defer c.mux.Unlock() + // 1. fetch IPs from _cat/nodes nodes, err := c.GetNodes() if err != nil { @@ -203,13 +207,14 @@ func (c *ESClient) getClusterSettings() (*ESSettings, error) { // adds the podIP to Elasticsearch exclude._ip list func (c *ESClient) excludePodIP(pod *v1.Pod) error { + // prevent ESClient from executing other operations on the exclude._ip list in ES c.mux.Lock() + defer c.mux.Unlock() podIP := pod.Status.PodIP esSettings, err := c.getClusterSettings() if err != nil { - c.mux.Unlock() return err } @@ -234,7 +239,6 @@ func (c *ESClient) excludePodIP(pod *v1.Pod) error { err = c.setExcludeIPs(strings.Join(ips, ",")) } - c.mux.Unlock() return err } @@ -258,8 +262,9 @@ func (c *ESClient) setExcludeIPs(ips string) error { } // remove the podIP from Elasticsearch exclude._ip list -func (c *ESClient) removeFromExcludeIPList(pod *v1.Pod) error { +func (c *ESClient) undoExcludePodIP(pod *v1.Pod) error { + // prevent ESClient from executing other operations on the exclude._ip list in ES c.mux.Lock() defer c.mux.Unlock() @@ -272,6 +277,8 @@ func (c *ESClient) removeFromExcludeIPList(pod *v1.Pod) error { excludedIPsString := esSettings.Transient.Cluster.Routing.Allocation.Exclude.IP excludedIPs := strings.Split(excludedIPsString, ",") + + // build a new exclude list without the provided Pod IP address var newExcludedIPs []string for _, excludeIP := range excludedIPs { if excludeIP != podIP { @@ -313,7 +320,7 @@ func (c *ESClient) updateAutoRebalance(value string) error { } // repeatedly query shard allocations to ensure success of drain operation. -func (c *ESClient) waitForEmptyEsNode(ctx context.Context, pod *v1.Pod, config *RestyConfig) error { +func (c *ESClient) waitForEmptyEsNode(ctx context.Context, pod *v1.Pod, config *RetryConfig) error { // TODO: implement context handling podIP := pod.Status.PodIP resp, err := resty.New().
@@ -361,11 +368,6 @@ func (c *ESClient) waitForEmptyEsNode(ctx context.Context, pod *v1.Pod, config * return fmt.Errorf("HTTP endpoint responded with unexpected status code %d", resp.StatusCode()) } - // make sure the IP is still excluded, this could have been updated in the meantime. - if err = c.excludePodIP(pod); err != nil { - return err - } - var shards []ESShard err = json.Unmarshal(resp.Body(), &shards) if err != nil { @@ -376,7 +378,7 @@ func (c *ESClient) waitForEmptyEsNode(ctx context.Context, pod *v1.Pod, config * if shard.IP == podIP { err = fmt.Errorf("cannot migrate shards from pod '%s' with IP '%s' within the configured retries", pod.ObjectMeta.Name, pod.Status.PodIP) // if we cannot drain the node, return it back to the pool of active nodes - if errExclude := c.removeFromExcludeIPList(pod); errExclude != nil { + if errExclude := c.undoExcludePodIP(pod); errExclude != nil { return fmt.Errorf("while handling error '%v', another error occurred: '%v'", err, errExclude) } return err diff --git a/operator/es_client_test.go b/operator/es_client_test.go index 8ebf5716..0294417b 100644 --- a/operator/es_client_test.go +++ b/operator/es_client_test.go @@ -37,7 +37,7 @@ func TestDrain(t *testing.T) { PodIP: "1.2.3.4", }, }, - &RestyConfig{ + &RetryConfig{ ClientRetryCount: 999, ClientRetryWaitTime: 10 * time.Second, ClientRetryMaxWaitTime: 30 * time.Second, }, @@ -68,7 +68,7 @@ func TestDrain(t *testing.T) { PodIP: "1.2.3.4", }, }, - &RestyConfig{ + &RetryConfig{ ClientRetryCount: 1, ClientRetryWaitTime: 1 * time.Second, ClientRetryMaxWaitTime: 1 * time.Second, }, @@ -316,7 +316,7 @@ func TestExcludeIP(t *testing.T) { assert.NoError(t, err) } -func TestRemoveFromExcludeIPList(t *testing.T) { +func TestUndoExcludePodIP(t *testing.T) { httpmock.Activate() defer httpmock.DeactivateAndReset() @@ -348,7 +348,7 @@ func TestRemoveFromExcludeIPList(t *testing.T) { Endpoint: url, } - err := systemUnderTest.removeFromExcludeIPList(&v1.Pod{ + err := systemUnderTest.undoExcludePodIP(&v1.Pod{ Status: v1.PodStatus{ PodIP: "192.168.1.2", }, diff --git a/operator/operator.go b/operator/operator.go index 254d6765..7afecadd 100644 --- a/operator/operator.go +++ b/operator/operator.go @@ -86,7 +86,7 @@ type StatefulResource interface { // Drain drains a pod for data. It's expected that the method only // returns after the pod has been drained. - Drain(ctx context.Context, pod *v1.Pod, config *RestyConfig) error + Drain(ctx context.Context, pod *v1.Pod, config *RetryConfig) error } // Operator is a generic operator that can manage Pods filtered by a selector.
@@ -96,7 +96,7 @@ type Operator struct { interval time.Duration logger *log.Entry recorder kube_record.EventRecorder - esClientRestyConfig *RestyConfig + esClientRetryConfig *RetryConfig } func (o *Operator) Run(ctx context.Context, done chan<- struct{}, sr StatefulResource) { @@ -354,7 +354,7 @@ func (o *Operator) operatePods(ctx context.Context, sts *appsv1.StatefulSet, sr // drain Pod o.recorder.Event(sr.Self(), v1.EventTypeNormal, "DrainingPod", fmt.Sprintf("Draining Pod '%s/%s'", pod.Namespace, pod.Name)) - err = sr.Drain(ctx, pod, o.esClientRestyConfig) + err = sr.Drain(ctx, pod, o.esClientRetryConfig) if err != nil { return fmt.Errorf("failed to drain Pod %s/%s: %v", pod.Namespace, pod.Name, err) } @@ -453,7 +453,7 @@ func (o *Operator) rescaleStatefulSet(ctx context.Context, sts *appsv1.StatefulS } log.Infof("Draining Pod %s/%s for scaledown", pod.Namespace, pod.Name) - err := sr.Drain(ctx, &pod, o.esClientRestyConfig) + err := sr.Drain(ctx, &pod, o.esClientRetryConfig) if err != nil { return fmt.Errorf("failed to drain pod %s/%s: %v", pod.Namespace, pod.Name, err) } diff --git a/operator/operator_test.go b/operator/operator_test.go index fbf50ca9..a721b791 100644 --- a/operator/operator_test.go +++ b/operator/operator_test.go @@ -48,7 +48,7 @@ func (r *mockResource) EnsureResources(ctx context.Context) error func (r *mockResource) UpdateStatus(ctx context.Context, sts *appsv1.StatefulSet) error { return nil } func (r *mockResource) PreScaleDownHook(ctx context.Context) error { return nil } func (r *mockResource) OnStableReplicasHook(ctx context.Context) error { return nil } -func (r *mockResource) Drain(ctx context.Context, pod *v1.Pod, config *RestyConfig) error { return nil } +func (r *mockResource) Drain(ctx context.Context, pod *v1.Pod, config *RetryConfig) error { return nil } func TestPrioritizePodsForUpdate(t *testing.T) { updatingPod := v1.Pod{ From d33bd55e096f4d95c9c627dcc12c9111d596b029 Mon Sep 17 00:00:00 2001 From: vkropotko Date: Wed, 14 Apr 2021 20:31:11 +0200 Subject: [PATCH 4/4] fix: tests Signed-off-by: vkropotko --- operator/es_client_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/operator/es_client_test.go b/operator/es_client_test.go index 0294417b..8ceead0c 100644 --- a/operator/es_client_test.go +++ b/operator/es_client_test.go @@ -48,8 +48,8 @@ func TestDrain(t *testing.T) { info := httpmock.GetCallCountInfo() require.EqualValues(t, 1, info["GET http://elasticsearch:9200/_cluster/health"]) - require.EqualValues(t, 3, info["PUT http://elasticsearch:9200/_cluster/settings"]) - require.EqualValues(t, 2, info["GET http://elasticsearch:9200/_cluster/settings"]) + require.EqualValues(t, 2, info["PUT http://elasticsearch:9200/_cluster/settings"]) + require.EqualValues(t, 1, info["GET http://elasticsearch:9200/_cluster/settings"]) require.EqualValues(t, 1, info["GET http://elasticsearch:9200/_cat/shards"]) // Test that if ES endpoint stops responding as expected Drain will return an error