From f1b6872da3787027ab2b2ecbc698eb9e5c841d8f Mon Sep 17 00:00:00 2001 From: Brun Christophe Date: Thu, 28 May 2026 16:21:46 +0200 Subject: [PATCH 1/7] fix(workflow-executor): scope MCP tool fetch to the step's target server (PRD-363) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Every MCP step opened connections to every configured MCP server and listed their tools, then filtered in-memory by step.mcpServerId. With N configured servers, an MCP step paid N× the connect/list-tools roundtrip cost, hit every upstream server (billable/rate-limited), and over-allocated MCP connections. Filter configs by cfg.id (DB id from PRD-360) inside Runner.fetchRemoteTools before delegating to loadRemoteTools. The MCP branch of StepExecutorFactory forwards step.mcpServerId — the Runner stays generic. The executor's getFilteredTools collapses into requireTools (pre-scoped list → assert non-empty). fixes PRD-363 Co-Authored-By: Claude Opus 4.7 (1M context) --- .../src/executors/mcp-step-executor.ts | 25 +-- .../src/executors/step-executor-factory.ts | 13 +- packages/workflow-executor/src/runner.ts | 14 +- .../test/executors/mcp-step-executor.test.ts | 125 ++--------- .../workflow-executor/test/runner.test.ts | 205 +++++++++++++++++- 5 files changed, 244 insertions(+), 138 deletions(-) diff --git a/packages/workflow-executor/src/executors/mcp-step-executor.ts b/packages/workflow-executor/src/executors/mcp-step-executor.ts index 4477125809..68807559d2 100644 --- a/packages/workflow-executor/src/executors/mcp-step-executor.ts +++ b/packages/workflow-executor/src/executors/mcp-step-executor.ts @@ -82,7 +82,7 @@ export default class McpStepExecutor extends BaseStepExecutor } // Branches B & C -- First call - const tools = this.getFilteredTools(); + const tools = this.requireTools(); const { toolName, args } = await this.selectTool(tools); const selectedTool = tools.find(t => t.base.name === toolName); if (!selectedTool) throw new McpToolNotFoundError(toolName); @@ -107,7 +107,7 @@ export default class McpStepExecutor extends BaseStepExecutor target: McpToolCall, existingExecution?: McpStepExecutionData, ): Promise { - const tools = this.getFilteredTools(); + const tools = this.requireTools(); const tool = tools.find(t => t.base.name === target.name && t.sourceId === target.sourceId); if (!tool) throw new McpToolNotFoundError(target.name); @@ -225,27 +225,22 @@ export default class McpStepExecutor extends BaseStepExecutor ); } - private getFilteredTools(): RemoteTool[] { - const { mcpServerId } = this.context.stepDefinition; - const tools = mcpServerId - ? this.remoteTools.filter(t => t.mcpServerId === mcpServerId) - : [...this.remoteTools]; - - if (tools.length === 0) { - const loadedMcpServerIds = this.remoteTools - .map(t => t.mcpServerId) - .filter((value): value is string => !!value); - const error = new NoMcpToolsError(mcpServerId, loadedMcpServerIds); + // Tools are pre-scoped to step.mcpServerId by Runner.fetchRemoteTools (PRD-363) — the executor + // only has to assert non-empty. An empty list here means: the orchestrator returned no config + // matching the step's mcpServerId, OR every per-server connection failed inside loadRemoteTools. + private requireTools(): RemoteTool[] { + if (this.remoteTools.length === 0) { + const { mcpServerId } = this.context.stepDefinition; + const error = new NoMcpToolsError(mcpServerId, []); this.context.logger.error(error.message, { runId: this.context.runId, stepId: this.context.stepId, stepIndex: this.context.stepIndex, requestedMcpServerId: mcpServerId, - loadedMcpServerIds, }); throw error; } - return tools; + return [...this.remoteTools]; } } diff --git a/packages/workflow-executor/src/executors/step-executor-factory.ts b/packages/workflow-executor/src/executors/step-executor-factory.ts index 736b633215..0d23617907 100644 --- a/packages/workflow-executor/src/executors/step-executor-factory.ts +++ b/packages/workflow-executor/src/executors/step-executor-factory.ts @@ -48,7 +48,7 @@ export default class StepExecutorFactory { step: AvailableStepExecution, contextConfig: StepContextConfig, activityLogPort: ActivityLogPort, - loadTools: () => Promise, + fetchRemoteTools: (mcpServerId?: string) => Promise, incomingPendingData?: unknown, ): Promise { try { @@ -76,11 +76,16 @@ export default class StepExecutorFactory { return new LoadRelatedRecordStepExecutor( context as ExecutionContext, ); - case StepType.Mcp: + + case StepType.Mcp: { + const mcpContext = context as ExecutionContext; + return new McpStepExecutor( - context as ExecutionContext, - await loadTools(), + mcpContext, + await fetchRemoteTools(mcpContext.stepDefinition.mcpServerId), ); + } + case StepType.Guidance: return new GuidanceStepExecutor(context as ExecutionContext); default: diff --git a/packages/workflow-executor/src/runner.ts b/packages/workflow-executor/src/runner.ts index ba8e423e23..38ba65b3f5 100644 --- a/packages/workflow-executor/src/runner.ts +++ b/packages/workflow-executor/src/runner.ts @@ -251,11 +251,17 @@ export default class Runner { } } - private async fetchRemoteTools(): Promise { + // Match by config.id, not by Record key: server names can collide across configs. + private async fetchRemoteTools(mcpServerId?: string): Promise { const configs = await this.config.workflowPort.getMcpServerConfigs(); - if (Object.keys(configs).length === 0) return []; - return this.config.aiModelPort.loadRemoteTools(configs); + const scoped = mcpServerId + ? Object.fromEntries(Object.entries(configs).filter(([, cfg]) => cfg.id === mcpServerId)) + : configs; + + if (Object.keys(scoped).length === 0) return []; + + return this.config.aiModelPort.loadRemoteTools(scoped); } private executeStep( @@ -295,7 +301,7 @@ export default class Runner { currentStep, this.contextConfig, this.config.activityLogPortFactory.forRun(currentToken), - () => this.fetchRemoteTools(), + (mcpServerId?: string) => this.fetchRemoteTools(mcpServerId), currentIncomingData, ); result = await executor.execute(); diff --git a/packages/workflow-executor/test/executors/mcp-step-executor.test.ts b/packages/workflow-executor/test/executors/mcp-step-executor.test.ts index d77cbf67a8..8f030b1143 100644 --- a/packages/workflow-executor/test/executors/mcp-step-executor.test.ts +++ b/packages/workflow-executor/test/executors/mcp-step-executor.test.ts @@ -444,90 +444,30 @@ describe('McpStepExecutor', () => { }); }); - describe('mcpServerId filter (matches by tool.mcpServerId, not tool.sourceId)', () => { - it('passes only tools whose mcpServerId matches step.mcpServerId to the AI', async () => { - const toolA = new MockRemoteTool({ - name: 'tool_a', - sourceId: 'zendesk', - mcpServerId: 'id-A', - }); - const toolB = new MockRemoteTool({ - name: 'tool_b', - sourceId: 'zendesk', - mcpServerId: 'id-B', - }); - const invokeFn = jest.fn().mockResolvedValue('ok'); - const toolB2 = new MockRemoteTool({ - name: 'tool_b2', - sourceId: 'zendesk', - mcpServerId: 'id-B', - invoke: invokeFn, - }); - - const { model, bindTools } = makeMockModel('tool_b', {}); - const runStore = makeMockRunStore(); + // PRD-363: tool-list scoping is performed in Runner.fetchRemoteTools (filter by config.id). + // The executor consumes pre-scoped tools and only asserts non-empty; per-tool re-filtering + // would just be redundant defense-in-depth, so the tests below assert the simpler contract. + describe('forwards all provided remoteTools to the AI', () => { + it('binds every tool it receives — scoping is the Runner-fetch contract, not the executor', async () => { + const toolA = new MockRemoteTool({ name: 'tool_a', mcpServerId: 'id-A' }); + const toolB = new MockRemoteTool({ name: 'tool_b', mcpServerId: 'id-A' }); + const { model, bindTools } = makeMockModel('tool_a', {}); const context = makeContext({ model, - runStore, stepDefinition: makeStep({ - mcpServerId: 'id-B', + mcpServerId: 'id-A', executionType: StepExecutionMode.FullyAutomated, }), }); - const executor = new McpStepExecutor(context, [toolA, toolB, toolB2]); - - await executor.execute(); - - const boundTools = bindTools.mock.calls[0][0] as Array<{ name: string }>; - const boundNames = boundTools.map(t => t.name); - expect(boundNames).not.toContain('tool_a'); - expect(boundNames).toContain('tool_b'); - expect(boundNames).toContain('tool_b2'); - }); - - it('does not match by sourceId — server-name collisions must not leak tools across configs', async () => { - const tool = new MockRemoteTool({ - name: 'tool_a', - sourceId: 'server-B', - mcpServerId: 'id-99', - }); - const context = makeContext({ - stepDefinition: makeStep({ mcpServerId: 'server-B' }), - }); - const executor = new McpStepExecutor(context, [tool]); - - const result = await executor.execute(); - - expect(result.stepOutcome.status).toBe('error'); - expect(result.stepOutcome.error).toBe('No tools are available to execute this step.'); - }); - - it('returns all tools (no filter) when step.mcpServerId is absent', async () => { - const toolA = new MockRemoteTool({ - name: 'tool_a', - sourceId: 'server-A', - mcpServerId: 'id-A', - }); - const toolB = new MockRemoteTool({ - name: 'tool_b', - sourceId: 'server-B', - mcpServerId: 'id-B', - }); - const { model, bindTools } = makeMockModel('tool_a', {}); - const context = makeContext({ - model, - stepDefinition: makeStep({ executionType: StepExecutionMode.FullyAutomated }), - }); const executor = new McpStepExecutor(context, [toolA, toolB]); await executor.execute(); const boundTools = bindTools.mock.calls[0][0] as Array<{ name: string }>; - const boundNames = boundTools.map(t => t.name); - expect(boundNames).toEqual(expect.arrayContaining(['tool_a', 'tool_b'])); + expect(boundTools.map(t => t.name)).toEqual(expect.arrayContaining(['tool_a', 'tool_b'])); }); - it('resolves a Forest-connector-backed tool when its mcpServerId is threaded through', async () => { + it('resolves a Forest-connector-backed tool end-to-end', async () => { const invokeFn = jest.fn().mockResolvedValue('done'); const forestTool = new MockRemoteTool({ name: 'zendesk_get_tickets', @@ -565,31 +505,9 @@ describe('McpStepExecutor', () => { expect(result.stepOutcome.error).toBe('No tools are available to execute this step.'); }); - it('returns error when mcpServerId filter yields no tools', async () => { - const tool = new MockRemoteTool({ - name: 'tool_a', - sourceId: 'server-A', - mcpServerId: 'id-A', - }); - const context = makeContext({ - stepDefinition: makeStep({ mcpServerId: 'id-B' }), - }); - const executor = new McpStepExecutor(context, [tool]); - - const result = await executor.execute(); - - expect(result.stepOutcome.status).toBe('error'); - expect(result.stepOutcome.error).toBe('No tools are available to execute this step.'); - }); - it('keeps the user-facing error message generic regardless of the misconfigured mcpServerId', async () => { - const tool = new MockRemoteTool({ - name: 'tool_a', - sourceId: 'server-A', - mcpServerId: 'id-A', - }); const context = makeContext({ stepDefinition: makeStep({ mcpServerId: 'id-B' }) }); - const executor = new McpStepExecutor(context, [tool]); + const executor = new McpStepExecutor(context, []); const result = await executor.execute(); @@ -597,32 +515,19 @@ describe('McpStepExecutor', () => { expect(result.stepOutcome.error).not.toMatch(/id-B/); }); - it('logs the technical message with the requested mcpServerId and loaded mcpServerIds when filter misses', async () => { + it('logs the technical message with the requested mcpServerId when tools are empty', async () => { const logger = { info: jest.fn(), warn: jest.fn(), error: jest.fn() }; - const toolA = new MockRemoteTool({ - name: 'tool_a', - sourceId: 'server-A', - mcpServerId: 'id-A', - }); - const toolB = new MockRemoteTool({ - name: 'tool_b', - sourceId: 'server-B', - mcpServerId: 'id-B', - }); const context = makeContext({ logger, stepDefinition: makeStep({ mcpServerId: 'id-missing' }), }); - const executor = new McpStepExecutor(context, [toolA, toolB]); + const executor = new McpStepExecutor(context, []); await executor.execute(); expect(logger.error).toHaveBeenCalledWith( expect.stringMatching(/id-missing/), - expect.objectContaining({ - requestedMcpServerId: 'id-missing', - loadedMcpServerIds: expect.arrayContaining(['id-A', 'id-B']), - }), + expect.objectContaining({ requestedMcpServerId: 'id-missing' }), ); }); }); diff --git a/packages/workflow-executor/test/runner.test.ts b/packages/workflow-executor/test/runner.test.ts index f578c5daa9..29da022729 100644 --- a/packages/workflow-executor/test/runner.test.ts +++ b/packages/workflow-executor/test/runner.test.ts @@ -1212,6 +1212,187 @@ describe('MCP lazy loading (via once thunk)', () => { }); }); +// --------------------------------------------------------------------------- +// PRD-363: MCP fetch scoping +// --------------------------------------------------------------------------- + +describe('MCP fetch scoping (PRD-363)', () => { + it('passes only the matching config to loadRemoteTools when step.mcpServerId is set', async () => { + const workflowPort = createMockWorkflowPort(); + const aiClient = createMockAiClient(); + const step = makePendingStep({ + runId: 'run-1', + stepId: 'step-mcp-1', + stepType: StepType.Mcp, + stepDefinition: { + type: StepType.Mcp, + executionType: StepExecutionMode.AutomatedWithConfirmation, + mcpServerId: 'id-A', + }, + }); + workflowPort.getAvailableRun.mockResolvedValue({ + step, + auth: { forestServerToken: 'test-forest-token' }, + }); + workflowPort.getMcpServerConfigs.mockResolvedValue({ + 'server-A': { id: 'id-A', url: 'https://a.example', type: 'http', headers: {} }, + 'server-B': { id: 'id-B', url: 'https://b.example', type: 'http', headers: {} }, + }); + + runner = new Runner( + createRunnerConfig({ workflowPort, aiModelPort: aiClient as unknown as AiModelPort }), + ); + await runner.triggerPoll('run-1'); + + expect(aiClient.loadRemoteTools).toHaveBeenCalledTimes(1); + expect(aiClient.loadRemoteTools).toHaveBeenCalledWith({ + 'server-A': expect.objectContaining({ id: 'id-A' }), + }); + }); + + // Matching by Record key would let a renamed server collide with another config's id. + it('matches by config.id, not by the Record key (server name)', async () => { + const workflowPort = createMockWorkflowPort(); + const aiClient = createMockAiClient(); + const step = makePendingStep({ + runId: 'run-1', + stepId: 'step-mcp-1', + stepType: StepType.Mcp, + stepDefinition: { + type: StepType.Mcp, + executionType: StepExecutionMode.AutomatedWithConfirmation, + // mcpServerId resembles a server name from another entry, but must match by id. + mcpServerId: 'server-A', + }, + }); + workflowPort.getAvailableRun.mockResolvedValue({ + step, + auth: { forestServerToken: 'test-forest-token' }, + }); + workflowPort.getMcpServerConfigs.mockResolvedValue({ + 'server-A': { id: 'id-A', url: 'https://a.example', type: 'http', headers: {} }, + 'server-B': { id: 'server-A', url: 'https://b.example', type: 'http', headers: {} }, + }); + + runner = new Runner( + createRunnerConfig({ workflowPort, aiModelPort: aiClient as unknown as AiModelPort }), + ); + await runner.triggerPoll('run-1'); + + expect(aiClient.loadRemoteTools).toHaveBeenCalledWith({ + 'server-B': expect.objectContaining({ id: 'server-A' }), + }); + }); + + it('skips loadRemoteTools when no config matches the step.mcpServerId', async () => { + const workflowPort = createMockWorkflowPort(); + const aiClient = createMockAiClient(); + const step = makePendingStep({ + runId: 'run-1', + stepId: 'step-mcp-1', + stepType: StepType.Mcp, + stepDefinition: { + type: StepType.Mcp, + executionType: StepExecutionMode.AutomatedWithConfirmation, + mcpServerId: 'id-missing', + }, + }); + workflowPort.getAvailableRun.mockResolvedValue({ + step, + auth: { forestServerToken: 'test-forest-token' }, + }); + workflowPort.getMcpServerConfigs.mockResolvedValue({ + 'server-A': { id: 'id-A', url: 'https://a.example', type: 'http', headers: {} }, + }); + + runner = new Runner( + createRunnerConfig({ workflowPort, aiModelPort: aiClient as unknown as AiModelPort }), + ); + await runner.triggerPoll('run-1'); + + expect(workflowPort.getMcpServerConfigs).toHaveBeenCalledTimes(1); + expect(aiClient.loadRemoteTools).not.toHaveBeenCalled(); + }); + + // Branch A re-entry: the same run can be re-dispatched (chained or triggered after + // awaiting-input). Each dispatch must re-fetch tools scoped to its own step's mcpServerId. + it('scopes loadRemoteTools per dispatch on chained MCP steps (Branch A re-entry)', async () => { + const workflowPort = createMockWorkflowPort(); + const aiClient = createMockAiClient(); + const mcpDef = (id: string) => + ({ + type: StepType.Mcp, + executionType: StepExecutionMode.AutomatedWithConfirmation, + mcpServerId: id, + } as const); + const initial = makePendingStep({ + runId: 'run-1', + stepId: 'step-0', + stepIndex: 0, + stepType: StepType.Mcp, + stepDefinition: mcpDef('id-A'), + }); + const chained = makePendingStep({ + runId: 'run-1', + stepId: 'step-1', + stepIndex: 1, + stepType: StepType.Mcp, + stepDefinition: mcpDef('id-A'), + }); + workflowPort.getAvailableRun.mockResolvedValueOnce({ + step: initial, + auth: { forestServerToken: 'token-0' }, + }); + workflowPort.updateStepExecution + .mockResolvedValueOnce({ step: chained, auth: { forestServerToken: 'token-1' } }) + .mockResolvedValueOnce(null); + workflowPort.getMcpServerConfigs.mockResolvedValue({ + 'server-A': { id: 'id-A', url: 'https://a.example', type: 'http', headers: {} }, + 'server-B': { id: 'id-B', url: 'https://b.example', type: 'http', headers: {} }, + }); + + runner = new Runner( + createRunnerConfig({ workflowPort, aiModelPort: aiClient as unknown as AiModelPort }), + ); + await runner.triggerPoll('run-1'); + + expect(aiClient.loadRemoteTools).toHaveBeenCalledTimes(2); + expect(aiClient.loadRemoteTools).toHaveBeenNthCalledWith(1, { + 'server-A': expect.objectContaining({ id: 'id-A' }), + }); + expect(aiClient.loadRemoteTools).toHaveBeenNthCalledWith(2, { + 'server-A': expect.objectContaining({ id: 'id-A' }), + }); + }); + + it('passes the full Record when step.mcpServerId is absent (legacy fallback)', async () => { + const workflowPort = createMockWorkflowPort(); + const aiClient = createMockAiClient(); + const step = makePendingStep({ + runId: 'run-1', + stepId: 'step-mcp-1', + stepType: StepType.Mcp, + // No mcpServerId on the step definition — preserves legacy behaviour. + }); + workflowPort.getAvailableRun.mockResolvedValue({ + step, + auth: { forestServerToken: 'test-forest-token' }, + }); + const allConfigs = { + 'server-A': { id: 'id-A', url: 'https://a.example', type: 'http' as const, headers: {} }, + 'server-B': { id: 'id-B', url: 'https://b.example', type: 'http' as const, headers: {} }, + }; + workflowPort.getMcpServerConfigs.mockResolvedValue(allConfigs); + + runner = new Runner( + createRunnerConfig({ workflowPort, aiModelPort: aiClient as unknown as AiModelPort }), + ); + await runner.triggerPoll('run-1'); + + expect(aiClient.loadRemoteTools).toHaveBeenCalledWith(allConfigs); + }); +}); + // --------------------------------------------------------------------------- // getExecutor — factory // --------------------------------------------------------------------------- @@ -1289,17 +1470,31 @@ describe('StepExecutorFactory.create — factory', () => { expect(executor).toBeInstanceOf(LoadRelatedRecordStepExecutor); }); - it('dispatches McpTask steps to McpStepExecutor and calls loadTools', async () => { - const step = makePendingStep({ stepType: StepType.Mcp }); - const loadTools = jest.fn().mockResolvedValue([]); + it('dispatches McpTask steps to McpStepExecutor and forwards step.mcpServerId to fetchRemoteTools', async () => { + const step = makePendingStep({ + stepType: StepType.Mcp, + stepDefinition: { + type: StepType.Mcp, + executionType: StepExecutionMode.AutomatedWithConfirmation, + mcpServerId: 'srv-42', + }, + }); + const fetchRemoteTools = jest.fn().mockResolvedValue([]); const executor = await StepExecutorFactory.create( step, makeContextConfig(), makeRunLogger(), - loadTools, + fetchRemoteTools, ); expect(executor).toBeInstanceOf(McpStepExecutor); - expect(loadTools).toHaveBeenCalledTimes(1); + expect(fetchRemoteTools).toHaveBeenCalledWith('srv-42'); + }); + + it('forwards undefined to fetchRemoteTools when McpTask step has no mcpServerId (legacy)', async () => { + const step = makePendingStep({ stepType: StepType.Mcp }); + const fetchRemoteTools = jest.fn().mockResolvedValue([]); + await StepExecutorFactory.create(step, makeContextConfig(), makeRunLogger(), fetchRemoteTools); + expect(fetchRemoteTools).toHaveBeenCalledWith(undefined); }); it('dispatches Guidance steps to GuidanceStepExecutor', async () => { From 8f79aafdddba8be6684733bb6739094ebea683c2 Mon Sep 17 00:00:00 2001 From: Brun Christophe Date: Thu, 28 May 2026 16:57:02 +0200 Subject: [PATCH 2/7] fix(workflow-executor): tighten MCP scoping tests and diagnostics MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Address review feedback: - Strengthen tests with divergent mcpServerId fixtures so a future re-introduced filter or per-dispatch memoization regression fails. - Update the MCP integration test to exercise the cfg.id matching. - Surface a warn log when the orchestrator returns configs but none matches the step's mcpServerId — disambiguates "no configs" from "missing server config" in ops logs. - Strip ticket refs and internal taxonomy from inline comments. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../src/executors/mcp-step-executor.ts | 6 +++--- packages/workflow-executor/src/runner.ts | 9 +++++++++ .../test/executors/mcp-step-executor.test.ts | 14 +++++++------- .../test/integration/workflow-execution.test.ts | 5 ++++- packages/workflow-executor/test/runner.test.ts | 14 ++++---------- 5 files changed, 27 insertions(+), 21 deletions(-) diff --git a/packages/workflow-executor/src/executors/mcp-step-executor.ts b/packages/workflow-executor/src/executors/mcp-step-executor.ts index 68807559d2..714511e45c 100644 --- a/packages/workflow-executor/src/executors/mcp-step-executor.ts +++ b/packages/workflow-executor/src/executors/mcp-step-executor.ts @@ -225,9 +225,9 @@ export default class McpStepExecutor extends BaseStepExecutor ); } - // Tools are pre-scoped to step.mcpServerId by Runner.fetchRemoteTools (PRD-363) — the executor - // only has to assert non-empty. An empty list here means: the orchestrator returned no config - // matching the step's mcpServerId, OR every per-server connection failed inside loadRemoteTools. + // Tools are pre-scoped to step.mcpServerId upstream; the executor only asserts non-empty. + // An empty list means either no config matched the step's mcpServerId, or every per-server + // connection failed at load time. private requireTools(): RemoteTool[] { if (this.remoteTools.length === 0) { const { mcpServerId } = this.context.stepDefinition; diff --git a/packages/workflow-executor/src/runner.ts b/packages/workflow-executor/src/runner.ts index 38ba65b3f5..6ed826965d 100644 --- a/packages/workflow-executor/src/runner.ts +++ b/packages/workflow-executor/src/runner.ts @@ -259,6 +259,15 @@ export default class Runner { ? Object.fromEntries(Object.entries(configs).filter(([, cfg]) => cfg.id === mcpServerId)) : configs; + // Disambiguate "no configs at all" from "configs returned but none matched" — the latter + // indicates the orchestrator did not advertise the server the step references. + if (mcpServerId && Object.keys(configs).length > 0 && Object.keys(scoped).length === 0) { + this.logger.warn('MCP step targets a server not advertised by the orchestrator', { + requestedMcpServerId: mcpServerId, + availableMcpServerIds: Object.values(configs).map(cfg => cfg.id), + }); + } + if (Object.keys(scoped).length === 0) return []; return this.config.aiModelPort.loadRemoteTools(scoped); diff --git a/packages/workflow-executor/test/executors/mcp-step-executor.test.ts b/packages/workflow-executor/test/executors/mcp-step-executor.test.ts index 8f030b1143..b2ab9a4234 100644 --- a/packages/workflow-executor/test/executors/mcp-step-executor.test.ts +++ b/packages/workflow-executor/test/executors/mcp-step-executor.test.ts @@ -444,13 +444,13 @@ describe('McpStepExecutor', () => { }); }); - // PRD-363: tool-list scoping is performed in Runner.fetchRemoteTools (filter by config.id). - // The executor consumes pre-scoped tools and only asserts non-empty; per-tool re-filtering - // would just be redundant defense-in-depth, so the tests below assert the simpler contract. describe('forwards all provided remoteTools to the AI', () => { - it('binds every tool it receives — scoping is the Runner-fetch contract, not the executor', async () => { - const toolA = new MockRemoteTool({ name: 'tool_a', mcpServerId: 'id-A' }); - const toolB = new MockRemoteTool({ name: 'tool_b', mcpServerId: 'id-A' }); + // Tools are pre-scoped upstream — the executor must not re-filter. Mixing divergent + // mcpServerId values in the input asserts the executor passes every tool through, even + // ones that wouldn't match the step's mcpServerId on their own. + it('binds every tool it receives, including ones whose mcpServerId differs from the step', async () => { + const matchingTool = new MockRemoteTool({ name: 'tool_a', mcpServerId: 'id-A' }); + const offTargetTool = new MockRemoteTool({ name: 'tool_b', mcpServerId: 'id-B' }); const { model, bindTools } = makeMockModel('tool_a', {}); const context = makeContext({ model, @@ -459,7 +459,7 @@ describe('McpStepExecutor', () => { executionType: StepExecutionMode.FullyAutomated, }), }); - const executor = new McpStepExecutor(context, [toolA, toolB]); + const executor = new McpStepExecutor(context, [matchingTool, offTargetTool]); await executor.execute(); diff --git a/packages/workflow-executor/test/integration/workflow-execution.test.ts b/packages/workflow-executor/test/integration/workflow-execution.test.ts index 80f86192e4..0f8be78830 100644 --- a/packages/workflow-executor/test/integration/workflow-execution.test.ts +++ b/packages/workflow-executor/test/integration/workflow-execution.test.ts @@ -583,6 +583,7 @@ describe('workflow execution (integration)', () => { type: StepType.Mcp, executionType: StepExecutionMode.AutomatedWithConfirmation, prompt: 'Send a notification', + mcpServerId: 'mcp-1', }, }); @@ -590,7 +591,9 @@ describe('workflow execution (integration)', () => { getAvailableRun: jest .fn() .mockResolvedValue({ step, auth: { forestServerToken: 'test-forest-token' } }), - getMcpServerConfigs: jest.fn().mockResolvedValue({ 'mcp-1': { url: 'http://fake' } }), + getMcpServerConfigs: jest + .fn() + .mockResolvedValue({ 'mcp-server-1': { id: 'mcp-1', url: 'http://fake' } }), }); const { server, runStore } = createIntegrationSetup({ diff --git a/packages/workflow-executor/test/runner.test.ts b/packages/workflow-executor/test/runner.test.ts index 29da022729..009a149f53 100644 --- a/packages/workflow-executor/test/runner.test.ts +++ b/packages/workflow-executor/test/runner.test.ts @@ -1212,11 +1212,7 @@ describe('MCP lazy loading (via once thunk)', () => { }); }); -// --------------------------------------------------------------------------- -// PRD-363: MCP fetch scoping -// --------------------------------------------------------------------------- - -describe('MCP fetch scoping (PRD-363)', () => { +describe('MCP fetch scoping', () => { it('passes only the matching config to loadRemoteTools when step.mcpServerId is set', async () => { const workflowPort = createMockWorkflowPort(); const aiClient = createMockAiClient(); @@ -1314,9 +1310,7 @@ describe('MCP fetch scoping (PRD-363)', () => { expect(aiClient.loadRemoteTools).not.toHaveBeenCalled(); }); - // Branch A re-entry: the same run can be re-dispatched (chained or triggered after - // awaiting-input). Each dispatch must re-fetch tools scoped to its own step's mcpServerId. - it('scopes loadRemoteTools per dispatch on chained MCP steps (Branch A re-entry)', async () => { + it('re-scopes loadRemoteTools per dispatch when chained MCP steps target different servers', async () => { const workflowPort = createMockWorkflowPort(); const aiClient = createMockAiClient(); const mcpDef = (id: string) => @@ -1337,7 +1331,7 @@ describe('MCP fetch scoping (PRD-363)', () => { stepId: 'step-1', stepIndex: 1, stepType: StepType.Mcp, - stepDefinition: mcpDef('id-A'), + stepDefinition: mcpDef('id-B'), }); workflowPort.getAvailableRun.mockResolvedValueOnce({ step: initial, @@ -1361,7 +1355,7 @@ describe('MCP fetch scoping (PRD-363)', () => { 'server-A': expect.objectContaining({ id: 'id-A' }), }); expect(aiClient.loadRemoteTools).toHaveBeenNthCalledWith(2, { - 'server-A': expect.objectContaining({ id: 'id-A' }), + 'server-B': expect.objectContaining({ id: 'id-B' }), }); }); From fad8801a8e9e2e467a760fd6571e996ba511fa51 Mon Sep 17 00:00:00 2001 From: Brun Christophe Date: Thu, 28 May 2026 17:03:08 +0200 Subject: [PATCH 3/7] fix(workflow-executor): tighten comments per second-pass review Tighten the disambiguation comment on the diagnostic warn log to lead with the WHY, and drop a marginal restatement comment in the legacy- fallback test. Co-Authored-By: Claude Opus 4.7 (1M context) --- packages/workflow-executor/src/runner.ts | 4 ++-- packages/workflow-executor/test/runner.test.ts | 1 - 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/packages/workflow-executor/src/runner.ts b/packages/workflow-executor/src/runner.ts index 6ed826965d..945fa40613 100644 --- a/packages/workflow-executor/src/runner.ts +++ b/packages/workflow-executor/src/runner.ts @@ -259,8 +259,8 @@ export default class Runner { ? Object.fromEntries(Object.entries(configs).filter(([, cfg]) => cfg.id === mcpServerId)) : configs; - // Disambiguate "no configs at all" from "configs returned but none matched" — the latter - // indicates the orchestrator did not advertise the server the step references. + // The orchestrator returned configs but none advertised the step's target server — + // surface distinctly from the "no configs at all" case. if (mcpServerId && Object.keys(configs).length > 0 && Object.keys(scoped).length === 0) { this.logger.warn('MCP step targets a server not advertised by the orchestrator', { requestedMcpServerId: mcpServerId, diff --git a/packages/workflow-executor/test/runner.test.ts b/packages/workflow-executor/test/runner.test.ts index 009a149f53..b538aa6e56 100644 --- a/packages/workflow-executor/test/runner.test.ts +++ b/packages/workflow-executor/test/runner.test.ts @@ -1366,7 +1366,6 @@ describe('MCP fetch scoping', () => { runId: 'run-1', stepId: 'step-mcp-1', stepType: StepType.Mcp, - // No mcpServerId on the step definition — preserves legacy behaviour. }); workflowPort.getAvailableRun.mockResolvedValue({ step, From 96ca091f3ee323f6be7d4c5db95ccd295b6927da Mon Sep 17 00:00:00 2001 From: Brun Christophe Date: Thu, 28 May 2026 17:31:27 +0200 Subject: [PATCH 4/7] fix(workflow-executor): harden MCP fetch scoping diagnostics per review - Distinguish "no configs at all" vs "no match" warns in Runner.fetchRemoteTools. - Detect partial load failure (case b): error-log failed sourceIds so ops can tell "wrong target server" from "MCP server down". - Warn about MCP configs lacking an id when mcpServerId is set (partial PRD-360 migration); filter undefined out of availableMcpServerIds payload. - Drop dead loadedMcpServerIds param from NoMcpToolsError; remove duplicate log in McpStepExecutor.requireTools (BaseStepExecutor already logs the error). - Strengthen tests: assert warn payloads in runner.test.ts, scoped call in the integration test, rename stale test in mcp-step-executor.test.ts. Co-Authored-By: Claude Opus 4.7 (1M context) --- packages/workflow-executor/src/errors.ts | 6 +- .../src/executors/mcp-step-executor.ts | 15 +- packages/workflow-executor/src/runner.ts | 53 +++++- .../workflow-executor/test/errors.test.ts | 22 +-- .../test/executors/mcp-step-executor.test.ts | 12 +- .../integration/workflow-execution.test.ts | 13 +- .../workflow-executor/test/runner.test.ts | 170 +++++++++++++++++- 7 files changed, 242 insertions(+), 49 deletions(-) diff --git a/packages/workflow-executor/src/errors.ts b/packages/workflow-executor/src/errors.ts index 4a8038a610..8a8a8cf49c 100644 --- a/packages/workflow-executor/src/errors.ts +++ b/packages/workflow-executor/src/errors.ts @@ -210,11 +210,9 @@ export class StepTimeoutError extends WorkflowExecutorError { } export class NoMcpToolsError extends WorkflowExecutorError { - constructor(requestedMcpServerId?: string, loadedMcpServerIds?: readonly string[]) { + constructor(requestedMcpServerId?: string) { const technical = requestedMcpServerId - ? `No MCP tools available for mcpServerId="${requestedMcpServerId}". Loaded MCP server ids: [${( - loadedMcpServerIds ?? [] - ).join(', ')}]` + ? `No MCP tools available for mcpServerId="${requestedMcpServerId}"` : 'No MCP tools available'; super(technical, 'No tools are available to execute this step.'); } diff --git a/packages/workflow-executor/src/executors/mcp-step-executor.ts b/packages/workflow-executor/src/executors/mcp-step-executor.ts index 714511e45c..c42190b018 100644 --- a/packages/workflow-executor/src/executors/mcp-step-executor.ts +++ b/packages/workflow-executor/src/executors/mcp-step-executor.ts @@ -226,19 +226,12 @@ export default class McpStepExecutor extends BaseStepExecutor } // Tools are pre-scoped to step.mcpServerId upstream; the executor only asserts non-empty. - // An empty list means either no config matched the step's mcpServerId, or every per-server - // connection failed at load time. + // An empty list means either no config matched the step's mcpServerId, or the per-server MCP + // connection failed at load time (McpClient swallows per-server load errors). The runner logs + // the diagnostic at the right layer; here we rely on BaseStepExecutor's catch-and-log. private requireTools(): RemoteTool[] { if (this.remoteTools.length === 0) { - const { mcpServerId } = this.context.stepDefinition; - const error = new NoMcpToolsError(mcpServerId, []); - this.context.logger.error(error.message, { - runId: this.context.runId, - stepId: this.context.stepId, - stepIndex: this.context.stepIndex, - requestedMcpServerId: mcpServerId, - }); - throw error; + throw new NoMcpToolsError(this.context.stepDefinition.mcpServerId); } return [...this.remoteTools]; diff --git a/packages/workflow-executor/src/runner.ts b/packages/workflow-executor/src/runner.ts index 945fa40613..0cc9dc4ed6 100644 --- a/packages/workflow-executor/src/runner.ts +++ b/packages/workflow-executor/src/runner.ts @@ -255,22 +255,59 @@ export default class Runner { private async fetchRemoteTools(mcpServerId?: string): Promise { const configs = await this.config.workflowPort.getMcpServerConfigs(); + if (mcpServerId) { + // Configs without id cannot be matched against a defined mcpServerId. Surface them so a + // partial PRD-360 migration doesn't masquerade as "wrong target server" downstream. + const unidentifiedConfigNames = Object.entries(configs) + .filter(([, cfg]) => cfg.id === undefined) + .map(([name]) => name); + + if (unidentifiedConfigNames.length > 0) { + this.logger.warn('MCP configs without id cannot be scoped — check orchestrator migration', { + requestedMcpServerId: mcpServerId, + unidentifiedConfigNames, + }); + } + } + const scoped = mcpServerId ? Object.fromEntries(Object.entries(configs).filter(([, cfg]) => cfg.id === mcpServerId)) : configs; - // The orchestrator returned configs but none advertised the step's target server — - // surface distinctly from the "no configs at all" case. - if (mcpServerId && Object.keys(configs).length > 0 && Object.keys(scoped).length === 0) { - this.logger.warn('MCP step targets a server not advertised by the orchestrator', { - requestedMcpServerId: mcpServerId, - availableMcpServerIds: Object.values(configs).map(cfg => cfg.id), - }); + if (mcpServerId && Object.keys(scoped).length === 0) { + const availableMcpServerIds = Object.values(configs) + .map(cfg => cfg.id) + .filter((id): id is string => Boolean(id)); + + // Distinguish "no configs at all" (deployment misconfig) from "configs exist but none + // match" (orchestrator/executor drift on server id) — both yield zero tools, but ops + // need to know which one to fix. + this.logger.warn( + Object.keys(configs).length === 0 + ? 'MCP step targets a server but orchestrator returned no MCP configs' + : 'MCP step targets a server not advertised by the orchestrator', + { requestedMcpServerId: mcpServerId, availableMcpServerIds }, + ); } if (Object.keys(scoped).length === 0) return []; - return this.config.aiModelPort.loadRemoteTools(scoped); + const tools = await this.config.aiModelPort.loadRemoteTools(scoped); + + // Partial-failure detection: McpClient swallows per-server load errors and returns whatever + // succeeded. Compare scoped keys to the tools' sourceIds so ops can tell "wrong config" + // from "MCP server down". + const loadedSourceIds = new Set(tools.map(t => t.sourceId)); + const failedSourceIds = Object.keys(scoped).filter(name => !loadedSourceIds.has(name)); + + if (failedSourceIds.length > 0) { + this.logger.error('MCP servers failed to load tools', { + requestedMcpServerId: mcpServerId ?? null, + failedSourceIds, + }); + } + + return tools; } private executeStep( diff --git a/packages/workflow-executor/test/errors.test.ts b/packages/workflow-executor/test/errors.test.ts index aa01a0423e..b4c0883712 100644 --- a/packages/workflow-executor/test/errors.test.ts +++ b/packages/workflow-executor/test/errors.test.ts @@ -78,30 +78,16 @@ describe('NoMcpToolsError', () => { }); it('includes the requested mcpServerId in the technical message when a filter was active', () => { - const err = new NoMcpToolsError('id-missing', ['id-A', 'id-B']); + const err = new NoMcpToolsError('id-missing'); - expect(err.message).toMatch(/id-missing/); - }); - - it('lists the loaded mcpServerIds in the technical message so misconfigurations are diagnosable', () => { - const err = new NoMcpToolsError('id-missing', ['id-A', 'id-B']); - - expect(err.message).toMatch(/id-A/); - expect(err.message).toMatch(/id-B/); - }); - - it('handles an empty loaded-id list without producing a malformed message', () => { - const err = new NoMcpToolsError('id-missing', []); - - expect(err.message).toMatch(/id-missing/); - expect(err.message).not.toMatch(/undefined|null|\[object/i); + expect(err.message).toBe('No MCP tools available for mcpServerId="id-missing"'); }); it('keeps the user-facing message generic — no internal ids must leak', () => { - const err = new NoMcpToolsError('id-missing', ['id-A', 'id-B']); + const err = new NoMcpToolsError('id-missing'); expect(err.userMessage).toBe('No tools are available to execute this step.'); - expect(err.userMessage).not.toMatch(/id-missing|id-A|id-B/); + expect(err.userMessage).not.toMatch(/id-missing/); }); }); diff --git a/packages/workflow-executor/test/executors/mcp-step-executor.test.ts b/packages/workflow-executor/test/executors/mcp-step-executor.test.ts index b2ab9a4234..0352729ea8 100644 --- a/packages/workflow-executor/test/executors/mcp-step-executor.test.ts +++ b/packages/workflow-executor/test/executors/mcp-step-executor.test.ts @@ -505,7 +505,7 @@ describe('McpStepExecutor', () => { expect(result.stepOutcome.error).toBe('No tools are available to execute this step.'); }); - it('keeps the user-facing error message generic regardless of the misconfigured mcpServerId', async () => { + it('keeps the user-facing error message generic when no tools are available', async () => { const context = makeContext({ stepDefinition: makeStep({ mcpServerId: 'id-B' }) }); const executor = new McpStepExecutor(context, []); @@ -525,9 +525,15 @@ describe('McpStepExecutor', () => { await executor.execute(); + // BaseStepExecutor catches NoMcpToolsError and logs error.message (which encodes the + // requested mcpServerId) along with the step correlation context. expect(logger.error).toHaveBeenCalledWith( - expect.stringMatching(/id-missing/), - expect.objectContaining({ requestedMcpServerId: 'id-missing' }), + 'No MCP tools available for mcpServerId="id-missing"', + expect.objectContaining({ + runId: expect.any(String), + stepId: expect.any(String), + stepIndex: expect.any(Number), + }), ); }); }); diff --git a/packages/workflow-executor/test/integration/workflow-execution.test.ts b/packages/workflow-executor/test/integration/workflow-execution.test.ts index 0f8be78830..e689daf96e 100644 --- a/packages/workflow-executor/test/integration/workflow-execution.test.ts +++ b/packages/workflow-executor/test/integration/workflow-execution.test.ts @@ -591,9 +591,12 @@ describe('workflow execution (integration)', () => { getAvailableRun: jest .fn() .mockResolvedValue({ step, auth: { forestServerToken: 'test-forest-token' } }), - getMcpServerConfigs: jest - .fn() - .mockResolvedValue({ 'mcp-server-1': { id: 'mcp-1', url: 'http://fake' } }), + // Two configs but only one matches step.mcpServerId — the assertion below proves + // Runner.fetchRemoteTools actually scopes the Record before calling loadRemoteTools. + getMcpServerConfigs: jest.fn().mockResolvedValue({ + 'mcp-server-1': { id: 'mcp-1', url: 'http://fake' }, + 'mcp-server-2': { id: 'mcp-2', url: 'http://other' }, + }), }); const { server, runStore } = createIntegrationSetup({ @@ -629,6 +632,10 @@ describe('workflow execution (integration)', () => { 'run-1', expect.objectContaining({ type: 'mcp', status: 'success' }), ); + // Scoping must reach the AI port — only the matching server is forwarded, not the full map. + expect(aiClient.loadRemoteTools).toHaveBeenCalledWith({ + 'mcp-server-1': expect.objectContaining({ id: 'mcp-1' }), + }); }); // ------------------------------------------------------------------------- diff --git a/packages/workflow-executor/test/runner.test.ts b/packages/workflow-executor/test/runner.test.ts index b538aa6e56..a0fcb367d5 100644 --- a/packages/workflow-executor/test/runner.test.ts +++ b/packages/workflow-executor/test/runner.test.ts @@ -1280,9 +1280,10 @@ describe('MCP fetch scoping', () => { }); }); - it('skips loadRemoteTools when no config matches the step.mcpServerId', async () => { + it('skips loadRemoteTools and warns with availableMcpServerIds when no config matches', async () => { const workflowPort = createMockWorkflowPort(); const aiClient = createMockAiClient(); + const logger = createMockLogger(); const step = makePendingStep({ runId: 'run-1', stepId: 'step-mcp-1', @@ -1299,15 +1300,180 @@ describe('MCP fetch scoping', () => { }); workflowPort.getMcpServerConfigs.mockResolvedValue({ 'server-A': { id: 'id-A', url: 'https://a.example', type: 'http', headers: {} }, + 'server-B': { id: 'id-B', url: 'https://b.example', type: 'http', headers: {} }, }); runner = new Runner( - createRunnerConfig({ workflowPort, aiModelPort: aiClient as unknown as AiModelPort }), + createRunnerConfig({ + workflowPort, + aiModelPort: aiClient as unknown as AiModelPort, + logger, + }), ); await runner.triggerPoll('run-1'); expect(workflowPort.getMcpServerConfigs).toHaveBeenCalledTimes(1); expect(aiClient.loadRemoteTools).not.toHaveBeenCalled(); + expect(logger.warn).toHaveBeenCalledWith( + 'MCP step targets a server not advertised by the orchestrator', + { + requestedMcpServerId: 'id-missing', + availableMcpServerIds: expect.arrayContaining(['id-A', 'id-B']), + }, + ); + }); + + it('warns distinctly when orchestrator returns no MCP configs at all', async () => { + const workflowPort = createMockWorkflowPort(); + const aiClient = createMockAiClient(); + const logger = createMockLogger(); + const step = makePendingStep({ + runId: 'run-1', + stepId: 'step-mcp-1', + stepType: StepType.Mcp, + stepDefinition: { + type: StepType.Mcp, + executionType: StepExecutionMode.AutomatedWithConfirmation, + mcpServerId: 'id-A', + }, + }); + workflowPort.getAvailableRun.mockResolvedValue({ + step, + auth: { forestServerToken: 'test-forest-token' }, + }); + workflowPort.getMcpServerConfigs.mockResolvedValue({}); + + runner = new Runner( + createRunnerConfig({ + workflowPort, + aiModelPort: aiClient as unknown as AiModelPort, + logger, + }), + ); + await runner.triggerPoll('run-1'); + + expect(aiClient.loadRemoteTools).not.toHaveBeenCalled(); + expect(logger.warn).toHaveBeenCalledWith( + 'MCP step targets a server but orchestrator returned no MCP configs', + { requestedMcpServerId: 'id-A', availableMcpServerIds: [] }, + ); + expect(logger.warn).not.toHaveBeenCalledWith( + 'MCP step targets a server not advertised by the orchestrator', + expect.anything(), + ); + }); + + it('omits undefined ids from availableMcpServerIds and warns about unidentified configs', async () => { + const workflowPort = createMockWorkflowPort(); + const aiClient = createMockAiClient(); + const logger = createMockLogger(); + const step = makePendingStep({ + runId: 'run-1', + stepId: 'step-mcp-1', + stepType: StepType.Mcp, + stepDefinition: { + type: StepType.Mcp, + executionType: StepExecutionMode.AutomatedWithConfirmation, + mcpServerId: 'id-missing', + }, + }); + workflowPort.getAvailableRun.mockResolvedValue({ + step, + auth: { forestServerToken: 'test-forest-token' }, + }); + workflowPort.getMcpServerConfigs.mockResolvedValue({ + 'server-A': { id: 'id-A', url: 'https://a.example', type: 'http', headers: {} }, + 'legacy-server': { url: 'https://legacy.example', type: 'http', headers: {} }, + }); + + runner = new Runner( + createRunnerConfig({ + workflowPort, + aiModelPort: aiClient as unknown as AiModelPort, + logger, + }), + ); + await runner.triggerPoll('run-1'); + + expect(logger.warn).toHaveBeenCalledWith( + 'MCP configs without id cannot be scoped — check orchestrator migration', + { requestedMcpServerId: 'id-missing', unidentifiedConfigNames: ['legacy-server'] }, + ); + expect(logger.warn).toHaveBeenCalledWith( + 'MCP step targets a server not advertised by the orchestrator', + { requestedMcpServerId: 'id-missing', availableMcpServerIds: ['id-A'] }, + ); + }); + + it('does not warn about unidentified configs in legacy fallback (no step.mcpServerId)', async () => { + const workflowPort = createMockWorkflowPort(); + const aiClient = createMockAiClient(); + const logger = createMockLogger(); + const step = makePendingStep({ + runId: 'run-1', + stepId: 'step-mcp-1', + stepType: StepType.Mcp, + }); + workflowPort.getAvailableRun.mockResolvedValue({ + step, + auth: { forestServerToken: 'test-forest-token' }, + }); + workflowPort.getMcpServerConfigs.mockResolvedValue({ + 'legacy-server': { url: 'https://legacy.example', type: 'http' as const, headers: {} }, + }); + + runner = new Runner( + createRunnerConfig({ + workflowPort, + aiModelPort: aiClient as unknown as AiModelPort, + logger, + }), + ); + await runner.triggerPoll('run-1'); + + expect(logger.warn).not.toHaveBeenCalledWith( + 'MCP configs without id cannot be scoped — check orchestrator migration', + expect.anything(), + ); + }); + + it('logs an error listing failed sourceIds when loadRemoteTools returns fewer tools than scoped entries', async () => { + const workflowPort = createMockWorkflowPort(); + const aiClient = createMockAiClient(); + const logger = createMockLogger(); + const step = makePendingStep({ + runId: 'run-1', + stepId: 'step-mcp-1', + stepType: StepType.Mcp, + stepDefinition: { + type: StepType.Mcp, + executionType: StepExecutionMode.AutomatedWithConfirmation, + mcpServerId: 'id-A', + }, + }); + workflowPort.getAvailableRun.mockResolvedValue({ + step, + auth: { forestServerToken: 'test-forest-token' }, + }); + workflowPort.getMcpServerConfigs.mockResolvedValue({ + 'server-A': { id: 'id-A', url: 'https://a.example', type: 'http', headers: {} }, + }); + // McpClient swallows per-server load errors — simulate the empty-result case here. + aiClient.loadRemoteTools.mockResolvedValue([]); + + runner = new Runner( + createRunnerConfig({ + workflowPort, + aiModelPort: aiClient as unknown as AiModelPort, + logger, + }), + ); + await runner.triggerPoll('run-1'); + + expect(logger.error).toHaveBeenCalledWith('MCP servers failed to load tools', { + requestedMcpServerId: 'id-A', + failedSourceIds: ['server-A'], + }); }); it('re-scopes loadRemoteTools per dispatch when chained MCP steps target different servers', async () => { From 3b5632c1a33282bf2a9d4eb2b76e5663502e439c Mon Sep 17 00:00:00 2001 From: Brun Christophe Date: Fri, 29 May 2026 11:40:33 +0200 Subject: [PATCH 5/7] refactor(workflow-executor): extract RemoteToolFetcher and skip Forest connectors in partial-failure check MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Splits MCP fetch concern out of Runner into a dedicated module with one helper per diagnostic, shrinking Runner by ~50 lines and isolating the scoping logic for unit testing. errorOnPartialLoadFailure now excludes Forest integrations from the sourceId-vs-Record-key comparison: Forest connectors carry a hardcoded sourceId (zendesk/kolar/snowflake/...) that does not match the Record key, so a happy-path load was being reported as failed. Field renamed failedSourceIds -> failedConfigNames to surface the right namespace. Adds a genuine partial-failure runner test (survivor + failed) that locks the sourceId-vs-key semantics end-to-end by inspecting the captured McpStepExecutor instance — needed because the global BaseStepExecutor.execute spy hides downstream behaviour otherwise. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../src/remote-tool-fetcher.ts | 105 +++++++ packages/workflow-executor/src/runner.ts | 69 +--- .../test/remote-tool-fetcher.test.ts | 297 ++++++++++++++++++ .../workflow-executor/test/runner.test.ts | 62 +++- 4 files changed, 470 insertions(+), 63 deletions(-) create mode 100644 packages/workflow-executor/src/remote-tool-fetcher.ts create mode 100644 packages/workflow-executor/test/remote-tool-fetcher.test.ts diff --git a/packages/workflow-executor/src/remote-tool-fetcher.ts b/packages/workflow-executor/src/remote-tool-fetcher.ts new file mode 100644 index 0000000000..3df4af214b --- /dev/null +++ b/packages/workflow-executor/src/remote-tool-fetcher.ts @@ -0,0 +1,105 @@ +import type { AiModelPort } from './ports/ai-model-port'; +import type { Logger } from './ports/logger-port'; +import type { WorkflowPort } from './ports/workflow-port'; +import type { RemoteTool, ToolConfig } from '@forestadmin/ai-proxy'; + +import { isForestIntegrationConfig } from '@forestadmin/ai-proxy'; + +// Match by config.id, not by Record key: server names can collide across configs. +export function scopeConfigsToServer( + configs: Record, + mcpServerId: string, +): Record { + return Object.fromEntries(Object.entries(configs).filter(([, cfg]) => cfg.id === mcpServerId)); +} + +export default class RemoteToolFetcher { + private readonly workflowPort: WorkflowPort; + private readonly aiModelPort: AiModelPort; + private readonly logger: Logger; + + constructor(workflowPort: WorkflowPort, aiModelPort: AiModelPort, logger: Logger) { + this.workflowPort = workflowPort; + this.aiModelPort = aiModelPort; + this.logger = logger; + } + + async fetch(mcpServerId?: string): Promise { + const configs = await this.workflowPort.getMcpServerConfigs(); + const scoped = mcpServerId ? scopeConfigsToServer(configs, mcpServerId) : configs; + + if (mcpServerId) { + this.warnUnidentifiedConfigs(configs, mcpServerId); + this.warnMissingTargetServer(configs, scoped, mcpServerId); + } + + if (Object.keys(scoped).length === 0) return []; + + const tools = await this.aiModelPort.loadRemoteTools(scoped); + + this.errorOnPartialLoadFailure(scoped, tools, mcpServerId); + + return tools; + } + + // Configs without id cannot be matched against a defined mcpServerId. Surface them so a + // partial PRD-360 migration doesn't masquerade as "wrong target server" downstream. + private warnUnidentifiedConfigs(configs: Record, mcpServerId: string): void { + const unidentifiedConfigNames = Object.entries(configs) + .filter(([, cfg]) => cfg.id === undefined) + .map(([name]) => name); + + if (unidentifiedConfigNames.length === 0) return; + + this.logger.warn('MCP configs without id cannot be scoped — check orchestrator migration', { + requestedMcpServerId: mcpServerId, + unidentifiedConfigNames, + }); + } + + // Distinguish "no configs at all" (deployment misconfig) from "configs exist but none match" + // (orchestrator/executor drift on server id) — both yield zero tools, but ops need to know + // which one to fix. + private warnMissingTargetServer( + configs: Record, + scoped: Record, + mcpServerId: string, + ): void { + if (Object.keys(scoped).length > 0) return; + + const availableMcpServerIds = Object.values(configs) + .map(cfg => cfg.id) + .filter((id): id is string => Boolean(id)); + + this.logger.warn( + Object.keys(configs).length === 0 + ? 'MCP step targets a server but orchestrator returned no MCP configs' + : 'MCP step targets a server not advertised by the orchestrator', + { requestedMcpServerId: mcpServerId, availableMcpServerIds }, + ); + } + + // Partial-failure detection: McpClient swallows per-server load errors and returns whatever + // succeeded. Compare scoped keys to the tools' sourceIds so ops can tell "wrong config" from + // "MCP server down". Only valid for MCP configs — Forest integrations carry a hardcoded + // sourceId (e.g. 'zendesk') that does not match the Record key, so any successfully-loaded + // Forest connector keyed otherwise would always be flagged as failed. + private errorOnPartialLoadFailure( + scoped: Record, + tools: RemoteTool[], + mcpServerId: string | undefined, + ): void { + const loadedSourceIds = new Set(tools.map(t => t.sourceId)); + const failedConfigNames = Object.entries(scoped) + .filter(([, cfg]) => !isForestIntegrationConfig(cfg)) + .map(([name]) => name) + .filter(name => !loadedSourceIds.has(name)); + + if (failedConfigNames.length === 0) return; + + this.logger.error('MCP servers failed to load tools', { + requestedMcpServerId: mcpServerId ?? null, + failedConfigNames, + }); + } +} diff --git a/packages/workflow-executor/src/runner.ts b/packages/workflow-executor/src/runner.ts index 0cc9dc4ed6..c950b19f95 100644 --- a/packages/workflow-executor/src/runner.ts +++ b/packages/workflow-executor/src/runner.ts @@ -9,7 +9,6 @@ import type SchemaCache from './schema-cache'; import type { AvailableStepExecution, StepExecutionResult } from './types/execution-context'; import type { StepExecutionData } from './types/step-execution-data'; import type { StepOutcome } from './types/validated/step-outcome'; -import type { RemoteTool } from '@forestadmin/ai-proxy'; import ConsoleLogger from './adapters/console-logger'; import { DEFAULT_MAX_CHAIN_DEPTH, DEFAULT_STOP_TIMEOUT_MS } from './defaults'; @@ -22,6 +21,7 @@ import { } from './errors'; import StepExecutorFactory from './executors/step-executor-factory'; import InFlightRunRegistry from './in-flight-run-registry'; +import RemoteToolFetcher from './remote-tool-fetcher'; import { stepTypeToOutcomeType } from './types/validated/step-outcome'; import validateSecrets from './validate-secrets'; @@ -52,11 +52,17 @@ export default class Runner { private pollingTimer: NodeJS.Timeout | null = null; private readonly inFlightRuns = new InFlightRunRegistry(); private readonly logger: Logger; + private readonly remoteToolFetcher: RemoteToolFetcher; private _state: RunnerState = 'idle'; constructor(config: RunnerConfig) { this.config = config; this.logger = config.logger ?? new ConsoleLogger(); + this.remoteToolFetcher = new RemoteToolFetcher( + config.workflowPort, + config.aiModelPort, + this.logger, + ); } get state(): RunnerState { @@ -251,65 +257,6 @@ export default class Runner { } } - // Match by config.id, not by Record key: server names can collide across configs. - private async fetchRemoteTools(mcpServerId?: string): Promise { - const configs = await this.config.workflowPort.getMcpServerConfigs(); - - if (mcpServerId) { - // Configs without id cannot be matched against a defined mcpServerId. Surface them so a - // partial PRD-360 migration doesn't masquerade as "wrong target server" downstream. - const unidentifiedConfigNames = Object.entries(configs) - .filter(([, cfg]) => cfg.id === undefined) - .map(([name]) => name); - - if (unidentifiedConfigNames.length > 0) { - this.logger.warn('MCP configs without id cannot be scoped — check orchestrator migration', { - requestedMcpServerId: mcpServerId, - unidentifiedConfigNames, - }); - } - } - - const scoped = mcpServerId - ? Object.fromEntries(Object.entries(configs).filter(([, cfg]) => cfg.id === mcpServerId)) - : configs; - - if (mcpServerId && Object.keys(scoped).length === 0) { - const availableMcpServerIds = Object.values(configs) - .map(cfg => cfg.id) - .filter((id): id is string => Boolean(id)); - - // Distinguish "no configs at all" (deployment misconfig) from "configs exist but none - // match" (orchestrator/executor drift on server id) — both yield zero tools, but ops - // need to know which one to fix. - this.logger.warn( - Object.keys(configs).length === 0 - ? 'MCP step targets a server but orchestrator returned no MCP configs' - : 'MCP step targets a server not advertised by the orchestrator', - { requestedMcpServerId: mcpServerId, availableMcpServerIds }, - ); - } - - if (Object.keys(scoped).length === 0) return []; - - const tools = await this.config.aiModelPort.loadRemoteTools(scoped); - - // Partial-failure detection: McpClient swallows per-server load errors and returns whatever - // succeeded. Compare scoped keys to the tools' sourceIds so ops can tell "wrong config" - // from "MCP server down". - const loadedSourceIds = new Set(tools.map(t => t.sourceId)); - const failedSourceIds = Object.keys(scoped).filter(name => !loadedSourceIds.has(name)); - - if (failedSourceIds.length > 0) { - this.logger.error('MCP servers failed to load tools', { - requestedMcpServerId: mcpServerId ?? null, - failedSourceIds, - }); - } - - return tools; - } - private executeStep( step: AvailableStepExecution, forestServerToken: string, @@ -347,7 +294,7 @@ export default class Runner { currentStep, this.contextConfig, this.config.activityLogPortFactory.forRun(currentToken), - (mcpServerId?: string) => this.fetchRemoteTools(mcpServerId), + mcpServerId => this.remoteToolFetcher.fetch(mcpServerId), currentIncomingData, ); result = await executor.execute(); diff --git a/packages/workflow-executor/test/remote-tool-fetcher.test.ts b/packages/workflow-executor/test/remote-tool-fetcher.test.ts new file mode 100644 index 0000000000..cc9dda3f45 --- /dev/null +++ b/packages/workflow-executor/test/remote-tool-fetcher.test.ts @@ -0,0 +1,297 @@ +import type { AiModelPort } from '../src/ports/ai-model-port'; +import type { Logger } from '../src/ports/logger-port'; +import type { WorkflowPort } from '../src/ports/workflow-port'; +import type { RemoteTool, ToolConfig } from '@forestadmin/ai-proxy'; + +import RemoteToolFetcher, { scopeConfigsToServer } from '../src/remote-tool-fetcher'; + +function createMockWorkflowPort(): jest.Mocked> { + return { getMcpServerConfigs: jest.fn().mockResolvedValue({}) }; +} + +function createMockAiModelPort(): jest.Mocked> { + return { loadRemoteTools: jest.fn().mockResolvedValue([]) }; +} + +function createMockLogger(): jest.Mocked> { + return { info: jest.fn(), warn: jest.fn(), error: jest.fn() }; +} + +function makeRemoteTool(sourceId: string): RemoteTool { + return { sourceId } as unknown as RemoteTool; +} + +function makeFetcher(overrides?: { + workflowPort?: Partial>>; + aiModelPort?: Partial>>; + logger?: jest.Mocked>; +}) { + const workflowPort = { ...createMockWorkflowPort(), ...overrides?.workflowPort }; + const aiModelPort = { ...createMockAiModelPort(), ...overrides?.aiModelPort }; + const logger = overrides?.logger ?? createMockLogger(); + const fetcher = new RemoteToolFetcher( + workflowPort as unknown as WorkflowPort, + aiModelPort as unknown as AiModelPort, + logger, + ); + + return { fetcher, workflowPort, aiModelPort, logger }; +} + +const cfg = (id: string | undefined): ToolConfig => + ({ id, url: 'https://x.example', type: 'http' as const, headers: {} } as unknown as ToolConfig); + +// --------------------------------------------------------------------------- +// scopeConfigsToServer (pure) +// --------------------------------------------------------------------------- + +describe('scopeConfigsToServer', () => { + it('keeps only entries whose config.id matches the requested mcpServerId', () => { + const configs = { 'srv-a': cfg('id-A'), 'srv-b': cfg('id-B') }; + + expect(scopeConfigsToServer(configs, 'id-A')).toEqual({ 'srv-a': cfg('id-A') }); + }); + + // Matching by Record key would let a renamed server collide with another config's id. + it('matches by config.id, not by Record key', () => { + const configs = { 'server-A': cfg('id-A'), 'server-B': cfg('server-A') }; + + expect(scopeConfigsToServer(configs, 'server-A')).toEqual({ + 'server-B': cfg('server-A'), + }); + }); + + it('returns an empty object when no config matches', () => { + const configs = { 'srv-a': cfg('id-A') }; + + expect(scopeConfigsToServer(configs, 'id-missing')).toEqual({}); + }); + + it('skips entries with undefined id even when the requested id is undefined-like', () => { + const configs = { legacy: cfg(undefined), 'srv-a': cfg('id-A') }; + + // Undefined-id entries are never scoped in; the legacy fallback bypasses scoping entirely. + expect(scopeConfigsToServer(configs, 'id-A')).toEqual({ 'srv-a': cfg('id-A') }); + }); +}); + +// --------------------------------------------------------------------------- +// RemoteToolFetcher.fetch +// --------------------------------------------------------------------------- + +describe('RemoteToolFetcher.fetch', () => { + it('passes the full Record to loadRemoteTools when mcpServerId is undefined (legacy)', async () => { + const configs = { 'srv-a': cfg('id-A'), 'srv-b': cfg('id-B') }; + const { fetcher, aiModelPort } = makeFetcher({ + workflowPort: { getMcpServerConfigs: jest.fn().mockResolvedValue(configs) }, + }); + + await fetcher.fetch(); + + expect(aiModelPort.loadRemoteTools).toHaveBeenCalledWith(configs); + }); + + it('passes only the matching config to loadRemoteTools when mcpServerId is set', async () => { + const { fetcher, aiModelPort } = makeFetcher({ + workflowPort: { + getMcpServerConfigs: jest + .fn() + .mockResolvedValue({ 'srv-a': cfg('id-A'), 'srv-b': cfg('id-B') }), + }, + }); + + await fetcher.fetch('id-A'); + + expect(aiModelPort.loadRemoteTools).toHaveBeenCalledWith({ 'srv-a': cfg('id-A') }); + }); + + it('returns an empty array and skips loadRemoteTools when the scoped Record is empty', async () => { + const { fetcher, aiModelPort } = makeFetcher({ + workflowPort: { getMcpServerConfigs: jest.fn().mockResolvedValue({}) }, + }); + + const tools = await fetcher.fetch('id-A'); + + expect(tools).toEqual([]); + expect(aiModelPort.loadRemoteTools).not.toHaveBeenCalled(); + }); + + it('warns about unidentified configs before reporting the missing target server', async () => { + const { fetcher, logger } = makeFetcher({ + workflowPort: { + getMcpServerConfigs: jest + .fn() + .mockResolvedValue({ 'srv-a': cfg('id-A'), legacy: cfg(undefined) }), + }, + }); + + await fetcher.fetch('id-missing'); + + expect(logger.warn).toHaveBeenCalledWith( + 'MCP configs without id cannot be scoped — check orchestrator migration', + { requestedMcpServerId: 'id-missing', unidentifiedConfigNames: ['legacy'] }, + ); + expect(logger.warn).toHaveBeenCalledWith( + 'MCP step targets a server not advertised by the orchestrator', + { requestedMcpServerId: 'id-missing', availableMcpServerIds: ['id-A'] }, + ); + }); + + it('does not warn about unidentified configs when no mcpServerId is set (legacy fallback)', async () => { + const { fetcher, logger } = makeFetcher({ + workflowPort: { + getMcpServerConfigs: jest.fn().mockResolvedValue({ legacy: cfg(undefined) }), + }, + }); + + await fetcher.fetch(); + + expect(logger.warn).not.toHaveBeenCalledWith( + 'MCP configs without id cannot be scoped — check orchestrator migration', + expect.anything(), + ); + }); + + it('warns distinctly when orchestrator returns no MCP configs at all', async () => { + const { fetcher, logger } = makeFetcher({ + workflowPort: { getMcpServerConfigs: jest.fn().mockResolvedValue({}) }, + }); + + await fetcher.fetch('id-A'); + + expect(logger.warn).toHaveBeenCalledWith( + 'MCP step targets a server but orchestrator returned no MCP configs', + { requestedMcpServerId: 'id-A', availableMcpServerIds: [] }, + ); + expect(logger.warn).not.toHaveBeenCalledWith( + 'MCP step targets a server not advertised by the orchestrator', + expect.anything(), + ); + }); + + it('does not warn about the missing target when the scoped Record is non-empty', async () => { + const { fetcher, logger } = makeFetcher({ + workflowPort: { + getMcpServerConfigs: jest.fn().mockResolvedValue({ 'srv-a': cfg('id-A') }), + }, + }); + + await fetcher.fetch('id-A'); + + expect(logger.warn).not.toHaveBeenCalled(); + }); + + it('logs an error listing failed config names when loadRemoteTools returns fewer tools than scoped entries', async () => { + const { fetcher, logger } = makeFetcher({ + workflowPort: { + getMcpServerConfigs: jest + .fn() + .mockResolvedValue({ 'srv-a': cfg('id-A'), 'srv-b': cfg('id-A') }), + }, + aiModelPort: { + loadRemoteTools: jest.fn().mockResolvedValue([makeRemoteTool('srv-a')]), + }, + }); + + await fetcher.fetch('id-A'); + + expect(logger.error).toHaveBeenCalledWith('MCP servers failed to load tools', { + requestedMcpServerId: 'id-A', + failedConfigNames: ['srv-b'], + }); + }); + + it('reports requestedMcpServerId as null in the partial-failure log for the legacy fallback', async () => { + const { fetcher, logger } = makeFetcher({ + workflowPort: { + getMcpServerConfigs: jest.fn().mockResolvedValue({ 'srv-a': cfg('id-A') }), + }, + aiModelPort: { loadRemoteTools: jest.fn().mockResolvedValue([]) }, + }); + + await fetcher.fetch(); + + expect(logger.error).toHaveBeenCalledWith('MCP servers failed to load tools', { + requestedMcpServerId: null, + failedConfigNames: ['srv-a'], + }); + }); + + it('does not log a partial-failure error when every scoped entry produced a tool', async () => { + const { fetcher, logger } = makeFetcher({ + workflowPort: { + getMcpServerConfigs: jest.fn().mockResolvedValue({ 'srv-a': cfg('id-A') }), + }, + aiModelPort: { + loadRemoteTools: jest.fn().mockResolvedValue([makeRemoteTool('srv-a')]), + }, + }); + + await fetcher.fetch('id-A'); + + expect(logger.error).not.toHaveBeenCalled(); + }); + + // Forest integrations set sourceId to a hardcoded literal ('zendesk', 'snowflake', ...) that + // does not necessarily match the Record key — a sourceId-vs-key comparison would always flag + // them as failed on the happy path. + it('does not flag a Forest integration whose sourceId differs from the Record key', async () => { + const forestConfig = { + isForestConnector: true as const, + name: 'zendesk', + } as unknown as ToolConfig; + const { fetcher, logger } = makeFetcher({ + workflowPort: { + getMcpServerConfigs: jest.fn().mockResolvedValue({ 'zendesk-prod': forestConfig }), + }, + aiModelPort: { + loadRemoteTools: jest.fn().mockResolvedValue([makeRemoteTool('zendesk')]), + }, + }); + + await fetcher.fetch(); + + expect(logger.error).not.toHaveBeenCalled(); + }); + + it('flags only the MCP config when a Forest connector and a failed MCP entry coexist', async () => { + const forestConfig = { + isForestConnector: true as const, + name: 'zendesk', + } as unknown as ToolConfig; + const { fetcher, logger } = makeFetcher({ + workflowPort: { + getMcpServerConfigs: jest.fn().mockResolvedValue({ + 'zendesk-prod': forestConfig, + 'srv-a': cfg('id-A'), + }), + }, + aiModelPort: { + loadRemoteTools: jest.fn().mockResolvedValue([makeRemoteTool('zendesk')]), + }, + }); + + await fetcher.fetch(); + + expect(logger.error).toHaveBeenCalledWith('MCP servers failed to load tools', { + requestedMcpServerId: null, + failedConfigNames: ['srv-a'], + }); + }); + + it('returns the tools produced by loadRemoteTools verbatim', async () => { + const remoteTools = [makeRemoteTool('srv-a'), makeRemoteTool('srv-b')]; + const { fetcher } = makeFetcher({ + workflowPort: { + getMcpServerConfigs: jest + .fn() + .mockResolvedValue({ 'srv-a': cfg('id-A'), 'srv-b': cfg('id-A') }), + }, + aiModelPort: { loadRemoteTools: jest.fn().mockResolvedValue(remoteTools) }, + }); + + const result = await fetcher.fetch('id-A'); + + expect(result).toBe(remoteTools); + }); +}); diff --git a/packages/workflow-executor/test/runner.test.ts b/packages/workflow-executor/test/runner.test.ts index a0fcb367d5..25701c5aac 100644 --- a/packages/workflow-executor/test/runner.test.ts +++ b/packages/workflow-executor/test/runner.test.ts @@ -1437,7 +1437,7 @@ describe('MCP fetch scoping', () => { ); }); - it('logs an error listing failed sourceIds when loadRemoteTools returns fewer tools than scoped entries', async () => { + it('logs an error listing failed config names when loadRemoteTools returns fewer tools than scoped entries', async () => { const workflowPort = createMockWorkflowPort(); const aiClient = createMockAiClient(); const logger = createMockLogger(); @@ -1472,10 +1472,68 @@ describe('MCP fetch scoping', () => { expect(logger.error).toHaveBeenCalledWith('MCP servers failed to load tools', { requestedMcpServerId: 'id-A', - failedSourceIds: ['server-A'], + failedConfigNames: ['server-A'], }); }); + // Locks the sourceId-vs-key semantics of errorOnPartialLoadFailure end-to-end: only the + // failed MCP config is flagged while the survivor reaches the executor. The all-failed + // variant above can't lock this — it would route to NoMcpToolsError regardless of how the + // log builds its config list. + it('flags only the failed config and proceeds with the survivor on a genuine partial failure', async () => { + const workflowPort = createMockWorkflowPort(); + const aiClient = createMockAiClient(); + const logger = createMockLogger(); + const step = makePendingStep({ + runId: 'run-1', + stepId: 'step-mcp-1', + stepType: StepType.Mcp, + stepDefinition: { + type: StepType.Mcp, + executionType: StepExecutionMode.AutomatedWithConfirmation, + mcpServerId: 'id-A', + }, + }); + workflowPort.getAvailableRun.mockResolvedValue({ + step, + auth: { forestServerToken: 'test-forest-token' }, + }); + workflowPort.getMcpServerConfigs.mockResolvedValue({ + 'server-A': { id: 'id-A', url: 'https://a.example', type: 'http', headers: {} }, + 'server-B': { id: 'id-A', url: 'https://b.example', type: 'http', headers: {} }, + }); + // McpClient swallows per-server load errors: 'server-A' loaded, 'server-B' is silently dropped. + const survivor = { sourceId: 'server-A', base: { name: 'tool-a' } }; + aiClient.loadRemoteTools.mockResolvedValue([survivor] as unknown as Awaited< + ReturnType + >); + + runner = new Runner( + createRunnerConfig({ + workflowPort, + aiModelPort: aiClient as unknown as AiModelPort, + logger, + }), + ); + await runner.triggerPoll('run-1'); + + // (1) The survivor's Record key is NOT flagged; only the dropped one is. + expect(logger.error).toHaveBeenCalledWith('MCP servers failed to load tools', { + requestedMcpServerId: 'id-A', + failedConfigNames: ['server-B'], + }); + // (2) The McpStepExecutor was constructed with the survivor — proves "proceeds with the + // survivor" end-to-end despite the partial failure. Asserting on the captured instance + // bypasses the global execute() spy (which would otherwise hide which tools reached + // the executor). + expect(executeSpy).toHaveBeenCalledTimes(1); + const executorInstance = executeSpy.mock.instances[0]; + expect(executorInstance).toBeInstanceOf(McpStepExecutor); + expect( + (executorInstance as unknown as { remoteTools: readonly unknown[] }).remoteTools, + ).toEqual([survivor]); + }); + it('re-scopes loadRemoteTools per dispatch when chained MCP steps target different servers', async () => { const workflowPort = createMockWorkflowPort(); const aiClient = createMockAiClient(); From dbfd26d1ed6a03ae5fc156dfeb08766763bd5f69 Mon Sep 17 00:00:00 2001 From: Brun Christophe Date: Fri, 29 May 2026 12:09:46 +0200 Subject: [PATCH 6/7] refactor(workflow-executor): require mcpServerId on McpTask steps (PRD-360) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit PRD-360 is delivered on the orchestrator: every config emitted on /liana/mcp-server-configs-with-details carries a non-null DB id, on both MCP and Forest-connector branches. Lock the executor's trust boundary accordingly and remove the legacy-fallback code that handled the pre-migration "no mcpServerId / no config.id" shape. - zod McpStepDefinition.mcpServerId becomes required — a missing id is now a malformed run instead of a silent broad fetch. - step-definition mapper switches on the union discriminant so TS narrows mcpServerId; drops the defensive 'in task' spread. - StepExecutorFactory + NoMcpToolsError signatures tightened to string. - RemoteToolFetcher.fetch is now string-only; warnUnidentifiedConfigs (the partial-PRD-360-migration sentinel) is gone. - Tests pruned of legacy-fallback scenarios; mapper boundary now has a fail-fast assertion for the missing-id case. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../src/adapters/step-definition-mapper.ts | 37 ++--- packages/workflow-executor/src/errors.ts | 10 +- .../src/executors/step-executor-factory.ts | 2 +- .../src/remote-tool-fetcher.ts | 28 +--- .../src/types/validated/step-definition.ts | 2 +- .../adapters/step-definition-mapper.test.ts | 8 +- .../workflow-executor/test/errors.test.ts | 10 +- .../test/executors/mcp-step-executor.test.ts | 1 + .../test/remote-tool-fetcher.test.ts | 75 ++-------- .../workflow-executor/test/runner.test.ts | 140 +----------------- 10 files changed, 46 insertions(+), 267 deletions(-) diff --git a/packages/workflow-executor/src/adapters/step-definition-mapper.ts b/packages/workflow-executor/src/adapters/step-definition-mapper.ts index a96b049791..f1723ce26d 100644 --- a/packages/workflow-executor/src/adapters/step-definition-mapper.ts +++ b/packages/workflow-executor/src/adapters/step-definition-mapper.ts @@ -1,11 +1,11 @@ import type { - ServerTaskTypeEnum, ServerWorkflowCondition, ServerWorkflowStep, ServerWorkflowTask, } from './server-types'; import type { ConditionStepDefinition, StepDefinition } from '../types/validated/step-definition'; +import { ServerTaskTypeEnum } from './server-types'; import { InvalidStepDefinitionError, UnsupportedStepTypeError } from '../errors'; import { ConditionStepDefinitionSchema, @@ -18,48 +18,35 @@ import { UpdateRecordStepDefinitionSchema, } from '../types/validated/step-definition'; -const TASK_TYPE_TO_STEP_TYPE: Record = { - 'get-data': StepType.ReadRecord, - 'update-data': StepType.UpdateRecord, - 'trigger-action': StepType.TriggerAction, - 'load-related-record': StepType.LoadRelatedRecord, - 'mcp-server': StepType.Mcp, - guideline: StepType.Guidance, -}; - function mapTask(task: ServerWorkflowTask): StepDefinition { - const stepType = TASK_TYPE_TO_STEP_TYPE[task.taskType]; - - if (!stepType) { - throw new InvalidStepDefinitionError(`Unknown taskType: "${task.taskType}"`); - } - // executionType is passed through as-is; each schema's .default().catch() handles // missing or unsupported values without requiring an explicit mapping here. const base = { prompt: task.prompt, executionType: task.executionType }; - switch (stepType) { - case StepType.Mcp: + switch (task.taskType) { + case ServerTaskTypeEnum.McpServer: return McpStepDefinitionSchema.parse({ ...base, type: StepType.Mcp, - ...('mcpServerId' in task && { mcpServerId: task.mcpServerId }), + mcpServerId: task.mcpServerId, }); - case StepType.Guidance: + case ServerTaskTypeEnum.Guideline: return GuidanceStepDefinitionSchema.parse({ ...base, type: StepType.Guidance }); - case StepType.ReadRecord: + case ServerTaskTypeEnum.GetData: return ReadRecordStepDefinitionSchema.parse({ ...base, type: StepType.ReadRecord }); - case StepType.UpdateRecord: + case ServerTaskTypeEnum.UpdateData: return UpdateRecordStepDefinitionSchema.parse({ ...base, type: StepType.UpdateRecord }); - case StepType.TriggerAction: + case ServerTaskTypeEnum.TriggerAction: return TriggerActionStepDefinitionSchema.parse({ ...base, type: StepType.TriggerAction }); - case StepType.LoadRelatedRecord: + case ServerTaskTypeEnum.LoadRelatedRecord: return LoadRelatedRecordStepDefinitionSchema.parse({ ...base, type: StepType.LoadRelatedRecord, }); default: - throw new InvalidStepDefinitionError(`Unmapped step type: "${stepType}"`); + throw new InvalidStepDefinitionError( + `Unknown taskType: "${(task as { taskType: string }).taskType}"`, + ); } } diff --git a/packages/workflow-executor/src/errors.ts b/packages/workflow-executor/src/errors.ts index 8a8a8cf49c..6c13911fc7 100644 --- a/packages/workflow-executor/src/errors.ts +++ b/packages/workflow-executor/src/errors.ts @@ -210,11 +210,11 @@ export class StepTimeoutError extends WorkflowExecutorError { } export class NoMcpToolsError extends WorkflowExecutorError { - constructor(requestedMcpServerId?: string) { - const technical = requestedMcpServerId - ? `No MCP tools available for mcpServerId="${requestedMcpServerId}"` - : 'No MCP tools available'; - super(technical, 'No tools are available to execute this step.'); + constructor(requestedMcpServerId: string) { + super( + `No MCP tools available for mcpServerId="${requestedMcpServerId}"`, + 'No tools are available to execute this step.', + ); } } diff --git a/packages/workflow-executor/src/executors/step-executor-factory.ts b/packages/workflow-executor/src/executors/step-executor-factory.ts index 0d23617907..e339ba83fb 100644 --- a/packages/workflow-executor/src/executors/step-executor-factory.ts +++ b/packages/workflow-executor/src/executors/step-executor-factory.ts @@ -48,7 +48,7 @@ export default class StepExecutorFactory { step: AvailableStepExecution, contextConfig: StepContextConfig, activityLogPort: ActivityLogPort, - fetchRemoteTools: (mcpServerId?: string) => Promise, + fetchRemoteTools: (mcpServerId: string) => Promise, incomingPendingData?: unknown, ): Promise { try { diff --git a/packages/workflow-executor/src/remote-tool-fetcher.ts b/packages/workflow-executor/src/remote-tool-fetcher.ts index 3df4af214b..61a464b6ba 100644 --- a/packages/workflow-executor/src/remote-tool-fetcher.ts +++ b/packages/workflow-executor/src/remote-tool-fetcher.ts @@ -24,14 +24,11 @@ export default class RemoteToolFetcher { this.logger = logger; } - async fetch(mcpServerId?: string): Promise { + async fetch(mcpServerId: string): Promise { const configs = await this.workflowPort.getMcpServerConfigs(); - const scoped = mcpServerId ? scopeConfigsToServer(configs, mcpServerId) : configs; + const scoped = scopeConfigsToServer(configs, mcpServerId); - if (mcpServerId) { - this.warnUnidentifiedConfigs(configs, mcpServerId); - this.warnMissingTargetServer(configs, scoped, mcpServerId); - } + this.warnMissingTargetServer(configs, scoped, mcpServerId); if (Object.keys(scoped).length === 0) return []; @@ -42,21 +39,6 @@ export default class RemoteToolFetcher { return tools; } - // Configs without id cannot be matched against a defined mcpServerId. Surface them so a - // partial PRD-360 migration doesn't masquerade as "wrong target server" downstream. - private warnUnidentifiedConfigs(configs: Record, mcpServerId: string): void { - const unidentifiedConfigNames = Object.entries(configs) - .filter(([, cfg]) => cfg.id === undefined) - .map(([name]) => name); - - if (unidentifiedConfigNames.length === 0) return; - - this.logger.warn('MCP configs without id cannot be scoped — check orchestrator migration', { - requestedMcpServerId: mcpServerId, - unidentifiedConfigNames, - }); - } - // Distinguish "no configs at all" (deployment misconfig) from "configs exist but none match" // (orchestrator/executor drift on server id) — both yield zero tools, but ops need to know // which one to fix. @@ -87,7 +69,7 @@ export default class RemoteToolFetcher { private errorOnPartialLoadFailure( scoped: Record, tools: RemoteTool[], - mcpServerId: string | undefined, + mcpServerId: string, ): void { const loadedSourceIds = new Set(tools.map(t => t.sourceId)); const failedConfigNames = Object.entries(scoped) @@ -98,7 +80,7 @@ export default class RemoteToolFetcher { if (failedConfigNames.length === 0) return; this.logger.error('MCP servers failed to load tools', { - requestedMcpServerId: mcpServerId ?? null, + requestedMcpServerId: mcpServerId, failedConfigNames, }); } diff --git a/packages/workflow-executor/src/types/validated/step-definition.ts b/packages/workflow-executor/src/types/validated/step-definition.ts index 3413653d16..8cf05f90d7 100644 --- a/packages/workflow-executor/src/types/validated/step-definition.ts +++ b/packages/workflow-executor/src/types/validated/step-definition.ts @@ -114,7 +114,7 @@ export const McpStepDefinitionSchema = z.object({ .enum([AutomatedWithConfirmation, FullyAutomated]) .default(AutomatedWithConfirmation) .catch(AutomatedWithConfirmation), - mcpServerId: z.string().optional(), + mcpServerId: z.string(), }); export type McpStepDefinition = z.infer; diff --git a/packages/workflow-executor/test/adapters/step-definition-mapper.test.ts b/packages/workflow-executor/test/adapters/step-definition-mapper.test.ts index a9a09d0929..594a8b8d07 100644 --- a/packages/workflow-executor/test/adapters/step-definition-mapper.test.ts +++ b/packages/workflow-executor/test/adapters/step-definition-mapper.test.ts @@ -112,14 +112,10 @@ describe('toStepDefinition', () => { }); }); - it('should map task with mcp-server taskType without mcpServerId', () => { + it('rejects an mcp-server task missing mcpServerId at the zod boundary', () => { const task = makeTask({ taskType: ServerTaskTypeEnum.McpServer, prompt: 'run mcp' }); - expect(toStepDefinition(task)).toEqual({ - type: StepType.Mcp, - prompt: 'run mcp', - executionType: ServerStepExecutionTypeEnum.AutomatedWithConfirmation, - }); + expect(() => toStepDefinition(task)).toThrow(); }); it('should map task with guideline taskType to guidance', () => { diff --git a/packages/workflow-executor/test/errors.test.ts b/packages/workflow-executor/test/errors.test.ts index b4c0883712..3f5c0541d4 100644 --- a/packages/workflow-executor/test/errors.test.ts +++ b/packages/workflow-executor/test/errors.test.ts @@ -70,17 +70,11 @@ describe('extractErrorMessage', () => { }); describe('NoMcpToolsError', () => { - it('produces a fully generic technical message when no mcpServerId was requested (no filter case)', () => { - const err = new NoMcpToolsError(); - - expect(err.message).toBe('No MCP tools available'); - expect(err.userMessage).toBe('No tools are available to execute this step.'); - }); - - it('includes the requested mcpServerId in the technical message when a filter was active', () => { + it('includes the requested mcpServerId in the technical message', () => { const err = new NoMcpToolsError('id-missing'); expect(err.message).toBe('No MCP tools available for mcpServerId="id-missing"'); + expect(err.userMessage).toBe('No tools are available to execute this step.'); }); it('keeps the user-facing message generic — no internal ids must leak', () => { diff --git a/packages/workflow-executor/test/executors/mcp-step-executor.test.ts b/packages/workflow-executor/test/executors/mcp-step-executor.test.ts index 0352729ea8..809be2e964 100644 --- a/packages/workflow-executor/test/executors/mcp-step-executor.test.ts +++ b/packages/workflow-executor/test/executors/mcp-step-executor.test.ts @@ -42,6 +42,7 @@ function makeStep(overrides: Partial = {}): McpStepDefinition type: StepType.Mcp, prompt: 'Send a notification to the user', executionType: StepExecutionMode.AutomatedWithConfirmation, + mcpServerId: 'default-mcp-id', ...overrides, }; } diff --git a/packages/workflow-executor/test/remote-tool-fetcher.test.ts b/packages/workflow-executor/test/remote-tool-fetcher.test.ts index cc9dda3f45..376656a3a2 100644 --- a/packages/workflow-executor/test/remote-tool-fetcher.test.ts +++ b/packages/workflow-executor/test/remote-tool-fetcher.test.ts @@ -66,13 +66,6 @@ describe('scopeConfigsToServer', () => { expect(scopeConfigsToServer(configs, 'id-missing')).toEqual({}); }); - - it('skips entries with undefined id even when the requested id is undefined-like', () => { - const configs = { legacy: cfg(undefined), 'srv-a': cfg('id-A') }; - - // Undefined-id entries are never scoped in; the legacy fallback bypasses scoping entirely. - expect(scopeConfigsToServer(configs, 'id-A')).toEqual({ 'srv-a': cfg('id-A') }); - }); }); // --------------------------------------------------------------------------- @@ -80,17 +73,6 @@ describe('scopeConfigsToServer', () => { // --------------------------------------------------------------------------- describe('RemoteToolFetcher.fetch', () => { - it('passes the full Record to loadRemoteTools when mcpServerId is undefined (legacy)', async () => { - const configs = { 'srv-a': cfg('id-A'), 'srv-b': cfg('id-B') }; - const { fetcher, aiModelPort } = makeFetcher({ - workflowPort: { getMcpServerConfigs: jest.fn().mockResolvedValue(configs) }, - }); - - await fetcher.fetch(); - - expect(aiModelPort.loadRemoteTools).toHaveBeenCalledWith(configs); - }); - it('passes only the matching config to loadRemoteTools when mcpServerId is set', async () => { const { fetcher, aiModelPort } = makeFetcher({ workflowPort: { @@ -116,39 +98,20 @@ describe('RemoteToolFetcher.fetch', () => { expect(aiModelPort.loadRemoteTools).not.toHaveBeenCalled(); }); - it('warns about unidentified configs before reporting the missing target server', async () => { + it('warns about the missing target with the list of advertised ids when no config matches', async () => { const { fetcher, logger } = makeFetcher({ workflowPort: { getMcpServerConfigs: jest .fn() - .mockResolvedValue({ 'srv-a': cfg('id-A'), legacy: cfg(undefined) }), + .mockResolvedValue({ 'srv-a': cfg('id-A'), 'srv-b': cfg('id-B') }), }, }); await fetcher.fetch('id-missing'); - expect(logger.warn).toHaveBeenCalledWith( - 'MCP configs without id cannot be scoped — check orchestrator migration', - { requestedMcpServerId: 'id-missing', unidentifiedConfigNames: ['legacy'] }, - ); expect(logger.warn).toHaveBeenCalledWith( 'MCP step targets a server not advertised by the orchestrator', - { requestedMcpServerId: 'id-missing', availableMcpServerIds: ['id-A'] }, - ); - }); - - it('does not warn about unidentified configs when no mcpServerId is set (legacy fallback)', async () => { - const { fetcher, logger } = makeFetcher({ - workflowPort: { - getMcpServerConfigs: jest.fn().mockResolvedValue({ legacy: cfg(undefined) }), - }, - }); - - await fetcher.fetch(); - - expect(logger.warn).not.toHaveBeenCalledWith( - 'MCP configs without id cannot be scoped — check orchestrator migration', - expect.anything(), + { requestedMcpServerId: 'id-missing', availableMcpServerIds: ['id-A', 'id-B'] }, ); }); @@ -201,22 +164,6 @@ describe('RemoteToolFetcher.fetch', () => { }); }); - it('reports requestedMcpServerId as null in the partial-failure log for the legacy fallback', async () => { - const { fetcher, logger } = makeFetcher({ - workflowPort: { - getMcpServerConfigs: jest.fn().mockResolvedValue({ 'srv-a': cfg('id-A') }), - }, - aiModelPort: { loadRemoteTools: jest.fn().mockResolvedValue([]) }, - }); - - await fetcher.fetch(); - - expect(logger.error).toHaveBeenCalledWith('MCP servers failed to load tools', { - requestedMcpServerId: null, - failedConfigNames: ['srv-a'], - }); - }); - it('does not log a partial-failure error when every scoped entry produced a tool', async () => { const { fetcher, logger } = makeFetcher({ workflowPort: { @@ -237,8 +184,9 @@ describe('RemoteToolFetcher.fetch', () => { // them as failed on the happy path. it('does not flag a Forest integration whose sourceId differs from the Record key', async () => { const forestConfig = { + id: 'id-zendesk', isForestConnector: true as const, - name: 'zendesk', + integrationName: 'Zendesk', } as unknown as ToolConfig; const { fetcher, logger } = makeFetcher({ workflowPort: { @@ -249,21 +197,22 @@ describe('RemoteToolFetcher.fetch', () => { }, }); - await fetcher.fetch(); + await fetcher.fetch('id-zendesk'); expect(logger.error).not.toHaveBeenCalled(); }); - it('flags only the MCP config when a Forest connector and a failed MCP entry coexist', async () => { + it('flags only the MCP config when a Forest connector and a failed MCP entry share the target id', async () => { const forestConfig = { + id: 'shared-id', isForestConnector: true as const, - name: 'zendesk', + integrationName: 'Zendesk', } as unknown as ToolConfig; const { fetcher, logger } = makeFetcher({ workflowPort: { getMcpServerConfigs: jest.fn().mockResolvedValue({ 'zendesk-prod': forestConfig, - 'srv-a': cfg('id-A'), + 'srv-a': cfg('shared-id'), }), }, aiModelPort: { @@ -271,10 +220,10 @@ describe('RemoteToolFetcher.fetch', () => { }, }); - await fetcher.fetch(); + await fetcher.fetch('shared-id'); expect(logger.error).toHaveBeenCalledWith('MCP servers failed to load tools', { - requestedMcpServerId: null, + requestedMcpServerId: 'shared-id', failedConfigNames: ['srv-a'], }); }); diff --git a/packages/workflow-executor/test/runner.test.ts b/packages/workflow-executor/test/runner.test.ts index 25701c5aac..1f9e031303 100644 --- a/packages/workflow-executor/test/runner.test.ts +++ b/packages/workflow-executor/test/runner.test.ts @@ -126,7 +126,11 @@ function makeStepDefinition(stepType: StepType): StepDefinition { } if (stepType === StepType.Mcp) { - return { type: StepType.Mcp, executionType: StepExecutionMode.AutomatedWithConfirmation }; + return { + type: StepType.Mcp, + executionType: StepExecutionMode.AutomatedWithConfirmation, + mcpServerId: 'default-mcp-id', + }; } if (stepType === StepType.Guidance) { @@ -1155,33 +1159,6 @@ describe('MCP lazy loading (via once thunk)', () => { expect(aiClient.loadRemoteTools).not.toHaveBeenCalled(); }); - it('passes the orchestrator Record-shape configs directly to loadRemoteTools', async () => { - const workflowPort = createMockWorkflowPort(); - const aiClient = createMockAiClient(); - const step = makePendingStep({ - runId: 'run-1', - stepId: 'step-mcp-1', - stepType: StepType.Mcp, - }); - workflowPort.getAvailableRun.mockResolvedValue({ - step, - auth: { forestServerToken: 'test-forest-token' }, - }); - const realConfigs = { - 'mcp-server-1': { url: 'https://mcp.example.com', type: 'http' as const, headers: {} }, - }; - workflowPort.getMcpServerConfigs.mockResolvedValue(realConfigs); - - runner = new Runner( - createRunnerConfig({ workflowPort, aiModelPort: aiClient as unknown as AiModelPort }), - ); - await runner.triggerPoll('run-1'); - - expect(workflowPort.getMcpServerConfigs).toHaveBeenCalledTimes(1); - expect(aiClient.loadRemoteTools).toHaveBeenCalledTimes(1); - expect(aiClient.loadRemoteTools).toHaveBeenCalledWith(realConfigs); - }); - it('skips loadRemoteTools when the orchestrator returns an empty Record', async () => { const workflowPort = createMockWorkflowPort(); const aiClient = createMockAiClient(); @@ -1363,80 +1340,6 @@ describe('MCP fetch scoping', () => { ); }); - it('omits undefined ids from availableMcpServerIds and warns about unidentified configs', async () => { - const workflowPort = createMockWorkflowPort(); - const aiClient = createMockAiClient(); - const logger = createMockLogger(); - const step = makePendingStep({ - runId: 'run-1', - stepId: 'step-mcp-1', - stepType: StepType.Mcp, - stepDefinition: { - type: StepType.Mcp, - executionType: StepExecutionMode.AutomatedWithConfirmation, - mcpServerId: 'id-missing', - }, - }); - workflowPort.getAvailableRun.mockResolvedValue({ - step, - auth: { forestServerToken: 'test-forest-token' }, - }); - workflowPort.getMcpServerConfigs.mockResolvedValue({ - 'server-A': { id: 'id-A', url: 'https://a.example', type: 'http', headers: {} }, - 'legacy-server': { url: 'https://legacy.example', type: 'http', headers: {} }, - }); - - runner = new Runner( - createRunnerConfig({ - workflowPort, - aiModelPort: aiClient as unknown as AiModelPort, - logger, - }), - ); - await runner.triggerPoll('run-1'); - - expect(logger.warn).toHaveBeenCalledWith( - 'MCP configs without id cannot be scoped — check orchestrator migration', - { requestedMcpServerId: 'id-missing', unidentifiedConfigNames: ['legacy-server'] }, - ); - expect(logger.warn).toHaveBeenCalledWith( - 'MCP step targets a server not advertised by the orchestrator', - { requestedMcpServerId: 'id-missing', availableMcpServerIds: ['id-A'] }, - ); - }); - - it('does not warn about unidentified configs in legacy fallback (no step.mcpServerId)', async () => { - const workflowPort = createMockWorkflowPort(); - const aiClient = createMockAiClient(); - const logger = createMockLogger(); - const step = makePendingStep({ - runId: 'run-1', - stepId: 'step-mcp-1', - stepType: StepType.Mcp, - }); - workflowPort.getAvailableRun.mockResolvedValue({ - step, - auth: { forestServerToken: 'test-forest-token' }, - }); - workflowPort.getMcpServerConfigs.mockResolvedValue({ - 'legacy-server': { url: 'https://legacy.example', type: 'http' as const, headers: {} }, - }); - - runner = new Runner( - createRunnerConfig({ - workflowPort, - aiModelPort: aiClient as unknown as AiModelPort, - logger, - }), - ); - await runner.triggerPoll('run-1'); - - expect(logger.warn).not.toHaveBeenCalledWith( - 'MCP configs without id cannot be scoped — check orchestrator migration', - expect.anything(), - ); - }); - it('logs an error listing failed config names when loadRemoteTools returns fewer tools than scoped entries', async () => { const workflowPort = createMockWorkflowPort(); const aiClient = createMockAiClient(); @@ -1582,32 +1485,6 @@ describe('MCP fetch scoping', () => { 'server-B': expect.objectContaining({ id: 'id-B' }), }); }); - - it('passes the full Record when step.mcpServerId is absent (legacy fallback)', async () => { - const workflowPort = createMockWorkflowPort(); - const aiClient = createMockAiClient(); - const step = makePendingStep({ - runId: 'run-1', - stepId: 'step-mcp-1', - stepType: StepType.Mcp, - }); - workflowPort.getAvailableRun.mockResolvedValue({ - step, - auth: { forestServerToken: 'test-forest-token' }, - }); - const allConfigs = { - 'server-A': { id: 'id-A', url: 'https://a.example', type: 'http' as const, headers: {} }, - 'server-B': { id: 'id-B', url: 'https://b.example', type: 'http' as const, headers: {} }, - }; - workflowPort.getMcpServerConfigs.mockResolvedValue(allConfigs); - - runner = new Runner( - createRunnerConfig({ workflowPort, aiModelPort: aiClient as unknown as AiModelPort }), - ); - await runner.triggerPoll('run-1'); - - expect(aiClient.loadRemoteTools).toHaveBeenCalledWith(allConfigs); - }); }); // --------------------------------------------------------------------------- @@ -1707,13 +1584,6 @@ describe('StepExecutorFactory.create — factory', () => { expect(fetchRemoteTools).toHaveBeenCalledWith('srv-42'); }); - it('forwards undefined to fetchRemoteTools when McpTask step has no mcpServerId (legacy)', async () => { - const step = makePendingStep({ stepType: StepType.Mcp }); - const fetchRemoteTools = jest.fn().mockResolvedValue([]); - await StepExecutorFactory.create(step, makeContextConfig(), makeRunLogger(), fetchRemoteTools); - expect(fetchRemoteTools).toHaveBeenCalledWith(undefined); - }); - it('dispatches Guidance steps to GuidanceStepExecutor', async () => { const step = makePendingStep({ stepType: StepType.Guidance }); const executor = await StepExecutorFactory.create( From e607979fa87b86c2a142c9ee51127aec218a53cf Mon Sep 17 00:00:00 2001 From: Brun Christophe Date: Fri, 29 May 2026 13:46:00 +0200 Subject: [PATCH 7/7] refactor(workflow-executor): unify MCP/Forest partial-failure check via tool.mcpServerId MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Post-review follow-ups for PR #1607. errorOnPartialLoadFailure now discriminates on tool.mcpServerId instead of tool.sourceId, which both McpClient and ForestIntegrationClient populate from the orchestrator's persisted id. This removes the Forest carve-out and fixes a real silent failure: a Forest connector that fails to load entirely is now flagged with its Record key in the error log, instead of bubbling up as a generic NoMcpToolsError with no indication of which connector is down. Other follow-ups: - z.string().min(1) on mcpServerId at the boundary — empty string is no longer parseable. - NoMcpToolsError userMessage made actionable, aligned with the StepTimeoutError pattern ("X happened. [Action]"). - Stale references corrected: mcp-step-executor's requireTools comment no longer mentions Runner; integration test no longer references the removed Runner.fetchRemoteTools symbol. - New unit tests for getMcpServerConfigs / loadRemoteTools rejection paths pin the contract that errors propagate instead of being swallowed into an empty tool list. Co-Authored-By: Claude Opus 4.7 (1M context) --- packages/workflow-executor/src/errors.ts | 2 +- .../src/executors/mcp-step-executor.ts | 8 +- .../src/remote-tool-fetcher.ts | 15 ++-- .../src/types/validated/step-definition.ts | 2 +- .../workflow-executor/test/errors.test.ts | 5 +- .../test/executors/mcp-step-executor.test.ts | 12 ++- .../integration/workflow-execution.test.ts | 2 +- .../test/remote-tool-fetcher.test.ts | 79 +++++++++++-------- .../workflow-executor/test/runner.test.ts | 60 ++------------ 9 files changed, 73 insertions(+), 112 deletions(-) diff --git a/packages/workflow-executor/src/errors.ts b/packages/workflow-executor/src/errors.ts index 6c13911fc7..828d2758fb 100644 --- a/packages/workflow-executor/src/errors.ts +++ b/packages/workflow-executor/src/errors.ts @@ -213,7 +213,7 @@ export class NoMcpToolsError extends WorkflowExecutorError { constructor(requestedMcpServerId: string) { super( `No MCP tools available for mcpServerId="${requestedMcpServerId}"`, - 'No tools are available to execute this step.', + 'Tools could not be loaded for the targeted server. Please try again, or contact your administrator if the problem persists.', ); } } diff --git a/packages/workflow-executor/src/executors/mcp-step-executor.ts b/packages/workflow-executor/src/executors/mcp-step-executor.ts index c42190b018..063be3b022 100644 --- a/packages/workflow-executor/src/executors/mcp-step-executor.ts +++ b/packages/workflow-executor/src/executors/mcp-step-executor.ts @@ -225,10 +225,10 @@ export default class McpStepExecutor extends BaseStepExecutor ); } - // Tools are pre-scoped to step.mcpServerId upstream; the executor only asserts non-empty. - // An empty list means either no config matched the step's mcpServerId, or the per-server MCP - // connection failed at load time (McpClient swallows per-server load errors). The runner logs - // the diagnostic at the right layer; here we rely on BaseStepExecutor's catch-and-log. + // Tools are pre-scoped to step.mcpServerId upstream. An empty list means either no config + // matched, or the per-server connection failed at load time (McpClient swallows per-server + // errors). RemoteToolFetcher emits the diagnostic upstream; here we just surface the empty + // case as a domain error so BaseStepExecutor turns it into a step outcome. private requireTools(): RemoteTool[] { if (this.remoteTools.length === 0) { throw new NoMcpToolsError(this.context.stepDefinition.mcpServerId); diff --git a/packages/workflow-executor/src/remote-tool-fetcher.ts b/packages/workflow-executor/src/remote-tool-fetcher.ts index 61a464b6ba..0d141ae2bc 100644 --- a/packages/workflow-executor/src/remote-tool-fetcher.ts +++ b/packages/workflow-executor/src/remote-tool-fetcher.ts @@ -3,8 +3,6 @@ import type { Logger } from './ports/logger-port'; import type { WorkflowPort } from './ports/workflow-port'; import type { RemoteTool, ToolConfig } from '@forestadmin/ai-proxy'; -import { isForestIntegrationConfig } from '@forestadmin/ai-proxy'; - // Match by config.id, not by Record key: server names can collide across configs. export function scopeConfigsToServer( configs: Record, @@ -62,20 +60,17 @@ export default class RemoteToolFetcher { } // Partial-failure detection: McpClient swallows per-server load errors and returns whatever - // succeeded. Compare scoped keys to the tools' sourceIds so ops can tell "wrong config" from - // "MCP server down". Only valid for MCP configs — Forest integrations carry a hardcoded - // sourceId (e.g. 'zendesk') that does not match the Record key, so any successfully-loaded - // Forest connector keyed otherwise would always be flagged as failed. + // succeeded. Match config.id against tool.mcpServerId — both providers populate it from the + // orchestrator's persisted id, so the check is uniform across MCP and Forest connectors. private errorOnPartialLoadFailure( scoped: Record, tools: RemoteTool[], mcpServerId: string, ): void { - const loadedSourceIds = new Set(tools.map(t => t.sourceId)); + const loadedMcpServerIds = new Set(tools.map(t => t.mcpServerId)); const failedConfigNames = Object.entries(scoped) - .filter(([, cfg]) => !isForestIntegrationConfig(cfg)) - .map(([name]) => name) - .filter(name => !loadedSourceIds.has(name)); + .filter(([, cfg]) => !loadedMcpServerIds.has(cfg.id)) + .map(([name]) => name); if (failedConfigNames.length === 0) return; diff --git a/packages/workflow-executor/src/types/validated/step-definition.ts b/packages/workflow-executor/src/types/validated/step-definition.ts index 8cf05f90d7..6c2916cb14 100644 --- a/packages/workflow-executor/src/types/validated/step-definition.ts +++ b/packages/workflow-executor/src/types/validated/step-definition.ts @@ -114,7 +114,7 @@ export const McpStepDefinitionSchema = z.object({ .enum([AutomatedWithConfirmation, FullyAutomated]) .default(AutomatedWithConfirmation) .catch(AutomatedWithConfirmation), - mcpServerId: z.string(), + mcpServerId: z.string().min(1), }); export type McpStepDefinition = z.infer; diff --git a/packages/workflow-executor/test/errors.test.ts b/packages/workflow-executor/test/errors.test.ts index 3f5c0541d4..be03b9562d 100644 --- a/packages/workflow-executor/test/errors.test.ts +++ b/packages/workflow-executor/test/errors.test.ts @@ -74,13 +74,12 @@ describe('NoMcpToolsError', () => { const err = new NoMcpToolsError('id-missing'); expect(err.message).toBe('No MCP tools available for mcpServerId="id-missing"'); - expect(err.userMessage).toBe('No tools are available to execute this step.'); }); - it('keeps the user-facing message generic — no internal ids must leak', () => { + it('keeps the user-facing message free of internal ids', () => { const err = new NoMcpToolsError('id-missing'); - expect(err.userMessage).toBe('No tools are available to execute this step.'); + expect(err.userMessage).toMatch(/^Tools could not be loaded for the targeted server\./); expect(err.userMessage).not.toMatch(/id-missing/); }); }); diff --git a/packages/workflow-executor/test/executors/mcp-step-executor.test.ts b/packages/workflow-executor/test/executors/mcp-step-executor.test.ts index 809be2e964..fab6e8f0c4 100644 --- a/packages/workflow-executor/test/executors/mcp-step-executor.test.ts +++ b/packages/workflow-executor/test/executors/mcp-step-executor.test.ts @@ -503,16 +503,20 @@ describe('McpStepExecutor', () => { const result = await executor.execute(); expect(result.stepOutcome.status).toBe('error'); - expect(result.stepOutcome.error).toBe('No tools are available to execute this step.'); + expect(result.stepOutcome.error).toMatch( + /^Tools could not be loaded for the targeted server\./, + ); }); - it('keeps the user-facing error message generic when no tools are available', async () => { + it('keeps the user-facing error message free of internal ids', async () => { const context = makeContext({ stepDefinition: makeStep({ mcpServerId: 'id-B' }) }); const executor = new McpStepExecutor(context, []); const result = await executor.execute(); - expect(result.stepOutcome.error).toBe('No tools are available to execute this step.'); + expect(result.stepOutcome.error).toMatch( + /^Tools could not be loaded for the targeted server\./, + ); expect(result.stepOutcome.error).not.toMatch(/id-B/); }); @@ -673,7 +677,7 @@ describe('McpStepExecutor', () => { await expect(executor.execute()).resolves.toMatchObject({ stepOutcome: { status: 'error', - error: 'No tools are available to execute this step.', + error: expect.stringMatching(/^Tools could not be loaded for the targeted server\./), }, }); }); diff --git a/packages/workflow-executor/test/integration/workflow-execution.test.ts b/packages/workflow-executor/test/integration/workflow-execution.test.ts index e689daf96e..82f02cc237 100644 --- a/packages/workflow-executor/test/integration/workflow-execution.test.ts +++ b/packages/workflow-executor/test/integration/workflow-execution.test.ts @@ -592,7 +592,7 @@ describe('workflow execution (integration)', () => { .fn() .mockResolvedValue({ step, auth: { forestServerToken: 'test-forest-token' } }), // Two configs but only one matches step.mcpServerId — the assertion below proves - // Runner.fetchRemoteTools actually scopes the Record before calling loadRemoteTools. + // RemoteToolFetcher actually scopes the Record before calling loadRemoteTools. getMcpServerConfigs: jest.fn().mockResolvedValue({ 'mcp-server-1': { id: 'mcp-1', url: 'http://fake' }, 'mcp-server-2': { id: 'mcp-2', url: 'http://other' }, diff --git a/packages/workflow-executor/test/remote-tool-fetcher.test.ts b/packages/workflow-executor/test/remote-tool-fetcher.test.ts index 376656a3a2..0beedc9f3e 100644 --- a/packages/workflow-executor/test/remote-tool-fetcher.test.ts +++ b/packages/workflow-executor/test/remote-tool-fetcher.test.ts @@ -17,8 +17,8 @@ function createMockLogger(): jest.Mocked> { return { info: jest.fn(), warn: jest.fn(), error: jest.fn() }; } -function makeRemoteTool(sourceId: string): RemoteTool { - return { sourceId } as unknown as RemoteTool; +function makeRemoteTool(sourceId: string, mcpServerId?: string): RemoteTool { + return { sourceId, mcpServerId } as unknown as RemoteTool; } function makeFetcher(overrides?: { @@ -144,33 +144,29 @@ describe('RemoteToolFetcher.fetch', () => { expect(logger.warn).not.toHaveBeenCalled(); }); - it('logs an error listing failed config names when loadRemoteTools returns fewer tools than scoped entries', async () => { + it('flags the scoped MCP config when no tool was loaded for its id', async () => { const { fetcher, logger } = makeFetcher({ workflowPort: { - getMcpServerConfigs: jest - .fn() - .mockResolvedValue({ 'srv-a': cfg('id-A'), 'srv-b': cfg('id-A') }), - }, - aiModelPort: { - loadRemoteTools: jest.fn().mockResolvedValue([makeRemoteTool('srv-a')]), + getMcpServerConfigs: jest.fn().mockResolvedValue({ 'srv-a': cfg('id-A') }), }, + aiModelPort: { loadRemoteTools: jest.fn().mockResolvedValue([]) }, }); await fetcher.fetch('id-A'); expect(logger.error).toHaveBeenCalledWith('MCP servers failed to load tools', { requestedMcpServerId: 'id-A', - failedConfigNames: ['srv-b'], + failedConfigNames: ['srv-a'], }); }); - it('does not log a partial-failure error when every scoped entry produced a tool', async () => { + it('does not log a partial-failure error when a tool carries the scoped config id', async () => { const { fetcher, logger } = makeFetcher({ workflowPort: { getMcpServerConfigs: jest.fn().mockResolvedValue({ 'srv-a': cfg('id-A') }), }, aiModelPort: { - loadRemoteTools: jest.fn().mockResolvedValue([makeRemoteTool('srv-a')]), + loadRemoteTools: jest.fn().mockResolvedValue([makeRemoteTool('srv-a', 'id-A')]), }, }); @@ -179,10 +175,9 @@ describe('RemoteToolFetcher.fetch', () => { expect(logger.error).not.toHaveBeenCalled(); }); - // Forest integrations set sourceId to a hardcoded literal ('zendesk', 'snowflake', ...) that - // does not necessarily match the Record key — a sourceId-vs-key comparison would always flag - // them as failed on the happy path. - it('does not flag a Forest integration whose sourceId differs from the Record key', async () => { + // Forest integrations carry a hardcoded sourceId (e.g. 'zendesk'); the partial-failure check + // discriminates on tool.mcpServerId, which both providers populate from the orchestrator id. + it('does not flag a Forest connector whose sourceId differs from the Record key', async () => { const forestConfig = { id: 'id-zendesk', isForestConnector: true as const, @@ -193,7 +188,7 @@ describe('RemoteToolFetcher.fetch', () => { getMcpServerConfigs: jest.fn().mockResolvedValue({ 'zendesk-prod': forestConfig }), }, aiModelPort: { - loadRemoteTools: jest.fn().mockResolvedValue([makeRemoteTool('zendesk')]), + loadRemoteTools: jest.fn().mockResolvedValue([makeRemoteTool('zendesk', 'id-zendesk')]), }, }); @@ -202,39 +197,32 @@ describe('RemoteToolFetcher.fetch', () => { expect(logger.error).not.toHaveBeenCalled(); }); - it('flags only the MCP config when a Forest connector and a failed MCP entry share the target id', async () => { + it('flags a Forest connector that fails to load entirely', async () => { const forestConfig = { - id: 'shared-id', + id: 'id-zendesk', isForestConnector: true as const, integrationName: 'Zendesk', } as unknown as ToolConfig; const { fetcher, logger } = makeFetcher({ workflowPort: { - getMcpServerConfigs: jest.fn().mockResolvedValue({ - 'zendesk-prod': forestConfig, - 'srv-a': cfg('shared-id'), - }), - }, - aiModelPort: { - loadRemoteTools: jest.fn().mockResolvedValue([makeRemoteTool('zendesk')]), + getMcpServerConfigs: jest.fn().mockResolvedValue({ 'zendesk-prod': forestConfig }), }, + aiModelPort: { loadRemoteTools: jest.fn().mockResolvedValue([]) }, }); - await fetcher.fetch('shared-id'); + await fetcher.fetch('id-zendesk'); expect(logger.error).toHaveBeenCalledWith('MCP servers failed to load tools', { - requestedMcpServerId: 'shared-id', - failedConfigNames: ['srv-a'], + requestedMcpServerId: 'id-zendesk', + failedConfigNames: ['zendesk-prod'], }); }); it('returns the tools produced by loadRemoteTools verbatim', async () => { - const remoteTools = [makeRemoteTool('srv-a'), makeRemoteTool('srv-b')]; + const remoteTools = [makeRemoteTool('srv-a', 'id-A')]; const { fetcher } = makeFetcher({ workflowPort: { - getMcpServerConfigs: jest - .fn() - .mockResolvedValue({ 'srv-a': cfg('id-A'), 'srv-b': cfg('id-A') }), + getMcpServerConfigs: jest.fn().mockResolvedValue({ 'srv-a': cfg('id-A') }), }, aiModelPort: { loadRemoteTools: jest.fn().mockResolvedValue(remoteTools) }, }); @@ -243,4 +231,29 @@ describe('RemoteToolFetcher.fetch', () => { expect(result).toBe(remoteTools); }); + + it('propagates a rejection from loadRemoteTools without logging partial-failure', async () => { + const { fetcher, logger } = makeFetcher({ + workflowPort: { + getMcpServerConfigs: jest.fn().mockResolvedValue({ 'srv-a': cfg('id-A') }), + }, + aiModelPort: { + loadRemoteTools: jest.fn().mockRejectedValue(new Error('MCP unreachable')), + }, + }); + + await expect(fetcher.fetch('id-A')).rejects.toThrow('MCP unreachable'); + expect(logger.error).not.toHaveBeenCalled(); + }); + + it('propagates a rejection from getMcpServerConfigs without calling loadRemoteTools', async () => { + const { fetcher, aiModelPort } = makeFetcher({ + workflowPort: { + getMcpServerConfigs: jest.fn().mockRejectedValue(new Error('orchestrator down')), + }, + }); + + await expect(fetcher.fetch('id-A')).rejects.toThrow('orchestrator down'); + expect(aiModelPort.loadRemoteTools).not.toHaveBeenCalled(); + }); }); diff --git a/packages/workflow-executor/test/runner.test.ts b/packages/workflow-executor/test/runner.test.ts index 1f9e031303..01af2c0d6e 100644 --- a/packages/workflow-executor/test/runner.test.ts +++ b/packages/workflow-executor/test/runner.test.ts @@ -1340,7 +1340,10 @@ describe('MCP fetch scoping', () => { ); }); - it('logs an error listing failed config names when loadRemoteTools returns fewer tools than scoped entries', async () => { + // The diagnostic must not short-circuit dispatch — the executor is still constructed (and + // will surface NoMcpToolsError downstream). Asserting on executeSpy.mock.instances bypasses + // the global execute() spy to confirm the executor saw the (empty) tool list. + it('logs partial-failure and still dispatches to the executor when the scoped server loaded zero tools', async () => { const workflowPort = createMockWorkflowPort(); const aiClient = createMockAiClient(); const logger = createMockLogger(); @@ -1361,7 +1364,6 @@ describe('MCP fetch scoping', () => { workflowPort.getMcpServerConfigs.mockResolvedValue({ 'server-A': { id: 'id-A', url: 'https://a.example', type: 'http', headers: {} }, }); - // McpClient swallows per-server load errors — simulate the empty-result case here. aiClient.loadRemoteTools.mockResolvedValue([]); runner = new Runner( @@ -1377,64 +1379,12 @@ describe('MCP fetch scoping', () => { requestedMcpServerId: 'id-A', failedConfigNames: ['server-A'], }); - }); - - // Locks the sourceId-vs-key semantics of errorOnPartialLoadFailure end-to-end: only the - // failed MCP config is flagged while the survivor reaches the executor. The all-failed - // variant above can't lock this — it would route to NoMcpToolsError regardless of how the - // log builds its config list. - it('flags only the failed config and proceeds with the survivor on a genuine partial failure', async () => { - const workflowPort = createMockWorkflowPort(); - const aiClient = createMockAiClient(); - const logger = createMockLogger(); - const step = makePendingStep({ - runId: 'run-1', - stepId: 'step-mcp-1', - stepType: StepType.Mcp, - stepDefinition: { - type: StepType.Mcp, - executionType: StepExecutionMode.AutomatedWithConfirmation, - mcpServerId: 'id-A', - }, - }); - workflowPort.getAvailableRun.mockResolvedValue({ - step, - auth: { forestServerToken: 'test-forest-token' }, - }); - workflowPort.getMcpServerConfigs.mockResolvedValue({ - 'server-A': { id: 'id-A', url: 'https://a.example', type: 'http', headers: {} }, - 'server-B': { id: 'id-A', url: 'https://b.example', type: 'http', headers: {} }, - }); - // McpClient swallows per-server load errors: 'server-A' loaded, 'server-B' is silently dropped. - const survivor = { sourceId: 'server-A', base: { name: 'tool-a' } }; - aiClient.loadRemoteTools.mockResolvedValue([survivor] as unknown as Awaited< - ReturnType - >); - - runner = new Runner( - createRunnerConfig({ - workflowPort, - aiModelPort: aiClient as unknown as AiModelPort, - logger, - }), - ); - await runner.triggerPoll('run-1'); - - // (1) The survivor's Record key is NOT flagged; only the dropped one is. - expect(logger.error).toHaveBeenCalledWith('MCP servers failed to load tools', { - requestedMcpServerId: 'id-A', - failedConfigNames: ['server-B'], - }); - // (2) The McpStepExecutor was constructed with the survivor — proves "proceeds with the - // survivor" end-to-end despite the partial failure. Asserting on the captured instance - // bypasses the global execute() spy (which would otherwise hide which tools reached - // the executor). expect(executeSpy).toHaveBeenCalledTimes(1); const executorInstance = executeSpy.mock.instances[0]; expect(executorInstance).toBeInstanceOf(McpStepExecutor); expect( (executorInstance as unknown as { remoteTools: readonly unknown[] }).remoteTools, - ).toEqual([survivor]); + ).toEqual([]); }); it('re-scopes loadRemoteTools per dispatch when chained MCP steps target different servers', async () => {