From 69c1e69db8f2d451bfb62a3d950a4a769eb09d09 Mon Sep 17 00:00:00 2001 From: Daniil Porokhnin Date: Thu, 21 May 2026 14:50:07 +0300 Subject: [PATCH 1/3] feat: new api for compaction --- frac/sealed.go | 1 + fracmanager/fracmanager.go | 50 +++++- fracmanager/fracmanager_for_tests.go | 2 +- fracmanager/fracmanager_test.go | 6 +- fracmanager/fracs_stats.go | 5 +- fracmanager/fraction_registry.go | 219 ++++++++++++++++++-------- fracmanager/lifecycle_manager.go | 27 ++-- fracmanager/lifecycle_manager_test.go | 44 +++--- fracmanager/sync_appender.go | 16 +- 9 files changed, 248 insertions(+), 122 deletions(-) diff --git a/frac/sealed.go b/frac/sealed.go index c18f9a62..e5f8a555 100644 --- a/frac/sealed.go +++ b/frac/sealed.go @@ -392,6 +392,7 @@ func (f *Sealed) Release() { func (f *Sealed) Suicide() { f.Release() + // Rename docs atomically first — this commits the intent to delete. oldPath := f.BaseFileName + consts.DocsFileSuffix newPath := f.BaseFileName + consts.DocsDelFileSuffix diff --git a/fracmanager/fracmanager.go b/fracmanager/fracmanager.go index 77a73c78..5dc808c7 100644 --- a/fracmanager/fracmanager.go +++ b/fracmanager/fracmanager.go @@ -77,11 +77,11 @@ func New(ctx context.Context, cfg *Config, s3cli *s3.Client, skipMaskProvider sk wg.Wait() // finalize appender to prevent new writes - appender := lc.registry.Appender() - if err := appender.Finalize(); err != nil { + appender := lc.registry.appender() + if err := appender.finalize(); err != nil { logger.Fatal("shutdown fraction freezing error", zap.Error(err)) } - appender.WaitWriteIdle() + appender.waitWriteIdle() stopIdx() @@ -96,16 +96,50 @@ func New(ctx context.Context, cfg *Config, s3cli *s3.Client, skipMaskProvider sk return &fm, stop, nil } +type CompactionSnapshot struct { + claimed []*refCountedSealed +} + +func (cs *CompactionSnapshot) Fractions() []*frac.Sealed { + result := make([]*frac.Sealed, len(cs.claimed)) + for i, f := range cs.claimed { + result[i] = f.Sealed + } + return result +} + 
+func (cs *CompactionSnapshot) Destroy() { + for _, f := range cs.claimed { + f.Destroy() + } +} + +func (fm *FracManager) SealedFractionsSnapshot() []*frac.Sealed { + return fm.lc.registry.sealedSnapshot() +} + +func (fm *FracManager) ClaimForCompaction(names []string) (*CompactionSnapshot, error) { + claimed, err := fm.lc.registry.claimForCompaction(names) + if err != nil { + return nil, err + } + return &CompactionSnapshot{claimed: claimed}, nil +} + +func (fm *FracManager) SubstituteWithSealed(produced *frac.Sealed, snapshot *CompactionSnapshot) { + fm.lc.registry.substituteWithSealed(produced, snapshot.claimed...) +} + func (fm *FracManager) AcquireFraction(name string) (frac.Fraction, func(), bool) { - return fm.lc.registry.AcquireOneFraction(name) + return fm.lc.registry.acquireOneFraction(name) } func (fm *FracManager) AcquireFractions() (List, func()) { - return fm.lc.registry.AcquireAllFractions() + return fm.lc.registry.acquireAllFractions() } func (fm *FracManager) Oldest() uint64 { - return fm.lc.registry.OldestTotal() + return fm.lc.registry.oldestTotal() } func (fm *FracManager) Flags() *StateManager { @@ -121,7 +155,7 @@ func (fm *FracManager) Append(ctx context.Context, docs storage.DocBlock, metas return ctx.Err() default: // Try to append data to the currently active fraction - err := fm.lc.registry.Appender().Append(docs, metas) + err := fm.lc.registry.appender().append(docs, metas) if err != nil { logger.Info("append fail", zap.Error(err)) if err == ErrFractionNotWritable { @@ -167,7 +201,7 @@ func startStatsWorker(ctx context.Context, reg *fractionRegistry, wg *sync.WaitG logger.Info("stats loop is started") // Run stats collection every 10 seconds util.RunEvery(ctx.Done(), time.Second*10, func() { - stats := reg.Stats() + stats := reg.statistics() stats.Log() // Log statistics stats.SetMetrics() // Update Prometheus metrics }) diff --git a/fracmanager/fracmanager_for_tests.go b/fracmanager/fracmanager_for_tests.go index c4ec1cad..39349289 
100644 --- a/fracmanager/fracmanager_for_tests.go +++ b/fracmanager/fracmanager_for_tests.go @@ -3,7 +3,7 @@ package fracmanager import "sync" func (fm *FracManager) WaitIdleForTests() { - fm.lc.registry.Appender().WaitWriteIdle() + fm.lc.registry.appender().waitWriteIdle() } func (fm *FracManager) SealForcedForTests() { diff --git a/fracmanager/fracmanager_test.go b/fracmanager/fracmanager_test.go index d92e13b8..64c264b6 100644 --- a/fracmanager/fracmanager_test.go +++ b/fracmanager/fracmanager_test.go @@ -62,7 +62,7 @@ func TestSealingOnShutdown(t *testing.T) { cfg, fm, stop := setupFracManager(t, cfg) appendDocsToFracManager(t, fm, 10) - activeName := fm.lc.registry.all.fractions[0].Info().Name() + activeName := fm.lc.registry.snapshot.fractions[0].Info().Name() stop() @@ -70,7 +70,7 @@ func TestSealingOnShutdown(t *testing.T) { cfg.MinSealFracSize = 1 // to ensure that the frac will be sealed on shutdown cfg, fm, stop = setupFracManager(t, cfg) - allFractions := fm.lc.registry.all.fractions + allFractions := fm.lc.registry.snapshot.fractions assert.Equal(t, 1, len(allFractions), "should have one fraction") assert.Equal(t, activeName, allFractions[0].Info().Name(), "fraction should have the same name") _, ok := allFractions[0].(*syncAppender) @@ -80,7 +80,7 @@ func TestSealingOnShutdown(t *testing.T) { // third start _, fm, stop = setupFracManager(t, cfg) - allFractions = fm.lc.registry.all.fractions + allFractions = fm.lc.registry.snapshot.fractions assert.Equal(t, 2, len(allFractions), "should have 2 fraction: new active and old sealed") _, ok = allFractions[0].(*refCountedSealed) assert.True(t, ok, "first fraction should be sealed") diff --git a/fracmanager/fracs_stats.go b/fracmanager/fracs_stats.go index c70bbd37..ee255543 100644 --- a/fracmanager/fracs_stats.go +++ b/fracmanager/fracs_stats.go @@ -76,6 +76,7 @@ type registryStats struct { active fracsStats // Statistics for active fraction sealing fracsStats // Statistics for fractions in the sealing 
process sealed fracsStats // Statistics for fractions on sealed disk + compacting fracsStats // Statistics for fractions participating in compaction offloading fracsStats // Statistics for fractions in the offloading process remotes fracsStats // Statistics for fractions in remote storage } @@ -84,6 +85,7 @@ func (s *registryStats) Log() { s.active.Log("active") s.sealing.Log("sealing") s.sealed.Log("sealed") + s.compacting.Log("compacting") s.offloading.Log("offloading") s.remotes.Log("remotes") } @@ -92,10 +94,11 @@ func (s *registryStats) SetMetrics() { s.active.SetMetrics(dataSizeTotal, "active") s.sealing.SetMetrics(dataSizeTotal, "sealing") s.sealed.SetMetrics(dataSizeTotal, "sealed") + s.compacting.SetMetrics(dataSizeTotal, "compacting") s.offloading.SetMetrics(dataSizeTotal, "offloading") s.remotes.SetMetrics(dataSizeTotal, "remotes") } func (s registryStats) TotalSizeOnDiskLocal() uint64 { - return s.sealing.totalSizeOnDisk + s.sealed.totalSizeOnDisk + return s.sealing.totalSizeOnDisk + s.sealed.totalSizeOnDisk + s.compacting.totalSizeOnDisk } diff --git a/fracmanager/fraction_registry.go b/fracmanager/fraction_registry.go index b0667c04..0c0872c6 100644 --- a/fracmanager/fraction_registry.go +++ b/fracmanager/fraction_registry.go @@ -21,16 +21,17 @@ type fractionRegistry struct { sealing map[string]*syncAppender // fractions being sealed (0-5 typical) sealed PartitionedCollection[*refCountedSealed] // local sealed fractions (can be thousands) + compacting map[string]*refCountedSealed // fractions participating in compaction offloading PartitionedCollection[*refCountedSealed] // fractions being offloaded (0-5 typical) remotes PartitionedCollection[*refCountedRemote] // offloaded fractions (can be thousands) stats registryStats // size statistics for monitoring muAppender sync.RWMutex - appender *syncAppender // currently active writable fraction + sappender *syncAppender // currently active writable fraction - muAll sync.RWMutex - all fractionsSnapshot // 
all fractions + muSnapshot sync.RWMutex + snapshot fractionsSnapshot // all fractions } // NewFractionRegistry creates and initializes a new fraction registry instance. @@ -51,10 +52,11 @@ func NewFractionRegistry(active *frac.Active, sealed []*frac.Sealed, remotes []* } reg := fractionRegistry{ - appender: &syncAppender{refCountedActive: refCountedActive{Active: active}}, + sappender: &syncAppender{refCountedActive: refCountedActive{Active: active}}, sealing: map[string]*syncAppender{}, sealed: NewPartitionedCollection(func(rcs *refCountedSealed) uint64 { return creationTime(rcs) }), + compacting: map[string]*refCountedSealed{}, offloading: NewPartitionedCollection(func(rcs *refCountedSealed) uint64 { return lastDocTime(rcs) }), remotes: NewPartitionedCollection(func(rcr *refCountedRemote) uint64 { return lastDocTime(rcr) }), } @@ -76,51 +78,51 @@ func NewFractionRegistry(active *frac.Active, sealed []*frac.Sealed, remotes []* return ®, nil } -// Appender returns the currently active writable fraction. -func (r *fractionRegistry) Appender() *syncAppender { +// appender returns the currently active writable fraction. 
+func (r *fractionRegistry) appender() *syncAppender { r.muAppender.RLock() defer r.muAppender.RUnlock() - return r.appender + return r.sappender } -func (r *fractionRegistry) AcquireOneFraction(name string) (frac.Fraction, func(), bool) { - r.muAll.RLock() - defer r.muAll.RUnlock() +func (r *fractionRegistry) acquireOneFraction(name string) (frac.Fraction, func(), bool) { + r.muSnapshot.RLock() + defer r.muSnapshot.RUnlock() - return r.all.AcquireOne(name) + return r.snapshot.AcquireOne(name) } -// AcquireAllFractions returns a read-only view of all fractions -func (r *fractionRegistry) AcquireAllFractions() ([]frac.Fraction, func()) { - r.muAll.RLock() - defer r.muAll.RUnlock() +// acquireAllFractions returns a read-only view of all fractions +func (r *fractionRegistry) acquireAllFractions() ([]frac.Fraction, func()) { + r.muSnapshot.RLock() + defer r.muSnapshot.RUnlock() - return r.all.AcquireAll() + return r.snapshot.AcquireAll() } -// Stats returns current size statistics of the registry. -func (r *fractionRegistry) Stats() registryStats { +// statistics returns current size statistics of the registry. +func (r *fractionRegistry) statistics() registryStats { r.mu.RLock() s := r.stats - i := r.appender.Info() + i := r.sappender.Info() r.mu.RUnlock() s.active.Set(i) return s } -// OldestTotal returns the creation time of the oldest fraction in the registry. -func (r *fractionRegistry) OldestTotal() uint64 { - r.muAll.RLock() - defer r.muAll.RUnlock() - return r.all.oldestTotal +// oldestTotal returns the creation time of the oldest fraction in the registry. +func (r *fractionRegistry) oldestTotal() uint64 { + r.muSnapshot.RLock() + defer r.muSnapshot.RUnlock() + return r.snapshot.oldestTotal } -// OldestLocal returns the creation time of the oldest local fraction in the registry. 
-func (r *fractionRegistry) OldestLocal() uint64 { - r.muAll.RLock() - defer r.muAll.RUnlock() - return r.all.oldestLocal +// oldestLocal returns the creation time of the oldest local fraction in the registry. +func (r *fractionRegistry) oldestLocal() uint64 { + r.muSnapshot.RLock() + defer r.muSnapshot.RUnlock() + return r.snapshot.oldestLocal } type activeProvider interface { @@ -131,39 +133,39 @@ func (r *fractionRegistry) setAppender(appender *syncAppender) { r.muAppender.Lock() defer r.muAppender.Unlock() - r.appender = appender + r.sappender = appender - r.muAll.Lock() - defer r.muAll.Unlock() + r.muSnapshot.Lock() + defer r.muSnapshot.Unlock() - r.all.AddActive(appender) + r.snapshot.AddActive(appender) } -// RotateIfFull completes the current active fraction and starts a new one. +// rotateIfFull completes the current active fraction and starts a new one. // Moves previous active fraction to sealing queue. // Should be called when the current active fraction reaches size limit and needs to be rotated -func (r *fractionRegistry) RotateIfFull(maxSize uint64, ap activeProvider) (*refCountedActive, func(), error) { +func (r *fractionRegistry) rotateIfFull(maxSize uint64, ap activeProvider) (*refCountedActive, func(), error) { r.mu.Lock() defer r.mu.Unlock() - if r.appender.Info().DocsOnDisk <= maxSize { + if r.sappender.Info().DocsOnDisk <= maxSize { return nil, nil, nil } - old := r.appender + old := r.sappender r.sealing[old.Info().Name()] = old r.setAppender(&syncAppender{refCountedActive: refCountedActive{Active: ap.CreateActive()}}) - if err := old.Finalize(); err != nil { + if err := old.finalize(); err != nil { return nil, nil, err } curInfo := old.Info() r.stats.sealing.Add(curInfo) - r.appender.Suspend(old.Suspended()) + r.sappender.suspend(old.isSuspended()) wg := sync.WaitGroup{} wg.Add(1) @@ -172,7 +174,7 @@ func (r *fractionRegistry) RotateIfFull(maxSize uint64, ap activeProvider) (*ref go func() { defer wg.Done() - old.WaitWriteIdle() // can be 
long enough + old.waitWriteIdle() // can be long enough finalInfo := old.Info() r.mu.Lock() @@ -187,11 +189,11 @@ func (r *fractionRegistry) RotateIfFull(maxSize uint64, ap activeProvider) (*ref return &old.refCountedActive, wg.Wait, nil } -func (r *fractionRegistry) SuspendIfOverCapacity(maxQueue, maxSize uint64) { +func (r *fractionRegistry) suspendIfOverCapacity(maxQueue, maxSize uint64) { r.mu.Lock() defer r.mu.Unlock() - suspended := r.appender.Suspended() + suspended := r.sappender.isSuspended() if maxQueue > 0 && r.stats.sealing.count >= int(maxQueue) { if !suspended { @@ -199,7 +201,7 @@ func (r *fractionRegistry) SuspendIfOverCapacity(maxQueue, maxSize uint64) { zap.String("reason", "sealing queue size exceeded"), zap.Uint64("limit", maxQueue), zap.Int("queue_size", r.stats.sealing.count)) - r.appender.Suspend(true) + r.sappender.suspend(true) } return } @@ -212,7 +214,7 @@ func (r *fractionRegistry) SuspendIfOverCapacity(maxQueue, maxSize uint64) { zap.String("reason", "occupied space limit exceeded"), zap.Float64("queue_size_limit_gb", util.Float64ToPrec(util.SizeToUnit(maxSize, "gb"), 2)), zap.Float64("occupied_space_gb", util.Float64ToPrec(util.SizeToUnit(du, "gb"), 2))) - r.appender.Suspend(true) + r.sappender.suspend(true) } return } @@ -223,20 +225,21 @@ func (r *fractionRegistry) SuspendIfOverCapacity(maxQueue, maxSize uint64) { zap.Float64("occupied_space_gb", util.Float64ToPrec(util.SizeToUnit(du, "gb"), 2)), zap.Uint64("sealing_queue_size_limit", maxQueue), zap.Int("queue_size", r.stats.sealing.count)) - r.appender.Suspend(false) + r.sappender.suspend(false) } } func (r *fractionRegistry) diskUsage() uint64 { - return r.appender.Info().FullSize() + + return r.sappender.Info().FullSize() + r.stats.sealed.totalSizeOnDisk + r.stats.sealing.totalSizeOnDisk + + r.stats.compacting.totalSizeOnDisk + r.stats.offloading.totalSizeOnDisk } -// EvictLocalForDelete removes oldest local fractions to free disk space. 
+// evictLocalForDelete removes oldest local fractions to free disk space. // Returns evicted fractions or error if insufficient space is released. -func (r *fractionRegistry) EvictLocalForDelete(sizeLimit uint64) (evicted []*refCountedSealed, err error) { +func (r *fractionRegistry) evictLocalForDelete(sizeLimit uint64) (evicted []*refCountedSealed, err error) { r.mu.Lock() defer r.mu.Unlock() @@ -249,9 +252,9 @@ func (r *fractionRegistry) EvictLocalForDelete(sizeLimit uint64) (evicted []*ref return evicted, nil } -// EvictLocalForOffload removes oldest local fractions to moves it to offloading queue. +// evictLocalForOffload removes the oldest local fractions to move them to the offloading queue. // Returns evicted fractions or error if insufficient space is released. -func (r *fractionRegistry) EvictLocalForOffload(sizeLimit uint64) ([]*refCountedSealed, error) { +func (r *fractionRegistry) evictLocalForOffload(sizeLimit uint64) ([]*refCountedSealed, error) { r.mu.Lock() defer r.mu.Unlock() @@ -272,16 +275,21 @@ func (r *fractionRegistry) evictLocal(sizeLimit uint64) ([]*refCountedSealed, er var releasingSize uint64 // calculate total used disk space - totalUsedSize := r.stats.TotalSizeOnDiskLocal() + r.appender.Info().FullSize() - - evicted := []*refCountedSealed{} + totalUsedSize := r.stats.TotalSizeOnDiskLocal() + r.sappender.Info().FullSize() + var evicted []*refCountedSealed for r.sealed.Len() > 0 && totalUsedSize-releasingSize > sizeLimit { for _, s := range r.sealed.GetByPartition(r.sealed.MinPartition()) { + if totalUsedSize-releasingSize <= sizeLimit { + break + } + info := s.Info() releasingSize += info.FullSize() + r.stats.sealed.Sub(info) r.sealed.Del(info.Name()) + evicted = append(evicted, s) } } @@ -296,10 +304,10 @@ func (r *fractionRegistry) evictLocal(sizeLimit uint64) ([]*refCountedSealed, er return evicted, nil } -// EvictRemote removes oldest remote fractions based on retention policy.
+// evictRemote removes oldest remote fractions based on retention policy. // Fractions older than retention period are permanently deleted. // Returns removed fractions or empty slice if nothing to remove. -func (r *fractionRegistry) EvictRemote(retention time.Duration) []*refCountedRemote { +func (r *fractionRegistry) evictRemote(retention time.Duration) []*refCountedRemote { if retention == 0 { return nil } @@ -322,9 +330,9 @@ func (r *fractionRegistry) EvictRemote(retention time.Duration) []*refCountedRem return evicted } -// EvictOverflowed removes oldest fractions from offloading queue when it exceeds size limit. +// evictOverflowed removes oldest fractions from offloading queue when it exceeds size limit. // Used when offloading queue grows too large due to slow remote storage performance. -func (r *fractionRegistry) EvictOverflowed(sizeLimit uint64) (evicted []*refCountedSealed) { +func (r *fractionRegistry) evictOverflowed(sizeLimit uint64) (evicted []*refCountedSealed) { if sizeLimit == 0 { return nil } @@ -355,23 +363,43 @@ loop: return evicted } -// PromoteToSealed moves fractions from sealing to local queue when sealing completes. -func (r *fractionRegistry) PromoteToSealed(active *refCountedActive, sealed *frac.Sealed) { +// promoteToSealed moves fractions from sealing to local queue when sealing completes. +func (r *fractionRegistry) promoteToSealed(active *refCountedActive, sealed ...*frac.Sealed) { r.mu.Lock() defer r.mu.Unlock() - r.sealed.Add(sealed.Info().Name(), &refCountedSealed{Sealed: sealed}) - r.stats.sealed.Add(sealed.Info()) - r.stats.sealing.Sub(active.Info()) + for _, f := range sealed { + info := f.Info() + r.sealed.Add(info.Name(), &refCountedSealed{Sealed: f}) + r.stats.sealed.Add(info) + } + r.stats.sealing.Sub(active.Info()) delete(r.sealing, active.Info().Name()) r.rebuildSnapshot() } -// PromoteToRemote moves fractions from offloading to remote queue when offloading completes. 
+func (r *fractionRegistry) substituteWithSealed(produced *frac.Sealed, consumed ...*refCountedSealed) { + r.mu.Lock() + defer r.mu.Unlock() + + for _, f := range consumed { + info := f.Info() + r.stats.compacting.Sub(info) + delete(r.compacting, info.Name()) + } + + info := produced.Info() + r.stats.sealed.Add(info) + r.sealed.Add(info.Name(), &refCountedSealed{Sealed: produced}) + + r.rebuildSnapshot() +} + +// promoteToRemote moves fractions from offloading to remote queue when offloading completes. // Special case: handles fractions that don't require offloading (remote == nil). -func (r *fractionRegistry) PromoteToRemote(sealed *refCountedSealed, remote *frac.Remote) { +func (r *fractionRegistry) promoteToRemote(sealed *refCountedSealed, remote *frac.Remote) { r.mu.Lock() defer r.mu.Unlock() @@ -380,14 +408,60 @@ func (r *fractionRegistry) PromoteToRemote(sealed *refCountedSealed, remote *fra r.stats.remotes.Add(remote.Info()) } - r.stats.offloading.Sub(sealed.Info()) r.offloading.Del(sealed.Info().Name()) + r.stats.offloading.Sub(sealed.Info()) + + r.rebuildSnapshot() +} + +func (r *fractionRegistry) sealedSnapshot() []*frac.Sealed { + r.mu.RLock() + defer r.mu.RUnlock() + + result := make([]*frac.Sealed, 0, r.sealed.Len()) + for s := range r.sealed.All() { + result = append(result, s.Sealed) + } + + return result +} + +func (r *fractionRegistry) claimForCompaction(names []string) ([]*refCountedSealed, error) { + r.mu.Lock() + defer r.mu.Unlock() + + for _, name := range names { + // NOTE(dkharms): If offloading pressure is high on the oldest fractions, + // compaction may repeatedly fail to claim them and get into livelock. 
+ if _, ok := r.sealed.Get(name); !ok { + return nil, fmt.Errorf( + "fraction %q is not available for compaction", + name, + ) + } + } + + claimed := make([]*refCountedSealed, 0, len(names)) + for _, name := range names { + s, _ := r.sealed.Get(name) + + r.sealed.Del(name) + r.stats.sealed.Sub(s.Info()) + + r.compacting[name] = s + r.stats.compacting.Add(s.Info()) + + claimed = append(claimed, s) + } + r.rebuildSnapshot() + return claimed, nil } // rebuildSnapshot reconstructs the all fractions list func (r *fractionRegistry) rebuildSnapshot() { - capacity := r.remotes.Len() + r.offloading.Len() + r.sealed.Len() + len(r.sealing) + 1 + capacity := r.remotes.Len() + r.offloading.Len() + + r.sealed.Len() + len(r.compacting) + len(r.sealing) + 1 // allocate extra capacity to accommodate appender rotation that may occur during snapshot lifetime all := newFractionsSnapshot(capacity + 1) @@ -404,13 +478,18 @@ func (r *fractionRegistry) rebuildSnapshot() { all.AddSealed(s) } + for _, c := range r.compacting { + all.AddSealed(c) + } + for _, a := range r.sealing { all.AddActive(a) } - all.AddActive(r.appender) + all.AddActive(r.sappender) + + r.muSnapshot.Lock() + defer r.muSnapshot.Unlock() - r.muAll.Lock() - defer r.muAll.Unlock() - r.all = all + r.snapshot = all } diff --git a/fracmanager/lifecycle_manager.go b/fracmanager/lifecycle_manager.go index 24025c23..327d475f 100644 --- a/fracmanager/lifecycle_manager.go +++ b/fracmanager/lifecycle_manager.go @@ -2,6 +2,7 @@ package fracmanager import ( "context" + "fmt" "sync" "time" @@ -42,7 +43,7 @@ func newLifecycleManager( // Maintain performs periodic lifecycle management tasks. // It coordinates rotation, offloading, cleanup based on configuration. 
func (lc *lifecycleManager) Maintain(ctx context.Context, cfg *Config, wg *sync.WaitGroup) { - lc.registry.SuspendIfOverCapacity(cfg.SealingQueueLen, cfg.SuspendThreshold()) + lc.registry.suspendIfOverCapacity(cfg.SealingQueueLen, cfg.SuspendThreshold()) lc.rotate(cfg.FracSize, wg) if cfg.OffloadingEnabled { @@ -68,7 +69,7 @@ func (lc *lifecycleManager) SyncInfoCache() { // rotate checks if active fraction needs rotation based on size limit. // Creates new active fraction and starts sealing the previous one. func (lc *lifecycleManager) rotate(maxSize uint64, wg *sync.WaitGroup) { - active, waitBeforeSealing, err := lc.registry.RotateIfFull(maxSize, lc.provider) + active, waitBeforeSealing, err := lc.registry.rotateIfFull(maxSize, lc.provider) if err != nil { logger.Fatal("active fraction rotation error", zap.Error(err)) } @@ -89,7 +90,7 @@ func (lc *lifecycleManager) rotate(maxSize uint64, wg *sync.WaitGroup) { } lc.infoCache.Add(sealed.Info()) - lc.registry.PromoteToSealed(active, sealed) + lc.registry.promoteToSealed(active, sealed) active.Destroy() }() } @@ -97,7 +98,7 @@ func (lc *lifecycleManager) rotate(maxSize uint64, wg *sync.WaitGroup) { // offloadLocal starts offloading of local fractions to remote storage. // Selects fractions based on disk space usage and retention policy. 
func (lc *lifecycleManager) offloadLocal(ctx context.Context, sizeLimit uint64, retryDelay time.Duration, wg *sync.WaitGroup) { - toOffload, err := lc.registry.EvictLocalForOffload(sizeLimit) + toOffload, err := lc.registry.evictLocalForOffload(sizeLimit) if err != nil { logger.Fatal("error releasing old fractions:", zap.Error(err)) } @@ -108,7 +109,7 @@ func (lc *lifecycleManager) offloadLocal(ctx context.Context, sizeLimit uint64, remote := lc.offloadWithRetry(ctx, frac.Sealed, retryDelay) - lc.registry.PromoteToRemote(frac, remote) + lc.registry.promoteToRemote(frac, remote) if remote == nil { lc.infoCache.Remove(frac.Info().Name()) @@ -181,7 +182,7 @@ func (lc *lifecycleManager) tryOffload(ctx context.Context, sealed *frac.Sealed) // cleanRemote deletes outdated remote fractions based on retention policy. func (lc *lifecycleManager) cleanRemote(retention time.Duration, wg *sync.WaitGroup) { - toDelete := lc.registry.EvictRemote(retention) + toDelete := lc.registry.evictRemote(retention) wg.Add(len(toDelete)) for _, remote := range toDelete { go func() { @@ -194,10 +195,16 @@ func (lc *lifecycleManager) cleanRemote(retention time.Duration, wg *sync.WaitGr // cleanLocal deletes outdated local fractions when offloading is disabled. 
func (lc *lifecycleManager) cleanLocal(sizeLimit uint64, wg *sync.WaitGroup) { - toDelete, err := lc.registry.EvictLocalForDelete(sizeLimit) + toDelete, err := lc.registry.evictLocalForDelete(sizeLimit) if err != nil { logger.Fatal("error releasing old fractions:", zap.Error(err)) } + + fmt.Printf("len(toDelete): %v\n", len(toDelete)) + for _, f := range toDelete { + fmt.Printf("f.Info().Name(): %v\n", f.Info().Name()) + } + if len(toDelete) > 0 && !lc.flags.IsCapacityExceeded() { if err := lc.flags.setCapacityExceeded(true); err != nil { logger.Fatal("can't set capacity_exceeded flag", zap.Error(err)) @@ -217,14 +224,14 @@ func (lc *lifecycleManager) cleanLocal(sizeLimit uint64, wg *sync.WaitGroup) { // updateOldestMetric updates the prometheus metric with oldest fraction timestamp. func (lc *lifecycleManager) updateOldestMetric() { - oldestFracTime.WithLabelValues("remote").Set((time.Duration(lc.registry.OldestTotal()) * time.Millisecond).Seconds()) - oldestFracTime.WithLabelValues("local").Set((time.Duration(lc.registry.OldestLocal()) * time.Millisecond).Seconds()) + oldestFracTime.WithLabelValues("remote").Set((time.Duration(lc.registry.oldestTotal()) * time.Millisecond).Seconds()) + oldestFracTime.WithLabelValues("local").Set((time.Duration(lc.registry.oldestLocal()) * time.Millisecond).Seconds()) } // removeOverflowed removes fractions from offloading queue that exceed size limit // Stops ongoing offloading tasks and cleans up both local and remote resources. 
func (lc *lifecycleManager) removeOverflowed(sizeLimit uint64, wg *sync.WaitGroup) { - evicted := lc.registry.EvictOverflowed(sizeLimit) + evicted := lc.registry.evictOverflowed(sizeLimit) for _, sealed := range evicted { wg.Add(1) go func() { diff --git a/fracmanager/lifecycle_manager_test.go b/fracmanager/lifecycle_manager_test.go index cb9ab1e0..382a4ebf 100644 --- a/fracmanager/lifecycle_manager_test.go +++ b/fracmanager/lifecycle_manager_test.go @@ -38,7 +38,9 @@ func TestFracInfoCache(t *testing.T) { defer tearDown() fillRotateAndCheck := func(names map[string]struct{}) { - appender := lc.registry.Appender() + time.Sleep(time.Millisecond * 10) + + appender := lc.registry.appender() appendDocsToActive(t, appender.Active, 10+rand.Intn(10)) wg := sync.WaitGroup{} @@ -56,13 +58,13 @@ func TestFracInfoCache(t *testing.T) { for range 10 { fillRotateAndCheck(first) } - halfSize := lc.registry.Stats().TotalSizeOnDiskLocal() + halfSize := lc.registry.statistics().TotalSizeOnDiskLocal() second := map[string]struct{}{} for range 10 { fillRotateAndCheck(second) } - total := lc.registry.Stats().TotalSizeOnDiskLocal() + total := lc.registry.statistics().TotalSizeOnDiskLocal() wg := sync.WaitGroup{} lc.cleanLocal(total-halfSize, &wg) @@ -86,7 +88,7 @@ func TestCapacityExceeded(t *testing.T) { const fracsCount = 10 fillAndRotate := func() { - appender := lc.registry.Appender() + appender := lc.registry.appender() appendDocsToActive(t, appender.Active, 10+rand.Intn(10)) wg := sync.WaitGroup{} @@ -102,19 +104,19 @@ func TestCapacityExceeded(t *testing.T) { } assert.False(t, lc.flags.IsCapacityExceeded(), "there should be no deletions and the flag is false") - total := lc.registry.Stats().TotalSizeOnDiskLocal() + total := lc.registry.statistics().TotalSizeOnDiskLocal() wg := sync.WaitGroup{} lc.cleanLocal(total, &wg) wg.Wait() - assert.Equal(t, fracsCount, lc.registry.Stats().sealed.count, "as much as was added, so much should be") + assert.Equal(t, fracsCount, 
lc.registry.statistics().sealed.count, "as much as was added, so much should be") assert.False(t, lc.flags.IsCapacityExceeded(), "there should still be no deletions, and the flag is false") lc.cleanLocal(total-1, &wg) wg.Wait() - assert.Equal(t, fracsCount-1, lc.registry.Stats().sealed.count, "expect one less") + assert.Equal(t, fracsCount-1, lc.registry.statistics().sealed.count, "expect one less") assert.True(t, lc.flags.IsCapacityExceeded(), "the flag must be true now") } @@ -124,30 +126,30 @@ func TestOldestMetrics(t *testing.T) { const fracsCount = 10 fillAndRotate := func() { - appender := lc.registry.Appender() + appender := lc.registry.appender() appendDocsToActive(t, appender.Active, 10+rand.Intn(10)) wg := sync.WaitGroup{} lc.rotate(0, &wg) wg.Wait() } - firstFracTime := lc.registry.Appender().Info().CreationTime + firstFracTime := lc.registry.appender().Info().CreationTime for range fracsCount { fillAndRotate() } // Check state after initial rotations - assert.Equal(t, firstFracTime, lc.registry.OldestTotal(), "should point to the very first fraction when all data is local") - assert.Equal(t, firstFracTime, lc.registry.OldestLocal(), "should point to the first fraction when nothing is offloaded") + assert.Equal(t, firstFracTime, lc.registry.oldestTotal(), "should point to the very first fraction when all data is local") + assert.Equal(t, firstFracTime, lc.registry.oldestLocal(), "should point to the first fraction when nothing is offloaded") - halfSize := lc.registry.Stats().TotalSizeOnDiskLocal() + halfSize := lc.registry.statistics().TotalSizeOnDiskLocal() - halfwayFracTime := lc.registry.Appender().Info().CreationTime + halfwayFracTime := lc.registry.appender().Info().CreationTime for range fracsCount { fillAndRotate() } - total := lc.registry.Stats().TotalSizeOnDiskLocal() + total := lc.registry.statistics().TotalSizeOnDiskLocal() wg := sync.WaitGroup{} lc.offloadLocal(t.Context(), total-halfSize, 0, &wg) @@ -155,8 +157,8 @@ func TestOldestMetrics(t 
*testing.T) { // Check state after offloading assert.NotEqual(t, firstFracTime, halfwayFracTime, "expect different creation times") - assert.Equal(t, firstFracTime, lc.registry.OldestTotal(), "should still reference the first fraction after offload") - assert.Equal(t, halfwayFracTime, lc.registry.OldestLocal(), "should point to the oldest remaining local fraction after offload") + assert.Equal(t, firstFracTime, lc.registry.oldestTotal(), "should still reference the first fraction after offload") + assert.Equal(t, halfwayFracTime, lc.registry.oldestLocal(), "should point to the oldest remaining local fraction after offload") } func TestPendingDestroy(t *testing.T) { @@ -170,19 +172,19 @@ func TestPendingDestroy(t *testing.T) { // appending docs to `fracsCount` fractions where the last is active and the rest are sealed wg := sync.WaitGroup{} for range fracsCount - 1 { - appendDocsToActive(t, lc.registry.Appender().Active, docsPerFrac) + appendDocsToActive(t, lc.registry.appender().Active, docsPerFrac) lc.rotate(0, &wg) } - appendDocsToActive(t, lc.registry.Appender().Active, docsPerFrac) + appendDocsToActive(t, lc.registry.appender().Active, docsPerFrac) // wait sealing complete wg.Wait() // take all fracs to search - fractions1, release1 := lc.registry.AcquireAllFractions() + fractions1, release1 := lc.registry.acquireAllFractions() // delete all sealing fracs - lc.cleanLocal(lc.registry.Appender().Info().FullSize(), &wg) + lc.cleanLocal(lc.registry.appender().Info().FullSize(), &wg) var ( beforeRelease time.Time @@ -220,7 +222,7 @@ func TestPendingDestroy(t *testing.T) { cleanup.Wait() assert.Less(t, beforeRelease, afterCleanup, "we expect cleanup to happen after release") - fractions2, release2 := lc.registry.AcquireAllFractions() + fractions2, release2 := lc.registry.acquireAllFractions() assert.Len(t, fractions2, 1, "only one active fraction should remain") singleName := fractions2[0].Info().Name() diff --git a/fracmanager/sync_appender.go 
b/fracmanager/sync_appender.go index 76cf4ee0..1acb15a3 100644 --- a/fracmanager/sync_appender.go +++ b/fracmanager/sync_appender.go @@ -26,8 +26,8 @@ type syncAppender struct { suspended bool // Temporarily suspended for writes } -// Append adds documents to the active fraction -func (a *syncAppender) Append(docs, meta []byte) error { +// append adds documents to the active fraction +func (a *syncAppender) append(docs, meta []byte) error { a.mu.RLock() if a.finalized { a.mu.RUnlock() @@ -43,22 +43,22 @@ func (a *syncAppender) Append(docs, meta []byte) error { return a.refCountedActive.Append(docs, meta, &a.wg) } -func (a *syncAppender) Suspended() bool { +func (a *syncAppender) isSuspended() bool { a.mu.Lock() defer a.mu.Unlock() return a.suspended } -func (a *syncAppender) Suspend(value bool) { +func (a *syncAppender) suspend(value bool) { a.mu.Lock() a.suspended = value a.mu.Unlock() } -// WaitWriteIdle waits for all pending write operations to complete +// waitWriteIdle waits for all pending write operations to complete // Used before sealing to ensure data consistency. -func (a *syncAppender) WaitWriteIdle() { +func (a *syncAppender) waitWriteIdle() { start := time.Now() logger.Info("waiting fraction to stop write...", zap.String("name", a.BaseFileName)) a.wg.Wait() @@ -70,8 +70,8 @@ func (a *syncAppender) WaitWriteIdle() { ) } -// Finalize marks the fraction as read-only and prevents new writes from starting after finalize. -func (a *syncAppender) Finalize() error { +// finalize marks the fraction as read-only and prevents new writes from starting after finalize. 
+func (a *syncAppender) finalize() error { a.mu.Lock() if a.finalized { a.mu.Unlock() From dcdeec2b2c435128d0fda2d3dac8bf8a2e203432 Mon Sep 17 00:00:00 2001 From: Daniil Porokhnin Date: Fri, 22 May 2026 14:21:25 +0300 Subject: [PATCH 2/3] chore: remove garbage code --- fracmanager/lifecycle_manager.go | 6 ------ fracmanager/lifecycle_manager_test.go | 2 -- 2 files changed, 8 deletions(-) diff --git a/fracmanager/lifecycle_manager.go b/fracmanager/lifecycle_manager.go index 327d475f..e98c5871 100644 --- a/fracmanager/lifecycle_manager.go +++ b/fracmanager/lifecycle_manager.go @@ -2,7 +2,6 @@ package fracmanager import ( "context" - "fmt" "sync" "time" @@ -200,11 +199,6 @@ func (lc *lifecycleManager) cleanLocal(sizeLimit uint64, wg *sync.WaitGroup) { logger.Fatal("error releasing old fractions:", zap.Error(err)) } - fmt.Printf("len(toDelete): %v\n", len(toDelete)) - for _, f := range toDelete { - fmt.Printf("f.Info().Name(): %v\n", f.Info().Name()) - } - if len(toDelete) > 0 && !lc.flags.IsCapacityExceeded() { if err := lc.flags.setCapacityExceeded(true); err != nil { logger.Fatal("can't set capacity_exceeded flag", zap.Error(err)) diff --git a/fracmanager/lifecycle_manager_test.go b/fracmanager/lifecycle_manager_test.go index 382a4ebf..bebc2c1f 100644 --- a/fracmanager/lifecycle_manager_test.go +++ b/fracmanager/lifecycle_manager_test.go @@ -38,8 +38,6 @@ func TestFracInfoCache(t *testing.T) { defer tearDown() fillRotateAndCheck := func(names map[string]struct{}) { - time.Sleep(time.Millisecond * 10) - appender := lc.registry.appender() appendDocsToActive(t, appender.Active, 10+rand.Intn(10)) From 96ba81fd7d2a5a67981d648b430c0f76c67e01b8 Mon Sep 17 00:00:00 2001 From: Daniil Porokhnin Date: Fri, 22 May 2026 14:25:22 +0300 Subject: [PATCH 3/3] chore: remove evict local fix --- fracmanager/fraction_registry.go | 4 ---- 1 file changed, 4 deletions(-) diff --git a/fracmanager/fraction_registry.go b/fracmanager/fraction_registry.go index 0c0872c6..c0112383 100644 --- 
a/fracmanager/fraction_registry.go +++ b/fracmanager/fraction_registry.go @@ -280,10 +280,6 @@ func (r *fractionRegistry) evictLocal(sizeLimit uint64) ([]*refCountedSealed, er var evicted []*refCountedSealed for r.sealed.Len() > 0 && totalUsedSize-releasingSize > sizeLimit { for _, s := range r.sealed.GetByPartition(r.sealed.MinPartition()) { - if totalUsedSize-releasingSize <= sizeLimit { - break - } - info := s.Info() releasingSize += info.FullSize()