Skip to content

Commit

Permalink
Merge refs/heads/main into sam/deployer-test-race
Browse files Browse the repository at this point in the history
  • Loading branch information
soloio-bulldozer[bot] authored Jun 20, 2024
2 parents f9d5fa2 + 4a6bc1c commit fedc46e
Show file tree
Hide file tree
Showing 7 changed files with 77 additions and 42 deletions.
11 changes: 11 additions & 0 deletions changelog/v1.18.0-beta1/fix-proxy-status-sync.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
changelog:
- type: FIX
issueLink: https://github.com/solo-io/solo-projects/issues/6304
resolvesIssue: true
description: >-
Fix statuses being synced properly for k8s gateway resources
- type: FIX
issueLink: https://github.com/solo-io/solo-projects/issues/6107
resolvesIssue: true
description: >-
Follow up to fix discoveryAddress, istioMetaMeshId and istioMetaClusterId in k8s gateway deployment for Istio integration.
11 changes: 6 additions & 5 deletions projects/gateway2/deployer/values.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,10 +95,7 @@ type helmAutoscaling struct {
}

type helmIstio struct {
Enabled *bool `json:"enabled,omitempty"`
IstioDiscoveryAddress *string `json:"istioDiscoveryAddress,omitempty"`
IstioMetaMeshId *string `json:"istioMetaMeshId,omitempty"`
IstioMetaClusterId *string `json:"istioMetaClusterId,omitempty"`
Enabled *bool `json:"enabled,omitempty"`
}

