diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkCalciteSqlValidator.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkCalciteSqlValidator.java index a65cb882a4325..77a1634fa0bc2 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkCalciteSqlValidator.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkCalciteSqlValidator.java @@ -27,9 +27,9 @@ import org.apache.flink.table.catalog.Column; import org.apache.flink.table.catalog.ResolvedSchema; import org.apache.flink.table.data.TimestampData; +import org.apache.flink.table.functions.FunctionKind; import org.apache.flink.table.planner.catalog.CatalogSchemaModel; import org.apache.flink.table.planner.catalog.CatalogSchemaTable; -import org.apache.flink.table.planner.functions.sql.ml.SqlMLTableFunction; import org.apache.flink.table.planner.plan.FlinkCalciteCatalogReader; import org.apache.flink.table.planner.plan.utils.FlinkRexUtil; import org.apache.flink.table.planner.utils.ShortcutUtils; @@ -65,6 +65,8 @@ import org.apache.calcite.sql.SqlUtil; import org.apache.calcite.sql.SqlWindowTableFunction; import org.apache.calcite.sql.parser.SqlParserPos; +import org.apache.calcite.sql.type.SqlOperandMetadata; +import org.apache.calcite.sql.type.SqlOperandTypeChecker; import org.apache.calcite.sql.type.SqlTypeUtil; import org.apache.calcite.sql.validate.DelegatingScope; import org.apache.calcite.sql.validate.IdentifierNamespace; @@ -92,7 +94,6 @@ import java.util.Optional; import java.util.Set; import java.util.stream.Collectors; -import java.util.stream.Stream; import static org.apache.calcite.sql.type.SqlTypeName.DECIMAL; import static org.apache.flink.table.expressions.resolver.lookups.FieldReferenceLookup.includeExpandedColumn; @@ -343,7 +344,7 @@ protected void addToSelectList( final Column column = resolvedSchema.getColumn(columnName).orElse(null); if (qualified.suffix().size() == 1 && column != null) { if (includeExpandedColumn(column, columnExpansionStrategies) - || declaredDescriptorColumn(scope, column)) { + || isDeclaredOnTimeColumn(scope, column)) { super.addToSelectList( list, aliases, fieldList, exp, scope, includeSystemVars); } @@ -360,15 +361,16 @@ protected void addToSelectList( protected @PolyNull SqlNode performUnconditionalRewrites( @PolyNull SqlNode node, boolean underFrom) { - // Special case for window TVFs like: - // TUMBLE(TABLE t, DESCRIPTOR(metadata_virtual), INTERVAL '1' MINUTE)) or - // SESSION(TABLE t PARTITION BY a, DESCRIPTOR(metadata_virtual), INTERVAL '1' MINUTE)) + // Capture table arguments early: + // TUMBLE(TABLE t, DESCRIPTOR(metadata_virtual), INTERVAL '1' MINUTE) or + // SESSION(TABLE t PARTITION BY a, DESCRIPTOR(metadata_virtual), INTERVAL '1' MINUTE) + // MyPtf(in => TABLE t PARTITION BY a, on_time => DESCRIPTOR(metadata_virtual)) // // "TABLE t" is translated into an implicit "SELECT * FROM t". This would ignore columns - // that are not expanded by default. However, the descriptor explicitly states the need - // for this column. Therefore, explicit table expressions (for window TVFs at most one) - // are captured before rewriting and replaced with a "marker" SqlSelect that contains the - // descriptor information. The "marker" SqlSelect is considered during column expansion. + // that are not expanded by default. However, the on_time descriptor explicitly states the + // need for time columns. Therefore, explicit table expressions are captured before + // rewriting and replaced with a "marker" SqlSelect that contains the descriptor + // information. The "marker" SqlSelect is considered during column expansion. final List tableArgs = getTableOperands(node); final SqlNode rewritten = super.performUnconditionalRewrites(node, underFrom); @@ -376,55 +378,54 @@ protected void addToSelectList( if (!(node instanceof SqlBasicCall)) { return rewritten; } + final SqlBasicCall call = (SqlBasicCall) node; - final SqlOperator operator = call.getOperator(); + // Special case for MODEL if (node instanceof SqlExplicitModelCall) { // Convert it so that model can be accessed in planner. SqlExplicitModelCall // from parser can't access model. - SqlExplicitModelCall modelCall = (SqlExplicitModelCall) node; - SqlIdentifier modelIdentifier = modelCall.getModelIdentifier(); - FlinkCalciteCatalogReader catalogReader = + final SqlExplicitModelCall modelCall = (SqlExplicitModelCall) node; + final SqlIdentifier modelIdentifier = modelCall.getModelIdentifier(); + final FlinkCalciteCatalogReader catalogReader = (FlinkCalciteCatalogReader) getCatalogReader(); - CatalogSchemaModel model = catalogReader.getModel(modelIdentifier.names); + final CatalogSchemaModel model = catalogReader.getModel(modelIdentifier.names); if (model != null) { return new SqlModelCall(modelCall, model); } } - // TODO (FLINK-37819): add test for SqlMLTableFunction - if (operator instanceof SqlWindowTableFunction || operator instanceof SqlMLTableFunction) { - if (tableArgs.stream().allMatch(Objects::isNull)) { - return rewritten; - } - - final List descriptors = - call.getOperandList().stream() - .flatMap(FlinkCalciteSqlValidator::extractDescriptors) - .collect(Collectors.toList()); - + // Mark rewritten "TABLE t" with on_time columns + if (tableArgs == null || tableArgs.stream().allMatch(Objects::isNull)) { + return rewritten; + } + final List onTimeColumns = extractOnTime(call); + if (onTimeColumns != null) { for (int i = 0; i < call.operandCount(); i++) { final SqlIdentifier tableArg = tableArgs.get(i); if (tableArg != null) { - final SqlNode opReplacement = new ExplicitTableSqlSelect(tableArg, descriptors); + final SqlNode opReplacement = + new ExplicitTableSqlSelect(tableArg, onTimeColumns); + // for f(TABLE t PARTITION BY c, ...) if (call.operand(i).getKind() == SqlKind.SET_SEMANTICS_TABLE) { final SqlCall setSemanticsTable = call.operand(i); setSemanticsTable.setOperand(0, opReplacement); } else if (call.operand(i).getKind() == SqlKind.ARGUMENT_ASSIGNMENT) { - // for TUMBLE(DATA => TABLE t3, ...) final SqlCall assignment = call.operand(i); + // for f(in => TABLE t PARTITION BY c, ...) if (assignment.operand(0).getKind() == SqlKind.SET_SEMANTICS_TABLE) { - final SqlCall setSemanticsTable = assignment.operand(i); + final SqlCall setSemanticsTable = assignment.operand(0); setSemanticsTable.setOperand(0, opReplacement); } else { + // for f(in => TABLE t, ...) assignment.setOperand(0, opReplacement); } } else { - // for TUMBLE(TABLE t3, ...) + // for f(TABLE t, ...) call.setOperand(i, opReplacement); } } - // for TUMBLE([DATA =>] SELECT ..., ...) + // for f([in =>] SELECT ..., ...) } } @@ -446,9 +447,9 @@ public SqlNode maybeCast(SqlNode node, RelDataType currentType, RelDataType desi */ static class ExplicitTableSqlSelect extends SqlSelect { - private final List descriptors; + private final List onTimeColumns; - public ExplicitTableSqlSelect(SqlIdentifier table, List descriptors) { + public ExplicitTableSqlSelect(SqlIdentifier table, List onTimeColumns) { super( SqlParserPos.ZERO, null, @@ -462,7 +463,7 @@ public ExplicitTableSqlSelect(SqlIdentifier table, List descripto null, null, null); - this.descriptors = descriptors; + this.onTimeColumns = onTimeColumns; } } @@ -470,30 +471,32 @@ public ExplicitTableSqlSelect(SqlIdentifier table, List descripto * Returns whether the given column has been declared in a {@link SqlKind#DESCRIPTOR} next to a * {@link SqlKind#EXPLICIT_TABLE} within TVF operands. */ - private static boolean declaredDescriptorColumn(SelectScope scope, Column column) { + private static boolean isDeclaredOnTimeColumn(SelectScope scope, Column column) { if (!(scope.getNode() instanceof ExplicitTableSqlSelect)) { return false; } final ExplicitTableSqlSelect select = (ExplicitTableSqlSelect) scope.getNode(); - return select.descriptors.stream() + return select.onTimeColumns.stream() .map(SqlIdentifier::getSimple) .anyMatch(id -> id.equals(column.getName())); } /** * Returns all {@link SqlKind#EXPLICIT_TABLE} and {@link SqlKind#SET_SEMANTICS_TABLE} operands - * within TVF operands. A list entry is {@code null} if the operand is not an {@link + * within PTF operands. A list entry is {@code null} if the operand is not an {@link * SqlKind#EXPLICIT_TABLE} or {@link SqlKind#SET_SEMANTICS_TABLE}. */ private static List getTableOperands(SqlNode node) { if (!(node instanceof SqlBasicCall)) { return null; } + final SqlBasicCall call = (SqlBasicCall) node; if (!(call.getOperator() instanceof SqlFunction)) { return null; } + final SqlFunction function = (SqlFunction) call.getOperator(); if (!isTableFunction(function)) { @@ -501,52 +504,109 @@ private static List getTableOperands(SqlNode node) { } return call.getOperandList().stream() - .map(FlinkCalciteSqlValidator::extractTableOperand) + .map(FlinkCalciteSqlValidator::extractExplicitTables) .collect(Collectors.toList()); } - private static @Nullable SqlIdentifier extractTableOperand(SqlNode op) { + /** Extracts "TABLE t" nodes before they get rewritten into "SELECT * FROM t". */ + private static @Nullable SqlIdentifier extractExplicitTables(SqlNode op) { if (op.getKind() == SqlKind.EXPLICIT_TABLE) { final SqlBasicCall opCall = (SqlBasicCall) op; if (opCall.operandCount() == 1 && opCall.operand(0) instanceof SqlIdentifier) { - // for TUMBLE(TABLE t3, ...) + // for f(TABLE t, ...) return opCall.operand(0); } } else if (op.getKind() == SqlKind.SET_SEMANTICS_TABLE) { - // for SESSION windows + // for f(TABLE t PARTITION BY x) final SqlBasicCall opCall = (SqlBasicCall) op; - final SqlCall setSemanticsTable = opCall.operand(0); - if (setSemanticsTable.operand(0) instanceof SqlIdentifier) { - return setSemanticsTable.operand(0); - } + return extractExplicitTables(opCall.operand(0)); } else if (op.getKind() == SqlKind.ARGUMENT_ASSIGNMENT) { - // for TUMBLE(DATA => TABLE t3, ...) + // for f(in => TABLE t, ...) final SqlBasicCall opCall = (SqlBasicCall) op; - return extractTableOperand(opCall.operand(0)); + return extractExplicitTables(opCall.operand(0)); } return null; } - private static Stream extractDescriptors(SqlNode op) { + /** Extracts the on_time argument of a PTF (or TIMECOL for window PTFs for legacy reasons). */ + private static @Nullable List extractOnTime(SqlBasicCall call) { + // Extract from operand from PTF + final SqlNode onTimeOperand; + if (call.getOperator() instanceof SqlWindowTableFunction) { + onTimeOperand = extractOperandByArgName(call, "TIMECOL"); + } else if (ShortcutUtils.isFunctionKind(call.getOperator(), FunctionKind.PROCESS_TABLE)) { + onTimeOperand = extractOperandByArgName(call, "on_time"); + } else { + onTimeOperand = null; + } + + // No operand found + if (onTimeOperand == null) { + return null; + } + + return extractDescriptors(onTimeOperand); + } + + private static List extractDescriptors(SqlNode op) { if (op.getKind() == SqlKind.DESCRIPTOR) { - // for TUMBLE(..., DESCRIPTOR(col), ...) final SqlBasicCall opCall = (SqlBasicCall) op; return opCall.getOperandList().stream() .filter(SqlIdentifier.class::isInstance) - .map(SqlIdentifier.class::cast); - } else if (op.getKind() == SqlKind.SET_SEMANTICS_TABLE) { - // for SESSION windows - final SqlBasicCall opCall = (SqlBasicCall) op; - return ((SqlNodeList) opCall.operand(1)) - .stream() - .filter(SqlIdentifier.class::isInstance) - .map(SqlIdentifier.class::cast); - } else if (op.getKind() == SqlKind.ARGUMENT_ASSIGNMENT) { - // for TUMBLE(..., TIMECOL => DESCRIPTOR(col), ...) - final SqlBasicCall opCall = (SqlBasicCall) op; - return extractDescriptors(opCall.operand(0)); + .map(SqlIdentifier.class::cast) + .collect(Collectors.toList()); + } + return List.of(); + } + + /** + * Returns the operand for a given argument name from a BasicSqlCall. Supports both positional + * and named arguments. If at least one ARGUMENT_ASSIGNMENT is used, named lookup is performed. + * Otherwise, positional lookup using SqlOperandMetadata is used. + * + * @param call the SQL call to extract the operand from + * @param argumentName the name of the argument to retrieve + * @return the SqlNode for the operand, or null if not found or not supported + */ + private static @Nullable SqlNode extractOperandByArgName( + SqlBasicCall call, String argumentName) { + // Check if operator supports SqlOperandMetadata + final SqlOperator operator = call.getOperator(); + final SqlOperandTypeChecker typeChecker = operator.getOperandTypeChecker(); + if (!(typeChecker instanceof SqlOperandMetadata)) { + return null; + } + + final SqlOperandMetadata operandMetadata = (SqlOperandMetadata) typeChecker; + + // Detect if named arguments are used by checking for ARGUMENT_ASSIGNMENT + final List operands = call.getOperandList(); + final boolean hasNamedArguments = + operands.stream().anyMatch(op -> op.getKind() == SqlKind.ARGUMENT_ASSIGNMENT); + + if (hasNamedArguments) { + // Named mode: search through ARGUMENT_ASSIGNMENT nodes + for (SqlNode operand : operands) { + if (operand.getKind() == SqlKind.ARGUMENT_ASSIGNMENT) { + final SqlBasicCall assignment = (SqlBasicCall) operand; + // operand(1) contains the parameter name as SqlIdentifier + final SqlIdentifier paramName = assignment.operand(1); + if (paramName.getSimple().equals(argumentName)) { + // operand(0) contains the actual value + return assignment.operand(0); + } + } + } + return null; + } else { + // Positional mode: use SqlOperandMetadata to map name to position + final List paramNames = operandMetadata.paramNames(); + final int index = paramNames.indexOf(argumentName); + if (index == -1 || index >= call.operandCount()) { + return null; + } + return call.operand(index); } - return Stream.empty(); } private static boolean isTableFunction(SqlFunction function) { diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/ShortcutUtils.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/ShortcutUtils.java index 38bc5eb038ddf..415b78efeac09 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/ShortcutUtils.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/ShortcutUtils.java @@ -25,6 +25,7 @@ import org.apache.flink.table.expressions.CallExpression; import org.apache.flink.table.expressions.ResolvedExpression; import org.apache.flink.table.functions.FunctionDefinition; +import org.apache.flink.table.functions.FunctionKind; import org.apache.flink.table.planner.calcite.FlinkContext; import org.apache.flink.table.planner.calcite.FlinkTypeFactory; import org.apache.flink.table.planner.delegation.PlannerBase; @@ -156,6 +157,18 @@ public static DataTypeFactory unwrapDataTypeFactory(RelBuilder relBuilder) { return ((BridgingSqlFunction) call.getOperator()).getDefinition(); } + public static @Nullable FunctionDefinition unwrapFunctionDefinition(SqlOperator operator) { + if (!(operator instanceof BridgingSqlFunction)) { + return null; + } + return ((BridgingSqlFunction) operator).getDefinition(); + } + + public static boolean isFunctionKind(SqlOperator operator, FunctionKind kind) { + final FunctionDefinition functionDefinition = unwrapFunctionDefinition(operator); + return functionDefinition != null && functionDefinition.getKind() == kind; + } + public static @Nullable BridgingSqlFunction unwrapBridgingSqlFunction(RexCall call) { final SqlOperator operator = call.getOperator(); if (operator instanceof BridgingSqlFunction) { diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/ColumnExpansionTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/ColumnExpansionTest.java index 19ea817ba20d9..79d1c3eacc250 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/ColumnExpansionTest.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/ColumnExpansionTest.java @@ -18,20 +18,30 @@ package org.apache.flink.table.planner.plan.stream.sql; +import org.apache.flink.table.annotation.ArgumentHint; +import org.apache.flink.table.annotation.DataTypeHint; import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.TableEnvironment; import org.apache.flink.table.api.config.TableConfigOptions; +import org.apache.flink.table.functions.ProcessTableFunction; +import org.apache.flink.types.Row; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import java.util.Arrays; import java.util.Collections; +import java.util.List; +import static org.apache.flink.table.annotation.ArgumentTrait.OPTIONAL_PARTITION_BY; +import static org.apache.flink.table.annotation.ArgumentTrait.PASS_COLUMNS_THROUGH; +import static org.apache.flink.table.annotation.ArgumentTrait.ROW_SEMANTIC_TABLE; +import static org.apache.flink.table.annotation.ArgumentTrait.SET_SEMANTIC_TABLE; import static org.apache.flink.table.api.config.TableConfigOptions.ColumnExpansionStrategy.EXCLUDE_ALIASED_VIRTUAL_METADATA_COLUMNS; import static org.apache.flink.table.api.config.TableConfigOptions.ColumnExpansionStrategy.EXCLUDE_DEFAULT_VIRTUAL_METADATA_COLUMNS; import static org.apache.flink.table.api.config.TableConfigOptions.TABLE_COLUMN_EXPANSION_STRATEGY; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; /** Tests for {@link TableConfigOptions#TABLE_COLUMN_EXPANSION_STRATEGY}. */ class ColumnExpansionTest { @@ -87,7 +97,7 @@ void testExcludeDefaultVirtualMetadataColumns() { tableEnv.getConfig() .set( TABLE_COLUMN_EXPANSION_STRATEGY, - Collections.singletonList(EXCLUDE_DEFAULT_VIRTUAL_METADATA_COLUMNS)); + List.of(EXCLUDE_DEFAULT_VIRTUAL_METADATA_COLUMNS)); // From one table assertColumnNames( @@ -158,7 +168,7 @@ void testExcludeAliasedVirtualMetadataColumns() { tableEnv.getConfig() .set( TABLE_COLUMN_EXPANSION_STRATEGY, - Collections.singletonList(EXCLUDE_ALIASED_VIRTUAL_METADATA_COLUMNS)); + List.of(EXCLUDE_ALIASED_VIRTUAL_METADATA_COLUMNS)); // From one table assertColumnNames( @@ -238,7 +248,7 @@ void testExplicitTableWithinTableFunction() { tableEnv.getConfig() .set( TABLE_COLUMN_EXPANSION_STRATEGY, - Collections.singletonList(EXCLUDE_DEFAULT_VIRTUAL_METADATA_COLUMNS)); + List.of(EXCLUDE_DEFAULT_VIRTUAL_METADATA_COLUMNS)); // t3_m_virtual is selected due to expansion of the explicit table expression // with hints from descriptor @@ -265,7 +275,7 @@ void testSetSemanticsTableWithinTableFunction() { tableEnv.getConfig() .set( TABLE_COLUMN_EXPANSION_STRATEGY, - Collections.singletonList(EXCLUDE_DEFAULT_VIRTUAL_METADATA_COLUMNS)); + List.of(EXCLUDE_DEFAULT_VIRTUAL_METADATA_COLUMNS)); // t3_m_virtual is selected due to expansion of the explicit table expression // with hints from descriptor @@ -293,7 +303,7 @@ void testExplicitTableWithinTableFunctionWithInsertIntoNamedColumns() { tableEnv.getConfig() .set( TABLE_COLUMN_EXPANSION_STRATEGY, - Collections.singletonList(EXCLUDE_DEFAULT_VIRTUAL_METADATA_COLUMNS)); + List.of(EXCLUDE_DEFAULT_VIRTUAL_METADATA_COLUMNS)); tableEnv.executeSql( "CREATE TABLE sink (\n" @@ -318,7 +328,7 @@ void testSetSemanticsTableWithinTableFunctionWithInsertIntoNamedColumns() { tableEnv.getConfig() .set( TABLE_COLUMN_EXPANSION_STRATEGY, - Collections.singletonList(EXCLUDE_DEFAULT_VIRTUAL_METADATA_COLUMNS)); + List.of(EXCLUDE_DEFAULT_VIRTUAL_METADATA_COLUMNS)); tableEnv.executeSql( "CREATE TABLE sink (\n" @@ -346,7 +356,7 @@ void testExplicitTableWithinTableFunctionWithNamedArgs() { tableEnv.getConfig() .set( TABLE_COLUMN_EXPANSION_STRATEGY, - Collections.singletonList(EXCLUDE_DEFAULT_VIRTUAL_METADATA_COLUMNS)); + List.of(EXCLUDE_DEFAULT_VIRTUAL_METADATA_COLUMNS)); // t3_m_virtual is selected due to expansion of the explicit table expression // with hints from descriptor @@ -374,7 +384,7 @@ void testSetSemanticsTableFunctionWithNamedArgs() { tableEnv.getConfig() .set( TABLE_COLUMN_EXPANSION_STRATEGY, - Collections.singletonList(EXCLUDE_DEFAULT_VIRTUAL_METADATA_COLUMNS)); + List.of(EXCLUDE_DEFAULT_VIRTUAL_METADATA_COLUMNS)); // t3_m_virtual is selected due to expansion of the explicit table expression // with hints from descriptor @@ -396,4 +406,67 @@ void testSetSemanticsTableFunctionWithNamedArgs() { "t3_s", "agg"); } + + @Test + void testProcessTableFunctionWithOnTime() { + tableEnv.getConfig() + .set( + TABLE_COLUMN_EXPANSION_STRATEGY, + List.of(EXCLUDE_DEFAULT_VIRTUAL_METADATA_COLUMNS)); + + // Register PTF that requires on_time + tableEnv.createTemporarySystemFunction("singlePtf", PassThroughPtf.class); + tableEnv.createTemporarySystemFunction("multiPtf", MultiInputPtf.class); + + // t3_m_virtual is not selected due to missing on_time descriptor + assertColumnNames("SELECT * FROM singlePtf(r => TABLE t3)", "t3_s", "t3_i", "out"); + assertColumnNames("SELECT * FROM singlePtf(TABLE t3)", "t3_s", "t3_i", "out"); + + // t3_m_virtual is selected due to expansion of the explicit table expression + // with hints from the on_time descriptor + assertColumnNames( + "SELECT * FROM singlePtf(r => TABLE t3, on_time => DESCRIPTOR(t3_m_virtual))", + "t3_s", + "t3_i", + "t3_m_virtual", + "out", + "rowtime"); + assertColumnNames( + "SELECT * FROM singlePtf(TABLE t3, DESCRIPTOR(t3_m_virtual))", + "t3_s", + "t3_i", + "t3_m_virtual", + "out", + "rowtime"); + + assertThatThrownBy( + () -> + tableEnv.sqlQuery( + "SELECT * FROM multiPtf(TABLE t3, TABLE t2, DESCRIPTOR(t3_m_virtual, t2_m_virtual))")) + // Message indicates that 't2_m_virtual' was correctly resolved + // and passed to PTF type inference + .hasRootCauseMessage( + "Unsupported data type for time attribute. The `on_time` argument " + + "must reference a TIMESTAMP or TIMESTAMP_LTZ column (up to " + + "precision 3). However, column 't2_m_virtual' in table " + + "argument 'r2' has data type 'INT'."); + } + + @DataTypeHint("ROW") + public static class PassThroughPtf extends ProcessTableFunction { + @SuppressWarnings("unused") + public void eval(@ArgumentHint({ROW_SEMANTIC_TABLE, PASS_COLUMNS_THROUGH}) Row r) { + // dummy + } + } + + @DataTypeHint("ROW") + public static class MultiInputPtf extends ProcessTableFunction { + @SuppressWarnings("unused") + public void eval( + @ArgumentHint({SET_SEMANTIC_TABLE, OPTIONAL_PARTITION_BY}) Row r1, + @ArgumentHint({SET_SEMANTIC_TABLE, OPTIONAL_PARTITION_BY}) Row r2) { + // dummy + } + } }