Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,11 @@ public abstract class ConsistencyRequest {
@Nullable
public abstract String getConsistencyToken();

protected abstract boolean isFullyQualified();

public static ConsistencyRequest forReplication(String tableId) {
return new AutoValue_ConsistencyRequest(
tableId, CheckConsistencyRequest.ModeCase.STANDARD_READ_REMOTE_WRITES, null);
tableId, CheckConsistencyRequest.ModeCase.STANDARD_READ_REMOTE_WRITES, null, false);
}

/**
Expand All @@ -59,17 +61,41 @@ public static ConsistencyRequest forReplication(String tableId, String consisten
Preconditions.checkNotNull(consistencyToken, "consistencyToken must not be null");

return new AutoValue_ConsistencyRequest(
tableId, CheckConsistencyRequest.ModeCase.STANDARD_READ_REMOTE_WRITES, consistencyToken);
tableId,
CheckConsistencyRequest.ModeCase.STANDARD_READ_REMOTE_WRITES,
consistencyToken,
false);
}

public static ConsistencyRequest forDataBoost(String tableId) {
return new AutoValue_ConsistencyRequest(
tableId, CheckConsistencyRequest.ModeCase.DATA_BOOST_READ_LOCAL_WRITES, null);
tableId, CheckConsistencyRequest.ModeCase.DATA_BOOST_READ_LOCAL_WRITES, null, false);
}

@InternalApi
public static ConsistencyRequest forReplicationFromTableName(String tableName) {
return new AutoValue_ConsistencyRequest(
tableName, CheckConsistencyRequest.ModeCase.STANDARD_READ_REMOTE_WRITES, null, true);
}

@InternalApi
public static ConsistencyRequest forReplicationFromTableName(
String tableName, String consistencyToken) {
Preconditions.checkNotNull(consistencyToken, "consistencyToken must not be null");
Comment thread
jinseopkim0 marked this conversation as resolved.

return new AutoValue_ConsistencyRequest(
tableName,
Comment thread
jinseopkim0 marked this conversation as resolved.
CheckConsistencyRequest.ModeCase.STANDARD_READ_REMOTE_WRITES,
consistencyToken,
true);
}

@InternalApi
public CheckConsistencyRequest toCheckConsistencyProto(
TableAdminRequestContext requestContext, String token) {
Preconditions.checkState(
!isFullyQualified(),
"Use toCheckConsistencyProto(String token) for fully qualified table names.");
Comment thread
jinseopkim0 marked this conversation as resolved.
CheckConsistencyRequest.Builder builder = CheckConsistencyRequest.newBuilder();
TableName tableName =
TableName.of(requestContext.getProjectId(), requestContext.getInstanceId(), getTableId());
Expand All @@ -83,13 +109,41 @@ public CheckConsistencyRequest toCheckConsistencyProto(
return builder.setName(tableName.toString()).setConsistencyToken(token).build();
}

@InternalApi
public CheckConsistencyRequest toCheckConsistencyProto(String token) {
Preconditions.checkState(
isFullyQualified(),
"Use toCheckConsistencyProto(TableAdminRequestContext, String) for non-qualified table"
+ " names.");
CheckConsistencyRequest.Builder builder = CheckConsistencyRequest.newBuilder();

if (getMode().equals(CheckConsistencyRequest.ModeCase.STANDARD_READ_REMOTE_WRITES)) {
builder.setStandardReadRemoteWrites(StandardReadRemoteWrites.newBuilder().build());
} else {
builder.setDataBoostReadLocalWrites(DataBoostReadLocalWrites.newBuilder().build());
Comment thread
jinseopkim0 marked this conversation as resolved.
Outdated
}

return builder.setName(getTableId()).setConsistencyToken(token).build();
}

@InternalApi
public GenerateConsistencyTokenRequest toGenerateTokenProto(
TableAdminRequestContext requestContext) {
Preconditions.checkState(
!isFullyQualified(), "Use toGenerateTokenProto() for fully qualified table names.");
GenerateConsistencyTokenRequest.Builder builder = GenerateConsistencyTokenRequest.newBuilder();
TableName tableName =
TableName.of(requestContext.getProjectId(), requestContext.getInstanceId(), getTableId());

return builder.setName(tableName.toString()).build();
}

@InternalApi
public GenerateConsistencyTokenRequest toGenerateTokenProto() {
Preconditions.checkState(
isFullyQualified(),
"Use toGenerateTokenProto(TableAdminRequestContext) for non-qualified table names.");
GenerateConsistencyTokenRequest.Builder builder = GenerateConsistencyTokenRequest.newBuilder();
return builder.setName(getTableId()).build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import com.google.common.util.concurrent.MoreExecutors;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import javax.annotation.Nullable;

/**
* Callable that waits until either replication or Data Boost has caught up to the point it was
Expand All @@ -56,15 +57,15 @@ class AwaitConsistencyCallable extends UnaryCallable<ConsistencyRequest, Void> {
private final UnaryCallable<CheckConsistencyRequest, CheckConsistencyResponse> checkCallable;
private final RetryingExecutor<CheckConsistencyResponse> executor;

private final TableAdminRequestContext requestContext;
@Nullable private final TableAdminRequestContext requestContext;

static AwaitConsistencyCallable create(
UnaryCallable<GenerateConsistencyTokenRequest, GenerateConsistencyTokenResponse>
generateCallable,
UnaryCallable<CheckConsistencyRequest, CheckConsistencyResponse> checkCallable,
ClientContext clientContext,
RetrySettings pollingSettings,
TableAdminRequestContext requestContext) {
@Nullable TableAdminRequestContext requestContext) {

RetryAlgorithm<CheckConsistencyResponse> retryAlgorithm =
new RetryAlgorithm<>(
Expand All @@ -78,13 +79,22 @@ static AwaitConsistencyCallable create(
generateCallable, checkCallable, retryingExecutor, requestContext);
}

static AwaitConsistencyCallable create(
UnaryCallable<GenerateConsistencyTokenRequest, GenerateConsistencyTokenResponse>
generateCallable,
UnaryCallable<CheckConsistencyRequest, CheckConsistencyResponse> checkCallable,
ClientContext clientContext,
RetrySettings pollingSettings) {
return create(generateCallable, checkCallable, clientContext, pollingSettings, null);
}

@VisibleForTesting
AwaitConsistencyCallable(
UnaryCallable<GenerateConsistencyTokenRequest, GenerateConsistencyTokenResponse>
generateCallable,
UnaryCallable<CheckConsistencyRequest, CheckConsistencyResponse> checkCallable,
RetryingExecutor<CheckConsistencyResponse> executor,
TableAdminRequestContext requestContext) {
@Nullable TableAdminRequestContext requestContext) {
this.generateCallable = generateCallable;
this.checkCallable = checkCallable;
this.executor = executor;
Expand All @@ -98,22 +108,30 @@ public ApiFuture<Void> futureCall(
// If the token is already provided, skip generation and poll directly.
if (consistencyRequest.getConsistencyToken() != null) {
CheckConsistencyRequest request =
consistencyRequest.toCheckConsistencyProto(
requestContext, consistencyRequest.getConsistencyToken());
requestContext == null
? consistencyRequest.toCheckConsistencyProto(consistencyRequest.getConsistencyToken())
: consistencyRequest.toCheckConsistencyProto(
requestContext, consistencyRequest.getConsistencyToken());
return pollToken(request, apiCallContext);
}

ApiFuture<GenerateConsistencyTokenResponse> tokenFuture =
generateToken(consistencyRequest.toGenerateTokenProto(requestContext), apiCallContext);
generateToken(
requestContext == null
? consistencyRequest.toGenerateTokenProto()
: consistencyRequest.toGenerateTokenProto(requestContext),
apiCallContext);

return ApiFutures.transformAsync(
tokenFuture,
new ApiAsyncFunction<GenerateConsistencyTokenResponse, Void>() {
@Override
public ApiFuture<Void> apply(GenerateConsistencyTokenResponse input) {
CheckConsistencyRequest request =
consistencyRequest.toCheckConsistencyProto(
requestContext, input.getConsistencyToken());
requestContext == null
? consistencyRequest.toCheckConsistencyProto(input.getConsistencyToken())
: consistencyRequest.toCheckConsistencyProto(
requestContext, input.getConsistencyToken());
return pollToken(request, apiCallContext);
}
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,14 +54,19 @@ public class EnhancedBigtableTableAdminStub extends GrpcBigtableTableAdminStub {
private final BigtableTableAdminStubSettings settings;
private final ClientContext clientContext;

private final TableAdminRequestContext requestContext;
@javax.annotation.Nullable private final TableAdminRequestContext requestContext;

@Deprecated private final AwaitReplicationCallable awaitReplicationCallable;

private final AwaitConsistencyCallable awaitConsistencyCallable;
private final OperationCallable<Void, Empty, OptimizeRestoredTableMetadata>
optimizeRestoredTableOperationBaseCallable;

public static EnhancedBigtableTableAdminStub createEnhanced(
BigtableTableAdminStubSettings settings) throws IOException {
return new EnhancedBigtableTableAdminStub(settings, ClientContext.create(settings), null);
}
Comment thread
jinseopkim0 marked this conversation as resolved.

Comment thread
jinseopkim0 marked this conversation as resolved.
public static EnhancedBigtableTableAdminStub createEnhanced(
BigtableTableAdminStubSettings settings, TableAdminRequestContext requestContext)
throws IOException {
Expand All @@ -72,7 +77,7 @@ public static EnhancedBigtableTableAdminStub createEnhanced(
private EnhancedBigtableTableAdminStub(
BigtableTableAdminStubSettings settings,
ClientContext clientContext,
TableAdminRequestContext requestContext)
@javax.annotation.Nullable TableAdminRequestContext requestContext)
throws IOException {
super(settings, clientContext);

Expand Down
Loading