perf(physical-optimizer): skip ensure_distribution rebuild when children are unchanged#22521
perf(physical-optimizer): skip ensure_distribution rebuild when children are unchanged#22521zhuqi-lucas wants to merge 5 commits into
Conversation
…ren are unchanged ensure_distribution was unconditionally calling plan.with_new_children after collecting the (possibly redistributed) children, even when none of the children were actually replaced. For nodes like ProjectionExec, that path runs through try_new and recomputes the schema, equivalence properties, and output ordering each time, which is pure overhead when the input Arcs are identical. Compare each new child Arc with the original via Arc::ptr_eq and reuse the existing plan Arc when nothing changed. The UnionExec to InterleaveExec special case still runs first because it intentionally produces a new node. On a representative deep ProjectionExec stack (30 layers over a sorted parquet scan, no join / aggregate / unmet ordering, 5000 iterations) this brings ensure_distribution from 170.55 us/call to 59.36 us/call, a ~2.87x speedup. Profiling on a real workload dominated by point queries showed ProjectionExec::with_new_children taking 1.94s out of a 2.87s ensure_distribution slice in a 60s sample, so this is the bulk of the rule's cost on that shape. Closes apache#22520
There was a problem hiding this comment.
Pull request overview
This PR optimizes the physical optimizer’s distribution enforcement by avoiding unnecessary plan-node rebuilds when the computed child plans are identical to the existing children, reducing planning CPU overhead (notably for deep ProjectionExec stacks) without changing correctness.
Changes:
- Detect when all post-processing child plan
Arcs are pointer-identical to the original children viaArc::ptr_eq. - Reuse the existing
planArcand skipwith_new_childrenwhen children are unchanged. - Preserve the existing
UnionExec→InterleaveExecspecial-case behavior by applying it before the “children unchanged” fast-path.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
2010YOUY01
left a comment
There was a problem hiding this comment.
Thank you for working on this!
It seems we have already tried to solve a similar issue before: https://github.com/apache/datafusion/pull/19792/changes#diff-8ea78154af964a28c15386d17e3e6ce2f056651666a94dc7c5fb9f66dd849588R325 it avoids to recompute PlanProperties, but it seems there are still remaining wasteful computation exists.
I think we should better to keep the logic in the same place. Now this PR handles it at the caller side, and #19792 is avoiding re-computation at the callee side.
I prefer this PR's approach, I think keeping the check at the caller side is easier to maintain. However this might require some large changes, if that's the case we could do that as a follow-up PR/
| // `ensure_distribution` time for plans where no distribution change | ||
| // applies (point queries with no join / aggregate / unmet ordering), | ||
| // so the rebuild is wasted on the common case. | ||
| let original_children = plan.children(); |
There was a problem hiding this comment.
We could introduce a helper like
with_new_children_if_necessary(plan, children_plans), and later disallow direct with_new_children() usage via clippy. This way we could enforce the helper project-wide to avoid similar issues.
There was a problem hiding this comment.
Good suggestion @2010YOUY01 , addressed in latest PR.
Asserts that when no child of a node is replaced (no RepartitionExec / SortExec injection required), ensure_distribution reuses the input Arc<dyn ExecutionPlan> via Arc::ptr_eq instead of going through with_new_children. Guards the fast path against a future refactor re-introducing the unconditional rebuild.
Per reviewer suggestion in apache#22521, replace the inline Arc::ptr_eq + with_new_children branch with a call to the existing datafusion_physical_plan::with_new_children_if_necessary helper. Same behavior, smaller surface area, keeps the optimization in one canonical place so future call sites elsewhere in the optimizer pick it up automatically.
|
Thanks for the review @2010YOUY01! Good catch on #19792 — I had not noticed that one. I just pushed two updates:
Re: the clippy lint to disallow direct Latest commits: 28a92a5 (refactor) + b5beced (test). Also created the follow-up #22555 |
alamb
left a comment
There was a problem hiding this comment.
Looks great to me -- thanks @zhuqi-lucas and @2010YOUY01
|
run benchmark sql_planner |
|
🤖 Criterion benchmark running (GKE) | trigger CPU Details (lscpu)Comparing qizhu/enforce-distribution-skip-rebuild (46cec88) to d3983d3 (merge-base) diff File an issue against this benchmark runner |
|
🤖 Criterion benchmark completed (GKE) | trigger Instance: CPU Details (lscpu)Details
Resource Usagebase (merge-base)
branch
File an issue against this benchmark runner |
Which issue does this PR close?
Rationale for this change
ensure_distributionindatafusion/physical-optimizer/src/ensure_requirements/enforce_distribution.rsunconditionally callsplan.with_new_children(children_plans)after collecting the (possibly redistributed) children, even when none of those children were actually replaced. For nodes likeProjectionExec, that path runs throughtry_newand recomputes the schema, equivalence properties, output ordering, and output partitioning, then allocates a newArc<dyn ExecutionPlan>. When every child Arc is pointer-identical to the input, that work produces a logically identical node — pure overhead.The cost is amplified by two factors:
ensure_distributionto inject aRepartitionExecorSortExecfor) hit this wasted rebuild at every node in the plan. A 5–30 deepProjectionExecstack pays the cost N times.ProjectionExec::try_newareO(num_columns): per-columndata_type/nullablelookup to build the new schema, per-column remapping of equivalence classes through the projection mapping, and per-column lookup when rewritingPhysicalSortExprs into the output ordering. Wide schemas (tens of columns) make every wasted rebuild proportionally heavier.Profiling a production point-query workload (wide schemas, deep
ProjectionExecstacks) showedProjectionExec::with_new_childrenas the single largest cost insideensure_distribution:ensure_distributiontotal: 2.87s of a 60s CPU sampleProjectionExec::with_new_children: 1.94s (56% of the rule)SortExec::with_new_children: 0.11sWhat changes are included in this PR?
After collecting
children_plans, compare each new child Arc with the original viaArc::ptr_eq. When every child is unchanged, reuse the existingplanArc and skipwith_new_children. TheUnionExectoInterleaveExecspecial case still runs first because it intentionally produces a new node even when child Arcs are unchanged.This relies on the fact that
ensure_distributionalready produces pointer-identical Arcs for children that need no redistribution (it threads the original Arc through unchanged), soArc::ptr_eqprecisely distinguishes "rewritten" from "untouched" children at O(1) per child.Are these changes tested?
Yes. The existing
enforce_distributionsuite passes unchanged (66/66):The behavior is observable only as a CPU reduction; correctness is preserved because
ExecutionPlannodes are immutable, so reusing the original Arc produces the same plan tree aswith_new_children(unchanged_children)would have, just without the schema / ordering / equivalence / partitioning recomputation.Are there any user-facing changes?
No. Same plans, lower planning time.
Micro-benchmark
Plan shape: 30-deep
ProjectionExecstack over a sorted parquet scan, 5000 iterations.Wider schemas (more projection expressions per node) widen the gap further because each skipped
with_new_childrenavoids more O(num_columns) work.