diff --git a/server/storage/backend/backend.go b/server/storage/backend/backend.go index 275064f083b2..ba505b9d9464 100644 --- a/server/storage/backend/backend.go +++ b/server/storage/backend/backend.go @@ -15,6 +15,7 @@ package backend import ( + "errors" "fmt" "hash/crc32" "io" @@ -28,6 +29,7 @@ import ( "go.uber.org/zap" bolt "go.etcd.io/bbolt" + bolterrors "go.etcd.io/bbolt/errors" "go.etcd.io/etcd/client/pkg/v3/verify" ) @@ -469,20 +471,6 @@ func (b *backend) defrag() error { isDefragActive.Set(1) defer isDefragActive.Set(0) - // TODO: make this non-blocking? - // lock batchTx to ensure nobody is using previous tx, and then - // close previous ongoing tx. - b.batchTx.LockOutsideApply() - defer b.batchTx.Unlock() - - // lock database after lock tx to avoid deadlock. - b.mu.Lock() - defer b.mu.Unlock() - - // block concurrent read requests while resetting tx - b.readTx.Lock() - defer b.readTx.Unlock() - // Create a temporary file to ensure we start with a clean slate. // Snapshotter.cleanupSnapdir cleans up any of these that are found during startup. dir := filepath.Dir(b.db.Path()) @@ -500,27 +488,23 @@ func (b *backend) defrag() error { // return nil, fmt.Errorf(defragOpenFileError) return temp, nil } - // Don't load tmp db into memory regardless of opening options options.Mlock = false tdbp := temp.Name() tmpdb, err := bolt.Open(tdbp, 0o600, &options) if err != nil { temp.Close() if rmErr := os.Remove(temp.Name()); rmErr != nil { - b.lg.Error( - "failed to remove temporary file", + b.lg.Error("failed to remove temporary file", zap.String("path", temp.Name()), zap.Error(rmErr), ) } - return err } dbp := b.db.Path() size1, sizeInUse1 := b.Size(), b.SizeInUse() - b.lg.Info( - "defragmenting", + b.lg.Info("defragmenting", zap.String("path", dbp), zap.Int64("current-db-size-bytes", size1), zap.String("current-db-size", humanize.Bytes(uint64(size1))), @@ -528,35 +512,88 @@ func (b *backend) defrag() error { zap.String("current-db-size-in-use", humanize.Bytes(uint64(sizeInUse1))), ) - defer func() { - // NOTE: We should exit as soon as possible because that tx - // might be closed. The inflight request might use invalid - // tx and then panic as well. The real panic reason might be - // shadowed by new panic. So, we should fatal here with lock. - if rerr := recover(); rerr != nil { - b.lg.Fatal("unexpected panic during defrag", zap.Any("panic", rerr)) - } - }() + // ============================================================ + // PHASE 1: Commit pending writes, take a snapshot, install the + // journal, then unlock so writes can continue during the copy. + // ============================================================ + b.batchTx.LockOutsideApply() - // Commit/stop and then reset current transactions (including the readTx) - b.batchTx.unsafeCommit(true) - b.batchTx.tx = nil + b.batchTx.commit(false) + + b.mu.RLock() + snapTx, snapErr := b.db.Begin(false) + b.mu.RUnlock() + if snapErr != nil { + b.batchTx.Unlock() + tmpdb.Close() + os.Remove(tdbp) + return fmt.Errorf("failed to begin snapshot tx for defrag: %w", snapErr) + } + + journal := newDefragJournal() + b.batchTx.defragJournal = journal + b.batchTx.Unlock() + + b.lg.Info("defrag: copying data (writes unlocked)") // gofail: var defragBeforeCopy struct{} - err = defragdb(b.db, tmpdb, defragLimit) + err = defragFromTx(snapTx, tmpdb, defragLimit) + snapTx.Rollback() + if err != nil { + b.batchTx.LockOutsideApply() + b.batchTx.defragJournal = nil + b.batchTx.Unlock() + journal.close() tmpdb.Close() - if rmErr := os.RemoveAll(tmpdb.Path()); rmErr != nil { - b.lg.Error("failed to remove db.tmp after defragmentation completed", zap.Error(rmErr)) - } + os.RemoveAll(tdbp) + return err + } - // restore the bbolt transactions if defragmentation fails - b.batchTx.tx = b.unsafeBegin(true) - b.readTx.tx = b.unsafeBegin(false) + // ============================================================ + // PHASE 2: Lock writes, drain and replay the journal into the + // temp DB. This should be fast since it's only the delta. + // ============================================================ + b.lg.Info("defrag: replaying journal") + b.batchTx.LockOutsideApply() + b.batchTx.defragJournal = nil + journal.close() + ops := journal.drain() + + if len(ops) > 0 { + b.lg.Info("defrag: replaying journal ops", zap.Int("count", len(ops))) + } + + err = replayJournal(tmpdb, ops, defragLimit) + if err != nil { + b.batchTx.Unlock() + tmpdb.Close() + os.RemoveAll(tdbp) return err } + // ============================================================ + // PHASE 3: Atomic switchover — close old db, rename, reopen. + // ============================================================ + b.lg.Info("defrag: switching database") + + b.mu.Lock() + b.readTx.Lock() + + // NOTE: We should exit as soon as possible because that tx + // might be closed. The inflight request might use invalid + // tx and then panic as well. The real panic reason might be + // shadowed by new panic. So, we should fatal here with lock. + defer func() { + if rerr := recover(); rerr != nil { + b.lg.Fatal("unexpected panic during defrag", zap.Any("panic", rerr)) + } + }() + + b.batchTx.unsafeCommit(true) + b.batchTx.tx = nil + err = b.db.Close() if err != nil { b.lg.Fatal("failed to close database", zap.Error(err)) @@ -585,12 +622,15 @@ func (b *backend) defrag() error { atomic.StoreInt64(&b.size, size) atomic.StoreInt64(&b.sizeInUse, size-(int64(db.Stats().FreePageN)*int64(db.Info().PageSize))) + b.readTx.Unlock() + b.mu.Unlock() + b.batchTx.Unlock() + took := time.Since(now) defragSec.Observe(took.Seconds()) size2, sizeInUse2 := b.Size(), b.SizeInUse() - b.lg.Info( - "finished defragmenting directory", + b.lg.Info("finished defragmenting directory", zap.String("path", dbp), zap.Int64("current-db-size-bytes-diff", size2-size1), zap.Int64("current-db-size-bytes", size2), @@ -603,11 +643,7 @@ func (b *backend) defrag() error { return nil } -func defragdb(odb, tmpdb *bolt.DB, limit int) error { - // gofail: var defragdbFail string - // return fmt.Errorf(defragdbFail) - - // open a tx on tmpdb for writes +func defragFromTx(srcTx *bolt.Tx, tmpdb *bolt.DB, limit int) error { tmptx, err := tmpdb.Begin(true) if err != nil { return err @@ -618,18 +654,10 @@ func defragdb(odb, tmpdb *bolt.DB, limit int) error { } }() - // open a tx on old db for read - tx, err := odb.Begin(false) - if err != nil { - return err - } - defer tx.Rollback() - - c := tx.Cursor() - + c := srcTx.Cursor() count := 0 for next, _ := c.First(); next != nil; next, _ = c.Next() { - b := tx.Bucket(next) + b := srcTx.Bucket(next) if b == nil { return fmt.Errorf("backend: cannot defrag bucket %s", next) } @@ -638,7 +666,7 @@ func defragdb(odb, tmpdb *bolt.DB, limit int) error { if berr != nil { return berr } - tmpb.FillPercent = 0.9 // for bucket2seq write in for each + tmpb.FillPercent = 0.9 if err = b.ForEach(func(k, v []byte) error { count++ @@ -652,8 +680,7 @@ func defragdb(odb, tmpdb *bolt.DB, limit int) error { return err } tmpb = tmptx.Bucket(next) - tmpb.FillPercent = 0.9 // for bucket2seq write in for each - + tmpb.FillPercent = 0.9 count = 0 } return tmpb.Put(k, v) @@ -665,6 +692,69 @@ func defragdb(odb, tmpdb *bolt.DB, limit int) error { return tmptx.Commit() } +func replayJournal(tmpdb *bolt.DB, ops []defragJournalOp, limit int) error { + if len(ops) == 0 { + return nil + } + + tx, err := tmpdb.Begin(true) + if err != nil { + return err + } + defer func() { + if err != nil { + tx.Rollback() + } + }() + + count := 0 + for _, op := range ops { + count++ + if count > limit { + if err = tx.Commit(); err != nil { + return err + } + tx, err = tmpdb.Begin(true) + if err != nil { + return err + } + count = 0 + } + + switch op.opType { + case opCreateBucket: + if _, err = tx.CreateBucketIfNotExists(op.bucketName); err != nil { + return fmt.Errorf("replay: create bucket %s: %w", op.bucketName, err) + } + case opDeleteBucket: + if delErr := tx.DeleteBucket(op.bucketName); delErr != nil && !errors.Is(delErr, bolterrors.ErrBucketNotFound) { + return fmt.Errorf("replay: delete bucket %s: %w", op.bucketName, delErr) + } + case opPut: + b := tx.Bucket(op.bucketName) + if b == nil { + return fmt.Errorf("replay: bucket %s not found for put", op.bucketName) + } + if op.seq { + b.FillPercent = 0.9 + } + if err = b.Put(op.key, op.value); err != nil { + return fmt.Errorf("replay: put in bucket %s: %w", op.bucketName, err) + } + case opDelete: + b := tx.Bucket(op.bucketName) + if b == nil { + continue + } + if err = b.Delete(op.key); err != nil { + return fmt.Errorf("replay: delete from bucket %s: %w", op.bucketName, err) + } + } + } + + return tx.Commit() +} + func (b *backend) begin(write bool) *bolt.Tx { b.mu.RLock() tx := b.unsafeBegin(write) diff --git a/server/storage/backend/backend_test.go b/server/storage/backend/backend_test.go index fc024b88bc1f..96b166f32c47 100644 --- a/server/storage/backend/backend_test.go +++ b/server/storage/backend/backend_test.go @@ -18,6 +18,8 @@ import ( "fmt" "os" "reflect" + "sync" + "sync/atomic" "testing" "time" @@ -349,3 +351,977 @@ func TestBackendWritebackForEach(t *testing.T) { t.Fatalf("expected %q, got %q", seq, partialSeq) } } + +func TestBackendDefragConcurrentWrites(t *testing.T) { + b, _ := betesting.NewDefaultTmpBackend(t) + defer betesting.Close(t, b) + + largeVal := make([]byte, 1024) + tx := b.BatchTx() + tx.Lock() + tx.UnsafeCreateBucket(schema.Test) + for i := 0; i < 5000; i++ { + tx.UnsafePut(schema.Test, []byte(fmt.Sprintf("pre_%05d", i)), largeVal) + } + tx.Unlock() + b.ForceCommit() + + // delete some keys to create reclaimable space + tx.Lock() + for i := 0; i < 2500; i++ { + tx.UnsafeDelete(schema.Test, []byte(fmt.Sprintf("pre_%05d", i))) + } + tx.Unlock() + b.ForceCommit() + + // Writer waits until defrag is about to start, then writes concurrently. + var wg sync.WaitGroup + stop := make(chan struct{}) + defragStarted := make(chan struct{}) + var opsDone atomic.Int32 + + wg.Add(1) + go func() { + defer wg.Done() + <-defragStarted + for i := 0; ; i++ { + select { + case <-stop: + return + default: + } + tx := b.BatchTx() + tx.Lock() + tx.UnsafePut(schema.Test, []byte(fmt.Sprintf("concurrent_%04d", i)), []byte("new")) + tx.Unlock() + opsDone.Add(1) + } + }() + + defragDone := make(chan error, 1) + close(defragStarted) + go func() { + defragDone <- b.Defrag() + }() + err := <-defragDone + close(stop) + wg.Wait() + + require.NoError(t, err) + require.Greater(t, opsDone.Load(), int32(0), "at least one write must complete during defrag") + + // verify pre-existing keys that weren't deleted still exist + rtx := b.BatchTx() + rtx.Lock() + for i := 2500; i < 5000; i++ { + keys, vals := rtx.UnsafeRange(schema.Test, []byte(fmt.Sprintf("pre_%05d", i)), nil, 0) + require.Lenf(t, keys, 1, "key pre_%05d should exist", i) + assert.Equal(t, largeVal, vals[0]) + } + // verify deleted keys are gone + for i := 0; i < 2500; i++ { + keys, _ := rtx.UnsafeRange(schema.Test, []byte(fmt.Sprintf("pre_%05d", i)), nil, 0) + assert.Emptyf(t, keys, "key pre_%05d should be deleted", i) + } + rtx.Unlock() + + // verify at least some concurrent writes are present + b.ForceCommit() + rtx.Lock() + var found int + for i := 0; i < 10000; i++ { + keys, _ := rtx.UnsafeRange(schema.Test, []byte(fmt.Sprintf("concurrent_%04d", i)), nil, 0) + if len(keys) > 0 { + found++ + } + } + rtx.Unlock() + assert.Greater(t, found, 0, "at least some concurrent writes should be present after defrag") +} + +func TestBackendDefragConcurrentReads(t *testing.T) { + b, _ := betesting.NewDefaultTmpBackend(t) + defer betesting.Close(t, b) + + tx := b.BatchTx() + tx.Lock() + tx.UnsafeCreateBucket(schema.Test) + for i := 0; i < 1000; i++ { + tx.UnsafePut(schema.Test, []byte(fmt.Sprintf("key_%04d", i)), []byte("value")) + } + tx.Unlock() + b.ForceCommit() + + var wg sync.WaitGroup + stop := make(chan struct{}) + readErrors := make(chan error, 100) + + // concurrent reader + wg.Add(1) + go func() { + defer wg.Done() + for { + select { + case <-stop: + return + default: + } + rtx := b.ConcurrentReadTx() + rtx.RLock() + keys, _ := rtx.UnsafeRange(schema.Test, []byte("key_0500"), nil, 0) + if len(keys) == 0 { + readErrors <- fmt.Errorf("key_0500 not found during defrag") + } + rtx.RUnlock() + } + }() + + err := b.Defrag() + close(stop) + wg.Wait() + close(readErrors) + + require.NoError(t, err) + for readErr := range readErrors { + t.Error(readErr) + } +} + +func TestBackendDefragWriteAvailability(t *testing.T) { + b, _ := betesting.NewDefaultTmpBackend(t) + defer betesting.Close(t, b) + + tx := b.BatchTx() + tx.Lock() + tx.UnsafeCreateBucket(schema.Test) + for i := 0; i < backend.DefragLimitForTest()+100; i++ { + tx.UnsafePut(schema.Test, []byte(fmt.Sprintf("key_%06d", i)), []byte("value")) + } + tx.Unlock() + b.ForceCommit() + + // delete half the keys to make defrag meaningful + tx.Lock() + for i := 0; i < backend.DefragLimitForTest()/2; i++ { + tx.UnsafeDelete(schema.Test, []byte(fmt.Sprintf("key_%06d", i))) + } + tx.Unlock() + b.ForceCommit() + + // track write latencies during defrag + var wg sync.WaitGroup + stop := make(chan struct{}) + var maxWriteLatency time.Duration + var mu sync.Mutex + + wg.Add(1) + go func() { + defer wg.Done() + for i := 0; ; i++ { + select { + case <-stop: + return + default: + } + start := time.Now() + wtx := b.BatchTx() + wtx.Lock() + wtx.UnsafePut(schema.Test, []byte(fmt.Sprintf("during_%06d", i)), []byte("val")) + wtx.Unlock() + elapsed := time.Since(start) + + mu.Lock() + if elapsed > maxWriteLatency { + maxWriteLatency = elapsed + } + mu.Unlock() + } + }() + + err := b.Defrag() + close(stop) + wg.Wait() + + require.NoError(t, err) + t.Logf("max write latency during defrag: %v", maxWriteLatency) + // The max latency should be well under a second since the copy phase + // doesn't hold the batchTx lock. Writes only block during the brief + // replay + switchover phases. + assert.Less(t, maxWriteLatency, 5*time.Second, + "write latency during defrag should be bounded") +} + +func TestBackendDefragConcurrentReadsAndWrites(t *testing.T) { + b, _ := betesting.NewDefaultTmpBackend(t) + defer betesting.Close(t, b) + + tx := b.BatchTx() + tx.Lock() + tx.UnsafeCreateBucket(schema.Test) + for i := 0; i < 500; i++ { + tx.UnsafePut(schema.Test, []byte(fmt.Sprintf("key_%04d", i)), []byte("original")) + } + tx.Unlock() + b.ForceCommit() + + var wg sync.WaitGroup + stop := make(chan struct{}) + + // concurrent writer + wg.Add(1) + go func() { + defer wg.Done() + for i := 0; ; i++ { + select { + case <-stop: + return + default: + } + wtx := b.BatchTx() + wtx.Lock() + wtx.UnsafePut(schema.Test, []byte(fmt.Sprintf("write_%04d", i)), []byte("new")) + wtx.Unlock() + } + }() + + // concurrent reader using single-key lookups (schema.Test is not a safe-range bucket) + wg.Add(1) + go func() { + defer wg.Done() + for { + select { + case <-stop: + return + default: + } + rtx := b.ConcurrentReadTx() + rtx.RLock() + rtx.UnsafeRange(schema.Test, []byte("key_0250"), nil, 0) + rtx.RUnlock() + } + }() + + err := b.Defrag() + close(stop) + wg.Wait() + + require.NoError(t, err) + + // verify original data survived + b.ForceCommit() + rtx := b.BatchTx() + rtx.Lock() + for i := 0; i < 500; i++ { + keys, vals := rtx.UnsafeRange(schema.Test, []byte(fmt.Sprintf("key_%04d", i)), nil, 0) + require.Lenf(t, keys, 1, "key_%04d should exist", i) + assert.Equal(t, []byte("original"), vals[0]) + } + rtx.Unlock() +} + +func TestBackendDefragDeletesDuringDefrag(t *testing.T) { + b, _ := betesting.NewDefaultTmpBackend(t) + defer betesting.Close(t, b) + + tx := b.BatchTx() + tx.Lock() + tx.UnsafeCreateBucket(schema.Test) + for i := 0; i < 100; i++ { + tx.UnsafePut(schema.Test, []byte(fmt.Sprintf("key_%04d", i)), []byte("value")) + } + tx.Unlock() + b.ForceCommit() + + // delete keys concurrently during defrag + var wg sync.WaitGroup + stop := make(chan struct{}) + + wg.Add(1) + go func() { + defer wg.Done() + for i := 0; i < 50; i++ { + select { + case <-stop: + return + default: + } + tx := b.BatchTx() + tx.Lock() + tx.UnsafeDelete(schema.Test, []byte(fmt.Sprintf("key_%04d", i))) + tx.Unlock() + } + }() + + err := b.Defrag() + close(stop) + wg.Wait() + + require.NoError(t, err) + + // keys 50-99 should still exist + b.ForceCommit() + rtx := b.BatchTx() + rtx.Lock() + for i := 50; i < 100; i++ { + keys, _ := rtx.UnsafeRange(schema.Test, []byte(fmt.Sprintf("key_%04d", i)), nil, 0) + require.Lenf(t, keys, 1, "key_%04d should exist", i) + } + rtx.Unlock() +} + +func TestBackendDefragLogicalConsistency(t *testing.T) { + b, _ := betesting.NewDefaultTmpBackend(t) + defer betesting.Close(t, b) + + tx := b.BatchTx() + tx.Lock() + tx.UnsafeCreateBucket(schema.Test) + tx.UnsafeCreateBucket(schema.Key) + for i := 0; i < 500; i++ { + tx.UnsafePut(schema.Test, []byte(fmt.Sprintf("test_%04d", i)), []byte(fmt.Sprintf("val_%04d", i))) + } + for i := 0; i < 300; i++ { + tx.UnsafeSeqPut(schema.Key, []byte(fmt.Sprintf("rev_%06d", i)), []byte(fmt.Sprintf("data_%06d", i))) + } + tx.Unlock() + b.ForceCommit() + + // delete some keys to create fragmentation + tx.Lock() + for i := 0; i < 200; i++ { + tx.UnsafeDelete(schema.Test, []byte(fmt.Sprintf("test_%04d", i))) + } + tx.Unlock() + b.ForceCommit() + + // capture logical hash before defrag + hashBefore, err := b.Hash(nil) + require.NoError(t, err) + sizeBefore := b.Size() + + err = b.Defrag() + require.NoError(t, err) + + // hash must be identical — same logical content + hashAfter, err := b.Hash(nil) + require.NoError(t, err) + assert.Equal(t, hashBefore, hashAfter, "logical hash must match after defrag") + + // physical size should have shrunk + sizeAfter := b.Size() + assert.Less(t, sizeAfter, sizeBefore, "defrag should reclaim space") + + // verify all surviving keys individually + b.ForceCommit() + rtx := b.BatchTx() + rtx.Lock() + for i := 200; i < 500; i++ { + keys, vals := rtx.UnsafeRange(schema.Test, []byte(fmt.Sprintf("test_%04d", i)), nil, 0) + require.Lenf(t, keys, 1, "test_%04d should exist", i) + assert.Equal(t, []byte(fmt.Sprintf("val_%04d", i)), vals[0]) + } + for i := 0; i < 300; i++ { + keys, vals := rtx.UnsafeRange(schema.Key, []byte(fmt.Sprintf("rev_%06d", i)), nil, 0) + require.Lenf(t, keys, 1, "rev_%06d should exist", i) + assert.Equal(t, []byte(fmt.Sprintf("data_%06d", i)), vals[0]) + } + // deleted keys must be gone + for i := 0; i < 200; i++ { + keys, _ := rtx.UnsafeRange(schema.Test, []byte(fmt.Sprintf("test_%04d", i)), nil, 0) + assert.Emptyf(t, keys, "test_%04d should be deleted", i) + } + rtx.Unlock() +} + +func TestBackendDefragOverwriteDuringCopy(t *testing.T) { + b, _ := betesting.NewDefaultTmpBackend(t) + defer betesting.Close(t, b) + + tx := b.BatchTx() + tx.Lock() + tx.UnsafeCreateBucket(schema.Test) + for i := 0; i < 100; i++ { + tx.UnsafePut(schema.Test, []byte(fmt.Sprintf("key_%04d", i)), []byte("original")) + } + tx.Unlock() + b.ForceCommit() + + // overwrite keys concurrently during defrag + var wg sync.WaitGroup + stop := make(chan struct{}) + + wg.Add(1) + go func() { + defer wg.Done() + for { + select { + case <-stop: + return + default: + } + wtx := b.BatchTx() + wtx.Lock() + for i := 0; i < 50; i++ { + wtx.UnsafePut(schema.Test, []byte(fmt.Sprintf("key_%04d", i)), []byte("updated")) + } + wtx.Unlock() + } + }() + + err := b.Defrag() + close(stop) + wg.Wait() + require.NoError(t, err) + + // all 50 overwritten keys must have the new value + b.ForceCommit() + rtx := b.BatchTx() + rtx.Lock() + for i := 0; i < 50; i++ { + keys, vals := rtx.UnsafeRange(schema.Test, []byte(fmt.Sprintf("key_%04d", i)), nil, 0) + require.Lenf(t, keys, 1, "key_%04d should exist", i) + assert.Equalf(t, []byte("updated"), vals[0], "key_%04d should have updated value", i) + } + // keys 50-99 should still have original value + for i := 50; i < 100; i++ { + keys, vals := rtx.UnsafeRange(schema.Test, []byte(fmt.Sprintf("key_%04d", i)), nil, 0) + require.Lenf(t, keys, 1, "key_%04d should exist", i) + assert.Equalf(t, []byte("original"), vals[0], "key_%04d should have original value", i) + } + rtx.Unlock() +} + +func TestBackendDefragNewBucketDuringCopy(t *testing.T) { + b, _ := betesting.NewDefaultTmpBackend(t) + defer betesting.Close(t, b) + + tx := b.BatchTx() + tx.Lock() + tx.UnsafeCreateBucket(schema.Test) + for i := 0; i < 100; i++ { + tx.UnsafePut(schema.Test, []byte(fmt.Sprintf("key_%04d", i)), []byte("value")) + } + tx.Unlock() + b.ForceCommit() + + // create a new bucket and write to it during defrag + var wg sync.WaitGroup + done := make(chan struct{}) + + wg.Add(1) + go func() { + defer wg.Done() + wtx := b.BatchTx() + wtx.Lock() + wtx.UnsafeCreateBucket(schema.Key) + for i := 0; i < 50; i++ { + wtx.UnsafeSeqPut(schema.Key, []byte(fmt.Sprintf("newkey_%04d", i)), []byte("newval")) + } + wtx.Unlock() + close(done) + }() + + err := b.Defrag() + <-done + wg.Wait() + require.NoError(t, err) + + // verify original bucket data + b.ForceCommit() + rtx := b.BatchTx() + rtx.Lock() + for i := 0; i < 100; i++ { + keys, _ := rtx.UnsafeRange(schema.Test, []byte(fmt.Sprintf("key_%04d", i)), nil, 0) + require.Lenf(t, keys, 1, "key_%04d should exist in Test bucket", i) + } + // verify new bucket and its data exist + for i := 0; i < 50; i++ { + keys, vals := rtx.UnsafeRange(schema.Key, []byte(fmt.Sprintf("newkey_%04d", i)), nil, 0) + require.Lenf(t, keys, 1, "newkey_%04d should exist in Key bucket", i) + assert.Equal(t, []byte("newval"), vals[0]) + } + rtx.Unlock() +} + +func TestBackendDefragMultipleSequential(t *testing.T) { + b, _ := betesting.NewDefaultTmpBackend(t) + defer betesting.Close(t, b) + + tx := b.BatchTx() + tx.Lock() + tx.UnsafeCreateBucket(schema.Test) + for i := 0; i < 200; i++ { + tx.UnsafePut(schema.Test, []byte(fmt.Sprintf("key_%04d", i)), []byte("value")) + } + tx.Unlock() + b.ForceCommit() + + for round := 0; round < 3; round++ { + // delete some keys before each defrag + tx.Lock() + for i := round * 20; i < (round+1)*20; i++ { + tx.UnsafeDelete(schema.Test, []byte(fmt.Sprintf("key_%04d", i))) + } + tx.Unlock() + b.ForceCommit() + + hashBefore, err := b.Hash(nil) + require.NoError(t, err) + + err = b.Defrag() + require.NoErrorf(t, err, "defrag round %d failed", round) + + hashAfter, err := b.Hash(nil) + require.NoError(t, err) + assert.Equalf(t, hashBefore, hashAfter, "hash mismatch after defrag round %d", round) + + // add new keys after each defrag + tx.Lock() + tx.UnsafeCreateBucket(schema.Test) + for i := 0; i < 10; i++ { + tx.UnsafePut(schema.Test, []byte(fmt.Sprintf("round%d_%04d", round, i)), []byte("new")) + } + tx.Unlock() + b.ForceCommit() + } + + // verify round keys from all 3 rounds exist + rtx := b.BatchTx() + rtx.Lock() + for round := 0; round < 3; round++ { + for i := 0; i < 10; i++ { + keys, _ := rtx.UnsafeRange(schema.Test, []byte(fmt.Sprintf("round%d_%04d", round, i)), nil, 0) + require.Lenf(t, keys, 1, "round%d_%04d should exist", round, i) + } + } + rtx.Unlock() +} + +func TestBackendDefragEmptyDatabase(t *testing.T) { + b, _ := betesting.NewDefaultTmpBackend(t) + defer betesting.Close(t, b) + + err := b.Defrag() + require.NoError(t, err) + + // verify we can still use the database after defragging an empty one + tx := b.BatchTx() + tx.Lock() + tx.UnsafeCreateBucket(schema.Test) + tx.UnsafePut(schema.Test, []byte("after_defrag"), []byte("works")) + tx.Unlock() + b.ForceCommit() + + rtx := b.BatchTx() + rtx.Lock() + keys, vals := rtx.UnsafeRange(schema.Test, []byte("after_defrag"), nil, 0) + require.Len(t, keys, 1) + assert.Equal(t, []byte("works"), vals[0]) + rtx.Unlock() +} + +func TestBackendDefragLargeJournalReplay(t *testing.T) { + b, _ := betesting.NewDefaultTmpBackend(t) + defer betesting.Close(t, b) + + tx := b.BatchTx() + tx.Lock() + tx.UnsafeCreateBucket(schema.Test) + // pre-populate enough data to slow down Phase 1 copy, giving the + // concurrent writer time to exceed defragLimit + largeVal := make([]byte, 1024) + for i := 0; i < 5000; i++ { + tx.UnsafePut(schema.Test, []byte(fmt.Sprintf("pre_%05d", i)), largeVal) + } + tx.Unlock() + b.ForceCommit() + + // write enough keys during defrag to exceed defragLimit (10000) in the replay, + // exercising the batched commit path in replayJournal + var wg sync.WaitGroup + stop := make(chan struct{}) + totalWritten := make(chan int, 1) + + wg.Add(1) + go func() { + defer wg.Done() + var count int + for i := 0; ; i++ { + select { + case <-stop: + totalWritten <- count + return + default: + } + wtx := b.BatchTx() + wtx.Lock() + for j := 0; j < 10; j++ { + wtx.UnsafePut(schema.Test, []byte(fmt.Sprintf("journal_%06d", count)), []byte("jval")) + count++ + } + wtx.Unlock() + } + }() + + err := b.Defrag() + close(stop) + wg.Wait() + written := <-totalWritten + + require.NoError(t, err) + require.Greater(t, written, backend.DefragLimitForTest(), + "test must exceed DefragLimitForTest() to cover batched replay") + t.Logf("journal ops written during defrag: %d", written) + + // verify pre-existing keys survived + b.ForceCommit() + rtx := b.BatchTx() + rtx.Lock() + for i := 0; i < 5000; i++ { + keys, _ := rtx.UnsafeRange(schema.Test, []byte(fmt.Sprintf("pre_%05d", i)), nil, 0) + require.Lenf(t, keys, 1, "pre_%05d should exist", i) + } + // verify at least some journal keys exist + var found int + for i := 0; i < written; i++ { + keys, _ := rtx.UnsafeRange(schema.Test, []byte(fmt.Sprintf("journal_%06d", i)), nil, 0) + if len(keys) > 0 { + found++ + } + } + rtx.Unlock() + assert.Greater(t, found, 0, "journal writes should be present after defrag") +} + +func TestBackendDefragReadsDuringDefragSeeBufferedPuts(t *testing.T) { + b, _ := betesting.NewDefaultTmpBackend(t) + defer betesting.Close(t, b) + + tx := b.BatchTx() + tx.Lock() + tx.UnsafeCreateBucket(schema.Test) + tx.UnsafePut(schema.Test, []byte("pre_key"), []byte("pre_val")) + tx.Unlock() + b.ForceCommit() + + // During defrag, writes go to journal+buffer only (not bbolt). + // Readers via ConcurrentReadTx should still see these puts + // because readTx.buf is preserved (commits are skipped). + wroteKey := make(chan struct{}) + readResult := make(chan bool, 1) + stop := make(chan struct{}) + var wg sync.WaitGroup + + // writer: put a key during defrag and signal + wg.Add(1) + go func() { + defer wg.Done() + // wait a moment for defrag to start + time.Sleep(10 * time.Millisecond) + wtx := b.BatchTx() + wtx.Lock() + wtx.UnsafePut(schema.Test, []byte("during_defrag"), []byte("visible")) + wtx.Unlock() + close(wroteKey) + <-stop + }() + + // reader: after the write, check if the key is visible via ConcurrentReadTx + wg.Add(1) + go func() { + defer wg.Done() + <-wroteKey + time.Sleep(5 * time.Millisecond) + rtx := b.ConcurrentReadTx() + rtx.RLock() + keys, _ := rtx.UnsafeRange(schema.Test, []byte("during_defrag"), nil, 0) + rtx.RUnlock() + readResult <- len(keys) > 0 + <-stop + }() + + err := b.Defrag() + close(stop) + wg.Wait() + require.NoError(t, err) + + visible := <-readResult + assert.True(t, visible, "puts during defrag should be visible to ConcurrentReadTx via buffer") + + // verify key is also present after defrag + b.ForceCommit() + rtx := b.BatchTx() + rtx.Lock() + keys, vals := rtx.UnsafeRange(schema.Test, []byte("during_defrag"), nil, 0) + rtx.Unlock() + require.Len(t, keys, 1) + assert.Equal(t, []byte("visible"), vals[0]) +} + +func TestBackendDefragExceedBatchLimit(t *testing.T) { + // Use a small batch limit so we can exceed it without writing too many keys. + b, _ := betesting.NewTmpBackend(t, time.Hour, 100) + defer betesting.Close(t, b) + + largeVal := make([]byte, 1024) + tx := b.BatchTx() + tx.Lock() + tx.UnsafeCreateBucket(schema.Test) + for i := 0; i < 5000; i++ { + tx.UnsafePut(schema.Test, []byte(fmt.Sprintf("seed_%05d", i)), largeVal) + } + tx.Unlock() + b.ForceCommit() + + // Write more than batchLimit keys during defrag to exercise the + // Unlock path where pending >= batchLimit but commits are skipped. + var wg sync.WaitGroup + stop := make(chan struct{}) + totalWritten := make(chan int, 1) + + wg.Add(1) + go func() { + defer wg.Done() + var count int + for i := 0; ; i++ { + select { + case <-stop: + totalWritten <- count + return + default: + } + wtx := b.BatchTx() + wtx.Lock() + for j := 0; j < 10; j++ { + wtx.UnsafePut(schema.Test, []byte(fmt.Sprintf("batch_%06d", count)), []byte("v")) + count++ + } + wtx.Unlock() + } + }() + + err := b.Defrag() + close(stop) + wg.Wait() + written := <-totalWritten + + require.NoError(t, err) + t.Logf("wrote %d keys during defrag (batch limit = 100)", written) + require.Greater(t, written, 100, "should have exceeded the batch limit during defrag") + + // verify all keys are present after defrag + b.ForceCommit() + rtx := b.BatchTx() + rtx.Lock() + var found int + for i := 0; i < written; i++ { + keys, _ := rtx.UnsafeRange(schema.Test, []byte(fmt.Sprintf("batch_%06d", i)), nil, 0) + if len(keys) > 0 { + found++ + } + } + rtx.Unlock() + assert.Equal(t, written, found, "all keys written during defrag should be present") +} + +func TestBackendDefragCommitDuringDefragPreservesData(t *testing.T) { + b, _ := betesting.NewDefaultTmpBackend(t) + defer betesting.Close(t, b) + + tx := b.BatchTx() + tx.Lock() + tx.UnsafeCreateBucket(schema.Test) + tx.UnsafePut(schema.Test, []byte("seed"), []byte("value")) + tx.Unlock() + b.ForceCommit() + + // Simulate what backend.run() does: call Commit() periodically. + // During defrag, Commit() should be skipped so readTx.buf isn't cleared. + wroteKeys := make(chan struct{}) + commitDone := make(chan struct{}) + stop := make(chan struct{}) + var wg sync.WaitGroup + + // writer: put keys during defrag + wg.Add(1) + go func() { + defer wg.Done() + time.Sleep(10 * time.Millisecond) + for i := 0; i < 50; i++ { + wtx := b.BatchTx() + wtx.Lock() + wtx.UnsafePut(schema.Test, []byte(fmt.Sprintf("commit_test_%04d", i)), []byte("val")) + wtx.Unlock() + } + close(wroteKeys) + <-stop + }() + + // committer: call ForceCommit after writes, simulating periodic timer + wg.Add(1) + go func() { + defer wg.Done() + <-wroteKeys + b.ForceCommit() + close(commitDone) + <-stop + }() + + // reader: after commit, verify data is still visible + readResult := make(chan int, 1) + wg.Add(1) + go func() { + defer wg.Done() + <-commitDone + rtx := b.ConcurrentReadTx() + rtx.RLock() + var found int + for i := 0; i < 50; i++ { + keys, _ := rtx.UnsafeRange(schema.Test, []byte(fmt.Sprintf("commit_test_%04d", i)), nil, 0) + if len(keys) > 0 { + found++ + } + } + rtx.RUnlock() + readResult <- found + <-stop + }() + + err := b.Defrag() + close(stop) + wg.Wait() + require.NoError(t, err) + + found := <-readResult + assert.Equal(t, 50, found, "all puts should remain visible after Commit() during defrag") +} + +func TestBackendDefragConcurrentReadTxSurvivesSwitchover(t *testing.T) { + b, _ := betesting.NewDefaultTmpBackend(t) + defer betesting.Close(t, b) + + tx := b.BatchTx() + tx.Lock() + tx.UnsafeCreateBucket(schema.Test) + for i := 0; i < 100; i++ { + tx.UnsafePut(schema.Test, []byte(fmt.Sprintf("key_%04d", i)), []byte("value")) + } + tx.Unlock() + b.ForceCommit() + + // Acquire a ConcurrentReadTx BEFORE defrag starts. This holds a + // reference to the old DB's bbolt read tx via txWg. Phase 3 must + // wait for this reader to finish before it can close the old DB. + longReader := b.ConcurrentReadTx() + longReader.RLock() + + // Verify the long reader can read data + keys, vals := longReader.UnsafeRange(schema.Test, []byte("key_0050"), nil, 0) + require.Len(t, keys, 1) + assert.Equal(t, []byte("value"), vals[0]) + + defragDone := make(chan error, 1) + go func() { + defragDone <- b.Defrag() + }() + + // Hold the read tx open for a bit — Phase 3 should block on db.Close() + // waiting for this reader to release. + time.Sleep(50 * time.Millisecond) + + select { + case err := <-defragDone: + t.Fatalf("Defrag finished before longReader.RUnlock(): %v", err) + default: + } + + // The reader should still be able to read from the old snapshot + keys, vals = longReader.UnsafeRange(schema.Test, []byte("key_0000"), nil, 0) + require.Len(t, keys, 1) + assert.Equal(t, []byte("value"), vals[0]) + + // Release the long reader — this unblocks Phase 3 + longReader.RUnlock() + + err := <-defragDone + require.NoError(t, err) + + // After defrag, verify new reads work on the new DB + b.ForceCommit() + rtx := b.ConcurrentReadTx() + rtx.RLock() + for i := 0; i < 100; i++ { + keys, _ := rtx.UnsafeRange(schema.Test, []byte(fmt.Sprintf("key_%04d", i)), nil, 0) + require.Lenf(t, keys, 1, "key_%04d should exist after defrag", i) + } + rtx.RUnlock() +} + +func TestBackendDefragMultipleConcurrentReadTxDuringSwitchover(t *testing.T) { + b, _ := betesting.NewDefaultTmpBackend(t) + defer betesting.Close(t, b) + + tx := b.BatchTx() + tx.Lock() + tx.UnsafeCreateBucket(schema.Test) + for i := 0; i < 200; i++ { + tx.UnsafePut(schema.Test, []byte(fmt.Sprintf("key_%04d", i)), []byte("value")) + } + tx.Unlock() + b.ForceCommit() + + // Simulate many concurrent readers that continuously create and + // release ConcurrentReadTx during defrag, including across the + // Phase 3 switchover. Each reader creates short-lived snapshots + // that must not panic or return errors when the DB is swapped. + var wg sync.WaitGroup + stop := make(chan struct{}) + readErrors := make(chan error, 1000) + + for r := range 5 { + wg.Add(1) + go func(id int) { + defer wg.Done() + for { + select { + case <-stop: + return + default: + } + rtx := b.ConcurrentReadTx() + rtx.RLock() + key := []byte(fmt.Sprintf("key_%04d", id*20)) + keys, vals := rtx.UnsafeRange(schema.Test, key, nil, 0) + if len(keys) != 1 { + readErrors <- fmt.Errorf("reader %d: expected 1 key, got %d", id, len(keys)) + } else if string(vals[0]) != "value" { + readErrors <- fmt.Errorf("reader %d: expected 'value', got %q", id, vals[0]) + } + rtx.RUnlock() + } + }(r) + } + + err := b.Defrag() + close(stop) + wg.Wait() + close(readErrors) + + require.NoError(t, err) + + var errs []error + for e := range readErrors { + errs = append(errs, e) + } + assert.Empty(t, errs, "no read errors should occur during defrag switchover") + + // Verify data integrity after defrag + b.ForceCommit() + rtx := b.BatchTx() + rtx.Lock() + for i := 0; i < 200; i++ { + keys, _ := rtx.UnsafeRange(schema.Test, []byte(fmt.Sprintf("key_%04d", i)), nil, 0) + require.Lenf(t, keys, 1, "key_%04d should exist after defrag", i) + } + rtx.Unlock() +} diff --git a/server/storage/backend/batch_tx.go b/server/storage/backend/batch_tx.go index 5af557cb4283..03a002fa9ce2 100644 --- a/server/storage/backend/batch_tx.go +++ b/server/storage/backend/batch_tx.go @@ -291,6 +291,7 @@ type batchTxBuffered struct { batchTx buf txWriteBuffer pendingDeleteOperations int + defragJournal *defragJournal } func newBatchTxBuffered(backend *backend) *batchTxBuffered { @@ -332,7 +333,15 @@ func (t *batchTxBuffered) Unlock() { // // Please also refer to // https://github.com/etcd-io/etcd/pull/17119#issuecomment-1857547158 - if t.pending >= t.backend.batchLimit || t.pendingDeleteOperations > 0 { + if t.defragJournal != nil { + // During defrag, writes are buffered in the journal and not + // written to bbolt. Skip the commit to prevent readTx.buf + // from being cleared — puts would become invisible since + // they are not in bbolt either. Reset counters so batchTx.Unlock + // does not trigger a wasteful empty commit. + t.pending = 0 + t.pendingDeleteOperations = 0 + } else if t.pending >= t.backend.batchLimit || t.pendingDeleteOperations > 0 { t.commit(false) } } @@ -341,7 +350,9 @@ func (t *batchTxBuffered) Unlock() { func (t *batchTxBuffered) Commit() { t.lock() - t.commit(false) + if t.defragJournal == nil { + t.commit(false) + } t.Unlock() } @@ -385,22 +396,51 @@ func (t *batchTxBuffered) unsafeCommit(stop bool) { } } +func (t *batchTxBuffered) UnsafeCreateBucket(bucket Bucket) { + if j := t.defragJournal; j != nil { + t.pending++ + j.appendCreateBucket(bucket.Name()) + } else { + t.batchTx.UnsafeCreateBucket(bucket) + } +} + func (t *batchTxBuffered) UnsafePut(bucket Bucket, key []byte, value []byte) { - t.batchTx.UnsafePut(bucket, key, value) + if j := t.defragJournal; j != nil { + t.pending++ + j.appendPut(bucket.Name(), key, value, false) + } else { + t.batchTx.UnsafePut(bucket, key, value) + } t.buf.put(bucket, key, value) } func (t *batchTxBuffered) UnsafeSeqPut(bucket Bucket, key []byte, value []byte) { - t.batchTx.UnsafeSeqPut(bucket, key, value) + if j := t.defragJournal; j != nil { + t.pending++ + j.appendPut(bucket.Name(), key, value, true) + } else { + t.batchTx.UnsafeSeqPut(bucket, key, value) + } t.buf.putSeq(bucket, key, value) } func (t *batchTxBuffered) UnsafeDelete(bucketType Bucket, key []byte) { - t.batchTx.UnsafeDelete(bucketType, key) + if j := t.defragJournal; j != nil { + t.pending++ + j.appendDelete(bucketType.Name(), key) + } else { + t.batchTx.UnsafeDelete(bucketType, key) + } t.pendingDeleteOperations++ } func (t *batchTxBuffered) UnsafeDeleteBucket(bucket Bucket) { - t.batchTx.UnsafeDeleteBucket(bucket) + if j := t.defragJournal; j != nil { + t.pending++ + j.appendDeleteBucket(bucket.Name()) + } else { + t.batchTx.UnsafeDeleteBucket(bucket) + } t.pendingDeleteOperations++ } diff --git a/server/storage/backend/defrag_journal.go b/server/storage/backend/defrag_journal.go new file mode 100644 index 000000000000..940891b7df25 --- /dev/null +++ b/server/storage/backend/defrag_journal.go @@ -0,0 +1,122 @@ +// Copyright 2024 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package backend + +import "sync" + +type defragOpType uint8 + +const ( + opPut defragOpType = iota + opDelete + opCreateBucket + opDeleteBucket +) + +type defragJournalOp struct { + opType defragOpType + bucketName []byte + key []byte + value []byte + seq bool +} + +type defragJournal struct { + mu sync.Mutex + ops []defragJournalOp + closed bool +} + +func newDefragJournal() *defragJournal { + return &defragJournal{ + ops: make([]defragJournalOp, 0, 1024), + } +} + +func (j *defragJournal) appendPut(bucketName, key, value []byte, seq bool) { + j.mu.Lock() + defer j.mu.Unlock() + if j.closed { + return + } + j.ops = append(j.ops, defragJournalOp{ + opType: opPut, + bucketName: cloneBytes(bucketName), + key: cloneBytes(key), + value: cloneBytes(value), + seq: seq, + }) +} + +func (j *defragJournal) appendDelete(bucketName, key []byte) { + j.mu.Lock() + defer j.mu.Unlock() + if j.closed { + return + } + j.ops = append(j.ops, defragJournalOp{ + opType: opDelete, + bucketName: cloneBytes(bucketName), + key: cloneBytes(key), + }) +} + +func (j *defragJournal) appendCreateBucket(bucketName []byte) { + j.mu.Lock() + defer j.mu.Unlock() + if j.closed { + return + } + j.ops = append(j.ops, defragJournalOp{ + opType: opCreateBucket, + bucketName: cloneBytes(bucketName), + }) +} + +func (j *defragJournal) appendDeleteBucket(bucketName []byte) { + j.mu.Lock() + defer j.mu.Unlock() + if j.closed { + return + } + j.ops = append(j.ops, defragJournalOp{ + opType: opDeleteBucket, + bucketName: cloneBytes(bucketName), + }) +} + +// drain returns all accumulated ops and resets the journal. +func (j *defragJournal) drain() []defragJournalOp { + j.mu.Lock() + defer j.mu.Unlock() + ops := j.ops + j.ops = nil + return ops +} + +func (j *defragJournal) close() { + j.mu.Lock() + defer j.mu.Unlock() + j.closed = true +} + +func cloneBytes(b []byte) []byte { + if b == nil { + return nil + } + cp := make([]byte, len(b)) + copy(cp, b) + return cp +} diff --git a/server/storage/backend/defrag_journal_test.go b/server/storage/backend/defrag_journal_test.go new file mode 100644 index 000000000000..b5e8ccae3c86 --- /dev/null +++ b/server/storage/backend/defrag_journal_test.go @@ -0,0 +1,88 @@ +// Copyright 2024 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package backend + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestDefragJournalAppendAndDrain(t *testing.T) { + j := newDefragJournal() + + j.appendCreateBucket([]byte("bucket1")) + j.appendPut([]byte("bucket1"), []byte("key1"), []byte("val1"), false) + j.appendPut([]byte("bucket1"), []byte("key2"), []byte("val2"), true) + j.appendDelete([]byte("bucket1"), []byte("key1")) + j.appendDeleteBucket([]byte("bucket1")) + + ops := j.drain() + require.Len(t, ops, 5) + + assert.Equal(t, opCreateBucket, ops[0].opType) + assert.Equal(t, []byte("bucket1"), ops[0].bucketName) + + assert.Equal(t, opPut, ops[1].opType) + assert.Equal(t, []byte("key1"), ops[1].key) + assert.Equal(t, []byte("val1"), ops[1].value) + assert.False(t, ops[1].seq) + + assert.Equal(t, opPut, ops[2].opType) + assert.Equal(t, []byte("key2"), ops[2].key) + assert.True(t, ops[2].seq) + + assert.Equal(t, opDelete, ops[3].opType) + assert.Equal(t, []byte("key1"), ops[3].key) + + assert.Equal(t, opDeleteBucket, ops[4].opType) + assert.Equal(t, []byte("bucket1"), ops[4].bucketName) + + // drain again should be empty + ops = j.drain() + assert.Empty(t, ops) +} + +func TestDefragJournalClose(t *testing.T) { + j := newDefragJournal() + + j.appendPut([]byte("b"), []byte("k"), []byte("v"), false) + j.close() + + // appends after close are no-ops + j.appendPut([]byte("b"), []byte("k2"), []byte("v2"), false) + j.appendDelete([]byte("b"), []byte("k")) + j.appendCreateBucket([]byte("b2")) + j.appendDeleteBucket([]byte("b")) + + ops := j.drain() + require.Len(t, ops, 1) + assert.Equal(t, []byte("k"), ops[0].key) +} + +func TestDefragJournalDeepCopies(t *testing.T) { + j := newDefragJournal() + + key := []byte("original") + j.appendPut([]byte("b"), key, []byte("v"), false) + + // mutate the source slice + key[0] = 'X' + + ops := j.drain() + require.Len(t, ops, 1) + assert.Equal(t, []byte("original"), ops[0].key, "journal should deep-copy byte slices") +}