diff --git a/PendingReleaseNotes.md b/PendingReleaseNotes.md index 2cce5d928e6..da6b2c0288e 100644 --- a/PendingReleaseNotes.md +++ b/PendingReleaseNotes.md @@ -21,5 +21,6 @@ - nvmeof: add Kubernetes ServiceAccount based volume access restriction - cephfs: add Kubernetes ServiceAccount based volume access restriction - nfs: add Kubernetes ServiceAccount based volume access restriction +- rbd-nbd: use VolumeAttributesClass feature implement rbd volume qos [PR](https://github.com/ceph/ceph-csi/pull/6160) ## NOTE diff --git a/README.md b/README.md index e69e8ca421e..19e146d8e59 100644 --- a/README.md +++ b/README.md @@ -99,6 +99,7 @@ for its support details. | | Volume/PV Metrics of File Mode Volume | GA | >= v1.2.0 | >= v1.1.0 | Pacific (>=v16.2.0) | >= v1.15.0 | | | Volume/PV Metrics of Block Mode Volume | GA | >= v1.2.0 | >= v1.1.0 | Pacific (>=v16.2.0) | >= v1.21.0 | | | Topology Aware Provisioning Support | Alpha | >= v2.1.0 | >= v1.1.0 | Pacific (>=v16.2.0) | >= v1.14.0 | +| | RBD-NBD QoS by VolumeAttributesClass | Alpha | >= v3.17.0 | >= v1.12.0 | Pacific (>=v16.2.0) | >= v1.34.0 | | CephFS | Dynamically provision, de-provision File mode RWO volume | GA | >= v1.1.0 | >= v1.0.0 | Pacific (>=v16.2.0) | >= v1.14.0 | | | Dynamically provision, de-provision File mode RWX volume | GA | >= v1.1.0 | >= v1.0.0 | Pacific (>=v16.2.0) | >= v1.14.0 | | | Dynamically provision, de-provision File mode ROX volume | Alpha | >= v3.0.0 | >= v1.0.0 | Pacific (>=v16.2.0) | >= v1.14.0 | diff --git a/charts/ceph-csi-cephfs/README.md b/charts/ceph-csi-cephfs/README.md index c7cb765d6f1..20342ca4c5f 100644 --- a/charts/ceph-csi-cephfs/README.md +++ b/charts/ceph-csi-cephfs/README.md @@ -161,7 +161,7 @@ charts and their default values. | `provisioner.imagePullSecrets` | Specifies imagePullSecrets for containers | `[]` | | `provisioner.profiling.enabled` | Specifies whether profiling should be enabled | `false` | | `provisioner.provisioner.image.repository` | Specifies the csi-provisioner image repository URL | `registry.k8s.io/sig-storage/csi-provisioner` | -| `provisioner.provisioner.image.tag` | Specifies image tag | `v6.1.1` | +| `provisioner.provisioner.image.tag` | Specifies image tag | `v6.2.0` | | `provisioner.provisioner.image.pullPolicy` | Specifies pull policy | `IfNotPresent` | | `provisioner.provisioner.args.httpEndpointPort` | Specifies http server port for diagnostics, health checks and metrics | `""` | | `provisioner.provisioner.extraArgs` | Specifies extra arguments for the provisioner sidecar | `[]` | diff --git a/charts/ceph-csi-cephfs/values.yaml b/charts/ceph-csi-cephfs/values.yaml index a5286c7518a..6d199638d78 100644 --- a/charts/ceph-csi-cephfs/values.yaml +++ b/charts/ceph-csi-cephfs/values.yaml @@ -232,7 +232,7 @@ provisioner: provisioner: image: repository: registry.k8s.io/sig-storage/csi-provisioner - tag: v6.1.1 + tag: v6.2.0 pullPolicy: IfNotPresent resources: {} args: diff --git a/charts/ceph-csi-rbd/templates/provisioner-clusterrole.yaml b/charts/ceph-csi-rbd/templates/provisioner-clusterrole.yaml index 61cf7f8518c..1b6ce80c97c 100644 --- a/charts/ceph-csi-rbd/templates/provisioner-clusterrole.yaml +++ b/charts/ceph-csi-rbd/templates/provisioner-clusterrole.yaml @@ -96,5 +96,8 @@ rules: - apiGroups: [""] resources: ["serviceaccounts/token"] verbs: ["create"] + - apiGroups: ["storage.k8s.io"] + resources: ["volumeattributesclasses"] + verbs: ["get", "list", "watch"] {{- end -}} diff --git a/charts/ceph-csi-rbd/values.yaml b/charts/ceph-csi-rbd/values.yaml index 41dd25967a2..19fd21c5a89 100644 --- a/charts/ceph-csi-rbd/values.yaml +++ b/charts/ceph-csi-rbd/values.yaml @@ -256,7 +256,7 @@ provisioner: provisioner: image: repository: registry.k8s.io/sig-storage/csi-provisioner - tag: v6.1.1 + tag: v6.2.0 pullPolicy: IfNotPresent resources: {} args: diff --git a/deploy/cephfs/kubernetes/csi-cephfsplugin-provisioner.yaml b/deploy/cephfs/kubernetes/csi-cephfsplugin-provisioner.yaml index f8aff00a158..e1cfc6bc740 100644 --- a/deploy/cephfs/kubernetes/csi-cephfsplugin-provisioner.yaml +++ b/deploy/cephfs/kubernetes/csi-cephfsplugin-provisioner.yaml @@ -93,7 +93,7 @@ spec: - name: ceph-csi-encryption-kms-config mountPath: /etc/ceph-csi-encryption-kms-config/ - name: csi-provisioner - image: registry.k8s.io/sig-storage/csi-provisioner:v6.1.1 + image: registry.k8s.io/sig-storage/csi-provisioner:v6.2.0 args: - "--csi-address=$(ADDRESS)" - "--v=1" diff --git a/deploy/rbd/kubernetes/csi-provisioner-rbac.yaml b/deploy/rbd/kubernetes/csi-provisioner-rbac.yaml index 41118de60ca..60aa4f7ae31 100644 --- a/deploy/rbd/kubernetes/csi-provisioner-rbac.yaml +++ b/deploy/rbd/kubernetes/csi-provisioner-rbac.yaml @@ -81,6 +81,9 @@ rules: - apiGroups: ["replication.storage.openshift.io"] resources: ["volumegroupreplicationclasses"] verbs: ["get"] + - apiGroups: ["storage.k8s.io"] + resources: ["volumeattributesclasses"] + verbs: ["get", "list", "watch"] --- kind: ClusterRoleBinding apiVersion: rbac.authorization.k8s.io/v1 diff --git a/deploy/rbd/kubernetes/csi-rbdplugin-provisioner.yaml b/deploy/rbd/kubernetes/csi-rbdplugin-provisioner.yaml index 090b4a40f3c..67a0cbb37f6 100644 --- a/deploy/rbd/kubernetes/csi-rbdplugin-provisioner.yaml +++ b/deploy/rbd/kubernetes/csi-rbdplugin-provisioner.yaml @@ -105,7 +105,7 @@ spec: mountPath: /run/secrets/tokens readOnly: true - name: csi-provisioner - image: registry.k8s.io/sig-storage/csi-provisioner:v6.1.1 + image: registry.k8s.io/sig-storage/csi-provisioner:v6.2.0 args: - "--csi-address=$(ADDRESS)" - "--v=1" diff --git a/docs/volumeattributesclass.md b/docs/volumeattributesclass.md index 50b7ddbde95..e668eec8530 100644 --- a/docs/volumeattributesclass.md +++ b/docs/volumeattributesclass.md @@ -64,3 +64,122 @@ parameters. This does not allow the Node-plugin accessing the Ceph cluster to fetch updated parameters when the Volume is _staged_ or _published_. The secrets for staging and publishing can not (easily) be updated after the fact, these are part of the fixed parameters in the PersistentVolume. + +## Use VolumeAttributesClass for rbd-nbd volume qos + +### Create a VolumeAttributesClass + +- Define a VolumeAttributesClass + +```console +--- +apiVersion: storage.k8s.io/v1 +kind: VolumeAttributesClass +metadata: + name: silver +driverName: rbd.csi.ceph.com +parameters: + baseIops: "1800" + maxIops: "10000" + baseBps: "104857600" + maxBps: "188743680" + iopsPerGiB: "12" + bpsPerGiB: "209715" + baseVolSizeBytes: "21474836480" +``` + +```console +kubectl create -f volumeattributesclass.yaml +``` + +- Verify VolumeAttributesClass has been created + +```console +$ kubectl get vac +NAME DRIVERNAME AGE +silver rbd.csi.ceph.com 2s +``` + +### Create PVC with VolumeAttributesClass + +- Define a PVC + +```console +--- +apiVersion: v1 +kind: PersistentVolumeClaim +metadata: + name: rbd-pvc-vac +spec: + accessModes: + - ReadWriteOnce + resources: + requests: + storage: 30Gi + storageClassName: csi-rbd-sc + volumeMode: Block + volumeAttributesClassName: silver +``` + +```console +$ kubectl create -f pvc.yaml +persistentvolumeclaim/rbd-pvc-vac created +``` + +- Verify pvc by get + +```console +$ kubectl get pvc +NAME STATUS VOLUME CAPACITY ACCESS MODES STORAGECLASS VOLUMEATTRIBUTESCLASS AGE +rbd-pvc-vac Bound pvc-8b2fcb47-233a-4bcf-bb94-f94e9aa1150a 30Gi RWO csi-rbd-sc silver 1m +``` + +### Modify PVC VolumeAttributesClass + +- Create another VolumeAttributesClass + +```console +--- +apiVersion: storage.k8s.io/v1 +kind: VolumeAttributesClass +metadata: + name: gold +driverName: rbd.csi.ceph.com +parameters: + baseIops: "1800" + maxIops: "50000" + baseBps: "104857600" + maxBps: "367001600" + iopsPerGiB: "50" + bpsPerGiB: "524288" + baseVolSizeBytes: "21474836480" +``` + +```console +$ kubectl create -f volumeattributesclass.yaml +volumeattributesclass.storage.k8s.io/silver created +``` + +- Verify VolumeAttributesClass by get + +```console +$ kubectl get vac +NAME DRIVERNAME AGE +gold rbd.csi.ceph.com 1m +silver rbd.csi.ceph.com 2m +``` + +- Modify PVC VolumeAttributesClassName + +```console +$ kubectl patch pvc rbd-pvc-vac -p '{"spec":{"volumeAttributesClassName": "gold"}}' +persistentvolumeclaim/rbd-pvc-vac patched +``` + +- Verify modify pvc volumeAttributesClassName by get + +```console +$ kubectl get pvc +NAME STATUS VOLUME CAPACITY ACCESS MODES STORAGECLASS VOLUMEATTRIBUTESCLASS AGE +rbd-pvc-vac Bound pvc-8b2fcb47-233a-4bcf-bb94-f94e9aa1150a 30Gi RWO csi-rbd-sc gold 2m +``` diff --git a/e2e/rbd.go b/e2e/rbd.go index 658b49ebe0e..9c87c4025b8 100644 --- a/e2e/rbd.go +++ b/e2e/rbd.go @@ -5776,7 +5776,38 @@ var _ = Describe("RBD", func() { validateOmapCount(f, 0, rbdType, defaultRBDPool, volumesType) }) - It("validate rbd image qos", func() { + It("validate rbd image qos by volumeattributesclass", func() { + if !supportsVolumeAttributesClass(c, f) { + framework.Logf("skipping VolumeAttributesClass test, needs Kubernetes >= 1.34 and ceph-csi >= 3.17") + + return + } + + // Recreate the StorageClass with the controller-modify-secret so + // that the provisioner can authenticate ControllerModifyVolume + // calls needed for VAC modifications. + err := deleteResource(rbdExamplePath + "storageclass.yaml") + if err != nil { + logAndFail("failed to delete storageclass: %v", err) + } + err = createRBDStorageClass(f.ClientSet, f, defaultSCName, nil, map[string]string{ + "csi.storage.k8s.io/controller-modify-secret-namespace": cephCSINamespace, + "csi.storage.k8s.io/controller-modify-secret-name": rbdProvisionerSecretName, + }, deletePolicy) + if err != nil { + logAndFail("failed to create storageclass with controller-modify-secret: %v", err) + } + defer func() { + err = deleteResource(rbdExamplePath + "storageclass.yaml") + if err != nil { + logAndFail("failed to delete storageclass: %v", err) + } + err = createRBDStorageClass(f.ClientSet, f, defaultSCName, nil, nil, deletePolicy) + if err != nil { + logAndFail("failed to create storageclass: %v", err) + } + }() + var ( baseIops = "3000" maxIops = "15000" @@ -5797,45 +5828,43 @@ var _ = Describe("RBD", func() { readBpsPerGiB = "2097152" writeBpsPerGiB = "1048576" baseVolSizeBytes = "21474836480" + + qosSilverVACName = "silver" + qosGoldVACName = "gold" + qosFlexVACName = "flex" // Capacity-based QoS ) + + // define silver vac parameters qosParameters := map[string]string{ "baseReadIops": baseReadIops, "baseWriteIops": baseWriteIops, "baseReadBps": baseReadBps, "baseWriteBps": baseWriteBps, } - err := deleteResource(rbdExamplePath + "storageclass.yaml") - if err != nil { - logAndFail("failed to delete storageclass: %v", err) - } - err = createRBDStorageClass( + // create silver vac + err = createRBDVolumeAttributesClass( f.ClientSet, f, - defaultSCName, - nil, - qosParameters, - deletePolicy) + qosSilverVACName, + qosParameters) if err != nil { - logAndFail("failed to create storageclass: %v", err) + logAndFail("failed to create volumeattributesclass: %v", err) } defer func() { - err = deleteResource(rbdExamplePath + "storageclass.yaml") + err = deleteRBDVolumeAttributesClass(f.ClientSet, f, qosSilverVACName) if err != nil { - logAndFail("failed to delete storageclass: %v", err) - } - err = createRBDStorageClass(f.ClientSet, f, defaultSCName, nil, nil, deletePolicy) - if err != nil { - logAndFail("failed to create storageclass: %v", err) + logAndFail("failed to delete volumeattributesclass: %v", err) } }() - // 1.1 create PVC + // create pvc with vac pvc, err := loadPVC(pvcPath) if err != nil { logAndFail("failed to load PVC: %v", err) } pvc.Namespace = f.UniqueName + pvc.Spec.VolumeAttributesClassName = &qosSilverVACName err = createPVCAndvalidatePV(f.ClientSet, pvc, deployTimeout) if err != nil { logAndFail("failed to create PVC and application: %v", err) @@ -5844,7 +5873,7 @@ var _ = Describe("RBD", func() { validateRBDImageCount(f, 1, defaultRBDPool) validateOmapCount(f, 1, rbdType, defaultRBDPool, volumesType) - // 1.2 validate rbd image qos + // validate rbd image qos wants := map[string]string{ "rbd_qos_read_iops_limit": baseReadIops, "rbd_qos_write_iops_limit": baseWriteIops, @@ -5856,46 +5885,102 @@ var _ = Describe("RBD", func() { logAndFail("failed to validate qos: %v", err) } - // 1.3 delete pvc + // define gold vac parameters + qosParameters = map[string]string{ + "baseReadIops": "4000", + "baseWriteIops": "2000", + "baseReadBps": "419430400", + "baseWriteBps": "209715200", + } + // create gold vac + err = createRBDVolumeAttributesClass( + f.ClientSet, + f, + qosGoldVACName, + qosParameters) + if err != nil { + logAndFail("failed to create volumeattributesclass: %v", err) + } + defer func() { + err = deleteRBDVolumeAttributesClass(f.ClientSet, f, qosGoldVACName) + if err != nil { + logAndFail("failed to delete volumeattributesclass: %v", err) + } + }() + // modify vac to gold + err = modifyPVCVolumeAttributesClass( + f.ClientSet, + pvc, + qosGoldVACName) + if err != nil { + logAndFail("failed to modify volumeattributesclass: %v", err) + } + + // validate rbd image qos + wants = map[string]string{ + "rbd_qos_read_iops_limit": "4000", + "rbd_qos_write_iops_limit": "2000", + "rbd_qos_read_bps_limit": "419430400", + "rbd_qos_write_bps_limit": "209715200", + } + err = validateQOS(f, pvc, wants) + if err != nil { + logAndFail("failed to validate qos: %v", err) + } + + // delete pvc err = deletePVCAndValidatePV(f.ClientSet, pvc, deployTimeout) if err != nil { logAndFail("failed to delete PVC: %v", err) } + // define flex vac parameters qosParameters = map[string]string{ + "baseIops": baseIops, + "maxIops": maxIops, "baseReadIops": baseReadIops, + "maxReadIops": maxReadIops, "baseWriteIops": baseWriteIops, + "maxWriteIops": maxWriteIops, + "baseBps": baseBps, + "maxBps": maxBps, "baseReadBps": baseReadBps, + "maxReadBps": maxReadBps, "baseWriteBps": baseWriteBps, + "maxWriteBps": maxWriteBps, + "iopsPerGiB": iopsPerGiB, "readIopsPerGiB": readIopsPerGiB, "writeIopsPerGiB": writeIopsPerGiB, + "bpsPerGiB": bpsPerGiB, "readBpsPerGiB": readBpsPerGiB, "writeBpsPerGiB": writeBpsPerGiB, "baseVolSizeBytes": baseVolSizeBytes, } - err = deleteResource(rbdExamplePath + "storageclass.yaml") - if err != nil { - logAndFail("failed to delete storageclass: %v", err) - } - err = createRBDStorageClass( + // create flex vac + err = createRBDVolumeAttributesClass( f.ClientSet, f, - defaultSCName, - nil, - qosParameters, - deletePolicy) + qosFlexVACName, + qosParameters) if err != nil { - logAndFail("failed to create storageclass: %v", err) + logAndFail("failed to create volumeattributesclass: %v", err) } + defer func() { + err = deleteRBDVolumeAttributesClass(f.ClientSet, f, qosFlexVACName) + if err != nil { + logAndFail("failed to delete volumeattributesclass: %v", err) + } + }() - // 2.1 create PVC + // create pvc with vac pvc, err = loadPVC(pvcPath) if err != nil { logAndFail("failed to load PVC: %v", err) } pvc.Namespace = f.UniqueName - pvc.Spec.Resources.Requests[v1.ResourceStorage] = resource.MustParse("100Gi") + pvc.Spec.VolumeAttributesClassName = &qosFlexVACName + pvc.Spec.Resources.Requests[v1.ResourceStorage] = resource.MustParse("200Gi") err = createPVCAndvalidatePV(f.ClientSet, pvc, deployTimeout) if err != nil { logAndFail("failed to create PVC and application: %v", err) @@ -5904,19 +5989,21 @@ var _ = Describe("RBD", func() { validateRBDImageCount(f, 1, defaultRBDPool) validateOmapCount(f, 1, rbdType, defaultRBDPool, volumesType) - // 2.2 validate rbd image qos + // validate rbd image qos wants = map[string]string{ - "rbd_qos_read_iops_limit": "3600", - "rbd_qos_write_iops_limit": "1800", - "rbd_qos_read_bps_limit": "377487360", - "rbd_qos_write_bps_limit": "188743680", + "rbd_qos_iops_limit": "8400", + "rbd_qos_read_iops_limit": "5600", + "rbd_qos_write_iops_limit": "2800", + "rbd_qos_bps_limit": "880803840", + "rbd_qos_read_bps_limit": "587202560", + "rbd_qos_write_bps_limit": "293601280", } err = validateQOS(f, pvc, wants) if err != nil { logAndFail("failed to validate qos: %v", err) } - // 3.1 create snapshot + // create snapshot class err = createRBDSnapshotClass(f) if err != nil { logAndFail("failed to create storageclass: %v", err) @@ -5927,7 +6014,7 @@ var _ = Describe("RBD", func() { logAndFail("failed to delete VolumeSnapshotClass: %v", err) } }() - + // create snapshot snap := getSnapshot(snapshotPath) snap.Namespace = f.UniqueName snap.Spec.Source.PersistentVolumeClaimName = &pvc.Name @@ -5942,13 +6029,14 @@ var _ = Describe("RBD", func() { validateOmapCount(f, 1, rbdType, defaultRBDPool, volumesType) validateOmapCount(f, 1, rbdType, defaultRBDPool, snapsType) - // 3.2 create pvc from snapshot + // create pvc from snapshot pvcClone, err := loadPVC(pvcClonePath) if err != nil { logAndFail("failed to load PVC: %v", err) } pvcClone.Namespace = f.UniqueName - pvcClone.Spec.Resources.Requests[v1.ResourceStorage] = resource.MustParse("100Gi") + pvcClone.Spec.VolumeAttributesClassName = &qosFlexVACName + pvcClone.Spec.Resources.Requests[v1.ResourceStorage] = resource.MustParse("200Gi") err = createPVCAndvalidatePV(f.ClientSet, pvcClone, deployTimeout) if err != nil { logAndFail("failed to create PVC: %v", err) @@ -5960,41 +6048,44 @@ var _ = Describe("RBD", func() { validateOmapCount(f, 2, rbdType, defaultRBDPool, volumesType) validateOmapCount(f, 1, rbdType, defaultRBDPool, snapsType) - // 3.3 validate rbd image qos + // validate clone image qos err = validateQOS(f, pvcClone, wants) if err != nil { logAndFail("failed to validate qos: %v", err) } - // 3.4 delete clone pvc + // delete clone pvc err = deletePVCAndValidatePV(f.ClientSet, pvcClone, deployTimeout) if err != nil { logAndFail("failed to delete PVC: %v", err) } - // 3.5 validate create pvc from snapshot, but pvc size greater than parent + // validate create pvc from snapshot, and pvc size greater than parent pvcClone, err = loadPVC(pvcClonePath) if err != nil { logAndFail("failed to load PVC: %v", err) } pvcClone.Namespace = f.UniqueName - pvcClone.Spec.Resources.Requests[v1.ResourceStorage] = resource.MustParse("200Gi") + pvcClone.Spec.VolumeAttributesClassName = &qosFlexVACName + pvcClone.Spec.Resources.Requests[v1.ResourceStorage] = resource.MustParse("600Gi") err = createPVCAndvalidatePV(f.ClientSet, pvcClone, deployTimeout) if err != nil { logAndFail("failed to create PVC: %v", err) } wants2 := map[string]string{ - "rbd_qos_read_iops_limit": "5600", - "rbd_qos_write_iops_limit": "2800", - "rbd_qos_read_bps_limit": "587202560", - "rbd_qos_write_bps_limit": "293601280", + "rbd_qos_iops_limit": "15000", + "rbd_qos_read_iops_limit": "10000", + "rbd_qos_write_iops_limit": "5000", + "rbd_qos_bps_limit": "1572864000", + "rbd_qos_read_bps_limit": "1048576000", + "rbd_qos_write_bps_limit": "524288000", } err = validateQOS(f, pvcClone, wants2) if err != nil { logAndFail("failed to validate qos: %v", err) } - // 3.6 delete snapshot and clone pvc + // delete snapshot and clone pvc err = deleteSnapshot(&snap, deployTimeout) if err != nil { logAndFail("failed to delete snapshot: %v", err) @@ -6004,13 +6095,14 @@ var _ = Describe("RBD", func() { logAndFail("failed to delete PVC: %v", err) } - // 4.1 create pvc from pvc + // create pvc from pvc pvcSmartClone, err := loadPVC(pvcSmartClonePath) if err != nil { logAndFail("failed to load pvcSmartClone: %v", err) } pvcSmartClone.Namespace = f.UniqueName - pvcSmartClone.Spec.Resources.Requests[v1.ResourceStorage] = resource.MustParse("100Gi") + pvcSmartClone.Spec.VolumeAttributesClassName = &qosFlexVACName + pvcSmartClone.Spec.Resources.Requests[v1.ResourceStorage] = resource.MustParse("200Gi") err = createPVCAndvalidatePV(f.ClientSet, pvcSmartClone, deployTimeout) if err != nil { logAndFail("failed to create pvc: %v", err) @@ -6021,25 +6113,26 @@ var _ = Describe("RBD", func() { validateRBDImageCount(f, totalImages, defaultRBDPool) validateOmapCount(f, 2, rbdType, defaultRBDPool, volumesType) - // 4.2 validate rbd image qos + // validate rbd image qos err = validateQOS(f, pvcSmartClone, wants) if err != nil { logAndFail("failed to validate qos: %v", err) } - // 4.3 delete clone pvc + // delete clone pvc err = deletePVCAndValidatePV(f.ClientSet, pvcSmartClone, deployTimeout) if err != nil { logAndFail("failed to delete PVC: %v", err) } - // 4.4 create pvc from pvc, but pvc size greater than parent + // create pvc from pvc, and pvc size greater than parent pvcSmartClone, err = loadPVC(pvcSmartClonePath) if err != nil { logAndFail("failed to load pvcSmartClone: %v", err) } pvcSmartClone.Namespace = f.UniqueName - pvcSmartClone.Spec.Resources.Requests[v1.ResourceStorage] = resource.MustParse("200Gi") + pvcSmartClone.Spec.VolumeAttributesClassName = &qosFlexVACName + pvcSmartClone.Spec.Resources.Requests[v1.ResourceStorage] = resource.MustParse("600Gi") err = createPVCAndvalidatePV(f.ClientSet, pvcSmartClone, deployTimeout) if err != nil { logAndFail("failed to create pvc: %v", err) @@ -6049,7 +6142,7 @@ var _ = Describe("RBD", func() { logAndFail("failed to validate qos: %v", err) } - // 4.5 delete parent pvc and clone pvc + // delete parent pvc and clone pvc err = deletePVCAndValidatePV(f.ClientSet, pvc, deployTimeout) if err != nil { logAndFail("failed to delete PVC: %v", err) @@ -6059,105 +6152,6 @@ var _ = Describe("RBD", func() { logAndFail("failed to delete PVC: %v", err) } - qosParameters = map[string]string{ - "baseIops": baseIops, - "maxIops": maxIops, - "baseReadIops": baseReadIops, - "maxReadIops": maxReadIops, - "baseWriteIops": baseWriteIops, - "maxWriteIops": maxWriteIops, - "baseBps": baseBps, - "maxBps": maxBps, - "baseReadBps": baseReadBps, - "maxReadBps": maxReadBps, - "baseWriteBps": baseWriteBps, - "maxWriteBps": maxWriteBps, - "iopsPerGiB": iopsPerGiB, - "readIopsPerGiB": readIopsPerGiB, - "writeIopsPerGiB": writeIopsPerGiB, - "bpsPerGiB": bpsPerGiB, - "readBpsPerGiB": readBpsPerGiB, - "writeBpsPerGiB": writeBpsPerGiB, - "baseVolSizeBytes": baseVolSizeBytes, - } - err = deleteResource(rbdExamplePath + "storageclass.yaml") - if err != nil { - logAndFail("failed to delete storageclass: %v", err) - } - err = createRBDStorageClass( - f.ClientSet, - f, - defaultSCName, - nil, - qosParameters, - deletePolicy) - if err != nil { - logAndFail("failed to create storageclass: %v", err) - } - pvc, err = loadPVC(pvcPath) - if err != nil { - logAndFail("failed to load PVC: %v", err) - } - pvc.Namespace = f.UniqueName - pvc.Spec.Resources.Requests[v1.ResourceStorage] = resource.MustParse("200Gi") - err = createPVCAndvalidatePV(f.ClientSet, pvc, deployTimeout) - if err != nil { - logAndFail("failed to create PVC and application: %v", err) - } - // validate created backend rbd images - validateRBDImageCount(f, 1, defaultRBDPool) - validateOmapCount(f, 1, rbdType, defaultRBDPool, volumesType) - - wants3 := map[string]string{ - "rbd_qos_iops_limit": "8400", - "rbd_qos_read_iops_limit": "5600", - "rbd_qos_write_iops_limit": "2800", - "rbd_qos_bps_limit": "880803840", - "rbd_qos_read_bps_limit": "587202560", - "rbd_qos_write_bps_limit": "293601280", - } - err = validateQOS(f, pvc, wants3) - if err != nil { - logAndFail("failed to validate qos: %v", err) - } - - err = deletePVCAndValidatePV(f.ClientSet, pvc, deployTimeout) - if err != nil { - framework.Failf("failed to delete PVC: %v", err) - } - - pvc, err = loadPVC(pvcPath) - if err != nil { - framework.Failf("failed to load PVC: %v", err) - } - pvc.Namespace = f.UniqueName - pvc.Spec.Resources.Requests[v1.ResourceStorage] = resource.MustParse("600Gi") - err = createPVCAndvalidatePV(f.ClientSet, pvc, deployTimeout) - if err != nil { - framework.Failf("failed to create PVC and application: %v", err) - } - // validate created backend rbd images - validateRBDImageCount(f, 1, defaultRBDPool) - validateOmapCount(f, 1, rbdType, defaultRBDPool, volumesType) - - wants4 := map[string]string{ - "rbd_qos_iops_limit": "15000", - "rbd_qos_read_iops_limit": "10000", - "rbd_qos_write_iops_limit": "5000", - "rbd_qos_bps_limit": "1572864000", - "rbd_qos_read_bps_limit": "1048576000", - "rbd_qos_write_bps_limit": "524288000", - } - err = validateQOS(f, pvc, wants4) - if err != nil { - logAndFail("failed to validate qos: %v", err) - } - - err = deletePVCAndValidatePV(f.ClientSet, pvc, deployTimeout) - if err != nil { - framework.Failf("failed to delete PVC: %v", err) - } - // END: validate created backend rbd images validateRBDImageCount(f, 0, defaultRBDPool) validateOmapCount(f, 0, rbdType, defaultRBDPool, volumesType) diff --git a/e2e/rbd_helper.go b/e2e/rbd_helper.go index 232da6c9be0..20d19fab369 100644 --- a/e2e/rbd_helper.go +++ b/e2e/rbd_helper.go @@ -28,8 +28,10 @@ import ( "github.com/google/uuid" snapapi "github.com/kubernetes-csi/external-snapshotter/client/v8/apis/volumesnapshot/v1" + . "github.com/onsi/gomega" v1 "k8s.io/api/core/v1" scv1 "k8s.io/api/storage/v1" + apierrs "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/kubernetes" @@ -114,6 +116,13 @@ func rbdOptions(pool string) string { return "--pool=" + pool } +// supportsVolumeAttributesClass returns true when both the Kubernetes cluster +// (>= 1.34) and the deployed ceph-csi (>= 3.17) support VolumeAttributesClass. +func supportsVolumeAttributesClass(c kubernetes.Interface, f *framework.Framework) bool { + return k8sVersionGreaterEquals(c, 1, 34) && + cephcsiVersionGreaterEquals(f, rbdDaemonsetName, rbdContainerName, 3, 17) +} + func createRBDStorageClass( c kubernetes.Interface, f *framework.Framework, @@ -1652,3 +1661,120 @@ func validateServiceAccountVolumeRestriction( return nil } + +func createRBDVolumeAttributesClass( + c kubernetes.Interface, + f *framework.Framework, + name string, + params map[string]string, +) error { + vacPath := fmt.Sprintf("%s/%s", rbdExamplePath, "volumeattributesclass.yaml") + vac, err := getVolumeAttributesClass(vacPath) + if err != nil { + return fmt.Errorf("failed to get vac: %w", err) + } + if name != "" { + vac.Name = name + } + + // overload any parameters that were passed + if params == nil { + // create an empty params, so that params["clusterID"] below + // does not panic + params = map[string]string{} + } + for param, value := range params { + vac.Parameters[param] = value + } + + timeout := time.Duration(deployTimeout) * time.Minute + + return wait.PollUntilContextTimeout(context.TODO(), poll, timeout, true, func(ctx context.Context) (bool, error) { + _, err = c.StorageV1().VolumeAttributesClasses().Create(ctx, &vac, metav1.CreateOptions{}) + if err != nil { + framework.Logf("error creating VolumeAttributesClass %q: %v", vac.Name, err) + if apierrs.IsAlreadyExists(err) { + return true, nil + } + if isRetryableAPIError(err) { + return false, nil + } + + return false, fmt.Errorf("failed to create VolumeAttributesClass %q: %w", vac.Name, err) + } + + return true, nil + }) +} + +func deleteRBDVolumeAttributesClass( + c kubernetes.Interface, + f *framework.Framework, + name string, +) error { + vacPath := fmt.Sprintf("%s/%s", rbdExamplePath, "volumeattributesclass.yaml") + vac, err := getVolumeAttributesClass(vacPath) + if err != nil { + return err + } + if name != "" { + vac.Name = name + } + + timeout := time.Duration(deployTimeout) * time.Minute + + return wait.PollUntilContextTimeout(context.TODO(), poll, timeout, true, func(ctx context.Context) (bool, error) { + err = c.StorageV1().VolumeAttributesClasses().Delete(ctx, vac.Name, metav1.DeleteOptions{}) + if err != nil { + framework.Logf("error deleting VolumeAttributesClass %q: %v", vac.Name, err) + if apierrs.IsNotFound(err) { + return true, nil + } + if isRetryableAPIError(err) { + return false, nil + } + + return false, fmt.Errorf("failed to delete VolumeAttributesClass %q: %w", vac.Name, err) + } + + return true, nil + }) +} + +func modifyPVCVolumeAttributesClass( + c kubernetes.Interface, + pvc *v1.PersistentVolumeClaim, + vacName string, +) error { + ctx := context.TODO() + pvcName := pvc.Name + pvcNamespace := pvc.Namespace + updatedPVC, err := getPersistentVolumeClaim(c, pvcNamespace, pvcName) + if err != nil { + return fmt.Errorf("error fetching pvc %q with %w", pvcName, err) + } + + timeout := time.Duration(deployTimeout) * time.Minute + updatedPVC.Spec.VolumeAttributesClassName = &vacName + _, err = c.CoreV1(). + PersistentVolumeClaims(updatedPVC.Namespace). + Update(ctx, updatedPVC, metav1.UpdateOptions{}) + Expect(err).ShouldNot(HaveOccurred()) + + return wait.PollUntilContextTimeout(ctx, poll, timeout, true, func(ctx context.Context) (bool, error) { + updatedPVC, err = c.CoreV1().PersistentVolumeClaims(pvcNamespace).Get(ctx, pvcName, metav1.GetOptions{}) + if err != nil { + if isRetryableAPIError(err) { + return false, nil + } + + return false, fmt.Errorf("failed to get pvc: %w", err) + } + + if *updatedPVC.Status.CurrentVolumeAttributesClassName != vacName { + return false, nil + } + + return true, nil + }) +} diff --git a/e2e/utils.go b/e2e/utils.go index b8657ecee02..a5771e70d58 100644 --- a/e2e/utils.go +++ b/e2e/utils.go @@ -23,6 +23,7 @@ import ( "encoding/json" "errors" "fmt" + "math" "os" "regexp" "strconv" @@ -1733,6 +1734,99 @@ func k8sVersionGreaterEquals(c kubernetes.Interface, major, minor int) bool { return (vMajor > major) || (vMajor == major && vMinor >= minor) } +// parseCephCSIVersion parses the version string reported by "cephcsi -version" +// and returns the major and minor version numbers. Supported formats: +// +// - "canary" – unversioned development build; returns math.MaxInt +// - "v3.16.8" – release build +// - "v3.16-canary" – pre-release snapshot for the 3.16 series +// - "v3.17-canary" – pre-release snapshot for the 3.17 series +// +// In all pre-release / canary cases the returned values represent the version +// series the build belongs to, so callers can use them for feature gating. +func parseCephCSIVersion(version string) (int, int, error) { + if version == "canary" { + return math.MaxInt, math.MaxInt, nil + } + + // Strip the optional "v" prefix. + version = strings.TrimPrefix(version, "v") + + // Split on "." to separate major from the rest ("16", "16.8", "16-canary"). + parts := strings.SplitN(version, ".", 3) + if len(parts) < 2 { + return 0, 0, fmt.Errorf("unexpected cephcsi version format: %q", version) + } + + major, err := strconv.Atoi(parts[0]) + if err != nil { + return 0, 0, fmt.Errorf("failed to parse major version from %q: %w", version, err) + } + + // The minor segment may carry a pre-release suffix separated by "-" + // (e.g. "16-canary"); discard everything from the first "-" onward. + minorStr, _, _ := strings.Cut(parts[1], "-") + minor, err := strconv.Atoi(minorStr) + if err != nil { + return 0, 0, fmt.Errorf("failed to parse minor version from %q: %w", version, err) + } + + return major, minor, nil +} + +// getCephCSIVersion runs "cephcsi -version" in the given nodeplugin container +// and returns the major and minor version numbers parsed from the +// "Cephcsi Version:" line of the output. Development builds that report +// "canary" as their version return math.MaxInt for both values so that they +// always compare as newer than any specific release. +func getCephCSIVersion(f *framework.Framework, daemonsetName, containerName string) (int, int, error) { + selector, err := getDaemonSetLabelSelector(f, cephCSINamespace, daemonsetName) + if err != nil { + return 0, 0, fmt.Errorf("failed to get DaemonSet label selector for %s: %w", daemonsetName, err) + } + + opt := metav1.ListOptions{ + LabelSelector: selector, + } + + stdout, stdErr, err := execCommandInContainer(f, "cephcsi -version", cephCSINamespace, containerName, &opt) + if err != nil { + return 0, 0, fmt.Errorf("failed to run cephcsi -version in %s: %w", containerName, err) + } + if stdErr != "" { + return 0, 0, fmt.Errorf("cephcsi -version returned stderr in %s: %s", containerName, stdErr) + } + + for _, line := range strings.Split(stdout, "\n") { + if !strings.HasPrefix(line, "Cephcsi Version:") { + continue + } + + version := strings.TrimSpace(strings.TrimPrefix(line, "Cephcsi Version:")) + + return parseCephCSIVersion(version) + } + + return 0, 0, fmt.Errorf("failed to find version in cephcsi -version output: %q", stdout) +} + +// cephcsiVersionGreaterEquals checks the version of the cephcsi binary +// running in the given nodeplugin container and compares it to the +// major.minor version passed. It returns true when the running version is +// equal to or newer than major.minor, false otherwise. +// If fetching the version fails, the calling test case is marked as FAILED +// and gets aborted. +func cephcsiVersionGreaterEquals(f *framework.Framework, daemonsetName, containerName string, major, minor int) bool { + vMajor, vMinor, err := getCephCSIVersion(f, daemonsetName, containerName) + if err != nil { + logAndFail("failed to get cephcsi version from %s/%s: %v", daemonsetName, containerName, err) + // logAndFail calls framework.Failf which terminates the goroutine; + // this return is unreachable but satisfies the compiler. + } + + return (vMajor > major) || (vMajor == major && vMinor >= minor) +} + // waitForJobCompletion polls the status of the given job and waits until the // jobs has succeeded or until the timeout is hit. func waitForJobCompletion(c kubernetes.Interface, ns, job string, timeout int) error { diff --git a/e2e/utils_test.go b/e2e/utils_test.go new file mode 100644 index 00000000000..e49dc84b95c --- /dev/null +++ b/e2e/utils_test.go @@ -0,0 +1,61 @@ +/* +Copyright 2026 The Ceph-CSI Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package e2e + +import ( + "math" + "testing" +) + +func TestParseCephCSIVersion(t *testing.T) { + t.Parallel() + + tests := []struct { + input string + wantMajor int + wantMinor int + wantErr bool + }{ + {input: "canary", wantMajor: math.MaxInt, wantMinor: math.MaxInt}, + {input: "v3.16-canary", wantMajor: 3, wantMinor: 16}, + {input: "v3.17-canary", wantMajor: 3, wantMinor: 17}, + {input: "v3.16.8", wantMajor: 3, wantMinor: 16}, + {input: "v3.16.0", wantMajor: 3, wantMinor: 16}, + {input: "invalid", wantErr: true}, + {input: "v3", wantErr: true}, + } + + for _, tt := range tests { + t.Run(tt.input, func(t *testing.T) { + t.Parallel() + + major, minor, err := parseCephCSIVersion(tt.input) + if (err != nil) != tt.wantErr { + t.Errorf("parseCephCSIVersion(%q) error = %v, wantErr %v", tt.input, err, tt.wantErr) + + return + } + if err != nil { + return + } + if major != tt.wantMajor || minor != tt.wantMinor { + t.Errorf("parseCephCSIVersion(%q) = (%d, %d), want (%d, %d)", + tt.input, major, minor, tt.wantMajor, tt.wantMinor) + } + }) + } +} diff --git a/examples/rbd/storageclass.yaml b/examples/rbd/storageclass.yaml index 7778be885e9..1c731e4709f 100644 --- a/examples/rbd/storageclass.yaml +++ b/examples/rbd/storageclass.yaml @@ -90,6 +90,9 @@ parameters: csi.storage.k8s.io/controller-publish-secret-namespace: default csi.storage.k8s.io/node-stage-secret-name: csi-rbd-secret csi.storage.k8s.io/node-stage-secret-namespace: default + # controller-modify-secret + csi.storage.k8s.io/controller-modify-secret-name: csi-rbd-secret + csi.storage.k8s.io/controller-modify-secret-namespace: default # (optional) Specify the filesystem type of the volume. If not specified, # csi-provisioner will set default as `ext4`. @@ -197,58 +200,6 @@ parameters: # stripeCount: <> # (optional) The object size in bytes. # objectSize: <> - - # rbd volume QoS. - # QoS provides settings for rbd volume read/write iops - # and read/write bandwidth. There are 4 base qos parameters - # among them, when users apply for a volume capacity equal - # to or less than BaseVolSizebytes, use base qos limit. - # For the portion of capacity exceeding BaseVolSizebytes, - # QoS will be increased in steps set per GiB. If the step - # size parameter per GiB is not provided, only base QoS limit - # will be used and not associated with capacity size. - # - # note: currently supports rbd-nbd mounter. - # - # For more details - # (optional) the base limit of operations per second. - # baseIops: <> - # (optional) the max limit of operations per second. - # maxIops: <> - # (optional) the base limit of read operations per second. - # baseReadIops: <> - # (optional) the max limit of read operations per second. - # maxReadIops: <> - # (optional) the base limit of write operations per second. - # baseWriteIops: <> - # (optional) the max limit of write operations per second. - # maxWriteIops: <> - # (optional) the base limit of bytes per second. - # baseBps: <> - # (optional) the max limit of bytes per second. - # maxBps: <> - # (optional) the base limit of read bytes per second. - # baseReadBps: <> - # (optional) the max limit of read bytes per second. - # maxReadBps: <> - # (optional) the base limit of write bytes per second. - # baseWriteBps: <> - # (optional) the max limit of write bytes per second. - # maxWriteBps: <> - # (optional) the limit of operations per GiB. - # iopsPerGiB: <> - # (optional) the limit of read operations per GiB. - # readIopsPerGiB: <> - # (optional) the limit of write operations per GiB. - # writeIopsPerGiB: <> - # (optional) the limit of bytes per GiB. - # bpsPerGiB: <> - # (optional) the limit of read bytes per GiB. - # readBpsPerGiB: <> - # (optional) the limit of write bytes per GiB. - # writeBpsPerGiB: <> - # (optional) min size of volume what use to calc qos beased on capacity. - # baseVolSizeBytes:<> reclaimPolicy: Delete allowVolumeExpansion: true diff --git a/examples/rbd/volumeattributesclass.yaml b/examples/rbd/volumeattributesclass.yaml new file mode 100644 index 00000000000..53083799e2b --- /dev/null +++ b/examples/rbd/volumeattributesclass.yaml @@ -0,0 +1,58 @@ +--- +apiVersion: storage.k8s.io/v1 +kind: VolumeAttributesClass +metadata: + name: qos-vac +driverName: rbd.csi.ceph.com +parameters: + # rbd volume QoS. + # QoS provides settings for rbd volume read/write iops + # and read/write bandwidth. There are 6 base qos parameters + # among them, when users apply for a volume capacity equal + # to or less than BaseVolSizebytes, use base qos limit. + # For the portion of capacity exceeding BaseVolSizebytes, + # QoS will be increased in steps set per GiB. If the step + # size parameter per GiB is not provided, only base QoS limit + # will be used and not associated with capacity size. + # + # note: currently supports rbd-nbd mounter. + # + # For more details + # (optional) the base limit of operations per second. + baseIops: "1000" + # (optional) the max limit of operations per second. + # maxIops: <> + # (optional) the base limit of read operations per second. + # baseReadIops: <> + # (optional) the max limit of read operations per second. + # maxReadIops: <> + # (optional) the base limit of write operations per second. + # baseWriteIops: <> + # (optional) the max limit of write operations per second. + # maxWriteIops: <> + # (optional) the base limit of bytes per second. + # baseBps: <> + # (optional) the max limit of bytes per second. + # maxBps: <> + # (optional) the base limit of read bytes per second. + # baseReadBps: <> + # (optional) the max limit of read bytes per second. + # maxReadBps: <> + # (optional) the base limit of write bytes per second. + # baseWriteBps: <> + # (optional) the max limit of write bytes per second. + # maxWriteBps: <> + # (optional) the limit of operations per GiB. + # iopsPerGiB: <> + # (optional) the limit of read operations per GiB. + # readIopsPerGiB: <> + # (optional) the limit of write operations per GiB. + # writeIopsPerGiB: <> + # (optional) the limit of bytes per GiB. + # bpsPerGiB: <> + # (optional) the limit of read bytes per GiB. + # readBpsPerGiB: <> + # (optional) the limit of write bytes per GiB. + # writeBpsPerGiB: <> + # (optional) min size of volume what use to calc qos beased on capacity. + # baseVolSizeBytes:<> diff --git a/internal/rbd/controllerserver.go b/internal/rbd/controllerserver.go index 9dffeb2e9c8..a4a5c275c8d 100644 --- a/internal/rbd/controllerserver.go +++ b/internal/rbd/controllerserver.go @@ -234,8 +234,8 @@ func (cs *ControllerServer) parseVolCreateRequest( return nil, status.Error(codes.InvalidArgument, err.Error()) } - // Get QosParameters from SC if qos configuration existing in SC - err = rbdVol.SetQOS(ctx, req.GetParameters()) + // parse QOS parameters from mutable parameters + err = rbdVol.SetQOS(ctx, req.GetMutableParameters()) if err != nil { return nil, status.Error(codes.InvalidArgument, err.Error()) } @@ -435,7 +435,7 @@ func (cs *ControllerServer) CreateVolume( } }() - err = cs.createBackingImage(ctx, cr, req.GetSecrets(), rbdVol, parentVol, rbdSnap, req.GetParameters()) + err = cs.createBackingImage(ctx, cr, req.GetSecrets(), rbdVol, parentVol, rbdSnap, req.GetMutableParameters()) if err != nil { if errors.Is(err, rbderrors.ErrFlattenInProgress) { return nil, status.Error(codes.Aborted, err.Error()) @@ -765,7 +765,7 @@ func (cs *ControllerServer) createBackingImage( secrets map[string]string, rbdVol, parentVol *rbdVolume, rbdSnap *rbdSnapshot, - scParams map[string]string, + mutableParameters map[string]string, ) error { var err error @@ -827,8 +827,8 @@ func (cs *ControllerServer) createBackingImage( return status.Error(codes.Internal, err.Error()) } - // Save Qos parameters from SC in Image metadata, we will use it while resize volume. - err = rbdVol.SaveQOS(ctx, scParams) + // Save Qos parameters from mutable parameters in Image metadata, we will use it while resize volume. + err = rbdVol.SaveQOS(ctx, mutableParameters) if err != nil { log.ErrorLog(ctx, "failed to save QOS for rbd image: %s with error: %v", rbdVol, err) @@ -1963,3 +1963,86 @@ func (cs *ControllerServer) fenceNode( return nil } + +// ControllerModifyVolume modify the QoS of rbd based on of ControllerModifyVolumeRequest. +// +// Parameters: +// - csi.ControllerModifyVolumeRequest: Passing a list of mutable parameters from csi-resizer. +// +// Return an error if any step in the ControllerModifyVolume process fails. +func (cs *ControllerServer) ControllerModifyVolume( + ctx context.Context, + req *csi.ControllerModifyVolumeRequest, +) (*csi.ControllerModifyVolumeResponse, error) { + err := cs.Driver.ValidateControllerServiceRequest(csi.ControllerServiceCapability_RPC_MODIFY_VOLUME) + if err != nil { + log.ErrorLog(ctx, "invalid modify volume req: %v", protosanitizer.StripSecrets(req)) + + return nil, err + } + + volID := req.GetVolumeId() + if err := util.ValidateVolumeID(volID, true); err != nil { + return nil, status.Error(codes.InvalidArgument, err.Error()) + } + + mutableParameters := req.GetMutableParameters() + if mutableParameters == nil { + log.ErrorLog(ctx, "invalid modify volume req: %v", protosanitizer.StripSecrets(req)) + + return nil, status.Error(codes.InvalidArgument, "mutable parameters cannot be empty") + } + + // lock out parallel requests against the same volume ID + if acquired := cs.VolumeLocks.TryAcquire(volID); !acquired { + log.ErrorLog(ctx, util.VolumeOperationAlreadyExistsFmt, volID) + + return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, volID) + } + defer cs.VolumeLocks.Release(volID) + + cr, err := util.NewUserCredentialsWithMigration(req.GetSecrets()) + if err != nil { + return nil, status.Error(codes.InvalidArgument, err.Error()) + } + defer cr.DeleteCredentials() + rbdVol, err := genVolFromVolIDWithMigration(ctx, volID, cr, req.GetSecrets()) + if err != nil { + switch { + case errors.Is(err, rbderrors.ErrImageNotFound): + err = status.Errorf(codes.NotFound, "volume ID %s not found", volID) + case errors.Is(err, util.ErrPoolNotFound): + log.ErrorLog(ctx, "failed to get backend volume for %s: %v", volID, err) + err = status.Error(codes.NotFound, err.Error()) + case errors.Is(err, rbderrors.ErrInvalidVolID): + // likely a static provisioned volume + // InvalidArgument indicates that the CO has specified capabilities not supported by the volume + err = status.Errorf(codes.InvalidArgument, "volume ID %s does not support modify", volID) + default: + err = status.Error(codes.Internal, err.Error()) + } + + return nil, err + } + defer rbdVol.Destroy(ctx) + + // lock out volumeID for create, clone, expand and delete operation + if err = cs.OperationLocks.GetModifyLock(volID); err != nil { + log.ErrorLog(ctx, err.Error()) + + return nil, status.Error(codes.Aborted, err.Error()) + } + defer cs.OperationLocks.ReleaseModifyLock(volID) + + // set RequestedVolSize, because calcQosBasedOnCapacity use it. + rbdVol.RequestedVolSize = rbdVol.VolSize + + err = rbdVol.modifyVolumeAttributes(ctx, mutableParameters) + if err != nil { + log.ErrorLog(ctx, "failed to modify volume: %s with error: %v", rbdVol, err) + + return nil, status.Error(codes.Internal, err.Error()) + } + + return &csi.ControllerModifyVolumeResponse{}, nil +} diff --git a/internal/rbd/driver/driver.go b/internal/rbd/driver/driver.go index 323bfe7b80c..736c196e3ce 100644 --- a/internal/rbd/driver/driver.go +++ b/internal/rbd/driver/driver.go @@ -118,6 +118,7 @@ func (r *rbdDriver) Run(conf *util.Config) { csi.ControllerServiceCapability_RPC_CLONE_VOLUME, csi.ControllerServiceCapability_RPC_EXPAND_VOLUME, csi.ControllerServiceCapability_RPC_PUBLISH_UNPUBLISH_VOLUME, + csi.ControllerServiceCapability_RPC_MODIFY_VOLUME, }) // We only support the multi-writer option when using block, but it's a supported capability for the plugin in diff --git a/internal/rbd/qos.go b/internal/rbd/qos.go index 51e703e875f..c4a58002660 100644 --- a/internal/rbd/qos.go +++ b/internal/rbd/qos.go @@ -102,7 +102,7 @@ func HasQoSParams(params map[string]string) bool { } func parseQosParams( - scParams map[string]string, + mutableParameters map[string]string, ) map[string]*qosSpec { rbdQosParameters := map[string]*qosSpec{ baseIops: {iopsLimit, "", iopsPerGiB, "", maxIops, "", false}, @@ -112,14 +112,14 @@ func parseQosParams( baseReadBps: {readBpsLimit, "", readBpsPerGiB, "", maxReadBps, "", false}, baseWriteBps: {writeBpsLimit, "", writeBpsPerGiB, "", maxWriteBps, "", false}, } - for k, v := range scParams { + for k, v := range mutableParameters { if qos, ok := rbdQosParameters[k]; ok && v != "" { qos.baseLimit = v qos.present = true - if perGiBLimit, ok := scParams[qos.perGiBLimitType]; ok && perGiBLimit != "" { + if perGiBLimit, ok := mutableParameters[qos.perGiBLimitType]; ok && perGiBLimit != "" { qos.perGiBLimit = perGiBLimit } - if maxLimit, ok := scParams[qos.maxLimitType]; ok && maxLimit != "" { + if maxLimit, ok := mutableParameters[qos.maxLimitType]; ok && maxLimit != "" { qos.maxLimit = maxLimit } } @@ -130,14 +130,14 @@ func parseQosParams( func (rv *rbdVolume) SetQOS( ctx context.Context, - scParams map[string]string, + mutableParameters map[string]string, ) error { rv.BaseVolSize = "" - if v, ok := scParams[baseVolSizeBytes]; ok && v != "" { + if v, ok := mutableParameters[baseVolSizeBytes]; ok && v != "" { rv.BaseVolSize = v } - rbdQosParameters := parseQosParams(scParams) + rbdQosParameters := parseQosParams(mutableParameters) for _, qos := range rbdQosParameters { if qos.present { err := rv.calcQosBasedOnCapacity(ctx, qos) @@ -232,7 +232,7 @@ func (rv *rbdVolume) calcQosBasedOnCapacity( func (rv *rbdVolume) SaveQOS( ctx context.Context, - scParams map[string]string, + mutableParameters map[string]string, ) error { needSaveQosParameters := map[string]string{ baseIops: baseQosIopsLimit, @@ -255,7 +255,7 @@ func (rv *rbdVolume) SaveQOS( writeBpsPerGiB: writeBpsPerGiBLimit, baseVolSizeBytes: baseQosVolSize, } - for k, v := range scParams { + for k, v := range mutableParameters { if param, ok := needSaveQosParameters[k]; ok && v != "" { err := rv.SetMetadata(param, v) if err != nil { diff --git a/internal/rbd/rbd_util.go b/internal/rbd/rbd_util.go index 55784273951..a192ef8e838 100644 --- a/internal/rbd/rbd_util.go +++ b/internal/rbd/rbd_util.go @@ -2427,3 +2427,31 @@ func (rv *rbdVolume) getUsedBytes(ctx context.Context) (uint64, error) { return usedBytes, nil } + +func (rv *rbdVolume) modifyVolumeAttributes( + ctx context.Context, + newMutableParameters map[string]string, +) error { + image, err := rv.open() + if err != nil { + return err + } + defer image.Close() //nolint:errcheck // not a critical failure + + err = rv.SetQOS(ctx, newMutableParameters) + if err != nil { + return err + } + + err = rv.ApplyQOS(ctx) + if err != nil { + return err + } + + err = rv.SaveQOS(ctx, newMutableParameters) + if err != nil { + return err + } + + return nil +} diff --git a/internal/util/idlocker.go b/internal/util/idlocker.go index fb996b4dd53..9012e6ade93 100644 --- a/internal/util/idlocker.go +++ b/internal/util/idlocker.go @@ -79,6 +79,7 @@ const ( cloneOpt operation = "clone" restoreOp operation = "restore" expandOp operation = "expand" + modifyOp operation = "modify" ) // OperationLock implements a map with atomic operations. @@ -106,12 +107,25 @@ func NewOperationLock() *OperationLock { lock[cloneOpt] = make(map[string]int) lock[restoreOp] = make(map[string]int) lock[expandOp] = make(map[string]int) + lock[modifyOp] = make(map[string]int) return &OperationLock{ locks: lock, } } +// conflictMatrix defines which operations conflict with each other. +// The key is the operation being attempted, and the value is a list of +// operations that cannot be in progress. +var conflictMatrix = map[operation][]operation{ + cloneOpt: {expandOp, modifyOp}, + deleteOp: {expandOp, restoreOp, modifyOp}, + restoreOp: {deleteOp}, + expandOp: {deleteOp, cloneOpt, createOp, modifyOp}, + modifyOp: {deleteOp, cloneOpt, createOp, expandOp}, + // createOp has no conflicts according to the original logic. +} + // GetSnapshotCreateLock gets the snapshot lock on given volumeID. func (ol *OperationLock) GetSnapshotCreateLock(volumeID string) error { return ol.tryAcquire(createOp, volumeID) @@ -140,6 +154,10 @@ func (ol *OperationLock) GetExpandLock(volumeID string) error { return ol.tryAcquire(expandOp, volumeID) } +func (ol *OperationLock) GetModifyLock(volumeID string) error { + return ol.tryAcquire(modifyOp, volumeID) +} + // ReleaseSnapshotCreateLock releases the create lock on given volumeID. func (ol *OperationLock) ReleaseSnapshotCreateLock(volumeID string) { ol.release(createOp, volumeID) @@ -165,69 +183,31 @@ func (ol *OperationLock) ReleaseExpandLock(volumeID string) { ol.release(expandOp, volumeID) } +func (ol *OperationLock) ReleaseModifyLock(volumeID string) { + ol.release(modifyOp, volumeID) +} + // tryAcquire tries to acquire the lock for operating on volumeID and returns true if successful. // If another operation is already using volumeID, returns false. func (ol *OperationLock) tryAcquire(op operation, volumeID string) error { ol.mux.Lock() defer ol.mux.Unlock() - switch op { - case createOp: - // snapshot controller make sure the pvc which is the source for the - // snapshot request won't get deleted while snapshot is getting created, - // so we dont need to check for any ongoing delete operation here on the - // volume. - // increment the counter for snapshot create operation - val := ol.locks[createOp][volumeID] - ol.locks[createOp][volumeID] = val + 1 - case cloneOpt: - // During clone operation, controller make sure no pvc deletion happens on the - // referred PVC datasource, so we are safe from source PVC delete. - - // Check any expand operation is going on for given volume ID. - // if yes we need to return an error to avoid issues. - if _, ok := ol.locks[expandOp][volumeID]; ok { - return fmt.Errorf("an Expand operation with given id %s already exists", volumeID) - } - // increment the counter for clone operation - val := ol.locks[cloneOpt][volumeID] - ol.locks[cloneOpt][volumeID] = val + 1 - case deleteOp: - // During delete operation the volume should not be under expand, - // check any expand operation is going on for given volume ID - if _, ok := ol.locks[expandOp][volumeID]; ok { - return fmt.Errorf("an Expand operation with given id %s already exists", volumeID) - } - // check any restore operation is going on for given volume ID - if _, ok := ol.locks[restoreOp][volumeID]; ok { - return fmt.Errorf("a Restore operation with given id %s already exists", volumeID) - } - ol.locks[deleteOp][volumeID] = 1 - case restoreOp: - // During restore operation the volume should not be deleted - // check any delete operation is going on for given volume ID - if _, ok := ol.locks[deleteOp][volumeID]; ok { - return fmt.Errorf("a Delete operation with given id %s already exists", volumeID) - } - // increment the counter for restore operation - val := ol.locks[restoreOp][volumeID] - ol.locks[restoreOp][volumeID] = val + 1 - case expandOp: - // During expand operation the volume should not be deleted or cloned - // and there should not be a create operation also. - // check any delete operation is going on for given volume ID - if _, ok := ol.locks[deleteOp][volumeID]; ok { - return fmt.Errorf("a Delete operation with given id %s already exists", volumeID) - } - // check any clone operation is going on for given volume ID - if _, ok := ol.locks[cloneOpt][volumeID]; ok { - return fmt.Errorf("a Clone operation with given id %s already exists", volumeID) - } - // check any delete operation is going on for given volume ID - if _, ok := ol.locks[createOp][volumeID]; ok { - return fmt.Errorf("a Create operation with given id %s already exists", volumeID) + if conflictingOps, ok := conflictMatrix[op]; ok { + for _, conflictingOp := range conflictingOps { + if _, exists := ol.locks[conflictingOp][volumeID]; exists { + return fmt.Errorf("cannot acquire lock for %q, "+ + "an %q operation with given id %s already exists", + op, conflictingOp, volumeID) + } } - - ol.locks[expandOp][volumeID] = 1 + } + switch op { + case createOp, cloneOpt, restoreOp: + // These operations are counters. + ol.locks[op][volumeID]++ + case deleteOp, expandOp, modifyOp: + // These operations are flags (presence check). + ol.locks[op][volumeID] = 1 default: return fmt.Errorf("%v operation not supported", op) } @@ -240,7 +220,7 @@ func (ol *OperationLock) release(op operation, volumeID string) { ol.mux.Lock() defer ol.mux.Unlock() switch op { - case cloneOpt, createOp, expandOp, restoreOp, deleteOp: + case cloneOpt, createOp, expandOp, restoreOp, deleteOp, modifyOp: if val, ok := ol.locks[op][volumeID]; ok { // decrement the counter for operation ol.locks[op][volumeID] = val - 1