From ae9e5e0ef524f61a00ad8fab898c97c1667cc627 Mon Sep 17 00:00:00 2001 From: "renovate[bot]" <29139614+renovate[bot]@users.noreply.github.com> Date: Fri, 24 Jan 2025 18:34:09 +0000 Subject: [PATCH] fix(deps): update module google.golang.org/grpc to v1.70.0 --- go.mod | 2 +- go.sum | 4 +- pkg/push/go.mod | 4 +- pkg/push/go.sum | 28 ++-- .../endpointsharding/endpointsharding.go | 8 +- .../grpclb/grpc_lb_v1/load_balancer.pb.go | 2 +- .../pickfirst/pickfirstleaf/pickfirstleaf.go | 27 ++- .../balancer/weightedroundrobin/balancer.go | 8 +- .../weightedroundrobin/weightedroundrobin.go | 7 + .../grpc/balancer_wrapper.go | 73 ++++++++- .../grpc_binarylog_v1/binarylog.pb.go | 2 +- .../internal/proto/grpc_gcp/altscontext.pb.go | 2 +- .../internal/proto/grpc_gcp/handshaker.pb.go | 2 +- .../grpc_gcp/transport_security_common.pb.go | 2 +- .../google.golang.org/grpc/credentials/tls.go | 6 +- vendor/google.golang.org/grpc/dialoptions.go | 5 + .../grpc/health/grpc_health_v1/health.pb.go | 2 +- .../grpc/internal/envconfig/envconfig.go | 2 +- .../grpc/internal/envconfig/xds.go | 6 + .../grpc/internal/internal.go | 4 + .../internal/proto/grpc_lookup_v1/rls.pb.go | 2 +- .../proto/grpc_lookup_v1/rls_config.pb.go | 2 +- .../grpc/internal/transport/handler_server.go | 2 +- .../grpc/internal/transport/http2_server.go | 4 +- vendor/google.golang.org/grpc/server.go | 10 +- .../google.golang.org/grpc/service_config.go | 17 +- vendor/google.golang.org/grpc/stream.go | 2 +- vendor/google.golang.org/grpc/version.go | 2 +- .../internal/balancer/clusterimpl/picker.go | 2 +- .../clusterresolver/clusterresolver.go | 30 +++- .../balancer/clusterresolver/configbuilder.go | 70 ++++---- .../clusterresolver/resource_resolver.go | 5 +- .../clusterresolver/resource_resolver_dns.go | 26 +-- .../balancer/outlierdetection/balancer.go | 146 +++++++++++------ .../outlierdetection/subconn_wrapper.go | 154 +++++++++++++++++- .../internal/balancer/wrrlocality/balancer.go | 7 + .../grpc/xds/internal/internal.go | 6 + .../grpc/xds/internal/xdsclient/authority.go | 13 +- .../grpc/xds/internal/xdsclient/client_new.go | 15 +- .../internal/xdsclient/client_refcounted.go | 20 ++- .../grpc/xds/internal/xdsclient/clientimpl.go | 77 +-------- .../xdsclient/transport/ads/ads_stream.go | 2 +- .../xdsclient/xdsresource/type_eds.go | 2 +- .../xdsclient/xdsresource/unmarshal_eds.go | 19 ++- vendor/modules.txt | 2 +- 45 files changed, 557 insertions(+), 276 deletions(-) diff --git a/go.mod b/go.mod index 196f56d8288a2..04394a92833ff 100644 --- a/go.mod +++ b/go.mod @@ -104,7 +104,7 @@ require ( golang.org/x/sys v0.29.0 golang.org/x/time v0.9.0 google.golang.org/api v0.218.0 - google.golang.org/grpc v1.69.4 + google.golang.org/grpc v1.70.0 gopkg.in/yaml.v2 v2.4.0 gopkg.in/yaml.v3 v3.0.1 k8s.io/klog/v2 v2.130.1 diff --git a/go.sum b/go.sum index d52d76c74db90..a3e2cc824fcc9 100644 --- a/go.sum +++ b/go.sum @@ -1638,8 +1638,8 @@ google.golang.org/grpc v1.31.0/go.mod h1:N36X2cJ7JwdamYAgDz+s+rVMFjt3numwzf/HckM google.golang.org/grpc v1.33.1/go.mod h1:fr5YgcSWrqhRRxogOsw7RzIpsmvOZ6IcH4kBYTpR3n0= google.golang.org/grpc v1.33.2/go.mod h1:JMHMWHQWaTccqQQlmk3MJZS+GWXOdAesneDmEnv2fbc= google.golang.org/grpc v1.38.0/go.mod h1:NREThFqKR1f3iQ6oBuvc5LadQuXVGo9rkm5ZGrQdJfM= -google.golang.org/grpc v1.69.4 h1:MF5TftSMkd8GLw/m0KM6V8CMOCY6NZ1NQDPGFgbTt4A= -google.golang.org/grpc v1.69.4/go.mod h1:vyjdE6jLBI76dgpDojsFGNaHlxdjXN9ghpnd2o7JGZ4= +google.golang.org/grpc v1.70.0 h1:pWFv03aZoHzlRKHWicjsZytKAiYCtNS0dHbXnIdq7jQ= +google.golang.org/grpc v1.70.0/go.mod 
h1:ofIJqVKDXx/JiXrwr2IG4/zwdH9txy3IlF40RmcJSQw= google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0= google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM= diff --git a/pkg/push/go.mod b/pkg/push/go.mod index e203ecd749939..ad4284a93e60c 100644 --- a/pkg/push/go.mod +++ b/pkg/push/go.mod @@ -7,7 +7,7 @@ toolchain go1.23.3 require ( github.com/gogo/protobuf v1.3.2 github.com/stretchr/testify v1.10.0 - google.golang.org/grpc v1.69.4 + google.golang.org/grpc v1.70.0 ) require ( @@ -18,7 +18,7 @@ require ( golang.org/x/sys v0.28.0 // indirect golang.org/x/text v0.21.0 // indirect google.golang.org/genproto v0.0.0-20230410155749-daa745c078e1 // indirect - google.golang.org/protobuf v1.35.1 // indirect + google.golang.org/protobuf v1.35.2 // indirect gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/pkg/push/go.sum b/pkg/push/go.sum index f10f9acddd930..97612efa542c2 100644 --- a/pkg/push/go.sum +++ b/pkg/push/go.sum @@ -27,16 +27,16 @@ github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOf github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= -go.opentelemetry.io/otel v1.31.0 h1:NsJcKPIW0D0H3NgzPDHmo0WW6SptzPdqg/L1zsIm2hY= -go.opentelemetry.io/otel v1.31.0/go.mod h1:O0C14Yl9FgkjqcCZAsE053C13OaddMYr/hz6clDkEJE= -go.opentelemetry.io/otel/metric v1.31.0 h1:FSErL0ATQAmYHUIzSezZibnyVlft1ybhy4ozRPcF2fE= -go.opentelemetry.io/otel/metric v1.31.0/go.mod h1:C3dEloVbLuYoX41KpmAhOqNriGbA+qqH6PQ5E5mUfnY= -go.opentelemetry.io/otel/sdk v1.31.0 h1:xLY3abVHYZ5HSfOg3l2E5LUj2Cwva5Y7yGxnSW9H5Gk= -go.opentelemetry.io/otel/sdk v1.31.0/go.mod h1:TfRbMdhvxIIr/B2N2LQW2S5v9m3gOQ/08KsbbO5BPT0= -go.opentelemetry.io/otel/sdk/metric v1.31.0 h1:i9hxxLJF/9kkvfHppyLL55aW7iIJz4JjxTeYusH7zMc= -go.opentelemetry.io/otel/sdk/metric v1.31.0/go.mod h1:CRInTMVvNhUKgSAMbKyTMxqOBC0zgyxzW55lZzX43Y8= -go.opentelemetry.io/otel/trace v1.31.0 h1:ffjsj1aRouKewfr85U2aGagJ46+MvodynlQ1HYdmJys= -go.opentelemetry.io/otel/trace v1.31.0/go.mod h1:TXZkRk7SM2ZQLtR6eoAWQFIHPvzQ06FJAsO1tJg480A= +go.opentelemetry.io/otel v1.32.0 h1:WnBN+Xjcteh0zdk01SVqV55d/m62NJLJdIyb4y/WO5U= +go.opentelemetry.io/otel v1.32.0/go.mod h1:00DCVSB0RQcnzlwyTfqtxSm+DRr9hpYrHjNGiBHVQIg= +go.opentelemetry.io/otel/metric v1.32.0 h1:xV2umtmNcThh2/a/aCP+h64Xx5wsj8qqnkYZktzNa0M= +go.opentelemetry.io/otel/metric v1.32.0/go.mod h1:jH7CIbbK6SH2V2wE16W05BHCtIDzauciCRLoc/SyMv8= +go.opentelemetry.io/otel/sdk v1.32.0 h1:RNxepc9vK59A8XsgZQouW8ue8Gkb4jpWtJm9ge5lEG4= +go.opentelemetry.io/otel/sdk v1.32.0/go.mod h1:LqgegDBjKMmb2GC6/PrTnteJG39I8/vJCAP9LlJXEjU= +go.opentelemetry.io/otel/sdk/metric v1.32.0 h1:rZvFnvmvawYb0alrYkjraqJq0Z4ZUJAiyYCU9snn1CU= +go.opentelemetry.io/otel/sdk/metric v1.32.0/go.mod h1:PWeZlq0zt9YkYAp3gjKZ0eicRYvOh1Gd+X99x6GHpCQ= +go.opentelemetry.io/otel/trace v1.32.0 h1:WIC9mYrXf8TmY/EXuULKc8hR17vE+Hjv2cssQDe03fM= +go.opentelemetry.io/otel/trace v1.32.0/go.mod h1:+i4rkvCraA+tG6AzwloGaCtkx53Fa+L+V8e9a7YvhT8= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto 
v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= @@ -70,10 +70,10 @@ golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8T golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= google.golang.org/genproto v0.0.0-20230410155749-daa745c078e1 h1:KpwkzHKEF7B9Zxg18WzOa7djJ+Ha5DzthMyZYQfEn2A= google.golang.org/genproto v0.0.0-20230410155749-daa745c078e1/go.mod h1:nKE/iIaLqn2bQwXBg8f1g2Ylh6r5MN5CmZvuzZCgsCU= -google.golang.org/grpc v1.69.4 h1:MF5TftSMkd8GLw/m0KM6V8CMOCY6NZ1NQDPGFgbTt4A= -google.golang.org/grpc v1.69.4/go.mod h1:vyjdE6jLBI76dgpDojsFGNaHlxdjXN9ghpnd2o7JGZ4= -google.golang.org/protobuf v1.35.1 h1:m3LfL6/Ca+fqnjnlqQXNpFPABW1UD7mjh8KO2mKFytA= -google.golang.org/protobuf v1.35.1/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE= +google.golang.org/grpc v1.70.0 h1:pWFv03aZoHzlRKHWicjsZytKAiYCtNS0dHbXnIdq7jQ= +google.golang.org/grpc v1.70.0/go.mod h1:ofIJqVKDXx/JiXrwr2IG4/zwdH9txy3IlF40RmcJSQw= +google.golang.org/protobuf v1.35.2 h1:8Ar7bF+apOIoThw1EdZl0p1oWvMqTHmpA2fRTyZO8io= +google.golang.org/protobuf v1.35.2/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= diff --git a/vendor/google.golang.org/grpc/balancer/endpointsharding/endpointsharding.go b/vendor/google.golang.org/grpc/balancer/endpointsharding/endpointsharding.go index 263c024a84c7e..9b59bfc1d9796 100644 --- a/vendor/google.golang.org/grpc/balancer/endpointsharding/endpointsharding.go +++ b/vendor/google.golang.org/grpc/balancer/endpointsharding/endpointsharding.go @@ -35,11 +35,9 @@ import ( "google.golang.org/grpc/balancer" "google.golang.org/grpc/balancer/base" - "google.golang.org/grpc/balancer/pickfirst" "google.golang.org/grpc/balancer/pickfirst/pickfirstleaf" "google.golang.org/grpc/connectivity" "google.golang.org/grpc/internal/balancer/gracefulswitch" - "google.golang.org/grpc/internal/envconfig" "google.golang.org/grpc/resolver" "google.golang.org/grpc/serviceconfig" ) @@ -48,11 +46,7 @@ import ( var PickFirstConfig string func init() { - name := pickfirst.Name - if !envconfig.NewPickFirstEnabled { - name = pickfirstleaf.Name - } - PickFirstConfig = fmt.Sprintf("[{%q: {}}]", name) + PickFirstConfig = fmt.Sprintf("[{%q: {}}]", pickfirstleaf.Name) } // ChildState is the balancer state of a child along with the endpoint which diff --git a/vendor/google.golang.org/grpc/balancer/grpclb/grpc_lb_v1/load_balancer.pb.go b/vendor/google.golang.org/grpc/balancer/grpclb/grpc_lb_v1/load_balancer.pb.go index 3f274482c74bf..86d495bb624f8 100644 --- a/vendor/google.golang.org/grpc/balancer/grpclb/grpc_lb_v1/load_balancer.pb.go +++ b/vendor/google.golang.org/grpc/balancer/grpclb/grpc_lb_v1/load_balancer.pb.go @@ -19,7 +19,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. 
// versions: -// protoc-gen-go v1.35.1 +// protoc-gen-go v1.35.2 // protoc v5.27.1 // source: grpc/lb/v1/load_balancer.proto diff --git a/vendor/google.golang.org/grpc/balancer/pickfirst/pickfirstleaf/pickfirstleaf.go b/vendor/google.golang.org/grpc/balancer/pickfirst/pickfirstleaf/pickfirstleaf.go index 2fc0a71f9441e..76fa5fea95f2b 100644 --- a/vendor/google.golang.org/grpc/balancer/pickfirst/pickfirstleaf/pickfirstleaf.go +++ b/vendor/google.golang.org/grpc/balancer/pickfirst/pickfirstleaf/pickfirstleaf.go @@ -54,9 +54,18 @@ func init() { balancer.Register(pickfirstBuilder{}) } -// enableHealthListenerKeyType is a unique key type used in resolver attributes -// to indicate whether the health listener usage is enabled. -type enableHealthListenerKeyType struct{} +type ( + // enableHealthListenerKeyType is a unique key type used in resolver + // attributes to indicate whether the health listener usage is enabled. + enableHealthListenerKeyType struct{} + // managedByPickfirstKeyType is an attribute key type to inform Outlier + // Detection that the generic health listener is being used. + // TODO: https://github.com/grpc/grpc-go/issues/7915 - Remove this when + // implementing the dualstack design. This is a hack. Once Dualstack is + // completed, outlier detection will stop sending ejection updates through + // the connectivity listener. + managedByPickfirstKeyType struct{} +) var ( logger = grpclog.Component("pick-first-leaf-lb") @@ -140,6 +149,17 @@ func EnableHealthListener(state resolver.State) resolver.State { return state } +// IsManagedByPickfirst returns whether an address belongs to a SubConn +// managed by the pickfirst LB policy. +// TODO: https://github.com/grpc/grpc-go/issues/7915 - This is a hack to disable +// outlier_detection via the connectivity listener when using pick_first. +// Once Dualstack changes are complete, all SubConns will be created by +// pick_first and outlier detection will only use the health listener for +// ejection. This hack can then be removed.
+func IsManagedByPickfirst(addr resolver.Address) bool { + return addr.BalancerAttributes.Value(managedByPickfirstKeyType{}) != nil +} + type pfConfig struct { serviceconfig.LoadBalancingConfig `json:"-"` @@ -166,6 +186,7 @@ type scData struct { } func (b *pickfirstBalancer) newSCData(addr resolver.Address) (*scData, error) { + addr.BalancerAttributes = addr.BalancerAttributes.WithValue(managedByPickfirstKeyType{}, true) sd := &scData{ rawConnectivityState: connectivity.Idle, effectiveState: connectivity.Idle, diff --git a/vendor/google.golang.org/grpc/balancer/weightedroundrobin/balancer.go b/vendor/google.golang.org/grpc/balancer/weightedroundrobin/balancer.go index c9c5b576bb0c8..d7b9dc4666ee4 100644 --- a/vendor/google.golang.org/grpc/balancer/weightedroundrobin/balancer.go +++ b/vendor/google.golang.org/grpc/balancer/weightedroundrobin/balancer.go @@ -29,6 +29,7 @@ import ( "google.golang.org/grpc/balancer" "google.golang.org/grpc/balancer/endpointsharding" + "google.golang.org/grpc/balancer/pickfirst/pickfirstleaf" "google.golang.org/grpc/balancer/weightedroundrobin/internal" "google.golang.org/grpc/balancer/weightedtarget" "google.golang.org/grpc/connectivity" @@ -218,7 +219,9 @@ type wrrBalancer struct { } func (b *wrrBalancer) UpdateClientConnState(ccs balancer.ClientConnState) error { - b.logger.Infof("UpdateCCS: %v", ccs) + if b.logger.V(2) { + b.logger.Infof("UpdateCCS: %v", ccs) + } cfg, ok := ccs.BalancerConfig.(*lbConfig) if !ok { return fmt.Errorf("wrr: received nil or illegal BalancerConfig (type %T): %v", ccs.BalancerConfig, ccs.BalancerConfig) @@ -232,6 +235,9 @@ func (b *wrrBalancer) UpdateClientConnState(ccs balancer.ClientConnState) error b.updateEndpointsLocked(ccs.ResolverState.Endpoints) b.mu.Unlock() + // Make pickfirst children use health listeners for outlier detection to + // work. + ccs.ResolverState = pickfirstleaf.EnableHealthListener(ccs.ResolverState) // This causes child to update picker inline and will thus cause inline // picker update. return b.child.UpdateClientConnState(balancer.ClientConnState{ diff --git a/vendor/google.golang.org/grpc/balancer/weightedroundrobin/weightedroundrobin.go b/vendor/google.golang.org/grpc/balancer/weightedroundrobin/weightedroundrobin.go index 8741fdad19dcd..258cdd5db280f 100644 --- a/vendor/google.golang.org/grpc/balancer/weightedroundrobin/weightedroundrobin.go +++ b/vendor/google.golang.org/grpc/balancer/weightedroundrobin/weightedroundrobin.go @@ -56,6 +56,13 @@ func SetAddrInfo(addr resolver.Address, addrInfo AddrInfo) resolver.Address { return addr } +// SetAddrInfoInEndpoint returns a copy of endpoint in which the Attributes +// field is updated with addrInfo. +func SetAddrInfoInEndpoint(endpoint resolver.Endpoint, addrInfo AddrInfo) resolver.Endpoint { + endpoint.Attributes = endpoint.Attributes.WithValue(attributeKey{}, addrInfo) + return endpoint +} + // GetAddrInfo returns the AddrInfo stored in the BalancerAttributes field of // addr. 
func GetAddrInfo(addr resolver.Address) AddrInfo { diff --git a/vendor/google.golang.org/grpc/balancer_wrapper.go b/vendor/google.golang.org/grpc/balancer_wrapper.go index 905817b5fc7b6..c2688376ae74d 100644 --- a/vendor/google.golang.org/grpc/balancer_wrapper.go +++ b/vendor/google.golang.org/grpc/balancer_wrapper.go @@ -34,7 +34,15 @@ import ( "google.golang.org/grpc/status" ) -var setConnectedAddress = internal.SetConnectedAddress.(func(*balancer.SubConnState, resolver.Address)) +var ( + setConnectedAddress = internal.SetConnectedAddress.(func(*balancer.SubConnState, resolver.Address)) + // noOpRegisterHealthListenerFn is used when client side health checking is + // disabled. It sends a single READY update on the registered listener. + noOpRegisterHealthListenerFn = func(_ context.Context, listener func(balancer.SubConnState)) func() { + listener(balancer.SubConnState{ConnectivityState: connectivity.Ready}) + return func() {} + } +) // ccBalancerWrapper sits between the ClientConn and the Balancer. // @@ -277,10 +285,17 @@ type healthData struct { // to the LB policy. This is stored to avoid sending updates when the // SubConn has already exited connectivity state READY. connectivityState connectivity.State + // closeHealthProducer stores function to close the ref counted health + // producer. The health producer is automatically closed when the SubConn + // state changes. + closeHealthProducer func() } func newHealthData(s connectivity.State) *healthData { - return &healthData{connectivityState: s} + return &healthData{ + connectivityState: s, + closeHealthProducer: func() {}, + } } // updateState is invoked by grpc to push a subConn state update to the @@ -413,6 +428,37 @@ func (acbw *acBalancerWrapper) closeProducers() { } } +// healthProducerRegisterFn is a type alias for the health producer's function +// for registering listeners. +type healthProducerRegisterFn = func(context.Context, balancer.SubConn, string, func(balancer.SubConnState)) func() + +// healthListenerRegFn returns a function to register a listener for health +// updates. If client side health checks are disabled, the registered listener +// will get a single READY (raw connectivity state) update. +// +// Client side health checking is enabled when all the following +// conditions are satisfied: +// 1. Health checking is not disabled using the dial option. +// 2. The health package is imported. +// 3. The health check config is present in the service config. +func (acbw *acBalancerWrapper) healthListenerRegFn() func(context.Context, func(balancer.SubConnState)) func() { + if acbw.ccb.cc.dopts.disableHealthCheck { + return noOpRegisterHealthListenerFn + } + regHealthLisFn := internal.RegisterClientHealthCheckListener + if regHealthLisFn == nil { + // The health package is not imported. + return noOpRegisterHealthListenerFn + } + cfg := acbw.ac.cc.healthCheckConfig() + if cfg == nil { + return noOpRegisterHealthListenerFn + } + return func(ctx context.Context, listener func(balancer.SubConnState)) func() { + return regHealthLisFn.(healthProducerRegisterFn)(ctx, acbw, cfg.ServiceName, listener) + } +} + // RegisterHealthListener accepts a health listener from the LB policy. It sends // updates to the health listener as long as the SubConn's connectivity state // doesn't change and a new health listener is not registered. 
To invalidate @@ -421,6 +467,7 @@ func (acbw *acBalancerWrapper) closeProducers() { func (acbw *acBalancerWrapper) RegisterHealthListener(listener func(balancer.SubConnState)) { acbw.healthMu.Lock() defer acbw.healthMu.Unlock() + acbw.healthData.closeHealthProducer() // listeners should not be registered when the connectivity state // isn't Ready. This may happen when the balancer registers a listener // after the connectivityState is updated, but before it is notified @@ -436,6 +483,7 @@ func (acbw *acBalancerWrapper) RegisterHealthListener(listener func(balancer.Sub return } + registerFn := acbw.healthListenerRegFn() acbw.ccb.serializer.TrySchedule(func(ctx context.Context) { if ctx.Err() != nil || acbw.ccb.balancer == nil { return @@ -443,10 +491,25 @@ func (acbw *acBalancerWrapper) RegisterHealthListener(listener func(balancer.Sub // Don't send updates if a new listener is registered. acbw.healthMu.Lock() defer acbw.healthMu.Unlock() - curHD := acbw.healthData - if curHD != hd { + if acbw.healthData != hd { return } - listener(balancer.SubConnState{ConnectivityState: connectivity.Ready}) + // Serialize the health updates from the health producer with + // other calls into the LB policy. + listenerWrapper := func(scs balancer.SubConnState) { + acbw.ccb.serializer.TrySchedule(func(ctx context.Context) { + if ctx.Err() != nil || acbw.ccb.balancer == nil { + return + } + acbw.healthMu.Lock() + defer acbw.healthMu.Unlock() + if acbw.healthData != hd { + return + } + listener(scs) + }) + } + + hd.closeHealthProducer = registerFn(ctx, listenerWrapper) }) } diff --git a/vendor/google.golang.org/grpc/binarylog/grpc_binarylog_v1/binarylog.pb.go b/vendor/google.golang.org/grpc/binarylog/grpc_binarylog_v1/binarylog.pb.go index 9e9d0806995c3..21dd72969aee3 100644 --- a/vendor/google.golang.org/grpc/binarylog/grpc_binarylog_v1/binarylog.pb.go +++ b/vendor/google.golang.org/grpc/binarylog/grpc_binarylog_v1/binarylog.pb.go @@ -18,7 +18,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.35.1 +// protoc-gen-go v1.35.2 // protoc v5.27.1 // source: grpc/binlog/v1/binarylog.proto diff --git a/vendor/google.golang.org/grpc/credentials/alts/internal/proto/grpc_gcp/altscontext.pb.go b/vendor/google.golang.org/grpc/credentials/alts/internal/proto/grpc_gcp/altscontext.pb.go index 83d23f65aa542..40e42b6ae5827 100644 --- a/vendor/google.golang.org/grpc/credentials/alts/internal/proto/grpc_gcp/altscontext.pb.go +++ b/vendor/google.golang.org/grpc/credentials/alts/internal/proto/grpc_gcp/altscontext.pb.go @@ -17,7 +17,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.35.1 +// protoc-gen-go v1.35.2 // protoc v5.27.1 // source: grpc/gcp/altscontext.proto diff --git a/vendor/google.golang.org/grpc/credentials/alts/internal/proto/grpc_gcp/handshaker.pb.go b/vendor/google.golang.org/grpc/credentials/alts/internal/proto/grpc_gcp/handshaker.pb.go index 915b36df82146..2993bbfab15c9 100644 --- a/vendor/google.golang.org/grpc/credentials/alts/internal/proto/grpc_gcp/handshaker.pb.go +++ b/vendor/google.golang.org/grpc/credentials/alts/internal/proto/grpc_gcp/handshaker.pb.go @@ -17,7 +17,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. 
// versions: -// protoc-gen-go v1.35.1 +// protoc-gen-go v1.35.2 // protoc v5.27.1 // source: grpc/gcp/handshaker.proto diff --git a/vendor/google.golang.org/grpc/credentials/alts/internal/proto/grpc_gcp/transport_security_common.pb.go b/vendor/google.golang.org/grpc/credentials/alts/internal/proto/grpc_gcp/transport_security_common.pb.go index e9676db4b52a8..a8d5c4857b806 100644 --- a/vendor/google.golang.org/grpc/credentials/alts/internal/proto/grpc_gcp/transport_security_common.pb.go +++ b/vendor/google.golang.org/grpc/credentials/alts/internal/proto/grpc_gcp/transport_security_common.pb.go @@ -17,7 +17,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.35.1 +// protoc-gen-go v1.35.2 // protoc v5.27.1 // source: grpc/gcp/transport_security_common.proto diff --git a/vendor/google.golang.org/grpc/credentials/tls.go b/vendor/google.golang.org/grpc/credentials/tls.go index e163a473df93e..bd5fe22b6af6a 100644 --- a/vendor/google.golang.org/grpc/credentials/tls.go +++ b/vendor/google.golang.org/grpc/credentials/tls.go @@ -32,6 +32,8 @@ import ( "google.golang.org/grpc/internal/envconfig" ) +const alpnFailureHelpMessage = "If you upgraded from a grpc-go version earlier than 1.67, your TLS connections may have stopped working due to ALPN enforcement. For more details, see: https://github.com/grpc/grpc-go/issues/434" + var logger = grpclog.Component("credentials") // TLSInfo contains the auth information for a TLS authenticated connection. @@ -128,7 +130,7 @@ func (c *tlsCreds) ClientHandshake(ctx context.Context, authority string, rawCon if np == "" { if envconfig.EnforceALPNEnabled { conn.Close() - return nil, nil, fmt.Errorf("credentials: cannot check peer: missing selected ALPN property") + return nil, nil, fmt.Errorf("credentials: cannot check peer: missing selected ALPN property. %s", alpnFailureHelpMessage) } logger.Warningf("Allowing TLS connection to server %q with ALPN disabled. TLS connections to servers with ALPN disabled will be disallowed in future grpc-go releases", cfg.ServerName) } @@ -158,7 +160,7 @@ func (c *tlsCreds) ServerHandshake(rawConn net.Conn) (net.Conn, AuthInfo, error) if cs.NegotiatedProtocol == "" { if envconfig.EnforceALPNEnabled { conn.Close() - return nil, nil, fmt.Errorf("credentials: cannot check peer: missing selected ALPN property") + return nil, nil, fmt.Errorf("credentials: cannot check peer: missing selected ALPN property. %s", alpnFailureHelpMessage) } else if logger.V(2) { logger.Info("Allowing TLS connection from client with ALPN disabled. TLS connections with ALPN disabled will be disallowed in future grpc-go releases") } diff --git a/vendor/google.golang.org/grpc/dialoptions.go b/vendor/google.golang.org/grpc/dialoptions.go index 7494ae591f164..f3a045296a46d 100644 --- a/vendor/google.golang.org/grpc/dialoptions.go +++ b/vendor/google.golang.org/grpc/dialoptions.go @@ -428,6 +428,11 @@ func WithTimeout(d time.Duration) DialOption { // returned by f, gRPC checks the error's Temporary() method to decide if it // should try to reconnect to the network address. // +// Note that gRPC by default performs name resolution on the target passed to +// NewClient. To bypass name resolution and cause the target string to be +// passed directly to the dialer here instead, use the "passthrough" resolver +// by specifying it in the target string, e.g. "passthrough:target". +// // Note: All supported releases of Go (as of December 2023) override the OS // defaults for TCP keepalive time and interval to 15s. 
To enable TCP keepalive // with OS defaults for keepalive time and interval, use a net.Dialer that sets diff --git a/vendor/google.golang.org/grpc/health/grpc_health_v1/health.pb.go b/vendor/google.golang.org/grpc/health/grpc_health_v1/health.pb.go index 26e16d91924f2..467de16bdbcdc 100644 --- a/vendor/google.golang.org/grpc/health/grpc_health_v1/health.pb.go +++ b/vendor/google.golang.org/grpc/health/grpc_health_v1/health.pb.go @@ -17,7 +17,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.35.1 +// protoc-gen-go v1.35.2 // protoc v5.27.1 // source: grpc/health/v1/health.proto diff --git a/vendor/google.golang.org/grpc/internal/envconfig/envconfig.go b/vendor/google.golang.org/grpc/internal/envconfig/envconfig.go index 6e7dd6b772709..1e42b6fdc8722 100644 --- a/vendor/google.golang.org/grpc/internal/envconfig/envconfig.go +++ b/vendor/google.golang.org/grpc/internal/envconfig/envconfig.go @@ -49,7 +49,7 @@ var ( // XDSFallbackSupport is the env variable that controls whether support for // xDS fallback is turned on. If this is unset or is false, only the first // xDS server in the list of server configs will be used. - XDSFallbackSupport = boolFromEnv("GRPC_EXPERIMENTAL_XDS_FALLBACK", false) + XDSFallbackSupport = boolFromEnv("GRPC_EXPERIMENTAL_XDS_FALLBACK", true) // NewPickFirstEnabled is set if the new pickfirst leaf policy is to be used // instead of the exiting pickfirst implementation. This can be enabled by // setting the environment variable "GRPC_EXPERIMENTAL_ENABLE_NEW_PICK_FIRST" diff --git a/vendor/google.golang.org/grpc/internal/envconfig/xds.go b/vendor/google.golang.org/grpc/internal/envconfig/xds.go index 29f234acb1b9d..9afeb444d4536 100644 --- a/vendor/google.golang.org/grpc/internal/envconfig/xds.go +++ b/vendor/google.golang.org/grpc/internal/envconfig/xds.go @@ -53,4 +53,10 @@ var ( // C2PResolverTestOnlyTrafficDirectorURI is the TD URI for testing. C2PResolverTestOnlyTrafficDirectorURI = os.Getenv("GRPC_TEST_ONLY_GOOGLE_C2P_RESOLVER_TRAFFIC_DIRECTOR_URI") + + // XDSDualstackEndpointsEnabled is true if gRPC should read the + // "additional addresses" in the xDS endpoint resource. + // TODO: https://github.com/grpc/grpc-go/issues/7866 - Control this using + // an env variable when all LB policies handle endpoints. + XDSDualstackEndpointsEnabled = false ) diff --git a/vendor/google.golang.org/grpc/internal/internal.go b/vendor/google.golang.org/grpc/internal/internal.go index 3afc1813440e7..c17b98194b3c7 100644 --- a/vendor/google.golang.org/grpc/internal/internal.go +++ b/vendor/google.golang.org/grpc/internal/internal.go @@ -31,6 +31,10 @@ import ( var ( // HealthCheckFunc is used to provide client-side LB channel health checking HealthCheckFunc HealthChecker + // RegisterClientHealthCheckListener is used to provide a listener for + // updates from the client-side health checking service. It returns a + // function that can be called to stop the health producer. + RegisterClientHealthCheckListener any // func(ctx context.Context, sc balancer.SubConn, serviceName string, listener func(balancer.SubConnState)) func() // BalancerUnregister is exported by package balancer to unregister a balancer. BalancerUnregister func(name string) // KeepaliveMinPingTime is the minimum ping interval. 
This must be 10s by diff --git a/vendor/google.golang.org/grpc/internal/proto/grpc_lookup_v1/rls.pb.go b/vendor/google.golang.org/grpc/internal/proto/grpc_lookup_v1/rls.pb.go index 14185ca35a0ca..22731029f5f3c 100644 --- a/vendor/google.golang.org/grpc/internal/proto/grpc_lookup_v1/rls.pb.go +++ b/vendor/google.golang.org/grpc/internal/proto/grpc_lookup_v1/rls.pb.go @@ -14,7 +14,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.35.1 +// protoc-gen-go v1.35.2 // protoc v5.27.1 // source: grpc/lookup/v1/rls.proto diff --git a/vendor/google.golang.org/grpc/internal/proto/grpc_lookup_v1/rls_config.pb.go b/vendor/google.golang.org/grpc/internal/proto/grpc_lookup_v1/rls_config.pb.go index 1549a7aa13a37..73b70c25ea398 100644 --- a/vendor/google.golang.org/grpc/internal/proto/grpc_lookup_v1/rls_config.pb.go +++ b/vendor/google.golang.org/grpc/internal/proto/grpc_lookup_v1/rls_config.pb.go @@ -14,7 +14,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.35.1 +// protoc-gen-go v1.35.2 // protoc v5.27.1 // source: grpc/lookup/v1/rls_config.proto diff --git a/vendor/google.golang.org/grpc/internal/transport/handler_server.go b/vendor/google.golang.org/grpc/internal/transport/handler_server.go index d9305a65d88f7..3dea235735188 100644 --- a/vendor/google.golang.org/grpc/internal/transport/handler_server.go +++ b/vendor/google.golang.org/grpc/internal/transport/handler_server.go @@ -498,5 +498,5 @@ func mapRecvMsgError(err error) error { if strings.Contains(err.Error(), "body closed by handler") { return status.Error(codes.Canceled, err.Error()) } - return connectionErrorf(true, err, err.Error()) + return connectionErrorf(true, err, "%s", err.Error()) } diff --git a/vendor/google.golang.org/grpc/internal/transport/http2_server.go b/vendor/google.golang.org/grpc/internal/transport/http2_server.go index 0055fddd7ecf2..997b0a59b586b 100644 --- a/vendor/google.golang.org/grpc/internal/transport/http2_server.go +++ b/vendor/google.golang.org/grpc/internal/transport/http2_server.go @@ -564,7 +564,7 @@ func (t *http2Server) operateHeaders(ctx context.Context, frame *http2.MetaHeade t.logger.Infof("Aborting the stream early: %v", errMsg) } t.controlBuf.put(&earlyAbortStream{ - httpStatus: 405, + httpStatus: http.StatusMethodNotAllowed, streamID: streamID, contentSubtype: s.contentSubtype, status: status.New(codes.Internal, errMsg), @@ -585,7 +585,7 @@ func (t *http2Server) operateHeaders(ctx context.Context, frame *http2.MetaHeade stat = status.New(codes.PermissionDenied, err.Error()) } t.controlBuf.put(&earlyAbortStream{ - httpStatus: 200, + httpStatus: http.StatusOK, streamID: s.id, contentSubtype: s.contentSubtype, status: stat, diff --git a/vendor/google.golang.org/grpc/server.go b/vendor/google.golang.org/grpc/server.go index 16065a027ae81..9d5b2884d14ee 100644 --- a/vendor/google.golang.org/grpc/server.go +++ b/vendor/google.golang.org/grpc/server.go @@ -1360,8 +1360,16 @@ func (s *Server) processUnaryRPC(ctx context.Context, stream *transport.ServerSt } return err } - defer d.Free() + freed := false + dataFree := func() { + if !freed { + d.Free() + freed = true + } + } + defer dataFree() df := func(v any) error { + defer dataFree() if err := s.getCodec(stream.ContentSubtype()).Unmarshal(d, v); err != nil { return status.Errorf(codes.Internal, "grpc: error unmarshalling request: %v", err) } diff --git a/vendor/google.golang.org/grpc/service_config.go b/vendor/google.golang.org/grpc/service_config.go index 
7e83027d1994c..8d451e07c7cc0 100644 --- a/vendor/google.golang.org/grpc/service_config.go +++ b/vendor/google.golang.org/grpc/service_config.go @@ -268,18 +268,21 @@ func parseServiceConfig(js string, maxAttempts int) *serviceconfig.ParseResult { return &serviceconfig.ParseResult{Config: &sc} } +func isValidRetryPolicy(jrp *jsonRetryPolicy) bool { + return jrp.MaxAttempts > 1 && + jrp.InitialBackoff > 0 && + jrp.MaxBackoff > 0 && + jrp.BackoffMultiplier > 0 && + len(jrp.RetryableStatusCodes) > 0 +} + func convertRetryPolicy(jrp *jsonRetryPolicy, maxAttempts int) (p *internalserviceconfig.RetryPolicy, err error) { if jrp == nil { return nil, nil } - if jrp.MaxAttempts <= 1 || - jrp.InitialBackoff <= 0 || - jrp.MaxBackoff <= 0 || - jrp.BackoffMultiplier <= 0 || - len(jrp.RetryableStatusCodes) == 0 { - logger.Warningf("grpc: ignoring retry policy %v due to illegal configuration", jrp) - return nil, nil + if !isValidRetryPolicy(jrp) { + return nil, fmt.Errorf("invalid retry policy (%+v): ", jrp) } if jrp.MaxAttempts < maxAttempts { diff --git a/vendor/google.golang.org/grpc/stream.go b/vendor/google.golang.org/grpc/stream.go index 17e2267b33201..54adbbced7a65 100644 --- a/vendor/google.golang.org/grpc/stream.go +++ b/vendor/google.golang.org/grpc/stream.go @@ -1766,7 +1766,7 @@ func (ss *serverStream) RecvMsg(m any) (err error) { return err } if err == io.ErrUnexpectedEOF { - err = status.Errorf(codes.Internal, io.ErrUnexpectedEOF.Error()) + err = status.Error(codes.Internal, io.ErrUnexpectedEOF.Error()) } return toRPCErr(err) } diff --git a/vendor/google.golang.org/grpc/version.go b/vendor/google.golang.org/grpc/version.go index d2bba7f3d9ecc..0e03fa4d4f7e8 100644 --- a/vendor/google.golang.org/grpc/version.go +++ b/vendor/google.golang.org/grpc/version.go @@ -19,4 +19,4 @@ package grpc // Version is the current grpc version. 
-const Version = "1.69.4" +const Version = "1.70.0" diff --git a/vendor/google.golang.org/grpc/xds/internal/balancer/clusterimpl/picker.go b/vendor/google.golang.org/grpc/xds/internal/balancer/clusterimpl/picker.go index dd4d39b3d398e..cd94182fa7175 100644 --- a/vendor/google.golang.org/grpc/xds/internal/balancer/clusterimpl/picker.go +++ b/vendor/google.golang.org/grpc/xds/internal/balancer/clusterimpl/picker.go @@ -129,7 +129,7 @@ func (d *picker) Pick(info balancer.PickInfo) (balancer.PickResult, error) { if d.loadStore != nil { d.loadStore.CallDropped("") } - return balancer.PickResult{}, status.Errorf(codes.Unavailable, err.Error()) + return balancer.PickResult{}, status.Error(codes.Unavailable, err.Error()) } } diff --git a/vendor/google.golang.org/grpc/xds/internal/balancer/clusterresolver/clusterresolver.go b/vendor/google.golang.org/grpc/xds/internal/balancer/clusterresolver/clusterresolver.go index ae2c5fe957a26..f0a8905d374b2 100644 --- a/vendor/google.golang.org/grpc/xds/internal/balancer/clusterresolver/clusterresolver.go +++ b/vendor/google.golang.org/grpc/xds/internal/balancer/clusterresolver/clusterresolver.go @@ -234,7 +234,7 @@ func (b *clusterResolverBalancer) updateChildConfig() { b.child = newChildBalancer(b.priorityBuilder, b.cc, b.bOpts) } - childCfgBytes, addrs, err := buildPriorityConfigJSON(b.priorities, &b.config.xdsLBPolicy) + childCfgBytes, endpoints, err := buildPriorityConfigJSON(b.priorities, &b.config.xdsLBPolicy) if err != nil { b.logger.Warningf("Failed to build child policy config: %v", err) return @@ -248,15 +248,33 @@ func (b *clusterResolverBalancer) updateChildConfig() { b.logger.Infof("Built child policy config: %s", pretty.ToJSON(childCfg)) } - endpoints := make([]resolver.Endpoint, len(addrs)) - for i, a := range addrs { - endpoints[i].Attributes = a.BalancerAttributes - endpoints[i].Addresses = []resolver.Address{a} + flattenedAddrs := make([]resolver.Address, len(endpoints)) + for i := range endpoints { + for j := range endpoints[i].Addresses { + addr := endpoints[i].Addresses[j] + addr.BalancerAttributes = endpoints[i].Attributes + // If the endpoint has multiple addresses, only the first is added + // to the flattened address list. This ensures that LB policies + // that don't support endpoints create only one subchannel to a + // backend. + if j == 0 { + flattenedAddrs[i] = addr + } + // BalancerAttributes need to be present in endpoint addresses. This + // temporary workaround is required to make load reporting work + // with the old pickfirst policy which creates SubConns with multiple + // addresses. Since the addresses can be from different localities, + // an Address.BalancerAttribute is used to identify the locality of the + // address used by the transport. This workaround can be removed once + // the old pickfirst is removed. 
+ // See https://github.com/grpc/grpc-go/issues/7339 + endpoints[i].Addresses[j] = addr + } } if err := b.child.UpdateClientConnState(balancer.ClientConnState{ ResolverState: resolver.State{ Endpoints: endpoints, - Addresses: addrs, + Addresses: flattenedAddrs, ServiceConfig: b.configRaw, Attributes: b.attrsWithClient, }, diff --git a/vendor/google.golang.org/grpc/xds/internal/balancer/clusterresolver/configbuilder.go b/vendor/google.golang.org/grpc/xds/internal/balancer/clusterresolver/configbuilder.go index f62b8e6c8eb59..9a3a71c2e5cc1 100644 --- a/vendor/google.golang.org/grpc/xds/internal/balancer/clusterresolver/configbuilder.go +++ b/vendor/google.golang.org/grpc/xds/internal/balancer/clusterresolver/configbuilder.go @@ -48,8 +48,8 @@ type priorityConfig struct { mechanism DiscoveryMechanism // edsResp is set only if type is EDS. edsResp xdsresource.EndpointsUpdate - // addresses is set only if type is DNS. - addresses []string + // endpoints is set only if type is DNS. + endpoints []resolver.Endpoint // Each discovery mechanism has a name generator so that the child policies // can reuse names between updates (EDS updates for example). childNameGen *nameGenerator @@ -71,8 +71,8 @@ type priorityConfig struct { // ┌──────▼─────┐ ┌─────▼──────┐ // │xDSLBPolicy │ │xDSLBPolicy │ (Locality and Endpoint picking layer) // └────────────┘ └────────────┘ -func buildPriorityConfigJSON(priorities []priorityConfig, xdsLBPolicy *internalserviceconfig.BalancerConfig) ([]byte, []resolver.Address, error) { - pc, addrs, err := buildPriorityConfig(priorities, xdsLBPolicy) +func buildPriorityConfigJSON(priorities []priorityConfig, xdsLBPolicy *internalserviceconfig.BalancerConfig) ([]byte, []resolver.Endpoint, error) { + pc, endpoints, err := buildPriorityConfig(priorities, xdsLBPolicy) if err != nil { return nil, nil, fmt.Errorf("failed to build priority config: %v", err) } @@ -80,23 +80,23 @@ func buildPriorityConfigJSON(priorities []priorityConfig, xdsLBPolicy *internals if err != nil { return nil, nil, fmt.Errorf("failed to marshal built priority config struct into json: %v", err) } - return ret, addrs, nil + return ret, endpoints, nil } -func buildPriorityConfig(priorities []priorityConfig, xdsLBPolicy *internalserviceconfig.BalancerConfig) (*priority.LBConfig, []resolver.Address, error) { +func buildPriorityConfig(priorities []priorityConfig, xdsLBPolicy *internalserviceconfig.BalancerConfig) (*priority.LBConfig, []resolver.Endpoint, error) { var ( - retConfig = &priority.LBConfig{Children: make(map[string]*priority.Child)} - retAddrs []resolver.Address + retConfig = &priority.LBConfig{Children: make(map[string]*priority.Child)} + retEndpoints []resolver.Endpoint ) for _, p := range priorities { switch p.mechanism.Type { case DiscoveryMechanismTypeEDS: - names, configs, addrs, err := buildClusterImplConfigForEDS(p.childNameGen, p.edsResp, p.mechanism, xdsLBPolicy) + names, configs, endpoints, err := buildClusterImplConfigForEDS(p.childNameGen, p.edsResp, p.mechanism, xdsLBPolicy) if err != nil { return nil, nil, err } retConfig.Priorities = append(retConfig.Priorities, names...) - retAddrs = append(retAddrs, addrs...) + retEndpoints = append(retEndpoints, endpoints...) 
odCfgs := convertClusterImplMapToOutlierDetection(configs, p.mechanism.outlierDetection) for n, c := range odCfgs { retConfig.Children[n] = &priority.Child{ @@ -107,9 +107,9 @@ func buildPriorityConfig(priorities []priorityConfig, xdsLBPolicy *internalservi } continue case DiscoveryMechanismTypeLogicalDNS: - name, config, addrs := buildClusterImplConfigForDNS(p.childNameGen, p.addresses, p.mechanism) + name, config, endpoints := buildClusterImplConfigForDNS(p.childNameGen, p.endpoints, p.mechanism) retConfig.Priorities = append(retConfig.Priorities, name) - retAddrs = append(retAddrs, addrs...) + retEndpoints = append(retEndpoints, endpoints...) odCfg := makeClusterImplOutlierDetectionChild(config, p.mechanism.outlierDetection) retConfig.Children[name] = &priority.Child{ Config: &internalserviceconfig.BalancerConfig{Name: outlierdetection.Name, Config: odCfg}, @@ -120,7 +120,7 @@ func buildPriorityConfig(priorities []priorityConfig, xdsLBPolicy *internalservi continue } } - return retConfig, retAddrs, nil + return retConfig, retEndpoints, nil } func convertClusterImplMapToOutlierDetection(ciCfgs map[string]*clusterimpl.LBConfig, odCfg outlierdetection.LBConfig) map[string]*outlierdetection.LBConfig { @@ -137,19 +137,22 @@ func makeClusterImplOutlierDetectionChild(ciCfg *clusterimpl.LBConfig, odCfg out return &odCfgRet } -func buildClusterImplConfigForDNS(g *nameGenerator, addrStrs []string, mechanism DiscoveryMechanism) (string, *clusterimpl.LBConfig, []resolver.Address) { +func buildClusterImplConfigForDNS(g *nameGenerator, endpoints []resolver.Endpoint, mechanism DiscoveryMechanism) (string, *clusterimpl.LBConfig, []resolver.Endpoint) { // Endpoint picking policy for DNS is hardcoded to pick_first. const childPolicy = "pick_first" - retAddrs := make([]resolver.Address, 0, len(addrStrs)) + retEndpoints := make([]resolver.Endpoint, len(endpoints)) pName := fmt.Sprintf("priority-%v", g.prefix) - for _, addrStr := range addrStrs { - retAddrs = append(retAddrs, hierarchy.Set(resolver.Address{Addr: addrStr}, []string{pName})) + for i, e := range endpoints { + retEndpoints[i] = hierarchy.SetInEndpoint(e, []string{pName}) + // Copy the nested address field as slice fields are shared by the + // iteration variable and the original slice. + retEndpoints[i].Addresses = append([]resolver.Address{}, e.Addresses...) 
} return pName, &clusterimpl.LBConfig{ Cluster: mechanism.Cluster, TelemetryLabels: mechanism.TelemetryLabels, ChildPolicy: &internalserviceconfig.BalancerConfig{Name: childPolicy}, - }, retAddrs + }, retEndpoints } // buildClusterImplConfigForEDS returns a list of cluster_impl configs, one for @@ -161,7 +164,7 @@ func buildClusterImplConfigForDNS(g *nameGenerator, addrStrs []string, mechanism // - map{"p0":p0_config, "p1":p1_config} // - [p0_address_0, p0_address_1, p1_address_0, p1_address_1] // - p0 addresses' hierarchy attributes are set to p0 -func buildClusterImplConfigForEDS(g *nameGenerator, edsResp xdsresource.EndpointsUpdate, mechanism DiscoveryMechanism, xdsLBPolicy *internalserviceconfig.BalancerConfig) ([]string, map[string]*clusterimpl.LBConfig, []resolver.Address, error) { +func buildClusterImplConfigForEDS(g *nameGenerator, edsResp xdsresource.EndpointsUpdate, mechanism DiscoveryMechanism, xdsLBPolicy *internalserviceconfig.BalancerConfig) ([]string, map[string]*clusterimpl.LBConfig, []resolver.Endpoint, error) { drops := make([]clusterimpl.DropConfig, 0, len(edsResp.Drops)) for _, d := range edsResp.Drops { drops = append(drops, clusterimpl.DropConfig{ @@ -183,17 +186,17 @@ func buildClusterImplConfigForEDS(g *nameGenerator, edsResp xdsresource.Endpoint } retNames := g.generate(priorities) retConfigs := make(map[string]*clusterimpl.LBConfig, len(retNames)) - var retAddrs []resolver.Address + var retEndpoints []resolver.Endpoint for i, pName := range retNames { priorityLocalities := priorities[i] - cfg, addrs, err := priorityLocalitiesToClusterImpl(priorityLocalities, pName, mechanism, drops, xdsLBPolicy) + cfg, endpoints, err := priorityLocalitiesToClusterImpl(priorityLocalities, pName, mechanism, drops, xdsLBPolicy) if err != nil { return nil, nil, nil, err } retConfigs[pName] = cfg - retAddrs = append(retAddrs, addrs...) + retEndpoints = append(retEndpoints, endpoints...) } - return retNames, retConfigs, retAddrs, nil + return retNames, retConfigs, retEndpoints, nil } // groupLocalitiesByPriority returns the localities grouped by priority. @@ -244,8 +247,8 @@ func dedupSortedIntSlice(a []int) []int { // priority), and generates a cluster impl policy config, and a list of // addresses with their path hierarchy set to [priority-name, locality-name], so // priority and the xDS LB Policy know which child policy each address is for. 
-func priorityLocalitiesToClusterImpl(localities []xdsresource.Locality, priorityName string, mechanism DiscoveryMechanism, drops []clusterimpl.DropConfig, xdsLBPolicy *internalserviceconfig.BalancerConfig) (*clusterimpl.LBConfig, []resolver.Address, error) { - var addrs []resolver.Address +func priorityLocalitiesToClusterImpl(localities []xdsresource.Locality, priorityName string, mechanism DiscoveryMechanism, drops []clusterimpl.DropConfig, xdsLBPolicy *internalserviceconfig.BalancerConfig) (*clusterimpl.LBConfig, []resolver.Endpoint, error) { + var retEndpoints []resolver.Endpoint for _, locality := range localities { var lw uint32 = 1 if locality.Weight != 0 { @@ -262,21 +265,24 @@ func priorityLocalitiesToClusterImpl(localities []xdsresource.Locality, priority if endpoint.HealthStatus != xdsresource.EndpointHealthStatusHealthy && endpoint.HealthStatus != xdsresource.EndpointHealthStatusUnknown { continue } - addr := resolver.Address{Addr: endpoint.Address} - addr = hierarchy.Set(addr, []string{priorityName, localityStr}) - addr = internal.SetLocalityID(addr, locality.ID) + resolverEndpoint := resolver.Endpoint{} + for _, as := range endpoint.Addresses { + resolverEndpoint.Addresses = append(resolverEndpoint.Addresses, resolver.Address{Addr: as}) + } + resolverEndpoint = hierarchy.SetInEndpoint(resolverEndpoint, []string{priorityName, localityStr}) + resolverEndpoint = internal.SetLocalityIDInEndpoint(resolverEndpoint, locality.ID) // "To provide the xds_wrr_locality load balancer information about // locality weights received from EDS, the cluster resolver will // populate a new locality weight attribute for each address The // attribute will have the weight (as an integer) of the locality // the address is part of." - A52 - addr = wrrlocality.SetAddrInfo(addr, wrrlocality.AddrInfo{LocalityWeight: lw}) + resolverEndpoint = wrrlocality.SetAddrInfoInEndpoint(resolverEndpoint, wrrlocality.AddrInfo{LocalityWeight: lw}) var ew uint32 = 1 if endpoint.Weight != 0 { ew = endpoint.Weight } - addr = weightedroundrobin.SetAddrInfo(addr, weightedroundrobin.AddrInfo{Weight: lw * ew}) - addrs = append(addrs, addr) + resolverEndpoint = weightedroundrobin.SetAddrInfoInEndpoint(resolverEndpoint, weightedroundrobin.AddrInfo{Weight: lw * ew}) + retEndpoints = append(retEndpoints, resolverEndpoint) } } return &clusterimpl.LBConfig{ @@ -287,5 +293,5 @@ func priorityLocalitiesToClusterImpl(localities []xdsresource.Locality, priority TelemetryLabels: mechanism.TelemetryLabels, DropCategories: drops, ChildPolicy: xdsLBPolicy, - }, addrs, nil + }, retEndpoints, nil } diff --git a/vendor/google.golang.org/grpc/xds/internal/balancer/clusterresolver/resource_resolver.go b/vendor/google.golang.org/grpc/xds/internal/balancer/clusterresolver/resource_resolver.go index 3bcfba8732a30..d9315c3acef5d 100644 --- a/vendor/google.golang.org/grpc/xds/internal/balancer/clusterresolver/resource_resolver.go +++ b/vendor/google.golang.org/grpc/xds/internal/balancer/clusterresolver/resource_resolver.go @@ -24,6 +24,7 @@ import ( "google.golang.org/grpc/internal/grpclog" "google.golang.org/grpc/internal/grpcsync" + "google.golang.org/grpc/resolver" "google.golang.org/grpc/xds/internal/xdsclient/xdsresource" ) @@ -294,8 +295,8 @@ func (rr *resourceResolver) generateLocked(onDone xdsresource.OnDoneFunc) { switch uu := u.(type) { case xdsresource.EndpointsUpdate: ret = append(ret, priorityConfig{mechanism: rDM.dm, edsResp: uu, childNameGen: rDM.childNameGen}) - case []string: - ret = append(ret, priorityConfig{mechanism: rDM.dm, 
addresses: uu, childNameGen: rDM.childNameGen}) + case []resolver.Endpoint: + ret = append(ret, priorityConfig{mechanism: rDM.dm, endpoints: uu, childNameGen: rDM.childNameGen}) } } select { diff --git a/vendor/google.golang.org/grpc/xds/internal/balancer/clusterresolver/resource_resolver_dns.go b/vendor/google.golang.org/grpc/xds/internal/balancer/clusterresolver/resource_resolver_dns.go index cfc871d3b59d6..5f7a21153057e 100644 --- a/vendor/google.golang.org/grpc/xds/internal/balancer/clusterresolver/resource_resolver_dns.go +++ b/vendor/google.golang.org/grpc/xds/internal/balancer/clusterresolver/resource_resolver_dns.go @@ -47,7 +47,7 @@ type dnsDiscoveryMechanism struct { logger *grpclog.PrefixLogger mu sync.Mutex - addrs []string + endpoints []resolver.Endpoint updateReceived bool } @@ -103,7 +103,7 @@ func (dr *dnsDiscoveryMechanism) lastUpdate() (any, bool) { if !dr.updateReceived { return nil, false } - return dr.addrs, true + return dr.endpoints, true } func (dr *dnsDiscoveryMechanism) resolveNow() { @@ -133,23 +133,15 @@ func (dr *dnsDiscoveryMechanism) UpdateState(state resolver.State) error { } dr.mu.Lock() - var addrs []string - if len(state.Endpoints) > 0 { - // Assume 1 address per endpoint, which is how DNS is expected to - // behave. The slice will grow as needed, however. - addrs = make([]string, 0, len(state.Endpoints)) - for _, e := range state.Endpoints { - for _, a := range e.Addresses { - addrs = append(addrs, a.Addr) - } - } - } else { - addrs = make([]string, len(state.Addresses)) + var endpoints = state.Endpoints + if len(endpoints) == 0 { + endpoints = make([]resolver.Endpoint, len(state.Addresses)) for i, a := range state.Addresses { - addrs[i] = a.Addr + endpoints[i] = resolver.Endpoint{Addresses: []resolver.Address{a}} + endpoints[i].Attributes = a.BalancerAttributes } } - dr.addrs = addrs + dr.endpoints = endpoints dr.updateReceived = true dr.mu.Unlock() @@ -172,7 +164,7 @@ func (dr *dnsDiscoveryMechanism) ReportError(err error) { dr.mu.Unlock() return } - dr.addrs = nil + dr.endpoints = nil dr.updateReceived = true dr.mu.Unlock() diff --git a/vendor/google.golang.org/grpc/xds/internal/balancer/outlierdetection/balancer.go b/vendor/google.golang.org/grpc/xds/internal/balancer/outlierdetection/balancer.go index c9d496ce09b9c..8f58c00303217 100644 --- a/vendor/google.golang.org/grpc/xds/internal/balancer/outlierdetection/balancer.go +++ b/vendor/google.golang.org/grpc/xds/internal/balancer/outlierdetection/balancer.go @@ -33,6 +33,7 @@ import ( "unsafe" "google.golang.org/grpc/balancer" + "google.golang.org/grpc/balancer/pickfirst/pickfirstleaf" "google.golang.org/grpc/connectivity" "google.golang.org/grpc/internal/balancer/gracefulswitch" "google.golang.org/grpc/internal/buffer" @@ -72,7 +73,7 @@ func (bb) Build(cc balancer.ClientConn, bOpts balancer.BuildOptions) balancer.Ba } b.logger = prefixLogger(b) b.logger.Infof("Created") - b.child = gracefulswitch.NewBalancer(b, bOpts) + b.child = synchronizingBalancerWrapper{lb: gracefulswitch.NewBalancer(b, bOpts)} go b.run() return b } @@ -152,6 +153,11 @@ type lbCfgUpdate struct { done chan struct{} } +type scHealthUpdate struct { + scw *subConnWrapper + state balancer.SubConnState +} + type outlierDetectionBalancer struct { // These fields are safe to be accessed without holding any mutex because // they are synchronized in run(), which makes these field accesses happen @@ -170,10 +176,7 @@ type outlierDetectionBalancer struct { logger *grpclog.PrefixLogger channelzParent channelz.Identifier - // childMu guards 
calls into child (to uphold the balancer.Balancer API - // guarantee of synchronous calls). - childMu sync.Mutex - child *gracefulswitch.Balancer + child synchronizingBalancerWrapper // mu guards access to the following fields. It also helps to synchronize // behaviors of the following events: config updates, firing of the interval @@ -190,8 +193,8 @@ type outlierDetectionBalancer struct { // which uses addrs. This balancer waits for the interval timer algorithm to // finish before making the update to the addrs map. // - // This mutex is never held at the same time as childMu (within the context - // of a single goroutine). + // This mutex is never held when calling methods on the child policy + // (within the context of a single goroutine). mu sync.Mutex addrs map[string]*addressInfo cfg *LBConfig @@ -276,13 +279,9 @@ func (b *outlierDetectionBalancer) UpdateClientConnState(s balancer.ClientConnSt // the balancer.Balancer API, so it is guaranteed to be called in a // synchronous manner, so it cannot race with this read. if b.cfg == nil || b.cfg.ChildPolicy.Name != lbCfg.ChildPolicy.Name { - b.childMu.Lock() - err := b.child.SwitchTo(bb) - if err != nil { - b.childMu.Unlock() + if err := b.child.switchTo(bb); err != nil { return fmt.Errorf("outlier detection: error switching to child of type %q: %v", lbCfg.ChildPolicy.Name, err) } - b.childMu.Unlock() } b.mu.Lock() @@ -319,12 +318,10 @@ func (b *outlierDetectionBalancer) UpdateClientConnState(s balancer.ClientConnSt } b.mu.Unlock() - b.childMu.Lock() - err := b.child.UpdateClientConnState(balancer.ClientConnState{ + err := b.child.updateClientConnState(balancer.ClientConnState{ ResolverState: s.ResolverState, BalancerConfig: b.cfg.ChildPolicy.Config, }) - b.childMu.Unlock() done := make(chan struct{}) b.pickerUpdateCh.Put(lbCfgUpdate{ @@ -337,9 +334,7 @@ func (b *outlierDetectionBalancer) UpdateClientConnState(s balancer.ClientConnSt } func (b *outlierDetectionBalancer) ResolverError(err error) { - b.childMu.Lock() - defer b.childMu.Unlock() - b.child.ResolverError(err) + b.child.resolverError(err) } func (b *outlierDetectionBalancer) updateSubConnState(sc balancer.SubConn, state balancer.SubConnState) { @@ -355,6 +350,7 @@ func (b *outlierDetectionBalancer) updateSubConnState(sc balancer.SubConn, state if state.ConnectivityState == connectivity.Shutdown { delete(b.scWrappers, scw.SubConn) } + scw.setLatestConnectivityState(state.ConnectivityState) b.scUpdateCh.Put(&scUpdate{ scw: scw, state: state, @@ -368,9 +364,7 @@ func (b *outlierDetectionBalancer) UpdateSubConnState(sc balancer.SubConn, state func (b *outlierDetectionBalancer) Close() { b.closed.Fire() <-b.done.Done() - b.childMu.Lock() - b.child.Close() - b.childMu.Unlock() + b.child.closeLB() b.scUpdateCh.Close() b.pickerUpdateCh.Close() @@ -383,9 +377,7 @@ func (b *outlierDetectionBalancer) Close() { } func (b *outlierDetectionBalancer) ExitIdle() { - b.childMu.Lock() - defer b.childMu.Unlock() - b.child.ExitIdle() + b.child.exitIdle() } // wrappedPicker delegates to the child policy's picker, and when the request @@ -475,10 +467,13 @@ func (b *outlierDetectionBalancer) NewSubConn(addrs []resolver.Address, opts bal return nil, err } scw := &subConnWrapper{ - SubConn: sc, - addresses: addrs, - scUpdateCh: b.scUpdateCh, - listener: oldListener, + SubConn: sc, + addresses: addrs, + scUpdateCh: b.scUpdateCh, + listener: oldListener, + latestRawConnectivityState: balancer.SubConnState{ConnectivityState: connectivity.Idle}, + latestHealthState: balancer.SubConnState{ConnectivityState: 
connectivity.Connecting}, + healthListenerEnabled: len(addrs) == 1 && pickfirstleaf.IsManagedByPickfirst(addrs[0]), } b.mu.Lock() defer b.mu.Unlock() @@ -596,34 +591,18 @@ func (b *outlierDetectionBalancer) Target() string { // if the SubConn is not ejected. func (b *outlierDetectionBalancer) handleSubConnUpdate(u *scUpdate) { scw := u.scw - scw.latestState = u.state - if !scw.ejected { - if scw.listener != nil { - b.childMu.Lock() - scw.listener(u.state) - b.childMu.Unlock() - } - } + scw.clearHealthListener() + b.child.updateSubConnState(scw, u.state) +} + +func (b *outlierDetectionBalancer) handleSubConnHealthUpdate(u *scHealthUpdate) { + b.child.updateSubConnHealthState(u.scw, u.state) } // handleEjectedUpdate handles any SubConns that get ejected/unejected, and // forwards the appropriate corresponding subConnState to the child policy. func (b *outlierDetectionBalancer) handleEjectedUpdate(u *ejectionUpdate) { - scw := u.scw - scw.ejected = u.isEjected - // If scw.latestState has never been written to will default to connectivity - // IDLE, which is fine. - stateToUpdate := scw.latestState - if u.isEjected { - stateToUpdate = balancer.SubConnState{ - ConnectivityState: connectivity.TransientFailure, - } - } - if scw.listener != nil { - b.childMu.Lock() - scw.listener(stateToUpdate) - b.childMu.Unlock() - } + b.child.handleEjectionUpdate(u) } // handleChildStateUpdate forwards the picker update wrapped in a wrapped picker @@ -696,6 +675,8 @@ func (b *outlierDetectionBalancer) run() { b.handleSubConnUpdate(u) case *ejectionUpdate: b.handleEjectedUpdate(u) + case *scHealthUpdate: + b.handleSubConnHealthUpdate(u) } case update, ok := <-b.pickerUpdateCh.Get(): if !ok { @@ -880,6 +861,69 @@ func (b *outlierDetectionBalancer) unejectAddress(addrInfo *addressInfo) { } } +// synchronizingBalancerWrapper serializes calls into balancer (to uphold the +// balancer.Balancer API guarantee of synchronous calls). It also ensures a +// consistent order of locking mutexes when using SubConn listeners to avoid +// deadlocks. +type synchronizingBalancerWrapper struct { + // mu should not be used directly from outside this struct, instead use + // methods defined on the struct. 
+ mu sync.Mutex + lb *gracefulswitch.Balancer +} + +func (sbw *synchronizingBalancerWrapper) switchTo(builder balancer.Builder) error { + sbw.mu.Lock() + defer sbw.mu.Unlock() + return sbw.lb.SwitchTo(builder) +} + +func (sbw *synchronizingBalancerWrapper) updateClientConnState(state balancer.ClientConnState) error { + sbw.mu.Lock() + defer sbw.mu.Unlock() + return sbw.lb.UpdateClientConnState(state) +} + +func (sbw *synchronizingBalancerWrapper) resolverError(err error) { + sbw.mu.Lock() + defer sbw.mu.Unlock() + sbw.lb.ResolverError(err) +} + +func (sbw *synchronizingBalancerWrapper) closeLB() { + sbw.mu.Lock() + defer sbw.mu.Unlock() + sbw.lb.Close() +} + +func (sbw *synchronizingBalancerWrapper) exitIdle() { + sbw.mu.Lock() + defer sbw.mu.Unlock() + sbw.lb.ExitIdle() +} + +func (sbw *synchronizingBalancerWrapper) updateSubConnHealthState(scw *subConnWrapper, scs balancer.SubConnState) { + sbw.mu.Lock() + defer sbw.mu.Unlock() + scw.updateSubConnHealthState(scs) +} + +func (sbw *synchronizingBalancerWrapper) updateSubConnState(scw *subConnWrapper, scs balancer.SubConnState) { + sbw.mu.Lock() + defer sbw.mu.Unlock() + scw.updateSubConnConnectivityState(scs) +} + +func (sbw *synchronizingBalancerWrapper) handleEjectionUpdate(u *ejectionUpdate) { + sbw.mu.Lock() + defer sbw.mu.Unlock() + if u.isEjected { + u.scw.handleEjection() + } else { + u.scw.handleUnejection() + } +} + // addressInfo contains the runtime information about an address that pertains // to Outlier Detection. This struct and all of its fields is protected by // outlierDetectionBalancer.mu in the case where it is accessed through the diff --git a/vendor/google.golang.org/grpc/xds/internal/balancer/outlierdetection/subconn_wrapper.go b/vendor/google.golang.org/grpc/xds/internal/balancer/outlierdetection/subconn_wrapper.go index 0fa422d8f262e..7d710fde1b2a9 100644 --- a/vendor/google.golang.org/grpc/xds/internal/balancer/outlierdetection/subconn_wrapper.go +++ b/vendor/google.golang.org/grpc/xds/internal/balancer/outlierdetection/subconn_wrapper.go @@ -19,9 +19,11 @@ package outlierdetection import ( "fmt" + "sync" "unsafe" "google.golang.org/grpc/balancer" + "google.golang.org/grpc/connectivity" "google.golang.org/grpc/internal/buffer" "google.golang.org/grpc/resolver" ) @@ -31,23 +33,54 @@ import ( // whether or not this SubConn is ejected. type subConnWrapper struct { balancer.SubConn - listener func(balancer.SubConnState) - // addressInfo is a pointer to the subConnWrapper's corresponding address - // map entry, if the map entry exists. + // map entry, if the map entry exists. It is accessed atomically. addressInfo unsafe.Pointer // *addressInfo + // The following fields are set during object creation and read-only after + // that. + + listener func(balancer.SubConnState) + // healthListenerEnabled indicates whether the leaf LB policy is using a + // generic health listener. When enabled, ejection updates are sent via the + // health listener instead of the connectivity listener. Once Dualstack + // changes are complete, all SubConns will be created by pickfirst which + // uses the health listener. + // TODO: https://github.com/grpc/grpc-go/issues/7915 - Once Dualstack + // changes are complete, all SubConns will be created by pick_first and + // outlier detection will only use the health listener for ejection and + // this field can be removed. 
+	healthListenerEnabled bool
+
+	scUpdateCh *buffer.Unbounded
+
+	// The following fields are only referenced in the context of a work
+	// serializing buffer and don't need to be protected by a mutex.
+
 	// These two pieces of state will reach eventual consistency due to sync in
 	// run(), and child will always have the correctly updated SubConnState.
-	// latestState is the latest state update from the underlying SubConn. This
-	// is used whenever a SubConn gets unejected.
-	latestState balancer.SubConnState
-	ejected     bool
-	scUpdateCh  *buffer.Unbounded
+	ejected bool
 	// addresses is the list of address(es) this SubConn was created with to
 	// help support any change in address(es)
 	addresses []resolver.Address
+	// latestHealthState is tracked to update the child policy during
+	// unejection.
+	latestHealthState balancer.SubConnState
+	// latestRawConnectivityState is tracked to update the child policy during
+	// unejection.
+	latestRawConnectivityState balancer.SubConnState
+
+	// Access to the following fields is protected by a mutex. These fields
+	// should not be accessed from outside this file, instead use methods
+	// defined on the struct.
+	mu             sync.Mutex
+	healthListener func(balancer.SubConnState)
+	// latestReceivedConnectivityState is the SubConn's most recent connectivity
+	// state received. It may not be delivered to the child balancer yet. It is
+	// used to ensure a health listener is registered with the SubConn only when
+	// the SubConn is READY.
+	latestReceivedConnectivityState connectivity.State
 }
 
 // eject causes the wrapper to report a state update with the TRANSIENT_FAILURE
@@ -72,3 +105,108 @@ func (scw *subConnWrapper) uneject() {
 func (scw *subConnWrapper) String() string {
 	return fmt.Sprintf("%+v", scw.addresses)
 }
+
+func (scw *subConnWrapper) RegisterHealthListener(listener func(balancer.SubConnState)) {
+	// gRPC currently supports two mechanisms that provide a health signal for
+	// a connection: client-side health checking and outlier detection. Earlier
+	// both these mechanisms signaled unhealthiness by setting the subchannel
+	// state to TRANSIENT_FAILURE. As part of the dualstack changes to make
+	// pick_first the universal leaf policy (see A61), both these mechanisms
+	// started using the new health listener to make health signal visible to
+	// the petiole policies without affecting the underlying connectivity
+	// management of the pick_first policy.
+	if !scw.healthListenerEnabled {
+		logger.Errorf("Health listener unexpectedly registered on SubConn %v.", scw)
+		return
+	}
+
+	scw.mu.Lock()
+	defer scw.mu.Unlock()
+
+	if scw.latestReceivedConnectivityState != connectivity.Ready {
+		return
+	}
+	scw.healthListener = listener
+	if listener == nil {
+		scw.SubConn.RegisterHealthListener(nil)
+		return
+	}
+
+	scw.SubConn.RegisterHealthListener(func(scs balancer.SubConnState) {
+		scw.scUpdateCh.Put(&scHealthUpdate{
+			scw:   scw,
+			state: scs,
+		})
+	})
+}
+
+// updateSubConnHealthState stores the latest health state for unejection and
+// sends updates to the health listener.
+func (scw *subConnWrapper) updateSubConnHealthState(scs balancer.SubConnState) {
+	scw.latestHealthState = scs
+	if scw.ejected {
+		return
+	}
+	scw.mu.Lock()
+	defer scw.mu.Unlock()
+	if scw.healthListener != nil {
+		scw.healthListener(scs)
+	}
+}
+
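+// For intuition, the listener registered in RegisterHealthListener above
+// funnels raw health updates into the balancer's work-serializing buffer,
+// and they are delivered back to this wrapper on the balancer's run()
+// goroutine. A rough sketch of the round trip (illustrative only, not
+// additional behavior introduced by this change):
+//
+//	// On the SubConn's callback goroutine:
+//	scw.scUpdateCh.Put(&scHealthUpdate{scw: scw, state: scs})
+//
+//	// Later, on run() in balancer.go:
+//	b.child.updateSubConnHealthState(u.scw, u.state) // serialized via sbw.mu
+//	// ...which lands in updateSubConnHealthState above, where the update
+//	// is dropped while the SubConn is ejected.
+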
+// updateSubConnConnectivityState stores the latest connectivity state for
+// unejection and updates the raw connectivity listener.
+func (scw *subConnWrapper) updateSubConnConnectivityState(scs balancer.SubConnState) {
+	scw.latestRawConnectivityState = scs
+	// If the raw connectivity listener is used for ejection, and the SubConn is
+	// ejected, don't send the update.
+	if scw.ejected && !scw.healthListenerEnabled {
+		return
+	}
+	if scw.listener != nil {
+		scw.listener(scs)
+	}
+}
+
+func (scw *subConnWrapper) clearHealthListener() {
+	scw.mu.Lock()
+	defer scw.mu.Unlock()
+	scw.healthListener = nil
+}
+
+func (scw *subConnWrapper) handleUnejection() {
+	scw.ejected = false
+	if !scw.healthListenerEnabled {
+		// If scw.latestRawConnectivityState has never been written to, it
+		// will default to connectivity IDLE, which is fine.
+		scw.updateSubConnConnectivityState(scw.latestRawConnectivityState)
+		return
+	}
+	// If scw.latestHealthState has never been written to, the health state
+	// CONNECTING set during object creation will be used.
+	scw.updateSubConnHealthState(scw.latestHealthState)
+}
+
+func (scw *subConnWrapper) handleEjection() {
+	scw.ejected = true
+	stateToUpdate := balancer.SubConnState{
+		ConnectivityState: connectivity.TransientFailure,
+	}
+	if !scw.healthListenerEnabled {
+		if scw.listener != nil {
+			scw.listener(stateToUpdate)
+		}
+		return
+	}
+	scw.mu.Lock()
+	defer scw.mu.Unlock()
+	if scw.healthListener != nil {
+		scw.healthListener(stateToUpdate)
+	}
+}
+
+func (scw *subConnWrapper) setLatestConnectivityState(state connectivity.State) {
+	scw.mu.Lock()
+	defer scw.mu.Unlock()
+	scw.latestReceivedConnectivityState = state
+}
diff --git a/vendor/google.golang.org/grpc/xds/internal/balancer/wrrlocality/balancer.go b/vendor/google.golang.org/grpc/xds/internal/balancer/wrrlocality/balancer.go
index 943ee7806ba18..2b289a81143c9 100644
--- a/vendor/google.golang.org/grpc/xds/internal/balancer/wrrlocality/balancer.go
+++ b/vendor/google.golang.org/grpc/xds/internal/balancer/wrrlocality/balancer.go
@@ -120,6 +120,13 @@ func SetAddrInfo(addr resolver.Address, addrInfo AddrInfo) resolver.Address {
 	return addr
 }
 
+// SetAddrInfoInEndpoint returns a copy of endpoint in which the Attributes
+// field is updated with AddrInfo.
+func SetAddrInfoInEndpoint(endpoint resolver.Endpoint, addrInfo AddrInfo) resolver.Endpoint {
+	endpoint.Attributes = endpoint.Attributes.WithValue(attributeKey{}, addrInfo)
+	return endpoint
+}
+
 func (a AddrInfo) String() string {
 	return fmt.Sprintf("Locality Weight: %d", a.LocalityWeight)
 }
diff --git a/vendor/google.golang.org/grpc/xds/internal/internal.go b/vendor/google.golang.org/grpc/xds/internal/internal.go
index 1d8a6b03f1b3b..74c9195215514 100644
--- a/vendor/google.golang.org/grpc/xds/internal/internal.go
+++ b/vendor/google.golang.org/grpc/xds/internal/internal.go
@@ -86,6 +86,12 @@ func SetLocalityID(addr resolver.Address, l LocalityID) resolver.Address {
 	return addr
 }
 
+// SetLocalityIDInEndpoint sets locality ID in endpoint to l.
+func SetLocalityIDInEndpoint(endpoint resolver.Endpoint, l LocalityID) resolver.Endpoint {
+	endpoint.Attributes = endpoint.Attributes.WithValue(localityKey, l)
+	return endpoint
+}
+
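+// For illustration, a caller building a resolver.Endpoint might combine the
+// helper above with wrrlocality.SetAddrInfoInEndpoint from the previous hunk.
+// A hedged usage sketch (hypothetical caller code, not part of this change):
+//
+//	ep := resolver.Endpoint{Addresses: []resolver.Address{{Addr: "10.0.0.1:8080"}}}
+//	ep = internal.SetLocalityIDInEndpoint(ep, internal.LocalityID{Region: "us-east1"})
+//	ep = wrrlocality.SetAddrInfoInEndpoint(ep, wrrlocality.AddrInfo{LocalityWeight: 3})
+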
 // ResourceTypeMapForTesting maps TypeUrl to corresponding ResourceType.
 var ResourceTypeMapForTesting map[string]any
diff --git a/vendor/google.golang.org/grpc/xds/internal/xdsclient/authority.go b/vendor/google.golang.org/grpc/xds/internal/xdsclient/authority.go
index 24673a8d90779..f81685a45e692 100644
--- a/vendor/google.golang.org/grpc/xds/internal/xdsclient/authority.go
+++ b/vendor/google.golang.org/grpc/xds/internal/xdsclient/authority.go
@@ -639,6 +639,9 @@ func (a *authority) watchResource(rType xdsresource.Type, resourceName string, w
 			if a.logger.V(2) {
 				a.logger.Infof("Resource type %q with resource name %q found in cache: %s", rType.TypeName(), resourceName, state.cache.ToJSON())
 			}
+			// state can only be accessed in the context of an
+			// xdsClientSerializer callback. Hence making a copy of the cached
+			// resource here for the watcherCallbackSerializer.
 			resource := state.cache
 			a.watcherCallbackSerializer.TrySchedule(func(context.Context) { watcher.OnUpdate(resource, func() {}) })
 		}
@@ -646,9 +649,13 @@ func (a *authority) watchResource(rType xdsresource.Type, resourceName string, w
 		// immediately as well.
 		if state.md.Status == xdsresource.ServiceStatusNACKed {
 			if a.logger.V(2) {
-				a.logger.Infof("Resource type %q with resource name %q was NACKed: %s", rType.TypeName(), resourceName, state.cache.ToJSON())
+				a.logger.Infof("Resource type %q with resource name %q was NACKed", rType.TypeName(), resourceName)
 			}
-			a.watcherCallbackSerializer.TrySchedule(func(context.Context) { watcher.OnError(state.md.ErrState.Err, func() {}) })
+			// state can only be accessed in the context of an
+			// xdsClientSerializer callback. Hence making a copy of the error
+			// here for the watcherCallbackSerializer.
+			err := state.md.ErrState.Err
+			a.watcherCallbackSerializer.TrySchedule(func(context.Context) { watcher.OnError(err, func() {}) })
 		}
 		// If the metadata field is updated to indicate that the management
 		// server does not have this resource, notify the new watcher.
@@ -687,7 +694,7 @@ func (a *authority) unwatchResource(rType xdsresource.Type, resourceName string,
 		delete(state.watchers, watcher)
 		if len(state.watchers) > 0 {
 			if a.logger.V(2) {
-				a.logger.Infof("%d more watchers exist for type %q, resource name %q", rType.TypeName(), resourceName)
+				a.logger.Infof("Other watchers exist for type %q, resource name %q", rType.TypeName(), resourceName)
 			}
 			return
 		}
diff --git a/vendor/google.golang.org/grpc/xds/internal/xdsclient/client_new.go b/vendor/google.golang.org/grpc/xds/internal/xdsclient/client_new.go
index d8f9d6c9417bb..55299c457b25c 100644
--- a/vendor/google.golang.org/grpc/xds/internal/xdsclient/client_new.go
+++ b/vendor/google.golang.org/grpc/xds/internal/xdsclient/client_new.go
@@ -26,7 +26,6 @@ import (
 
 	"google.golang.org/grpc/internal"
 	"google.golang.org/grpc/internal/backoff"
-	"google.golang.org/grpc/internal/cache"
 	"google.golang.org/grpc/internal/grpcsync"
 	"google.golang.org/grpc/internal/xds/bootstrap"
 	xdsclientinternal "google.golang.org/grpc/xds/internal/xdsclient/internal"
@@ -61,11 +60,11 @@ func New(name string) (XDSClient, func(), error) {
 	if err != nil {
 		return nil, nil, fmt.Errorf("xds: failed to get xDS bootstrap config: %v", err)
 	}
-	return newRefCounted(name, config, defaultWatchExpiryTimeout, defaultIdleChannelExpiryTimeout, backoff.DefaultExponential.Backoff)
+	return newRefCounted(name, config, defaultWatchExpiryTimeout, backoff.DefaultExponential.Backoff)
 }
 
 // newClientImpl returns a new xdsClient with the given config.
-func newClientImpl(config *bootstrap.Config, watchExpiryTimeout, idleChannelExpiryTimeout time.Duration, streamBackoff func(int) time.Duration) (*clientImpl, error) { +func newClientImpl(config *bootstrap.Config, watchExpiryTimeout time.Duration, streamBackoff func(int) time.Duration) (*clientImpl, error) { ctx, cancel := context.WithCancel(context.Background()) c := &clientImpl{ done: grpcsync.NewEvent(), @@ -78,7 +77,6 @@ func newClientImpl(config *bootstrap.Config, watchExpiryTimeout, idleChannelExpi transportBuilder: &grpctransport.Builder{}, resourceTypes: newResourceTypeRegistry(), xdsActiveChannels: make(map[string]*channelState), - xdsIdleChannels: cache.NewTimeoutCache(idleChannelExpiryTimeout), } for name, cfg := range config.Authorities() { @@ -121,10 +119,6 @@ type OptionsForTesting struct { // unspecified, uses the default value used in non-test code. WatchExpiryTimeout time.Duration - // IdleChannelExpiryTimeout is the timeout before idle xdsChannels are - // deleted. If unspecified, uses the default value used in non-test code. - IdleChannelExpiryTimeout time.Duration - // StreamBackoffAfterFailure is the backoff function used to determine the // backoff duration after stream failures. // If unspecified, uses the default value used in non-test code. @@ -147,9 +141,6 @@ func NewForTesting(opts OptionsForTesting) (XDSClient, func(), error) { if opts.WatchExpiryTimeout == 0 { opts.WatchExpiryTimeout = defaultWatchExpiryTimeout } - if opts.IdleChannelExpiryTimeout == 0 { - opts.IdleChannelExpiryTimeout = defaultIdleChannelExpiryTimeout - } if opts.StreamBackoffAfterFailure == nil { opts.StreamBackoffAfterFailure = defaultStreamBackoffFunc } @@ -158,7 +149,7 @@ func NewForTesting(opts OptionsForTesting) (XDSClient, func(), error) { if err != nil { return nil, nil, err } - return newRefCounted(opts.Name, config, opts.WatchExpiryTimeout, opts.IdleChannelExpiryTimeout, opts.StreamBackoffAfterFailure) + return newRefCounted(opts.Name, config, opts.WatchExpiryTimeout, opts.StreamBackoffAfterFailure) } // GetForTesting returns an xDS client created earlier using the given name. diff --git a/vendor/google.golang.org/grpc/xds/internal/xdsclient/client_refcounted.go b/vendor/google.golang.org/grpc/xds/internal/xdsclient/client_refcounted.go index 1c105ac4e061b..f5fc76d8a75c8 100644 --- a/vendor/google.golang.org/grpc/xds/internal/xdsclient/client_refcounted.go +++ b/vendor/google.golang.org/grpc/xds/internal/xdsclient/client_refcounted.go @@ -27,10 +27,7 @@ import ( "google.golang.org/grpc/internal/xds/bootstrap" ) -const ( - defaultWatchExpiryTimeout = 15 * time.Second - defaultIdleChannelExpiryTimeout = 5 * time.Minute -) +const defaultWatchExpiryTimeout = 15 * time.Second var ( // The following functions are no-ops in the actual code, but can be @@ -43,26 +40,31 @@ var ( func clientRefCountedClose(name string) { clientsMu.Lock() - defer clientsMu.Unlock() - client, ok := clients[name] if !ok { logger.Errorf("Attempt to close a non-existent xDS client with name %s", name) + clientsMu.Unlock() return } if client.decrRef() != 0 { + clientsMu.Unlock() return } + delete(clients, name) + clientsMu.Unlock() + + // This attempts to close the transport to the management server and could + // theoretically call back into the xdsclient package again and deadlock. + // Hence, this needs to be called without holding the lock. 
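+	// The resulting shape is the usual unlock-before-reentrant-call pattern.
+	// A rough sketch (illustrative only, names as in this function):
+	//
+	//	clientsMu.Lock()
+	//	// ref-count bookkeeping; delete(clients, name) once it hits zero
+	//	clientsMu.Unlock()
+	//	client.clientImpl.close() // may call back into xdsclient; lock is free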
client.clientImpl.close() xdsClientImplCloseHook(name) - delete(clients, name) } // newRefCounted creates a new reference counted xDS client implementation for // name, if one does not exist already. If an xDS client for the given name // exists, it gets a reference to it and returns it. -func newRefCounted(name string, config *bootstrap.Config, watchExpiryTimeout, idleChannelExpiryTimeout time.Duration, streamBackoff func(int) time.Duration) (XDSClient, func(), error) { +func newRefCounted(name string, config *bootstrap.Config, watchExpiryTimeout time.Duration, streamBackoff func(int) time.Duration) (XDSClient, func(), error) { clientsMu.Lock() defer clientsMu.Unlock() @@ -72,7 +74,7 @@ func newRefCounted(name string, config *bootstrap.Config, watchExpiryTimeout, id } // Create the new client implementation. - c, err := newClientImpl(config, watchExpiryTimeout, idleChannelExpiryTimeout, streamBackoff) + c, err := newClientImpl(config, watchExpiryTimeout, streamBackoff) if err != nil { return nil, nil, err } diff --git a/vendor/google.golang.org/grpc/xds/internal/xdsclient/clientimpl.go b/vendor/google.golang.org/grpc/xds/internal/xdsclient/clientimpl.go index df0949e23cc7e..bb8d9040022f9 100644 --- a/vendor/google.golang.org/grpc/xds/internal/xdsclient/clientimpl.go +++ b/vendor/google.golang.org/grpc/xds/internal/xdsclient/clientimpl.go @@ -25,7 +25,6 @@ import ( "sync/atomic" "time" - "google.golang.org/grpc/internal/cache" "google.golang.org/grpc/internal/grpclog" "google.golang.org/grpc/internal/grpcsync" "google.golang.org/grpc/internal/xds/bootstrap" @@ -63,14 +62,9 @@ type clientImpl struct { // these channels, and forwards updates from the channels to each of these // authorities. // - // Once all references to a channel are dropped, the channel is moved to the - // idle cache where it lives for a configured duration before being closed. - // If the channel is required before the idle timeout fires, it is revived - // from the idle cache and used. + // Once all references to a channel are dropped, the channel is closed. channelsMu sync.Mutex xdsActiveChannels map[string]*channelState // Map from server config to in-use xdsChannels. - xdsIdleChannels *cache.TimeoutCache // Map from server config to idle xdsChannels. - closeCond *sync.Cond } // channelState represents the state of an xDS channel. It tracks the number of @@ -173,21 +167,6 @@ func (c *clientImpl) close() { c.close() } - // Similarly, closing idle channels cannot be done with the lock held, for - // the same reason as described above. So, we clear the idle cache in a - // goroutine and use a condition variable to wait on the condition that the - // idle cache has zero entries. The Wait() method on the condition variable - // releases the lock and blocks the goroutine until signaled (which happens - // when an idle channel is removed from the cache and closed), and grabs the - // lock before returning. - c.channelsMu.Lock() - c.closeCond = sync.NewCond(&c.channelsMu) - go c.xdsIdleChannels.Clear(true) - for c.xdsIdleChannels.Len() > 0 { - c.closeCond.Wait() - } - c.channelsMu.Unlock() - c.serializerClose() <-c.serializer.Done() @@ -289,27 +268,15 @@ func (c *clientImpl) getOrCreateChannel(serverConfig *bootstrap.ServerConfig, in c.logger.Infof("Received request for a reference to an xdsChannel for server config %q", serverConfig) } - // Use an active channel, if one exists for this server config. + // Use an existing channel, if one exists for this server config. 
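+	// With the idle cache gone, the lifecycle is simply: acquiring a channel
+	// bumps its ref count (initLocked), and the release func returned from
+	// this method closes the channel once the count drops back to zero. A
+	// hedged usage sketch:
+	//
+	//	channel, release, err := c.getOrCreateChannel(serverConfig, ...)
+	//	// ... use channel ...
+	//	release() // dropping the last reference closes the channel immediately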
if state, ok := c.xdsActiveChannels[serverConfig.String()]; ok { if c.logger.V(2) { - c.logger.Infof("Reusing an active xdsChannel for server config %q", serverConfig) + c.logger.Infof("Reusing an existing xdsChannel for server config %q", serverConfig) } initLocked(state) return state.channel, c.releaseChannel(serverConfig, state, deInitLocked), nil } - // If an idle channel exists for this server config, remove it from the - // idle cache and add it to the map of active channels, and return it. - if s, ok := c.xdsIdleChannels.Remove(serverConfig.String()); ok { - if c.logger.V(2) { - c.logger.Infof("Reviving an xdsChannel from the idle cache for server config %q", serverConfig) - } - state := s.(*channelState) - c.xdsActiveChannels[serverConfig.String()] = state - initLocked(state) - return state.channel, c.releaseChannel(serverConfig, state, deInitLocked), nil - } - if c.logger.V(2) { c.logger.Infof("Creating a new xdsChannel for server config %q", serverConfig) } @@ -345,9 +312,7 @@ func (c *clientImpl) getOrCreateChannel(serverConfig *bootstrap.ServerConfig, in } // releaseChannel is a function that is called when a reference to an xdsChannel -// needs to be released. It handles the logic of moving the channel to an idle -// cache if there are no other active references, and closing the channel if it -// remains in the idle cache for the configured duration. +// needs to be released. It handles closing channels with no active references. // // The function takes the following parameters: // - serverConfig: the server configuration for the xdsChannel @@ -360,7 +325,6 @@ func (c *clientImpl) getOrCreateChannel(serverConfig *bootstrap.ServerConfig, in func (c *clientImpl) releaseChannel(serverConfig *bootstrap.ServerConfig, state *channelState, deInitLocked func(*channelState)) func() { return grpcsync.OnceFunc(func() { c.channelsMu.Lock() - defer c.channelsMu.Unlock() if c.logger.V(2) { c.logger.Infof("Received request to release a reference to an xdsChannel for server config %q", serverConfig) @@ -372,40 +336,17 @@ func (c *clientImpl) releaseChannel(serverConfig *bootstrap.ServerConfig, state if c.logger.V(2) { c.logger.Infof("xdsChannel %p has other active references", state.channel) } + c.channelsMu.Unlock() return } - // Move the channel to the idle cache instead of closing - // immediately. If the channel remains in the idle cache for - // the configured duration, it will get closed. delete(c.xdsActiveChannels, serverConfig.String()) if c.logger.V(2) { - c.logger.Infof("Moving xdsChannel [%p] for server config %s to the idle cache", state.channel, serverConfig) + c.logger.Infof("Closing xdsChannel [%p] for server config %s", state.channel, serverConfig) } + channelToClose := state.channel + c.channelsMu.Unlock() - // The idle cache expiry timeout results in the channel getting - // closed in another serializer callback. - c.xdsIdleChannels.Add(serverConfig.String(), state, grpcsync.OnceFunc(func() { - c.channelsMu.Lock() - channelToClose := state.channel - c.channelsMu.Unlock() - - if c.logger.V(2) { - c.logger.Infof("Idle cache expiry timeout fired for xdsChannel [%p] for server config %s", state.channel, serverConfig) - } - channelToClose.close() - - // If the channel is being closed as a result of the xDS client - // being closed, closeCond is non-nil and we need to signal from - // here to unblock Close(). Holding the lock is not necessary - // to call Signal() on a condition variable. 
But the field - // `c.closeCond` needs to guarded by the lock, which is why we - // acquire it here. - c.channelsMu.Lock() - if c.closeCond != nil { - c.closeCond.Signal() - } - c.channelsMu.Unlock() - })) + channelToClose.close() }) } diff --git a/vendor/google.golang.org/grpc/xds/internal/xdsclient/transport/ads/ads_stream.go b/vendor/google.golang.org/grpc/xds/internal/xdsclient/transport/ads/ads_stream.go index 4c4856a073476..bf7510058c5f4 100644 --- a/vendor/google.golang.org/grpc/xds/internal/xdsclient/transport/ads/ads_stream.go +++ b/vendor/google.golang.org/grpc/xds/internal/xdsclient/transport/ads/ads_stream.go @@ -664,7 +664,7 @@ func (s *StreamImpl) onError(err error, msgReceived bool) { // connection hitting its max connection age limit. // (see [gRFC A9](https://github.com/grpc/proposal/blob/master/A9-server-side-conn-mgt.md)). if msgReceived { - err = xdsresource.NewErrorf(xdsresource.ErrTypeStreamFailedAfterRecv, err.Error()) + err = xdsresource.NewErrorf(xdsresource.ErrTypeStreamFailedAfterRecv, "%s", err.Error()) } s.eventHandler.OnADSStreamError(err) diff --git a/vendor/google.golang.org/grpc/xds/internal/xdsclient/xdsresource/type_eds.go b/vendor/google.golang.org/grpc/xds/internal/xdsclient/xdsresource/type_eds.go index 1254d250c99b0..f94a17e7c66ac 100644 --- a/vendor/google.golang.org/grpc/xds/internal/xdsclient/xdsresource/type_eds.go +++ b/vendor/google.golang.org/grpc/xds/internal/xdsclient/xdsresource/type_eds.go @@ -49,7 +49,7 @@ const ( // Endpoint contains information of an endpoint. type Endpoint struct { - Address string + Addresses []string HealthStatus EndpointHealthStatus Weight uint32 } diff --git a/vendor/google.golang.org/grpc/xds/internal/xdsclient/xdsresource/unmarshal_eds.go b/vendor/google.golang.org/grpc/xds/internal/xdsclient/xdsresource/unmarshal_eds.go index f65845b702c8c..fd780d6632d27 100644 --- a/vendor/google.golang.org/grpc/xds/internal/xdsclient/xdsresource/unmarshal_eds.go +++ b/vendor/google.golang.org/grpc/xds/internal/xdsclient/xdsresource/unmarshal_eds.go @@ -26,6 +26,7 @@ import ( v3corepb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" v3endpointpb "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3" v3typepb "github.com/envoyproxy/go-control-plane/envoy/type/v3" + "google.golang.org/grpc/internal/envconfig" "google.golang.org/grpc/internal/pretty" "google.golang.org/grpc/xds/internal" "google.golang.org/protobuf/proto" @@ -93,14 +94,22 @@ func parseEndpoints(lbEndpoints []*v3endpointpb.LbEndpoint, uniqueEndpointAddrs } weight = w.GetValue() } - addr := parseAddress(lbEndpoint.GetEndpoint().GetAddress().GetSocketAddress()) - if uniqueEndpointAddrs[addr] { - return nil, fmt.Errorf("duplicate endpoint with the same address %s", addr) + addrs := []string{parseAddress(lbEndpoint.GetEndpoint().GetAddress().GetSocketAddress())} + if envconfig.XDSDualstackEndpointsEnabled { + for _, sa := range lbEndpoint.GetEndpoint().GetAdditionalAddresses() { + addrs = append(addrs, parseAddress(sa.GetAddress().GetSocketAddress())) + } + } + + for _, a := range addrs { + if uniqueEndpointAddrs[a] { + return nil, fmt.Errorf("duplicate endpoint with the same address %s", a) + } + uniqueEndpointAddrs[a] = true } - uniqueEndpointAddrs[addr] = true endpoints = append(endpoints, Endpoint{ HealthStatus: EndpointHealthStatus(lbEndpoint.GetHealthStatus()), - Address: addr, + Addresses: addrs, Weight: weight, }) } diff --git a/vendor/modules.txt b/vendor/modules.txt index 81e2943465455..5ddee9f9e749c 100644 --- a/vendor/modules.txt +++ 
b/vendor/modules.txt @@ -2049,7 +2049,7 @@ google.golang.org/genproto/googleapis/api/monitoredres google.golang.org/genproto/googleapis/rpc/code google.golang.org/genproto/googleapis/rpc/errdetails google.golang.org/genproto/googleapis/rpc/status -# google.golang.org/grpc v1.69.4 +# google.golang.org/grpc v1.70.0 ## explicit; go 1.22 google.golang.org/grpc google.golang.org/grpc/attributes
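The EDS change above is the main behavioral piece of this bump: xdsresource.Endpoint now carries an Addresses slice instead of a single Address string, and parseEndpoints gathers the additional (dualstack) addresses only when envconfig.XDSDualstackEndpointsEnabled is set, while still rejecting duplicate addresses across all endpoints. A hedged sketch of how a consumer of the parsed update adapts (type and field names are from the hunks above; edsUpdate is a hypothetical xdsresource.EndpointsUpdate):

	for _, loc := range edsUpdate.Localities {
		for _, ep := range loc.Endpoints {
			primary := ep.Addresses[0] // parseEndpoints stores the primary address first
			extras := ep.Addresses[1:] // dualstack extras; empty unless the env flag is on
			_, _ = primary, extras
		}
	}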