Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/stateless-mcp-http.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@transloadit/mcp-server': patch
---

Serve Streamable HTTP MCP requests statelessly so hosted deployments keep working behind non-sticky load balancing while preserving isolated transport instances per request.
68 changes: 15 additions & 53 deletions packages/mcp-server/src/express.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
import { randomUUID } from 'node:crypto'
import { StreamableHTTPServerTransport } from '@modelcontextprotocol/sdk/server/streamableHttp.js'
import { isInitializeRequest } from '@modelcontextprotocol/sdk/types.js'
import express from 'express'
import type { TransloaditMcpHttpOptions } from './http.ts'
import { isBasicAuthorized } from './http-helpers.ts'
Expand All @@ -13,11 +11,6 @@ export type TransloaditMcpExpressOptions = TransloaditMcpHttpOptions & {
}

export function createTransloaditMcpExpressRouter(options: TransloaditMcpExpressOptions = {}) {
const sessionIdGenerator = options.sessionIdGenerator ?? (() => randomUUID())

// Per-session transport map: each MCP client gets its own transport + server pair.
const transports = new Map<string, StreamableHTTPServerTransport>()

const router = express.Router()
const routePath = options.path ?? '/mcp'
const metricsPath =
Expand Down Expand Up @@ -64,58 +57,27 @@ export function createTransloaditMcpExpressRouter(options: TransloaditMcpExpress
})

router.all(routePath, async (req: express.Request, res: express.Response) => {
const sessionId = req.headers['mcp-session-id'] as string | undefined
let transport: StreamableHTTPServerTransport | undefined

if (sessionId) {
transport = transports.get(sessionId)
if (!transport) {
res.status(404).json({
jsonrpc: '2.0',
error: { code: -32000, message: 'Session not found' },
id: null,
})
return
}
} else if (req.method === 'POST' && isInitializeRequest(req.body)) {
// New initialization request — create a new transport + server pair.
const newTransport = new StreamableHTTPServerTransport({
sessionIdGenerator,
allowedOrigins: options.allowedOrigins,
allowedHosts: options.allowedHosts,
enableDnsRebindingProtection: options.enableDnsRebindingProtection,
onsessioninitialized: (sid) => {
transports.set(sid, newTransport)
},
})

newTransport.onclose = () => {
const sid = newTransport.sessionId
if (sid) {
transports.delete(sid)
}
}

const server = createTransloaditMcpServer(options)
await server.connect(newTransport)
transport = newTransport
} else if (req.method === 'POST') {
res.status(400).json({
if (req.method !== 'POST') {
res.status(405).json({
jsonrpc: '2.0',
error: { code: -32600, message: 'Bad Request: No valid session ID provided' },
error: { code: -32000, message: 'Method not allowed.' },
id: null,
})
return
}

if (!transport) {
res.status(400).json({
jsonrpc: '2.0',
error: { code: -32600, message: 'Bad Request: No valid session ID provided' },
id: null,
})
return
}
const transport = new StreamableHTTPServerTransport({
sessionIdGenerator: undefined,
allowedOrigins: options.allowedOrigins,
allowedHosts: options.allowedHosts,
enableDnsRebindingProtection: options.enableDnsRebindingProtection,
})
const server = createTransloaditMcpServer(options)
res.on('close', () => {
void transport.close()
void server.close()
})
await server.connect(transport)

await transport.handleRequest(req, res, req.body)
})
Expand Down
108 changes: 36 additions & 72 deletions packages/mcp-server/src/http.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
import { randomUUID } from 'node:crypto'
import type { IncomingMessage, ServerResponse } from 'node:http'
import { StreamableHTTPServerTransport } from '@modelcontextprotocol/sdk/server/streamableHttp.js'
import { isInitializeRequest } from '@modelcontextprotocol/sdk/types.js'
import type { SevLogger } from '@transloadit/sev-logger'
import {
applyCorsHeaders,
Expand All @@ -23,6 +21,7 @@ export type TransloaditMcpHttpOptions = TransloaditMcpServerOptions & {
path?: string
metricsPath?: string | false
metricsAuth?: { username: string; password: string }
// Ignored on purpose: the hosted HTTP server is stateless and does not mint session IDs.
sessionIdGenerator?: (() => string) | undefined
logger?: SevLogger
}
Expand All @@ -36,7 +35,7 @@ export type TransloaditMcpHttpHandler = ((

const defaultPath = '/mcp'

/** Read the full request body and JSON-parse it so `isInitializeRequest` can inspect the payload. */
/** Read the full request body and JSON-parse it before handing it to the MCP transport. */
function readJsonBody(req: IncomingMessage): Promise<unknown> {
return new Promise((resolve, reject) => {
const chunks: Buffer[] = []
Expand All @@ -60,14 +59,14 @@ function readJsonBody(req: IncomingMessage): Promise<unknown> {
export function createTransloaditMcpHttpHandler(
options: TransloaditMcpHttpOptions = {},
): TransloaditMcpHttpHandler {
const activeRequests = new Set<{
transport: StreamableHTTPServerTransport
server: Awaited<ReturnType<typeof createTransloaditMcpServer>>
}>()
const expectedPath = options.path ?? defaultPath
const metricsPath =
options.metricsPath === false ? undefined : normalizePath(options.metricsPath ?? '/metrics')
const metricsAuth = options.metricsAuth
const sessionIdGenerator = options.sessionIdGenerator ?? (() => randomUUID())

// Per-session transport map: each MCP client gets its own transport + server pair.
const transports = new Map<string, StreamableHTTPServerTransport>()

const serverCardJson = JSON.stringify(
buildServerCard(expectedPath, { authKey: options.authKey, authSecret: options.authSecret }),
Expand Down Expand Up @@ -162,92 +161,57 @@ export function createTransloaditMcpHttpHandler(
return
}

// Route request to the correct per-session transport.
const sessionId = req.headers['mcp-session-id'] as string | undefined
let transport: StreamableHTTPServerTransport | undefined

if (sessionId) {
transport = transports.get(sessionId)
if (!transport) {
res.statusCode = 404
res.setHeader('Content-Type', 'application/json')
res.end(
JSON.stringify({
jsonrpc: '2.0',
error: { code: -32000, message: 'Session not found' },
id: null,
}),
)
return
}
}

// For POST requests without a session, read the body to check for initialization.
let parsedBody: unknown
if (req.method === 'POST' && !transport) {
parsedBody = await readJsonBody(req)
if (isInitializeRequest(parsedBody)) {
const newTransport = new StreamableHTTPServerTransport({
sessionIdGenerator,
allowedOrigins: options.allowedOrigins,
allowedHosts: options.allowedHosts,
enableDnsRebindingProtection: options.enableDnsRebindingProtection,
onsessioninitialized: (sid) => {
transports.set(sid, newTransport)
},
})

newTransport.onclose = () => {
const sid = newTransport.sessionId
if (sid) {
transports.delete(sid)
}
}

const server = createTransloaditMcpServer(options)
await server.connect(newTransport)
transport = newTransport
} else {
res.statusCode = 400
res.setHeader('Content-Type', 'application/json')
res.end(
JSON.stringify({
jsonrpc: '2.0',
error: { code: -32600, message: 'Bad Request: No valid session ID provided' },
id: null,
}),
)
return
}
}

if (!transport) {
res.statusCode = 400
if (req.method !== 'POST') {
res.statusCode = 405
res.setHeader('Content-Type', 'application/json')
res.end(
JSON.stringify({
jsonrpc: '2.0',
error: { code: -32600, message: 'Bad Request: No valid session ID provided' },
error: { code: -32000, message: 'Method not allowed.' },
id: null,
}),
)
return
}

const parsedBody = await readJsonBody(req)
const transport = new StreamableHTTPServerTransport({
sessionIdGenerator: undefined,
allowedOrigins: options.allowedOrigins,
allowedHosts: options.allowedHosts,
enableDnsRebindingProtection: options.enableDnsRebindingProtection,
})
const server = createTransloaditMcpServer(options)
const activeRequest = { transport, server }
activeRequests.add(activeRequest)
const cleanupActiveRequest = () => {
activeRequests.delete(activeRequest)
void transport.close()
void server.close()
}
res.on('close', cleanupActiveRequest)
await server.connect(transport)

try {
await transport.handleRequest(req, res, parsedBody)
} catch {
if (!res.headersSent) {
res.statusCode = 500
res.end('Internal Server Error')
}
} finally {
activeRequests.delete(activeRequest)
}
}) as TransloaditMcpHttpHandler

handler.close = async () => {
const closePromises = [...transports.values()].map((t) => t.close())
await Promise.all(closePromises)
transports.clear()
await Promise.all(
Array.from(activeRequests, async (activeRequest) => {
activeRequests.delete(activeRequest)
const { transport, server } = activeRequest
await Promise.all([transport.close(), server.close()])
}),
)
}

return handler
Expand Down
Loading