Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/evil-carrots-grab.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"partyserver": patch
---

remove redundant initialize code as setName takes care of it, along with the nested blockConcurrencyWhile call
20 changes: 1 addition & 19 deletions packages/partyserver/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -483,12 +483,6 @@ Did you try connecting directly to this Durable Object? Try using getServerByNam
await this.setName(connection.server);
// TODO: ^ this shouldn't be async

if (this.#status !== "started") {
// This means the server "woke up" after hibernation
// so we need to hydrate it again
await this.#initialize();
}

return this.onMessage(connection, message);
}

Expand All @@ -508,11 +502,6 @@ Did you try connecting directly to this Durable Object? Try using getServerByNam
await this.setName(connection.server);
// TODO: ^ this shouldn't be async

if (this.#status !== "started") {
// This means the server "woke up" after hibernation
// so we need to hydrate it again
await this.#initialize();
}
return this.onClose(connection, code, reason, wasClean);
}

Expand All @@ -527,11 +516,6 @@ Did you try connecting directly to this Durable Object? Try using getServerByNam
await this.setName(connection.server);
// TODO: ^ this shouldn't be async

if (this.#status !== "started") {
// This means the server "woke up" after hibernation
// so we need to hydrate it again
await this.#initialize();
}
return this.onError(connection, error);
}

Expand Down Expand Up @@ -612,9 +596,7 @@ Did you try connecting directly to this Durable Object? Try using getServerByNam
this.#_name = name;

if (this.#status !== "started") {
await this.ctx.blockConcurrencyWhile(async () => {
await this.#initialize();
});
await this.#initialize();
}
}

Expand Down
140 changes: 139 additions & 1 deletion packages/partyserver/src/tests/index.test.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import {
createExecutionContext,
env
env,
runDurableObjectAlarm
// waitOnExecutionContext
} from "cloudflare:test";
import { describe, expect, it } from "vitest";
Expand Down Expand Up @@ -301,6 +302,143 @@ describe("Server", () => {
// describe("in-memory");
});

describe("Hibernating Server (setName handles initialization)", () => {
it("calls onStart before processing connections", async () => {
const ctx = createExecutionContext();
const request = new Request(
"http://example.com/parties/hibernating-on-start-server/h-test1",
{
headers: { Upgrade: "websocket" }
}
);
const response = await worker.fetch(request, env, ctx);
const ws = response.webSocket!;
ws.accept();

const { promise, resolve, reject } = Promise.withResolvers<void>();
ws.addEventListener("message", (message) => {
try {
// counter should be 1 because onStart completed before onConnect
expect(message.data).toEqual("1");
resolve();
} catch (e) {
reject(e);
} finally {
ws.close();
}
});

return promise;
});

it("calls onStart only once with concurrent connections and requests", async () => {
const ctx = createExecutionContext();

async function makeConnection() {
const request = new Request(
"http://example.com/parties/hibernating-on-start-server/h-test2",
{
headers: { Upgrade: "websocket" }
}
);
const response = await worker.fetch(request, env, ctx);
const ws = response.webSocket!;
ws.accept();
const { promise, resolve, reject } = Promise.withResolvers<void>();
ws.addEventListener("message", (message) => {
try {
expect(message.data).toEqual("1");
resolve();
} catch (e) {
reject(e);
} finally {
ws.close();
}
});
return promise;
}

async function makeRequest() {
const request = new Request(
"http://example.com/parties/hibernating-on-start-server/h-test2"
);
const response = await worker.fetch(request, env, ctx);
expect(await response.text()).toEqual("1");
}

await Promise.all([makeConnection(), makeConnection(), makeRequest()]);
});

it("handles websocket messages after initialization", async () => {
const ctx = createExecutionContext();
const request = new Request(
"http://example.com/parties/hibernating-on-start-server/h-test3",
{
headers: { Upgrade: "websocket" }
}
);
const response = await worker.fetch(request, env, ctx);
const ws = response.webSocket!;
ws.accept();

// Wait for the onConnect message
const connectMessage = await new Promise<string>((resolve) => {
ws.addEventListener("message", (e) => resolve(e.data as string), {
once: true
});
});
expect(connectMessage).toEqual("1");

// Send a message and verify the server is still initialized
ws.send("hello");
const echoMessage = await new Promise<string>((resolve) => {
ws.addEventListener("message", (e) => resolve(e.data as string), {
once: true
});
});
expect(echoMessage).toEqual("counter:1");

ws.close();
});
});

