Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 55 additions & 0 deletions pkg/controllers/common/task_observe.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
// Copyright 2024 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package common

import (
"context"

coreutil "github.com/pingcap/tidb-operator/v2/pkg/apiutil/core/v1alpha1"
"github.com/pingcap/tidb-operator/v2/pkg/metrics"
"github.com/pingcap/tidb-operator/v2/pkg/runtime"
"github.com/pingcap/tidb-operator/v2/pkg/runtime/scope"
"github.com/pingcap/tidb-operator/v2/pkg/utils/task/v3"
)

// TaskObserveInstance builds the "ObserveInstance" task, which keeps the
// AbnormalInstance gauge in step with the instance held by state.
//
// Each run takes exactly one of two paths:
//   - state.Object() is non-nil: the gauge is re-published from the object's
//     current status conditions.
//   - state.Object() is nil: the instance is treated as gone and every series
//     recorded under its namespace/name key is retracted.
//
// Publishing and retracting deliberately share one branch so the decision
// "does the object still exist?" is made in a single place. The controller
// builders schedule this task right after TaskTrack and ahead of the
// `CondObjectHasBeenDeleted` IfBreak, so it is reached on every reconcile,
// including the one in which the object is no longer present.
//
// The task always finishes with task.Complete(); it never fails or retries.
func TaskObserveInstance[
	S scope.Instance[F, T],
	F Object[P],
	T runtime.Instance,
	P any,
](state TrackState[F]) task.Task {
	observe := func(context.Context) task.Result {
		// Live object: refresh the gauge from its current conditions.
		if instance := state.Object(); instance != nil {
			metrics.ObserveConditions(instance, coreutil.StatusConditions[S](instance))
			return task.Complete().With("observed metrics for %s/%s", instance.GetNamespace(), instance.GetName())
		}

		// No object: only the key is left, so clear by namespace/name.
		nn := state.Key()
		metrics.ClearInstanceConditionMetricsByKey(nn.Namespace, nn.Name)
		return task.Complete().With("cleared metrics for deleted %s", nn)
	}
	return task.NameTaskFunc("ObserveInstance", observe)
}
109 changes: 109 additions & 0 deletions pkg/controllers/common/task_observe_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
// Copyright 2024 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package common

import (
"context"
"testing"

"github.com/prometheus/client_golang/prometheus"
dto "github.com/prometheus/client_model/go"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"github.com/pingcap/tidb-operator/api/v2/core/v1alpha1"
"github.com/pingcap/tidb-operator/v2/pkg/metrics"
"github.com/pingcap/tidb-operator/v2/pkg/runtime/scope"
"github.com/pingcap/tidb-operator/v2/pkg/utils/fake"
"github.com/pingcap/tidb-operator/v2/pkg/utils/task/v3"
)

// hasGaugeSample reports whether the AbnormalInstance gauge currently exposes
// at least one series whose "namespace" and "instance" labels equal the given
// values.
//
// Collect sends every series of the vector into the channel before it
// returns, so it is run in its own goroutine while this function drains the
// channel. A fixed-size buffer filled synchronously would block forever as
// soon as the shared gauge holds more series than the buffer has slots. The
// channel is always drained to completion, even after a match, so the
// collecting goroutine is never left blocked on a send.
//
// A series that cannot be serialized is reported through t.Errorf rather than
// silently skipped, because a skipped series could hide a sample the caller
// is asserting to be absent.
func hasGaugeSample(t *testing.T, namespace, instance string) bool {
	t.Helper()

	ch := make(chan prometheus.Metric)
	go func() {
		defer close(ch)
		metrics.AbnormalInstance.Collect(ch)
	}()

	found := false
	for m := range ch {
		if found {
			// Already matched; keep reading so Collect can finish.
			continue
		}
		dm := &dto.Metric{}
		if err := m.Write(dm); err != nil {
			t.Errorf("cannot serialize AbnormalInstance sample: %v", err)
			continue
		}
		labels := make(map[string]string, len(dm.GetLabel()))
		for _, lp := range dm.GetLabel() {
			labels[lp.GetName()] = lp.GetValue()
		}
		if labels["namespace"] == namespace && labels["instance"] == instance {
			found = true
		}
	}
	return found
}

