Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

Add locking to all volume operations #38

Merged
merged 4 commits into from
Aug 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,9 @@ test-sanity:
setup-external-e2e: test/e2e/e2e.test test/e2e/ginkgo

test/e2e/e2e.test test/e2e/ginkgo:
curl --location https://dl.k8s.io/v1.19.5/kubernetes-test-linux-amd64.tar.gz | \
curl --location https://dl.k8s.io/v1.27.5/kubernetes-test-linux-amd64.tar.gz | \
tar --strip-components=3 -C test/e2e -zxf - kubernetes/test/bin/e2e.test kubernetes/test/bin/ginkgo

.PHONY: test-e2e
test-e2e: setup-external-e2e
bash ./test/e2e/run.sh
bash ./test/e2e/run.sh
52 changes: 48 additions & 4 deletions deploy/k8s/controller-deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ metadata:
name: cloudstack-csi-controller
namespace: kube-system
spec:
replicas: 1
replicas: 3
strategy:
type: RollingUpdate
rollingUpdate:
Expand Down Expand Up @@ -40,19 +40,44 @@ spec:
env:
- name: CSI_ENDPOINT
value: unix:///var/lib/csi/sockets/pluginproxy/csi.sock
securityContext:
runAsNonRoot: true
runAsUser: 65532
runAsGroup: 65532
volumeMounts:
- name: socket-dir
mountPath: /var/lib/csi/sockets/pluginproxy/
- name: cloudstack-conf
mountPath: /etc/cloudstack-csi-driver
ports:
- name: healthz
containerPort: 9808
protocol: TCP
livenessProbe:
httpGet:
path: /healthz
port: healthz
initialDelaySeconds: 30
timeoutSeconds: 10
periodSeconds: 180
failureThreshold: 3

- name: external-provisioner
image: registry.k8s.io/sig-storage/csi-provisioner:v3.3.1
image: registry.k8s.io/sig-storage/csi-provisioner:v3.5.0
imagePullPolicy: IfNotPresent
args:
- "--v=4"
- "--timeout=300s"
- "--csi-address=$(ADDRESS)"
- "--v=5"
- "--kube-api-qps=100"
- "--kube-api-burst=100"
- "--leader-election"
- "--leader-election-lease-duration=120s"
- "--leader-election-renew-deadline=60s"
- "--leader-election-retry-period=30s"
- "--default-fstype=ext4"
- "--feature-gates=Topology=true"
- "--strict-topology"
env:
- name: ADDRESS
value: /var/lib/csi/sockets/pluginproxy/csi.sock
Expand All @@ -64,8 +89,27 @@ spec:
image: registry.k8s.io/sig-storage/csi-attacher:v4.3.0
imagePullPolicy: IfNotPresent
args:
- "--v=4"
- "--timeout=300s"
- "--csi-address=$(ADDRESS)"
- "--leader-election"
- "--leader-election-lease-duration=120s"
- "--leader-election-renew-deadline=60s"
- "--leader-election-retry-period=30s"
- "--kube-api-qps=100"
- "--kube-api-burst=100"
env:
- name: ADDRESS
value: /var/lib/csi/sockets/pluginproxy/csi.sock
volumeMounts:
- name: socket-dir
mountPath: /var/lib/csi/sockets/pluginproxy/

- name: liveness-probe
image: registry.k8s.io/sig-storage/livenessprobe:v2.10.0
args:
- "--v=4"
- "--csi-address=$(ADDRESS)"
- "--v=5"
env:
- name: ADDRESS
value: /var/lib/csi/sockets/pluginproxy/csi.sock
Expand Down
29 changes: 27 additions & 2 deletions deploy/k8s/node-daemonset.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ spec:
fieldPath: spec.nodeName
securityContext:
privileged: true
capabilities:
add: ["SYS_ADMIN"]
allowPrivilegeEscalation: true
volumeMounts:
- name: plugin-dir
mountPath: /csi
Expand All @@ -55,6 +58,30 @@ spec:
mountPath: /run/cloud-init/
- name: cloudstack-conf
mountPath: /etc/cloudstack-csi-driver
ports:
- name: healthz
containerPort: 9808
protocol: TCP
livenessProbe:
httpGet:
path: /healthz
port: healthz
initialDelaySeconds: 10
timeoutSeconds: 5
periodSeconds: 5
failureThreshold: 3

- name: liveness-probe
image: registry.k8s.io/sig-storage/livenessprobe:v2.10.0
args:
- "--v=4"
- "--csi-address=$(ADDRESS)"
env:
- name: ADDRESS
value: /csi/csi.sock
volumeMounts:
- name: plugin-dir
mountPath: /csi

