diff --git a/engine/sdks/typescript/runner/src/mod.ts b/engine/sdks/typescript/runner/src/mod.ts index 724f8ecd49..c8ee1ad8ad 100644 --- a/engine/sdks/typescript/runner/src/mod.ts +++ b/engine/sdks/typescript/runner/src/mod.ts @@ -7,6 +7,7 @@ import { stringifyToClient, stringifyToServer } from "./stringify"; import { type HibernatingWebSocketMetadata, Tunnel } from "./tunnel"; import { calculateBackoff, + idToStr, parseWebSocketCloseReason, stringifyError, unreachable, @@ -323,24 +324,42 @@ export class Runner { } async forceStopActor(actorId: string, generation?: number) { - this.log?.debug({ + this.log?.info({ msg: "force stopping actor", actorId, + generation, }); const actor = this.getActor(actorId, generation); - if (!actor) return; + if (!actor) { + this.log?.info({ + msg: "force stop: actor not found, skipping", + actorId, + generation, + }); + return; + } // If onActorStop times out, Pegboard will handle this timeout with ACTOR_STOP_THRESHOLD_DURATION_MS // // If we receive a request while onActorStop is running, a Service // Unavailable error will be returned to Guard and the request will be // retried + this.log?.info({ + msg: "force stop: calling onActorStop", + actorId, + generation: actor.generation, + }); try { await this.#config.onActorStop(actorId, actor.generation); } catch (err) { console.error(`Error in onActorStop for actor ${actorId}:`, err); } + this.log?.info({ + msg: "force stop: onActorStop complete", + actorId, + generation: actor.generation, + }); // Close requests after onActorStop so you can send messages over the tunnel this.#tunnel?.closeActiveRequests(actor); @@ -350,6 +369,11 @@ export class Runner { // Remove actor after stopping in order to ensure that we can still // call actions on the runner this.#removeActor(actorId, generation); + this.log?.info({ + msg: "force stop: actor removed", + actorId, + generation: actor.generation, + }); } #handleLost() { @@ -1006,6 +1030,17 @@ export class Runner { for (const commandWrapper of commands) { if (commandWrapper.inner.tag === "CommandStartActor") { + const startCmd = commandWrapper.inner.val as protocol.CommandStartActor; + this.log?.info({ + msg: "raw CommandStartActor from engine", + actorId: commandWrapper.checkpoint.actorId, + generation: commandWrapper.checkpoint.generation, + actorName: startCmd.config.name, + hibernatingRequestCount: startCmd.hibernatingRequests.length, + hibernatingRequestIds: startCmd.hibernatingRequests.map( + (hr) => `${idToStr(hr.gatewayId)}:${idToStr(hr.requestId)}` + ), + }); // Spawn background promise this.#handleCommandStartActor(commandWrapper).catch((err) => { this.log?.error({ @@ -1154,24 +1189,44 @@ export class Runner { hibernatingRequests: startCommand.hibernatingRequests.length, }); + // Log each hibernating request from the engine for debugging + for (const hr of startCommand.hibernatingRequests) { + this.log?.info({ + msg: "engine provided hibernating request", + actorId, + gatewayId: idToStr(hr.gatewayId), + requestId: idToStr(hr.requestId), + }); + } + this.#sendActorStateUpdate(actorId, generation, "running"); try { // TODO: Add timeout to onActorStart // Call onActorStart asynchronously and handle errors - this.log?.debug({ + this.log?.info({ msg: "calling onActorStart", actorId, generation, }); + const startTime = performance.now(); await this.#config.onActorStart(actorId, generation, actorConfig); + const startDurationMs = performance.now() - startTime; + + this.log?.info({ + msg: "onActorStart complete", + actorId, + generation, + startDurationMs: Math.round(startDurationMs), + }); instance.actorStartPromise.resolve(); } catch (err) { this.log?.error({ msg: "error starting runner actor", actorId, - err, + generation, + err: stringifyError(err), }); instance.actorStartPromise.reject(err); diff --git a/engine/sdks/typescript/runner/src/tunnel.ts b/engine/sdks/typescript/runner/src/tunnel.ts index ad2b650e74..28f44d90de 100644 --- a/engine/sdks/typescript/runner/src/tunnel.ts +++ b/engine/sdks/typescript/runner/src/tunnel.ts @@ -1124,6 +1124,26 @@ export class Tunnel { if (actor) { const adapter = actor.getWebSocket(gatewayId, requestId); if (adapter) { + // For hibernatable connections that receive a downstream + // close (e.g. tunnel ping timeout during migration), skip + // the user-facing close event so the connection state stays + // persisted in KV. The next runner will restore it. + if ( + adapter[HIBERNATABLE_SYMBOL] && + close.reason === "ws.downstream_closed" + ) { + this.log?.info({ + msg: "skipping close for hibernatable connection with downstream_closed, preserving KV", + requestId: idToStr(requestId), + code: close.code, + reason: close.reason, + }); + actor.deleteWebSocket(gatewayId, requestId); + actor.deletePendingRequest(gatewayId, requestId); + this.#removeRequestToActor(gatewayId, requestId); + return; + } + // We don't need to send a close response adapter._handleClose( requestId, diff --git a/engine/sdks/typescript/runner/src/websocket-tunnel-adapter.ts b/engine/sdks/typescript/runner/src/websocket-tunnel-adapter.ts index a40f0830d9..3275ac7c82 100644 --- a/engine/sdks/typescript/runner/src/websocket-tunnel-adapter.ts +++ b/engine/sdks/typescript/runner/src/websocket-tunnel-adapter.ts @@ -138,19 +138,15 @@ export class WebSocketTunnelAdapter { const expectedIndex = wrappingAddU16(previousIndex, 1); if (serverMessageIndex !== expectedIndex) { - const closeReason = "ws.message_index_skip"; this.#log?.warn({ - msg: "hibernatable websocket message index out of sequence, closing connection", + msg: "hibernatable websocket message index out of sequence", requestId, actorId: this.#actorId, previousIndex, expectedIndex, receivedIndex: serverMessageIndex, - closeReason, gap: wrappingSubU16(wrappingSubU16(serverMessageIndex, previousIndex), 1), }); - this.#close(1008, closeReason, true); - return true; } this.#serverMessageIndex = serverMessageIndex; diff --git a/rivetkit-typescript/packages/rivetkit/src/actor/instance/state-manager.ts b/rivetkit-typescript/packages/rivetkit/src/actor/instance/state-manager.ts index 98a7841793..9792a5cdc6 100644 --- a/rivetkit-typescript/packages/rivetkit/src/actor/instance/state-manager.ts +++ b/rivetkit-typescript/packages/rivetkit/src/actor/instance/state-manager.ts @@ -216,7 +216,10 @@ export class StateManager< async saveState(opts: SaveStateOptions): Promise { this.#actor.assertReady(); - if (this.#persistChanged) { + const hasChanges = + this.#persistChanged || + this.#actor.connectionManager.connsWithPersistChanged.size > 0; + if (hasChanges) { if (opts.immediate) { await this.#savePersistInner(); } else {