Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions yarn-project/prover-node/src/prover-node.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,27 @@ describe('prover-node', () => {
expect(proverNode.totalJobCount).toEqual(1);
});

it('awaits in-flight epoch jobs before stop resolves', async () => {
// Block job.run() so the runJob wrapper stays in-flight while we call stop.
const jobRun = promiseWithResolvers<void>();
proverNode.nextJobRun = () => jobRun.promise;
await proverNode.handleEpochReadyToProve(EpochNumber.fromBigInt(10n));

let stopped = false;
const stopPromise = proverNode.stop().then(() => {
stopped = true;
});

await sleep(100);
expect(stopped).toBe(false);
expect(jobs[0].job.stop).toHaveBeenCalled();

// Resolving the run promise unblocks the runJob wrapper, which lets stop complete.
jobRun.resolve();
await stopPromise;
expect(stopped).toBe(true);
});

class TestProverNode extends ProverNode {
public totalJobCount = 0;
public nextJobState: EpochProvingJobState = 'completed';
Expand Down
22 changes: 20 additions & 2 deletions yarn-project/prover-node/src/prover-node.ts
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ export class ProverNode implements EpochMonitorHandler, ProverNodeApi, Traceable
private log = createLogger('prover-node');

private jobs: Map<string, EpochProvingJob> = new Map();
private runJobPromises: Set<Promise<void>> = new Set();
private config: ProverNodeOptions;
private jobMetrics: ProverNodeJobMetrics;
private rewardsMetrics: ProverNodeRewardsMetrics;
Expand Down Expand Up @@ -166,10 +167,21 @@ export class ProverNode implements EpochMonitorHandler, ProverNodeApi, Traceable
async stop() {
this.log.info('Stopping ProverNode');
await this.epochsMonitor.stop();

// Signal in-flight epoch jobs to stop and await the entire `runJob` wrapper
// (not just `EpochProvingJob.run`) before tearing down the prover, publisher or
// world-state. The wrapper does post-`run` work (e.g. `tryUploadEpochFailure`,
// re-creating a job on reorg) that still touches world-state, and `asyncPool`
// tasks inside `run` can hold native world-state forks open. Without this barrier,
// those tasks have been observed calling into the native world-state addon after
// `ProverNode.stop()` returned, producing process-level SEGFAULTs during e2e
// teardown of in-process simulated prover-nodes.
await Promise.all(Array.from(this.jobs.values()).map(job => job.stop()));
await Promise.all(Array.from(this.runJobPromises));

await this.prover.stop();
await tryStop(this.publisherFactory);
this.publisher?.interrupt();
await Promise.all(Array.from(this.jobs.values()).map(job => job.stop()));
this.rewardsMetrics.stop();
this.l1Metrics.stop();
await this.telemetryClient.stop();
Expand All @@ -192,7 +204,13 @@ export class ProverNode implements EpochMonitorHandler, ProverNodeApi, Traceable
*/
public async startProof(epochNumber: EpochNumber) {
const job = await this.createProvingJob(epochNumber, { skipEpochCheck: true });
void this.runJob(job);
this.trackRunJob(job);
}

/** Spawns `runJob` and tracks the resulting promise so `stop()` can await it. */
private trackRunJob(job: EpochProvingJob) {
const promise = this.runJob(job).finally(() => this.runJobPromises.delete(promise));
this.runJobPromises.add(promise);
}

private async runJob(job: EpochProvingJob) {
Expand Down
12 changes: 12 additions & 0 deletions yarn-project/world-state/src/native/native_world_state.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1478,6 +1478,18 @@ describe('NativeWorldState', () => {

await Promise.all([setupFork.close(), testFork.close()]);
}, 30_000);

it('rejects calls issued after close with a JS error rather than crashing', async () => {
const wsLocal = await NativeWorldStateService.tmp();
const fork = await wsLocal.fork();
await fork.close();
await wsLocal.close();

// A fresh fork() after close must surface a JS error, not segfault into the
// destroyed native addon. This is the defensive-shutdown contract relied on by
// ProverNode teardown.
await expect(wsLocal.fork()).rejects.toThrow(/closed/i);
});
});

describe('Checkpoints', () => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,15 @@ export class NativeWorldState implements NativeWorldStateInstance {
responseHandler = (response: WorldStateResponse[T]): WorldStateResponse[T] => response,
errorHandler = (_: string) => {},
): Promise<WorldStateResponse[T]> {
// Reject calls that arrive after `close()` has flipped the destruction guard, before
// they reach the per-fork queue. This is defence-in-depth for the teardown race where
// an in-process owner (e.g. an in-flight EpochProvingJob) tries to use the addon
// after the native instance has been closed: a clean JS error is far preferable to
// the SIGSEGV that an unguarded native call would otherwise produce.
if (!this.open) {
throw new Error(`Native world state is closed; cannot call ${WorldStateMessageType[messageType]}`);
}

// Here we determine which fork the request is being executed against and whether it requires uncommitted data
// We use the fork Id to select the appropriate request queue and the uncommitted data flag to pass to the queue
let forkId = -1;
Expand Down Expand Up @@ -215,14 +224,24 @@ export class NativeWorldState implements NativeWorldStateInstance {

/**
* Stops the native instance.
*
* Flips `this.open` to reject any further `call()` invocations, drains every
* per-fork queue, then sends `CLOSE` on the canonical queue. Draining fork
* queues before sending the native CLOSE prevents a race where an in-flight
* fork-queue call would touch native state that CLOSE has already destroyed.
*/
public async close(): Promise<void> {
if (!this.open) {
return;
}
this.open = false;
const queue = this.queues.get(0)!;

const forkQueues = Array.from(this.queues.entries())
.filter(([forkId]) => forkId !== 0)
.map(([, queue]) => queue.stop());
await Promise.all(forkQueues);

const queue = this.queues.get(0)!;
await queue.execute(
async () => {
await this._sendMessage(WorldStateMessageType.CLOSE, { canonical: true });
Expand Down
Loading