Skip to content
208 changes: 149 additions & 59 deletions server/storage/backend/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package backend

import (
"errors"
"fmt"
"hash/crc32"
"io"
Expand All @@ -28,6 +29,7 @@ import (
"go.uber.org/zap"

bolt "go.etcd.io/bbolt"
bolterrors "go.etcd.io/bbolt/errors"
"go.etcd.io/etcd/client/pkg/v3/verify"
)

Expand Down Expand Up @@ -469,20 +471,6 @@ func (b *backend) defrag() error {
isDefragActive.Set(1)
defer isDefragActive.Set(0)

// TODO: make this non-blocking?
// lock batchTx to ensure nobody is using previous tx, and then
// close previous ongoing tx.
b.batchTx.LockOutsideApply()
defer b.batchTx.Unlock()

// lock database after lock tx to avoid deadlock.
b.mu.Lock()
defer b.mu.Unlock()

// block concurrent read requests while resetting tx
b.readTx.Lock()
defer b.readTx.Unlock()

// Create a temporary file to ensure we start with a clean slate.
// Snapshotter.cleanupSnapdir cleans up any of these that are found during startup.
dir := filepath.Dir(b.db.Path())
Expand All @@ -500,63 +488,112 @@ func (b *backend) defrag() error {
// return nil, fmt.Errorf(defragOpenFileError)
return temp, nil
}
// Don't load tmp db into memory regardless of opening options
options.Mlock = false
tdbp := temp.Name()
tmpdb, err := bolt.Open(tdbp, 0o600, &options)
if err != nil {
temp.Close()
if rmErr := os.Remove(temp.Name()); rmErr != nil {
b.lg.Error(
"failed to remove temporary file",
b.lg.Error("failed to remove temporary file",
zap.String("path", temp.Name()),
zap.Error(rmErr),
)
}

return err
}

dbp := b.db.Path()
size1, sizeInUse1 := b.Size(), b.SizeInUse()
b.lg.Info(
"defragmenting",
b.lg.Info("defragmenting",
zap.String("path", dbp),
zap.Int64("current-db-size-bytes", size1),
zap.String("current-db-size", humanize.Bytes(uint64(size1))),
zap.Int64("current-db-size-in-use-bytes", sizeInUse1),
zap.String("current-db-size-in-use", humanize.Bytes(uint64(sizeInUse1))),
)

defer func() {
// NOTE: We should exit as soon as possible because that tx
// might be closed. The inflight request might use invalid
// tx and then panic as well. The real panic reason might be
// shadowed by new panic. So, we should fatal here with lock.
if rerr := recover(); rerr != nil {
b.lg.Fatal("unexpected panic during defrag", zap.Any("panic", rerr))
}
}()
// ============================================================
// PHASE 1: Commit pending writes, take a snapshot, install the
// journal, then unlock so writes can continue during the copy.
// ============================================================
b.batchTx.LockOutsideApply()

// Commit/stop and then reset current transactions (including the readTx)
b.batchTx.unsafeCommit(true)
b.batchTx.tx = nil
b.batchTx.commit(false)

b.mu.RLock()
snapTx, snapErr := b.db.Begin(false)
b.mu.RUnlock()
if snapErr != nil {
b.batchTx.Unlock()
tmpdb.Close()
os.Remove(tdbp)
return fmt.Errorf("failed to begin snapshot tx for defrag: %w", snapErr)
}

journal := newDefragJournal()
b.batchTx.defragJournal = journal
b.batchTx.Unlock()

b.lg.Info("defrag: copying data (writes unlocked)")

// gofail: var defragBeforeCopy struct{}
err = defragdb(b.db, tmpdb, defragLimit)
err = defragFromTx(snapTx, tmpdb, defragLimit)
snapTx.Rollback()

if err != nil {
b.batchTx.LockOutsideApply()
b.batchTx.defragJournal = nil
b.batchTx.Unlock()
journal.close()
tmpdb.Close()
if rmErr := os.RemoveAll(tmpdb.Path()); rmErr != nil {
b.lg.Error("failed to remove db.tmp after defragmentation completed", zap.Error(rmErr))
}
os.RemoveAll(tdbp)
return err
}

// restore the bbolt transactions if defragmentation fails
b.batchTx.tx = b.unsafeBegin(true)
b.readTx.tx = b.unsafeBegin(false)
// ============================================================
// PHASE 2: Lock writes, drain and replay the journal into the
// temp DB. This should be fast since it's only the delta.
// ============================================================
b.lg.Info("defrag: replaying journal")

b.batchTx.LockOutsideApply()
b.batchTx.defragJournal = nil
journal.close()
ops := journal.drain()

if len(ops) > 0 {
b.lg.Info("defrag: replaying journal ops", zap.Int("count", len(ops)))
}

err = replayJournal(tmpdb, ops, defragLimit)
if err != nil {
b.batchTx.Unlock()
tmpdb.Close()
os.RemoveAll(tdbp)
return err
}

// ============================================================
// PHASE 3: Atomic switchover — close old db, rename, reopen.
// ============================================================
b.lg.Info("defrag: switching database")

b.mu.Lock()
b.readTx.Lock()

// NOTE: We should exit as soon as possible because that tx
// might be closed. The inflight request might use invalid
// tx and then panic as well. The real panic reason might be
// shadowed by new panic. So, we should fatal here with lock.
defer func() {
if rerr := recover(); rerr != nil {
b.lg.Fatal("unexpected panic during defrag", zap.Any("panic", rerr))
}
}()

b.batchTx.unsafeCommit(true)
b.batchTx.tx = nil

err = b.db.Close()
if err != nil {
b.lg.Fatal("failed to close database", zap.Error(err))
Expand Down Expand Up @@ -585,12 +622,15 @@ func (b *backend) defrag() error {
atomic.StoreInt64(&b.size, size)
atomic.StoreInt64(&b.sizeInUse, size-(int64(db.Stats().FreePageN)*int64(db.Info().PageSize)))

b.readTx.Unlock()
b.mu.Unlock()
b.batchTx.Unlock()

took := time.Since(now)
defragSec.Observe(took.Seconds())

size2, sizeInUse2 := b.Size(), b.SizeInUse()
b.lg.Info(
"finished defragmenting directory",
b.lg.Info("finished defragmenting directory",
zap.String("path", dbp),
zap.Int64("current-db-size-bytes-diff", size2-size1),
zap.Int64("current-db-size-bytes", size2),
Expand All @@ -603,11 +643,7 @@ func (b *backend) defrag() error {
return nil
}

func defragdb(odb, tmpdb *bolt.DB, limit int) error {
// gofail: var defragdbFail string
// return fmt.Errorf(defragdbFail)

// open a tx on tmpdb for writes
func defragFromTx(srcTx *bolt.Tx, tmpdb *bolt.DB, limit int) error {
tmptx, err := tmpdb.Begin(true)
if err != nil {
return err
Expand All @@ -618,18 +654,10 @@ func defragdb(odb, tmpdb *bolt.DB, limit int) error {
}
}()

// open a tx on old db for read
tx, err := odb.Begin(false)
if err != nil {
return err
}
defer tx.Rollback()

c := tx.Cursor()

c := srcTx.Cursor()
count := 0
for next, _ := c.First(); next != nil; next, _ = c.Next() {
b := tx.Bucket(next)
b := srcTx.Bucket(next)
if b == nil {
return fmt.Errorf("backend: cannot defrag bucket %s", next)
}
Expand All @@ -638,7 +666,7 @@ func defragdb(odb, tmpdb *bolt.DB, limit int) error {
if berr != nil {
return berr
}
tmpb.FillPercent = 0.9 // for bucket2seq write in for each
tmpb.FillPercent = 0.9

if err = b.ForEach(func(k, v []byte) error {
count++
Expand All @@ -652,8 +680,7 @@ func defragdb(odb, tmpdb *bolt.DB, limit int) error {
return err
}
tmpb = tmptx.Bucket(next)
tmpb.FillPercent = 0.9 // for bucket2seq write in for each

tmpb.FillPercent = 0.9
count = 0
}
return tmpb.Put(k, v)
Expand All @@ -665,6 +692,69 @@ func defragdb(odb, tmpdb *bolt.DB, limit int) error {
return tmptx.Commit()
}

// replayJournal applies the journaled write operations — captured while the
// defrag copy ran with writes unlocked — to the freshly defragmented temp DB,
// in order. The transaction is committed and recreated every `limit` ops to
// bound transaction size, mirroring defragFromTx.
//
// Returns nil once every op has been applied and the final tx committed.
func replayJournal(tmpdb *bolt.DB, ops []defragJournalOp, limit int) error {
	if len(ops) == 0 {
		return nil
	}

	tx, err := tmpdb.Begin(true)
	if err != nil {
		return err
	}
	// Roll back the in-flight tx on any error path. The nil guard matters:
	// when a mid-loop Begin fails, tx is nil and Rollback would panic.
	defer func() {
		if err != nil && tx != nil {
			tx.Rollback()
		}
	}()

	count := 0
	for _, op := range ops {
		count++
		if count > limit {
			// Commit the current batch and start a fresh tx so a large
			// journal never builds one unbounded bolt transaction.
			if err = tx.Commit(); err != nil {
				tx = nil // already closed; nothing for the defer to roll back
				return err
			}
			tx, err = tmpdb.Begin(true)
			if err != nil {
				return err
			}
			count = 0
		}

		switch op.opType {
		case opCreateBucket:
			if _, err = tx.CreateBucketIfNotExists(op.bucketName); err != nil {
				err = fmt.Errorf("replay: create bucket %s: %w", op.bucketName, err)
				return err
			}
		case opDeleteBucket:
			// A bucket deleted before it ever reached the snapshot copy is
			// fine; surface only unexpected errors. Assign to err so the
			// deferred rollback fires (returning a local here would leak tx).
			if delErr := tx.DeleteBucket(op.bucketName); delErr != nil && !errors.Is(delErr, bolterrors.ErrBucketNotFound) {
				err = fmt.Errorf("replay: delete bucket %s: %w", op.bucketName, delErr)
				return err
			}
		case opPut:
			b := tx.Bucket(op.bucketName)
			if b == nil {
				// Assign to err (not just return) so the defer rolls back.
				err = fmt.Errorf("replay: bucket %s not found for put", op.bucketName)
				return err
			}
			if op.seq {
				// Sequential keys pack better with a high fill percent.
				b.FillPercent = 0.9
			}
			if err = b.Put(op.key, op.value); err != nil {
				err = fmt.Errorf("replay: put in bucket %s: %w", op.bucketName, err)
				return err
			}
		case opDelete:
			b := tx.Bucket(op.bucketName)
			if b == nil {
				// Bucket was removed later in the journal or never copied;
				// nothing to delete.
				continue
			}
			if err = b.Delete(op.key); err != nil {
				err = fmt.Errorf("replay: delete from bucket %s: %w", op.bucketName, err)
				return err
			}
		}
	}

	return tx.Commit()
}

func (b *backend) begin(write bool) *bolt.Tx {
b.mu.RLock()
tx := b.unsafeBegin(write)
Expand Down
Loading