diff --git a/apis/apps/v1alpha1/node_pod_probe_types.go b/apis/apps/v1alpha1/node_pod_probe_types.go index 9de5a80eae..772bc5f5f2 100644 --- a/apis/apps/v1alpha1/node_pod_probe_types.go +++ b/apis/apps/v1alpha1/node_pod_probe_types.go @@ -32,6 +32,8 @@ type PodProbe struct { Namespace string `json:"namespace"` // pod uid UID string `json:"uid"` + // pod ip + IP string `json:"IP"` // Custom container probe, supports Exec, Tcp, and returns the result to Pod yaml Probes []ContainerProbe `json:"probes,omitempty"` } diff --git a/config/crd/bases/apps.kruise.io_nodepodprobes.yaml b/config/crd/bases/apps.kruise.io_nodepodprobes.yaml index c9e7a23356..65b2269e4a 100644 --- a/config/crd/bases/apps.kruise.io_nodepodprobes.yaml +++ b/config/crd/bases/apps.kruise.io_nodepodprobes.yaml @@ -38,6 +38,9 @@ spec: podProbes: items: properties: + IP: + description: pod ip + type: string name: description: pod name type: string @@ -221,6 +224,7 @@ spec: description: pod uid type: string required: + - IP - name - namespace - uid diff --git a/pkg/controller/podprobemarker/pod_probe_marker_controller.go b/pkg/controller/podprobemarker/pod_probe_marker_controller.go index 3ff4f38524..50a1472f30 100644 --- a/pkg/controller/podprobemarker/pod_probe_marker_controller.go +++ b/pkg/controller/podprobemarker/pod_probe_marker_controller.go @@ -23,17 +23,10 @@ import ( "reflect" "strings" - appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1" - "github.com/openkruise/kruise/pkg/features" - "github.com/openkruise/kruise/pkg/util" - utilclient "github.com/openkruise/kruise/pkg/util/client" - "github.com/openkruise/kruise/pkg/util/controllerfinder" - utildiscovery "github.com/openkruise/kruise/pkg/util/discovery" - utilfeature "github.com/openkruise/kruise/pkg/util/feature" - "github.com/openkruise/kruise/pkg/util/ratelimiter" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/client-go/util/retry" "k8s.io/klog/v2" kubecontroller "k8s.io/kubernetes/pkg/controller" @@ -44,6 +37,15 @@ import ( "sigs.k8s.io/controller-runtime/pkg/manager" "sigs.k8s.io/controller-runtime/pkg/reconcile" "sigs.k8s.io/controller-runtime/pkg/source" + + appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1" + "github.com/openkruise/kruise/pkg/features" + "github.com/openkruise/kruise/pkg/util" + utilclient "github.com/openkruise/kruise/pkg/util/client" + "github.com/openkruise/kruise/pkg/util/controllerfinder" + utildiscovery "github.com/openkruise/kruise/pkg/util/discovery" + utilfeature "github.com/openkruise/kruise/pkg/util/feature" + "github.com/openkruise/kruise/pkg/util/ratelimiter" ) func init() { @@ -234,15 +236,47 @@ func (r *ReconcilePodProbeMarker) updateNodePodProbes(ppm *appsv1alpha1.PodProbe exist = true for j := range ppm.Spec.Probes { probe := ppm.Spec.Probes[j] + if podProbe.IP == "" { + podProbe.IP = pod.Status.PodIP + } + if probe.Probe.TCPSocket != nil { + probe, err = convertTcpSocketProbeCheckPort(probe, pod) + if err != nil { + klog.Errorf("Failed to convert tcpSocket probe port, err: %v, pod: %v/%v", err, pod.Namespace, pod.Name) + continue + } + } + if probe.Probe.HTTPGet != nil { + probe, err = convertHttpGetProbeCheckPort(probe, pod) + if err != nil { + klog.Errorf("Failed to convert httpGet probe port, err: %v, pod: %v/%v", err, pod.Namespace, pod.Name) + continue + } + } setPodContainerProbes(podProbe, probe, ppm.Name) } break } } if !exist { - podProbe := appsv1alpha1.PodProbe{Name: pod.Name, Namespace: pod.Namespace, UID: string(pod.UID)} + podProbe := appsv1alpha1.PodProbe{Name: pod.Name, Namespace: pod.Namespace, UID: string(pod.UID), IP: pod.Status.PodIP} for j := range ppm.Spec.Probes { probe := ppm.Spec.Probes[j] + // look up a port in a container by name & convert container name port + if probe.Probe.TCPSocket != nil { + probe, err = convertTcpSocketProbeCheckPort(probe, pod) + if err != nil { + klog.Errorf("Failed to convert tcpSocket probe port, err: %v, pod: %v/%v", err, pod.Namespace, pod.Name) + continue + } + } + if probe.Probe.HTTPGet != nil { + probe, err = convertHttpGetProbeCheckPort(probe, pod) + if err != nil { + klog.Errorf("Failed to convert httpGet probe port, err: %v, pod: %v/%v", err, pod.Namespace, pod.Name) + continue + } + } podProbe.Probes = append(podProbe.Probes, appsv1alpha1.ContainerProbe{ Name: fmt.Sprintf("%s#%s", ppm.Name, probe.Name), ContainerName: probe.ContainerName, @@ -266,6 +300,43 @@ func (r *ReconcilePodProbeMarker) updateNodePodProbes(ppm *appsv1alpha1.PodProbe return nil } +func convertHttpGetProbeCheckPort(probe appsv1alpha1.PodContainerProbe, pod *corev1.Pod) (appsv1alpha1.PodContainerProbe, error) { + probeNew := probe.DeepCopy() + if probe.Probe.HTTPGet.Port.Type == intstr.Int { + return *probeNew, nil + } + container := util.GetPodContainerByName(probe.ContainerName, pod) + if container == nil { + return *probeNew, fmt.Errorf("Failed to get container by name: %v in pod: %v/%v", probe.ContainerName, pod.Namespace, pod.Name) + } + portInt, err := util.ExtractPort(probe.Probe.HTTPGet.Port, *container) + if err != nil { + return *probeNew, fmt.Errorf("Failed to extract port for container: %v in pod: %v/%v", container.Name, pod.Namespace, pod.Name) + } + // If you need to parse integer values with specific bit sizes, avoid strconv.Atoi, + // and instead use strconv.ParseInt or strconv.ParseUint, which also allow specifying the bit size. + // https://codeql.github.com/codeql-query-help/go/go-incorrect-integer-conversion/ + probeNew.Probe.HTTPGet.Port = intstr.FromInt(portInt) + return *probeNew, nil +} + +func convertTcpSocketProbeCheckPort(probe appsv1alpha1.PodContainerProbe, pod *corev1.Pod) (appsv1alpha1.PodContainerProbe, error) { + probeNew := probe.DeepCopy() + if probe.Probe.TCPSocket.Port.Type == intstr.Int { + return *probeNew, nil + } + container := util.GetPodContainerByName(probe.ContainerName, pod) + if container == nil { + return *probeNew, fmt.Errorf("Failed to get container by name: %v in pod: %v/%v", probe.ContainerName, pod.Namespace, pod.Name) + } + portInt, err := util.ExtractPort(probe.Probe.TCPSocket.Port, *container) + if err != nil { + return *probeNew, fmt.Errorf("Failed to extract port for container: %v in pod: %v/%v", container.Name, pod.Namespace, pod.Name) + } + probeNew.Probe.TCPSocket.Port = intstr.FromInt(portInt) + return *probeNew, nil +} + func setPodContainerProbes(podProbe *appsv1alpha1.PodProbe, probe appsv1alpha1.PodContainerProbe, ppmName string) { newProbe := appsv1alpha1.ContainerProbe{ Name: fmt.Sprintf("%s#%s", ppmName, probe.Name), diff --git a/pkg/controller/podprobemarker/pod_probe_marker_controller_test.go b/pkg/controller/podprobemarker/pod_probe_marker_controller_test.go index 1fc6296cf6..dbf8b028d2 100644 --- a/pkg/controller/podprobemarker/pod_probe_marker_controller_test.go +++ b/pkg/controller/podprobemarker/pod_probe_marker_controller_test.go @@ -18,20 +18,23 @@ package podprobemarker import ( "context" + "fmt" "reflect" "testing" - utilruntime "k8s.io/apimachinery/pkg/util/runtime" - - appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1" - "github.com/openkruise/kruise/pkg/util/controllerfinder" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/intstr" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/client/fake" + + appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1" + "github.com/openkruise/kruise/pkg/util" + "github.com/openkruise/kruise/pkg/util/controllerfinder" ) func init() { @@ -92,6 +95,106 @@ var ( }, } + demoPodProbeMarkerForTcpCheck = appsv1alpha1.PodProbeMarker{ + ObjectMeta: metav1.ObjectMeta{ + Name: "ppm-1", + }, + Spec: appsv1alpha1.PodProbeMarkerSpec{ + Selector: &metav1.LabelSelector{ + MatchLabels: map[string]string{ + "app": "test", + }, + }, + Probes: []appsv1alpha1.PodContainerProbe{ + { + Name: "tcpCheckHealthy", + ContainerName: "main", + Probe: appsv1alpha1.ContainerProbeSpec{ + Probe: corev1.Probe{ + ProbeHandler: corev1.ProbeHandler{ + TCPSocket: &corev1.TCPSocketAction{ + Port: intstr.IntOrString{Type: intstr.String, StrVal: "main-port"}, + }, + }, + }, + }, + PodConditionType: "game.kruise.io/healthy", + MarkerPolicy: []appsv1alpha1.ProbeMarkerPolicy{ + { + State: appsv1alpha1.ProbeSucceeded, + Annotations: map[string]string{ + "controller.kubernetes.io/pod-deletion-cost": "10", + }, + Labels: map[string]string{ + "server-healthy": "true", + }, + }, + { + State: appsv1alpha1.ProbeFailed, + Annotations: map[string]string{ + "controller.kubernetes.io/pod-deletion-cost": "-10", + }, + Labels: map[string]string{ + "server-healthy": "false", + }, + }, + }, + }, + }, + }, + } + + demoPodProbeMarkerForHttpCheck = appsv1alpha1.PodProbeMarker{ + ObjectMeta: metav1.ObjectMeta{ + Name: "ppm-1", + }, + Spec: appsv1alpha1.PodProbeMarkerSpec{ + Selector: &metav1.LabelSelector{ + MatchLabels: map[string]string{ + "app": "test", + }, + }, + Probes: []appsv1alpha1.PodContainerProbe{ + { + Name: "httpCheckHealthy", + ContainerName: "main", + Probe: appsv1alpha1.ContainerProbeSpec{ + Probe: corev1.Probe{ + ProbeHandler: corev1.ProbeHandler{ + HTTPGet: &corev1.HTTPGetAction{ + Path: "/index.html", + Port: intstr.IntOrString{Type: intstr.String, StrVal: "main-port"}, + Scheme: corev1.URISchemeHTTP, + }, + }, + }, + }, + PodConditionType: "game.kruise.io/healthy", + MarkerPolicy: []appsv1alpha1.ProbeMarkerPolicy{ + { + State: appsv1alpha1.ProbeSucceeded, + Annotations: map[string]string{ + "controller.kubernetes.io/pod-deletion-cost": "10", + }, + Labels: map[string]string{ + "server-healthy": "true", + }, + }, + { + State: appsv1alpha1.ProbeFailed, + Annotations: map[string]string{ + "controller.kubernetes.io/pod-deletion-cost": "-10", + }, + Labels: map[string]string{ + "server-healthy": "false", + }, + }, + }, + }, + }, + }, + } + demoNodePodProbe = appsv1alpha1.NodePodProbe{ ObjectMeta: metav1.ObjectMeta{ Name: "node-1", @@ -660,6 +763,7 @@ func TestSyncPodProbeMarker(t *testing.T) { return []*appsv1alpha1.NodePodProbe{demo} }, }, + { name: "test7, NodePodProbes changed", req: ctrl.Request{ @@ -774,55 +878,919 @@ func TestSyncPodProbeMarker(t *testing.T) { return []*appsv1alpha1.NodePodProbe{demo} }, }, - } - for _, cs := range cases { - t.Run(cs.name, func(t *testing.T) { - fakeClient := fake.NewClientBuilder().WithScheme(scheme).Build() - for _, obj := range cs.getPods() { - err := fakeClient.Create(context.TODO(), obj.DeepCopy()) - if err != nil { - t.Fatalf("create Pod failed: %s", err.Error()) + { + name: "test8, merge NodePodProbes(failed to convert tcpSocketProbe check port)", + req: ctrl.Request{ + NamespacedName: types.NamespacedName{ + Name: demoPodProbeMarkerForTcpCheck.Name, + }, + }, + getPods: func() []*corev1.Pod { + pods := []*corev1.Pod{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "pod-1", + UID: types.UID("pod-1-uid"), + Labels: map[string]string{ + "app": "test", + }, + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "main-v2", + Ports: []corev1.ContainerPort{ + { + Name: "main-port", + ContainerPort: 9090, + }, + }, + }, + }, + NodeName: "node-1", + }, + Status: corev1.PodStatus{ + Conditions: []corev1.PodCondition{ + { + Type: corev1.PodInitialized, + Status: corev1.ConditionTrue, + }, + }, + }, + }, } - } - for _, obj := range cs.getPodProbeMarkers() { - err := fakeClient.Create(context.TODO(), obj.DeepCopy()) - if err != nil { - t.Fatalf("create PodProbeMarker failed: %s", err.Error()) + return pods + }, + getPodProbeMarkers: func() []*appsv1alpha1.PodProbeMarker { + ppms := []*appsv1alpha1.PodProbeMarker{ + demoPodProbeMarkerForTcpCheck.DeepCopy(), } - } - for _, obj := range cs.getNodePodProbes() { - err := fakeClient.Create(context.TODO(), obj.DeepCopy()) - if err != nil { - t.Fatalf("create NodePodProbes failed: %s", err.Error()) + return ppms + }, + getNodePodProbes: func() []*appsv1alpha1.NodePodProbe { + return []*appsv1alpha1.NodePodProbe{ + { + ObjectMeta: metav1.ObjectMeta{Name: "node-1"}, + Spec: appsv1alpha1.NodePodProbeSpec{ + PodProbes: []appsv1alpha1.PodProbe{ + { + Name: "pod-1", + UID: "pod-1-uid", + Probes: []appsv1alpha1.ContainerProbe{ + { + Name: "ppm-2#idle", + ContainerName: "main", + Probe: appsv1alpha1.ContainerProbeSpec{ + Probe: corev1.Probe{ + ProbeHandler: corev1.ProbeHandler{ + Exec: &corev1.ExecAction{ + Command: []string{"/bin/sh", "-c", "/idle.sh"}, + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, } - } - - controllerfinder.Finder = &controllerfinder.ControllerFinder{Client: fakeClient} - recon := ReconcilePodProbeMarker{Client: fakeClient} - _, err := recon.Reconcile(context.TODO(), cs.req) - if err != nil { - t.Fatalf("Reconcile failed: %s", err.Error()) - } - if !checkNodePodProbeEqual(fakeClient, t, cs.expectNodePodProbes()) { - t.Fatalf("Reconcile failed") - } - }) - } -} + }, + expectNodePodProbes: func() []*appsv1alpha1.NodePodProbe { + demo := demoNodePodProbe.DeepCopy() + demo.Spec.PodProbes[0].Probes = []appsv1alpha1.ContainerProbe{ + { + Name: "ppm-2#idle", + ContainerName: "main", + Probe: appsv1alpha1.ContainerProbeSpec{ + Probe: corev1.Probe{ + ProbeHandler: corev1.ProbeHandler{ + Exec: &corev1.ExecAction{ + Command: []string{"/bin/sh", "-c", "/idle.sh"}, + }, + }, + }, + }, + }, + } + return []*appsv1alpha1.NodePodProbe{demo} + }, + }, -func checkNodePodProbeEqual(c client.WithWatch, t *testing.T, expect []*appsv1alpha1.NodePodProbe) bool { - for i := range expect { - obj := expect[i] - npp := &appsv1alpha1.NodePodProbe{} - err := c.Get(context.TODO(), client.ObjectKey{Namespace: obj.Namespace, Name: obj.Name}, npp) - if err != nil { - t.Fatalf("get NodePodProbe failed: %s", err.Error()) - return false - } - if !reflect.DeepEqual(obj.Spec, npp.Spec) { - return false - } - } - return true + { + name: "test8, merge NodePodProbes(failed to convert httpGetProbe check port)", + req: ctrl.Request{ + NamespacedName: types.NamespacedName{ + Name: demoPodProbeMarkerForHttpCheck.Name, + }, + }, + getPods: func() []*corev1.Pod { + pods := []*corev1.Pod{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "pod-1", + UID: types.UID("pod-1-uid"), + Labels: map[string]string{ + "app": "test", + }, + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "main-v2", + Ports: []corev1.ContainerPort{ + { + Name: "main-port", + ContainerPort: 9090, + }, + }, + }, + }, + NodeName: "node-1", + }, + Status: corev1.PodStatus{ + Conditions: []corev1.PodCondition{ + { + Type: corev1.PodInitialized, + Status: corev1.ConditionTrue, + }, + }, + }, + }, + } + return pods + }, + getPodProbeMarkers: func() []*appsv1alpha1.PodProbeMarker { + ppms := []*appsv1alpha1.PodProbeMarker{ + demoPodProbeMarkerForHttpCheck.DeepCopy(), + } + return ppms + }, + getNodePodProbes: func() []*appsv1alpha1.NodePodProbe { + return []*appsv1alpha1.NodePodProbe{ + { + ObjectMeta: metav1.ObjectMeta{Name: "node-1"}, + Spec: appsv1alpha1.NodePodProbeSpec{ + PodProbes: []appsv1alpha1.PodProbe{ + { + Name: "pod-1", + UID: "pod-1-uid", + Probes: []appsv1alpha1.ContainerProbe{ + { + Name: "ppm-2#idle", + ContainerName: "main", + Probe: appsv1alpha1.ContainerProbeSpec{ + Probe: corev1.Probe{ + ProbeHandler: corev1.ProbeHandler{ + Exec: &corev1.ExecAction{ + Command: []string{"/bin/sh", "-c", "/idle.sh"}, + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + } + }, + expectNodePodProbes: func() []*appsv1alpha1.NodePodProbe { + demo := demoNodePodProbe.DeepCopy() + demo.Spec.PodProbes[0].Probes = []appsv1alpha1.ContainerProbe{ + { + Name: "ppm-2#idle", + ContainerName: "main", + Probe: appsv1alpha1.ContainerProbeSpec{ + Probe: corev1.Probe{ + ProbeHandler: corev1.ProbeHandler{ + Exec: &corev1.ExecAction{ + Command: []string{"/bin/sh", "-c", "/idle.sh"}, + }, + }, + }, + }, + }, + } + return []*appsv1alpha1.NodePodProbe{demo} + }, + }, + + { + name: "test-test", + req: ctrl.Request{ + NamespacedName: types.NamespacedName{ + Name: demoPodProbeMarkerForTcpCheck.Name, + }, + }, + getPods: func() []*corev1.Pod { + pods := []*corev1.Pod{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "pod-1", + UID: types.UID("pod-1-uid"), + Labels: map[string]string{ + "app": "test", + }, + }, + Spec: corev1.PodSpec{ + NodeName: "node-1", + }, + Status: corev1.PodStatus{ + Conditions: []corev1.PodCondition{ + { + Type: corev1.PodInitialized, + Status: corev1.ConditionTrue, + }, + }, + }, + }, + } + return pods + }, + getPodProbeMarkers: func() []*appsv1alpha1.PodProbeMarker { + ppms := []*appsv1alpha1.PodProbeMarker{ + demoPodProbeMarkerForTcpCheck.DeepCopy(), + } + return ppms + }, + getNodePodProbes: func() []*appsv1alpha1.NodePodProbe { + return []*appsv1alpha1.NodePodProbe{ + { + ObjectMeta: metav1.ObjectMeta{Name: "node-1"}, + Spec: appsv1alpha1.NodePodProbeSpec{ + PodProbes: []appsv1alpha1.PodProbe{ + { + Name: "pod-1", + UID: "pod-1-uid", + Probes: []appsv1alpha1.ContainerProbe{ + { + Name: "ppm-2#idle", + ContainerName: "log", + Probe: appsv1alpha1.ContainerProbeSpec{ + Probe: corev1.Probe{ + ProbeHandler: corev1.ProbeHandler{ + Exec: &corev1.ExecAction{ + Command: []string{"/bin/sh", "-c", "/idle.sh"}, + }, + }, + }, + }, + }, + { + Name: "ppm-1#healthy", + ContainerName: "main", + Probe: appsv1alpha1.ContainerProbeSpec{ + Probe: corev1.Probe{ + ProbeHandler: corev1.ProbeHandler{ + Exec: &corev1.ExecAction{ + Command: []string{"/bin/sh", "-c", "/home/admin/healthy.sh"}, + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + } + }, + expectNodePodProbes: func() []*appsv1alpha1.NodePodProbe { + demo := demoNodePodProbe.DeepCopy() + demo.Spec.PodProbes[0].Probes = []appsv1alpha1.ContainerProbe{ + { + Name: "ppm-2#idle", + ContainerName: "log", + Probe: appsv1alpha1.ContainerProbeSpec{ + Probe: corev1.Probe{ + ProbeHandler: corev1.ProbeHandler{ + Exec: &corev1.ExecAction{ + Command: []string{"/bin/sh", "-c", "/idle.sh"}, + }, + }, + }, + }, + }, + { + Name: "ppm-1#healthy", + ContainerName: "main", + Probe: appsv1alpha1.ContainerProbeSpec{ + Probe: corev1.Probe{ + ProbeHandler: corev1.ProbeHandler{ + Exec: &corev1.ExecAction{ + Command: []string{"/bin/sh", "-c", "/home/admin/healthy.sh"}, + }, + }, + }, + }, + }, + } + return []*appsv1alpha1.NodePodProbe{demo} + }, + }, + + { + name: "test-test-http", + req: ctrl.Request{ + NamespacedName: types.NamespacedName{ + Name: demoPodProbeMarkerForHttpCheck.Name, + }, + }, + getPods: func() []*corev1.Pod { + pods := []*corev1.Pod{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "pod-1", + UID: types.UID("pod-1-uid"), + Labels: map[string]string{ + "app": "test", + }, + }, + Spec: corev1.PodSpec{ + NodeName: "node-1", + }, + Status: corev1.PodStatus{ + Conditions: []corev1.PodCondition{ + { + Type: corev1.PodInitialized, + Status: corev1.ConditionTrue, + }, + }, + }, + }, + } + return pods + }, + getPodProbeMarkers: func() []*appsv1alpha1.PodProbeMarker { + ppms := []*appsv1alpha1.PodProbeMarker{ + demoPodProbeMarkerForHttpCheck.DeepCopy(), + } + return ppms + }, + getNodePodProbes: func() []*appsv1alpha1.NodePodProbe { + return []*appsv1alpha1.NodePodProbe{ + { + ObjectMeta: metav1.ObjectMeta{Name: "node-1"}, + Spec: appsv1alpha1.NodePodProbeSpec{ + PodProbes: []appsv1alpha1.PodProbe{ + { + Name: "pod-1", + UID: "pod-1-uid", + Probes: []appsv1alpha1.ContainerProbe{ + { + Name: "ppm-2#idle", + ContainerName: "log", + Probe: appsv1alpha1.ContainerProbeSpec{ + Probe: corev1.Probe{ + ProbeHandler: corev1.ProbeHandler{ + Exec: &corev1.ExecAction{ + Command: []string{"/bin/sh", "-c", "/idle.sh"}, + }, + }, + }, + }, + }, + { + Name: "ppm-1#healthy", + ContainerName: "main", + Probe: appsv1alpha1.ContainerProbeSpec{ + Probe: corev1.Probe{ + ProbeHandler: corev1.ProbeHandler{ + Exec: &corev1.ExecAction{ + Command: []string{"/bin/sh", "-c", "/home/admin/healthy.sh"}, + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + } + }, + expectNodePodProbes: func() []*appsv1alpha1.NodePodProbe { + demo := demoNodePodProbe.DeepCopy() + demo.Spec.PodProbes[0].Probes = []appsv1alpha1.ContainerProbe{ + { + Name: "ppm-2#idle", + ContainerName: "log", + Probe: appsv1alpha1.ContainerProbeSpec{ + Probe: corev1.Probe{ + ProbeHandler: corev1.ProbeHandler{ + Exec: &corev1.ExecAction{ + Command: []string{"/bin/sh", "-c", "/idle.sh"}, + }, + }, + }, + }, + }, + { + Name: "ppm-1#healthy", + ContainerName: "main", + Probe: appsv1alpha1.ContainerProbeSpec{ + Probe: corev1.Probe{ + ProbeHandler: corev1.ProbeHandler{ + Exec: &corev1.ExecAction{ + Command: []string{"/bin/sh", "-c", "/home/admin/healthy.sh"}, + }, + }, + }, + }, + }, + } + return []*appsv1alpha1.NodePodProbe{demo} + }, + }, + } + + for _, cs := range cases { + t.Run(cs.name, func(t *testing.T) { + fakeClient := fake.NewClientBuilder().WithScheme(scheme).Build() + for _, obj := range cs.getPods() { + err := fakeClient.Create(context.TODO(), obj.DeepCopy()) + if err != nil { + t.Fatalf("create Pod failed: %s", err.Error()) + } + } + for _, obj := range cs.getPodProbeMarkers() { + err := fakeClient.Create(context.TODO(), obj.DeepCopy()) + if err != nil { + t.Fatalf("create PodProbeMarker failed: %s", err.Error()) + } + } + for _, obj := range cs.getNodePodProbes() { + err := fakeClient.Create(context.TODO(), obj.DeepCopy()) + if err != nil { + t.Fatalf("create NodePodProbes failed: %s", err.Error()) + } + } + + controllerfinder.Finder = &controllerfinder.ControllerFinder{Client: fakeClient} + recon := ReconcilePodProbeMarker{Client: fakeClient} + _, err := recon.Reconcile(context.TODO(), cs.req) + if err != nil { + t.Fatalf("Reconcile failed: %s", err.Error()) + } + if !checkNodePodProbeEqual(fakeClient, t, cs.expectNodePodProbes()) { + t.Fatalf("Reconcile failed") + } + }) + } +} + +func checkNodePodProbeEqual(c client.WithWatch, t *testing.T, expect []*appsv1alpha1.NodePodProbe) bool { + for i := range expect { + obj := expect[i] + npp := &appsv1alpha1.NodePodProbe{} + err := c.Get(context.TODO(), client.ObjectKey{Namespace: obj.Namespace, Name: obj.Name}, npp) + if err != nil { + t.Fatalf("get NodePodProbe failed: %s", err.Error()) + return false + } + t.Logf("expect: %v --> but: %v", util.DumpJSON(obj.Spec), util.DumpJSON(npp.Spec)) + if !reflect.DeepEqual(obj.Spec, npp.Spec) { + return false + } + } + return true +} + +func TestConvertTcpSocketProbeCheckPort(t *testing.T) { + cases := []struct { + name string + probe appsv1alpha1.PodContainerProbe + pod *corev1.Pod + expectErr error + exportProbe appsv1alpha1.PodContainerProbe + }{ + { + name: "convert tcpProbe port", + probe: appsv1alpha1.PodContainerProbe{ + Name: "probe#main", + ContainerName: "main", + Probe: appsv1alpha1.ContainerProbeSpec{ + Probe: corev1.Probe{ + ProbeHandler: corev1.ProbeHandler{ + TCPSocket: &corev1.TCPSocketAction{ + Port: intstr.IntOrString{Type: intstr.String, StrVal: "main-port"}, + }, + }, + }, + }, + }, + pod: &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pod1", + Namespace: "sp1", + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "main", + Ports: []corev1.ContainerPort{ + { + Name: "main-port", + ContainerPort: 80, + }, + }, + }, + }, + }, + }, + exportProbe: appsv1alpha1.PodContainerProbe{ + Name: "probe#main", + ContainerName: "main", + Probe: appsv1alpha1.ContainerProbeSpec{ + Probe: corev1.Probe{ + ProbeHandler: corev1.ProbeHandler{ + TCPSocket: &corev1.TCPSocketAction{ + Port: intstr.IntOrString{Type: intstr.Int, IntVal: 80}, + }, + }, + }, + }, + }, + }, + { + name: "no convert tcpProbe port(int type)", + probe: appsv1alpha1.PodContainerProbe{ + Name: "probe#main", + ContainerName: "main", + Probe: appsv1alpha1.ContainerProbeSpec{ + Probe: corev1.Probe{ + ProbeHandler: corev1.ProbeHandler{ + TCPSocket: &corev1.TCPSocketAction{ + Port: intstr.IntOrString{Type: intstr.Int, IntVal: 80}, + }, + }, + }, + }, + }, + pod: &corev1.Pod{}, + exportProbe: appsv1alpha1.PodContainerProbe{ + Name: "probe#main", + ContainerName: "main", + Probe: appsv1alpha1.ContainerProbeSpec{ + Probe: corev1.Probe{ + ProbeHandler: corev1.ProbeHandler{ + TCPSocket: &corev1.TCPSocketAction{ + Port: intstr.IntOrString{Type: intstr.Int, IntVal: 80}, + }, + }, + }, + }, + }, + }, + { + name: "convert tcpProbe port error(no found container)", + probe: appsv1alpha1.PodContainerProbe{ + Name: "probe#main", + ContainerName: "main-fake", + Probe: appsv1alpha1.ContainerProbeSpec{ + Probe: corev1.Probe{ + ProbeHandler: corev1.ProbeHandler{ + TCPSocket: &corev1.TCPSocketAction{ + Port: intstr.IntOrString{Type: intstr.String, StrVal: "main-port"}, + }, + }, + }, + }, + }, + pod: &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pod1", + Namespace: "sp1", + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "main", + Ports: []corev1.ContainerPort{ + { + Name: "main-port", + ContainerPort: 80, + }, + }, + }, + }, + }, + }, + expectErr: fmt.Errorf("Failed to get container by name: main-fake in pod: sp1/pod1"), + exportProbe: appsv1alpha1.PodContainerProbe{ + Name: "probe#main", + ContainerName: "main-fake", + Probe: appsv1alpha1.ContainerProbeSpec{ + Probe: corev1.Probe{ + ProbeHandler: corev1.ProbeHandler{ + TCPSocket: &corev1.TCPSocketAction{ + Port: intstr.IntOrString{Type: intstr.String, StrVal: "main-port"}, + }, + }, + }, + }, + }, + }, + { + name: "convert tcpProbe port error(failed to extract port)", + probe: appsv1alpha1.PodContainerProbe{ + Name: "probe#main", + ContainerName: "main-fake", + Probe: appsv1alpha1.ContainerProbeSpec{ + Probe: corev1.Probe{ + ProbeHandler: corev1.ProbeHandler{ + TCPSocket: &corev1.TCPSocketAction{ + Port: intstr.IntOrString{Type: intstr.String, StrVal: "main-port"}, + }, + }, + }, + }, + }, + pod: &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pod1", + Namespace: "sp1", + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "main", + Ports: []corev1.ContainerPort{ + { + Name: "main-port", + ContainerPort: -1, + }, + }, + }, + }, + }, + }, + exportProbe: appsv1alpha1.PodContainerProbe{ + Name: "probe#main", + ContainerName: "main-fake", + Probe: appsv1alpha1.ContainerProbeSpec{ + Probe: corev1.Probe{ + ProbeHandler: corev1.ProbeHandler{ + TCPSocket: &corev1.TCPSocketAction{ + Port: intstr.IntOrString{Type: intstr.String, StrVal: "main-port"}, + }, + }, + }, + }, + }, + expectErr: fmt.Errorf("Failed to get container by name: main-fake in pod: sp1/pod1"), + }, + } + + for _, cs := range cases { + t.Run(cs.name, func(t *testing.T) { + get, err := convertTcpSocketProbeCheckPort(cs.probe, cs.pod) + if !reflect.DeepEqual(cs.expectErr, err) { + t.Errorf("get probeProbe by container name failed, err: %v", err) + } + if !reflect.DeepEqual(get, cs.exportProbe) { + t.Errorf("expect: %v, but: %v", util.DumpJSON(cs.exportProbe), util.DumpJSON(get)) + } + }) + } +} + +func TestTestConvertHttpGetProbeCheckPort(t *testing.T) { + cases := []struct { + name string + probe appsv1alpha1.PodContainerProbe + pod *corev1.Pod + expectErr error + exportProbe appsv1alpha1.PodContainerProbe + }{ + { + name: "convert httpProbe port", + probe: appsv1alpha1.PodContainerProbe{ + Name: "probe#main", + ContainerName: "main", + Probe: appsv1alpha1.ContainerProbeSpec{ + Probe: corev1.Probe{ + ProbeHandler: corev1.ProbeHandler{ + HTTPGet: &corev1.HTTPGetAction{ + Port: intstr.IntOrString{Type: intstr.String, StrVal: "main-port"}, + Path: "/index.html", + Scheme: corev1.URISchemeHTTP, + }, + }, + }, + }, + }, + pod: &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pod1", + Namespace: "sp1", + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "main", + Ports: []corev1.ContainerPort{ + { + Name: "main-port", + ContainerPort: 80, + }, + }, + }, + }, + }, + }, + exportProbe: appsv1alpha1.PodContainerProbe{ + Name: "probe#main", + ContainerName: "main", + Probe: appsv1alpha1.ContainerProbeSpec{ + Probe: corev1.Probe{ + ProbeHandler: corev1.ProbeHandler{ + HTTPGet: &corev1.HTTPGetAction{ + Port: intstr.IntOrString{Type: intstr.Int, IntVal: 80}, + Path: "/index.html", + Scheme: corev1.URISchemeHTTP, + }, + }, + }, + }, + }, + }, + { + name: "no convert httpProbe port(int type)", + probe: appsv1alpha1.PodContainerProbe{ + Name: "probe#main", + ContainerName: "main", + Probe: appsv1alpha1.ContainerProbeSpec{ + Probe: corev1.Probe{ + ProbeHandler: corev1.ProbeHandler{ + HTTPGet: &corev1.HTTPGetAction{ + Port: intstr.IntOrString{Type: intstr.Int, IntVal: 80}, + Path: "/index.html", + Scheme: corev1.URISchemeHTTP, + }, + }, + }, + }, + }, + pod: &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pod1", + Namespace: "sp1", + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "main", + Ports: []corev1.ContainerPort{ + { + Name: "main-port", + ContainerPort: 80, + }, + }, + }, + }, + }, + }, + exportProbe: appsv1alpha1.PodContainerProbe{ + Name: "probe#main", + ContainerName: "main", + Probe: appsv1alpha1.ContainerProbeSpec{ + Probe: corev1.Probe{ + ProbeHandler: corev1.ProbeHandler{ + HTTPGet: &corev1.HTTPGetAction{ + Port: intstr.IntOrString{Type: intstr.Int, IntVal: 80}, + Path: "/index.html", + Scheme: corev1.URISchemeHTTP, + }, + }, + }, + }, + }, + }, + + { + name: "convert httpProbe port error(no found container)", + probe: appsv1alpha1.PodContainerProbe{ + Name: "probe#main", + ContainerName: "main-fake", + Probe: appsv1alpha1.ContainerProbeSpec{ + Probe: corev1.Probe{ + ProbeHandler: corev1.ProbeHandler{ + HTTPGet: &corev1.HTTPGetAction{ + Port: intstr.IntOrString{Type: intstr.String, StrVal: "main-port"}, + Path: "/index.html", + Scheme: corev1.URISchemeHTTP, + }, + }, + }, + }, + }, + pod: &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pod1", + Namespace: "sp1", + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "main", + Ports: []corev1.ContainerPort{ + { + Name: "main-port", + ContainerPort: 80, + }, + }, + }, + }, + }, + }, + expectErr: fmt.Errorf("Failed to get container by name: main-fake in pod: sp1/pod1"), + exportProbe: appsv1alpha1.PodContainerProbe{ + Name: "probe#main", + ContainerName: "main-fake", + Probe: appsv1alpha1.ContainerProbeSpec{ + Probe: corev1.Probe{ + ProbeHandler: corev1.ProbeHandler{ + HTTPGet: &corev1.HTTPGetAction{ + Port: intstr.IntOrString{Type: intstr.String, StrVal: "main-port"}, + Path: "/index.html", + Scheme: corev1.URISchemeHTTP, + }, + }, + }, + }, + }, + }, + { + name: "convert httpProbe port error(failed to extract port)", + probe: appsv1alpha1.PodContainerProbe{ + Name: "probe#main", + ContainerName: "main", + Probe: appsv1alpha1.ContainerProbeSpec{ + Probe: corev1.Probe{ + ProbeHandler: corev1.ProbeHandler{ + HTTPGet: &corev1.HTTPGetAction{ + Port: intstr.IntOrString{Type: intstr.String, StrVal: "main-port"}, + Path: "/index.html", + Scheme: corev1.URISchemeHTTP, + }, + }, + }, + }, + }, + pod: &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pod1", + Namespace: "sp1", + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "main", + Ports: []corev1.ContainerPort{ + { + Name: "main-port", + ContainerPort: -1, + }, + }, + }, + }, + }, + }, + expectErr: fmt.Errorf("Failed to extract port for container: main in pod: sp1/pod1"), + exportProbe: appsv1alpha1.PodContainerProbe{ + Name: "probe#main", + ContainerName: "main", + Probe: appsv1alpha1.ContainerProbeSpec{ + Probe: corev1.Probe{ + ProbeHandler: corev1.ProbeHandler{ + HTTPGet: &corev1.HTTPGetAction{ + Port: intstr.IntOrString{Type: intstr.String, StrVal: "main-port"}, + Path: "/index.html", + Scheme: corev1.URISchemeHTTP, + }, + }, + }, + }, + }, + }, + } + + for _, cs := range cases { + t.Run(cs.name, func(t *testing.T) { + get, err := convertHttpGetProbeCheckPort(cs.probe, cs.pod) + if !reflect.DeepEqual(cs.expectErr, err) { + t.Errorf("get probeProbe by container name failed, err: %v", err) + } + if !reflect.DeepEqual(get, cs.exportProbe) { + t.Errorf("expect: %v, but: %v", util.DumpJSON(cs.exportProbe), util.DumpJSON(get)) + } + }) + } } diff --git a/pkg/controller/podprobemarker/podprobemarker_event_handler.go b/pkg/controller/podprobemarker/podprobemarker_event_handler.go index 7afdfa5898..45a00ea6c1 100644 --- a/pkg/controller/podprobemarker/podprobemarker_event_handler.go +++ b/pkg/controller/podprobemarker/podprobemarker_event_handler.go @@ -19,9 +19,6 @@ package podprobemarker import ( "context" - appsalphav1 "github.com/openkruise/kruise/apis/apps/v1alpha1" - "github.com/openkruise/kruise/pkg/util" - utilclient "github.com/openkruise/kruise/pkg/util/client" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" @@ -33,6 +30,10 @@ import ( "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/handler" "sigs.k8s.io/controller-runtime/pkg/reconcile" + + appsalphav1 "github.com/openkruise/kruise/apis/apps/v1alpha1" + "github.com/openkruise/kruise/pkg/util" + utilclient "github.com/openkruise/kruise/pkg/util/client" ) var _ handler.EventHandler = &enqueueRequestForPodProbeMarker{} @@ -95,11 +96,14 @@ func (p *enqueueRequestForPod) Update(evt event.UpdateEvent, q workqueue.RateLim if newInitialCondition == nil { return } - if kubecontroller.IsPodActive(new) && (oldInitialCondition == nil || oldInitialCondition.Status == corev1.ConditionFalse) && - newInitialCondition.Status == corev1.ConditionTrue { + if !kubecontroller.IsPodActive(new) { + return + } + if ((oldInitialCondition == nil || oldInitialCondition.Status == corev1.ConditionFalse) && + newInitialCondition.Status == corev1.ConditionTrue) || old.Status.PodIP != new.Status.PodIP { ppms, err := p.getPodProbeMarkerForPod(new) if err != nil { - klog.Errorf("List PodProbeMarker fialed: %s", err.Error()) + klog.Errorf("List PodProbeMarker fail: %s", err.Error()) return } for _, ppm := range ppms { diff --git a/pkg/controller/podprobemarker/podprobemarker_event_handler_test.go b/pkg/controller/podprobemarker/podprobemarker_event_handler_test.go new file mode 100644 index 0000000000..29c061dbfc --- /dev/null +++ b/pkg/controller/podprobemarker/podprobemarker_event_handler_test.go @@ -0,0 +1,918 @@ +package podprobemarker + +import ( + "context" + "fmt" + "reflect" + "testing" + "time" + + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/util/workqueue" + utilpointer "k8s.io/utils/pointer" + "sigs.k8s.io/controller-runtime/pkg/client/fake" + "sigs.k8s.io/controller-runtime/pkg/event" + + appsalphav1 "github.com/openkruise/kruise/apis/apps/v1alpha1" + "github.com/openkruise/kruise/pkg/util" +) + +var ( + podDemo = &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-pod", + Namespace: "default", + Labels: map[string]string{"app": "nginx", "pub-controller": "true"}, + Annotations: map[string]string{}, + OwnerReferences: []metav1.OwnerReference{ + { + APIVersion: "apps/v1", + Kind: "ReplicaSet", + Name: "nginx", + UID: types.UID("606132e0-85ef-460a-8cf5-cd8f915a8cc3"), + Controller: utilpointer.BoolPtr(true), + }, + }, + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "nginx", + Image: "nginx:v1", + }, + }, + }, + Status: corev1.PodStatus{ + Phase: corev1.PodRunning, + Conditions: []corev1.PodCondition{ + { + Type: corev1.PodReady, + Status: corev1.ConditionTrue, + }, + }, + ContainerStatuses: []corev1.ContainerStatus{ + { + Name: "nginx", + Image: "nginx:v1", + ImageID: "nginx@sha256:a9286defaba7b3a519d585ba0e37d0b2cbee74ebfe590960b0b1d6a5e97d1e1d", + Ready: true, + }, + }, + }, + } +) + +func TestPodUpdateEventHandler(t *testing.T) { + newPod := podDemo.DeepCopy() + newPod.ResourceVersion = fmt.Sprintf("%d", time.Now().Unix()) + + updateQ := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()) + updateEvent := event.UpdateEvent{ + ObjectOld: podDemo, + ObjectNew: newPod, + } + fakeClient := fake.NewClientBuilder().WithScheme(scheme).Build() + handler := enqueueRequestForPod{reader: fakeClient} + + // update with pod no PodInitialized, then return + handler.Update(updateEvent, updateQ) + if updateQ.Len() != 0 { + t.Errorf("unexpected update event handle queue size, expected 1 actual %d", updateQ.Len()) + } + + // update with pod, pod is not active, then return + newPod = podDemo.DeepCopy() + newPod.ResourceVersion = fmt.Sprintf("%d", time.Now().Unix()) + newPod.Status.Phase = corev1.PodSucceeded + + // update with pod status changed and reconcile + handler.Update(updateEvent, updateQ) + if updateQ.Len() != 0 { + t.Errorf("unexpected update event handle queue size, expected 1 actual %d", updateQ.Len()) + } + + // parse pod error + updateEvent = event.UpdateEvent{ + ObjectOld: nil, + ObjectNew: nil, + } + handler.Update(updateEvent, updateQ) + if updateQ.Len() != 0 { + t.Errorf("unexpected update event handle queue size, expected 1 actual %d", updateQ.Len()) + } + +} + +func TestPodUpdateEventHandler_v2(t *testing.T) { + cases := []struct { + name string + ppmList *appsalphav1.PodProbeMarkerList + expectQLen int + }{ + { + name: "podUpdateEvent, exist podProbeMarker", + ppmList: &appsalphav1.PodProbeMarkerList{ + Items: []appsalphav1.PodProbeMarker{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "game-server-probe-v1", + Namespace: "default", + }, + Spec: appsalphav1.PodProbeMarkerSpec{ + Selector: &metav1.LabelSelector{ + MatchLabels: map[string]string{ + "app": "nginx", + }, + }, + Probes: []appsalphav1.PodContainerProbe{ + { + Name: "healthy", + ContainerName: "main", + Probe: appsalphav1.ContainerProbeSpec{ + Probe: corev1.Probe{ + ProbeHandler: corev1.ProbeHandler{ + Exec: &corev1.ExecAction{ + Command: []string{"/bin/sh", "-c", "/healthy.sh"}, + }, + }, + }, + }, + PodConditionType: "game.kruise.io/healthy", + MarkerPolicy: []appsalphav1.ProbeMarkerPolicy{ + { + State: appsalphav1.ProbeSucceeded, + Annotations: map[string]string{ + "controller.kubernetes.io/pod-deletion-cost": "10", + }, + Labels: map[string]string{ + "server-healthy": "true", + }, + }, + { + State: appsalphav1.ProbeFailed, + Annotations: map[string]string{ + "controller.kubernetes.io/pod-deletion-cost": "-10", + }, + Labels: map[string]string{ + "server-healthy": "false", + }, + }, + }, + }, + }, + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: "game-server-probe-v2", + Namespace: "default", + }, + Spec: appsalphav1.PodProbeMarkerSpec{ + Selector: &metav1.LabelSelector{ + MatchLabels: map[string]string{ + "app": "nginx-v2", + }, + }, + Probes: []appsalphav1.PodContainerProbe{ + { + Name: "healthy", + ContainerName: "main", + Probe: appsalphav1.ContainerProbeSpec{ + Probe: corev1.Probe{ + ProbeHandler: corev1.ProbeHandler{ + Exec: &corev1.ExecAction{ + Command: []string{"/bin/sh", "-c", "/healthy.sh"}, + }, + }, + }, + }, + PodConditionType: "game.kruise.io/healthy", + MarkerPolicy: []appsalphav1.ProbeMarkerPolicy{ + { + State: appsalphav1.ProbeSucceeded, + Annotations: map[string]string{ + "controller.kubernetes.io/pod-deletion-cost": "10", + }, + Labels: map[string]string{ + "server-healthy": "true", + }, + }, + { + State: appsalphav1.ProbeFailed, + Annotations: map[string]string{ + "controller.kubernetes.io/pod-deletion-cost": "-10", + }, + Labels: map[string]string{ + "server-healthy": "false", + }, + }, + }, + }, + }, + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: "game-server-probe-v3", + Namespace: "default", + }, + Spec: appsalphav1.PodProbeMarkerSpec{ + Selector: &metav1.LabelSelector{ + MatchLabels: map[string]string{ + "app": "nginx-v3", + }, + }, + Probes: []appsalphav1.PodContainerProbe{ + { + Name: "healthy", + ContainerName: "main", + Probe: appsalphav1.ContainerProbeSpec{ + Probe: corev1.Probe{ + ProbeHandler: corev1.ProbeHandler{ + Exec: &corev1.ExecAction{ + Command: []string{"/bin/sh", "-c", "/healthy.sh"}, + }, + }, + }, + }, + PodConditionType: "game.kruise.io/healthy", + MarkerPolicy: []appsalphav1.ProbeMarkerPolicy{ + { + State: appsalphav1.ProbeSucceeded, + Annotations: map[string]string{ + "controller.kubernetes.io/pod-deletion-cost": "10", + }, + Labels: map[string]string{ + "server-healthy": "true", + }, + }, + { + State: appsalphav1.ProbeFailed, + Annotations: map[string]string{ + "controller.kubernetes.io/pod-deletion-cost": "-10", + }, + Labels: map[string]string{ + "server-healthy": "false", + }, + }, + }, + }, + }, + }, + }, + }, + }, + expectQLen: 1, + }, + { + name: "podUpdateEvent, no exist podProbeMarker", + ppmList: &appsalphav1.PodProbeMarkerList{ + Items: []appsalphav1.PodProbeMarker{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "game-server-probe-v1", + Namespace: "default", + }, + Spec: appsalphav1.PodProbeMarkerSpec{ + Selector: &metav1.LabelSelector{ + MatchLabels: map[string]string{ + "app": "nginx-v1", + }, + }, + Probes: []appsalphav1.PodContainerProbe{ + { + Name: "healthy", + ContainerName: "main", + Probe: appsalphav1.ContainerProbeSpec{ + Probe: corev1.Probe{ + ProbeHandler: corev1.ProbeHandler{ + Exec: &corev1.ExecAction{ + Command: []string{"/bin/sh", "-c", "/healthy.sh"}, + }, + }, + }, + }, + PodConditionType: "game.kruise.io/healthy", + MarkerPolicy: []appsalphav1.ProbeMarkerPolicy{ + { + State: appsalphav1.ProbeSucceeded, + Annotations: map[string]string{ + "controller.kubernetes.io/pod-deletion-cost": "10", + }, + Labels: map[string]string{ + "server-healthy": "true", + }, + }, + { + State: appsalphav1.ProbeFailed, + Annotations: map[string]string{ + "controller.kubernetes.io/pod-deletion-cost": "-10", + }, + Labels: map[string]string{ + "server-healthy": "false", + }, + }, + }, + }, + }, + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: "game-server-probe-v2", + Namespace: "default", + }, + Spec: appsalphav1.PodProbeMarkerSpec{ + Selector: &metav1.LabelSelector{ + MatchLabels: map[string]string{ + "app": "nginx-v2", + }, + }, + Probes: []appsalphav1.PodContainerProbe{ + { + Name: "healthy", + ContainerName: "main", + Probe: appsalphav1.ContainerProbeSpec{ + Probe: corev1.Probe{ + ProbeHandler: corev1.ProbeHandler{ + Exec: &corev1.ExecAction{ + Command: []string{"/bin/sh", "-c", "/healthy.sh"}, + }, + }, + }, + }, + PodConditionType: "game.kruise.io/healthy", + MarkerPolicy: []appsalphav1.ProbeMarkerPolicy{ + { + State: appsalphav1.ProbeSucceeded, + Annotations: map[string]string{ + "controller.kubernetes.io/pod-deletion-cost": "10", + }, + Labels: map[string]string{ + "server-healthy": "true", + }, + }, + { + State: appsalphav1.ProbeFailed, + Annotations: map[string]string{ + "controller.kubernetes.io/pod-deletion-cost": "-10", + }, + Labels: map[string]string{ + "server-healthy": "false", + }, + }, + }, + }, + }, + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: "game-server-probe-v3", + Namespace: "default", + }, + Spec: appsalphav1.PodProbeMarkerSpec{ + Selector: &metav1.LabelSelector{ + MatchLabels: map[string]string{ + "app": "nginx-v3", + }, + }, + Probes: []appsalphav1.PodContainerProbe{ + { + Name: "healthy", + ContainerName: "main", + Probe: appsalphav1.ContainerProbeSpec{ + Probe: corev1.Probe{ + ProbeHandler: corev1.ProbeHandler{ + Exec: &corev1.ExecAction{ + Command: []string{"/bin/sh", "-c", "/healthy.sh"}, + }, + }, + }, + }, + PodConditionType: "game.kruise.io/healthy", + MarkerPolicy: []appsalphav1.ProbeMarkerPolicy{ + { + State: appsalphav1.ProbeSucceeded, + Annotations: map[string]string{ + "controller.kubernetes.io/pod-deletion-cost": "10", + }, + Labels: map[string]string{ + "server-healthy": "true", + }, + }, + { + State: appsalphav1.ProbeFailed, + Annotations: map[string]string{ + "controller.kubernetes.io/pod-deletion-cost": "-10", + }, + Labels: map[string]string{ + "server-healthy": "false", + }, + }, + }, + }, + }, + }, + }, + }, + }, + expectQLen: 0, + }, + } + + for _, cs := range cases { + t.Run(cs.name, func(t *testing.T) { + fakeClient := fake.NewClientBuilder().WithScheme(scheme).Build() + handler := enqueueRequestForPod{reader: fakeClient} + for _, ppm := range cs.ppmList.Items { + fakeClient.Create(context.TODO(), &ppm) + } + newPod := podDemo.DeepCopy() + newPod.ResourceVersion = fmt.Sprintf("%d", time.Now().Unix()) + util.SetPodCondition(newPod, corev1.PodCondition{ + Type: corev1.PodInitialized, + Status: corev1.ConditionTrue, + }) + util.SetPodCondition(podDemo, corev1.PodCondition{ + Type: corev1.PodInitialized, + Status: corev1.ConditionFalse, + }) + + updateQ := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()) + updateEvent := event.UpdateEvent{ + ObjectOld: podDemo, + ObjectNew: newPod, + } + handler.Update(updateEvent, updateQ) + if updateQ.Len() != cs.expectQLen { + t.Errorf("unexpected update event handle queue size, expected %v actual %d", cs.expectQLen, updateQ.Len()) + } + }) + } +} + +func TestGetPodProbeMarkerForPod(t *testing.T) { + + cases := []struct { + name string + ppmList *appsalphav1.PodProbeMarkerList + pod *corev1.Pod + expect []*appsalphav1.PodProbeMarker + expectErr error + }{ + { + name: "get pod probe marker list resources", + ppmList: &appsalphav1.PodProbeMarkerList{ + Items: []appsalphav1.PodProbeMarker{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "game-server-probe-v1", + Namespace: "sp1", + }, + Spec: appsalphav1.PodProbeMarkerSpec{ + Selector: &metav1.LabelSelector{ + MatchLabels: map[string]string{ + "app": "test-v1", + }, + }, + Probes: []appsalphav1.PodContainerProbe{ + { + Name: "healthy", + ContainerName: "main", + Probe: appsalphav1.ContainerProbeSpec{ + Probe: corev1.Probe{ + ProbeHandler: corev1.ProbeHandler{ + Exec: &corev1.ExecAction{ + Command: []string{"/bin/sh", "-c", "/healthy.sh"}, + }, + }, + }, + }, + PodConditionType: "game.kruise.io/healthy", + MarkerPolicy: []appsalphav1.ProbeMarkerPolicy{ + { + State: appsalphav1.ProbeSucceeded, + Annotations: map[string]string{ + "controller.kubernetes.io/pod-deletion-cost": "10", + }, + Labels: map[string]string{ + "server-healthy": "true", + }, + }, + { + State: appsalphav1.ProbeFailed, + Annotations: map[string]string{ + "controller.kubernetes.io/pod-deletion-cost": "-10", + }, + Labels: map[string]string{ + "server-healthy": "false", + }, + }, + }, + }, + }, + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: "game-server-probe-v2", + Namespace: "sp1", + }, + Spec: appsalphav1.PodProbeMarkerSpec{ + Selector: &metav1.LabelSelector{ + MatchLabels: map[string]string{ + "app": "test-v2", + }, + }, + Probes: []appsalphav1.PodContainerProbe{ + { + Name: "healthy", + ContainerName: "main", + Probe: appsalphav1.ContainerProbeSpec{ + Probe: corev1.Probe{ + ProbeHandler: corev1.ProbeHandler{ + Exec: &corev1.ExecAction{ + Command: []string{"/bin/sh", "-c", "/healthy.sh"}, + }, + }, + }, + }, + PodConditionType: "game.kruise.io/healthy", + MarkerPolicy: []appsalphav1.ProbeMarkerPolicy{ + { + State: appsalphav1.ProbeSucceeded, + Annotations: map[string]string{ + "controller.kubernetes.io/pod-deletion-cost": "10", + }, + Labels: map[string]string{ + "server-healthy": "true", + }, + }, + { + State: appsalphav1.ProbeFailed, + Annotations: map[string]string{ + "controller.kubernetes.io/pod-deletion-cost": "-10", + }, + Labels: map[string]string{ + "server-healthy": "false", + }, + }, + }, + }, + }, + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: "game-server-probe-v3", + Namespace: "sp1", + }, + Spec: appsalphav1.PodProbeMarkerSpec{ + Selector: &metav1.LabelSelector{ + MatchLabels: map[string]string{ + "app": "test-v2", + }, + }, + Probes: []appsalphav1.PodContainerProbe{ + { + Name: "healthy", + ContainerName: "main", + Probe: appsalphav1.ContainerProbeSpec{ + Probe: corev1.Probe{ + ProbeHandler: corev1.ProbeHandler{ + Exec: &corev1.ExecAction{ + Command: []string{"/bin/sh", "-c", "/healthy.sh"}, + }, + }, + }, + }, + PodConditionType: "game.kruise.io/healthy", + MarkerPolicy: []appsalphav1.ProbeMarkerPolicy{ + { + State: appsalphav1.ProbeSucceeded, + Annotations: map[string]string{ + "controller.kubernetes.io/pod-deletion-cost": "10", + }, + Labels: map[string]string{ + "server-healthy": "true", + }, + }, + { + State: appsalphav1.ProbeFailed, + Annotations: map[string]string{ + "controller.kubernetes.io/pod-deletion-cost": "-10", + }, + Labels: map[string]string{ + "server-healthy": "false", + }, + }, + }, + }, + }, + }, + }, + }, + }, + pod: &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pod1", + Namespace: "sp1", + Labels: map[string]string{ + "app": "test-v2", + }, + }, + }, + expect: []*appsalphav1.PodProbeMarker{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "game-server-probe-v2", + Namespace: "sp1", + ResourceVersion: "1", + }, + Spec: appsalphav1.PodProbeMarkerSpec{ + Selector: &metav1.LabelSelector{ + MatchLabels: map[string]string{ + "app": "test-v2", + }, + }, + Probes: []appsalphav1.PodContainerProbe{ + { + Name: "healthy", + ContainerName: "main", + Probe: appsalphav1.ContainerProbeSpec{ + Probe: corev1.Probe{ + ProbeHandler: corev1.ProbeHandler{ + Exec: &corev1.ExecAction{ + Command: []string{"/bin/sh", "-c", "/healthy.sh"}, + }, + }, + }, + }, + PodConditionType: "game.kruise.io/healthy", + MarkerPolicy: []appsalphav1.ProbeMarkerPolicy{ + { + State: appsalphav1.ProbeSucceeded, + Annotations: map[string]string{ + "controller.kubernetes.io/pod-deletion-cost": "10", + }, + Labels: map[string]string{ + "server-healthy": "true", + }, + }, + { + State: appsalphav1.ProbeFailed, + Annotations: map[string]string{ + "controller.kubernetes.io/pod-deletion-cost": "-10", + }, + Labels: map[string]string{ + "server-healthy": "false", + }, + }, + }, + }, + }, + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: "game-server-probe-v3", + Namespace: "sp1", + ResourceVersion: "1", + }, + Spec: appsalphav1.PodProbeMarkerSpec{ + Selector: &metav1.LabelSelector{ + MatchLabels: map[string]string{ + "app": "test-v2", + }, + }, + Probes: []appsalphav1.PodContainerProbe{ + { + Name: "healthy", + ContainerName: "main", + Probe: appsalphav1.ContainerProbeSpec{ + Probe: corev1.Probe{ + ProbeHandler: corev1.ProbeHandler{ + Exec: &corev1.ExecAction{ + Command: []string{"/bin/sh", "-c", "/healthy.sh"}, + }, + }, + }, + }, + PodConditionType: "game.kruise.io/healthy", + MarkerPolicy: []appsalphav1.ProbeMarkerPolicy{ + { + State: appsalphav1.ProbeSucceeded, + Annotations: map[string]string{ + "controller.kubernetes.io/pod-deletion-cost": "10", + }, + Labels: map[string]string{ + "server-healthy": "true", + }, + }, + { + State: appsalphav1.ProbeFailed, + Annotations: map[string]string{ + "controller.kubernetes.io/pod-deletion-cost": "-10", + }, + Labels: map[string]string{ + "server-healthy": "false", + }, + }, + }, + }, + }, + }, + }, + }, + expectErr: nil, + }, + { + name: "no found pod probe marker list resources", + ppmList: &appsalphav1.PodProbeMarkerList{ + Items: []appsalphav1.PodProbeMarker{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "game-server-probe-v1", + Namespace: "sp1", + ResourceVersion: "01", + }, + Spec: appsalphav1.PodProbeMarkerSpec{ + Selector: &metav1.LabelSelector{ + MatchLabels: map[string]string{ + "app": "test-v1", + }, + }, + Probes: []appsalphav1.PodContainerProbe{ + { + Name: "healthy", + ContainerName: "main", + Probe: appsalphav1.ContainerProbeSpec{ + Probe: corev1.Probe{ + ProbeHandler: corev1.ProbeHandler{ + Exec: &corev1.ExecAction{ + Command: []string{"/bin/sh", "-c", "/healthy.sh"}, + }, + }, + }, + }, + PodConditionType: "game.kruise.io/healthy", + MarkerPolicy: []appsalphav1.ProbeMarkerPolicy{ + { + State: appsalphav1.ProbeSucceeded, + Annotations: map[string]string{ + "controller.kubernetes.io/pod-deletion-cost": "10", + }, + Labels: map[string]string{ + "server-healthy": "true", + }, + }, + { + State: appsalphav1.ProbeFailed, + Annotations: map[string]string{ + "controller.kubernetes.io/pod-deletion-cost": "-10", + }, + Labels: map[string]string{ + "server-healthy": "false", + }, + }, + }, + }, + }, + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: "game-server-probe-v2", + Namespace: "sp1", + ResourceVersion: "01", + }, + Spec: appsalphav1.PodProbeMarkerSpec{ + Selector: &metav1.LabelSelector{ + MatchLabels: map[string]string{ + "app": "test-v2", + }, + }, + Probes: []appsalphav1.PodContainerProbe{ + { + Name: "healthy", + ContainerName: "main", + Probe: appsalphav1.ContainerProbeSpec{ + Probe: corev1.Probe{ + ProbeHandler: corev1.ProbeHandler{ + Exec: &corev1.ExecAction{ + Command: []string{"/bin/sh", "-c", "/healthy.sh"}, + }, + }, + }, + }, + PodConditionType: "game.kruise.io/healthy", + MarkerPolicy: []appsalphav1.ProbeMarkerPolicy{ + { + State: appsalphav1.ProbeSucceeded, + Annotations: map[string]string{ + "controller.kubernetes.io/pod-deletion-cost": "10", + }, + Labels: map[string]string{ + "server-healthy": "true", + }, + }, + { + State: appsalphav1.ProbeFailed, + Annotations: map[string]string{ + "controller.kubernetes.io/pod-deletion-cost": "-10", + }, + Labels: map[string]string{ + "server-healthy": "false", + }, + }, + }, + }, + }, + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: "game-server-probe-v3", + Namespace: "sp1", + ResourceVersion: "01", + }, + Spec: appsalphav1.PodProbeMarkerSpec{ + Selector: &metav1.LabelSelector{ + MatchLabels: map[string]string{ + "app": "test-v2", + }, + }, + Probes: []appsalphav1.PodContainerProbe{ + { + Name: "healthy", + ContainerName: "main", + Probe: appsalphav1.ContainerProbeSpec{ + Probe: corev1.Probe{ + ProbeHandler: corev1.ProbeHandler{ + Exec: &corev1.ExecAction{ + Command: []string{"/bin/sh", "-c", "/healthy.sh"}, + }, + }, + }, + }, + PodConditionType: "game.kruise.io/healthy", + MarkerPolicy: []appsalphav1.ProbeMarkerPolicy{ + { + State: appsalphav1.ProbeSucceeded, + Annotations: map[string]string{ + "controller.kubernetes.io/pod-deletion-cost": "10", + }, + Labels: map[string]string{ + "server-healthy": "true", + }, + }, + { + State: appsalphav1.ProbeFailed, + Annotations: map[string]string{ + "controller.kubernetes.io/pod-deletion-cost": "-10", + }, + Labels: map[string]string{ + "server-healthy": "false", + }, + }, + }, + }, + }, + }, + }, + }, + }, + pod: &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pod1", + Namespace: "sp1", + Labels: map[string]string{ + "app": "test", + }, + }, + }, + expect: nil, + expectErr: nil, + }, + } + + for _, cs := range cases { + t.Run(cs.name, func(t *testing.T) { + fakeClient := fake.NewClientBuilder().WithScheme(scheme).Build() + for _, ppm := range cs.ppmList.Items { + fakeClient.Create(context.TODO(), &ppm) + } + handler := enqueueRequestForPod{reader: fakeClient} + get, err := handler.getPodProbeMarkerForPod(cs.pod) + if !reflect.DeepEqual(cs.expectErr, err) { + t.Errorf("expectErr: %v, but: %v", cs.expectErr, err) + } + if !reflect.DeepEqual(util.DumpJSON(cs.expect), util.DumpJSON(get)) { + t.Errorf("expectGet: %v, but: %v", util.DumpJSON(cs.expect), util.DumpJSON(get)) + } + }) + } +} diff --git a/pkg/controller/sidecarset/sidecarset_hotupgrade_test.go b/pkg/controller/sidecarset/sidecarset_hotupgrade_test.go index a7d5056526..27ea9bdb72 100644 --- a/pkg/controller/sidecarset/sidecarset_hotupgrade_test.go +++ b/pkg/controller/sidecarset/sidecarset_hotupgrade_test.go @@ -22,6 +22,7 @@ import ( appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1" "github.com/openkruise/kruise/pkg/control/sidecarcontrol" + "github.com/openkruise/kruise/pkg/util" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -258,7 +259,7 @@ func testUpdateHotUpgradeSidecar(t *testing.T, hotUpgradeEmptyImage string, side } podInput = podOutput.DeepCopy() for cName, infos := range cs.expectedInfo { - sidecarContainer := getPodContainerByName(cName, podOutput) + sidecarContainer := util.GetPodContainerByName(cName, podOutput) if infos[0] != sidecarContainer.Image { t.Fatalf("expect pod(%s) container(%s) image(%s), but get image(%s)", pod.Name, sidecarContainer.Name, infos[0], sidecarContainer.Image) } @@ -293,13 +294,3 @@ func testUpdateHotUpgradeSidecar(t *testing.T, hotUpgradeEmptyImage string, side }) } } - -func getPodContainerByName(cName string, pod *corev1.Pod) *corev1.Container { - for _, container := range pod.Spec.Containers { - if cName == container.Name { - return &container - } - } - - return nil -} diff --git a/pkg/daemon/podprobe/pod_probe_controller.go b/pkg/daemon/podprobe/pod_probe_controller.go index 8e92700dad..8ff687cc60 100644 --- a/pkg/daemon/podprobe/pod_probe_controller.go +++ b/pkg/daemon/podprobe/pod_probe_controller.go @@ -26,15 +26,6 @@ import ( "sync" "time" - appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1" - "github.com/openkruise/kruise/pkg/client" - kruiseclient "github.com/openkruise/kruise/pkg/client/clientset/versioned" - clientalpha1 "github.com/openkruise/kruise/pkg/client/clientset/versioned/typed/apps/v1alpha1" - listersalpha1 "github.com/openkruise/kruise/pkg/client/listers/apps/v1alpha1" - daemonruntime "github.com/openkruise/kruise/pkg/daemon/criruntime" - daemonoptions "github.com/openkruise/kruise/pkg/daemon/options" - "github.com/openkruise/kruise/pkg/daemon/util" - commonutil "github.com/openkruise/kruise/pkg/util" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -51,6 +42,16 @@ import ( "k8s.io/gengo/examples/set-gen/sets" "k8s.io/klog/v2" kubelettypes "k8s.io/kubernetes/pkg/kubelet/types" + + appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1" + "github.com/openkruise/kruise/pkg/client" + kruiseclient "github.com/openkruise/kruise/pkg/client/clientset/versioned" + clientalpha1 "github.com/openkruise/kruise/pkg/client/clientset/versioned/typed/apps/v1alpha1" + listersalpha1 "github.com/openkruise/kruise/pkg/client/listers/apps/v1alpha1" + daemonruntime "github.com/openkruise/kruise/pkg/daemon/criruntime" + daemonoptions "github.com/openkruise/kruise/pkg/daemon/options" + "github.com/openkruise/kruise/pkg/daemon/util" + commonutil "github.com/openkruise/kruise/pkg/util" ) const ( @@ -63,6 +64,7 @@ type probeKey struct { podNs string podName string podUID string + podIP string containerName string probeName string } @@ -249,7 +251,7 @@ func (c *Controller) sync() error { c.workerLock.Lock() validWorkers := map[probeKey]struct{}{} for _, podProbe := range npp.Spec.PodProbes { - key := probeKey{podNs: podProbe.Namespace, podName: podProbe.Name, podUID: podProbe.UID} + key := probeKey{podNs: podProbe.Namespace, podName: podProbe.Name, podUID: podProbe.UID, podIP: podProbe.IP} for i := range podProbe.Probes { probe := podProbe.Probes[i] key.containerName = probe.ContainerName diff --git a/pkg/daemon/podprobe/pod_probe_controller_test.go b/pkg/daemon/podprobe/pod_probe_controller_test.go index 1daf0672ed..f013d0a5f1 100644 --- a/pkg/daemon/podprobe/pod_probe_controller_test.go +++ b/pkg/daemon/podprobe/pod_probe_controller_test.go @@ -22,16 +22,18 @@ import ( "testing" "time" - appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1" - "github.com/openkruise/kruise/pkg/client/clientset/versioned/fake" - listersalpha1 "github.com/openkruise/kruise/pkg/client/listers/apps/v1alpha1" - commonutil "github.com/openkruise/kruise/pkg/util" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/record" "k8s.io/client-go/util/workqueue" "k8s.io/klog/v2" + + appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1" + "github.com/openkruise/kruise/pkg/client/clientset/versioned/fake" + listersalpha1 "github.com/openkruise/kruise/pkg/client/listers/apps/v1alpha1" + commonutil "github.com/openkruise/kruise/pkg/util" ) var ( @@ -44,6 +46,7 @@ var ( { Name: "pod-0", UID: "pod-0-uid", + IP: "1.1.1.1", Probes: []appsv1alpha1.ContainerProbe{ { Name: "ppm-1#healthy", @@ -64,6 +67,7 @@ var ( { Name: "pod-1", UID: "pod-1-uid", + IP: "2.2.2.2", Probes: []appsv1alpha1.ContainerProbe{ { Name: "ppm-1#healthy", @@ -81,6 +85,27 @@ var ( }, }, }, + { + Name: "pod-2", + UID: "pod-2-uid", + IP: "3.3.3.3", + Probes: []appsv1alpha1.ContainerProbe{ + { + Name: "ppm-1#tcpCheck", + ContainerName: "main", + Probe: appsv1alpha1.ContainerProbeSpec{ + Probe: corev1.Probe{ + ProbeHandler: corev1.ProbeHandler{ + TCPSocket: &corev1.TCPSocketAction{ + Port: intstr.IntOrString{Type: intstr.Int, IntVal: int32(8000)}, + Host: "3.3.3.3", + }, + }, + }, + }, + }, + }, + }, }, }, } @@ -96,7 +121,7 @@ func TestUpdateNodePodProbeStatus(t *testing.T) { { name: "test1, update pod probe status", getUpdate: func() Update { - return Update{Key: probeKey{"", "pod-1", "pod-1-uid", "main", "ppm-1#healthy"}, State: appsv1alpha1.ProbeSucceeded} + return Update{Key: probeKey{"", "pod-1", "pod-1-uid", "2.2.2.2", "main", "ppm-1#healthy"}, State: appsv1alpha1.ProbeSucceeded} }, getNodePodProbe: func() *appsv1alpha1.NodePodProbe { demo := demoNodePodProbe.DeepCopy() @@ -144,10 +169,11 @@ func TestUpdateNodePodProbeStatus(t *testing.T) { return obj }, }, + { name: "test2, update pod probe status", getUpdate: func() Update { - return Update{Key: probeKey{"", "pod-1", "pod-1-uid", "main", "ppm-1#healthy"}, State: appsv1alpha1.ProbeSucceeded} + return Update{Key: probeKey{"", "pod-1", "pod-1-uid", "2.2.2.2", "main", "ppm-1#healthy"}, State: appsv1alpha1.ProbeSucceeded} }, getNodePodProbe: func() *appsv1alpha1.NodePodProbe { demo := demoNodePodProbe.DeepCopy() @@ -227,10 +253,11 @@ func TestUpdateNodePodProbeStatus(t *testing.T) { return obj }, }, + { name: "test3, update pod probe status", getUpdate: func() Update { - return Update{Key: probeKey{"", "pod-1", "pod-1-uid", "main", "ppm-1#healthy"}, State: appsv1alpha1.ProbeSucceeded} + return Update{Key: probeKey{"", "pod-1", "pod-1-uid", "2.2.2.2", "main", "ppm-1#healthy"}, State: appsv1alpha1.ProbeSucceeded} }, getNodePodProbe: func() *appsv1alpha1.NodePodProbe { demo := demoNodePodProbe.DeepCopy() @@ -286,6 +313,7 @@ func TestUpdateNodePodProbeStatus(t *testing.T) { return obj }, }, + { name: "test4, update pod probe status", getUpdate: func() Update { @@ -412,7 +440,7 @@ func TestSyncNodePodProbe(t *testing.T) { }, setWorkers: func(c *Controller) { c.workers = map[probeKey]*worker{} - key1 := probeKey{"", "pod-1", "pod-1-uid", "main", "ppm-1#check"} + key1 := probeKey{"", "pod-1", "pod-1-uid", "2.2.2.2", "main", "ppm-1#check"} c.workers[key1] = newWorker(c, key1, &appsv1alpha1.ContainerProbeSpec{ Probe: corev1.Probe{ ProbeHandler: corev1.ProbeHandler{ @@ -423,7 +451,7 @@ func TestSyncNodePodProbe(t *testing.T) { }, }) go c.workers[key1].run() - key2 := probeKey{"", "pod-1", "pod-1-uid", "main", "ppm-1#healthy"} + key2 := probeKey{"", "pod-2", "pod-2-uid", "3.3.3.3", "main", "ppm-1#tcpCheck"} c.workers[key2] = newWorker(c, key2, &appsv1alpha1.ContainerProbeSpec{ Probe: corev1.Probe{ ProbeHandler: corev1.ProbeHandler{ @@ -437,8 +465,8 @@ func TestSyncNodePodProbe(t *testing.T) { }, expectWorkers: func(c *Controller) map[probeKey]*worker { expect := map[probeKey]*worker{} - key := probeKey{"", "pod-1", "pod-1-uid", "main", "ppm-1#healthy"} - expect[key] = newWorker(c, key, &appsv1alpha1.ContainerProbeSpec{ + key1 := probeKey{"", "pod-1", "pod-1-uid", "2.2.2.2", "main", "ppm-1#healthy"} + expect[key1] = newWorker(c, key1, &appsv1alpha1.ContainerProbeSpec{ Probe: corev1.Probe{ ProbeHandler: corev1.ProbeHandler{ Exec: &corev1.ExecAction{ @@ -448,6 +476,17 @@ func TestSyncNodePodProbe(t *testing.T) { InitialDelaySeconds: 100, }, }) + key2 := probeKey{"", "pod-2", "pod-2-uid", "3.3.3.3", "main", "ppm-1#tcpCheck"} + expect[key2] = newWorker(c, key2, &appsv1alpha1.ContainerProbeSpec{ + Probe: corev1.Probe{ + ProbeHandler: corev1.ProbeHandler{ + TCPSocket: &corev1.TCPSocketAction{ + Port: intstr.IntOrString{Type: intstr.Int, IntVal: int32(8000)}, + Host: "3.3.3.3", + }, + }, + }, + }) return expect }, }, @@ -476,7 +515,19 @@ func TestSyncNodePodProbe(t *testing.T) { }, expectWorkers: func(c *Controller) map[probeKey]*worker { expect := map[probeKey]*worker{} - key1 := probeKey{"", "pod-1", "pod-1-uid", "main", "ppm-1#healthy"} + key0 := probeKey{"", "pod-0", "pod-0-uid", "1.1.1.1", "main", "ppm-1#healthy"} + expect[key0] = newWorker(c, key0, &appsv1alpha1.ContainerProbeSpec{ + Probe: corev1.Probe{ + ProbeHandler: corev1.ProbeHandler{ + Exec: &corev1.ExecAction{ + Command: []string{"/bin/sh", "-c", "/healthy.sh"}, + }, + }, + InitialDelaySeconds: 100, + }, + }) + + key1 := probeKey{"", "pod-1", "pod-1-uid", "2.2.2.2", "main", "ppm-1#healthy"} expect[key1] = newWorker(c, key1, &appsv1alpha1.ContainerProbeSpec{ Probe: corev1.Probe{ ProbeHandler: corev1.ProbeHandler{ @@ -487,7 +538,8 @@ func TestSyncNodePodProbe(t *testing.T) { InitialDelaySeconds: 100, }, }) - key2 := probeKey{"", "pod-1", "pod-1-uid", "nginx", "ppm-1#check"} + + key2 := probeKey{"", "pod-1", "pod-1-uid", "2.2.2.2", "nginx", "ppm-1#check"} expect[key2] = newWorker(c, key2, &appsv1alpha1.ContainerProbeSpec{ Probe: corev1.Probe{ ProbeHandler: corev1.ProbeHandler{ @@ -498,15 +550,16 @@ func TestSyncNodePodProbe(t *testing.T) { InitialDelaySeconds: 100, }, }) - key3 := probeKey{"", "pod-0", "pod-0-uid", "main", "ppm-1#healthy"} + + key3 := probeKey{"", "pod-2", "pod-2-uid", "3.3.3.3", "main", "ppm-1#tcpCheck"} expect[key3] = newWorker(c, key3, &appsv1alpha1.ContainerProbeSpec{ Probe: corev1.Probe{ ProbeHandler: corev1.ProbeHandler{ - Exec: &corev1.ExecAction{ - Command: []string{"/bin/sh", "-c", "/healthy.sh"}, + TCPSocket: &corev1.TCPSocketAction{ + Port: intstr.IntOrString{Type: intstr.Int, IntVal: int32(8000)}, + Host: "3.3.3.3", }, }, - InitialDelaySeconds: 100, }, }) return expect diff --git a/pkg/daemon/podprobe/prober.go b/pkg/daemon/podprobe/prober.go index 5b2e5c9ef3..118558b3a2 100644 --- a/pkg/daemon/podprobe/prober.go +++ b/pkg/daemon/podprobe/prober.go @@ -22,13 +22,15 @@ import ( "io" "time" - appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1" criapi "k8s.io/cri-api/pkg/apis" runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1" "k8s.io/klog/v2" "k8s.io/kubernetes/pkg/probe" execprobe "k8s.io/kubernetes/pkg/probe/exec" + tcpprobe "k8s.io/kubernetes/pkg/probe/tcp" "k8s.io/utils/exec" + + appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1" ) const maxProbeMessageLength = 1024 @@ -36,6 +38,7 @@ const maxProbeMessageLength = 1024 // Prober helps to check the probe(exec, http, tcp) of a container. type prober struct { exec execprobe.Prober + tcp tcpprobe.Prober runtimeService criapi.RuntimeService } @@ -44,13 +47,14 @@ type prober struct { func newProber(runtimeService criapi.RuntimeService) *prober { return &prober{ exec: execprobe.New(), + tcp: tcpprobe.New(), runtimeService: runtimeService, } } // probe probes the container. -func (pb *prober) probe(p *appsv1alpha1.ContainerProbeSpec, container *runtimeapi.ContainerStatus, containerID string) (appsv1alpha1.ProbeState, string, error) { - result, msg, err := pb.runProbe(p, container, containerID) +func (pb *prober) probe(p *appsv1alpha1.ContainerProbeSpec, probeKey probeKey, containerRuntimeStatus *runtimeapi.ContainerStatus, containerID string) (appsv1alpha1.ProbeState, string, error) { + result, msg, err := pb.runProbe(p, probeKey, containerRuntimeStatus, containerID) if bytes.Count([]byte(msg), nil)-1 > maxProbeMessageLength { msg = msg[:maxProbeMessageLength] } @@ -60,19 +64,29 @@ func (pb *prober) probe(p *appsv1alpha1.ContainerProbeSpec, container *runtimeap return appsv1alpha1.ProbeSucceeded, msg, nil } -func (pb *prober) runProbe(p *appsv1alpha1.ContainerProbeSpec, container *runtimeapi.ContainerStatus, containerID string) (probe.Result, string, error) { +func (pb *prober) runProbe(p *appsv1alpha1.ContainerProbeSpec, probeKey probeKey, containerRuntimeStatus *runtimeapi.ContainerStatus, containerID string) (probe.Result, string, error) { timeSecond := p.TimeoutSeconds if timeSecond <= 0 { timeSecond = 1 } timeout := time.Duration(timeSecond) * time.Second // current only support exec - // todo: http, tcp + // todo: http if p.Exec != nil { return pb.exec.Probe(pb.newExecInContainer(containerID, p.Exec.Command, timeout)) } - klog.InfoS("Failed to find probe builder for container", "containerName", container.Metadata.Name) - return probe.Unknown, "", fmt.Errorf("missing probe handler for %s", container.Metadata.Name) + // support tcp socket probe handler + if p.TCPSocket != nil { + port := p.TCPSocket.Port.IntValue() + host := p.TCPSocket.Host + if host == "" { + host = probeKey.podIP + } + klog.InfoS("TCP-Probe Host", "host", host, "port", port, "timeout", timeout) + return pb.tcp.Probe(host, port, timeout) + } + klog.InfoS("Failed to find probe builder for container", "containerName", containerRuntimeStatus.Metadata.Name) + return probe.Unknown, "", fmt.Errorf("missing probe handler for %s", containerRuntimeStatus.Metadata.Name) } type execInContainer struct { diff --git a/pkg/daemon/podprobe/prober_test.go b/pkg/daemon/podprobe/prober_test.go new file mode 100644 index 0000000000..7c93bc9404 --- /dev/null +++ b/pkg/daemon/podprobe/prober_test.go @@ -0,0 +1,105 @@ +package podprobe + +import ( + "net" + "net/http" + "net/http/httptest" + "strconv" + "testing" + + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/util/intstr" + runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1" + "k8s.io/kubernetes/pkg/probe" + tcpprobe "k8s.io/kubernetes/pkg/probe/tcp" + + appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1" +) + +// New creates Prober. +func New() prober { + return prober{ + tcp: tcpprobe.New(), + } +} + +func TestRunProbe(t *testing.T) { + // Setup a test server that responds to probing correctly + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + })) + defer server.Close() + tHost, tPortStr, err := net.SplitHostPort(server.Listener.Addr().String()) + if err != nil { + t.Errorf("unexpected error: %v", err) + } + tPort, err := strconv.Atoi(tPortStr) + if err != nil { + t.Errorf("unexpected error: %v", err) + } + + tests := []struct { + name string + p *appsv1alpha1.ContainerProbeSpec + probeKey probeKey + containerRuntimeStatus *runtimeapi.ContainerStatus + containerID string + + expectedStatus probe.Result + expectedError error + }{ + { + name: "test tcpProbe check, a connection is made and probing would succeed", + p: &appsv1alpha1.ContainerProbeSpec{ + Probe: corev1.Probe{ + ProbeHandler: corev1.ProbeHandler{ + TCPSocket: &corev1.TCPSocketAction{ + Port: intstr.IntOrString{Type: intstr.Int, IntVal: int32(tPort)}, + }, + }, + }, + }, + probeKey: probeKey{ + podIP: tHost, + }, + containerRuntimeStatus: &runtimeapi.ContainerStatus{ + Metadata: &runtimeapi.ContainerMetadata{ + Name: "container-name", + }, + }, + + expectedStatus: probe.Success, + expectedError: nil, + }, + + { + name: "test tcpProbe check, no connection can be made and probing would fail", + p: &appsv1alpha1.ContainerProbeSpec{ + Probe: corev1.Probe{ + ProbeHandler: corev1.ProbeHandler{ + TCPSocket: &corev1.TCPSocketAction{ + Port: intstr.IntOrString{Type: intstr.Int, IntVal: int32(-1)}, + }, + }, + }, + }, + probeKey: probeKey{ + podIP: tHost, + }, + expectedStatus: probe.Failure, + expectedError: nil, + }, + } + + prober := New() + for i, tt := range tests { + status, _, err := prober.runProbe(tt.p, tt.probeKey, tt.containerRuntimeStatus, tt.containerID) + if status != tt.expectedStatus { + t.Errorf("#%d: expected status=%v, get=%v", i, tt.expectedStatus, status) + } + if err != tt.expectedError { + t.Errorf("#%d: expected error=%v, get=%v", i, tt.expectedError, err) + } + } + +} diff --git a/pkg/daemon/podprobe/worker.go b/pkg/daemon/podprobe/worker.go index 7b9556d1bc..c7c5a04141 100644 --- a/pkg/daemon/podprobe/worker.go +++ b/pkg/daemon/podprobe/worker.go @@ -22,11 +22,12 @@ import ( "reflect" "time" - appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1" - "github.com/openkruise/kruise/pkg/util" "k8s.io/apimachinery/pkg/util/runtime" runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1" "k8s.io/klog/v2" + + appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1" + "github.com/openkruise/kruise/pkg/util" ) // worker handles the periodic probing of its assigned container. Each worker has a go-routine @@ -37,7 +38,7 @@ type worker struct { // Channel for stopping the probe. stopCh chan struct{} - // pod uid, container name, probe name + // pod uid, container name, probe name, ip key probeKey // Describes the probe configuration @@ -119,21 +120,21 @@ func (w *worker) doProbe() (keepGoing bool) { defer func() { recover() }() // Actually eat panics (HandleCrash takes care of logging) defer runtime.HandleCrash(func(_ interface{}) { keepGoing = true }) - container, _ := w.probeController.fetchLatestPodContainer(w.key.podUID, w.key.containerName) - if container == nil { + containerRuntimeStatus, _ := w.probeController.fetchLatestPodContainer(w.key.podUID, w.key.containerName) + if containerRuntimeStatus == nil { klog.V(5).Infof("Pod(%s/%s) container(%s) Not Found", w.key.podNs, w.key.podName, w.key.containerName) return true } - if w.containerID != container.Id { + if w.containerID != containerRuntimeStatus.Id { if w.containerID != "" { w.probeController.result.remove(w.containerID) } - klog.V(5).Infof("Pod(%s/%s) container(%s) Id changed(%s -> %s)", w.key.podNs, w.key.podName, w.key.containerName, w.containerID, container.Id) - w.containerID = container.Id + klog.V(5).Infof("Pod(%s/%s) container(%s) Id changed(%s -> %s)", w.key.podNs, w.key.podName, w.key.containerName, w.containerID, containerRuntimeStatus.Id) + w.containerID = containerRuntimeStatus.Id w.probeController.result.set(w.containerID, w.key, w.initialValue, "") } - if container.State != runtimeapi.ContainerState_CONTAINER_RUNNING { + if containerRuntimeStatus.State != runtimeapi.ContainerState_CONTAINER_RUNNING { klog.V(5).Infof("Pod(%s/%s) Non-running container(%s) probed", w.key.podNs, w.key.podName, w.key.containerName) w.probeController.result.set(w.containerID, w.key, appsv1alpha1.ProbeFailed, fmt.Sprintf("Container(%s) is Non-running", w.key.containerName)) } @@ -143,7 +144,7 @@ func (w *worker) doProbe() (keepGoing bool) { if initialDelay < 1 { initialDelay = 1 } - curDelay := int32(time.Since(time.Unix(0, container.StartedAt)).Seconds()) + curDelay := int32(time.Since(time.Unix(0, containerRuntimeStatus.StartedAt)).Seconds()) if curDelay < initialDelay { klog.V(5).Infof("Pod(%s:%s) container(%s) probe(%s) initialDelay(%d), but curDelay(%d)", w.key.podNs, w.key.podName, w.key.containerName, w.key.probeName, initialDelay, curDelay) @@ -152,7 +153,7 @@ func (w *worker) doProbe() (keepGoing bool) { // the full container environment here, OR we must make a call to the CRI in order to get those environment // values from the running container. - result, msg, err := w.probeController.prober.probe(w.spec, container, w.containerID) + result, msg, err := w.probeController.prober.probe(w.spec, w.key, containerRuntimeStatus, w.containerID) if err != nil { klog.Errorf("Pod(%s/%s) do container(%s) probe(%s) spec(%s) failed: %s", w.key.podNs, w.key.podName, w.key.containerName, w.key.probeName, util.DumpJSON(w.spec), err.Error()) diff --git a/pkg/util/pods.go b/pkg/util/pods.go index 9c873ed59f..b1d2838811 100644 --- a/pkg/util/pods.go +++ b/pkg/util/pods.go @@ -18,14 +18,18 @@ package util import ( "fmt" + "strconv" "strings" - appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/klog/v2" podutil "k8s.io/kubernetes/pkg/api/v1/pod" + + appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1" ) // GetPodNames returns names of the given Pods array @@ -343,3 +347,46 @@ func SetPodReadyCondition(pod *v1.Pod) { SetPodCondition(pod, newPodReady) } + +func ExtractPort(param intstr.IntOrString, container v1.Container) (int, error) { + port := -1 + var err error + switch param.Type { + case intstr.Int: + port = param.IntValue() + case intstr.String: + if port, err = findPortByName(container, param.StrVal); err != nil { + // Last ditch effort - maybe it was an int stored as string? + klog.Errorf("error : %v", err) + if port, err = strconv.Atoi(param.StrVal); err != nil { + return port, err + } + } + default: + return port, fmt.Errorf("intOrString had no kind: %+v", param) + } + if port > 0 && port < 65536 { + return port, nil + } + return port, fmt.Errorf("invalid port number: %v", port) +} + +// findPortByName is a helper function to look up a port in a container by name. +func findPortByName(container v1.Container, portName string) (int, error) { + for _, port := range container.Ports { + if port.Name == portName { + return int(port.ContainerPort), nil + } + } + return 0, fmt.Errorf("port %s not found", portName) +} + +func GetPodContainerByName(cName string, pod *v1.Pod) *v1.Container { + for _, container := range pod.Spec.Containers { + if cName == container.Name { + return &container + } + } + + return nil +} diff --git a/pkg/util/pods_test.go b/pkg/util/pods_test.go index 5ca39cca10..a39f517022 100644 --- a/pkg/util/pods_test.go +++ b/pkg/util/pods_test.go @@ -17,9 +17,13 @@ limitations under the License. package util import ( + "fmt" + "reflect" "testing" v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/intstr" ) func TestMergeVolumeMounts(t *testing.T) { @@ -102,3 +106,280 @@ func TestMergeVolumes(t *testing.T) { } } } + +func TestExtractPort(t *testing.T) { + cases := []struct { + name string + param intstr.IntOrString + container v1.Container + expectErr error + expectPortInt int + }{ + { + name: "str type container port", + param: intstr.IntOrString{ + Type: intstr.String, + StrVal: "main-port", + }, + container: v1.Container{ + Name: "main", + Ports: []v1.ContainerPort{ + { + Name: "main-port", + ContainerPort: 80, + }, + }, + }, + expectPortInt: 80, + }, + { + name: "str type container", + param: intstr.IntOrString{ + Type: intstr.Int, + IntVal: 8081, + }, + container: v1.Container{ + Name: "main", + Ports: []v1.ContainerPort{ + { + Name: "main-port", + ContainerPort: 8080, + }, + }, + }, + expectPortInt: 8081, + }, + { + name: "intOrString had no kind", + param: intstr.IntOrString{ + Type: 3, + }, + container: v1.Container{ + Name: "main", + Ports: []v1.ContainerPort{ + { + Name: "main-port", + ContainerPort: 8080, + }, + }, + }, + expectErr: fmt.Errorf("intOrString had no kind: {Type:3 IntVal:0 StrVal:}"), + expectPortInt: -1, + }, + { + name: "find port by name error(strconv atoi error)", + param: intstr.IntOrString{ + Type: intstr.String, + StrVal: "main-port", + }, + container: v1.Container{ + Name: "main", + Ports: []v1.ContainerPort{ + { + Name: "main-port", + ContainerPort: -80, + }, + }, + }, + expectErr: fmt.Errorf("invalid port number: -80"), + expectPortInt: -80, + }, + { + name: "no found port", + param: intstr.IntOrString{ + Type: intstr.String, + StrVal: "main-port", + }, + container: v1.Container{ + Name: "main", + Ports: []v1.ContainerPort{ + { + Name: "main-port-no", + ContainerPort: 80, + }, + }, + }, + expectErr: fmt.Errorf("strconv.Atoi: parsing \"main-port\": invalid syntax"), + expectPortInt: 0, + }, + { + name: "invalid port number", + param: intstr.IntOrString{}, + container: v1.Container{ + Name: "main", + Ports: []v1.ContainerPort{ + { + Name: "main-port", + ContainerPort: 8080, + }, + }, + }, + expectErr: fmt.Errorf("invalid port number: 0"), + expectPortInt: 0, + }, + } + + for _, cs := range cases { + t.Run(cs.name, func(t *testing.T) { + get, err := ExtractPort(cs.param, cs.container) + if err != nil && !reflect.DeepEqual(cs.expectErr.Error(), err.Error()) { + t.Logf("%v ---> %v", cs.expectErr, err) + t.Errorf("Failed to extractPort, err: %v", err) + } + if get != cs.expectPortInt { + t.Errorf("expect: %v, but: %v", cs.expectPortInt, get) + } + }) + } +} + +func TestGetPodContainerByName(t *testing.T) { + cases := []struct { + name string + cName string + pod *v1.Pod + expectContainer *v1.Container + }{ + { + name: "find container by name in pod", + cName: "main", + pod: &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pod1", + Namespace: "sp1", + }, + Spec: v1.PodSpec{ + Containers: []v1.Container{ + { + Name: "main", + Image: "nginx:1.5", + Ports: []v1.ContainerPort{ + { + Name: "main-port", + ContainerPort: 9090, + }, + }, + }, + { + Name: "sidecar1", + Image: "sidecar1-image:1.5", + }, + }, + }, + }, + expectContainer: &v1.Container{ + Name: "main", + Image: "nginx:1.5", + Ports: []v1.ContainerPort{ + { + Name: "main-port", + ContainerPort: 9090, + }, + }, + }, + }, + + { + name: "no find container by name in pod", + cName: "main-using", + pod: &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pod1", + Namespace: "sp1", + }, + Spec: v1.PodSpec{ + Containers: []v1.Container{ + { + Name: "main", + Image: "nginx:1.5", + Ports: []v1.ContainerPort{ + { + Name: "main-port", + ContainerPort: 9090, + }, + }, + }, + { + Name: "sidecar1", + Image: "sidecar1-image:1.5", + }, + }, + }, + }, + expectContainer: nil, + }, + } + + for _, cs := range cases { + t.Run(cs.name, func(t *testing.T) { + get := GetPodContainerByName(cs.cName, cs.pod) + if !reflect.DeepEqual(DumpJSON(get), DumpJSON(cs.expectContainer)) { + t.Errorf("expect: %v, but: %v", DumpJSON(cs.expectContainer), DumpJSON(get)) + } + }) + } +} + +func TestFindPortByName(t *testing.T) { + cases := []struct { + name string + container v1.Container + portName string + expectErr error + expectPort int + }{ + { + name: "get port by port name", + container: v1.Container{ + Name: "main", + Ports: []v1.ContainerPort{ + { + Name: "main-port", + ContainerPort: 80, + }, + }, + }, + portName: "main-port", + expectPort: 80, + }, + { + name: "no find port by port name(container port is nil)", + container: v1.Container{ + Name: "main", + Ports: []v1.ContainerPort{ + { + Name: "main-port", + }, + }, + }, + portName: "main-port", + expectPort: 0, + }, + { + name: "no found port by port name", + container: v1.Container{ + Name: "main", + Ports: []v1.ContainerPort{ + { + Name: "main-port", + }, + }, + }, + portName: "main-port-fake", + expectErr: fmt.Errorf("port main-port-fake not found"), + expectPort: 0, + }, + } + + for _, cs := range cases { + t.Run(cs.name, func(t *testing.T) { + get, err := findPortByName(cs.container, cs.portName) + if !reflect.DeepEqual(cs.expectErr, err) { + t.Errorf("Failed to get port by name, err: %v", err) + } + if get != cs.expectPort { + t.Errorf("expect: %v, but: %v", cs.expectPort, get) + } + }) + } +} diff --git a/pkg/webhook/podprobemarker/validating/probe_create_update_handler.go b/pkg/webhook/podprobemarker/validating/probe_create_update_handler.go index b552046871..b07f1406a2 100644 --- a/pkg/webhook/podprobemarker/validating/probe_create_update_handler.go +++ b/pkg/webhook/podprobemarker/validating/probe_create_update_handler.go @@ -24,11 +24,11 @@ import ( "regexp" "strings" + "k8s.io/apimachinery/pkg/util/intstr" + "github.com/openkruise/kruise/pkg/features" utilfeature "github.com/openkruise/kruise/pkg/util/feature" - "github.com/openkruise/kruise/apis/apps/pub" - appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1" admissionv1 "k8s.io/api/admission/v1" corev1 "k8s.io/api/core/v1" genericvalidation "k8s.io/apimachinery/pkg/api/validation" @@ -41,6 +41,9 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/runtime/inject" "sigs.k8s.io/controller-runtime/pkg/webhook/admission" + + "github.com/openkruise/kruise/apis/apps/pub" + appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1" ) const ( @@ -202,13 +205,30 @@ func validateHandler(handler *corev1.ProbeHandler, fldPath *field.Path) field.Er numHandlers := 0 allErrors := field.ErrorList{} if handler.Exec != nil { - numHandlers++ - allErrors = append(allErrors, validateExecAction(handler.Exec, fldPath.Child("exec"))...) + if numHandlers > 0 { + allErrors = append(allErrors, field.Forbidden(fldPath.Child("exec"), "may not specify more than 1 handler type")) + } else { + numHandlers++ + allErrors = append(allErrors, validateExecAction(handler.Exec, fldPath.Child("exec"))...) + } } - if handler.HTTPGet != nil || handler.TCPSocket != nil { - numHandlers++ - allErrors = append(allErrors, field.Forbidden(fldPath.Child("probe"), "current only support exec probe")) + if handler.HTTPGet != nil { + if numHandlers > 0 { + allErrors = append(allErrors, field.Forbidden(fldPath.Child("httpGet"), "may not specify more than 1 handler type")) + } else { + numHandlers++ + allErrors = append(allErrors, field.Forbidden(fldPath.Child("probe"), "current no support http probe")) + } } + if handler.TCPSocket != nil { + if numHandlers > 0 { + allErrors = append(allErrors, field.Forbidden(fldPath.Child("tcpSocket"), "may not specify more than 1 handler type")) + } else { + numHandlers++ + allErrors = append(allErrors, validateTCPSocketAction(handler.TCPSocket, fldPath.Child("tcpSocket"))...) + } + } + if numHandlers == 0 { allErrors = append(allErrors, field.Required(fldPath, "must specify a handler type")) } @@ -223,6 +243,26 @@ func validateExecAction(exec *corev1.ExecAction, fldPath *field.Path) field.Erro return allErrors } +func validateTCPSocketAction(tcp *corev1.TCPSocketAction, fldPath *field.Path) field.ErrorList { + return ValidatePortNumOrName(tcp.Port, fldPath.Child("port")) +} + +func ValidatePortNumOrName(port intstr.IntOrString, fldPath *field.Path) field.ErrorList { + allErrs := field.ErrorList{} + if port.Type == intstr.Int { + for _, msg := range validationutil.IsValidPortNum(port.IntValue()) { + allErrs = append(allErrs, field.Invalid(fldPath, port.IntValue(), msg)) + } + } else if port.Type == intstr.String { + for _, msg := range validationutil.IsValidPortName(port.StrVal) { + allErrs = append(allErrs, field.Invalid(fldPath, port.StrVal, msg)) + } + } else { + allErrs = append(allErrs, field.InternalError(fldPath, fmt.Errorf("unknown type: %v", port.Type))) + } + return allErrs +} + func validateProbeMarkerPolicy(policy *appsv1alpha1.ProbeMarkerPolicy, fldPath *field.Path) field.ErrorList { allErrors := field.ErrorList{} if policy.State != appsv1alpha1.ProbeSucceeded && policy.State != appsv1alpha1.ProbeFailed { diff --git a/pkg/webhook/podprobemarker/validating/probe_validating_test.go b/pkg/webhook/podprobemarker/validating/probe_validating_test.go index d18cdbaf9c..18d0b8014b 100644 --- a/pkg/webhook/podprobemarker/validating/probe_validating_test.go +++ b/pkg/webhook/podprobemarker/validating/probe_validating_test.go @@ -19,14 +19,16 @@ package validating import ( "testing" - utilruntime "k8s.io/apimachinery/pkg/util/runtime" - - appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/util/intstr" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/apimachinery/pkg/util/validation/field" "sigs.k8s.io/controller-runtime/pkg/webhook/admission" + + appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1" + "github.com/openkruise/kruise/pkg/util" ) func init() { @@ -133,8 +135,10 @@ func TestValidatingPodProbeMarker(t *testing.T) { Probe: appsv1alpha1.ContainerProbeSpec{ Probe: corev1.Probe{ ProbeHandler: corev1.ProbeHandler{ - TCPSocket: &corev1.TCPSocketAction{ - Port: intstr.FromInt(80), + HTTPGet: &corev1.HTTPGetAction{ + Path: "/index.html", + Scheme: corev1.URISchemeHTTP, + Port: intstr.FromInt(80), }, }, }, @@ -236,3 +240,217 @@ func TestValidatingPodProbeMarker(t *testing.T) { }) } } + +func TestValidateHandler(t *testing.T) { + successCases := []corev1.ProbeHandler{ + {Exec: &corev1.ExecAction{Command: []string{"echo"}}}, + {TCPSocket: &corev1.TCPSocketAction{ + Port: intstr.IntOrString{Type: intstr.Int, IntVal: int32(8000)}, + Host: "3.3.3.3", + }}, + {TCPSocket: &corev1.TCPSocketAction{ + Port: intstr.IntOrString{Type: intstr.String, StrVal: "container-port"}, + Host: "3.3.3.3", + }}, + } + for _, h := range successCases { + if errs := validateHandler(&h, field.NewPath("field")); len(errs) != 0 { + t.Errorf("expected success: %v", errs) + } + } + + errorCases := []corev1.ProbeHandler{ + {}, + {Exec: &corev1.ExecAction{Command: []string{}}}, + {TCPSocket: &corev1.TCPSocketAction{ + Port: intstr.IntOrString{Type: intstr.String, StrVal: "container-port-v2"}, + Host: "3.3.3.3", + }}, + {TCPSocket: &corev1.TCPSocketAction{ + Port: intstr.IntOrString{Type: intstr.Int, IntVal: -1}, + Host: "3.3.3.3", + }}, + {HTTPGet: &corev1.HTTPGetAction{Path: "", Port: intstr.FromInt(0), Host: ""}}, + {HTTPGet: &corev1.HTTPGetAction{Path: "/foo", Port: intstr.FromInt(65536), Host: "host"}}, + {HTTPGet: &corev1.HTTPGetAction{Path: "", Port: intstr.FromString(""), Host: ""}}, + { + Exec: &corev1.ExecAction{Command: []string{}}, + TCPSocket: &corev1.TCPSocketAction{ + Port: intstr.IntOrString{Type: intstr.String, StrVal: "container-port-v2"}, + Host: "3.3.3.3", + }, + }, + { + Exec: &corev1.ExecAction{Command: []string{}}, + HTTPGet: &corev1.HTTPGetAction{Path: "", Port: intstr.FromString(""), Host: ""}, + }, + { + TCPSocket: &corev1.TCPSocketAction{ + Port: intstr.IntOrString{Type: intstr.String, StrVal: "container-port-v2"}, + Host: "3.3.3.3", + }, + HTTPGet: &corev1.HTTPGetAction{Path: "", Port: intstr.FromString(""), Host: ""}, + }, + { + Exec: &corev1.ExecAction{Command: []string{}}, + TCPSocket: &corev1.TCPSocketAction{ + Port: intstr.IntOrString{Type: intstr.String, StrVal: "container-port-v2"}, + Host: "3.3.3.3", + }, + HTTPGet: &corev1.HTTPGetAction{Path: "", Port: intstr.FromString(""), Host: ""}, + }, + } + for _, h := range errorCases { + if errs := validateHandler(&h, field.NewPath("field")); len(errs) == 0 { + t.Errorf("expected failure for %#v", h) + } + } +} + +func TestValidatePortNumOrName(t *testing.T) { + successCases := []struct { + port intstr.IntOrString + fldPath *field.Path + }{ + { + port: intstr.IntOrString{ + Type: intstr.Int, + IntVal: 80, + }, + fldPath: field.NewPath("field"), + }, + { + port: intstr.IntOrString{ + Type: intstr.String, + StrVal: "abc", + }, + fldPath: field.NewPath("field"), + }, + } + for _, cs := range successCases { + if getErrs := ValidatePortNumOrName(cs.port, cs.fldPath); len(getErrs) != 0 { + t.Errorf("expect failure for %#v", util.DumpJSON(cs)) + } + } + + errorCases := []struct { + port intstr.IntOrString + fldPath *field.Path + }{ + { + port: intstr.IntOrString{ + Type: intstr.Int, + IntVal: -1, + }, + fldPath: field.NewPath("field"), + }, + { + port: intstr.IntOrString{ + Type: intstr.String, + StrVal: "", + }, + fldPath: field.NewPath("field"), + }, + { + // IsValidPortName check that the argument is valid syntax. It must be + // non-empty and no more than 15 characters long. It may contain only [-a-z0-9] + // and must contain at least one letter [a-z]. + port: intstr.IntOrString{ + Type: intstr.String, + StrVal: "aaaaabbbbbcccccd", // more than 15 characters + }, + fldPath: field.NewPath("field"), + }, + { + port: intstr.IntOrString{ + Type: 3, // fake type + }, + fldPath: field.NewPath("field"), + }, + } + for _, cs := range errorCases { + if getErrs := ValidatePortNumOrName(cs.port, cs.fldPath); len(getErrs) == 0 { + t.Errorf("expect failure for %#v", util.DumpJSON(cs)) + } + } +} + +func TestValidateTCPSocketAction(t *testing.T) { + successCases := []struct { + tcp *corev1.TCPSocketAction + fldPath *field.Path + }{ + { + tcp: &corev1.TCPSocketAction{ + Port: intstr.IntOrString{ + Type: intstr.Int, + IntVal: 80, + }, + }, + fldPath: field.NewPath("field"), + }, + { + tcp: &corev1.TCPSocketAction{ + Port: intstr.IntOrString{ + Type: intstr.String, + StrVal: "abc", + }, + }, + fldPath: field.NewPath("field"), + }, + } + for _, cs := range successCases { + if getErrs := validateTCPSocketAction(cs.tcp, cs.fldPath); len(getErrs) != 0 { + t.Errorf("expect failure for %#v", util.DumpJSON(cs)) + } + } + + errorCases := []struct { + tcp *corev1.TCPSocketAction + fldPath *field.Path + }{ + { + tcp: &corev1.TCPSocketAction{ + Port: intstr.IntOrString{ + Type: intstr.Int, + IntVal: -1, + }, + }, + fldPath: field.NewPath("field"), + }, + { + tcp: &corev1.TCPSocketAction{ + Port: intstr.IntOrString{ + Type: intstr.String, + StrVal: "", + }, + }, + fldPath: field.NewPath("field"), + }, + { + tcp: &corev1.TCPSocketAction{ + // IsValidPortName check that the argument is valid syntax. It must be + // non-empty and no more than 15 characters long. It may contain only [-a-z0-9] + // and must contain at least one letter [a-z]. + Port: intstr.IntOrString{ + Type: intstr.String, + StrVal: "aaaaabbbbbcccccd", // more than 15 characters + }, + }, + fldPath: field.NewPath("field"), + }, + { + tcp: &corev1.TCPSocketAction{ + Port: intstr.IntOrString{ + Type: 3, // fake type + }, + }, + fldPath: field.NewPath("field"), + }, + } + for _, cs := range errorCases { + if getErrs := validateTCPSocketAction(cs.tcp, cs.fldPath); len(getErrs) == 0 { + t.Errorf("expect failure for %#v", util.DumpJSON(cs)) + } + } +} diff --git a/test/e2e/apps/podprobemarker.go b/test/e2e/apps/podprobemarker.go index ba60eb2a46..2ab9ee7073 100644 --- a/test/e2e/apps/podprobemarker.go +++ b/test/e2e/apps/podprobemarker.go @@ -23,18 +23,20 @@ import ( "github.com/onsi/ginkgo" "github.com/onsi/gomega" - appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1" - kruiseclientset "github.com/openkruise/kruise/pkg/client/clientset/versioned" - "github.com/openkruise/kruise/pkg/controller/podprobemarker" - "github.com/openkruise/kruise/pkg/util" - "github.com/openkruise/kruise/test/e2e/framework" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/apimachinery/pkg/util/rand" "k8s.io/apimachinery/pkg/util/sets" clientset "k8s.io/client-go/kubernetes" utilpointer "k8s.io/utils/pointer" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" + + appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1" + kruiseclientset "github.com/openkruise/kruise/pkg/client/clientset/versioned" + "github.com/openkruise/kruise/pkg/controller/podprobemarker" + "github.com/openkruise/kruise/pkg/util" + "github.com/openkruise/kruise/test/e2e/framework" ) var _ = SIGDescribe("PodProbeMarker", func() { @@ -53,7 +55,7 @@ var _ = SIGDescribe("PodProbeMarker", func() { randStr = rand.String(10) }) - framework.KruiseDescribe("PodProbeMarker functionality", func() { + framework.KruiseDescribe("PodProbeMarker with exec functionality", func() { ginkgo.AfterEach(func() { if ginkgo.CurrentGinkgoTestDescription().Failed { @@ -61,7 +63,7 @@ var _ = SIGDescribe("PodProbeMarker", func() { } }) - ginkgo.It("pod probe marker test1", func() { + ginkgo.It("pod probe marker exec test", func() { nodeList, err := c.CoreV1().Nodes().List(context.TODO(), metav1.ListOptions{}) gomega.Expect(err).NotTo(gomega.HaveOccurred()) nodeLen := len(nodeList.Items) @@ -274,4 +276,182 @@ var _ = SIGDescribe("PodProbeMarker", func() { } }) }) + + framework.KruiseDescribe("PodProbeMarker with tcpCheck functionality", func() { + ginkgo.AfterEach(func() { + if ginkgo.CurrentGinkgoTestDescription().Failed { + framework.DumpDebugInfo(c, ns) + } + }) + + ginkgo.It("pod probe marker tcpCheck test", func() { + nodeList, err := c.CoreV1().Nodes().List(context.TODO(), metav1.ListOptions{}) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + nodeLen := len(nodeList.Items) + if nodeLen == 0 { + ginkgo.By("pod probe markers list nodeList is zero") + return + } + nppList, err := kc.AppsV1alpha1().NodePodProbes().List(context.TODO(), metav1.ListOptions{}) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + gomega.Expect(nppList.Items).To(gomega.HaveLen(nodeLen)) + + // create statefulset + sts := tester.NewBaseStatefulSet(ns, randStr) + // For heterogeneous scenario like edge cluster, I want to deploy a Pod for each Node to verify that the functionality works + sts.Spec.Template.Spec.TopologySpreadConstraints = []v1.TopologySpreadConstraint{ + { + LabelSelector: sts.Spec.Selector, + MaxSkew: 1, + TopologyKey: "kubernetes.io/hostname", + WhenUnsatisfiable: v1.ScheduleAnyway, + }, + } + sts.Spec.Replicas = utilpointer.Int32Ptr(int32(nodeLen)) + ginkgo.By(fmt.Sprintf("Create statefulset(%s/%s)", sts.Namespace, sts.Name)) + tester.CreateStatefulSet(sts) + + // create pod probe marker + ppmList := tester.NewPodProbeMarkerForTcpCheck(ns, randStr) + ppm1 := &ppmList[0] + _, err = kc.AppsV1alpha1().PodProbeMarkers(ns).Create(context.TODO(), ppm1, metav1.CreateOptions{}) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + time.Sleep(time.Second * 10) + + // check finalizer + ppm1, err = kc.AppsV1alpha1().PodProbeMarkers(ns).Get(context.TODO(), ppm1.Name, metav1.GetOptions{}) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + gomega.Expect(controllerutil.ContainsFinalizer(ppm1, podprobemarker.PodProbeMarkerFinalizer)).To(gomega.BeTrue()) + + pods, err := tester.ListActivePods(ns) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + gomega.Expect(pods).To(gomega.HaveLen(int(*sts.Spec.Replicas))) + validPods := sets.NewString() + for _, pod := range pods { + validPods.Insert(string(pod.UID)) + npp, err := kc.AppsV1alpha1().NodePodProbes().Get(context.TODO(), pod.Spec.NodeName, metav1.GetOptions{}) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + var podProbe *appsv1alpha1.PodProbe + for i := range npp.Spec.PodProbes { + obj := &npp.Spec.PodProbes[i] + if obj.UID == string(pod.UID) { + podProbe = obj + break + } + } + gomega.Expect(podProbe).NotTo(gomega.BeNil()) + gomega.Expect(pod.Labels["nginx"]).To(gomega.Equal("healthy")) + condition := util.GetCondition(pod, "game.kruise.io/healthy") + gomega.Expect(condition).NotTo(gomega.BeNil()) + gomega.Expect(string(condition.Status)).To(gomega.Equal(string(v1.ConditionTrue))) + condition = util.GetCondition(pod, "game.kruise.io/check") + gomega.Expect(condition).To(gomega.BeNil()) + } + nppList, err = kc.AppsV1alpha1().NodePodProbes().List(context.TODO(), metav1.ListOptions{}) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + for _, npp := range nppList.Items { + for _, podProbe := range npp.Spec.PodProbes { + gomega.Expect(validPods.Has(podProbe.UID)).To(gomega.BeTrue()) + } + } + + // update failed probe, tcp port check from 80 ---> 8081 + ppm1, err = kc.AppsV1alpha1().PodProbeMarkers(ns).Get(context.TODO(), ppm1.Name, metav1.GetOptions{}) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + ppm1.Spec.Probes[0].Probe.TCPSocket = &v1.TCPSocketAction{ + Port: intstr.IntOrString{Type: intstr.Int, IntVal: int32(8081)}, + } + _, err = kc.AppsV1alpha1().PodProbeMarkers(ns).Update(context.TODO(), ppm1, metav1.UpdateOptions{}) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + time.Sleep(time.Second * 60) + pods, err = tester.ListActivePods(ns) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + gomega.Expect(pods).To(gomega.HaveLen(int(*sts.Spec.Replicas))) + for _, pod := range pods { + // healthy probe + gomega.Expect(pod.Labels["nginx"]).To(gomega.Equal("")) + condition := util.GetCondition(pod, "game.kruise.io/healthy") + gomega.Expect(condition).NotTo(gomega.BeNil()) + gomega.Expect(string(condition.Status)).To(gomega.Equal(string(v1.ConditionFalse))) + } + + // update success probe + ppm1, err = kc.AppsV1alpha1().PodProbeMarkers(ns).Get(context.TODO(), ppm1.Name, metav1.GetOptions{}) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + ppm1.Spec.Probes[0].Probe.TCPSocket = &v1.TCPSocketAction{ + Port: intstr.IntOrString{Type: intstr.Int, IntVal: int32(80)}, + } + _, err = kc.AppsV1alpha1().PodProbeMarkers(ns).Update(context.TODO(), ppm1, metav1.UpdateOptions{}) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + + // scale down + sts, err = kc.AppsV1beta1().StatefulSets(ns).Get(context.TODO(), sts.Name, metav1.GetOptions{}) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + sts.Spec.Replicas = utilpointer.Int32Ptr(1) + _, err = kc.AppsV1beta1().StatefulSets(ns).Update(context.TODO(), sts, metav1.UpdateOptions{}) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + time.Sleep(time.Second * 60) + + pods, err = tester.ListActivePods(ns) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + gomega.Expect(len(pods)).To(gomega.Equal(1)) + validPods = sets.NewString() + for _, pod := range pods { + validPods.Insert(string(pod.UID)) + // healthy probe + gomega.Expect(pod.Labels["nginx"]).To(gomega.Equal("healthy")) + condition := util.GetCondition(pod, "game.kruise.io/healthy") + gomega.Expect(condition).NotTo(gomega.BeNil()) + gomega.Expect(string(condition.Status)).To(gomega.Equal(string(v1.ConditionTrue))) + } + nppList, err = kc.AppsV1alpha1().NodePodProbes().List(context.TODO(), metav1.ListOptions{}) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + for _, npp := range nppList.Items { + for _, podProbe := range npp.Spec.PodProbes { + gomega.Expect(validPods.Has(podProbe.UID)).To(gomega.BeTrue()) + } + } + + // scale up + sts, err = kc.AppsV1beta1().StatefulSets(ns).Get(context.TODO(), sts.Name, metav1.GetOptions{}) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + sts.Spec.Replicas = utilpointer.Int32Ptr(int32(nodeLen)) + _, err = kc.AppsV1beta1().StatefulSets(ns).Update(context.TODO(), sts, metav1.UpdateOptions{}) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + tester.WaitForStatefulSetRunning(sts) + time.Sleep(time.Second * 100) + + pods, err = tester.ListActivePods(ns) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + gomega.Expect(len(pods)).To(gomega.Equal(nodeLen)) + validPods = sets.NewString() + for _, pod := range pods { + validPods.Insert(string(pod.UID)) + // healthy probe + gomega.Expect(pod.Labels["nginx"]).To(gomega.Equal("healthy")) + condition := util.GetCondition(pod, "game.kruise.io/healthy") + gomega.Expect(condition).NotTo(gomega.BeNil()) + gomega.Expect(string(condition.Status)).To(gomega.Equal(string(v1.ConditionTrue))) + } + nppList, err = kc.AppsV1alpha1().NodePodProbes().List(context.TODO(), metav1.ListOptions{}) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + for _, npp := range nppList.Items { + for _, podProbe := range npp.Spec.PodProbes { + gomega.Expect(validPods.Has(podProbe.UID)).To(gomega.BeTrue()) + } + } + + // delete podProbeMarker + for _, ppm := range ppmList { + err = kc.AppsV1alpha1().PodProbeMarkers(ns).Delete(context.TODO(), ppm.Name, metav1.DeleteOptions{}) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + } + time.Sleep(time.Second * 3) + nppList, err = kc.AppsV1alpha1().NodePodProbes().List(context.TODO(), metav1.ListOptions{}) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + for _, npp := range nppList.Items { + gomega.Expect(npp.Spec.PodProbes).To(gomega.HaveLen(0)) + } + }) + }) }) diff --git a/test/e2e/framework/pod_probe_marker_util.go b/test/e2e/framework/pod_probe_marker_util.go index 37e5a5c8c6..80739b712b 100644 --- a/test/e2e/framework/pod_probe_marker_util.go +++ b/test/e2e/framework/pod_probe_marker_util.go @@ -22,16 +22,18 @@ import ( "time" "github.com/onsi/gomega" - appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1" - appsv1beta1 "github.com/openkruise/kruise/apis/apps/v1beta1" - kruiseclientset "github.com/openkruise/kruise/pkg/client/clientset/versioned" apps "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/apimachinery/pkg/util/wait" clientset "k8s.io/client-go/kubernetes" kubecontroller "k8s.io/kubernetes/pkg/controller" utilpointer "k8s.io/utils/pointer" + + appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1" + appsv1beta1 "github.com/openkruise/kruise/apis/apps/v1beta1" + kruiseclientset "github.com/openkruise/kruise/pkg/client/clientset/versioned" ) type PodProbeMarkerTester struct { @@ -132,6 +134,48 @@ func (s *PodProbeMarkerTester) NewPodProbeMarker(ns, randStr string) []appsv1alp return []appsv1alpha1.PodProbeMarker{nginx, main} } +func (s *PodProbeMarkerTester) NewPodProbeMarkerForTcpCheck(ns, randStr string) []appsv1alpha1.PodProbeMarker { + nginx := appsv1alpha1.PodProbeMarker{ + ObjectMeta: metav1.ObjectMeta{ + Name: "ppm-nginx", + Namespace: ns, + }, + Spec: appsv1alpha1.PodProbeMarkerSpec{ + Selector: &metav1.LabelSelector{ + MatchLabels: map[string]string{ + "app": fmt.Sprintf("probe-%s", randStr), + }, + }, + Probes: []appsv1alpha1.PodContainerProbe{ + { + Name: "healthy", + ContainerName: "nginx", + Probe: appsv1alpha1.ContainerProbeSpec{ + Probe: corev1.Probe{ + ProbeHandler: corev1.ProbeHandler{ + TCPSocket: &corev1.TCPSocketAction{ + Port: intstr.IntOrString{Type: intstr.Int, IntVal: int32(80)}, + }, + }, + }, + }, + PodConditionType: "game.kruise.io/healthy", + MarkerPolicy: []appsv1alpha1.ProbeMarkerPolicy{ + { + State: appsv1alpha1.ProbeSucceeded, + Labels: map[string]string{ + "nginx": "healthy", + }, + }, + }, + }, + }, + }, + } + + return []appsv1alpha1.PodProbeMarker{nginx} +} + func (s *PodProbeMarkerTester) NewBaseStatefulSet(namespace, randStr string) *appsv1beta1.StatefulSet { return &appsv1beta1.StatefulSet{ TypeMeta: metav1.TypeMeta{ diff --git a/vendor/k8s.io/kubernetes/pkg/probe/tcp/tcp.go b/vendor/k8s.io/kubernetes/pkg/probe/tcp/tcp.go new file mode 100644 index 0000000000..7ce1450470 --- /dev/null +++ b/vendor/k8s.io/kubernetes/pkg/probe/tcp/tcp.go @@ -0,0 +1,63 @@ +/* +Copyright 2015 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package tcp + +import ( + "net" + "strconv" + "time" + + "k8s.io/kubernetes/pkg/probe" + + "k8s.io/klog/v2" +) + +// New creates Prober. +func New() Prober { + return tcpProber{} +} + +// Prober is an interface that defines the Probe function for doing TCP readiness/liveness checks. +type Prober interface { + Probe(host string, port int, timeout time.Duration) (probe.Result, string, error) +} + +type tcpProber struct{} + +// Probe checks that a TCP connection to the address can be opened. +func (pr tcpProber) Probe(host string, port int, timeout time.Duration) (probe.Result, string, error) { + return DoTCPProbe(net.JoinHostPort(host, strconv.Itoa(port)), timeout) +} + +// DoTCPProbe checks that a TCP socket to the address can be opened. +// If the socket can be opened, it returns Success +// If the socket fails to open, it returns Failure. +// This is exported because some other packages may want to do direct TCP probes. +func DoTCPProbe(addr string, timeout time.Duration) (probe.Result, string, error) { + d := probe.ProbeDialer() + d.Timeout = timeout + conn, err := d.Dial("tcp", addr) + if err != nil { + // Convert errors to failures to handle timeouts. + return probe.Failure, err.Error(), nil + } + err = conn.Close() + if err != nil { + klog.Errorf("Unexpected error closing TCP probe socket: %v (%#v)", err, err) + } + return probe.Success, "", nil +} diff --git a/vendor/modules.txt b/vendor/modules.txt index fb5d78e7c6..56614bc91c 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -1369,6 +1369,7 @@ k8s.io/kubernetes/pkg/kubelet/util/format k8s.io/kubernetes/pkg/kubelet/util/ioutils k8s.io/kubernetes/pkg/probe k8s.io/kubernetes/pkg/probe/exec +k8s.io/kubernetes/pkg/probe/tcp k8s.io/kubernetes/pkg/proxy/util k8s.io/kubernetes/pkg/scheduler k8s.io/kubernetes/pkg/scheduler/apis/config