diff --git a/controllers/keda/scaledjob_controller.go b/controllers/keda/scaledjob_controller.go index 755a85e33fb..fc03c69b90e 100755 --- a/controllers/keda/scaledjob_controller.go +++ b/controllers/keda/scaledjob_controller.go @@ -149,6 +149,10 @@ func (r *ScaledJobReconciler) Reconcile(ctx context.Context, req ctrl.Request) ( conditions := scaledJob.Status.Conditions.DeepCopy() msg, err := r.reconcileScaledJob(ctx, reqLogger, scaledJob, &conditions) if err != nil { + if scaling.IsTransientScalerCacheRebuildError(err) { + reqLogger.V(1).Info("Transient scaler cache error during ScaledJob reconcile, requeueing without marking Ready=False", "error", err) + return ctrl.Result{RequeueAfter: 100 * time.Millisecond}, nil + } reqLogger.Error(err, msg) conditions.SetReadyCondition(metav1.ConditionFalse, "ScaledJobCheckFailed", msg) conditions.SetActiveCondition(metav1.ConditionUnknown, "UnknownState", "ScaledJob check failed") diff --git a/pkg/scaling/scale_handler.go b/pkg/scaling/scale_handler.go index ee2f2f6d731..a954381e4fc 100644 --- a/pkg/scaling/scale_handler.go +++ b/pkg/scaling/scale_handler.go @@ -1008,8 +1008,13 @@ func (h *scaleHandler) getScaledJobMetrics(ctx context.Context, scaledJob *kedav logger := log.WithValues("scaledJob.Namespace", scaledJob.Namespace, "scaledJob.Name", scaledJob.Name) cache, err := h.GetScalersCache(ctx, scaledJob) - metricscollector.RecordScaledJobError(scaledJob.Namespace, scaledJob.Name, err) if err != nil { + if IsTransientScalerCacheRebuildError(err) { + log.V(1).Info("Transient error getting scalers cache for ScaledJob metrics", + "scaledJob.Namespace", scaledJob.Namespace, "scaledJob.Name", scaledJob.Name, "error", err) + return nil, false, []string{} + } + metricscollector.RecordScaledJobError(scaledJob.Namespace, scaledJob.Name, err) log.Error(err, "error getting scalers cache", "scaledJob.Namespace", scaledJob.Namespace, "scaledJob.Name", scaledJob.Name) return nil, true, []string{} } @@ -1044,6 +1049,10 @@ func (h 
*scaleHandler) getScaledJobMetrics(ctx context.Context, scaledJob *kedav metricscollector.RecordScalerLatency(scaledJob.Namespace, scaledJob.Name, scalerName, scalerIndex, metricName, false, latency) } if err != nil { + if IsTransientScalerCacheRebuildError(err) { + scalerLogger.V(1).Info("Transient scaler metric error during cache rebuild, continuing", "error", err) + continue + } scalerLogger.Error(err, "Error getting scaler metrics and activity, but continue") cache.Recorder.Event(scaledJob, corev1.EventTypeWarning, eventreason.KEDAScalerFailed, err.Error()) isError = true diff --git a/pkg/scaling/transient_scaler_error.go b/pkg/scaling/transient_scaler_error.go new file mode 100644 index 00000000000..0f78a6536d6 --- /dev/null +++ b/pkg/scaling/transient_scaler_error.go @@ -0,0 +1,40 @@ +/* +Copyright 2021 The KEDA Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package scaling + +import ( + "errors" + "strings" +) + +// IsTransientScalerCacheRebuildError reports errors that commonly occur when the scaler cache +// is being rebuilt or closed during a ScaledJob spec/generation change (for example an old Redis +// client is closed while a scale-loop goroutine still holds a stale cache pointer). +// These are expected to self-heal on the next poll or reconcile and must not flip Ready=False. 
// IsTransientScalerCacheRebuildError reports whether err, or any error reachable
// by unwrapping it, carries one of the messages treated as a transient scaler
// cache failure:
//
//   - "redis: client is closed"
//   - a message containing "scaler with id", "not found" and "Len = 0"
//
// Detection is a substring match on Error() text, so it depends on the exact
// wording produced by the code that creates these errors.
// Both single-error chains (Unwrap() error) and multi-error trees
// (Unwrap() []error) are searched. A nil err yields false.
func IsTransientScalerCacheRebuildError(err error) bool {
	for err != nil {
		if hasTransientScalerCacheRebuildMessage(err.Error()) {
			return true
		}
		// errors.Unwrap returns nil for errors that expose Unwrap() []error, so
		// their children have to be visited explicitly. The assertion is applied
		// to each level in turn because this loop does the unwrapping itself.
		if multi, ok := err.(interface{ Unwrap() []error }); ok { //nolint:errorlint // each chain level is inspected by the surrounding loop
			for _, child := range multi.Unwrap() {
				if IsTransientScalerCacheRebuildError(child) {
					return true
				}
			}
			return false
		}
		err = errors.Unwrap(err)
	}
	return false
}

// hasTransientScalerCacheRebuildMessage reports whether msg contains one of the
// message patterns recognised as a transient scaler cache failure.
func hasTransientScalerCacheRebuildMessage(msg string) bool {
	if strings.Contains(msg, "redis: client is closed") {
		return true
	}
	return strings.Contains(msg, "scaler with id") &&
		strings.Contains(msg, "not found") &&
		strings.Contains(msg, "Len = 0")
}

// opaqueMultiError is a test helper: a multi-error whose own message reveals
// nothing about the errors it wraps, so they are reachable only via Unwrap.
type opaqueMultiError []error

// Error returns a fixed message that matches none of the transient patterns.
func (e opaqueMultiError) Error() string { return "multiple errors" }

// Unwrap exposes the wrapped errors for tree traversal.
func (e opaqueMultiError) Unwrap() []error { return e }

// TestIsTransientScalerCacheRebuildError checks direct, wrapped and multi-error
// inputs against the expected transient / non-transient classification.
func TestIsTransientScalerCacheRebuildError(t *testing.T) {
	tests := []struct {
		name string
		err  error
		want bool
	}{
		{name: "nil", err: nil, want: false},
		{name: "redis closed", err: errors.New(`redis: client is closed`), want: true},
		{name: "wrapped redis closed", err: fmt.Errorf("outer: %w", errors.New(`redis: client is closed`)), want: true},
		{name: "scaler len zero", err: errors.New(`scaler with id 0 not found. Len = 0`), want: true},
		{name: "wrapped scaler len zero", err: fmt.Errorf("cache: %w", errors.New(`scaler with id 2 not found. Len = 0`)), want: true},
		{name: "unrelated", err: errors.New("connection refused"), want: false},
		{name: "scaler missing from non-empty cache", err: errors.New(`scaler with id 4 not found. Len = 3`), want: false},
		{
			name: "multi-error hiding redis closed",
			err:  opaqueMultiError{errors.New("connection refused"), errors.New(`redis: client is closed`)},
			want: true,
		},
		{
			name: "wrapped multi-error hiding scaler len zero",
			err:  fmt.Errorf("outer: %w", opaqueMultiError{errors.New(`scaler with id 1 not found. Len = 0`)}),
			want: true,
		},
		{name: "multi-error unrelated", err: opaqueMultiError{errors.New("connection refused")}, want: false},
		{name: "empty multi-error", err: opaqueMultiError{}, want: false},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			if got := IsTransientScalerCacheRebuildError(tt.err); got != tt.want {
				t.Fatalf("IsTransientScalerCacheRebuildError() = %v, want %v", got, tt.want)
			}
		})
	}
}