diff --git a/pkg/csi_driver/node_test.go b/pkg/csi_driver/node_test.go index 8d04599f..730a07d8 100644 --- a/pkg/csi_driver/node_test.go +++ b/pkg/csi_driver/node_test.go @@ -106,7 +106,7 @@ func initTestNodeServerWithKubeClient(t *testing.T, client kubernetes.Interface) mounter: mounter, metaService: metaserice, volumeLocks: util.NewVolumeLocks(), - lockReleaseController: lockrelease.NewFakeLockReleaseControllerWithClient(client), + lockReleaseController: lockrelease.NewControllerBuilder().WithClient(client).Build(), features: &GCFSDriverFeatureOptions{FeatureLockRelease: &FeatureLockRelease{Enabled: true}}, } } diff --git a/pkg/releaselock/configmap_util_test.go b/pkg/releaselock/configmap_util_test.go index 78fa3baf..e84b0eed 100644 --- a/pkg/releaselock/configmap_util_test.go +++ b/pkg/releaselock/configmap_util_test.go @@ -179,7 +179,7 @@ func TestGetConfigMap(t *testing.T) { } for _, test := range cases { client := fake.NewSimpleClientset(test.existingCM) - controller := NewFakeLockReleaseControllerWithClient(client) + controller := NewControllerBuilder().WithClient(client).Build() cm, err := controller.GetConfigMap(context.Background(), test.cmName, test.cmNamespace) if gotExpected := gotExpectedError(test.name, test.expectErr, err); gotExpected != nil { t.Fatal(gotExpected) @@ -274,7 +274,7 @@ func TestUpdateConfigMapWithKeyValue(t *testing.T) { } for _, test := range cases { client := fake.NewSimpleClientset(test.existingCM) - controller := NewFakeLockReleaseControllerWithClient(client) + controller := NewControllerBuilder().WithClient(client).Build() ctx := context.Background() err := controller.UpdateConfigMapWithKeyValue(ctx, test.existingCM, test.key, test.value) if gotExpected := gotExpectedError(test.name, test.expectErr, err); gotExpected != nil { @@ -372,7 +372,7 @@ func TestRemoveKeyFromConfigMap(t *testing.T) { } for _, test := range cases { client := fake.NewSimpleClientset(test.existingCM) - controller := NewFakeLockReleaseControllerWithClient(client) + controller := NewControllerBuilder().WithClient(client).Build() ctx := context.Background() err := controller.RemoveKeyFromConfigMap(ctx, test.existingCM, test.key) if gotExpected := gotExpectedError(test.name, test.expectErr, err); gotExpected != nil { @@ -470,7 +470,7 @@ func TestRemoveKeyFromConfigMapWithRetry(t *testing.T) { } for _, test := range cases { client := fake.NewSimpleClientset(test.existingCM) - controller := NewFakeLockReleaseControllerWithClient(client) + controller := NewControllerBuilder().WithClient(client).Build() ctx := context.Background() err := controller.RemoveKeyFromConfigMapWithRetry(ctx, test.existingCM, test.key) if gotExpected := gotExpectedError(test.name, test.expectErr, err); gotExpected != nil { diff --git a/pkg/releaselock/controller.go b/pkg/releaselock/controller.go index 11989b17..dacba50b 100644 --- a/pkg/releaselock/controller.go +++ b/pkg/releaselock/controller.go @@ -48,6 +48,77 @@ type NodeUpdatePair struct { NewObj *corev1.Node } +type LockService interface { + ReleaseLock(hostIP, clientIP string) error +} + +type EventProcessor interface { + processConfigMapEntryOnNodeCreation(ctx context.Context, key string, filestoreIP string, node *corev1.Node, cm *corev1.ConfigMap) error + processConfigMapEntryOnNodeUpdate(ctx context.Context, key string, filestoreIP string, newNode *corev1.Node, oldNode *corev1.Node, cm *corev1.ConfigMap) error + SetController(ctrl *LockReleaseController) +} + +type DefaultEventProcessor struct { + ctrl *LockReleaseController +} + +func (p 
*DefaultEventProcessor) SetController(ctrl *LockReleaseController) { + p.ctrl = ctrl +} + +func (p *DefaultEventProcessor) processConfigMapEntryOnNodeCreation(ctx context.Context, key string, filestoreIP string, node *corev1.Node, cm *corev1.ConfigMap) error { + if p.ctrl == nil { + return fmt.Errorf("controller not set") + } + + c := p.ctrl + _, _, _, _, gceInstanceID, gkeNodeInternalIP, err := ParseConfigMapKey(key) + if err != nil { + return fmt.Errorf("failed to parse configmap key %s: %w", key, err) + } + klog.V(6).Infof("Verifying GKE node %s with nodeId %s nodeInternalIP %s exists or not", node.Name, gceInstanceID, gkeNodeInternalIP) + entryMatchesNode, err := c.verifyConfigMapEntry(node, gceInstanceID, gkeNodeInternalIP) + if err != nil { + return fmt.Errorf("failed to verify GKE node %s with nodeId %s nodeInternalIP %s still exists: %w", node.Name, gceInstanceID, gkeNodeInternalIP, err) + } + if entryMatchesNode { + klog.V(6).Infof("GKE node %s with nodeId %s nodeInternalIP %s still exists in API server, skip lock info reconciliation", node.Name, gceInstanceID, gkeNodeInternalIP) + return nil + } + + // Try to match the latest node, to prevent incorrectly releasing the lock in case of a lagging informer/watch + latestNode, err := c.client.CoreV1().Nodes().Get(ctx, node.Name, metav1.GetOptions{}) + if err != nil { + if apiError.IsNotFound(err) { + return nil + } + return fmt.Errorf("failed to get node %s: %w", node.Name, err) + } + entryMatchesLatestNode, err := c.verifyConfigMapEntry(latestNode, gceInstanceID, gkeNodeInternalIP) + if err != nil { + return fmt.Errorf("failed to verify GKE node %s with nodeId %s nodeInternalIP %s still exists: %w", node.Name, gceInstanceID, gkeNodeInternalIP, err) + } + if entryMatchesLatestNode { + klog.V(6).Infof("GKE node %s with nodeId %s nodeInternalIP %s exists in API server, skip lock info reconciliation", node.Name, gceInstanceID, gkeNodeInternalIP) + return nil + } + + klog.Infof("GKE node %s with nodeId %s nodeInternalIP %s no longer exists, releasing lock for Filestore IP %s", node.Name, gceInstanceID, gkeNodeInternalIP, filestoreIP) + opErr := c.lockService.ReleaseLock(filestoreIP, gkeNodeInternalIP) + c.RecordLockReleaseMetrics(opErr) + if opErr != nil { + return fmt.Errorf("failed to release lock: %w", opErr) + } + klog.Infof("Removing lock info key %s from configmap %s/%s with data %v", key, cm.Namespace, cm.Name, cm.Data) + // Apply the "Get() and Update(), or retry" logic in RemoveKeyFromConfigMap(). + // This will increase the number of k8s api calls, + // but reduce repetitive ReleaseLock() due to kubeclient api failures in each reconcile loop. 
+ if err := c.RemoveKeyFromConfigMapWithRetry(ctx, cm, key); err != nil { + return fmt.Errorf("failed to remove key %s from configmap %s/%s: %w", key, cm.Namespace, cm.Name, err) + } + return nil +} + type LockReleaseController struct { client kubernetes.Interface @@ -63,6 +134,9 @@ type LockReleaseController struct { updateEventQueue workqueue.RateLimitingInterface createEventQueue workqueue.RateLimitingInterface + + eventProcessor EventProcessor + lockService LockService } type LockReleaseControllerConfig struct { @@ -98,6 +172,9 @@ func NewLockReleaseController( &workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(50), 300)}, ) + eventProcessor := &DefaultEventProcessor{} + lockService := &FileStoreRPCClient{} + lc := &LockReleaseController{ id: id, hostname: hostname, @@ -106,6 +183,8 @@ func NewLockReleaseController( nodeInformer: nodeInformer, updateEventQueue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()), createEventQueue: workqueue.NewRateLimitingQueue(ratelimiter), + eventProcessor: eventProcessor, + lockService: lockService, } if config.MetricEndpoint != "" { @@ -116,6 +195,7 @@ func NewLockReleaseController( lc.metricsManager = mm } + eventProcessor.SetController(lc) return lc, nil } @@ -160,11 +240,13 @@ func (c *LockReleaseController) handleCreateEvent(ctx context.Context, obj inter var configMapReconcileErrors []error for key, filestoreIP := range data { - err = c.processConfigMapEntryOnNodeCreation(ctx, key, filestoreIP, node, cm) + eventProcessor := c.eventProcessor + err = eventProcessor.processConfigMapEntryOnNodeCreation(ctx, key, filestoreIP, node, cm) if err != nil { configMapReconcileErrors = append(configMapReconcileErrors, err) } } + klog.Infof("Reconciled %d configmap entries for node %s, %d failed", len(data), node.Name, len(configMapReconcileErrors)) if len(configMapReconcileErrors) > 0 { return errors.Join(configMapReconcileErrors...) 
} @@ -172,62 +254,6 @@ func (c *LockReleaseController) handleCreateEvent(ctx context.Context, obj inter } -func (c *LockReleaseController) processConfigMapEntryOnNodeCreation(ctx context.Context, key string, filestoreIP string, node *corev1.Node, cm *corev1.ConfigMap) error { - _, _, _, _, gceInstanceID, gkeNodeInternalIP, err := ParseConfigMapKey(key) - if err != nil { - return fmt.Errorf("failed to parse configmap key %s: %w", key, err) - } - klog.V(6).Infof("Verifying GKE node %s with nodeId %s nodeInternalIP %s exists or not", node.Name, gceInstanceID, gkeNodeInternalIP) - entryMatchesNode, err := c.verifyConfigMapEntry(node, gceInstanceID, gkeNodeInternalIP) - if err != nil { - return fmt.Errorf("failed to verify GKE node %s with nodeId %s nodeInternalIP %s still exists: %w", node.Name, gceInstanceID, gkeNodeInternalIP, err) - } - if entryMatchesNode { - klog.V(6).Infof("GKE node %s with nodeId %s nodeInternalIP %s still exists in API server, skip lock info reconciliation", node.Name, gceInstanceID, gkeNodeInternalIP) - return nil - } - - // Try to match the latest node, to prevent incorrect releasing the lock in case of a lagging informer/watch - latestNode, err := c.client.CoreV1().Nodes().Get(ctx, node.Name, metav1.GetOptions{}) - if err != nil { - if apiError.IsNotFound(err) { - opErr := ReleaseLock(filestoreIP, gkeNodeInternalIP) - c.RecordLockReleaseMetrics(opErr) - if opErr != nil { - return fmt.Errorf("failed to release lock: %w", opErr) - } - if err := c.RemoveKeyFromConfigMapWithRetry(ctx, cm, key); err != nil { - return fmt.Errorf("failed to remove key %s from configmap %s/%s: %w", key, cm.Namespace, cm.Name, err) - } - return nil - } - return fmt.Errorf("failed to get node in namespace %w", err) - } - entryMatchesLatestNode, err := c.verifyConfigMapEntry(latestNode, gceInstanceID, gkeNodeInternalIP) - if err != nil { - return fmt.Errorf("failed to verify GKE node %s with nodeId %s nodeInternalIP %s still exists: %w", node.Name, gceInstanceID, gkeNodeInternalIP, err) - } - if entryMatchesLatestNode { - klog.V(6).Infof("GKE node %s with nodeId %s nodeInternalIP %s exists in API server, skip lock info reconciliation", node.Name, gceInstanceID, gkeNodeInternalIP) - return nil - } - - klog.Infof("GKE node %s with nodeId %s nodeInternalIP %s no longer exists, releasing lock for Filestore IP %s", node.Name, gceInstanceID, gkeNodeInternalIP, filestoreIP) - opErr := ReleaseLock(filestoreIP, gkeNodeInternalIP) - c.RecordLockReleaseMetrics(opErr) - if opErr != nil { - return fmt.Errorf("failed to release lock: %w", opErr) - } - klog.Infof("Removing lock info key %s from configmap %s/%s with data %v", key, cm.Namespace, cm.Name, cm.Data) - // Apply the "Get() and Update(), or retry" logic in RemoveKeyFromConfigMap(). - // This will increase the number of k8s api calls, - // but reduce repetitive ReleaseLock() due to kubeclient api failures in each reconcile loop. 
- if err := c.RemoveKeyFromConfigMapWithRetry(ctx, cm, key); err != nil { - return fmt.Errorf("failed to remove key %s from configmap %s/%s: %w", key, cm.Namespace, cm.Name, err) - } - return nil -} - func (c *LockReleaseController) processNextCreateEvent(ctx context.Context) bool { obj, shutdown := c.createEventQueue.Get() if shutdown { @@ -306,7 +332,7 @@ func (c *LockReleaseController) handleUpdateEvent(ctx context.Context, oldObj in data := cm.DeepCopy().Data var configMapReconcileErrors []error for key, filestoreIP := range data { - err = c.processConfigMapEntryOnNodeUpdate(ctx, key, filestoreIP, newNode, oldNode, cm) + err = c.eventProcessor.processConfigMapEntryOnNodeUpdate(ctx, key, filestoreIP, newNode, oldNode, cm) if err != nil { configMapReconcileErrors = append(configMapReconcileErrors, err) } @@ -317,7 +343,11 @@ func (c *LockReleaseController) handleUpdateEvent(ctx context.Context, oldObj in return nil } -func (c *LockReleaseController) processConfigMapEntryOnNodeUpdate(ctx context.Context, key string, filestoreIP string, newNode *corev1.Node, oldNode *corev1.Node, cm *corev1.ConfigMap) error { +func (p *DefaultEventProcessor) processConfigMapEntryOnNodeUpdate(ctx context.Context, key string, filestoreIP string, newNode *corev1.Node, oldNode *corev1.Node, cm *corev1.ConfigMap) error { + if p.ctrl == nil { + return fmt.Errorf("controller not set") + } + c := p.ctrl _, _, _, _, gceInstanceID, gkeNodeInternalIP, err := ParseConfigMapKey(key) if err != nil { return fmt.Errorf("failed to parse configmap key %s: %w", key, err) @@ -327,18 +357,20 @@ func (c *LockReleaseController) processConfigMapEntryOnNodeUpdate(ctx context.Co if err != nil { return fmt.Errorf("failed to verify GKE node %s with nodeId %s nodeInternalIP %s still exists: %w", newNode.Name, gceInstanceID, gkeNodeInternalIP, err) } - entryMatchesOldNode, err := c.verifyConfigMapEntry(oldNode, gceInstanceID, gkeNodeInternalIP) - if err != nil { - return fmt.Errorf("failed to verify GKE node %s with nodeId %s nodeInternalIP %s still exists: %w", newNode.Name, gceInstanceID, gkeNodeInternalIP, err) - } - klog.Infof("Checked config map entry against old node(matching result %t), and new node(matching result %t)", entryMatchesOldNode, entryMatchesNewNode) if entryMatchesNewNode { klog.V(6).Infof("GKE node %s with nodeId %s nodeInternalIP %s still exists in API server, skip lock info reconciliation", newNode.Name, gceInstanceID, gkeNodeInternalIP) return nil } + + entryMatchesOldNode, err := c.verifyConfigMapEntry(oldNode, gceInstanceID, gkeNodeInternalIP) + if err != nil { + return fmt.Errorf("failed to verify GKE node %s with nodeId %s nodeInternalIP %s still exists: %v", newNode.Name, gceInstanceID, gkeNodeInternalIP, err) + } + klog.Infof("Checked config map entry against old node(matching result %t), and new node(matching result %t)", entryMatchesOldNode, entryMatchesNewNode) + if entryMatchesOldNode { klog.Infof("GKE node %s with nodeId %s nodeInternalIP %s matches a node before update, releasing lock for Filestore IP %s", newNode.Name, gceInstanceID, gkeNodeInternalIP, filestoreIP) - opErr := ReleaseLock(filestoreIP, gkeNodeInternalIP) + opErr := c.lockService.ReleaseLock(filestoreIP, gkeNodeInternalIP) c.RecordLockReleaseMetrics(opErr) if opErr != nil { return fmt.Errorf("failed to release lock: %w", opErr) diff --git a/pkg/releaselock/controller_test.go b/pkg/releaselock/controller_test.go index 3f5c4605..bd37d3c9 100644 --- a/pkg/releaselock/controller_test.go +++ b/pkg/releaselock/controller_test.go @@ -1,12 
+1,50 @@ package lockrelease import ( + "context" + "fmt" "testing" + "github.com/stretchr/testify/mock" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes/fake" ) +type MockEventProcessor struct { + mock.Mock +} + +func (m *MockEventProcessor) processConfigMapEntryOnNodeCreation(ctx context.Context, key string, filestoreIP string, node *corev1.Node, cm *corev1.ConfigMap) error { + args := m.Called(ctx) // Pass the arguments used in On() + if args.Error(0) != nil { + return args.Error(0) + } + return nil +} + +func (m *MockEventProcessor) processConfigMapEntryOnNodeUpdate(ctx context.Context, key string, filestoreIP string, newNode *corev1.Node, oldNode *corev1.Node, cm *corev1.ConfigMap) error { + args := m.Called(ctx) + if args.Error(0) != nil { + return args.Error(0) + } + return nil +} + +type MockLockService struct { + mock.Mock +} + +func (m *MockLockService) ReleaseLock(hostIP, clientIP string) error { + args := m.Called() + if args.Error(0) != nil { + return args.Error(0) + } + return nil +} + +func (m *MockEventProcessor) SetController(ctrl *LockReleaseController) {} + func TestVerifyConfigMapEntry(t *testing.T) { cases := []struct { name string @@ -105,7 +143,7 @@ func TestVerifyConfigMapEntry(t *testing.T) { }, } for _, test := range cases { - controller := NewFakeLockReleaseController() + controller := NewControllerBuilder().Build() nodeExists, err := controller.verifyConfigMapEntry(test.node, test.gceInstanceID, test.nodeInternalIP) if gotExpected := gotExpectedError(test.name, test.expectErr, err); gotExpected != nil { t.Errorf("%v", gotExpected) @@ -115,3 +153,489 @@ func TestVerifyConfigMapEntry(t *testing.T) { } } } + +func TestProcessConfigMapEntryOnNodeCreation(t *testing.T) { + cases := []struct { + name string + key string + filestoreIP string + node *corev1.Node + cm *corev1.ConfigMap + lockReleaseError bool + expectedError bool + expectedConfigMapSize int + }{ + { + name: "should keep the entry", + key: "test-project.us-central1.test-filestore.test-share.123456.192_168_1_1", + filestoreIP: "192.168.92.0", + cm: &corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: "fscsi-node-name", + Namespace: "gke-managed-filestorecsi", + }, + Data: map[string]string{ + "test-project.us-central1.test-filestore.test-share.123456.192_168_1_1": "192.168.92.0", + }, + }, + + node: &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "node-name", + Annotations: map[string]string{ + gceInstanceIDKey: "123456", + }, + }, + Status: corev1.NodeStatus{ + Addresses: []corev1.NodeAddress{{Address: "192.168.1.1", Type: corev1.NodeInternalIP}}, + }, + }, + lockReleaseError: false, + expectedError: false, + expectedConfigMapSize: 1, + }, + { + name: "should remove the entry due to node's absence in config map", + key: "test-project.us-central1.test-filestore.test-share.123456.192_168_1_1", + filestoreIP: "192.168.92.0", + cm: &corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: "fscsi-node-name", + Namespace: "gke-managed-filestorecsi", + }, + Data: map[string]string{ + "test-project.us-central1.test-filestore.test-share.123456.192_168_1_1": "192.168.92.0", + }, + }, + + node: &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "node-name", + Annotations: map[string]string{ + gceInstanceIDKey: "changed_key", + }, + }, + Status: corev1.NodeStatus{ + Addresses: []corev1.NodeAddress{{Address: "192.168.1.1", Type: corev1.NodeInternalIP}}, + }, + }, + lockReleaseError: false, + expectedError: false, + expectedConfigMapSize: 0, + 
}, + { + name: "fail to remove the entry due to rpc call failure", + key: "test-project.us-central1.test-filestore.test-share.123456.192_168_1_1", + filestoreIP: "192.168.92.0", + cm: &corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: "fscsi-node-name", + Namespace: "gke-managed-filestorecsi", + }, + Data: map[string]string{ + "test-project.us-central1.test-filestore.test-share.123456.192_168_1_1": "192.168.92.0", + }, + }, + + node: &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "node-name", + Annotations: map[string]string{ + gceInstanceIDKey: "changed_key", + }, + }, + Status: corev1.NodeStatus{ + Addresses: []corev1.NodeAddress{{Address: "192.168.1.1", Type: corev1.NodeInternalIP}}, + }, + }, + lockReleaseError: true, + expectedError: true, + expectedConfigMapSize: 1, + }, + } + for _, test := range cases { + client := fake.NewSimpleClientset(test.cm, test.node) + eventProcessor := &DefaultEventProcessor{} + lockService := &MockLockService{} + if test.lockReleaseError { + lockService.On("ReleaseLock").Return(fmt.Errorf("fake lock release rpc call error")) + } else { + lockService.On("ReleaseLock").Return(nil) + } + + c := NewControllerBuilder().WithClient(client).WithProcessor(eventProcessor).WithLockService(lockService).Build() + err := eventProcessor.processConfigMapEntryOnNodeCreation(context.Background(), test.key, test.filestoreIP, test.node, test.cm) + fmt.Printf("test case: %s processConfigMapEntryOnNodeCreation result, %v", test.name, err) + if err != nil && !test.expectedError { + t.Errorf("got an unexpected error") + } + + if err == nil && test.expectedError { + t.Errorf("expected error but no error returned") + } + updatedCM, err := c.GetConfigMap(context.Background(), test.cm.Name, test.cm.Namespace) + if err != nil { + t.Error("error getting config map") + } + if got, want := len(updatedCM.Data), test.expectedConfigMapSize; got != want { + t.Errorf("expected resulting config map size: %d, but got %d", want, got) + } + } +} + +func TestProcessConfigMapEntryOnNodeUpdate(t *testing.T) { + cases := []struct { + name string + key string + filestoreIP string + newNode *corev1.Node + oldNode *corev1.Node + cm *corev1.ConfigMap + lockReleaseError bool + expectedError bool + expectedConfigMapSize int + }{ + { + name: "should keep the entry because new node matches config map entry", + key: "test-project.us-central1.test-filestore.test-share.123456.192_168_1_1", + filestoreIP: "192.168.92.0", + cm: &corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: "fscsi-node-name", + Namespace: "gke-managed-filestorecsi", + }, + Data: map[string]string{ + "test-project.us-central1.test-filestore.test-share.123456.192_168_1_1": "192.168.92.0", + }, + }, + + newNode: &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "node-name", + Annotations: map[string]string{ + gceInstanceIDKey: "123456", + }, + }, + Status: corev1.NodeStatus{ + Addresses: []corev1.NodeAddress{{Address: "192.168.1.1", Type: corev1.NodeInternalIP}}, + }, + }, + oldNode: &corev1.Node{}, + lockReleaseError: false, + expectedError: false, + expectedConfigMapSize: 1, + }, + { + name: "should remove the entry because old node matches config map entry but new node does not", + key: "test-project.us-central1.test-filestore.test-share.123456.192_168_1_1", + filestoreIP: "192.168.92.0", + cm: &corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: "fscsi-node-name", + Namespace: "gke-managed-filestorecsi", + }, + Data: map[string]string{ + 
"test-project.us-central1.test-filestore.test-share.123456.192_168_1_1": "192.168.92.0", + }, + }, + + newNode: &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "node-name", + Annotations: map[string]string{ + gceInstanceIDKey: "changed_key", + }, + }, + Status: corev1.NodeStatus{ + Addresses: []corev1.NodeAddress{{Address: "192.168.1.1", Type: corev1.NodeInternalIP}}, + }, + }, + oldNode: &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "node-name", + Annotations: map[string]string{ + gceInstanceIDKey: "123456", + }, + }, + Status: corev1.NodeStatus{ + Addresses: []corev1.NodeAddress{{Address: "192.168.1.1", Type: corev1.NodeInternalIP}}, + }, + }, + lockReleaseError: false, + expectedError: false, + expectedConfigMapSize: 0, + }, + { + name: "fail to remove the entry due to rpc call failure", + key: "test-project.us-central1.test-filestore.test-share.123456.192_168_1_1", + filestoreIP: "192.168.92.0", + cm: &corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: "fscsi-node-name", + Namespace: "gke-managed-filestorecsi", + }, + Data: map[string]string{ + "test-project.us-central1.test-filestore.test-share.123456.192_168_1_1": "192.168.92.0", + }, + }, + + newNode: &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "node-name", + Annotations: map[string]string{ + gceInstanceIDKey: "changed_key", + }, + }, + Status: corev1.NodeStatus{ + Addresses: []corev1.NodeAddress{{Address: "192.168.1.1", Type: corev1.NodeInternalIP}}, + }, + }, + oldNode: &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "node-name", + Annotations: map[string]string{ + gceInstanceIDKey: "123456", + }, + }, + Status: corev1.NodeStatus{ + Addresses: []corev1.NodeAddress{{Address: "192.168.1.1", Type: corev1.NodeInternalIP}}, + }, + }, + lockReleaseError: true, + expectedError: true, + expectedConfigMapSize: 1, + }, + } + for _, test := range cases { + client := fake.NewSimpleClientset(test.cm) + eventProcessor := &DefaultEventProcessor{} + lockService := &MockLockService{} + if test.lockReleaseError { + lockService.On("ReleaseLock").Return(fmt.Errorf("fake lock release rpc call error")) + } else { + lockService.On("ReleaseLock").Return(nil) + } + + c := NewControllerBuilder().WithClient(client).WithProcessor(eventProcessor).WithLockService(lockService).Build() + err := eventProcessor.processConfigMapEntryOnNodeUpdate(context.Background(), test.key, test.filestoreIP, test.newNode, test.oldNode, test.cm) + fmt.Printf("test case: %s processConfigMapEntryOnNodeUpdate result, %v", test.name, err) + if err != nil && !test.expectedError { + t.Errorf("got an unexpected error") + } + + if err == nil && test.expectedError { + t.Errorf("expected error but no error returned") + } + updatedCM, err := c.GetConfigMap(context.Background(), test.cm.Name, test.cm.Namespace) + if err != nil { + t.Error("error getting config map") + } + if got, want := len(updatedCM.Data), test.expectedConfigMapSize; got != want { + t.Errorf("expected resulting config map size: %d, but got %d", want, got) + } + } +} + +func TestHandleCreateEvent(t *testing.T) { + cases := []struct { + name string + existingCM *corev1.ConfigMap + obj interface{} + eventProcessorError bool + expectedError bool + }{ + { + name: "config map does not exist", + existingCM: &corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: "fscsi-not-exist", + Namespace: "gke-managed-filestorecsi", + }, + }, + obj: &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "node-name", + Annotations: map[string]string{ + gceInstanceIDKey: "node1-id", + }, + }, + }, 
+ expectedError: false, + }, + { + name: "config map is found but config map processing returns error", + existingCM: &corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: "fscsi-node-name", + Namespace: "gke-managed-filestorecsi", + }, + Data: map[string]string{ + "test-project.us-central1.test-filestore.test-share.123456.192_168_1_1": "192.168.92.0", + "test-project.us-central1.test-filestore1.test-share.123456.192_168_1_1": "192.168.92.1", + }, + }, + obj: &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "node-name", + Annotations: map[string]string{ + gceInstanceIDKey: "node2-id", + }, + }, + }, + eventProcessorError: true, + expectedError: true, + }, + { + name: "config map is found and all entries are processed successfully", + existingCM: &corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: "fscsi-node-name", + Namespace: "gke-managed-filestorecsi", + }, + Data: map[string]string{ + "test-project.us-central1.test-filestore.test-share.123456.192_168_1_1": "192.168.92.0", + }, + }, + obj: &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "node-name", + Annotations: map[string]string{ + gceInstanceIDKey: "node2-id", + }, + }, + }, + eventProcessorError: false, + expectedError: false, + }, + } + for _, test := range cases { + client := fake.NewSimpleClientset(test.existingCM) + eventProcessor := &MockEventProcessor{} + if test.eventProcessorError { + eventProcessor.On("processConfigMapEntryOnNodeCreation", mock.Anything).Return(fmt.Errorf("mock processor error")) + } else { + eventProcessor.On("processConfigMapEntryOnNodeCreation", mock.Anything).Return(nil) + } + controller := NewControllerBuilder().WithClient(client).WithProcessor(eventProcessor).Build() + err := controller.handleCreateEvent(context.Background(), test.obj) + fmt.Printf("test case: %s handleCreateEvent result, %v", test.name, err) + if err != nil && !test.expectedError { + t.Errorf("got an unexpected error") + } + + if err == nil && test.expectedError { + t.Errorf("expected error but no error returned") + } + } +} + +func TestHandleUpdateEvent(t *testing.T) { + cases := []struct { + name string + existingCM *corev1.ConfigMap + oldObj interface{} + newObj interface{} + eventProcessorError bool + expectedError bool + }{ + { + name: "config map does not exist", + existingCM: &corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: "fscsi-not-exist", + Namespace: "gke-managed-filestorecsi", + }, + }, + newObj: &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "node-name", + Annotations: map[string]string{ + gceInstanceIDKey: "node1-id", + }, + }, + }, + oldObj: &corev1.Node{}, + expectedError: false, + }, + { + name: "config map is found but config map processing returns error", + existingCM: &corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: "fscsi-node-name", + Namespace: "gke-managed-filestorecsi", + }, + Data: map[string]string{ + "test-project.us-central1.test-filestore.test-share.123456.192_168_1_1": "192.168.92.0", + "test-project.us-central1.test-filestore1.test-share.123456.192_168_1_1": "192.168.92.1", + }, + }, + newObj: &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "node-name", + Annotations: map[string]string{ + gceInstanceIDKey: "node2-id", + }, + }, + }, + oldObj: &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "node-name", + Annotations: map[string]string{ + gceInstanceIDKey: "node2-id", + }, + }, + }, + eventProcessorError: true, + expectedError: true, + }, + { + name: "config map is found and all entries are processed successfully", + existingCM: 
&corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: "fscsi-node-name", + Namespace: "gke-managed-filestorecsi", + }, + Data: map[string]string{ + "test-project.us-central1.test-filestore.test-share.123456.192_168_1_1": "192.168.92.0", + }, + }, + newObj: &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "node-name", + Annotations: map[string]string{ + gceInstanceIDKey: "node2-id", + }, + }, + }, + oldObj: &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "node-name", + Annotations: map[string]string{ + gceInstanceIDKey: "node2-id", + }, + }, + }, + eventProcessorError: false, + expectedError: false, + }, + } + for _, test := range cases { + client := fake.NewSimpleClientset(test.existingCM) + eventProcessor := &MockEventProcessor{} + if test.eventProcessorError { + eventProcessor.On("processConfigMapEntryOnNodeUpdate", mock.Anything).Return(fmt.Errorf("mock processor error")) + } else { + eventProcessor.On("processConfigMapEntryOnNodeUpdate", mock.Anything).Return(nil) + } + controller := NewControllerBuilder().WithClient(client).WithProcessor(eventProcessor).Build() + err := controller.handleUpdateEvent(context.Background(), test.oldObj, test.newObj) + fmt.Printf("test case: %s handleUpdateEvent result, %v", test.name, err) + if err != nil && !test.expectedError { + t.Errorf("got an unexpected error") + } + + if err == nil && test.expectedError { + t.Errorf("expected error but no error returned") + } + } +} diff --git a/pkg/releaselock/fake.go b/pkg/releaselock/fake.go index cc2e765c..ac967270 100644 --- a/pkg/releaselock/fake.go +++ b/pkg/releaselock/fake.go @@ -15,10 +15,39 @@ package lockrelease import "k8s.io/client-go/kubernetes" -func NewFakeLockReleaseController() *LockReleaseController { - return &LockReleaseController{} +type FakeLockReleaseControllerBuilder struct { + client kubernetes.Interface + processor EventProcessor + lockService LockService } -func NewFakeLockReleaseControllerWithClient(client kubernetes.Interface) *LockReleaseController { - return &LockReleaseController{client: client} +func NewControllerBuilder() *FakeLockReleaseControllerBuilder { + return &FakeLockReleaseControllerBuilder{} +} + +func (b *FakeLockReleaseControllerBuilder) WithClient(client kubernetes.Interface) *FakeLockReleaseControllerBuilder { + b.client = client + return b +} + +func (b *FakeLockReleaseControllerBuilder) WithProcessor(processor EventProcessor) *FakeLockReleaseControllerBuilder { + b.processor = processor + return b +} + +func (b *FakeLockReleaseControllerBuilder) WithLockService(lockService LockService) *FakeLockReleaseControllerBuilder { + b.lockService = lockService + return b +} + +func (b *FakeLockReleaseControllerBuilder) Build() *LockReleaseController { + c := &LockReleaseController{ + client: b.client, + eventProcessor: b.processor, + lockService: b.lockService, + } + if b.processor != nil { + b.processor.SetController(c) + } + return c } diff --git a/pkg/releaselock/rpc.go b/pkg/releaselock/rpc.go index 6550b20a..ea5e6230 100644 --- a/pkg/releaselock/rpc.go +++ b/pkg/releaselock/rpc.go @@ -41,6 +41,8 @@ const ( notifyCloseChannelSize = 1 ) +type FileStoreRPCClient struct{} + type releaseLockResponse struct { status releaseLockStatus } @@ -69,7 +71,7 @@ func RegisterLockReleaseProcedure() error { // ReleaseLock calls the Filestore server to remove all advisory locks for a given GKE node IP. // hostIP is the internal IP address of the Filestore instance. // clientIP is the internal IP address of the GKE node. 
-func ReleaseLock(hostIP, clientIP string) error { +func (c *FileStoreRPCClient) ReleaseLock(hostIP, clientIP string) error { // Check for valid IPV4 address. if net.ParseIP(hostIP) == nil { return fmt.Errorf("invalid Filestore IP address %s", hostIP)
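
A minimal sketch of how the seams introduced above are meant to be wired together in a test, assuming only identifiers added in this diff (NewControllerBuilder, DefaultEventProcessor, LockService, LockReleaseController.GetConfigMap); the stubLockService type and the TestBuilderWiring name are illustrative placeholders, not part of the change:

package lockrelease

import (
	"context"
	"testing"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes/fake"
)

// stubLockService is an illustrative LockService implementation that records
// calls instead of issuing Filestore NLM RPCs (the tests in this diff use a
// testify-based MockLockService instead).
type stubLockService struct {
	released [][2]string
	err      error
}

func (s *stubLockService) ReleaseLock(hostIP, clientIP string) error {
	s.released = append(s.released, [2]string{hostIP, clientIP})
	return s.err
}

func TestBuilderWiring(t *testing.T) {
	cm := &corev1.ConfigMap{
		ObjectMeta: metav1.ObjectMeta{Name: "fscsi-node-name", Namespace: "gke-managed-filestorecsi"},
		Data:       map[string]string{"test-project.us-central1.test-filestore.test-share.123456.192_168_1_1": "192.168.92.0"},
	}
	client := fake.NewSimpleClientset(cm)
	processor := &DefaultEventProcessor{}
	lockService := &stubLockService{}

	// Build() wires the processor back to the controller via SetController,
	// so the processor can reach the fake kubeclient and the stub lock service.
	c := NewControllerBuilder().
		WithClient(client).
		WithProcessor(processor).
		WithLockService(lockService).
		Build()

	if _, err := c.GetConfigMap(context.Background(), cm.Name, cm.Namespace); err != nil {
		t.Fatalf("unexpected error fetching configmap %s/%s: %v", cm.Namespace, cm.Name, err)
	}
}

Build() mirrors what NewLockReleaseController now does in production: it constructs the LockReleaseController and then calls SetController on the processor, so a DefaultEventProcessor reaches the kubeclient and the injected LockService through the controller rather than through package-level functions.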