Skip to content

Commit

Permalink
make concurrent reconciles configurable
Browse files Browse the repository at this point in the history
  • Loading branch information
balpert89 committed Nov 25, 2024
1 parent 8a83ccb commit 8b7d861
Show file tree
Hide file tree
Showing 7 changed files with 84 additions and 50 deletions.
21 changes: 12 additions & 9 deletions poollet/bucketpoollet/cmd/bucketpoollet/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,9 @@ type Options struct {

WatchFilterValue string

QPS float32
Burst int
QPS float32
Burst int
MaxConcurrentReconciles int
}

func (o *Options) AddFlags(fs *pflag.FlagSet) {
Expand All @@ -91,6 +92,7 @@ func (o *Options) AddFlags(fs *pflag.FlagSet) {

fs.Float32VarP(&o.QPS, "qps", "", 100, "Kubernetes client qps.")
fs.IntVar(&o.Burst, "burst", 200, "Kubernetes client burst.")
fs.IntVar(&o.MaxConcurrentReconciles, "max-concurrent-reconciles", 1, "Maximum number of concurrent reconciles.")
}

func (o *Options) MarkFlagsRequired(cmd *cobra.Command) {
Expand Down Expand Up @@ -215,13 +217,14 @@ func Run(ctx context.Context, opts Options) error {
}

if err := (&controllers.BucketReconciler{
EventRecorder: mgr.GetEventRecorderFor("buckets"),
Client: mgr.GetClient(),
Scheme: scheme,
BucketRuntime: bucketRuntime,
BucketClassMapper: bucketClassMapper,
BucketPoolName: opts.BucketPoolName,
WatchFilterValue: opts.WatchFilterValue,
EventRecorder: mgr.GetEventRecorderFor("buckets"),
Client: mgr.GetClient(),
Scheme: scheme,
BucketRuntime: bucketRuntime,
BucketClassMapper: bucketClassMapper,
BucketPoolName: opts.BucketPoolName,
WatchFilterValue: opts.WatchFilterValue,
MaxConcurrentReconciles: opts.MaxConcurrentReconciles,
}).SetupWithManager(mgr); err != nil {
return fmt.Errorf("error setting up bucket reconciler with manager: %w", err)
}
Expand Down
17 changes: 12 additions & 5 deletions poollet/bucketpoollet/controllers/bucket_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,19 +10,20 @@ import (
"fmt"

"github.com/go-logr/logr"
"github.com/ironcore-dev/controller-utils/clientutils"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

storagev1alpha1 "github.com/ironcore-dev/ironcore/api/storage/v1alpha1"
iriBucket "github.com/ironcore-dev/ironcore/iri/apis/bucket"
iri "github.com/ironcore-dev/ironcore/iri/apis/bucket/v1alpha1"

irimeta "github.com/ironcore-dev/ironcore/iri/apis/meta/v1alpha1"
bucketpoolletv1alpha1 "github.com/ironcore-dev/ironcore/poollet/bucketpoollet/api/v1alpha1"
"github.com/ironcore-dev/ironcore/poollet/bucketpoollet/bcm"
"github.com/ironcore-dev/ironcore/poollet/bucketpoollet/controllers/events"
ironcoreclient "github.com/ironcore-dev/ironcore/utils/client"
"github.com/ironcore-dev/ironcore/utils/predicates"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"github.com/ironcore-dev/controller-utils/clientutils"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand All @@ -32,6 +33,7 @@ import (
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/predicate"
)
Expand All @@ -47,6 +49,8 @@ type BucketReconciler struct {

BucketPoolName string
WatchFilterValue string

MaxConcurrentReconciles int
}

func (r *BucketReconciler) iriBucketLabels(bucket *storagev1alpha1.Bucket) map[string]string {
Expand Down Expand Up @@ -432,7 +436,6 @@ func (r *BucketReconciler) updateStatus(ctx context.Context, log logr.Logger, bu
Endpoint: iriAccess.Endpoint,
}
}

}

base := bucket.DeepCopy()
Expand Down Expand Up @@ -466,6 +469,10 @@ func (r *BucketReconciler) SetupWithManager(mgr ctrl.Manager) error {
predicates.ResourceIsNotExternallyManaged(log),
),
).
WithOptions(
controller.Options{
MaxConcurrentReconciles: r.MaxConcurrentReconciles,
}).
Complete(r)
}

