Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kuberesource: some improvements around the port-forwarder resource #1192

Merged
merged 4 commits into from
Jan 31, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion internal/kuberesource/mutators.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package kuberesource

import (
"fmt"
"log"
"slices"
"strconv"
"strings"
Expand Down Expand Up @@ -176,7 +177,11 @@ func AddPortForwarders(resources []any) []any {
for _, resource := range resources {
switch obj := resource.(type) {
case *applycorev1.ServiceApplyConfiguration:
out = append(out, PortForwarderForService(obj))
forwarder, err := PortForwarderForService(obj)
if err != nil {
log.Printf("WARNING: no port forwarder added for service %q: %v", *obj.Name, err)
}
out = append(out, forwarder)
}
out = append(out, resource)
}
Expand Down
86 changes: 30 additions & 56 deletions internal/kuberesource/parts.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,58 +278,27 @@ type PortForwarderConfig struct {
*applycorev1.PodApplyConfiguration
}

// PortForwarder constructs a port forwarder pod.
func PortForwarder(name, namespace string) *PortForwarderConfig {
name = "port-forwarder-" + name

p := Pod(name, namespace).
WithLabels(map[string]string{"app.kubernetes.io/name": name}).
WithSpec(PodSpec().
WithContainers(
Container().
WithName("port-forwarder").
WithImage("ghcr.io/edgelesssys/contrast/port-forwarder:latest").
WithCommand("/bin/bash", "-c", "echo Starting port-forward with socat; exec socat -d -d TCP-LISTEN:${LISTEN_PORT},fork TCP:${FORWARD_HOST}:${FORWARD_PORT}").
WithResources(ResourceRequirements().
WithMemoryLimitAndRequest(50),
),
),
)

return &PortForwarderConfig{p}
}

// WithListenPort sets the port to listen on.
func (p *PortForwarderConfig) WithListenPort(port int32) *PortForwarderConfig {
p.Spec.Containers[0].
WithPorts(
ContainerPort().
WithContainerPort(port),
).
WithEnv(
NewEnvVar("LISTEN_PORT", strconv.Itoa(int(port))),
).
WithStartupProbe(Probe().
WithInitialDelaySeconds(1).
WithPeriodSeconds(1).
WithTCPSocket(TCPSocketAction().
WithPort(intstr.FromInt32(port))),
)
// WithForwardTarget sets the target host to forward to.
func (p *PortForwarderConfig) WithForwardTarget(host string) *PortForwarderConfig {
p.Spec.Containers[0].WithEnv(NewEnvVar("FORWARD_HOST", host))
return p
}

// WithForwardTarget sets the target host and port to forward to.
func (p *PortForwarderConfig) WithForwardTarget(host string, port int32) *PortForwarderConfig {
p.Spec.Containers[0].
WithEnv(
NewEnvVar("FORWARD_HOST", host),
NewEnvVar("FORWARD_PORT", strconv.Itoa(int(port))),
)
return p
const portForwarderScript = `echo Starting port-forward with socat >&2
handler() {
echo "Received SIGTERM, forwarding to children" >&2
kill -TERM -1
}

// PortForwarderMultiplePorts constructs a port forwarder pod for multiple ports.
func PortForwarderMultiplePorts(name, namespace string) *PortForwarderConfig {
trap handler TERM
set -x
for port in ${LISTEN_PORTS}; do
socat -d -d TCP-LISTEN:$port,fork TCP:${FORWARD_HOST}:$port &
done
wait
`

// PortForwarder constructs a port forwarder pod for multiple ports.
func PortForwarder(name, namespace string) *PortForwarderConfig {
name = "port-forwarder-" + name

p := Pod(name, namespace).
Expand All @@ -339,7 +308,7 @@ func PortForwarderMultiplePorts(name, namespace string) *PortForwarderConfig {
Container().
WithName("port-forwarder").
WithImage("ghcr.io/edgelesssys/contrast/port-forwarder:latest").
WithCommand("/bin/bash", "-c", "echo Starting port-forward with socat; for port in ${LISTEN_PORTS}; do socat -d -d TCP-LISTEN:$port,fork TCP:${FORWARD_HOST}:$port & done; wait").
WithCommand("/bin/bash", "-c", portForwarderScript).
WithResources(ResourceRequirements().
WithMemoryLimitAndRequest(50),
),
Expand All @@ -349,7 +318,7 @@ func PortForwarderMultiplePorts(name, namespace string) *PortForwarderConfig {
return &PortForwarderConfig{p}
}

// WithListenPorts sets multiple ports to listen on. Should only be used if PortForwarderMultiplePorts was used initially.
// WithListenPorts sets multiple ports to listen on.
func (p *PortForwarderConfig) WithListenPorts(ports []int32) *PortForwarderConfig {
var containerPorts []*applycorev1.ContainerPortApplyConfiguration
var envVar string
Expand Down Expand Up @@ -493,23 +462,28 @@ func ServiceForStatefulSet(s *applyappsv1.StatefulSetApplyConfiguration) *applyc

// PortForwarderForService creates a Pod that forwards network traffic to the given service.
//
// Port forwarders are named "port-forwarder-SVCNAME" and forward the first port in the ServiceSpec.
func PortForwarderForService(svc *applycorev1.ServiceApplyConfiguration) *applycorev1.PodApplyConfiguration {
// Port forwarders are named "port-forwarder-SVCNAME" and forward all TCP ports in the ServiceSpec.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it less error prone if we error when we encounter UDP ports? On the other hand how do we want to forward the TCP part of something that has TCP and UDP ports. Just thinking out loud, no need to change something here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

UDP port forwarding is a bit iffy, and supporting TCP is enough for our use case. However, the implementation as is is just wrong if there is a UDP port, which is why I'm introducing the check.

On the other hand how do we want to forward the TCP part of something that has TCP and UDP ports.

You mean situations where both UDP and TCP need to be reachable for full functionality (like, a DNS server)? Fortunately, we don't need that.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought about that when using the function one might not read the documentation or just doesn't know if the port of a service is UDP or TCP and expecting a port forwarder. to put it another way: I thought about enforcing the comment in code by returning an error when we encounter non-TCP ports.

You mean situations where both UDP and TCP need to be reachable for full functionality (like, a DNS server)? Fortunately, we don't need that.

This was thinking about the consequences of return an error when we encounter non TCP ports. Then one would need to split every service that both has UDP and TCP ports into two in order to export the TCP ports.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I mean, it's not like you can kubectl port-forward UDP, and that is the only reason we have this pod. If this were a public library function I'd tend to agree, but I don't think we need to be this careful for an e2e test utility.

That being said, there's a bug in the current implementation which I only discovered because of this thread: if there are only non-TCP ports, the list is empty and the port-forwarder likely crashloop. In that case, returning an error is probably justified.

func PortForwarderForService(svc *applycorev1.ServiceApplyConfiguration) (*applycorev1.PodApplyConfiguration, error) {
namespace := ""
if svc.Namespace != nil {
namespace = *svc.Namespace
}

var ports []int32
for _, port := range svc.Spec.Ports {
ports = append(ports, *port.Port)
if port.Protocol == nil || *port.Protocol == corev1.ProtocolTCP {
ports = append(ports, *port.Port)
}
}
if len(ports) == 0 {
return nil, fmt.Errorf("no TCP ports in service spec")
}

forwarder := PortForwarderMultiplePorts(*svc.Name, namespace).
forwarder := PortForwarder(*svc.Name, namespace).
WithListenPorts(ports).
WithForwardTarget(*svc.Name, -1) // port can be -1 since MultiplePortsForwarder ignores FORWARD_PORT env
WithForwardTarget(*svc.Name)

return forwarder.PodApplyConfiguration
return forwarder.PodApplyConfiguration, nil
}

// Initializer creates a new InitializerConfig.
Expand Down
4 changes: 2 additions & 2 deletions internal/kuberesource/parts_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ func TestNewPortForwarder(t *testing.T) {
require := require.New(t)

config := PortForwarder("coordinator", "default").
WithListenPort(1313).
WithForwardTarget("coordinator", 1313)
WithListenPorts([]int32{1313, 7777}).
WithForwardTarget("coordinator")

b, err := EncodeResources(config)
require.NoError(err)
Expand Down
6 changes: 5 additions & 1 deletion internal/kuberesource/wrappers.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,11 @@ type PodConfig struct {

// Pod creates a new PodConfig.
func Pod(name, namespace string) *PodConfig {
return &PodConfig{applycorev1.Pod(name, namespace)}
p := applycorev1.Pod(name, namespace)
if namespace == "" && p.ObjectMetaApplyConfiguration != nil {
p.ObjectMetaApplyConfiguration.Namespace = nil
}
return &PodConfig{p}
}

// LabelSelectorConfig wraps applymetav1.LabelSelectorApplyConfiguration.
Expand Down