diff --git a/Makefile b/Makefile index 2dbaf655..fddb1384 100644 --- a/Makefile +++ b/Makefile @@ -72,6 +72,11 @@ IMG ?= $(IMAGE_TAG_BASE):v$(VERSION) # MOVER_IMG defines the image:tag used for the mover plugin image. MOVER_IMG ?= $(MOVER_IMAGE_TAG_BASE):v$(VERSION) +# CEPH_CSI_CONFIG_NAME is the name of the ceph-csi ConfigMap the operator reads. +CEPH_CSI_CONFIG_NAME ?= ceph-csi-config +# CEPH_CSI_CONFIG_NAMESPACE is the namespace of the ceph-csi ConfigMap. +CEPH_CSI_CONFIG_NAMESPACE ?= rook-ceph + # Mover container --build-arg flags MOVER_BUILD_ARGS = \ --build-arg CEPH_BASE_IMAGE=$(CEPH_BASE_IMAGE) \ @@ -294,7 +299,7 @@ docker-buildx-mover: ## Build and push docker image for the mover for cross-plat build-installer: manifests generate kustomize ## Generate a consolidated YAML with CRDs and deployment. mkdir -p dist cd config/manager && $(KUSTOMIZE) edit set image controller=${IMG} - $(KUSTOMIZE) build config/default | sed 's|MOVER_IMAGE_PLACEHOLDER|${MOVER_IMG}|g' > dist/install.yaml + $(KUSTOMIZE) build config/default | sed 's|MOVER_IMAGE_PLACEHOLDER|${MOVER_IMG}|g; s|CEPH_CSI_CONFIG_NAME_PLACEHOLDER|${CEPH_CSI_CONFIG_NAME}|g; s|CEPH_CSI_CONFIG_NAMESPACE_PLACEHOLDER|${CEPH_CSI_CONFIG_NAMESPACE}|g' > dist/install.yaml ##@ Deployment @@ -313,7 +318,7 @@ uninstall: ## Uninstall VolSync CRDs from K8s cluster. .PHONY: deploy deploy: manifests kustomize ## Deploy controller to the K8s cluster specified in ~/.kube/config. cd config/manager && $(KUSTOMIZE) edit set image controller=${IMG} - $(KUSTOMIZE) build config/default | sed 's|MOVER_IMAGE_PLACEHOLDER|${MOVER_IMG}|g' | $(KUBECTL) apply -f - + $(KUSTOMIZE) build config/default | sed 's|MOVER_IMAGE_PLACEHOLDER|${MOVER_IMG}|g; s|CEPH_CSI_CONFIG_NAME_PLACEHOLDER|${CEPH_CSI_CONFIG_NAME}|g; s|CEPH_CSI_CONFIG_NAMESPACE_PLACEHOLDER|${CEPH_CSI_CONFIG_NAMESPACE}|g' | $(KUBECTL) apply -f - .PHONY: undeploy undeploy: kustomize ## Undeploy controller from the K8s cluster specified in ~/.kube/config. Call with ignore-not-found=true to ignore resource not found errors during deletion. diff --git a/config/manager/manager.yaml b/config/manager/manager.yaml index 9d745f7e..a30730cc 100644 --- a/config/manager/manager.yaml +++ b/config/manager/manager.yaml @@ -59,6 +59,10 @@ spec: env: - name: MOVER_IMAGE value: MOVER_IMAGE_PLACEHOLDER + - name: CEPH_CSI_CONFIG_NAME + value: CEPH_CSI_CONFIG_NAME_PLACEHOLDER + - name: CEPH_CSI_CONFIG_NAMESPACE + value: CEPH_CSI_CONFIG_NAMESPACE_PLACEHOLDER ports: [] securityContext: allowPrivilegeEscalation: false @@ -81,18 +85,10 @@ spec: # More info: https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/ resources: limits: - cpu: 500m - memory: 128Mi + cpu: 1000m + memory: 512Mi requests: cpu: 10m memory: 64Mi - volumeMounts: - - name: ceph-csi-config - mountPath: /etc/ceph-csi-config - readOnly: true - volumes: - - name: ceph-csi-config - configMap: - name: ceph-csi-config serviceAccountName: controller-manager terminationGracePeriodSeconds: 10 diff --git a/config/manifests/bases/ceph-volsync-plugin-operator.clusterserviceversion.yaml b/config/manifests/bases/ceph-volsync-plugin-operator.clusterserviceversion.yaml index 6a70819b..e0fdd459 100644 --- a/config/manifests/bases/ceph-volsync-plugin-operator.clusterserviceversion.yaml +++ b/config/manifests/bases/ceph-volsync-plugin-operator.clusterserviceversion.yaml @@ -8,7 +8,22 @@ metadata: namespace: placeholder spec: apiservicedefinitions: {} - customresourcedefinitions: {} + customresourcedefinitions: + required: + - description: |- + A ReplicationDestination is a VolSync resource that you can use to define the destination of a VolSync replication + or synchronization. + displayName: Replication Destination + kind: ReplicationDestination + name: replicationdestinations.volsync.backube + version: v1alpha1 + - description: |- + A ReplicationSource is a VolSync resource that you can use to define the source PVC and replication mover type, + enabling you to replicate or synchronize PVC data to a remote location. + displayName: Replication Source + kind: ReplicationSource + name: replicationsources.volsync.backube + version: v1alpha1 description: Ceph VolSync Plugin Operator acts on VolSync's Custom Resources but uses Ceph's internal mechanisms to perform significantly more efficient data replication. displayName: Ceph VolSync Plugin Operator diff --git a/internal/ceph/config/csiconfig.go b/internal/ceph/config/csiconfig.go index d4fc18f7..612d6ca3 100644 --- a/internal/ceph/config/csiconfig.go +++ b/internal/ceph/config/csiconfig.go @@ -71,20 +71,14 @@ const ( } }] */ -func readClusterInfo(pathToConfig, clusterID string) (*kubernetes.ClusterInfo, error) { +// ReadClusterInfoFromData parses raw JSON config data and returns the +// ClusterInfo for the given clusterID. Use this when the config content +// is already available (e.g. fetched from a ConfigMap via the k8s API). +func ReadClusterInfoFromData(data []byte, clusterID string) (*kubernetes.ClusterInfo, error) { var config []kubernetes.ClusterInfo - // #nosec - content, err := os.ReadFile(pathToConfig) - if err != nil { - err = fmt.Errorf("error fetching configuration for cluster ID %q: %w", clusterID, err) - - return nil, err - } - - err = json.Unmarshal(content, &config) - if err != nil { - return nil, fmt.Errorf("unmarshal failed (%w), raw buffer response: %s", err, string(content)) + if err := json.Unmarshal(data, &config); err != nil { + return nil, fmt.Errorf("unmarshal failed (%w), raw buffer response: %s", err, string(data)) } for i := range config { @@ -96,6 +90,16 @@ func readClusterInfo(pathToConfig, clusterID string) (*kubernetes.ClusterInfo, e return nil, fmt.Errorf("%w: %q", ErrConfigNotFound, clusterID) } +func readClusterInfo(pathToConfig, clusterID string) (*kubernetes.ClusterInfo, error) { + // #nosec + content, err := os.ReadFile(pathToConfig) + if err != nil { + return nil, fmt.Errorf("error fetching configuration for cluster ID %q: %w", clusterID, err) + } + + return ReadClusterInfoFromData(content, clusterID) +} + // Mons returns a comma separated MON list from the csi config // for the given clusterID. func Mons(pathToConfig, clusterID string) (string, error) { @@ -186,3 +190,29 @@ func GetCephFSControllerPublishSecretRef(pathToConfig, clusterID string) (string return secretRef.Name, secretRef.Namespace, nil } + +// GetRBDControllerPublishSecretRefFromData returns the secret name and +// namespace for RBD controller publish operations, parsing from raw JSON data. +func GetRBDControllerPublishSecretRefFromData(data []byte, clusterID string) (string, string, error) { + cluster, err := ReadClusterInfoFromData(data, clusterID) + if err != nil { + return "", "", err + } + + secretRef := cluster.RBD.ControllerPublishSecretRef + + return secretRef.Name, secretRef.Namespace, nil +} + +// GetCephFSControllerPublishSecretRefFromData returns the secret name and +// namespace for CephFS controller publish operations, parsing from raw JSON data. +func GetCephFSControllerPublishSecretRefFromData(data []byte, clusterID string) (string, string, error) { + cluster, err := ReadClusterInfoFromData(data, clusterID) + if err != nil { + return "", "", err + } + + secretRef := cluster.CephFS.ControllerPublishSecretRef + + return secretRef.Name, secretRef.Namespace, nil +} diff --git a/internal/ceph/config/csiconfig_test.go b/internal/ceph/config/csiconfig_test.go index f08e2ce1..a3a8b301 100644 --- a/internal/ceph/config/csiconfig_test.go +++ b/internal/ceph/config/csiconfig_test.go @@ -139,6 +139,94 @@ func TestCSIConfig(t *testing.T) { } } +func TestReadClusterInfoFromData(t *testing.T) { + t.Parallel() + data := []byte(`[{"clusterID":"c1","monitors":["mon1","mon2"]},{"clusterID":"c2","monitors":["mon3"]}]`) + + info, err := ReadClusterInfoFromData(data, "c1") + if err != nil { + t.Fatalf("ReadClusterInfoFromData() error: %v", err) + } + if info.ClusterID != "c1" { + t.Errorf("ClusterID = %q, want %q", info.ClusterID, "c1") + } + if len(info.Monitors) != 2 { + t.Errorf("Monitors count = %d, want 2", len(info.Monitors)) + } + + _, err = ReadClusterInfoFromData(data, "missing") + if err == nil { + t.Error("expected error for missing clusterID") + } + + _, err = ReadClusterInfoFromData([]byte("invalid"), "c1") + if err == nil { + t.Error("expected error for invalid JSON") + } +} + +func TestGetRBDControllerPublishSecretRefFromData(t *testing.T) { + t.Parallel() + csiConfig := []cephcsi.ClusterInfo{ + { + ClusterID: "cluster-1", + RBD: cephcsi.RBD{ + ControllerPublishSecretRef: corev1.SecretReference{ + Name: "rbd-secret-1", Namespace: "ceph-csi", + }, + }, + }, + } + data, err := json.Marshal(csiConfig) + if err != nil { + t.Fatalf("json.Marshal() error: %v", err) + } + + name, ns, err := GetRBDControllerPublishSecretRefFromData(data, "cluster-1") + if err != nil { + t.Fatalf("GetRBDControllerPublishSecretRefFromData() error: %v", err) + } + if name != "rbd-secret-1" || ns != "ceph-csi" { + t.Errorf("got (%q, %q), want (%q, %q)", name, ns, "rbd-secret-1", "ceph-csi") + } + + _, _, err = GetRBDControllerPublishSecretRefFromData(data, "missing") + if err == nil { + t.Error("expected error for missing clusterID") + } +} + +func TestGetCephFSControllerPublishSecretRefFromData(t *testing.T) { + t.Parallel() + csiConfig := []cephcsi.ClusterInfo{ + { + ClusterID: "cluster-1", + CephFS: cephcsi.CephFS{ + ControllerPublishSecretRef: corev1.SecretReference{ + Name: "cephfs-secret-1", Namespace: "ceph-csi", + }, + }, + }, + } + data, err := json.Marshal(csiConfig) + if err != nil { + t.Fatalf("json.Marshal() error: %v", err) + } + + name, ns, err := GetCephFSControllerPublishSecretRefFromData(data, "cluster-1") + if err != nil { + t.Fatalf("GetCephFSControllerPublishSecretRefFromData() error: %v", err) + } + if name != "cephfs-secret-1" || ns != "ceph-csi" { + t.Errorf("got (%q, %q), want (%q, %q)", name, ns, "cephfs-secret-1", "ceph-csi") + } + + _, _, err = GetCephFSControllerPublishSecretRefFromData(data, "missing") + if err == nil { + t.Error("expected error for missing clusterID") + } +} + func TestGetRBDControllerPublishSecretRef(t *testing.T) { t.Parallel() tests := []struct { diff --git a/internal/mover/ceph/builder.go b/internal/mover/ceph/builder.go index 46cd0bb4..b12eaec7 100644 --- a/internal/mover/ceph/builder.go +++ b/internal/mover/ceph/builder.go @@ -19,6 +19,7 @@ package mover import ( "flag" "fmt" + "os" "strings" volsyncv1alpha1 "github.com/backube/volsync/api/v1alpha1" @@ -48,6 +49,12 @@ const ( cephContainerImageFlag = "mover-image" cephContainerImageEnvVar = "MOVER_IMAGE" + // CSI config env vars for the operator to locate the ceph-csi ConfigMap + csiConfigNameEnvVar = "CEPH_CSI_CONFIG_NAME" + csiConfigNamespaceEnvVar = "CEPH_CSI_CONFIG_NAMESPACE" + defaultCSIConfigName = "ceph-csi-config" + defaultCSIConfigNS = "rook-ceph" + // External parameter keys // common parameters optStorageClassName = "storageClassName" @@ -67,8 +74,10 @@ const ( // Builder implements mover.Builder for the Ceph data mover. // It detects the CSI provider from the CR and constructs Mover instances. type Builder struct { - viper *viper.Viper // For unit tests to be able to override - global viper will be used by default in Register() - flags *flag.FlagSet // For unit tests to be able to override - global flags will be used by default in Register() + viper *viper.Viper // For unit tests to be able to override - global viper will be used by default in Register() + flags *flag.FlagSet // For unit tests to be able to override - global flags will be used by default in Register() + csiConfigName string + csiConfigNamespace string } var _ mover.Builder = &Builder{} @@ -88,9 +97,19 @@ func Register() error { // newBuilder initializes a Builder with the given viper and flag set, // registering the container image flag and env var binding. func newBuilder(v *viper.Viper, flags *flag.FlagSet) (*Builder, error) { + csiName := os.Getenv(csiConfigNameEnvVar) + if csiName == "" { + csiName = defaultCSIConfigName + } + csiNS := os.Getenv(csiConfigNamespaceEnvVar) + if csiNS == "" { + csiNS = defaultCSIConfigNS + } b := &Builder{ - viper: v, - flags: flags, + viper: v, + flags: flags, + csiConfigName: csiName, + csiConfigNamespace: csiNS, } // Set default ceph container image - will be used if both command line flag and env var are not set @@ -200,24 +219,26 @@ func (rb *Builder) FromSource(cl client.Client, logger logr.Logger, nil) m := &Mover{ - client: cl, - logger: logger.WithValues("method", "Ceph"), - eventRecorder: eventRecorder, - owner: source, - vh: vh, - saHandler: saHandler, - containerImage: rb.getCephContainerImage(), - moverType: mt, - key: secretKey, - address: address, - isSource: isSource, - paused: source.Spec.Paused, - mainPVCName: &source.Spec.SourcePVC, - privileged: privileged, - sourceStatus: source.Status.RsyncTLS, - latestMoverStatus: source.Status.LatestMoverStatus, - moverConfig: volsyncv1alpha1.MoverConfig{}, - options: source.Spec.External.Parameters, + client: cl, + logger: logger.WithValues("method", "Ceph"), + eventRecorder: eventRecorder, + owner: source, + vh: vh, + saHandler: saHandler, + containerImage: rb.getCephContainerImage(), + moverType: mt, + key: secretKey, + address: address, + isSource: isSource, + paused: source.Spec.Paused, + mainPVCName: &source.Spec.SourcePVC, + privileged: privileged, + sourceStatus: source.Status.RsyncTLS, + latestMoverStatus: source.Status.LatestMoverStatus, + moverConfig: volsyncv1alpha1.MoverConfig{}, + options: source.Spec.External.Parameters, + csiConfigName: rb.csiConfigName, + csiConfigNamespace: rb.csiConfigNamespace, } m.initCached() @@ -308,24 +329,26 @@ func (rb *Builder) FromDestination(cl client.Client, logger logr.Logger, } m := &Mover{ - client: cl, - logger: logger.WithValues("method", "Ceph"), - eventRecorder: eventRecorder, - owner: destination, - vh: vh, - saHandler: saHandler, - containerImage: rb.getCephContainerImage(), - moverType: mt, - key: keySecret, - isSource: isSource, - paused: destination.Spec.Paused, - mainPVCName: destPVCPtr, - cleanupTempPVC: false, - privileged: privileged, - destStatus: destination.Status.RsyncTLS, - latestMoverStatus: destination.Status.LatestMoverStatus, - moverConfig: volsyncv1alpha1.MoverConfig{}, - options: options, + client: cl, + logger: logger.WithValues("method", "Ceph"), + eventRecorder: eventRecorder, + owner: destination, + vh: vh, + saHandler: saHandler, + containerImage: rb.getCephContainerImage(), + moverType: mt, + key: keySecret, + isSource: isSource, + paused: destination.Spec.Paused, + mainPVCName: destPVCPtr, + cleanupTempPVC: false, + privileged: privileged, + destStatus: destination.Status.RsyncTLS, + latestMoverStatus: destination.Status.LatestMoverStatus, + moverConfig: volsyncv1alpha1.MoverConfig{}, + options: options, + csiConfigName: rb.csiConfigName, + csiConfigNamespace: rb.csiConfigNamespace, } m.initCached() diff --git a/internal/mover/ceph/mover.go b/internal/mover/ceph/mover.go index 4716e757..4d87e9b1 100644 --- a/internal/mover/ceph/mover.go +++ b/internal/mover/ceph/mover.go @@ -42,7 +42,7 @@ const ( containerNameRBD = "rbd-mover" containerNameCephFS = "cephfs-mover" - // Paths for ceph-csi config mounted in the operator + // csiConfigMountPath is the mount path for ceph-csi config in mover Job pods. csiConfigMountPath = "/etc/ceph-csi-config" // Volume name for ceph-csi secret @@ -97,6 +97,10 @@ type Mover struct { cleanupTempPVC bool // options are the external parameters from the CR spec. options map[string]string + // csiConfigName is the name of the ceph-csi ConfigMap to fetch via the k8s API. + csiConfigName string + // csiConfigNamespace is the namespace of the ceph-csi ConfigMap to fetch via the k8s API. + csiConfigNamespace string // Precomputed values derived from immutable fields. // Set once via initCached() after construction. @@ -197,21 +201,25 @@ func (m *Mover) Synchronize(ctx context.Context) (mover.Result, error) { } // clusterID is extracted from storageclass - clusterID, err := m.clusterIDFromStorageClass( - ctx, dataPVC.Spec.StorageClassName, - ) + clusterID, err := m.clusterIDFromStorageClass(ctx, dataPVC.Spec.StorageClassName) + if err != nil { + return mover.InProgress(), err + } + + // Fetch ceph-csi config once for both ConfigMap and Secret reconciliation + csiConfigData, err := m.fetchCSIConfigData(ctx) if err != nil { return mover.InProgress(), err } // Ensure ceph-csi ConfigMap in owner namespace - csiConfigMapName, err := m.ensureCephCSIConfigMap(ctx, clusterID) + csiConfigMapName, err := m.ensureCephCSIConfigMap(ctx, csiConfigData) if csiConfigMapName == nil || err != nil { return mover.InProgress(), err } // Ensure ceph-csi Secret in owner namespace - csiSecretName, err := m.ensureCephCSISecret(ctx, clusterID) + csiSecretName, err := m.ensureCephCSISecret(ctx, csiConfigData, clusterID) if csiSecretName == nil || err != nil { return mover.InProgress(), err } diff --git a/internal/mover/ceph/secrets.go b/internal/mover/ceph/secrets.go index 54dd7686..981d8661 100644 --- a/internal/mover/ceph/secrets.go +++ b/internal/mover/ceph/secrets.go @@ -21,7 +21,6 @@ import ( "crypto/rand" "encoding/hex" "fmt" - "os" "github.com/backube/volsync/controllers/utils" corev1 "k8s.io/api/core/v1" @@ -102,15 +101,29 @@ func (m *Mover) ensureSecrets(ctx context.Context) (*string, error) { return &keySecret.Name, nil } -// ensureCephCSIConfigMap reads the ceph-csi config files -// from the operator's mounted ConfigMap and creates a -// per-RS/RD ConfigMap in the owner's namespace. +// fetchCSIConfigData fetches the ceph-csi ConfigMap from the configured +// namespace via the k8s API and returns its data map. +func (m *Mover) fetchCSIConfigData(ctx context.Context) (map[string]string, error) { + srcCM := &corev1.ConfigMap{} + if err := m.client.Get(ctx, client.ObjectKey{ + Name: m.csiConfigName, + Namespace: m.csiConfigNamespace, + }, srcCM); err != nil { + return nil, fmt.Errorf("failed to get csi config ConfigMap %s/%s: %w", m.csiConfigNamespace, m.csiConfigName, err) + } + + if _, ok := srcCM.Data["config.json"]; !ok { + return nil, fmt.Errorf("config.json not found in ConfigMap %s/%s", m.csiConfigNamespace, m.csiConfigName) + } + + return srcCM.Data, nil +} + +// ensureCephCSIConfigMap creates a per-RS/RD ConfigMap in the owner's +// namespace from the pre-fetched ceph-csi config data. // TODO: filter the config for only the relevant(+mapped) // clusterID instead of copying everything. -func (m *Mover) ensureCephCSIConfigMap( - ctx context.Context, - _ string, -) (*string, error) { +func (m *Mover) ensureCephCSIConfigMap(ctx context.Context, csiConfigData map[string]string) (*string, error) { cmName := m.namePrefix + "csi-config-" + m.direction + "-" + m.owner.GetName() cm := &corev1.ConfigMap{ ObjectMeta: metav1.ObjectMeta{ @@ -121,92 +134,42 @@ func (m *Mover) ensureCephCSIConfigMap( logger := m.logger.WithValues("ConfigMap", cmName) data := make(map[string]string) - - // config.json is required - configPath := csiConfigMountPath + "/config.json" - configContent, err := os.ReadFile(configPath) - if err != nil { - return nil, fmt.Errorf("failed to read %s: %w", configPath, err) - } - data["config.json"] = string(configContent) + data["config.json"] = csiConfigData["config.json"] // cluster-mapping.json is optional - mappingPath := csiConfigMountPath + - "/cluster-mapping.json" - mappingContent, err := os.ReadFile(mappingPath) - if err != nil { - if !os.IsNotExist(err) { - return nil, fmt.Errorf("failed to read %s: %w", mappingPath, err) - } - logger.Info( - "cluster-mapping.json not found, skipping", - "path", mappingPath, - ) + if mappingJSON, ok := csiConfigData["cluster-mapping.json"]; ok { + data["cluster-mapping.json"] = mappingJSON } else { - data["cluster-mapping.json"] = - string(mappingContent) + logger.Info("cluster-mapping.json not found in source ConfigMap, skipping") } - op, err := ctrlutil.CreateOrUpdate( - ctx, m.client, cm, func() error { - if err := ctrl.SetControllerReference( - m.owner, cm, m.client.Scheme(), - ); err != nil { - logger.Error( - err, - utils.ErrUnableToSetControllerRef, - ) - return err - } - utils.SetOwnedByVolSync(cm) - cm.Data = data - return nil - }, - ) + op, err := ctrlutil.CreateOrUpdate(ctx, m.client, cm, func() error { + if err := ctrl.SetControllerReference(m.owner, cm, m.client.Scheme()); err != nil { + logger.Error(err, utils.ErrUnableToSetControllerRef) + return err + } + utils.SetOwnedByVolSync(cm) + cm.Data = data + return nil + }) if err != nil { logger.Error(err, "ConfigMap reconcile failed") return nil, err } - logger.V(1).Info( - "CSI ConfigMap reconciled", "operation", op, - ) + logger.V(1).Info("CSI ConfigMap reconciled", "operation", op) return &cmName, nil } -// ensureCephCSISecret extracts clusterID from the PVC, -// looks up the ceph admin secret ref from csi config, -// fetches it, and creates a copy in the owner's namespace. -func (m *Mover) ensureCephCSISecret( - ctx context.Context, - clusterID string, -) (*string, error) { - if m.mainPVCName == nil { - return nil, fmt.Errorf("mainPVCName is not set") - } - - // Get the source PVC to find its PV - srcPVC := &corev1.PersistentVolumeClaim{ - ObjectMeta: metav1.ObjectMeta{ - Name: *m.mainPVCName, - Namespace: m.owner.GetNamespace(), - }, - } - if err := m.client.Get( - ctx, client.ObjectKeyFromObject(srcPVC), srcPVC, - ); err != nil { - return nil, fmt.Errorf( - "failed to get PVC: %w", err, - ) - } - - // Look up the secret ref from csi config - getSecretRef := config.GetCephFSControllerPublishSecretRef +// ensureCephCSISecret looks up the ceph admin secret ref from the +// pre-fetched csi config data, fetches it, and creates a copy in +// the owner's namespace. +func (m *Mover) ensureCephCSISecret(ctx context.Context, csiConfigData map[string]string, clusterID string) (*string, error) { + getSecretRef := config.GetCephFSControllerPublishSecretRefFromData if m.moverType == constant.MoverRBD { - getSecretRef = config.GetRBDControllerPublishSecretRef + getSecretRef = config.GetRBDControllerPublishSecretRefFromData } - secretName, secretNS, err := - getSecretRef(config.CsiConfigFile, clusterID) + secretName, secretNS, err := getSecretRef([]byte(csiConfigData["config.json"]), clusterID) if err != nil { return nil, fmt.Errorf( "failed to get secret ref: %w", err, diff --git a/internal/mover/ceph/secrets_test.go b/internal/mover/ceph/secrets_test.go index fb694c1b..e7c6b83d 100644 --- a/internal/mover/ceph/secrets_test.go +++ b/internal/mover/ceph/secrets_test.go @@ -104,6 +104,92 @@ func TestEnsureSecrets_AutoGen(t *testing.T) { } } +func TestFetchCSIConfigData(t *testing.T) { + t.Parallel() + srcCM := &corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: "ceph-csi-config", + Namespace: "rook-ceph", + }, + Data: map[string]string{ + "config.json": `[{"clusterID":"test-cluster","monitors":["mon1"]}]`, + "cluster-mapping.json": `[{"source":"a","dest":"b"}]`, + }, + } + m := newTestMover(t, true, constant.MoverCephFS, srcCM) + ctx := t.Context() + + data, err := m.fetchCSIConfigData(ctx) + if err != nil { + t.Fatalf("fetchCSIConfigData() error: %v", err) + } + if data["config.json"] != srcCM.Data["config.json"] { + t.Errorf("config.json = %q, want %q", data["config.json"], srcCM.Data["config.json"]) + } + if data["cluster-mapping.json"] != srcCM.Data["cluster-mapping.json"] { + t.Errorf("cluster-mapping.json = %q, want %q", data["cluster-mapping.json"], srcCM.Data["cluster-mapping.json"]) + } +} + +func TestFetchCSIConfigData_MissingConfigJSON(t *testing.T) { + t.Parallel() + srcCM := &corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: "ceph-csi-config", + Namespace: "rook-ceph", + }, + Data: map[string]string{}, + } + m := newTestMover(t, true, constant.MoverCephFS, srcCM) + ctx := t.Context() + + _, err := m.fetchCSIConfigData(ctx) + if err == nil { + t.Error("expected error for missing config.json") + } +} + +func TestFetchCSIConfigData_MissingSourceCM(t *testing.T) { + t.Parallel() + m := newTestMover(t, true, constant.MoverCephFS) + ctx := t.Context() + + _, err := m.fetchCSIConfigData(ctx) + if err == nil { + t.Error("expected error for missing source ConfigMap") + } +} + +func TestEnsureCephCSIConfigMap(t *testing.T) { + t.Parallel() + srcData := map[string]string{ + "config.json": `[{"clusterID":"test-cluster","monitors":["mon1"]}]`, + "cluster-mapping.json": `[{"source":"a","dest":"b"}]`, + } + m := newTestMover(t, true, constant.MoverCephFS) + ctx := t.Context() + + got, err := m.ensureCephCSIConfigMap(ctx, srcData) + if err != nil { + t.Fatalf("ensureCephCSIConfigMap() error: %v", err) + } + if got == nil { + t.Fatal("ensureCephCSIConfigMap() returned nil") + } + + // Verify the per-RS/RD ConfigMap was created with correct data + cm := &corev1.ConfigMap{} + if err := m.client.Get(ctx, client.ObjectKey{Name: *got, Namespace: "test-ns"}, cm); err != nil { + t.Fatalf("Get() error: %v", err) + } + if cm.Data["config.json"] != srcData["config.json"] { + t.Errorf("config.json = %q, want %q", cm.Data["config.json"], srcData["config.json"]) + } + if cm.Data["cluster-mapping.json"] != srcData["cluster-mapping.json"] { + t.Errorf("cluster-mapping.json = %q, want %q", cm.Data["cluster-mapping.json"], srcData["cluster-mapping.json"]) + } +} + func TestClusterIDFromStorageClass(t *testing.T) { t.Parallel() sc := &storagev1.StorageClass{ diff --git a/internal/mover/ceph/testhelpers_test.go b/internal/mover/ceph/testhelpers_test.go index d4a399b2..a5699dee 100644 --- a/internal/mover/ceph/testhelpers_test.go +++ b/internal/mover/ceph/testhelpers_test.go @@ -78,17 +78,19 @@ func newTestMover( pvcName := "test-pvc" m := &Mover{ - client: fakeClient, - logger: logr.Discard(), - eventRecorder: &fakeEventRecorder{}, - owner: owner, - isSource: isSource, - moverType: moverType, - mainPVCName: &pvcName, - sourceStatus: &volsyncv1alpha1.ReplicationSourceRsyncTLSStatus{}, - destStatus: &volsyncv1alpha1.ReplicationDestinationRsyncTLSStatus{}, - latestMoverStatus: &volsyncv1alpha1.MoverStatus{}, - options: map[string]string{}, + client: fakeClient, + logger: logr.Discard(), + eventRecorder: &fakeEventRecorder{}, + owner: owner, + isSource: isSource, + moverType: moverType, + mainPVCName: &pvcName, + sourceStatus: &volsyncv1alpha1.ReplicationSourceRsyncTLSStatus{}, + destStatus: &volsyncv1alpha1.ReplicationDestinationRsyncTLSStatus{}, + latestMoverStatus: &volsyncv1alpha1.MoverStatus{}, + options: map[string]string{}, + csiConfigName: "ceph-csi-config", + csiConfigNamespace: "rook-ceph", } m.initCached() diff --git a/test/e2e/helpers_test.go b/test/e2e/helpers_test.go index b7d02fc1..a28804e3 100644 --- a/test/e2e/helpers_test.go +++ b/test/e2e/helpers_test.go @@ -33,6 +33,7 @@ import ( "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/util/retry" "k8s.io/utils/ptr" "sigs.k8s.io/controller-runtime/pkg/client" @@ -284,24 +285,36 @@ func createVolumeSnapshot(ctx context.Context, name, pvcName, volumeSnapshotClas func updateManualTrigger(ctx context.Context, rsName, rdName, newID string) { By("updating manual trigger to " + newID) - rs := &volsyncv1alpha1.ReplicationSource{} - Expect(k8sClient.Get(ctx, types.NamespacedName{Name: rsName, Namespace: namespace}, rs)).To(Succeed()) - rs.Spec.Trigger.Manual = newID - Expect(k8sClient.Update(ctx, rs)).To(Succeed()) + Expect(retry.RetryOnConflict(retry.DefaultRetry, func() error { + rs := &volsyncv1alpha1.ReplicationSource{} + if err := k8sClient.Get(ctx, types.NamespacedName{Name: rsName, Namespace: namespace}, rs); err != nil { + return err + } + rs.Spec.Trigger.Manual = newID + return k8sClient.Update(ctx, rs) + })).To(Succeed()) - rd := &volsyncv1alpha1.ReplicationDestination{} - Expect(k8sClient.Get(ctx, types.NamespacedName{Name: rdName, Namespace: namespace}, rd)).To(Succeed()) - rd.Spec.Trigger.Manual = newID - Expect(k8sClient.Update(ctx, rd)).To(Succeed()) + Expect(retry.RetryOnConflict(retry.DefaultRetry, func() error { + rd := &volsyncv1alpha1.ReplicationDestination{} + if err := k8sClient.Get(ctx, types.NamespacedName{Name: rdName, Namespace: namespace}, rd); err != nil { + return err + } + rd.Spec.Trigger.Manual = newID + return k8sClient.Update(ctx, rd) + })).To(Succeed()) } func updateRSManualTrigger(ctx context.Context, rsName, newID string) { By("updating RS manual trigger to " + newID) - rs := &volsyncv1alpha1.ReplicationSource{} - Expect(k8sClient.Get(ctx, types.NamespacedName{Name: rsName, Namespace: namespace}, rs)).To(Succeed()) - rs.Spec.Trigger.Manual = newID - Expect(k8sClient.Update(ctx, rs)).To(Succeed()) + Expect(retry.RetryOnConflict(retry.DefaultRetry, func() error { + rs := &volsyncv1alpha1.ReplicationSource{} + if err := k8sClient.Get(ctx, types.NamespacedName{Name: rsName, Namespace: namespace}, rs); err != nil { + return err + } + rs.Spec.Trigger.Manual = newID + return k8sClient.Update(ctx, rs) + })).To(Succeed()) } func setRSPaused(ctx context.Context, rsName string, paused bool) { @@ -312,10 +325,14 @@ func setRSPaused(ctx context.Context, rsName string, paused bool) { By(action + " ReplicationSource " + rsName) - rs := &volsyncv1alpha1.ReplicationSource{} - Expect(k8sClient.Get(ctx, types.NamespacedName{Name: rsName, Namespace: namespace}, rs)).To(Succeed()) - rs.Spec.Paused = paused - Expect(k8sClient.Update(ctx, rs)).To(Succeed()) + Expect(retry.RetryOnConflict(retry.DefaultRetry, func() error { + rs := &volsyncv1alpha1.ReplicationSource{} + if err := k8sClient.Get(ctx, types.NamespacedName{Name: rsName, Namespace: namespace}, rs); err != nil { + return err + } + rs.Spec.Paused = paused + return k8sClient.Update(ctx, rs) + })).To(Succeed()) } // waitForSyncTime waits for RS.Status.LastSyncTime to be non-nil.