From a203ee6c48cb0696818fe9274c77f5aafbca9945 Mon Sep 17 00:00:00 2001 From: Benjamin Alpert Date: Mon, 18 Nov 2024 16:14:19 +0100 Subject: [PATCH] Add QPS and Burst configuration for Kubernetes client in controller optionsUpdated `Options` struct with `QPS` and `Burst` fields, added corresponding flags in `AddFlags` method, and set these values in the `Run` function for ippoollet, machinepoollet, and volumepoollet controllers. This allows configurable rate limits for Kubernetes client requests and logs the settings on startup.Suggestion: Review the changes to ensure consistency and test the new configuration options. Signed-off-by: Benjamin Alpert change poollet client specific qps and burst opts implementation to common options pattern change to client config defaults for qps and burst --- .../bucketbroker/cmd/bucketbroker/app/app.go | 21 +++++++++--- .../cmd/machinebroker/app/app.go | 25 ++++++++++---- .../volumebroker/cmd/volumebroker/app/app.go | 21 +++++++++--- .../cmd/bucketpoollet/app/app.go | 22 ++++++++----- .../controllers/bucket_controller.go | 17 +++++++--- .../controllers/controllers_suite_test.go | 12 ++++--- .../cmd/machinepoollet/app/app.go | 28 +++++++++------- .../controllers/machine_controller.go | 17 +++++++--- .../cmd/volumepoollet/app/app.go | 22 ++++++++----- .../controllers/volume_controller.go | 19 +++++++---- utils/client/config/config.go | 14 ++++++-- utils/client/config/options.go | 33 ++++++++++++++++++- 12 files changed, 186 insertions(+), 65 deletions(-) diff --git a/broker/bucketbroker/cmd/bucketbroker/app/app.go b/broker/bucketbroker/cmd/bucketbroker/app/app.go index 64be67848..bdbbad7c9 100644 --- a/broker/bucketbroker/cmd/bucketbroker/app/app.go +++ b/broker/bucketbroker/cmd/bucketbroker/app/app.go @@ -9,13 +9,16 @@ import ( "fmt" "net" - "github.com/ironcore-dev/controller-utils/configutils" - "github.com/ironcore-dev/ironcore/broker/bucketbroker/server" - "github.com/ironcore-dev/ironcore/broker/common" - iri "github.com/ironcore-dev/ironcore/iri/apis/bucket/v1alpha1" "github.com/spf13/cobra" "github.com/spf13/pflag" "google.golang.org/grpc" + + "github.com/ironcore-dev/ironcore/broker/bucketbroker/server" + "github.com/ironcore-dev/ironcore/broker/common" + iri "github.com/ironcore-dev/ironcore/iri/apis/bucket/v1alpha1" + "github.com/ironcore-dev/ironcore/utils/client/config" + + "github.com/ironcore-dev/controller-utils/configutils" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/log/zap" ) @@ -24,6 +27,9 @@ type Options struct { Kubeconfig string Address string + QPS float32 + Burst int + Namespace string BucketPoolName string BucketPoolSelector map[string]string @@ -33,6 +39,9 @@ func (o *Options) AddFlags(fs *pflag.FlagSet) { fs.StringVar(&o.Kubeconfig, "kubeconfig", o.Kubeconfig, "Path pointing to a kubeconfig file to use.") fs.StringVar(&o.Address, "address", "/var/run/iri-bucketbroker.sock", "Address to listen on.") + fs.Float32Var(&o.QPS, "qps", config.QPS, "Kubernetes client qps.") + fs.IntVar(&o.Burst, "burst", config.Burst, "Kubernetes client burst.") + fs.StringVar(&o.Namespace, "namespace", o.Namespace, "Target Kubernetes namespace to use.") fs.StringVar(&o.BucketPoolName, "bucket-pool-name", o.BucketPoolName, "Name of the target bucket pool to pin buckets to, if any.") fs.StringToStringVar(&o.BucketPoolSelector, "bucket-pool-selector", o.BucketPoolSelector, "Selector of the target bucket pools to pin buckets to, if any.") @@ -74,6 +83,10 @@ func Run(ctx context.Context, opts Options) error { return err } + cfg.QPS = opts.QPS + cfg.Burst = opts.Burst + setupLog.Info("Kubernetes Client configuration", "QPS", cfg.QPS, "Burst", cfg.Burst) + srv, err := server.New(cfg, server.Options{ Namespace: opts.Namespace, BucketPoolName: opts.BucketPoolName, diff --git a/broker/machinebroker/cmd/machinebroker/app/app.go b/broker/machinebroker/cmd/machinebroker/app/app.go index def4a2021..1547ad4a1 100644 --- a/broker/machinebroker/cmd/machinebroker/app/app.go +++ b/broker/machinebroker/cmd/machinebroker/app/app.go @@ -13,16 +13,19 @@ import ( "net/url" "github.com/go-logr/logr" - "github.com/ironcore-dev/controller-utils/configutils" + "github.com/spf13/cobra" + "github.com/spf13/pflag" + "golang.org/x/sync/errgroup" + "google.golang.org/grpc" + "github.com/ironcore-dev/ironcore/broker/common" commongrpc "github.com/ironcore-dev/ironcore/broker/common/grpc" machinebrokerhttp "github.com/ironcore-dev/ironcore/broker/machinebroker/http" "github.com/ironcore-dev/ironcore/broker/machinebroker/server" iri "github.com/ironcore-dev/ironcore/iri/apis/machine/v1alpha1" - "github.com/spf13/cobra" - "github.com/spf13/pflag" - "golang.org/x/sync/errgroup" - "google.golang.org/grpc" + "github.com/ironcore-dev/ironcore/utils/client/config" + + "github.com/ironcore-dev/controller-utils/configutils" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/log/zap" ) @@ -34,6 +37,9 @@ type Options struct { BaseURL string BrokerDownwardAPILabels map[string]string + QPS float32 + Burst int + Namespace string MachinePoolName string MachinePoolSelector map[string]string @@ -48,6 +54,9 @@ func (o *Options) AddFlags(fs *pflag.FlagSet) { fs.StringToStringVar(&o.BrokerDownwardAPILabels, "broker-downward-api-label", nil, "The labels to broker via downward API. "+ "Example is for instance to broker \"root-machine-uid\" initially obtained via \"machinepoollet.ironcore.dev/machine-uid\".") + fs.Float32Var(&o.QPS, "qps", config.QPS, "Kubernetes client qps.") + fs.IntVar(&o.Burst, "burst", config.Burst, "Kubernetes client burst.") + fs.StringVar(&o.Namespace, "namespace", o.Namespace, "Target Kubernetes namespace to use.") fs.StringVar(&o.MachinePoolName, "machine-pool-name", o.MachinePoolName, "Name of the target machine pool to pin machines to, if any.") fs.StringToStringVar(&o.MachinePoolSelector, "machine-pool-selector", o.MachinePoolSelector, "Selector of the target machine pools to pin machines to, if any.") @@ -102,6 +111,10 @@ func Run(ctx context.Context, opts Options) error { baseURL = u.String() } + cfg.QPS = opts.QPS + cfg.Burst = opts.Burst + setupLog.Info("Kubernetes Client configuration", "QPS", cfg.QPS, "Burst", cfg.Burst) + log.V(1).Info("Creating server", "Namespace", opts.Namespace, "MachinePoolName", opts.MachinePoolName, @@ -142,7 +155,7 @@ func runServer(ctx context.Context, setupLog, log logr.Logger, srv *server.Serve return nil } -func runGRPCServer(ctx context.Context, setupLog logr.Logger, log logr.Logger, srv *server.Server, opts Options) error { +func runGRPCServer(ctx context.Context, setupLog, log logr.Logger, srv *server.Server, opts Options) error { log.V(1).Info("Cleaning up any previous socket") if err := common.CleanupSocketIfExists(opts.Address); err != nil { return fmt.Errorf("error cleaning up socket: %w", err) diff --git a/broker/volumebroker/cmd/volumebroker/app/app.go b/broker/volumebroker/cmd/volumebroker/app/app.go index 98f492a22..1a339b734 100644 --- a/broker/volumebroker/cmd/volumebroker/app/app.go +++ b/broker/volumebroker/cmd/volumebroker/app/app.go @@ -9,13 +9,16 @@ import ( "fmt" "net" - "github.com/ironcore-dev/controller-utils/configutils" - "github.com/ironcore-dev/ironcore/broker/common" - "github.com/ironcore-dev/ironcore/broker/volumebroker/server" - iri "github.com/ironcore-dev/ironcore/iri/apis/volume/v1alpha1" "github.com/spf13/cobra" "github.com/spf13/pflag" "google.golang.org/grpc" + + "github.com/ironcore-dev/ironcore/broker/common" + "github.com/ironcore-dev/ironcore/broker/volumebroker/server" + iri "github.com/ironcore-dev/ironcore/iri/apis/volume/v1alpha1" + "github.com/ironcore-dev/ironcore/utils/client/config" + + "github.com/ironcore-dev/controller-utils/configutils" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/log/zap" ) @@ -24,6 +27,9 @@ type Options struct { Kubeconfig string Address string + QPS float32 + Burst int + Namespace string VolumePoolName string VolumePoolSelector map[string]string @@ -33,6 +39,9 @@ func (o *Options) AddFlags(fs *pflag.FlagSet) { fs.StringVar(&o.Kubeconfig, "kubeconfig", o.Kubeconfig, "Path pointing to a kubeconfig file to use.") fs.StringVar(&o.Address, "address", "/var/run/iri-volumebroker.sock", "Address to listen on.") + fs.Float32Var(&o.QPS, "qps", config.QPS, "Kubernetes client qps.") + fs.IntVar(&o.Burst, "burst", config.Burst, "Kubernetes client burst.") + fs.StringVar(&o.Namespace, "namespace", o.Namespace, "Target Kubernetes namespace to use.") fs.StringVar(&o.VolumePoolName, "volume-pool-name", o.VolumePoolName, "Name of the target volume pool to pin volumes to, if any.") fs.StringToStringVar(&o.VolumePoolSelector, "volume-pool-selector", o.VolumePoolSelector, "Selector of the target volume pools to pin volumes to, if any.") @@ -74,6 +83,10 @@ func Run(ctx context.Context, opts Options) error { return err } + cfg.QPS = opts.QPS + cfg.Burst = opts.Burst + setupLog.Info("Kubernetes Client configuration", "QPS", cfg.QPS, "Burst", cfg.Burst) + srv, err := server.New(cfg, server.Options{ Namespace: opts.Namespace, VolumePoolName: opts.VolumePoolName, diff --git a/poollet/bucketpoollet/cmd/bucketpoollet/app/app.go b/poollet/bucketpoollet/cmd/bucketpoollet/app/app.go index a9b522996..d23aec753 100644 --- a/poollet/bucketpoollet/cmd/bucketpoollet/app/app.go +++ b/poollet/bucketpoollet/cmd/bucketpoollet/app/app.go @@ -65,6 +65,8 @@ type Options struct { RelistThreshold time.Duration WatchFilterValue string + + MaxConcurrentReconciles int } func (o *Options) AddFlags(fs *pflag.FlagSet) { @@ -89,6 +91,8 @@ func (o *Options) AddFlags(fs *pflag.FlagSet) { fs.DurationVar(&o.RelistThreshold, "relist-threshold", 3*time.Minute, "event channel relisting threshold.") fs.StringVar(&o.WatchFilterValue, "watch-filter", "", "Value to filter for while watching.") + + fs.IntVar(&o.MaxConcurrentReconciles, "max-concurrent-reconciles", 1, "Maximum number of concurrent reconciles.") } func (o *Options) MarkFlagsRequired(cmd *cobra.Command) { @@ -150,7 +154,8 @@ func Run(ctx context.Context, opts Options) error { return fmt.Errorf("error getting config: %w", err) } - setupLog.Info("IRI client config", "ChannelCapacity", opts.ChannelCapacity, "RelistPeriod", opts.RelistPeriod, "RelistThreshold", opts.RelistThreshold) + setupLog.Info("IRI Client configuration", "ChannelCapacity", opts.ChannelCapacity, "RelistPeriod", opts.RelistPeriod, "RelistThreshold", opts.RelistThreshold) + setupLog.Info("Kubernetes Client configuration", "QPS", cfg.QPS, "Burst", cfg.Burst) leaderElectionCfg, err := configutils.GetConfig( configutils.Kubeconfig(opts.LeaderElectionKubeconfig), @@ -213,13 +218,14 @@ func Run(ctx context.Context, opts Options) error { } if err := (&controllers.BucketReconciler{ - EventRecorder: mgr.GetEventRecorderFor("buckets"), - Client: mgr.GetClient(), - Scheme: scheme, - BucketRuntime: bucketRuntime, - BucketClassMapper: bucketClassMapper, - BucketPoolName: opts.BucketPoolName, - WatchFilterValue: opts.WatchFilterValue, + EventRecorder: mgr.GetEventRecorderFor("buckets"), + Client: mgr.GetClient(), + Scheme: scheme, + BucketRuntime: bucketRuntime, + BucketClassMapper: bucketClassMapper, + BucketPoolName: opts.BucketPoolName, + WatchFilterValue: opts.WatchFilterValue, + MaxConcurrentReconciles: opts.MaxConcurrentReconciles, }).SetupWithManager(mgr); err != nil { return fmt.Errorf("error setting up bucket reconciler with manager: %w", err) } diff --git a/poollet/bucketpoollet/controllers/bucket_controller.go b/poollet/bucketpoollet/controllers/bucket_controller.go index c79863e22..566a350c7 100644 --- a/poollet/bucketpoollet/controllers/bucket_controller.go +++ b/poollet/bucketpoollet/controllers/bucket_controller.go @@ -10,19 +10,20 @@ import ( "fmt" "github.com/go-logr/logr" - "github.com/ironcore-dev/controller-utils/clientutils" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + storagev1alpha1 "github.com/ironcore-dev/ironcore/api/storage/v1alpha1" iriBucket "github.com/ironcore-dev/ironcore/iri/apis/bucket" iri "github.com/ironcore-dev/ironcore/iri/apis/bucket/v1alpha1" - irimeta "github.com/ironcore-dev/ironcore/iri/apis/meta/v1alpha1" bucketpoolletv1alpha1 "github.com/ironcore-dev/ironcore/poollet/bucketpoollet/api/v1alpha1" "github.com/ironcore-dev/ironcore/poollet/bucketpoollet/bcm" "github.com/ironcore-dev/ironcore/poollet/bucketpoollet/controllers/events" ironcoreclient "github.com/ironcore-dev/ironcore/utils/client" "github.com/ironcore-dev/ironcore/utils/predicates" - "google.golang.org/grpc/codes" - "google.golang.org/grpc/status" + + "github.com/ironcore-dev/controller-utils/clientutils" corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -32,6 +33,7 @@ import ( ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/builder" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" "sigs.k8s.io/controller-runtime/pkg/predicate" ) @@ -47,6 +49,8 @@ type BucketReconciler struct { BucketPoolName string WatchFilterValue string + + MaxConcurrentReconciles int } func (r *BucketReconciler) iriBucketLabels(bucket *storagev1alpha1.Bucket) map[string]string { @@ -432,7 +436,6 @@ func (r *BucketReconciler) updateStatus(ctx context.Context, log logr.Logger, bu Endpoint: iriAccess.Endpoint, } } - } base := bucket.DeepCopy() @@ -466,6 +469,10 @@ func (r *BucketReconciler) SetupWithManager(mgr ctrl.Manager) error { predicates.ResourceIsNotExternallyManaged(log), ), ). + WithOptions( + controller.Options{ + MaxConcurrentReconciles: r.MaxConcurrentReconciles, + }). Complete(r) } diff --git a/poollet/bucketpoollet/controllers/controllers_suite_test.go b/poollet/bucketpoollet/controllers/controllers_suite_test.go index f700e160b..2f081ec2f 100644 --- a/poollet/bucketpoollet/controllers/controllers_suite_test.go +++ b/poollet/bucketpoollet/controllers/controllers_suite_test.go @@ -8,8 +8,6 @@ import ( "testing" "time" - "github.com/ironcore-dev/controller-utils/buildutils" - "github.com/ironcore-dev/controller-utils/modutils" corev1alpha1 "github.com/ironcore-dev/ironcore/api/core/v1alpha1" storagev1alpha1 "github.com/ironcore-dev/ironcore/api/storage/v1alpha1" storageclient "github.com/ironcore-dev/ironcore/internal/client/storage" @@ -22,8 +20,9 @@ import ( "github.com/ironcore-dev/ironcore/utils/envtest/apiserver" "github.com/ironcore-dev/ironcore/utils/envtest/controllermanager" "github.com/ironcore-dev/ironcore/utils/envtest/process" - . "github.com/onsi/ginkgo/v2" - . "github.com/onsi/gomega" + + "github.com/ironcore-dev/controller-utils/buildutils" + "github.com/ironcore-dev/controller-utils/modutils" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/resource" @@ -34,10 +33,13 @@ import ( ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/envtest" - . "sigs.k8s.io/controller-runtime/pkg/envtest/komega" logf "sigs.k8s.io/controller-runtime/pkg/log" "sigs.k8s.io/controller-runtime/pkg/log/zap" metricserver "sigs.k8s.io/controller-runtime/pkg/metrics/server" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + . "sigs.k8s.io/controller-runtime/pkg/envtest/komega" ) var ( diff --git a/poollet/machinepoollet/cmd/machinepoollet/app/app.go b/poollet/machinepoollet/cmd/machinepoollet/app/app.go index 4e9bb764a..3dacf1ac3 100644 --- a/poollet/machinepoollet/cmd/machinepoollet/app/app.go +++ b/poollet/machinepoollet/cmd/machinepoollet/app/app.go @@ -81,6 +81,8 @@ type Options struct { AddressesOptions addresses.GetOptions WatchFilterValue string + + MaxConcurrentReconciles int } func (o *Options) AddFlags(fs *pflag.FlagSet) { @@ -111,6 +113,8 @@ func (o *Options) AddFlags(fs *pflag.FlagSet) { o.AddressesOptions.BindFlags(fs) fs.StringVar(&o.WatchFilterValue, "watch-filter", "", "Value to filter for while watching.") + + fs.IntVar(&o.MaxConcurrentReconciles, "max-concurrent-reconciles", 1, "Maximum number of concurrent reconciles.") } func (o *Options) MarkFlagsRequired(cmd *cobra.Command) { @@ -206,7 +210,8 @@ func Run(ctx context.Context, opts Options) error { return fmt.Errorf("error getting config: %w", err) } - setupLog.Info("IRI client config", "ChannelCapacity", opts.ChannelCapacity, "RelistPeriod", opts.RelistPeriod, "RelistThreshold", opts.RelistThreshold) + setupLog.Info("IRI Client configuration", "ChannelCapacity", opts.ChannelCapacity, "RelistPeriod", opts.RelistPeriod, "RelistThreshold", opts.RelistThreshold) + setupLog.Info("Kubernetes Client configuration", "QPS", cfg.QPS, "Burst", cfg.Burst) leaderElectionCfg, err := configutils.GetConfig( configutils.Kubeconfig(opts.LeaderElectionKubeconfig), @@ -313,16 +318,17 @@ func Run(ctx context.Context, opts Options) error { } if err := (&controllers.MachineReconciler{ - EventRecorder: mgr.GetEventRecorderFor("machines"), - Client: mgr.GetClient(), - MachineRuntime: machineRuntime, - MachineRuntimeName: version.RuntimeName, - MachineRuntimeVersion: version.RuntimeVersion, - MachineClassMapper: machineClassMapper, - MachinePoolName: opts.MachinePoolName, - DownwardAPILabels: opts.MachineDownwardAPILabels, - DownwardAPIAnnotations: opts.MachineDownwardAPIAnnotations, - WatchFilterValue: opts.WatchFilterValue, + EventRecorder: mgr.GetEventRecorderFor("machines"), + Client: mgr.GetClient(), + MachineRuntime: machineRuntime, + MachineRuntimeName: version.RuntimeName, + MachineRuntimeVersion: version.RuntimeVersion, + MachineClassMapper: machineClassMapper, + MachinePoolName: opts.MachinePoolName, + DownwardAPILabels: opts.MachineDownwardAPILabels, + DownwardAPIAnnotations: opts.MachineDownwardAPIAnnotations, + WatchFilterValue: opts.WatchFilterValue, + MaxConcurrentReconciles: opts.MaxConcurrentReconciles, }).SetupWithManager(mgr); err != nil { return fmt.Errorf("error setting up machine reconciler with manager: %w", err) } diff --git a/poollet/machinepoollet/controllers/machine_controller.go b/poollet/machinepoollet/controllers/machine_controller.go index fbb9a8f0c..ab35bd4ab 100644 --- a/poollet/machinepoollet/controllers/machine_controller.go +++ b/poollet/machinepoollet/controllers/machine_controller.go @@ -12,7 +12,10 @@ import ( "time" "github.com/go-logr/logr" - "github.com/ironcore-dev/controller-utils/clientutils" + "golang.org/x/exp/maps" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + commonv1alpha1 "github.com/ironcore-dev/ironcore/api/common/v1alpha1" computev1alpha1 "github.com/ironcore-dev/ironcore/api/compute/v1alpha1" networkingv1alpha1 "github.com/ironcore-dev/ironcore/api/networking/v1alpha1" @@ -28,9 +31,8 @@ import ( utilclient "github.com/ironcore-dev/ironcore/utils/client" utilmaps "github.com/ironcore-dev/ironcore/utils/maps" "github.com/ironcore-dev/ironcore/utils/predicates" - "golang.org/x/exp/maps" - "google.golang.org/grpc/codes" - "google.golang.org/grpc/status" + + "github.com/ironcore-dev/controller-utils/clientutils" corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -40,6 +42,7 @@ import ( ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/builder" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" "sigs.k8s.io/controller-runtime/pkg/handler" "sigs.k8s.io/controller-runtime/pkg/predicate" @@ -61,6 +64,8 @@ type MachineReconciler struct { DownwardAPIAnnotations map[string]string WatchFilterValue string + + MaxConcurrentReconciles int } func (r *MachineReconciler) machineKeyLabelSelector(machineKey client.ObjectKey) map[string]string { @@ -989,5 +994,9 @@ func (r *MachineReconciler) SetupWithManager(mgr ctrl.Manager) error { &storagev1alpha1.Volume{}, r.enqueueMachinesReferencingVolume(), ). + WithOptions( + controller.Options{ + MaxConcurrentReconciles: r.MaxConcurrentReconciles, + }). Complete(r) } diff --git a/poollet/volumepoollet/cmd/volumepoollet/app/app.go b/poollet/volumepoollet/cmd/volumepoollet/app/app.go index 3b62cb6ae..510c6caea 100644 --- a/poollet/volumepoollet/cmd/volumepoollet/app/app.go +++ b/poollet/volumepoollet/cmd/volumepoollet/app/app.go @@ -66,6 +66,8 @@ type Options struct { RelistThreshold time.Duration WatchFilterValue string + + MaxConcurrentReconciles int } func (o *Options) AddFlags(fs *pflag.FlagSet) { @@ -90,6 +92,8 @@ func (o *Options) AddFlags(fs *pflag.FlagSet) { fs.DurationVar(&o.RelistThreshold, "relist-threshold", 3*time.Minute, "event channel relisting threshold.") fs.StringVar(&o.WatchFilterValue, "watch-filter", "", "Value to filter for while watching.") + + fs.IntVar(&o.MaxConcurrentReconciles, "max-concurrent-reconciles", 1, "Maximum number of concurrent reconciles.") } func (o *Options) MarkFlagsRequired(cmd *cobra.Command) { @@ -150,7 +154,8 @@ func Run(ctx context.Context, opts Options) error { return fmt.Errorf("error getting config: %w", err) } - setupLog.Info("IRI client config", "ChannelCapacity", opts.ChannelCapacity, "RelistPeriod", opts.RelistPeriod, "RelistThreshold", opts.RelistThreshold) + setupLog.Info("IRI Client configuration", "ChannelCapacity", opts.ChannelCapacity, "RelistPeriod", opts.RelistPeriod, "RelistThreshold", opts.RelistThreshold) + setupLog.Info("Kubernetes Client configuration", "QPS", cfg.QPS, "Burst", cfg.Burst) leaderElectionCfg, err := configutils.GetConfig( configutils.Kubeconfig(opts.LeaderElectionKubeconfig), @@ -217,13 +222,14 @@ func Run(ctx context.Context, opts Options) error { } if err := (&controllers.VolumeReconciler{ - EventRecorder: mgr.GetEventRecorderFor("volumes"), - Client: mgr.GetClient(), - Scheme: scheme, - VolumeRuntime: volumeRuntime, - VolumeClassMapper: volumeClassMapper, - VolumePoolName: opts.VolumePoolName, - WatchFilterValue: opts.WatchFilterValue, + EventRecorder: mgr.GetEventRecorderFor("volumes"), + Client: mgr.GetClient(), + Scheme: scheme, + VolumeRuntime: volumeRuntime, + VolumeClassMapper: volumeClassMapper, + VolumePoolName: opts.VolumePoolName, + WatchFilterValue: opts.WatchFilterValue, + MaxConcurrentReconciles: opts.MaxConcurrentReconciles, }).SetupWithManager(mgr); err != nil { return fmt.Errorf("error setting up volume reconciler with manager: %w", err) } diff --git a/poollet/volumepoollet/controllers/volume_controller.go b/poollet/volumepoollet/controllers/volume_controller.go index 571f0982f..2eaa28285 100644 --- a/poollet/volumepoollet/controllers/volume_controller.go +++ b/poollet/volumepoollet/controllers/volume_controller.go @@ -10,20 +10,21 @@ import ( "fmt" "github.com/go-logr/logr" - "github.com/ironcore-dev/controller-utils/clientutils" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + corev1alpha1 "github.com/ironcore-dev/ironcore/api/core/v1alpha1" storagev1alpha1 "github.com/ironcore-dev/ironcore/api/storage/v1alpha1" irimeta "github.com/ironcore-dev/ironcore/iri/apis/meta/v1alpha1" iriVolume "github.com/ironcore-dev/ironcore/iri/apis/volume" iri "github.com/ironcore-dev/ironcore/iri/apis/volume/v1alpha1" - volumepoolletv1alpha1 "github.com/ironcore-dev/ironcore/poollet/volumepoollet/api/v1alpha1" "github.com/ironcore-dev/ironcore/poollet/volumepoollet/controllers/events" "github.com/ironcore-dev/ironcore/poollet/volumepoollet/vcm" ironcoreclient "github.com/ironcore-dev/ironcore/utils/client" "github.com/ironcore-dev/ironcore/utils/predicates" - "google.golang.org/grpc/codes" - "google.golang.org/grpc/status" + + "github.com/ironcore-dev/controller-utils/clientutils" corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -33,6 +34,7 @@ import ( ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/builder" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" "sigs.k8s.io/controller-runtime/pkg/predicate" ) @@ -48,6 +50,8 @@ type VolumeReconciler struct { VolumePoolName string WatchFilterValue string + + MaxConcurrentReconciles int } func (r *VolumeReconciler) iriVolumeLabels(volume *storagev1alpha1.Volume) map[string]string { @@ -443,7 +447,7 @@ func (r *VolumeReconciler) update(ctx context.Context, log logr.Logger, volume * return nil } -func (r *VolumeReconciler) volumeSecretName(volumeName string, volumeHandle string) string { +func (r *VolumeReconciler) volumeSecretName(volumeName, volumeHandle string) string { sum := sha256.Sum256([]byte(fmt.Sprintf("%s/%s", volumeName, volumeHandle))) return hex.EncodeToString(sum[:])[:63] } @@ -508,7 +512,6 @@ func (r *VolumeReconciler) updateStatus(ctx context.Context, log logr.Logger, vo VolumeAttributes: iriAccess.Attributes, } } - } base := volume.DeepCopy() @@ -542,6 +545,10 @@ func (r *VolumeReconciler) SetupWithManager(mgr ctrl.Manager) error { predicates.ResourceIsNotExternallyManaged(log), ), ). + WithOptions( + controller.Options{ + MaxConcurrentReconciles: r.MaxConcurrentReconciles, + }). Complete(r) } diff --git a/utils/client/config/config.go b/utils/client/config/config.go index 2c49e2853..a0731fd29 100644 --- a/utils/client/config/config.go +++ b/utils/client/config/config.go @@ -14,7 +14,9 @@ import ( "time" "github.com/go-logr/logr" + utilrest "github.com/ironcore-dev/ironcore/utils/rest" + certificatesv1 "k8s.io/api/certificates/v1" corev1 "k8s.io/api/core/v1" utilnet "k8s.io/apimachinery/pkg/util/net" @@ -44,12 +46,16 @@ const ( // EgressSelectorConfigFlagName is the name of the egress-selector-config flag. EgressSelectorConfigFlagName = "egress-selector-config" -) -var ( - log = ctrl.Log.WithName("client").WithName("config") + // QPS is the default value for allowed queries per second for a client. + QPS float32 = 20.0 + + // Burst is the default value for allowed burst rate for a client. + Burst int = 30 ) +var log = ctrl.Log.WithName("client").WithName("config") + // EgressSelectionName is the name of the egress configuration to use. type EgressSelectionName string @@ -273,6 +279,8 @@ func (g *Getter) getConfig(ctx context.Context, o *GetConfigOptions) (*rest.Conf } cfg.Dial = dialFunc + cfg.QPS = o.QPS + cfg.Burst = o.Burst return cfg, nil, nil } diff --git a/utils/client/config/options.go b/utils/client/config/options.go index d593ee33a..3aeb12297 100644 --- a/utils/client/config/options.go +++ b/utils/client/config/options.go @@ -6,8 +6,9 @@ package config import ( "fmt" - "github.com/ironcore-dev/ironcore/utils/generic" "github.com/spf13/pflag" + + "github.com/ironcore-dev/ironcore/utils/generic" ) // GetConfigOptions are options to supply for a GetConfig call. @@ -38,6 +39,12 @@ type GetConfigOptions struct { // EgressSelectorConfig is the path to an egress selector config to load. EgressSelectorConfig string + + // QPS specifies the queries per second allowed for the client. + QPS float32 + + // Burst specified the burst rate allowed for the client. + Burst int } // BindFlagOptions are options for GetConfigOptions.BindFlags. @@ -80,6 +87,8 @@ func (o *GetConfigOptions) BindFlags(fs *pflag.FlagSet, opts ...func(*BindFlagOp fs.StringVar(&o.BootstrapKubeconfig, bo.NameFunc(BootstrapKubeconfigFlagName), "", "Path to a bootstrap kubeconfig.") fs.BoolVar(&o.RotateCertificates, bo.NameFunc(RotateCertificatesFlagName), false, "Whether to use automatic certificate / config rotation.") fs.StringVar(&o.EgressSelectorConfig, bo.NameFunc(EgressSelectorConfigFlagName), "", "Path pointing to an egress selector config to use.") + fs.Float32Var(&o.QPS, "qps", QPS, "Kubernetes client qps.") + fs.IntVar(&o.Burst, "burst", Burst, "Kubernetes client burst.") } // ApplyToGetConfig implements GetConfigOption. @@ -105,6 +114,12 @@ func (o *GetConfigOptions) ApplyToGetConfig(o2 *GetConfigOptions) { if o.EgressSelectorConfig != "" { o2.EgressSelectorConfig = o.EgressSelectorConfig } + if o.QPS != 0 { + o2.QPS = o.QPS + } + if o.Burst != 0 { + o2.Burst = o.Burst + } } // ApplyOptions applies all GetConfigOption tro this GetConfigOptions. @@ -146,3 +161,19 @@ func (w WithRotate) ApplyToGetConfig(o *GetConfigOptions) { // RotateCertificates enables certificate rotation. var RotateCertificates = WithRotate(true) + +// WithQPS sets GetConfigOptions.QPS to the specified value. +type WithQPS float32 + +// ApplyToGetConfig implements GetConfigOption. +func (c WithQPS) ApplyToGetConfig(o *GetConfigOptions) { + o.QPS = float32(c) +} + +// WithBurst sets GetConfigOptions.Burst to the specified value. +type WithBurst int + +// ApplyToGetConfig implements GetConfigOption. +func (c WithBurst) ApplyToGetConfig(o *GetConfigOptions) { + o.Burst = int(c) +}