-
Notifications
You must be signed in to change notification settings - Fork 11
perf: improve wildcard query perf with predicate and contains-check pushdown #397
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 2 commits
0fa1d0e
cee0a60
c1219b9
432b592
29ff016
739974d
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,6 +1,7 @@ | ||
| package token | ||
|
|
||
| import ( | ||
| "bytes" | ||
| "encoding/binary" | ||
| "fmt" | ||
| "math" | ||
|
|
@@ -10,6 +11,7 @@ import ( | |
|
|
||
| "github.com/ozontech/seq-db/cache" | ||
| "github.com/ozontech/seq-db/logger" | ||
| "github.com/ozontech/seq-db/pattern" | ||
| "github.com/ozontech/seq-db/storage" | ||
| ) | ||
|
|
||
|
|
@@ -60,6 +62,30 @@ func (b *Block) GetToken(index int) []byte { | |
| return b.Payload[offset : offset+l] | ||
| } | ||
|
|
||
| func (b *Block) FindContains(from, to int, needle []byte) ([]int, error) { | ||
|
dkharms marked this conversation as resolved.
Outdated
|
||
| indices := make([]int, 0) | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I guess you could pass here slice of needles as well to handle queries like
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. No, I think it's doable. Maybe will do
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. upd: will do in a separate PR |
||
| for i := from; i <= to; i++ { | ||
| if bytes.Contains(b.GetToken(i), needle) { | ||
| indices = append(indices, i) | ||
| } | ||
| } | ||
| return indices, nil | ||
| } | ||
|
|
||
| func (b *Block) FindToken(from, to int, searcher pattern.Searcher) ([]int, error) { | ||
| indices := make([]int, 0) | ||
| for i := from; i <= to; i++ { | ||
| ok, err := searcher.Check(b.GetToken(i)) | ||
| if err != nil { | ||
| return nil, err | ||
| } | ||
| if ok { | ||
| indices = append(indices, i) | ||
| } | ||
| } | ||
| return indices, nil | ||
| } | ||
|
|
||
| // BlockLoader is responsible for Reading from disk, unpacking and caching tokens blocks. | ||
| // NOT THREAD SAFE. Do not use concurrently. | ||
| // Use your own BlockLoader instance for each search query | ||
|
|
||
| Original file line number | Diff line number | Diff line change | |||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -2,6 +2,8 @@ package token | ||||||||||
|
|
|||||||||||
| import ( | |||||||||||
| "sort" | |||||||||||
|
|
|||||||||||
| "github.com/ozontech/seq-db/pattern" | |||||||||||
| ) | |||||||||||
|
|
|||||||||||
| type Provider struct { | |||||||||||
|
|
@@ -55,3 +57,71 @@ func (tp *Provider) GetToken(tid uint32) []byte { | ||||||||||
| block := tp.findBlock(entry.BlockIndex) | |||||||||||
| return block.GetToken(entry.GetIndexInTokensBlock(tid)) | |||||||||||
| } | |||||||||||
|
|
|||||||||||
| func (tp *Provider) FindContains(firstTID, lastTID uint32, needle []byte) ([]uint32, error) { | |||||||||||
| return tp.findInBlocks(firstTID, lastTID, func(b *Block, firstIndex, lastIndex int) ([]int, error) { | |||||||||||
| return b.FindContains(firstIndex, lastIndex, needle) | |||||||||||
| }) | |||||||||||
| } | |||||||||||
|
|
|||||||||||
| func (tp *Provider) FindToken(searcher pattern.Searcher) ([]uint32, error) { | |||||||||||
| return tp.findInBlocks(searcher.FirstTID(), searcher.LastTID(), func(b *Block, firstIndex, lastIndex int) ([]int, error) { | |||||||||||
| return b.FindToken(firstIndex, lastIndex, searcher) | |||||||||||
| }) | |||||||||||
| } | |||||||||||
|
|
|||||||||||
| func (tp *Provider) findInBlocks(firstTID, lastTID uint32, search func(*Block, int, int) ([]int, error)) ([]uint32, error) { | |||||||||||
| entries := tp.narrowEntries(firstTID, lastTID) | |||||||||||
| if len(entries) == 0 { | |||||||||||
| return nil, nil | |||||||||||
| } | |||||||||||
|
|
|||||||||||
| var tids []uint32 | |||||||||||
|
|
|||||||||||
| for _, entry := range entries { | |||||||||||
| block := tp.findBlock(entry.BlockIndex) | |||||||||||
| firstIndex, lastIndex := tp.narrowTIDs(entry, firstTID, lastTID) | |||||||||||
|
dkharms marked this conversation as resolved.
Outdated
|
|||||||||||
| indices, err := search(block, firstIndex, lastIndex) | |||||||||||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: better to use 'indexes' in this context
|
|||||||||||
| if err != nil { | |||||||||||
| return nil, err | |||||||||||
| } | |||||||||||
| for _, idx := range indices { | |||||||||||
| tid := entry.StartTID + uint32(idx-int(entry.StartIndex)) | |||||||||||
| tids = append(tids, tid) | |||||||||||
| } | |||||||||||
| } | |||||||||||
| return tids, nil | |||||||||||
| } | |||||||||||
|
|
|||||||||||
| func (tp *Provider) narrowTIDs(entry *TableEntry, firstTID, fromTID uint32) (int, int) { | |||||||||||
|
dkharms marked this conversation as resolved.
Outdated
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is not a token provider method; tp is not used in this function at all. Rather, this is a method for TableEntry
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. moved to |
|||||||||||
| tidStart := firstTID | |||||||||||
| if entry.StartTID > tidStart { | |||||||||||
| tidStart = entry.StartTID | |||||||||||
| } | |||||||||||
| tidEnd := fromTID | |||||||||||
| if lastTID := entry.getLastTID(); lastTID < tidEnd { | |||||||||||
| tidEnd = lastTID | |||||||||||
| } | |||||||||||
|
|
|||||||||||
| firstIndex := entry.GetIndexInTokensBlock(tidStart) | |||||||||||
| lastIndex := entry.GetIndexInTokensBlock(tidEnd) | |||||||||||
| return firstIndex, lastIndex | |||||||||||
| } | |||||||||||
|
|
|||||||||||
| func (tp *Provider) narrowEntries(firstTID, lastTID uint32) []*TableEntry { | |||||||||||
| firstIdx := sort.Search(len(tp.entries), func(i int) bool { | |||||||||||
| return tp.entries[i].getLastTID() >= firstTID | |||||||||||
| }) | |||||||||||
| if firstIdx >= len(tp.entries) { | |||||||||||
| return nil | |||||||||||
| } | |||||||||||
| lastIdx := sort.Search(len(tp.entries), func(i int) bool { | |||||||||||
| return tp.entries[i].StartTID > lastTID | |||||||||||
| }) | |||||||||||
| lastIdx-- | |||||||||||
| if lastIdx < firstIdx { | |||||||||||
| return nil | |||||||||||
| } | |||||||||||
| entries := tp.entries[firstIdx : lastIdx+1] | |||||||||||
| return entries | |||||||||||
| } | |||||||||||
|
dkharms marked this conversation as resolved.
Outdated
|
|||||||||||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -17,6 +17,8 @@ import ( | |
|
|
||
| type tokenProvider interface { | ||
| GetToken(uint32) []byte | ||
| FindContains(firstTID uint32, lastTID uint32, needle []byte) ([]uint32, error) | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why did you decide to make Seems like for this specific case (e.g. query And now we always pass the first and last TID in this method:
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. For
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah, I see. But for specific case of type Provider struct {
...
// NOTE: Entries already were narrowed.
entries []*TableEntry
...
}
func (tp *Provider) FirstTID() uint32 {
return tp.entries[0].StartTID
}
func (tp *Provider) LastTID() uint32 {
return tp.entries[len(tp.entries)-1].getLastTID()
}
func (tp *Provider) FindContains(needle []byte) ([]uint32, error) {
return tp.findInBlocks(tp.FirstTID(), tp.LastTID(), func(b *Block, firstIndex, lastIndex int) ([]int, error) {
return b.contains(firstIndex, lastIndex, needle)
})
} |
||
| FindToken(searcher Searcher) ([]uint32, error) | ||
| FirstTID() uint32 | ||
| LastTID() uint32 | ||
| Ordered() bool | ||
|
|
@@ -27,11 +29,11 @@ type baseSearch struct { | |
| last int | ||
| } | ||
|
|
||
| func (s *baseSearch) firstTID() uint32 { | ||
| func (s *baseSearch) FirstTID() uint32 { | ||
| return uint32(s.first) | ||
| } | ||
|
|
||
| func (s *baseSearch) lastTID() uint32 { | ||
| func (s *baseSearch) LastTID() uint32 { | ||
| return uint32(s.last) | ||
| } | ||
|
|
||
|
|
@@ -67,7 +69,7 @@ func (s *literalSearch) Narrow(tp tokenProvider) { | |
| s.last = s.first - 1 // begin > end: will be considered empty | ||
| } | ||
|
|
||
| func (s *literalSearch) check(val []byte) (bool, error) { | ||
| func (s *literalSearch) Check(val []byte) (bool, error) { | ||
| if s.narrowed { | ||
| return len(s.value) == len(val), nil | ||
| } | ||
|
|
@@ -165,7 +167,7 @@ func findSequence(haystack []byte, needles [][]byte) int { | |
| return len(needles) | ||
| } | ||
|
|
||
| func (s *wildcardSearch) check(val []byte) (bool, error) { | ||
| func (s *wildcardSearch) Check(val []byte) (bool, error) { | ||
| return s.checkPrefix(val) && s.checkSuffix(val) && s.checkMiddle(val), nil | ||
| } | ||
|
|
||
|
|
@@ -181,7 +183,7 @@ func newRangeTextSearch(base baseSearch, token *parser.Range) *rangeTextSearch { | |
| } | ||
| } | ||
|
|
||
| func (s *rangeTextSearch) check(val []byte) (bool, error) { | ||
| func (s *rangeTextSearch) Check(val []byte) (bool, error) { | ||
| valStr := string(val) | ||
| if s.token.From.Kind != parser.TermSymbol { | ||
| if s.token.IncludeFrom { | ||
|
|
@@ -244,7 +246,7 @@ func newRangeNumberSearch(base baseSearch, token *parser.Range) *rangeNumberSear | |
| return s | ||
| } | ||
|
|
||
| func (s *rangeNumberSearch) check(rawVal []byte) (bool, error) { | ||
| func (s *rangeNumberSearch) Check(rawVal []byte) (bool, error) { | ||
| val, err := strconv.ParseFloat(string(rawVal), 64) | ||
| if err != nil || isNaNOrInf(val) { | ||
| return false, nil | ||
|
|
@@ -301,7 +303,7 @@ func newRangeIPSearch(base baseSearch, token *parser.IPRange) *rangeIpSearch { | |
| return s | ||
| } | ||
|
|
||
| func (s *rangeIpSearch) check(rawVal []byte) (bool, error) { | ||
| func (s *rangeIpSearch) Check(rawVal []byte) (bool, error) { | ||
| val, err := netip.ParseAddr(string(rawVal)) | ||
| if err != nil { | ||
| return false, nil | ||
|
|
@@ -324,7 +326,7 @@ func newReSearch(base baseSearch, token *parser.Re) *reSearch { | |
| return &reSearch{baseSearch: base, r: token.CompiledExpression} | ||
| } | ||
|
|
||
| func (s *reSearch) check(rawVal []byte) (bool, error) { | ||
| func (s *reSearch) Check(rawVal []byte) (bool, error) { | ||
| if config.MaxRegexTokensCheck > 0 && s.checked >= config.MaxRegexTokensCheck { | ||
| return false, errors.New( | ||
| "'re' filter exceeded token limit: " + | ||
|
|
@@ -335,13 +337,13 @@ func (s *reSearch) check(rawVal []byte) (bool, error) { | |
| return s.r.Match(rawVal), nil | ||
| } | ||
|
|
||
| type searcher interface { | ||
| firstTID() uint32 | ||
| lastTID() uint32 | ||
| check(val []byte) (bool, error) | ||
| type Searcher interface { | ||
| FirstTID() uint32 | ||
| LastTID() uint32 | ||
| Check(val []byte) (bool, error) | ||
| } | ||
|
|
||
| func newSearcher(token parser.Token, tp tokenProvider) searcher { | ||
| func newSearcher(token parser.Token, tp tokenProvider) Searcher { | ||
| base := baseSearch{ | ||
| first: int(tp.FirstTID()), | ||
| last: int(tp.LastTID()), | ||
|
|
@@ -390,22 +392,24 @@ func isNaNOrInf(f float64) bool { | |
| return math.IsNaN(f) || math.IsInf(f, 0) | ||
| } | ||
|
|
||
| func Search(ctx context.Context, t parser.Token, tp tokenProvider) ([]uint32, error) { | ||
| tids := []uint32{} | ||
| s := newSearcher(t, tp) | ||
| for tid := s.firstTID(); tid <= s.lastTID(); tid++ { | ||
| if tid&1023 == 0 && util.IsCancelled(ctx) { | ||
| return nil, ctx.Err() | ||
| } | ||
|
|
||
| match, err := s.check(tp.GetToken(tid)) | ||
| if err != nil { | ||
| return nil, err | ||
| } | ||
| func isSimpleWildcardContains(token parser.Token) (needle []byte, ok bool) { | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please add a comment with an example of an expression that fits/matches this check |
||
| lit, ok := token.(*parser.Literal) | ||
| if !ok || len(lit.Terms) != 3 { | ||
| return nil, false | ||
| } | ||
| if !lit.Terms[0].IsWildcard() || lit.Terms[1].Kind != parser.TermText || !lit.Terms[2].IsWildcard() { | ||
| return nil, false | ||
| } | ||
| return []byte(lit.Terms[1].Data), true | ||
| } | ||
|
|
||
| if match { | ||
| tids = append(tids, tid) | ||
| } | ||
| func Search(ctx context.Context, t parser.Token, tp tokenProvider) ([]uint32, error) { | ||
| if util.IsCancelled(ctx) { | ||
| return nil, ctx.Err() | ||
| } | ||
| if needle, ok := isSimpleWildcardContains(t); ok { | ||
| return tp.FindContains(tp.FirstTID(), tp.LastTID(), needle) | ||
| } | ||
| return tids, nil | ||
| s := newSearcher(t, tp) | ||
| return tp.FindToken(s) | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We've discussed that you can perform `bytes.Contains` on the block payload before checking each token individually. Have you measured the performance of such an optimization?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, I tried calling `bytes.Index` on the entire payload. It boosts performance even further compared to this PR:
`message:foobar`
35 ms => 9 ms
However, this means that when `bytes.Index` returns a valid position, we need to do a binary search on `Offsets` to find the token index and then check for a false positive. It also comes with the neat property that we can defer the `Unpack` call (building offsets) and do it lazily, which boosts cold query performance (by roughly an extra 20%).
I put a task in the backlog; I decided that it's too much for a single PR.