Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions packages/workflow-executor/src/build-workflow-executor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import ForestServerWorkflowPort from './adapters/forest-server-workflow-port';
import ForestadminClientActivityLogPortFactory from './adapters/forestadmin-client-activity-log-port-factory';
import ServerAiAdapter from './adapters/server-ai-adapter';
import {
DEFAULT_AI_INVOKE_TIMEOUT_MS,
DEFAULT_FOREST_SERVER_URL,
DEFAULT_POLLING_INTERVAL_MS,
DEFAULT_STEP_TIMEOUT_MS,
Expand Down Expand Up @@ -43,6 +44,7 @@ export interface ExecutorOptions {
logger?: Logger;
stopTimeoutMs?: number;
stepTimeoutMs?: number;
aiInvokeTimeoutMs?: number;
// Max auto-chained steps per entry (see RunnerConfig.maxChainDepth). 0 disables chaining.
maxChainDepth?: number;
// Dev only: makes every AI call fail immediately so error paths can be exercised locally.
Expand Down Expand Up @@ -112,6 +114,7 @@ function buildCommonDependencies(options: ExecutorOptions) {
authSecret: options.authSecret,
stopTimeoutMs: options.stopTimeoutMs,
stepTimeoutMs: options.stepTimeoutMs ?? DEFAULT_STEP_TIMEOUT_MS,
aiInvokeTimeoutMs: options.aiInvokeTimeoutMs ?? DEFAULT_AI_INVOKE_TIMEOUT_MS,
maxChainDepth: options.maxChainDepth,
};
}
Expand Down
3 changes: 3 additions & 0 deletions packages/workflow-executor/src/cli-core.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import {
type WorkflowExecutor,
} from './build-workflow-executor';
import {
DEFAULT_AI_INVOKE_TIMEOUT_MS,
DEFAULT_FOREST_SERVER_URL,
DEFAULT_HTTP_PORT,
DEFAULT_MAX_CHAIN_DEPTH,
Expand Down Expand Up @@ -158,6 +159,7 @@ export function readEnvConfig(env: NodeJS.ProcessEnv, args: CliArgs): CliConfig
pollingIntervalMs: parsePositiveIntEnv('POLLING_INTERVAL_MS', env.POLLING_INTERVAL_MS),
stopTimeoutMs: parsePositiveIntEnv('STOP_TIMEOUT_MS', env.STOP_TIMEOUT_MS),
stepTimeoutMs: parsePositiveIntEnv('STEP_TIMEOUT_MS', env.STEP_TIMEOUT_MS),
aiInvokeTimeoutMs: parsePositiveIntEnv('AI_INVOKE_TIMEOUT_MS', env.AI_INVOKE_TIMEOUT_MS),
maxChainDepth: parsePositiveIntEnv('MAX_CHAIN_DEPTH', env.MAX_CHAIN_DEPTH),
...(aiConfigurations && { aiConfigurations }),
...(env.FORCE_AI_ERROR === 'true' && { forceAiError: true }),
Expand Down Expand Up @@ -194,6 +196,7 @@ Optional environment variables:
POLLING_INTERVAL_MS Default: ${DEFAULT_POLLING_INTERVAL_MS}
STOP_TIMEOUT_MS Default: ${DEFAULT_STOP_TIMEOUT_MS}
STEP_TIMEOUT_MS Max duration of a step in ms (default: ${DEFAULT_STEP_TIMEOUT_MS})
AI_INVOKE_TIMEOUT_MS Max duration of a single AI provider invocation in ms (default: ${DEFAULT_AI_INVOKE_TIMEOUT_MS})
MAX_CHAIN_DEPTH Max steps auto-executed per run before yielding (default: ${DEFAULT_MAX_CHAIN_DEPTH})
NO_COLOR Set to any value to disable ANSI colors in pretty logs
FORCE_AI_ERROR Set to "true" to make every AI call fail (dev only, to test error paths)
Expand Down
1 change: 1 addition & 0 deletions packages/workflow-executor/src/defaults.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,6 @@ export const DEFAULT_HTTP_PORT = 3400;
export const DEFAULT_FOREST_SERVER_URL = 'https://api.forestadmin.com';
export const DEFAULT_POLLING_INTERVAL_MS = 30_000;
export const DEFAULT_STEP_TIMEOUT_MS = 5 * 60_000;
export const DEFAULT_AI_INVOKE_TIMEOUT_MS = 60_000;
export const DEFAULT_STOP_TIMEOUT_MS = 30_000;
export const DEFAULT_MAX_CHAIN_DEPTH = 50;
12 changes: 12 additions & 0 deletions packages/workflow-executor/src/errors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,18 @@ export class StepTimeoutError extends WorkflowExecutorError {
}
}

// Thrown when the AI provider does not respond within the configured timeout — distinct from
// StepTimeoutError so we can surface a provider-specific message and tune the AI timeout
// independently of the step timeout (AI hangs are common; record fetches are not).
export class AiInvokeTimeoutError extends WorkflowExecutorError {
constructor(timeoutMs: number) {
super(
`AI provider did not respond within ${timeoutMs}ms`,
'The AI provider did not respond in time. Please try again, or contact your administrator if the problem persists.',
);
}
}

export class NoMcpToolsError extends WorkflowExecutorError {
constructor(requestedMcpServerId?: string, loadedMcpServerIds?: readonly string[]) {
const technical = requestedMcpServerId
Expand Down
27 changes: 24 additions & 3 deletions packages/workflow-executor/src/executors/base-step-executor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import type {
import { SystemMessage } from '@forestadmin/ai-proxy';

import {
AiInvokeTimeoutError,
InvalidAiRequestError,
MalformedToolCallError,
MissingToolCallError,
Expand Down Expand Up @@ -328,9 +329,29 @@ export default abstract class BaseStepExecutor<TStep extends StepDefinition = St
): Promise<{ toolName: string; args: T }> {
BaseStepExecutor.assertNoMidArraySystemMessages(messages);
const modelWithTools = this.context.model.bindTools(tools, { tool_choice: 'any' });
const response = await modelWithTools.invoke(
BaseStepExecutor.mergeLeadingSystemMessages(messages),
);
const preparedMessages = BaseStepExecutor.mergeLeadingSystemMessages(messages);
const aiTimeoutMs = this.context.aiInvokeTimeoutMs;

let response;

if (!aiTimeoutMs || aiTimeoutMs <= 0) {
response = await modelWithTools.invoke(preparedMessages);
} else {
const controller = new AbortController();
const timer = setTimeout(() => controller.abort(), aiTimeoutMs);

try {
response = await modelWithTools.invoke(preparedMessages, {
signal: controller.signal,
});
} catch (err) {
if (controller.signal.aborted) throw new AiInvokeTimeoutError(aiTimeoutMs);
throw err;
} finally {
clearTimeout(timer);
}
}

const toolCall = response.tool_calls?.[0];

if (toolCall !== undefined) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ export interface StepContextConfig {
schemaCache: SchemaCache;
logger: Logger;
stepTimeoutMs?: number;
aiInvokeTimeoutMs?: number;
}

export default class StepExecutorFactory {
Expand Down Expand Up @@ -135,6 +136,7 @@ export default class StepExecutorFactory {
logger: cfg.logger,
incomingPendingData,
stepTimeoutMs: cfg.stepTimeoutMs,
aiInvokeTimeoutMs: cfg.aiInvokeTimeoutMs,
activityLogPort,
};
}
Expand Down
5 changes: 5 additions & 0 deletions packages/workflow-executor/src/runner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@ export interface RunnerConfig {
// On timeout the step reports status:error; the underlying work is not aborted (Promise.race
// limitation). Late rejections are caught and logged; late resolutions are silently discarded.
stepTimeoutMs?: number;
// Per-AI-invocation timeout (used by BaseStepExecutor.invokeWithTools). Aborts the underlying
// HTTP request via AbortSignal so a hanging provider is killed quickly, before stepTimeoutMs
// would fire. 0/undefined disables.
aiInvokeTimeoutMs?: number;
// Max number of ADDITIONAL steps auto-chained via /update-step response before yielding to the
// next poll cycle (counted after the initial step). 0 disables chaining entirely. Default 50.
maxChainDepth?: number;
Expand Down Expand Up @@ -416,6 +420,7 @@ export default class Runner {
schemaCache: this.config.schemaCache,
logger: this.logger,
stepTimeoutMs: this.config.stepTimeoutMs,
aiInvokeTimeoutMs: this.config.aiInvokeTimeoutMs,
};
}
}
1 change: 1 addition & 0 deletions packages/workflow-executor/src/types/execution-context.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,5 +41,6 @@ export interface ExecutionContext<TStep extends StepDefinition = StepDefinition>
readonly logger: Logger;
readonly incomingPendingData?: unknown;
readonly stepTimeoutMs?: number;
readonly aiInvokeTimeoutMs?: number;
readonly activityLogPort: ActivityLogPort;
}
108 changes: 108 additions & 0 deletions packages/workflow-executor/test/executors/base-step-executor.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import type { BaseMessage, DynamicStructuredTool } from '@forestadmin/ai-proxy';
import { HumanMessage, SystemMessage } from '@forestadmin/ai-proxy';

import {
AiInvokeTimeoutError,
InvalidAiRequestError,
MalformedToolCallError,
MissingToolCallError,
Expand Down Expand Up @@ -854,6 +855,113 @@ describe('BaseStepExecutor', () => {
);
});
});

describe('AI invoke timeout', () => {
// Mocks a model.invoke that never resolves on its own but rejects with AbortError
// when its received AbortSignal fires — mimics LangChain's behavior on signal.abort().
function makeHangingModel() {
const invoke = jest.fn().mockImplementation(
(_messages, opts) =>
new Promise((_resolve, reject) => {
opts?.signal?.addEventListener('abort', () => {
const err = new Error('Aborted');
err.name = 'AbortError';
reject(err);
});
}),
);

return {
model: {
bindTools: jest.fn().mockReturnValue({ invoke }),
} as unknown as ExecutionContext['model'],
invoke,
};
}

it('throws AiInvokeTimeoutError when model.invoke hangs beyond aiInvokeTimeoutMs', async () => {
jest.useFakeTimers();

try {
const { model } = makeHangingModel();
const executor = new TestableExecutor(makeContext({ model, aiInvokeTimeoutMs: 100 }));
const promise = executor.invokeWithTool(dummyMessages, dummyTool);
// Attach catch synchronously so a late rejection (after advanceTimersByTime) doesn't
// produce an unhandled rejection warning.
const caught = promise.catch(err => err);
jest.advanceTimersByTime(150);
const err = await caught;

expect(err).toBeInstanceOf(AiInvokeTimeoutError);
expect((err as Error).message).toContain('100ms');
} finally {
jest.useRealTimers();
}
});

it('passes the AbortSignal as the second arg to model.invoke', async () => {
const { model, invoke } = makeMockModel({
tool_calls: [{ name: 'tool', args: {}, id: 'c1' }],
});
const executor = new TestableExecutor(makeContext({ model, aiInvokeTimeoutMs: 5_000 }));

await executor.invokeWithTool(dummyMessages, dummyTool);

expect(invoke).toHaveBeenCalledWith(
expect.any(Array),
expect.objectContaining({ signal: expect.any(AbortSignal) }),
);
});

it('does not pass any options to model.invoke when aiInvokeTimeoutMs is unset', async () => {
const { model, invoke } = makeMockModel({
tool_calls: [{ name: 'tool', args: {}, id: 'c1' }],
});
const executor = new TestableExecutor(makeContext({ model, aiInvokeTimeoutMs: undefined }));

await executor.invokeWithTool(dummyMessages, dummyTool);

expect(invoke).toHaveBeenCalledTimes(1);
expect(invoke.mock.calls[0]).toHaveLength(1);
});

it('treats aiInvokeTimeoutMs <= 0 as disabled', async () => {
const { model, invoke } = makeMockModel({
tool_calls: [{ name: 'tool', args: {}, id: 'c1' }],
});
const executor = new TestableExecutor(makeContext({ model, aiInvokeTimeoutMs: 0 }));

await executor.invokeWithTool(dummyMessages, dummyTool);

expect(invoke.mock.calls[0]).toHaveLength(1);
});

it('rethrows non-abort errors without wrapping them', async () => {
const apiError = Object.assign(new Error('OpenAI 503'), { status: 503, name: 'APIError' });
const invoke = jest.fn().mockRejectedValue(apiError);
const model = {
bindTools: jest.fn().mockReturnValue({ invoke }),
} as unknown as ExecutionContext['model'];
const executor = new TestableExecutor(makeContext({ model, aiInvokeTimeoutMs: 5_000 }));

await expect(executor.invokeWithTool(dummyMessages, dummyTool)).rejects.toBe(apiError);
});

it('clears the timer after a successful invoke (no unref leak)', async () => {
const { model } = makeMockModel({
tool_calls: [{ name: 'tool', args: {}, id: 'c1' }],
});
const clearSpy = jest.spyOn(global, 'clearTimeout');
const executor = new TestableExecutor(makeContext({ model, aiInvokeTimeoutMs: 5_000 }));

try {
await executor.invokeWithTool(dummyMessages, dummyTool);
expect(clearSpy).toHaveBeenCalled();
} finally {
clearSpy.mockRestore();
}
});
});
});

describe('patchAndReloadPendingData', () => {
Expand Down
Loading