Skip to content
Open
37 changes: 12 additions & 25 deletions packages/workflow-executor/src/adapters/step-definition-mapper.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
import type {
ServerTaskTypeEnum,
ServerWorkflowCondition,
ServerWorkflowStep,
ServerWorkflowTask,
} from './server-types';
import type { ConditionStepDefinition, StepDefinition } from '../types/validated/step-definition';

import { ServerTaskTypeEnum } from './server-types';
import { InvalidStepDefinitionError, UnsupportedStepTypeError } from '../errors';
import {
ConditionStepDefinitionSchema,
Expand All @@ -18,48 +18,35 @@ import {
UpdateRecordStepDefinitionSchema,
} from '../types/validated/step-definition';

const TASK_TYPE_TO_STEP_TYPE: Record<ServerTaskTypeEnum, StepType> = {
'get-data': StepType.ReadRecord,
'update-data': StepType.UpdateRecord,
'trigger-action': StepType.TriggerAction,
'load-related-record': StepType.LoadRelatedRecord,
'mcp-server': StepType.Mcp,
guideline: StepType.Guidance,
};

function mapTask(task: ServerWorkflowTask): StepDefinition {
const stepType = TASK_TYPE_TO_STEP_TYPE[task.taskType];

if (!stepType) {
throw new InvalidStepDefinitionError(`Unknown taskType: "${task.taskType}"`);
}

// executionType is passed through as-is; each schema's .default().catch() handles
// missing or unsupported values without requiring an explicit mapping here.
const base = { prompt: task.prompt, executionType: task.executionType };

switch (stepType) {
case StepType.Mcp:
switch (task.taskType) {
case ServerTaskTypeEnum.McpServer:
return McpStepDefinitionSchema.parse({
...base,
type: StepType.Mcp,
...('mcpServerId' in task && { mcpServerId: task.mcpServerId }),
mcpServerId: task.mcpServerId,
});
case StepType.Guidance:
case ServerTaskTypeEnum.Guideline:
return GuidanceStepDefinitionSchema.parse({ ...base, type: StepType.Guidance });
case StepType.ReadRecord:
case ServerTaskTypeEnum.GetData:
return ReadRecordStepDefinitionSchema.parse({ ...base, type: StepType.ReadRecord });
case StepType.UpdateRecord:
case ServerTaskTypeEnum.UpdateData:
return UpdateRecordStepDefinitionSchema.parse({ ...base, type: StepType.UpdateRecord });
case StepType.TriggerAction:
case ServerTaskTypeEnum.TriggerAction:
return TriggerActionStepDefinitionSchema.parse({ ...base, type: StepType.TriggerAction });
case StepType.LoadRelatedRecord:
case ServerTaskTypeEnum.LoadRelatedRecord:
return LoadRelatedRecordStepDefinitionSchema.parse({
...base,
type: StepType.LoadRelatedRecord,
});
default:
throw new InvalidStepDefinitionError(`Unmapped step type: "${stepType}"`);
throw new InvalidStepDefinitionError(
`Unknown taskType: "${(task as { taskType: string }).taskType}"`,
);
}
}

Expand Down
12 changes: 5 additions & 7 deletions packages/workflow-executor/src/errors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -210,13 +210,11 @@ export class StepTimeoutError extends WorkflowExecutorError {
}

export class NoMcpToolsError extends WorkflowExecutorError {
constructor(requestedMcpServerId?: string, loadedMcpServerIds?: readonly string[]) {
const technical = requestedMcpServerId
? `No MCP tools available for mcpServerId="${requestedMcpServerId}". Loaded MCP server ids: [${(
loadedMcpServerIds ?? []
).join(', ')}]`
: 'No MCP tools available';
super(technical, 'No tools are available to execute this step.');
constructor(requestedMcpServerId: string) {
super(
`No MCP tools available for mcpServerId="${requestedMcpServerId}"`,
'Tools could not be loaded for the targeted server. Please try again, or contact your administrator if the problem persists.',
);
}
}

Expand Down
32 changes: 10 additions & 22 deletions packages/workflow-executor/src/executors/mcp-step-executor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ export default class McpStepExecutor extends BaseStepExecutor<McpStepDefinition>
}

// Branches B & C -- First call
const tools = this.getFilteredTools();
const tools = this.requireTools();
const { toolName, args } = await this.selectTool(tools);
const selectedTool = tools.find(t => t.base.name === toolName);
if (!selectedTool) throw new McpToolNotFoundError(toolName);
Expand All @@ -107,7 +107,7 @@ export default class McpStepExecutor extends BaseStepExecutor<McpStepDefinition>
target: McpToolCall,
existingExecution?: McpStepExecutionData,
): Promise<StepExecutionResult> {
const tools = this.getFilteredTools();
const tools = this.requireTools();
const tool = tools.find(t => t.base.name === target.name && t.sourceId === target.sourceId);
if (!tool) throw new McpToolNotFoundError(target.name);

Expand Down Expand Up @@ -225,27 +225,15 @@ export default class McpStepExecutor extends BaseStepExecutor<McpStepDefinition>
);
}

