diff --git a/.changeset/stable-ci-e2e-cleanup.md b/.changeset/stable-ci-e2e-cleanup.md new file mode 100644 index 0000000000..506b01f3c6 --- /dev/null +++ b/.changeset/stable-ci-e2e-cleanup.md @@ -0,0 +1,13 @@ +--- +"@workflow/astro": patch +"@workflow/builders": patch +"@workflow/core": patch +"@workflow/nest": patch +"@workflow/next": patch +"@workflow/sveltekit": patch +"@workflow/utils": patch +"@workflow/world-vercel": patch +"workflow": patch +--- + +Fix local workflow port detection, make generated health endpoints respond to HEAD requests, materialize manual webhook response bodies before returning them, wait for step return stream serialization before completing the step, bound Vercel stream and health-check operations so stuck writes or queue sends retry or time out instead of hanging, and stabilize remote Vercel e2e checks around CLI inspection, sleep timing, and hook registration/disposal. diff --git a/packages/astro/src/builder.ts b/packages/astro/src/builder.ts index d13cae3f03..16406da9aa 100644 --- a/packages/astro/src/builder.ts +++ b/packages/astro/src/builder.ts @@ -23,6 +23,24 @@ const WORKFLOW_ROUTES = [ }, ]; +function replaceGeneratedRouteExport( + content: string, + pattern: RegExp, + replacement: string, + errorMessage: string +) { + const sourceMapMarker = '\n//# sourceMappingURL='; + const sourceMapIndex = content.lastIndexOf(sourceMapMarker); + const routeCode = + sourceMapIndex === -1 ? content : content.slice(0, sourceMapIndex); + const sourceMap = sourceMapIndex === -1 ? '' : content.slice(sourceMapIndex); + const wrappedRouteCode = routeCode.replace(pattern, replacement); + if (wrappedRouteCode === routeCode) { + throw new Error(errorMessage); + } + return wrappedRouteCode + sourceMap; +} + export class LocalBuilder extends BaseBuilder { constructor() { super({ @@ -119,15 +137,20 @@ export const prerender = false;\n` let stepsRouteContent = await readFile(stepsRouteFile, 'utf-8'); // Normalize request, needed for preserving request through astro - stepsRouteContent = stepsRouteContent.replace( - /export\s*\{\s*stepEntrypoint\s+as\s+POST\s*\}\s*;?$/m, + stepsRouteContent = replaceGeneratedRouteExport( + stepsRouteContent, + /export\s*\{\s*stepEntrypoint\w*\s+as\s+HEAD\s*,\s*stepEntrypoint\w*\s+as\s+POST\s*\}\s*;?\s*$/m, `${NORMALIZE_REQUEST_CODE} -export const POST = async ({request}) => { +const handleStepRequest = async ({request}) => { const normalRequest = await normalizeRequest(request); return stepEntrypoint(normalRequest); -} +}; -export const prerender = false;` +export const HEAD = handleStepRequest; +export const POST = handleStepRequest; + +export const prerender = false;`, + 'Failed to wrap generated Astro step route' ); await writeFile(stepsRouteFile, stepsRouteContent); @@ -156,16 +179,23 @@ export const prerender = false;` let workflowsRouteContent = await readFile(workflowsRouteFile, 'utf-8'); // Normalize request, needed for preserving request through astro - workflowsRouteContent = workflowsRouteContent.replace( - /export const POST = workflowEntrypoint\(workflowCode\);?$/m, + const wrappedWorkflowsRouteContent = workflowsRouteContent.replace( + /const handler = workflowEntrypoint\(workflowCode\);\s*export const HEAD = handler;\s*export const POST = handler;?\s*$/m, `${NORMALIZE_REQUEST_CODE} -export const POST = async ({request}) => { +const handleWorkflowRequest = async ({request}) => { const normalRequest = await normalizeRequest(request); return workflowEntrypoint(workflowCode)(normalRequest); -} +}; + +export const HEAD = handleWorkflowRequest; +export const POST = handleWorkflowRequest; export const prerender = false;` ); + if (wrappedWorkflowsRouteContent === workflowsRouteContent) { + throw new Error('Failed to wrap generated Astro workflow route'); + } + workflowsRouteContent = wrappedWorkflowsRouteContent; await writeFile(workflowsRouteFile, workflowsRouteContent); return manifest; diff --git a/packages/builders/src/base-builder.ts b/packages/builders/src/base-builder.ts index 27c605a7c3..aaa28aed58 100644 --- a/packages/builders/src/base-builder.ts +++ b/packages/builders/src/base-builder.ts @@ -603,7 +603,7 @@ export abstract class BaseBuilder { // Serde files for cross-context class registration ${serdeImports} // API entrypoint - export { stepEntrypoint as POST } from 'workflow/runtime';`; + export { stepEntrypoint as HEAD, stepEntrypoint as POST } from 'workflow/runtime';`; // Bundle with esbuild and our custom SWC plugin const entriesToBundle = externalizeNonSteps @@ -1006,7 +1006,10 @@ import { workflowEntrypoint } from 'workflow/runtime'; const workflowCode = \`${workflowBundleCode.replace(/[\\`$]/g, '\\$&')}\`; -export const POST = workflowEntrypoint(workflowCode);`; +const handler = workflowEntrypoint(workflowCode); + +export const HEAD = handler; +export const POST = handler;`; // we skip the final bundling step for Next.js so it can bundle itself if (!bundleFinalOutput) { diff --git a/packages/core/e2e/dev.test.ts b/packages/core/e2e/dev.test.ts index a6f40b7649..074f867c21 100644 --- a/packages/core/e2e/dev.test.ts +++ b/packages/core/e2e/dev.test.ts @@ -94,6 +94,12 @@ export function createDevTests(config?: DevTestConfig) { await Promise.all([ fetchWithTimeout('/').catch(() => {}), fetchWithTimeout('/api/chat').catch(() => {}), + fetchWithTimeout('/.well-known/workflow/v1/flow?__health').catch( + () => {} + ), + fetchWithTimeout('/.well-known/workflow/v1/step?__health').catch( + () => {} + ), ]); }; @@ -137,21 +143,23 @@ export function createDevTests(config?: DevTestConfig) { }); afterEach(async () => { - // Restore file contents before deleting any files. If a deletion races - // ahead of an api-file restore, the dev server briefly sees an import - // pointing at a missing module and fails compilation. On Windows that - // failure can stick — Turbopack leaves stale imports in the generated - // step route bundle — and every subsequent step request returns 500. + // Restore file contents before clearing any added files. Dev servers can + // keep generated imports alive briefly after a rebuild. Next's generated + // step route imports deferred copies, so added workflow files need to keep + // their real contents until shutdown. Other builders can use empty + // placeholders to drop workflow directives while avoiding missing imports. const toRestore = restoreFiles.filter((item) => item.content !== ''); - const toDelete = restoreFiles.filter((item) => item.content === ''); + const toClear = restoreFiles.filter((item) => item.content === ''); await Promise.all( toRestore.map((item) => fs.writeFile(item.path, item.content)) ); - if (toDelete.length > 0) { + if (toClear.length > 0) { await prewarm(); + if (!supportsDeferredStepCopies) { + await Promise.all(toClear.map((item) => fs.writeFile(item.path, ''))); + await prewarm(); + } } - await Promise.all(toDelete.map((item) => fs.unlink(item.path))); - await prewarm(); restoreFiles.length = 0; }); diff --git a/packages/core/e2e/e2e-agent.test.ts b/packages/core/e2e/e2e-agent.test.ts index 5584d85916..7e55d22492 100644 --- a/packages/core/e2e/e2e-agent.test.ts +++ b/packages/core/e2e/e2e-agent.test.ts @@ -37,6 +37,8 @@ async function start( // @workflow/ai step files are missing from the step bundle, causing // "doStreamStep not found" errors. Skip agent tests on canary until fixed. const isCanary = process.env.NEXT_CANARY === '1'; +const vercelProductionRetry = + process.env.WORKFLOW_VERCEL_ENV === 'production' ? 1 : 0; async function agentE2e(fn: string) { return getWorkflowMetadata( @@ -76,14 +78,18 @@ describe.skipIf(isCanary)('DurableAgent e2e', { timeout: 120_000 }, () => { expect(rv.lastStepText).toBe('The sum is 10'); }); - it('multiple sequential tool calls', async () => { - const run = await start(await agentE2e('agentMultiStepE2e'), []); - const rv = await run.returnValue; - expect(rv).toMatchObject({ - stepCount: 4, - lastStepText: 'All done!', - }); - }); + it( + 'multiple sequential tool calls', + { retry: vercelProductionRetry, timeout: 120_000 }, + async () => { + const run = await start(await agentE2e('agentMultiStepE2e'), []); + const rv = await run.returnValue; + expect(rv).toMatchObject({ + stepCount: 4, + lastStepText: 'All done!', + }); + } + ); it('tool error recovery', async () => { const run = await start(await agentE2e('agentErrorToolE2e'), []); diff --git a/packages/core/e2e/e2e.test.ts b/packages/core/e2e/e2e.test.ts index da7a9607d2..be51c132be 100644 --- a/packages/core/e2e/e2e.test.ts +++ b/packages/core/e2e/e2e.test.ts @@ -47,6 +47,46 @@ if (!deploymentUrl) { throw new Error('`DEPLOYMENT_URL` environment variable is not set'); } +const remoteE2ETimeout = process.env.WORKFLOW_VERCEL_ENV ? 360_000 : 60_000; + +type E2EEvent = { + eventType: string; + createdAt?: string | Date; +}; + +function getEventCreatedAt(events: E2EEvent[], eventType: string): number { + const event = events.find((candidate) => candidate.eventType === eventType); + assert(event?.createdAt, `Could not find ${eventType} event timestamp`); + return new Date(event.createdAt).getTime(); +} + +async function waitForHookState( + token: string, + predicate: ( + hook: Awaited> | undefined + ) => boolean, + timeoutMs = process.env.WORKFLOW_VERCEL_ENV ? 30_000 : 10_000 +) { + const startTime = Date.now(); + + while (Date.now() - startTime < timeoutMs) { + let hook: Awaited> | undefined; + try { + hook = await getHookByToken(token); + } catch { + hook = undefined; + } + + if (predicate(hook)) { + return hook; + } + + await sleep(500); + } + + throw new Error(`Timed out waiting for hook token "${token}" state`); +} + /** * Tracked wrapper around start() that automatically registers runs * for diagnostics on test failure and observability metadata collection. @@ -163,7 +203,7 @@ describe('e2e', () => { workflowFile: 'workflows/98_duplicate_case.ts', workflowFn: 'addTenWorkflow', }, - ])('addTenWorkflow', { timeout: 60_000 }, async (workflow) => { + ])('addTenWorkflow', { timeout: remoteE2ETimeout }, async (workflow) => { const run = await start( await getWorkflowMetadata( deploymentUrl, @@ -309,12 +349,12 @@ describe('e2e', () => { const run = await start(await e2e('hookWorkflow'), [token, customData]); - // Wait a few seconds so that the hook is registered. - // TODO: make this more efficient when we add subscription support. - await new Promise((resolve) => setTimeout(resolve, 5_000)); - // Look up the hook and resume it with the first payload - let hook = await getHookByToken(token); + let hook = await waitForHookState( + token, + (candidate) => candidate?.runId === run.runId + ); + assert(hook, 'Expected hook to be registered'); expect(hook.runId).toBe(run.runId); await resumeHook(hook, { message: 'one', @@ -325,7 +365,11 @@ describe('e2e', () => { await expect(getHookByToken('invalid')).rejects.toThrow(/not found/i); // Resume with second payload - hook = await getHookByToken(token); + hook = await waitForHookState( + token, + (candidate) => candidate?.runId === run.runId + ); + assert(hook, 'Expected hook to be registered after first payload'); expect(hook.runId).toBe(run.runId); await resumeHook(hook, { message: 'two', @@ -333,7 +377,11 @@ describe('e2e', () => { }); // Resume with third (final) payload - hook = await getHookByToken(token); + hook = await waitForHookState( + token, + (candidate) => candidate?.runId === run.runId + ); + assert(hook, 'Expected hook to be registered after second payload'); expect(hook.runId).toBe(run.runId); await resumeHook(hook, { message: 'three', @@ -364,11 +412,12 @@ describe('e2e', () => { const run = await start(await e2e('hookWorkflow'), [token, customData]); - // Wait for the hook to be registered - await new Promise((resolve) => setTimeout(resolve, 5_000)); - // Verify the hook exists via server-side API - const hook = await getHookByToken(token); + const hook = await waitForHookState( + token, + (candidate) => candidate?.runId === run.runId + ); + assert(hook, 'Expected hook to be registered'); expect(hook.runId).toBe(run.runId); // Attempt to resume via the public webhook endpoint — should get 404 @@ -523,7 +572,13 @@ describe('e2e', () => { const run = await start(await e2e('sleepingWorkflow'), []); const returnValue = await run.returnValue; expect(returnValue.startTime).toBeLessThan(returnValue.endTime); - expect(returnValue.endTime - returnValue.startTime).toBeGreaterThan(9999); + + const { json: events } = await cliInspectJson( + `events --run ${run.runId} --json` + ); + const runStartedAt = getEventCreatedAt(events, 'run_started'); + const waitCompletedAt = getEventCreatedAt(events, 'wait_completed'); + expect(waitCompletedAt - runStartedAt).toBeGreaterThan(9999); }); test('parallelSleepWorkflow', { timeout: 60_000 }, async () => { @@ -1207,11 +1262,12 @@ describe('e2e', () => { customData, ]); - // Wait for hook to be registered - await new Promise((resolve) => setTimeout(resolve, 5_000)); - // Send payload to first workflow - let hook = await getHookByToken(token); + let hook = await waitForHookState( + token, + (candidate) => candidate?.runId === run1.runId + ); + assert(hook, 'Expected hook to be registered by workflow 1'); expect(hook.runId).toBe(run1.runId); await resumeHook(hook, { message: 'test-message-1', @@ -1232,11 +1288,12 @@ describe('e2e', () => { customData, ]); - // Wait for hook to be registered - await new Promise((resolve) => setTimeout(resolve, 5_000)); - // Send payload to second workflow using same token - hook = await getHookByToken(token); + hook = await waitForHookState( + token, + (candidate) => candidate?.runId === run2.runId + ); + assert(hook, 'Expected hook to be registered by workflow 2'); expect(hook.runId).toBe(run2.runId); await resumeHook(hook, { message: 'test-message-2', @@ -1273,8 +1330,10 @@ describe('e2e', () => { customData, ]); - // Wait for the hook to be registered by workflow 1 - await new Promise((resolve) => setTimeout(resolve, 5_000)); + await waitForHookState( + token, + (candidate) => candidate?.runId === run1.runId + ); // Start second workflow with the SAME token while first is still running // This should fail because the hook token is already in use @@ -1296,7 +1355,11 @@ describe('e2e', () => { expect(run2Data.status).toBe('failed'); // Now send a payload to complete workflow 1 - const hook = await getHookByToken(token); + const hook = await waitForHookState( + token, + (candidate) => candidate?.runId === run1.runId + ); + assert(hook, 'Expected hook to still belong to workflow 1'); await resumeHook(hook, { message: 'test-concurrent', customData: (hook.metadata as any)?.customData, @@ -1329,11 +1392,11 @@ describe('e2e', () => { ]); // Wait for the hook to be registered by workflow 1 - await new Promise((resolve) => setTimeout(resolve, 5_000)); - - // Verify the hook exists and belongs to workflow 1 - let hook = await getHookByToken(token); - expect(hook.runId).toBe(run1.runId); + let hook = await waitForHookState( + token, + (candidate) => candidate?.runId === run1.runId + ); + assert(hook, 'Expected hook to be registered by workflow 1'); // Send payload to first workflow - this will trigger it to dispose the hook await resumeHook(hook, { @@ -1343,7 +1406,7 @@ describe('e2e', () => { // Wait for workflow 1 to process the payload and dispose the hook // The workflow has a 5s sleep after disposal, so it's still running - await new Promise((resolve) => setTimeout(resolve, 3_000)); + await waitForHookState(token, (candidate) => candidate === undefined); // Now start workflow 2 with the SAME token while workflow 1 is still running // This should succeed because workflow 1 disposed its hook @@ -1353,11 +1416,11 @@ describe('e2e', () => { ]); // Wait for workflow 2's hook to be registered - await new Promise((resolve) => setTimeout(resolve, 5_000)); - - // Verify the hook now belongs to workflow 2 - hook = await getHookByToken(token); - expect(hook.runId).toBe(run2.runId); + hook = await waitForHookState( + token, + (candidate) => candidate?.runId === run2.runId + ); + assert(hook, 'Expected hook to be registered by workflow 2'); // Send payload to workflow 2 await resumeHook(hook, { @@ -1529,6 +1592,12 @@ describe('e2e', () => { '/.well-known/workflow/v1/flow?__health', deploymentUrl ); + const flowHeadRes = await fetch(flowHealthUrl, { + method: 'HEAD', + headers: await getTrustedSourcesHeaders(), + }); + expect(flowHeadRes.status).toBe(200); + const flowRes = await fetch(flowHealthUrl, { method: 'POST', headers: await getTrustedSourcesHeaders(), @@ -1551,6 +1620,12 @@ describe('e2e', () => { '/.well-known/workflow/v1/step?__health', deploymentUrl ); + const stepHeadRes = await fetch(stepHealthUrl, { + method: 'HEAD', + headers: await getTrustedSourcesHeaders(), + }); + expect(stepHeadRes.status).toBe(200); + const stepRes = await fetch(stepHealthUrl, { method: 'POST', headers: await getTrustedSourcesHeaders(), @@ -2051,7 +2126,13 @@ describe('e2e', () => { ); const returnValue = await run.returnValue; expect(returnValue.startTime).toBeLessThan(returnValue.endTime); - expect(returnValue.endTime - returnValue.startTime).toBeGreaterThan(9999); + + const { json: events } = await cliInspectJson( + `events --run ${run.runId} --json` + ); + const runStartedAt = getEventCreatedAt(events, 'run_started'); + const waitCompletedAt = getEventCreatedAt(events, 'wait_completed'); + expect(waitCompletedAt - runStartedAt).toBeGreaterThan(9999); }); }); @@ -2067,23 +2148,32 @@ describe('e2e', () => { const run = await start(await e2e('hookWithSleepWorkflow'), [token]); - // Wait for the hook to be registered - await new Promise((resolve) => setTimeout(resolve, 5_000)); - // Send 3 payloads: two normal ones, then one with done=true - let hook = await getHookByToken(token); + let hook = await waitForHookState( + token, + (candidate) => candidate?.runId === run.runId + ); + assert(hook, 'Expected hook to be registered'); expect(hook.runId).toBe(run.runId); await resumeHook(hook, { type: 'subscribe', id: 1 }); // Wait for the first payload to be processed (step must complete) await new Promise((resolve) => setTimeout(resolve, 3_000)); - hook = await getHookByToken(token); + hook = await waitForHookState( + token, + (candidate) => candidate?.runId === run.runId + ); + assert(hook, 'Expected hook to be registered after first payload'); await resumeHook(hook, { type: 'subscribe', id: 2 }); await new Promise((resolve) => setTimeout(resolve, 3_000)); - hook = await getHookByToken(token); + hook = await waitForHookState( + token, + (candidate) => candidate?.runId === run.runId + ); + assert(hook, 'Expected hook to be registered after second payload'); await resumeHook(hook, { type: 'done', done: true }); const returnValue = await run.returnValue; diff --git a/packages/core/e2e/local-build.test.ts b/packages/core/e2e/local-build.test.ts index 7793a71c14..cb583ed806 100644 --- a/packages/core/e2e/local-build.test.ts +++ b/packages/core/e2e/local-build.test.ts @@ -64,6 +64,28 @@ async function runCommandWithLiveOutput( }); } +function isRetryableTurbopackLoaderFailure(error: unknown): boolean { + const message = error instanceof Error ? error.message : String(error); + return ( + message.includes('TurbopackInternalError') && + message.includes('failed to receive message') && + message.includes('evaluate_webpack_loader') + ); +} + +async function runBuildWithRetry(cwd: string): Promise { + try { + return await runCommandWithLiveOutput('pnpm', ['build'], cwd); + } catch (error) { + if (!isRetryableTurbopackLoaderFailure(error)) { + throw error; + } + + console.warn('Retrying build after Turbopack loader transport failure.'); + return await runCommandWithLiveOutput('pnpm', ['build'], cwd); + } +} + /** * Read a file if it exists, return null otherwise. */ @@ -105,11 +127,7 @@ describe.each([ return; } - const result = await runCommandWithLiveOutput( - 'pnpm', - ['build'], - getWorkbenchAppPath(project) - ); + const result = await runBuildWithRetry(getWorkbenchAppPath(project)); expect(result.output).not.toContain('Error:'); diff --git a/packages/core/e2e/utils.ts b/packages/core/e2e/utils.ts index fbb8ba256f..ac152b1ab4 100644 --- a/packages/core/e2e/utils.ts +++ b/packages/core/e2e/utils.ts @@ -11,7 +11,8 @@ import { getWorld, setWorld } from '../src/runtime'; const __dirname = dirname(fileURLToPath(import.meta.url)); const defaultCliTimeoutMs = Number( - process.env.WORKFLOW_E2E_CLI_TIMEOUT_MS ?? '20000' + process.env.WORKFLOW_E2E_CLI_TIMEOUT_MS ?? + (process.env.WORKFLOW_VERCEL_ENV ? '90000' : '20000') ); function splitArgs(raw: string): string[] { diff --git a/packages/core/src/runtime/helpers.test.ts b/packages/core/src/runtime/helpers.test.ts index 3844a84c2f..176340f9dc 100644 --- a/packages/core/src/runtime/helpers.test.ts +++ b/packages/core/src/runtime/helpers.test.ts @@ -1,5 +1,6 @@ +import type { World } from '@workflow/world'; import { describe, expect, it, vi } from 'vitest'; -import { getWorkflowQueueName } from './helpers.js'; +import { getWorkflowQueueName, healthCheck } from './helpers.js'; // Mock the logger to suppress output during tests vi.mock('../logger.js', () => ({ @@ -80,3 +81,59 @@ describe('getWorkflowQueueName', () => { expect(() => getWorkflowQueueName('')).toThrow('Invalid workflow name'); }); }); + +describe('healthCheck', () => { + it('returns unhealthy when queue delivery does not settle before the timeout', async () => { + const world = { + queue: vi.fn(() => new Promise(() => {})), + readFromStream: vi.fn(), + } as unknown as World; + + const result = await healthCheck(world, 'workflow', { timeout: 10 }); + + expect(result).toEqual({ + healthy: false, + error: 'Health check timed out after 10ms', + }); + expect(world.queue).toHaveBeenCalledWith( + '__wkf_workflow_health_check', + { + __healthCheck: true, + correlationId: expect.any(String), + }, + { + specVersion: 1, + deploymentId: undefined, + } + ); + expect(world.readFromStream).not.toHaveBeenCalled(); + }); + + it('returns unhealthy when opening the response stream does not settle before the timeout', async () => { + const world = { + queue: vi.fn().mockResolvedValue({ messageId: null }), + readFromStream: vi.fn(() => new Promise(() => {})), + } as unknown as World; + + const result = await healthCheck(world, 'step', { timeout: 10 }); + + expect(result).toEqual({ + healthy: false, + error: 'Health check timed out after 10ms', + }); + expect(world.queue).toHaveBeenCalledWith( + '__wkf_step_health_check', + { + __healthCheck: true, + correlationId: expect.any(String), + }, + { + specVersion: 1, + deploymentId: undefined, + } + ); + expect(world.readFromStream).toHaveBeenCalledWith( + expect.stringMatching(/^__health_check__/) + ); + }); +}); diff --git a/packages/core/src/runtime/helpers.ts b/packages/core/src/runtime/helpers.ts index 7a725f12ca..babcf02b89 100644 --- a/packages/core/src/runtime/helpers.ts +++ b/packages/core/src/runtime/helpers.ts @@ -143,6 +143,56 @@ const HEALTH_CHECK_POLL_INTERVAL = 100; // (which doesn't work across processes) const HEALTH_CHECK_READ_TIMEOUT = 500; +class HealthCheckTimeoutError extends Error { + constructor(timeout: number) { + super(`Health check timed out after ${timeout}ms`); + } +} + +function getHealthCheckTimeRemaining(startTime: number, timeout: number) { + return timeout - (Date.now() - startTime); +} + +async function withHealthCheckTimeout( + promise: Promise, + startTime: number, + timeout: number +): Promise { + const remaining = getHealthCheckTimeRemaining(startTime, timeout); + if (remaining <= 0) { + throw new HealthCheckTimeoutError(timeout); + } + + let timeoutId: ReturnType | undefined; + try { + return await Promise.race([ + promise, + new Promise((_, reject) => { + timeoutId = setTimeout( + () => reject(new HealthCheckTimeoutError(timeout)), + remaining + ); + }), + ]); + } finally { + if (timeoutId) { + clearTimeout(timeoutId); + } + } +} + +function wait(ms: number) { + return new Promise((resolve) => setTimeout(resolve, ms)); +} + +async function waitForNextHealthCheckPoll(startTime: number, timeout: number) { + const remaining = getHealthCheckTimeRemaining(startTime, timeout); + if (remaining <= 0) { + return; + } + await wait(Math.min(HEALTH_CHECK_POLL_INTERVAL, remaining)); +} + /** * Read chunks from a stream with a timeout per read operation. * Returns { chunks, timedOut } where timedOut indicates if a read timed out. @@ -223,6 +273,32 @@ function parseHealthCheckResponse( return parsed; } +async function readHealthCheckResponse( + world: World, + streamName: string, + startTime: number, + timeout: number +): Promise<{ healthy: boolean } | null> { + const stream = await withHealthCheckTimeout( + world.readFromStream(streamName), + startTime, + timeout + ); + const reader = stream.getReader(); + const readTimeout = Math.min( + HEALTH_CHECK_READ_TIMEOUT, + Math.max(1, getHealthCheckTimeRemaining(startTime, timeout)) + ); + const { chunks, timedOut } = await readStreamWithTimeout(reader, readTimeout); + + if (timedOut) { + await reader.cancel().catch(() => {}); + return null; + } + + return parseHealthCheckResponse(chunks); +} + export async function healthCheck( world: World, endpoint: HealthCheckEndpoint, @@ -240,54 +316,41 @@ export async function healthCheck( const startTime = Date.now(); try { - await world.queue( - queueName, - { __healthCheck: true, correlationId }, - { - // Use JSON transport so the health check works against both - // old (JSON-only) and new (dual) deployments. - specVersion: SPEC_VERSION_LEGACY, - deploymentId: options?.deploymentId, - } + await withHealthCheckTimeout( + world.queue( + queueName, + { __healthCheck: true, correlationId }, + { + // Use JSON transport so the health check works against both + // old (JSON-only) and new (dual) deployments. + specVersion: SPEC_VERSION_LEGACY, + deploymentId: options?.deploymentId, + } + ), + startTime, + timeout ); - while (Date.now() - startTime < timeout) { + while (getHealthCheckTimeRemaining(startTime, timeout) > 0) { try { - const stream = await world.readFromStream(streamName); - const reader = stream.getReader(); - const { chunks, timedOut } = await readStreamWithTimeout( - reader, - HEALTH_CHECK_READ_TIMEOUT + const response = await readHealthCheckResponse( + world, + streamName, + startTime, + timeout ); - - if (timedOut) { - try { - reader.cancel(); - } catch { - // Ignore cancel errors - } - await new Promise((resolve) => - setTimeout(resolve, HEALTH_CHECK_POLL_INTERVAL) - ); - continue; - } - - const response = parseHealthCheckResponse(chunks); if (response) { return { ...response, latencyMs: Date.now() - startTime, }; } - - await new Promise((resolve) => - setTimeout(resolve, HEALTH_CHECK_POLL_INTERVAL) - ); - } catch { - await new Promise((resolve) => - setTimeout(resolve, HEALTH_CHECK_POLL_INTERVAL) - ); + } catch (error) { + if (error instanceof HealthCheckTimeoutError) { + throw error; + } } + await waitForNextHealthCheckPoll(startTime, timeout); } return { healthy: false, @@ -392,6 +455,12 @@ export function withHealthCheck( headers: HEALTH_CHECK_CORS_HEADERS, }); } + if (req.method === 'HEAD') { + return new Response(null, { + status: 200, + headers: HEALTH_CHECK_CORS_HEADERS, + }); + } return new Response( JSON.stringify({ healthy: true, diff --git a/packages/core/src/runtime/resume-hook.ts b/packages/core/src/runtime/resume-hook.ts index f894e55a1e..6b90f2d990 100644 --- a/packages/core/src/runtime/resume-hook.ts +++ b/packages/core/src/runtime/resume-hook.ts @@ -26,6 +26,19 @@ import { waitedUntil } from '../util.js'; import { getWorkflowQueueName } from './helpers.js'; import { getWorld } from './world.js'; +async function materializeResponseBody(response: Response): Promise { + if (!response.body) { + return response; + } + + const body = await response.arrayBuffer(); + return new Response(body, { + status: response.status, + statusText: response.statusText, + headers: response.headers, + }); +} + /** * Internal helper that returns the hook, the associated workflow run, * and the resolved encryption key. @@ -297,9 +310,9 @@ export async function resumeWebhook( const reader = responseReadable.getReader(); const chunk = await reader.read(); if (chunk.value) { - response = chunk.value; + response = await materializeResponseBody(chunk.value); } - reader.cancel(); + await reader.cancel(); } if (!response) { diff --git a/packages/core/src/runtime/step-handler.ts b/packages/core/src/runtime/step-handler.ts index 6bf0057a60..006258140b 100644 --- a/packages/core/src/runtime/step-handler.ts +++ b/packages/core/src/runtime/step-handler.ts @@ -12,7 +12,11 @@ import { } from '@workflow/errors'; import { pluralize } from '@workflow/utils'; import { getPort } from '@workflow/utils/get-port'; -import { SPEC_VERSION_CURRENT, StepInvokePayloadSchema } from '@workflow/world'; +import { + SPEC_VERSION_CURRENT, + type Step, + StepInvokePayloadSchema, +} from '@workflow/world'; import { importKey } from '../encryption.js'; import { runtimeLogger, stepLogger } from '../logger.js'; import { getStepFunction } from '../private.js'; @@ -178,7 +182,7 @@ const stepHandler = createQueueHandler( // - Step not in terminal state (returns 409) // - retryAfter timestamp reached (returns 425 with Retry-After header) // - Workflow still active (returns 410 if completed) - let step; + let step: Step; try { const startResult = await world.events.create( workflowRunId, @@ -780,12 +784,14 @@ const stepHandler = createQueueHandler( // The workflow runtime must be resilient to the below code not executing on a failed step result = await trace('step.dehydrate', {}, async (dehydrateSpan) => { const startTime = Date.now(); + const returnValueOpsStart = ops.length; const dehydrated = await dehydrateStepReturnValue( result, workflowRunId, encryptionKey, ops ); + await Promise.all(ops.slice(returnValueOpsStart)); const durationMs = Date.now() - startTime; dehydrateSpan?.setAttributes({ ...Attribute.QueueSerializeTimeMs(durationMs), diff --git a/packages/nest/src/workflow.controller.ts b/packages/nest/src/workflow.controller.ts index 2b99a0b7d1..2bc2365e0d 100644 --- a/packages/nest/src/workflow.controller.ts +++ b/packages/nest/src/workflow.controller.ts @@ -1,6 +1,6 @@ import { readFileSync } from 'node:fs'; import { pathToFileURL } from 'node:url'; -import { All, Controller, Get, Post, Req, Res } from '@nestjs/common'; +import { All, Controller, Get, Head, Post, Req, Res } from '@nestjs/common'; import { join } from 'pathe'; // Module-level state for configuration @@ -88,23 +88,36 @@ function getOutDir(): string { export class WorkflowController { @Post('step') async handleStep(@Req() req: any, @Res() res: any) { - const outDir = getOutDir(); - const { POST } = await import( - pathToFileURL(join(outDir, 'steps.mjs')).href - ); - const webRequest = toWebRequest(req); - const webResponse = await POST(webRequest); - await sendWebResponse(res, webResponse); + await this.handleGeneratedEndpoint(req, res, 'steps.mjs'); + } + + @Head('step') + async handleStepHead(@Req() req: any, @Res() res: any) { + await this.handleGeneratedEndpoint(req, res, 'steps.mjs'); } @Post('flow') async handleFlow(@Req() req: any, @Res() res: any) { + await this.handleGeneratedEndpoint(req, res, 'workflows.mjs'); + } + + @Head('flow') + async handleFlowHead(@Req() req: any, @Res() res: any) { + await this.handleGeneratedEndpoint(req, res, 'workflows.mjs'); + } + + private async handleGeneratedEndpoint( + req: any, + res: any, + bundleFileName: 'steps.mjs' | 'workflows.mjs' + ) { const outDir = getOutDir(); - const { POST } = await import( - pathToFileURL(join(outDir, 'workflows.mjs')).href + const { HEAD, POST } = await import( + pathToFileURL(join(outDir, bundleFileName)).href ); const webRequest = toWebRequest(req); - const webResponse = await POST(webRequest); + const handler = req.method === 'HEAD' && HEAD ? HEAD : POST; + const webResponse = await handler(webRequest); await sendWebResponse(res, webResponse); } diff --git a/packages/next/src/builder-deferred.ts b/packages/next/src/builder-deferred.ts index 86e1e58c86..eae7e8ef1d 100644 --- a/packages/next/src/builder-deferred.ts +++ b/packages/next/src/builder-deferred.ts @@ -1945,7 +1945,7 @@ export async function getNextBuilderDeferred() { serdeImports ? `// Serde files for cross-context class registration\n${serdeImports}` : '', - "export { stepEntrypoint as POST } from 'workflow/runtime';", + "export { stepEntrypoint as HEAD, stepEntrypoint as POST } from 'workflow/runtime';", ] .filter(Boolean) .join('\n'); diff --git a/packages/sveltekit/src/builder.ts b/packages/sveltekit/src/builder.ts index 3a0a988af6..9e55b2f6e7 100644 --- a/packages/sveltekit/src/builder.ts +++ b/packages/sveltekit/src/builder.ts @@ -21,6 +21,24 @@ const SVELTEKIT_VIRTUAL_MODULES = [ '$app/*', // All $app subpaths ]; +function replaceGeneratedRouteExport( + content: string, + pattern: RegExp, + replacement: string, + errorMessage: string +) { + const sourceMapMarker = '\n//# sourceMappingURL='; + const sourceMapIndex = content.lastIndexOf(sourceMapMarker); + const routeCode = + sourceMapIndex === -1 ? content : content.slice(0, sourceMapIndex); + const sourceMap = sourceMapIndex === -1 ? '' : content.slice(sourceMapIndex); + const wrappedRouteCode = routeCode.replace(pattern, replacement); + if (wrappedRouteCode === routeCode) { + throw new Error(errorMessage); + } + return wrappedRouteCode + sourceMap; +} + export class SvelteKitBuilder extends BaseBuilder { constructor(config?: Partial) { const workingDir = config?.workingDir || process.cwd(); @@ -121,13 +139,18 @@ export class SvelteKitBuilder extends BaseBuilder { let stepsRouteContent = await readFile(stepsRouteFile, 'utf-8'); // Replace the default export with SvelteKit-compatible handler - stepsRouteContent = stepsRouteContent.replace( - /export\s*\{\s*stepEntrypoint\s+as\s+POST\s*\}\s*;?$/m, + stepsRouteContent = replaceGeneratedRouteExport( + stepsRouteContent, + /export\s*\{\s*stepEntrypoint\w*\s+as\s+HEAD\s*,\s*stepEntrypoint\w*\s+as\s+POST\s*\}\s*;?\s*$/m, `${NORMALIZE_REQUEST_CODE} -export const POST = async ({request}) => { +const handleStepRequest = async ({request}) => { const normalRequest = await normalizeRequest(request); return stepEntrypoint(normalRequest); -}` +}; + +export const HEAD = handleStepRequest; +export const POST = handleStepRequest;`, + 'Failed to wrap generated SvelteKit step route' ); await writeFile(stepsRouteFile, stepsRouteContent); @@ -160,14 +183,21 @@ export const POST = async ({request}) => { let workflowsRouteContent = await readFile(workflowsRouteFile, 'utf-8'); // Replace the default export with SvelteKit-compatible handler - workflowsRouteContent = workflowsRouteContent.replace( - /export const POST = workflowEntrypoint\(workflowCode\);?$/m, + const wrappedWorkflowsRouteContent = workflowsRouteContent.replace( + /const handler = workflowEntrypoint\(workflowCode\);\s*export const HEAD = handler;\s*export const POST = handler;?\s*$/m, `${NORMALIZE_REQUEST_CODE} -export const POST = async ({request}) => { +const handleWorkflowRequest = async ({request}) => { const normalRequest = await normalizeRequest(request); return workflowEntrypoint(workflowCode)(normalRequest); -}` +}; + +export const HEAD = handleWorkflowRequest; +export const POST = handleWorkflowRequest;` ); + if (wrappedWorkflowsRouteContent === workflowsRouteContent) { + throw new Error('Failed to wrap generated SvelteKit workflow route'); + } + workflowsRouteContent = wrappedWorkflowsRouteContent; await writeFile(workflowsRouteFile, workflowsRouteContent); return manifest; diff --git a/packages/utils/src/get-port.test.ts b/packages/utils/src/get-port.test.ts index fd44cd1656..ce8722d33e 100644 --- a/packages/utils/src/get-port.test.ts +++ b/packages/utils/src/get-port.test.ts @@ -243,16 +243,19 @@ describe('getWorkflowPort', () => { it('should identify workflow server among multiple ports', async () => { // Non-workflow server (returns 404 for all requests) - const nonWorkflowServer = http.createServer((req, res) => { + const nonWorkflowServer = http.createServer((_req, res) => { res.writeHead(404); res.end(); }); // Workflow server (returns 200 for health check endpoint) const workflowServer = http.createServer((req, res) => { - if (req.url?.includes('__health')) { + if (req.url?.includes('__health') && req.method === 'HEAD') { res.writeHead(200, { 'Content-Type': 'text/plain' }); res.end('Workflow SDK endpoint is healthy'); + } else if (req.url?.includes('__health')) { + res.writeHead(405, { Allow: 'HEAD' }); + res.end(); } else if (req.url?.startsWith('/.well-known/workflow/v1/')) { res.writeHead(400, { 'Content-Type': 'application/json' }); res.end(JSON.stringify({ error: 'Missing required headers' })); @@ -275,11 +278,11 @@ describe('getWorkflowPort', () => { it('should fall back to first port when probing fails', async () => { // Two non-workflow servers (both return 404) - const server1 = http.createServer((req, res) => { + const server1 = http.createServer((_req, res) => { res.writeHead(404); res.end(); }); - const server2 = http.createServer((req, res) => { + const server2 = http.createServer((_req, res) => { res.writeHead(404); res.end(); }); @@ -302,9 +305,12 @@ describe('getWorkflowPort', () => { }); // Fast workflow server (returns 200 for health check) const fastServer = http.createServer((req, res) => { - if (req.url?.includes('__health')) { + if (req.url?.includes('__health') && req.method === 'HEAD') { res.writeHead(200, { 'Content-Type': 'text/plain' }); res.end('Workflow SDK endpoint is healthy'); + } else if (req.url?.includes('__health')) { + res.writeHead(405, { Allow: 'HEAD' }); + res.end(); } else if (req.url?.startsWith('/.well-known/workflow/v1/')) { res.writeHead(400); res.end(); @@ -318,21 +324,25 @@ describe('getWorkflowPort', () => { await new Promise((resolve) => slowServer.listen(0, resolve)); await new Promise((resolve) => fastServer.listen(0, resolve)); + const fastAddr = fastServer.address() as AddressInfo; const start = Date.now(); - const port = await getWorkflowPort({ timeout: 100 }); + const port = await getWorkflowPort({ + timeout: 100, + }); const elapsed = Date.now() - start; - const fastAddr = fastServer.address() as AddressInfo; expect(port).toBe(fastAddr.port); - // Should complete reasonably quickly (Windows CI can be slow) expect(elapsed).toBeLessThan(2000); }); it('should handle concurrent getWorkflowPort calls', async () => { const server = http.createServer((req, res) => { - if (req.url?.includes('__health')) { + if (req.url?.includes('__health') && req.method === 'HEAD') { res.writeHead(200, { 'Content-Type': 'text/plain' }); res.end('Workflow SDK endpoint is healthy'); + } else if (req.url?.includes('__health')) { + res.writeHead(405, { Allow: 'HEAD' }); + res.end(); } else if (req.url?.startsWith('/.well-known/workflow/v1/')) { res.writeHead(400); res.end(); diff --git a/packages/utils/src/get-port.ts b/packages/utils/src/get-port.ts index b6806a35a4..02e136c244 100644 --- a/packages/utils/src/get-port.ts +++ b/packages/utils/src/get-port.ts @@ -21,6 +21,47 @@ function parsePort(value: string, radix = 10): number | undefined { const join = (arr: string[], sep: string) => arr.join(sep); const PROC_ROOT = join(['', 'proc'], '/'); +interface LibuvTcpHandle { + type?: string; + is_active?: boolean; + localEndpoint?: { + port?: number; + }; + remoteEndpoint?: unknown; +} + +function getReportedPorts(): number[] { + const report = process.report?.getReport?.() as + | { libuv?: LibuvTcpHandle[] } + | undefined; + const handles = report?.libuv; + + if (!handles) { + return []; + } + + const ports: number[] = []; + const seen = new Set(); + + for (const handle of handles) { + if ( + handle.type !== 'tcp' || + handle.is_active !== true || + handle.remoteEndpoint !== null + ) { + continue; + } + + const port = parsePort(String(handle.localEndpoint?.port)); + if (port !== undefined && !seen.has(port)) { + ports.push(port); + seen.add(port); + } + } + + return ports; +} + /** * Gets ALL listening ports for the current process on Linux by reading /proc filesystem. * Returns ports in order of file descriptor (deterministic ordering). @@ -205,6 +246,11 @@ export async function getAllPorts(): Promise { const { pid, platform } = process; try { + const reportedPorts = getReportedPorts(); + if (reportedPorts.length > 0) { + return reportedPorts; + } + switch (platform) { case 'linux': return await getLinuxPorts(pid); diff --git a/packages/world-vercel/src/streamer.test.ts b/packages/world-vercel/src/streamer.test.ts index f3046d5832..9cb8f0c95a 100644 --- a/packages/world-vercel/src/streamer.test.ts +++ b/packages/world-vercel/src/streamer.test.ts @@ -226,6 +226,32 @@ describe('writeToStreamMulti pagination', () => { expect(fetchSpy).toHaveBeenCalledTimes(1); }); + it('adds an abort signal to stream writes', async () => { + const fetchSpy = vi + .spyOn(globalThis, 'fetch') + .mockImplementation(async () => new Response('ok')); + + const streamer = await getStreamer(); + + await streamer.writeToStream('s', 'run-1', new Uint8Array([1])); + + expect(fetchSpy).toHaveBeenCalledTimes(1); + expect(fetchSpy.mock.calls[0]?.[1]?.signal).toBeInstanceOf(AbortSignal); + }); + + it('adds an abort signal to stream closes', async () => { + const fetchSpy = vi + .spyOn(globalThis, 'fetch') + .mockImplementation(async () => new Response('ok')); + + const streamer = await getStreamer(); + + await streamer.closeStream('s', 'run-1'); + + expect(fetchSpy).toHaveBeenCalledTimes(1); + expect(fetchSpy.mock.calls[0]?.[1]?.signal).toBeInstanceOf(AbortSignal); + }); + it('paginates into multiple requests when chunks > MAX_CHUNKS_PER_REQUEST', async () => { const fetchSpy = vi .spyOn(globalThis, 'fetch') diff --git a/packages/world-vercel/src/streamer.ts b/packages/world-vercel/src/streamer.ts index 6945a98edc..bc9baa6c90 100644 --- a/packages/world-vercel/src/streamer.ts +++ b/packages/world-vercel/src/streamer.ts @@ -17,6 +17,7 @@ import { * MAX_CHUNKS_PER_BATCH. Larger batches are split into multiple requests. */ export const MAX_CHUNKS_PER_REQUEST = 1000; +const DEFAULT_STREAM_MUTATION_TIMEOUT_MS = 30_000; // Streaming calls use plain fetch() without the undici dispatcher. // The dispatcher's retry logic doesn't apply well to streaming operations @@ -36,6 +37,41 @@ function getStreamUrl( return new URL(`${httpConfig.baseUrl}/v2/stream/${encodeURIComponent(name)}`); } +function getStreamMutationTimeoutMs() { + const parsed = Number.parseInt( + process.env.WORKFLOW_VERCEL_STREAM_TIMEOUT_MS ?? '', + 10 + ); + if (Number.isFinite(parsed) && parsed > 0) { + return parsed; + } + return DEFAULT_STREAM_MUTATION_TIMEOUT_MS; +} + +async function fetchStreamMutation( + url: URL, + init: RequestInit, + operation: 'write' | 'close' +) { + const timeoutMs = getStreamMutationTimeoutMs(); + try { + return await fetch(url, { + ...init, + signal: AbortSignal.timeout(timeoutMs), + }); + } catch (err) { + if ( + err instanceof Error && + (err.name === 'AbortError' || err.name === 'TimeoutError') + ) { + throw new Error(`Stream ${operation} timed out after ${timeoutMs}ms`, { + cause: err, + }); + } + throw err; + } +} + /** * Encode multiple chunks into a length-prefixed binary format. * Format: [4 bytes big-endian length][chunk bytes][4 bytes length][chunk bytes]... @@ -107,13 +143,14 @@ export function createStreamer(config?: APIConfig): Streamer { const resolvedRunId = await runId; const httpConfig = await getHttpConfig(config); - const response = await fetch( + const response = await fetchStreamMutation( getStreamUrl(name, resolvedRunId, httpConfig), { method: 'PUT', body: chunk, headers: httpConfig.headers, - } + }, + 'write' ); const text = await response.text(); if (!response.ok) { @@ -149,13 +186,14 @@ export function createStreamer(config?: APIConfig): Streamer { for (let i = 0; i < chunks.length; i += MAX_CHUNKS_PER_REQUEST) { const batch = chunks.slice(i, i + MAX_CHUNKS_PER_REQUEST); const body = encodeMultiChunks(batch); - const response = await fetch( + const response = await fetchStreamMutation( getStreamUrl(name, resolvedRunId, httpConfig), { method: 'PUT', body, headers: httpConfig.headers, - } + }, + 'write' ); const text = await response.text(); if (!response.ok) { @@ -172,12 +210,13 @@ export function createStreamer(config?: APIConfig): Streamer { const httpConfig = await getHttpConfig(config); httpConfig.headers.set('X-Stream-Done', 'true'); - const response = await fetch( + const response = await fetchStreamMutation( getStreamUrl(name, resolvedRunId, httpConfig), { method: 'PUT', headers: httpConfig.headers, - } + }, + 'close' ); const text = await response.text(); if (!response.ok) {