diff --git a/parquet-column/src/main/java/org/apache/parquet/internal/filter2/columnindex/RowRanges.java b/parquet-column/src/main/java/org/apache/parquet/internal/filter2/columnindex/RowRanges.java index 0b2257a6bc..4ea630fd88 100644 --- a/parquet-column/src/main/java/org/apache/parquet/internal/filter2/columnindex/RowRanges.java +++ b/parquet-column/src/main/java/org/apache/parquet/internal/filter2/columnindex/RowRanges.java @@ -118,6 +118,22 @@ public static RowRanges createSingle(long rowCount) { return new RowRanges(new Range(0L, rowCount - 1L)); } + /** + * Creates an immutable {@link RowRanges} with the single closed range {@code [from, to]}. + * Used by the Approach 2 (micro-row-group) reader path to express a logical micro-row-group's + * absolute row range against a shared physical column chunk. + * + * @param from inclusive first row index (must be non-negative) + * @param to inclusive last row index (must be {@code >= from}) + * @return an immutable {@link RowRanges} representing {@code [from, to]} + */ + public static RowRanges createBetween(long from, long to) { + if (from < 0 || to < from) { + throw new IllegalArgumentException("Invalid row range [" + from + ", " + to + ']'); + } + return new RowRanges(new Range(from, to)); + } + /** * Creates a mutable RowRanges object with the following ranges: *
diff --git a/parquet-column/src/test/java/org/apache/parquet/internal/filter2/columnindex/TestRowRanges.java b/parquet-column/src/test/java/org/apache/parquet/internal/filter2/columnindex/TestRowRanges.java
index 9c6b9f737c..41ec4edc56 100644
--- a/parquet-column/src/test/java/org/apache/parquet/internal/filter2/columnindex/TestRowRanges.java
+++ b/parquet-column/src/test/java/org/apache/parquet/internal/filter2/columnindex/TestRowRanges.java
@@ -152,4 +152,38 @@ public void testIntersection() {
assertAllRowsEqual(intersection(empty, ranges2).iterator());
assertAllRowsEqual(intersection(empty, empty).iterator());
}
+
+ @Test
+ public void testCreateBetween() {
+ // Single-element range
+ RowRanges single = RowRanges.createBetween(42L, 42L);
+ assertEquals(1L, single.rowCount());
+ assertAllRowsEqual(single.iterator(), 42L);
+
+ // Multi-element range starting at zero (matches createSingle semantics)
+ RowRanges fromZero = RowRanges.createBetween(0L, 4L);
+ assertEquals(5L, fromZero.rowCount());
+ assertAllRowsEqual(fromZero.iterator(), 0L, 1L, 2L, 3L, 4L);
+ assertEquals(
+ RowRanges.createSingle(5L).getRanges().toString(),
+ fromZero.getRanges().toString());
+
+ // Multi-element range with non-zero (file-absolute) start, the Approach 2 use case
+ RowRanges absolute = RowRanges.createBetween(100_000L, 100_004L);
+ assertEquals(5L, absolute.rowCount());
+ assertAllRowsEqual(absolute.iterator(), 100_000L, 100_001L, 100_002L, 100_003L, 100_004L);
+ assertTrue(absolute.isOverlapping(100_002L, 100_003L));
+ assertFalse(absolute.isOverlapping(99_000L, 99_999L));
+ assertFalse(absolute.isOverlapping(100_005L, 100_010L));
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testCreateBetweenRejectsNegativeFrom() {
+ RowRanges.createBetween(-1L, 0L);
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testCreateBetweenRejectsInvertedRange() {
+ RowRanges.createBetween(10L, 5L);
+ }
}
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
index e0b0d76e0e..c3bba9a989 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
@@ -1140,7 +1140,14 @@ public PageReadStore readRowGroup(int blockIndex) throws IOException {
public PageReadStore readNextRowGroup() throws IOException {
ColumnChunkPageReadStore rowGroup = null;
try {
- rowGroup = internalReadRowGroup(currentBlock);
+ // Approach 2 (micro-row-group) dispatch: if this block's column chunks are physically
+ // shared with other blocks, use a path that locates pages via OffsetIndex and slices
+ // rows via an absolute RowRanges.
+ if (currentBlock < blocks.size() && blocks.get(currentBlock).isApproach2()) {
+ rowGroup = internalReadApproach2RowGroup(currentBlock);
+ } else {
+ rowGroup = internalReadRowGroup(currentBlock);
+ }
} catch (ParquetEmptyBlockException e) {
LOG.warn("Read empty block at index {} from {}", currentBlock, getFile());
advanceToNextBlock();
@@ -1199,6 +1206,209 @@ private ColumnChunkPageReadStore internalReadRowGroup(int blockIndex) throws IOE
return rowGroup;
}
+ /**
+ * Reads a row group whose column chunks are physically shared with other blocks via the
+ * Approach 2 (micro-row-group) format extension. See
+ * {@link org.apache.parquet.hadoop.metadata.BlockMetaData#isApproach2()}.
+ *
+ * For each column the page list is taken straight from the block's OffsetIndex sidecar
+ * — the writer is responsible for emitting an OffsetIndex that contains exactly the
+ * pages whose absolute row range overlaps this block's {@code [rowIndexOffset,
+ * rowIndexOffset + rowCount)} window, with {@code PageLocation.first_row_index} as a
+ * file-absolute row index. The dictionary page (if any) is located via
+ * {@link org.apache.parquet.hadoop.metadata.ColumnChunkMetaData#getDictionaryPageOffset()}
+ * (which the spec leaves valid even when {@code data_page_offset == -1}), and its body is
+ * sized by reading its page header in-band.
+ *
+ *
The returned {@link ColumnChunkPageReadStore} carries a {@link RowRanges} covering
+ * {@code [rowIndexOffset, rowIndexOffset + rowCount - 1]} in absolute coordinates, so the
+ * downstream {@link org.apache.parquet.column.impl.SynchronizingColumnReader} discards
+ * rows that fall outside this block's window when a physical page spans into the next
+ * block.
+ *
+ *
Prototype-grade limitations:
+ *
An Approach 2 file represents the row groups in its footer as logical + * micro-row-groups that share a common physical column chunk per column. Multiple + * {@link org.apache.parquet.hadoop.metadata.BlockMetaData} entries point at overlapping + * page ranges in their {@link OffsetIndex} sidecars, and a single physical {@link DataPage} + * may straddle the boundary between adjacent micro-row-groups. + * + *
This class is the in-memory model of that physical column chunk after IO + page-header + * parse. It holds: + *
Sharing across micro-row-groups: each visiting {@code readNextRowGroup()} call hands + * the caller a fresh {@link org.apache.parquet.hadoop.ColumnChunkPageReadStore.ColumnChunkPageReader} + * built from a slice of this source's page list (only the pages that overlap the + * visiting block's row range). The {@code DataPage} objects themselves are shared by + * reference; their {@link org.apache.parquet.bytes.BytesInput} payloads are re-readable + * because they wrap underlying {@code ByteBuffer}s rather than consuming a single stream. + * + *
The compressed dictionary page is shared by reference too. Each micro-row-group's + * column reader re-decompresses and re-decodes it independently — that is the + * known prototype-grade inefficiency called out in the Approach 2 plan; a production + * implementation would memoize the decoded {@link org.apache.parquet.column.Dictionary}. + * + *
Instances are constructed at file open by {@link ParquetFileReader} when an
+ * Approach 2 file is detected (any block returns {@code true} from
+ * {@link org.apache.parquet.hadoop.metadata.BlockMetaData#isApproach2()}).
+ */
+final class PhysicalChunkPageSource {
+
+ /**
+ * Column chunk metadata from the first physical row group in the group. Used by the
+ * caller to resolve codec, encoding stats, and (when present) the compressed
+ * dictionary page's location during IO planning. Note that
+ * {@link ColumnChunkMetaData#isPhysicallyShared()} returns {@code true} on this
+ * instance; callers must not use {@link ColumnChunkMetaData#getStartingPos()} or
+ * {@link ColumnChunkMetaData#getTotalSize()} for IO planning.
+ */
+ private final ColumnChunkMetaData metadata;
+
+ /**
+ * The full ordered list of compressed data pages for this physical column chunk.
+ * Each {@link DataPage} carries a re-readable {@link org.apache.parquet.bytes.BytesInput}.
+ */
+ private final List A page is included if any row index in its {@code [firstRowIndex, lastRowIndex]}
+ * span falls inside {@code [fromAbsoluteRow, toAbsoluteRow]}. The OffsetIndex must be
+ * non-null and its entries must be sorted by {@code firstRowIndex}.
+ *
+ * @param fromAbsoluteRow inclusive lower bound, file-absolute
+ * @param toAbsoluteRow inclusive upper bound, file-absolute
+ * @return the filtered page slice, possibly empty but never {@code null}
+ */
+ PageSlice sliceForRowRange(long fromAbsoluteRow, long toAbsoluteRow) {
+ if (toAbsoluteRow < fromAbsoluteRow) {
+ return new PageSlice(List.of(), new SlicedOffsetIndex(absoluteOffsetIndex, new int[0]));
+ }
+ if (absoluteOffsetIndex == null) {
+ throw new IllegalStateException(
+ "Approach 2 column chunk has no OffsetIndex; cannot slice pages by row range");
+ }
+ final int n = absoluteOffsetIndex.getPageCount();
+ final long upperRowSentinel = toAbsoluteRow + 1L;
+ final List This probe must not trigger decryption: Approach 2 interacts with the
+ * encrypted-metadata path in ways that are out of scope for this prototype, so
+ * encrypted column chunks always return {@code false}. This also keeps the cheap
+ * dispatch in {@link org.apache.parquet.hadoop.ParquetFileReader#readNextRowGroup()}
+ * from throwing on plaintext-footer-encrypted-columns files when no decryptor is
+ * configured.
+ */
+ public boolean isPhysicallyShared() {
+ if (isEncrypted()) {
+ return false;
+ }
+ return getFirstDataPageOffset() == SENTINEL_OFFSET;
+ }
+
/**
* checks that a positive long value fits in an int.
* (reindexed on Integer.MIN_VALUE)
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/metadata/TestApproach2BlockMetaData.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/metadata/TestApproach2BlockMetaData.java
new file mode 100644
index 0000000000..a3b3b2d2fb
--- /dev/null
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/metadata/TestApproach2BlockMetaData.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop.metadata;
+
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Collections;
+import java.util.HashSet;
+import org.apache.parquet.column.statistics.BinaryStatistics;
+import org.junit.Test;
+
+/**
+ * Tests for {@link BlockMetaData#isApproach2()} sentinel detection. Approach 2 (micro-row-group)
+ * blocks are detected by checking that every {@link ColumnChunkMetaData} in the block reports
+ * {@link ColumnChunkMetaData#isPhysicallyShared()} as {@code true}.
+ */
+public class TestApproach2BlockMetaData {
+
+ @Test
+ public void testEmptyBlockIsNotApproach2() {
+ BlockMetaData block = new BlockMetaData();
+ assertFalse(block.isApproach2());
+ }
+
+ @Test
+ public void testAllSentinelBlockIsApproach2() {
+ BlockMetaData block = new BlockMetaData();
+ block.addColumn(buildColumn("a", ColumnChunkMetaData.SENTINEL_OFFSET));
+ block.addColumn(buildColumn("b", ColumnChunkMetaData.SENTINEL_OFFSET));
+ assertTrue(block.isApproach2());
+ }
+
+ @Test
+ public void testMixedSentinelBlockIsNotApproach2() {
+ BlockMetaData block = new BlockMetaData();
+ block.addColumn(buildColumn("a", ColumnChunkMetaData.SENTINEL_OFFSET));
+ block.addColumn(buildColumn("b", 100L));
+ assertFalse(block.isApproach2());
+ }
+
+ @Test
+ public void testLegacyBlockIsNotApproach2() {
+ BlockMetaData block = new BlockMetaData();
+ block.addColumn(buildColumn("a", 100L));
+ block.addColumn(buildColumn("b", 200L));
+ assertFalse(block.isApproach2());
+ }
+
+ private static ColumnChunkMetaData buildColumn(String name, long firstDataPage) {
+ return ColumnChunkMetaData.get(
+ ColumnPath.get(name),
+ BINARY,
+ CompressionCodecName.UNCOMPRESSED,
+ new HashSet<>(Collections.emptySet()),
+ new BinaryStatistics(),
+ firstDataPage,
+ 0L,
+ 0L,
+ 0L,
+ 0L);
+ }
+}
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/metadata/TestColumnChunkMetaData.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/metadata/TestColumnChunkMetaData.java
index 99da0fa7fb..9850736f92 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/metadata/TestColumnChunkMetaData.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/metadata/TestColumnChunkMetaData.java
@@ -20,6 +20,7 @@
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.util.HashSet;
@@ -67,6 +68,23 @@ public void testConversionNeg() {
assertEquals(neg, md.getFirstDataPageOffset());
}
+ @Test
+ public void testSentinelIsPhysicallyShared() {
+ // Approach 2 (micro-row-group) sentinel: firstDataPage == -1 marks the column as
+ // physically shared with other blocks.
+ ColumnChunkMetaData md = newMD(ColumnChunkMetaData.SENTINEL_OFFSET);
+ assertTrue(md.isPhysicallyShared());
+ assertEquals(ColumnChunkMetaData.SENTINEL_OFFSET, md.getStartingPos());
+ }
+
+ @Test
+ public void testLegacyIsNotPhysicallyShared() {
+ // A normal column chunk must not look like the Approach 2 sentinel.
+ ColumnChunkMetaData md = newMD(100L);
+ assertFalse(md.isPhysicallyShared());
+ assertEquals(100L, md.getStartingPos());
+ }
+
private ColumnChunkMetaData newMD(long big) {
Set