From 8fe851caa8beed1410728a996a0339ef4e9f801d Mon Sep 17 00:00:00 2001 From: Sunil Pai Date: Mon, 9 Feb 2026 11:09:24 +0000 Subject: [PATCH] Fix initialization race conditions and improve error resilience. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - `getServerByName` now propagates errors from the internal `set-name` request instead of silently swallowing them. - `onStart` failures no longer permanently brick the Durable Object. Errors are caught inside `blockConcurrencyWhile` (preserving the input gate) and the status is reset, allowing subsequent requests to retry initialization. - `fetch()` now retries initialization when a previous `onStart` attempt failed, instead of skipping it because the name was already set. - Errors in `fetch()` (including `onStart` failures and malformed props) are now caught and returned as proper 500 responses instead of crashing as unhandled exceptions. - WebSocket handlers (`webSocketMessage`, `webSocketClose`, `webSocketError`) are now wrapped in try/catch so that transient `onStart` failures don't kill the connection — the next message will retry. --- .../fix-initialization-race-conditions.md | 11 ++ package-lock.json | 14 +-- packages/hono-party/package.json | 2 +- packages/partyfn/package.json | 2 +- packages/partyserver/src/index.ts | 118 +++++++++++------- packages/partyserver/src/tests/index.test.ts | 80 ++++++++++++ packages/partyserver/src/tests/worker.ts | 45 +++++++ packages/partyserver/src/tests/wrangler.jsonc | 33 ++--- packages/partysub/package.json | 4 +- packages/partysync/package.json | 2 +- packages/partywhen/package.json | 2 +- packages/y-partyserver/package.json | 2 +- 12 files changed, 239 insertions(+), 76 deletions(-) create mode 100644 .changeset/fix-initialization-race-conditions.md diff --git a/.changeset/fix-initialization-race-conditions.md b/.changeset/fix-initialization-race-conditions.md new file mode 100644 index 0000000..ec5f42c --- /dev/null +++ b/.changeset/fix-initialization-race-conditions.md @@ -0,0 +1,11 @@ +--- +"partyserver": patch +--- + +Fix initialization race conditions and improve error resilience. + +- `getServerByName` now propagates errors from the internal `set-name` request instead of silently swallowing them. +- `onStart` failures no longer permanently brick the Durable Object. Errors are caught inside `blockConcurrencyWhile` (preserving the input gate) and the status is reset, allowing subsequent requests to retry initialization. +- `fetch()` now retries initialization when a previous `onStart` attempt failed, instead of skipping it because the name was already set. +- Errors in `fetch()` (including `onStart` failures and malformed props) are now caught and returned as proper 500 responses instead of crashing as unhandled exceptions. +- WebSocket handlers (`webSocketMessage`, `webSocketClose`, `webSocketError`) are now wrapped in try/catch so that transient `onStart` failures don't kill the connection — the next message will retry. diff --git a/package-lock.json b/package-lock.json index d0ed4fd..9594718 100644 --- a/package-lock.json +++ b/package-lock.json @@ -12235,7 +12235,7 @@ "devDependencies": { "@cloudflare/workers-types": "^4.20251218.0", "hono": "^4.11.1", - "partyserver": "^0.1.3" + "partyserver": "^0.1.4" }, "peerDependencies": { "@cloudflare/workers-types": "^4.20240729.0", @@ -12260,7 +12260,7 @@ "license": "ISC", "dependencies": { "nanoid": "^5.1.6", - "partysocket": "^1.1.12" + "partysocket": "^1.1.13" } }, "packages/partyhard": { @@ -12327,8 +12327,8 @@ "license": "ISC", "devDependencies": { "@cloudflare/workers-types": "^4.20251218.0", - "partyserver": "^0.1.3", - "partysocket": "^1.1.12" + "partyserver": "^0.1.4", + "partysocket": "^1.1.13" }, "peerDependencies": { "@cloudflare/workers-types": "^4.20240729.0", @@ -12345,7 +12345,7 @@ }, "devDependencies": { "@cloudflare/workers-types": "^4.20251218.0", - "partyserver": "^0.1.3" + "partyserver": "^0.1.4" }, "peerDependencies": { "@cloudflare/workers-types": "^4.20240729.0", @@ -12367,7 +12367,7 @@ "license": "ISC", "dependencies": { "cron-parser": "^5.4.0", - "partyserver": "^0.1.3" + "partyserver": "^0.1.4" } }, "packages/y-partyserver": { @@ -12382,7 +12382,7 @@ "devDependencies": { "@cloudflare/workers-types": "^4.20251218.0", "@types/lodash.debounce": "^4.0.9", - "partyserver": "^0.1.3", + "partyserver": "^0.1.4", "ws": "^8.18.3", "yjs": "^13.6.28" }, diff --git a/packages/hono-party/package.json b/packages/hono-party/package.json index 4302b1d..a3f2433 100644 --- a/packages/hono-party/package.json +++ b/packages/hono-party/package.json @@ -37,6 +37,6 @@ "devDependencies": { "@cloudflare/workers-types": "^4.20251218.0", "hono": "^4.11.1", - "partyserver": "^0.1.3" + "partyserver": "^0.1.4" } } diff --git a/packages/partyfn/package.json b/packages/partyfn/package.json index e54087c..52ce169 100644 --- a/packages/partyfn/package.json +++ b/packages/partyfn/package.json @@ -19,7 +19,7 @@ ], "dependencies": { "nanoid": "^5.1.6", - "partysocket": "^1.1.12" + "partysocket": "^1.1.13" }, "scripts": { "build": "tsx scripts/build.ts" diff --git a/packages/partyserver/src/index.ts b/packages/partyserver/src/index.ts index 9a17c49..a7a910c 100644 --- a/packages/partyserver/src/index.ts +++ b/packages/partyserver/src/index.ts @@ -67,13 +67,7 @@ export async function getServerByName< } // unfortunately we have to await this - await stub - .fetch(req) - // drain body - .then((res) => res.text()) - .catch((e) => { - console.error("Could not set server name:", e); - }); + await stub.fetch(req).then((res) => res.text()); return stub; } @@ -366,34 +360,32 @@ export class Server< * Handle incoming requests to the server. */ async fetch(request: Request): Promise { - // Set the props in-mem if the request included them. - const props = request.headers.get("x-partykit-props"); - if (props) { - try { + try { + // Set the props in-mem if the request included them. + const props = request.headers.get("x-partykit-props"); + if (props) { this.#_props = JSON.parse(props); - } catch { - // This should never happen but log it just in case - console.error("Internal error parsing context props."); } - } + if (!this.#_name) { + // This is temporary while we solve https://github.com/cloudflare/workerd/issues/2240 - if (!this.#_name) { - // This is temporary while we solve https://github.com/cloudflare/workerd/issues/2240 - - // get namespace and room from headers - // const namespace = request.headers.get("x-partykit-namespace"); - const room = request.headers.get("x-partykit-room"); - if ( - // !namespace || - !room - ) { - throw new Error(`Missing namespace or room headers when connecting to ${this.#ParentClass.name}. + // get namespace and room from headers + // const namespace = request.headers.get("x-partykit-namespace"); + const room = request.headers.get("x-partykit-room"); + if ( + // !namespace || + !room + ) { + throw new Error(`Missing namespace or room headers when connecting to ${this.#ParentClass.name}. Did you try connecting directly to this Durable Object? Try using getServerByName(namespace, id) instead.`); + } + await this.setName(room); + } else if (this.#status !== "started") { + // Name was set by a previous request but initialization failed. + // Retry initialization so the server can recover from transient + // onStart failures. + await this.#initialize(); } - await this.setName(room); - } - - try { const url = new URL(request.url); // TODO: this is a hack to set the server name, @@ -450,7 +442,7 @@ Did you try connecting directly to this Durable Object? Try using getServerByNam } } catch (err) { console.error( - `Error in ${this.#ParentClass.name}:${this.name} fetch:`, + `Error in ${this.#ParentClass.name}:${this.#_name ?? ""} fetch:`, err ); if (!(err instanceof Error)) throw err; @@ -477,13 +469,20 @@ Did you try connecting directly to this Durable Object? Try using getServerByNam return; } - const connection = createLazyConnection(ws); + try { + const connection = createLazyConnection(ws); - // rehydrate the server name if it's woken up - await this.setName(connection.server); - // TODO: ^ this shouldn't be async + // rehydrate the server name if it's woken up + await this.setName(connection.server); + // TODO: ^ this shouldn't be async - return this.onMessage(connection, message); + return this.onMessage(connection, message); + } catch (e) { + console.error( + `Error in ${this.#ParentClass.name}:${this.#_name ?? ""} webSocketMessage:`, + e + ); + } } async webSocketClose( @@ -496,13 +495,20 @@ Did you try connecting directly to this Durable Object? Try using getServerByNam return; } - const connection = createLazyConnection(ws); + try { + const connection = createLazyConnection(ws); - // rehydrate the server name if it's woken up - await this.setName(connection.server); - // TODO: ^ this shouldn't be async + // rehydrate the server name if it's woken up + await this.setName(connection.server); + // TODO: ^ this shouldn't be async - return this.onClose(connection, code, reason, wasClean); + return this.onClose(connection, code, reason, wasClean); + } catch (e) { + console.error( + `Error in ${this.#ParentClass.name}:${this.#_name ?? ""} webSocketClose:`, + e + ); + } } async webSocketError(ws: WebSocket, error: unknown): Promise { @@ -510,21 +516,37 @@ Did you try connecting directly to this Durable Object? Try using getServerByNam return; } - const connection = createLazyConnection(ws); + try { + const connection = createLazyConnection(ws); - // rehydrate the server name if it's woken up - await this.setName(connection.server); - // TODO: ^ this shouldn't be async + // rehydrate the server name if it's woken up + await this.setName(connection.server); + // TODO: ^ this shouldn't be async - return this.onError(connection, error); + return this.onError(connection, error); + } catch (e) { + console.error( + `Error in ${this.#ParentClass.name}:${this.#_name ?? ""} webSocketError:`, + e + ); + } } async #initialize(): Promise { + let error: unknown; await this.ctx.blockConcurrencyWhile(async () => { this.#status = "starting"; - await this.onStart(this.#_props); - this.#status = "started"; + try { + await this.onStart(this.#_props); + this.#status = "started"; + } catch (e) { + this.#status = "zero"; + error = e; + } }); + // Re-throw outside blockConcurrencyWhile so the DO's input gate + // isn't permanently broken, allowing subsequent requests to retry. + if (error) throw error; } #attachSocketEventHandlers(connection: Connection) { diff --git a/packages/partyserver/src/tests/index.test.ts b/packages/partyserver/src/tests/index.test.ts index 358dff7..1cf9a51 100644 --- a/packages/partyserver/src/tests/index.test.ts +++ b/packages/partyserver/src/tests/index.test.ts @@ -402,6 +402,86 @@ describe("Hibernating Server (setName handles initialization)", () => { }); }); +describe("Error handling", () => { + it("returns 500 with useful message when room header is missing", async () => { + // Send a request directly to a DO stub without the x-partykit-room header. + // This should return a 500 with the "Missing namespace or room headers" + // message, NOT crash with "Attempting to read .name before it was set". + const id = env.Stateful.idFromName("no-header-test"); + const stub = env.Stateful.get(id); + const response = await stub.fetch( + new Request("http://example.com/some-path") + ); + expect(response.status).toBe(500); + const body = await response.text(); + expect(body).toContain("Missing namespace or room headers"); + }); +}); + +describe("onStart failure recovery", () => { + it("resets status so subsequent requests can retry initialization", async () => { + const ctx = createExecutionContext(); + + // First request: onStart throws on first attempt, returns 500 + const request1 = new Request( + "http://example.com/parties/failing-on-start-server/recovery-test" + ); + const response1 = await worker.fetch(request1, env, ctx); + expect(response1.status).toBe(500); + + // Second request: onStart should succeed on the retry because + // the status was reset to "zero" (not stuck at "starting"), and + // the error was caught inside blockConcurrencyWhile so the DO's + // input gate wasn't permanently broken. + const request2 = new Request( + "http://example.com/parties/failing-on-start-server/recovery-test" + ); + const response2 = await worker.fetch(request2, env, ctx); + expect(response2.status).toBe(200); + const data = (await response2.json()) as { + counter: number; + failCount: number; + }; + // counter is 2 because onStart ran twice (first failed, second succeeded) + expect(data.counter).toEqual(2); + expect(data.failCount).toEqual(1); + }); +}); + +describe("Hibernating server name rehydration", () => { + it("this.name is available in onMessage after wake-up", async () => { + const ctx = createExecutionContext(); + const request = new Request( + "http://example.com/parties/hibernating-name-in-message/rehydrate-test", + { + headers: { Upgrade: "websocket" } + } + ); + const response = await worker.fetch(request, env, ctx); + const ws = response.webSocket!; + ws.accept(); + + // Wait for the onConnect message which includes the name + const connectMessage = await new Promise((resolve) => { + ws.addEventListener("message", (e) => resolve(e.data as string), { + once: true + }); + }); + expect(connectMessage).toEqual("connected:rehydrate-test"); + + // Send a message to trigger onMessage, which also reads this.name + ws.send("ping"); + const nameMessage = await new Promise((resolve) => { + ws.addEventListener("message", (e) => resolve(e.data as string), { + once: true + }); + }); + expect(nameMessage).toEqual("name:rehydrate-test"); + + ws.close(); + }); +}); + describe("Alarm (initialize without redundant blockConcurrencyWhile)", () => { it("properly initializes on alarm and calls onAlarm", async () => { // Use a single stub for the entire test so runDurableObjectAlarm diff --git a/packages/partyserver/src/tests/worker.ts b/packages/partyserver/src/tests/worker.ts index c9c9012..95b74c6 100644 --- a/packages/partyserver/src/tests/worker.ts +++ b/packages/partyserver/src/tests/worker.ts @@ -19,6 +19,8 @@ export type Env = { StateRoundTrip: DurableObjectNamespace; CorsServer: DurableObjectNamespace; CustomCorsServer: DurableObjectNamespace; + FailingOnStartServer: DurableObjectNamespace; + HibernatingNameInMessage: DurableObjectNamespace; }; export class Stateful extends Server { @@ -262,6 +264,49 @@ export class ConfigurableStateInMemory extends Server { } } +/** + * Tests that onStart failure resets the status so subsequent requests + * can retry initialization. The first call to onStart throws; the second + * succeeds. + */ +export class FailingOnStartServer extends Server { + counter = 0; + failCount = 0; + + async onStart() { + this.counter++; + if (this.counter === 1) { + this.failCount++; + throw new Error("onStart failed on first attempt"); + } + } + + onRequest(): Response { + return Response.json({ + counter: this.counter, + failCount: this.failCount + }); + } +} + +/** + * Tests that this.name is correctly available in onMessage after a + * hibernating server wakes up. Sends this.name back in onMessage. + */ +export class HibernatingNameInMessage extends Server { + static options = { + hibernate: true + }; + + onConnect(connection: Connection): void { + connection.send(`connected:${this.name}`); + } + + onMessage(connection: Connection, _message: WSMessage): void { + connection.send(`name:${this.name}`); + } +} + export class CorsServer extends Server { onRequest(): Response | Promise { return Response.json({ cors: true }); diff --git a/packages/partyserver/src/tests/wrangler.jsonc b/packages/partyserver/src/tests/wrangler.jsonc index 7376d1d..36a8860 100644 --- a/packages/partyserver/src/tests/wrangler.jsonc +++ b/packages/partyserver/src/tests/wrangler.jsonc @@ -50,29 +50,34 @@ { "name": "AlarmServer", "class_name": "AlarmServer" + }, + { + "name": "FailingOnStartServer", + "class_name": "FailingOnStartServer" + }, + { + "name": "HibernatingNameInMessage", + "class_name": "HibernatingNameInMessage" } ] }, "migrations": [ { - "tag": "v1", // Should be unique for each entry - "new_classes": ["Stateful", "OnStartServer", "Mixed"] - }, - { - "tag": "v2", + "tag": "v1", "new_classes": [ + "Stateful", + "OnStartServer", + "Mixed", "ConfigurableState", "ConfigurableStateInMemory", - "StateRoundTrip" + "StateRoundTrip", + "CorsServer", + "CustomCorsServer", + "HibernatingOnStartServer", + "AlarmServer", + "FailingOnStartServer", + "HibernatingNameInMessage" ] - }, - { - "tag": "v3", - "new_classes": ["CorsServer", "CustomCorsServer"] - }, - { - "tag": "v4", - "new_classes": ["HibernatingOnStartServer", "AlarmServer"] } ] } diff --git a/packages/partysub/package.json b/packages/partysub/package.json index 9e84aa5..099d20c 100644 --- a/packages/partysub/package.json +++ b/packages/partysub/package.json @@ -46,7 +46,7 @@ }, "devDependencies": { "@cloudflare/workers-types": "^4.20251218.0", - "partyserver": "^0.1.3", - "partysocket": "^1.1.12" + "partyserver": "^0.1.4", + "partysocket": "^1.1.13" } } diff --git a/packages/partysync/package.json b/packages/partysync/package.json index 9eaa1e8..2cf6edc 100644 --- a/packages/partysync/package.json +++ b/packages/partysync/package.json @@ -54,6 +54,6 @@ }, "devDependencies": { "@cloudflare/workers-types": "^4.20251218.0", - "partyserver": "^0.1.3" + "partyserver": "^0.1.4" } } diff --git a/packages/partywhen/package.json b/packages/partywhen/package.json index 31371c3..cae5ac7 100644 --- a/packages/partywhen/package.json +++ b/packages/partywhen/package.json @@ -29,6 +29,6 @@ "description": "A library for scheduling and running tasks in Cloudflare Workers", "dependencies": { "cron-parser": "^5.4.0", - "partyserver": "^0.1.3" + "partyserver": "^0.1.4" } } diff --git a/packages/y-partyserver/package.json b/packages/y-partyserver/package.json index 87fb36c..f775eb4 100644 --- a/packages/y-partyserver/package.json +++ b/packages/y-partyserver/package.json @@ -53,7 +53,7 @@ "devDependencies": { "@cloudflare/workers-types": "^4.20251218.0", "@types/lodash.debounce": "^4.0.9", - "partyserver": "^0.1.3", + "partyserver": "^0.1.4", "ws": "^8.18.3", "yjs": "^13.6.28" }