fix: make BloomFilter intermediate buffer Spark-compatible #4390
andygrove wants to merge 24 commits into
Previously, when one aggregate stage (Partial or Final) couldn't be converted to Comet, the other was also blocked to avoid crashes from incompatible intermediate buffer formats (issues apache#1389, apache#1267). This change introduces per-aggregate `supportsMixedPartialFinal` declarations so that aggregates with simple, compatible buffers (MIN, MAX, COUNT, bitwise) can safely run in mixed mode while unsafe aggregates (SUM, AVG, Variance, CollectSet) continue to be blocked.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
- Restore `convert` scaladoc in `CometAggregateExpressionSerde` that was displaced when `supportsMixedPartialFinal` was added
- Require `aggregateExpressions.nonEmpty` in `findPartialAggInPlan` so intermediate distinct-elimination stages (empty agg, group-by only) are not incorrectly tagged as the Partial to disable
- Document that `canFinalAggregateBeConverted` mirrors the predicate checks in `CometBaseAggregate.doConvert` and must be kept in sync
If the corresponding partial aggregate would also fail conversion to Comet (for example, collect_set on float is incompatible), tagging it early hijacks the more specific natural fallback reason. Only tag the partial when it would otherwise have been converted, so the tag guards genuine buffer-format mismatches rather than masking unrelated fallbacks. Generalize the convertibility predicate to accept an expected mode and mirror the mode-specific result-expression handling in doConvert.
… files

findPartialAggInPlan was using a deep tree traversal that matched partial aggregates separated from the final by other aggregate stages. For Spark's distinct-aggregate rewrite, the partial for non-distinct aggs feeds into a PartialMerge stage rather than directly into the final, so tagging it as unsafe is incorrect and hijacks the natural 'Unsupported aggregation mode PartialMerge' fallback reason. Walk only through exchanges and AQE stages.

Also regenerate TPC-DS plan-stability golden files for Spark 3.4, 3.5, and 4.0 to reflect the branch's new safe-mixed-execution behavior where the final aggregate converts to Comet when all aggregate functions have compatible intermediate buffer formats.
Arrow's row format, used by DataFusion's grouped hash aggregate for
composite group keys, does not support Map at any nesting level. The
existing guard in CometBaseAggregate.doConvert only matched top-level
MapType, so queries grouping by e.g. array<map<int,int>> crashed with
"Row format support not yet implemented for: [SortField { ... List(Map(...)) }]"
once the new mixed-partial-final path produced a Comet Final aggregate
over Spark-partial output.
Add a recursive QueryPlanSerde.containsMapType helper that walks into
ArrayType and StructType, and use it in both doConvert and
canAggregateBeConverted. Add a regression test exercising the failing
group-by.sql query shape from SQLQueryTestSuite.
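The recursive check described in this commit can be illustrated with a small sketch. The actual helper is Scala (`QueryPlanSerde.containsMapType`) and operates on Spark's `DataType`; the Rust enum and the function `contains_map_type` below are a hypothetical toy model of the same idea, not the project's code.

```rust
/// Toy stand-in for a nested SQL data type (hypothetical; not Spark's or Arrow's type).
#[derive(Debug, Clone)]
pub enum DataType {
    Int,
    Str,
    Array(Box<DataType>),
    Struct(Vec<(String, DataType)>),
    Map(Box<DataType>, Box<DataType>),
}

/// Returns true if a Map appears at any nesting level of `dt`.
/// A check that only matched the top-level type would miss Array(Map(..)).
pub fn contains_map_type(dt: &DataType) -> bool {
    match dt {
        DataType::Map(_, _) => true,
        // Descend into the element type of an array.
        DataType::Array(elem) => contains_map_type(elem),
        // Descend into every field of a struct.
        DataType::Struct(fields) => fields.iter().any(|(_, t)| contains_map_type(t)),
        DataType::Int | DataType::Str => false,
    }
}
```

With this shape, a group key of type `array<map<int,int>>` is rejected up front instead of reaching the row-format conversion and failing there.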
…ressions

Mixed COUNT partial/final regressed AQE's PropagateEmptyRelationAfterAQE (which matches BaseAggregateExec only, not CometHashAggregateExec) and the Spark 4.0 count-bug decorrelation for correlated IN subqueries (row dropped in in-count-bug.sql OR pattern).

- Remove supportsMixedPartialFinal override from CometCount.
- Update trait docstring to explain why COUNT is intentionally excluded.
- Narrow the two "safe mixed" tests in CometExecRuleSuite to use only MIN/MAX, which remain mixed-safe.
- Revert TPC-DS golden file regeneration from commit 753a9a5; those plan changes were driven by COUNT becoming mixed-safe.

MIN, MAX, and bitwise aggregates retain supportsMixedPartialFinal = true.
Comet now accelerates the Partial/Final MIN aggregate in the subquery, which reduces the WholeStageCodegen subtree count below the hardcoded 3 the test asserts. Tag the test with IgnoreComet in the 3.4.3, 3.5.8, and 4.0.1 diffs, matching the pattern used for other plan-shape tests in ExplainSuite.
…l-final-aggregates # Conflicts: # .gitignore # dev/diffs/3.4.3.diff # spark/src/main/scala/org/apache/spark/sql/comet/operators.scala
…l-final-aggregates
…l-final-aggregates
- add withInfo for multiMode and Spark-Final-without-Comet-Partial early returns so EXPLAIN shows the reason
- dedupe containsMapType: route operators.scala through QueryPlanSerde.containsMapType and drop the local private copy
- log unrecognized passthrough in findPartialAggInPlan at debug level
The previous commit removed the local containsMapType helper in operators.scala but left StructType in the imports list, breaking scalafix in the Lint Java jobs.

It also added withInfo annotations for the multiMode and Spark-Final-without-Comet-Partial early returns in CometBaseAggregate, which surface in EXPLAIN output and therefore in the plan-stability golden files. Regenerate the affected TPC-DS golden files (q10, q24a, q24b, q35, q45, q70, q77 plus the v2.7 variants) so CometPlanStabilitySuite passes.
The test asserts a specific WholeStageCodegen subtree count, but Comet changes the plan shape by replacing operators with native versions, so the expected count no longer matches.
Opt CometBloomFilterAggregate into supportsMixedPartialFinal now that the
native intermediate buffer format matches Spark's BloomFilterImpl.writeTo
byte-for-byte. Also mirror the COMET_ENABLE_{PARTIAL,FINAL}_HASH_AGGREGATE
test knobs from CometHashAggregateExec onto CometObjectHashAggregateExec so
ObjectHashAggregateExec plans can be exercised in the same way.
…ermediate-buffer-compat # Conflicts: # spark/src/test/scala/org/apache/comet/rules/CometExecRuleSuite.scala
mbutrovich
left a comment
Minor changes. Thanks for tackling this, @andygrove!
        self.spark_serialization()
    }

    pub fn merge_filter(&mut self, other: &[u8]) {
merge_filter is pub fn ... -> () and panics via assert_eq! on every header mismatch. Its only caller is Accumulator::merge_batch in bloom_filter_agg.rs:176, which already returns Result. Threading these through as DataFusionError::Internal would let a corrupt or truncated intermediate buffer surface as a query failure rather than crashing the executor process.
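A minimal sketch of the suggestion, assuming a V1-style header of three 32-bit big-endian integers (version, num_hash_functions, num_words). The names `MergeError` and `check_v1_header` are hypothetical, and a local error enum stands in for `DataFusionError::Internal` so the example stays self-contained.

```rust
/// Hypothetical error type standing in for DataFusionError::Internal.
#[derive(Debug, PartialEq)]
pub enum MergeError {
    Truncated { needed: usize, got: usize },
    Mismatch { field: &'static str, expected: i32, actual: i32 },
}

/// Reads a big-endian i32 at `offset`, reporting truncation instead of panicking.
fn read_i32_be(buf: &[u8], offset: usize) -> Result<i32, MergeError> {
    let end = offset + 4;
    let bytes = buf
        .get(offset..end)
        .ok_or(MergeError::Truncated { needed: end, got: buf.len() })?;
    Ok(i32::from_be_bytes([bytes[0], bytes[1], bytes[2], bytes[3]]))
}

/// Compares one header field and names it in the error on mismatch.
fn expect_field(field: &'static str, expected: i32, actual: i32) -> Result<(), MergeError> {
    if expected == actual {
        Ok(())
    } else {
        Err(MergeError::Mismatch { field, expected, actual })
    }
}

/// Validates the header of `buf` against the receiver's parameters and
/// returns the byte offset of the first bit word.
/// Assumes `num_words` is non-negative, since it comes from the receiver.
pub fn check_v1_header(buf: &[u8], num_hash: i32, num_words: i32) -> Result<usize, MergeError> {
    expect_field("version", 1, read_i32_be(buf, 0)?)?;
    expect_field("num_hash_functions", num_hash, read_i32_be(buf, 4)?)?;
    expect_field("num_words", num_words, read_i32_be(buf, 8)?)?;
    let needed = 12 + (num_words as usize) * 8;
    if buf.len() < needed {
        return Err(MergeError::Truncated { needed, got: buf.len() });
    }
    Ok(12)
}
```

A caller that already returns `Result`, such as an accumulator's merge step, can then propagate the error with `?` so a bad buffer fails the query rather than aborting the process.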
        );
        self.bits.merge_bits(other);

        let words = self.bits.data();
This clones self.bits.data() into words, builds a new Vec of the same size, and then constructs a fresh SparkBitArray whose new re-scans the words to recompute bit_count. Two allocations and two passes per merge. An in-place variant that ORs into self.bits.data directly and accumulates bit_count in the same loop would avoid both.
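The in-place variant could look roughly like the sketch below. `BitWords` and `or_merge_be_words` are hypothetical names for illustration and are not the project's `SparkBitArray`; the sketch assumes the incoming payload is a run of big-endian 64-bit words with the header already stripped.

```rust
/// Hypothetical bit array that tracks its population count.
pub struct BitWords {
    words: Vec<u64>,
    bit_count: usize,
}

impl BitWords {
    pub fn new(num_words: usize) -> Self {
        BitWords { words: vec![0; num_words], bit_count: 0 }
    }

    /// Sets one bit, updating the cached count only if the bit was clear.
    pub fn set(&mut self, index: usize) {
        let (w, b) = (index / 64, index % 64);
        if self.words[w] & (1u64 << b) == 0 {
            self.words[w] |= 1u64 << b;
            self.bit_count += 1;
        }
    }

    pub fn bit_count(&self) -> usize {
        self.bit_count
    }

    /// ORs big-endian words from `other` into `self` and recomputes the
    /// population count in the same pass: no clone, no second scan.
    pub fn or_merge_be_words(&mut self, other: &[u8]) -> Result<(), String> {
        if other.len() != self.words.len() * 8 {
            return Err(format!(
                "expected {} bytes, got {}",
                self.words.len() * 8,
                other.len()
            ));
        }
        let mut count = 0usize;
        for (word, chunk) in self.words.iter_mut().zip(other.chunks_exact(8)) {
            let mut raw = [0u8; 8];
            raw.copy_from_slice(chunk);
            *word |= u64::from_be_bytes(raw);
            count += word.count_ones() as usize;
        }
        self.bit_count = count;
        Ok(())
    }
}
```

The length check runs before any word is modified, so a rejected merge leaves the receiver unchanged.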
    }

    let mut b = SparkBloomFilter::new(SparkBloomFilterVersion::V1, num_hash, num_bits, 0);
    b.merge_filter(&a.state_as_bytes());
Nice that these go through state_as_bytes then merge_filter. The pre-existing v1_round_trip and v2_round_trip tests above use SparkBloomFilter::from, which was always header-aware, so they would not have caught the bug this PR fixes. The new cases are exactly the right round-trip path.
    override def enabledConfig: Option[ConfigEntry[Boolean]] = Some(
      CometConf.COMET_EXEC_AGGREGATE_ENABLED)

    override def getSupportLevel(op: ObjectHashAggregateExec): SupportLevel = {
The body here is identical to CometHashAggregateExec.getSupportLevel at operators.scala:1658-1670, including the conf names. That is fine for the test-knob purpose called out in the comment, but COMET_ENABLE_PARTIAL_HASH_AGGREGATE and COMET_ENABLE_FINAL_HASH_AGGREGATE now gate both HashAggregateExec and ObjectHashAggregateExec. As a follow-up, consider renaming to COMET_ENABLE_PARTIAL_AGGREGATE / COMET_ENABLE_FINAL_AGGREGATE so the conf names match the scope.
- Return DataFusionError::Internal on header mismatches instead of asserting, so a corrupt or truncated intermediate buffer fails the query rather than crashing the executor process.
- OR-merge incoming words in place via SparkBitArray::merge_be_words and accumulate bit_count in the same pass, avoiding the clone-then-rebuild path that allocated and scanned the bit array twice.
- Adjust the merge_rejects_* tests to check the returned error message instead of #[should_panic].

[skip ci]
…ermediate-buffer-compat
The Spark 4.0 BloomFilterAggregateQuerySuite CI job aborted the executor with a multi-exabyte native allocation, and the Spark 3.4 CometExecRuleSuite job failed analysis. Three bloom-filter issues surfaced once this branch let bloom_filter_agg execute natively:

- Spark's BloomFilterAggregate caps numItems/numBits at maxNumItems/maxNumBits, but CometBloomFilterAggregate forwarded the raw literals. Comet's native aggregate stores them as i32, so an oversized Long (e.g. the Long.MaxValue cases in BloomFilterAggregateQuerySuite) wrapped to a negative size and triggered a 2^61-byte allocation. Apply the same cap in the serde so the native side receives Spark-equivalent values.
- update_batch hit `unreachable!()` on a null input value. Spark's BloomFilterAggregate.update ignores nulls; skip them, and return an error rather than panicking on a genuinely unexpected type.
- The new CometExecRuleSuite BloomFilter cases used an int column, which Spark 3.4's bloom_filter_agg rejects (it only accepts a long-typed first argument); cast to bigint.

Adds a CometExec3_4PlusSuite regression test covering oversized numItems/numBits with a null-containing input.
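The narrowing problem in the first item can be shown in a few lines. The real fix lives in the Scala serde; the Rust function `cap_then_narrow` below is a hypothetical illustration, and the cap value is passed in by the caller rather than taken from any real configuration.

```rust
/// Caps a 64-bit request at `max_allowed` before narrowing to i32.
/// Returns an error if the capped value still does not fit, rather than
/// letting the cast wrap silently.
pub fn cap_then_narrow(requested: i64, max_allowed: i64) -> Result<i32, String> {
    // Apply the cap first, mirroring a min(maxValue, requested) rule.
    let capped = requested.min(max_allowed);
    i32::try_from(capped)
        .map_err(|_| format!("value {capped} does not fit in i32 after capping at {max_allowed}"))
}
```

For comparison, an unchecked `as i32` cast of `i64::MAX` keeps only the low 32 bits and yields `-1`, which is the kind of negative size that a later allocation can misread as an enormous unsigned length.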
Which issue does this PR close?
Closes #2889.
Stacks on #4015.
Rationale for this change
Comet's `SparkBloomFilter::state_as_bytes` emitted only the raw bit array in native endianness, with no header. Spark's `BloomFilterImpl.writeTo` emits a header (version, num_hash_functions, optional seed for V2, num_words) followed by big-endian `u64` bit words. The mismatch crashed mixed Spark-Partial / Comet-Final (and the inverse) execution with `Cannot merge SparkBloomFilters with different lengths.` (#2889).

#4015 hides the crash by tagging `BloomFilterAggregate` as not `supportsMixedPartialFinal`. This PR fixes the root cause: Comet's intermediate buffer now matches Spark's `writeTo` output byte-for-byte, and `merge_filter` parses + validates the header before merging. With buffers wire-compatible, `BloomFilterAggregate` joins the `supportsMixedPartialFinal = true` set.

What changes are included in this PR?
- `SparkBloomFilter::state_as_bytes` delegates to the existing `spark_serialization()`.
- `SparkBloomFilter::merge_filter` parses the Spark header, asserts `version` / `num_hash_functions` / `seed` / `num_words` match `self`, and OR-merges big-endian `u64` words.
- `CometBloomFilterAggregate` overrides `supportsMixedPartialFinal = true`.
- `CometObjectHashAggregateExec` gains a `getSupportLevel` override that mirrors `CometHashAggregateExec`'s `COMET_ENABLE_{PARTIAL,FINAL}_HASH_AGGREGATE` test-knob handling, so `ObjectHashAggregateExec` plans can be exercised in the same way.
- `CometExecRuleSuite` cases for both mixed-direction BloomFilter plans, mirroring the MIN/MAX coverage added in #4015 ("fix: allow safe mixed Spark/Comet partial/final aggregate execution").

This PR was scaffolded with the project's `superpowers:writing-plans` / `superpowers:subagent-driven-development` skills.

How are these changes tested?
- `cargo test -p datafusion-comet-spark-expr bloom_filter::` covers V1 / V2 round-trip and four mismatch-rejection cases.
- `CometExecRuleSuite` mixed BloomFilter tests (both directions).
- `BloomFilterAggregateQuerySuite` now exercises the mixed path via CI (previously surfaced #2889, "Bloom filter intermediate aggregate buffers are not compatible between Spark and Comet").
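The wire layout named in the rationale can be sketched as follows. This is an illustration only: the function `spark_style_bytes` is hypothetical, and it assumes each header field is a 32-bit big-endian integer, as Java's `DataOutputStream.writeInt` would produce. The authoritative layout is whatever `BloomFilterImpl.writeTo` emits in the targeted Spark version.

```rust
/// Serializes a header followed by big-endian 64-bit words.
/// `seed` is Some(..) only for the V2 layout, where it follows num_hash.
pub fn spark_style_bytes(version: i32, num_hash: i32, seed: Option<i32>, words: &[u64]) -> Vec<u8> {
    let mut out = Vec::with_capacity(16 + words.len() * 8);
    out.extend_from_slice(&version.to_be_bytes());
    out.extend_from_slice(&num_hash.to_be_bytes());
    if let Some(seed) = seed {
        out.extend_from_slice(&seed.to_be_bytes());
    }
    // num_words comes last in the header, immediately before the payload.
    out.extend_from_slice(&(words.len() as i32).to_be_bytes());
    for w in words {
        // Big-endian regardless of the host's native byte order.
        out.extend_from_slice(&w.to_be_bytes());
    }
    out
}
```

A header-less dump of the same words in native byte order would differ in both length and byte order on a little-endian host, which is consistent with the "different lengths" failure quoted above.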