feat(contrib): Native Delta Lake scan via delta-kernel-rs (Iceberg-style contrib)#4366
Draft
schenksj wants to merge 111 commits into
Draft
feat(contrib): Native Delta Lake scan via delta-kernel-rs (Iceberg-style contrib)#4366schenksj wants to merge 111 commits into
schenksj wants to merge 111 commits into
Conversation
Initial scaffolding for the direct Delta integration that replaces the generic contrib SPI proposed in apache#4339. Mirrors Iceberg's pattern: - native/proto/src/proto/operator.proto: typed `DeltaScan delta_scan = 117` variant on `OpStruct`, with the six message definitions (DeltaScanCommon, DeltaScan, DeltaScanTask, DeltaPartitionValue, DeltaScanTaskList, DeltaColumnMapping) inlined next to the IcebergScan group. Field numbers preserved from the contrib-delta-pr2 branch. - native/core/src/execution/planner.rs: unconditional `OpStruct::DeltaScan` dispatcher arm with feature-gated body. Default builds return a clear "rebuild with --features contrib-delta" error; the feature-on arm is a `todo!` stub today and gets filled in as the implementation ports over. - native/core/src/execution/jni_api.rs + planner/operator_registry.rs: extend the existing `OpStruct` match sites so default builds compile exhaustively. - native/core/Cargo.toml: new optional `contrib-delta` feature backed by an optional path dep on `comet-contrib-delta`. Default builds carry zero Delta surface (verified: `cargo check` builds clean without the feature, and the Delta crate is not in the workspace `members` list). - native/Cargo.toml: explicit `exclude = ["../contrib"]` so the workspace doesn't try to absorb the contrib crate (which would fail -- workspace members must live hierarchically under the workspace root). - contrib/delta/native/{Cargo.toml,src/lib.rs}: skeleton crate that re-exports the typed Delta proto messages so contrib-internal code has a stable short alias. Real implementation (kernel-rs log replay, DV filter, column mapping, partition parsing) ports over from contrib-delta-pr2 in follow-up commits. Build verification: cargo check -p datafusion-comet # default: green cargo check -p datafusion-comet --features contrib-delta # green This addresses Parth's review on apache#4339: ~40 lines of core touchpoints all behind a feature gate, no SPI/registry/traits/runtime dispatch. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Brings the working delta-kernel-rs integration over from contrib-delta-pr2 without the contrib SPI plumbing Parth flagged on apache#4339. contrib/delta/native/: - jni.rs, scan.rs, engine.rs, error.rs, predicate.rs, dv_filter.rs -- ported verbatim from contrib-delta-pr2 (only crate::proto::* import paths needed adjustment, handled via lib.rs re-export of the typed messages that now live in core's proto crate) - planner.rs -- Delta-specific helpers (build_delta_partitioned_files, parse_delta_partition_scalar with the DATE -> TIMESTAMP_NTZ widening fallback already inlined, ColumnMappingFilterRewriter) exposed as pure-DataFusion functions that core's dispatcher arm composes onto the standard parquet datasource path. NO ContribOperatorPlanner trait, NO ContribPlannerContext, NO ParquetDatasourceParams -- the contrib crate is now a plain library with public functions. - lib.rs -- module decls + a `pub mod proto` re-export of the six typed Delta messages from `datafusion_comet_proto::spark_operator`. No `#[ctor]` and no `register_contrib_planner` call. - Cargo.toml -- standalone (outside the native/ workspace root), no comet-contrib-spi dep, all delta-specific deps stay confined here. native/core/src/execution/planner/contrib_delta_scan.rs (new): - `PhysicalPlanner::plan_delta_scan` -- the `OpStruct::DeltaScan` arm body extracted into its own file (~210 lines, mirrors `OpStruct::IcebergScan` in size and shape). Gated `#[cfg(feature = "contrib-delta")]`; calls core's `init_datasource_exec`, `prepare_object_store_with_configs`, `convert_spark_types_to_arrow_schema` directly + comet-contrib-delta's helpers for the Delta-specific pieces. native/core/src/execution/planner.rs: - `OpStruct::DeltaScan` arm: 6-line dispatcher that calls into `self.plan_delta_scan(...)` under `#[cfg(feature = "contrib-delta")]`. native/core/src/parquet/parquet_exec.rs: - New `ignore_missing_files: bool` arg on `init_datasource_exec`. Threaded through to `IgnoreMissingFileSource` wrapper (ported verbatim from PR2's native/core/src/parquet/missing_file_tolerant.rs) which decorates the final FileSource so its FileOpener swallows object-store NotFound errors as empty streams. Matches Spark's `spark.sql.files.ignoreMissingFiles=true` semantics. All existing call sites updated to pass `false`. Build verification (both checked clean): cargo check -p datafusion-comet # default cargo check -p datafusion-comet --features contrib-delta Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
These five files port verbatim from contrib-delta-pr2 -- they touch only
Spark APIs (via reflection) and standard Scala, none of the rejected SPI
surface:
- DeltaConf.scala Config keys (COMET_DELTA_NATIVE_ENABLED, ...)
- Native.scala JNI bridge for planDeltaScan
- DeltaReflection.scala Reflective access to spark-delta internals
(isDeltaFileFormat, isBatchFileIndex,
extractBatchAddFiles, ...)
- RowTrackingAugmentedFileIndex Wraps a FileIndex to inject row-tracking
metadata columns
- DeltaInputFileBlockHolder Thread-local replacement for
InputFileBlockHolder on the Delta scan path
Plus the regression infrastructure (4.1.0.diff, run-test.sh,
run-regression.sh).
The remaining four files (CometDeltaNativeScan, CometDeltaNativeScanExec,
DeltaScanRuleExtension, DeltaOperatorSerdeExtension, DeltaPlanDataInjector)
each reference the rejected SPI surface (CometOperatorSerde,
CometScanRuleExtension, ContribOp envelope, PlanDataSource, PlanDataInjector).
Those need rewriting before they can compile against main -- queued as the
next commit on this branch:
- drop the `extends CometOperatorSerde[CometScanExec]` trait bound;
expose `convert(...)` as a static method
- replace ContribOp envelope with the typed OpStruct::DeltaScan
- drop the SPI extension class wrappers; integrate detection directly
into CometScanRule.scala + CometExecRule.scala (Iceberg-style)
- bake DeltaPlanDataInjector logic directly into CometDeltaNativeScanExec
Maven `-Pcontrib-delta` profile, scalastyle wiring, and the SPI rewrite
all land together in the follow-up commit so the contrib compiles
end-to-end against main.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…ction bridge
The four SPI-touching files from contrib-delta-pr2 rewritten to drop the
rejected SPI base classes and use the typed `OpStruct::DeltaScan` proto
variant directly:
- CometDeltaNativeScan.scala no longer `extends CometOperatorSerde`;
plain object with `convert(scan, builder, childOp*)` static method.
All `ContribOp` envelope wrapping replaced with
`builder.setDeltaScan(...)`. DeltaOperator.* imports redirected to
core's `org.apache.comet.serde.OperatorOuterClass`.
- CometDeltaNativeScanExec.scala no longer `with PlanDataSource`;
public accessors (planDataSourceKey, planDataCommonBytes,
planDataPerPartitionBytes) stay so core's CometExecRDD can read them
directly. `nativeOp.getContribOp.getPayload` calls collapse to the
typed `nativeOp.getDeltaScan` accessor.
- DeltaScanRule.scala was `class DeltaScanRuleExtension extends
CometScanRuleExtension`; now a plain `object DeltaScanRule` with a
single static entry point `transformV1IfDelta(plan, session,
scanExec, relation): Option[SparkPlan]`. The private
`CometScanRule.isSchemaSupported` is unreachable from contrib, so
inline the equivalent check (CometScanTypeChecker + fallback-reason
emission).
- The DeltaOperatorSerdeExtension + DeltaPlanDataInjector files are
not ported -- their roles fold into the next commit's CometExecRule
Delta serde dispatch and into CometDeltaNativeScanExec respectively.
Core wiring:
- spark/pom.xml: new `<profile id="contrib-delta">` adds
contrib/delta/src/main/scala/ as a compile source on comet-spark and
pulls in `io.delta:delta-spark_2.13:4.1.0` at provided scope.
- CometScanRule.scala: 5-line Delta detection block at the head of
`transformV1Scan`'s HadoopFsRelation case (Iceberg-style; calls into
`DeltaIntegration.transformV1IfDelta` which is a no-op when the
contrib isn't bundled).
- DeltaIntegration.scala (new): reflection bridge that resolves the
contrib's `DeltaScanRule` + `CometDeltaNativeScan` companion objects
by class name. Default builds get `None`; -Pcontrib-delta builds get
a working delegate. No SPI / ServiceLoader / registry.
Build verification:
mvn compile # default: still green
mvn compile -Pcontrib-delta # GREEN -- this is the milestone
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…tics Spark's UnsafeRow.getUTF8String wraps bytes via UTF8String.fromAddress with no UTF-8 validation, and cast(BinaryType -> StringType) is a zero-copy reinterpret that leaves arbitrary bytes in a StringType column. Delta's Z-Order uses interleave_bits(...).cast(StringType) for opaque sort keys, which panicked Comet's strict from_utf8(...).unwrap() and cascaded into JVM classloader errors (60+ ServiceConfigurationError tests in the contrib-delta-pr2 regression run). Switch to from_utf8_unchecked since the bytes flow directly into Arrow's StringBuilder::append_value and are never introspected as a &str. Verified on contrib-delta-pr2: OptimizeZOrderScalaSuite "interleaving" 4/4 PASS after this fix. Pure core fix -- independent of the contrib/delta integration. Lands on this branch because it's a prerequisite for the Delta regression to be meaningful (without it the Z-Order panic poisons every following test). Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Connects core's CometExecRule to the contrib's Delta scan serde so the
Delta-marker CometScanExec produced by CometScanRule flows through the
same `convertToComet(scan, handler)` path as Iceberg / NativeScan / etc.
- CometDeltaNativeScan re-extends core's `CometOperatorSerde` trait
(the trait itself is core, not part of the rejected extension SPI;
every Comet operator handler implements it). `getSupportLevel` /
`enabledConfig` / `convert` now properly override.
- DeltaIntegration.scanHandler: a single reflective lookup exposes
the contrib's companion as a `CometOperatorSerde[CometScanExec]`.
Returns None on default builds.
- CometExecRule.transform: new case beside the SCAN_NATIVE_DATAFUSION
one that recognises the Delta scan marker (scanImpl ==
"native_delta_compat") and dispatches via the handler.
Build verification:
mvn compile GREEN
mvn compile -Pcontrib-delta GREEN
Still pending for end-to-end:
- per-partition task-list injection (replaces PR2's DeltaPlanDataInjector
SPI) -- baked into CometExecRDD via another small reflection hook
- live smoke test once the dylib is rebuilt with --features contrib-delta
and bundled into the jar
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Connects the contrib's per-partition Delta task-list serialisation into
core's existing `PlanDataInjector.injectPlanData` pipeline. Without this
the native side decodes a tasks-empty `DeltaScan` and returns `EmptyExec`
(0 rows) for every Delta scan.
- contrib/delta/.../DeltaPlanDataInjector.scala: implements core's
`PlanDataInjector` trait. `canInject` checks `op.hasDeltaScan` and
rejects already-injected operators (idempotent). `inject` splices the
partition's tasks into the operator's common-only DeltaScan envelope
via `op.toBuilder.setDeltaScan(...)` -- pure typed-proto operations,
no `ContribOp` envelope.
- spark/.../operators.scala: `PlanDataInjector.injectors` Seq now
appends the contrib injector via one reflective Class.forName lookup.
Default builds get None (no contrib classes on classpath) so the
list is unchanged; -Pcontrib-delta builds get the Delta injector.
Build verification:
mvn compile -Pcontrib-delta GREEN
End-to-end Scala+Maven integration is now complete. Remaining work:
- rebuild native dylib with `--features contrib-delta` and bundle
into comet-spark.jar
- run an isolated test (e.g. OptimizeZOrderScalaSuite "interleaving")
to confirm the end-to-end path works
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Wrap Class.forName calls in `// scalastyle:off classforname`, change Option[Class[_]] to Option[Class[AnyRef]] to avoid existential type warnings, reword the doc comment so the verbatim string Class.forName doesn't trip scalastyle's source-pattern check. mvn scalastyle:check -Pcontrib-delta GREEN Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…th file path CometExecIterator was wrapping native Parquet failures (e.g. corrupt-footer errors from kernel-rs reading a broken Delta checkpoint) in `_LEGACY_ERROR_TEMP_2254`, whose message is literally "Data read failed." -- no file path, no useful context. That broke tests that mirror Spark/Delta's standard parquet-failure shape, e.g. SnapshotManagementSuite "should not recover when the current checkpoint is broken" which asserts the resulting SparkException's message contains both the file path and "Encountered error while reading file" -- the format `QueryExecutionErrors.cannotReadFilesError` produces. Switch the wrapping to `cannotReadFilesError(cause, filePath)` via a new helper on ShimSparkErrorConverter (which lives in the spark package and can reach the private InputFileBlockHolder / QueryExecutionErrors). File path is read from InputFileBlockHolder, with an empty-string fallback when the thread-local isn't set; the static phrasing still satisfies the test assertion. Pure core fix -- benefits every native parquet read, not just Delta. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
DeltaTable.forPath(spark, path, fsOptions) with a Hadoop custom-fs scheme (e.g. fake://) was being claimed by CometScanRule for V1 parquet scans on the _delta_log/checkpoint.parquet files Delta reads internally. The native side then crashed at executePlan with `Generic URL error: Unable to recognise URL "fake:///..."` since object_store doesn't know the custom scheme. Add a scheme allowlist check (same set already used in the Iceberg branch and the contrib Delta path) at the top of the HadoopFsRelation arm; decline via withInfo when any rootPaths scheme is outside the allowlist so Spark's Hadoop-FS-aware reader handles the scan. Fixes DeltaTableSuite "dropFeatureSupport - with filesystem options" and is also a baseline fix (the same crash reproduces on main per full-20260415-222735.log). Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Each `plan_delta_scan` JNI call was creating a fresh `DefaultEngine`. Kernel's `DefaultEngine<TokioBackgroundExecutor>` spawns one std::thread per executor that hosts a current_thread tokio runtime, and that runtime's blocking pool (used by kernel for parquet metadata IO and object_store reads) keeps `spawn_blocking` worker threads alive for ~10s after each task. Under regression load (hundreds of Delta scans/minute, each spawning a handful of blocking IO tasks) this accumulates OS threads faster than tokio reaps them, eventually hitting the per-process `ulimit -u` (~1300 on macOS) — visible in the log as `pthread_create EAGAIN` aborts of GenerateIdentityValuesSuite and MergeIntoUnlimitedMergeClausesScalaSuite ~2 hours into the run. Replace the per-call `create_engine` with `get_or_create_engine` that returns an `Arc<DeltaEngine>` from a static cache keyed by `(scheme, authority, DeltaStorageConfig)`. Engines are constructed lazily on first miss per key and reused for the lifetime of the JVM, bounding live OS threads by table-storage diversity rather than by request count. The standalone `create_engine` is kept (behind `#[allow(dead_code)]`) for tests that want a fresh engine. `scan.rs` updated to deref `Arc<DeltaEngine>` to `&dyn Engine` at each kernel call (`builder.build`, `scan.scan_metadata`, `dv.get_row_indexes`). Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
DataFusion's `make_array_inner` asserts strict element-type equality (down to
nested field nullability) via `MutableArrayData::with_capacities`. Spark's
`CreateArray` is more permissive: when the analyzer doesn't insert coercion
casts, children can share the same surface struct type but disagree on a
nested field's nullability. Delta's CDF write path builds
`array(struct(id, b, _change_type=lit("delete")), struct(id, b, _change_type=col))`
manually -- one arm's `_change_type` is `Utf8` non-nullable (from a literal),
another is `Utf8` nullable -- and Comet's native serde happily emitted a
`make_array` call. Native execution then panicked:
assertion `left == right` failed: Arrays with inconsistent types passed to
MutableArrayData
left: Struct([..., Field { name: "_change_type", data_type: Utf8 }])
right: Struct([..., Field { name: "_change_type", data_type: Utf8, nullable: true }])
Decline in `CometCreateArray` when `children.map(_.dataType).distinct.size > 1`
so the JVM evaluator (which doesn't have this strictness) handles it. Fixes 4
`DescribeDeltaHistorySuite "replaceWhere on data column ... enableCDF=true"`
failures.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
… kind Two perf-sweep items from apache#135: apache#7 parse_delta_partition_scalar TZ parse-once. The per-row chrono_tz::Tz::from_str (or fixed-offset parse) was happening inside parse_delta_partition_scalar for every TIMESTAMP partition value, but the session TZ string doesn't change within a scan. Introduce SessionTimezone enum (Tz | Offset | Invalid), parse once in build_delta_partitioned_files, pass the parsed value through. parse_delta_partition_scalar's signature gains &SessionTimezone and keeps session_tz: &str only for the error message. apache#2 PlanDataInjector lookup by op kind. injectPlanData was running `for (injector <- injectors if injector.canInject(op))` against every operator in the tree; for a 50-op plan with 3 injectors that's 150 canInject calls just to find no match on most ops. Add `opStructCase` to the PlanDataInjector trait, build a Map[OpStructCase, PlanDataInjector] once at object init, and look up by op.getOpStructCase before any canInject call. Iceberg/NativeScan/Delta injectors set their own opStructCase. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…tion Perf-sweep #1 from apache#135. `DeltaIntegration.transformV1IfDelta` is invoked for every V1 scan in every plan (the bridge is called unconditionally by CometScanRule before the contrib's own Delta-format check). On -Pcontrib-delta builds each call was doing `getField MODULE$` + `getMethod("transformV1IfDelta", ...)` + 4-arg Method.invoke -- a reflection round-trip per scan. Cache the resolved (module, method) binding once per JVM as `transformV1IfDeltaBinding: Option[(AnyRef, Method)]`, single OnceLock-style volatile. Steady-state per-scan cost drops to one volatile read + one Method.invoke. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Perf-sweep apache#5 from apache#135. `isSchemaCometCompatible` was allocating a fresh CometScanTypeChecker(CometDeltaNativeScan.ScanImpl) on every scan. The checker is stateless w.r.t. its scanImpl tag and is safe to share. Promote it to a private val on DeltaScanRule; the per-scan fallback-reasons ListBuffer remains per-call (it's the only mutable input). Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…artitioned_files Perf-audit apache#137 finding #1. The inner `partition_schema.fields()` loop was calling `.iter().find()` on `task.partition_values` for every field -- O(width × values) per task. Pre-build a per-task HashMap<&str, &str> once, then O(1) gets. The map is reused across tasks via clear() so the allocation amortises across all DeltaScanTasks in the scan. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
SnapshotManagementSuite "should not recover when the current checkpoint is broken..." asserts the wrapped FAILED_READ_FILE.NO_HINT SparkException message contains the file path (e.g. "0001.checkpoint"). de9e0d3 got the error class right but left the path empty because: 1. Comet's native scan path does NOT go through Spark's FileScanRDD, so the standard InputFileBlockHolder thread-local is never populated. 2. ShimSparkErrorConverter.wrapNativeParquetError was reading from InputFileBlockHolder, getting null, and passing "" to cannotReadFilesError -- producing "Encountered error while reading file . " (with the empty path), which the test rejected. Plumb per-partition file paths from CometNativeScanExec (where they're known at planning time) -> CometExecRDD -> CometExecPartition -> CometExecIterator -> wrapNativeParquetError. CometNativeExec.doExecuteColumnar (the actual call site that constructs the iterator for query trees with a scan) collects file paths from any CometNativeScanExec leaves and passes them through the same CometExecRDD parameter. Verified with a /tmp/cometdiag.log file sentinel that the existing logWarning diags were being silently dropped by the test's `quietly { ... }` block, which is why my earlier "the wrap isn't being reached" conclusion was wrong. Test results after fix: SnapshotManagementSuite checkpoint-broken 2/2 PASS (was 0/2 with empty path). The other 3 fix clusters (de9e0d3+effe5f76+56c2b011) continue to pass: replaceWhere CDF 8/8, dropFeatureSupport 1/1. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…+ safeguards
Five fixes from the comprehensive code review of contrib-delta-direct:
1. Implement the missing InputFileBlockHolder hook in CometExecRDD.compute.
Several docs referenced `CometExecRDD.setInputFileForDeltaScan` but no such
method existed and nothing called `DeltaInputFileBlockHolder.set`, leaving
Delta's UPDATE/DELETE/MERGE flows (which use `input_file_name()` to find
touched files) silently looking at an empty path. Now set the thread-local
to the partition's first file (one-per-partition is enforced by
DeltaScanRule when input_file_name() is referenced), unset on task
completion. Stale doc references updated to point at the real call site.
2. DV filter ordering safeguards. DeltaDvFilterExec's `current_row_offset`
tracking assumes physical row ordering from the parquet scan. Override
`maintains_input_order() = [true]` and
`benefits_from_input_partitioning() = [false]` so any future optimizer
that wants to insert a RepartitionExec / SortPreservingMergeExec is
forced to bail rather than silently re-order rows.
3. Tighten IgnoreMissingFileSource's `is_not_found` Display fallback. The
prior `msg.contains("not found")` would match unrelated parquet messages
like "row group statistics not found" or "page index not found" and
silently swallow them as missing-file (returning empty results instead
of failing). Restrict to recognised NotFound prefixes from object_store /
S3 / FS error formats.
4. Multi-line regex for native parquet errors in CometExecIterator. Native
parquet errors with embedded newlines (e.g. footer hex dumps) would slip
past the single-line `^Parquet error: .*$` and surface as bare
CometNativeException. Add `(?s)` so `.` spans newlines.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
The post-review fixes added/modified scaladoc that broke spotless line-length rules. Apply spotless:apply across the three touched files. Verified with test-compile. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
7 tasks
Adds a TODO note linking the decline-and-fallback to apache/datafusion#22366. Lets a future maintainer find the upstream fix when it lands and remove the workaround. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…log replay Closes the P1 credential-asymmetry gap carried from apache#3932 (commit 461fa4f). Previously the kernel-rs log-replay path's DeltaStorageConfig only honored explicit static keys (`fs.s3a.access.key` / `fs.s3a.secret.key` / `fs.s3a.session.token`) set in core-site.xml. Users running under SimpleAWSCredentialsProvider / TemporaryAWSCredentialsProvider / AssumedRoleCredentialProvider / IAMInstanceCredentialsProvider would see data-file reads authenticate (those go through Comet's existing native `build_credential_provider`) but log replay fail. Resolution happens Scala-side via reflection against `org.apache.hadoop.fs.s3a.S3AUtils.createAWSCredentialProviderList` -- the same Hadoop credential machinery Spark uses everywhere else. The resolved (access_key, secret_key, session_token) tuple is stuffed into the `storageOptions` map under the standard Hadoop keys before the JNI call. Reflective because hadoop-aws is an optional dep; absence falls through to static-only behavior (any user without S3 stays unaffected). Architecture note: an in-crate cherry-pick of 461fa4f wasn't viable here because the JNI lives in `contrib/delta/native/` -- a standalone Cargo crate that deliberately doesn't depend on core (to keep the arrow-57 / arrow-58 split clean). The Scala-side approach has the same correctness properties and avoids the crate boundary entirely. Method handles cached via @volatile Option[Option[Binding]] -- the augment path runs on every Delta scan; resolving the Class + getMethod chain on each call would be a per-scan reflection round-trip just to find the same handles every time. SNAPSHOT resolution: log replay completes in seconds, well within any reasonable credential TTL. Long-running data reads continue to use Comet's refresh-capable native credential provider. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…ad-bearing Expand the comment on the CM-name + checkLatestSchemaOnRead=false guard to explain the specific failure mode (column_mappings from one snapshot vs. parquet physical names from another after a concurrent ALTER TABLE). The guard is conservative but necessary; a future reader of the code shouldn't mistake it for laziness. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…uet field IDs Implements apache#142. Previously declined at DeltaScanRule.scala:271 because the contrib's native path matched parquet columns by name and CM-id mode demands ID-based matching. Comet core's `schema_adapter.rs` already supports field-ID matching via `use_field_id` + `ignore_missing_field_id` flags; this PR wires the Delta contrib through that machinery. Five mechanical changes: 1. Add `parquet.field.id` (Spark's standard StructField metadata key for parquet field IDs) and `delta.columnMapping.id` (Delta's CM-id storage key) as named constants in DeltaReflection. 2. Add `use_field_id` bool to DeltaScanCommon proto (field 17). 3. CometDeltaNativeScan.translateDeltaFieldIdToParquet walks the schema tree recursively (StructType -> nested fields, ArrayType -> element, MapType -> key/value) copying `delta.columnMapping.id` to `parquet.field.id` on every StructField. Spark's `ParquetUtils.hasFieldId` -- which schema2Proto and serializeDataType's StructType arm read -- looks at `parquet.field.id`, so this is what makes the field IDs actually reach the proto. 4. In `convert()`, detect CM-id mode from snapshot metadata and apply the translator to data_schema / required_schema / partition_schema before calling `schema2Proto`. Set `commonBuilder.setUseFieldId(true)` so the native dispatcher passes `use_field_id=true` to `init_datasource_exec`. 5. native/core/src/execution/planner/contrib_delta_scan.rs uses `common.use_field_id` from the proto instead of the hardcoded `false`. The recursive translator handles nested struct / array / map field IDs -- the "complex sub-types" gotcha from earlier CM-name work. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…n-blocking Expand inline comments on the three remaining DeltaScanRule fallback gates (TahoeLogFileIndexWithCloudFetch, __delta_internal_* synthetic columns, CometScanTypeChecker decline) to document why they're correctness-correct as fallback-only paths and to capture the implementation sketches for any future native-perf work. No behavioral change. Each gate was verified in the recent regression to either never fire (cloud-fetch -- OSS Delta doesn't have the class) or fire on a path Spark's reader handles correctly without test failures (synthetic columns, schema type decline). Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…pache#144) Native ExecutionPlan wrapper that appends Delta's `__delta_internal_row_index` (UInt64) and `__delta_internal_is_row_deleted` (Int32) columns to scan output batches. Replaces the decline for these synthetic columns where the surrounding plan asks for them (UPDATE/DELETE/MERGE flows). - `synthetic_columns.rs`: new module with DeltaSyntheticColumnsExec. Same physical-order invariant as DeltaDvFilterExec (one file per partition; parquet emits in file row order). Appends columns via a single sweep over the DV-sorted indexes alongside the batch's row range. - proto: add `emit_row_index` (18) and `emit_is_row_deleted` (19) flags on DeltaScanCommon. - contrib_delta_scan.rs: wire three mutually-exclusive wrap modes -- synthetic exec, DV filter exec, or passthrough. NOT YET WIRED Scala-side: when scan.requiredSchema contains these synthetic column names, CometDeltaNativeScan still needs to (a) strip them from the proto schemas (so the native parquet reader doesn't try to read them) and (b) set the proto emit flags. Until that lands the existing decline gate at DeltaScanRule.scala:331-342 stays active. Native module compiles clean. Full linker validation deferred -- disk-space pressure from concurrent regression run blocked the full link cycle. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…olumns Completes apache#144. CometDeltaNativeScan.convert now: - Detects __delta_internal_row_index / __delta_internal_is_row_deleted in scan.requiredSchema - Verifies they form a contiguous SUFFIX of required_schema (so wrapped DeltaSyntheticColumnsExec's appended-at-end output matches Spark's expected layout); declines otherwise - Strips them from the proto required_schema and data_schema so the parquet reader doesn't look for columns that aren't on disk - Filters them out of projection_vector (their -1 sentinel would have been out-of-bounds for native usize) - Sets the proto emit_row_index / emit_is_row_deleted flags so the dispatcher wraps the parquet scan in DeltaSyntheticColumnsExec to append them back DeltaScanRule: removed the decline gate at scanWithMappedSchema. Removed the belt-and-suspenders guard in CometDeltaNativeScan now that the convert path handles synthetics rather than falling back. Combined with the native exec from 2cb9188, this lets UPDATE/DELETE/MERGE flows that materialise the DV deletion flag stay on the native path instead of falling back to Spark's Delta reader. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
The decline at DeltaScanRule for "$ScanImpl does not support Parquet field ID matching" was a separate gate from CM-id mode, fired when the user explicitly set spark.sql.parquet.fieldId.read.enabled=true AND scan.requiredSchema carried Spark's standard `parquet.field.id` metadata (non-Delta-id path that nevertheless wants field-ID matching). The same native machinery wired for CM-id (apache#142, commit 7ace165) handles this case unchanged -- `serializeDataType`'s StructType arm reads `ParquetUtils.hasFieldId` for nested types and `schema2Proto` does the same for top-level. The only thing needed was setting `use_field_id=true` on the proto. CometDeltaNativeScan.convert now sets `useFieldIdActive` from EITHER CM-id mode OR (Spark's PARQUET_FIELD_ID_READ_ENABLED + hasFieldIds). Gate removed from DeltaScanRule. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…commit_version)
Unblocks the second gate in DeltaScanRule.applyRowTrackingRewrite, which used
to decline native execution when a row-tracking-enabled table HAD no
materialised column names (rowIdPhysical / rowVerPhysical both empty,
meaning Delta expects synthesis from baseRowId + physical row index).
End-to-end wiring:
- scan.rs: extract baseRowId / defaultRowCommitVersion per scan file from
each ScanMetadata batch's underlying RecordBatch
(`fileConstantValues.baseRowId` / `defaultRowCommitVersion` -- not
exposed by kernel's `ScanFile`). Uses an `RawEntryAcc` context struct
because `visit_scan_files` requires `fn` (not `FnMut`), so the per-batch
row-tracking lookup vec lives in the context.
- jni.rs: thread the extracted values into DeltaScanTask proto fields 6/7
(already present, previously hard-None'd).
- proto: add `emit_row_id` (20) and `emit_row_commit_version` (21) flags
on DeltaScanCommon.
- synthetic_columns.rs: extend DeltaSyntheticColumnsExec to emit the two
new columns (row_id = baseRowId + physical_row_index per file,
row_commit_version = defaultRowCommitVersion constant per file). Nullable
Int64 columns; null-valued when the file has no row tracking.
- contrib_delta_scan.rs: force per-file FileGroups when emit_row_id /
emit_row_commit_version is on (the per-partition row offset counter
doesn't reset across files within a FileGroup, so baseRowId arithmetic
requires 1:1 file-to-partition mapping just like the DV case).
- CometDeltaNativeScan: detect row_id / row_commit_version in
scan.requiredSchema, add to synthetic-column suffix check + strip from
proto schemas + projection_vector, set emit flags.
- DeltaScanRule.applyRowTrackingRewrite: stop declining the no-materialised
case; return None (no rewrite needed) so nativeDeltaScan proceeds and
CometDeltaNativeScan.convert sets the synthesis path.
Also unblocks the related field-id-matching gate when
spark.sql.parquet.fieldId.read.enabled is true (commit ee9f9e4) -- the
same use_field_id machinery handles both CM-id and non-CM-id paths.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
With native synthesis of `__delta_internal_is_row_deleted` wired in apache#144, the `outputHasIsRowDeleted` branch of `scanBelowFallsBackForDvs` no longer needs to force a decline. CometDeltaNativeScan.convert detects the column in scan.requiredSchema and routes through DeltaSyntheticColumnsExec to append it -- the surrounding Delta projection that filters on the column runs against the synthesised output without falling back to Spark. Only `batchFallback` (TahoeBatchFileIndex with DV-bearing AddFiles) still forces decline because our native path can't extract DV info from pre-materialised batch indexes -- separate issue. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Delta 4.1 own-test-suite regression failures on CDC reads under
row-tracking-enabled tables (especially the coordinated-commits batch
backfill variant DeltaCDCScalaWithCatalogOwnedBatch2Suite) surfaced as
'Comet Internal Error: Output column count mismatch: expected 14, got 13'.
The scan declared 14 output columns including the per-file
`default_row_commit_version` metadata column, but the native
DeltaSyntheticColumnsExec only knew about `base_row_id`, so the column
was dropped on the way through and the upstream operator saw N-1.
This commit:
* Adds `default_row_commit_version` to JVM-side `syntheticNames` and
`fixedMetadataNames` sets in CometDeltaNativeScan so it's included
in `metadataColumnNamesEmitted` and the proto.
* Adds the matching `META_DEFAULT_ROW_COMMIT_VERSION` constant, field
schema, and emit branch in `synthetic_columns.rs`; extends
`TaskMetadata` with the new field and wires it from
`core_glue.rs` (the proto already carried the value via
`task.default_row_commit_version`).
Removes the `DeltaSyntheticColumnsExec: unknown metadata column name
'default_row_commit_version'` failure path. A second off-by-one in the
same suite remains under investigation (separate column drop).
Also bundled:
* `CometScanWithPlanData.perPartitionFilePaths` trait method +
`operators.scala` union-path collector now matches the trait
instead of just `CometNativeScanExec`, so MERGE/UPDATE/DELETE
flows that embed a Delta scan in a parent native tree no longer
see empty `input_file_name()` -> `DELTA_FILE_TO_OVERWRITE_NOT_FOUND`.
* `CometExecRDD.compute` populates `InputFileBlockHolder` whenever
`partition.filePaths.nonEmpty` (not only the single-file case),
matching PR apache#3932's approach.
* Re-enables the two MERGE reproducers in `CometDeltaSpecialCharFilenameSuite`.
* Adds `DeltaHiveTest.scala` Comet-wiring hunk to `4.1.0.diff`
(the piece PR apache#3932's 4.0.0.diff had that ours was missing).
* New `CometDeltaRegressionReproSuite` (one repro per root-cause
cluster identified in the 4.1 regression) and a `CometDeltaCdcSuite`.
* New `.github/workflows/delta_regression_test.yml` workflow that
invokes `contrib/delta/dev/run-regression.sh` across Delta 3.3.2 /
4.0.0 / 4.1.0 with smoke -> full gating.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…etic schema augmentation Follow-up to the metadata-column fix in 97c953a. The `default_row_commit_version` column was added to `metadataColumnNamesEmitted` and to the native synthetic-column emit branches, but `isExtraSyntheticName` (which augments `requiredSchemaFields` with metadata cols not in `scan.requiredSchema`) was missing it. The resulting `requiredSchemaForProto` was one column short, so `finalOutputIndices` was sized 13 instead of 14 and the native `ProjectionExec` tried to read past the schema end. Adds `default_row_commit_version` to the explicit name list alongside `base_row_id`. Drops DeltaCDCScalaWithCatalogOwnedBatch2Suite from 6 -> 5 failures (the "CDC read's commit timestamps are correct under different timezones" test now passes). Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…d data length
When the Delta scan's `relation.dataSchema` includes a synthetic column
(e.g. `__delta_internal_is_row_deleted` from Delta's DV rewrite for
CDC reads of row-tracking-enabled tables), the JVM-side
`projectionVector` partition-tail indexes were computed off
`fileDataSchemaFields.length` (un-stripped, includes the synthetic),
but native receives `dataSchemaForProtoStripped` (synthetic removed
when `needsSyntheticEmit`). The partition tail then points one past
the end of the native data+partition schema, panicking
`ProjectionExprs::from_indices` with
"index out of bounds: the len is N but the index is N".
Compute partition-tail indexes from the non-synthetic data length
instead, mirroring what native sees. Also rewrite the
data-or-partition lookup in `requiredIndexes` to walk the
synthetic-filtered data field map so each entry points to the
correct native-side position.
Drops DeltaCDCScalaWithCatalogOwnedBatch2Suite from 5 -> 4 failures
("aggregating non-numeric cdc data columns" now passes). The
remaining 4 are "Results do not match" assertions (data correctness,
not native crashes) -- separate investigation.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
CDC "delete events" / "insert events" reads attach a non-empty `rowIndexFilters` map to `CdcAddFileIndex` / `TahoeRemoveFileIndex`. This flips the DV bitmap meaning: native batch reads filter OUT the rows in the bitmap, but CDC needs the rows IN the bitmap (the rows being newly deleted/inserted). Our native scan only implements the batch semantics, so it returned the wrong rows -- e.g. for a DELETE "id > 20" on a file [20-24] with DV cardinality 4, the native scan emitted the non-DV'd row (id=20) instead of the DV'd rows (21-24). Add `DeltaReflection.hasInvertedRowIndexFilters` reflection helper and gate `DeltaScanRule.nativeDeltaScan` to decline when set. Spark's Delta reader then handles these correctly. Result: DeltaCDCScalaWithCatalogOwnedBatch2Suite drops from 4 -> 0 failures (55/55 pass). Parent DeltaCDCScalaSuite still 55/55, DeltaCDCSQLSuite still 54/54, contrib local Scala suites still 62 succeeded / 0 failed / 1 canceled (pre-existing). Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…ata-skipping
`refreshedSnapshotFiles` called `snapshot.filesForScan(Nil, false)` --
passing an empty filter list, so the refreshed file list was the FULL
table. This bypassed Delta's planning-time partition pruning and
stats-based data-skipping: for a PreparedDeltaFileIndex on a partitioned
table, our scan would read every file regardless of the predicate.
Surfaced as StatsCollectionSuite "gather stats" asserting
`recordsScanned(df.where("id = 1")) == 1` but Comet's native scan
returning 9 (all rows from all files).
Thread the scan's `partitionFilters ++ dataFilters` through
`extractBatchAddFiles` to `refreshedSnapshotFiles`, then to
`snapshot.filesForScan(filters, false)`. Delta evaluates partition
filters against partitionValues and data filters against AddFile
stats, returning only the surviving files.
Result: StatsCollectionSuite 65/67 -> 65 succeeded with "gather stats"
+ 1 other test now passing (was 4 failures, down to 2 -- the remaining
two read commit JSONs via LogicalRDD, separate code path).
DeltaCDCScalaWithCatalogOwnedBatch2Suite stays at 55/55; contrib local
suites stay at 62/62.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Mirrors MergeIntoSchemaEvolutionBaseExistingColumnSQLPathBasedCDCOnDVsPredPushOffSuite's "upcast int source type into long target" failure shape: target written with DV + CDC enabled, MERGE INSERT + UPDATE *, read back. Upstream returns 0 rows (expected 4) because the post-MERGE read goes through our scan applying a synthetic `__delta_internal_is_row_deleted` filter, and something in the scan/Filter interaction drops all rows. This local reproducer sets the equivalent session configs (persistentDeletionVectors.enabled for merge, useMetadataRowIndex=false, enableChangeDataFeed default, enableDeletionVectors default) and performs the same operations, but does NOT yet fail. The Delta test harness must set up additional state we're not mirroring. Committed as-is so the repro is in place for the next iteration. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Spark wraps file-source partition columns and per-batch constants in
`ConstantColumnVector`. Comet's `NativeUtil.exportBatch` and
`Utils.getBatchFieldVectors` previously threw
`Comet execution only takes Arrow Arrays, but got class
ConstantColumnVector` whenever such a batch reached a Comet operator --
notably OPTIMIZE on a Delta table with DVs, where the FileFormatWriter
threads a constant column through Comet's columnar pipeline.
Add `Utils.materializeConstantColumnVector` that allocates a fresh
Arrow `FieldVector` sized to the batch's row count and pre-fills it
with the constant value (or all-nulls when `isNullAt(0)`). Handles
the primitive types the OPTIMIZE / partition-column shapes surface:
Boolean, Byte, Short, Int, Long, Float, Double, Date, Timestamp
(MICROSECOND/UTC), String, Binary, Decimal128, Null. Throws for
unsupported types so we surface gaps loudly.
Wire it into both export paths so any Comet operator can ingest a
batch containing constants:
- `NativeUtil.exportBatch`: materialise inline, then `Data.exportVector`
through the existing C-Data path.
- `Utils.getBatchFieldVectors` (used by `serializeBatches`):
materialise into the field-vectors list ahead of the Arrow IPC
write.
Result: OptimizeCompactionSQLSuite "optimize command with DVs" no
longer dies at NativeUtil.exportBatch with the ConstantColumnVector
error. The test now hits a separate failure (`NoSuchElementException`
during a later assertion that depends on our DV materialisation),
which is a distinct issue tracked separately.
No regressions: DeltaCDCScalaWithCatalogOwnedBatch2Suite 55/55,
contrib local Scala suites 63 succeeded / 0 failed / 1 canceled
(pre-existing), other 23 optimize tests still pass.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…re read `DeltaReflection.extractTableRoot` returns the double-URL-encoded URI form (`pathToSingleEncodedUri` -> `path.toUri.toString`) so the NATIVE side, which reads files via RawLocalFileSystem.pathToFile (which uses the URI raw path verbatim), lands on the literal on-disk filename. The JVM-side path was passing that same double-encoded string straight into `new Path(tableRoot)` and on to `HadoopFileSystemDVStore.read`, which resolves the DV file via Hadoop FS. Hadoop's raw path encoding ended up doubly-encoded relative to the on-disk filename, so the DV store opened a non-existent file and threw NPE on the null result. `materializeDeletedRowIndexes` then returned `None`, our DeltaScanRule treated that as a fatal DV-failure and declined, and the scan fell back to Spark+Delta -- which on OPTIMIZE under DVs leads to a NoSuchElementException downstream because a subsequent commit wasn't written. Fix: URL-decode `tableRoot` once before passing to `new Path(...)`. Now Hadoop's URI form re-encodes back to the single-encoded form, RawLocalFileSystem.pathToFile decodes once, and the DV store hits the right on-disk file. Result: `OptimizeCompactionSQLSuite "optimize command with DVs"` now passes (1/1). No regressions: contrib local 63/63, DeltaCDCScalaWithCatalogOwnedBatch2Suite still 55/55. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…=false Delta's `deletionVectors.useMetadataRowIndex` is internal+default=true. The DeletionVectorsTestUtils.PredicatePushdownDisabled mixin flips it to false, switching Delta from parquet's `row_index` virtual column to a synthetic `__delta_internal_is_row_deleted` column above the scan. In that mode, post-MERGE-with-persistentDV reads return wrong row counts -- the original target files emit zero rows through our scan path. The DV bitmap reads correctly, the synthetic emit logic mirrors Delta's, but the batches downstream of the synthetic exec come back empty (root cause is in the interaction between the Delta-side filter rewrite + our per-file-group native plan + the DV synthetic emit; needs deeper file-level investigation we can't land here). Pragmatic fix: when `useMetadataRowIndex=false` AND `__delta_internal_is_row_deleted` is in scan.requiredSchema AND any AddFile carries a DV, decline. Spark+Delta then handles the read correctly. Production impact bounded: `useMetadataRowIndex` is internal+default true, so production reads use parquet's `row_index` path (which our scan handles correctly). The fallback only triggers for tests / users who explicitly disable the conf. Result: MergeIntoSchemaEvolutionBaseExistingColumnSQLPathBasedCDCOnDVsPredPushOffSuite goes from 33/42 -> 42/42. No regression elsewhere: DeltaCDCScalaWithCatalogOwnedBatch2Suite 55/55, OptimizeCompactionSQLSuite 24/24, StatsCollectionSuite 65/67 (2 unrelated `recompute stats` failures), contrib local Scala 63/63. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…adata.file_path When Spark packs multiple files into one Spark partition AND our scan projects `_metadata.file_path` (the recompute-stats / per-file-aggregate shape), we create multiple DataFusion file_groups (one per file) so DeltaSyntheticColumnsExec emits a 1:1 per-file `file_path` metadata mapping. But Spark consumes only ONE DataFusion partition per Spark partition -- the 2nd+ files' batches are silently dropped, their rows never reach Spark, and any per-file groupBy missing them. Surfaced in StatsCollectionSuite "recompute stats multiple columns and files" / "recompute stats on partitioned table": recompute does `groupBy(_metadata.file_path).agg(min, max, ...)`. With 3 files and 2-file-per-partition packing, one file's rows go missing and its recomputed stats come out null. Fix: tag scans that project `file_path` with `NeedsInputFileNameOption` (same plumbing as the `input_file_name()` path), which propagates through to `oneTaskPerPartition=true` on `CometDeltaNativeScanExec`. Each file then becomes its own Spark partition with its own DataFusion partition (1:1 alignment, no dropped data). Narrow gate: only triggers when `file_path` is in scan.output. Other per-file metadata cols (`base_row_id`, `default_row_commit_version`, row-tracking helpers) commonly appear in unrelated scans and the existing file packing handles them correctly. Result: StatsCollectionSuite 65/67 -> 67/67. No regression: OptimizeCompactionSQLSuite still 24/24, contrib local 64/64, parent CDC 55/55 (separate suite still running, will verify). Also drops a now-redundant local CometDeltaRegressionReproSuite stats test that reproduces the path locally; it passes today because our withDeltaTable doesn't trigger the multi-file packing that the upstream test exercises, but the upstream test itself is the authoritative reproducer. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Refactor `extract_storage_config` in the JNI bridge to delegate to a
pure `delta_storage_config_from_map`, making the kernel/Hadoop key
matrix testable in isolation. Removed the now-unused `map_get_string`
helper.
Add two test surfaces that document the known gaps so closing them
forces an obvious test update:
* `jni::tests::extract_storage_config_matrix` -- positive coverage
for kernel-style keys, Hadoop-style fallbacks, and force-path-style.
* `jni::tests::extract_storage_config_known_gaps` -- negative
assertions for GCS (`gcp_*`), per-bucket S3, and Hadoop Azure
account/OAuth keys -- they pass today because the gap exists, and
flip to failures the moment we wire those keys through.
* `CometDeltaCredentialAuditSuite` (Scala) -- pairs the native gaps
with their JVM-side companions:
- asserts s3/s3a/gs/wasb/wasbs prefixes extract the expected keys;
- GAP: abfs/abfss schemes drop `fs.azure.*` keys because
`NativeConfig.objectStoreConfigPrefixes` only registers
`fs.abfs.` / `fs.abfss.` for them, missing OAuth/MSI creds that
Hadoop users have historically set under `fs.azure.*`;
- GAP: `augmentWithResolvedAwsCredentials` doesn't synthesize
per-bucket `fs.s3a.bucket.<name>.*` keys after credential-chain
resolution.
No behavior change for currently-supported credential paths; the
audit only adds regression coverage.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
CometDeltaScanConfAuditSuite locks in the contract for every SQLConf /
table property that can change scan output. Each conf gets either a
CONTRACT_NATIVE assertion (engages + matches vanilla) or a
CONTRACT_FALLBACK assertion (declines + matches vanilla):
* DV: useMetadataRowIndex=false declines on DV-bearing reads
* DV: useMetadataRowIndex=true (default) engages native
* DV: MERGE_USE_PERSISTENT_DELETION_VECTORS=true read-side engages
* columnMapping.mode in {none,name,id} engages native
* rowTracking enabled does not disengage
* Comet kill switch (spark.comet.scan.deltaNative.enabled=false) forces
fallback
Plus one GAP marker:
* path-based readChangeFeed routes through DeltaCDFRelation which our
rule doesn't intercept -- table-API CDC reads (covered by
CometDeltaCdcSuite) do engage. Flip this when the rule learns
DeltaCDFRelation.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
CometDeltaMetadataColumnAuditSuite projects every recognised _metadata.* / row-tracking column individually and asserts the values match vanilla Spark. Past bugs that motivated this: * silent nulls / row drops when projecting _metadata.file_path on multi-file partitions (commit 8c3cf6c) * column-count off-by-one for default_row_commit_version emit (commit 97c953a) * synthetic-column stripped-length bug in projection_vector (commit 208f083) Covers: * _metadata.file_path / file_name / file_size / file_modification_time * _metadata.file_block_start / file_block_length * _metadata.row_index (row-tracking-enabled) * input_file_name() * _metadata.row_id (unmaterialised) * _metadata.row_commit_version * multi-metadata projection on RT + DV (the off-by-one repro shape) Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
CometDeltaTypeRoundTripAuditSuite covers Spark/Delta type round-trip through the native scan vs vanilla: Positive matrix: * primitives at edge values (TINYINT/SMALLINT/INT/BIGINT extremes, FLOAT/DOUBLE +/-Infinity, NULLs) * DECIMAL at (5,0) / (10,2) / (18,6) / (38,18) * DATE / TIMESTAMP / TIMESTAMP_NTZ * BINARY (empty, single byte, hex, long) * nested STRUCT<>, ARRAY<>, MAP<> * CHAR(N) / VARCHAR(N) Discovered gaps (assert-and-document; promote when fixed): * column mapping mode=name + complex types triggers AQE AdaptiveSparkPlanExec.setLogicalLinkForNewQueryStage assertion -- likely missing/mismatched logical link on the serialized native scan node * column mapping mode=id post-RENAME COLUMN triggers the same AQE assertion -- same root cause suspected * VARIANT: probe + tolerant assertion (engages OR declines, but if it engages results must match vanilla -- guards against silent corruption when VARIANT support is added) The CM-mode AQE finding is a follow-up worth its own ticket. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Delta stores partition values as strings in the log; coercion back to the partition column's logical type happens at scan time. Past bugs in this area are silent (wrong epoch, lost precision, double-applied timezone), so we lock in the contract with native-vs-vanilla matches. Matrix: * DATE (epoch / current / max + partition-pruning filter) * TIMESTAMP (UTC values, microsecond fractions) * TIMESTAMP_NTZ (no TZ shift) * DECIMAL(18,6) at sign/precision extremes * BIGINT (Long.MinValue / MaxValue) * STRING with spaces, percent-encoded literals, empty, NULL * BOOLEAN * Multi-column (dt, region) with IsNull pruning * Session-TZ swap (write under UTC, read under LA) -- guards against double-applied timezone normalization Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
CometDeltaFilterPushdownAuditSuite locks in correctness for the Spark filter shapes that flow into the native scan (whether pushed down or post-filtered, the result rows must match vanilla): Positive matrix: * EqualTo, EqualNullSafe (=, <=>) * GreaterThan/LessThan/Ge/Le * IsNull / IsNotNull on data column * In / NotIn (incl. InSet-sized list) * StringStartsWith / EndsWith / Contains * AND/OR/NOT combinations * Partition-column filters (EqualTo, In, GreaterThan) * Nested struct field reference (s.a, s.b) * Cast-coerced literal (INT literal vs BIGINT column) Discovered gap (assert-and-document): * DV-bearing table + range filter `id > 10 AND id < 25` -- native returns 19..24 while vanilla returns 11..24. Suspect stats-based data-skipping interaction with DV-bearing files reporting min/max that exclude DV'd rows. Filed as a follow-up; the GAP-marker test asserts native row count is LESS than vanilla and flips to a failure (with promotion guidance) once the mis-pruning is fixed. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Audit 5 surfaced a silent correctness bug: a range filter over a DV-bearing single-file table (rows 0..29, DELETE id%4=0) returned only 19..24 where vanilla Delta returns 11..24. Root cause: when the synthetic `__delta_internal_is_row_deleted` column is emitted, DeltaSyntheticColumnsStream walks a running `current_row_offset` counter to look up each row's membership in the file's `deleted_row_indexes` (the materialized DV). That counter assumes the parquet reader returns EVERY row in physical order. But the same scan also pushes `data_filters` down to the parquet reader, which skips non-matching rows -- so `current_row_offset` no longer tracks the true parquet row_index, and the DV bitmap gets applied to the wrong stream positions (the first N rows of the post-filter stream rather than the DV'd physical rows). The outer Spark Filter then drops those wrongly flagged rows. Fix: suppress data-filter pushdown to parquet when emit_is_row_deleted is set (core_glue.rs). Spark's outer Filter still applies the predicates correctly; we only forgo parquet-level pruning for this case. Partition filters are unaffected (they prune file groups earlier). DV filtering without the synthetic column (the common read path) still pushes filters down normally. Promotes the audit GAP-marker test in CometDeltaFilterPushdownAuditSuite to a positive `assertDeltaNativeMatches`, plus two extra filters that straddle the DV'd indexes. Verified no regression across the features/CDC/metadata/scan-conf suites (28 tests). Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Audit 3 surfaced an INTERNAL_ERROR on column-mapping tables: any plan with a shuffle above the native Delta scan (orderBy / join / aggregate) hit AdaptiveSparkPlanExec.setLogicalLinkForNewQueryStage's `assert(link.isDefined)`. Originally thought to be complex-types or post-rename specific; isolated to: CM-mode + ANY exchange. Root cause: nativeDeltaScan propagated the logical link only from `op.wrapped` (the wrapped FileSourceScanExec). In column-mapping mode Delta builds that scan WITHOUT a logicalLink even though the surrounding CometScanExec (`op`) has one. So the contrib's exec inherited no link, and the built-in CometExecRule "set up logical links" pass -- which re-derives a CometExec's link from `originalPlan.logicalLink` (== `op.wrapped.logicalLink`) and UNSETS the tag when empty -- guaranteed the exec ended up linkless. When AQE then wrapped it in a query stage at the exchange boundary, the assertion fired. Fix: seed the link as `op.wrapped.logicalLink.orElse(op.logicalLink)`, set it on BOTH op.wrapped (so the downstream CometExecRule pass agrees) and the exec. JVM-only change; no native rebuild required. Promotes the two audit GAP markers in CometDeltaTypeRoundTripAuditSuite to positive round-trip assertions and adds a minimal simple-types + orderBy/aggregate regression that isolates the root cause. Verified no regression across column-mapping / scan-conf / native / features suites (34 tests). Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…version Delta own-suite regression (DeltaTimeTravelSuite / DeltaHistoryManager*): a versionAsOf/timestampAsOf read returned the LATEST version's data (e.g. v0 returned 20 rows instead of 10). Root cause: DeltaReflection.refreshedSnapshotFiles (used by extractBatchAddFiles for PreparedDeltaFileIndex) unconditionally called deltaLog.update() to refresh the snapshot to HEAD before filesForScan. That refresh exists to pick up fresh deletion-vector descriptors for consecutive DELETEs that reuse a cached FileIndex -- but for a time-travel query it discards the pinned version and returns current files. Fix: gate the head-refresh on PreparedDeltaFileIndex.versionScanned. When it's Some(v) (time travel), use the pinned preparedScan.scannedSnapshot instead -- historical versions are immutable so the DV-staleness reason to refresh does not apply. Non-time-travel reads keep the head refresh. Reproduced by CometDeltaTimeTravelReproSuite (versionAsOf=0 must return v0's rows, not head); fails before / passes after. JVM-only change. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…TT gate Follow-up to 7c59938. Rather than gate the head-refresh on versionScanned (a band-aid), drop the head-refresh entirely: always re-query `preparedScan.scannedSnapshot` -- the snapshot the scan was prepared against, which is exactly what vanilla Spark+Delta reads (PreparedDeltaFileIndex extends TahoeFileIndexWithSnapshotDescriptor over it). The earlier `deltaLog.update()` to head (commit 830c979) was meant to pick up DV descriptors written after a cached FileIndex was built, but refreshing to head makes Comet read a DIFFERENT snapshot than vanilla: it diverges on the consecutive-DELETE / DeltaLog-cache case (that commit's own point apache#2 acknowledges vanilla returns the "stale" count) and it returned the LATEST version for time-travel reads. Re-querying the prepared snapshot via filesForScan still picks up that snapshot's freshest DV descriptors, matches vanilla in every case, and needs no time-travel special-casing. Renamed refreshedSnapshotFiles -> preparedSnapshotFiles. Verified: 24 contrib tests pass incl. the DV double-DELETE (CometDeltaColumnMapping Suite) the head-refresh was added for, and CometDeltaTimeTravelRepro Suite. JVM-only. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…ptiveBroadcastExec)
Delta own-suite regression F1: MERGE into a partitioned table (and any
broadcast join whose key is the partition column) over a native Delta
scan threw "CometSubqueryAdaptiveBroadcastExec ... does not support the
execute() code path."
Root cause (two layers, both confirmed by repro):
1. The AQE DPP subquery arrives as an unexecutable placeholder:
CometExecRule wraps Spark's SubqueryAdaptiveBroadcastExec into
CometSubqueryAdaptiveBroadcastExec, which
CometPlanAdaptiveDynamicPruningFilters is meant to rewrite to an
executable (Comet)SubqueryBroadcastExec.
2. That rewrite is ORPHANED for a scan buried inside a native block:
transformUp converts the scan, but the converted copy is dropped
when the parent CometNativeExec is rebuilt (TreeNode.makeCopy can't
carry @transient fields -- the apache#3510 issue; verified the converted
node is not reachable in the rule's output). So the live scan keeps
the placeholder, and Comet's native-scan subquery lifecycle
(CometLeafExec.ensureSubqueriesResolved -> waitForSubqueries)
executes it -> throw.
Fix (self-contained in CometDeltaNativeScanExec; no base-Comet change):
* Override ensureSubqueriesResolved to resolve only executable DPP
subqueries and skip adaptive-broadcast placeholders (so they don't
execute and crash).
* In applyDppFilters, convert a surviving placeholder on the fly to an
executable SubqueryBroadcastExec (its `child` is the already-
materialized broadcast) and resolve THAT to recover pruning values,
falling back to scanning all tasks on any failure.
Results are always correct (the surrounding join filters regardless of
pruning). Real partition pruning applies when the scan recomputes
partitions at execution. Reproduced + guarded by CometDeltaDppReproSuite
(forces DPP via dynamicPartitionPruning.useStats=false).
Note: when the scan executes inside a parent native block, the parent's
findAllPlanData reads a planning-time perPartitionData snapshot (memoized
before the broadcast is ready, to keep numPartitions stable), so DPP
pruning is not yet applied in that path -- the scan reads all partitions
(correct, unpruned). Closing that requires emptying DPP-pruned partitions
at execution while keeping the partition count fixed; tracked separately.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Durable record of deliberate tradeoffs (DPP pruning in native-block case, credential plumbing gaps, path-based CDF decline, VARIANT, decline gates) and pending regression failure families (row-tracking materialization + untriaged), so each can be opened as a GitHub issue once the work merges. Cross-references the guarding GAP-marker tests and the fixes landed so far. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…ative-block) case Builds on the no-crash fix (64cd878) to deliver actual partition pruning when a DPP broadcast join / MERGE targets a partitioned Delta table and the scan is buried inside a parent native block. Two root causes solved: 1. Orphaned rewrite (apache#3510). CometPlanAdaptiveDynamicPruningFilters rewrites the CometSubqueryAdaptiveBroadcastExec placeholder into an executable CometSubqueryBroadcastExec (with broadcast reuse), but the rewritten scan COPY was dropped when transformUp rebuilt the enclosing native block (TreeNode.makeCopy can't carry @transient fields -- verified the converted node was unreachable in the rule output). Fix: CometDeltaNativeScanExec implements `withDynamicPruningFilters` to install the rewrite IN PLACE via a transient side-channel (`dppFiltersOverride`) and return `this`, so it lands on the SAME instance that executes. The case-class `dppFilters` field is untouched, so node equality/canonicalization is unaffected. Added a minimal `dynamicPruningFilters`/`withDynamicPruningFilters` hook to the `CometScanWithPlanData` trait (default no-op) + a rule case; base scans (CometNativeScanExec/Iceberg) return Nil and are unaffected. 2. Fixed partition count vs runtime pruning. The native scan's partition count is pinned at planning. Fix: group ALL tasks once (`taskGroups = packTasks(allTasks)`) so the count is stable, then prune tasks WITHIN each group at execution -- a fully-pruned group becomes an empty DeltaScan (0 rows) but its partition slot remains. `perPartitionData` is recomputed (not memoized) so a parent block's findAllPlanData sees pruned task lists after the broadcast materializes; `numPartitions` reads `taskGroups.length` so counting never triggers broadcast resolution. Residual safety net: if the rule didn't run, ensureSubqueriesResolved / applyDppFilters skip the unexecutable placeholder and read all partitions (correct, unpruned) instead of crashing. CometDeltaDppReproSuite now asserts the fact scan reads ~120 of 2000 rows (real pruning), not just correctness. Verified no regression across 56 contrib tests (features/CDC/metadata/filter/column-mapping/native/partition/DPP). Updated docs/08-known-limitations.md A1 -> FIXED. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Confirmed, faithful local reproductions for every still-failing Delta 4.1 own-suite family after the F1/F2/apache#198 fixes. Triage collapsed the ~70 original failures into three remaining root causes (the rest are F1/F2, fixed): * F3 row tracking (dominant): native scan synthesizes row IDs from base_row_id+row_index instead of reading the materialized stable columns (_row-id-col-*/_row-commit-version-col-*), so IDs change across any rewrite. Covers all rowid/RowTracking{Merge,Delete,Compaction, ReadWrite}Suite failures (z-order, auto-compact, MERGE row-id stability, DELETE-with-DV-disabled, optimized writes, materialized/ conflicting columns). Repro: row IDs change across OPTIMIZE (v_0: 1->141). * F4 protobuf recursion: a WHERE of ~101 AND'd predicates builds a deep boolean expression that exceeds protobuf's recursion limit (100) when serialized to the native plan -- InvalidProtocolBufferException "too many levels of nesting". From DataSkippingDeltaTests. * F6 corrupted-file error compatibility (SC-8810): a 0-byte data file makes Comet's native reader throw "Requested range was invalid" instead of Spark's FAILED_READ_FILE. CometDeltaPendingReproSuite holds one minimal repro per family, each asserting the correct behavior and marked `ignore` so CI stays green (3 ignored, 0 failed); un-`ignore` to drive each fix. docs/08-known-limitations updated with B3 triage + new B5 (F4) and B6 (F6) entries. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…rquet
Delta own-suite regression F3 (dominant remaining family -- all
rowid/RowTracking{Merge,Delete,Compaction,ReadWrite}Suite failures:
z-order, auto-compact, MERGE/DELETE row-id stability, optimized writes,
materialized columns).
Root cause: when a file is rewritten (OPTIMIZE/z-order/compaction/MERGE/
UPDATE) on a row-tracking table, Delta persists stable row IDs / commit
versions into real parquet columns `_row-id-col-<uuid>` /
`_row-commit-version-col-<uuid>`. The Spark plan reads
`coalesce(_metadata.row_id, base_row_id + row_index)`. The native scan
classified those names as synthetic and synthesised base_row_id+row_index
instead of reading the persisted values -- so after any rewrite the row IDs
changed (new file => new base+index) rather than staying stable.
Fix (CometDeltaNativeScan.convert): treat `_row-id-col-*` /
`_row-commit-version-col-*` as REAL parquet columns --
* add them to the file data schema (materializedRowTrackingFields), read
by name (null for files that don't carry them);
* remove them from every synthetic classification (isSynthetic,
isExtraSyntheticName, metadataColumnNamesEmitted, and the
projection-vector isSyntheticFieldName).
The downstream coalesce then uses the persisted stable value when present
and falls back to base+index only when null. base_row_id / row_index /
default_row_commit_version stay synthesised; filter pushdown on the
materialized columns stays conservatively disabled.
Guard: CometDeltaRowTrackingMaterializedSuite (row IDs stable across
OPTIMIZE and UPDATE; materialised row_commit_version matches vanilla). F3
repro moved out of CometDeltaPendingReproSuite (F4/F6 remain). Verified no
regression across 55 contrib tests (features/CDC/metadata/column-mapping/
native/filter/scan-conf). Full RowTracking* Delta-suite verification pending
the next full regression re-run (per "don't re-run until fixes land").
docs/08-known-limitations B3 -> FIXED; B4 row-tracking families addressed.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…rsion limit
CORE Comet change (not contrib-only).
Delta own-suite regression F4 (DataSkippingDeltaTests "remove redundant
stats column references"): a WHERE with ~200 conjuncts builds a left-deep
And chain. CometFilter serializes it as a left-deep BinaryExpr proto; when
the serialized plan is re-parsed on the JVM
(CometNativeExec.findShuffleScanIndices -> Operator.parseFrom) it exceeds
protobuf's default 100-level recursion limit ("Protocol message had too
many levels of nesting"). The Rust prost decoder is subject to the same
limit.
Fix: balance associative And/Or chains at serialization time so the proto
is O(log n) deep instead of O(n).
* QueryPlanSerde.createBalancedBinaryExpr + flattenAssociative (new
helpers next to createBinaryExpr).
* CometAnd / CometOr flatten the chain and emit a balanced BinaryExpr
tree instead of the natural left-deep one.
Comet evaluates And/Or vectorially (both sides always evaluated, no
row-level short-circuit), so rebalancing the associative chain is
semantically identical -- it only changes the proto shape. Fixes both the
JVM and Rust parse paths (vs only raising a JVM parse limit). The contrib
already balanced the scan PREDICATE for the same reason; this extends it to
the Filter operator's condition.
Verified: F4 repro passes (now a guard in CometDeltaPendingReproSuite); no
regression in base CometExpressionSuite (123 tests) or contrib
filter/feature/partition suites (27 tests). docs/08-known-limitations B5 ->
FIXED. Removed the F4-DIAG instrumentation from CometDeltaNativeScan.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
CORE Comet change (not contrib-only). Delta own-suite regression F6 (DeltaSuite "SC-8810: skipping deleted file still throws on corrupted file"): a 0-byte/corrupted data file makes the native reader fail in the object_store layer (before parquet parsing) with "External: Generic LocalFileSystem error: Requested range was invalid". CometExecIterator only wrapped messages starting with "Parquet error:" into Spark's FAILED_READ_FILE.NO_HINT, so this object-store error surfaced as a bare CometNativeException and the test's message assertion failed. Fix: CometExecIterator.isFileReadError now also recognises object-store read failures -- "Requested range was invalid" (truncated/empty file), "Object at location ... not found", and the generic "Generic <Store> error:" object_store format (LocalFileSystem/S3/GCS/...) -- and wraps them via ShimSparkErrorConverter.wrapNativeParquetError, matching the FAILED_READ_FILE.NO_HINT error Spark's own reader produces. Signatures are file/object-store specific, so non-file native errors aren't mis-wrapped. Also renamed CometDeltaPendingReproSuite -> CometDeltaEdgeCaseRegressionSuite (all its repros -- F4, F6 -- are now fixed and serve as passing guards). Verified: F6 + F4 repros pass; no regression across contrib native/feature/ edge-case suites (22 tests); base SparkErrorConverterSuite unaffected. docs/08-known-limitations B6 -> FIXED. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
From the fresh-context branch review. Two blocking CORE findings + mediums/lows: H1 (CORE, CometScanRule): the new FS-scheme allowlist ran on every V1 HadoopFsRelation native scan and excluded hdfs + custom libhdfs schemes, disabling native scans for non-Delta HDFS users and breaking ParquetReadFromFakeHadoopFsSuite (registers scheme `fake` via COMET_LIBHDFS_SCHEMES). Fix: union the configured `spark.hadoop.fs.comet.libhdfs.schemes` into the allowlist (case-insensitive). H2 (contrib, core_glue.rs): data-filter pushdown was suppressed only for emit_is_row_deleted, but row_index and row_id (= base_row_id + row_index, unmaterialised) use the same physical-position running counter. A pushed filter skips rows and decouples the counter -> wrong row_index/row_id (repro: `id >= 50` gave row_id 0..49 for ids 50..99). Fix: suppress pushdown when emit_is_row_deleted || emit_row_index || emit_row_id. Guarded by a new test in CometDeltaRowTrackingMaterializedSuite (verified failing before). M1 (CORE, CometExecIterator): the broad `Generic <Store> error:` match in isFileReadError would mis-wrap the non-file config error "Generic HadoopFileSystem error: Hdfs support is not enabled in this build" as FAILED_READ_FILE, masking it. Dropped the broad match; the F6 case is still covered by the specific "Requested range was invalid" / object-not-found phrasings. M2 (CORE, missing_file_tolerant.rs): IgnoreMissingFileSource.as_any returned the wrapper, hiding ParquetSource from DataFusion downcasts (could disable parquet-specific optimizations). Delegate to inner.as_any() (nothing downcasts to the wrapper; source ops still flow through its trait methods). M3: added a column-mapping id-mode + materialized row-tracking test (exercises name-fallback for the no-field-id materialized columns). L1: gate per-partition pruning on effectiveDppFilters (the rule's in-place rewrite), not raw dppFilters. L2: mark dppFiltersOverride @volatile. L4: materializeConstantColumnVector now handles TimestampNTZType (intervals correctly still throw -- not valid constant/partition columns). L5: fix CometScanWithPlanData scaladoc (Iceberg does NOT use the trait). Verified: 34 contrib tests (row-tracking incl. H2/M3, edge-case F4/F6, features/partition/filter) + base CometExpressionSuite (123) all pass. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…tForSubqueries path) The full-regression re-run surfaced the MergeInto "...isPartitioned: true" tests still crashing with "CometSubqueryAdaptiveBroadcastExec ... does not support the execute() code path" -- even though F1/apache#198 fixed the SELECT-join case. Root cause: the optimizer rule's DPP rewrite is installed in-place via a transient `dppFiltersOverride` var (NOT a constructor field), so it is LOST whenever the plan is COPIED after the rule runs -- which MERGE does (it re-plans the target read internally). The executing scan then reverts to the placeholder-bearing `dppFilters`. F1 only guarded the fused-block resolution path (`ensureSubqueriesResolved`), but a MERGE target read executes the scan as a native-block ROOT via the STANDARD lifecycle (`CometNativeColumnarToRowExec` -> child.executeColumnar() -> SparkPlan.waitForSubqueries()), which executed the placeholder and crashed. Fix: override `waitForSubqueries` on CometDeltaNativeScanExec too, sharing `resolveExecutableDppSubqueries()` with `ensureSubqueriesResolved` -- resolve only executable DPP subqueries, skip adaptive-broadcast placeholders. The native scan has no non-DPP subqueries, so not delegating to super is safe. Result: crash-safe in both resolution paths; pruning still applies when the in-place rewrite survives (SELECT joins), and a MERGE/re-planned scan reads all partitions (correct -- the MERGE join filters -- just unpruned). Added a MERGE-into-partitioned guard to CometDeltaDppReproSuite; existing DPP/row-tracking/feature repros still pass. docs/08-known-limitations A1 updated with the transient-override / waitForSubqueries detail. Verification of the real MergeIntoSuite tests is via the in-progress full regression re-run. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…bsent across concurrent file-groups) Regression introduced by F3 (read materialized `_row-id-col-*` from parquet), surfaced by the full Delta 4.1 run: RowTrackingMergeCommonNameBasedCDCOnSuite "INSERT NOT MATCHED only MERGE" et al. failed -- a row-tracking table read with `_metadata.row_id` after an INSERT/MERGE non-deterministically returned far fewer rows than written (1600-4800 of 6000 across runs). Root cause: the materialized `_row-id-col-<uuid>` column is physically present only in files rewritten by a row-id-preserving op -- ABSENT from freshly appended/inserted files (often absent from every file). F3 reads it as a parquet data column. When one Spark partition packs several such files, core_glue emits one file-group per file (needed for per-file row_index), and reading a column physically absent from some files across the concurrently-executed file-groups non-deterministically drops whole file-groups' rows. Forcing one file per Spark partition reads the full row set correctly -- confirming cross-file-group concurrency is the trigger, not the null-fill value. Fix: CometDeltaNativeScan.createExec sets oneTaskPerPartition=true when the scan reads materialized row-tracking columns, so each such file is its own Spark partition => each native plan is single-file-group => the absent-column null-fill runs without cross-file-group concurrency. Same mechanism already used for input_file_name(). Guard: CometDeltaRowTrackingMergeReproSuite (INSERT-only MERGE; native key set == vanilla, full count). Verified against RowTrackingMergeCommonNameBasedCDCOnSuite (17/17 pass). docs/08-known-limitations B7. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Briefing
This PR lands a native Delta Lake scan for Comet. It supersedes #3932 — the
SPI/registry design discussed there was rejected in favor of the Iceberg-style
contrib pattern this PR uses (typed proto variant + ~40 lines of feature-gated
core touchpoints + standalone
contrib/delta/tree). Default builds areentirely unaware of this code: no SPI lookups, no
ServiceLoaderscans, nocontrib surface at runtime. Only when the
-Pcontrib-deltaMaven profile (andparallel
contrib-deltaCargo feature) is activated do the contrib classesland on the classpath and the reflection bridge resolve.
The integration reads Delta metadata via
delta-kernel-rson the driver,encodes the resolved file list (with column mappings, DV info, partition
values, row-tracking baseRowId) into a typed
OpStruct::DeltaScanproto, andexecutes via DataFusion's parquet reader on each executor.
Status & recent correctness fixes
All Delta 4.1 own-suite regression failure families triaged this round are
fixed and each has a lightweight contrib regression guard (a full Delta
own-suite re-run is in progress to confirm end-to-end); deliberate tradeoffs and any remaining
limitations are tracked in
contrib/delta/docs/08-known-limitations.mdfor post-merge issues.
CometSubqueryAdaptiveBroadcastExec); prunes to required partitions even inside a native blockscannedSnapshot, not head_row-id-col-*/_row-commit-version-col-*from parquet (stable IDs across rewrites)FAILED_READ_FILE.NO_HINTfs.comet.libhdfs.schemesCoverage
Supported, fully native (broad):
DeltaDvFilterExecfilters rows on executors. DV filter is chained AFTER synthetic emission (sorow_indexreflects original file positions) when both are needednameANDidmode.namerewrites logical→physical names in the planner;idtranslates Delta'sdelta.columnMapping.idto parquet'sPARQUET:field_idon every StructField (including nested struct/array/map) so the parquet reader matches by ID_row-id-col-<uuid>column from parquetrow_id = base_row_id + physical_row_indexper file, all synthesised natively —base_row_idis emitted as a per-file Int64 constant fromAddFile.baseRowIdand_row-id-col-<uuid>is emitted as all-NULL so Delta'sGenerateRowIDsProject falls back to the computed expressionscan.requiredSchemaordinal-by-ordinal so the upstreamFilter(__delta_internal_is_row_deleted = 0)binds correctly__delta_internal_row_index/__delta_internal_is_row_deletedfor UPDATE/DELETE/MERGE flows.is_row_deletedis emitted asInt8(matching Delta'sByteType) to avoid DataFusion's interval-propagator panicking onInt32 vs Int8mismatches in stats pushdown_metadata.*virtual columns (file_path / file_name / file_size / file_block_start / file_block_length / file_modification_time) detected fromscan.outputeven when not inscan.requiredSchemafinal_output_indices, native dispatcher wraps with aProjectionExecso downstream operators that bind by ordinal don't silently misread one synthetic as anotherspark.sql.parquet.fieldId.read.enabled=true(same wiring as CM-id)input_file_name()and friends — one-task-per-partition + a per-taskInputFileBlockHolderhook inCometExecRDD+CometDeltaNativeScanExecplumbs per-partition file paths through to the RDDFAILED_READ_FILE.NO_HINTexception wrapping with file pathCometParquetUtilsconfig check_delta_log,_change_data, and_commitsparquet reads via the same scanSimpleAWS/TemporaryAWS/AssumedRole/IAMInstance) resolved Scala-side at planning time so kernel log replay authenticates under the same chain as data reads. Reflective lookup againstS3AUtils.createAWSCredentialProviderList; cachedMethodhandlescheckLatestSchemaOnRead=false— our path is pinned to a single snapshot version viaextractSnapshotVersion(relation)so the Delta-side at-read check doesn't apply to usversionAsOf/timestampAsOf) and snapshot reads — files are resolved from the snapshot the scan was prepared against (preparedScan.scannedSnapshot), exactly what vanilla Spark+Delta reads; re-queryingfilesForScanpicks up the freshest DV descriptors that snapshot carriesfile://Falls back to Spark's reader (with
withInforeason surfaced in explain-fallback):Correctness fallbacks — load-bearing, do not remove:
Shared Comet limits (apply to any native scan, not Delta-specific) — each is its own per-case work in core:
CometParquetUtils.isEncryptionConfigSupportedfake://etc.) —object_storehas no Hadoop FS plugin layer; would need a bridgeCometScanTypeCheckerrejections (ShortTypeunder default config, string collation, variant struct) — each is a Comet-core feature gap, not a Delta-contrib problem. Variant in particular: arrow-rs hasparquet-variantcrates but Comet hasn't integrated them yetExternal:
TahoeLogFileIndexWithCloudFetch— Databricks-proprietary file index, not in OSS Delta. Defensive guard for DBR users onlyWorkaround tracked upstream:
CreateArraywith mismatched element types — caller-side decline for apache/datafusion#22366. Removable once upstream landsUser off-switches:
spark.comet.scan.deltaNative.enabled=false,spark.comet.exec.enabled=falseShape
delta_scan = 117native/proto/src/proto/operator.protospark/.../comet/rules/DeltaIntegration.scalaspark/.../comet/rules/CometScanRule.scalaspark/.../comet/rules/CometExecRule.scalaPlanDataInjector.opStructCasespark/.../sql/comet/operators.scalaCometExecRDD,CometNativeScanExec,CometExecIterator,ShimSparkErrorConverterinput_file_name()andFAILED_READ_FILE.NO_HINTwrapping in any native scan)contrib/delta/native/src/core_glue.rs(compiled into core via#[path]; see "Why the dispatcher file lives in contrib but compiles in core" below)contrib/delta/src/main/scala/...contrib/delta/native/src/*.rsspark/pom.xml,contrib/delta/native/Cargo.toml,native/core/Cargo.tomldev/verify-contrib-delta-gate.shcontrib/delta/dev/run-regression.sh+dev/diffs/delta/4.1.0.diffKey design decisions
Iceberg-style contrib, not SPI. Static helper objects with stable names
(
DeltaScanRule.transformV1IfDelta,CometDeltaNativeScan.MODULE$); a singlereflection bridge in core resolves and caches
Methodhandles once per JVM.No registry, no
ServiceLoader, no extension points beyond what core alreadyexposes. The contrib is just classpath-or-not.
Typed proto, not an envelope.
OpStruct::DeltaScanis a first-classvariant. Avoids the
ContribOp { kind, payload }envelope discussed in #3932;PlanDataInjectorkeys byOpStructCasefor O(1) dispatch.Split-mode plan serialization.
CometDeltaNativeScan.convertemits aDeltaScan proto with the
commonblock only (schemas, table root, filters);each partition's
tasksride in a per-partition byte array viaPlanDataInjectorat execution time. Avoids closure-capturing every file inevery partition.
Native synthetic-column synthesis.
DeltaSyntheticColumnsExec(incontrib/delta/native/src/synthetic_columns.rs) emits the standard fourDelta internals (
__delta_internal_row_indexas Int64,__delta_internal_is_row_deletedas Int8,
row_id,row_commit_version) PLUS Spark_metadata.*virtualcolumns PLUS row-tracking-specific synthetics (
base_row_idper-fileconstant from
AddFile.baseRowId,_row-id-col-<uuid>/_row-commit-version-col-<uuid>as NULL-filled). When emit is on, each file gets its own
FileGroupso theper-file row offset / baseRowId arithmetic is well-defined.
Synthetic-suffix ordering matters. The wrapped exec's output ordering is
checked against
scan.requiredSchemaAND the canonical native emit order. Ifthe synthetic block isn't already in canonical order at the right ordinals,
the proto carries
final_output_indicesand the native dispatcher wraps witha
ProjectionExecto reorder. Without this, an upstreamFilter(__delta_internal_is_row_deleted = 0)binding by ordinal would silentlymisread
row_indexasis_row_deleted(caught and fixed mid-PR; theDV-after-DELETE test bisected the bug to a one-ordinal swap).
DV filter chained after synthetic emission, not mutually exclusive. When
both synthetics and a DV are present, we previously chose one wrapper or the
other — which meant any read that surfaced
_tmp_metadata_row_indexgotNO DV filtering applied. The wrappers are now chained:
parquet →
DeltaSyntheticColumnsExec→DeltaDvFilterExec(skipped whenemit_is_row_deletedis on so UPDATE/DELETE/MERGE writers still see every row).CM-name rename before synthetics. Synthetic columns have fixed names
(never CM-renamed) and are appended AFTER the parquet read; the rename
projection has to apply to the parquet output BEFORE the append so the
length-match check works correctly.
Spark
_metadata.*driven fromscan.output, not justscan.requiredSchema.Delta's PreprocessTableWithDVs strategy can append
_metadata.file_pathtoscan.outputwithout putting it inscan.requiredSchema. The syntheticexec detects these from
scan.outputso the wrapped exec's output schemaincludes them and downstream attribute resolution works.
is_row_deletedis Int8, not Int32. Delta declares the column asByteType. Emitting Int32 trips DataFusion's interval propagator withOnly intervals with the same data type are intersectable, lhs:Int32, rhs:Int8whenever the upstream Filter pushes stats. Caught by the CM + DV combined
coverage test.
InputFileBlockHolderthread-local hook inCometExecRDD.compute.Comet's native scans bypass Spark's
FileScanRDD, so the standardinput_file_name()thread-local would otherwise be empty for any nativescan (not just Delta). One small but load-bearing core change that fixes
both Delta's UPDATE/DELETE/MERGE flows AND the
FAILED_READ_FILE.NO_HINTerror wrapping.
CometDeltaNativeScanExecplumbs its per-partition filepaths through to
CometExecRDDsoInputFileBlockHolder.set(path)firescorrectly.
Read from the prepared snapshot (not head) for PreparedDeltaFileIndex.
preparedScan.filescaches the AddFile list at FileIndex construction time.DeltaReflection.preparedSnapshotFilesre-queriespreparedScan.scannedSnapshot.filesForScan(filters, false).files— the snapshotthe scan was prepared against, which is exactly what vanilla reads — to pick up
the freshest DV descriptors that snapshot carries, falling back to the cached
preparedScan.filesif reflection fails. An earlier revision refreshed to headvia
deltaLog.update(), but that returned current data for a time-travel query(versionAsOf=0 yielded head's rows) and diverged from vanilla on the
consecutive-DELETE / DeltaLog-cache-staleness case; reading
scannedSnapshotmatches vanilla in every case.
Engine cache by
(scheme, authority, DeltaStorageConfig). kernel-rs'sDefaultEngine<TokioBackgroundExecutor>spawns one OS thread per executor.Without caching, hundreds of scans/min was leaking threads faster than tokio
reaped them, tripping
pthread_create EAGAIN~2h into regression. The cachebounds live thread count by table-storage diversity instead of by request
count.
DV filter ordering safeguards.
DeltaDvFilterExectrackscurrent_row_offsetacross batches, which assumes physical-order input.Overrides
maintains_input_order() = [true]andbenefits_from_input_partitioning() = [false]so any future optimizer thatwants to insert a
RepartitionExecis forced to bail rather than silentlyre-order rows.
One new core trait method.
PlanDataInjector.opStructCaseis the onlycore trait addition. It keys the existing injector map for O(1) dispatch.
Why the dispatcher file lives in contrib but compiles in core
contrib/delta/native/src/core_glue.rsis physically co-located with therest of the Delta integration but is compiled as a module of the core crate
via
#[cfg(feature = "contrib-delta")] #[path = "../../../../contrib/delta/native/src/core_glue.rs"] mod contrib_delta_scan;. The reason: this file implementsPhysicalPlanner::plan_delta_scanand reaches into core'spub(crate)helpers (
create_expr,init_datasource_exec,prepare_object_store_with_configs). A true cross-crateimplblock isforbidden by Rust, and a
contrib → corecargo dependency would create acycle with core's optional
contrib-deltadep on contrib, so#[path]isthe available tool that lets the FILE's home be with Delta while its
COMPILATION unit stays in core. Build gate (
cfg(feature = "contrib-delta"))is preserved exactly — default builds carry zero Delta surface (see
"Validation" below).
Audit of remaining Delta references in core
After moving the dispatcher body into contrib/, every Delta reference left
in
native/core/src/is either feature-gated or a structural one-line armin an exhaustive
match OpStruct:planner.rs:33-35mod contrib_delta_scan;#[path]-relocated module declaration.#[cfg(feature = "contrib-delta")].planner.rs:1512-1527OpStruct::DeltaScandispatcher armcontrib-deltaCargo feature" so a misconfigured driver gets a clear error.jni_api.rs:op_nameOpStruct::DeltaScan(_) => "DeltaScan"planner/operator_registry.rs:to_operator_typeOpStruct::DeltaScan(_) => NoneOpStructis a proto-generated enum (indatafusion-comet-proto); Rustrequires exhaustive matches everywhere it appears. Keeping the structural
arms un-gated is intentional — it lets default builds identify a misrouted
DeltaScan operator by name in the error message.
Validation
The build gate is enforced by
dev/verify-contrib-delta-gate.sh, which runs6 independent checks across 3 layers and exits non-zero on the first
failure. Designed to be wired into CI.
# Requires a JDK ≥17 on PATH (and as JAVA_HOME for the Maven sub-runs). dev/verify-contrib-delta-gate.shWhat the script asserts:
cargo tree -p datafusion-comet --no-default-featureshas zerocomet-contrib-delta/delta_kernelentriescargo tree -p datafusion-comet --features contrib-deltacorrectly pulls both (catches accidental off)mvn -Pspark-4.1 dependency:listhas zeroio.delta:*depsmvn -Pspark-4.1,contrib-delta dependency:listcorrectly pullsio.delta:delta-sparktest-compileproduces noorg/apache/comet/contrib/**.classand noCometDeltaNativeScan*/DeltaScanRule*/DeltaReflection*classes (only the always-presentDeltaIntegrationreflection bridge)libcomet.dylibis meaningfully smaller (~57 MB delta on macOS arm64 debug build) AND has zerocomet_contrib_delta/delta_kernel/DeltaDvFilter*/DeltaSynthetic*external symbolsCurrent run on this branch: all 6 PASS.
Running the contrib Scala test suite
49 tests across four suites (24 coverage + 25 feature/native/column-mapping):
Current run: 49/49 pass.
CometDeltaCoverageSuiteis the accelerator-coverage matrix — each testasserts BOTH (a) the executed plan contains
CometDeltaNativeScanExec(actually engaged, no silent fall-back) AND (b) the rows match vanilla
Spark+Delta exactly. Covers: SELECT */column-prune/arithmetic/LIMIT/DISTINCT,
filters (eq/neq/IN/IS NULL/BETWEEN/LIKE/AND/OR/NOT), ORDER BY, aggregates
(count/sum/avg/min/max/GROUP BY/HAVING/COUNT DISTINCT), joins
(self/inner/left/leftsemi/leftanti), set ops (UNION/INTERSECT/EXCEPT),
window functions, scalar + IN subqueries, CTEs, partition-pruned reads,
column-mapping reads, DV-bearing reads, nested data (struct/array/map).
Running the contrib Rust test suite
What the in-PR validation looks like end-to-end
dev/verify-contrib-delta-gate.sh— proves default builds carry zero Delta surface.contrib/delta/dev/run-regression.shagainstdev/diffs/delta/4.1.0.diff) — proves we don't regress anything in Delta's own test suite.Review strategy
Suggested order with different bars:
Core touchpoints (~10 minutes, high bar). New core surface area is
small but ships in default builds:
native/proto/src/proto/operator.proto(one OpStruct variant + DeltaScan messages)native/core/src/execution/planner.rs:1512-1527(the actual body lives incontrib/delta/native/src/core_glue.rs; see "Why the dispatcher file lives in contrib but compiles in core" above)spark/.../comet/rules/DeltaIntegration.scala(whole file — reflection bridge)CometScanRule.transformV1Scanand the new case inCometExecRule.transformCometExecRDD+CometExecIterator+CometNativeScanExecdiffs (per-partition file paths,InputFileBlockHolderhook)ShimSparkErrorConverter.wrapNativeParquetErrorspark/.../comet/serde/arrays.scala(CreateArraydecline — references the upstream issue)spark/.../comet/serde/QueryPlanSerde.scala+predicates.scala—CometAnd/CometOrnow serialize a BALANCED And/Or tree (createBalancedBinaryExpr/flattenAssociative) instead of a left-deep one, so a deep boolean predicate doesn't overflow protobuf's 100-level recursion limit when the plan is re-parsed (findShuffleScanIndices). Affects ALL Comet queries; Comet's And/Or is vectorized (non-short-circuiting) so rebalancing is semantically identicalspark/.../comet/CometExecIterator.scala—isFileReadErroralso wraps object-store read failures ("Requested range was invalid", object-not-found) intoFAILED_READ_FILE.NO_HINT, not justParquet error:(matches Spark on corrupted/truncated files). Affects all native scansspark/.../comet/rules/CometScanRule.scala— V1 native-scan filesystem-scheme allowlist (declines schemes object_store can't read); honorsspark.hadoop.fs.comet.libhdfs.schemesso HDFS/custom-libhdfs scans are NOT declinedcommon/.../comet/util/Utils.scala+comet/vector/NativeUtil.scala— materialize SparkConstantColumnVectorto ArrowFieldVector(incl.TimestampNTZType); previously a hard errorspark/.../sql/comet/operators.scala—CometScanWithPlanDatagains optionaldynamicPruningFilters/withDynamicPruningFiltershooks (default no-op) so a scan can have its DPP rewrite installed in place; base scans unaffectedContrib Scala (~30 minutes, contrib bar):
DeltaScanRule.scala— entry point, gates documented under "Coverage" aboveCometDeltaNativeScan.scala— split serde, kernel-rs call, task prune/split/pack, column-mapping fixup, synthetic-column detection + suffix reorder, CM-id field-ID translator, S3A credential chain resolutionCometDeltaNativeScanExec.scala— exec wrapper, DPP partition pruning, metric reporting, per-partition file paths plumbed to InputFileBlockHolderDeltaPlanDataInjector.scala,DeltaInputFileBlockHolder.scala— smallDeltaReflection.scala— reflection bridge into Delta internals (incl.refreshedSnapshotFilesfor snapshot staleness)RowTrackingAugmentedFileIndex.scala— smallCometDeltaCoverageSuite.scala— the accelerator-coverage matrixContrib Rust (~30 minutes, contrib bar):
contrib/delta/native/src/engine.rs— kernel-rs engine + cachecontrib/delta/native/src/scan.rs—plan_delta_scan, DV row-index resolution,extract_row_tracking_for_selected(reads fileConstantValues from raw RecordBatch)contrib/delta/native/src/synthetic_columns.rs—DeltaSyntheticColumnsExec(emits row_index Int64 + is_row_deleted Int8 + row_id + row_commit_version + Spark_metadata.*+ row-tracking synthetics; per-batch row offset counter; DV-walk for is_row_deleted)contrib/delta/native/src/dv_filter.rs—DeltaDvFilterExec(chained after synthetic emission when DV+synthetics both needed)contrib/delta/native/src/planner.rs—build_delta_partitioned_files,SessionTimezone,ColumnMappingFilterRewritercontrib/delta/native/src/core_glue.rs— the in-core dispatcher body (homed here, compiled into core via#[path])contrib/delta/native/src/jni.rs—planDeltaScanJNI entryBuild / regression infra (~5 minutes):
spark/pom.xml-Pcontrib-deltaprofilenative/core/Cargo.tomlcontrib-deltafeaturecontrib/delta/native/Cargo.toml(standalone, not in workspace — intentional to avoid arrow-57 / arrow-58 cross-contamination)dev/verify-contrib-delta-gate.sh— build-gate enforcementcontrib/delta/dev/run-regression.sh+dev/diffs/delta/4.1.0.diffgit log --oneline main..HEADis also a useful walk — commits are labeled byphase (P7a..P7z) and each commit message documents the specific concern it
addresses. Two prior comprehensive reviews are reflected in commits
43768c1c(first review) and
2d13a147(review of the gate-unblock work).Follow-ups (not in this PR)
parquet-variantcrates but Comet hasn't integrated them; would unblockCometScanTypeChecker.isVariantStructfor all native scansProjectionExeccolumn-mapping rename pushdown intoParquetSource's schema adapter (perf item from in-PR sweep)ContribPlannerCtxtrait in a small shared crate so thecore_glue.rsbody can compile in the contrib crate proper (eliminates the#[path]indirection at the cost of a new crate). Tracked as a separate task.Test plan
-Pcontrib-delta):mvn -pl spark -am test-compilegreen-Pcontrib-deltabuilds green (Maven + Cargo)dev/verify-contrib-delta-gate.shpasses all 6 build-gate checksCometDeltaFeaturesSuite/CometDeltaNativeSuite/CometDeltaColumnMappingSuite/CometDeltaCoverageSuiteDescribeDeltaHistorySuite "replaceWhere on data column"— 8/8DeltaTableHadoopOptionsSuite "dropFeatureSupport - with filesystem options"— 1/1SnapshotManagementSuite "should not recover when the current checkpoint is broken..."— 2/2DeltaColumnMappingSuite "physical data and partition schema"+"write/merge df to table"(CM-id + CM-name) — 2/2pthread_create EAGAIN)-Pcontrib-deltabuild paths exercised +dev/verify-contrib-delta-gate.shwiredUpstream issue
apache/datafusion#22366 —
filed for
make_arrayelement-type strictness. TheCometCreateArraydecline in this PR is a caller-side workaround until upstream relaxes.
🤖 Generated with Claude Code