Merge pull request #6132 from atwamahmoud/status-taints-support #35

Open · wants to merge 1 commit into base: cluster-autoscaler-release-1.27-airbnb
6 changes: 3 additions & 3 deletions cluster-autoscaler/config/autoscaling_options.go
Expand Up @@ -194,10 +194,10 @@ type AutoscalingOptions struct {
MaxBulkSoftTaintTime time.Duration
// MaxPodEvictionTime sets the maximum time CA tries to evict a pod before giving up.
MaxPodEvictionTime time.Duration
// IgnoredTaints is a list of taints CA considers to reflect transient node
// StartupTaints is a list of taints CA considers to reflect transient node
// status that should be removed when creating a node template for scheduling.
// The ignored taints are expected to appear during node startup.
IgnoredTaints []string
// The startup taints are expected to appear during node startup.
StartupTaints []string
// StatusTaints is a list of taints CA considers to reflect transient node
// status that should be removed when creating a node template for scheduling.
// The status taints are expected to appear during node lifetime, after startup.
Expand Down
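
For context, a minimal sketch (not part of this diff; the example.com taint keys are hypothetical) of how the renamed option fields are meant to be populated:

package main

import "k8s.io/autoscaler/cluster-autoscaler/config"

func exampleOptions() config.AutoscalingOptions {
	return config.AutoscalingOptions{
		// Startup taints: expected only while a node is booting; nodes still carrying them are treated as upcoming.
		StartupTaints: []string{"example.com/agent-not-ready"},
		// Status taints: may appear at any point after startup; nodes carrying them are NOT treated as unready.
		StatusTaints: []string{"example.com/degraded-network"},
	}
}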
161 changes: 156 additions & 5 deletions cluster-autoscaler/core/scaledown/actuation/actuator_test.go
Expand Up @@ -788,11 +788,162 @@ func TestStartDeletion(t *testing.T) {
return true, nil, nil
})

// Hook node deletion at the level of cloud provider, to gather which nodes were deleted, and to fail the deletion for
// certain nodes to simulate errors.
provider := testprovider.NewTestCloudProvider(nil, func(nodeGroup string, node string) error {
if tc.failedNodeDeletion[node] {
return fmt.Errorf("SIMULATED ERROR: won't remove node")
// Set up other needed structures and options.


This test file is the only difficult diff to review. Can you give me a sense of how you constructed it?

opts := config.AutoscalingOptions{
MaxScaleDownParallelism: 10,
MaxDrainParallelism: 5,
MaxPodEvictionTime: 0,
DaemonSetEvictionForEmptyNodes: true,
}

allPods := []*apiv1.Pod{}

for _, pods := range tc.pods {
allPods = append(allPods, pods...)
}

podLister := kube_util.NewTestPodLister(allPods)
pdbLister := kube_util.NewTestPodDisruptionBudgetLister([]*policyv1.PodDisruptionBudget{})
dsLister, err := kube_util.NewTestDaemonSetLister([]*appsv1.DaemonSet{ds})
if err != nil {
t.Fatalf("Couldn't create daemonset lister")
}

registry := kube_util.NewListerRegistry(nil, nil, podLister, pdbLister, dsLister, nil, nil, nil, nil)
ctx, err := NewScaleTestAutoscalingContext(opts, fakeClient, registry, provider, nil, nil)


where did provider definition go? Am I being blind?

if err != nil {
t.Fatalf("Couldn't set up autoscaling context: %v", err)
}
csr := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, ctx.LogRecorder, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(config.NodeGroupAutoscalingOptions{MaxNodeProvisionTime: 15 * time.Minute}))
for _, bucket := range emptyNodeGroupViews {
for _, node := range bucket.Nodes {
err := ctx.ClusterSnapshot.AddNodeWithPods(node, tc.pods[node.Name])
if err != nil {
t.Fatalf("Couldn't add node %q to snapshot: %v", node.Name, err)
}
}
}
for _, bucket := range drainNodeGroupViews {
for _, node := range bucket.Nodes {
pods, found := tc.pods[node.Name]
if !found {
t.Fatalf("Drain node %q doesn't have pods defined in the test case.", node.Name)
}
err := ctx.ClusterSnapshot.AddNodeWithPods(node, pods)
if err != nil {
t.Fatalf("Couldn't add node %q to snapshot: %v", node.Name, err)
}
}
}

wantScaleDownStatus := &status.ScaleDownStatus{
Result: tc.wantStatus.result,
}
for _, scaleDownNodeInfo := range tc.wantStatus.scaledDownNodes {
statusScaledDownNode := &status.ScaleDownNode{
Node: generateNode(scaleDownNodeInfo.name),
NodeGroup: tc.nodeGroups[scaleDownNodeInfo.nodeGroup],
EvictedPods: scaleDownNodeInfo.evictedPods,
UtilInfo: scaleDownNodeInfo.utilInfo,
}
wantScaleDownStatus.ScaledDownNodes = append(wantScaleDownStatus.ScaledDownNodes, statusScaledDownNode)
}

// Create Actuator, run StartDeletion, and verify the error.
ndt := deletiontracker.NewNodeDeletionTracker(0)
ndb := NewNodeDeletionBatcher(&ctx, csr, ndt, 0*time.Second)
evictor := Evictor{EvictionRetryTime: 0, DsEvictionRetryTime: 0, DsEvictionEmptyNodeTimeout: 0, PodEvictionHeadroom: DefaultPodEvictionHeadroom}
actuator := Actuator{
ctx: &ctx, clusterState: csr, nodeDeletionTracker: ndt,
nodeDeletionScheduler: NewGroupDeletionScheduler(&ctx, ndt, ndb, evictor),
budgetProcessor: budgets.NewScaleDownBudgetProcessor(&ctx),
configGetter: nodegroupconfig.NewDefaultNodeGroupConfigProcessor(ctx.NodeGroupDefaults),
}
gotStatus, gotErr := actuator.StartDeletion(allEmptyNodes, allDrainNodes)
if diff := cmp.Diff(tc.wantErr, gotErr, cmpopts.EquateErrors()); diff != "" {
t.Errorf("StartDeletion error diff (-want +got):\n%s", diff)
}

// Verify ScaleDownStatus looks as expected.
ignoreSdNodeOrder := cmpopts.SortSlices(func(a, b *status.ScaleDownNode) bool { return a.Node.Name < b.Node.Name })
ignoreTimestamps := cmpopts.IgnoreFields(status.ScaleDownStatus{}, "NodeDeleteResultsAsOf")
cmpNg := cmp.Comparer(func(a, b *testprovider.TestNodeGroup) bool { return a.Id() == b.Id() })
statusCmpOpts := cmp.Options{ignoreSdNodeOrder, ignoreTimestamps, cmpNg, cmpopts.EquateEmpty()}
if diff := cmp.Diff(wantScaleDownStatus, gotStatus, statusCmpOpts); diff != "" {
t.Errorf("StartDeletion status diff (-want +got):\n%s", diff)
}

// Verify that all expected nodes were deleted using the cloud provider hook.
var gotDeletedNodes []string
nodesLoop:
for i := 0; i < len(tc.wantDeletedNodes); i++ {
select {
case deletedNode := <-deletedNodes:
gotDeletedNodes = append(gotDeletedNodes, deletedNode)
case <-time.After(3 * time.Second):
t.Errorf("Timeout while waiting for deleted nodes.")
break nodesLoop
}
}
ignoreStrOrder := cmpopts.SortSlices(func(a, b string) bool { return a < b })
if diff := cmp.Diff(tc.wantDeletedNodes, gotDeletedNodes, ignoreStrOrder); diff != "" {
t.Errorf("deletedNodes diff (-want +got):\n%s", diff)
}

// Verify that all expected pods were deleted using the fake k8s client hook.
var gotDeletedPods []string
podsLoop:
for i := 0; i < len(tc.wantDeletedPods); i++ {
select {
case deletedPod := <-deletedPods:
gotDeletedPods = append(gotDeletedPods, deletedPod)
case <-time.After(3 * time.Second):
t.Errorf("Timeout while waiting for deleted pods.")
break podsLoop
}
}
if diff := cmp.Diff(tc.wantDeletedPods, gotDeletedPods, ignoreStrOrder); diff != "" {
t.Errorf("deletedPods diff (-want +got):\n%s", diff)
}

// Verify that all expected taint updates happened using the fake k8s client hook.
allUpdatesCount := 0
for _, updates := range tc.wantTaintUpdates {
allUpdatesCount += len(updates)
}
gotTaintUpdates := make(map[string][][]apiv1.Taint)
taintsLoop:
for i := 0; i < allUpdatesCount; i++ {
select {
case taintUpdate := <-taintUpdates:
gotTaintUpdates[taintUpdate.nodeName] = append(gotTaintUpdates[taintUpdate.nodeName], taintUpdate.taints)
case <-time.After(3 * time.Second):
t.Errorf("Timeout while waiting for taint updates.")
break taintsLoop
}
}
startupTaintValue := cmpopts.IgnoreFields(apiv1.Taint{}, "Value")
if diff := cmp.Diff(tc.wantTaintUpdates, gotTaintUpdates, startupTaintValue, cmpopts.EquateEmpty()); diff != "" {
t.Errorf("taintUpdates diff (-want +got):\n%s", diff)
}

// Wait for all expected deletions to be reported in NodeDeletionTracker. Reporting happens shortly after the deletion
// in cloud provider we sync to above and so this will usually not wait at all. However, it can still happen
// that there is a delay between cloud provider deletion and reporting, in which case the results are not there yet
// and we need to wait for them before asserting.
err = waitForDeletionResultsCount(actuator.nodeDeletionTracker, len(tc.wantNodeDeleteResults), 3*time.Second, 200*time.Millisecond)
if err != nil {
t.Errorf("Timeout while waiting for node deletion results")
}

// Run StartDeletion again to gather node deletion results for deletions started in the previous call, and verify
// that they look as expected.
gotNextStatus, gotNextErr := actuator.StartDeletion(nil, nil)
if gotNextErr != nil {
t.Errorf("StartDeletion unexpected error: %v", gotNextErr)
}
if diff := cmp.Diff(tc.wantNodeDeleteResults, gotNextStatus.NodeDeleteResults, cmpopts.EquateEmpty(), cmpopts.EquateErrors()); diff != "" {
t.Errorf("NodeDeleteResults diff (-want +got):\n%s", diff)
}
deletedNodes <- node
return nil
Expand Down
2 changes: 1 addition & 1 deletion cluster-autoscaler/core/static_autoscaler.go
Expand Up @@ -894,7 +894,7 @@ func (a *StaticAutoscaler) obtainNodeLists(cp cloudprovider.CloudProvider) ([]*a
// our normal handling for booting up nodes deal with this.
// TODO: Remove this call when we handle dynamically provisioned resources.
allNodes, readyNodes = a.processors.CustomResourcesProcessor.FilterOutNodesWithUnreadyResources(a.AutoscalingContext, allNodes, readyNodes)
allNodes, readyNodes = taints.FilterOutNodesWithIgnoredTaints(a.taintConfig.IgnoredTaints, allNodes, readyNodes)
allNodes, readyNodes = taints.FilterOutNodesWithStartupTaints(a.taintConfig, allNodes, readyNodes)
return allNodes, readyNodes, nil
}

Expand Down
7 changes: 5 additions & 2 deletions cluster-autoscaler/main.go
Expand Up @@ -190,7 +190,9 @@ var (
regional = flag.Bool("regional", false, "Cluster is regional.")
newPodScaleUpDelay = flag.Duration("new-pod-scale-up-delay", 0*time.Second, "Pods less than this old will not be considered for scale-up. Can be increased for individual pods through annotation 'cluster-autoscaler.kubernetes.io/pod-scale-up-delay'.")

ignoreTaintsFlag = multiStringFlag("ignore-taint", "Specifies a taint to ignore in node templates when considering to scale a node group")
ignoreTaintsFlag = multiStringFlag("ignore-taint", "Specifies a taint to ignore in node templates when considering to scale a node group (Deprecated, use startup-taints instead)")
startupTaintsFlag = multiStringFlag("startup-taint", "Specifies a taint to ignore in node templates when considering to scale a node group (Equivalent to ignore-taint)")
statusTaintsFlag = multiStringFlag("status-taint", "Specifies a taint to ignore in node templates when considering to scale a node group but nodes will not be treated as unready")
balancingIgnoreLabelsFlag = multiStringFlag("balancing-ignore-label", "Specifies a label to ignore in addition to the basic and cloud-provider set of labels when comparing if two node groups are similar")
balancingLabelsFlag = multiStringFlag("balancing-label", "Specifies a label to use for comparing if two node groups are similar, rather than the built in heuristics. Setting this flag disables all other comparison logic, and cannot be combined with --balancing-ignore-label.")
awsUseStaticInstanceList = flag.Bool("aws-use-static-instance-list", false, "Should CA fetch instance types in runtime or use a static list. AWS only")
Expand Down Expand Up @@ -327,7 +329,8 @@ func createAutoscalingOptions() config.AutoscalingOptions {
ExpendablePodsPriorityCutoff: *expendablePodsPriorityCutoff,
Regional: *regional,
NewPodScaleUpDelay: *newPodScaleUpDelay,
IgnoredTaints: *ignoreTaintsFlag,
StartupTaints: append(*ignoreTaintsFlag, *startupTaintsFlag...),
StatusTaints: *statusTaintsFlag,
BalancingExtraIgnoredLabels: *balancingIgnoreLabelsFlag,
BalancingLabels: *balancingLabelsFlag,
KubeConfigPath: *kubeConfigFile,
Expand Down
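
A minimal sketch of the resulting flag semantics (hypothetical taint keys, not part of this diff): values from the deprecated --ignore-taint flag are appended to --startup-taint values, while --status-taint feeds the new StatusTaints list.

package main

import "k8s.io/autoscaler/cluster-autoscaler/config"

// Hypothetical invocation:
//   cluster-autoscaler --ignore-taint=example.com/legacy-init --startup-taint=example.com/agent-warmup --status-taint=example.com/degraded-disk
// would build options equivalent to:
func exampleFlagMerge() config.AutoscalingOptions {
	return config.AutoscalingOptions{
		StartupTaints: []string{"example.com/legacy-init", "example.com/agent-warmup"}, // append(*ignoreTaintsFlag, *startupTaintsFlag...)
		StatusTaints:  []string{"example.com/degraded-disk"},                           // *statusTaintsFlag
	}
}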
6 changes: 3 additions & 3 deletions cluster-autoscaler/utils/kubernetes/ready.go
Expand Up @@ -35,10 +35,10 @@ const (
// still upcoming due to a missing resource (e.g. GPU).
ResourceUnready NodeNotReadyReason = "cluster-autoscaler.kubernetes.io/resource-not-ready"

// IgnoreTaint is a fake identifier used internally by Cluster Autoscaler
// StartupNodes is a fake identifier used internally by Cluster Autoscaler
// to indicate nodes that appear Ready in the API, but are treated as
// still upcoming due to applied ignore taint.
IgnoreTaint NodeNotReadyReason = "cluster-autoscaler.kubernetes.io/ignore-taint"
// still upcoming due to applied startup taint.
StartupNodes NodeNotReadyReason = "cluster-autoscaler.kubernetes.io/startup-taint"
)

// IsNodeReadyAndSchedulable returns true if the node is ready and schedulable.
Expand Down
59 changes: 39 additions & 20 deletions cluster-autoscaler/utils/taints/taints.go
Expand Up @@ -46,6 +46,12 @@ const (
// IgnoreTaintPrefix any taint starting with it will be filtered out from autoscaler template node.
IgnoreTaintPrefix = "ignore-taint.cluster-autoscaler.kubernetes.io/"

// StartupTaintPrefix (same as IgnoreTaintPrefix): any taint starting with it will be filtered out from the autoscaler template node.
StartupTaintPrefix = "startup-taint.cluster-autoscaler.kubernetes.io/"

// StatusTaintPrefix: any taint starting with it will be filtered out from the autoscaler template node, but unlike IgnoreTaintPrefix & StartupTaintPrefix the node should not be treated as unready.
StatusTaintPrefix = "status-taint.cluster-autoscaler.kubernetes.io/"

gkeNodeTerminationHandlerTaint = "cloud.google.com/impending-node-termination"

// AWS: Indicates that a node has volumes stuck in attaching state and hence it is not fit for scheduling more pods
Expand All @@ -57,16 +63,18 @@ type TaintKeySet map[string]bool

// TaintConfig is a config of taints that require special handling
type TaintConfig struct {
IgnoredTaints TaintKeySet
StatusTaints TaintKeySet
StartupTaints TaintKeySet
StatusTaints TaintKeySet
StartupTaintPrefixes []string
StatusTaintPrefixes []string
}

// NewTaintConfig returns the taint config extracted from options
func NewTaintConfig(opts config.AutoscalingOptions) TaintConfig {
ignoredTaints := make(TaintKeySet)
for _, taintKey := range opts.IgnoredTaints {
klog.V(4).Infof("Ignoring taint %s on all NodeGroups", taintKey)
ignoredTaints[taintKey] = true
startupTaints := make(TaintKeySet)
for _, taintKey := range opts.StartupTaints {
klog.V(4).Infof("Startup taint %s on all NodeGroups", taintKey)
startupTaints[taintKey] = true
}

statusTaints := make(TaintKeySet)
Expand All @@ -76,8 +84,10 @@ func NewTaintConfig(opts config.AutoscalingOptions) TaintConfig {
}

return TaintConfig{
IgnoredTaints: ignoredTaints,
StatusTaints: statusTaints,
StartupTaints: startupTaints,
StatusTaints: statusTaints,
StartupTaintPrefixes: []string{IgnoreTaintPrefix, StartupTaintPrefix},
StatusTaintPrefixes: []string{StatusTaintPrefix},
}
}

Expand Down Expand Up @@ -319,6 +329,15 @@ func CleanAllTaints(nodes []*apiv1.Node, client kube_client.Interface, recorder
}
}

func matchesAnyPrefix(prefixes []string, key string) bool {
for _, prefix := range prefixes {
if strings.HasPrefix(key, prefix) {
return true
}
}
return false
}

// SanitizeTaints returns filtered taints
func SanitizeTaints(taints []apiv1.Taint, taintConfig TaintConfig) []apiv1.Taint {
var newTaints []apiv1.Taint
Expand All @@ -344,12 +363,12 @@ func SanitizeTaints(taints []apiv1.Taint, taintConfig TaintConfig) []apiv1.Taint
continue
}

if _, exists := taintConfig.IgnoredTaints[taint.Key]; exists {
klog.V(4).Infof("Removing ignored taint %s, when creating template from node", taint.Key)
if _, exists := taintConfig.StartupTaints[taint.Key]; exists {
klog.V(4).Infof("Removing startup taint %s, when creating template from node", taint.Key)
continue
}

if strings.HasPrefix(taint.Key, IgnoreTaintPrefix) {
shouldRemoveBasedOnPrefix := matchesAnyPrefix(taintConfig.StartupTaintPrefixes, taint.Key) || matchesAnyPrefix(taintConfig.StatusTaintPrefixes, taint.Key)
if shouldRemoveBasedOnPrefix {
klog.V(4).Infof("Removing taint %s based on prefix, when creation template from node", taint.Key)
continue
}
Expand All @@ -364,24 +383,24 @@ func SanitizeTaints(taints []apiv1.Taint, taintConfig TaintConfig) []apiv1.Taint
return newTaints
}

// FilterOutNodesWithIgnoredTaints override the condition status of the given nodes to mark them as NotReady when they have
// FilterOutNodesWithStartupTaints override the condition status of the given nodes to mark them as NotReady when they have
// filtered taints.
func FilterOutNodesWithIgnoredTaints(ignoredTaints TaintKeySet, allNodes, readyNodes []*apiv1.Node) ([]*apiv1.Node, []*apiv1.Node) {
func FilterOutNodesWithStartupTaints(taintConfig TaintConfig, allNodes, readyNodes []*apiv1.Node) ([]*apiv1.Node, []*apiv1.Node) {
newAllNodes := make([]*apiv1.Node, 0)
newReadyNodes := make([]*apiv1.Node, 0)
nodesWithIgnoredTaints := make(map[string]*apiv1.Node)
nodesWithStartupTaints := make(map[string]*apiv1.Node)
for _, node := range readyNodes {
if len(node.Spec.Taints) == 0 {
newReadyNodes = append(newReadyNodes, node)
continue
}
ready := true
for _, t := range node.Spec.Taints {
_, hasIgnoredTaint := ignoredTaints[t.Key]
if hasIgnoredTaint || strings.HasPrefix(t.Key, IgnoreTaintPrefix) {
_, hasStartupTaint := taintConfig.StartupTaints[t.Key]
if hasStartupTaint || matchesAnyPrefix(taintConfig.StartupTaintPrefixes, t.Key) {
ready = false
nodesWithIgnoredTaints[node.Name] = kubernetes.GetUnreadyNodeCopy(node, kubernetes.IgnoreTaint)
klog.V(3).Infof("Overriding status of node %v, which seems to have ignored taint %q", node.Name, t.Key)
nodesWithStartupTaints[node.Name] = kubernetes.GetUnreadyNodeCopy(node, kubernetes.StartupNodes)
klog.V(3).Infof("Overriding status of node %v, which seems to have startup taint %q", node.Name, t.Key)
break
}
}
Expand All @@ -391,7 +410,7 @@ func FilterOutNodesWithIgnoredTaints(ignoredTaints TaintKeySet, allNodes, readyN
}
// Override any node with startup taint with its "unready" copy
for _, node := range allNodes {
if newNode, found := nodesWithIgnoredTaints[node.Name]; found {
if newNode, found := nodesWithStartupTaints[node.Name]; found {
newAllNodes = append(newAllNodes, newNode)
} else {
newAllNodes = append(newAllNodes, node)
Expand Down
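
To make the new prefix handling concrete, here is a minimal sketch of SanitizeTaints with a config built by NewTaintConfig (the example.com keys are hypothetical, not part of this diff):

package main

import (
	apiv1 "k8s.io/api/core/v1"
	"k8s.io/autoscaler/cluster-autoscaler/config"
	"k8s.io/autoscaler/cluster-autoscaler/utils/taints"
)

func exampleSanitize() []apiv1.Taint {
	cfg := taints.NewTaintConfig(config.AutoscalingOptions{
		StartupTaints: []string{"example.com/agent-not-ready"},
	})
	nodeTaints := []apiv1.Taint{
		{Key: "startup-taint.cluster-autoscaler.kubernetes.io/warmup", Effect: apiv1.TaintEffectNoSchedule},  // stripped: matches StartupTaintPrefix
		{Key: "status-taint.cluster-autoscaler.kubernetes.io/pressure", Effect: apiv1.TaintEffectNoSchedule}, // stripped: matches StatusTaintPrefix
		{Key: "example.com/agent-not-ready", Effect: apiv1.TaintEffectNoSchedule},                            // stripped: listed in StartupTaints
		{Key: "example.com/keep-me", Effect: apiv1.TaintEffectNoSchedule},                                    // kept in the node template
	}
	// Only the "example.com/keep-me" taint should survive sanitization.
	return taints.SanitizeTaints(nodeTaints, cfg)
}

The behavioral difference introduced by this PR: both startup and status taints are stripped when building node templates, but only startup taints (by key or prefix) cause FilterOutNodesWithStartupTaints to override a node's status to NotReady.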