diff --git a/apps/sim/executor/handlers/agent/agent-handler.ts b/apps/sim/executor/handlers/agent/agent-handler.ts index 075438dd38..506fe712f9 100644 --- a/apps/sim/executor/handlers/agent/agent-handler.ts +++ b/apps/sim/executor/handlers/agent/agent-handler.ts @@ -1086,6 +1086,7 @@ export class AgentBlockHandler implements BlockHandler { verbosity: providerRequest.verbosity, thinkingLevel: providerRequest.thinkingLevel, previousInteractionId: providerRequest.previousInteractionId, + abortSignal: ctx.abortSignal, }) return this.processProviderResponse(response, block, responseFormat) diff --git a/apps/sim/providers/anthropic/core.ts b/apps/sim/providers/anthropic/core.ts index 3a31b19ac6..dbe5df9222 100644 --- a/apps/sim/providers/anthropic/core.ts +++ b/apps/sim/providers/anthropic/core.ts @@ -138,14 +138,20 @@ const ANTHROPIC_SDK_NON_STREAMING_MAX_TOKENS = 21333 */ async function createMessage( anthropic: Anthropic, - payload: AnthropicPayload + payload: AnthropicPayload, + abortSignal?: AbortSignal ): Promise { + const options = abortSignal ? 
{ signal: abortSignal } : undefined if (payload.max_tokens > ANTHROPIC_SDK_NON_STREAMING_MAX_TOKENS && !payload.stream) { - const stream = anthropic.messages.stream(payload as Anthropic.Messages.MessageStreamParams) + const stream = anthropic.messages.stream( + payload as Anthropic.Messages.MessageStreamParams, + options + ) return stream.finalMessage() } return anthropic.messages.create( - payload as Anthropic.Messages.MessageCreateParamsNonStreaming + payload as Anthropic.Messages.MessageCreateParamsNonStreaming, + options ) as Promise } @@ -367,10 +373,13 @@ export async function executeAnthropicProviderRequest( const providerStartTime = Date.now() const providerStartTimeISO = new Date(providerStartTime).toISOString() - const streamResponse = await anthropic.messages.create({ - ...payload, - stream: true, - } as Anthropic.Messages.MessageCreateParamsStreaming) + const streamResponse = await anthropic.messages.create( + { + ...payload, + stream: true, + } as Anthropic.Messages.MessageCreateParamsStreaming, + request.abortSignal ? 
{ signal: request.abortSignal } : undefined + ) const streamingResult = { stream: createReadableStreamFromAnthropicStream( @@ -461,7 +470,7 @@ export async function executeAnthropicProviderRequest( const forcedTools = preparedTools?.forcedTools || [] let usedForcedTools: string[] = [] - let currentResponse = await createMessage(anthropic, payload) + let currentResponse = await createMessage(anthropic, payload, request.abortSignal) const firstResponseTime = Date.now() - initialCallTime let content = '' @@ -708,7 +717,7 @@ export async function executeAnthropicProviderRequest( const nextModelStartTime = Date.now() - currentResponse = await createMessage(anthropic, nextPayload) + currentResponse = await createMessage(anthropic, nextPayload, request.abortSignal) const nextCheckResult = checkForForcedToolUsage( currentResponse, @@ -758,7 +767,8 @@ export async function executeAnthropicProviderRequest( } const streamResponse = await anthropic.messages.create( - streamingPayload as Anthropic.Messages.MessageCreateParamsStreaming + streamingPayload as Anthropic.Messages.MessageCreateParamsStreaming, + request.abortSignal ? 
{ signal: request.abortSignal } : undefined ) const streamingResult = { @@ -860,7 +870,7 @@ export async function executeAnthropicProviderRequest( const forcedTools = preparedTools?.forcedTools || [] let usedForcedTools: string[] = [] - let currentResponse = await createMessage(anthropic, payload) + let currentResponse = await createMessage(anthropic, payload, request.abortSignal) const firstResponseTime = Date.now() - initialCallTime let content = '' @@ -1118,7 +1128,7 @@ export async function executeAnthropicProviderRequest( const nextModelStartTime = Date.now() - currentResponse = await createMessage(anthropic, nextPayload) + currentResponse = await createMessage(anthropic, nextPayload, request.abortSignal) const nextCheckResult = checkForForcedToolUsage( currentResponse, @@ -1182,7 +1192,8 @@ export async function executeAnthropicProviderRequest( } const streamResponse = await anthropic.messages.create( - streamingPayload as Anthropic.Messages.MessageCreateParamsStreaming + streamingPayload as Anthropic.Messages.MessageCreateParamsStreaming, + request.abortSignal ? { signal: request.abortSignal } : undefined ) const streamingResult = { diff --git a/apps/sim/providers/azure-openai/index.ts b/apps/sim/providers/azure-openai/index.ts index 3b711acd63..b171ba9f1a 100644 --- a/apps/sim/providers/azure-openai/index.ts +++ b/apps/sim/providers/azure-openai/index.ts @@ -165,7 +165,10 @@ async function executeChatCompletionsRequest( stream: true, stream_options: { include_usage: true }, } - const streamResponse = await azureOpenAI.chat.completions.create(streamingParams) + const streamResponse = await azureOpenAI.chat.completions.create( + streamingParams, + request.abortSignal ? 
{ signal: request.abortSignal } : undefined + ) const streamingResult = { stream: createReadableStreamFromAzureOpenAIStream(streamResponse, (content, usage) => { @@ -243,7 +246,10 @@ async function executeChatCompletionsRequest( const forcedTools = preparedTools?.forcedTools || [] let usedForcedTools: string[] = [] - let currentResponse = (await azureOpenAI.chat.completions.create(payload)) as ChatCompletion + let currentResponse = (await azureOpenAI.chat.completions.create( + payload, + request.abortSignal ? { signal: request.abortSignal } : undefined + )) as ChatCompletion const firstResponseTime = Date.now() - initialCallTime let content = currentResponse.choices[0]?.message?.content || '' @@ -421,7 +427,10 @@ async function executeChatCompletionsRequest( } const nextModelStartTime = Date.now() - currentResponse = (await azureOpenAI.chat.completions.create(nextPayload)) as ChatCompletion + currentResponse = (await azureOpenAI.chat.completions.create( + nextPayload, + request.abortSignal ? { signal: request.abortSignal } : undefined + )) as ChatCompletion const nextCheckResult = checkForForcedToolUsage( currentResponse, @@ -471,7 +480,10 @@ async function executeChatCompletionsRequest( stream: true, stream_options: { include_usage: true }, } - const streamResponse = await azureOpenAI.chat.completions.create(streamingParams) + const streamResponse = await azureOpenAI.chat.completions.create( + streamingParams, + request.abortSignal ? 
{ signal: request.abortSignal } : undefined + ) const streamingResult = { stream: createReadableStreamFromAzureOpenAIStream(streamResponse, (content, usage) => { diff --git a/apps/sim/providers/bedrock/index.ts b/apps/sim/providers/bedrock/index.ts index 1d36af7869..ab7866a544 100644 --- a/apps/sim/providers/bedrock/index.ts +++ b/apps/sim/providers/bedrock/index.ts @@ -284,7 +284,10 @@ export const bedrockProvider: ProviderConfig = { inferenceConfig, }) - const streamResponse = await client.send(command) + const streamResponse = await client.send( + command, + request.abortSignal ? { abortSignal: request.abortSignal } : undefined + ) if (!streamResponse.stream) { throw new Error('No stream returned from Bedrock') @@ -379,7 +382,10 @@ export const bedrockProvider: ProviderConfig = { toolConfig, }) - let currentResponse = await client.send(command) + let currentResponse = await client.send( + command, + request.abortSignal ? { abortSignal: request.abortSignal } : undefined + ) const firstResponseTime = Date.now() - initialCallTime let content = '' @@ -628,7 +634,10 @@ export const bedrockProvider: ProviderConfig = { : undefined, }) - currentResponse = await client.send(nextCommand) + currentResponse = await client.send( + nextCommand, + request.abortSignal ? { abortSignal: request.abortSignal } : undefined + ) const nextToolUseContentBlocks = (currentResponse.output?.message?.content || []).filter( (block): block is ContentBlock & { toolUse: ToolUseBlock } => 'toolUse' in block @@ -696,7 +705,10 @@ export const bedrockProvider: ProviderConfig = { }, }) - const structuredResponse = await client.send(structuredOutputCommand) + const structuredResponse = await client.send( + structuredOutputCommand, + request.abortSignal ? 
{ abortSignal: request.abortSignal } : undefined + ) const structuredOutputEndTime = Date.now() timeSegments.push({ @@ -782,7 +794,10 @@ export const bedrockProvider: ProviderConfig = { toolConfig: streamToolConfig, }) - const streamResponse = await client.send(streamCommand) + const streamResponse = await client.send( + streamCommand, + request.abortSignal ? { abortSignal: request.abortSignal } : undefined + ) if (!streamResponse.stream) { throw new Error('No stream returned from Bedrock') diff --git a/apps/sim/providers/cerebras/index.ts b/apps/sim/providers/cerebras/index.ts index 23daff85f2..85ce7a2445 100644 --- a/apps/sim/providers/cerebras/index.ts +++ b/apps/sim/providers/cerebras/index.ts @@ -117,10 +117,13 @@ export const cerebrasProvider: ProviderConfig = { if (request.stream && (!tools || tools.length === 0)) { logger.info('Using streaming response for Cerebras request (no tools)') - const streamResponse: any = await client.chat.completions.create({ - ...payload, - stream: true, - }) + const streamResponse: any = await client.chat.completions.create( + { + ...payload, + stream: true, + }, + request.abortSignal ? { signal: request.abortSignal } : undefined + ) const streamingResult = { stream: createReadableStreamFromCerebrasStream(streamResponse, (content, usage) => { @@ -179,7 +182,10 @@ export const cerebrasProvider: ProviderConfig = { } const initialCallTime = Date.now() - let currentResponse = (await client.chat.completions.create(payload)) as CerebrasResponse + let currentResponse = (await client.chat.completions.create( + payload, + request.abortSignal ? 
{ signal: request.abortSignal } : undefined + )) as CerebrasResponse const firstResponseTime = Date.now() - initialCallTime let content = currentResponse.choices[0]?.message?.content || '' @@ -365,7 +371,8 @@ export const cerebrasProvider: ProviderConfig = { finalPayload.tool_choice = 'none' const finalResponse = (await client.chat.completions.create( - finalPayload + finalPayload, + request.abortSignal ? { signal: request.abortSignal } : undefined )) as CerebrasResponse const nextModelEndTime = Date.now() @@ -401,7 +408,8 @@ export const cerebrasProvider: ProviderConfig = { const nextModelStartTime = Date.now() currentResponse = (await client.chat.completions.create( - nextPayload + nextPayload, + request.abortSignal ? { signal: request.abortSignal } : undefined )) as CerebrasResponse const nextModelEndTime = Date.now() @@ -443,7 +451,10 @@ export const cerebrasProvider: ProviderConfig = { stream: true, } - const streamResponse: any = await client.chat.completions.create(streamingPayload) + const streamResponse: any = await client.chat.completions.create( + streamingPayload, + request.abortSignal ? { signal: request.abortSignal } : undefined + ) const accumulatedCost = calculateCost(request.model, tokens.input, tokens.output) diff --git a/apps/sim/providers/deepseek/index.ts b/apps/sim/providers/deepseek/index.ts index 693bf50c59..f537e5e89c 100644 --- a/apps/sim/providers/deepseek/index.ts +++ b/apps/sim/providers/deepseek/index.ts @@ -114,10 +114,13 @@ export const deepseekProvider: ProviderConfig = { if (request.stream && (!tools || tools.length === 0)) { logger.info('Using streaming response for DeepSeek request (no tools)') - const streamResponse = await deepseek.chat.completions.create({ - ...payload, - stream: true, - }) + const streamResponse = await deepseek.chat.completions.create( + { + ...payload, + stream: true, + }, + request.abortSignal ? 
{ signal: request.abortSignal } : undefined + ) const streamingResult = { stream: createReadableStreamFromDeepseekStream( @@ -183,7 +186,10 @@ export const deepseekProvider: ProviderConfig = { const forcedTools = preparedTools?.forcedTools || [] let usedForcedTools: string[] = [] - let currentResponse = await deepseek.chat.completions.create(payload) + let currentResponse = await deepseek.chat.completions.create( + payload, + request.abortSignal ? { signal: request.abortSignal } : undefined + ) const firstResponseTime = Date.now() - initialCallTime let content = currentResponse.choices[0]?.message?.content || '' @@ -375,7 +381,10 @@ export const deepseekProvider: ProviderConfig = { } const nextModelStartTime = Date.now() - currentResponse = await deepseek.chat.completions.create(nextPayload) + currentResponse = await deepseek.chat.completions.create( + nextPayload, + request.abortSignal ? { signal: request.abortSignal } : undefined + ) if ( typeof nextPayload.tool_choice === 'object' && @@ -439,7 +448,10 @@ export const deepseekProvider: ProviderConfig = { stream: true, } - const streamResponse = await deepseek.chat.completions.create(streamingPayload) + const streamResponse = await deepseek.chat.completions.create( + streamingPayload, + request.abortSignal ? { signal: request.abortSignal } : undefined + ) const accumulatedCost = calculateCost(request.model, tokens.input, tokens.output) diff --git a/apps/sim/providers/gemini/core.ts b/apps/sim/providers/gemini/core.ts index 000d90f405..55855b334d 100644 --- a/apps/sim/providers/gemini/core.ts +++ b/apps/sim/providers/gemini/core.ts @@ -387,10 +387,25 @@ const DEEP_RESEARCH_POLL_INTERVAL_MS = 10_000 const DEEP_RESEARCH_MAX_DURATION_MS = 60 * 60 * 1000 /** - * Sleeps for the specified number of milliseconds + * Sleeps for the specified number of milliseconds, respecting an optional abort signal. 
*/ -function sleep(ms: number): Promise<void> { -  return new Promise((resolve) => setTimeout(resolve, ms)) +function sleep(ms: number, signal?: AbortSignal): Promise<void> { +  if (signal?.aborted) { +    return Promise.reject( +      signal.reason ?? new DOMException('The operation was aborted.', 'AbortError') +    ) +  } +  return new Promise((resolve, reject) => { +    const onAbort = () => { +      clearTimeout(timer) +      reject(signal!.reason ?? new DOMException('The operation was aborted.', 'AbortError')) +    } +    const timer = setTimeout(() => { +      signal?.removeEventListener('abort', onAbort) +      resolve() +    }, ms) +    signal?.addEventListener('abort', onAbort, { once: true }) +  }) } /** @@ -680,7 +695,10 @@ export async function executeDeepResearchRequest( stream: true, } - const streamResponse = await ai.interactions.create(streamParams) + const streamResponse = await ai.interactions.create( + streamParams, + request.abortSignal ? { signal: request.abortSignal } : undefined + ) const firstResponseTime = Date.now() - providerStartTime const streamingResult: StreamingExecution = { @@ -765,7 +783,10 @@ export async function executeDeepResearchRequest( stream: false, } - const interaction = await ai.interactions.create(createParams) + const interaction = await ai.interactions.create( + createParams, + request.abortSignal ? { signal: request.abortSignal } : undefined + ) const interactionId = interaction.id logger.info('Deep research interaction created', { interactionId, status: interaction.status }) @@ -793,8 +814,12 @@ elapsedMs: Date.now() - pollStartTime, }) - await sleep(DEEP_RESEARCH_POLL_INTERVAL_MS) - result = await ai.interactions.get(interactionId) + await sleep(DEEP_RESEARCH_POLL_INTERVAL_MS, request.abortSignal) + result = await ai.interactions.get( + interactionId, + undefined, + request.abortSignal ? 
{ signal: request.abortSignal } : undefined + ) } if (result.status !== 'completed') { @@ -882,6 +907,9 @@ export async function executeGeminiRequest( // Build configuration const geminiConfig: GenerateContentConfig = {} + if (request.abortSignal) { + geminiConfig.abortSignal = request.abortSignal + } if (request.temperature !== undefined) { geminiConfig.temperature = request.temperature } diff --git a/apps/sim/providers/groq/index.ts b/apps/sim/providers/groq/index.ts index cf3d45c78e..756082b45f 100644 --- a/apps/sim/providers/groq/index.ts +++ b/apps/sim/providers/groq/index.ts @@ -118,10 +118,13 @@ export const groqProvider: ProviderConfig = { const providerStartTime = Date.now() const providerStartTimeISO = new Date(providerStartTime).toISOString() - const streamResponse = await groq.chat.completions.create({ - ...payload, - stream: true, - }) + const streamResponse = await groq.chat.completions.create( + { + ...payload, + stream: true, + }, + request.abortSignal ? { signal: request.abortSignal } : undefined + ) const streamingResult = { stream: createReadableStreamFromGroqStream(streamResponse as any, (content, usage) => { @@ -185,7 +188,10 @@ export const groqProvider: ProviderConfig = { try { const initialCallTime = Date.now() - let currentResponse = await groq.chat.completions.create(payload) + let currentResponse = await groq.chat.completions.create( + payload, + request.abortSignal ? { signal: request.abortSignal } : undefined + ) const firstResponseTime = Date.now() - initialCallTime let content = currentResponse.choices[0]?.message?.content || '' @@ -355,7 +361,10 @@ export const groqProvider: ProviderConfig = { } const nextModelStartTime = Date.now() - currentResponse = await groq.chat.completions.create(nextPayload) + currentResponse = await groq.chat.completions.create( + nextPayload, + request.abortSignal ? 
{ signal: request.abortSignal } : undefined + ) const nextModelEndTime = Date.now() const thisModelTime = nextModelEndTime - nextModelStartTime @@ -396,7 +405,10 @@ export const groqProvider: ProviderConfig = { stream: true, } - const streamResponse = await groq.chat.completions.create(streamingPayload) + const streamResponse = await groq.chat.completions.create( + streamingPayload, + request.abortSignal ? { signal: request.abortSignal } : undefined + ) const accumulatedCost = calculateCost(request.model, tokens.input, tokens.output) diff --git a/apps/sim/providers/mistral/index.ts b/apps/sim/providers/mistral/index.ts index 5b261c947d..693885fe28 100644 --- a/apps/sim/providers/mistral/index.ts +++ b/apps/sim/providers/mistral/index.ts @@ -143,7 +143,10 @@ export const mistralProvider: ProviderConfig = { ...payload, stream: true, } - const streamResponse = await mistral.chat.completions.create(streamingParams) + const streamResponse = await mistral.chat.completions.create( + streamingParams, + request.abortSignal ? { signal: request.abortSignal } : undefined + ) const streamingResult = { stream: createReadableStreamFromMistralStream(streamResponse, (content, usage) => { @@ -242,7 +245,10 @@ export const mistralProvider: ProviderConfig = { } } - let currentResponse = await mistral.chat.completions.create(payload) + let currentResponse = await mistral.chat.completions.create( + payload, + request.abortSignal ? { signal: request.abortSignal } : undefined + ) const firstResponseTime = Date.now() - initialCallTime let content = currentResponse.choices[0]?.message?.content || '' @@ -413,7 +419,10 @@ export const mistralProvider: ProviderConfig = { const nextModelStartTime = Date.now() - currentResponse = await mistral.chat.completions.create(nextPayload) + currentResponse = await mistral.chat.completions.create( + nextPayload, + request.abortSignal ? 
{ signal: request.abortSignal } : undefined + ) checkForForcedToolUsage(currentResponse, nextPayload.tool_choice) @@ -454,7 +463,10 @@ export const mistralProvider: ProviderConfig = { tool_choice: 'auto', stream: true, } - const streamResponse = await mistral.chat.completions.create(streamingParams) + const streamResponse = await mistral.chat.completions.create( + streamingParams, + request.abortSignal ? { signal: request.abortSignal } : undefined + ) const streamingResult = { stream: createReadableStreamFromMistralStream(streamResponse, (content, usage) => { diff --git a/apps/sim/providers/ollama/index.ts b/apps/sim/providers/ollama/index.ts index f8485bdfe7..baae54fb75 100644 --- a/apps/sim/providers/ollama/index.ts +++ b/apps/sim/providers/ollama/index.ts @@ -166,7 +166,10 @@ export const ollamaProvider: ProviderConfig = { stream: true, stream_options: { include_usage: true }, } - const streamResponse = await ollama.chat.completions.create(streamingParams) + const streamResponse = await ollama.chat.completions.create( + streamingParams, + request.abortSignal ? { signal: request.abortSignal } : undefined + ) const streamingResult = { stream: createReadableStreamFromOllamaStream(streamResponse, (content, usage) => { @@ -248,7 +251,10 @@ export const ollamaProvider: ProviderConfig = { const initialCallTime = Date.now() - let currentResponse = await ollama.chat.completions.create(payload) + let currentResponse = await ollama.chat.completions.create( + payload, + request.abortSignal ? { signal: request.abortSignal } : undefined + ) const firstResponseTime = Date.now() - initialCallTime let content = currentResponse.choices[0]?.message?.content || '' @@ -408,7 +414,10 @@ export const ollamaProvider: ProviderConfig = { const nextModelStartTime = Date.now() - currentResponse = await ollama.chat.completions.create(nextPayload) + currentResponse = await ollama.chat.completions.create( + nextPayload, + request.abortSignal ? 
{ signal: request.abortSignal } : undefined + ) const nextModelEndTime = Date.now() const thisModelTime = nextModelEndTime - nextModelStartTime @@ -450,7 +459,10 @@ export const ollamaProvider: ProviderConfig = { stream: true, stream_options: { include_usage: true }, } - const streamResponse = await ollama.chat.completions.create(streamingParams) + const streamResponse = await ollama.chat.completions.create( + streamingParams, + request.abortSignal ? { signal: request.abortSignal } : undefined + ) const streamingResult = { stream: createReadableStreamFromOllamaStream(streamResponse, (content, usage) => { diff --git a/apps/sim/providers/openai/core.ts b/apps/sim/providers/openai/core.ts index 6833181dc1..139e12eaa3 100644 --- a/apps/sim/providers/openai/core.ts +++ b/apps/sim/providers/openai/core.ts @@ -265,6 +265,7 @@ export async function executeResponsesProviderRequest( method: 'POST', headers: config.headers, body: JSON.stringify(body), + signal: request.abortSignal, }) if (!response.ok) { @@ -286,6 +287,7 @@ export async function executeResponsesProviderRequest( method: 'POST', headers: config.headers, body: JSON.stringify(createRequestBody(initialInput, { stream: true })), + signal: request.abortSignal, }) if (!streamResponse.ok) { @@ -704,6 +706,7 @@ export async function executeResponsesProviderRequest( method: 'POST', headers: config.headers, body: JSON.stringify(createRequestBody(currentInput, streamOverrides)), + signal: request.abortSignal, }) if (!streamResponse.ok) { diff --git a/apps/sim/providers/openrouter/index.ts b/apps/sim/providers/openrouter/index.ts index edbed1b7ca..2951d56ae7 100644 --- a/apps/sim/providers/openrouter/index.ts +++ b/apps/sim/providers/openrouter/index.ts @@ -157,7 +157,10 @@ export const openRouterProvider: ProviderConfig = { stream: true, stream_options: { include_usage: true }, } - const streamResponse = await client.chat.completions.create(streamingParams) + const streamResponse = await client.chat.completions.create( + 
streamingParams, + request.abortSignal ? { signal: request.abortSignal } : undefined + ) const streamingResult = { stream: createReadableStreamFromOpenAIStream(streamResponse, (content, usage) => { @@ -231,7 +234,10 @@ export const openRouterProvider: ProviderConfig = { const forcedTools = preparedTools?.forcedTools || [] let usedForcedTools: string[] = [] - let currentResponse = await client.chat.completions.create(payload) + let currentResponse = await client.chat.completions.create( + payload, + request.abortSignal ? { signal: request.abortSignal } : undefined + ) const firstResponseTime = Date.now() - initialCallTime let content = currentResponse.choices[0]?.message?.content || '' @@ -400,7 +406,10 @@ export const openRouterProvider: ProviderConfig = { } const nextModelStartTime = Date.now() - currentResponse = await client.chat.completions.create(nextPayload) + currentResponse = await client.chat.completions.create( + nextPayload, + request.abortSignal ? { signal: request.abortSignal } : undefined + ) const nextForcedToolResult = checkForForcedToolUsage( currentResponse, nextPayload.tool_choice, @@ -450,7 +459,10 @@ export const openRouterProvider: ProviderConfig = { ) } - const streamResponse = await client.chat.completions.create(streamingParams) + const streamResponse = await client.chat.completions.create( + streamingParams, + request.abortSignal ? { signal: request.abortSignal } : undefined + ) const streamingResult = { stream: createReadableStreamFromOpenAIStream(streamResponse, (content, usage) => { @@ -533,7 +545,10 @@ export const openRouterProvider: ProviderConfig = { ) const finalStartTime = Date.now() - const finalResponse = await client.chat.completions.create(finalPayload) + const finalResponse = await client.chat.completions.create( + finalPayload, + request.abortSignal ? 
{ signal: request.abortSignal } : undefined + ) const finalEndTime = Date.now() const finalDuration = finalEndTime - finalStartTime diff --git a/apps/sim/providers/types.ts b/apps/sim/providers/types.ts index 2c59bfcc13..5df279077e 100644 --- a/apps/sim/providers/types.ts +++ b/apps/sim/providers/types.ts @@ -176,6 +176,7 @@ export interface ProviderRequest { isDeployedContext?: boolean /** Previous interaction ID for multi-turn Interactions API requests (deep research follow-ups) */ previousInteractionId?: string + abortSignal?: AbortSignal } /** diff --git a/apps/sim/providers/vllm/index.ts b/apps/sim/providers/vllm/index.ts index 5e704c654b..9f0b4aaf65 100644 --- a/apps/sim/providers/vllm/index.ts +++ b/apps/sim/providers/vllm/index.ts @@ -189,7 +189,10 @@ export const vllmProvider: ProviderConfig = { stream: true, stream_options: { include_usage: true }, } - const streamResponse = await vllm.chat.completions.create(streamingParams) + const streamResponse = await vllm.chat.completions.create( + streamingParams, + request.abortSignal ? { signal: request.abortSignal } : undefined + ) const streamingResult = { stream: createReadableStreamFromVLLMStream(streamResponse, (content, usage) => { @@ -293,7 +296,10 @@ export const vllmProvider: ProviderConfig = { } } - let currentResponse = await vllm.chat.completions.create(payload) + let currentResponse = await vllm.chat.completions.create( + payload, + request.abortSignal ? { signal: request.abortSignal } : undefined + ) const firstResponseTime = Date.now() - initialCallTime let content = currentResponse.choices[0]?.message?.content || '' @@ -474,7 +480,10 @@ export const vllmProvider: ProviderConfig = { const nextModelStartTime = Date.now() - currentResponse = await vllm.chat.completions.create(nextPayload) + currentResponse = await vllm.chat.completions.create( + nextPayload, + request.abortSignal ? 
{ signal: request.abortSignal } : undefined + ) checkForForcedToolUsage(currentResponse, nextPayload.tool_choice) @@ -519,7 +528,10 @@ export const vllmProvider: ProviderConfig = { stream: true, stream_options: { include_usage: true }, } - const streamResponse = await vllm.chat.completions.create(streamingParams) + const streamResponse = await vllm.chat.completions.create( + streamingParams, + request.abortSignal ? { signal: request.abortSignal } : undefined + ) const streamingResult = { stream: createReadableStreamFromVLLMStream(streamResponse, (content, usage) => { diff --git a/apps/sim/providers/xai/index.ts b/apps/sim/providers/xai/index.ts index 98cbf52c1e..c5a6766fbf 100644 --- a/apps/sim/providers/xai/index.ts +++ b/apps/sim/providers/xai/index.ts @@ -115,7 +115,10 @@ export const xAIProvider: ProviderConfig = { } : { ...basePayload, stream: true, stream_options: { include_usage: true } } - const streamResponse = await xai.chat.completions.create(streamingParams) + const streamResponse = await xai.chat.completions.create( + streamingParams, + request.abortSignal ? { signal: request.abortSignal } : undefined + ) const streamingResult = { stream: createReadableStreamFromXAIStream(streamResponse, (content, usage) => { @@ -199,7 +202,10 @@ export const xAIProvider: ProviderConfig = { Object.assign(initialPayload, responseFormatPayload) } - let currentResponse = await xai.chat.completions.create(initialPayload) + let currentResponse = await xai.chat.completions.create( + initialPayload, + request.abortSignal ? { signal: request.abortSignal } : undefined + ) const firstResponseTime = Date.now() - initialCallTime let content = currentResponse.choices[0]?.message?.content || '' @@ -414,7 +420,10 @@ export const xAIProvider: ProviderConfig = { const nextModelStartTime = Date.now() - currentResponse = await xai.chat.completions.create(nextPayload) + currentResponse = await xai.chat.completions.create( + nextPayload, + request.abortSignal ? 
{ signal: request.abortSignal } : undefined + ) if (nextPayload.tool_choice && typeof nextPayload.tool_choice === 'object') { const result = checkForForcedToolUsage( currentResponse, @@ -479,7 +488,10 @@ export const xAIProvider: ProviderConfig = { } } - const streamResponse = await xai.chat.completions.create(finalStreamingPayload as any) + const streamResponse = await xai.chat.completions.create( + finalStreamingPayload as any, + request.abortSignal ? { signal: request.abortSignal } : undefined + ) const accumulatedCost = calculateCost(request.model, tokens.input, tokens.output)