From 37d5b883ca0cd6f0c21f2324f008da208be25481 Mon Sep 17 00:00:00 2001 From: Madhu Rajanna Date: Wed, 20 Nov 2024 10:07:38 +0100 Subject: [PATCH 1/5] rbd: take lock on targetpath during node operation We should not be dependent on the CO to ensure that it will serialize the request instead of that we need to have own internal locks to ensure that we dont do concurrent operations for same request. Signed-off-by: Madhu Rajanna (cherry picked from commit 38c0e64307d4fb108b95498d2220fe870f8440c8) --- internal/rbd/nodeserver.go | 18 ++++++++++++++---- internal/util/idlocker.go | 3 +++ 2 files changed, 17 insertions(+), 4 deletions(-) diff --git a/internal/rbd/nodeserver.go b/internal/rbd/nodeserver.go index b1170b90565..d8fb91094e0 100644 --- a/internal/rbd/nodeserver.go +++ b/internal/rbd/nodeserver.go @@ -709,8 +709,12 @@ func (ns *NodeServer) NodePublishVolume( volID := req.GetVolumeId() stagingPath += "/" + volID - // Considering kubelet make sure the stage and publish operations - // are serialized, we dont need any extra locking in nodePublish + if acquired := ns.VolumeLocks.TryAcquire(targetPath); !acquired { + log.ErrorLog(ctx, util.TargetPathOperationAlreadyExistsFmt, targetPath) + + return nil, status.Errorf(codes.Aborted, util.TargetPathOperationAlreadyExistsFmt, targetPath) + } + defer ns.VolumeLocks.Release(targetPath) // Check if that target path exists properly notMnt, err := ns.createTargetMountPath(ctx, targetPath, isBlock) @@ -913,8 +917,14 @@ func (ns *NodeServer) NodeUnpublishVolume( } targetPath := req.GetTargetPath() - // considering kubelet make sure node operations like unpublish/unstage...etc can not be called - // at same time, an explicit locking at time of nodeunpublish is not required. + + if acquired := ns.VolumeLocks.TryAcquire(targetPath); !acquired { + log.ErrorLog(ctx, util.TargetPathOperationAlreadyExistsFmt, targetPath) + + return nil, status.Errorf(codes.Aborted, util.TargetPathOperationAlreadyExistsFmt, targetPath) + } + defer ns.VolumeLocks.Release(targetPath) + isMnt, err := ns.Mounter.IsMountPoint(targetPath) if err != nil { if os.IsNotExist(err) { diff --git a/internal/util/idlocker.go b/internal/util/idlocker.go index 92733c19c9c..211081a1372 100644 --- a/internal/util/idlocker.go +++ b/internal/util/idlocker.go @@ -28,6 +28,9 @@ const ( // SnapshotOperationAlreadyExistsFmt string format to return for concurrent operation. SnapshotOperationAlreadyExistsFmt = "an operation with the given Snapshot ID %s already exists" + + // TargetPathOperationAlreadyExistsFmt string format to return for concurrent operation on target path. + TargetPathOperationAlreadyExistsFmt = "an operation with the given target path %s already exists" ) // VolumeLocks implements a map with atomic operations. It stores a set of all volume IDs From 7d6d188c6774f717c7eb08f7ee7bae2896e04d7c Mon Sep 17 00:00:00 2001 From: Madhu Rajanna Date: Wed, 20 Nov 2024 10:12:59 +0100 Subject: [PATCH 2/5] cephfs: take lock on targetpath on node operation We should not be dependent on the CO to ensure that it will serialize the request instead of that we need to have own internal locks to ensure that we dont do concurrent operations for same request. Signed-off-by: Madhu Rajanna (cherry picked from commit 38b0a4cbadfb20e818e76cc0d3b603274513ec8e) --- internal/cephfs/nodeserver.go | 21 +++++++++++++++------ internal/cephfs/store/volumeoptions.go | 4 ++-- 2 files changed, 17 insertions(+), 8 deletions(-) diff --git a/internal/cephfs/nodeserver.go b/internal/cephfs/nodeserver.go index ede292b3142..8996dfb90cc 100644 --- a/internal/cephfs/nodeserver.go +++ b/internal/cephfs/nodeserver.go @@ -450,6 +450,13 @@ func (ns *NodeServer) NodePublishVolume( targetPath := req.GetTargetPath() volID := fsutil.VolumeID(req.GetVolumeId()) + if acquired := ns.VolumeLocks.TryAcquire(targetPath); !acquired { + log.ErrorLog(ctx, util.TargetPathOperationAlreadyExistsFmt, targetPath) + + return nil, status.Errorf(codes.Aborted, util.TargetPathOperationAlreadyExistsFmt, targetPath) + } + defer ns.VolumeLocks.Release(targetPath) + volOptions := &store.VolumeOptions{} defer volOptions.Destroy() @@ -462,9 +469,6 @@ func (ns *NodeServer) NodePublishVolume( return nil, status.Errorf(codes.Internal, "failed to create mounter for volume %s: %v", volID, err.Error()) } - // Considering kubelet make sure the stage and publish operations - // are serialized, we dont need any extra locking in nodePublish - if err = util.CreateMountPoint(targetPath); err != nil { log.ErrorLog(ctx, "failed to create mount point at %s: %v", targetPath, err) @@ -555,12 +559,17 @@ func (ns *NodeServer) NodeUnpublishVolume( return nil, err } - // considering kubelet make sure node operations like unpublish/unstage...etc can not be called - // at same time, an explicit locking at time of nodeunpublish is not required. targetPath := req.GetTargetPath() + volID := req.GetVolumeId() + if acquired := ns.VolumeLocks.TryAcquire(targetPath); !acquired { + log.ErrorLog(ctx, util.TargetPathOperationAlreadyExistsFmt, targetPath) + + return nil, status.Errorf(codes.Aborted, util.TargetPathOperationAlreadyExistsFmt, targetPath) + } + defer ns.VolumeLocks.Release(targetPath) // stop the health-checker that may have been started in NodeGetVolumeStats() - ns.healthChecker.StopChecker(req.GetVolumeId(), targetPath) + ns.healthChecker.StopChecker(volID, targetPath) isMnt, err := util.IsMountPoint(ns.Mounter, targetPath) if err != nil { diff --git a/internal/cephfs/store/volumeoptions.go b/internal/cephfs/store/volumeoptions.go index 8a91b44b287..8c3feadeecd 100644 --- a/internal/cephfs/store/volumeoptions.go +++ b/internal/cephfs/store/volumeoptions.go @@ -151,8 +151,8 @@ func validateMounter(m string) error { return nil } -func (v *VolumeOptions) DetectMounter(options map[string]string) error { - return extractMounter(&v.Mounter, options) +func (vo *VolumeOptions) DetectMounter(options map[string]string) error { + return extractMounter(&vo.Mounter, options) } func extractMounter(dest *string, options map[string]string) error { From 7e3c894c7ae5c103b0a70e5ee9cc6ad1e1534c80 Mon Sep 17 00:00:00 2001 From: Madhu Rajanna Date: Wed, 20 Nov 2024 10:14:26 +0100 Subject: [PATCH 3/5] cephfs: use os.Remove to remove directory using os.RemoveAll will remove everything in the director after the Umount we should be using os.Remove only to remove the empty directory Signed-off-by: Madhu Rajanna (cherry picked from commit ffa8eaf5ddc3a6f3041bf010f76e7d71c0a08b8d) --- internal/cephfs/nodeserver.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/cephfs/nodeserver.go b/internal/cephfs/nodeserver.go index 8996dfb90cc..81757cce08e 100644 --- a/internal/cephfs/nodeserver.go +++ b/internal/cephfs/nodeserver.go @@ -592,7 +592,7 @@ func (ns *NodeServer) NodeUnpublishVolume( isMnt = true } if !isMnt { - if err = os.RemoveAll(targetPath); err != nil { + if err = os.Remove(targetPath); err != nil { return nil, status.Error(codes.Internal, err.Error()) } From e1ba89b607c880602c860c5ef14eb9a488c775ac Mon Sep 17 00:00:00 2001 From: Madhu Rajanna Date: Wed, 20 Nov 2024 10:15:40 +0100 Subject: [PATCH 4/5] rbd: use os.Remove to remove directory using os.RemoveAll will remove everything in the director after the Umount we should be using os.Remove only to remove the empty directory Signed-off-by: Madhu Rajanna (cherry picked from commit 39cc628adf1da1ab78f3aacfb7fe8f53e9a1fd29) --- internal/rbd/nodeserver.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/internal/rbd/nodeserver.go b/internal/rbd/nodeserver.go index d8fb91094e0..21b5c9c1da1 100644 --- a/internal/rbd/nodeserver.go +++ b/internal/rbd/nodeserver.go @@ -937,7 +937,7 @@ func (ns *NodeServer) NodeUnpublishVolume( return nil, status.Error(codes.NotFound, err.Error()) } if !isMnt { - if err = os.RemoveAll(targetPath); err != nil { + if err = os.Remove(targetPath); err != nil { return nil, status.Error(codes.Internal, err.Error()) } @@ -948,7 +948,7 @@ func (ns *NodeServer) NodeUnpublishVolume( return nil, status.Error(codes.Internal, err.Error()) } - if err = os.RemoveAll(targetPath); err != nil { + if err = os.Remove(targetPath); err != nil { return nil, status.Error(codes.Internal, err.Error()) } From 2dc86ecd44c309b4c7827d433b2164224cb3b883 Mon Sep 17 00:00:00 2001 From: yati1998 Date: Tue, 17 Dec 2024 12:25:01 +0530 Subject: [PATCH 5/5] doc: correct codespell errors this commit resolves codespell errors Signed-off-by: yati1998 --- docs/metrics.md | 4 ++-- examples/rbd/storageclass.yaml | 2 +- internal/health-checker/checker.go | 2 +- internal/util/crushlocation_test.go | 2 +- 4 files changed, 5 insertions(+), 5 deletions(-) diff --git a/docs/metrics.md b/docs/metrics.md index 8f19c8d2bc7..c4f45b0a52e 100644 --- a/docs/metrics.md +++ b/docs/metrics.md @@ -22,8 +22,8 @@ curl -X GET http://10.109.65.142:8080/metrics 2>/dev/null | grep csi csi_liveness 1 ``` -Promethues can be deployed through the promethues operator described [here](https://coreos.com/operators/prometheus/docs/latest/user-guides/getting-started.html). -The [service-monitor](../deploy/service-monitor.yaml) will tell promethues how +Prometheus can be deployed through the prometheus operator described [here](https://coreos.com/operators/prometheus/docs/latest/user-guides/getting-started.html). +The [service-monitor](../deploy/service-monitor.yaml) will tell prometheus how to pull metrics out of CSI. Each CSI pod has a service to expose the endpoint to prometheus. By default, rbd diff --git a/examples/rbd/storageclass.yaml b/examples/rbd/storageclass.yaml index 601a6696af4..5d2fc5ebae7 100644 --- a/examples/rbd/storageclass.yaml +++ b/examples/rbd/storageclass.yaml @@ -125,7 +125,7 @@ parameters: # "file": Enable file encryption on the mounted filesystem # "block": Encrypt RBD block device # When unspecified assume type "block". "file" and "block" are - # mutally exclusive. + # mutually exclusive. # encryptionType: "block" # (optional) Use external key management system for encryption passphrases by diff --git a/internal/health-checker/checker.go b/internal/health-checker/checker.go index 5eef779b5a5..14322b2d3df 100644 --- a/internal/health-checker/checker.go +++ b/internal/health-checker/checker.go @@ -37,7 +37,7 @@ type checker struct { // timeout contains the delay (interval + timeout) timeout time.Duration - // mutex protects against concurrent access to healty, err and + // mutex protects against concurrent access to healthy, err and // lastUpdate mutex *sync.RWMutex diff --git a/internal/util/crushlocation_test.go b/internal/util/crushlocation_test.go index 141f64d7841..a761b63bca8 100644 --- a/internal/util/crushlocation_test.go +++ b/internal/util/crushlocation_test.go @@ -60,7 +60,7 @@ func Test_getCrushLocationMap(t *testing.T) { want: map[string]string{"zone": "zone1"}, }, { - name: "multuple matching crushlocation and node labels", + name: "multiple matching crushlocation and node labels", args: input{ crushLocationLabels: "topology.io/zone,topology.io/rack", nodeLabels: map[string]string{