-
Notifications
You must be signed in to change notification settings - Fork 4.1k
GH-50026: [C++][Parquet] SIMD-accelerate SBBF probe via branchless autovec #50030
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -15,6 +15,7 @@ | |
| // specific language governing permissions and limitations | ||
| // under the License. | ||
|
|
||
| #include <array> | ||
| #include <cstdint> | ||
| #include <cstring> | ||
| #include <limits> | ||
|
|
@@ -34,7 +35,35 @@ | |
| #include "parquet/thrift_internal.h" | ||
| #include "parquet/xxhasher.h" | ||
|
|
||
| #if defined(ARROW_HAVE_AVX2) || defined(ARROW_HAVE_RUNTIME_AVX2) | ||
| # include "parquet/bloom_filter_avx2_internal.h" | ||
| #endif | ||
|
|
||
| #include "parquet/bloom_filter_block_impl_internal.h" | ||
|
|
||
| #include "arrow/util/dispatch_internal.h" | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I suggest adjusting the include order. Can we move line 44 to line 26? |
||
|
|
||
| namespace parquet { | ||
|
|
||
| namespace internal { | ||
| namespace { | ||
|
|
||
| using ::arrow::internal::DynamicDispatch; | ||
|
|
||
| struct FindHashBlockDynamicFunction { | ||
| using FunctionType = decltype(&FindHashBlockImpl); | ||
|
|
||
| static constexpr auto targets() { | ||
| return std::array{ | ||
| ARROW_DISPATCH_TARGET_NONE(&FindHashBlockImpl) // | ||
| ARROW_DISPATCH_TARGET_AVX2(&FindHashBlockAvx2) // | ||
| }; | ||
| } | ||
| }; | ||
|
|
||
| } // namespace | ||
| } // namespace internal | ||
|
|
||
| namespace { | ||
|
|
||
| constexpr int32_t kCiphertextLengthSize = 4; | ||
|
|
@@ -346,20 +375,18 @@ void BlockSplitBloomFilter::WriteTo(ArrowOutputStream* sink) const { | |
| } | ||
|
|
||
| bool BlockSplitBloomFilter::FindHash(uint64_t hash) const { | ||
| // Probe kernels in bloom_filter_block_impl_internal.h and bloom_filter_avx2.cc both | ||
| // hard-code an 8-lane (256-bit) block. | ||
| static_assert(kBitsSetPerBlock == 8, | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I suggest to move this |
||
| "SBBF probe kernels assume 8 bits set per 256-bit block"); | ||
| const uint32_t bucket_index = | ||
| static_cast<uint32_t>(((hash >> 32) * (num_bytes_ / kBytesPerFilterBlock)) >> 32); | ||
| const uint32_t key = static_cast<uint32_t>(hash); | ||
| const uint32_t* bitset32 = reinterpret_cast<const uint32_t*>(data_->data()); | ||
|
|
||
| for (int i = 0; i < kBitsSetPerBlock; ++i) { | ||
| // Calculate mask for key in the given bitset. | ||
| const uint32_t mask = UINT32_C(0x1) << ((key * SALT[i]) >> 27); | ||
| if (ARROW_PREDICT_FALSE(0 == | ||
| (bitset32[kBitsSetPerBlock * bucket_index + i] & mask))) { | ||
| return false; | ||
| } | ||
| } | ||
| return true; | ||
| const uint32_t* block = bitset32 + kBitsSetPerBlock * bucket_index; | ||
| static ::arrow::internal::DynamicDispatch<internal::FindHashBlockDynamicFunction> | ||
| dispatch; | ||
| return dispatch(block, SALT, key); | ||
| } | ||
|
|
||
| void BlockSplitBloomFilter::InsertHashImpl(uint64_t hash) { | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,47 @@ | ||
| // Licensed to the Apache Software Foundation (ASF) under one | ||
| // or more contributor license agreements. See the NOTICE file | ||
| // distributed with this work for additional information | ||
| // regarding copyright ownership. The ASF licenses this file | ||
| // to you under the Apache License, Version 2.0 (the | ||
| // "License"); you may not use this file except in compliance | ||
| // with the License. You may obtain a copy of the License at | ||
| // | ||
| // http://www.apache.org/licenses/LICENSE-2.0 | ||
| // | ||
| // Unless required by applicable law or agreed to in writing, | ||
| // software distributed under the License is distributed on an | ||
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
| // KIND, either express or implied. See the License for the | ||
| // specific language governing permissions and limitations | ||
| // under the License. | ||
|
|
||
| #include <xsimd/xsimd.hpp> | ||
|
|
||
| #include "parquet/bloom_filter_avx2_internal.h" | ||
|
|
||
| namespace parquet::internal { | ||
|
|
||
| // Parquet SBBF probe (256-bit block, 8 SALT-derived bits per probe). Unrelated | ||
| // to cpp/src/arrow/acero/bloom_filter_avx2.cc -- Acero's blocked bloom filter | ||
| // is a different algorithm (64-bit block, ~4 bits per probe, in-memory only) | ||
| // and the two kernels are not interchangeable. See the Parquet spec for the | ||
| // SBBF on-disk layout this kernel must match. | ||
| // | ||
| // Spelled in xsimd rather than reusing the autovectorized body in | ||
| // bloom_filter_block_impl_internal.h: only clang lowers that body to a single vptest; | ||
| // gcc and MSVC emit a longer horizontal vpor reduction. | ||
| bool FindHashBlockAvx2(const uint32_t* block, const uint32_t* salt, uint32_t key) { | ||
| using batch = xsimd::batch<uint32_t>; | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. May xsimd generate avx512 here? I think we may need to use |
||
| const batch mask = | ||
| batch(uint32_t{1}) | ||
| << xsimd::bitwise_rshift<27>(batch(key) * batch::load_unaligned(salt)); | ||
|
pitrou marked this conversation as resolved.
|
||
| const batch miss = xsimd::bitwise_andnot(mask, batch::load_unaligned(block)); | ||
| // `miss != 0` (one extra vpcmpeqd) is deliberate: reinterpreting `miss` | ||
| // directly as a batch_bool would skip the compare but feed non-canonical | ||
| // lane values into batch_bool, which relies on xsimd's AVX2 backend | ||
| // lowering none() to a whole-register vptest. That lowering is not part | ||
| // of xsimd's documented contract. | ||
| return xsimd::none(miss != batch(uint32_t{0})); | ||
| } | ||
|
|
||
| } // namespace parquet::internal | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,29 @@ | ||
| // Licensed to the Apache Software Foundation (ASF) under one | ||
| // or more contributor license agreements. See the NOTICE file | ||
| // distributed with this work for additional information | ||
| // regarding copyright ownership. The ASF licenses this file | ||
| // to you under the Apache License, Version 2.0 (the | ||
| // "License"); you may not use this file except in compliance | ||
| // with the License. You may obtain a copy of the License at | ||
| // | ||
| // http://www.apache.org/licenses/LICENSE-2.0 | ||
| // | ||
| // Unless required by applicable law or agreed to in writing, | ||
| // software distributed under the License is distributed on an | ||
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
| // KIND, either express or implied. See the License for the | ||
| // specific language governing permissions and limitations | ||
| // under the License. | ||
|
|
||
| #pragma once | ||
|
|
||
| #include <cstdint> | ||
|
|
||
| #include "parquet/platform.h" | ||
|
|
||
| namespace parquet::internal { | ||
|
|
||
| PARQUET_EXPORT bool FindHashBlockAvx2(const uint32_t* block, const uint32_t* salt, | ||
| uint32_t key); | ||
|
|
||
| } // namespace parquet::internal |
|
dmatth1 marked this conversation as resolved.
|
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,33 @@ | ||
| // Licensed to the Apache Software Foundation (ASF) under one | ||
| // or more contributor license agreements. See the NOTICE file | ||
| // distributed with this work for additional information | ||
| // regarding copyright ownership. The ASF licenses this file | ||
| // to you under the Apache License, Version 2.0 (the | ||
| // "License"); you may not use this file except in compliance | ||
| // with the License. You may obtain a copy of the License at | ||
| // | ||
| // http://www.apache.org/licenses/LICENSE-2.0 | ||
| // | ||
| // Unless required by applicable law or agreed to in writing, | ||
| // software distributed under the License is distributed on an | ||
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
| // KIND, either express or implied. See the License for the | ||
| // specific language governing permissions and limitations | ||
| // under the License. | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I recommend adding a newline. |
||
| #pragma once | ||
|
|
||
| #include <cstdint> | ||
|
|
||
| namespace parquet::internal { | ||
|
|
||
| inline bool FindHashBlockImpl(const uint32_t* block, const uint32_t* salt, uint32_t key) { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can we use |
||
| constexpr int kBitsSetPerBlock = 8; | ||
| uint32_t miss = 0; | ||
| for (int i = 0; i < kBitsSetPerBlock; ++i) { | ||
| const uint32_t mask = static_cast<uint32_t>(1) << ((key * salt[i]) >> 27); | ||
| miss |= (~block[i] & mask); | ||
| } | ||
| return miss == 0; | ||
| } | ||
|
|
||
| } // namespace parquet::internal | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -30,6 +30,7 @@ | |
| #include "arrow/status.h" | ||
| #include "arrow/testing/gtest_util.h" | ||
| #include "arrow/testing/random.h" | ||
| #include "arrow/util/cpu_info.h" | ||
|
|
||
| #include "parquet/bloom_filter.h" | ||
| #include "parquet/exception.h" | ||
|
|
@@ -38,6 +39,14 @@ | |
| #include "parquet/types.h" | ||
| #include "parquet/xxhasher.h" | ||
|
|
||
| // Both dispatch targets included directly so the test can exercise the | ||
| // un-picked one too -- DynamicDispatch resolves once at static init. | ||
| #include "parquet/bloom_filter_block_impl_internal.h" | ||
|
|
||
| #if defined(ARROW_HAVE_RUNTIME_AVX2) | ||
| # include "parquet/bloom_filter_avx2_internal.h" | ||
| #endif | ||
|
|
||
| namespace parquet { | ||
| namespace test { | ||
|
|
||
|
|
@@ -434,5 +443,85 @@ TYPED_TEST(TestBatchBloomFilter, Basic) { | |
| AssertBufferEqual(*buffer, *batch_insert_buffer); | ||
| } | ||
|
|
||
| // Guards against silent drift between the baseline and AVX2 probe bodies -- | ||
| // DynamicDispatch only runs one of them per host. | ||
| #if defined(ARROW_HAVE_RUNTIME_AVX2) | ||
| namespace { | ||
|
|
||
| // Mirror of BlockSplitBloomFilter::kBitsSetPerBlock (private to the class). | ||
| constexpr int kBitsSetPerBlock = 8; | ||
|
Comment on lines
+451
to
+452
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Well, why not make it public in the class? This would avoid hard-coding it in multiple places (in |
||
|
|
||
| // Test-only SALT (matches the Parquet SBBF spec values used in | ||
| // bloom_filter.h). Kernel-vs-kernel agreement holds for any SALT, so this | ||
| // duplication is a contained test-side convenience, not a spec mirror. | ||
| constexpr uint32_t kProbeTestSalt[kBitsSetPerBlock] = { | ||
| 0x47b6137bU, 0x44974d91U, 0x8824ad5bU, 0xa2b7289dU, | ||
| 0x705495c7U, 0x2df1424bU, 0x9efc4947U, 0x5c6bfb31U, | ||
| }; | ||
|
|
||
| inline void InsertIntoBlock(uint32_t* block, uint32_t key) { | ||
| for (int i = 0; i < kBitsSetPerBlock; ++i) { | ||
| block[i] |= uint32_t{1} << ((key * kProbeTestSalt[i]) >> 27); | ||
| } | ||
| } | ||
|
|
||
| void AssertKernelsAgree(const uint32_t* block, uint32_t key) { | ||
| const bool standard = internal::FindHashBlockImpl(block, kProbeTestSalt, key); | ||
| const bool avx2 = internal::FindHashBlockAvx2(block, kProbeTestSalt, key); | ||
| ASSERT_EQ(standard, avx2) << "dispatch targets diverged for key=0x" << std::hex << key; | ||
| } | ||
|
|
||
| } // namespace | ||
|
|
||
| class BloomFilterProbeKernel : public ::testing::Test { | ||
| protected: | ||
| void SetUp() override { | ||
| if (!::arrow::internal::CpuInfo::GetInstance()->IsSupported( | ||
| ::arrow::internal::CpuInfo::AVX2)) { | ||
| GTEST_SKIP() << "AVX2 not available at runtime"; | ||
| } | ||
| } | ||
| }; | ||
|
|
||
| // Random-block fuzz: exercises the full bit lattice, catches reduction / | ||
| // operand-order bugs that don't depend on realistic fill density. | ||
| TEST_F(BloomFilterProbeKernel, AgreeOnRandomBlocks) { | ||
| std::mt19937_64 rng(0xC0FFEE); | ||
| constexpr int kNumTrials = 20000; | ||
| for (int trial = 0; trial < kNumTrials; ++trial) { | ||
| uint32_t block[kBitsSetPerBlock]; | ||
| for (uint32_t& word : block) { | ||
| word = static_cast<uint32_t>(rng()); | ||
| } | ||
| AssertKernelsAgree(block, static_cast<uint32_t>(rng())); | ||
| } | ||
| } | ||
|
|
||
| // Production-fill fuzz: blocks populated by the same SALT-derived insert the | ||
| // writer uses, then probed with both inserted keys (must match) and fresh | ||
| // keys (mostly miss). Catches bugs that only surface on real fill density. | ||
| TEST_F(BloomFilterProbeKernel, AgreeOnPopulatedBlocks) { | ||
| std::mt19937_64 rng(0xBABECAFE); | ||
| constexpr int kNumBlocks = 200; | ||
| constexpr int kKeysPerBlock = 6; // ~k inserts per 256-bit block, realistic FPP. | ||
| for (int b = 0; b < kNumBlocks; ++b) { | ||
| uint32_t block[kBitsSetPerBlock] = {0}; | ||
| std::vector<uint32_t> inserted; | ||
| inserted.reserve(kKeysPerBlock); | ||
| for (int k = 0; k < kKeysPerBlock; ++k) { | ||
| const uint32_t key = static_cast<uint32_t>(rng()); | ||
| InsertIntoBlock(block, key); | ||
| inserted.push_back(key); | ||
| } | ||
| for (uint32_t key : inserted) { | ||
| AssertKernelsAgree(block, key); | ||
| } | ||
| for (int q = 0; q < 50; ++q) { | ||
| AssertKernelsAgree(block, static_cast<uint32_t>(rng())); | ||
| } | ||
| } | ||
| } | ||
| #endif | ||
|
|
||
| } // namespace test | ||
| } // namespace parquet | ||
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we add
ARROW_HAVE_AVX2here? There is following code inbloom_filter.cc