- name: node-driver-registrar
image: registry.k8s.io/sig-storage/csi-node-driver-registrar:v2.8.0
Expand All @@ -68,8 +95,6 @@ spec:
value: /csi/csi.sock
- name: DRIVER_REG_SOCK_PATH
value: /var/lib/kubelet/plugins/csi.cloudstack.apache.org/csi.sock
securityContext:
privileged: true
volumeMounts:
- name: plugin-dir
mountPath: /csi
Expand Down
103 changes: 82 additions & 21 deletions pkg/driver/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@ import (
"context"
"fmt"
"math/rand"
"sync"

"github.com/container-storage-interface/spec/lib/go/csi"
"github.com/grpc-ecosystem/go-grpc-middleware/logging/zap/ctxzap"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

Expand All @@ -24,15 +24,15 @@ var onlyVolumeCapAccessMode = csi.VolumeCapability_AccessMode{

type controllerServer struct {
csi.UnimplementedControllerServer
connector cloud.Interface
locks map[string]*sync.Mutex
connector cloud.Interface
volumeLocks *util.VolumeLocks
}

// NewControllerServer creates a new Controller gRPC server.
func NewControllerServer(connector cloud.Interface) csi.ControllerServer {
return &controllerServer{
connector: connector,
locks: make(map[string]*sync.Mutex),
connector: connector,
volumeLocks: util.NewVolumeLocks(),
}
}

Expand Down Expand Up @@ -61,6 +61,12 @@ func (cs *controllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol
return nil, status.Errorf(codes.InvalidArgument, "Missing parameter %v", DiskOfferingKey)
}

if acquired := cs.volumeLocks.TryAcquire(name); !acquired {
ctxzap.Extract(ctx).Sugar().Errorf(util.VolumeOperationAlreadyExistsFmt, name)
return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, name)
}
defer cs.volumeLocks.Release(name)

// Check if a volume with that name already exists
if vol, err := cs.connector.GetVolumeByName(ctx, name); err == cloud.ErrNotFound {
// The volume does not exist
Expand All @@ -77,6 +83,8 @@ func (cs *controllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol
Volume: &csi.Volume{
VolumeId: vol.ID,
CapacityBytes: vol.Size,
VolumeContext: req.GetParameters(),
// ContentSource: req.GetVolumeContentSource(), TODO: snapshot support
AccessibleTopology: []*csi.Topology{
Topology{ZoneID: vol.ZoneID}.ToCSI(),
},
Expand Down Expand Up @@ -118,6 +126,13 @@ func (cs *controllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol
zoneID = t.ZoneID
}

ctxzap.Extract(ctx).Sugar().Infow("Creating new volume",
"name", name,
"size", sizeInGB,
"offering", diskOfferingID,
"zone", zoneID,
)

volID, err := cs.connector.CreateVolume(ctx, diskOfferingID, zoneID, name, sizeInGB)
if err != nil {
return nil, status.Errorf(codes.Internal, "Cannot create volume %s: %v", name, err.Error())
Expand All @@ -127,6 +142,8 @@ func (cs *controllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol
Volume: &csi.Volume{
VolumeId: volID,
CapacityBytes: util.GigaBytesToBytes(sizeInGB),
VolumeContext: req.GetParameters(),
// ContentSource: req.GetVolumeContentSource(), TODO: snapshot support
AccessibleTopology: []*csi.Topology{
Topology{ZoneID: zoneID}.ToCSI(),
},
Expand Down Expand Up @@ -198,6 +215,17 @@ func (cs *controllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVol
}

volumeID := req.GetVolumeId()

if acquired := cs.volumeLocks.TryAcquire(volumeID); !acquired {
ctxzap.Extract(ctx).Sugar().Errorf(util.VolumeOperationAlreadyExistsFmt, volumeID)
return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, volumeID)
}
defer cs.volumeLocks.Release(volumeID)

ctxzap.Extract(ctx).Sugar().Infow("Deleting volume",
"volumeID", volumeID,
)

err := cs.connector.DeleteVolume(ctx, volumeID)
if err != nil && err != cloud.ErrNotFound {
return nil, status.Errorf(codes.Internal, "Cannot delete volume %s: %s", volumeID, err.Error())
Expand All @@ -218,15 +246,6 @@ func (cs *controllerServer) ControllerPublishVolume(ctx context.Context, req *cs
}
nodeID := req.GetNodeId()

//Ensure only one node is processing at same time
lock, ok := cs.locks[nodeID]
if !ok {
lock = &sync.Mutex{}
cs.locks[nodeID] = lock
}
lock.Lock()
defer lock.Unlock()

if req.GetReadonly() {
return nil, status.Error(codes.InvalidArgument, "Readonly not possible")
}
Expand All @@ -238,6 +257,11 @@ func (cs *controllerServer) ControllerPublishVolume(ctx context.Context, req *cs
return nil, status.Error(codes.InvalidArgument, "Access mode not accepted")
}

ctxzap.Extract(ctx).Sugar().Infow("Initiating attaching volume",
"volumeID", volumeID,
"nodeID", nodeID,
)

// Check volume
vol, err := cs.connector.GetVolumeByID(ctx, volumeID)
if err == cloud.ErrNotFound {
Expand All @@ -248,6 +272,11 @@ func (cs *controllerServer) ControllerPublishVolume(ctx context.Context, req *cs
}

if vol.VirtualMachineID != "" && vol.VirtualMachineID != nodeID {
ctxzap.Extract(ctx).Sugar().Errorw("Volume already attached to another node",
"volumeID", volumeID,
"nodeID", nodeID,
"attached nodeID", vol.VirtualMachineID,
)
return nil, status.Error(codes.AlreadyExists, "Volume already assigned to another node")
}

Expand All @@ -260,18 +289,32 @@ func (cs *controllerServer) ControllerPublishVolume(ctx context.Context, req *cs

if vol.VirtualMachineID == nodeID {
// volume already attached

ctxzap.Extract(ctx).Sugar().Infow("Volume already attached to node",
"volumeID", volumeID,
"nodeID", nodeID,
"deviceID", vol.DeviceID,
)
publishContext := map[string]string{
deviceIDContextKey: vol.DeviceID,
}
return &csi.ControllerPublishVolumeResponse{PublishContext: publishContext}, nil
}

ctxzap.Extract(ctx).Sugar().Infow("Attaching volume to node",
"volumeID", volumeID,
"nodeID", nodeID,
)

deviceID, err := cs.connector.AttachVolume(ctx, volumeID, nodeID)
if err != nil {
return nil, status.Errorf(codes.Internal, "Cannot attach volume %s: %s", volumeID, err.Error())
}

ctxzap.Extract(ctx).Sugar().Infow("Attached volume to node successfully",
"volumeID", volumeID,
"nodeID", nodeID,
)

publishContext := map[string]string{
deviceIDContextKey: deviceID,
}
Expand Down Expand Up @@ -302,17 +345,32 @@ func (cs *controllerServer) ControllerUnpublishVolume(ctx context.Context, req *

// Check VM existence
if _, err := cs.connector.GetVMByID(ctx, nodeID); err == cloud.ErrNotFound {
return nil, status.Errorf(codes.NotFound, "VM %v not found", nodeID)
// volumes cannot be attached to deleted VMs
ctxzap.Extract(ctx).Sugar().Warnw("VM not found, marking ControllerUnpublishVolume successful",
"volumeID", volumeID,
"nodeID", nodeID,
)
return &csi.ControllerUnpublishVolumeResponse{}, nil
} else if err != nil {
// Error with CloudStack
return nil, status.Errorf(codes.Internal, "Error %v", err)
}

ctxzap.Extract(ctx).Sugar().Infow("Detaching volume from node",
"volumeID", volumeID,
"nodeID", nodeID,
)

err := cs.connector.DetachVolume(ctx, volumeID)
if err != nil {
return nil, status.Errorf(codes.Internal, "Cannot detach volume %s: %s", volumeID, err.Error())
}

ctxzap.Extract(ctx).Sugar().Infow("Detached volume from node successfully",
"volumeID", volumeID,
"nodeID", nodeID,
)

return &csi.ControllerUnpublishVolumeResponse{}, nil
}

Expand All @@ -334,13 +392,16 @@ func (cs *controllerServer) ValidateVolumeCapabilities(ctx context.Context, req
return nil, status.Errorf(codes.Internal, "Error %v", err)
}

var confirmed *csi.ValidateVolumeCapabilitiesResponse_Confirmed
if isValidVolumeCapabilities(volCaps) {
confirmed = &csi.ValidateVolumeCapabilitiesResponse_Confirmed{VolumeCapabilities: volCaps}
if !isValidVolumeCapabilities(volCaps) {
return &csi.ValidateVolumeCapabilitiesResponse{Message: "Requested VolumeCapabilities are invalid"}, nil
}

return &csi.ValidateVolumeCapabilitiesResponse{
Confirmed: confirmed,
}, nil
Confirmed: &csi.ValidateVolumeCapabilitiesResponse_Confirmed{
VolumeContext: req.GetVolumeContext(),
VolumeCapabilities: volCaps,
Parameters: req.GetParameters(),
}}, nil
}

func isValidVolumeCapabilities(volCaps []*csi.VolumeCapability) bool {
Expand Down
Loading
Loading