Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions controllers/keda/scaledjob_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,10 @@ func (r *ScaledJobReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
conditions := scaledJob.Status.Conditions.DeepCopy()
msg, err := r.reconcileScaledJob(ctx, reqLogger, scaledJob, &conditions)
if err != nil {
if scaling.IsTransientScalerCacheRebuildError(err) {
reqLogger.V(1).Info("Transient scaler cache error during ScaledJob reconcile, requeueing without marking Ready=False", "error", err)
return ctrl.Result{RequeueAfter: 100 * time.Millisecond}, nil
}
reqLogger.Error(err, msg)
conditions.SetReadyCondition(metav1.ConditionFalse, "ScaledJobCheckFailed", msg)
conditions.SetActiveCondition(metav1.ConditionUnknown, "UnknownState", "ScaledJob check failed")
Expand Down
11 changes: 10 additions & 1 deletion pkg/scaling/scale_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -1008,8 +1008,13 @@ func (h *scaleHandler) getScaledJobMetrics(ctx context.Context, scaledJob *kedav
logger := log.WithValues("scaledJob.Namespace", scaledJob.Namespace, "scaledJob.Name", scaledJob.Name)

cache, err := h.GetScalersCache(ctx, scaledJob)
metricscollector.RecordScaledJobError(scaledJob.Namespace, scaledJob.Name, err)
if err != nil {
if IsTransientScalerCacheRebuildError(err) {
log.V(1).Info("Transient error getting scalers cache for ScaledJob metrics",
"scaledJob.Namespace", scaledJob.Namespace, "scaledJob.Name", scaledJob.Name, "error", err)
return nil, false, []string{}
}
metricscollector.RecordScaledJobError(scaledJob.Namespace, scaledJob.Name, err)
log.Error(err, "error getting scalers cache", "scaledJob.Namespace", scaledJob.Namespace, "scaledJob.Name", scaledJob.Name)
return nil, true, []string{}
}
Expand Down Expand Up @@ -1044,6 +1049,10 @@ func (h *scaleHandler) getScaledJobMetrics(ctx context.Context, scaledJob *kedav
metricscollector.RecordScalerLatency(scaledJob.Namespace, scaledJob.Name, scalerName, scalerIndex, metricName, false, latency)
}
if err != nil {
if IsTransientScalerCacheRebuildError(err) {
scalerLogger.V(1).Info("Transient scaler metric error during cache rebuild, continuing", "error", err)
continue
}
scalerLogger.Error(err, "Error getting scaler metrics and activity, but continue")
cache.Recorder.Event(scaledJob, corev1.EventTypeWarning, eventreason.KEDAScalerFailed, err.Error())
isError = true
Expand Down
40 changes: 40 additions & 0 deletions pkg/scaling/transient_scaler_error.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
Copyright 2021 The KEDA Authors

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package scaling

import (
"errors"
"strings"
)

// IsTransientScalerCacheRebuildError reports errors that commonly occur when the scaler cache
// is being rebuilt or closed during a ScaledJob spec/generation change (for example an old Redis
// client is closed while a scale-loop goroutine still holds a stale cache pointer).
// These are expected to self-heal on the next poll or reconcile and must not flip Ready=False.
func IsTransientScalerCacheRebuildError(err error) bool {
for err != nil {
msg := err.Error()
if strings.Contains(msg, "redis: client is closed") {
return true
}
if strings.Contains(msg, "scaler with id") && strings.Contains(msg, "not found") && strings.Contains(msg, "Len = 0") {
return true
}
err = errors.Unwrap(err)
}
return false
}
45 changes: 45 additions & 0 deletions pkg/scaling/transient_scaler_error_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
Copyright 2021 The KEDA Authors

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package scaling

import (
"errors"
"fmt"
"testing"
)

func TestIsTransientScalerCacheRebuildError(t *testing.T) {
tests := []struct {
name string
err error
want bool
}{
{name: "nil", err: nil, want: false},
{name: "redis closed", err: errors.New(`redis: client is closed`), want: true},
{name: "wrapped redis closed", err: fmt.Errorf("outer: %w", errors.New(`redis: client is closed`)), want: true},
{name: "scaler len zero", err: errors.New(`scaler with id 0 not found. Len = 0`), want: true},
{name: "wrapped scaler len zero", err: fmt.Errorf("cache: %w", errors.New(`scaler with id 2 not found. Len = 0`)), want: true},
{name: "unrelated", err: errors.New("connection refused"), want: false},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := IsTransientScalerCacheRebuildError(tt.err); got != tt.want {
t.Fatalf("IsTransientScalerCacheRebuildError() = %v, want %v", got, tt.want)
}
})
}
}
Loading