Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.phoenix.replication;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -104,6 +105,17 @@ public abstract class ReplicationLogDiscovery {

public static final int DEFAULT_IN_PROGRESS_FILE_MIN_AGE_SECONDS = 60;

/**
* Configuration key for the percentage buffer added to the round time to determine the minimum
* age of a new file before it becomes eligible for processing. This prevents processing files
* that are still being written by the source cluster. The minimum age is calculated as:
* roundTimeMs + (roundTimeMs * percentage / 100)
*/
public static final String REPLICATION_NEW_FILE_AGE_BUFFER_PERCENTAGE_KEY =
"phoenix.replication.new.file.age.buffer.percentage";

public static final double DEFAULT_NEW_FILE_AGE_BUFFER_PERCENTAGE = 15.0;

protected final Configuration conf;
protected final String haGroupName;
protected final ReplicationLogTracker replicationLogTracker;
Expand Down Expand Up @@ -283,19 +295,54 @@ protected boolean shouldProcessInProgressDirectory() {

/**
* Processes all new files for a specific replication round. Continuously processes files until no
* new files remain for the round.
* new files remain for the round. Files that are younger than the configured minimum age
* (roundTime + buffer percentage) are skipped to avoid processing files still being written by
* the source cluster. When all remaining files are too young, sleeps for the minimum time needed
* until the oldest young file becomes eligible.
* @param replicationRound - The replication round for which to process new files
* @throws IOException if there's an error during file processing
*/
protected void processNewFilesForRound(ReplicationRound replicationRound) throws IOException {
LOG.info("Starting new files processing for round: {} for haGroup: {}", replicationRound,
haGroupName);
long startTime = EnvironmentEdgeManager.currentTime();
List<Path> files = replicationLogTracker.getNewFilesForRound(replicationRound);
LOG.info("Number of new files for round {} is {}", replicationRound, files.size());
while (!files.isEmpty()) {
processOneRandomFile(files);
files = replicationLogTracker.getNewFilesForRound(replicationRound);
List<Path> allFiles = replicationLogTracker.getNewFilesForRound(replicationRound);
LOG.info("Number of new files for round {} is {}", replicationRound, allFiles.size());
long minAgeMs = getNewFileMinAgeMs();
while (!allFiles.isEmpty()) {
long currentTime = EnvironmentEdgeManager.currentTime();

List<Path> eligible = new ArrayList<>();
long oldestYoungFileTimestamp = Long.MAX_VALUE;
for (Path f : allFiles) {
long fileTs = replicationLogTracker.getFileTimestamp(f);
if (currentTime - fileTs >= minAgeMs) {
eligible.add(f);
} else {
oldestYoungFileTimestamp = Math.min(oldestYoungFileTimestamp, fileTs);
}
}

if (!eligible.isEmpty()) {
processOneRandomFile(eligible);
} else {
long sleepTime = oldestYoungFileTimestamp + minAgeMs - currentTime;
if (sleepTime > 0) {
LOG.info(
"All {} new files are too young for round {} (minAge={}ms). "
+ "Sleeping {}ms for haGroup: {}",
allFiles.size(), replicationRound, minAgeMs, sleepTime, haGroupName);
try {
waitForFileAge(sleepTime);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
LOG.warn("Interrupted while waiting for files to age for haGroup: {}", haGroupName);
break;
}
}
}

allFiles = replicationLogTracker.getNewFilesForRound(replicationRound);
}
long duration = EnvironmentEdgeManager.currentTime() - startTime;
LOG.info("Finished new files processing for round: {} in {}ms for haGroup: {}",
Expand Down Expand Up @@ -507,6 +554,36 @@ public double getWaitingBufferPercentage() {
return DEFAULT_WAITING_BUFFER_PERCENTAGE;
}

/**
* Returns the percentage buffer for new file age calculation. The minimum age of a new file is:
* roundTimeMs + (roundTimeMs * percentage / 100). Subclasses can override to provide custom
* percentages.
* @return The buffer percentage (default 15.0%)
*/
public double getNewFileAgeBufferPercentage() {
return conf.getDouble(REPLICATION_NEW_FILE_AGE_BUFFER_PERCENTAGE_KEY,
DEFAULT_NEW_FILE_AGE_BUFFER_PERCENTAGE);
}

/**
* Returns the minimum age in milliseconds for a new file to be eligible for processing. Computed
* as roundTimeMs + (roundTimeMs * bufferPercentage / 100).
* @return The minimum file age in milliseconds
*/
public long getNewFileMinAgeMs() {
return roundTimeMills + (long) (roundTimeMills * getNewFileAgeBufferPercentage() / 100.0);
}

/**
* Waits for the specified duration to allow young files to age past the minimum threshold.
* Subclasses can override this method for testing purposes.
* @param sleepTimeMs - The time to wait in milliseconds
* @throws InterruptedException if the thread is interrupted while waiting
*/
protected void waitForFileAge(long sleepTimeMs) throws InterruptedException {
Thread.sleep(sleepTimeMs);
}

public int getInProgressFileMaxRetries() {
return conf.getInt(REPLICATION_IN_PROGRESS_FILE_MAX_RETRIES_KEY,
DEFAULT_IN_PROGRESS_FILE_MAX_RETRIES);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,12 @@ public class ReplicationLogDiscoveryReplay extends ReplicationLogDiscovery {
public static final String REPLICATION_REPLAY_WAITING_BUFFER_PERCENTAGE_KEY =
"phoenix.replication.replay.waiting.buffer.percentage";

/**
* Configuration key for new file age buffer percentage for replay
*/
public static final String REPLICATION_REPLAY_NEW_FILE_AGE_BUFFER_PERCENTAGE_KEY =
"phoenix.replication.replay.new.file.age.buffer.percentage";

/**
* Default replay interval in seconds. Controls how frequently the replay process runs.
*/
Expand Down Expand Up @@ -452,6 +458,12 @@ public double getWaitingBufferPercentage() {
DEFAULT_WAITING_BUFFER_PERCENTAGE);
}

@Override
public double getNewFileAgeBufferPercentage() {
return getConf().getDouble(REPLICATION_REPLAY_NEW_FILE_AGE_BUFFER_PERCENTAGE_KEY,
DEFAULT_NEW_FILE_AGE_BUFFER_PERCENTAGE);
}

protected ReplicationRound getLastRoundInSync() {
return lastRoundInSync;
}
Expand Down
Loading