From 5f2e4ba5de614eef0d20b80ee638b619e4771fa2 Mon Sep 17 00:00:00 2001 From: "xuan.zhao" Date: Thu, 23 Apr 2026 18:16:33 +0800 Subject: [PATCH 1/2] feat(puffin): add puffin file reader and writer - PuffinWriter: writes complete Puffin files to an OutputFile stream - Write() writes blobs with optional compression - Finish() serializes footer with JSON metadata, then flushes and closes the stream - Tracks BlobMetadata for all written blobs - PuffinReader: reads Puffin files from a seekable InputFile stream - ReadFileMetadata() parses footer and validates magic bytes - ReadBlob() reads and decompresses individual blobs - ReadAll() reads the given blobs in offset order - Expose Compress/Decompress as public API in puffin_format.h - Register new sources in CMake and Meson build systems - Add comprehensive tests including Java binary compatibility --- src/iceberg/CMakeLists.txt | 4 +- src/iceberg/meson.build | 2 + src/iceberg/puffin/meson.build | 8 +- src/iceberg/puffin/puffin_format.cc | 24 +- src/iceberg/puffin/puffin_format.h | 10 + src/iceberg/puffin/puffin_reader.cc | 202 ++++++++++ src/iceberg/puffin/puffin_reader.h | 85 ++++ src/iceberg/puffin/puffin_writer.cc | 176 ++++++++ src/iceberg/puffin/puffin_writer.h | 106 +++++ src/iceberg/test/CMakeLists.txt | 3 +- src/iceberg/test/meson.build | 6 +- src/iceberg/test/puffin_reader_writer_test.cc | 379 ++++++++++++++++++ 12 files changed, 989 insertions(+), 16 deletions(-) create mode 100644 src/iceberg/puffin/puffin_reader.cc create mode 100644 src/iceberg/puffin/puffin_reader.h create mode 100644 src/iceberg/puffin/puffin_writer.cc create mode 100644 src/iceberg/puffin/puffin_writer.h create mode 100644 src/iceberg/test/puffin_reader_writer_test.cc diff --git a/src/iceberg/CMakeLists.txt b/src/iceberg/CMakeLists.txt index 7be689f8d..18cf70bdb 100644 --- a/src/iceberg/CMakeLists.txt +++ b/src/iceberg/CMakeLists.txt @@ -175,7 +175,9 @@ set(ICEBERG_DATA_SOURCES deletes/roaring_position_bitmap.cc puffin/file_metadata.cc puffin/json_serde.cc - puffin/puffin_format.cc) + 
puffin/puffin_format.cc + puffin/puffin_reader.cc + puffin/puffin_writer.cc) set(ICEBERG_DATA_STATIC_BUILD_INTERFACE_LIBS) set(ICEBERG_DATA_SHARED_BUILD_INTERFACE_LIBS) diff --git a/src/iceberg/meson.build b/src/iceberg/meson.build index 48b5d4250..483d8f796 100644 --- a/src/iceberg/meson.build +++ b/src/iceberg/meson.build @@ -157,6 +157,8 @@ iceberg_data_sources = files( 'puffin/file_metadata.cc', 'puffin/json_serde.cc', 'puffin/puffin_format.cc', + 'puffin/puffin_reader.cc', + 'puffin/puffin_writer.cc', ) # CRoaring does not export symbols, so on Windows it must diff --git a/src/iceberg/puffin/meson.build b/src/iceberg/puffin/meson.build index 7869d7b2c..7f30468db 100644 --- a/src/iceberg/puffin/meson.build +++ b/src/iceberg/puffin/meson.build @@ -16,6 +16,12 @@ # under the License. install_headers( - ['file_metadata.h', 'puffin_format.h', 'type_fwd.h'], + [ + 'file_metadata.h', + 'puffin_format.h', + 'puffin_reader.h', + 'puffin_writer.h', + 'type_fwd.h', + ], subdir: 'iceberg/puffin', ) diff --git a/src/iceberg/puffin/puffin_format.cc b/src/iceberg/puffin/puffin_format.cc index 88807d0ca..88d378f04 100644 --- a/src/iceberg/puffin/puffin_format.cc +++ b/src/iceberg/puffin/puffin_format.cc @@ -36,6 +36,18 @@ constexpr std::pair GetFlagPosition(PuffinFlag flag) { std::unreachable(); } +} // namespace + +bool IsFlagSet(std::span flags, PuffinFlag flag) { + auto [byte_num, bit_num] = GetFlagPosition(flag); + return (flags[byte_num] & (1 << bit_num)) != 0; +} + +void SetFlag(std::span flags, PuffinFlag flag) { + auto [byte_num, bit_num] = GetFlagPosition(flag); + flags[byte_num] |= (1 << bit_num); +} + // TODO(zhaoxuan1994): Move compression logic to a unified codec interface. 
Result> Compress(PuffinCompressionCodec codec, std::span input) { @@ -63,16 +75,4 @@ Result> Decompress(PuffinCompressionCodec codec, std::unreachable(); } -} // namespace - -bool IsFlagSet(std::span flags, PuffinFlag flag) { - auto [byte_num, bit_num] = GetFlagPosition(flag); - return (flags[byte_num] & (1 << bit_num)) != 0; -} - -void SetFlag(std::span flags, PuffinFlag flag) { - auto [byte_num, bit_num] = GetFlagPosition(flag); - flags[byte_num] |= (1 << bit_num); -} - } // namespace iceberg::puffin diff --git a/src/iceberg/puffin/puffin_format.h b/src/iceberg/puffin/puffin_format.h index e5ecf9003..b3b5f10de 100644 --- a/src/iceberg/puffin/puffin_format.h +++ b/src/iceberg/puffin/puffin_format.h @@ -23,8 +23,10 @@ /// Puffin file format constants and utilities. #include +#include #include #include +#include #include "iceberg/iceberg_data_export.h" #include "iceberg/puffin/file_metadata.h" @@ -66,4 +68,12 @@ ICEBERG_DATA_EXPORT bool IsFlagSet(std::span flags, PuffinFlag /// \brief Set a flag in the flags bytes. ICEBERG_DATA_EXPORT void SetFlag(std::span flags, PuffinFlag flag); +/// \brief Compress data using the specified codec. +ICEBERG_DATA_EXPORT Result> Compress( + PuffinCompressionCodec codec, std::span input); + +/// \brief Decompress data using the specified codec. +ICEBERG_DATA_EXPORT Result> Decompress( + PuffinCompressionCodec codec, std::span input); + } // namespace iceberg::puffin diff --git a/src/iceberg/puffin/puffin_reader.cc b/src/iceberg/puffin/puffin_reader.cc new file mode 100644 index 000000000..6d6ec0ccb --- /dev/null +++ b/src/iceberg/puffin/puffin_reader.cc @@ -0,0 +1,202 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "iceberg/puffin/puffin_reader.h" + +#include +#include +#include +#include +#include + +#include "iceberg/file_io.h" +#include "iceberg/puffin/json_serde_internal.h" +#include "iceberg/puffin/puffin_format.h" +#include "iceberg/util/endian.h" +#include "iceberg/util/macros.h" + +namespace iceberg::puffin { + +namespace { + +// Validate magic bytes in a buffer at offset 0. +Status CheckMagic(std::span data) { + if (static_cast(data.size()) < PuffinFormat::kMagicLength) { + return Invalid("Invalid file: buffer too small for magic"); + } + auto* begin = reinterpret_cast(data.data()); + if (!std::equal(PuffinFormat::kMagicV1.begin(), PuffinFormat::kMagicV1.end(), begin)) { + return Invalid( + "Invalid file: expected magic, got [{:#04x}, {:#04x}, {:#04x}, {:#04x}]", + begin[0], begin[1], begin[2], begin[3]); + } + return {}; +} + +// Validate that no unknown flag bits are set. 
+Status CheckUnknownFlags(std::span flags) { + constexpr uint8_t kKnownBitsMask = 0x01; + if ((flags[0] & ~kKnownBitsMask) != 0 || flags[1] != 0 || flags[2] != 0 || + flags[3] != 0) { + return Invalid( + "Invalid file: unknown footer flags set [{:#04x}, {:#04x}, {:#04x}, {:#04x}]", + flags[0], flags[1], flags[2], flags[3]); + } + return {}; +} + +} // namespace + +PuffinReader::PuffinReader(std::unique_ptr stream, int64_t file_size, + std::optional known_footer_size) + : stream_(std::move(stream)), + file_size_(file_size), + known_footer_size_(known_footer_size) {} + +PuffinReader::~PuffinReader() = default; + +Result> PuffinReader::Make( + std::unique_ptr input_file, std::optional footer_size) { + if (!input_file) { + return InvalidArgument("input_file must not be null"); + } + ICEBERG_ASSIGN_OR_RAISE(auto file_size, input_file->Size()); + ICEBERG_ASSIGN_OR_RAISE(auto stream, input_file->Open()); + return std::unique_ptr( + new PuffinReader(std::move(stream), file_size, footer_size)); +} + +Result> PuffinReader::ReadBytes(int64_t offset, int64_t length) { + if (offset < 0 || length < 0 || offset > file_size_ || length > file_size_ - offset) { + return Invalid("Read out of bounds: offset {} + length {} exceeds file size {}", + offset, length, file_size_); + } + std::vector buf(length); + ICEBERG_RETURN_UNEXPECTED(stream_->ReadFully(offset, buf)); + return buf; +} + +Result PuffinReader::ReadFileMetadata() { + if (file_size_ < PuffinFormat::kFooterStructLength) { + return Invalid("Invalid file: file length {} is less than minimal footer size {}", + file_size_, PuffinFormat::kFooterStructLength); + } + + // Validate header magic + ICEBERG_ASSIGN_OR_RAISE(auto header_bytes, ReadBytes(0, PuffinFormat::kMagicLength)); + ICEBERG_RETURN_UNEXPECTED(CheckMagic(header_bytes)); + + // Read footer struct from end of file + auto footer_struct_offset = file_size_ - PuffinFormat::kFooterStructLength; + ICEBERG_ASSIGN_OR_RAISE( + auto footer_struct, + 
ReadBytes(footer_struct_offset, PuffinFormat::kFooterStructLength)); + + // Validate footer end magic + std::span footer_end_magic( + footer_struct.data() + PuffinFormat::kFooterStructMagicOffset, + PuffinFormat::kMagicLength); + ICEBERG_RETURN_UNEXPECTED(CheckMagic(footer_end_magic)); + + // Read payload size + auto payload_size = ReadLittleEndian( + footer_struct.data() + PuffinFormat::kFooterStructPayloadSizeOffset); + + if (payload_size < 0) { + return Invalid("Invalid file: negative payload size {}", payload_size); + } + + // Calculate total footer size and validate + int64_t footer_size = PuffinFormat::kFooterStartMagicLength + + static_cast(payload_size) + + PuffinFormat::kFooterStructLength; + auto footer_offset = file_size_ - footer_size; + if (footer_offset < 0) { + return Invalid("Invalid file: footer size {} exceeds file size {}", footer_size, + file_size_); + } + + // Validate footer start magic + ICEBERG_ASSIGN_OR_RAISE(auto footer_start_magic, + ReadBytes(footer_offset, PuffinFormat::kMagicLength)); + ICEBERG_RETURN_UNEXPECTED(CheckMagic(footer_start_magic)); + + // Check flags + std::array flags{}; + std::memcpy(flags.data(), footer_struct.data() + PuffinFormat::kFooterStructFlagsOffset, + 4); + ICEBERG_RETURN_UNEXPECTED(CheckUnknownFlags(flags)); + + PuffinCompressionCodec footer_compression = PuffinCompressionCodec::kNone; + if (IsFlagSet(flags, PuffinFlag::kFooterPayloadCompressed)) { + footer_compression = PuffinFormat::kDefaultFooterCompressionCodec; + } + + // Read and decompress footer payload + auto payload_offset = footer_offset + PuffinFormat::kFooterStartMagicLength; + ICEBERG_ASSIGN_OR_RAISE(auto payload_bytes, ReadBytes(payload_offset, payload_size)); + ICEBERG_ASSIGN_OR_RAISE(auto decompressed, + Decompress(footer_compression, payload_bytes)); + + // Parse JSON + std::string_view json_str(reinterpret_cast(decompressed.data()), + decompressed.size()); + return FileMetadataFromJsonString(json_str); +} + +Result>> PuffinReader::ReadBlob( 
+ const BlobMetadata& blob_metadata) { + if (blob_metadata.offset < 0 || blob_metadata.length < 0 || + blob_metadata.offset > file_size_ || + blob_metadata.length > file_size_ - blob_metadata.offset) { + return Invalid("Invalid blob: offset {} + length {} exceeds file size {}", + blob_metadata.offset, blob_metadata.length, file_size_); + } + + ICEBERG_ASSIGN_OR_RAISE(auto raw_data, + ReadBytes(blob_metadata.offset, blob_metadata.length)); + + ICEBERG_ASSIGN_OR_RAISE( + auto codec, PuffinCompressionCodecFromName(blob_metadata.compression_codec)); + ICEBERG_ASSIGN_OR_RAISE(auto decompressed, Decompress(codec, raw_data)); + + return std::pair{blob_metadata, std::move(decompressed)}; +} + +Result>>> +PuffinReader::ReadAll(const std::vector& blobs) { + // Sort by offset for sequential I/O access pattern + std::vector sorted; + sorted.reserve(blobs.size()); + for (const auto& blob : blobs) { + sorted.push_back(&blob); + } + std::ranges::sort(sorted, + [](const auto* a, const auto* b) { return a->offset < b->offset; }); + + std::vector>> results; + results.reserve(blobs.size()); + for (const auto* blob : sorted) { + ICEBERG_ASSIGN_OR_RAISE(auto blob_pair, ReadBlob(*blob)); + results.push_back(std::move(blob_pair)); + } + return results; +} + +} // namespace iceberg::puffin diff --git a/src/iceberg/puffin/puffin_reader.h b/src/iceberg/puffin/puffin_reader.h new file mode 100644 index 000000000..bc1bfb9a9 --- /dev/null +++ b/src/iceberg/puffin/puffin_reader.h @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +/// \file iceberg/puffin/puffin_reader.h +/// Puffin file reader. + +#include +#include +#include +#include +#include +#include + +#include "iceberg/iceberg_data_export.h" +#include "iceberg/puffin/file_metadata.h" +#include "iceberg/result.h" + +namespace iceberg { +class InputFile; +class SeekableInputStream; +} // namespace iceberg + +namespace iceberg::puffin { + +/// \brief Reader for Puffin files. +/// +/// Reads from an InputFile with seek support for efficient blob access. +class ICEBERG_DATA_EXPORT PuffinReader { + public: + /// \brief Create a PuffinReader for the given input file. + /// \param input_file The input file to read from. + /// \param footer_size Optional known footer size hint to avoid an extra seek. + static Result> Make( + std::unique_ptr input_file, + std::optional footer_size = std::nullopt); + + ~PuffinReader(); + + /// \brief Read and return the file metadata from the footer. + Result ReadFileMetadata(); + + /// \brief Read a specific blob's data by its metadata. + /// \param blob_metadata The metadata describing the blob to read. + /// \return A pair of (BlobMetadata, decompressed data), or an error. + Result>> ReadBlob( + const BlobMetadata& blob_metadata); + + /// \brief Read all blobs described in the file metadata. + /// \return A vector of (BlobMetadata, decompressed data) pairs, or an error. + Result>>> ReadAll( + const std::vector& blobs); + + private: + PuffinReader(std::unique_ptr stream, int64_t file_size, + std::optional known_footer_size); + + /// Opened input stream. 
+ std::unique_ptr stream_; + /// Total file size. + int64_t file_size_; + /// Known footer size hint (avoids one seek if provided). + std::optional known_footer_size_; + + Result> ReadBytes(int64_t offset, int64_t length); +}; + +} // namespace iceberg::puffin diff --git a/src/iceberg/puffin/puffin_writer.cc b/src/iceberg/puffin/puffin_writer.cc new file mode 100644 index 000000000..9b646f167 --- /dev/null +++ b/src/iceberg/puffin/puffin_writer.cc @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +#include "iceberg/puffin/puffin_writer.h" + +#include + +#include "iceberg/file_io.h" +#include "iceberg/puffin/json_serde_internal.h" +#include "iceberg/puffin/puffin_format.h" +#include "iceberg/util/endian.h" +#include "iceberg/util/macros.h" + +namespace iceberg::puffin { + +PuffinWriter::PuffinWriter(std::unique_ptr stream, + std::unordered_map properties, + PuffinCompressionCodec default_codec, bool compress_footer) + : stream_(std::move(stream)), + properties_(std::move(properties)), + default_codec_(default_codec), + compress_footer_(compress_footer) {} + +PuffinWriter::~PuffinWriter() = default; + +Result> PuffinWriter::Make( + std::unique_ptr output_file, + std::unordered_map properties, + PuffinCompressionCodec default_codec, bool compress_footer) { + if (!output_file) { + return InvalidArgument("output_file must not be null"); + } + ICEBERG_ASSIGN_OR_RAISE(auto stream, output_file->CreateOrOverwrite()); + return std::unique_ptr(new PuffinWriter( + std::move(stream), std::move(properties), default_codec, compress_footer)); +} + +Status PuffinWriter::WriteBytes(std::span data) { + return stream_->Write(data); +} + +Status PuffinWriter::WriteMagic() { + const auto& magic = PuffinFormat::kMagicV1; + return WriteBytes(std::span( + reinterpret_cast(magic.data()), magic.size())); +} + +Status PuffinWriter::WriteHeader() { + if (header_written_) return {}; + ICEBERG_RETURN_UNEXPECTED(WriteMagic()); + header_written_ = true; + return {}; +} + +Result PuffinWriter::Write(const Blob& blob) { + if (finished_) { + return Invalid("Writer already finished"); + } + + ICEBERG_RETURN_UNEXPECTED(WriteHeader()); + + auto codec = blob.requested_compression.value_or(default_codec_); + std::span input_span( + reinterpret_cast(blob.data.data()), blob.data.size()); + ICEBERG_ASSIGN_OR_RAISE(auto compressed, Compress(codec, input_span)); + + ICEBERG_ASSIGN_OR_RAISE(auto offset, stream_->Position()); + ICEBERG_RETURN_UNEXPECTED( + WriteBytes(std::span(compressed.data(), 
compressed.size()))); + auto length = static_cast(compressed.size()); + + auto codec_name = CodecName(codec); + BlobMetadata metadata{ + .type = blob.type, + .input_fields = blob.input_fields, + .snapshot_id = blob.snapshot_id, + .sequence_number = blob.sequence_number, + .offset = offset, + .length = length, + .compression_codec = std::string(codec_name), + .properties = blob.properties, + }; + written_blobs_metadata_.push_back(metadata); + return metadata; +} + +Status PuffinWriter::Finish() { + if (finished_) { + return Invalid("Writer already finished"); + } + + ICEBERG_RETURN_UNEXPECTED(WriteHeader()); + + FileMetadata file_metadata{ + .blobs = written_blobs_metadata_, + .properties = properties_, + }; + + auto footer_json = ToJsonString(file_metadata); + std::vector footer_payload( + reinterpret_cast(footer_json.data()), + reinterpret_cast(footer_json.data() + footer_json.size())); + + // Compress footer if requested + std::array flags{}; + if (compress_footer_) { + ICEBERG_ASSIGN_OR_RAISE( + footer_payload, + Compress(PuffinFormat::kDefaultFooterCompressionCodec, footer_payload)); + SetFlag(flags, PuffinFlag::kFooterPayloadCompressed); + } + + // Footer start magic + ICEBERG_ASSIGN_OR_RAISE(auto footer_start, stream_->Position()); + ICEBERG_RETURN_UNEXPECTED(WriteMagic()); + + // Footer payload + ICEBERG_RETURN_UNEXPECTED(WriteBytes(footer_payload)); + + // Footer struct: payload_size (4) + flags (4) + magic (4) + auto payload_size = static_cast(footer_payload.size()); + std::array size_buf{}; + WriteLittleEndian(payload_size, size_buf.data()); + ICEBERG_RETURN_UNEXPECTED(WriteBytes(size_buf)); + + // Flags + ICEBERG_RETURN_UNEXPECTED(WriteBytes(std::span( + reinterpret_cast(flags.data()), flags.size()))); + + // Footer end magic + ICEBERG_RETURN_UNEXPECTED(WriteMagic()); + + ICEBERG_ASSIGN_OR_RAISE(auto end_pos, stream_->Position()); + footer_size_ = end_pos - footer_start; + file_size_ = end_pos; + finished_ = true; + + 
ICEBERG_RETURN_UNEXPECTED(stream_->Flush()); + return stream_->Close(); +} + +const std::vector& PuffinWriter::written_blobs_metadata() const { + return written_blobs_metadata_; +} + +Result PuffinWriter::footer_size() const { + if (!finished_) { + return Invalid("Writer not finished yet"); + } + return footer_size_; +} + +Result PuffinWriter::file_size() const { + if (!finished_) { + return Invalid("Writer not finished yet"); + } + return file_size_; +} + +} // namespace iceberg::puffin diff --git a/src/iceberg/puffin/puffin_writer.h b/src/iceberg/puffin/puffin_writer.h new file mode 100644 index 000000000..ace2dbf99 --- /dev/null +++ b/src/iceberg/puffin/puffin_writer.h @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +/// \file iceberg/puffin/puffin_writer.h +/// Puffin file writer. + +#include +#include +#include +#include +#include +#include +#include + +#include "iceberg/iceberg_data_export.h" +#include "iceberg/puffin/file_metadata.h" +#include "iceberg/result.h" + +namespace iceberg { +class OutputFile; +class PositionOutputStream; +} // namespace iceberg + +namespace iceberg::puffin { + +/// \brief Writer for Puffin files. 
+/// +/// Writes blobs and footer to an OutputFile stream. +class ICEBERG_DATA_EXPORT PuffinWriter { + public: + /// \brief Create a PuffinWriter for the given output file. + /// \param output_file The output file to write to. + /// \param properties File-level properties to include in the footer. + /// \param default_codec Default compression codec for blobs. + /// \param compress_footer Whether to compress the footer payload. + static Result> Make( + std::unique_ptr output_file, + std::unordered_map properties = {}, + PuffinCompressionCodec default_codec = PuffinCompressionCodec::kNone, + bool compress_footer = false); + + ~PuffinWriter(); + + /// \brief Write a blob and return its metadata. + Result Write(const Blob& blob); + + /// \brief Finalize the file by writing the footer and closing the stream. + Status Finish(); + + /// \brief Get metadata for all blobs written so far. + const std::vector& written_blobs_metadata() const; + + /// \brief Get the footer size. Returns error if Finish() has not been called. + Result footer_size() const; + + /// \brief Get the total file size. Returns error if Finish() has not been called. + Result file_size() const; + + private: + PuffinWriter(std::unique_ptr stream, + std::unordered_map properties, + PuffinCompressionCodec default_codec, bool compress_footer); + + /// Output stream. + std::unique_ptr stream_; + /// File-level properties to include in the footer. + std::unordered_map properties_; + /// Default compression codec for blobs without explicit compression. + PuffinCompressionCodec default_codec_; + /// Whether to compress the footer payload. + bool compress_footer_; + /// Metadata for all blobs written so far. + std::vector written_blobs_metadata_; + /// Whether the header magic has been written. + bool header_written_ = false; + /// Whether Finish() has been called. + bool finished_ = false; + /// Footer size, set after Finish(). + int64_t footer_size_ = -1; + /// Total file size, set after Finish(). 
+ int64_t file_size_ = -1; + + Status WriteBytes(std::span data); + Status WriteHeader(); + Status WriteMagic(); +}; + +} // namespace iceberg::puffin diff --git a/src/iceberg/test/CMakeLists.txt b/src/iceberg/test/CMakeLists.txt index d9059c567..997d18354 100644 --- a/src/iceberg/test/CMakeLists.txt +++ b/src/iceberg/test/CMakeLists.txt @@ -146,7 +146,8 @@ add_iceberg_test(puffin_test USE_DATA SOURCES puffin_format_test.cc - puffin_json_test.cc) + puffin_json_test.cc + puffin_reader_writer_test.cc) if(ICEBERG_BUILD_BUNDLE) add_iceberg_test(avro_test diff --git a/src/iceberg/test/meson.build b/src/iceberg/test/meson.build index e7f6165c9..b21a264b1 100644 --- a/src/iceberg/test/meson.build +++ b/src/iceberg/test/meson.build @@ -109,7 +109,11 @@ iceberg_tests = { 'use_data': true, }, 'puffin_test': { - 'sources': files('puffin_format_test.cc', 'puffin_json_test.cc'), + 'sources': files( + 'puffin_format_test.cc', + 'puffin_json_test.cc', + 'puffin_reader_writer_test.cc', + ), 'use_data': true, }, } diff --git a/src/iceberg/test/puffin_reader_writer_test.cc b/src/iceberg/test/puffin_reader_writer_test.cc new file mode 100644 index 000000000..c428f6747 --- /dev/null +++ b/src/iceberg/test/puffin_reader_writer_test.cc @@ -0,0 +1,379 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include +#include +#include +#include +#include +#include +#include + +#include + +#include "iceberg/file_io.h" +#include "iceberg/puffin/file_metadata.h" +#include "iceberg/puffin/puffin_reader.h" +#include "iceberg/puffin/puffin_writer.h" +#include "iceberg/test/matchers.h" + +namespace iceberg::puffin { + +namespace { + +// Simple in-memory stream implementations for testing. + +class MemoryOutputStream : public PositionOutputStream { + public: + explicit MemoryOutputStream(std::shared_ptr> buffer) + : buffer_(std::move(buffer)) {} + + Result Position() const override { + return static_cast(buffer_->size()); + } + + Status Write(std::span data) override { + buffer_->insert(buffer_->end(), data.begin(), data.end()); + return {}; + } + + Status Flush() override { return {}; } + Status Close() override { return {}; } + + private: + std::shared_ptr> buffer_; +}; + +class MemoryInputStream : public SeekableInputStream { + public: + explicit MemoryInputStream(std::shared_ptr> buffer) + : buffer_(std::move(buffer)) {} + + Result Position() const override { return position_; } + + Status Seek(int64_t position) override { + position_ = position; + return {}; + } + + Result Read(std::span out) override { + auto available = static_cast(buffer_->size()) - position_; + auto to_read = std::min(static_cast(out.size()), available); + if (to_read > 0) { + std::memcpy(out.data(), buffer_->data() + position_, to_read); + position_ += to_read; + } + return to_read; + } + + Status ReadFully(int64_t position, std::span out) override { + if (position < 0 || position + static_cast(out.size()) > + static_cast(buffer_->size())) { + return Invalid("ReadFully out of bounds"); + } + std::memcpy(out.data(), buffer_->data() + position, out.size()); + return {}; + } + + Status Close() override { return {}; } + + private: + std::shared_ptr> buffer_; + int64_t position_ = 0; +}; + 
+class MemoryOutputFile : public OutputFile { + public: + explicit MemoryOutputFile(std::shared_ptr> buffer) + : buffer_(std::move(buffer)) {} + + std::string_view location() const override { return "memory://test.puffin"; } + + Result> Create() override { + return CreateOrOverwrite(); + } + + Result> CreateOrOverwrite() override { + buffer_->clear(); + return std::make_unique(buffer_); + } + + private: + std::shared_ptr> buffer_; +}; + +class MemoryInputFile : public InputFile { + public: + explicit MemoryInputFile(std::shared_ptr> buffer) + : buffer_(std::move(buffer)) {} + + std::string_view location() const override { return "memory://test.puffin"; } + + Result Size() const override { return static_cast(buffer_->size()); } + + Result> Open() override { + return std::make_unique(buffer_); + } + + private: + std::shared_ptr> buffer_; +}; + +// Helper to create a shared buffer and output/input file pair. +struct MemoryFile { + std::shared_ptr> buffer = + std::make_shared>(); + + std::unique_ptr output() { + return std::make_unique(buffer); + } + + std::unique_ptr input() { return std::make_unique(buffer); } +}; + +std::vector ToBytes(std::string_view str) { + return {reinterpret_cast(str.data()), + reinterpret_cast(str.data() + str.size())}; +} + +} // namespace + +// ============================================================================ +// PuffinWriter Tests +// ============================================================================ + +TEST(PuffinWriterTest, WriteEmptyFile) { + MemoryFile file; + ICEBERG_UNWRAP_OR_FAIL(auto writer, PuffinWriter::Make(file.output())); + ASSERT_THAT(writer->Finish(), IsOk()); + + auto& data = *file.buffer; + EXPECT_GE(data.size(), 20u); + // Header magic + EXPECT_EQ(data[0], std::byte{0x50}); + EXPECT_EQ(data[1], std::byte{0x46}); + EXPECT_EQ(data[2], std::byte{0x41}); + EXPECT_EQ(data[3], std::byte{0x31}); + // Footer end magic + auto sz = data.size(); + EXPECT_EQ(data[sz - 4], std::byte{0x50}); + EXPECT_EQ(data[sz - 
3], std::byte{0x46}); + EXPECT_EQ(data[sz - 2], std::byte{0x41}); + EXPECT_EQ(data[sz - 1], std::byte{0x31}); + + EXPECT_TRUE(writer->written_blobs_metadata().empty()); + ICEBERG_UNWRAP_OR_FAIL(auto fsize, writer->file_size()); + EXPECT_EQ(fsize, static_cast(data.size())); +} + +TEST(PuffinWriterTest, WriterRejectsAfterFinish) { + MemoryFile file; + ICEBERG_UNWRAP_OR_FAIL(auto writer, PuffinWriter::Make(file.output())); + ASSERT_THAT(writer->Finish(), IsOk()); + + // Double finish + EXPECT_THAT(writer->Finish(), IsError(ErrorKind::kInvalid)); + + // Write after finish + Blob blob{.type = "a", .snapshot_id = 1, .sequence_number = 0}; + EXPECT_THAT(writer->Write(blob), IsError(ErrorKind::kInvalid)); +} + +TEST(PuffinWriterTest, MakeRejectsNullOutput) { + EXPECT_THAT(PuffinWriter::Make(nullptr), IsError(ErrorKind::kInvalidArgument)); +} + +TEST(PuffinWriterTest, SizesBeforeFinishReturnError) { + MemoryFile file; + ICEBERG_UNWRAP_OR_FAIL(auto writer, PuffinWriter::Make(file.output())); + EXPECT_THAT(writer->footer_size(), IsError(ErrorKind::kInvalid)); + EXPECT_THAT(writer->file_size(), IsError(ErrorKind::kInvalid)); +} + +// ============================================================================ +// Round-Trip Tests +// ============================================================================ + +TEST(PuffinRoundTripTest, SingleBlob) { + MemoryFile file; + { + ICEBERG_UNWRAP_OR_FAIL(auto writer, + PuffinWriter::Make(file.output(), {{"created-by", "test"}})); + std::vector blob_data = {0x01, 0x02, 0x03, 0x04, 0x05}; + ICEBERG_UNWRAP_OR_FAIL(auto meta, writer->Write(Blob{.type = "test-blob", + .input_fields = {1, 2}, + .snapshot_id = 42, + .sequence_number = 7, + .data = blob_data})); + EXPECT_EQ(meta.type, "test-blob"); + EXPECT_EQ(meta.offset, 4); // after header magic + EXPECT_EQ(meta.length, 5); + ASSERT_THAT(writer->Finish(), IsOk()); + ICEBERG_UNWRAP_OR_FAIL(auto fsize, writer->file_size()); + EXPECT_GT(fsize, 0); + } + + ICEBERG_UNWRAP_OR_FAIL(auto reader, 
PuffinReader::Make(file.input())); + ICEBERG_UNWRAP_OR_FAIL(auto fm, reader->ReadFileMetadata()); + ASSERT_EQ(fm.blobs.size(), 1); + EXPECT_EQ(fm.blobs[0].type, "test-blob"); + EXPECT_EQ(fm.properties.at("created-by"), "test"); + + ICEBERG_UNWRAP_OR_FAIL(auto blob_result, reader->ReadBlob(fm.blobs[0])); + std::vector expected = {std::byte{0x01}, std::byte{0x02}, std::byte{0x03}, + std::byte{0x04}, std::byte{0x05}}; + EXPECT_EQ(blob_result.second, expected); +} + +TEST(PuffinRoundTripTest, MultipleBlobs) { + MemoryFile file; + { + ICEBERG_UNWRAP_OR_FAIL(auto writer, PuffinWriter::Make(file.output())); + ICEBERG_UNWRAP_OR_FAIL(auto m1, writer->Write(Blob{.type = "first", + .input_fields = {1}, + .snapshot_id = 1, + .sequence_number = 0, + .data = {'a', 'b', 'c'}})); + ICEBERG_UNWRAP_OR_FAIL(auto m2, writer->Write(Blob{.type = "second", + .input_fields = {2}, + .snapshot_id = 2, + .sequence_number = 1, + .data = {'d', 'e', 'f', 'g'}, + .properties = {{"key", "val"}}})); + EXPECT_EQ(m2.offset, 7); // header(4) + first blob(3) + EXPECT_EQ(m2.length, 4); + ASSERT_THAT(writer->Finish(), IsOk()); + } + + ICEBERG_UNWRAP_OR_FAIL(auto reader, PuffinReader::Make(file.input())); + ICEBERG_UNWRAP_OR_FAIL(auto fm, reader->ReadFileMetadata()); + ASSERT_EQ(fm.blobs.size(), 2); + EXPECT_TRUE(fm.blobs[0].properties.empty()); + EXPECT_EQ(fm.blobs[1].properties.at("key"), "val"); + + ICEBERG_UNWRAP_OR_FAIL(auto all, reader->ReadAll(fm.blobs)); + ASSERT_EQ(all.size(), 2); + EXPECT_EQ(all[0].second, ToBytes("abc")); + EXPECT_EQ(all[1].second, ToBytes("defg")); +} + +TEST(PuffinRoundTripTest, WithProperties) { + MemoryFile file; + { + ICEBERG_UNWRAP_OR_FAIL( + auto writer, + PuffinWriter::Make(file.output(), {{"created-by", "iceberg-cpp-test"}})); + std::string text = "hello puffin"; + std::vector blob_data(text.begin(), text.end()); + ASSERT_THAT(writer->Write(Blob{.type = "text-blob", + .input_fields = {1}, + .snapshot_id = 100, + .sequence_number = 5, + .data = blob_data, + .properties 
= {{"encoding", "utf-8"}}}), + IsOk()); + ASSERT_THAT(writer->Finish(), IsOk()); + } + + ICEBERG_UNWRAP_OR_FAIL(auto reader, PuffinReader::Make(file.input())); + ICEBERG_UNWRAP_OR_FAIL(auto fm, reader->ReadFileMetadata()); + EXPECT_EQ(fm.properties.at("created-by"), "iceberg-cpp-test"); + ASSERT_EQ(fm.blobs.size(), 1); + EXPECT_EQ(fm.blobs[0].properties.at("encoding"), "utf-8"); + + ICEBERG_UNWRAP_OR_FAIL(auto blob_result, reader->ReadBlob(fm.blobs[0])); + EXPECT_EQ(blob_result.second, ToBytes("hello puffin")); +} + +// ============================================================================ +// PuffinReader Error Tests +// ============================================================================ + +TEST(PuffinReaderTest, MakeRejectsNullInput) { + EXPECT_THAT(PuffinReader::Make(nullptr), IsError(ErrorKind::kInvalidArgument)); +} + +TEST(PuffinReaderTest, InvalidMagic) { + auto buffer = std::make_shared>(16, std::byte{0x00}); + auto input = std::make_unique(buffer); + ICEBERG_UNWRAP_OR_FAIL(auto reader, PuffinReader::Make(std::move(input))); + EXPECT_THAT(reader->ReadFileMetadata(), IsError(ErrorKind::kInvalid)); +} + +TEST(PuffinReaderTest, TruncatedFile) { + auto buffer = std::make_shared>(2, std::byte{0x50}); + auto input = std::make_unique(buffer); + ICEBERG_UNWRAP_OR_FAIL(auto reader, PuffinReader::Make(std::move(input))); + EXPECT_THAT(reader->ReadFileMetadata(), IsError(ErrorKind::kInvalid)); +} + +TEST(PuffinReaderTest, InvalidBlobOffset) { + MemoryFile file; + { + ICEBERG_UNWRAP_OR_FAIL(auto writer, PuffinWriter::Make(file.output())); + ASSERT_THAT(writer->Finish(), IsOk()); + } + + ICEBERG_UNWRAP_OR_FAIL(auto reader, PuffinReader::Make(file.input())); + BlobMetadata bad_meta{.type = "bad", + .snapshot_id = 1, + .sequence_number = 0, + .offset = 9999, + .length = 100}; + EXPECT_THAT(reader->ReadBlob(bad_meta), IsError(ErrorKind::kInvalid)); +} + +TEST(PuffinReaderTest, UnknownFlagsRejected) { + // Build a valid puffin file then tamper with flags + 
MemoryFile file; + { + ICEBERG_UNWRAP_OR_FAIL(auto writer, PuffinWriter::Make(file.output())); + ASSERT_THAT(writer->Finish(), IsOk()); + } + // Set unknown flag bit in the footer struct (flags are at offset -8 from end) + auto& data = *file.buffer; + data[data.size() - 8] = std::byte{0x02}; // bit 1 is unknown + + ICEBERG_UNWRAP_OR_FAIL(auto reader, PuffinReader::Make(file.input())); + EXPECT_THAT(reader->ReadFileMetadata(), IsError(ErrorKind::kInvalid)); +} + +// ============================================================================ +// Java Binary Compatibility Tests +// ============================================================================ + +TEST(PuffinReaderTest, JavaEmptyPuffinCompatibility) { + auto buffer = std::make_shared>(std::vector{ + std::byte{0x50}, std::byte{0x46}, std::byte{0x41}, std::byte{0x31}, std::byte{0x50}, + std::byte{0x46}, std::byte{0x41}, std::byte{0x31}, std::byte{0x7b}, std::byte{0x22}, + std::byte{0x62}, std::byte{0x6c}, std::byte{0x6f}, std::byte{0x62}, std::byte{0x73}, + std::byte{0x22}, std::byte{0x3a}, std::byte{0x5b}, std::byte{0x5d}, std::byte{0x7d}, + std::byte{0x0c}, std::byte{0x00}, std::byte{0x00}, std::byte{0x00}, std::byte{0x00}, + std::byte{0x00}, std::byte{0x00}, std::byte{0x00}, std::byte{0x50}, std::byte{0x46}, + std::byte{0x41}, std::byte{0x31}, + }); + + auto input = std::make_unique(buffer); + ICEBERG_UNWRAP_OR_FAIL(auto reader, PuffinReader::Make(std::move(input))); + ICEBERG_UNWRAP_OR_FAIL(auto fm, reader->ReadFileMetadata()); + EXPECT_TRUE(fm.blobs.empty()); + EXPECT_TRUE(fm.properties.empty()); +} + +} // namespace iceberg::puffin From e936d4940c3eda9c02153e5818a540c36fc87396 Mon Sep 17 00:00:00 2001 From: Gang Wu Date: Wed, 27 May 2026 23:53:07 +0800 Subject: [PATCH 2/2] fix various issues and polish some lines --- src/iceberg/puffin/puffin_reader.cc | 255 ++++++++++------ src/iceberg/puffin/puffin_reader.h | 21 +- src/iceberg/puffin/puffin_writer.cc | 70 +++-- src/iceberg/puffin/puffin_writer.h | 
35 +-- src/iceberg/test/mock_io.h | 217 +++++++++++++ src/iceberg/test/puffin_reader_writer_test.cc | 288 +++++++----------- src/iceberg/type_fwd.h | 5 + 7 files changed, 565 insertions(+), 326 deletions(-) diff --git a/src/iceberg/puffin/puffin_reader.cc b/src/iceberg/puffin/puffin_reader.cc index 6d6ec0ccb..47ac58f13 100644 --- a/src/iceberg/puffin/puffin_reader.cc +++ b/src/iceberg/puffin/puffin_reader.cc @@ -23,6 +23,8 @@ #include #include #include +#include +#include #include #include "iceberg/file_io.h" @@ -35,32 +37,110 @@ namespace iceberg::puffin { namespace { -// Validate magic bytes in a buffer at offset 0. -Status CheckMagic(std::span data) { - if (static_cast(data.size()) < PuffinFormat::kMagicLength) { - return Invalid("Invalid file: buffer too small for magic"); - } - auto* begin = reinterpret_cast(data.data()); - if (!std::equal(PuffinFormat::kMagicV1.begin(), PuffinFormat::kMagicV1.end(), begin)) { - return Invalid( - "Invalid file: expected magic, got [{:#04x}, {:#04x}, {:#04x}, {:#04x}]", - begin[0], begin[1], begin[2], begin[3]); - } +struct FooterInfo { + int32_t payload_size; + PuffinCompressionCodec compression; +}; + +Status CheckMagic(std::span data, int64_t offset = 0) { + ICEBERG_PRECHECK(offset >= 0, "Invalid file: magic offset {} is negative", offset); + auto offset_size = static_cast(offset); + ICEBERG_PRECHECK(offset_size <= data.size() && + data.size() - offset_size >= PuffinFormat::kMagicLength, + "Invalid file: buffer too small for magic at offset {}", offset); + auto* begin = reinterpret_cast(data.data() + offset_size); + ICEBERG_PRECHECK( + std::equal(PuffinFormat::kMagicV1.cbegin(), PuffinFormat::kMagicV1.cend(), begin), + "Invalid file: expected magic at offset {}, got [{:#04x}, {:#04x}, {:#04x}, " + "{:#04x}]", + offset, begin[0], begin[1], begin[2], begin[3]); return {}; } -// Validate that no unknown flag bits are set. 
Status CheckUnknownFlags(std::span flags) { constexpr uint8_t kKnownBitsMask = 0x01; - if ((flags[0] & ~kKnownBitsMask) != 0 || flags[1] != 0 || flags[2] != 0 || - flags[3] != 0) { - return Invalid( - "Invalid file: unknown footer flags set [{:#04x}, {:#04x}, {:#04x}, {:#04x}]", - flags[0], flags[1], flags[2], flags[3]); + ICEBERG_PRECHECK( + (flags[0] & ~kKnownBitsMask) == 0 && flags[1] == 0 && flags[2] == 0 && + flags[3] == 0, + "Invalid file: unknown footer flags set [{:#04x}, {:#04x}, {:#04x}, {:#04x}]", + flags[0], flags[1], flags[2], flags[3]); + return {}; +} + +Result FooterPayloadSize(std::span footer_struct) { + ICEBERG_PRECHECK(footer_struct.size() >= PuffinFormat::kFooterStructLength, + "Invalid file: footer struct is too small"); + auto payload_size = ReadLittleEndian( + footer_struct.data() + PuffinFormat::kFooterStructPayloadSizeOffset); + ICEBERG_PRECHECK(payload_size >= 0, "Invalid file: negative payload size {}", + payload_size); + return payload_size; +} + +Result> DecodeFlags(std::span footer_struct) { + ICEBERG_PRECHECK(footer_struct.size() >= PuffinFormat::kFooterStructLength, + "Invalid file: footer struct is too small"); + std::array flags{}; + std::memcpy(flags.data(), footer_struct.data() + PuffinFormat::kFooterStructFlagsOffset, + flags.size()); + ICEBERG_RETURN_UNEXPECTED(CheckUnknownFlags(flags)); + return flags; +} + +PuffinCompressionCodec FooterCompressionCodec(std::span flags) { + if (IsFlagSet(flags, PuffinFlag::kFooterPayloadCompressed)) { + return PuffinFormat::kDefaultFooterCompressionCodec; } + return PuffinCompressionCodec::kNone; +} + +Status CheckFooterSize(int64_t footer_size, int32_t payload_size) { + auto expected_footer_size = PuffinFormat::kFooterStartMagicLength + + static_cast(payload_size) + + PuffinFormat::kFooterStructLength; + ICEBERG_PRECHECK(footer_size == expected_footer_size, + "Invalid file: footer size {} does not match payload size {}", + footer_size, payload_size); return {}; } +Result 
DecodeFooterInfo(std::span footer, + int64_t footer_size) { + ICEBERG_PRECHECK(footer_size >= PuffinFormat::kFooterStartMagicLength + + PuffinFormat::kFooterStructLength, + "Invalid file: footer size {} is too small", footer_size); + ICEBERG_PRECHECK(static_cast(footer_size) <= footer.size(), + "Invalid file: footer size {} exceeds buffer size {}", footer_size, + footer.size()); + + ICEBERG_RETURN_UNEXPECTED(CheckMagic(footer, PuffinFormat::kFooterStartMagicOffset)); + + auto footer_struct_offset = footer_size - PuffinFormat::kFooterStructLength; + std::span footer_struct(footer.data() + footer_struct_offset, + PuffinFormat::kFooterStructLength); + ICEBERG_RETURN_UNEXPECTED( + CheckMagic(footer_struct, PuffinFormat::kFooterStructMagicOffset)); + + ICEBERG_ASSIGN_OR_RAISE(auto payload_size, FooterPayloadSize(footer_struct)); + ICEBERG_RETURN_UNEXPECTED(CheckFooterSize(footer_size, payload_size)); + + ICEBERG_ASSIGN_OR_RAISE(auto flags, DecodeFlags(footer_struct)); + return FooterInfo{.payload_size = payload_size, + .compression = FooterCompressionCodec(flags)}; +} + +Result ParseFileMetadata(std::span payload, + PuffinCompressionCodec compression) { + std::vector decompressed; + if (compression != PuffinCompressionCodec::kNone) { + ICEBERG_ASSIGN_OR_RAISE(decompressed, Decompress(compression, payload)); + payload = decompressed; + } + + return FileMetadataFromJsonString( + std::string_view(reinterpret_cast(payload.data()), payload.size())); +} + } // namespace PuffinReader::PuffinReader(std::unique_ptr stream, int64_t file_size, @@ -72,108 +152,88 @@ PuffinReader::PuffinReader(std::unique_ptr stream, int64_t PuffinReader::~PuffinReader() = default; Result> PuffinReader::Make( - std::unique_ptr input_file, std::optional footer_size) { - if (!input_file) { - return InvalidArgument("input_file must not be null"); + std::unique_ptr input_file, std::optional footer_size, + std::optional file_size) { + ICEBERG_PRECHECK(input_file, "Input file must not be null"); + int64_t 
resolved_file_size = 0; + if (file_size.has_value()) { + ICEBERG_PRECHECK(*file_size >= 0, "File size must not be negative: {}", *file_size); + resolved_file_size = *file_size; + } else { + ICEBERG_ASSIGN_OR_RAISE(resolved_file_size, input_file->Size()); + } + if (footer_size.has_value()) { + ICEBERG_PRECHECK(*footer_size > 0, "Footer size must be positive: {}", *footer_size); + ICEBERG_PRECHECK(*footer_size <= resolved_file_size - PuffinFormat::kMagicLength, + "Footer size {} exceeds file size {}", *footer_size, + resolved_file_size); + ICEBERG_PRECHECK(*footer_size <= std::numeric_limits::max(), + "Footer size {} is too large", *footer_size); } - ICEBERG_ASSIGN_OR_RAISE(auto file_size, input_file->Size()); ICEBERG_ASSIGN_OR_RAISE(auto stream, input_file->Open()); return std::unique_ptr( - new PuffinReader(std::move(stream), file_size, footer_size)); + new PuffinReader(std::move(stream), resolved_file_size, footer_size)); } Result> PuffinReader::ReadBytes(int64_t offset, int64_t length) { - if (offset < 0 || length < 0 || offset > file_size_ || length > file_size_ - offset) { - return Invalid("Read out of bounds: offset {} + length {} exceeds file size {}", - offset, length, file_size_); - } + ICEBERG_PRECHECK(!closed_, "Reader already closed"); + ICEBERG_PRECHECK(offset >= 0, "Offset must not be negative: {}", offset); + ICEBERG_PRECHECK(length >= 0, "Length must not be negative: {}", length); + ICEBERG_PRECHECK(offset <= file_size_, "Offset {} exceeds file size {}", offset, + file_size_); + ICEBERG_PRECHECK(length <= file_size_ - offset, + "Length {} exceeds file size {} at offset {}", length, file_size_, + offset); std::vector buf(length); ICEBERG_RETURN_UNEXPECTED(stream_->ReadFully(offset, buf)); return buf; } -Result PuffinReader::ReadFileMetadata() { - if (file_size_ < PuffinFormat::kFooterStructLength) { - return Invalid("Invalid file: file length {} is less than minimal footer size {}", - file_size_, PuffinFormat::kFooterStructLength); - } - - // Validate 
header magic - ICEBERG_ASSIGN_OR_RAISE(auto header_bytes, ReadBytes(0, PuffinFormat::kMagicLength)); - ICEBERG_RETURN_UNEXPECTED(CheckMagic(header_bytes)); - - // Read footer struct from end of file - auto footer_struct_offset = file_size_ - PuffinFormat::kFooterStructLength; - ICEBERG_ASSIGN_OR_RAISE( - auto footer_struct, - ReadBytes(footer_struct_offset, PuffinFormat::kFooterStructLength)); - - // Validate footer end magic - std::span footer_end_magic( - footer_struct.data() + PuffinFormat::kFooterStructMagicOffset, - PuffinFormat::kMagicLength); - ICEBERG_RETURN_UNEXPECTED(CheckMagic(footer_end_magic)); - - // Read payload size - auto payload_size = ReadLittleEndian( - footer_struct.data() + PuffinFormat::kFooterStructPayloadSizeOffset); - - if (payload_size < 0) { - return Invalid("Invalid file: negative payload size {}", payload_size); - } - - // Calculate total footer size and validate - int64_t footer_size = PuffinFormat::kFooterStartMagicLength + - static_cast(payload_size) + - PuffinFormat::kFooterStructLength; - auto footer_offset = file_size_ - footer_size; - if (footer_offset < 0) { - return Invalid("Invalid file: footer size {} exceeds file size {}", footer_size, - file_size_); +Result PuffinReader::FooterSize() { + if (known_footer_size_.has_value()) { + return *known_footer_size_; } - // Validate footer start magic - ICEBERG_ASSIGN_OR_RAISE(auto footer_start_magic, - ReadBytes(footer_offset, PuffinFormat::kMagicLength)); - ICEBERG_RETURN_UNEXPECTED(CheckMagic(footer_start_magic)); + ICEBERG_ASSIGN_OR_RAISE(auto footer_struct, + ReadBytes(file_size_ - PuffinFormat::kFooterStructLength, + PuffinFormat::kFooterStructLength)); + ICEBERG_RETURN_UNEXPECTED( + CheckMagic(footer_struct, PuffinFormat::kFooterStructMagicOffset)); - // Check flags - std::array flags{}; - std::memcpy(flags.data(), footer_struct.data() + PuffinFormat::kFooterStructFlagsOffset, - 4); - ICEBERG_RETURN_UNEXPECTED(CheckUnknownFlags(flags)); + ICEBERG_ASSIGN_OR_RAISE(auto 
payload_size, FooterPayloadSize(footer_struct)); + known_footer_size_ = PuffinFormat::kFooterStartMagicLength + + static_cast(payload_size) + + PuffinFormat::kFooterStructLength; + return *known_footer_size_; +} - PuffinCompressionCodec footer_compression = PuffinCompressionCodec::kNone; - if (IsFlagSet(flags, PuffinFlag::kFooterPayloadCompressed)) { - footer_compression = PuffinFormat::kDefaultFooterCompressionCodec; - } +Result> PuffinReader::ReadFooter(int64_t footer_size) { + return ReadBytes(file_size_ - footer_size, footer_size); +} - // Read and decompress footer payload - auto payload_offset = footer_offset + PuffinFormat::kFooterStartMagicLength; - ICEBERG_ASSIGN_OR_RAISE(auto payload_bytes, ReadBytes(payload_offset, payload_size)); - ICEBERG_ASSIGN_OR_RAISE(auto decompressed, - Decompress(footer_compression, payload_bytes)); +Result PuffinReader::ReadFileMetadata() { + ICEBERG_ASSIGN_OR_RAISE(auto header_bytes, ReadBytes(0, PuffinFormat::kMagicLength)); + ICEBERG_RETURN_UNEXPECTED(CheckMagic(header_bytes)); - // Parse JSON - std::string_view json_str(reinterpret_cast(decompressed.data()), - decompressed.size()); - return FileMetadataFromJsonString(json_str); + ICEBERG_ASSIGN_OR_RAISE(auto footer_size, FooterSize()); + ICEBERG_ASSIGN_OR_RAISE(auto footer, ReadFooter(footer_size)); + ICEBERG_ASSIGN_OR_RAISE(auto footer_info, DecodeFooterInfo(footer, footer_size)); + std::span payload_bytes( + footer.data() + PuffinFormat::kFooterStartMagicLength, footer_info.payload_size); + return ParseFileMetadata(payload_bytes, footer_info.compression); } Result>> PuffinReader::ReadBlob( const BlobMetadata& blob_metadata) { - if (blob_metadata.offset < 0 || blob_metadata.length < 0 || - blob_metadata.offset > file_size_ || - blob_metadata.length > file_size_ - blob_metadata.offset) { - return Invalid("Invalid blob: offset {} + length {} exceeds file size {}", - blob_metadata.offset, blob_metadata.length, file_size_); - } - ICEBERG_ASSIGN_OR_RAISE(auto raw_data, 
ReadBytes(blob_metadata.offset, blob_metadata.length)); ICEBERG_ASSIGN_OR_RAISE( auto codec, PuffinCompressionCodecFromName(blob_metadata.compression_codec)); + if (codec == PuffinCompressionCodec::kNone) { + return std::pair{blob_metadata, std::move(raw_data)}; + } + ICEBERG_ASSIGN_OR_RAISE(auto decompressed, Decompress(codec, raw_data)); return std::pair{blob_metadata, std::move(decompressed)}; @@ -199,4 +259,13 @@ PuffinReader::ReadAll(const std::vector& blobs) { return results; } +Status PuffinReader::Close() { + if (closed_) { + return {}; + } + ICEBERG_RETURN_UNEXPECTED(stream_->Close()); + closed_ = true; + return {}; +} + } // namespace iceberg::puffin diff --git a/src/iceberg/puffin/puffin_reader.h b/src/iceberg/puffin/puffin_reader.h index bc1bfb9a9..3b805426b 100644 --- a/src/iceberg/puffin/puffin_reader.h +++ b/src/iceberg/puffin/puffin_reader.h @@ -32,11 +32,7 @@ #include "iceberg/iceberg_data_export.h" #include "iceberg/puffin/file_metadata.h" #include "iceberg/result.h" - -namespace iceberg { -class InputFile; -class SeekableInputStream; -} // namespace iceberg +#include "iceberg/type_fwd.h" namespace iceberg::puffin { @@ -48,9 +44,11 @@ class ICEBERG_DATA_EXPORT PuffinReader { /// \brief Create a PuffinReader for the given input file. /// \param input_file The input file to read from. /// \param footer_size Optional known footer size hint to avoid an extra seek. + /// \param file_size Optional known file size hint to avoid fetching size. static Result> Make( std::unique_ptr input_file, - std::optional footer_size = std::nullopt); + std::optional footer_size = std::nullopt, + std::optional file_size = std::nullopt); ~PuffinReader(); @@ -68,18 +66,25 @@ class ICEBERG_DATA_EXPORT PuffinReader { Result>>> ReadAll( const std::vector& blobs); + /// \brief Close the underlying input stream. 
+ Status Close(); + private: PuffinReader(std::unique_ptr stream, int64_t file_size, std::optional known_footer_size); + Result> ReadBytes(int64_t offset, int64_t length); + Result FooterSize(); + Result> ReadFooter(int64_t footer_size); + /// Opened input stream. std::unique_ptr stream_; /// Total file size. int64_t file_size_; /// Known footer size hint (avoids one seek if provided). std::optional known_footer_size_; - - Result> ReadBytes(int64_t offset, int64_t length); + /// Whether the reader has been closed. + bool closed_ = false; }; } // namespace iceberg::puffin diff --git a/src/iceberg/puffin/puffin_writer.cc b/src/iceberg/puffin/puffin_writer.cc index 9b646f167..db749117f 100644 --- a/src/iceberg/puffin/puffin_writer.cc +++ b/src/iceberg/puffin/puffin_writer.cc @@ -20,6 +20,7 @@ #include "iceberg/puffin/puffin_writer.h" #include +#include #include "iceberg/file_io.h" #include "iceberg/puffin/json_serde_internal.h" @@ -43,10 +44,8 @@ Result> PuffinWriter::Make( std::unique_ptr output_file, std::unordered_map properties, PuffinCompressionCodec default_codec, bool compress_footer) { - if (!output_file) { - return InvalidArgument("output_file must not be null"); - } - ICEBERG_ASSIGN_OR_RAISE(auto stream, output_file->CreateOrOverwrite()); + ICEBERG_PRECHECK(output_file, "Output file must not be null"); + ICEBERG_ASSIGN_OR_RAISE(auto stream, output_file->Create()); return std::unique_ptr(new PuffinWriter( std::move(stream), std::move(properties), default_codec, compress_footer)); } @@ -69,21 +68,22 @@ Status PuffinWriter::WriteHeader() { } Result PuffinWriter::Write(const Blob& blob) { - if (finished_) { - return Invalid("Writer already finished"); - } - + ICEBERG_PRECHECK(!finished_ && !footer_written_, "Writer already finished"); ICEBERG_RETURN_UNEXPECTED(WriteHeader()); auto codec = blob.requested_compression.value_or(default_codec_); std::span input_span( reinterpret_cast(blob.data.data()), blob.data.size()); - ICEBERG_ASSIGN_OR_RAISE(auto compressed, 
Compress(codec, input_span)); + std::vector compressed; + auto output_span = input_span; + if (codec != PuffinCompressionCodec::kNone) { + ICEBERG_ASSIGN_OR_RAISE(compressed, Compress(codec, input_span)); + output_span = std::span(compressed.data(), compressed.size()); + } ICEBERG_ASSIGN_OR_RAISE(auto offset, stream_->Position()); - ICEBERG_RETURN_UNEXPECTED( - WriteBytes(std::span(compressed.data(), compressed.size()))); - auto length = static_cast(compressed.size()); + ICEBERG_RETURN_UNEXPECTED(WriteBytes(output_span)); + auto length = static_cast(output_span.size()); auto codec_name = CodecName(codec); BlobMetadata metadata{ @@ -101,9 +101,8 @@ Result PuffinWriter::Write(const Blob& blob) { } Status PuffinWriter::Finish() { - if (finished_) { - return Invalid("Writer already finished"); - } + ICEBERG_PRECHECK(!finished_, "Writer already finished"); + ICEBERG_PRECHECK(!footer_written_, "Footer already written"); ICEBERG_RETURN_UNEXPECTED(WriteHeader()); @@ -113,18 +112,24 @@ Status PuffinWriter::Finish() { }; auto footer_json = ToJsonString(file_metadata); - std::vector footer_payload( - reinterpret_cast(footer_json.data()), - reinterpret_cast(footer_json.data() + footer_json.size())); + std::span footer_payload( + reinterpret_cast(footer_json.data()), footer_json.size()); + std::vector compressed_footer_payload; // Compress footer if requested std::array flags{}; if (compress_footer_) { ICEBERG_ASSIGN_OR_RAISE( - footer_payload, + compressed_footer_payload, Compress(PuffinFormat::kDefaultFooterCompressionCodec, footer_payload)); + footer_payload = std::span(compressed_footer_payload.data(), + compressed_footer_payload.size()); SetFlag(flags, PuffinFlag::kFooterPayloadCompressed); } + ICEBERG_CHECK( + footer_payload.size() <= static_cast(std::numeric_limits::max()), + "Footer payload is too large: {}", footer_payload.size()); + auto payload_size = static_cast(footer_payload.size()); // Footer start magic ICEBERG_ASSIGN_OR_RAISE(auto footer_start, 
stream_->Position()); @@ -134,7 +139,6 @@ Status PuffinWriter::Finish() { ICEBERG_RETURN_UNEXPECTED(WriteBytes(footer_payload)); // Footer struct: payload_size (4) + flags (4) + magic (4) - auto payload_size = static_cast(footer_payload.size()); std::array size_buf{}; WriteLittleEndian(payload_size, size_buf.data()); ICEBERG_RETURN_UNEXPECTED(WriteBytes(size_buf)); @@ -148,28 +152,32 @@ Status PuffinWriter::Finish() { ICEBERG_ASSIGN_OR_RAISE(auto end_pos, stream_->Position()); footer_size_ = end_pos - footer_start; - file_size_ = end_pos; + footer_written_ = true; + ICEBERG_RETURN_UNEXPECTED(stream_->Flush()); + ICEBERG_RETURN_UNEXPECTED(stream_->Close()); + ICEBERG_ASSIGN_OR_RAISE(file_size_, stream_->Position()); finished_ = true; + return {}; +} - ICEBERG_RETURN_UNEXPECTED(stream_->Flush()); - return stream_->Close(); +Status PuffinWriter::Close() { + if (finished_) { + return {}; + } + return Finish(); } const std::vector& PuffinWriter::written_blobs_metadata() const { return written_blobs_metadata_; } -Result PuffinWriter::footer_size() const { - if (!finished_) { - return Invalid("Writer not finished yet"); - } +Result PuffinWriter::FooterSize() const { + ICEBERG_PRECHECK(footer_written_, "Footer not written yet"); return footer_size_; } -Result PuffinWriter::file_size() const { - if (!finished_) { - return Invalid("Writer not finished yet"); - } +Result PuffinWriter::FileSize() const { + ICEBERG_PRECHECK(finished_, "Writer not finished yet"); return file_size_; } diff --git a/src/iceberg/puffin/puffin_writer.h b/src/iceberg/puffin/puffin_writer.h index ace2dbf99..2a7984091 100644 --- a/src/iceberg/puffin/puffin_writer.h +++ b/src/iceberg/puffin/puffin_writer.h @@ -33,11 +33,7 @@ #include "iceberg/iceberg_data_export.h" #include "iceberg/puffin/file_metadata.h" #include "iceberg/result.h" - -namespace iceberg { -class OutputFile; -class PositionOutputStream; -} // namespace iceberg +#include "iceberg/type_fwd.h" namespace iceberg::puffin { @@ -65,42 +61,47 @@ 
class ICEBERG_DATA_EXPORT PuffinWriter { /// \brief Finalize the file by writing the footer and closing the stream. Status Finish(); + /// \brief Close the writer, finalizing the file if needed. + Status Close(); + /// \brief Get metadata for all blobs written so far. const std::vector& written_blobs_metadata() const; - /// \brief Get the footer size. Returns error if Finish() has not been called. - Result footer_size() const; + /// \brief Get the footer size. Returns error if the footer has not been written. + Result FooterSize() const; - /// \brief Get the total file size. Returns error if Finish() has not been called. - Result file_size() const; + /// \brief Get the total file size. Returns error if Finish() has not succeeded. + Result FileSize() const; private: PuffinWriter(std::unique_ptr stream, std::unordered_map properties, PuffinCompressionCodec default_codec, bool compress_footer); + Status WriteBytes(std::span data); + Status WriteHeader(); + Status WriteMagic(); + /// Output stream. std::unique_ptr stream_; /// File-level properties to include in the footer. std::unordered_map properties_; /// Default compression codec for blobs without explicit compression. - PuffinCompressionCodec default_codec_; + const PuffinCompressionCodec default_codec_; /// Whether to compress the footer payload. - bool compress_footer_; + const bool compress_footer_; /// Metadata for all blobs written so far. std::vector written_blobs_metadata_; /// Whether the header magic has been written. bool header_written_ = false; - /// Whether Finish() has been called. + /// Whether the footer has been written. + bool footer_written_ = false; + /// Whether Finish() has succeeded. bool finished_ = false; - /// Footer size, set after Finish(). + /// Footer size, set after the footer is written. int64_t footer_size_ = -1; /// Total file size, set after Finish(). 
int64_t file_size_ = -1; - - Status WriteBytes(std::span data); - Status WriteHeader(); - Status WriteMagic(); }; } // namespace iceberg::puffin diff --git a/src/iceberg/test/mock_io.h b/src/iceberg/test/mock_io.h index c9f38e505..f643a852b 100644 --- a/src/iceberg/test/mock_io.h +++ b/src/iceberg/test/mock_io.h @@ -19,10 +19,25 @@ #pragma once +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + #include #include #include "iceberg/file_io.h" +#include "iceberg/util/macros.h" namespace iceberg { @@ -31,12 +46,214 @@ class MockFileIO : public FileIO { MockFileIO() = default; ~MockFileIO() override = default; + Result> NewInputFile(std::string file_location) override { + auto file = FindFile(file_location); + if (!file) { + return NotFound("File does not exist: {}", file_location); + } + return std::make_unique(std::move(file_location), std::move(file), + std::nullopt); + } + + Result> NewInputFile(std::string file_location, + size_t length) override { + if (length > static_cast(std::numeric_limits::max())) { + return InvalidArgument("File length {} exceeds int64_t max", length); + } + auto file = FindFile(file_location); + if (!file) { + return NotFound("File does not exist: {}", file_location); + } + return std::make_unique(std::move(file_location), std::move(file), + static_cast(length)); + } + + Result> NewOutputFile(std::string file_location) override { + return std::make_unique(files_, std::move(file_location)); + } + + void AddFile(std::string file_location, std::span data) { + files_->insert_or_assign( + std::move(file_location), + std::make_shared>(data.begin(), data.end())); + } + + void AddFile(std::string file_location, std::string_view data) { + AddFile(std::move(file_location), + std::as_bytes(std::span(data.data(), data.size()))); + } + + std::vector& FileData(const std::string& file_location) { + return *GetOrCreateFile(file_location); + } + + const std::vector& 
FileData(const std::string& file_location) const { + return *files_->at(file_location); + } + MOCK_METHOD((Result), ReadFile, (const std::string&, std::optional), (override)); MOCK_METHOD(Status, WriteFile, (const std::string&, std::string_view), (override)); MOCK_METHOD(Status, DeleteFile, (const std::string&), (override)); + + private: + using FileMap = + std::unordered_map>>; + + class InMemoryInputStream : public SeekableInputStream { + public: + explicit InMemoryInputStream(std::shared_ptr> data) + : data_(std::move(data)) {} + + Result Position() const override { return position_; } + + Status Seek(int64_t position) override { + ICEBERG_PRECHECK(!closed_, "Input stream is closed"); + ICEBERG_PRECHECK(position >= 0, "Position must not be negative: {}", position); + position_ = position; + return {}; + } + + Result Read(std::span out) override { + ICEBERG_PRECHECK(!closed_, "Input stream is closed"); + auto file_size = static_cast(data_->size()); + ICEBERG_PRECHECK(position_ <= file_size, "Position {} exceeds file size {}", + position_, file_size); + auto bytes_to_read = + std::min(static_cast(out.size()), file_size - position_); + if (bytes_to_read > 0) { + std::memcpy(out.data(), data_->data() + static_cast(position_), + static_cast(bytes_to_read)); + position_ += bytes_to_read; + } + return bytes_to_read; + } + + Status ReadFully(int64_t position, std::span out) override { + ICEBERG_PRECHECK(!closed_, "Input stream is closed"); + ICEBERG_PRECHECK(position >= 0, "Position must not be negative: {}", position); + auto file_size = static_cast(data_->size()); + ICEBERG_PRECHECK(static_cast(out.size()) <= file_size - position, + "Read out of bounds: offset {} + length {} exceeds file size {}", + position, out.size(), file_size); + if (!out.empty()) { + std::memcpy(out.data(), data_->data() + static_cast(position), + out.size()); + } + return {}; + } + + Status Close() override { + closed_ = true; + return {}; + } + + private: + std::shared_ptr> data_; + int64_t 
position_ = 0; + bool closed_ = false; + }; + + class InMemoryOutputStream : public PositionOutputStream { + public: + explicit InMemoryOutputStream(std::shared_ptr> data) + : data_(std::move(data)) {} + + Result Position() const override { + return static_cast(data_->size()); + } + + Status Write(std::span data) override { + ICEBERG_PRECHECK(!closed_, "Output stream is closed"); + data_->insert(data_->end(), data.begin(), data.end()); + return {}; + } + + Status Flush() override { + ICEBERG_PRECHECK(!closed_, "Output stream is closed"); + return {}; + } + + Status Close() override { + closed_ = true; + return {}; + } + + private: + std::shared_ptr> data_; + bool closed_ = false; + }; + + class InMemoryInputFile : public InputFile { + public: + InMemoryInputFile(std::string location, std::shared_ptr> data, + std::optional length) + : location_(std::move(location)), data_(std::move(data)), length_(length) {} + + std::string_view location() const override { return location_; } + + Result Size() const override { + return length_.value_or(static_cast(data_->size())); + } + + Result> Open() override { + return std::make_unique(data_); + } + + private: + std::string location_; + std::shared_ptr> data_; + std::optional length_; + }; + + class InMemoryOutputFile : public OutputFile { + public: + InMemoryOutputFile(std::shared_ptr files, std::string location) + : files_(std::move(files)), location_(std::move(location)) {} + + std::string_view location() const override { return location_; } + + Result> Create() override { + if (files_->contains(location_)) { + return AlreadyExists("File already exists: {}", location_); + } + auto file = std::make_shared>(); + files_->emplace(location_, file); + return std::make_unique(std::move(file)); + } + + Result> CreateOrOverwrite() override { + auto file = std::make_shared>(); + files_->insert_or_assign(location_, file); + return std::make_unique(std::move(file)); + } + + private: + std::shared_ptr files_; + std::string location_; + }; 
+ + std::shared_ptr> FindFile( + const std::string& file_location) const { + auto file = files_->find(file_location); + if (file == files_->end()) { + return nullptr; + } + return file->second; + } + + std::shared_ptr> GetOrCreateFile( + const std::string& file_location) { + auto& file = (*files_)[file_location]; + if (!file) { + file = std::make_shared>(); + } + return file; + } + + std::shared_ptr files_ = std::make_shared(); }; } // namespace iceberg diff --git a/src/iceberg/test/puffin_reader_writer_test.cc b/src/iceberg/test/puffin_reader_writer_test.cc index c428f6747..8d610cee9 100644 --- a/src/iceberg/test/puffin_reader_writer_test.cc +++ b/src/iceberg/test/puffin_reader_writer_test.cc @@ -19,132 +19,60 @@ #include #include -#include #include +#include #include -#include +#include #include #include -#include "iceberg/file_io.h" #include "iceberg/puffin/file_metadata.h" #include "iceberg/puffin/puffin_reader.h" #include "iceberg/puffin/puffin_writer.h" #include "iceberg/test/matchers.h" +#include "iceberg/test/mock_io.h" namespace iceberg::puffin { namespace { -// Simple in-memory stream implementations for testing. 
+struct PuffinFile { + MockFileIO io; + std::string location = "memory://test.puffin"; -class MemoryOutputStream : public PositionOutputStream { - public: - explicit MemoryOutputStream(std::shared_ptr> buffer) - : buffer_(std::move(buffer)) {} - - Result Position() const override { - return static_cast(buffer_->size()); - } - - Status Write(std::span data) override { - buffer_->insert(buffer_->end(), data.begin(), data.end()); - return {}; - } - - Status Flush() override { return {}; } - Status Close() override { return {}; } - - private: - std::shared_ptr> buffer_; -}; - -class MemoryInputStream : public SeekableInputStream { - public: - explicit MemoryInputStream(std::shared_ptr> buffer) - : buffer_(std::move(buffer)) {} - - Result Position() const override { return position_; } - - Status Seek(int64_t position) override { - position_ = position; - return {}; - } - - Result Read(std::span out) override { - auto available = static_cast(buffer_->size()) - position_; - auto to_read = std::min(static_cast(out.size()), available); - if (to_read > 0) { - std::memcpy(out.data(), buffer_->data() + position_, to_read); - position_ += to_read; + std::unique_ptr Output() { + auto output_file = io.NewOutputFile(location); + EXPECT_THAT(output_file, IsOk()); + if (!output_file) { + return nullptr; } - return to_read; + return std::move(output_file.value()); } - Status ReadFully(int64_t position, std::span out) override { - if (position < 0 || position + static_cast(out.size()) > - static_cast(buffer_->size())) { - return Invalid("ReadFully out of bounds"); + std::unique_ptr Input() { + auto input_file = io.NewInputFile(location); + EXPECT_THAT(input_file, IsOk()); + if (!input_file) { + return nullptr; } - std::memcpy(out.data(), buffer_->data() + position, out.size()); - return {}; + return std::move(input_file.value()); } - Status Close() override { return {}; } - - private: - std::shared_ptr> buffer_; - int64_t position_ = 0; -}; - -class MemoryOutputFile : public 
OutputFile { - public: - explicit MemoryOutputFile(std::shared_ptr> buffer) - : buffer_(std::move(buffer)) {} - - std::string_view location() const override { return "memory://test.puffin"; } - - Result> Create() override { - return CreateOrOverwrite(); - } - - Result> CreateOrOverwrite() override { - buffer_->clear(); - return std::make_unique(buffer_); - } - - private: - std::shared_ptr> buffer_; -}; - -class MemoryInputFile : public InputFile { - public: - explicit MemoryInputFile(std::shared_ptr> buffer) - : buffer_(std::move(buffer)) {} - - std::string_view location() const override { return "memory://test.puffin"; } - - Result Size() const override { return static_cast(buffer_->size()); } - - Result> Open() override { - return std::make_unique(buffer_); + std::unique_ptr Input(int64_t file_size) { + auto input_file = io.NewInputFile(location, static_cast(file_size)); + EXPECT_THAT(input_file, IsOk()); + if (!input_file) { + return nullptr; + } + return std::move(input_file.value()); } - private: - std::shared_ptr> buffer_; -}; - -// Helper to create a shared buffer and output/input file pair. 
-struct MemoryFile { - std::shared_ptr> buffer = - std::make_shared>(); + std::vector& Data() { return io.FileData(location); } - std::unique_ptr output() { - return std::make_unique(buffer); - } + const std::vector& Data() const { return io.FileData(location); } - std::unique_ptr input() { return std::make_unique(buffer); } + void SetData(std::span bytes) { io.AddFile(location, bytes); } }; std::vector ToBytes(std::string_view str) { @@ -154,23 +82,17 @@ std::vector ToBytes(std::string_view str) { } // namespace -// ============================================================================ -// PuffinWriter Tests -// ============================================================================ - TEST(PuffinWriterTest, WriteEmptyFile) { - MemoryFile file; - ICEBERG_UNWRAP_OR_FAIL(auto writer, PuffinWriter::Make(file.output())); + PuffinFile file; + ICEBERG_UNWRAP_OR_FAIL(auto writer, PuffinWriter::Make(file.Output())); ASSERT_THAT(writer->Finish(), IsOk()); - auto& data = *file.buffer; + auto& data = file.Data(); EXPECT_GE(data.size(), 20u); - // Header magic EXPECT_EQ(data[0], std::byte{0x50}); EXPECT_EQ(data[1], std::byte{0x46}); EXPECT_EQ(data[2], std::byte{0x41}); EXPECT_EQ(data[3], std::byte{0x31}); - // Footer end magic auto sz = data.size(); EXPECT_EQ(data[sz - 4], std::byte{0x50}); EXPECT_EQ(data[sz - 3], std::byte{0x46}); @@ -178,43 +100,61 @@ TEST(PuffinWriterTest, WriteEmptyFile) { EXPECT_EQ(data[sz - 1], std::byte{0x31}); EXPECT_TRUE(writer->written_blobs_metadata().empty()); - ICEBERG_UNWRAP_OR_FAIL(auto fsize, writer->file_size()); + ICEBERG_UNWRAP_OR_FAIL(auto fsize, writer->FileSize()); EXPECT_EQ(fsize, static_cast(data.size())); } TEST(PuffinWriterTest, WriterRejectsAfterFinish) { - MemoryFile file; - ICEBERG_UNWRAP_OR_FAIL(auto writer, PuffinWriter::Make(file.output())); + PuffinFile file; + ICEBERG_UNWRAP_OR_FAIL(auto writer, PuffinWriter::Make(file.Output())); ASSERT_THAT(writer->Finish(), IsOk()); - // Double finish - 
EXPECT_THAT(writer->Finish(), IsError(ErrorKind::kInvalid)); + EXPECT_THAT(writer->Finish(), IsError(ErrorKind::kInvalidArgument)); - // Write after finish Blob blob{.type = "a", .snapshot_id = 1, .sequence_number = 0}; - EXPECT_THAT(writer->Write(blob), IsError(ErrorKind::kInvalid)); + EXPECT_THAT(writer->Write(blob), IsError(ErrorKind::kInvalidArgument)); } TEST(PuffinWriterTest, MakeRejectsNullOutput) { EXPECT_THAT(PuffinWriter::Make(nullptr), IsError(ErrorKind::kInvalidArgument)); } +TEST(PuffinWriterTest, MakeDoesNotOverwriteExistingFile) { + PuffinFile file; + file.Data().push_back(std::byte{0x42}); + + EXPECT_THAT(PuffinWriter::Make(file.Output()), IsError(ErrorKind::kAlreadyExists)); + ASSERT_EQ(file.Data().size(), 1); + EXPECT_EQ(file.Data().front(), std::byte{0x42}); +} + TEST(PuffinWriterTest, SizesBeforeFinishReturnError) { - MemoryFile file; - ICEBERG_UNWRAP_OR_FAIL(auto writer, PuffinWriter::Make(file.output())); - EXPECT_THAT(writer->footer_size(), IsError(ErrorKind::kInvalid)); - EXPECT_THAT(writer->file_size(), IsError(ErrorKind::kInvalid)); + PuffinFile file; + ICEBERG_UNWRAP_OR_FAIL(auto writer, PuffinWriter::Make(file.Output())); + EXPECT_THAT(writer->FooterSize(), IsError(ErrorKind::kInvalidArgument)); + EXPECT_THAT(writer->FileSize(), IsError(ErrorKind::kInvalidArgument)); } -// ============================================================================ -// Round-Trip Tests -// ============================================================================ +TEST(PuffinWriterTest, CloseImplicitlyFinishesFile) { + PuffinFile file; + ICEBERG_UNWRAP_OR_FAIL(auto writer, PuffinWriter::Make(file.Output())); + + ASSERT_THAT(writer->Close(), IsOk()); + EXPECT_THAT(writer->Close(), IsOk()); + + ICEBERG_UNWRAP_OR_FAIL(auto fsize, writer->FileSize()); + EXPECT_EQ(fsize, static_cast(file.Data().size())); + + ICEBERG_UNWRAP_OR_FAIL(auto reader, PuffinReader::Make(file.Input())); + ICEBERG_UNWRAP_OR_FAIL(auto fm, reader->ReadFileMetadata()); + 
EXPECT_TRUE(fm.blobs.empty()); +} TEST(PuffinRoundTripTest, SingleBlob) { - MemoryFile file; + PuffinFile file; { ICEBERG_UNWRAP_OR_FAIL(auto writer, - PuffinWriter::Make(file.output(), {{"created-by", "test"}})); + PuffinWriter::Make(file.Output(), {{"created-by", "test"}})); std::vector blob_data = {0x01, 0x02, 0x03, 0x04, 0x05}; ICEBERG_UNWRAP_OR_FAIL(auto meta, writer->Write(Blob{.type = "test-blob", .input_fields = {1, 2}, @@ -222,14 +162,14 @@ TEST(PuffinRoundTripTest, SingleBlob) { .sequence_number = 7, .data = blob_data})); EXPECT_EQ(meta.type, "test-blob"); - EXPECT_EQ(meta.offset, 4); // after header magic + EXPECT_EQ(meta.offset, 4); EXPECT_EQ(meta.length, 5); ASSERT_THAT(writer->Finish(), IsOk()); - ICEBERG_UNWRAP_OR_FAIL(auto fsize, writer->file_size()); + ICEBERG_UNWRAP_OR_FAIL(auto fsize, writer->FileSize()); EXPECT_GT(fsize, 0); } - ICEBERG_UNWRAP_OR_FAIL(auto reader, PuffinReader::Make(file.input())); + ICEBERG_UNWRAP_OR_FAIL(auto reader, PuffinReader::Make(file.Input())); ICEBERG_UNWRAP_OR_FAIL(auto fm, reader->ReadFileMetadata()); ASSERT_EQ(fm.blobs.size(), 1); EXPECT_EQ(fm.blobs[0].type, "test-blob"); @@ -242,9 +182,9 @@ TEST(PuffinRoundTripTest, SingleBlob) { } TEST(PuffinRoundTripTest, MultipleBlobs) { - MemoryFile file; + PuffinFile file; { - ICEBERG_UNWRAP_OR_FAIL(auto writer, PuffinWriter::Make(file.output())); + ICEBERG_UNWRAP_OR_FAIL(auto writer, PuffinWriter::Make(file.Output())); ICEBERG_UNWRAP_OR_FAIL(auto m1, writer->Write(Blob{.type = "first", .input_fields = {1}, .snapshot_id = 1, @@ -256,12 +196,12 @@ TEST(PuffinRoundTripTest, MultipleBlobs) { .sequence_number = 1, .data = {'d', 'e', 'f', 'g'}, .properties = {{"key", "val"}}})); - EXPECT_EQ(m2.offset, 7); // header(4) + first blob(3) + EXPECT_EQ(m2.offset, 7); EXPECT_EQ(m2.length, 4); ASSERT_THAT(writer->Finish(), IsOk()); } - ICEBERG_UNWRAP_OR_FAIL(auto reader, PuffinReader::Make(file.input())); + ICEBERG_UNWRAP_OR_FAIL(auto reader, PuffinReader::Make(file.Input())); 
ICEBERG_UNWRAP_OR_FAIL(auto fm, reader->ReadFileMetadata()); ASSERT_EQ(fm.blobs.size(), 2); EXPECT_TRUE(fm.blobs[0].properties.empty()); @@ -274,11 +214,11 @@ TEST(PuffinRoundTripTest, MultipleBlobs) { } TEST(PuffinRoundTripTest, WithProperties) { - MemoryFile file; + PuffinFile file; { ICEBERG_UNWRAP_OR_FAIL( auto writer, - PuffinWriter::Make(file.output(), {{"created-by", "iceberg-cpp-test"}})); + PuffinWriter::Make(file.Output(), {{"created-by", "iceberg-cpp-test"}})); std::string text = "hello puffin"; std::vector blob_data(text.begin(), text.end()); ASSERT_THAT(writer->Write(Blob{.type = "text-blob", @@ -291,7 +231,7 @@ TEST(PuffinRoundTripTest, WithProperties) { ASSERT_THAT(writer->Finish(), IsOk()); } - ICEBERG_UNWRAP_OR_FAIL(auto reader, PuffinReader::Make(file.input())); + ICEBERG_UNWRAP_OR_FAIL(auto reader, PuffinReader::Make(file.Input())); ICEBERG_UNWRAP_OR_FAIL(auto fm, reader->ReadFileMetadata()); EXPECT_EQ(fm.properties.at("created-by"), "iceberg-cpp-test"); ASSERT_EQ(fm.blobs.size(), 1); @@ -301,65 +241,59 @@ TEST(PuffinRoundTripTest, WithProperties) { EXPECT_EQ(blob_result.second, ToBytes("hello puffin")); } -// ============================================================================ -// PuffinReader Error Tests -// ============================================================================ - TEST(PuffinReaderTest, MakeRejectsNullInput) { EXPECT_THAT(PuffinReader::Make(nullptr), IsError(ErrorKind::kInvalidArgument)); } -TEST(PuffinReaderTest, InvalidMagic) { - auto buffer = std::make_shared>(16, std::byte{0x00}); - auto input = std::make_unique(buffer); - ICEBERG_UNWRAP_OR_FAIL(auto reader, PuffinReader::Make(std::move(input))); - EXPECT_THAT(reader->ReadFileMetadata(), IsError(ErrorKind::kInvalid)); -} +TEST(PuffinReaderTest, MakeUsesKnownFileSizeAndFooterSize) { + PuffinFile file; + int64_t footer_size = 0; + int64_t file_size = 0; + { + ICEBERG_UNWRAP_OR_FAIL(auto writer, PuffinWriter::Make(file.Output())); + ASSERT_THAT(writer->Finish(), 
IsOk()); + ICEBERG_UNWRAP_OR_FAIL(footer_size, writer->FooterSize()); + ICEBERG_UNWRAP_OR_FAIL(file_size, writer->FileSize()); + } -TEST(PuffinReaderTest, TruncatedFile) { - auto buffer = std::make_shared>(2, std::byte{0x50}); - auto input = std::make_unique(buffer); - ICEBERG_UNWRAP_OR_FAIL(auto reader, PuffinReader::Make(std::move(input))); - EXPECT_THAT(reader->ReadFileMetadata(), IsError(ErrorKind::kInvalid)); + ICEBERG_UNWRAP_OR_FAIL( + auto reader, PuffinReader::Make(file.Input(file_size), footer_size, file_size)); + ICEBERG_UNWRAP_OR_FAIL(auto fm, reader->ReadFileMetadata()); + EXPECT_TRUE(fm.blobs.empty()); } -TEST(PuffinReaderTest, InvalidBlobOffset) { - MemoryFile file; +TEST(PuffinReaderTest, KnownFooterSizeMismatchIsRejected) { + PuffinFile file; + int64_t footer_size = 0; + int64_t file_size = 0; { - ICEBERG_UNWRAP_OR_FAIL(auto writer, PuffinWriter::Make(file.output())); + ICEBERG_UNWRAP_OR_FAIL(auto writer, PuffinWriter::Make(file.Output())); ASSERT_THAT(writer->Finish(), IsOk()); + ICEBERG_UNWRAP_OR_FAIL(footer_size, writer->FooterSize()); + ICEBERG_UNWRAP_OR_FAIL(file_size, writer->FileSize()); } - ICEBERG_UNWRAP_OR_FAIL(auto reader, PuffinReader::Make(file.input())); - BlobMetadata bad_meta{.type = "bad", - .snapshot_id = 1, - .sequence_number = 0, - .offset = 9999, - .length = 100}; - EXPECT_THAT(reader->ReadBlob(bad_meta), IsError(ErrorKind::kInvalid)); + ICEBERG_UNWRAP_OR_FAIL(auto reader, + PuffinReader::Make(file.Input(), footer_size - 1, file_size)); + EXPECT_THAT(reader->ReadFileMetadata(), IsError(ErrorKind::kInvalidArgument)); } TEST(PuffinReaderTest, UnknownFlagsRejected) { - // Build a valid puffin file then tamper with flags - MemoryFile file; + PuffinFile file; { - ICEBERG_UNWRAP_OR_FAIL(auto writer, PuffinWriter::Make(file.output())); + ICEBERG_UNWRAP_OR_FAIL(auto writer, PuffinWriter::Make(file.Output())); ASSERT_THAT(writer->Finish(), IsOk()); } - // Set unknown flag bit in the footer struct (flags are at offset -8 from end) - auto& 
data = *file.buffer; - data[data.size() - 8] = std::byte{0x02}; // bit 1 is unknown + auto& data = file.Data(); + data[data.size() - 8] = std::byte{0x02}; - ICEBERG_UNWRAP_OR_FAIL(auto reader, PuffinReader::Make(file.input())); - EXPECT_THAT(reader->ReadFileMetadata(), IsError(ErrorKind::kInvalid)); + ICEBERG_UNWRAP_OR_FAIL(auto reader, PuffinReader::Make(file.Input())); + EXPECT_THAT(reader->ReadFileMetadata(), IsError(ErrorKind::kInvalidArgument)); } -// ============================================================================ -// Java Binary Compatibility Tests -// ============================================================================ - -TEST(PuffinReaderTest, JavaEmptyPuffinCompatibility) { - auto buffer = std::make_shared>(std::vector{ +TEST(PuffinReaderTest, EmptyPuffinCompatibility) { + PuffinFile file; + std::vector data{ std::byte{0x50}, std::byte{0x46}, std::byte{0x41}, std::byte{0x31}, std::byte{0x50}, std::byte{0x46}, std::byte{0x41}, std::byte{0x31}, std::byte{0x7b}, std::byte{0x22}, std::byte{0x62}, std::byte{0x6c}, std::byte{0x6f}, std::byte{0x62}, std::byte{0x73}, @@ -367,10 +301,10 @@ TEST(PuffinReaderTest, JavaEmptyPuffinCompatibility) { std::byte{0x0c}, std::byte{0x00}, std::byte{0x00}, std::byte{0x00}, std::byte{0x00}, std::byte{0x00}, std::byte{0x00}, std::byte{0x00}, std::byte{0x50}, std::byte{0x46}, std::byte{0x41}, std::byte{0x31}, - }); + }; + file.SetData(data); - auto input = std::make_unique(buffer); - ICEBERG_UNWRAP_OR_FAIL(auto reader, PuffinReader::Make(std::move(input))); + ICEBERG_UNWRAP_OR_FAIL(auto reader, PuffinReader::Make(file.Input())); ICEBERG_UNWRAP_OR_FAIL(auto fm, reader->ReadFileMetadata()); EXPECT_TRUE(fm.blobs.empty()); EXPECT_TRUE(fm.properties.empty()); diff --git a/src/iceberg/type_fwd.h b/src/iceberg/type_fwd.h index 144a9e33a..064ec285a 100644 --- a/src/iceberg/type_fwd.h +++ b/src/iceberg/type_fwd.h @@ -188,6 +188,11 @@ class FileIO; class Reader; class Writer; +class InputFile; +class OutputFile; +class 
PositionOutputStream; +class SeekableInputStream; + /// \brief Row-based data structures. class ArrayLike; class MapLike;