Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore!: change services to be headless #574

Draft
wants to merge 7 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 66 additions & 0 deletions e2e/basic/headless_service_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package basic

import (
"context"
"fmt"
"strconv"
"time"
)

const gopingImage = "ghcr.io/celestiaorg/goping:4803195"

func (s *Suite) TestHeadlessService() {
const (
namePrefix = "headless-srv-test"
numOfPingPackets = 100
numOfTests = 10
packetTimeout = 1 * time.Second
gopingPort = 8001
)
ctx := context.Background()

mother, err := s.Knuu.NewInstance(namePrefix + "mother")
s.Require().NoError(err)

err = mother.Build().SetImage(ctx, gopingImage)
s.Require().NoError(err)

s.Require().NoError(mother.Network().AddPortTCP(gopingPort))
s.Require().NoError(mother.Build().Commit(ctx))

err = mother.Build().SetEnvironmentVariable("SERVE_ADDR", fmt.Sprintf("0.0.0.0:%d", gopingPort))
s.Require().NoError(err)
mojtaba-esk marked this conversation as resolved.
Show resolved Hide resolved

target, err := mother.CloneWithName(namePrefix + "target")
s.Require().NoError(err)

executor, err := mother.CloneWithName(namePrefix + "executor")
s.Require().NoError(err)

// Prepare ping executor & target
s.Require().NoError(target.Execution().Start(ctx))
s.Require().NoError(executor.Execution().Start(ctx))

targetEndpoint := fmt.Sprintf("%s:%d", target.Network().HostName(), gopingPort)
s.T().Logf("targetEndpoint: %v", targetEndpoint)

s.T().Log("Starting ping test. It takes a while.")
for i := 0; i < numOfTests; i++ {
startTime := time.Now()

output, err := executor.Execution().ExecuteCommand(ctx, "goping", "ping", "-q",
"-c", fmt.Sprint(numOfPingPackets),
"-t", packetTimeout.String(),
"-m", "packetloss",
targetEndpoint)
s.Require().NoError(err)

elapsed := time.Since(startTime)
s.T().Logf("i: %d, test took %.2f seconds, output: `%s`", i, elapsed.Seconds(), output)

gotPacketloss, err := strconv.ParseFloat(output, 64)
s.Require().NoErrorf(err, "failed to parse output: `%s`", output)

s.Assert().Zero(gotPacketloss)
}
}
4 changes: 1 addition & 3 deletions e2e/basic/probe_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,11 @@ func (s *Suite) TestProbe() {
}
s.Require().NoError(web.Monitoring().SetLivenessProbe(&livenessProbe))
s.Require().NoError(web.Build().Commit(ctx))
s.Require().NoError(web.Execution().Start(ctx))

// Test logic
webIP, err := web.Network().GetIP(ctx)
s.Require().NoError(err)

s.Require().NoError(web.Execution().Start(ctx))

wgetOutput, err := executor.Execution().ExecuteCommand(ctx, "wget", "-q", "-O", "-", webIP)
s.Require().NoError(err)

