diff --git a/api/core/v1alpha1/common_types.go b/api/core/v1alpha1/common_types.go index f830b5eb636..bb9ff6d57f3 100644 --- a/api/core/v1alpha1/common_types.go +++ b/api/core/v1alpha1/common_types.go @@ -164,6 +164,12 @@ const ( // Last instance template is recorded to check whether the pod should be restarted because of changes of instance template AnnoKeyLastInstanceTemplate = AnnoKeyPrefix + "last-instance-template" + // TiProxy graceful shutdown delete delay, in seconds, is configured via this annotation on the TiProxy object. + AnnoKeyTiProxyGracefulShutdownDeleteDelaySeconds = AnnoKeyPrefix + "tiproxy-graceful-shutdown-delete-delay-seconds" + + // TiProxy graceful shutdown begin time is recorded on the pod after it has been removed from the Service selector. + AnnoKeyTiProxyGracefulShutdownBeginTime = AnnoKeyPrefix + "tiproxy-graceful-shutdown-begin-time" + // Features is recorded to check whether the pod should be restarted because of changes of features AnnoKeyFeatures = AnnoKeyPrefix + "features" diff --git a/pkg/controllers/tiproxy/builder.go b/pkg/controllers/tiproxy/builder.go index 106219be402..b4b9375e5a8 100644 --- a/pkg/controllers/tiproxy/builder.go +++ b/pkg/controllers/tiproxy/builder.go @@ -37,13 +37,13 @@ func (r *Reconciler) NewRunner(state *tasks.ReconcileContext, reporter task.Task task.IfBreak(common.CondClusterIsPaused(state)), // if the cluster is deleting, del all subresources and remove the finalizer directly task.IfBreak(common.CondClusterIsDeleting(state), - common.TaskInstanceFinalizerDel[scope.TiProxy](state, r.Client, common.DefaultInstanceSubresourceLister), + tasks.TaskFinalizerDel(state, r.Client), ), // return if cluster's status is not updated task.IfBreak(common.CondClusterPDAddrIsNotRegistered(state)), task.IfBreak(common.CondObjectIsDeleting[scope.TiProxy](state), - common.TaskInstanceFinalizerDel[scope.TiProxy](state, r.Client, common.DefaultInstanceSubresourceLister), + tasks.TaskFinalizerDel(state, r.Client), // TODO(liubo02): if
the finalizer has been removed, no need to update status common.TaskInstanceConditionSynced[scope.TiProxy](state), common.TaskInstanceConditionReady[scope.TiProxy](state), diff --git a/pkg/controllers/tiproxy/tasks/finalizer.go b/pkg/controllers/tiproxy/tasks/finalizer.go new file mode 100644 index 00000000000..9906625c8a7 --- /dev/null +++ b/pkg/controllers/tiproxy/tasks/finalizer.go @@ -0,0 +1,114 @@ +// Copyright 2024 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package tasks + +import ( + "context" + "strconv" + "time" + + corev1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + + "github.com/pingcap/tidb-operator/api/v2/core/v1alpha1" + "github.com/pingcap/tidb-operator/v2/pkg/apicall" + "github.com/pingcap/tidb-operator/v2/pkg/client" + "github.com/pingcap/tidb-operator/v2/pkg/controllers/common" + "github.com/pingcap/tidb-operator/v2/pkg/runtime/scope" + "github.com/pingcap/tidb-operator/v2/pkg/utils/task/v3" +) + +var tiproxyFinalizerSubresourceLister = common.NewSubresourceLister( + common.NewSubresource[corev1.ConfigMapList](), + common.NewSubresource[corev1.PersistentVolumeClaimList](), +) + +func TaskFinalizerDel(state common.ObjectState[*v1alpha1.TiProxy], c client.Client) task.Task { + return task.NameTaskFunc("FinalizerDel", func(ctx context.Context) task.Result { + tiproxy := state.Object() + + pod, err := apicall.GetPod[scope.TiProxy](ctx, c, tiproxy) + if err != nil && !apierrors.IsNotFound(err) { + return task.Fail().With("cannot get pod of tiproxy: %v", err) + } + if pod != nil { + retryAfter, err := drainOrDeletePod(ctx, c, tiproxy, pod) + if err != nil { + return task.Fail().With("cannot delete pod of tiproxy: %v", err) + } + if retryAfter > 0 { + return task.Retry(retryAfter).With("wait for tiproxy pod to be deleted") + } + } + + res, _ := task.RunTask(ctx, common.TaskInstanceFinalizerDel[scope.TiProxy](state, c, tiproxyFinalizerSubresourceLister)) + return res + }) +} + +func drainOrDeletePod(ctx context.Context, c client.Client, tiproxy *v1alpha1.TiProxy, pod *corev1.Pod) (time.Duration, error) { + if !pod.GetDeletionTimestamp().IsZero() { + return task.DefaultRequeueAfter, nil + } + + if raw := tiproxy.Annotations[v1alpha1.AnnoKeyTiProxyGracefulShutdownDeleteDelaySeconds]; raw != "" { + seconds, err := strconv.ParseInt(raw, 10, 64) + if err != nil || seconds <= 0 { + seconds = 0 + } + if seconds > 0 { + newPod := pod.DeepCopy() + changed := false + + if _, ok := 
newPod.Labels[v1alpha1.LabelKeyGroup]; ok { + delete(newPod.Labels, v1alpha1.LabelKeyGroup) + changed = true + } + + startAt := time.Now() + if newPod.Annotations == nil { + newPod.Annotations = map[string]string{} + } + if raw := newPod.Annotations[v1alpha1.AnnoKeyTiProxyGracefulShutdownBeginTime]; raw == "" { + newPod.Annotations[v1alpha1.AnnoKeyTiProxyGracefulShutdownBeginTime] = startAt.Format(time.RFC3339Nano) + changed = true + } else if t, err := time.Parse(time.RFC3339Nano, raw); err == nil { + startAt = t + } else { + newPod.Annotations[v1alpha1.AnnoKeyTiProxyGracefulShutdownBeginTime] = startAt.Format(time.RFC3339Nano) + changed = true + } + + if changed { + if err := c.Update(ctx, newPod); err != nil { + return 0, err + } + return task.DefaultRequeueAfter, nil + } + + if remaining := time.Until(startAt.Add(time.Duration(seconds) * time.Second)); remaining > 0 { + if remaining > task.DefaultRequeueAfter { + remaining = task.DefaultRequeueAfter + } + return remaining, nil + } + } + } + + if err := c.Delete(ctx, pod); err != nil && !apierrors.IsNotFound(err) { + return 0, err + } + return task.DefaultRequeueAfter, nil +} diff --git a/pkg/controllers/tiproxy/tasks/finalizer_test.go b/pkg/controllers/tiproxy/tasks/finalizer_test.go new file mode 100644 index 00000000000..e6aee1aad13 --- /dev/null +++ b/pkg/controllers/tiproxy/tasks/finalizer_test.go @@ -0,0 +1,112 @@ +// Copyright 2024 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package tasks + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + corev1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + "github.com/pingcap/tidb-operator/api/v2/core/v1alpha1" + "github.com/pingcap/tidb-operator/v2/pkg/client" + "github.com/pingcap/tidb-operator/v2/pkg/utils/fake" + "github.com/pingcap/tidb-operator/v2/pkg/utils/task/v3" +) + +func TestTaskFinalizerDelDeletePodImmediatelyByDefault(t *testing.T) { + t.Parallel() + + ctx := context.Background() + tiproxy := deletingTiProxy("") + cluster := fake.FakeObj[v1alpha1.Cluster]("aaa") + pod := fakePod(cluster, tiproxy) + + fc := client.NewFakeClient(tiproxy, pod) + res, done := task.RunTask(ctx, TaskFinalizerDel(&state{tiproxy: tiproxy}, fc)) + + assert.Equal(t, task.SRetry.String(), res.Status().String()) + assert.False(t, done) + + err := fc.Get(ctx, client.ObjectKeyFromObject(pod), &corev1.Pod{}) + assert.True(t, apierrors.IsNotFound(err)) +} + +func TestTaskFinalizerDelDrainPodBeforeDeleting(t *testing.T) { + t.Parallel() + + ctx := context.Background() + tiproxy := deletingTiProxy("3600") + cluster := fake.FakeObj[v1alpha1.Cluster]("aaa") + pod := fakePod(cluster, tiproxy) + + fc := client.NewFakeClient(tiproxy, pod) + res, done := task.RunTask(ctx, TaskFinalizerDel(&state{tiproxy: tiproxy}, fc)) + + assert.Equal(t, task.SRetry.String(), res.Status().String()) + assert.False(t, done) + + actual := &corev1.Pod{} + require.NoError(t, fc.Get(ctx, client.ObjectKeyFromObject(pod), actual)) + _, hasGroupLabel := actual.Labels[v1alpha1.LabelKeyGroup] + assert.False(t, hasGroupLabel) + assert.NotEmpty(t, actual.Annotations[v1alpha1.AnnoKeyTiProxyGracefulShutdownBeginTime]) +} + +func TestTaskFinalizerDelDeletePodAfterDrainDelay(t *testing.T) { + t.Parallel() + + ctx := context.Background() + tiproxy := deletingTiProxy("60") + cluster := 
fake.FakeObj[v1alpha1.Cluster]("aaa") + pod := fakePod(cluster, tiproxy) + delete(pod.Labels, v1alpha1.LabelKeyGroup) + pod.Annotations = map[string]string{ + v1alpha1.AnnoKeyTiProxyGracefulShutdownBeginTime: time.Now().Add(-2 * time.Minute).Format(time.RFC3339Nano), + } + + fc := client.NewFakeClient(tiproxy, pod) + res, done := task.RunTask(ctx, TaskFinalizerDel(&state{tiproxy: tiproxy}, fc)) + + assert.Equal(t, task.SRetry.String(), res.Status().String()) + assert.False(t, done) + + err := fc.Get(ctx, client.ObjectKeyFromObject(pod), &corev1.Pod{}) + assert.True(t, apierrors.IsNotFound(err)) +} + +func deletingTiProxy(deleteDelaySeconds string) *v1alpha1.TiProxy { + now := metav1.Now() + return fake.FakeObj("aaa-xxx", + fake.DeleteTimestamp[v1alpha1.TiProxy](&now), + fake.AddFinalizer[v1alpha1.TiProxy](), + func(obj *v1alpha1.TiProxy) *v1alpha1.TiProxy { + obj.Spec.Cluster.Name = "aaa" + obj.Spec.Version = fakeVersion + if deleteDelaySeconds != "" { + if obj.Annotations == nil { + obj.Annotations = map[string]string{} + } + obj.Annotations[v1alpha1.AnnoKeyTiProxyGracefulShutdownDeleteDelaySeconds] = deleteDelaySeconds + } + return obj + }, + ) +} diff --git a/pkg/reloadable/common.go b/pkg/reloadable/common.go index f3d41a72913..ff5ec4f73a9 100644 --- a/pkg/reloadable/common.go +++ b/pkg/reloadable/common.go @@ -85,6 +85,8 @@ func convertLabels(ls map[string]string) map[string]string { func convertAnnotations(ls map[string]string) map[string]string { // ignore defer delete annotation delete(ls, v1alpha1.AnnoKeyDeferDelete) + // ignore tiproxy graceful shutdown delay annotation + delete(ls, v1alpha1.AnnoKeyTiProxyGracefulShutdownDeleteDelaySeconds) // ignore boot annotation of pd delete(ls, v1alpha1.AnnoKeyInitialClusterNum) diff --git a/pkg/reloadable/common_test.go b/pkg/reloadable/common_test.go index b4ec00d9004..b7fafefe0f4 100644 --- a/pkg/reloadable/common_test.go +++ b/pkg/reloadable/common_test.go @@ -334,9 +334,10 @@ func TestConvertAnnotations(t 
*testing.T) { { desc: "ignore keys", in: map[string]string{ - v1alpha1.AnnoKeyInitialClusterNum: "10", - v1alpha1.AnnoKeyDeferDelete: "xxx", - "test": "test", + v1alpha1.AnnoKeyInitialClusterNum: "10", + v1alpha1.AnnoKeyDeferDelete: "xxx", + v1alpha1.AnnoKeyTiProxyGracefulShutdownDeleteDelaySeconds: "20", + "test": "test", }, out: map[string]string{ "test": "test", diff --git a/tests/e2e/tiproxy/tiproxy.go b/tests/e2e/tiproxy/tiproxy.go index 9ae5c2cd603..3e56d81cfae 100644 --- a/tests/e2e/tiproxy/tiproxy.go +++ b/tests/e2e/tiproxy/tiproxy.go @@ -29,6 +29,8 @@ import ( "github.com/pingcap/tidb-operator/v2/pkg/runtime/scope" "github.com/pingcap/tidb-operator/v2/tests/e2e/data" "github.com/pingcap/tidb-operator/v2/tests/e2e/framework" + "github.com/pingcap/tidb-operator/v2/tests/e2e/framework/action" + "github.com/pingcap/tidb-operator/v2/tests/e2e/framework/desc" wopt "github.com/pingcap/tidb-operator/v2/tests/e2e/framework/workload" "github.com/pingcap/tidb-operator/v2/tests/e2e/label" "github.com/pingcap/tidb-operator/v2/tests/e2e/utils/cert" @@ -40,6 +42,22 @@ const ( changedConfig = `log.level = 'warn'` ) +func readyTiProxyServiceBackends(ctx context.Context, c client.Client, proxyg *v1alpha1.TiProxyGroup) (int, error) { + endpoints := &corev1.Endpoints{} + if err := c.Get(ctx, client.ObjectKey{ + Namespace: proxyg.Namespace, + Name: proxyg.Name + "-tiproxy", + }, endpoints); err != nil { + return 0, err + } + + count := 0 + for _, subset := range endpoints.Subsets { + count += len(subset.Addresses) + } + return count, nil +} + var _ = ginkgo.Describe("TiProxy", label.TiProxy, func() { f := framework.New() f.Setup() @@ -216,6 +234,174 @@ var _ = ginkgo.Describe("TiProxy", label.TiProxy, func() { f.Must(waiter.WaitForPodsRecreated(ctx, f.Client, runtime.FromTiProxyGroup(proxyg), *changeTime, waiter.LongTaskTimeout)) f.WaitForTiProxyGroupReady(ctx, proxyg) }) + + ginkgo.It("keep service backends during graceful rolling update with large maxSurge", label.Update, 
func(ctx context.Context) { + o := desc.DefaultOptions() + const replicas int32 = 3 + const deleteDelaySeconds = "20" + + pdg := action.MustCreatePD(ctx, f, o) + kvg := action.MustCreateTiKV(ctx, f, o) + dbg := action.MustCreateTiDB(ctx, f, o) + proxyg := action.MustCreateTiProxy(ctx, f, o, + data.WithReplicas[scope.TiProxyGroup](replicas), + data.WithTiProxyMaxSurge(5), + ) + + f.WaitForPDGroupReady(ctx, pdg) + f.WaitForTiKVGroupReady(ctx, kvg) + f.WaitForTiDBGroupReady(ctx, dbg) + f.WaitForTiProxyGroupReady(ctx, proxyg) + + listTiProxyPods := func() (*corev1.PodList, error) { + pods := &corev1.PodList{} + if err := f.Client.List(ctx, pods, client.InNamespace(proxyg.Namespace), client.MatchingLabels{ + v1alpha1.LabelKeyManagedBy: v1alpha1.LabelValManagedByOperator, + v1alpha1.LabelKeyCluster: proxyg.Spec.Cluster.Name, + v1alpha1.LabelKeyComponent: v1alpha1.LabelValComponentTiProxy, + }); err != nil { + return nil, err + } + return pods, nil + } + + listTiProxies := func() (*v1alpha1.TiProxyList, error) { + tiproxies := &v1alpha1.TiProxyList{} + if err := f.Client.List(ctx, tiproxies, client.InNamespace(proxyg.Namespace), client.MatchingLabels{ + v1alpha1.LabelKeyCluster: proxyg.Spec.Cluster.Name, + v1alpha1.LabelKeyComponent: v1alpha1.LabelValComponentTiProxy, + }); err != nil { + return nil, err + } + return tiproxies, nil + } + + initialPods, err := listTiProxyPods() + f.Must(err) + gomega.Expect(initialPods.Items).To(gomega.HaveLen(int(replicas))) + initialPodUIDs := map[string]string{} + for i := range initialPods.Items { + initialPodUIDs[initialPods.Items[i].Name] = string(initialPods.Items[i].UID) + } + + ginkgo.By("Patch TiProxyGroup to enable graceful shutdown delay without restarting pods") + patch := client.MergeFrom(proxyg.DeepCopy()) + if proxyg.Spec.Template.Annotations == nil { + proxyg.Spec.Template.Annotations = map[string]string{} + } + proxyg.Spec.Template.Annotations[v1alpha1.AnnoKeyTiProxyGracefulShutdownDeleteDelaySeconds] = deleteDelaySeconds 
+ f.Must(f.Client.Patch(ctx, proxyg, patch)) + + gomega.Eventually(func() error { + tiproxies, err := listTiProxies() + if err != nil { + return err + } + if len(tiproxies.Items) != int(replicas) { + return fmt.Errorf("got %d tiproxy instances, want %d", len(tiproxies.Items), replicas) + } + for i := range tiproxies.Items { + if tiproxies.Items[i].Annotations[v1alpha1.AnnoKeyTiProxyGracefulShutdownDeleteDelaySeconds] != deleteDelaySeconds { + return fmt.Errorf("tiproxy %s/%s does not have graceful shutdown delay annotation", tiproxies.Items[i].Namespace, tiproxies.Items[i].Name) + } + } + + pods, err := listTiProxyPods() + if err != nil { + return err + } + if len(pods.Items) != int(replicas) { + return fmt.Errorf("got %d tiproxy pods, want %d", len(pods.Items), replicas) + } + for i := range pods.Items { + pod := &pods.Items[i] + uid, ok := initialPodUIDs[pod.Name] + if !ok { + return fmt.Errorf("tiproxy pod %s/%s was recreated unexpectedly", pod.Namespace, pod.Name) + } + if string(pod.UID) != uid { + return fmt.Errorf("tiproxy pod %s/%s uid changed from %s to %s", pod.Namespace, pod.Name, uid, pod.UID) + } + if !pod.DeletionTimestamp.IsZero() { + return fmt.Errorf("tiproxy pod %s/%s is deleting unexpectedly", pod.Namespace, pod.Name) + } + } + return nil + }).WithTimeout(waiter.LongTaskTimeout).WithPolling(waiter.Poll).Should(gomega.Succeed()) + + f.Must(f.Client.Get(ctx, client.ObjectKeyFromObject(proxyg), proxyg)) + + changeTime, err := waiter.MaxPodsCreateTimestamp[scope.TiProxyGroup](ctx, f.Client, proxyg) + f.Must(err) + + ginkgo.By("Patch TiProxyGroup to trigger a rolling restart") + patch = client.MergeFrom(proxyg.DeepCopy()) + proxyg.Spec.Template.Spec.Config = changedConfig + f.Must(f.Client.Patch(ctx, proxyg, patch)) + + var violated error + ginkgo.By("Ensure old pods are drained only after enough new TiProxy backends are ready") + gomega.Eventually(func() error { + if violated != nil { + return violated + } + + pods, err := listTiProxyPods() + if err 
!= nil { + return err + } + + drained := 0 + for i := range pods.Items { + pod := &pods.Items[i] + if _, ok := initialPodUIDs[pod.Name]; !ok { + continue + } + if pod.Labels[v1alpha1.LabelKeyGroup] != proxyg.Name { + if pod.Annotations[v1alpha1.AnnoKeyTiProxyGracefulShutdownBeginTime] == "" { + return fmt.Errorf("drained tiproxy pod %s/%s does not record graceful shutdown begin time", pod.Namespace, pod.Name) + } + drained++ + } + } + if drained == 0 { + return fmt.Errorf("no drained tiproxy pod observed yet") + } + + backends, err := readyTiProxyServiceBackends(ctx, f.Client, proxyg) + if err != nil { + return err + } + if backends < int(replicas) { + violated = fmt.Errorf("tiproxy service backends dropped below desired replicas after drain started: got %d, want >= %d", backends, replicas) + return violated + } + if len(pods.Items) < int(replicas)+2 { + return fmt.Errorf("only %d tiproxy pods exist during graceful rolling update, want at least %d", len(pods.Items), int(replicas)+2) + } + + return nil + }).WithTimeout(waiter.LongTaskTimeout).WithPolling(waiter.Poll).Should(gomega.Succeed()) + + f.Must(waiter.WaitForPodsRecreated(ctx, f.Client, runtime.FromTiProxyGroup(proxyg), *changeTime, waiter.LongTaskTimeout)) + f.WaitForTiProxyGroupReady(ctx, proxyg) + + ginkgo.By("Ensure drained TiProxy instances and pods are eventually deleted") + tiproxies := &v1alpha1.TiProxyList{} + f.Must(f.Client.List(ctx, tiproxies, client.InNamespace(proxyg.Namespace), client.MatchingLabels{ + v1alpha1.LabelKeyCluster: proxyg.Spec.Cluster.Name, + v1alpha1.LabelKeyComponent: v1alpha1.LabelValComponentTiProxy, + })) + gomega.Expect(tiproxies.Items).To(gomega.HaveLen(int(replicas))) + + pods := &corev1.PodList{} + f.Must(f.Client.List(ctx, pods, client.InNamespace(proxyg.Namespace), client.MatchingLabels{ + v1alpha1.LabelKeyManagedBy: v1alpha1.LabelValManagedByOperator, + v1alpha1.LabelKeyCluster: proxyg.Spec.Cluster.Name, + v1alpha1.LabelKeyComponent: v1alpha1.LabelValComponentTiProxy, 
+ })) + gomega.Expect(pods.Items).To(gomega.HaveLen(int(replicas))) + }) }) ginkgo.Context("TLS", label.P0, label.FeatureTLS, func() {