// TestTaskObserveInstance_ObservesConditions runs the task against a state
// that holds a live PD and checks that a gauge series appears for it.
func TestTaskObserveInstance_ObservesConditions(t *testing.T) {
	const (
		ns   = "ns-observe"
		name = "pd-observe"
	)
	// Remove whatever this test publishes so later tests see a clean gauge.
	defer metrics.ClearInstanceConditionMetricsByKey(ns, name)

	// Decorate the fake PD with the labels and conditions the gauge reads.
	withConditions := func(pd *v1alpha1.PD) *v1alpha1.PD {
		pd.Namespace = ns
		pd.Labels = map[string]string{
			v1alpha1.LabelKeyCluster:   "c",
			v1alpha1.LabelKeyComponent: "pd",
			v1alpha1.LabelKeyGroup:     "g",
		}
		pd.Status.Conditions = []metav1.Condition{
			{Type: v1alpha1.CondReady, Status: metav1.ConditionFalse},
			{Type: v1alpha1.CondSynced, Status: metav1.ConditionTrue},
		}
		return pd
	}
	state := &fakeState[v1alpha1.PD]{
		ns:   ns,
		name: name,
		obj:  fake.FakeObj(name, withConditions),
	}

	observeTask := TaskObserveInstance[scope.PD](state)
	res, done := task.RunTask(context.Background(), observeTask)

	require.Equal(t, task.SComplete, res.Status())
	require.False(t, done)
	assert.True(t, hasGaugeSample(t, ns, name),
		"ObserveInstance must write at least one series for the instance")
}

