diff --git a/.gitignore b/.gitignore index b17513daf..2b7142092 100644 --- a/.gitignore +++ b/.gitignore @@ -26,6 +26,12 @@ miden-node-stress-test-* /accounts /data +# Native benchmark runtime artifacts (data dir, logs, snapshots, proofs). +/node-data* +/logs +/snapshots +/benchmark-proofs + # Sqlite db files *.sqlite3 *.sqlite3-shm diff --git a/CHANGELOG.md b/CHANGELOG.md index 4a53aa2aa..10557e1b8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -26,6 +26,8 @@ - [BREAKING] Removed `miden-node ntx-builder` subcommand and created a separate `miden-ntx-builder` binary ([#2067](https://github.com/0xMiden/node/pull/2067)). - [BREAKING] Reworked note proto types for multi-attachment support: `NoteMetadata` now carries `attachment_schemes` (repeated) and `attachments_commitment` instead of a single `attachment`. `Note` and `NetworkNote` gained an `attachments` field. `NoteSyncRecord` now embeds full `NoteMetadata` instead of `NoteMetadataHeader`. Removed `NoteAttachmentKind` enum and `NoteMetadataHeader` message ([#2078](https://github.com/0xMiden/node/pull/2078)). - [BREAKING] Changed `SyncChainMmr` endpoint: the upper end of the block range we're syncing is now the chain tip with the requested finality level. Validator signature is also returned ([#2075](https://github.com/0xMiden/node/pull/2075)). +- Added `miden-benchmark` binary for end-to-end TPS measurements. `create-proofs` generates locally-proven mint/consume transaction pairs bound to the target node's chain tip; `run-benchmark` submits the bundle and reports peak/mean/window-average TPS plus inclusion latency, all derived from block-header data ([#2073](https://github.com/0xMiden/node/pull/2073)). +- Added `--batch.workers` flag (env `MIDEN_NODE_BLOCK_PRODUCER_BATCH_WORKERS`) to the block-producer to make the previously-hardcoded batch-builder worker pool size configurable; default remains 2 ([#2073](https://github.com/0xMiden/node/pull/2073)). ## v0.14.10 (2026-05-29) diff --git a/Cargo.lock b/Cargo.lock index 64c6977a4..b28a6c7d7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2968,6 +2968,25 @@ dependencies = [ "thiserror 2.0.18", ] +[[package]] +name = "miden-benchmark" +version = "0.15.0" +dependencies = [ + "anyhow", + "clap", + "fs-err", + "miden-node-proto", + "miden-protocol", + "miden-remote-prover-client", + "miden-standards", + "miden-tx", + "rand 0.9.2", + "rayon", + "tokio", + "tonic", + "url", +] + [[package]] name = "miden-block-prover" version = "0.15.0" diff --git a/Cargo.toml b/Cargo.toml index f0c9c8f2e..84556999a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,5 +1,6 @@ [workspace] members = [ + "bin/benchmark", "bin/genesis", "bin/network-monitor", "bin/node", @@ -154,6 +155,10 @@ should_panic_without_expect = "allow" # We don't care about the specific panic # Configure `cargo-typos` [workspace.metadata.typos] files.extend-exclude = [ - "*.min.js", # Minified JS bundles (vendored htmx etc.). - "*.svg", # SVG files. + "*.min.js", # Minified JS bundles (vendored htmx etc.). + "*.svg", # SVG files. + "benchmark-proofs/", # miden-benchmark output. + "logs/", # Native benchmark process logs. + "node-data*/", # Native benchmark runtime data dir + any sibling clones (RocksDB LOGs etc.). + "snapshots/", # Snapshot tarballs used by the bench replay workflow. 
] diff --git a/Makefile b/Makefile index 2628151c2..1b81264d8 100644 --- a/Makefile +++ b/Makefile @@ -129,6 +129,10 @@ install-stress-test: ## Installs stress-test binary install-network-monitor: ## Installs network monitor binary cargo install --path bin/network-monitor --locked +.PHONY: install-benchmark +install-benchmark: ## Installs the benchmark binary + cargo install --path bin/benchmark --locked + # --- docker -------------------------------------------------------------------------------------- .PHONY: compose-genesis diff --git a/bin/benchmark/Cargo.toml b/bin/benchmark/Cargo.toml new file mode 100644 index 000000000..85fd0c49d --- /dev/null +++ b/bin/benchmark/Cargo.toml @@ -0,0 +1,32 @@ +[package] +authors.workspace = true +description = "A binary to run benchmarks of the Miden network" +edition.workspace = true +exclude.workspace = true +homepage.workspace = true +keywords = ["benchmark", "miden", "node"] +license.workspace = true +name = "miden-benchmark" +publish = true +readme.workspace = true +repository.workspace = true +rust-version.workspace = true +version.workspace = true + +[lints] +workspace = true + +[dependencies] +anyhow = { workspace = true } +clap = { features = ["env", "string"], workspace = true } +fs-err = { workspace = true } +miden-node-proto = { workspace = true } +miden-protocol = { features = ["std", "testing"], workspace = true } +miden-remote-prover-client = { features = ["tx-prover"], workspace = true } +miden-standards = { workspace = true } +miden-tx = { features = ["concurrent", "std"], workspace = true } +rand = { workspace = true } +rayon = { workspace = true } +tokio = { features = ["full"], workspace = true } +tonic = { workspace = true } +url = { features = ["serde"], workspace = true } diff --git a/bin/benchmark/README.md b/bin/benchmark/README.md new file mode 100644 index 000000000..95cba1680 --- /dev/null +++ b/bin/benchmark/README.md @@ -0,0 +1,252 @@ +# Miden benchmark + +A binary for measuring transaction throughput on a Miden node by submitting locally-generated proven transactions over RPC and reporting submission ack rate, block inclusion rate, and end-to-end latency. + +## Overview + +End-to-end benchmarking is split into two phases because proof generation is expensive and shouldn't be on the critical path of the throughput measurement: + +1. **`create-proofs`**: Generates a faucet, N wallets, and `2 * N` proven + transactions (one mint and one consume per wallet). By default each proof + is produced locally with `LocalTransactionProver`; pass + `--remote-prover-url` to offload proving to a remote prover (see [Using a + remote prover](#using-a-remote-prover)). Each proof is bound to the chain + state of the target node at the moment of generation (genesis commitment, + reference block, initial account commitments, input note nullifiers). The + bundle is written to `./benchmark-proofs/` as serialized blobs. +2. **`run-benchmark`**: Loads the bundle from disk and submits it to the + node's RPC. Mints are submitted sequentially (each mutates the shared + faucet, so order matters) and consumes are submitted with bounded + concurrency. After submission, the run waits a few blocks and scans them + to compute inclusion rate, inclusion TPS, and submit/inclusion latency + percentiles. + +Each proof takes seconds of real proving, so generating a bundle once and re-running submissions against it is the right way to iterate on the node's mempool / block-producer / store throughput. See [Re-using proofs](#re-using-proofs-across-runs) below.
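+ +A typical iteration loop, end to end (flag values here are illustrative; see [Usage](#usage) for the full flag reference): + +```sh +# One-time: generate the bundle (slow: real STARK proving). +miden-benchmark create-proofs --rpc-url http://127.0.0.1:57291 --num-transactions 100 + +# Iterate: re-submit the same bundle after each node-config change. +# Requires restoring the node's data directory between runs; see +# "Re-using proofs across runs" below. +miden-benchmark run-benchmark --rpc-url http://127.0.0.1:57291 +```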
+ +## Building + +```sh +make install-benchmark +``` + +## Usage + +### Generate proven transactions + +```sh +miden-benchmark create-proofs \ + --rpc-url http://127.0.0.1:57291 \ + --num-transactions 100 +``` + +Writes the bundle to `./benchmark-proofs/`: + +- `mint_txs.bin`, `mint_tx_inputs.bin` +- `consume_txs.bin`, `consume_tx_inputs.bin` + +### Submit them + +```sh +miden-benchmark run-benchmark \ + --rpc-url http://127.0.0.1:57291 \ + --concurrency 32 \ + --wait-blocks 3 +``` + +Mints go in sequentially, then consumes with the requested concurrency, then the run waits `--wait-blocks` blocks before scanning for inclusion. Per-phase ack rate, RPC latency percentiles, inclusion rate, and inclusion TPS are printed at the end. + +### Using a remote prover + +Pass `--remote-prover-url` to `create-proofs` to offload STARK proving to a +remote prover instead of producing proofs locally: + +```sh +miden-benchmark create-proofs \ + --rpc-url http://127.0.0.1:57291 \ + --num-transactions 100 \ + --remote-prover-url http://prover.example.com:50051 +``` + +The benchmark paces proving requests so that an autoscaling prover fleet has +time to spin up additional workers before being saturated: + +- Dispatch starts at **1 req/s** and bumps by 1 req/s every **3 minutes**, up + to **10 req/s**, and then holds at 10 req/s for the rest of the run. +- Up to **64** proving requests may be in flight at once (independent of the + rate cap). +- A retryable gRPC error from the prover (`ResourceExhausted`, + `Unavailable`, `DeadlineExceeded`, or any transport-level failure) **freezes + the ramp** at the current step for the rest of the run, and the failing + request is retried with exponential backoff (500ms x 2^n, capped at 30s, up + to 10 attempts). +- If the prover URL is unreachable or a non-retryable error is returned, + `create-proofs` exits with a non-zero status after the retry budget is + exhausted. + +Mint executions remain sequential (each mint mutates the shared faucet, so +ordering matters), but proving runs concurrently under the rate limiter. +Consume executions are also serial today, with concurrent proving. + +## Re-using proofs across runs + +A `ProvenTransaction` is pinned to the chain state it was generated against: + +- the node's genesis commitment, +- the reference block header, +- the initial account commitment of the account being modified, +- the input note nullifiers. + +Once a tx is included in a block, the node's state advances: nullifiers are recorded and account commitments change. Re-submitting the same proven tx is rejected because the chain has moved past the state the proof was built against. + +**Useful tip: clone the node's data directory before each benchmark run.** If you snapshot the data directory while the node is stopped, then *clone* the snapshot every time before bringing the node back up, the proofs in `./benchmark-proofs/` stay valid indefinitely. Each run is: + +1. Stop the node. +2. Replace the node's working data directory with a fresh copy of the + snapshot. +3. Start the node. +4. `miden-benchmark run-benchmark`. + +## Starting the node + +The benchmark needs a running Miden node with a reachable RPC endpoint. + +### Option A: docker-compose (recommended for benchmarking) + +The repo's `docker-compose.yml` wires up all node components (`store`, +`validator`, `block-producer`, `rpc`, `ntx-builder`) plus telemetry.
From the +repo root: + +```sh +make docker-build # build miden-node and miden-validator images +make compose-genesis # wipe the volume, bootstrap a fresh genesis +make compose-up # start the stack (RPC at http://127.0.0.1:57291) +``` + +Stop with `make compose-down`. + +### Option B: running `miden-node` and `miden-validator` directly + +Install both binaries: + +```sh +make install-node +make install-validator +``` + +Bootstrap a fresh data directory (one-time): + +```sh +DATA=./node-data + +miden-validator bootstrap \ + --data-directory $DATA/validator \ + --genesis-block-directory $DATA/genesis \ + --accounts-directory $DATA/accounts + +miden-node store bootstrap \ + --data-directory $DATA/store \ + --genesis-block $DATA/genesis/genesis.dat +``` + +Start each component. The example below backgrounds them with `nohup` and captures logs under `./logs/`. For an interactive run, drop the `nohup` / `&` and put each command in its own terminal. + +```sh +mkdir -p logs + +DATA=./node-data + +nohup miden-validator start \ + --listen 127.0.0.1:50101 \ + --data-directory "$DATA/validator" \ + > logs/validator.log 2>&1 & + +nohup miden-node store start \ + --rpc.listen 127.0.0.1:50001 \ + --ntx-builder.listen 127.0.0.1:50002 \ + --block-producer.listen 127.0.0.1:50003 \ + --data-directory "$DATA/store" \ + > logs/store.log 2>&1 & + +nohup miden-node block-producer start \ + --listen 127.0.0.1:50201 \ + --store.url http://127.0.0.1:50003 \ + --validator.url http://127.0.0.1:50101 \ + --max-txs-per-batch 1024 \ + --max-batches-per-block 64 \ + --block.interval 2s \ + --batch.interval 100ms \ + --batch.workers 16 \ + --mempool.tx-capacity 1000000 \ + > logs/block-producer.log 2>&1 & + +nohup miden-node rpc start \ + --listen 127.0.0.1:57291 \ + --store.url http://127.0.0.1:50001 \ + --block-producer.url http://127.0.0.1:50201 \ + --validator.url http://127.0.0.1:50101 \ + --grpc.timeout 24h \ + --grpc.max_connection_age 24h \ + --grpc.burst_size 100000 \ + --grpc.replenish_n_per_second 100000 \ + --grpc.max_concurrent_connections 1000000 \ + > logs/rpc.log 2>&1 & + +nohup miden-node ntx-builder start \ + --listen 127.0.0.1:50301 \ + --store.url http://127.0.0.1:50002 \ + --block-producer.url http://127.0.0.1:50201 \ + --validator.url http://127.0.0.1:50101 \ + --data-directory "$DATA/ntx-builder" \ + > logs/ntx-builder.log 2>&1 & +``` + +#### Stopping the node + +```sh +pkill -f miden-validator +pkill -f 'miden-node store' +pkill -f 'miden-node block-producer' +pkill -f 'miden-node rpc' +pkill -f 'miden-node ntx-builder' +# Or, if no other miden binaries are running: +pkill -f 'miden-(node|validator)' +``` +## Lifting the TPS ceiling + +At default settings the block-producer caps end-to-end inclusion at **~21 tx/s**, well below the protocol's hard limit. + +### The layered ceiling + +| Cap | Default | Protocol max | Knob | +| -------------------------------------------- | ------- | ------------ | ------------------------------ | +| Transactions per batch | 8 | 1024 | `--max-txs-per-batch` | +| Batches per block | 8 | 64 | `--max-batches-per-block` | +| Block interval | 3 s | n/a | `--block.interval` | +| Batch interval | 1 s | n/a | `--batch.interval` | +| Concurrent batch-builder workers | 2 | n/a | `--batch.workers` | +| Inflight mempool transactions | ~1280 | n/a | `--mempool.tx-capacity` | + +Block throughput ceiling = `max_batches_per_block x max_txs_per_batch / block.interval`. + +- Defaults: `8 x 8 / 3 s ~= 21 tx/s`. +- Protocol max with a 1 s block: `64 x 1024 / 1 s = 65 536 tx/s`. 
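+ +The same arithmetic as a quick runnable sketch (a standalone illustration, not part of the node codebase): + +```rust +/// Block throughput ceiling = batches/block x txs/batch / block interval. +fn ceiling_tps(batches_per_block: u64, txs_per_batch: u64, block_interval_secs: f64) -> f64 { + (batches_per_block * txs_per_batch) as f64 / block_interval_secs +} + +fn main() { + println!("defaults: {:.1} tx/s", ceiling_tps(8, 8, 3.0)); // ~21.3 + println!("protocol max: {:.0} tx/s", ceiling_tps(64, 1024, 1.0)); // 65536 +} +```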
+ +Protocol caps are enforced at startup (in `bin/node/src/commands/block_producer.rs`) and require a protocol-level change to lift. Everything else is operator configuration. + +### The batch-builder worker pool (`--batch.workers`) + +`--batch.workers` (env `MIDEN_NODE_BLOCK_PRODUCER_BATCH_WORKERS`) sets how many batches the block-producer keeps proving in parallel. Each worker is responsible for one in-flight batch proof — locally with the built-in prover, or remotely if `--batch-prover.url` is set. The default is **2**. Once `--max-txs-per-batch` and `--max-batches-per-block` are pushed up, this worker count is the single setting that determines how fast the block-producer can refill the mempool's batch slots; leaving it at 2 caps effective throughput well before the new block capacity becomes reachable. + +Rough sizing: + +- **With local batch proving** (no `--batch-prover.url`): raise to roughly + the number of physical CPU cores on the block-producer host. More than + that just over-subscribes the cores running the prover. +- **With a remote batch prover**: raise to whatever the remote service can + handle in parallel (i.e. its own worker count). The block-producer + workers are now mostly waiting on I/O, so the bound is the remote + prover's capacity, not local CPU. + +## License + +This project is [MIT licensed](../../LICENSE). diff --git a/bin/benchmark/src/create_proofs.rs b/bin/benchmark/src/create_proofs.rs new file mode 100644 index 000000000..7bfa9e8a2 --- /dev/null +++ b/bin/benchmark/src/create_proofs.rs @@ -0,0 +1,504 @@ +//! The `create-proofs` orchestrator and everything it needs to build the +//! proven-tx bundle locally: +//! +//! - `run` orchestrates the genesis fetch + faucet/wallet construction + mint phase + consume phase +//! + final write-out to `./benchmark-proofs/`. +//! - `create_faucet` / `create_wallet` build the accounts the bench uses. +//! - `BenchmarkDataStore` is the in-memory `DataStore` impl that feeds the `TransactionExecutor` +//! while we generate proofs locally.
+ +use std::collections::{BTreeSet, HashMap}; +use std::path::PathBuf; +use std::sync::Arc; +use std::time::{Duration, Instant}; + +use miden_protocol::account::auth::{AuthScheme, AuthSecretKey}; +use miden_protocol::account::{ + Account, + AccountBuilder, + AccountComponent, + AccountId, + AccountStorageMode, + AccountType, + PartialAccount, + StorageMapKey, +}; +use miden_protocol::asset::{ + Asset, + AssetAmount, + AssetVaultKey, + AssetWitness, + FungibleAsset, + TokenSymbol, +}; +use miden_protocol::block::{BlockHeader, BlockNumber}; +use miden_protocol::crypto::dsa::falcon512_poseidon2::SecretKey; +use miden_protocol::crypto::rand::RandomCoin; +use miden_protocol::note::{Note, NoteAttachments, NoteScript, NoteScriptRoot}; +use miden_protocol::transaction::{ + AccountInputs, + InputNote, + InputNotes, + PartialBlockchain, + ProvenTransaction, + TransactionArgs, +}; +use miden_protocol::utils::serde::Serializable; +use miden_protocol::{Felt, MastForest, Word}; +use miden_standards::account::auth::AuthSingleSig; +use miden_standards::account::faucets::{FungibleFaucet, TokenName}; +use miden_standards::account::interface::{AccountInterface, AccountInterfaceExt}; +use miden_standards::account::policies::{ + BurnPolicyConfig, + MintPolicyConfig, + PolicyAuthority, + TokenPolicyManager, +}; +use miden_standards::account::wallets::BasicWallet; +use miden_standards::note::P2idNote; +use miden_tx::auth::BasicAuthenticator; +use miden_tx::{ + DataStore, + DataStoreError, + MastForestStore, + TransactionExecutor, + TransactionMastStore, +}; +use rand::Rng; +use rayon::prelude::*; +use url::Url; + +use crate::prover::BenchmarkProver; +use crate::rpc_state::{fetch_chain_tip_header, fetch_partial_blockchain}; +use crate::summary::print_proving_summary; +use crate::{ + PROOFS_DIR, + create_genesis_aware_rpc_client, + get_genesis_header_request, + write_to_file, +}; + +// PROVING TASK HELPERS +// ================================================================================================ + +/// Result of a single spawned proving task: the proof attempt and the wall +/// time that task spent (which, for the remote path, includes rate-limit and +/// retry waits). +type ProveOutcome = (anyhow::Result<ProvenTransaction>, Duration); + +/// Await every spawned proving task in spawn order, returning the proofs in +/// that same order plus the summed per-task wall time. If any task fails (or +/// panics) we print the error and exit with a non-zero status. Proven txs +/// later in the bundle reference earlier ones, so a single failure means the +/// bundle is unusable anyway.
+async fn collect_proofs( + label: &str, + tasks: Vec<tokio::task::JoinHandle<ProveOutcome>>, +) -> (Vec<ProvenTransaction>, Duration) { + let mut proofs = Vec::with_capacity(tasks.len()); + let mut total = Duration::ZERO; + for (i, handle) in tasks.into_iter().enumerate() { + let (result, elapsed) = handle.await.unwrap_or_else(|err| { + eprintln!("{label} proving task {i} panicked: {err}"); + std::process::exit(1); + }); + total += elapsed; + match result { + Ok(tx) => proofs.push(tx), + Err(err) => { + eprintln!("{label} proving failed for tx {i}: {err:#}"); + std::process::exit(1); + }, + } + } + (proofs, total) +} + +// ORCHESTRATOR +// ================================================================================================ + +#[expect( + clippy::too_many_lines, + reason = "single linear orchestration of genesis fetch + mint phase + consume phase; \ + splitting would just shuffle locals (faucet, data_store, authenticator) around" +)] +pub(crate) async fn run(rpc_url: Url, num_transactions: u64, remote_prover_url: Option<String>) { + let mut rpc_client = create_genesis_aware_rpc_client(&rpc_url, Duration::from_secs(10)) + .await + .unwrap(); + + println!("Fetching genesis block header from {rpc_url}..."); + let genesis_header_proto = rpc_client + .get_block_header_by_number(get_genesis_header_request()) + .await + .unwrap() + .into_inner() + .block_header + .expect("RPC returned no block header"); + let genesis_header: BlockHeader = genesis_header_proto.try_into().unwrap(); + + println!("Fetching chain tip header..."); + let ref_block_header = fetch_chain_tip_header(&mut rpc_client).await; + let ref_block_num = ref_block_header.block_num(); + println!(" ref block = {ref_block_num} (proofs will bind to this block's chain state)"); + + println!("Fetching chain MMR up to ref block..."); + let partial_blockchain = + fetch_partial_blockchain(&mut rpc_client, ref_block_num.as_u32(), &genesis_header).await; + + println!("Creating faucet..."); + let (mut faucet, faucet_secret_key) = create_faucet(); + + let coin_seed: [u64; 4] = rand::rng().random(); + let mut seed_rng = RandomCoin::new(coin_seed.map(Felt::new).into()); + let wallet_secret_key = SecretKey::with_rng(&mut seed_rng); + let wallet_public_key = wallet_secret_key.public_key(); + + println!("Creating {num_transactions} wallets in parallel..."); + let wallets: Vec<Account> = (0..num_transactions) + .into_par_iter() + .map(|index| create_wallet(&wallet_public_key, index)) + .collect(); + + let mut data_store = BenchmarkDataStore::new(ref_block_header.clone(), partial_blockchain); + data_store.add_account(faucet.clone()); + for wallet in &wallets { + data_store.add_account(wallet.clone()); + } + + let authenticator = BasicAuthenticator::new(&[ + AuthSecretKey::Falcon512Poseidon2(faucet_secret_key), + AuthSecretKey::Falcon512Poseidon2(wallet_secret_key), + ]); + + let prover = Arc::new(match remote_prover_url { + Some(url) => { + println!("Using remote prover at {url} (rate-limited ramp from 1 to 10 req/s)."); + BenchmarkProver::remote(url) + }, + None => BenchmarkProver::local(), + }); + let faucet_id = faucet.id(); + + // Mint phase: executions are sequential (each mutates the shared faucet), + // but proving runs concurrently on the prover (under the rate limiter when + // remote).
+ println!("Executing {num_transactions} mint transactions (sequential)..."); + let mut mint_tasks: Vec> = + Vec::with_capacity(num_transactions as usize); + let mut mint_tx_inputs: Vec> = Vec::with_capacity(num_transactions as usize); + let mut mint_notes: Vec = Vec::with_capacity(num_transactions as usize); + let mint_phase_start = Instant::now(); + let mut mint_exec_total = Duration::ZERO; + + for index in 0..num_transactions { + let wallet_id = wallets[index as usize].id(); + let note = { + let asset = Asset::Fungible(FungibleAsset::new(faucet_id, 10).unwrap()); + P2idNote::create( + faucet_id, + wallet_id, + vec![asset], + miden_protocol::note::NoteType::Public, + NoteAttachments::empty(), + &mut seed_rng, + ) + .expect("note creation failed") + }; + + let account_interface = AccountInterface::from_account(&faucet); + let script = account_interface + .build_send_notes_script(&[note.clone().into()], None) + .expect("failed to build mint send-notes script"); + + let mut tx_args = TransactionArgs::default().with_tx_script(script); + tx_args.add_output_note_recipient(Box::new(note.recipient().clone())); + + let executor = TransactionExecutor::new(&data_store).with_authenticator(&authenticator); + + let exec_t0 = Instant::now(); + let executed_tx = Box::pin(executor.execute_transaction( + faucet_id, + ref_block_num, + InputNotes::default(), + tx_args, + )) + .await + .expect("failed to execute mint transaction"); + mint_exec_total += exec_t0.elapsed(); + + let tx_inputs_bytes = executed_tx.tx_inputs().to_bytes(); + let delta = executed_tx.account_delta().clone(); + + // Evolve the faucet state for the next iteration before we hand the + // executed tx off for proving. The first mint of a never-before-seen + // account produces a full-state delta (because the delta carries the + // freshly deployed code); subsequent mints produce partial-state + // deltas that can be applied incrementally. + if delta.is_full_state() { + faucet = Account::try_from(&delta) + .expect("failed to materialize faucet from full-state delta"); + } else { + faucet.apply_delta(&delta).expect("failed to apply faucet delta"); + } + data_store.add_account(faucet.clone()); + + let prover = Arc::clone(&prover); + mint_tasks.push(tokio::spawn(async move { + let prove_t0 = Instant::now(); + let result = prover.prove(executed_tx).await; + (result, prove_t0.elapsed()) + })); + mint_tx_inputs.push(tx_inputs_bytes); + mint_notes.push(note); + + if (index + 1) % 10 == 0 || index + 1 == num_transactions { + println!(" executed {} / {num_transactions} mint txs", index + 1); + } + } + + println!("Awaiting {num_transactions} mint proofs..."); + let (mint_txs, mint_prove_total) = collect_proofs("mint", mint_tasks).await; + let mint_phase_elapsed = mint_phase_start.elapsed(); + print_proving_summary( + "Mint", + num_transactions, + mint_phase_elapsed, + mint_exec_total, + mint_prove_total, + ); + + // Consume phase — same shape: sequential executions, concurrent proving. 
+ println!("Executing {num_transactions} consume transactions (sequential)..."); + let mut consume_tasks: Vec> = + Vec::with_capacity(num_transactions as usize); + let mut consume_tx_inputs: Vec> = Vec::with_capacity(num_transactions as usize); + let consume_phase_start = Instant::now(); + let mut consume_exec_total = Duration::ZERO; + + for index in 0..num_transactions { + let wallet_id = wallets[index as usize].id(); + let note = mint_notes[index as usize].clone(); + let input_note = InputNote::Unauthenticated { note }; + let input_notes = + InputNotes::new(vec![input_note]).expect("failed to construct input notes for consume"); + + let executor = TransactionExecutor::new(&data_store).with_authenticator(&authenticator); + + let exec_t0 = Instant::now(); + let executed_tx = Box::pin(executor.execute_transaction( + wallet_id, + ref_block_num, + input_notes, + TransactionArgs::default(), + )) + .await + .expect("failed to execute consume transaction"); + consume_exec_total += exec_t0.elapsed(); + + let tx_inputs_bytes = executed_tx.tx_inputs().to_bytes(); + + let prover = Arc::clone(&prover); + consume_tasks.push(tokio::spawn(async move { + let prove_t0 = Instant::now(); + let result = prover.prove(executed_tx).await; + (result, prove_t0.elapsed()) + })); + consume_tx_inputs.push(tx_inputs_bytes); + + if (index + 1) % 10 == 0 || index + 1 == num_transactions { + println!(" executed {} / {num_transactions} consume txs", index + 1); + } + } + + println!("Awaiting {num_transactions} consume proofs..."); + let (consume_txs, consume_prove_total) = collect_proofs("consume", consume_tasks).await; + let consume_phase_elapsed = consume_phase_start.elapsed(); + print_proving_summary( + "Consume", + num_transactions, + consume_phase_elapsed, + consume_exec_total, + consume_prove_total, + ); + + let out_dir = PathBuf::from(PROOFS_DIR); + println!("Writing proofs to {}/", out_dir.display()); + fs_err::create_dir_all(&out_dir).unwrap(); + write_to_file(&out_dir.join("mint_txs.bin"), &mint_txs); + write_to_file(&out_dir.join("mint_tx_inputs.bin"), &mint_tx_inputs); + write_to_file(&out_dir.join("consume_txs.bin"), &consume_txs); + write_to_file(&out_dir.join("consume_tx_inputs.bin"), &consume_tx_inputs); + println!("Done."); +} + +// ACCOUNT BUILDERS +// ================================================================================================ + +/// Creates a new faucet account and returns it alongside its secret key. 
+fn create_faucet() -> (Account, SecretKey) { + let coin_seed: [u64; 4] = rand::rng().random(); + let mut rng = RandomCoin::new(coin_seed.map(Felt::new).into()); + let key_pair = SecretKey::with_rng(&mut rng); + let init_seed = [0_u8; 32]; + + let fungible_faucet: AccountComponent = FungibleFaucet::builder() + .name(TokenName::new("BENCHMARK").unwrap()) + .symbol(TokenSymbol::new("BCM").unwrap()) + .decimals(2) + .max_supply(AssetAmount::new(FungibleAsset::MAX_AMOUNT).unwrap()) + .build() + .unwrap() + .into(); + + let faucet = AccountBuilder::new(init_seed) + .account_type(AccountType::FungibleFaucet) + .storage_mode(AccountStorageMode::Private) + .with_component(fungible_faucet) + .with_components(TokenPolicyManager::new( + PolicyAuthority::AuthControlled, + MintPolicyConfig::AllowAll, + BurnPolicyConfig::AllowAll, + )) + .with_auth_component(AuthSingleSig::new( + key_pair.public_key().into(), + AuthScheme::Falcon512Poseidon2, + )) + .build() + .unwrap(); + (faucet, key_pair) +} + +/// Creates a new wallet account with the given public key, using `index` to vary +/// the init seed so each wallet ends up with a distinct account ID. +fn create_wallet( + public_key: &miden_protocol::crypto::dsa::falcon512_poseidon2::PublicKey, + index: u64, +) -> Account { + let init_seed: Vec<_> = index.to_be_bytes().into_iter().chain([0u8; 24]).collect(); + AccountBuilder::new(init_seed.try_into().unwrap()) + .account_type(AccountType::RegularAccountImmutableCode) + .storage_mode(AccountStorageMode::Private) + .with_auth_component(AuthSingleSig::new( + public_key.clone().into(), + AuthScheme::Falcon512Poseidon2, + )) + .with_component(BasicWallet) + .build() + .unwrap() +} + +// BENCHMARK DATA STORE +// ================================================================================================ + +/// In-memory `DataStore` impl used to feed the [`TransactionExecutor`] when +/// generating real proofs locally. Modelled on the network-monitor's +/// `MonitorDataStore`. 
+struct BenchmarkDataStore { + accounts: HashMap<AccountId, Account>, + block_header: BlockHeader, + partial_block_chain: PartialBlockchain, + mast_store: TransactionMastStore, +} + +impl BenchmarkDataStore { + fn new(block_header: BlockHeader, partial_block_chain: PartialBlockchain) -> Self { + Self { + accounts: HashMap::new(), + block_header, + partial_block_chain, + mast_store: TransactionMastStore::new(), + } + } + + fn add_account(&mut self, account: Account) { + self.mast_store.load_account_code(account.code()); + self.accounts.insert(account.id(), account); + } + + fn get_account(&self, account_id: AccountId) -> Result<&Account, DataStoreError> { + self.accounts.get(&account_id).ok_or_else(|| DataStoreError::Other { + error_msg: "unknown account".into(), + source: None, + }) + } +} + +impl DataStore for BenchmarkDataStore { + async fn get_transaction_inputs( + &self, + account_id: AccountId, + _block_refs: BTreeSet<BlockNumber>, + ) -> Result<(PartialAccount, BlockHeader, PartialBlockchain), DataStoreError> { + let account = self.get_account(account_id)?; + let partial_account = PartialAccount::from(account); + Ok((partial_account, self.block_header.clone(), self.partial_block_chain.clone())) + } + + async fn get_storage_map_witness( + &self, + account_id: AccountId, + map_root: Word, + map_key: StorageMapKey, + ) -> Result<miden_protocol::account::StorageMapWitness, DataStoreError> { + let account = self.get_account(account_id)?; + for slot in account.storage().slots() { + if let miden_protocol::account::StorageSlotContent::Map(map) = slot.content() { + if map.root() == map_root { + return Ok(map.open(&map_key)); + } + } + } + Err(DataStoreError::Other { + error_msg: format!("no storage map with the requested root in account {account_id}") + .into(), + source: None, + }) + } + + async fn get_foreign_account_inputs( + &self, + _foreign_account_id: AccountId, + _ref_block: BlockNumber, + ) -> Result<AccountInputs, DataStoreError> { + unimplemented!("foreign account inputs are not needed for the benchmark") + } + + async fn get_vault_asset_witnesses( + &self, + account_id: AccountId, + vault_root: Word, + vault_keys: BTreeSet<AssetVaultKey>, + ) -> Result<Vec<AssetWitness>, DataStoreError> { + let account = self.get_account(account_id)?; + + if account.vault().root() != vault_root { + return Err(DataStoreError::Other { + error_msg: "vault root mismatch".into(), + source: None, + }); + } + + Result::<Vec<_>, _>::from_iter(vault_keys.into_iter().map(|vault_key| { + AssetWitness::new(account.vault().open(vault_key).into()).map_err(|err| { + DataStoreError::Other { + error_msg: "failed to open vault asset tree".into(), + source: Some(Box::new(err)), + } + }) + })) + } + + async fn get_note_script( + &self, + _script_root: NoteScriptRoot, + ) -> Result<Option<NoteScript>, DataStoreError> { + Ok(None) + } +} + +impl MastForestStore for BenchmarkDataStore { + fn get(&self, procedure_hash: &Word) -> Option<Arc<MastForest>> { + self.mast_store.get(procedure_hash) + } +} diff --git a/bin/benchmark/src/inclusion.rs b/bin/benchmark/src/inclusion.rs new file mode 100644 index 000000000..1f2f7b027 --- /dev/null +++ b/bin/benchmark/src/inclusion.rs @@ -0,0 +1,235 @@ +//! Post-submission inclusion scan. +//! +//! `scan_with_drain` is the one-shot watcher: it polls the chain past a +//! starting height, scans each new block for the txs we submitted, and +//! exits as soon as every submitted tx has been seen on-chain — falling +//! back to a `max_blocks` bound if some submissions never land. The result +//! carries per-block hit counts plus the scan span used to derive the +//! average block interval.
+ +use std::collections::HashMap; +use std::time::{Duration, SystemTime, UNIX_EPOCH}; + +use miden_node_proto::clients::RpcClient; +use miden_node_proto::generated as proto; +use miden_node_proto::generated::rpc::BlockHeaderByNumberRequest; +use miden_protocol::block::{BlockHeader, SignedBlock}; +use miden_protocol::transaction::TransactionId; +use miden_protocol::utils::serde::Deserializable; + +/// One scanned block that contained at least one of our txs. Empty blocks +/// in the scan range are not represented here. +#[derive(Debug, Clone, Copy)] +pub(crate) struct BlockHit { + /// On-chain block number. + pub(crate) block_num: u32, + /// Unix-seconds timestamp from the block header. + pub(crate) block_ts: u32, + /// Number of our txs included in this block. + pub(crate) hit_count: u32, +} + +#[derive(Debug)] +pub(crate) struct InclusionResult { + pub(crate) submitted_count: u64, + pub(crate) included_count: u64, + /// One entry per block in the scan range that included any of our txs, + /// in scan order. Throughput metrics are derived from this list plus + /// the block interval inferred from the scan span (see + /// [`InclusionResult::derived_block_interval`]). + pub(crate) per_block_hits: Vec<BlockHit>, + /// For each successfully submitted tx that landed in a block: the + /// elapsed time from RPC ack to that block's header timestamp. + pub(crate) inclusion_latencies: Vec<Duration>, + /// Number of blocks the inclusion scan successfully read headers for. + pub(crate) scanned_block_count: u32, + /// Header timestamps of the first and last successfully scanned blocks + /// (unix seconds). Together with `scanned_block_count`, used to derive + /// the block interval at print time. + pub(crate) scanned_first_ts: u32, + pub(crate) scanned_last_ts: u32, +} + +impl InclusionResult { + /// Derive the average block interval from the scan span. Returns `None` + /// when the scan touched fewer than two blocks or when all scanned + /// headers share the same 1-second-resolution timestamp (sub-second + /// cadence), in which case the bench cannot determine the interval + /// from headers alone. + pub(crate) fn derived_block_interval(&self) -> Option<Duration> { + if self.scanned_block_count < 2 || self.scanned_last_ts <= self.scanned_first_ts { + return None; + } + let span_secs = u64::from(self.scanned_last_ts - self.scanned_first_ts); + let intervals = u64::from(self.scanned_block_count - 1); + // f64 keeps the fractional seconds when the cadence is finer than 1s + // *and* the scan crosses enough one-second boundaries. + #[expect( + clippy::cast_precision_loss, + reason = "block counts and timestamp deltas are tiny in practice" + )] + let interval_secs = (span_secs as f64) / (intervals as f64); + Some(Duration::from_secs_f64(interval_secs)) + } +} + +/// Watch the chain advance past `start_height` and scan each new block for +/// our submitted txs as it lands. Stops as soon as every entry in `ack_by_id` +/// has been matched (early-exit), or after `max_blocks` blocks past +/// `start_height` have been scanned without draining (timeout) — whichever +/// comes first. Returns the final scanned block number alongside the +/// inclusion stats; if early-exit fires, the returned `h_final` is the +/// block that completed the drain.
+#[expect( + clippy::too_many_lines, + reason = "polling + per-block deserialization + tx-id matching is intentionally inline; \ + the alternative is to thread eight pieces of mutable state through a helper, \ + which obscures the read flow without changing the logic" +)] +pub(crate) async fn scan_with_drain( + mut client: RpcClient, + start_height: u32, + max_blocks: u32, + mut ack_by_id: HashMap<TransactionId, SystemTime>, +) -> (u32, InclusionResult) { + let submitted_count = ack_by_id.len() as u64; + let mut included_count: u64 = 0; + let mut per_block_hits: Vec<BlockHit> = Vec::new(); + let mut inclusion_latencies: Vec<Duration> = Vec::new(); + let mut scanned_block_count: u32 = 0; + let mut scanned_first_ts: u32 = 0; + let mut scanned_last_ts: u32 = 0; + + let max_target = start_height.saturating_add(max_blocks); + let mut next_block = start_height + 1; + let mut last_seen_height = start_height; + let mut h_final = start_height; + + 'outer: loop { + // Refresh the chain tip and announce changes. + let tip = current_block_height(client.clone()).await; + if tip != last_seen_height { + println!(" block height: {tip}"); + last_seen_height = tip; + } + + // Scan every unwatched block, capped at the max-bound target. + let scan_to = tip.min(max_target); + while next_block <= scan_to { + let request = proto::blockchain::BlockRequest { + block_num: next_block, + include_proof: None, + }; + let response = match client.get_block_by_number(request).await { + Ok(r) => r.into_inner(), + Err(status) => { + eprintln!( + " warning: get_block_by_number({next_block}) failed: {status} \ + — skipping this block in the inclusion scan" + ); + next_block += 1; + continue; + }, + }; + let Some(bytes) = response.block else { + next_block += 1; + continue; + }; + let signed_block = match SignedBlock::read_from_bytes(&bytes) { + Ok(sb) => sb, + Err(err) => { + eprintln!( + " warning: failed to deserialize SignedBlock for block {next_block}: {err}" + ); + next_block += 1; + continue; + }, + }; + + let block_ts = signed_block.header().timestamp(); + let block_ts_system = UNIX_EPOCH + Duration::from_secs(u64::from(block_ts)); + + // Track scan span so we can derive the block interval at print time. + if scanned_block_count == 0 { + scanned_first_ts = block_ts; + } + scanned_last_ts = block_ts; + scanned_block_count += 1; + + let mut hits_in_this_block: u32 = 0; + for header in signed_block.body().transactions().as_slice() { + if let Some(ack_at) = ack_by_id.remove(&header.id()) { + hits_in_this_block += 1; + included_count += 1; + // Block timestamps have 1-second resolution and may round + // down past the ack instant; clamp negative deltas to zero. + let latency = block_ts_system.duration_since(ack_at).unwrap_or_default(); + inclusion_latencies.push(latency); + } + } + + if hits_in_this_block > 0 { + per_block_hits.push(BlockHit { + block_num: next_block, + block_ts, + hit_count: hits_in_this_block, + }); + } + + h_final = next_block; + next_block += 1; + + // Early exit: pending set drained — every submitted tx is on chain. + if ack_by_id.is_empty() { + println!( + " all {submitted_count} submitted tx(s) included by block {h_final}; \ + stopping scan early" + ); + break 'outer; + } + } + + // Hit the safety bound but still have pending txs. Stop and report + // what we have; the unaccounted-for txs will show up as drops in the + // summary. + if next_block > max_target { + println!( + " reached max wait of {max_blocks} blocks past height {start_height}; \ + stopping with {} tx(s) still pending", + ack_by_id.len(), + ); + break; + } + + // Pace the polling.
+ tokio::time::sleep(Duration::from_millis(500)).await; + } + + let inclusion = InclusionResult { + submitted_count, + included_count, + per_block_hits, + inclusion_latencies, + scanned_block_count, + scanned_first_ts, + scanned_last_ts, + }; + (h_final, inclusion) +} + +pub(crate) async fn current_block_height(mut client: RpcClient) -> u32 { + let response = client + .get_block_header_by_number(BlockHeaderByNumberRequest { + block_num: None, + include_mmr_proof: None, + }) + .await + .expect("failed to fetch latest block header") + .into_inner(); + let header: BlockHeader = response + .block_header + .expect("no block header in response") + .try_into() + .expect("failed to decode block header"); + header.block_num().as_u32() +} diff --git a/bin/benchmark/src/main.rs b/bin/benchmark/src/main.rs new file mode 100644 index 000000000..d2cad3b7e --- /dev/null +++ b/bin/benchmark/src/main.rs @@ -0,0 +1,181 @@ +//! Runs benchmarks. +//! +//! Each subcommand's body lives in its own module (`create_proofs`, `submit`). +//! `main.rs` is just the clap CLI + dispatch + a few shared utilities both +//! orchestrators need (RPC client setup with genesis metadata, file I/O +//! helpers, and the proofs-bundle directory). + +use std::path::Path; +use std::time::Duration; + +use anyhow::{Context, Result}; +use clap::{Parser, Subcommand}; +use miden_node_proto::clients::{Builder, RpcClient}; +use miden_node_proto::generated::rpc::BlockHeaderByNumberRequest; +use miden_protocol::block::{BlockHeader, BlockNumber}; +use miden_protocol::utils::serde::{Deserializable, Serializable}; +use url::Url; + +mod create_proofs; +mod inclusion; +mod prover; +mod rpc_state; +mod submit; +mod summary; + +// SHARED CONSTANTS +// ================================================================================================ + +pub(crate) const PROOFS_DIR: &str = "./benchmark-proofs"; + +// COMMANDS +// ================================================================================================ + +#[derive(Parser)] +#[command(version, about, long_about = None)] +pub struct Cli { + #[command(subcommand)] + pub command: Command, +} + +#[derive(Subcommand)] +pub enum Command { + CreateProofs { + /// RPC endpoint of the target miden node — used to discover the + /// genesis commitment that the generated proofs are bound to. Must + /// match the node you intend to submit the proofs against. + #[arg(long, default_value = "http://127.0.0.1:57291")] + rpc_url: Url, + /// Number of mint + consume transaction pairs to generate. Each + /// pair takes seconds of real STARK proving, so start small. + #[arg(long, default_value_t = 10)] + num_transactions: u64, + /// If set, proofs are produced by the remote prover at this URL + /// instead of locally. Dispatch is rate-limited: starts at 1 req/s, + /// bumps by 1 req/s every 3 minutes up to 10 req/s, and freezes at + /// the current step if the prover returns a retryable error + /// (resource-exhausted, unavailable, or deadline-exceeded). If unset, + /// proving runs locally with `LocalTransactionProver`. + #[arg(long)] + remote_prover_url: Option<String>, + }, + RunBenchmark { + /// RPC endpoint of the target miden node. + #[arg(long, default_value = "http://127.0.0.1:57291")] + rpc_url: Url, + /// Number of concurrent submission tasks. + #[arg(long, default_value_t = 32)] + concurrency: usize, + /// Maximum number of blocks past the submission point to scan + /// before giving up.
The scan exits early as soon as every submitted + /// tx has been seen on-chain, so this is an upper bound on the + /// wait, not a fixed delay. Bump this when running large batches + /// that may take many blocks to fully include. + #[arg(long, default_value_t = 30)] + wait_blocks: u32, + }, +} + +#[tokio::main] +async fn main() { + let cli = Cli::parse(); + cli.run().await; +} + +impl Cli { + async fn run(self) { + match self.command { + Command::CreateProofs { + rpc_url, + num_transactions, + remote_prover_url, + } => { + create_proofs::run(rpc_url, num_transactions, remote_prover_url).await; + }, + Command::RunBenchmark { rpc_url, concurrency, wait_blocks } => { + submit::run(rpc_url, concurrency, wait_blocks).await; + }, + } + } +} + +// SHARED INFRA +// ================================================================================================ + +/// Create an RPC client configured with the correct genesis metadata in the +/// `Accept` header so that write RPCs such as `SubmitProvenTransaction` are +/// accepted by the node. +pub(crate) async fn create_genesis_aware_rpc_client( + rpc_url: &Url, + timeout: Duration, +) -> Result<RpcClient> { + let use_tls = rpc_url.scheme() == "https"; + + let tls_stage = Builder::new(rpc_url.clone()); + let timeout_stage = if use_tls { + tls_stage.with_tls().context("Failed to configure TLS for RPC client")? + } else { + tls_stage.without_tls() + }; + let mut rpc: RpcClient = timeout_stage + .with_timeout(timeout) + .without_metadata_version() + .without_metadata_genesis() + .without_otel_context_injection() + .connect() + .await + .context("Failed to create RPC client for genesis discovery")?; + + let response = rpc + .get_block_header_by_number(get_genesis_header_request()) + .await + .context("Failed to get genesis block header from RPC")? + .into_inner(); + + let genesis_block_header = response + .block_header + .ok_or_else(|| anyhow::anyhow!("No block header in response"))?; + + let genesis_header: BlockHeader = + genesis_block_header.try_into().context("Failed to convert block header")?; + + let genesis_commitment = genesis_header.commitment(); + let genesis = genesis_commitment.to_hex(); + + let tls_stage = Builder::new(rpc_url.clone()); + let timeout_stage = if use_tls { + tls_stage.with_tls().context("Failed to configure TLS for RPC client")? + } else { + tls_stage.without_tls() + }; + let rpc_client = timeout_stage + .with_timeout(timeout) + .without_metadata_version() + .with_metadata_genesis(genesis) + .without_otel_context_injection() + .connect() + .await + .context("Failed to connect to RPC server with genesis metadata")?; + + Ok(rpc_client) +} + +pub(crate) fn get_genesis_header_request() -> BlockHeaderByNumberRequest { + BlockHeaderByNumberRequest { + block_num: Some(BlockNumber::GENESIS.as_u32()), + include_mmr_proof: None, + } +} + +pub(crate) fn read_from_file<T: Deserializable>(path: &Path) -> T { + let bytes = fs_err::read(path).unwrap_or_else(|_| { + panic!("failed to read {} — run `create-proofs` first", path.display()) + }); + T::read_from_bytes(&bytes) + .unwrap_or_else(|_| panic!("failed to deserialize {}", path.display())) +} + +pub(crate) fn write_to_file<T: Serializable>(path: &Path, value: &T) { + fs_err::write(path, value.to_bytes()) + .unwrap_or_else(|err| panic!("failed to write {}: {err}", path.display())); +} diff --git a/bin/benchmark/src/prover.rs b/bin/benchmark/src/prover.rs new file mode 100644 index 000000000..de258b4f3 --- /dev/null +++ b/bin/benchmark/src/prover.rs @@ -0,0 +1,274 @@ +//! Pluggable prover for the `create-proofs` orchestrator. +//!
+//! - [`BenchmarkProver::Local`] keeps the current `LocalTransactionProver` path (the default when +//! `--remote-prover-url` is not set). +//! - [`BenchmarkProver::Remote`] talks to a deployed remote prover. To avoid slamming an +//! autoscaling fleet at t=0, requests are paced by a [`RampingRateLimiter`] that starts at +//! [`START_RATE`] rps and bumps by 1 rps every [`STEP_DURATION`] until it hits [`MAX_RATE`]. +//! Retryable gRPC errors (resource-exhausted, unavailable, deadline-exceeded, or any +//! transport-level failure) freeze the ramp at the current step for the rest of the run. + +use std::error::Error as _; +use std::sync::Arc; +use std::sync::atomic::{AtomicU32, Ordering}; +use std::time::{Duration, Instant}; + +use anyhow::Result; +use miden_protocol::transaction::{ExecutedTransaction, ProvenTransaction, TransactionInputs}; +use miden_remote_prover_client::RemoteTransactionProver; +use miden_tx::{LocalTransactionProver, TransactionProverError}; +use tokio::sync::{Mutex, Semaphore}; + +// SCHEDULE CONSTANTS +// ================================================================================================ + +const START_RATE: u32 = 1; +const MAX_RATE: u32 = 10; +const STEP_DURATION: Duration = Duration::from_secs(180); + +/// Per-request gRPC deadline. STARK transaction proofs routinely exceed the +/// default 10s, so we override it. Anything past this is treated as a +/// retryable error. +const PROVE_TIMEOUT: Duration = Duration::from_secs(120); + +/// Cap on the number of proving requests in flight at once, independent of the +/// rate. At [`MAX_RATE`] with ~30s proof latency we'd otherwise stack hundreds +/// of in-flight requests. +const MAX_IN_FLIGHT: usize = 64; + +// RETRY CONSTANTS +// ================================================================================================ + +const RETRY_BASE: Duration = Duration::from_millis(500); +const RETRY_MAX_BACKOFF: Duration = Duration::from_secs(30); +const RETRY_MAX_ATTEMPTS: u32 = 10; +const RETRY_BACKOFF_SHIFT_CAP: u32 = 6; + +// BENCHMARK PROVER +// ================================================================================================ + +pub(crate) enum BenchmarkProver { + Local(LocalTransactionProver), + Remote { + prover: RemoteTransactionProver, + limiter: Arc<RampingRateLimiter>, + permits: Arc<Semaphore>, + }, +} + +impl BenchmarkProver { + pub(crate) fn local() -> Self { + Self::Local(LocalTransactionProver::default()) + } + + pub(crate) fn remote(endpoint: String) -> Self { + let prover = RemoteTransactionProver::new(endpoint).with_timeout(PROVE_TIMEOUT); + Self::Remote { + prover, + limiter: Arc::new(RampingRateLimiter::new()), + permits: Arc::new(Semaphore::new(MAX_IN_FLIGHT)), + } + } + + /// Prove the given executed transaction. The remote path paces dispatch + /// through the rate limiter and retries retryable errors with exponential + /// backoff; the local path runs the in-process prover directly.
+ pub(crate) async fn prove( + &self, + executed_tx: ExecutedTransaction, + ) -> Result<ProvenTransaction> { + match self { + Self::Local(prover) => prover + .prove(executed_tx) + .await + .map_err(|err| anyhow::anyhow!("local proving failed: {err}")), + Self::Remote { prover, limiter, permits } => { + let tx_inputs: TransactionInputs = executed_tx.into(); + prove_remote_with_retry(prover, limiter, permits, &tx_inputs).await + }, + } + } +} + +async fn prove_remote_with_retry( + prover: &RemoteTransactionProver, + limiter: &Arc<RampingRateLimiter>, + permits: &Arc<Semaphore>, + tx_inputs: &TransactionInputs, +) -> Result<ProvenTransaction> { + // Hold one in-flight permit across every retry so the concurrency cap + // accounts for slow-but-still-progressing requests. + let _permit = permits + .clone() + .acquire_owned() + .await + .expect("in-flight semaphore is never closed"); + + let mut attempt: u32 = 0; + loop { + limiter.acquire().await; + match prover.prove(tx_inputs).await { + Ok(tx) => return Ok(tx), + Err(err) => { + if !is_retryable(&err) { + return Err(anyhow::anyhow!("remote proving failed: {err}")); + } + limiter.freeze(); + attempt += 1; + if attempt > RETRY_MAX_ATTEMPTS { + return Err(anyhow::anyhow!( + "remote proving failed after {RETRY_MAX_ATTEMPTS} retries: {err}" + )); + } + let shift = attempt.min(RETRY_BACKOFF_SHIFT_CAP); + let backoff = (RETRY_BASE.saturating_mul(1 << shift)).min(RETRY_MAX_BACKOFF); + eprintln!( + "remote prover returned retryable error (attempt {attempt}/{RETRY_MAX_ATTEMPTS}, backoff {backoff:?}): {err}" + ); + tokio::time::sleep(backoff).await; + }, + } + } +} + +/// Walk the error source chain looking for a tonic status or transport error. +/// We classify resource-exhausted, unavailable, deadline-exceeded, and any +/// transport-level failure (e.g. broken pipe, connect refused) as retryable. +fn is_retryable(err: &TransactionProverError) -> bool { + let mut src: Option<&(dyn std::error::Error + 'static)> = err.source(); + while let Some(e) = src { + if let Some(status) = e.downcast_ref::<tonic::Status>() { + return matches!( + status.code(), + tonic::Code::ResourceExhausted + | tonic::Code::Unavailable + | tonic::Code::DeadlineExceeded + ); + } + if e.downcast_ref::<tonic::transport::Error>().is_some() { + return true; + } + src = e.source(); + } + false +} + +// RAMPING RATE LIMITER +// ================================================================================================ + +/// A wall-clock-anchored rate limiter that ramps from [`START_RATE`] to +/// [`MAX_RATE`] requests/sec, bumping by 1 rps every [`STEP_DURATION`]. +/// +/// [`freeze`](Self::freeze) caps the rate at its current value for the rest of +/// the run; once frozen, the ramp never resumes. +pub(crate) struct RampingRateLimiter { + start: Instant, + inner: Mutex<Inner>, + /// Last rate we logged a step transition for. Used purely for logging. + reported_rate: AtomicU32, +} + +struct Inner { + /// Earliest instant at which the next `acquire()` may return. + next_release: Instant, + /// If `Some(rate)`, the rate is capped at `rate` for the rest of the run. + frozen_at: Option<u32>, +} + +impl RampingRateLimiter { + fn new() -> Self { + let now = Instant::now(); + Self { + start: now, + inner: Mutex::new(Inner { next_release: now, frozen_at: None }), + reported_rate: AtomicU32::new(0), + } + } + + /// Block until this caller is allowed to dispatch one request under the + /// current rate schedule.
+ async fn acquire(&self) { + let sleep_until = { + let mut inner = self.inner.lock().await; + let rate = compute_rate(self.start, inner.frozen_at); + let now = Instant::now(); + let earliest = inner.next_release.max(now); + let slot = earliest + slot_interval(rate); + inner.next_release = slot; + + let prev = self.reported_rate.swap(rate, Ordering::Relaxed); + if prev != rate { + println!(" rate limiter: now dispatching at {rate} req/s"); + } + earliest + }; + tokio::time::sleep_until(sleep_until.into()).await; + } + + /// Freeze the rate at the current value. Idempotent — first freeze wins. + fn freeze(&self) { + // Best-effort lock; if contended, the other caller will set it. + if let Ok(mut inner) = self.inner.try_lock() { + if inner.frozen_at.is_none() { + let rate = compute_rate(self.start, None); + inner.frozen_at = Some(rate); + println!( + " rate limiter: freezing ramp at {rate} req/s after retryable prover error" + ); + } + } + } +} + +fn compute_rate(start: Instant, frozen_at: Option<u32>) -> u32 { + let elapsed = start.elapsed(); + let step = u32::try_from(elapsed.as_secs() / STEP_DURATION.as_secs()).unwrap_or(u32::MAX); + let target = START_RATE.saturating_add(step).min(MAX_RATE); + frozen_at.map_or(target, |cap| target.min(cap)) +} + +fn slot_interval(rate: u32) -> Duration { + Duration::from_micros(1_000_000 / u64::from(rate.max(1))) +} + +// TESTS +// ================================================================================================ + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn rate_starts_at_start_rate() { + let now = Instant::now(); + assert_eq!(compute_rate(now, None), START_RATE); + } + + #[test] + fn rate_is_capped_by_freeze() { + let now = Instant::now(); + // `frozen_at` is a cap, not a target — at t=0 the natural rate is + // `START_RATE`, which is already below caps of 3 or higher. + assert_eq!(compute_rate(now, Some(3)), START_RATE); + assert_eq!(compute_rate(now, Some(MAX_RATE)), START_RATE); + // A cap below the natural rate clamps the result down. + assert_eq!(compute_rate(now, Some(0)), 0); + } + + #[test] + fn natural_rate_is_capped_at_max() { + // Simulate "elapsed > MAX_RATE * STEP_DURATION" by constructing a + // start instant far in the past. + let long_ago = Instant::now() + .checked_sub(STEP_DURATION * (MAX_RATE + 5)) + .expect("test environment supports backdated Instants"); + assert_eq!(compute_rate(long_ago, None), MAX_RATE); + assert_eq!(compute_rate(long_ago, Some(4)), 4); + } + + #[test] + fn slot_interval_matches_rate() { + assert_eq!(slot_interval(1), Duration::from_secs(1)); + assert_eq!(slot_interval(10), Duration::from_millis(100)); + } +} diff --git a/bin/benchmark/src/rpc_state.rs b/bin/benchmark/src/rpc_state.rs new file mode 100644 index 000000000..4fe4e1cc9 --- /dev/null +++ b/bin/benchmark/src/rpc_state.rs @@ -0,0 +1,93 @@ +//! Thin-client state-fetch helpers used by `create_proofs::run`. +//! +//! These let the bench bind its proofs to the target node's actual chain +//! state (chain MMR at the tip) instead of fabricating an empty +//! `PartialBlockchain`. Without this, runs against any chain whose genesis +//! state isn't minimal (testnet, devnet, any local node restored from +//! a snapshot) fail with `AdviceError::MapKeyNotFound` during proof +//! generation.
+ +use miden_node_proto::clients::RpcClient; +use miden_node_proto::generated::rpc::{ + BlockHeaderByNumberRequest, + FinalityLevel, + SyncChainMmrRequest, +}; +use miden_protocol::block::BlockHeader; +use miden_protocol::crypto::merkle::mmr::{MmrDelta, MmrPeaks, PartialMmr}; +use miden_protocol::transaction::PartialBlockchain; + +/// Fetch the header of the latest committed block from the target node. +/// +/// `get_block_header_by_number(block_num=None)` returns the chain tip per +/// the server's documented contract. +pub(crate) async fn fetch_chain_tip_header(client: &mut RpcClient) -> BlockHeader { + let response = client + .get_block_header_by_number(BlockHeaderByNumberRequest { + block_num: None, + include_mmr_proof: None, + }) + .await + .expect("failed to fetch chain tip header") + .into_inner(); + + response + .block_header + .expect("chain tip response missing block_header") + .try_into() + .expect("failed to decode chain tip block header") +} + +/// Build a [`PartialBlockchain`] whose chain MMR matches the tip block's +/// `chain_commitment`. +/// +/// Construction is: +/// +/// - `tip_block_num == 0` → empty MMR (chain at genesis has no prior blocks committed). No RPC +/// calls. +/// - `tip_block_num >= 1` → MMR starts empty, then the genesis block's commitment is added as leaf +/// 0 (this brings the local MMR's forest to 1, matching what the server expects as the caller's +/// pre-state for `block_from = 0`). +/// - `tip_block_num >= 2` → `sync_chain_mmr(block_from = 0, upper_bound = BlockNum(tip_block_num))` +/// is called and the returned `MmrDelta` is applied, bringing the MMR's forest from 1 up to +/// `tip_block_num`. +/// +/// After this function returns, `partial_mmr.peaks().hash_peaks()` matches +/// the tip block's `chain_commitment()`. +pub(crate) async fn fetch_partial_blockchain( + client: &mut RpcClient, + tip_block_num: u32, + genesis_header: &BlockHeader, +) -> PartialBlockchain { + let mut partial_mmr = PartialMmr::from_peaks(MmrPeaks::default()); + + if tip_block_num == 0 { + return PartialBlockchain::new(partial_mmr, Vec::new()) + .expect("empty PartialBlockchain construction"); + } + + // Genesis is always leaf 0; this brings forest from 0 to 1. + partial_mmr.add(genesis_header.commitment(), false); + + if tip_block_num >= 2 { + let request = SyncChainMmrRequest { + current_client_block_height: 0, + finality_level: FinalityLevel::Committed.into(), + }; + + let response = client + .sync_chain_mmr(request) + .await + .expect("failed to call sync_chain_mmr") + .into_inner(); + let mmr_delta_proto = + response.mmr_delta.expect("sync_chain_mmr response missing mmr_delta"); + let mmr_delta: MmrDelta = mmr_delta_proto + .try_into() + .expect("failed to decode MmrDelta from sync_chain_mmr response"); + partial_mmr.apply(mmr_delta).expect("failed to apply chain MMR delta"); + } + + PartialBlockchain::new(partial_mmr, Vec::new()) + .expect("PartialBlockchain construction from fetched chain MMR") +} diff --git a/bin/benchmark/src/submit.rs b/bin/benchmark/src/submit.rs new file mode 100644 index 000000000..1f956ca95 --- /dev/null +++ b/bin/benchmark/src/submit.rs @@ -0,0 +1,267 @@ +//! The `run-benchmark` orchestrator and the submission RPC primitives. +//! +//! `run` is the top-level entry point invoked from `main::Cli::run` for the +//! `RunBenchmark` subcommand. It owns the dance of loading the proven-tx +//! bundle, submitting mints sequentially and consumes concurrently, waiting +//! for the chain to advance, and handing off to [`crate::inclusion`] + +//! 
[`crate::summary`] for the inclusion scan and the human-readable summary.
+//!
+//! The submission primitives ([`submit_all`], [`submit_sequential`]) and the
+//! aggregate types ([`SubmitOutcome`], [`PhaseStats`]) live here too because
+//! they're not used anywhere else — only `summary` reads `PhaseStats`, and
+//! only by `&` reference.
+
+use std::collections::HashMap;
+use std::path::PathBuf;
+use std::sync::Arc;
+use std::sync::atomic::{AtomicU64, Ordering};
+use std::time::{Duration, Instant, SystemTime};
+
+use miden_node_proto::clients::RpcClient;
+use miden_node_proto::generated as proto;
+use miden_protocol::transaction::{ProvenTransaction, TransactionId};
+use miden_protocol::utils::serde::Serializable;
+use tokio::sync::Semaphore;
+use url::Url;
+
+use crate::inclusion::{current_block_height, scan_with_drain};
+use crate::summary::{print_phase_progress, print_summary};
+use crate::{PROOFS_DIR, create_genesis_aware_rpc_client, read_from_file};
+
+// ORCHESTRATOR
+// ================================================================================================
+
+pub(crate) async fn run(rpc_url: Url, concurrency: usize, wait_blocks: u32) {
+    let in_dir = PathBuf::from(PROOFS_DIR);
+
+    println!("Loading mint txs from {}", in_dir.join("mint_txs.bin").display());
+    let mint_txs: Vec<ProvenTransaction> = read_from_file(&in_dir.join("mint_txs.bin"));
+    let mint_tx_inputs: Vec<Vec<u8>> = read_from_file(&in_dir.join("mint_tx_inputs.bin"));
+    assert_eq!(mint_txs.len(), mint_tx_inputs.len(), "mint tx/inputs length mismatch");
+
+    println!("Loading consume txs from {}", in_dir.join("consume_txs.bin").display());
+    let consume_txs: Vec<ProvenTransaction> = read_from_file(&in_dir.join("consume_txs.bin"));
+    let consume_tx_inputs: Vec<Vec<u8>> = read_from_file(&in_dir.join("consume_tx_inputs.bin"));
+    assert_eq!(consume_txs.len(), consume_tx_inputs.len(), "consume tx/inputs length mismatch");
+
+    // Compute the tx-id master lists up front so we can match them against
+    // on-chain block contents later, without having to interrogate the node.
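+    // (They must be captured here, before the submit helpers below take
+    // ownership of the tx vectors.)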
+    let mint_ids: Vec<TransactionId> = mint_txs.iter().map(ProvenTransaction::id).collect();
+    let consume_ids: Vec<TransactionId> = consume_txs.iter().map(ProvenTransaction::id).collect();
+
+    println!("Connecting to {rpc_url}...");
+    let rpc_client = create_genesis_aware_rpc_client(&rpc_url, Duration::from_secs(30))
+        .await
+        .expect("failed to create RPC client");
+
+    let h_start = current_block_height(rpc_client.clone()).await;
+    println!("Chain height at start: {h_start}");
+
+    println!(
+        "Submitting {} mint txs sequentially (each one mutates the shared faucet, so the \
+         submits must be serialized for the mempool to chain them)...",
+        mint_txs.len()
+    );
+    let mint_stats = submit_sequential(rpc_client.clone(), mint_txs, mint_tx_inputs).await;
+    print_phase_progress("mint", &mint_stats);
+
+    println!("Submitting {} consume txs with concurrency={concurrency}...", consume_txs.len());
+    let consume_stats =
+        submit_all(rpc_client.clone(), consume_txs, consume_tx_inputs, concurrency).await;
+    print_phase_progress("consume", &consume_stats);
+
+    let ack_by_id = build_ack_map(&mint_ids, &mint_stats, &consume_ids, &consume_stats);
+    println!(
+        "Watching for inclusion of {} acked tx(s) (max {wait_blocks} blocks past height {h_start})...",
+        ack_by_id.len(),
+    );
+    let (h_final, inclusion) =
+        scan_with_drain(rpc_client.clone(), h_start, wait_blocks, ack_by_id).await;
+
+    print_summary(h_start, h_final, &mint_stats, &consume_stats, concurrency, &inclusion);
+}
+
+// SUBMISSION STATS
+// ================================================================================================
+
+/// Outcome of a single `submit_proven_transaction` RPC.
+#[derive(Debug)]
+pub(crate) struct SubmitOutcome {
+    /// Position of this tx in the original input vec — used to recover the
+    /// corresponding `TransactionId` from the caller-owned id list.
+    pub(crate) index: usize,
+    /// `Ok(rpc_round_trip_duration)` on success, `Err(grpc_code)` on failure.
+    pub(crate) result: Result<Duration, tonic::Code>,
+    /// Wall-clock timestamp at which the RPC returned `Ok`. `None` on error.
+    /// Stored as `SystemTime` so it is directly comparable to block headers'
+    /// unix-second timestamps when computing inclusion latency.
+    pub(crate) ack_at: Option<SystemTime>,
+}
+
+/// Aggregated stats for one submission phase (mint or consume).
+#[derive(Debug)]
+pub(crate) struct PhaseStats {
+    /// Wall-clock duration of the entire phase.
+    pub(crate) elapsed: Duration,
+    /// One entry per input tx, aligned by `index`.
+    pub(crate) outcomes: Vec<SubmitOutcome>,
+}
+
+impl PhaseStats {
+    pub(crate) fn ok_count(&self) -> u64 {
+        self.outcomes.iter().filter(|o| o.result.is_ok()).count() as u64
+    }
+
+    pub(crate) fn err_count(&self) -> u64 {
+        self.outcomes.iter().filter(|o| o.result.is_err()).count() as u64
+    }
+
+    pub(crate) fn submit_latencies(&self) -> Vec<Duration> {
+        self.outcomes.iter().filter_map(|o| o.result.as_ref().ok().copied()).collect()
+    }
+
+    pub(crate) fn err_by_code(&self) -> HashMap<tonic::Code, u64> {
+        let mut map: HashMap<tonic::Code, u64> = HashMap::new();
+        for o in &self.outcomes {
+            if let Err(code) = o.result {
+                *map.entry(code).or_insert(0) += 1;
+            }
+        }
+        map
+    }
+}
+
+// SUBMISSION PRIMITIVES
+// ================================================================================================
+
+async fn submit_all(
+    client: RpcClient,
+    txs: Vec<ProvenTransaction>,
+    tx_inputs: Vec<Vec<u8>>,
+    concurrency: usize,
+) -> PhaseStats {
+    /// How many error messages to surface to the console as they happen.
+    /// The full failure breakdown still appears in the summary.
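+    /// Errors past this budget are still recorded in `PhaseStats`; only the
+    /// live console output is capped.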
+    const MAX_ERRORS_TO_PRINT: u64 = 5;
+
+    let start = Instant::now();
+    let semaphore = Arc::new(Semaphore::new(concurrency));
+    // Increment-only counter used purely to budget the live error prints.
+    // It is only touched on the error path, so it does not introduce any
+    // submit-side synchronization beyond what was already there.
+    let printed = Arc::new(AtomicU64::new(0));
+
+    let total = txs.len();
+    let mut set = tokio::task::JoinSet::new();
+    for (i, (tx, inputs)) in txs.into_iter().zip(tx_inputs.into_iter()).enumerate() {
+        let permit = semaphore.clone().acquire_owned().await.unwrap();
+        let mut client = client.clone();
+        let printed = printed.clone();
+        set.spawn(async move {
+            let request = proto::transaction::ProvenTransaction {
+                transaction: tx.to_bytes(),
+                transaction_inputs: Some(inputs),
+            };
+            let t0 = Instant::now();
+            let outcome = match client.submit_proven_transaction(request).await {
+                Ok(_) => SubmitOutcome {
+                    index: i,
+                    result: Ok(t0.elapsed()),
+                    ack_at: Some(SystemTime::now()),
+                },
+                Err(status) => {
+                    if printed.fetch_add(1, Ordering::Relaxed) < MAX_ERRORS_TO_PRINT {
+                        eprintln!(
+                            " tx idx {i} failed: code={:?} message={}",
+                            status.code(),
+                            status.message()
+                        );
+                    }
+                    SubmitOutcome {
+                        index: i,
+                        result: Err(status.code()),
+                        ack_at: None,
+                    }
+                },
+            };
+            drop(permit);
+            outcome
+        });
+    }
+
+    // Outcomes carry their original `index`, so completion order is fine —
+    // downstream summarizers don't depend on the vec being in spawn order.
+    let mut outcomes = Vec::with_capacity(total);
+    while let Some(res) = set.join_next().await {
+        outcomes.push(res.expect("submission task panicked"));
+    }
+
+    PhaseStats { elapsed: start.elapsed(), outcomes }
+}
+
+/// Submit txs one at a time, awaiting each RPC response before sending the
+/// next. Used for the mint phase, where every tx mutates the shared faucet
+/// and therefore must arrive at the mempool in order — the block-producer's
+/// mempool will reject out-of-order submissions but happily chains in-order
+/// ones against its own pending state, so we only need to serialize the
+/// `submit_proven_transaction` calls themselves, not wait for block
+/// inclusion in between.
+async fn submit_sequential(
+    mut client: RpcClient,
+    txs: Vec<ProvenTransaction>,
+    tx_inputs: Vec<Vec<u8>>,
+) -> PhaseStats {
+    let start = Instant::now();
+    let total = txs.len();
+    let mut outcomes = Vec::with_capacity(total);
+
+    for (i, (tx, inputs)) in txs.into_iter().zip(tx_inputs.into_iter()).enumerate() {
+        let request = proto::transaction::ProvenTransaction {
+            transaction: tx.to_bytes(),
+            transaction_inputs: Some(inputs),
+        };
+
+        let t0 = Instant::now();
+        let outcome = match client.submit_proven_transaction(request).await {
+            Ok(_) => SubmitOutcome {
+                index: i,
+                result: Ok(t0.elapsed()),
+                ack_at: Some(SystemTime::now()),
+            },
+            Err(status) => {
+                eprintln!(" tx {} / {total} failed: {status}", i + 1);
+                SubmitOutcome {
+                    index: i,
+                    result: Err(status.code()),
+                    ack_at: None,
+                }
+            },
+        };
+        outcomes.push(outcome);
+    }
+
+    PhaseStats { elapsed: start.elapsed(), outcomes }
+}
+
+/// Build a lookup from the on-chain `TransactionId` of every successfully
+/// submitted tx to the `SystemTime` at which the node ack'd its submission.
+/// Used by the inclusion scan ([`scan_with_drain`]) to compute per-tx
+/// inclusion latency.
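+/// Failed submissions have `ack_at == None` and are skipped, so the map
+/// ends up with exactly one entry per acked tx.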
+fn build_ack_map(
+    mint_ids: &[TransactionId],
+    mint_stats: &PhaseStats,
+    consume_ids: &[TransactionId],
+    consume_stats: &PhaseStats,
+) -> HashMap<TransactionId, SystemTime> {
+    let mut map = HashMap::new();
+    for outcome in &mint_stats.outcomes {
+        if let Some(ack_at) = outcome.ack_at {
+            map.insert(mint_ids[outcome.index], ack_at);
+        }
+    }
+    for outcome in &consume_stats.outcomes {
+        if let Some(ack_at) = outcome.ack_at {
+            map.insert(consume_ids[outcome.index], ack_at);
+        }
+    }
+    map
+}
diff --git a/bin/benchmark/src/summary.rs b/bin/benchmark/src/summary.rs
new file mode 100644
index 000000000..078280a3b
--- /dev/null
+++ b/bin/benchmark/src/summary.rs
@@ -0,0 +1,314 @@
+//! Every line the bench prints to stdout, plus the formatting and metric
+//! helpers that only matter for output (percentiles, rate/percentage casts,
+//! duration formatters). Other modules pass `PhaseStats` and `InclusionResult`
+//! references in; this module never mutates or owns them.
+
+use std::collections::HashMap;
+use std::time::Duration;
+
+use crate::inclusion::{BlockHit, InclusionResult};
+use crate::submit::PhaseStats;
+
+// PROOF-GENERATION SUMMARY
+// ================================================================================================
+
+/// Prints a per-phase summary of how long proof generation took, broken down
+/// into the executor (VM execution) and prover (STARK proving) costs, plus the
+/// mean per tx for each so that runs of different sizes can be compared.
+pub(crate) fn print_proving_summary(
+    label: &str,
+    num_transactions: u64,
+    wall: Duration,
+    exec_total: Duration,
+    prove_total: Duration,
+) {
+    let n_u32 = u32::try_from(num_transactions).unwrap_or(u32::MAX);
+    let exec_mean = if num_transactions > 0 {
+        exec_total / n_u32
+    } else {
+        Duration::ZERO
+    };
+    let prove_mean = if num_transactions > 0 {
+        prove_total / n_u32
+    } else {
+        Duration::ZERO
+    };
+    let per_tx_mean = if num_transactions > 0 {
+        (exec_total + prove_total) / n_u32
+    } else {
+        Duration::ZERO
+    };
+    println!("{label} proving summary (n={num_transactions}):");
+    println!(" wall time: {}", format_duration_secs(wall));
+    println!(
+        " execute_transaction: total={} mean={}/tx",
+        format_duration_secs(exec_total),
+        format_duration_secs(exec_mean),
+    );
+    println!(
+        " prover.prove: total={} mean={}/tx",
+        format_duration_secs(prove_total),
+        format_duration_secs(prove_mean),
+    );
+    println!(" exec+prove per tx: mean={}/tx", format_duration_secs(per_tx_mean));
+}
+
+// SUBMISSION SUMMARIES
+// ================================================================================================
+
+pub(crate) fn print_phase_progress(label: &str, stats: &PhaseStats) {
+    let elapsed = stats.elapsed.as_secs_f64();
+    let rate = rate_per_second(stats.ok_count(), stats.elapsed);
+    println!(
+        " {label}: ok={ok} err={err} in {elapsed:.1}s ({rate:.1} tx/s ack rate)",
+        ok = stats.ok_count(),
+        err = stats.err_count(),
+    );
+}
+
+pub(crate) fn print_summary(
+    h_start: u32,
+    h_final: u32,
+    mint: &PhaseStats,
+    consume: &PhaseStats,
+    concurrency: usize,
+    inclusion: &InclusionResult,
+) {
+    println!();
+    println!("=== Summary ===");
+    println!(
+        "Chain height: {h_start} -> {h_final} ({} blocks, of which {} contained at least one of our txs)",
+        h_final - h_start,
+        inclusion.per_block_hits.len(),
+    );
+    println!();
+    print_phase_summary("Mint phase (sequential)", mint);
+    println!();
+    print_phase_summary(&format!("Consume phase (concurrent, c={concurrency})"), consume);
+    println!();
+    print_inclusion_summary(inclusion);
+}
+
+fn print_phase_summary(title: &str, stats: &PhaseStats) {
+    let ok = stats.ok_count();
+    let err = stats.err_count();
+    let elapsed = stats.elapsed.as_secs_f64();
+    let total = stats.outcomes.len() as u64;
+
+    println!("{title}:");
+    println!(
+        " ok = {ok} / {total} err = {err} ({})",
+        format_err_breakdown(stats.err_by_code()),
+    );
+
+    let mut latencies = stats.submit_latencies();
+    if let Some(p) = percentiles(&mut latencies) {
+        println!(
+            " submit RPC latency: mean={mean} p50={p50} p95={p95} p99={p99} max={max}",
+            mean = format_duration_ms(p.mean),
+            p50 = format_duration_ms(p.p50),
+            p95 = format_duration_ms(p.p95),
+            p99 = format_duration_ms(p.p99),
+            max = format_duration_ms(p.max),
+        );
+    } else {
+        println!(" submit RPC latency: (no successful submissions)");
+    }
+
+    let rate = rate_per_second(stats.ok_count(), stats.elapsed);
+    println!(" elapsed = {elapsed:.1}s, RPC ack rate = {rate:.1} tx/s");
+}
+
+fn print_inclusion_summary(inclusion: &InclusionResult) {
+    let submitted = inclusion.submitted_count;
+    let included = inclusion.included_count;
+    let drop = submitted.saturating_sub(included);
+    let drop_pct = ratio_pct(drop, submitted);
+
+    println!("Inclusion (per-tx ID match against block contents):");
+    println!(
+        " included = {included} / {submitted} submitted ({drop} missing, {drop_pct:.1}% drop)",
+    );
+
+    let hits = &inclusion.per_block_hits;
+    if hits.is_empty() {
+        println!(" no blocks observed containing any of our txs");
+        return;
+    }
+
+    // Per-block aggregates.
+    let counts: Vec<u32> = hits.iter().map(|h| h.hit_count).collect();
+    let sum_counts: u32 = counts.iter().copied().sum();
+    let max_count = counts.iter().copied().max().unwrap_or(0);
+    let n_blocks = u32::try_from(counts.len()).unwrap_or(u32::MAX);
+    let mean_count = f64::from(sum_counts) / f64::from(n_blocks);
+
+    let peak_block = hits.iter().max_by_key(|h| h.hit_count).expect("non-empty hits");
+    let first_block = hits.first().expect("non-empty hits");
+    let last_block = hits.last().expect("non-empty hits");
+
+    println!(
+        " blocks with our txs = {n_blocks} \
+         (block range {}..={}, mean txs/block when present = {mean_count:.1}, max = {max_count})",
+        first_block.block_num, last_block.block_num,
+    );
+
+    // Derive the block interval from consecutive scanned timestamps.
+    let Some(block_interval) = inclusion.derived_block_interval() else {
+        println!(
+            " block interval: could not derive from {} scanned block(s) \
+             (need >=2 blocks spanning at least one second boundary)",
+            inclusion.scanned_block_count,
+        );
+        println!(" throughput metrics skipped; per-block series follows.");
+        print_per_block_series(hits, None);
+        return;
+    };
+
+    println!(
+        " derived block interval = {} (from {} scanned blocks, span = {}s)",
+        format_duration_secs(block_interval),
+        inclusion.scanned_block_count,
+        inclusion.scanned_last_ts - inclusion.scanned_first_ts,
+    );
+
+    // Throughput. Each block-with-our-txs is treated as `block_interval`
+    // seconds of node work.
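+    // Illustrative arithmetic (numbers invented for the example): 12 txs in
+    // the peak block at a derived 3s interval gives a 4.0 tx/s peak, and 40
+    // included txs across 5 hit blocks gives 40 / (5 * 3s) ≈ 2.7 tx/s
+    // window average.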
+    let interval_secs = block_interval.as_secs_f64();
+    let peak_rate = rate_per_second(u64::from(peak_block.hit_count), block_interval);
+    let mean_rate = if interval_secs > 0.0 {
+        mean_count / interval_secs
+    } else {
+        0.0
+    };
+    let window_rate = rate_per_second(included, block_interval.saturating_mul(n_blocks));
+
+    println!(
+        " peak per-block rate = {} txs in block {} => {peak_rate:.1} tx/s",
+        peak_block.hit_count, peak_block.block_num,
+    );
+    println!(" mean per-block rate = {mean_count:.1} txs/block => {mean_rate:.1} tx/s");
+    println!(
+        " window-average TPS = {included} included / ({n_blocks} blocks * {}) \
+         => {window_rate:.1} tx/s",
+        format_duration_secs(block_interval),
+    );
+
+    print_per_block_series(hits, Some(block_interval));
+
+    let mut lats = inclusion.inclusion_latencies.clone();
+    if let Some(p) = percentiles(&mut lats) {
+        println!(
+            " inclusion latency (submit_ack -> block timestamp): mean={mean} p50={p50} p95={p95} p99={p99} max={max}",
+            mean = format_duration_secs(p.mean),
+            p50 = format_duration_secs(p.p50),
+            p95 = format_duration_secs(p.p95),
+            p99 = format_duration_secs(p.p99),
+            max = format_duration_secs(p.max),
+        );
+    }
+}
+
+/// Print a compact per-block series so the operator can eyeball the
+/// time-series shape (ramp, plateau, dip). Empty blocks in the scan range
+/// are intentionally omitted. If `block_interval` is `Some`, each line also
+/// shows the equivalent rate; if `None`, only the raw count.
+fn print_per_block_series(hits: &[BlockHit], block_interval: Option<Duration>) {
+    println!(" per-block series:");
+    for hit in hits {
+        match block_interval {
+            Some(interval) => {
+                let rate = rate_per_second(u64::from(hit.hit_count), interval);
+                println!(
+                    " block {} (ts={}): {} txs ({rate:.1} tx/s @ block_interval)",
+                    hit.block_num, hit.block_ts, hit.hit_count,
+                );
+            },
+            None => {
+                println!(
+                    " block {} (ts={}): {} txs",
+                    hit.block_num, hit.block_ts, hit.hit_count,
+                );
+            },
+        }
+    }
+}
+
+// METRIC HELPERS
+// ================================================================================================
+
+/// Computes `count / elapsed`, treating a zero-length elapsed window as a
+/// zero rate. Wrapping the cast in a helper keeps the precision-loss expect
+/// tightly scoped — the loss is harmless for display purposes.
+#[expect(
+    clippy::cast_precision_loss,
+    reason = "presentational rate; precision loss past 2^52 events is irrelevant"
+)]
+fn rate_per_second(count: u64, elapsed: Duration) -> f64 {
+    let secs = elapsed.as_secs_f64();
+    if secs > 0.0 { (count as f64) / secs } else { 0.0 }
+}
+
+/// Computes `100 * num / den` as a percentage, returning 0 when `den == 0`.
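+/// Currently only called for the drop-percentage line in the inclusion
+/// summary.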
+#[expect(
+    clippy::cast_precision_loss,
+    reason = "presentational percentage; precision loss past 2^52 is irrelevant"
+)]
+fn ratio_pct(num: u64, den: u64) -> f64 {
+    if den == 0 {
+        0.0
+    } else {
+        (num as f64) * 100.0 / (den as f64)
+    }
+}
+
+fn format_err_breakdown(by_code: HashMap<tonic::Code, u64>) -> String {
+    if by_code.is_empty() {
+        return "no errors".to_string();
+    }
+    let mut entries: Vec<(tonic::Code, u64)> = by_code.into_iter().collect();
+    entries.sort_by(|a, b| b.1.cmp(&a.1));
+    let parts: Vec<String> = entries.iter().map(|(c, n)| format!("{c:?}={n}")).collect();
+    parts.join(", ")
+}
+
+fn format_duration_ms(d: Duration) -> String {
+    format!("{:.1}ms", d.as_secs_f64() * 1000.0)
+}
+
+fn format_duration_secs(d: Duration) -> String {
+    format!("{:.2}s", d.as_secs_f64())
+}
+
+#[derive(Debug, Clone, Copy)]
+struct Percentiles {
+    mean: Duration,
+    p50: Duration,
+    p95: Duration,
+    p99: Duration,
+    max: Duration,
+}
+
+/// Returns `None` if there are no samples.
+fn percentiles(samples: &mut [Duration]) -> Option<Percentiles> {
+    if samples.is_empty() {
+        return None;
+    }
+    samples.sort();
+    let n = samples.len();
+    // Integer index for percentile `num/den`. Picked over an `f64` cast to
+    // avoid the cast_sign_loss / cast_precision_loss footguns.
+    let pick = |num: usize, den: usize| -> Duration {
+        let idx = (n * num / den).min(n - 1);
+        samples[idx]
+    };
+    let sum: Duration = samples.iter().copied().sum();
+    let mean = sum / u32::try_from(n).unwrap_or(u32::MAX);
+    Some(Percentiles {
+        mean,
+        p50: pick(50, 100),
+        p95: pick(95, 100),
+        p99: pick(99, 100),
+        max: *samples.last().unwrap(),
+    })
+}
diff --git a/bin/node/src/commands/block_producer.rs b/bin/node/src/commands/block_producer.rs
index b92186823..e19223fd2 100644
--- a/bin/node/src/commands/block_producer.rs
+++ b/bin/node/src/commands/block_producer.rs
@@ -6,6 +6,7 @@ use anyhow::Context;
 use miden_node_block_producer::{
     BlockProducer,
     DEFAULT_BATCH_INTERVAL,
+    DEFAULT_BATCH_WORKERS,
     DEFAULT_BLOCK_INTERVAL,
     DEFAULT_MAX_BATCHES_PER_BLOCK,
     DEFAULT_MAX_TXS_PER_BATCH,
@@ -22,6 +23,7 @@ const ENV_MAX_TXS_PER_BATCH: &str = "MIDEN_NODE_BLOCK_PRODUCER_MAX_TXS_PER_BATCH";
 const ENV_MAX_BATCHES_PER_BLOCK: &str = "MIDEN_NODE_BLOCK_PRODUCER_MAX_BATCHES_PER_BLOCK";
 const ENV_MEMPOOL_TX_CAPACITY: &str = "MIDEN_NODE_BLOCK_PRODUCER_MEMPOOL_TX_CAPACITY";
 const ENV_BATCH_PROVER_URL: &str = "MIDEN_NODE_BLOCK_PRODUCER_BATCH_PROVER_URL";
+const ENV_BATCH_WORKERS: &str = "MIDEN_NODE_BLOCK_PRODUCER_BATCH_WORKERS";
 
 // BLOCK PRODUCER COMMAND
 // ================================================================================================
@@ -95,6 +97,7 @@ impl BlockProducerCommand {
             max_batches_per_block: block_producer.max_batches_per_block,
             grpc_options,
             mempool_tx_capacity: block_producer.mempool_tx_capacity,
+            batch_workers: block_producer.batch_workers,
         }
         .serve()
         .await
@@ -132,6 +135,7 @@ mod tests {
                 max_txs_per_batch: 8,
                 max_batches_per_block: miden_protocol::MAX_BATCHES_PER_BLOCK + 1, // Invalid value
                 mempool_tx_capacity: NonZeroUsize::new(1000).unwrap(),
+                batch_workers: NonZeroUsize::new(2).unwrap(),
             },
             enable_otel: false,
             grpc_options: GrpcOptionsInternal::default(),
@@ -157,6 +161,7 @@ mod tests {
                                            * (should fail) */
                 max_batches_per_block: 8,
                 mempool_tx_capacity: NonZeroUsize::new(1000).unwrap(),
+                batch_workers: NonZeroUsize::new(2).unwrap(),
             },
             enable_otel: false,
            grpc_options: GrpcOptionsInternal::default(),
@@ -223,4 +228,16 @@ pub struct BlockProducerConfig {
         value_name = "NUM"
     )]
     mempool_tx_capacity: NonZeroUsize,
+
+    /// Number of concurrent batch-builder workers.
+    ///
+    /// Each worker can prove one batch at a time, so this caps how many batch
+    /// proofs the block-producer keeps in flight.
+    #[arg(
+        long = "batch.workers",
+        env = ENV_BATCH_WORKERS,
+        value_name = "NUM",
+        default_value_t = DEFAULT_BATCH_WORKERS
+    )]
+    pub batch_workers: NonZeroUsize,
 }
diff --git a/crates/block-producer/src/lib.rs b/crates/block-producer/src/lib.rs
index 955aa2356..6fc204fcc 100644
--- a/crates/block-producer/src/lib.rs
+++ b/crates/block-producer/src/lib.rs
@@ -32,8 +32,13 @@ pub const DEFAULT_MAX_TXS_PER_BATCH: usize = 8;
 /// Maximum number of batches per block.
 pub const DEFAULT_MAX_BATCHES_PER_BLOCK: usize = 8;
 
-/// Size of the batch building worker pool.
-const SERVER_NUM_BATCH_BUILDERS: NonZeroUsize = NonZeroUsize::new(2).unwrap();
+/// Default size of the batch-builder worker pool.
+///
+/// Each worker can prove one batch at a time. Raising this allows more
+/// concurrent batch proofs in flight, which is the primary lever for lifting
+/// the per-block-producer TPS ceiling once `--max-txs-per-batch` and
+/// `--max-batches-per-block` are pushed up.
+pub const DEFAULT_BATCH_WORKERS: NonZeroUsize = NonZeroUsize::new(2).unwrap();
 
 /// The number of blocks of committed state that the mempool retains.
 ///
diff --git a/crates/block-producer/src/server/mod.rs b/crates/block-producer/src/server/mod.rs
index f0266c98a..a1a835d38 100644
--- a/crates/block-producer/src/server/mod.rs
+++ b/crates/block-producer/src/server/mod.rs
@@ -33,7 +33,7 @@ use crate::errors::{BlockProducerError, MempoolSubmissionError, StoreError};
 use crate::mempool::{BatchBudget, BlockBudget, Mempool, MempoolConfig, SharedMempool};
 use crate::store::StoreClient;
 use crate::validator::BlockProducerValidatorClient;
-use crate::{CACHED_MEMPOOL_STATS_UPDATE_INTERVAL, COMPONENT, SERVER_NUM_BATCH_BUILDERS};
+use crate::{CACHED_MEMPOOL_STATS_UPDATE_INTERVAL, COMPONENT};
 
 #[cfg(test)]
 mod tests;
@@ -66,6 +66,9 @@ pub struct BlockProducer {
     /// The maximum number of inflight transactions allowed in the mempool at once.
     pub mempool_tx_capacity: NonZeroUsize,
+
+    /// The number of concurrent batch-builder workers.
+    pub batch_workers: NonZeroUsize,
 }
 
 // BLOCK PRODUCER
@@ -119,7 +122,7 @@ impl BlockProducer {
         let block_builder = BlockBuilder::new(store.clone(), validator, self.block_interval);
         let batch_builder = BatchBuilder::new(
             store.clone(),
-            SERVER_NUM_BATCH_BUILDERS,
+            self.batch_workers,
             self.batch_prover_url,
             self.batch_interval,
         );
diff --git a/crates/block-producer/src/server/tests.rs b/crates/block-producer/src/server/tests.rs
index 053de2c7d..b573d157c 100644
--- a/crates/block-producer/src/server/tests.rs
+++ b/crates/block-producer/src/server/tests.rs
@@ -14,7 +14,12 @@ use tokio::{runtime, task};
 use tonic::transport::{Channel, Endpoint};
 use url::Url;
 
-use crate::{BlockProducer, DEFAULT_MAX_BATCHES_PER_BLOCK, DEFAULT_MAX_TXS_PER_BATCH};
+use crate::{
+    BlockProducer,
+    DEFAULT_BATCH_WORKERS,
+    DEFAULT_MAX_BATCHES_PER_BLOCK,
+    DEFAULT_MAX_TXS_PER_BATCH,
+};
 
 /// Tests that the block producer starts up correctly even when the store is not initially
 /// available. The block producer should retry with exponential backoff until the store becomes
@@ -75,6 +80,7 @@ async fn block_producer_startup_is_robust_to_network_failures() {
         max_batches_per_block: DEFAULT_MAX_BATCHES_PER_BLOCK,
         grpc_options,
         mempool_tx_capacity: NonZeroUsize::new(100).unwrap(),
+        batch_workers: DEFAULT_BATCH_WORKERS,
     }
     .serve()
    .await