diff --git a/parquet-column/src/main/java/org/apache/parquet/internal/filter2/columnindex/RowRanges.java b/parquet-column/src/main/java/org/apache/parquet/internal/filter2/columnindex/RowRanges.java
index 0b2257a6bc..4ea630fd88 100644
--- a/parquet-column/src/main/java/org/apache/parquet/internal/filter2/columnindex/RowRanges.java
+++ b/parquet-column/src/main/java/org/apache/parquet/internal/filter2/columnindex/RowRanges.java
@@ -118,6 +118,22 @@ public static RowRanges createSingle(long rowCount) {
     return new RowRanges(new Range(0L, rowCount - 1L));
   }
 
+  /**
+   * Creates an immutable {@link RowRanges} with the single closed (inclusive) range {@code [from, to]},
+   * used by the Approach 2 (micro-row-group) reader path for a block's file-absolute row window.
+   *
+   * @param from inclusive first row index (must be non-negative)
+   * @param to inclusive last row index (must be {@code >= from})
+   * @return an immutable {@link RowRanges} representing {@code [from, to]}
+   * @throws IllegalArgumentException if {@code from < 0} or {@code to < from}
+   */
+  public static RowRanges createBetween(long from, long to) {
+    if (from < 0 || to < from) {
+      throw new IllegalArgumentException("Invalid row range [" + from + ", " + to + ']');
+    }
+    return new RowRanges(new Range(from, to));
+  }
+
   /**
    * Creates a mutable RowRanges object with the following ranges:
    * <pre>
diff --git a/parquet-column/src/test/java/org/apache/parquet/internal/filter2/columnindex/TestRowRanges.java b/parquet-column/src/test/java/org/apache/parquet/internal/filter2/columnindex/TestRowRanges.java
index 9c6b9f737c..41ec4edc56 100644
--- a/parquet-column/src/test/java/org/apache/parquet/internal/filter2/columnindex/TestRowRanges.java
+++ b/parquet-column/src/test/java/org/apache/parquet/internal/filter2/columnindex/TestRowRanges.java
@@ -152,4 +152,38 @@ public void testIntersection() {
     assertAllRowsEqual(intersection(empty, ranges2).iterator());
     assertAllRowsEqual(intersection(empty, empty).iterator());
   }
+
+  @Test
+  public void testCreateBetween() {
+    // Single-element range: from == to is valid and, with inclusive bounds, covers exactly one row
+    RowRanges single = RowRanges.createBetween(42L, 42L);
+    assertEquals(1L, single.rowCount());
+    assertAllRowsEqual(single.iterator(), 42L);
+
+    // Multi-element range starting at zero; createSingle(5) also builds Range(0, 4), so both must print alike
+    RowRanges fromZero = RowRanges.createBetween(0L, 4L);
+    assertEquals(5L, fromZero.rowCount());
+    assertAllRowsEqual(fromZero.iterator(), 0L, 1L, 2L, 3L, 4L);
+    assertEquals(
+        RowRanges.createSingle(5L).getRanges().toString(),
+        fromZero.getRanges().toString());
+
+    // Multi-element range with a non-zero (file-absolute) start: the Approach 2 use case
+    RowRanges absolute = RowRanges.createBetween(100_000L, 100_004L);
+    assertEquals(5L, absolute.rowCount());
+    assertAllRowsEqual(absolute.iterator(), 100_000L, 100_001L, 100_002L, 100_003L, 100_004L);
+    assertTrue(absolute.isOverlapping(100_002L, 100_003L)); // window strictly inside the range
+    assertFalse(absolute.isOverlapping(99_000L, 99_999L)); // window ends one row before 100_000
+    assertFalse(absolute.isOverlapping(100_005L, 100_010L)); // window starts one row after 100_004
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testCreateBetweenRejectsNegativeFrom() {
+    RowRanges.createBetween(-1L, 0L); // from < 0 trips the guard even though to >= from holds
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testCreateBetweenRejectsInvertedRange() {
+    RowRanges.createBetween(10L, 5L); // to < from trips the guard even though from is non-negative
+  }
 }
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
index e0b0d76e0e..c3bba9a989 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
@@ -1140,7 +1140,14 @@ public PageReadStore readRowGroup(int blockIndex) throws IOException {
   public PageReadStore readNextRowGroup() throws IOException {
     ColumnChunkPageReadStore rowGroup = null;
     try {
-      rowGroup = internalReadRowGroup(currentBlock);
+      // Approach 2 (micro-row-group) dispatch: if this block's column chunks are physically
+      // shared with other blocks, use a path that locates pages via OffsetIndex and slices
+      // rows via an absolute RowRanges.
+      if (currentBlock < blocks.size() && blocks.get(currentBlock).isApproach2()) {
+        rowGroup = internalReadApproach2RowGroup(currentBlock);
+      } else {
+        rowGroup = internalReadRowGroup(currentBlock);
+      }
     } catch (ParquetEmptyBlockException e) {
       LOG.warn("Read empty block at index {} from {}", currentBlock, getFile());
       advanceToNextBlock();
@@ -1199,6 +1206,209 @@ private ColumnChunkPageReadStore internalReadRowGroup(int blockIndex) throws IOE
     return rowGroup;
   }
 
+  /**
+   * Reads a row group whose column chunks are physically shared with other blocks via the
+   * Approach 2 (micro-row-group) format extension. See
+   * {@link org.apache.parquet.hadoop.metadata.BlockMetaData#isApproach2()}.
+   *
+   * 

For each column the page list is taken straight from the block's OffsetIndex sidecar + * — the writer is responsible for emitting an OffsetIndex that contains exactly the + * pages whose absolute row range overlaps this block's {@code [rowIndexOffset, + * rowIndexOffset + rowCount)} window, with {@code PageLocation.first_row_index} as a + * file-absolute row index. The dictionary page (if any) is located via + * {@link org.apache.parquet.hadoop.metadata.ColumnChunkMetaData#getDictionaryPageOffset()} + * (which the spec leaves valid even when {@code data_page_offset == -1}), and its body is + * sized by reading its page header in-band. + * + *

The returned {@link ColumnChunkPageReadStore} carries a {@link RowRanges} covering + * {@code [rowIndexOffset, rowIndexOffset + rowCount - 1]} in absolute coordinates, so the + * downstream {@link org.apache.parquet.column.impl.SynchronizingColumnReader} discards + * rows that fall outside this block's window when a physical page spans into the next + * block. + * + *

Prototype-grade limitations: + *

+ * These are acceptable for a correctness-first prototype and are called out in the + * accompanying design plan. + * + * @param blockIndex the index of the Approach 2 block to read + * @return a {@link ColumnChunkPageReadStore} whose pages cover the block's absolute row + * range, or {@code null} if {@code blockIndex} is out of range + */ + private ColumnChunkPageReadStore internalReadApproach2RowGroup(int blockIndex) throws IOException { + if (blockIndex < 0 || blockIndex >= blocks.size()) { + return null; + } + BlockMetaData block = blocks.get(blockIndex); + if (block.getRowCount() == 0) { + throw new ParquetEmptyBlockException("Illegal row group of 0 rows"); + } + if (block.getRowIndexOffset() < 0) { + throw new IOException( + "Approach 2 block must declare an absolute rowIndexOffset; block " + blockIndex + " has none"); + } + + final long absFrom = block.getRowIndexOffset(); + final long absTo = absFrom + block.getRowCount() - 1L; + final RowRanges rowRanges = RowRanges.createBetween(absFrom, absTo); + final ColumnChunkPageReadStore rowGroup = new ColumnChunkPageReadStore(rowRanges, absFrom); + + // Use the absolute upper bound (absTo + 1) as the ChunkListBuilder's rowCount: the + // resulting Chunk passes it to ColumnChunkPageReader which uses it as the upper bound + // for OffsetIndex#getLastRowIndex on the final page. Under Approach 2 the OffsetIndex + // stores absolute first_row_index values, so the upper bound must also be absolute. 
+ final long absoluteUpperBound = absTo + 1L; + final ChunkListBuilder builder = new ChunkListBuilder(absoluteUpperBound); + final List allParts = new ArrayList<>(); + final Map dictPagesByDescriptor = new HashMap<>(); + + final ColumnIndexStore ciStore = getColumnIndexStore(blockIndex); + for (ColumnChunkMetaData mc : block.getColumns()) { + ColumnPath pathKey = mc.getPath(); + ColumnDescriptor columnDescriptor = paths.get(pathKey); + if (columnDescriptor == null) { + continue; + } + if (!mc.isPhysicallyShared()) { + throw new IOException("Approach 2 block " + blockIndex + " mixes physically-shared and legacy columns; " + + "mixed-mode blocks are not supported by this prototype"); + } + + OffsetIndex offsetIndex = ciStore.getOffsetIndex(pathKey); + if (offsetIndex == null) { + throw new IOException("Approach 2 column missing OffsetIndex: " + pathKey); + } + + // Build per-page byte ranges. Each page gets its own ChunkDescriptor; consecutive + // pages collapse into a single ConsecutivePartList for vectored IO. + ConsecutivePartList currentParts = null; + for (int i = 0, n = offsetIndex.getPageCount(); i < n; i++) { + long off = offsetIndex.getOffset(i); + int len = offsetIndex.getCompressedPageSize(i); + BenchmarkCounter.incrementTotalBytes(len); + if (currentParts == null || currentParts.endPos() != off) { + currentParts = new ConsecutivePartList(off); + allParts.add(currentParts); + } + ChunkDescriptor cd = new ChunkDescriptor(columnDescriptor, mc, off, len); + currentParts.addChunk(cd); + builder.setOffsetIndex(cd, offsetIndex); + } + + // Dictionary page is located via getDictionaryPageOffset() (still valid under the + // sentinel per spec); read its bytes out-of-band because its compressed size is not + // in the OffsetIndex. 
+ long dictOffset = mc.getDictionaryPageOffset(); + if (dictOffset > 0) { + DictionaryPage dictPage = readDictionaryPageDirect(dictOffset); + if (dictPage != null) { + // Stash by a synthetic descriptor that won't clash with data-page descriptors + // (descriptors equal by ColumnDescriptor; we encode a unique fileOffset/size + // pair to keep the map well-formed). + dictPagesByDescriptor.put(new ChunkDescriptor(columnDescriptor, mc, -dictOffset, 0), dictPage); + } + } + } + + if (!allParts.isEmpty()) { + readAllPartsVectoredOrNormal(allParts, builder); + } + rowGroup.setReleaser(builder.releaser); + + // Each Chunk now wraps one column's data-page bytes (concatenated in OffsetIndex order) + // plus its OffsetIndex; readAllPages parses the bytes and returns a ColumnChunkPageReader. + // For Approach 2 we splice in the separately-read dictionary page if present. + for (Chunk chunk : builder.build()) { + ColumnChunkPageReader dataPagesReader = chunk.readAllPages(); + DictionaryPage dictPage = null; + for (Map.Entry e : dictPagesByDescriptor.entrySet()) { + if (e.getKey().col.equals(chunk.descriptor.col)) { + dictPage = e.getValue(); + break; + } + } + if (dictPage == null) { + rowGroup.addColumn(chunk.descriptor.col, dataPagesReader); + } else { + rowGroup.addColumn( + chunk.descriptor.col, + new ColumnChunkPageReader( + options.getCodecFactory().getDecompressor(chunk.descriptor.metadata.getCodec()), + drainDataPagesQueue(dataPagesReader), + dictPage, + chunk.offsetIndex, + chunk.rowCount, + null /* blockDecryptor */, + null /* fileAAD */, + block.getOrdinal(), + 0 /* columnOrdinal — not used in non-encrypted prototype */, + options)); + } + } + + return rowGroup; + } + + /** + * Reads a single dictionary page directly from {@code f} at {@code dictOffset}. Used by + * the Approach 2 reader path because the compressed dictionary page size is not in the + * OffsetIndex; we discover it by parsing the page header in-band. 
+ * + * @param dictOffset absolute file offset of the dictionary page header + * @return the compressed {@link DictionaryPage}, or {@code null} if the page header + * indicates this is not a dictionary page (shouldn't happen for a well-formed file) + */ + private DictionaryPage readDictionaryPageDirect(long dictOffset) throws IOException { + f.seek(dictOffset); + PageHeader header = Util.readPageHeader(f); + if (header.type != org.apache.parquet.format.PageType.DICTIONARY_PAGE) { + LOG.warn("Expected DICTIONARY_PAGE at offset {} but got {}", dictOffset, header.type); + return null; + } + int compressedSize = header.getCompressed_page_size(); + int uncompressedSize = header.getUncompressed_page_size(); + DictionaryPageHeader dicHeader = header.getDictionary_page_header(); + ByteBuffer buf = options.getAllocator().allocate(compressedSize); + try { + f.readFully(buf); + buf.flip(); + DictionaryPage page = new DictionaryPage( + org.apache.parquet.bytes.BytesInput.from(buf), + uncompressedSize, + dicHeader.getNum_values(), + converter.getEncoding(dicHeader.getEncoding())); + if (header.isSetCrc()) { + page.setCrc(header.getCrc()); + } + return page; + } catch (RuntimeException | IOException e) { + options.getAllocator().release(buf); + throw e; + } + } + + /** + * Drains all data pages out of a {@link ColumnChunkPageReader} returned by + * {@code Chunk.readAllPages()} so we can rebuild it with a different dictionary page. + * The returned list preserves page order. The {@code dataPagesReader} should not be + * used after this call. + */ + private static List drainDataPagesQueue(ColumnChunkPageReader dataPagesReader) { + List drained = new ArrayList<>(); + DataPage p; + while ((p = dataPagesReader.readPage()) != null) { + drained.add(p); + } + return drained; + } + /** * Reads all the columns requested from the specified row group. It may skip specific pages based on the column * indexes according to the actual filter. 
As the rows are not aligned among the pages of the different columns row diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/PhysicalChunkPageSource.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/PhysicalChunkPageSource.java new file mode 100644 index 0000000000..86a5551c61 --- /dev/null +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/PhysicalChunkPageSource.java @@ -0,0 +1,246 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.parquet.hadoop; + +import java.util.ArrayList; +import java.util.List; +import org.apache.parquet.column.page.DataPage; +import org.apache.parquet.column.page.DictionaryPage; +import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData; +import org.apache.parquet.internal.column.columnindex.OffsetIndex; + +/** + * Cached, per-physical-column-chunk page state used by the Approach 2 (micro-row-group) + * reader path. + * + *

An Approach 2 file represents the row groups in its footer as logical + * micro-row-groups that share a common physical column chunk per column. Multiple + * {@link org.apache.parquet.hadoop.metadata.BlockMetaData} entries point at overlapping + * page ranges in their {@link OffsetIndex} sidecars, and a single physical {@link DataPage} + * may straddle the boundary between adjacent micro-row-groups. + * + *

This class is the in-memory model of that physical column chunk after IO + page-header + * parse. It holds: + *

    + *
  • The compressed {@link DictionaryPage} (or {@code null} if the column has none).
  • + *
  • The full, ordered list of compressed {@link DataPage}s for the chunk.
  • + *
  • The OffsetIndex that describes those pages with file-absolute + * {@code first_row_index} values.
  • + *
  • The {@link ColumnChunkMetaData} from the first physical row group in the group, + * used for codec lookup and other shared properties.
  • + *
+ * + *

Sharing across micro-row-groups: each visiting {@code readNextRowGroup()} call hands + * the caller a fresh {@link org.apache.parquet.hadoop.ColumnChunkPageReadStore.ColumnChunkPageReader} + * built from a slice of this source's page list (only the pages that overlap the + * visiting block's row range). The {@code DataPage} objects themselves are shared by + * reference; their {@link org.apache.parquet.bytes.BytesInput} payloads are re-readable + * because they wrap underlying {@code ByteBuffer}s rather than consuming a single stream. + * + *

The compressed dictionary page is shared by reference too. Each micro-row-group's + * column reader re-decompresses and re-decodes it independently — that is the + * known prototype-grade inefficiency called out in the Approach 2 plan; a production + * implementation would memoize the decoded {@link org.apache.parquet.column.Dictionary}. + * + *

Instances are constructed at file open by {@link ParquetFileReader} when an + * Approach 2 file is detected (any block returns {@code true} from + * {@link org.apache.parquet.hadoop.metadata.BlockMetaData#isApproach2()}). + */ +final class PhysicalChunkPageSource { + + /** + * Column chunk metadata from the first physical row group in the group. Used by the + * caller to resolve codec, encoding stats, and (when present) the compressed + * dictionary page's location during IO planning. Note that + * {@link ColumnChunkMetaData#isPhysicallyShared()} returns {@code true} on this + * instance; callers must not use {@link ColumnChunkMetaData#getStartingPos()} or + * {@link ColumnChunkMetaData#getTotalSize()} for IO planning. + */ + private final ColumnChunkMetaData metadata; + + /** + * The full ordered list of compressed data pages for this physical column chunk. + * Each {@link DataPage} carries a re-readable {@link org.apache.parquet.bytes.BytesInput}. + */ + private final List compressedPages; + + /** + * The compressed dictionary page, or {@code null} if this column chunk has none. + */ + private final DictionaryPage compressedDictionaryPage; + + /** + * OffsetIndex covering every page in {@link #compressedPages}. {@code firstRowIndex(i)} + * values are file-absolute under Approach 2. 
+ */ + private final OffsetIndex absoluteOffsetIndex; + + PhysicalChunkPageSource( + ColumnChunkMetaData metadata, + List compressedPages, + DictionaryPage compressedDictionaryPage, + OffsetIndex absoluteOffsetIndex) { + this.metadata = metadata; + this.compressedPages = List.copyOf(compressedPages); + this.compressedDictionaryPage = compressedDictionaryPage; + this.absoluteOffsetIndex = absoluteOffsetIndex; + } + + ColumnChunkMetaData getMetadata() { + return metadata; + } + + DictionaryPage getCompressedDictionaryPage() { + return compressedDictionaryPage; + } + + OffsetIndex getAbsoluteOffsetIndex() { + return absoluteOffsetIndex; + } + + int getPageCount() { + return compressedPages.size(); + } + + /** + * A slice of {@link PhysicalChunkPageSource} pages restricted to those that overlap an + * absolute row range. The {@link OffsetIndex} returned by {@link #getOffsetIndex()} is + * indexed 0..{@code pages.size()-1} and matches {@link #getPages()} entry-for-entry, so + * a {@code ColumnChunkPageReader} built from this slice can index either by ordinal + * without confusion. + */ + static final class PageSlice { + private final List pages; + private final OffsetIndex offsetIndex; + + private PageSlice(List pages, OffsetIndex offsetIndex) { + this.pages = pages; + this.offsetIndex = offsetIndex; + } + + List getPages() { + return pages; + } + + OffsetIndex getOffsetIndex() { + return offsetIndex; + } + } + + /** + * Returns the slice of pages whose absolute row span intersects the closed range + * {@code [fromAbsoluteRow, toAbsoluteRow]}. The returned slice's {@link OffsetIndex} + * is a 0-based view into the surviving subset, so callers can pass it to + * {@code ColumnChunkPageReader} without further translation. + * + *

A page is included if any row index in its {@code [firstRowIndex, lastRowIndex]} + * span falls inside {@code [fromAbsoluteRow, toAbsoluteRow]}. The OffsetIndex must be + * non-null and its entries must be sorted by {@code firstRowIndex}. + * + * @param fromAbsoluteRow inclusive lower bound, file-absolute + * @param toAbsoluteRow inclusive upper bound, file-absolute + * @return the filtered page slice, possibly empty but never {@code null} + */ + PageSlice sliceForRowRange(long fromAbsoluteRow, long toAbsoluteRow) { + if (toAbsoluteRow < fromAbsoluteRow) { + return new PageSlice(List.of(), new SlicedOffsetIndex(absoluteOffsetIndex, new int[0])); + } + if (absoluteOffsetIndex == null) { + throw new IllegalStateException( + "Approach 2 column chunk has no OffsetIndex; cannot slice pages by row range"); + } + final int n = absoluteOffsetIndex.getPageCount(); + final long upperRowSentinel = toAbsoluteRow + 1L; + final List resultPages = new ArrayList<>(); + final List keptIndexes = new ArrayList<>(); + for (int i = 0; i < n; i++) { + final long pageFirst = absoluteOffsetIndex.getFirstRowIndex(i); + // Last row of page i is firstRow(i+1) - 1 when there is a next page; for the final + // page the source has no easy upper bound, so we treat it as open-ended and trust + // the SynchronizingColumnReader to stop at toAbsoluteRow. + final long pageLast = (i + 1 < n) ? absoluteOffsetIndex.getFirstRowIndex(i + 1) - 1L : Long.MAX_VALUE; + if (pageLast < fromAbsoluteRow) { + continue; + } + if (pageFirst >= upperRowSentinel) { + break; + } + resultPages.add(compressedPages.get(i)); + keptIndexes.add(i); + } + int[] indexMap = new int[keptIndexes.size()]; + for (int k = 0; k < indexMap.length; k++) { + indexMap[k] = keptIndexes.get(k); + } + return new PageSlice(resultPages, new SlicedOffsetIndex(absoluteOffsetIndex, indexMap)); + } + + /** + * A read-only view onto another {@link OffsetIndex} that exposes only the entries at + * positions listed in {@code indexMap}. 
Used to align the sliced page list with a fresh + * {@code ColumnChunkPageReader}, which addresses its OffsetIndex by sequential page + * ordinal. + */ + private static final class SlicedOffsetIndex implements OffsetIndex { + private final OffsetIndex source; + private final int[] indexMap; + + SlicedOffsetIndex(OffsetIndex source, int[] indexMap) { + this.source = source; + this.indexMap = indexMap; + } + + @Override + public int getPageCount() { + return indexMap.length; + } + + @Override + public long getOffset(int pageIndex) { + return source.getOffset(indexMap[pageIndex]); + } + + @Override + public int getCompressedPageSize(int pageIndex) { + return source.getCompressedPageSize(indexMap[pageIndex]); + } + + @Override + public long getFirstRowIndex(int pageIndex) { + return source.getFirstRowIndex(indexMap[pageIndex]); + } + + @Override + public long getLastRowIndex(int pageIndex, long totalRowCount) { + int srcIdx = indexMap[pageIndex]; + int nextIdx = srcIdx + 1; + return (nextIdx >= source.getPageCount() ? 
totalRowCount : source.getFirstRowIndex(nextIdx)) - 1L; + } + + @Override + public int getPageOrdinal(int pageIndex) { + return source.getPageOrdinal(indexMap[pageIndex]); + } + + @Override + public java.util.Optional getUnencodedByteArrayDataBytes(int pageIndex) { + return source.getUnencodedByteArrayDataBytes(indexMap[pageIndex]); + } + } +} diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/BlockMetaData.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/BlockMetaData.java index e1fe9b894c..5f3b5a801a 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/BlockMetaData.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/BlockMetaData.java @@ -107,12 +107,34 @@ public List getColumns() { } /** - * @return the starting pos of first column + * @return the starting pos of first column, or {@link ColumnChunkMetaData#SENTINEL_OFFSET} + * if this block is part of an Approach 2 (micro-row-group) physical group and + * its columns are physically shared with other blocks. */ public long getStartingPos() { return getColumns().get(0).getStartingPos(); } + /** + * @return {@code true} if every column in this block is physically shared with other + * blocks via the Approach 2 (micro-row-group) format extension. When this + * returns {@code true}, the block's pages must be located via per-column + * {@code OffsetIndex} sidecars and a single physical IO covers multiple blocks + * of the same physical group. Returns {@code false} for legacy (contiguous) + * column chunks. Empty blocks return {@code false}. 
+ */ + public boolean isApproach2() { + if (columns.isEmpty()) { + return false; + } + for (ColumnChunkMetaData col : columns) { + if (!col.isPhysicallyShared()) { + return false; + } + } + return true; + } + @Override public String toString() { String rowIndexOffsetStr = ""; diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/ColumnChunkMetaData.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/ColumnChunkMetaData.java index 4ba52dec2c..85d9ed9a0a 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/ColumnChunkMetaData.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/ColumnChunkMetaData.java @@ -48,6 +48,17 @@ * Column meta data for a block stored in the file footer and passed in the InputSplit */ public abstract class ColumnChunkMetaData { + /** + * Sentinel value used for {@link #getFirstDataPageOffset()} and the on-disk + * {@code data_page_offset} / {@code ColumnChunk.file_offset} fields to mark + * a column chunk as physically shared with other logical row groups + * (Approach 2 of the micro-row-group format extension). When set, the + * column chunk's pages are owned by a physical group that spans multiple + * {@link BlockMetaData} entries, and page locations must be looked up via + * the {@code OffsetIndex} sidecar rather than derived from this metadata. + */ + public static final long SENTINEL_OFFSET = -1L; + protected int rowGroupOrdinal = -1; @Deprecated @@ -308,12 +319,18 @@ public int getRowGroupOrdinal() { } /** - * @return the offset of the first byte in the chunk + * @return the offset of the first byte in the chunk, or {@link #SENTINEL_OFFSET} if this + * column chunk is physically shared (see {@link #isPhysicallyShared()}). Callers + * that need an actual byte offset must consult the column's {@code OffsetIndex} + * in that case. 
*/ public long getStartingPos() { decryptIfNeeded(); long dictionaryPageOffset = getDictionaryPageOffset(); long firstDataPageOffset = getFirstDataPageOffset(); + if (firstDataPageOffset == SENTINEL_OFFSET) { + return SENTINEL_OFFSET; + } if (dictionaryPageOffset > 0 && dictionaryPageOffset < firstDataPageOffset) { // if there's a dictionary and it's before the first data page, start from there return dictionaryPageOffset; @@ -321,6 +338,27 @@ public long getStartingPos() { return firstDataPageOffset; } + /** + * @return {@code true} if this column chunk's pages are not contiguous in the file + * and must be located via the {@code OffsetIndex} sidecar (Approach 2 of the + * micro-row-group format extension). When {@code true}, callers must not use + * {@link #getStartingPos()} or {@link #getTotalSize()} to plan a single IO over + * the column chunk. + * + *

This probe must not trigger decryption: Approach 2 interacts with the + * encrypted-metadata path in ways that are out of scope for this prototype, so + * encrypted column chunks always return {@code false}. This also keeps the cheap + * dispatch in {@link org.apache.parquet.hadoop.ParquetFileReader#readNextRowGroup()} + * from throwing on plaintext-footer-encrypted-columns files when no decryptor is + * configured. + */ + public boolean isPhysicallyShared() { + if (isEncrypted()) { + return false; + } + return getFirstDataPageOffset() == SENTINEL_OFFSET; + } + /** * checks that a positive long value fits in an int. * (reindexed on Integer.MIN_VALUE) diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/metadata/TestApproach2BlockMetaData.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/metadata/TestApproach2BlockMetaData.java new file mode 100644 index 0000000000..a3b3b2d2fb --- /dev/null +++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/metadata/TestApproach2BlockMetaData.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.parquet.hadoop.metadata; + +import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.util.Collections; +import java.util.HashSet; +import org.apache.parquet.column.statistics.BinaryStatistics; +import org.junit.Test; + +/** + * Tests for {@link BlockMetaData#isApproach2()} sentinel detection. Approach 2 (micro-row-group) + * blocks are detected by checking that every {@link ColumnChunkMetaData} in the block reports + * {@link ColumnChunkMetaData#isPhysicallyShared()} as {@code true}. + */ +public class TestApproach2BlockMetaData { + + @Test + public void testEmptyBlockIsNotApproach2() { + BlockMetaData block = new BlockMetaData(); + assertFalse(block.isApproach2()); + } + + @Test + public void testAllSentinelBlockIsApproach2() { + BlockMetaData block = new BlockMetaData(); + block.addColumn(buildColumn("a", ColumnChunkMetaData.SENTINEL_OFFSET)); + block.addColumn(buildColumn("b", ColumnChunkMetaData.SENTINEL_OFFSET)); + assertTrue(block.isApproach2()); + } + + @Test + public void testMixedSentinelBlockIsNotApproach2() { + BlockMetaData block = new BlockMetaData(); + block.addColumn(buildColumn("a", ColumnChunkMetaData.SENTINEL_OFFSET)); + block.addColumn(buildColumn("b", 100L)); + assertFalse(block.isApproach2()); + } + + @Test + public void testLegacyBlockIsNotApproach2() { + BlockMetaData block = new BlockMetaData(); + block.addColumn(buildColumn("a", 100L)); + block.addColumn(buildColumn("b", 200L)); + assertFalse(block.isApproach2()); + } + + private static ColumnChunkMetaData buildColumn(String name, long firstDataPage) { + return ColumnChunkMetaData.get( + ColumnPath.get(name), + BINARY, + CompressionCodecName.UNCOMPRESSED, + new HashSet<>(Collections.emptySet()), + new BinaryStatistics(), + firstDataPage, + 0L, + 0L, + 0L, + 0L); + } +} diff --git 
a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/metadata/TestColumnChunkMetaData.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/metadata/TestColumnChunkMetaData.java index 99da0fa7fb..9850736f92 100644 --- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/metadata/TestColumnChunkMetaData.java +++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/metadata/TestColumnChunkMetaData.java @@ -20,6 +20,7 @@ import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import java.util.HashSet; @@ -67,6 +68,23 @@ public void testConversionNeg() { assertEquals(neg, md.getFirstDataPageOffset()); } + @Test + public void testSentinelIsPhysicallyShared() { + // Approach 2 (micro-row-group) sentinel: firstDataPage == -1 marks the column as + // physically shared with other blocks. + ColumnChunkMetaData md = newMD(ColumnChunkMetaData.SENTINEL_OFFSET); + assertTrue(md.isPhysicallyShared()); + assertEquals(ColumnChunkMetaData.SENTINEL_OFFSET, md.getStartingPos()); + } + + @Test + public void testLegacyIsNotPhysicallyShared() { + // A normal column chunk must not look like the Approach 2 sentinel. + ColumnChunkMetaData md = newMD(100L); + assertFalse(md.isPhysicallyShared()); + assertEquals(100L, md.getStartingPos()); + } + private ColumnChunkMetaData newMD(long big) { Set e = new HashSet(); PrimitiveTypeName t = BINARY;