Skip to content
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 33 additions & 0 deletions frac/active_token_list.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package frac

import (
"bytes"
"context"
"fmt"
"hash/crc32"
Expand Down Expand Up @@ -38,6 +39,38 @@ func (tp *activeTokenProvider) GetToken(tid uint32) []byte {
return tp.tidToVal[id]
}

// FindContains returns the TIDs in the range [firstTID, lastTID] (both bounds
// inclusive) whose token contains needle as a byte substring.
// An empty needle yields no matches. The returned error is always nil.
func (tp *activeTokenProvider) FindContains(firstTID, lastTID uint32, needle []byte) ([]uint32, error) {
	var matched []uint32
	if len(needle) == 0 {
		// Nothing to look for: report no TIDs instead of matching every token.
		return matched, nil
	}
	for tid := firstTID; tid <= lastTID; tid++ {
		if !bytes.Contains(tp.GetToken(tid), needle) {
			continue
		}
		matched = append(matched, tid)
	}
	return matched, nil
}

// FindToken returns the TIDs of tokens that satisfy the provided searcher (predicate).
// Only TIDs in [searcher.FirstTID(), searcher.LastTID()] (both inclusive) are examined.
// The first error reported by searcher.Check aborts the scan and is returned as is.
func (tp *activeTokenProvider) FindToken(searcher pattern.Searcher) ([]uint32, error) {
	var matched []uint32
	for tid, last := searcher.FirstTID(), searcher.LastTID(); tid <= last; tid++ {
		ok, err := searcher.Check(tp.GetToken(tid))
		switch {
		case err != nil:
			return nil, err
		case ok:
			matched = append(matched, tid)
		}
	}
	return matched, nil
}

// FirstTID returns the lowest TID of the provider, which is always 1.
func (tp *activeTokenProvider) FirstTID() uint32 {
	const firstTID uint32 = 1
	return firstTID
}
Expand Down
57 changes: 56 additions & 1 deletion frac/fraction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1390,14 +1390,69 @@ func (s *FractionTestSuite) TestSearchLargeFrac() {
toTime: midTime,
},

// other queries
// wildcards
{
name: "trace_id:trace-4*",
query: "trace_id:trace-4*",
filter: func(doc *testDoc) bool { return strings.Contains(doc.traceId, "trace-4") },
fromTime: fromTime,
toTime: toTime,
},
{
name: "id:*1* OR id:*2*",
query: "id:*1* OR id:*2*",
filter: func(doc *testDoc) bool {
return strings.Contains(doc.id, "1") || strings.Contains(doc.id, "2")
},
fromTime: fromTime,
toTime: toTime,
},
{
name: "id:*1* AND id:*2*",
query: "id:*1* AND id:*2*",
filter: func(doc *testDoc) bool {
return strings.Contains(doc.id, "1") && strings.Contains(doc.id, "2")
},
fromTime: fromTime,
toTime: toTime,
},
{
name: "id:*1 OR id:*2 OR id:*3",
query: "id:*1 OR id:*2 OR id:*3",
filter: func(doc *testDoc) bool {
return strings.HasSuffix(doc.id, "1") || strings.HasSuffix(doc.id, "2") || strings.HasSuffix(doc.id, "3")
},
fromTime: fromTime,
toTime: toTime,
},
{
name: "message:*re*",
query: "message:*re*",
filter: func(doc *testDoc) bool {
return strings.Contains(doc.message, "re")
},
fromTime: fromTime,
toTime: toTime,
},
{
name: "message:*uest OR id:*1",
query: "message:*uest OR id:*1",
filter: func(doc *testDoc) bool {
// the only message token matching '*uest' is 'request'
return strings.Contains(doc.message, "request") || strings.HasSuffix(doc.id, "1")
},
fromTime: fromTime,
toTime: toTime,
},
{
name: "service:*a*",
query: "service:*a*",
filter: func(doc *testDoc) bool {
return strings.Contains(doc.service, "a")
},
fromTime: fromTime,
toTime: toTime,
},
}

