Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions api/core/v1alpha1/common_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,12 @@ const (
// Last instance template is recorded to check whether the pod should be restarted because of changes of instance template
AnnoKeyLastInstanceTemplate = AnnoKeyPrefix + "last-instance-template"

// TiProxy graceful shutdown delete delay is configured on the TiProxy object annotations in seconds.
AnnoKeyTiProxyGracefulShutdownDeleteDelaySeconds = AnnoKeyPrefix + "tiproxy-graceful-shutdown-delete-delay-seconds"

// TiProxy graceful shutdown begin time is recorded on the pod after it has been removed from the Service selector.
AnnoKeyTiProxyGracefulShutdownBeginTime = AnnoKeyPrefix + "tiproxy-graceful-shutdown-begin-time"

// Features is recorded to check whether the pod should be restarted because of changes of features
AnnoKeyFeatures = AnnoKeyPrefix + "features"

Expand Down
4 changes: 2 additions & 2 deletions pkg/controllers/tiproxy/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,13 @@ func (r *Reconciler) NewRunner(state *tasks.ReconcileContext, reporter task.Task
task.IfBreak(common.CondClusterIsPaused(state)),
// if the cluster is deleting, del all subresources and remove the finalizer directly
task.IfBreak(common.CondClusterIsDeleting(state),
common.TaskInstanceFinalizerDel[scope.TiProxy](state, r.Client, common.DefaultInstanceSubresourceLister),
tasks.TaskFinalizerDel(state, r.Client),
),
// return if cluster's status is not updated
task.IfBreak(common.CondClusterPDAddrIsNotRegistered(state)),

task.IfBreak(common.CondObjectIsDeleting[scope.TiProxy](state),
common.TaskInstanceFinalizerDel[scope.TiProxy](state, r.Client, common.DefaultInstanceSubresourceLister),
tasks.TaskFinalizerDel(state, r.Client),
// TODO(liubo02): if the finalizer has been removed, no need to update status
common.TaskInstanceConditionSynced[scope.TiProxy](state),
common.TaskInstanceConditionReady[scope.TiProxy](state),
Expand Down
114 changes: 114 additions & 0 deletions pkg/controllers/tiproxy/tasks/finalizer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
// Copyright 2024 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package tasks

import (
"context"
"strconv"
"time"

corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"

"github.com/pingcap/tidb-operator/api/v2/core/v1alpha1"
"github.com/pingcap/tidb-operator/v2/pkg/apicall"
"github.com/pingcap/tidb-operator/v2/pkg/client"
"github.com/pingcap/tidb-operator/v2/pkg/controllers/common"
"github.com/pingcap/tidb-operator/v2/pkg/runtime/scope"
"github.com/pingcap/tidb-operator/v2/pkg/utils/task/v3"
)

var tiproxyFinalizerSubresourceLister = common.NewSubresourceLister(
common.NewSubresource[corev1.ConfigMapList](),
common.NewSubresource[corev1.PersistentVolumeClaimList](),
)

func TaskFinalizerDel(state common.ObjectState[*v1alpha1.TiProxy], c client.Client) task.Task {
return task.NameTaskFunc("FinalizerDel", func(ctx context.Context) task.Result {
tiproxy := state.Object()

pod, err := apicall.GetPod[scope.TiProxy](ctx, c, tiproxy)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

move common.TaskContextPod[scope.TiProxy](state, r.Client) before this task and no need to get pod again.

if err != nil && !apierrors.IsNotFound(err) {
return task.Fail().With("cannot get pod of tiproxy: %v", err)
}
if pod != nil {
retryAfter, err := drainOrDeletePod(ctx, c, tiproxy, pod)
if err != nil {
return task.Fail().With("cannot delete pod of tiproxy: %v", err)
}
if retryAfter > 0 {
return task.Retry(retryAfter).With("wait for tiproxy pod to be deleted")
}
}

res, _ := task.RunTask(ctx, common.TaskInstanceFinalizerDel[scope.TiProxy](state, c, tiproxyFinalizerSubresourceLister))
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add a new task and do not run another task in this task. If task is not fail, next task will be run.

return res
})
}

func drainOrDeletePod(ctx context.Context, c client.Client, tiproxy *v1alpha1.TiProxy, pod *corev1.Pod) (time.Duration, error) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How to notify the tiproxy that it is terminating?

if !pod.GetDeletionTimestamp().IsZero() {
return task.DefaultRequeueAfter, nil
}

if raw := tiproxy.Annotations[v1alpha1.AnnoKeyTiProxyGracefulShutdownDeleteDelaySeconds]; raw != "" {
seconds, err := strconv.ParseInt(raw, 10, 64)
if err != nil || seconds <= 0 {
seconds = 0
}
if seconds > 0 {
newPod := pod.DeepCopy()
changed := false

if _, ok := newPod.Labels[v1alpha1.LabelKeyGroup]; ok {
delete(newPod.Labels, v1alpha1.LabelKeyGroup)
changed = true
}

startAt := time.Now()
if newPod.Annotations == nil {
newPod.Annotations = map[string]string{}
}
if raw := newPod.Annotations[v1alpha1.AnnoKeyTiProxyGracefulShutdownBeginTime]; raw == "" {
newPod.Annotations[v1alpha1.AnnoKeyTiProxyGracefulShutdownBeginTime] = startAt.Format(time.RFC3339Nano)
changed = true
} else if t, err := time.Parse(time.RFC3339Nano, raw); err == nil {
startAt = t
} else {
newPod.Annotations[v1alpha1.AnnoKeyTiProxyGracefulShutdownBeginTime] = startAt.Format(time.RFC3339Nano)
changed = true
}

if changed {
if err := c.Update(ctx, newPod); err != nil {
return 0, err
}
return task.DefaultRequeueAfter, nil
}

if remaining := time.Until(startAt.Add(time.Duration(seconds) * time.Second)); remaining > 0 {
if remaining > task.DefaultRequeueAfter {
remaining = task.DefaultRequeueAfter
}
return remaining, nil
}
}
}

if err := c.Delete(ctx, pod); err != nil && !apierrors.IsNotFound(err) {
return 0, err
}
return task.DefaultRequeueAfter, nil
}
112 changes: 112 additions & 0 deletions pkg/controllers/tiproxy/tasks/finalizer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
// Copyright 2024 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package tasks

import (
"context"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"github.com/pingcap/tidb-operator/api/v2/core/v1alpha1"
"github.com/pingcap/tidb-operator/v2/pkg/client"
"github.com/pingcap/tidb-operator/v2/pkg/utils/fake"
"github.com/pingcap/tidb-operator/v2/pkg/utils/task/v3"
)

func TestTaskFinalizerDelDeletePodImmediatelyByDefault(t *testing.T) {
t.Parallel()

ctx := context.Background()
tiproxy := deletingTiProxy("")
cluster := fake.FakeObj[v1alpha1.Cluster]("aaa")
pod := fakePod(cluster, tiproxy)

fc := client.NewFakeClient(tiproxy, pod)
res, done := task.RunTask(ctx, TaskFinalizerDel(&state{tiproxy: tiproxy}, fc))

assert.Equal(t, task.SRetry.String(), res.Status().String())
assert.False(t, done)

err := fc.Get(ctx, client.ObjectKeyFromObject(pod), &corev1.Pod{})
assert.True(t, apierrors.IsNotFound(err))
}

func TestTaskFinalizerDelDrainPodBeforeDeleting(t *testing.T) {
t.Parallel()

ctx := context.Background()
tiproxy := deletingTiProxy("3600")
cluster := fake.FakeObj[v1alpha1.Cluster]("aaa")
pod := fakePod(cluster, tiproxy)

fc := client.NewFakeClient(tiproxy, pod)
res, done := task.RunTask(ctx, TaskFinalizerDel(&state{tiproxy: tiproxy}, fc))

assert.Equal(t, task.SRetry.String(), res.Status().String())
assert.False(t, done)

actual := &corev1.Pod{}
require.NoError(t, fc.Get(ctx, client.ObjectKeyFromObject(pod), actual))
_, hasGroupLabel := actual.Labels[v1alpha1.LabelKeyGroup]
assert.False(t, hasGroupLabel)
assert.NotEmpty(t, actual.Annotations[v1alpha1.AnnoKeyTiProxyGracefulShutdownBeginTime])
}

func TestTaskFinalizerDelDeletePodAfterDrainDelay(t *testing.T) {
t.Parallel()

ctx := context.Background()
tiproxy := deletingTiProxy("60")
cluster := fake.FakeObj[v1alpha1.Cluster]("aaa")
pod := fakePod(cluster, tiproxy)
delete(pod.Labels, v1alpha1.LabelKeyGroup)
pod.Annotations = map[string]string{
v1alpha1.AnnoKeyTiProxyGracefulShutdownBeginTime: time.Now().Add(-2 * time.Minute).Format(time.RFC3339Nano),
}

fc := client.NewFakeClient(tiproxy, pod)
res, done := task.RunTask(ctx, TaskFinalizerDel(&state{tiproxy: tiproxy}, fc))

assert.Equal(t, task.SRetry.String(), res.Status().String())
assert.False(t, done)

err := fc.Get(ctx, client.ObjectKeyFromObject(pod), &corev1.Pod{})
assert.True(t, apierrors.IsNotFound(err))
}

func deletingTiProxy(deleteDelaySeconds string) *v1alpha1.TiProxy {
now := metav1.Now()
return fake.FakeObj("aaa-xxx",
fake.DeleteTimestamp[v1alpha1.TiProxy](&now),
fake.AddFinalizer[v1alpha1.TiProxy](),
func(obj *v1alpha1.TiProxy) *v1alpha1.TiProxy {
obj.Spec.Cluster.Name = "aaa"
obj.Spec.Version = fakeVersion
if deleteDelaySeconds != "" {
if obj.Annotations == nil {
obj.Annotations = map[string]string{}
}
obj.Annotations[v1alpha1.AnnoKeyTiProxyGracefulShutdownDeleteDelaySeconds] = deleteDelaySeconds
}
return obj
},
)
}
2 changes: 2 additions & 0 deletions pkg/reloadable/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,8 @@ func convertLabels(ls map[string]string) map[string]string {
func convertAnnotations(ls map[string]string) map[string]string {
// ignore defer delete annotation
delete(ls, v1alpha1.AnnoKeyDeferDelete)
// ignore tiproxy graceful shutdown delay annotation
delete(ls, v1alpha1.AnnoKeyTiProxyGracefulShutdownDeleteDelaySeconds)
// ignore boot annotation of pd
delete(ls, v1alpha1.AnnoKeyInitialClusterNum)

Expand Down
7 changes: 4 additions & 3 deletions pkg/reloadable/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -334,9 +334,10 @@ func TestConvertAnnotations(t *testing.T) {
{
desc: "ignore keys",
in: map[string]string{
v1alpha1.AnnoKeyInitialClusterNum: "10",
v1alpha1.AnnoKeyDeferDelete: "xxx",
"test": "test",
v1alpha1.AnnoKeyInitialClusterNum: "10",
v1alpha1.AnnoKeyDeferDelete: "xxx",
v1alpha1.AnnoKeyTiProxyGracefulShutdownDeleteDelaySeconds: "20",
"test": "test",
},
out: map[string]string{
"test": "test",
Expand Down
Loading
Loading