-
Notifications
You must be signed in to change notification settings - Fork 5k
fix(rbac): access control[manual-only] #35213
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 7 commits
7862707
91b110c
7a3cf72
ec66bb4
6a96166
85d41d3
17cbc31
bed17a6
30fe44f
073bc72
0fca3e0
33349e9
a4c7511
d6c8766
a50cd47
c1adda9
332d850
86a015c
f2b5b8e
0e6ef8f
4484f52
ef51541
ef08c1f
bffc3ca
c48181c
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -35,6 +35,7 @@ | |
| #include "storageapi.h" | ||
| #include "tcompare.h" | ||
| #include "thash.h" | ||
| #include "tref.h" | ||
| #include "trpc.h" | ||
| #include "ttypes.h" | ||
| // RPC timeout for virtual table reference validation (5 seconds) | ||
|
|
@@ -79,6 +80,8 @@ typedef struct SSysTableScanInfo { | |
| SRetrieveTableReq req; | ||
| SEpSet epSet; | ||
| tsem_t ready; | ||
| int64_t self; // ref ID in sysTableScanRefPool (for callback safety) | ||
| int32_t rspCode; // error code set by the RPC callback | ||
| SReadHandle readHandle; | ||
| const char* pUser; | ||
| int32_t accountId; | ||
|
|
@@ -124,12 +127,28 @@ typedef struct SSysTableScanInfo { | |
| STableListInfo* pSubTableListInfo; | ||
| } SSysTableScanInfo; | ||
|
|
||
| typedef struct SSysTableLoadCtx { | ||
| tsem_t ready; | ||
| SRetrieveMetaTableRsp* pRsp; | ||
| int32_t rspCode; | ||
| int32_t refs; | ||
| } SSysTableLoadCtx; | ||
| // Lightweight wrapper passed as RPC callback param; stores only the ref ID so | ||
| // the callback can safely acquire the SSysTableScanInfo from the ref pool. | ||
| typedef struct SSysTableScanCbParam { | ||
| int64_t sysTableScanId; | ||
| } SSysTableScanCbParam; | ||
|
|
||
| // Per-file ref pool used to decouple the callback lifetime from the operator | ||
| // lifetime, following the same pattern as fetchObjRefPool in exchangeoperator.c. | ||
| static int32_t sysTableScanRefPool = -1; | ||
| static TdThreadOnce sysTableScanRefPoolOnce = PTHREAD_ONCE_INIT; | ||
|
|
||
| static void doDestroySysTableScanInfo(void* param); | ||
|
|
||
| static void cleanupSysTableScanRefPool(void) { | ||
| int32_t ref = atomic_val_compare_exchange_32(&sysTableScanRefPool, sysTableScanRefPool, 0); | ||
| taosCloseRef(ref); | ||
| } | ||
|
|
||
| static void initSysTableScanRefPool(void) { | ||
| sysTableScanRefPool = taosOpenRef(64, doDestroySysTableScanInfo); | ||
| (void)atexit(cleanupSysTableScanRefPool); | ||
| } | ||
|
|
||
| typedef struct { | ||
| const char* name; | ||
|
|
@@ -208,25 +227,6 @@ static void destroySysScanOperator(void* param); | |
| static int32_t loadSysTableCallback(void* param, SDataBuf* pMsg, int32_t code); | ||
| static __optSysFilter optSysGetFilterFunc(int32_t ctype, bool* reverse, bool* equal); | ||
|
|
||
| static void freeSysTableLoadCtx(void* param) { | ||
| SSysTableLoadCtx* pCtx = (SSysTableLoadCtx*)param; | ||
| if (pCtx == NULL) { | ||
| return; | ||
| } | ||
|
|
||
| if (atomic_sub_fetch_32(&pCtx->refs, 1) != 0) { | ||
| return; | ||
| } | ||
|
|
||
| int32_t code = tsem_destroy(&pCtx->ready); | ||
| if (code != TSDB_CODE_SUCCESS) { | ||
| qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code)); | ||
| } | ||
|
|
||
| taosMemoryFreeClear(pCtx->pRsp); | ||
| taosMemoryFreeClear(pCtx); | ||
| } | ||
|
|
||
| static int32_t sysTableUserTagsFillOneTableTags(const SSysTableScanInfo* pInfo, SMetaReader* smrSuperTable, | ||
| SMetaReader* smrChildTable, const char* dbname, const char* tableName, | ||
| int32_t* pNumOfRows, const SSDataBlock* dataBlock); | ||
|
|
@@ -4945,32 +4945,25 @@ static SSDataBlock* sysTableScanFromMNode(SOperatorInfo* pOperator, SSysTableSca | |
| return NULL; | ||
| } | ||
|
|
||
| SSysTableLoadCtx* pLoadCtx = taosMemoryCalloc(1, sizeof(SSysTableLoadCtx)); | ||
| if (pLoadCtx == NULL) { | ||
| qError("%s prepare callback context failed", GET_TASKID(pTaskInfo)); | ||
| pTaskInfo->code = terrno; | ||
| taosMemoryFree(buf1); | ||
| taosMemoryFree(pMsgSendInfo); | ||
| return NULL; | ||
| } | ||
| int32_t msgType = (strcasecmp(name, TSDB_INS_TABLE_DNODE_VARIABLES) == 0) ? TDMT_DND_SYSTABLE_RETRIEVE | ||
| : TDMT_MND_SYSTABLE_RETRIEVE; | ||
|
|
||
| code = tsem_init(&pLoadCtx->ready, 0, 0); | ||
| if (code != TSDB_CODE_SUCCESS) { | ||
| qError("%s init callback context failed since %s", GET_TASKID(pTaskInfo), tstrerror(code)); | ||
| // Allocate a lightweight wrapper that holds only the ref ID; the callback | ||
| // frees it via paramFreeFp = taosAutoMemoryFree after the callback returns. | ||
| SSysTableScanCbParam* pWrapper = taosMemoryCalloc(1, sizeof(SSysTableScanCbParam)); | ||
| if (!pWrapper) { | ||
| pTaskInfo->code = terrno; | ||
| taosMemoryFree(buf1); | ||
| taosMemoryFree(pMsgSendInfo); | ||
| taosMemoryFree(pLoadCtx); | ||
| pTaskInfo->code = code; | ||
| return NULL; | ||
| T_LONG_JMP(pTaskInfo->env, pTaskInfo->code); | ||
| } | ||
| pLoadCtx->rspCode = TSDB_CODE_SUCCESS; | ||
| pLoadCtx->refs = 2; | ||
| pWrapper->sysTableScanId = pInfo->self; | ||
|
|
||
| int32_t msgType = (strcasecmp(name, TSDB_INS_TABLE_DNODE_VARIABLES) == 0) ? TDMT_DND_SYSTABLE_RETRIEVE | ||
| : TDMT_MND_SYSTABLE_RETRIEVE; | ||
| // reset rspCode from the previous iteration | ||
| pInfo->rspCode = 0; | ||
|
|
||
| pMsgSendInfo->param = pLoadCtx; | ||
| pMsgSendInfo->paramFreeFp = freeSysTableLoadCtx; | ||
| pMsgSendInfo->param = pWrapper; | ||
| pMsgSendInfo->paramFreeFp = taosAutoMemoryFree; | ||
| pMsgSendInfo->msgInfo.pData = buf1; | ||
| pMsgSendInfo->msgInfo.len = contLen; | ||
| pMsgSendInfo->msgType = msgType; | ||
|
|
@@ -4979,31 +4972,24 @@ static SSDataBlock* sysTableScanFromMNode(SOperatorInfo* pOperator, SSysTableSca | |
|
|
||
| code = asyncSendMsgToServer(pInfo->readHandle.pMsgCb->clientRpc, &pInfo->epSet, NULL, pMsgSendInfo); | ||
| if (code != TSDB_CODE_SUCCESS) { | ||
| freeSysTableLoadCtx(pLoadCtx); | ||
| qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code)); | ||
| pTaskInfo->code = code; | ||
| T_LONG_JMP(pTaskInfo->env, code); | ||
| } | ||
|
|
||
| code = tsem_timewait(&pLoadCtx->ready, VTB_REF_RPC_TIMEOUT_MS); | ||
| // Block this worker thread until the response arrives. qSemWait notifies | ||
| // the worker pool and waits, then re-acquires on wake-up. | ||
| code = qSemWait((qTaskInfo_t)pTaskInfo, &pInfo->ready); | ||
| if (code != TSDB_CODE_SUCCESS) { | ||
| freeSysTableLoadCtx(pLoadCtx); | ||
| qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code)); | ||
| qError("%s tsem_wait failed at line %d since %s", __func__, __LINE__, tstrerror(code)); | ||
|
kailixu marked this conversation as resolved.
|
||
| pTaskInfo->code = code; | ||
| T_LONG_JMP(pTaskInfo->env, code); | ||
| } | ||
|
Comment on lines
+4980
to
4987
|
||
|
|
||
| if (pLoadCtx->rspCode != TSDB_CODE_SUCCESS) { | ||
| pTaskInfo->code = pLoadCtx->rspCode; | ||
| } else { | ||
| pInfo->pRsp = pLoadCtx->pRsp; | ||
| pLoadCtx->pRsp = NULL; | ||
| } | ||
| freeSysTableLoadCtx(pLoadCtx); | ||
|
|
||
| if (pTaskInfo->code) { | ||
| if (pInfo->rspCode != TSDB_CODE_SUCCESS) { | ||
| qError("%s load meta data from mnode failed, totalRows:%" PRIu64 ", code:%s", GET_TASKID(pTaskInfo), | ||
| pInfo->loadInfo.totalRows, tstrerror(pTaskInfo->code)); | ||
| pInfo->loadInfo.totalRows, tstrerror(pInfo->rspCode)); | ||
| pTaskInfo->code = pInfo->rspCode; | ||
| return NULL; | ||
| } | ||
|
|
||
|
|
@@ -5187,6 +5173,17 @@ int32_t createSysTableScanOperatorInfo(void* readHandle, SSystemTableScanPhysiNo | |
| } | ||
| pInfo->epSet = pScanPhyNode->mgmtEpSet; | ||
| pInfo->readHandle = *(SReadHandle*)readHandle; | ||
|
|
||
| // Register pInfo in the per-file ref pool so that loadSysTableCallback can | ||
| // safely acquire/release it even after the operator has been destroyed. | ||
| (void)taosThreadOnce(&sysTableScanRefPoolOnce, initSysTableScanRefPool); | ||
| int64_t refId = taosAddRef(sysTableScanRefPool, pInfo); | ||
| if (refId < 0) { | ||
| qError("%s failed to add ref for sysTableScan since %s", GET_TASKID(pTaskInfo), tstrerror(terrno)); | ||
| code = terrno; | ||
| goto _error; | ||
| } | ||
| pInfo->self = refId; | ||
| } | ||
|
|
||
| pInfo->pSubTableListInfo = pTableListInfo; | ||
|
|
@@ -5236,7 +5233,13 @@ void extractTbnameSlotId(SSysTableScanInfo* pInfo, const SScanPhysiNode* pScanNo | |
| } | ||
| } | ||
|
|
||
| void destroySysScanOperator(void* param) { | ||
| // doDestroySysTableScanInfo: actual teardown for SSysTableScanInfo. | ||
| // For operators that use the ref pool (MNode path, self > 0), this function | ||
| // is the pool destructor invoked automatically when the last ref is dropped | ||
| // via taosRemoveRef / taosReleaseRef — do NOT call it directly on those. | ||
| // For local-scan operators (self == 0), destroySysScanOperator calls it | ||
| // directly since there is no ref pool involved. | ||
| static void doDestroySysTableScanInfo(void* param) { | ||
| SSysTableScanInfo* pInfo = (SSysTableScanInfo*)param; | ||
| int32_t code = tsem_destroy(&pInfo->ready); | ||
| if (code != TSDB_CODE_SUCCESS) { | ||
|
|
@@ -5284,32 +5287,73 @@ void destroySysScanOperator(void* param) { | |
| taosMemoryFreeClear(param); | ||
| } | ||
|
|
||
| int32_t loadSysTableCallback(void* param, SDataBuf* pMsg, int32_t code) { | ||
| SSysTableLoadCtx* pCtx = (SSysTableLoadCtx*)param; | ||
| if (pCtx == NULL) { | ||
| // destroySysScanOperator: operator destroy callback. For operators that use | ||
| // the ref pool (MNode path), we just remove our reference — actual cleanup | ||
| // happens inside doDestroySysTableScanInfo when all refs are released. For | ||
| // local-scan operators (no ref pool entry), do the teardown inline. | ||
| static void destroySysScanOperator(void* param) { | ||
| SSysTableScanInfo* pInfo = (SSysTableScanInfo*)param; | ||
| if (pInfo->self > 0) { | ||
| // MNode path: remove the operator's own ref; the pool calls | ||
| // doDestroySysTableScanInfo when all refs (operator + any in-flight | ||
| // callbacks) are dropped. | ||
| int32_t refCode = taosRemoveRef(sysTableScanRefPool, pInfo->self); | ||
| if (refCode != TSDB_CODE_SUCCESS) { | ||
| qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(refCode)); | ||
| } | ||
| } else { | ||
| // Local scan path: no ref pool — destroy directly. | ||
| doDestroySysTableScanInfo(pInfo); | ||
| } | ||
| } | ||
|
|
||
| static int32_t loadSysTableCallback(void* param, SDataBuf* pMsg, int32_t code) { | ||
| SSysTableScanCbParam* pWrapper = (SSysTableScanCbParam*)param; | ||
|
|
||
| // Acquire the SSysTableScanInfo from the ref pool. If it returns NULL the | ||
| // operator has already been destroyed — discard the response safely. | ||
| SSysTableScanInfo* pInfo = (SSysTableScanInfo*)taosAcquireRef(sysTableScanRefPool, pWrapper->sysTableScanId); | ||
| if (pInfo == NULL) { | ||
| // Operator is gone; free the response payload and bail out. | ||
| taosMemoryFree(pMsg->pData); | ||
| taosMemoryFree(pMsg->pEpSet); | ||
| return TSDB_CODE_SUCCESS; | ||
| } | ||
|
|
||
| if (TSDB_CODE_SUCCESS == code) { | ||
| pCtx->pRsp = pMsg->pData; | ||
| pMsg->pData = NULL; | ||
| pCtx->rspCode = TSDB_CODE_SUCCESS; | ||
| pInfo->pRsp = pMsg->pData; | ||
|
|
||
| SRetrieveMetaTableRsp* pRsp = pCtx->pRsp; | ||
| SRetrieveMetaTableRsp* pRsp = pInfo->pRsp; | ||
| pRsp->numOfRows = htonl(pRsp->numOfRows); | ||
| pRsp->useconds = htobe64(pRsp->useconds); | ||
| pRsp->handle = htobe64(pRsp->handle); | ||
| pRsp->compLen = htonl(pRsp->compLen); | ||
| } else { | ||
|
kailixu marked this conversation as resolved.
|
||
| pCtx->rspCode = rpcCvtErrCode(code); | ||
| if (pCtx->rspCode != code) { | ||
| qError("load systable rsp received, error:%s, cvted error:%s", tstrerror(code), tstrerror(pCtx->rspCode)); | ||
| int32_t cvtCode = rpcCvtErrCode(code); | ||
| if (cvtCode != code) { | ||
| qError("load systable rsp received, error:%s, cvted error:%s", tstrerror(code), tstrerror(cvtCode)); | ||
| } else { | ||
| qError("load systable rsp received, error:%s", tstrerror(code)); | ||
| } | ||
| pInfo->rspCode = cvtCode; | ||
| taosMemoryFree(pMsg->pData); | ||
| } | ||
| taosMemoryFree(pMsg->pEpSet); | ||
|
|
||
| int32_t res = tsem_post(&pCtx->ready); | ||
| // Release our acquired ref BEFORE posting the semaphore. | ||
| // If we post first, the waiter can race ahead: task completes → taosRemoveRef | ||
| // drops the count to 1, then doDestroyTask frees the task memory pool (which | ||
| // owns pInfo). Our subsequent taosReleaseRef would then drop the count to 0 | ||
| // and call doDestroySysTableScanInfo on already-freed memory. | ||
| // By releasing first (count 2→1, destructor not triggered), pInfo remains | ||
| // valid for the tsem_post call below, and doDestroySysTableScanInfo is | ||
| // called only later, inside destroySysScanOperator, when pInfo is still live. | ||
| int32_t refCode = taosReleaseRef(sysTableScanRefPool, pWrapper->sysTableScanId); | ||
| if (refCode != TSDB_CODE_SUCCESS) { | ||
| qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(refCode)); | ||
| } | ||
|
|
||
| int32_t res = tsem_post(&pInfo->ready); | ||
| if (res != TSDB_CODE_SUCCESS) { | ||
| qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(res)); | ||
| } | ||
|
kailixu marked this conversation as resolved.
|
||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.