From cf3114a1e2ef65f1b727bec9acc5e0ca320b372d Mon Sep 17 00:00:00 2001
From: Shiming Zhang
Date: Mon, 2 Sep 2024 19:07:03 +0800
Subject: [PATCH] Use manage to specify resources
---
.../v1alpha1/kwok_configuration_types.go | 7 +
.../v1alpha1/kwok_manage_selector_types.go | 38 +++
.../config/v1alpha1/zz_generated.deepcopy.go | 59 +++++
.../kwok_configuration_types.go | 7 +
.../internalversion/kwok_manages_selector.go | 242 ++++++++++++++++++
.../kwok_manages_selector_node.go | 96 +++++++
.../kwok_manages_selector_test.go | 122 +++++++++
.../zz_generated.conversion.go | 44 ++++
.../internalversion/zz_generated.deepcopy.go | 75 ++++++
pkg/kwok/cmd/root.go | 192 +++++++-------
pkg/kwok/controllers/controller.go | 107 ++++----
pkg/kwok/server/server.go | 36 +++
site/content/en/docs/generated/apis.md | 143 ++++++++++-
site/content/en/docs/generated/kwok.md | 33 ++-
14 files changed, 1046 insertions(+), 155 deletions(-)
create mode 100644 pkg/apis/config/v1alpha1/kwok_manage_selector_types.go
create mode 100644 pkg/apis/internalversion/kwok_manages_selector.go
create mode 100644 pkg/apis/internalversion/kwok_manages_selector_node.go
create mode 100644 pkg/apis/internalversion/kwok_manages_selector_test.go
diff --git a/pkg/apis/config/v1alpha1/kwok_configuration_types.go b/pkg/apis/config/v1alpha1/kwok_configuration_types.go
index 16980bfbc5..f7302455cb 100644
--- a/pkg/apis/config/v1alpha1/kwok_configuration_types.go
+++ b/pkg/apis/config/v1alpha1/kwok_configuration_types.go
@@ -70,11 +70,15 @@ type KwokConfigurationOptions struct {
// is the default value for flag --tls-private-key-file
TLSPrivateKeyFile string `json:"tlsPrivateKeyFile,omitempty"`
+ // Manages is the option to manage an resources
+ Manages ManagesSelectors `json:"manages,omitempty"`
+
// ManageSingleNode is the option to manage a single node name.
// is the default value for flag --manage-single-node
// Note: when `manage-all-nodes` is specified as true or
// `manage-nodes-with-label-selector` or `manage-nodes-with-annotation-selector` is specified,
// this is a no-op.
+ // Deprecated: use Manages instead
ManageSingleNode string `json:"manageSingleNode,omitempty"`
// Default option to manage (i.e., maintain heartbeat/liveness of) all Nodes or not.
@@ -83,6 +87,7 @@ type KwokConfigurationOptions struct {
// `manage-nodes-with-label-selector` or `manage-nodes-with-annotation-selector` is specified,
// this is a no-op.
// +default=false
+ // Deprecated: use Manages instead
ManageAllNodes *bool `json:"manageAllNodes,omitempty"`
// Default annotations specified on Nodes to demand manage.
@@ -90,6 +95,7 @@ type KwokConfigurationOptions struct {
// Note: when `all-node-manage` is specified as true or
// `manage-single-node` is specified,
// this is a no-op.
+ // Deprecated: use Manages instead
ManageNodesWithAnnotationSelector string `json:"manageNodesWithAnnotationSelector,omitempty"`
// Default labels specified on Nodes to demand manage.
@@ -97,6 +103,7 @@ type KwokConfigurationOptions struct {
// Note: when `all-node-manage` is specified as true or
// `manage-single-node` is specified,
// this is a no-op.
+ // Deprecated: use Manages instead
ManageNodesWithLabelSelector string `json:"manageNodesWithLabelSelector,omitempty"`
// If a Node/Pod is on a managed Node and has this annotation status will not be modified
diff --git a/pkg/apis/config/v1alpha1/kwok_manage_selector_types.go b/pkg/apis/config/v1alpha1/kwok_manage_selector_types.go
new file mode 100644
index 0000000000..b3fe4b275c
--- /dev/null
+++ b/pkg/apis/config/v1alpha1/kwok_manage_selector_types.go
@@ -0,0 +1,38 @@
+/*
+Copyright 2024 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package v1alpha1
+
+// ManagesSelectors holds information about the manages selectors.
+type ManagesSelectors []ManagesSelector
+
+// ManagesSelector holds information about the manages selector.
+type ManagesSelector struct {
+ // Kind of the referent.
+ Kind string `json:"kind"`
+ // Group of the referent.
+ Group string `json:"group,omitempty"`
+ // Version of the referent.
+ Version string `json:"version,omitempty"`
+ // Namespace of the referent. only valid if it is a namespaced.
+ Namespace string `json:"namespace,omitempty"`
+ // Name of the referent. specify only this one.
+ Name string `json:"name,omitempty"`
+ // Labels of the referent. specify matched with labels.
+ Labels map[string]string `json:"labels,omitempty"`
+ // Annotations of the referent. specify matched with annotations.
+ Annotations map[string]string `json:"annotations,omitempty"`
+}
diff --git a/pkg/apis/config/v1alpha1/zz_generated.deepcopy.go b/pkg/apis/config/v1alpha1/zz_generated.deepcopy.go
index 1b5f881f11..c499319526 100644
--- a/pkg/apis/config/v1alpha1/zz_generated.deepcopy.go
+++ b/pkg/apis/config/v1alpha1/zz_generated.deepcopy.go
@@ -201,6 +201,13 @@ func (in *KwokConfigurationOptions) DeepCopyInto(out *KwokConfigurationOptions)
*out = make([]string, len(*in))
copy(*out, *in)
}
+ if in.Manages != nil {
+ in, out := &in.Manages, &out.Manages
+ *out = make(ManagesSelectors, len(*in))
+ for i := range *in {
+ (*in)[i].DeepCopyInto(&(*out)[i])
+ }
+ }
if in.ManageAllNodes != nil {
in, out := &in.ManageAllNodes, &out.ManageAllNodes
*out = new(bool)
@@ -404,6 +411,58 @@ func (in *KwokctlResource) DeepCopyObject() runtime.Object {
return nil
}
+// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
+func (in *ManagesSelector) DeepCopyInto(out *ManagesSelector) {
+ *out = *in
+ if in.Labels != nil {
+ in, out := &in.Labels, &out.Labels
+ *out = make(map[string]string, len(*in))
+ for key, val := range *in {
+ (*out)[key] = val
+ }
+ }
+ if in.Annotations != nil {
+ in, out := &in.Annotations, &out.Annotations
+ *out = make(map[string]string, len(*in))
+ for key, val := range *in {
+ (*out)[key] = val
+ }
+ }
+ return
+}
+
+// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ManagesSelector.
+func (in *ManagesSelector) DeepCopy() *ManagesSelector {
+ if in == nil {
+ return nil
+ }
+ out := new(ManagesSelector)
+ in.DeepCopyInto(out)
+ return out
+}
+
+// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
+func (in ManagesSelectors) DeepCopyInto(out *ManagesSelectors) {
+ {
+ in := &in
+ *out = make(ManagesSelectors, len(*in))
+ for i := range *in {
+ (*in)[i].DeepCopyInto(&(*out)[i])
+ }
+ return
+ }
+}
+
+// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ManagesSelectors.
+func (in ManagesSelectors) DeepCopy() ManagesSelectors {
+ if in == nil {
+ return nil
+ }
+ out := new(ManagesSelectors)
+ in.DeepCopyInto(out)
+ return *out
+}
+
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *Port) DeepCopyInto(out *Port) {
*out = *in
diff --git a/pkg/apis/internalversion/kwok_configuration_types.go b/pkg/apis/internalversion/kwok_configuration_types.go
index 40fa116534..1e8173efe2 100644
--- a/pkg/apis/internalversion/kwok_configuration_types.go
+++ b/pkg/apis/internalversion/kwok_configuration_types.go
@@ -52,16 +52,23 @@ type KwokConfigurationOptions struct {
// TLSPrivateKeyFile is the ile containing x509 private key
TLSPrivateKeyFile string
+ // Manages is the option to manage the resource
+ Manages ManagesSelectors
+
// ManageSingleNode is the option to manage a single node name
+ // Deprecated: use Manages instead
ManageSingleNode string
// Default option to manage (i.e., maintain heartbeat/liveness of) all Nodes or not.
+ // Deprecated: use Manages instead
ManageAllNodes bool
// Default annotations specified on Nodes to demand manage.
+ // Deprecated: use Manages instead
ManageNodesWithAnnotationSelector string
// Default labels specified on Nodes to demand manage.
+ // Deprecated: use Manages instead
ManageNodesWithLabelSelector string
// If a Node/Pod is on a managed Node and has this annotation status will not be modified
diff --git a/pkg/apis/internalversion/kwok_manages_selector.go b/pkg/apis/internalversion/kwok_manages_selector.go
new file mode 100644
index 0000000000..55ff82a217
--- /dev/null
+++ b/pkg/apis/internalversion/kwok_manages_selector.go
@@ -0,0 +1,242 @@
+/*
+Copyright 2024 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package internalversion
+
+import (
+ "fmt"
+ "sort"
+ "strings"
+
+ "k8s.io/apimachinery/pkg/fields"
+ "k8s.io/apimachinery/pkg/runtime/schema"
+ "k8s.io/apimachinery/pkg/selection"
+
+ "sigs.k8s.io/kwok/pkg/utils/maps"
+ "sigs.k8s.io/kwok/pkg/utils/slices"
+)
+
+// ManagesSelectors holds information about the manages selectors.
+type ManagesSelectors []ManagesSelector
+
+// ManagesSelector holds information about the manages selector.
+type ManagesSelector struct {
+ // Kind of the referent.
+ Kind string
+ // Group of the referent.
+ Group string
+ // Version of the referent.
+ Version string
+ // Namespace of the referent. only valid if it is a namespaced.
+ Namespace string
+ // Name of the referent. specify only this one.
+ Name string
+ // Labels of the referent. specify matched with labels.
+ Labels map[string]string
+ // Annotations of the referent. specify matched with annotations.
+ Annotations map[string]string
+}
+
+func (s *ManagesSelectors) Set(sel string) error {
+ p, err := parseManagesSelector(sel)
+ if err != nil {
+ return err
+ }
+ *s = append(*s, *p)
+ return nil
+}
+
+func (s ManagesSelectors) Type() string {
+ return "ManagesSelectorSlice"
+}
+
+func (s ManagesSelectors) String() string {
+ strSlice := slices.Map(s, func(t ManagesSelector) string {
+ return t.String()
+ })
+ return strings.Join(strSlice, " ")
+}
+
+func (s *ManagesSelector) Set(sel string) error {
+ p, err := parseManagesSelector(sel)
+ if err != nil {
+ return err
+ }
+ *s = *p
+ return nil
+}
+
+func (s *ManagesSelector) Type() string {
+ return "ManagesSelector"
+}
+
+func parseManagesSelector(arg string) (*ManagesSelector, error) {
+ items := strings.Split(arg, ":")
+
+ t := ManagesSelector{}
+ gvk := items[0]
+ if gvk == "" {
+ return nil, fmt.Errorf("invalid empty target resource ref")
+ }
+
+ sepVersion := strings.Index(gvk, "/")
+ if sepVersion != -1 {
+ t.Version = gvk[sepVersion+1:]
+ gvk = gvk[:sepVersion]
+ }
+
+ sepGroup := strings.Index(gvk, ".")
+ if sepGroup != -1 {
+ t.Kind = gvk[:sepGroup]
+ t.Group = gvk[sepGroup+1:]
+ } else {
+ t.Kind = gvk
+ }
+
+ for _, item := range items[1:] {
+ sel, err := fields.ParseSelector(item)
+ if err != nil {
+ return nil, err
+ }
+ for _, req := range sel.Requirements() {
+ if req.Operator != selection.Equals && req.Operator != selection.DoubleEquals {
+ return nil, fmt.Errorf("invalid selector requirements: %s", req.Operator)
+ }
+ switch req.Field {
+ case "metadata.name":
+ t.Name = req.Value
+ case "metadata.namespace":
+ t.Namespace = req.Value
+ default:
+ sp := strings.SplitN(req.Field, ".", 3)
+ if len(sp) < 2 {
+ return nil, fmt.Errorf("error target resource ref: %s", item)
+ }
+ if sp[0] != "metadata" {
+ return nil, fmt.Errorf("error target resource ref: %s", item)
+ }
+
+ switch sp[1] {
+ case "labels":
+ if t.Labels == nil {
+ t.Labels = map[string]string{}
+ }
+ t.Labels[sp[2]] = req.Value
+ case "annotations":
+ if t.Annotations == nil {
+ t.Annotations = map[string]string{}
+ }
+ t.Annotations[sp[2]] = req.Value
+ default:
+ return nil, fmt.Errorf("error target resource ref: %s", item)
+ }
+ }
+ }
+ }
+ return &t, nil
+}
+
+func (s *ManagesSelector) String() string {
+ if s == nil {
+ return ""
+ }
+
+ buf := &strings.Builder{}
+ buf.WriteString(s.Kind)
+ if s.Group != "" {
+ buf.WriteString(fmt.Sprintf(".%s", s.Group))
+ }
+ if s.Version != "" {
+ buf.WriteString(fmt.Sprintf("/%s", s.Version))
+ }
+ if s.Name != "" {
+ buf.WriteString(fmt.Sprintf(":metadata.name=%s", s.Name))
+ }
+ if s.Namespace != "" {
+ buf.WriteString(fmt.Sprintf(":metadata.namespace=%s", s.Namespace))
+ }
+ if len(s.Labels) > 0 {
+ keys := maps.Keys(s.Labels)
+ sort.Strings(keys)
+ for _, k := range keys {
+ buf.WriteString(fmt.Sprintf(":metadata.labels.%s=%s", k, s.Labels[k]))
+ }
+ }
+ if len(s.Annotations) > 0 {
+ keys := maps.Keys(s.Annotations)
+ sort.Strings(keys)
+ for _, k := range keys {
+ buf.WriteString(fmt.Sprintf(":metadata.annotations.%s=%s", k, s.Annotations[k]))
+ }
+ }
+ return buf.String()
+}
+
+func (s ManagesSelectors) MatchStage(stage *Stage) bool {
+ for _, t := range s {
+ if t.MatchStage(stage) {
+ return true
+ }
+ }
+ return false
+}
+
+func (s *ManagesSelector) MatchStage(stage *Stage) bool {
+ spec := stage.Spec
+ rr := spec.ResourceRef
+
+ if s.Kind != rr.Kind {
+ return false
+ }
+
+ gv := schema.GroupVersion{
+ Group: s.Group,
+ Version: s.Version,
+ }
+ apiGroup := gv.String()
+ if apiGroup == "" {
+ apiGroup = "v1"
+ }
+
+ if rr.APIGroup == "" {
+ rr.APIGroup = "v1"
+ }
+
+ if apiGroup != rr.APIGroup {
+ return false
+ }
+
+ if spec.Selector != nil {
+ if len(s.Labels) != 0 {
+ ml := spec.Selector.MatchLabels
+ for k, v := range s.Labels {
+ if mv, ok := ml[k]; ok && mv != v {
+ return false
+ }
+ }
+ }
+ if len(s.Annotations) != 0 {
+ ma := spec.Selector.MatchAnnotations
+ for k, v := range s.Annotations {
+ if mv, ok := ma[k]; ok && mv != v {
+ return false
+ }
+ }
+ }
+ }
+
+ return true
+}
diff --git a/pkg/apis/internalversion/kwok_manages_selector_node.go b/pkg/apis/internalversion/kwok_manages_selector_node.go
new file mode 100644
index 0000000000..3f440df24e
--- /dev/null
+++ b/pkg/apis/internalversion/kwok_manages_selector_node.go
@@ -0,0 +1,96 @@
+/*
+Copyright 2024 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package internalversion
+
+import (
+ "fmt"
+
+ "k8s.io/apimachinery/pkg/labels"
+)
+
+// ManageNodeSelector is a struct that holds how to manage nodes.
+type ManageNodeSelector struct {
+ // ManageSingleNode is the option to manage a single node name
+ ManageSingleNode string
+
+ // Default option to manage (i.e., maintain heartbeat/liveness of) all Nodes or not.
+ ManageAllNodes bool
+
+ // Default annotations specified on Nodes to demand manage.
+ ManageNodesWithAnnotationSelector string
+
+ // Default labels specified on Nodes to demand manage.
+ ManageNodesWithLabelSelector string
+}
+
+// IsEmpty means that no node needs to be managed.
+func (n ManageNodeSelector) IsEmpty() bool {
+ return !n.ManageAllNodes &&
+ n.ManageSingleNode == "" &&
+ n.ManageNodesWithAnnotationSelector == "" &&
+ n.ManageNodesWithLabelSelector == ""
+}
+
+// NodeSelector returns the selector of nodes
+func (s ManagesSelectors) NodeSelector() (ManageNodeSelector, error) {
+ var n *ManageNodeSelector
+ for _, sel := range s {
+ // TODO: Node, Lease, Pod can be maintained separately by different controllers.
+ if sel.Kind == "Pod" &&
+ sel.Group == "" &&
+ (sel.Version == "" || sel.Version == "v1") {
+ return ManageNodeSelector{}, fmt.Errorf("unsupported pod selector type")
+ }
+ if sel.Kind == "Lease" &&
+ sel.Namespace == "kube-node-lease" &&
+ sel.Group == "coordination.k8s.io" &&
+ (sel.Version == "" || sel.Version == "v1") {
+ return ManageNodeSelector{}, fmt.Errorf("unsupported leases.coordination.k8s.io on kube-node-lease selector type")
+ }
+
+ if sel.Kind != "Node" || sel.Group != "" || !(sel.Version == "" || sel.Version == "v1") {
+ continue
+ }
+
+ // TODO: Support multiple nodes selector
+ if n != nil {
+ return ManageNodeSelector{}, fmt.Errorf("duplicate node selector: %v", sel)
+ }
+
+ if sel.Namespace != "" {
+ return ManageNodeSelector{}, fmt.Errorf("invalid node selector with namespace %q", sel.Namespace)
+ }
+
+ n = &ManageNodeSelector{}
+
+ if sel.Name != "" {
+ n.ManageSingleNode = sel.Name
+ }
+ if len(sel.Labels) != 0 {
+ n.ManageNodesWithLabelSelector = labels.Set(sel.Labels).String()
+ }
+ if len(sel.Annotations) != 0 {
+ n.ManageNodesWithAnnotationSelector = labels.Set(sel.Annotations).String()
+ }
+ }
+
+ if n == nil {
+ return ManageNodeSelector{}, nil
+ }
+
+ return *n, nil
+}
diff --git a/pkg/apis/internalversion/kwok_manages_selector_test.go b/pkg/apis/internalversion/kwok_manages_selector_test.go
new file mode 100644
index 0000000000..c71ac9a63d
--- /dev/null
+++ b/pkg/apis/internalversion/kwok_manages_selector_test.go
@@ -0,0 +1,122 @@
+/*
+Copyright 2024 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package internalversion
+
+import (
+ "reflect"
+ "testing"
+)
+
+func Test_parseManagesSelector(t *testing.T) {
+ tests := []struct {
+ name string
+ args string
+ want *ManagesSelector
+ wantErr bool
+ }{
+ {
+ name: "empty",
+ args: "",
+ wantErr: true,
+ },
+ {
+ name: "pod",
+ args: "pod",
+ want: &ManagesSelector{
+ Kind: "pod",
+ },
+ },
+ {
+ name: "pod v1",
+ args: "pod/v1",
+ want: &ManagesSelector{
+ Kind: "pod",
+ Version: "v1",
+ },
+ },
+ {
+ name: "deploy.apps",
+ args: "deploy.apps",
+ want: &ManagesSelector{
+ Kind: "deploy",
+ Group: "apps",
+ },
+ },
+ {
+ name: "deploy.apps v1",
+ args: "deploy.apps/v1",
+ want: &ManagesSelector{
+ Kind: "deploy",
+ Group: "apps",
+ Version: "v1",
+ },
+ },
+ {
+ name: "pod name=po",
+ args: "pod:metadata.name=po",
+ want: &ManagesSelector{
+ Kind: "pod",
+ Name: "po",
+ },
+ },
+ {
+ name: "pod namespace=ns",
+ args: "pod:metadata.namespace=ns",
+ want: &ManagesSelector{
+ Kind: "pod",
+ Namespace: "ns",
+ },
+ },
+ {
+ name: "pod labels.apps.group=xxx",
+ args: "pod:metadata.labels.apps.group=xxx",
+ want: &ManagesSelector{
+ Kind: "pod",
+ Labels: map[string]string{
+ "apps.group": "xxx",
+ },
+ },
+ },
+ {
+ name: "pod annotations.apps.group=xxx",
+ args: "pod:metadata.annotations.apps.group=xxx",
+ want: &ManagesSelector{
+ Kind: "pod",
+ Annotations: map[string]string{
+ "apps.group": "xxx",
+ },
+ },
+ },
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ got, err := parseManagesSelector(tt.args)
+ if (err != nil) != tt.wantErr {
+ t.Errorf("ParseTargetResourceRef() error = %v, wantErr %v", err, tt.wantErr)
+ return
+ }
+ if !reflect.DeepEqual(got, tt.want) {
+ t.Errorf("ParseTargetResourceRef() got = %v, want %v", got, tt.want)
+ }
+
+ rev := got.String()
+ if rev != tt.args {
+ t.Errorf("reverse got = %v, want %v", rev, tt.args)
+ }
+ })
+ }
+}
diff --git a/pkg/apis/internalversion/zz_generated.conversion.go b/pkg/apis/internalversion/zz_generated.conversion.go
index 65008ceaae..b4d243b9b6 100644
--- a/pkg/apis/internalversion/zz_generated.conversion.go
+++ b/pkg/apis/internalversion/zz_generated.conversion.go
@@ -410,6 +410,16 @@ func RegisterConversions(s *runtime.Scheme) error {
}); err != nil {
return err
}
+ if err := s.AddGeneratedConversionFunc((*ManagesSelector)(nil), (*configv1alpha1.ManagesSelector)(nil), func(a, b interface{}, scope conversion.Scope) error {
+ return Convert_internalversion_ManagesSelector_To_v1alpha1_ManagesSelector(a.(*ManagesSelector), b.(*configv1alpha1.ManagesSelector), scope)
+ }); err != nil {
+ return err
+ }
+ if err := s.AddGeneratedConversionFunc((*configv1alpha1.ManagesSelector)(nil), (*ManagesSelector)(nil), func(a, b interface{}, scope conversion.Scope) error {
+ return Convert_v1alpha1_ManagesSelector_To_internalversion_ManagesSelector(a.(*configv1alpha1.ManagesSelector), b.(*ManagesSelector), scope)
+ }); err != nil {
+ return err
+ }
if err := s.AddGeneratedConversionFunc((*Metric)(nil), (*v1alpha1.Metric)(nil), func(a, b interface{}, scope conversion.Scope) error {
return Convert_internalversion_Metric_To_v1alpha1_Metric(a.(*Metric), b.(*v1alpha1.Metric), scope)
}); err != nil {
@@ -1492,6 +1502,7 @@ func autoConvert_internalversion_KwokConfigurationOptions_To_v1alpha1_KwokConfig
out.NodePort = in.NodePort
out.TLSCertFile = in.TLSCertFile
out.TLSPrivateKeyFile = in.TLSPrivateKeyFile
+ out.Manages = *(*configv1alpha1.ManagesSelectors)(unsafe.Pointer(&in.Manages))
out.ManageSingleNode = in.ManageSingleNode
if err := v1.Convert_bool_To_Pointer_bool(&in.ManageAllNodes, &out.ManageAllNodes, s); err != nil {
return err
@@ -1533,6 +1544,7 @@ func autoConvert_v1alpha1_KwokConfigurationOptions_To_internalversion_KwokConfig
out.NodePort = in.NodePort
out.TLSCertFile = in.TLSCertFile
out.TLSPrivateKeyFile = in.TLSPrivateKeyFile
+ out.Manages = *(*ManagesSelectors)(unsafe.Pointer(&in.Manages))
out.ManageSingleNode = in.ManageSingleNode
if err := v1.Convert_Pointer_bool_To_bool(&in.ManageAllNodes, &out.ManageAllNodes, s); err != nil {
return err
@@ -2004,6 +2016,38 @@ func Convert_v1alpha1_LogsSpec_To_internalversion_LogsSpec(in *v1alpha1.LogsSpec
return autoConvert_v1alpha1_LogsSpec_To_internalversion_LogsSpec(in, out, s)
}
+func autoConvert_internalversion_ManagesSelector_To_v1alpha1_ManagesSelector(in *ManagesSelector, out *configv1alpha1.ManagesSelector, s conversion.Scope) error {
+ out.Kind = in.Kind
+ out.Group = in.Group
+ out.Version = in.Version
+ out.Namespace = in.Namespace
+ out.Name = in.Name
+ out.Labels = *(*map[string]string)(unsafe.Pointer(&in.Labels))
+ out.Annotations = *(*map[string]string)(unsafe.Pointer(&in.Annotations))
+ return nil
+}
+
+// Convert_internalversion_ManagesSelector_To_v1alpha1_ManagesSelector is an autogenerated conversion function.
+func Convert_internalversion_ManagesSelector_To_v1alpha1_ManagesSelector(in *ManagesSelector, out *configv1alpha1.ManagesSelector, s conversion.Scope) error {
+ return autoConvert_internalversion_ManagesSelector_To_v1alpha1_ManagesSelector(in, out, s)
+}
+
+func autoConvert_v1alpha1_ManagesSelector_To_internalversion_ManagesSelector(in *configv1alpha1.ManagesSelector, out *ManagesSelector, s conversion.Scope) error {
+ out.Kind = in.Kind
+ out.Group = in.Group
+ out.Version = in.Version
+ out.Namespace = in.Namespace
+ out.Name = in.Name
+ out.Labels = *(*map[string]string)(unsafe.Pointer(&in.Labels))
+ out.Annotations = *(*map[string]string)(unsafe.Pointer(&in.Annotations))
+ return nil
+}
+
+// Convert_v1alpha1_ManagesSelector_To_internalversion_ManagesSelector is an autogenerated conversion function.
+func Convert_v1alpha1_ManagesSelector_To_internalversion_ManagesSelector(in *configv1alpha1.ManagesSelector, out *ManagesSelector, s conversion.Scope) error {
+ return autoConvert_v1alpha1_ManagesSelector_To_internalversion_ManagesSelector(in, out, s)
+}
+
func autoConvert_internalversion_Metric_To_v1alpha1_Metric(in *Metric, out *v1alpha1.Metric, s conversion.Scope) error {
out.ObjectMeta = in.ObjectMeta
if err := Convert_internalversion_MetricSpec_To_v1alpha1_MetricSpec(&in.Spec, &out.Spec, s); err != nil {
diff --git a/pkg/apis/internalversion/zz_generated.deepcopy.go b/pkg/apis/internalversion/zz_generated.deepcopy.go
index 3bbf08b6a7..fb56747cbf 100644
--- a/pkg/apis/internalversion/zz_generated.deepcopy.go
+++ b/pkg/apis/internalversion/zz_generated.deepcopy.go
@@ -682,6 +682,13 @@ func (in *KwokConfigurationOptions) DeepCopyInto(out *KwokConfigurationOptions)
*out = make([]string, len(*in))
copy(*out, *in)
}
+ if in.Manages != nil {
+ in, out := &in.Manages, &out.Manages
+ *out = make(ManagesSelectors, len(*in))
+ for i := range *in {
+ (*in)[i].DeepCopyInto(&(*out)[i])
+ }
+ }
return
}
@@ -859,6 +866,74 @@ func (in *LogsSpec) DeepCopy() *LogsSpec {
return out
}
+// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
+func (in *ManageNodeSelector) DeepCopyInto(out *ManageNodeSelector) {
+ *out = *in
+ return
+}
+
+// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ManageNodeSelector.
+func (in *ManageNodeSelector) DeepCopy() *ManageNodeSelector {
+ if in == nil {
+ return nil
+ }
+ out := new(ManageNodeSelector)
+ in.DeepCopyInto(out)
+ return out
+}
+
+// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
+func (in *ManagesSelector) DeepCopyInto(out *ManagesSelector) {
+ *out = *in
+ if in.Labels != nil {
+ in, out := &in.Labels, &out.Labels
+ *out = make(map[string]string, len(*in))
+ for key, val := range *in {
+ (*out)[key] = val
+ }
+ }
+ if in.Annotations != nil {
+ in, out := &in.Annotations, &out.Annotations
+ *out = make(map[string]string, len(*in))
+ for key, val := range *in {
+ (*out)[key] = val
+ }
+ }
+ return
+}
+
+// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ManagesSelector.
+func (in *ManagesSelector) DeepCopy() *ManagesSelector {
+ if in == nil {
+ return nil
+ }
+ out := new(ManagesSelector)
+ in.DeepCopyInto(out)
+ return out
+}
+
+// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
+func (in ManagesSelectors) DeepCopyInto(out *ManagesSelectors) {
+ {
+ in := &in
+ *out = make(ManagesSelectors, len(*in))
+ for i := range *in {
+ (*in)[i].DeepCopyInto(&(*out)[i])
+ }
+ return
+ }
+}
+
+// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ManagesSelectors.
+func (in ManagesSelectors) DeepCopy() ManagesSelectors {
+ if in == nil {
+ return nil
+ }
+ out := new(ManagesSelectors)
+ in.DeepCopyInto(out)
+ return *out
+}
+
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *Metric) DeepCopyInto(out *Metric) {
*out = *in
diff --git a/pkg/kwok/cmd/root.go b/pkg/kwok/cmd/root.go
index c49ee08d60..73e696df50 100644
--- a/pkg/kwok/cmd/root.go
+++ b/pkg/kwok/cmd/root.go
@@ -82,10 +82,15 @@ func NewCommand(ctx context.Context) *cobra.Command {
cmd.Flags().IntVar(&flags.Options.NodePort, "node-port", flags.Options.NodePort, "Port of the node")
cmd.Flags().StringVar(&flags.Options.TLSCertFile, "tls-cert-file", flags.Options.TLSCertFile, "File containing the default x509 Certificate for HTTPS")
cmd.Flags().StringVar(&flags.Options.TLSPrivateKeyFile, "tls-private-key-file", flags.Options.TLSPrivateKeyFile, "File containing the default x509 private key matching --tls-cert-file")
+ cmd.Flags().Var(&flags.Options.Manages, "manage", "Manages resources")
cmd.Flags().StringVar(&flags.Options.ManageSingleNode, "manage-single-node", flags.Options.ManageSingleNode, "Node that matches the name will be watched and managed. It's conflicted with manage-nodes-with-annotation-selector, manage-nodes-with-label-selector and manage-all-nodes.")
+ _ = cmd.Flags().MarkDeprecated("manage-single-node", "Please use --manage Node:metadata.name= instead")
cmd.Flags().BoolVar(&flags.Options.ManageAllNodes, "manage-all-nodes", flags.Options.ManageAllNodes, "All nodes will be watched and managed. It's conflicted with manage-nodes-with-annotation-selector, manage-nodes-with-label-selector and manage-single-node.")
+ _ = cmd.Flags().MarkDeprecated("manage-all-nodes", "Please use --manage Node instead")
cmd.Flags().StringVar(&flags.Options.ManageNodesWithAnnotationSelector, "manage-nodes-with-annotation-selector", flags.Options.ManageNodesWithAnnotationSelector, "Nodes that match the annotation selector will be watched and managed. It's conflicted with manage-all-nodes and manage-single-node.")
+ _ = cmd.Flags().MarkDeprecated("manage-nodes-with-annotation-selector", "Please use --manage Node:metadata.annotations.= instead")
cmd.Flags().StringVar(&flags.Options.ManageNodesWithLabelSelector, "manage-nodes-with-label-selector", flags.Options.ManageNodesWithLabelSelector, "Nodes that match the label selector will be watched and managed. It's conflicted with manage-all-nodes and manage-single-node.")
+ _ = cmd.Flags().MarkDeprecated("manage-nodes-with-label-selector", "Please use --manage Node:metadata.labels.= instead")
cmd.Flags().StringVar(&flags.Options.DisregardStatusWithAnnotationSelector, "disregard-status-with-annotation-selector", flags.Options.DisregardStatusWithAnnotationSelector, "All node/pod status excluding the ones that match the annotation selector will be watched and managed.")
_ = cmd.Flags().MarkDeprecated("disregard-status-with-annotation-selector", "Please use Stage API instead")
cmd.Flags().StringVar(&flags.Options.DisregardStatusWithLabelSelector, "disregard-status-with-label-selector", flags.Options.DisregardStatusWithLabelSelector, "All node/pod status excluding the ones that match the label selector will be watched and managed.")
@@ -215,18 +220,31 @@ func runE(ctx context.Context, flags *flagpole) error {
return err
}
- switch {
- case flags.Options.ManageSingleNode != "":
- logger.Info("Watch single node",
- "node", flags.Options.ManageSingleNode,
- )
- case flags.Options.ManageAllNodes:
- logger.Info("Watch all nodes")
- case flags.Options.ManageNodesWithAnnotationSelector != "" || flags.Options.ManageNodesWithLabelSelector != "":
- logger.Info("Watch nodes",
- "annotation", flags.Options.ManageNodesWithAnnotationSelector,
- "label", flags.Options.ManageNodesWithLabelSelector,
- )
+ manages := flags.Options.Manages
+ var nodeSel internalversion.ManageNodeSelector
+ if len(manages) != 0 {
+ nodeSel, err = manages.NodeSelector()
+ if err != nil {
+ return err
+ }
+ } else {
+ switch {
+ case flags.Options.ManageSingleNode != "":
+ logger.Info("Watch single node",
+ "node", flags.Options.ManageSingleNode,
+ )
+ nodeSel.ManageSingleNode = flags.Options.ManageSingleNode
+ case flags.Options.ManageAllNodes:
+ logger.Info("Watch all nodes")
+ nodeSel.ManageAllNodes = true
+ case flags.Options.ManageNodesWithAnnotationSelector != "" || flags.Options.ManageNodesWithLabelSelector != "":
+ logger.Info("Watch nodes",
+ "annotation", flags.Options.ManageNodesWithAnnotationSelector,
+ "label", flags.Options.ManageNodesWithLabelSelector,
+ )
+ nodeSel.ManageNodesWithLabelSelector = flags.Options.ManageNodesWithLabelSelector
+ nodeSel.ManageNodesWithAnnotationSelector = flags.Options.ManageNodesWithAnnotationSelector
+ }
}
id, err := controllers.Identity()
@@ -248,10 +266,12 @@ func runE(ctx context.Context, flags *flagpole) error {
EnableCNI: flags.Options.EnableCNI,
EnableMetrics: enableMetrics,
EnablePodCache: enableMetrics,
- ManageSingleNode: flags.Options.ManageSingleNode,
- ManageAllNodes: flags.Options.ManageAllNodes,
- ManageNodesWithAnnotationSelector: flags.Options.ManageNodesWithAnnotationSelector,
- ManageNodesWithLabelSelector: flags.Options.ManageNodesWithLabelSelector,
+ Manages: manages,
+ NoManageNode: nodeSel.IsEmpty(),
+ ManageSingleNode: nodeSel.ManageSingleNode,
+ ManageAllNodes: nodeSel.ManageAllNodes,
+ ManageNodesWithAnnotationSelector: nodeSel.ManageNodesWithAnnotationSelector,
+ ManageNodesWithLabelSelector: nodeSel.ManageNodesWithLabelSelector,
DisregardStatusWithAnnotationSelector: flags.Options.DisregardStatusWithAnnotationSelector,
DisregardStatusWithLabelSelector: flags.Options.DisregardStatusWithLabelSelector,
CIDR: flags.Options.CIDR,
@@ -274,7 +294,7 @@ func runE(ctx context.Context, flags *flagpole) error {
return err
}
- err = startServer(ctx, flags, ctr, typedKwokClient)
+ err = startServer(ctx, flags, ctr, typedKwokClient, nodeSel)
if err != nil {
return err
}
@@ -283,7 +303,7 @@ func runE(ctx context.Context, flags *flagpole) error {
return nil
}
-func startServer(ctx context.Context, flags *flagpole, ctr *controllers.Controller, typedKwokClient versioned.Interface) (err error) {
+func startServer(ctx context.Context, flags *flagpole, ctr *controllers.Controller, typedKwokClient versioned.Interface, nodeSelector internalversion.ManageNodeSelector) (err error) {
logger := log.FromContext(ctx)
serverAddress := flags.Options.ServerAddress
@@ -292,99 +312,95 @@ func startServer(ctx context.Context, flags *flagpole, ctr *controllers.Controll
}
if serverAddress != "" {
- clusterPortForwards := config.FilterWithTypeFromContext[*internalversion.ClusterPortForward](ctx)
- err = checkConfigOrCRD(flags.Options.EnableCRDs, v1alpha1.ClusterPortForwardKind, clusterPortForwards)
- if err != nil {
- return err
+ mangeNode := !nodeSelector.IsEmpty()
+ conf := server.Config{
+ TypedKwokClient: typedKwokClient,
+ NoManageNode: nodeSelector.IsEmpty(),
+ EnableCRDs: flags.Options.EnableCRDs,
+ DataSource: ctr,
+ NodeCacheGetter: ctr.GetNodeCache(),
+ PodCacheGetter: ctr.GetPodCache(),
}
- portForwards := config.FilterWithTypeFromContext[*internalversion.PortForward](ctx)
- err = checkConfigOrCRD(flags.Options.EnableCRDs, v1alpha1.PortForwardKind, portForwards)
- if err != nil {
- return err
- }
+ if mangeNode {
+ conf.ClusterPortForwards = config.FilterWithTypeFromContext[*internalversion.ClusterPortForward](ctx)
+ err = checkConfigOrCRD(flags.Options.EnableCRDs, v1alpha1.ClusterPortForwardKind, conf.ClusterPortForwards)
+ if err != nil {
+ return err
+ }
- clusterExecs := config.FilterWithTypeFromContext[*internalversion.ClusterExec](ctx)
- err = checkConfigOrCRD(flags.Options.EnableCRDs, v1alpha1.ClusterExecKind, clusterExecs)
- if err != nil {
- return err
- }
+ conf.PortForwards = config.FilterWithTypeFromContext[*internalversion.PortForward](ctx)
+ err = checkConfigOrCRD(flags.Options.EnableCRDs, v1alpha1.PortForwardKind, conf.PortForwards)
+ if err != nil {
+ return err
+ }
- execs := config.FilterWithTypeFromContext[*internalversion.Exec](ctx)
- err = checkConfigOrCRD(flags.Options.EnableCRDs, v1alpha1.ExecKind, execs)
- if err != nil {
- return err
- }
+ conf.ClusterExecs = config.FilterWithTypeFromContext[*internalversion.ClusterExec](ctx)
+ err = checkConfigOrCRD(flags.Options.EnableCRDs, v1alpha1.ClusterExecKind, conf.ClusterExecs)
+ if err != nil {
+ return err
+ }
- clusterLogs := config.FilterWithTypeFromContext[*internalversion.ClusterLogs](ctx)
- err = checkConfigOrCRD(flags.Options.EnableCRDs, v1alpha1.ClusterLogsKind, clusterLogs)
- if err != nil {
- return err
- }
+ conf.Execs = config.FilterWithTypeFromContext[*internalversion.Exec](ctx)
+ err = checkConfigOrCRD(flags.Options.EnableCRDs, v1alpha1.ExecKind, conf.Execs)
+ if err != nil {
+ return err
+ }
- logs := config.FilterWithTypeFromContext[*internalversion.Logs](ctx)
- err = checkConfigOrCRD(flags.Options.EnableCRDs, v1alpha1.LogsKind, logs)
- if err != nil {
- return err
- }
+ conf.ClusterLogs = config.FilterWithTypeFromContext[*internalversion.ClusterLogs](ctx)
+ err = checkConfigOrCRD(flags.Options.EnableCRDs, v1alpha1.ClusterLogsKind, conf.ClusterLogs)
+ if err != nil {
+ return err
+ }
- clusterAttaches := config.FilterWithTypeFromContext[*internalversion.ClusterAttach](ctx)
- err = checkConfigOrCRD(flags.Options.EnableCRDs, v1alpha1.ClusterAttachKind, clusterAttaches)
- if err != nil {
- return err
- }
+ conf.Logs = config.FilterWithTypeFromContext[*internalversion.Logs](ctx)
+ err = checkConfigOrCRD(flags.Options.EnableCRDs, v1alpha1.LogsKind, conf.Logs)
+ if err != nil {
+ return err
+ }
- attaches := config.FilterWithTypeFromContext[*internalversion.Attach](ctx)
- err = checkConfigOrCRD(flags.Options.EnableCRDs, v1alpha1.AttachKind, attaches)
- if err != nil {
- return err
- }
+ conf.ClusterAttaches = config.FilterWithTypeFromContext[*internalversion.ClusterAttach](ctx)
+ err = checkConfigOrCRD(flags.Options.EnableCRDs, v1alpha1.ClusterAttachKind, conf.ClusterAttaches)
+ if err != nil {
+ return err
+ }
- clusterResourceUsages := config.FilterWithTypeFromContext[*internalversion.ClusterResourceUsage](ctx)
- err = checkConfigOrCRD(flags.Options.EnableCRDs, v1alpha1.ClusterResourceUsageKind, clusterResourceUsages)
- if err != nil {
- return err
- }
+ conf.Attaches = config.FilterWithTypeFromContext[*internalversion.Attach](ctx)
+ err = checkConfigOrCRD(flags.Options.EnableCRDs, v1alpha1.AttachKind, conf.Attaches)
+ if err != nil {
+ return err
+ }
- resourceUsages := config.FilterWithTypeFromContext[*internalversion.ResourceUsage](ctx)
- err = checkConfigOrCRD(flags.Options.EnableCRDs, v1alpha1.ResourceUsageKind, resourceUsages)
- if err != nil {
- return err
+ conf.ClusterResourceUsages = config.FilterWithTypeFromContext[*internalversion.ClusterResourceUsage](ctx)
+ err = checkConfigOrCRD(flags.Options.EnableCRDs, v1alpha1.ClusterResourceUsageKind, conf.ClusterResourceUsages)
+ if err != nil {
+ return err
+ }
+
+ conf.ResourceUsages = config.FilterWithTypeFromContext[*internalversion.ResourceUsage](ctx)
+ err = checkConfigOrCRD(flags.Options.EnableCRDs, v1alpha1.ResourceUsageKind, conf.ResourceUsages)
+ if err != nil {
+ return err
+ }
}
- metrics := config.FilterWithTypeFromContext[*internalversion.Metric](ctx)
- err = checkConfigOrCRD(flags.Options.EnableCRDs, v1alpha1.MetricKind, metrics)
+ conf.Metrics = config.FilterWithTypeFromContext[*internalversion.Metric](ctx)
+ err = checkConfigOrCRD(flags.Options.EnableCRDs, v1alpha1.MetricKind, conf.Metrics)
if err != nil {
return err
}
- conf := server.Config{
- TypedKwokClient: typedKwokClient,
- EnableCRDs: flags.Options.EnableCRDs,
- ClusterPortForwards: clusterPortForwards,
- PortForwards: portForwards,
- ClusterExecs: clusterExecs,
- Execs: execs,
- ClusterLogs: clusterLogs,
- Logs: logs,
- ClusterAttaches: clusterAttaches,
- Attaches: attaches,
- ClusterResourceUsages: clusterResourceUsages,
- ResourceUsages: resourceUsages,
- Metrics: metrics,
- DataSource: ctr,
- NodeCacheGetter: ctr.GetNodeCache(),
- PodCacheGetter: ctr.GetPodCache(),
- }
svc, err := server.NewServer(conf)
if err != nil {
return fmt.Errorf("failed to create server: %w", err)
}
svc.InstallHealthz()
- svc.InstallServiceDiscovery()
+ if mangeNode {
+ svc.InstallServiceDiscovery()
+ }
- if flags.Options.EnableDebuggingHandlers {
+ if mangeNode && flags.Options.EnableDebuggingHandlers {
svc.InstallDebuggingHandlers()
svc.InstallProfilingHandler(flags.Options.EnableProfilingHandler, flags.Options.EnableContentionProfiling)
} else {
diff --git a/pkg/kwok/controllers/controller.go b/pkg/kwok/controllers/controller.go
index 1b39c798cd..dfd77eefb8 100644
--- a/pkg/kwok/controllers/controller.go
+++ b/pkg/kwok/controllers/controller.go
@@ -76,6 +76,7 @@ type Controller struct {
onNodeUnmanagedFunc func(nodeName string)
readOnlyFunc func(nodeName string) bool
+ noManageNode bool
manageNodesWithLabelSelector string
manageNodesWithAnnotationSelector string
manageNodesWithFieldSelector string
@@ -107,6 +108,8 @@ type Config struct {
RESTMapper meta.RESTMapper
TypedClient kubernetes.Interface
TypedKwokClient versioned.Interface
+ Manages internalversion.ManagesSelectors
+ NoManageNode bool
ManageSingleNode string
ManageAllNodes bool
ManageNodesWithAnnotationSelector string
@@ -167,61 +170,65 @@ func (c *Controller) init(ctx context.Context) (err error) {
return fmt.Errorf("controller already started")
}
- switch {
- case c.conf.ManageSingleNode != "":
- c.managePodsWithFieldSelector = fields.OneTermEqualSelector("spec.nodeName", c.conf.ManageSingleNode).String()
- c.manageNodesWithFieldSelector = fields.OneTermEqualSelector("metadata.name", c.conf.ManageSingleNode).String()
- c.manageNodeLeasesWithFieldSelector = fields.OneTermEqualSelector("metadata.name", c.conf.ManageSingleNode).String()
- case c.conf.ManageAllNodes:
- c.managePodsWithFieldSelector = fields.OneTermNotEqualSelector("spec.nodeName", "").String()
- case c.conf.ManageNodesWithLabelSelector != "" || c.conf.ManageNodesWithAnnotationSelector != "":
- c.manageNodesWithLabelSelector = c.conf.ManageNodesWithLabelSelector
- c.manageNodesWithAnnotationSelector = c.conf.ManageNodesWithAnnotationSelector
- c.managePodsWithFieldSelector = fields.OneTermNotEqualSelector("spec.nodeName", "").String()
- }
-
c.broadcaster = record.NewBroadcaster()
c.recorder = c.broadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: "kwok_controller"})
c.broadcaster.StartRecordingToSink(&clientcorev1.EventSinkImpl{Interface: c.conf.TypedClient.CoreV1().Events("")})
+ c.patchMeta = patch.NewPatchMetaFromOpenAPI3(c.conf.RESTClient)
- c.nodesChan = make(chan informer.Event[*corev1.Node], 1)
- c.podsChan = make(chan informer.Event[*corev1.Pod], 1)
+ if c.conf.NoManageNode {
+ c.noManageNode = true
+ } else {
+ c.nodesChan = make(chan informer.Event[*corev1.Node], 1)
+ c.podsChan = make(chan informer.Event[*corev1.Pod], 1)
+
+ switch {
+ case c.conf.ManageSingleNode != "":
+ c.managePodsWithFieldSelector = fields.OneTermEqualSelector("spec.nodeName", c.conf.ManageSingleNode).String()
+ c.manageNodesWithFieldSelector = fields.OneTermEqualSelector("metadata.name", c.conf.ManageSingleNode).String()
+ c.manageNodeLeasesWithFieldSelector = fields.OneTermEqualSelector("metadata.name", c.conf.ManageSingleNode).String()
+ case c.conf.ManageAllNodes:
+ c.managePodsWithFieldSelector = fields.OneTermNotEqualSelector("spec.nodeName", "").String()
+ case c.conf.ManageNodesWithLabelSelector != "" || c.conf.ManageNodesWithAnnotationSelector != "":
+ c.manageNodesWithLabelSelector = c.conf.ManageNodesWithLabelSelector
+ c.manageNodesWithAnnotationSelector = c.conf.ManageNodesWithAnnotationSelector
+ c.managePodsWithFieldSelector = fields.OneTermNotEqualSelector("spec.nodeName", "").String()
+ }
- nodesCli := c.conf.TypedClient.CoreV1().Nodes()
- c.nodesInformer = informer.NewInformer[*corev1.Node, *corev1.NodeList](nodesCli)
- c.nodeCacheGetter, err = c.nodesInformer.WatchWithCache(ctx, informer.Option{
- LabelSelector: c.manageNodesWithLabelSelector,
- AnnotationSelector: c.manageNodesWithAnnotationSelector,
- FieldSelector: c.manageNodesWithFieldSelector,
- }, c.nodesChan)
- if err != nil {
- return fmt.Errorf("failed to watch nodes: %w", err)
- }
+ nodesCli := c.conf.TypedClient.CoreV1().Nodes()
+ c.nodesInformer = informer.NewInformer[*corev1.Node, *corev1.NodeList](nodesCli)
+ c.nodeCacheGetter, err = c.nodesInformer.WatchWithCache(ctx, informer.Option{
+ LabelSelector: c.manageNodesWithLabelSelector,
+ AnnotationSelector: c.manageNodesWithAnnotationSelector,
+ FieldSelector: c.manageNodesWithFieldSelector,
+ }, c.nodesChan)
+ if err != nil {
+ return fmt.Errorf("failed to watch nodes: %w", err)
+ }
- podsCli := c.conf.TypedClient.CoreV1().Pods(corev1.NamespaceAll)
- c.podsInformer = informer.NewInformer[*corev1.Pod, *corev1.PodList](podsCli)
+ podsCli := c.conf.TypedClient.CoreV1().Pods(corev1.NamespaceAll)
+ c.podsInformer = informer.NewInformer[*corev1.Pod, *corev1.PodList](podsCli)
- podWatchOption := informer.Option{
- FieldSelector: c.managePodsWithFieldSelector,
- }
- if c.conf.EnablePodCache {
- c.podCacheGetter, err = c.podsInformer.WatchWithLazyCache(ctx, podWatchOption, c.podsChan)
- } else {
- err = c.podsInformer.Watch(ctx, podWatchOption, c.podsChan)
- }
- if err != nil {
- return fmt.Errorf("failed to watch pods: %w", err)
- }
+ podWatchOption := informer.Option{
+ FieldSelector: c.managePodsWithFieldSelector,
+ }
+ if c.conf.EnablePodCache {
+ c.podCacheGetter, err = c.podsInformer.WatchWithLazyCache(ctx, podWatchOption, c.podsChan)
+ } else {
+ err = c.podsInformer.Watch(ctx, podWatchOption, c.podsChan)
+ }
+ if err != nil {
+ return fmt.Errorf("failed to watch pods: %w", err)
+ }
- if c.conf.NodeLeaseDurationSeconds != 0 {
- nodeLeasesCli := c.conf.TypedClient.CoordinationV1().Leases(corev1.NamespaceNodeLease)
- c.nodeLeasesInformer = informer.NewInformer[*coordinationv1.Lease, *coordinationv1.LeaseList](nodeLeasesCli)
- }
+ if c.conf.NodeLeaseDurationSeconds != 0 {
+ nodeLeasesCli := c.conf.TypedClient.CoordinationV1().Leases(corev1.NamespaceNodeLease)
+ c.nodeLeasesInformer = informer.NewInformer[*coordinationv1.Lease, *coordinationv1.LeaseList](nodeLeasesCli)
+ }
- c.patchMeta = patch.NewPatchMetaFromOpenAPI3(c.conf.RESTClient)
+ c.podOnNodeManageQueue = queue.NewQueue[string]()
+ c.nodeManageQueue = queue.NewQueue[string]()
+ }
- c.podOnNodeManageQueue = queue.NewQueue[string]()
- c.nodeManageQueue = queue.NewQueue[string]()
return nil
}
@@ -363,6 +370,7 @@ func (c *Controller) startStageController(ctx context.Context, ref internalversi
func (c *Controller) initStagesManager(ctx context.Context) error {
logger := log.FromContext(ctx)
+ manages := c.conf.Manages
c.stageGetter = resources.NewDynamicGetter[
[]*internalversion.Stage,
*v1alpha1.Stage,
@@ -376,6 +384,10 @@ func (c *Controller) initStagesManager(ctx context.Context) error {
logger.Error("failed to convert to internal stage", err, "obj", obj)
return nil, false
}
+
+ if manages != nil && !manages.MatchStage(r) {
+ return nil, false
+ }
return r, true
})
},
@@ -537,7 +549,12 @@ func (c *Controller) Start(ctx context.Context) error {
}
if len(c.conf.LocalStages) != 0 {
+ manages := c.conf.Manages
+
for ref, stage := range c.conf.LocalStages {
+ if manages != nil {
+ stage = slices.Filter(stage, manages.MatchStage)
+ }
lifecycle, err := lifecycle.NewLifecycle(stage)
if err != nil {
return err
diff --git a/pkg/kwok/server/server.go b/pkg/kwok/server/server.go
index 30608a85dc..f9816abe38 100644
--- a/pkg/kwok/server/server.go
+++ b/pkg/kwok/server/server.go
@@ -55,6 +55,8 @@ type Server struct {
enableCRDs []string
+ noManageNode bool
+
restfulCont *restful.Container
idleTimeout time.Duration
@@ -97,6 +99,8 @@ type Config struct {
TypedKwokClient versioned.Interface
EnableCRDs []string
+ NoManageNode bool
+
ClusterPortForwards []*internalversion.ClusterPortForward
PortForwards []*internalversion.PortForward
ClusterExecs []*internalversion.ClusterExec
@@ -125,6 +129,8 @@ func NewServer(conf Config) (*Server, error) {
idleTimeout: 1 * time.Hour,
streamCreationTimeout: remotecommandconsts.DefaultStreamCreationTimeout,
+ noManageNode: conf.NoManageNode,
+
clusterPortForwards: resources.NewStaticGetter(conf.ClusterPortForwards),
portForwards: resources.NewStaticGetter(conf.PortForwards),
clusterExecs: resources.NewStaticGetter(conf.ClusterExecs),
@@ -161,6 +167,9 @@ func (s *Server) initWatchCRD(ctx context.Context) ([]resources.Starter, error)
for _, crd := range s.enableCRDs {
switch crd {
case v1alpha1.ClusterPortForwardKind:
+ if s.noManageNode {
+ continue
+ }
if len(s.clusterPortForwards.Get()) != 0 {
return nil, fmt.Errorf("cluster port forwards already exists, cannot watch CRD")
}
@@ -184,6 +193,9 @@ func (s *Server) initWatchCRD(ctx context.Context) ([]resources.Starter, error)
starters = append(starters, clusterPortForwards)
s.clusterPortForwards = clusterPortForwards
case v1alpha1.PortForwardKind:
+ if s.noManageNode {
+ continue
+ }
if len(s.portForwards.Get()) != 0 {
return nil, fmt.Errorf("port forwards already exists, cannot watch CRD")
}
@@ -207,6 +219,9 @@ func (s *Server) initWatchCRD(ctx context.Context) ([]resources.Starter, error)
starters = append(starters, portForwards)
s.portForwards = portForwards
case v1alpha1.ClusterExecKind:
+ if s.noManageNode {
+ continue
+ }
if len(s.clusterExecs.Get()) != 0 {
return nil, fmt.Errorf("cluster execs already exists, cannot watch CRD")
}
@@ -230,6 +245,9 @@ func (s *Server) initWatchCRD(ctx context.Context) ([]resources.Starter, error)
starters = append(starters, clusterExecs)
s.clusterExecs = clusterExecs
case v1alpha1.ExecKind:
+ if s.noManageNode {
+ continue
+ }
if len(s.execs.Get()) != 0 {
return nil, fmt.Errorf("execs already exists, cannot watch CRD")
}
@@ -253,6 +271,9 @@ func (s *Server) initWatchCRD(ctx context.Context) ([]resources.Starter, error)
starters = append(starters, execs)
s.execs = execs
case v1alpha1.ClusterLogsKind:
+ if s.noManageNode {
+ continue
+ }
if len(s.clusterLogs.Get()) != 0 {
return nil, fmt.Errorf("cluster logs already exists, cannot watch CRD")
}
@@ -276,6 +297,9 @@ func (s *Server) initWatchCRD(ctx context.Context) ([]resources.Starter, error)
starters = append(starters, clusterLogs)
s.clusterLogs = clusterLogs
case v1alpha1.LogsKind:
+ if s.noManageNode {
+ continue
+ }
if len(s.logs.Get()) != 0 {
return nil, fmt.Errorf("logs already exists, cannot watch CRD")
}
@@ -299,6 +323,9 @@ func (s *Server) initWatchCRD(ctx context.Context) ([]resources.Starter, error)
starters = append(starters, logs)
s.logs = logs
case v1alpha1.ClusterAttachKind:
+ if s.noManageNode {
+ continue
+ }
if len(s.clusterAttaches.Get()) != 0 {
return nil, fmt.Errorf("cluster attaches already exists, cannot watch CRD")
}
@@ -322,6 +349,9 @@ func (s *Server) initWatchCRD(ctx context.Context) ([]resources.Starter, error)
starters = append(starters, clusterAttaches)
s.clusterAttaches = clusterAttaches
case v1alpha1.AttachKind:
+ if s.noManageNode {
+ continue
+ }
if len(s.attaches.Get()) != 0 {
return nil, fmt.Errorf("attaches already exists, cannot watch CRD")
}
@@ -345,6 +375,9 @@ func (s *Server) initWatchCRD(ctx context.Context) ([]resources.Starter, error)
starters = append(starters, attaches)
s.attaches = attaches
case v1alpha1.ClusterResourceUsageKind:
+ if s.noManageNode {
+ continue
+ }
if len(s.clusterResourceUsages.Get()) != 0 {
return nil, fmt.Errorf("cluster resource usage already exists, cannot watch CRD")
}
@@ -368,6 +401,9 @@ func (s *Server) initWatchCRD(ctx context.Context) ([]resources.Starter, error)
starters = append(starters, clusterResourceUsages)
s.clusterResourceUsages = clusterResourceUsages
case v1alpha1.ResourceUsageKind:
+ if s.noManageNode {
+ continue
+ }
if len(s.resourceUsages.Get()) != 0 {
return nil, fmt.Errorf("resource usage already exists, cannot watch CRD")
}
diff --git a/site/content/en/docs/generated/apis.md b/site/content/en/docs/generated/apis.md
index 526dc5a77f..b58fb1229a 100644
--- a/site/content/en/docs/generated/apis.md
+++ b/site/content/en/docs/generated/apis.md
@@ -141,6 +141,9 @@ Resource Types:
KwokctlResource
+
+
+Selector
KwokConfiguration
@@ -382,6 +385,110 @@ string
+
+Selector
+ #
+
+
+
+
+
+
+Field |
+Description |
+
+
+
+
+
+apiVersion
+string
+ |
+
+
+config.kwok.x-k8s.io/v1alpha1
+
+ |
+
+
+
+kind
+string
+ |
+Selector |
+
+
+
+kind
+
+string
+
+ |
+
+ |
+
+
+
+group
+
+string
+
+ |
+
+ |
+
+
+
+version
+
+string
+
+ |
+
+ |
+
+
+
+namespace
+
+string
+
+ |
+
+ |
+
+
+
+name
+
+string
+
+ |
+
+ |
+
+
+
+labels
+
+map[string]string
+
+ |
+
+ |
+
+
+
+annotations
+
+map[string]string
+
+ |
+
+ |
+
+
+
kwok.x-k8s.io/v1alpha1
#
@@ -2447,6 +2554,19 @@ is the default value for flag –tls-private-key-file
+manages
+
+
+Selectors
+
+
+ |
+
+ Manages is the option to manage an resources
+ |
+
+
+
manageSingleNode
string
@@ -2457,7 +2577,8 @@ string
is the default value for flag –manage-single-node
Note: when manage-all-nodes is specified as true or
manage-nodes-with-label-selector or manage-nodes-with-annotation-selector is specified,
-this is a no-op.
+this is a no-op.
+Deprecated: use Manages instead
|
@@ -2472,7 +2593,8 @@ bool
is the default value for flag –manage-all-nodes
Note: when manage-single-node
is specified as true or
manage-nodes-with-label-selector
or manage-nodes-with-annotation-selector
is specified,
-this is a no-op.
+this is a no-op.
+Deprecated: use Manages instead
@@ -2487,7 +2609,8 @@ string
is the default value for flag –manage-nodes-with-annotation-selector
Note: when all-node-manage
is specified as true or
manage-single-node
is specified,
-this is a no-op.
+this is a no-op.
+Deprecated: use Manages instead
@@ -2502,7 +2625,8 @@ string
is the default value for flag –manage-nodes-with-label-selector
Note: when all-node-manage
is specified as true or
manage-single-node
is specified,
-this is a no-op.
+this is a no-op.
+Deprecated: use Manages instead
@@ -3845,6 +3969,17 @@ Protocol
+
+Selectors
+([]sigs.k8s.io/kwok/pkg/apis/config/v1alpha1.Selector
alias)
+ #
+
+
+Appears on:
+KwokConfigurationOptions
+
+
+
Volume
#
diff --git a/site/content/en/docs/generated/kwok.md b/site/content/en/docs/generated/kwok.md
index 19c736fc13..dcc515d0b3 100644
--- a/site/content/en/docs/generated/kwok.md
+++ b/site/content/en/docs/generated/kwok.md
@@ -9,23 +9,20 @@ kwok [flags]
### Options
```
- --cidr string CIDR of the pod ip (default "10.0.0.1/24")
- -c, --config strings config path (default [~/.kwok/kwok.yaml])
- --enable-crds strings List of CRDs to enable
- -h, --help help for kwok
- --kubeconfig string Path to the kubeconfig file to use (default "~/.kube/config")
- --manage-all-nodes All nodes will be watched and managed. It's conflicted with manage-nodes-with-annotation-selector, manage-nodes-with-label-selector and manage-single-node.
- --manage-nodes-with-annotation-selector string Nodes that match the annotation selector will be watched and managed. It's conflicted with manage-all-nodes and manage-single-node.
- --manage-nodes-with-label-selector string Nodes that match the label selector will be watched and managed. It's conflicted with manage-all-nodes and manage-single-node.
- --manage-single-node string Node that matches the name will be watched and managed. It's conflicted with manage-nodes-with-annotation-selector, manage-nodes-with-label-selector and manage-all-nodes.
- --master string The address of the Kubernetes API server (overrides any value in kubeconfig).
- --node-ip string IP of the node
- --node-lease-duration-seconds uint Duration of node lease seconds
- --node-name string Name of the node
- --node-port int Port of the node
- --server-address string Address to expose the server on
- --tls-cert-file string File containing the default x509 Certificate for HTTPS
- --tls-private-key-file string File containing the default x509 private key matching --tls-cert-file
- -v, --v log-level number for the log level verbosity (DEBUG, INFO, WARN, ERROR) or (-4, 0, 4, 8) (default INFO)
+ --cidr string CIDR of the pod ip (default "10.0.0.1/24")
+ -c, --config strings config path (default [~/.kwok/kwok.yaml])
+ --enable-crds strings List of CRDs to enable
+ -h, --help help for kwok
+ --kubeconfig string Path to the kubeconfig file to use (default "~/.kube/config")
+ --manage SelectorSlice Manages resources
+ --master string The address of the Kubernetes API server (overrides any value in kubeconfig).
+ --node-ip string IP of the node
+ --node-lease-duration-seconds uint Duration of node lease seconds
+ --node-name string Name of the node
+ --node-port int Port of the node
+ --server-address string Address to expose the server on
+ --tls-cert-file string File containing the default x509 Certificate for HTTPS
+ --tls-private-key-file string File containing the default x509 private key matching --tls-cert-file
+ -v, --v log-level number for the log level verbosity (DEBUG, INFO, WARN, ERROR) or (-4, 0, 4, 8) (default INFO)
```