Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions .changeset/fix-initialization-race-conditions.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
---
"partyserver": patch
---

Fix initialization race conditions and improve error resilience.

- `getServerByName` now propagates errors from the internal `set-name` request instead of silently swallowing them.
- `onStart` failures no longer permanently brick the Durable Object. Errors are caught inside `blockConcurrencyWhile` (preserving the input gate) and the status is reset, allowing subsequent requests to retry initialization.
- `fetch()` now retries initialization when a previous `onStart` attempt failed, instead of skipping it because the name was already set.
- Errors in `fetch()` (including `onStart` failures and malformed props) are now caught and returned as proper 500 responses instead of crashing as unhandled exceptions.
- WebSocket handlers (`webSocketMessage`, `webSocketClose`, `webSocketError`) are now wrapped in try/catch so that transient `onStart` failures don't kill the connection — the next message will retry.
14 changes: 7 additions & 7 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion packages/hono-party/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,6 @@
"devDependencies": {
"@cloudflare/workers-types": "^4.20251218.0",
"hono": "^4.11.1",
"partyserver": "^0.1.3"
"partyserver": "^0.1.4"
}
}
2 changes: 1 addition & 1 deletion packages/partyfn/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
],
"dependencies": {
"nanoid": "^5.1.6",
"partysocket": "^1.1.12"
"partysocket": "^1.1.13"
},
"scripts": {
"build": "tsx scripts/build.ts"
Expand Down
118 changes: 70 additions & 48 deletions packages/partyserver/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -67,13 +67,7 @@
}

// unfortunately we have to await this
await stub
.fetch(req)
// drain body
.then((res) => res.text())
.catch((e) => {
console.error("Could not set server name:", e);
});
await stub.fetch(req).then((res) => res.text());

