Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 58 additions & 0 deletions pkg/controllers/common/task_observe.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
// Copyright 2024 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package common

import (
"context"

coreutil "github.com/pingcap/tidb-operator/v2/pkg/apiutil/core/v1alpha1"
"github.com/pingcap/tidb-operator/v2/pkg/metrics"
"github.com/pingcap/tidb-operator/v2/pkg/runtime"
"github.com/pingcap/tidb-operator/v2/pkg/runtime/scope"
"github.com/pingcap/tidb-operator/v2/pkg/utils/task/v3"
)

// TaskObserveInstance refreshes the AbnormalInstance gauge for the reconciled
// instance every time the pipeline runs, and clears it when the instance CR
// has been removed from the API server (state.Object() == nil).
//
// It mirrors the shape of TaskTrack so observe and clear live in one place:
// the same branch that decides whether the object still exists also decides
// whether to publish or retract the gauge. Placing this after TaskTrack and
// before the `CondObjectHasBeenDeleted` IfBreak means it always runs once
// per reconcile, including the reconcile triggered by the informer's DELETE
// event - covering graceful delete, force-delete that strips finalizers, and
// any other path that lands in the watch stream.
func TaskObserveInstance[
S scope.Instance[F, T],
F Object[P],
T runtime.Instance,
P any,
](state TrackState[F]) task.Task {
return task.NameTaskFunc("ObserveInstance", func(context.Context) task.Result {
obj := state.Object()
if obj == nil {
key := state.Key()
// scope.Component[S]() is a compile-time constant for the reconcile
// kind, so we can qualify the sweep even after the CR is gone and
// its labels are no longer readable.
metrics.ClearInstanceConditionMetricsByKey(key.Namespace, scope.Component[S](), key.Name)
return task.Complete().With("cleared metrics for deleted %s", key)
}
conds := coreutil.StatusConditions[S](obj)
metrics.ObserveConditions(obj, conds)
return task.Complete().With("observed metrics for %s/%s", obj.GetNamespace(), obj.GetName())
})
}
109 changes: 109 additions & 0 deletions pkg/controllers/common/task_observe_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
// Copyright 2024 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package common

import (
"context"
"testing"

"github.com/prometheus/client_golang/prometheus"
dto "github.com/prometheus/client_model/go"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"github.com/pingcap/tidb-operator/api/v2/core/v1alpha1"
"github.com/pingcap/tidb-operator/v2/pkg/metrics"
"github.com/pingcap/tidb-operator/v2/pkg/runtime/scope"
"github.com/pingcap/tidb-operator/v2/pkg/utils/fake"
"github.com/pingcap/tidb-operator/v2/pkg/utils/task/v3"
)

// hasGaugeSample returns true if the AbnormalInstance gauge has any sample
// whose (namespace, instance) labels match the inputs.
func hasGaugeSample(t *testing.T, namespace, instance string) bool {
t.Helper()
ch := make(chan prometheus.Metric, 32)
metrics.AbnormalInstance.Collect(ch)
close(ch)
for m := range ch {
dm := &dto.Metric{}
if err := m.Write(dm); err != nil {
continue
}
labels := map[string]string{}
for _, lp := range dm.GetLabel() {
labels[lp.GetName()] = lp.GetValue()
}
if labels["namespace"] == namespace && labels["instance"] == instance {
return true
}
}
return false
}

func TestTaskObserveInstance_ObservesConditions(t *testing.T) {
const ns, name = "ns-observe", "pd-observe"
defer metrics.ClearInstanceConditionMetricsByKey(ns, v1alpha1.LabelValComponentPD, name)

obj := fake.FakeObj(name, func(o *v1alpha1.PD) *v1alpha1.PD {
o.Namespace = ns
o.Labels = map[string]string{
v1alpha1.LabelKeyCluster: "c",
v1alpha1.LabelKeyComponent: "pd",
v1alpha1.LabelKeyGroup: "g",
}
o.Status.Conditions = []metav1.Condition{
{Type: v1alpha1.CondReady, Status: metav1.ConditionFalse},
{Type: v1alpha1.CondSynced, Status: metav1.ConditionTrue},
}
return o
})
state := &fakeState[v1alpha1.PD]{ns: ns, name: name, obj: obj}

res, done := task.RunTask(context.Background(), TaskObserveInstance[scope.PD](state))
require.Equal(t, task.SComplete, res.Status())
require.False(t, done)
assert.True(t, hasGaugeSample(t, ns, name),
"ObserveInstance must write at least one series for the instance")
}