Expand Down
10 changes: 5 additions & 5 deletions e2e/netshaper/netshaper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,6 @@ func (s *Suite) TestNetShaperBandwidth() {

s.Require().NoError(iperfClient.Execution().Start(ctx))

iperfServerIP, err := iperfServer.Network().GetIP(ctx)
s.Require().NoError(err)

// Perform the test
type testCase struct {
name string
Expand All @@ -75,6 +72,9 @@ func (s *Suite) TestNetShaperBandwidth() {
}
}

iperfServerIP, err := iperfServer.Network().GetIP(ctx)
s.Require().NoError(err)

for _, tc := range tt {
tc := tc
s.Run(tc.name, func() {
Expand Down Expand Up @@ -267,6 +267,7 @@ func (s *Suite) TestNetShaperLatency() {

targetIP, err := target.Network().GetIP(ctx)
s.Require().NoError(err)
targetAddress := fmt.Sprintf("%s:%d", targetIP, gopingPort)

for _, tc := range tt {
tc := tc
Expand All @@ -279,7 +280,6 @@ func (s *Suite) TestNetShaperLatency() {
s.T().Log("Starting latency test. It takes a while.")
startTime := time.Now()

targetAddress := fmt.Sprintf("%s:%d", targetIP, gopingPort)
output, err := executor.Execution().ExecuteCommand(ctx,
"goping", "ping", "-q",
"-c", fmt.Sprint(numOfPingPackets),
Expand Down Expand Up @@ -361,6 +361,7 @@ func (s *Suite) TestNetShaperJitter() {

targetIP, err := target.Network().GetIP(ctx)
s.Require().NoError(err)
targetAddress := fmt.Sprintf("%s:%d", targetIP, gopingPort)

for _, tc := range tt {
tc := tc
Expand All @@ -373,7 +374,6 @@ func (s *Suite) TestNetShaperJitter() {
s.T().Log("Starting jitter test. It takes a while.")
startTime := time.Now()

targetAddress := fmt.Sprintf("%s:%d", targetIP, gopingPort)
output, err := executor.Execution().ExecuteCommand(ctx,
"goping", "ping", "-q",
"-c", fmt.Sprint(numOfPingPackets),
Expand Down
5 changes: 1 addition & 4 deletions e2e/system/env_to_json_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,10 +62,7 @@ func (s *Suite) TestEnvToJSON() {

// Test logic
for _, i := range instances {
webIP, err := i.Network().GetIP(ctx)
s.Require().NoError(err)

wget, err := executor.Execution().ExecuteCommand(ctx, "wget", "-q", "-O", "-", webIP)
wget, err := executor.Execution().ExecuteCommand(ctx, "wget", "-q", "-O", "-", i.Network().HostName())
mojtaba-esk marked this conversation as resolved.
Show resolved Hide resolved
s.Require().NoError(err)

expectedBytes, err := json.Marshal(envVars)
Expand Down
7 changes: 1 addition & 6 deletions e2e/system/external_file_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,14 +41,9 @@ func (s *Suite) TestExternalFile() {
s.Require().NoError(err)

s.Require().NoError(server.Build().Commit(ctx))

// Test logic
serverIP, err := server.Network().GetIP(ctx)
s.Require().NoError(err)

s.Require().NoError(server.Execution().Start(ctx))

wget, err := executor.Execution().ExecuteCommand(ctx, "wget", "-q", "-O", "-", serverIP)
wget, err := executor.Execution().ExecuteCommand(ctx, "wget", "-q", "-O", "-", server.Network().HostName())
mojtaba-esk marked this conversation as resolved.
Show resolved Hide resolved
s.Require().NoError(err)

s.Assert().Contains(wget, "Hello World!")
Expand Down
11 changes: 1 addition & 10 deletions e2e/system/file_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,15 +41,6 @@ func (s *Suite) TestFile() {
s.Require().NoError(err, "Error committing changes")

// Test logic
s.T().Log("Getting server IP")
var serverfileIP string
err = s.RetryOperation(func() error {
var err error
serverfileIP, err = serverfile.Network().GetIP(ctx)
return err
}, maxRetries)
s.Require().NoError(err, "Error getting server IP")

s.T().Log("Starting server")
err = s.RetryOperation(func() error {
return serverfile.Execution().Start(ctx)
Expand All @@ -60,7 +51,7 @@ func (s *Suite) TestFile() {
var wget string
err = s.RetryOperation(func() error {
var err error
wget, err = executor.Execution().ExecuteCommand(ctx, "wget", "-q", "-O", "-", serverfileIP)
wget, err = executor.Execution().ExecuteCommand(ctx, "wget", "-q", "-O", "-", serverfile.Network().HostName())
return err
}, maxRetries)
s.Require().NoError(err, "Error executing wget command")
Expand Down
7 changes: 1 addition & 6 deletions e2e/system/file_test_image_cached_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,16 +63,11 @@ func (s *Suite) TestFileCached() {

for _, i := range instances {
err := s.RetryOperation(func() error {
webIP, err := i.Network().GetIP(ctx)
if err != nil {
return fmt.Errorf("getting IP: %w", err)
}

if err := i.Execution().WaitInstanceIsRunning(ctx); err != nil {
return fmt.Errorf("waiting for instance to run: %w", err)
}

wget, err := executor.Execution().ExecuteCommand(ctx, "wget", "-q", "-O", "-", webIP)
wget, err := executor.Execution().ExecuteCommand(ctx, "wget", "-q", "-O", "-", i.Network().HostName())
if err != nil {
return fmt.Errorf("executing wget: %w", err)
}
Expand Down
33 changes: 9 additions & 24 deletions e2e/system/files_to_volumes_cm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,16 +25,10 @@ func (s *Suite) TestNoVolumesNoFiles() {

target := s.CreateNginxInstance(ctx, namePrefix+"-target")
s.Require().NoError(target.Build().Commit(ctx))
s.Require().NoError(target.Execution().Start(ctx))

// Test logic
s.Require().NoError(target.Execution().StartAsync(ctx))

webIP, err := target.Network().GetIP(ctx)
s.Require().NoError(err)

s.Require().NoError(target.Execution().WaitInstanceIsRunning(ctx))

wget, err := executor.Execution().ExecuteCommand(ctx, "wget", "-q", "-O", "-", webIP)
wget, err := executor.Execution().
ExecuteCommand(ctx, "wget", "-q", "-O", "-", target.Network().HostName())
s.Require().NoError(err)

s.Assert().Contains(wget, "Welcome to nginx!")
Expand All @@ -59,13 +53,11 @@ func (s *Suite) TestOneVolumeNoFiles() {
s.Require().NoError(target.Build().Commit(ctx))

// Test logic
s.Require().NoError(target.Execution().StartAsync(ctx))
s.Require().NoError(target.Execution().Start(ctx))

webIP, err := target.Network().GetIP(ctx)
s.Require().NoError(err)

s.Require().NoError(target.Execution().WaitInstanceIsRunning(ctx))

wget, err := executor.Execution().ExecuteCommand(ctx, "wget", "-q", "-O", "-", webIP)
mojtaba-esk marked this conversation as resolved.
Show resolved Hide resolved
s.Require().NoError(err)

Expand Down Expand Up @@ -114,13 +106,10 @@ func (s *Suite) TestNoVolumesOneFile() {
}

for _, i := range instances {
webIP, err := i.Network().GetIP(ctx)
s.Require().NoError(err)

err = i.Execution().WaitInstanceIsRunning(ctx)
s.Require().NoError(err)

wget, err := executor.Execution().ExecuteCommand(ctx, "wget", "-q", "-O", "-", webIP)
wget, err := executor.Execution().ExecuteCommand(ctx, "wget", "-q", "-O", "-", i.Network().HostName())
mojtaba-esk marked this conversation as resolved.
Show resolved Hide resolved
s.Require().NoError(err)
wget = strings.TrimSpace(wget)

Expand Down Expand Up @@ -167,11 +156,9 @@ func (s *Suite) TestOneVolumeOneFile() {
}

for _, i := range instances {
webIP, err := i.Network().GetIP(ctx)
s.Require().NoError(err)
s.Require().NoError(i.Execution().WaitInstanceIsRunning(ctx))

wget, err := executor.Execution().ExecuteCommand(ctx, "wget", "-q", "-O", "-", webIP)
wget, err := executor.Execution().ExecuteCommand(ctx, "wget", "-q", "-O", "-", i.Network().HostName())
mojtaba-esk marked this conversation as resolved.
Show resolved Hide resolved
s.Require().NoError(err)
wget = strings.TrimSpace(wget)

Expand Down Expand Up @@ -237,17 +224,15 @@ func (s *Suite) TestOneVolumeTwoFiles() {
}

for _, i := range instances {
webIP, err := i.Network().GetIP(ctx)
s.Require().NoError(err)
s.Require().NoError(i.Execution().WaitInstanceIsRunning(ctx))

wgetIndex, err := executor.Execution().ExecuteCommand(ctx, "wget", "-q", "-O", "-", webIP)
wgetIndex, err := executor.Execution().ExecuteCommand(ctx, "wget", "-q", "-O", "-", i.Network().HostName())
s.Require().NoError(err)
wgetIndex = strings.TrimSpace(wgetIndex)
s.Assert().Equal("hello from 1", wgetIndex)

webIP2 := webIP + "/index-2.html"
wgetIndex2, err := executor.Execution().ExecuteCommand(ctx, "wget", "-q", "-O", "-", webIP2)
webHost2 := i.Network().HostName() + "/index-2.html"
wgetIndex2, err := executor.Execution().ExecuteCommand(ctx, "wget", "-q", "-O", "-", webHost2)
mojtaba-esk marked this conversation as resolved.
Show resolved Hide resolved
s.Require().NoError(err)
wgetIndex2 = strings.TrimSpace(wgetIndex2)
s.Assert().Equal("hello from 2", wgetIndex2)
Expand Down
3 changes: 1 addition & 2 deletions e2e/system/folder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,12 @@ func (s *Suite) TestFolder() {
require.NoError(s.T(), err)

require.NoError(s.T(), web.Build().Commit(ctx))
s.Require().NoError(web.Execution().Start(ctx))
mojtaba-esk marked this conversation as resolved.
Show resolved Hide resolved

// Test logic
webIP, err := web.Network().GetIP(ctx)
s.Require().NoError(err)

s.Require().NoError(web.Execution().Start(ctx))

wget, err := executor.Execution().ExecuteCommand(ctx, "wget", "-q", "-O", "-", webIP)
s.Require().NoError(err)

Expand Down
5 changes: 1 addition & 4 deletions e2e/system/folder_test_image_cached_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,9 @@ func (s *Suite) TestFolderCached() {
}

for _, i := range instances {
webIP, err := i.Network().GetIP(ctx)
s.Require().NoError(err)

s.Require().NoError(i.Execution().WaitInstanceIsRunning(ctx))

wget, err := executor.Execution().ExecuteCommand(ctx, "wget", "-q", "-O", "-", webIP)
wget, err := executor.Execution().ExecuteCommand(ctx, "wget", "-q", "-O", "-", i.Network().HostName())
s.Require().NoError(err)

s.Assert().Contains(wget, "Hello World!")
Expand Down
3 changes: 3 additions & 0 deletions pkg/instance/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,5 +220,8 @@ var (
ErrAddingHostToProxyNotAllowed = errors.New("AddingHostToProxyNotAllowed", "adding host to proxy is only allowed in state 'Started' and 'Preparing'. Current state is '%s'")
ErrInstanceNameAlreadyExists = errors.New("InstanceNameAlreadyExists", "instance name '%s' already exists")
ErrSettingSidecarName = errors.New("SettingSidecarName", "error setting sidecar name with prefix '%s' for instance '%s'")
ErrGettingServiceEndpointNotAllowed = errors.New("GettingServiceEndpointNotAllowed", "getting service endpoint is only allowed in state 'Started'. Current state is '%s'")
ErrCannotCloneInstance = errors.New("CannotCloneInstance", "cannot clone instance '%s' in state '%s'")
ErrGettingIPNotAllowed = errors.New("GettingIPNotAllowed", "getting IP is allowed in state 'Started'. Current state is '%s'")
ErrPodIPNotReady = errors.New("PodIPNotReady", "pod IP is not ready for pod '%s'")
)
40 changes: 17 additions & 23 deletions pkg/instance/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,34 +119,28 @@ func (n *network) AddPortUDP(port int) error {
}

// GetIP returns the IP of the instance
// This function can only be called in the states 'Preparing' and 'Started'
// This function can only be called in the states 'Started'
// The IP is not persistent and can be changed when the pod is restarted
// If a persistent IP is needed, use HostName() instead
func (n *network) GetIP(ctx context.Context) (string, error) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The problem may be that the developer expects this IP to be static. But in fact, the IP changes if the Pod restarts. A restart can happen either by the users' intentions or 'randomly' by Kubernetes.
One idea is to document it properly, but the developer might oversee this. Another option would be to remove GetIP and entirely require the switch to HostName.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should definitely document the fact that the IP can change.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

true. a proper documentation needs to be added.

// Check if i.kubernetesService already has the IP
if n.kubernetesService != nil && n.kubernetesService.Spec.ClusterIP != "" {
return n.kubernetesService.Spec.ClusterIP, nil
}
// If not, proceed with the existing logic to deploy the service and get the IP
svc, err := n.instance.K8sClient.GetService(ctx, n.instance.name)
if err != nil || svc == nil {
// Service does not exist, so we need to deploy it
err := n.deployService(ctx, n.portsTCP, n.portsUDP)
if err != nil {
return "", ErrDeployingServiceForInstance.WithParams(n.instance.name).Wrap(err)
}
svc, err = n.instance.K8sClient.GetService(ctx, n.instance.name)
if err != nil {
return "", ErrGettingServiceForInstance.WithParams(n.instance.name).Wrap(err)
}
if !n.instance.IsInState(StateStarted) {
return "", ErrGettingIPNotAllowed.WithParams(n.instance.state.String())
}

ip := svc.Spec.ClusterIP
if ip == "" {
return "", ErrGettingServiceIP.WithParams(n.instance.name)
pod, err := n.instance.K8sClient.GetFirstPodFromReplicaSet(ctx, n.instance.name)
if err != nil {
return "", ErrGettingPodFromReplicaSet.WithParams(n.instance.name).Wrap(err)
mojtaba-esk marked this conversation as resolved.
Show resolved Hide resolved
}

// Update i.kubernetesService for future reference
n.kubernetesService = svc
return ip, nil
if pod.Status.PodIP == "" {
return "", ErrPodIPNotReady.WithParams(pod.Name)
}

return pod.Status.PodIP, nil
}
Comment on lines +126 to +140
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Consider additional error handling for pod readiness

The implementation now correctly checks for an empty pod IP, which addresses a previous concern. However, consider implementing a retry mechanism or a wait for the pod to be in a running state before attempting to retrieve the IP. This could help handle cases where the pod is not immediately ready when the function is called.

Here's a suggested implementation with a retry mechanism:

 func (n *network) GetIP(ctx context.Context) (string, error) {
 	if !n.instance.IsInState(StateStarted) {
 		return "", ErrGettingIPNotAllowed.WithParams(n.instance.state.String())
 	}
 
-	pod, err := n.instance.K8sClient.GetFirstPodFromReplicaSet(ctx, n.instance.name)
-	if err != nil {
-		return "", ErrGettingPodFromReplicaSet.WithParams(n.instance.name).Wrap(err)
-	}
-
-	if pod.Status.PodIP == "" {
-		return "", ErrPodIPNotReady.WithParams(pod.Name)
-	}
-
-	return pod.Status.PodIP, nil
+	var pod *v1.Pod
+	var err error
+	for retries := 0; retries < 5; retries++ {
+		pod, err = n.instance.K8sClient.GetFirstPodFromReplicaSet(ctx, n.instance.name)
+		if err != nil {
+			return "", ErrGettingPodFromReplicaSet.WithParams(n.instance.name).Wrap(err)
+		}
+		if pod.Status.Phase == v1.PodRunning && pod.Status.PodIP != "" {
+			return pod.Status.PodIP, nil
+		}
+		time.Sleep(time.Second * 2)
+	}
+	return "", ErrPodIPNotReady.WithParams(pod.Name)
 }

This implementation adds a retry loop that checks for a running pod with a non-empty IP, with a maximum of 5 attempts and a 2-second delay between attempts.

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
if !n.instance.IsInState(StateStarted) {
return "", ErrGettingIPNotAllowed.WithParams(n.instance.state.String())
}
ip := svc.Spec.ClusterIP
if ip == "" {
return "", ErrGettingServiceIP.WithParams(n.instance.name)
pod, err := n.instance.K8sClient.GetFirstPodFromReplicaSet(ctx, n.instance.name)
if err != nil {
return "", ErrGettingPodFromReplicaSet.WithParams(n.instance.name).Wrap(err)
}
// Update i.kubernetesService for future reference
n.kubernetesService = svc
return ip, nil
if pod.Status.PodIP == "" {
return "", ErrPodIPNotReady.WithParams(pod.Name)
}
return pod.Status.PodIP, nil
}
func (n *network) GetIP(ctx context.Context) (string, error) {
if !n.instance.IsInState(StateStarted) {
return "", ErrGettingIPNotAllowed.WithParams(n.instance.state.String())
}
var pod *v1.Pod
var err error
for retries := 0; retries < 5; retries++ {
pod, err = n.instance.K8sClient.GetFirstPodFromReplicaSet(ctx, n.instance.name)
if err != nil {
return "", ErrGettingPodFromReplicaSet.WithParams(n.instance.name).Wrap(err)
}
if pod.Status.Phase == v1.PodRunning && pod.Status.PodIP != "" {
return pod.Status.PodIP, nil
}
time.Sleep(time.Second * 2)
}
return "", ErrPodIPNotReady.WithParams(pod.Name)
}


func (n *network) HostName() string {
return n.instance.K8sClient.ServiceDNS(n.instance.name)
}
mojtaba-esk marked this conversation as resolved.
Show resolved Hide resolved

// deployService deploys the service for the instance
Expand Down
3 changes: 3 additions & 0 deletions pkg/k8s/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,4 +137,7 @@ var (
ErrListingPods = errors.New("ListingPods", "failed to list pods")
ErrGetPodStatus = errors.New("GetPodStatus", "failed to get pod status for pod %s")
ErrUpdatingConfigmap = errors.New("UpdatingConfigmap", "failed to update configmap %s")
ErrGettingServiceIP = errors.New("GettingServiceIP", "failed to get service IP for service %s")
ErrGettingServiceNodePort = errors.New("GettingServiceNodePort", "failed to get service node port for service %s")
ErrHeadlessService = errors.New("HeadlessService", "headless service '%s' does not have a cluster IP, use DNS instead")
)
Loading
Loading