From 16ccb4c118f5569dc0d30dcbc87651ac34efe08c Mon Sep 17 00:00:00 2001 From: Himanshu Gwalani Date: Fri, 24 Apr 2026 17:01:15 +0530 Subject: [PATCH] PHOENIX-7792 Add wait time before processing NEW files to minimize the possibility of files being open during replay --- .../replication/ReplicationLogDiscovery.java | 89 ++++- .../reader/ReplicationLogDiscoveryReplay.java | 12 + .../ReplicationLogDiscoveryTest.java | 375 +++++++++++++++--- 3 files changed, 418 insertions(+), 58 deletions(-) diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogDiscovery.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogDiscovery.java index 175de94f871..f657e9e9a9d 100644 --- a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogDiscovery.java +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogDiscovery.java @@ -18,6 +18,7 @@ package org.apache.phoenix.replication; import java.io.IOException; +import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -104,6 +105,17 @@ public abstract class ReplicationLogDiscovery { public static final int DEFAULT_IN_PROGRESS_FILE_MIN_AGE_SECONDS = 60; + /** + * Configuration key for the percentage buffer added to the round time to determine the minimum + * age of a new file before it becomes eligible for processing. This prevents processing files + * that are still being written by the source cluster. The minimum age is calculated as: + * roundTimeMs + (roundTimeMs * percentage / 100) + */ + public static final String REPLICATION_NEW_FILE_AGE_BUFFER_PERCENTAGE_KEY = + "phoenix.replication.new.file.age.buffer.percentage"; + + public static final double DEFAULT_NEW_FILE_AGE_BUFFER_PERCENTAGE = 15.0; + protected final Configuration conf; protected final String haGroupName; protected final ReplicationLogTracker replicationLogTracker; @@ -283,7 +295,10 @@ protected boolean shouldProcessInProgressDirectory() { /** * Processes all new files for a specific replication round. Continuously processes files until no - * new files remain for the round. + * new files remain for the round. Files that are younger than the configured minimum age + * (roundTime + buffer percentage) are skipped to avoid processing files still being written by + * the source cluster. When all remaining files are too young, sleeps for the minimum time needed + * until the oldest young file becomes eligible. * @param replicationRound - The replication round for which to process new files * @throws IOException if there's an error during file processing */ @@ -291,11 +306,43 @@ protected void processNewFilesForRound(ReplicationRound replicationRound) throws LOG.info("Starting new files processing for round: {} for haGroup: {}", replicationRound, haGroupName); long startTime = EnvironmentEdgeManager.currentTime(); - List files = replicationLogTracker.getNewFilesForRound(replicationRound); - LOG.info("Number of new files for round {} is {}", replicationRound, files.size()); - while (!files.isEmpty()) { - processOneRandomFile(files); - files = replicationLogTracker.getNewFilesForRound(replicationRound); + List allFiles = replicationLogTracker.getNewFilesForRound(replicationRound); + LOG.info("Number of new files for round {} is {}", replicationRound, allFiles.size()); + long minAgeMs = getNewFileMinAgeMs(); + while (!allFiles.isEmpty()) { + long currentTime = EnvironmentEdgeManager.currentTime(); + + List eligible = new ArrayList<>(); + long oldestYoungFileTimestamp = Long.MAX_VALUE; + for (Path f : allFiles) { + long fileTs = replicationLogTracker.getFileTimestamp(f); + if (currentTime - fileTs >= minAgeMs) { + eligible.add(f); + } else { + oldestYoungFileTimestamp = Math.min(oldestYoungFileTimestamp, fileTs); + } + } + + if (!eligible.isEmpty()) { + processOneRandomFile(eligible); + } else { + long sleepTime = oldestYoungFileTimestamp + minAgeMs - currentTime; + if (sleepTime > 0) { + LOG.info( + "All {} new files are too young for round {} (minAge={}ms). " + + "Sleeping {}ms for haGroup: {}", + allFiles.size(), replicationRound, minAgeMs, sleepTime, haGroupName); + try { + waitForFileAge(sleepTime); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + LOG.warn("Interrupted while waiting for files to age for haGroup: {}", haGroupName); + break; + } + } + } + + allFiles = replicationLogTracker.getNewFilesForRound(replicationRound); } long duration = EnvironmentEdgeManager.currentTime() - startTime; LOG.info("Finished new files processing for round: {} in {}ms for haGroup: {}", @@ -507,6 +554,36 @@ public double getWaitingBufferPercentage() { return DEFAULT_WAITING_BUFFER_PERCENTAGE; } + /** + * Returns the percentage buffer for new file age calculation. The minimum age of a new file is: + * roundTimeMs + (roundTimeMs * percentage / 100). Subclasses can override to provide custom + * percentages. + * @return The buffer percentage (default 15.0%) + */ + public double getNewFileAgeBufferPercentage() { + return conf.getDouble(REPLICATION_NEW_FILE_AGE_BUFFER_PERCENTAGE_KEY, + DEFAULT_NEW_FILE_AGE_BUFFER_PERCENTAGE); + } + + /** + * Returns the minimum age in milliseconds for a new file to be eligible for processing. Computed + * as roundTimeMs + (roundTimeMs * bufferPercentage / 100). + * @return The minimum file age in milliseconds + */ + public long getNewFileMinAgeMs() { + return roundTimeMills + (long) (roundTimeMills * getNewFileAgeBufferPercentage() / 100.0); + } + + /** + * Waits for the specified duration to allow young files to age past the minimum threshold. + * Subclasses can override this method for testing purposes. + * @param sleepTimeMs - The time to wait in milliseconds + * @throws InterruptedException if the thread is interrupted while waiting + */ + protected void waitForFileAge(long sleepTimeMs) throws InterruptedException { + Thread.sleep(sleepTimeMs); + } + public int getInProgressFileMaxRetries() { return conf.getInt(REPLICATION_IN_PROGRESS_FILE_MAX_RETRIES_KEY, DEFAULT_IN_PROGRESS_FILE_MAX_RETRIES); diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogDiscoveryReplay.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogDiscoveryReplay.java index f20ff7ae217..908585c9522 100644 --- a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogDiscoveryReplay.java +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogDiscoveryReplay.java @@ -87,6 +87,12 @@ public class ReplicationLogDiscoveryReplay extends ReplicationLogDiscovery { public static final String REPLICATION_REPLAY_WAITING_BUFFER_PERCENTAGE_KEY = "phoenix.replication.replay.waiting.buffer.percentage"; + /** + * Configuration key for new file age buffer percentage for replay + */ + public static final String REPLICATION_REPLAY_NEW_FILE_AGE_BUFFER_PERCENTAGE_KEY = + "phoenix.replication.replay.new.file.age.buffer.percentage"; + /** * Default replay interval in seconds. Controls how frequently the replay process runs. */ @@ -452,6 +458,12 @@ public double getWaitingBufferPercentage() { DEFAULT_WAITING_BUFFER_PERCENTAGE); } + @Override + public double getNewFileAgeBufferPercentage() { + return getConf().getDouble(REPLICATION_REPLAY_NEW_FILE_AGE_BUFFER_PERCENTAGE_KEY, + DEFAULT_NEW_FILE_AGE_BUFFER_PERCENTAGE); + } + protected ReplicationRound getLastRoundInSync() { return lastRoundInSync; } diff --git a/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogDiscoveryTest.java b/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogDiscoveryTest.java index caacf175ced..1e976c1e1ef 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogDiscoveryTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogDiscoveryTest.java @@ -933,67 +933,79 @@ public void testProcessInProgressDirectoryWithIntermittentFailure() throws IOExc .when(discovery) .processFile(Mockito.argThat(path -> extractPrefix(path.getName()).equals(file3Prefix))); - // Process in-progress directory - discovery.processInProgressDirectory(); + // Inject a monotonically advancing clock so that the rename timestamp from markInProgress + // is always strictly less than the threshold computed in the next loop iteration. + // Without this, both can land on the same millisecond causing renameTs < threshold to be + // false, which makes the retry file invisible to getOlderInProgressFiles. + AtomicLong clock = new AtomicLong(EnvironmentEdgeManager.currentTime()); + EnvironmentEdgeManager.injectEdge(() -> clock.incrementAndGet()); - // Verify that markInProgress was called 7 times (5 initially + 2 for retries) - Mockito.verify(fileTracker, Mockito.times(7)).markInProgress(Mockito.any(Path.class)); + try { + // Process in-progress directory + discovery.processInProgressDirectory(); - // Verify that markInProgress was called for each expected file - // Files 1 and 3 are called twice (initial attempt + retry), others once - for (int i = 0; i < allInProgressFiles.size(); i++) { - Path expectedFile = allInProgressFiles.get(i); - String expectedPrefix = extractPrefix(expectedFile.getName()); - int expectedTimes = (i == 1 || i == 3) ? 2 : 1; // Files 1 and 3 are retried - Mockito.verify(fileTracker, Mockito.times(expectedTimes)).markInProgress( - Mockito.argThat(path -> extractPrefix(path.getName()).equals(expectedPrefix))); - } + // Verify that markInProgress was called 7 times (5 initially + 2 for retries) + Mockito.verify(fileTracker, Mockito.times(7)).markInProgress(Mockito.any(Path.class)); - // Verify that processFile was called for each file in the directory (i.e. 5 + 2 times for - // failed once that would succeed in next retry) - Mockito.verify(discovery, Mockito.times(7)).processFile(Mockito.any(Path.class)); + // Verify that markInProgress was called for each expected file + // Files 1 and 3 are called twice (initial attempt + retry), others once + for (int i = 0; i < allInProgressFiles.size(); i++) { + Path expectedFile = allInProgressFiles.get(i); + String expectedPrefix = extractPrefix(expectedFile.getName()); + int expectedTimes = (i == 1 || i == 3) ? 2 : 1; // Files 1 and 3 are retried + Mockito.verify(fileTracker, Mockito.times(expectedTimes)).markInProgress( + Mockito.argThat(path -> extractPrefix(path.getName()).equals(expectedPrefix))); + } - // Verify that processFile was called for each specific file (using prefix matching) - // Files 1 and 3 should be called twice (fail once, succeed on retry), others once - for (int i = 0; i < allInProgressFiles.size(); i++) { - Path expectedFile = allInProgressFiles.get(i); - String expectedPrefix = extractPrefix(expectedFile.getName()); - int expectedTimes = (i == 1 || i == 3) ? 2 : 1; // Files 1 and 3 are called twice (fail + - // retry success) - Mockito.verify(discovery, Mockito.times(expectedTimes)) - .processFile(Mockito.argThat(path -> extractPrefix(path.getName()).equals(expectedPrefix))); - } + // Verify that processFile was called for each file in the directory (i.e. 5 + 2 times for + // failed once that would succeed in next retry) + Mockito.verify(discovery, Mockito.times(7)).processFile(Mockito.any(Path.class)); - // Verify that markCompleted was called for each successfully processed file - Mockito.verify(fileTracker, Mockito.times(5)).markCompleted(Mockito.any(Path.class)); + // Verify that processFile was called for each specific file (using prefix matching) + // Files 1 and 3 should be called twice (fail once, succeed on retry), others once + for (int i = 0; i < allInProgressFiles.size(); i++) { + Path expectedFile = allInProgressFiles.get(i); + String expectedPrefix = extractPrefix(expectedFile.getName()); + int expectedTimes = (i == 1 || i == 3) ? 2 : 1; // Files 1 and 3 are called twice (fail + + // retry success) + Mockito.verify(discovery, Mockito.times(expectedTimes)).processFile( + Mockito.argThat(path -> extractPrefix(path.getName()).equals(expectedPrefix))); + } - // Verify that markCompleted was called for 2 intermittent failed processed file - Mockito.verify(fileTracker, Mockito.times(2)).markFailed(Mockito.any(Path.class)); + // Verify that markCompleted was called for each successfully processed file + Mockito.verify(fileTracker, Mockito.times(5)).markCompleted(Mockito.any(Path.class)); - // Verify that markFailed was called once ONLY for failed files - String failedPrefix1 = extractPrefix(allInProgressFiles.get(1).getName()); - Mockito.verify(fileTracker, Mockito.times(1)) - .markFailed(Mockito.argThat(path -> extractPrefix(path.getName()).equals(failedPrefix1))); - String failedPrefix3 = extractPrefix(allInProgressFiles.get(3).getName()); - Mockito.verify(fileTracker, Mockito.times(1)) - .markFailed(Mockito.argThat(path -> extractPrefix(path.getName()).equals(failedPrefix3))); + // Verify that markCompleted was called for 2 intermittent failed processed file + Mockito.verify(fileTracker, Mockito.times(2)).markFailed(Mockito.any(Path.class)); - // Verify that markFailed was NOT called for files processed successfully in first iteration - String successPrefix0 = extractPrefix(allInProgressFiles.get(0).getName()); - Mockito.verify(fileTracker, Mockito.never()) - .markFailed(Mockito.argThat(path -> extractPrefix(path.getName()).equals(successPrefix0))); - String successPrefix2 = extractPrefix(allInProgressFiles.get(2).getName()); - Mockito.verify(fileTracker, Mockito.never()) - .markFailed(Mockito.argThat(path -> extractPrefix(path.getName()).equals(successPrefix2))); - String successPrefix4 = extractPrefix(allInProgressFiles.get(4).getName()); - Mockito.verify(fileTracker, Mockito.never()) - .markFailed(Mockito.argThat(path -> extractPrefix(path.getName()).equals(successPrefix4))); + // Verify that markFailed was called once ONLY for failed files + String failedPrefix1 = extractPrefix(allInProgressFiles.get(1).getName()); + Mockito.verify(fileTracker, Mockito.times(1)) + .markFailed(Mockito.argThat(path -> extractPrefix(path.getName()).equals(failedPrefix1))); + String failedPrefix3 = extractPrefix(allInProgressFiles.get(3).getName()); + Mockito.verify(fileTracker, Mockito.times(1)) + .markFailed(Mockito.argThat(path -> extractPrefix(path.getName()).equals(failedPrefix3))); - // Verify that markCompleted was called for each successfully processed file with correct paths - for (Path expectedFile : allInProgressFiles) { - String expectedPrefix = extractPrefix(expectedFile.getName()); - Mockito.verify(fileTracker, Mockito.times(1)).markCompleted( - Mockito.argThat(path -> extractPrefix(path.getName()).equals(expectedPrefix))); + // Verify that markFailed was NOT called for files processed successfully in first iteration + String successPrefix0 = extractPrefix(allInProgressFiles.get(0).getName()); + Mockito.verify(fileTracker, Mockito.never()) + .markFailed(Mockito.argThat(path -> extractPrefix(path.getName()).equals(successPrefix0))); + String successPrefix2 = extractPrefix(allInProgressFiles.get(2).getName()); + Mockito.verify(fileTracker, Mockito.never()) + .markFailed(Mockito.argThat(path -> extractPrefix(path.getName()).equals(successPrefix2))); + String successPrefix4 = extractPrefix(allInProgressFiles.get(4).getName()); + Mockito.verify(fileTracker, Mockito.never()) + .markFailed(Mockito.argThat(path -> extractPrefix(path.getName()).equals(successPrefix4))); + + // Verify that markCompleted was called for each successfully processed file with correct + // paths + for (Path expectedFile : allInProgressFiles) { + String expectedPrefix = extractPrefix(expectedFile.getName()); + Mockito.verify(fileTracker, Mockito.times(1)).markCompleted( + Mockito.argThat(path -> extractPrefix(path.getName()).equals(expectedPrefix))); + } + } finally { + EnvironmentEdgeManager.reset(); } } @@ -1831,6 +1843,246 @@ public void testInit_NoInProgressFiles_NoNewFiles() throws IOException { } } + /** + * Tests that files older than the configured minimum age are processed immediately without any + * sleep. All files have timestamps well before the threshold. + */ + @Test + public void testProcessNewFilesForRoundWithOldFiles() throws IOException { + long roundStart = 1704153600000L; + long roundEnd = 1704153660000L; + ReplicationRound replicationRound = new ReplicationRound(roundStart, roundEnd); + + // Inject a clock that makes all files old enough (current time = roundStart + 120s) + long currentTime = roundStart + 120000L; + EnvironmentEdgeManager.injectEdge(() -> currentTime); + + try { + List newFiles = createNewFilesForRound(replicationRound, 3); + discovery.processNewFilesForRound(replicationRound); + + List processedFiles = discovery.getProcessedFiles(); + assertEquals("All old files should be processed", 3, processedFiles.size()); + + // Verify markInProgress was called for each file by prefix + Mockito.verify(fileTracker, Mockito.times(3)).markInProgress(Mockito.any(Path.class)); + for (Path expectedFile : newFiles) { + Mockito.verify(fileTracker, Mockito.times(1)).markInProgress(Mockito + .argThat(path -> path.getName().startsWith(expectedFile.getName().split("\\.")[0]))); + } + + // Verify markCompleted was called for each file by prefix + Mockito.verify(fileTracker, Mockito.times(3)).markCompleted(Mockito.any(Path.class)); + for (Path expectedFile : newFiles) { + String expectedPrefix = + expectedFile.getName().substring(0, expectedFile.getName().lastIndexOf(".")); + Mockito.verify(fileTracker, Mockito.times(1)).markCompleted( + Mockito.argThat(path -> extractPrefix(path.getName()).equals(expectedPrefix))); + } + + // Verify waitForFileAge was never called since all files were old enough + assertEquals("Should never sleep when all files are old enough", 0, + discovery.getSleepDurations().size()); + } finally { + EnvironmentEdgeManager.reset(); + } + } + + /** + * Tests that when a mix of old and young files exist, only the old file is processed first. The + * young file is deferred and processed only after waitForFileAge advances the clock past the + * threshold. Verifies processing order and exact sleep duration. + */ + @Test + public void testProcessNewFilesForRoundWithMixedAgeFiles() throws IOException { + long roundStart = 1704153600000L; + long roundEnd = 1704153660000L; + long minAgeMs = 69000L; // 60s + 15% + ReplicationRound replicationRound = new ReplicationRound(roundStart, roundEnd); + + // Create files at roundStart (old) and roundStart + 50s (young at first) + ReplicationShardDirectoryManager shardManager = + fileTracker.getReplicationShardDirectoryManager(); + Path shardPath = shardManager.getShardDirectory(roundStart); + localFs.mkdirs(shardPath); + + String oldFilePrefix = roundStart + "_rs-0"; + Path oldFile = new Path(shardPath, oldFilePrefix + ".plog"); + localFs.create(oldFile, true).close(); + long youngFileTimestamp = roundStart + 50000L; + String youngFilePrefix = youngFileTimestamp + "_rs-1"; + Path youngFile = new Path(shardPath, youngFilePrefix + ".plog"); + localFs.create(youngFile, true).close(); + + // Clock starts at roundStart + 80s: + // oldFile age = 80s >= 69s (eligible) + // youngFile age = 30s < 69s (too young) + AtomicLong clock = new AtomicLong(roundStart + 80000L); + EnvironmentEdgeManager.injectEdge(clock::get); + // waitForFileAge will advance the clock by the sleep duration + discovery.setTestClock(clock); + + try { + discovery.processNewFilesForRound(replicationRound); + + List processedFiles = discovery.getProcessedFiles(); + assertEquals("Both files should be processed", 2, processedFiles.size()); + + // Verify ordering: old file processed first, young file second + assertEquals("Old file should be processed first", oldFilePrefix, + extractPrefix(processedFiles.get(0).getName())); + assertEquals("Young file should be processed second", youngFilePrefix, + extractPrefix(processedFiles.get(1).getName())); + + // Verify waitForFileAge was called exactly once with the correct duration + // Expected: youngFileTimestamp + minAge - clockAtSleepTime + // clockAtSleepTime = roundStart + 80000 (old file was processed but clock doesn't advance) + // sleepTime = (roundStart + 50000) + 69000 - (roundStart + 80000) = 39000ms + List sleepDurations = discovery.getSleepDurations(); + assertEquals("Should have slept exactly once", 1, sleepDurations.size()); + long expectedSleepTime = youngFileTimestamp + minAgeMs - (roundStart + 80000L); + assertEquals("Sleep duration should be exactly the time needed for young file to age", + expectedSleepTime, sleepDurations.get(0).longValue()); + + // Verify markInProgress was called for each file by prefix + Mockito.verify(fileTracker, Mockito.times(2)).markInProgress(Mockito.any(Path.class)); + Mockito.verify(fileTracker, Mockito.times(1)) + .markInProgress(Mockito.argThat(path -> path.getName().startsWith(oldFilePrefix))); + Mockito.verify(fileTracker, Mockito.times(1)) + .markInProgress(Mockito.argThat(path -> path.getName().startsWith(youngFilePrefix))); + + // Verify markCompleted was called for each file by prefix + Mockito.verify(fileTracker, Mockito.times(2)).markCompleted(Mockito.any(Path.class)); + Mockito.verify(fileTracker, Mockito.times(1)).markCompleted( + Mockito.argThat(path -> extractPrefix(path.getName()).equals(oldFilePrefix))); + Mockito.verify(fileTracker, Mockito.times(1)).markCompleted( + Mockito.argThat(path -> extractPrefix(path.getName()).equals(youngFilePrefix))); + } finally { + EnvironmentEdgeManager.reset(); + } + } + + /** + * Tests that when all files are too young, they are processed in batches as they age past the + * threshold via waitForFileAge advancing the clock. Three files with staggered timestamps: file0 + * (earliest) becomes eligible after the first sleep, file1 and file2 (same later timestamp) + * become eligible together after the second sleep. Verifies exact sleep durations. + */ + @Test + public void testProcessNewFilesForRoundAllTooYoung() throws IOException { + long roundStart = 1704153600000L; + long roundEnd = 1704153660000L; + long minAgeMs = 69000L; // 60s + 15% + ReplicationRound replicationRound = new ReplicationRound(roundStart, roundEnd); + + // Create 3 files: file0 at roundStart, file1 and file2 at roundStart + 5000 + ReplicationShardDirectoryManager shardManager = + fileTracker.getReplicationShardDirectoryManager(); + Path shardPath = shardManager.getShardDirectory(roundStart); + localFs.mkdirs(shardPath); + + long laterTimestamp = roundStart + 5000L; + String file0Prefix = roundStart + "_rs-0"; + Path file0 = new Path(shardPath, file0Prefix + ".plog"); + localFs.create(file0, true).close(); + String file1Prefix = laterTimestamp + "_rs-1"; + Path file1 = new Path(shardPath, file1Prefix + ".plog"); + localFs.create(file1, true).close(); + String file2Prefix = laterTimestamp + "_rs-2"; + Path file2 = new Path(shardPath, file2Prefix + ".plog"); + localFs.create(file2, true).close(); + List newFiles = Arrays.asList(file0, file1, file2); + + // Clock at roundStart + 10s: all files too young + // file0 age = 10s, file1/file2 age = 5s — all < 69s + AtomicLong clock = new AtomicLong(roundStart + 10000L); + EnvironmentEdgeManager.injectEdge(clock::get); + discovery.setTestClock(clock); + + try { + discovery.processNewFilesForRound(replicationRound); + + List processedFiles = discovery.getProcessedFiles(); + assertEquals("All 3 files should be processed after aging", 3, processedFiles.size()); + + // Sleep 1: based on file0 (oldest young file) + // sleepTime = roundStart + 69000 - (roundStart + 10000) = 59000ms + // clock advances to roundStart + 69000 + // file0 age = 69000 >= 69000 (eligible), file1/file2 age = 64000 < 69000 (still young) + // After processing file0: + // Sleep 2: based on file1/file2 (oldest young = laterTimestamp) + // sleepTime = (roundStart + 5000) + 69000 - (roundStart + 69000) = 5000ms + // clock advances to roundStart + 74000 + // file1/file2 age = 69000 >= 69000 (eligible) + List sleepDurations = discovery.getSleepDurations(); + assertEquals("Should have slept exactly twice", 2, sleepDurations.size()); + long expectedSleep1 = roundStart + minAgeMs - (roundStart + 10000L); // 59000 + assertEquals("First sleep should be based on the oldest young file (file0)", expectedSleep1, + sleepDurations.get(0).longValue()); + long expectedSleep2 = laterTimestamp + minAgeMs - (roundStart + 69000L); // 5000 + assertEquals("Second sleep should be based on file1/file2 timestamp", expectedSleep2, + sleepDurations.get(1).longValue()); + + Mockito.verify(fileTracker, Mockito.times(3)).markInProgress(Mockito.any(Path.class)); + for (Path expectedFile : newFiles) { + Mockito.verify(fileTracker, Mockito.times(1)).markInProgress(Mockito + .argThat(path -> path.getName().startsWith(expectedFile.getName().split("\\.")[0]))); + } + + Mockito.verify(fileTracker, Mockito.times(3)).markCompleted(Mockito.any(Path.class)); + for (Path expectedFile : newFiles) { + String expectedPrefix = + expectedFile.getName().substring(0, expectedFile.getName().lastIndexOf(".")); + Mockito.verify(fileTracker, Mockito.times(1)).markCompleted( + Mockito.argThat(path -> extractPrefix(path.getName()).equals(expectedPrefix))); + } + } finally { + EnvironmentEdgeManager.reset(); + } + } + + /** + * Tests that the new file age buffer percentage configuration is respected. + */ + @Test + public void testNewFileMinAgeConfiguration() { + // Default: 15% + conf.setDouble(ReplicationLogDiscovery.REPLICATION_NEW_FILE_AGE_BUFFER_PERCENTAGE_KEY, 15.0); + long expectedMinAge = 60000L + (long) (60000L * 15.0 / 100.0); // 69000ms + assertEquals("Min age should be roundTime + 15%", expectedMinAge, + discovery.getNewFileMinAgeMs()); + + // Custom: 50% + conf.setDouble(ReplicationLogDiscovery.REPLICATION_NEW_FILE_AGE_BUFFER_PERCENTAGE_KEY, 50.0); + long expectedMinAge50 = 60000L + (long) (60000L * 50.0 / 100.0); // 90000ms + assertEquals("Min age should be roundTime + 50%", expectedMinAge50, + discovery.getNewFileMinAgeMs()); + + // Zero: disabled + conf.setDouble(ReplicationLogDiscovery.REPLICATION_NEW_FILE_AGE_BUFFER_PERCENTAGE_KEY, 0.0); + assertEquals("Min age should be roundTime when buffer is 0%", 60000L, + discovery.getNewFileMinAgeMs()); + } + + /** + * Tests that processNewFilesForRound completes without error when no files exist for the round. + */ + @Test + public void testProcessNewFilesForRoundNoFiles() throws IOException { + long roundStart = 1704153600000L; + long roundEnd = 1704153660000L; + ReplicationRound replicationRound = new ReplicationRound(roundStart, roundEnd); + + // Don't create any files - the round directory doesn't exist + discovery.processNewFilesForRound(replicationRound); + + List processedFiles = discovery.getProcessedFiles(); + assertEquals("No files should be processed when no files exist", 0, processedFiles.size()); + Mockito.verify(fileTracker, Mockito.never()).markInProgress(Mockito.any(Path.class)); + Mockito.verify(fileTracker, Mockito.never()).markCompleted(Mockito.any(Path.class)); + assertEquals("Should never sleep when no files exist", 0, discovery.getSleepDurations().size()); + } + private List createNewFilesForRound(ReplicationRound replicationRound, int fileCount) throws IOException { // Create files for multiple rounds in the same shard with each file at gap of 2 seconds @@ -2211,6 +2463,8 @@ public TestableReplicationLogTracker(final Configuration conf, final String haGr private static class TestableReplicationLogDiscovery extends ReplicationLogDiscovery { private final List processedFiles = new ArrayList<>(); private final List processedRounds = new ArrayList<>(); + private final List sleepDurations = new ArrayList<>(); + private AtomicLong testClock; public TestableReplicationLogDiscovery(ReplicationLogTracker fileTracker) { super(fileTracker); @@ -2275,5 +2529,22 @@ protected boolean shouldProcessInProgressDirectory() { public void setMockShouldProcessInProgressDirectory(boolean value) { this.mockShouldProcessInProgressDirectory = value; } + + @Override + public void waitForFileAge(long sleepTimeMs) { + sleepDurations.add(sleepTimeMs); + if (testClock != null) { + testClock.addAndGet(sleepTimeMs); + } + } + + public void setTestClock(AtomicLong clock) { + this.testClock = clock; + } + + public List getSleepDurations() { + return new ArrayList<>(sleepDurations); + } + } }