diff --git a/pom.xml b/pom.xml
index 30bd75657..f166848dd 100644
--- a/pom.xml
+++ b/pom.xml
@@ -20,7 +20,7 @@
io.cdap.plugin
google-cloud
- 0.23.6
+ 0.23.7-SNAPSHOT
Google Cloud Plugins
jar
Plugins for Google Big Query
diff --git a/src/main/java/io/cdap/plugin/gcp/bigquery/source/PartitionedBigQueryInputFormat.java b/src/main/java/io/cdap/plugin/gcp/bigquery/source/PartitionedBigQueryInputFormat.java
index fb3cd84eb..e3a195e0d 100644
--- a/src/main/java/io/cdap/plugin/gcp/bigquery/source/PartitionedBigQueryInputFormat.java
+++ b/src/main/java/io/cdap/plugin/gcp/bigquery/source/PartitionedBigQueryInputFormat.java
@@ -24,6 +24,7 @@
import com.google.api.services.bigquery.model.TableReference;
import com.google.cloud.bigquery.RangePartitioning;
import com.google.cloud.bigquery.StandardTableDefinition;
+import com.google.cloud.bigquery.TableDefinition;
import com.google.cloud.bigquery.TableDefinition.Type;
import com.google.cloud.bigquery.TimePartitioning;
import com.google.cloud.hadoop.io.bigquery.AbstractBigQueryInputFormat;
@@ -139,7 +140,7 @@ private void processQuery(JobContext context) throws IOException, InterruptedExc
datasetProjectId, datasetId, tableName, serviceAccount, isServiceAccountFilePath, null);
Type type = Objects.requireNonNull(bigQueryTable).getDefinition().getType();
Boolean isPartitionFilterRequired = bigQueryTable.getRequirePartitionFilter();
- StandardTableDefinition tableDefinition = Objects.requireNonNull(bigQueryTable).getDefinition();
+ TableDefinition tableDefinition = Objects.requireNonNull(bigQueryTable).getDefinition();
String query;
if (type == Type.VIEW || type == Type.MATERIALIZED_VIEW || type == Type.EXTERNAL) {
@@ -173,7 +174,7 @@ private void processQuery(JobContext context) throws IOException, InterruptedExc
@VisibleForTesting
String generateQuery(String partitionFromDate, String partitionToDate, String filter,
String datasetProject, String dataset, String table, String limit, String orderBy,
- Boolean isPartitionFilterRequired, StandardTableDefinition tableDefinition) {
+ Boolean isPartitionFilterRequired, TableDefinition tableDef) {
if (Strings.isNullOrEmpty(filter) && Strings.isNullOrEmpty(orderBy) && Strings.isNullOrEmpty(
limit)
@@ -181,6 +182,13 @@ String generateQuery(String partitionFromDate, String partitionToDate, String fi
return null;
}
+ if (!(tableDef instanceof StandardTableDefinition)) {
+ throw new IllegalArgumentException(
+ String.format("Unsupported BigQuery table type for filtering/partitioning: %s. " +
+ "Cannot apply filters, limits, or ordering.", tableDef.getType()));
+ }
+
+ StandardTableDefinition tableDefinition = (StandardTableDefinition) tableDef;
RangePartitioning rangePartitioning = tableDefinition.getRangePartitioning();
TimePartitioning timePartitioning = tableDefinition.getTimePartitioning();
StringBuilder condition = new StringBuilder();