Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions .changeset/remove-tooltask-get-handlers.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
---
"@modelcontextprotocol/core": minor
"@modelcontextprotocol/server": minor
---

Make `ToolTaskHandler.getTask`/`getTaskResult` optional and actually invoke them

**Bug fix:** `getTask` and `getTaskResult` handlers registered via `registerToolTask` were never invoked — `tasks/get` and `tasks/result` requests always hit `TaskStore` directly.

**Breaking changes (experimental API):**

- `getTask` and `getTaskResult` are now **optional** on `ToolTaskHandler`. When omitted, `TaskStore` handles the requests (previous de-facto behavior).
- `TaskRequestHandler` signature changed: handlers receive only `(ctx: TaskServerContext)`, not the tool's input arguments.

**Migration:** If your handlers just delegated to `ctx.task.store`, delete them. If you're proxying an external job system (Step Functions, CI/CD pipelines), keep them and drop the `args` parameter.
2 changes: 2 additions & 0 deletions docs/migration-SKILL.md
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,8 @@ Notes:
| `ErrorCode.RequestTimeout` | `SdkErrorCode.RequestTimeout` |
| `ErrorCode.ConnectionClosed` | `SdkErrorCode.ConnectionClosed` |
| `StreamableHTTPError` | REMOVED (use `SdkError` with `SdkErrorCode.ClientHttp*`) |
| `ToolTaskHandler.getTask(args, ctx)` | `ToolTaskHandler.getTask?(ctx)` — now optional, no args |
| `ToolTaskHandler.getTaskResult(args, ctx)` | `ToolTaskHandler.getTaskResult?(ctx)` — now optional, no args |

All other symbols from `@modelcontextprotocol/sdk/types.js` retain their original names (e.g., `CallToolResultSchema`, `ListToolsResultSchema`, etc.).

