feat(esClientDrain): enhance Drain ES Client function #168

Open · wants to merge 5 commits into base: master · Changes from 2 commits
36 changes: 24 additions & 12 deletions main.go
@@ -26,22 +26,28 @@ const (
defaultMetricsAddress = ":7979"
defaultClientGoTimeout = 30 * time.Second
defaultClusterDNSZone = "cluster.local."
defaultRetryCount = "999"
defaultRetryWaitTime = "10s"
defaultRetryMaxWaitTime = "30s"
)

var (
config struct {
Interval time.Duration
AutoscalerInterval time.Duration
APIServer *url.URL
PodSelectors Labels
PriorityNodeSelectors Labels
MetricsAddress string
ClientGoTimeout time.Duration
Debug bool
OperatorID string
Namespace string
ClusterDNSZone string
ElasticsearchEndpoint *url.URL
Interval time.Duration
AutoscalerInterval time.Duration
APIServer *url.URL
PodSelectors Labels
PriorityNodeSelectors Labels
MetricsAddress string
ClientGoTimeout time.Duration
Debug bool
OperatorID string
Namespace string
ClusterDNSZone string
ElasticsearchEndpoint *url.URL
EsClientRetryCount int
EsClientRetryWaitTime time.Duration
EsClientRetryMaxWaitTime time.Duration
}
)

@@ -71,6 +77,9 @@ func main() {
URLVar(&config.ElasticsearchEndpoint)
kingpin.Flag("namespace", "Limit operator to a certain namespace").
Default(v1.NamespaceAll).StringVar(&config.Namespace)
kingpin.Flag("esclient-retry-count", "Count of retry operations conducted by EsClient. Used when the operator is waiting for an ES cluster condition, e.g. shards relocation.").Default(defaultRetryCount).IntVar(&config.EsClientRetryCount)
kingpin.Flag("esclient-retry-waittime", "Wait time between two operations of EsClient. Used when the operator is waiting for an ES cluster condition, e.g. shards relocation.").Default(defaultRetryWaitTime).DurationVar(&config.EsClientRetryWaitTime)
kingpin.Flag("esclient-retry-max-waittime", "Max wait time between two operations of EsClient. Used when the operator is waiting for an ES cluster condition, e.g. shards relocation.").Default(defaultRetryMaxWaitTime).DurationVar(&config.EsClientRetryMaxWaitTime)

kingpin.Parse()

@@ -98,6 +107,9 @@ func main() {
config.Namespace,
config.ClusterDNSZone,
config.ElasticsearchEndpoint,
config.EsClientRetryCount,
config.EsClientRetryWaitTime,
config.EsClientRetryMaxWaitTime,
)

go handleSigterm(cancel)
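For reference, a hypothetical invocation of the operator using the three new flags (the binary name and any other flags are assumptions, not part of this diff; the values shown are the defaults defined above):

./es-operator \
  --esclient-retry-count=999 \
  --esclient-retry-waittime=10s \
  --esclient-retry-max-waittime=30s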
16 changes: 13 additions & 3 deletions operator/elasticsearch.go
@@ -47,7 +47,8 @@ type ElasticsearchOperator struct {
elasticsearchEndpoint *url.URL
operating map[types.UID]operatingEntry
sync.Mutex
recorder kube_record.EventRecorder
recorder kube_record.EventRecorder
esClientRestyConfig *RestyConfig
}

type operatingEntry struct {
@@ -66,6 +67,9 @@ func NewElasticsearchOperator(
namespace,
clusterDNSZone string,
elasticsearchEndpoint *url.URL,
esClientRetryCount int,
esClientRetryWaitTime,
esClientRetryMaxWaitTime time.Duration,
) *ElasticsearchOperator {
return &ElasticsearchOperator{
logger: log.WithFields(
@@ -84,6 +88,11 @@
elasticsearchEndpoint: elasticsearchEndpoint,
operating: make(map[types.UID]operatingEntry),
recorder: createEventRecorder(client),
esClientRestyConfig: &RestyConfig{
ClientRetryCount: esClientRetryCount,
ClientRetryWaitTime: esClientRetryWaitTime,
ClientRetryMaxWaitTime: esClientRetryMaxWaitTime,
},
}
}

@@ -497,8 +506,8 @@ func (r *EDSResource) ensureService(ctx context.Context) error {
}

// Drain drains a pod for Elasticsearch data.
func (r *EDSResource) Drain(ctx context.Context, pod *v1.Pod) error {
return r.esClient.Drain(ctx, pod)
func (r *EDSResource) Drain(ctx context.Context, pod *v1.Pod, config *RestyConfig) error {
return r.esClient.Drain(ctx, pod, config)
}
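Call sites of Drain must now supply the retry config. A minimal sketch of a hypothetical caller, assuming it runs on the operator and reuses the stored config (this wiring is not shown in the diff):

// sketch only: pass the operator-level resty config into the drain call
if err := rs.Drain(ctx, pod, o.esClientRestyConfig); err != nil {
return err
}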

// PreScaleDownHook ensures that the IndexReplicas is set as defined in the EDS
@@ -641,6 +650,7 @@ func (o *ElasticsearchOperator) operateEDS(eds *zv1.ElasticsearchDataSet, delete
interval: o.interval,
logger: logger,
recorder: o.recorder,
esClientRestyConfig: o.esClientRestyConfig,
}

rs := &EDSResource{
12 changes: 10 additions & 2 deletions operator/elasticsearch_test.go
@@ -15,6 +15,12 @@ import (
"k8s.io/apimachinery/pkg/types"
)

const (
defaultRetryCount = 999
defaultRetryWaitTime = 10 * time.Second
defaultRetryMaxWaitTime = 30 * time.Second
)

func TestHasOwnership(t *testing.T) {
eds := &zv1.ElasticsearchDataSet{
ObjectMeta: metav1.ObjectMeta{
@@ -43,7 +49,8 @@ func TestGetElasticsearchEndpoint(t *testing.T) {
faker := &clientset.Clientset{
Interface: fake.NewSimpleClientset(),
}
esOperator := NewElasticsearchOperator(faker, nil, 1*time.Second, 1*time.Second, "", "", "cluster.local.", nil)
esOperator := NewElasticsearchOperator(faker, nil, 1*time.Second, 1*time.Second, "", "", "cluster.local.", nil,
defaultRetryCount, defaultRetryWaitTime, defaultRetryMaxWaitTime)

eds := &zv1.ElasticsearchDataSet{
ObjectMeta: metav1.ObjectMeta{
@@ -59,7 +66,8 @@
customEndpoint, err := url.Parse(customURL)
assert.NoError(t, err)

esOperator = NewElasticsearchOperator(faker, nil, 1*time.Second, 1*time.Second, "", "", ".cluster.local.", customEndpoint)
esOperator = NewElasticsearchOperator(faker, nil, 1*time.Second, 1*time.Second, "", "", ".cluster.local.", customEndpoint,
Collaborator review comment (see the sketch after this file's diff):
nit: let's pass the RetryConfig from here instead of individual arguments.

defaultRetryCount, defaultRetryWaitTime, defaultRetryMaxWaitTime)
url = esOperator.getElasticsearchEndpoint(eds)
assert.Equal(t, customURL, url.String())
}
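A minimal sketch of what the reviewer suggests — accepting the config struct instead of three separate trailing arguments. The changed signature is an assumption, not part of this PR:

// hypothetical signature: the three retry parameters collapse into one struct
func NewElasticsearchOperator(
/* ...unchanged leading arguments... */
elasticsearchEndpoint *url.URL,
restyConfig *RestyConfig,
) *ElasticsearchOperator

// the test call site would then read:
esOperator := NewElasticsearchOperator(faker, nil, 1*time.Second, 1*time.Second,
"", "", "cluster.local.", nil,
&RestyConfig{
ClientRetryCount:       defaultRetryCount,
ClientRetryWaitTime:    defaultRetryWaitTime,
ClientRetryMaxWaitTime: defaultRetryMaxWaitTime,
})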
98 changes: 83 additions & 15 deletions operator/es_client.go
@@ -17,12 +17,12 @@ import (
v1 "k8s.io/api/core/v1"
)

// TODO make configurable as flags.
var (
defaultRetryCount = 999
defaultRetryWaitTime = 10 * time.Second
defaultRetryMaxWaitTime = 30 * time.Second
)
// RestyConfig holds the retry configuration for the resty HTTP client.
type RestyConfig struct {
ClientRetryCount int
ClientRetryWaitTime time.Duration
ClientRetryMaxWaitTime time.Duration
}

// ESClient is a pod drainer which can drain data from Elasticsearch pods.
type ESClient struct {
@@ -91,7 +91,7 @@ func (c *ESClient) logger() *log.Entry {
}

// Drain drains data from an Elasticsearch pod.
func (c *ESClient) Drain(ctx context.Context, pod *v1.Pod) error {
func (c *ESClient) Drain(ctx context.Context, pod *v1.Pod, config *RestyConfig) error {

c.logger().Info("Ensuring cluster is in green state")

@@ -111,7 +111,7 @@ func (c *ESClient) Drain(ctx context.Context, pod *v1.Pod) error {
}

c.logger().Info("Waiting for draining to finish")
return c.waitForEmptyEsNode(ctx, pod)
return c.waitForEmptyEsNode(ctx, pod, config)
}

func (c *ESClient) Cleanup(ctx context.Context) error {
@@ -220,6 +220,7 @@ func (c *ESClient) excludePodIP(pod *v1.Pod) error {
if excludeString != "" {
ips = strings.Split(excludeString, ",")
}

var foundPodIP bool
for _, ip := range ips {
if ip == podIP {
@@ -256,6 +257,42 @@ func (c *ESClient) setExcludeIPs(ips string) error {
return nil
}

// remove the podIP from Elasticsearch exclude._ip list
func (c *ESClient) removeFromExcludeIPList(pod *v1.Pod) error {

c.mux.Lock()
defer c.mux.Unlock()

podIP := pod.Status.PodIP

esSettings, err := c.getClusterSettings()
if err != nil {
return err
}

excludedIPsString := esSettings.Transient.Cluster.Routing.Allocation.Exclude.IP
excludedIPs := strings.Split(excludedIPsString, ",")
var newExcludedIPs []string
for _, excludeIP := range excludedIPs {
if excludeIP != podIP {
newExcludedIPs = append(newExcludedIPs, excludeIP)
}
}
// sort once after filtering (sorting inside the loop re-sorted on every append)
sort.Strings(newExcludedIPs)

newExcludedIPsString := strings.Join(newExcludedIPs, ",")
if newExcludedIPsString != excludedIPsString {
c.logger().Infof("Setting exclude list to '%s'", newExcludedIPsString)

err = c.setExcludeIPs(newExcludedIPsString)
if err != nil {
return err
}
}

return nil
}

func (c *ESClient) updateAutoRebalance(value string) error {
resp, err := resty.New().R().
SetHeader("Content-Type", "application/json").
@@ -276,23 +313,26 @@ func (c *ESClient) updateAutoRebalance(value string) error {
}

// repeatedly query shard allocations to ensure success of drain operation.
func (c *ESClient) waitForEmptyEsNode(ctx context.Context, pod *v1.Pod) error {
func (c *ESClient) waitForEmptyEsNode(ctx context.Context, pod *v1.Pod, config *RestyConfig) error {
// TODO: implement context handling
podIP := pod.Status.PodIP
_, err := resty.New().
SetRetryCount(defaultRetryCount).
SetRetryWaitTime(defaultRetryWaitTime).
SetRetryMaxWaitTime(defaultRetryMaxWaitTime).
resp, err := resty.New().
SetRetryCount(config.ClientRetryCount).
SetRetryWaitTime(config.ClientRetryWaitTime).
SetRetryMaxWaitTime(config.ClientRetryMaxWaitTime).
AddRetryCondition(
// It is expected to return (bool, error) pair. Resty will retry
// in case condition returns true or non nil error.
func(r *resty.Response) (bool, error) {
if !r.IsSuccess() {
return true, nil
}

var shards []ESShard
err := json.Unmarshal(r.Body(), &shards)
if err != nil {
return true, err
}
// shardIP := make(map[string]bool)
remainingShards := 0
for _, shard := range shards {
if shard.IP == podIP {
@@ -312,9 +352,37 @@ func (c *ESClient) waitForEmptyEsNode(ctx context.Context, pod *v1.Pod) error {
},
).R().
Get(c.Endpoint.String() + "/_cat/shards?h=index,ip&format=json")

if err != nil {
return err
}

if !resp.IsSuccess() {
return fmt.Errorf("HTTP endpoint responded with not expected status code %d", resp.StatusCode())
}

// make sure the IP is still excluded, this could have been updated in the meantime.
if err = c.excludePodIP(pod); err != nil {
return err
}

var shards []ESShard
err = json.Unmarshal(resp.Body(), &shards)
if err != nil {
return err
}

for _, shard := range shards {
if shard.IP == podIP {
err = fmt.Errorf("Cannot migrate shards from pod '%s' with IP '%s' within provided intervals", pod.ObjectMeta.Name, pod.Status.PodIP)
// if we cannot remove node than return it back active nodes pool
if errExclude := c.removeFromExcludeIPList(pod); errExclude != nil {
return fmt.Errorf("during handling request error: '%v' another error has been raised '%v'", err, errExclude)
}
return err
}
}

return nil
}
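A note on the bounds these settings imply: with the defaults above (retry count 999, wait 10s, max wait 30s), and assuming resty backs off between RetryWaitTime and RetryMaxWaitTime per attempt, the drain loop can poll for roughly 999 × 30s ≈ 8.3 hours in the worst case before the exclude-list rollback above takes effect.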

@@ -452,7 +520,7 @@ func (c *ESClient) CreateIndex(indexName, groupName string, shards, replicas int
SetHeader("Content-Type", "application/json").
SetBody([]byte(
fmt.Sprintf(
`{"settings": {"index" : {"number_of_replicas" : "%d", "number_of_shards": "%d",
`{"settings": {"index" : {"number_of_replicas" : "%d", "number_of_shards": "%d",
"routing.allocation.include.group": "%s"}}}`,
replicas,
shards,
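Taken together, the pieces of this PR compose as follows — a minimal end-to-end sketch; the names match this diff, but the surrounding wiring is assumed:

// built in main.go from the parsed flags, then stored on the operator
restyConfig := &RestyConfig{
ClientRetryCount:       config.EsClientRetryCount,       // default 999
ClientRetryWaitTime:    config.EsClientRetryWaitTime,    // default 10s
ClientRetryMaxWaitTime: config.EsClientRetryMaxWaitTime, // default 30s
}

// each drain now threads the config through to the resty retry loop;
// if retries are exhausted, the pod IP is removed from the exclude list again
if err := esClient.Drain(ctx, pod, restyConfig); err != nil {
log.Errorf("drain of pod %s failed: %v", pod.Name, err)
}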