Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import javax.annotation.Nullable;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.util.Preconditions;

/**
* Container class used by {@link StorageApiWritesShardedRecords} and {@link
Expand Down Expand Up @@ -138,22 +139,29 @@ public void close() {
}
}

/**
 * Returns a hash of the current table schema, computed by {@link
 * TableRowToStorageApiProto#tableSchemaHash}. Annotated with {@code @Memoized} so the hash is
 * computed at most once per instance and cached thereafter.
 */
@Memoized
public byte[] getTableSchemaHash() {
return TableRowToStorageApiProto.tableSchemaHash(getTableSchema());
}

/**
 * Returns true if {@code updatedTableSchema} differs from the schema currently held by this
 * container.
 *
 * <p>Compares with {@code equals} rather than comparing {@code hashCode} values: equal hash
 * codes do not guarantee equal schemas, so a hash-only comparison could silently miss a genuine
 * schema change (and therefore skip a required schema update).
 */
boolean hasSchemaChanged(TableSchema updatedTableSchema) {
  return !updatedTableSchema.equals(getTableSchema());
}

/**
 * Encodes the given {@link TableRow} of unknown fields into a serialized proto message using the
 * ignore-required descriptor.
 *
 * @param unknown the row of fields to encode
 * @param ignoreUnknownValues whether values not present in the schema should be ignored
 * @return the encoded row as a {@link ByteString}
 * @throws TableRowToStorageApiProto.SchemaConversionException if the row cannot be converted
 */
