From 6d604c696ba59aef79d863576013d7b7d1fe285e Mon Sep 17 00:00:00 2001 From: Kevin van Zonneveld Date: Tue, 14 Apr 2026 16:27:36 +0200 Subject: [PATCH 1/4] Make MCP HTTP stateless for hosted routing --- .changeset/stateless-mcp-http.md | 5 + packages/mcp-server/src/express.ts | 68 +++-------- packages/mcp-server/src/http.ts | 92 +++----------- .../test/e2e/non-sticky-routing.test.ts | 115 ++++++++++++++++++ 4 files changed, 155 insertions(+), 125 deletions(-) create mode 100644 .changeset/stateless-mcp-http.md create mode 100644 packages/mcp-server/test/e2e/non-sticky-routing.test.ts diff --git a/.changeset/stateless-mcp-http.md b/.changeset/stateless-mcp-http.md new file mode 100644 index 00000000..c8db4a6d --- /dev/null +++ b/.changeset/stateless-mcp-http.md @@ -0,0 +1,5 @@ +--- +'@transloadit/mcp-server': patch +--- + +Serve Streamable HTTP MCP requests statelessly so hosted deployments keep working behind non-sticky load balancing while preserving isolated transport instances per request. diff --git a/packages/mcp-server/src/express.ts b/packages/mcp-server/src/express.ts index a3a450b6..46443c7e 100644 --- a/packages/mcp-server/src/express.ts +++ b/packages/mcp-server/src/express.ts @@ -1,6 +1,4 @@ -import { randomUUID } from 'node:crypto' import { StreamableHTTPServerTransport } from '@modelcontextprotocol/sdk/server/streamableHttp.js' -import { isInitializeRequest } from '@modelcontextprotocol/sdk/types.js' import express from 'express' import type { TransloaditMcpHttpOptions } from './http.ts' import { isBasicAuthorized } from './http-helpers.ts' @@ -13,11 +11,6 @@ export type TransloaditMcpExpressOptions = TransloaditMcpHttpOptions & { } export function createTransloaditMcpExpressRouter(options: TransloaditMcpExpressOptions = {}) { - const sessionIdGenerator = options.sessionIdGenerator ?? (() => randomUUID()) - - // Per-session transport map: each MCP client gets its own transport + server pair. - const transports = new Map() - const router = express.Router() const routePath = options.path ?? '/mcp' const metricsPath = @@ -64,58 +57,27 @@ export function createTransloaditMcpExpressRouter(options: TransloaditMcpExpress }) router.all(routePath, async (req: express.Request, res: express.Response) => { - const sessionId = req.headers['mcp-session-id'] as string | undefined - let transport: StreamableHTTPServerTransport | undefined - - if (sessionId) { - transport = transports.get(sessionId) - if (!transport) { - res.status(404).json({ - jsonrpc: '2.0', - error: { code: -32000, message: 'Session not found' }, - id: null, - }) - return - } - } else if (req.method === 'POST' && isInitializeRequest(req.body)) { - // New initialization request — create a new transport + server pair. - const newTransport = new StreamableHTTPServerTransport({ - sessionIdGenerator, - allowedOrigins: options.allowedOrigins, - allowedHosts: options.allowedHosts, - enableDnsRebindingProtection: options.enableDnsRebindingProtection, - onsessioninitialized: (sid) => { - transports.set(sid, newTransport) - }, - }) - - newTransport.onclose = () => { - const sid = newTransport.sessionId - if (sid) { - transports.delete(sid) - } - } - - const server = createTransloaditMcpServer(options) - await server.connect(newTransport) - transport = newTransport - } else if (req.method === 'POST') { - res.status(400).json({ + if (req.method !== 'POST') { + res.status(405).json({ jsonrpc: '2.0', - error: { code: -32600, message: 'Bad Request: No valid session ID provided' }, + error: { code: -32000, message: 'Method not allowed.' }, id: null, }) return } - if (!transport) { - res.status(400).json({ - jsonrpc: '2.0', - error: { code: -32600, message: 'Bad Request: No valid session ID provided' }, - id: null, - }) - return - } + const transport = new StreamableHTTPServerTransport({ + sessionIdGenerator: undefined, + allowedOrigins: options.allowedOrigins, + allowedHosts: options.allowedHosts, + enableDnsRebindingProtection: options.enableDnsRebindingProtection, + }) + const server = createTransloaditMcpServer(options) + res.on('close', () => { + void transport.close() + void server.close() + }) + await server.connect(transport) await transport.handleRequest(req, res, req.body) }) diff --git a/packages/mcp-server/src/http.ts b/packages/mcp-server/src/http.ts index 62d7e27d..eb32a1c2 100644 --- a/packages/mcp-server/src/http.ts +++ b/packages/mcp-server/src/http.ts @@ -1,7 +1,5 @@ -import { randomUUID } from 'node:crypto' import type { IncomingMessage, ServerResponse } from 'node:http' import { StreamableHTTPServerTransport } from '@modelcontextprotocol/sdk/server/streamableHttp.js' -import { isInitializeRequest } from '@modelcontextprotocol/sdk/types.js' import type { SevLogger } from '@transloadit/sev-logger' import { applyCorsHeaders, @@ -23,6 +21,7 @@ export type TransloaditMcpHttpOptions = TransloaditMcpServerOptions & { path?: string metricsPath?: string | false metricsAuth?: { username: string; password: string } + // Ignored on purpose: the hosted HTTP server is stateless and does not mint session IDs. sessionIdGenerator?: (() => string) | undefined logger?: SevLogger } @@ -36,7 +35,7 @@ export type TransloaditMcpHttpHandler = (( const defaultPath = '/mcp' -/** Read the full request body and JSON-parse it so `isInitializeRequest` can inspect the payload. */ +/** Read the full request body and JSON-parse it before handing it to the MCP transport. */ function readJsonBody(req: IncomingMessage): Promise { return new Promise((resolve, reject) => { const chunks: Buffer[] = [] @@ -64,10 +63,6 @@ export function createTransloaditMcpHttpHandler( const metricsPath = options.metricsPath === false ? undefined : normalizePath(options.metricsPath ?? '/metrics') const metricsAuth = options.metricsAuth - const sessionIdGenerator = options.sessionIdGenerator ?? (() => randomUUID()) - - // Per-session transport map: each MCP client gets its own transport + server pair. - const transports = new Map() const serverCardJson = JSON.stringify( buildServerCard(expectedPath, { authKey: options.authKey, authSecret: options.authSecret }), @@ -162,78 +157,33 @@ export function createTransloaditMcpHttpHandler( return } - // Route request to the correct per-session transport. - const sessionId = req.headers['mcp-session-id'] as string | undefined - let transport: StreamableHTTPServerTransport | undefined - - if (sessionId) { - transport = transports.get(sessionId) - if (!transport) { - res.statusCode = 404 - res.setHeader('Content-Type', 'application/json') - res.end( - JSON.stringify({ - jsonrpc: '2.0', - error: { code: -32000, message: 'Session not found' }, - id: null, - }), - ) - return - } - } - - // For POST requests without a session, read the body to check for initialization. - let parsedBody: unknown - if (req.method === 'POST' && !transport) { - parsedBody = await readJsonBody(req) - if (isInitializeRequest(parsedBody)) { - const newTransport = new StreamableHTTPServerTransport({ - sessionIdGenerator, - allowedOrigins: options.allowedOrigins, - allowedHosts: options.allowedHosts, - enableDnsRebindingProtection: options.enableDnsRebindingProtection, - onsessioninitialized: (sid) => { - transports.set(sid, newTransport) - }, - }) - - newTransport.onclose = () => { - const sid = newTransport.sessionId - if (sid) { - transports.delete(sid) - } - } - - const server = createTransloaditMcpServer(options) - await server.connect(newTransport) - transport = newTransport - } else { - res.statusCode = 400 - res.setHeader('Content-Type', 'application/json') - res.end( - JSON.stringify({ - jsonrpc: '2.0', - error: { code: -32600, message: 'Bad Request: No valid session ID provided' }, - id: null, - }), - ) - return - } - } - - if (!transport) { - res.statusCode = 400 + if (req.method !== 'POST') { + res.statusCode = 405 res.setHeader('Content-Type', 'application/json') res.end( JSON.stringify({ jsonrpc: '2.0', - error: { code: -32600, message: 'Bad Request: No valid session ID provided' }, + error: { code: -32000, message: 'Method not allowed.' }, id: null, }), ) return } + const parsedBody = await readJsonBody(req) + const transport = new StreamableHTTPServerTransport({ + sessionIdGenerator: undefined, + allowedOrigins: options.allowedOrigins, + allowedHosts: options.allowedHosts, + enableDnsRebindingProtection: options.enableDnsRebindingProtection, + }) + const server = createTransloaditMcpServer(options) + res.on('close', () => { + void transport.close() + void server.close() + }) + await server.connect(transport) + try { await transport.handleRequest(req, res, parsedBody) } catch { @@ -245,9 +195,7 @@ export function createTransloaditMcpHttpHandler( }) as TransloaditMcpHttpHandler handler.close = async () => { - const closePromises = [...transports.values()].map((t) => t.close()) - await Promise.all(closePromises) - transports.clear() + return Promise.resolve() } return handler diff --git a/packages/mcp-server/test/e2e/non-sticky-routing.test.ts b/packages/mcp-server/test/e2e/non-sticky-routing.test.ts new file mode 100644 index 00000000..1ee95b79 --- /dev/null +++ b/packages/mcp-server/test/e2e/non-sticky-routing.test.ts @@ -0,0 +1,115 @@ +import { createServer, request as httpRequest } from 'node:http' +import type { AddressInfo, Socket } from 'node:net' +import { expect, test } from 'vitest' +import { createHttpClient, startHttpServer } from './http-server.ts' +import { parseToolPayload } from './mcp-client.ts' + +type RoutedRequest = { + method: string + path: string + headers: Record + body: Buffer +} + +function readRequest(req: import('node:http').IncomingMessage): Promise { + return new Promise((resolve, reject) => { + const chunks: Buffer[] = [] + req.on('data', (chunk: Buffer) => chunks.push(chunk)) + req.on('end', () => { + resolve({ + method: req.method ?? 'GET', + path: req.url ?? '/', + headers: req.headers, + body: Buffer.concat(chunks), + }) + }) + req.on('error', reject) + }) +} + +function proxyToBackend( + targetPort: number, + routedRequest: RoutedRequest, + res: import('node:http').ServerResponse, +): Promise { + return new Promise((resolve, reject) => { + const upstream = httpRequest( + { + host: '127.0.0.1', + port: targetPort, + method: routedRequest.method, + path: routedRequest.path, + headers: routedRequest.headers, + }, + (upstreamRes) => { + res.writeHead(upstreamRes.statusCode ?? 500, upstreamRes.headers) + upstreamRes.pipe(res) + upstreamRes.on('end', resolve) + upstreamRes.on('error', reject) + }, + ) + + upstream.on('error', reject) + upstream.end(routedRequest.body) + }) +} + +async function startAlternatingProxy( + ports: [number, number], +): Promise<{ url: URL; close: () => Promise }> { + let requestCount = 0 + const server = createServer(async (req, res) => { + try { + const routedRequest = await readRequest(req) + const targetPort = ports[requestCount % ports.length]! + requestCount += 1 + await proxyToBackend(targetPort, routedRequest, res) + } catch (error) { + res.statusCode = 500 + res.end(error instanceof Error ? error.message : String(error)) + } + }) + + await new Promise((resolve) => { + server.listen(0, '127.0.0.1', resolve) + }) + + const { port } = server.address() as AddressInfo + return { + url: new URL(`http://127.0.0.1:${port}/mcp`), + close: async () => { + await new Promise((resolve, reject) => { + server.close((error) => (error ? reject(error) : resolve())) + }) + }, + } +} + +test('streamable http: survives non-sticky routing across hosted MCP backends', async () => { + const backendA = await startHttpServer() + const backendB = await startHttpServer() + const proxy = await startAlternatingProxy([ + backendA.url.port ? Number(backendA.url.port) : 0, + backendB.url.port ? Number(backendB.url.port) : 0, + ]) + + try { + const { client, transport } = await createHttpClient(proxy.url) + + try { + const robots = await client.callTool({ + name: 'transloadit_list_robots', + arguments: { limit: 1 }, + }) + + expect(parseToolPayload(robots).status).toBe('ok') + } finally { + await transport.close() + await client.close() + } + } finally { + await proxy.close() + await backendA.close() + await backendB.close() + } +}) From b7bdcbe575a3132c48ffa70d7e2b2067876b4c9e Mon Sep 17 00:00:00 2001 From: Kevin van Zonneveld Date: Tue, 14 Apr 2026 16:49:01 +0200 Subject: [PATCH 2/4] Harden stateless MCP routing coverage --- packages/mcp-server/src/http.ts | 4 +- .../test/e2e/non-sticky-routing.test.ts | 45 ++++++++++++++++++- 2 files changed, 44 insertions(+), 5 deletions(-) diff --git a/packages/mcp-server/src/http.ts b/packages/mcp-server/src/http.ts index eb32a1c2..953b4d8e 100644 --- a/packages/mcp-server/src/http.ts +++ b/packages/mcp-server/src/http.ts @@ -194,9 +194,7 @@ export function createTransloaditMcpHttpHandler( } }) as TransloaditMcpHttpHandler - handler.close = async () => { - return Promise.resolve() - } + handler.close = () => Promise.resolve() return handler } diff --git a/packages/mcp-server/test/e2e/non-sticky-routing.test.ts b/packages/mcp-server/test/e2e/non-sticky-routing.test.ts index 1ee95b79..f29829cd 100644 --- a/packages/mcp-server/test/e2e/non-sticky-routing.test.ts +++ b/packages/mcp-server/test/e2e/non-sticky-routing.test.ts @@ -1,5 +1,5 @@ import { createServer, request as httpRequest } from 'node:http' -import type { AddressInfo, Socket } from 'node:net' +import type { AddressInfo } from 'node:net' import { expect, test } from 'vitest' import { createHttpClient, startHttpServer } from './http-server.ts' import { parseToolPayload } from './mcp-client.ts' @@ -61,7 +61,7 @@ async function startAlternatingProxy( const server = createServer(async (req, res) => { try { const routedRequest = await readRequest(req) - const targetPort = ports[requestCount % ports.length]! + const targetPort = ports[requestCount % ports.length] ?? ports[0] requestCount += 1 await proxyToBackend(targetPort, routedRequest, res) } catch (error) { @@ -113,3 +113,44 @@ test('streamable http: survives non-sticky routing across hosted MCP backends', await backendB.close() } }) + +test('streamable http: supports concurrent clients through non-sticky routing', async () => { + const backendA = await startHttpServer() + const backendB = await startHttpServer() + const proxy = await startAlternatingProxy([ + backendA.url.port ? Number(backendA.url.port) : 0, + backendB.url.port ? Number(backendB.url.port) : 0, + ]) + + try { + const sessions = await Promise.all( + Array.from({ length: 10 }, async () => createHttpClient(proxy.url)), + ) + + try { + const results = await Promise.all( + sessions.map(async ({ client }) => + client.callTool({ + name: 'transloadit_list_robots', + arguments: { limit: 1 }, + }), + ), + ) + + for (const result of results) { + expect(parseToolPayload(result).status).toBe('ok') + } + } finally { + await Promise.all( + sessions.map(async ({ client, transport }) => { + await transport.close() + await client.close() + }), + ) + } + } finally { + await proxy.close() + await backendA.close() + await backendB.close() + } +}) From c7fdc0678ba3e566d2550092d33112cf5b82efdf Mon Sep 17 00:00:00 2001 From: Kevin van Zonneveld Date: Tue, 14 Apr 2026 17:08:05 +0200 Subject: [PATCH 3/4] Drain active MCP transports on shutdown --- packages/mcp-server/src/http.ts | 24 +++- .../mcp-server/test/unit/http-handler.test.ts | 104 ++++++++++++++++++ 2 files changed, 125 insertions(+), 3 deletions(-) create mode 100644 packages/mcp-server/test/unit/http-handler.test.ts diff --git a/packages/mcp-server/src/http.ts b/packages/mcp-server/src/http.ts index 953b4d8e..e4054d86 100644 --- a/packages/mcp-server/src/http.ts +++ b/packages/mcp-server/src/http.ts @@ -59,6 +59,10 @@ function readJsonBody(req: IncomingMessage): Promise { export function createTransloaditMcpHttpHandler( options: TransloaditMcpHttpOptions = {}, ): TransloaditMcpHttpHandler { + const activeRequests = new Set<{ + transport: StreamableHTTPServerTransport + server: Awaited> + }>() const expectedPath = options.path ?? defaultPath const metricsPath = options.metricsPath === false ? undefined : normalizePath(options.metricsPath ?? '/metrics') @@ -178,10 +182,14 @@ export function createTransloaditMcpHttpHandler( enableDnsRebindingProtection: options.enableDnsRebindingProtection, }) const server = createTransloaditMcpServer(options) - res.on('close', () => { + const activeRequest = { transport, server } + activeRequests.add(activeRequest) + const cleanupActiveRequest = () => { + activeRequests.delete(activeRequest) void transport.close() void server.close() - }) + } + res.on('close', cleanupActiveRequest) await server.connect(transport) try { @@ -191,10 +199,20 @@ export function createTransloaditMcpHttpHandler( res.statusCode = 500 res.end('Internal Server Error') } + } finally { + activeRequests.delete(activeRequest) } }) as TransloaditMcpHttpHandler - handler.close = () => Promise.resolve() + handler.close = async () => { + await Promise.all( + Array.from(activeRequests, async (activeRequest) => { + activeRequests.delete(activeRequest) + const { transport, server } = activeRequest + await Promise.all([transport.close(), server.close()]) + }), + ) + } return handler } diff --git a/packages/mcp-server/test/unit/http-handler.test.ts b/packages/mcp-server/test/unit/http-handler.test.ts new file mode 100644 index 00000000..c4415471 --- /dev/null +++ b/packages/mcp-server/test/unit/http-handler.test.ts @@ -0,0 +1,104 @@ +import { once } from 'node:events' +import { createServer, request as httpRequest } from 'node:http' +import { afterEach, beforeEach, describe, expect, test, vi } from 'vitest' + +const transportInstances: MockTransport[] = [] +const serverInstances: MockServer[] = [] + +class MockTransport { + public closed = false + + public constructor() { + transportInstances.push(this) + } + + public async handleRequest(): Promise { + await new Promise(() => {}) + } + + public async close(): Promise { + this.closed = true + } +} + +class MockServer { + public closed = false + + public constructor() { + serverInstances.push(this) + } + + public async connect(): Promise {} + + public async close(): Promise { + this.closed = true + } +} + +vi.mock('@modelcontextprotocol/sdk/server/streamableHttp.js', () => { + return { + StreamableHTTPServerTransport: MockTransport, + } +}) + +vi.mock('../../src/server.ts', () => { + return { + createTransloaditMcpServer: () => new MockServer(), + } +}) + +describe('createTransloaditMcpHttpHandler', () => { + beforeEach(() => { + transportInstances.length = 0 + serverInstances.length = 0 + }) + + afterEach(() => { + vi.restoreAllMocks() + }) + + test('handler.close drains active stateless request transports and servers', async () => { + const { createTransloaditMcpHttpHandler } = await import('../../src/http.ts') + + const handler = createTransloaditMcpHttpHandler() + const server = createServer((req, res) => { + void handler(req, res) + }) + + await new Promise((resolve) => { + server.listen(0, '127.0.0.1', resolve) + }) + + const address = server.address() + if (!address || typeof address === 'string') { + throw new Error('Expected numeric test server address') + } + + const req = httpRequest({ + host: '127.0.0.1', + port: address.port, + method: 'POST', + path: '/mcp', + headers: { + 'content-type': 'application/json', + }, + }) + req.on('error', () => {}) + req.write('{}') + req.end() + + await vi.waitFor(() => { + expect(transportInstances).toHaveLength(1) + expect(serverInstances).toHaveLength(1) + }) + + await handler.close() + + expect(transportInstances[0]?.closed).toBe(true) + expect(serverInstances[0]?.closed).toBe(true) + + req.destroy() + server.close() + await once(server, 'close') + }) +}) From ca9626e4a4fc2eccbb35f9fe93f864b82194a5b0 Mon Sep 17 00:00:00 2001 From: Kevin van Zonneveld Date: Tue, 14 Apr 2026 17:13:51 +0200 Subject: [PATCH 4/4] Fix MCP handler shutdown test lint --- packages/mcp-server/test/unit/http-handler.test.ts | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/packages/mcp-server/test/unit/http-handler.test.ts b/packages/mcp-server/test/unit/http-handler.test.ts index c4415471..bf828f0a 100644 --- a/packages/mcp-server/test/unit/http-handler.test.ts +++ b/packages/mcp-server/test/unit/http-handler.test.ts @@ -16,8 +16,9 @@ class MockTransport { await new Promise(() => {}) } - public async close(): Promise { + public close(): Promise { this.closed = true + return Promise.resolve() } } @@ -28,10 +29,13 @@ class MockServer { serverInstances.push(this) } - public async connect(): Promise {} + public connect(): Promise { + return Promise.resolve() + } - public async close(): Promise { + public close(): Promise { this.closed = true + return Promise.resolve() } }