Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 56 additions & 0 deletions src/Processors/Transforms/Streaming/JoinTransform.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,10 @@ JoinTransform::JoinTransform(

range_bidirectional_hash_join = join->rangeBidirectionalHashJoin();
bidirectional_hash_join = join->bidirectionalHashJoin();

/// Only the non-bidirectional (data-enrichment) path probes an unbuffered left side against the
/// right build-side hash table, so only it needs the historical-backfill ordering gate.
gate_left_on_right_backfill = !bidirectional_hash_join && !range_bidirectional_hash_join;
}

IProcessor::Status JoinTransform::prepare()
Expand Down Expand Up @@ -71,6 +75,16 @@ IProcessor::Status JoinTransform::prepare()

for (size_t i = 0; auto & input_port_with_data : input_ports_with_data)
{
/// Hold the left input (index 0) while its historical-backfill rows must wait for the right
/// (build) side to finish its historical backfill. We neither pull nor mark it ready, applying
/// backpressure so the rows stay upstream (bounded memory) and are processed in order once the
/// right side is ready. The right input keeps flowing so its backfill can complete.
if (i == 0 && leftHistoricalDataGated())
{
++i;
continue;
}

if (input_port_with_data.input_chunk)
{
/// In case, this input port request checkpoint, so we need wait for other inputs
Expand Down Expand Up @@ -134,6 +148,11 @@ void JoinTransform::work()
auto & input_chunk = input_ports_with_data[i].input_chunk;
if (input_chunk)
{
/// Track historical-backfill progress so the left side's historical rows can be held
/// until the right (build) side has finished backfilling (see leftHistoricalDataGated()).
if (gate_left_on_right_backfill)
trackHistoricalBackfill(i, input_chunk);

/// If any input needs to update data, currently the input is always two consecutive chunks with _tp_delta `-1 and +1`
/// So we have to process them together before processing another input
/// NOTE: Assume the first retracted chunk of updated data always set RetractedDataFlag.
Expand Down Expand Up @@ -233,6 +252,43 @@ inline bool JoinTransform::setupWatermark(Chunk & chunk, int64_t local_watermark
return false;
}

void JoinTransform::trackHistoricalBackfill(size_t input_index, const Chunk & chunk)
{
/// The source emits the HISTORICAL_DATA_START / END marks as separate, empty chunks (and they are
/// propagated downstream by the transforms on the way to the join), bracketing each side's
/// historical backfill data.
if (input_index == 0)
{
/// Left side: remember whether we are currently inside the left input's historical backfill.
if (chunk.isHistoricalDataStart())
left_in_historical_backfill = true;
else if (chunk.isHistoricalDataEnd())
left_in_historical_backfill = false;

return;
}

/// Right (build) side: detect when its historical backfill has completed.
if (right_historical_backfill_done)
return;

if (chunk.isHistoricalDataStart())
right_backfill_started = true;
else if (right_backfill_started)
{
if (chunk.isHistoricalDataEnd())
right_historical_backfill_done = true;
/// else: a right historical data chunk, keep building the hash table.
}
else
{
/// The first right chunk is not a START marker, so the right side performs no historical
/// backfill (pure live source, or no historical data). There is nothing to wait for, so do
/// not gate the left side (avoids stalling/deadlocking when no END marker will ever arrive).
right_historical_backfill_done = true;
}
}

inline void JoinTransform::doJoin(Chunks chunks)
{
if (range_bidirectional_hash_join)
Expand Down
24 changes: 24 additions & 0 deletions src/Processors/Transforms/Streaming/JoinTransform.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,30 @@ class JoinTransform final : public IProcessor
bool range_bidirectional_hash_join = false;
bool bidirectional_hash_join = false;

/// Historical-backfill ordering for the non-bidirectional (data-enrichment, e.g. INNER/LEFT
/// LATEST / ASOF) join path. In that path the left side probes the right (build-side) hash table
/// and is NOT buffered, so a left historical row that probes before the matching right historical
/// row has been inserted is dropped permanently. When both sides backfill from history (e.g.
/// `seek_to='earliest'`) whichever side the executor happens to schedule first decides the result,
/// which is non-deterministic. To make it deterministic we hold the left side's historical rows
/// until the right side's historical backfill has completed. Pure live joins emit no historical
/// markers, so `left_in_historical_backfill` is never set there and the gate stays disabled.
/// These are one-shot startup flags (the right side does not re-backfill after recovery), so they
/// are intentionally not part of the checkpoint state.
NO_SERDE bool gate_left_on_right_backfill = false;
NO_SERDE bool left_in_historical_backfill = false;
NO_SERDE bool right_backfill_started = false;
NO_SERDE bool right_historical_backfill_done = false;

/// True while a left historical-backfill row must wait for the right side's historical backfill.
bool leftHistoricalDataGated() const
{
return gate_left_on_right_backfill && left_in_historical_backfill && !right_historical_backfill_done;
}

/// Update the historical-backfill flags from a chunk observed on input `input_index` (0=left, 1=right).
void trackHistoricalBackfill(size_t input_index, const Chunk & chunk);

size_t transform_id;
[[maybe_unused]] std::shared_ptr<NotJoinedBlocks> non_joined_blocks;
[[maybe_unused]] size_t max_block_size;
Expand Down
26 changes: 26 additions & 0 deletions tests/stream/test_stream_smoke/0099_fixed_issues.json
Original file line number Diff line number Diff line change
Expand Up @@ -695,6 +695,32 @@
"expected_results":[[1, 10], [3, 20], [3, 30]]
}
]
},
{
"id": 24,
"tags": ["stream join stream", "latest join", "backfill"],
"name": "#1188",
"description": "streaming inner latest join with seek_to='earliest' must deterministically emit a left backfill row whose matching right key is already in history (the matching right row is inserted last, behind many filler rows, so the left side would race ahead and drop it without the fix)",
"steps":[
{
"statements": [
{"client":"python", "query_type": "table", "wait":1, "query":"drop stream if exists jr_left_1188"},
{"client":"python", "query_type": "table", "wait":1, "query":"drop stream if exists jr_right_1188"},
{"client":"python", "query_type": "table", "exist":"jr_left_1188", "exist_wait":2, "wait":1, "query":"create stream jr_left_1188(id int64, v int)"},
{"client":"python", "query_type": "table", "exist":"jr_right_1188", "exist_wait":2, "wait":1, "query":"create stream jr_right_1188(id int64, label string)"},
{"client":"python", "query_type": "table", "depends_on_stream":"jr_right_1188", "wait":1, "query":"insert into jr_right_1188(id, label) select number+2, 'filler' from numbers(2000)"},
{"client":"python", "query_type": "table", "wait":1, "query":"insert into jr_right_1188(id, label) values (1, 'active')"},
{"client":"python", "query_type": "table", "depends_on_stream":"jr_left_1188", "wait":2, "query":"insert into jr_left_1188(id, v) values (1, 15)"},
{"client":"python", "query_id":"11880", "depends_on_stream":"jr_right_1188", "query_end_timer":5, "query_type": "stream", "query":"select l.id, l.v, r.label from jr_left_1188 as l inner latest join (select id, label from jr_right_1188) as r on l.id = r.id settings seek_to='earliest', max_threads=1, backfill_max_threads=1"}
]
}
],
"expected_results": [
{
"query_id":"11880",
"expected_results":[[1, 15, "active"]]
}
]
}
]
}