Skip to content

Commit c7621a2

Browse files
committed
wip
1 parent 997d2b8 commit c7621a2

2 files changed

Lines changed: 21 additions & 31 deletions

File tree

stresstest/config/example.yaml

Lines changed: 8 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,30 +1,15 @@
1-
remote: http://localhost:18888
1+
remote: http://localhost:8888
22

3-
duration: 5s
3+
duration: 10s
44

55
workloads:
6-
- name: attachments
7-
mode: throughput
8-
organizations: 1000
6+
- name: snapshots
7+
mode: batch
8+
concurrency: 1
99
file_sizes:
10-
p50: 50 KiB
11-
p99: 200 KiB
10+
p50: 5 KiB
11+
p99: 20 KiB
1212
actions:
13-
writes: 10
13+
writes: 20000
1414
reads: 0
1515
deletes: 0
16-
- name: profiling
17-
concurrency: 8
18-
file_sizes:
19-
p50: 15 KiB
20-
p99: 100 KiB
21-
- name: replays
22-
concurrency: 16
23-
file_sizes:
24-
p50: 25 KiB
25-
p99: 400 KiB
26-
- name: nodestore
27-
concurrency: 32
28-
file_sizes:
29-
p50: 5 KiB
30-
p99: 20 KiB

stresstest/src/stresstest.rs

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -126,8 +126,10 @@ impl Stresstest {
126126
bar.finish_and_clear();
127127

128128
let mut total_metrics = WorkloadMetrics::default();
129+
let mut max_elapsed = Duration::ZERO;
129130
let workloads = finished_tasks.into_iter().map(|task| {
130-
let (workload, metrics) = task.unwrap();
131+
let (workload, metrics, elapsed) = task.unwrap();
132+
max_elapsed = max_elapsed.max(elapsed);
131133

132134
println!();
133135
println!(
@@ -137,7 +139,7 @@ impl Stresstest {
137139
workload.mode,
138140
workload.concurrency.bold()
139141
);
140-
print_metrics(&metrics, duration);
142+
print_metrics(&metrics, elapsed);
141143

142144
total_metrics.file_sizes.merge(&metrics.file_sizes).unwrap();
143145
total_metrics.bytes_written += metrics.bytes_written;
@@ -163,7 +165,7 @@ impl Stresstest {
163165
let workloads: Vec<_> = workloads.collect();
164166
println!();
165167
println!("{}", "## TOTALS".bold());
166-
print_metrics(&total_metrics, duration);
168+
print_metrics(&total_metrics, max_elapsed);
167169
println!();
168170

169171
if !cleanup {
@@ -230,14 +232,15 @@ async fn run_workload(
230232
workload: Workload,
231233
duration: Duration,
232234
ttl: Option<Duration>,
233-
) -> (Workload, WorkloadMetrics) {
235+
) -> (Workload, WorkloadMetrics, Duration) {
234236
// In throughput mode, allow for a high concurrency value.
235237
let concurrency = match workload.mode {
236238
WorkloadMode::Weighted => workload.concurrency,
237239
WorkloadMode::Throughput => 100,
238240
};
239241

240242
let semaphore = Arc::new(Semaphore::new(concurrency));
243+
let start = Instant::now();
241244
let deadline = tokio::time::Instant::now() + duration;
242245

243246
let workload = Arc::new(Mutex::new(workload));
@@ -330,21 +333,23 @@ async fn run_workload(
330333
}
331334
}
332335

336+
// by acquiring *all* the semaphores, we essentially wait for all outstanding tasks to finish
337+
let _permits = semaphore.acquire_many(concurrency as u32).await;
338+
333339
let metrics: WorkloadMetrics = {
334340
let mut metrics = metrics.lock().unwrap();
335341
std::mem::take(&mut metrics)
336342
};
337343

338-
// by acquiring *all* the semaphores, we essentially wait for all outstanding tasks to finish
339-
let _permits = semaphore.acquire_many(concurrency as u32).await;
340-
341344
let workload = Arc::try_unwrap(workload)
342345
.map_err(|_| ())
343346
.unwrap()
344347
.into_inner()
345348
.unwrap();
346349

347-
(workload, metrics)
350+
let elapsed = start.elapsed();
351+
352+
(workload, metrics, elapsed)
348353
}
349354

350355
fn print_error(message: &str, error: &Error) {

0 commit comments

Comments
 (0)