From b670656a096a6bd26688d79974b453404a3deb5f Mon Sep 17 00:00:00 2001 From: Sanskar Jaiswal Date: Fri, 22 Jul 2022 11:06:11 +0530 Subject: [PATCH 1/2] update ipvs proxier to update realserver weights at startup Update the IPVS proxier to have a bool `initialSync` which is set to true when a new proxier is initialized and then set to false on all syncs. This lets us run startup-only logic, which subsequently lets us update the realserver only when needed and avoiding any expensive operations. Signed-off-by: Sanskar Jaiswal --- pkg/proxy/ipvs/proxier.go | 28 ++++++++++++++++++++++++++++ 1 file changed, 28 insertions(+) diff --git a/pkg/proxy/ipvs/proxier.go b/pkg/proxy/ipvs/proxier.go index be72cc9ce2f7d..1dcb1830dd1ac 100644 --- a/pkg/proxy/ipvs/proxier.go +++ b/pkg/proxy/ipvs/proxier.go @@ -223,6 +223,13 @@ type Proxier struct { serviceMap proxy.ServiceMap endpointsMap proxy.EndpointsMap nodeLabels map[string]string + // initialSync is a bool indicating if the proxier is syncing for the first time. + // It is set to true when a new proxier is initialized and then set to false on all + // future syncs. + // This lets us run specific logic that's required only during proxy startup. + // For eg: it enables us to update weights of existing destinations only on startup + // saving us the cost of querying and updating real servers during every sync. + initialSync bool // endpointSlicesSynced, and servicesSynced are set to true when // corresponding objects are synced after startup. This is used to avoid updating // ipvs rules with some partial data after kube-proxy restart. @@ -468,6 +475,7 @@ func NewProxier(ipt utiliptables.Interface, serviceChanges: proxy.NewServiceChangeTracker(newServiceInfo, ipFamily, recorder, nil), endpointsMap: make(proxy.EndpointsMap), endpointsChanges: proxy.NewEndpointChangeTracker(hostname, nil, ipFamily, recorder, nil), + initialSync: true, syncPeriod: syncPeriod, minSyncPeriod: minSyncPeriod, excludeCIDRs: parsedExcludeCIDRs, @@ -1010,6 +1018,12 @@ func (proxier *Proxier) syncProxyRules() { return } + // its safe to set initialSync to false as it acts as a flag for startup actions + // and the mutex is held. + defer func() { + proxier.initialSync = false + }() + // Keep track of how long syncs take. start := time.Now() defer func() { @@ -2007,6 +2021,19 @@ func (proxier *Proxier) syncEndpoint(svcPortName proxy.ServicePortName, onlyNode } if curEndpoints.Has(ep) { + // if we are syncing for the first time, loop through all current destinations and + // reset their weight. + if proxier.initialSync { + for _, dest := range curDests { + if dest.Weight != newDest.Weight { + err = proxier.ipvs.UpdateRealServer(appliedVirtualServer, newDest) + if err != nil { + klog.ErrorS(err, "Failed to update destination", "newDest", newDest) + continue + } + } + } + } // check if newEndpoint is in gracefulDelete list, if true, delete this ep immediately uniqueRS := GetUniqueRSName(vs, newDest) if !proxier.gracefuldeleteManager.InTerminationList(uniqueRS) { @@ -2025,6 +2052,7 @@ func (proxier *Proxier) syncEndpoint(svcPortName proxy.ServicePortName, onlyNode continue } } + // Delete old endpoints for _, ep := range curEndpoints.Difference(newEndpoints).UnsortedList() { // if curEndpoint is in gracefulDelete, skip From 8b5f263cd3602e588bfe3089f67753e0759d9195 Mon Sep 17 00:00:00 2001 From: Sanskar Jaiswal Date: Fri, 26 Aug 2022 07:44:01 +0000 Subject: [PATCH 2/2] add tests for initialSync usage in syncEndpoint Signed-off-by: Sanskar Jaiswal --- pkg/proxy/ipvs/proxier_test.go | 91 ++++++++++++++++++++++++++++++++++ 1 file changed, 91 insertions(+) diff --git a/pkg/proxy/ipvs/proxier_test.go b/pkg/proxy/ipvs/proxier_test.go index 1cc9b65596d62..1358442921202 100644 --- a/pkg/proxy/ipvs/proxier_test.go +++ b/pkg/proxy/ipvs/proxier_test.go @@ -178,6 +178,15 @@ func makeServiceMap(proxier *Proxier, allServices ...*v1.Service) { proxier.servicesSynced = true } +func makeEndpointSliceMap(proxier *Proxier, allEpSlices ...*discovery.EndpointSlice) { + for i := range allEpSlices { + proxier.OnEndpointSliceAdd(allEpSlices[i]) + } + proxier.mu.Lock() + defer proxier.mu.Unlock() + proxier.endpointSlicesSynced = true +} + func makeTestService(namespace, name string, svcFunc func(*v1.Service)) *v1.Service { svc := &v1.Service{ ObjectMeta: metav1.ObjectMeta{ @@ -1379,6 +1388,88 @@ func TestNodePortIPv6(t *testing.T) { } } +func Test_syncEndpoint_updateWeightsOnRestart(t *testing.T) { + tcpProtocol := v1.ProtocolTCP + + ipt := iptablestest.NewFake() + ipvs := ipvstest.NewFake() + ipset := ipsettest.NewFake(testIPSetVersion) + fp := NewFakeProxier(ipt, ipvs, ipset, nil, nil, v1.IPv4Protocol) + + svc1 := makeTestService("ns1", "svc1", func(svc *v1.Service) { + svc.Spec.ClusterIP = "10.20.30.41" + svc.Spec.Ports = []v1.ServicePort{{ + Name: "p80", + Port: int32(80), + Protocol: v1.ProtocolTCP, + }} + }) + epSlice1 := makeTestEndpointSlice("ns1", "svc1", 1, func(eps *discovery.EndpointSlice) { + eps.AddressType = discovery.AddressTypeIPv4 + eps.Endpoints = []discovery.Endpoint{{ + Addresses: []string{"10.180.0.1"}, + }} + eps.Ports = []discovery.EndpointPort{{ + Name: pointer.StringPtr("p80"), + Port: pointer.Int32(80), + Protocol: &tcpProtocol, + }} + }) + + // sync proxy rules to get to the desired initial state + makeServiceMap(fp, svc1) + makeEndpointSliceMap(fp, epSlice1) + fp.syncProxyRules() + + serv := &utilipvs.VirtualServer{ + Address: netutils.ParseIPSloppy("10.20.30.41"), + Port: uint16(80), + Protocol: string(tcpProtocol), + Scheduler: fp.ipvsScheduler, + } + + vs, err := fp.ipvs.GetVirtualServer(serv) + if err != nil { + t.Errorf("failed to get virtual server, err: %v", err) + } + + rss, err := fp.ipvs.GetRealServers(vs) + if err != nil { + t.Errorf("failed to get real servers, err: %v", err) + } + for _, rs := range rss { + rs.Weight = 0 + if err = fp.ipvs.UpdateRealServer(vs, rs); err != nil { + t.Errorf("failed to update real server: %v, err: %v", rs, err) + } + } + + // simulate a restart by enabling initial sync logic. + fp.initialSync = true + err = fp.syncEndpoint(proxy.ServicePortName{ + NamespacedName: types.NamespacedName{ + Name: "svc1", + Namespace: "ns1", + }, + Port: "80", + Protocol: tcpProtocol, + }, true, vs) + if err != nil { + t.Errorf("failed to sync endpoint, err: %v", err) + } + + rss, err = fp.ipvs.GetRealServers(vs) + if err != nil { + t.Errorf("failed to get real server, err: %v", err) + } + for _, rs := range rss { + if rs.Weight != 1 { + t.Logf("unexpected realserver weight: %d, expected weight: 1", rs.Weight) + t.Errorf("unexpected realserver state") + } + } +} + func TestIPv4Proxier(t *testing.T) { tcpProtocol := v1.ProtocolTCP tests := []struct {