diff --git a/cmd/file.d/file.d.go b/cmd/file.d/file.d.go index 4cb8b58ca..3b89bcd77 100644 --- a/cmd/file.d/file.d.go +++ b/cmd/file.d/file.d.go @@ -44,6 +44,7 @@ import ( _ "github.com/ozontech/file.d/plugin/action/set_time" _ "github.com/ozontech/file.d/plugin/action/split" _ "github.com/ozontech/file.d/plugin/action/throttle" + _ "github.com/ozontech/file.d/plugin/action/transform" _ "github.com/ozontech/file.d/plugin/input/dmesg" _ "github.com/ozontech/file.d/plugin/input/fake" _ "github.com/ozontech/file.d/plugin/input/file" diff --git a/e2e/start_work_test.go b/e2e/start_work_test.go index c7c349eb2..b6f8a7f3d 100644 --- a/e2e/start_work_test.go +++ b/e2e/start_work_test.go @@ -50,6 +50,7 @@ import ( _ "github.com/ozontech/file.d/plugin/action/set_time" _ "github.com/ozontech/file.d/plugin/action/split" _ "github.com/ozontech/file.d/plugin/action/throttle" + _ "github.com/ozontech/file.d/plugin/action/transform" _ "github.com/ozontech/file.d/plugin/input/dmesg" _ "github.com/ozontech/file.d/plugin/input/fake" _ "github.com/ozontech/file.d/plugin/input/file" diff --git a/playground/playground.go b/playground/playground.go index 61b65a259..12a4b40f9 100644 --- a/playground/playground.go +++ b/playground/playground.go @@ -35,6 +35,7 @@ import ( _ "github.com/ozontech/file.d/plugin/action/remove_fields" _ "github.com/ozontech/file.d/plugin/action/rename" _ "github.com/ozontech/file.d/plugin/action/set_time" + _ "github.com/ozontech/file.d/plugin/action/transform" _ "github.com/ozontech/file.d/plugin/input/fake" "github.com/ozontech/file.d/plugin/output/devnull" "github.com/prometheus/client_golang/prometheus" diff --git a/plugin/action/transform/compiler/compiler.go b/plugin/action/transform/compiler/compiler.go new file mode 100644 index 000000000..a33305bce --- /dev/null +++ b/plugin/action/transform/compiler/compiler.go @@ -0,0 +1,714 @@ +package compiler + +import ( + "fmt" + "strconv" + + "github.com/ozontech/file.d/plugin/action/transform/core" + "github.com/ozontech/file.d/plugin/action/transform/parser" +) + +type compileError struct { + Pos parser.Position + Message string +} + +func (e *compileError) Error() string { + return fmt.Sprintf("parse error at (%s): %s", e.Pos, e.Message) +} + +// Compiler builds an AST from a slice of tokens. +type Compiler struct { + tokens []parser.Token + pos int +} + +func NewCompiler(source string) (*Compiler, error) { + tokens, err := parser.Parse(source) + if err != nil { + return nil, err + } + return &Compiler{tokens: tokens}, nil +} + +func (c *Compiler) Compile() ([]core.Expr, error) { + var exprs []core.Expr + + for !c.atEnd() { + for c.match(parser.SEMICOLON) { + } + if c.atEnd() { + break + } + + expr, err := c.parseExpr(parser.BpLowest) + if err != nil { + return nil, err + } + exprs = append(exprs, expr) + } + + return exprs, nil +} + +// Returns the current token without advancing. +// Returns the EOF token when the stream is finished. +func (c *Compiler) peek() parser.Token { + if c.pos >= len(c.tokens) { + return parser.Token{Type: parser.EOF} + } + return c.tokens[c.pos] +} + +// Returns the token at pos+offset without advancing. +// Returns the EOF token when out of bounds. +func (c *Compiler) peekAt(offset int) parser.Token { + idx := c.pos + offset + if idx >= len(c.tokens) { + return parser.Token{Type: parser.EOF} + } + return c.tokens[idx] +} + +// Returns the current token and moves the position forward. +func (c *Compiler) advance() parser.Token { + tok := c.peek() + if !c.atEnd() { + c.pos++ + } + return tok +} + +// Consumes the current token if it matches typ, or returns an error. +func (c *Compiler) expect(typ parser.TokenType) (parser.Token, error) { + tok := c.peek() + if tok.Type != typ { + return tok, c.errorf(tok, "expected %s, got %s (%q)", typ, tok.Type, tok.Lexeme) + } + return c.advance(), nil +} + +// Consumes the current token if it matches typ; returns true on success. +func (c *Compiler) match(typ parser.TokenType) bool { + if c.peek().Type == typ { + c.pos++ + return true + } + return false +} + +func (c *Compiler) check(typ parser.TokenType) bool { + return c.peek().Type == typ +} + +func (c *Compiler) atEnd() bool { + return c.pos >= len(c.tokens) +} + +func (c *Compiler) errorf(tok parser.Token, format string, args ...any) *compileError { + return &compileError{ + Pos: tok.StartPos(), + Message: fmt.Sprintf(format, args...), + } +} + +// parseExpr main function of the Pratt parser. +// +// minBP is the minimum binding power that an infix operator must exceed +// in order to be consumed. Calling with parser.BpLowest parses a full expression. +// +// - Left-associative: infix calls parseExpr(bp(op)) - same BP blocks re-entry +// - Right-associative: infix calls parseExpr(bp(op)-1) - same BP is allowed on the right +func (c *Compiler) parseExpr(minBP int) (core.Expr, error) { + // parse the left operand via a prefix handler + left, err := c.parsePrefix() + if err != nil { + return nil, err + } + + // consume infix operators while they are stronger than the threshold + for { + next := c.peek() + if next.Type.BindingPower() <= minBP { + break + } + op := c.advance() + left, err = c.parseInfix(left, op) + if err != nil { + return nil, err + } + } + + return left, nil +} + +// Called when a token appears at the start of an expression. +func (c *Compiler) parsePrefix() (core.Expr, error) { + tok := c.peek() + + switch tok.Type { + + // Literals + case parser.LIT_INTEGER: + return c.parseIntLit() + case parser.LIT_FLOAT: + return c.parseFloatLit() + case parser.LIT_STRING, parser.LIT_STRING_RAW: + return c.parseStringLit() + case parser.KW_TRUE: + return &core.BoolLit{Node: nodeAt(c.advance()), Value: true}, nil + case parser.KW_FALSE: + return &core.BoolLit{Node: nodeAt(c.advance()), Value: false}, nil + case parser.KW_NULL: + return &core.NullLit{Node: nodeAt(c.advance())}, nil + case parser.KW_DEL: + return c.parseDel() + case parser.LIT_REGEX: + t := c.advance() + return &core.RegexLit{Node: nodeAt(t), Pattern: unwrap(t.Lexeme, 2)}, nil + case parser.LIT_TIMESTAMP: + t := c.advance() + return &core.TimestampLit{Node: nodeAt(t), Value: unwrap(t.Lexeme, 2)}, nil + + // Identifier - variable or function call + case parser.IDENT: + t := c.advance() + return &core.IdentExpr{Node: nodeAt(t), Name: t.Lexeme}, nil + + // Paths + case parser.DOT: + return c.parseEventPath() + case parser.PERCENT: + return c.parseMetadataPath() + + // Unary operators + case parser.BANG, parser.MINUS: + return c.parseUnary() + + // Grouped expression + case parser.LPAREN: + return c.parseGrouped() + + // Collection literals + case parser.LBRACKET: + return c.parseArray() + case parser.LBRACE: + return c.parseObject() + + // Control flow + case parser.KW_IF: + return c.parseIf() + case parser.KW_ABORT: + return &core.AbortExpr{Node: nodeAt(c.advance())}, nil + case parser.KW_FOR: + return c.parseFor() + } + + return nil, c.errorf(tok, "unexpected token %s (%q)", tok.Type, tok.Lexeme) +} + +// Called when a token appears between two expressions. +func (c *Compiler) parseInfix(left core.Expr, op parser.Token) (core.Expr, error) { + switch op.Type { + + case parser.OP_ASSIGN: + if !isLValue(left) { + return nil, c.errorf(op, "left side of assignment must be a variable, path, or index expression") + } + // right-associative: bp-1 allows chaining a = b = c -> a = (b = c) + right, err := c.parseExpr(parser.BpAssign - 1) + if err != nil { + return nil, err + } + return &core.AssignExpr{ + Node: core.NewNode(left.Pos()), + Target: left, + Value: right, + }, nil + case parser.OP_OR, parser.OP_AND, + parser.OP_EQ, parser.OP_NEQ, + parser.OP_LT, parser.OP_LTE, parser.OP_GT, parser.OP_GTE, + parser.PLUS, parser.MINUS, + parser.STAR, parser.SLASH, parser.PERCENT: + right, err := c.parseExpr(op.Type.BindingPower()) + if err != nil { + return nil, err + } + return &core.BinaryExpr{ + Node: core.NewNode(left.Pos()), + Left: left, + Op: op.Lexeme, + Right: right, + }, nil + + // function call + case parser.LPAREN: + ident, ok := left.(*core.IdentExpr) + if !ok { + return nil, c.errorf(op, "function call requires an identifier on the left, got %T", left) + } + args, err := c.parseArgList() + if err != nil { + return nil, err + } + if _, err := c.expect(parser.RPAREN); err != nil { + return nil, err + } + return &core.CallExpr{Node: ident.Node, Name: ident.Name, Args: args}, nil + + // index access + // path indexing (.field[0]) is handled inside parseEventPath. + case parser.LBRACKET: + index, err := c.parseExpr(parser.BpLowest) + if err != nil { + return nil, err + } + if _, err := c.expect(parser.RBRACKET); err != nil { + return nil, err + } + return &core.IndexExpr{ + Node: core.NewNode(left.Pos()), + Object: left, + Index: index, + }, nil + } + + return nil, c.errorf(op, "unknown infix operator %q", op.Lexeme) +} + +func (c *Compiler) parseIntLit() (core.Expr, error) { + tok := c.advance() + v, err := strconv.ParseInt(tok.Lexeme, 10, 64) + if err != nil { + return nil, c.errorf(tok, "invalid integer literal %q", tok.Lexeme) + } + return &core.IntLit{Node: nodeAt(tok), Value: v}, nil +} + +func (c *Compiler) parseFloatLit() (core.Expr, error) { + tok := c.advance() + v, err := strconv.ParseFloat(tok.Lexeme, 64) + if err != nil { + return nil, c.errorf(tok, "invalid float literal %q", tok.Lexeme) + } + return &core.FloatLit{Node: nodeAt(tok), Value: v}, nil +} + +func (c *Compiler) parseStringLit() (core.Expr, error) { + tok := c.advance() + switch tok.Type { + case parser.LIT_STRING: + // process escape sequences. + v, err := strconv.Unquote(tok.Lexeme) + if err != nil { + return nil, c.errorf(tok, "invalid string literal: %v", err) + } + return &core.StringLit{Node: nodeAt(tok), Value: v}, nil + + case parser.LIT_STRING_RAW: + return &core.StringLit{Node: nodeAt(tok), Value: unwrap(tok.Lexeme, 2)}, nil + } + return nil, c.errorf(tok, "expected string, got %s", tok.Type) +} + +func (c *Compiler) parseUnary() (core.Expr, error) { + op := c.advance() + operand, err := c.parseExpr(parser.BpUnary) + if err != nil { + return nil, err + } + return &core.UnaryExpr{Node: nodeAt(op), Op: op.Lexeme, Operand: operand}, nil +} + +func (c *Compiler) parseGrouped() (core.Expr, error) { + // consume ( + c.advance() + + expr, err := c.parseExpr(parser.BpLowest) + if err != nil { + return nil, err + } + if _, err := c.expect(parser.RPAREN); err != nil { + return nil, err + } + return expr, nil +} + +func (c *Compiler) parseArray() (core.Expr, error) { + // consume [ + start := c.advance() + + var elements []core.Expr + for !c.check(parser.RBRACKET) && !c.atEnd() { + el, err := c.parseExpr(parser.BpLowest) + if err != nil { + return nil, err + } + elements = append(elements, el) + if !c.match(parser.COMMA) { + break + } + } + + if _, err := c.expect(parser.RBRACKET); err != nil { + return nil, err + } + return &core.ArrayExpr{Node: nodeAt(start), Elements: elements}, nil +} + +func (c *Compiler) parseObject() (core.Expr, error) { + // consume { + start := c.advance() + + var pairs []core.KVPair + for !c.check(parser.RBRACE) && !c.atEnd() { + kv, err := c.parseKVPair() + if err != nil { + return nil, err + } + pairs = append(pairs, kv) + if !c.match(parser.COMMA) { + break + } + } + + if _, err := c.expect(parser.RBRACE); err != nil { + return nil, err + } + return &core.ObjectExpr{Node: nodeAt(start), Pairs: pairs}, nil +} + +func (c *Compiler) parseKVPair() (core.KVPair, error) { + tok := c.peek() + + var key string + switch tok.Type { + case parser.LIT_STRING: + t := c.advance() + v, err := strconv.Unquote(t.Lexeme) + if err != nil { + return core.KVPair{}, c.errorf(t, "invalid object key: %v", err) + } + key = v + case parser.LIT_STRING_RAW: + t := c.advance() + key = unwrap(t.Lexeme, 2) + case parser.IDENT: + key = c.advance().Lexeme + default: + return core.KVPair{}, c.errorf(tok, "object key must be a string or identifier, got %s", tok.Type) + } + + if _, err := c.expect(parser.COLON); err != nil { + return core.KVPair{}, err + } + + val, err := c.parseExpr(parser.BpLowest) + if err != nil { + return core.KVPair{}, err + } + + return core.KVPair{Key: key, Value: val}, nil +} + +func (c *Compiler) parseEventPath() (core.Expr, error) { + // consume . + start := c.advance() + + var segments []core.PathSegment + if seg, ok, err := c.tryFieldSegment(); err != nil { + return nil, err + } else if ok { + segments = append(segments, seg) + var err error + segments, err = c.continueSegments(segments) + if err != nil { + return nil, err + } + } + + return &core.PathExpr{Node: nodeAt(start), Root: core.EventRoot, Segments: segments}, nil +} + +func (c *Compiler) parseMetadataPath() (core.Expr, error) { + // consume % + start := c.advance() + + tok := c.peek() + if tok.Type != parser.IDENT { + return nil, c.errorf(tok, "expected metadata field name after %%, got %s", tok.Type) + } + + segments := []core.PathSegment{{Field: c.advance().Lexeme}} + + var err error + segments, err = c.continueSegments(segments) + if err != nil { + return nil, err + } + + return &core.PathExpr{Node: nodeAt(start), Root: core.MetadataRoot, Segments: segments}, nil +} + +// Attempts to read a named path segment. +func (c *Compiler) tryFieldSegment() (core.PathSegment, bool, error) { + switch c.peek().Type { + case parser.IDENT: + return core.PathSegment{Field: c.advance().Lexeme}, true, nil + case parser.LIT_STRING: + t := c.advance() + v, err := strconv.Unquote(t.Lexeme) + if err != nil { + return core.PathSegment{}, false, c.errorf(t, "invalid field name: %v", err) + } + return core.PathSegment{Field: v}, true, nil + case parser.LIT_STRING_RAW: + t := c.advance() + return core.PathSegment{Field: unwrap(t.Lexeme, 2)}, true, nil + } + return core.PathSegment{}, false, nil +} + +// Greedily consumes path continuations: .field and [index]. +func (c *Compiler) continueSegments(segments []core.PathSegment) ([]core.PathSegment, error) { + for { + switch c.peek().Type { + case parser.DOT: + if c.pos-1 >= 0 && c.pos-1 < len(c.tokens) { + dot := c.peek() + prev := c.tokens[c.pos-1] + if dot.StartLine > prev.EndLine { + return segments, nil + } + } + + c.advance() + seg, ok, err := c.tryFieldSegment() + if err != nil { + return nil, err + } + if !ok { + return nil, c.errorf(c.peek(), "expected field name after '.', got %s", c.peek().Type) + } + segments = append(segments, seg) + + case parser.LBRACKET: + c.advance() + index, err := c.parseExpr(parser.BpLowest) + if err != nil { + return nil, err + } + if _, err := c.expect(parser.RBRACKET); err != nil { + return nil, err + } + segments = append(segments, core.PathSegment{Index: index}) + + default: + return segments, nil + } + } +} + +// Parses If expressions (e.g. if condition { ... } else { ... }) +func (c *Compiler) parseIf() (core.Expr, error) { + // consume if + start := c.advance() + + condition, err := c.parseExpr(parser.BpLowest) + if err != nil { + return nil, err + } + + then, err := c.parseBlock() + if err != nil { + return nil, err + } + + var elseBranch []core.Expr + if c.match(parser.KW_ELSE) { + if c.check(parser.KW_IF) { + elseIf, err := c.parseIf() + if err != nil { + return nil, err + } + elseBranch = []core.Expr{elseIf} + } else { + elseBranch, err = c.parseBlock() + if err != nil { + return nil, err + } + } + } + + return &core.IfExpr{ + Node: nodeAt(start), + Condition: condition, + Then: then, + Else: elseBranch, + }, nil +} + +// Parses If block (e.g. { expr; expr; ... }) +// Semicolons between expressions are optional. +func (c *Compiler) parseBlock() ([]core.Expr, error) { + if _, err := c.expect(parser.LBRACE); err != nil { + return nil, err + } + + var exprs []core.Expr + for !c.check(parser.RBRACE) && !c.atEnd() { + e, err := c.parseExpr(parser.BpLowest) + if err != nil { + return nil, err + } + exprs = append(exprs, e) + for c.match(parser.SEMICOLON) { + } + } + + if _, err := c.expect(parser.RBRACE); err != nil { + return nil, err + } + return exprs, nil +} + +func (c *Compiler) parseArgList() ([]core.Argument, error) { + var args []core.Argument + for !c.check(parser.RPAREN) && !c.atEnd() { + arg, err := c.parseArgument() + if err != nil { + return nil, err + } + args = append(args, arg) + if !c.match(parser.COMMA) { + break + } + } + return args, nil +} + +// Parses function arguments: named (key: expr) or positional (expr). +func (c *Compiler) parseArgument() (core.Argument, error) { + if c.peek().Type == parser.IDENT && c.peekAt(1).Type == parser.COLON { + name := c.advance().Lexeme + c.advance() + val, err := c.parseExpr(parser.BpLowest) + if err != nil { + return core.Argument{}, err + } + return core.Argument{Name: name, Value: val}, nil + } + + val, err := c.parseExpr(parser.BpLowest) + if err != nil { + return core.Argument{}, err + } + return core.Argument{Value: val}, nil +} + +// Parses delete expressions (e.g. del .field | del .field.nested[0] | del %meta.key) +// +// Only core.PathExpr is a valid target - anything else is a compile-time error. +func (c *Compiler) parseDel() (core.Expr, error) { + start := c.advance() + + tok := c.peek() + + var pathExpr *core.PathExpr + + switch tok.Type { + case parser.DOT: + raw, err := c.parseEventPath() + if err != nil { + return nil, err + } + pathExpr = raw.(*core.PathExpr) + + case parser.PERCENT: + raw, err := c.parseMetadataPath() + if err != nil { + return nil, err + } + pathExpr = raw.(*core.PathExpr) + + default: + return nil, c.errorf(tok, "del requires a path (.field or %%field), got %s", tok.Type) + } + + return &core.DelExpr{Node: nodeAt(start), Target: pathExpr}, nil +} + +// Parses for expressions (e.g. for i in expr { ... } | for i, item in expr { ... }) +func (c *Compiler) parseFor() (core.Expr, error) { + start := c.advance() + + first, err := c.expect(parser.IDENT) + if err != nil { + return nil, err + } + + var indexName, itemName string + + if c.match(parser.COMMA) { + second, err := c.expect(parser.IDENT) + if err != nil { + return nil, err + } + + if first.Lexeme != "_" { + indexName = first.Lexeme + } + if second.Lexeme != "_" { + itemName = second.Lexeme + } + } else { + indexName = first.Lexeme + } + + if indexName == "" && itemName == "" { + return nil, c.errorf(first, "for loop must bind at least one variable: use 'for i in ...' or 'for i, item in ...'") + } + + if _, err := c.expect(parser.KW_IN); err != nil { + return nil, err + } + + iter, err := c.parseExpr(parser.BpLowest) + if err != nil { + return nil, err + } + + body, err := c.parseBlock() + if err != nil { + return nil, err + } + + return &core.ForExpr{ + Node: nodeAt(start), + Index: indexName, + Item: itemName, + Iter: iter, + Body: body, + }, nil +} + +// isLValue reports whether expr is a valid assignment target. +func isLValue(expr core.Expr) bool { + switch expr.(type) { + case *core.IdentExpr, *core.PathExpr, *core.IndexExpr: + return true + } + return false +} + +func nodeAt(tok parser.Token) core.Node { + return core.NewNode(tok.StartPos()) +} + +// Strips prefixLen bytes from the front and 1 byte from the end. +func unwrap(s string, prefixLen int) string { + if len(s) <= prefixLen+1 { + return "" + } + return s[prefixLen : len(s)-1] +} diff --git a/plugin/action/transform/compiler/validate.go b/plugin/action/transform/compiler/validate.go new file mode 100644 index 000000000..4dcb64c87 --- /dev/null +++ b/plugin/action/transform/compiler/validate.go @@ -0,0 +1,173 @@ +package compiler + +import ( + "fmt" + "regexp" + "time" + + "github.com/ozontech/file.d/plugin/action/transform/core" +) + +// ValidateCalls walks the AST and checks that every function call refers to +// a function that exists in the registry. +// This is a lightweight static check - argument types are validated at runtime +func ValidateCalls(exprs []core.Expr, registry *core.Registry) error { + for _, expr := range exprs { + if err := validateExpr(expr, registry); err != nil { + return err + } + } + return nil +} + +func validateExpr(expr core.Expr, registry *core.Registry) error { + switch e := expr.(type) { + + case *core.CallExpr: + fn, ok := registry.Get(e.Name) + if !ok { + return fmt.Errorf("%s: unknown function %q", e.Pos(), e.Name) + } + if err := validateArgs(e, fn); err != nil { + return err + } + for _, arg := range e.Args { + if err := validateExpr(arg.Value, registry); err != nil { + return err + } + } + case *core.BinaryExpr: + if err := validateExpr(e.Left, registry); err != nil { + return err + } + return validateExpr(e.Right, registry) + case *core.UnaryExpr: + return validateExpr(e.Operand, registry) + case *core.AssignExpr: + return validateExpr(e.Value, registry) + case *core.IndexExpr: + if err := validateExpr(e.Object, registry); err != nil { + return err + } + return validateExpr(e.Index, registry) + case *core.ArrayExpr: + for _, el := range e.Elements { + if err := validateExpr(el, registry); err != nil { + return err + } + } + case *core.ObjectExpr: + seen := make(map[string]bool, len(e.Pairs)) + for _, kv := range e.Pairs { + if seen[kv.Key] { + return fmt.Errorf("%s: duplicate object key %q", e.Pos(), kv.Key) + } + seen[kv.Key] = true + if err := validateExpr(kv.Value, registry); err != nil { + return err + } + } + case *core.IfExpr: + if err := validateExpr(e.Condition, registry); err != nil { + return err + } + if err := ValidateCalls(e.Then, registry); err != nil { + return err + } + return ValidateCalls(e.Else, registry) + case *core.PathExpr: + for _, seg := range e.Segments { + if seg.IsIndex() { + if err := validateExpr(seg.Index, registry); err != nil { + return err + } + } + } + case *core.DelExpr: + for _, seg := range e.Target.Segments { + if seg.IsIndex() { + if err := validateExpr(seg.Index, registry); err != nil { + return err + } + } + } + case *core.ForExpr: + if err := validateExpr(e.Iter, registry); err != nil { + return err + } + return ValidateCalls(e.Body, registry) + case *core.RegexLit: + re, err := regexp.Compile(e.Pattern) + if err != nil { + return fmt.Errorf("%s: invalid regex pattern %q: %w", e.Pos(), e.Pattern, err) + } + e.Compiled = re + case *core.TimestampLit: + layouts := []string{ + time.RFC3339Nano, + time.RFC3339, + "2006-01-02T15:04:05", + "2006-01-02", + } + for _, layout := range layouts { + if t, err := time.Parse(layout, e.Value); err == nil { + e.Parsed = t + return nil + } + } + return fmt.Errorf("%s: cannot parse %q as a timestamp", e.Pos(), e.Value) + } + + return nil +} + +// validateArgs statically checks argument structure against the function's +// parameter list. Only structural issues are checked here — value types +// are validated at runtime since arguments are arbitrary expressions. +func validateArgs(e *core.CallExpr, fn core.Function) error { + params := fn.Params() + + var positionalCount int + named := make(map[string]bool) + + for _, arg := range e.Args { + if arg.Name == "" { + positionalCount++ + continue + } + if named[arg.Name] { + return fmt.Errorf("%s: function %q: duplicate argument %q", + e.Pos(), fn.Name(), arg.Name) + } + named[arg.Name] = true + known := false + for _, p := range params { + if p.Name == arg.Name { + known = true + break + } + } + if !known { + return fmt.Errorf("%s: function %q: unknown argument %q", + e.Pos(), fn.Name(), arg.Name) + } + } + + if positionalCount > len(params) { + return fmt.Errorf("%s: function %q: too many arguments: expected at most %d, got %d", + e.Pos(), fn.Name(), len(params), positionalCount) + } + + for i, p := range params { + if !p.Required { + continue + } + if i < positionalCount || named[p.Name] { + continue + } + return fmt.Errorf("%s: function %q: missing required argument %q", + e.Pos(), fn.Name(), p.Name) + } + + return nil +} diff --git a/plugin/action/transform/core/ast.go b/plugin/action/transform/core/ast.go new file mode 100644 index 000000000..313b45bba --- /dev/null +++ b/plugin/action/transform/core/ast.go @@ -0,0 +1,284 @@ +package core + +import ( + "fmt" + "regexp" + "strings" + "time" +) + +type EvalContext interface { + GetVar(string) (Value, bool) + SetVar(string, Value) + DeleteVar(string) + GetTarget() Target + CallFunc(pos Position, name string, positional []Value, named map[string]Value) (Value, error) +} + +type Position interface { + String() string +} + +type Expr interface { + Pos() Position + Eval(ctx EvalContext) (Value, error) +} + +type Node struct { + pos Position +} + +func NewNode(pos Position) Node { + return Node{pos: pos} +} + +func (n Node) Pos() Position { + return n.pos +} + +type IntLit struct { + Node + Value int64 +} + +type FloatLit struct { + Node + Value float64 +} + +type StringLit struct { + Node + Value string +} + +type BoolLit struct { + Node + Value bool +} + +type NullLit struct { + Node +} + +type RegexLit struct { + Node + Pattern string + Compiled *regexp.Regexp +} + +type TimestampLit struct { + Node + Value string + Parsed time.Time +} + +type IdentExpr struct { + Node + Name string +} + +type PathSegment struct { + Field string + Index Expr +} + +func (s PathSegment) IsField() bool { return s.Field != "" } +func (s PathSegment) IsIndex() bool { return s.Index != nil } + +type PathExpr struct { + Node + Root PathRoot + Segments []PathSegment +} + +type ArrayExpr struct { + Node + Elements []Expr +} + +type KVPair struct { + Key string + Value Expr +} + +type ObjectExpr struct { + Node + Pairs []KVPair +} + +type UnaryExpr struct { + Node + Op string + Operand Expr +} + +type BinaryExpr struct { + Node + Left Expr + Op string + Right Expr +} + +type AssignExpr struct { + Node + Target Expr + Value Expr +} + +type IndexExpr struct { + Node + Object Expr + Index Expr +} + +type Argument struct { + Name string + Value Expr +} + +type CallExpr struct { + Node + Name string + Args []Argument +} + +type IfExpr struct { + Node + Condition Expr + Then []Expr + Else []Expr +} + +type AbortExpr struct { + Node +} + +type DelExpr struct { + Node + Target *PathExpr +} + +type ForExpr struct { + Node + Index string + Item string + Iter Expr + Body []Expr +} + +// DumpAST returns a human-readable representation of the AST. +// Use only for debug +func DumpAST(expr Expr, depth int) string { + pad := strings.Repeat(" ", depth) + p := depth + 1 + + switch e := expr.(type) { + case *IntLit: + return fmt.Sprintf("%sIntLit(%d)", pad, e.Value) + case *FloatLit: + return fmt.Sprintf("%sFloatLit(%g)", pad, e.Value) + case *StringLit: + return fmt.Sprintf("%sStringLit(%q)", pad, e.Value) + case *BoolLit: + return fmt.Sprintf("%sBoolLit(%v)", pad, e.Value) + case *NullLit: + return fmt.Sprintf("%sNullLit", pad) + case *RegexLit: + return fmt.Sprintf("%sRegexLit(%q)", pad, e.Pattern) + case *TimestampLit: + return fmt.Sprintf("%sTimestampLit(%q)", pad, e.Value) + case *IdentExpr: + return fmt.Sprintf("%sIdent(%s)", pad, e.Name) + + case *PathExpr: + root := "." + if e.Root == MetadataRoot { + root = "%" + } + parts := make([]string, 0, len(e.Segments)) + for _, s := range e.Segments { + if s.IsField() { + parts = append(parts, s.Field) + } else { + parts = append(parts, fmt.Sprintf("[%s]", DumpAST(s.Index, 0))) + } + } + return fmt.Sprintf("%sPath(%s%s)", pad, root, strings.Join(parts, ".")) + + case *ArrayExpr: + lines := []string{fmt.Sprintf("%sArray", pad)} + for _, el := range e.Elements { + lines = append(lines, DumpAST(el, p)) + } + return strings.Join(lines, "\n") + + case *ObjectExpr: + lines := []string{fmt.Sprintf("%sObject", pad)} + for _, kv := range e.Pairs { + lines = append(lines, fmt.Sprintf("%s key(%q):", pad, kv.Key)) + lines = append(lines, DumpAST(kv.Value, p+1)) + } + return strings.Join(lines, "\n") + + case *UnaryExpr: + return fmt.Sprintf("%sUnary(%s)\n%s", pad, e.Op, DumpAST(e.Operand, p)) + + case *BinaryExpr: + return fmt.Sprintf("%sBinary(%s)\n%s\n%s", + pad, e.Op, + DumpAST(e.Left, p), + DumpAST(e.Right, p), + ) + + case *AssignExpr: + return fmt.Sprintf("%sAssign\n%s\n%s", + pad, + DumpAST(e.Target, p), + DumpAST(e.Value, p), + ) + + case *IndexExpr: + return fmt.Sprintf("%sIndex\n%s\n%s", + pad, + DumpAST(e.Object, p), + DumpAST(e.Index, p), + ) + + case *CallExpr: + lines := []string{fmt.Sprintf("%sCall(%s)", pad, e.Name)} + for _, arg := range e.Args { + if arg.Name != "" { + lines = append(lines, fmt.Sprintf("%s named(%s:)", pad, arg.Name)) + lines = append(lines, DumpAST(arg.Value, p+1)) + } else { + lines = append(lines, DumpAST(arg.Value, p)) + } + } + return strings.Join(lines, "\n") + + case *IfExpr: + lines := []string{ + fmt.Sprintf("%sIf", pad), + fmt.Sprintf("%s condition:", pad), + DumpAST(e.Condition, p+1), + fmt.Sprintf("%s then:", pad), + } + for _, t := range e.Then { + lines = append(lines, DumpAST(t, p+1)) + } + if len(e.Else) > 0 { + lines = append(lines, fmt.Sprintf("%s else:", pad)) + for _, el := range e.Else { + lines = append(lines, DumpAST(el, p+1)) + } + } + return strings.Join(lines, "\n") + + case *AbortExpr: + return fmt.Sprintf("%sAbort", pad) + case *DelExpr: + return fmt.Sprintf("%sDel\n%s", pad, DumpAST(e.Target, p)) + } + + return fmt.Sprintf("%s", pad, expr) +} diff --git a/plugin/action/transform/core/eval.go b/plugin/action/transform/core/eval.go new file mode 100644 index 000000000..85ce57672 --- /dev/null +++ b/plugin/action/transform/core/eval.go @@ -0,0 +1,599 @@ +package core + +import ( + "errors" + "fmt" + "math" + "time" +) + +var AbortError = errors.New("abort") + +// evalBlock evaluates a sequence of expressions and returns the value of the last one. +// An empty block evaluates to null. +func evalBlock(ctx EvalContext, exprs []Expr) (Value, error) { + var last Value = NullValue{} + for _, expr := range exprs { + val, err := expr.Eval(ctx) + if err != nil { + return NullValue{}, err + } + last = val + } + return last, nil +} + +func (e *IntLit) Eval(_ EvalContext) (Value, error) { + return IntegerValue{V: e.Value}, nil +} + +func (e *FloatLit) Eval(_ EvalContext) (Value, error) { + return FloatValue{V: e.Value}, nil +} + +func (e *StringLit) Eval(_ EvalContext) (Value, error) { + return StringValue{V: e.Value}, nil +} + +func (e *BoolLit) Eval(_ EvalContext) (Value, error) { + return BoolValue{V: e.Value}, nil +} + +func (e *NullLit) Eval(_ EvalContext) (Value, error) { + return NullValue{}, nil +} + +func (e *RegexLit) Eval(_ EvalContext) (Value, error) { + return RegexValue{V: e.Compiled}, nil +} + +func (e *TimestampLit) Eval(_ EvalContext) (Value, error) { + return TimestampValue{V: e.Parsed}, nil +} + +func (e *IdentExpr) Eval(ctx EvalContext) (Value, error) { + if val, ok := ctx.GetVar(e.Name); ok { + return val, nil + } + // nil or error ??? + return NullValue{}, nil +} + +func (e *PathExpr) Eval(ctx EvalContext) (Value, error) { + path, err := e.toRuntimePath(ctx) + if err != nil { + return NullValue{}, err + } + val, err := ctx.GetTarget().Get(path) + return val, err +} + +// toRuntimePath converts a PathExpr (AST) into a Path (runtime) by evaluating +// any dynamic index expressions contained in the segment list. +// +// - .field -> FieldSeg("field") +// - [0] -> IndexSeg(0) +// - ["key"] -> FieldSeg("key") - string key becomes a named field +// - [.dynamic_idx] -> IndexSeg(n) - expression evaluated at runtime +func (e *PathExpr) toRuntimePath(ctx EvalContext) (Path, error) { + segs := make([]Segment, 0, len(e.Segments)) + + for _, s := range e.Segments { + if s.IsField() { + segs = append(segs, FieldSeg(s.Field)) + continue + } + + idxVal, err := s.Index.Eval(ctx) + if err != nil { + return Path{}, err + } + + switch v := idxVal.(type) { + case IntegerValue: + segs = append(segs, IndexSeg(int(v.V))) + case StringValue: + segs = append(segs, FieldSeg(v.V)) + default: + return Path{}, fmt.Errorf( + "%s: path index must be integer or string, got %s", e.Pos(), idxVal.Kind()) + } + } + + return Path{Root: e.Root, Segments: segs}, nil +} + +func (e *ArrayExpr) Eval(ctx EvalContext) (Value, error) { + elements := make([]Value, len(e.Elements)) + for i, el := range e.Elements { + val, err := el.Eval(ctx) + if err != nil { + return NullValue{}, err + } + elements[i] = val + } + return ArrayValue{V: elements}, nil +} + +func (e *ObjectExpr) Eval(ctx EvalContext) (Value, error) { + result := make(map[string]Value, len(e.Pairs)) + for _, kv := range e.Pairs { + val, err := kv.Value.Eval(ctx) + if err != nil { + return NullValue{}, err + } + result[kv.Key] = val + } + return ObjectValue{V: result}, nil +} + +func (e *UnaryExpr) Eval(ctx EvalContext) (Value, error) { + operand, err := e.Operand.Eval(ctx) + if err != nil { + return NullValue{}, err + } + + switch e.Op { + case "!": + return BoolValue{V: !operand.AsBool()}, nil + + case "-": + return evalNegate(e.Pos(), resolve(operand)) + } + + return NullValue{}, fmt.Errorf("%s: unknown unary operator %q", e.Pos(), e.Op) +} + +func evalNegate(pos Position, operand Value) (Value, error) { + switch v := operand.(type) { + case IntegerValue: + return IntegerValue{V: -v.V}, nil + case FloatValue: + return FloatValue{V: -v.V}, nil + } + return NullValue{}, fmt.Errorf("%s: unary minus requires integer or float, got %s", + pos, operand.Kind()) +} + +func (e *BinaryExpr) Eval(ctx EvalContext) (Value, error) { + switch e.Op { + case "&&": + left, err := e.Left.Eval(ctx) + if err != nil { + return NullValue{}, err + } + if !left.AsBool() { + return left, nil + } + return e.Right.Eval(ctx) + + case "||": + left, err := e.Left.Eval(ctx) + if err != nil { + return NullValue{}, err + } + if left.AsBool() { + return left, nil + } + return e.Right.Eval(ctx) + } + + left, err := e.Left.Eval(ctx) + if err != nil { + return NullValue{}, err + } + right, err := e.Right.Eval(ctx) + if err != nil { + return NullValue{}, err + } + + switch e.Op { + case "==": + return BoolValue{V: left.Equal(right)}, nil + case "!=": + return BoolValue{V: !left.Equal(right)}, nil + case "+": + return evalAdd(e.Pos(), resolve(left), resolve(right)) + case "-", "*", "/", "%": + return evalArithmetic(e.Pos(), e.Op, resolve(left), resolve(right)) + case "<", "<=", ">", ">=": + return evalComparison(e.Pos(), e.Op, resolve(left), resolve(right)) + } + + return NullValue{}, fmt.Errorf("%s: unknown binary operator %q", e.Pos(), e.Op) +} + +func (e *AssignExpr) Eval(ctx EvalContext) (Value, error) { + value, err := e.Value.Eval(ctx) + if err != nil { + return NullValue{}, err + } + + switch target := e.Target.(type) { + + case *IdentExpr: + ctx.SetVar(target.Name, value) + return value, nil + case *PathExpr: + path, err := target.toRuntimePath(ctx) + if err != nil { + return NullValue{}, err + } + if err := ctx.GetTarget().Set(path, value); err != nil { + return NullValue{}, fmt.Errorf("%s: %w", e.Pos(), err) + } + return value, nil + case *IndexExpr: + if err := evalIndexAssign(ctx, target, value); err != nil { + return NullValue{}, fmt.Errorf("%s: %w", e.Pos(), err) + } + return value, nil + } + + return NullValue{}, fmt.Errorf("%s: invalid assignment target %T", e.Pos(), e.Target) +} + +// evalIndexAssign handles arr[n] = value and obj["key"] = value +// where the object is a local variable (IdentExpr). +func evalIndexAssign(ctx EvalContext, target *IndexExpr, value Value) error { + ident, ok := target.Object.(*IdentExpr) + if !ok { + return fmt.Errorf("index assignment target must be a local variable, got %T", + target.Object) + } + + current, _ := ctx.GetVar(ident.Name) + + idxVal, err := target.Index.Eval(ctx) + if err != nil { + return err + } + + switch idx := idxVal.(type) { + + case IntegerValue: + // arr[n] = value + arr, ok := current.(ArrayValue) + if !ok { + return fmt.Errorf("cannot use integer index on %s", current.Kind()) + } + resolved := resolveIndex(int(idx.V), len(arr.V)) + if resolved < 0 { + return fmt.Errorf("index %d is out of bounds (len %d)", idx.V, len(arr.V)) + } + newSlice := make([]Value, len(arr.V)) + copy(newSlice, arr.V) + // Grow with nulls if the index exceeds the current length. + for len(newSlice) <= resolved { + newSlice = append(newSlice, NullValue{}) + } + newSlice[resolved] = value + ctx.SetVar(ident.Name, ArrayValue{V: newSlice}) + return nil + + case StringValue: + // obj["key"] = value + obj, ok := current.(ObjectValue) + if !ok { + return fmt.Errorf("cannot use string index on %s", current.Kind()) + } + newMap := make(map[string]Value, len(obj.V)+1) + for k, v := range obj.V { + newMap[k] = v + } + newMap[idx.V] = value + ctx.SetVar(ident.Name, ObjectValue{V: newMap}) + return nil + } + + return fmt.Errorf("index must be integer or string, got %s", idxVal.Kind()) +} + +// Eval resolves arr[n] and obj["key"] on local variables and call results. +func (e *IndexExpr) Eval(ctx EvalContext) (Value, error) { + obj, err := e.Object.Eval(ctx) + if err != nil { + return NullValue{}, err + } + + idx, err := e.Index.Eval(ctx) + if err != nil { + return NullValue{}, err + } + + return evalIndex(e.Pos(), resolve(obj), resolve(idx)) +} + +func evalIndex(pos Position, obj, idx Value) (Value, error) { + switch o := obj.(type) { + case ArrayValue: + i, ok := idx.(IntegerValue) + if !ok { + return NullValue{}, fmt.Errorf( + "%s: array index must be integer, got %s", pos, idx.Kind()) + } + resolved := resolveIndex(int(i.V), len(o.V)) + if resolved < 0 || resolved >= len(o.V) { + // nil or error ??? + return NullValue{}, nil + } + return o.V[resolved], nil + case ObjectValue: + s, ok := idx.(StringValue) + if !ok { + return NullValue{}, fmt.Errorf( + "%s: object index must be string, got %s", pos, idx.Kind()) + } + val, exists := o.V[s.V] + if !exists { + // nil or error ??? + return NullValue{}, nil + } + return val, nil + } + + return NullValue{}, fmt.Errorf("%s: cannot index into %s", pos, obj.Kind()) +} + +func (e *CallExpr) Eval(ctx EvalContext) (Value, error) { + var positional []Value + named := make(map[string]Value) + + for _, arg := range e.Args { + val, err := arg.Value.Eval(ctx) + if err != nil { + return NullValue{}, err + } + + val = resolve(val) + + if arg.Name == "" { + positional = append(positional, val) + } else { + named[arg.Name] = val + } + } + + return ctx.CallFunc(e.Pos(), e.Name, positional, named) +} +func (e *IfExpr) Eval(ctx EvalContext) (Value, error) { + condition, err := e.Condition.Eval(ctx) + if err != nil { + return NullValue{}, err + } + + if condition.AsBool() { + return evalBlock(ctx, e.Then) + } + if len(e.Else) > 0 { + return evalBlock(ctx, e.Else) + } + + return NullValue{}, nil +} + +func (e *AbortExpr) Eval(_ EvalContext) (Value, error) { + return NullValue{}, AbortError +} + +func (e *DelExpr) Eval(ctx EvalContext) (Value, error) { + path, err := e.Target.toRuntimePath(ctx) + if err != nil { + return NullValue{}, err + } + if err := ctx.GetTarget().Delete(path); err != nil { + return NullValue{}, fmt.Errorf("%s: del: %w", e.Pos(), err) + } + return NullValue{}, nil +} + +func (e *ForExpr) Eval(ctx EvalContext) (Value, error) { + iterVal, err := e.Iter.Eval(ctx) + if err != nil { + return NullValue{}, err + } + + arr, ok := resolve(iterVal).(ArrayValue) + if !ok { + return NullValue{}, fmt.Errorf("%s: for loop requires array, got %s", e.Pos(), iterVal.Kind()) + } + + for i, item := range arr.V { + if e.Index != "" { + ctx.SetVar(e.Index, IntegerValue{V: int64(i)}) + } + if e.Item != "" { + ctx.SetVar(e.Item, resolve(item)) + } + + _, err := evalBlock(ctx, e.Body) + if err != nil { + if errors.Is(err, AbortError) { + return NullValue{}, err + } + return NullValue{}, err + } + } + + // clean up loop variables from scope + if e.Index != "" { + ctx.DeleteVar(e.Index) + } + ctx.DeleteVar(e.Item) + + return NullValue{}, nil +} + +// evalAdd handles the "+" operator: +// - string + string -> concatenation +// - int + int -> integer result +// - any numeric mix -> float result +func evalAdd(pos Position, left, right Value) (Value, error) { + if l, ok := left.(StringValue); ok { + r, ok := right.(StringValue) + if !ok { + return NullValue{}, fmt.Errorf( + "%s: operator +: cannot concatenate string with %s", pos, right.Kind()) + } + return StringValue{V: l.V + r.V}, nil + } + + if l, ok := left.(IntegerValue); ok { + if r, ok := right.(IntegerValue); ok { + return IntegerValue{V: l.V + r.V}, nil + } + } + + l, err := ToFloat(left) + if err != nil { + return NullValue{}, fmt.Errorf("%s: operator +: %w", pos, err) + } + r, err := ToFloat(right) + if err != nil { + return NullValue{}, fmt.Errorf("%s: operator +: %w", pos, err) + } + return FloatValue{V: l + r}, nil +} + +func evalArithmetic(pos Position, op string, left, right Value) (Value, error) { + if l, ok := left.(IntegerValue); ok { + if r, ok := right.(IntegerValue); ok { + switch op { + case "-": + return IntegerValue{V: l.V - r.V}, nil + case "*": + return IntegerValue{V: l.V * r.V}, nil + case "/": + if r.V == 0 { + return NullValue{}, fmt.Errorf("%s: operator /: division by zero", pos) + } + return IntegerValue{V: l.V / r.V}, nil + case "%": + if r.V == 0 { + return NullValue{}, fmt.Errorf("%s: operator %%: modulo by zero", pos) + } + return IntegerValue{V: l.V % r.V}, nil + } + } + } + + l, err := ToFloat(left) + if err != nil { + return NullValue{}, fmt.Errorf("%s: operator %s: %w", pos, op, err) + } + r, err := ToFloat(right) + if err != nil { + return NullValue{}, fmt.Errorf("%s: operator %s: %w", pos, op, err) + } + + switch op { + case "-": + return FloatValue{V: l - r}, nil + case "*": + return FloatValue{V: l * r}, nil + case "/": + if r == 0 { + return NullValue{}, fmt.Errorf("%s: operator /: division by zero", pos) + } + return FloatValue{V: l / r}, nil + case "%": + if r == 0 { + return NullValue{}, fmt.Errorf("%s: operator %%: modulo by zero", pos) + } + return FloatValue{V: math.Mod(l, r)}, nil + } + + return NullValue{}, fmt.Errorf("%s: unknown arithmetic operator %q", pos, op) +} + +func evalComparison(pos Position, op string, left, right Value) (Value, error) { + if l, ok := left.(IntegerValue); ok { + if r, ok := right.(IntegerValue); ok { + return BoolValue{V: cmpInts(op, l.V, r.V)}, nil + } + } + + lNum, lErr := ToFloat(left) + rNum, rErr := ToFloat(right) + if lErr == nil && rErr == nil { + return BoolValue{V: cmpFloats(op, lNum, rNum)}, nil + } + + if l, ok := left.(StringValue); ok { + if r, ok := right.(StringValue); ok { + return BoolValue{V: cmpStrings(op, l.V, r.V)}, nil + } + } + + if l, ok := left.(TimestampValue); ok { + if r, ok := right.(TimestampValue); ok { + return BoolValue{V: cmpTimestamps(op, l.V, r.V)}, nil + } + } + + return NullValue{}, fmt.Errorf( + "%s: operator %s: cannot compare %s and %s", pos, op, left.Kind(), right.Kind()) +} + +func cmpInts(op string, l, r int64) bool { + switch op { + case "<": + return l < r + case "<=": + return l <= r + case ">": + return l > r + case ">=": + return l >= r + } + return false +} + +func cmpFloats(op string, l, r float64) bool { + switch op { + case "<": + return l < r + case "<=": + return l <= r + case ">": + return l > r + case ">=": + return l >= r + } + return false +} + +func cmpStrings(op, l, r string) bool { + switch op { + case "<": + return l < r + case "<=": + return l <= r + case ">": + return l > r + case ">=": + return l >= r + } + return false +} + +func cmpTimestamps(op string, l, r time.Time) bool { + switch op { + case "<": + return l.Before(r) + case "<=": + return !l.After(r) + case ">": + return l.After(r) + case ">=": + return !l.Before(r) + } + return false +} + +func resolveIndex(idx, length int) int { + if idx < 0 { + idx = length + idx + } + return idx +} diff --git a/plugin/action/transform/core/function.go b/plugin/action/transform/core/function.go new file mode 100644 index 000000000..cddaab430 --- /dev/null +++ b/plugin/action/transform/core/function.go @@ -0,0 +1,154 @@ +package core + +import ( + "fmt" + "slices" + "strings" +) + +// Describes a single parameter of a built-in function. +type Parameter struct { + // Name is the parameter name as used in named calls: fn(name: value). + Name string + + // Required - if true the caller must provide this argument. + // If false and the argument is omitted, Default is used. + Required bool + + // Default is the value used when the parameter is optional and not provided. + // A nil interface value means "no default" (only valid when Required is false + // and the function handles the missing case itself). + Default Value + + // AcceptedKinds lists the value kinds this parameter accepts. + // An empty slice means any kind is accepted. + AcceptedKinds []ValueKind +} + +// Function is the interface every built-in function must implement. +// +// Lifecycle during a call: +// 1. Interpreter evaluates all argument expressions -> positional []Value + named map[string]Value +// 2. Registry.ResolveArgs validates and maps them to the parameter list -> map[string]Value +// 3. Function.Call receives the resolved map and returns a Value +type Function interface { + // Returns the function name as it appears in source code. + Name() string + + // Returns the ordered list of parameter descriptors. + // Order matters for positional argument binding. + Params() []Parameter + + // Call executes the function with fully-resolved, validated arguments. + // args is keyed by parameter name and always contains every parameter + // that has a value (required args + provided optional args + defaults). + Call(args map[string]Value) (Value, error) +} + +// Registry holds all built-in functions available during program execution. +// It is built once at startup and shared across all Program.Run calls. +type Registry struct { + functions map[string]Function +} + +func NewRegistry() *Registry { + return &Registry{ + functions: make(map[string]Function), + } +} + +func (r *Registry) Register(fn Function) error { + name := fn.Name() + if _, exists := r.functions[name]; exists { + return fmt.Errorf("function %q is already registered", name) + } + r.functions[name] = fn + return nil +} + +func (r *Registry) MustRegister(fn Function) { + if err := r.Register(fn); err != nil { + panic(fmt.Sprintf("transform: %s", err)) + } +} + +func (r *Registry) Get(name string) (Function, bool) { + fn, ok := r.functions[name] + return fn, ok +} + +// Maps evaluated argument values to the function's parameter map. +func (r *Registry) ResolveArgs(fn Function, positional []Value, named map[string]Value) (map[string]Value, error) { + params := fn.Params() + + if len(positional) > len(params) { + return nil, fmt.Errorf( + "function %q: too many arguments: expected at most %d, got %d", + fn.Name(), len(params), len(positional), + ) + } + + resolved := make(map[string]Value, len(params)) + explicit := make(map[string]bool, len(params)) + + for _, p := range params { + if p.Default != nil { + resolved[p.Name] = p.Default + } + } + + for i, val := range positional { + pName := params[i].Name + resolved[pName] = val + explicit[pName] = true + } + + for argName, val := range named { + if !slices.ContainsFunc(params, func(p Parameter) bool { return p.Name == argName }) { + return nil, fmt.Errorf("function %q: unknown argument %q", fn.Name(), argName) + } + if explicit[argName] { + return nil, fmt.Errorf( + "function %q: argument %q provided both positionally and by name", + fn.Name(), argName) + } + resolved[argName] = val + explicit[argName] = true + } + + for _, p := range params { + if p.Required && !explicit[p.Name] { + return nil, fmt.Errorf( + "function %q: missing required argument %q", + fn.Name(), p.Name) + } + } + + for _, p := range params { + if len(p.AcceptedKinds) == 0 { + continue + } + val, ok := resolved[p.Name] + if !ok { + continue + } + if !slices.Contains(p.AcceptedKinds, val.Kind()) { + return nil, fmt.Errorf( + "function %q: argument %q: expected %s, got %s", + fn.Name(), p.Name, + joinKinds(p.AcceptedKinds), + val.Kind(), + ) + } + } + + return resolved, nil +} + +func joinKinds(kinds []ValueKind) string { + parts := make([]string, len(kinds)) + for i, k := range kinds { + parts[i] = k.String() + } + return strings.Join(parts, " or ") +} diff --git a/plugin/action/transform/core/target.go b/plugin/action/transform/core/target.go new file mode 100644 index 000000000..facf18c5e --- /dev/null +++ b/plugin/action/transform/core/target.go @@ -0,0 +1,42 @@ +package core + +type PathRoot int + +const ( + EventRoot PathRoot = iota + MetadataRoot +) + +// Segment is a single resolved step in a runtime path. +// Exactly one mode is active per segment. +type Segment struct { + Field string // .fieldname - active when IsIndex is false + Idx int // [n] - active when IsIndex is true; negative == from end +} + +func FieldSeg(name string) Segment { return Segment{Field: name} } +func IndexSeg(idx int) Segment { return Segment{Idx: idx} } + +func (s Segment) IsField() bool { return s.Field != "" } +func (s Segment) IsIndex() bool { return s.Field == "" } + +// Path is a fully-resolved runtime path produced by the interpreter +// after evaluating any dynamic index expressions inside PathExpr. +type Path struct { + Root PathRoot // EventRoot (.field) or MetadataRoot (%field) + Segments []Segment // empty == event root or metadata root +} + +// Abstraction over the event being processed. +// +// The interpreter accesses all data exclusively through this interface. +type Target interface { + // Get retrieves the value at path. + Get(path Path) (Value, error) + + // Set writes value at path, creating missing nodes as needed. + Set(path Path, value Value) error + + // Delete removes the node at path. No-op when the path does not exist. + Delete(path Path) error +} diff --git a/plugin/action/transform/core/value.go b/plugin/action/transform/core/value.go new file mode 100644 index 000000000..b1bca102b --- /dev/null +++ b/plugin/action/transform/core/value.go @@ -0,0 +1,292 @@ +package core + +import ( + "fmt" + "math" + "regexp" + "strconv" + "strings" + "time" + + insaneJSON "github.com/ozontech/insane-json" +) + +type ValueKind int + +const ( + KindNull ValueKind = iota + KindBool + KindInteger + KindFloat + KindString + KindArray + KindObject + KindRegex + KindTimestamp +) + +var valueStrings []string = []string{"null", "bool", "integer", "float", "string", "array", "object", "regex", "timestamp"} + +func (k ValueKind) String() string { + if int(k) < len(valueStrings) { + return valueStrings[k] + } + return "unknown" +} + +type Value interface { + Kind() ValueKind + AsBool() bool + Equal(Value) bool + String() string +} + +type NullValue struct{} +type BoolValue struct{ V bool } +type IntegerValue struct{ V int64 } +type FloatValue struct{ V float64 } +type StringValue struct{ V string } +type ArrayValue struct{ V []Value } +type ObjectValue struct{ V map[string]Value } +type RegexValue struct{ V *regexp.Regexp } +type TimestampValue struct{ V time.Time } +type JSONNodeValue struct{ N *insaneJSON.Node } + +func (NullValue) Kind() ValueKind { return KindNull } +func (BoolValue) Kind() ValueKind { return KindBool } +func (IntegerValue) Kind() ValueKind { return KindInteger } +func (FloatValue) Kind() ValueKind { return KindFloat } +func (StringValue) Kind() ValueKind { return KindString } +func (ArrayValue) Kind() ValueKind { return KindArray } +func (ObjectValue) Kind() ValueKind { return KindObject } +func (RegexValue) Kind() ValueKind { return KindRegex } +func (TimestampValue) Kind() ValueKind { return KindTimestamp } +func (v JSONNodeValue) Kind() ValueKind { + switch { + case v.N == nil || v.N.IsNull(): + return KindNull + case v.N.IsTrue() || v.N.IsFalse(): + return KindBool + case v.N.IsNumber(): + if _, err := strconv.ParseInt(v.N.AsString(), 10, 64); err == nil { + return KindInteger + } + return KindFloat + case v.N.IsString(): + return KindString + case v.N.IsArray(): + return KindArray + case v.N.IsObject(): + return KindObject + } + return KindNull +} + +func (NullValue) AsBool() bool { return false } +func (v BoolValue) AsBool() bool { return v.V } +func (IntegerValue) AsBool() bool { return true } +func (FloatValue) AsBool() bool { return true } +func (StringValue) AsBool() bool { return true } +func (ArrayValue) AsBool() bool { return true } +func (ObjectValue) AsBool() bool { return true } +func (RegexValue) AsBool() bool { return true } +func (TimestampValue) AsBool() bool { return true } +func (v JSONNodeValue) AsBool() bool { return v.N.AsBool() } + +func (NullValue) Equal(other Value) bool { return other.Kind() == KindNull } + +func (v BoolValue) Equal(other Value) bool { + o, ok := other.(BoolValue) + return ok && v.V == o.V +} + +func (v IntegerValue) Equal(other Value) bool { + switch o := other.(type) { + case IntegerValue: + return v.V == o.V + case FloatValue: + return float64(v.V) == o.V + } + return false +} + +func (v FloatValue) Equal(other Value) bool { + switch o := other.(type) { + case FloatValue: + return v.V == o.V + case IntegerValue: + return v.V == float64(o.V) + } + return false +} + +func (v StringValue) Equal(other Value) bool { + o, ok := other.(StringValue) + return ok && v.V == o.V +} + +func (v ArrayValue) Equal(other Value) bool { + o, ok := other.(ArrayValue) + if !ok || len(v.V) != len(o.V) { + return false + } + for i := range v.V { + if !v.V[i].Equal(o.V[i]) { + return false + } + } + return true +} + +func (v ObjectValue) Equal(other Value) bool { + o, ok := other.(ObjectValue) + if !ok || len(v.V) != len(o.V) { + return false + } + for k, lhs := range v.V { + rhs, exists := o.V[k] + if !exists || !lhs.Equal(rhs) { + return false + } + } + return true +} + +func (v RegexValue) Equal(other Value) bool { + o, ok := other.(RegexValue) + return ok && v.V.String() == o.V.String() +} + +func (v TimestampValue) Equal(other Value) bool { + o, ok := other.(TimestampValue) + return ok && v.V.Equal(o.V) +} + +func (v JSONNodeValue) Equal(other Value) bool { + return v.String() == other.String() +} + +func (NullValue) String() string { return "null" } + +func (v BoolValue) String() string { + if v.V { + return "true" + } + return "false" +} + +func (v IntegerValue) String() string { + return strconv.FormatInt(v.V, 10) +} + +func (v FloatValue) String() string { + f := v.V + switch { + case math.IsInf(f, 1): + return "Infinity" + case math.IsInf(f, -1): + return "-Infinity" + case math.IsNaN(f): + return "NaN" + } + return strconv.FormatFloat(f, 'g', -1, 64) +} + +func (v StringValue) String() string { return v.V } + +func (v ArrayValue) String() string { + parts := make([]string, len(v.V)) + for i, el := range v.V { + parts[i] = display(el) + } + return "[" + strings.Join(parts, ", ") + "]" +} + +func (v ObjectValue) String() string { + parts := make([]string, 0, len(v.V)) + for k, val := range v.V { + parts = append(parts, fmt.Sprintf("%q: %s", k, display(val))) + } + return "{" + strings.Join(parts, ", ") + "}" +} + +func (v RegexValue) String() string { + return "r'" + v.V.String() + "'" +} + +func (v TimestampValue) String() string { + return v.V.Format(time.RFC3339Nano) +} + +func (v JSONNodeValue) String() string { + if v.N == nil { + return "null" + } + return v.N.AsString() +} + +func ToFloat(v Value) (float64, error) { + switch t := v.(type) { + case IntegerValue: + return float64(t.V), nil + case FloatValue: + return t.V, nil + } + return 0, fmt.Errorf("type error: expected integer or float, got %s", v.Kind()) +} + +// wraps StringValue in quotes when used inside array/object formatting. +func display(v Value) string { + if s, ok := v.(StringValue); ok { + return fmt.Sprintf("%q", s.V) + } + return v.String() +} + +func resolve(v Value) Value { + jv, ok := v.(JSONNodeValue) + if !ok { + return v + } + return JsonNodeToValue(jv.N) +} + +// JsonNodeToValue converts a single insaneJSON node to a Value. +// The conversion is recursive for arrays and objects. +func JsonNodeToValue(node *insaneJSON.Node) Value { + if node == nil { + return NullValue{} + } + + switch { + case node.IsNull(): + return NullValue{} + case node.IsTrue() || node.IsFalse(): + return BoolValue{V: node.AsBool()} + case node.IsNumber(): + if i, err := strconv.ParseInt(node.AsString(), 10, 64); err == nil { + return IntegerValue{V: i} + } + f, _ := strconv.ParseFloat(node.AsString(), 64) + return FloatValue{V: f} + case node.IsString(): + return StringValue{V: node.AsString()} + case node.IsArray(): + nodes := node.AsArray() + arr := make([]Value, len(nodes)) + for i, n := range nodes { + arr[i] = JsonNodeToValue(n) + } + return ArrayValue{V: arr} + case node.IsObject(): + fields := node.AsFields() + obj := make(map[string]Value, len(fields)) + for _, field := range fields { + key := field.AsString() + val := node.Dig(key) + obj[key] = JsonNodeToValue(val) + } + return ObjectValue{V: obj} + } + return NullValue{} +} diff --git a/plugin/action/transform/parser/lexer.go b/plugin/action/transform/parser/lexer.go new file mode 100644 index 000000000..338bb1465 --- /dev/null +++ b/plugin/action/transform/parser/lexer.go @@ -0,0 +1,92 @@ +package parser + +import ( + "github.com/timtadh/lexmachine" + "github.com/timtadh/lexmachine/machines" +) + +var ( + globalLexer = NewLexer() +) + +func NewLexer() *lexmachine.Lexer { + l := lexmachine.NewLexer() + + token := func(typ TokenType) lexmachine.Action { + return func(_ *lexmachine.Scanner, m *machines.Match) (any, error) { + return NewToken(typ, m), nil + } + } + + skip := func(_ *lexmachine.Scanner, _ *machines.Match) (any, error) { + return nil, nil + } + + // whitespaces + l.Add([]byte(`[ \t\r\n]+`), skip) + // comments + l.Add([]byte(`#[^\n]*`), skip) + + // literals + // r'\d+' - regex + l.Add([]byte(`r'([^'\\]|\\.)*'`), token(LIT_REGEX)) + // t'2024-01-01T00:00:00Z' - timestamp + l.Add([]byte(`t'[^']*'`), token(LIT_TIMESTAMP)) + // s'C:\new\folder' - raw string + l.Add([]byte(`s'([^'\\]|\\.)*'`), token(LIT_STRING_RAW)) + + // keywords + l.Add([]byte(`if`), token(KW_IF)) + l.Add([]byte(`else`), token(KW_ELSE)) + l.Add([]byte(`true`), token(KW_TRUE)) + l.Add([]byte(`false`), token(KW_FALSE)) + l.Add([]byte(`null`), token(KW_NULL)) + l.Add([]byte(`abort`), token(KW_ABORT)) + l.Add([]byte(`del`), token(KW_DEL)) + l.Add([]byte(`for`), token(KW_FOR)) + l.Add([]byte(`in`), token(KW_IN)) + + // identificators + l.Add([]byte(`[a-zA-Z_][a-zA-Z0-9_]*`), token(IDENT)) + + // numeric literals + // format: 3.14 | 1.5e10 | 1.5e+10 | 1.5e-10 | 1e10 | 1e+10 | 1e-10 + l.Add([]byte(`[0-9]+(\.[0-9]+([eE][+-]?[0-9]+)?|[eE][+-]?[0-9]+)`), token(LIT_FLOAT)) + // integers + l.Add([]byte(`[0-9]+`), token(LIT_INTEGER)) + // string literals + l.Add([]byte(`"([^"\\]|\\.)*"`), token(LIT_STRING)) + + // operators + l.Add([]byte(`&&`), token(OP_AND)) + l.Add([]byte(`\|\|`), token(OP_OR)) + l.Add([]byte(`==`), token(OP_EQ)) + l.Add([]byte(`!=`), token(OP_NEQ)) + l.Add([]byte(`<=`), token(OP_LTE)) + l.Add([]byte(`>=`), token(OP_GTE)) + l.Add([]byte(`=`), token(OP_ASSIGN)) + l.Add([]byte(`<`), token(OP_LT)) + l.Add([]byte(`>`), token(OP_GT)) + + l.Add([]byte(`\+`), token(PLUS)) + l.Add([]byte(`-`), token(MINUS)) + l.Add([]byte(`\*`), token(STAR)) + l.Add([]byte(`/`), token(SLASH)) + l.Add([]byte(`%`), token(PERCENT)) + l.Add([]byte(`!`), token(BANG)) + + l.Add([]byte(`\(`), token(LPAREN)) + l.Add([]byte(`\)`), token(RPAREN)) + l.Add([]byte(`\{`), token(LBRACE)) + l.Add([]byte(`\}`), token(RBRACE)) + l.Add([]byte(`\[`), token(LBRACKET)) + l.Add([]byte(`\]`), token(RBRACKET)) + l.Add([]byte(`,`), token(COMMA)) + l.Add([]byte(`:`), token(COLON)) + l.Add([]byte(`;`), token(SEMICOLON)) + l.Add([]byte(`\.`), token(DOT)) + + _ = l.Compile() + + return l +} diff --git a/plugin/action/transform/parser/parse.go b/plugin/action/transform/parser/parse.go new file mode 100644 index 000000000..2820dbdbb --- /dev/null +++ b/plugin/action/transform/parser/parse.go @@ -0,0 +1,30 @@ +package parser + +import ( + "fmt" + + "github.com/timtadh/lexmachine/machines" +) + +func Parse(input string) ([]Token, error) { + scanner, _ := globalLexer.Scanner([]byte(input)) + + var tokens []Token + for raw, err, eos := scanner.Next(); !eos; raw, err, eos = scanner.Next() { + if err != nil { + if ui, ok := err.(*machines.UnconsumedInput); ok { + return nil, fmt.Errorf( + "unexpected character at (%d:%d): %q", + ui.StartLine, ui.StartColumn, string(ui.Text), + ) + } + return nil, fmt.Errorf("unexpected parse error: %w", err) + } + if raw == nil { + continue + } + tokens = append(tokens, raw.(Token)) + } + + return tokens, nil +} diff --git a/plugin/action/transform/parser/tokens.go b/plugin/action/transform/parser/tokens.go new file mode 100644 index 000000000..ad94722d6 --- /dev/null +++ b/plugin/action/transform/parser/tokens.go @@ -0,0 +1,183 @@ +package parser + +import ( + "fmt" + + "github.com/timtadh/lexmachine/machines" +) + +type TokenType int + +const ( + EOF TokenType = -1 + iota + WHITESPACE + COMMENT + + KW_IF + KW_ELSE + KW_TRUE + KW_FALSE + KW_NULL + KW_ABORT + KW_DEL + KW_FOR + KW_IN + + IDENT + + LIT_FLOAT + LIT_INTEGER + LIT_STRING // "double quoted" + LIT_STRING_RAW // s'\n\n' + LIT_REGEX // r'\d+' + LIT_TIMESTAMP // t'2024-01-01T00:00:00Z' + + OP_AND // && + OP_OR // || + OP_EQ // == + OP_NEQ // != + OP_LTE // <= + OP_GTE // >= + OP_ASSIGN // = + OP_LT // < + OP_GT // > + + LPAREN // ( + RPAREN // ) + LBRACE // { + RBRACE // } + LBRACKET // [ + RBRACKET // ] + + PLUS // + + MINUS // - + STAR // * + SLASH // / + PERCENT // % + BANG // ! + COMMA // , + COLON // : + SEMICOLON // ; + DOT // . +) + +var tokenNames = map[TokenType]string{ + EOF: "EOF", + WHITESPACE: "WHITESPACE", + COMMENT: "COMMENT", + KW_IF: "KW_IF", + KW_ELSE: "KW_ELSE", + KW_TRUE: "KW_TRUE", + KW_FALSE: "KW_FALSE", + KW_NULL: "KW_NULL", + KW_ABORT: "KW_ABORT", + KW_DEL: "KW_DEL", + KW_FOR: "KW_FOR", + KW_IN: "KW_IN", + IDENT: "IDENT", + LIT_FLOAT: "LIT_FLOAT", + LIT_INTEGER: "LIT_INTEGER", + LIT_STRING: "LIT_STRING", + LIT_STRING_RAW: "LIT_STRING_RAW", + LIT_REGEX: "LIT_REGEX", + LIT_TIMESTAMP: "LIT_TIMESTAMP", + OP_AND: "OP_AND", + OP_OR: "OP_OR", + OP_EQ: "OP_EQ", + OP_NEQ: "OP_NEQ", + OP_LTE: "OP_LTE", + OP_GTE: "OP_GTE", + OP_ASSIGN: "OP_ASSIGN", + PLUS: "PLUS", + MINUS: "MINUS", + STAR: "STAR", + SLASH: "SLASH", + PERCENT: "PERCENT", + OP_LT: "OP_LT", + OP_GT: "OP_GT", + BANG: "BANG", + LPAREN: "LPAREN", + RPAREN: "RPAREN", + LBRACE: "LBRACE", + RBRACE: "RBRACE", + LBRACKET: "LBRACKET", + RBRACKET: "RBRACKET", + COMMA: "COMMA", + COLON: "COLON", + SEMICOLON: "SEMICOLON", + DOT: "DOT", +} + +// Binding power is the "gravitational pull" of an infix operator. +// The higher the value, the more tightly the operator binds its operands. +const ( + BpLowest = iota // 0 — expression terminator + BpAssign // 1 — = + BpOr // 2 — || + BpAnd // 3 — && + BpEqual // 4 — == != + BpCompare // 5 — < <= > >= + BpAdd // 6 — + - + BpMul // 7 — * / % + BpUnary // 8 — prefix ! and - (not in BindingPower, used directly) + BpCall // 9 — fn() expr[] +) + +func (t TokenType) String() string { + return tokenNames[t] +} + +func (t TokenType) BindingPower() int { + switch t { + case OP_ASSIGN: + return BpAssign + case OP_OR: + return BpOr + case OP_AND: + return BpAnd + case OP_EQ, OP_NEQ: + return BpEqual + case OP_LT, OP_LTE, OP_GT, OP_GTE: + return BpCompare + case PLUS, MINUS: + return BpAdd + case STAR, SLASH, PERCENT: + return BpMul + case LPAREN, LBRACKET: // fn(args) or expr[index] + return BpCall + } + return BpLowest +} + +type Position struct { + Line int + Column int +} + +func (p Position) String() string { + return fmt.Sprintf("%d:%d", p.Line, p.Column) +} + +type Token struct { + Type TokenType + Lexeme string + StartLine int + StartColumn int + EndLine int + EndColumn int +} + +func (t Token) StartPos() Position { + return Position{t.StartLine, t.StartColumn} +} + +func NewToken(typ TokenType, m *machines.Match) Token { + return Token{ + Type: typ, + Lexeme: string(m.Bytes), + StartLine: m.StartLine, + StartColumn: m.StartColumn, + EndLine: m.EndLine, + EndColumn: m.EndColumn, + } +} diff --git a/plugin/action/transform/runtime/context.go b/plugin/action/transform/runtime/context.go new file mode 100644 index 000000000..5217da15f --- /dev/null +++ b/plugin/action/transform/runtime/context.go @@ -0,0 +1,57 @@ +package runtime + +import ( + "fmt" + + "github.com/ozontech/file.d/plugin/action/transform/core" +) + +// Context carries all runtime state available during expression evaluation. +// +// A single Context is created per Program.Run call and passed down through every Eval call. +type Context struct { + target core.Target + registry *core.Registry + scope map[string]core.Value +} + +func NewContext(target core.Target, registry *core.Registry) *Context { + return &Context{ + target: target, + registry: registry, + scope: make(map[string]core.Value), + } +} + +func (c *Context) GetVar(name string) (core.Value, bool) { + v, ok := c.scope[name] + return v, ok +} + +func (c *Context) SetVar(name string, value core.Value) { + c.scope[name] = value +} + +func (c *Context) DeleteVar(name string) { + delete(c.scope, name) +} + +func (c *Context) GetTarget() core.Target { + return c.target +} + +func (c *Context) CallFunc(pos core.Position, name string, positional []core.Value, named map[string]core.Value) (core.Value, error) { + fn, ok := c.registry.Get(name) + if !ok { + return core.NullValue{}, fmt.Errorf("%s: unknown function %q", pos, name) + } + resolved, err := c.registry.ResolveArgs(fn, positional, named) + if err != nil { + return core.NullValue{}, nil + } + result, err := fn.Call(resolved) + if err != nil { + return core.NullValue{}, nil + } + return result, nil +} diff --git a/plugin/action/transform/runtime/map_target.go b/plugin/action/transform/runtime/map_target.go new file mode 100644 index 000000000..960122eed --- /dev/null +++ b/plugin/action/transform/runtime/map_target.go @@ -0,0 +1,323 @@ +package runtime + +import ( + "fmt" + + "github.com/ozontech/file.d/plugin/action/transform/core" +) + +// MapTarget is the standard in-memory Target. +type MapTarget struct { + event map[string]core.Value + metadata map[string]core.Value +} + +func NewMapTarget() *MapTarget { + return &MapTarget{ + event: make(map[string]core.Value), + metadata: make(map[string]core.Value), + } +} + +func NewMapTargetFrom(event map[string]core.Value) *MapTarget { + t := NewMapTarget() + t.event = event + return t +} + +func (t *MapTarget) Event() map[string]core.Value { + return t.event +} + +func (t *MapTarget) Metadata() map[string]core.Value { + return t.metadata +} + +func (t *MapTarget) rootMap(r core.PathRoot) map[string]core.Value { + if r == core.MetadataRoot { + return t.metadata + } + return t.event +} + +func (t *MapTarget) Get(path core.Path) (core.Value, error) { + root := t.rootMap(path.Root) + + if len(path.Segments) == 0 { + snap := make(map[string]core.Value, len(root)) + for k, v := range root { + snap[k] = v + } + return core.ObjectValue{V: snap}, nil + } + + var current core.Value = core.ObjectValue{V: root} + + for i, seg := range path.Segments { + if seg.IsIndex() { + arr, ok := current.(core.ArrayValue) + if !ok { + return core.NullValue{}, fmt.Errorf( + "segment %d: cannot index %s with integer", i, current.Kind()) + } + idx := resolveIndex(seg.Idx, len(arr.V)) + if idx < 0 || idx >= len(arr.V) { + return core.NullValue{}, nil + } + current = arr.V[idx] + } else { + obj, ok := current.(core.ObjectValue) + if !ok { + return core.NullValue{}, fmt.Errorf( + "segment %d: cannot access field %q on %s", i, seg.Field, current.Kind()) + } + val, exists := obj.V[seg.Field] + if !exists { + return core.NullValue{}, nil + } + current = val + } + } + + return current, nil +} + +func (t *MapTarget) Set(path core.Path, value core.Value) error { + root := t.rootMap(path.Root) + + if len(path.Segments) == 0 { + obj, ok := value.(core.ObjectValue) + if !ok { + return fmt.Errorf( + "cannot assign %s to root path: value must be an object", value.Kind()) + } + for k := range root { + delete(root, k) + } + for k, v := range obj.V { + root[k] = v + } + return nil + } + + return setInMap(root, path.Segments, value) +} + +// setInMap recursively writes value into obj along segs. +func setInMap(obj map[string]core.Value, segs []core.Segment, value core.Value) error { + head, tail := segs[0], segs[1:] + + if head.IsIndex() { + return fmt.Errorf("cannot use integer index [%d] at object level", head.Idx) + } + + if len(tail) == 0 { + obj[head.Field] = value + return nil + } + + existing := obj[head.Field] + + if tail[0].IsIndex() { + // index -> node must be an array. + var arr []core.Value + if a, ok := existing.(core.ArrayValue); ok { + arr = make([]core.Value, len(a.V)) + copy(arr, a.V) + } + newArr, err := setInArray(arr, tail, value) + if err != nil { + return fmt.Errorf(".%s: %w", head.Field, err) + } + obj[head.Field] = core.ArrayValue{V: newArr} + } else { + // field -> node must be an object. + var child map[string]core.Value + if o, ok := existing.(core.ObjectValue); ok { + child = make(map[string]core.Value, len(o.V)) + for k, v := range o.V { + child[k] = v + } + } else { + child = make(map[string]core.Value) + } + if err := setInMap(child, tail, value); err != nil { + return fmt.Errorf(".%s: %w", head.Field, err) + } + obj[head.Field] = core.ObjectValue{V: child} + } + + return nil +} + +// setInArray recursively writes value into arr along segs. +func setInArray(arr []core.Value, segs []core.Segment, value core.Value) ([]core.Value, error) { + head, tail := segs[0], segs[1:] + + if !head.IsIndex() { + return nil, fmt.Errorf("cannot access field .%s on array", head.Field) + } + + idx := resolveIndex(head.Idx, len(arr)) + if idx < 0 { + return nil, fmt.Errorf("index %d is out of bounds", head.Idx) + } + + // Grow with nulls if the index exceeds the current length. + for len(arr) <= idx { + arr = append(arr, core.NullValue{}) + } + + if len(tail) == 0 { + arr[idx] = value + return arr, nil + } + + existing := arr[idx] + + if tail[0].IsIndex() { + var child []core.Value + if a, ok := existing.(core.ArrayValue); ok { + child = make([]core.Value, len(a.V)) + copy(child, a.V) + } + newChild, err := setInArray(child, tail, value) + if err != nil { + return nil, fmt.Errorf("[%d]: %w", head.Idx, err) + } + arr[idx] = core.ArrayValue{V: newChild} + } else { + var child map[string]core.Value + if o, ok := existing.(core.ObjectValue); ok { + child = make(map[string]core.Value, len(o.V)) + for k, v := range o.V { + child[k] = v + } + } else { + child = make(map[string]core.Value) + } + if err := setInMap(child, tail, value); err != nil { + return nil, fmt.Errorf("[%d]: %w", head.Idx, err) + } + arr[idx] = core.ObjectValue{V: child} + } + + return arr, nil +} + +func (t *MapTarget) Delete(path core.Path) error { + root := t.rootMap(path.Root) + + if len(path.Segments) == 0 { + for k := range root { + delete(root, k) + } + return nil + } + + return deleteFromMap(root, path.Segments) +} + +func deleteFromMap(obj map[string]core.Value, segs []core.Segment) error { + head, tail := segs[0], segs[1:] + + if head.IsIndex() { + return fmt.Errorf("cannot use integer index [%d] at object level", head.Idx) + } + + if len(tail) == 0 { + delete(obj, head.Field) + return nil + } + + existing, ok := obj[head.Field] + if !ok { + return nil + } + + if tail[0].IsIndex() { + a, ok := existing.(core.ArrayValue) + if !ok { + return nil + } + arr := make([]core.Value, len(a.V)) + copy(arr, a.V) + newArr, err := deleteFromArray(arr, tail) + if err != nil { + return fmt.Errorf(".%s: %w", head.Field, err) + } + obj[head.Field] = core.ArrayValue{V: newArr} + } else { + o, ok := existing.(core.ObjectValue) + if !ok { + return nil + } + child := make(map[string]core.Value, len(o.V)) + for k, v := range o.V { + child[k] = v + } + if err := deleteFromMap(child, tail); err != nil { + return fmt.Errorf(".%s: %w", head.Field, err) + } + obj[head.Field] = core.ObjectValue{V: child} + } + + return nil +} + +func deleteFromArray(arr []core.Value, segs []core.Segment) ([]core.Value, error) { + head, tail := segs[0], segs[1:] + + if !head.IsIndex() { + return arr, fmt.Errorf("cannot access field .%s on array", head.Field) + } + + idx := resolveIndex(head.Idx, len(arr)) + if idx < 0 || idx >= len(arr) { + return arr, nil + } + + if len(tail) == 0 { + return append(arr[:idx], arr[idx+1:]...), nil + } + + existing := arr[idx] + + if tail[0].IsIndex() { + a, ok := existing.(core.ArrayValue) + if !ok { + return arr, nil + } + child := make([]core.Value, len(a.V)) + copy(child, a.V) + newChild, err := deleteFromArray(child, tail) + if err != nil { + return nil, fmt.Errorf("[%d]: %w", head.Idx, err) + } + arr[idx] = core.ArrayValue{V: newChild} + } else { + o, ok := existing.(core.ObjectValue) + if !ok { + return arr, nil + } + child := make(map[string]core.Value, len(o.V)) + for k, v := range o.V { + child[k] = v + } + if err := deleteFromMap(child, tail); err != nil { + return nil, fmt.Errorf("[%d]: %w", head.Idx, err) + } + arr[idx] = core.ObjectValue{V: child} + } + + return arr, nil +} + +// resolveIndex maps a possibly-negative index to an absolute position. +// -1 -> last element, -2 -> second to last, etc. +func resolveIndex(idx, length int) int { + if idx < 0 { + idx = length + idx + } + return idx +} diff --git a/plugin/action/transform/runtime/root_target.go b/plugin/action/transform/runtime/root_target.go new file mode 100644 index 000000000..8ebdf0122 --- /dev/null +++ b/plugin/action/transform/runtime/root_target.go @@ -0,0 +1,228 @@ +package runtime + +import ( + "fmt" + "strconv" + "strings" + + "github.com/ozontech/file.d/plugin/action/transform/core" + insaneJSON "github.com/ozontech/insane-json" +) + +type RootTarget struct { + Root *insaneJSON.Root + SourceName string + metadata map[string]string + + pathBuffer []string +} + +func NewRootTarget(root *insaneJSON.Root, sourceName string, metadata map[string]string) *RootTarget { + return &RootTarget{ + Root: root, + SourceName: sourceName, + metadata: metadata, + + pathBuffer: make([]string, 0), + } +} + +func (t *RootTarget) Get(path core.Path) (core.Value, error) { + if path.Root == core.MetadataRoot { + return t.getMetadata(path) + } + + if len(path.Segments) == 0 { + return core.JsonNodeToValue(t.Root.Node), nil + } + + t.pathBuffer = toInsaneJSONPath(path.Segments, t.pathBuffer) + node := t.Root.Dig(t.pathBuffer...) + if node == nil { + return core.NullValue{}, nil + } + + return core.JSONNodeValue{N: node}, nil +} + +func (t *RootTarget) Set(path core.Path, value core.Value) error { + if path.Root == core.MetadataRoot { + return t.setMetadata(path, value) + } + + if len(path.Segments) == 0 { + return fmt.Errorf("set: cannot replace event root") + } + + t.pathBuffer = toInsaneJSONPath(path.Segments[:len(path.Segments)-1], t.pathBuffer) + parent := t.Root.Dig(t.pathBuffer...) + if parent == nil { + return nil + } + + encoded, err := valueToJSON(value) + if err != nil { + return fmt.Errorf("set %s: %w", formatSegments(path.Segments), err) + } + + leaf := path.Segments[len(path.Segments)-1] + if leaf.IsIndex() { + arr := parent.AsArray() + idx := resolveIndex(leaf.Idx, len(arr)) + if idx < 0 || idx >= len(arr) { + return fmt.Errorf("set: index %d out of bounds", leaf.Idx) + } + node := arr[idx] + node.MutateToJSON(t.Root, encoded) + } else { + existing := parent.Dig(leaf.Field) + if existing == nil { + parent.AddFieldNoAlloc(t.Root, leaf.Field).MutateToJSON(t.Root, encoded) + } else { + existing.MutateToJSON(t.Root, encoded) + } + } + + return nil +} + +func (t *RootTarget) Delete(path core.Path) error { + if path.Root == core.MetadataRoot { + return t.deleteMetadata(path) + } + + if len(path.Segments) == 0 { + return fmt.Errorf("delete: cannot delete event root") + } + + t.pathBuffer = toInsaneJSONPath(path.Segments, t.pathBuffer) + node := t.Root.Dig(t.pathBuffer...) + if node == nil { + return nil + } + + node.Suicide() + + return nil +} + +func (t *RootTarget) getMetadata(path core.Path) (core.Value, error) { + if len(path.Segments) == 0 { + obj := make(map[string]core.Value, len(t.metadata)) + for k, v := range t.metadata { + obj[k] = core.StringValue{V: v} + } + return core.ObjectValue{V: obj}, nil + } + + if len(path.Segments) != 1 || !path.Segments[0].IsField() { + return core.NullValue{}, fmt.Errorf("metadata path must be a single field name") + } + + key := path.Segments[0].Field + val, ok := t.metadata[key] + if !ok { + return core.NullValue{}, nil + } + return core.StringValue{V: val}, nil +} + +func (t *RootTarget) setMetadata(path core.Path, value core.Value) error { + if len(path.Segments) != 1 || !path.Segments[0].IsField() { + return fmt.Errorf("metadata path must be a single field name") + } + s, ok := value.(core.StringValue) + if !ok { + return fmt.Errorf("metadata values must be strings, got %s", value.Kind()) + } + t.metadata[path.Segments[0].Field] = s.V + return nil +} + +func (t *RootTarget) deleteMetadata(path core.Path) error { + if len(path.Segments) != 1 || !path.Segments[0].IsField() { + return fmt.Errorf("metadata path must be a single field name") + } + delete(t.metadata, path.Segments[0].Field) + return nil +} + +func toInsaneJSONPath(segments []core.Segment, pathBuffer []string) []string { + lseg := len(segments) + lpb := len(pathBuffer) + + if lpb < lseg { + pathBuffer = append(pathBuffer, make([]string, lseg-lpb)...) + } else { + pathBuffer = pathBuffer[:lseg] + } + + for i, seg := range segments { + if seg.IsField() { + pathBuffer[i] = seg.Field + continue + } + pathBuffer[i] = strconv.Itoa(seg.Idx) + } + + return pathBuffer +} + +// valueToJSON serialises a core.Value to a JSON string. +func valueToJSON(v core.Value) (string, error) { + switch val := v.(type) { + case core.NullValue: + return "null", nil + case core.BoolValue: + if val.V { + return "true", nil + } + return "false", nil + case core.IntegerValue: + return strconv.FormatInt(val.V, 10), nil + case core.FloatValue: + return strconv.FormatFloat(val.V, 'f', -1, 64), nil + case core.StringValue: + return strconv.Quote(val.V), nil + case core.ArrayValue: + parts := make([]string, len(val.V)) + for i, el := range val.V { + s, err := valueToJSON(el) + if err != nil { + return "", err + } + parts[i] = s + } + return "[" + strings.Join(parts, ",") + "]", nil + case core.ObjectValue: + parts := make([]string, 0, len(val.V)) + for k, el := range val.V { + s, err := valueToJSON(el) + if err != nil { + return "", err + } + parts = append(parts, strconv.Quote(k)+":"+s) + } + return "{" + strings.Join(parts, ",") + "}", nil + case core.JSONNodeValue: + node := v.(core.JSONNodeValue).N + if node == nil { + return "null", nil + } + + return node.EncodeToString(), nil + } + return "", fmt.Errorf("cannot serialise %s to JSON", v.Kind()) +} + +func formatSegments(segs []core.Segment) string { + var b strings.Builder + for _, s := range segs { + if s.IsIndex() { + fmt.Fprintf(&b, "[%d]", s.Idx) + } else { + fmt.Fprintf(&b, ".%s", s.Field) + } + } + return b.String() +} diff --git a/plugin/action/transform/stdlib/upcase.go b/plugin/action/transform/stdlib/upcase.go new file mode 100644 index 000000000..29fe9c113 --- /dev/null +++ b/plugin/action/transform/stdlib/upcase.go @@ -0,0 +1,26 @@ +package stdlib + +import ( + "strings" + + "github.com/ozontech/file.d/plugin/action/transform/core" +) + +type Upcase struct{} + +func (Upcase) Name() string { return "upcase" } + +func (Upcase) Params() []core.Parameter { + return []core.Parameter{ + { + Name: "value", + Required: true, + AcceptedKinds: []core.ValueKind{core.KindString}, + }, + } +} + +func (Upcase) Call(args map[string]core.Value) (core.Value, error) { + val := args["value"].(core.StringValue) + return core.StringValue{V: strings.ToUpper(val.V)}, nil +} diff --git a/plugin/action/transform/transform.go b/plugin/action/transform/transform.go new file mode 100644 index 000000000..8b9ac024c --- /dev/null +++ b/plugin/action/transform/transform.go @@ -0,0 +1,102 @@ +package transform + +import ( + "errors" + "fmt" + + "github.com/ozontech/file.d/fd" + "github.com/ozontech/file.d/pipeline" + "github.com/ozontech/file.d/plugin/action/transform/compiler" + "github.com/ozontech/file.d/plugin/action/transform/core" + "github.com/ozontech/file.d/plugin/action/transform/runtime" + "github.com/ozontech/file.d/plugin/action/transform/stdlib" + "go.uber.org/zap" +) + +var ( + compilerCache = map[string]*compiler.Compiler{} +) + +/*{ introduction +}*/ + +type Plugin struct { + config *Config + registry *core.Registry + expressions []core.Expr + logger *zap.Logger + pluginController pipeline.ActionPluginController +} + +// ! config-params +// ^ config-params +type Config struct { + // > @3@4@5@6 + // > + // > Transform plugin source code. + Source string `json:"source"` // * +} + +func init() { + fd.DefaultPluginRegistry.RegisterAction(&pipeline.PluginStaticInfo{ + Type: "transform", + Factory: factory, + }) +} + +func factory() (pipeline.AnyPlugin, pipeline.AnyConfig) { + return &Plugin{}, &Config{} +} + +func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.ActionPluginParams) { + p.config = config.(*Config) + p.logger = params.Logger.Desugar() + p.pluginController = params.Controller + + p.registry = core.NewRegistry() + p.registry.MustRegister(stdlib.Upcase{}) + + var err error + cacheKey := fmt.Sprintf("%s_%d", params.PipelineName, params.Index) + c, ok := compilerCache[cacheKey] + if !ok { + p.logger.Info("create compiler") + c, err = compiler.NewCompiler(p.config.Source) + if err != nil { + p.logger.Fatal("parsing error", zap.Error(err)) + } + compilerCache[cacheKey] = c + } + + exprs, err := c.Compile() + if err != nil { + p.logger.Fatal("compilation error", zap.Error(err)) + } + + if err := compiler.ValidateCalls(exprs, p.registry); err != nil { + p.logger.Fatal("validation error", zap.Error(err)) + } + + p.expressions = exprs +} + +func (p *Plugin) Stop() {} + +func (p *Plugin) Do(event *pipeline.Event) pipeline.ActionResult { + target := runtime.NewRootTarget(event.Root, event.SourceName, nil) + ctx := runtime.NewContext(target, p.registry) + + for _, expr := range p.expressions { + _, err := expr.Eval(ctx) + if err != nil { + if errors.Is(err, core.AbortError) { + p.logger.Debug("transform program aborted") + return pipeline.ActionPass + } + p.logger.Error("transform runtime error", zap.String("position", expr.Pos().String()), zap.Error(err)) + return pipeline.ActionPass + } + } + + return pipeline.ActionPass +} diff --git a/plugin/action/transform/transform_test.go b/plugin/action/transform/transform_test.go new file mode 100644 index 000000000..08b764fb6 --- /dev/null +++ b/plugin/action/transform/transform_test.go @@ -0,0 +1,415 @@ +package transform + +import ( + "sync" + "testing" + + "github.com/ozontech/file.d/cfg" + "github.com/ozontech/file.d/pipeline" + "github.com/ozontech/file.d/test" + insaneJSON "github.com/ozontech/insane-json" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +type eventCase struct { + in string + fields map[string]string +} + +func TestLanguage(t *testing.T) { + tests := []struct { + name string + source string + events []eventCase + }{ + { + name: "assign", + source: `.res = "hello"`, + events: []eventCase{ + { + in: `{"x":1}`, + fields: map[string]string{"res": "hello"}, + }, + }, + }, + { + name: "literals", + source: ` + .str = "hello" + .raw = s'no\escape' + .num = 42 + .flt = 3.14 + .bool = true + .nl = null + `, + events: []eventCase{ + { + in: `{"x":1}`, + fields: map[string]string{ + "str": "hello", + "raw": `no\escape`, + "num": "42", + "flt": "3.14", + "bool": "true", + "nl": "null", + }, + }, + }, + }, { + name: "arithmetic", + source: ` + .add = .a + .b + .sub = .a - .b + .mul = .a * .b + .div = .a / .b + .mod = .a % .b + .conc = .s + "_suffix" + `, + events: []eventCase{ + { + in: `{"a":10,"b":3,"s":"hello"}`, + fields: map[string]string{ + "add": "13", + "sub": "7", + "mul": "30", + "div": "3", + "mod": "1", + "conc": "hello_suffix", + }, + }, + }, + }, + { + name: "comparison", + source: ` + .gt = .a > .b + .lt = .a < .b + .gte = .a >= .b + .lte = .a <= .b + .eq = .a == .b + .neq = .a != .b + .seq = .s == "hello" + `, + events: []eventCase{ + { + in: `{"a":10,"b":3,"s":"hello"}`, + fields: map[string]string{ + "gt": "true", + "lt": "false", + "gte": "true", + "lte": "false", + "eq": "false", + "neq": "true", + "seq": "true", + }, + }, + }, + }, + { + name: "logical", + source: ` + .and = .a && .b + .or = .b || .c + .not = !.c + `, + events: []eventCase{ + { + in: `{"a":true,"b":false,"c":false}`, + fields: map[string]string{ + "and": "false", + "or": "false", + "not": "true", + }, + }, + }, + }, + + { + name: "if_else", + source: ` + if .status >= 500 { + .severity = "critical" + } else if .status >= 400 { + .severity = "warning" + } else { + .severity = "ok" + } + `, + events: []eventCase{ + { + in: `{"status":503}`, + fields: map[string]string{"severity": "critical"}, + }, + { + in: `{"status":404}`, + fields: map[string]string{"severity": "warning"}, + }, + { + in: `{"status":200}`, + fields: map[string]string{"severity": "ok"}, + }, + }, + }, + + { + name: "abort", + source: ` + if .drop == true { + abort + } + .processed = true + `, + events: []eventCase{ + { + in: `{"drop":true}`, + fields: map[string]string{"processed": ""}, + }, + { + in: `{"drop":false}`, + fields: map[string]string{"processed": "true"}, + }, + }, + }, + { + name: "path", + source: ` + .user.role = "admin" + .tags[0] = "first" + idx = 1 + .tags[idx] = "second" + `, + events: []eventCase{ + { + in: `{"user":{},"tags":["",""]}`, + fields: map[string]string{ + "user.role": "admin", + "tags.0": "first", + "tags.1": "second", + }, + }, + }, + }, + + { + name: "array", + source: ` + arr = [1, 2, 3] + .first = arr[0] + .last = arr[-1] + arr[0] = 99 + .modified = arr[0] + `, + events: []eventCase{ + { + in: `{"x":1}`, + fields: map[string]string{ + "first": "1", + "last": "3", + "modified": "99", + }, + }, + }, + }, + + { + name: "object", + source: ` + obj = {"a": 1, "b": 2} + .va = obj["a"] + .vb = obj["b"] + `, + events: []eventCase{ + { + in: `{"x":1}`, + fields: map[string]string{ + "va": "1", + "vb": "2", + }, + }, + }, + }, + + { + name: "for_index", + source: ` + for i in .items { + if .items[i]["role"] == "admin" { + .items[i]["privileged"] = true + } + } + `, + events: []eventCase{ + { + in: `{"items":[{"role":"admin"},{"role":"user"}]}`, + fields: map[string]string{ + "items.0.privileged": "true", + "items.1.privileged": "", + }, + }, + }, + }, + { + name: "for_index_and_item", + source: ` + for i, item in .items { + if item["role"] == "admin" { + .items[i]["privileged"] = true + } + } + `, + events: []eventCase{ + { + in: `{"items":[{"role":"admin"},{"role":"user"}]}`, + fields: map[string]string{ + "items.0.privileged": "true", + "items.1.privileged": "", + }, + }, + }, + }, + + { + name: "for_blank_index", + source: ` + for _, item in .tags { + .count = .count + 1 + } + `, + events: []eventCase{ + { + in: `{"tags":["a","b","c"],"count":0}`, + fields: map[string]string{"count": "3"}, + }, + }, + }, + + { + name: "delete", + source: ` + del .secret + del .user.password + `, + events: []eventCase{ + { + in: `{"secret":"s3cr3t","user":{"name":"user321","password":"123"}}`, + fields: map[string]string{ + "secret": "", + "user.name": "user321", + "user.password": "", + }, + }, + { + in: `{"x":1}`, + fields: map[string]string{ + "secret": "", + "x": "1", + }, + }, + }, + }, + + { + name: "nested", + source: ` + if .level == "error" || .level == "fatal" { + .severity = "high" + } else { + .severity = "low" + } + + for i, item in .errors { + if item["code"] >= 500 { + .errors[i]["critical"] = true + } + } + + del .internal + .processed = true + `, + events: []eventCase{ + { + in: `{"level":"error","errors":[{"code":503},{"code":404}],"internal":"secret"}`, + fields: map[string]string{ + "severity": "high", + "errors.0.critical": "true", + "errors.1.critical": "", + "internal": "", + "processed": "true", + }, + }, + { + in: `{"level":"info","errors":[{"code":200}],"internal":"secret"}`, + fields: map[string]string{ + "severity": "low", + "errors.0.critical": "", + "internal": "", + "processed": "true", + }, + }, + }, + }, + { + name: "func_upcase", + source: ` + .level = upcase(.level) + name = upcase(.user.name) + .user.name = name + `, + events: []eventCase{ + { + in: `{"level":"info","user":{"name":"user321","password":"123"}}`, + fields: map[string]string{ + "level": "INFO", + "user.name": "USER321", + }, + }, + }, + }, + } + + for _, tc := range tests { + tc := tc + t.Run(tc.name, func(t *testing.T) { + config := test.NewConfig(&Config{Source: tc.source}, nil) + p, input, output := test.NewPipelineMock( + test.NewActionPluginStaticInfo(factory, config, pipeline.MatchModeAnd, nil, false), + "name", + ) + + wg := &sync.WaitGroup{} + outEvents := make([]string, 0, len(tc.events)) + + output.SetOutFn(func(e *pipeline.Event) { + outEvents = append(outEvents, e.Root.EncodeToString()) + wg.Done() + }) + + wg.Add(len(tc.events)) + for _, ev := range tc.events { + input.In(0, "test.log", test.NewOffset(0), []byte(ev.in)) + } + wg.Wait() + p.Stop() + + require.Equal(t, len(tc.events), len(outEvents), "wrong number of output events") + + root := insaneJSON.Spawn() + defer insaneJSON.Release(root) + + for i, ev := range tc.events { + err := root.DecodeString(outEvents[i]) + require.NoError(t, err, "event %d: failed to decode output JSON", i) + + for field, want := range ev.fields { + node := root.Dig(cfg.ParseFieldSelector(field)...) + got := "" + if node != nil { + got = node.AsString() + } + assert.Equal(t, want, got, "event %d: field %q", i, field) + } + } + }) + } +}