diff --git a/CHANGELOG.md b/CHANGELOG.md index 06a59e6ae9d..fe1a21d95c5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -87,6 +87,7 @@ To learn more about active deprecations, we recommend checking [GitHub Discussio - **General**: Check updated status for Fallback condition instead of ScaledObject ([#7488](https://github.com/kedacore/keda/issues/7488)) - **General**: Fix int64 overflow in milli-quantity conversion for very large metric values ([#7441](https://github.com/kedacore/keda/issues/7441)) - **General**: Fix ScaledObject admission webhook to return validation error from `verifyReplicaCount`, preventing invalid ScaledObjects from being created ([#5954](https://github.com/kedacore/keda/issues/5954)) +- **General**: Handle paused scaling directly in reconciler ([#7663](https://github.com/kedacore/keda/issues/7663)) - **Azure Data Explorer Scaler**: Remove clientSecretFromEnv support ([#7554](https://github.com/kedacore/keda/pull/7554)) - **Cron Scaler**: Fix metric name generation so cron expressions with comma-separated values no longer produce invalid metric names ([#7448](https://github.com/kedacore/keda/issues/7448)) - **Forgejo Scaler**: Limit HTTP error response logging ([#7469](https://github.com/kedacore/keda/pull/7469)) diff --git a/apis/keda/v1alpha1/scaledjob_types.go b/apis/keda/v1alpha1/scaledjob_types.go index 1523e71cff0..889acfd0a8f 100644 --- a/apis/keda/v1alpha1/scaledjob_types.go +++ b/apis/keda/v1alpha1/scaledjob_types.go @@ -17,6 +17,8 @@ limitations under the License. package v1alpha1 import ( + "strconv" + batchv1 "k8s.io/api/batch/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) @@ -169,6 +171,19 @@ func (s *ScaledJob) GenerateIdentifier() string { return GenerateIdentifier("ScaledJob", s.Namespace, s.Name) } +// NeedToBePausedByAnnotation checks whether this ScaledJob should be paused based on the PausedAnnotation. +func (s *ScaledJob) NeedToBePausedByAnnotation() bool { + value, found := s.GetAnnotations()[PausedAnnotation] + if !found { + return false + } + boolVal, err := strconv.ParseBool(value) + if err != nil { + return true + } + return boolVal +} + // GetStatusConditions returns a pointer to the status conditions for in-place modification. func (s *ScaledJob) GetStatusConditions() *Conditions { return &s.Status.Conditions } diff --git a/apis/keda/v1alpha1/scaledobject_types.go b/apis/keda/v1alpha1/scaledobject_types.go index d151fec2876..b53f2f86c68 100644 --- a/apis/keda/v1alpha1/scaledobject_types.go +++ b/apis/keda/v1alpha1/scaledobject_types.go @@ -278,6 +278,21 @@ func getBoolAnnotation(so *ScaledObject, annotation string) bool { return boolVal } +// GetPausedReplicaCount returns the paused replica count from the annotation, nil if not present. +func (so *ScaledObject) GetPausedReplicaCount() (*int32, error) { + if so.Annotations != nil { + if val, ok := so.Annotations[PausedReplicasAnnotation]; ok { + conv, err := strconv.ParseInt(val, 10, 32) + if err != nil { + return nil, err + } + count := int32(conv) + return &count, nil + } + } + return nil, nil +} + // IsUsingModifiers determines whether scalingModifiers are defined or not func (so *ScaledObject) IsUsingModifiers() bool { return so.Spec.Advanced != nil && !reflect.DeepEqual(so.Spec.Advanced.ScalingModifiers, ScalingModifiers{}) diff --git a/apis/keda/v1alpha1/scaledobject_types_test.go b/apis/keda/v1alpha1/scaledobject_types_test.go index 756e2e59bd7..a97262867d9 100644 --- a/apis/keda/v1alpha1/scaledobject_types_test.go +++ b/apis/keda/v1alpha1/scaledobject_types_test.go @@ -643,6 +643,92 @@ func TestGetHPAReplicas(t *testing.T) { } } +func TestGetPausedReplicaCount(t *testing.T) { + tests := []struct { + name string + so *ScaledObject + wantNil bool + wantValue int32 + wantErr bool + }{ + { + name: "nil annotations returns nil", + so: &ScaledObject{}, + wantNil: true, + }, + { + name: "annotation absent returns nil", + so: func() *ScaledObject { + so := &ScaledObject{} + so.Annotations = map[string]string{"other-annotation": "value"} + return so + }(), + wantNil: true, + }, + { + name: "valid integer annotation returns pointer to value", + so: func() *ScaledObject { + so := &ScaledObject{} + so.Annotations = map[string]string{ + PausedReplicasAnnotation: "5", + } + return so + }(), + wantNil: false, + wantValue: 5, + }, + { + name: "zero value annotation returns pointer to zero", + so: func() *ScaledObject { + so := &ScaledObject{} + so.Annotations = map[string]string{ + PausedReplicasAnnotation: "0", + } + return so + }(), + wantNil: false, + wantValue: 0, + }, + { + name: "invalid string annotation returns error", + so: func() *ScaledObject { + so := &ScaledObject{} + so.Annotations = map[string]string{ + PausedReplicasAnnotation: "not-a-number", + } + return so + }(), + wantErr: true, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result, err := tt.so.GetPausedReplicaCount() + if tt.wantErr { + if err == nil { + t.Error("Expected error but got nil") + } + return + } + if err != nil { + t.Errorf("Expected no error but got: %v", err) + return + } + if tt.wantNil { + if result != nil { + t.Errorf("Expected nil but got %d", *result) + } + } else { + if result == nil { + t.Error("Expected non-nil result but got nil") + } else if *result != tt.wantValue { + t.Errorf("Expected %d but got %d", tt.wantValue, *result) + } + } + }) + } +} + // Helper function to check if a string contains a substring func contains(s, substr string) bool { return strings.Contains(s, substr) diff --git a/controllers/keda/hpa.go b/controllers/keda/hpa.go index 3314444faf2..5c2533f592d 100644 --- a/controllers/keda/hpa.go +++ b/controllers/keda/hpa.go @@ -33,7 +33,6 @@ import ( kedav1alpha1 "github.com/kedacore/keda/v2/apis/keda/v1alpha1" kedacontrollerutil "github.com/kedacore/keda/v2/controllers/keda/util" - "github.com/kedacore/keda/v2/pkg/scaling/executor" kedastatus "github.com/kedacore/keda/v2/pkg/status" version "github.com/kedacore/keda/v2/version" ) @@ -162,19 +161,6 @@ func (r *ScaledObjectReconciler) newHPAForScaledObject(ctx context.Context, logg minReplicas := scaledObject.GetHPAMinReplicas() maxReplicas := scaledObject.GetHPAMaxReplicas() - pausedCount, err := executor.GetPausedReplicaCount(scaledObject) - if err != nil { - return nil, err - } - if pausedCount != nil { - // MinReplicas on HPA can't be 0 - if *pausedCount == 0 { - *pausedCount = 1 - } - minReplicas = pausedCount - maxReplicas = *pausedCount - } - hpa := &autoscalingv2.HorizontalPodAutoscaler{ Spec: autoscalingv2.HorizontalPodAutoscalerSpec{ MinReplicas: minReplicas, diff --git a/controllers/keda/scaledjob_controller.go b/controllers/keda/scaledjob_controller.go index 755a85e33fb..2812c3310b0 100755 --- a/controllers/keda/scaledjob_controller.go +++ b/controllers/keda/scaledjob_controller.go @@ -227,35 +227,29 @@ func (r *ScaledJobReconciler) reconcileScaledJob(ctx context.Context, logger log return "ScaledJob is defined correctly and is ready to scaling", nil } -// checkIfPaused checks the presence of "autoscaling.keda.sh/paused" annotation on the scaledJob and stop the scale loop. +// checkIfPaused checks the presence of "autoscaling.keda.sh/paused" annotation on the scaledJob and stops the scale loop. func (r *ScaledJobReconciler) checkIfPaused(ctx context.Context, logger logr.Logger, scaledJob *kedav1alpha1.ScaledJob, conditions *kedav1alpha1.Conditions) (bool, error) { - pausedAnnotationValue, pausedAnnotation := scaledJob.GetAnnotations()[kedav1alpha1.PausedAnnotation] - pausedStatus := conditions.GetPausedCondition().Status == metav1.ConditionTrue - shouldPause := false - if pausedAnnotation { - var err error - shouldPause, err = strconv.ParseBool(pausedAnnotationValue) - if err != nil { - shouldPause = true - } - } + shouldPause := scaledJob.NeedToBePausedByAnnotation() + isPausedInStatus := conditions.GetPausedCondition().Status == metav1.ConditionTrue + if shouldPause { - if !pausedStatus { - logger.Info("ScaledJob is paused, stopping scaling loop.") + // Set Paused condition before stopping scale loop to prevent races with new reconciles + if !isPausedInStatus { msg := kedav1alpha1.ScaledJobConditionPausedMessage - if err := r.stopScaleLoop(ctx, logger, scaledJob); err != nil { - msg = "failed to stop the scale loop for paused ScaledJob" - conditions.SetPausedCondition(metav1.ConditionFalse, "ScaledJobStopScaleLoopFailed", msg) - r.EventEmitter.Emit(scaledJob, scaledJob.Namespace, corev1.EventTypeWarning, eventingv1alpha1.ScaledJobFailedType, eventreason.ScaledJobPauseFailed, msg) + conditions.SetPausedCondition(metav1.ConditionTrue, kedav1alpha1.ScaledJobConditionPausedReason, msg) + if err := kedastatus.SetStatusConditions(ctx, r.Client, logger, scaledJob, conditions); err != nil { return false, err } - conditions.SetPausedCondition(metav1.ConditionTrue, kedav1alpha1.ScaledJobConditionPausedReason, msg) r.EventEmitter.Emit(scaledJob, scaledJob.Namespace, corev1.EventTypeNormal, eventingv1alpha1.ScaledJobPausedType, eventreason.ScaledJobPaused, msg) } + + if err := r.stopScaleLoop(ctx, logger, scaledJob); err != nil { + return false, err + } return true, nil } - if pausedStatus { - logger.Info("Unpausing ScaledJob.") + + if isPausedInStatus { msg := kedav1alpha1.ScaledJobConditionUnpausedMessage conditions.SetPausedCondition(metav1.ConditionFalse, kedav1alpha1.ScaledJobConditionUnpausedReason, msg) r.EventEmitter.Emit(scaledJob, scaledJob.Namespace, corev1.EventTypeNormal, eventingv1alpha1.ScaledJobUnpausedType, eventreason.ScaledJobUnpaused, msg) diff --git a/controllers/keda/scaledobject_controller.go b/controllers/keda/scaledobject_controller.go index 55e292780e5..f94ebb6b071 100755 --- a/controllers/keda/scaledobject_controller.go +++ b/controllers/keda/scaledobject_controller.go @@ -19,7 +19,7 @@ package keda import ( "context" "fmt" - "strconv" + "reflect" "sync" "github.com/go-logr/logr" @@ -242,53 +242,26 @@ func (r *ScaledObjectReconciler) reconcileScaledObject(ctx context.Context, logg switch { case needsToPause: - scaledToPausedCount := true - if isPausedInStatus { - // If scaledobject is in paused condition but replica count is not equal to paused replica count, the following scaling logic needs to be trigger again. - scaledToPausedCount = r.checkIfTargetResourceReachPausedCount(ctx, logger, scaledObject) - if scaledToPausedCount { - return kedav1alpha1.ScaledObjectConditionReadySuccessMessage, nil - } - } + return r.handlePause(ctx, logger, scaledObject, conditions, isPausedInStatus) - if scaledToPausedCount { - msg := kedav1alpha1.ScaledObjectConditionPausedMessage - - if !isPausedInStatus { - // Write status FIRST before any operations that might trigger new reconciles - logger.Info("Setting Paused condition before stopping scale loop") - conditions.SetPausedCondition(metav1.ConditionTrue, kedav1alpha1.ScaledObjectConditionPausedReason, msg) - if err := kedastatus.SetStatusConditions(ctx, r.Client, logger, scaledObject, conditions); err != nil { - return "failed to update paused status", err - } - } - - // Now safe to perform operations - any triggered reconcile will see paused status - if err := r.stopScaleLoop(ctx, logger, scaledObject); err != nil { - msg = "failed to stop the scale loop for paused ScaledObject" - return msg, err - } - if deleted, err := r.ensureHPAForScaledObjectIsDeleted(ctx, logger, scaledObject); !deleted { - msg = "failed to delete HPA for paused ScaledObject" - return msg, err - } - - // Condition already set above, just ensure it's still set in conditions object - conditions.SetPausedCondition(metav1.ConditionTrue, kedav1alpha1.ScaledObjectConditionPausedReason, msg) - if !isPausedInStatus { - r.EventEmitter.Emit(scaledObject, scaledObject.Namespace, corev1.EventTypeNormal, eventingv1alpha1.ScaledObjectPausedType, eventreason.ScaledObjectPaused, kedav1alpha1.ScaledObjectConditionPausedMessage) - } - return msg, nil - } case scaledObject.NeedToPauseScaleIn() || scaledObject.NeedToPauseScaleOut(): conditions.SetPausedCondition(metav1.ConditionTrue, kedav1alpha1.ScaledObjectConditionPausedReason, kedav1alpha1.ScaledObjectConditionPausedMessage) if !isPausedInStatus { r.EventEmitter.Emit(scaledObject, scaledObject.Namespace, corev1.EventTypeNormal, eventingv1alpha1.ScaledObjectPausedType, eventreason.ScaledObjectPaused, kedav1alpha1.ScaledObjectConditionPausedMessage) } + case isPausedInStatus: unpausedMessage := "pause annotation removed for ScaledObject" conditions.SetPausedCondition(metav1.ConditionFalse, "ScaledObjectUnpaused", unpausedMessage) r.EventEmitter.Emit(scaledObject, scaledObject.Namespace, corev1.EventTypeNormal, eventingv1alpha1.ScaledObjectUnpausedType, eventreason.ScaledObjectUnpaused, unpausedMessage) + // Explicitly clear PausedReplicaCount from status + if scaledObject.Status.PausedReplicaCount != nil { + status := scaledObject.Status.DeepCopy() + status.PausedReplicaCount = nil + if err := kedastatus.UpdateScaledObjectStatus(ctx, r.Client, logger, scaledObject, status); err != nil { + return "failed to clear paused replica count status", err + } + } } // Check scale target Name is specified @@ -347,9 +320,6 @@ func (r *ScaledObjectReconciler) reconcileScaledObject(ctx context.Context, logg } logger.Info("Initializing Scaling logic according to ScaledObject Specification") } - if scaledObject.NeedToBePausedByAnnotation() && conditions.GetPausedCondition().Status != metav1.ConditionTrue { - return "ScaledObject paused replicas are being scaled", fmt.Errorf("ScaledObject paused replicas are being scaled") - } return kedav1alpha1.ScaledObjectConditionReadySuccessMessage, nil } @@ -370,32 +340,89 @@ func (r *ScaledObjectReconciler) ensureScaledObjectLabel(ctx context.Context, lo return r.Client.Update(ctx, scaledObject) } -func (r *ScaledObjectReconciler) checkIfTargetResourceReachPausedCount(ctx context.Context, logger logr.Logger, scaledObject *kedav1alpha1.ScaledObject) bool { - pausedReplicaCount, pausedReplicasAnnotationFound := scaledObject.GetAnnotations()[kedav1alpha1.PausedReplicasAnnotation] - if !pausedReplicasAnnotationFound { - return true +// handlePause stops scaling and sets the target to the paused replica count if specified. +func (r *ScaledObjectReconciler) handlePause(ctx context.Context, logger logr.Logger, scaledObject *kedav1alpha1.ScaledObject, conditions *kedav1alpha1.Conditions, isPausedInStatus bool) (string, error) { + msg := kedav1alpha1.ScaledObjectConditionPausedMessage + + // Set Paused condition before any operations to prevent races with new reconciles + if !isPausedInStatus { + logger.Info("Setting Paused condition before stopping scale loop") + conditions.SetPausedCondition(metav1.ConditionTrue, kedav1alpha1.ScaledObjectConditionPausedReason, msg) + if err := kedastatus.SetStatusConditions(ctx, r.Client, logger, scaledObject, conditions); err != nil { + return "failed to update paused status", err + } + r.EventEmitter.Emit(scaledObject, scaledObject.Namespace, corev1.EventTypeNormal, eventingv1alpha1.ScaledObjectPausedType, eventreason.ScaledObjectPaused, msg) + } + + if err := r.stopScaleLoop(ctx, logger, scaledObject); err != nil { + return "failed to stop the scale loop for paused ScaledObject", err + } + + if deleted, err := r.ensureHPAForScaledObjectIsDeleted(ctx, logger, scaledObject); !deleted { + return "failed to delete HPA for paused ScaledObject", err + } + + if err := r.scaleTargetToPausedCount(ctx, logger, scaledObject); err != nil { + return "failed to scale target to paused replica count", err + } + + if err := r.syncPausedReplicaCountStatus(ctx, logger, scaledObject); err != nil { + return "failed to update paused replica count status", err } - pausedReplicaCountNum, err := strconv.ParseInt(pausedReplicaCount, 10, 32) + + conditions.SetPausedCondition(metav1.ConditionTrue, kedav1alpha1.ScaledObjectConditionPausedReason, msg) + return msg, nil +} + +// syncPausedReplicaCountStatus updates PausedReplicaCount in status if it differs from the annotation. +func (r *ScaledObjectReconciler) syncPausedReplicaCountStatus(ctx context.Context, logger logr.Logger, scaledObject *kedav1alpha1.ScaledObject) error { + pausedCount, err := scaledObject.GetPausedReplicaCount() + if err != nil { + return fmt.Errorf("failed to parse paused replica count: %w", err) + } + if !reflect.DeepEqual(scaledObject.Status.PausedReplicaCount, pausedCount) { + status := scaledObject.Status.DeepCopy() + status.PausedReplicaCount = pausedCount + if err := kedastatus.UpdateScaledObjectStatus(ctx, r.Client, logger, scaledObject, status); err != nil { + return err + } + } + return nil +} + +// scaleTargetToPausedCount scales the target directly to the paused replica count, no-op if only paused=true is set. +func (r *ScaledObjectReconciler) scaleTargetToPausedCount(ctx context.Context, logger logr.Logger, scaledObject *kedav1alpha1.ScaledObject) error { + pausedCount, err := scaledObject.GetPausedReplicaCount() if err != nil { - return true + return fmt.Errorf("failed to parse paused replica count: %w", err) + } + if pausedCount == nil { + // paused=true without specific count — freeze at current replicas, nothing to scale + return nil } gvkr, err := kedav1alpha1.ParseGVKR(r.restMapper, scaledObject.Spec.ScaleTargetRef.APIVersion, scaledObject.Spec.ScaleTargetRef.Kind) if err != nil { - logger.Error(err, "failed to parse Group, Version, Kind, Resource", "apiVersion", scaledObject.Spec.ScaleTargetRef.APIVersion, "kind", scaledObject.Spec.ScaleTargetRef.Kind) - return true + return fmt.Errorf("failed to parse Group, Version, Kind, Resource: %w", err) } - gvkString := gvkr.GVKString() - logger.V(1).Info("Parsed Group, Version, Kind, Resource", "GVK", gvkString, "Resource", gvkr.Resource) - // check if we already know. - var scale *autoscalingv1.Scale gr := gvkr.GroupResource() - scale, errScale := (r.ScaleClient).Scales(scaledObject.Namespace).Get(ctx, gr, scaledObject.Spec.ScaleTargetRef.Name, metav1.GetOptions{}) - if errScale != nil { - return true + scale, err := r.ScaleClient.Scales(scaledObject.Namespace).Get(ctx, gr, scaledObject.Spec.ScaleTargetRef.Name, metav1.GetOptions{}) + if err != nil { + return fmt.Errorf("failed to get scale for target resource: %w", err) } - return scale.Spec.Replicas == int32(pausedReplicaCountNum) + + if scale.Spec.Replicas == *pausedCount { + return nil + } + + logger.Info("Scaling target to paused replica count", "currentReplicas", scale.Spec.Replicas, "pausedReplicas", *pausedCount) + scale.Spec.Replicas = *pausedCount + _, err = r.ScaleClient.Scales(scaledObject.Namespace).Update(ctx, gr, scale, metav1.UpdateOptions{}) + if err != nil { + return fmt.Errorf("failed to scale target to paused replica count: %w", err) + } + return nil } // checkTargetResourceIsScalable checks if resource targeted for scaling exists and exposes /scale subresource diff --git a/pkg/scaling/executor/scale_executor_helpers_test.go b/pkg/scaling/executor/scale_executor_helpers_test.go index 201a25a6bdb..d41bfe143e1 100644 --- a/pkg/scaling/executor/scale_executor_helpers_test.go +++ b/pkg/scaling/executor/scale_executor_helpers_test.go @@ -83,72 +83,6 @@ func TestGetIdleOrMinimumReplicaCount(t *testing.T) { } } -func TestGetPausedReplicaCount(t *testing.T) { - tests := []struct { - name string - scaledObject *kedav1alpha1.ScaledObject - wantNil bool - wantValue int32 - wantErr bool - }{ - { - name: "nil annotations returns nil", - scaledObject: &kedav1alpha1.ScaledObject{ - Spec: kedav1alpha1.ScaledObjectSpec{}, - }, - wantNil: true, - }, - { - name: "annotation absent returns nil", - scaledObject: func() *kedav1alpha1.ScaledObject { - so := &kedav1alpha1.ScaledObject{} - so.Annotations = map[string]string{"other-annotation": "value"} - return so - }(), - wantNil: true, - }, - { - name: "valid integer annotation returns pointer to value", - scaledObject: func() *kedav1alpha1.ScaledObject { - so := &kedav1alpha1.ScaledObject{} - so.Annotations = map[string]string{ - kedav1alpha1.PausedReplicasAnnotation: "5", - } - return so - }(), - wantNil: false, - wantValue: 5, - }, - { - name: "invalid string annotation returns error", - scaledObject: func() *kedav1alpha1.ScaledObject { - so := &kedav1alpha1.ScaledObject{} - so.Annotations = map[string]string{ - kedav1alpha1.PausedReplicasAnnotation: "not-a-number", - } - return so - }(), - wantErr: true, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - result, err := GetPausedReplicaCount(tt.scaledObject) - if tt.wantErr { - assert.Error(t, err) - return - } - assert.NoError(t, err) - if tt.wantNil { - assert.Nil(t, result) - } else { - assert.NotNil(t, result) - assert.Equal(t, tt.wantValue, *result) - } - }) - } -} - func TestIsJobFinished(t *testing.T) { e := &scaleExecutor{} diff --git a/pkg/scaling/executor/scale_scaledobjects.go b/pkg/scaling/executor/scale_scaledobjects.go index 4b406a80698..bae3b1b756e 100644 --- a/pkg/scaling/executor/scale_scaledobjects.go +++ b/pkg/scaling/executor/scale_scaledobjects.go @@ -19,7 +19,6 @@ package executor import ( "context" "fmt" - "strconv" "strings" "time" @@ -49,8 +48,8 @@ func (e *scaleExecutor) RequestScale(ctx context.Context, scaledObject *kedav1al return result } - // check if SO is paused, and if it is then update the scale to the desired count and return early without performing any other checks or updates - if e.handlePaused(ctx, scaledObject, currentReplicas, &result) { + // Return early if paused to skip normal scaling logic + if e.handlePaused(scaledObject, &result) { return result } // if scaledObject.Spec.MinReplicaCount is not set, then set the default value (0) @@ -242,32 +241,13 @@ func (e *scaleExecutor) updateScaleOnScaleTarget(ctx context.Context, scaledObje return currentReplicas, err } -// handlePaused checks if the ScaledObject is paused and if it is, it scales the target to the paused replica count and returns true. If the ScaledObject is not paused, it returns false. -func (e *scaleExecutor) handlePaused(ctx context.Context, scaledObject *kedav1alpha1.ScaledObject, currentReplicas int32, scaleResult *ScaleResult) bool { - pausedCount, err := GetPausedReplicaCount(scaledObject) - if err != nil { - scaleResult.Conditions.SetReadyCondition(metav1.ConditionFalse, "ErrorPausing", fmt.Sprintf("Failed to pause: %v", err)) - scaleResult.Conditions.SetPausedCondition(metav1.ConditionTrue, "ErrorGettingPausedReplicas", fmt.Sprintf("Paused annotation present but failed to determine paused replica count: %v", err)) - scaleResult.Error = fmt.Errorf("paused annotation present but failed to determine paused replica count: %w", err) +// handlePaused skips normal scaling logic while the ScaledObject is paused. +func (e *scaleExecutor) handlePaused(scaledObject *kedav1alpha1.ScaledObject, scaleResult *ScaleResult) bool { + if scaledObject.NeedToBePausedByAnnotation() { + scaleResult.Conditions.SetPausedCondition(metav1.ConditionTrue, "ScaledObjectPaused", "ScaledObject is paused") return true } - if pausedCount == nil { - // ScaledObject is not paused by replica annotation; the controller manages the Paused condition lifecycle, so we leave it untouched here. - return false - } - - // ScaledObject is paused, scale to the paused replica count and set status accordingly - scaleResult.PauseReplicas = pausedCount - if *pausedCount != currentReplicas { - if _, err := e.updateScaleOnScaleTarget(ctx, scaledObject, *pausedCount); err != nil { - scaleResult.Conditions.SetReadyCondition(metav1.ConditionFalse, "ErrorScalingToPausedReplicas", fmt.Sprintf("ScaledObject is paused but failed to scale to paused replica count: %v", err)) - scaleResult.Conditions.SetPausedCondition(metav1.ConditionTrue, "ScaledObjectPaused", "ScaledObject is paused") - scaleResult.Error = fmt.Errorf("ScaledObject is paused but failed to scale to paused replica count: %w", err) - return true - } - } - scaleResult.Conditions.SetPausedCondition(metav1.ConditionTrue, "ScaledObjectPaused", "ScaledObject is paused") - return true + return false } // getIdleOrMinimumReplicaCount returns true if the second value returned is from IdleReplicaCount @@ -283,19 +263,3 @@ func getIdleOrMinimumReplicaCount(scaledObject *kedav1alpha1.ScaledObject) (bool return false, *scaledObject.Spec.MinReplicaCount } - -// GetPausedReplicaCount returns the paused replica count of the ScaledObject. -// If not paused, it returns nil. -func GetPausedReplicaCount(scaledObject *kedav1alpha1.ScaledObject) (*int32, error) { - if scaledObject.Annotations != nil { - if val, ok := scaledObject.Annotations[kedav1alpha1.PausedReplicasAnnotation]; ok { - conv, err := strconv.ParseInt(val, 10, 32) - if err != nil { - return nil, err - } - count := int32(conv) - return &count, nil - } - } - return nil, nil -} diff --git a/pkg/scaling/executor/scale_scaledobjects_test.go b/pkg/scaling/executor/scale_scaledobjects_test.go index 3cc7208a44a..5bb59fc0ae5 100644 --- a/pkg/scaling/executor/scale_scaledobjects_test.go +++ b/pkg/scaling/executor/scale_scaledobjects_test.go @@ -18,7 +18,6 @@ package executor import ( "context" - "strconv" "testing" "github.com/stretchr/testify/assert" @@ -329,19 +328,15 @@ func TestScaleToPausedReplicasCount(t *testing.T) { client := mock_client.NewMockClient(ctrl) recorder := record.NewFakeRecorder(1) mockScaleClient := mock_scale.NewMockScalesGetter(ctrl) - mockScaleInterface := mock_scale.NewMockScaleInterface(ctrl) scaleExecutor := NewScaleExecutor(client, mockScaleClient, nil, recorder) - pausedReplicaCount := int32(0) - replicaCount := int32(2) - scaledObject := v1alpha1.ScaledObject{ ObjectMeta: v1.ObjectMeta{ Name: "name", Namespace: "namespace", Annotations: map[string]string{ - "autoscaling.keda.sh/paused-replicas": strconv.Itoa(int(pausedReplicaCount)), + "autoscaling.keda.sh/paused-replicas": "0", }, }, Spec: v1alpha1.ScaledObjectSpec{ @@ -359,29 +354,24 @@ func TestScaleToPausedReplicasCount(t *testing.T) { scaledObject.Status.Conditions = *v1alpha1.GetInitializedConditions() + // GetCurrentReplicas is called before handlePaused, so we need a mock for it. + replicaCount := int32(2) client.EXPECT().Get(gomock.Any(), gomock.Any(), gomock.Any()).SetArg(2, appsv1.Deployment{ Spec: appsv1.DeploymentSpec{ Replicas: &replicaCount, }, }) - scale := &autoscalingv1.Scale{ - Spec: autoscalingv1.ScaleSpec{ - Replicas: replicaCount, - }, - } - - mockScaleClient.EXPECT().Scales(gomock.Any()).Return(mockScaleInterface).Times(2) - mockScaleInterface.EXPECT().Get(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(scale, nil) - mockScaleInterface.EXPECT().Update(gomock.Any(), gomock.Any(), gomock.Eq(scale), gomock.Any()) - result := scaleExecutor.RequestScale(context.TODO(), &scaledObject, true, false, ScaleExecutorOptions{}) - assert.Equal(t, pausedReplicaCount, scale.Spec.Replicas) - condition := result.Conditions.GetActiveCondition() - assert.Equal(t, false, condition.IsTrue()) - assert.NotNil(t, result.PauseReplicas) - assert.Equal(t, pausedReplicaCount, *result.PauseReplicas) + // PauseReplicas should not be set + assert.Nil(t, result.PauseReplicas) + // Executor should set paused condition + condition := result.Conditions.GetPausedCondition() + assert.Equal(t, true, condition.IsTrue()) + // Active condition should not be set (we returned early before checking triggers) + activeCondition := result.Conditions.GetActiveCondition() + assert.Equal(t, false, activeCondition.IsTrue()) } func TestEventWitTriggerInfo(t *testing.T) { diff --git a/pkg/scaling/scale_handler.go b/pkg/scaling/scale_handler.go index ee2f2f6d731..ef1de8c0cf9 100644 --- a/pkg/scaling/scale_handler.go +++ b/pkg/scaling/scale_handler.go @@ -326,8 +326,10 @@ func (h *scaleHandler) handleResult(ctx context.Context, obj kedav1alpha1.Scalab current.SetStatusLastActiveTime(result.LastActiveTime) } - // apply paused replica count (no-op for ScaledJob) - current.SetStatusPausedReplicaCount(result.PauseReplicas) + // Apply paused replica count only when explicitly set + if result.PauseReplicas != nil { + current.SetStatusPausedReplicaCount(result.PauseReplicas) + } // apply triggers activity delta if activityUpdates != nil || activityRemovals != nil { diff --git a/tests/internals/pause_scaledobject_explicitly/pause_scaledobject_explicitly_test.go b/tests/internals/pause_scaledobject_explicitly/pause_scaledobject_explicitly_test.go index 41d438998aa..40c9d4380be 100644 --- a/tests/internals/pause_scaledobject_explicitly/pause_scaledobject_explicitly_test.go +++ b/tests/internals/pause_scaledobject_explicitly/pause_scaledobject_explicitly_test.go @@ -4,11 +4,14 @@ package pause_scaledobject_explicitly_test import ( + "context" "fmt" "testing" "time" "github.com/stretchr/testify/assert" + "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes" . "github.com/kedacore/keda/v2/tests/helper" @@ -25,6 +28,7 @@ var ( deploymentName = fmt.Sprintf("%s-deployment", testName) monitoredDeploymentName = fmt.Sprintf("%s-monitored", testName) scaledObjectName = fmt.Sprintf("%s-so", testName) + hpaName = fmt.Sprintf("keda-hpa-%s", scaledObjectName) testScaleOutWaitMin = 1 testPauseAtNWaitMin = 1 testScaleInWaitMin = 1 @@ -127,6 +131,10 @@ func TestScaler(t *testing.T) { testPauseWhenScaleIn(t, kc) testScaleIn(t, kc, unpauseMethod) testBothPauseAnnotationActive(t, kc) + testHPANotExistWhilePaused(t, kc) + testHPANotExistWhilePausedReplicas(t, kc) + testChangePausedReplicasValue(t, kc) + testSwitchFromPausedReplicasToPaused(t, kc) // cleanup DeleteKubernetesResources(t, testNamespace, data, templates) @@ -272,3 +280,73 @@ func testBothPauseAnnotationActive(t *testing.T, kc *kubernetes.Clientset) { assert.Truef(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, 5, 60, testPauseAtNWaitMin), "replica count should be 5 after %d minute(s)", testPauseAtNWaitMin) } + +func testHPANotExistWhilePaused(t *testing.T, kc *kubernetes.Clientset) { + t.Log("--- testing HPA does not exist while paused ---") + + upsertScaledObjectPausedAnnotation(t) + time.Sleep(5 * time.Second) + + _, err := kc.AutoscalingV2().HorizontalPodAutoscalers(testNamespace).Get(context.Background(), hpaName, metav1.GetOptions{}) + assert.True(t, errors.IsNotFound(err), "HPA should not exist while paused with paused=true") + + removeScaledObjectPausedAnnotation(t) + time.Sleep(5 * time.Second) +} + +func testHPANotExistWhilePausedReplicas(t *testing.T, kc *kubernetes.Clientset) { + t.Log("--- testing HPA does not exist while paused-replicas is set ---") + + upsertScaledObjectPausedReplicasAnnotation(t, 3) + + assert.Truef(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, 3, 60, 1), + "replica count should be 3 after 1 minute") + + _, err := kc.AutoscalingV2().HorizontalPodAutoscalers(testNamespace).Get(context.Background(), hpaName, metav1.GetOptions{}) + assert.True(t, errors.IsNotFound(err), "HPA should not exist while paused with paused-replicas") + + removeScaledObjectPausedReplicasAnnotation(t) + time.Sleep(5 * time.Second) +} + +func testChangePausedReplicasValue(t *testing.T, kc *kubernetes.Clientset) { + t.Log("--- testing changing paused-replicas value while paused ---") + + upsertScaledObjectPausedReplicasAnnotation(t, 3) + assert.Truef(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, 3, 60, 1), + "replica count should be 3 after 1 minute") + + upsertScaledObjectPausedReplicasAnnotation(t, 7) + assert.Truef(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, 7, 60, 1), + "replica count should be 7 after 1 minute") + + removeScaledObjectPausedReplicasAnnotation(t) + time.Sleep(5 * time.Second) +} + +func testSwitchFromPausedReplicasToPaused(t *testing.T, kc *kubernetes.Clientset) { + t.Log("--- testing switching from paused-replicas to paused ---") + + // Ensure a stable starting state: use paused-replicas to set replicas to 5 + upsertScaledObjectPausedReplicasAnnotation(t, 5) + assert.Truef(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, 5, 60, 1), + "replica count should be 5 after 1 minute") + + // Switch: remove paused-replicas, add paused=true + removeScaledObjectPausedReplicasAnnotation(t) + upsertScaledObjectPausedAnnotation(t) + time.Sleep(5 * time.Second) + + // HPA should not exist after switch + _, err := kc.AutoscalingV2().HorizontalPodAutoscalers(testNamespace).Get(context.Background(), hpaName, metav1.GetOptions{}) + assert.True(t, errors.IsNotFound(err), "HPA should not exist after switching to paused=true") + + // Replicas should stay frozen at 5 + AssertReplicaCountNotChangeDuringTimePeriod(t, kc, deploymentName, testNamespace, 5, 30) + + // Cleanup + removeScaledObjectPausedAnnotation(t) + KubernetesScaleDeployment(t, kc, monitoredDeploymentName, 0, testNamespace) + assert.Truef(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, 0, 60, 1), + "replica count should be 0 after 1 minute") +}