fix: make BloomFilter intermediate buffer Spark-compatible #4390

Open
andygrove wants to merge 24 commits into apache:main from andygrove:feat/bloom-filter-intermediate-buffer-compat

Conversation

@andygrove
Member

Which issue does this PR close?

Closes #2889.

Stacks on #4015.

Rationale for this change

Comet's SparkBloomFilter::state_as_bytes emitted only the raw bit array in native endianness, with no header. Spark's BloomFilterImpl.writeTo emits a header (version, num_hash_functions, optional seed for V2, num_words) followed by big-endian u64 bit words. The mismatch crashed mixed Spark-Partial / Comet-Final (and the inverse) execution with "Cannot merge SparkBloomFilters with different lengths." (#2889).

#4015 hides the crash by tagging BloomFilterAggregate as not supportsMixedPartialFinal. This PR fixes the root cause: Comet's intermediate buffer now matches Spark's writeTo output byte-for-byte, and merge_filter parses + validates the header before merging. With buffers wire-compatible, BloomFilterAggregate joins the supportsMixedPartialFinal = true set.
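
For illustration, the two layouts described above can be sketched in Rust. This is a minimal sketch, not the PR's code: `serialize_v1` and `raw_native_words` are hypothetical names, only the V1 header (version, num_hash_functions, num_words) is shown, and the V2 seed field is left out. The 32-bit big-endian width of each header field is an assumption based on Java's `DataOutputStream.writeInt`.

```rust
/// Sketch of the Spark-style V1 layout: a big-endian header followed by
/// big-endian 64-bit words. Field widths are an assumption (Java `int`).
pub fn serialize_v1(num_hash_functions: i32, words: &[u64]) -> Vec<u8> {
    let mut out = Vec::with_capacity(12 + words.len() * 8);
    out.extend_from_slice(&1i32.to_be_bytes()); // version
    out.extend_from_slice(&num_hash_functions.to_be_bytes());
    out.extend_from_slice(&(words.len() as i32).to_be_bytes()); // num_words
    for w in words {
        out.extend_from_slice(&w.to_be_bytes());
    }
    out
}

/// Sketch of the pre-fix shape: raw words in native byte order, no header.
pub fn raw_native_words(words: &[u64]) -> Vec<u8> {
    words.iter().flat_map(|w| w.to_ne_bytes()).collect()
}
```

With two words, `serialize_v1(3, &words)` yields 28 bytes (12 header bytes plus 16 word bytes) while `raw_native_words(&words)` yields 16, so the two encodings differ in length as well as in byte order on little-endian hosts.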

What changes are included in this PR?

  • SparkBloomFilter::state_as_bytes delegates to the existing spark_serialization().
  • SparkBloomFilter::merge_filter parses the Spark header, asserts version / num_hash_functions / seed / num_words match self, and OR-merges big-endian u64 words.
  • CometBloomFilterAggregate overrides supportsMixedPartialFinal = true.
  • CometObjectHashAggregateExec gains a getSupportLevel override that mirrors CometHashAggregateExec's COMET_ENABLE_{PARTIAL,FINAL}_HASH_AGGREGATE test-knob handling, so ObjectHashAggregateExec plans can be exercised in the same way.
  • Native unit tests for V1 / V2 round-trip and explicit mismatch rejection cases.
  • CometExecRuleSuite cases for both mixed-direction BloomFilter plans, mirroring the MIN/MAX coverage added in #4015 (fix: allow safe mixed Spark/Comet partial/final aggregate execution).

This PR was scaffolded with the project's superpowers:writing-plans / superpowers:subagent-driven-development skills.

How are these changes tested?

andygrove and others added 20 commits April 20, 2026 21:15
Previously, when one aggregate stage (Partial or Final) couldn't be
converted to Comet, the other was also blocked to avoid crashes from
incompatible intermediate buffer formats (issues apache#1389, apache#1267).

This change introduces per-aggregate `supportsMixedPartialFinal` declarations
so that aggregates with simple, compatible buffers (MIN, MAX, COUNT, bitwise)
can safely run in mixed mode while unsafe aggregates (SUM, AVG, Variance,
CollectSet) continue to be blocked.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
- Restore `convert` scaladoc in `CometAggregateExpressionSerde` that was
  displaced when `supportsMixedPartialFinal` was added
- Require `aggregateExpressions.nonEmpty` in `findPartialAggInPlan` so
  intermediate distinct-elimination stages (empty agg, group-by only)
  are not incorrectly tagged as the Partial to disable
- Document that `canFinalAggregateBeConverted` mirrors the predicate
  checks in `CometBaseAggregate.doConvert` and must be kept in sync
If the corresponding partial aggregate would also fail conversion to Comet
(for example, collect_set on float is incompatible), tagging it early
hijacks the more specific natural fallback reason. Only tag the partial
when it would otherwise have been converted, so the tag guards genuine
buffer-format mismatches rather than masking unrelated fallbacks.

Generalize the convertibility predicate to accept an expected mode and
mirror the mode-specific result-expression handling in doConvert.
… files

findPartialAggInPlan was using a deep tree traversal that matched partial
aggregates separated from the final by other aggregate stages. For Spark's
distinct-aggregate rewrite, the partial for non-distinct aggs feeds into a
PartialMerge stage rather than directly into the final, so tagging it as
unsafe is incorrect and hijacks the natural 'Unsupported aggregation mode
PartialMerge' fallback reason. Walk only through exchanges and AQE stages.

Also regenerate TPC-DS plan-stability golden files for Spark 3.4, 3.5, and
4.0 to reflect the branch's new safe-mixed-execution behavior where the
final aggregate converts to Comet when all aggregate functions have
compatible intermediate buffer formats.
Arrow's row format, used by DataFusion's grouped hash aggregate for
composite group keys, does not support Map at any nesting level. The
existing guard in CometBaseAggregate.doConvert only matched top-level
MapType, so queries grouping by e.g. array<map<int,int>> crashed with
"Row format support not yet implemented for: [SortField { ... List(Map(...)) }]"
once the new mixed-partial-final path produced a Comet Final aggregate
over Spark-partial output.

Add a recursive QueryPlanSerde.containsMapType helper that walks into
ArrayType and StructType, and use it in both doConvert and
canAggregateBeConverted. Add a regression test exercising the failing
group-by.sql query shape from SQLQueryTestSuite.
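
The difference between a top-level check and the recursive check described in this commit can be sketched as follows. The real helper is Scala (`QueryPlanSerde.containsMapType`); this Rust version uses a hypothetical toy `DataType` enum purely to show the recursion and is not the project's code.

```rust
/// Toy stand-in for a SQL type tree (hypothetical, for illustration only).
#[derive(Debug, Clone)]
pub enum DataType {
    Int,
    Array(Box<DataType>),
    Struct(Vec<DataType>),
    Map(Box<DataType>, Box<DataType>),
}

/// Matches only when the outermost type is a map.
pub fn top_level_only(dt: &DataType) -> bool {
    matches!(dt, DataType::Map(_, _))
}

/// Walks into arrays and structs so a map at any nesting level is found.
pub fn contains_map_type(dt: &DataType) -> bool {
    match dt {
        DataType::Map(_, _) => true,
        DataType::Array(elem) => contains_map_type(elem),
        DataType::Struct(fields) => fields.iter().any(contains_map_type),
        DataType::Int => false,
    }
}
```

For a value shaped like array<map<int,int>>, `top_level_only` returns false while `contains_map_type` returns true, which is the case the commit message says the old guard missed.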
…ressions

Mixed COUNT partial/final regressed AQE's PropagateEmptyRelationAfterAQE
(which matches BaseAggregateExec only, not CometHashAggregateExec) and the
Spark 4.0 count-bug decorrelation for correlated IN subqueries (row
dropped in in-count-bug.sql OR pattern).

- Remove supportsMixedPartialFinal override from CometCount.
- Update trait docstring to explain why COUNT is intentionally excluded.
- Narrow the two "safe mixed" tests in CometExecRuleSuite to use only
  MIN/MAX, which remain mixed-safe.
- Revert TPC-DS golden file regeneration from commit 753a9a5; those
  plan changes were driven by COUNT becoming mixed-safe.

MIN, MAX, and bitwise aggregates retain supportsMixedPartialFinal = true.
Comet now accelerates the Partial/Final MIN aggregate in the subquery,
which reduces the WholeStageCodegen subtree count below the hardcoded 3
the test asserts. Tag the test with IgnoreComet in the 3.4.3, 3.5.8,
and 4.0.1 diffs, matching the pattern used for other plan-shape tests
in ExplainSuite.
…l-final-aggregates

# Conflicts:
#	.gitignore
#	dev/diffs/3.4.3.diff
#	spark/src/main/scala/org/apache/spark/sql/comet/operators.scala
- add withInfo for multiMode and Spark-Final-without-Comet-Partial early
  returns so EXPLAIN shows the reason
- dedupe containsMapType: route operators.scala through
  QueryPlanSerde.containsMapType and drop the local private copy
- log unrecognized passthrough in findPartialAggInPlan at debug level
The previous commit removed the local containsMapType helper in
operators.scala but left StructType in the imports list, breaking
scalafix in the Lint Java jobs.

It also added withInfo annotations for the multiMode and Spark-Final-
without-Comet-Partial early returns in CometBaseAggregate, which surface
in EXPLAIN output and therefore in the plan-stability golden files.
Regenerate the affected TPC-DS golden files (q10, q24a, q24b, q35, q45,
q70, q77 plus the v2.7 variants) so CometPlanStabilitySuite passes.
The test asserts a specific WholeStageCodegen subtree count, but Comet
changes the plan shape by replacing operators with native versions, so
the expected count no longer matches.
Opt CometBloomFilterAggregate into supportsMixedPartialFinal now that the
native intermediate buffer format matches Spark's BloomFilterImpl.writeTo
byte-for-byte. Also mirror the COMET_ENABLE_{PARTIAL,FINAL}_HASH_AGGREGATE
test knobs from CometHashAggregateExec onto CometObjectHashAggregateExec so
ObjectHashAggregateExec plans can be exercised in the same way.
…ermediate-buffer-compat

# Conflicts:
#	spark/src/test/scala/org/apache/comet/rules/CometExecRuleSuite.scala
andygrove marked this pull request as ready for review May 21, 2026 18:14
mbutrovich self-requested a review May 21, 2026 20:18

mbutrovich left a comment

Minor changes. Thanks for tackling this, @andygrove!

self.spark_serialization()
}

pub fn merge_filter(&mut self, other: &[u8]) {

merge_filter is pub fn ... -> () and panics via assert_eq! on every header mismatch. Its only caller is Accumulator::merge_batch in bloom_filter_agg.rs:176, which already returns Result. Threading these through as DataFusionError::Internal would let a corrupt or truncated intermediate buffer surface as a query failure rather than crashing the executor process.
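
A rough sketch of the suggested shape, returning an error instead of asserting. Everything here is hypothetical: `Header`, `read_i32_be`, and `check_header` are illustrative names, only the V1 header is modeled, and a plain `String` stands in for `DataFusionError::Internal` so the example stays self-contained.

```rust
/// V1 header fields (hypothetical struct; the V2 seed is omitted).
#[derive(Debug, PartialEq)]
pub struct Header {
    pub version: i32,
    pub num_hash_functions: i32,
    pub num_words: i32,
}

/// Read a big-endian i32, failing cleanly if the buffer is too short.
fn read_i32_be(buf: &[u8], offset: usize) -> Result<i32, String> {
    let bytes = buf
        .get(offset..offset + 4)
        .ok_or_else(|| format!("buffer truncated at offset {}", offset))?;
    let mut raw = [0u8; 4];
    raw.copy_from_slice(bytes);
    Ok(i32::from_be_bytes(raw))
}

/// Validate the incoming header against the receiver's and return the
/// offset of the first data word. Errors replace the former panics.
pub fn check_header(expected: &Header, buf: &[u8]) -> Result<usize, String> {
    let got = Header {
        version: read_i32_be(buf, 0)?,
        num_hash_functions: read_i32_be(buf, 4)?,
        num_words: read_i32_be(buf, 8)?,
    };
    if got != *expected {
        return Err(format!("header mismatch: expected {:?}, got {:?}", expected, got));
    }
    if got.num_words < 0 {
        return Err(format!("negative num_words: {}", got.num_words));
    }
    let total = 12 + (got.num_words as usize) * 8;
    if buf.len() != total {
        return Err(format!("expected {} bytes, got {}", total, buf.len()));
    }
    Ok(12)
}
```

A caller that already returns `Result`, such as the `merge_batch` path mentioned above, could propagate the error with `?` after mapping it to its own error type.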

);
self.bits.merge_bits(other);

let words = self.bits.data();

This clones self.bits.data() into words, builds a new Vec of the same size, and then constructs a fresh SparkBitArray whose new re-scans the words to recompute bit_count. Two allocations and two passes per merge. An in-place variant that ORs into self.bits.data directly and accumulates bit_count in the same loop would avoid both.
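
The in-place variant suggested here might look roughly like the following. It is a standalone sketch over a plain `&mut [u64]`, not the actual `SparkBitArray` code; the function name and the `String` error type are assumptions made for the example.

```rust
/// OR big-endian words from `incoming` into `words` without allocating,
/// and return the number of set bits after the merge, counted in the
/// same pass.
pub fn merge_be_words_in_place(words: &mut [u64], incoming: &[u8]) -> Result<u64, String> {
    if incoming.len() != words.len() * 8 {
        return Err(format!(
            "expected {} bytes of words, got {}",
            words.len() * 8,
            incoming.len()
        ));
    }
    let mut bit_count = 0u64;
    for (word, chunk) in words.iter_mut().zip(incoming.chunks_exact(8)) {
        let mut raw = [0u8; 8];
        raw.copy_from_slice(chunk);
        *word |= u64::from_be_bytes(raw);
        bit_count += u64::from(word.count_ones());
    }
    Ok(bit_count)
}
```

Compared with clone-then-rebuild, this touches each word once and needs no temporary `Vec`; the returned count can be stored directly as the new bit count.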

}

let mut b = SparkBloomFilter::new(SparkBloomFilterVersion::V1, num_hash, num_bits, 0);
b.merge_filter(&a.state_as_bytes());

Nice that these go through state_as_bytes then merge_filter. The pre-existing v1_round_trip and v2_round_trip tests above use SparkBloomFilter::from, which was always header-aware, so they would not have caught the bug this PR fixes. The new cases are exactly the right round-trip path.

override def enabledConfig: Option[ConfigEntry[Boolean]] = Some(
CometConf.COMET_EXEC_AGGREGATE_ENABLED)

override def getSupportLevel(op: ObjectHashAggregateExec): SupportLevel = {

The body here is identical to CometHashAggregateExec.getSupportLevel at operators.scala:1658-1670, including the conf names. That is fine for the test-knob purpose called out in the comment, but COMET_ENABLE_PARTIAL_HASH_AGGREGATE and COMET_ENABLE_FINAL_HASH_AGGREGATE now gate both HashAggregateExec and ObjectHashAggregateExec. As a follow-up, consider renaming to COMET_ENABLE_PARTIAL_AGGREGATE / COMET_ENABLE_FINAL_AGGREGATE so the conf names match the scope.

andygrove added 2 commits May 21, 2026 14:35
- Return DataFusionError::Internal on header mismatches instead of
  asserting, so a corrupt or truncated intermediate buffer fails the
  query rather than crashing the executor process.
- OR-merge incoming words in place via SparkBitArray::merge_be_words
  and accumulate bit_count in the same pass, avoiding the
  clone-then-rebuild path that allocated and scanned the bit array twice.
- Adjust the merge_rejects_* tests to check the returned error message
  instead of #[should_panic].

[skip ci]

mbutrovich left a comment

Thanks @andygrove!

The Spark 4.0 BloomFilterAggregateQuerySuite CI job aborted the executor
with a multi-exabyte native allocation, and the Spark 3.4 CometExecRuleSuite
job failed analysis. Three bloom-filter issues surfaced once this branch
let bloom_filter_agg execute natively:

- Spark's BloomFilterAggregate caps numItems/numBits at maxNumItems/
  maxNumBits, but CometBloomFilterAggregate forwarded the raw literals.
  Comet's native aggregate stores them as i32, so an oversized Long
  (e.g. the Long.MaxValue cases in BloomFilterAggregateQuerySuite) wrapped
  to a negative size and triggered a 2^61-byte allocation. Apply the same
  cap in the serde so the native side receives Spark-equivalent values.
- update_batch hit `unreachable!()` on a null input value. Spark's
  BloomFilterAggregate.update ignores nulls; skip them, and return an
  error rather than panicking on a genuinely unexpected type.
- The new CometExecRuleSuite BloomFilter cases used an int column, which
  Spark 3.4's bloom_filter_agg rejects (it only accepts a long-typed
  first argument); cast to bigint.

Adds a CometExec3_4PlusSuite regression test covering oversized
numItems/numBits with a null-containing input.
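
The first issue above (a 64-bit value wrapping when narrowed to i32) and the cap-first remedy can be illustrated with a small sketch. The function names and the example cap of 4,000,000 are hypothetical; the actual fix applies the cap in the Scala serde, and the real limits come from Spark's maxNumItems/maxNumBits settings.

```rust
/// Narrowing with `as` keeps only the low 32 bits, so a very large i64
/// can come out negative.
pub fn wrapping_narrow(requested: i64) -> i32 {
    requested as i32
}

/// Cap first, then narrow. The extra clamp saturates instead of wrapping
/// in case the cap itself is larger than i32::MAX.
pub fn capped_narrow(requested: i64, max_allowed: i64) -> i32 {
    let capped = requested.min(max_allowed);
    capped.clamp(i64::from(i32::MIN), i64::from(i32::MAX)) as i32
}
```

Here `wrapping_narrow(i64::MAX)` evaluates to -1, whereas `capped_narrow(i64::MAX, 4_000_000)` evaluates to 4,000,000, so the size that reaches the allocator stays within the configured limit.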

Development

Successfully merging this pull request may close these issues.

Bloom filter intermediate aggregate buffers are not compatible between Spark and Comet