Expand Down
12 changes: 7 additions & 5 deletions poollet/bucketpoollet/controllers/controllers_suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@ import (
"testing"
"time"

"github.com/ironcore-dev/controller-utils/buildutils"
"github.com/ironcore-dev/controller-utils/modutils"
corev1alpha1 "github.com/ironcore-dev/ironcore/api/core/v1alpha1"
storagev1alpha1 "github.com/ironcore-dev/ironcore/api/storage/v1alpha1"
storageclient "github.com/ironcore-dev/ironcore/internal/client/storage"
Expand All @@ -22,8 +20,9 @@ import (
"github.com/ironcore-dev/ironcore/utils/envtest/apiserver"
"github.com/ironcore-dev/ironcore/utils/envtest/controllermanager"
"github.com/ironcore-dev/ironcore/utils/envtest/process"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"

"github.com/ironcore-dev/controller-utils/buildutils"
"github.com/ironcore-dev/controller-utils/modutils"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/resource"
Expand All @@ -34,10 +33,13 @@ import (
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/envtest"
. "sigs.k8s.io/controller-runtime/pkg/envtest/komega"
logf "sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/log/zap"
metricserver "sigs.k8s.io/controller-runtime/pkg/metrics/server"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
. "sigs.k8s.io/controller-runtime/pkg/envtest/komega"
)

var (
Expand Down
27 changes: 15 additions & 12 deletions poollet/machinepoollet/cmd/machinepoollet/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,9 @@ type Options struct {

WatchFilterValue string

QPS float32
Burst int
QPS float32
Burst int
MaxConcurrentReconciles int
}

func (o *Options) AddFlags(fs *pflag.FlagSet) {
Expand Down Expand Up @@ -113,6 +114,7 @@ func (o *Options) AddFlags(fs *pflag.FlagSet) {

fs.Float32VarP(&o.QPS, "qps", "", 100, "Kubernetes client qps.")
fs.IntVar(&o.Burst, "burst", 200, "Kubernetes client burst.")
fs.IntVar(&o.MaxConcurrentReconciles, "max-concurrent-reconciles", 1, "Maximum number of concurrent reconciles.")
}

func (o *Options) MarkFlagsRequired(cmd *cobra.Command) {
Expand Down Expand Up @@ -315,16 +317,17 @@ func Run(ctx context.Context, opts Options) error {
}

if err := (&controllers.MachineReconciler{
EventRecorder: mgr.GetEventRecorderFor("machines"),
Client: mgr.GetClient(),
MachineRuntime: machineRuntime,
MachineRuntimeName: version.RuntimeName,
MachineRuntimeVersion: version.RuntimeVersion,
MachineClassMapper: machineClassMapper,
MachinePoolName: opts.MachinePoolName,
DownwardAPILabels: opts.MachineDownwardAPILabels,
DownwardAPIAnnotations: opts.MachineDownwardAPIAnnotations,
WatchFilterValue: opts.WatchFilterValue,
EventRecorder: mgr.GetEventRecorderFor("machines"),
Client: mgr.GetClient(),
MachineRuntime: machineRuntime,
MachineRuntimeName: version.RuntimeName,
MachineRuntimeVersion: version.RuntimeVersion,
MachineClassMapper: machineClassMapper,
MachinePoolName: opts.MachinePoolName,
DownwardAPILabels: opts.MachineDownwardAPILabels,
DownwardAPIAnnotations: opts.MachineDownwardAPIAnnotations,
WatchFilterValue: opts.WatchFilterValue,
MaxConcurrentReconciles: opts.MaxConcurrentReconciles,
}).SetupWithManager(mgr); err != nil {
return fmt.Errorf("error setting up machine reconciler with manager: %w", err)
}
Expand Down
17 changes: 13 additions & 4 deletions poollet/machinepoollet/controllers/machine_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,10 @@ import (
"time"

"github.com/go-logr/logr"
"github.com/ironcore-dev/controller-utils/clientutils"
"golang.org/x/exp/maps"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

commonv1alpha1 "github.com/ironcore-dev/ironcore/api/common/v1alpha1"
computev1alpha1 "github.com/ironcore-dev/ironcore/api/compute/v1alpha1"
networkingv1alpha1 "github.com/ironcore-dev/ironcore/api/networking/v1alpha1"
Expand All @@ -27,9 +30,8 @@ import (
utilclient "github.com/ironcore-dev/ironcore/utils/client"
utilmaps "github.com/ironcore-dev/ironcore/utils/maps"
"github.com/ironcore-dev/ironcore/utils/predicates"
"golang.org/x/exp/maps"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"github.com/ironcore-dev/controller-utils/clientutils"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand All @@ -39,6 +41,7 @@ import (
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/predicate"
Expand All @@ -60,6 +63,8 @@ type MachineReconciler struct {
DownwardAPIAnnotations map[string]string

WatchFilterValue string

MaxConcurrentReconciles int
}

func (r *MachineReconciler) machineKeyLabelSelector(machineKey client.ObjectKey) map[string]string {
Expand Down Expand Up @@ -985,5 +990,9 @@ func (r *MachineReconciler) SetupWithManager(mgr ctrl.Manager) error {
&storagev1alpha1.Volume{},
r.enqueueMachinesReferencingVolume(),
).
WithOptions(
controller.Options{
MaxConcurrentReconciles: r.MaxConcurrentReconciles,
}).
Complete(r)
}
21 changes: 12 additions & 9 deletions poollet/volumepoollet/cmd/volumepoollet/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,9 @@ type Options struct {

WatchFilterValue string

QPS float32
Burst int
QPS float32
Burst int
MaxConcurrentReconciles int
}

func (o *Options) AddFlags(fs *pflag.FlagSet) {
Expand All @@ -92,6 +93,7 @@ func (o *Options) AddFlags(fs *pflag.FlagSet) {

fs.Float32VarP(&o.QPS, "qps", "", 100, "Kubernetes client qps.")
fs.IntVar(&o.Burst, "burst", 200, "Kubernetes client burst.")
fs.IntVar(&o.MaxConcurrentReconciles, "max-concurrent-reconciles", 1, "Maximum number of concurrent reconciles.")
}

func (o *Options) MarkFlagsRequired(cmd *cobra.Command) {
Expand Down Expand Up @@ -219,13 +221,14 @@ func Run(ctx context.Context, opts Options) error {
}

if err := (&controllers.VolumeReconciler{
EventRecorder: mgr.GetEventRecorderFor("volumes"),
Client: mgr.GetClient(),
Scheme: scheme,
VolumeRuntime: volumeRuntime,
VolumeClassMapper: volumeClassMapper,
VolumePoolName: opts.VolumePoolName,
WatchFilterValue: opts.WatchFilterValue,
EventRecorder: mgr.GetEventRecorderFor("volumes"),
Client: mgr.GetClient(),
Scheme: scheme,
VolumeRuntime: volumeRuntime,
VolumeClassMapper: volumeClassMapper,
VolumePoolName: opts.VolumePoolName,
WatchFilterValue: opts.WatchFilterValue,
MaxConcurrentReconciles: opts.MaxConcurrentReconciles,
}).SetupWithManager(mgr); err != nil {
return fmt.Errorf("error setting up volume reconciler with manager: %w", err)
}
Expand Down
19 changes: 13 additions & 6 deletions poollet/volumepoollet/controllers/volume_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,20 +10,21 @@ import (
"fmt"

"github.com/go-logr/logr"
"github.com/ironcore-dev/controller-utils/clientutils"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

corev1alpha1 "github.com/ironcore-dev/ironcore/api/core/v1alpha1"
storagev1alpha1 "github.com/ironcore-dev/ironcore/api/storage/v1alpha1"
irimeta "github.com/ironcore-dev/ironcore/iri/apis/meta/v1alpha1"
iriVolume "github.com/ironcore-dev/ironcore/iri/apis/volume"
iri "github.com/ironcore-dev/ironcore/iri/apis/volume/v1alpha1"

volumepoolletv1alpha1 "github.com/ironcore-dev/ironcore/poollet/volumepoollet/api/v1alpha1"
"github.com/ironcore-dev/ironcore/poollet/volumepoollet/controllers/events"
"github.com/ironcore-dev/ironcore/poollet/volumepoollet/vcm"
ironcoreclient "github.com/ironcore-dev/ironcore/utils/client"
"github.com/ironcore-dev/ironcore/utils/predicates"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"github.com/ironcore-dev/controller-utils/clientutils"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand All @@ -33,6 +34,7 @@ import (
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/predicate"
)
Expand All @@ -48,6 +50,8 @@ type VolumeReconciler struct {

VolumePoolName string
WatchFilterValue string

MaxConcurrentReconciles int
}

func (r *VolumeReconciler) iriVolumeLabels(volume *storagev1alpha1.Volume) map[string]string {
Expand Down Expand Up @@ -443,7 +447,7 @@ func (r *VolumeReconciler) update(ctx context.Context, log logr.Logger, volume *
return nil
}

func (r *VolumeReconciler) volumeSecretName(volumeName string, volumeHandle string) string {
func (r *VolumeReconciler) volumeSecretName(volumeName, volumeHandle string) string {
sum := sha256.Sum256([]byte(fmt.Sprintf("%s/%s", volumeName, volumeHandle)))
return hex.EncodeToString(sum[:])[:63]
}
Expand Down Expand Up @@ -508,7 +512,6 @@ func (r *VolumeReconciler) updateStatus(ctx context.Context, log logr.Logger, vo
VolumeAttributes: iriAccess.Attributes,
}
}

}

base := volume.DeepCopy()
Expand Down Expand Up @@ -542,6 +545,10 @@ func (r *VolumeReconciler) SetupWithManager(mgr ctrl.Manager) error {
predicates.ResourceIsNotExternallyManaged(log),
),
).
WithOptions(
controller.Options{
MaxConcurrentReconciles: r.MaxConcurrentReconciles,
}).
Complete(r)
}

Expand Down

0 comments on commit 8b7d861

Please sign in to comment.