func TestTaskObserveInstance_ClearsOnMissingObject(t *testing.T) {
const ns, name = "ns-clear", "pd-clear"

// Pre-populate a series as if a previous reconcile had observed the instance.
seed := fake.FakeObj(name, func(o *v1alpha1.PD) *v1alpha1.PD {
o.Namespace = ns
o.Labels = map[string]string{
v1alpha1.LabelKeyCluster: "c",
v1alpha1.LabelKeyComponent: "pd",
v1alpha1.LabelKeyGroup: "g",
}
return o
})
metrics.ObserveConditions(seed, []metav1.Condition{
{Type: v1alpha1.CondReady, Status: metav1.ConditionFalse},
})
require.True(t, hasGaugeSample(t, ns, name), "test precondition: seed series exists")

// Simulate a reconcile where TaskContextObject saw NotFound.
state := &fakeState[v1alpha1.PD]{ns: ns, name: name, obj: nil}

res, done := task.RunTask(context.Background(), TaskObserveInstance[scope.PD](state))
require.Equal(t, task.SComplete, res.Status())
require.False(t, done)
assert.False(t, hasGaugeSample(t, ns, name),
"ObserveInstance must clear every series for the instance when the object is gone")
}
2 changes: 2 additions & 0 deletions pkg/controllers/pd/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ func (r *Reconciler) NewRunner(state *tasks.ReconcileContext, reporter task.Task
// get pd
common.TaskContextObject[scope.PD](state, r.Client),
common.TaskTrack[scope.PD](state, r.Tracker),
// refresh the abnormal_instance gauge, or clear it if the CR is gone
common.TaskObserveInstance[scope.PD](state),
// if it's gone just return
task.IfBreak(common.CondObjectHasBeenDeleted[scope.PD](state)),
Comment thread
fgksgf marked this conversation as resolved.

Expand Down
2 changes: 2 additions & 0 deletions pkg/controllers/resourcemanager/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ func (r *Reconciler) NewRunner(state *tasks.ReconcileContext, reporter task.Task
runner := task.NewTaskRunner(reporter,
common.TaskContextObject[scope.ResourceManager](state, r.Client),
common.TaskTrack[scope.ResourceManager](state, r.Tracker),
// refresh the abnormal_instance gauge, or clear it if the CR is gone
common.TaskObserveInstance[scope.ResourceManager](state),
task.IfBreak(common.CondObjectHasBeenDeleted[scope.ResourceManager](state)),

task.IfBreak(common.CondObjectIsDeleting[scope.ResourceManager](state),
Expand Down
2 changes: 2 additions & 0 deletions pkg/controllers/router/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ func (r *Reconciler) NewRunner(state *tasks.ReconcileContext, reporter task.Task
runner := task.NewTaskRunner(reporter,
common.TaskContextObject[scope.Router](state, r.Client),
common.TaskTrack[scope.Router](state, r.Tracker),
// refresh the abnormal_instance gauge, or clear it if the CR is gone
common.TaskObserveInstance[scope.Router](state),
task.IfBreak(common.CondObjectHasBeenDeleted[scope.Router](state)),

task.IfBreak(common.CondObjectIsDeleting[scope.Router](state),
Expand Down
2 changes: 2 additions & 0 deletions pkg/controllers/scheduler/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ func (r *Reconciler) NewRunner(state *tasks.ReconcileContext, reporter task.Task
// get scheduler
common.TaskContextObject[scope.Scheduler](state, r.Client),
common.TaskTrack[scope.Scheduler](state, r.Tracker),
// refresh the abnormal_instance gauge, or clear it if the CR is gone
common.TaskObserveInstance[scope.Scheduler](state),
// if it's gone just return
task.IfBreak(common.CondObjectHasBeenDeleted[scope.Scheduler](state)),

Expand Down
2 changes: 2 additions & 0 deletions pkg/controllers/scheduling/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ func (r *Reconciler) NewRunner(state *tasks.ReconcileContext, reporter task.Task
// get scheduling
common.TaskContextObject[scope.Scheduling](state, r.Client),
common.TaskTrack[scope.Scheduling](state, r.Tracker),
// refresh the abnormal_instance gauge, or clear it if the CR is gone
common.TaskObserveInstance[scope.Scheduling](state),
// if it's gone just return
task.IfBreak(common.CondObjectHasBeenDeleted[scope.Scheduling](state)),

Expand Down
2 changes: 2 additions & 0 deletions pkg/controllers/ticdc/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ func (r *Reconciler) NewRunner(state *tasks.ReconcileContext, reporter task.Task
// get TiCDC
common.TaskContextObject[scope.TiCDC](state, r.Client),
common.TaskTrack[scope.TiCDC](state, r.Tracker),
// refresh the abnormal_instance gauge, or clear it if the CR is gone
common.TaskObserveInstance[scope.TiCDC](state),
// if it's deleted just return
task.IfBreak(common.CondObjectHasBeenDeleted[scope.TiCDC](state)),

Expand Down
2 changes: 2 additions & 0 deletions pkg/controllers/tidb/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ func (r *Reconciler) NewRunner(state *tasks.ReconcileContext, reporter task.Task
// get tidb
common.TaskContextObject[scope.TiDB](state, r.Client),
common.TaskTrack[scope.TiDB](state, r.Tracker),
// refresh the abnormal_instance gauge, or clear it if the CR is gone
common.TaskObserveInstance[scope.TiDB](state),
tasks.TaskRegisterForAdoption(state, r.AdoptManager),
// if it's deleted just return
task.IfBreak(common.CondObjectHasBeenDeleted[scope.TiDB](state)),
Expand Down
2 changes: 2 additions & 0 deletions pkg/controllers/tiflash/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ func (r *Reconciler) NewRunner(state *tasks.ReconcileContext, reporter task.Task
// Get tiflash
common.TaskContextObject[scope.TiFlash](state, r.Client),
common.TaskTrack[scope.TiFlash](state, r.Tracker),
// refresh the abnormal_instance gauge, or clear it if the CR is gone
common.TaskObserveInstance[scope.TiFlash](state),
// if it's deleted just return
task.IfBreak(common.CondObjectHasBeenDeleted[scope.TiFlash](state), tasks.TaskDeregisterTiFlashClient(state, r.TiFlashClientManager)),

Expand Down
2 changes: 2 additions & 0 deletions pkg/controllers/tikv/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ func (r *Reconciler) NewRunner(state *tasks.ReconcileContext, reporter task.Task
// get tikv
common.TaskContextObject[scope.TiKV](state, r.Client),
common.TaskTrack[scope.TiKV](state, r.Tracker),
// refresh the abnormal_instance gauge, or clear it if the CR is gone
common.TaskObserveInstance[scope.TiKV](state),
// if it's deleted just return
task.IfBreak(common.CondObjectHasBeenDeleted[scope.TiKV](state)),

Expand Down
2 changes: 2 additions & 0 deletions pkg/controllers/tikvworker/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ func (r *Reconciler) NewRunner(state *tasks.ReconcileContext, reporter task.Task
// get tikv worker
common.TaskContextObject[scope.TiKVWorker](state, r.Client),
common.TaskTrack[scope.TiKVWorker](state, r.Tracker),
// refresh the abnormal_instance gauge, or clear it if the CR is gone
common.TaskObserveInstance[scope.TiKVWorker](state),
// if it's deleted just return
task.IfBreak(common.CondObjectHasBeenDeleted[scope.TiKVWorker](state)),

Expand Down
2 changes: 2 additions & 0 deletions pkg/controllers/tiproxy/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ func (r *Reconciler) NewRunner(state *tasks.ReconcileContext, reporter task.Task
// get tiproxy
common.TaskContextObject[scope.TiProxy](state, r.Client),
common.TaskTrack[scope.TiProxy](state, r.Tracker),
// refresh the abnormal_instance gauge, or clear it if the CR is gone
common.TaskObserveInstance[scope.TiProxy](state),
// if it's deleted just return
task.IfBreak(common.CondObjectHasBeenDeleted[scope.TiProxy](state)),

Expand Down
2 changes: 2 additions & 0 deletions pkg/controllers/tso/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ func (r *Reconciler) NewRunner(state *tasks.ReconcileContext, reporter task.Task
// get tso
common.TaskContextObject[scope.TSO](state, r.Client),
common.TaskTrack[scope.TSO](state, r.Tracker),
// refresh the abnormal_instance gauge, or clear it if the CR is gone
common.TaskObserveInstance[scope.TSO](state),
// if it's gone just return
task.IfBreak(common.CondObjectHasBeenDeleted[scope.TSO](state)),

Expand Down
27 changes: 27 additions & 0 deletions pkg/metrics/abnormal_instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package metrics

import (
"github.com/prometheus/client_golang/prometheus"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand Down Expand Up @@ -57,6 +58,15 @@ func ObserveCondition(obj client.Object, conds []metav1.Condition, condType stri
AbnormalInstance.WithLabelValues(labels...).Set(value)
}

// ObserveConditions records the gauge for every condition type tracked by this
// package. This is the convenience entry point from reconcile tasks that want
// to refresh the full picture in one call.
func ObserveConditions(obj client.Object, conds []metav1.Condition) {
for _, condType := range trackedConditions {
ObserveCondition(obj, conds, condType)
}
}

// ClearInstanceConditionMetrics removes every tracked-condition series for
// the given instance.
//
Expand All @@ -76,3 +86,20 @@ func ClearInstanceConditionMetrics(obj client.Object) {
AbnormalInstance.DeleteLabelValues(append(base, condType)...)
}
}

// ClearInstanceConditionMetricsByKey removes every tracked-condition series
// matching (namespace, component, instance) regardless of cluster / group
// labels. Use this from reconcile paths where the object has already been
// deleted from the API server and the business labels are no longer readable;
// passing component (which is a constant for a given reconcile scope) avoids
// sweeping up a different kind of instance that happens to share the same
// namespace and name (e.g. a PD and a TiDB both called "foo"). The partial
// match still covers series written under an earlier cluster / group label
// value if those labels ever shifted.
func ClearInstanceConditionMetricsByKey(namespace, component, instance string) {
AbnormalInstance.DeletePartialMatch(prometheus.Labels{
"namespace": namespace,
"component": component,
"instance": instance,
})
}
66 changes: 66 additions & 0 deletions pkg/metrics/abnormal_instance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,3 +147,69 @@ func TestClearInstanceConditionMetrics(t *testing.T) {
assert.False(t, gaugeSeriesExists(t, name, v1alpha1.CondReady),
"Ready series must be removed on clear")
}

func TestClearInstanceConditionMetricsByKey(t *testing.T) {
const name = "tikv-byKey"
obj := newTiKVForMetricTest(name)

// Seed two series under the normal labels.
ObserveCondition(obj, []metav1.Condition{{Type: v1alpha1.CondSynced, Status: metav1.ConditionFalse}}, v1alpha1.CondSynced)
ObserveCondition(obj, []metav1.Condition{{Type: v1alpha1.CondReady, Status: metav1.ConditionFalse}}, v1alpha1.CondReady)
require.True(t, gaugeSeriesExists(t, name, v1alpha1.CondSynced))
require.True(t, gaugeSeriesExists(t, name, v1alpha1.CondReady))

// Also seed a series with the same (namespace, component, instance) but
// a drifted cluster / group label, simulating a rename during the
// instance lifetime.
driftedLabels := []string{"test-ns", "drifted-cluster", "tikv", "drifted-group", name, v1alpha1.CondReady}
AbnormalInstance.WithLabelValues(driftedLabels...).Set(1)

// Seed a sibling instance (same namespace + same component) that must
// survive the partial-match clear.
sibling := newTiKVForMetricTest("tikv-sibling")
ObserveCondition(sibling, []metav1.Condition{{Type: v1alpha1.CondReady, Status: metav1.ConditionFalse}}, v1alpha1.CondReady)
defer ClearInstanceConditionMetrics(sibling)

// Seed another component that happens to share namespace + instance name
// with the target. It must NOT be swept because the clear qualifies by
// component.
collisionLabels := []string{"test-ns", "test-cluster", "tidb", "test-group", name, v1alpha1.CondReady}
AbnormalInstance.WithLabelValues(collisionLabels...).Set(1)
defer AbnormalInstance.DeleteLabelValues(collisionLabels...)

ClearInstanceConditionMetricsByKey("test-ns", "tikv", name)

assert.False(t, gaugeSeriesExists(t, name, v1alpha1.CondSynced),
"primary Synced series must be removed")
assert.False(t, gaugeSeriesExists(t, name, v1alpha1.CondReady),
"primary Ready series must be removed")

// Drifted-label series must also be swept since namespace+component+instance match.
assert.False(t, labelCombinationExists(t, driftedLabels),
"series with drifted cluster / group labels must be swept by partial match")

// Sibling instance under the same namespace and component must be untouched.
assert.True(t, gaugeSeriesExists(t, "tikv-sibling", v1alpha1.CondReady),
"unrelated instance must not be swept")

// Same namespace + same name but different component must NOT be swept.
assert.True(t, labelCombinationExists(t, collisionLabels),
"series for a different component sharing namespace+name must not be swept")
}

func labelCombinationExists(t *testing.T, want []string) bool {
t.Helper()
ch := make(chan prometheus.Metric, 32)
AbnormalInstance.Collect(ch)
close(ch)
for m := range ch {
dm := &dto.Metric{}
if err := m.Write(dm); err != nil {
continue
}
if labelsEqual(dm.GetLabel(), want) {
return true
}
}
return false
}
Loading