Skip to content

Commit

Permalink
adapting Worker Pods status report in Module to v.2 (#619) (#860)
Browse files Browse the repository at this point in the history
This commit adds an implementation of reporting worker pods status
into the Module's status.
The following fields are reported:
1) number of targeted nodes (based on the selector field)
2) number of desired nodes on which the kernel module should be deployed
   (based on the NMCs configured)
3) current number of nodes on which the kernel module is already
   deployed (based on the NMC spec and status configs)
  • Loading branch information
yevgeny-shnaidman authored Oct 24, 2023
1 parent 3a42633 commit 67e33e1
Show file tree
Hide file tree
Showing 8 changed files with 363 additions and 38 deletions.
6 changes: 5 additions & 1 deletion internal/controllers/device_plugin_reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ func (r *DevicePluginReconciler) Reconcile(ctx context.Context, req ctrl.Request

err = r.reconHelperAPI.moduleUpdateDevicePluginStatus(ctx, mod, existingDevicePluginDS)
if err != nil {
return res, fmt.Errorf("failed to update status of the module: %w", err)
return res, fmt.Errorf("failed to update device-plugin status of the module: %w", err)
}

logger.Info("Reconcile loop finished successfully")
Expand Down Expand Up @@ -283,6 +283,10 @@ func (dprh *devicePluginReconcilerHelper) moduleUpdateDevicePluginStatus(ctx con
mod *kmmv1beta1.Module,
existingDevicePluginDS []appsv1.DaemonSet) error {

if mod.Spec.DevicePlugin == nil {
return nil
}

// get the number of nodes targeted by selector (which also relevant for device plugin)
numTargetedNodes, err := dprh.getNumTargetedNodes(ctx, mod.Spec.Selector)
if err != nil {
Expand Down
12 changes: 11 additions & 1 deletion internal/controllers/device_plugin_reconciler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -510,9 +510,19 @@ var _ = Describe("DevicePluginReconciler_moduleUpdateDevicePluginStatus", func()

ctx := context.Background()

It("device plugin not defined in the module", func() {
mod := kmmv1beta1.Module{}
err := dprh.moduleUpdateDevicePluginStatus(ctx, &mod, nil)
Expect(err).NotTo(HaveOccurred())
})

DescribeTable("device-plugin status update",
func(numTargetedNodes int, numAvailableInDaemonSets []int, nodesMatchingNumber, availableNumber int) {
mod := kmmv1beta1.Module{}
mod := kmmv1beta1.Module{
Spec: kmmv1beta1.ModuleSpec{
DevicePlugin: &kmmv1beta1.DevicePluginSpec{},
},
}
expectedMod := mod.DeepCopy()
expectedMod.Status.DevicePlugin.NodesMatchingSelectorNumber = int32(nodesMatchingNumber)
expectedMod.Status.DevicePlugin.DesiredNumber = int32(nodesMatchingNumber)
Expand Down
14 changes: 14 additions & 0 deletions internal/controllers/mock_module_nmc_reconciler.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

53 changes: 45 additions & 8 deletions internal/controllers/module_nmc_reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"reflect"
"strings"

"github.com/hashicorp/go-multierror"
Expand Down Expand Up @@ -129,6 +130,9 @@ func (mnr *ModuleNMCReconciler) Reconcile(ctx context.Context, req ctrl.Request)
sumErr = multierror.Append(sumErr, err)
}

err = mnr.reconHelper.moduleUpdateWorkerPodsStatus(ctx, mod, targetedNodes)
sumErr = multierror.Append(sumErr, err)

err = sumErr.ErrorOrNil()
if err != nil {
return ctrl.Result{}, fmt.Errorf("failed to reconcile module %s/%s config: %v", mod.Namespace, mod.Name, err)
Expand Down Expand Up @@ -167,6 +171,7 @@ type moduleNMCReconcilerHelperAPI interface {
prepareSchedulingData(ctx context.Context, mod *kmmv1beta1.Module, targetedNodes []v1.Node, currentNMCs sets.Set[string]) (map[string]schedulingData, []error)
enableModuleOnNode(ctx context.Context, mld *api.ModuleLoaderData, node *v1.Node) error
disableModuleOnNode(ctx context.Context, modNamespace, modName, nodeName string) error
moduleUpdateWorkerPodsStatus(ctx context.Context, mod *kmmv1beta1.Module, targetedNodes []v1.Node) error
}

type moduleNMCReconcilerHelper struct {
Expand Down Expand Up @@ -295,15 +300,19 @@ func (mnrh *moduleNMCReconcilerHelper) getNodesListBySelector(ctx context.Contex
}

func (mnrh *moduleNMCReconcilerHelper) getNMCsByModuleSet(ctx context.Context, mod *kmmv1beta1.Module) (sets.Set[string], error) {
nmcNamesList, err := mnrh.getNMCsNamesForModule(ctx, mod)
nmcList, err := mnrh.getNMCsForModule(ctx, mod)
if err != nil {
return nil, fmt.Errorf("failed to get list of %s/%s module's NMC for map: %v", mod.Namespace, mod.Name, err)
}

return sets.New[string](nmcNamesList...), nil
result := sets.New[string]()
for _, nmc := range nmcList {
result.Insert(nmc.Name)
}
return result, nil
}

func (mnrh *moduleNMCReconcilerHelper) getNMCsNamesForModule(ctx context.Context, mod *kmmv1beta1.Module) ([]string, error) {
func (mnrh *moduleNMCReconcilerHelper) getNMCsForModule(ctx context.Context, mod *kmmv1beta1.Module) ([]kmmv1beta1.NodeModulesConfig, error) {
logger := log.FromContext(ctx)
moduleNMCLabel := nmc.ModuleConfiguredLabel(mod.Namespace, mod.Name)
logger.V(1).Info("Listing nmcs", "selector", moduleNMCLabel)
Expand All @@ -312,11 +321,8 @@ func (mnrh *moduleNMCReconcilerHelper) getNMCsNamesForModule(ctx context.Context
if err := mnrh.client.List(ctx, &selectedNMCs, opt); err != nil {
return nil, fmt.Errorf("could not list NMCs: %v", err)
}
result := make([]string, len(selectedNMCs.Items))
for i := range selectedNMCs.Items {
result[i] = selectedNMCs.Items[i].Name
}
return result, nil

return selectedNMCs.Items, nil
}

// prepareSchedulingData prepare data needed to scheduling enable/disable module per node
Expand Down Expand Up @@ -428,6 +434,37 @@ func (mnrh *moduleNMCReconcilerHelper) removeModuleFromNMC(ctx context.Context,
return nil
}

// moduleUpdateWorkerPodsStatus recomputes the ModuleLoader counters in the
// Module's status and patches the status sub-resource.
//
// The counters are set as follows:
//   - NodesMatchingSelectorNumber: len(targetedNodes), as supplied by the caller.
//   - DesiredNumber: the number of NMCs returned by getNMCsForModule for this module.
//   - AvailableNumber: the number of those NMCs that hold both a spec entry and
//     a status entry for the module, with deeply equal Config values.
//
// An error is returned if the NMCs cannot be listed (in which case no patch is
// attempted) or if the status patch itself fails.
func (mnrh *moduleNMCReconcilerHelper) moduleUpdateWorkerPodsStatus(ctx context.Context, mod *kmmv1beta1.Module, targetedNodes []v1.Node) error {
	logger := log.FromContext(ctx)

	// Fetch every NMC that carries this module's "configured" label.
	nmcs, err := mnrh.getNMCsForModule(ctx, mod)
	if err != nil {
		return fmt.Errorf("failed to get configured NMCs for module %s/%s: %w", mod.Namespace, mod.Name, err)
	}

	numAvailable := 0
	// Iterate by index: this avoids copying each NodeModulesConfig, avoids
	// taking the address of a range variable, and keeps the imported nmc
	// package name unshadowed.
	for i := range nmcs {
		currentNMC := &nmcs[i]

		// Only the spec entry itself is needed here; the second return value is unused.
		modSpec, _ := mnrh.nmcHelper.GetModuleSpecEntry(currentNMC, mod.Namespace, mod.Name)
		if modSpec == nil {
			// The NMC was selected by label but has no spec entry for the module:
			// it still counts as desired (it is part of nmcs) but never as available.
			logger.Info(utils.WarnString(
				fmt.Sprintf("module %s/%s spec is missing in NMC %s although config label is present", mod.Namespace, mod.Name, currentNMC.Name)))
			continue
		}

		modStatus := mnrh.nmcHelper.GetModuleStatusEntry(currentNMC, mod.Namespace, mod.Name)
		if modStatus != nil && reflect.DeepEqual(modSpec.Config, modStatus.Config) {
			numAvailable++
		}
	}

	// Snapshot the module before touching its status so that the merge patch
	// below carries only the status fields modified here.
	unmodifiedMod := mod.DeepCopy()

	mod.Status.ModuleLoader.NodesMatchingSelectorNumber = int32(len(targetedNodes))
	mod.Status.ModuleLoader.DesiredNumber = int32(len(nmcs))
	mod.Status.ModuleLoader.AvailableNumber = int32(numAvailable)

	return mnrh.client.Status().Patch(ctx, mod, client.MergeFrom(unmodifiedMod))
}

func prepareNodeSchedulingData(node v1.Node, mld *api.ModuleLoaderData, currentNMCs sets.Set[string]) schedulingData {
versionLabel := ""
present := false
Expand Down
187 changes: 176 additions & 11 deletions internal/controllers/module_nmc_reconciler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,9 @@ var _ = Describe("Reconcile", func() {
getNodesError,
getNMCsMapError,
prepareSchedulingError,
shouldBeOnNode bool) {
shouldBeOnNode,
disableEnableError,
moduleUpdateStatusErr bool) {

nmcMLDConfigs := map[string]schedulingData{"nodeName": disableSchedulingData}
if shouldBeOnNode {
Expand Down Expand Up @@ -123,13 +125,28 @@ var _ = Describe("Reconcile", func() {
mockReconHelper.EXPECT().getNMCsByModuleSet(ctx, &mod).Return(currentNMCs, nil)
if prepareSchedulingError {
mockReconHelper.EXPECT().prepareSchedulingData(ctx, &mod, targetedNodes, currentNMCs).Return(nil, []error{returnedError})
goto executeTestFunction
goto moduleStatusUpdateFunction
}
mockReconHelper.EXPECT().prepareSchedulingData(ctx, &mod, targetedNodes, currentNMCs).Return(nmcMLDConfigs, []error{})
if disableEnableError {
if shouldBeOnNode {
mockReconHelper.EXPECT().enableModuleOnNode(ctx, &mld, &node).Return(returnedError)
} else {
mockReconHelper.EXPECT().disableModuleOnNode(ctx, mod.Namespace, mod.Name, node.Name).Return(returnedError)
}
goto moduleStatusUpdateFunction
}
if shouldBeOnNode {
mockReconHelper.EXPECT().enableModuleOnNode(ctx, &mld, &node).Return(returnedError)
mockReconHelper.EXPECT().enableModuleOnNode(ctx, &mld, &node).Return(nil)
} else {
mockReconHelper.EXPECT().disableModuleOnNode(ctx, mod.Namespace, mod.Name, node.Name).Return(nil)
}

moduleStatusUpdateFunction:
if moduleUpdateStatusErr {
mockReconHelper.EXPECT().moduleUpdateWorkerPodsStatus(ctx, &mod, targetedNodes).Return(returnedError)
} else {
mockReconHelper.EXPECT().disableModuleOnNode(ctx, mod.Namespace, mod.Name, node.Name).Return(returnedError)
mockReconHelper.EXPECT().moduleUpdateWorkerPodsStatus(ctx, &mod, targetedNodes).Return(nil)
}

executeTestFunction:
Expand All @@ -139,13 +156,14 @@ var _ = Describe("Reconcile", func() {
Expect(err).To(HaveOccurred())

},
Entry("getRequestedModule failed", true, false, false, false, false, false),
Entry("setFinalizerAndStatus failed", false, true, false, false, false, false),
Entry("getNodesListBySelector failed", false, false, true, false, false, false),
Entry("getNMCsByModuleMap failed", false, false, false, true, false, false),
Entry("prepareSchedulingData failed", false, false, false, false, true, false),
Entry("enableModuleOnNode failed", false, false, false, false, false, true),
Entry("disableModuleOnNode failed", false, false, false, false, false, false),
Entry("getRequestedModule failed", true, false, false, false, false, false, false, false),
Entry("setFinalizerAndStatus failed", false, true, false, false, false, false, false, false),
Entry("getNodesListBySelector failed", false, false, true, false, false, false, false, false),
Entry("getNMCsByModuleMap failed", false, false, false, true, false, false, false, false),
Entry("prepareSchedulingData failed", false, false, false, false, true, false, false, false),
Entry("enableModuleOnNode failed", false, false, false, false, false, true, true, false),
Entry("disableModuleOnNode failed", false, false, false, false, false, false, true, false),
Entry(".moduleUpdateWorkerPodsStatus failed", false, false, false, false, false, false, false, true),
)

It("Good flow, should run on node", func() {
Expand All @@ -157,6 +175,7 @@ var _ = Describe("Reconcile", func() {
mockReconHelper.EXPECT().getNMCsByModuleSet(ctx, &mod).Return(currentNMCs, nil),
mockReconHelper.EXPECT().prepareSchedulingData(ctx, &mod, targetedNodes, currentNMCs).Return(nmcMLDConfigs, nil),
mockReconHelper.EXPECT().enableModuleOnNode(ctx, &mld, &node).Return(nil),
mockReconHelper.EXPECT().moduleUpdateWorkerPodsStatus(ctx, &mod, targetedNodes).Return(nil),
)

res, err := mnr.Reconcile(ctx, req)
Expand All @@ -174,6 +193,7 @@ var _ = Describe("Reconcile", func() {
mockReconHelper.EXPECT().getNMCsByModuleSet(ctx, &mod).Return(currentNMCs, nil),
mockReconHelper.EXPECT().prepareSchedulingData(ctx, &mod, targetedNodes, currentNMCs).Return(nmcMLDConfigs, nil),
mockReconHelper.EXPECT().disableModuleOnNode(ctx, mod.Namespace, mod.Name, node.Name).Return(nil),
mockReconHelper.EXPECT().moduleUpdateWorkerPodsStatus(ctx, &mod, targetedNodes).Return(nil),
)

res, err := mnr.Reconcile(ctx, req)
Expand Down Expand Up @@ -929,3 +949,148 @@ var _ = Describe("removeModuleFromNMC", func() {
Expect(err).NotTo(HaveOccurred())
})
})

// Unit tests for moduleNMCReconcilerHelper.moduleUpdateWorkerPodsStatus.
// The Kubernetes client, its status writer and the NMC helper are all mocked,
// so each spec only verifies the counters placed into Module.Status.ModuleLoader
// right before the status patch is issued.
var _ = Describe("moduleUpdateWorkerPodsStatus", func() {
	var (
		ctx          context.Context
		ctrl         *gomock.Controller
		clnt         *client.MockClient
		mod          kmmv1beta1.Module
		mnrh         *moduleNMCReconcilerHelper
		helper       *nmc.MockHelper
		statusWriter *client.MockStatusWriter
	)

	BeforeEach(func() {
		ctx = context.Background()
		ctrl = gomock.NewController(GinkgoT())
		clnt = client.NewMockClient(ctrl)
		helper = nmc.NewMockHelper(ctrl)
		statusWriter = client.NewMockStatusWriter(ctrl)
		mod = kmmv1beta1.Module{
			ObjectMeta: metav1.ObjectMeta{
				Name:      "modName",
				Namespace: "modNamespace",
			},
		}
		mnrh = &moduleNMCReconcilerHelper{client: clnt, nmcHelper: helper}
	})

	It("failed to get configured NMCs", func() {
		// A List failure must be propagated; no Status()/Patch() expectation is
		// registered, so any attempt to patch would fail the spec.
		clnt.EXPECT().List(ctx, gomock.Any(), gomock.Any()).Return(fmt.Errorf("some error"))
		err := mnrh.moduleUpdateWorkerPodsStatus(ctx, &mod, nil)
		Expect(err).To(HaveOccurred())
	})

	It("module missing from spec", func() {
		nmc1 := kmmv1beta1.NodeModulesConfig{
			ObjectMeta: metav1.ObjectMeta{Name: "nmc1"},
		}
		targetedNodes := []v1.Node{{}, {}}
		// The NMC is listed, so it is counted as desired, but with no spec entry
		// it can never be counted as available.
		expectedMod := mod.DeepCopy()
		expectedMod.Status.ModuleLoader.NodesMatchingSelectorNumber = int32(2)
		expectedMod.Status.ModuleLoader.DesiredNumber = int32(1)
		expectedMod.Status.ModuleLoader.AvailableNumber = int32(0)
		gomock.InOrder(
			clnt.EXPECT().List(ctx, gomock.Any(), gomock.Any()).DoAndReturn(
				func(_ any, list *kmmv1beta1.NodeModulesConfigList, _ ...any) error {
					list.Items = []kmmv1beta1.NodeModulesConfig{nmc1}
					return nil
				},
			),
			helper.EXPECT().GetModuleSpecEntry(&nmc1, mod.Namespace, mod.Name).Return(nil, 0),
			clnt.EXPECT().Status().Return(statusWriter),
			statusWriter.EXPECT().Patch(ctx, expectedMod, gomock.Any()),
		)

		err := mnrh.moduleUpdateWorkerPodsStatus(ctx, &mod, targetedNodes)
		Expect(err).NotTo(HaveOccurred())
	})

	DescribeTable("module present in spec", func(numTargetedNodes int,
		modulePresentInStatus,
		configsEqual bool,
		expectedNodesMatchingSelectorNumber,
		expectedDesiredNumber,
		expectedAvailableNumber int) {
		expectedMod := mod.DeepCopy()
		expectedMod.Status.ModuleLoader.NodesMatchingSelectorNumber = int32(expectedNodesMatchingSelectorNumber)
		expectedMod.Status.ModuleLoader.DesiredNumber = int32(expectedDesiredNumber)
		expectedMod.Status.ModuleLoader.AvailableNumber = int32(expectedAvailableNumber)

		// Only the number of targeted nodes matters, so zero-valued nodes suffice.
		targetedNodes := make([]v1.Node, numTargetedNodes)
		nmc1 := kmmv1beta1.NodeModulesConfig{
			ObjectMeta: metav1.ObjectMeta{Name: "nmc1"},
		}
		moduleConfig1 := kmmv1beta1.ModuleConfig{ContainerImage: "some image1"}
		moduleConfig2 := kmmv1beta1.ModuleConfig{ContainerImage: "some image2"}
		nmcModuleSpec := kmmv1beta1.NodeModuleSpec{
			Config: moduleConfig1,
		}
		// The status config either mirrors the spec config or deliberately differs from it.
		nmcModuleStatus := kmmv1beta1.NodeModuleStatus{}
		if configsEqual {
			nmcModuleStatus.Config = moduleConfig1
		} else {
			nmcModuleStatus.Config = moduleConfig2
		}
		clnt.EXPECT().List(ctx, gomock.Any(), gomock.Any()).DoAndReturn(
			func(_ any, list *kmmv1beta1.NodeModulesConfigList, _ ...any) error {
				list.Items = []kmmv1beta1.NodeModulesConfig{nmc1}
				return nil
			},
		)
		helper.EXPECT().GetModuleSpecEntry(&nmc1, mod.Namespace, mod.Name).Return(&nmcModuleSpec, 0)
		if modulePresentInStatus {
			helper.EXPECT().GetModuleStatusEntry(&nmc1, mod.Namespace, mod.Name).Return(&nmcModuleStatus)
		} else {
			helper.EXPECT().GetModuleStatusEntry(&nmc1, mod.Namespace, mod.Name).Return(nil)
		}
		clnt.EXPECT().Status().Return(statusWriter)
		statusWriter.EXPECT().Patch(ctx, expectedMod, gomock.Any())

		err := mnrh.moduleUpdateWorkerPodsStatus(ctx, &mod, targetedNodes)
		Expect(err).NotTo(HaveOccurred())
	},
		Entry("2 targeted nodes, module not in status", 2, false, false, 2, 1, 0),
		Entry("3 targeted nodes, module in status, configs not equal", 3, true, false, 3, 1, 0),
		Entry("3 targeted nodes, module in status, configs equal", 3, true, true, 3, 1, 1),
	)

	It("multiple module in spec and status", func() {
		moduleConfig1 := kmmv1beta1.ModuleConfig{ContainerImage: "some image1"}
		nmcModuleSpec := kmmv1beta1.NodeModuleSpec{
			Config: moduleConfig1,
		}
		nmcModuleStatus := kmmv1beta1.NodeModuleStatus{
			Config: moduleConfig1,
		}
		nmc1 := kmmv1beta1.NodeModulesConfig{
			ObjectMeta: metav1.ObjectMeta{Name: "nmc1"},
		}
		targetedNodes := []v1.Node{{}, {}}
		// Spec and status configs match, so the single NMC is both desired and available.
		expectedMod := mod.DeepCopy()
		expectedMod.Status.ModuleLoader.NodesMatchingSelectorNumber = int32(2)
		expectedMod.Status.ModuleLoader.DesiredNumber = int32(1)
		expectedMod.Status.ModuleLoader.AvailableNumber = int32(1)
		gomock.InOrder(
			clnt.EXPECT().List(ctx, gomock.Any(), gomock.Any()).DoAndReturn(
				func(_ any, list *kmmv1beta1.NodeModulesConfigList, _ ...any) error {
					list.Items = []kmmv1beta1.NodeModulesConfig{nmc1}
					return nil
				},
			),
			helper.EXPECT().GetModuleSpecEntry(&nmc1, mod.Namespace, mod.Name).Return(&nmcModuleSpec, 0),
			helper.EXPECT().GetModuleStatusEntry(&nmc1, mod.Namespace, mod.Name).Return(&nmcModuleStatus),
			clnt.EXPECT().Status().Return(statusWriter),
			statusWriter.EXPECT().Patch(ctx, expectedMod, gomock.Any()),
		)

		err := mnrh.moduleUpdateWorkerPodsStatus(ctx, &mod, targetedNodes)
		Expect(err).NotTo(HaveOccurred())
	})

})
Loading

0 comments on commit 67e33e1

Please sign in to comment.