Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DFBUGS-1011: Prevent dataloss due to the concurrent RPC calls (occurrence is very low) #429

Open
wants to merge 5 commits into
base: release-4.16
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions docs/metrics.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ curl -X GET http://10.109.65.142:8080/metrics 2>/dev/null | grep csi
csi_liveness 1
```

Promethues can be deployed through the promethues operator described [here](https://coreos.com/operators/prometheus/docs/latest/user-guides/getting-started.html).
The [service-monitor](../deploy/service-monitor.yaml) will tell promethues how
Prometheus can be deployed through the prometheus operator described [here](https://coreos.com/operators/prometheus/docs/latest/user-guides/getting-started.html).
The [service-monitor](../deploy/service-monitor.yaml) will tell prometheus how
to pull metrics out of CSI.

Each CSI pod has a service to expose the endpoint to prometheus. By default, rbd
Expand Down
2 changes: 1 addition & 1 deletion examples/rbd/storageclass.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ parameters:
# "file": Enable file encryption on the mounted filesystem
# "block": Encrypt RBD block device
# When unspecified assume type "block". "file" and "block" are
# mutally exclusive.
# mutually exclusive.
# encryptionType: "block"

# (optional) Use external key management system for encryption passphrases by
Expand Down
23 changes: 16 additions & 7 deletions internal/cephfs/nodeserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -450,6 +450,13 @@ func (ns *NodeServer) NodePublishVolume(
targetPath := req.GetTargetPath()
volID := fsutil.VolumeID(req.GetVolumeId())

if acquired := ns.VolumeLocks.TryAcquire(targetPath); !acquired {
log.ErrorLog(ctx, util.TargetPathOperationAlreadyExistsFmt, targetPath)

return nil, status.Errorf(codes.Aborted, util.TargetPathOperationAlreadyExistsFmt, targetPath)
}
defer ns.VolumeLocks.Release(targetPath)

volOptions := &store.VolumeOptions{}
defer volOptions.Destroy()

Expand All @@ -462,9 +469,6 @@ func (ns *NodeServer) NodePublishVolume(
return nil, status.Errorf(codes.Internal, "failed to create mounter for volume %s: %v", volID, err.Error())
}

// Considering kubelet make sure the stage and publish operations
// are serialized, we dont need any extra locking in nodePublish

if err = util.CreateMountPoint(targetPath); err != nil {
log.ErrorLog(ctx, "failed to create mount point at %s: %v", targetPath, err)

Expand Down Expand Up @@ -555,12 +559,17 @@ func (ns *NodeServer) NodeUnpublishVolume(
return nil, err
}

// considering kubelet make sure node operations like unpublish/unstage...etc can not be called
// at same time, an explicit locking at time of nodeunpublish is not required.
targetPath := req.GetTargetPath()
volID := req.GetVolumeId()
if acquired := ns.VolumeLocks.TryAcquire(targetPath); !acquired {
log.ErrorLog(ctx, util.TargetPathOperationAlreadyExistsFmt, targetPath)

return nil, status.Errorf(codes.Aborted, util.TargetPathOperationAlreadyExistsFmt, targetPath)
}
defer ns.VolumeLocks.Release(targetPath)

// stop the health-checker that may have been started in NodeGetVolumeStats()
ns.healthChecker.StopChecker(req.GetVolumeId(), targetPath)
ns.healthChecker.StopChecker(volID, targetPath)

isMnt, err := util.IsMountPoint(ns.Mounter, targetPath)
if err != nil {
Expand All @@ -583,7 +592,7 @@ func (ns *NodeServer) NodeUnpublishVolume(
isMnt = true
}
if !isMnt {
if err = os.RemoveAll(targetPath); err != nil {
if err = os.Remove(targetPath); err != nil {
return nil, status.Error(codes.Internal, err.Error())
}

Expand Down
4 changes: 2 additions & 2 deletions internal/cephfs/store/volumeoptions.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,8 +151,8 @@ func validateMounter(m string) error {
return nil
}

func (v *VolumeOptions) DetectMounter(options map[string]string) error {
return extractMounter(&v.Mounter, options)
func (vo *VolumeOptions) DetectMounter(options map[string]string) error {
return extractMounter(&vo.Mounter, options)
}

func extractMounter(dest *string, options map[string]string) error {
Expand Down
2 changes: 1 addition & 1 deletion internal/health-checker/checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ type checker struct {
// timeout contains the delay (interval + timeout)
timeout time.Duration

// mutex protects against concurrent access to healty, err and
// mutex protects against concurrent access to healthy, err and
// lastUpdate
mutex *sync.RWMutex

Expand Down
22 changes: 16 additions & 6 deletions internal/rbd/nodeserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -709,8 +709,12 @@ func (ns *NodeServer) NodePublishVolume(
volID := req.GetVolumeId()
stagingPath += "/" + volID

// Considering kubelet make sure the stage and publish operations
// are serialized, we dont need any extra locking in nodePublish
if acquired := ns.VolumeLocks.TryAcquire(targetPath); !acquired {
log.ErrorLog(ctx, util.TargetPathOperationAlreadyExistsFmt, targetPath)

return nil, status.Errorf(codes.Aborted, util.TargetPathOperationAlreadyExistsFmt, targetPath)
}
defer ns.VolumeLocks.Release(targetPath)

// Check if that target path exists properly
notMnt, err := ns.createTargetMountPath(ctx, targetPath, isBlock)
Expand Down Expand Up @@ -913,8 +917,14 @@ func (ns *NodeServer) NodeUnpublishVolume(
}

targetPath := req.GetTargetPath()
// considering kubelet make sure node operations like unpublish/unstage...etc can not be called
// at same time, an explicit locking at time of nodeunpublish is not required.

if acquired := ns.VolumeLocks.TryAcquire(targetPath); !acquired {
log.ErrorLog(ctx, util.TargetPathOperationAlreadyExistsFmt, targetPath)

return nil, status.Errorf(codes.Aborted, util.TargetPathOperationAlreadyExistsFmt, targetPath)
}
defer ns.VolumeLocks.Release(targetPath)

isMnt, err := ns.Mounter.IsMountPoint(targetPath)
if err != nil {
if os.IsNotExist(err) {
Expand All @@ -927,7 +937,7 @@ func (ns *NodeServer) NodeUnpublishVolume(
return nil, status.Error(codes.NotFound, err.Error())
}
if !isMnt {
if err = os.RemoveAll(targetPath); err != nil {
if err = os.Remove(targetPath); err != nil {
return nil, status.Error(codes.Internal, err.Error())
}

Expand All @@ -938,7 +948,7 @@ func (ns *NodeServer) NodeUnpublishVolume(
return nil, status.Error(codes.Internal, err.Error())
}

if err = os.RemoveAll(targetPath); err != nil {
if err = os.Remove(targetPath); err != nil {
return nil, status.Error(codes.Internal, err.Error())
}

Expand Down
2 changes: 1 addition & 1 deletion internal/util/crushlocation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func Test_getCrushLocationMap(t *testing.T) {
want: map[string]string{"zone": "zone1"},
},
{
name: "multuple matching crushlocation and node labels",
name: "multiple matching crushlocation and node labels",
args: input{
crushLocationLabels: "topology.io/zone,topology.io/rack",
nodeLabels: map[string]string{
Expand Down
3 changes: 3 additions & 0 deletions internal/util/idlocker.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ const (

// SnapshotOperationAlreadyExistsFmt string format to return for concurrent operation.
SnapshotOperationAlreadyExistsFmt = "an operation with the given Snapshot ID %s already exists"

// TargetPathOperationAlreadyExistsFmt string format to return for concurrent operation on target path.
TargetPathOperationAlreadyExistsFmt = "an operation with the given target path %s already exists"
)

// VolumeLocks implements a map with atomic operations. It stores a set of all volume IDs
Expand Down
Loading