Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 13 additions & 8 deletions stresstest/src/stresstest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,8 +126,10 @@ impl Stresstest {
bar.finish_and_clear();

let mut total_metrics = WorkloadMetrics::default();
let mut max_elapsed = Duration::ZERO;
let workloads = finished_tasks.into_iter().map(|task| {
let (workload, metrics) = task.unwrap();
let (workload, metrics, elapsed) = task.unwrap();
max_elapsed = max_elapsed.max(elapsed);

println!();
println!(
Expand All @@ -137,7 +139,7 @@ impl Stresstest {
workload.mode,
workload.concurrency.bold()
);
print_metrics(&metrics, duration);
print_metrics(&metrics, elapsed);

total_metrics.file_sizes.merge(&metrics.file_sizes).unwrap();
total_metrics.bytes_written += metrics.bytes_written;
Expand All @@ -163,7 +165,7 @@ impl Stresstest {
let workloads: Vec<_> = workloads.collect();
println!();
println!("{}", "## TOTALS".bold());
print_metrics(&total_metrics, duration);
print_metrics(&total_metrics, max_elapsed);
println!();

if !cleanup {
Expand Down Expand Up @@ -230,14 +232,15 @@ async fn run_workload(
workload: Workload,
duration: Duration,
ttl: Option<Duration>,
) -> (Workload, WorkloadMetrics) {
) -> (Workload, WorkloadMetrics, Duration) {
// In throughput mode, allow for a high concurrency value.
let concurrency = match workload.mode {
WorkloadMode::Weighted => workload.concurrency,
WorkloadMode::Throughput => 100,
};

let semaphore = Arc::new(Semaphore::new(concurrency));
let start = Instant::now();
let deadline = tokio::time::Instant::now() + duration;

let workload = Arc::new(Mutex::new(workload));
Expand Down Expand Up @@ -330,21 +333,23 @@ async fn run_workload(
}
}

// by acquiring *all* the semaphores, we essentially wait for all outstanding tasks to finish
let _permits = semaphore.acquire_many(concurrency as u32).await;

let metrics: WorkloadMetrics = {
let mut metrics = metrics.lock().unwrap();
std::mem::take(&mut metrics)
};

// by acquiring *all* the semaphores, we essentially wait for all outstanding tasks to finish
let _permits = semaphore.acquire_many(concurrency as u32).await;

let workload = Arc::try_unwrap(workload)
.map_err(|_| ())
.unwrap()
.into_inner()
.unwrap();

(workload, metrics)
let elapsed = start.elapsed();

(workload, metrics, elapsed)
}

fn print_error(message: &str, error: &Error) {
Expand Down
Loading