[multistage] add lookup join support to physical optimizer#18158
dang-stripe wants to merge 2 commits into apache:master
Conversation
```java
    return rootNode;
  }
  return transform(rootNode, null);
}
```
I've skipped lite mode since it adds quite a bit of scope: the broker gets assigned as the worker for the join fragment, so we'd need to reassign it to a server and handle routing.
Thanks @dang-stripe. Will take a look at this over the weekend. I am curious: have you seen any workloads where the physical optimizer performs better than the other optimizer?
Codecov Report

```
@@             Coverage Diff              @@
##             master   #18158      +/-   ##
============================================
- Coverage     63.31%   63.19%   -0.12%
- Complexity     1540     1616      +76
============================================
  Files          3200     3214      +14
  Lines        194221   195818    +1597
  Branches      29929    30259     +330
============================================
+ Hits         122972   123752     +780
- Misses        61583    62176     +593
- Partials       9666     9890     +224
```
@ankitsultana thanks for looking! Yep, we're seeing around a 50% reduction in p50/p75 broker latency for one of our workloads. This workload involves rather complex queries doing up to 5-6 joins plus window operations. We both enabled the physical optimizer for the queries and colocated segments for each partition to a single server, so we're seeing a large reduction in shuffle exchanges.
Addresses: #17961
Problem
Using lookup joins in the physical optimizer via the query hint `joinOptions(join_strategy='lookup')` would fail with `"Right input must be leaf operator"`. The optimizer inserted a `BROADCAST_EXCHANGE` on the dimension table side, splitting it into a separate fragment, but `LookupJoinOperator` needs the dimension table as a `LeafOperator` in the same fragment because it expects to have access to a `DimensionTableDataManager`.

Solution
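For context, this is the query shape that exercises the hint (table and column names below are illustrative, not from the PR):

```sql
SELECT /*+ joinOptions(join_strategy='lookup') */
    f.orderId, d.currencyName
FROM factOrders f
JOIN dimCurrencies d
    ON f.currencyCode = d.currencyCode
```

The right side must be a dimension table served by a `DimensionTableDataManager` on each server.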
This adds lookup join support to the V2 physical optimizer by:
- Adding `LOOKUP_LOCAL_EXCHANGE`, which acts as a pseudo-exchange so the fragment is classified as a leaf stage and the dim table is visible in EXPLAIN plans
- Adding `LookupJoinRule` to isolate every lookup join in its own plan fragment with exchanges on all sides (`IDENTITY_EXCHANGE` above the join and for the non-dim table, `LOOKUP_LOCAL_EXCHANGE` for the dim table)

```mermaid
graph TD
  subgraph before ["Before: Dim table split into separate fragment"]
    A0[Downstream operators] --> A1["PhysicalJoin<br/>join_strategy=lookup"]
    A1 --> B1[IDENTITY_EXCHANGE]
    A1 --> C1["IDENTITY_EXCHANGE<br/>❌ splits dim into separate fragment"]
    B1 --> D1[FactTableScan]
    C1 --> E1[DimTableScan]
    style C1 fill:#f88,stroke:#c00
    style E1 fill:#f88,stroke:#c00
  end
  subgraph after ["After: LookupJoinRule isolates the lookup join"]
    A2[Downstream operators] --> IX_ABOVE["IDENTITY_EXCHANGE<br/>(above — isolation)"]
    IX_ABOVE --> J2["PhysicalJoin<br/>join_strategy=lookup"]
    J2 --> IX_LEFT["IDENTITY_EXCHANGE<br/>(left — leaf boundary)"]
    J2 --> LLE["LOOKUP_LOCAL_EXCHANGE<br/>(right — pseudo, no split)"]
    IX_LEFT --> FS2[FactTableScan]
    LLE --> DS2[DimTableScan]
    style LLE fill:#8f8,stroke:#0a0
    style IX_ABOVE fill:#8cf,stroke:#06c
    style IX_LEFT fill:#8cf,stroke:#06c
  end
  before ~~~ after
```

Another alternative was to match MSE v1 behavior by skipping the exchange on the right side, but I didn't go down this path since omitting exchanges could break assumptions in other rules (every stage boundary has a `PhysicalExchange`) and we wouldn't get visibility in explain plans.
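To make the rule's effect concrete, here is a toy Python model of a `LookupJoinRule`-style rewrite. This is not Pinot's actual Java implementation; the node kinds and the transform are simplified to show the shape of the result: a real exchange above the join and on the fact side, and a pseudo-exchange on the dim side that marks a leaf-stage boundary without splitting the fragment.

```python
from dataclasses import dataclass, field

@dataclass
class Node:
    kind: str                                # e.g. "JOIN", "SCAN", "IDENTITY_EXCHANGE"
    children: list = field(default_factory=list)
    hints: dict = field(default_factory=dict)

def isolate_lookup_join(node: Node) -> Node:
    """Bottom-up rewrite: wrap every lookup join in exchanges on all sides."""
    node.children = [isolate_lookup_join(c) for c in node.children]
    if node.kind == "JOIN" and node.hints.get("join_strategy") == "lookup":
        fact, dim = node.children
        node.children = [
            Node("IDENTITY_EXCHANGE", [fact]),     # real stage boundary (left)
            Node("LOOKUP_LOCAL_EXCHANGE", [dim]),  # pseudo-exchange: dim scan stays
                                                   # in the same (leaf) fragment
        ]
        return Node("IDENTITY_EXCHANGE", [node])   # isolate the join from its parent
    return node

plan = Node("JOIN", [Node("SCAN"), Node("SCAN")],
            hints={"join_strategy": "lookup"})
rewritten = isolate_lookup_join(plan)
```

In this model, a fragment splitter would treat `IDENTITY_EXCHANGE` as a cut point but not `LOOKUP_LOCAL_EXCHANGE`, which is why the dim-side scan remains co-located with the join.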
Future work
Testing
We've deployed this on our production clusters and have compared production query results (multiple chained joins using dim tables for date spines and fx rates) with and without the physical optimizer. We haven't seen issues with correctness.
cc @ankitsultana @shauryachats