private getFilteredTools(): RemoteTool[] {
const { mcpServerId } = this.context.stepDefinition;
const tools = mcpServerId
? this.remoteTools.filter(t => t.mcpServerId === mcpServerId)
: [...this.remoteTools];

if (tools.length === 0) {
const loadedMcpServerIds = this.remoteTools
.map(t => t.mcpServerId)
.filter((value): value is string => !!value);
const error = new NoMcpToolsError(mcpServerId, loadedMcpServerIds);
this.context.logger.error(error.message, {
runId: this.context.runId,
stepId: this.context.stepId,
stepIndex: this.context.stepIndex,
requestedMcpServerId: mcpServerId,
loadedMcpServerIds,
});
throw error;
// Tools are pre-scoped to step.mcpServerId upstream. An empty list means either no config
// matched, or the per-server connection failed at load time (McpClient swallows per-server
// errors). RemoteToolFetcher emits the diagnostic upstream; here we just surface the empty
// case as a domain error so BaseStepExecutor turns it into a step outcome.
private requireTools(): RemoteTool[] {
if (this.remoteTools.length === 0) {
throw new NoMcpToolsError(this.context.stepDefinition.mcpServerId);
}

return tools;
return [...this.remoteTools];
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ export default class StepExecutorFactory {
step: AvailableStepExecution,
contextConfig: StepContextConfig,
activityLogPort: ActivityLogPort,
loadTools: () => Promise<RemoteTool[]>,
fetchRemoteTools: (mcpServerId: string) => Promise<RemoteTool[]>,
incomingPendingData?: unknown,
): Promise<IStepExecutor> {
try {
Expand Down Expand Up @@ -76,11 +76,16 @@ export default class StepExecutorFactory {
return new LoadRelatedRecordStepExecutor(
context as ExecutionContext<LoadRelatedRecordStepDefinition>,
);
case StepType.Mcp:

case StepType.Mcp: {
const mcpContext = context as ExecutionContext<McpStepDefinition>;

return new McpStepExecutor(
context as ExecutionContext<McpStepDefinition>,
await loadTools(),
mcpContext,
await fetchRemoteTools(mcpContext.stepDefinition.mcpServerId),
);
}

case StepType.Guidance:
return new GuidanceStepExecutor(context as ExecutionContext<GuidanceStepDefinition>);
default:
Expand Down
82 changes: 82 additions & 0 deletions packages/workflow-executor/src/remote-tool-fetcher.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
import type { AiModelPort } from './ports/ai-model-port';
import type { Logger } from './ports/logger-port';
import type { WorkflowPort } from './ports/workflow-port';
import type { RemoteTool, ToolConfig } from '@forestadmin/ai-proxy';

// Match by config.id, not by Record key: server names can collide across configs.
export function scopeConfigsToServer(
configs: Record<string, ToolConfig>,
mcpServerId: string,
): Record<string, ToolConfig> {
return Object.fromEntries(Object.entries(configs).filter(([, cfg]) => cfg.id === mcpServerId));
}

export default class RemoteToolFetcher {
private readonly workflowPort: WorkflowPort;
private readonly aiModelPort: AiModelPort;
private readonly logger: Logger;

constructor(workflowPort: WorkflowPort, aiModelPort: AiModelPort, logger: Logger) {
this.workflowPort = workflowPort;
this.aiModelPort = aiModelPort;
this.logger = logger;
}

async fetch(mcpServerId: string): Promise<RemoteTool[]> {
const configs = await this.workflowPort.getMcpServerConfigs();
const scoped = scopeConfigsToServer(configs, mcpServerId);

this.warnMissingTargetServer(configs, scoped, mcpServerId);

if (Object.keys(scoped).length === 0) return [];

const tools = await this.aiModelPort.loadRemoteTools(scoped);

this.errorOnPartialLoadFailure(scoped, tools, mcpServerId);

return tools;
}

// Distinguish "no configs at all" (deployment misconfig) from "configs exist but none match"
// (orchestrator/executor drift on server id) — both yield zero tools, but ops need to know
// which one to fix.
private warnMissingTargetServer(
configs: Record<string, ToolConfig>,
scoped: Record<string, ToolConfig>,
mcpServerId: string,
): void {
if (Object.keys(scoped).length > 0) return;

const availableMcpServerIds = Object.values(configs)
.map(cfg => cfg.id)
.filter((id): id is string => Boolean(id));

this.logger.warn(
Object.keys(configs).length === 0
? 'MCP step targets a server but orchestrator returned no MCP configs'
: 'MCP step targets a server not advertised by the orchestrator',
{ requestedMcpServerId: mcpServerId, availableMcpServerIds },
);
}

// Partial-failure detection: McpClient swallows per-server load errors and returns whatever
// succeeded. Match config.id against tool.mcpServerId — both providers populate it from the
// orchestrator's persisted id, so the check is uniform across MCP and Forest connectors.
private errorOnPartialLoadFailure(
scoped: Record<string, ToolConfig>,
tools: RemoteTool[],
mcpServerId: string,
): void {
const loadedMcpServerIds = new Set(tools.map(t => t.mcpServerId));
const failedConfigNames = Object.entries(scoped)
.filter(([, cfg]) => !loadedMcpServerIds.has(cfg.id))
.map(([name]) => name);

if (failedConfigNames.length === 0) return;

this.logger.error('MCP servers failed to load tools', {
requestedMcpServerId: mcpServerId,
failedConfigNames,
});
}
}
17 changes: 8 additions & 9 deletions packages/workflow-executor/src/runner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import type SchemaCache from './schema-cache';
import type { AvailableStepExecution, StepExecutionResult } from './types/execution-context';
import type { StepExecutionData } from './types/step-execution-data';
import type { StepOutcome } from './types/validated/step-outcome';
import type { RemoteTool } from '@forestadmin/ai-proxy';

import ConsoleLogger from './adapters/console-logger';
import { DEFAULT_MAX_CHAIN_DEPTH, DEFAULT_STOP_TIMEOUT_MS } from './defaults';
Expand All @@ -22,6 +21,7 @@ import {
} from './errors';
import StepExecutorFactory from './executors/step-executor-factory';
import InFlightRunRegistry from './in-flight-run-registry';
import RemoteToolFetcher from './remote-tool-fetcher';
import { stepTypeToOutcomeType } from './types/validated/step-outcome';
import validateSecrets from './validate-secrets';

Expand Down Expand Up @@ -52,11 +52,17 @@ export default class Runner {
private pollingTimer: NodeJS.Timeout | null = null;
private readonly inFlightRuns = new InFlightRunRegistry();
private readonly logger: Logger;
private readonly remoteToolFetcher: RemoteToolFetcher;
private _state: RunnerState = 'idle';

constructor(config: RunnerConfig) {
this.config = config;
this.logger = config.logger ?? new ConsoleLogger();
this.remoteToolFetcher = new RemoteToolFetcher(
config.workflowPort,
config.aiModelPort,
this.logger,
);
}

get state(): RunnerState {
Expand Down Expand Up @@ -251,13 +257,6 @@ export default class Runner {
}
}

private async fetchRemoteTools(): Promise<RemoteTool[]> {
const configs = await this.config.workflowPort.getMcpServerConfigs();
if (Object.keys(configs).length === 0) return [];

return this.config.aiModelPort.loadRemoteTools(configs);
}

private executeStep(
step: AvailableStepExecution,
forestServerToken: string,
Expand Down Expand Up @@ -295,7 +294,7 @@ export default class Runner {
currentStep,
this.contextConfig,
this.config.activityLogPortFactory.forRun(currentToken),
() => this.fetchRemoteTools(),
mcpServerId => this.remoteToolFetcher.fetch(mcpServerId),
currentIncomingData,
);
result = await executor.execute();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ export const McpStepDefinitionSchema = z.object({
.enum([AutomatedWithConfirmation, FullyAutomated])
.default(AutomatedWithConfirmation)
.catch(AutomatedWithConfirmation),
mcpServerId: z.string().optional(),
mcpServerId: z.string().min(1),
});
export type McpStepDefinition = z.infer<typeof McpStepDefinitionSchema>;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,14 +112,10 @@ describe('toStepDefinition', () => {
});
});

it('should map task with mcp-server taskType without mcpServerId', () => {
it('rejects an mcp-server task missing mcpServerId at the zod boundary', () => {
const task = makeTask({ taskType: ServerTaskTypeEnum.McpServer, prompt: 'run mcp' });

expect(toStepDefinition(task)).toEqual({
type: StepType.Mcp,
prompt: 'run mcp',
executionType: ServerStepExecutionTypeEnum.AutomatedWithConfirmation,
});
expect(() => toStepDefinition(task)).toThrow();
});

it('should map task with guideline taskType to guidance', () => {
Expand Down
35 changes: 7 additions & 28 deletions packages/workflow-executor/test/errors.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -70,38 +70,17 @@ describe('extractErrorMessage', () => {
});

describe('NoMcpToolsError', () => {
it('produces a fully generic technical message when no mcpServerId was requested (no filter case)', () => {
const err = new NoMcpToolsError();
it('includes the requested mcpServerId in the technical message', () => {
const err = new NoMcpToolsError('id-missing');

expect(err.message).toBe('No MCP tools available');
expect(err.userMessage).toBe('No tools are available to execute this step.');
expect(err.message).toBe('No MCP tools available for mcpServerId="id-missing"');
});

it('includes the requested mcpServerId in the technical message when a filter was active', () => {
const err = new NoMcpToolsError('id-missing', ['id-A', 'id-B']);
it('keeps the user-facing message free of internal ids', () => {
const err = new NoMcpToolsError('id-missing');

expect(err.message).toMatch(/id-missing/);
});

it('lists the loaded mcpServerIds in the technical message so misconfigurations are diagnosable', () => {
const err = new NoMcpToolsError('id-missing', ['id-A', 'id-B']);

expect(err.message).toMatch(/id-A/);
expect(err.message).toMatch(/id-B/);
});

it('handles an empty loaded-id list without producing a malformed message', () => {
const err = new NoMcpToolsError('id-missing', []);

expect(err.message).toMatch(/id-missing/);
expect(err.message).not.toMatch(/undefined|null|\[object/i);
});

it('keeps the user-facing message generic — no internal ids must leak', () => {
const err = new NoMcpToolsError('id-missing', ['id-A', 'id-B']);

expect(err.userMessage).toBe('No tools are available to execute this step.');
expect(err.userMessage).not.toMatch(/id-missing|id-A|id-B/);
expect(err.userMessage).toMatch(/^Tools could not be loaded for the targeted server\./);
expect(err.userMessage).not.toMatch(/id-missing/);
});
});

Expand Down
Loading
Loading