diff --git a/poollet/bucketpoollet/cmd/bucketpoollet/app/app.go b/poollet/bucketpoollet/cmd/bucketpoollet/app/app.go index 6162c0c0b..e1beea86a 100644 --- a/poollet/bucketpoollet/cmd/bucketpoollet/app/app.go +++ b/poollet/bucketpoollet/cmd/bucketpoollet/app/app.go @@ -64,8 +64,9 @@ type Options struct { WatchFilterValue string - QPS float32 - Burst int + QPS float32 + Burst int + MaxConcurrentReconciles int } func (o *Options) AddFlags(fs *pflag.FlagSet) { @@ -91,6 +92,7 @@ func (o *Options) AddFlags(fs *pflag.FlagSet) { fs.Float32VarP(&o.QPS, "qps", "", 100, "Kubernetes client qps.") fs.IntVar(&o.Burst, "burst", 200, "Kubernetes client burst.") + fs.IntVar(&o.MaxConcurrentReconciles, "max-concurrent-reconciles", 1, "Maximum number of concurrent reconciles.") } func (o *Options) MarkFlagsRequired(cmd *cobra.Command) { @@ -215,13 +217,14 @@ func Run(ctx context.Context, opts Options) error { } if err := (&controllers.BucketReconciler{ - EventRecorder: mgr.GetEventRecorderFor("buckets"), - Client: mgr.GetClient(), - Scheme: scheme, - BucketRuntime: bucketRuntime, - BucketClassMapper: bucketClassMapper, - BucketPoolName: opts.BucketPoolName, - WatchFilterValue: opts.WatchFilterValue, + EventRecorder: mgr.GetEventRecorderFor("buckets"), + Client: mgr.GetClient(), + Scheme: scheme, + BucketRuntime: bucketRuntime, + BucketClassMapper: bucketClassMapper, + BucketPoolName: opts.BucketPoolName, + WatchFilterValue: opts.WatchFilterValue, + MaxConcurrentReconciles: opts.MaxConcurrentReconciles, }).SetupWithManager(mgr); err != nil { return fmt.Errorf("error setting up bucket reconciler with manager: %w", err) } diff --git a/poollet/bucketpoollet/controllers/bucket_controller.go b/poollet/bucketpoollet/controllers/bucket_controller.go index c79863e22..566a350c7 100644 --- a/poollet/bucketpoollet/controllers/bucket_controller.go +++ b/poollet/bucketpoollet/controllers/bucket_controller.go @@ -10,19 +10,20 @@ import ( "fmt" "github.com/go-logr/logr" - "github.com/ironcore-dev/controller-utils/clientutils" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + storagev1alpha1 "github.com/ironcore-dev/ironcore/api/storage/v1alpha1" iriBucket "github.com/ironcore-dev/ironcore/iri/apis/bucket" iri "github.com/ironcore-dev/ironcore/iri/apis/bucket/v1alpha1" - irimeta "github.com/ironcore-dev/ironcore/iri/apis/meta/v1alpha1" bucketpoolletv1alpha1 "github.com/ironcore-dev/ironcore/poollet/bucketpoollet/api/v1alpha1" "github.com/ironcore-dev/ironcore/poollet/bucketpoollet/bcm" "github.com/ironcore-dev/ironcore/poollet/bucketpoollet/controllers/events" ironcoreclient "github.com/ironcore-dev/ironcore/utils/client" "github.com/ironcore-dev/ironcore/utils/predicates" - "google.golang.org/grpc/codes" - "google.golang.org/grpc/status" + + "github.com/ironcore-dev/controller-utils/clientutils" corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -32,6 +33,7 @@ import ( ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/builder" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" "sigs.k8s.io/controller-runtime/pkg/predicate" ) @@ -47,6 +49,8 @@ type BucketReconciler struct { BucketPoolName string WatchFilterValue string + + MaxConcurrentReconciles int } func (r *BucketReconciler) iriBucketLabels(bucket *storagev1alpha1.Bucket) map[string]string { @@ -432,7 +436,6 @@ func (r *BucketReconciler) updateStatus(ctx context.Context, log logr.Logger, bu Endpoint: iriAccess.Endpoint, } } - } base := bucket.DeepCopy() @@ -466,6 +469,10 @@ func (r *BucketReconciler) SetupWithManager(mgr ctrl.Manager) error { predicates.ResourceIsNotExternallyManaged(log), ), ). + WithOptions( + controller.Options{ + MaxConcurrentReconciles: r.MaxConcurrentReconciles, + }). Complete(r) } diff --git a/poollet/bucketpoollet/controllers/controllers_suite_test.go b/poollet/bucketpoollet/controllers/controllers_suite_test.go index f700e160b..2f081ec2f 100644 --- a/poollet/bucketpoollet/controllers/controllers_suite_test.go +++ b/poollet/bucketpoollet/controllers/controllers_suite_test.go @@ -8,8 +8,6 @@ import ( "testing" "time" - "github.com/ironcore-dev/controller-utils/buildutils" - "github.com/ironcore-dev/controller-utils/modutils" corev1alpha1 "github.com/ironcore-dev/ironcore/api/core/v1alpha1" storagev1alpha1 "github.com/ironcore-dev/ironcore/api/storage/v1alpha1" storageclient "github.com/ironcore-dev/ironcore/internal/client/storage" @@ -22,8 +20,9 @@ import ( "github.com/ironcore-dev/ironcore/utils/envtest/apiserver" "github.com/ironcore-dev/ironcore/utils/envtest/controllermanager" "github.com/ironcore-dev/ironcore/utils/envtest/process" - . "github.com/onsi/ginkgo/v2" - . "github.com/onsi/gomega" + + "github.com/ironcore-dev/controller-utils/buildutils" + "github.com/ironcore-dev/controller-utils/modutils" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/resource" @@ -34,10 +33,13 @@ import ( ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/envtest" - . "sigs.k8s.io/controller-runtime/pkg/envtest/komega" logf "sigs.k8s.io/controller-runtime/pkg/log" "sigs.k8s.io/controller-runtime/pkg/log/zap" metricserver "sigs.k8s.io/controller-runtime/pkg/metrics/server" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + . "sigs.k8s.io/controller-runtime/pkg/envtest/komega" ) var ( diff --git a/poollet/machinepoollet/cmd/machinepoollet/app/app.go b/poollet/machinepoollet/cmd/machinepoollet/app/app.go index ef958af48..84595e29b 100644 --- a/poollet/machinepoollet/cmd/machinepoollet/app/app.go +++ b/poollet/machinepoollet/cmd/machinepoollet/app/app.go @@ -80,8 +80,9 @@ type Options struct { WatchFilterValue string - QPS float32 - Burst int + QPS float32 + Burst int + MaxConcurrentReconciles int } func (o *Options) AddFlags(fs *pflag.FlagSet) { @@ -113,6 +114,7 @@ func (o *Options) AddFlags(fs *pflag.FlagSet) { fs.Float32VarP(&o.QPS, "qps", "", 100, "Kubernetes client qps.") fs.IntVar(&o.Burst, "burst", 200, "Kubernetes client burst.") + fs.IntVar(&o.MaxConcurrentReconciles, "max-concurrent-reconciles", 1, "Maximum number of concurrent reconciles.") } func (o *Options) MarkFlagsRequired(cmd *cobra.Command) { @@ -315,16 +317,17 @@ func Run(ctx context.Context, opts Options) error { } if err := (&controllers.MachineReconciler{ - EventRecorder: mgr.GetEventRecorderFor("machines"), - Client: mgr.GetClient(), - MachineRuntime: machineRuntime, - MachineRuntimeName: version.RuntimeName, - MachineRuntimeVersion: version.RuntimeVersion, - MachineClassMapper: machineClassMapper, - MachinePoolName: opts.MachinePoolName, - DownwardAPILabels: opts.MachineDownwardAPILabels, - DownwardAPIAnnotations: opts.MachineDownwardAPIAnnotations, - WatchFilterValue: opts.WatchFilterValue, + EventRecorder: mgr.GetEventRecorderFor("machines"), + Client: mgr.GetClient(), + MachineRuntime: machineRuntime, + MachineRuntimeName: version.RuntimeName, + MachineRuntimeVersion: version.RuntimeVersion, + MachineClassMapper: machineClassMapper, + MachinePoolName: opts.MachinePoolName, + DownwardAPILabels: opts.MachineDownwardAPILabels, + DownwardAPIAnnotations: opts.MachineDownwardAPIAnnotations, + WatchFilterValue: opts.WatchFilterValue, + MaxConcurrentReconciles: opts.MaxConcurrentReconciles, }).SetupWithManager(mgr); err != nil { return fmt.Errorf("error setting up machine reconciler with manager: %w", err) } diff --git a/poollet/machinepoollet/controllers/machine_controller.go b/poollet/machinepoollet/controllers/machine_controller.go index ee6c778cc..02f8656f3 100644 --- a/poollet/machinepoollet/controllers/machine_controller.go +++ b/poollet/machinepoollet/controllers/machine_controller.go @@ -11,7 +11,10 @@ import ( "time" "github.com/go-logr/logr" - "github.com/ironcore-dev/controller-utils/clientutils" + "golang.org/x/exp/maps" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + commonv1alpha1 "github.com/ironcore-dev/ironcore/api/common/v1alpha1" computev1alpha1 "github.com/ironcore-dev/ironcore/api/compute/v1alpha1" networkingv1alpha1 "github.com/ironcore-dev/ironcore/api/networking/v1alpha1" @@ -27,9 +30,8 @@ import ( utilclient "github.com/ironcore-dev/ironcore/utils/client" utilmaps "github.com/ironcore-dev/ironcore/utils/maps" "github.com/ironcore-dev/ironcore/utils/predicates" - "golang.org/x/exp/maps" - "google.golang.org/grpc/codes" - "google.golang.org/grpc/status" + + "github.com/ironcore-dev/controller-utils/clientutils" corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -39,6 +41,7 @@ import ( ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/builder" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" "sigs.k8s.io/controller-runtime/pkg/handler" "sigs.k8s.io/controller-runtime/pkg/predicate" @@ -60,6 +63,8 @@ type MachineReconciler struct { DownwardAPIAnnotations map[string]string WatchFilterValue string + + MaxConcurrentReconciles int } func (r *MachineReconciler) machineKeyLabelSelector(machineKey client.ObjectKey) map[string]string { @@ -985,5 +990,9 @@ func (r *MachineReconciler) SetupWithManager(mgr ctrl.Manager) error { &storagev1alpha1.Volume{}, r.enqueueMachinesReferencingVolume(), ). + WithOptions( + controller.Options{ + MaxConcurrentReconciles: r.MaxConcurrentReconciles, + }). Complete(r) } diff --git a/poollet/volumepoollet/cmd/volumepoollet/app/app.go b/poollet/volumepoollet/cmd/volumepoollet/app/app.go index 0beb87257..f4fdcb743 100644 --- a/poollet/volumepoollet/cmd/volumepoollet/app/app.go +++ b/poollet/volumepoollet/cmd/volumepoollet/app/app.go @@ -65,8 +65,9 @@ type Options struct { WatchFilterValue string - QPS float32 - Burst int + QPS float32 + Burst int + MaxConcurrentReconciles int } func (o *Options) AddFlags(fs *pflag.FlagSet) { @@ -92,6 +93,7 @@ func (o *Options) AddFlags(fs *pflag.FlagSet) { fs.Float32VarP(&o.QPS, "qps", "", 100, "Kubernetes client qps.") fs.IntVar(&o.Burst, "burst", 200, "Kubernetes client burst.") + fs.IntVar(&o.MaxConcurrentReconciles, "max-concurrent-reconciles", 1, "Maximum number of concurrent reconciles.") } func (o *Options) MarkFlagsRequired(cmd *cobra.Command) { @@ -219,13 +221,14 @@ func Run(ctx context.Context, opts Options) error { } if err := (&controllers.VolumeReconciler{ - EventRecorder: mgr.GetEventRecorderFor("volumes"), - Client: mgr.GetClient(), - Scheme: scheme, - VolumeRuntime: volumeRuntime, - VolumeClassMapper: volumeClassMapper, - VolumePoolName: opts.VolumePoolName, - WatchFilterValue: opts.WatchFilterValue, + EventRecorder: mgr.GetEventRecorderFor("volumes"), + Client: mgr.GetClient(), + Scheme: scheme, + VolumeRuntime: volumeRuntime, + VolumeClassMapper: volumeClassMapper, + VolumePoolName: opts.VolumePoolName, + WatchFilterValue: opts.WatchFilterValue, + MaxConcurrentReconciles: opts.MaxConcurrentReconciles, }).SetupWithManager(mgr); err != nil { return fmt.Errorf("error setting up volume reconciler with manager: %w", err) } diff --git a/poollet/volumepoollet/controllers/volume_controller.go b/poollet/volumepoollet/controllers/volume_controller.go index 571f0982f..2eaa28285 100644 --- a/poollet/volumepoollet/controllers/volume_controller.go +++ b/poollet/volumepoollet/controllers/volume_controller.go @@ -10,20 +10,21 @@ import ( "fmt" "github.com/go-logr/logr" - "github.com/ironcore-dev/controller-utils/clientutils" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + corev1alpha1 "github.com/ironcore-dev/ironcore/api/core/v1alpha1" storagev1alpha1 "github.com/ironcore-dev/ironcore/api/storage/v1alpha1" irimeta "github.com/ironcore-dev/ironcore/iri/apis/meta/v1alpha1" iriVolume "github.com/ironcore-dev/ironcore/iri/apis/volume" iri "github.com/ironcore-dev/ironcore/iri/apis/volume/v1alpha1" - volumepoolletv1alpha1 "github.com/ironcore-dev/ironcore/poollet/volumepoollet/api/v1alpha1" "github.com/ironcore-dev/ironcore/poollet/volumepoollet/controllers/events" "github.com/ironcore-dev/ironcore/poollet/volumepoollet/vcm" ironcoreclient "github.com/ironcore-dev/ironcore/utils/client" "github.com/ironcore-dev/ironcore/utils/predicates" - "google.golang.org/grpc/codes" - "google.golang.org/grpc/status" + + "github.com/ironcore-dev/controller-utils/clientutils" corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -33,6 +34,7 @@ import ( ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/builder" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" "sigs.k8s.io/controller-runtime/pkg/predicate" ) @@ -48,6 +50,8 @@ type VolumeReconciler struct { VolumePoolName string WatchFilterValue string + + MaxConcurrentReconciles int } func (r *VolumeReconciler) iriVolumeLabels(volume *storagev1alpha1.Volume) map[string]string { @@ -443,7 +447,7 @@ func (r *VolumeReconciler) update(ctx context.Context, log logr.Logger, volume * return nil } -func (r *VolumeReconciler) volumeSecretName(volumeName string, volumeHandle string) string { +func (r *VolumeReconciler) volumeSecretName(volumeName, volumeHandle string) string { sum := sha256.Sum256([]byte(fmt.Sprintf("%s/%s", volumeName, volumeHandle))) return hex.EncodeToString(sum[:])[:63] } @@ -508,7 +512,6 @@ func (r *VolumeReconciler) updateStatus(ctx context.Context, log logr.Logger, vo VolumeAttributes: iriAccess.Attributes, } } - } base := volume.DeepCopy() @@ -542,6 +545,10 @@ func (r *VolumeReconciler) SetupWithManager(mgr ctrl.Manager) error { predicates.ResourceIsNotExternallyManaged(log), ), ). + WithOptions( + controller.Options{ + MaxConcurrentReconciles: r.MaxConcurrentReconciles, + }). Complete(r) }