From 9e6971c5c467ee6d1ca075887adcc0718d9dbf51 Mon Sep 17 00:00:00 2001 From: Allen Ray Date: Thu, 14 May 2026 11:54:17 -0400 Subject: [PATCH 1/8] backend: refactor defrag to minimize database lock time MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Split the monolithic defrag operation into three phases to allow writes to continue during the slow bulk copy: Phase 1 (unlocked): Commit pending writes, take a bbolt MVCC snapshot, install a write journal on batchTxBuffered, then unlock. The bulk copy runs against the snapshot while normal writes continue and are captured by the journal. Phase 2 (locked): Lock batchTx, drain the journal, replay delta writes into the temp DB. This is fast since it's only the writes that occurred during the copy. Phase 3 (locked): Atomic switchover — close old DB, rename temp file, reopen, create new transactions, unlock. Write blocking duration changes from O(db_size) to O(writes_during_copy) + fixed overhead for close/rename/reopen. Co-Authored-By: Claude Opus 4.6 (1M context) --- server/storage/backend/backend.go | 198 ++++++++++++------ server/storage/backend/backend_test.go | 127 +++++++++++ server/storage/backend/batch_tx.go | 20 ++ server/storage/backend/defrag_journal.go | 122 +++++++++++ server/storage/backend/defrag_journal_test.go | 88 ++++++++ 5 files changed, 496 insertions(+), 59 deletions(-) create mode 100644 server/storage/backend/defrag_journal.go create mode 100644 server/storage/backend/defrag_journal_test.go diff --git a/server/storage/backend/backend.go b/server/storage/backend/backend.go index 275064f083b2..36f4d9f1fc24 100644 --- a/server/storage/backend/backend.go +++ b/server/storage/backend/backend.go @@ -15,6 +15,7 @@ package backend import ( + "errors" "fmt" "hash/crc32" "io" @@ -28,6 +29,7 @@ import ( "go.uber.org/zap" bolt "go.etcd.io/bbolt" + bolterrors "go.etcd.io/bbolt/errors" "go.etcd.io/etcd/client/pkg/v3/verify" ) @@ -469,20 +471,6 @@ func (b *backend) defrag() error { isDefragActive.Set(1) defer isDefragActive.Set(0) - // TODO: make this non-blocking? - // lock batchTx to ensure nobody is using previous tx, and then - // close previous ongoing tx. - b.batchTx.LockOutsideApply() - defer b.batchTx.Unlock() - - // lock database after lock tx to avoid deadlock. - b.mu.Lock() - defer b.mu.Unlock() - - // block concurrent read requests while resetting tx - b.readTx.Lock() - defer b.readTx.Unlock() - // Create a temporary file to ensure we start with a clean slate. // Snapshotter.cleanupSnapdir cleans up any of these that are found during startup. 
 	dir := filepath.Dir(b.db.Path())
@@ -500,27 +488,23 @@ func (b *backend) defrag() error {
 		// return nil, fmt.Errorf(defragOpenFileError)
 		return temp, nil
 	}
-	// Don't load tmp db into memory regardless of opening options
 	options.Mlock = false
 	tdbp := temp.Name()
 	tmpdb, err := bolt.Open(tdbp, 0o600, &options)
 	if err != nil {
 		temp.Close()
 		if rmErr := os.Remove(temp.Name()); rmErr != nil {
-			b.lg.Error(
-				"failed to remove temporary file",
+			b.lg.Error("failed to remove temporary file",
 				zap.String("path", temp.Name()),
 				zap.Error(rmErr),
 			)
 		}
 		return err
 	}
 
 	dbp := b.db.Path()
 	size1, sizeInUse1 := b.Size(), b.SizeInUse()
-	b.lg.Info(
-		"defragmenting",
+	b.lg.Info("defragmenting",
 		zap.String("path", dbp),
 		zap.Int64("current-db-size-bytes", size1),
 		zap.String("current-db-size", humanize.Bytes(uint64(size1))),
@@ -528,35 +512,78 @@
 		zap.String("current-db-size-in-use", humanize.Bytes(uint64(sizeInUse1))),
 	)
 
-	defer func() {
-		// NOTE: We should exit as soon as possible because that tx
-		// might be closed. The inflight request might use invalid
-		// tx and then panic as well. The real panic reason might be
-		// shadowed by new panic. So, we should fatal here with lock.
-		if rerr := recover(); rerr != nil {
-			b.lg.Fatal("unexpected panic during defrag", zap.Any("panic", rerr))
-		}
-	}()
+	// ============================================================
+	// PHASE 1: Commit pending writes, take a snapshot, install the
+	// journal, then unlock so writes can continue during the copy.
+	// ============================================================
+	b.batchTx.LockOutsideApply()
 
-	// Commit/stop and then reset current transactions (including the readTx)
-	b.batchTx.unsafeCommit(true)
-	b.batchTx.tx = nil
+	b.batchTx.commit(false)
+
+	b.mu.RLock()
+	snapTx, snapErr := b.db.Begin(false)
+	b.mu.RUnlock()
+	if snapErr != nil {
+		b.batchTx.Unlock()
+		tmpdb.Close()
+		os.Remove(tdbp)
+		return fmt.Errorf("failed to begin snapshot tx for defrag: %w", snapErr)
+	}
+
+	journal := newDefragJournal()
+	b.batchTx.defragJournal = journal
+	b.batchTx.Unlock()
+
+	b.lg.Info("defrag: copying data (writes unlocked)")
 
 	// gofail: var defragBeforeCopy struct{}
-	err = defragdb(b.db, tmpdb, defragLimit)
+	err = defragFromTx(snapTx, tmpdb, defragLimit)
+	snapTx.Rollback()
+	if err != nil {
+		b.batchTx.LockOutsideApply()
+		b.batchTx.defragJournal = nil
+		b.batchTx.Unlock()
+		journal.close()
 		tmpdb.Close()
-		if rmErr := os.RemoveAll(tmpdb.Path()); rmErr != nil {
-			b.lg.Error("failed to remove db.tmp after defragmentation completed", zap.Error(rmErr))
-		}
+		os.RemoveAll(tdbp)
+		return err
+	}
+
+	// ============================================================
+	// PHASE 2: Lock writes, drain and replay the journal into the
+	// temp DB. This should be fast since it's only the delta.
+	// ============================================================
+	b.lg.Info("defrag: replaying journal")
 
-		// restore the bbolt transactions if defragmentation fails
-		b.batchTx.tx = b.unsafeBegin(true)
-		b.readTx.tx = b.unsafeBegin(false)
+	b.batchTx.LockOutsideApply()
+	b.batchTx.defragJournal = nil
+	journal.close()
+	ops := journal.drain()
+
+	if len(ops) > 0 {
+		b.lg.Info("defrag: replaying journal ops", zap.Int("count", len(ops)))
+	}
+	err = replayJournal(tmpdb, ops, defragLimit)
+	if err != nil {
+		b.batchTx.Unlock()
+		tmpdb.Close()
+		os.RemoveAll(tdbp)
 		return err
 	}
 
+	// ============================================================
+	// PHASE 3: Atomic switchover — close old db, rename, reopen.
+ // ============================================================ + b.lg.Info("defrag: switching database") + + b.mu.Lock() + b.readTx.Lock() + + b.batchTx.unsafeCommit(true) + b.batchTx.tx = nil + err = b.db.Close() if err != nil { b.lg.Fatal("failed to close database", zap.Error(err)) @@ -585,12 +612,15 @@ func (b *backend) defrag() error { atomic.StoreInt64(&b.size, size) atomic.StoreInt64(&b.sizeInUse, size-(int64(db.Stats().FreePageN)*int64(db.Info().PageSize))) + b.readTx.Unlock() + b.mu.Unlock() + b.batchTx.Unlock() + took := time.Since(now) defragSec.Observe(took.Seconds()) size2, sizeInUse2 := b.Size(), b.SizeInUse() - b.lg.Info( - "finished defragmenting directory", + b.lg.Info("finished defragmenting directory", zap.String("path", dbp), zap.Int64("current-db-size-bytes-diff", size2-size1), zap.Int64("current-db-size-bytes", size2), @@ -603,11 +633,7 @@ func (b *backend) defrag() error { return nil } -func defragdb(odb, tmpdb *bolt.DB, limit int) error { - // gofail: var defragdbFail string - // return fmt.Errorf(defragdbFail) - - // open a tx on tmpdb for writes +func defragFromTx(srcTx *bolt.Tx, tmpdb *bolt.DB, limit int) error { tmptx, err := tmpdb.Begin(true) if err != nil { return err @@ -618,18 +644,10 @@ func defragdb(odb, tmpdb *bolt.DB, limit int) error { } }() - // open a tx on old db for read - tx, err := odb.Begin(false) - if err != nil { - return err - } - defer tx.Rollback() - - c := tx.Cursor() - + c := srcTx.Cursor() count := 0 for next, _ := c.First(); next != nil; next, _ = c.Next() { - b := tx.Bucket(next) + b := srcTx.Bucket(next) if b == nil { return fmt.Errorf("backend: cannot defrag bucket %s", next) } @@ -638,7 +656,7 @@ func defragdb(odb, tmpdb *bolt.DB, limit int) error { if berr != nil { return berr } - tmpb.FillPercent = 0.9 // for bucket2seq write in for each + tmpb.FillPercent = 0.9 if err = b.ForEach(func(k, v []byte) error { count++ @@ -652,8 +670,7 @@ func defragdb(odb, tmpdb *bolt.DB, limit int) error { return err } tmpb = tmptx.Bucket(next) - tmpb.FillPercent = 0.9 // for bucket2seq write in for each - + tmpb.FillPercent = 0.9 count = 0 } return tmpb.Put(k, v) @@ -665,6 +682,69 @@ func defragdb(odb, tmpdb *bolt.DB, limit int) error { return tmptx.Commit() } +func replayJournal(tmpdb *bolt.DB, ops []defragJournalOp, limit int) error { + if len(ops) == 0 { + return nil + } + + tx, err := tmpdb.Begin(true) + if err != nil { + return err + } + defer func() { + if err != nil { + tx.Rollback() + } + }() + + count := 0 + for _, op := range ops { + count++ + if count > limit { + if err = tx.Commit(); err != nil { + return err + } + tx, err = tmpdb.Begin(true) + if err != nil { + return err + } + count = 0 + } + + switch op.opType { + case opCreateBucket: + if _, err = tx.CreateBucketIfNotExists(op.bucketName); err != nil { + return fmt.Errorf("replay: create bucket %s: %w", op.bucketName, err) + } + case opDeleteBucket: + if delErr := tx.DeleteBucket(op.bucketName); delErr != nil && !errors.Is(delErr, bolterrors.ErrBucketNotFound) { + return fmt.Errorf("replay: delete bucket %s: %w", op.bucketName, delErr) + } + case opPut: + b := tx.Bucket(op.bucketName) + if b == nil { + return fmt.Errorf("replay: bucket %s not found for put", op.bucketName) + } + if op.seq { + b.FillPercent = 0.9 + } + if err = b.Put(op.key, op.value); err != nil { + return fmt.Errorf("replay: put in bucket %s: %w", op.bucketName, err) + } + case opDelete: + b := tx.Bucket(op.bucketName) + if b == nil { + continue + } + if err = b.Delete(op.key); err != nil { + return 
fmt.Errorf("replay: delete from bucket %s: %w", op.bucketName, err) + } + } + } + + return tx.Commit() +} + func (b *backend) begin(write bool) *bolt.Tx { b.mu.RLock() tx := b.unsafeBegin(write) diff --git a/server/storage/backend/backend_test.go b/server/storage/backend/backend_test.go index fc024b88bc1f..4adeb24f4244 100644 --- a/server/storage/backend/backend_test.go +++ b/server/storage/backend/backend_test.go @@ -18,6 +18,7 @@ import ( "fmt" "os" "reflect" + "sync" "testing" "time" @@ -349,3 +350,129 @@ func TestBackendWritebackForEach(t *testing.T) { t.Fatalf("expected %q, got %q", seq, partialSeq) } } + +func TestBackendDefragConcurrentWrites(t *testing.T) { + b, _ := betesting.NewDefaultTmpBackend(t) + defer betesting.Close(t, b) + + tx := b.BatchTx() + tx.Lock() + tx.UnsafeCreateBucket(schema.Test) + for i := 0; i < 1000; i++ { + tx.UnsafePut(schema.Test, []byte(fmt.Sprintf("pre_%04d", i)), []byte("original")) + } + tx.Unlock() + b.ForceCommit() + + // delete some keys to create reclaimable space + tx.Lock() + for i := 0; i < 500; i++ { + tx.UnsafeDelete(schema.Test, []byte(fmt.Sprintf("pre_%04d", i))) + } + tx.Unlock() + b.ForceCommit() + + // write concurrently during defrag + var wg sync.WaitGroup + stop := make(chan struct{}) + + wg.Add(1) + go func() { + defer wg.Done() + for i := 0; ; i++ { + select { + case <-stop: + return + default: + } + tx := b.BatchTx() + tx.Lock() + tx.UnsafePut(schema.Test, []byte(fmt.Sprintf("concurrent_%04d", i)), []byte("new")) + tx.Unlock() + } + }() + + err := b.Defrag() + close(stop) + wg.Wait() + + require.NoError(t, err) + + // verify pre-existing keys that weren't deleted still exist + rtx := b.BatchTx() + rtx.Lock() + for i := 500; i < 1000; i++ { + keys, vals := rtx.UnsafeRange(schema.Test, []byte(fmt.Sprintf("pre_%04d", i)), nil, 0) + require.Lenf(t, keys, 1, "key pre_%04d should exist", i) + assert.Equal(t, []byte("original"), vals[0]) + } + // verify deleted keys are gone + for i := 0; i < 500; i++ { + keys, _ := rtx.UnsafeRange(schema.Test, []byte(fmt.Sprintf("pre_%04d", i)), nil, 0) + assert.Emptyf(t, keys, "key pre_%04d should be deleted", i) + } + rtx.Unlock() + + // verify at least some concurrent writes are present + b.ForceCommit() + rtx.Lock() + var found int + for i := 0; i < 10000; i++ { + keys, _ := rtx.UnsafeRange(schema.Test, []byte(fmt.Sprintf("concurrent_%04d", i)), nil, 0) + if len(keys) > 0 { + found++ + } + } + rtx.Unlock() + assert.Greater(t, found, 0, "at least some concurrent writes should be present after defrag") +} + +func TestBackendDefragDeletesDuringDefrag(t *testing.T) { + b, _ := betesting.NewDefaultTmpBackend(t) + defer betesting.Close(t, b) + + tx := b.BatchTx() + tx.Lock() + tx.UnsafeCreateBucket(schema.Test) + for i := 0; i < 100; i++ { + tx.UnsafePut(schema.Test, []byte(fmt.Sprintf("key_%04d", i)), []byte("value")) + } + tx.Unlock() + b.ForceCommit() + + // delete keys concurrently during defrag + var wg sync.WaitGroup + stop := make(chan struct{}) + + wg.Add(1) + go func() { + defer wg.Done() + for i := 0; i < 50; i++ { + select { + case <-stop: + return + default: + } + tx := b.BatchTx() + tx.Lock() + tx.UnsafeDelete(schema.Test, []byte(fmt.Sprintf("key_%04d", i))) + tx.Unlock() + } + }() + + err := b.Defrag() + close(stop) + wg.Wait() + + require.NoError(t, err) + + // keys 50-99 should still exist + b.ForceCommit() + rtx := b.BatchTx() + rtx.Lock() + for i := 50; i < 100; i++ { + keys, _ := rtx.UnsafeRange(schema.Test, []byte(fmt.Sprintf("key_%04d", i)), nil, 0) + require.Lenf(t, keys, 1, "key_%04d 
should exist", i) + } + rtx.Unlock() +} diff --git a/server/storage/backend/batch_tx.go b/server/storage/backend/batch_tx.go index 5af557cb4283..50c80de12708 100644 --- a/server/storage/backend/batch_tx.go +++ b/server/storage/backend/batch_tx.go @@ -291,6 +291,7 @@ type batchTxBuffered struct { batchTx buf txWriteBuffer pendingDeleteOperations int + defragJournal *defragJournal } func newBatchTxBuffered(backend *backend) *batchTxBuffered { @@ -385,22 +386,41 @@ func (t *batchTxBuffered) unsafeCommit(stop bool) { } } +func (t *batchTxBuffered) UnsafeCreateBucket(bucket Bucket) { + t.batchTx.UnsafeCreateBucket(bucket) + if j := t.defragJournal; j != nil { + j.appendCreateBucket(bucket.Name()) + } +} + func (t *batchTxBuffered) UnsafePut(bucket Bucket, key []byte, value []byte) { t.batchTx.UnsafePut(bucket, key, value) t.buf.put(bucket, key, value) + if j := t.defragJournal; j != nil { + j.appendPut(bucket.Name(), key, value, false) + } } func (t *batchTxBuffered) UnsafeSeqPut(bucket Bucket, key []byte, value []byte) { t.batchTx.UnsafeSeqPut(bucket, key, value) t.buf.putSeq(bucket, key, value) + if j := t.defragJournal; j != nil { + j.appendPut(bucket.Name(), key, value, true) + } } func (t *batchTxBuffered) UnsafeDelete(bucketType Bucket, key []byte) { t.batchTx.UnsafeDelete(bucketType, key) t.pendingDeleteOperations++ + if j := t.defragJournal; j != nil { + j.appendDelete(bucketType.Name(), key) + } } func (t *batchTxBuffered) UnsafeDeleteBucket(bucket Bucket) { t.batchTx.UnsafeDeleteBucket(bucket) t.pendingDeleteOperations++ + if j := t.defragJournal; j != nil { + j.appendDeleteBucket(bucket.Name()) + } } diff --git a/server/storage/backend/defrag_journal.go b/server/storage/backend/defrag_journal.go new file mode 100644 index 000000000000..940891b7df25 --- /dev/null +++ b/server/storage/backend/defrag_journal.go @@ -0,0 +1,122 @@ +// Copyright 2024 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package backend + +import "sync" + +type defragOpType uint8 + +const ( + opPut defragOpType = iota + opDelete + opCreateBucket + opDeleteBucket +) + +type defragJournalOp struct { + opType defragOpType + bucketName []byte + key []byte + value []byte + seq bool +} + +type defragJournal struct { + mu sync.Mutex + ops []defragJournalOp + closed bool +} + +func newDefragJournal() *defragJournal { + return &defragJournal{ + ops: make([]defragJournalOp, 0, 1024), + } +} + +func (j *defragJournal) appendPut(bucketName, key, value []byte, seq bool) { + j.mu.Lock() + defer j.mu.Unlock() + if j.closed { + return + } + j.ops = append(j.ops, defragJournalOp{ + opType: opPut, + bucketName: cloneBytes(bucketName), + key: cloneBytes(key), + value: cloneBytes(value), + seq: seq, + }) +} + +func (j *defragJournal) appendDelete(bucketName, key []byte) { + j.mu.Lock() + defer j.mu.Unlock() + if j.closed { + return + } + j.ops = append(j.ops, defragJournalOp{ + opType: opDelete, + bucketName: cloneBytes(bucketName), + key: cloneBytes(key), + }) +} + +func (j *defragJournal) appendCreateBucket(bucketName []byte) { + j.mu.Lock() + defer j.mu.Unlock() + if j.closed { + return + } + j.ops = append(j.ops, defragJournalOp{ + opType: opCreateBucket, + bucketName: cloneBytes(bucketName), + }) +} + +func (j *defragJournal) appendDeleteBucket(bucketName []byte) { + j.mu.Lock() + defer j.mu.Unlock() + if j.closed { + return + } + j.ops = append(j.ops, defragJournalOp{ + opType: opDeleteBucket, + bucketName: cloneBytes(bucketName), + }) +} + +// drain returns all accumulated ops and resets the journal. +func (j *defragJournal) drain() []defragJournalOp { + j.mu.Lock() + defer j.mu.Unlock() + ops := j.ops + j.ops = nil + return ops +} + +func (j *defragJournal) close() { + j.mu.Lock() + defer j.mu.Unlock() + j.closed = true +} + +func cloneBytes(b []byte) []byte { + if b == nil { + return nil + } + cp := make([]byte, len(b)) + copy(cp, b) + return cp +} diff --git a/server/storage/backend/defrag_journal_test.go b/server/storage/backend/defrag_journal_test.go new file mode 100644 index 000000000000..b5e8ccae3c86 --- /dev/null +++ b/server/storage/backend/defrag_journal_test.go @@ -0,0 +1,88 @@ +// Copyright 2024 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package backend + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestDefragJournalAppendAndDrain(t *testing.T) { + j := newDefragJournal() + + j.appendCreateBucket([]byte("bucket1")) + j.appendPut([]byte("bucket1"), []byte("key1"), []byte("val1"), false) + j.appendPut([]byte("bucket1"), []byte("key2"), []byte("val2"), true) + j.appendDelete([]byte("bucket1"), []byte("key1")) + j.appendDeleteBucket([]byte("bucket1")) + + ops := j.drain() + require.Len(t, ops, 5) + + assert.Equal(t, opCreateBucket, ops[0].opType) + assert.Equal(t, []byte("bucket1"), ops[0].bucketName) + + assert.Equal(t, opPut, ops[1].opType) + assert.Equal(t, []byte("key1"), ops[1].key) + assert.Equal(t, []byte("val1"), ops[1].value) + assert.False(t, ops[1].seq) + + assert.Equal(t, opPut, ops[2].opType) + assert.Equal(t, []byte("key2"), ops[2].key) + assert.True(t, ops[2].seq) + + assert.Equal(t, opDelete, ops[3].opType) + assert.Equal(t, []byte("key1"), ops[3].key) + + assert.Equal(t, opDeleteBucket, ops[4].opType) + assert.Equal(t, []byte("bucket1"), ops[4].bucketName) + + // drain again should be empty + ops = j.drain() + assert.Empty(t, ops) +} + +func TestDefragJournalClose(t *testing.T) { + j := newDefragJournal() + + j.appendPut([]byte("b"), []byte("k"), []byte("v"), false) + j.close() + + // appends after close are no-ops + j.appendPut([]byte("b"), []byte("k2"), []byte("v2"), false) + j.appendDelete([]byte("b"), []byte("k")) + j.appendCreateBucket([]byte("b2")) + j.appendDeleteBucket([]byte("b")) + + ops := j.drain() + require.Len(t, ops, 1) + assert.Equal(t, []byte("k"), ops[0].key) +} + +func TestDefragJournalDeepCopies(t *testing.T) { + j := newDefragJournal() + + key := []byte("original") + j.appendPut([]byte("b"), key, []byte("v"), false) + + // mutate the source slice + key[0] = 'X' + + ops := j.drain() + require.Len(t, ops, 1) + assert.Equal(t, []byte("original"), ops[0].key, "journal should deep-copy byte slices") +} From 481ed2d098576369d06b5e00d5baec853e07f8bc Mon Sep 17 00:00:00 2001 From: Allen Ray Date: Thu, 14 May 2026 13:15:48 -0400 Subject: [PATCH 2/8] backend: add defrag tests for concurrency, availability, and consistency Add tests that validate the non-blocking defrag implementation: - TestBackendDefragConcurrentReads: reads work throughout defrag - TestBackendDefragWriteAvailability: writes stay unblocked during copy - TestBackendDefragConcurrentReadsAndWrites: simultaneous readers/writers - TestBackendDefragLogicalConsistency: hash + key-by-key verification Co-Authored-By: Claude Opus 4.6 (1M context) --- server/storage/backend/backend_test.go | 244 +++++++++++++++++++++++++ 1 file changed, 244 insertions(+) diff --git a/server/storage/backend/backend_test.go b/server/storage/backend/backend_test.go index 4adeb24f4244..71816b5668d5 100644 --- a/server/storage/backend/backend_test.go +++ b/server/storage/backend/backend_test.go @@ -427,6 +427,186 @@ func TestBackendDefragConcurrentWrites(t *testing.T) { assert.Greater(t, found, 0, "at least some concurrent writes should be present after defrag") } +func TestBackendDefragConcurrentReads(t *testing.T) { + b, _ := betesting.NewDefaultTmpBackend(t) + defer betesting.Close(t, b) + + tx := b.BatchTx() + tx.Lock() + tx.UnsafeCreateBucket(schema.Test) + for i := 0; i < 1000; i++ { + tx.UnsafePut(schema.Test, []byte(fmt.Sprintf("key_%04d", i)), []byte("value")) + } + tx.Unlock() + b.ForceCommit() + + var wg sync.WaitGroup + stop := make(chan struct{}) + readErrors := make(chan error, 
100) + + // concurrent reader + wg.Add(1) + go func() { + defer wg.Done() + for { + select { + case <-stop: + return + default: + } + rtx := b.ConcurrentReadTx() + rtx.RLock() + keys, _ := rtx.UnsafeRange(schema.Test, []byte("key_0500"), nil, 0) + if len(keys) == 0 { + readErrors <- fmt.Errorf("key_0500 not found during defrag") + } + rtx.RUnlock() + } + }() + + err := b.Defrag() + close(stop) + wg.Wait() + close(readErrors) + + require.NoError(t, err) + for readErr := range readErrors { + t.Error(readErr) + } +} + +func TestBackendDefragWriteAvailability(t *testing.T) { + b, _ := betesting.NewDefaultTmpBackend(t) + defer betesting.Close(t, b) + + tx := b.BatchTx() + tx.Lock() + tx.UnsafeCreateBucket(schema.Test) + for i := 0; i < backend.DefragLimitForTest()+100; i++ { + tx.UnsafePut(schema.Test, []byte(fmt.Sprintf("key_%06d", i)), []byte("value")) + } + tx.Unlock() + b.ForceCommit() + + // delete half the keys to make defrag meaningful + tx.Lock() + for i := 0; i < backend.DefragLimitForTest()/2; i++ { + tx.UnsafeDelete(schema.Test, []byte(fmt.Sprintf("key_%06d", i))) + } + tx.Unlock() + b.ForceCommit() + + // track write latencies during defrag + var wg sync.WaitGroup + stop := make(chan struct{}) + var maxWriteLatency time.Duration + var mu sync.Mutex + + wg.Add(1) + go func() { + defer wg.Done() + for i := 0; ; i++ { + select { + case <-stop: + return + default: + } + start := time.Now() + wtx := b.BatchTx() + wtx.Lock() + wtx.UnsafePut(schema.Test, []byte(fmt.Sprintf("during_%06d", i)), []byte("val")) + wtx.Unlock() + elapsed := time.Since(start) + + mu.Lock() + if elapsed > maxWriteLatency { + maxWriteLatency = elapsed + } + mu.Unlock() + } + }() + + err := b.Defrag() + close(stop) + wg.Wait() + + require.NoError(t, err) + t.Logf("max write latency during defrag: %v", maxWriteLatency) + // The max latency should be well under a second since the copy phase + // doesn't hold the batchTx lock. Writes only block during the brief + // replay + switchover phases. 
+ assert.Less(t, maxWriteLatency, 5*time.Second, + "write latency during defrag should be bounded") +} + +func TestBackendDefragConcurrentReadsAndWrites(t *testing.T) { + b, _ := betesting.NewDefaultTmpBackend(t) + defer betesting.Close(t, b) + + tx := b.BatchTx() + tx.Lock() + tx.UnsafeCreateBucket(schema.Test) + for i := 0; i < 500; i++ { + tx.UnsafePut(schema.Test, []byte(fmt.Sprintf("key_%04d", i)), []byte("original")) + } + tx.Unlock() + b.ForceCommit() + + var wg sync.WaitGroup + stop := make(chan struct{}) + + // concurrent writer + wg.Add(1) + go func() { + defer wg.Done() + for i := 0; ; i++ { + select { + case <-stop: + return + default: + } + wtx := b.BatchTx() + wtx.Lock() + wtx.UnsafePut(schema.Test, []byte(fmt.Sprintf("write_%04d", i)), []byte("new")) + wtx.Unlock() + } + }() + + // concurrent reader using single-key lookups (schema.Test is not a safe-range bucket) + wg.Add(1) + go func() { + defer wg.Done() + for { + select { + case <-stop: + return + default: + } + rtx := b.ConcurrentReadTx() + rtx.RLock() + rtx.UnsafeRange(schema.Test, []byte("key_0250"), nil, 0) + rtx.RUnlock() + } + }() + + err := b.Defrag() + close(stop) + wg.Wait() + + require.NoError(t, err) + + // verify original data survived + b.ForceCommit() + rtx := b.BatchTx() + rtx.Lock() + for i := 0; i < 500; i++ { + keys, vals := rtx.UnsafeRange(schema.Test, []byte(fmt.Sprintf("key_%04d", i)), nil, 0) + require.Lenf(t, keys, 1, "key_%04d should exist", i) + assert.Equal(t, []byte("original"), vals[0]) + } + rtx.Unlock() +} + func TestBackendDefragDeletesDuringDefrag(t *testing.T) { b, _ := betesting.NewDefaultTmpBackend(t) defer betesting.Close(t, b) @@ -476,3 +656,67 @@ func TestBackendDefragDeletesDuringDefrag(t *testing.T) { } rtx.Unlock() } + +func TestBackendDefragLogicalConsistency(t *testing.T) { + b, _ := betesting.NewDefaultTmpBackend(t) + defer betesting.Close(t, b) + + tx := b.BatchTx() + tx.Lock() + tx.UnsafeCreateBucket(schema.Test) + tx.UnsafeCreateBucket(schema.Key) + for i := 0; i < 500; i++ { + tx.UnsafePut(schema.Test, []byte(fmt.Sprintf("test_%04d", i)), []byte(fmt.Sprintf("val_%04d", i))) + } + for i := 0; i < 300; i++ { + tx.UnsafeSeqPut(schema.Key, []byte(fmt.Sprintf("rev_%06d", i)), []byte(fmt.Sprintf("data_%06d", i))) + } + tx.Unlock() + b.ForceCommit() + + // delete some keys to create fragmentation + tx.Lock() + for i := 0; i < 200; i++ { + tx.UnsafeDelete(schema.Test, []byte(fmt.Sprintf("test_%04d", i))) + } + tx.Unlock() + b.ForceCommit() + + // capture logical hash before defrag + hashBefore, err := b.Hash(nil) + require.NoError(t, err) + sizeBefore := b.Size() + + err = b.Defrag() + require.NoError(t, err) + + // hash must be identical — same logical content + hashAfter, err := b.Hash(nil) + require.NoError(t, err) + assert.Equal(t, hashBefore, hashAfter, "logical hash must match after defrag") + + // physical size should have shrunk + sizeAfter := b.Size() + assert.Less(t, sizeAfter, sizeBefore, "defrag should reclaim space") + + // verify all surviving keys individually + b.ForceCommit() + rtx := b.BatchTx() + rtx.Lock() + for i := 200; i < 500; i++ { + keys, vals := rtx.UnsafeRange(schema.Test, []byte(fmt.Sprintf("test_%04d", i)), nil, 0) + require.Lenf(t, keys, 1, "test_%04d should exist", i) + assert.Equal(t, []byte(fmt.Sprintf("val_%04d", i)), vals[0]) + } + for i := 0; i < 300; i++ { + keys, vals := rtx.UnsafeRange(schema.Key, []byte(fmt.Sprintf("rev_%06d", i)), nil, 0) + require.Lenf(t, keys, 1, "rev_%06d should exist", i) + assert.Equal(t, 
[]byte(fmt.Sprintf("data_%06d", i)), vals[0]) + } + // deleted keys must be gone + for i := 0; i < 200; i++ { + keys, _ := rtx.UnsafeRange(schema.Test, []byte(fmt.Sprintf("test_%04d", i)), nil, 0) + assert.Emptyf(t, keys, "test_%04d should be deleted", i) + } + rtx.Unlock() +} From 853fb9aaabf6137df597768ef2b4c42403bf6d3c Mon Sep 17 00:00:00 2001 From: Allen Ray Date: Thu, 14 May 2026 13:20:20 -0400 Subject: [PATCH 3/8] backend: add edge case defrag tests - TestBackendDefragOverwriteDuringCopy: key overwritten during copy gets the new value after replay - TestBackendDefragNewBucketDuringCopy: bucket created during copy is present after defrag with its data - TestBackendDefragMultipleSequential: 3 sequential defrags with mutations between each, hash verified after every round - TestBackendDefragEmptyDatabase: defrag on an empty db is a no-op - TestBackendDefragLargeJournalReplay: enough writes during copy to exercise the batched commit path in replayJournal Co-Authored-By: Claude Opus 4.6 (1M context) --- server/storage/backend/backend_test.go | 253 +++++++++++++++++++++++++ 1 file changed, 253 insertions(+) diff --git a/server/storage/backend/backend_test.go b/server/storage/backend/backend_test.go index 71816b5668d5..2a632688cf23 100644 --- a/server/storage/backend/backend_test.go +++ b/server/storage/backend/backend_test.go @@ -720,3 +720,256 @@ func TestBackendDefragLogicalConsistency(t *testing.T) { } rtx.Unlock() } + +func TestBackendDefragOverwriteDuringCopy(t *testing.T) { + b, _ := betesting.NewDefaultTmpBackend(t) + defer betesting.Close(t, b) + + tx := b.BatchTx() + tx.Lock() + tx.UnsafeCreateBucket(schema.Test) + for i := 0; i < 100; i++ { + tx.UnsafePut(schema.Test, []byte(fmt.Sprintf("key_%04d", i)), []byte("original")) + } + tx.Unlock() + b.ForceCommit() + + // overwrite keys concurrently during defrag + var wg sync.WaitGroup + stop := make(chan struct{}) + + wg.Add(1) + go func() { + defer wg.Done() + for { + select { + case <-stop: + return + default: + } + wtx := b.BatchTx() + wtx.Lock() + for i := 0; i < 50; i++ { + wtx.UnsafePut(schema.Test, []byte(fmt.Sprintf("key_%04d", i)), []byte("updated")) + } + wtx.Unlock() + } + }() + + err := b.Defrag() + close(stop) + wg.Wait() + require.NoError(t, err) + + // all 50 overwritten keys must have the new value + b.ForceCommit() + rtx := b.BatchTx() + rtx.Lock() + for i := 0; i < 50; i++ { + keys, vals := rtx.UnsafeRange(schema.Test, []byte(fmt.Sprintf("key_%04d", i)), nil, 0) + require.Lenf(t, keys, 1, "key_%04d should exist", i) + assert.Equalf(t, []byte("updated"), vals[0], "key_%04d should have updated value", i) + } + // keys 50-99 should still have original value + for i := 50; i < 100; i++ { + keys, vals := rtx.UnsafeRange(schema.Test, []byte(fmt.Sprintf("key_%04d", i)), nil, 0) + require.Lenf(t, keys, 1, "key_%04d should exist", i) + assert.Equalf(t, []byte("original"), vals[0], "key_%04d should have original value", i) + } + rtx.Unlock() +} + +func TestBackendDefragNewBucketDuringCopy(t *testing.T) { + b, _ := betesting.NewDefaultTmpBackend(t) + defer betesting.Close(t, b) + + tx := b.BatchTx() + tx.Lock() + tx.UnsafeCreateBucket(schema.Test) + for i := 0; i < 100; i++ { + tx.UnsafePut(schema.Test, []byte(fmt.Sprintf("key_%04d", i)), []byte("value")) + } + tx.Unlock() + b.ForceCommit() + + // create a new bucket and write to it during defrag + var wg sync.WaitGroup + done := make(chan struct{}) + + wg.Add(1) + go func() { + defer wg.Done() + wtx := b.BatchTx() + wtx.Lock() + wtx.UnsafeCreateBucket(schema.Key) + for i := 0; 
i < 50; i++ { + wtx.UnsafeSeqPut(schema.Key, []byte(fmt.Sprintf("newkey_%04d", i)), []byte("newval")) + } + wtx.Unlock() + close(done) + }() + + err := b.Defrag() + <-done + wg.Wait() + require.NoError(t, err) + + // verify original bucket data + b.ForceCommit() + rtx := b.BatchTx() + rtx.Lock() + for i := 0; i < 100; i++ { + keys, _ := rtx.UnsafeRange(schema.Test, []byte(fmt.Sprintf("key_%04d", i)), nil, 0) + require.Lenf(t, keys, 1, "key_%04d should exist in Test bucket", i) + } + // verify new bucket and its data exist + for i := 0; i < 50; i++ { + keys, vals := rtx.UnsafeRange(schema.Key, []byte(fmt.Sprintf("newkey_%04d", i)), nil, 0) + require.Lenf(t, keys, 1, "newkey_%04d should exist in Key bucket", i) + assert.Equal(t, []byte("newval"), vals[0]) + } + rtx.Unlock() +} + +func TestBackendDefragMultipleSequential(t *testing.T) { + b, _ := betesting.NewDefaultTmpBackend(t) + defer betesting.Close(t, b) + + tx := b.BatchTx() + tx.Lock() + tx.UnsafeCreateBucket(schema.Test) + for i := 0; i < 200; i++ { + tx.UnsafePut(schema.Test, []byte(fmt.Sprintf("key_%04d", i)), []byte("value")) + } + tx.Unlock() + b.ForceCommit() + + for round := 0; round < 3; round++ { + // delete some keys before each defrag + tx.Lock() + for i := round * 20; i < (round+1)*20; i++ { + tx.UnsafeDelete(schema.Test, []byte(fmt.Sprintf("key_%04d", i))) + } + tx.Unlock() + b.ForceCommit() + + hashBefore, err := b.Hash(nil) + require.NoError(t, err) + + err = b.Defrag() + require.NoErrorf(t, err, "defrag round %d failed", round) + + hashAfter, err := b.Hash(nil) + require.NoError(t, err) + assert.Equalf(t, hashBefore, hashAfter, "hash mismatch after defrag round %d", round) + + // add new keys after each defrag + tx.Lock() + tx.UnsafeCreateBucket(schema.Test) + for i := 0; i < 10; i++ { + tx.UnsafePut(schema.Test, []byte(fmt.Sprintf("round%d_%04d", round, i)), []byte("new")) + } + tx.Unlock() + b.ForceCommit() + } + + // verify round keys from all 3 rounds exist + rtx := b.BatchTx() + rtx.Lock() + for round := 0; round < 3; round++ { + for i := 0; i < 10; i++ { + keys, _ := rtx.UnsafeRange(schema.Test, []byte(fmt.Sprintf("round%d_%04d", round, i)), nil, 0) + require.Lenf(t, keys, 1, "round%d_%04d should exist", round, i) + } + } + rtx.Unlock() +} + +func TestBackendDefragEmptyDatabase(t *testing.T) { + b, _ := betesting.NewDefaultTmpBackend(t) + defer betesting.Close(t, b) + + err := b.Defrag() + require.NoError(t, err) + + // verify we can still use the database after defragging an empty one + tx := b.BatchTx() + tx.Lock() + tx.UnsafeCreateBucket(schema.Test) + tx.UnsafePut(schema.Test, []byte("after_defrag"), []byte("works")) + tx.Unlock() + b.ForceCommit() + + rtx := b.BatchTx() + rtx.Lock() + keys, vals := rtx.UnsafeRange(schema.Test, []byte("after_defrag"), nil, 0) + require.Len(t, keys, 1) + assert.Equal(t, []byte("works"), vals[0]) + rtx.Unlock() +} + +func TestBackendDefragLargeJournalReplay(t *testing.T) { + b, _ := betesting.NewDefaultTmpBackend(t) + defer betesting.Close(t, b) + + tx := b.BatchTx() + tx.Lock() + tx.UnsafeCreateBucket(schema.Test) + for i := 0; i < 100; i++ { + tx.UnsafePut(schema.Test, []byte(fmt.Sprintf("pre_%04d", i)), []byte("value")) + } + tx.Unlock() + b.ForceCommit() + + // write enough keys during defrag to exceed defragLimit (10000) in the replay, + // exercising the batched commit path in replayJournal + var wg sync.WaitGroup + stop := make(chan struct{}) + totalWritten := make(chan int, 1) + + wg.Add(1) + go func() { + defer wg.Done() + var count int + for i := 0; ; i++ { + select { 
+ case <-stop: + totalWritten <- count + return + default: + } + wtx := b.BatchTx() + wtx.Lock() + wtx.UnsafePut(schema.Test, []byte(fmt.Sprintf("journal_%06d", i)), []byte("jval")) + wtx.Unlock() + count++ + } + }() + + err := b.Defrag() + close(stop) + wg.Wait() + written := <-totalWritten + + require.NoError(t, err) + t.Logf("journal ops written during defrag: %d", written) + + // verify pre-existing keys survived + b.ForceCommit() + rtx := b.BatchTx() + rtx.Lock() + for i := 0; i < 100; i++ { + keys, _ := rtx.UnsafeRange(schema.Test, []byte(fmt.Sprintf("pre_%04d", i)), nil, 0) + require.Lenf(t, keys, 1, "pre_%04d should exist", i) + } + // verify at least some journal keys exist + var found int + for i := 0; i < written; i++ { + keys, _ := rtx.UnsafeRange(schema.Test, []byte(fmt.Sprintf("journal_%06d", i)), nil, 0) + if len(keys) > 0 { + found++ + } + } + rtx.Unlock() + assert.Greater(t, found, 0, "journal writes should be present after defrag") +} From 88a9d877701b9f1ea4f9eb9b7d9f11b7c2149037 Mon Sep 17 00:00:00 2001 From: Allen Ray Date: Thu, 14 May 2026 14:17:44 -0400 Subject: [PATCH 4/8] backend: restore panic recovery during defrag switchover If a panic occurs after batchTx.tx is set to nil during the Phase 3 switchover, inflight requests could use the invalid tx and produce secondary panics that shadow the real cause. Restore the deferred recover that catches the original panic and calls Fatal. Co-Authored-By: Claude Opus 4.6 (1M context) --- server/storage/backend/backend.go | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/server/storage/backend/backend.go b/server/storage/backend/backend.go index 36f4d9f1fc24..ba505b9d9464 100644 --- a/server/storage/backend/backend.go +++ b/server/storage/backend/backend.go @@ -581,6 +581,16 @@ func (b *backend) defrag() error { b.mu.Lock() b.readTx.Lock() + // NOTE: We should exit as soon as possible because that tx + // might be closed. The inflight request might use invalid + // tx and then panic as well. The real panic reason might be + // shadowed by new panic. So, we should fatal here with lock. + defer func() { + if rerr := recover(); rerr != nil { + b.lg.Fatal("unexpected panic during defrag", zap.Any("panic", rerr)) + } + }() + b.batchTx.unsafeCommit(true) b.batchTx.tx = nil From d794ada02d4a2a47822a54c2690bf74c2351e857 Mon Sep 17 00:00:00 2001 From: Allen Ray Date: Fri, 15 May 2026 09:47:33 -0400 Subject: [PATCH 5/8] backend: decouple write buffering from bbolt during defrag MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit During defrag, writes to the old database are wasted I/O since it is about to be replaced. When the defrag journal is active, mutation methods on batchTxBuffered now write to the journal and in-memory buffer only, skipping the bbolt write. Commits are also skipped during defrag to prevent readTx.buf from being cleared — since puts are not in bbolt, clearing the read buffer would make them invisible. This addresses feedback to decouple transaction buffering from bbolt: writes are buffered on the etcd side and handed over as a batch to the new database via journal replay, without writing to a database that is about to be replaced. 
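For illustration, the gated write path now looks roughly like this (a
simplified sketch of the UnsafePut change below; UnsafeSeqPut, UnsafeDelete,
and the bucket helpers follow the same pattern):

    func (t *batchTxBuffered) UnsafePut(bucket Bucket, key []byte, value []byte) {
    	if j := t.defragJournal; j != nil {
    		// defrag in progress: record the write in the journal only;
    		// it reaches the new database during the Phase 2 replay.
    		t.pending++
    		j.appendPut(bucket.Name(), key, value, false)
    	} else {
    		// normal path: write through to bbolt as before.
    		t.batchTx.UnsafePut(bucket, key, value)
    	}
    	// The in-memory read buffer is always updated so concurrent
    	// readers continue to see the write.
    	t.buf.put(bucket, key, value)
    }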
Adds three new tests: - TestBackendDefragReadsDuringDefragSeeBufferedPuts: verifies ConcurrentReadTx sees puts made during defrag via the buffer - TestBackendDefragExceedBatchLimit: writes exceed batchLimit during defrag, exercising the counter-reset path in Unlock - TestBackendDefragCommitDuringDefragPreservesData: ForceCommit during defrag does not clear buffered data Co-Authored-By: Claude Opus 4.6 (1M context) --- server/storage/backend/backend_test.go | 198 +++++++++++++++++++++++++ server/storage/backend/batch_tx.go | 42 ++++-- 2 files changed, 229 insertions(+), 11 deletions(-) diff --git a/server/storage/backend/backend_test.go b/server/storage/backend/backend_test.go index 2a632688cf23..f4a1ede913a6 100644 --- a/server/storage/backend/backend_test.go +++ b/server/storage/backend/backend_test.go @@ -973,3 +973,201 @@ func TestBackendDefragLargeJournalReplay(t *testing.T) { rtx.Unlock() assert.Greater(t, found, 0, "journal writes should be present after defrag") } + +func TestBackendDefragReadsDuringDefragSeeBufferedPuts(t *testing.T) { + b, _ := betesting.NewDefaultTmpBackend(t) + defer betesting.Close(t, b) + + tx := b.BatchTx() + tx.Lock() + tx.UnsafeCreateBucket(schema.Test) + tx.UnsafePut(schema.Test, []byte("pre_key"), []byte("pre_val")) + tx.Unlock() + b.ForceCommit() + + // During defrag, writes go to journal+buffer only (not bbolt). + // Readers via ConcurrentReadTx should still see these puts + // because readTx.buf is preserved (commits are skipped). + wroteKey := make(chan struct{}) + readResult := make(chan bool, 1) + stop := make(chan struct{}) + var wg sync.WaitGroup + + // writer: put a key during defrag and signal + wg.Add(1) + go func() { + defer wg.Done() + // wait a moment for defrag to start + time.Sleep(10 * time.Millisecond) + wtx := b.BatchTx() + wtx.Lock() + wtx.UnsafePut(schema.Test, []byte("during_defrag"), []byte("visible")) + wtx.Unlock() + close(wroteKey) + <-stop + }() + + // reader: after the write, check if the key is visible via ConcurrentReadTx + wg.Add(1) + go func() { + defer wg.Done() + <-wroteKey + time.Sleep(5 * time.Millisecond) + rtx := b.ConcurrentReadTx() + rtx.RLock() + keys, _ := rtx.UnsafeRange(schema.Test, []byte("during_defrag"), nil, 0) + rtx.RUnlock() + readResult <- len(keys) > 0 + <-stop + }() + + err := b.Defrag() + close(stop) + wg.Wait() + require.NoError(t, err) + + visible := <-readResult + assert.True(t, visible, "puts during defrag should be visible to ConcurrentReadTx via buffer") + + // verify key is also present after defrag + b.ForceCommit() + rtx := b.BatchTx() + rtx.Lock() + keys, vals := rtx.UnsafeRange(schema.Test, []byte("during_defrag"), nil, 0) + rtx.Unlock() + require.Len(t, keys, 1) + assert.Equal(t, []byte("visible"), vals[0]) +} + +func TestBackendDefragExceedBatchLimit(t *testing.T) { + // Use a small batch limit so we can exceed it without writing too many keys. + b, _ := betesting.NewTmpBackend(t, time.Hour, 100) + defer betesting.Close(t, b) + + tx := b.BatchTx() + tx.Lock() + tx.UnsafeCreateBucket(schema.Test) + tx.UnsafePut(schema.Test, []byte("seed"), []byte("value")) + tx.Unlock() + b.ForceCommit() + + // Write more than batchLimit keys during defrag to exercise the + // Unlock path where pending >= batchLimit but commits are skipped. 
+ var wg sync.WaitGroup + stop := make(chan struct{}) + totalWritten := make(chan int, 1) + + wg.Add(1) + go func() { + defer wg.Done() + var count int + for i := 0; ; i++ { + select { + case <-stop: + totalWritten <- count + return + default: + } + wtx := b.BatchTx() + wtx.Lock() + wtx.UnsafePut(schema.Test, []byte(fmt.Sprintf("batch_%06d", i)), []byte("v")) + wtx.Unlock() + count++ + } + }() + + err := b.Defrag() + close(stop) + wg.Wait() + written := <-totalWritten + + require.NoError(t, err) + t.Logf("wrote %d keys during defrag (batch limit = 100)", written) + require.Greater(t, written, 100, "should have exceeded the batch limit during defrag") + + // verify all keys are present after defrag + b.ForceCommit() + rtx := b.BatchTx() + rtx.Lock() + var found int + for i := 0; i < written; i++ { + keys, _ := rtx.UnsafeRange(schema.Test, []byte(fmt.Sprintf("batch_%06d", i)), nil, 0) + if len(keys) > 0 { + found++ + } + } + rtx.Unlock() + assert.Equal(t, written, found, "all keys written during defrag should be present") +} + +func TestBackendDefragCommitDuringDefragPreservesData(t *testing.T) { + b, _ := betesting.NewDefaultTmpBackend(t) + defer betesting.Close(t, b) + + tx := b.BatchTx() + tx.Lock() + tx.UnsafeCreateBucket(schema.Test) + tx.UnsafePut(schema.Test, []byte("seed"), []byte("value")) + tx.Unlock() + b.ForceCommit() + + // Simulate what backend.run() does: call Commit() periodically. + // During defrag, Commit() should be skipped so readTx.buf isn't cleared. + wroteKeys := make(chan struct{}) + commitDone := make(chan struct{}) + stop := make(chan struct{}) + var wg sync.WaitGroup + + // writer: put keys during defrag + wg.Add(1) + go func() { + defer wg.Done() + time.Sleep(10 * time.Millisecond) + for i := 0; i < 50; i++ { + wtx := b.BatchTx() + wtx.Lock() + wtx.UnsafePut(schema.Test, []byte(fmt.Sprintf("commit_test_%04d", i)), []byte("val")) + wtx.Unlock() + } + close(wroteKeys) + <-stop + }() + + // committer: call ForceCommit after writes, simulating periodic timer + wg.Add(1) + go func() { + defer wg.Done() + <-wroteKeys + b.ForceCommit() + close(commitDone) + <-stop + }() + + // reader: after commit, verify data is still visible + readResult := make(chan int, 1) + wg.Add(1) + go func() { + defer wg.Done() + <-commitDone + rtx := b.ConcurrentReadTx() + rtx.RLock() + var found int + for i := 0; i < 50; i++ { + keys, _ := rtx.UnsafeRange(schema.Test, []byte(fmt.Sprintf("commit_test_%04d", i)), nil, 0) + if len(keys) > 0 { + found++ + } + } + rtx.RUnlock() + readResult <- found + <-stop + }() + + err := b.Defrag() + close(stop) + wg.Wait() + require.NoError(t, err) + + found := <-readResult + assert.Equal(t, 50, found, "all puts should remain visible after Commit() during defrag") +} diff --git a/server/storage/backend/batch_tx.go b/server/storage/backend/batch_tx.go index 50c80de12708..03a002fa9ce2 100644 --- a/server/storage/backend/batch_tx.go +++ b/server/storage/backend/batch_tx.go @@ -333,7 +333,15 @@ func (t *batchTxBuffered) Unlock() { // // Please also refer to // https://github.com/etcd-io/etcd/pull/17119#issuecomment-1857547158 - if t.pending >= t.backend.batchLimit || t.pendingDeleteOperations > 0 { + if t.defragJournal != nil { + // During defrag, writes are buffered in the journal and not + // written to bbolt. Skip the commit to prevent readTx.buf + // from being cleared — puts would become invisible since + // they are not in bbolt either. Reset counters so batchTx.Unlock + // does not trigger a wasteful empty commit. 
+ t.pending = 0 + t.pendingDeleteOperations = 0 + } else if t.pending >= t.backend.batchLimit || t.pendingDeleteOperations > 0 { t.commit(false) } } @@ -342,7 +350,9 @@ func (t *batchTxBuffered) Unlock() { func (t *batchTxBuffered) Commit() { t.lock() - t.commit(false) + if t.defragJournal == nil { + t.commit(false) + } t.Unlock() } @@ -387,40 +397,50 @@ func (t *batchTxBuffered) unsafeCommit(stop bool) { } func (t *batchTxBuffered) UnsafeCreateBucket(bucket Bucket) { - t.batchTx.UnsafeCreateBucket(bucket) if j := t.defragJournal; j != nil { + t.pending++ j.appendCreateBucket(bucket.Name()) + } else { + t.batchTx.UnsafeCreateBucket(bucket) } } func (t *batchTxBuffered) UnsafePut(bucket Bucket, key []byte, value []byte) { - t.batchTx.UnsafePut(bucket, key, value) - t.buf.put(bucket, key, value) if j := t.defragJournal; j != nil { + t.pending++ j.appendPut(bucket.Name(), key, value, false) + } else { + t.batchTx.UnsafePut(bucket, key, value) } + t.buf.put(bucket, key, value) } func (t *batchTxBuffered) UnsafeSeqPut(bucket Bucket, key []byte, value []byte) { - t.batchTx.UnsafeSeqPut(bucket, key, value) - t.buf.putSeq(bucket, key, value) if j := t.defragJournal; j != nil { + t.pending++ j.appendPut(bucket.Name(), key, value, true) + } else { + t.batchTx.UnsafeSeqPut(bucket, key, value) } + t.buf.putSeq(bucket, key, value) } func (t *batchTxBuffered) UnsafeDelete(bucketType Bucket, key []byte) { - t.batchTx.UnsafeDelete(bucketType, key) - t.pendingDeleteOperations++ if j := t.defragJournal; j != nil { + t.pending++ j.appendDelete(bucketType.Name(), key) + } else { + t.batchTx.UnsafeDelete(bucketType, key) } + t.pendingDeleteOperations++ } func (t *batchTxBuffered) UnsafeDeleteBucket(bucket Bucket) { - t.batchTx.UnsafeDeleteBucket(bucket) - t.pendingDeleteOperations++ if j := t.defragJournal; j != nil { + t.pending++ j.appendDeleteBucket(bucket.Name()) + } else { + t.batchTx.UnsafeDeleteBucket(bucket) } + t.pendingDeleteOperations++ } From 0fedc749079c03287bef493da4d3a35cbd50cd2e Mon Sep 17 00:00:00 2001 From: Allen Ray Date: Fri, 15 May 2026 10:05:10 -0400 Subject: [PATCH 6/8] backend: add tests for ConcurrentReadTx safety during defrag switchover Tests that readers holding ConcurrentReadTx snapshots from the old database are not disrupted when Phase 3 closes and replaces the DB: - TestBackendDefragConcurrentReadTxSurvivesSwitchover: a single ConcurrentReadTx acquired before defrag is held across the entire switchover, verifying that Phase 3 blocks on db.Close() until the reader releases, and that reads on the old snapshot work throughout - TestBackendDefragMultipleConcurrentReadTxDuringSwitchover: five concurrent readers continuously create and release ConcurrentReadTx during defrag, exercising the txWg synchronization across the Phase 3 boundary Co-Authored-By: Claude Opus 4.6 (1M context) --- server/storage/backend/backend_test.go | 124 +++++++++++++++++++++++++ 1 file changed, 124 insertions(+) diff --git a/server/storage/backend/backend_test.go b/server/storage/backend/backend_test.go index f4a1ede913a6..0023715863c4 100644 --- a/server/storage/backend/backend_test.go +++ b/server/storage/backend/backend_test.go @@ -1171,3 +1171,127 @@ func TestBackendDefragCommitDuringDefragPreservesData(t *testing.T) { found := <-readResult assert.Equal(t, 50, found, "all puts should remain visible after Commit() during defrag") } + +func TestBackendDefragConcurrentReadTxSurvivesSwitchover(t *testing.T) { + b, _ := betesting.NewDefaultTmpBackend(t) + defer betesting.Close(t, b) + + tx := b.BatchTx() + 
tx.Lock() + tx.UnsafeCreateBucket(schema.Test) + for i := 0; i < 100; i++ { + tx.UnsafePut(schema.Test, []byte(fmt.Sprintf("key_%04d", i)), []byte("value")) + } + tx.Unlock() + b.ForceCommit() + + // Acquire a ConcurrentReadTx BEFORE defrag starts. This holds a + // reference to the old DB's bbolt read tx via txWg. Phase 3 must + // wait for this reader to finish before it can close the old DB. + longReader := b.ConcurrentReadTx() + longReader.RLock() + + // Verify the long reader can read data + keys, vals := longReader.UnsafeRange(schema.Test, []byte("key_0050"), nil, 0) + require.Len(t, keys, 1) + assert.Equal(t, []byte("value"), vals[0]) + + defragDone := make(chan error, 1) + go func() { + defragDone <- b.Defrag() + }() + + // Hold the read tx open for a bit — Phase 3 should block on db.Close() + // waiting for this reader to release. + time.Sleep(50 * time.Millisecond) + + // The reader should still be able to read from the old snapshot + keys, vals = longReader.UnsafeRange(schema.Test, []byte("key_0000"), nil, 0) + require.Len(t, keys, 1) + assert.Equal(t, []byte("value"), vals[0]) + + // Release the long reader — this unblocks Phase 3 + longReader.RUnlock() + + err := <-defragDone + require.NoError(t, err) + + // After defrag, verify new reads work on the new DB + b.ForceCommit() + rtx := b.ConcurrentReadTx() + rtx.RLock() + for i := 0; i < 100; i++ { + keys, _ := rtx.UnsafeRange(schema.Test, []byte(fmt.Sprintf("key_%04d", i)), nil, 0) + require.Lenf(t, keys, 1, "key_%04d should exist after defrag", i) + } + rtx.RUnlock() +} + +func TestBackendDefragMultipleConcurrentReadTxDuringSwitchover(t *testing.T) { + b, _ := betesting.NewDefaultTmpBackend(t) + defer betesting.Close(t, b) + + tx := b.BatchTx() + tx.Lock() + tx.UnsafeCreateBucket(schema.Test) + for i := 0; i < 200; i++ { + tx.UnsafePut(schema.Test, []byte(fmt.Sprintf("key_%04d", i)), []byte("value")) + } + tx.Unlock() + b.ForceCommit() + + // Simulate many concurrent readers that continuously create and + // release ConcurrentReadTx during defrag, including across the + // Phase 3 switchover. Each reader creates short-lived snapshots + // that must not panic or return errors when the DB is swapped. 
+ var wg sync.WaitGroup + stop := make(chan struct{}) + readErrors := make(chan error, 1000) + + for r := range 5 { + wg.Add(1) + go func(id int) { + defer wg.Done() + for { + select { + case <-stop: + return + default: + } + rtx := b.ConcurrentReadTx() + rtx.RLock() + key := []byte(fmt.Sprintf("key_%04d", id*20)) + keys, vals := rtx.UnsafeRange(schema.Test, key, nil, 0) + if len(keys) != 1 { + readErrors <- fmt.Errorf("reader %d: expected 1 key, got %d", id, len(keys)) + } else if string(vals[0]) != "value" { + readErrors <- fmt.Errorf("reader %d: expected 'value', got %q", id, vals[0]) + } + rtx.RUnlock() + } + }(r) + } + + err := b.Defrag() + close(stop) + wg.Wait() + close(readErrors) + + require.NoError(t, err) + + var errs []error + for e := range readErrors { + errs = append(errs, e) + } + assert.Empty(t, errs, "no read errors should occur during defrag switchover") + + // Verify data integrity after defrag + b.ForceCommit() + rtx := b.BatchTx() + rtx.Lock() + for i := 0; i < 200; i++ { + keys, _ := rtx.UnsafeRange(schema.Test, []byte(fmt.Sprintf("key_%04d", i)), nil, 0) + require.Lenf(t, keys, 1, "key_%04d should exist after defrag", i) + } + rtx.Unlock() +} From b303788bb3d5ef44a70a2c9b6e7a423b688b8913 Mon Sep 17 00:00:00 2001 From: Allen Ray Date: Fri, 15 May 2026 10:20:00 -0400 Subject: [PATCH 7/8] backend: make defrag tests more deterministic Address CodeRabbit review feedback: - Add atomic counter to TestBackendDefragConcurrentWrites to assert writes actually occurred during defrag - Increase pre-populated data in TestBackendDefragLargeJournalReplay and assert written ops exceed defragLimit to ensure batched replay path is exercised - Add non-blocking select in TestBackendDefragConcurrentReadTxSurvives Switchover to prove defrag is blocked by the outstanding reader Co-Authored-By: Claude Opus 4.6 (1M context) --- server/storage/backend/backend_test.go | 31 ++++++++++++++++++++------ 1 file changed, 24 insertions(+), 7 deletions(-) diff --git a/server/storage/backend/backend_test.go b/server/storage/backend/backend_test.go index 0023715863c4..91046b2835f0 100644 --- a/server/storage/backend/backend_test.go +++ b/server/storage/backend/backend_test.go @@ -19,6 +19,7 @@ import ( "os" "reflect" "sync" + "sync/atomic" "testing" "time" @@ -375,6 +376,7 @@ func TestBackendDefragConcurrentWrites(t *testing.T) { // write concurrently during defrag var wg sync.WaitGroup stop := make(chan struct{}) + var opsDone atomic.Int32 wg.Add(1) go func() { @@ -389,6 +391,7 @@ func TestBackendDefragConcurrentWrites(t *testing.T) { tx.Lock() tx.UnsafePut(schema.Test, []byte(fmt.Sprintf("concurrent_%04d", i)), []byte("new")) tx.Unlock() + opsDone.Add(1) } }() @@ -397,6 +400,7 @@ func TestBackendDefragConcurrentWrites(t *testing.T) { wg.Wait() require.NoError(t, err) + require.Greater(t, opsDone.Load(), int32(0), "at least one write must complete during defrag") // verify pre-existing keys that weren't deleted still exist rtx := b.BatchTx() @@ -915,8 +919,11 @@ func TestBackendDefragLargeJournalReplay(t *testing.T) { tx := b.BatchTx() tx.Lock() tx.UnsafeCreateBucket(schema.Test) - for i := 0; i < 100; i++ { - tx.UnsafePut(schema.Test, []byte(fmt.Sprintf("pre_%04d", i)), []byte("value")) + // pre-populate enough data to slow down Phase 1 copy, giving the + // concurrent writer time to exceed defragLimit + largeVal := make([]byte, 1024) + for i := 0; i < 5000; i++ { + tx.UnsafePut(schema.Test, []byte(fmt.Sprintf("pre_%05d", i)), largeVal) } tx.Unlock() b.ForceCommit() @@ -940,9 +947,11 @@ func 
TestBackendDefragLargeJournalReplay(t *testing.T) { } wtx := b.BatchTx() wtx.Lock() - wtx.UnsafePut(schema.Test, []byte(fmt.Sprintf("journal_%06d", i)), []byte("jval")) + for j := 0; j < 10; j++ { + wtx.UnsafePut(schema.Test, []byte(fmt.Sprintf("journal_%06d", count)), []byte("jval")) + count++ + } wtx.Unlock() - count++ } }() @@ -952,15 +961,17 @@ func TestBackendDefragLargeJournalReplay(t *testing.T) { written := <-totalWritten require.NoError(t, err) + require.Greater(t, written, backend.DefragLimitForTest(), + "test must exceed DefragLimitForTest() to cover batched replay") t.Logf("journal ops written during defrag: %d", written) // verify pre-existing keys survived b.ForceCommit() rtx := b.BatchTx() rtx.Lock() - for i := 0; i < 100; i++ { - keys, _ := rtx.UnsafeRange(schema.Test, []byte(fmt.Sprintf("pre_%04d", i)), nil, 0) - require.Lenf(t, keys, 1, "pre_%04d should exist", i) + for i := 0; i < 5000; i++ { + keys, _ := rtx.UnsafeRange(schema.Test, []byte(fmt.Sprintf("pre_%05d", i)), nil, 0) + require.Lenf(t, keys, 1, "pre_%05d should exist", i) } // verify at least some journal keys exist var found int @@ -1205,6 +1216,12 @@ func TestBackendDefragConcurrentReadTxSurvivesSwitchover(t *testing.T) { // waiting for this reader to release. time.Sleep(50 * time.Millisecond) + select { + case err := <-defragDone: + t.Fatalf("Defrag finished before longReader.RUnlock(): %v", err) + default: + } + // The reader should still be able to read from the old snapshot keys, vals = longReader.UnsafeRange(schema.Test, []byte("key_0000"), nil, 0) require.Len(t, keys, 1) From b9a070811875d2603c7dca70b1cf9d96a6768bc4 Mon Sep 17 00:00:00 2001 From: Allen Ray Date: Fri, 15 May 2026 11:23:21 -0400 Subject: [PATCH 8/8] backend: fix flaky defrag tests for CI determinism MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit TestBackendDefragConcurrentWrites: gate writer on defragStarted channel so it only runs during defrag, increase pre-populated data to 5000x1KB keys to ensure Phase 1 takes long enough for the writer to execute. TestBackendDefragExceedBatchLimit: increase pre-populated data from 1 seed key to 5000x1KB keys and batch 10 puts per lock cycle. On CI, the single seed key made Phase 1 trivially fast — only 56 keys were written, failing the >100 assertion. 
Co-Authored-By: Claude Opus 4.6 (1M context) --- server/storage/backend/backend_test.go | 45 +++++++++++++++++--------- 1 file changed, 29 insertions(+), 16 deletions(-) diff --git a/server/storage/backend/backend_test.go b/server/storage/backend/backend_test.go index 91046b2835f0..96b166f32c47 100644 --- a/server/storage/backend/backend_test.go +++ b/server/storage/backend/backend_test.go @@ -356,31 +356,34 @@ func TestBackendDefragConcurrentWrites(t *testing.T) { b, _ := betesting.NewDefaultTmpBackend(t) defer betesting.Close(t, b) + largeVal := make([]byte, 1024) tx := b.BatchTx() tx.Lock() tx.UnsafeCreateBucket(schema.Test) - for i := 0; i < 1000; i++ { - tx.UnsafePut(schema.Test, []byte(fmt.Sprintf("pre_%04d", i)), []byte("original")) + for i := 0; i < 5000; i++ { + tx.UnsafePut(schema.Test, []byte(fmt.Sprintf("pre_%05d", i)), largeVal) } tx.Unlock() b.ForceCommit() // delete some keys to create reclaimable space tx.Lock() - for i := 0; i < 500; i++ { - tx.UnsafeDelete(schema.Test, []byte(fmt.Sprintf("pre_%04d", i))) + for i := 0; i < 2500; i++ { + tx.UnsafeDelete(schema.Test, []byte(fmt.Sprintf("pre_%05d", i))) } tx.Unlock() b.ForceCommit() - // write concurrently during defrag + // Writer waits until defrag is about to start, then writes concurrently. var wg sync.WaitGroup stop := make(chan struct{}) + defragStarted := make(chan struct{}) var opsDone atomic.Int32 wg.Add(1) go func() { defer wg.Done() + <-defragStarted for i := 0; ; i++ { select { case <-stop: @@ -395,7 +398,12 @@ func TestBackendDefragConcurrentWrites(t *testing.T) { } }() - err := b.Defrag() + defragDone := make(chan error, 1) + close(defragStarted) + go func() { + defragDone <- b.Defrag() + }() + err := <-defragDone close(stop) wg.Wait() @@ -405,15 +413,15 @@ func TestBackendDefragConcurrentWrites(t *testing.T) { // verify pre-existing keys that weren't deleted still exist rtx := b.BatchTx() rtx.Lock() - for i := 500; i < 1000; i++ { - keys, vals := rtx.UnsafeRange(schema.Test, []byte(fmt.Sprintf("pre_%04d", i)), nil, 0) - require.Lenf(t, keys, 1, "key pre_%04d should exist", i) - assert.Equal(t, []byte("original"), vals[0]) + for i := 2500; i < 5000; i++ { + keys, vals := rtx.UnsafeRange(schema.Test, []byte(fmt.Sprintf("pre_%05d", i)), nil, 0) + require.Lenf(t, keys, 1, "key pre_%05d should exist", i) + assert.Equal(t, largeVal, vals[0]) } // verify deleted keys are gone - for i := 0; i < 500; i++ { - keys, _ := rtx.UnsafeRange(schema.Test, []byte(fmt.Sprintf("pre_%04d", i)), nil, 0) - assert.Emptyf(t, keys, "key pre_%04d should be deleted", i) + for i := 0; i < 2500; i++ { + keys, _ := rtx.UnsafeRange(schema.Test, []byte(fmt.Sprintf("pre_%05d", i)), nil, 0) + assert.Emptyf(t, keys, "key pre_%05d should be deleted", i) } rtx.Unlock() @@ -1055,10 +1063,13 @@ func TestBackendDefragExceedBatchLimit(t *testing.T) { b, _ := betesting.NewTmpBackend(t, time.Hour, 100) defer betesting.Close(t, b) + largeVal := make([]byte, 1024) tx := b.BatchTx() tx.Lock() tx.UnsafeCreateBucket(schema.Test) - tx.UnsafePut(schema.Test, []byte("seed"), []byte("value")) + for i := 0; i < 5000; i++ { + tx.UnsafePut(schema.Test, []byte(fmt.Sprintf("seed_%05d", i)), largeVal) + } tx.Unlock() b.ForceCommit() @@ -1081,9 +1092,11 @@ func TestBackendDefragExceedBatchLimit(t *testing.T) { } wtx := b.BatchTx() wtx.Lock() - wtx.UnsafePut(schema.Test, []byte(fmt.Sprintf("batch_%06d", i)), []byte("v")) + for j := 0; j < 10; j++ { + wtx.UnsafePut(schema.Test, []byte(fmt.Sprintf("batch_%06d", count)), []byte("v")) + count++ + } wtx.Unlock() - count++ } 
}()