for _, tc := range searchTestCases {
Expand Down
26 changes: 26 additions & 0 deletions frac/sealed/token/block_loader.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package token

import (
"bytes"
"encoding/binary"
"fmt"
"math"
Expand All @@ -10,6 +11,7 @@ import (

"github.com/ozontech/seq-db/cache"
"github.com/ozontech/seq-db/logger"
"github.com/ozontech/seq-db/pattern"
"github.com/ozontech/seq-db/storage"
)

Expand Down Expand Up @@ -60,6 +62,30 @@ func (b *Block) GetToken(index int) []byte {
return b.Payload[offset : offset+l]
}

// contains returns the in-block indexes in [from, to] (both inclusive) of the
// tokens that contain needle as a byte substring.
// The result is never nil and the returned error is always nil.
func (b *Block) contains(from, to int, needle []byte) ([]int, error) {
	matched := []int{}
	for idx := from; idx <= to; idx++ {
		if token := b.GetToken(idx); bytes.Contains(token, needle) {
			matched = append(matched, idx)
		}
	}
	return matched, nil
}

// find returns the in-block indexes in [from, to] (both inclusive) of the
// tokens accepted by searcher.Check.
// The first error reported by searcher.Check aborts the scan and is returned
// together with a nil slice; on success the result is never nil.
func (b *Block) find(from, to int, searcher pattern.Searcher) ([]int, error) {
	matched := []int{}
	for idx := from; idx <= to; idx++ {
		accepted, err := searcher.Check(b.GetToken(idx))
		switch {
		case err != nil:
			return nil, err
		case accepted:
			matched = append(matched, idx)
		}
	}
	return matched, nil
}

// BlockLoader is responsible for Reading from disk, unpacking and caching tokens blocks.
// NOT THREAD SAFE. Do not use concurrently.
// Use your own BlockLoader instance for each search query
Expand Down
55 changes: 55 additions & 0 deletions frac/sealed/token/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package token

import (
"sort"

"github.com/ozontech/seq-db/pattern"
)

type Provider struct {
Expand Down Expand Up @@ -55,3 +57,56 @@ func (tp *Provider) GetToken(tid uint32) []byte {
block := tp.findBlock(entry.BlockIndex)
return block.GetToken(entry.GetIndexInTokensBlock(tid))
}

// FindContains returns the TIDs in the range [firstTID, lastTID] (both bounds
// inclusive) whose token contains needle as a byte substring.
// An empty needle yields no matches, which is the same contract as
// activeTokenProvider.FindContains.
func (tp *Provider) FindContains(firstTID, lastTID uint32, needle []byte) ([]uint32, error) {
	// bytes.Contains reports true for an empty needle, so without this guard
	// every token in the range would be returned, while the active provider
	// returns nothing for the same input. The guard also avoids touching any
	// block when there is nothing to search for.
	if len(needle) == 0 {
		return nil, nil
	}
	return tp.findInBlocks(firstTID, lastTID, func(b *Block, firstIndex, lastIndex int) ([]int, error) {
		return b.contains(firstIndex, lastIndex, needle)
	})
}

// FindToken returns the TIDs of tokens accepted by searcher. Only TIDs in
// [searcher.FirstTID(), searcher.LastTID()] are examined; the per-block scan
// is delegated to Block.find.
func (tp *Provider) FindToken(searcher pattern.Searcher) ([]uint32, error) {
	first, last := searcher.FirstTID(), searcher.LastTID()
	scan := func(blk *Block, from, to int) ([]int, error) {
		return blk.find(from, to, searcher)
	}
	return tp.findInBlocks(first, last, scan)
}

// findInBlocks runs search over every table entry that overlaps
// [firstTID, lastTID] and converts the in-block indexes it reports back to TIDs.
//
// For each overlapping entry the block is fetched through findBlock, the TID
// range is clamped to the entry via narrowIndexes, and search is called with
// the resulting inclusive index range. The first error returned by search
// aborts the scan. The result is nil when nothing matched.
func (tp *Provider) findInBlocks(firstTID, lastTID uint32, search func(*Block, int, int) ([]int, error)) ([]uint32, error) {
	var found []uint32
	for _, e := range tp.narrowEntries(firstTID, lastTID) {
		blk := tp.findBlock(e.BlockIndex)
		from, to := e.narrowIndexes(firstTID, lastTID)
		hits, err := search(blk, from, to)
		if err != nil {
			return nil, err
		}
		// An index is relative to the block; the entry's StartIndex
		// corresponds to its StartTID, so the offset maps directly to a TID.
		base := int(e.StartIndex)
		for _, hit := range hits {
			found = append(found, e.StartTID+uint32(hit-base))
		}
	}
	return found, nil
}

// narrowEntries returns the contiguous run of table entries whose TID range
// overlaps [firstTID, lastTID] (both bounds inclusive). It returns an empty
// or nil slice when no entry overlaps, including when the range is inverted
// (firstTID > lastTID).
//
// Both lookups are binary searches, so this assumes tp.entries is sorted by
// TID with non-overlapping ranges (NOTE(review): not visible here — confirm).
func (tp *Provider) narrowEntries(firstTID, lastTID uint32) []*TableEntry {
	// First entry that ends at or after firstTID.
	firstIdx := sort.Search(len(tp.entries), func(i int) bool {
		return tp.entries[i].getLastTID() >= firstTID
	})

	if firstIdx >= len(tp.entries) {
		return nil
	}

	// First entry that starts after lastTID, i.e. the exclusive upper bound.
	lastIdx := sort.Search(len(tp.entries), func(i int) bool {
		return tp.entries[i].StartTID > lastTID
	})

	// lastIdx <= len(tp.entries) always holds, but firstIdx <= lastIdx holds
	// only while lastTID+1 >= firstTID. For an inverted range with a whole
	// entry lying strictly between lastTID and firstTID, lastIdx falls below
	// firstIdx and the slice expression would panic, so report "no entries".
	if lastIdx < firstIdx {
		return nil
	}
	return tp.entries[firstIdx:lastIdx]
}
15 changes: 15 additions & 0 deletions frac/sealed/token/table_entry.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,21 @@ func (t *TableEntry) getLastTID() uint32 {
return t.StartTID + t.ValCount - 1
}

// narrowIndexes clamps the TID range [firstTID, lastTID] to the TIDs covered
// by this entry and returns the clamped bounds as indexes in the tokens block
// (both inclusive), as computed by GetIndexInTokensBlock.
func (t *TableEntry) narrowIndexes(firstTID, lastTID uint32) (int, int) {
	// Start from the entry's own bounds and tighten them with the request.
	lo, hi := t.StartTID, t.getLastTID()
	if firstTID > lo {
		lo = firstTID
	}
	if lastTID < hi {
		hi = lastTID
	}
	return t.GetIndexInTokensBlock(lo), t.GetIndexInTokensBlock(hi)
}

func (t *TableEntry) checkTIDInBlock(tid uint32) bool {
if tid < t.StartTID {
return false
Expand Down
63 changes: 34 additions & 29 deletions pattern/pattern.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ import (

type tokenProvider interface {
GetToken(uint32) []byte
FindContains(firstTID uint32, lastTID uint32, needle []byte) ([]uint32, error)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why did you decide to make firstTID and lastTID a part of an API?

Seems like for this specific case (e.g. query foo:'*bar*') we cannot narrow the TID search boundaries.

And now we always pass the first and last TID in this method:
https://github.com/ozontech/seq-db/blob/0-wildcard-predicate-pushdown/pattern/pattern.go#L411

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For foo:'*bar*' we narrow the search down to all tokens of the foo field, i.e. foo:? tokens. If there are only 50 such tokens, we will check just a single block, and firstTID might be, say, 1000 and lastTID 1050.

Copy link
Copy Markdown
Member

@dkharms dkharms May 22, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I see. But for the specific case of FindContains(), the arguments firstTID and lastTID should not be part of the API. Here, take a look: we already have all the necessary information in Provider:

type Provider struct {
	...
    // NOTE: Entries already were narrowed.
	entries  []*TableEntry
	...
}

func (tp *Provider) FirstTID() uint32 {
	return tp.entries[0].StartTID
}

func (tp *Provider) LastTID() uint32 {
	return tp.entries[len(tp.entries)-1].getLastTID()
}

func (tp *Provider) FindContains(needle []byte) ([]uint32, error) {
	return tp.findInBlocks(tp.FirstTID(), tp.LastTID(), func(b *Block, firstIndex, lastIndex int) ([]int, error) {
		return b.contains(firstIndex, lastIndex, needle)
	})
}

FindToken(searcher Searcher) ([]uint32, error)
FirstTID() uint32
LastTID() uint32
Ordered() bool
Expand All @@ -27,11 +29,11 @@ type baseSearch struct {
last int
}

func (s *baseSearch) firstTID() uint32 {
func (s *baseSearch) FirstTID() uint32 {
return uint32(s.first)
}

func (s *baseSearch) lastTID() uint32 {
func (s *baseSearch) LastTID() uint32 {
return uint32(s.last)
}

Expand Down Expand Up @@ -67,7 +69,7 @@ func (s *literalSearch) Narrow(tp tokenProvider) {
s.last = s.first - 1 // begin > end: will be considered empty
}

func (s *literalSearch) check(val []byte) (bool, error) {
func (s *literalSearch) Check(val []byte) (bool, error) {
if s.narrowed {
return len(s.value) == len(val), nil
}
Expand Down Expand Up @@ -165,7 +167,7 @@ func findSequence(haystack []byte, needles [][]byte) int {
return len(needles)
}

func (s *wildcardSearch) check(val []byte) (bool, error) {
// Check reports whether val passes checkPrefix, checkSuffix and checkMiddle,
// evaluated in that order and stopping at the first one that fails.
// The returned error is always nil.
func (s *wildcardSearch) Check(val []byte) (bool, error) {
	if !s.checkPrefix(val) {
		return false, nil
	}
	if !s.checkSuffix(val) {
		return false, nil
	}
	return s.checkMiddle(val), nil
}

Expand All @@ -181,7 +183,7 @@ func newRangeTextSearch(base baseSearch, token *parser.Range) *rangeTextSearch {
}
}

func (s *rangeTextSearch) check(val []byte) (bool, error) {
func (s *rangeTextSearch) Check(val []byte) (bool, error) {
valStr := string(val)
if s.token.From.Kind != parser.TermSymbol {
if s.token.IncludeFrom {
Expand Down Expand Up @@ -244,7 +246,7 @@ func newRangeNumberSearch(base baseSearch, token *parser.Range) *rangeNumberSear
return s
}

func (s *rangeNumberSearch) check(rawVal []byte) (bool, error) {
func (s *rangeNumberSearch) Check(rawVal []byte) (bool, error) {
val, err := strconv.ParseFloat(string(rawVal), 64)
if err != nil || isNaNOrInf(val) {
return false, nil
Expand Down Expand Up @@ -301,7 +303,7 @@ func newRangeIPSearch(base baseSearch, token *parser.IPRange) *rangeIpSearch {
return s
}

func (s *rangeIpSearch) check(rawVal []byte) (bool, error) {
func (s *rangeIpSearch) Check(rawVal []byte) (bool, error) {
val, err := netip.ParseAddr(string(rawVal))
if err != nil {
return false, nil
Expand All @@ -324,7 +326,7 @@ func newReSearch(base baseSearch, token *parser.Re) *reSearch {
return &reSearch{baseSearch: base, r: token.CompiledExpression}
}

func (s *reSearch) check(rawVal []byte) (bool, error) {
func (s *reSearch) Check(rawVal []byte) (bool, error) {
if config.MaxRegexTokensCheck > 0 && s.checked >= config.MaxRegexTokensCheck {
return false, errors.New(
"'re' filter exceeded token limit: " +
Expand All @@ -335,13 +337,13 @@ func (s *reSearch) check(rawVal []byte) (bool, error) {
return s.r.Match(rawVal), nil
}

type searcher interface {
firstTID() uint32
lastTID() uint32
check(val []byte) (bool, error)
// Searcher is a token predicate bounded to a TID range. Token providers scan
// the TIDs from FirstTID to LastTID (both inclusive) and keep those whose
// token is accepted by Check.
type Searcher interface {
// FirstTID returns the first TID (inclusive) that has to be checked.
FirstTID() uint32
// LastTID returns the last TID (inclusive) that has to be checked.
LastTID() uint32
// Check reports whether the token value val matches. A non-nil error
// aborts the whole search.
Check(val []byte) (bool, error)
}

func newSearcher(token parser.Token, tp tokenProvider) searcher {
func newSearcher(token parser.Token, tp tokenProvider) Searcher {
base := baseSearch{
first: int(tp.FirstTID()),
last: int(tp.LastTID()),
Expand Down Expand Up @@ -390,22 +392,25 @@ func isNaNOrInf(f float64) bool {
return math.IsNaN(f) || math.IsInf(f, 0)
}

func Search(ctx context.Context, t parser.Token, tp tokenProvider) ([]uint32, error) {
tids := []uint32{}
s := newSearcher(t, tp)
for tid := s.firstTID(); tid <= s.lastTID(); tid++ {
if tid&1023 == 0 && util.IsCancelled(ctx) {
return nil, ctx.Err()
}

match, err := s.check(tp.GetToken(tid))
if err != nil {
return nil, err
}
// isSimpleWildcardContains checks if this AST token is simple wildcard like 'foo:*bar*'
func isSimpleWildcardContains(token parser.Token) (needle []byte, ok bool) {
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add a comment with an example of an expression that fits/matches this check

lit, ok := token.(*parser.Literal)
if !ok || len(lit.Terms) != 3 {
return nil, false
}
if !lit.Terms[0].IsWildcard() || lit.Terms[1].Kind != parser.TermText || !lit.Terms[2].IsWildcard() {
return nil, false
}
return []byte(lit.Terms[1].Data), true
}

if match {
tids = append(tids, tid)
}
func Search(ctx context.Context, t parser.Token, tp tokenProvider) ([]uint32, error) {
if util.IsCancelled(ctx) {
return nil, ctx.Err()
}
if needle, ok := isSimpleWildcardContains(t); ok {
return tp.FindContains(tp.FirstTID(), tp.LastTID(), needle)
}
return tids, nil
s := newSearcher(t, tp)
return tp.FindToken(s)
}
Loading
Loading