public ByteString encodeUnknownFields(TableRow unknown, boolean ignoreUnknownValues)
    throws TableRowToStorageApiProto.SchemaConversionException {
  // NOTE(review): the previous text contained diff residue that invoked messageFromTableRow
  // twice (once assigned to msg, once wrapped in checkArgumentNotNull with its result
  // discarded). This merges both into the single intended call.
  Message msg =
      Preconditions.checkArgumentNotNull(
          TableRowToStorageApiProto.messageFromTableRow(
              getSchemaInformation(),
              getDescriptorIgnoreRequired(),
              unknown,
              ignoreUnknownValues,
              true,
              null,
              null,
              null,
              TableRowToStorageApiProto.ErrorCollector.DONT_COLLECT));
  return msg.toByteString();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3246,8 +3246,12 @@ public Write<T> withWriteDisposition(WriteDisposition writeDisposition) {
/**
* Allows the schema of the destination table to be updated as a side effect of the write.
*
* <p>This configuration applies only when writing to BigQuery with {@link Method#FILE_LOADS} as
 * <p>This configuration applies only when writing to BigQuery with {@link Method#FILE_LOADS},
 * {@link Method#STORAGE_WRITE_API}, or {@link Method#STORAGE_API_AT_LEAST_ONCE} as the method.
 *
 * <p>If using the Storage Write API, new fields (except for nested messages) will always be
 * created with type STRING.
 *
 * <p>TODO: A follow-on PR will add support for a user-supplied type mapping.
*/
public Write<T> withSchemaUpdateOptions(Set<SchemaUpdateOption> schemaUpdateOptions) {
checkArgument(schemaUpdateOptions != null, "schemaUpdateOptions can not be null");
Expand Down Expand Up @@ -4202,9 +4206,18 @@ private <DestinationT> WriteResult continueExpandTyped(
}
return input.apply(batchLoads);
} else if (method == Method.STORAGE_WRITE_API || method == Method.STORAGE_API_AT_LEAST_ONCE) {
boolean useSchemaUpdate =
getSchemaUpdateOptions() != null && !getSchemaUpdateOptions().isEmpty();
if (useSchemaUpdate) {
checkArgument(
!getAutoSchemaUpdate() && !getIgnoreUnknownValues(),
"Schema update options are not supported when using auto schema update or ignore unknown values");
}
BigQueryOptions bqOptions = input.getPipeline().getOptions().as(BigQueryOptions.class);
StorageApiDynamicDestinations<T, DestinationT> storageApiDynamicDestinations;
if (getUseBeamSchema()) {
checkArgument(
!useSchemaUpdate, "SchemaUpdateOptions are not supported when using Beam schemas");
Comment thread
ahmedabu98 marked this conversation as resolved.
// This ensures that the Beam rows are directly translated into protos for Storage API
// writes, with no
// need to round trip through JSON TableRow objects.
Expand All @@ -4216,6 +4229,9 @@ private <DestinationT> WriteResult continueExpandTyped(
getFormatRecordOnFailureFunction(),
getRowMutationInformationFn() != null);
} else if (getWriteProtosClass() != null && getDirectWriteProtos()) {
checkArgument(
!useSchemaUpdate, "SchemaUpdateOptions are not supported when writing protos");

// We could support both of these by falling back to
// StorageApiDynamicDestinationsTableRow. This
// would defeat the optimization (we would be forced to create a new dynamic proto message
Expand All @@ -4233,13 +4249,17 @@ private <DestinationT> WriteResult continueExpandTyped(
!getIgnoreUnknownValues(),
"ignoreUnknownValues not supported when using writeProtos."
+ " Try setting withDirectWriteProtos(false)");

storageApiDynamicDestinations =
(StorageApiDynamicDestinations<T, DestinationT>)
new StorageApiDynamicDestinationsProto(
dynamicDestinations,
getWriteProtosClass(),
getFormatRecordOnFailureFunction());
} else if (getAvroRowWriterFactory() != null) {
checkArgument(
!useSchemaUpdate, "SchemaUpdateOptions are not supported when writing avros");

// we can configure the avro to storage write api proto converter for this
// assuming the format function returns an Avro GenericRecord
// and there is a schema defined
Expand All @@ -4248,7 +4268,6 @@ private <DestinationT> WriteResult continueExpandTyped(
|| getDynamicDestinations() != null
|| getSchemaFromView() != null,
"A schema must be provided for avro rows to be used with StorageWrite API.");

RowWriterFactory.AvroRowWriterFactory<T, GenericRecord, DestinationT>
recordWriterFactory =
(RowWriterFactory.AvroRowWriterFactory<T, GenericRecord, DestinationT>)
Expand All @@ -4275,7 +4294,10 @@ private <DestinationT> WriteResult continueExpandTyped(
getRowMutationInformationFn() != null,
getCreateDisposition(),
getIgnoreUnknownValues(),
getAutoSchemaUpdate());
getAutoSchemaUpdate(),
getSchemaUpdateOptions() == null
? Collections.emptySet()
: getSchemaUpdateOptions());
}

int numShards = getStorageApiNumStreams(bqOptions);
Expand All @@ -4287,6 +4309,7 @@ private <DestinationT> WriteResult continueExpandTyped(
StorageApiLoads<DestinationT, T> storageApiLoads =
new StorageApiLoads<>(
destinationCoder,
elementCoder,
storageApiDynamicDestinations,
getRowMutationInformationFn(),
getCreateDisposition(),
Expand All @@ -4304,7 +4327,8 @@ private <DestinationT> WriteResult continueExpandTyped(
getDefaultMissingValueInterpretation(),
getBigLakeConfiguration(),
getBadRecordRouter(),
getBadRecordErrorHandler());
getBadRecordErrorHandler(),
!getSchemaUpdateOptions().isEmpty());
return input.apply("StorageApiLoads", storageApiLoads);
} else {
throw new RuntimeException("Unexpected write method " + method);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -245,4 +245,12 @@ public interface BigQueryOptions
Boolean getGroupFilesFileLoad();

void setGroupFilesFileLoad(Boolean value);

@Hidden
@Description(
    "The number of parallel shards to use for buffering elements when upgrading table schemas.")
@Default.Integer(50)
Integer getSchemaUpgradeBufferingShards();

void setSchemaUpgradeBufferingShards(Integer value);
}
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,10 @@ <T> long insertAll(
/** Patch BigQuery {@link Table} description. */
Table patchTableDescription(TableReference tableReference, @Nullable String tableDescription)
throws IOException, InterruptedException;

/** Patch BigQuery {@link Table} schema. */
Table patchTableSchema(
    TableReference tableReference, com.google.api.services.bigquery.model.TableSchema newSchema)
    throws IOException, InterruptedException;
}

/** An interface to get, create and flush Cloud BigQuery STORAGE API write streams. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1459,6 +1459,31 @@ public Table patchTableDescription(
ALWAYS_RETRY);
}

/** Patches the schema of an existing BigQuery table via the tables.patch API. */
@Override
public Table patchTableSchema(
    TableReference tableReference, com.google.api.services.bigquery.model.TableSchema newSchema)
    throws IOException, InterruptedException {
  // Populate only the schema field; patch semantics leave all other table
  // properties untouched.
  Table schemaOnlyTable = new Table();
  schemaOnlyTable.setSchema(newSchema);

  Tables.Patch patchRequest =
      client
          .tables()
          .patch(
              tableReference.getProjectId(),
              tableReference.getDatasetId(),
              tableReference.getTableId(),
              schemaOnlyTable);
  String failureMessage =
      String.format(
          "Unable to patch table: %s, aborting after %d retries.",
          tableReference, MAX_RPC_RETRIES);
  // Bad requests (e.g. an invalid schema) and failed preconditions are deterministic
  // failures, so they are excluded from retries.
  return executeWithRetries(
      patchRequest,
      failureMessage,
      Sleeper.DEFAULT,
      createDefaultBackoff(),
      DONT_RETRY_INVALID_ARG_OR_PRECONDITION);
}

@Override
public void close() throws Exception {
// Nothing to close
Expand Down Expand Up @@ -1664,6 +1689,11 @@ public void close() throws Exception {
return !errorExtractor.itemNotFound(input);
};

// Retry predicate that refuses to retry when the failure is a bad request or a failed
// precondition; such failures are deterministic and would fail again on retry.
static final SerializableFunction<IOException, Boolean> DONT_RETRY_INVALID_ARG_OR_PRECONDITION =
input -> {
ApiErrorExtractor errorExtractor = new ApiErrorExtractor();
return !errorExtractor.preconditionNotMet(input) && !errorExtractor.badRequest(input);
};
// Retry predicate that retries every failure unconditionally.
static final SerializableFunction<IOException, Boolean> ALWAYS_RETRY = input -> true;

@VisibleForTesting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,9 @@ public enum RpcMethod {
STREAMING_INSERTS,
APPEND_ROWS,
FLUSH_ROWS,
FINALIZE_STREAM
FINALIZE_STREAM,
PATCH_TABLE,
OPEN_WRITE_STREAM
}

// Status of a BigQuery row from the AppendRows RPC call.
Expand Down
Loading
Loading