func TestTaskObserveInstance_ClearsOnMissingObject(t *testing.T) {
const ns, name = "ns-clear", "pd-clear"

// Pre-populate a series as if a previous reconcile had observed the instance.
seed := fake.FakeObj(name, func(o *v1alpha1.PD) *v1alpha1.PD {
o.Namespace = ns
o.Labels = map[string]string{
v1alpha1.LabelKeyCluster: "c",
v1alpha1.LabelKeyComponent: "pd",
v1alpha1.LabelKeyGroup: "g",
}
return o
})
metrics.ObserveConditions(seed, []metav1.Condition{
{Type: v1alpha1.CondReady, Status: metav1.ConditionFalse},
})
require.True(t, hasGaugeSample(t, ns, name), "test precondition: seed series exists")

// Simulate a reconcile where TaskContextObject saw NotFound.
state := &fakeState[v1alpha1.PD]{ns: ns, name: name, obj: nil}

res, done := task.RunTask(context.Background(), TaskObserveInstance[scope.PD](state))
require.Equal(t, task.SComplete, res.Status())
require.False(t, done)
assert.False(t, hasGaugeSample(t, ns, name),
"ObserveInstance must clear every series for the instance when the object is gone")
}
2 changes: 2 additions & 0 deletions pkg/controllers/pd/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ func (r *Reconciler) NewRunner(state *tasks.ReconcileContext, reporter task.Task
// get pd
common.TaskContextObject[scope.PD](state, r.Client),
common.TaskTrack[scope.PD](state, r.Tracker),
// refresh the abnormal_instance gauge, or clear it if the CR is gone
common.TaskObserveInstance[scope.PD](state),
// if it's gone just return
task.IfBreak(common.CondObjectHasBeenDeleted[scope.PD](state)),
Comment thread
fgksgf marked this conversation as resolved.

Expand Down
2 changes: 2 additions & 0 deletions pkg/controllers/scheduling/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ func (r *Reconciler) NewRunner(state *tasks.ReconcileContext, reporter task.Task
// get scheduling
common.TaskContextObject[scope.Scheduling](state, r.Client),
common.TaskTrack[scope.Scheduling](state, r.Tracker),
// refresh the abnormal_instance gauge, or clear it if the CR is gone
common.TaskObserveInstance[scope.Scheduling](state),
// if it's gone just return
task.IfBreak(common.CondObjectHasBeenDeleted[scope.Scheduling](state)),

Expand Down
2 changes: 2 additions & 0 deletions pkg/controllers/ticdc/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ func (r *Reconciler) NewRunner(state *tasks.ReconcileContext, reporter task.Task
// get TiCDC
common.TaskContextObject[scope.TiCDC](state, r.Client),
common.TaskTrack[scope.TiCDC](state, r.Tracker),
// refresh the abnormal_instance gauge, or clear it if the CR is gone
common.TaskObserveInstance[scope.TiCDC](state),
// if it's deleted just return
task.IfBreak(common.CondObjectHasBeenDeleted[scope.TiCDC](state)),

Expand Down
2 changes: 2 additions & 0 deletions pkg/controllers/tidb/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ func (r *Reconciler) NewRunner(state *tasks.ReconcileContext, reporter task.Task
// get tidb
common.TaskContextObject[scope.TiDB](state, r.Client),
common.TaskTrack[scope.TiDB](state, r.Tracker),
// refresh the abnormal_instance gauge, or clear it if the CR is gone
common.TaskObserveInstance[scope.TiDB](state),
tasks.TaskRegisterForAdoption(state, r.AdoptManager),
// if it's deleted just return
task.IfBreak(common.CondObjectHasBeenDeleted[scope.TiDB](state)),
Expand Down
2 changes: 2 additions & 0 deletions pkg/controllers/tiflash/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ func (r *Reconciler) NewRunner(state *tasks.ReconcileContext, reporter task.Task
// Get tiflash
common.TaskContextObject[scope.TiFlash](state, r.Client),
common.TaskTrack[scope.TiFlash](state, r.Tracker),
// refresh the abnormal_instance gauge, or clear it if the CR is gone
common.TaskObserveInstance[scope.TiFlash](state),
// if it's deleted just return
task.IfBreak(common.CondObjectHasBeenDeleted[scope.TiFlash](state), tasks.TaskDeregisterTiFlashClient(state, r.TiFlashClientManager)),

Expand Down
2 changes: 2 additions & 0 deletions pkg/controllers/tikv/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ func (r *Reconciler) NewRunner(state *tasks.ReconcileContext, reporter task.Task
// get tikv
common.TaskContextObject[scope.TiKV](state, r.Client),
common.TaskTrack[scope.TiKV](state, r.Tracker),
// refresh the abnormal_instance gauge, or clear it if the CR is gone
common.TaskObserveInstance[scope.TiKV](state),
// if it's deleted just return
task.IfBreak(common.CondObjectHasBeenDeleted[scope.TiKV](state)),

Expand Down
2 changes: 2 additions & 0 deletions pkg/controllers/tikvworker/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ func (r *Reconciler) NewRunner(state *tasks.ReconcileContext, reporter task.Task
// get tikv worker
common.TaskContextObject[scope.TiKVWorker](state, r.Client),
common.TaskTrack[scope.TiKVWorker](state, r.Tracker),
// refresh the abnormal_instance gauge, or clear it if the CR is gone
common.TaskObserveInstance[scope.TiKVWorker](state),
// if it's deleted just return
task.IfBreak(common.CondObjectHasBeenDeleted[scope.TiKVWorker](state)),

Expand Down
2 changes: 2 additions & 0 deletions pkg/controllers/tiproxy/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ func (r *Reconciler) NewRunner(state *tasks.ReconcileContext, reporter task.Task
// get tiproxy
common.TaskContextObject[scope.TiProxy](state, r.Client),
common.TaskTrack[scope.TiProxy](state, r.Tracker),
// refresh the abnormal_instance gauge, or clear it if the CR is gone
common.TaskObserveInstance[scope.TiProxy](state),
// if it's deleted just return
task.IfBreak(common.CondObjectHasBeenDeleted[scope.TiProxy](state)),

Expand Down
2 changes: 2 additions & 0 deletions pkg/controllers/tso/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ func (r *Reconciler) NewRunner(state *tasks.ReconcileContext, reporter task.Task
// get tso
common.TaskContextObject[scope.TSO](state, r.Client),
common.TaskTrack[scope.TSO](state, r.Tracker),
// refresh the abnormal_instance gauge, or clear it if the CR is gone
common.TaskObserveInstance[scope.TSO](state),
// if it's gone just return
task.IfBreak(common.CondObjectHasBeenDeleted[scope.TSO](state)),

Expand Down
23 changes: 23 additions & 0 deletions pkg/metrics/abnormal_instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package metrics

import (
"github.com/prometheus/client_golang/prometheus"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand Down Expand Up @@ -57,6 +58,15 @@ func ObserveCondition(obj client.Object, conds []metav1.Condition, condType stri
AbnormalInstance.WithLabelValues(labels...).Set(value)
}

// ObserveConditions refreshes the gauge for obj once per entry in
// trackedConditions, delegating each condition type to ObserveCondition with
// the same conds slice. Reconcile tasks call it to publish the whole set of
// tracked conditions for an instance in a single step.
func ObserveConditions(obj client.Object, conds []metav1.Condition) {
	for _, tracked := range trackedConditions {
		ObserveCondition(obj, conds, tracked)
	}
}

// ClearInstanceConditionMetrics removes every tracked-condition series for
// the given instance.
//
Expand All @@ -76,3 +86,16 @@ func ClearInstanceConditionMetrics(obj client.Object) {
AbnormalInstance.DeleteLabelValues(append(base, condType)...)
}
}

// ClearInstanceConditionMetricsByKey removes every tracked-condition series
// matching (namespace, instance) regardless of cluster / component / group
// labels. Use this from reconcile paths where the object has already been
// deleted from the API server and the business labels are no longer available
// for an exact match; the partial match guarantees we also sweep up series
// written under an earlier label value if those labels ever shifted.
func ClearInstanceConditionMetricsByKey(namespace, instance string) {
AbnormalInstance.DeletePartialMatch(prometheus.Labels{
"namespace": namespace,
Comment thread
fgksgf marked this conversation as resolved.
Outdated
"instance": instance,
})
}
53 changes: 53 additions & 0 deletions pkg/metrics/abnormal_instance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,3 +147,56 @@ func TestClearInstanceConditionMetrics(t *testing.T) {
assert.False(t, gaugeSeriesExists(t, name, v1alpha1.CondReady),
"Ready series must be removed on clear")
}

func TestClearInstanceConditionMetricsByKey(t *testing.T) {
const name = "tikv-byKey"
obj := newTiKVForMetricTest(name)

// Seed two series under the normal labels.
ObserveCondition(obj, []metav1.Condition{{Type: v1alpha1.CondSynced, Status: metav1.ConditionFalse}}, v1alpha1.CondSynced)
ObserveCondition(obj, []metav1.Condition{{Type: v1alpha1.CondReady, Status: metav1.ConditionFalse}}, v1alpha1.CondReady)
require.True(t, gaugeSeriesExists(t, name, v1alpha1.CondSynced))
require.True(t, gaugeSeriesExists(t, name, v1alpha1.CondReady))

// Also seed a series with the same (namespace, instance) but a drifted
// business label, simulating a label rename during the instance lifetime.
driftedLabels := []string{"test-ns", "drifted-cluster", "tikv", "test-group", name, v1alpha1.CondReady}
AbnormalInstance.WithLabelValues(driftedLabels...).Set(1)

// Seed a sibling instance that must survive the partial-match clear.
sibling := newTiKVForMetricTest("tikv-sibling")
ObserveCondition(sibling, []metav1.Condition{{Type: v1alpha1.CondReady, Status: metav1.ConditionFalse}}, v1alpha1.CondReady)
defer ClearInstanceConditionMetrics(sibling)

ClearInstanceConditionMetricsByKey("test-ns", name)

assert.False(t, gaugeSeriesExists(t, name, v1alpha1.CondSynced),
"primary Synced series must be removed")
assert.False(t, gaugeSeriesExists(t, name, v1alpha1.CondReady),
"primary Ready series must be removed")

// Drifted-label series must also be swept since namespace+instance match.
assert.False(t, labelCombinationExists(t, driftedLabels),
"series with drifted business labels must be swept by partial match")

// Sibling instance under the same namespace must be untouched.
assert.True(t, gaugeSeriesExists(t, "tikv-sibling", v1alpha1.CondReady),
"unrelated instance must not be swept")
}

// labelCombinationExists reports whether the AbnormalInstance gauge currently
// exposes a series whose labels match want, as decided by labelsEqual.
//
// Collect sends every series of the vector into the channel before it
// returns, so it is run in its own goroutine while this function drains the
// channel. A fixed-size buffer filled synchronously would block forever as
// soon as the gauge holds more series than the buffer has slots. The channel
// is always drained to completion, even after a match, so the collecting
// goroutine is never left blocked on a send.
//
// A series that cannot be serialized is reported through t.Errorf rather than
// silently skipped, because a skipped series could hide the combination the
// caller is asserting to be absent.
func labelCombinationExists(t *testing.T, want []string) bool {
	t.Helper()

	ch := make(chan prometheus.Metric)
	go func() {
		defer close(ch)
		AbnormalInstance.Collect(ch)
	}()

	found := false
	for m := range ch {
		if found {
			// Already matched; keep reading so Collect can finish.
			continue
		}
		dm := &dto.Metric{}
		if err := m.Write(dm); err != nil {
			t.Errorf("cannot serialize AbnormalInstance sample: %v", err)
			continue
		}
		if labelsEqual(dm.GetLabel(), want) {
			found = true
		}
	}
	return found
}
Loading