diff --git a/src/Processors/Transforms/Streaming/JoinTransform.cpp b/src/Processors/Transforms/Streaming/JoinTransform.cpp index 597acd47e3d..ae2019124f2 100644 --- a/src/Processors/Transforms/Streaming/JoinTransform.cpp +++ b/src/Processors/Transforms/Streaming/JoinTransform.cpp @@ -40,6 +40,10 @@ JoinTransform::JoinTransform( range_bidirectional_hash_join = join->rangeBidirectionalHashJoin(); bidirectional_hash_join = join->bidirectionalHashJoin(); + + /// Only the non-bidirectional (data-enrichment) path probes an unbuffered left side against the + /// right build-side hash table, so only it needs the historical-backfill ordering gate. + gate_left_on_right_backfill = !bidirectional_hash_join && !range_bidirectional_hash_join; } IProcessor::Status JoinTransform::prepare() @@ -71,6 +75,16 @@ IProcessor::Status JoinTransform::prepare() for (size_t i = 0; auto & input_port_with_data : input_ports_with_data) { + /// Hold the left input (index 0) while its historical-backfill rows must wait for the right + /// (build) side to finish its historical backfill. We neither pull nor mark it ready, applying + /// backpressure so the rows stay upstream (bounded memory) and are processed in order once the + /// right side is ready. The right input keeps flowing so its backfill can complete. + if (i == 0 && leftHistoricalDataGated()) + { + ++i; + continue; + } + if (input_port_with_data.input_chunk) { /// In case, this input port request checkpoint, so we need wait for other inputs @@ -134,6 +148,11 @@ void JoinTransform::work() auto & input_chunk = input_ports_with_data[i].input_chunk; if (input_chunk) { + /// Track historical-backfill progress so the left side's historical rows can be held + /// until the right (build) side has finished backfilling (see leftHistoricalDataGated()). + if (gate_left_on_right_backfill) + trackHistoricalBackfill(i, input_chunk); + /// If any input needs to update data, currently the input is always two consecutive chunks with _tp_delta `-1 and +1` /// So we have to process them together before processing another input /// NOTE: Assume the first retracted chunk of updated data always set RetractedDataFlag. @@ -233,6 +252,43 @@ inline bool JoinTransform::setupWatermark(Chunk & chunk, int64_t local_watermark return false; } +void JoinTransform::trackHistoricalBackfill(size_t input_index, const Chunk & chunk) +{ + /// The source emits the HISTORICAL_DATA_START / END marks as separate, empty chunks (and they are + /// propagated downstream by the transforms on the way to the join), bracketing each side's + /// historical backfill data. + if (input_index == 0) + { + /// Left side: remember whether we are currently inside the left input's historical backfill. + if (chunk.isHistoricalDataStart()) + left_in_historical_backfill = true; + else if (chunk.isHistoricalDataEnd()) + left_in_historical_backfill = false; + + return; + } + + /// Right (build) side: detect when its historical backfill has completed. + if (right_historical_backfill_done) + return; + + if (chunk.isHistoricalDataStart()) + right_backfill_started = true; + else if (right_backfill_started) + { + if (chunk.isHistoricalDataEnd()) + right_historical_backfill_done = true; + /// else: a right historical data chunk, keep building the hash table. + } + else + { + /// The first right chunk is not a START marker, so the right side performs no historical + /// backfill (pure live source, or no historical data). There is nothing to wait for, so do + /// not gate the left side (avoids stalling/deadlocking when no END marker will ever arrive). + right_historical_backfill_done = true; + } +} + inline void JoinTransform::doJoin(Chunks chunks) { if (range_bidirectional_hash_join) diff --git a/src/Processors/Transforms/Streaming/JoinTransform.h b/src/Processors/Transforms/Streaming/JoinTransform.h index 7a1246dcf66..11653b3761d 100644 --- a/src/Processors/Transforms/Streaming/JoinTransform.h +++ b/src/Processors/Transforms/Streaming/JoinTransform.h @@ -68,6 +68,30 @@ class JoinTransform final : public IProcessor bool range_bidirectional_hash_join = false; bool bidirectional_hash_join = false; + /// Historical-backfill ordering for the non-bidirectional (data-enrichment, e.g. INNER/LEFT + /// LATEST / ASOF) join path. In that path the left side probes the right (build-side) hash table + /// and is NOT buffered, so a left historical row that probes before the matching right historical + /// row has been inserted is dropped permanently. When both sides backfill from history (e.g. + /// `seek_to='earliest'`) whichever side the executor happens to schedule first decides the result, + /// which is non-deterministic. To make it deterministic we hold the left side's historical rows + /// until the right side's historical backfill has completed. Pure live joins emit no historical + /// markers, so `left_in_historical_backfill` is never set there and the gate stays disabled. + /// These are one-shot startup flags (the right side does not re-backfill after recovery), so they + /// are intentionally not part of the checkpoint state. + NO_SERDE bool gate_left_on_right_backfill = false; + NO_SERDE bool left_in_historical_backfill = false; + NO_SERDE bool right_backfill_started = false; + NO_SERDE bool right_historical_backfill_done = false; + + /// True while a left historical-backfill row must wait for the right side's historical backfill. + bool leftHistoricalDataGated() const + { + return gate_left_on_right_backfill && left_in_historical_backfill && !right_historical_backfill_done; + } + + /// Update the historical-backfill flags from a chunk observed on input `input_index` (0=left, 1=right). + void trackHistoricalBackfill(size_t input_index, const Chunk & chunk); + size_t transform_id; [[maybe_unused]] std::shared_ptr non_joined_blocks; [[maybe_unused]] size_t max_block_size; diff --git a/tests/stream/test_stream_smoke/0099_fixed_issues.json b/tests/stream/test_stream_smoke/0099_fixed_issues.json index e4fb0ceebad..ca6987d5507 100644 --- a/tests/stream/test_stream_smoke/0099_fixed_issues.json +++ b/tests/stream/test_stream_smoke/0099_fixed_issues.json @@ -695,6 +695,32 @@ "expected_results":[[1, 10], [3, 20], [3, 30]] } ] + }, + { + "id": 24, + "tags": ["stream join stream", "latest join", "backfill"], + "name": "#1188", + "description": "streaming inner latest join with seek_to='earliest' must deterministically emit a left backfill row whose matching right key is already in history (the matching right row is inserted last, behind many filler rows, so the left side would race ahead and drop it without the fix)", + "steps":[ + { + "statements": [ + {"client":"python", "query_type": "table", "wait":1, "query":"drop stream if exists jr_left_1188"}, + {"client":"python", "query_type": "table", "wait":1, "query":"drop stream if exists jr_right_1188"}, + {"client":"python", "query_type": "table", "exist":"jr_left_1188", "exist_wait":2, "wait":1, "query":"create stream jr_left_1188(id int64, v int)"}, + {"client":"python", "query_type": "table", "exist":"jr_right_1188", "exist_wait":2, "wait":1, "query":"create stream jr_right_1188(id int64, label string)"}, + {"client":"python", "query_type": "table", "depends_on_stream":"jr_right_1188", "wait":1, "query":"insert into jr_right_1188(id, label) select number+2, 'filler' from numbers(2000)"}, + {"client":"python", "query_type": "table", "wait":1, "query":"insert into jr_right_1188(id, label) values (1, 'active')"}, + {"client":"python", "query_type": "table", "depends_on_stream":"jr_left_1188", "wait":2, "query":"insert into jr_left_1188(id, v) values (1, 15)"}, + {"client":"python", "query_id":"11880", "depends_on_stream":"jr_right_1188", "query_end_timer":5, "query_type": "stream", "query":"select l.id, l.v, r.label from jr_left_1188 as l inner latest join (select id, label from jr_right_1188) as r on l.id = r.id settings seek_to='earliest', max_threads=1, backfill_max_threads=1"} + ] + } + ], + "expected_results": [ + { + "query_id":"11880", + "expected_results":[[1, 15, "active"]] + } + ] } ] }