diff --git a/Gopkg.lock b/Gopkg.lock index ebc0ab1e..4ecfcbb9 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -831,7 +831,7 @@ "google.golang.org/grpc/reflection", "google.golang.org/grpc/status", "gopkg.in/yaml.v2", - "k8s.io/api/apps/v1", + "k8s.io/api/apps/v1beta1", "k8s.io/api/core/v1", "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1", "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset", diff --git a/pkg/apis/v1alpha1/types.go b/pkg/apis/v1alpha1/types.go index 2753f293..5b971893 100644 --- a/pkg/apis/v1alpha1/types.go +++ b/pkg/apis/v1alpha1/types.go @@ -43,6 +43,8 @@ type CollectorSetSpec struct { Policy *CollectorSetPolicy `json:"policy"` ProxyURL string `json:"proxyURL,omitempty"` SecretName string `json:"secretName,omitempty"` + PriorityClassName string `json:"priorityClassName,omitempty"` // default value is empty string. If value is set then user must have PriorityClass resource created otherwise Pod will be rejected. + Tolerations []v1.Toleration `json:"tolerations"` // Tolerations are applied to pods, and allow the pods to schedule onto nodes with matching taints. } // CollectorSetStatus is the CollectorSet controller's status. diff --git a/pkg/apis/v1alpha1/zz_generated.deepcopy.go b/pkg/apis/v1alpha1/zz_generated.deepcopy.go index 4b13f32c..2de9e4e7 100644 --- a/pkg/apis/v1alpha1/zz_generated.deepcopy.go +++ b/pkg/apis/v1alpha1/zz_generated.deepcopy.go @@ -6,6 +6,7 @@ package v1alpha1 import ( distributor "github.com/logicmonitor/k8s-collectorset-controller/pkg/distributor" + v1 "k8s.io/api/core/v1" runtime "k8s.io/apimachinery/pkg/runtime" ) @@ -104,6 +105,13 @@ func (in *CollectorSetSpec) DeepCopyInto(out *CollectorSetSpec) { *out = new(CollectorSetPolicy) (*in).DeepCopyInto(*out) } + if in.Tolerations != nil { + in, out := &in.Tolerations, &out.Tolerations + *out = make([]v1.Toleration, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } return } diff --git a/pkg/controller/collectorset.go b/pkg/controller/collectorset.go index 7a43b183..d4b1ce95 100644 --- a/pkg/controller/collectorset.go +++ b/pkg/controller/collectorset.go @@ -12,6 +12,7 @@ import ( log "github.com/sirupsen/logrus" appsv1 "k8s.io/api/apps/v1" apiv1 "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -91,6 +92,8 @@ func CreateOrUpdateCollectorSet(collectorset *crv1alpha1.CollectorSet, controlle }, }, }, + PriorityClassName: collectorset.Spec.PriorityClassName, + Tolerations: getTolerations(collectorset), Containers: []apiv1.Container{ { Name: "collector", @@ -206,6 +209,7 @@ func getCollectorImagePullPolicy(collectorset *crv1alpha1.CollectorSet) (apiv1.P return collectorset.Spec.ImagePullPolicy, nil } return "", fmt.Errorf("unsupported imagePullPolicy value: %v, supported values: [%v, %v, %v]", collectorset.Spec.ImagePullPolicy, apiv1.PullAlways, apiv1.PullNever, apiv1.PullIfNotPresent) + } func setProxyConfiguration(collectorset *crv1alpha1.CollectorSet, statefulset *appsv1.StatefulSet) { @@ -250,6 +254,25 @@ func setProxyConfiguration(collectorset *crv1alpha1.CollectorSet, statefulset *a } } +func getTolerations(collectorset *crv1alpha1.CollectorSet) []v1.Toleration { + tolerations := []v1.Toleration{} + if collectorset.Spec.Tolerations != nil { + log.Debugf("Tolerations: %v", collectorset.Spec.Tolerations) + for _, toleration := range collectorset.Spec.Tolerations { + if toleration.Operator == v1.TolerationOpExists && toleration.Value != "" { + log.Errorf("Value must be empty when 'operator' is 'Exists'. Toleration: %v", toleration) + } else if toleration.Operator != v1.TolerationOpExists && toleration.Key == "" { + log.Errorf("Operator must be 'Exists' when 'key' is empty. Toleration: %v", toleration) + } else if toleration.Effect != v1.TaintEffectNoExecute && toleration.TolerationSeconds != nil { + log.Errorf("Effect must be 'NoExecute' when 'tolerationSeconds' is set. Toleration: %v", toleration) + } else { + tolerations = append(tolerations, toleration) + } + } + } + return tolerations +} + func updateCollectors(client *client.LMSdkGo, ids []int32) error { // if there is only one collector, there will be no backup for it if len(ids) < 2 { @@ -275,12 +298,12 @@ func updateCollectors(client *client.LMSdkGo, ids []int32) error { // DeleteCollectorSet deletes the collectorset. func DeleteCollectorSet(collectorset *crv1alpha1.CollectorSet, client clientset.Interface) error { data := []byte(`[{"op":"add","path":"/spec/replicas","value": 0}]`) - if _, err := client.AppsV1beta1().StatefulSets(collectorset.Namespace).Patch(collectorset.Name, types.JSONPatchType, data); err != nil { + if _, err := client.AppsV1().StatefulSets(collectorset.Namespace).Patch(collectorset.Name, types.JSONPatchType, data); err != nil { return err } deleteOpts := metav1.DeleteOptions{} - return client.AppsV1beta1().StatefulSets(collectorset.Namespace).Delete(collectorset.Name, &deleteOpts) + return client.AppsV1().StatefulSets(collectorset.Namespace).Delete(collectorset.Name, &deleteOpts) } func checkCollectorGroupExistsByID(client *client.LMSdkGo, id int32) bool {