Skip to content
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions include/util/taoserror.h
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,7 @@ int32_t taosGetErrSize();
#define TSDB_CODE_TSC_SESS_MAX_CONCURRENCY_LIMIT TAOS_DEF_ERROR_CODE(0, 0x023C)
#define TSDB_CODE_TSC_SESS_MAX_CALL_VNODE_LIMIT TAOS_DEF_ERROR_CODE(0, 0x023D)
#define TSDB_CODE_TSC_INVALID_TOKEN TAOS_DEF_ERROR_CODE(0, 0x023E)
#define TSDB_CODE_TSC_INSTANCE_API_RATE_LIMIT TAOS_DEF_ERROR_CODE(0, 0x023F)
#define TSDB_CODE_TSC_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x02FF)

// mnode-common
Expand Down
231 changes: 184 additions & 47 deletions source/client/src/clientMain.c
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@
extern void shellStopDaemon();
#endif

static void instanceRpcGlobalCleanup(void);

static int32_t createParseContext(const SRequestObj *pRequest, SParseContext **pCxt, SSqlCallbackWrapper *pWrapper);

static int32_t waitRefSetToBaseCount(int32_t rsetId, const char *name, int64_t startMs, int64_t timeoutMs) {
Expand Down Expand Up @@ -283,7 +285,8 @@ void taos_cleanup(void) {

int64_t cleanupStartMs = taosGetTimestampMs();

if (TSDB_CODE_SUCCESS != waitRefSetToBaseCount(clientReqRefPool, "request", cleanupStartMs, CLIENT_CLEANUP_WAIT_TIMEOUT_MS)) {
if (TSDB_CODE_SUCCESS !=
waitRefSetToBaseCount(clientReqRefPool, "request", cleanupStartMs, CLIENT_CLEANUP_WAIT_TIMEOUT_MS)) {
tscWarn("request ref pool did not drain cleanly before cleanup continues");
}

Expand All @@ -307,7 +310,8 @@ void taos_cleanup(void) {
clientReqRefPool = -1;
taosCloseRef(id);

if (TSDB_CODE_SUCCESS != waitRefSetToBaseCount(clientConnRefPool, "connection", cleanupStartMs, CLIENT_CLEANUP_WAIT_TIMEOUT_MS)) {
if (TSDB_CODE_SUCCESS !=
waitRefSetToBaseCount(clientConnRefPool, "connection", cleanupStartMs, CLIENT_CLEANUP_WAIT_TIMEOUT_MS)) {
tscWarn("connection ref pool did not drain cleanly before cleanup continues");
}

Expand All @@ -317,6 +321,7 @@ void taos_cleanup(void) {

nodesDestroyAllocatorSet();
cleanupAppInfo();
instanceRpcGlobalCleanup();
rpcCleanup();
tscDebug("rpc cleanup");

Expand Down Expand Up @@ -1869,7 +1874,7 @@ void handleQueryAnslyseRes(SSqlCallbackWrapper *pWrapper, SMetaData *pResultMeta
if (pQuery->pRoot) {
pRequest->stmtType = pQuery->pRoot->type;
if (nodeType(pQuery->pRoot) == QUERY_NODE_DELETE_STMT) {
pRequest->secureDelete = ((SDeleteStmt*)pQuery->pRoot)->secureDelete;
pRequest->secureDelete = ((SDeleteStmt *)pQuery->pRoot)->secureDelete;
}
}

Expand Down Expand Up @@ -3174,7 +3179,7 @@ static int32_t instanceBuildEpSetFromCfg(SConfig *pCfg, SEpSet *pEpSet) {
if (pFirstEpItem == NULL || pFirstEpItem->str == NULL || pFirstEpItem->str[0] == 0) {
return TSDB_CODE_CFG_NOT_FOUND;
}
SEp firstEp = {0};
SEp firstEp = {0};
int32_t code = taosGetFqdnPortFromEp(pFirstEpItem->str, &firstEp);
if (code != TSDB_CODE_SUCCESS) {
return code;
Expand Down Expand Up @@ -3242,6 +3247,143 @@ static void *instanceOpenRpcClient(const char *label) {
return clientRpc;
}

/**
 * Client-side rate limit: fixed 100 calls per 1s (register + list combined, protect mnode).
 * State below is shared by every caller of instanceApiRateLimitTry() in this process.
 */
/* Length of one rate-limit window, in milliseconds. */
#define TSC_INSTANCE_API_RL_WINDOW_MS 1000
/* Maximum number of calls admitted inside one window. */
#define TSC_INSTANCE_API_RL_MAX_PER_SEC 100

/* Once-guard passed to taosThreadOnce() together with instRlMutexInit. */
static TdThreadOnce gInstRlOnce = PTHREAD_ONCE_INIT;
/* Protects gInstRlWindowStartMs and gInstRlCountInWindow. */
static TdThreadMutex gInstRlMutex;
/* Set to 1 by instRlMutexInit when gInstRlMutex was initialized successfully; stays 0 otherwise. */
static int32_t gInstRlMutexInited = 0;
/* Start of the current window (ms timestamp); 0 means no window has been started yet. */
static int64_t gInstRlWindowStartMs = 0;
/* Number of calls already admitted in the current window. */
static int32_t gInstRlCountInWindow = 0;

/**
 * One-time initializer for the instance API rate limiter (run through taosThreadOnce).
 * Marks the limiter mutex as usable only when its initialization succeeded.
 */
static void instRlMutexInit(void) {
  if (taosThreadMutexInit(&gInstRlMutex, NULL) != TSDB_CODE_SUCCESS) {
    // Leave gInstRlMutexInited at 0 so callers refuse to use the limiter.
    return;
  }
  gInstRlMutexInited = 1;
}

/**
 * Admit or reject one connectionless instance API call. Call before the instance RPC; the budget is
 * shared by register and list.
 *
 * Fixed-window limiter: at most TSC_INSTANCE_API_RL_MAX_PER_SEC calls are admitted per window of
 * TSC_INSTANCE_API_RL_WINDOW_MS milliseconds. The window restarts on first use, once it has elapsed,
 * or when the clock is observed to have moved backwards.
 *
 * @return TSDB_CODE_SUCCESS when the call is admitted;
 *         TSDB_CODE_TSC_INSTANCE_API_RATE_LIMIT when the current window is exhausted;
 *         TSDB_CODE_TSC_INTERNAL_ERROR when the limiter mutex could not be initialized;
 *         otherwise the error returned by taosThreadOnce. terrno is set on every failure path.
 */
static int32_t instanceApiRateLimitTry(void) {
  int32_t code = taosThreadOnce(&gInstRlOnce, instRlMutexInit);
  if (code != TSDB_CODE_SUCCESS) {
    terrno = code;
    return code;
  }
  if (!gInstRlMutexInited) {
    tscError("instance API rate limiter init failed, block request");
    terrno = TSDB_CODE_TSC_INTERNAL_ERROR;
    return TSDB_CODE_TSC_INTERNAL_ERROR;
  }

  (void)taosThreadMutexLock(&gInstRlMutex);
  // Sample the clock while holding the lock. A timestamp taken before a contended lock can be older
  // than a window start written by another thread in the meantime; that stale value would look like
  // a backwards clock jump and wrongly reset the counter.
  int64_t now = taosGetTimestampMs();

  /* Reset window if first use, window elapsed, or clock moved backwards (NTP). */
  if (gInstRlWindowStartMs == 0 || now < gInstRlWindowStartMs ||
      (now - gInstRlWindowStartMs) >= (int64_t)TSC_INSTANCE_API_RL_WINDOW_MS) {
    gInstRlWindowStartMs = now;
    gInstRlCountInWindow = 0;
  }

  if (gInstRlCountInWindow >= TSC_INSTANCE_API_RL_MAX_PER_SEC) {
    // Drop the lock before logging so the warning does not extend the critical section.
    (void)taosThreadMutexUnlock(&gInstRlMutex);
    tscWarn("instance API rate limit exceeded (max %d calls per %d ms, register and list combined)",
            TSC_INSTANCE_API_RL_MAX_PER_SEC, TSC_INSTANCE_API_RL_WINDOW_MS);
    terrno = TSDB_CODE_TSC_INSTANCE_API_RATE_LIMIT;
    return TSDB_CODE_TSC_INSTANCE_API_RATE_LIMIT;
  }

  gInstRlCountInWindow++;
  (void)taosThreadMutexUnlock(&gInstRlMutex);
  return TSDB_CODE_SUCCESS;
}

/** Process-wide singleton: connectionless instance APIs share one rpcOpen; closed in taos_cleanup. */
/* Once-guard passed to taosThreadOnce() together with instRpcMutexInit. */
static TdThreadOnce gInstRpcOnce = PTHREAD_ONCE_INIT;
/* Protects gInstRpc, gInstRpcRef and gInstRpcClosing. */
static TdThreadMutex gInstRpcMutex;
/* Signalled by instanceRpcRelease when the last reference is dropped while cleanup is waiting. */
static TdThreadCond gInstRpcCond;
/*
 * Readiness flags: set to 1 by instRpcMutexInit once both primitives are initialized, and cleared
 * again by instanceRpcGlobalCleanup.
 * NOTE(review): both flags are read without holding gInstRpcMutex; volatile by itself does not
 * provide inter-thread ordering - confirm whether atomics are wanted here.
 */
static volatile int32_t gInstRpcMutexReady = 0;
static volatile int32_t gInstRpcCondReady = 0;
/* Shared RPC handle; NULL until the first successful instanceRpcAcquire and again after cleanup. */
static void *gInstRpc = NULL;
Comment on lines +3301 to +3305
/* Number of outstanding instanceRpcAcquire() references not yet returned via instanceRpcRelease(). */
static int32_t gInstRpcRef = 0;
/* Set to 1 by instanceRpcGlobalCleanup; once set, instanceRpcAcquire refuses new references. */
static int32_t gInstRpcClosing = 0;

/**
 * One-time initializer for the instance RPC singleton (run through taosThreadOnce).
 * Both readiness flags are raised only when the mutex and the condition variable were created;
 * if the condition variable cannot be created the mutex is destroyed again.
 */
static void instRpcMutexInit(void) {
  if (taosThreadMutexInit(&gInstRpcMutex, NULL) != TSDB_CODE_SUCCESS) {
    return;
  }

  if (taosThreadCondInit(&gInstRpcCond, NULL) != TSDB_CODE_SUCCESS) {
    // Roll back the half-finished setup; the flags stay 0 so callers see "not ready".
    (void)taosThreadMutexDestroy(&gInstRpcMutex);
    return;
  }

  gInstRpcCondReady = 1;
  gInstRpcMutexReady = 1;
}

static int32_t instanceRpcAcquire(void **ppRpc) {
Comment thread
Pengrongkun marked this conversation as resolved.
int32_t code = taosThreadOnce(&gInstRpcOnce, instRpcMutexInit);
if (code != TSDB_CODE_SUCCESS) {
terrno = code;
return code;
}
if (!gInstRpcMutexReady || !gInstRpcCondReady) {
tscError("instance RPC singleton not ready, block request");
terrno = TSDB_CODE_TSC_INTERNAL_ERROR;
return TSDB_CODE_TSC_INTERNAL_ERROR;
}
Comment thread
Pengrongkun marked this conversation as resolved.
code = taosThreadMutexLock(&gInstRpcMutex);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
if (gInstRpcClosing) {
(void)taosThreadMutexUnlock(&gInstRpcMutex);
return TSDB_CODE_TSC_INTERNAL_ERROR;
}
if (gInstRpc == NULL) {
gInstRpc = instanceOpenRpcClient("INST");
if (gInstRpc == NULL) {
code = terrno;
(void)taosThreadMutexUnlock(&gInstRpcMutex);
return code;
}
tscInfo("instance RPC singleton opened, handle:%p (search this line to count rpcOpen)", gInstRpc);
}
gInstRpcRef++;
*ppRpc = gInstRpc;
(void)taosThreadMutexUnlock(&gInstRpcMutex);
return TSDB_CODE_SUCCESS;
}

/**
 * Return one reference obtained from instanceRpcAcquire().
 * Does nothing when the singleton is not ready or when no reference is outstanding. When the last
 * reference is dropped while cleanup is in progress, the waiting cleanup is signalled.
 */
static void instanceRpcRelease(void) {
  if (!(gInstRpcMutexReady && gInstRpcCondReady)) {
    return;
  }

  (void)taosThreadMutexLock(&gInstRpcMutex);
  const int32_t hadRef = (gInstRpcRef > 0);
  if (hadRef) {
    gInstRpcRef -= 1;
  }
  // Only the release that actually brings the count to zero wakes the closer.
  if (hadRef && gInstRpcClosing && gInstRpcRef == 0) {
    (void)taosThreadCondSignal(&gInstRpcCond);
  }
  (void)taosThreadMutexUnlock(&gInstRpcMutex);
}

/**
 * Shut down the process-wide instance RPC singleton.
 *
 * Marks the singleton as closing, waits until every outstanding reference has been released,
 * closes the shared RPC handle if one was opened, and clears the readiness flags so that later
 * instanceRpcAcquire()/instanceRpcRelease() calls return early. The mutex and condition variable
 * themselves are not destroyed here.
 */
static void instanceRpcGlobalCleanup(void) {
/* Nothing to do if the singleton primitives were never initialized (or were already cleaned up). */
if (!gInstRpcMutexReady || !gInstRpcCondReady) {
return;
}
(void)taosThreadMutexLock(&gInstRpcMutex);
/* From here on instanceRpcAcquire() rejects new references. */
gInstRpcClosing = 1;
/*
 * Wait for in-flight users; instanceRpcRelease() signals when the count reaches zero.
 * NOTE(review): this wait has no timeout, so a caller that never releases blocks cleanup.
 */
while (gInstRpcRef > 0) {
(void)taosThreadCondWait(&gInstRpcCond, &gInstRpcMutex);
}
if (gInstRpc != NULL) {
tscInfo("instance RPC singleton closing, handle:%p (search this line to count rpcClose)", gInstRpc);
rpcClose(gInstRpc);
gInstRpc = NULL;
}
/* Clear the flags while still holding the lock so later acquire/release calls bail out early. */
gInstRpcCondReady = 0;
gInstRpcMutexReady = 0;
(void)taosThreadMutexUnlock(&gInstRpcMutex);
}

int32_t taos_register_instance(const char *id, const char *type, const char *desc, int32_t expire) {
if (id == NULL || id[0] == 0) {
return terrno = TSDB_CODE_INVALID_PARA;
Expand Down Expand Up @@ -3286,9 +3428,16 @@ int32_t taos_register_instance(const char *id, const char *type, const char *des
return terrno = code;
}

void *clientRpc = instanceOpenRpcClient("INST");
if (clientRpc == NULL) {
return terrno;
code = instanceApiRateLimitTry();
if (code != TSDB_CODE_SUCCESS) {
return code;
}

void *clientRpc = NULL;
code = instanceRpcAcquire(&clientRpc);
if (code != TSDB_CODE_SUCCESS) {
terrno = code;
return code;
}

SRpcMsg rpcMsg = {0};
Expand All @@ -3308,22 +3457,19 @@ int32_t taos_register_instance(const char *id, const char *type, const char *des
int32_t contLen = tSerializeSInstanceRegisterReq(NULL, 0, &req);
if (contLen <= 0) {
code = terrno != 0 ? terrno : TSDB_CODE_TSC_INTERNAL_ERROR;
rpcClose(clientRpc);
return code;
goto _register_inst_end;
}

void *pCont = rpcMallocCont(contLen);
if (pCont == NULL) {
code = terrno != 0 ? terrno : TSDB_CODE_OUT_OF_MEMORY;
rpcClose(clientRpc);
return code;
goto _register_inst_end;
}

if (tSerializeSInstanceRegisterReq(pCont, contLen, &req) < 0) {
code = terrno != 0 ? terrno : TSDB_CODE_TSC_INTERNAL_ERROR;
rpcFreeCont(pCont);
rpcClose(clientRpc);
return code;
goto _register_inst_end;
}

rpcMsg.pCont = pCont;
Expand All @@ -3335,10 +3481,7 @@ int32_t taos_register_instance(const char *id, const char *type, const char *des
code = rpcSendRecv(clientRpc, &epSet, &rpcMsg, &rpcRsp);
if (TSDB_CODE_SUCCESS != code) {
tscError("failed to send instance register req since %s", tstrerror(code));
Comment thread
Pengrongkun marked this conversation as resolved.
// rpcSendRecv failed, pCont may not be freed, but check _RETURN1 path
// In error path, rpcSendRecv may free pCont, but we free it here to be safe
rpcClose(clientRpc);
return code;
goto _register_inst_end;
}

if (rpcRsp.code != 0) {
Expand All @@ -3351,8 +3494,9 @@ int32_t taos_register_instance(const char *id, const char *type, const char *des
if (rpcRsp.pCont != NULL) {
rpcFreeCont(rpcRsp.pCont);
}
rpcClose(clientRpc);

_register_inst_end:
instanceRpcRelease();
terrno = code;
return code;
}
Expand Down Expand Up @@ -3381,9 +3525,17 @@ int32_t taos_list_instances(const char *filter_type, char ***pList, int32_t *pCo
return code;
}

void *clientRpc = instanceOpenRpcClient("LIST");
if (clientRpc == NULL) {
return terrno;
code = instanceApiRateLimitTry();
if (code != TSDB_CODE_SUCCESS) {
terrno = code;
return code;
}

void *clientRpc = NULL;
code = instanceRpcAcquire(&clientRpc);
if (code != TSDB_CODE_SUCCESS) {
terrno = code;
return code;
}

SRpcMsg rpcMsg = {0};
Expand All @@ -3394,31 +3546,22 @@ int32_t taos_list_instances(const char *filter_type, char ***pList, int32_t *pCo
tstrncpy(req.filter_type, filter_type, sizeof(req.filter_type));
}

// Serialize request to get required length
int32_t contLen = tSerializeSInstanceListReq(NULL, 0, &req);
if (contLen <= 0) {
code = terrno != 0 ? terrno : TSDB_CODE_TSC_INTERNAL_ERROR;
rpcClose(clientRpc);
terrno = code;
return code;
goto _list_inst_end;
}

// Allocate RPC message buffer (includes message header overhead)
void *pCont = rpcMallocCont(contLen);
if (pCont == NULL) {
code = terrno != 0 ? terrno : TSDB_CODE_OUT_OF_MEMORY;
rpcClose(clientRpc);
terrno = code;
return code;
goto _list_inst_end;
}

// Serialize request into the content part (after message header)
if (tSerializeSInstanceListReq(pCont, contLen, &req) < 0) {
code = terrno != 0 ? terrno : TSDB_CODE_TSC_INTERNAL_ERROR;
rpcFreeCont(pCont);
rpcClose(clientRpc);
terrno = code;
return code;
goto _list_inst_end;
}

rpcMsg.pCont = pCont;
Expand All @@ -3430,25 +3573,18 @@ int32_t taos_list_instances(const char *filter_type, char ***pList, int32_t *pCo
code = rpcSendRecv(clientRpc, &epSet, &rpcMsg, &rpcRsp);
if (TSDB_CODE_SUCCESS != code) {
tscError("failed to send instance list req since %s", tstrerror(code));
rpcFreeCont(pCont);
rpcClose(clientRpc);
terrno = code;
return code;
goto _list_inst_end;
}

// Check response - rpcRsp.code contains the result code from mnode
if (rpcRsp.code != 0) {
code = rpcRsp.code;
tscError("instance list failed, code:%s", tstrerror(code));
if (rpcRsp.pCont != NULL) {
rpcFreeCont(rpcRsp.pCont);
}
rpcClose(clientRpc);
terrno = code;
return code;
goto _list_inst_end;
}

// Deserialize response
if (rpcRsp.pCont != NULL && rpcRsp.contLen > 0) {
SInstanceListRsp rsp = {0};
code = tDeserializeSInstanceListRsp(rpcRsp.pCont, rpcRsp.contLen, &rsp);
Expand All @@ -3465,9 +3601,7 @@ int32_t taos_list_instances(const char *filter_type, char ***pList, int32_t *pCo
}
rsp.count = 0;
rpcFreeCont(rpcRsp.pCont);
rpcClose(clientRpc);
terrno = code;
return code;
goto _list_inst_end;
}
*pList = rsp.ids;
*pCount = rsp.count;
Expand All @@ -3479,9 +3613,12 @@ int32_t taos_list_instances(const char *filter_type, char ***pList, int32_t *pCo
if (rpcRsp.pCont != NULL) {
rpcFreeCont(rpcRsp.pCont);
}
rpcClose(clientRpc);
code = TSDB_CODE_SUCCESS;

return TSDB_CODE_SUCCESS;
_list_inst_end:
instanceRpcRelease();
terrno = code;
return code;
Comment on lines +3619 to +3621
}

void taos_free_instances(char ***pList, int32_t count) {
Expand Down
Loading
Loading