Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions include/util/taoserror.h
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,7 @@ int32_t taosGetErrSize();
#define TSDB_CODE_TSC_SESS_MAX_CONCURRENCY_LIMIT TAOS_DEF_ERROR_CODE(0, 0x023C)
#define TSDB_CODE_TSC_SESS_MAX_CALL_VNODE_LIMIT TAOS_DEF_ERROR_CODE(0, 0x023D)
#define TSDB_CODE_TSC_INVALID_TOKEN TAOS_DEF_ERROR_CODE(0, 0x023E)
#define TSDB_CODE_TSC_INSTANCE_API_RATE_LIMIT TAOS_DEF_ERROR_CODE(0, 0x023F)
#define TSDB_CODE_TSC_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x02FF)

// mnode-common
Expand Down
193 changes: 148 additions & 45 deletions source/client/src/clientMain.c
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@
extern void shellStopDaemon();
#endif

static void instanceRpcGlobalCleanup(void);

static int32_t sentinel = TSC_VAR_NOT_RELEASE;
static int32_t createParseContext(const SRequestObj *pRequest, SParseContext **pCxt, SSqlCallbackWrapper *pWrapper);

Expand Down Expand Up @@ -283,6 +285,7 @@ void taos_cleanup(void) {

nodesDestroyAllocatorSet();
cleanupAppInfo();
instanceRpcGlobalCleanup();
rpcCleanup();
tscDebug("rpc cleanup");

Expand Down Expand Up @@ -3207,6 +3210,100 @@ static void *instanceOpenRpcClient(const char *label) {
return clientRpc;
}

/** Client-side rate limit: fixed 100 calls per 1s (register + list combined, protect mnode). */
#define TSC_INSTANCE_API_RL_WINDOW_MS 1000
#define TSC_INSTANCE_API_RL_MAX_PER_SEC 100

static TdThreadOnce gInstRlOnce = PTHREAD_ONCE_INIT;
static TdThreadMutex gInstRlMutex;
static int32_t gInstRlMutexInited = 0;
static int64_t gInstRlWindowStartMs = 0;
static int32_t gInstRlCountInWindow = 0;

static void instRlMutexInit(void) {
if (taosThreadMutexInit(&gInstRlMutex, NULL) == TSDB_CODE_SUCCESS) {
gInstRlMutexInited = 1;
}
}

/** Call before instance RPC; shared by register and list. */
static int32_t instanceApiRateLimitTry(void) {
int32_t c = taosThreadOnce(&gInstRlOnce, instRlMutexInit);
if (c != TSDB_CODE_SUCCESS || !gInstRlMutexInited) {
return terrno != 0 ? terrno : TSDB_CODE_OUT_OF_MEMORY;
Comment thread
Pengrongkun marked this conversation as resolved.
Outdated
}
Comment thread
Pengrongkun marked this conversation as resolved.
Outdated

int64_t now = taosGetTimestampMs();
(void)taosThreadMutexLock(&gInstRlMutex);
/* Reset window if first use, window elapsed, or clock moved backwards (NTP). */
if (gInstRlWindowStartMs == 0 || now < gInstRlWindowStartMs ||
(now - gInstRlWindowStartMs) >= (int64_t)TSC_INSTANCE_API_RL_WINDOW_MS) {
gInstRlWindowStartMs = now;
gInstRlCountInWindow = 0;
}
if (gInstRlCountInWindow >= TSC_INSTANCE_API_RL_MAX_PER_SEC) {
(void)taosThreadMutexUnlock(&gInstRlMutex);
tscWarn("instance API rate limit exceeded (max %d calls per %d ms, register and list combined)",
TSC_INSTANCE_API_RL_MAX_PER_SEC, TSC_INSTANCE_API_RL_WINDOW_MS);
terrno = TSDB_CODE_TSC_INSTANCE_API_RATE_LIMIT;
return TSDB_CODE_TSC_INSTANCE_API_RATE_LIMIT;
}
gInstRlCountInWindow++;
(void)taosThreadMutexUnlock(&gInstRlMutex);
return TSDB_CODE_SUCCESS;
}

/** Process-wide singleton: connectionless instance APIs share one rpcOpen; closed in taos_cleanup. */
static TdThreadOnce gInstRpcOnce = PTHREAD_ONCE_INIT;
static TdThreadMutex gInstRpcMutex;
static volatile int32_t gInstRpcMutexReady = 0;
static void *gInstRpc = NULL;
Comment on lines +3301 to +3305

static void instRpcMutexInit(void) {
if (taosThreadMutexInit(&gInstRpcMutex, NULL) == TSDB_CODE_SUCCESS) {
gInstRpcMutexReady = 1;
}
}

static int32_t instanceRpcAcquire(void **ppRpc) {
Comment thread
Pengrongkun marked this conversation as resolved.
int32_t code = taosThreadOnce(&gInstRpcOnce, instRpcMutexInit);
if (code != TSDB_CODE_SUCCESS || !gInstRpcMutexReady) {
return terrno != 0 ? terrno : TSDB_CODE_OUT_OF_MEMORY;
}
Comment thread
Pengrongkun marked this conversation as resolved.
code = taosThreadMutexLock(&gInstRpcMutex);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
if (gInstRpc == NULL) {
gInstRpc = instanceOpenRpcClient("INST");
if (gInstRpc == NULL) {
code = terrno;
(void)taosThreadMutexUnlock(&gInstRpcMutex);
return code;
}
tscInfo("instance RPC singleton opened, handle:%p (search this line to count rpcOpen)", gInstRpc);
}
*ppRpc = gInstRpc;
return TSDB_CODE_SUCCESS;
}

static void instanceRpcRelease(void) { (void)taosThreadMutexUnlock(&gInstRpcMutex); }
Comment thread
Pengrongkun marked this conversation as resolved.
Outdated

static void instanceRpcGlobalCleanup(void) {
if (!gInstRpcMutexReady) {
return;
}
(void)taosThreadMutexLock(&gInstRpcMutex);
if (gInstRpc != NULL) {
tscInfo("instance RPC singleton closing, handle:%p (search this line to count rpcClose)", gInstRpc);
rpcClose(gInstRpc);
gInstRpc = NULL;
}
(void)taosThreadMutexUnlock(&gInstRpcMutex);
(void)taosThreadMutexDestroy(&gInstRpcMutex);
gInstRpcMutexReady = 0;
Comment thread
Pengrongkun marked this conversation as resolved.
Outdated
}

int32_t taos_register_instance(const char *id, const char *type, const char *desc, int32_t expire) {
if (id == NULL || id[0] == 0) {
return terrno = TSDB_CODE_INVALID_PARA;
Expand Down Expand Up @@ -3251,9 +3348,16 @@ int32_t taos_register_instance(const char *id, const char *type, const char *des
return terrno = code;
}

void *clientRpc = instanceOpenRpcClient("INST");
if (clientRpc == NULL) {
return terrno;
code = instanceApiRateLimitTry();
if (code != TSDB_CODE_SUCCESS) {
return code;
}

void *clientRpc = NULL;
code = instanceRpcAcquire(&clientRpc);
if (code != TSDB_CODE_SUCCESS) {
terrno = code;
return code;
}

SRpcMsg rpcMsg = {0};
Expand All @@ -3273,22 +3377,19 @@ int32_t taos_register_instance(const char *id, const char *type, const char *des
int32_t contLen = tSerializeSInstanceRegisterReq(NULL, 0, &req);
if (contLen <= 0) {
code = terrno != 0 ? terrno : TSDB_CODE_TSC_INTERNAL_ERROR;
rpcClose(clientRpc);
return code;
goto _register_inst_end;
}

void *pCont = rpcMallocCont(contLen);
if (pCont == NULL) {
code = terrno != 0 ? terrno : TSDB_CODE_OUT_OF_MEMORY;
rpcClose(clientRpc);
return code;
goto _register_inst_end;
}

if (tSerializeSInstanceRegisterReq(pCont, contLen, &req) < 0) {
code = terrno != 0 ? terrno : TSDB_CODE_TSC_INTERNAL_ERROR;
rpcFreeCont(pCont);
rpcClose(clientRpc);
return code;
goto _register_inst_end;
}

rpcMsg.pCont = pCont;
Expand All @@ -3300,10 +3401,7 @@ int32_t taos_register_instance(const char *id, const char *type, const char *des
code = rpcSendRecv(clientRpc, &epSet, &rpcMsg, &rpcRsp);
if (TSDB_CODE_SUCCESS != code) {
tscError("failed to send instance register req since %s", tstrerror(code));
Comment thread
Pengrongkun marked this conversation as resolved.
// rpcSendRecv failed, pCont may not be freed, but check _RETURN1 path
// In error path, rpcSendRecv may free pCont, but we free it here to be safe
rpcClose(clientRpc);
return code;
goto _register_inst_end;
}

if (rpcRsp.code != 0) {
Expand All @@ -3316,8 +3414,9 @@ int32_t taos_register_instance(const char *id, const char *type, const char *des
if (rpcRsp.pCont != NULL) {
rpcFreeCont(rpcRsp.pCont);
}
rpcClose(clientRpc);

_register_inst_end:
instanceRpcRelease();
terrno = code;
return code;
}
Expand Down Expand Up @@ -3346,11 +3445,21 @@ int32_t taos_list_instances(const char *filter_type, char ***pList, int32_t *pCo
return code;
}

void *clientRpc = instanceOpenRpcClient("LIST");
if (clientRpc == NULL) {
return terrno;
code = instanceApiRateLimitTry();
if (code != TSDB_CODE_SUCCESS) {
terrno = code;
return code;
}

void *clientRpc = NULL;
code = instanceRpcAcquire(&clientRpc);
if (code != TSDB_CODE_SUCCESS) {
terrno = code;
return code;
}

int32_t retCode = TSDB_CODE_SUCCESS;
Comment thread
Pengrongkun marked this conversation as resolved.
Outdated

SRpcMsg rpcMsg = {0};
SRpcMsg rpcRsp = {0};

Expand All @@ -3359,31 +3468,25 @@ int32_t taos_list_instances(const char *filter_type, char ***pList, int32_t *pCo
tstrncpy(req.filter_type, filter_type, sizeof(req.filter_type));
}

// Serialize request to get required length
int32_t contLen = tSerializeSInstanceListReq(NULL, 0, &req);
if (contLen <= 0) {
code = terrno != 0 ? terrno : TSDB_CODE_TSC_INTERNAL_ERROR;
rpcClose(clientRpc);
terrno = code;
return code;
retCode = terrno != 0 ? terrno : TSDB_CODE_TSC_INTERNAL_ERROR;
terrno = retCode;
goto _list_inst_end;
}

// Allocate RPC message buffer (includes message header overhead)
void *pCont = rpcMallocCont(contLen);
if (pCont == NULL) {
code = terrno != 0 ? terrno : TSDB_CODE_OUT_OF_MEMORY;
rpcClose(clientRpc);
terrno = code;
return code;
retCode = terrno != 0 ? terrno : TSDB_CODE_OUT_OF_MEMORY;
terrno = retCode;
goto _list_inst_end;
}

// Serialize request into the content part (after message header)
if (tSerializeSInstanceListReq(pCont, contLen, &req) < 0) {
code = terrno != 0 ? terrno : TSDB_CODE_TSC_INTERNAL_ERROR;
retCode = terrno != 0 ? terrno : TSDB_CODE_TSC_INTERNAL_ERROR;
rpcFreeCont(pCont);
rpcClose(clientRpc);
terrno = code;
return code;
terrno = retCode;
goto _list_inst_end;
}

rpcMsg.pCont = pCont;
Expand All @@ -3396,24 +3499,21 @@ int32_t taos_list_instances(const char *filter_type, char ***pList, int32_t *pCo
if (TSDB_CODE_SUCCESS != code) {
tscError("failed to send instance list req since %s", tstrerror(code));
rpcFreeCont(pCont);
Comment thread
Pengrongkun marked this conversation as resolved.
Outdated
rpcClose(clientRpc);
retCode = code;
terrno = code;
return code;
goto _list_inst_end;
}

// Check response - rpcRsp.code contains the result code from mnode
if (rpcRsp.code != 0) {
code = rpcRsp.code;
tscError("instance list failed, code:%s", tstrerror(code));
retCode = rpcRsp.code;
tscError("instance list failed, code:%s", tstrerror(retCode));
if (rpcRsp.pCont != NULL) {
rpcFreeCont(rpcRsp.pCont);
}
rpcClose(clientRpc);
terrno = code;
return code;
terrno = retCode;
goto _list_inst_end;
}

// Deserialize response
if (rpcRsp.pCont != NULL && rpcRsp.contLen > 0) {
SInstanceListRsp rsp = {0};
code = tDeserializeSInstanceListRsp(rpcRsp.pCont, rpcRsp.contLen, &rsp);
Expand All @@ -3430,9 +3530,9 @@ int32_t taos_list_instances(const char *filter_type, char ***pList, int32_t *pCo
}
rsp.count = 0;
rpcFreeCont(rpcRsp.pCont);
rpcClose(clientRpc);
retCode = code;
terrno = code;
return code;
goto _list_inst_end;
}
*pList = rsp.ids;
*pCount = rsp.count;
Expand All @@ -3444,9 +3544,12 @@ int32_t taos_list_instances(const char *filter_type, char ***pList, int32_t *pCo
if (rpcRsp.pCont != NULL) {
rpcFreeCont(rpcRsp.pCont);
}
rpcClose(clientRpc);
retCode = TSDB_CODE_SUCCESS;

return TSDB_CODE_SUCCESS;
_list_inst_end:
instanceRpcRelease();
terrno = retCode;
return retCode;
}

void taos_free_instances(char ***pList, int32_t count) {
Expand Down
20 changes: 20 additions & 0 deletions source/client/test/instanceTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -318,4 +318,24 @@ TEST(instanceCase, secondEp) {
taos_free_instances(&list, count);
}

TEST(instanceCase, instance_api_rate_limit) {
initEnv();
taosMsleep(1000);

int32_t code = TSDB_CODE_SUCCESS;
int64_t t0 = taosGetTimestampMs();
for (int i = 0; i < 100; i++) {
code = taos_register_instance("id-1", "taosadapter", "desc:test_instance", 100);
ASSERT_EQ(code, TSDB_CODE_SUCCESS);
}
int64_t t1 = taosGetTimestampMs();

code = taos_register_instance("id-1", "taosadapter", "desc:test_instance", 100);
if (t1 - t0 >= 1000) {
ASSERT_EQ(code, TSDB_CODE_SUCCESS);
} else {
ASSERT_EQ(code, TSDB_CODE_TSC_INSTANCE_API_RATE_LIMIT);
}
Comment thread
Pengrongkun marked this conversation as resolved.
Comment on lines +323 to +338
}

#pragma GCC diagnostic pop
Comment thread
Pengrongkun marked this conversation as resolved.
Outdated
1 change: 1 addition & 0 deletions source/util/src/terror.c
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INVALID_INTERVAL_OFFSET, "Invalid interval offs
TAOS_DEFINE_ERROR(TSDB_CODE_NOT_SUPPORTTED_IN_WINDOWS, "Operation not supported in windows")
TAOS_DEFINE_ERROR(TSDB_CODE_TSC_INVALID_TOTP_CODE, "Invalid TOTP code")
TAOS_DEFINE_ERROR(TSDB_CODE_TSC_INVALID_TOKEN, "Invalid token")
TAOS_DEFINE_ERROR(TSDB_CODE_TSC_INSTANCE_API_RATE_LIMIT, "instance register/list API rate limit exceeded")

TAOS_DEFINE_ERROR(TSDB_CODE_TSC_SESS_PER_USER_LIMIT, "reached the maximum sessions per user limit")
TAOS_DEFINE_ERROR(TSDB_CODE_TSC_SESS_CONN_TIMEOUT, "reached the maximum connection timeout limit")
Expand Down
Loading