Skip to content
Open
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion cmd/index_analyzer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ func analyzeIndex(
if err := b.Unpack(readBlock()); err != nil {
logger.Fatal("error unpacking block info", zap.Error(err))
}
ver := b.Info.BinaryDataVer

docsCount := int(b.Info.DocsTotal)

Expand Down Expand Up @@ -162,7 +163,7 @@ func analyzeIndex(
}

block := &lids.Block{}
if err := block.Unpack(data, &lids.UnpackBuffer{}); err != nil {
if err := block.Unpack(data, ver, &lids.UnpackBuffer{}); err != nil {
logger.Fatal("error unpacking lids block", zap.Error(err))
}

Expand Down
4 changes: 3 additions & 1 deletion config/frac_version.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ const (
BinaryDataV1
// BinaryDataV2 - MIDs stored in nanoseconds
BinaryDataV2
// BinaryDataV3 - delta bitpack encoded MIDs and LIDs
BinaryDataV3
)

const CurrentFracVersion = BinaryDataV2
const CurrentFracVersion = BinaryDataV3
4 changes: 2 additions & 2 deletions frac/fraction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1841,8 +1841,8 @@ func (s *FractionTestSuite) TestFractionInfo() {
"index on disk doesn't match. actual value: %d", info.IndexOnDisk)
case *Remote:
s.Require().Equal(uint64(0), info.MetaOnDisk, "meta on disk doesn't match. actual value")
s.Require().True(info.IndexOnDisk > uint64(1400) && info.IndexOnDisk < uint64(1500),
"index on disk doesn't match. actual value: %d", info.MetaOnDisk)
s.Require().True(info.IndexOnDisk > uint64(1400) && info.IndexOnDisk < uint64(1550),
"index on disk doesn't match. actual value: %d", info.IndexOnDisk)
default:
s.Require().Fail("unsupported fraction type")
}
Expand Down
2 changes: 1 addition & 1 deletion frac/remote.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ func (f *Remote) createDataProvider(ctx context.Context) (*sealedDataProvider, e
docsReader: &f.docsReader,
blocksOffsets: f.blocksData.BlocksOffsets,
lidsTable: f.blocksData.LIDsTable,
lidsLoader: lids.NewLoader(&f.indexReader, f.indexCache.LIDs),
lidsLoader: lids.NewLoader(f.info.BinaryDataVer, &f.indexReader, f.indexCache.LIDs),
tokenBlockLoader: token.NewBlockLoader(f.BaseFileName, &f.indexReader, f.indexCache.Tokens),
tokenTableLoader: token.NewTableLoader(f.BaseFileName, &f.indexReader, f.indexCache.TokenTable),

Expand Down
2 changes: 1 addition & 1 deletion frac/sealed.go
Original file line number Diff line number Diff line change
Expand Up @@ -340,7 +340,7 @@ func (f *Sealed) createDataProvider(ctx context.Context) *sealedDataProvider {
docsReader: &f.docsReader,
blocksOffsets: f.blocksData.BlocksOffsets,
lidsTable: f.blocksData.LIDsTable,
lidsLoader: lids.NewLoader(&f.indexReader, f.indexCache.LIDs),
lidsLoader: lids.NewLoader(f.info.BinaryDataVer, &f.indexReader, f.indexCache.LIDs),
tokenBlockLoader: token.NewBlockLoader(f.BaseFileName, &f.indexReader, f.indexCache.Tokens),
tokenTableLoader: token.NewTableLoader(f.BaseFileName, &f.indexReader, f.indexCache.TokenTable),

Expand Down
52 changes: 33 additions & 19 deletions frac/sealed/lids/block.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
package lids

import (
"encoding/binary"
"math"
"unsafe"

"github.com/ozontech/seq-db/config"
"github.com/ozontech/seq-db/packer"
)

Expand All @@ -23,21 +23,9 @@ func (b *Block) getLIDs(i int) []uint32 {
return b.LIDs[b.Offsets[i]:b.Offsets[i+1]]
}

func (b *Block) Pack(dst []byte) []byte {
lastLID := int64(0)
last := b.getCount() - 1
for i := 0; i <= last; i++ {
for _, lid := range b.getLIDs(i) {
dst = binary.AppendVarint(dst, int64(lid)-lastLID)
lastLID = int64(lid)
}

if i < last || b.IsLastLID {
// when we add this value to prev we must get -1 (or math.MaxUint32 for uint32)
// it is the end-marker; see `Block.Unpack()`
dst = binary.AppendVarint(dst, -1-lastLID)
}
}
// Pack appends the block to dst as two delta-bitpack sections, Offsets first
// and LIDs second, and returns the extended slice. This is the same order in
// which unpackBitpack reads them back.
//
// buf is handed to the packer for both sections (presumably scratch space;
// confirm against packer.CompressDeltaBitpackUint32). IsLastLID is not
// written by this method.
func (b *Block) Pack(dst []byte, buf []uint32) []byte {
	withOffsets := packer.CompressDeltaBitpackUint32(dst, b.Offsets, buf)
	return packer.CompressDeltaBitpackUint32(withOffsets, b.LIDs, buf)
}

Expand All @@ -49,13 +37,39 @@ func (b *Block) GetSizeBytes() int {
return blockSize + uint32Size*cap(b.LIDs) + uint32Size*cap(b.Offsets)
}

func (b *Block) Unpack(data []byte, buf *UnpackBuffer) error {
// Unpack decodes data into b using the decoder that matches fracVer:
// unpackBitpack for config.BinaryDataV3 and newer, unpackVarint for anything
// older. buf is reset for fracVer before decoding starts.
func (b *Block) Unpack(data []byte, fracVer config.BinaryDataVersion, buf *UnpackBuffer) error {
	buf.Reset(fracVer)

	if fracVer < config.BinaryDataV3 {
		return b.unpackVarint(data, buf)
	}
	return b.unpackBitpack(data, buf)
}

// unpackBitpack decodes two consecutive delta-bitpack sections from data:
// the first becomes b.Offsets, the second b.LIDs. Any decoder error is
// returned unchanged.
//
// NOTE(review): unlike unpackVarint, this path never assigns b.IsLastLID;
// confirm that the flag is restored elsewhere for bitpack-encoded blocks.
func (b *Block) unpackBitpack(data []byte, buf *UnpackBuffer) error {
	rest, offsets, err := packer.DecompressDeltaBitpackUint32(data, buf.decompressed, buf.compressed)
	if err != nil {
		return err
	}
	// Copy before the next call: both calls receive the same scratch slices,
	// so the returned values presumably alias them (confirm against packer).
	b.Offsets = append([]uint32{}, offsets...)

	_, lidValues, err := packer.DecompressDeltaBitpackUint32(rest, buf.decompressed, buf.compressed)
	if err != nil {
		return err
	}
	b.LIDs = append([]uint32{}, lidValues...)

	return nil
}

func (b *Block) unpackVarint(data []byte, buf *UnpackBuffer) error {
var lid, offset uint32

b.IsLastLID = true

buf.lids = buf.lids[:0]
buf.offsets = buf.offsets[:0]
buf.offsets = append(buf.offsets, 0) // first offset is always zero

unpacker := packer.NewBytesUnpacker(data)
Expand Down
226 changes: 226 additions & 0 deletions frac/sealed/lids/block_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,226 @@
package lids

import (
"math"
"math/rand"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/ozontech/seq-db/config"
)

// TestBlockPack round-trips blocks of various sizes and token layouts through
// Pack and Unpack (at config.CurrentFracVersion) and expects the exported
// fields of the decoded block to equal those of the original.
func TestBlockPack(t *testing.T) {
	type testCase struct {
		name      string
		lids      []uint32
		offsets   []uint32
		generator func() ([]uint32, []uint32)
	}

	// Ten tokens with three consecutive LIDs each; token i starts at 100+40*i.
	mediumManyTokens := func() ([]uint32, []uint32) {
		const tokens, perToken = 10, 3
		lids := make([]uint32, 0)
		offsets := []uint32{0}
		for i := 0; i < tokens; i++ {
			first := uint32(100 + 40*i)
			for j := uint32(0); j < perToken; j++ {
				lids = append(lids, first+j)
			}
			offsets = append(offsets, uint32(len(lids)))
		}
		return lids, offsets
	}

	// Five tokens with thirty LIDs each, spaced ten apart.
	largeManyTokens := func() ([]uint32, []uint32) {
		const groups, groupSize = 5, 30
		lids := make([]uint32, 0, groups*groupSize)
		offsets := []uint32{0}
		for g := 0; g < groups; g++ {
			for i := 0; i < groupSize; i++ {
				lids = append(lids, 1+uint32(g*groupSize*10+i*10))
			}
			offsets = append(offsets, uint32(len(lids)))
		}
		return lids, offsets
	}

	// Uneven eight-token layout whose last token ends at total.
	unevenOffsets := func(total uint32) []uint32 {
		return []uint32{0, 10, 50, 100, 150, 190, 1000, 1500, total}
	}

	testCases := []testCase{
		{name: "small_single_token", lids: generate(4), offsets: []uint32{0, 4}},
		{name: "small_a_few_token", lids: generate(6), offsets: []uint32{0, 3, 6}},
		{name: "small_single_lid", lids: []uint32{100}, offsets: []uint32{0, 1}},
		{
			name:    "small_big_lids",
			lids:    []uint32{math.MaxUint32 - 100, math.MaxUint32 - 50, math.MaxUint32 - 10},
			offsets: []uint32{0, 3},
		},
		{name: "small_few_tokens", lids: generate(8), offsets: []uint32{0, 3, 6, 8}},
		{name: "medium_many_tokens", generator: mediumManyTokens},
		{name: "large_many_tokens", generator: largeManyTokens},
		// Sizes around 128, 4K and 64K values.
		{name: "medium_128_lids", lids: generate(128), offsets: []uint32{0, 128}},
		{name: "medium_127_lids", lids: generate(127), offsets: []uint32{0, 127}},
		{name: "medium_129_lids", lids: generate(129), offsets: []uint32{0, 129}},
		{name: "medium_4k_lids", lids: generate(4096), offsets: []uint32{0, 4096}},
		{name: "medium_4k_minus_one_lids", lids: generate(4095), offsets: unevenOffsets(4095)},
		{name: "medium_4k_plus_one_lids", lids: generate(4097), offsets: unevenOffsets(4097)},
		{name: "medium_64k_lids", lids: generate(65536), offsets: []uint32{0, 65536}},
		{name: "medium_64k_minus_one_lids", lids: generate(65535), offsets: unevenOffsets(65535)},
		{name: "medium_64k_plus_one_lids", lids: generate(65537), offsets: unevenOffsets(65537)},
	}

	for _, tc := range testCases {
		t.Run(tc.name, func(t *testing.T) {
			// Static fixtures are the default; a generator, when present, wins.
			lids, offsets := tc.lids, tc.offsets
			if tc.generator != nil {
				lids, offsets = tc.generator()
			}

			original := &Block{
				LIDs:    lids,
				Offsets: offsets,
			}

			packed := original.Pack(nil, nil)
			require.NotEmpty(t, packed)

			decoded := &Block{}
			err := decoded.Unpack(packed, config.CurrentFracVersion, &UnpackBuffer{})

			require.NoError(t, err)
			assert.EqualExportedValues(t, original, decoded)
		})
	}
}

// generate returns n strictly increasing values. The first value is 100 and
// every following value is larger than its predecessor by a random step in
// the range [1, 5].
func generate(n int) []uint32 {
	out := make([]uint32, 0, n)
	for next := uint32(100); len(out) < n; next += uint32(1 + rand.Intn(5)) {
		out = append(out, next)
	}
	return out
}

// TestBlockPack_ReuseBuffer checks that a single pack scratch slice and a
// single UnpackBuffer can serve several blocks in a row without data from one
// block leaking into another.
func TestBlockPack_ReuseBuffer(t *testing.T) {
	// Offsets must describe the LIDs they index: the last offset equals
	// len(LIDs). The second block is shorter and has a different token layout,
	// so stale LIDs or offsets left in a reused buffer would show up as a
	// mismatch.
	first := &Block{
		LIDs:    generate(64 * 1024),
		Offsets: []uint32{0, 64 * 1024},
	}
	second := &Block{
		LIDs:    generate(32 * 1024),
		Offsets: []uint32{0, 1024, 32 * 1024},
	}

	// The scratch slice is passed by value, so its length stays zero here and
	// it can be handed to the next Pack call as is.
	packBuf := make([]uint32, 0, 64*1024)
	packedFirst := first.Pack(nil, packBuf)
	packedSecond := second.Pack(nil, packBuf)

	unpackBuf := &UnpackBuffer{}

	gotFirst := &Block{}
	require.NoError(t, gotFirst.Unpack(packedFirst, config.CurrentFracVersion, unpackBuf))

	gotSecond := &Block{}
	require.NoError(t, gotSecond.Unpack(packedSecond, config.CurrentFracVersion, unpackBuf))

	// Compare only after both unpacks: if a decoded block aliased the shared
	// buffer, the second Unpack would have overwritten the first result.
	assert.Equal(t, first.LIDs, gotFirst.LIDs)
	assert.Equal(t, first.Offsets, gotFirst.Offsets)
	assert.Equal(t, second.LIDs, gotSecond.LIDs)
	assert.Equal(t, second.Offsets, gotSecond.Offsets)
}

func BenchmarkBlock_Pack(b *testing.B) {
lids := generate(64 * 1024)

block := &Block{
LIDs: lids,
Offsets: []uint32{0, 64 * 1024},
}
tmp := make([]uint32, 0, 64*1024/4)

for b.Loop() {
block.Pack(nil, tmp)
}
}

func BenchmarkBlock_Unpack(b *testing.B) {
lids := generate(64 * 1024)

block := &Block{
LIDs: lids,
Offsets: []uint32{0, 64 * 1024},
}
packed := block.Pack(nil, nil)

buf := &UnpackBuffer{}
unpacked := &Block{}

b.ResetTimer()
for b.Loop() {
err := unpacked.Unpack(packed, config.CurrentFracVersion, buf)
assert.NoError(b, err)
}
}
Loading
Loading