Skip to content

feat(contrib): Native Delta Lake scan via delta-kernel-rs (Iceberg-style contrib)#4366

Draft
schenksj wants to merge 111 commits into
apache:mainfrom
schenksj:contrib-delta-direct
Draft

feat(contrib): Native Delta Lake scan via delta-kernel-rs (Iceberg-style contrib)#4366
schenksj wants to merge 111 commits into
apache:mainfrom
schenksj:contrib-delta-direct

Conversation

@schenksj
Copy link
Copy Markdown

@schenksj schenksj commented May 19, 2026

Briefing

This PR lands a native Delta Lake scan for Comet. It supersedes #3932 — the
SPI/registry design discussed there was rejected in favor of the Iceberg-style
contrib pattern this PR uses (typed proto variant + ~40 lines of feature-gated
core touchpoints + standalone contrib/delta/ tree). Default builds are
entirely unaware of this code: no SPI lookups, no ServiceLoader scans, no
contrib surface at runtime. Only when the -Pcontrib-delta Maven profile (and
parallel contrib-delta Cargo feature) is activated do the contrib classes
land on the classpath and the reflection bridge resolve.

The integration reads Delta metadata via delta-kernel-rs on the driver,
encodes the resolved file list (with column mappings, DV info, partition
values, row-tracking baseRowId) into a typed OpStruct::DeltaScan proto, and
executes via DataFusion's parquet reader on each executor.

Status & recent correctness fixes

All Delta 4.1 own-suite regression failure families triaged this round are
fixed and each has a lightweight contrib regression guard (a full Delta
own-suite re-run is in progress to confirm end-to-end); deliberate tradeoffs and any remaining
limitations are tracked in contrib/delta/docs/08-known-limitations.md
for post-merge issues.

Area Fix Scope
DPP on partitioned tables (MERGE/join) no longer crashes (CometSubqueryAdaptiveBroadcastExec); prunes to required partitions even inside a native block contrib
Time travel reads the pinned scannedSnapshot, not head contrib
Row tracking reads MATERIALIZED _row-id-col-* / _row-commit-version-col-* from parquet (stable IDs across rewrites) contrib
DV / row_index + pushed data filter data-filter pushdown suppressed when a physical-position synthetic is emitted (was wrong row IDs) contrib
CM-mode + shuffle logicalLink propagated so AQE doesn't assert contrib
Deep boolean predicates balanced And/Or serialization (protobuf recursion limit) core Comet
Corrupted/short file reads wrapped as FAILED_READ_FILE.NO_HINT core Comet
HDFS / libhdfs native scans FS-scheme allowlist honors fs.comet.libhdfs.schemes core Comet

Reviewers: the bottom three rows touch CORE Comet code (not just the
contrib subtree) and affect all Comet queries/scans — see "Core touchpoints"
in Review strategy.

Coverage

Supported, fully native (broad):

  • Deletion vectors — kernel resolves the bitmap on the driver, DeltaDvFilterExec filters rows on executors. DV filter is chained AFTER synthetic emission (so row_index reflects original file positions) when both are needed
  • Column mapping both name AND id mode. name rewrites logical→physical names in the planner; id translates Delta's delta.columnMapping.id to parquet's PARQUET:field_id on every StructField (including nested struct/array/map) so the parquet reader matches by ID
  • Type widening — DataFusion's parquet schema adapter handles the read-time cast
  • Row tracking — supported in three modes:
    • Materialised: rewritten to read the physical _row-id-col-<uuid> column from parquet
    • Unmaterialised: row_id = base_row_id + physical_row_index per file, all synthesised natively — base_row_id is emitted as a per-file Int64 constant from AddFile.baseRowId and _row-id-col-<uuid> is emitted as all-NULL so Delta's GenerateRowIDs Project falls back to the computed expression
    • Mixed within one query: the wrapped exec's emit order matches scan.requiredSchema ordinal-by-ordinal so the upstream Filter(__delta_internal_is_row_deleted = 0) binds correctly
  • Native synthesis of Delta's internal columns __delta_internal_row_index / __delta_internal_is_row_deleted for UPDATE/DELETE/MERGE flows. is_row_deleted is emitted as Int8 (matching Delta's ByteType) to avoid DataFusion's interval-propagator panicking on Int32 vs Int8 mismatches in stats pushdown
  • Native synthesis of Spark _metadata.* virtual columns (file_path / file_name / file_size / file_block_start / file_block_length / file_modification_time) detected from scan.output even when not in scan.requiredSchema
  • Output column reorder when synthetics aren't already a canonical suffix — proto carries final_output_indices, native dispatcher wraps with a ProjectionExec so downstream operators that bind by ordinal don't silently misread one synthetic as another
  • General-purpose Parquet field-ID matching when spark.sql.parquet.fieldId.read.enabled=true (same wiring as CM-id)
  • Partition pruning, including DPP (resolved after AQE broadcasts are ready)
  • Predicate pushdown into parquet (with synthetic-column filters excluded — those are handled by the upstream Filter after synthetic emission)
  • Multi-task-per-partition packing for cluster utilisation
  • input_file_name() and friends — one-task-per-partition + a per-task InputFileBlockHolder hook in CometExecRDD + CometDeltaNativeScanExec plumbs per-partition file paths through to the RDD
  • FAILED_READ_FILE.NO_HINT exception wrapping with file path
  • Encryption that routes through the shared CometParquetUtils config check
  • _delta_log, _change_data, and _commits parquet reads via the same scan
  • S3A Hadoop credential chain (SimpleAWS / TemporaryAWS / AssumedRole / IAMInstance) resolved Scala-side at planning time so kernel log replay authenticates under the same chain as data reads. Reflective lookup against S3AUtils.createAWSCredentialProviderList; cached Method handles
  • checkLatestSchemaOnRead=false — our path is pinned to a single snapshot version via extractSnapshotVersion(relation) so the Delta-side at-read check doesn't apply to us
  • Time travel (versionAsOf / timestampAsOf) and snapshot reads — files are resolved from the snapshot the scan was prepared against (preparedScan.scannedSnapshot), exactly what vanilla Spark+Delta reads; re-querying filesForScan picks up the freshest DV descriptors that snapshot carries
  • All public S3 / Azure / GCS / OSS schemes; local file://

Falls back to Spark's reader (with withInfo reason surfaced in explain-fallback):

Correctness fallbacks — load-bearing, do not remove:

  • DV materialisation failure (DV file missing / unsupported version / read error) — kernel can't give us the deleted-row indexes, so we can't filter
  • Reflective AddFile extraction failure — no file list, nothing to scan
  • Kernel-rs log-replay error — same
  • Phase 6 reader-feature gate — currently an empty list; defense-in-depth for any future kernel-rs return of unsupported feature names

