Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion config/frac_version.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,16 @@ type BinaryDataVersion uint16
// Known BinaryDataVersion values. They are numbered sequentially by iota
// starting at zero, so inserting a constant anywhere but the end would
// renumber every constant that follows it.
const (
// BinaryDataV0 - initial version
BinaryDataV0 BinaryDataVersion = iota

// BinaryDataV1 - support RIDs encoded without varint
BinaryDataV1

// BinaryDataV2 - MIDs stored in nanoseconds
BinaryDataV2

// BinaryDataV3 - `.index` file is split across several files
// storing specific sections: `.info`, `.offsets`, `.tokens`, `.ids`, `.lids`
BinaryDataV3
)

const CurrentFracVersion = BinaryDataV2
const CurrentFracVersion = BinaryDataV3
16 changes: 8 additions & 8 deletions frac/active_sealing_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ type ActiveSealingSource struct {
rids *UInt64s // RIDs

fields []string // Sorted field names
fieldTids [][]uint32 // Each field contains sorted TIDs based on token value
fieldTIDs [][]uint32 // Each field contains sorted TIDs based on token value

tokens [][]byte // Tokens (values) by TID
lids []*TokenLIDs // LID lists for each token
Expand All @@ -59,7 +59,7 @@ func NewActiveSealingSource(active *Active, params common.SealParams) (*ActiveSe
info := *active.info // copy

sortedLIDs := active.GetAllDocuments()
fields, fieldTids := sortFields(active.TokenList)
fields, fieldTIDs := sortFields(active.TokenList)

src := ActiveSealingSource{
params: params,
Expand All @@ -74,7 +74,7 @@ func NewActiveSealingSource(active *Active, params common.SealParams) (*ActiveSe
rids: active.RIDs,

fields: fields,
fieldTids: fieldTids,
fieldTIDs: fieldTIDs,
tokens: active.TokenList.tidToVal,
lids: active.TokenList.tidToLIDs,

Expand All @@ -100,7 +100,7 @@ func sortFields(tl *TokenList) ([]string, [][]uint32) {
fields := slices.Collect(maps.Keys(tl.FieldTIDs))
slices.Sort(fields)

fieldTids := make([][]uint32, len(tl.FieldTIDs))
fieldTIDs := make([][]uint32, len(tl.FieldTIDs))
for i, field := range fields {
// Make a copy because this memory is shared
// with concurrent readers (user search queries).
Expand All @@ -110,10 +110,10 @@ func sortFields(tl *TokenList) ([]string, [][]uint32) {
return bytes.Compare(tl.tidToVal[i], tl.tidToVal[j])
})

fieldTids[i] = cp
fieldTIDs[i] = cp
}

return fields, fieldTids
return fields, fieldTIDs
}

func (src *ActiveSealingSource) ID() iter.Seq2[DocLocation, error] {
Expand All @@ -122,7 +122,7 @@ func (src *ActiveSealingSource) ID() iter.Seq2[DocLocation, error] {
rids := src.rids.vals

// System ID and DocPos are not stored in `src.sortedLIDs`.
// However we do have to yield them to preserve 1-baseed indexing for ids.
// However we do have to yield them to preserve 1-based indexing for ids.
dloc := DocLocation{First: seq.SystemID, Second: seq.SystemDocPos}
if !yield(dloc, nil) {
return
Expand Down Expand Up @@ -193,7 +193,7 @@ func (src *ActiveSealingSource) TokenTriplet() iter.Seq2[string, iter.Seq2[Token
func (src *ActiveSealingSource) postingsForField(field string, idx int) iter.Seq2[TokenPosting, error] {
var lidsbuf []uint32
return func(yield func(TokenPosting, error) bool) {
for _, tid := range src.fieldTids[idx] {
for _, tid := range src.fieldTIDs[idx] {
token := src.tokens[tid]

lids := src.lids[tid].SortedLIDsUnsafe()
Expand Down
4 changes: 2 additions & 2 deletions frac/fraction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1837,11 +1837,11 @@ func (s *FractionTestSuite) TestFractionInfo() {
s.Require().Equal(uint64(0), info.IndexOnDisk, "index on disk doesn't match")
case *Sealed:
s.Require().Equal(uint64(0), info.MetaOnDisk, "meta on disk doesn't match. actual value")
s.Require().True(info.IndexOnDisk > uint64(1400) && info.IndexOnDisk < uint64(1600),
s.Require().True(info.IndexOnDisk > uint64(1400) && info.IndexOnDisk < uint64(1700),
"index on disk doesn't match. actual value: %d", info.IndexOnDisk)
case *Remote:
s.Require().Equal(uint64(0), info.MetaOnDisk, "meta on disk doesn't match. actual value")
s.Require().True(info.IndexOnDisk > uint64(1400) && info.IndexOnDisk < uint64(1600),
s.Require().True(info.IndexOnDisk > uint64(1400) && info.IndexOnDisk < uint64(1700),
"index on disk doesn't match. actual value: %d", info.IndexOnDisk)
default:
s.Require().Fail("unsupported fraction type")
Expand Down
27 changes: 17 additions & 10 deletions frac/sealed/sealing/blocks_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,13 +105,13 @@ func (bb *blocksBuilder) BuildTokenBlocks(
}

block.ext.minTID = 1
for field, tokIt := range it {
for field, tokenIterator := range it {
emitFieldEntry()

fieldName = field
fieldEntryStartTID = currentTID + 1

for pair, err := range tokIt {
for pair, err := range tokenIterator {
if err != nil {
yield(TokenBlock{}, err)
return
Expand Down Expand Up @@ -202,8 +202,9 @@ func seqBlockID(ids iter.Seq2[DocLocation, error], blockCapacity int) iter.Seq2[
}
}

type lidBlocksAcc struct {
type lidAccumulator struct {
blockCapacity int
onBlock func(lidsSealBlock) error

currentTID uint32
currentBlock lidsSealBlock
Expand All @@ -212,8 +213,14 @@ type lidBlocksAcc struct {
isContinued bool
}

func newLIDBlocksAccumulator(blockCapacity int) *lidBlocksAcc {
a := &lidBlocksAcc{blockCapacity: blockCapacity}
func newLIDAccumulator(
blockCapacity int,
onBlock func(lidsSealBlock) error,
) *lidAccumulator {
a := &lidAccumulator{
blockCapacity: blockCapacity,
onBlock: onBlock,
}

a.currentBlock.ext.minTID = 1
a.currentBlock.payload = lids.Block{
Expand All @@ -229,12 +236,12 @@ func newLIDBlocksAccumulator(blockCapacity int) *lidBlocksAcc {
// For each block that fills up, `onBlock` is called immediately
// before the backing arrays are reset, so `onBlock` may read the
// block data but must not retain references to it.
func (a *lidBlocksAcc) Add(lidsbuf []uint32, onBlock func(lidsSealBlock) error) error {
func (a *lidAccumulator) Add(lidsbuf []uint32) error {
a.currentTID++

for _, lid := range lidsbuf {
if len(a.currentBlock.payload.LIDs) == a.blockCapacity {
if err := onBlock(a.finalizeBlock()); err != nil {
if err := a.onBlock(a.finalizeBlock()); err != nil {
return err
}

Expand All @@ -257,11 +264,11 @@ func (a *lidBlocksAcc) Add(lidsbuf []uint32, onBlock func(lidsSealBlock) error)
return nil
}

func (a *lidBlocksAcc) Flush() lidsSealBlock {
return a.finalizeBlock()
func (a *lidAccumulator) Finalize() error {
return a.onBlock(a.finalizeBlock())
}

func (a *lidBlocksAcc) finalizeBlock() lidsSealBlock {
func (a *lidAccumulator) finalizeBlock() lidsSealBlock {
if !a.isEndOfToken {
a.currentBlock.payload.Offsets = append(
a.currentBlock.payload.Offsets,
Expand Down
26 changes: 13 additions & 13 deletions frac/sealed/sealing/blocks_builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,18 +114,22 @@ func TestBlocksBuilder_BuildTokenBlocks(t *testing.T) {
const blockSize = 24
const lidBlockCap = 3

var bb blocksBuilder
lidAccum := newLIDBlocksAccumulator(lidBlockCap)
var lidBlocks []lidsSealBlock
lidAccumulator := newLIDAccumulator(
lidBlockCap,
func(block lidsSealBlock) error {
block.payload.LIDs = slices.Clone(block.payload.LIDs)
block.payload.Offsets = slices.Clone(block.payload.Offsets)
lidBlocks = append(lidBlocks, block)
return nil
},
)

var bb blocksBuilder
tokenBlocks := bb.BuildTokenBlocks(
src.TokenTriplet(),
func(lids []uint32) error {
return lidAccum.Add(lids, func(block lidsSealBlock) error {
block.payload.LIDs = slices.Clone(block.payload.LIDs)
block.payload.Offsets = slices.Clone(block.payload.Offsets)
lidBlocks = append(lidBlocks, block)
return nil
})
return lidAccumulator.Add(lids)
},
blockSize,
)
Expand Down Expand Up @@ -245,11 +249,7 @@ func TestBlocksBuilder_BuildTokenBlocks(t *testing.T) {
},
}
assert.Equal(t, actualTokenTable.FieldsTables, expectedTokenTable.FieldsTables)

finalBlock := lidAccum.Flush()
finalBlock.payload.LIDs = slices.Clone(finalBlock.payload.LIDs)
finalBlock.payload.Offsets = slices.Clone(finalBlock.payload.Offsets)
lidBlocks = append(lidBlocks, finalBlock)
assert.NoError(t, lidAccumulator.Finalize())

expectedLIDBlocks := []lidsSealBlock{
{
Expand Down
47 changes: 23 additions & 24 deletions frac/sealed/sealing/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,12 +71,12 @@ func (s *IndexSealer) WriteOffsetsFile(ws io.WriteSeeker, src Source) error {
Offsets: src.BlockOffsets(),
}

if err := w.writeBlock(btypeOffset, s.packBlocksOffsetsBlock(offsets)); err != nil {
if err := w.writeBlock(blockTypeOffset, s.packBlocksOffsetsBlock(offsets)); err != nil {
return err
}

// Emit trailing separator.
if err := w.writeBlock(btypeBlackhole, indexBlock{}); err != nil {
if err := w.writeEmptyBlock(); err != nil {
return err
}

Expand All @@ -95,21 +95,21 @@ func (s *IndexSealer) WriteIDFile(ws io.WriteSeeker, src Source) error {
return err
}

if err := w.writeBlock(btypeMid, s.packMIDsBlock(block)); err != nil {
if err := w.writeBlock(blockTypeMID, s.packMIDsBlock(block)); err != nil {
return err
}

if err := w.writeBlock(btypeRid, s.packRIDsBlock(block)); err != nil {
if err := w.writeBlock(blockTypeRID, s.packRIDsBlock(block)); err != nil {
return err
}

if err := w.writeBlock(btypeDocPos, s.packPosBlock(block)); err != nil {
if err := w.writeBlock(blockTypeDocPos, s.packPosBlock(block)); err != nil {
return err
}
}

// Emit trailing separator.
if err := w.writeBlock(btypeBlackhole, indexBlock{}); err != nil {
if err := w.writeEmptyBlock(); err != nil {
return err
}

Expand All @@ -132,42 +132,41 @@ func (s *IndexSealer) WriteTokenTriplet(tws, lws io.WriteSeeker, src Source) err
var (
bb blocksBuilder
allFieldsTables []token.FieldTable
lidacc = newLIDBlocksAccumulator(consts.LIDBlockCap)
)

// NOTE(dkharms): This is so ugly but I cannot come up with other solution here.
accumulate := func(lids []uint32) error {
return lidacc.Add(lids, func(block lidsSealBlock) error {
return lw.writeBlock(btypeLid, s.packLIDsBlock(block))
})
}
lidAccumulator := newLIDAccumulator(
consts.LIDBlockCap,
func(block lidsSealBlock) error {
return lw.writeBlock(blockTypeLID, s.packLIDsBlock(block))
},
)

for pair, err := range bb.BuildTokenBlocks(src.TokenTriplet(), accumulate, consts.RegularBlockSize) {
for pair, err := range bb.BuildTokenBlocks(src.TokenTriplet(), lidAccumulator.Add, consts.RegularBlockSize) {
if err != nil {
return err
}

if err := tw.writeBlock(btypeToken, s.packTokenBlock(pair.First)); err != nil {
if err := tw.writeBlock(blockTypeToken, s.packTokenBlock(pair.First)); err != nil {
return err
}

allFieldsTables = append(allFieldsTables, pair.Second...)
}

if err := s.finalizeLIDFile(lw, lidacc); err != nil {
if err := s.finalizeLIDFile(lw, lidAccumulator); err != nil {
return err
}

return s.finalizeTokenFile(tw, allFieldsTables)
}

func (s *IndexSealer) finalizeLIDFile(w *writer, lidAccum *lidBlocksAcc) error {
if err := w.writeBlock(btypeLid, s.packLIDsBlock(lidAccum.Flush())); err != nil {
func (s *IndexSealer) finalizeLIDFile(w *writer, lidAccumulator *lidAccumulator) error {
if err := lidAccumulator.Finalize(); err != nil {
return err
}

// Emit trailing separator.
if err := w.writeBlock(btypeBlackhole, indexBlock{}); err != nil {
if err := w.writeEmptyBlock(); err != nil {
return err
}

Expand All @@ -176,17 +175,17 @@ func (s *IndexSealer) finalizeLIDFile(w *writer, lidAccum *lidBlocksAcc) error {

func (s *IndexSealer) finalizeTokenFile(w *writer, allFieldsTables []token.FieldTable) error {
// Emit section separator.
if err := w.writeBlock(btypeToken, indexBlock{}); err != nil {
if err := w.writeEmptyBlock(); err != nil {
return err
}

tokenTableBlock := token.TableBlock{FieldsTables: collapseOrderedFieldsTables(allFieldsTables)}
if err := w.writeBlock(btypeTokenTable, s.packTokenTableBlock(tokenTableBlock)); err != nil {
if err := w.writeBlock(blockTypeTokenTable, s.packTokenTableBlock(tokenTableBlock)); err != nil {
return err
}

// Emit trailing separator.
if err := w.writeBlock(btypeBlackhole, indexBlock{}); err != nil {
if err := w.writeEmptyBlock(); err != nil {
return err
}

Expand All @@ -201,12 +200,12 @@ func (s *IndexSealer) WriteInfoFile(ws io.WriteSeeker, src Source) error {
defer w.release()

block := sealed.BlockInfo{Info: src.Info()}
if err := w.writeBlock(btypeInfo, s.packInfoBlock(block)); err != nil {
if err := w.writeBlock(blockTypeInfo, s.packInfoBlock(block)); err != nil {
return err
}

// Emit trailing separator.
if err := w.writeBlock(btypeBlackhole, indexBlock{}); err != nil {
if err := w.writeEmptyBlock(); err != nil {
return err
}

Expand Down
Loading