Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 7 additions & 3 deletions cpp/src/parquet/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,11 @@ set(PARQUET_SRCS

if(ARROW_HAVE_RUNTIME_AVX2)

@HuaHuaY HuaHuaY Jun 9, 2026

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we add ARROW_HAVE_AVX2 here? There is following code in bloom_filter.cc

#if defined(ARROW_HAVE_AVX2) || defined(ARROW_HAVE_RUNTIME_AVX2)
#  include "parquet/bloom_filter_avx2_internal.h"
#endif

# AVX2 is used as a proxy for BMI2.
list(APPEND PARQUET_SRCS level_comparison_avx2.cc level_conversion_bmi2.cc)
list(APPEND
PARQUET_SRCS
level_comparison_avx2.cc
level_conversion_bmi2.cc
bloom_filter_avx2.cc)
# We need CMAKE_CXX_FLAGS_RELEASE here to prevent the one-definition-rule
# violation with -DCMAKE_BUILD_TYPE=MinSizeRel. CMAKE_CXX_FLAGS_RELEASE
# will force inlining as much as possible.
Expand All @@ -205,8 +209,8 @@ if(ARROW_HAVE_RUNTIME_AVX2)
separate_arguments(RELEASE_FLAGS NATIVE_COMMAND "${CMAKE_CXX_FLAGS_RELEASE}")
list(APPEND AVX2_FLAGS ${RELEASE_FLAGS})
endif()
set_source_files_properties(level_comparison_avx2.cc PROPERTIES COMPILE_OPTIONS
"${AVX2_FLAGS}")
set_source_files_properties(level_comparison_avx2.cc bloom_filter_avx2.cc
PROPERTIES COMPILE_OPTIONS "${AVX2_FLAGS}")
# WARNING: DO NOT BLINDLY COPY THIS CODE FOR OTHER BMI2 USE CASES.
# This code is always guarded by runtime dispatch which verifies
# BMI2 is present. For a very small number of CPUs AVX2 does not
Expand Down
47 changes: 37 additions & 10 deletions cpp/src/parquet/bloom_filter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
// specific language governing permissions and limitations
// under the License.

#include <array>
#include <cstdint>
#include <cstring>
#include <limits>
Expand All @@ -34,7 +35,35 @@
#include "parquet/thrift_internal.h"
#include "parquet/xxhasher.h"

#if defined(ARROW_HAVE_AVX2) || defined(ARROW_HAVE_RUNTIME_AVX2)
# include "parquet/bloom_filter_avx2_internal.h"
#endif

#include "parquet/bloom_filter_block_impl_internal.h"

#include "arrow/util/dispatch_internal.h"

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suggest adjusting the include order. Can we move line 44 to line 26?


namespace parquet {

namespace internal {
namespace {

using ::arrow::internal::DynamicDispatch;

struct FindHashBlockDynamicFunction {
using FunctionType = decltype(&FindHashBlockImpl);

static constexpr auto targets() {
return std::array{
ARROW_DISPATCH_TARGET_NONE(&FindHashBlockImpl) //
ARROW_DISPATCH_TARGET_AVX2(&FindHashBlockAvx2) //
};
}
};

} // namespace
} // namespace internal

namespace {

constexpr int32_t kCiphertextLengthSize = 4;
Expand Down Expand Up @@ -346,20 +375,18 @@ void BlockSplitBloomFilter::WriteTo(ArrowOutputStream* sink) const {
}

bool BlockSplitBloomFilter::FindHash(uint64_t hash) const {
// Probe kernels in bloom_filter_block_impl_internal.h and bloom_filter_avx2.cc both
// hard-code an 8-lane (256-bit) block.
static_assert(kBitsSetPerBlock == 8,

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suggest to move this static_assert to block_filter_avx2.cc.

"SBBF probe kernels assume 8 bits set per 256-bit block");
const uint32_t bucket_index =
static_cast<uint32_t>(((hash >> 32) * (num_bytes_ / kBytesPerFilterBlock)) >> 32);
const uint32_t key = static_cast<uint32_t>(hash);
const uint32_t* bitset32 = reinterpret_cast<const uint32_t*>(data_->data());

for (int i = 0; i < kBitsSetPerBlock; ++i) {
// Calculate mask for key in the given bitset.
const uint32_t mask = UINT32_C(0x1) << ((key * SALT[i]) >> 27);
if (ARROW_PREDICT_FALSE(0 ==
(bitset32[kBitsSetPerBlock * bucket_index + i] & mask))) {
return false;
}
}
return true;
const uint32_t* block = bitset32 + kBitsSetPerBlock * bucket_index;
static ::arrow::internal::DynamicDispatch<internal::FindHashBlockDynamicFunction>
dispatch;
return dispatch(block, SALT, key);
}

void BlockSplitBloomFilter::InsertHashImpl(uint64_t hash) {
Expand Down
47 changes: 47 additions & 0 deletions cpp/src/parquet/bloom_filter_avx2.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include <xsimd/xsimd.hpp>

#include "parquet/bloom_filter_avx2_internal.h"

namespace parquet::internal {

// Parquet SBBF probe (256-bit block, 8 SALT-derived bits per probe). Unrelated
// to cpp/src/arrow/acero/bloom_filter_avx2.cc -- Acero's blocked bloom filter
// is a different algorithm (64-bit block, ~4 bits per probe, in-memory only)
// and the two kernels are not interchangeable. See the Parquet spec for the
// SBBF on-disk layout this kernel must match.
//
// Spelled in xsimd rather than reusing the autovectorized body in
// bloom_filter_block_impl_internal.h: only clang lowers that body to a single vptest;
// gcc and MSVC emit a longer horizontal vpor reduction.
bool FindHashBlockAvx2(const uint32_t* block, const uint32_t* salt, uint32_t key) {
using batch = xsimd::batch<uint32_t>;

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

May xsimd generate avx512 here? I think we may need to use xsimd::batch<uint32_t, xsimd::avx2>.

const batch mask =
batch(uint32_t{1})
<< xsimd::bitwise_rshift<27>(batch(key) * batch::load_unaligned(salt));
Comment thread
pitrou marked this conversation as resolved.
const batch miss = xsimd::bitwise_andnot(mask, batch::load_unaligned(block));
// `miss != 0` (one extra vpcmpeqd) is deliberate: reinterpreting `miss`
// directly as a batch_bool would skip the compare but feed non-canonical
// lane values into batch_bool, which relies on xsimd's AVX2 backend
// lowering none() to a whole-register vptest. That lowering is not part
// of xsimd's documented contract.
return xsimd::none(miss != batch(uint32_t{0}));
}

} // namespace parquet::internal
29 changes: 29 additions & 0 deletions cpp/src/parquet/bloom_filter_avx2_internal.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#pragma once

#include <cstdint>

#include "parquet/platform.h"

namespace parquet::internal {

PARQUET_EXPORT bool FindHashBlockAvx2(const uint32_t* block, const uint32_t* salt,
uint32_t key);

} // namespace parquet::internal
33 changes: 33 additions & 0 deletions cpp/src/parquet/bloom_filter_block_impl_internal.h
Comment thread
dmatth1 marked this conversation as resolved.
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I recommend adding a newline.

#pragma once

#include <cstdint>

namespace parquet::internal {

inline bool FindHashBlockImpl(const uint32_t* block, const uint32_t* salt, uint32_t key) {

@HuaHuaY HuaHuaY Jun 9, 2026

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we use std::span<const uint32_t, kBitsSetPerBlock> block as the first parameter? Will doing this improve readability?

constexpr int kBitsSetPerBlock = 8;
uint32_t miss = 0;
for (int i = 0; i < kBitsSetPerBlock; ++i) {
const uint32_t mask = static_cast<uint32_t>(1) << ((key * salt[i]) >> 27);
miss |= (~block[i] & mask);
}
return miss == 0;
}

} // namespace parquet::internal
89 changes: 89 additions & 0 deletions cpp/src/parquet/bloom_filter_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
#include "arrow/status.h"
#include "arrow/testing/gtest_util.h"
#include "arrow/testing/random.h"
#include "arrow/util/cpu_info.h"

#include "parquet/bloom_filter.h"
#include "parquet/exception.h"
Expand All @@ -38,6 +39,14 @@
#include "parquet/types.h"
#include "parquet/xxhasher.h"

// Both dispatch targets included directly so the test can exercise the
// un-picked one too -- DynamicDispatch resolves once at static init.
#include "parquet/bloom_filter_block_impl_internal.h"

#if defined(ARROW_HAVE_RUNTIME_AVX2)
# include "parquet/bloom_filter_avx2_internal.h"
#endif

namespace parquet {
namespace test {

Expand Down Expand Up @@ -434,5 +443,85 @@ TYPED_TEST(TestBatchBloomFilter, Basic) {
AssertBufferEqual(*buffer, *batch_insert_buffer);
}

// Guards against silent drift between the baseline and AVX2 probe bodies --
// DynamicDispatch only runs one of them per host.
#if defined(ARROW_HAVE_RUNTIME_AVX2)
namespace {

// Mirror of BlockSplitBloomFilter::kBitsSetPerBlock (private to the class).
constexpr int kBitsSetPerBlock = 8;
Comment on lines +451 to +452

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, why not make it public in the class? This would avoid hard-coding it in multiple places (in cpp/src/parquet/bloom_filter_block_impl_internal.h as well).


// Test-only SALT (matches the Parquet SBBF spec values used in
// bloom_filter.h). Kernel-vs-kernel agreement holds for any SALT, so this
// duplication is a contained test-side convenience, not a spec mirror.
constexpr uint32_t kProbeTestSalt[kBitsSetPerBlock] = {
0x47b6137bU, 0x44974d91U, 0x8824ad5bU, 0xa2b7289dU,
0x705495c7U, 0x2df1424bU, 0x9efc4947U, 0x5c6bfb31U,
};

inline void InsertIntoBlock(uint32_t* block, uint32_t key) {
for (int i = 0; i < kBitsSetPerBlock; ++i) {
block[i] |= uint32_t{1} << ((key * kProbeTestSalt[i]) >> 27);
}
}

void AssertKernelsAgree(const uint32_t* block, uint32_t key) {
const bool standard = internal::FindHashBlockImpl(block, kProbeTestSalt, key);
const bool avx2 = internal::FindHashBlockAvx2(block, kProbeTestSalt, key);
ASSERT_EQ(standard, avx2) << "dispatch targets diverged for key=0x" << std::hex << key;
}

} // namespace

class BloomFilterProbeKernel : public ::testing::Test {
protected:
void SetUp() override {
if (!::arrow::internal::CpuInfo::GetInstance()->IsSupported(
::arrow::internal::CpuInfo::AVX2)) {
GTEST_SKIP() << "AVX2 not available at runtime";
}
}
};

// Random-block fuzz: exercises the full bit lattice, catches reduction /
// operand-order bugs that don't depend on realistic fill density.
TEST_F(BloomFilterProbeKernel, AgreeOnRandomBlocks) {
std::mt19937_64 rng(0xC0FFEE);
constexpr int kNumTrials = 20000;
for (int trial = 0; trial < kNumTrials; ++trial) {
uint32_t block[kBitsSetPerBlock];
for (uint32_t& word : block) {
word = static_cast<uint32_t>(rng());
}
AssertKernelsAgree(block, static_cast<uint32_t>(rng()));
}
}

// Production-fill fuzz: blocks populated by the same SALT-derived insert the
// writer uses, then probed with both inserted keys (must match) and fresh
// keys (mostly miss). Catches bugs that only surface on real fill density.
TEST_F(BloomFilterProbeKernel, AgreeOnPopulatedBlocks) {
std::mt19937_64 rng(0xBABECAFE);
constexpr int kNumBlocks = 200;
constexpr int kKeysPerBlock = 6; // ~k inserts per 256-bit block, realistic FPP.
for (int b = 0; b < kNumBlocks; ++b) {
uint32_t block[kBitsSetPerBlock] = {0};
std::vector<uint32_t> inserted;
inserted.reserve(kKeysPerBlock);
for (int k = 0; k < kKeysPerBlock; ++k) {
const uint32_t key = static_cast<uint32_t>(rng());
InsertIntoBlock(block, key);
inserted.push_back(key);
}
for (uint32_t key : inserted) {
AssertKernelsAgree(block, key);
}
for (int q = 0; q < 50; ++q) {
AssertKernelsAgree(block, static_cast<uint32_t>(rng()));
}
}
}
#endif

} // namespace test
} // namespace parquet
Loading