[Spark] Capture and emit aggregated data-file (Parquet footer) metrics at commit time #16675

@gtrettenero

Proposed Change

Proposal: Capture and emit aggregated data-file (Parquet footer) metrics at commit time

Feature Request / Improvement

Provide an opt-in mechanism to capture aggregated physical/storage statistics directly from Parquet footers during a write, and surface them through Iceberg's existing event framework at commit time — without persisting them in table metadata.

Motivation

Iceberg infers column-level metrics (value_counts, null_value_counts, nan_value_counts, lower_bounds, upper_bounds) and stores them per-DataFile in manifests. Because these metrics live in manifest entries, collecting them for every column of a wide table bloats manifests and slows scan planning. To bound this, Iceberg caps inferred metrics at the first N columns:

write.metadata.metrics.max-inferred-column-defaults = 100 (default)

This cap is the right trade-off for query-planning metadata, but it creates a gap for observability / storage-health use cases:

  1. Wide tables (>100 columns) get no inferred metrics for columns beyond the cap unless each is configured explicitly. There is no aggregate visibility into the storage footprint of those columns.
  2. Physical/storage statistics aren't captured at all in metadata — e.g. per-column compressed vs. uncompressed size, encoding/codec usage, dictionary/bloom-filter presence, and file-size distribution (percentiles, small-file counts). Today the only way to get these is to re-open and re-read files after the fact, which is expensive and runs on stale data.
  3. Raising the metrics cap to get more coverage directly worsens metadata bloat and planning latency — the exact problem the cap exists to prevent.

The key insight: metrics needed for query planning and metrics needed for operational observability have different lifetimes and storage requirements. Observability metrics don't need to live in manifests forever; they're most valuable at write time, aggregated, and emitted to a monitoring system. Decoupling the two lets us collect rich, uncapped, aggregate statistics cheaply without touching the metadata hot path.

Proposed Solution

Add an opt-in write-path feature that, during a Spark write:

  1. Reads Parquet footers for the data files produced by each task (footers are already on the writer; this avoids a separate read pass).
  2. Aggregates statistics map-side, grouped by partition and by column field-id, so per-file payloads never accumulate on the driver.
  3. Bridges the aggregated result into the commit and publishes it as a new event through org.apache.iceberg.events.Listeners, so any deployment can register a listener and route the metrics to its own monitoring/observability system.

Because the metrics flow through the event framework rather than into TableMetadata/manifests, there is no metadata bloat and no column cap, and the commit path's persisted state is unchanged.

High-level implementation details

Executor-side capture (Spark write path)

  • A new opt-in table property gates the behavior; when disabled (the default), there is zero overhead.
  • After each task's DataWriter.commit(), the Parquet footers of the produced DataFiles are read via FileIO/InputFile (with a small adapter from Iceberg's InputFile and SeekableInputStream to Parquet's InputFile and SeekableInputStream).
  • For each footer, statistics are extracted per column chunk and mapped back to Iceberg field-ids (via the Parquet schema's field ids, so the data is schema-evolution safe), then folded into a partition-keyed aggregate.
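
The fold described above can be sketched roughly as follows. This is a minimal illustration, not the proposed implementation: the class names `ColumnAgg` and `PartitionColumnStats`, the string partition key, and the `addChunk` signature are all hypothetical, and the per-chunk numbers are assumed to have already been extracted from the footer and mapped to an Iceberg field id.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical running totals for one column (one Iceberg field id) within one partition.
final class ColumnAgg {
  long compressedBytes;
  long uncompressedBytes;
  long valueCount;
  long nullCount;
  String codec; // null until the first chunk arrives; "mixed" once two codecs differ

  void add(long compressed, long uncompressed, long values, long nulls, String chunkCodec) {
    compressedBytes += compressed;
    uncompressedBytes += uncompressed;
    valueCount += values;
    nullCount += nulls;
    if (codec == null) {
      codec = chunkCodec;
    } else if (!codec.equals(chunkCodec)) {
      codec = "mixed";
    }
  }
}

// Hypothetical partition-keyed aggregate: partition -> (field id -> totals).
final class PartitionColumnStats {
  private final Map<String, Map<Integer, ColumnAgg>> byPartition = new HashMap<>();

  // Folds one column chunk into the aggregate; no per-file object is kept.
  void addChunk(String partition, int fieldId, long compressed, long uncompressed,
                long values, long nulls, String codec) {
    byPartition
        .computeIfAbsent(partition, p -> new HashMap<>())
        .computeIfAbsent(fieldId, id -> new ColumnAgg())
        .add(compressed, uncompressed, values, nulls, codec);
  }

  // Returns the totals for a column in a partition, or null if nothing was recorded.
  ColumnAgg get(String partition, int fieldId) {
    Map<Integer, ColumnAgg> columns = byPartition.get(partition);
    return columns == null ? null : columns.get(fieldId);
  }
}
```

Keying by field id rather than column name is what keeps the totals stable across renames; the memory cost is bounded by partitions times columns, not by the number of files.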

Map-side aggregation via a Spark AccumulatorV2

  • Aggregates are accumulated into a custom accumulator and merged on the driver. To avoid driver heap pressure on writes producing very large numbers of partitions, the accumulator:
    • merges executor-local aggregates directly (no per-file objects retained), and
    • returns an empty snapshot from value() so Spark does not retain O(numTasks) deep copies in TaskInfo accumulables; the real data is consumed once, at commit, via an explicit drain.
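
To make the "empty snapshot plus explicit drain" idea concrete, here is a small stand-in written against the standard library only. It mirrors the shape of the `add`, `merge`, and `value` methods of Spark's AccumulatorV2 but does not extend it; the class name `DrainableBytesAccumulator`, the `drain()` method, and the choice of bytes-per-partition as the payload are assumptions for illustration.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a custom accumulator; a real one would extend
// Spark's AccumulatorV2 and also implement copy() and reset().
// Not thread-safe: this sketch assumes merges and the drain do not overlap.
final class DrainableBytesAccumulator {
  private Map<String, Long> bytesByPartition = new HashMap<>();

  // Executor side: fold one observation into the local aggregate.
  void add(String partition, long bytes) {
    bytesByPartition.merge(partition, bytes, Long::sum);
  }

  // Driver side: merge another accumulator's aggregate, entry by entry.
  void merge(DrainableBytesAccumulator other) {
    for (Map.Entry<String, Long> entry : other.bytesByPartition.entrySet()) {
      bytesByPartition.merge(entry.getKey(), entry.getValue(), Long::sum);
    }
  }

  boolean isZero() {
    return bytesByPartition.isEmpty();
  }

  // Deliberately empty, so anything that snapshots value() retains no data.
  Map<String, Long> value() {
    return Collections.emptyMap();
  }

  // Hands over the real aggregate exactly once and resets the accumulator.
  Map<String, Long> drain() {
    Map<String, Long> result = bytesByPartition;
    bytesByPartition = new HashMap<>();
    return result;
  }
}
```

The trade-off is that `value()` no longer reports anything useful to generic tooling, so every consumer has to go through `drain()`.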

Commit-time emission

  • Just before commitOperation(), the merged aggregate is finalized (file-size percentiles, small-file counts) and handed to the commit via a thread-local bridge, then published as an event during the synchronous commit notification.
  • The bridge is cleared on commit success, on CommitStateUnknownException, and on any other failure, so nothing leaks across commits.
  • The core event type stays free of Parquet/Spark types (the payload is carried in a serialized, engine-agnostic form), keeping the engine boundary clean.
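
A rough sketch of the bridge lifecycle, under several assumptions: `MetricsBridge` and `CommitWithMetrics` are hypothetical names, the payload is modeled as a `String` standing in for the serialized form, the `Runnable` stands in for `commitOperation()`, and the `Consumer` stands in for a registered listener. In this simplified version the payload is published right after the commit call returns; the point is only that `finally` clears the thread-local on every exit path.

```java
import java.util.function.Consumer;

// Hypothetical thread-local hand-off between the write path and the commit.
final class MetricsBridge {
  private static final ThreadLocal<String> PENDING = new ThreadLocal<>();

  private MetricsBridge() {}

  static void set(String serializedMetrics) {
    PENDING.set(serializedMetrics);
  }

  // Returns the pending payload (or null) and clears it in the same step.
  static String take() {
    String payload = PENDING.get();
    PENDING.remove();
    return payload;
  }

  static void clear() {
    PENDING.remove();
  }

  static boolean isEmpty() {
    return PENDING.get() == null;
  }
}

final class CommitWithMetrics {
  private CommitWithMetrics() {}

  // commit: stand-in for the real commit call; listener: stand-in for an event listener.
  static void run(String serializedMetrics, Runnable commit, Consumer<String> listener) {
    MetricsBridge.set(serializedMetrics);
    try {
      commit.run();
      String payload = MetricsBridge.take();
      if (payload != null) {
        listener.accept(payload);
      }
    } finally {
      // Runs on success and on any exception, so nothing leaks into the next commit.
      MetricsBridge.clear();
    }
  }
}
```

Because the same thread may run many commits, clearing in `finally` matters more than the hand-off itself: a stale payload would otherwise be attributed to an unrelated commit.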

Metrics captured (aggregated per partition + per column)

  • Row counts and total record counts.
  • File counts, total file size, and a file-size distribution: average, p50/p75/p90/p95/p99, fixed size buckets, and a count of files below a configurable small-file threshold.
  • Per-column (by field-id): compressed size, uncompressed size, value count, null count, codec (or "mixed"), and dictionary-page / bloom-filter presence.
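
For the file-size distribution, one possible finalization step looks like the sketch below. It assumes the nearest-rank percentile definition, treats a file as "small" when its size is strictly below the threshold, and sorts a full array of sizes for clarity; none of these choices come from the proposal, and an implementation that avoids retaining per-file values would more likely derive percentiles from buckets or a sketch. `FileSizeSummary` is a hypothetical name.

```java
import java.util.Arrays;

// Hypothetical finalized view of the file sizes written by one commit (or one partition).
final class FileSizeSummary {
  final long fileCount;
  final long totalBytes;
  final long smallFileCount;
  private final long[] sorted;

  FileSizeSummary(long[] sizes, long smallFileThresholdBytes) {
    sorted = sizes.clone();
    Arrays.sort(sorted);
    long total = 0;
    long small = 0;
    for (long size : sorted) {
      total += size;
      if (size < smallFileThresholdBytes) { // assumption: strictly below the threshold
        small++;
      }
    }
    fileCount = sorted.length;
    totalBytes = total;
    smallFileCount = small;
  }

  double average() {
    return fileCount == 0 ? 0.0 : (double) totalBytes / fileCount;
  }

  // Nearest-rank percentile: the value at rank ceil(p / 100 * n), with ranks starting at 1.
  long percentile(double p) {
    if (sorted.length == 0) {
      throw new IllegalStateException("no files recorded");
    }
    int rank = (int) Math.ceil(p / 100.0 * sorted.length);
    return sorted[Math.max(rank, 1) - 1];
  }
}
```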

Configuration (new table properties)

  • write.data-file-metrics.enabled (default: false)
  • write.data-file-metrics.small-file-threshold-bytes (default: 128 KB)
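
As an illustration of how a writer might resolve these two properties from a table's property map, consider the sketch below. The property names and defaults are taken from the list above; the `DataFileMetricsConfig` class is hypothetical, and reading "128 KB" as 128 * 1024 bytes is an assumption.

```java
import java.util.Map;

// Hypothetical resolved view of the proposed table properties.
final class DataFileMetricsConfig {
  static final String ENABLED = "write.data-file-metrics.enabled";
  static final String SMALL_FILE_THRESHOLD_BYTES =
      "write.data-file-metrics.small-file-threshold-bytes";
  // Assumption: "128 KB" is interpreted as 128 * 1024 bytes.
  static final long DEFAULT_SMALL_FILE_THRESHOLD_BYTES = 128L * 1024L;

  final boolean enabled;
  final long smallFileThresholdBytes;

  private DataFileMetricsConfig(boolean enabled, long smallFileThresholdBytes) {
    this.enabled = enabled;
    this.smallFileThresholdBytes = smallFileThresholdBytes;
  }

  // Missing keys fall back to the defaults, so the feature stays off unless requested.
  static DataFileMetricsConfig from(Map<String, String> tableProperties) {
    boolean enabled = Boolean.parseBoolean(tableProperties.getOrDefault(ENABLED, "false"));
    String threshold = tableProperties.get(SMALL_FILE_THRESHOLD_BYTES);
    long thresholdBytes =
        threshold == null ? DEFAULT_SMALL_FILE_THRESHOLD_BYTES : Long.parseLong(threshold);
    return new DataFileMetricsConfig(enabled, thresholdBytes);
  }
}
```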

Why the event framework instead of metadata

  • No manifest/TableMetadata changes → no planning-path impact and no spec change.
  • No column cap → aggregate coverage for arbitrarily wide tables.
  • Metrics are available in real time at commit, computed from footers already produced by the write.
  • Fully opt-in and pluggable: deployments that don't register a listener pay nothing.

Scope / non-goals

  • Initial implementation targets the Spark write path (SparkWrite); Flink and others could follow the same pattern.
  • Parquet only initially (footer-based); ORC/Avro could be added later.
  • This does not replace Iceberg's existing inferred metrics used for pruning — it's a complementary, observability-focused channel.

Open questions for discussion

  1. Is the events framework the right surface, or should this be a dedicated metrics/reporting interface (similar in spirit to MetricsReporter for scans, but for writes)?
  2. Naming/placement of the new property and event type.
  3. Should the aggregation granularity (per-partition, per-column) and the file-size bucket boundaries be configurable?
  4. Appetite for extending beyond Parquet, and beyond Spark, in follow-ups.

Proposal document

No response

Specifications

  • Table
  • View
  • REST
  • Puffin
  • Encryption
  • Other

Labels: proposal (Iceberg Improvement Proposal (spec/major changes/etc))