From 4e6847dea1017a9df842c8ab20450a257126eaf5 Mon Sep 17 00:00:00 2001 From: AztecBot Date: Sat, 16 May 2026 23:05:42 +0000 Subject: [PATCH] fix(prover-client): drop late stale-epoch reports + race-proof epoch teardown Re-fix the flaky 'ProvingBroker > Retries > does not retry if job is stale' test (persisted variant) that dequeued PR #23344 from the merge queue. The failure surfaces as Error('Store is closed') from BatchQueue.execProcessor -> KVBrokerDatabase.commitWrites -> SingleEpochDatabase.batchWrite -> store.transactionAsync, triggered by a race between the broker's epoch cleanupPass and the persisted DB's in-flight batched writes. Two small, independent defenses: - proving_broker.ts: in #reportProvingJobError and #reportProvingJobSuccess, after the existing !item early-return, also drop the report when isJobStale(item). A late report for a job whose epoch has been (or is being) cleaned up never reaches the database. - proving_broker_database/persisted.ts: track deletedEpochs so commitWrites short-circuits for torn-down epoch directories; reorder deleteAllProvingJobsOlderThanEpoch to remove from the epochs map before await db.delete(); catch a residual 'Store is closed' inside commitWrites as a benign drop. Verified locally: 5 consecutive runs of proving_broker.test.ts pass, 102/102 tests each, including both in-memory and persisted variants of the previously flaky 'does not retry if job is stale'. --- .../src/proving_broker/proving_broker.ts | 19 ++++++++++++++ .../proving_broker_database/persisted.ts | 26 +++++++++++++++++-- 2 files changed, 43 insertions(+), 2 deletions(-) diff --git a/yarn-project/prover-client/src/proving_broker/proving_broker.ts b/yarn-project/prover-client/src/proving_broker/proving_broker.ts index 27364938d5e1..a9baf923b641 100644 --- a/yarn-project/prover-client/src/proving_broker/proving_broker.ts +++ b/yarn-project/prover-client/src/proving_broker/proving_broker.ts @@ -412,6 +412,17 @@ export class ProvingBroker implements ProvingJobProducer, ProvingJobConsumer, Pr return; } + if (this.isJobStale(item)) { + // Job belongs to an epoch that has been (or is being) cleaned up. Don't persist a + // late error report — the cleanupPass will drop in-memory state for it, and writing + // to the deleted-epoch database racing with that teardown surfaces as 'Store is closed'. + this.logger.warn(`Discarding error report for stale proving job id=${id} epochNumber=${item.epochNumber}`, { + provingJobId: id, + }); + this.inProgress.delete(id); + return; + } + if (!info) { this.logger.warn(`Proving job id=${id} type=${ProvingRequestType[item.type]} not in the in-progress set`, { provingJobId: id, @@ -554,6 +565,14 @@ export class ProvingBroker implements ProvingJobProducer, ProvingJobConsumer, Pr return; } + if (this.isJobStale(item)) { + this.logger.warn(`Discarding result for stale proving job id=${id} epochNumber=${item.epochNumber}`, { + provingJobId: id, + }); + this.inProgress.delete(id); + return; + } + if (!info) { this.logger.warn(`Proving job id=${id} type=${ProvingRequestType[item.type]} not in the in-progress set`, { provingJobId: id, diff --git a/yarn-project/prover-client/src/proving_broker/proving_broker_database/persisted.ts b/yarn-project/prover-client/src/proving_broker/proving_broker_database/persisted.ts index 021a1e64c84c..32cd291c7e16 100644 --- a/yarn-project/prover-client/src/proving_broker/proving_broker_database/persisted.ts +++ b/yarn-project/prover-client/src/proving_broker/proving_broker_database/persisted.ts @@ -87,6 +87,8 @@ export class KVBrokerDatabase implements ProvingBrokerDatabase { public readonly tracer: Tracer; + private deletedEpochs = new Set(); + private constructor( private epochs: Map, private config: ProverBrokerConfig, @@ -113,11 +115,26 @@ export class KVBrokerDatabase implements ProvingBrokerDatabase { // exposed for testing public async commitWrites(items: Array, epochNumber: number) { + if (this.deletedEpochs.has(epochNumber)) { + // Epoch was already pruned; the broker no longer cares about these writes. + return; + } + const jobsToAdd = items.filter((item): item is ProvingJob => 'id' in item); const resultsToAdd = items.filter((item): item is [ProvingJobId, ProvingJobSettledResult] => Array.isArray(item)); const db = await this.getEpochDatabase(EpochNumber(epochNumber)); - await db.batchWrite(jobsToAdd, resultsToAdd); + try { + await db.batchWrite(jobsToAdd, resultsToAdd); + } catch (err) { + // The store can be closed concurrently by deleteAllProvingJobsOlderThanEpoch while a + // batch is mid-flight. Treat this as a benign no-op — the epoch is being torn down. + if (err instanceof Error && err.message === 'Store is closed') { + this.logger.verbose(`Dropping batch for closed epoch ${epochNumber} store`); + return; + } + throw err; + } } private async estimateSize() { @@ -181,14 +198,19 @@ export class KVBrokerDatabase implements ProvingBrokerDatabase { })) async deleteAllProvingJobsOlderThanEpoch(epochNumber: EpochNumber): Promise { const oldEpochs = Array.from(this.epochs.keys()).filter(e => e < Number(epochNumber)); + // Mark before tearing down: this prevents commitWrites from reopening a deleted epoch's + // directory if a stale batch arrives mid-delete. + for (const old of oldEpochs) { + this.deletedEpochs.add(old); + } for (const old of oldEpochs) { const db = this.epochs.get(old); if (!db) { continue; } this.logger.verbose(`Deleting broker database for epoch ${old}`); - await db.delete(); this.epochs.delete(old); + await db.delete(); } }