diff --git a/internal/kuberesource/mutators.go b/internal/kuberesource/mutators.go index 9b2b2df51..ddf9c0183 100644 --- a/internal/kuberesource/mutators.go +++ b/internal/kuberesource/mutators.go @@ -5,6 +5,7 @@ package kuberesource import ( "fmt" + "log" "slices" "strconv" "strings" @@ -176,7 +177,11 @@ func AddPortForwarders(resources []any) []any { for _, resource := range resources { switch obj := resource.(type) { case *applycorev1.ServiceApplyConfiguration: - out = append(out, PortForwarderForService(obj)) + forwarder, err := PortForwarderForService(obj) + if err != nil { + log.Printf("WARNING: no port forwarder added for service %q: %v", *obj.Name, err) + } + out = append(out, forwarder) } out = append(out, resource) } diff --git a/internal/kuberesource/parts.go b/internal/kuberesource/parts.go index 84ba82b23..7d074c086 100644 --- a/internal/kuberesource/parts.go +++ b/internal/kuberesource/parts.go @@ -278,58 +278,27 @@ type PortForwarderConfig struct { *applycorev1.PodApplyConfiguration } -// PortForwarder constructs a port forwarder pod. -func PortForwarder(name, namespace string) *PortForwarderConfig { - name = "port-forwarder-" + name - - p := Pod(name, namespace). - WithLabels(map[string]string{"app.kubernetes.io/name": name}). - WithSpec(PodSpec(). - WithContainers( - Container(). - WithName("port-forwarder"). - WithImage("ghcr.io/edgelesssys/contrast/port-forwarder:latest"). - WithCommand("/bin/bash", "-c", "echo Starting port-forward with socat; exec socat -d -d TCP-LISTEN:${LISTEN_PORT},fork TCP:${FORWARD_HOST}:${FORWARD_PORT}"). - WithResources(ResourceRequirements(). - WithMemoryLimitAndRequest(50), - ), - ), - ) - - return &PortForwarderConfig{p} -} - -// WithListenPort sets the port to listen on. -func (p *PortForwarderConfig) WithListenPort(port int32) *PortForwarderConfig { - p.Spec.Containers[0]. - WithPorts( - ContainerPort(). - WithContainerPort(port), - ). - WithEnv( - NewEnvVar("LISTEN_PORT", strconv.Itoa(int(port))), - ). - WithStartupProbe(Probe(). - WithInitialDelaySeconds(1). - WithPeriodSeconds(1). - WithTCPSocket(TCPSocketAction(). - WithPort(intstr.FromInt32(port))), - ) +// WithForwardTarget sets the target host to forward to. +func (p *PortForwarderConfig) WithForwardTarget(host string) *PortForwarderConfig { + p.Spec.Containers[0].WithEnv(NewEnvVar("FORWARD_HOST", host)) return p } -// WithForwardTarget sets the target host and port to forward to. -func (p *PortForwarderConfig) WithForwardTarget(host string, port int32) *PortForwarderConfig { - p.Spec.Containers[0]. - WithEnv( - NewEnvVar("FORWARD_HOST", host), - NewEnvVar("FORWARD_PORT", strconv.Itoa(int(port))), - ) - return p +const portForwarderScript = `echo Starting port-forward with socat >&2 +handler() { + echo "Received SIGTERM, forwarding to children" >&2 + kill -TERM -1 } - -// PortForwarderMultiplePorts constructs a port forwarder pod for multiple ports. -func PortForwarderMultiplePorts(name, namespace string) *PortForwarderConfig { +trap handler TERM +set -x +for port in ${LISTEN_PORTS}; do + socat -d -d TCP-LISTEN:$port,fork TCP:${FORWARD_HOST}:$port & +done +wait +` + +// PortForwarder constructs a port forwarder pod for multiple ports. +func PortForwarder(name, namespace string) *PortForwarderConfig { name = "port-forwarder-" + name p := Pod(name, namespace). @@ -339,7 +308,7 @@ func PortForwarderMultiplePorts(name, namespace string) *PortForwarderConfig { Container(). WithName("port-forwarder"). WithImage("ghcr.io/edgelesssys/contrast/port-forwarder:latest"). - WithCommand("/bin/bash", "-c", "echo Starting port-forward with socat; for port in ${LISTEN_PORTS}; do socat -d -d TCP-LISTEN:$port,fork TCP:${FORWARD_HOST}:$port & done; wait"). + WithCommand("/bin/bash", "-c", portForwarderScript). WithResources(ResourceRequirements(). WithMemoryLimitAndRequest(50), ), @@ -349,7 +318,7 @@ func PortForwarderMultiplePorts(name, namespace string) *PortForwarderConfig { return &PortForwarderConfig{p} } -// WithListenPorts sets multiple ports to listen on. Should only be used if PortForwarderMultiplePorts was used initially. +// WithListenPorts sets multiple ports to listen on. func (p *PortForwarderConfig) WithListenPorts(ports []int32) *PortForwarderConfig { var containerPorts []*applycorev1.ContainerPortApplyConfiguration var envVar string @@ -493,8 +462,8 @@ func ServiceForStatefulSet(s *applyappsv1.StatefulSetApplyConfiguration) *applyc // PortForwarderForService creates a Pod that forwards network traffic to the given service. // -// Port forwarders are named "port-forwarder-SVCNAME" and forward the first port in the ServiceSpec. -func PortForwarderForService(svc *applycorev1.ServiceApplyConfiguration) *applycorev1.PodApplyConfiguration { +// Port forwarders are named "port-forwarder-SVCNAME" and forward all TCP ports in the ServiceSpec. +func PortForwarderForService(svc *applycorev1.ServiceApplyConfiguration) (*applycorev1.PodApplyConfiguration, error) { namespace := "" if svc.Namespace != nil { namespace = *svc.Namespace @@ -502,14 +471,19 @@ func PortForwarderForService(svc *applycorev1.ServiceApplyConfiguration) *applyc var ports []int32 for _, port := range svc.Spec.Ports { - ports = append(ports, *port.Port) + if port.Protocol == nil || *port.Protocol == corev1.ProtocolTCP { + ports = append(ports, *port.Port) + } + } + if len(ports) == 0 { + return nil, fmt.Errorf("no TCP ports in service spec") } - forwarder := PortForwarderMultiplePorts(*svc.Name, namespace). + forwarder := PortForwarder(*svc.Name, namespace). WithListenPorts(ports). - WithForwardTarget(*svc.Name, -1) // port can be -1 since MultiplePortsForwarder ignores FORWARD_PORT env + WithForwardTarget(*svc.Name) - return forwarder.PodApplyConfiguration + return forwarder.PodApplyConfiguration, nil } // Initializer creates a new InitializerConfig. diff --git a/internal/kuberesource/parts_test.go b/internal/kuberesource/parts_test.go index 5fa4bd94f..a92934fde 100644 --- a/internal/kuberesource/parts_test.go +++ b/internal/kuberesource/parts_test.go @@ -13,8 +13,8 @@ func TestNewPortForwarder(t *testing.T) { require := require.New(t) config := PortForwarder("coordinator", "default"). - WithListenPort(1313). - WithForwardTarget("coordinator", 1313) + WithListenPorts([]int32{1313, 7777}). + WithForwardTarget("coordinator") b, err := EncodeResources(config) require.NoError(err) diff --git a/internal/kuberesource/wrappers.go b/internal/kuberesource/wrappers.go index 100031439..97e1231e9 100644 --- a/internal/kuberesource/wrappers.go +++ b/internal/kuberesource/wrappers.go @@ -92,7 +92,11 @@ type PodConfig struct { // Pod creates a new PodConfig. func Pod(name, namespace string) *PodConfig { - return &PodConfig{applycorev1.Pod(name, namespace)} + p := applycorev1.Pod(name, namespace) + if namespace == "" && p.ObjectMetaApplyConfiguration != nil { + p.ObjectMetaApplyConfiguration.Namespace = nil + } + return &PodConfig{p} } // LabelSelectorConfig wraps applymetav1.LabelSelectorApplyConfiguration.