Skip to content

fix(streaming): order historical backfill in unoptimized LATEST join (#1188)#1189

Open
sofiane-soufi wants to merge 1 commit into
timeplus-io:developfrom
sofiane-soufi:bug/1188-streaming-inner-latest-join-backfill-drop
Open

fix(streaming): order historical backfill in unoptimized LATEST join (#1188)#1189
sofiane-soufi wants to merge 1 commit into
timeplus-io:developfrom
sofiane-soufi:bug/1188-streaming-inner-latest-join-backfill-drop

Conversation

@sofiane-soufi

Copy link
Copy Markdown
Contributor

Summary

Fixes #1188.

A streaming INNER/LEFT LATEST JOIN (more generally, any non-bidirectional data-enrichment join: LATEST, ASOF, and stream JOIN table) that backfills both sides from history (e.g. SETTINGS seek_to='earliest') could silently and non-deterministically drop left rows whose matching right key was already present in history. No error was raised — the query just emitted zero rows instead of the matching row. The same data joined in historical table() mode always matched, so this was a streaming-only correctness gap. It reproduced with both integer and floating-point join keys.

This PR makes the historical backfill deterministic: a left historical row whose matching right key exists in history before the query starts is now emitted on every run.

Root cause

In the non-bidirectional (data-enrichment) join path, the left side probes the right (build-side) hash table and is not buffered — a left row that finds no match is discarded immediately and forever (see the existing comment in JoinTransform::doJoin: "right stream data only changes won't trigger join since left stream data is not buffered").

JoinTransform::prepare() pulls the left and right input ports concurrently with no ordering between them. With seek_to='earliest' both sides backfill from history independently. If the executor schedules the left side's historical rows to be probed before the right side's matching historical rows have been inserted into the hash table, the INNER join emits nothing for them and they are lost permanently. Whichever side the executor happens to run first decides the result — hence the run-to-run non-determinism. It is independent of key type.

Fix

Add a one-shot historical-backfill ordering gate to JoinTransform, scoped to the non-bidirectional data-enrichment path only:

  • Hold the left side's historical-backfill rows until the right (build) side's historical backfill has completed.
  • The gate is armed by the left side's own HISTORICAL_DATA_START marker, so it is active before any left data chunk is pulled, and released by the right side's HISTORICAL_DATA_END marker. While gated, the left input is not pulled (backpressure), so held rows stay upstream (bounded memory) and are processed in order once the right hash table is fully populated.
  • Pure live joins are unaffected: they emit no historical markers, so the gate never engages. If the right side performs no historical backfill, its first (non-START) chunk releases the gate immediately, so the left side can never stall or deadlock.
  • The flags are one-shot startup state and are intentionally not part of the checkpoint, because the right side does not re-run historical backfill after recovery.

The markers are the same HISTORICAL_DATA_START/END empty-chunk signals already emitted by the storage layer and consumed elsewhere (e.g. AggregatingTransform, VersionsFilterTransform); they propagate through the per-side transforms to the join's input ports.

Files changed

  • src/Processors/Transforms/Streaming/JoinTransform.h — gate state + helper.
  • src/Processors/Transforms/Streaming/JoinTransform.cpp — arm/release logic in prepare()/work() (non-bidirectional path only).
  • tests/stream/test_stream_smoke/0099_fixed_issues.json — regression test.

Behavior & compatibility

  • Deterministic for the reported case: the matching row is emitted on every run.
  • No change to: streaming-vs-streaming live joins, the optimized/parallel (ConcurrentHashJoin) path, bidirectional joins (ALL, range, etc.), other strictness/kinds, and historical table() mode.
  • LEFT LATEST JOIN backfill remains correct: matched rows carry the right value and unmatched left rows still emit with NULL right columns.

Testing

Built before/after and compared on real binaries using the issue's reproduction (max_threads=1, backfill_max_threads=1):

Scenario Before (develop) After
Minimal repro, int64 key 9/16 runs dropped the row 16/16 emitted
Minimal repro, float64 key dropped on a large fraction of runs 12/12 emitted
Regression-test scenario (matching right key inserted last, behind 2000 filler rows) 10/10 dropped 10/10 emitted

No regressions observed in: LEFT LATEST JOIN backfill (matched + null rows), live INNER LATEST JOIN (identical output before/after), bidirectional date_diff_within backfill join, and historical table() join.

The added regression test (fixed_issues case #1188) inserts the matching right key last, behind many filler rows, so the left side reliably races ahead: it deterministically drops the row without the fix and emits it with the fix.

PR checklist

  • Did you run ClangFormat? — yes (clean on changed files).
  • Did you separate headers to a different section in existing community code base? — N/A; changed files are Proton-original (namespace DB::Streaming).
  • Did you surround proton: starts/ends for new code in existing community code base? — N/A; changed files are Proton-original, which are not fenced.

…imeplus-io#1188)

A streaming data-enrichment join (INNER/LEFT LATEST, ASOF, stream-join-table)
probes the left side against the right build-side hash table without buffering
the left. When both sides backfill from history (e.g. seek_to='earliest'),
JoinTransform pulled both inputs with no ordering, so a left historical row
whose matching right key already exists in history could probe before that
right row was inserted and be dropped permanently. Whichever side the executor
scheduled first decided the result, making the loss silent and non-deterministic
(it reproduced for both integer and floating-point keys). The same data joined
in historical table() mode always matched.

Fix: in the non-bidirectional path, hold the left side's historical-backfill
rows until the right side's historical backfill has completed. The gate engages
on the left's own HISTORICAL_DATA_START marker (so it is armed before any left
data is pulled) and releases on the right's HISTORICAL_DATA_END marker, applying
backpressure so the held rows stay upstream (bounded memory) and are processed
in order once the right hash table is fully populated. Pure live joins emit no
historical markers, so the gate never engages there; if the right side performs
no historical backfill its first (non-START) chunk releases the gate, so it can
never stall. The flags are one-shot startup state and are not checkpointed (the
right side does not re-backfill after recovery).

Add a regression test (test_stream_smoke fixed_issues timeplus-io#1188) that inserts the
matching right key last behind many filler rows so the left races ahead: it
deterministically drops the row without the fix and emits it with the fix.
@yl-lisen

yl-lisen commented Jun 9, 2026

Copy link
Copy Markdown
Collaborator

thanks for your report, we already have a related fix PR #1190

@chenziliang

Copy link
Copy Markdown
Collaborator

Hi @sofiane-soufi we have already this in enterprise, i am curious why we still have this PR here: is there any gap there ? it is superseded by #1190

@chenziliang

Copy link
Copy Markdown
Collaborator

@sofiane-soufi could you please rebase ?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Streaming INNER LATEST JOIN with seek_to='earliest' non-deterministically drops backfill rows that should match

3 participants