diff --git a/include/common/tglobal.h b/include/common/tglobal.h index 828485782c82..a79d210753b0 100644 --- a/include/common/tglobal.h +++ b/include/common/tglobal.h @@ -291,7 +291,9 @@ extern int64_t tsmaDataDeleteMark; // wal extern int64_t tsWalFsyncDataSizeLimit; extern bool tsWalForceRepair; -extern bool tsWalDeleteOnCorruption; +// WAL recovery policy (only affects single replica) +// 0 = refuse to start (default), 1 = delete corrupted and start +extern int32_t tsWalRecoveryPolicy; // internal extern bool tsDiskIDCheckEnabled; diff --git a/include/libs/sync/sync.h b/include/libs/sync/sync.h index 6ca3d7d3bcbe..4e32ca3a70bd 100644 --- a/include/libs/sync/sync.h +++ b/include/libs/sync/sync.h @@ -303,6 +303,8 @@ const char* syncStr(ESyncState state); int32_t syncNodeGetConfig(int64_t rid, SSyncCfg* cfg); +int32_t syncNotifyWalTruncated(int32_t vgId, int64_t truncatedVer); + // util int32_t syncSnapInfoDataRealloc(SSnapshot* pSnap, int32_t size); diff --git a/include/libs/wal/wal.h b/include/libs/wal/wal.h index 209dc18c9866..49a0ef6a3ae2 100644 --- a/include/libs/wal/wal.h +++ b/include/libs/wal/wal.h @@ -166,7 +166,7 @@ int32_t walInit(stopDnodeFn stopDnode); void walCleanUp(); // handle open and ctl -SWal *walOpen(const char *path, SWalCfg *pCfg); +SWal *walOpen(const char *path, SWalCfg *pCfg, int32_t replica); int32_t walAlter(SWal *, SWalCfg *pCfg); int32_t walPersist(SWal *); void walClose(SWal *); diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index 8f9a625479a0..3c6a94787062 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -356,7 +356,7 @@ bool tsStartUdfd = true; // wal int64_t tsWalFsyncDataSizeLimit = (100 * 1024 * 1024L); bool tsWalForceRepair = 0; -bool tsWalDeleteOnCorruption = false; +int32_t tsWalRecoveryPolicy = 0; // Default: refuse to start for single replica // ttl bool tsTtlChangeOnWrite = false; // if true, ttl delete time changes on last write @@ -981,7 +981,7 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { TAOS_CHECK_RETURN(cfgAddInt64(pCfg, "syncApplyQueueSize", tsSyncApplyQueueSize, 32, 2048, CFG_SCOPE_SERVER, CFG_DYN_SERVER,CFG_CATEGORY_GLOBAL)); TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "syncRoutineReportInterval", tsRoutineReportInterval, 5, 600, CFG_SCOPE_SERVER, CFG_DYN_SERVER,CFG_CATEGORY_LOCAL)); TAOS_CHECK_RETURN(cfgAddBool(pCfg, "syncLogHeartbeat", tsSyncLogHeartbeat, CFG_SCOPE_SERVER, CFG_DYN_SERVER,CFG_CATEGORY_LOCAL)); - TAOS_CHECK_RETURN(cfgAddBool(pCfg, "walDeleteOnCorruption", tsWalDeleteOnCorruption, CFG_SCOPE_SERVER, CFG_DYN_NONE,CFG_CATEGORY_LOCAL)); + TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "walRecoveryPolicy", tsWalRecoveryPolicy, 0, 1, CFG_SCOPE_SERVER, CFG_DYN_NONE, CFG_CATEGORY_LOCAL)); TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "syncTimeout", tsSyncTimeout, 0, 60 * 24 * 2 * 1000, CFG_SCOPE_SERVER, CFG_DYN_SERVER,CFG_CATEGORY_GLOBAL)); @@ -2143,8 +2143,8 @@ static int32_t taosSetServerCfg(SConfig *pCfg) { tsRpcRecvLogThreshold = pItem->i32; // GRANT_CFG_GET; - TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, "walDeleteOnCorruption"); - tsWalDeleteOnCorruption = pItem->bval; + TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, "walRecoveryPolicy"); + tsWalRecoveryPolicy = pItem->i32; TAOS_RETURN(TSDB_CODE_SUCCESS); } diff --git a/source/dnode/mnode/impl/src/mndMain.c b/source/dnode/mnode/impl/src/mndMain.c index 8a8442947e5f..ab880e423014 100644 --- a/source/dnode/mnode/impl/src/mndMain.c +++ b/source/dnode/mnode/impl/src/mndMain.c @@ -632,7 +632,7 @@ static int32_t mndInitWal(SMnode *pMnode) { } #endif - pMnode->pWal = walOpen(path, &cfg); + pMnode->pWal = walOpen(path, &cfg, pMnode->syncMgmt.numOfReplicas); if (pMnode->pWal == NULL) { code = TSDB_CODE_MND_RETURN_VALUE_NULL; if (terrno != 0) code = terrno; diff --git a/source/dnode/vnode/src/vnd/vnodeOpen.c b/source/dnode/vnode/src/vnd/vnodeOpen.c index c8f792eadf33..189f830897e3 100644 --- a/source/dnode/vnode/src/vnd/vnodeOpen.c +++ b/source/dnode/vnode/src/vnd/vnodeOpen.c @@ -490,7 +490,7 @@ SVnode *vnodeOpen(const char *path, int32_t diskPrimary, STfs *pTfs, SMsgCb msgC TAOS_UNUSED(ret); vInfo("vgId:%d, start to open vnode wal", TD_VID(pVnode)); - pVnode->pWal = walOpen(tdir, &(pVnode->config.walCfg)); + pVnode->pWal = walOpen(tdir, &(pVnode->config.walCfg), pVnode->config.syncCfg.replicaNum); if (pVnode->pWal == NULL) { vError("vgId:%d, failed to open vnode wal since %s. wal:%s", TD_VID(pVnode), tstrerror(terrno), tdir); goto _err; diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 0a111fd7aff2..605fa5a326ba 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -4089,4 +4089,4 @@ bool syncNodeCanChange(SSyncNode* pSyncNode) { return true; } -#endif +#endif \ No newline at end of file diff --git a/source/libs/wal/inc/walInt.h b/source/libs/wal/inc/walInt.h index 624e7c9bbff4..12c3f6520649 100644 --- a/source/libs/wal/inc/walInt.h +++ b/source/libs/wal/inc/walInt.h @@ -168,7 +168,7 @@ int32_t walRemoveMeta(SWal* pWal); int32_t walRollImpl(SWal* pWal); int32_t walRollFileInfo(SWal* pWal); int32_t walScanLogGetLastVer(SWal* pWal, int32_t fileIdx, int64_t* lastVer); -int32_t walCheckAndRepairMeta(SWal* pWal); +int32_t walCheckAndRepairMeta(SWal* pWal, int32_t replica); int64_t walChangeWrite(SWal* pWal, int64_t ver); int32_t walCheckAndRepairIdx(SWal* pWal); diff --git a/source/libs/wal/src/walMeta.c b/source/libs/wal/src/walMeta.c index 5287dc02435e..8580abae447c 100644 --- a/source/libs/wal/src/walMeta.c +++ b/source/libs/wal/src/walMeta.c @@ -431,7 +431,75 @@ static int32_t walRenameCorruptedDir(SWal* pWal) { TAOS_RETURN(code); } -static int32_t walLogEntriesComplete(SWal* pWal) { +// walTrimIdxFile + walScanLogGetLastVer already truncate the corrupted file itself. +// This function only needs to delete subsequent files that are entirely beyond the corruption point. +static int32_t walTruncateCorruptedFiles(SWal* pWal, int32_t fileIdx, int32_t replica) { + int32_t code = TSDB_CODE_SUCCESS; + bool shouldRecover = false; + int32_t sz = taosArrayGetSize(pWal->fileInfoSet); + + if (fileIdx < 0 || fileIdx >= sz) { + wError("vgId:%d, walTruncateCorruptedFiles called with invalid fileIdx:%d (sz:%d)", pWal->cfg.vgId, fileIdx, sz); + TAOS_RETURN(TSDB_CODE_WAL_FILE_CORRUPTED); + } + + SWalFileInfo* pFileInfo = taosArrayGet(pWal->fileInfoSet, fileIdx); + if (replica >= 3) { + shouldRecover = true; + wInfo("vgId:%d, WAL corrupted at ver:%" PRId64 ", auto-recovery enabled for replica=%d", + pWal->cfg.vgId, pFileInfo->firstVer, replica); + } else { + shouldRecover = (tsWalRecoveryPolicy == 1); + if (shouldRecover) { + wWarn("vgId:%d, WAL corrupted at ver:%" PRId64 ", force recovery enabled by walRecoveryPolicy=1", + pWal->cfg.vgId, pFileInfo->firstVer); + } else { + wError("vgId:%d, WAL corrupted at ver:%" PRId64 ", refusing to start to prevent data loss", + pWal->cfg.vgId, pFileInfo->firstVer); + wError("vgId:%d, to force recovery with data loss, set 'walRecoveryPolicy 1' in taos.cfg and restart", + pWal->cfg.vgId); + TAOS_RETURN(TSDB_CODE_WAL_FILE_CORRUPTED); + } + } + + // Delete all files after the corrupted file (the corrupted file itself is already truncated + // by walTrimIdxFile + walScanLogGetLastVer before this function is called) + int32_t deleteFrom = fileIdx + 1; + wInfo("vgId:%d, removing WAL files from index %d onwards", pWal->cfg.vgId, deleteFrom); + + // Delete all files from fileIdx onwards + for (int32_t i = deleteFrom; i < taosArrayGetSize(pWal->fileInfoSet); i++) { + SWalFileInfo* pDelFileInfo = taosArrayGet(pWal->fileInfoSet, i); + char delLogName[WAL_FILE_LEN]; + char delIdxName[WAL_FILE_LEN]; + + walBuildLogName(pWal, pDelFileInfo->firstVer, delLogName); + walBuildIdxName(pWal, pDelFileInfo->firstVer, delIdxName); + + if (taosRemoveFile(delLogName) != 0) { + wWarn("vgId:%d, failed to remove corrupted log file %s", pWal->cfg.vgId, delLogName); + } else { + wInfo("vgId:%d, removed corrupted log file %s", pWal->cfg.vgId, delLogName); + } + + if (taosRemoveFile(delIdxName) != 0) { + wWarn("vgId:%d, failed to remove corrupted idx file %s", pWal->cfg.vgId, delIdxName); + } else { + wInfo("vgId:%d, removed corrupted idx file %s", pWal->cfg.vgId, delIdxName); + } + } + + // Remove deleted files from fileInfoSet + if (deleteFrom < taosArrayGetSize(pWal->fileInfoSet)) { + taosArrayRemoveBatch(pWal->fileInfoSet, deleteFrom, taosArrayGetSize(pWal->fileInfoSet) - deleteFrom, NULL); + } + + wInfo("vgId:%d, WAL truncated successfully", pWal->cfg.vgId); + + TAOS_RETURN(TSDB_CODE_SUCCESS); +} + +static int32_t walLogEntriesComplete(SWal* pWal, int32_t replica) { int32_t sz = taosArrayGetSize(pWal->fileInfoSet); bool complete = true; int32_t fileIdx = -1; @@ -451,13 +519,18 @@ static int32_t walLogEntriesComplete(SWal* pWal) { if (!complete) { wError("vgId:%d, WAL log entries incomplete in range [%" PRId64 ", %" PRId64 "], index:%" PRId64 - ", snaphot index:%" PRId64, - pWal->cfg.vgId, pWal->vers.firstVer, pWal->vers.lastVer, index, pWal->vers.snapshotVer); - if (tsWalDeleteOnCorruption) { - TAOS_RETURN(walRenameCorruptedDir(pWal)); - } else { - TAOS_RETURN(TSDB_CODE_WAL_LOG_INCOMPLETE); + ", snaphot index:%" PRId64 ", fileIdx:%d", + pWal->cfg.vgId, pWal->vers.firstVer, pWal->vers.lastVer, index, pWal->vers.snapshotVer, fileIdx); + if (fileIdx >= sz) { + // All file firstVers are consecutive but vers.lastVer doesn't match the last file's lastVer. + // There is no specific corrupted file — this is a meta-only inconsistency. Correct vers.lastVer + // in-place; the caller is responsible for persisting the corrected meta. + wWarn("vgId:%d, WAL vers.lastVer mismatch: recorded %" PRId64 " actual %" PRId64 ", correcting meta", + pWal->cfg.vgId, pWal->vers.lastVer, index); + pWal->vers.lastVer = index; + TAOS_RETURN(TSDB_CODE_SUCCESS); } + TAOS_RETURN(walTruncateCorruptedFiles(pWal, fileIdx, replica)); } else { TAOS_RETURN(TSDB_CODE_SUCCESS); } @@ -517,7 +590,7 @@ void walRegfree(regex_t* ptr) { regfree(ptr); } -int32_t walCheckAndRepairMeta(SWal* pWal) { +int32_t walCheckAndRepairMeta(SWal* pWal, int32_t replica) { // load log files, get first/snapshot/last version info int32_t code = 0; int32_t lino = 0; @@ -607,27 +680,44 @@ int32_t walCheckAndRepairMeta(SWal* pWal) { if (lastVer < 0) { if (code != TSDB_CODE_WAL_LOG_NOT_EXIST) { wError("vgId:%d, failed to scan wal last index since %s", pWal->cfg.vgId, tstrerror(code)); - if (tsWalDeleteOnCorruption) { - TAOS_RETURN(walRenameCorruptedDir(pWal)); + + code = walTruncateCorruptedFiles(pWal, fileIdx, replica); + if (code != TSDB_CODE_SUCCESS) { + goto _exit; } - goto _exit; + // Files from fileIdx+1 onwards have been removed from fileInfoSet. + // pFileInfo is now stale (fileSize/lastVer no longer reflect reality), and totSize + // may already contain sizes of deleted files from earlier loop iterations. + // Break here; the post-loop code will recompute totals from the remaining fileInfoSet. + updateMeta = true; + code = TSDB_CODE_SUCCESS; + break; + } else { + // empty log file + lastVer = pFileInfo->firstVer - 1; + code = TSDB_CODE_SUCCESS; } - // empty log file - lastVer = pFileInfo->firstVer - 1; - - code = TSDB_CODE_SUCCESS; } - wInfo("vgId:%d, repaired file %s, last index:%" PRId64 ", fileSize:%" PRId64 ", fileSize in meta:%" PRId64, - pWal->cfg.vgId, fnameStr, lastVer, fileSize, pFileInfo->fileSize); - // update lastVer - pFileInfo->lastVer = lastVer; - totSize += pFileInfo->fileSize; + if (code == TSDB_CODE_SUCCESS && lastVer >= 0) { + wInfo("vgId:%d, repaired file %s, last index:%" PRId64 ", fileSize:%" PRId64 ", fileSize in meta:%" PRId64, + pWal->cfg.vgId, fnameStr, lastVer, fileSize, pFileInfo->fileSize); + + // update lastVer + pFileInfo->lastVer = lastVer; + totSize += pFileInfo->fileSize; + } } // reset vers info and so on actualFileNum = taosArrayGetSize(pWal->fileInfoSet); pWal->writeCur = actualFileNum - 1; + // Recompute totSize from the current fileInfoSet: truncation inside the loop may have + // removed entries whose sizes were already accumulated into the running totSize. + totSize = 0; + for (int32_t i = 0; i < actualFileNum; i++) { + totSize += ((SWalFileInfo*)taosArrayGet(pWal->fileInfoSet, i))->fileSize; + } pWal->totSize = totSize; pWal->vers.lastVer = -1; if (actualFileNum > 0) { @@ -647,7 +737,7 @@ int32_t walCheckAndRepairMeta(SWal* pWal) { TAOS_CHECK_EXIT(walSaveMeta(pWal)); } - TAOS_CHECK_EXIT(walLogEntriesComplete(pWal)); + TAOS_CHECK_EXIT(walLogEntriesComplete(pWal, replica)); wInfo("vgId:%d, success to repair meta, wal path:%s, first index:%" PRId64 ", last index:%" PRId64 ", snapshot index:%" PRId64, diff --git a/source/libs/wal/src/walMgmt.c b/source/libs/wal/src/walMgmt.c index e1a523aeefce..1f84d28fc160 100644 --- a/source/libs/wal/src/walMgmt.c +++ b/source/libs/wal/src/walMgmt.c @@ -129,7 +129,7 @@ int32_t walInitWriteFileForSkip(SWal *pWal) { TAOS_RETURN(code); } -SWal *walOpen(const char *path, SWalCfg *pCfg) { +SWal *walOpen(const char *path, SWalCfg *pCfg, int32_t replica) { int32_t code = 0; SWal *pWal = taosMemoryCalloc(1, sizeof(SWal)); if (pWal == NULL) { @@ -206,7 +206,7 @@ SWal *walOpen(const char *path, SWalCfg *pCfg) { wWarn("vgId:%d, failed to load meta, code:0x%x", pWal->cfg.vgId, code); } if (pWal->cfg.level != TAOS_WAL_SKIP) { - code = walCheckAndRepairMeta(pWal); + code = walCheckAndRepairMeta(pWal, replica); if (code < 0) { wError("vgId:%d, cannot open wal since repair meta file failed since %s", pWal->cfg.vgId, tstrerror(code)); goto _err; diff --git a/source/libs/wal/test/walMetaTest.cpp b/source/libs/wal/test/walMetaTest.cpp index 10de5ccf2634..35ac6956c65e 100644 --- a/source/libs/wal/test/walMetaTest.cpp +++ b/source/libs/wal/test/walMetaTest.cpp @@ -31,7 +31,7 @@ class WalCleanEnv : public ::testing::Test { pCfg->retentionPeriod = 0; pCfg->retentionSize = 0; pCfg->level = TAOS_WAL_FSYNC; - pWal = walOpen(pathName, pCfg); + pWal = walOpen(pathName, pCfg, 1); taosMemoryFree(pCfg); ASSERT(pWal != NULL); } @@ -61,7 +61,7 @@ class WalCleanDeleteEnv : public ::testing::Test { pCfg->retentionPeriod = 0; pCfg->retentionSize = 0; pCfg->level = TAOS_WAL_FSYNC; - pWal = walOpen(pathName, pCfg); + pWal = walOpen(pathName, pCfg, 1); taosMemoryFree(pCfg); ASSERT(pWal != NULL); } @@ -98,7 +98,7 @@ class WalKeepEnv : public ::testing::Test { pCfg->retentionPeriod = 0; pCfg->retentionSize = 0; pCfg->level = TAOS_WAL_FSYNC; - pWal = walOpen(pathName, pCfg); + pWal = walOpen(pathName, pCfg, 1); taosMemoryFree(pCfg); ASSERT(pWal != NULL); } @@ -137,7 +137,7 @@ class WalRetentionEnv : public ::testing::Test { cfg.rollPeriod = 0; cfg.vgId = 0; cfg.level = TAOS_WAL_FSYNC; - pWal = walOpen(pathName, &cfg); + pWal = walOpen(pathName, &cfg, 1); ASSERT(pWal != NULL); } @@ -175,7 +175,7 @@ class WalSkipLevel : public ::testing::Test { cfg.rollPeriod = 0; cfg.vgId = 1; cfg.level = TAOS_WAL_SKIP; - pWal = walOpen(pathName, &cfg); + pWal = walOpen(pathName, &cfg, 1); ASSERT(pWal != NULL); } @@ -214,7 +214,7 @@ class WalEncrypted : public ::testing::Test { cfg.vgId = 0; cfg.level = TAOS_WAL_FSYNC; cfg.encryptAlgorithm = 1; - pWal = walOpen(pathName, &cfg); + pWal = walOpen(pathName, &cfg, 1); ASSERT(pWal != NULL); } @@ -689,7 +689,7 @@ TEST_F(WalCleanEnv, walRepairLogFileTs2) { SWalFileInfo* pFileInfo = (SWalFileInfo*)taosArrayGet(pWal->fileInfoSet, 2); pFileInfo->closeTs = -1; - code = walCheckAndRepairMeta(pWal); + code = walCheckAndRepairMeta(pWal,1); ASSERT_EQ(code, 0); } @@ -947,253 +947,256 @@ TEST_F(WalKeepEnv, walSetKeepVersionConcurrent) { ASSERT_LT(finalVersion, numThreads * callsPerThread); } -// Test walRenameCorruptedDir: delete first wal log file -TEST_F(WalRetentionEnv, corruptedDirDeleteFirstFile) { - walResetEnv(); - int code; - - // Enable walDeleteOnCorruption - bool oldVal = tsWalDeleteOnCorruption; - tsWalDeleteOnCorruption = true; - - // Write logs to create multiple wal files - for (int i = 0; i < 100; i++) { - char newStr[100]; - sprintf(newStr, "%s-%d", ranStr, i); - int len = strlen(newStr); - code = walAppendLog(pWal, i, 0, syncMeta, newStr, len, NULL); - ASSERT_EQ(code, 0); - } - - // Roll to create second file - code = walRollImpl(pWal); - ASSERT_EQ(code, 0); - - for (int i = 100; i < 200; i++) { - char newStr[100]; - sprintf(newStr, "%s-%d", ranStr, i); - int len = strlen(newStr); - code = walAppendLog(pWal, i, 0, syncMeta, newStr, len, NULL); - ASSERT_EQ(code, 0); +// WAL Recovery Policy Tests +class WalRecoveryPolicy : public ::testing::Test { + protected: + static void SetUpTestCase() { + int code = walInit(NULL); + ASSERT(code == 0); } - - // Roll to create third file - code = walRollImpl(pWal); - ASSERT_EQ(code, 0); - - for (int i = 200; i < 300; i++) { - char newStr[100]; - sprintf(newStr, "%s-%d", ranStr, i); - int len = strlen(newStr); - code = walAppendLog(pWal, i, 0, syncMeta, newStr, len, NULL); - ASSERT_EQ(code, 0); + + static void TearDownTestCase() { walCleanUp(); } + + void SetUp() override { + taosRemoveDir(pathName); + SWalCfg cfg; + cfg.rollPeriod = -1; + cfg.segSize = -1; + cfg.committed = -1; + cfg.retentionPeriod = -1; + cfg.retentionSize = 0; + cfg.rollPeriod = 0; + cfg.vgId = 1; + cfg.level = TAOS_WAL_FSYNC; + pWal = walOpen(pathName, &cfg, 1); // Single replica + ASSERT(pWal != NULL); } - - TearDown(); - - // List and find the first log file in the WAL directory - TdDirPtr pDir = taosOpenDir(pathName); - ASSERT_NE(pDir, nullptr); - - char firstLogFile[256] = {0}; - TdDirEntryPtr pDirEntry; - while ((pDirEntry = taosReadDir(pDir)) != NULL) { - char* name = taosDirEntryBaseName(taosGetDirEntryName(pDirEntry)); - if (strstr(name, ".log") != NULL) { - if (firstLogFile[0] == 0 || strcmp(name, firstLogFile) < 0) { - strncpy(firstLogFile, name, sizeof(firstLogFile) - 1); - } + + void TearDown() override { + if (pWal) { + walClose(pWal); + pWal = NULL; } + taosRemoveDir(pathName); } - taosCloseDir(&pDir); - - ASSERT_NE(firstLogFile[0], 0); - char logFile[256]; - snprintf(logFile, sizeof(logFile), "%s" TD_DIRSEP "%s", pathName, firstLogFile); - printf("Deleting first log file: %s\n", logFile); - taosRemoveFile(logFile); - - // Wait for file system to complete the operation (especially important on Windows) - taosMsleep(100); - - // Re-open should trigger walRenameCorruptedDir - SetUp(); - - // After rename, WAL should be recreated and empty - ASSERT_NE(pWal, nullptr); - ASSERT_EQ(pWal->vers.firstVer, 100); - ASSERT_EQ(pWal->vers.lastVer, 299); - - // Verify old directory was renamed - char corruptedPath[300]; - sprintf(corruptedPath, "%s.corrupted", pathName); - // The actual path will have a timestamp, so we just verify new dir exists and is empty - - // Restore old value - tsWalDeleteOnCorruption = oldVal; -} -#ifndef WINDOWS -// Test walRenameCorruptedDir: delete middle wal log file -TEST_F(WalRetentionEnv, corruptedDirDeleteMiddleFile) { - walResetEnv(); - int code; - - bool oldVal = tsWalDeleteOnCorruption; - tsWalDeleteOnCorruption = true; + void corruptWalFile() { + int32_t code; + int ver = 0; - // Write logs to create multiple wal files - for (int i = 0; i < 100; i++) { - char newStr[100]; - sprintf(newStr, "%s-%d", ranStr, i); - int len = strlen(newStr); - code = walAppendLog(pWal, i, 0, syncMeta, newStr, len, NULL); + // Batch 1: write 30 entries, then roll to a new file + for (int i = 0; i < 30; i++, ver++) { + int bodyLen = sprintf(body, "test%d", ver); + code = walAppendLog(pWal, ver, 0, syncMeta, body, bodyLen, NULL); + ASSERT_EQ(code, 0); + } + // Large logRetention ensures no old files are deleted during snapshot + code = walBeginSnapshot(pWal, ver - 1, 1000000); ASSERT_EQ(code, 0); - } - - code = walRollImpl(pWal); - ASSERT_EQ(code, 0); - - for (int i = 100; i < 200; i++) { - char newStr[100]; - sprintf(newStr, "%s-%d", ranStr, i); - int len = strlen(newStr); - code = walAppendLog(pWal, i, 0, syncMeta, newStr, len, NULL); + code = walEndSnapshot(pWal, false); ASSERT_EQ(code, 0); - } - - code = walRollImpl(pWal); - ASSERT_EQ(code, 0); - - for (int i = 200; i < 300; i++) { - char newStr[100]; - sprintf(newStr, "%s-%d", ranStr, i); - int len = strlen(newStr); - code = walAppendLog(pWal, i, 0, syncMeta, newStr, len, NULL); + + // Batch 2: write 30 entries into the second (middle) file + for (int i = 0; i < 30; i++, ver++) { + int bodyLen = sprintf(body, "test%d", ver); + code = walAppendLog(pWal, ver, 0, syncMeta, body, bodyLen, NULL); + ASSERT_EQ(code, 0); + } + // Roll again to seal the middle file + code = walBeginSnapshot(pWal, ver - 1, 1000000); ASSERT_EQ(code, 0); - } - - TearDown(); - - // List all log files and find the middle one - TdDirPtr pDir = taosOpenDir(pathName); - ASSERT_NE(pDir, nullptr); - - SArray* logFiles = taosArrayInit(10, sizeof(char[256])); - TdDirEntryPtr pDirEntry; - while ((pDirEntry = taosReadDir(pDir)) != NULL) { - char* name = taosDirEntryBaseName(taosGetDirEntryName(pDirEntry)); - if (strstr(name, ".log") != NULL) { - char fileName[256]; - strncpy(fileName, name, sizeof(fileName) - 1); - taosArrayPush(logFiles, fileName); + code = walEndSnapshot(pWal, false); + ASSERT_EQ(code, 0); + + // Batch 3: write 30 entries into the third file + for (int i = 0; i < 30; i++, ver++) { + int bodyLen = sprintf(body, "test%d", ver); + code = walAppendLog(pWal, ver, 0, syncMeta, body, bodyLen, NULL); + ASSERT_EQ(code, 0); + } + walFsync(pWal, false); + walClose(pWal); + pWal = NULL; + + // Collect all .log files and sort them by name (name encodes firstVer) + TdDirPtr pDir = taosOpenDir(pathName); + ASSERT_NE(pDir, nullptr); + + std::vector logFiles; + TdDirEntryPtr pDirEntry; + while ((pDirEntry = taosReadDir(pDir)) != NULL) { + char* name = taosDirEntryBaseName(taosGetDirEntryName(pDirEntry)); + if (strstr(name, ".log") != NULL) { + logFiles.push_back(std::string(pathName) + TD_DIRSEP + name); + } } + taosCloseDir(&pDir); + + ASSERT_GE((int)logFiles.size(), 3); + std::sort(logFiles.begin(), logFiles.end()); + + // Corrupt the middle file by truncating it to 50% + const std::string& middleFile = logFiles[logFiles.size() / 2]; + int64_t fileSize = 0; + taosStatFile(middleFile.c_str(), &fileSize, NULL, NULL); + TdFilePtr pFile = taosOpenFile(middleFile.c_str(), TD_FILE_READ | TD_FILE_WRITE); + ASSERT_NE(pFile, nullptr); + taosFtruncateFile(pFile, fileSize / 2); + taosCloseFile(&pFile); } - taosCloseDir(&pDir); - - int fileCount = taosArrayGetSize(logFiles); - ASSERT_GT(fileCount, 1); - - // Sort to ensure consistent ordering - taosArraySort(logFiles, (__compar_fn_t)strcmp); - - // Delete the middle log file - char* middleFile = (char*)taosArrayGet(logFiles, fileCount / 2); - char logFile[256]; - snprintf(logFile, sizeof(logFile), "%s" TD_DIRSEP "%s", pathName, middleFile); - printf("Deleting middle log file: %s\n", logFile); - taosRemoveFile(logFile); - - taosArrayDestroy(logFiles); - - // Wait for file system to complete the operation (especially important on Windows) - taosMsleep(100); - - // Re-open should trigger walRenameCorruptedDir - SetUp(); - - // After rename, WAL should be recreated and empty + + SWal* pWal = NULL; + const char* pathName = TD_TMP_DIR_PATH "wal_recovery_test"; + char body[2048]; +}; + +TEST_F(WalRecoveryPolicy, singleReplicaRefuseToStart) { + // Corrupt WAL file + corruptWalFile(); + + // Save old value + int32_t oldPolicy = tsWalRecoveryPolicy; + tsWalRecoveryPolicy = 0; // Refuse to start + + // Try to open WAL - should fail due to corrupted file and refuse-to-start policy + SWalCfg cfg; + cfg.rollPeriod = -1; + cfg.segSize = -1; + cfg.committed = -1; + cfg.retentionPeriod = -1; + cfg.retentionSize = 0; + cfg.rollPeriod = 0; + cfg.vgId = 1; + cfg.level = TAOS_WAL_FSYNC; + pWal = walOpen(pathName, &cfg, 1); + ASSERT_EQ(pWal, nullptr); + + // Restore + tsWalRecoveryPolicy = oldPolicy; +} + +TEST_F(WalRecoveryPolicy, singleReplicaForceRecovery) { + // Corrupt WAL file + corruptWalFile(); + + // Save old value + int32_t oldPolicy = tsWalRecoveryPolicy; + tsWalRecoveryPolicy = 1; // Force recovery + + // Open WAL - should succeed with truncation + SWalCfg cfg; + cfg.rollPeriod = -1; + cfg.segSize = -1; + cfg.committed = -1; + cfg.retentionPeriod = -1; + cfg.retentionSize = 0; + cfg.rollPeriod = 0; + cfg.vgId = 1; + cfg.level = TAOS_WAL_FSYNC; + pWal = walOpen(pathName, &cfg, 1); ASSERT_NE(pWal, nullptr); - ASSERT_EQ(pWal->vers.firstVer, -1); - ASSERT_EQ(pWal->vers.lastVer, -1); - - tsWalDeleteOnCorruption = oldVal; + + // Verify WAL is accessible + ASSERT_GE(pWal->vers.lastVer, -1); + + // Restore + tsWalRecoveryPolicy = oldPolicy; } -#endif -// Test walRenameCorruptedDir: delete last wal log file -TEST_F(WalRetentionEnv, corruptedDirDeleteLastFile) { - walResetEnv(); - int code; - - bool oldVal = tsWalDeleteOnCorruption; - tsWalDeleteOnCorruption = true; +TEST_F(WalRecoveryPolicy, threeReplicaAutoRecovery) { + // Corrupt WAL file + corruptWalFile(); + + // Save old value + int32_t oldPolicy = tsWalRecoveryPolicy; + tsWalRecoveryPolicy = 0; // Even with refuse policy, 3 replicas should auto-recover + + // Open WAL with 3 replicas - should succeed + SWalCfg cfg; + cfg.rollPeriod = -1; + cfg.segSize = -1; + cfg.committed = -1; + cfg.retentionPeriod = -1; + cfg.retentionSize = 0; + cfg.rollPeriod = 0; + cfg.vgId = 1; + cfg.level = TAOS_WAL_FSYNC; + pWal = walOpen(pathName, &cfg, 3); // Three replicas + ASSERT_NE(pWal, nullptr); - // Write logs to create multiple wal files - for (int i = 0; i < 100; i++) { - char newStr[100]; - sprintf(newStr, "%s-%d", ranStr, i); - int len = strlen(newStr); - code = walAppendLog(pWal, i, 0, syncMeta, newStr, len, NULL); - ASSERT_EQ(code, 0); - } - - code = walRollImpl(pWal); - ASSERT_EQ(code, 0); - - for (int i = 100; i < 200; i++) { - char newStr[100]; - sprintf(newStr, "%s-%d", ranStr, i); - int len = strlen(newStr); - code = walAppendLog(pWal, i, 0, syncMeta, newStr, len, NULL); + // Verify WAL is accessible + ASSERT_GE(pWal->vers.lastVer, -1); + + // Restore + tsWalRecoveryPolicy = oldPolicy; +} + +TEST_F(WalRecoveryPolicy, truncationPreservesValidData) { + // Write data in two batches + for (int i = 0; i < 50; i++) { + int bodyLen = sprintf(body, "batch1_%d", i); + int32_t code = walAppendLog(pWal, i, 0, syncMeta, body, bodyLen, NULL); ASSERT_EQ(code, 0); } - - code = walRollImpl(pWal); - ASSERT_EQ(code, 0); - - for (int i = 200; i < 300; i++) { - char newStr[100]; - sprintf(newStr, "%s-%d", ranStr, i); - int len = strlen(newStr); - code = walAppendLog(pWal, i, 0, syncMeta, newStr, len, NULL); + walFsync(pWal, false); + + for (int i = 50; i < 100; i++) { + int bodyLen = sprintf(body, "batch2_%d", i); + int32_t code = walAppendLog(pWal, i, 0, syncMeta, body, bodyLen, NULL); ASSERT_EQ(code, 0); } - - TearDown(); - - // List all log files and find the last one + walFsync(pWal, false); + + int64_t lastVerBeforeCorruption = pWal->vers.lastVer; + ASSERT_EQ(lastVerBeforeCorruption, 99); + + walClose(pWal); + pWal = NULL; + + // Corrupt the log file TdDirPtr pDir = taosOpenDir(pathName); ASSERT_NE(pDir, nullptr); - - char lastLogFile[256] = {0}; + + char logFile[256] = {0}; TdDirEntryPtr pDirEntry; while ((pDirEntry = taosReadDir(pDir)) != NULL) { char* name = taosDirEntryBaseName(taosGetDirEntryName(pDirEntry)); if (strstr(name, ".log") != NULL) { - if (lastLogFile[0] == 0 || strcmp(name, lastLogFile) > 0) { - strncpy(lastLogFile, name, sizeof(lastLogFile) - 1); - } + snprintf(logFile, sizeof(logFile), "%s" TD_DIRSEP "%s", pathName, name); + break; } } taosCloseDir(&pDir); - - ASSERT_NE(lastLogFile[0], 0); - char logFile[256]; - snprintf(logFile, sizeof(logFile), "%s" TD_DIRSEP "%s", pathName, lastLogFile); - printf("Deleting last log file: %s\n", logFile); - taosRemoveFile(logFile); - - // Wait for file system to complete the operation (especially important on Windows) - taosMsleep(100); - - // Re-open should trigger walRenameCorruptedDir - SetUp(); - - // After rename, WAL should be recreated and empty + + // Truncate to 70% to corrupt second batch + int64_t fileSize = 0; + taosStatFile(logFile, &fileSize, NULL, NULL); + TdFilePtr pFile = taosOpenFile(logFile, TD_FILE_READ | TD_FILE_WRITE); + ASSERT_NE(pFile, nullptr); + taosFtruncateFile(pFile, fileSize * 7 / 10); + taosCloseFile(&pFile); + + // Save old value + int32_t oldPolicy = tsWalRecoveryPolicy; + tsWalRecoveryPolicy = 1; // Force recovery + + // Reopen with recovery + SWalCfg cfg; + cfg.rollPeriod = -1; + cfg.segSize = -1; + cfg.committed = -1; + cfg.retentionPeriod = -1; + cfg.retentionSize = 0; + cfg.rollPeriod = 0; + cfg.vgId = 1; + cfg.level = TAOS_WAL_FSYNC; + pWal = walOpen(pathName, &cfg, 1); ASSERT_NE(pWal, nullptr); - ASSERT_EQ(pWal->vers.firstVer, 0); - ASSERT_EQ(pWal->vers.lastVer, 199); - - tsWalDeleteOnCorruption = oldVal; -} \ No newline at end of file + + // Verify some data is preserved (at least first batch) + ASSERT_GE(pWal->vers.lastVer, 0); + ASSERT_LT(pWal->vers.lastVer, lastVerBeforeCorruption); + + // Restore + tsWalRecoveryPolicy = oldPolicy; +}