Skip to content

Commit

Permalink
Add QPS and Burst configuration for Kubernetes client in controller o…
Browse files Browse the repository at this point in the history
…ptionsUpdated `Options` struct with `QPS` and `Burst` fields, added corresponding flags in `AddFlags` method, and set these values in the `Run` function for ippoollet, machinepoollet, and volumepoollet controllers. This allows configurable rate limits for Kubernetes client requests and logs the settings on startup.Suggestion: Review the changes to ensure consistency and test the new configuration options.
  • Loading branch information
balpert89 committed Nov 18, 2024
1 parent 6a612a0 commit 8a83ccb
Show file tree
Hide file tree
Showing 6 changed files with 98 additions and 33 deletions.
20 changes: 16 additions & 4 deletions broker/bucketbroker/cmd/bucketbroker/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,15 @@ import (
"fmt"
"net"

"github.com/ironcore-dev/controller-utils/configutils"
"github.com/ironcore-dev/ironcore/broker/bucketbroker/server"
"github.com/ironcore-dev/ironcore/broker/common"
iri "github.com/ironcore-dev/ironcore/iri/apis/bucket/v1alpha1"
"github.com/spf13/cobra"
"github.com/spf13/pflag"
"google.golang.org/grpc"

"github.com/ironcore-dev/ironcore/broker/bucketbroker/server"
"github.com/ironcore-dev/ironcore/broker/common"
iri "github.com/ironcore-dev/ironcore/iri/apis/bucket/v1alpha1"

"github.com/ironcore-dev/controller-utils/configutils"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/log/zap"
)
Expand All @@ -24,6 +26,9 @@ type Options struct {
Kubeconfig string
Address string

QPS float32
Burst int

Namespace string
BucketPoolName string
BucketPoolSelector map[string]string
Expand All @@ -33,6 +38,9 @@ func (o *Options) AddFlags(fs *pflag.FlagSet) {
fs.StringVar(&o.Kubeconfig, "kubeconfig", o.Kubeconfig, "Path pointing to a kubeconfig file to use.")
fs.StringVar(&o.Address, "address", "/var/run/iri-bucketbroker.sock", "Address to listen on.")

fs.Float32VarP(&o.QPS, "qps", "", 100, "Kubernetes client qps.")
fs.IntVar(&o.Burst, "burst", 200, "Kubernetes client burst.")

fs.StringVar(&o.Namespace, "namespace", o.Namespace, "Target Kubernetes namespace to use.")
fs.StringVar(&o.BucketPoolName, "bucket-pool-name", o.BucketPoolName, "Name of the target bucket pool to pin buckets to, if any.")
fs.StringToStringVar(&o.BucketPoolSelector, "bucket-pool-selector", o.BucketPoolSelector, "Selector of the target bucket pools to pin buckets to, if any.")
Expand Down Expand Up @@ -74,6 +82,10 @@ func Run(ctx context.Context, opts Options) error {
return err
}

cfg.QPS = opts.QPS
cfg.Burst = opts.Burst
setupLog.Info("Kubernetes Client configuration", "QPS", cfg.QPS, "Burst", cfg.Burst)

srv, err := server.New(cfg, server.Options{
Namespace: opts.Namespace,
BucketPoolName: opts.BucketPoolName,
Expand Down
24 changes: 18 additions & 6 deletions broker/machinebroker/cmd/machinebroker/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,18 @@ import (
"net/url"

"github.com/go-logr/logr"
"github.com/ironcore-dev/controller-utils/configutils"
"github.com/spf13/cobra"
"github.com/spf13/pflag"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc"

"github.com/ironcore-dev/ironcore/broker/common"
commongrpc "github.com/ironcore-dev/ironcore/broker/common/grpc"
machinebrokerhttp "github.com/ironcore-dev/ironcore/broker/machinebroker/http"
"github.com/ironcore-dev/ironcore/broker/machinebroker/server"
iri "github.com/ironcore-dev/ironcore/iri/apis/machine/v1alpha1"
"github.com/spf13/cobra"
"github.com/spf13/pflag"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc"

"github.com/ironcore-dev/controller-utils/configutils"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/log/zap"
)
Expand All @@ -34,6 +36,9 @@ type Options struct {
BaseURL string
BrokerDownwardAPILabels map[string]string

QPS float32
Burst int

Namespace string
MachinePoolName string
MachinePoolSelector map[string]string
Expand All @@ -48,6 +53,9 @@ func (o *Options) AddFlags(fs *pflag.FlagSet) {
fs.StringToStringVar(&o.BrokerDownwardAPILabels, "broker-downward-api-label", nil, "The labels to broker via downward API. "+
"Example is for instance to broker \"root-machine-uid\" initially obtained via \"machinepoollet.ironcore.dev/machine-uid\".")

fs.Float32VarP(&o.QPS, "qps", "", 100, "Kubernetes client qps.")
fs.IntVar(&o.Burst, "burst", 200, "Kubernetes client burst.")

fs.StringVar(&o.Namespace, "namespace", o.Namespace, "Target Kubernetes namespace to use.")
fs.StringVar(&o.MachinePoolName, "machine-pool-name", o.MachinePoolName, "Name of the target machine pool to pin machines to, if any.")
fs.StringToStringVar(&o.MachinePoolSelector, "machine-pool-selector", o.MachinePoolSelector, "Selector of the target machine pools to pin machines to, if any.")
Expand Down Expand Up @@ -102,6 +110,10 @@ func Run(ctx context.Context, opts Options) error {
baseURL = u.String()
}

cfg.QPS = opts.QPS
cfg.Burst = opts.Burst
setupLog.Info("Kubernetes Client configuration", "QPS", cfg.QPS, "Burst", cfg.Burst)

log.V(1).Info("Creating server",
"Namespace", opts.Namespace,
"MachinePoolName", opts.MachinePoolName,
Expand Down Expand Up @@ -142,7 +154,7 @@ func runServer(ctx context.Context, setupLog, log logr.Logger, srv *server.Serve
return nil
}

func runGRPCServer(ctx context.Context, setupLog logr.Logger, log logr.Logger, srv *server.Server, opts Options) error {
func runGRPCServer(ctx context.Context, setupLog, log logr.Logger, srv *server.Server, opts Options) error {
log.V(1).Info("Cleaning up any previous socket")
if err := common.CleanupSocketIfExists(opts.Address); err != nil {
return fmt.Errorf("error cleaning up socket: %w", err)
Expand Down
20 changes: 16 additions & 4 deletions broker/volumebroker/cmd/volumebroker/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,15 @@ import (
"fmt"
"net"

"github.com/ironcore-dev/controller-utils/configutils"
"github.com/ironcore-dev/ironcore/broker/common"
"github.com/ironcore-dev/ironcore/broker/volumebroker/server"
iri "github.com/ironcore-dev/ironcore/iri/apis/volume/v1alpha1"
"github.com/spf13/cobra"
"github.com/spf13/pflag"
"google.golang.org/grpc"

"github.com/ironcore-dev/ironcore/broker/common"
"github.com/ironcore-dev/ironcore/broker/volumebroker/server"
iri "github.com/ironcore-dev/ironcore/iri/apis/volume/v1alpha1"

"github.com/ironcore-dev/controller-utils/configutils"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/log/zap"
)
Expand All @@ -24,6 +26,9 @@ type Options struct {
Kubeconfig string
Address string

QPS float32
Burst int

Namespace string
VolumePoolName string
VolumePoolSelector map[string]string
Expand All @@ -33,6 +38,9 @@ func (o *Options) AddFlags(fs *pflag.FlagSet) {
fs.StringVar(&o.Kubeconfig, "kubeconfig", o.Kubeconfig, "Path pointing to a kubeconfig file to use.")
fs.StringVar(&o.Address, "address", "/var/run/iri-volumebroker.sock", "Address to listen on.")

fs.Float32VarP(&o.QPS, "qps", "", 100, "Kubernetes client qps.")
fs.IntVar(&o.Burst, "burst", 200, "Kubernetes client burst.")

fs.StringVar(&o.Namespace, "namespace", o.Namespace, "Target Kubernetes namespace to use.")
fs.StringVar(&o.VolumePoolName, "volume-pool-name", o.VolumePoolName, "Name of the target volume pool to pin volumes to, if any.")
fs.StringToStringVar(&o.VolumePoolSelector, "volume-pool-selector", o.VolumePoolSelector, "Selector of the target volume pools to pin volumes to, if any.")
Expand Down Expand Up @@ -74,6 +82,10 @@ func Run(ctx context.Context, opts Options) error {
return err
}

cfg.QPS = opts.QPS
cfg.Burst = opts.Burst
setupLog.Info("Kubernetes Client configuration", "QPS", cfg.QPS, "Burst", cfg.Burst)

srv, err := server.New(cfg, server.Options{
Namespace: opts.Namespace,
VolumePoolName: opts.VolumePoolName,
Expand Down
22 changes: 16 additions & 6 deletions poollet/bucketpoollet/cmd/bucketpoollet/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@ import (
"os"
"time"

"github.com/ironcore-dev/controller-utils/configutils"
"github.com/spf13/cobra"
"github.com/spf13/pflag"

ipamv1alpha1 "github.com/ironcore-dev/ironcore/api/ipam/v1alpha1"
networkingv1alpha1 "github.com/ironcore-dev/ironcore/api/networking/v1alpha1"
storagev1alpha1 "github.com/ironcore-dev/ironcore/api/storage/v1alpha1"
Expand All @@ -22,8 +24,8 @@ import (
"github.com/ironcore-dev/ironcore/poollet/bucketpoollet/controllers"
"github.com/ironcore-dev/ironcore/poollet/irievent"
"github.com/ironcore-dev/ironcore/utils/client/config"
"github.com/spf13/cobra"
"github.com/spf13/pflag"

"github.com/ironcore-dev/controller-utils/configutils"
"k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
Expand All @@ -33,9 +35,7 @@ import (
metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server"
)

var (
scheme = runtime.NewScheme()
)
var scheme = runtime.NewScheme()

func init() {
utilruntime.Must(clientgoscheme.AddToScheme(scheme))
Expand Down Expand Up @@ -63,6 +63,9 @@ type Options struct {
ChannelCapacity int

WatchFilterValue string

QPS float32
Burst int
}

func (o *Options) AddFlags(fs *pflag.FlagSet) {
Expand All @@ -85,6 +88,9 @@ func (o *Options) AddFlags(fs *pflag.FlagSet) {
fs.IntVar(&o.ChannelCapacity, "channel-capacity", 1024, "channel capacity for the bucket event generator")

fs.StringVar(&o.WatchFilterValue, "watch-filter", "", "Value to filter for while watching.")

fs.Float32VarP(&o.QPS, "qps", "", 100, "Kubernetes client qps.")
fs.IntVar(&o.Burst, "burst", 200, "Kubernetes client burst.")
}

func (o *Options) MarkFlagsRequired(cmd *cobra.Command) {
Expand Down Expand Up @@ -146,6 +152,10 @@ func Run(ctx context.Context, opts Options) error {
return fmt.Errorf("error getting config: %w", err)
}

cfg.QPS = opts.QPS
cfg.Burst = opts.Burst
setupLog.Info("Kubernetes Client configuration", "QPS", cfg.QPS, "Burst", cfg.Burst)

leaderElectionCfg, err := configutils.GetConfig(
configutils.Kubeconfig(opts.LeaderElectionKubeconfig),
)
Expand Down
22 changes: 16 additions & 6 deletions poollet/machinepoollet/cmd/machinepoollet/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@ import (
"strconv"
"time"

"github.com/ironcore-dev/controller-utils/configutils"
"github.com/spf13/cobra"
"github.com/spf13/pflag"

computev1alpha1 "github.com/ironcore-dev/ironcore/api/compute/v1alpha1"
ipamv1alpha1 "github.com/ironcore-dev/ironcore/api/ipam/v1alpha1"
networkingv1alpha1 "github.com/ironcore-dev/ironcore/api/networking/v1alpha1"
Expand All @@ -28,8 +30,8 @@ import (
"github.com/ironcore-dev/ironcore/poollet/machinepoollet/mem"
"github.com/ironcore-dev/ironcore/poollet/machinepoollet/server"
"github.com/ironcore-dev/ironcore/utils/client/config"
"github.com/spf13/cobra"
"github.com/spf13/pflag"

"github.com/ironcore-dev/controller-utils/configutils"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
Expand All @@ -43,9 +45,7 @@ import (
metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server"
)

var (
scheme = runtime.NewScheme()
)
var scheme = runtime.NewScheme()

func init() {
utilruntime.Must(clientgoscheme.AddToScheme(scheme))
Expand Down Expand Up @@ -79,6 +79,9 @@ type Options struct {
AddressesOptions addresses.GetOptions

WatchFilterValue string

QPS float32
Burst int
}

func (o *Options) AddFlags(fs *pflag.FlagSet) {
Expand Down Expand Up @@ -107,6 +110,9 @@ func (o *Options) AddFlags(fs *pflag.FlagSet) {
o.AddressesOptions.BindFlags(fs)

fs.StringVar(&o.WatchFilterValue, "watch-filter", "", "Value to filter for while watching.")

fs.Float32VarP(&o.QPS, "qps", "", 100, "Kubernetes client qps.")
fs.IntVar(&o.Burst, "burst", 200, "Kubernetes client burst.")
}

func (o *Options) MarkFlagsRequired(cmd *cobra.Command) {
Expand Down Expand Up @@ -202,6 +208,10 @@ func Run(ctx context.Context, opts Options) error {
return fmt.Errorf("error getting config: %w", err)
}

cfg.QPS = opts.QPS
cfg.Burst = opts.Burst
setupLog.Info("Kubernetes Client configuration", "QPS", cfg.QPS, "Burst", cfg.Burst)

leaderElectionCfg, err := configutils.GetConfig(
configutils.Kubeconfig(opts.LeaderElectionKubeconfig),
)
Expand Down
23 changes: 16 additions & 7 deletions poollet/volumepoollet/cmd/volumepoollet/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@ import (
"os"
"time"

"github.com/ironcore-dev/controller-utils/configutils"
"github.com/spf13/cobra"
"github.com/spf13/pflag"

ipamv1alpha1 "github.com/ironcore-dev/ironcore/api/ipam/v1alpha1"
networkingv1alpha1 "github.com/ironcore-dev/ironcore/api/networking/v1alpha1"
storagev1alpha1 "github.com/ironcore-dev/ironcore/api/storage/v1alpha1"
Expand All @@ -22,10 +24,9 @@ import (
"github.com/ironcore-dev/ironcore/poollet/volumepoollet/controllers"
"github.com/ironcore-dev/ironcore/poollet/volumepoollet/vcm"
"github.com/ironcore-dev/ironcore/poollet/volumepoollet/vem"

"github.com/ironcore-dev/ironcore/utils/client/config"
"github.com/spf13/cobra"
"github.com/spf13/pflag"

"github.com/ironcore-dev/controller-utils/configutils"
"k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
Expand All @@ -35,9 +36,7 @@ import (
metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server"
)

var (
scheme = runtime.NewScheme()
)
var scheme = runtime.NewScheme()

func init() {
utilruntime.Must(clientgoscheme.AddToScheme(scheme))
Expand Down Expand Up @@ -65,6 +64,9 @@ type Options struct {
ChannelCapacity int

WatchFilterValue string

QPS float32
Burst int
}

func (o *Options) AddFlags(fs *pflag.FlagSet) {
Expand All @@ -87,6 +89,9 @@ func (o *Options) AddFlags(fs *pflag.FlagSet) {
fs.IntVar(&o.ChannelCapacity, "channel-capacity", 1024, "channel capacity for the volume event generator")

fs.StringVar(&o.WatchFilterValue, "watch-filter", "", "Value to filter for while watching.")

fs.Float32VarP(&o.QPS, "qps", "", 100, "Kubernetes client qps.")
fs.IntVar(&o.Burst, "burst", 200, "Kubernetes client burst.")
}

func (o *Options) MarkFlagsRequired(cmd *cobra.Command) {
Expand Down Expand Up @@ -147,6 +152,10 @@ func Run(ctx context.Context, opts Options) error {
return fmt.Errorf("error getting config: %w", err)
}

cfg.QPS = opts.QPS
cfg.Burst = opts.Burst
setupLog.Info("Kubernetes Client configuration", "QPS", cfg.QPS, "Burst", cfg.Burst)

leaderElectionCfg, err := configutils.GetConfig(
configutils.Kubeconfig(opts.LeaderElectionKubeconfig),
)
Expand Down

0 comments on commit 8a83ccb

Please sign in to comment.