From 0f3714b9b758f24de0b1911c148bdba8d87de9b6 Mon Sep 17 00:00:00 2001
From: Aaron U'Ren
Date: Sun, 3 Dec 2023 14:38:33 -0600
Subject: [PATCH] fix(hairpin): set hairpin_mode for veth iface

It used to be that the kubelet handled setting hairpin mode for us:
https://github.com/kubernetes/kubernetes/pull/13628

Then this functionality moved to the dockershim:
https://github.com/kubernetes/kubernetes/pull/62212

Then the functionality was removed entirely:
https://github.com/kubernetes/kubernetes/commit/83265c9171f

Unfortunately, it was lost that we ever depended on this for our
hairpin implementation to work, if we ever knew it at all.
Additionally, I suspect that the containerd and cri-o implementations
never worked correctly with hairpinning.

Without this, the NAT rules that we implement for hairpinning don't
work correctly. Because hairpin_mode isn't enabled on the container's
virtual interface on the host, the packet bubbles up to the
kube-bridge. At some point in the traffic flow, the route back to the
pod resolves to the MAC address inside the container; at that point
the packet's source and destination MACs no longer match the
kube-bridge interface and the packet is black-holed.

This could also be fixed by putting the kube-bridge interface into
promiscuous mode so that it accepts all MAC addresses, but going back
to the original behavior of enabling hairpin_mode on the container's
veth interface is the lesser of two evils here: putting kube-bridge
into promiscuous mode would likely have unintended consequences.
---
 pkg/controllers/proxy/hairpin_controller.go   | 121 ++++++++
 pkg/controllers/proxy/linux_networking.go     | 153 ++++-----
 pkg/controllers/proxy/linux_networking_moq.go | 290 ++++++++++--------
 .../proxy/network_services_controller.go      |  20 ++
 pkg/controllers/proxy/utils.go                |  54 +++-
 pkg/healthcheck/health_controller.go          |  17 +
 6 files changed, 437 insertions(+), 218 deletions(-)
 create mode 100644 pkg/controllers/proxy/hairpin_controller.go

diff --git a/pkg/controllers/proxy/hairpin_controller.go b/pkg/controllers/proxy/hairpin_controller.go
new file mode 100644
index 000000000..06bc372e0
--- /dev/null
+++ b/pkg/controllers/proxy/hairpin_controller.go
@@ -0,0 +1,121 @@
+package proxy
+
+import (
+	"fmt"
+	"net"
+	"os"
+	"path"
+	"runtime"
+	"sync"
+	"time"
+
+	"github.com/cloudnativelabs/kube-router/v2/pkg/healthcheck"
+	"github.com/cloudnativelabs/kube-router/v2/pkg/utils"
+	"github.com/vishvananda/netns"
+	"k8s.io/klog/v2"
+)
+
+type hairpinController struct {
+	epC <-chan string
+	nsc *NetworkServicesController
+}
+
+func (hpc *hairpinController) Run(stopCh <-chan struct{}, wg *sync.WaitGroup,
+	healthChan chan<- *healthcheck.ControllerHeartbeat) {
+	defer wg.Done()
+	klog.Infof("Starting hairpin controller (handles setting hairpin_mode for veth interfaces)")
+
+	t := time.NewTicker(healthcheck.HPCSyncPeriod)
+	defer t.Stop()
+	for {
+		// Add an additional non-blocking select to ensure that if the stopCh channel is closed it is handled first
+		select {
+		case <-stopCh:
+			klog.Info("Shutting down Hairpin Controller goroutine")
+			return
+		default:
+		}
+		select {
+		case <-stopCh:
+			klog.Info("Shutting down Hairpin Controller goroutine")
+			return
+		case endpointIP := <-hpc.epC:
+			klog.V(1).Infof("Received request for hairpin setup of endpoint %s, processing", endpointIP)
+			err := hpc.ensureHairpinEnabledForPodInterface(endpointIP)
+			if err != nil {
+				klog.Errorf("unable to set hairpin mode for endpoint %s, it's possible
that hairpinning will not "+ + "work as expected. Error was: %v", + endpointIP, err) + } + case <-t.C: + healthcheck.SendHeartBeat(healthChan, "HPC") + } + } +} + +func (hpc *hairpinController) ensureHairpinEnabledForPodInterface(endpointIP string) error { + klog.V(2).Infof("Attempting to enable hairpin mode for endpoint IP %s", endpointIP) + crRuntime, containerID, err := hpc.nsc.findContainerRuntimeReferences(endpointIP) + if err != nil { + return err + } + klog.V(2).Infof("Detected runtime %s and container ID %s for endpoint IP %s", crRuntime, containerID, endpointIP) + + runtime.LockOSThread() + defer runtime.UnlockOSThread() + + hostNetworkNSHandle, err := netns.Get() + if err != nil { + return fmt.Errorf("failed to get namespace due to %v", err) + } + defer utils.CloseCloserDisregardError(&hostNetworkNSHandle) + + var pid int + if crRuntime == "docker" { + // WARN: This method is deprecated and will be removed once docker-shim is removed from kubelet. + pid, err = hpc.nsc.ln.getContainerPidWithDocker(containerID) + if err != nil { + return fmt.Errorf("failed to prepare endpoint %s to do direct server return due to %v", + endpointIP, err) + } + } else { + // We expect CRI compliant runtimes here + // ugly workaround, refactoring of pkg/Proxy is required + pid, err = hpc.nsc.ln.getContainerPidWithCRI(hpc.nsc.dsr.runtimeEndpoint, containerID) + if err != nil { + return fmt.Errorf("failed to prepare endpoint %s to do DSR due to: %v", endpointIP, err) + } + } + klog.V(2).Infof("Found PID %d for endpoint IP %s", pid, endpointIP) + + // Get the interface link ID from inside the container so that we can link it to the veth on the host namespace + ifaceID, err := hpc.nsc.ln.findIfaceLinkForPid(pid) + if err != nil { + return fmt.Errorf("failed to find the interface ID inside the container NS for endpoint IP: %s, due to: %v", + endpointIP, err) + } + klog.V(2).Infof("Found Interface Link ID %d for endpoint IP %s", ifaceID, endpointIP) + + ifaceName, err := net.InterfaceByIndex(ifaceID) + if err != nil { + return fmt.Errorf("failed to get the interface name from the link ID inside the container for endpoint IP: "+ + "%s and Interface ID: %d due to: %v", endpointIP, ifaceID, err) + } + + klog.V(1).Infof("Enabling hairpin for interface %s for endpoint IP %s", ifaceName.Name, endpointIP) + hpPath := path.Join(sysFSVirtualNetPath, ifaceName.Name, sysFSHairpinRelPath) + if _, err := os.Stat(hpPath); err != nil { + return fmt.Errorf("hairpin path %s doesn't appear to exist for us to set", hpPath) + } + + return os.WriteFile(hpPath, []byte(hairpinEnable), 0644) +} + +func NewHairpinController(nsc *NetworkServicesController, endpointCh <-chan string) *hairpinController { + hpc := hairpinController{ + nsc: nsc, + epC: endpointCh, + } + + return &hpc +} diff --git a/pkg/controllers/proxy/linux_networking.go b/pkg/controllers/proxy/linux_networking.go index 67a900819..28b118937 100644 --- a/pkg/controllers/proxy/linux_networking.go +++ b/pkg/controllers/proxy/linux_networking.go @@ -6,7 +6,8 @@ import ( "net" "os" "os/exec" - "runtime" + "path" + "strconv" "strings" "syscall" "time" @@ -23,6 +24,18 @@ import ( const ( ipv4NetMaskBits = 32 ipv6NetMaskBits = 128 + + // TODO: it's bad to rely on eth0 here. While this is inside the container's namespace and is determined by the + // container runtime and so far we've been able to count on this being reliably set to eth0, it is possible that + // this may shift sometime in the future with a different runtime. 
It would be better to find a reliable way to + // determine the interface name from inside the container. + assumedContainerIfaceName = "eth0" + + procFSBasePath = "/proc" + procFSCWDRelPath = "cwd" + sysFSBasePath = "/sys" + sysFSNetClassRelPath = "class/net" + sysFSIfLinkRelPath = "iflink" ) // LinuxNetworking interface contains all linux networking subsystem calls @@ -40,13 +53,14 @@ type linuxNetworking struct { type netlinkCalls interface { ipAddrAdd(iface netlink.Link, ip string, nodeIP string, addRoute bool) error ipAddrDel(iface netlink.Link, ip string, nodeIP string) error - prepareEndpointForDsrWithDocker(containerID string, endpointIP string, vip string) error + getContainerPidWithDocker(containerID string) (int, error) + getContainerPidWithCRI(runtimeEndpoint string, containerID string) (int, error) getKubeDummyInterface() (netlink.Link, error) setupRoutesForExternalIPForDSR(serviceInfo serviceInfoMap, setupIPv4, setupIPv6 bool) error - prepareEndpointForDsrWithCRI(runtimeEndpoint, containerID, endpointIP, vip string) error configureContainerForDSR(vip, endpointIP, containerID string, pid int, hostNetworkNamespaceHandle netns.NsHandle) error setupPolicyRoutingForDSR(setupIPv4, setupIPv6 bool) error + findIfaceLinkForPid(pid int) (int, error) } func (ln *linuxNetworking) ipAddrDel(iface netlink.Link, ip string, nodeIP string) error { @@ -553,91 +567,93 @@ func (ln *linuxNetworking) setupRoutesForExternalIPForDSR(serviceInfoMap service return nil } -// This function does the following -// - get the pod corresponding to the endpoint ip -// - get the container id from pod spec -// - from the container id, use docker client to get the pid -// - enter process network namespace and create ipip tunnel -// - add VIP to the tunnel interface -// - disable rp_filter -// WARN: This method is deprecated and will be removed once docker-shim is removed from kubelet. -func (ln *linuxNetworking) prepareEndpointForDsrWithDocker(containerID string, endpointIP string, vip string) error { - - // Its possible switch namespaces may never work safely in GO without hacks. 
- // https://groups.google.com/forum/#!topic/golang-nuts/ss1gEOcehjk/discussion - // https://www.weave.works/blog/linux-namespaces-and-go-don-t-mix - // Dont know if same issue, but seen namespace issue, so adding - // logs and boilerplate code and verbose logs for diagnosis - - runtime.LockOSThread() - defer runtime.UnlockOSThread() - - var activeNetworkNamespaceHandle netns.NsHandle - - hostNetworkNamespaceHandle, err := netns.Get() - if err != nil { - return fmt.Errorf("failed to get namespace due to %v", err) - } - defer utils.CloseCloserDisregardError(&hostNetworkNamespaceHandle) - - activeNetworkNamespaceHandle, err = netns.Get() - if err != nil { - return fmt.Errorf("failed to get namespace due to %v", err) - } - klog.V(1).Infof("Current network namespace before netns.Set: %s", activeNetworkNamespaceHandle.String()) - defer utils.CloseCloserDisregardError(&activeNetworkNamespaceHandle) - +// getContainerPidWithDocker get the PID for a given docker container ID which allows, among other things, for us to +// enter the network namespace of the pod +func (ln *linuxNetworking) getContainerPidWithDocker(containerID string) (int, error) { dockerClient, err := client.NewClientWithOpts(client.FromEnv) if err != nil { - return fmt.Errorf("failed to get docker client due to %v", err) + return 0, fmt.Errorf("failed to get docker client due to %v", err) } defer utils.CloseCloserDisregardError(dockerClient) containerSpec, err := dockerClient.ContainerInspect(context.Background(), containerID) if err != nil { - return fmt.Errorf("failed to get docker container spec due to %v", err) + return 0, fmt.Errorf("failed to get docker container spec due to %v", err) } - pid := containerSpec.State.Pid - return ln.configureContainerForDSR(vip, endpointIP, containerID, pid, hostNetworkNamespaceHandle) + return containerSpec.State.Pid, nil } -// The same as prepareEndpointForDsr but using CRI instead of docker. -func (ln *linuxNetworking) prepareEndpointForDsrWithCRI(runtimeEndpoint, containerID, endpointIP, vip string) error { - - // It's possible switch namespaces may never work safely in GO without hacks. - // https://groups.google.com/forum/#!topic/golang-nuts/ss1gEOcehjk/discussion - // https://www.weave.works/blog/linux-namespaces-and-go-don-t-mix - // Dont know if same issue, but seen namespace issue, so adding - // logs and boilerplate code and verbose logs for diagnosis - +// getContainerPidWithCRI get the PID for a given compatible CRI (cri-o / containerd / etc.) 
container ID which allows,
+// among other things, for us to enter the network namespace of the pod
+func (ln *linuxNetworking) getContainerPidWithCRI(runtimeEndpoint string, containerID string) (int, error) {
 	if runtimeEndpoint == "" {
-		return fmt.Errorf("runtimeEndpoint is not specified")
+		return 0, fmt.Errorf("runtimeEndpoint is not specified")
 	}
 
-	runtime.LockOSThread()
-	defer runtime.UnlockOSThread()
-
-	hostNetworkNamespaceHandle, err := netns.Get()
-	if err != nil {
-		return fmt.Errorf("failed to get host namespace due to %v", err)
-	}
-	klog.V(1).Infof("current network namespace before netns.Set: %s", hostNetworkNamespaceHandle.String())
-	defer utils.CloseCloserDisregardError(&hostNetworkNamespaceHandle)
-
 	rs, err := cri.NewRemoteRuntimeService(runtimeEndpoint, cri.DefaultConnectionTimeout)
 	if err != nil {
-		return err
+		return 0, err
 	}
 	defer utils.CloseCloserDisregardError(rs)
 
 	info, err := rs.ContainerInfo(containerID)
 	if err != nil {
-		return err
+		return 0, err
 	}
 
-	pid := info.Pid
-	return ln.configureContainerForDSR(vip, endpointIP, containerID, pid, hostNetworkNamespaceHandle)
+	return info.Pid, nil
+}
+
+// findIfaceLinkForPid finds the interface link number inside the network namespace of the passed pid.
+//
+// It is extremely unfortunate that we have to go through /proc for this functionality. Ideally, we could use
+// unix.Setns to enter the mount namespace for the PID and then just look through the sysfs filesystem to find this
+// information. Unfortunately, there appear to be problems doing this in Golang and the only way it appears to work
+// correctly is if you know all of the various PIDs you might need to join before the application is launched.
+// See the following for more details:
+// - https://github.com/golang/go/issues/8676
+// - https://stackoverflow.com/questions/25704661/calling-setns-from-go-returns-einval-for-mnt-namespace
+//
+// Additionally, we can't use nsenter because we need access to the basic tools that kube-router has on the host and
+// we can't guarantee that even basic commands like ls or cat will be available inside the container's NS filesystem.
+func (ln *linuxNetworking) findIfaceLinkForPid(pid int) (int, error) {
+	var ifaceID int
+
+	listAvailableIfaces := func() {
+		ifacesPath := path.Join(procFSBasePath, strconv.Itoa(pid), procFSCWDRelPath, sysFSBasePath,
+			sysFSNetClassRelPath)
+		entries, err := os.ReadDir(ifacesPath)
+		if err != nil {
+			klog.Warningf("could not list: %s due to: %v", ifacesPath, err)
+			return
+		}
+		var sb strings.Builder
+		for _, e := range entries {
+			sb.WriteString(e.Name() + " ")
+		}
+		klog.Warningf("Able to see the following interfaces: %s", sb.String())
+		klog.Warning("If one of the above is not eth0 it is likely that the assumption that we've hardcoded in " +
+			"kube-router is wrong, please report this as a bug along with this output")
+	}
+
+	ifaceSysPath := path.Join(procFSBasePath, strconv.Itoa(pid), procFSCWDRelPath, sysFSBasePath, sysFSNetClassRelPath,
+		assumedContainerIfaceName, sysFSIfLinkRelPath)
+	output, err := os.ReadFile(ifaceSysPath)
+	if err != nil {
+		listAvailableIfaces()
+		return ifaceID, fmt.Errorf("unable to read the ifaceID inside the container from %s, output was: %s, error "+
+			"was: %v", ifaceSysPath, string(output), err)
+	}
+
+	ifaceID, err = strconv.Atoi(strings.TrimSuffix(string(output), "\n"))
+	if ifaceID == 0 || err != nil {
+		listAvailableIfaces()
+		return ifaceID, fmt.Errorf("unable to find the ifaceID inside the container from %s, output was: %s, error "+
+			"was %v", ifaceSysPath, string(output), err)
+	}
+
+	return ifaceID, nil
+}
 
 func (ln *linuxNetworking) configureContainerForDSR(
@@ -673,6 +689,7 @@ func (ln *linuxNetworking) configureContainerForDSR(
 			containerID, pid, err)
 	}
 
+	// This is just for logging, and that is why we close it immediately after getting it
 	activeNetworkNamespaceHandle, err := netns.Get()
 	if err != nil {
 		return fmt.Errorf("failed to get activeNetworkNamespace due to %v", err)
@@ -746,11 +763,7 @@ func (ln *linuxNetworking) configureContainerForDSR(
 			sysctlErr.Error())
 	}
 
-	// TODO: it's bad to rely on eth0 here. While this is inside the container's namespace and is determined by the
-	// container runtime and so far we've been able to count on this being reliably set to eth0, it is possible that
-	// this may shift sometime in the future with a different runtime. It would be better to find a reliable way to
-	// determine the interface name from inside the container.
- sysctlErr = utils.SetSysctlSingleTemplate(utils.IPv4ConfRPFilterTemplate, "eth0", 0) + sysctlErr = utils.SetSysctlSingleTemplate(utils.IPv4ConfRPFilterTemplate, assumedContainerIfaceName, 0) if sysctlErr != nil { attemptNamespaceResetAfterError(hostNetworkNamespaceHandle) return fmt.Errorf("failed to disable rp_filter on eth0 in the endpoint container: %s", sysctlErr.Error()) diff --git a/pkg/controllers/proxy/linux_networking_moq.go b/pkg/controllers/proxy/linux_networking_moq.go index d0a1f8c4c..940044fc4 100644 --- a/pkg/controllers/proxy/linux_networking_moq.go +++ b/pkg/controllers/proxy/linux_networking_moq.go @@ -24,6 +24,15 @@ var _ LinuxNetworking = &LinuxNetworkingMock{} // configureContainerForDSRFunc: func(vip string, endpointIP string, containerID string, pid int, hostNetworkNamespaceHandle netns.NsHandle) error { // panic("mock out the configureContainerForDSR method") // }, +// findIfaceLinkForPidFunc: func(pid int) (int, error) { +// panic("mock out the findIfaceLinkForPid method") +// }, +// getContainerPidWithCRIFunc: func(runtimeEndpoint string, containerID string) (int, error) { +// panic("mock out the getContainerPidWithCRI method") +// }, +// getContainerPidWithDockerFunc: func(containerID string) (int, error) { +// panic("mock out the getContainerPidWithDocker method") +// }, // getKubeDummyInterfaceFunc: func() (netlink.Link, error) { // panic("mock out the getKubeDummyInterface method") // }, @@ -66,12 +75,6 @@ var _ LinuxNetworking = &LinuxNetworkingMock{} // ipvsUpdateServiceFunc: func(ipvsSvc *ipvs.Service) error { // panic("mock out the ipvsUpdateService method") // }, -// prepareEndpointForDsrWithCRIFunc: func(runtimeEndpoint string, containerID string, endpointIP string, vip string) error { -// panic("mock out the prepareEndpointForDsrWithCRI method") -// }, -// prepareEndpointForDsrWithDockerFunc: func(containerID string, endpointIP string, vip string) error { -// panic("mock out the prepareEndpointForDsrWithDocker method") -// }, // setupPolicyRoutingForDSRFunc: func(setupIPv4 bool, setupIPv6 bool) error { // panic("mock out the setupPolicyRoutingForDSR method") // }, @@ -88,6 +91,15 @@ type LinuxNetworkingMock struct { // configureContainerForDSRFunc mocks the configureContainerForDSR method. configureContainerForDSRFunc func(vip string, endpointIP string, containerID string, pid int, hostNetworkNamespaceHandle netns.NsHandle) error + // findIfaceLinkForPidFunc mocks the findIfaceLinkForPid method. + findIfaceLinkForPidFunc func(pid int) (int, error) + + // getContainerPidWithCRIFunc mocks the getContainerPidWithCRI method. + getContainerPidWithCRIFunc func(runtimeEndpoint string, containerID string) (int, error) + + // getContainerPidWithDockerFunc mocks the getContainerPidWithDocker method. + getContainerPidWithDockerFunc func(containerID string) (int, error) + // getKubeDummyInterfaceFunc mocks the getKubeDummyInterface method. getKubeDummyInterfaceFunc func() (netlink.Link, error) @@ -130,12 +142,6 @@ type LinuxNetworkingMock struct { // ipvsUpdateServiceFunc mocks the ipvsUpdateService method. ipvsUpdateServiceFunc func(ipvsSvc *ipvs.Service) error - // prepareEndpointForDsrWithCRIFunc mocks the prepareEndpointForDsrWithCRI method. - prepareEndpointForDsrWithCRIFunc func(runtimeEndpoint string, containerID string, endpointIP string, vip string) error - - // prepareEndpointForDsrWithDockerFunc mocks the prepareEndpointForDsrWithDocker method. 
- prepareEndpointForDsrWithDockerFunc func(containerID string, endpointIP string, vip string) error - // setupPolicyRoutingForDSRFunc mocks the setupPolicyRoutingForDSR method. setupPolicyRoutingForDSRFunc func(setupIPv4 bool, setupIPv6 bool) error @@ -157,6 +163,23 @@ type LinuxNetworkingMock struct { // HostNetworkNamespaceHandle is the hostNetworkNamespaceHandle argument value. HostNetworkNamespaceHandle netns.NsHandle } + // findIfaceLinkForPid holds details about calls to the findIfaceLinkForPid method. + findIfaceLinkForPid []struct { + // Pid is the pid argument value. + Pid int + } + // getContainerPidWithCRI holds details about calls to the getContainerPidWithCRI method. + getContainerPidWithCRI []struct { + // RuntimeEndpoint is the runtimeEndpoint argument value. + RuntimeEndpoint string + // ContainerID is the containerID argument value. + ContainerID string + } + // getContainerPidWithDocker holds details about calls to the getContainerPidWithDocker method. + getContainerPidWithDocker []struct { + // ContainerID is the containerID argument value. + ContainerID string + } // getKubeDummyInterface holds details about calls to the getKubeDummyInterface method. getKubeDummyInterface []struct { } @@ -271,26 +294,6 @@ type LinuxNetworkingMock struct { // IpvsSvc is the ipvsSvc argument value. IpvsSvc *ipvs.Service } - // prepareEndpointForDsrWithCRI holds details about calls to the prepareEndpointForDsrWithCRI method. - prepareEndpointForDsrWithCRI []struct { - // RuntimeEndpoint is the runtimeEndpoint argument value. - RuntimeEndpoint string - // ContainerID is the containerID argument value. - ContainerID string - // EndpointIP is the endpointIP argument value. - EndpointIP string - // Vip is the vip argument value. - Vip string - } - // prepareEndpointForDsrWithDocker holds details about calls to the prepareEndpointForDsrWithDocker method. - prepareEndpointForDsrWithDocker []struct { - // ContainerID is the containerID argument value. - ContainerID string - // EndpointIP is the endpointIP argument value. - EndpointIP string - // Vip is the vip argument value. - Vip string - } // setupPolicyRoutingForDSR holds details about calls to the setupPolicyRoutingForDSR method. setupPolicyRoutingForDSR []struct { // SetupIPv4 is the setupIPv4 argument value. 
@@ -308,25 +311,26 @@ type LinuxNetworkingMock struct { SetupIPv6 bool } } - lockconfigureContainerForDSR sync.RWMutex - lockgetKubeDummyInterface sync.RWMutex - lockipAddrAdd sync.RWMutex - lockipAddrDel sync.RWMutex - lockipvsAddFWMarkService sync.RWMutex - lockipvsAddServer sync.RWMutex - lockipvsAddService sync.RWMutex - lockipvsDelDestination sync.RWMutex - lockipvsDelService sync.RWMutex - lockipvsGetDestinations sync.RWMutex - lockipvsGetServices sync.RWMutex - lockipvsNewDestination sync.RWMutex - lockipvsNewService sync.RWMutex - lockipvsUpdateDestination sync.RWMutex - lockipvsUpdateService sync.RWMutex - lockprepareEndpointForDsrWithCRI sync.RWMutex - lockprepareEndpointForDsrWithDocker sync.RWMutex - locksetupPolicyRoutingForDSR sync.RWMutex - locksetupRoutesForExternalIPForDSR sync.RWMutex + lockconfigureContainerForDSR sync.RWMutex + lockfindIfaceLinkForPid sync.RWMutex + lockgetContainerPidWithCRI sync.RWMutex + lockgetContainerPidWithDocker sync.RWMutex + lockgetKubeDummyInterface sync.RWMutex + lockipAddrAdd sync.RWMutex + lockipAddrDel sync.RWMutex + lockipvsAddFWMarkService sync.RWMutex + lockipvsAddServer sync.RWMutex + lockipvsAddService sync.RWMutex + lockipvsDelDestination sync.RWMutex + lockipvsDelService sync.RWMutex + lockipvsGetDestinations sync.RWMutex + lockipvsGetServices sync.RWMutex + lockipvsNewDestination sync.RWMutex + lockipvsNewService sync.RWMutex + lockipvsUpdateDestination sync.RWMutex + lockipvsUpdateService sync.RWMutex + locksetupPolicyRoutingForDSR sync.RWMutex + locksetupRoutesForExternalIPForDSR sync.RWMutex } // configureContainerForDSR calls configureContainerForDSRFunc. @@ -377,6 +381,106 @@ func (mock *LinuxNetworkingMock) configureContainerForDSRCalls() []struct { return calls } +// findIfaceLinkForPid calls findIfaceLinkForPidFunc. +func (mock *LinuxNetworkingMock) findIfaceLinkForPid(pid int) (int, error) { + if mock.findIfaceLinkForPidFunc == nil { + panic("LinuxNetworkingMock.findIfaceLinkForPidFunc: method is nil but LinuxNetworking.findIfaceLinkForPid was just called") + } + callInfo := struct { + Pid int + }{ + Pid: pid, + } + mock.lockfindIfaceLinkForPid.Lock() + mock.calls.findIfaceLinkForPid = append(mock.calls.findIfaceLinkForPid, callInfo) + mock.lockfindIfaceLinkForPid.Unlock() + return mock.findIfaceLinkForPidFunc(pid) +} + +// findIfaceLinkForPidCalls gets all the calls that were made to findIfaceLinkForPid. +// Check the length with: +// +// len(mockedLinuxNetworking.findIfaceLinkForPidCalls()) +func (mock *LinuxNetworkingMock) findIfaceLinkForPidCalls() []struct { + Pid int +} { + var calls []struct { + Pid int + } + mock.lockfindIfaceLinkForPid.RLock() + calls = mock.calls.findIfaceLinkForPid + mock.lockfindIfaceLinkForPid.RUnlock() + return calls +} + +// getContainerPidWithCRI calls getContainerPidWithCRIFunc. 
+func (mock *LinuxNetworkingMock) getContainerPidWithCRI(runtimeEndpoint string, containerID string) (int, error) { + if mock.getContainerPidWithCRIFunc == nil { + panic("LinuxNetworkingMock.getContainerPidWithCRIFunc: method is nil but LinuxNetworking.getContainerPidWithCRI was just called") + } + callInfo := struct { + RuntimeEndpoint string + ContainerID string + }{ + RuntimeEndpoint: runtimeEndpoint, + ContainerID: containerID, + } + mock.lockgetContainerPidWithCRI.Lock() + mock.calls.getContainerPidWithCRI = append(mock.calls.getContainerPidWithCRI, callInfo) + mock.lockgetContainerPidWithCRI.Unlock() + return mock.getContainerPidWithCRIFunc(runtimeEndpoint, containerID) +} + +// getContainerPidWithCRICalls gets all the calls that were made to getContainerPidWithCRI. +// Check the length with: +// +// len(mockedLinuxNetworking.getContainerPidWithCRICalls()) +func (mock *LinuxNetworkingMock) getContainerPidWithCRICalls() []struct { + RuntimeEndpoint string + ContainerID string +} { + var calls []struct { + RuntimeEndpoint string + ContainerID string + } + mock.lockgetContainerPidWithCRI.RLock() + calls = mock.calls.getContainerPidWithCRI + mock.lockgetContainerPidWithCRI.RUnlock() + return calls +} + +// getContainerPidWithDocker calls getContainerPidWithDockerFunc. +func (mock *LinuxNetworkingMock) getContainerPidWithDocker(containerID string) (int, error) { + if mock.getContainerPidWithDockerFunc == nil { + panic("LinuxNetworkingMock.getContainerPidWithDockerFunc: method is nil but LinuxNetworking.getContainerPidWithDocker was just called") + } + callInfo := struct { + ContainerID string + }{ + ContainerID: containerID, + } + mock.lockgetContainerPidWithDocker.Lock() + mock.calls.getContainerPidWithDocker = append(mock.calls.getContainerPidWithDocker, callInfo) + mock.lockgetContainerPidWithDocker.Unlock() + return mock.getContainerPidWithDockerFunc(containerID) +} + +// getContainerPidWithDockerCalls gets all the calls that were made to getContainerPidWithDocker. +// Check the length with: +// +// len(mockedLinuxNetworking.getContainerPidWithDockerCalls()) +func (mock *LinuxNetworkingMock) getContainerPidWithDockerCalls() []struct { + ContainerID string +} { + var calls []struct { + ContainerID string + } + mock.lockgetContainerPidWithDocker.RLock() + calls = mock.calls.getContainerPidWithDocker + mock.lockgetContainerPidWithDocker.RUnlock() + return calls +} + // getKubeDummyInterface calls getKubeDummyInterfaceFunc. func (mock *LinuxNetworkingMock) getKubeDummyInterface() (netlink.Link, error) { if mock.getKubeDummyInterfaceFunc == nil { @@ -911,90 +1015,6 @@ func (mock *LinuxNetworkingMock) ipvsUpdateServiceCalls() []struct { return calls } -// prepareEndpointForDsrWithCRI calls prepareEndpointForDsrWithCRIFunc. 
-func (mock *LinuxNetworkingMock) prepareEndpointForDsrWithCRI(runtimeEndpoint string, containerID string, endpointIP string, vip string) error { - if mock.prepareEndpointForDsrWithCRIFunc == nil { - panic("LinuxNetworkingMock.prepareEndpointForDsrWithCRIFunc: method is nil but LinuxNetworking.prepareEndpointForDsrWithCRI was just called") - } - callInfo := struct { - RuntimeEndpoint string - ContainerID string - EndpointIP string - Vip string - }{ - RuntimeEndpoint: runtimeEndpoint, - ContainerID: containerID, - EndpointIP: endpointIP, - Vip: vip, - } - mock.lockprepareEndpointForDsrWithCRI.Lock() - mock.calls.prepareEndpointForDsrWithCRI = append(mock.calls.prepareEndpointForDsrWithCRI, callInfo) - mock.lockprepareEndpointForDsrWithCRI.Unlock() - return mock.prepareEndpointForDsrWithCRIFunc(runtimeEndpoint, containerID, endpointIP, vip) -} - -// prepareEndpointForDsrWithCRICalls gets all the calls that were made to prepareEndpointForDsrWithCRI. -// Check the length with: -// -// len(mockedLinuxNetworking.prepareEndpointForDsrWithCRICalls()) -func (mock *LinuxNetworkingMock) prepareEndpointForDsrWithCRICalls() []struct { - RuntimeEndpoint string - ContainerID string - EndpointIP string - Vip string -} { - var calls []struct { - RuntimeEndpoint string - ContainerID string - EndpointIP string - Vip string - } - mock.lockprepareEndpointForDsrWithCRI.RLock() - calls = mock.calls.prepareEndpointForDsrWithCRI - mock.lockprepareEndpointForDsrWithCRI.RUnlock() - return calls -} - -// prepareEndpointForDsrWithDocker calls prepareEndpointForDsrWithDockerFunc. -func (mock *LinuxNetworkingMock) prepareEndpointForDsrWithDocker(containerID string, endpointIP string, vip string) error { - if mock.prepareEndpointForDsrWithDockerFunc == nil { - panic("LinuxNetworkingMock.prepareEndpointForDsrWithDockerFunc: method is nil but LinuxNetworking.prepareEndpointForDsrWithDocker was just called") - } - callInfo := struct { - ContainerID string - EndpointIP string - Vip string - }{ - ContainerID: containerID, - EndpointIP: endpointIP, - Vip: vip, - } - mock.lockprepareEndpointForDsrWithDocker.Lock() - mock.calls.prepareEndpointForDsrWithDocker = append(mock.calls.prepareEndpointForDsrWithDocker, callInfo) - mock.lockprepareEndpointForDsrWithDocker.Unlock() - return mock.prepareEndpointForDsrWithDockerFunc(containerID, endpointIP, vip) -} - -// prepareEndpointForDsrWithDockerCalls gets all the calls that were made to prepareEndpointForDsrWithDocker. -// Check the length with: -// -// len(mockedLinuxNetworking.prepareEndpointForDsrWithDockerCalls()) -func (mock *LinuxNetworkingMock) prepareEndpointForDsrWithDockerCalls() []struct { - ContainerID string - EndpointIP string - Vip string -} { - var calls []struct { - ContainerID string - EndpointIP string - Vip string - } - mock.lockprepareEndpointForDsrWithDocker.RLock() - calls = mock.calls.prepareEndpointForDsrWithDocker - mock.lockprepareEndpointForDsrWithDocker.RUnlock() - return calls -} - // setupPolicyRoutingForDSR calls setupPolicyRoutingForDSRFunc. 
func (mock *LinuxNetworkingMock) setupPolicyRoutingForDSR(setupIPv4 bool, setupIPv6 bool) error { if mock.setupPolicyRoutingForDSRFunc == nil { diff --git a/pkg/controllers/proxy/network_services_controller.go b/pkg/controllers/proxy/network_services_controller.go index 6efe2c674..6146f6221 100644 --- a/pkg/controllers/proxy/network_services_controller.go +++ b/pkg/controllers/proxy/network_services_controller.go @@ -145,6 +145,9 @@ type NetworkServicesController struct { podIPv6CIDRs []string isIPv4Capable bool isIPv6Capable bool + + hpc *hairpinController + hpEndpointReceiver chan string } type ipvsCalls interface { @@ -230,6 +233,9 @@ func (nsc *NetworkServicesController) Run(healthChan chan<- *healthcheck.Control klog.Fatalf("error cleaning up old/bad masquerade rules: %s", err.Error()) } + wg.Add(1) + go nsc.hpc.Run(stopCh, wg, healthChan) + // enable masquerade rule err = nsc.ensureMasqueradeIptablesRule() if err != nil { @@ -1258,6 +1264,17 @@ func (nsc *NetworkServicesController) syncHairpinIptablesRules() error { continue } + // Ensure that hairpin mode is enabled for the virtual interface assigned to the pod behind the endpoint + // IP. + // + // This used to be handled by the kubelet, and then later the functionality was moved to the docker-shim + // but now the docker-shim has been removed, and its possible that it never existed for containerd or + // cri-o so we now ensure that it is handled. + // + // Without this change, the return traffic from a client to a service within the same pod will never + // make it back into the pod's namespace + nsc.hpEndpointReceiver <- ep.ip + // Handle ClusterIP Service hairpinRuleFrom(familyClusterIPs, ep.ip, family, svcInfo.port, rulesMap) @@ -2007,5 +2024,8 @@ func NewNetworkServicesController(clientset kubernetes.Interface, nsc.epSliceLister = epSliceInformer.GetIndexer() nsc.EndpointSliceEventHandler = nsc.newEndpointSliceEventHandler() + nsc.hpEndpointReceiver = make(chan string) + nsc.hpc = NewHairpinController(&nsc, nsc.hpEndpointReceiver) + return &nsc, nil } diff --git a/pkg/controllers/proxy/utils.go b/pkg/controllers/proxy/utils.go index d868810b5..c4c46ccf9 100644 --- a/pkg/controllers/proxy/utils.go +++ b/pkg/controllers/proxy/utils.go @@ -5,6 +5,7 @@ import ( "hash/fnv" "net" "os/exec" + "runtime" "strconv" "strings" "syscall" @@ -21,6 +22,9 @@ import ( const ( interfaceWaitSleepTime = 100 * time.Millisecond + sysFSVirtualNetPath = "/sys/devices/virtual/net" + sysFSHairpinRelPath = "brport/hairpin_mode" + hairpinEnable = "1" ) func attemptNamespaceResetAfterError(hostNSHandle netns.NsHandle) { @@ -186,36 +190,61 @@ func convertSysCallProtoToSvcProto(sysProtocol uint16) string { } } -// addDSRIPInsidePodNetNamespace takes a given external IP and endpoint IP for a DSR service and then uses the container -// runtime to add the external IP to a virtual interface inside the pod so that it can receive DSR traffic inside its -// network namespace. 
-func (nsc *NetworkServicesController) addDSRIPInsidePodNetNamespace(externalIP, endpointIP string) error { +// findContainerRuntimeReferences find the container runtime and container ID for a given endpoint IP do this by: +// - Resolving the endpoint IP to a pod +// - Ensure that the pod actually exists on the node in question +// - Get the container ID of the primary container (since this function primarily allows us to enter the pod's +// namespace, it doesn't really matter which container we choose here, if this function gets used for something +// else in the future, this might have to be re-evaluated) +func (nsc *NetworkServicesController) findContainerRuntimeReferences(endpointIP string) (string, string, error) { podObj, err := nsc.getPodObjectForEndpoint(endpointIP) if err != nil { - return fmt.Errorf("failed to find endpoint with ip: %s. so skipping preparing endpoint for DSR", + return "", "", fmt.Errorf("failed to find endpoint with ip: %s. so skipping preparing endpoint for DSR", endpointIP) } // we are only concerned with endpoint pod running on current node if strings.Compare(podObj.Status.HostIP, nsc.primaryIP.String()) != 0 { - return nil + return "", "", nil } containerURL := podObj.Status.ContainerStatuses[0].ContainerID runtime, containerID, err := cri.EndpointParser(containerURL) if err != nil { - return fmt.Errorf("couldn't get containerID (container=%s, pod=%s). Skipping DSR endpoint set up", + return "", "", fmt.Errorf("couldn't get containerID (container=%s, pod=%s). Skipping DSR endpoint set up", podObj.Spec.Containers[0].Name, podObj.Name) } if containerID == "" { - return fmt.Errorf("failed to find container id for the endpoint with ip: %s so skipping preparing "+ + return "", "", fmt.Errorf("failed to find container id for the endpoint with ip: %s so skipping preparing "+ "endpoint for DSR", endpointIP) } - if runtime == "docker" { + return runtime, containerID, nil +} + +// addDSRIPInsidePodNetNamespace takes a given external IP and endpoint IP for a DSR service and then uses the container +// runtime to add the external IP to a virtual interface inside the pod so that it can receive DSR traffic inside its +// network namespace. +func (nsc *NetworkServicesController) addDSRIPInsidePodNetNamespace(externalIP, endpointIP string) error { + crRuntime, containerID, err := nsc.findContainerRuntimeReferences(endpointIP) + if err != nil { + return err + } + + runtime.LockOSThread() + defer runtime.UnlockOSThread() + + hostNetworkNamespaceHandle, err := netns.Get() + if err != nil { + return fmt.Errorf("failed to get namespace due to %v", err) + } + defer utils.CloseCloserDisregardError(&hostNetworkNamespaceHandle) + + var pid int + if crRuntime == "docker" { // WARN: This method is deprecated and will be removed once docker-shim is removed from kubelet. 
- err = nsc.ln.prepareEndpointForDsrWithDocker(containerID, endpointIP, externalIP) + pid, err = nsc.ln.getContainerPidWithDocker(containerID) if err != nil { return fmt.Errorf("failed to prepare endpoint %s to do direct server return due to %v", endpointIP, err) @@ -223,14 +252,13 @@ func (nsc *NetworkServicesController) addDSRIPInsidePodNetNamespace(externalIP, } else { // We expect CRI compliant runtimes here // ugly workaround, refactoring of pkg/Proxy is required - err = nsc.ln.(*linuxNetworking).prepareEndpointForDsrWithCRI(nsc.dsr.runtimeEndpoint, - containerID, endpointIP, externalIP) + pid, err = nsc.ln.getContainerPidWithCRI(nsc.dsr.runtimeEndpoint, containerID) if err != nil { return fmt.Errorf("failed to prepare endpoint %s to do DSR due to: %v", endpointIP, err) } } - return nil + return nsc.ln.configureContainerForDSR(externalIP, endpointIP, containerID, pid, hostNetworkNamespaceHandle) } // getPrimaryAndCIDRsByFamily returns the best primary nodeIP and a slice of all of the relevant podCIDRs based upon a diff --git a/pkg/healthcheck/health_controller.go b/pkg/healthcheck/health_controller.go index b135d64c8..ec7a184ed 100644 --- a/pkg/healthcheck/health_controller.go +++ b/pkg/healthcheck/health_controller.go @@ -12,6 +12,8 @@ import ( ) const ( + HPCStaticSyncInterval = 60 + HPCSyncPeriod = time.Duration(HPCStaticSyncInterval) * time.Second defaultGraceTimeDuration = time.Duration(1500) * time.Millisecond healthControllerTickTime = 5000 * time.Millisecond ) @@ -43,6 +45,8 @@ type HealthStats struct { NetworkRoutingControllerAliveTTL time.Duration NetworkServicesControllerAlive time.Time NetworkServicesControllerAliveTTL time.Duration + HairpinControllerAlive time.Time + HairpinControllerAliveTTL time.Duration } // SendHeartBeat sends a heartbeat on the passed channel @@ -97,12 +101,19 @@ func (hc *HealthController) HandleHeartbeat(beat *ControllerHeartbeat) { hc.Status.LoadBalancerControllerAliveTTL = time.Since(hc.Status.LoadBalancerControllerAlive) } hc.Status.LoadBalancerControllerAlive = beat.LastHeartBeat + case beat.Component == "NSC": if hc.Status.NetworkServicesControllerAliveTTL == 0 { hc.Status.NetworkServicesControllerAliveTTL = time.Since(hc.Status.NetworkServicesControllerAlive) } hc.Status.NetworkServicesControllerAlive = beat.LastHeartBeat + case beat.Component == "HPC": + if hc.Status.HairpinControllerAliveTTL == 0 { + hc.Status.HairpinControllerAliveTTL = time.Since(hc.Status.HairpinControllerAlive) + } + hc.Status.HairpinControllerAlive = beat.LastHeartBeat + case beat.Component == "NRC": if hc.Status.NetworkRoutingControllerAliveTTL == 0 { hc.Status.NetworkRoutingControllerAliveTTL = time.Since(hc.Status.NetworkRoutingControllerAlive) @@ -155,6 +166,11 @@ func (hc *HealthController) CheckHealth() bool { klog.Error("NetworkService Controller heartbeat missed") health = false } + if time.Since(hc.Status.HairpinControllerAlive) > + HPCSyncPeriod+hc.Status.HairpinControllerAliveTTL+graceTime { + klog.Error("Hairpin Controller heartbeat missed") + health = false + } } if hc.Config.MetricsEnabled { @@ -225,6 +241,7 @@ func (hc *HealthController) SetAlive() { hc.Status.NetworkPolicyControllerAlive = now hc.Status.NetworkRoutingControllerAlive = now hc.Status.NetworkServicesControllerAlive = now + hc.Status.HairpinControllerAlive = now } // NewHealthController creates a new health controller and returns a reference to it
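
Reviewer note (illustrative, not part of the patch): the standalone sketch below mirrors the mechanism the new hairpin controller relies on. It reads /proc/<pid>/cwd/sys/class/net/eth0/iflink to find the ifindex of the pod's host-side veth, resolves that index to an interface name in the host namespace, and writes "1" to that interface's brport/hairpin_mode. The enableHairpin helper, the placeholder PID, and the hard-coded eth0 name are assumptions made for this sketch only; kube-router obtains the PID from the container runtime (docker or CRI) as shown in the diff above.

// Minimal sketch, assuming <pid> belongs to a process inside the target pod
// and that the pod's primary interface is named eth0 (the same assumption
// the patch makes).
package main

import (
	"fmt"
	"net"
	"os"
	"path"
	"strconv"
	"strings"
)

func enableHairpin(pid int) error {
	// Follow /proc/<pid>/cwd into the container's filesystem view and read the
	// iflink of its eth0; this is the ifindex of the host-side veth peer.
	iflinkPath := path.Join("/proc", strconv.Itoa(pid), "cwd", "sys/class/net/eth0/iflink")
	raw, err := os.ReadFile(iflinkPath)
	if err != nil {
		return fmt.Errorf("reading %s: %w", iflinkPath, err)
	}
	idx, err := strconv.Atoi(strings.TrimSpace(string(raw)))
	if err != nil {
		return fmt.Errorf("parsing iflink %q: %w", string(raw), err)
	}

	// Resolve the ifindex to the host-side veth name in the host namespace.
	veth, err := net.InterfaceByIndex(idx)
	if err != nil {
		return fmt.Errorf("resolving ifindex %d: %w", idx, err)
	}

	// Enable hairpin on the bridge port backing that veth; this is the same
	// sysfs knob the patch writes in ensureHairpinEnabledForPodInterface.
	hpPath := path.Join("/sys/devices/virtual/net", veth.Name, "brport/hairpin_mode")
	return os.WriteFile(hpPath, []byte("1"), 0644)
}

func main() {
	const pid = 12345 // placeholder: PID of any process inside the target pod
	if err := enableHairpin(pid); err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(1)
	}
}

Like the patch, the sketch reaches the container's sysfs through /proc/<pid>/cwd rather than entering the namespace with setns, for the reasons documented on findIfaceLinkForPid.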