From af49147e37b7a9c2822a6ce628d5a8ed9aff4e55 Mon Sep 17 00:00:00 2001 From: Jan Michael Auer Date: Thu, 12 Mar 2026 12:58:30 +0100 Subject: [PATCH 1/2] feat(bench): Add randomized object sizes to bigtable-bench Replace the fixed --size flag with --p50 and --p99 flags that define a LogNormal size distribution, matching the approach used in stresstest. Object sizes are capped at 950KiB. Throughput stats now track actual bytes written instead of assuming a fixed size. Defaults: p50=2KiB, p99=385KiB. Co-Authored-By: Claude --- Cargo.lock | 1 + bigtable-bench/Cargo.toml | 1 + bigtable-bench/README.md | 3 ++- bigtable-bench/src/main.rs | 41 ++++++++++++++++++++++++++++++-------- 4 files changed, 37 insertions(+), 9 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 9b596bba..c04a717b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -441,6 +441,7 @@ dependencies = [ "objectstore-service", "objectstore-types", "rand 0.9.2", + "rand_distr", "rustls", "sketches-ddsketch", "tokio", diff --git a/bigtable-bench/Cargo.toml b/bigtable-bench/Cargo.toml index 300143fa..53f4fcb6 100644 --- a/bigtable-bench/Cargo.toml +++ b/bigtable-bench/Cargo.toml @@ -19,6 +19,7 @@ mimalloc = { workspace = true } objectstore-service = { workspace = true } objectstore-types = { workspace = true } rand = { workspace = true, features = ["small_rng"] } +rand_distr = "0.5.1" rustls = { version = "0.23", default-features = false } sketches-ddsketch = "0.3.0" tokio = { workspace = true, features = ["rt-multi-thread", "sync", "macros", "signal"] } diff --git a/bigtable-bench/README.md b/bigtable-bench/README.md index 73fd0622..cd1e339d 100644 --- a/bigtable-bench/README.md +++ b/bigtable-bench/README.md @@ -17,7 +17,8 @@ cargo run --release -p bigtable-bench -- [OPTIONS] | Flag | Default | Description | |------|---------|-------------| | `-c` | `10` | Concurrent put requests | -| `-s` | `50KB` | Object size (e.g. `4096`, `1MB`) | +| `--p50` | `2KiB` | Median object size (p50) | +| `--p99` | `385KiB` | 99th-percentile object size (p99) | | `-p` | `1` | Bigtable gRPC connection pool size | | `-a` | — | Emulator address (omit for real GCP) | | `--project` | `testing` | GCP project ID | diff --git a/bigtable-bench/src/main.rs b/bigtable-bench/src/main.rs index ae0c18de..55fa1d66 100644 --- a/bigtable-bench/src/main.rs +++ b/bigtable-bench/src/main.rs @@ -14,6 +14,7 @@ use bytesize::ByteSize; use futures_util::StreamExt; use rand::rngs::SmallRng; use rand::{Rng, SeedableRng}; +use rand_distr::{Distribution, LogNormal}; use sketches_ddsketch::DDSketch; use tokio::sync::Semaphore; use yansi::Paint; @@ -31,9 +32,13 @@ struct Args { #[argh(option, short = 'c', default = "10")] concurrency: usize, - /// object size, e.g. "50KB" or "4096" (default: 50KB) - #[argh(option, short = 's', default = "ByteSize::kb(50)")] - size: ByteSize, + /// median object size (p50), e.g. "50KB" (default: 2KiB) + #[argh(option, default = "ByteSize::kib(2)")] + p50: ByteSize, + + /// 99th-percentile object size (p99), e.g. "200KB" (default: 385KiB) + #[argh(option, default = "ByteSize::kib(385)")] + p99: ByteSize, /// bigtable gRPC connection pool size (default: 1) #[argh(option, short = 'p', default = "1")] @@ -66,7 +71,13 @@ async fn main() -> anyhow::Result<()> { .ok(); // INVARIANT: no other code installs a provider before main() let args: Args = argh::from_env(); - let object_size = args.size.as_u64() as usize; + + let p50 = args.p50.as_u64() as f64; + let p99 = args.p99.as_u64() as f64; + let mu = p50.ln(); + let sigma = (p99.ln() - mu) / 2.3263; + let size_distribution = LogNormal::new(mu, sigma) + .expect("invalid size distribution parameters"); // INVARIANT: p50 and p99 are positive match &args.addr { Some(addr) => eprintln!( @@ -92,20 +103,27 @@ async fn main() -> anyhow::Result<()> { let backend = Arc::new(backend); eprintln!( - "starting benchmark: concurrency={}, object_size={}", - args.concurrency, args.size, + "starting benchmark: concurrency={}, p50={}, p99={}", + args.concurrency, args.p50, args.p99, ); + /// Maximum object size regardless of distribution output. + const MAX_OBJECT_SIZE: u64 = 950 * 1024; + let semaphore = Arc::new(Semaphore::new(args.concurrency)); let sketch = Arc::new(Mutex::new(DDSketch::default())); let failures = Arc::new(AtomicUsize::new(0)); let shutdown = tokio_util::sync::CancellationToken::new(); let pool_size = args.pool; + let size_distribution = Arc::new(size_distribution); let bench_start = Instant::now(); + let total_bytes = Arc::new(AtomicUsize::new(0)); + // Stats printer task. let stats_sketch = Arc::clone(&sketch); let stats_failures = Arc::clone(&failures); + let stats_bytes = Arc::clone(&total_bytes); let stats_shutdown = shutdown.clone(); let stats_handle = tokio::spawn(async move { let mut interval = tokio::time::interval(Duration::from_secs(1)); @@ -123,7 +141,8 @@ async fn main() -> anyhow::Result<()> { } let ops_per_sec = ops as f64 / elapsed.as_secs_f64(); let ops_per_sec_per_conn = ops_per_sec / pool_size as f64; - let bytes_per_sec = ops_per_sec * object_size as f64; + let bytes_per_sec = + stats_bytes.load(Ordering::Relaxed) as f64 / elapsed.as_secs_f64(); let avg = Duration::from_secs_f64(guard.sum().unwrap() / ops as f64); let p50 = Duration::from_secs_f64(guard.quantile(0.5).unwrap().unwrap()); let p95 = Duration::from_secs_f64(guard.quantile(0.95).unwrap().unwrap()); @@ -160,6 +179,7 @@ async fn main() -> anyhow::Result<()> { let worker_shutdown = shutdown.clone(); let worker_sketch = Arc::clone(&sketch); let worker_failures = Arc::clone(&failures); + let worker_bytes = Arc::clone(&total_bytes); let worker_handle = tokio::spawn(async move { let context = ObjectContext { usecase: "bench".into(), @@ -184,6 +204,8 @@ async fn main() -> anyhow::Result<()> { let backend = Arc::clone(&backend); let sketch = Arc::clone(&worker_sketch); let failures = Arc::clone(&worker_failures); + let bytes = Arc::clone(&worker_bytes); + let size_dist = Arc::clone(&size_distribution); let context = context.clone(); let metadata = metadata.clone(); @@ -191,6 +213,8 @@ async fn main() -> anyhow::Result<()> { let _permit = permit; let mut rng = SmallRng::from_os_rng(); + let object_size = + (size_dist.sample(&mut rng) as u64).min(MAX_OBJECT_SIZE) as usize; let mut buf = vec![0u8; object_size]; rng.fill(&mut buf[..]); @@ -207,6 +231,7 @@ async fn main() -> anyhow::Result<()> { if result.is_err() { failures.fetch_add(1, Ordering::Relaxed); } else { + bytes.fetch_add(object_size, Ordering::Relaxed); sketch.lock().unwrap().add(elapsed.as_secs_f64()); // INVARIANT: no panic inside lock } }); @@ -233,7 +258,7 @@ async fn main() -> anyhow::Result<()> { let elapsed = bench_start.elapsed(); let ops_per_sec = ops as f64 / elapsed.as_secs_f64(); let ops_per_sec_per_conn = ops_per_sec / pool_size as f64; - let bytes_per_sec = ops_per_sec * object_size as f64; + let bytes_per_sec = total_bytes.load(Ordering::Relaxed) as f64 / elapsed.as_secs_f64(); let avg = Duration::from_secs_f64(guard.sum().unwrap() / ops as f64); let p50 = Duration::from_secs_f64(guard.quantile(0.5).unwrap().unwrap()); let p95 = Duration::from_secs_f64(guard.quantile(0.95).unwrap().unwrap()); From 51a0ec4691a622bb97f3f39f514844bd7e8c0766 Mon Sep 17 00:00:00 2001 From: Jan Michael Auer Date: Thu, 12 Mar 2026 13:05:09 +0100 Subject: [PATCH 2/2] style: Apply rustfmt formatting --- bigtable-bench/src/main.rs | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/bigtable-bench/src/main.rs b/bigtable-bench/src/main.rs index 55fa1d66..a2a37ac7 100644 --- a/bigtable-bench/src/main.rs +++ b/bigtable-bench/src/main.rs @@ -76,8 +76,8 @@ async fn main() -> anyhow::Result<()> { let p99 = args.p99.as_u64() as f64; let mu = p50.ln(); let sigma = (p99.ln() - mu) / 2.3263; - let size_distribution = LogNormal::new(mu, sigma) - .expect("invalid size distribution parameters"); // INVARIANT: p50 and p99 are positive + let size_distribution = + LogNormal::new(mu, sigma).expect("invalid size distribution parameters"); // INVARIANT: p50 and p99 are positive match &args.addr { Some(addr) => eprintln!( @@ -141,8 +141,7 @@ async fn main() -> anyhow::Result<()> { } let ops_per_sec = ops as f64 / elapsed.as_secs_f64(); let ops_per_sec_per_conn = ops_per_sec / pool_size as f64; - let bytes_per_sec = - stats_bytes.load(Ordering::Relaxed) as f64 / elapsed.as_secs_f64(); + let bytes_per_sec = stats_bytes.load(Ordering::Relaxed) as f64 / elapsed.as_secs_f64(); let avg = Duration::from_secs_f64(guard.sum().unwrap() / ops as f64); let p50 = Duration::from_secs_f64(guard.quantile(0.5).unwrap().unwrap()); let p95 = Duration::from_secs_f64(guard.quantile(0.95).unwrap().unwrap()); @@ -213,8 +212,7 @@ async fn main() -> anyhow::Result<()> { let _permit = permit; let mut rng = SmallRng::from_os_rng(); - let object_size = - (size_dist.sample(&mut rng) as u64).min(MAX_OBJECT_SIZE) as usize; + let object_size = (size_dist.sample(&mut rng) as u64).min(MAX_OBJECT_SIZE) as usize; let mut buf = vec![0u8; object_size]; rng.fill(&mut buf[..]);