Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

xds: simplify code handling certain error conditions in the resolver #8123

Merged
merged 2 commits into from
Feb 26, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 8 additions & 6 deletions xds/internal/resolver/serviceconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,10 +71,10 @@
Children map[string]xdsChildConfig `json:"children"`
}

// serviceConfigJSON produces a service config in JSON format representing all
// the clusters referenced in activeClusters. This includes clusters with zero
// references, so they must be pruned first.
func serviceConfigJSON(activeClusters map[string]*clusterInfo) ([]byte, error) {
// serviceConfigJSON produces a service config in JSON format that contains LB
// policy config for the "xds_cluster_manager" LB policy, with entries in the
// children map for all active clusters.
func serviceConfigJSON(activeClusters map[string]*clusterInfo) []byte {
// Generate children (all entries in activeClusters).
children := make(map[string]xdsChildConfig)
for cluster, ci := range activeClusters {
Expand All @@ -87,11 +87,13 @@
),
}

// This is not expected to fail as we have constructed the service config by
// hand right above, and therefore ok to panic.
bs, err := json.Marshal(sc)
if err != nil {
return nil, fmt.Errorf("failed to marshal json: %v", err)
panic(fmt.Sprintf("failed to marshal service config %+v: %v", sc, err))

Check warning on line 94 in xds/internal/resolver/serviceconfig.go

View check run for this annotation

Codecov / codecov/patch

xds/internal/resolver/serviceconfig.go#L94

Added line #L94 was not covered by tests
}
return bs, nil
return bs
}

type virtualHost struct {
Expand Down
43 changes: 18 additions & 25 deletions xds/internal/resolver/xds_resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -281,8 +281,7 @@
// sendNewServiceConfig prunes active clusters, generates a new service config
// based on the current set of active clusters, and sends an update to the
// channel with that service config and the provided config selector. Returns
// false if an error occurs while generating the service config and the update
// cannot be sent.
// false if an error occurs while sending an update to the channel.
//
// Only executed in the context of a serializer callback.
func (r *xdsResolver) sendNewServiceConfig(cs *configSelector) bool {
Expand All @@ -295,24 +294,28 @@
// There are no clusters and we are sending a failing configSelector.
// Send an empty config, which picks pick-first, with no address, and
// puts the ClientConn into transient failure.
r.cc.UpdateState(resolver.State{ServiceConfig: r.cc.ParseServiceConfig("{}")})
if err := r.cc.UpdateState(resolver.State{ServiceConfig: r.cc.ParseServiceConfig("{}")}); err != nil {
if r.logger.V(2) {
r.logger.Infof("Channel rejected new state (with empty service config) with error: %v", err)
}

Check warning on line 300 in xds/internal/resolver/xds_resolver.go

View check run for this annotation

Codecov / codecov/patch

xds/internal/resolver/xds_resolver.go#L299-L300

Added lines #L299 - L300 were not covered by tests
return false
}
return true
}

sc, err := serviceConfigJSON(r.activeClusters)
if err != nil {
// JSON marshal error; should never happen.
r.logger.Errorf("For Listener resource %q and RouteConfiguration resource %q, failed to marshal newly built service config: %v", r.ldsResourceName, r.rdsResourceName, err)
r.cc.ReportError(err)
return false
}
sc := serviceConfigJSON(r.activeClusters)
r.logger.Infof("For Listener resource %q and RouteConfiguration resource %q, generated service config: %v", r.ldsResourceName, r.rdsResourceName, pretty.FormatJSON(sc))

// Send the update to the ClientConn.
state := iresolver.SetConfigSelector(resolver.State{
ServiceConfig: r.cc.ParseServiceConfig(string(sc)),
}, cs)
r.cc.UpdateState(xdsclient.SetClient(state, r.xdsClient))
if err := r.cc.UpdateState(xdsclient.SetClient(state, r.xdsClient)); err != nil {
if r.logger.V(2) {
r.logger.Infof("Channel rejected new state: %+v with error: %v", state, err)
}
return false

Check warning on line 317 in xds/internal/resolver/xds_resolver.go

View check run for this annotation

Codecov / codecov/patch

xds/internal/resolver/xds_resolver.go#L314-L317

Added lines #L314 - L317 were not covered by tests
}
return true
}

