From 74bd1d18c6e99a2590d0648f16e92a643ba65a9c Mon Sep 17 00:00:00 2001 From: Harry Li Date: Sun, 19 Apr 2026 15:51:37 +0800 Subject: [PATCH] patcher: avoid stale resourceVersion conflicts on object updates --- pkg/patcher/apply.go | 51 ++++++++++++++++++------- pkg/patcher/apply_test.go | 79 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 116 insertions(+), 14 deletions(-) diff --git a/pkg/patcher/apply.go b/pkg/patcher/apply.go index f562046875..2bdd02ac6a 100644 --- a/pkg/patcher/apply.go +++ b/pkg/patcher/apply.go @@ -15,6 +15,7 @@ import ( "k8s.io/apimachinery/pkg/api/meta" "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/tools/events" + "k8s.io/client-go/util/retry" "k8s.io/klog/v2" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" @@ -226,14 +227,16 @@ func applyObjectWithPatch(ctx *synccontext.SyncContext, objPatch patch.Patch, ob // check if we should create or update the object isUpdate := false + currentObj := obj.DeepCopyObject().(client.Object) err := kubeClient.Get(ctx, types.NamespacedName{ Namespace: obj.GetNamespace(), Name: obj.GetName(), - }, obj.DeepCopyObject().(client.Object)) + }, currentObj) if err != nil && !kerrors.IsNotFound(err) { return fmt.Errorf("get object: %w", err) } else if err == nil { isUpdate = true + obj = currentObj } // we cannot create a status only object @@ -264,22 +267,42 @@ func applyObjectWithPatch(ctx *synccontext.SyncContext, objPatch patch.Patch, ob // create / update afterObj := obj.DeepCopyObject().(client.Object) - if isStatus { - err = kubeClient.Status().Update(ctx, obj) + if isUpdate { + err = retry.RetryOnConflict(retry.DefaultBackoff, func() error { + latestObj := afterObj.DeepCopyObject().(client.Object) + if err := kubeClient.Get(ctx, types.NamespacedName{ + Namespace: afterObj.GetNamespace(), + Name: afterObj.GetName(), + }, latestObj); err != nil { + return err + } + + if err := objPatch.Apply(latestObj); err != nil { + return fmt.Errorf("apply patch: %w", err) + } + + if isStatus { + return kubeClient.Status().Update(ctx, latestObj) + } + return kubeClient.Update(ctx, latestObj) + }) if err != nil { - return fmt.Errorf("update object status: %w", err) + if isStatus { + return fmt.Errorf("update object status: %w", err) + } + return fmt.Errorf("update object: %w", err) + } + + if err := kubeClient.Get(ctx, types.NamespacedName{ + Namespace: afterObj.GetNamespace(), + Name: afterObj.GetName(), + }, afterObj); err != nil { + return fmt.Errorf("get updated object: %w", err) } } else { - if isUpdate { - err = kubeClient.Update(ctx, obj) - if err != nil { - return fmt.Errorf("update object: %w", err) - } - } else { - err = kubeClient.Create(ctx, obj) - if err != nil { - return fmt.Errorf("create object: %w", err) - } + err = kubeClient.Create(ctx, obj) + if err != nil { + return fmt.Errorf("create object: %w", err) } } diff --git a/pkg/patcher/apply_test.go b/pkg/patcher/apply_test.go index 2779438ebe..d896717a5b 100644 --- a/pkg/patcher/apply_test.go +++ b/pkg/patcher/apply_test.go @@ -1,11 +1,18 @@ package patcher import ( + "context" "encoding/json" + "fmt" "testing" + "github.com/loft-sh/vcluster/pkg/scheme" + "github.com/loft-sh/vcluster/pkg/syncer/synccontext" + testingutil "github.com/loft-sh/vcluster/pkg/util/testing" corev1 "k8s.io/api/core/v1" + kerrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime/schema" "sigs.k8s.io/controller-runtime/pkg/client" ) @@ -214,3 +221,75 @@ func wantFieldValues(field string, want map[string]string) func(t *testing.T, go } } } + +type conflictOnceClient struct { + client.Client + + firstUpdate bool +} + +func (c *conflictOnceClient) Update(ctx context.Context, obj client.Object, opts ...client.UpdateOption) error { + if !c.firstUpdate { + c.firstUpdate = true + + current := &corev1.ConfigMap{} + if err := c.Client.Get(ctx, client.ObjectKeyFromObject(obj), current); err != nil { + return err + } + current.Labels = map[string]string{"external": "true"} + if err := c.Client.Update(ctx, current, opts...); err != nil { + return err + } + + return kerrors.NewConflict(schema.GroupResource{Group: "", Resource: "configmaps"}, obj.GetName(), fmt.Errorf("simulated conflict")) + } + + return c.Client.Update(ctx, obj, opts...) +} + +func TestApplyObjectRetriesConflictWithLatestObject(t *testing.T) { + t.Helper() + + virtualClient := testingutil.NewFakeClient(scheme.Scheme, &corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: "example", + Namespace: "default", + Labels: map[string]string{ + "initial": "true", + }, + }, + }) + + syncCtx := &synccontext.SyncContext{ + Context: context.Background(), + VirtualClient: &conflictOnceClient{Client: virtualClient}, + } + + before := &corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: "example", + Namespace: "default", + Labels: map[string]string{ + "initial": "true", + }, + }, + } + after := before.DeepCopy() + after.Labels["synced"] = "true" + + if err := ApplyObject(syncCtx, before, after, synccontext.SyncHostToVirtual, false); err != nil { + t.Fatalf("ApplyObject() error = %v", err) + } + + updated := &corev1.ConfigMap{} + if err := virtualClient.Get(context.Background(), client.ObjectKey{Name: "example", Namespace: "default"}, updated); err != nil { + t.Fatalf("Get() error = %v", err) + } + + if updated.Labels["synced"] != "true" { + t.Fatalf("expected synced label to be preserved after retry, got labels: %#v", updated.Labels) + } + if updated.Labels["external"] != "true" { + t.Fatalf("expected external label from concurrent update to survive retry, got labels: %#v", updated.Labels) + } +}