Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 3 additions & 17 deletions frac/fraction_concurrency_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,7 @@ import (
"github.com/ozontech/seq-db/cache"
"github.com/ozontech/seq-db/frac/common"
"github.com/ozontech/seq-db/frac/processor"
"github.com/ozontech/seq-db/frac/sealed/lids"
"github.com/ozontech/seq-db/frac/sealed/sealing"
"github.com/ozontech/seq-db/frac/sealed/seqids"
"github.com/ozontech/seq-db/frac/sealed/token"
"github.com/ozontech/seq-db/indexer"
"github.com/ozontech/seq-db/parser"
"github.com/ozontech/seq-db/seq"
Expand Down Expand Up @@ -353,28 +350,17 @@ func seal(active *Active) (*Sealed, error) {
if err != nil {
return nil, err
}
indexCache := &IndexCache{
MIDs: cache.NewCache[[]byte](nil, nil),
RIDs: cache.NewCache[seqids.BlockRIDs](nil, nil),
Params: cache.NewCache[seqids.BlockParams](nil, nil),
LIDs: cache.NewCache[*lids.Block](nil, nil),
Tokens: cache.NewCache[*token.Block](nil, nil),
TokenTable: cache.NewCache[token.Table](nil, nil),
InfoRegistry: cache.NewCache[[]byte](nil, nil),
TokenRegistry: cache.NewCache[[]byte](nil, nil),
OffsetsRegistry: cache.NewCache[[]byte](nil, nil),
IDRegistry: cache.NewCache[[]byte](nil, nil),
LIDRegistry: cache.NewCache[[]byte](nil, nil),
}

sealed := NewSealedPreloaded(
active.BaseFileName,
preloaded,
storage.NewReadLimiter(1, nil),
indexCache,
newIndexCache(),
cache.NewCache[[]byte](nil, nil),
&Config{},
testSkipMaskProvider{},
)

active.Release()
return sealed, nil
}
51 changes: 3 additions & 48 deletions frac/fraction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,7 @@ import (
"github.com/ozontech/seq-db/cache"
"github.com/ozontech/seq-db/frac/common"
"github.com/ozontech/seq-db/frac/processor"
"github.com/ozontech/seq-db/frac/sealed/lids"
"github.com/ozontech/seq-db/frac/sealed/sealing"
"github.com/ozontech/seq-db/frac/sealed/seqids"
"github.com/ozontech/seq-db/frac/sealed/token"
"github.com/ozontech/seq-db/indexer"
"github.com/ozontech/seq-db/node"
"github.com/ozontech/seq-db/parser"
Expand Down Expand Up @@ -2093,25 +2090,11 @@ func (s *FractionTestSuite) newSealed(bulks ...[]string) *Sealed {
preloaded, err := sealing.Seal(activeSealingSource, s.sealParams)
s.Require().NoError(err, "Sealing failed")

indexCache := &IndexCache{
MIDs: cache.NewCache[[]byte](nil, nil),
RIDs: cache.NewCache[seqids.BlockRIDs](nil, nil),
Params: cache.NewCache[seqids.BlockParams](nil, nil),
LIDs: cache.NewCache[*lids.Block](nil, nil),
Tokens: cache.NewCache[*token.Block](nil, nil),
TokenTable: cache.NewCache[token.Table](nil, nil),
InfoRegistry: cache.NewCache[[]byte](nil, nil),
TokenRegistry: cache.NewCache[[]byte](nil, nil),
OffsetsRegistry: cache.NewCache[[]byte](nil, nil),
IDRegistry: cache.NewCache[[]byte](nil, nil),
LIDRegistry: cache.NewCache[[]byte](nil, nil),
}

sealed := NewSealedPreloaded(
active.BaseFileName,
preloaded,
storage.NewReadLimiter(1, nil),
indexCache,
newIndexCache(),
cache.NewCache[[]byte](nil, nil),
s.config,
testSkipMaskProvider{},
Expand Down Expand Up @@ -2289,24 +2272,10 @@ func (s *SealedLoadedFractionTestSuite) newSealedLoaded(bulks ...[]string) *Seal
sealed := s.newSealed(bulks...)
sealed.Release()

indexCache := &IndexCache{
MIDs: cache.NewCache[[]byte](nil, nil),
RIDs: cache.NewCache[seqids.BlockRIDs](nil, nil),
Params: cache.NewCache[seqids.BlockParams](nil, nil),
LIDs: cache.NewCache[*lids.Block](nil, nil),
Tokens: cache.NewCache[*token.Block](nil, nil),
TokenTable: cache.NewCache[token.Table](nil, nil),
InfoRegistry: cache.NewCache[[]byte](nil, nil),
TokenRegistry: cache.NewCache[[]byte](nil, nil),
OffsetsRegistry: cache.NewCache[[]byte](nil, nil),
IDRegistry: cache.NewCache[[]byte](nil, nil),
LIDRegistry: cache.NewCache[[]byte](nil, nil),
}

sealed = NewSealed(
sealed.BaseFileName,
storage.NewReadLimiter(1, nil),
indexCache,
newIndexCache(),
cache.NewCache[[]byte](nil, nil),
nil,
s.config,
Expand Down Expand Up @@ -2363,25 +2332,11 @@ func (s *RemoteFractionTestSuite) SetupTest() {
s.Require().NoError(err, "offload failed")
s.Require().True(offloaded, "didn't offload frac")

indexCache := &IndexCache{
MIDs: cache.NewCache[[]byte](nil, nil),
RIDs: cache.NewCache[seqids.BlockRIDs](nil, nil),
Params: cache.NewCache[seqids.BlockParams](nil, nil),
LIDs: cache.NewCache[*lids.Block](nil, nil),
Tokens: cache.NewCache[*token.Block](nil, nil),
TokenTable: cache.NewCache[token.Table](nil, nil),
InfoRegistry: cache.NewCache[[]byte](nil, nil),
TokenRegistry: cache.NewCache[[]byte](nil, nil),
OffsetsRegistry: cache.NewCache[[]byte](nil, nil),
IDRegistry: cache.NewCache[[]byte](nil, nil),
LIDRegistry: cache.NewCache[[]byte](nil, nil),
}

remoteFrac := NewRemote(
context.Background(),
sealed.BaseFileName,
storage.NewReadLimiter(1, nil),
indexCache,
newIndexCache(),
cache.NewCache[[]byte](nil, nil),
sealed.info,
s.config,
Expand Down
40 changes: 35 additions & 5 deletions frac/index_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,30 @@ import (
"github.com/ozontech/seq-db/frac/sealed/token"
)

// newIndexCache returns an IndexCache whose cache fields are all populated.
// Every field receives the result of its own cache.NewCache(nil, nil) call.
func newIndexCache() *IndexCache {
	// Most fields hold []byte values and are built identically.
	newBytes := func() *cache.Cache[[]byte] {
		return cache.NewCache[[]byte](nil, nil)
	}

	ic := &IndexCache{}

	// Registry cache for legacy fractions.
	ic.LegacyRegistry = newBytes()

	// Per-file registry caches.
	ic.InfoRegistry = newBytes()
	ic.TokenRegistry = newBytes()
	ic.OffsetsRegistry = newBytes()
	ic.IDRegistry = newBytes()
	ic.LIDRegistry = newBytes()

	// Block-level data caches.
	ic.MIDs = newBytes()
	ic.RIDs = cache.NewCache[seqids.BlockRIDs](nil, nil)
	ic.Params = cache.NewCache[seqids.BlockParams](nil, nil)

	ic.Tokens = cache.NewCache[*token.Block](nil, nil)
	ic.TokenTable = cache.NewCache[token.Table](nil, nil)
	ic.LIDs = cache.NewCache[*lids.Block](nil, nil)

	return ic
}

type IndexCache struct {
// Registry cache for legacy sealed fractions.
LegacyRegistry *cache.Cache[[]byte]

// Per-file registry caches (each IndexReader needs its own).
InfoRegistry *cache.Cache[[]byte]
TokenRegistry *cache.Cache[[]byte]
Expand All @@ -16,24 +39,31 @@ type IndexCache struct {
LIDRegistry *cache.Cache[[]byte]

// Block-level data caches shared across all readers.
MIDs *cache.Cache[[]byte]
RIDs *cache.Cache[seqids.BlockRIDs]
Params *cache.Cache[seqids.BlockParams]
MIDs *cache.Cache[[]byte]
RIDs *cache.Cache[seqids.BlockRIDs]
Params *cache.Cache[seqids.BlockParams]

Tokens *cache.Cache[*token.Block]
TokenTable *cache.Cache[token.Table]
LIDs *cache.Cache[*lids.Block]

LIDs *cache.Cache[*lids.Block]
}

// Release calls Release on every cache held by s, exactly once per cache.
// The calls are grouped the same way as the fields: the legacy registry,
// the per-file registries, and then the block-level data caches.
func (s *IndexCache) Release() {
	s.LegacyRegistry.Release()

	s.InfoRegistry.Release()
	s.TokenRegistry.Release()
	s.OffsetsRegistry.Release()
	s.IDRegistry.Release()
	s.LIDRegistry.Release()

	s.MIDs.Release()
	s.RIDs.Release()
	s.Params.Release()

	s.Tokens.Release()
	s.TokenTable.Release()

	// LIDs is released only here; the earlier duplicate call was removed.
	s.LIDs.Release()
}
69 changes: 42 additions & 27 deletions frac/remote.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,8 @@ type Remote struct {

indexCache *IndexCache

loadMu *sync.RWMutex
isLoaded bool
initMu *sync.RWMutex
isInited bool
blocksData sealed.BlocksData

s3cli *s3.Client
Expand All @@ -88,7 +88,7 @@ func NewRemote(
f := &Remote{
ctx: ctx,

loadMu: &sync.RWMutex{},
initMu: &sync.RWMutex{},

readLimiter: readLimiter,
docsCache: docsCache,
Expand Down Expand Up @@ -116,15 +116,14 @@ func NewRemote(
// I wrote a small proposal on how we can reduce impact of such events.
// https://github.com/ozontech/seq-db/issues/92

if err := f.openInfo(); err != nil {
if err := f.loadInfo(); err != nil {
logger.Error(
"cannot open info file: any subsequent operation will fail",
zap.String("fraction", filepath.Base(f.BaseFileName)),
zap.Error(err),
)
}

f.info = loadInfo(f.infoReader)
return f
}

Expand Down Expand Up @@ -163,7 +162,7 @@ func (f *Remote) FindLIDs(ctx context.Context, ids []seq.ID) ([]seq.LID, error)
}

func (f *Remote) createDataProvider(ctx context.Context) (*sealedDataProvider, error) {
if err := f.load(); err != nil {
if err := f.init(); err != nil {
logger.Error(
"will create empty data provider: cannot load remote fraction",
zap.String("fraction", f.Info().Name()),
Expand Down Expand Up @@ -217,6 +216,8 @@ func (f *Remote) IsIntersecting(from, to seq.MID) bool {
}

func (f *Remote) Suicide() {
// FIXME(dkharms): We need to rename `.remote` file to `._remote` to commit deletion intent.
// Now, we might have fraction leaks in S3 storage since [Suicide] is not atomic.
util.MustRemoveFileByPath(f.BaseFileName + consts.RemoteFractionSuffix)

f.docsCache.Release()
Expand Down Expand Up @@ -251,14 +252,28 @@ func (f *Remote) String() string {
return fracToString(f, "remote")
}

func (f *Remote) load() error {
f.loadMu.Lock()
defer f.loadMu.Unlock()
func (f *Remote) loadInfo() error {
if f.IsLegacy {
if err := f.openInfoLegacy(); err != nil {
return err
}
f.info = loadInfo(f.legacyReader)

if f.isLoaded {
return nil
}

if err := f.openInfo(); err != nil {
return err
}
f.info = loadInfo(f.infoReader)

return nil
}

func (f *Remote) init() error {
f.initMu.Lock()
defer f.initMu.Unlock()

if err := f.openDocs(); err != nil {
return err
}
Expand All @@ -267,9 +282,13 @@ func (f *Remote) load() error {
return err
}

if f.isInited {
return nil
}

if f.IsLegacy {
(&LegacyLoader{}).Load(&f.blocksData, f.info, f.legacyReader)
f.isLoaded = true
f.isInited = true
return nil
}

Expand All @@ -281,29 +300,25 @@ func (f *Remote) load() error {
LID: f.lidReader,
})

f.isLoaded = true
f.isInited = true
return nil
}

func (f *Remote) openInfo() error {
if f.IsLegacy {
if f.legacyFile != nil {
return nil
}

indexName := filepath.Base(f.BaseFileName) + consts.IndexFileSuffix
f.legacyFile = s3.NewReader(f.ctx, f.s3cli, indexName)
// openInfoLegacy prepares f.legacyFile and f.legacyReader for a legacy
// fraction. It returns nil immediately when f.legacyFile is already set.
//
// Otherwise it delegates to openRemoteFile with consts.IndexFileSuffix and
// returns its result. The callback stores the supplied file and builds an
// index reader over it, backed by the dedicated LegacyRegistry cache.
func (f *Remote) openInfoLegacy() error {
	if f.legacyFile != nil {
		return nil
	}

	// NOTE(review): presumably openRemoteFile invokes the callback only after
	// the remote file was opened successfully — confirm against its definition.
	return f.openRemoteFile(consts.IndexFileSuffix, func(file storage.ImmutableFile) {
		f.legacyFile = file
		f.legacyReader = storage.NewIndexReader(
			f.readLimiter, file.Name(),
			file, f.indexCache.LegacyRegistry,
		)
	})
}

// infoReader is used by [loadInfo]
f.infoReader = f.legacyReader
return nil
}

func (f *Remote) openInfo() error {
if f.infoFile != nil {
return nil
}
Expand Down
Loading
Loading