Add PuffinWriter for writing deletion vectors#3474
Conversation
Verify pyiceberg's PuffinFile reader can parse deletion vectors written by Spark. Uses coalesce(1) to force Spark to create DVs instead of COW.
PuffinFile reads only the serialized vector, skipping a blob's length prefix, deletion-vector magic and CRC-32, so the round-trip tests never exercise that framing. Add coverage for review items agreed on the original PR (apache#2822) that were not yet asserted by any test: - Assert the blob `fields` is [2147483645] (Java MetadataColumns.ROW_POSITION, INT_MAX - 2), required for Java/Spark interoperability (raised by @ebyhr). - Assert the deletion-vector blob framing at the byte level: the length prefix, the deletion-vector magic, and the CRC-32 over magic + vector.
| self._blobs = [] | ||
| self._blob_payloads = [] | ||
|
|
||
| # 1. Create bitmaps from positions |
There was a problem hiding this comment.
nit: I would avoid using number prefixes. When we want to add a new operation, we need to adjust the subsequent numbers.
| # Calculate the cardinality from the bitmaps | ||
| cardinality = sum(len(bm) for bm in bitmaps.values()) |
There was a problem hiding this comment.
nit: A comment for a simple single line seems excessive. It's evident when we read the code.
| @pytest.mark.integration | ||
| def test_read_spark_written_puffin_dv(spark: SparkSession, session_catalog: RestCatalog) -> None: | ||
| """Verify pyiceberg can read Puffin DVs written by Spark.""" | ||
| identifier = "default.spark_puffin_format_test" |
There was a problem hiding this comment.
This PR introduces support for write operations, so we're interested in verifying that Spark can read Puffin files written by PyIceberg. There are no requested changes for now. I suppose this PR is a preparatory change, and we'll need another PR to use it during the write operations.
There was a problem hiding this comment.
Exactly right, this PR is preparatory. PyIceberg does not yet have a write path that commits DVs as delete files, so a Spark-reads-PyIceberg interop test is not possible in isolation. As follow-ups, I plan to (1) extend PuffinWriter to support one blob per referenced data file and expose per-blob offset/length for content_offset/content_size_in_bytes, and (2) wire it into the merge-on-read branch of Transaction.delete() for v3 tables (toward #1078), where the Spark-reads-PyIceberg interop test will live.
| class PuffinWriter: | ||
| _blobs: list[PuffinBlobMetadata] | ||
| _blob_payloads: list[bytes] | ||
| _created_by: str | None |
There was a problem hiding this comment.
Could you please set the default value for the _created_by field using PyIceberg version {version}? You can obtain the version by using importlib.metadata.version.
There was a problem hiding this comment.
Good idea. Done in 4ecfd18, the default is now PyIceberg version {importlib.metadata.version("pyiceberg")}.
| @@ -0,0 +1,93 @@ | |||
| # Licensed to the Apache Software Foundation (ASF) under one | |||
There was a problem hiding this comment.
This test passes without the changes made in this PR. Could you please extract a PR that adding this test?
- Default created-by footer property to 'PyIceberg version {version}'
- Move the Spark interop reader test to a separate PR
- Remove numbered and self-evident comments
- Name the row position field id constant
- Validate positions in set_blob (non-negative, non-empty)
- Simplify blob framing and finish() assembly
|
|
||
|
|
||
| class PuffinWriter: | ||
| """Writes a Puffin file containing a single deletion-vector-v1 blob.""" |
There was a problem hiding this comment.
This comment looks misleading. This writer doesn't write a file in my understanding.
There was a problem hiding this comment.
You are right, it didn't. Addressed in eb81422 together with the suggestion below: PuffinWriter now accepts an OutputFile and finish() writes the file, so the docstring matches the behavior now.
| _blob_payloads: list[bytes] | ||
| _created_by: str | ||
|
|
||
| def __init__(self, created_by: str | None = None) -> None: |
There was a problem hiding this comment.
What about accepting an OutputFile or something, and writing the content to it? I think this is a better approach than returning bytes. Iceberg Java PuffinWriter also accepts an output file object.
There was a problem hiding this comment.
Good idea, done in eb81422. PuffinWriter now takes an OutputFile and finish() writes the content to it and returns the file size, following the Java PuffinWriter shape. One simplification compared to Java: the file is assembled in memory and written in one shot rather than streamed, which should be fine for DVs since they are small. Happy to revisit with a streaming implementation if needed.
Part of #2261. Continues #2822.
Rationale for this change
This adds a
PuffinWriterfor writing Puffin files containingdeletion-vector-v1blobs — the first building block for deletion-vector write support in PyIceberg (tracking issue #2261).It revives #2822 by @rambleraptor (with @glesperance's Spark interop test), which was auto-closed by the stale bot rather than on merit. The original work — including all review feedback already addressed there (@ebyhr, @geruh) — is preserved commit-for-commit.
On top of that, this PR adds unit tests for two agreed review items that were not yet asserted by any test:
fieldsvalue[2147483645](JavaMetadataColumns.ROW_POSITION, INT_MAX - 2), required for Java/Spark interoperability; andPuffinFilereader skips, so the round-trip tests did not previously exercise it.As in the original PR, this is intentionally scoped to the writer + tests so we can agree on the write semantics before wiring it into the delete/manifest writers and the merge-on-read path. Per the original review discussion, the writer expects the caller to provide one merged deletion vector per data file.
Are these changes tested?
Yes:
tests/table/test_puffin.py).tests/integration/test_puffin_spark_interop.py, by @glesperance).Are there any user-facing changes?
No.
PuffinWriteris a new internal building block and is not yet wired into any public write path.