From 3ffc10debcbd145cd0fa3b83fde9da3127afd4af Mon Sep 17 00:00:00 2001 From: Pranay Prakash Date: Thu, 14 May 2026 11:21:05 -0700 Subject: [PATCH 01/18] Fix stable CI test harness failures --- .changeset/stable-ci-e2e-cleanup.md | 4 ++++ packages/core/e2e/dev.test.ts | 17 ++++++++++------- packages/utils/src/get-port.test.ts | 2 +- 3 files changed, 15 insertions(+), 8 deletions(-) create mode 100644 .changeset/stable-ci-e2e-cleanup.md diff --git a/.changeset/stable-ci-e2e-cleanup.md b/.changeset/stable-ci-e2e-cleanup.md new file mode 100644 index 0000000000..6e3729a673 --- /dev/null +++ b/.changeset/stable-ci-e2e-cleanup.md @@ -0,0 +1,4 @@ +--- +--- + +Fix stable CI test harness failures without releasing package changes. diff --git a/packages/core/e2e/dev.test.ts b/packages/core/e2e/dev.test.ts index a6f40b7649..187745677d 100644 --- a/packages/core/e2e/dev.test.ts +++ b/packages/core/e2e/dev.test.ts @@ -137,21 +137,24 @@ export function createDevTests(config?: DevTestConfig) { }); afterEach(async () => { - // Restore file contents before deleting any files. If a deletion races - // ahead of an api-file restore, the dev server briefly sees an import - // pointing at a missing module and fails compilation. On Windows that - // failure can stick — Turbopack leaves stale imports in the generated - // step route bundle — and every subsequent step request returns 500. + // Restore file contents before clearing any added files. If a deletion + // races ahead of an api-file restore, the dev server briefly sees an + // import pointing at a missing module and fails compilation. On Windows + // that failure can stick - Turbopack leaves stale imports in the + // generated step route bundle — and every subsequent step request + // returns 500. const toRestore = restoreFiles.filter((item) => item.content !== ''); const toDelete = restoreFiles.filter((item) => item.content === ''); await Promise.all( toRestore.map((item) => fs.writeFile(item.path, item.content)) ); if (toDelete.length > 0) { + await Promise.all(toDelete.map((item) => fs.writeFile(item.path, ''))); + await prewarm(); + await new Promise((res) => setTimeout(res, 2000)); + await Promise.all(toDelete.map((item) => fs.unlink(item.path))); await prewarm(); } - await Promise.all(toDelete.map((item) => fs.unlink(item.path))); - await prewarm(); restoreFiles.length = 0; }); diff --git a/packages/utils/src/get-port.test.ts b/packages/utils/src/get-port.test.ts index fd44cd1656..91182a300b 100644 --- a/packages/utils/src/get-port.test.ts +++ b/packages/utils/src/get-port.test.ts @@ -325,7 +325,7 @@ describe('getWorkflowPort', () => { const fastAddr = fastServer.address() as AddressInfo; expect(port).toBe(fastAddr.port); // Should complete reasonably quickly (Windows CI can be slow) - expect(elapsed).toBeLessThan(2000); + expect(elapsed).toBeLessThan(5000); }); it('should handle concurrent getWorkflowPort calls', async () => { From 5e75f9c912a6cae3d1455f2d04caea0cda3d1c96 Mon Sep 17 00:00:00 2001 From: Pranay Prakash Date: Thu, 14 May 2026 11:38:48 -0700 Subject: [PATCH 02/18] Fix dev e2e cleanup races --- packages/core/e2e/dev.test.ts | 25 +++++++++++++------------ packages/utils/src/get-port.test.ts | 11 +++++++---- packages/utils/src/get-port.ts | 3 ++- 3 files changed, 22 insertions(+), 17 deletions(-) diff --git a/packages/core/e2e/dev.test.ts b/packages/core/e2e/dev.test.ts index 187745677d..b4f8de2f49 100644 --- a/packages/core/e2e/dev.test.ts +++ b/packages/core/e2e/dev.test.ts @@ -94,6 +94,12 @@ export function createDevTests(config?: DevTestConfig) { await Promise.all([ fetchWithTimeout('/').catch(() => {}), fetchWithTimeout('/api/chat').catch(() => {}), + fetchWithTimeout('/.well-known/workflow/v1/flow?__health').catch( + () => {} + ), + fetchWithTimeout('/.well-known/workflow/v1/step?__health').catch( + () => {} + ), ]); }; @@ -137,22 +143,17 @@ export function createDevTests(config?: DevTestConfig) { }); afterEach(async () => { - // Restore file contents before clearing any added files. If a deletion - // races ahead of an api-file restore, the dev server briefly sees an - // import pointing at a missing module and fails compilation. On Windows - // that failure can stick - Turbopack leaves stale imports in the - // generated step route bundle — and every subsequent step request - // returns 500. + // Restore file contents before clearing any added files. Keeping empty + // placeholders avoids dev-server races where generated imports briefly + // point at a missing module between the dev test and the follow-on e2e + // suite. const toRestore = restoreFiles.filter((item) => item.content !== ''); - const toDelete = restoreFiles.filter((item) => item.content === ''); + const toClear = restoreFiles.filter((item) => item.content === ''); await Promise.all( toRestore.map((item) => fs.writeFile(item.path, item.content)) ); - if (toDelete.length > 0) { - await Promise.all(toDelete.map((item) => fs.writeFile(item.path, ''))); - await prewarm(); - await new Promise((res) => setTimeout(res, 2000)); - await Promise.all(toDelete.map((item) => fs.unlink(item.path))); + if (toClear.length > 0) { + await Promise.all(toClear.map((item) => fs.writeFile(item.path, ''))); await prewarm(); } restoreFiles.length = 0; diff --git a/packages/utils/src/get-port.test.ts b/packages/utils/src/get-port.test.ts index 91182a300b..4147ac8c5f 100644 --- a/packages/utils/src/get-port.test.ts +++ b/packages/utils/src/get-port.test.ts @@ -318,14 +318,17 @@ describe('getWorkflowPort', () => { await new Promise((resolve) => slowServer.listen(0, resolve)); await new Promise((resolve) => fastServer.listen(0, resolve)); + const slowAddr = slowServer.address() as AddressInfo; + const fastAddr = fastServer.address() as AddressInfo; const start = Date.now(); - const port = await getWorkflowPort({ timeout: 100 }); + const port = await getWorkflowPort({ + timeout: 100, + candidatePorts: [slowAddr.port, fastAddr.port], + }); const elapsed = Date.now() - start; - const fastAddr = fastServer.address() as AddressInfo; expect(port).toBe(fastAddr.port); - // Should complete reasonably quickly (Windows CI can be slow) - expect(elapsed).toBeLessThan(5000); + expect(elapsed).toBeLessThan(2000); }); it('should handle concurrent getWorkflowPort calls', async () => { diff --git a/packages/utils/src/get-port.ts b/packages/utils/src/get-port.ts index b6806a35a4..6efb7c1db5 100644 --- a/packages/utils/src/get-port.ts +++ b/packages/utils/src/get-port.ts @@ -239,6 +239,7 @@ const PROBE_ENDPOINT = '/.well-known/workflow/v1/flow?__health'; export interface ProbeOptions { endpoint?: string; timeout?: number; + candidatePorts?: number[]; } /** @@ -283,7 +284,7 @@ async function probePort( export async function getWorkflowPort( options?: ProbeOptions ): Promise { - const ports = await getAllPorts(); + const ports = options?.candidatePorts ?? (await getAllPorts()); if (ports.length === 0) { return undefined; From caf294cd52309cb5f0d833b42a91e7a8344e6ce6 Mon Sep 17 00:00:00 2001 From: Pranay Prakash Date: Thu, 14 May 2026 11:52:28 -0700 Subject: [PATCH 03/18] Restore non-Next dev cleanup --- packages/core/e2e/dev.test.ts | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/packages/core/e2e/dev.test.ts b/packages/core/e2e/dev.test.ts index b4f8de2f49..0a8d1f99e2 100644 --- a/packages/core/e2e/dev.test.ts +++ b/packages/core/e2e/dev.test.ts @@ -143,10 +143,10 @@ export function createDevTests(config?: DevTestConfig) { }); afterEach(async () => { - // Restore file contents before clearing any added files. Keeping empty - // placeholders avoids dev-server races where generated imports briefly - // point at a missing module between the dev test and the follow-on e2e - // suite. + // Restore file contents before clearing any added files. Next's generated + // step route imports deferred copies, so it needs the empty placeholders + // to avoid a missing-module race between this suite and the follow-on e2e + // suite. Other builders should return to the original file tree. const toRestore = restoreFiles.filter((item) => item.content !== ''); const toClear = restoreFiles.filter((item) => item.content === ''); await Promise.all( @@ -155,6 +155,10 @@ export function createDevTests(config?: DevTestConfig) { if (toClear.length > 0) { await Promise.all(toClear.map((item) => fs.writeFile(item.path, ''))); await prewarm(); + if (!supportsDeferredStepCopies) { + await Promise.all(toClear.map((item) => fs.unlink(item.path))); + await prewarm(); + } } restoreFiles.length = 0; }); From 25c2289d6b10bcf4d367bfd51830b6367be101db Mon Sep 17 00:00:00 2001 From: Pranay Prakash Date: Thu, 14 May 2026 14:59:44 -0700 Subject: [PATCH 04/18] Keep Next dev temp workflow files intact --- packages/core/e2e/dev.test.ts | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/packages/core/e2e/dev.test.ts b/packages/core/e2e/dev.test.ts index 0a8d1f99e2..4b55a3a01f 100644 --- a/packages/core/e2e/dev.test.ts +++ b/packages/core/e2e/dev.test.ts @@ -144,18 +144,20 @@ export function createDevTests(config?: DevTestConfig) { afterEach(async () => { // Restore file contents before clearing any added files. Next's generated - // step route imports deferred copies, so it needs the empty placeholders - // to avoid a missing-module race between this suite and the follow-on e2e - // suite. Other builders should return to the original file tree. + // step route imports deferred copies, so temporary workflow files need to + // keep their real contents until the dev server shuts down. Emptying them + // can make the builder remove a copied step file while the route still + // imports it. Other builders should return to the original file tree. const toRestore = restoreFiles.filter((item) => item.content !== ''); const toClear = restoreFiles.filter((item) => item.content === ''); await Promise.all( toRestore.map((item) => fs.writeFile(item.path, item.content)) ); if (toClear.length > 0) { - await Promise.all(toClear.map((item) => fs.writeFile(item.path, ''))); await prewarm(); if (!supportsDeferredStepCopies) { + await Promise.all(toClear.map((item) => fs.writeFile(item.path, ''))); + await prewarm(); await Promise.all(toClear.map((item) => fs.unlink(item.path))); await prewarm(); } From e84e29fa5e81fd5866218dee5ec28a53aabad9cc Mon Sep 17 00:00:00 2001 From: Pranay Prakash Date: Thu, 14 May 2026 15:06:37 -0700 Subject: [PATCH 05/18] Keep dev test placeholders on disk --- packages/core/e2e/dev.test.ts | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/packages/core/e2e/dev.test.ts b/packages/core/e2e/dev.test.ts index 4b55a3a01f..074f867c21 100644 --- a/packages/core/e2e/dev.test.ts +++ b/packages/core/e2e/dev.test.ts @@ -143,11 +143,11 @@ export function createDevTests(config?: DevTestConfig) { }); afterEach(async () => { - // Restore file contents before clearing any added files. Next's generated - // step route imports deferred copies, so temporary workflow files need to - // keep their real contents until the dev server shuts down. Emptying them - // can make the builder remove a copied step file while the route still - // imports it. Other builders should return to the original file tree. + // Restore file contents before clearing any added files. Dev servers can + // keep generated imports alive briefly after a rebuild. Next's generated + // step route imports deferred copies, so added workflow files need to keep + // their real contents until shutdown. Other builders can use empty + // placeholders to drop workflow directives while avoiding missing imports. const toRestore = restoreFiles.filter((item) => item.content !== ''); const toClear = restoreFiles.filter((item) => item.content === ''); await Promise.all( @@ -158,8 +158,6 @@ export function createDevTests(config?: DevTestConfig) { if (!supportsDeferredStepCopies) { await Promise.all(toClear.map((item) => fs.writeFile(item.path, ''))); await prewarm(); - await Promise.all(toClear.map((item) => fs.unlink(item.path))); - await prewarm(); } } restoreFiles.length = 0; From a681d3832de9d9f5c3f3517c3bbb7aba89591186 Mon Sep 17 00:00:00 2001 From: Pranay Prakash Date: Thu, 14 May 2026 15:26:45 -0700 Subject: [PATCH 06/18] Speed up workflow port detection --- packages/utils/src/get-port.test.ts | 8 ++--- packages/utils/src/get-port.ts | 49 +++++++++++++++++++++++++++-- 2 files changed, 50 insertions(+), 7 deletions(-) diff --git a/packages/utils/src/get-port.test.ts b/packages/utils/src/get-port.test.ts index 4147ac8c5f..2163880d7a 100644 --- a/packages/utils/src/get-port.test.ts +++ b/packages/utils/src/get-port.test.ts @@ -243,7 +243,7 @@ describe('getWorkflowPort', () => { it('should identify workflow server among multiple ports', async () => { // Non-workflow server (returns 404 for all requests) - const nonWorkflowServer = http.createServer((req, res) => { + const nonWorkflowServer = http.createServer((_req, res) => { res.writeHead(404); res.end(); }); @@ -275,11 +275,11 @@ describe('getWorkflowPort', () => { it('should fall back to first port when probing fails', async () => { // Two non-workflow servers (both return 404) - const server1 = http.createServer((req, res) => { + const server1 = http.createServer((_req, res) => { res.writeHead(404); res.end(); }); - const server2 = http.createServer((req, res) => { + const server2 = http.createServer((_req, res) => { res.writeHead(404); res.end(); }); @@ -318,12 +318,10 @@ describe('getWorkflowPort', () => { await new Promise((resolve) => slowServer.listen(0, resolve)); await new Promise((resolve) => fastServer.listen(0, resolve)); - const slowAddr = slowServer.address() as AddressInfo; const fastAddr = fastServer.address() as AddressInfo; const start = Date.now(); const port = await getWorkflowPort({ timeout: 100, - candidatePorts: [slowAddr.port, fastAddr.port], }); const elapsed = Date.now() - start; diff --git a/packages/utils/src/get-port.ts b/packages/utils/src/get-port.ts index 6efb7c1db5..02e136c244 100644 --- a/packages/utils/src/get-port.ts +++ b/packages/utils/src/get-port.ts @@ -21,6 +21,47 @@ function parsePort(value: string, radix = 10): number | undefined { const join = (arr: string[], sep: string) => arr.join(sep); const PROC_ROOT = join(['', 'proc'], '/'); +interface LibuvTcpHandle { + type?: string; + is_active?: boolean; + localEndpoint?: { + port?: number; + }; + remoteEndpoint?: unknown; +} + +function getReportedPorts(): number[] { + const report = process.report?.getReport?.() as + | { libuv?: LibuvTcpHandle[] } + | undefined; + const handles = report?.libuv; + + if (!handles) { + return []; + } + + const ports: number[] = []; + const seen = new Set(); + + for (const handle of handles) { + if ( + handle.type !== 'tcp' || + handle.is_active !== true || + handle.remoteEndpoint !== null + ) { + continue; + } + + const port = parsePort(String(handle.localEndpoint?.port)); + if (port !== undefined && !seen.has(port)) { + ports.push(port); + seen.add(port); + } + } + + return ports; +} + /** * Gets ALL listening ports for the current process on Linux by reading /proc filesystem. * Returns ports in order of file descriptor (deterministic ordering). @@ -205,6 +246,11 @@ export async function getAllPorts(): Promise { const { pid, platform } = process; try { + const reportedPorts = getReportedPorts(); + if (reportedPorts.length > 0) { + return reportedPorts; + } + switch (platform) { case 'linux': return await getLinuxPorts(pid); @@ -239,7 +285,6 @@ const PROBE_ENDPOINT = '/.well-known/workflow/v1/flow?__health'; export interface ProbeOptions { endpoint?: string; timeout?: number; - candidatePorts?: number[]; } /** @@ -284,7 +329,7 @@ async function probePort( export async function getWorkflowPort( options?: ProbeOptions ): Promise { - const ports = options?.candidatePorts ?? (await getAllPorts()); + const ports = await getAllPorts(); if (ports.length === 0) { return undefined; From 982c0d605495285d62c95853876ccf027ca6ca98 Mon Sep 17 00:00:00 2001 From: Pranay Prakash Date: Thu, 14 May 2026 15:43:39 -0700 Subject: [PATCH 07/18] Probe workflow health with POST --- packages/utils/src/get-port.test.ts | 15 ++++++++++++--- packages/utils/src/get-port.ts | 4 ++-- 2 files changed, 14 insertions(+), 5 deletions(-) diff --git a/packages/utils/src/get-port.test.ts b/packages/utils/src/get-port.test.ts index 2163880d7a..36f8c1e6b6 100644 --- a/packages/utils/src/get-port.test.ts +++ b/packages/utils/src/get-port.test.ts @@ -250,9 +250,12 @@ describe('getWorkflowPort', () => { // Workflow server (returns 200 for health check endpoint) const workflowServer = http.createServer((req, res) => { - if (req.url?.includes('__health')) { + if (req.url?.includes('__health') && req.method === 'POST') { res.writeHead(200, { 'Content-Type': 'text/plain' }); res.end('Workflow SDK endpoint is healthy'); + } else if (req.url?.includes('__health')) { + res.writeHead(405, { Allow: 'POST' }); + res.end(); } else if (req.url?.startsWith('/.well-known/workflow/v1/')) { res.writeHead(400, { 'Content-Type': 'application/json' }); res.end(JSON.stringify({ error: 'Missing required headers' })); @@ -302,9 +305,12 @@ describe('getWorkflowPort', () => { }); // Fast workflow server (returns 200 for health check) const fastServer = http.createServer((req, res) => { - if (req.url?.includes('__health')) { + if (req.url?.includes('__health') && req.method === 'POST') { res.writeHead(200, { 'Content-Type': 'text/plain' }); res.end('Workflow SDK endpoint is healthy'); + } else if (req.url?.includes('__health')) { + res.writeHead(405, { Allow: 'POST' }); + res.end(); } else if (req.url?.startsWith('/.well-known/workflow/v1/')) { res.writeHead(400); res.end(); @@ -331,9 +337,12 @@ describe('getWorkflowPort', () => { it('should handle concurrent getWorkflowPort calls', async () => { const server = http.createServer((req, res) => { - if (req.url?.includes('__health')) { + if (req.url?.includes('__health') && req.method === 'POST') { res.writeHead(200, { 'Content-Type': 'text/plain' }); res.end('Workflow SDK endpoint is healthy'); + } else if (req.url?.includes('__health')) { + res.writeHead(405, { Allow: 'POST' }); + res.end(); } else if (req.url?.startsWith('/.well-known/workflow/v1/')) { res.writeHead(400); res.end(); diff --git a/packages/utils/src/get-port.ts b/packages/utils/src/get-port.ts index 02e136c244..fd4d761c32 100644 --- a/packages/utils/src/get-port.ts +++ b/packages/utils/src/get-port.ts @@ -289,7 +289,7 @@ export interface ProbeOptions { /** * Probes a port to check if it's serving the workflow HTTP server. - * Uses HEAD request to minimize overhead. + * Uses the same POST health check method as the generated workflow endpoint. * * @returns true if the port responds with a 200 status from the health check endpoint */ @@ -304,7 +304,7 @@ async function probePort( try { const response = await fetch(`http://localhost:${port}${endpoint}`, { - method: 'HEAD', + method: 'POST', signal: controller.signal, }); From 9c7bea95bbbc306da1b8f7dfa8cb0b156e7f15ec Mon Sep 17 00:00:00 2001 From: Pranay Prakash Date: Thu, 14 May 2026 16:22:12 -0700 Subject: [PATCH 08/18] Support HEAD workflow health checks --- .changeset/stable-ci-e2e-cleanup.md | 9 ++++++++- packages/astro/src/builder.ts | 18 ++++++++++++------ packages/builders/src/base-builder.ts | 7 +++++-- packages/core/e2e/e2e.test.ts | 12 ++++++++++++ packages/core/src/runtime/helpers.ts | 6 ++++++ packages/next/src/builder-deferred.ts | 2 +- packages/sveltekit/src/builder.ts | 18 ++++++++++++------ packages/utils/src/get-port.test.ts | 12 ++++++------ packages/utils/src/get-port.ts | 4 ++-- 9 files changed, 64 insertions(+), 24 deletions(-) diff --git a/.changeset/stable-ci-e2e-cleanup.md b/.changeset/stable-ci-e2e-cleanup.md index 6e3729a673..3c54c80bb7 100644 --- a/.changeset/stable-ci-e2e-cleanup.md +++ b/.changeset/stable-ci-e2e-cleanup.md @@ -1,4 +1,11 @@ --- +"@workflow/astro": patch +"@workflow/builders": patch +"@workflow/core": patch +"@workflow/next": patch +"@workflow/sveltekit": patch +"@workflow/utils": patch +"workflow": patch --- -Fix stable CI test harness failures without releasing package changes. +Fix local workflow port detection and make generated health endpoints respond to HEAD requests. diff --git a/packages/astro/src/builder.ts b/packages/astro/src/builder.ts index d13cae3f03..fa340d8897 100644 --- a/packages/astro/src/builder.ts +++ b/packages/astro/src/builder.ts @@ -120,12 +120,15 @@ export const prerender = false;\n` // Normalize request, needed for preserving request through astro stepsRouteContent = stepsRouteContent.replace( - /export\s*\{\s*stepEntrypoint\s+as\s+POST\s*\}\s*;?$/m, + /export\s*\{\s*stepEntrypoint\s+as\s+HEAD\s*,\s*stepEntrypoint\s+as\s+POST\s*\}\s*;?$/m, `${NORMALIZE_REQUEST_CODE} -export const POST = async ({request}) => { +const handleStepRequest = async ({request}) => { const normalRequest = await normalizeRequest(request); return stepEntrypoint(normalRequest); -} +}; + +export const HEAD = handleStepRequest; +export const POST = handleStepRequest; export const prerender = false;` ); @@ -157,12 +160,15 @@ export const prerender = false;` // Normalize request, needed for preserving request through astro workflowsRouteContent = workflowsRouteContent.replace( - /export const POST = workflowEntrypoint\(workflowCode\);?$/m, + /const handler = workflowEntrypoint\(workflowCode\);\n\nexport const HEAD = handler;\nexport const POST = handler;?$/m, `${NORMALIZE_REQUEST_CODE} -export const POST = async ({request}) => { +const handleWorkflowRequest = async ({request}) => { const normalRequest = await normalizeRequest(request); return workflowEntrypoint(workflowCode)(normalRequest); -} +}; + +export const HEAD = handleWorkflowRequest; +export const POST = handleWorkflowRequest; export const prerender = false;` ); diff --git a/packages/builders/src/base-builder.ts b/packages/builders/src/base-builder.ts index 27c605a7c3..aaa28aed58 100644 --- a/packages/builders/src/base-builder.ts +++ b/packages/builders/src/base-builder.ts @@ -603,7 +603,7 @@ export abstract class BaseBuilder { // Serde files for cross-context class registration ${serdeImports} // API entrypoint - export { stepEntrypoint as POST } from 'workflow/runtime';`; + export { stepEntrypoint as HEAD, stepEntrypoint as POST } from 'workflow/runtime';`; // Bundle with esbuild and our custom SWC plugin const entriesToBundle = externalizeNonSteps @@ -1006,7 +1006,10 @@ import { workflowEntrypoint } from 'workflow/runtime'; const workflowCode = \`${workflowBundleCode.replace(/[\\`$]/g, '\\$&')}\`; -export const POST = workflowEntrypoint(workflowCode);`; +const handler = workflowEntrypoint(workflowCode); + +export const HEAD = handler; +export const POST = handler;`; // we skip the final bundling step for Next.js so it can bundle itself if (!bundleFinalOutput) { diff --git a/packages/core/e2e/e2e.test.ts b/packages/core/e2e/e2e.test.ts index da7a9607d2..038b4f5091 100644 --- a/packages/core/e2e/e2e.test.ts +++ b/packages/core/e2e/e2e.test.ts @@ -1529,6 +1529,12 @@ describe('e2e', () => { '/.well-known/workflow/v1/flow?__health', deploymentUrl ); + const flowHeadRes = await fetch(flowHealthUrl, { + method: 'HEAD', + headers: await getTrustedSourcesHeaders(), + }); + expect(flowHeadRes.status).toBe(200); + const flowRes = await fetch(flowHealthUrl, { method: 'POST', headers: await getTrustedSourcesHeaders(), @@ -1551,6 +1557,12 @@ describe('e2e', () => { '/.well-known/workflow/v1/step?__health', deploymentUrl ); + const stepHeadRes = await fetch(stepHealthUrl, { + method: 'HEAD', + headers: await getTrustedSourcesHeaders(), + }); + expect(stepHeadRes.status).toBe(200); + const stepRes = await fetch(stepHealthUrl, { method: 'POST', headers: await getTrustedSourcesHeaders(), diff --git a/packages/core/src/runtime/helpers.ts b/packages/core/src/runtime/helpers.ts index 7a725f12ca..1871b815e3 100644 --- a/packages/core/src/runtime/helpers.ts +++ b/packages/core/src/runtime/helpers.ts @@ -392,6 +392,12 @@ export function withHealthCheck( headers: HEALTH_CHECK_CORS_HEADERS, }); } + if (req.method === 'HEAD') { + return new Response(null, { + status: 200, + headers: HEALTH_CHECK_CORS_HEADERS, + }); + } return new Response( JSON.stringify({ healthy: true, diff --git a/packages/next/src/builder-deferred.ts b/packages/next/src/builder-deferred.ts index 86e1e58c86..eae7e8ef1d 100644 --- a/packages/next/src/builder-deferred.ts +++ b/packages/next/src/builder-deferred.ts @@ -1945,7 +1945,7 @@ export async function getNextBuilderDeferred() { serdeImports ? `// Serde files for cross-context class registration\n${serdeImports}` : '', - "export { stepEntrypoint as POST } from 'workflow/runtime';", + "export { stepEntrypoint as HEAD, stepEntrypoint as POST } from 'workflow/runtime';", ] .filter(Boolean) .join('\n'); diff --git a/packages/sveltekit/src/builder.ts b/packages/sveltekit/src/builder.ts index 3a0a988af6..8b9fe7c05c 100644 --- a/packages/sveltekit/src/builder.ts +++ b/packages/sveltekit/src/builder.ts @@ -122,12 +122,15 @@ export class SvelteKitBuilder extends BaseBuilder { // Replace the default export with SvelteKit-compatible handler stepsRouteContent = stepsRouteContent.replace( - /export\s*\{\s*stepEntrypoint\s+as\s+POST\s*\}\s*;?$/m, + /export\s*\{\s*stepEntrypoint\s+as\s+HEAD\s*,\s*stepEntrypoint\s+as\s+POST\s*\}\s*;?$/m, `${NORMALIZE_REQUEST_CODE} -export const POST = async ({request}) => { +const handleStepRequest = async ({request}) => { const normalRequest = await normalizeRequest(request); return stepEntrypoint(normalRequest); -}` +}; + +export const HEAD = handleStepRequest; +export const POST = handleStepRequest;` ); await writeFile(stepsRouteFile, stepsRouteContent); @@ -161,12 +164,15 @@ export const POST = async ({request}) => { // Replace the default export with SvelteKit-compatible handler workflowsRouteContent = workflowsRouteContent.replace( - /export const POST = workflowEntrypoint\(workflowCode\);?$/m, + /const handler = workflowEntrypoint\(workflowCode\);\n\nexport const HEAD = handler;\nexport const POST = handler;?$/m, `${NORMALIZE_REQUEST_CODE} -export const POST = async ({request}) => { +const handleWorkflowRequest = async ({request}) => { const normalRequest = await normalizeRequest(request); return workflowEntrypoint(workflowCode)(normalRequest); -}` +}; + +export const HEAD = handleWorkflowRequest; +export const POST = handleWorkflowRequest;` ); await writeFile(workflowsRouteFile, workflowsRouteContent); diff --git a/packages/utils/src/get-port.test.ts b/packages/utils/src/get-port.test.ts index 36f8c1e6b6..ce8722d33e 100644 --- a/packages/utils/src/get-port.test.ts +++ b/packages/utils/src/get-port.test.ts @@ -250,11 +250,11 @@ describe('getWorkflowPort', () => { // Workflow server (returns 200 for health check endpoint) const workflowServer = http.createServer((req, res) => { - if (req.url?.includes('__health') && req.method === 'POST') { + if (req.url?.includes('__health') && req.method === 'HEAD') { res.writeHead(200, { 'Content-Type': 'text/plain' }); res.end('Workflow SDK endpoint is healthy'); } else if (req.url?.includes('__health')) { - res.writeHead(405, { Allow: 'POST' }); + res.writeHead(405, { Allow: 'HEAD' }); res.end(); } else if (req.url?.startsWith('/.well-known/workflow/v1/')) { res.writeHead(400, { 'Content-Type': 'application/json' }); @@ -305,11 +305,11 @@ describe('getWorkflowPort', () => { }); // Fast workflow server (returns 200 for health check) const fastServer = http.createServer((req, res) => { - if (req.url?.includes('__health') && req.method === 'POST') { + if (req.url?.includes('__health') && req.method === 'HEAD') { res.writeHead(200, { 'Content-Type': 'text/plain' }); res.end('Workflow SDK endpoint is healthy'); } else if (req.url?.includes('__health')) { - res.writeHead(405, { Allow: 'POST' }); + res.writeHead(405, { Allow: 'HEAD' }); res.end(); } else if (req.url?.startsWith('/.well-known/workflow/v1/')) { res.writeHead(400); @@ -337,11 +337,11 @@ describe('getWorkflowPort', () => { it('should handle concurrent getWorkflowPort calls', async () => { const server = http.createServer((req, res) => { - if (req.url?.includes('__health') && req.method === 'POST') { + if (req.url?.includes('__health') && req.method === 'HEAD') { res.writeHead(200, { 'Content-Type': 'text/plain' }); res.end('Workflow SDK endpoint is healthy'); } else if (req.url?.includes('__health')) { - res.writeHead(405, { Allow: 'POST' }); + res.writeHead(405, { Allow: 'HEAD' }); res.end(); } else if (req.url?.startsWith('/.well-known/workflow/v1/')) { res.writeHead(400); diff --git a/packages/utils/src/get-port.ts b/packages/utils/src/get-port.ts index fd4d761c32..02e136c244 100644 --- a/packages/utils/src/get-port.ts +++ b/packages/utils/src/get-port.ts @@ -289,7 +289,7 @@ export interface ProbeOptions { /** * Probes a port to check if it's serving the workflow HTTP server. - * Uses the same POST health check method as the generated workflow endpoint. + * Uses HEAD request to minimize overhead. * * @returns true if the port responds with a 200 status from the health check endpoint */ @@ -304,7 +304,7 @@ async function probePort( try { const response = await fetch(`http://localhost:${port}${endpoint}`, { - method: 'POST', + method: 'HEAD', signal: controller.signal, }); From 9df8f3524265745df807b5e6270a0eff8a585a98 Mon Sep 17 00:00:00 2001 From: Pranay Prakash Date: Thu, 14 May 2026 16:52:32 -0700 Subject: [PATCH 09/18] Stabilize local CI health checks --- .changeset/stable-ci-e2e-cleanup.md | 1 + packages/core/e2e/local-build.test.ts | 28 +++++++++++++++---- packages/nest/src/workflow.controller.ts | 35 ++++++++++++++++-------- 3 files changed, 48 insertions(+), 16 deletions(-) diff --git a/.changeset/stable-ci-e2e-cleanup.md b/.changeset/stable-ci-e2e-cleanup.md index 3c54c80bb7..ce2762ec41 100644 --- a/.changeset/stable-ci-e2e-cleanup.md +++ b/.changeset/stable-ci-e2e-cleanup.md @@ -2,6 +2,7 @@ "@workflow/astro": patch "@workflow/builders": patch "@workflow/core": patch +"@workflow/nest": patch "@workflow/next": patch "@workflow/sveltekit": patch "@workflow/utils": patch diff --git a/packages/core/e2e/local-build.test.ts b/packages/core/e2e/local-build.test.ts index 7793a71c14..cb583ed806 100644 --- a/packages/core/e2e/local-build.test.ts +++ b/packages/core/e2e/local-build.test.ts @@ -64,6 +64,28 @@ async function runCommandWithLiveOutput( }); } +function isRetryableTurbopackLoaderFailure(error: unknown): boolean { + const message = error instanceof Error ? error.message : String(error); + return ( + message.includes('TurbopackInternalError') && + message.includes('failed to receive message') && + message.includes('evaluate_webpack_loader') + ); +} + +async function runBuildWithRetry(cwd: string): Promise { + try { + return await runCommandWithLiveOutput('pnpm', ['build'], cwd); + } catch (error) { + if (!isRetryableTurbopackLoaderFailure(error)) { + throw error; + } + + console.warn('Retrying build after Turbopack loader transport failure.'); + return await runCommandWithLiveOutput('pnpm', ['build'], cwd); + } +} + /** * Read a file if it exists, return null otherwise. */ @@ -105,11 +127,7 @@ describe.each([ return; } - const result = await runCommandWithLiveOutput( - 'pnpm', - ['build'], - getWorkbenchAppPath(project) - ); + const result = await runBuildWithRetry(getWorkbenchAppPath(project)); expect(result.output).not.toContain('Error:'); diff --git a/packages/nest/src/workflow.controller.ts b/packages/nest/src/workflow.controller.ts index 2b99a0b7d1..2bc2365e0d 100644 --- a/packages/nest/src/workflow.controller.ts +++ b/packages/nest/src/workflow.controller.ts @@ -1,6 +1,6 @@ import { readFileSync } from 'node:fs'; import { pathToFileURL } from 'node:url'; -import { All, Controller, Get, Post, Req, Res } from '@nestjs/common'; +import { All, Controller, Get, Head, Post, Req, Res } from '@nestjs/common'; import { join } from 'pathe'; // Module-level state for configuration @@ -88,23 +88,36 @@ function getOutDir(): string { export class WorkflowController { @Post('step') async handleStep(@Req() req: any, @Res() res: any) { - const outDir = getOutDir(); - const { POST } = await import( - pathToFileURL(join(outDir, 'steps.mjs')).href - ); - const webRequest = toWebRequest(req); - const webResponse = await POST(webRequest); - await sendWebResponse(res, webResponse); + await this.handleGeneratedEndpoint(req, res, 'steps.mjs'); + } + + @Head('step') + async handleStepHead(@Req() req: any, @Res() res: any) { + await this.handleGeneratedEndpoint(req, res, 'steps.mjs'); } @Post('flow') async handleFlow(@Req() req: any, @Res() res: any) { + await this.handleGeneratedEndpoint(req, res, 'workflows.mjs'); + } + + @Head('flow') + async handleFlowHead(@Req() req: any, @Res() res: any) { + await this.handleGeneratedEndpoint(req, res, 'workflows.mjs'); + } + + private async handleGeneratedEndpoint( + req: any, + res: any, + bundleFileName: 'steps.mjs' | 'workflows.mjs' + ) { const outDir = getOutDir(); - const { POST } = await import( - pathToFileURL(join(outDir, 'workflows.mjs')).href + const { HEAD, POST } = await import( + pathToFileURL(join(outDir, bundleFileName)).href ); const webRequest = toWebRequest(req); - const webResponse = await POST(webRequest); + const handler = req.method === 'HEAD' && HEAD ? HEAD : POST; + const webResponse = await handler(webRequest); await sendWebResponse(res, webResponse); } From 10d44aeedf343abfdfde1291fe8ad56c7f9e8a72 Mon Sep 17 00:00:00 2001 From: Pranay Prakash Date: Thu, 14 May 2026 17:10:12 -0700 Subject: [PATCH 10/18] Retry flaky Vercel agent e2e --- packages/core/e2e/e2e-agent.test.ts | 22 ++++++++++++++-------- 1 file changed, 14 insertions(+), 8 deletions(-) diff --git a/packages/core/e2e/e2e-agent.test.ts b/packages/core/e2e/e2e-agent.test.ts index 5584d85916..7e55d22492 100644 --- a/packages/core/e2e/e2e-agent.test.ts +++ b/packages/core/e2e/e2e-agent.test.ts @@ -37,6 +37,8 @@ async function start( // @workflow/ai step files are missing from the step bundle, causing // "doStreamStep not found" errors. Skip agent tests on canary until fixed. const isCanary = process.env.NEXT_CANARY === '1'; +const vercelProductionRetry = + process.env.WORKFLOW_VERCEL_ENV === 'production' ? 1 : 0; async function agentE2e(fn: string) { return getWorkflowMetadata( @@ -76,14 +78,18 @@ describe.skipIf(isCanary)('DurableAgent e2e', { timeout: 120_000 }, () => { expect(rv.lastStepText).toBe('The sum is 10'); }); - it('multiple sequential tool calls', async () => { - const run = await start(await agentE2e('agentMultiStepE2e'), []); - const rv = await run.returnValue; - expect(rv).toMatchObject({ - stepCount: 4, - lastStepText: 'All done!', - }); - }); + it( + 'multiple sequential tool calls', + { retry: vercelProductionRetry, timeout: 120_000 }, + async () => { + const run = await start(await agentE2e('agentMultiStepE2e'), []); + const rv = await run.returnValue; + expect(rv).toMatchObject({ + stepCount: 4, + lastStepText: 'All done!', + }); + } + ); it('tool error recovery', async () => { const run = await start(await agentE2e('agentErrorToolE2e'), []); From 71094518a025dc981859c17e5db067952b7de430 Mon Sep 17 00:00:00 2001 From: Pranay Prakash Date: Thu, 14 May 2026 17:41:41 -0700 Subject: [PATCH 11/18] Wrap generated framework route exports --- packages/astro/src/builder.ts | 34 ++++++++++++++++++++++++++----- packages/sveltekit/src/builder.ts | 34 ++++++++++++++++++++++++++----- 2 files changed, 58 insertions(+), 10 deletions(-) diff --git a/packages/astro/src/builder.ts b/packages/astro/src/builder.ts index fa340d8897..16406da9aa 100644 --- a/packages/astro/src/builder.ts +++ b/packages/astro/src/builder.ts @@ -23,6 +23,24 @@ const WORKFLOW_ROUTES = [ }, ]; +function replaceGeneratedRouteExport( + content: string, + pattern: RegExp, + replacement: string, + errorMessage: string +) { + const sourceMapMarker = '\n//# sourceMappingURL='; + const sourceMapIndex = content.lastIndexOf(sourceMapMarker); + const routeCode = + sourceMapIndex === -1 ? content : content.slice(0, sourceMapIndex); + const sourceMap = sourceMapIndex === -1 ? '' : content.slice(sourceMapIndex); + const wrappedRouteCode = routeCode.replace(pattern, replacement); + if (wrappedRouteCode === routeCode) { + throw new Error(errorMessage); + } + return wrappedRouteCode + sourceMap; +} + export class LocalBuilder extends BaseBuilder { constructor() { super({ @@ -119,8 +137,9 @@ export const prerender = false;\n` let stepsRouteContent = await readFile(stepsRouteFile, 'utf-8'); // Normalize request, needed for preserving request through astro - stepsRouteContent = stepsRouteContent.replace( - /export\s*\{\s*stepEntrypoint\s+as\s+HEAD\s*,\s*stepEntrypoint\s+as\s+POST\s*\}\s*;?$/m, + stepsRouteContent = replaceGeneratedRouteExport( + stepsRouteContent, + /export\s*\{\s*stepEntrypoint\w*\s+as\s+HEAD\s*,\s*stepEntrypoint\w*\s+as\s+POST\s*\}\s*;?\s*$/m, `${NORMALIZE_REQUEST_CODE} const handleStepRequest = async ({request}) => { const normalRequest = await normalizeRequest(request); @@ -130,7 +149,8 @@ const handleStepRequest = async ({request}) => { export const HEAD = handleStepRequest; export const POST = handleStepRequest; -export const prerender = false;` +export const prerender = false;`, + 'Failed to wrap generated Astro step route' ); await writeFile(stepsRouteFile, stepsRouteContent); @@ -159,8 +179,8 @@ export const prerender = false;` let workflowsRouteContent = await readFile(workflowsRouteFile, 'utf-8'); // Normalize request, needed for preserving request through astro - workflowsRouteContent = workflowsRouteContent.replace( - /const handler = workflowEntrypoint\(workflowCode\);\n\nexport const HEAD = handler;\nexport const POST = handler;?$/m, + const wrappedWorkflowsRouteContent = workflowsRouteContent.replace( + /const handler = workflowEntrypoint\(workflowCode\);\s*export const HEAD = handler;\s*export const POST = handler;?\s*$/m, `${NORMALIZE_REQUEST_CODE} const handleWorkflowRequest = async ({request}) => { const normalRequest = await normalizeRequest(request); @@ -172,6 +192,10 @@ export const POST = handleWorkflowRequest; export const prerender = false;` ); + if (wrappedWorkflowsRouteContent === workflowsRouteContent) { + throw new Error('Failed to wrap generated Astro workflow route'); + } + workflowsRouteContent = wrappedWorkflowsRouteContent; await writeFile(workflowsRouteFile, workflowsRouteContent); return manifest; diff --git a/packages/sveltekit/src/builder.ts b/packages/sveltekit/src/builder.ts index 8b9fe7c05c..9e55b2f6e7 100644 --- a/packages/sveltekit/src/builder.ts +++ b/packages/sveltekit/src/builder.ts @@ -21,6 +21,24 @@ const SVELTEKIT_VIRTUAL_MODULES = [ '$app/*', // All $app subpaths ]; +function replaceGeneratedRouteExport( + content: string, + pattern: RegExp, + replacement: string, + errorMessage: string +) { + const sourceMapMarker = '\n//# sourceMappingURL='; + const sourceMapIndex = content.lastIndexOf(sourceMapMarker); + const routeCode = + sourceMapIndex === -1 ? content : content.slice(0, sourceMapIndex); + const sourceMap = sourceMapIndex === -1 ? '' : content.slice(sourceMapIndex); + const wrappedRouteCode = routeCode.replace(pattern, replacement); + if (wrappedRouteCode === routeCode) { + throw new Error(errorMessage); + } + return wrappedRouteCode + sourceMap; +} + export class SvelteKitBuilder extends BaseBuilder { constructor(config?: Partial) { const workingDir = config?.workingDir || process.cwd(); @@ -121,8 +139,9 @@ export class SvelteKitBuilder extends BaseBuilder { let stepsRouteContent = await readFile(stepsRouteFile, 'utf-8'); // Replace the default export with SvelteKit-compatible handler - stepsRouteContent = stepsRouteContent.replace( - /export\s*\{\s*stepEntrypoint\s+as\s+HEAD\s*,\s*stepEntrypoint\s+as\s+POST\s*\}\s*;?$/m, + stepsRouteContent = replaceGeneratedRouteExport( + stepsRouteContent, + /export\s*\{\s*stepEntrypoint\w*\s+as\s+HEAD\s*,\s*stepEntrypoint\w*\s+as\s+POST\s*\}\s*;?\s*$/m, `${NORMALIZE_REQUEST_CODE} const handleStepRequest = async ({request}) => { const normalRequest = await normalizeRequest(request); @@ -130,7 +149,8 @@ const handleStepRequest = async ({request}) => { }; export const HEAD = handleStepRequest; -export const POST = handleStepRequest;` +export const POST = handleStepRequest;`, + 'Failed to wrap generated SvelteKit step route' ); await writeFile(stepsRouteFile, stepsRouteContent); @@ -163,8 +183,8 @@ export const POST = handleStepRequest;` let workflowsRouteContent = await readFile(workflowsRouteFile, 'utf-8'); // Replace the default export with SvelteKit-compatible handler - workflowsRouteContent = workflowsRouteContent.replace( - /const handler = workflowEntrypoint\(workflowCode\);\n\nexport const HEAD = handler;\nexport const POST = handler;?$/m, + const wrappedWorkflowsRouteContent = workflowsRouteContent.replace( + /const handler = workflowEntrypoint\(workflowCode\);\s*export const HEAD = handler;\s*export const POST = handler;?\s*$/m, `${NORMALIZE_REQUEST_CODE} const handleWorkflowRequest = async ({request}) => { const normalRequest = await normalizeRequest(request); @@ -174,6 +194,10 @@ const handleWorkflowRequest = async ({request}) => { export const HEAD = handleWorkflowRequest; export const POST = handleWorkflowRequest;` ); + if (wrappedWorkflowsRouteContent === workflowsRouteContent) { + throw new Error('Failed to wrap generated SvelteKit workflow route'); + } + workflowsRouteContent = wrappedWorkflowsRouteContent; await writeFile(workflowsRouteFile, workflowsRouteContent); return manifest; From 722266f34a79dd850b317d7502de9cc5ae51bdff Mon Sep 17 00:00:00 2001 From: Pranay Prakash Date: Thu, 14 May 2026 17:58:23 -0700 Subject: [PATCH 12/18] Relax remote addTen e2e timeout --- packages/core/e2e/e2e.test.ts | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/packages/core/e2e/e2e.test.ts b/packages/core/e2e/e2e.test.ts index 038b4f5091..07518eb44e 100644 --- a/packages/core/e2e/e2e.test.ts +++ b/packages/core/e2e/e2e.test.ts @@ -47,6 +47,8 @@ if (!deploymentUrl) { throw new Error('`DEPLOYMENT_URL` environment variable is not set'); } +const remoteE2ETimeout = process.env.WORKFLOW_VERCEL_ENV ? 360_000 : 60_000; + /** * Tracked wrapper around start() that automatically registers runs * for diagnostics on test failure and observability metadata collection. @@ -163,7 +165,7 @@ describe('e2e', () => { workflowFile: 'workflows/98_duplicate_case.ts', workflowFn: 'addTenWorkflow', }, - ])('addTenWorkflow', { timeout: 60_000 }, async (workflow) => { + ])('addTenWorkflow', { timeout: remoteE2ETimeout }, async (workflow) => { const run = await start( await getWorkflowMetadata( deploymentUrl, From 6409bf4309fd32c25c3d840f8a4d35acc8b034dd Mon Sep 17 00:00:00 2001 From: Pranay Prakash Date: Thu, 14 May 2026 18:22:59 -0700 Subject: [PATCH 13/18] Materialize manual webhook responses --- .changeset/stable-ci-e2e-cleanup.md | 2 +- packages/core/src/runtime/resume-hook.ts | 17 +++++++++++++++-- 2 files changed, 16 insertions(+), 3 deletions(-) diff --git a/.changeset/stable-ci-e2e-cleanup.md b/.changeset/stable-ci-e2e-cleanup.md index ce2762ec41..d54dba2664 100644 --- a/.changeset/stable-ci-e2e-cleanup.md +++ b/.changeset/stable-ci-e2e-cleanup.md @@ -9,4 +9,4 @@ "workflow": patch --- -Fix local workflow port detection and make generated health endpoints respond to HEAD requests. +Fix local workflow port detection, make generated health endpoints respond to HEAD requests, and materialize manual webhook response bodies before returning them. diff --git a/packages/core/src/runtime/resume-hook.ts b/packages/core/src/runtime/resume-hook.ts index f894e55a1e..6b90f2d990 100644 --- a/packages/core/src/runtime/resume-hook.ts +++ b/packages/core/src/runtime/resume-hook.ts @@ -26,6 +26,19 @@ import { waitedUntil } from '../util.js'; import { getWorkflowQueueName } from './helpers.js'; import { getWorld } from './world.js'; +async function materializeResponseBody(response: Response): Promise { + if (!response.body) { + return response; + } + + const body = await response.arrayBuffer(); + return new Response(body, { + status: response.status, + statusText: response.statusText, + headers: response.headers, + }); +} + /** * Internal helper that returns the hook, the associated workflow run, * and the resolved encryption key. @@ -297,9 +310,9 @@ export async function resumeWebhook( const reader = responseReadable.getReader(); const chunk = await reader.read(); if (chunk.value) { - response = chunk.value; + response = await materializeResponseBody(chunk.value); } - reader.cancel(); + await reader.cancel(); } if (!response) { From 83a50ebd46457cb2ba1b7fa6c10bd0c98274da8a Mon Sep 17 00:00:00 2001 From: Pranay Prakash Date: Thu, 14 May 2026 20:31:33 -0700 Subject: [PATCH 14/18] Give remote CLI inspect more time --- .changeset/stable-ci-e2e-cleanup.md | 2 +- packages/core/e2e/utils.ts | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/.changeset/stable-ci-e2e-cleanup.md b/.changeset/stable-ci-e2e-cleanup.md index d54dba2664..4e6f9635dc 100644 --- a/.changeset/stable-ci-e2e-cleanup.md +++ b/.changeset/stable-ci-e2e-cleanup.md @@ -9,4 +9,4 @@ "workflow": patch --- -Fix local workflow port detection, make generated health endpoints respond to HEAD requests, and materialize manual webhook response bodies before returning them. +Fix local workflow port detection, make generated health endpoints respond to HEAD requests, materialize manual webhook response bodies before returning them, and allow remote Vercel e2e CLI inspections enough time to fetch run data. diff --git a/packages/core/e2e/utils.ts b/packages/core/e2e/utils.ts index fbb8ba256f..ac152b1ab4 100644 --- a/packages/core/e2e/utils.ts +++ b/packages/core/e2e/utils.ts @@ -11,7 +11,8 @@ import { getWorld, setWorld } from '../src/runtime'; const __dirname = dirname(fileURLToPath(import.meta.url)); const defaultCliTimeoutMs = Number( - process.env.WORKFLOW_E2E_CLI_TIMEOUT_MS ?? '20000' + process.env.WORKFLOW_E2E_CLI_TIMEOUT_MS ?? + (process.env.WORKFLOW_VERCEL_ENV ? '90000' : '20000') ); function splitArgs(raw: string): string[] { From 71833b992edc17bcf3daa6b52ee60d28a5326bcd Mon Sep 17 00:00:00 2001 From: Pranay Prakash Date: Thu, 14 May 2026 22:42:53 -0700 Subject: [PATCH 15/18] Stabilize remote sleep and hook e2e checks --- .changeset/stable-ci-e2e-cleanup.md | 2 +- packages/core/e2e/e2e.test.ts | 76 ++++++++++++++++++++++++----- 2 files changed, 64 insertions(+), 14 deletions(-) diff --git a/.changeset/stable-ci-e2e-cleanup.md b/.changeset/stable-ci-e2e-cleanup.md index 4e6f9635dc..209f9679eb 100644 --- a/.changeset/stable-ci-e2e-cleanup.md +++ b/.changeset/stable-ci-e2e-cleanup.md @@ -9,4 +9,4 @@ "workflow": patch --- -Fix local workflow port detection, make generated health endpoints respond to HEAD requests, materialize manual webhook response bodies before returning them, and allow remote Vercel e2e CLI inspections enough time to fetch run data. +Fix local workflow port detection, make generated health endpoints respond to HEAD requests, materialize manual webhook response bodies before returning them, and stabilize remote Vercel e2e checks around CLI inspection, sleep timing, and hook disposal. diff --git a/packages/core/e2e/e2e.test.ts b/packages/core/e2e/e2e.test.ts index 07518eb44e..5b5e06855e 100644 --- a/packages/core/e2e/e2e.test.ts +++ b/packages/core/e2e/e2e.test.ts @@ -49,6 +49,44 @@ if (!deploymentUrl) { const remoteE2ETimeout = process.env.WORKFLOW_VERCEL_ENV ? 360_000 : 60_000; +type E2EEvent = { + eventType: string; + createdAt?: string | Date; +}; + +function getEventCreatedAt(events: E2EEvent[], eventType: string): number { + const event = events.find((candidate) => candidate.eventType === eventType); + assert(event?.createdAt, `Could not find ${eventType} event timestamp`); + return new Date(event.createdAt).getTime(); +} + +async function waitForHookState( + token: string, + predicate: ( + hook: Awaited> | undefined + ) => boolean, + timeoutMs = process.env.WORKFLOW_VERCEL_ENV ? 30_000 : 10_000 +) { + const startTime = Date.now(); + + while (Date.now() - startTime < timeoutMs) { + let hook: Awaited> | undefined; + try { + hook = await getHookByToken(token); + } catch { + hook = undefined; + } + + if (predicate(hook)) { + return hook; + } + + await sleep(500); + } + + throw new Error(`Timed out waiting for hook token "${token}" state`); +} + /** * Tracked wrapper around start() that automatically registers runs * for diagnostics on test failure and observability metadata collection. @@ -525,7 +563,13 @@ describe('e2e', () => { const run = await start(await e2e('sleepingWorkflow'), []); const returnValue = await run.returnValue; expect(returnValue.startTime).toBeLessThan(returnValue.endTime); - expect(returnValue.endTime - returnValue.startTime).toBeGreaterThan(9999); + + const { json: events } = await cliInspectJson( + `events --run ${run.runId} --json` + ); + const runStartedAt = getEventCreatedAt(events, 'run_started'); + const waitCompletedAt = getEventCreatedAt(events, 'wait_completed'); + expect(waitCompletedAt - runStartedAt).toBeGreaterThan(9999); }); test('parallelSleepWorkflow', { timeout: 60_000 }, async () => { @@ -1331,11 +1375,11 @@ describe('e2e', () => { ]); // Wait for the hook to be registered by workflow 1 - await new Promise((resolve) => setTimeout(resolve, 5_000)); - - // Verify the hook exists and belongs to workflow 1 - let hook = await getHookByToken(token); - expect(hook.runId).toBe(run1.runId); + let hook = await waitForHookState( + token, + (candidate) => candidate?.runId === run1.runId + ); + assert(hook, 'Expected hook to be registered by workflow 1'); // Send payload to first workflow - this will trigger it to dispose the hook await resumeHook(hook, { @@ -1345,7 +1389,7 @@ describe('e2e', () => { // Wait for workflow 1 to process the payload and dispose the hook // The workflow has a 5s sleep after disposal, so it's still running - await new Promise((resolve) => setTimeout(resolve, 3_000)); + await waitForHookState(token, (candidate) => candidate === undefined); // Now start workflow 2 with the SAME token while workflow 1 is still running // This should succeed because workflow 1 disposed its hook @@ -1355,11 +1399,11 @@ describe('e2e', () => { ]); // Wait for workflow 2's hook to be registered - await new Promise((resolve) => setTimeout(resolve, 5_000)); - - // Verify the hook now belongs to workflow 2 - hook = await getHookByToken(token); - expect(hook.runId).toBe(run2.runId); + hook = await waitForHookState( + token, + (candidate) => candidate?.runId === run2.runId + ); + assert(hook, 'Expected hook to be registered by workflow 2'); // Send payload to workflow 2 await resumeHook(hook, { @@ -2065,7 +2109,13 @@ describe('e2e', () => { ); const returnValue = await run.returnValue; expect(returnValue.startTime).toBeLessThan(returnValue.endTime); - expect(returnValue.endTime - returnValue.startTime).toBeGreaterThan(9999); + + const { json: events } = await cliInspectJson( + `events --run ${run.runId} --json` + ); + const runStartedAt = getEventCreatedAt(events, 'run_started'); + const waitCompletedAt = getEventCreatedAt(events, 'wait_completed'); + expect(waitCompletedAt - runStartedAt).toBeGreaterThan(9999); }); }); From e3a7f8e71d8dab5a4bc99343c86d6f2e0a0ddd81 Mon Sep 17 00:00:00 2001 From: Pranay Prakash Date: Fri, 15 May 2026 00:21:55 -0700 Subject: [PATCH 16/18] Wait for step return streams before completion --- .changeset/stable-ci-e2e-cleanup.md | 2 +- packages/core/src/runtime/step-handler.ts | 10 ++++++++-- 2 files changed, 9 insertions(+), 3 deletions(-) diff --git a/.changeset/stable-ci-e2e-cleanup.md b/.changeset/stable-ci-e2e-cleanup.md index 209f9679eb..037bb4532f 100644 --- a/.changeset/stable-ci-e2e-cleanup.md +++ b/.changeset/stable-ci-e2e-cleanup.md @@ -9,4 +9,4 @@ "workflow": patch --- -Fix local workflow port detection, make generated health endpoints respond to HEAD requests, materialize manual webhook response bodies before returning them, and stabilize remote Vercel e2e checks around CLI inspection, sleep timing, and hook disposal. +Fix local workflow port detection, make generated health endpoints respond to HEAD requests, materialize manual webhook response bodies before returning them, wait for step return stream serialization before completing the step, and stabilize remote Vercel e2e checks around CLI inspection, sleep timing, and hook disposal. diff --git a/packages/core/src/runtime/step-handler.ts b/packages/core/src/runtime/step-handler.ts index 6bf0057a60..006258140b 100644 --- a/packages/core/src/runtime/step-handler.ts +++ b/packages/core/src/runtime/step-handler.ts @@ -12,7 +12,11 @@ import { } from '@workflow/errors'; import { pluralize } from '@workflow/utils'; import { getPort } from '@workflow/utils/get-port'; -import { SPEC_VERSION_CURRENT, StepInvokePayloadSchema } from '@workflow/world'; +import { + SPEC_VERSION_CURRENT, + type Step, + StepInvokePayloadSchema, +} from '@workflow/world'; import { importKey } from '../encryption.js'; import { runtimeLogger, stepLogger } from '../logger.js'; import { getStepFunction } from '../private.js'; @@ -178,7 +182,7 @@ const stepHandler = createQueueHandler( // - Step not in terminal state (returns 409) // - retryAfter timestamp reached (returns 425 with Retry-After header) // - Workflow still active (returns 410 if completed) - let step; + let step: Step; try { const startResult = await world.events.create( workflowRunId, @@ -780,12 +784,14 @@ const stepHandler = createQueueHandler( // The workflow runtime must be resilient to the below code not executing on a failed step result = await trace('step.dehydrate', {}, async (dehydrateSpan) => { const startTime = Date.now(); + const returnValueOpsStart = ops.length; const dehydrated = await dehydrateStepReturnValue( result, workflowRunId, encryptionKey, ops ); + await Promise.all(ops.slice(returnValueOpsStart)); const durationMs = Date.now() - startTime; dehydrateSpan?.setAttributes({ ...Attribute.QueueSerializeTimeMs(durationMs), From 5561fc63e15ee8cf608112efdcc8141b74e03d49 Mon Sep 17 00:00:00 2001 From: Pranay Prakash Date: Fri, 15 May 2026 03:35:34 -0700 Subject: [PATCH 17/18] Stabilize hook and stream e2e waits --- .changeset/stable-ci-e2e-cleanup.md | 3 +- packages/core/e2e/e2e.test.ts | 82 ++++++++++++++-------- packages/world-vercel/src/streamer.test.ts | 26 +++++++ packages/world-vercel/src/streamer.ts | 51 ++++++++++++-- 4 files changed, 127 insertions(+), 35 deletions(-) diff --git a/.changeset/stable-ci-e2e-cleanup.md b/.changeset/stable-ci-e2e-cleanup.md index 037bb4532f..9dc5a92092 100644 --- a/.changeset/stable-ci-e2e-cleanup.md +++ b/.changeset/stable-ci-e2e-cleanup.md @@ -6,7 +6,8 @@ "@workflow/next": patch "@workflow/sveltekit": patch "@workflow/utils": patch +"@workflow/world-vercel": patch "workflow": patch --- -Fix local workflow port detection, make generated health endpoints respond to HEAD requests, materialize manual webhook response bodies before returning them, wait for step return stream serialization before completing the step, and stabilize remote Vercel e2e checks around CLI inspection, sleep timing, and hook disposal. +Fix local workflow port detection, make generated health endpoints respond to HEAD requests, materialize manual webhook response bodies before returning them, wait for step return stream serialization before completing the step, bound Vercel stream mutations so stuck writes retry instead of hanging, and stabilize remote Vercel e2e checks around CLI inspection, sleep timing, and hook registration/disposal. diff --git a/packages/core/e2e/e2e.test.ts b/packages/core/e2e/e2e.test.ts index 5b5e06855e..be51c132be 100644 --- a/packages/core/e2e/e2e.test.ts +++ b/packages/core/e2e/e2e.test.ts @@ -349,12 +349,12 @@ describe('e2e', () => { const run = await start(await e2e('hookWorkflow'), [token, customData]); - // Wait a few seconds so that the hook is registered. - // TODO: make this more efficient when we add subscription support. - await new Promise((resolve) => setTimeout(resolve, 5_000)); - // Look up the hook and resume it with the first payload - let hook = await getHookByToken(token); + let hook = await waitForHookState( + token, + (candidate) => candidate?.runId === run.runId + ); + assert(hook, 'Expected hook to be registered'); expect(hook.runId).toBe(run.runId); await resumeHook(hook, { message: 'one', @@ -365,7 +365,11 @@ describe('e2e', () => { await expect(getHookByToken('invalid')).rejects.toThrow(/not found/i); // Resume with second payload - hook = await getHookByToken(token); + hook = await waitForHookState( + token, + (candidate) => candidate?.runId === run.runId + ); + assert(hook, 'Expected hook to be registered after first payload'); expect(hook.runId).toBe(run.runId); await resumeHook(hook, { message: 'two', @@ -373,7 +377,11 @@ describe('e2e', () => { }); // Resume with third (final) payload - hook = await getHookByToken(token); + hook = await waitForHookState( + token, + (candidate) => candidate?.runId === run.runId + ); + assert(hook, 'Expected hook to be registered after second payload'); expect(hook.runId).toBe(run.runId); await resumeHook(hook, { message: 'three', @@ -404,11 +412,12 @@ describe('e2e', () => { const run = await start(await e2e('hookWorkflow'), [token, customData]); - // Wait for the hook to be registered - await new Promise((resolve) => setTimeout(resolve, 5_000)); - // Verify the hook exists via server-side API - const hook = await getHookByToken(token); + const hook = await waitForHookState( + token, + (candidate) => candidate?.runId === run.runId + ); + assert(hook, 'Expected hook to be registered'); expect(hook.runId).toBe(run.runId); // Attempt to resume via the public webhook endpoint — should get 404 @@ -1253,11 +1262,12 @@ describe('e2e', () => { customData, ]); - // Wait for hook to be registered - await new Promise((resolve) => setTimeout(resolve, 5_000)); - // Send payload to first workflow - let hook = await getHookByToken(token); + let hook = await waitForHookState( + token, + (candidate) => candidate?.runId === run1.runId + ); + assert(hook, 'Expected hook to be registered by workflow 1'); expect(hook.runId).toBe(run1.runId); await resumeHook(hook, { message: 'test-message-1', @@ -1278,11 +1288,12 @@ describe('e2e', () => { customData, ]); - // Wait for hook to be registered - await new Promise((resolve) => setTimeout(resolve, 5_000)); - // Send payload to second workflow using same token - hook = await getHookByToken(token); + hook = await waitForHookState( + token, + (candidate) => candidate?.runId === run2.runId + ); + assert(hook, 'Expected hook to be registered by workflow 2'); expect(hook.runId).toBe(run2.runId); await resumeHook(hook, { message: 'test-message-2', @@ -1319,8 +1330,10 @@ describe('e2e', () => { customData, ]); - // Wait for the hook to be registered by workflow 1 - await new Promise((resolve) => setTimeout(resolve, 5_000)); + await waitForHookState( + token, + (candidate) => candidate?.runId === run1.runId + ); // Start second workflow with the SAME token while first is still running // This should fail because the hook token is already in use @@ -1342,7 +1355,11 @@ describe('e2e', () => { expect(run2Data.status).toBe('failed'); // Now send a payload to complete workflow 1 - const hook = await getHookByToken(token); + const hook = await waitForHookState( + token, + (candidate) => candidate?.runId === run1.runId + ); + assert(hook, 'Expected hook to still belong to workflow 1'); await resumeHook(hook, { message: 'test-concurrent', customData: (hook.metadata as any)?.customData, @@ -2131,23 +2148,32 @@ describe('e2e', () => { const run = await start(await e2e('hookWithSleepWorkflow'), [token]); - // Wait for the hook to be registered - await new Promise((resolve) => setTimeout(resolve, 5_000)); - // Send 3 payloads: two normal ones, then one with done=true - let hook = await getHookByToken(token); + let hook = await waitForHookState( + token, + (candidate) => candidate?.runId === run.runId + ); + assert(hook, 'Expected hook to be registered'); expect(hook.runId).toBe(run.runId); await resumeHook(hook, { type: 'subscribe', id: 1 }); // Wait for the first payload to be processed (step must complete) await new Promise((resolve) => setTimeout(resolve, 3_000)); - hook = await getHookByToken(token); + hook = await waitForHookState( + token, + (candidate) => candidate?.runId === run.runId + ); + assert(hook, 'Expected hook to be registered after first payload'); await resumeHook(hook, { type: 'subscribe', id: 2 }); await new Promise((resolve) => setTimeout(resolve, 3_000)); - hook = await getHookByToken(token); + hook = await waitForHookState( + token, + (candidate) => candidate?.runId === run.runId + ); + assert(hook, 'Expected hook to be registered after second payload'); await resumeHook(hook, { type: 'done', done: true }); const returnValue = await run.returnValue; diff --git a/packages/world-vercel/src/streamer.test.ts b/packages/world-vercel/src/streamer.test.ts index f3046d5832..9cb8f0c95a 100644 --- a/packages/world-vercel/src/streamer.test.ts +++ b/packages/world-vercel/src/streamer.test.ts @@ -226,6 +226,32 @@ describe('writeToStreamMulti pagination', () => { expect(fetchSpy).toHaveBeenCalledTimes(1); }); + it('adds an abort signal to stream writes', async () => { + const fetchSpy = vi + .spyOn(globalThis, 'fetch') + .mockImplementation(async () => new Response('ok')); + + const streamer = await getStreamer(); + + await streamer.writeToStream('s', 'run-1', new Uint8Array([1])); + + expect(fetchSpy).toHaveBeenCalledTimes(1); + expect(fetchSpy.mock.calls[0]?.[1]?.signal).toBeInstanceOf(AbortSignal); + }); + + it('adds an abort signal to stream closes', async () => { + const fetchSpy = vi + .spyOn(globalThis, 'fetch') + .mockImplementation(async () => new Response('ok')); + + const streamer = await getStreamer(); + + await streamer.closeStream('s', 'run-1'); + + expect(fetchSpy).toHaveBeenCalledTimes(1); + expect(fetchSpy.mock.calls[0]?.[1]?.signal).toBeInstanceOf(AbortSignal); + }); + it('paginates into multiple requests when chunks > MAX_CHUNKS_PER_REQUEST', async () => { const fetchSpy = vi .spyOn(globalThis, 'fetch') diff --git a/packages/world-vercel/src/streamer.ts b/packages/world-vercel/src/streamer.ts index 6945a98edc..bc9baa6c90 100644 --- a/packages/world-vercel/src/streamer.ts +++ b/packages/world-vercel/src/streamer.ts @@ -17,6 +17,7 @@ import { * MAX_CHUNKS_PER_BATCH. Larger batches are split into multiple requests. */ export const MAX_CHUNKS_PER_REQUEST = 1000; +const DEFAULT_STREAM_MUTATION_TIMEOUT_MS = 30_000; // Streaming calls use plain fetch() without the undici dispatcher. // The dispatcher's retry logic doesn't apply well to streaming operations @@ -36,6 +37,41 @@ function getStreamUrl( return new URL(`${httpConfig.baseUrl}/v2/stream/${encodeURIComponent(name)}`); } +function getStreamMutationTimeoutMs() { + const parsed = Number.parseInt( + process.env.WORKFLOW_VERCEL_STREAM_TIMEOUT_MS ?? '', + 10 + ); + if (Number.isFinite(parsed) && parsed > 0) { + return parsed; + } + return DEFAULT_STREAM_MUTATION_TIMEOUT_MS; +} + +async function fetchStreamMutation( + url: URL, + init: RequestInit, + operation: 'write' | 'close' +) { + const timeoutMs = getStreamMutationTimeoutMs(); + try { + return await fetch(url, { + ...init, + signal: AbortSignal.timeout(timeoutMs), + }); + } catch (err) { + if ( + err instanceof Error && + (err.name === 'AbortError' || err.name === 'TimeoutError') + ) { + throw new Error(`Stream ${operation} timed out after ${timeoutMs}ms`, { + cause: err, + }); + } + throw err; + } +} + /** * Encode multiple chunks into a length-prefixed binary format. * Format: [4 bytes big-endian length][chunk bytes][4 bytes length][chunk bytes]... @@ -107,13 +143,14 @@ export function createStreamer(config?: APIConfig): Streamer { const resolvedRunId = await runId; const httpConfig = await getHttpConfig(config); - const response = await fetch( + const response = await fetchStreamMutation( getStreamUrl(name, resolvedRunId, httpConfig), { method: 'PUT', body: chunk, headers: httpConfig.headers, - } + }, + 'write' ); const text = await response.text(); if (!response.ok) { @@ -149,13 +186,14 @@ export function createStreamer(config?: APIConfig): Streamer { for (let i = 0; i < chunks.length; i += MAX_CHUNKS_PER_REQUEST) { const batch = chunks.slice(i, i + MAX_CHUNKS_PER_REQUEST); const body = encodeMultiChunks(batch); - const response = await fetch( + const response = await fetchStreamMutation( getStreamUrl(name, resolvedRunId, httpConfig), { method: 'PUT', body, headers: httpConfig.headers, - } + }, + 'write' ); const text = await response.text(); if (!response.ok) { @@ -172,12 +210,13 @@ export function createStreamer(config?: APIConfig): Streamer { const httpConfig = await getHttpConfig(config); httpConfig.headers.set('X-Stream-Done', 'true'); - const response = await fetch( + const response = await fetchStreamMutation( getStreamUrl(name, resolvedRunId, httpConfig), { method: 'PUT', headers: httpConfig.headers, - } + }, + 'close' ); const text = await response.text(); if (!response.ok) { From 5daac2c877b1e5392cf7fb84edcbfb3efbe72a24 Mon Sep 17 00:00:00 2001 From: Pranay Prakash Date: Fri, 15 May 2026 03:50:53 -0700 Subject: [PATCH 18/18] Bound queue health check timeouts --- .changeset/stable-ci-e2e-cleanup.md | 2 +- packages/core/src/runtime/helpers.test.ts | 59 +++++++++- packages/core/src/runtime/helpers.ts | 137 ++++++++++++++++------ 3 files changed, 159 insertions(+), 39 deletions(-) diff --git a/.changeset/stable-ci-e2e-cleanup.md b/.changeset/stable-ci-e2e-cleanup.md index 9dc5a92092..506b01f3c6 100644 --- a/.changeset/stable-ci-e2e-cleanup.md +++ b/.changeset/stable-ci-e2e-cleanup.md @@ -10,4 +10,4 @@ "workflow": patch --- -Fix local workflow port detection, make generated health endpoints respond to HEAD requests, materialize manual webhook response bodies before returning them, wait for step return stream serialization before completing the step, bound Vercel stream mutations so stuck writes retry instead of hanging, and stabilize remote Vercel e2e checks around CLI inspection, sleep timing, and hook registration/disposal. +Fix local workflow port detection, make generated health endpoints respond to HEAD requests, materialize manual webhook response bodies before returning them, wait for step return stream serialization before completing the step, bound Vercel stream and health-check operations so stuck writes or queue sends retry or time out instead of hanging, and stabilize remote Vercel e2e checks around CLI inspection, sleep timing, and hook registration/disposal. diff --git a/packages/core/src/runtime/helpers.test.ts b/packages/core/src/runtime/helpers.test.ts index 3844a84c2f..176340f9dc 100644 --- a/packages/core/src/runtime/helpers.test.ts +++ b/packages/core/src/runtime/helpers.test.ts @@ -1,5 +1,6 @@ +import type { World } from '@workflow/world'; import { describe, expect, it, vi } from 'vitest'; -import { getWorkflowQueueName } from './helpers.js'; +import { getWorkflowQueueName, healthCheck } from './helpers.js'; // Mock the logger to suppress output during tests vi.mock('../logger.js', () => ({ @@ -80,3 +81,59 @@ describe('getWorkflowQueueName', () => { expect(() => getWorkflowQueueName('')).toThrow('Invalid workflow name'); }); }); + +describe('healthCheck', () => { + it('returns unhealthy when queue delivery does not settle before the timeout', async () => { + const world = { + queue: vi.fn(() => new Promise(() => {})), + readFromStream: vi.fn(), + } as unknown as World; + + const result = await healthCheck(world, 'workflow', { timeout: 10 }); + + expect(result).toEqual({ + healthy: false, + error: 'Health check timed out after 10ms', + }); + expect(world.queue).toHaveBeenCalledWith( + '__wkf_workflow_health_check', + { + __healthCheck: true, + correlationId: expect.any(String), + }, + { + specVersion: 1, + deploymentId: undefined, + } + ); + expect(world.readFromStream).not.toHaveBeenCalled(); + }); + + it('returns unhealthy when opening the response stream does not settle before the timeout', async () => { + const world = { + queue: vi.fn().mockResolvedValue({ messageId: null }), + readFromStream: vi.fn(() => new Promise(() => {})), + } as unknown as World; + + const result = await healthCheck(world, 'step', { timeout: 10 }); + + expect(result).toEqual({ + healthy: false, + error: 'Health check timed out after 10ms', + }); + expect(world.queue).toHaveBeenCalledWith( + '__wkf_step_health_check', + { + __healthCheck: true, + correlationId: expect.any(String), + }, + { + specVersion: 1, + deploymentId: undefined, + } + ); + expect(world.readFromStream).toHaveBeenCalledWith( + expect.stringMatching(/^__health_check__/) + ); + }); +}); diff --git a/packages/core/src/runtime/helpers.ts b/packages/core/src/runtime/helpers.ts index 1871b815e3..babcf02b89 100644 --- a/packages/core/src/runtime/helpers.ts +++ b/packages/core/src/runtime/helpers.ts @@ -143,6 +143,56 @@ const HEALTH_CHECK_POLL_INTERVAL = 100; // (which doesn't work across processes) const HEALTH_CHECK_READ_TIMEOUT = 500; +class HealthCheckTimeoutError extends Error { + constructor(timeout: number) { + super(`Health check timed out after ${timeout}ms`); + } +} + +function getHealthCheckTimeRemaining(startTime: number, timeout: number) { + return timeout - (Date.now() - startTime); +} + +async function withHealthCheckTimeout( + promise: Promise, + startTime: number, + timeout: number +): Promise { + const remaining = getHealthCheckTimeRemaining(startTime, timeout); + if (remaining <= 0) { + throw new HealthCheckTimeoutError(timeout); + } + + let timeoutId: ReturnType | undefined; + try { + return await Promise.race([ + promise, + new Promise((_, reject) => { + timeoutId = setTimeout( + () => reject(new HealthCheckTimeoutError(timeout)), + remaining + ); + }), + ]); + } finally { + if (timeoutId) { + clearTimeout(timeoutId); + } + } +} + +function wait(ms: number) { + return new Promise((resolve) => setTimeout(resolve, ms)); +} + +async function waitForNextHealthCheckPoll(startTime: number, timeout: number) { + const remaining = getHealthCheckTimeRemaining(startTime, timeout); + if (remaining <= 0) { + return; + } + await wait(Math.min(HEALTH_CHECK_POLL_INTERVAL, remaining)); +} + /** * Read chunks from a stream with a timeout per read operation. * Returns { chunks, timedOut } where timedOut indicates if a read timed out. @@ -223,6 +273,32 @@ function parseHealthCheckResponse( return parsed; } +async function readHealthCheckResponse( + world: World, + streamName: string, + startTime: number, + timeout: number +): Promise<{ healthy: boolean } | null> { + const stream = await withHealthCheckTimeout( + world.readFromStream(streamName), + startTime, + timeout + ); + const reader = stream.getReader(); + const readTimeout = Math.min( + HEALTH_CHECK_READ_TIMEOUT, + Math.max(1, getHealthCheckTimeRemaining(startTime, timeout)) + ); + const { chunks, timedOut } = await readStreamWithTimeout(reader, readTimeout); + + if (timedOut) { + await reader.cancel().catch(() => {}); + return null; + } + + return parseHealthCheckResponse(chunks); +} + export async function healthCheck( world: World, endpoint: HealthCheckEndpoint, @@ -240,54 +316,41 @@ export async function healthCheck( const startTime = Date.now(); try { - await world.queue( - queueName, - { __healthCheck: true, correlationId }, - { - // Use JSON transport so the health check works against both - // old (JSON-only) and new (dual) deployments. - specVersion: SPEC_VERSION_LEGACY, - deploymentId: options?.deploymentId, - } + await withHealthCheckTimeout( + world.queue( + queueName, + { __healthCheck: true, correlationId }, + { + // Use JSON transport so the health check works against both + // old (JSON-only) and new (dual) deployments. + specVersion: SPEC_VERSION_LEGACY, + deploymentId: options?.deploymentId, + } + ), + startTime, + timeout ); - while (Date.now() - startTime < timeout) { + while (getHealthCheckTimeRemaining(startTime, timeout) > 0) { try { - const stream = await world.readFromStream(streamName); - const reader = stream.getReader(); - const { chunks, timedOut } = await readStreamWithTimeout( - reader, - HEALTH_CHECK_READ_TIMEOUT + const response = await readHealthCheckResponse( + world, + streamName, + startTime, + timeout ); - - if (timedOut) { - try { - reader.cancel(); - } catch { - // Ignore cancel errors - } - await new Promise((resolve) => - setTimeout(resolve, HEALTH_CHECK_POLL_INTERVAL) - ); - continue; - } - - const response = parseHealthCheckResponse(chunks); if (response) { return { ...response, latencyMs: Date.now() - startTime, }; } - - await new Promise((resolve) => - setTimeout(resolve, HEALTH_CHECK_POLL_INTERVAL) - ); - } catch { - await new Promise((resolve) => - setTimeout(resolve, HEALTH_CHECK_POLL_INTERVAL) - ); + } catch (error) { + if (error instanceof HealthCheckTimeoutError) { + throw error; + } } + await waitForNextHealthCheckPoll(startTime, timeout); } return { healthy: false,