Skip to content

Commit

Permalink
extend poollet flags with iri relist options
Browse files Browse the repository at this point in the history
  • Loading branch information
balpert89 committed Nov 19, 2024
1 parent 3217ad6 commit 9351f7b
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 22 deletions.
22 changes: 15 additions & 7 deletions poollet/bucketpoollet/cmd/bucketpoollet/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@ import (
"os"
"time"

"github.com/ironcore-dev/controller-utils/configutils"
"github.com/spf13/cobra"
"github.com/spf13/pflag"

ipamv1alpha1 "github.com/ironcore-dev/ironcore/api/ipam/v1alpha1"
networkingv1alpha1 "github.com/ironcore-dev/ironcore/api/networking/v1alpha1"
storagev1alpha1 "github.com/ironcore-dev/ironcore/api/storage/v1alpha1"
Expand All @@ -22,8 +24,8 @@ import (
"github.com/ironcore-dev/ironcore/poollet/bucketpoollet/controllers"
"github.com/ironcore-dev/ironcore/poollet/irievent"
"github.com/ironcore-dev/ironcore/utils/client/config"
"github.com/spf13/cobra"
"github.com/spf13/pflag"

"github.com/ironcore-dev/controller-utils/configutils"
"k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
Expand All @@ -33,9 +35,7 @@ import (
metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server"
)

var (
scheme = runtime.NewScheme()
)
var scheme = runtime.NewScheme()

func init() {
utilruntime.Must(clientgoscheme.AddToScheme(scheme))
Expand All @@ -61,6 +61,8 @@ type Options struct {
BucketClassMapperSyncTimeout time.Duration

ChannelCapacity int
RelistPeriod time.Duration
RelistThreshold time.Duration

WatchFilterValue string
}
Expand All @@ -82,7 +84,9 @@ func (o *Options) AddFlags(fs *pflag.FlagSet) {
fs.DurationVar(&o.BucketRuntimeSocketDiscoveryTimeout, "bucket-runtime-discovery-timeout", 20*time.Second, "Timeout for discovering the bucket runtime socket.")
fs.DurationVar(&o.BucketClassMapperSyncTimeout, "bcm-sync-timeout", 10*time.Second, "Timeout waiting for the bucket class mapper to sync.")

fs.IntVar(&o.ChannelCapacity, "channel-capacity", 1024, "channel capacity for the bucket event generator")
fs.IntVar(&o.ChannelCapacity, "channel-capacity", 1024, "channel capacity for the bucket event generator.")
fs.DurationVar(&o.RelistPeriod, "relist-period", 5*time.Second, "event channel relisting period.")
fs.DurationVar(&o.RelistThreshold, "relist-threshold", 3*time.Minute, "event channel relisting threshold.")

fs.StringVar(&o.WatchFilterValue, "watch-filter", "", "Value to filter for while watching.")
}
Expand Down Expand Up @@ -146,6 +150,8 @@ func Run(ctx context.Context, opts Options) error {
return fmt.Errorf("error getting config: %w", err)
}

setupLog.Info("IRI client config", "ChannelCapacity", opts.ChannelCapacity, "RelistPeriod", opts.RelistPeriod, "RelistThreshold", opts.RelistThreshold)

leaderElectionCfg, err := configutils.GetConfig(
configutils.Kubeconfig(opts.LeaderElectionKubeconfig),
)
Expand Down Expand Up @@ -188,6 +194,8 @@ func Run(ctx context.Context, opts Options) error {
return res.Buckets, nil
}, irievent.GeneratorOptions{
ChannelCapacity: opts.ChannelCapacity,
RelistPeriod: opts.RelistPeriod,
RelistThreshold: opts.RelistThreshold,
})
if err := mgr.Add(bucketEvents); err != nil {
return fmt.Errorf("error adding bucket event generator: %w", err)
Expand Down
22 changes: 15 additions & 7 deletions poollet/machinepoollet/cmd/machinepoollet/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@ import (
"strconv"
"time"

"github.com/ironcore-dev/controller-utils/configutils"
"github.com/spf13/cobra"
"github.com/spf13/pflag"

computev1alpha1 "github.com/ironcore-dev/ironcore/api/compute/v1alpha1"
ipamv1alpha1 "github.com/ironcore-dev/ironcore/api/ipam/v1alpha1"
networkingv1alpha1 "github.com/ironcore-dev/ironcore/api/networking/v1alpha1"
Expand All @@ -28,8 +30,8 @@ import (
"github.com/ironcore-dev/ironcore/poollet/machinepoollet/mem"
"github.com/ironcore-dev/ironcore/poollet/machinepoollet/server"
"github.com/ironcore-dev/ironcore/utils/client/config"
"github.com/spf13/cobra"
"github.com/spf13/pflag"

"github.com/ironcore-dev/controller-utils/configutils"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
Expand All @@ -43,9 +45,7 @@ import (
metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server"
)

var (
scheme = runtime.NewScheme()
)
var scheme = runtime.NewScheme()

func init() {
utilruntime.Must(clientgoscheme.AddToScheme(scheme))
Expand Down Expand Up @@ -73,6 +73,8 @@ type Options struct {
MachineClassMapperSyncTimeout time.Duration

ChannelCapacity int
RelistPeriod time.Duration
RelistThreshold time.Duration

ServerFlags server.Flags

Expand Down Expand Up @@ -100,7 +102,9 @@ func (o *Options) AddFlags(fs *pflag.FlagSet) {
fs.DurationVar(&o.DialTimeout, "dial-timeout", 1*time.Second, "Timeout for dialing to the machine runtime endpoint.")
fs.DurationVar(&o.MachineClassMapperSyncTimeout, "mcm-sync-timeout", 10*time.Second, "Timeout waiting for the machine class mapper to sync.")

fs.IntVar(&o.ChannelCapacity, "channel-capacity", 1024, "channel capacity for the machine event generator")
fs.IntVar(&o.ChannelCapacity, "channel-capacity", 1024, "channel capacity for the machine event generator.")
fs.DurationVar(&o.RelistPeriod, "relist-period", 5*time.Second, "event channel relisting period.")
fs.DurationVar(&o.RelistThreshold, "relist-threshold", 3*time.Minute, "event channel relisting threshold.")

o.ServerFlags.BindFlags(fs)

Expand Down Expand Up @@ -202,6 +206,8 @@ func Run(ctx context.Context, opts Options) error {
return fmt.Errorf("error getting config: %w", err)
}

setupLog.Info("IRI client config", "ChannelCapacity", opts.ChannelCapacity, "RelistPeriod", opts.RelistPeriod, "RelistThreshold", opts.RelistThreshold)

leaderElectionCfg, err := configutils.GetConfig(
configutils.Kubeconfig(opts.LeaderElectionKubeconfig),
)
Expand Down Expand Up @@ -273,6 +279,8 @@ func Run(ctx context.Context, opts Options) error {
return res.Machines, nil
}, irievent.GeneratorOptions{
ChannelCapacity: opts.ChannelCapacity,
RelistPeriod: opts.RelistPeriod,
RelistThreshold: opts.RelistThreshold,
})
if err := mgr.Add(machineEvents); err != nil {
return fmt.Errorf("error adding machine event generator: %w", err)
Expand Down
23 changes: 15 additions & 8 deletions poollet/volumepoollet/cmd/volumepoollet/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@ import (
"os"
"time"

"github.com/ironcore-dev/controller-utils/configutils"
"github.com/spf13/cobra"
"github.com/spf13/pflag"

ipamv1alpha1 "github.com/ironcore-dev/ironcore/api/ipam/v1alpha1"
networkingv1alpha1 "github.com/ironcore-dev/ironcore/api/networking/v1alpha1"
storagev1alpha1 "github.com/ironcore-dev/ironcore/api/storage/v1alpha1"
Expand All @@ -22,10 +24,9 @@ import (
"github.com/ironcore-dev/ironcore/poollet/volumepoollet/controllers"
"github.com/ironcore-dev/ironcore/poollet/volumepoollet/vcm"
"github.com/ironcore-dev/ironcore/poollet/volumepoollet/vem"

"github.com/ironcore-dev/ironcore/utils/client/config"
"github.com/spf13/cobra"
"github.com/spf13/pflag"

"github.com/ironcore-dev/controller-utils/configutils"
"k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
Expand All @@ -35,9 +36,7 @@ import (
metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server"
)

var (
scheme = runtime.NewScheme()
)
var scheme = runtime.NewScheme()

func init() {
utilruntime.Must(clientgoscheme.AddToScheme(scheme))
Expand All @@ -63,6 +62,8 @@ type Options struct {
VolumeClassMapperSyncTimeout time.Duration

ChannelCapacity int
RelistPeriod time.Duration
RelistThreshold time.Duration

WatchFilterValue string
}
Expand All @@ -84,7 +85,9 @@ func (o *Options) AddFlags(fs *pflag.FlagSet) {
fs.DurationVar(&o.VolumeRuntimeSocketDiscoveryTimeout, "volume-runtime-discovery-timeout", 20*time.Second, "Timeout for discovering the volume runtime socket.")
fs.DurationVar(&o.VolumeClassMapperSyncTimeout, "vcm-sync-timeout", 10*time.Second, "Timeout waiting for the volume class mapper to sync.")

fs.IntVar(&o.ChannelCapacity, "channel-capacity", 1024, "channel capacity for the volume event generator")
fs.IntVar(&o.ChannelCapacity, "channel-capacity", 1024, "channel capacity for the bucket event generator.")
fs.DurationVar(&o.RelistPeriod, "relist-period", 5*time.Second, "event channel relisting period.")
fs.DurationVar(&o.RelistThreshold, "relist-threshold", 3*time.Minute, "event channel relisting threshold.")

fs.StringVar(&o.WatchFilterValue, "watch-filter", "", "Value to filter for while watching.")
}
Expand Down Expand Up @@ -147,6 +150,8 @@ func Run(ctx context.Context, opts Options) error {
return fmt.Errorf("error getting config: %w", err)
}

setupLog.Info("IRI client config", "ChannelCapacity", opts.ChannelCapacity, "RelistPeriod", opts.RelistPeriod, "RelistThreshold", opts.RelistThreshold)

leaderElectionCfg, err := configutils.GetConfig(
configutils.Kubeconfig(opts.LeaderElectionKubeconfig),
)
Expand Down Expand Up @@ -188,6 +193,8 @@ func Run(ctx context.Context, opts Options) error {
return res.Volumes, nil
}, irievent.GeneratorOptions{
ChannelCapacity: opts.ChannelCapacity,
RelistPeriod: opts.RelistPeriod,
RelistThreshold: opts.RelistThreshold,
})
if err := mgr.Add(volumeEvents); err != nil {
return fmt.Errorf("error adding volume event generator: %w", err)
Expand Down

0 comments on commit 9351f7b

Please sign in to comment.