Skip to content
Open
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 46 additions & 7 deletions include/common/streamMsg.h
Original file line number Diff line number Diff line change
Expand Up @@ -677,13 +677,13 @@ typedef enum ESTriggerPullType {
STRIGGER_PULL_FIRST_TS,
STRIGGER_PULL_TSDB_META,
STRIGGER_PULL_TSDB_META_NEXT,
STRIGGER_PULL_TSDB_TS_DATA,
STRIGGER_PULL_TSDB_TRIGGER_DATA,
STRIGGER_PULL_TSDB_TRIGGER_DATA_NEXT,
STRIGGER_PULL_TSDB_CALC_DATA,
STRIGGER_PULL_TSDB_CALC_DATA_NEXT,
STRIGGER_PULL_TSDB_DATA, //10
STRIGGER_PULL_TSDB_DATA_NEXT,
STRIGGER_PULL_TSDB_TS_DATA, // DEPRECATED: replaced by STRIGGER_PULL_TSDB_DATA_DIFF_RANGE etc; remove after trigger side migration
STRIGGER_PULL_TSDB_TRIGGER_DATA, // DEPRECATED: same as above
STRIGGER_PULL_TSDB_TRIGGER_DATA_NEXT, // DEPRECATED: same as above
STRIGGER_PULL_TSDB_CALC_DATA, // DEPRECATED: same as above
STRIGGER_PULL_TSDB_CALC_DATA_NEXT, // DEPRECATED: same as above
STRIGGER_PULL_TSDB_DATA, //10 // DEPRECATED: replaced by STRIGGER_PULL_TSDB_DATA_DIFF_RANGE / STRIGGER_PULL_TSDB_DATA_SAME_RANGE; remove after trigger side migration
STRIGGER_PULL_TSDB_DATA_NEXT, // DEPRECATED: same as above
STRIGGER_PULL_GROUP_COL_VALUE,
STRIGGER_PULL_VTABLE_INFO,
STRIGGER_PULL_VTABLE_PSEUDO_COL,
Expand All @@ -692,6 +692,16 @@ typedef enum ESTriggerPullType {
STRIGGER_PULL_WAL_DATA_NEW,
STRIGGER_PULL_WAL_META_DATA_NEW,
STRIGGER_PULL_WAL_CALC_DATA_NEW,
// === Added for history-data pull optimization ===
STRIGGER_PULL_TSDB_DATA_DIFF_RANGE,
STRIGGER_PULL_TSDB_DATA_DIFF_RANGE_NEXT,
STRIGGER_PULL_TSDB_DATA_DIFF_RANGE_CALC,
STRIGGER_PULL_TSDB_DATA_DIFF_RANGE_CALC_NEXT,
STRIGGER_PULL_TSDB_DATA_SAME_RANGE,
STRIGGER_PULL_TSDB_DATA_SAME_RANGE_NEXT,
STRIGGER_PULL_TSDB_DATA_SAME_RANGE_CALC,
STRIGGER_PULL_TSDB_DATA_SAME_RANGE_CALC_NEXT,
STRIGGER_PULL_SET_TABLE_HISTORY, // Same as STRIGGER_PULL_SET_TABLE, but writes into the *History fields
STRIGGER_PULL_TYPE_MAX,
} ESTriggerPullType;

Expand All @@ -709,6 +719,33 @@ typedef struct SSTriggerSetTableRequest {
SSHashObj* uidInfoCalc; // < uid->SHashObj<slotId->colId> >
} SSTriggerSetTableRequest;

typedef struct SSTriggerTableTimeRange {
int64_t suid; // 0 for non-virtual tables; for virtual tables it is the suid of the uid
int64_t uid;
int64_t skey;
int64_t ekey;
} SSTriggerTableTimeRange;

typedef struct SSTriggerTsdbDataDiffRangeRequest {
SSTriggerPullRequest base;
int64_t ver;
int8_t order; // 1 asc, 2 desc
SArray* ranges; // SArray<SSTriggerTableTimeRange>
} SSTriggerTsdbDataDiffRangeRequest;

typedef struct SSTriggerTsdbDataSameRangeRequest {
SSTriggerPullRequest base;
int64_t ver;
int64_t gid; // 0 means all tables; non-zero means a single group
int64_t skey;
int64_t ekey;
int8_t order;
} SSTriggerTsdbDataSameRangeRequest;

// _NEXT variants carry no extra fields; reuse SSTriggerPullRequest directly.
typedef SSTriggerPullRequest SSTriggerTsdbDataDiffRangeNextRequest;
typedef SSTriggerPullRequest SSTriggerTsdbDataSameRangeNextRequest;

typedef struct SSTriggerLastTsRequest {
SSTriggerPullRequest base;
} SSTriggerLastTsRequest;
Expand Down Expand Up @@ -867,6 +904,8 @@ typedef union SSTriggerPullRequestUnion {
SSTriggerVirTableInfoRequest virTableInfoReq;
SSTriggerVirTablePseudoColRequest virTablePseudoColReq;
SSTriggerOrigTableInfoRequest origTableInfoReq;
SSTriggerTsdbDataDiffRangeRequest tsdbDataDiffRangeReq;
SSTriggerTsdbDataSameRangeRequest tsdbDataSameRangeReq;
} SSTriggerPullRequestUnion;

int32_t tSerializeSTriggerPullRequest(void* buf, int32_t bufLen, const SSTriggerPullRequest* pReq);
Expand Down
19 changes: 17 additions & 2 deletions include/libs/new-stream/streamReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,12 @@ typedef struct StreamTableListInfo {
int64_t version;
} StreamTableListInfo;

/* ------------------------------------------------------------------ */
/* SDiffRangeIter — iterator state for DiffRange pulls */
/* ------------------------------------------------------------------ */



typedef struct SStreamTriggerReaderInfo {
void* pTask;
int32_t order;
Expand All @@ -59,6 +65,7 @@ typedef struct SStreamTriggerReaderInfo {
SNodeList* triggerCols;
SNodeList* triggerPseudoCols;
SHashObj* streamTaskMap;
SHashObj* streamTaskMapHistory; /* per-key map for SDiffRangeIter; freeFp set in vnodeStream.c */
SHashObj* groupIdMap;
SSubplan* triggerAst;
SSubplan* calcAst;
Expand All @@ -73,8 +80,10 @@ typedef struct SStreamTriggerReaderInfo {
int32_t numOfExprTriggerTag;
SExprInfo* pExprInfoCalcTag;
int32_t numOfExprCalcTag;
SSHashObj* uidHashTrigger; // < uid -> SHashObj < slotId -> colId > >
SSHashObj* uidHashCalc; // < uid -> SHashObj < slotId -> colId > >
SSHashObj* uidHashTrigger; // < uid -> SHashObj < slotId -> colId > >
SSHashObj* uidHashCalc; // < uid -> SHashObj < slotId -> colId > >
SSHashObj* uidHashTriggerHistory; // history version, TSDB path (vtable only)
SSHashObj* uidHashCalcHistory; // history version, TSDB path (vtable only)
void* historyTableList;
SFilterInfo* pFilterInfo;
SHashObj* pTableMetaCacheTrigger;
Expand All @@ -88,6 +97,7 @@ typedef struct SStreamTriggerReaderInfo {

StreamTableListInfo tableList;
StreamTableListInfo vSetTableList;
StreamTableListInfo vSetTableListHistory; // Dedicated to the TSDB history path for virtual-table streams

} SStreamTriggerReaderInfo;

Expand Down Expand Up @@ -140,6 +150,7 @@ void qStreamSetTaskRunning(int64_t streamId, int64_t taskId);
int32_t streamBuildFetchRsp(SArray* pResList, bool hasNext, void** data, size_t* size, int8_t precision);

int32_t qBuildVTableList(SStreamTriggerReaderInfo* sStreamReaderInfo);
int32_t qBuildVTableListHistory(SStreamTriggerReaderInfo* sStreamReaderInfo);

int32_t createStreamTask(void* pVnode, SStreamOptions* options, SStreamReaderTaskInner** ppTask,
SSDataBlock* pResBlock, STableKeyInfo* pList, int32_t pNum, SStorageAPI* storageApi);
Expand All @@ -148,12 +159,16 @@ int32_t createStreamTaskForTs(SStreamOptions* options, SStreamReaderTaskInner**

int32_t initStreamTableListInfo(StreamTableListInfo* pTableListInfo);
int32_t qStreamGetTableList(SStreamTriggerReaderInfo* sStreamReaderInfo, uint64_t gid, STableKeyInfo** pKeyInfo, int32_t* size);
int32_t qStreamGetTableListHistory(SStreamTriggerReaderInfo* sStreamReaderInfo, uint64_t gid, STableKeyInfo** pKeyInfo, int32_t* size); // Added: virtual-table stream history path
void qStreamDestroyTableInfo(StreamTableListInfo* pTableListInfo);
void qStreamClearTableInfo(StreamTableListInfo* pTableListInfo);
int32_t qStreamCopyTableInfo(SStreamTriggerReaderInfo* sStreamReaderInfo, StreamTableListInfo* dst);
int32_t qStreamCopyTableInfoHistory(SStreamTriggerReaderInfo* sStreamReaderInfo, StreamTableListInfo* dst); // Added: virtual-table stream history path
int32_t qStreamSetTableList(StreamTableListInfo* pTableListInfo, int64_t uid, uint64_t gid);
int32_t qStreamGetTableListGroupNum(SStreamTriggerReaderInfo* sStreamReaderInfo);
int32_t qStreamGetTableListGroupNumHistory(SStreamTriggerReaderInfo* sStreamReaderInfo); // Added: virtual-table stream history path
int32_t qStreamGetTableListNum(SStreamTriggerReaderInfo* sStreamReaderInfo);
int32_t qStreamGetTableListNumHistory(SStreamTriggerReaderInfo* sStreamReaderInfo); // Added: virtual-table stream history path
SArray* qStreamGetTableArrayList(SStreamTriggerReaderInfo* sStreamReaderInfo);
int32_t qStreamIterTableList(StreamTableListInfo* sStreamReaderInfo, STableKeyInfo** pKeyInfo, int32_t* size, int64_t* suid);
uint64_t qStreamGetGroupIdFromOrigin(SStreamTriggerReaderInfo* sStreamReaderInfo, int64_t uid);
Expand Down
86 changes: 83 additions & 3 deletions source/common/src/msg/streamMsg.c
Original file line number Diff line number Diff line change
Expand Up @@ -3095,10 +3095,18 @@ void tDestroySTriggerPullRequest(SSTriggerPullRequestUnion* pReq) {
taosArrayDestroy(pRequest->cols);
pRequest->cols = NULL;
}
} else if (pReq->base.type == STRIGGER_PULL_SET_TABLE) {
} else if (pReq->base.type == STRIGGER_PULL_SET_TABLE ||
pReq->base.type == STRIGGER_PULL_SET_TABLE_HISTORY) {
SSTriggerSetTableRequest* pRequest = (SSTriggerSetTableRequest*)pReq;
tSimpleHashCleanup(pRequest->uidInfoTrigger);
tSimpleHashCleanup(pRequest->uidInfoCalc);
} else if (pReq->base.type == STRIGGER_PULL_TSDB_DATA_DIFF_RANGE ||
pReq->base.type == STRIGGER_PULL_TSDB_DATA_DIFF_RANGE_CALC) {
SSTriggerTsdbDataDiffRangeRequest* pRequest = (SSTriggerTsdbDataDiffRangeRequest*)pReq;
if (pRequest->ranges != NULL) {
taosArrayDestroy(pRequest->ranges);
pRequest->ranges = NULL;
}
}
}

Expand Down Expand Up @@ -3182,7 +3190,8 @@ int32_t tSerializeSTriggerPullRequest(void* buf, int32_t bufLen, const SSTrigger
TAOS_CHECK_EXIT(tEncodeI64(&encoder, pReq->sessionId));

switch (pReq->type) {
case STRIGGER_PULL_SET_TABLE: {
case STRIGGER_PULL_SET_TABLE:
case STRIGGER_PULL_SET_TABLE_HISTORY: {
SSTriggerSetTableRequest* pRequest = (SSTriggerSetTableRequest*)pReq;
TAOS_CHECK_EXIT(encodeSetTableMapInfo(&encoder, pRequest->uidInfoTrigger));
TAOS_CHECK_EXIT(encodeSetTableMapInfo(&encoder, pRequest->uidInfoCalc));
Expand Down Expand Up @@ -3255,6 +3264,38 @@ int32_t tSerializeSTriggerPullRequest(void* buf, int32_t bufLen, const SSTrigger
case STRIGGER_PULL_TSDB_DATA_NEXT: {
break;
}
case STRIGGER_PULL_TSDB_DATA_DIFF_RANGE:
case STRIGGER_PULL_TSDB_DATA_DIFF_RANGE_CALC: {
SSTriggerTsdbDataDiffRangeRequest* pRequest = (SSTriggerTsdbDataDiffRangeRequest*)pReq;
TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->ver));
TAOS_CHECK_EXIT(tEncodeI8(&encoder, pRequest->order));
int32_t nRanges = (pRequest->ranges != NULL) ? taosArrayGetSize(pRequest->ranges) : 0;
TAOS_CHECK_EXIT(tEncodeI32(&encoder, nRanges));
for (int32_t i = 0; i < nRanges; i++) {
SSTriggerTableTimeRange* r = (SSTriggerTableTimeRange*)TARRAY_GET_ELEM(pRequest->ranges, i);
TAOS_CHECK_EXIT(tEncodeI64(&encoder, r->suid));
TAOS_CHECK_EXIT(tEncodeI64(&encoder, r->uid));
TAOS_CHECK_EXIT(tEncodeI64(&encoder, r->skey));
TAOS_CHECK_EXIT(tEncodeI64(&encoder, r->ekey));
}
break;
}
case STRIGGER_PULL_TSDB_DATA_DIFF_RANGE_NEXT:
case STRIGGER_PULL_TSDB_DATA_DIFF_RANGE_CALC_NEXT:
case STRIGGER_PULL_TSDB_DATA_SAME_RANGE_NEXT:
case STRIGGER_PULL_TSDB_DATA_SAME_RANGE_CALC_NEXT: {
break;
}
case STRIGGER_PULL_TSDB_DATA_SAME_RANGE:
case STRIGGER_PULL_TSDB_DATA_SAME_RANGE_CALC: {
SSTriggerTsdbDataSameRangeRequest* pRequest = (SSTriggerTsdbDataSameRangeRequest*)pReq;
TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->ver));
TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->gid));
TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->skey));
TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->ekey));
TAOS_CHECK_EXIT(tEncodeI8(&encoder, pRequest->order));
break;
}
case STRIGGER_PULL_WAL_META_NEW: {
SSTriggerWalMetaNewRequest* pRequest = (SSTriggerWalMetaNewRequest*)pReq;
TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->lastVer));
Expand Down Expand Up @@ -3409,7 +3450,8 @@ int32_t tDeserializeSTriggerPullRequest(void* buf, int32_t bufLen, SSTriggerPull
TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pBase->sessionId));