type helmSdsContainer struct {
Expand All @@ -115,9 +112,13 @@ type sdsBootstrap struct {
type helmIstioContainer struct {
Image *helmImage `json:"image,omitempty"`
LogLevel *string `json:"logLevel,omitempty"`
// Note: This is set by envoySidecarResources in helm chart

Resources *v1alpha1kube.ResourceRequirements `json:"resources,omitempty"`
SecurityContext *extcorev1.SecurityContext `json:"securityContext,omitempty"`

IstioDiscoveryAddress *string `json:"istioDiscoveryAddress,omitempty"`
IstioMetaMeshId *string `json:"istioMetaMeshId,omitempty"`
IstioMetaClusterId *string `json:"istioMetaClusterId,omitempty"`
}

type helmServiceAccount struct {
Expand Down
16 changes: 8 additions & 8 deletions projects/gateway2/deployer/values_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,10 +135,13 @@ func getIstioContainerValues(istioContainerConfig *v1alpha1.IstioContainer) *hel
}

return &helmIstioContainer{
Image: istioImage,
LogLevel: ptr.To(istioContainerConfig.GetLogLevel().GetValue()),
Resources: istioContainerConfig.GetResources(),
SecurityContext: istioContainerConfig.GetSecurityContext(),
Image: istioImage,
LogLevel: ptr.To(istioContainerConfig.GetLogLevel().GetValue()),
Resources: istioContainerConfig.GetResources(),
SecurityContext: istioContainerConfig.GetSecurityContext(),
IstioDiscoveryAddress: ptr.To(istioContainerConfig.GetIstioDiscoveryAddress().GetValue()),
IstioMetaMeshId: ptr.To(istioContainerConfig.GetIstioMetaMeshId().GetValue()),
IstioMetaClusterId: ptr.To(istioContainerConfig.GetIstioMetaClusterId().GetValue()),
}
}

Expand All @@ -152,10 +155,7 @@ func getIstioValues(istioValues bootstrap.IstioValues, istioConfig *v1alpha1.Ist
}

return &helmIstio{
Enabled: ptr.To(istioValues.IntegrationEnabled),
IstioDiscoveryAddress: ptr.To(istioConfig.GetIstioProxyContainer().GetIstioDiscoveryAddress().GetValue()),
IstioMetaMeshId: ptr.To(istioConfig.GetIstioProxyContainer().GetIstioMetaMeshId().GetValue()),
IstioMetaClusterId: ptr.To(istioConfig.GetIstioProxyContainer().GetIstioMetaClusterId().GetValue()),
Enabled: ptr.To(istioValues.IntegrationEnabled),
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,15 +157,14 @@ spec:
- name: PILOT_CERT_PROVIDER
value: istiod
- name: CA_ADDR
value: "istiod.istio-system.svc:15012" # TODO: Configurable istioDiscoveryAddress
value: {{ $gateway.istioContainer.istioDiscoveryAddress }}
- name: ISTIO_META_MESH_ID
value: "cluster.local" # TODO: Configurable istioMetaMeshId
value: {{ $gateway.istioContainer.istioMetaMeshId }}
- name: ISTIO_META_CLUSTER_ID
value: "Kubernetes" # TODO: Configurable istioMetaClusterId
value: {{ $gateway.istioContainer.istioMetaClusterId }}
- name: PROXY_CONFIG
# TODO: Configurable istioDiscoveryAddress
value: |
{"discoveryAddress": "istiod.istio-system.svc:15012" }
{"discoveryAddress": {{ $gateway.istioContainer.istioDiscoveryAddress }} }
- name: POD_NAME
valueFrom:
fieldRef:
Expand Down
4 changes: 2 additions & 2 deletions projects/gateway2/proxy_syncer/proxy_syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (
)

// QueueStatusForProxiesFn queues a list of proxies to be synced and the plugin registry that produced them for a given sync iteration
type QueueStatusForProxiesFn func(proxies gloo_solo_io.ProxyList, pluginRegistry *registry.PluginRegistry, totalSyncCount int)
type QueueStatusForProxiesFn func(ctx context.Context, proxies gloo_solo_io.ProxyList, pluginRegistry *registry.PluginRegistry, totalSyncCount int)

// ProxySyncer is responsible for translating Kubernetes Gateway CRs into Gloo Proxies
// and syncing the proxyClient with the newly translated proxies.
Expand Down Expand Up @@ -152,7 +152,7 @@ func (s *ProxySyncer) Start(ctx context.Context) error {
TranslatedGateways: translatedGateways,
})

s.queueStatusForProxies(proxies, &pluginRegistry, totalResyncs)
s.queueStatusForProxies(ctx, proxies, &pluginRegistry, totalResyncs)
s.syncStatus(ctx, rm, gwl)
s.syncRouteStatus(ctx, rm)
s.reconcileProxies(ctx, proxies)
Expand Down
47 changes: 35 additions & 12 deletions projects/gateway2/status/status_syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ var _ proxy_syncer.QueueStatusForProxiesFn = (&statusSyncerFactory{}).QueueStatu
// GatewayStatusSyncer is responsible for applying status plugins to Gloo Gateway proxies
type GatewayStatusSyncer interface {
QueueStatusForProxies(
ctx context.Context,
proxiesToQueue v1.ProxyList,
pluginRegistry *registry.PluginRegistry,
totalSyncCount int,
Expand All @@ -42,30 +43,40 @@ type statusSyncerFactory struct {
// maps a proxy to the sync iteration that produced it
// only the latest sync iteration is stored and used to apply status plugins
resyncsPerProxy map[types.NamespacedName]int
// proxies left to sync
resyncsPerIteration map[int][]types.NamespacedName

lock *sync.Mutex
}

func NewStatusSyncerFactory() GatewayStatusSyncer {
return &statusSyncerFactory{
registryPerSync: make(map[int]*registry.PluginRegistry),
resyncsPerProxy: make(map[types.NamespacedName]int),
lock: &sync.Mutex{},
registryPerSync: make(map[int]*registry.PluginRegistry),
resyncsPerProxy: make(map[types.NamespacedName]int),
resyncsPerIteration: make(map[int][]types.NamespacedName),
lock: &sync.Mutex{},
}
}

// QueueStatusForProxies queues the proxies to be synced and plugin registry for the given sync iteration
func (f *statusSyncerFactory) QueueStatusForProxies(
ctx context.Context,
proxiesToQueue v1.ProxyList,
pluginRegistry *registry.PluginRegistry,
totalSyncCount int,
) {
f.lock.Lock()
defer f.lock.Unlock()

contextutils.LoggerFrom(ctx).Debugf("queueing %v proxies for sync %d", len(proxiesToQueue), totalSyncCount)

// queue each proxy for a given sync iteration
for _, proxy := range proxiesToQueue {
// overwrite the sync count for the proxy with the most recent sync count
f.resyncsPerProxy[getProxyNameNamespace(proxy)] = totalSyncCount

// keep track of proxies to check all proxies are handled in debugger
f.resyncsPerIteration[totalSyncCount] = append(f.resyncsPerIteration[totalSyncCount], getProxyNameNamespace(proxy))
}
// the plugin registry that produced the proxies is the same for all proxies in a given sync
f.registryPerSync[totalSyncCount] = pluginRegistry
Expand All @@ -77,9 +88,11 @@ func (f *statusSyncerFactory) HandleProxyReports(ctx context.Context, proxiesWit
f.lock.Lock()
defer f.lock.Unlock()

contextutils.LoggerFrom(ctx).Debugf("handling proxy reports for %v proxies", len(proxiesWithReports))

proxiesToReport := make(map[int][]translatorutils.ProxyWithReports)
var proxySyncCount int
for _, proxyWithReport := range filterProxiesByControllerName(proxiesWithReports) {
var proxySyncCount int
// Get the sync iteration that produced the proxy from the proxy metadata
if proxyWithReport.Proxy.GetMetadata().GetAnnotations() != nil {
if syncId, ok := proxyWithReport.Proxy.GetMetadata().GetAnnotations()[utils.ProxySyncId]; ok {
Expand All @@ -91,8 +104,21 @@ func (f *statusSyncerFactory) HandleProxyReports(ctx context.Context, proxiesWit
// if the proxySyncCount saved in the statusSyncer for a given proxy is higher (i.e. newer) than the syncCount
// on the proxy metadata, then continue because this report iteration is for an older sync which we no longer care about
if f.resyncsPerProxy[proxyKey] > proxySyncCount {
// old proxy was garbage collected, expect a future resync
// old proxy was garbage collected, expect a future re-sync
continue
}

if f.resyncsPerIteration[proxySyncCount] == nil {
// re-sync already happened, nothing to do
continue
} else {
updatedList := make([]types.NamespacedName, 0)
for _, proxyNameNs := range f.resyncsPerIteration[proxySyncCount] {
if proxyNameNs != proxyKey {
updatedList = append(updatedList, proxyNameNs)
}
}
f.resyncsPerIteration[proxySyncCount] = updatedList
}

proxiesToReport[proxySyncCount] = append(proxiesToReport[proxySyncCount], proxyWithReport)
Expand All @@ -103,15 +129,12 @@ func (f *statusSyncerFactory) HandleProxyReports(ctx context.Context, proxiesWit
for syncCount, proxies := range proxiesToReport {
if plugins, ok := f.registryPerSync[syncCount]; ok {
newStatusSyncer(plugins).applyStatusPlugins(ctx, proxies)
} else {
// this should never happen
contextutils.LoggerFrom(ctx).DPanicf("no registry found for proxy sync count %d", syncCount)
}
}

// reinitialize the registry if there are no more proxies for the sync iteration
if len(f.resyncsPerProxy) == 0 {
f.registryPerSync = make(map[int]*registry.PluginRegistry)
// If there are no more proxies for the sync iteration, delete the sync count
if len(f.resyncsPerIteration) == 0 {
delete(f.registryPerSync, syncCount)
}
}
}

Expand Down
21 changes: 11 additions & 10 deletions projects/gateway2/status/status_syncer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,10 @@ var _ = Describe("Status Syncer", func() {

proxiesToQueue := v1.ProxyList{proxyOne, proxyTwo}
pluginRegistry := &registry.PluginRegistry{}
ctx := context.Background()

// Test QueueStatusForProxies method
syncer.QueueStatusForProxies(proxiesToQueue, pluginRegistry, 123)
syncer.QueueStatusForProxies(ctx, proxiesToQueue, pluginRegistry, 123)

// Queue the proxy (this is invoked in the proxy syncer)
proxiesMap := syncer.(*statusSyncerFactory).resyncsPerProxy
Expand All @@ -78,7 +79,6 @@ var _ = Describe("Status Syncer", func() {
},
},
}
ctx := context.Background()
syncer.HandleProxyReports(ctx, proxyOneWithReports)

// Ensure proxy one has been removed from the queue after handling reports, but proxy two is still present
Expand Down Expand Up @@ -145,9 +145,10 @@ var _ = Describe("Status Syncer", func() {

proxiesToQueue := v1.ProxyList{proxyOne, proxyTwo}
pluginRegistry := &registry.PluginRegistry{}
ctx := context.Background()

// Test QueueStatusForProxies method
syncer.QueueStatusForProxies(proxiesToQueue, pluginRegistry, 123)
syncer.QueueStatusForProxies(ctx, proxiesToQueue, pluginRegistry, 123)

// Queue the proxy (this is invoked in the proxy syncer)
proxiesMap := syncer.(*statusSyncerFactory).resyncsPerProxy
Expand All @@ -173,7 +174,6 @@ var _ = Describe("Status Syncer", func() {
},
},
}
ctx := context.Background()
syncer.HandleProxyReports(ctx, proxiesWithReports)

// Ensure both proxies are removed from the queue after handling reports
Expand Down Expand Up @@ -235,11 +235,12 @@ var _ = Describe("Status Syncer", func() {

proxiesToQueue125 := v1.ProxyList{newProxy}
pluginRegistry125 := &registry.PluginRegistry{}
ctx := context.Background()

// Each proxy is queued with a different registry per sync iteration
syncer.QueueStatusForProxies(proxiesToQueue123, pluginRegistry123, 123)
syncer.QueueStatusForProxies(proxiesToQueue124, pluginRegistry124, 124)
syncer.QueueStatusForProxies(proxiesToQueue125, pluginRegistry125, 125)
syncer.QueueStatusForProxies(ctx, proxiesToQueue123, pluginRegistry123, 123)
syncer.QueueStatusForProxies(ctx, proxiesToQueue124, pluginRegistry124, 124)
syncer.QueueStatusForProxies(ctx, proxiesToQueue125, pluginRegistry125, 125)

// Queue the proxy (this is invoked in the proxy syncer)
proxiesMap := syncer.(*statusSyncerFactory).resyncsPerProxy
Expand All @@ -258,7 +259,6 @@ var _ = Describe("Status Syncer", func() {
},
},
}
ctx := context.Background()
syncer.HandleProxyReports(ctx, oldProxiesWithReports)

// Ensure only the latest proxy is still present
Expand All @@ -284,7 +284,8 @@ var _ = Describe("Status Syncer", func() {
// ensure all proxies are removed from the queue
Expect(proxiesMap).To(BeEmpty())
registryMap = syncer.(*statusSyncerFactory).registryPerSync
// ensure registry is cleared for all sync iterations
Expect(registryMap).To(BeEmpty())
Expect(registryMap).ToNot(BeEmpty())
// ensure registry is only cleared for processed sync iteration
Expect(registryMap).To(And(HaveKey(123), HaveKey(124)))
})
})

0 comments on commit fedc46e

Please sign in to comment.