Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions yarn-project/prover-client/src/proving_broker/proving_broker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -412,6 +412,17 @@ export class ProvingBroker implements ProvingJobProducer, ProvingJobConsumer, Pr
return;
}

if (this.isJobStale(item)) {
// Job belongs to an epoch that has been (or is being) cleaned up. Don't persist a
// late error report — the cleanupPass will drop in-memory state for it, and writing
// to the deleted-epoch database racing with that teardown surfaces as 'Store is closed'.
this.logger.warn(`Discarding error report for stale proving job id=${id} epochNumber=${item.epochNumber}`, {
provingJobId: id,
});
this.inProgress.delete(id);
return;
}

if (!info) {
this.logger.warn(`Proving job id=${id} type=${ProvingRequestType[item.type]} not in the in-progress set`, {
provingJobId: id,
Expand Down Expand Up @@ -554,6 +565,14 @@ export class ProvingBroker implements ProvingJobProducer, ProvingJobConsumer, Pr
return;
}

if (this.isJobStale(item)) {
this.logger.warn(`Discarding result for stale proving job id=${id} epochNumber=${item.epochNumber}`, {
provingJobId: id,
});
this.inProgress.delete(id);
return;
}

if (!info) {
this.logger.warn(`Proving job id=${id} type=${ProvingRequestType[item.type]} not in the in-progress set`, {
provingJobId: id,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,8 @@ export class KVBrokerDatabase implements ProvingBrokerDatabase {

public readonly tracer: Tracer;

private deletedEpochs = new Set<number>();

private constructor(
private epochs: Map<number, SingleEpochDatabase>,
private config: ProverBrokerConfig,
Expand All @@ -113,11 +115,26 @@ export class KVBrokerDatabase implements ProvingBrokerDatabase {

/**
 * Persists a mixed batch of proving jobs and settled results to the database of one epoch.
 * Exposed for testing.
 *
 * A batch addressed to an epoch recorded in `deletedEpochs` is dropped before any database is
 * opened. A batch whose write fails with 'Store is closed' is dropped as well, because that
 * error means the epoch store was closed while the write was in flight.
 *
 * @param items - Proving jobs (objects carrying an `id`) and `[jobId, result]` tuples, in any order.
 * @param epochNumber - Epoch whose database receives the batch.
 * @throws Rethrows any batch-write error other than 'Store is closed'.
 */
public async commitWrites(
  items: Array<ProvingJob | [ProvingJobId, ProvingJobSettledResult]>,
  epochNumber: number,
): Promise<void> {
  if (this.deletedEpochs.has(epochNumber)) {
    // Epoch was already pruned; the broker no longer cares about these writes.
    return;
  }

  const jobsToAdd = items.filter((item): item is ProvingJob => 'id' in item);
  const resultsToAdd = items.filter((item): item is [ProvingJobId, ProvingJobSettledResult] => Array.isArray(item));

  const db = await this.getEpochDatabase(EpochNumber(epochNumber));
  // The write must happen exactly once and only inside the try block, otherwise the
  // 'Store is closed' handling below is bypassed.
  try {
    await db.batchWrite(jobsToAdd, resultsToAdd);
  } catch (err) {
    // The store can be closed concurrently by deleteAllProvingJobsOlderThanEpoch while a
    // batch is mid-flight. Treat this as a benign no-op — the epoch is being torn down.
    if (err instanceof Error && err.message === 'Store is closed') {
      this.logger.verbose(`Dropping batch for closed epoch ${epochNumber} store`);
      return;
    }
    throw err;
  }
}

private async estimateSize() {
Expand Down Expand Up @@ -181,14 +198,19 @@ export class KVBrokerDatabase implements ProvingBrokerDatabase {
}))
/**
 * Deletes the database of every tracked epoch strictly older than `epochNumber`.
 *
 * All affected epochs are first recorded in `deletedEpochs`, then each one is removed from
 * `this.epochs` and its database deleted, one epoch at a time.
 *
 * @param epochNumber - Cut-off epoch; only epochs numerically below it are removed.
 */
async deleteAllProvingJobsOlderThanEpoch(epochNumber: EpochNumber): Promise<void> {
  const oldEpochs = Array.from(this.epochs.keys()).filter(e => e < Number(epochNumber));
  // Mark before tearing down: this prevents commitWrites from reopening a deleted epoch's
  // directory if a stale batch arrives mid-delete.
  for (const old of oldEpochs) {
    this.deletedEpochs.add(old);
  }
  for (const old of oldEpochs) {
    const db = this.epochs.get(old);
    if (!db) {
      continue;
    }
    this.logger.verbose(`Deleting broker database for epoch ${old}`);
    // Drop the map entry before the awaited delete so `this.epochs` does not reference a
    // store that is mid-teardown, and delete the store exactly once.
    this.epochs.delete(old);
    await db.delete();
  }
}

Expand Down
Loading