diff --git a/src/main/java/io/cdap/plugin/gcp/bigquery/source/PartitionedBigQueryInputFormat.java b/src/main/java/io/cdap/plugin/gcp/bigquery/source/PartitionedBigQueryInputFormat.java index 163403f7f..1fd28bb2b 100644 --- a/src/main/java/io/cdap/plugin/gcp/bigquery/source/PartitionedBigQueryInputFormat.java +++ b/src/main/java/io/cdap/plugin/gcp/bigquery/source/PartitionedBigQueryInputFormat.java @@ -157,7 +157,7 @@ private void processQuery(JobContext context) throws IOException, InterruptedExc query = generateQuery(partitionFromDate, partitionToDate, filter, datasetProjectId, datasetId, tableName, limit, orderBy, - isPartitionFilterRequired, (StandardTableDefinition) tableDefinition); + isPartitionFilterRequired, tableDefinition); } if (query != null) { @@ -181,7 +181,7 @@ private void processQuery(JobContext context) throws IOException, InterruptedExc @VisibleForTesting String generateQuery(String partitionFromDate, String partitionToDate, String filter, String datasetProject, String dataset, String table, String limit, String orderBy, - Boolean isPartitionFilterRequired, StandardTableDefinition tableDefinition) { + Boolean isPartitionFilterRequired, TableDefinition tableDef) { if (Strings.isNullOrEmpty(filter) && Strings.isNullOrEmpty(orderBy) && Strings.isNullOrEmpty( limit) @@ -189,6 +189,13 @@ String generateQuery(String partitionFromDate, String partitionToDate, String fi return null; } + if (!(tableDef instanceof StandardTableDefinition)) { + throw new IllegalArgumentException( + String.format("Unsupported BigQuery table type for filtering/partitioning: %s. " + + "Cannot apply filters, limits, or ordering.", tableDef.getType())); + } + + StandardTableDefinition tableDefinition = (StandardTableDefinition) tableDef; RangePartitioning rangePartitioning = tableDefinition.getRangePartitioning(); TimePartitioning timePartitioning = tableDefinition.getTimePartitioning(); StringBuilder condition = new StringBuilder();