Shared Comet limits (apply to any native scan, not Delta-specific) — each is its own per-case work in core:

  • Encryption with unsupported KMS config — shared CometParquetUtils.isEncryptionConfigSupported
  • Custom Hadoop FS schemes (fake:// etc.) — object_store has no Hadoop FS plugin layer; would need a bridge
  • CometScanTypeChecker rejections (ShortType under default config, string collation, variant struct) — each is a Comet-core feature gap, not a Delta-contrib problem. Variant in particular: arrow-rs has parquet-variant crates but Comet hasn't integrated them yet

External:

  • TahoeLogFileIndexWithCloudFetch — Databricks-proprietary file index, not in OSS Delta. Defensive guard for DBR users only

Workaround tracked upstream:

  • CreateArray with mismatched element types — caller-side decline for apache/datafusion#22366. Removable once upstream lands

User off-switches:

  • spark.comet.scan.deltaNative.enabled=false, spark.comet.exec.enabled=false

Shape

Layer Path Lives in
Typed proto variant delta_scan = 117 native/proto/src/proto/operator.proto Core
Reflection bridge spark/.../comet/rules/DeltaIntegration.scala Core
Scan-rule arm spark/.../comet/rules/CometScanRule.scala Core (one block)
Exec-rule arm spark/.../comet/rules/CometExecRule.scala Core (one case)
PlanDataInjector.opStructCase spark/.../sql/comet/operators.scala Core (one method)
Per-partition file paths CometExecRDD, CometNativeScanExec, CometExecIterator, ShimSparkErrorConverter Core (load-bearing for input_file_name() and FAILED_READ_FILE.NO_HINT wrapping in any native scan)
Native dispatcher arm (DV / synthetic / reorder / CM-id) contrib/delta/native/src/core_glue.rs (compiled into core via #[path]; see "Why the dispatcher file lives in contrib but compiles in core" below) Compiled into core, file homed in contrib
Delta scan rule, exec, serde contrib/delta/src/main/scala/... Contrib
Kernel-rs engine + cache, scan, DV filter, synthetic columns, planner contrib/delta/native/src/*.rs Contrib
Maven profile, Cargo feature spark/pom.xml, contrib/delta/native/Cargo.toml, native/core/Cargo.toml Build
Build-gate verification dev/verify-contrib-delta-gate.sh Build
Regression harness contrib/delta/dev/run-regression.sh + dev/diffs/delta/4.1.0.diff Contrib

Key design decisions

Iceberg-style contrib, not SPI. Static helper objects with stable names
(DeltaScanRule.transformV1IfDelta, CometDeltaNativeScan.MODULE$); a single
reflection bridge in core resolves and caches Method handles once per JVM.
No registry, no ServiceLoader, no extension points beyond what core already
exposes. The contrib is just classpath-or-not.

Typed proto, not an envelope. OpStruct::DeltaScan is a first-class
variant. Avoids the ContribOp { kind, payload } envelope discussed in #3932;
PlanDataInjector keys by OpStructCase for O(1) dispatch.

Split-mode plan serialization. CometDeltaNativeScan.convert emits a
DeltaScan proto with the common block only (schemas, table root, filters);
each partition's tasks ride in a per-partition byte array via
PlanDataInjector at execution time. Avoids closure-capturing every file in
every partition.

Native synthetic-column synthesis. DeltaSyntheticColumnsExec (in
contrib/delta/native/src/synthetic_columns.rs) emits the standard four
Delta internals (__delta_internal_row_index as Int64, __delta_internal_is_row_deleted
as Int8, row_id, row_commit_version) PLUS Spark _metadata.* virtual
columns PLUS row-tracking-specific synthetics (base_row_id per-file
constant from AddFile.baseRowId, _row-id-col-<uuid> / _row-commit-version-col-<uuid>
as NULL-filled). When emit is on, each file gets its own FileGroup so the
per-file row offset / baseRowId arithmetic is well-defined.

Synthetic-suffix ordering matters. The wrapped exec's output ordering is
checked against scan.requiredSchema AND the canonical native emit order. If
the synthetic block isn't already in canonical order at the right ordinals,
the proto carries final_output_indices and the native dispatcher wraps with
a ProjectionExec to reorder. Without this, an upstream
Filter(__delta_internal_is_row_deleted = 0) binding by ordinal would silently
misread row_index as is_row_deleted (caught and fixed mid-PR; the
DV-after-DELETE test bisected the bug to a one-ordinal swap).

DV filter chained after synthetic emission, not mutually exclusive. When
both synthetics and a DV are present, we previously chose one wrapper or the
other — which meant any read that surfaced _tmp_metadata_row_index got
NO DV filtering applied. The wrappers are now chained:
parquet → DeltaSyntheticColumnsExecDeltaDvFilterExec (skipped when
emit_is_row_deleted is on so UPDATE/DELETE/MERGE writers still see every row).

CM-name rename before synthetics. Synthetic columns have fixed names
(never CM-renamed) and are appended AFTER the parquet read; the rename
projection has to apply to the parquet output BEFORE the append so the
length-match check works correctly.

Spark _metadata.* driven from scan.output, not just scan.requiredSchema.
Delta's PreprocessTableWithDVs strategy can append _metadata.file_path to
scan.output without putting it in scan.requiredSchema. The synthetic
exec detects these from scan.output so the wrapped exec's output schema
includes them and downstream attribute resolution works.

is_row_deleted is Int8, not Int32. Delta declares the column as
ByteType. Emitting Int32 trips DataFusion's interval propagator with
Only intervals with the same data type are intersectable, lhs:Int32, rhs:Int8
whenever the upstream Filter pushes stats. Caught by the CM + DV combined
coverage test.

InputFileBlockHolder thread-local hook in CometExecRDD.compute.
Comet's native scans bypass Spark's FileScanRDD, so the standard
input_file_name() thread-local would otherwise be empty for any native
scan (not just Delta). One small but load-bearing core change that fixes
both Delta's UPDATE/DELETE/MERGE flows AND the FAILED_READ_FILE.NO_HINT
error wrapping. CometDeltaNativeScanExec plumbs its per-partition file
paths through to CometExecRDD so InputFileBlockHolder.set(path) fires
correctly.

Read from the prepared snapshot (not head) for PreparedDeltaFileIndex.
preparedScan.files caches the AddFile list at FileIndex construction time.
DeltaReflection.preparedSnapshotFiles re-queries
preparedScan.scannedSnapshot.filesForScan(filters, false).files — the snapshot
the scan was prepared against, which is exactly what vanilla reads — to pick up
the freshest DV descriptors that snapshot carries, falling back to the cached
preparedScan.files if reflection fails. An earlier revision refreshed to head
via deltaLog.update(), but that returned current data for a time-travel query
(versionAsOf=0 yielded head's rows) and diverged from vanilla on the
consecutive-DELETE / DeltaLog-cache-staleness case; reading scannedSnapshot
matches vanilla in every case.

Engine cache by (scheme, authority, DeltaStorageConfig). kernel-rs's
DefaultEngine<TokioBackgroundExecutor> spawns one OS thread per executor.
Without caching, hundreds of scans/min was leaking threads faster than tokio
reaped them, tripping pthread_create EAGAIN ~2h into regression. The cache
bounds live thread count by table-storage diversity instead of by request
count.

DV filter ordering safeguards. DeltaDvFilterExec tracks
current_row_offset across batches, which assumes physical-order input.
Overrides maintains_input_order() = [true] and
benefits_from_input_partitioning() = [false] so any future optimizer that
wants to insert a RepartitionExec is forced to bail rather than silently
re-order rows.

One new core trait method. PlanDataInjector.opStructCase is the only
core trait addition. It keys the existing injector map for O(1) dispatch.

Why the dispatcher file lives in contrib but compiles in core

contrib/delta/native/src/core_glue.rs is physically co-located with the
rest of the Delta integration but is compiled as a module of the core crate
via #[cfg(feature = "contrib-delta")] #[path = "../../../../contrib/delta/native/src/core_glue.rs"] mod contrib_delta_scan;. The reason: this file implements
PhysicalPlanner::plan_delta_scan and reaches into core's pub(crate)
helpers (create_expr, init_datasource_exec,
prepare_object_store_with_configs). A true cross-crate impl block is
forbidden by Rust, and a contrib → core cargo dependency would create a
cycle with core's optional contrib-delta dep on contrib, so #[path] is
the available tool that lets the FILE's home be with Delta while its
COMPILATION unit stays in core. Build gate (cfg(feature = "contrib-delta"))
is preserved exactly — default builds carry zero Delta surface (see
"Validation" below).

Audit of remaining Delta references in core

After moving the dispatcher body into contrib/, every Delta reference left
in native/core/src/ is either feature-gated or a structural one-line arm
in an exhaustive match OpStruct:

File Reference Why it's there
planner.rs:33-35 mod contrib_delta_scan; The #[path]-relocated module declaration. #[cfg(feature = "contrib-delta")].
planner.rs:1512-1527 OpStruct::DeltaScan dispatcher arm Both halves feature-gated. Default-build half returns "Received a DeltaScan operator but core was built without the contrib-delta Cargo feature" so a misconfigured driver gets a clear error.
jni_api.rs:op_name OpStruct::DeltaScan(_) => "DeltaScan" Exhaustive enum match; returns a string for tracing. No contrib logic.
planner/operator_registry.rs:to_operator_type OpStruct::DeltaScan(_) => None Exhaustive enum match; signals "not in OperatorType enum". No contrib logic.

OpStruct is a proto-generated enum (in datafusion-comet-proto); Rust
requires exhaustive matches everywhere it appears. Keeping the structural
arms un-gated is intentional — it lets default builds identify a misrouted
DeltaScan operator by name in the error message.

Validation

The build gate is enforced by dev/verify-contrib-delta-gate.sh, which runs
6 independent checks across 3 layers and exits non-zero on the first
failure. Designed to be wired into CI.

# Requires a JDK ≥17 on PATH (and as JAVA_HOME for the Maven sub-runs).
dev/verify-contrib-delta-gate.sh

What the script asserts:

Layer Check
Cargo cargo tree -p datafusion-comet --no-default-features has zero comet-contrib-delta / delta_kernel entries
Cargo cargo tree -p datafusion-comet --features contrib-delta correctly pulls both (catches accidental off)
Maven mvn -Pspark-4.1 dependency:list has zero io.delta:* deps
Maven mvn -Pspark-4.1,contrib-delta dependency:list correctly pulls io.delta:delta-spark
Maven Default test-compile produces no org/apache/comet/contrib/**.class and no CometDeltaNativeScan* / DeltaScanRule* / DeltaReflection* classes (only the always-present DeltaIntegration reflection bridge)
Native Default libcomet.dylib is meaningfully smaller (~57 MB delta on macOS arm64 debug build) AND has zero comet_contrib_delta / delta_kernel / DeltaDvFilter* / DeltaSynthetic* external symbols

Current run on this branch: all 6 PASS.

Running the contrib Scala test suite

49 tests across four suites (24 coverage + 25 feature/native/column-mapping):

# JDK 17, contrib + spark-4.1 profiles
JAVA_HOME=$(/usr/libexec/java_home -v 17) \
  mvn -Pspark-4.1,contrib-delta -pl spark -am test \
    -Dsuites='org.apache.comet.contrib.delta.CometDeltaFeaturesSuite,
              org.apache.comet.contrib.delta.CometDeltaNativeSuite,
              org.apache.comet.contrib.delta.CometDeltaColumnMappingSuite,
              org.apache.comet.contrib.delta.CometDeltaCoverageSuite' \
    -Djava.version=17 -Dmaven.compiler.source=17 -Dmaven.compiler.target=17

Current run: 49/49 pass.

CometDeltaCoverageSuite is the accelerator-coverage matrix — each test
asserts BOTH (a) the executed plan contains CometDeltaNativeScanExec
(actually engaged, no silent fall-back) AND (b) the rows match vanilla
Spark+Delta exactly. Covers: SELECT */column-prune/arithmetic/LIMIT/DISTINCT,
filters (eq/neq/IN/IS NULL/BETWEEN/LIKE/AND/OR/NOT), ORDER BY, aggregates
(count/sum/avg/min/max/GROUP BY/HAVING/COUNT DISTINCT), joins
(self/inner/left/leftsemi/leftanti), set ops (UNION/INTERSECT/EXCEPT),
window functions, scalar + IN subqueries, CTEs, partition-pruned reads,
column-mapping reads, DV-bearing reads, nested data (struct/array/map).

Running the contrib Rust test suite

cargo test -p comet-contrib-delta
# Plus the integration tests that exercise plan_delta_scan against a
# real parquet + _delta_log tree:
cargo test -p datafusion-comet --features contrib-delta

What the in-PR validation looks like end-to-end

  1. dev/verify-contrib-delta-gate.sh — proves default builds carry zero Delta surface.
  2. Contrib Scala suite (4 suites, 49 tests) — proves accelerator engages and matches vanilla across the SQL surface area.
  3. Contrib Rust unit + integration tests — proves the kernel-rs engine cache, DV filter, synthetic columns, predicate, and CM-rewriter behave correctly in isolation.
  4. Full Delta 4.1 regression (contrib/delta/dev/run-regression.sh against dev/diffs/delta/4.1.0.diff) — proves we don't regress anything in Delta's own test suite.

Review strategy

Suggested order with different bars:

  1. Core touchpoints (~10 minutes, high bar). New core surface area is
    small but ships in default builds:

    • native/proto/src/proto/operator.proto (one OpStruct variant + DeltaScan messages)
    • The dispatcher arm in native/core/src/execution/planner.rs:1512-1527 (the actual body lives in contrib/delta/native/src/core_glue.rs; see "Why the dispatcher file lives in contrib but compiles in core" above)
    • spark/.../comet/rules/DeltaIntegration.scala (whole file — reflection bridge)
    • The new arm in CometScanRule.transformV1Scan and the new case in CometExecRule.transform
    • CometExecRDD + CometExecIterator + CometNativeScanExec diffs (per-partition file paths, InputFileBlockHolder hook)
    • ShimSparkErrorConverter.wrapNativeParquetError
    • spark/.../comet/serde/arrays.scala (CreateArray decline — references the upstream issue)
    • spark/.../comet/serde/QueryPlanSerde.scala + predicates.scalaCometAnd/CometOr now serialize a BALANCED And/Or tree (createBalancedBinaryExpr/flattenAssociative) instead of a left-deep one, so a deep boolean predicate doesn't overflow protobuf's 100-level recursion limit when the plan is re-parsed (findShuffleScanIndices). Affects ALL Comet queries; Comet's And/Or is vectorized (non-short-circuiting) so rebalancing is semantically identical
    • spark/.../comet/CometExecIterator.scalaisFileReadError also wraps object-store read failures ("Requested range was invalid", object-not-found) into FAILED_READ_FILE.NO_HINT, not just Parquet error: (matches Spark on corrupted/truncated files). Affects all native scans
    • spark/.../comet/rules/CometScanRule.scala — V1 native-scan filesystem-scheme allowlist (declines schemes object_store can't read); honors spark.hadoop.fs.comet.libhdfs.schemes so HDFS/custom-libhdfs scans are NOT declined
    • common/.../comet/util/Utils.scala + comet/vector/NativeUtil.scala — materialize Spark ConstantColumnVector to Arrow FieldVector (incl. TimestampNTZType); previously a hard error
    • spark/.../sql/comet/operators.scalaCometScanWithPlanData gains optional dynamicPruningFilters/withDynamicPruningFilters hooks (default no-op) so a scan can have its DPP rewrite installed in place; base scans unaffected
  2. Contrib Scala (~30 minutes, contrib bar):

    • DeltaScanRule.scala — entry point, gates documented under "Coverage" above
    • CometDeltaNativeScan.scala — split serde, kernel-rs call, task prune/split/pack, column-mapping fixup, synthetic-column detection + suffix reorder, CM-id field-ID translator, S3A credential chain resolution
    • CometDeltaNativeScanExec.scala — exec wrapper, DPP partition pruning, metric reporting, per-partition file paths plumbed to InputFileBlockHolder
    • DeltaPlanDataInjector.scala, DeltaInputFileBlockHolder.scala — small
    • DeltaReflection.scala — reflection bridge into Delta internals (incl. refreshedSnapshotFiles for snapshot staleness)
    • RowTrackingAugmentedFileIndex.scala — small
    • CometDeltaCoverageSuite.scala — the accelerator-coverage matrix
  3. Contrib Rust (~30 minutes, contrib bar):

    • contrib/delta/native/src/engine.rs — kernel-rs engine + cache
    • contrib/delta/native/src/scan.rsplan_delta_scan, DV row-index resolution, extract_row_tracking_for_selected (reads fileConstantValues from raw RecordBatch)
    • contrib/delta/native/src/synthetic_columns.rsDeltaSyntheticColumnsExec (emits row_index Int64 + is_row_deleted Int8 + row_id + row_commit_version + Spark _metadata.* + row-tracking synthetics; per-batch row offset counter; DV-walk for is_row_deleted)
    • contrib/delta/native/src/dv_filter.rsDeltaDvFilterExec (chained after synthetic emission when DV+synthetics both needed)
    • contrib/delta/native/src/planner.rsbuild_delta_partitioned_files, SessionTimezone, ColumnMappingFilterRewriter
    • contrib/delta/native/src/core_glue.rs — the in-core dispatcher body (homed here, compiled into core via #[path])
    • contrib/delta/native/src/jni.rsplanDeltaScan JNI entry
  4. Build / regression infra (~5 minutes):

    • spark/pom.xml -Pcontrib-delta profile
    • native/core/Cargo.toml contrib-delta feature
    • contrib/delta/native/Cargo.toml (standalone, not in workspace — intentional to avoid arrow-57 / arrow-58 cross-contamination)
    • dev/verify-contrib-delta-gate.sh — build-gate enforcement
    • contrib/delta/dev/run-regression.sh + dev/diffs/delta/4.1.0.diff

git log --oneline main..HEAD is also a useful walk — commits are labeled by
phase (P7a..P7z) and each commit message documents the specific concern it
addresses. Two prior comprehensive reviews are reflected in commits 43768c1c
(first review) and 2d13a147 (review of the gate-unblock work).

Follow-ups (not in this PR)

  • Variant type native support — arrow-rs has parquet-variant crates but Comet hasn't integrated them; would unblock CometScanTypeChecker.isVariantStruct for all native scans
  • String collation native support in expression evaluators
  • ProjectionExec column-mapping rename pushdown into ParquetSource's schema adapter (perf item from in-PR sweep)
  • Engine cache TTL / credential-rotation eviction (fine for validation; would block long-lived production drivers using STS)
  • Filter-rewriter linear field lookup → name→index HashMap (perf audit item; per-filter not per-batch)
  • Extract a ContribPlannerCtx trait in a small shared crate so the core_glue.rs body can compile in the contrib crate proper (eliminates the #[path] indirection at the cost of a new crate). Tracked as a separate task.

Test plan

  • Default builds (no -Pcontrib-delta): mvn -pl spark -am test-compile green
  • -Pcontrib-delta builds green (Maven + Cargo)
  • dev/verify-contrib-delta-gate.sh passes all 6 build-gate checks
  • Contrib Scala test suite: 49/49 pass across CometDeltaFeaturesSuite / CometDeltaNativeSuite / CometDeltaColumnMappingSuite / CometDeltaCoverageSuite
  • Contrib Rust unit tests pass
  • Two comprehensive code reviews completed; both rounds of findings addressed
  • Targeted retest of every cluster surfaced during validation, all pass:
    • DescribeDeltaHistorySuite "replaceWhere on data column" — 8/8
    • DeltaTableHadoopOptionsSuite "dropFeatureSupport - with filesystem options" — 1/1
    • SnapshotManagementSuite "should not recover when the current checkpoint is broken..." — 2/2
    • DeltaColumnMappingSuite "physical data and partition schema" + "write/merge df to table" (CM-id + CM-name) — 2/2
  • Engine-cache fix verified end-to-end (no more pthread_create EAGAIN)
  • Full Delta 4.1 regression with all gate-unblock commits in place
  • CI: default + -Pcontrib-delta build paths exercised + dev/verify-contrib-delta-gate.sh wired

Upstream issue

apache/datafusion#22366
filed for make_array element-type strictness. The CometCreateArray
decline in this PR is a caller-side workaround until upstream relaxes.

🤖 Generated with Claude Code

schenksj and others added 19 commits May 18, 2026 20:01
Initial scaffolding for the direct Delta integration that replaces the
generic contrib SPI proposed in apache#4339. Mirrors Iceberg's pattern:

  - native/proto/src/proto/operator.proto: typed `DeltaScan delta_scan = 117`
    variant on `OpStruct`, with the six message definitions (DeltaScanCommon,
    DeltaScan, DeltaScanTask, DeltaPartitionValue, DeltaScanTaskList,
    DeltaColumnMapping) inlined next to the IcebergScan group. Field numbers
    preserved from the contrib-delta-pr2 branch.

  - native/core/src/execution/planner.rs: unconditional `OpStruct::DeltaScan`
    dispatcher arm with feature-gated body. Default builds return a clear
    "rebuild with --features contrib-delta" error; the feature-on arm is a
    `todo!` stub today and gets filled in as the implementation ports over.

  - native/core/src/execution/jni_api.rs + planner/operator_registry.rs: extend
    the existing `OpStruct` match sites so default builds compile exhaustively.

  - native/core/Cargo.toml: new optional `contrib-delta` feature backed by an
    optional path dep on `comet-contrib-delta`. Default builds carry zero Delta
    surface (verified: `cargo check` builds clean without the feature, and the
    Delta crate is not in the workspace `members` list).

  - native/Cargo.toml: explicit `exclude = ["../contrib"]` so the workspace
    doesn't try to absorb the contrib crate (which would fail -- workspace
    members must live hierarchically under the workspace root).

  - contrib/delta/native/{Cargo.toml,src/lib.rs}: skeleton crate that re-exports
    the typed Delta proto messages so contrib-internal code has a stable short
    alias. Real implementation (kernel-rs log replay, DV filter, column
    mapping, partition parsing) ports over from contrib-delta-pr2 in follow-up
    commits.

Build verification:
  cargo check  -p datafusion-comet                        # default: green
  cargo check  -p datafusion-comet --features contrib-delta # green

This addresses Parth's review on apache#4339: ~40 lines of core touchpoints all
behind a feature gate, no SPI/registry/traits/runtime dispatch.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Brings the working delta-kernel-rs integration over from contrib-delta-pr2
without the contrib SPI plumbing Parth flagged on apache#4339.

contrib/delta/native/:
  - jni.rs, scan.rs, engine.rs, error.rs, predicate.rs, dv_filter.rs --
    ported verbatim from contrib-delta-pr2 (only crate::proto::* import paths
    needed adjustment, handled via lib.rs re-export of the typed messages
    that now live in core's proto crate)
  - planner.rs -- Delta-specific helpers (build_delta_partitioned_files,
    parse_delta_partition_scalar with the DATE -> TIMESTAMP_NTZ widening
    fallback already inlined, ColumnMappingFilterRewriter) exposed as
    pure-DataFusion functions that core's dispatcher arm composes onto the
    standard parquet datasource path. NO ContribOperatorPlanner trait, NO
    ContribPlannerContext, NO ParquetDatasourceParams -- the contrib crate
    is now a plain library with public functions.
  - lib.rs -- module decls + a `pub mod proto` re-export of the six typed
    Delta messages from `datafusion_comet_proto::spark_operator`. No
    `#[ctor]` and no `register_contrib_planner` call.
  - Cargo.toml -- standalone (outside the native/ workspace root), no
    comet-contrib-spi dep, all delta-specific deps stay confined here.

native/core/src/execution/planner/contrib_delta_scan.rs (new):
  - `PhysicalPlanner::plan_delta_scan` -- the `OpStruct::DeltaScan` arm body
    extracted into its own file (~210 lines, mirrors `OpStruct::IcebergScan`
    in size and shape). Gated `#[cfg(feature = "contrib-delta")]`; calls
    core's `init_datasource_exec`, `prepare_object_store_with_configs`,
    `convert_spark_types_to_arrow_schema` directly + comet-contrib-delta's
    helpers for the Delta-specific pieces.

native/core/src/execution/planner.rs:
  - `OpStruct::DeltaScan` arm: 6-line dispatcher that calls into
    `self.plan_delta_scan(...)` under `#[cfg(feature = "contrib-delta")]`.

native/core/src/parquet/parquet_exec.rs:
  - New `ignore_missing_files: bool` arg on `init_datasource_exec`.
    Threaded through to `IgnoreMissingFileSource` wrapper (ported verbatim
    from PR2's native/core/src/parquet/missing_file_tolerant.rs) which
    decorates the final FileSource so its FileOpener swallows object-store
    NotFound errors as empty streams. Matches Spark's
    `spark.sql.files.ignoreMissingFiles=true` semantics. All existing call
    sites updated to pass `false`.

Build verification (both checked clean):
  cargo check  -p datafusion-comet                          # default
  cargo check  -p datafusion-comet --features contrib-delta

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
These five files port verbatim from contrib-delta-pr2 -- they touch only
Spark APIs (via reflection) and standard Scala, none of the rejected SPI
surface:

  - DeltaConf.scala               Config keys (COMET_DELTA_NATIVE_ENABLED, ...)
  - Native.scala                  JNI bridge for planDeltaScan
  - DeltaReflection.scala         Reflective access to spark-delta internals
                                  (isDeltaFileFormat, isBatchFileIndex,
                                  extractBatchAddFiles, ...)
  - RowTrackingAugmentedFileIndex Wraps a FileIndex to inject row-tracking
                                  metadata columns
  - DeltaInputFileBlockHolder     Thread-local replacement for
                                  InputFileBlockHolder on the Delta scan path

Plus the regression infrastructure (4.1.0.diff, run-test.sh,
run-regression.sh).

The remaining four files (CometDeltaNativeScan, CometDeltaNativeScanExec,
DeltaScanRuleExtension, DeltaOperatorSerdeExtension, DeltaPlanDataInjector)
each reference the rejected SPI surface (CometOperatorSerde,
CometScanRuleExtension, ContribOp envelope, PlanDataSource, PlanDataInjector).
Those need rewriting before they can compile against main -- queued as the
next commit on this branch:
  - drop the `extends CometOperatorSerde[CometScanExec]` trait bound;
    expose `convert(...)` as a static method
  - replace ContribOp envelope with the typed OpStruct::DeltaScan
  - drop the SPI extension class wrappers; integrate detection directly
    into CometScanRule.scala + CometExecRule.scala (Iceberg-style)
  - bake DeltaPlanDataInjector logic directly into CometDeltaNativeScanExec

Maven `-Pcontrib-delta` profile, scalastyle wiring, and the SPI rewrite
all land together in the follow-up commit so the contrib compiles
end-to-end against main.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…ction bridge

The four SPI-touching files from contrib-delta-pr2 rewritten to drop the
rejected SPI base classes and use the typed `OpStruct::DeltaScan` proto
variant directly:

  - CometDeltaNativeScan.scala  no longer `extends CometOperatorSerde`;
    plain object with `convert(scan, builder, childOp*)` static method.
    All `ContribOp` envelope wrapping replaced with
    `builder.setDeltaScan(...)`. DeltaOperator.* imports redirected to
    core's `org.apache.comet.serde.OperatorOuterClass`.
  - CometDeltaNativeScanExec.scala  no longer `with PlanDataSource`;
    public accessors (planDataSourceKey, planDataCommonBytes,
    planDataPerPartitionBytes) stay so core's CometExecRDD can read them
    directly. `nativeOp.getContribOp.getPayload` calls collapse to the
    typed `nativeOp.getDeltaScan` accessor.
  - DeltaScanRule.scala  was `class DeltaScanRuleExtension extends
    CometScanRuleExtension`; now a plain `object DeltaScanRule` with a
    single static entry point `transformV1IfDelta(plan, session,
    scanExec, relation): Option[SparkPlan]`. The private
    `CometScanRule.isSchemaSupported` is unreachable from contrib, so
    inline the equivalent check (CometScanTypeChecker + fallback-reason
    emission).
  - The DeltaOperatorSerdeExtension + DeltaPlanDataInjector files are
    not ported -- their roles fold into the next commit's CometExecRule
    Delta serde dispatch and into CometDeltaNativeScanExec respectively.

Core wiring:
  - spark/pom.xml: new `<profile id="contrib-delta">` adds
    contrib/delta/src/main/scala/ as a compile source on comet-spark and
    pulls in `io.delta:delta-spark_2.13:4.1.0` at provided scope.
  - CometScanRule.scala: 5-line Delta detection block at the head of
    `transformV1Scan`'s HadoopFsRelation case (Iceberg-style; calls into
    `DeltaIntegration.transformV1IfDelta` which is a no-op when the
    contrib isn't bundled).
  - DeltaIntegration.scala (new): reflection bridge that resolves the
    contrib's `DeltaScanRule` + `CometDeltaNativeScan` companion objects
    by class name. Default builds get `None`; -Pcontrib-delta builds get
    a working delegate. No SPI / ServiceLoader / registry.

Build verification:
  mvn compile                  # default: still green
  mvn compile -Pcontrib-delta  # GREEN -- this is the milestone

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…tics

Spark's UnsafeRow.getUTF8String wraps bytes via UTF8String.fromAddress with
no UTF-8 validation, and cast(BinaryType -> StringType) is a zero-copy
reinterpret that leaves arbitrary bytes in a StringType column. Delta's
Z-Order uses interleave_bits(...).cast(StringType) for opaque sort keys,
which panicked Comet's strict from_utf8(...).unwrap() and cascaded into
JVM classloader errors (60+ ServiceConfigurationError tests in the
contrib-delta-pr2 regression run).

Switch to from_utf8_unchecked since the bytes flow directly into Arrow's
StringBuilder::append_value and are never introspected as a &str.

Verified on contrib-delta-pr2: OptimizeZOrderScalaSuite "interleaving"
4/4 PASS after this fix.

Pure core fix -- independent of the contrib/delta integration. Lands on
this branch because it's a prerequisite for the Delta regression to be
meaningful (without it the Z-Order panic poisons every following test).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Connects core's CometExecRule to the contrib's Delta scan serde so the
Delta-marker CometScanExec produced by CometScanRule flows through the
same `convertToComet(scan, handler)` path as Iceberg / NativeScan / etc.

  - CometDeltaNativeScan re-extends core's `CometOperatorSerde` trait
    (the trait itself is core, not part of the rejected extension SPI;
    every Comet operator handler implements it). `getSupportLevel` /
    `enabledConfig` / `convert` now properly override.
  - DeltaIntegration.scanHandler: a single reflective lookup exposes
    the contrib's companion as a `CometOperatorSerde[CometScanExec]`.
    Returns None on default builds.
  - CometExecRule.transform: new case beside the SCAN_NATIVE_DATAFUSION
    one that recognises the Delta scan marker (scanImpl ==
    "native_delta_compat") and dispatches via the handler.

Build verification:
  mvn compile                  GREEN
  mvn compile -Pcontrib-delta  GREEN

Still pending for end-to-end:
  - per-partition task-list injection (replaces PR2's DeltaPlanDataInjector
    SPI) -- baked into CometExecRDD via another small reflection hook
  - live smoke test once the dylib is rebuilt with --features contrib-delta
    and bundled into the jar

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Connects the contrib's per-partition Delta task-list serialisation into
core's existing `PlanDataInjector.injectPlanData` pipeline. Without this
the native side decodes a tasks-empty `DeltaScan` and returns `EmptyExec`
(0 rows) for every Delta scan.

  - contrib/delta/.../DeltaPlanDataInjector.scala: implements core's
    `PlanDataInjector` trait. `canInject` checks `op.hasDeltaScan` and
    rejects already-injected operators (idempotent). `inject` splices the
    partition's tasks into the operator's common-only DeltaScan envelope
    via `op.toBuilder.setDeltaScan(...)` -- pure typed-proto operations,
    no `ContribOp` envelope.
  - spark/.../operators.scala: `PlanDataInjector.injectors` Seq now
    appends the contrib injector via one reflective Class.forName lookup.
    Default builds get None (no contrib classes on classpath) so the
    list is unchanged; -Pcontrib-delta builds get the Delta injector.

Build verification:
  mvn compile -Pcontrib-delta  GREEN

End-to-end Scala+Maven integration is now complete. Remaining work:
  - rebuild native dylib with `--features contrib-delta` and bundle
    into comet-spark.jar
  - run an isolated test (e.g. OptimizeZOrderScalaSuite "interleaving")
    to confirm the end-to-end path works

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Wrap Class.forName calls in `// scalastyle:off classforname`, change
Option[Class[_]] to Option[Class[AnyRef]] to avoid existential type
warnings, reword the doc comment so the verbatim string Class.forName
doesn't trip scalastyle's source-pattern check.

  mvn scalastyle:check -Pcontrib-delta  GREEN

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…th file path

CometExecIterator was wrapping native Parquet failures (e.g. corrupt-footer
errors from kernel-rs reading a broken Delta checkpoint) in `_LEGACY_ERROR_TEMP_2254`,
whose message is literally "Data read failed." -- no file path, no useful context.

That broke tests that mirror Spark/Delta's standard parquet-failure shape, e.g.
SnapshotManagementSuite "should not recover when the current checkpoint is broken"
which asserts the resulting SparkException's message contains both the file path
and "Encountered error while reading file" -- the format
`QueryExecutionErrors.cannotReadFilesError` produces.

Switch the wrapping to `cannotReadFilesError(cause, filePath)` via a new helper
on ShimSparkErrorConverter (which lives in the spark package and can reach the
private InputFileBlockHolder / QueryExecutionErrors). File path is read from
InputFileBlockHolder, with an empty-string fallback when the thread-local
isn't set; the static phrasing still satisfies the test assertion.

Pure core fix -- benefits every native parquet read, not just Delta.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
DeltaTable.forPath(spark, path, fsOptions) with a Hadoop custom-fs scheme
(e.g. fake://) was being claimed by CometScanRule for V1 parquet scans on
the _delta_log/checkpoint.parquet files Delta reads internally. The native
side then crashed at executePlan with `Generic URL error: Unable to
recognise URL "fake:///..."` since object_store doesn't know the custom
scheme.

Add a scheme allowlist check (same set already used in the Iceberg branch
and the contrib Delta path) at the top of the HadoopFsRelation arm; decline
via withInfo when any rootPaths scheme is outside the allowlist so Spark's
Hadoop-FS-aware reader handles the scan.

Fixes DeltaTableSuite "dropFeatureSupport - with filesystem options" and is
also a baseline fix (the same crash reproduces on main per
full-20260415-222735.log).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Each `plan_delta_scan` JNI call was creating a fresh `DefaultEngine`. Kernel's
`DefaultEngine<TokioBackgroundExecutor>` spawns one std::thread per executor that
hosts a current_thread tokio runtime, and that runtime's blocking pool (used by
kernel for parquet metadata IO and object_store reads) keeps `spawn_blocking`
worker threads alive for ~10s after each task. Under regression load (hundreds
of Delta scans/minute, each spawning a handful of blocking IO tasks) this
accumulates OS threads faster than tokio reaps them, eventually hitting the
per-process `ulimit -u` (~1300 on macOS) — visible in the log as
`pthread_create EAGAIN` aborts of GenerateIdentityValuesSuite and
MergeIntoUnlimitedMergeClausesScalaSuite ~2 hours into the run.

Replace the per-call `create_engine` with `get_or_create_engine` that returns
an `Arc<DeltaEngine>` from a static cache keyed by `(scheme, authority,
DeltaStorageConfig)`. Engines are constructed lazily on first miss per key and
reused for the lifetime of the JVM, bounding live OS threads by table-storage
diversity rather than by request count. The standalone `create_engine` is kept
(behind `#[allow(dead_code)]`) for tests that want a fresh engine.

`scan.rs` updated to deref `Arc<DeltaEngine>` to `&dyn Engine` at each kernel
call (`builder.build`, `scan.scan_metadata`, `dv.get_row_indexes`).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
DataFusion's `make_array_inner` asserts strict element-type equality (down to
nested field nullability) via `MutableArrayData::with_capacities`. Spark's
`CreateArray` is more permissive: when the analyzer doesn't insert coercion
casts, children can share the same surface struct type but disagree on a
nested field's nullability. Delta's CDF write path builds
`array(struct(id, b, _change_type=lit("delete")), struct(id, b, _change_type=col))`
manually -- one arm's `_change_type` is `Utf8` non-nullable (from a literal),
another is `Utf8` nullable -- and Comet's native serde happily emitted a
`make_array` call. Native execution then panicked:

  assertion `left == right` failed: Arrays with inconsistent types passed to
  MutableArrayData
   left: Struct([..., Field { name: "_change_type", data_type: Utf8 }])
  right: Struct([..., Field { name: "_change_type", data_type: Utf8, nullable: true }])

Decline in `CometCreateArray` when `children.map(_.dataType).distinct.size > 1`
so the JVM evaluator (which doesn't have this strictness) handles it. Fixes 4
`DescribeDeltaHistorySuite "replaceWhere on data column ... enableCDF=true"`
failures.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
… kind

Two perf-sweep items from apache#135:

apache#7 parse_delta_partition_scalar TZ parse-once. The per-row
chrono_tz::Tz::from_str (or fixed-offset parse) was happening inside
parse_delta_partition_scalar for every TIMESTAMP partition value, but the
session TZ string doesn't change within a scan. Introduce SessionTimezone
enum (Tz | Offset | Invalid), parse once in build_delta_partitioned_files,
pass the parsed value through. parse_delta_partition_scalar's signature gains
&SessionTimezone and keeps session_tz: &str only for the error message.

apache#2 PlanDataInjector lookup by op kind. injectPlanData was running
`for (injector <- injectors if injector.canInject(op))` against every
operator in the tree; for a 50-op plan with 3 injectors that's 150
canInject calls just to find no match on most ops. Add `opStructCase` to the
PlanDataInjector trait, build a Map[OpStructCase, PlanDataInjector] once at
object init, and look up by op.getOpStructCase before any canInject call.
Iceberg/NativeScan/Delta injectors set their own opStructCase.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…tion

Perf-sweep #1 from apache#135. `DeltaIntegration.transformV1IfDelta` is invoked for
every V1 scan in every plan (the bridge is called unconditionally by
CometScanRule before the contrib's own Delta-format check). On
-Pcontrib-delta builds each call was doing `getField MODULE$` +
`getMethod("transformV1IfDelta", ...)` + 4-arg Method.invoke -- a reflection
round-trip per scan.

Cache the resolved (module, method) binding once per JVM as
`transformV1IfDeltaBinding: Option[(AnyRef, Method)]`, single OnceLock-style
volatile. Steady-state per-scan cost drops to one volatile read + one
Method.invoke.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Perf-sweep apache#5 from apache#135. `isSchemaCometCompatible` was allocating a fresh
CometScanTypeChecker(CometDeltaNativeScan.ScanImpl) on every scan. The
checker is stateless w.r.t. its scanImpl tag and is safe to share. Promote
it to a private val on DeltaScanRule; the per-scan fallback-reasons
ListBuffer remains per-call (it's the only mutable input).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…artitioned_files

Perf-audit apache#137 finding #1. The inner `partition_schema.fields()` loop was
calling `.iter().find()` on `task.partition_values` for every field --
O(width × values) per task. Pre-build a per-task HashMap<&str, &str> once,
then O(1) gets. The map is reused across tasks via clear() so the allocation
amortises across all DeltaScanTasks in the scan.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
SnapshotManagementSuite "should not recover when the current checkpoint is
broken..." asserts the wrapped FAILED_READ_FILE.NO_HINT SparkException message
contains the file path (e.g. "0001.checkpoint"). de9e0d3 got the error class
right but left the path empty because:

  1. Comet's native scan path does NOT go through Spark's FileScanRDD, so the
     standard InputFileBlockHolder thread-local is never populated.
  2. ShimSparkErrorConverter.wrapNativeParquetError was reading from
     InputFileBlockHolder, getting null, and passing "" to
     cannotReadFilesError -- producing "Encountered error while reading file . "
     (with the empty path), which the test rejected.

Plumb per-partition file paths from CometNativeScanExec (where they're known
at planning time) -> CometExecRDD -> CometExecPartition -> CometExecIterator
-> wrapNativeParquetError. CometNativeExec.doExecuteColumnar (the actual call
site that constructs the iterator for query trees with a scan) collects file
paths from any CometNativeScanExec leaves and passes them through the same
CometExecRDD parameter.

Verified with a /tmp/cometdiag.log file sentinel that the existing logWarning
diags were being silently dropped by the test's `quietly { ... }` block,
which is why my earlier "the wrap isn't being reached" conclusion was wrong.

Test results after fix: SnapshotManagementSuite checkpoint-broken 2/2 PASS
(was 0/2 with empty path). The other 3 fix clusters
(de9e0d3+effe5f76+56c2b011) continue to pass: replaceWhere CDF 8/8,
dropFeatureSupport 1/1.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…+ safeguards

Five fixes from the comprehensive code review of contrib-delta-direct:

1. Implement the missing InputFileBlockHolder hook in CometExecRDD.compute.
   Several docs referenced `CometExecRDD.setInputFileForDeltaScan` but no such
   method existed and nothing called `DeltaInputFileBlockHolder.set`, leaving
   Delta's UPDATE/DELETE/MERGE flows (which use `input_file_name()` to find
   touched files) silently looking at an empty path. Now set the thread-local
   to the partition's first file (one-per-partition is enforced by
   DeltaScanRule when input_file_name() is referenced), unset on task
   completion. Stale doc references updated to point at the real call site.

2. DV filter ordering safeguards. DeltaDvFilterExec's `current_row_offset`
   tracking assumes physical row ordering from the parquet scan. Override
   `maintains_input_order() = [true]` and
   `benefits_from_input_partitioning() = [false]` so any future optimizer
   that wants to insert a RepartitionExec / SortPreservingMergeExec is
   forced to bail rather than silently re-order rows.

3. Tighten IgnoreMissingFileSource's `is_not_found` Display fallback. The
   prior `msg.contains("not found")` would match unrelated parquet messages
   like "row group statistics not found" or "page index not found" and
   silently swallow them as missing-file (returning empty results instead
   of failing). Restrict to recognised NotFound prefixes from object_store /
   S3 / FS error formats.

4. Multi-line regex for native parquet errors in CometExecIterator. Native
   parquet errors with embedded newlines (e.g. footer hex dumps) would slip
   past the single-line `^Parquet error: .*$` and surface as bare
   CometNativeException. Add `(?s)` so `.` spans newlines.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
The post-review fixes added/modified scaladoc that broke spotless line-length
rules. Apply spotless:apply across the three touched files. Verified with
test-compile.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
schenksj and others added 10 commits May 19, 2026 14:28
Adds a TODO note linking the decline-and-fallback to
apache/datafusion#22366. Lets a future maintainer find the upstream fix
when it lands and remove the workaround.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…log replay

Closes the P1 credential-asymmetry gap carried from apache#3932 (commit 461fa4f).
Previously the kernel-rs log-replay path's DeltaStorageConfig only honored
explicit static keys (`fs.s3a.access.key` / `fs.s3a.secret.key` /
`fs.s3a.session.token`) set in core-site.xml. Users running under
SimpleAWSCredentialsProvider / TemporaryAWSCredentialsProvider /
AssumedRoleCredentialProvider / IAMInstanceCredentialsProvider would see
data-file reads authenticate (those go through Comet's existing native
`build_credential_provider`) but log replay fail.

Resolution happens Scala-side via reflection against
`org.apache.hadoop.fs.s3a.S3AUtils.createAWSCredentialProviderList` -- the
same Hadoop credential machinery Spark uses everywhere else. The resolved
(access_key, secret_key, session_token) tuple is stuffed into the
`storageOptions` map under the standard Hadoop keys before the JNI call.
Reflective because hadoop-aws is an optional dep; absence falls through to
static-only behavior (any user without S3 stays unaffected).

Architecture note: an in-crate cherry-pick of 461fa4f wasn't viable here
because the JNI lives in `contrib/delta/native/` -- a standalone Cargo
crate that deliberately doesn't depend on core (to keep the arrow-57 /
arrow-58 split clean). The Scala-side approach has the same correctness
properties and avoids the crate boundary entirely.

Method handles cached via @volatile Option[Option[Binding]] -- the augment
path runs on every Delta scan; resolving the Class + getMethod chain on
each call would be a per-scan reflection round-trip just to find the same
handles every time.

SNAPSHOT resolution: log replay completes in seconds, well within any
reasonable credential TTL. Long-running data reads continue to use Comet's
refresh-capable native credential provider.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…ad-bearing

Expand the comment on the CM-name + checkLatestSchemaOnRead=false guard to
explain the specific failure mode (column_mappings from one snapshot vs.
parquet physical names from another after a concurrent ALTER TABLE). The
guard is conservative but necessary; a future reader of the code shouldn't
mistake it for laziness.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…uet field IDs

Implements apache#142. Previously declined at DeltaScanRule.scala:271 because the
contrib's native path matched parquet columns by name and CM-id mode demands
ID-based matching. Comet core's `schema_adapter.rs` already supports field-ID
matching via `use_field_id` + `ignore_missing_field_id` flags; this PR wires
the Delta contrib through that machinery.

Five mechanical changes:

  1. Add `parquet.field.id` (Spark's standard StructField metadata key for
     parquet field IDs) and `delta.columnMapping.id` (Delta's CM-id storage
     key) as named constants in DeltaReflection.

  2. Add `use_field_id` bool to DeltaScanCommon proto (field 17).

  3. CometDeltaNativeScan.translateDeltaFieldIdToParquet walks the schema
     tree recursively (StructType -> nested fields, ArrayType -> element,
     MapType -> key/value) copying `delta.columnMapping.id` to
     `parquet.field.id` on every StructField. Spark's `ParquetUtils.hasFieldId`
     -- which schema2Proto and serializeDataType's StructType arm read --
     looks at `parquet.field.id`, so this is what makes the field IDs actually
     reach the proto.

  4. In `convert()`, detect CM-id mode from snapshot metadata and apply the
     translator to data_schema / required_schema / partition_schema before
     calling `schema2Proto`. Set `commonBuilder.setUseFieldId(true)` so the
     native dispatcher passes `use_field_id=true` to `init_datasource_exec`.

  5. native/core/src/execution/planner/contrib_delta_scan.rs uses
     `common.use_field_id` from the proto instead of the hardcoded `false`.

The recursive translator handles nested struct / array / map field IDs --
the "complex sub-types" gotcha from earlier CM-name work.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…n-blocking

Expand inline comments on the three remaining DeltaScanRule fallback gates
(TahoeLogFileIndexWithCloudFetch, __delta_internal_* synthetic columns,
CometScanTypeChecker decline) to document why they're correctness-correct as
fallback-only paths and to capture the implementation sketches for any future
native-perf work.

No behavioral change. Each gate was verified in the recent regression to
either never fire (cloud-fetch -- OSS Delta doesn't have the class) or fire
on a path Spark's reader handles correctly without test failures (synthetic
columns, schema type decline).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…pache#144)

Native ExecutionPlan wrapper that appends Delta's `__delta_internal_row_index`
(UInt64) and `__delta_internal_is_row_deleted` (Int32) columns to scan output
batches. Replaces the decline for these synthetic columns where the surrounding
plan asks for them (UPDATE/DELETE/MERGE flows).

- `synthetic_columns.rs`: new module with DeltaSyntheticColumnsExec. Same
  physical-order invariant as DeltaDvFilterExec (one file per partition;
  parquet emits in file row order). Appends columns via a single sweep over
  the DV-sorted indexes alongside the batch's row range.
- proto: add `emit_row_index` (18) and `emit_is_row_deleted` (19) flags on
  DeltaScanCommon.
- contrib_delta_scan.rs: wire three mutually-exclusive wrap modes -- synthetic
  exec, DV filter exec, or passthrough.

NOT YET WIRED Scala-side: when scan.requiredSchema contains these synthetic
column names, CometDeltaNativeScan still needs to (a) strip them from the
proto schemas (so the native parquet reader doesn't try to read them) and
(b) set the proto emit flags. Until that lands the existing decline gate at
DeltaScanRule.scala:331-342 stays active.

Native module compiles clean. Full linker validation deferred -- disk-space
pressure from concurrent regression run blocked the full link cycle.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…olumns

Completes apache#144. CometDeltaNativeScan.convert now:
  - Detects __delta_internal_row_index / __delta_internal_is_row_deleted in
    scan.requiredSchema
  - Verifies they form a contiguous SUFFIX of required_schema (so wrapped
    DeltaSyntheticColumnsExec's appended-at-end output matches Spark's
    expected layout); declines otherwise
  - Strips them from the proto required_schema and data_schema so the parquet
    reader doesn't look for columns that aren't on disk
  - Filters them out of projection_vector (their -1 sentinel would have been
    out-of-bounds for native usize)
  - Sets the proto emit_row_index / emit_is_row_deleted flags so the
    dispatcher wraps the parquet scan in DeltaSyntheticColumnsExec to append
    them back

DeltaScanRule: removed the decline gate at scanWithMappedSchema. Removed the
belt-and-suspenders guard in CometDeltaNativeScan now that the convert path
handles synthetics rather than falling back.

Combined with the native exec from 2cb9188, this lets UPDATE/DELETE/MERGE
flows that materialise the DV deletion flag stay on the native path instead
of falling back to Spark's Delta reader.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
The decline at DeltaScanRule for "$ScanImpl does not support Parquet field ID
matching" was a separate gate from CM-id mode, fired when the user explicitly
set spark.sql.parquet.fieldId.read.enabled=true AND scan.requiredSchema
carried Spark's standard `parquet.field.id` metadata (non-Delta-id path that
nevertheless wants field-ID matching).

The same native machinery wired for CM-id (apache#142, commit 7ace165) handles
this case unchanged -- `serializeDataType`'s StructType arm reads
`ParquetUtils.hasFieldId` for nested types and `schema2Proto` does the same
for top-level. The only thing needed was setting `use_field_id=true` on the
proto.

CometDeltaNativeScan.convert now sets `useFieldIdActive` from EITHER CM-id
mode OR (Spark's PARQUET_FIELD_ID_READ_ENABLED + hasFieldIds). Gate removed
from DeltaScanRule.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…commit_version)

Unblocks the second gate in DeltaScanRule.applyRowTrackingRewrite, which used
to decline native execution when a row-tracking-enabled table HAD no
materialised column names (rowIdPhysical / rowVerPhysical both empty,
meaning Delta expects synthesis from baseRowId + physical row index).

End-to-end wiring:

  - scan.rs: extract baseRowId / defaultRowCommitVersion per scan file from
    each ScanMetadata batch's underlying RecordBatch
    (`fileConstantValues.baseRowId` / `defaultRowCommitVersion` -- not
    exposed by kernel's `ScanFile`). Uses an `RawEntryAcc` context struct
    because `visit_scan_files` requires `fn` (not `FnMut`), so the per-batch
    row-tracking lookup vec lives in the context.
  - jni.rs: thread the extracted values into DeltaScanTask proto fields 6/7
    (already present, previously hard-None'd).
  - proto: add `emit_row_id` (20) and `emit_row_commit_version` (21) flags
    on DeltaScanCommon.
  - synthetic_columns.rs: extend DeltaSyntheticColumnsExec to emit the two
    new columns (row_id = baseRowId + physical_row_index per file,
    row_commit_version = defaultRowCommitVersion constant per file). Nullable
    Int64 columns; null-valued when the file has no row tracking.
  - contrib_delta_scan.rs: force per-file FileGroups when emit_row_id /
    emit_row_commit_version is on (the per-partition row offset counter
    doesn't reset across files within a FileGroup, so baseRowId arithmetic
    requires 1:1 file-to-partition mapping just like the DV case).
  - CometDeltaNativeScan: detect row_id / row_commit_version in
    scan.requiredSchema, add to synthetic-column suffix check + strip from
    proto schemas + projection_vector, set emit flags.
  - DeltaScanRule.applyRowTrackingRewrite: stop declining the no-materialised
    case; return None (no rewrite needed) so nativeDeltaScan proceeds and
    CometDeltaNativeScan.convert sets the synthesis path.

Also unblocks the related field-id-matching gate when
spark.sql.parquet.fieldId.read.enabled is true (commit ee9f9e4) -- the
same use_field_id machinery handles both CM-id and non-CM-id paths.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
With native synthesis of `__delta_internal_is_row_deleted` wired in apache#144,
the `outputHasIsRowDeleted` branch of `scanBelowFallsBackForDvs` no longer
needs to force a decline. CometDeltaNativeScan.convert detects the column
in scan.requiredSchema and routes through DeltaSyntheticColumnsExec to
append it -- the surrounding Delta projection that filters on the column
runs against the synthesised output without falling back to Spark.

Only `batchFallback` (TahoeBatchFileIndex with DV-bearing AddFiles) still
forces decline because our native path can't extract DV info from
pre-materialised batch indexes -- separate issue.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
schenksj and others added 30 commits May 22, 2026 11:10
Delta 4.1 own-test-suite regression failures on CDC reads under
row-tracking-enabled tables (especially the coordinated-commits batch
backfill variant DeltaCDCScalaWithCatalogOwnedBatch2Suite) surfaced as
'Comet Internal Error: Output column count mismatch: expected 14, got 13'.

The scan declared 14 output columns including the per-file
`default_row_commit_version` metadata column, but the native
DeltaSyntheticColumnsExec only knew about `base_row_id`, so the column
was dropped on the way through and the upstream operator saw N-1.

This commit:
  * Adds `default_row_commit_version` to JVM-side `syntheticNames` and
    `fixedMetadataNames` sets in CometDeltaNativeScan so it's included
    in `metadataColumnNamesEmitted` and the proto.
  * Adds the matching `META_DEFAULT_ROW_COMMIT_VERSION` constant, field
    schema, and emit branch in `synthetic_columns.rs`; extends
    `TaskMetadata` with the new field and wires it from
    `core_glue.rs` (the proto already carried the value via
    `task.default_row_commit_version`).

Removes the `DeltaSyntheticColumnsExec: unknown metadata column name
'default_row_commit_version'` failure path. A second off-by-one in the
same suite remains under investigation (separate column drop).

Also bundled:
  * `CometScanWithPlanData.perPartitionFilePaths` trait method +
    `operators.scala` union-path collector now matches the trait
    instead of just `CometNativeScanExec`, so MERGE/UPDATE/DELETE
    flows that embed a Delta scan in a parent native tree no longer
    see empty `input_file_name()` -> `DELTA_FILE_TO_OVERWRITE_NOT_FOUND`.
  * `CometExecRDD.compute` populates `InputFileBlockHolder` whenever
    `partition.filePaths.nonEmpty` (not only the single-file case),
    matching PR apache#3932's approach.
  * Re-enables the two MERGE reproducers in `CometDeltaSpecialCharFilenameSuite`.
  * Adds `DeltaHiveTest.scala` Comet-wiring hunk to `4.1.0.diff`
    (the piece PR apache#3932's 4.0.0.diff had that ours was missing).
  * New `CometDeltaRegressionReproSuite` (one repro per root-cause
    cluster identified in the 4.1 regression) and a `CometDeltaCdcSuite`.
  * New `.github/workflows/delta_regression_test.yml` workflow that
    invokes `contrib/delta/dev/run-regression.sh` across Delta 3.3.2 /
    4.0.0 / 4.1.0 with smoke -> full gating.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…etic schema augmentation

Follow-up to the metadata-column fix in 97c953a. The
`default_row_commit_version` column was added to `metadataColumnNamesEmitted`
and to the native synthetic-column emit branches, but
`isExtraSyntheticName` (which augments `requiredSchemaFields` with
metadata cols not in `scan.requiredSchema`) was missing it. The
resulting `requiredSchemaForProto` was one column short, so
`finalOutputIndices` was sized 13 instead of 14 and the native
`ProjectionExec` tried to read past the schema end.

Adds `default_row_commit_version` to the explicit name list alongside
`base_row_id`. Drops DeltaCDCScalaWithCatalogOwnedBatch2Suite from
6 -> 5 failures (the "CDC read's commit timestamps are correct
under different timezones" test now passes).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…d data length

When the Delta scan's `relation.dataSchema` includes a synthetic column
(e.g. `__delta_internal_is_row_deleted` from Delta's DV rewrite for
CDC reads of row-tracking-enabled tables), the JVM-side
`projectionVector` partition-tail indexes were computed off
`fileDataSchemaFields.length` (un-stripped, includes the synthetic),
but native receives `dataSchemaForProtoStripped` (synthetic removed
when `needsSyntheticEmit`). The partition tail then points one past
the end of the native data+partition schema, panicking
`ProjectionExprs::from_indices` with
"index out of bounds: the len is N but the index is N".

Compute partition-tail indexes from the non-synthetic data length
instead, mirroring what native sees. Also rewrite the
data-or-partition lookup in `requiredIndexes` to walk the
synthetic-filtered data field map so each entry points to the
correct native-side position.

Drops DeltaCDCScalaWithCatalogOwnedBatch2Suite from 5 -> 4 failures
("aggregating non-numeric cdc data columns" now passes). The
remaining 4 are "Results do not match" assertions (data correctness,
not native crashes) -- separate investigation.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
CDC "delete events" / "insert events" reads attach a non-empty
`rowIndexFilters` map to `CdcAddFileIndex` / `TahoeRemoveFileIndex`.
This flips the DV bitmap meaning: native batch reads filter OUT the
rows in the bitmap, but CDC needs the rows IN the bitmap (the rows
being newly deleted/inserted). Our native scan only implements the
batch semantics, so it returned the wrong rows -- e.g. for a DELETE
"id > 20" on a file [20-24] with DV cardinality 4, the native scan
emitted the non-DV'd row (id=20) instead of the DV'd rows (21-24).

Add `DeltaReflection.hasInvertedRowIndexFilters` reflection helper
and gate `DeltaScanRule.nativeDeltaScan` to decline when set. Spark's
Delta reader then handles these correctly.

Result: DeltaCDCScalaWithCatalogOwnedBatch2Suite drops from 4 -> 0
failures (55/55 pass). Parent DeltaCDCScalaSuite still 55/55,
DeltaCDCSQLSuite still 54/54, contrib local Scala suites still
62 succeeded / 0 failed / 1 canceled (pre-existing).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…ata-skipping

`refreshedSnapshotFiles` called `snapshot.filesForScan(Nil, false)` --
passing an empty filter list, so the refreshed file list was the FULL
table. This bypassed Delta's planning-time partition pruning and
stats-based data-skipping: for a PreparedDeltaFileIndex on a partitioned
table, our scan would read every file regardless of the predicate.

Surfaced as StatsCollectionSuite "gather stats" asserting
`recordsScanned(df.where("id = 1")) == 1` but Comet's native scan
returning 9 (all rows from all files).

Thread the scan's `partitionFilters ++ dataFilters` through
`extractBatchAddFiles` to `refreshedSnapshotFiles`, then to
`snapshot.filesForScan(filters, false)`. Delta evaluates partition
filters against partitionValues and data filters against AddFile
stats, returning only the surviving files.

Result: StatsCollectionSuite 65/67 -> 65 succeeded with "gather stats"
+ 1 other test now passing (was 4 failures, down to 2 -- the remaining
two read commit JSONs via LogicalRDD, separate code path).
DeltaCDCScalaWithCatalogOwnedBatch2Suite stays at 55/55; contrib local
suites stay at 62/62.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Mirrors MergeIntoSchemaEvolutionBaseExistingColumnSQLPathBasedCDCOnDVsPredPushOffSuite's
"upcast int source type into long target" failure shape: target written
with DV + CDC enabled, MERGE INSERT + UPDATE *, read back. Upstream
returns 0 rows (expected 4) because the post-MERGE read goes through
our scan applying a synthetic `__delta_internal_is_row_deleted` filter,
and something in the scan/Filter interaction drops all rows.

This local reproducer sets the equivalent session configs
(persistentDeletionVectors.enabled for merge, useMetadataRowIndex=false,
enableChangeDataFeed default, enableDeletionVectors default) and
performs the same operations, but does NOT yet fail. The Delta test
harness must set up additional state we're not mirroring. Committed
as-is so the repro is in place for the next iteration.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Spark wraps file-source partition columns and per-batch constants in
`ConstantColumnVector`. Comet's `NativeUtil.exportBatch` and
`Utils.getBatchFieldVectors` previously threw
`Comet execution only takes Arrow Arrays, but got class
ConstantColumnVector` whenever such a batch reached a Comet operator --
notably OPTIMIZE on a Delta table with DVs, where the FileFormatWriter
threads a constant column through Comet's columnar pipeline.

Add `Utils.materializeConstantColumnVector` that allocates a fresh
Arrow `FieldVector` sized to the batch's row count and pre-fills it
with the constant value (or all-nulls when `isNullAt(0)`). Handles
the primitive types the OPTIMIZE / partition-column shapes surface:
Boolean, Byte, Short, Int, Long, Float, Double, Date, Timestamp
(MICROSECOND/UTC), String, Binary, Decimal128, Null. Throws for
unsupported types so we surface gaps loudly.

Wire it into both export paths so any Comet operator can ingest a
batch containing constants:
  - `NativeUtil.exportBatch`: materialise inline, then `Data.exportVector`
    through the existing C-Data path.
  - `Utils.getBatchFieldVectors` (used by `serializeBatches`):
    materialise into the field-vectors list ahead of the Arrow IPC
    write.

Result: OptimizeCompactionSQLSuite "optimize command with DVs" no
longer dies at NativeUtil.exportBatch with the ConstantColumnVector
error. The test now hits a separate failure (`NoSuchElementException`
during a later assertion that depends on our DV materialisation),
which is a distinct issue tracked separately.

No regressions: DeltaCDCScalaWithCatalogOwnedBatch2Suite 55/55,
contrib local Scala suites 63 succeeded / 0 failed / 1 canceled
(pre-existing), other 23 optimize tests still pass.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…re read

`DeltaReflection.extractTableRoot` returns the double-URL-encoded URI
form (`pathToSingleEncodedUri` -> `path.toUri.toString`) so the NATIVE
side, which reads files via RawLocalFileSystem.pathToFile (which uses
the URI raw path verbatim), lands on the literal on-disk filename.

The JVM-side path was passing that same double-encoded string straight
into `new Path(tableRoot)` and on to `HadoopFileSystemDVStore.read`,
which resolves the DV file via Hadoop FS. Hadoop's raw path encoding
ended up doubly-encoded relative to the on-disk filename, so the DV
store opened a non-existent file and threw NPE on the null result.

`materializeDeletedRowIndexes` then returned `None`, our DeltaScanRule
treated that as a fatal DV-failure and declined, and the scan fell
back to Spark+Delta -- which on OPTIMIZE under DVs leads to a
NoSuchElementException downstream because a subsequent commit wasn't
written.

Fix: URL-decode `tableRoot` once before passing to `new Path(...)`.
Now Hadoop's URI form re-encodes back to the single-encoded form,
RawLocalFileSystem.pathToFile decodes once, and the DV store hits
the right on-disk file.

Result: `OptimizeCompactionSQLSuite "optimize command with DVs"`
now passes (1/1). No regressions: contrib local 63/63,
DeltaCDCScalaWithCatalogOwnedBatch2Suite still 55/55.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…=false

Delta's `deletionVectors.useMetadataRowIndex` is internal+default=true.
The DeletionVectorsTestUtils.PredicatePushdownDisabled mixin flips it
to false, switching Delta from parquet's `row_index` virtual column
to a synthetic `__delta_internal_is_row_deleted` column above the
scan.

In that mode, post-MERGE-with-persistentDV reads return wrong row
counts -- the original target files emit zero rows through our scan
path. The DV bitmap reads correctly, the synthetic emit logic mirrors
Delta's, but the batches downstream of the synthetic exec come back
empty (root cause is in the interaction between the Delta-side
filter rewrite + our per-file-group native plan + the DV synthetic
emit; needs deeper file-level investigation we can't land here).

Pragmatic fix: when `useMetadataRowIndex=false` AND
`__delta_internal_is_row_deleted` is in scan.requiredSchema AND any
AddFile carries a DV, decline. Spark+Delta then handles the read
correctly.

Production impact bounded: `useMetadataRowIndex` is internal+default
true, so production reads use parquet's `row_index` path (which our
scan handles correctly). The fallback only triggers for tests / users
who explicitly disable the conf.

Result: MergeIntoSchemaEvolutionBaseExistingColumnSQLPathBasedCDCOnDVsPredPushOffSuite
goes from 33/42 -> 42/42. No regression elsewhere:
DeltaCDCScalaWithCatalogOwnedBatch2Suite 55/55,
OptimizeCompactionSQLSuite 24/24, StatsCollectionSuite 65/67
(2 unrelated `recompute stats` failures), contrib local Scala 63/63.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…adata.file_path

When Spark packs multiple files into one Spark partition AND our scan
projects `_metadata.file_path` (the recompute-stats / per-file-aggregate
shape), we create multiple DataFusion file_groups (one per file) so
DeltaSyntheticColumnsExec emits a 1:1 per-file `file_path`
metadata mapping. But Spark consumes only ONE DataFusion partition per
Spark partition -- the 2nd+ files' batches are silently dropped, their
rows never reach Spark, and any per-file groupBy missing them.

Surfaced in StatsCollectionSuite "recompute stats multiple columns and
files" / "recompute stats on partitioned table": recompute does
`groupBy(_metadata.file_path).agg(min, max, ...)`. With 3 files and
2-file-per-partition packing, one file's rows go missing and its
recomputed stats come out null.

Fix: tag scans that project `file_path` with
`NeedsInputFileNameOption` (same plumbing as the
`input_file_name()` path), which propagates through to
`oneTaskPerPartition=true` on `CometDeltaNativeScanExec`. Each file
then becomes its own Spark partition with its own DataFusion
partition (1:1 alignment, no dropped data).

Narrow gate: only triggers when `file_path` is in scan.output. Other
per-file metadata cols (`base_row_id`, `default_row_commit_version`,
row-tracking helpers) commonly appear in unrelated scans and the
existing file packing handles them correctly.

Result: StatsCollectionSuite 65/67 -> 67/67. No regression:
OptimizeCompactionSQLSuite still 24/24, contrib local 64/64, parent
CDC 55/55 (separate suite still running, will verify).

Also drops a now-redundant local CometDeltaRegressionReproSuite stats
test that reproduces the path locally; it passes today because our
withDeltaTable doesn't trigger the multi-file packing that the
upstream test exercises, but the upstream test itself is the
authoritative reproducer.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Refactor `extract_storage_config` in the JNI bridge to delegate to a
pure `delta_storage_config_from_map`, making the kernel/Hadoop key
matrix testable in isolation. Removed the now-unused `map_get_string`
helper.

Add two test surfaces that document the known gaps so closing them
forces an obvious test update:

* `jni::tests::extract_storage_config_matrix` -- positive coverage
  for kernel-style keys, Hadoop-style fallbacks, and force-path-style.
* `jni::tests::extract_storage_config_known_gaps` -- negative
  assertions for GCS (`gcp_*`), per-bucket S3, and Hadoop Azure
  account/OAuth keys -- they pass today because the gap exists, and
  flip to failures the moment we wire those keys through.
* `CometDeltaCredentialAuditSuite` (Scala) -- pairs the native gaps
  with their JVM-side companions:
  - asserts s3/s3a/gs/wasb/wasbs prefixes extract the expected keys;
  - GAP: abfs/abfss schemes drop `fs.azure.*` keys because
    `NativeConfig.objectStoreConfigPrefixes` only registers
    `fs.abfs.` / `fs.abfss.` for them, missing OAuth/MSI creds that
    Hadoop users have historically set under `fs.azure.*`;
  - GAP: `augmentWithResolvedAwsCredentials` doesn't synthesize
    per-bucket `fs.s3a.bucket.<name>.*` keys after credential-chain
    resolution.

No behavior change for currently-supported credential paths; the
audit only adds regression coverage.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
CometDeltaScanConfAuditSuite locks in the contract for every SQLConf /
table property that can change scan output. Each conf gets either a
CONTRACT_NATIVE assertion (engages + matches vanilla) or a
CONTRACT_FALLBACK assertion (declines + matches vanilla):

* DV: useMetadataRowIndex=false declines on DV-bearing reads
* DV: useMetadataRowIndex=true (default) engages native
* DV: MERGE_USE_PERSISTENT_DELETION_VECTORS=true read-side engages
* columnMapping.mode in {none,name,id} engages native
* rowTracking enabled does not disengage
* Comet kill switch (spark.comet.scan.deltaNative.enabled=false) forces
  fallback

Plus one GAP marker:
* path-based readChangeFeed routes through DeltaCDFRelation which our
  rule doesn't intercept -- table-API CDC reads (covered by
  CometDeltaCdcSuite) do engage. Flip this when the rule learns
  DeltaCDFRelation.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
CometDeltaMetadataColumnAuditSuite projects every recognised
_metadata.* / row-tracking column individually and asserts the values
match vanilla Spark. Past bugs that motivated this:

* silent nulls / row drops when projecting _metadata.file_path on
  multi-file partitions (commit 8c3cf6c)
* column-count off-by-one for default_row_commit_version emit
  (commit 97c953a)
* synthetic-column stripped-length bug in projection_vector
  (commit 208f083)

Covers:
* _metadata.file_path / file_name / file_size / file_modification_time
* _metadata.file_block_start / file_block_length
* _metadata.row_index (row-tracking-enabled)
* input_file_name()
* _metadata.row_id (unmaterialised)
* _metadata.row_commit_version
* multi-metadata projection on RT + DV (the off-by-one repro shape)

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
CometDeltaTypeRoundTripAuditSuite covers Spark/Delta type round-trip
through the native scan vs vanilla:

Positive matrix:
* primitives at edge values (TINYINT/SMALLINT/INT/BIGINT extremes,
  FLOAT/DOUBLE +/-Infinity, NULLs)
* DECIMAL at (5,0) / (10,2) / (18,6) / (38,18)
* DATE / TIMESTAMP / TIMESTAMP_NTZ
* BINARY (empty, single byte, hex, long)
* nested STRUCT<>, ARRAY<>, MAP<>
* CHAR(N) / VARCHAR(N)

Discovered gaps (assert-and-document; promote when fixed):
* column mapping mode=name + complex types triggers AQE
  AdaptiveSparkPlanExec.setLogicalLinkForNewQueryStage assertion --
  likely missing/mismatched logical link on the serialized native
  scan node
* column mapping mode=id post-RENAME COLUMN triggers the same AQE
  assertion -- same root cause suspected
* VARIANT: probe + tolerant assertion (engages OR declines, but if it
  engages results must match vanilla -- guards against silent
  corruption when VARIANT support is added)

The CM-mode AQE finding is a follow-up worth its own ticket.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Delta stores partition values as strings in the log; coercion back to
the partition column's logical type happens at scan time. Past bugs
in this area are silent (wrong epoch, lost precision, double-applied
timezone), so we lock in the contract with native-vs-vanilla matches.

Matrix:
* DATE (epoch / current / max + partition-pruning filter)
* TIMESTAMP (UTC values, microsecond fractions)
* TIMESTAMP_NTZ (no TZ shift)
* DECIMAL(18,6) at sign/precision extremes
* BIGINT (Long.MinValue / MaxValue)
* STRING with spaces, percent-encoded literals, empty, NULL
* BOOLEAN
* Multi-column (dt, region) with IsNull pruning
* Session-TZ swap (write under UTC, read under LA) -- guards against
  double-applied timezone normalization

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
CometDeltaFilterPushdownAuditSuite locks in correctness for the Spark
filter shapes that flow into the native scan (whether pushed down or
post-filtered, the result rows must match vanilla):

Positive matrix:
* EqualTo, EqualNullSafe (=, <=>)
* GreaterThan/LessThan/Ge/Le
* IsNull / IsNotNull on data column
* In / NotIn (incl. InSet-sized list)
* StringStartsWith / EndsWith / Contains
* AND/OR/NOT combinations
* Partition-column filters (EqualTo, In, GreaterThan)
* Nested struct field reference (s.a, s.b)
* Cast-coerced literal (INT literal vs BIGINT column)

Discovered gap (assert-and-document):
* DV-bearing table + range filter `id > 10 AND id < 25` -- native
  returns 19..24 while vanilla returns 11..24. Suspect stats-based
  data-skipping interaction with DV-bearing files reporting min/max
  that exclude DV'd rows. Filed as a follow-up; the GAP-marker test
  asserts native row count is LESS than vanilla and flips to a
  failure (with promotion guidance) once the mis-pruning is fixed.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Audit 5 surfaced a silent correctness bug: a range filter over a
DV-bearing single-file table (rows 0..29, DELETE id%4=0) returned only
19..24 where vanilla Delta returns 11..24.

Root cause: when the synthetic `__delta_internal_is_row_deleted` column
is emitted, DeltaSyntheticColumnsStream walks a running
`current_row_offset` counter to look up each row's membership in the
file's `deleted_row_indexes` (the materialized DV). That counter assumes
the parquet reader returns EVERY row in physical order. But the same
scan also pushes `data_filters` down to the parquet reader, which skips
non-matching rows -- so `current_row_offset` no longer tracks the true
parquet row_index, and the DV bitmap gets applied to the wrong stream
positions (the first N rows of the post-filter stream rather than the
DV'd physical rows). The outer Spark Filter then drops those wrongly
flagged rows.

Fix: suppress data-filter pushdown to parquet when emit_is_row_deleted
is set (core_glue.rs). Spark's outer Filter still applies the predicates
correctly; we only forgo parquet-level pruning for this case. Partition
filters are unaffected (they prune file groups earlier). DV filtering
without the synthetic column (the common read path) still pushes filters
down normally.

Promotes the audit GAP-marker test in CometDeltaFilterPushdownAuditSuite
to a positive `assertDeltaNativeMatches`, plus two extra filters that
straddle the DV'd indexes. Verified no regression across the
features/CDC/metadata/scan-conf suites (28 tests).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Audit 3 surfaced an INTERNAL_ERROR on column-mapping tables: any plan
with a shuffle above the native Delta scan (orderBy / join / aggregate)
hit AdaptiveSparkPlanExec.setLogicalLinkForNewQueryStage's
`assert(link.isDefined)`. Originally thought to be complex-types or
post-rename specific; isolated to: CM-mode + ANY exchange.

Root cause: nativeDeltaScan propagated the logical link only from
`op.wrapped` (the wrapped FileSourceScanExec). In column-mapping mode
Delta builds that scan WITHOUT a logicalLink even though the surrounding
CometScanExec (`op`) has one. So the contrib's exec inherited no link,
and the built-in CometExecRule "set up logical links" pass -- which
re-derives a CometExec's link from `originalPlan.logicalLink` (==
`op.wrapped.logicalLink`) and UNSETS the tag when empty -- guaranteed the
exec ended up linkless. When AQE then wrapped it in a query stage at the
exchange boundary, the assertion fired.

Fix: seed the link as `op.wrapped.logicalLink.orElse(op.logicalLink)`,
set it on BOTH op.wrapped (so the downstream CometExecRule pass agrees)
and the exec. JVM-only change; no native rebuild required.

Promotes the two audit GAP markers in CometDeltaTypeRoundTripAuditSuite
to positive round-trip assertions and adds a minimal simple-types +
orderBy/aggregate regression that isolates the root cause. Verified no
regression across column-mapping / scan-conf / native / features suites
(34 tests).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…version

Delta own-suite regression (DeltaTimeTravelSuite / DeltaHistoryManager*):
a versionAsOf/timestampAsOf read returned the LATEST version's data
(e.g. v0 returned 20 rows instead of 10).

Root cause: DeltaReflection.refreshedSnapshotFiles (used by
extractBatchAddFiles for PreparedDeltaFileIndex) unconditionally called
deltaLog.update() to refresh the snapshot to HEAD before filesForScan.
That refresh exists to pick up fresh deletion-vector descriptors for
consecutive DELETEs that reuse a cached FileIndex -- but for a
time-travel query it discards the pinned version and returns current
files.

Fix: gate the head-refresh on PreparedDeltaFileIndex.versionScanned.
When it's Some(v) (time travel), use the pinned preparedScan.scannedSnapshot
instead -- historical versions are immutable so the DV-staleness reason
to refresh does not apply. Non-time-travel reads keep the head refresh.

Reproduced by CometDeltaTimeTravelReproSuite (versionAsOf=0 must return
v0's rows, not head); fails before / passes after. JVM-only change.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…TT gate

Follow-up to 7c59938. Rather than gate the head-refresh on
versionScanned (a band-aid), drop the head-refresh entirely: always
re-query `preparedScan.scannedSnapshot` -- the snapshot the scan was
prepared against, which is exactly what vanilla Spark+Delta reads
(PreparedDeltaFileIndex extends TahoeFileIndexWithSnapshotDescriptor
over it).

The earlier `deltaLog.update()` to head (commit 830c979) was meant to
pick up DV descriptors written after a cached FileIndex was built, but
refreshing to head makes Comet read a DIFFERENT snapshot than vanilla:
it diverges on the consecutive-DELETE / DeltaLog-cache case (that
commit's own point apache#2 acknowledges vanilla returns the "stale" count)
and it returned the LATEST version for time-travel reads. Re-querying
the prepared snapshot via filesForScan still picks up that snapshot's
freshest DV descriptors, matches vanilla in every case, and needs no
time-travel special-casing.

Renamed refreshedSnapshotFiles -> preparedSnapshotFiles. Verified: 24
contrib tests pass incl. the DV double-DELETE (CometDeltaColumnMapping
Suite) the head-refresh was added for, and CometDeltaTimeTravelRepro
Suite. JVM-only.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…ptiveBroadcastExec)

Delta own-suite regression F1: MERGE into a partitioned table (and any
broadcast join whose key is the partition column) over a native Delta
scan threw "CometSubqueryAdaptiveBroadcastExec ... does not support the
execute() code path."

Root cause (two layers, both confirmed by repro):
  1. The AQE DPP subquery arrives as an unexecutable placeholder:
     CometExecRule wraps Spark's SubqueryAdaptiveBroadcastExec into
     CometSubqueryAdaptiveBroadcastExec, which
     CometPlanAdaptiveDynamicPruningFilters is meant to rewrite to an
     executable (Comet)SubqueryBroadcastExec.
  2. That rewrite is ORPHANED for a scan buried inside a native block:
     transformUp converts the scan, but the converted copy is dropped
     when the parent CometNativeExec is rebuilt (TreeNode.makeCopy can't
     carry @transient fields -- the apache#3510 issue; verified the converted
     node is not reachable in the rule's output). So the live scan keeps
     the placeholder, and Comet's native-scan subquery lifecycle
     (CometLeafExec.ensureSubqueriesResolved -> waitForSubqueries)
     executes it -> throw.

Fix (self-contained in CometDeltaNativeScanExec; no base-Comet change):
  * Override ensureSubqueriesResolved to resolve only executable DPP
    subqueries and skip adaptive-broadcast placeholders (so they don't
    execute and crash).
  * In applyDppFilters, convert a surviving placeholder on the fly to an
    executable SubqueryBroadcastExec (its `child` is the already-
    materialized broadcast) and resolve THAT to recover pruning values,
    falling back to scanning all tasks on any failure.

Results are always correct (the surrounding join filters regardless of
pruning). Real partition pruning applies when the scan recomputes
partitions at execution. Reproduced + guarded by CometDeltaDppReproSuite
(forces DPP via dynamicPartitionPruning.useStats=false).

Note: when the scan executes inside a parent native block, the parent's
findAllPlanData reads a planning-time perPartitionData snapshot (memoized
before the broadcast is ready, to keep numPartitions stable), so DPP
pruning is not yet applied in that path -- the scan reads all partitions
(correct, unpruned). Closing that requires emptying DPP-pruned partitions
at execution while keeping the partition count fixed; tracked separately.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Durable record of deliberate tradeoffs (DPP pruning in native-block case,
credential plumbing gaps, path-based CDF decline, VARIANT, decline gates)
and pending regression failure families (row-tracking materialization +
untriaged), so each can be opened as a GitHub issue once the work merges.
Cross-references the guarding GAP-marker tests and the fixes landed so far.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…ative-block) case

Builds on the no-crash fix (64cd878) to deliver actual partition pruning
when a DPP broadcast join / MERGE targets a partitioned Delta table and the
scan is buried inside a parent native block. Two root causes solved:

1. Orphaned rewrite (apache#3510). CometPlanAdaptiveDynamicPruningFilters rewrites
   the CometSubqueryAdaptiveBroadcastExec placeholder into an executable
   CometSubqueryBroadcastExec (with broadcast reuse), but the rewritten scan
   COPY was dropped when transformUp rebuilt the enclosing native block
   (TreeNode.makeCopy can't carry @transient fields -- verified the converted
   node was unreachable in the rule output). Fix: CometDeltaNativeScanExec
   implements `withDynamicPruningFilters` to install the rewrite IN PLACE via a
   transient side-channel (`dppFiltersOverride`) and return `this`, so it lands
   on the SAME instance that executes. The case-class `dppFilters` field is
   untouched, so node equality/canonicalization is unaffected. Added a minimal
   `dynamicPruningFilters`/`withDynamicPruningFilters` hook to the
   `CometScanWithPlanData` trait (default no-op) + a rule case; base scans
   (CometNativeScanExec/Iceberg) return Nil and are unaffected.

2. Fixed partition count vs runtime pruning. The native scan's partition count
   is pinned at planning. Fix: group ALL tasks once (`taskGroups =
   packTasks(allTasks)`) so the count is stable, then prune tasks WITHIN each
   group at execution -- a fully-pruned group becomes an empty DeltaScan
   (0 rows) but its partition slot remains. `perPartitionData` is recomputed
   (not memoized) so a parent block's findAllPlanData sees pruned task lists
   after the broadcast materializes; `numPartitions` reads `taskGroups.length`
   so counting never triggers broadcast resolution.

Residual safety net: if the rule didn't run, ensureSubqueriesResolved /
applyDppFilters skip the unexecutable placeholder and read all partitions
(correct, unpruned) instead of crashing.

CometDeltaDppReproSuite now asserts the fact scan reads ~120 of 2000 rows
(real pruning), not just correctness. Verified no regression across 56 contrib
tests (features/CDC/metadata/filter/column-mapping/native/partition/DPP).
Updated docs/08-known-limitations.md A1 -> FIXED.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Confirmed, faithful local reproductions for every still-failing Delta 4.1
own-suite family after the F1/F2/apache#198 fixes. Triage collapsed the ~70
original failures into three remaining root causes (the rest are F1/F2,
fixed):

* F3 row tracking (dominant): native scan synthesizes row IDs from
  base_row_id+row_index instead of reading the materialized stable
  columns (_row-id-col-*/_row-commit-version-col-*), so IDs change across
  any rewrite. Covers all rowid/RowTracking{Merge,Delete,Compaction,
  ReadWrite}Suite failures (z-order, auto-compact, MERGE row-id
  stability, DELETE-with-DV-disabled, optimized writes, materialized/
  conflicting columns). Repro: row IDs change across OPTIMIZE (v_0: 1->141).

* F4 protobuf recursion: a WHERE of ~101 AND'd predicates builds a deep
  boolean expression that exceeds protobuf's recursion limit (100) when
  serialized to the native plan -- InvalidProtocolBufferException "too many
  levels of nesting". From DataSkippingDeltaTests.

* F6 corrupted-file error compatibility (SC-8810): a 0-byte data file makes
  Comet's native reader throw "Requested range was invalid" instead of
  Spark's FAILED_READ_FILE.

CometDeltaPendingReproSuite holds one minimal repro per family, each
asserting the correct behavior and marked `ignore` so CI stays green (3
ignored, 0 failed); un-`ignore` to drive each fix. docs/08-known-limitations
updated with B3 triage + new B5 (F4) and B6 (F6) entries.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…rquet

Delta own-suite regression F3 (dominant remaining family -- all
rowid/RowTracking{Merge,Delete,Compaction,ReadWrite}Suite failures:
z-order, auto-compact, MERGE/DELETE row-id stability, optimized writes,
materialized columns).

Root cause: when a file is rewritten (OPTIMIZE/z-order/compaction/MERGE/
UPDATE) on a row-tracking table, Delta persists stable row IDs / commit
versions into real parquet columns `_row-id-col-<uuid>` /
`_row-commit-version-col-<uuid>`. The Spark plan reads
`coalesce(_metadata.row_id, base_row_id + row_index)`. The native scan
classified those names as synthetic and synthesised base_row_id+row_index
instead of reading the persisted values -- so after any rewrite the row IDs
changed (new file => new base+index) rather than staying stable.

Fix (CometDeltaNativeScan.convert): treat `_row-id-col-*` /
`_row-commit-version-col-*` as REAL parquet columns --
  * add them to the file data schema (materializedRowTrackingFields), read
    by name (null for files that don't carry them);
  * remove them from every synthetic classification (isSynthetic,
    isExtraSyntheticName, metadataColumnNamesEmitted, and the
    projection-vector isSyntheticFieldName).
The downstream coalesce then uses the persisted stable value when present
and falls back to base+index only when null. base_row_id / row_index /
default_row_commit_version stay synthesised; filter pushdown on the
materialized columns stays conservatively disabled.

Guard: CometDeltaRowTrackingMaterializedSuite (row IDs stable across
OPTIMIZE and UPDATE; materialised row_commit_version matches vanilla). F3
repro moved out of CometDeltaPendingReproSuite (F4/F6 remain). Verified no
regression across 55 contrib tests (features/CDC/metadata/column-mapping/
native/filter/scan-conf). Full RowTracking* Delta-suite verification pending
the next full regression re-run (per "don't re-run until fixes land").
docs/08-known-limitations B3 -> FIXED; B4 row-tracking families addressed.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…rsion limit

CORE Comet change (not contrib-only).

Delta own-suite regression F4 (DataSkippingDeltaTests "remove redundant
stats column references"): a WHERE with ~200 conjuncts builds a left-deep
And chain. CometFilter serializes it as a left-deep BinaryExpr proto; when
the serialized plan is re-parsed on the JVM
(CometNativeExec.findShuffleScanIndices -> Operator.parseFrom) it exceeds
protobuf's default 100-level recursion limit ("Protocol message had too
many levels of nesting"). The Rust prost decoder is subject to the same
limit.

Fix: balance associative And/Or chains at serialization time so the proto
is O(log n) deep instead of O(n).
  * QueryPlanSerde.createBalancedBinaryExpr + flattenAssociative (new
    helpers next to createBinaryExpr).
  * CometAnd / CometOr flatten the chain and emit a balanced BinaryExpr
    tree instead of the natural left-deep one.
Comet evaluates And/Or vectorially (both sides always evaluated, no
row-level short-circuit), so rebalancing the associative chain is
semantically identical -- it only changes the proto shape. Fixes both the
JVM and Rust parse paths (vs only raising a JVM parse limit). The contrib
already balanced the scan PREDICATE for the same reason; this extends it to
the Filter operator's condition.

Verified: F4 repro passes (now a guard in CometDeltaPendingReproSuite); no
regression in base CometExpressionSuite (123 tests) or contrib
filter/feature/partition suites (27 tests). docs/08-known-limitations B5 ->
FIXED. Removed the F4-DIAG instrumentation from CometDeltaNativeScan.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
CORE Comet change (not contrib-only).

Delta own-suite regression F6 (DeltaSuite "SC-8810: skipping deleted file
still throws on corrupted file"): a 0-byte/corrupted data file makes the
native reader fail in the object_store layer (before parquet parsing) with
"External: Generic LocalFileSystem error: Requested range was invalid".
CometExecIterator only wrapped messages starting with "Parquet error:" into
Spark's FAILED_READ_FILE.NO_HINT, so this object-store error surfaced as a
bare CometNativeException and the test's message assertion failed.

Fix: CometExecIterator.isFileReadError now also recognises object-store read
failures -- "Requested range was invalid" (truncated/empty file),
"Object at location ... not found", and the generic "Generic <Store> error:"
object_store format (LocalFileSystem/S3/GCS/...) -- and wraps them via
ShimSparkErrorConverter.wrapNativeParquetError, matching the
FAILED_READ_FILE.NO_HINT error Spark's own reader produces. Signatures are
file/object-store specific, so non-file native errors aren't mis-wrapped.

Also renamed CometDeltaPendingReproSuite -> CometDeltaEdgeCaseRegressionSuite
(all its repros -- F4, F6 -- are now fixed and serve as passing guards).

Verified: F6 + F4 repros pass; no regression across contrib native/feature/
edge-case suites (22 tests); base SparkErrorConverterSuite unaffected.
docs/08-known-limitations B6 -> FIXED.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
From the fresh-context branch review. Two blocking CORE findings + mediums/lows:

H1 (CORE, CometScanRule): the new FS-scheme allowlist ran on every V1
HadoopFsRelation native scan and excluded hdfs + custom libhdfs schemes,
disabling native scans for non-Delta HDFS users and breaking
ParquetReadFromFakeHadoopFsSuite (registers scheme `fake` via
COMET_LIBHDFS_SCHEMES). Fix: union the configured
`spark.hadoop.fs.comet.libhdfs.schemes` into the allowlist (case-insensitive).

H2 (contrib, core_glue.rs): data-filter pushdown was suppressed only for
emit_is_row_deleted, but row_index and row_id (= base_row_id + row_index,
unmaterialised) use the same physical-position running counter. A pushed
filter skips rows and decouples the counter -> wrong row_index/row_id
(repro: `id >= 50` gave row_id 0..49 for ids 50..99). Fix: suppress pushdown
when emit_is_row_deleted || emit_row_index || emit_row_id. Guarded by a new
test in CometDeltaRowTrackingMaterializedSuite (verified failing before).

M1 (CORE, CometExecIterator): the broad `Generic <Store> error:` match in
isFileReadError would mis-wrap the non-file config error "Generic
HadoopFileSystem error: Hdfs support is not enabled in this build" as
FAILED_READ_FILE, masking it. Dropped the broad match; the F6 case is still
covered by the specific "Requested range was invalid" / object-not-found
phrasings.

M2 (CORE, missing_file_tolerant.rs): IgnoreMissingFileSource.as_any returned
the wrapper, hiding ParquetSource from DataFusion downcasts (could disable
parquet-specific optimizations). Delegate to inner.as_any() (nothing
downcasts to the wrapper; source ops still flow through its trait methods).

M3: added a column-mapping id-mode + materialized row-tracking test (exercises
name-fallback for the no-field-id materialized columns).

L1: gate per-partition pruning on effectiveDppFilters (the rule's in-place
rewrite), not raw dppFilters. L2: mark dppFiltersOverride @volatile. L4:
materializeConstantColumnVector now handles TimestampNTZType (intervals
correctly still throw -- not valid constant/partition columns). L5: fix
CometScanWithPlanData scaladoc (Iceberg does NOT use the trait).

Verified: 34 contrib tests (row-tracking incl. H2/M3, edge-case F4/F6,
features/partition/filter) + base CometExpressionSuite (123) all pass.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…tForSubqueries path)

The full-regression re-run surfaced the MergeInto "...isPartitioned: true"
tests still crashing with "CometSubqueryAdaptiveBroadcastExec ... does not
support the execute() code path" -- even though F1/apache#198 fixed the SELECT-join
case.

Root cause: the optimizer rule's DPP rewrite is installed in-place via a
transient `dppFiltersOverride` var (NOT a constructor field), so it is LOST
whenever the plan is COPIED after the rule runs -- which MERGE does (it
re-plans the target read internally). The executing scan then reverts to the
placeholder-bearing `dppFilters`. F1 only guarded the fused-block resolution
path (`ensureSubqueriesResolved`), but a MERGE target read executes the scan
as a native-block ROOT via the STANDARD lifecycle
(`CometNativeColumnarToRowExec` -> child.executeColumnar() ->
SparkPlan.waitForSubqueries()), which executed the placeholder and crashed.

Fix: override `waitForSubqueries` on CometDeltaNativeScanExec too, sharing
`resolveExecutableDppSubqueries()` with `ensureSubqueriesResolved` -- resolve
only executable DPP subqueries, skip adaptive-broadcast placeholders. The
native scan has no non-DPP subqueries, so not delegating to super is safe.
Result: crash-safe in both resolution paths; pruning still applies when the
in-place rewrite survives (SELECT joins), and a MERGE/re-planned scan reads
all partitions (correct -- the MERGE join filters -- just unpruned).

Added a MERGE-into-partitioned guard to CometDeltaDppReproSuite; existing
DPP/row-tracking/feature repros still pass. docs/08-known-limitations A1
updated with the transient-override / waitForSubqueries detail. Verification
of the real MergeIntoSuite tests is via the in-progress full regression
re-run.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…bsent across concurrent file-groups)

Regression introduced by F3 (read materialized `_row-id-col-*` from parquet),
surfaced by the full Delta 4.1 run: RowTrackingMergeCommonNameBasedCDCOnSuite
"INSERT NOT MATCHED only MERGE" et al. failed -- a row-tracking table read with
`_metadata.row_id` after an INSERT/MERGE non-deterministically returned far fewer
rows than written (1600-4800 of 6000 across runs).

Root cause: the materialized `_row-id-col-<uuid>` column is physically present
only in files rewritten by a row-id-preserving op -- ABSENT from freshly
appended/inserted files (often absent from every file). F3 reads it as a parquet
data column. When one Spark partition packs several such files, core_glue emits
one file-group per file (needed for per-file row_index), and reading a column
physically absent from some files across the concurrently-executed file-groups
non-deterministically drops whole file-groups' rows. Forcing one file per Spark
partition reads the full row set correctly -- confirming cross-file-group
concurrency is the trigger, not the null-fill value.

Fix: CometDeltaNativeScan.createExec sets oneTaskPerPartition=true when the scan
reads materialized row-tracking columns, so each such file is its own Spark
partition => each native plan is single-file-group => the absent-column null-fill
runs without cross-file-group concurrency. Same mechanism already used for
input_file_name().

Guard: CometDeltaRowTrackingMergeReproSuite (INSERT-only MERGE; native key set ==
vanilla, full count). Verified against RowTrackingMergeCommonNameBasedCDCOnSuite
(17/17 pass). docs/08-known-limitations B7.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant