diff --git a/.changeset/abort-handlers-on-close.md b/.changeset/abort-handlers-on-close.md new file mode 100644 index 000000000..b6bc65e65 --- /dev/null +++ b/.changeset/abort-handlers-on-close.md @@ -0,0 +1,5 @@ +--- +'@modelcontextprotocol/core': patch +--- + +Abort in-flight request handlers when the connection closes. Previously, request handlers would continue running after the transport disconnected, wasting resources and preventing proper cleanup. Also fixes `InMemoryTransport.close()` firing `onclose` twice on the initiating side. diff --git a/.changeset/stdio-skip-non-json.md b/.changeset/stdio-skip-non-json.md new file mode 100644 index 000000000..d20b740c9 --- /dev/null +++ b/.changeset/stdio-skip-non-json.md @@ -0,0 +1,5 @@ +--- +'@modelcontextprotocol/core': patch +--- + +`ReadBuffer.readMessage()` now silently skips non-JSON lines instead of throwing `SyntaxError`. This prevents noisy `onerror` callbacks when hot-reload tools (tsx, nodemon) write debug output like "Gracefully restarting..." to stdout. Lines that parse as JSON but fail JSONRPC schema validation still throw. diff --git a/.github/workflows/conformance.yml b/.github/workflows/conformance.yml index 47606314e..67d9dcbfe 100644 --- a/.github/workflows/conformance.yml +++ b/.github/workflows/conformance.yml @@ -18,12 +18,12 @@ jobs: runs-on: ubuntu-latest continue-on-error: true steps: - - uses: actions/checkout@v4 + - uses: actions/checkout@v6 - name: Install pnpm - uses: pnpm/action-setup@v4 + uses: pnpm/action-setup@b906affcce14559ad1aafd4ab0e942779e9f58b1 # v4 with: run_install: false - - uses: actions/setup-node@v4 + - uses: actions/setup-node@v6 with: node-version: 24 cache: pnpm @@ -36,12 +36,12 @@ jobs: runs-on: ubuntu-latest continue-on-error: true steps: - - uses: actions/checkout@v4 + - uses: actions/checkout@v6 - name: Install pnpm - uses: pnpm/action-setup@v4 + uses: pnpm/action-setup@b906affcce14559ad1aafd4ab0e942779e9f58b1 # v4 with: run_install: false - - uses: actions/setup-node@v4 + - uses: actions/setup-node@v6 with: node-version: 24 cache: pnpm diff --git a/.github/workflows/deploy-docs.yml b/.github/workflows/deploy-docs.yml index 3b59623a3..aab034bf3 100644 --- a/.github/workflows/deploy-docs.yml +++ b/.github/workflows/deploy-docs.yml @@ -25,13 +25,13 @@ jobs: url: ${{ steps.deployment.outputs.page_url }} steps: - - uses: actions/checkout@v4 + - uses: actions/checkout@v6 - name: Install pnpm - uses: pnpm/action-setup@v4 + uses: pnpm/action-setup@b906affcce14559ad1aafd4ab0e942779e9f58b1 # v4 with: run_install: false - - uses: actions/setup-node@v4 + - uses: actions/setup-node@v6 with: node-version: 24 cache: pnpm diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index 4fd4d35e9..3ab2c5094 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -19,7 +19,7 @@ jobs: - uses: actions/checkout@v6 - name: Install pnpm - uses: pnpm/action-setup@v4 + uses: pnpm/action-setup@b906affcce14559ad1aafd4ab0e942779e9f58b1 # v4 id: pnpm-install with: run_install: false @@ -44,7 +44,7 @@ jobs: - uses: actions/checkout@v6 - name: Install pnpm - uses: pnpm/action-setup@v4 + uses: pnpm/action-setup@b906affcce14559ad1aafd4ab0e942779e9f58b1 # v4 id: pnpm-install with: run_install: false @@ -71,7 +71,7 @@ jobs: steps: - uses: actions/checkout@v6 - name: Install pnpm - uses: pnpm/action-setup@v4 + uses: pnpm/action-setup@b906affcce14559ad1aafd4ab0e942779e9f58b1 # v4 with: run_install: false - uses: actions/setup-node@v6 @@ -81,12 +81,12 @@ jobs: cache-dependency-path: pnpm-lock.yaml - name: Set up Bun if: matrix.runtime == 'bun' - uses: oven-sh/setup-bun@v2 + uses: oven-sh/setup-bun@0c5077e51419868618aeaa5fe8019c62421857d6 # v2 with: bun-version: ${{ matrix.version }} - name: Set up Deno if: matrix.runtime == 'deno' - uses: denoland/setup-deno@v2 + uses: denoland/setup-deno@667a34cdef165d8d2b2e98dde39547c9daac7282 # v2 with: deno-version: ${{ matrix.version }} - run: pnpm install @@ -105,14 +105,14 @@ jobs: id-token: write steps: - - uses: actions/checkout@v4 + - uses: actions/checkout@v6 - name: Install pnpm - uses: pnpm/action-setup@v4 + uses: pnpm/action-setup@b906affcce14559ad1aafd4ab0e942779e9f58b1 # v4 id: pnpm-install with: run_install: false - - uses: actions/setup-node@v4 + - uses: actions/setup-node@v6 with: node-version: 24 cache: pnpm diff --git a/.github/workflows/publish.yml b/.github/workflows/publish.yml index a180396b6..c2257e21b 100644 --- a/.github/workflows/publish.yml +++ b/.github/workflows/publish.yml @@ -19,7 +19,7 @@ jobs: - uses: actions/checkout@v6 - name: Install pnpm - uses: pnpm/action-setup@v4 + uses: pnpm/action-setup@b906affcce14559ad1aafd4ab0e942779e9f58b1 # v4 with: run_install: false diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index ed0b1061b..d3f4ecf6d 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -19,7 +19,7 @@ jobs: - uses: actions/checkout@v6 - name: Install pnpm - uses: pnpm/action-setup@v4 + uses: pnpm/action-setup@b906affcce14559ad1aafd4ab0e942779e9f58b1 # v4 with: run_install: false @@ -36,7 +36,7 @@ jobs: - name: Create Release Pull Request or Publish to npm id: changesets - uses: changesets/action@v1 + uses: changesets/action@6a0a831ff30acef54f2c6aa1cbbc1096b066edaf # v1 with: publish: pnpm run build:all && pnpm changeset publish env: diff --git a/.github/workflows/update-spec-types.yml b/.github/workflows/update-spec-types.yml index 4a54f76c5..1c67ef2f1 100644 --- a/.github/workflows/update-spec-types.yml +++ b/.github/workflows/update-spec-types.yml @@ -18,7 +18,7 @@ jobs: uses: actions/checkout@v6 - name: Install pnpm - uses: pnpm/action-setup@v4 + uses: pnpm/action-setup@b906affcce14559ad1aafd4ab0e942779e9f58b1 # v4 id: pnpm-install with: run_install: false diff --git a/packages/core/src/shared/protocol.ts b/packages/core/src/shared/protocol.ts index d82d67da3..d6daf0172 100644 --- a/packages/core/src/shared/protocol.ts +++ b/packages/core/src/shared/protocol.ts @@ -457,8 +457,11 @@ export abstract class Protocol { this._transport = transport; const _onclose = this.transport?.onclose; this._transport.onclose = () => { - _onclose?.(); - this._onclose(); + try { + _onclose?.(); + } finally { + this._onclose(); + } }; const _onerror = this.transport?.onerror; @@ -494,13 +497,28 @@ export abstract class Protocol { this._taskManager.onClose(); this._pendingDebouncedNotifications.clear(); + for (const info of this._timeoutInfo.values()) { + clearTimeout(info.timeoutId); + } + this._timeoutInfo.clear(); + + const requestHandlerAbortControllers = this._requestHandlerAbortControllers; + this._requestHandlerAbortControllers = new Map(); + const error = new SdkError(SdkErrorCode.ConnectionClosed, 'Connection closed'); this._transport = undefined; - this.onclose?.(); - for (const handler of responseHandlers.values()) { - handler(error); + try { + this.onclose?.(); + } finally { + for (const handler of responseHandlers.values()) { + handler(error); + } + + for (const controller of requestHandlerAbortControllers.values()) { + controller.abort(error); + } } } @@ -642,7 +660,9 @@ export abstract class Protocol { ) .catch(error => this._onerror(new Error(`Failed to send response: ${error}`))) .finally(() => { - this._requestHandlerAbortControllers.delete(request.id); + if (this._requestHandlerAbortControllers.get(request.id) === abortController) { + this._requestHandlerAbortControllers.delete(request.id); + } }); } diff --git a/packages/core/src/shared/stdio.ts b/packages/core/src/shared/stdio.ts index 773860770..7283a5ef9 100644 --- a/packages/core/src/shared/stdio.ts +++ b/packages/core/src/shared/stdio.ts @@ -12,18 +12,28 @@ export class ReadBuffer { } readMessage(): JSONRPCMessage | null { - if (!this._buffer) { - return null; + while (this._buffer) { + const index = this._buffer.indexOf('\n'); + if (index === -1) { + return null; + } + + const line = this._buffer.toString('utf8', 0, index).replace(/\r$/, ''); + this._buffer = this._buffer.subarray(index + 1); + + try { + return deserializeMessage(line); + } catch (error) { + // Skip non-JSON lines (e.g., debug output from hot-reload tools like + // tsx or nodemon that write to stdout). Schema validation errors still + // throw so malformed-but-valid-JSON messages surface via onerror. + if (error instanceof SyntaxError) { + continue; + } + throw error; + } } - - const index = this._buffer.indexOf('\n'); - if (index === -1) { - return null; - } - - const line = this._buffer.toString('utf8', 0, index).replace(/\r$/, ''); - this._buffer = this._buffer.subarray(index + 1); - return deserializeMessage(line); + return null; } clear(): void { diff --git a/packages/core/src/shared/taskManager.ts b/packages/core/src/shared/taskManager.ts index 28460d1d9..7ca7f7b4a 100644 --- a/packages/core/src/shared/taskManager.ts +++ b/packages/core/src/shared/taskManager.ts @@ -801,6 +801,7 @@ export class TaskManager { onClose(): void { this._taskProgressTokens.clear(); + this._requestResolvers.clear(); } // -- Private helpers -- @@ -893,8 +894,4 @@ export class NullTaskManager extends TaskManager { ): Promise<{ queued: boolean; jsonrpcNotification?: JSONRPCNotification }> { return { queued: false, jsonrpcNotification: { ...notification, jsonrpc: '2.0' } }; } - - override onClose(): void { - // No-op - } } diff --git a/packages/core/src/util/inMemory.ts b/packages/core/src/util/inMemory.ts index 1103b5733..256363c13 100644 --- a/packages/core/src/util/inMemory.ts +++ b/packages/core/src/util/inMemory.ts @@ -13,6 +13,7 @@ interface QueuedMessage { export class InMemoryTransport implements Transport { private _otherTransport?: InMemoryTransport; private _messageQueue: QueuedMessage[] = []; + private _closed = false; onclose?: () => void; onerror?: (error: Error) => void; @@ -39,10 +40,16 @@ export class InMemoryTransport implements Transport { } async close(): Promise { + if (this._closed) return; + this._closed = true; + const other = this._otherTransport; this._otherTransport = undefined; - await other?.close(); - this.onclose?.(); + try { + await other?.close(); + } finally { + this.onclose?.(); + } } /** diff --git a/packages/core/test/inMemory.test.ts b/packages/core/test/inMemory.test.ts index 280efdf4b..46332eaa2 100644 --- a/packages/core/test/inMemory.test.ts +++ b/packages/core/test/inMemory.test.ts @@ -99,6 +99,53 @@ describe('InMemoryTransport', () => { await expect(clientTransport.send({ jsonrpc: '2.0', method: 'test', id: 1 })).rejects.toThrow('Not connected'); }); + test('should fire onclose exactly once per transport', async () => { + let clientCloseCount = 0; + let serverCloseCount = 0; + + clientTransport.onclose = () => clientCloseCount++; + serverTransport.onclose = () => serverCloseCount++; + + await clientTransport.close(); + + expect(clientCloseCount).toBe(1); + expect(serverCloseCount).toBe(1); + }); + + test('should handle double close idempotently', async () => { + let clientCloseCount = 0; + clientTransport.onclose = () => clientCloseCount++; + + await clientTransport.close(); + await clientTransport.close(); + + expect(clientCloseCount).toBe(1); + }); + + test('should handle concurrent close from both sides', async () => { + let clientCloseCount = 0; + let serverCloseCount = 0; + + clientTransport.onclose = () => clientCloseCount++; + serverTransport.onclose = () => serverCloseCount++; + + await Promise.all([clientTransport.close(), serverTransport.close()]); + + expect(clientCloseCount).toBe(1); + expect(serverCloseCount).toBe(1); + }); + + test('should fire onclose even if peer onclose throws', async () => { + let clientCloseCount = 0; + clientTransport.onclose = () => clientCloseCount++; + serverTransport.onclose = () => { + throw new Error('boom'); + }; + + await expect(clientTransport.close()).rejects.toThrow('boom'); + expect(clientCloseCount).toBe(1); + }); + test('should queue messages sent before start', async () => { const message: JSONRPCMessage = { jsonrpc: '2.0', diff --git a/packages/core/test/shared/protocol.test.ts b/packages/core/test/shared/protocol.test.ts index 60e4e6d24..69735bc3a 100644 --- a/packages/core/test/shared/protocol.test.ts +++ b/packages/core/test/shared/protocol.test.ts @@ -217,6 +217,36 @@ describe('protocol tests', () => { expect(oncloseMock).toHaveBeenCalled(); }); + test('should abort in-flight request handlers when the connection is closed', async () => { + await protocol.connect(transport); + + let abortReason: unknown; + let handlerStarted = false; + const handlerDone = new Promise(resolve => { + protocol.setRequestHandler('ping', async (_request, ctx) => { + handlerStarted = true; + await new Promise(resolveInner => { + ctx.mcpReq.signal.addEventListener('abort', () => { + abortReason = ctx.mcpReq.signal.reason; + resolveInner(); + }); + }); + resolve(); + return {}; + }); + }); + + transport.onmessage?.({ jsonrpc: '2.0', id: 1, method: 'ping', params: {} }); + + await vi.waitFor(() => expect(handlerStarted).toBe(true)); + + await transport.close(); + await handlerDone; + + expect(abortReason).toBeInstanceOf(SdkError); + expect((abortReason as SdkError).code).toBe(SdkErrorCode.ConnectionClosed); + }); + test('should not overwrite existing hooks when connecting transports', async () => { const oncloseMock = vi.fn(); const onerrorMock = vi.fn(); diff --git a/packages/core/test/shared/stdio.test.ts b/packages/core/test/shared/stdio.test.ts index 7e880aa57..65d1de0ea 100644 --- a/packages/core/test/shared/stdio.test.ts +++ b/packages/core/test/shared/stdio.test.ts @@ -33,3 +33,83 @@ test('should be reusable after clearing', () => { readBuffer.append(Buffer.from('\n')); expect(readBuffer.readMessage()).toEqual(testMessage); }); + +describe('non-JSON line filtering', () => { + test('should skip empty lines', () => { + const readBuffer = new ReadBuffer(); + readBuffer.append(Buffer.from('\n\n' + JSON.stringify(testMessage) + '\n\n')); + + expect(readBuffer.readMessage()).toEqual(testMessage); + expect(readBuffer.readMessage()).toBeNull(); + }); + + test('should skip non-JSON lines before a valid message', () => { + const readBuffer = new ReadBuffer(); + readBuffer.append(Buffer.from('Debug: Starting server\n' + 'Warning: Something happened\n' + JSON.stringify(testMessage) + '\n')); + + expect(readBuffer.readMessage()).toEqual(testMessage); + expect(readBuffer.readMessage()).toBeNull(); + }); + + test('should skip non-JSON lines interleaved with multiple valid messages', () => { + const readBuffer = new ReadBuffer(); + const message1: JSONRPCMessage = { jsonrpc: '2.0', method: 'method1' }; + const message2: JSONRPCMessage = { jsonrpc: '2.0', method: 'method2' }; + + readBuffer.append( + Buffer.from( + 'Debug line 1\n' + + JSON.stringify(message1) + + '\n' + + 'Debug line 2\n' + + 'Another non-JSON line\n' + + JSON.stringify(message2) + + '\n' + ) + ); + + expect(readBuffer.readMessage()).toEqual(message1); + expect(readBuffer.readMessage()).toEqual(message2); + expect(readBuffer.readMessage()).toBeNull(); + }); + + test('should preserve incomplete JSON at end of buffer until completed', () => { + const readBuffer = new ReadBuffer(); + readBuffer.append(Buffer.from('{"jsonrpc": "2.0", "method": "test"')); + expect(readBuffer.readMessage()).toBeNull(); + + readBuffer.append(Buffer.from('}\n')); + expect(readBuffer.readMessage()).toEqual({ jsonrpc: '2.0', method: 'test' }); + }); + + test('should skip lines with unbalanced braces', () => { + const readBuffer = new ReadBuffer(); + readBuffer.append(Buffer.from('{incomplete\n' + 'incomplete}\n' + JSON.stringify(testMessage) + '\n')); + + expect(readBuffer.readMessage()).toEqual(testMessage); + expect(readBuffer.readMessage()).toBeNull(); + }); + + test('should skip lines that look like JSON but fail to parse', () => { + const readBuffer = new ReadBuffer(); + readBuffer.append(Buffer.from('{invalidJson: true}\n' + JSON.stringify(testMessage) + '\n')); + + expect(readBuffer.readMessage()).toEqual(testMessage); + expect(readBuffer.readMessage()).toBeNull(); + }); + + test('should tolerate leading/trailing whitespace around valid JSON', () => { + const readBuffer = new ReadBuffer(); + const message: JSONRPCMessage = { jsonrpc: '2.0', method: 'test' }; + readBuffer.append(Buffer.from(' ' + JSON.stringify(message) + ' \n')); + + expect(readBuffer.readMessage()).toEqual(message); + }); + + test('should still throw on valid JSON that fails schema validation', () => { + const readBuffer = new ReadBuffer(); + readBuffer.append(Buffer.from('{"not": "a jsonrpc message"}\n')); + + expect(() => readBuffer.readMessage()).toThrow(); + }); +});