Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor: Abstract Node Info #1739

Merged
merged 4 commits into from
Sep 29, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/cmd/kube-router.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ func (kr *KubeRouter) Run() error {
return fmt.Errorf("failed to create iptables handlers: %v", err)
}
npc, err := netpol.NewNetworkPolicyController(kr.Client,
kr.Config, podInformer, npInformer, nsInformer, &ipsetMutex, iptablesCmdHandlers, ipSetHandlers)
kr.Config, podInformer, npInformer, nsInformer, &ipsetMutex, nil, iptablesCmdHandlers, ipSetHandlers)
if err != nil {
return fmt.Errorf("failed to create network policy controller: %v", err)
}
Expand Down
38 changes: 13 additions & 25 deletions pkg/controllers/netpol/network_policy_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ var (

// NetworkPolicyController struct to hold information required by NetworkPolicyController
type NetworkPolicyController struct {
nodeHostName string
krNode utils.NodeAware
aauren marked this conversation as resolved.
Show resolved Hide resolved
serviceClusterIPRanges []net.IPNet
serviceExternalIPRanges []net.IPNet
serviceLoadBalancerIPRanges []net.IPNet
Expand All @@ -79,7 +79,6 @@ type NetworkPolicyController struct {
iptablesSaveRestore map[v1core.IPFamily]utils.IPTablesSaveRestorer
filterTableRules map[v1core.IPFamily]*bytes.Buffer
ipSetHandlers map[v1core.IPFamily]utils.IPSetHandler
nodeIPs map[v1core.IPFamily]net.IP

podLister cache.Indexer
npLister cache.Indexer
Expand Down Expand Up @@ -832,7 +831,7 @@ func NewIPTablesHandlers(config *options.KubeRouterConfig) (
func NewNetworkPolicyController(clientset kubernetes.Interface,
config *options.KubeRouterConfig, podInformer cache.SharedIndexInformer,
npInformer cache.SharedIndexInformer, nsInformer cache.SharedIndexInformer,
ipsetMutex *sync.Mutex,
ipsetMutex *sync.Mutex, linkQ utils.LocalLinkQuerier,
iptablesCmdHandlers map[v1core.IPFamily]utils.IPTablesHandler,
ipSetHandlers map[v1core.IPFamily]utils.IPSetHandler) (*NetworkPolicyController, error) {
npc := NetworkPolicyController{ipsetMutex: ipsetMutex}
Expand Down Expand Up @@ -941,44 +940,33 @@ func NewNetworkPolicyController(clientset kubernetes.Interface,
return nil, err
}

npc.nodeHostName = node.Name

nodeIPv4, nodeIPv6 := utils.GetAllNodeIPs(node)
npc.krNode, err = utils.NewKRNode(node, linkQ, config.EnableIPv4, config.EnableIPv6)
if err != nil {
return nil, err
}

npc.iptablesCmdHandlers = iptablesCmdHandlers
npc.iptablesSaveRestore = make(map[v1core.IPFamily]utils.IPTablesSaveRestorer, 2)
npc.filterTableRules = make(map[v1core.IPFamily]*bytes.Buffer, 2)
npc.ipSetHandlers = ipSetHandlers
npc.nodeIPs = make(map[v1core.IPFamily]net.IP, 2)

if config.EnableIPv4 {
if !npc.krNode.IsIPv4Capable() {
return nil, fmt.Errorf("IPv4 was enabled but no IPv4 address was found on node")
}
klog.V(2).Infof("IPv4 is enabled")
npc.iptablesSaveRestore[v1core.IPv4Protocol] = utils.NewIPTablesSaveRestore(v1core.IPv4Protocol)
var buf bytes.Buffer
npc.filterTableRules[v1core.IPv4Protocol] = &buf
// TODO: assuming that NPC should only use a single IP here is short-sighted, fix it so it considers all IPs
switch {
case len(nodeIPv4[v1core.NodeInternalIP]) > 0:
npc.nodeIPs[v1core.IPv4Protocol] = nodeIPv4[v1core.NodeInternalIP][0]
case len(nodeIPv4[v1core.NodeExternalIP]) > 0:
npc.nodeIPs[v1core.IPv4Protocol] = nodeIPv4[v1core.NodeExternalIP][0]
default:
return nil, fmt.Errorf("IPv4 was enabled but no IPv4 address was found on node")
}
}
if config.EnableIPv6 {
if !npc.krNode.IsIPv6Capable() {
return nil, fmt.Errorf("IPv6 was enabled but no IPv6 address was found on node")
}
klog.V(2).Infof("IPv6 is enabled")
npc.iptablesSaveRestore[v1core.IPv6Protocol] = utils.NewIPTablesSaveRestore(v1core.IPv6Protocol)
var buf bytes.Buffer
npc.filterTableRules[v1core.IPv6Protocol] = &buf
// TODO: assuming that NPC should only use a single IP here is short-sighted, fix it so it considers all IPs
switch {
case len(nodeIPv6[v1core.NodeInternalIP]) > 0:
npc.nodeIPs[v1core.IPv6Protocol] = nodeIPv6[v1core.NodeInternalIP][0]
case len(nodeIPv6[v1core.NodeExternalIP]) > 0:
npc.nodeIPs[v1core.IPv6Protocol] = nodeIPv6[v1core.NodeExternalIP][0]
default:
return nil, fmt.Errorf("IPv6 was enabled but no IPv6 address was found on node")
}
}

npc.podLister = podInformer.GetIndexer()
Expand Down
62 changes: 57 additions & 5 deletions pkg/controllers/netpol/network_policy_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"time"

"github.com/coreos/go-iptables/iptables"
"github.com/vishvananda/netlink"

netv1 "k8s.io/api/networking/v1"
"k8s.io/apimachinery/pkg/api/resource"
Expand Down Expand Up @@ -207,17 +208,20 @@ func newUneventfulNetworkPolicyController(podInformer cache.SharedIndexInformer,
npc.iptablesSaveRestore = make(map[v1.IPFamily]utils.IPTablesSaveRestorer)
npc.filterTableRules = make(map[v1.IPFamily]*bytes.Buffer)
npc.ipSetHandlers = make(map[v1.IPFamily]utils.IPSetHandler)
npc.nodeIPs = make(map[v1.IPFamily]net.IP)

// TODO: Handle both IP families
npc.iptablesCmdHandlers[v1.IPv4Protocol] = newFakeIPTables(iptables.ProtocolIPv4)
npc.iptablesSaveRestore[v1.IPv4Protocol] = utils.NewIPTablesSaveRestore(v1.IPv4Protocol)
var buf bytes.Buffer
npc.filterTableRules[v1.IPv4Protocol] = &buf
npc.ipSetHandlers[v1.IPv4Protocol] = &fakeIPSet{}
npc.nodeIPs[v1.IPv4Protocol] = net.IPv4(10, 10, 10, 10)

npc.nodeHostName = "node"
krNode := utils.KRNode{
NodeName: "node",
NodeIPv4Addrs: map[v1.NodeAddressType][]net.IP{v1.NodeInternalIP: {net.IPv4(10, 10, 10, 10)}},
}
npc.krNode = &krNode

npc.podLister = podInformer.GetIndexer()
npc.nsLister = nsInformer.GetIndexer()
npc.npLister = npInformer.GetIndexer()
Expand Down Expand Up @@ -754,6 +758,51 @@ func (ips *fakeIPSet) Name(name string) string {
return name
}

type fakeLocalLinkQuerier struct {
links []netlink.Link
addrs []*net.IPNet
}

func newFakeLocalLinkQuerier(addrStrings []string) *fakeLocalLinkQuerier {
links := make([]netlink.Link, len(addrStrings))
for idx := range addrStrings {
linkAttrs := netlink.LinkAttrs{
Index: idx,
}
linkDevice := netlink.Device{LinkAttrs: linkAttrs}
links[idx] = &linkDevice
}
addrs := make([]*net.IPNet, len(addrStrings))
for idx, addr := range addrStrings {
ip := net.ParseIP(addr)
var netMask net.IPMask
if ip.To4() != nil {
netMask = net.CIDRMask(24, 32)
} else {
netMask = net.CIDRMask(64, 128)
}
ipNet := &net.IPNet{
IP: ip,
Mask: netMask,
}
addrs[idx] = ipNet
}
return &fakeLocalLinkQuerier{
links: links,
addrs: addrs,
}
}

func (f *fakeLocalLinkQuerier) LinkList() ([]netlink.Link, error) {
return f.links, nil
}

func (f *fakeLocalLinkQuerier) AddrList(link netlink.Link, family int) ([]netlink.Addr, error) {
addrs := make([]netlink.Addr, 1)
addrs[0] = netlink.Addr{IPNet: f.addrs[link.Attrs().Index]}
return addrs, nil
}

func TestNetworkPolicyController(t *testing.T) {
curHostname, _ := os.Hostname()
testCases := []tNetPolConfigTestCase{
Expand Down Expand Up @@ -939,7 +988,9 @@ func TestNetworkPolicyController(t *testing.T) {
"",
},
}
client := fake.NewSimpleClientset(&v1.NodeList{Items: []v1.Node{*newFakeNode("node", []string{"10.10.10.10", "2001:0db8:0042:0001:0000:0000:0000:0000"})}})
fakeNodeIPs := []string{"10.10.10.10", "2001:0db8:0042:0001:0000:0000:0000:0000"}
fakeLinkQuerier := newFakeLocalLinkQuerier(fakeNodeIPs)
client := fake.NewSimpleClientset(&v1.NodeList{Items: []v1.Node{*newFakeNode("node", fakeNodeIPs)}})
_, podInformer, nsInformer, netpolInformer := newFakeInformersFromClient(client)
for _, test := range testCases {
t.Run(test.name, func(t *testing.T) {
Expand All @@ -948,7 +999,8 @@ func TestNetworkPolicyController(t *testing.T) {
iptablesHandlers[v1.IPv4Protocol] = newFakeIPTables(iptables.ProtocolIPv4)
ipSetHandlers := make(map[v1.IPFamily]utils.IPSetHandler, 1)
ipSetHandlers[v1.IPv4Protocol] = &fakeIPSet{}
_, err := NewNetworkPolicyController(client, test.config, podInformer, netpolInformer, nsInformer, &sync.Mutex{}, iptablesHandlers, ipSetHandlers)
_, err := NewNetworkPolicyController(client, test.config, podInformer, netpolInformer, nsInformer,
&sync.Mutex{}, fakeLinkQuerier, iptablesHandlers, ipSetHandlers)
if err == nil && test.expectError {
t.Error("This config should have failed, but it was successful instead")
} else if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/controllers/netpol/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ func (npc *NetworkPolicyController) syncPodFirewallChains(networkPoliciesInfo []

// loop through the pods running on the node
allLocalPods := make(map[string]podInfo)
for _, nodeIP := range npc.nodeIPs {
for _, nodeIP := range npc.krNode.GetNodeIPAddrs() {
npc.getLocalPods(allLocalPods, nodeIP.String())
}
for _, pod := range allLocalPods {
Expand Down
76 changes: 21 additions & 55 deletions pkg/controllers/proxy/network_services_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,8 +108,7 @@ const (

// NetworkServicesController struct stores information needed by the controller
type NetworkServicesController struct {
primaryIP net.IP
nodeHostName string
krNode utils.NodeAware
syncPeriod time.Duration
mu sync.Mutex
serviceMap serviceInfoMap
Expand Down Expand Up @@ -145,12 +144,8 @@ type NetworkServicesController struct {

iptablesCmdHandlers map[v1.IPFamily]utils.IPTablesHandler
ipSetHandlers map[v1.IPFamily]utils.IPSetHandler
nodeIPv4Addrs map[v1.NodeAddressType][]net.IP
nodeIPv6Addrs map[v1.NodeAddressType][]net.IP
podIPv4CIDRs []string
podIPv6CIDRs []string
isIPv4Capable bool
isIPv6Capable bool

hpc *hairpinController
hpEndpointReceiver chan string
Expand Down Expand Up @@ -294,7 +289,7 @@ func (nsc *NetworkServicesController) Run(healthChan chan<- *healthcheck.Control
// Ensure rp_filter=2 for DSR capability, see:
// * https://access.redhat.com/solutions/53031
// * https://github.com/cloudnativelabs/kube-router/pull/1651#issuecomment-2072851683
if nsc.isIPv4Capable {
if nsc.krNode.IsIPv4Capable() {
sysctlErr := utils.SetSysctlSingleTemplate(utils.IPv4ConfRPFilterTemplate, "all", 2)
if sysctlErr != nil {
if sysctlErr.IsFatal() {
Expand Down Expand Up @@ -618,10 +613,10 @@ func (nsc *NetworkServicesController) syncIpvsFirewall() error {

for family, addrs := range addrsMap {
// Don't run for families that we don't support
if family == v1.IPv4Protocol && !nsc.isIPv4Capable {
if family == v1.IPv4Protocol && !nsc.krNode.IsIPv4Capable() {
continue
}
if family == v1.IPv6Protocol && !nsc.isIPv6Capable {
if family == v1.IPv6Protocol && !nsc.krNode.IsIPv6Capable() {
continue
}

Expand Down Expand Up @@ -744,10 +739,10 @@ func (nsc *NetworkServicesController) publishMetrics(serviceInfoMap serviceInfoM
} else {
pushMetric = false
}
case nsc.primaryIP.String():
case nsc.krNode.GetPrimaryNodeIP().String():
if protocol == ipvsSvc.Protocol && uint16(svc.port) == ipvsSvc.Port {
pushMetric = true
svcVip = nsc.primaryIP.String()
svcVip = nsc.krNode.GetPrimaryNodeIP().String()
} else {
pushMetric = false
}
Expand Down Expand Up @@ -1106,7 +1101,7 @@ func (nsc *NetworkServicesController) buildEndpointSliceInfo() endpointSliceInfo
}

for _, addr := range ep.Addresses {
isLocal := ep.NodeName != nil && *ep.NodeName == nsc.nodeHostName
isLocal := ep.NodeName != nil && *ep.NodeName == nsc.krNode.GetNodeName()
endpoints = append(endpoints, endpointSliceInfo{
ip: addr,
port: int(*port.Port),
Expand Down Expand Up @@ -1287,17 +1282,13 @@ func (nsc *NetworkServicesController) syncHairpinIptablesRules() error {
family = v1.IPv4Protocol
familyClusterIPs = clusterIPs[v1.IPv4Protocol]
familyExternalIPs = externalIPs[v1.IPv4Protocol]
//nolint:gocritic // we intend to append to separate maps here
familyNodeIPs = append(nsc.nodeIPv4Addrs[v1.NodeInternalIP],
nsc.nodeIPv4Addrs[v1.NodeExternalIP]...)
familyNodeIPs = nsc.krNode.GetNodeIPv4Addrs()
rulesMap = ipv4RulesNeeded
} else {
family = v1.IPv6Protocol
familyClusterIPs = clusterIPs[v1.IPv6Protocol]
familyExternalIPs = externalIPs[v1.IPv6Protocol]
//nolint:gocritic // we intend to append to separate maps here
familyNodeIPs = append(nsc.nodeIPv6Addrs[v1.NodeInternalIP],
nsc.nodeIPv6Addrs[v1.NodeExternalIP]...)
familyNodeIPs = nsc.krNode.GetNodeIPv6Addrs()
rulesMap = ipv6RulesNeeded
}
if len(familyClusterIPs) < 1 {
Expand Down Expand Up @@ -1336,14 +1327,14 @@ func (nsc *NetworkServicesController) syncHairpinIptablesRules() error {
}

// Cleanup (if needed) and return if there's no hairpin-mode Services
if len(ipv4RulesNeeded) == 0 && nsc.isIPv4Capable {
if len(ipv4RulesNeeded) == 0 && nsc.krNode.IsIPv4Capable() {
klog.V(1).Info("No IPv4 hairpin-mode enabled services found -- no hairpin rules created")
err := nsc.deleteHairpinIptablesRules(v1.IPv4Protocol)
if err != nil {
return fmt.Errorf("error deleting hairpin rules: %v", err)
}
}
if len(ipv6RulesNeeded) == 0 && nsc.isIPv6Capable {
if len(ipv6RulesNeeded) == 0 && nsc.krNode.IsIPv6Capable() {
klog.V(1).Info("No IPv6 hairpin-mode enabled services found -- no hairpin rules created")
err := nsc.deleteHairpinIptablesRules(v1.IPv6Protocol)
if err != nil {
Expand Down Expand Up @@ -1777,7 +1768,7 @@ func (nsc *NetworkServicesController) Cleanup() {
if len(nsc.iptablesCmdHandlers) < 1 {
// Even though we have a config at this point (via passed param), we want to send nil so that the node will
// discover which IP address families it has and act accordingly
err = nsc.setupHandlers(nil, nil)
err = nsc.setupHandlers(nil)
if err != nil {
klog.Errorf("could not cleanup because we couldn't create iptables/ipset command handlers due to: %v", err)
}
Expand Down Expand Up @@ -1936,35 +1927,13 @@ func (nsc *NetworkServicesController) handleServiceDelete(obj interface{}) {

// setupHandlers Here we test to see whether the node is IPv6 capable, if the user has enabled IPv6 (via command-line
// options) and the node has an IPv6 address, the following method will return an IPv6 address
func (nsc *NetworkServicesController) setupHandlers(config *options.KubeRouterConfig, node *v1.Node) error {
// node being nil covers the case where this function is called by something that doesn't have a kube-apiserver
// connection like the cleanup code. In this instance we want all possible iptables and ipset handlers
if node != nil {
nsc.nodeIPv4Addrs, nsc.nodeIPv6Addrs = utils.GetAllNodeIPs(node)
}

// We test for nil configs as the Cleanup() method often doesn't have a valid config in this respect, so rather
// than trying to guess options, it is better to just let the logic fallthrough. For the primary path to this func,
// NewNetworkServicesController, the config will not be nil and we want to check that we have options that match
// the node's capability to ensure sanity later down the road.
if config != nil {
if config.EnableIPv4 && len(nsc.nodeIPv4Addrs[v1.NodeInternalIP]) < 1 &&
len(nsc.nodeIPv4Addrs[v1.NodeExternalIP]) < 1 {
return fmt.Errorf("IPv4 was enabled, but no IPv4 address was found on the node")
}
}
nsc.isIPv4Capable = len(nsc.nodeIPv4Addrs) > 0
if config != nil {
if config.EnableIPv6 && len(nsc.nodeIPv6Addrs[v1.NodeInternalIP]) < 1 &&
len(nsc.nodeIPv6Addrs[v1.NodeExternalIP]) < 1 {
return fmt.Errorf("IPv6 was enabled, but no IPv6 address was found on the node")
}
}
nsc.isIPv6Capable = len(nsc.nodeIPv6Addrs) > 0

func (nsc *NetworkServicesController) setupHandlers(node *v1.Node) error {
nsc.ipSetHandlers = make(map[v1.IPFamily]utils.IPSetHandler)
nsc.iptablesCmdHandlers = make(map[v1.IPFamily]utils.IPTablesHandler)
if node == nil || len(nsc.nodeIPv4Addrs) > 0 {

// node being nil covers the case where this function is called by something that doesn't have a kube-apiserver
// connection like the cleanup code. In this instance we want all possible iptables and ipset handlers
if node == nil || nsc.krNode == nil || nsc.krNode.IsIPv4Capable() {
aauren marked this conversation as resolved.
Show resolved Hide resolved
iptHandler, err := iptables.NewWithProtocol(iptables.ProtocolIPv4)
if err != nil {
klog.Fatalf("Failed to allocate IPv4 iptables handler: %v", err)
Expand All @@ -1979,7 +1948,7 @@ func (nsc *NetworkServicesController) setupHandlers(config *options.KubeRouterCo
}
nsc.ipSetHandlers[v1.IPv4Protocol] = ipset
}
if node == nil || len(nsc.nodeIPv6Addrs) > 0 {
if node == nil || nsc.krNode == nil || nsc.krNode.IsIPv6Capable() {
iptHandler, err := iptables.NewWithProtocol(iptables.ProtocolIPv6)
if err != nil {
klog.Fatalf("Failed to allocate IPv6 iptables handler: %v", err)
Expand Down Expand Up @@ -2084,10 +2053,7 @@ func NewNetworkServicesController(clientset kubernetes.Interface,
return nil, err
}

nsc.nodeHostName = node.Name
// We preserve the old logic here for getting the primary IP which is set on nrc.primaryIP. This can be either IPv4
// or IPv6
nsc.primaryIP, err = utils.GetPrimaryNodeIP(node)
nsc.krNode, err = utils.NewKRNode(node, nil, config.EnableIPv4, config.EnableIPv6)
if err != nil {
return nil, err
}
Expand All @@ -2097,12 +2063,12 @@ func NewNetworkServicesController(clientset kubernetes.Interface,
// * Sets nsc.nodeIPv6Addr & nsc.isIPv6Capable
// * Creates the iptables handlers for ipv4 & ipv6
// * Creates the ipset handlers for ipv4 & ipv6
err = nsc.setupHandlers(config, node)
err = nsc.setupHandlers(node)
if err != nil {
return nil, err
}

automtu, err := utils.GetMTUFromNodeIP(nsc.primaryIP)
automtu, err := nsc.krNode.GetNodeMTU()
if err != nil {
return nil, err
}
Expand Down
Loading