Skip to content

Commit

Permalink
cleanup and reorg (#10434)
Browse files Browse the repository at this point in the history
  • Loading branch information
lgadban authored Jan 13, 2025
1 parent 57783da commit 8e35c9f
Show file tree
Hide file tree
Showing 14 changed files with 137 additions and 92 deletions.
3 changes: 2 additions & 1 deletion projects/gateway2/controller/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,12 +160,13 @@ func NewControllerBuilder(ctx context.Context, cfg StartConfig) (*ControllerBuil
cfg.SetupOpts.Cache,
)
proxySyncer.Init(ctx, isOurGw, cfg.KrtOptions)

if err := mgr.Add(proxySyncer); err != nil {
setupLog.Error(err, "unable to add proxySyncer runnable")
return nil, err
}
setupLog.Info("starting controller builder", "GatewayClasses", sets.List(gwClasses))

setupLog.Info("starting controller builder", "GatewayClasses", sets.List(gwClasses))
return &ControllerBuilder{
proxySyncer: proxySyncer,
cfg: cfg,
Expand Down
8 changes: 6 additions & 2 deletions projects/gateway2/extensions2/plugins/kubernetes/k8s.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,14 @@ func NewPlugin(ctx context.Context, commoncol *common.CommonCollections) extensi
return NewPluginFromCollections(ctx, commoncol.KrtOpts, commoncol.Settings, commoncol.Pods, services, endpointSlices)
}

func NewPluginFromCollections(ctx context.Context, krtOpts krtutil.KrtOptions,
func NewPluginFromCollections(
ctx context.Context,
krtOpts krtutil.KrtOptions,
settings krt.Singleton[glookubev1.Settings],
pods krt.Collection[krtcollections.LocalityPod],
services krt.Collection[*corev1.Service], endpointSlices krt.Collection[*discoveryv1.EndpointSlice]) extensionsplug.Plugin {
services krt.Collection[*corev1.Service],
endpointSlices krt.Collection[*discoveryv1.EndpointSlice],
) extensionsplug.Plugin {
gk := schema.GroupKind{
Group: corev1.GroupName,
Kind: "Service",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,25 +40,24 @@ type routeOptsPluginGwPass struct {
}

func NewPlugin(ctx context.Context, commoncol *common.CommonCollections) extensionplug.Plugin {

col := krtutil.SetupCollectionDynamic[v1alpha1.RoutePolicy](
ctx,
commoncol.Client,
v1alpha1.SchemeGroupVersion.WithResource("routepolicies"),
commoncol.KrtOpts.ToOptions("RoutePolicy")...,
)
gk := v1alpha1.RoutePolicyGVK.GroupKind()
policyCol := krt.NewCollection(col, func(krtctx krt.HandlerContext, i *v1alpha1.RoutePolicy) *ir.PolicyWrapper {
policyCol := krt.NewCollection(col, func(krtctx krt.HandlerContext, policyCR *v1alpha1.RoutePolicy) *ir.PolicyWrapper {
var pol = &ir.PolicyWrapper{
ObjectSource: ir.ObjectSource{
Group: gk.Group,
Kind: gk.Kind,
Namespace: i.Namespace,
Name: i.Name,
Namespace: policyCR.Namespace,
Name: policyCR.Name,
},
Policy: i,
PolicyIR: &routeOptsPlugin{ct: i.CreationTimestamp.Time, spec: i.Spec},
TargetRefs: convert(i.Spec.TargetRef),
Policy: policyCR,
PolicyIR: &routeOptsPlugin{ct: policyCR.CreationTimestamp.Time, spec: policyCR.Spec},
TargetRefs: convert(policyCR.Spec.TargetRef),
}
return pol
})
Expand Down
62 changes: 43 additions & 19 deletions projects/gateway2/krtcollections/policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,11 @@ type UpstreamIndex struct {
krtopts krtutil.KrtOptions
}

func NewUpstreamIndex(krtopts krtutil.KrtOptions, backendRefExtension []extensionsplug.GetBackendForRefPlugin, policies *PolicyIndex) *UpstreamIndex {
func NewUpstreamIndex(
krtopts krtutil.KrtOptions,
backendRefExtension []extensionsplug.GetBackendForRefPlugin,
policies *PolicyIndex,
) *UpstreamIndex {
return &UpstreamIndex{
policies: policies,
availableUpstreams: map[schema.GroupKind]krt.Collection[ir.Upstream]{},
Expand Down Expand Up @@ -70,7 +74,8 @@ func (ui *UpstreamIndex) Upstreams() []krt.Collection[ir.Upstream] {

func (ui *UpstreamIndex) AddUpstreams(gk schema.GroupKind, col krt.Collection[ir.Upstream]) {
ucol := krt.NewCollection(col, func(kctx krt.HandlerContext, u ir.Upstream) *ir.Upstream {
u.AttachedPolicies = toAttachedPolicies(ui.policies.getTargetingPolicies(kctx, extensionsplug.UpstreamAttachmentPoint, u.ObjectSource, ""))
policies := ui.policies.getTargetingPolicies(kctx, extensionsplug.UpstreamAttachmentPoint, u.ObjectSource, "")
u.AttachedPolicies = toAttachedPolicies(policies)
return &u
}, ui.krtopts.ToOptions("")...)
ui.availableUpstreams[gk] = ucol
Expand Down Expand Up @@ -145,7 +150,12 @@ type GatewayIndex struct {
Gateways krt.Collection[ir.Gateway]
}

func NewGatewayIndex(krtopts krtutil.KrtOptions, isOurGw func(gw *gwv1.Gateway) bool, policies *PolicyIndex, gws krt.Collection[*gwv1.Gateway]) *GatewayIndex {
func NewGatewayIndex(
krtopts krtutil.KrtOptions,
isOurGw func(gw *gwv1.Gateway) bool,
policies *PolicyIndex,
gws krt.Collection[*gwv1.Gateway],
) *GatewayIndex {
h := &GatewayIndex{policies: policies}
h.Gateways = krt.NewCollection(gws, func(kctx krt.HandlerContext, i *gwv1.Gateway) *ir.Gateway {
if !isOurGw(i) {
Expand Down Expand Up @@ -200,6 +210,7 @@ type PolicyIndex struct {

hasSyncedFuncs []func() bool
}
type policyFetcherMap = map[schema.GroupKind]func(n string, ns string) ir.PolicyIR

func (h *PolicyIndex) HasSynced() bool {
for _, f := range h.hasSyncedFuncs {
Expand All @@ -211,20 +222,23 @@ func (h *PolicyIndex) HasSynced() bool {
}

func NewPolicyIndex(krtopts krtutil.KrtOptions, contributesPolicies extensionsplug.ContributesPolicies) *PolicyIndex {

h := &PolicyIndex{policiesFetch: map[schema.GroupKind]func(n string, ns string) ir.PolicyIR{}}
h := &PolicyIndex{policiesFetch: policyFetcherMap{}}

var policycols []krt.Collection[ir.PolicyWrapper]
for gk, ext := range contributesPolicies {
if ext.Policies != nil {
policycols = append(policycols, ext.Policies)
h.hasSyncedFuncs = append(h.hasSyncedFuncs, ext.Policies.Synced().HasSynced)
}
if ext.PoliciesFetch != nil {
h.policiesFetch[gk] = ext.PoliciesFetch
}
if ext.GlobalPolicies != nil {
h.globalPolicies = append(h.globalPolicies, globalPolicy{GroupKind: gk, ir: ext.GlobalPolicies, points: ext.AttachmentPoints()})
for gk, plugin := range contributesPolicies {
if plugin.Policies != nil {
policycols = append(policycols, plugin.Policies)
h.hasSyncedFuncs = append(h.hasSyncedFuncs, plugin.Policies.Synced().HasSynced)
}
if plugin.PoliciesFetch != nil {
h.policiesFetch[gk] = plugin.PoliciesFetch
}
if plugin.GlobalPolicies != nil {
h.globalPolicies = append(h.globalPolicies, globalPolicy{
GroupKind: gk,
ir: plugin.GlobalPolicies,
points: plugin.AttachmentPoints(),
})
}
}

Expand All @@ -245,10 +259,13 @@ func NewPolicyIndex(krtopts krtutil.KrtOptions, contributesPolicies extensionspl

// Attachment happens during collection creation (i.e. this file), and not translation. so these methods don't need to be public!
// note: we may want to change that for global policies maybe.
func (p *PolicyIndex) getTargetingPolicies(kctx krt.HandlerContext, pnt extensionsplug.AttachmentPoints, targetRef ir.ObjectSource, sectionName string) []ir.PolicyAtt {

func (p *PolicyIndex) getTargetingPolicies(
kctx krt.HandlerContext,
pnt extensionsplug.AttachmentPoints,
targetRef ir.ObjectSource,
sectionName string,
) []ir.PolicyAtt {
var ret []ir.PolicyAtt

for _, gp := range p.globalPolicies {
if gp.points.Has(pnt) {
if p := gp.ir(kctx, pnt); p != nil {
Expand Down Expand Up @@ -430,7 +447,14 @@ func (h *RoutesIndex) HasSynced() bool {
return h.httpRoutes.Synced().HasSynced() && h.routes.Synced().HasSynced() && h.policies.HasSynced() && h.upstreams.HasSynced() && h.refgrants.HasSynced()
}

func NewRoutesIndex(krtopts krtutil.KrtOptions, httproutes krt.Collection[*gwv1.HTTPRoute], tcproutes krt.Collection[*gwv1a2.TCPRoute], policies *PolicyIndex, upstreams *UpstreamIndex, refgrants *RefGrantIndex) *RoutesIndex {
func NewRoutesIndex(
krtopts krtutil.KrtOptions,
httproutes krt.Collection[*gwv1.HTTPRoute],
tcproutes krt.Collection[*gwv1a2.TCPRoute],
policies *PolicyIndex,
upstreams *UpstreamIndex,
refgrants *RefGrantIndex,
) *RoutesIndex {

h := &RoutesIndex{policies: policies, refgrants: refgrants, upstreams: upstreams}
h.hasSyncedFuncs = append(h.hasSyncedFuncs, httproutes.Synced().HasSynced, tcproutes.Synced().HasSynced)
Expand Down
45 changes: 26 additions & 19 deletions projects/gateway2/krtcollections/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,13 +53,32 @@ func registerTypes() {
)
}

func initCollectionsWithGateways(ctx context.Context,
func InitCollections(
ctx context.Context,
extensions extensionsplug.Plugin,
istioClient kube.Client,
isOurGw func(gw *gwv1.Gateway) bool,
refgrants *RefGrantIndex,
krtopts krtutil.KrtOptions,
) (*GatewayIndex, *RoutesIndex, krt.Collection[ir.Upstream], krt.Collection[ir.EndpointsForUpstream]) {
registerTypes()

httpRoutes := krt.WrapClient(kclient.New[*gwv1.HTTPRoute](istioClient), krtopts.ToOptions("HTTPRoute")...)
tcproutes := krt.WrapClient(kclient.New[*gwv1a2.TCPRoute](istioClient), krtopts.ToOptions("TCPRoute")...)
kubeRawGateways := krt.WrapClient(kclient.New[*gwv1.Gateway](istioClient), krtopts.ToOptions("KubeGateways")...)

return initCollectionsWithGateways(isOurGw, kubeRawGateways, httpRoutes, tcproutes, refgrants, extensions, krtopts)
}

func initCollectionsWithGateways(
isOurGw func(gw *gwv1.Gateway) bool,
kubeRawGateways krt.Collection[*gwv1.Gateway],
httpRoutes krt.Collection[*gwv1.HTTPRoute],
tcproutes krt.Collection[*gwv1a2.TCPRoute],
refgrants *RefGrantIndex,
extensions extensionsplug.Plugin, krtopts krtutil.KrtOptions) (*GatewayIndex, *RoutesIndex, krt.Collection[ir.Upstream], krt.Collection[ir.EndpointsForUpstream]) {
extensions extensionsplug.Plugin,
krtopts krtutil.KrtOptions,
) (*GatewayIndex, *RoutesIndex, krt.Collection[ir.Upstream], krt.Collection[ir.EndpointsForUpstream]) {

policies := NewPolicyIndex(krtopts, extensions.ContributesPolicies)

Expand All @@ -71,31 +90,19 @@ func initCollectionsWithGateways(ctx context.Context,
}

upstreamIndex := NewUpstreamIndex(krtopts, backendRefPlugins, policies)
finalUpstreams, endpointIRs := initUpstreams(ctx, extensions, upstreamIndex, krtopts)
finalUpstreams, endpointIRs := initUpstreams(extensions, upstreamIndex, krtopts)

kubeGateways := NewGatewayIndex(krtopts, isOurGw, policies, kubeRawGateways)

routes := NewRoutesIndex(krtopts, httpRoutes, tcproutes, policies, upstreamIndex, refgrants)
return kubeGateways, routes, finalUpstreams, endpointIRs
}

func InitCollections(ctx context.Context,
func initUpstreams(
extensions extensionsplug.Plugin,
istioClient kube.Client,
isOurGw func(gw *gwv1.Gateway) bool,
refgrants *RefGrantIndex,
krtopts krtutil.KrtOptions) (*GatewayIndex, *RoutesIndex, krt.Collection[ir.Upstream], krt.Collection[ir.EndpointsForUpstream]) {
registerTypes()

httpRoutes := krt.WrapClient(kclient.New[*gwv1.HTTPRoute](istioClient), krtopts.ToOptions("HTTPRoute")...)
tcproutes := krt.WrapClient(kclient.New[*gwv1a2.TCPRoute](istioClient), krtopts.ToOptions("TCPRoute")...)
kubeRawGateways := krt.WrapClient(kclient.New[*gwv1.Gateway](istioClient), krtopts.ToOptions("KubeGateways")...)

return initCollectionsWithGateways(ctx, isOurGw, kubeRawGateways, httpRoutes, tcproutes, refgrants, extensions, krtopts)
}

func initUpstreams(ctx context.Context,
extensions extensionsplug.Plugin, upstreamIndex *UpstreamIndex, krtopts krtutil.KrtOptions) (krt.Collection[ir.Upstream], krt.Collection[ir.EndpointsForUpstream]) {
upstreamIndex *UpstreamIndex,
krtopts krtutil.KrtOptions,
) (krt.Collection[ir.Upstream], krt.Collection[ir.EndpointsForUpstream]) {

allEndpoints := []krt.Collection[ir.EndpointsForUpstream]{}
for k, col := range extensions.ContributesUpstreams {
Expand Down
5 changes: 4 additions & 1 deletion projects/gateway2/proxy_syncer/cla.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,10 @@ func (ie *PerClientEnvoyEndpoints) FetchEndpointsForClient(kctx krt.HandlerConte
return krt.Fetch(kctx, ie.endpoints, krt.FilterIndex(ie.index, ucc.ResourceName()))
}

func NewPerClientEnvoyEndpoints(logger *zap.Logger, krtopts krtutil.KrtOptions, uccs krt.Collection[ir.UniqlyConnectedClient],
func NewPerClientEnvoyEndpoints(
logger *zap.Logger,
krtopts krtutil.KrtOptions,
uccs krt.Collection[ir.UniqlyConnectedClient],
glooEndpoints krt.Collection[ir.EndpointsForUpstream],
translateEndpoints func(kctx krt.HandlerContext, ucc ir.UniqlyConnectedClient, ep ir.EndpointsForUpstream) (*envoy_config_endpoint_v3.ClusterLoadAssignment, uint64),
) PerClientEnvoyEndpoints {
Expand Down
10 changes: 8 additions & 2 deletions projects/gateway2/proxy_syncer/perclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,14 @@ import (
"istio.io/istio/pkg/kube/krt"
)

func snapshotPerClient(l *zap.Logger, krtopts krtutil.KrtOptions, uccCol krt.Collection[ir.UniqlyConnectedClient],
mostXdsSnapshots krt.Collection[GatewayXdsResources], endpoints PerClientEnvoyEndpoints, clusters PerClientEnvoyClusters) krt.Collection[XdsSnapWrapper] {
func snapshotPerClient(
l *zap.Logger,
krtopts krtutil.KrtOptions,
uccCol krt.Collection[ir.UniqlyConnectedClient],
mostXdsSnapshots krt.Collection[GatewayXdsResources],
endpoints PerClientEnvoyEndpoints,
clusters PerClientEnvoyClusters,
) krt.Collection[XdsSnapWrapper] {

xdsSnapshotsForUcc := krt.NewCollection(uccCol, func(kctx krt.HandlerContext, ucc ir.UniqlyConnectedClient) *XdsSnapWrapper {
maybeMostlySnap := krt.FetchOne(kctx, mostXdsSnapshots, krt.FilterKey(ucc.Role))
Expand Down
25 changes: 22 additions & 3 deletions projects/gateway2/proxy_syncer/proxy_syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,9 +212,28 @@ func (s *ProxySyncer) Init(ctx context.Context, isOurGw func(gw *gwv1.Gateway) b
return toResources(gw, *xdsSnap, rm)
}, krtopts.ToOptions("MostXdsSnapshots")...)

epPerClient := NewPerClientEnvoyEndpoints(logger.Desugar(), krtopts, s.uniqueClients, endpointIRs, s.translatorSyncer.TranslateEndpoints)
clustersPerClient := NewPerClientEnvoyClusters(ctx, krtopts, s.translatorSyncer.GetUpstreamTranslator(), finalUpstreams, s.uniqueClients)
s.perclientSnapCollection = snapshotPerClient(logger.Desugar(), krtopts, s.uniqueClients, s.mostXdsSnapshots, epPerClient, clustersPerClient)
epPerClient := NewPerClientEnvoyEndpoints(
logger.Desugar(),
krtopts,
s.uniqueClients,
endpointIRs,
s.translatorSyncer.TranslateEndpoints,
)
clustersPerClient := NewPerClientEnvoyClusters(
ctx,
krtopts,
s.translatorSyncer.GetUpstreamTranslator(),
finalUpstreams,
s.uniqueClients,
)
s.perclientSnapCollection = snapshotPerClient(
logger.Desugar(),
krtopts,
s.uniqueClients,
s.mostXdsSnapshots,
epPerClient,
clustersPerClient,
)

// as proxies are created, they also contain a reportMap containing status for the Gateway and associated xRoutes (really parentRefs)
// here we will merge reports that are per-Proxy to a singleton Report used to persist to k8s on a timer
Expand Down
17 changes: 7 additions & 10 deletions projects/gateway2/query/httproute.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ func (r *gatewayQueries) getDelegatedChildren(
}
ref := *backendRef.Delegate
// Fetch child routes based on the backend reference
referencedRoutes, err := r.fetchChildRoutes(kctx, ctx, parent.Namespace, backendRef)
referencedRoutes, err := r.fetchChildRoutes(kctx, backendRef)
if err != nil {
children.AddError(ref, err)
continue
Expand Down Expand Up @@ -254,8 +254,6 @@ func (r *gatewayQueries) getDelegatedChildren(

func (r *gatewayQueries) fetchChildRoutes(
kctx krt.HandlerContext,
ctx context.Context,
parentNamespace string,
backend ir.HttpBackendOrDelegate,
) ([]ir.HttpRouteIR, error) {

Expand Down Expand Up @@ -294,7 +292,7 @@ func (r *gatewayQueries) GetRoutesForGateway(kctx krt.HandlerContext, ctx contex

// Process each route
ret := NewRoutesForGwResult()
routes := fetchRoutes(kctx, r, nns)
routes := r.routes.RoutesForGateway(kctx, nns)
for _, route := range routes {
if err := r.processRoute(kctx, ctx, gw, route, ret); err != nil {
return nil, err
Expand All @@ -304,14 +302,13 @@ func (r *gatewayQueries) GetRoutesForGateway(kctx krt.HandlerContext, ctx contex
return ret, nil
}

// fetchRoutes is a helper function to fetch routes and add to the routes slice.
func fetchRoutes(kctx krt.HandlerContext, r *gatewayQueries, nns types.NamespacedName) []ir.Route {
return r.routes.RoutesForGateway(kctx, nns)
}

func (r *gatewayQueries) processRoute(
kctx krt.HandlerContext,
ctx context.Context, gw *gwv1.Gateway, route ir.Route, ret *RoutesForGwResult) error {
ctx context.Context,
gw *gwv1.Gateway,
route ir.Route,
ret *RoutesForGwResult,
) error {
refs := getParentRefsForGw(gw, route)
routeKind := route.GetGroupKind().Kind

Expand Down
17 changes: 2 additions & 15 deletions projects/gateway2/query/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,20 +97,6 @@ func (f FromObject) Namespace() string {
return f.GetNamespace()
}

// TODO(Law): remove this type entirely?
type FromGkNs struct {
Gk metav1.GroupKind
Ns string
}

func (f FromGkNs) GroupKind() (metav1.GroupKind, error) {
return f.Gk, nil
}

func (f FromGkNs) Namespace() string {
return f.Ns
}

type GatewayQueries interface {
GetSecretForRef(kctx krt.HandlerContext, ctx context.Context, fromGk schema.GroupKind, fromns string, secretRef apiv1.SecretObjectReference) (*ir.Secret, error)

Expand All @@ -121,7 +107,8 @@ type GatewayQueries interface {
ctx context.Context,
route ir.Route,
hostnames []string,
parentRef gwv1.ParentReference) *RouteInfo
parentRef gwv1.ParentReference,
) *RouteInfo
}

type RoutesForGwResult struct {
Expand Down
Loading

0 comments on commit 8e35c9f

Please sign in to comment.