From 49c031390db7f98c3ed1df7356dbb866601b573d Mon Sep 17 00:00:00 2001 From: yew1eb Date: Tue, 28 Apr 2026 11:54:21 +0800 Subject: [PATCH 1/2] perf: SIMD short-circuit in JoinHashMap probe MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit [AURON-2160] Optimize join hash map probe by checking hash_matched first before computing empty mask. This reduces ~50% SIMD instructions when hash hit rate is high (typical join scenarios). Before: Always compute both hash_matched and empty SIMD masks. After: Only compute empty mask when hash_matched has no hits. Also add a criterion microbenchmark (benches/join_hash_map.rs) covering realistic BHJ build sizes (5M/10M/20M keys) × three hit rates (0/50/100%). Results on Apple M2 Pro (probe_size=4096): build size | hit=0% | hit=50% | hit=100% ----------------+---------+---------+--------- 5M (~128 MB) | 6.63 µs | 6.52 µs | 6.35 µs 10M (~256 MB) | 6.68 µs | 6.50 µs | 6.36 µs 20M (~512 MB) | 6.70 µs | 6.59 µs | 6.36 µs Latency stays flat because prefetch_read_data (4-step ahead) fully pipelines cache misses. The hit=100% path is consistently ~4-5% faster, aligning with the optimization goal. 
Instruction-count savings can be confirmed on x86 via: perf stat -e instructions Run benchmark: cargo bench --bench join_hash_map -p datafusion-ext-plans --- Cargo.lock | 243 ++++++++++++++++-- Cargo.toml | 1 + native-engine/datafusion-ext-plans/Cargo.toml | 5 + .../benches/join_hash_map.rs | 120 +++++++++ .../src/joins/join_hash_map.rs | 12 +- 5 files changed, 357 insertions(+), 24 deletions(-) create mode 100644 native-engine/datafusion-ext-plans/benches/join_hash_map.rs diff --git a/Cargo.lock b/Cargo.lock index 24b42d5df..e943b2445 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -79,6 +79,18 @@ dependencies = [ "libc", ] +[[package]] +name = "anes" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4b46cbb362ab8752921c97e041f5e366ee6297bd428a31275b9fcf1e380f7299" + +[[package]] +name = "anstyle" +version = "1.0.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "940b3a0ca603d1eade50a4846a2afffd5ef57a9feac2c0e2ec2e14f9ead76000" + [[package]] name = "anyhow" version = "1.0.99" @@ -618,6 +630,12 @@ dependencies = [ "pkg-config", ] +[[package]] +name = "cast" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "37b2a672a2cb129a2e41c10b1224bb368f9f37a2b16b612598138befd7b37eb5" + [[package]] name = "cc" version = "1.2.32" @@ -670,6 +688,58 @@ dependencies = [ "phf", ] +[[package]] +name = "ciborium" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "42e69ffd6f0917f5c029256a24d0161db17cea3997d185db0d35926308770f0e" +dependencies = [ + "ciborium-io", + "ciborium-ll", + "serde", +] + +[[package]] +name = "ciborium-io" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "05afea1e0a06c9be33d539b876f1ce3692f4afea2cb41f740e7743225ed1c757" + +[[package]] +name = "ciborium-ll" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" 
+checksum = "57663b653d948a338bfb3eeba9bb2fd5fcfaecb9e199e87e1eda4d9e8b240fd9" +dependencies = [ + "ciborium-io", + "half", +] + +[[package]] +name = "clap" +version = "4.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1ddb117e43bbf7dacf0a4190fef4d345b9bad68dfc649cb349e7d17d28428e51" +dependencies = [ + "clap_builder", +] + +[[package]] +name = "clap_builder" +version = "4.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "714a53001bf66416adb0e2ef5ac857140e7dc3a0c48fb28b2f10762fc4b5069f" +dependencies = [ + "anstyle", + "clap_lex", +] + +[[package]] +name = "clap_lex" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c8d4a3bb8b1e0c1050499d1815f5ab16d04f0959b233085fb31653fbfc9d98f9" + [[package]] name = "cmake" version = "0.1.54" @@ -764,6 +834,61 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "criterion" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f2b12d017a929603d80db1831cd3a24082f8137ce19c69e6447f54f5fc8d692f" +dependencies = [ + "anes", + "cast", + "ciborium", + "clap", + "criterion-plot", + "is-terminal", + "itertools 0.10.5", + "num-traits", + "once_cell", + "oorandom", + "plotters", + "rayon", + "regex", + "serde", + "serde_derive", + "serde_json", + "tinytemplate", + "walkdir", +] + +[[package]] +name = "criterion-plot" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6b50826342786a51a89e2da3a28f1c32b06e387201bc2d19791f622c673706b1" +dependencies = [ + "cast", + "itertools 0.10.5", +] + +[[package]] +name = "crossbeam-deque" +version = "0.8.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9dd111b7b7f7d55b72c0a6ae361660ee5853c9af73f70c3c2ef6858b950e2e51" +dependencies = [ + "crossbeam-epoch", + "crossbeam-utils", +] + +[[package]] +name = "crossbeam-epoch" +version = "0.9.18" +source = 
"registry+https://github.com/rust-lang/crates.io-index" +checksum = "5b82ac4a3c2ca9c3460964f020e1402edd5753411d7737aa39c3714ad1b5420e" +dependencies = [ + "crossbeam-utils", +] + [[package]] name = "crossbeam-utils" version = "0.8.21" @@ -859,7 +984,7 @@ dependencies = [ "flate2", "futures", "hex", - "itertools", + "itertools 0.14.0", "log", "object_store", "parking_lot", @@ -893,7 +1018,7 @@ dependencies = [ "datafusion-session", "datafusion-sql", "futures", - "itertools", + "itertools 0.14.0", "log", "object_store", "parking_lot", @@ -979,7 +1104,7 @@ dependencies = [ "flate2", "futures", "glob", - "itertools", + "itertools 0.14.0", "log", "object_store", "parquet", @@ -1063,7 +1188,7 @@ dependencies = [ "datafusion-session", "futures", "hex", - "itertools", + "itertools 0.14.0", "log", "object_store", "parking_lot", @@ -1124,7 +1249,7 @@ dependencies = [ "arrow", "datafusion-common", "indexmap", - "itertools", + "itertools 0.14.0", "paste", ] @@ -1139,7 +1264,7 @@ dependencies = [ "byteorder", "chrono", "datafusion", - "itertools", + "itertools 0.14.0", "jni", "log", "lz4_flex 0.13.0", @@ -1162,7 +1287,7 @@ dependencies = [ "auron-jni-bridge", "datafusion", "datafusion-ext-commons", - "itertools", + "itertools 0.14.0", "jni", "log", "once_cell", @@ -1179,7 +1304,7 @@ dependencies = [ "chrono-tz", "datafusion", "datafusion-ext-commons", - "itertools", + "itertools 0.14.0", "jni", "log", "num", @@ -1204,6 +1329,7 @@ dependencies = [ "bytes 1.11.1", "bytesize", "count-write", + "criterion", "datafusion", "datafusion-datasource", "datafusion-datasource-parquet", @@ -1214,7 +1340,7 @@ dependencies = [ "futures", "futures-util", "hashbrown 0.14.5", - "itertools", + "itertools 0.14.0", "jni", "log", "num", @@ -1254,7 +1380,7 @@ dependencies = [ "datafusion-expr-common", "datafusion-macros", "hex", - "itertools", + "itertools 0.14.0", "log", "md-5", "rand", @@ -1312,7 +1438,7 @@ dependencies = [ "datafusion-functions-aggregate-common", "datafusion-macros", 
"datafusion-physical-expr-common", - "itertools", + "itertools 0.14.0", "log", "paste", ] @@ -1380,7 +1506,7 @@ dependencies = [ "datafusion-expr-common", "datafusion-physical-expr", "indexmap", - "itertools", + "itertools 0.14.0", "log", "recursive", "regex", @@ -1402,7 +1528,7 @@ dependencies = [ "half", "hashbrown 0.14.5", "indexmap", - "itertools", + "itertools 0.14.0", "log", "paste", "petgraph 0.8.2", @@ -1418,7 +1544,7 @@ dependencies = [ "datafusion-common", "datafusion-expr-common", "hashbrown 0.14.5", - "itertools", + "itertools 0.14.0", ] [[package]] @@ -1435,7 +1561,7 @@ dependencies = [ "datafusion-physical-expr-common", "datafusion-physical-plan", "datafusion-pruning", - "itertools", + "itertools 0.14.0", "log", "recursive", ] @@ -1462,7 +1588,7 @@ dependencies = [ "half", "hashbrown 0.14.5", "indexmap", - "itertools", + "itertools 0.14.0", "log", "parking_lot", "pin-project-lite", @@ -1485,7 +1611,7 @@ dependencies = [ "datafusion-physical-expr", "datafusion-physical-expr-common", "datafusion-physical-plan", - "itertools", + "itertools 0.14.0", "log", ] @@ -1505,7 +1631,7 @@ dependencies = [ "datafusion-physical-plan", "datafusion-sql", "futures", - "itertools", + "itertools 0.14.0", "log", "object_store", "parking_lot", @@ -2236,6 +2362,15 @@ dependencies = [ "windows-sys 0.59.0", ] +[[package]] +name = "itertools" +version = "0.10.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b0fd2260e829bddf4cb6ea802289de2f86d6a7a690192fbe91b3f46e0f2c8473" +dependencies = [ + "either", +] + [[package]] name = "itertools" version = "0.14.0" @@ -2729,7 +2864,7 @@ dependencies = [ "futures", "http", "humantime", - "itertools", + "itertools 0.14.0", "parking_lot", "percent-encoding", "thiserror 2.0.14", @@ -2747,6 +2882,12 @@ version = "1.21.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9f7c3e4beb33f85d45ae3e3a1792185706c8e16d043238c593331cc7cd313b50" +[[package]] +name = "oorandom" +version = 
"11.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d6790f58c7ff633d8771f42965289203411a5e5c68388703c06e14f24770b41e" + [[package]] name = "orc-rust" version = "0.7.0" @@ -2932,6 +3073,34 @@ version = "0.3.32" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7edddbd0b52d732b21ad9a5fab5c704c14cd949e5e9a1ec5929a24fded1b904c" +[[package]] +name = "plotters" +version = "0.3.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5aeb6f403d7a4911efb1e33402027fc44f29b5bf6def3effcc22d7bb75f2b747" +dependencies = [ + "num-traits", + "plotters-backend", + "plotters-svg", + "wasm-bindgen", + "web-sys", +] + +[[package]] +name = "plotters-backend" +version = "0.3.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "df42e13c12958a16b3f7f4386b9ab1f3e7933914ecea48da7139435263a4172a" + +[[package]] +name = "plotters-svg" +version = "0.3.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "51bae2ac328883f7acdfea3d66a7c35751187f870bc81f94563733a154d7a670" +dependencies = [ + "plotters-backend", +] + [[package]] name = "poem" version = "3.1.12" @@ -3116,7 +3285,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "be769465445e8c1474e9c5dac2018218498557af32d9ed057325ec9a41ae81bf" dependencies = [ "heck", - "itertools", + "itertools 0.14.0", "log", "multimap", "once_cell", @@ -3136,7 +3305,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8a56d757972c98b346a9b766e3f02746cde6dd1cd1d1d563472929fdd74bec4d" dependencies = [ "anyhow", - "itertools", + "itertools 0.14.0", "proc-macro2", "quote", "syn 2.0.104", @@ -3149,7 +3318,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "27c6023962132f4b30eb4c172c91ce92d933da334c59c23cddee82358ddafb0b" dependencies = [ "anyhow", - "itertools", + "itertools 0.14.0", "proc-macro2", "quote", "syn 2.0.104", @@ -3331,6 
+3500,26 @@ dependencies = [ "getrandom 0.3.3", ] +[[package]] +name = "rayon" +version = "1.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fb39b166781f92d482534ef4b4b1b2568f42613b53e5b6c160e24cfbfa30926d" +dependencies = [ + "either", + "rayon-core", +] + +[[package]] +name = "rayon-core" +version = "1.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "22e18b0f0062d30d4230b2e85ff77fdfe4326feb054b9783a3460d8435c8ab91" +dependencies = [ + "crossbeam-deque", + "crossbeam-utils", +] + [[package]] name = "rdkafka" version = "0.36.2" @@ -4088,6 +4277,16 @@ dependencies = [ "zerovec", ] +[[package]] +name = "tinytemplate" +version = "1.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "be4d6b5f19ff7664e8c98d03e2139cb510db9b0a60b55f8e8709b689d939b6bc" +dependencies = [ + "serde", + "serde_json", +] + [[package]] name = "tinyvec" version = "1.9.0" diff --git a/Cargo.toml b/Cargo.toml index 6e285093b..2a3d18abc 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -182,6 +182,7 @@ parking_lot = "0.12.5" paste = "1.0.15" procfs = "0.18.0" prost = "0.14.3" +criterion = { version = "0.5", features = ["html_reports"] } prost-types = "0.14.3" prost-reflect = "0.16.3" rand = "0.9.3" diff --git a/native-engine/datafusion-ext-plans/Cargo.toml b/native-engine/datafusion-ext-plans/Cargo.toml index f21857034..c15492349 100644 --- a/native-engine/datafusion-ext-plans/Cargo.toml +++ b/native-engine/datafusion-ext-plans/Cargo.toml @@ -73,4 +73,9 @@ sonic-rs = { workspace = true } procfs = { workspace = true } [dev-dependencies] +criterion = { workspace = true } rand = { workspace = true } + +[[bench]] +name = "join_hash_map" +harness = false diff --git a/native-engine/datafusion-ext-plans/benches/join_hash_map.rs b/native-engine/datafusion-ext-plans/benches/join_hash_map.rs new file mode 100644 index 000000000..49b095a53 --- /dev/null +++ 
b/native-engine/datafusion-ext-plans/benches/join_hash_map.rs @@ -0,0 +1,120 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Microbenchmarks for JoinHashMap::lookup_many. +//! +//! Sweeps three build-side sizes typical of broadcast hash join (BHJ) builds: +//! - 5M keys (~128 MB map) +//! - 10M keys (~256 MB map) +//! - 20M keys (~512 MB map) +//! +//! For each map size, three probe hit rates are measured: +//! - 0% (all misses) +//! - 50% (half hit, half miss) +//! - 100% (all hits — typical inner-join scenario) +//! +//! Probe batch size is fixed at 4096 to match a typical Spark batch. + +use std::sync::Arc; + +use arrow::{ + array::Int32Array, + datatypes::{DataType, Field, Schema}, + record_batch::RecordBatch, +}; +use criterion::{BenchmarkId, Criterion, black_box, criterion_group, criterion_main}; +use datafusion_ext_plans::joins::join_hash_map::JoinHashMap; + +/// Probe batch size — matches the default Spark vectorized batch size. +const PROBE_SIZE: usize = 4096; + +/// Build-side hash base: top bit always set so hashes are non-zero +/// (matches join_create_hashes convention). 
+const BUILD_HASH_BASE: u32 = 0x8000_0001; + +fn make_map(build_size: usize) -> (JoinHashMap, Vec<u32>) { + let values: Vec<i32> = (0..build_size as i32).collect(); + let array = Arc::new(Int32Array::from(values)); + let schema = Arc::new(Schema::new(vec![Field::new("k", DataType::Int32, false)])); + let batch = RecordBatch::try_new(schema, vec![array.clone() as _]).expect("build batch"); + + let build_hashes: Vec<u32> = (0..build_size as u32) + .map(|i| BUILD_HASH_BASE.wrapping_add(i)) + .collect(); + + let map = JoinHashMap::create_from_data_batch_and_hashes( + batch, + vec![array as _], + build_hashes.clone(), + ) + .expect("build map"); + + (map, build_hashes) +} + +/// Generate PROBE_SIZE hashes at the given hit rate. +/// Hits and misses are interleaved evenly (Bresenham) to avoid cache grouping +/// bias. +fn make_probe_hashes(build_hashes: &[u32], build_size: usize, hit_rate: f64) -> Vec<u32> { + // Miss hashes start well above the build range to avoid accidental collision. + let miss_base: u32 = BUILD_HASH_BASE.wrapping_add(build_size as u32 + 0x0010_0000); + let build_len = build_hashes.len(); + (0..PROBE_SIZE) + .map(|i| { + let cumulative = ((i + 1) as f64 * hit_rate) as usize; + let prev = (i as f64 * hit_rate) as usize; + if cumulative > prev { + build_hashes[i % build_len] + } else { + miss_base.wrapping_add(i as u32) + } + }) + .collect() +} + +fn bench_lookup_many(c: &mut Criterion) { + // (label, build_size) + // map memory ≈ (build_size * 2 / 8).next_power_of_two() * 64 bytes + // 5M → ~128 MB (realistic BHJ, ~50 MB serialized data) + // 10M → ~256 MB (realistic BHJ, ~100–200 MB serialized data) + // 20M → ~512 MB (realistic BHJ, ~1 GB serialized data) + let build_sizes: &[(&str, usize)] = &[ + ("build=5M", 5_000_000), + ("build=10M", 10_000_000), + ("build=20M", 20_000_000), + ]; + let hit_rates: &[(&str, f64)] = &[("hit=0%", 0.0), ("hit=50%", 0.5), ("hit=100%", 1.0)]; + + let mut group = c.benchmark_group("JoinHashMap::lookup_many"); + + for &(size_label, 
build_size) in build_sizes { + let (map, build_hashes) = make_map(build_size); + for &(rate_label, hit_rate) in hit_rates { + let probe = make_probe_hashes(&build_hashes, build_size, hit_rate); + let label = format!("{size_label}/{rate_label}"); + group.bench_with_input(BenchmarkId::from_parameter(&label), &label, |b, _| { + b.iter(|| { + let result = map.lookup_many(black_box(probe.clone())); + black_box(result) + }); + }); + } + } + + group.finish(); +} + +criterion_group!(benches, bench_lookup_many); +criterion_main!(benches); diff --git a/native-engine/datafusion-ext-plans/src/joins/join_hash_map.rs b/native-engine/datafusion-ext-plans/src/joins/join_hash_map.rs index 686dd9f0b..fbef6febc 100644 --- a/native-engine/datafusion-ext-plans/src/joins/join_hash_map.rs +++ b/native-engine/datafusion-ext-plans/src/joins/join_hash_map.rs @@ -253,15 +253,23 @@ impl Table { let mut e = entries![i] as usize; loop { let hash_matched = self.map[e].hashes.simd_eq(Simd::splat(hashes[i])); - let empty = self.map[e].hashes.simd_eq(Simd::splat(0)); - if let Some(pos) = (hash_matched | empty).first_set() { + // Fast path: check hash match first (common case) + if let Some(pos) = hash_matched.first_set() { hashes[i] = unsafe { // safety: transmute MapValue(u32) to u32 std::mem::transmute(self.map[e].values[pos]) }; break; } + + // Slow path: check empty slot only when no match + let empty = self.map[e].hashes.simd_eq(Simd::splat(0)); + if empty.any() { + hashes[i] = MapValue::EMPTY.0; + break; + } + e += 1; e %= 1 << self.map_mod_bits; } From e128f56585a69d116bbf9935baaf5dce90c6e23f Mon Sep 17 00:00:00 2001 From: yew1eb Date: Tue, 28 Apr 2026 17:46:50 +0800 Subject: [PATCH 2/2] trigger ci