switch (type) {
case STRIGGER_PULL_SET_TABLE: {
case STRIGGER_PULL_SET_TABLE:
case STRIGGER_PULL_SET_TABLE_HISTORY: {
SSTriggerSetTableRequest* pRequest = &(pReq->setTableReq);
TAOS_CHECK_EXIT(decodeSetTableMapInfo(&decoder, &pRequest->uidInfoTrigger));
TAOS_CHECK_EXIT(decodeSetTableMapInfo(&decoder, &pRequest->uidInfoCalc));
Expand Down Expand Up @@ -3482,6 +3524,44 @@ int32_t tDeserializeSTriggerPullRequest(void* buf, int32_t bufLen, SSTriggerPull
case STRIGGER_PULL_TSDB_DATA_NEXT: {
break;
}
case STRIGGER_PULL_TSDB_DATA_DIFF_RANGE:
case STRIGGER_PULL_TSDB_DATA_DIFF_RANGE_CALC: {
SSTriggerTsdbDataDiffRangeRequest* pRequest = &(pReq->tsdbDataDiffRangeReq);
TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->ver));
TAOS_CHECK_EXIT(tDecodeI8(&decoder, &pRequest->order));
int32_t nRanges = 0;
TAOS_CHECK_EXIT(tDecodeI32(&decoder, &nRanges));
if (nRanges > 0) {
pRequest->ranges = taosArrayInit_s(sizeof(SSTriggerTableTimeRange), nRanges);
Comment on lines +3534 to +3537
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

The nRanges value is read directly from the network buffer without validation. A very large value could lead to an excessive memory allocation in taosArrayInit_s, potentially causing an Out-Of-Memory (OOM) condition. Please add a check to enforce a reasonable upper limit on the number of ranges.

TSDB_CHECK_NULL(pRequest->ranges, code, lino, _exit, terrno);
for (int32_t i = 0; i < nRanges; i++) {
SSTriggerTableTimeRange* r = (SSTriggerTableTimeRange*)TARRAY_GET_ELEM(pRequest->ranges, i);
TAOS_CHECK_EXIT(tDecodeI64(&decoder, &r->suid));
TAOS_CHECK_EXIT(tDecodeI64(&decoder, &r->uid));
TAOS_CHECK_EXIT(tDecodeI64(&decoder, &r->skey));
TAOS_CHECK_EXIT(tDecodeI64(&decoder, &r->ekey));
}
} else {
pRequest->ranges = NULL;
}
break;
}
case STRIGGER_PULL_TSDB_DATA_DIFF_RANGE_NEXT:
case STRIGGER_PULL_TSDB_DATA_DIFF_RANGE_CALC_NEXT:
case STRIGGER_PULL_TSDB_DATA_SAME_RANGE_NEXT:
case STRIGGER_PULL_TSDB_DATA_SAME_RANGE_CALC_NEXT: {
break;
}
case STRIGGER_PULL_TSDB_DATA_SAME_RANGE:
case STRIGGER_PULL_TSDB_DATA_SAME_RANGE_CALC: {
SSTriggerTsdbDataSameRangeRequest* pRequest = &(pReq->tsdbDataSameRangeReq);
TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->ver));
TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->gid));
TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->skey));
TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->ekey));
TAOS_CHECK_EXIT(tDecodeI8(&decoder, &pRequest->order));
break;
}
case STRIGGER_PULL_WAL_META_NEW: {
SSTriggerWalMetaNewRequest* pRequest = &(pReq->walMetaNewReq);
TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->lastVer));
Expand Down
Loading
Loading