From 1ff25ba2183a32594427bb4e1fc40df2be25be2b Mon Sep 17 00:00:00 2001 From: Jan Michael Auer Date: Thu, 5 Mar 2026 17:07:37 +0100 Subject: [PATCH 1/6] feat(bench): Add Bigtable load testing tool Adds bigtable-bench, a CLI tool that writes objects directly to the BigTableBackend and prints live latency percentiles, throughput, and failure counts. Supports configurable concurrency, object size, connection pool size, and emulator address (for use with Toxiproxy). --- Cargo.lock | 19 +++ Cargo.toml | 2 +- bigtable-bench/Cargo.toml | 25 ++++ bigtable-bench/src/main.rs | 228 +++++++++++++++++++++++++++++++++ objectstore-service/src/lib.rs | 3 +- 5 files changed, 275 insertions(+), 2 deletions(-) create mode 100644 bigtable-bench/Cargo.toml create mode 100644 bigtable-bench/src/main.rs diff --git a/Cargo.lock b/Cargo.lock index a2a6511f..5ea3a4b0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -428,6 +428,25 @@ version = "1.8.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2af50177e190e07a26ab74f8b1efbfe2ef87da2116221318cb1c2e82baf7de06" +[[package]] +name = "bigtable-bench" +version = "0.1.0" +dependencies = [ + "anyhow", + "argh", + "bytes", + "bytesize", + "futures-util", + "mimalloc", + "objectstore-service", + "objectstore-types", + "rand 0.9.2", + "sketches-ddsketch", + "tokio", + "tokio-util", + "yansi", +] + [[package]] name = "bigtable_rs" version = "0.2.19" diff --git a/Cargo.toml b/Cargo.toml index b5b0a3c7..7411341e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [workspace] resolver = "3" -members = ["objectstore-*", "clients/rust", "stresstest"] +members = ["objectstore-*", "clients/rust", "stresstest", "bigtable-bench"] default-members = ["objectstore-server"] [profile.dev] diff --git a/bigtable-bench/Cargo.toml b/bigtable-bench/Cargo.toml new file mode 100644 index 00000000..cf98b863 --- /dev/null +++ b/bigtable-bench/Cargo.toml @@ -0,0 +1,25 @@ +[package] +name = "bigtable-bench" +authors = ["Sentry "] +description = 
"Benchmark tool for writing objects directly to the Bigtable backend" +homepage = "https://getsentry.github.io/objectstore/" +repository = "https://github.com/getsentry/objectstore" +license-file = "../LICENSE.md" +version = "0.1.0" +edition = "2024" +publish = false + +[dependencies] +anyhow = { workspace = true } +argh = "0.1.13" +bytes = { workspace = true } +bytesize = "2.0.1" +futures-util = { workspace = true } +mimalloc = { workspace = true } +objectstore-service = { workspace = true } +objectstore-types = { workspace = true } +rand = { workspace = true, features = ["small_rng"] } +sketches-ddsketch = "0.3.0" +tokio = { workspace = true, features = ["rt-multi-thread", "sync", "macros", "signal"] } +tokio-util = { workspace = true } +yansi = "1.0.1" diff --git a/bigtable-bench/src/main.rs b/bigtable-bench/src/main.rs new file mode 100644 index 00000000..dd6cb004 --- /dev/null +++ b/bigtable-bench/src/main.rs @@ -0,0 +1,228 @@ +//! Benchmark tool for writing objects directly to the Bigtable backend. +//! +//! Bypasses the HTTP layer and writes directly via the `BigTableBackend`, +//! printing live latency percentiles every second. Intended for use with the +//! Bigtable emulator (`devservices up --mode=full`). + +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::{Arc, Mutex}; +use std::time::{Duration, Instant}; + +use anyhow::Context; +use argh::FromArgs; +use bytesize::ByteSize; +use futures_util::StreamExt; +use rand::rngs::SmallRng; +use rand::{Rng, SeedableRng}; +use sketches_ddsketch::DDSketch; +use tokio::sync::Semaphore; +use yansi::Paint; + +use objectstore_service::backend::bigtable::BigTableBackend; +use objectstore_service::backend::common::Backend; +use objectstore_service::id::{ObjectContext, ObjectId}; +use objectstore_types::metadata::Metadata; +use objectstore_types::scope::{Scope, Scopes}; + +/// Benchmark tool for the Bigtable storage backend. 
+#[derive(Debug, FromArgs)] +struct Args { + /// number of concurrent put requests (default: 10) + #[argh(option, short = 'c', default = "10")] + concurrency: usize, + + /// object size, e.g. "50KB" or "4096" (default: 50KB) + #[argh(option, short = 's', default = "ByteSize::kb(50)")] + size: ByteSize, + + /// bigtable gRPC connection pool size (default: 1) + #[argh(option, short = 'p', default = "1")] + pool: usize, + + /// bigtable emulator address (default: localhost:8086) + #[argh(option, short = 'a', default = "String::from(\"localhost:8086\")")] + addr: String, +} + +#[global_allocator] +static GLOBAL: mimalloc::MiMalloc = mimalloc::MiMalloc; + +#[tokio::main] +async fn main() -> anyhow::Result<()> { + let args: Args = argh::from_env(); + let object_size = args.size.as_u64() as usize; + + eprintln!( + "connecting to Bigtable emulator ({}), pool size {}", + args.addr, args.pool + ); + + let backend = BigTableBackend::new( + Some(&args.addr), + "testing", + "objectstore", + "objectstore", + Some(args.pool), + ) + .await + .context("failed to connect to Bigtable emulator")?; + + let backend = Arc::new(backend); + + eprintln!( + "starting benchmark: concurrency={}, object_size={}", + args.concurrency, args.size, + ); + + let semaphore = Arc::new(Semaphore::new(args.concurrency)); + let sketch = Arc::new(Mutex::new(DDSketch::default())); + let failures = Arc::new(AtomicUsize::new(0)); + let shutdown = tokio_util::sync::CancellationToken::new(); + let concurrency = args.concurrency; + let bench_start = Instant::now(); + + // Stats printer task. + let stats_sketch = Arc::clone(&sketch); + let stats_failures = Arc::clone(&failures); + let stats_shutdown = shutdown.clone(); + let stats_handle = tokio::spawn(async move { + let mut interval = tokio::time::interval(Duration::from_secs(1)); + let start = Instant::now(); + loop { + tokio::select! 
{ + _ = interval.tick() => {} + () = stats_shutdown.cancelled() => break, + } + let elapsed = start.elapsed(); + let guard = stats_sketch.lock().unwrap(); // INVARIANT: no panic inside lock + let ops = guard.count(); + if ops == 0 { + continue; + } + let ops_per_sec = ops as f64 / elapsed.as_secs_f64(); + let ops_per_sec_per_conn = ops_per_sec / concurrency as f64; + let avg = Duration::from_secs_f64(guard.sum().unwrap() / ops as f64); + let p50 = Duration::from_secs_f64(guard.quantile(0.5).unwrap().unwrap()); + let p95 = Duration::from_secs_f64(guard.quantile(0.95).unwrap().unwrap()); + let p99 = Duration::from_secs_f64(guard.quantile(0.99).unwrap().unwrap()); + let max = Duration::from_secs_f64(guard.max().unwrap()); + drop(guard); + let failed = stats_failures.load(Ordering::Relaxed); + + eprint!( + "\x1b[2K[{} ops | {:.0} ops/s | {:.2} ops/s/conn] avg: {} p50: {} p95: {} p99: {} max: {} ", + ops.bold(), + ops_per_sec.bold(), + ops_per_sec_per_conn.bold(), + format_ms(avg).bold(), + format_ms(p50), + format_ms(p95), + format_ms(p99), + format_ms(max), + ); + if failed > 0 { + eprint!("\n\x1b[2Kfailed: {}\x1b[1A", failed.red().bold()); + } + eprint!("\r"); + } + }); + + // Worker loop — spawns tasks up to the semaphore limit until ctrl+c. 
+ let worker_shutdown = shutdown.clone(); + let worker_sketch = Arc::clone(&sketch); + let worker_failures = Arc::clone(&failures); + let worker_handle = tokio::spawn(async move { + let context = ObjectContext { + usecase: "bench".into(), + scopes: Scopes::from_iter([ + Scope::create("org", "bench").unwrap(), // INVARIANT: valid scope name + ]), + }; + let metadata = Metadata::default(); + + loop { + if worker_shutdown.is_cancelled() { + break; + } + + let permit = match semaphore.clone().acquire_owned().await { + Ok(p) => p, + Err(_) => break, // semaphore closed + }; + + let backend = Arc::clone(&backend); + let sketch = Arc::clone(&worker_sketch); + let failures = Arc::clone(&worker_failures); + let context = context.clone(); + let metadata = metadata.clone(); + + tokio::spawn(async move { + let _permit = permit; + + let mut rng = SmallRng::from_os_rng(); + let mut buf = vec![0u8; object_size]; + rng.fill(&mut buf[..]); + + let id = ObjectId::random(context); + let stream = futures_util::stream::once(async { + Ok::<_, std::io::Error>(bytes::Bytes::from(buf)) + }) + .boxed(); + + let start = Instant::now(); + let result = backend.put_object(&id, &metadata, stream).await; + let elapsed = start.elapsed(); + + if result.is_err() { + failures.fetch_add(1, Ordering::Relaxed); + } else { + sketch.lock().unwrap().add(elapsed.as_secs_f64()); // INVARIANT: no panic inside lock + } + }); + } + }); + + tokio::signal::ctrl_c() + .await + .context("failed to listen for ctrl+c")?; + + shutdown.cancel(); + + let _ = stats_handle.await; + let _ = worker_handle.await; + + // Clear the two-line live stats display before printing the final summary. + eprint!("\r\x1b[2K\n\x1b[2K\x1b[1A\r"); + + // Print final summary. 
+ let guard = sketch.lock().unwrap(); // INVARIANT: no panic inside lock + let ops = guard.count(); + if ops > 0 { + let elapsed = bench_start.elapsed(); + let ops_per_sec = ops as f64 / elapsed.as_secs_f64(); + let ops_per_sec_per_conn = ops_per_sec / concurrency as f64; + let avg = Duration::from_secs_f64(guard.sum().unwrap() / ops as f64); + let p50 = Duration::from_secs_f64(guard.quantile(0.5).unwrap().unwrap()); + let p95 = Duration::from_secs_f64(guard.quantile(0.95).unwrap().unwrap()); + let p99 = Duration::from_secs_f64(guard.quantile(0.99).unwrap().unwrap()); + let max = Duration::from_secs_f64(guard.max().unwrap()); + eprintln!( + "\nfinal: {} ops | {:.0} ops/s | {:.2} ops/s/conn | avg: {} p50: {} p95: {} p99: {} max: {}", + ops.bold(), + ops_per_sec.bold(), + ops_per_sec_per_conn.bold(), + format_ms(avg).bold(), + format_ms(p50), + format_ms(p95), + format_ms(p99), + format_ms(max), + ); + } + + Ok(()) +} + +/// Formats a [`Duration`] as milliseconds with two decimal places. +fn format_ms(d: Duration) -> String { + format!("{:.2}ms", d.as_secs_f64() * 1000.0) +} diff --git a/objectstore-service/src/lib.rs b/objectstore-service/src/lib.rs index 4c973bb5..258359d3 100644 --- a/objectstore-service/src/lib.rs +++ b/objectstore-service/src/lib.rs @@ -2,7 +2,8 @@ #![warn(missing_docs)] #![warn(missing_debug_implementations)] -mod backend; +#[allow(missing_docs, private_interfaces)] +pub mod backend; mod concurrency; pub mod error; pub mod id; From 03400b091fc3e92e9895f968276289ff46a5a4a1 Mon Sep 17 00:00:00 2001 From: Jan Michael Auer Date: Thu, 5 Mar 2026 17:12:44 +0100 Subject: [PATCH 2/6] feat(bench): Support real GCP Bigtable and add 1h TTL Adds --project, --instance, --table flags and makes --addr optional so the bench can connect to a real GCP Bigtable instance (using gcp_auth for credentials) in addition to the local emulator. All objects are now written with a 1h TTL to avoid polluting production tables during load tests. 
--- bigtable-bench/src/main.rs | 49 +++++++++++++++++++++++++++----------- 1 file changed, 35 insertions(+), 14 deletions(-) diff --git a/bigtable-bench/src/main.rs b/bigtable-bench/src/main.rs index dd6cb004..ff2e03dd 100644 --- a/bigtable-bench/src/main.rs +++ b/bigtable-bench/src/main.rs @@ -21,7 +21,7 @@ use yansi::Paint; use objectstore_service::backend::bigtable::BigTableBackend; use objectstore_service::backend::common::Backend; use objectstore_service::id::{ObjectContext, ObjectId}; -use objectstore_types::metadata::Metadata; +use objectstore_types::metadata::{ExpirationPolicy, Metadata}; use objectstore_types::scope::{Scope, Scopes}; /// Benchmark tool for the Bigtable storage backend. @@ -39,9 +39,21 @@ struct Args { #[argh(option, short = 'p', default = "1")] pool: usize, - /// bigtable emulator address (default: localhost:8086) - #[argh(option, short = 'a', default = "String::from(\"localhost:8086\")")] - addr: String, + /// bigtable emulator address; omit to connect to real GCP Bigtable + #[argh(option, short = 'a')] + addr: Option<String>, + + /// GCP project ID (default: testing) + #[argh(option, default = "String::from(\"testing\")")] + project: String, + + /// bigtable instance name (default: objectstore) + #[argh(option, default = "String::from(\"objectstore\")")] + instance: String, + + /// bigtable table name (default: objectstore) + #[argh(option, default = "String::from(\"objectstore\")")] + table: String, } #[global_allocator] @@ -52,20 +64,26 @@ async fn main() -> anyhow::Result<()> { let args: Args = argh::from_env(); let object_size = args.size.as_u64() as usize; - eprintln!( - "connecting to Bigtable emulator ({}), pool size {}", - args.addr, args.pool - ); + match &args.addr { + Some(addr) => eprintln!( + "connecting to Bigtable emulator ({addr}), pool size {}", + args.pool + ), + None => eprintln!( + "connecting to GCP Bigtable ({}/{}), pool size {}", + args.project, args.instance, args.pool + ), + } let backend = BigTableBackend::new( - 
Some(&args.addr), - "testing", - "objectstore", - "objectstore", + args.addr.as_deref(), + &args.project, + &args.instance, + &args.table, Some(args.pool), ) .await - .context("failed to connect to Bigtable emulator")?; + .context("failed to connect to Bigtable")?; let backend = Arc::new(backend); @@ -138,7 +156,10 @@ async fn main() -> anyhow::Result<()> { Scope::create("org", "bench").unwrap(), // INVARIANT: valid scope name ]), }; - let metadata = Metadata::default(); + let metadata = Metadata { + expiration_policy: ExpirationPolicy::TimeToLive(Duration::from_secs(3600)), + ..Metadata::default() + }; loop { if worker_shutdown.is_cancelled() { From 1c5becd6b93250fe0f8043aa956d8873671ba5a1 Mon Sep 17 00:00:00 2001 From: Jan Michael Auer Date: Thu, 5 Mar 2026 17:14:18 +0100 Subject: [PATCH 3/6] fix(bench): Install rustls CryptoProvider for GCP connections When connecting to real GCP Bigtable, rustls requires an explicit CryptoProvider because both aws-lc-rs and ring are enabled as transitive dependencies. Install aws-lc-rs as the default at startup. 
--- Cargo.lock | 1 + bigtable-bench/Cargo.toml | 1 + bigtable-bench/src/main.rs | 4 ++++ 3 files changed, 6 insertions(+) diff --git a/Cargo.lock b/Cargo.lock index 5ea3a4b0..df606c8d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -441,6 +441,7 @@ dependencies = [ "objectstore-service", "objectstore-types", "rand 0.9.2", + "rustls", "sketches-ddsketch", "tokio", "tokio-util", diff --git a/bigtable-bench/Cargo.toml b/bigtable-bench/Cargo.toml index cf98b863..300143fa 100644 --- a/bigtable-bench/Cargo.toml +++ b/bigtable-bench/Cargo.toml @@ -19,6 +19,7 @@ mimalloc = { workspace = true } objectstore-service = { workspace = true } objectstore-types = { workspace = true } rand = { workspace = true, features = ["small_rng"] } +rustls = { version = "0.23", default-features = false } sketches-ddsketch = "0.3.0" tokio = { workspace = true, features = ["rt-multi-thread", "sync", "macros", "signal"] } tokio-util = { workspace = true } diff --git a/bigtable-bench/src/main.rs b/bigtable-bench/src/main.rs index ff2e03dd..c1a5dadf 100644 --- a/bigtable-bench/src/main.rs +++ b/bigtable-bench/src/main.rs @@ -61,6 +61,10 @@ static GLOBAL: mimalloc::MiMalloc = mimalloc::MiMalloc; #[tokio::main] async fn main() -> anyhow::Result<()> { + rustls::crypto::aws_lc_rs::default_provider() + .install_default() + .ok(); // INVARIANT: no other code installs a provider before main() + let args: Args = argh::from_env(); let object_size = args.size.as_u64() as usize; From e7c160baa19b1aca6d594cec8aeaa3bcdefb0881 Mon Sep 17 00:00:00 2001 From: Jan Michael Auer Date: Thu, 5 Mar 2026 17:27:18 +0100 Subject: [PATCH 4/6] feat(bench): Add network throughput display with SI units Prints bytes/s alongside ops/s in both the live display and final summary, using SI decimal prefixes (kB/s, MB/s, GB/s). 
--- bigtable-bench/src/main.rs | 21 +++++++++++++++++++-- 1 file changed, 19 insertions(+), 2 deletions(-) diff --git a/bigtable-bench/src/main.rs b/bigtable-bench/src/main.rs index c1a5dadf..ea0826c0 100644 --- a/bigtable-bench/src/main.rs +++ b/bigtable-bench/src/main.rs @@ -123,6 +123,7 @@ async fn main() -> anyhow::Result<()> { } let ops_per_sec = ops as f64 / elapsed.as_secs_f64(); let ops_per_sec_per_conn = ops_per_sec / concurrency as f64; + let bytes_per_sec = ops_per_sec * object_size as f64; let avg = Duration::from_secs_f64(guard.sum().unwrap() / ops as f64); let p50 = Duration::from_secs_f64(guard.quantile(0.5).unwrap().unwrap()); let p95 = Duration::from_secs_f64(guard.quantile(0.95).unwrap().unwrap()); @@ -132,10 +133,11 @@ async fn main() -> anyhow::Result<()> { let failed = stats_failures.load(Ordering::Relaxed); eprint!( - "\x1b[2K[{} ops | {:.0} ops/s | {:.2} ops/s/conn] avg: {} p50: {} p95: {} p99: {} max: {} ", + "\x1b[2K[{} ops | {:.0} ops/s | {:.2} ops/s/conn | {}/s] avg: {} p50: {} p95: {} p99: {} max: {} ", ops.bold(), ops_per_sec.bold(), ops_per_sec_per_conn.bold(), + format_throughput(bytes_per_sec).bold(), format_ms(avg).bold(), format_ms(p50), format_ms(p95), @@ -226,16 +228,18 @@ async fn main() -> anyhow::Result<()> { let elapsed = bench_start.elapsed(); let ops_per_sec = ops as f64 / elapsed.as_secs_f64(); let ops_per_sec_per_conn = ops_per_sec / concurrency as f64; + let bytes_per_sec = ops_per_sec * object_size as f64; let avg = Duration::from_secs_f64(guard.sum().unwrap() / ops as f64); let p50 = Duration::from_secs_f64(guard.quantile(0.5).unwrap().unwrap()); let p95 = Duration::from_secs_f64(guard.quantile(0.95).unwrap().unwrap()); let p99 = Duration::from_secs_f64(guard.quantile(0.99).unwrap().unwrap()); let max = Duration::from_secs_f64(guard.max().unwrap()); eprintln!( - "\nfinal: {} ops | {:.0} ops/s | {:.2} ops/s/conn | avg: {} p50: {} p95: {} p99: {} max: {}", + "\nfinal: {} ops | {:.0} ops/s | {:.2} ops/s/conn | {}/s | avg: 
{} p50: {} p95: {} p99: {} max: {}", ops.bold(), ops_per_sec.bold(), ops_per_sec_per_conn.bold(), + format_throughput(bytes_per_sec).bold(), format_ms(avg).bold(), format_ms(p50), format_ms(p95), @@ -251,3 +255,16 @@ async fn main() -> anyhow::Result<()> { fn format_ms(d: Duration) -> String { format!("{:.2}ms", d.as_secs_f64() * 1000.0) } + +/// Formats a byte rate using SI decimal prefixes (kB/s, MB/s, GB/s). +fn format_throughput(bytes_per_sec: f64) -> String { + if bytes_per_sec >= 1e9 { + format!("{:.2} GB/s", bytes_per_sec / 1e9) + } else if bytes_per_sec >= 1e6 { + format!("{:.2} MB/s", bytes_per_sec / 1e6) + } else if bytes_per_sec >= 1e3 { + format!("{:.2} kB/s", bytes_per_sec / 1e3) + } else { + format!("{:.0} B/s", bytes_per_sec) + } +} From 385b045e1146f16157c14c5c206c4c6c50e27847 Mon Sep 17 00:00:00 2001 From: Jan Michael Auer Date: Thu, 5 Mar 2026 17:29:35 +0100 Subject: [PATCH 5/6] fix(bench): Remove duplicate /s suffix in throughput display --- bigtable-bench/src/main.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/bigtable-bench/src/main.rs b/bigtable-bench/src/main.rs index ea0826c0..74cf0bf4 100644 --- a/bigtable-bench/src/main.rs +++ b/bigtable-bench/src/main.rs @@ -133,7 +133,7 @@ async fn main() -> anyhow::Result<()> { let failed = stats_failures.load(Ordering::Relaxed); eprint!( - "\x1b[2K[{} ops | {:.0} ops/s | {:.2} ops/s/conn | {}/s] avg: {} p50: {} p95: {} p99: {} max: {} ", + "\x1b[2K[{} ops | {:.0} ops/s | {:.2} ops/s/conn | {}] avg: {} p50: {} p95: {} p99: {} max: {} ", ops.bold(), ops_per_sec.bold(), ops_per_sec_per_conn.bold(), @@ -235,7 +235,7 @@ async fn main() -> anyhow::Result<()> { let p99 = Duration::from_secs_f64(guard.quantile(0.99).unwrap().unwrap()); let max = Duration::from_secs_f64(guard.max().unwrap()); eprintln!( - "\nfinal: {} ops | {:.0} ops/s | {:.2} ops/s/conn | {}/s | avg: {} p50: {} p95: {} p99: {} max: {}", + "\nfinal: {} ops | {:.0} ops/s | {:.2} ops/s/conn | {} | avg: {} p50: {} p95: {} 
p99: {} max: {}", ops.bold(), ops_per_sec.bold(), ops_per_sec_per_conn.bold(), From 192e4447d6fdc960035e05517e04ad079fb3033a Mon Sep 17 00:00:00 2001 From: Jan Michael Auer Date: Mon, 9 Mar 2026 11:35:53 +0100 Subject: [PATCH 6/6] feat(bigtable-bench): Improve live stats display - Fix ops/s/conn to divide by pool size (connection count) rather than concurrency (in-flight request count) - Print latencies (avg/p50/p95/p99/max) on a separate line with aligned columns - Show elapsed time (M:SS) in the live stats header and final summary - Use debug-nonroot distroless image for easier container inspection --- Dockerfile | 2 +- bigtable-bench/src/main.rs | 29 +++++++++++++++++++++-------- 2 files changed, 22 insertions(+), 9 deletions(-) diff --git a/Dockerfile b/Dockerfile index b732f059..1bc8b3b9 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,4 +1,4 @@ -FROM gcr.io/distroless/cc-debian12:nonroot +FROM gcr.io/distroless/cc-debian12:debug-nonroot ARG BINARY=objectstore COPY ${BINARY} /bin/entrypoint diff --git a/bigtable-bench/src/main.rs b/bigtable-bench/src/main.rs index 74cf0bf4..1a32b070 100644 --- a/bigtable-bench/src/main.rs +++ b/bigtable-bench/src/main.rs @@ -101,6 +101,7 @@ async fn main() -> anyhow::Result<()> { let failures = Arc::new(AtomicUsize::new(0)); let shutdown = tokio_util::sync::CancellationToken::new(); let concurrency = args.concurrency; + let pool_size = args.pool; let bench_start = Instant::now(); // Stats printer task. 
@@ -122,7 +123,7 @@ async fn main() -> anyhow::Result<()> { continue; } let ops_per_sec = ops as f64 / elapsed.as_secs_f64(); - let ops_per_sec_per_conn = ops_per_sec / concurrency as f64; + let ops_per_sec_per_conn = ops_per_sec / pool_size as f64; let bytes_per_sec = ops_per_sec * object_size as f64; let avg = Duration::from_secs_f64(guard.sum().unwrap() / ops as f64); let p50 = Duration::from_secs_f64(guard.quantile(0.5).unwrap().unwrap()); @@ -133,11 +134,15 @@ async fn main() -> anyhow::Result<()> { let failed = stats_failures.load(Ordering::Relaxed); eprint!( - "\x1b[2K[{} ops | {:.0} ops/s | {:.2} ops/s/conn | {}] avg: {} p50: {} p95: {} p99: {} max: {} ", + "\x1b[2K[{} ops | {:.0} ops/s | {:.2} ops/s/conn | {} | {}]", ops.bold(), ops_per_sec.bold(), ops_per_sec_per_conn.bold(), format_throughput(bytes_per_sec).bold(), + format_elapsed(elapsed).bold(), + ); + eprint!( + "\n\x1b[2K avg: {} p50: {} p95: {} p99: {} max: {}", format_ms(avg).bold(), format_ms(p50), format_ms(p95), @@ -145,9 +150,10 @@ async fn main() -> anyhow::Result<()> { format_ms(max), ); if failed > 0 { - eprint!("\n\x1b[2Kfailed: {}\x1b[1A", failed.red().bold()); + eprint!("\n\x1b[2K failed: {}\r\x1b[2A", failed.red().bold()); + } else { + eprint!("\n\x1b[2K\r\x1b[2A"); } - eprint!("\r"); } }); @@ -218,8 +224,8 @@ async fn main() -> anyhow::Result<()> { let _ = stats_handle.await; let _ = worker_handle.await; - // Clear the two-line live stats display before printing the final summary. - eprint!("\r\x1b[2K\n\x1b[2K\x1b[1A\r"); + // Clear the three-line live stats display before printing the final summary. + eprint!("\r\x1b[2K\n\x1b[2K\n\x1b[2K\x1b[2A\r"); // Print final summary. 
let guard = sketch.lock().unwrap(); // INVARIANT: no panic inside lock @@ -227,7 +233,7 @@ async fn main() -> anyhow::Result<()> { if ops > 0 { let elapsed = bench_start.elapsed(); let ops_per_sec = ops as f64 / elapsed.as_secs_f64(); - let ops_per_sec_per_conn = ops_per_sec / concurrency as f64; + let ops_per_sec_per_conn = ops_per_sec / pool_size as f64; let bytes_per_sec = ops_per_sec * object_size as f64; let avg = Duration::from_secs_f64(guard.sum().unwrap() / ops as f64); let p50 = Duration::from_secs_f64(guard.quantile(0.5).unwrap().unwrap()); @@ -235,11 +241,12 @@ async fn main() -> anyhow::Result<()> { let p99 = Duration::from_secs_f64(guard.quantile(0.99).unwrap().unwrap()); let max = Duration::from_secs_f64(guard.max().unwrap()); eprintln!( - "\nfinal: {} ops | {:.0} ops/s | {:.2} ops/s/conn | {} | avg: {} p50: {} p95: {} p99: {} max: {}", + "\nfinal: {} ops | {:.0} ops/s | {:.2} ops/s/conn | {} | {}\n avg: {} p50: {} p95: {} p99: {} max: {}", ops.bold(), ops_per_sec.bold(), ops_per_sec_per_conn.bold(), format_throughput(bytes_per_sec).bold(), + format_elapsed(elapsed).bold(), format_ms(avg).bold(), format_ms(p50), format_ms(p95), @@ -256,6 +263,12 @@ fn format_ms(d: Duration) -> String { format!("{:.2}ms", d.as_secs_f64() * 1000.0) } +/// Formats a [`Duration`] as `M:SS`. +fn format_elapsed(d: Duration) -> String { + let secs = d.as_secs(); + format!("{}:{:02}", secs / 60, secs % 60) +} + /// Formats a byte rate using SI decimal prefixes (kB/s, MB/s, GB/s). fn format_throughput(bytes_per_sec: f64) -> String { if bytes_per_sec >= 1e9 {