From 5100ab72fdf48ae530c38c90295b3a91a32e7a9d Mon Sep 17 00:00:00 2001 From: lcian <17258265+lcian@users.noreply.github.com> Date: Thu, 26 Feb 2026 17:03:09 +0100 Subject: [PATCH 1/2] feat(stresstest): Add batch mode using the client's many() API Adds a third workload mode `batch` (internally `Many`) that sends multiple operations in a single HTTP batch request via `ManyBuilder`. Action weights represent exact counts per batch call. Results are correlated back to track writes/reads for subsequent operations. --- stresstest/config/example.yaml | 11 ++ stresstest/src/http.rs | 2 +- stresstest/src/stresstest.rs | 264 ++++++++++++++++++++++++--------- stresstest/src/workload.rs | 54 +++++++ 4 files changed, 262 insertions(+), 69 deletions(-) diff --git a/stresstest/config/example.yaml b/stresstest/config/example.yaml index 2c6252c9..deee3e48 100644 --- a/stresstest/config/example.yaml +++ b/stresstest/config/example.yaml @@ -28,3 +28,14 @@ workloads: file_sizes: p50: 5 KiB p99: 20 KiB + - name: reprocessing + mode: batch + concurrency: 4 + organizations: 100 + file_sizes: + p50: 10 KiB + p99: 50 KiB + actions: + writes: 5 + reads: 3 + deletes: 1 diff --git a/stresstest/src/http.rs b/stresstest/src/http.rs index ccd80f59..fc3741ca 100644 --- a/stresstest/src/http.rs +++ b/stresstest/src/http.rs @@ -84,7 +84,7 @@ impl HttpRemote { Ok(()) } - fn session(&self, usecase: &Usecase, organization_id: u64) -> Session { + pub(crate) fn session(&self, usecase: &Usecase, organization_id: u64) -> Session { // NB: Reuse the organization ID as project ID to create unique projects. Right now, we do // not benefit from simulating multiple projects per org. 
usecase diff --git a/stresstest/src/stresstest.rs b/stresstest/src/stresstest.rs index 3a0e7ce2..dfc888f5 100644 --- a/stresstest/src/stresstest.rs +++ b/stresstest/src/stresstest.rs @@ -4,18 +4,21 @@ use std::fmt; use std::sync::{Arc, Mutex}; use std::time::{Duration, Instant}; +use std::collections::HashMap; + use anyhow::{Error, Result}; use bytesize::ByteSize; use futures::StreamExt; use indicatif::{ProgressBar, ProgressStyle}; -use objectstore_client::{ExpirationPolicy, Usecase}; +use objectstore_client::{ExpirationPolicy, OperationResult, Usecase}; use sketches_ddsketch::DDSketch; use tokio::sync::Semaphore; +use tokio_util::io::ReaderStream; use yansi::Paint; use crate::http::HttpRemote; -use crate::workload::{Action, Workload, WorkloadMode}; +use crate::workload::{Action, ExternalId, InternalId, Workload, WorkloadMode}; /// Stresstest runner that can execute multiple workloads concurrently against a remote. /// @@ -131,7 +134,7 @@ impl Stresstest { println!(); println!( - "{} {} (mode: {:?}, concurrency: {})", + "{} {} (mode: {}, concurrency: {})", "## Workload".bold(), workload.name.bold().blue(), workload.mode, @@ -233,7 +236,7 @@ async fn run_workload( ) -> (Workload, WorkloadMetrics) { // In throughput mode, allow for a high concurrency value. 
let concurrency = match workload.mode { - WorkloadMode::Weighted => workload.concurrency, + WorkloadMode::Weighted | WorkloadMode::Many => workload.concurrency, WorkloadMode::Throughput => 100, }; @@ -247,6 +250,8 @@ async fn run_workload( let sleep = tokio::time::sleep_until(deadline); tokio::pin!(sleep); + let is_many = matches!(workload.lock().unwrap().mode, WorkloadMode::Many); + loop { if deadline.elapsed() > Duration::ZERO { break; @@ -257,72 +262,27 @@ async fn run_workload( let remote = Arc::clone(&remote); let metrics = Arc::clone(&metrics); - let action = loop { - if let Some(action) = workload.lock().unwrap().next_action() { - break action; - } - - tokio::time::sleep(Duration::from_millis(10)).await; - }; + if is_many { + let task = async move { + run_many_batch(&workload, &remote, &metrics, ttl).await; + drop(permit); + }; + tokio::spawn(task); + } else { + let action = loop { + if let Some(action) = workload.lock().unwrap().next_action() { + break action; + } + tokio::time::sleep(Duration::from_millis(10)).await; + }; - let task = async move { - let start = Instant::now(); - match action { - Action::Write(internal_id, payload) => { - let file_size = payload.len; - let organization_id = workload.lock().unwrap().next_organization_id(); - - let mut usecase = Usecase::new(&workload.lock().unwrap().name); - if let Some(ttl) = ttl { - usecase = usecase.with_expiration_policy(ExpirationPolicy::TimeToLive(ttl)); - } - - match remote.write(&usecase, organization_id, payload).await { - Ok(object_key) => { - let external_id = (usecase, organization_id, object_key); - workload.lock().unwrap().push_file(internal_id, external_id); - let mut metrics = metrics.lock().unwrap(); - metrics.write_timing.add(start.elapsed().as_secs_f64()); - metrics.file_sizes.add(file_size as f64); - metrics.bytes_written += file_size; - } - Err(err) => { - print_error("writing object", &err); - let mut metrics = metrics.lock().unwrap(); - metrics.write_failures += 1; - } - } - } - 
Action::Read(internal_id, external_id, payload) => { - let file_size = payload.len; - let (usecase, organization_id, object_key) = &external_id; - match remote.read(usecase, *organization_id, object_key, payload).await { - Ok(_) => { - workload.lock().unwrap().push_file(internal_id, external_id); - let mut metrics = metrics.lock().unwrap(); - metrics.read_timing.add(start.elapsed().as_secs_f64()); - metrics.bytes_read += file_size; - } - Err(err) => { - print_error("reading object", &err); - let mut metrics = metrics.lock().unwrap(); - metrics.read_failures += 1; - } - } - } - Action::Delete(external_id) => { - let (usecase, organization_id, object_key) = &external_id; - if let Err(err) = remote.delete(usecase, *organization_id, object_key).await { - print_error("deleting object", &err); - } - let mut metrics = metrics.lock().unwrap(); - metrics.delete_timing.add(start.elapsed().as_secs_f64()); - } - } - drop(permit); - }; - tokio::spawn(task); + let task = async move { + run_single_action(&workload, &remote, &metrics, action, ttl).await; + drop(permit); + }; + tokio::spawn(task); + } } _ = &mut sleep => { break; @@ -347,6 +307,174 @@ async fn run_workload( (workload, metrics) } +async fn run_single_action( + workload: &Arc<Mutex<Workload>>, + remote: &Arc<HttpRemote>, + metrics: &Arc<Mutex<WorkloadMetrics>>, + action: Action, + ttl: Option<Duration>, +) { + let start = Instant::now(); + match action { + Action::Write(internal_id, payload) => { + let file_size = payload.len; + let organization_id = workload.lock().unwrap().next_organization_id(); + + let mut usecase = Usecase::new(&workload.lock().unwrap().name); + if let Some(ttl) = ttl { + usecase = usecase.with_expiration_policy(ExpirationPolicy::TimeToLive(ttl)); + } + + match remote.write(&usecase, organization_id, payload).await { + Ok(object_key) => { + let external_id = (usecase, organization_id, object_key); + workload.lock().unwrap().push_file(internal_id, external_id); + let mut metrics = metrics.lock().unwrap(); +
metrics.write_timing.add(start.elapsed().as_secs_f64()); + metrics.file_sizes.add(file_size as f64); + metrics.bytes_written += file_size; + } + Err(err) => { + print_error("writing object", &err); + let mut metrics = metrics.lock().unwrap(); + metrics.write_failures += 1; + } + } + } + Action::Read(internal_id, external_id, payload) => { + let file_size = payload.len; + let (usecase, organization_id, object_key) = &external_id; + match remote + .read(usecase, *organization_id, object_key, payload) + .await + { + Ok(_) => { + workload.lock().unwrap().push_file(internal_id, external_id); + let mut metrics = metrics.lock().unwrap(); + metrics.read_timing.add(start.elapsed().as_secs_f64()); + metrics.bytes_read += file_size; + } + Err(err) => { + print_error("reading object", &err); + let mut metrics = metrics.lock().unwrap(); + metrics.read_failures += 1; + } + } + } + Action::Delete(external_id) => { + let (usecase, organization_id, object_key) = &external_id; + if let Err(err) = remote.delete(usecase, *organization_id, object_key).await { + print_error("deleting object", &err); + } + let mut metrics = metrics.lock().unwrap(); + metrics.delete_timing.add(start.elapsed().as_secs_f64()); + } + } +} + +async fn run_many_batch( + workload: &Arc<Mutex<Workload>>, + remote: &Arc<HttpRemote>, + metrics: &Arc<Mutex<WorkloadMetrics>>, + ttl: Option<Duration>, +) { + let (actions, organization_id, usecase, session) = { + let mut wl = workload.lock().unwrap(); + let actions = wl.next_many_actions(); + if actions.is_empty() { + return; + } + // Always use org 0 so all batches share the same scope. Reads/deletes + // pull from `existing_files` which must live under the same org/usecase. + let organization_id = 0; + let mut usecase = Usecase::new(&wl.name); + if let Some(ttl) = ttl { + usecase = usecase.with_expiration_policy(ExpirationPolicy::TimeToLive(ttl)); + } + let session = remote.session(&usecase, organization_id); + (actions, organization_id, usecase, session) + }; + + // Track writes and reads by key so we can correlate results.
+ let mut write_tracking: HashMap<String, (InternalId, u64)> = HashMap::new(); + let mut read_tracking: HashMap<String, (InternalId, ExternalId)> = HashMap::new(); + + let mut builder = session.many(); + for action in actions { + match action { + Action::Write(internal_id, payload) => { + let file_size = payload.len; + let key = internal_id.to_string(); + write_tracking.insert(key.clone(), (internal_id, file_size)); + let stream = ReaderStream::new(payload).boxed(); + builder = builder.push(session.put_stream(stream).compression(None).key(key)); + } + Action::Read(internal_id, external_id, _payload) => { + let (_, _, ref object_key) = external_id; + let key = object_key.clone(); + read_tracking.insert(key.clone(), (internal_id, external_id)); + builder = builder.push(session.get(&key)); + } + Action::Delete(external_id) => { + let (_, _, ref object_key) = external_id; + builder = builder.push(session.delete(object_key)); + } + } + } + + let start = Instant::now(); + let mut results = builder.send(); + + while let Some(result) = results.next().await { + let elapsed = start.elapsed().as_secs_f64(); + match result { + OperationResult::Put(key, Ok(_response)) => { + if let Some((internal_id, file_size)) = write_tracking.remove(&key) { + let external_id = (usecase.clone(), organization_id, key); + workload.lock().unwrap().push_file(internal_id, external_id); + let mut m = metrics.lock().unwrap(); + m.write_timing.add(elapsed); + m.file_sizes.add(file_size as f64); + m.bytes_written += file_size; + } + } + OperationResult::Put(_key, Err(err)) => { + print_error("writing object (batch)", &err.into()); + metrics.lock().unwrap().write_failures += 1; + } + OperationResult::Get(key, Ok(Some(_response))) => { + if let Some((internal_id, external_id)) = read_tracking.remove(&key) { + let file_size = 0; // We don't know the size from the batch response easily + workload.lock().unwrap().push_file(internal_id, external_id); + let mut m = metrics.lock().unwrap(); + m.read_timing.add(elapsed); + m.bytes_read += file_size; + } + } +
OperationResult::Get(_key, Ok(None)) => { + print_error( + "reading object (batch)", + &anyhow::anyhow!("object not found"), + ); + metrics.lock().unwrap().read_failures += 1; + } + OperationResult::Get(_key, Err(err)) => { + print_error("reading object (batch)", &err.into()); + metrics.lock().unwrap().read_failures += 1; + } + OperationResult::Delete(_key, Ok(())) => { + metrics.lock().unwrap().delete_timing.add(elapsed); + } + OperationResult::Delete(_key, Err(err)) => { + print_error("deleting object (batch)", &err.into()); + } + OperationResult::Error(err) => { + print_error("batch operation", &err.into()); + } + } + } +} + fn print_error(message: &str, error: &Error) { eprintln!("{} {}", "ERROR:".bold().red(), message.bold()); for source in error.chain() { diff --git a/stresstest/src/workload.rs b/stresstest/src/workload.rs index d2f7775e..62c466fb 100644 --- a/stresstest/src/workload.rs +++ b/stresstest/src/workload.rs @@ -27,6 +27,22 @@ pub enum WorkloadMode { /// /// Actions are used to determine the ops per second for each operation. Throughput, + + /// The workload uses the batch (`many()`) API to send multiple operations per HTTP request. + /// + /// Actions are used to determine the number of writes, reads and deletes per batch call. + #[serde(rename = "batch")] + Many, +} + +impl fmt::Display for WorkloadMode { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + WorkloadMode::Weighted => write!(f, "Weighted"), + WorkloadMode::Throughput => write!(f, "Throughput"), + WorkloadMode::Many => write!(f, "Batch"), + } + } } /// A builder for creating a [`Workload`]. @@ -242,7 +258,45 @@ impl Workload { match self.mode { WorkloadMode::Weighted => Some(self.next_action_weighted()), WorkloadMode::Throughput => self.next_action_throughput(), + WorkloadMode::Many => { + unreachable!("Many mode should use next_many_actions() instead") + } + } + } + + /// Returns a batch of actions based on the configured weights. 
+ /// + /// In `Many` mode, weights represent the exact count of each operation type per batch call. + /// Reads and deletes that can't be satisfied (no existing files) are silently skipped. + pub(crate) fn next_many_actions(&mut self) -> Vec<Action> { + let num_writes = self.action_distribution.weight(0).unwrap(); + let num_reads = self.action_distribution.weight(1).unwrap(); + let num_deletes = self.action_distribution.weight(2).unwrap(); + + let mut actions = Vec::with_capacity(num_writes + num_reads + num_deletes); + + for _ in 0..num_writes { + let seed = self.rng.next_u64(); + actions.push(Action::Write(InternalId(seed), self.get_payload(seed))); } + + for _ in 0..num_reads { + if let Some((internal, external)) = self.sample_readback() { + actions.push(Action::Read( + internal, + external, + self.get_payload(internal.0), + )); + } + } + + for _ in 0..num_deletes { + if let Some((_internal, external)) = self.sample_readback() { + actions.push(Action::Delete(external)); + } + } + + actions } pub(crate) fn next_organization_id(&mut self) -> u64 { From 9396876f82b952325549c4947f240f88071defa9 Mon Sep 17 00:00:00 2001 From: lcian <17258265+lcian@users.noreply.github.com> Date: Fri, 27 Feb 2026 10:25:06 +0100 Subject: [PATCH 2/2] wip --- stresstest/config/example.yaml | 42 +++++++--------------------------- stresstest/src/stresstest.rs | 21 ++++++++++------- 2 files changed, 21 insertions(+), 42 deletions(-) diff --git a/stresstest/config/example.yaml b/stresstest/config/example.yaml index deee3e48..6dc23a8e 100644 --- a/stresstest/config/example.yaml +++ b/stresstest/config/example.yaml @@ -1,41 +1,15 @@ -remote: http://localhost:18888 +remote: http://localhost:8888 -duration: 5s +duration: 10s workloads: - - name: attachments - mode: throughput - organizations: 1000 - file_sizes: - p50: 50 KiB - p99: 200 KiB - actions: - writes: 10 - reads: 0 - deletes: 0 - - name: profiling - concurrency: 8 - file_sizes: - p50: 15 KiB - p99: 100 KiB - - name: replays - concurrency: 16 -
file_sizes: - p50: 25 KiB - p99: 400 KiB - - name: nodestore - concurrency: 32 + - name: snapshots + mode: batch + concurrency: 1 file_sizes: p50: 5 KiB p99: 20 KiB - - name: reprocessing - mode: batch - concurrency: 4 - organizations: 100 - file_sizes: - p50: 10 KiB - p99: 50 KiB actions: - writes: 5 - reads: 3 - deletes: 1 + writes: 20000 + reads: 0 + deletes: 0 diff --git a/stresstest/src/stresstest.rs b/stresstest/src/stresstest.rs index dfc888f5..242a5da2 100644 --- a/stresstest/src/stresstest.rs +++ b/stresstest/src/stresstest.rs @@ -129,8 +129,10 @@ impl Stresstest { bar.finish_and_clear(); let mut total_metrics = WorkloadMetrics::default(); + let mut max_elapsed = Duration::ZERO; let workloads = finished_tasks.into_iter().map(|task| { - let (workload, metrics) = task.unwrap(); + let (workload, metrics, elapsed) = task.unwrap(); + max_elapsed = max_elapsed.max(elapsed); println!(); println!( @@ -140,7 +142,7 @@ impl Stresstest { workload.mode, workload.concurrency.bold() ); - print_metrics(&metrics, duration); + print_metrics(&metrics, elapsed); total_metrics.file_sizes.merge(&metrics.file_sizes).unwrap(); total_metrics.bytes_written += metrics.bytes_written; @@ -166,7 +168,7 @@ impl Stresstest { let workloads: Vec<_> = workloads.collect(); println!(); println!("{}", "## TOTALS".bold()); - print_metrics(&total_metrics, duration); + print_metrics(&total_metrics, max_elapsed); println!(); if !cleanup { @@ -233,7 +235,7 @@ async fn run_workload( workload: Workload, duration: Duration, ttl: Option<Duration>, -) -> (Workload, WorkloadMetrics) { +) -> (Workload, WorkloadMetrics, Duration) { // In throughput mode, allow for a high concurrency value.
let concurrency = match workload.mode { WorkloadMode::Weighted | WorkloadMode::Many => workload.concurrency, @@ -241,6 +243,7 @@ async fn run_workload( }; let semaphore = Arc::new(Semaphore::new(concurrency)); + let start = Instant::now(); let deadline = tokio::time::Instant::now() + duration; let workload = Arc::new(Mutex::new(workload)); @@ -290,21 +293,23 @@ async fn run_workload( } } + // by acquiring *all* the semaphores, we essentially wait for all outstanding tasks to finish + let _permits = semaphore.acquire_many(concurrency as u32).await; + let metrics: WorkloadMetrics = { let mut metrics = metrics.lock().unwrap(); std::mem::take(&mut metrics) }; - // by acquiring *all* the semaphores, we essentially wait for all outstanding tasks to finish - let _permits = semaphore.acquire_many(concurrency as u32).await; - let workload = Arc::try_unwrap(workload) .map_err(|_| ()) .unwrap() .into_inner() .unwrap(); - (workload, metrics) + let elapsed = start.elapsed(); + + (workload, metrics, elapsed) } async fn run_single_action(