describe("Alarm (initialize without redundant blockConcurrencyWhile)", () => {
it("properly initializes on alarm and calls onAlarm", async () => {
// Use a single stub for the entire test so runDurableObjectAlarm
// sees the same DO instance that has the alarm scheduled.
const id = env.AlarmServer.idFromName("alarm-test1");
const stub = env.AlarmServer.get(id);

// Initialize the DO and schedule an alarm in one request
const res = await stub.fetch(
new Request(
"http://example.com/parties/alarm-server/alarm-test1?setAlarm=1",
{
headers: { "x-partykit-room": "alarm-test1" }
}
)
);
expect(await res.text()).toEqual("alarm set");

// Trigger the alarm
const ran = await runDurableObjectAlarm(stub);
expect(ran).toBe(true);

// Verify state: onStart called once, alarm was triggered once
const stateRes = await stub.fetch(
new Request("http://example.com/", {
headers: { "x-partykit-room": "alarm-test1" }
})
);
const state = (await stateRes.json()) as {
counter: number;
alarmCount: number;
};
expect(state.counter).toEqual(1);
expect(state.alarmCount).toEqual(1);
});
});

describe("CORS", () => {
it("returns CORS headers on OPTIONS preflight for matched routes", async () => {
const ctx = createExecutionContext();
Expand Down
73 changes: 72 additions & 1 deletion packages/partyserver/src/tests/worker.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { routePartykitRequest, Server } from "../index";

import type { Connection, ConnectionContext } from "../index";
import type { Connection, ConnectionContext, WSMessage } from "../index";

function assert(condition: unknown, message: string): asserts condition {
if (!condition) {
Expand All @@ -11,6 +11,8 @@ function assert(condition: unknown, message: string): asserts condition {
export type Env = {
Stateful: DurableObjectNamespace<Stateful>;
OnStartServer: DurableObjectNamespace<OnStartServer>;
HibernatingOnStartServer: DurableObjectNamespace<HibernatingOnStartServer>;
AlarmServer: DurableObjectNamespace<AlarmServer>;
Mixed: DurableObjectNamespace<Mixed>;
ConfigurableState: DurableObjectNamespace<ConfigurableState>;
ConfigurableStateInMemory: DurableObjectNamespace<ConfigurableStateInMemory>;
Expand Down Expand Up @@ -67,6 +69,75 @@ export class OnStartServer extends Server {
}
}

/**
* Like OnStartServer but with hibernate: true.
* Tests that setName properly initializes the server in the
* hibernating websocket handler path (webSocketMessage, webSocketClose, etc.)
*/
export class HibernatingOnStartServer extends Server {
static options = {
hibernate: true
};

counter = 0;

async onStart() {
assert(this.name, "name is not available inside onStart");
await new Promise<void>((resolve) => {
setTimeout(() => {
this.counter++;
resolve();
}, 300);
});
}

onConnect(connection: Connection) {
connection.send(this.counter.toString());
}

onMessage(connection: Connection, _message: WSMessage) {
connection.send(`counter:${this.counter}`);
}

onRequest(): Response {
return new Response(this.counter.toString());
}
}

/**
* Tests that alarm() properly initializes the server
* without the redundant blockConcurrencyWhile wrapper.
*/
export class AlarmServer extends Server {
static options = {
hibernate: true
};

counter = 0;
alarmCount = 0;

async onStart() {
this.counter++;
}

onAlarm() {
this.alarmCount++;
}

async onRequest(request: Request): Promise<Response> {
const url = new URL(request.url);
if (url.searchParams.get("setAlarm")) {
// Schedule alarm far in the future so it won't auto-fire
await this.ctx.storage.setAlarm(Date.now() + 60_000);
return new Response("alarm set");
}
return Response.json({
counter: this.counter,
alarmCount: this.alarmCount
});
}
}

export class Mixed extends Server {
static options = {
hibernate: true
Expand Down
12 changes: 12 additions & 0 deletions packages/partyserver/src/tests/wrangler.jsonc
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,14 @@
{
"name": "CustomCorsServer",
"class_name": "CustomCorsServer"
},
{
"name": "HibernatingOnStartServer",
"class_name": "HibernatingOnStartServer"
},
{
"name": "AlarmServer",
"class_name": "AlarmServer"
}
]
},
Expand All @@ -61,6 +69,10 @@
{
"tag": "v3",
"new_classes": ["CorsServer", "CustomCorsServer"]
},
{
"tag": "v4",
"new_classes": ["HibernatingOnStartServer", "AlarmServer"]
}
]
}
Loading