diff --git a/docs/en/14-reference/09-error-code.md b/docs/en/14-reference/09-error-code.md index c4110de17086..e89e128b27ea 100644 --- a/docs/en/14-reference/09-error-code.md +++ b/docs/en/14-reference/09-error-code.md @@ -148,6 +148,7 @@ Below are the business error codes for each module. | 0x8000023C | reached the maximum concurrency limit | reached the maximum concurrency limit | Check user parameter | | 0x8000023D | reached the maximum call vnode limit | reached the maximum call vnode limit | Check user parameter | | 0x8000023E | Invalid token | Invalid token format | Check and enter the correct token | +| 0x8000023F | Instance register/list API rate limit exceeded | Too many taos_register_instance or taos_list_instances calls in the rate-limit window | Reduce call frequency and retry later; register and list share the same limit | | 0x800002FF | Tsc internal error | TSC internal error | Preserve the scene and logs, report issue on GitHub | #### mnode diff --git a/docs/zh/14-reference/09-error-code.md b/docs/zh/14-reference/09-error-code.md index 7a6ba5d8669c..000b66e64222 100644 --- a/docs/zh/14-reference/09-error-code.md +++ b/docs/zh/14-reference/09-error-code.md @@ -147,6 +147,7 @@ TSDB 错误码包括 taosc 客户端和服务端,所有语言的连接器无 | 0x8000023C | reached the maximum concurrency limit | 单个用户超过了最大并发限制 | 检查参数 | | 0x8000023D | reached the maximum call vnode limit | 单条 SQL 涉及到太多 VNODE | 检查 SQL | | 0x8000023E | Invalid token | 令牌格式错误 | 检查并重新输入正确的令牌 | +| 0x8000023F | Instance register/list API rate limit exceeded | 实例注册/列表 API 调用过于频繁,超过限流 | 降低调用频率或稍后重试;注册与列表共享同一限流窗口 | | 0x800002FF | Tsc internal error | TSC 内部错误 | 保留现场和日志,github 上报 issue | #### mnode diff --git a/include/util/taoserror.h b/include/util/taoserror.h index 62dd7670a6a9..c574204ac8f6 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -235,6 +235,7 @@ int32_t taosGetErrSize(); #define TSDB_CODE_TSC_SESS_MAX_CONCURRENCY_LIMIT TAOS_DEF_ERROR_CODE(0, 0x023C) #define 
TSDB_CODE_TSC_SESS_MAX_CALL_VNODE_LIMIT TAOS_DEF_ERROR_CODE(0, 0x023D) #define TSDB_CODE_TSC_INVALID_TOKEN TAOS_DEF_ERROR_CODE(0, 0x023E) +#define TSDB_CODE_TSC_INSTANCE_API_RATE_LIMIT TAOS_DEF_ERROR_CODE(0, 0x023F) #define TSDB_CODE_TSC_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x02FF) // mnode-common diff --git a/source/client/src/clientMain.c b/source/client/src/clientMain.c index beeb3b82b05c..b8430b929a38 100644 --- a/source/client/src/clientMain.c +++ b/source/client/src/clientMain.c @@ -42,6 +42,8 @@ extern void shellStopDaemon(); #endif +static void instanceRpcGlobalCleanup(void); + static int32_t createParseContext(const SRequestObj *pRequest, SParseContext **pCxt, SSqlCallbackWrapper *pWrapper); static int32_t waitRefSetToBaseCount(int32_t rsetId, const char *name, int64_t startMs, int64_t timeoutMs) { @@ -283,7 +285,8 @@ void taos_cleanup(void) { int64_t cleanupStartMs = taosGetTimestampMs(); - if (TSDB_CODE_SUCCESS != waitRefSetToBaseCount(clientReqRefPool, "request", cleanupStartMs, CLIENT_CLEANUP_WAIT_TIMEOUT_MS)) { + if (TSDB_CODE_SUCCESS != + waitRefSetToBaseCount(clientReqRefPool, "request", cleanupStartMs, CLIENT_CLEANUP_WAIT_TIMEOUT_MS)) { tscWarn("request ref pool did not drain cleanly before cleanup continues"); } @@ -307,7 +310,8 @@ void taos_cleanup(void) { clientReqRefPool = -1; taosCloseRef(id); - if (TSDB_CODE_SUCCESS != waitRefSetToBaseCount(clientConnRefPool, "connection", cleanupStartMs, CLIENT_CLEANUP_WAIT_TIMEOUT_MS)) { + if (TSDB_CODE_SUCCESS != + waitRefSetToBaseCount(clientConnRefPool, "connection", cleanupStartMs, CLIENT_CLEANUP_WAIT_TIMEOUT_MS)) { tscWarn("connection ref pool did not drain cleanly before cleanup continues"); } @@ -317,6 +321,7 @@ void taos_cleanup(void) { nodesDestroyAllocatorSet(); cleanupAppInfo(); + instanceRpcGlobalCleanup(); rpcCleanup(); tscDebug("rpc cleanup"); @@ -1869,7 +1874,7 @@ void handleQueryAnslyseRes(SSqlCallbackWrapper *pWrapper, SMetaData *pResultMeta if (pQuery->pRoot) { pRequest->stmtType = 
pQuery->pRoot->type; if (nodeType(pQuery->pRoot) == QUERY_NODE_DELETE_STMT) { - pRequest->secureDelete = ((SDeleteStmt*)pQuery->pRoot)->secureDelete; + pRequest->secureDelete = ((SDeleteStmt *)pQuery->pRoot)->secureDelete; } } @@ -3174,7 +3179,7 @@ static int32_t instanceBuildEpSetFromCfg(SConfig *pCfg, SEpSet *pEpSet) { if (pFirstEpItem == NULL || pFirstEpItem->str == NULL || pFirstEpItem->str[0] == 0) { return TSDB_CODE_CFG_NOT_FOUND; } - SEp firstEp = {0}; + SEp firstEp = {0}; int32_t code = taosGetFqdnPortFromEp(pFirstEpItem->str, &firstEp); if (code != TSDB_CODE_SUCCESS) { return code; @@ -3242,6 +3247,143 @@ static void *instanceOpenRpcClient(const char *label) { return clientRpc; } +/** Client-side rate limit: fixed 100 calls per 1s (register + list combined, protect mnode). */ +#define TSC_INSTANCE_API_RL_WINDOW_MS 1000 +#define TSC_INSTANCE_API_RL_MAX_PER_SEC 100 + +static TdThreadOnce gInstRlOnce = PTHREAD_ONCE_INIT; +static TdThreadMutex gInstRlMutex; +static int32_t gInstRlMutexInited = 0; +static int64_t gInstRlWindowStartMs = 0; +static int32_t gInstRlCountInWindow = 0; + +/** One-time initializer for gInstRlMutex, run through taosThreadOnce by instanceApiRateLimitTry; gInstRlMutexInited is set only when taosThreadMutexInit succeeds. */ static void instRlMutexInit(void) { + if (taosThreadMutexInit(&gInstRlMutex, NULL) == TSDB_CODE_SUCCESS) { + gInstRlMutexInited = 1; + } +} + +/** Call before instance RPC; shared by register and list. Counts the call against a fixed window of TSC_INSTANCE_API_RL_WINDOW_MS ms: returns TSDB_CODE_SUCCESS while fewer than TSC_INSTANCE_API_RL_MAX_PER_SEC calls have been admitted in the current window, otherwise TSDB_CODE_TSC_INSTANCE_API_RATE_LIMIT. Returns the taosThreadOnce error, or TSDB_CODE_TSC_INTERNAL_ERROR when the limiter mutex could not be initialized. terrno is set on every failure path. */ +static int32_t instanceApiRateLimitTry(void) { + int32_t c = taosThreadOnce(&gInstRlOnce, instRlMutexInit); + if (c != TSDB_CODE_SUCCESS) { + terrno = c; + return c; + } + if (!gInstRlMutexInited) { + tscError("instance API rate limiter init failed, block request"); + terrno = TSDB_CODE_TSC_INTERNAL_ERROR; + return TSDB_CODE_TSC_INTERNAL_ERROR; + } + + int64_t now = taosGetTimestampMs(); + (void)taosThreadMutexLock(&gInstRlMutex); + /* Reset window if first use, window elapsed, or clock moved backwards (NTP). 
*/ + if (gInstRlWindowStartMs == 0 || now < gInstRlWindowStartMs || + (now - gInstRlWindowStartMs) >= (int64_t)TSC_INSTANCE_API_RL_WINDOW_MS) { + gInstRlWindowStartMs = now; + gInstRlCountInWindow = 0; + } + /* Quota exhausted for this window: reject without counting the call. */ if (gInstRlCountInWindow >= TSC_INSTANCE_API_RL_MAX_PER_SEC) { + (void)taosThreadMutexUnlock(&gInstRlMutex); + tscWarn("instance API rate limit exceeded (max %d calls per %d ms, register and list combined)", + TSC_INSTANCE_API_RL_MAX_PER_SEC, TSC_INSTANCE_API_RL_WINDOW_MS); + terrno = TSDB_CODE_TSC_INSTANCE_API_RATE_LIMIT; + return TSDB_CODE_TSC_INSTANCE_API_RATE_LIMIT; + } + gInstRlCountInWindow++; + (void)taosThreadMutexUnlock(&gInstRlMutex); + return TSDB_CODE_SUCCESS; +} + +/** Process-wide singleton: connectionless instance APIs share one rpcOpen; closed in taos_cleanup. */ +static TdThreadOnce gInstRpcOnce = PTHREAD_ONCE_INIT; +static TdThreadMutex gInstRpcMutex; +static TdThreadCond gInstRpcCond; +static volatile int32_t gInstRpcMutexReady = 0; +static volatile int32_t gInstRpcCondReady = 0; +static void *gInstRpc = NULL; +static int32_t gInstRpcRef = 0; +static int32_t gInstRpcClosing = 0; + +/** One-time initializer for gInstRpcMutex and gInstRpcCond, run through taosThreadOnce by instanceRpcAcquire. Both ready flags are set only when both inits succeed; if the condition variable cannot be created the mutex is destroyed again and the flags stay 0. */ static void instRpcMutexInit(void) { + if (taosThreadMutexInit(&gInstRpcMutex, NULL) == TSDB_CODE_SUCCESS) { + if (taosThreadCondInit(&gInstRpcCond, NULL) == TSDB_CODE_SUCCESS) { + gInstRpcCondReady = 1; + gInstRpcMutexReady = 1; + return; + } + (void)taosThreadMutexDestroy(&gInstRpcMutex); + } +} + +/** Takes one reference on the shared instance RPC handle and stores the handle in *ppRpc, opening it with instanceOpenRpcClient("INST") if it is not open yet. Returns TSDB_CODE_SUCCESS on success. Fails with TSDB_CODE_TSC_INTERNAL_ERROR when the ready flags are clear or cleanup has started, or with the error reported by taosThreadOnce, taosThreadMutexLock or (via terrno) instanceOpenRpcClient. Only the taosThreadOnce and not-ready paths assign terrno here; the callers in this file assign terrno from the return value. The reference is dropped with instanceRpcRelease(). */ static int32_t instanceRpcAcquire(void **ppRpc) { + int32_t code = taosThreadOnce(&gInstRpcOnce, instRpcMutexInit); + if (code != TSDB_CODE_SUCCESS) { + terrno = code; + return code; + } + if (!gInstRpcMutexReady || !gInstRpcCondReady) { + tscError("instance RPC singleton not ready, block request"); + terrno = TSDB_CODE_TSC_INTERNAL_ERROR; + return TSDB_CODE_TSC_INTERNAL_ERROR; + } + code = taosThreadMutexLock(&gInstRpcMutex); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + /* instanceRpcGlobalCleanup has set the closing flag: hand out no new references. */ if (gInstRpcClosing) { + (void)taosThreadMutexUnlock(&gInstRpcMutex); + return 
TSDB_CODE_TSC_INTERNAL_ERROR; + } + if (gInstRpc == NULL) { + gInstRpc = instanceOpenRpcClient("INST"); + if (gInstRpc == NULL) { + code = terrno; + (void)taosThreadMutexUnlock(&gInstRpcMutex); + return code; + } + tscInfo("instance RPC singleton opened, handle:%p (search this line to count rpcOpen)", gInstRpc); + } + gInstRpcRef++; + *ppRpc = gInstRpc; + (void)taosThreadMutexUnlock(&gInstRpcMutex); + return TSDB_CODE_SUCCESS; +} + +/** Drops one reference on the shared instance RPC handle. When cleanup is in progress and the count reaches zero, signals gInstRpcCond so instanceRpcGlobalCleanup can continue. Returns without doing anything when the ready flags are clear; a count that is already zero is left unchanged. */ static void instanceRpcRelease(void) { + if (!gInstRpcMutexReady || !gInstRpcCondReady) { + return; + } + (void)taosThreadMutexLock(&gInstRpcMutex); + if (gInstRpcRef > 0) { + gInstRpcRef--; + if (gInstRpcClosing && gInstRpcRef == 0) { + (void)taosThreadCondSignal(&gInstRpcCond); + } + } + (void)taosThreadMutexUnlock(&gInstRpcMutex); +} + +/** Shuts the singleton down (called from taos_cleanup before rpcCleanup): sets the closing flag, waits on gInstRpcCond until the reference count is zero, closes the shared handle if one was opened, and clears both ready flags so later acquire/release calls return early. The mutex and condition variable are not destroyed here. Returns immediately when the ready flags are clear. NOTE(review): the wait has no timeout, unlike the waitRefSetToBaseCount calls in taos_cleanup - confirm an in-flight instance RPC cannot block shutdown indefinitely. */ static void instanceRpcGlobalCleanup(void) { + if (!gInstRpcMutexReady || !gInstRpcCondReady) { + return; + } + (void)taosThreadMutexLock(&gInstRpcMutex); + gInstRpcClosing = 1; + while (gInstRpcRef > 0) { + (void)taosThreadCondWait(&gInstRpcCond, &gInstRpcMutex); + } + if (gInstRpc != NULL) { + tscInfo("instance RPC singleton closing, handle:%p (search this line to count rpcClose)", gInstRpc); + rpcClose(gInstRpc); + gInstRpc = NULL; + } + gInstRpcCondReady = 0; + gInstRpcMutexReady = 0; + (void)taosThreadMutexUnlock(&gInstRpcMutex); +} + int32_t taos_register_instance(const char *id, const char *type, const char *desc, int32_t expire) { if (id == NULL || id[0] == 0) { return terrno = TSDB_CODE_INVALID_PARA; @@ -3286,9 +3428,16 @@ int32_t taos_register_instance(const char *id, const char *type, const char *des return terrno = code; } - void *clientRpc = instanceOpenRpcClient("INST"); - if (clientRpc == NULL) { - return terrno; + code = instanceApiRateLimitTry(); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + + void *clientRpc = NULL; + code = instanceRpcAcquire(&clientRpc); + if (code != TSDB_CODE_SUCCESS) { + terrno = code; + return code; } SRpcMsg rpcMsg = {0}; @@ -3308,22 +3457,19 @@ 
int32_t taos_register_instance(const char *id, const char *type, const char *des int32_t contLen = tSerializeSInstanceRegisterReq(NULL, 0, &req); if (contLen <= 0) { code = terrno != 0 ? terrno : TSDB_CODE_TSC_INTERNAL_ERROR; - rpcClose(clientRpc); - return code; + goto _register_inst_end; } void *pCont = rpcMallocCont(contLen); if (pCont == NULL) { code = terrno != 0 ? terrno : TSDB_CODE_OUT_OF_MEMORY; - rpcClose(clientRpc); - return code; + goto _register_inst_end; } if (tSerializeSInstanceRegisterReq(pCont, contLen, &req) < 0) { code = terrno != 0 ? terrno : TSDB_CODE_TSC_INTERNAL_ERROR; rpcFreeCont(pCont); - rpcClose(clientRpc); - return code; + goto _register_inst_end; } rpcMsg.pCont = pCont; @@ -3335,10 +3481,7 @@ int32_t taos_register_instance(const char *id, const char *type, const char *des code = rpcSendRecv(clientRpc, &epSet, &rpcMsg, &rpcRsp); if (TSDB_CODE_SUCCESS != code) { tscError("failed to send instance register req since %s", tstrerror(code)); - // rpcSendRecv failed, pCont may not be freed, but check _RETURN1 path - // In error path, rpcSendRecv may free pCont, but we free it here to be safe - rpcClose(clientRpc); - return code; + goto _register_inst_end; } if (rpcRsp.code != 0) { @@ -3351,8 +3494,9 @@ int32_t taos_register_instance(const char *id, const char *type, const char *des if (rpcRsp.pCont != NULL) { rpcFreeCont(rpcRsp.pCont); } - rpcClose(clientRpc); +_register_inst_end: + instanceRpcRelease(); terrno = code; return code; } @@ -3381,9 +3525,17 @@ int32_t taos_list_instances(const char *filter_type, char ***pList, int32_t *pCo return code; } - void *clientRpc = instanceOpenRpcClient("LIST"); - if (clientRpc == NULL) { - return terrno; + code = instanceApiRateLimitTry(); + if (code != TSDB_CODE_SUCCESS) { + terrno = code; + return code; + } + + void *clientRpc = NULL; + code = instanceRpcAcquire(&clientRpc); + if (code != TSDB_CODE_SUCCESS) { + terrno = code; + return code; } SRpcMsg rpcMsg = {0}; @@ -3394,31 +3546,22 @@ int32_t 
taos_list_instances(const char *filter_type, char ***pList, int32_t *pCo tstrncpy(req.filter_type, filter_type, sizeof(req.filter_type)); } - // Serialize request to get required length int32_t contLen = tSerializeSInstanceListReq(NULL, 0, &req); if (contLen <= 0) { code = terrno != 0 ? terrno : TSDB_CODE_TSC_INTERNAL_ERROR; - rpcClose(clientRpc); - terrno = code; - return code; + goto _list_inst_end; } - // Allocate RPC message buffer (includes message header overhead) void *pCont = rpcMallocCont(contLen); if (pCont == NULL) { code = terrno != 0 ? terrno : TSDB_CODE_OUT_OF_MEMORY; - rpcClose(clientRpc); - terrno = code; - return code; + goto _list_inst_end; } - // Serialize request into the content part (after message header) if (tSerializeSInstanceListReq(pCont, contLen, &req) < 0) { code = terrno != 0 ? terrno : TSDB_CODE_TSC_INTERNAL_ERROR; rpcFreeCont(pCont); - rpcClose(clientRpc); - terrno = code; - return code; + goto _list_inst_end; } rpcMsg.pCont = pCont; @@ -3430,25 +3573,18 @@ int32_t taos_list_instances(const char *filter_type, char ***pList, int32_t *pCo code = rpcSendRecv(clientRpc, &epSet, &rpcMsg, &rpcRsp); if (TSDB_CODE_SUCCESS != code) { tscError("failed to send instance list req since %s", tstrerror(code)); - rpcFreeCont(pCont); - rpcClose(clientRpc); - terrno = code; - return code; + goto _list_inst_end; } - // Check response - rpcRsp.code contains the result code from mnode if (rpcRsp.code != 0) { code = rpcRsp.code; tscError("instance list failed, code:%s", tstrerror(code)); if (rpcRsp.pCont != NULL) { rpcFreeCont(rpcRsp.pCont); } - rpcClose(clientRpc); - terrno = code; - return code; + goto _list_inst_end; } - // Deserialize response if (rpcRsp.pCont != NULL && rpcRsp.contLen > 0) { SInstanceListRsp rsp = {0}; code = tDeserializeSInstanceListRsp(rpcRsp.pCont, rpcRsp.contLen, &rsp); @@ -3465,9 +3601,7 @@ int32_t taos_list_instances(const char *filter_type, char ***pList, int32_t *pCo } rsp.count = 0; rpcFreeCont(rpcRsp.pCont); - 
rpcClose(clientRpc); - terrno = code; - return code; + goto _list_inst_end; } *pList = rsp.ids; *pCount = rsp.count; @@ -3479,9 +3613,12 @@ int32_t taos_list_instances(const char *filter_type, char ***pList, int32_t *pCo if (rpcRsp.pCont != NULL) { rpcFreeCont(rpcRsp.pCont); } - rpcClose(clientRpc); + code = TSDB_CODE_SUCCESS; - return TSDB_CODE_SUCCESS; +_list_inst_end: + instanceRpcRelease(); + terrno = code; + return code; } void taos_free_instances(char ***pList, int32_t count) { diff --git a/source/client/test/instanceTest.cpp b/source/client/test/instanceTest.cpp index 7b020d297c3b..1933e6fdc762 100644 --- a/source/client/test/instanceTest.cpp +++ b/source/client/test/instanceTest.cpp @@ -318,4 +318,26 @@ TEST(instanceCase, secondEp) { taos_free_instances(&list, count); } -#pragma GCC diagnostic pop \ No newline at end of file +/* Rate-limit check: after a 1s pause (so any limiter window opened by earlier calls has expired), 100 registrations must succeed. The 101st must be rejected when all 101 calls completed within 1000 ms of t0; if the run took longer, the limiter window may or may not have rolled over, so either outcome is accepted. */ +TEST(instanceCase, instance_api_rate_limit) { + initEnv(); + taosMsleep(1000); + + int32_t code = TSDB_CODE_SUCCESS; + int64_t t0 = taosGetTimestampMs(); + for (int i = 0; i < 100; i++) { + code = taos_register_instance("id-1", "taosadapter", "desc:test_instance", 100); + ASSERT_EQ(code, TSDB_CODE_SUCCESS); + } + + code = taos_register_instance("id-1", "taosadapter", "desc:test_instance", 100); + int64_t t1 = taosGetTimestampMs(); + /* t1 is sampled after the 101st call: the limiter window starts at or after t0, so t1 - t0 < 1000 proves every call fell inside a single window. */ + if (t1 - t0 < 1000) { + ASSERT_EQ(code, TSDB_CODE_TSC_INSTANCE_API_RATE_LIMIT); + } else { + ASSERT_TRUE(code == TSDB_CODE_SUCCESS || code == TSDB_CODE_TSC_INSTANCE_API_RATE_LIMIT); + } +} + +#pragma GCC diagnostic pop diff --git a/source/util/src/terror.c b/source/util/src/terror.c index d6eb5bd73faa..7350e400afe9 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -188,6 +188,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INVALID_INTERVAL_OFFSET, "Invalid interval offs TAOS_DEFINE_ERROR(TSDB_CODE_NOT_SUPPORTTED_IN_WINDOWS, "Operation not supported in windows") TAOS_DEFINE_ERROR(TSDB_CODE_TSC_INVALID_TOTP_CODE, "Invalid TOTP code") TAOS_DEFINE_ERROR(TSDB_CODE_TSC_INVALID_TOKEN, "Invalid token") 
+TAOS_DEFINE_ERROR(TSDB_CODE_TSC_INSTANCE_API_RATE_LIMIT, "instance register/list API rate limit exceeded") TAOS_DEFINE_ERROR(TSDB_CODE_TSC_SESS_PER_USER_LIMIT, "reached the maximum sessions per user limit") TAOS_DEFINE_ERROR(TSDB_CODE_TSC_SESS_CONN_TIMEOUT, "reached the maximum connection timeout limit")