feat: Support Expanding OR Conditions in INNER JOIN into Multiple Mutually Exclusive Branches#22370
feat: Support Expanding OR Conditions in INNER JOIN into Multiple Mutually Exclusive Branches#22370xiedeyantu wants to merge 1 commit into
Conversation
…ually Exclusive Branches
| #[derive(Default, Debug)] | ||
| pub struct ExpandJoinOrPredicate; | ||
|
|
||
| impl ExpandJoinOrPredicate { |
There was a problem hiding this comment.
Can we avoid a new rule for this?
There was a problem hiding this comment.
I see this as a relatively standalone piece of logic, and I haven’t yet found a clean way to hook into it.
Another concern is that I’m not sure whether DataFusion can reliably obtain the row counts from both sides of a join—during testing, I noticed that when both tables are very small (say, ~10 rows), NestedLoopJoin can actually be faster than HashJoin + Union. However, once both tables grow larger (e.g., 1000+ rows), the rewrite shows very significant gains.
Could you walk me through your concerns in more detail? That would help me figure out how to improve it.
There was a problem hiding this comment.
The concern mostly is makung the optimizer list longer and writing more passes than necessary. If it would fit in an existing planning / optimization pass it would be good.
There was a problem hiding this comment.
The only solution I can think of right now is to add this capability to the existing extract_equijoin_predicate rules. However, strictly speaking, they do completely different things, and the code logic isn't reusable. But if you agree, I can modify it this way. Or please feel free to let me know if you have any better suggestions.
There was a problem hiding this comment.
I am wonder if instead using union + duplicating scan / subquery we can try one of the following approaches:
-
Disjoint hash join operator (supporting multiple branches) this would also work for non-mutually exclusive ones => This would probably be a big/complex feature.
-
Add a new operator that "expands" the input data for multiple output operators (two hash joins) and unions them together later (e.g.
ExpandExec -> [HashJoinExec(a==b), HashJoinExec(c==d)] -> UnionExec). This avoids the duplicated scan / subquery which might be (much) slower in some cases. I think this will work pretty well in general and we probably could enable it by default?
There was a problem hiding this comment.
I think approach 1 means adding a new specialized hash join operator for ORed equality conditions:
SELECT *
FROM t1
JOIN t2
ON (t1.v1 = t2.v1) OR (t1.v2 = t2.v2)We could add a specialized DisjointHashJoinExec operator. The core logic would look like:
- Buffer the build side and build hash tables for
t1.v1andt1.v2. - Probe the other side. For each probe row, probe both hash tables, union and deduplicate the matched build-side row indices, and then materialize the joined rows.
The benefits are:
- No repeated scans, which can be expensive for Parquet.
- Can also support outer/semi/mark joins
- Simpler planning.
I think this approach would be relatively straightforward to implement after:
I'm looking for help reviewing this feature 🎣 , would also be happy to help with follow-up work, such as extending this idea into a disjoint equi-join implementation.
There was a problem hiding this comment.
Yeah that is exactly what I mean, thanks @2010YOUY01 for explaining fully.
There was a problem hiding this comment.
@2010YOUY01 Thank you so much for your detailed explanation of this logic! I think it's a very good idea, and for inner joins, this implementation is optimal, completing the entire logic directly at the physical execution layer. Regarding my current proposal, I support this implementation. However, since I'm not entirely clear on the execution-level logic, implementing it using DisjointHashJoinExec might take some time.
Actually, there's 2 PRs(apache/calcite#4300, apache/calcite#4315) I submitted to Calcite, This rule file (https://github.com/apache/calcite/blob/main/core/src/main/java/org/apache/calcite/rel/rules/JoinExpandOrToUnionRule.java) will be more intuitive. where I implemented inner/left/right/full/anti joins (I didn't implement semi-join because its semantics are not easily split into multiple mutually exclusive joins). Their methods for splitting multiple join branches differ (refer to the comments in the connection code above). This isn't easily implemented using DisjointHashJoinExec; for example, left joins would be split into inner and anti joins. I limited the PR to inner joins because I wanted to implement the first step first, as the performance improvement was significant when testing joins of two tables (1000+ rows). I can't construct SQL to test other scenarios yet, as they all involve anti joins. If everyone accepts this solution, I will expand it to support more join types later. This is why I implemented it as a separate rule. There's another reason, which I also mentioned above: when the table only has 10 rows of data, the overhead becomes apparent, slowing down this optimization. Therefore, parameters or statistical information can help decide whether to rewrite it.
Regarding your first question, I think we can achieve scan reuse at either the logical or physical layer; we don't need to worry too much about rewriting the logic.
Regarding the second question, I think it might be difficult for us to do, perhaps because my understanding of DataFusion's execution layer is still limited.
Regarding the third question, similar to the first, the complexity of the plan is not necessarily directly related to the actual execution logic or performance.
This is just my personal opinion; please correct me if I'm wrong. Thank you very much for participating in the discussion.
If we're only considering inner joins (without expanding to other join types), I personally like the second solution @Dandandan mentioned. @2010YOUY01 If you've already implemented it, then I'll close this PR. If you're interested in further expansions of this PR, I can implement these capabilities through multiple PRs. Looking forward to your reply!
There was a problem hiding this comment.
@2010YOUY01 Thank you so much for your detailed explanation of this logic! I think it's a very good idea, and for inner joins, this implementation is optimal, completing the entire logic directly at the physical execution layer. Regarding my current proposal, I support this implementation. However, since I'm not entirely clear on the execution-level logic, implementing it using DisjointHashJoinExec might take some time.
This might not be obvious to everyone yet, but with #21983, extending this into a new specialized join operator should be easy, and it would automatically support outer/semi/... join types.
I can show this with a PoC later. Once we have that version, we can decide which approach to take.
There was a problem hiding this comment.
@2010YOUY01 Thank you for your great idea. I look forward to your solution and will also study your existing solutions.
Which issue does this PR close?
Rationale for this change
This PR adds a logical optimizer rule for
INNER JOINpredicates of the formcond1 OR cond2 ..., inspired by Calcite PR 4300.When each OR branch can be decomposed into hashjoin-capable equality predicates, DataFusion can rewrite the join into a
UNION ALLof mutually exclusive inner joins. This allows each branch to be planned as a normal equijoin instead of leaving the full disjunction in the join filter.To preserve SQL correctness for nullable expressions, branch exclusivity is expressed with
IS NOT TRUErather than plainNOTor<>.What changes are included in this PR?
expand_join_or_predicate.INNER JOIN.UNION ALLbranches.join.onkeys when adding keys extracted from OR branches.Union.EXPLAIN VERBOSEsqllogictest expectations for the new optimizer pass.eliminate_cross_joinexposed by the new rewrite.Are these changes tested?
Yes.
Added sqllogictest coverage to verify that:
NULLValidated with:
cargo test -p datafusion-optimizer expand_join_or_predicate --libcargo test -p datafusion-sqllogictest --test sqllogictests -- join_only.slt explain.sltAre there any user-facing changes?
No public API changes are included in this PR.
This change only affects logical optimization for eligible
INNER JOIN ... ON ... OR ...queries.