diff --git a/src/iceberg/CMakeLists.txt b/src/iceberg/CMakeLists.txt index 7be689f8d..18cf70bdb 100644 --- a/src/iceberg/CMakeLists.txt +++ b/src/iceberg/CMakeLists.txt @@ -175,7 +175,9 @@ set(ICEBERG_DATA_SOURCES deletes/roaring_position_bitmap.cc puffin/file_metadata.cc puffin/json_serde.cc - puffin/puffin_format.cc) + puffin/puffin_format.cc + puffin/puffin_reader.cc + puffin/puffin_writer.cc) set(ICEBERG_DATA_STATIC_BUILD_INTERFACE_LIBS) set(ICEBERG_DATA_SHARED_BUILD_INTERFACE_LIBS) diff --git a/src/iceberg/meson.build b/src/iceberg/meson.build index 48b5d4250..483d8f796 100644 --- a/src/iceberg/meson.build +++ b/src/iceberg/meson.build @@ -157,6 +157,8 @@ iceberg_data_sources = files( 'puffin/file_metadata.cc', 'puffin/json_serde.cc', 'puffin/puffin_format.cc', + 'puffin/puffin_reader.cc', + 'puffin/puffin_writer.cc', ) # CRoaring does not export symbols, so on Windows it must diff --git a/src/iceberg/puffin/meson.build b/src/iceberg/puffin/meson.build index 7869d7b2c..7f30468db 100644 --- a/src/iceberg/puffin/meson.build +++ b/src/iceberg/puffin/meson.build @@ -16,6 +16,12 @@ # under the License. install_headers( - ['file_metadata.h', 'puffin_format.h', 'type_fwd.h'], + [ + 'file_metadata.h', + 'puffin_format.h', + 'puffin_reader.h', + 'puffin_writer.h', + 'type_fwd.h', + ], subdir: 'iceberg/puffin', ) diff --git a/src/iceberg/puffin/puffin_format.cc b/src/iceberg/puffin/puffin_format.cc index 88807d0ca..88d378f04 100644 --- a/src/iceberg/puffin/puffin_format.cc +++ b/src/iceberg/puffin/puffin_format.cc @@ -36,6 +36,18 @@ constexpr std::pair GetFlagPosition(PuffinFlag flag) { std::unreachable(); } +} // namespace + +bool IsFlagSet(std::span flags, PuffinFlag flag) { + auto [byte_num, bit_num] = GetFlagPosition(flag); + return (flags[byte_num] & (1 << bit_num)) != 0; +} + +void SetFlag(std::span flags, PuffinFlag flag) { + auto [byte_num, bit_num] = GetFlagPosition(flag); + flags[byte_num] |= (1 << bit_num); +} + // TODO(zhaoxuan1994): Move compression logic to a unified codec interface. Result> Compress(PuffinCompressionCodec codec, std::span input) { @@ -63,16 +75,4 @@ Result> Decompress(PuffinCompressionCodec codec, std::unreachable(); } -} // namespace - -bool IsFlagSet(std::span flags, PuffinFlag flag) { - auto [byte_num, bit_num] = GetFlagPosition(flag); - return (flags[byte_num] & (1 << bit_num)) != 0; -} - -void SetFlag(std::span flags, PuffinFlag flag) { - auto [byte_num, bit_num] = GetFlagPosition(flag); - flags[byte_num] |= (1 << bit_num); -} - } // namespace iceberg::puffin diff --git a/src/iceberg/puffin/puffin_format.h b/src/iceberg/puffin/puffin_format.h index e5ecf9003..b3b5f10de 100644 --- a/src/iceberg/puffin/puffin_format.h +++ b/src/iceberg/puffin/puffin_format.h @@ -23,8 +23,10 @@ /// Puffin file format constants and utilities. #include +#include #include #include +#include #include "iceberg/iceberg_data_export.h" #include "iceberg/puffin/file_metadata.h" @@ -66,4 +68,12 @@ ICEBERG_DATA_EXPORT bool IsFlagSet(std::span flags, PuffinFlag /// \brief Set a flag in the flags bytes. ICEBERG_DATA_EXPORT void SetFlag(std::span flags, PuffinFlag flag); +/// \brief Compress data using the specified codec. +ICEBERG_DATA_EXPORT Result> Compress( + PuffinCompressionCodec codec, std::span input); + +/// \brief Decompress data using the specified codec. +ICEBERG_DATA_EXPORT Result> Decompress( + PuffinCompressionCodec codec, std::span input); + } // namespace iceberg::puffin diff --git a/src/iceberg/puffin/puffin_reader.cc b/src/iceberg/puffin/puffin_reader.cc new file mode 100644 index 000000000..47ac58f13 --- /dev/null +++ b/src/iceberg/puffin/puffin_reader.cc @@ -0,0 +1,271 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "iceberg/puffin/puffin_reader.h" + +#include +#include +#include +#include +#include +#include +#include + +#include "iceberg/file_io.h" +#include "iceberg/puffin/json_serde_internal.h" +#include "iceberg/puffin/puffin_format.h" +#include "iceberg/util/endian.h" +#include "iceberg/util/macros.h" + +namespace iceberg::puffin { + +namespace { + +struct FooterInfo { + int32_t payload_size; + PuffinCompressionCodec compression; +}; + +Status CheckMagic(std::span data, int64_t offset = 0) { + ICEBERG_PRECHECK(offset >= 0, "Invalid file: magic offset {} is negative", offset); + auto offset_size = static_cast(offset); + ICEBERG_PRECHECK(offset_size <= data.size() && + data.size() - offset_size >= PuffinFormat::kMagicLength, + "Invalid file: buffer too small for magic at offset {}", offset); + auto* begin = reinterpret_cast(data.data() + offset_size); + ICEBERG_PRECHECK( + std::equal(PuffinFormat::kMagicV1.cbegin(), PuffinFormat::kMagicV1.cend(), begin), + "Invalid file: expected magic at offset {}, got [{:#04x}, {:#04x}, {:#04x}, " + "{:#04x}]", + offset, begin[0], begin[1], begin[2], begin[3]); + return {}; +} + +Status CheckUnknownFlags(std::span flags) { + constexpr uint8_t kKnownBitsMask = 0x01; + ICEBERG_PRECHECK( + (flags[0] & ~kKnownBitsMask) == 0 && flags[1] == 0 && flags[2] == 0 && + flags[3] == 0, + "Invalid file: unknown footer flags set [{:#04x}, {:#04x}, {:#04x}, {:#04x}]", + flags[0], flags[1], flags[2], flags[3]); + return {}; +} + +Result FooterPayloadSize(std::span footer_struct) { + ICEBERG_PRECHECK(footer_struct.size() >= PuffinFormat::kFooterStructLength, + "Invalid file: footer struct is too small"); + auto payload_size = ReadLittleEndian( + footer_struct.data() + PuffinFormat::kFooterStructPayloadSizeOffset); + ICEBERG_PRECHECK(payload_size >= 0, "Invalid file: negative payload size {}", + payload_size); + return payload_size; +} + +Result> DecodeFlags(std::span footer_struct) { + ICEBERG_PRECHECK(footer_struct.size() >= PuffinFormat::kFooterStructLength, + "Invalid file: footer struct is too small"); + std::array flags{}; + std::memcpy(flags.data(), footer_struct.data() + PuffinFormat::kFooterStructFlagsOffset, + flags.size()); + ICEBERG_RETURN_UNEXPECTED(CheckUnknownFlags(flags)); + return flags; +} + +PuffinCompressionCodec FooterCompressionCodec(std::span flags) { + if (IsFlagSet(flags, PuffinFlag::kFooterPayloadCompressed)) { + return PuffinFormat::kDefaultFooterCompressionCodec; + } + return PuffinCompressionCodec::kNone; +} + +Status CheckFooterSize(int64_t footer_size, int32_t payload_size) { + auto expected_footer_size = PuffinFormat::kFooterStartMagicLength + + static_cast(payload_size) + + PuffinFormat::kFooterStructLength; + ICEBERG_PRECHECK(footer_size == expected_footer_size, + "Invalid file: footer size {} does not match payload size {}", + footer_size, payload_size); + return {}; +} + +Result DecodeFooterInfo(std::span footer, + int64_t footer_size) { + ICEBERG_PRECHECK(footer_size >= PuffinFormat::kFooterStartMagicLength + + PuffinFormat::kFooterStructLength, + "Invalid file: footer size {} is too small", footer_size); + ICEBERG_PRECHECK(static_cast(footer_size) <= footer.size(), + "Invalid file: footer size {} exceeds buffer size {}", footer_size, + footer.size()); + + ICEBERG_RETURN_UNEXPECTED(CheckMagic(footer, PuffinFormat::kFooterStartMagicOffset)); + + auto footer_struct_offset = footer_size - PuffinFormat::kFooterStructLength; + std::span footer_struct(footer.data() + footer_struct_offset, + PuffinFormat::kFooterStructLength); + ICEBERG_RETURN_UNEXPECTED( + CheckMagic(footer_struct, PuffinFormat::kFooterStructMagicOffset)); + + ICEBERG_ASSIGN_OR_RAISE(auto payload_size, FooterPayloadSize(footer_struct)); + ICEBERG_RETURN_UNEXPECTED(CheckFooterSize(footer_size, payload_size)); + + ICEBERG_ASSIGN_OR_RAISE(auto flags, DecodeFlags(footer_struct)); + return FooterInfo{.payload_size = payload_size, + .compression = FooterCompressionCodec(flags)}; +} + +Result ParseFileMetadata(std::span payload, + PuffinCompressionCodec compression) { + std::vector decompressed; + if (compression != PuffinCompressionCodec::kNone) { + ICEBERG_ASSIGN_OR_RAISE(decompressed, Decompress(compression, payload)); + payload = decompressed; + } + + return FileMetadataFromJsonString( + std::string_view(reinterpret_cast(payload.data()), payload.size())); +} + +} // namespace + +PuffinReader::PuffinReader(std::unique_ptr stream, int64_t file_size, + std::optional known_footer_size) + : stream_(std::move(stream)), + file_size_(file_size), + known_footer_size_(known_footer_size) {} + +PuffinReader::~PuffinReader() = default; + +Result> PuffinReader::Make( + std::unique_ptr input_file, std::optional footer_size, + std::optional file_size) { + ICEBERG_PRECHECK(input_file, "Input file must not be null"); + int64_t resolved_file_size = 0; + if (file_size.has_value()) { + ICEBERG_PRECHECK(*file_size >= 0, "File size must not be negative: {}", *file_size); + resolved_file_size = *file_size; + } else { + ICEBERG_ASSIGN_OR_RAISE(resolved_file_size, input_file->Size()); + } + if (footer_size.has_value()) { + ICEBERG_PRECHECK(*footer_size > 0, "Footer size must be positive: {}", *footer_size); + ICEBERG_PRECHECK(*footer_size <= resolved_file_size - PuffinFormat::kMagicLength, + "Footer size {} exceeds file size {}", *footer_size, + resolved_file_size); + ICEBERG_PRECHECK(*footer_size <= std::numeric_limits::max(), + "Footer size {} is too large", *footer_size); + } + ICEBERG_ASSIGN_OR_RAISE(auto stream, input_file->Open()); + return std::unique_ptr( + new PuffinReader(std::move(stream), resolved_file_size, footer_size)); +} + +Result> PuffinReader::ReadBytes(int64_t offset, int64_t length) { + ICEBERG_PRECHECK(!closed_, "Reader already closed"); + ICEBERG_PRECHECK(offset >= 0, "Offset must not be negative: {}", offset); + ICEBERG_PRECHECK(length >= 0, "Length must not be negative: {}", length); + ICEBERG_PRECHECK(offset <= file_size_, "Offset {} exceeds file size {}", offset, + file_size_); + ICEBERG_PRECHECK(length <= file_size_ - offset, + "Length {} exceeds file size {} at offset {}", length, file_size_, + offset); + std::vector buf(length); + ICEBERG_RETURN_UNEXPECTED(stream_->ReadFully(offset, buf)); + return buf; +} + +Result PuffinReader::FooterSize() { + if (known_footer_size_.has_value()) { + return *known_footer_size_; + } + + ICEBERG_ASSIGN_OR_RAISE(auto footer_struct, + ReadBytes(file_size_ - PuffinFormat::kFooterStructLength, + PuffinFormat::kFooterStructLength)); + ICEBERG_RETURN_UNEXPECTED( + CheckMagic(footer_struct, PuffinFormat::kFooterStructMagicOffset)); + + ICEBERG_ASSIGN_OR_RAISE(auto payload_size, FooterPayloadSize(footer_struct)); + known_footer_size_ = PuffinFormat::kFooterStartMagicLength + + static_cast(payload_size) + + PuffinFormat::kFooterStructLength; + return *known_footer_size_; +} + +Result> PuffinReader::ReadFooter(int64_t footer_size) { + return ReadBytes(file_size_ - footer_size, footer_size); +} + +Result PuffinReader::ReadFileMetadata() { + ICEBERG_ASSIGN_OR_RAISE(auto header_bytes, ReadBytes(0, PuffinFormat::kMagicLength)); + ICEBERG_RETURN_UNEXPECTED(CheckMagic(header_bytes)); + + ICEBERG_ASSIGN_OR_RAISE(auto footer_size, FooterSize()); + ICEBERG_ASSIGN_OR_RAISE(auto footer, ReadFooter(footer_size)); + ICEBERG_ASSIGN_OR_RAISE(auto footer_info, DecodeFooterInfo(footer, footer_size)); + std::span payload_bytes( + footer.data() + PuffinFormat::kFooterStartMagicLength, footer_info.payload_size); + return ParseFileMetadata(payload_bytes, footer_info.compression); +} + +Result>> PuffinReader::ReadBlob( + const BlobMetadata& blob_metadata) { + ICEBERG_ASSIGN_OR_RAISE(auto raw_data, + ReadBytes(blob_metadata.offset, blob_metadata.length)); + + ICEBERG_ASSIGN_OR_RAISE( + auto codec, PuffinCompressionCodecFromName(blob_metadata.compression_codec)); + if (codec == PuffinCompressionCodec::kNone) { + return std::pair{blob_metadata, std::move(raw_data)}; + } + + ICEBERG_ASSIGN_OR_RAISE(auto decompressed, Decompress(codec, raw_data)); + + return std::pair{blob_metadata, std::move(decompressed)}; +} + +Result>>> +PuffinReader::ReadAll(const std::vector& blobs) { + // Sort by offset for sequential I/O access pattern + std::vector sorted; + sorted.reserve(blobs.size()); + for (const auto& blob : blobs) { + sorted.push_back(&blob); + } + std::ranges::sort(sorted, + [](const auto* a, const auto* b) { return a->offset < b->offset; }); + + std::vector>> results; + results.reserve(blobs.size()); + for (const auto* blob : sorted) { + ICEBERG_ASSIGN_OR_RAISE(auto blob_pair, ReadBlob(*blob)); + results.push_back(std::move(blob_pair)); + } + return results; +} + +Status PuffinReader::Close() { + if (closed_) { + return {}; + } + ICEBERG_RETURN_UNEXPECTED(stream_->Close()); + closed_ = true; + return {}; +} + +} // namespace iceberg::puffin diff --git a/src/iceberg/puffin/puffin_reader.h b/src/iceberg/puffin/puffin_reader.h new file mode 100644 index 000000000..3b805426b --- /dev/null +++ b/src/iceberg/puffin/puffin_reader.h @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +/// \file iceberg/puffin/puffin_reader.h +/// Puffin file reader. + +#include +#include +#include +#include +#include +#include + +#include "iceberg/iceberg_data_export.h" +#include "iceberg/puffin/file_metadata.h" +#include "iceberg/result.h" +#include "iceberg/type_fwd.h" + +namespace iceberg::puffin { + +/// \brief Reader for Puffin files. +/// +/// Reads from an InputFile with seek support for efficient blob access. +class ICEBERG_DATA_EXPORT PuffinReader { + public: + /// \brief Create a PuffinReader for the given input file. + /// \param input_file The input file to read from. + /// \param footer_size Optional known footer size hint to avoid an extra seek. + /// \param file_size Optional known file size hint to avoid fetching size. + static Result> Make( + std::unique_ptr input_file, + std::optional footer_size = std::nullopt, + std::optional file_size = std::nullopt); + + ~PuffinReader(); + + /// \brief Read and return the file metadata from the footer. + Result ReadFileMetadata(); + + /// \brief Read a specific blob's data by its metadata. + /// \param blob_metadata The metadata describing the blob to read. + /// \return A pair of (BlobMetadata, decompressed data), or an error. + Result>> ReadBlob( + const BlobMetadata& blob_metadata); + + /// \brief Read all blobs described in the file metadata. + /// \return A vector of (BlobMetadata, decompressed data) pairs, or an error. + Result>>> ReadAll( + const std::vector& blobs); + + /// \brief Close the underlying input stream. + Status Close(); + + private: + PuffinReader(std::unique_ptr stream, int64_t file_size, + std::optional known_footer_size); + + Result> ReadBytes(int64_t offset, int64_t length); + Result FooterSize(); + Result> ReadFooter(int64_t footer_size); + + /// Opened input stream. + std::unique_ptr stream_; + /// Total file size. + int64_t file_size_; + /// Known footer size hint (avoids one seek if provided). + std::optional known_footer_size_; + /// Whether the reader has been closed. + bool closed_ = false; +}; + +} // namespace iceberg::puffin diff --git a/src/iceberg/puffin/puffin_writer.cc b/src/iceberg/puffin/puffin_writer.cc new file mode 100644 index 000000000..db749117f --- /dev/null +++ b/src/iceberg/puffin/puffin_writer.cc @@ -0,0 +1,184 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "iceberg/puffin/puffin_writer.h" + +#include +#include + +#include "iceberg/file_io.h" +#include "iceberg/puffin/json_serde_internal.h" +#include "iceberg/puffin/puffin_format.h" +#include "iceberg/util/endian.h" +#include "iceberg/util/macros.h" + +namespace iceberg::puffin { + +PuffinWriter::PuffinWriter(std::unique_ptr stream, + std::unordered_map properties, + PuffinCompressionCodec default_codec, bool compress_footer) + : stream_(std::move(stream)), + properties_(std::move(properties)), + default_codec_(default_codec), + compress_footer_(compress_footer) {} + +PuffinWriter::~PuffinWriter() = default; + +Result> PuffinWriter::Make( + std::unique_ptr output_file, + std::unordered_map properties, + PuffinCompressionCodec default_codec, bool compress_footer) { + ICEBERG_PRECHECK(output_file, "Output file must not be null"); + ICEBERG_ASSIGN_OR_RAISE(auto stream, output_file->Create()); + return std::unique_ptr(new PuffinWriter( + std::move(stream), std::move(properties), default_codec, compress_footer)); +} + +Status PuffinWriter::WriteBytes(std::span data) { + return stream_->Write(data); +} + +Status PuffinWriter::WriteMagic() { + const auto& magic = PuffinFormat::kMagicV1; + return WriteBytes(std::span( + reinterpret_cast(magic.data()), magic.size())); +} + +Status PuffinWriter::WriteHeader() { + if (header_written_) return {}; + ICEBERG_RETURN_UNEXPECTED(WriteMagic()); + header_written_ = true; + return {}; +} + +Result PuffinWriter::Write(const Blob& blob) { + ICEBERG_PRECHECK(!finished_ && !footer_written_, "Writer already finished"); + ICEBERG_RETURN_UNEXPECTED(WriteHeader()); + + auto codec = blob.requested_compression.value_or(default_codec_); + std::span input_span( + reinterpret_cast(blob.data.data()), blob.data.size()); + std::vector compressed; + auto output_span = input_span; + if (codec != PuffinCompressionCodec::kNone) { + ICEBERG_ASSIGN_OR_RAISE(compressed, Compress(codec, input_span)); + output_span = std::span(compressed.data(), compressed.size()); + } + + ICEBERG_ASSIGN_OR_RAISE(auto offset, stream_->Position()); + ICEBERG_RETURN_UNEXPECTED(WriteBytes(output_span)); + auto length = static_cast(output_span.size()); + + auto codec_name = CodecName(codec); + BlobMetadata metadata{ + .type = blob.type, + .input_fields = blob.input_fields, + .snapshot_id = blob.snapshot_id, + .sequence_number = blob.sequence_number, + .offset = offset, + .length = length, + .compression_codec = std::string(codec_name), + .properties = blob.properties, + }; + written_blobs_metadata_.push_back(metadata); + return metadata; +} + +Status PuffinWriter::Finish() { + ICEBERG_PRECHECK(!finished_, "Writer already finished"); + ICEBERG_PRECHECK(!footer_written_, "Footer already written"); + + ICEBERG_RETURN_UNEXPECTED(WriteHeader()); + + FileMetadata file_metadata{ + .blobs = written_blobs_metadata_, + .properties = properties_, + }; + + auto footer_json = ToJsonString(file_metadata); + std::span footer_payload( + reinterpret_cast(footer_json.data()), footer_json.size()); + std::vector compressed_footer_payload; + + // Compress footer if requested + std::array flags{}; + if (compress_footer_) { + ICEBERG_ASSIGN_OR_RAISE( + compressed_footer_payload, + Compress(PuffinFormat::kDefaultFooterCompressionCodec, footer_payload)); + footer_payload = std::span(compressed_footer_payload.data(), + compressed_footer_payload.size()); + SetFlag(flags, PuffinFlag::kFooterPayloadCompressed); + } + ICEBERG_CHECK( + footer_payload.size() <= static_cast(std::numeric_limits::max()), + "Footer payload is too large: {}", footer_payload.size()); + auto payload_size = static_cast(footer_payload.size()); + + // Footer start magic + ICEBERG_ASSIGN_OR_RAISE(auto footer_start, stream_->Position()); + ICEBERG_RETURN_UNEXPECTED(WriteMagic()); + + // Footer payload + ICEBERG_RETURN_UNEXPECTED(WriteBytes(footer_payload)); + + // Footer struct: payload_size (4) + flags (4) + magic (4) + std::array size_buf{}; + WriteLittleEndian(payload_size, size_buf.data()); + ICEBERG_RETURN_UNEXPECTED(WriteBytes(size_buf)); + + // Flags + ICEBERG_RETURN_UNEXPECTED(WriteBytes(std::span( + reinterpret_cast(flags.data()), flags.size()))); + + // Footer end magic + ICEBERG_RETURN_UNEXPECTED(WriteMagic()); + + ICEBERG_ASSIGN_OR_RAISE(auto end_pos, stream_->Position()); + footer_size_ = end_pos - footer_start; + footer_written_ = true; + ICEBERG_RETURN_UNEXPECTED(stream_->Flush()); + ICEBERG_RETURN_UNEXPECTED(stream_->Close()); + ICEBERG_ASSIGN_OR_RAISE(file_size_, stream_->Position()); + finished_ = true; + return {}; +} + +Status PuffinWriter::Close() { + if (finished_) { + return {}; + } + return Finish(); +} + +const std::vector& PuffinWriter::written_blobs_metadata() const { + return written_blobs_metadata_; +} + +Result PuffinWriter::FooterSize() const { + ICEBERG_PRECHECK(footer_written_, "Footer not written yet"); + return footer_size_; +} + +Result PuffinWriter::FileSize() const { + ICEBERG_PRECHECK(finished_, "Writer not finished yet"); + return file_size_; +} + +} // namespace iceberg::puffin diff --git a/src/iceberg/puffin/puffin_writer.h b/src/iceberg/puffin/puffin_writer.h new file mode 100644 index 000000000..2a7984091 --- /dev/null +++ b/src/iceberg/puffin/puffin_writer.h @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +/// \file iceberg/puffin/puffin_writer.h +/// Puffin file writer. + +#include +#include +#include +#include +#include +#include +#include + +#include "iceberg/iceberg_data_export.h" +#include "iceberg/puffin/file_metadata.h" +#include "iceberg/result.h" +#include "iceberg/type_fwd.h" + +namespace iceberg::puffin { + +/// \brief Writer for Puffin files. +/// +/// Writes blobs and footer to an OutputFile stream. +class ICEBERG_DATA_EXPORT PuffinWriter { + public: + /// \brief Create a PuffinWriter for the given output file. + /// \param output_file The output file to write to. + /// \param properties File-level properties to include in the footer. + /// \param default_codec Default compression codec for blobs. + /// \param compress_footer Whether to compress the footer payload. + static Result> Make( + std::unique_ptr output_file, + std::unordered_map properties = {}, + PuffinCompressionCodec default_codec = PuffinCompressionCodec::kNone, + bool compress_footer = false); + + ~PuffinWriter(); + + /// \brief Write a blob and return its metadata. + Result Write(const Blob& blob); + + /// \brief Finalize the file by writing the footer and closing the stream. + Status Finish(); + + /// \brief Close the writer, finalizing the file if needed. + Status Close(); + + /// \brief Get metadata for all blobs written so far. + const std::vector& written_blobs_metadata() const; + + /// \brief Get the footer size. Returns error if the footer has not been written. + Result FooterSize() const; + + /// \brief Get the total file size. Returns error if Finish() has not succeeded. + Result FileSize() const; + + private: + PuffinWriter(std::unique_ptr stream, + std::unordered_map properties, + PuffinCompressionCodec default_codec, bool compress_footer); + + Status WriteBytes(std::span data); + Status WriteHeader(); + Status WriteMagic(); + + /// Output stream. + std::unique_ptr stream_; + /// File-level properties to include in the footer. + std::unordered_map properties_; + /// Default compression codec for blobs without explicit compression. + const PuffinCompressionCodec default_codec_; + /// Whether to compress the footer payload. + const bool compress_footer_; + /// Metadata for all blobs written so far. + std::vector written_blobs_metadata_; + /// Whether the header magic has been written. + bool header_written_ = false; + /// Whether the footer has been written. + bool footer_written_ = false; + /// Whether Finish() has succeeded. + bool finished_ = false; + /// Footer size, set after the footer is written. + int64_t footer_size_ = -1; + /// Total file size, set after Finish(). + int64_t file_size_ = -1; +}; + +} // namespace iceberg::puffin diff --git a/src/iceberg/test/CMakeLists.txt b/src/iceberg/test/CMakeLists.txt index d9059c567..997d18354 100644 --- a/src/iceberg/test/CMakeLists.txt +++ b/src/iceberg/test/CMakeLists.txt @@ -146,7 +146,8 @@ add_iceberg_test(puffin_test USE_DATA SOURCES puffin_format_test.cc - puffin_json_test.cc) + puffin_json_test.cc + puffin_reader_writer_test.cc) if(ICEBERG_BUILD_BUNDLE) add_iceberg_test(avro_test diff --git a/src/iceberg/test/meson.build b/src/iceberg/test/meson.build index e7f6165c9..b21a264b1 100644 --- a/src/iceberg/test/meson.build +++ b/src/iceberg/test/meson.build @@ -109,7 +109,11 @@ iceberg_tests = { 'use_data': true, }, 'puffin_test': { - 'sources': files('puffin_format_test.cc', 'puffin_json_test.cc'), + 'sources': files( + 'puffin_format_test.cc', + 'puffin_json_test.cc', + 'puffin_reader_writer_test.cc', + ), 'use_data': true, }, } diff --git a/src/iceberg/test/mock_io.h b/src/iceberg/test/mock_io.h index c9f38e505..f643a852b 100644 --- a/src/iceberg/test/mock_io.h +++ b/src/iceberg/test/mock_io.h @@ -19,10 +19,25 @@ #pragma once +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + #include #include #include "iceberg/file_io.h" +#include "iceberg/util/macros.h" namespace iceberg { @@ -31,12 +46,214 @@ class MockFileIO : public FileIO { MockFileIO() = default; ~MockFileIO() override = default; + Result> NewInputFile(std::string file_location) override { + auto file = FindFile(file_location); + if (!file) { + return NotFound("File does not exist: {}", file_location); + } + return std::make_unique(std::move(file_location), std::move(file), + std::nullopt); + } + + Result> NewInputFile(std::string file_location, + size_t length) override { + if (length > static_cast(std::numeric_limits::max())) { + return InvalidArgument("File length {} exceeds int64_t max", length); + } + auto file = FindFile(file_location); + if (!file) { + return NotFound("File does not exist: {}", file_location); + } + return std::make_unique(std::move(file_location), std::move(file), + static_cast(length)); + } + + Result> NewOutputFile(std::string file_location) override { + return std::make_unique(files_, std::move(file_location)); + } + + void AddFile(std::string file_location, std::span data) { + files_->insert_or_assign( + std::move(file_location), + std::make_shared>(data.begin(), data.end())); + } + + void AddFile(std::string file_location, std::string_view data) { + AddFile(std::move(file_location), + std::as_bytes(std::span(data.data(), data.size()))); + } + + std::vector& FileData(const std::string& file_location) { + return *GetOrCreateFile(file_location); + } + + const std::vector& FileData(const std::string& file_location) const { + return *files_->at(file_location); + } + MOCK_METHOD((Result), ReadFile, (const std::string&, std::optional), (override)); MOCK_METHOD(Status, WriteFile, (const std::string&, std::string_view), (override)); MOCK_METHOD(Status, DeleteFile, (const std::string&), (override)); + + private: + using FileMap = + std::unordered_map>>; + + class InMemoryInputStream : public SeekableInputStream { + public: + explicit InMemoryInputStream(std::shared_ptr> data) + : data_(std::move(data)) {} + + Result Position() const override { return position_; } + + Status Seek(int64_t position) override { + ICEBERG_PRECHECK(!closed_, "Input stream is closed"); + ICEBERG_PRECHECK(position >= 0, "Position must not be negative: {}", position); + position_ = position; + return {}; + } + + Result Read(std::span out) override { + ICEBERG_PRECHECK(!closed_, "Input stream is closed"); + auto file_size = static_cast(data_->size()); + ICEBERG_PRECHECK(position_ <= file_size, "Position {} exceeds file size {}", + position_, file_size); + auto bytes_to_read = + std::min(static_cast(out.size()), file_size - position_); + if (bytes_to_read > 0) { + std::memcpy(out.data(), data_->data() + static_cast(position_), + static_cast(bytes_to_read)); + position_ += bytes_to_read; + } + return bytes_to_read; + } + + Status ReadFully(int64_t position, std::span out) override { + ICEBERG_PRECHECK(!closed_, "Input stream is closed"); + ICEBERG_PRECHECK(position >= 0, "Position must not be negative: {}", position); + auto file_size = static_cast(data_->size()); + ICEBERG_PRECHECK(static_cast(out.size()) <= file_size - position, + "Read out of bounds: offset {} + length {} exceeds file size {}", + position, out.size(), file_size); + if (!out.empty()) { + std::memcpy(out.data(), data_->data() + static_cast(position), + out.size()); + } + return {}; + } + + Status Close() override { + closed_ = true; + return {}; + } + + private: + std::shared_ptr> data_; + int64_t position_ = 0; + bool closed_ = false; + }; + + class InMemoryOutputStream : public PositionOutputStream { + public: + explicit InMemoryOutputStream(std::shared_ptr> data) + : data_(std::move(data)) {} + + Result Position() const override { + return static_cast(data_->size()); + } + + Status Write(std::span data) override { + ICEBERG_PRECHECK(!closed_, "Output stream is closed"); + data_->insert(data_->end(), data.begin(), data.end()); + return {}; + } + + Status Flush() override { + ICEBERG_PRECHECK(!closed_, "Output stream is closed"); + return {}; + } + + Status Close() override { + closed_ = true; + return {}; + } + + private: + std::shared_ptr> data_; + bool closed_ = false; + }; + + class InMemoryInputFile : public InputFile { + public: + InMemoryInputFile(std::string location, std::shared_ptr> data, + std::optional length) + : location_(std::move(location)), data_(std::move(data)), length_(length) {} + + std::string_view location() const override { return location_; } + + Result Size() const override { + return length_.value_or(static_cast(data_->size())); + } + + Result> Open() override { + return std::make_unique(data_); + } + + private: + std::string location_; + std::shared_ptr> data_; + std::optional length_; + }; + + class InMemoryOutputFile : public OutputFile { + public: + InMemoryOutputFile(std::shared_ptr files, std::string location) + : files_(std::move(files)), location_(std::move(location)) {} + + std::string_view location() const override { return location_; } + + Result> Create() override { + if (files_->contains(location_)) { + return AlreadyExists("File already exists: {}", location_); + } + auto file = std::make_shared>(); + files_->emplace(location_, file); + return std::make_unique(std::move(file)); + } + + Result> CreateOrOverwrite() override { + auto file = std::make_shared>(); + files_->insert_or_assign(location_, file); + return std::make_unique(std::move(file)); + } + + private: + std::shared_ptr files_; + std::string location_; + }; + + std::shared_ptr> FindFile( + const std::string& file_location) const { + auto file = files_->find(file_location); + if (file == files_->end()) { + return nullptr; + } + return file->second; + } + + std::shared_ptr> GetOrCreateFile( + const std::string& file_location) { + auto& file = (*files_)[file_location]; + if (!file) { + file = std::make_shared>(); + } + return file; + } + + std::shared_ptr files_ = std::make_shared(); }; } // namespace iceberg diff --git a/src/iceberg/test/puffin_reader_writer_test.cc b/src/iceberg/test/puffin_reader_writer_test.cc new file mode 100644 index 000000000..8d610cee9 --- /dev/null +++ b/src/iceberg/test/puffin_reader_writer_test.cc @@ -0,0 +1,313 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include +#include +#include +#include +#include +#include +#include + +#include + +#include "iceberg/puffin/file_metadata.h" +#include "iceberg/puffin/puffin_reader.h" +#include "iceberg/puffin/puffin_writer.h" +#include "iceberg/test/matchers.h" +#include "iceberg/test/mock_io.h" + +namespace iceberg::puffin { + +namespace { + +struct PuffinFile { + MockFileIO io; + std::string location = "memory://test.puffin"; + + std::unique_ptr Output() { + auto output_file = io.NewOutputFile(location); + EXPECT_THAT(output_file, IsOk()); + if (!output_file) { + return nullptr; + } + return std::move(output_file.value()); + } + + std::unique_ptr Input() { + auto input_file = io.NewInputFile(location); + EXPECT_THAT(input_file, IsOk()); + if (!input_file) { + return nullptr; + } + return std::move(input_file.value()); + } + + std::unique_ptr Input(int64_t file_size) { + auto input_file = io.NewInputFile(location, static_cast(file_size)); + EXPECT_THAT(input_file, IsOk()); + if (!input_file) { + return nullptr; + } + return std::move(input_file.value()); + } + + std::vector& Data() { return io.FileData(location); } + + const std::vector& Data() const { return io.FileData(location); } + + void SetData(std::span bytes) { io.AddFile(location, bytes); } +}; + +std::vector ToBytes(std::string_view str) { + return {reinterpret_cast(str.data()), + reinterpret_cast(str.data() + str.size())}; +} + +} // namespace + +TEST(PuffinWriterTest, WriteEmptyFile) { + PuffinFile file; + ICEBERG_UNWRAP_OR_FAIL(auto writer, PuffinWriter::Make(file.Output())); + ASSERT_THAT(writer->Finish(), IsOk()); + + auto& data = file.Data(); + EXPECT_GE(data.size(), 20u); + EXPECT_EQ(data[0], std::byte{0x50}); + EXPECT_EQ(data[1], std::byte{0x46}); + EXPECT_EQ(data[2], std::byte{0x41}); + EXPECT_EQ(data[3], std::byte{0x31}); + auto sz = data.size(); + EXPECT_EQ(data[sz - 4], std::byte{0x50}); + EXPECT_EQ(data[sz - 3], std::byte{0x46}); + EXPECT_EQ(data[sz - 2], std::byte{0x41}); + EXPECT_EQ(data[sz - 1], std::byte{0x31}); + + EXPECT_TRUE(writer->written_blobs_metadata().empty()); + ICEBERG_UNWRAP_OR_FAIL(auto fsize, writer->FileSize()); + EXPECT_EQ(fsize, static_cast(data.size())); +} + +TEST(PuffinWriterTest, WriterRejectsAfterFinish) { + PuffinFile file; + ICEBERG_UNWRAP_OR_FAIL(auto writer, PuffinWriter::Make(file.Output())); + ASSERT_THAT(writer->Finish(), IsOk()); + + EXPECT_THAT(writer->Finish(), IsError(ErrorKind::kInvalidArgument)); + + Blob blob{.type = "a", .snapshot_id = 1, .sequence_number = 0}; + EXPECT_THAT(writer->Write(blob), IsError(ErrorKind::kInvalidArgument)); +} + +TEST(PuffinWriterTest, MakeRejectsNullOutput) { + EXPECT_THAT(PuffinWriter::Make(nullptr), IsError(ErrorKind::kInvalidArgument)); +} + +TEST(PuffinWriterTest, MakeDoesNotOverwriteExistingFile) { + PuffinFile file; + file.Data().push_back(std::byte{0x42}); + + EXPECT_THAT(PuffinWriter::Make(file.Output()), IsError(ErrorKind::kAlreadyExists)); + ASSERT_EQ(file.Data().size(), 1); + EXPECT_EQ(file.Data().front(), std::byte{0x42}); +} + +TEST(PuffinWriterTest, SizesBeforeFinishReturnError) { + PuffinFile file; + ICEBERG_UNWRAP_OR_FAIL(auto writer, PuffinWriter::Make(file.Output())); + EXPECT_THAT(writer->FooterSize(), IsError(ErrorKind::kInvalidArgument)); + EXPECT_THAT(writer->FileSize(), IsError(ErrorKind::kInvalidArgument)); +} + +TEST(PuffinWriterTest, CloseImplicitlyFinishesFile) { + PuffinFile file; + ICEBERG_UNWRAP_OR_FAIL(auto writer, PuffinWriter::Make(file.Output())); + + ASSERT_THAT(writer->Close(), IsOk()); + EXPECT_THAT(writer->Close(), IsOk()); + + ICEBERG_UNWRAP_OR_FAIL(auto fsize, writer->FileSize()); + EXPECT_EQ(fsize, static_cast(file.Data().size())); + + ICEBERG_UNWRAP_OR_FAIL(auto reader, PuffinReader::Make(file.Input())); + ICEBERG_UNWRAP_OR_FAIL(auto fm, reader->ReadFileMetadata()); + EXPECT_TRUE(fm.blobs.empty()); +} + +TEST(PuffinRoundTripTest, SingleBlob) { + PuffinFile file; + { + ICEBERG_UNWRAP_OR_FAIL(auto writer, + PuffinWriter::Make(file.Output(), {{"created-by", "test"}})); + std::vector blob_data = {0x01, 0x02, 0x03, 0x04, 0x05}; + ICEBERG_UNWRAP_OR_FAIL(auto meta, writer->Write(Blob{.type = "test-blob", + .input_fields = {1, 2}, + .snapshot_id = 42, + .sequence_number = 7, + .data = blob_data})); + EXPECT_EQ(meta.type, "test-blob"); + EXPECT_EQ(meta.offset, 4); + EXPECT_EQ(meta.length, 5); + ASSERT_THAT(writer->Finish(), IsOk()); + ICEBERG_UNWRAP_OR_FAIL(auto fsize, writer->FileSize()); + EXPECT_GT(fsize, 0); + } + + ICEBERG_UNWRAP_OR_FAIL(auto reader, PuffinReader::Make(file.Input())); + ICEBERG_UNWRAP_OR_FAIL(auto fm, reader->ReadFileMetadata()); + ASSERT_EQ(fm.blobs.size(), 1); + EXPECT_EQ(fm.blobs[0].type, "test-blob"); + EXPECT_EQ(fm.properties.at("created-by"), "test"); + + ICEBERG_UNWRAP_OR_FAIL(auto blob_result, reader->ReadBlob(fm.blobs[0])); + std::vector expected = {std::byte{0x01}, std::byte{0x02}, std::byte{0x03}, + std::byte{0x04}, std::byte{0x05}}; + EXPECT_EQ(blob_result.second, expected); +} + +TEST(PuffinRoundTripTest, MultipleBlobs) { + PuffinFile file; + { + ICEBERG_UNWRAP_OR_FAIL(auto writer, PuffinWriter::Make(file.Output())); + ICEBERG_UNWRAP_OR_FAIL(auto m1, writer->Write(Blob{.type = "first", + .input_fields = {1}, + .snapshot_id = 1, + .sequence_number = 0, + .data = {'a', 'b', 'c'}})); + ICEBERG_UNWRAP_OR_FAIL(auto m2, writer->Write(Blob{.type = "second", + .input_fields = {2}, + .snapshot_id = 2, + .sequence_number = 1, + .data = {'d', 'e', 'f', 'g'}, + .properties = {{"key", "val"}}})); + EXPECT_EQ(m2.offset, 7); + EXPECT_EQ(m2.length, 4); + ASSERT_THAT(writer->Finish(), IsOk()); + } + + ICEBERG_UNWRAP_OR_FAIL(auto reader, PuffinReader::Make(file.Input())); + ICEBERG_UNWRAP_OR_FAIL(auto fm, reader->ReadFileMetadata()); + ASSERT_EQ(fm.blobs.size(), 2); + EXPECT_TRUE(fm.blobs[0].properties.empty()); + EXPECT_EQ(fm.blobs[1].properties.at("key"), "val"); + + ICEBERG_UNWRAP_OR_FAIL(auto all, reader->ReadAll(fm.blobs)); + ASSERT_EQ(all.size(), 2); + EXPECT_EQ(all[0].second, ToBytes("abc")); + EXPECT_EQ(all[1].second, ToBytes("defg")); +} + +TEST(PuffinRoundTripTest, WithProperties) { + PuffinFile file; + { + ICEBERG_UNWRAP_OR_FAIL( + auto writer, + PuffinWriter::Make(file.Output(), {{"created-by", "iceberg-cpp-test"}})); + std::string text = "hello puffin"; + std::vector blob_data(text.begin(), text.end()); + ASSERT_THAT(writer->Write(Blob{.type = "text-blob", + .input_fields = {1}, + .snapshot_id = 100, + .sequence_number = 5, + .data = blob_data, + .properties = {{"encoding", "utf-8"}}}), + IsOk()); + ASSERT_THAT(writer->Finish(), IsOk()); + } + + ICEBERG_UNWRAP_OR_FAIL(auto reader, PuffinReader::Make(file.Input())); + ICEBERG_UNWRAP_OR_FAIL(auto fm, reader->ReadFileMetadata()); + EXPECT_EQ(fm.properties.at("created-by"), "iceberg-cpp-test"); + ASSERT_EQ(fm.blobs.size(), 1); + EXPECT_EQ(fm.blobs[0].properties.at("encoding"), "utf-8"); + + ICEBERG_UNWRAP_OR_FAIL(auto blob_result, reader->ReadBlob(fm.blobs[0])); + EXPECT_EQ(blob_result.second, ToBytes("hello puffin")); +} + +TEST(PuffinReaderTest, MakeRejectsNullInput) { + EXPECT_THAT(PuffinReader::Make(nullptr), IsError(ErrorKind::kInvalidArgument)); +} + +TEST(PuffinReaderTest, MakeUsesKnownFileSizeAndFooterSize) { + PuffinFile file; + int64_t footer_size = 0; + int64_t file_size = 0; + { + ICEBERG_UNWRAP_OR_FAIL(auto writer, PuffinWriter::Make(file.Output())); + ASSERT_THAT(writer->Finish(), IsOk()); + ICEBERG_UNWRAP_OR_FAIL(footer_size, writer->FooterSize()); + ICEBERG_UNWRAP_OR_FAIL(file_size, writer->FileSize()); + } + + ICEBERG_UNWRAP_OR_FAIL( + auto reader, PuffinReader::Make(file.Input(file_size), footer_size, file_size)); + ICEBERG_UNWRAP_OR_FAIL(auto fm, reader->ReadFileMetadata()); + EXPECT_TRUE(fm.blobs.empty()); +} + +TEST(PuffinReaderTest, KnownFooterSizeMismatchIsRejected) { + PuffinFile file; + int64_t footer_size = 0; + int64_t file_size = 0; + { + ICEBERG_UNWRAP_OR_FAIL(auto writer, PuffinWriter::Make(file.Output())); + ASSERT_THAT(writer->Finish(), IsOk()); + ICEBERG_UNWRAP_OR_FAIL(footer_size, writer->FooterSize()); + ICEBERG_UNWRAP_OR_FAIL(file_size, writer->FileSize()); + } + + ICEBERG_UNWRAP_OR_FAIL(auto reader, + PuffinReader::Make(file.Input(), footer_size - 1, file_size)); + EXPECT_THAT(reader->ReadFileMetadata(), IsError(ErrorKind::kInvalidArgument)); +} + +TEST(PuffinReaderTest, UnknownFlagsRejected) { + PuffinFile file; + { + ICEBERG_UNWRAP_OR_FAIL(auto writer, PuffinWriter::Make(file.Output())); + ASSERT_THAT(writer->Finish(), IsOk()); + } + auto& data = file.Data(); + data[data.size() - 8] = std::byte{0x02}; + + ICEBERG_UNWRAP_OR_FAIL(auto reader, PuffinReader::Make(file.Input())); + EXPECT_THAT(reader->ReadFileMetadata(), IsError(ErrorKind::kInvalidArgument)); +} + +TEST(PuffinReaderTest, EmptyPuffinCompatibility) { + PuffinFile file; + std::vector data{ + std::byte{0x50}, std::byte{0x46}, std::byte{0x41}, std::byte{0x31}, std::byte{0x50}, + std::byte{0x46}, std::byte{0x41}, std::byte{0x31}, std::byte{0x7b}, std::byte{0x22}, + std::byte{0x62}, std::byte{0x6c}, std::byte{0x6f}, std::byte{0x62}, std::byte{0x73}, + std::byte{0x22}, std::byte{0x3a}, std::byte{0x5b}, std::byte{0x5d}, std::byte{0x7d}, + std::byte{0x0c}, std::byte{0x00}, std::byte{0x00}, std::byte{0x00}, std::byte{0x00}, + std::byte{0x00}, std::byte{0x00}, std::byte{0x00}, std::byte{0x50}, std::byte{0x46}, + std::byte{0x41}, std::byte{0x31}, + }; + file.SetData(data); + + ICEBERG_UNWRAP_OR_FAIL(auto reader, PuffinReader::Make(file.Input())); + ICEBERG_UNWRAP_OR_FAIL(auto fm, reader->ReadFileMetadata()); + EXPECT_TRUE(fm.blobs.empty()); + EXPECT_TRUE(fm.properties.empty()); +} + +} // namespace iceberg::puffin diff --git a/src/iceberg/type_fwd.h b/src/iceberg/type_fwd.h index 144a9e33a..064ec285a 100644 --- a/src/iceberg/type_fwd.h +++ b/src/iceberg/type_fwd.h @@ -188,6 +188,11 @@ class FileIO; class Reader; class Writer; +class InputFile; +class OutputFile; +class PositionOutputStream; +class SeekableInputStream; + /// \brief Row-based data structures. class ArrayLike; class MapLike;