Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion config/frac_version.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,10 @@ const (
BinaryDataV1
// BinaryDataV2 - MIDs stored in nanoseconds
BinaryDataV2
// BinaryDataV3 - bitpack for LIDs/MIDs
BinaryDataV3
// BinaryDataV4 - LID blocks have firstLID/lastLID encoded in ext1, isContinued is not used, no legacy TID adjusting
BinaryDataV4
)

const CurrentFracVersion = BinaryDataV2
const CurrentFracVersion = BinaryDataV4
50 changes: 49 additions & 1 deletion frac/fraction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ func (s *FractionTestSuite) SetupTestCommon() {
DocsPositionsZstdLevel: 1,
TokenTableZstdLevel: 1,
DocBlocksZstdLevel: 1,
LIDBlockSize: 512,
LIDBlockSize: 256,
DocBlockSize: 128 * int(units.KiB),
}

Expand Down Expand Up @@ -1326,6 +1326,43 @@ func (s *FractionTestSuite) TestSearchLargeFrac() {
fromTime: fromTime,
toTime: toTime,
},
// block skipping scenarios
{
name: "service:gateway AND trace_id:trace-2026",
query: "service:gateway AND trace_id:trace-2026",
filter: func(doc *testDoc) bool {
return doc.service == gateway && doc.traceId == "trace-2026"
},
fromTime: fromTime,
toTime: toTime,
},
{
name: "service:gateway AND (trace_id:trace-0 OR trace_id:trace-2500 OR trace_id:trace-4999)",
query: "service:gateway AND (trace_id:trace-0 OR trace_id:trace-2500 OR trace_id:trace-4999)",
filter: func(doc *testDoc) bool {
return doc.service == gateway && (doc.traceId == "trace-0" || doc.traceId == "trace-2500" || doc.traceId == "trace-4999")
},
fromTime: fromTime,
toTime: toTime,
},
{
name: "service:gateway AND pod:pod-5",
query: "service:gateway AND pod:pod-5",
filter: func(doc *testDoc) bool {
return doc.service == gateway && doc.pod == "pod-5"
},
fromTime: fromTime,
toTime: toTime,
},
{
name: "service:gateway AND pod:pod-5 AND message:failed",
query: "service:gateway AND pod:pod-5 AND message:failed",
filter: func(doc *testDoc) bool {
return doc.service == gateway && doc.pod == "pod-5" && strings.Contains(doc.message, "failed")
},
fromTime: fromTime,
toTime: toTime,
},
{
name: "service:gateway AND message:processing AND message:retry AND level:5",
query: "service:gateway AND message:processing AND message:retry AND level:5",
Expand All @@ -1337,6 +1374,17 @@ func (s *FractionTestSuite) TestSearchLargeFrac() {
toTime: toTime,
},
// OR operator queries
{
name: "(service OR) AND (trace_id OR)",
query: "(service:bus OR service:kafka) AND (trace_id:trace-1000 OR trace_id:trace-1500 OR trace_id:trace-2000)",
filter: func(doc *testDoc) bool {
return (doc.service == bus || doc.service == kafka) && (doc.traceId == "trace-1000" ||
doc.traceId == "trace-1500" ||
doc.traceId == "trace-2000")
},
fromTime: fromTime,
toTime: toTime,
},
{
name: "trace_id OR",
query: "trace_id:trace-1000 OR trace_id:trace-1500 OR trace_id:trace-2000 OR trace_id:trace-2500 OR trace_id:trace-3000",
Expand Down
3 changes: 2 additions & 1 deletion frac/sealed/lids/iterator_asc.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,13 +81,14 @@ func (it *IteratorAsc) NextGeq(nextID node.LID) node.LID {
return node.NullLID()
}

it.blockIndex = it.table.SeekBlockLeq(it.blockIndex, it.tid, nextID.Unpack())

it.loadNextLIDsBlock()
it.lids, it.tryNextBlock = it.narrowLIDsRange(it.lids, it.tryNextBlock)
it.counter.AddLIDsCount(len(it.lids))
}

// fast path: smallest remaining > nextID => skip entire block
// TODO(cheb0): We could also pass LID into narrowLIDsRange to perform block skipping once we add something like MinLID to LID block header
if it.lids[0] > nextID.Unpack() {
it.lids = it.lids[:0]
continue
Expand Down
3 changes: 2 additions & 1 deletion frac/sealed/lids/iterator_desc.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,13 +80,14 @@ func (it *IteratorDesc) NextGeq(nextID node.LID) node.LID {
return node.NullLID()
}

it.blockIndex = it.table.SeekBlockGeq(it.blockIndex, it.tid, nextID.Unpack())

it.loadNextLIDsBlock() // last chunk in block but not last for tid; need load next block
it.lids, it.tryNextBlock = it.narrowLIDsRange(it.lids, it.tryNextBlock)
it.counter.AddLIDsCount(len(it.lids)) // inc loaded LIDs count
}

// fast path: last LID < nextID => skip the entire block
// TODO(cheb0): We could also pass LID into narrowLIDsRange to perform block skipping once we add something like MinLID to LID block header
if nextID.Unpack() > it.lids[len(it.lids)-1] {
it.lids = it.lids[:0]
continue
Expand Down
64 changes: 58 additions & 6 deletions frac/sealed/lids/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package lids
import (
"sort"

"github.com/ozontech/seq-db/config"
"go.uber.org/zap"

"github.com/ozontech/seq-db/logger"
Expand All @@ -12,24 +13,35 @@ type Table struct {
StartBlockIndex uint32
MaxTIDs []uint32 // defines last tid for each block
MinTIDs []uint32 // defines first not continued tid for each block
FirstLIDs []uint32
LastLIDs []uint32

// TODO: We need fix MinTID issue that we have to compensate with DiskBlock.getAdjustedMinTID()
// TODO: After that we do not need store IsContinued flag, and able calc it as MaxTIDs[i] == MinTIDs[i+1]
IsContinued []bool
FracVer config.BinaryDataVersion
IsContinued []bool // legacy field, only used in BinaryDataV0-BinaryDataV3 (inclusive)
}

func NewTable(startOfLIDsBlockIndex uint32, minTIDs, maxTIDs []uint32, isContinued []bool) *Table {
func NewTable(
fracVer config.BinaryDataVersion,
startOfLIDsBlockIndex uint32,
minTIDs, maxTIDs []uint32,
firstLIDs, lastLIDs []uint32,
isContinued []bool) *Table {
return &Table{
StartBlockIndex: startOfLIDsBlockIndex,
MinTIDs: minTIDs,
MaxTIDs: maxTIDs,
FirstLIDs: firstLIDs,
LastLIDs: lastLIDs,
IsContinued: isContinued,
FracVer: fracVer,
}
}

func (t *Table) GetAdjustedMinTID(blockIndex uint32) uint32 {
if t.IsContinued[blockIndex] {
return t.MinTIDs[blockIndex] - 1
if t.FracVer < config.BinaryDataV4 {
if t.IsContinued[blockIndex] {
return t.MinTIDs[blockIndex] - 1
}
}
return t.MinTIDs[blockIndex]
}
Expand Down Expand Up @@ -75,6 +87,46 @@ func (t *Table) GetLastBlockIndexForTID(tid uint32) uint32 {
return uint32(index)
}

// SeekBlockGeq finds next block for provided TID which contains
// lid greater or equal to provided LID starting from provided index (inclusive).
// - index: an index of block which is already suits and contains next portion of LIDs. Safe to return for old fractions.
func (t *Table) SeekBlockGeq(index uint32, tid uint32, nextLID uint32) uint32 {
if t.FracVer < config.BinaryDataV4 {
// not supported for old frac versions
return index
}

res := index
for i := index + 1; i < uint32(len(t.MinTIDs)); i++ {
if t.MinTIDs[i] == tid && nextLID >= t.FirstLIDs[i] {
res = i
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: here i is int, but in SeekBlockLeq() it is uint32. let's make it consistent, for example:

for i := int(index) + 1; i <len(t.MinTIDs); i++ {
	if t.MinTIDs[i] == tid && nextLID >= t.FirstLIDs[i] {
		res = uint32(i)
		continue
	}
	break
}

continue
}
break
}
return res
}

// SeekBlockLeq finds next block with lowest index for provided TID which contains LIDs
// less or equal to provided LID starting from provided index (inclusive).
// - index: an index of block which is already suits and contains next portion of LIDs. Safe to return for old fractions.
func (t *Table) SeekBlockLeq(index uint32, tid uint32, nextLID uint32) uint32 {
if t.FracVer < config.BinaryDataV4 {
// not supported for old frac versions
return index
}

res := index
for i := int(index) - 1; i >= 0; i-- {
if t.MaxTIDs[i] == tid && nextLID <= t.LastLIDs[i] {
res = uint32(i)
continue
}
break
}
return res
}

func (t *Table) HasTIDInPrevBlock(blockIndex, tid uint32) bool {
if blockIndex == 0 { // it is no prev block
return false
Expand Down
12 changes: 6 additions & 6 deletions frac/sealed/sealing/blocks_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,10 @@ type tokensSealBlock struct {

// lidsExt represents the range and continuation status of LID blocks.
type lidsExt struct {
minTID uint32 // First token ID in the LID block
maxTID uint32 // Last token ID in the LID block
isContinued bool // Whether LID sequence continues in next block
minTID uint32 // First token ID in the LID block
maxTID uint32 // Last token ID in the LID block
firstLID uint32 // First LID in the LID block
lastLID uint32 // Last LID in the LID block
}

// lidsSealBlock represents a sealed block containing LID (Local ID) data.
Expand Down Expand Up @@ -169,7 +170,6 @@ func (bb *blocksBuilder) BuildLIDsBlocks(tokenLIDs iter.Seq[[]uint32], blockCapa
currentTID uint32 // Current TID being processed
currentBlock lidsSealBlock // Current block under construction
isEndOfToken bool // Flag for end of current token's LIDs
isContinued bool // Flag for block continuation
)

// Initialize first block
Expand All @@ -186,8 +186,8 @@ func (bb *blocksBuilder) BuildLIDsBlocks(tokenLIDs iter.Seq[[]uint32], blockCapa
currentBlock.payload.Offsets = append(currentBlock.payload.Offsets, uint32(len(currentBlock.payload.LIDs)))
}
currentBlock.payload.IsLastLID = isEndOfToken // TODO(eguguchkin): Remove legacy field
currentBlock.ext.isContinued = isContinued // TODO(eguguchkin): Remove legacy field
isContinued = !isEndOfToken
currentBlock.ext.firstLID = currentBlock.payload.LIDs[0]
currentBlock.ext.lastLID = currentBlock.payload.LIDs[len(currentBlock.payload.LIDs)-1]
return yield(currentBlock)
}

Expand Down
47 changes: 26 additions & 21 deletions frac/sealed/sealing/blocks_builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -319,9 +319,10 @@ func TestBlocksBuilder_BuildLIDsBlocks(t *testing.T) {

expected := []lidsSealBlock{{
ext: lidsExt{
minTID: 1,
maxTID: 1,
isContinued: false,
minTID: 1,
maxTID: 1,
firstLID: 10,
lastLID: 30,
},
payload: lids.Block{
LIDs: []uint32{10, 20, 30},
Expand All @@ -330,20 +331,21 @@ func TestBlocksBuilder_BuildLIDsBlocks(t *testing.T) {
},
}, {
ext: lidsExt{
minTID: 1,
maxTID: 2,
isContinued: true,
minTID: 1,
maxTID: 2,
firstLID: 40,
lastLID: 21,
},
payload: lids.Block{
LIDs: []uint32{40, 11, 21},
Offsets: []uint32{0, 1, 3},
IsLastLID: false,
LIDs: []uint32{40, 11, 21},
Offsets: []uint32{0, 1, 3},
},
}, {
ext: lidsExt{
minTID: 2,
maxTID: 3,
isContinued: true,
minTID: 2,
maxTID: 3,
firstLID: 31,
lastLID: 10,
},
payload: lids.Block{
LIDs: []uint32{31, 41, 10},
Expand All @@ -352,9 +354,10 @@ func TestBlocksBuilder_BuildLIDsBlocks(t *testing.T) {
},
}, {
ext: lidsExt{
minTID: 3,
maxTID: 3,
isContinued: true,
minTID: 3,
maxTID: 3,
firstLID: 11,
lastLID: 21,
},
payload: lids.Block{
LIDs: []uint32{11, 20, 21},
Expand All @@ -363,9 +366,10 @@ func TestBlocksBuilder_BuildLIDsBlocks(t *testing.T) {
},
}, {
ext: lidsExt{
minTID: 4,
maxTID: 4,
isContinued: false,
minTID: 4,
maxTID: 4,
firstLID: 30,
lastLID: 50,
},
payload: lids.Block{
LIDs: []uint32{30, 40, 50},
Expand All @@ -374,9 +378,10 @@ func TestBlocksBuilder_BuildLIDsBlocks(t *testing.T) {
},
}, {
ext: lidsExt{
minTID: 4,
maxTID: 4,
isContinued: true,
minTID: 4,
maxTID: 4,
firstLID: 60,
lastLID: 60,
},
payload: lids.Block{
LIDs: []uint32{60},
Expand Down
15 changes: 6 additions & 9 deletions frac/sealed/sealing/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"time"

"github.com/alecthomas/units"
"github.com/ozontech/seq-db/config"

"github.com/ozontech/seq-db/bytespool"
"github.com/ozontech/seq-db/consts"
Expand Down Expand Up @@ -414,22 +415,18 @@ func (s *IndexSealer) packPosBlock(block idsSealBlock) indexBlock {
// packLIDsBlock packs Local IDs (LIDs) into a compressed index block.
// Also updates LIDs table for preloaded data access.
func (s *IndexSealer) packLIDsBlock(block lidsSealBlock) indexBlock {
var ext1 uint64
if block.ext.isContinued { // todo: Legacy continuation flag
ext1 = 1
block.ext.minTID++ // Adjust for legacy format
}

// Update LIDs table for PreloadedData
s.lidsTable.MinTIDs = append(s.lidsTable.MinTIDs, block.ext.minTID)
s.lidsTable.MaxTIDs = append(s.lidsTable.MaxTIDs, block.ext.maxTID)
s.lidsTable.IsContinued = append(s.lidsTable.IsContinued, block.ext.isContinued)
s.lidsTable.FirstLIDs = append(s.lidsTable.FirstLIDs, block.ext.firstLID)
s.lidsTable.LastLIDs = append(s.lidsTable.LastLIDs, block.ext.lastLID)
s.lidsTable.FracVer = config.CurrentFracVersion

// Packing block
s.buf1 = block.payload.Pack(s.buf1[:0])
b := s.newIndexBlockZSTD(s.buf1, s.params.LIDsZstdLevel)
b.ext1 = ext1 // Legacy continuation flag
b.ext2 = uint64(block.ext.maxTID)<<32 | uint64(block.ext.minTID) // TID range
b.ext1 = uint64(block.ext.lastLID)<<32 | uint64(block.ext.firstLID) // LID range
b.ext2 = uint64(block.ext.maxTID)<<32 | uint64(block.ext.minTID) // TID range
return b
}

Expand Down
Loading