Expand All @@ -321,7 +324,7 @@
// r.activeClusters for previously-unseen clusters.
//
// Only executed in the context of a serializer callback.
func (r *xdsResolver) newConfigSelector() (*configSelector, error) {
func (r *xdsResolver) newConfigSelector() *configSelector {
cs := &configSelector{
r: r,
virtualHost: virtualHost{
Expand Down Expand Up @@ -357,11 +360,7 @@
}
cs.routes[i].clusters = clusters

var err error
cs.routes[i].m, err = xdsresource.RouteToMatcher(rt)
if err != nil {
return nil, err
}
cs.routes[i].m = xdsresource.RouteToMatcher(rt)
cs.routes[i].actionType = rt.ActionType
if rt.MaxStreamDuration == nil {
cs.routes[i].maxStreamDuration = r.currentListener.MaxStreamDuration
Expand All @@ -381,7 +380,7 @@
atomic.AddInt32(&ci.refCount, 1)
}

return cs, nil
return cs
}

// pruneActiveClusters deletes entries in r.activeClusters with zero
Expand Down Expand Up @@ -437,13 +436,7 @@
return
}

cs, err := r.newConfigSelector()
if err != nil {
r.logger.Warningf("Failed to build a config selector for resource %q: %v", r.ldsResourceName, err)
r.cc.ReportError(err)
return
}

cs := r.newConfigSelector()
if !r.sendNewServiceConfig(cs) {
// JSON error creating the service config (unexpected); erase
// this config selector and ignore this update, continuing with
Expand Down
6 changes: 1 addition & 5 deletions xds/internal/xdsclient/xdsresource/filter_chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,12 +122,8 @@ func (fc *FilterChain) ConstructUsableRouteConfiguration(config RouteConfigUpdat
func (fc *FilterChain) convertVirtualHost(virtualHost *VirtualHost) (VirtualHostWithInterceptors, error) {
rs := make([]RouteWithInterceptors, len(virtualHost.Routes))
for i, r := range virtualHost.Routes {
var err error
rs[i].ActionType = r.ActionType
rs[i].M, err = RouteToMatcher(r)
if err != nil {
return VirtualHostWithInterceptors{}, fmt.Errorf("matcher construction: %v", err)
}
rs[i].M = RouteToMatcher(r)
for _, filter := range fc.HTTPFilters {
// Route is highest priority on server side, as there is no concept
// of an upstream cluster on server side.
Expand Down
11 changes: 7 additions & 4 deletions xds/internal/xdsclient/xdsresource/matcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,10 @@
)

// RouteToMatcher converts a route to a Matcher to match incoming RPC's against.
func RouteToMatcher(r *Route) (*CompositeMatcher, error) {
//
// Only expected to be called on a Route that passed validation checks by the
// xDS client.
func RouteToMatcher(r *Route) *CompositeMatcher {
var pm pathMatcher
switch {
case r.Regex != nil:
Expand All @@ -39,7 +42,7 @@
case r.Prefix != nil:
pm = newPathPrefixMatcher(*r.Prefix, r.CaseInsensitive)
default:
return nil, fmt.Errorf("illegal route: missing path_matcher")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we leave this, with a panic?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure. Done here and below.

panic("illegal route: missing path_matcher")

Check warning on line 45 in xds/internal/xdsclient/xdsresource/matcher.go

View check run for this annotation

Codecov / codecov/patch

xds/internal/xdsclient/xdsresource/matcher.go#L45

Added line #L45 was not covered by tests
}

headerMatchers := make([]matcher.HeaderMatcher, 0, len(r.Headers))
Expand All @@ -62,7 +65,7 @@
case h.StringMatch != nil:
matcherT = matcher.NewHeaderStringMatcher(h.Name, *h.StringMatch, invert)
default:
return nil, fmt.Errorf("illegal route: missing header_match_specifier")
panic("illegal route: missing header_match_specifier")

Check warning on line 68 in xds/internal/xdsclient/xdsresource/matcher.go

View check run for this annotation

Codecov / codecov/patch

xds/internal/xdsclient/xdsresource/matcher.go#L68

Added line #L68 was not covered by tests
}
headerMatchers = append(headerMatchers, matcherT)
}
Expand All @@ -71,7 +74,7 @@
if r.Fraction != nil {
fractionMatcher = newFractionMatcher(*r.Fraction)
}
return newCompositeMatcher(pm, headerMatchers, fractionMatcher), nil
return newCompositeMatcher(pm, headerMatchers, fractionMatcher)
}

// CompositeMatcher is a matcher that holds onto many matchers and aggregates
Expand Down