Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 59 additions & 4 deletions engine/sdks/typescript/runner/src/mod.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import { stringifyToClient, stringifyToServer } from "./stringify";
import { type HibernatingWebSocketMetadata, Tunnel } from "./tunnel";
import {
calculateBackoff,
idToStr,
parseWebSocketCloseReason,
stringifyError,
unreachable,
Expand Down Expand Up @@ -323,24 +324,42 @@ export class Runner {
}

async forceStopActor(actorId: string, generation?: number) {
this.log?.debug({
this.log?.info({
msg: "force stopping actor",
actorId,
generation,
});

const actor = this.getActor(actorId, generation);
if (!actor) return;
if (!actor) {
this.log?.info({
msg: "force stop: actor not found, skipping",
actorId,
generation,
});
return;
}

// If onActorStop times out, Pegboard will handle this timeout with ACTOR_STOP_THRESHOLD_DURATION_MS
//
// If we receive a request while onActorStop is running, a Service
// Unavailable error will be returned to Guard and the request will be
// retried
this.log?.info({
msg: "force stop: calling onActorStop",
actorId,
generation: actor.generation,
});
try {
await this.#config.onActorStop(actorId, actor.generation);
} catch (err) {
console.error(`Error in onActorStop for actor ${actorId}:`, err);
}
this.log?.info({
msg: "force stop: onActorStop complete",
actorId,
generation: actor.generation,
});

// Close requests after onActorStop so you can send messages over the tunnel
this.#tunnel?.closeActiveRequests(actor);
Expand All @@ -350,6 +369,11 @@ export class Runner {
// Remove actor after stopping in order to ensure that we can still
// call actions on the runner
this.#removeActor(actorId, generation);
this.log?.info({
msg: "force stop: actor removed",
actorId,
generation: actor.generation,
});
}

#handleLost() {
Expand Down Expand Up @@ -1006,6 +1030,17 @@ export class Runner {

for (const commandWrapper of commands) {
if (commandWrapper.inner.tag === "CommandStartActor") {
const startCmd = commandWrapper.inner.val as protocol.CommandStartActor;
this.log?.info({
msg: "raw CommandStartActor from engine",
actorId: commandWrapper.checkpoint.actorId,
generation: commandWrapper.checkpoint.generation,
actorName: startCmd.config.name,
hibernatingRequestCount: startCmd.hibernatingRequests.length,
hibernatingRequestIds: startCmd.hibernatingRequests.map(
(hr) => `${idToStr(hr.gatewayId)}:${idToStr(hr.requestId)}`
),
});
// Spawn background promise
this.#handleCommandStartActor(commandWrapper).catch((err) => {
this.log?.error({
Expand Down Expand Up @@ -1154,24 +1189,44 @@ export class Runner {
hibernatingRequests: startCommand.hibernatingRequests.length,
});

// Log each hibernating request from the engine for debugging
for (const hr of startCommand.hibernatingRequests) {
this.log?.info({
msg: "engine provided hibernating request",
actorId,
gatewayId: idToStr(hr.gatewayId),
requestId: idToStr(hr.requestId),
});
}

this.#sendActorStateUpdate(actorId, generation, "running");

try {
// TODO: Add timeout to onActorStart
// Call onActorStart asynchronously and handle errors
this.log?.debug({
this.log?.info({
msg: "calling onActorStart",
actorId,
generation,
});
const startTime = performance.now();
await this.#config.onActorStart(actorId, generation, actorConfig);
const startDurationMs = performance.now() - startTime;

this.log?.info({
msg: "onActorStart complete",
actorId,
generation,
startDurationMs: Math.round(startDurationMs),
});

instance.actorStartPromise.resolve();
} catch (err) {
this.log?.error({
msg: "error starting runner actor",
actorId,
err,
generation,
err: stringifyError(err),
});

instance.actorStartPromise.reject(err);
Expand Down
20 changes: 20 additions & 0 deletions engine/sdks/typescript/runner/src/tunnel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1124,6 +1124,26 @@ export class Tunnel {
if (actor) {
const adapter = actor.getWebSocket(gatewayId, requestId);
if (adapter) {
// For hibernatable connections that receive a downstream
// close (e.g. tunnel ping timeout during migration), skip
// the user-facing close event so the connection state stays
// persisted in KV. The next runner will restore it.
if (
adapter[HIBERNATABLE_SYMBOL] &&
close.reason === "ws.downstream_closed"
) {
this.log?.info({
msg: "skipping close for hibernatable connection with downstream_closed, preserving KV",
requestId: idToStr(requestId),
code: close.code,
reason: close.reason,
});
actor.deleteWebSocket(gatewayId, requestId);
actor.deletePendingRequest(gatewayId, requestId);
this.#removeRequestToActor(gatewayId, requestId);
return;
}

// We don't need to send a close response
adapter._handleClose(
requestId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,19 +138,15 @@ export class WebSocketTunnelAdapter {

const expectedIndex = wrappingAddU16(previousIndex, 1);
if (serverMessageIndex !== expectedIndex) {
const closeReason = "ws.message_index_skip";
this.#log?.warn({
msg: "hibernatable websocket message index out of sequence, closing connection",
msg: "hibernatable websocket message index out of sequence",
requestId,
actorId: this.#actorId,
previousIndex,
expectedIndex,
receivedIndex: serverMessageIndex,
closeReason,
gap: wrappingSubU16(wrappingSubU16(serverMessageIndex, previousIndex), 1),
});
this.#close(1008, closeReason, true);
return true;
}

this.#serverMessageIndex = serverMessageIndex;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,10 @@ export class StateManager<
async saveState(opts: SaveStateOptions): Promise<void> {
this.#actor.assertReady();

if (this.#persistChanged) {
const hasChanges =
this.#persistChanged ||
this.#actor.connectionManager.connsWithPersistChanged.size > 0;
if (hasChanges) {
if (opts.immediate) {
await this.#savePersistInner();
} else {
Expand Down
Loading