
Commit a98e20f

fix(providers): propagate abort signal to all LLM SDK calls
1 parent 60f9eb2 · commit a98e20f


15 files changed: +209 -65 lines changed

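The change applies one pattern across every provider: each SDK call gains an optional per-request cancellation handle, forwarded as request.abortSignal ? { signal: request.abortSignal } : undefined (or that SDK's equivalent), so callers that pass no signal see no behavior change. The mechanics are the standard AbortController/AbortSignal contract; here is a self-contained sketch of that contract, using fetch as a stand-in for the SDK calls patched in the per-file diffs below (the URL is a placeholder):

```ts
// Minimal sketch of the cancellation pattern this commit threads through the
// providers. fetch stands in for an LLM SDK call; the real SDKs accept the
// signal via their own per-request options.
async function main() {
  const controller = new AbortController()

  // Simulate the user cancelling a run while the request is in flight.
  const timer = setTimeout(() => controller.abort(), 100)

  try {
    await fetch('https://example.com/slow-endpoint', { signal: controller.signal })
  } catch (err) {
    // An aborted request rejects instead of running to completion in the background.
    console.log('request cancelled:', (err as Error).name) // typically "AbortError"
  } finally {
    clearTimeout(timer)
  }
}

main()
```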

apps/sim/executor/handlers/agent/agent-handler.ts

Lines changed: 1 addition & 0 deletions

@@ -1086,6 +1086,7 @@ export class AgentBlockHandler implements BlockHandler {
       verbosity: providerRequest.verbosity,
       thinkingLevel: providerRequest.thinkingLevel,
       previousInteractionId: providerRequest.previousInteractionId,
+      abortSignal: ctx.abortSignal,
     })

     return this.processProviderResponse(response, block, responseFormat)
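The executor side is a one-line change because the handler only has to copy the execution context's signal onto the provider request. A hypothetical sketch of the upstream wiring this assumes (names are illustrative, not from this commit):

```ts
// Hypothetical sketch: whoever owns the execution context creates the
// controller and hands its signal to block handlers via ctx.
const controller = new AbortController()

const ctx = {
  abortSignal: controller.signal,
  // ...other execution-context fields elided
}

// Cancelling the run aborts every provider call that received ctx.abortSignal.
function cancelRun() {
  controller.abort()
}
```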

apps/sim/providers/anthropic/core.ts

Lines changed: 24 additions & 13 deletions

@@ -138,14 +138,20 @@ const ANTHROPIC_SDK_NON_STREAMING_MAX_TOKENS = 21333
  */
 async function createMessage(
   anthropic: Anthropic,
-  payload: AnthropicPayload
+  payload: AnthropicPayload,
+  abortSignal?: AbortSignal
 ): Promise<Anthropic.Messages.Message> {
+  const options = abortSignal ? { signal: abortSignal } : undefined
   if (payload.max_tokens > ANTHROPIC_SDK_NON_STREAMING_MAX_TOKENS && !payload.stream) {
-    const stream = anthropic.messages.stream(payload as Anthropic.Messages.MessageStreamParams)
+    const stream = anthropic.messages.stream(
+      payload as Anthropic.Messages.MessageStreamParams,
+      options
+    )
     return stream.finalMessage()
   }
   return anthropic.messages.create(
-    payload as Anthropic.Messages.MessageCreateParamsNonStreaming
+    payload as Anthropic.Messages.MessageCreateParamsNonStreaming,
+    options
   ) as Promise<Anthropic.Messages.Message>
 }

@@ -367,10 +373,13 @@ export async function executeAnthropicProviderRequest(
   const providerStartTime = Date.now()
   const providerStartTimeISO = new Date(providerStartTime).toISOString()

-  const streamResponse = await anthropic.messages.create({
-    ...payload,
-    stream: true,
-  } as Anthropic.Messages.MessageCreateParamsStreaming)
+  const streamResponse = await anthropic.messages.create(
+    {
+      ...payload,
+      stream: true,
+    } as Anthropic.Messages.MessageCreateParamsStreaming,
+    request.abortSignal ? { signal: request.abortSignal } : undefined
+  )

   const streamingResult = {
     stream: createReadableStreamFromAnthropicStream(

@@ -461,7 +470,7 @@ export async function executeAnthropicProviderRequest(
   const forcedTools = preparedTools?.forcedTools || []
   let usedForcedTools: string[] = []

-  let currentResponse = await createMessage(anthropic, payload)
+  let currentResponse = await createMessage(anthropic, payload, request.abortSignal)
   const firstResponseTime = Date.now() - initialCallTime

   let content = ''

@@ -708,7 +717,7 @@ export async function executeAnthropicProviderRequest(

   const nextModelStartTime = Date.now()

-  currentResponse = await createMessage(anthropic, nextPayload)
+  currentResponse = await createMessage(anthropic, nextPayload, request.abortSignal)

   const nextCheckResult = checkForForcedToolUsage(
     currentResponse,

@@ -758,7 +767,8 @@ export async function executeAnthropicProviderRequest(
   }

   const streamResponse = await anthropic.messages.create(
-    streamingPayload as Anthropic.Messages.MessageCreateParamsStreaming
+    streamingPayload as Anthropic.Messages.MessageCreateParamsStreaming,
+    request.abortSignal ? { signal: request.abortSignal } : undefined
   )

   const streamingResult = {

@@ -860,7 +870,7 @@ export async function executeAnthropicProviderRequest(
   const forcedTools = preparedTools?.forcedTools || []
   let usedForcedTools: string[] = []

-  let currentResponse = await createMessage(anthropic, payload)
+  let currentResponse = await createMessage(anthropic, payload, request.abortSignal)
   const firstResponseTime = Date.now() - initialCallTime

   let content = ''

@@ -1118,7 +1128,7 @@ export async function executeAnthropicProviderRequest(

   const nextModelStartTime = Date.now()

-  currentResponse = await createMessage(anthropic, nextPayload)
+  currentResponse = await createMessage(anthropic, nextPayload, request.abortSignal)

   const nextCheckResult = checkForForcedToolUsage(
     currentResponse,

@@ -1182,7 +1192,8 @@ export async function executeAnthropicProviderRequest(
   }

   const streamResponse = await anthropic.messages.create(
-    streamingPayload as Anthropic.Messages.MessageCreateParamsStreaming
+    streamingPayload as Anthropic.Messages.MessageCreateParamsStreaming,
+    request.abortSignal ? { signal: request.abortSignal } : undefined
   )

   const streamingResult = {
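All non-streaming Anthropic calls funnel through the createMessage helper, so one optional parameter there covers both tool-calling loops; only the direct streaming create calls need the options argument inline. For reference, this is the standard per-request options parameter of @anthropic-ai/sdk methods; a standalone sketch (model id and prompt are placeholders):

```ts
import Anthropic from '@anthropic-ai/sdk'

const anthropic = new Anthropic() // reads ANTHROPIC_API_KEY from the environment
const controller = new AbortController()

const pending = anthropic.messages.create(
  {
    model: 'claude-sonnet-4-20250514', // placeholder model id
    max_tokens: 1024,
    messages: [{ role: 'user', content: 'Hello' }],
  },
  { signal: controller.signal } // same request-options shape createMessage now builds
)

controller.abort()

// The SDK surfaces cancellation as a rejected promise (APIUserAbortError in
// current SDK versions).
pending.catch((err) => console.log('aborted:', err.constructor.name))
```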

apps/sim/providers/azure-openai/index.ts

Lines changed: 16 additions & 4 deletions

@@ -165,7 +165,10 @@ async function executeChatCompletionsRequest(
     stream: true,
     stream_options: { include_usage: true },
   }
-  const streamResponse = await azureOpenAI.chat.completions.create(streamingParams)
+  const streamResponse = await azureOpenAI.chat.completions.create(
+    streamingParams,
+    request.abortSignal ? { signal: request.abortSignal } : undefined
+  )

   const streamingResult = {
     stream: createReadableStreamFromAzureOpenAIStream(streamResponse, (content, usage) => {

@@ -243,7 +246,10 @@
   const forcedTools = preparedTools?.forcedTools || []
   let usedForcedTools: string[] = []

-  let currentResponse = (await azureOpenAI.chat.completions.create(payload)) as ChatCompletion
+  let currentResponse = (await azureOpenAI.chat.completions.create(
+    payload,
+    request.abortSignal ? { signal: request.abortSignal } : undefined
+  )) as ChatCompletion
   const firstResponseTime = Date.now() - initialCallTime

   let content = currentResponse.choices[0]?.message?.content || ''

@@ -421,7 +427,10 @@
   }

   const nextModelStartTime = Date.now()
-  currentResponse = (await azureOpenAI.chat.completions.create(nextPayload)) as ChatCompletion
+  currentResponse = (await azureOpenAI.chat.completions.create(
+    nextPayload,
+    request.abortSignal ? { signal: request.abortSignal } : undefined
+  )) as ChatCompletion

   const nextCheckResult = checkForForcedToolUsage(
     currentResponse,

@@ -471,7 +480,10 @@
     stream: true,
     stream_options: { include_usage: true },
   }
-  const streamResponse = await azureOpenAI.chat.completions.create(streamingParams)
+  const streamResponse = await azureOpenAI.chat.completions.create(
+    streamingParams,
+    request.abortSignal ? { signal: request.abortSignal } : undefined
+  )

   const streamingResult = {
     stream: createReadableStreamFromAzureOpenAIStream(streamResponse, (content, usage) => {
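The Azure client is the OpenAI Node SDK underneath, so the abort handle goes in the SDK's standard request-options second argument to chat.completions.create. The same two-argument shape is reused verbatim in the Cerebras, DeepSeek, and Groq diffs below, which are OpenAI-compatible clients. A standalone sketch against the plain OpenAI SDK (model id is a placeholder):

```ts
import OpenAI from 'openai'

const client = new OpenAI() // reads OPENAI_API_KEY; compatible providers swap baseURL
const controller = new AbortController()

const pending = client.chat.completions.create(
  {
    model: 'gpt-4o-mini', // placeholder model id
    messages: [{ role: 'user', content: 'Hello' }],
  },
  { signal: controller.signal } // per-request options, as in the diff above
)

controller.abort()
pending.catch((err) => console.log('cancelled:', (err as Error).name))
```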

apps/sim/providers/bedrock/index.ts

Lines changed: 20 additions & 5 deletions

@@ -284,7 +284,10 @@ export const bedrockProvider: ProviderConfig = {
     inferenceConfig,
   })

-  const streamResponse = await client.send(command)
+  const streamResponse = await client.send(
+    command,
+    request.abortSignal ? { abortSignal: request.abortSignal } : undefined
+  )

   if (!streamResponse.stream) {
     throw new Error('No stream returned from Bedrock')

@@ -379,7 +382,10 @@
     toolConfig,
   })

-  let currentResponse = await client.send(command)
+  let currentResponse = await client.send(
+    command,
+    request.abortSignal ? { abortSignal: request.abortSignal } : undefined
+  )
   const firstResponseTime = Date.now() - initialCallTime

   let content = ''

@@ -628,7 +634,10 @@
       : undefined,
   })

-  currentResponse = await client.send(nextCommand)
+  currentResponse = await client.send(
+    nextCommand,
+    request.abortSignal ? { abortSignal: request.abortSignal } : undefined
+  )

   const nextToolUseContentBlocks = (currentResponse.output?.message?.content || []).filter(
     (block): block is ContentBlock & { toolUse: ToolUseBlock } => 'toolUse' in block

@@ -696,7 +705,10 @@
     },
   })

-  const structuredResponse = await client.send(structuredOutputCommand)
+  const structuredResponse = await client.send(
+    structuredOutputCommand,
+    request.abortSignal ? { abortSignal: request.abortSignal } : undefined
+  )
   const structuredOutputEndTime = Date.now()

   timeSegments.push({

@@ -782,7 +794,10 @@
     toolConfig: streamToolConfig,
   })

-  const streamResponse = await client.send(streamCommand)
+  const streamResponse = await client.send(
+    streamCommand,
+    request.abortSignal ? { abortSignal: request.abortSignal } : undefined
+  )

   if (!streamResponse.stream) {
     throw new Error('No stream returned from Bedrock')
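Bedrock differs only in spelling: AWS SDK v3 accepts HTTP handler options, including abortSignal, as the second argument to client.send, so the key is abortSignal rather than signal. A standalone sketch (region and model id are placeholders):

```ts
import { BedrockRuntimeClient, ConverseCommand } from '@aws-sdk/client-bedrock-runtime'

const client = new BedrockRuntimeClient({ region: 'us-east-1' }) // placeholder region
const controller = new AbortController()

const pending = client.send(
  new ConverseCommand({
    modelId: 'anthropic.claude-3-haiku-20240307-v1:0', // placeholder model id
    messages: [{ role: 'user', content: [{ text: 'Hello' }] }],
  }),
  { abortSignal: controller.signal } // note the key: abortSignal, not signal
)

controller.abort()
pending.catch((err) => console.log('cancelled:', (err as Error).name))
```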

apps/sim/providers/cerebras/index.ts

Lines changed: 19 additions & 8 deletions

@@ -117,10 +117,13 @@ export const cerebrasProvider: ProviderConfig = {
     if (request.stream && (!tools || tools.length === 0)) {
       logger.info('Using streaming response for Cerebras request (no tools)')

-      const streamResponse: any = await client.chat.completions.create({
-        ...payload,
-        stream: true,
-      })
+      const streamResponse: any = await client.chat.completions.create(
+        {
+          ...payload,
+          stream: true,
+        },
+        request.abortSignal ? { signal: request.abortSignal } : undefined
+      )

       const streamingResult = {
         stream: createReadableStreamFromCerebrasStream(streamResponse, (content, usage) => {

@@ -179,7 +182,10 @@
   }
   const initialCallTime = Date.now()

-  let currentResponse = (await client.chat.completions.create(payload)) as CerebrasResponse
+  let currentResponse = (await client.chat.completions.create(
+    payload,
+    request.abortSignal ? { signal: request.abortSignal } : undefined
+  )) as CerebrasResponse
   const firstResponseTime = Date.now() - initialCallTime

   let content = currentResponse.choices[0]?.message?.content || ''

@@ -365,7 +371,8 @@
   finalPayload.tool_choice = 'none'

   const finalResponse = (await client.chat.completions.create(
-    finalPayload
+    finalPayload,
+    request.abortSignal ? { signal: request.abortSignal } : undefined
   )) as CerebrasResponse

   const nextModelEndTime = Date.now()

@@ -401,7 +408,8 @@

   const nextModelStartTime = Date.now()
   currentResponse = (await client.chat.completions.create(
-    nextPayload
+    nextPayload,
+    request.abortSignal ? { signal: request.abortSignal } : undefined
   )) as CerebrasResponse

   const nextModelEndTime = Date.now()

@@ -443,7 +451,10 @@
     stream: true,
   }

-  const streamResponse: any = await client.chat.completions.create(streamingPayload)
+  const streamResponse: any = await client.chat.completions.create(
+    streamingPayload,
+    request.abortSignal ? { signal: request.abortSignal } : undefined
+  )

   const accumulatedCost = calculateCost(request.model, tokens.input, tokens.output)

apps/sim/providers/deepseek/index.ts

Lines changed: 19 additions & 7 deletions

@@ -114,10 +114,13 @@ export const deepseekProvider: ProviderConfig = {
     if (request.stream && (!tools || tools.length === 0)) {
       logger.info('Using streaming response for DeepSeek request (no tools)')

-      const streamResponse = await deepseek.chat.completions.create({
-        ...payload,
-        stream: true,
-      })
+      const streamResponse = await deepseek.chat.completions.create(
+        {
+          ...payload,
+          stream: true,
+        },
+        request.abortSignal ? { signal: request.abortSignal } : undefined
+      )

       const streamingResult = {
         stream: createReadableStreamFromDeepseekStream(

@@ -183,7 +186,10 @@
   const forcedTools = preparedTools?.forcedTools || []
   let usedForcedTools: string[] = []

-  let currentResponse = await deepseek.chat.completions.create(payload)
+  let currentResponse = await deepseek.chat.completions.create(
+    payload,
+    request.abortSignal ? { signal: request.abortSignal } : undefined
+  )
   const firstResponseTime = Date.now() - initialCallTime

   let content = currentResponse.choices[0]?.message?.content || ''

@@ -375,7 +381,10 @@
   }

   const nextModelStartTime = Date.now()
-  currentResponse = await deepseek.chat.completions.create(nextPayload)
+  currentResponse = await deepseek.chat.completions.create(
+    nextPayload,
+    request.abortSignal ? { signal: request.abortSignal } : undefined
+  )

   if (
     typeof nextPayload.tool_choice === 'object' &&

@@ -439,7 +448,10 @@
     stream: true,
   }

-  const streamResponse = await deepseek.chat.completions.create(streamingPayload)
+  const streamResponse = await deepseek.chat.completions.create(
+    streamingPayload,
+    request.abortSignal ? { signal: request.abortSignal } : undefined
+  )

   const accumulatedCost = calculateCost(request.model, tokens.input, tokens.output)

apps/sim/providers/gemini/core.ts

Lines changed: 3 additions & 0 deletions

@@ -882,6 +882,9 @@ export async function executeGeminiRequest(
   // Build configuration
   const geminiConfig: GenerateContentConfig = {}

+  if (request.abortSignal) {
+    geminiConfig.abortSignal = request.abortSignal
+  }
   if (request.temperature !== undefined) {
     geminiConfig.temperature = request.temperature
   }
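Gemini is the odd one out: in @google/genai the signal rides inside the per-call GenerateContentConfig rather than a separate options argument, hence the conditional assignment above instead of a second parameter. A standalone sketch (model id is a placeholder):

```ts
import { GoogleGenAI } from '@google/genai'

const ai = new GoogleGenAI({ apiKey: process.env.GEMINI_API_KEY })
const controller = new AbortController()

const pending = ai.models.generateContent({
  model: 'gemini-2.0-flash', // placeholder model id
  contents: 'Hello',
  config: { abortSignal: controller.signal }, // signal lives inside the config object
})

controller.abort()
pending.catch((err) => console.log('cancelled:', (err as Error).name))
```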

apps/sim/providers/groq/index.ts

Lines changed: 19 additions & 7 deletions

@@ -118,10 +118,13 @@ export const groqProvider: ProviderConfig = {
   const providerStartTime = Date.now()
   const providerStartTimeISO = new Date(providerStartTime).toISOString()

-  const streamResponse = await groq.chat.completions.create({
-    ...payload,
-    stream: true,
-  })
+  const streamResponse = await groq.chat.completions.create(
+    {
+      ...payload,
+      stream: true,
+    },
+    request.abortSignal ? { signal: request.abortSignal } : undefined
+  )

   const streamingResult = {
     stream: createReadableStreamFromGroqStream(streamResponse as any, (content, usage) => {

@@ -185,7 +188,10 @@
   try {
     const initialCallTime = Date.now()

-    let currentResponse = await groq.chat.completions.create(payload)
+    let currentResponse = await groq.chat.completions.create(
+      payload,
+      request.abortSignal ? { signal: request.abortSignal } : undefined
+    )
     const firstResponseTime = Date.now() - initialCallTime

     let content = currentResponse.choices[0]?.message?.content || ''

@@ -355,7 +361,10 @@
   }

   const nextModelStartTime = Date.now()
-  currentResponse = await groq.chat.completions.create(nextPayload)
+  currentResponse = await groq.chat.completions.create(
+    nextPayload,
+    request.abortSignal ? { signal: request.abortSignal } : undefined
+  )

   const nextModelEndTime = Date.now()
   const thisModelTime = nextModelEndTime - nextModelStartTime

@@ -396,7 +405,10 @@
     stream: true,
   }

-  const streamResponse = await groq.chat.completions.create(streamingPayload)
+  const streamResponse = await groq.chat.completions.create(
+    streamingPayload,
+    request.abortSignal ? { signal: request.abortSignal } : undefined
+  )

   const accumulatedCost = calculateCost(request.model, tokens.input, tokens.output)
