diff --git a/e2e/basic/headless_service_test.go b/e2e/basic/headless_service_test.go new file mode 100644 index 0000000..1890663 --- /dev/null +++ b/e2e/basic/headless_service_test.go @@ -0,0 +1,66 @@ +package basic + +import ( + "context" + "fmt" + "strconv" + "time" +) + +const gopingImage = "ghcr.io/celestiaorg/goping:4803195" + +func (s *Suite) TestHeadlessService() { + const ( + namePrefix = "headless-srv-test" + numOfPingPackets = 100 + numOfTests = 10 + packetTimeout = 1 * time.Second + gopingPort = 8001 + ) + ctx := context.Background() + + mother, err := s.Knuu.NewInstance(namePrefix + "mother") + s.Require().NoError(err) + + err = mother.Build().SetImage(ctx, gopingImage) + s.Require().NoError(err) + + s.Require().NoError(mother.Network().AddPortTCP(gopingPort)) + s.Require().NoError(mother.Build().Commit(ctx)) + + err = mother.Build().SetEnvironmentVariable("SERVE_ADDR", fmt.Sprintf("0.0.0.0:%d", gopingPort)) + s.Require().NoError(err) + + target, err := mother.CloneWithName(namePrefix + "target") + s.Require().NoError(err) + + executor, err := mother.CloneWithName(namePrefix + "executor") + s.Require().NoError(err) + + // Prepare ping executor & target + s.Require().NoError(target.Execution().Start(ctx)) + s.Require().NoError(executor.Execution().Start(ctx)) + + targetEndpoint := fmt.Sprintf("%s:%d", target.Network().HostName(), gopingPort) + s.T().Logf("targetEndpoint: %v", targetEndpoint) + + s.T().Log("Starting ping test. It takes a while.") + for i := 0; i < numOfTests; i++ { + startTime := time.Now() + + output, err := executor.Execution().ExecuteCommand(ctx, "goping", "ping", "-q", + "-c", fmt.Sprint(numOfPingPackets), + "-t", packetTimeout.String(), + "-m", "packetloss", + targetEndpoint) + s.Require().NoError(err) + + elapsed := time.Since(startTime) + s.T().Logf("i: %d, test took %.2f seconds, output: `%s`", i, elapsed.Seconds(), output) + + gotPacketloss, err := strconv.ParseFloat(output, 64) + s.Require().NoErrorf(err, "failed to parse output: `%s`", output) + + s.Assert().Zero(gotPacketloss) + } +} diff --git a/e2e/basic/probe_test.go b/e2e/basic/probe_test.go index a7d0807..7e9641d 100644 --- a/e2e/basic/probe_test.go +++ b/e2e/basic/probe_test.go @@ -34,13 +34,11 @@ func (s *Suite) TestProbe() { } s.Require().NoError(web.Monitoring().SetLivenessProbe(&livenessProbe)) s.Require().NoError(web.Build().Commit(ctx)) + s.Require().NoError(web.Execution().Start(ctx)) - // Test logic webIP, err := web.Network().GetIP(ctx) s.Require().NoError(err) - s.Require().NoError(web.Execution().Start(ctx)) - wgetOutput, err := executor.Execution().ExecuteCommand(ctx, "wget", "-q", "-O", "-", webIP) s.Require().NoError(err) diff --git a/e2e/netshaper/netshaper_test.go b/e2e/netshaper/netshaper_test.go index cfef130..f942c71 100644 --- a/e2e/netshaper/netshaper_test.go +++ b/e2e/netshaper/netshaper_test.go @@ -51,9 +51,6 @@ func (s *Suite) TestNetShaperBandwidth() { s.Require().NoError(iperfClient.Execution().Start(ctx)) - iperfServerIP, err := iperfServer.Network().GetIP(ctx) - s.Require().NoError(err) - // Perform the test type testCase struct { name string @@ -75,6 +72,9 @@ func (s *Suite) TestNetShaperBandwidth() { } } + iperfServerIP, err := iperfServer.Network().GetIP(ctx) + s.Require().NoError(err) + for _, tc := range tt { tc := tc s.Run(tc.name, func() { @@ -267,6 +267,7 @@ func (s *Suite) TestNetShaperLatency() { targetIP, err := target.Network().GetIP(ctx) s.Require().NoError(err) + targetAddress := fmt.Sprintf("%s:%d", targetIP, gopingPort) for _, tc := range tt { tc := tc @@ -279,7 +280,6 @@ func (s *Suite) TestNetShaperLatency() { s.T().Log("Starting latency test. It takes a while.") startTime := time.Now() - targetAddress := fmt.Sprintf("%s:%d", targetIP, gopingPort) output, err := executor.Execution().ExecuteCommand(ctx, "goping", "ping", "-q", "-c", fmt.Sprint(numOfPingPackets), @@ -361,6 +361,7 @@ func (s *Suite) TestNetShaperJitter() { targetIP, err := target.Network().GetIP(ctx) s.Require().NoError(err) + targetAddress := fmt.Sprintf("%s:%d", targetIP, gopingPort) for _, tc := range tt { tc := tc @@ -373,7 +374,6 @@ func (s *Suite) TestNetShaperJitter() { s.T().Log("Starting jitter test. It takes a while.") startTime := time.Now() - targetAddress := fmt.Sprintf("%s:%d", targetIP, gopingPort) output, err := executor.Execution().ExecuteCommand(ctx, "goping", "ping", "-q", "-c", fmt.Sprint(numOfPingPackets), diff --git a/e2e/system/env_to_json_test.go b/e2e/system/env_to_json_test.go index 54393d5..9c32a5a 100644 --- a/e2e/system/env_to_json_test.go +++ b/e2e/system/env_to_json_test.go @@ -62,10 +62,7 @@ func (s *Suite) TestEnvToJSON() { // Test logic for _, i := range instances { - webIP, err := i.Network().GetIP(ctx) - s.Require().NoError(err) - - wget, err := executor.Execution().ExecuteCommand(ctx, "wget", "-q", "-O", "-", webIP) + wget, err := executor.Execution().ExecuteCommand(ctx, "wget", "-q", "-O", "-", i.Network().HostName()) s.Require().NoError(err) expectedBytes, err := json.Marshal(envVars) diff --git a/e2e/system/external_file_test.go b/e2e/system/external_file_test.go index 3c2c2da..f1129bd 100644 --- a/e2e/system/external_file_test.go +++ b/e2e/system/external_file_test.go @@ -41,14 +41,9 @@ func (s *Suite) TestExternalFile() { s.Require().NoError(err) s.Require().NoError(server.Build().Commit(ctx)) - - // Test logic - serverIP, err := server.Network().GetIP(ctx) - s.Require().NoError(err) - s.Require().NoError(server.Execution().Start(ctx)) - wget, err := executor.Execution().ExecuteCommand(ctx, "wget", "-q", "-O", "-", serverIP) + wget, err := executor.Execution().ExecuteCommand(ctx, "wget", "-q", "-O", "-", server.Network().HostName()) s.Require().NoError(err) s.Assert().Contains(wget, "Hello World!") diff --git a/e2e/system/file_test.go b/e2e/system/file_test.go index dcb20b5..2b1ec4b 100644 --- a/e2e/system/file_test.go +++ b/e2e/system/file_test.go @@ -41,15 +41,6 @@ func (s *Suite) TestFile() { s.Require().NoError(err, "Error committing changes") // Test logic - s.T().Log("Getting server IP") - var serverfileIP string - err = s.RetryOperation(func() error { - var err error - serverfileIP, err = serverfile.Network().GetIP(ctx) - return err - }, maxRetries) - s.Require().NoError(err, "Error getting server IP") - s.T().Log("Starting server") err = s.RetryOperation(func() error { return serverfile.Execution().Start(ctx) @@ -60,7 +51,7 @@ func (s *Suite) TestFile() { var wget string err = s.RetryOperation(func() error { var err error - wget, err = executor.Execution().ExecuteCommand(ctx, "wget", "-q", "-O", "-", serverfileIP) + wget, err = executor.Execution().ExecuteCommand(ctx, "wget", "-q", "-O", "-", serverfile.Network().HostName()) return err }, maxRetries) s.Require().NoError(err, "Error executing wget command") diff --git a/e2e/system/file_test_image_cached_test.go b/e2e/system/file_test_image_cached_test.go index a1a6a77..f727576 100644 --- a/e2e/system/file_test_image_cached_test.go +++ b/e2e/system/file_test_image_cached_test.go @@ -63,16 +63,11 @@ func (s *Suite) TestFileCached() { for _, i := range instances { err := s.RetryOperation(func() error { - webIP, err := i.Network().GetIP(ctx) - if err != nil { - return fmt.Errorf("getting IP: %w", err) - } - if err := i.Execution().WaitInstanceIsRunning(ctx); err != nil { return fmt.Errorf("waiting for instance to run: %w", err) } - wget, err := executor.Execution().ExecuteCommand(ctx, "wget", "-q", "-O", "-", webIP) + wget, err := executor.Execution().ExecuteCommand(ctx, "wget", "-q", "-O", "-", i.Network().HostName()) if err != nil { return fmt.Errorf("executing wget: %w", err) } diff --git a/e2e/system/files_to_volumes_cm_test.go b/e2e/system/files_to_volumes_cm_test.go index ee817f1..7a2377b 100644 --- a/e2e/system/files_to_volumes_cm_test.go +++ b/e2e/system/files_to_volumes_cm_test.go @@ -25,16 +25,10 @@ func (s *Suite) TestNoVolumesNoFiles() { target := s.CreateNginxInstance(ctx, namePrefix+"-target") s.Require().NoError(target.Build().Commit(ctx)) + s.Require().NoError(target.Execution().Start(ctx)) - // Test logic - s.Require().NoError(target.Execution().StartAsync(ctx)) - - webIP, err := target.Network().GetIP(ctx) - s.Require().NoError(err) - - s.Require().NoError(target.Execution().WaitInstanceIsRunning(ctx)) - - wget, err := executor.Execution().ExecuteCommand(ctx, "wget", "-q", "-O", "-", webIP) + wget, err := executor.Execution(). + ExecuteCommand(ctx, "wget", "-q", "-O", "-", target.Network().HostName()) s.Require().NoError(err) s.Assert().Contains(wget, "Welcome to nginx!") @@ -59,13 +53,11 @@ func (s *Suite) TestOneVolumeNoFiles() { s.Require().NoError(target.Build().Commit(ctx)) // Test logic - s.Require().NoError(target.Execution().StartAsync(ctx)) + s.Require().NoError(target.Execution().Start(ctx)) webIP, err := target.Network().GetIP(ctx) s.Require().NoError(err) - s.Require().NoError(target.Execution().WaitInstanceIsRunning(ctx)) - wget, err := executor.Execution().ExecuteCommand(ctx, "wget", "-q", "-O", "-", webIP) s.Require().NoError(err) @@ -114,13 +106,10 @@ func (s *Suite) TestNoVolumesOneFile() { } for _, i := range instances { - webIP, err := i.Network().GetIP(ctx) - s.Require().NoError(err) - err = i.Execution().WaitInstanceIsRunning(ctx) s.Require().NoError(err) - wget, err := executor.Execution().ExecuteCommand(ctx, "wget", "-q", "-O", "-", webIP) + wget, err := executor.Execution().ExecuteCommand(ctx, "wget", "-q", "-O", "-", i.Network().HostName()) s.Require().NoError(err) wget = strings.TrimSpace(wget) @@ -167,11 +156,9 @@ func (s *Suite) TestOneVolumeOneFile() { } for _, i := range instances { - webIP, err := i.Network().GetIP(ctx) - s.Require().NoError(err) s.Require().NoError(i.Execution().WaitInstanceIsRunning(ctx)) - wget, err := executor.Execution().ExecuteCommand(ctx, "wget", "-q", "-O", "-", webIP) + wget, err := executor.Execution().ExecuteCommand(ctx, "wget", "-q", "-O", "-", i.Network().HostName()) s.Require().NoError(err) wget = strings.TrimSpace(wget) @@ -237,17 +224,15 @@ func (s *Suite) TestOneVolumeTwoFiles() { } for _, i := range instances { - webIP, err := i.Network().GetIP(ctx) - s.Require().NoError(err) s.Require().NoError(i.Execution().WaitInstanceIsRunning(ctx)) - wgetIndex, err := executor.Execution().ExecuteCommand(ctx, "wget", "-q", "-O", "-", webIP) + wgetIndex, err := executor.Execution().ExecuteCommand(ctx, "wget", "-q", "-O", "-", i.Network().HostName()) s.Require().NoError(err) wgetIndex = strings.TrimSpace(wgetIndex) s.Assert().Equal("hello from 1", wgetIndex) - webIP2 := webIP + "/index-2.html" - wgetIndex2, err := executor.Execution().ExecuteCommand(ctx, "wget", "-q", "-O", "-", webIP2) + webHost2 := i.Network().HostName() + "/index-2.html" + wgetIndex2, err := executor.Execution().ExecuteCommand(ctx, "wget", "-q", "-O", "-", webHost2) s.Require().NoError(err) wgetIndex2 = strings.TrimSpace(wgetIndex2) s.Assert().Equal("hello from 2", wgetIndex2) diff --git a/e2e/system/folder_test.go b/e2e/system/folder_test.go index 23badf2..677d843 100644 --- a/e2e/system/folder_test.go +++ b/e2e/system/folder_test.go @@ -21,13 +21,12 @@ func (s *Suite) TestFolder() { require.NoError(s.T(), err) require.NoError(s.T(), web.Build().Commit(ctx)) + s.Require().NoError(web.Execution().Start(ctx)) // Test logic webIP, err := web.Network().GetIP(ctx) s.Require().NoError(err) - s.Require().NoError(web.Execution().Start(ctx)) - wget, err := executor.Execution().ExecuteCommand(ctx, "wget", "-q", "-O", "-", webIP) s.Require().NoError(err) diff --git a/e2e/system/folder_test_image_cached_test.go b/e2e/system/folder_test_image_cached_test.go index 7e84ab9..9a305b2 100644 --- a/e2e/system/folder_test_image_cached_test.go +++ b/e2e/system/folder_test_image_cached_test.go @@ -45,12 +45,9 @@ func (s *Suite) TestFolderCached() { } for _, i := range instances { - webIP, err := i.Network().GetIP(ctx) - s.Require().NoError(err) - s.Require().NoError(i.Execution().WaitInstanceIsRunning(ctx)) - wget, err := executor.Execution().ExecuteCommand(ctx, "wget", "-q", "-O", "-", webIP) + wget, err := executor.Execution().ExecuteCommand(ctx, "wget", "-q", "-O", "-", i.Network().HostName()) s.Require().NoError(err) s.Assert().Contains(wget, "Hello World!") diff --git a/pkg/instance/errors.go b/pkg/instance/errors.go index 6c1e42e..a0487ec 100644 --- a/pkg/instance/errors.go +++ b/pkg/instance/errors.go @@ -220,5 +220,8 @@ var ( ErrAddingHostToProxyNotAllowed = errors.New("AddingHostToProxyNotAllowed", "adding host to proxy is only allowed in state 'Started' and 'Preparing'. Current state is '%s'") ErrInstanceNameAlreadyExists = errors.New("InstanceNameAlreadyExists", "instance name '%s' already exists") ErrSettingSidecarName = errors.New("SettingSidecarName", "error setting sidecar name with prefix '%s' for instance '%s'") + ErrGettingServiceEndpointNotAllowed = errors.New("GettingServiceEndpointNotAllowed", "getting service endpoint is only allowed in state 'Started'. Current state is '%s'") ErrCannotCloneInstance = errors.New("CannotCloneInstance", "cannot clone instance '%s' in state '%s'") + ErrGettingIPNotAllowed = errors.New("GettingIPNotAllowed", "getting IP is allowed in state 'Started'. Current state is '%s'") + ErrPodIPNotReady = errors.New("PodIPNotReady", "pod IP is not ready for pod '%s'") ) diff --git a/pkg/instance/network.go b/pkg/instance/network.go index 584a3b4..4e25bc4 100644 --- a/pkg/instance/network.go +++ b/pkg/instance/network.go @@ -119,34 +119,28 @@ func (n *network) AddPortUDP(port int) error { } // GetIP returns the IP of the instance -// This function can only be called in the states 'Preparing' and 'Started' +// This function can only be called in the states 'Started' +// The IP is not persistent and can be changed when the pod is restarted +// If a persistent IP is needed, use HostName() instead func (n *network) GetIP(ctx context.Context) (string, error) { - // Check if i.kubernetesService already has the IP - if n.kubernetesService != nil && n.kubernetesService.Spec.ClusterIP != "" { - return n.kubernetesService.Spec.ClusterIP, nil - } - // If not, proceed with the existing logic to deploy the service and get the IP - svc, err := n.instance.K8sClient.GetService(ctx, n.instance.name) - if err != nil || svc == nil { - // Service does not exist, so we need to deploy it - err := n.deployService(ctx, n.portsTCP, n.portsUDP) - if err != nil { - return "", ErrDeployingServiceForInstance.WithParams(n.instance.name).Wrap(err) - } - svc, err = n.instance.K8sClient.GetService(ctx, n.instance.name) - if err != nil { - return "", ErrGettingServiceForInstance.WithParams(n.instance.name).Wrap(err) - } + if !n.instance.IsInState(StateStarted) { + return "", ErrGettingIPNotAllowed.WithParams(n.instance.state.String()) } - ip := svc.Spec.ClusterIP - if ip == "" { - return "", ErrGettingServiceIP.WithParams(n.instance.name) + pod, err := n.instance.K8sClient.GetFirstPodFromReplicaSet(ctx, n.instance.name) + if err != nil { + return "", ErrGettingPodFromReplicaSet.WithParams(n.instance.name).Wrap(err) } - // Update i.kubernetesService for future reference - n.kubernetesService = svc - return ip, nil + if pod.Status.PodIP == "" { + return "", ErrPodIPNotReady.WithParams(pod.Name) + } + + return pod.Status.PodIP, nil +} + +func (n *network) HostName() string { + return n.instance.K8sClient.ServiceDNS(n.instance.name) } // deployService deploys the service for the instance diff --git a/pkg/k8s/errors.go b/pkg/k8s/errors.go index 6cfa710..59afd23 100644 --- a/pkg/k8s/errors.go +++ b/pkg/k8s/errors.go @@ -137,4 +137,7 @@ var ( ErrListingPods = errors.New("ListingPods", "failed to list pods") ErrGetPodStatus = errors.New("GetPodStatus", "failed to get pod status for pod %s") ErrUpdatingConfigmap = errors.New("UpdatingConfigmap", "failed to update configmap %s") + ErrGettingServiceIP = errors.New("GettingServiceIP", "failed to get service IP for service %s") + ErrGettingServiceNodePort = errors.New("GettingServiceNodePort", "failed to get service node port for service %s") + ErrHeadlessService = errors.New("HeadlessService", "headless service '%s' does not have a cluster IP, use DNS instead") ) diff --git a/pkg/k8s/service.go b/pkg/k8s/service.go index 1e4e4cb..5f593ab 100644 --- a/pkg/k8s/service.go +++ b/pkg/k8s/service.go @@ -121,13 +121,48 @@ func (c *Client) DeleteService(ctx context.Context, name string) error { } func (c *Client) GetServiceIP(ctx context.Context, name string) (string, error) { - svc, err := c.GetService(ctx, name) + srv, err := c.GetService(ctx, name) if err != nil { return "", ErrGettingService.WithParams(name).Wrap(err) } - return svc.Spec.ClusterIP, nil + + if srv.Spec.Type == v1.ServiceTypeLoadBalancer { + // Use the LoadBalancer's external IP + if len(srv.Status.LoadBalancer.Ingress) > 0 { + return srv.Status.LoadBalancer.Ingress[0].IP, nil + } + return "", ErrLoadBalancerIPNotAvailable + } + + if srv.Spec.Type != v1.ServiceTypeNodePort { + // Headless service does not have a cluster IP + if srv.Spec.ClusterIP == v1.ClusterIPNone { + return "", ErrHeadlessService.WithParams(name) + } + return srv.Spec.ClusterIP, nil + } + + // Use the Node IP and NodePort + nodes, err := c.clientset.CoreV1().Nodes().List(ctx, metav1.ListOptions{}) + if err != nil { + return "", ErrGettingNodes.Wrap(err) + } + if len(nodes.Items) == 0 { + return "", ErrNoNodesFound + } + + // Use the first node for simplicity, you might need to handle multiple nodes + var nodeIP string + for _, address := range nodes.Items[0].Status.Addresses { + if address.Type == v1.NodeExternalIP { + nodeIP = address.Address + break + } + } + return nodeIP, nil } +// WaitForService() works only for the services with publicly accessible IP func (c *Client) WaitForService(ctx context.Context, name string) error { retryInterval := time.Duration(0) for { @@ -148,11 +183,13 @@ func (c *Client) WaitForService(ctx context.Context, name string) error { } // Check if service is reachable + // Since this function is called from the client, + // we cannot use headless service as it is not accessible from outside the cluster + // so we use the service IP and port to check connectivity endpoint, err := c.GetServiceEndpoint(ctx, name) if err != nil { return ErrGettingServiceEndpoint.WithParams(name).Wrap(err) } - if err := checkServiceConnectivity(endpoint); err != nil { continue } @@ -162,42 +199,30 @@ func (c *Client) WaitForService(ctx context.Context, name string) error { } } +func (c *Client) ServiceDNS(name string) string { + return fmt.Sprintf("%s.%s.svc.cluster.local", name, c.namespace) +} + func (c *Client) GetServiceEndpoint(ctx context.Context, name string) (string, error) { - srv, err := c.clientset.CoreV1().Services(c.namespace).Get(ctx, name, metav1.GetOptions{}) + ip, err := c.GetServiceIP(ctx, name) if err != nil { - return "", ErrGettingService.WithParams(name).Wrap(err) + return "", ErrGettingServiceIP.WithParams(name).Wrap(err) } - if srv.Spec.Type == v1.ServiceTypeLoadBalancer { - // Use the LoadBalancer's external IP - if len(srv.Status.LoadBalancer.Ingress) > 0 { - return fmt.Sprintf("%s:%d", srv.Status.LoadBalancer.Ingress[0].IP, srv.Spec.Ports[0].Port), nil - } - return "", ErrLoadBalancerIPNotAvailable + port, err := c.ServiceNodePort(ctx, name) + if err != nil { + return "", ErrGettingServiceNodePort.WithParams(name).Wrap(err) } - if srv.Spec.Type == v1.ServiceTypeNodePort { - // Use the Node IP and NodePort - nodes, err := c.clientset.CoreV1().Nodes().List(ctx, metav1.ListOptions{}) - if err != nil { - return "", ErrGettingNodes.Wrap(err) - } - if len(nodes.Items) == 0 { - return "", ErrNoNodesFound - } + return fmt.Sprintf("%s:%d", ip, port), nil +} - // Use the first node for simplicity, you might need to handle multiple nodes - var nodeIP string - for _, address := range nodes.Items[0].Status.Addresses { - if address.Type == v1.NodeExternalIP { - nodeIP = address.Address - break - } - } - return fmt.Sprintf("%s:%d", nodeIP, srv.Spec.Ports[0].NodePort), nil +func (c *Client) ServiceNodePort(ctx context.Context, name string) (int32, error) { + svc, err := c.GetService(ctx, name) + if err != nil { + return 0, ErrGettingService.WithParams(name).Wrap(err) } - - return fmt.Sprintf("%s:%d", srv.Spec.ClusterIP, srv.Spec.Ports[0].Port), nil + return svc.Spec.Ports[0].NodePort, nil } func (c *Client) isServiceReady(ctx context.Context, name string) (bool, error) { @@ -276,9 +301,10 @@ func prepareService( Labels: labels, }, Spec: v1.ServiceSpec{ - Ports: servicePorts, - Selector: selectorMap, - Type: v1.ServiceTypeClusterIP, + Ports: servicePorts, + Selector: selectorMap, + Type: v1.ServiceTypeClusterIP, + ClusterIP: v1.ClusterIPNone, // Headless service }, } return svc, nil diff --git a/pkg/k8s/types.go b/pkg/k8s/types.go index 9e964da..f46d9a3 100644 --- a/pkg/k8s/types.go +++ b/pkg/k8s/types.go @@ -58,6 +58,8 @@ type KubeManager interface { GetService(ctx context.Context, name string) (*corev1.Service, error) GetServiceEndpoint(ctx context.Context, name string) (string, error) GetServiceIP(ctx context.Context, name string) (string, error) + ServiceDNS(name string) string + ServiceNodePort(ctx context.Context, name string) (int32, error) IsPodRunning(ctx context.Context, name string) (bool, error) IsReplicaSetRunning(ctx context.Context, name string) (bool, error) Namespace() string