Skip to content

Commit 59792c0

Browse files
improvement(mcp): bound MCP memory and lifecycle concurrency (#4751)
* improvement(mcp): bound MCP memory and lifecycle concurrency * update db mock * address comments * address comments
1 parent 23d2922 commit 59792c0

31 files changed

Lines changed: 3618 additions & 386 deletions

File tree

apps/sim/app/api/mcp/serve/[serverId]/route.test.ts

Lines changed: 684 additions & 5 deletions
Large diffs are not rendered by default.

apps/sim/app/api/mcp/serve/[serverId]/route.ts

Lines changed: 526 additions & 92 deletions
Large diffs are not rendered by default.

apps/sim/app/api/mcp/servers/[id]/route.ts

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,11 @@ import { toError } from '@sim/utils/errors'
33
import type { NextRequest } from 'next/server'
44
import { updateMcpServerBodySchema } from '@/lib/api/contracts/mcp'
55
import { withRouteHandler } from '@/lib/core/utils/with-route-handler'
6-
import { getParsedBody, withMcpAuth } from '@/lib/mcp/middleware'
6+
import {
7+
mcpBodyReadErrorResponse,
8+
readMcpJsonBodyWithLimit,
9+
withMcpAuth,
10+
} from '@/lib/mcp/middleware'
711
import { performUpdateMcpServer } from '@/lib/mcp/orchestration'
812
import {
913
createMcpErrorResponse,
@@ -28,7 +32,7 @@ export const PATCH = withRouteHandler(
2832
try {
2933
const { id: serverId } = await params
3034

31-
const rawBody = getParsedBody(request) ?? (await request.json())
35+
const rawBody = await readMcpJsonBodyWithLimit(request)
3236
const parsedBody = updateMcpServerBodySchema.safeParse(rawBody)
3337

3438
if (!parsedBody.success) {
@@ -82,6 +86,8 @@ export const PATCH = withRouteHandler(
8286
server: { ...rest, hasOauthClientSecret: !!_secret },
8387
})
8488
} catch (error) {
89+
const bodyErrorResponse = mcpBodyReadErrorResponse(error, request)
90+
if (bodyErrorResponse) return bodyErrorResponse
8591
logger.error(`[${requestId}] Error updating MCP server:`, error)
8692
return createMcpErrorResponse(toError(error), 'Failed to update MCP server', 500)
8793
}

apps/sim/app/api/mcp/servers/route.ts

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,11 @@ import type { NextRequest } from 'next/server'
77
import { createMcpServerBodySchema, deleteMcpServerByQuerySchema } from '@/lib/api/contracts/mcp'
88
import { validationErrorResponse } from '@/lib/api/server'
99
import { withRouteHandler } from '@/lib/core/utils/with-route-handler'
10-
import { getParsedBody, withMcpAuth } from '@/lib/mcp/middleware'
10+
import {
11+
mcpBodyReadErrorResponse,
12+
readMcpJsonBodyWithLimit,
13+
withMcpAuth,
14+
} from '@/lib/mcp/middleware'
1115
import { performCreateMcpServer, performDeleteMcpServer } from '@/lib/mcp/orchestration'
1216
import {
1317
createMcpErrorResponse,
@@ -55,7 +59,7 @@ export const POST = withRouteHandler(
5559
withMcpAuth('write')(
5660
async (request: NextRequest, { userId, userName, userEmail, workspaceId, requestId }) => {
5761
try {
58-
const rawBody = getParsedBody(request) ?? (await request.json())
62+
const rawBody = await readMcpJsonBodyWithLimit(request)
5963
const parsedBody = createMcpServerBodySchema.safeParse(rawBody)
6064

6165
if (!parsedBody.success) {
@@ -120,6 +124,8 @@ export const POST = withRouteHandler(
120124
result.updated ? 200 : 201
121125
)
122126
} catch (error) {
127+
const bodyErrorResponse = mcpBodyReadErrorResponse(error, request)
128+
if (bodyErrorResponse) return bodyErrorResponse
123129
logger.error(`[${requestId}] Error registering MCP server:`, error)
124130
return createMcpErrorResponse(toError(error), 'Failed to register MCP server', 500)
125131
}

apps/sim/app/api/mcp/servers/test-connection/route.ts

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,11 @@ import {
1111
validateMcpDomain,
1212
validateMcpServerSsrf,
1313
} from '@/lib/mcp/domain-check'
14-
import { getParsedBody, withMcpAuth } from '@/lib/mcp/middleware'
14+
import {
15+
mcpBodyReadErrorResponse,
16+
readMcpJsonBodyWithLimit,
17+
withMcpAuth,
18+
} from '@/lib/mcp/middleware'
1519
import { detectMcpAuthType } from '@/lib/mcp/oauth'
1620
import { resolveMcpConfigEnvVars } from '@/lib/mcp/resolve-config'
1721
import type { McpAuthType, McpTransport } from '@/lib/mcp/types'
@@ -64,7 +68,7 @@ function sanitizeConnectionError(error: unknown): string {
6468
export const POST = withRouteHandler(
6569
withMcpAuth('write')(async (request: NextRequest, { userId, workspaceId, requestId }) => {
6670
try {
67-
const rawBody = getParsedBody(request) ?? (await request.json())
71+
const rawBody = await readMcpJsonBodyWithLimit(request)
6872
const parsedBody = mcpServerTestBodySchema.safeParse(rawBody)
6973

7074
if (!parsedBody.success) {
@@ -235,6 +239,8 @@ export const POST = withRouteHandler(
235239

236240
return createMcpSuccessResponse(result, result.success ? 200 : 400)
237241
} catch (error) {
242+
const bodyErrorResponse = mcpBodyReadErrorResponse(error, request)
243+
if (bodyErrorResponse) return bodyErrorResponse
238244
logger.error(`[${requestId}] Error testing MCP server connection:`, error)
239245
return createMcpErrorResponse(toError(error), 'Failed to test server connection', 500)
240246
}

apps/sim/app/api/mcp/tools/discover/route.ts

Lines changed: 47 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,55 @@
11
import { UnauthorizedError } from '@modelcontextprotocol/sdk/client/auth.js'
22
import { createLogger } from '@sim/logger'
3+
import { getErrorMessage } from '@sim/utils/errors'
34
import type { NextRequest } from 'next/server'
45
import { mcpToolDiscoveryQuerySchema, refreshMcpToolsBodySchema } from '@/lib/api/contracts/mcp'
56
import { validationErrorResponse } from '@/lib/api/server'
67
import { withRouteHandler } from '@/lib/core/utils/with-route-handler'
7-
import { getParsedBody, withMcpAuth } from '@/lib/mcp/middleware'
8+
import {
9+
mcpBodyReadErrorResponse,
10+
readMcpJsonBodyWithLimit,
11+
withMcpAuth,
12+
} from '@/lib/mcp/middleware'
813
import { mcpService } from '@/lib/mcp/service'
914
import { McpOauthAuthorizationRequiredError, type McpToolDiscoveryResponse } from '@/lib/mcp/types'
1015
import { categorizeError, createMcpErrorResponse, createMcpSuccessResponse } from '@/lib/mcp/utils'
1116

1217
const logger = createLogger('McpToolDiscoveryAPI')
18+
const MCP_REFRESH_DISCOVERY_CONCURRENCY = 5
1319

1420
export const dynamic = 'force-dynamic'
1521

22+
async function settleWithConcurrency<T, R>(
23+
items: T[],
24+
concurrency: number,
25+
task: (item: T) => Promise<R>
26+
): Promise<Array<PromiseSettledResult<R>>> {
27+
const results: Array<PromiseSettledResult<R> | undefined> = new Array(items.length)
28+
let nextIndex = 0
29+
30+
const workers = Array.from({ length: Math.min(concurrency, items.length) }, async () => {
31+
while (nextIndex < items.length) {
32+
const index = nextIndex
33+
nextIndex += 1
34+
try {
35+
results[index] = { status: 'fulfilled', value: await task(items[index]) }
36+
} catch (reason) {
37+
results[index] = { status: 'rejected', reason }
38+
}
39+
}
40+
})
41+
42+
await Promise.all(workers)
43+
44+
return results.map(
45+
(result) =>
46+
result ?? {
47+
status: 'rejected',
48+
reason: new Error('MCP refresh discovery task did not run'),
49+
}
50+
)
51+
}
52+
1653
export const GET = withRouteHandler(
1754
withMcpAuth('read')(async (request: NextRequest, { userId, workspaceId, requestId }) => {
1855
try {
@@ -63,7 +100,7 @@ export const GET = withRouteHandler(
63100
export const POST = withRouteHandler(
64101
withMcpAuth('read')(async (request: NextRequest, { userId, workspaceId, requestId }) => {
65102
try {
66-
const rawBody = getParsedBody(request) ?? (await request.json())
103+
const rawBody = await readMcpJsonBodyWithLimit(request)
67104
const parsedBody = refreshMcpToolsBodySchema.safeParse(rawBody)
68105

69106
if (!parsedBody.success) {
@@ -74,11 +111,13 @@ export const POST = withRouteHandler(
74111

75112
logger.info(`[${requestId}] Refreshing tools for ${serverIds.length} servers`)
76113

77-
const results = await Promise.allSettled(
78-
serverIds.map(async (serverId: string) => {
114+
const results = await settleWithConcurrency(
115+
serverIds,
116+
MCP_REFRESH_DISCOVERY_CONCURRENCY,
117+
async (serverId: string) => {
79118
const tools = await mcpService.discoverServerTools(userId, serverId, workspaceId, true)
80119
return { serverId, toolCount: tools.length }
81-
})
120+
}
82121
)
83122

84123
const successes: Array<{ serverId: string; toolCount: number }> = []
@@ -91,7 +130,7 @@ export const POST = withRouteHandler(
91130
} else {
92131
failures.push({
93132
serverId,
94-
error: result.reason instanceof Error ? result.reason.message : 'Unknown error',
133+
error: getErrorMessage(result.reason, 'Unknown error'),
95134
})
96135
}
97136
})
@@ -107,6 +146,8 @@ export const POST = withRouteHandler(
107146
},
108147
})
109148
} catch (error) {
149+
const bodyErrorResponse = mcpBodyReadErrorResponse(error, request)
150+
if (bodyErrorResponse) return bodyErrorResponse
110151
if (
111152
error instanceof McpOauthAuthorizationRequiredError ||
112153
error instanceof UnauthorizedError

apps/sim/app/api/mcp/tools/execute/route.ts

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,11 @@ import { getExecutionTimeout } from '@/lib/core/execution-limits'
88
import type { SubscriptionPlan } from '@/lib/core/rate-limiter/types'
99
import { withRouteHandler } from '@/lib/core/utils/with-route-handler'
1010
import { SIM_VIA_HEADER } from '@/lib/execution/call-chain'
11-
import { getParsedBody, withMcpAuth } from '@/lib/mcp/middleware'
11+
import {
12+
mcpBodyReadErrorResponse,
13+
readMcpJsonBodyWithLimit,
14+
withMcpAuth,
15+
} from '@/lib/mcp/middleware'
1216
import { McpOauthRedirectRequired } from '@/lib/mcp/oauth'
1317
import { mcpService } from '@/lib/mcp/service'
1418
import {
@@ -53,7 +57,7 @@ export const POST = withRouteHandler(
5357
withMcpAuth('read')(async (request: NextRequest, { userId, workspaceId, requestId }) => {
5458
let serverId: string | undefined
5559
try {
56-
const rawBody = getParsedBody(request) ?? (await request.json())
60+
const rawBody = await readMcpJsonBodyWithLimit(request)
5761
const parsedBody = mcpToolExecutionBodySchema.safeParse(rawBody)
5862

5963
if (!parsedBody.success) {
@@ -235,6 +239,8 @@ export const POST = withRouteHandler(
235239

236240
return createMcpSuccessResponse(transformedResult)
237241
} catch (error) {
242+
const bodyErrorResponse = mcpBodyReadErrorResponse(error, request)
243+
if (bodyErrorResponse) return bodyErrorResponse
238244
if (
239245
error instanceof McpOauthAuthorizationRequiredError ||
240246
error instanceof McpOauthRedirectRequired ||

apps/sim/app/api/mcp/workflow-servers/[id]/route.ts

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,11 @@ import {
99
workflowMcpServerParamsSchema,
1010
} from '@/lib/api/contracts/workflow-mcp-servers'
1111
import { withRouteHandler } from '@/lib/core/utils/with-route-handler'
12-
import { getParsedBody, withMcpAuth } from '@/lib/mcp/middleware'
12+
import {
13+
mcpBodyReadErrorResponse,
14+
readMcpJsonBodyWithLimit,
15+
withMcpAuth,
16+
} from '@/lib/mcp/middleware'
1317
import {
1418
performDeleteWorkflowMcpServer,
1519
performUpdateWorkflowMcpServer,
@@ -94,7 +98,7 @@ export const PATCH = withRouteHandler(
9498
) => {
9599
try {
96100
const { id: serverId } = workflowMcpServerParamsSchema.parse(await params)
97-
const rawBody = getParsedBody(request) ?? (await request.json())
101+
const rawBody = await readMcpJsonBodyWithLimit(request)
98102
const parsedBody = updateWorkflowMcpServerBodySchema.safeParse(rawBody)
99103

100104
if (!parsedBody.success) {
@@ -130,6 +134,8 @@ export const PATCH = withRouteHandler(
130134

131135
return createMcpSuccessResponse({ server: updatedServer })
132136
} catch (error) {
137+
const bodyErrorResponse = mcpBodyReadErrorResponse(error, request)
138+
if (bodyErrorResponse) return bodyErrorResponse
133139
logger.error(`[${requestId}] Error updating workflow MCP server:`, error)
134140
return createMcpErrorResponse(toError(error), 'Failed to update workflow MCP server', 500)
135141
}

apps/sim/app/api/mcp/workflow-servers/[id]/tools/[toolId]/route.ts

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,17 @@ import {
99
workflowMcpToolParamsSchema,
1010
} from '@/lib/api/contracts/workflow-mcp-servers'
1111
import { withRouteHandler } from '@/lib/core/utils/with-route-handler'
12-
import { getParsedBody, withMcpAuth } from '@/lib/mcp/middleware'
12+
import {
13+
mcpBodyReadErrorResponse,
14+
readMcpJsonBodyWithLimit,
15+
withMcpAuth,
16+
} from '@/lib/mcp/middleware'
1317
import { performDeleteWorkflowMcpTool, performUpdateWorkflowMcpTool } from '@/lib/mcp/orchestration'
14-
import { createMcpErrorResponse, createMcpSuccessResponse } from '@/lib/mcp/utils'
18+
import {
19+
createMcpErrorResponse,
20+
createMcpSuccessResponse,
21+
mcpOrchestrationStatus,
22+
} from '@/lib/mcp/utils'
1523

1624
const logger = createLogger('WorkflowMcpToolAPI')
1725

@@ -86,7 +94,7 @@ export const PATCH = withRouteHandler(
8694
) => {
8795
try {
8896
const { id: serverId, toolId } = workflowMcpToolParamsSchema.parse(await params)
89-
const rawBody = getParsedBody(request) ?? (await request.json())
97+
const rawBody = await readMcpJsonBodyWithLimit(request)
9098
const parsedBody = updateWorkflowMcpToolBodySchema.safeParse(rawBody)
9199

92100
if (!parsedBody.success) {
@@ -109,12 +117,10 @@ export const PATCH = withRouteHandler(
109117
parameterSchema: body.parameterSchema,
110118
})
111119
if (!result.success || !result.tool) {
112-
const status =
113-
result.errorCode === 'not_found' ? 404 : result.errorCode === 'validation' ? 400 : 500
114120
return createMcpErrorResponse(
115121
new Error(result.error || 'Failed to update tool'),
116122
result.error || 'Failed to update tool',
117-
status
123+
mcpOrchestrationStatus(result.errorCode)
118124
)
119125
}
120126

@@ -124,6 +130,8 @@ export const PATCH = withRouteHandler(
124130

125131
return createMcpSuccessResponse({ tool: updatedTool })
126132
} catch (error) {
133+
const bodyErrorResponse = mcpBodyReadErrorResponse(error, request)
134+
if (bodyErrorResponse) return bodyErrorResponse
127135
logger.error(`[${requestId}] Error updating tool:`, error)
128136
return createMcpErrorResponse(toError(error), 'Failed to update tool', 500)
129137
}
@@ -158,7 +166,7 @@ export const DELETE = withRouteHandler(
158166
return createMcpErrorResponse(
159167
new Error(result.error || 'Tool not found'),
160168
result.error || 'Tool not found',
161-
result.errorCode === 'not_found' ? 404 : 500
169+
mcpOrchestrationStatus(result.errorCode)
162170
)
163171
}
164172
const deletedTool = result.tool

apps/sim/app/api/mcp/workflow-servers/[id]/tools/route.ts

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,11 @@ import {
99
workflowMcpServerParamsSchema,
1010
} from '@/lib/api/contracts/workflow-mcp-servers'
1111
import { withRouteHandler } from '@/lib/core/utils/with-route-handler'
12-
import { getParsedBody, withMcpAuth } from '@/lib/mcp/middleware'
12+
import {
13+
mcpBodyReadErrorResponse,
14+
readMcpJsonBodyWithLimit,
15+
withMcpAuth,
16+
} from '@/lib/mcp/middleware'
1317
import { performCreateWorkflowMcpTool } from '@/lib/mcp/orchestration'
1418
import {
1519
createMcpErrorResponse,
@@ -96,7 +100,7 @@ export const POST = withRouteHandler(
96100
) => {
97101
try {
98102
const { id: serverId } = workflowMcpServerParamsSchema.parse(await params)
99-
const rawBody = getParsedBody(request) ?? (await request.json())
103+
const rawBody = await readMcpJsonBodyWithLimit(request)
100104
const parsedBody = createWorkflowMcpToolBodySchema.safeParse(rawBody)
101105

102106
if (!parsedBody.success) {
@@ -136,6 +140,8 @@ export const POST = withRouteHandler(
136140

137141
return createMcpSuccessResponse({ tool }, 201)
138142
} catch (error) {
143+
const bodyErrorResponse = mcpBodyReadErrorResponse(error, request)
144+
if (bodyErrorResponse) return bodyErrorResponse
139145
logger.error(`[${requestId}] Error adding tool:`, error)
140146
return createMcpErrorResponse(toError(error), 'Failed to add tool', 500)
141147
}

0 commit comments

Comments
 (0)