Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions bigtable-bench/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ mimalloc = { workspace = true }
objectstore-service = { workspace = true }
objectstore-types = { workspace = true }
rand = { workspace = true, features = ["small_rng"] }
rand_distr = "0.5.1"
rustls = { version = "0.23", default-features = false }
sketches-ddsketch = "0.3.0"
tokio = { workspace = true, features = ["rt-multi-thread", "sync", "macros", "signal"] }
Expand Down
3 changes: 2 additions & 1 deletion bigtable-bench/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ cargo run --release -p bigtable-bench -- [OPTIONS]
| Flag | Default | Description |
|------|---------|-------------|
| `-c` | `10` | Concurrent put requests |
| `-s` | `50KB` | Object size (e.g. `4096`, `1MB`) |
| `--p50` | `2KiB` | Median object size (p50) |
| `--p99` | `385KiB` | 99th-percentile object size (p99) |
| `-p` | `1` | Bigtable gRPC connection pool size |
| `-a` | — | Emulator address (omit for real GCP) |
| `--project` | `testing` | GCP project ID |
Expand Down
39 changes: 31 additions & 8 deletions bigtable-bench/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use bytesize::ByteSize;
use futures_util::StreamExt;
use rand::rngs::SmallRng;
use rand::{Rng, SeedableRng};
use rand_distr::{Distribution, LogNormal};
use sketches_ddsketch::DDSketch;
use tokio::sync::Semaphore;
use yansi::Paint;
Expand All @@ -31,9 +32,13 @@ struct Args {
#[argh(option, short = 'c', default = "10")]
concurrency: usize,

/// object size, e.g. "50KB" or "4096" (default: 50KB)
#[argh(option, short = 's', default = "ByteSize::kb(50)")]
size: ByteSize,
/// median object size (p50), e.g. "50KB" (default: 2KiB)
#[argh(option, default = "ByteSize::kib(2)")]
p50: ByteSize,

/// 99th-percentile object size (p99), e.g. "200KB" (default: 385KiB)
#[argh(option, default = "ByteSize::kib(385)")]
p99: ByteSize,

/// bigtable gRPC connection pool size (default: 1)
#[argh(option, short = 'p', default = "1")]
Expand Down Expand Up @@ -66,7 +71,13 @@ async fn main() -> anyhow::Result<()> {
.ok(); // INVARIANT: no other code installs a provider before main()

let args: Args = argh::from_env();
let object_size = args.size.as_u64() as usize;

let p50 = args.p50.as_u64() as f64;
let p99 = args.p99.as_u64() as f64;
let mu = p50.ln();
let sigma = (p99.ln() - mu) / 2.3263;
let size_distribution =
    LogNormal::new(mu, sigma).expect("invalid size distribution parameters"); // NOTE: panics when sigma is negative or non-finite, i.e. when --p99 < --p50; positive p50/p99 alone does not guarantee success

match &args.addr {
Some(addr) => eprintln!(
Expand All @@ -92,20 +103,27 @@ async fn main() -> anyhow::Result<()> {
let backend = Arc::new(backend);

eprintln!(
"starting benchmark: concurrency={}, object_size={}",
args.concurrency, args.size,
"starting benchmark: concurrency={}, p50={}, p99={}",
args.concurrency, args.p50, args.p99,
);

/// Cap on sampled object sizes: the log-normal tail is unbounded, so clamp
/// at 950 KiB — presumably headroom under Bigtable's ~1 MiB per-cell value
/// limit; TODO(review): confirm the intended limit.
const MAX_OBJECT_SIZE: u64 = 950 * 1024;

let semaphore = Arc::new(Semaphore::new(args.concurrency));
let sketch = Arc::new(Mutex::new(DDSketch::default()));
let failures = Arc::new(AtomicUsize::new(0));
let shutdown = tokio_util::sync::CancellationToken::new();
let pool_size = args.pool;
let size_distribution = Arc::new(size_distribution);
let bench_start = Instant::now();

let total_bytes = Arc::new(AtomicUsize::new(0));

// Stats printer task.
let stats_sketch = Arc::clone(&sketch);
let stats_failures = Arc::clone(&failures);
let stats_bytes = Arc::clone(&total_bytes);
let stats_shutdown = shutdown.clone();
let stats_handle = tokio::spawn(async move {
let mut interval = tokio::time::interval(Duration::from_secs(1));
Expand All @@ -123,7 +141,7 @@ async fn main() -> anyhow::Result<()> {
}
let ops_per_sec = ops as f64 / elapsed.as_secs_f64();
let ops_per_sec_per_conn = ops_per_sec / pool_size as f64;
let bytes_per_sec = ops_per_sec * object_size as f64;
let bytes_per_sec = stats_bytes.load(Ordering::Relaxed) as f64 / elapsed.as_secs_f64();
let avg = Duration::from_secs_f64(guard.sum().unwrap() / ops as f64);
let p50 = Duration::from_secs_f64(guard.quantile(0.5).unwrap().unwrap());
let p95 = Duration::from_secs_f64(guard.quantile(0.95).unwrap().unwrap());
Expand Down Expand Up @@ -160,6 +178,7 @@ async fn main() -> anyhow::Result<()> {
let worker_shutdown = shutdown.clone();
let worker_sketch = Arc::clone(&sketch);
let worker_failures = Arc::clone(&failures);
let worker_bytes = Arc::clone(&total_bytes);
let worker_handle = tokio::spawn(async move {
let context = ObjectContext {
usecase: "bench".into(),
Expand All @@ -184,13 +203,16 @@ async fn main() -> anyhow::Result<()> {
let backend = Arc::clone(&backend);
let sketch = Arc::clone(&worker_sketch);
let failures = Arc::clone(&worker_failures);
let bytes = Arc::clone(&worker_bytes);
let size_dist = Arc::clone(&size_distribution);
let context = context.clone();
let metadata = metadata.clone();

tokio::spawn(async move {
let _permit = permit;

let mut rng = SmallRng::from_os_rng();
let object_size = (size_dist.sample(&mut rng) as u64).min(MAX_OBJECT_SIZE) as usize;
let mut buf = vec![0u8; object_size];
rng.fill(&mut buf[..]);

Expand All @@ -207,6 +229,7 @@ async fn main() -> anyhow::Result<()> {
if result.is_err() {
failures.fetch_add(1, Ordering::Relaxed);
} else {
bytes.fetch_add(object_size, Ordering::Relaxed);
sketch.lock().unwrap().add(elapsed.as_secs_f64()); // INVARIANT: no panic inside lock
}
});
Expand All @@ -233,7 +256,7 @@ async fn main() -> anyhow::Result<()> {
let elapsed = bench_start.elapsed();
let ops_per_sec = ops as f64 / elapsed.as_secs_f64();
let ops_per_sec_per_conn = ops_per_sec / pool_size as f64;
let bytes_per_sec = ops_per_sec * object_size as f64;
let bytes_per_sec = total_bytes.load(Ordering::Relaxed) as f64 / elapsed.as_secs_f64();
let avg = Duration::from_secs_f64(guard.sum().unwrap() / ops as f64);
let p50 = Duration::from_secs_f64(guard.quantile(0.5).unwrap().unwrap());
let p95 = Duration::from_secs_f64(guard.quantile(0.95).unwrap().unwrap());
Expand Down
Loading