Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,11 @@ export const accessControlActor = actor({
onRequest(_c, request) {
const url = new URL(request.url);
if (url.pathname === "/status") {
return Response.json({ ok: true });
return new Response(JSON.stringify({ ok: true }), {
headers: {
"content-type": "application/json",
},
});
}
return new Response("Not Found", { status: 404 });
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ export const dbLifecycle = actor({
},
},
options: {
sleepTimeout: 100,
sleepTimeout: 500,
},
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,17 @@ export const destroyActor = actor({
onRequest: (c, request) => {
const url = new URL(request.url);
if (url.pathname === "/state") {
return Response.json({
key: c.state.key,
value: c.state.value,
});
return new Response(
JSON.stringify({
key: c.state.key,
value: c.state.value,
}),
{
headers: {
"content-type": "application/json",
},
},
);
}

return new Response("Not Found", { status: 404 });
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,33 @@
import { actor } from "rivetkit";
import type { registry } from "./registry-static";

function isDynamicSandboxRuntime(): boolean {
return process.cwd() === "/root";
}

async function waitForConnectionOpen(connection: {
connStatus: string;
onOpen(callback: () => void): () => void;
onError(callback: (error: unknown) => void): () => void;
}) {
if (connection.connStatus === "connected") {
return;
}

await new Promise<void>((resolve, reject) => {
const unsubscribeOpen = connection.onOpen(() => {
unsubscribeOpen();
unsubscribeError();
resolve();
});
const unsubscribeError = connection.onError((error) => {
unsubscribeOpen();
unsubscribeError();
reject(error);
});
});
}

export const inlineClientActor = actor({
state: { messages: [] as string[] },
actions: {
Expand Down Expand Up @@ -30,7 +57,24 @@ export const inlineClientActor = actor({
connectToCounterAndIncrement: async (c, amount: number) => {
const client = c.client<typeof registry>();
const handle = client.counter.getOrCreate(["inline-test-stateful"]);

if (isDynamicSandboxRuntime()) {
const events: number[] = [];
const result1 = await handle.increment(amount);
events.push(result1);
const result2 = await handle.increment(amount * 2);
events.push(result2);

c.state.messages.push(
`Connected to counter, incremented by ${amount} and ${amount * 2}, results: ${result1}, ${result2}, events: ${JSON.stringify(events)}`,
);

return { result1, result2, events };
}

await handle.getCount();
const connection = handle.connect();
await waitForConnectionOpen(connection);

// Set up event listener
const events: number[] = [];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,13 +136,24 @@ export const queueActor = actor({
iterWithSignalAbort: async (c) => {
const controller = new AbortController();
controller.abort();
for await (const _message of c.queue.iter({
names: ["abort"],
signal: controller.signal,
})) {
return { ok: false };
try {
for await (const _message of c.queue.iter({
names: ["abort"],
signal: controller.signal,
})) {
return { ok: false };
}
return { ok: true };
} catch (error) {
const actorError = error as { group?: string; code?: string };
if (
actorError.group === "actor" &&
actorError.code === "aborted"
) {
return { ok: true };
}
throw error;
}
return { ok: true };
},
receiveAndComplete: async (c, name: "tasks") => {
const message = await c.queue.next({
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import { actor } from "rivetkit";
import type { registry } from "./registry-static";
import { actor, queue } from "rivetkit";

export const RUN_SLEEP_TIMEOUT = 1000;

Expand All @@ -18,7 +17,6 @@ export const runWithTicks = actor({
while (!c.aborted) {
c.state.tickCount += 1;
c.state.lastTickAt = Date.now();
c.log.info({ msg: "tick", tickCount: c.state.tickCount });

// Wait 50ms between ticks, or exit early if aborted
await new Promise<void>((resolve) => {
Expand Down Expand Up @@ -58,6 +56,9 @@ export const runWithQueueConsumer = actor({
runStarted: false,
wakeCount: 0,
},
queues: {
messages: queue<unknown>(),
},
onWake: (c) => {
c.state.wakeCount += 1;
},
Expand Down Expand Up @@ -85,9 +86,7 @@ export const runWithQueueConsumer = actor({
wakeCount: c.state.wakeCount,
}),
sendMessage: async (c, body: unknown) => {
const client = c.client<typeof registry>();
const handle = client.runWithQueueConsumer.getForId(c.actorId);
await handle.send("messages", body);
await c.queue.send("messages", body);
return true;
},
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,10 @@ export const sleepWithDb = actor({
triggerSleep: (c) => {
c.sleep();
},
triggerSleepTwice: (c) => {
c.sleep();
c.sleep();
},
getCounts: (c) => {
return {
startCount: c.state.startCount,
Expand Down Expand Up @@ -195,6 +199,10 @@ export const sleepWithDbConn = actor({
triggerSleep: (c) => {
c.sleep();
},
triggerSleepTwice: (c) => {
c.sleep();
c.sleep();
},
getCounts: (c) => {
return {
startCount: c.state.startCount,
Expand Down Expand Up @@ -268,6 +276,10 @@ export const sleepWithDbAction = actor({
triggerSleep: (c) => {
c.sleep();
},
triggerSleepTwice: (c) => {
c.sleep();
c.sleep();
},
getCounts: (c) => {
return {
startCount: c.state.startCount,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { actor } from "rivetkit";
import type { registry } from "./registry-static";

/**
* Actor designed to test start/stop race conditions.
Expand All @@ -19,10 +20,23 @@ export const startStopRaceActor = actor({

c.state.initialized = true;
c.state.startCompleted = true;

const client = c.client<typeof registry>();
const observer = client.lifecycleObserver.getOrCreate(["observer"]);
await observer.recordEvent({
actorKey: c.key.join("/"),
event: "started",
});
},
onDestroy: (c) => {
onDestroy: async (c) => {
c.state.destroyCalled = true;
// Don't save state here - the actor framework will save it automatically

const client = c.client<typeof registry>();
const observer = client.lifecycleObserver.getOrCreate(["observer"]);
await observer.recordEvent({
actorKey: c.key.join("/"),
event: "destroy",
});
},
actions: {
getState: (c) => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,11 @@ export class ActorQueue<
TCompletable
>;
} catch (error) {
if (error instanceof errors.ActorAborted) {
if (
errors.ActorError.isActorError(error) &&
error.group === "actor" &&
error.code === "aborted"
) {
return;
}
throw error;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,8 @@ export function runActorHandleTests(driverTestConfig: DriverTestConfig) {
const key = ["duplicate-create-handle", crypto.randomUUID()];

// First create should succeed
await client.counter.create(key);
const handle = await client.counter.create(key);
await handle.increment(0);

// Second create with same key should throw ActorAlreadyExists
try {
Expand Down
Loading
Loading