Expand Down
31 changes: 31 additions & 0 deletions docs/migration.md
Original file line number Diff line number Diff line change
Expand Up @@ -478,6 +478,37 @@ import { JSONRPCError, ResourceReference, isJSONRPCError } from '@modelcontextpr
import { JSONRPCErrorResponse, ResourceTemplateReference, isJSONRPCErrorResponse } from '@modelcontextprotocol/server';
```

### `ToolTaskHandler.getTask` and `getTaskResult` are now optional (experimental)

`getTask` and `getTaskResult` are now optional on `ToolTaskHandler`. When omitted, `tasks/get` and `tasks/result` are served directly from the configured `TaskStore`. Their signature has also changed — they no longer receive the tool's input arguments (which aren't available at `tasks/get`/`tasks/result` time).

If your handlers just delegated to the store, delete them:

**Before:**

```typescript
server.experimental.tasks.registerToolTask('long-task', config, {
createTask: async (args, ctx) => { /* ... */ },
getTask: async (args, ctx) => ctx.task.store.getTask(ctx.task.id),
getTaskResult: async (args, ctx) => ctx.task.store.getTaskResult(ctx.task.id)
});
```

**After:**

```typescript
server.experimental.tasks.registerToolTask('long-task', config, {
createTask: async (args, ctx) => { /* ... */ }
});
```

Keep them if you're proxying an external job system (AWS Step Functions, CI/CD pipelines, etc.) — the new signature takes only `ctx`:

```typescript
getTask: async (ctx) => describeStepFunctionExecution(ctx.task.id),
getTaskResult: async (ctx) => getStepFunctionOutput(ctx.task.id)
```

### Request handler context types

The `RequestHandlerExtra` type has been replaced with a structured context type hierarchy using nested groups:
Expand Down
14 changes: 0 additions & 14 deletions examples/server/src/simpleStreamableHttp.ts
Original file line number Diff line number Diff line change
Expand Up @@ -480,13 +480,6 @@ const getServer = () => {
return {
task
};
},
async getTask(_args, ctx) {
return await ctx.task.store.getTask(ctx.task.id);
},
async getTaskResult(_args, ctx) {
const result = await ctx.task.store.getTaskResult(ctx.task.id);
return result as CallToolResult;
}
}
);
Expand Down Expand Up @@ -588,13 +581,6 @@ const getServer = () => {
})();

return { task };
},
async getTask(_args, ctx) {
return await ctx.task.store.getTask(ctx.task.id);
},
async getTaskResult(_args, ctx) {
const result = await ctx.task.store.getTaskResult(ctx.task.id);
return result as CallToolResult;
}
}
);
Expand Down
45 changes: 34 additions & 11 deletions packages/core/src/shared/taskManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,16 @@ export type TaskContext = {
requestedTtl?: number | null;
};

/**
* Overrides for `tasks/get` and `tasks/result` lookups. Consulted before
* the configured {@linkcode TaskStore}; return `undefined` to fall through.
* @internal
*/
export type TaskLookupOverrides = {
getTask?: (taskId: string, ctx: BaseContext) => Promise<Task | undefined>;
getTaskResult?: (taskId: string, ctx: BaseContext) => Promise<Result | undefined>;
};

export type TaskManagerOptions = {
/**
* Task storage implementation. Required for handling incoming task requests (server-side).
Expand Down Expand Up @@ -199,20 +209,30 @@ export class TaskManager {
private _requestResolvers: Map<RequestId, (response: JSONRPCResultResponse | Error) => void> = new Map();
private _options: TaskManagerOptions;
private _host?: TaskManagerHost;
private _overrides?: TaskLookupOverrides;

constructor(options: TaskManagerOptions) {
this._options = options;
this._taskStore = options.taskStore;
this._taskMessageQueue = options.taskMessageQueue;
}

/**
* Installs per-task lookup overrides consulted before the {@linkcode TaskStore}.
* Used by McpServer to dispatch to per-tool `getTask`/`getTaskResult` handlers.
* @internal
*/
setTaskOverrides(overrides: TaskLookupOverrides): void {
this._overrides = overrides;
}

bind(host: TaskManagerHost): void {
this._host = host;

if (this._taskStore) {
host.registerHandler('tasks/get', async (request, ctx) => {
const params = request.params as { taskId: string };
const task = await this.handleGetTask(params.taskId, ctx.sessionId);
const task = await this.handleGetTask(params.taskId, ctx);
// Per spec: tasks/get responses SHALL NOT include related-task metadata
// as the taskId parameter is the source of truth
return {
Expand All @@ -222,7 +242,7 @@ export class TaskManager {

host.registerHandler('tasks/result', async (request, ctx) => {
const params = request.params as { taskId: string };
return await this.handleGetTaskPayload(params.taskId, ctx.sessionId, ctx.mcpReq.signal, async message => {
return await this.handleGetTaskPayload(params.taskId, ctx, async message => {
// Send the message on the response stream by passing the relatedRequestId
// This tells the transport to write the message to the tasks/result response stream
await host.sendOnResponseStream(message, ctx.mcpReq.id);
Expand Down Expand Up @@ -362,8 +382,11 @@ export class TaskManager {

// -- Handler bodies (delegated from Protocol's registered handlers) --

private async handleGetTask(taskId: string, sessionId?: string): Promise<Task> {
const task = await this._requireTaskStore.getTask(taskId, sessionId);
private async handleGetTask(taskId: string, ctx: BaseContext): Promise<Task> {
const override = await this._overrides?.getTask?.(taskId, ctx);
if (override !== undefined) return override;

const task = await this._requireTaskStore.getTask(taskId, ctx.sessionId);
if (!task) {
throw new ProtocolError(ProtocolErrorCode.InvalidParams, 'Failed to retrieve task: Task not found');
}
Expand All @@ -372,10 +395,12 @@ export class TaskManager {

private async handleGetTaskPayload(
taskId: string,
sessionId: string | undefined,
signal: AbortSignal,
ctx: BaseContext,
sendOnResponseStream: (message: JSONRPCNotification | JSONRPCRequest) => Promise<void>
): Promise<Result> {
const sessionId = ctx.sessionId;
const signal = ctx.mcpReq.signal;

const handleTaskResult = async (): Promise<Result> => {
if (this._taskMessageQueue) {
let queuedMessage: QueuedMessage | undefined;
Expand Down Expand Up @@ -404,17 +429,15 @@ export class TaskManager {
}
}

const task = await this._requireTaskStore.getTask(taskId, sessionId);
if (!task) {
throw new ProtocolError(ProtocolErrorCode.InvalidParams, `Task not found: ${taskId}`);
}
const task = await this.handleGetTask(taskId, ctx);

if (!isTerminal(task.status)) {
await this._waitForTaskUpdate(task.pollInterval, signal);
return await handleTaskResult();
}

const result = await this._requireTaskStore.getTaskResult(taskId, sessionId);
const override = await this._overrides?.getTaskResult?.(taskId, ctx);
const result = override ?? (await this._requireTaskStore.getTaskResult(taskId, sessionId));
await this._clearTaskQueue(taskId);

return {
Expand Down
44 changes: 32 additions & 12 deletions packages/server/src/experimental/tasks/interfaces.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import type {
TaskServerContext
} from '@modelcontextprotocol/core';

import type { BaseToolCallback } from '../../server/mcp.js';
import type { AnyToolHandler, BaseToolCallback } from '../../server/mcp.js';

// ============================================================================
// Task Handler Types (for registerToolTask)
Expand All @@ -30,19 +30,27 @@ export type CreateTaskRequestHandler<

/**
* Handler for task operations (`get`, `getResult`).
*
* Receives only the context (no tool arguments — they are not available at
* `tasks/get` or `tasks/result` time). Access the task ID via `ctx.task.id`.
*
* @experimental
*/
export type TaskRequestHandler<SendResultT extends Result, Args extends StandardSchemaWithJSON | undefined = undefined> = BaseToolCallback<
SendResultT,
TaskServerContext,
Args
>;
export type TaskRequestHandler<SendResultT extends Result> = (ctx: TaskServerContext) => SendResultT | Promise<SendResultT>;

/**
* Interface for task-based tool handlers.
*
* Task-based tools split a long-running operation into three phases:
* `createTask`, `getTask`, and `getTaskResult`.
* Task-based tools create a task on `tools/call` and by default let the SDK's
* `TaskStore` handle subsequent `tasks/get` and `tasks/result` requests.
*
* Provide `getTask` and `getTaskResult` to override the default lookups — useful
* when proxying an external job system (e.g., AWS Step Functions, CI/CD pipelines)
* where the external system is the source of truth for task state.
*
* **Note:** the taskId → tool mapping used to dispatch `getTask`/`getTaskResult`
* is held in-memory and does not survive server restarts or span multiple
* instances. In those scenarios, requests fall through to the `TaskStore`.
*
* @see {@linkcode @modelcontextprotocol/server!experimental/tasks/mcpServer.ExperimentalMcpServerTasks#registerToolTask | registerToolTask} for registration.
* @experimental
Expand All @@ -56,11 +64,23 @@ export interface ToolTaskHandler<Args extends StandardSchemaWithJSON | undefined
*/
createTask: CreateTaskRequestHandler<CreateTaskResult, Args>;
/**
* Handler for `tasks/get` requests.
* Optional handler for `tasks/get` requests. When omitted, the configured
* `TaskStore` is consulted directly.
*/
getTask: TaskRequestHandler<GetTaskResult, Args>;
getTask?: TaskRequestHandler<GetTaskResult>;
/**
* Handler for `tasks/result` requests.
* Optional handler for `tasks/result` requests. When omitted, the configured
* `TaskStore` is consulted directly.
*/
getTaskResult: TaskRequestHandler<CallToolResult, Args>;
getTaskResult?: TaskRequestHandler<CallToolResult>;
}

/**
* Type guard for {@linkcode ToolTaskHandler}.
* @experimental
*/
export function isToolTaskHandler(
handler: AnyToolHandler<StandardSchemaWithJSON | undefined>
): handler is ToolTaskHandler<StandardSchemaWithJSON | undefined> {
return 'createTask' in handler;
}
78 changes: 67 additions & 11 deletions packages/server/src/experimental/tasks/mcpServer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,25 @@
* @experimental
*/

import type { StandardSchemaWithJSON, TaskToolExecution, ToolAnnotations, ToolExecution } from '@modelcontextprotocol/core';
import type {
BaseContext,
CallToolResult,
GetTaskResult,
ServerContext,
StandardSchemaWithJSON,
TaskManager,
TaskServerContext,
TaskToolExecution,
ToolAnnotations,
ToolExecution
} from '@modelcontextprotocol/core';

import type { AnyToolHandler, McpServer, RegisteredTool } from '../../server/mcp.js';
import type { ToolTaskHandler } from './interfaces.js';
import { isToolTaskHandler } from './interfaces.js';

/**
* Internal interface for accessing {@linkcode McpServer}'s private _createRegisteredTool method.
* Internal interface for accessing {@linkcode McpServer}'s private members.
* @internal
*/
interface McpServerInternal {
Expand All @@ -26,6 +38,7 @@ interface McpServerInternal {
_meta: Record<string, unknown> | undefined,
handler: AnyToolHandler<StandardSchemaWithJSON | undefined>
): RegisteredTool;
_registeredTools: { [name: string]: RegisteredTool };
}

/**
Expand All @@ -39,14 +52,63 @@ interface McpServerInternal {
* @experimental
*/
export class ExperimentalMcpServerTasks {
/**
* Maps taskId → toolName for tasks whose handlers define custom
* `getTask` or `getTaskResult`. In-memory only; after a server restart
* or on a different instance, lookups fall through to the TaskStore.
*/
private _taskToTool = new Map<string, string>();

constructor(private readonly _mcpServer: McpServer) {}

/** @internal */
_installOverrides(taskManager: TaskManager): void {
taskManager.setTaskOverrides({
getTask: (taskId, ctx) => this._dispatch(taskId, ctx, 'getTask'),
getTaskResult: (taskId, ctx) => this._dispatch(taskId, ctx, 'getTaskResult')
});
}

/** @internal */
_recordTask(taskId: string, toolName: string): void {
const tool = (this._mcpServer as unknown as McpServerInternal)._registeredTools[toolName];
if (tool && isToolTaskHandler(tool.handler) && (tool.handler.getTask || tool.handler.getTaskResult)) {
this._taskToTool.set(taskId, toolName);
}
Comment on lines +60 to +77
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟡 _taskToTool Map grows without bound: entries are added via _recordTask() but never deleted when tasks reach terminal status or when the server closes. For long-running servers using custom getTask/getTaskResult handlers, this is an unbounded memory leak. Consider deleting entries after returning a terminal result in _dispatch, and clearing the map on close (similar to how TaskManager._taskProgressTokens has _cleanupTaskProgressHandler() and onClose() cleanup).

Extended reasoning...

What the bug is

The _taskToTool Map (line 60) in ExperimentalMcpServerTasks records taskId -> toolName mappings whenever a task is created for a tool that has custom getTask or getTaskResult handlers. Entries are added in _recordTask() but there are zero calls to _taskToTool.delete() or _taskToTool.clear() anywhere in the class or codebase.

Code path

The flow is:

  1. A client calls tools/call with task augmentation. McpServer calls this._experimental.tasks._recordTask(taskId, toolName) (mcp.ts lines 200 and 322).
  2. _recordTask() checks if the tool has custom getTask/getTaskResult handlers and, if so, stores the mapping: this._taskToTool.set(taskId, toolName) (mcpServer.ts line 77).
  3. When tasks/get or tasks/result arrives, _dispatch() reads from the map to route to the correct handler.
  4. No code ever removes the entry -- not after a terminal result is returned, not when the task is cancelled, and not when the server shuts down.

Why existing code does not prevent it

The TaskManager class in core has cleanup patterns for its own maps: _cleanupTaskProgressHandler() deletes entries from _taskProgressTokens when a task reaches terminal status, and onClose() clears the entire map. The ExperimentalMcpServerTasks class follows none of these patterns -- it has no onClose() hook and no per-task cleanup.

Step-by-step proof

  1. Server starts, _taskToTool is empty (size 0).
  2. Client creates task T1 for tool proxy-task (which has custom getTask/getTaskResult). _recordTask("T1", "proxy-task") runs. Map size = 1.
  3. T1 completes. _dispatch returns the result. Map still has entry "T1" -> "proxy-task" -- size = 1.
  4. Client creates task T2 for the same tool. Map size = 2.
  5. T2 completes. Map size = 2.
  6. After N tasks, map size = N. The entries for T1, T2, etc. are never removed despite those tasks being long finished.

Impact

Each entry is just two short strings (~100 bytes), so the leak is slow. For the common case (no custom handlers), _recordTask skips the .set() call entirely, so there is no leak. The leak only manifests for the uncommon proxy use case (custom getTask/getTaskResult handlers) on a long-running server that processes many tasks. The PR description already acknowledges the in-memory-only nature of this map, but the concern there was about cross-instance behavior -- within a single process the map grows monotonically.

Suggested fix

Delete the entry in _dispatch after a terminal result is returned (when the method is getTaskResult, since that is the final lookup). Also add an onClose() method that calls this._taskToTool.clear() and wire it into the server shutdown path. This mirrors the _taskProgressTokens cleanup pattern in TaskManager.

}

private async _dispatch<M extends 'getTask' | 'getTaskResult'>(
taskId: string,
ctx: BaseContext,
method: M
): Promise<(M extends 'getTask' ? GetTaskResult : CallToolResult) | undefined> {
const toolName = this._taskToTool.get(taskId);
if (!toolName) return undefined;

const tool = (this._mcpServer as unknown as McpServerInternal)._registeredTools[toolName];
if (!tool || !isToolTaskHandler(tool.handler)) return undefined;

const handler = tool.handler[method];
if (!handler) return undefined;

const serverCtx = ctx as ServerContext;
if (!serverCtx.task?.store) return undefined;

const taskCtx: TaskServerContext = {
...serverCtx,
task: { ...serverCtx.task, id: taskId, store: serverCtx.task.store }
};

return handler(taskCtx) as M extends 'getTask' ? GetTaskResult : CallToolResult;
}

/**
* Registers a task-based tool with a config object and handler.
*
* Task-based tools support long-running operations that can be polled for status
* and results. The handler must implement {@linkcode ToolTaskHandler.createTask | createTask}, {@linkcode ToolTaskHandler.getTask | getTask}, and {@linkcode ToolTaskHandler.getTaskResult | getTaskResult}
* methods.
* and results. The handler implements {@linkcode ToolTaskHandler.createTask | createTask}
* to start the task; subsequent `tasks/get` and `tasks/result` requests are served
* from the configured `TaskStore`.
*
* @example
* ```typescript
Expand All @@ -59,19 +121,13 @@ export class ExperimentalMcpServerTasks {
* const task = await ctx.task.store.createTask({ ttl: 300000 });
* startBackgroundWork(task.taskId, args);
* return { task };
* },
* getTask: async (args, ctx) => {
* return ctx.task.store.getTask(ctx.task.id);
* },
* getTaskResult: async (args, ctx) => {
* return ctx.task.store.getTaskResult(ctx.task.id);
* }
* });
* ```
*
* @param name - The tool name
* @param config - Tool configuration (description, schemas, etc.)
* @param handler - Task handler with {@linkcode ToolTaskHandler.createTask | createTask}, {@linkcode ToolTaskHandler.getTask | getTask}, {@linkcode ToolTaskHandler.getTaskResult | getTaskResult} methods
* @param handler - Task handler with {@linkcode ToolTaskHandler.createTask | createTask}
* @returns {@linkcode server/mcp.RegisteredTool | RegisteredTool} for managing the tool's lifecycle
*
* @experimental
Expand Down
Loading
Loading