Skip to content

Commit

Permalink
Support for plugable reconcilers
Browse files Browse the repository at this point in the history
Signed-off-by: Patryk Strusiewicz-Surmacki <[email protected]>
  • Loading branch information
p-strusiewiczsurmacki-mobica committed Sep 30, 2024
1 parent 28a37d8 commit a18e452
Show file tree
Hide file tree
Showing 6 changed files with 249 additions and 106 deletions.
95 changes: 63 additions & 32 deletions cmd/manager/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ func main() {
var onlyBPFMode bool
var configFile string
var interfacePrefix string
var useNetconf bool
flag.StringVar(&configFile, "config", "",
"The controller will load its initial configuration from this file. "+
"Omit this flag to use the default configuration values. "+
Expand All @@ -100,6 +101,8 @@ func main() {
"Only attach BPF to specified interfaces in config. This will not start any reconciliation. Perfect for masters.")
flag.StringVar(&interfacePrefix, "macvlan-interface-prefix", "",
"Interface prefix for bridge devices for MACVlan sync")
flag.BoolVar(&useNetconf, "use-netconf", false,
"Use NETCONF interface to configure hosts instead of Netlink and FRR.")
opts := zap.Options{
Development: true,
}
Expand Down Expand Up @@ -146,7 +149,7 @@ func main() {
os.Exit(1)
}

if err := initComponents(mgr, anycastTracker, cfg, clientConfig, onlyBPFMode); err != nil {
if err := initComponents(mgr, anycastTracker, cfg, clientConfig, onlyBPFMode, useNetconf); err != nil {
setupLog.Error(err, "unable to initialize components")
os.Exit(1)
}
Expand All @@ -163,10 +166,22 @@ func main() {
}
}

func initComponents(mgr manager.Manager, anycastTracker *anycast.Tracker, cfg *config.Config, clientConfig *rest.Config, onlyBPFMode bool) error {
func initComponents(mgr manager.Manager, anycastTracker *anycast.Tracker, cfg *config.Config,
clientConfig *rest.Config, onlyBPFMode bool, useNetconf bool) error {
var adapter reconciler.Adapter
var err error
if useNetconf {
adapter, err = reconciler.NewNetconfReconciler()
} else {
adapter, err = reconciler.NewLegacyReconciler(mgr.GetClient(), anycastTracker, mgr.GetLogger())
}

if err != nil {
return fmt.Errorf("unable to create reconciler: %w", err)
}
// Start VRFRouteConfigurationReconciler when we are not running in only BPF mode.
if !onlyBPFMode {
if err := setupReconcilers(mgr, anycastTracker); err != nil {
if err := setupReconcilers(mgr, anycastTracker, adapter); err != nil {
return fmt.Errorf("unable to setup reconcilers: %w", err)
}
}
Expand All @@ -179,18 +194,10 @@ func initComponents(mgr manager.Manager, anycastTracker *anycast.Tracker, cfg *c
return fmt.Errorf("unable to set up ready check: %w", err)
}

setupLog.Info("load bpf program into Kernel")
if err := bpf.InitBPFRouter(); err != nil {
return fmt.Errorf("unable to init BPF router: %w", err)
}
setupLog.Info("attach bpf to interfaces specified in config")
if err := bpf.AttachInterfaces(cfg.BPFInterfaces); err != nil {
return fmt.Errorf("unable to attach bpf to interfaces: %w", err)
if err := setupBPF(cfg); err != nil {
return fmt.Errorf("uneable to set up BPF: %w", err)
}

setupLog.Info("start bpf interface check")
bpf.RunInterfaceCheck()

setupLog.Info("start anycast sync")
anycastTracker.RunAnycastSync()

Expand All @@ -200,33 +207,57 @@ func initComponents(mgr manager.Manager, anycastTracker *anycast.Tracker, cfg *c
}

if onlyBPFMode {
clusterClient, err := client.New(clientConfig, client.Options{})
if err != nil {
return fmt.Errorf("error creating controller-runtime client: %w", err)
if err := runBPFOnlyMode(clientConfig); err != nil {
return fmt.Errorf("error running BPF only mode: %w", err)
}
}

nc, err := healthcheck.LoadConfig(healthcheck.NetHealthcheckFile)
if err != nil {
return fmt.Errorf("error loading network healthcheck config: %w", err)
}
return nil
}

tcpDialer := healthcheck.NewTCPDialer(nc.Timeout)
hc, err := healthcheck.NewHealthChecker(clusterClient,
healthcheck.NewDefaultHealthcheckToolkit(nil, tcpDialer),
nc)
if err != nil {
return fmt.Errorf("error initializing healthchecker: %w", err)
}
if err = performNetworkingHealthcheck(hc); err != nil {
return fmt.Errorf("error performing healthcheck: %w", err)
}
func runBPFOnlyMode(clientConfig *rest.Config) error {
clusterClient, err := client.New(clientConfig, client.Options{})
if err != nil {
return fmt.Errorf("error creating controller-runtime client: %w", err)
}

nc, err := healthcheck.LoadConfig(healthcheck.NetHealthcheckFile)
if err != nil {
return fmt.Errorf("error loading network healthcheck config: %w", err)
}

tcpDialer := healthcheck.NewTCPDialer(nc.Timeout)
hc, err := healthcheck.NewHealthChecker(clusterClient,
healthcheck.NewDefaultHealthcheckToolkit(nil, tcpDialer),
nc)
if err != nil {
return fmt.Errorf("error initializing healthchecker: %w", err)
}
if err = performNetworkingHealthcheck(hc); err != nil {
return fmt.Errorf("error performing healthcheck: %w", err)
}

return nil
}

func setupBPF(cfg *config.Config) error {
setupLog.Info("load bpf program into Kernel")
if err := bpf.InitBPFRouter(); err != nil {
return fmt.Errorf("unable to init BPF router: %w", err)
}
setupLog.Info("attach bpf to interfaces specified in config")
if err := bpf.AttachInterfaces(cfg.BPFInterfaces); err != nil {
return fmt.Errorf("unable to attach bpf to interfaces: %w", err)
}

setupLog.Info("start bpf interface check")
bpf.RunInterfaceCheck()

return nil
}

func setupReconcilers(mgr manager.Manager, anycastTracker *anycast.Tracker) error {
r, err := reconciler.NewReconciler(mgr.GetClient(), anycastTracker, mgr.GetLogger())
func setupReconcilers(mgr manager.Manager, anycastTracker *anycast.Tracker, adapter reconciler.Adapter) error {
r, err := reconciler.NewReconciler(mgr.GetClient(), anycastTracker, mgr.GetLogger(), adapter)
if err != nil {
return fmt.Errorf("unable to create debounced reconciler: %w", err)
}
Expand Down
30 changes: 15 additions & 15 deletions pkg/reconciler/layer2.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,26 +16,26 @@ import (
"k8s.io/apimachinery/pkg/types"
)

func (r *reconcile) fetchLayer2(ctx context.Context) ([]networkv1alpha1.Layer2NetworkConfiguration, error) {
func (r *Reconciler) fetchLayer2(ctx context.Context) ([]networkv1alpha1.Layer2NetworkConfiguration, error) {
layer2List := &networkv1alpha1.Layer2NetworkConfigurationList{}
err := r.client.List(ctx, layer2List)
if err != nil {
r.Logger.Error(err, "error getting list of Layer2s from Kubernetes")
r.logger.Error(err, "error getting list of Layer2s from Kubernetes")
return nil, fmt.Errorf("error getting list of Layer2s from Kubernetes: %w", err)
}

nodeName := os.Getenv(healthcheck.NodenameEnv)
node := &corev1.Node{}
err = r.client.Get(ctx, types.NamespacedName{Name: nodeName}, node)
if err != nil {
r.Logger.Error(err, "error getting local node name")
r.logger.Error(err, "error getting local node name")
return nil, fmt.Errorf("error getting local node name: %w", err)
}

l2vnis := []networkv1alpha1.Layer2NetworkConfiguration{}
for i := range layer2List.Items {
item := &layer2List.Items[i]
logger := r.Logger.WithValues("name", item.ObjectMeta.Name, "namespace", item.ObjectMeta.Namespace, "vlan", item.Spec.ID, "vni", item.Spec.VNI)
logger := r.logger.WithValues("name", item.ObjectMeta.Name, "namespace", item.ObjectMeta.Namespace, "vlan", item.Spec.ID, "vni", item.Spec.VNI)
if item.Spec.NodeSelector != nil {
selector := labels.NewSelector()
var reqs labels.Requirements
Expand Down Expand Up @@ -76,7 +76,7 @@ func (r *reconcile) fetchLayer2(ctx context.Context) ([]networkv1alpha1.Layer2Ne
return l2vnis, nil
}

func (r *reconcile) reconcileLayer2(l2vnis []networkv1alpha1.Layer2NetworkConfiguration) error {
func (r *LegacyReconciler) reconcileLayer2(l2vnis []networkv1alpha1.Layer2NetworkConfiguration) error {
desired, err := r.getDesired(l2vnis)
if err != nil {
return err
Expand Down Expand Up @@ -111,10 +111,10 @@ func (r *reconcile) reconcileLayer2(l2vnis []networkv1alpha1.Layer2NetworkConfig
}

for i := range toDelete {
r.Logger.Info("Deleting Layer2 because it is no longer configured", "vlan", toDelete[i].VlanID, "vni", toDelete[i].VNI)
r.logger.Info("Deleting Layer2 because it is no longer configured", "vlan", toDelete[i].VlanID, "vni", toDelete[i].VNI)
errs := r.netlinkManager.CleanupL2(&toDelete[i])
for _, err := range errs {
r.Logger.Error(err, "Error deleting Layer2", "vlan", toDelete[i].VlanID, "vni", toDelete[i].VNI)
r.logger.Error(err, "Error deleting Layer2", "vlan", toDelete[i].VlanID, "vni", toDelete[i].VNI)
}
}

Expand All @@ -129,8 +129,8 @@ func (r *reconcile) reconcileLayer2(l2vnis []networkv1alpha1.Layer2NetworkConfig
return nil
}

func (r *reconcile) createL2(info *nl.Layer2Information, anycastTrackerInterfaces *[]int) error {
r.Logger.Info("Creating Layer2", "vlan", info.VlanID, "vni", info.VNI)
func (r *LegacyReconciler) createL2(info *nl.Layer2Information, anycastTrackerInterfaces *[]int) error {
r.logger.Info("Creating Layer2", "vlan", info.VlanID, "vni", info.VNI)
err := r.netlinkManager.CreateL2(info)
if err != nil {
return fmt.Errorf("error creating layer2 vlan %d vni %d: %w", info.VlanID, info.VNI, err)
Expand All @@ -145,7 +145,7 @@ func (r *reconcile) createL2(info *nl.Layer2Information, anycastTrackerInterface
return nil
}

func (r *reconcile) getDesired(l2vnis []networkv1alpha1.Layer2NetworkConfiguration) ([]nl.Layer2Information, error) {
func (r *LegacyReconciler) getDesired(l2vnis []networkv1alpha1.Layer2NetworkConfiguration) ([]nl.Layer2Information, error) {
availableVrfs, err := r.netlinkManager.ListL3()
if err != nil {
return nil, fmt.Errorf("error loading available VRFs: %w", err)
Expand All @@ -162,7 +162,7 @@ func (r *reconcile) getDesired(l2vnis []networkv1alpha1.Layer2NetworkConfigurati

anycastGateways, err := r.netlinkManager.ParseIPAddresses(spec.AnycastGateways)
if err != nil {
r.Logger.Error(err, "error parsing anycast gateways", "layer", l2vnis[i].ObjectMeta.Name, "gw", spec.AnycastGateways)
r.logger.Error(err, "error parsing anycast gateways", "layer", l2vnis[i].ObjectMeta.Name, "gw", spec.AnycastGateways)
return nil, fmt.Errorf("error parsing anycast gateways: %w", err)
}

Expand All @@ -175,7 +175,7 @@ func (r *reconcile) getDesired(l2vnis []networkv1alpha1.Layer2NetworkConfigurati
}
}
if !vrfAvailable {
r.Logger.Error(err, "VRF of Layer2 not found on node", "layer", l2vnis[i].ObjectMeta.Name, "vrf", spec.VRF)
r.logger.Error(err, "VRF of Layer2 not found on node", "layer", l2vnis[i].ObjectMeta.Name, "vrf", spec.VRF)
continue
}
}
Expand Down Expand Up @@ -213,8 +213,8 @@ func determineToBeDeleted(existing, desired []nl.Layer2Information) []nl.Layer2I
return toDelete
}

func (r *reconcile) reconcileExistingLayer(desired, currentConfig *nl.Layer2Information, anycastTrackerInterfaces *[]int) error {
r.Logger.Info("Reconciling existing Layer2", "vlan", desired.VlanID, "vni", desired.VNI)
func (r *LegacyReconciler) reconcileExistingLayer(desired, currentConfig *nl.Layer2Information, anycastTrackerInterfaces *[]int) error {
r.logger.Info("Reconciling existing Layer2", "vlan", desired.VlanID, "vni", desired.VNI)
err := r.netlinkManager.ReconcileL2(currentConfig, desired)
if err != nil {
return fmt.Errorf("error reconciling layer2 vlan %d vni %d: %w", desired.VlanID, desired.VNI, err)
Expand All @@ -229,7 +229,7 @@ func (r *reconcile) reconcileExistingLayer(desired, currentConfig *nl.Layer2Info
return nil
}

func (*reconcile) checkL2Duplicates(configs []networkv1alpha1.Layer2NetworkConfiguration) error {
func (*Reconciler) checkL2Duplicates(configs []networkv1alpha1.Layer2NetworkConfiguration) error {
for i := range configs {
for j := i + 1; j < len(configs); j++ {
if configs[i].Spec.ID == configs[j].Spec.ID {
Expand Down
Loading

0 comments on commit a18e452

Please sign in to comment.