diff --git a/main.go b/main.go index b3cc23cf..b2555ff9 100644 --- a/main.go +++ b/main.go @@ -26,22 +26,28 @@ const ( defaultMetricsAddress = ":7979" defaultClientGoTimeout = 30 * time.Second defaultClusterDNSZone = "cluster.local." + defaultRetryCount = "999" + defaultRetryWaitTime = "10s" + defaultRetryMaxWaitTime = "30s" ) var ( config struct { - Interval time.Duration - AutoscalerInterval time.Duration - APIServer *url.URL - PodSelectors Labels - PriorityNodeSelectors Labels - MetricsAddress string - ClientGoTimeout time.Duration - Debug bool - OperatorID string - Namespace string - ClusterDNSZone string - ElasticsearchEndpoint *url.URL + Interval time.Duration + AutoscalerInterval time.Duration + APIServer *url.URL + PodSelectors Labels + PriorityNodeSelectors Labels + MetricsAddress string + ClientGoTimeout time.Duration + Debug bool + OperatorID string + Namespace string + ClusterDNSZone string + ElasticsearchEndpoint *url.URL + EsClientRetryCount int + EsClientRetryWaitTime time.Duration + EsClientRetryMaxWaitTime time.Duration } ) @@ -71,6 +77,9 @@ func main() { URLVar(&config.ElasticsearchEndpoint) kingpin.Flag("namespace", "Limit operator to a certain namespace"). Default(v1.NamespaceAll).StringVar(&config.Namespace) + kingpin.Flag("esclient-retry-count", "Number of retries performed by the ES client. Used when the operator is waiting for an ES cluster condition, e.g. shard relocation.").Default(defaultRetryCount).IntVar(&config.EsClientRetryCount) + kingpin.Flag("esclient-retry-waittime", "Wait time between two retries of the ES client. Used when the operator is waiting for an ES cluster condition, e.g. shard relocation.").Default(defaultRetryWaitTime).DurationVar(&config.EsClientRetryWaitTime) + kingpin.Flag("esclient-retry-max-waittime", "Max wait time between two retries of the ES client. Used when the operator is waiting for an ES cluster condition, e.g. shard relocation.").Default(defaultRetryMaxWaitTime).DurationVar(&config.EsClientRetryMaxWaitTime) kingpin.Parse() @@ -98,6 +107,9 @@ func main() { config.Namespace, config.ClusterDNSZone, config.ElasticsearchEndpoint, + config.EsClientRetryCount, + config.EsClientRetryWaitTime, + config.EsClientRetryMaxWaitTime, ) go handleSigterm(cancel) diff --git a/operator/elasticsearch.go b/operator/elasticsearch.go index 9f007619..6859139b 100644 --- a/operator/elasticsearch.go +++ b/operator/elasticsearch.go @@ -47,7 +47,8 @@ type ElasticsearchOperator struct { elasticsearchEndpoint *url.URL operating map[types.UID]operatingEntry sync.Mutex - recorder kube_record.EventRecorder + recorder kube_record.EventRecorder + esClientRetryConfig *RetryConfig } type operatingEntry struct { @@ -66,6 +67,9 @@ func NewElasticsearchOperator( namespace, clusterDNSZone string, elasticsearchEndpoint *url.URL, + esClientRetryCount int, + esClientRetryWaitTime, + esClientRetryMaxWaitTime time.Duration, ) *ElasticsearchOperator { return &ElasticsearchOperator{ logger: log.WithFields( @@ -84,6 +88,11 @@ func NewElasticsearchOperator( elasticsearchEndpoint: elasticsearchEndpoint, operating: make(map[types.UID]operatingEntry), recorder: createEventRecorder(client), + esClientRetryConfig: &RetryConfig{ + ClientRetryCount: esClientRetryCount, + ClientRetryWaitTime: esClientRetryWaitTime, + ClientRetryMaxWaitTime: esClientRetryMaxWaitTime, + }, } } @@ -509,8 +518,8 @@ func (r *EDSResource) ensureService(ctx context.Context) error { } // Drain drains a pod for Elasticsearch data. 
-func (r *EDSResource) Drain(ctx context.Context, pod *v1.Pod) error { - return r.esClient.Drain(ctx, pod) +func (r *EDSResource) Drain(ctx context.Context, pod *v1.Pod, config *RetryConfig) error { + return r.esClient.Drain(ctx, pod, config) } // PreScaleDownHook ensures that the IndexReplicas is set as defined in the EDS @@ -653,6 +662,7 @@ func (o *ElasticsearchOperator) operateEDS(eds *zv1.ElasticsearchDataSet, delete interval: o.interval, logger: logger, recorder: o.recorder, + esClientRetryConfig: o.esClientRetryConfig, } rs := &EDSResource{ diff --git a/operator/elasticsearch_test.go b/operator/elasticsearch_test.go index 9e085db9..d2641691 100644 --- a/operator/elasticsearch_test.go +++ b/operator/elasticsearch_test.go @@ -15,6 +15,12 @@ import ( "k8s.io/apimachinery/pkg/types" ) +const ( + defaultRetryCount = 999 + defaultRetryWaitTime = 10 * time.Second + defaultRetryMaxWaitTime = 30 * time.Second +) + func TestHasOwnership(t *testing.T) { eds := &zv1.ElasticsearchDataSet{ ObjectMeta: metav1.ObjectMeta{ @@ -43,7 +49,8 @@ func TestGetElasticsearchEndpoint(t *testing.T) { faker := &clientset.Clientset{ Interface: fake.NewSimpleClientset(), } - esOperator := NewElasticsearchOperator(faker, nil, 1*time.Second, 1*time.Second, "", "", "cluster.local.", nil) + esOperator := NewElasticsearchOperator(faker, nil, 1*time.Second, 1*time.Second, "", "", "cluster.local.", nil, + defaultRetryCount, defaultRetryWaitTime, defaultRetryMaxWaitTime) eds := &zv1.ElasticsearchDataSet{ ObjectMeta: metav1.ObjectMeta{ @@ -59,7 +66,8 @@ func TestGetElasticsearchEndpoint(t *testing.T) { customEndpoint, err := url.Parse(customURL) assert.NoError(t, err) - esOperator = NewElasticsearchOperator(faker, nil, 1*time.Second, 1*time.Second, "", "", ".cluster.local.", customEndpoint) + esOperator = NewElasticsearchOperator(faker, nil, 1*time.Second, 1*time.Second, "", "", ".cluster.local.", customEndpoint, + defaultRetryCount, defaultRetryWaitTime, defaultRetryMaxWaitTime) url = esOperator.getElasticsearchEndpoint(eds) assert.Equal(t, customURL, url.String()) } diff --git a/operator/es_client.go b/operator/es_client.go index 49deaf8b..e9e7e42a 100644 --- a/operator/es_client.go +++ b/operator/es_client.go @@ -17,12 +17,12 @@ import ( v1 "k8s.io/api/core/v1" ) -// TODO make configurable as flags. -var ( - defaultRetryCount = 999 - defaultRetryWaitTime = 10 * time.Second - defaultRetryMaxWaitTime = 30 * time.Second -) +// RetryConfig holds the retry configuration for ESClient operations. +type RetryConfig struct { + ClientRetryCount int + ClientRetryWaitTime time.Duration + ClientRetryMaxWaitTime time.Duration +} // ESClient is a pod drainer which can drain data from Elasticsearch pods. type ESClient struct { @@ -92,7 +92,7 @@ func (c *ESClient) logger() *log.Entry { } // Drain drains data from an Elasticsearch pod. -func (c *ESClient) Drain(ctx context.Context, pod *v1.Pod) error { +func (c *ESClient) Drain(ctx context.Context, pod *v1.Pod, config *RetryConfig) error { c.logger().Info("Ensuring cluster is in green state") @@ -112,11 +112,15 @@ func (c *ESClient) Drain(ctx context.Context, pod *v1.Pod) error { } c.logger().Info("Waiting for draining to finish") - return c.waitForEmptyEsNode(ctx, pod) + return c.waitForEmptyEsNode(ctx, pod, config) } func (c *ESClient) Cleanup(ctx context.Context) error { + // prevent concurrent ESClient operations on the exclude._ip list in ES + c.mux.Lock() + defer c.mux.Unlock() + // 1. 
fetch IPs from _cat/nodes nodes, err := c.GetNodes() if err != nil { return err } @@ -204,13 +208,14 @@ func (c *ESClient) getClusterSettings() (*ESSettings, error) { // adds the podIP to Elasticsearch exclude._ip list func (c *ESClient) excludePodIP(pod *v1.Pod) error { + // prevent concurrent ESClient operations on the exclude._ip list in ES c.mux.Lock() + defer c.mux.Unlock() podIP := pod.Status.PodIP esSettings, err := c.getClusterSettings() if err != nil { - c.mux.Unlock() return err } @@ -221,6 +226,7 @@ func (c *ESClient) excludePodIP(pod *v1.Pod) error { if excludeString != "" { ips = strings.Split(excludeString, ",") } + var foundPodIP bool for _, ip := range ips { if ip == podIP { @@ -234,7 +240,6 @@ func (c *ESClient) excludePodIP(pod *v1.Pod) error { err = c.setExcludeIPs(strings.Join(ips, ",")) } - c.mux.Unlock() return err } @@ -257,6 +262,45 @@ func (c *ESClient) setExcludeIPs(ips string) error { return nil } +// removes the podIP from the Elasticsearch exclude._ip list +func (c *ESClient) undoExcludePodIP(pod *v1.Pod) error { + + // prevent concurrent ESClient operations on the exclude._ip list in ES + c.mux.Lock() + defer c.mux.Unlock() + + podIP := pod.Status.PodIP + + esSettings, err := c.getClusterSettings() + if err != nil { + return err + } + + excludedIPsString := esSettings.Transient.Cluster.Routing.Allocation.Exclude.IP + excludedIPs := strings.Split(excludedIPsString, ",") + + // build a new exclude list without the provided Pod IP address + var newExcludedIPs []string + for _, excludeIP := range excludedIPs { + if excludeIP != podIP { + newExcludedIPs = append(newExcludedIPs, excludeIP) + } + } + sort.Strings(newExcludedIPs) + + newExcludedIPsString := strings.Join(newExcludedIPs, ",") + if newExcludedIPsString != excludedIPsString { + c.logger().Infof("Setting exclude list to '%s'", newExcludedIPsString) + + err = c.setExcludeIPs(newExcludedIPsString) + if err != nil { + return err + } + } + + return nil +} + func (c *ESClient) updateAutoRebalance(value string) error { resp, err := resty.New().R(). SetHeader("Content-Type", "application/json"). @@ -277,23 +321,26 @@ func (c *ESClient) updateAutoRebalance(value string) error { } // repeatedly query shard allocations to ensure success of drain operation. -func (c *ESClient) waitForEmptyEsNode(ctx context.Context, pod *v1.Pod) error { +func (c *ESClient) waitForEmptyEsNode(ctx context.Context, pod *v1.Pod, config *RetryConfig) error { // TODO: implement context handling podIP := pod.Status.PodIP - _, err := resty.New(). - SetRetryCount(defaultRetryCount). - SetRetryWaitTime(defaultRetryWaitTime). - SetRetryMaxWaitTime(defaultRetryMaxWaitTime). + resp, err := resty.New(). + SetRetryCount(config.ClientRetryCount). + SetRetryWaitTime(config.ClientRetryWaitTime). + SetRetryMaxWaitTime(config.ClientRetryMaxWaitTime). AddRetryCondition( // It is expected to return (bool, error) pair. Resty will retry // in case condition returns true or non nil error. func(r *resty.Response) (bool, error) { + if !r.IsSuccess() { + return true, nil + } + var shards []ESShard err := json.Unmarshal(r.Body(), &shards) if err != nil { return true, err } - // shardIP := make(map[string]bool) remainingShards := 0 for _, shard := range shards { if shard.IP == podIP { @@ -313,9 +360,32 @@ func (c *ESClient) waitForEmptyEsNode(ctx context.Context, pod *v1.Pod) error { }, ).R(). 
Get(c.Endpoint.String() + "/_cat/shards?h=index,ip&format=json") + if err != nil { return err } + + if !resp.IsSuccess() { + return fmt.Errorf("HTTP endpoint responded with unexpected status code %d", resp.StatusCode()) + } + + var shards []ESShard + err = json.Unmarshal(resp.Body(), &shards) + if err != nil { + return err + } + + for _, shard := range shards { + if shard.IP == podIP { + err = fmt.Errorf("cannot migrate shards from pod '%s' with IP '%s' within the configured retries", pod.ObjectMeta.Name, pod.Status.PodIP) + // if the node cannot be drained, return it to the active nodes pool + if errExclude := c.undoExcludePodIP(pod); errExclude != nil { + return fmt.Errorf("while handling error '%v', another error occurred: '%v'", err, errExclude) + } + return err + } + } + return nil } @@ -457,7 +527,7 @@ func (c *ESClient) CreateIndex(indexName, groupName string, shards, replicas int SetHeader("Content-Type", "application/json"). SetBody([]byte( fmt.Sprintf( - `{"settings": {"index" : {"number_of_replicas" : "%d", "number_of_shards": "%d", + `{"settings": {"index" : {"number_of_replicas" : "%d", "number_of_shards": "%d", "routing.allocation.include.group": "%s"}}}`, replicas, shards, diff --git a/operator/es_client_test.go b/operator/es_client_test.go index 708ea669..c993fa45 100644 --- a/operator/es_client_test.go +++ b/operator/es_client_test.go @@ -2,8 +2,11 @@ package operator import ( "context" + "encoding/json" + "net/http" "net/url" "testing" + "time" "github.com/jarcoal/httpmock" "github.com/stretchr/testify/assert" @@ -28,11 +31,18 @@ func TestDrain(t *testing.T) { systemUnderTest := &ESClient{ Endpoint: url, } + err := systemUnderTest.Drain(context.TODO(), &v1.Pod{ Status: v1.PodStatus{ PodIP: "1.2.3.4", }, - }) + }, + &RetryConfig{ + ClientRetryCount: 999, + ClientRetryWaitTime: 10 * time.Second, + ClientRetryMaxWaitTime: 30 * time.Second, + }, + ) assert.NoError(t, err) @@ -41,6 +51,31 @@ func TestDrain(t *testing.T) { require.EqualValues(t, 2, info["PUT http://elasticsearch:9200/_cluster/settings"]) require.EqualValues(t, 1, info["GET http://elasticsearch:9200/_cluster/settings"]) require.EqualValues(t, 1, info["GET http://elasticsearch:9200/_cat/shards"]) + + // Test that Drain returns an error if the ES endpoint stops responding as expected + httpmock.Reset() + httpmock.RegisterResponder("GET", "http://elasticsearch:9200/_cluster/settings", + httpmock.NewStringResponder(200, `{"transient":{"cluster":{"routing":{"rebalance":{"enable":"all"}}}}}`)) + httpmock.RegisterResponder("PUT", "http://elasticsearch:9200/_cluster/settings", + httpmock.NewStringResponder(200, `{}`)) + httpmock.RegisterResponder("GET", "http://elasticsearch:9200/_cluster/health", + httpmock.NewStringResponder(200, `{"status":"green"}`)) + httpmock.RegisterResponder("GET", "http://elasticsearch:9200/_cat/shards", + httpmock.NewStringResponder(404, ``)) + + err = systemUnderTest.Drain(context.TODO(), &v1.Pod{ + Status: v1.PodStatus{ + PodIP: "1.2.3.4", + }, + }, + &RetryConfig{ + ClientRetryCount: 1, + ClientRetryWaitTime: 1 * time.Second, + ClientRetryMaxWaitTime: 1 * time.Second, + }, + ) + + assert.NotNil(t, err) } func TestCleanup(t *testing.T) { @@ -257,5 +292,86 @@ func TestExcludeSystemIndices(t *testing.T) { assert.NoError(t, err) assert.Equal(t, 1, len(indices), indices) assert.Equal(t, "a", indices[0].Index, indices) +} + +func TestExcludeIP(t *testing.T) { + httpmock.Activate() + defer httpmock.DeactivateAndReset() + + httpmock.RegisterResponder("GET", 
"http://elasticsearch:9200/_cluster/settings", + httpmock.NewStringResponder(200, `{"persistent":{},"transient":{"cluster":{"routing":{"rebalance":{"enable":"all"},"allocation":{"exclude":{"_ip":"192.168.1.1,192.168.1.3"}}}}}}`)) + + httpmock.RegisterResponder("PUT", "http://elasticsearch:9200/_cluster/settings", + func(req *http.Request) (*http.Response, error) { + + update := make(map[string]map[string]string) + if err := json.NewDecoder(req.Body).Decode(&update); err != nil { + return httpmock.NewStringResponse(400, ""), nil + } + + if update["transient"]["cluster.routing.allocation.exclude._ip"] != "192.168.1.1,192.168.1.2,192.168.1.3" { + return httpmock.NewStringResponse(400, ""), nil + } + + resp, err := httpmock.NewJsonResponse(200, "") + if err != nil { + return httpmock.NewStringResponse(500, ""), nil + } + return resp, nil + }, + ) + + url, _ := url.Parse("http://elasticsearch:9200") + systemUnderTest := &ESClient{ + Endpoint: url, + } + + err := systemUnderTest.excludePodIP(&v1.Pod{ + Status: v1.PodStatus{ + PodIP: "192.168.1.2", + }, + }) + + assert.NoError(t, err) +} + +func TestUndoExcludePodIP(t *testing.T) { + httpmock.Activate() + defer httpmock.DeactivateAndReset() + + httpmock.RegisterResponder("GET", "http://elasticsearch:9200/_cluster/settings", + httpmock.NewStringResponder(200, `{"persistent":{},"transient":{"cluster":{"routing":{"rebalance":{"enable":"all"},"allocation":{"exclude":{"_ip":"192.168.1.1,192.168.1.2,192.168.1.3"}}}}}}`)) + + httpmock.RegisterResponder("PUT", "http://elasticsearch:9200/_cluster/settings", + func(req *http.Request) (*http.Response, error) { + + update := make(map[string]map[string]string) + if err := json.NewDecoder(req.Body).Decode(&update); err != nil { + return httpmock.NewStringResponse(400, ""), nil + } + + if update["transient"]["cluster.routing.allocation.exclude._ip"] != "192.168.1.1,192.168.1.3" { + return httpmock.NewStringResponse(400, ""), nil + } + + resp, err := httpmock.NewJsonResponse(200, "") + if err != nil { + return httpmock.NewStringResponse(500, ""), nil + } + return resp, nil + }, + ) + + url, _ := url.Parse("http://elasticsearch:9200") + systemUnderTest := &ESClient{ + Endpoint: url, + } + err := systemUnderTest.undoExcludePodIP(&v1.Pod{ + Status: v1.PodStatus{ + PodIP: "192.168.1.2", + }, + }) + + assert.NoError(t, err) } diff --git a/operator/operator.go b/operator/operator.go index c3922a59..d0409ff1 100644 --- a/operator/operator.go +++ b/operator/operator.go @@ -86,7 +86,7 @@ type StatefulResource interface { // Drain drains a pod for data. It's expected that the method only // returns after the pod has been drained. - Drain(ctx context.Context, pod *v1.Pod) error + Drain(ctx context.Context, pod *v1.Pod, config *RetryConfig) error } // Operator is a generic operator that can manage Pods filtered by a selector. 
@@ -96,6 +96,7 @@ type Operator struct { interval time.Duration logger *log.Entry recorder kube_record.EventRecorder + esClientRetryConfig *RetryConfig } func (o *Operator) Run(ctx context.Context, done chan<- struct{}, sr StatefulResource) { @@ -353,7 +354,7 @@ func (o *Operator) operatePods(ctx context.Context, sts *appsv1.StatefulSet, sr // drain Pod o.recorder.Event(sr.Self(), v1.EventTypeNormal, "DrainingPod", fmt.Sprintf("Draining Pod '%s/%s'", pod.Namespace, pod.Name)) - err = sr.Drain(ctx, pod) + err = sr.Drain(ctx, pod, o.esClientRetryConfig) if err != nil { return fmt.Errorf("failed to drain Pod %s/%s: %v", pod.Namespace, pod.Name, err) } @@ -452,7 +453,7 @@ func (o *Operator) rescaleStatefulSet(ctx context.Context, sts *appsv1.StatefulS } log.Infof("Draining Pod %s/%s for scaledown", pod.Namespace, pod.Name) - err := sr.Drain(ctx, &pod) + err := sr.Drain(ctx, &pod, o.esClientRetryConfig) if err != nil { return fmt.Errorf("failed to drain pod %s/%s: %v", pod.Namespace, pod.Name, err) } diff --git a/operator/operator_test.go b/operator/operator_test.go index 26e44ccc..a721b791 100644 --- a/operator/operator_test.go +++ b/operator/operator_test.go @@ -43,12 +43,12 @@ func (r *mockResource) PodTemplateSpec() *v1.PodTemplateSpec { return r.podTempl func (r *mockResource) VolumeClaimTemplates() []v1.PersistentVolumeClaim { return r.volumeClaimTemplates } -func (r *mockResource) Self() runtime.Object { return r.eds } -func (r *mockResource) EnsureResources(ctx context.Context) error { return nil } -func (r *mockResource) UpdateStatus(ctx context.Context, sts *appsv1.StatefulSet) error { return nil } -func (r *mockResource) PreScaleDownHook(ctx context.Context) error { return nil } -func (r *mockResource) OnStableReplicasHook(ctx context.Context) error { return nil } -func (r *mockResource) Drain(ctx context.Context, pod *v1.Pod) error { return nil } +func (r *mockResource) Self() runtime.Object { return r.eds } +func (r *mockResource) EnsureResources(ctx context.Context) error { return nil } +func (r *mockResource) UpdateStatus(ctx context.Context, sts *appsv1.StatefulSet) error { return nil } +func (r *mockResource) PreScaleDownHook(ctx context.Context) error { return nil } +func (r *mockResource) OnStableReplicasHook(ctx context.Context) error { return nil } +func (r *mockResource) Drain(ctx context.Context, pod *v1.Pod, config *RetryConfig) error { return nil } func TestPrioritizePodsForUpdate(t *testing.T) { updatingPod := v1.Pod{