diff --git a/pkg/controllers/common/task_observe.go b/pkg/controllers/common/task_observe.go new file mode 100644 index 00000000000..e4713d54216 --- /dev/null +++ b/pkg/controllers/common/task_observe.go @@ -0,0 +1,58 @@ +// Copyright 2024 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package common + +import ( + "context" + + coreutil "github.com/pingcap/tidb-operator/v2/pkg/apiutil/core/v1alpha1" + "github.com/pingcap/tidb-operator/v2/pkg/metrics" + "github.com/pingcap/tidb-operator/v2/pkg/runtime" + "github.com/pingcap/tidb-operator/v2/pkg/runtime/scope" + "github.com/pingcap/tidb-operator/v2/pkg/utils/task/v3" +) + +// TaskObserveInstance refreshes the AbnormalInstance gauge for the reconciled +// instance every time the pipeline runs, and clears it when the instance CR +// has been removed from the API server (state.Object() == nil). +// +// It mirrors the shape of TaskTrack so observe and clear live in one place: +// the same branch that decides whether the object still exists also decides +// whether to publish or retract the gauge. Placing this after TaskTrack and +// before the `CondObjectHasBeenDeleted` IfBreak means it always runs once +// per reconcile, including the reconcile triggered by the informer's DELETE +// event - covering graceful delete, force-delete that strips finalizers, and +// any other path that lands in the watch stream. +func TaskObserveInstance[ + S scope.Instance[F, T], + F Object[P], + T runtime.Instance, + P any, +](state TrackState[F]) task.Task { + return task.NameTaskFunc("ObserveInstance", func(context.Context) task.Result { + obj := state.Object() + if obj == nil { + key := state.Key() + // scope.Component[S]() is a compile-time constant for the reconcile + // kind, so we can qualify the sweep even after the CR is gone and + // its labels are no longer readable. + metrics.ClearInstanceConditionMetricsByKey(key.Namespace, scope.Component[S](), key.Name) + return task.Complete().With("cleared metrics for deleted %s", key) + } + conds := coreutil.StatusConditions[S](obj) + metrics.ObserveConditions(obj, conds) + return task.Complete().With("observed metrics for %s/%s", obj.GetNamespace(), obj.GetName()) + }) +} diff --git a/pkg/controllers/common/task_observe_test.go b/pkg/controllers/common/task_observe_test.go new file mode 100644 index 00000000000..9e525d3e9c2 --- /dev/null +++ b/pkg/controllers/common/task_observe_test.go @@ -0,0 +1,109 @@ +// Copyright 2024 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package common + +import ( + "context" + "testing" + + "github.com/prometheus/client_golang/prometheus" + dto "github.com/prometheus/client_model/go" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + "github.com/pingcap/tidb-operator/api/v2/core/v1alpha1" + "github.com/pingcap/tidb-operator/v2/pkg/metrics" + "github.com/pingcap/tidb-operator/v2/pkg/runtime/scope" + "github.com/pingcap/tidb-operator/v2/pkg/utils/fake" + "github.com/pingcap/tidb-operator/v2/pkg/utils/task/v3" +) + +// hasGaugeSample returns true if the AbnormalInstance gauge has any sample +// whose (namespace, instance) labels match the inputs. +func hasGaugeSample(t *testing.T, namespace, instance string) bool { + t.Helper() + ch := make(chan prometheus.Metric, 32) + metrics.AbnormalInstance.Collect(ch) + close(ch) + for m := range ch { + dm := &dto.Metric{} + if err := m.Write(dm); err != nil { + continue + } + labels := map[string]string{} + for _, lp := range dm.GetLabel() { + labels[lp.GetName()] = lp.GetValue() + } + if labels["namespace"] == namespace && labels["instance"] == instance { + return true + } + } + return false +} + +func TestTaskObserveInstance_ObservesConditions(t *testing.T) { + const ns, name = "ns-observe", "pd-observe" + defer metrics.ClearInstanceConditionMetricsByKey(ns, v1alpha1.LabelValComponentPD, name) + + obj := fake.FakeObj(name, func(o *v1alpha1.PD) *v1alpha1.PD { + o.Namespace = ns + o.Labels = map[string]string{ + v1alpha1.LabelKeyCluster: "c", + v1alpha1.LabelKeyComponent: "pd", + v1alpha1.LabelKeyGroup: "g", + } + o.Status.Conditions = []metav1.Condition{ + {Type: v1alpha1.CondReady, Status: metav1.ConditionFalse}, + {Type: v1alpha1.CondSynced, Status: metav1.ConditionTrue}, + } + return o + }) + state := &fakeState[v1alpha1.PD]{ns: ns, name: name, obj: obj} + + res, done := task.RunTask(context.Background(), TaskObserveInstance[scope.PD](state)) + require.Equal(t, task.SComplete, res.Status()) + require.False(t, done) + assert.True(t, hasGaugeSample(t, ns, name), + "ObserveInstance must write at least one series for the instance") +} + +func TestTaskObserveInstance_ClearsOnMissingObject(t *testing.T) { + const ns, name = "ns-clear", "pd-clear" + + // Pre-populate a series as if a previous reconcile had observed the instance. + seed := fake.FakeObj(name, func(o *v1alpha1.PD) *v1alpha1.PD { + o.Namespace = ns + o.Labels = map[string]string{ + v1alpha1.LabelKeyCluster: "c", + v1alpha1.LabelKeyComponent: "pd", + v1alpha1.LabelKeyGroup: "g", + } + return o + }) + metrics.ObserveConditions(seed, []metav1.Condition{ + {Type: v1alpha1.CondReady, Status: metav1.ConditionFalse}, + }) + require.True(t, hasGaugeSample(t, ns, name), "test precondition: seed series exists") + + // Simulate a reconcile where TaskContextObject saw NotFound. + state := &fakeState[v1alpha1.PD]{ns: ns, name: name, obj: nil} + + res, done := task.RunTask(context.Background(), TaskObserveInstance[scope.PD](state)) + require.Equal(t, task.SComplete, res.Status()) + require.False(t, done) + assert.False(t, hasGaugeSample(t, ns, name), + "ObserveInstance must clear every series for the instance when the object is gone") +} diff --git a/pkg/controllers/pd/builder.go b/pkg/controllers/pd/builder.go index 36d2bd41b10..f3f347af5c5 100644 --- a/pkg/controllers/pd/builder.go +++ b/pkg/controllers/pd/builder.go @@ -26,6 +26,8 @@ func (r *Reconciler) NewRunner(state *tasks.ReconcileContext, reporter task.Task // get pd common.TaskContextObject[scope.PD](state, r.Client), common.TaskTrack[scope.PD](state, r.Tracker), + // refresh the abnormal_instance gauge, or clear it if the CR is gone + common.TaskObserveInstance[scope.PD](state), // if it's gone just return task.IfBreak(common.CondObjectHasBeenDeleted[scope.PD](state)), diff --git a/pkg/controllers/resourcemanager/builder.go b/pkg/controllers/resourcemanager/builder.go index c924af349ce..e44b943ba4f 100644 --- a/pkg/controllers/resourcemanager/builder.go +++ b/pkg/controllers/resourcemanager/builder.go @@ -25,6 +25,8 @@ func (r *Reconciler) NewRunner(state *tasks.ReconcileContext, reporter task.Task runner := task.NewTaskRunner(reporter, common.TaskContextObject[scope.ResourceManager](state, r.Client), common.TaskTrack[scope.ResourceManager](state, r.Tracker), + // refresh the abnormal_instance gauge, or clear it if the CR is gone + common.TaskObserveInstance[scope.ResourceManager](state), task.IfBreak(common.CondObjectHasBeenDeleted[scope.ResourceManager](state)), task.IfBreak(common.CondObjectIsDeleting[scope.ResourceManager](state), diff --git a/pkg/controllers/router/builder.go b/pkg/controllers/router/builder.go index c8d3c63baee..0d5c77647b2 100644 --- a/pkg/controllers/router/builder.go +++ b/pkg/controllers/router/builder.go @@ -25,6 +25,8 @@ func (r *Reconciler) NewRunner(state *tasks.ReconcileContext, reporter task.Task runner := task.NewTaskRunner(reporter, common.TaskContextObject[scope.Router](state, r.Client), common.TaskTrack[scope.Router](state, r.Tracker), + // refresh the abnormal_instance gauge, or clear it if the CR is gone + common.TaskObserveInstance[scope.Router](state), task.IfBreak(common.CondObjectHasBeenDeleted[scope.Router](state)), task.IfBreak(common.CondObjectIsDeleting[scope.Router](state), diff --git a/pkg/controllers/scheduler/builder.go b/pkg/controllers/scheduler/builder.go index b0036e03196..d5217682055 100644 --- a/pkg/controllers/scheduler/builder.go +++ b/pkg/controllers/scheduler/builder.go @@ -26,6 +26,8 @@ func (r *Reconciler) NewRunner(state *tasks.ReconcileContext, reporter task.Task // get scheduler common.TaskContextObject[scope.Scheduler](state, r.Client), common.TaskTrack[scope.Scheduler](state, r.Tracker), + // refresh the abnormal_instance gauge, or clear it if the CR is gone + common.TaskObserveInstance[scope.Scheduler](state), // if it's gone just return task.IfBreak(common.CondObjectHasBeenDeleted[scope.Scheduler](state)), diff --git a/pkg/controllers/scheduling/builder.go b/pkg/controllers/scheduling/builder.go index 8f7ccb72902..6e48fa80a9c 100644 --- a/pkg/controllers/scheduling/builder.go +++ b/pkg/controllers/scheduling/builder.go @@ -26,6 +26,8 @@ func (r *Reconciler) NewRunner(state *tasks.ReconcileContext, reporter task.Task // get scheduling common.TaskContextObject[scope.Scheduling](state, r.Client), common.TaskTrack[scope.Scheduling](state, r.Tracker), + // refresh the abnormal_instance gauge, or clear it if the CR is gone + common.TaskObserveInstance[scope.Scheduling](state), // if it's gone just return task.IfBreak(common.CondObjectHasBeenDeleted[scope.Scheduling](state)), diff --git a/pkg/controllers/ticdc/builder.go b/pkg/controllers/ticdc/builder.go index 790301a83fa..d5c7252acfa 100644 --- a/pkg/controllers/ticdc/builder.go +++ b/pkg/controllers/ticdc/builder.go @@ -26,6 +26,8 @@ func (r *Reconciler) NewRunner(state *tasks.ReconcileContext, reporter task.Task // get TiCDC common.TaskContextObject[scope.TiCDC](state, r.Client), common.TaskTrack[scope.TiCDC](state, r.Tracker), + // refresh the abnormal_instance gauge, or clear it if the CR is gone + common.TaskObserveInstance[scope.TiCDC](state), // if it's deleted just return task.IfBreak(common.CondObjectHasBeenDeleted[scope.TiCDC](state)), diff --git a/pkg/controllers/tidb/builder.go b/pkg/controllers/tidb/builder.go index 3eba7b86b09..3386a6f2ec9 100644 --- a/pkg/controllers/tidb/builder.go +++ b/pkg/controllers/tidb/builder.go @@ -31,6 +31,8 @@ func (r *Reconciler) NewRunner(state *tasks.ReconcileContext, reporter task.Task // get tidb common.TaskContextObject[scope.TiDB](state, r.Client), common.TaskTrack[scope.TiDB](state, r.Tracker), + // refresh the abnormal_instance gauge, or clear it if the CR is gone + common.TaskObserveInstance[scope.TiDB](state), tasks.TaskRegisterForAdoption(state, r.AdoptManager), // if it's deleted just return task.IfBreak(common.CondObjectHasBeenDeleted[scope.TiDB](state)), diff --git a/pkg/controllers/tiflash/builder.go b/pkg/controllers/tiflash/builder.go index 93cb1827567..34bf6748b08 100644 --- a/pkg/controllers/tiflash/builder.go +++ b/pkg/controllers/tiflash/builder.go @@ -28,6 +28,8 @@ func (r *Reconciler) NewRunner(state *tasks.ReconcileContext, reporter task.Task // Get tiflash common.TaskContextObject[scope.TiFlash](state, r.Client), common.TaskTrack[scope.TiFlash](state, r.Tracker), + // refresh the abnormal_instance gauge, or clear it if the CR is gone + common.TaskObserveInstance[scope.TiFlash](state), // if it's deleted just return task.IfBreak(common.CondObjectHasBeenDeleted[scope.TiFlash](state), tasks.TaskDeregisterTiFlashClient(state, r.TiFlashClientManager)), diff --git a/pkg/controllers/tikv/builder.go b/pkg/controllers/tikv/builder.go index 765effcb721..16d913524c6 100644 --- a/pkg/controllers/tikv/builder.go +++ b/pkg/controllers/tikv/builder.go @@ -28,6 +28,8 @@ func (r *Reconciler) NewRunner(state *tasks.ReconcileContext, reporter task.Task // get tikv common.TaskContextObject[scope.TiKV](state, r.Client), common.TaskTrack[scope.TiKV](state, r.Tracker), + // refresh the abnormal_instance gauge, or clear it if the CR is gone + common.TaskObserveInstance[scope.TiKV](state), // if it's deleted just return task.IfBreak(common.CondObjectHasBeenDeleted[scope.TiKV](state)), diff --git a/pkg/controllers/tikvworker/builder.go b/pkg/controllers/tikvworker/builder.go index 115abcdeec4..0a5584b83c6 100644 --- a/pkg/controllers/tikvworker/builder.go +++ b/pkg/controllers/tikvworker/builder.go @@ -26,6 +26,8 @@ func (r *Reconciler) NewRunner(state *tasks.ReconcileContext, reporter task.Task // get tikv worker common.TaskContextObject[scope.TiKVWorker](state, r.Client), common.TaskTrack[scope.TiKVWorker](state, r.Tracker), + // refresh the abnormal_instance gauge, or clear it if the CR is gone + common.TaskObserveInstance[scope.TiKVWorker](state), // if it's deleted just return task.IfBreak(common.CondObjectHasBeenDeleted[scope.TiKVWorker](state)), diff --git a/pkg/controllers/tiproxy/builder.go b/pkg/controllers/tiproxy/builder.go index 106219be402..5dbc12cc706 100644 --- a/pkg/controllers/tiproxy/builder.go +++ b/pkg/controllers/tiproxy/builder.go @@ -28,6 +28,8 @@ func (r *Reconciler) NewRunner(state *tasks.ReconcileContext, reporter task.Task // get tiproxy common.TaskContextObject[scope.TiProxy](state, r.Client), common.TaskTrack[scope.TiProxy](state, r.Tracker), + // refresh the abnormal_instance gauge, or clear it if the CR is gone + common.TaskObserveInstance[scope.TiProxy](state), // if it's deleted just return task.IfBreak(common.CondObjectHasBeenDeleted[scope.TiProxy](state)), diff --git a/pkg/controllers/tso/builder.go b/pkg/controllers/tso/builder.go index afc37fb6457..3e430a3beb3 100644 --- a/pkg/controllers/tso/builder.go +++ b/pkg/controllers/tso/builder.go @@ -26,6 +26,8 @@ func (r *Reconciler) NewRunner(state *tasks.ReconcileContext, reporter task.Task // get tso common.TaskContextObject[scope.TSO](state, r.Client), common.TaskTrack[scope.TSO](state, r.Tracker), + // refresh the abnormal_instance gauge, or clear it if the CR is gone + common.TaskObserveInstance[scope.TSO](state), // if it's gone just return task.IfBreak(common.CondObjectHasBeenDeleted[scope.TSO](state)), diff --git a/pkg/metrics/abnormal_instance.go b/pkg/metrics/abnormal_instance.go index 8fceb702afc..9f8b1c4b228 100644 --- a/pkg/metrics/abnormal_instance.go +++ b/pkg/metrics/abnormal_instance.go @@ -15,6 +15,7 @@ package metrics import ( + "github.com/prometheus/client_golang/prometheus" "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "sigs.k8s.io/controller-runtime/pkg/client" @@ -57,6 +58,15 @@ func ObserveCondition(obj client.Object, conds []metav1.Condition, condType stri AbnormalInstance.WithLabelValues(labels...).Set(value) } +// ObserveConditions records the gauge for every condition type tracked by this +// package. This is the convenience entry point from reconcile tasks that want +// to refresh the full picture in one call. +func ObserveConditions(obj client.Object, conds []metav1.Condition) { + for _, condType := range trackedConditions { + ObserveCondition(obj, conds, condType) + } +} + // ClearInstanceConditionMetrics removes every tracked-condition series for // the given instance. // @@ -76,3 +86,20 @@ func ClearInstanceConditionMetrics(obj client.Object) { AbnormalInstance.DeleteLabelValues(append(base, condType)...) } } + +// ClearInstanceConditionMetricsByKey removes every tracked-condition series +// matching (namespace, component, instance) regardless of cluster / group +// labels. Use this from reconcile paths where the object has already been +// deleted from the API server and the business labels are no longer readable; +// passing component (which is a constant for a given reconcile scope) avoids +// sweeping up a different kind of instance that happens to share the same +// namespace and name (e.g. a PD and a TiDB both called "foo"). The partial +// match still covers series written under an earlier cluster / group label +// value if those labels ever shifted. +func ClearInstanceConditionMetricsByKey(namespace, component, instance string) { + AbnormalInstance.DeletePartialMatch(prometheus.Labels{ + "namespace": namespace, + "component": component, + "instance": instance, + }) +} diff --git a/pkg/metrics/abnormal_instance_test.go b/pkg/metrics/abnormal_instance_test.go index a29de512595..42a1ddbba4e 100644 --- a/pkg/metrics/abnormal_instance_test.go +++ b/pkg/metrics/abnormal_instance_test.go @@ -147,3 +147,69 @@ func TestClearInstanceConditionMetrics(t *testing.T) { assert.False(t, gaugeSeriesExists(t, name, v1alpha1.CondReady), "Ready series must be removed on clear") } + +func TestClearInstanceConditionMetricsByKey(t *testing.T) { + const name = "tikv-byKey" + obj := newTiKVForMetricTest(name) + + // Seed two series under the normal labels. + ObserveCondition(obj, []metav1.Condition{{Type: v1alpha1.CondSynced, Status: metav1.ConditionFalse}}, v1alpha1.CondSynced) + ObserveCondition(obj, []metav1.Condition{{Type: v1alpha1.CondReady, Status: metav1.ConditionFalse}}, v1alpha1.CondReady) + require.True(t, gaugeSeriesExists(t, name, v1alpha1.CondSynced)) + require.True(t, gaugeSeriesExists(t, name, v1alpha1.CondReady)) + + // Also seed a series with the same (namespace, component, instance) but + // a drifted cluster / group label, simulating a rename during the + // instance lifetime. + driftedLabels := []string{"test-ns", "drifted-cluster", "tikv", "drifted-group", name, v1alpha1.CondReady} + AbnormalInstance.WithLabelValues(driftedLabels...).Set(1) + + // Seed a sibling instance (same namespace + same component) that must + // survive the partial-match clear. + sibling := newTiKVForMetricTest("tikv-sibling") + ObserveCondition(sibling, []metav1.Condition{{Type: v1alpha1.CondReady, Status: metav1.ConditionFalse}}, v1alpha1.CondReady) + defer ClearInstanceConditionMetrics(sibling) + + // Seed another component that happens to share namespace + instance name + // with the target. It must NOT be swept because the clear qualifies by + // component. + collisionLabels := []string{"test-ns", "test-cluster", "tidb", "test-group", name, v1alpha1.CondReady} + AbnormalInstance.WithLabelValues(collisionLabels...).Set(1) + defer AbnormalInstance.DeleteLabelValues(collisionLabels...) + + ClearInstanceConditionMetricsByKey("test-ns", "tikv", name) + + assert.False(t, gaugeSeriesExists(t, name, v1alpha1.CondSynced), + "primary Synced series must be removed") + assert.False(t, gaugeSeriesExists(t, name, v1alpha1.CondReady), + "primary Ready series must be removed") + + // Drifted-label series must also be swept since namespace+component+instance match. + assert.False(t, labelCombinationExists(t, driftedLabels), + "series with drifted cluster / group labels must be swept by partial match") + + // Sibling instance under the same namespace and component must be untouched. + assert.True(t, gaugeSeriesExists(t, "tikv-sibling", v1alpha1.CondReady), + "unrelated instance must not be swept") + + // Same namespace + same name but different component must NOT be swept. + assert.True(t, labelCombinationExists(t, collisionLabels), + "series for a different component sharing namespace+name must not be swept") +} + +func labelCombinationExists(t *testing.T, want []string) bool { + t.Helper() + ch := make(chan prometheus.Metric, 32) + AbnormalInstance.Collect(ch) + close(ch) + for m := range ch { + dm := &dto.Metric{} + if err := m.Write(dm); err != nil { + continue + } + if labelsEqual(dm.GetLabel(), want) { + return true + } + } + return false +}