Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions phoenix-core-client/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,11 @@
<groupId>org.hdrhistogram</groupId>
<artifactId>HdrHistogram</artifactId>
</dependency>
<dependency>
<groupId>org.bouncycastle</groupId>
<artifactId>bcprov-jdk18on</artifactId>
<version>1.79</version>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -196,4 +196,33 @@ public static long getMaxLookbackInMillis(Configuration conf) {

/** Exposed for testing */
public static final String SCANNER_OPENED_TRACE_INFO = "Scanner opened on server";

/**
 * The scan attribute to enable server-side chunk formation and checksum computation for
 * PhoenixSyncTableTool. Enabled when the attribute value equals TRUE_BYTES (see
 * ScanUtil.isSyncTableChunkFormationEnabled).
 */
public static final String SYNC_TABLE_CHUNK_FORMATION = "_SyncTableChunkFormation";

/**
 * The scan attribute to provide the target chunk size in bytes for PhoenixSyncTableTool.
 */
public static final String SYNC_TABLE_CHUNK_SIZE_BYTES = "_SyncTableChunkSizeBytes";

/**
 * The scan attribute to provide the MessageDigest state for cross-region hash continuation in
 * PhoenixSyncTableTool.
 */
public static final String SYNC_TABLE_CONTINUED_DIGEST_STATE = "_SyncTableContinuedDigestState";

/**
 * PhoenixSyncTableTool chunk metadata cell qualifiers. These define the wire protocol between
 * PhoenixSyncTableRegionScanner (server-side coprocessor) and PhoenixSyncTableMapper (client-side
 * mapper). The coprocessor returns chunk metadata as HBase cells with these qualifiers, and the
 * mapper parses them to extract chunk information.
 */
// First row key of the chunk.
public static final byte[] SYNC_TABLE_START_KEY_QUALIFIER = Bytes.toBytes("START_KEY");
// Checksum of the chunk contents. NOTE(review): presumably the SHA-256 hash produced via
// SHA256DigestUtil — confirm against the coprocessor implementation.
public static final byte[] SYNC_TABLE_HASH_QUALIFIER = Bytes.toBytes("HASH");
// Number of rows included in the chunk.
public static final byte[] SYNC_TABLE_ROW_COUNT_QUALIFIER = Bytes.toBytes("ROW_COUNT");
// Flag marking a chunk that did not reach its target size. NOTE(review): presumably set when a
// chunk is cut at a region boundary and continued via SYNC_TABLE_CONTINUED_DIGEST_STATE — confirm.
public static final byte[] SYNC_TABLE_IS_PARTIAL_CHUNK_QUALIFIER =
Bytes.toBytes("IS_PARTIAL_CHUNK");
}
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,12 @@ public interface QueryServices extends SQLCloseable {
public static final String SERVER_SIDE_PRIOIRTY_ATTRIB = "phoenix.serverside.rpc.priority";
public static final String ALLOW_LOCAL_INDEX_ATTRIB = "phoenix.index.allowLocalIndex";

// Timeout and retry configuration keys for PhoenixSyncTableTool. Timeout values are in
// milliseconds (see the DEFAULT_SYNC_TABLE_* constants in QueryServicesOptions);
// SYNC_TABLE_RPC_RETRIES_COUNTER is the total number of RPC attempts.
String SYNC_TABLE_QUERY_TIMEOUT_ATTRIB = "phoenix.sync.table.query.timeout";
String SYNC_TABLE_RPC_TIMEOUT_ATTRIB = "phoenix.sync.table.rpc.timeout";
String SYNC_TABLE_CLIENT_SCANNER_TIMEOUT_ATTRIB = "phoenix.sync.table.client.scanner.timeout";
String SYNC_TABLE_RPC_RETRIES_COUNTER = "phoenix.sync.table.rpc.retries.counter";

// Retries when doing server side writes to SYSTEM.CATALOG
public static final String METADATA_WRITE_RETRIES_NUMBER = "phoenix.metadata.rpc.retries.number";
public static final String METADATA_WRITE_RETRY_PAUSE = "phoenix.metadata.rpc.pause";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,12 @@ public class QueryServicesOptions {
// hrs
public static final long DEFAULT_INDEX_PENDING_DISABLE_THRESHOLD = 30000; // 30 secs

// Defaults for the PhoenixSyncTableTool timeout/retry settings; all time values in milliseconds.
// Query timeout = 5 tries * 30 min scan timeout (30000 * 60 ms) + 2100 ms total pause time
// between retries = 9,002,100 ms.
public static final long DEFAULT_SYNC_TABLE_QUERY_TIMEOUT = (5 * 30000 * 60) + 2100;
public static final long DEFAULT_SYNC_TABLE_RPC_TIMEOUT = 30000 * 60; // 30 mins
public static final long DEFAULT_SYNC_TABLE_CLIENT_SCANNER_TIMEOUT = 30000 * 60; // 30 mins
public static final int DEFAULT_SYNC_TABLE_RPC_RETRIES_COUNTER = 5; // 5 total tries at rpc level

/**
* HConstants#HIGH_QOS is the max we will see to a standard table. We go higher to differentiate
* and give some room for things in the middle
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.phoenix.util;

import java.io.IOException;
import org.bouncycastle.crypto.digests.SHA256Digest;

/**
 * Utility class for SHA-256 digest state serialization and deserialization. We are not using the
 * JDK bundled SHA implementation, since its digest state can't be serialized/deserialized, which
 * is needed for PhoenixSyncTableTool's cross-region hash continuation. BouncyCastle's
 * {@link SHA256Digest} exposes its internal state via {@code getEncodedState()} and can be
 * reconstructed from that state, which is what these helpers wrap.
 */
public class SHA256DigestUtil {

  /** Static utility class — not instantiable. */
  private SHA256DigestUtil() {
  }

  /**
   * Encodes a SHA256Digest state to a byte array.
   * @param digest The digest whose state should be encoded
   * @return Byte array containing the raw BouncyCastle encoded state
   */
  public static byte[] encodeDigestState(SHA256Digest digest) {
    return digest.getEncodedState();
  }

  /**
   * Decodes a SHA256Digest state from a byte array.
   * @param encodedState Byte array containing BouncyCastle encoded digest state
   * @return SHA256Digest restored to the saved state
   * @throws IllegalArgumentException if {@code encodedState} is null or empty
   * @throws IOException if the state is invalid or corrupted and cannot be decoded
   */
  public static SHA256Digest decodeDigestState(byte[] encodedState) throws IOException {
    if (encodedState == null || encodedState.length == 0) {
      throw new IllegalArgumentException(
        "Invalid encoded digest state: encodedState is null or empty");
    }
    return new SHA256Digest(encodedState);
  }

  /**
   * Decodes a digest state and finalizes it to produce the SHA-256 checksum.
   * @param encodedState Serialized BouncyCastle digest state
   * @return 32-byte SHA-256 hash
   * @throws IllegalArgumentException if {@code encodedState} is null or empty
   * @throws IOException if state decoding fails
   */
  public static byte[] finalizeDigestToChecksum(byte[] encodedState) throws IOException {
    SHA256Digest digest = decodeDigestState(encodedState);
    return finalizeDigestToChecksum(digest);
  }

  /**
   * Finalizes a SHA256Digest to produce the final checksum. Note that {@code doFinal} consumes
   * the digest: it resets the passed-in instance to its initial state, so callers must not reuse
   * it expecting the accumulated state to remain.
   * @param digest The digest to finalize
   * @return 32-byte SHA-256 hash
   */
  public static byte[] finalizeDigestToChecksum(SHA256Digest digest) {
    byte[] hash = new byte[digest.getDigestSize()];
    digest.doFinal(hash, 0);
    return hash;
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -1011,6 +1011,11 @@ public static boolean isIndexRebuild(Scan scan) {
return scan.getAttribute((BaseScannerRegionObserverConstants.REBUILD_INDEXES)) != null;
}

/**
 * Returns whether server-side chunk formation for PhoenixSyncTableTool is enabled on the given
 * scan, i.e. the SYNC_TABLE_CHUNK_FORMATION attribute is present and set to TRUE_BYTES.
 */
public static boolean isSyncTableChunkFormationEnabled(Scan scan) {
  byte[] chunkFormationAttr =
    scan.getAttribute(BaseScannerRegionObserverConstants.SYNC_TABLE_CHUNK_FORMATION);
  // Arrays.equals handles a missing (null) attribute: it only matches when the value is TRUE_BYTES.
  return Arrays.equals(chunkFormationAttr, TRUE_BYTES);
}

public static int getClientVersion(Scan scan) {
int clientVersion = UNKNOWN_CLIENT_VERSION;
byte[] clientVersionBytes =
Expand Down
6 changes: 5 additions & 1 deletion phoenix-core-server/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -173,8 +173,12 @@
<groupId>com.google.code.findbugs</groupId>
<artifactId>jsr305</artifactId>
</dependency>
<dependency>
<groupId>org.bouncycastle</groupId>
<artifactId>bcprov-jdk18on</artifactId>
<version>1.79</version>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
Expand Down
Loading