return stub;
}
Expand Down Expand Up @@ -366,34 +360,32 @@
* Handle incoming requests to the server.
*/
async fetch(request: Request): Promise<Response> {
// Set the props in-mem if the request included them.
const props = request.headers.get("x-partykit-props");
if (props) {
try {
try {
// Set the props in-mem if the request included them.
const props = request.headers.get("x-partykit-props");
if (props) {
this.#_props = JSON.parse(props);
} catch {
// This should never happen but log it just in case
console.error("Internal error parsing context props.");
}
}
if (!this.#_name) {
// This is temporary while we solve https://github.com/cloudflare/workerd/issues/2240

if (!this.#_name) {
// This is temporary while we solve https://github.com/cloudflare/workerd/issues/2240

// get namespace and room from headers
// const namespace = request.headers.get("x-partykit-namespace");
const room = request.headers.get("x-partykit-room");
if (
// !namespace ||
!room
) {
throw new Error(`Missing namespace or room headers when connecting to ${this.#ParentClass.name}.
// get namespace and room from headers
// const namespace = request.headers.get("x-partykit-namespace");
const room = request.headers.get("x-partykit-room");
if (
// !namespace ||
!room
) {
throw new Error(`Missing namespace or room headers when connecting to ${this.#ParentClass.name}.
Did you try connecting directly to this Durable Object? Try using getServerByName(namespace, id) instead.`);
}
await this.setName(room);
} else if (this.#status !== "started") {
// Name was set by a previous request but initialization failed.
// Retry initialization so the server can recover from transient
// onStart failures.
await this.#initialize();
}
await this.setName(room);
}

try {
const url = new URL(request.url);

// TODO: this is a hack to set the server name,
Expand Down Expand Up @@ -450,7 +442,7 @@
}
} catch (err) {
console.error(
`Error in ${this.#ParentClass.name}:${this.name} fetch:`,
`Error in ${this.#ParentClass.name}:${this.#_name ?? "<unnamed>"} fetch:`,
err
);
if (!(err instanceof Error)) throw err;
Expand All @@ -472,18 +464,25 @@
async webSocketMessage(ws: WebSocket, message: WSMessage): Promise<void> {
// Ignore websockets accepted outside PartyServer (e.g. via
// `state.acceptWebSocket()` in user code). These sockets won't have the
// `__pk` attachment namespace required to rehydrate a Connection.

Check warning

Code scanning / CodeQL

Information exposure through a stack trace Medium

This information exposed to the user depends on stack trace information.
if (!isPartyServerWebSocket(ws)) {
return;
}

const connection = createLazyConnection(ws);
try {
const connection = createLazyConnection(ws);

// rehydrate the server name if it's woken up
await this.setName(connection.server);
// TODO: ^ this shouldn't be async
// rehydrate the server name if it's woken up
await this.setName(connection.server);
// TODO: ^ this shouldn't be async

return this.onMessage(connection, message);
return this.onMessage(connection, message);
} catch (e) {
console.error(
`Error in ${this.#ParentClass.name}:${this.#_name ?? "<unnamed>"} webSocketMessage:`,
e
);
}
}

async webSocketClose(
Expand All @@ -496,35 +495,58 @@
return;
}

const connection = createLazyConnection(ws);
try {
const connection = createLazyConnection(ws);

// rehydrate the server name if it's woken up
await this.setName(connection.server);
// TODO: ^ this shouldn't be async
// rehydrate the server name if it's woken up
await this.setName(connection.server);
// TODO: ^ this shouldn't be async

return this.onClose(connection, code, reason, wasClean);
return this.onClose(connection, code, reason, wasClean);
} catch (e) {
console.error(
`Error in ${this.#ParentClass.name}:${this.#_name ?? "<unnamed>"} webSocketClose:`,
e
);
}
}

async webSocketError(ws: WebSocket, error: unknown): Promise<void> {
if (!isPartyServerWebSocket(ws)) {
return;
}

const connection = createLazyConnection(ws);
try {
const connection = createLazyConnection(ws);

// rehydrate the server name if it's woken up
await this.setName(connection.server);
// TODO: ^ this shouldn't be async
// rehydrate the server name if it's woken up
await this.setName(connection.server);
// TODO: ^ this shouldn't be async

return this.onError(connection, error);
return this.onError(connection, error);
} catch (e) {
console.error(
`Error in ${this.#ParentClass.name}:${this.#_name ?? "<unnamed>"} webSocketError:`,
e
);
}
}

async #initialize(): Promise<void> {
let error: unknown;
await this.ctx.blockConcurrencyWhile(async () => {
this.#status = "starting";
await this.onStart(this.#_props);
this.#status = "started";
try {
await this.onStart(this.#_props);
this.#status = "started";
} catch (e) {
this.#status = "zero";
error = e;
}
});
// Re-throw outside blockConcurrencyWhile so the DO's input gate
// isn't permanently broken, allowing subsequent requests to retry.
if (error) throw error;
}

#attachSocketEventHandlers(connection: Connection) {
Expand Down
80 changes: 80 additions & 0 deletions packages/partyserver/src/tests/index.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -402,6 +402,86 @@ describe("Hibernating Server (setName handles initialization)", () => {
});
});

describe("Error handling", () => {
  it("returns 500 with useful message when room header is missing", async () => {
    // Talk to the Durable Object stub directly so the request carries no
    // x-partykit-room header. The expected outcome is a 500 response whose
    // body mentions "Missing namespace or room headers" -- not a crash with
    // "Attempting to read .name before it was set".
    const directStub = env.Stateful.get(
      env.Stateful.idFromName("no-header-test")
    );
    const headerlessRequest = new Request("http://example.com/some-path");

    const res = await directStub.fetch(headerlessRequest);

    expect(res.status).toBe(500);
    const text = await res.text();
    expect(text).toContain("Missing namespace or room headers");
  });
});

describe("onStart failure recovery", () => {
  it("resets status so subsequent requests can retry initialization", async () => {
    const executionContext = createExecutionContext();

    // Attempt #1: onStart throws on its first run, so the caller sees a 500.
    const failedAttempt = await worker.fetch(
      new Request(
        "http://example.com/parties/failing-on-start-server/recovery-test"
      ),
      env,
      executionContext
    );
    expect(failedAttempt.status).toBe(500);

    // Attempt #2: the retry is expected to succeed. That only works if the
    // status went back to "zero" (rather than staying at "starting") and the
    // error was caught inside blockConcurrencyWhile, leaving the Durable
    // Object's input gate usable.
    const retriedAttempt = await worker.fetch(
      new Request(
        "http://example.com/parties/failing-on-start-server/recovery-test"
      ),
      env,
      executionContext
    );
    expect(retriedAttempt.status).toBe(200);

    const payload = (await retriedAttempt.json()) as {
      counter: number;
      failCount: number;
    };
    // onStart ran twice in total (one failure, then one success), hence 2.
    expect(payload.counter).toEqual(2);
    expect(payload.failCount).toEqual(1);
  });
});

describe("Hibernating server name rehydration", () => {
  it("this.name is available in onMessage after wake-up", async () => {
    const executionContext = createExecutionContext();
    const upgradeRequest = new Request(
      "http://example.com/parties/hibernating-name-in-message/rehydrate-test",
      {
        headers: { Upgrade: "websocket" }
      }
    );
    const upgradeResponse = await worker.fetch(
      upgradeRequest,
      env,
      executionContext
    );
    const socket = upgradeResponse.webSocket!;
    socket.accept();

    // Resolves with the payload of the next "message" event on the socket.
    // The listener is registered with `once`, so each call observes exactly
    // one message.
    const nextMessage = (): Promise<string> =>
      new Promise<string>((resolve) => {
        socket.addEventListener(
          "message",
          (event) => resolve(event.data as string),
          {
            once: true
          }
        );
      });

    // The first message is the one sent on connect; it embeds the name.
    const greeting = await nextMessage();
    expect(greeting).toEqual("connected:rehydrate-test");

    // Sending a message triggers onMessage, which reads this.name as well.
    socket.send("ping");
    const reply = await nextMessage();
    expect(reply).toEqual("name:rehydrate-test");

    socket.close();
  });
});

describe("Alarm (initialize without redundant blockConcurrencyWhile)", () => {
it("properly initializes on alarm and calls onAlarm", async () => {
// Use a single stub for the entire test so runDurableObjectAlarm
Expand Down
Loading
Loading