From 1718cb45e0c5ce75919bba930d16b8ad910e529c Mon Sep 17 00:00:00 2001 From: Matt Date: Thu, 28 May 2026 16:51:20 +0200 Subject: [PATCH] fix(workflow-executor): add per-invocation AI timeout to surface hanging provider errors [PRD-409] MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When the AI provider hangs (no response, internal retries, or a connection held open), the previous code relied on the global STEP_TIMEOUT_MS (default 5 min) to fail the step. From the user's perspective this looks like an infinite spinner. Add a dedicated timeout on each AI invocation (default 60s, configurable via AI_INVOKE_TIMEOUT_MS) using AbortController + signal so the underlying HTTP request is actually cancelled. On timeout, the invocation throws the new AiInvokeTimeoutError, which BaseStepExecutor.execute() converts to an error outcome with a user-friendly message — the orchestrator then sets context.error on the step and the frontend exits its isLoading state immediately. 
fixes PRD-409 Co-Authored-By: Claude Opus 4.7 (1M context) --- .../src/build-workflow-executor.ts | 3 + packages/workflow-executor/src/cli-core.ts | 3 + packages/workflow-executor/src/defaults.ts | 1 + packages/workflow-executor/src/errors.ts | 12 ++ .../src/executors/base-step-executor.ts | 27 ++++- .../src/executors/step-executor-factory.ts | 2 + packages/workflow-executor/src/runner.ts | 5 + .../src/types/execution-context.ts | 1 + .../test/executors/base-step-executor.test.ts | 108 ++++++++++++++++++ 9 files changed, 159 insertions(+), 3 deletions(-) diff --git a/packages/workflow-executor/src/build-workflow-executor.ts b/packages/workflow-executor/src/build-workflow-executor.ts index 8087d22d0d..65976bd3d7 100644 --- a/packages/workflow-executor/src/build-workflow-executor.ts +++ b/packages/workflow-executor/src/build-workflow-executor.ts @@ -14,6 +14,7 @@ import ForestServerWorkflowPort from './adapters/forest-server-workflow-port'; import ForestadminClientActivityLogPortFactory from './adapters/forestadmin-client-activity-log-port-factory'; import ServerAiAdapter from './adapters/server-ai-adapter'; import { + DEFAULT_AI_INVOKE_TIMEOUT_MS, DEFAULT_FOREST_SERVER_URL, DEFAULT_POLLING_INTERVAL_MS, DEFAULT_STEP_TIMEOUT_MS, @@ -43,6 +44,7 @@ export interface ExecutorOptions { logger?: Logger; stopTimeoutMs?: number; stepTimeoutMs?: number; + aiInvokeTimeoutMs?: number; // Max auto-chained steps per entry (see RunnerConfig.maxChainDepth). 0 disables chaining. maxChainDepth?: number; // Dev only: makes every AI call fail immediately so error paths can be exercised locally. @@ -112,6 +114,7 @@ function buildCommonDependencies(options: ExecutorOptions) { authSecret: options.authSecret, stopTimeoutMs: options.stopTimeoutMs, stepTimeoutMs: options.stepTimeoutMs ?? DEFAULT_STEP_TIMEOUT_MS, + aiInvokeTimeoutMs: options.aiInvokeTimeoutMs ?? 
DEFAULT_AI_INVOKE_TIMEOUT_MS, maxChainDepth: options.maxChainDepth, }; } diff --git a/packages/workflow-executor/src/cli-core.ts b/packages/workflow-executor/src/cli-core.ts index 1c683af027..88e1458ff5 100644 --- a/packages/workflow-executor/src/cli-core.ts +++ b/packages/workflow-executor/src/cli-core.ts @@ -13,6 +13,7 @@ import { type WorkflowExecutor, } from './build-workflow-executor'; import { + DEFAULT_AI_INVOKE_TIMEOUT_MS, DEFAULT_FOREST_SERVER_URL, DEFAULT_HTTP_PORT, DEFAULT_MAX_CHAIN_DEPTH, @@ -158,6 +159,7 @@ export function readEnvConfig(env: NodeJS.ProcessEnv, args: CliArgs): CliConfig pollingIntervalMs: parsePositiveIntEnv('POLLING_INTERVAL_MS', env.POLLING_INTERVAL_MS), stopTimeoutMs: parsePositiveIntEnv('STOP_TIMEOUT_MS', env.STOP_TIMEOUT_MS), stepTimeoutMs: parsePositiveIntEnv('STEP_TIMEOUT_MS', env.STEP_TIMEOUT_MS), + aiInvokeTimeoutMs: parsePositiveIntEnv('AI_INVOKE_TIMEOUT_MS', env.AI_INVOKE_TIMEOUT_MS), maxChainDepth: parsePositiveIntEnv('MAX_CHAIN_DEPTH', env.MAX_CHAIN_DEPTH), ...(aiConfigurations && { aiConfigurations }), ...(env.FORCE_AI_ERROR === 'true' && { forceAiError: true }), @@ -194,6 +196,7 @@ Optional environment variables: POLLING_INTERVAL_MS Default: ${DEFAULT_POLLING_INTERVAL_MS} STOP_TIMEOUT_MS Default: ${DEFAULT_STOP_TIMEOUT_MS} STEP_TIMEOUT_MS Max duration of a step in ms (default: ${DEFAULT_STEP_TIMEOUT_MS}) + AI_INVOKE_TIMEOUT_MS Max duration of a single AI provider invocation in ms (default: ${DEFAULT_AI_INVOKE_TIMEOUT_MS}) MAX_CHAIN_DEPTH Max steps auto-executed per run before yielding (default: ${DEFAULT_MAX_CHAIN_DEPTH}) NO_COLOR Set to any value to disable ANSI colors in pretty logs FORCE_AI_ERROR Set to "true" to make every AI call fail (dev only, to test error paths) diff --git a/packages/workflow-executor/src/defaults.ts b/packages/workflow-executor/src/defaults.ts index b03e4877e4..10b278fd9c 100644 --- a/packages/workflow-executor/src/defaults.ts +++ b/packages/workflow-executor/src/defaults.ts @@ -2,5 +2,6 @@ 
export const DEFAULT_HTTP_PORT = 3400; export const DEFAULT_FOREST_SERVER_URL = 'https://api.forestadmin.com'; export const DEFAULT_POLLING_INTERVAL_MS = 30_000; export const DEFAULT_STEP_TIMEOUT_MS = 5 * 60_000; +export const DEFAULT_AI_INVOKE_TIMEOUT_MS = 60_000; export const DEFAULT_STOP_TIMEOUT_MS = 30_000; export const DEFAULT_MAX_CHAIN_DEPTH = 50; diff --git a/packages/workflow-executor/src/errors.ts b/packages/workflow-executor/src/errors.ts index 4a8038a610..8ac3e6ee1c 100644 --- a/packages/workflow-executor/src/errors.ts +++ b/packages/workflow-executor/src/errors.ts @@ -209,6 +209,18 @@ export class StepTimeoutError extends WorkflowExecutorError { } } +// Thrown when the AI provider does not respond within the configured timeout — distinct from +// StepTimeoutError so we can surface a provider-specific message and tune the AI timeout +// independently of the step timeout (AI hangs are common; record fetches are not). +export class AiInvokeTimeoutError extends WorkflowExecutorError { + constructor(timeoutMs: number) { + super( + `AI provider did not respond within ${timeoutMs}ms`, + 'The AI provider did not respond in time. 
Please try again, or contact your administrator if the problem persists.', + ); + } +} + export class NoMcpToolsError extends WorkflowExecutorError { constructor(requestedMcpServerId?: string, loadedMcpServerIds?: readonly string[]) { const technical = requestedMcpServerId diff --git a/packages/workflow-executor/src/executors/base-step-executor.ts b/packages/workflow-executor/src/executors/base-step-executor.ts index a7d76228c2..88643c3a2b 100644 --- a/packages/workflow-executor/src/executors/base-step-executor.ts +++ b/packages/workflow-executor/src/executors/base-step-executor.ts @@ -17,6 +17,7 @@ import type { import { SystemMessage } from '@forestadmin/ai-proxy'; import { + AiInvokeTimeoutError, InvalidAiRequestError, MalformedToolCallError, MissingToolCallError, @@ -328,9 +329,29 @@ export default abstract class BaseStepExecutor { BaseStepExecutor.assertNoMidArraySystemMessages(messages); const modelWithTools = this.context.model.bindTools(tools, { tool_choice: 'any' }); - const response = await modelWithTools.invoke( - BaseStepExecutor.mergeLeadingSystemMessages(messages), - ); + const preparedMessages = BaseStepExecutor.mergeLeadingSystemMessages(messages); + const aiTimeoutMs = this.context.aiInvokeTimeoutMs; + + let response; + + if (!aiTimeoutMs || aiTimeoutMs <= 0) { + response = await modelWithTools.invoke(preparedMessages); + } else { + const controller = new AbortController(); + const timer = setTimeout(() => controller.abort(), aiTimeoutMs); + + try { + response = await modelWithTools.invoke(preparedMessages, { + signal: controller.signal, + }); + } catch (err) { + if (controller.signal.aborted) throw new AiInvokeTimeoutError(aiTimeoutMs); + throw err; + } finally { + clearTimeout(timer); + } + } + const toolCall = response.tool_calls?.[0]; if (toolCall !== undefined) { diff --git a/packages/workflow-executor/src/executors/step-executor-factory.ts b/packages/workflow-executor/src/executors/step-executor-factory.ts index 736b633215..fa7c24a649 100644 
--- a/packages/workflow-executor/src/executors/step-executor-factory.ts +++ b/packages/workflow-executor/src/executors/step-executor-factory.ts @@ -41,6 +41,7 @@ export interface StepContextConfig { schemaCache: SchemaCache; logger: Logger; stepTimeoutMs?: number; + aiInvokeTimeoutMs?: number; } export default class StepExecutorFactory { @@ -135,6 +136,7 @@ export default class StepExecutorFactory { logger: cfg.logger, incomingPendingData, stepTimeoutMs: cfg.stepTimeoutMs, + aiInvokeTimeoutMs: cfg.aiInvokeTimeoutMs, activityLogPort, }; } diff --git a/packages/workflow-executor/src/runner.ts b/packages/workflow-executor/src/runner.ts index ba8e423e23..86ee4178b9 100644 --- a/packages/workflow-executor/src/runner.ts +++ b/packages/workflow-executor/src/runner.ts @@ -42,6 +42,10 @@ export interface RunnerConfig { // On timeout the step reports status:error; the underlying work is not aborted (Promise.race // limitation). Late rejections are caught and logged; late resolutions are silently discarded. stepTimeoutMs?: number; + // Per-AI-invocation timeout (used by BaseStepExecutor.invokeWithTools). Aborts the underlying + // HTTP request via AbortSignal so a hanging provider is killed quickly, before stepTimeoutMs + // would fire. 0/undefined disables. + aiInvokeTimeoutMs?: number; // Max number of ADDITIONAL steps auto-chained via /update-step response before yielding to the // next poll cycle (counted after the initial step). 0 disables chaining entirely. Default 50. 
maxChainDepth?: number; @@ -416,6 +420,7 @@ export default class Runner { schemaCache: this.config.schemaCache, logger: this.logger, stepTimeoutMs: this.config.stepTimeoutMs, + aiInvokeTimeoutMs: this.config.aiInvokeTimeoutMs, }; } } diff --git a/packages/workflow-executor/src/types/execution-context.ts b/packages/workflow-executor/src/types/execution-context.ts index 94ea4b3a6d..4a1ca06128 100644 --- a/packages/workflow-executor/src/types/execution-context.ts +++ b/packages/workflow-executor/src/types/execution-context.ts @@ -41,5 +41,6 @@ export interface ExecutionContext readonly logger: Logger; readonly incomingPendingData?: unknown; readonly stepTimeoutMs?: number; + readonly aiInvokeTimeoutMs?: number; readonly activityLogPort: ActivityLogPort; } diff --git a/packages/workflow-executor/test/executors/base-step-executor.test.ts b/packages/workflow-executor/test/executors/base-step-executor.test.ts index 035b277d2b..5119696a6b 100644 --- a/packages/workflow-executor/test/executors/base-step-executor.test.ts +++ b/packages/workflow-executor/test/executors/base-step-executor.test.ts @@ -11,6 +11,7 @@ import type { BaseMessage, DynamicStructuredTool } from '@forestadmin/ai-proxy'; import { HumanMessage, SystemMessage } from '@forestadmin/ai-proxy'; import { + AiInvokeTimeoutError, InvalidAiRequestError, MalformedToolCallError, MissingToolCallError, @@ -854,6 +855,113 @@ describe('BaseStepExecutor', () => { ); }); }); + + describe('AI invoke timeout', () => { + // Mocks a model.invoke that never resolves on its own but rejects with AbortError + // when its received AbortSignal fires — mimics LangChain's behavior on signal.abort(). 
+ function makeHangingModel() { + const invoke = jest.fn().mockImplementation( + (_messages, opts) => + new Promise((_resolve, reject) => { + opts?.signal?.addEventListener('abort', () => { + const err = new Error('Aborted'); + err.name = 'AbortError'; + reject(err); + }); + }), + ); + + return { + model: { + bindTools: jest.fn().mockReturnValue({ invoke }), + } as unknown as ExecutionContext['model'], + invoke, + }; + } + + it('throws AiInvokeTimeoutError when model.invoke hangs beyond aiInvokeTimeoutMs', async () => { + jest.useFakeTimers(); + + try { + const { model } = makeHangingModel(); + const executor = new TestableExecutor(makeContext({ model, aiInvokeTimeoutMs: 100 })); + const promise = executor.invokeWithTool(dummyMessages, dummyTool); + // Attach catch synchronously so a late rejection (after advanceTimersByTime) doesn't + // produce an unhandled rejection warning. + const caught = promise.catch(err => err); + jest.advanceTimersByTime(150); + const err = await caught; + + expect(err).toBeInstanceOf(AiInvokeTimeoutError); + expect((err as Error).message).toContain('100ms'); + } finally { + jest.useRealTimers(); + } + }); + + it('passes the AbortSignal as the second arg to model.invoke', async () => { + const { model, invoke } = makeMockModel({ + tool_calls: [{ name: 'tool', args: {}, id: 'c1' }], + }); + const executor = new TestableExecutor(makeContext({ model, aiInvokeTimeoutMs: 5_000 })); + + await executor.invokeWithTool(dummyMessages, dummyTool); + + expect(invoke).toHaveBeenCalledWith( + expect.any(Array), + expect.objectContaining({ signal: expect.any(AbortSignal) }), + ); + }); + + it('does not pass any options to model.invoke when aiInvokeTimeoutMs is unset', async () => { + const { model, invoke } = makeMockModel({ + tool_calls: [{ name: 'tool', args: {}, id: 'c1' }], + }); + const executor = new TestableExecutor(makeContext({ model, aiInvokeTimeoutMs: undefined })); + + await executor.invokeWithTool(dummyMessages, dummyTool); + + 
expect(invoke).toHaveBeenCalledTimes(1); + expect(invoke.mock.calls[0]).toHaveLength(1); + }); + + it('treats aiInvokeTimeoutMs <= 0 as disabled', async () => { + const { model, invoke } = makeMockModel({ + tool_calls: [{ name: 'tool', args: {}, id: 'c1' }], + }); + const executor = new TestableExecutor(makeContext({ model, aiInvokeTimeoutMs: 0 })); + + await executor.invokeWithTool(dummyMessages, dummyTool); + + expect(invoke.mock.calls[0]).toHaveLength(1); + }); + + it('rethrows non-abort errors without wrapping them', async () => { + const apiError = Object.assign(new Error('OpenAI 503'), { status: 503, name: 'APIError' }); + const invoke = jest.fn().mockRejectedValue(apiError); + const model = { + bindTools: jest.fn().mockReturnValue({ invoke }), + } as unknown as ExecutionContext['model']; + const executor = new TestableExecutor(makeContext({ model, aiInvokeTimeoutMs: 5_000 })); + + await expect(executor.invokeWithTool(dummyMessages, dummyTool)).rejects.toBe(apiError); + }); + + it('clears the timer after a successful invoke (no unref leak)', async () => { + const { model } = makeMockModel({ + tool_calls: [{ name: 'tool', args: {}, id: 'c1' }], + }); + const clearSpy = jest.spyOn(global, 'clearTimeout'); + const executor = new TestableExecutor(makeContext({ model, aiInvokeTimeoutMs: 5_000 })); + + try { + await executor.invokeWithTool(dummyMessages, dummyTool); + expect(clearSpy).toHaveBeenCalled(); + } finally { + clearSpy.mockRestore(); + } + }); + }); }); describe('patchAndReloadPendingData', () => {