Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions apps/sim/executor/handlers/agent/agent-handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1086,6 +1086,7 @@ export class AgentBlockHandler implements BlockHandler {
verbosity: providerRequest.verbosity,
thinkingLevel: providerRequest.thinkingLevel,
previousInteractionId: providerRequest.previousInteractionId,
abortSignal: ctx.abortSignal,
})

return this.processProviderResponse(response, block, responseFormat)
Expand Down
37 changes: 24 additions & 13 deletions apps/sim/providers/anthropic/core.ts
Original file line number Diff line number Diff line change
Expand Up @@ -138,14 +138,20 @@ const ANTHROPIC_SDK_NON_STREAMING_MAX_TOKENS = 21333
*/
async function createMessage(
  anthropic: Anthropic,
  payload: AnthropicPayload,
  abortSignal?: AbortSignal
): Promise<Anthropic.Messages.Message> {
  // Build per-request options only when a signal was supplied; otherwise pass
  // `undefined` so calls without a signal look exactly like they did before
  // the parameter existed.
  const options = abortSignal ? { signal: abortSignal } : undefined

  // Non-streaming requests whose max_tokens exceeds the threshold constant are
  // sent through the SDK's streaming helper and collapsed back into a single
  // message via finalMessage(). The caller still receives one Message.
  // NOTE(review): presumably this works around an SDK limit on non-streaming
  // requests, as the constant's name suggests — confirm against its definition.
  if (payload.max_tokens > ANTHROPIC_SDK_NON_STREAMING_MAX_TOKENS && !payload.stream) {
    const stream = anthropic.messages.stream(
      payload as Anthropic.Messages.MessageStreamParams,
      options
    )
    return stream.finalMessage()
  }

  // Regular path: a single non-streaming create call, forwarding the same
  // abort options so both branches are cancellable.
  return anthropic.messages.create(
    payload as Anthropic.Messages.MessageCreateParamsNonStreaming,
    options
  ) as Promise<Anthropic.Messages.Message>
}

Expand Down Expand Up @@ -367,10 +373,13 @@ export async function executeAnthropicProviderRequest(
const providerStartTime = Date.now()
const providerStartTimeISO = new Date(providerStartTime).toISOString()

const streamResponse = await anthropic.messages.create({
...payload,
stream: true,
} as Anthropic.Messages.MessageCreateParamsStreaming)
const streamResponse = await anthropic.messages.create(
{
...payload,
stream: true,
} as Anthropic.Messages.MessageCreateParamsStreaming,
request.abortSignal ? { signal: request.abortSignal } : undefined
)

const streamingResult = {
stream: createReadableStreamFromAnthropicStream(
Expand Down Expand Up @@ -461,7 +470,7 @@ export async function executeAnthropicProviderRequest(
const forcedTools = preparedTools?.forcedTools || []
let usedForcedTools: string[] = []

let currentResponse = await createMessage(anthropic, payload)
let currentResponse = await createMessage(anthropic, payload, request.abortSignal)
const firstResponseTime = Date.now() - initialCallTime

let content = ''
Expand Down Expand Up @@ -708,7 +717,7 @@ export async function executeAnthropicProviderRequest(

const nextModelStartTime = Date.now()

currentResponse = await createMessage(anthropic, nextPayload)
currentResponse = await createMessage(anthropic, nextPayload, request.abortSignal)

const nextCheckResult = checkForForcedToolUsage(
currentResponse,
Expand Down Expand Up @@ -758,7 +767,8 @@ export async function executeAnthropicProviderRequest(
}

const streamResponse = await anthropic.messages.create(
streamingPayload as Anthropic.Messages.MessageCreateParamsStreaming
streamingPayload as Anthropic.Messages.MessageCreateParamsStreaming,
request.abortSignal ? { signal: request.abortSignal } : undefined
)

const streamingResult = {
Expand Down Expand Up @@ -860,7 +870,7 @@ export async function executeAnthropicProviderRequest(
const forcedTools = preparedTools?.forcedTools || []
let usedForcedTools: string[] = []

let currentResponse = await createMessage(anthropic, payload)
let currentResponse = await createMessage(anthropic, payload, request.abortSignal)
const firstResponseTime = Date.now() - initialCallTime

let content = ''
Expand Down Expand Up @@ -1118,7 +1128,7 @@ export async function executeAnthropicProviderRequest(

const nextModelStartTime = Date.now()

currentResponse = await createMessage(anthropic, nextPayload)
currentResponse = await createMessage(anthropic, nextPayload, request.abortSignal)

const nextCheckResult = checkForForcedToolUsage(
currentResponse,
Expand Down Expand Up @@ -1182,7 +1192,8 @@ export async function executeAnthropicProviderRequest(
}

const streamResponse = await anthropic.messages.create(
streamingPayload as Anthropic.Messages.MessageCreateParamsStreaming
streamingPayload as Anthropic.Messages.MessageCreateParamsStreaming,
request.abortSignal ? { signal: request.abortSignal } : undefined
)

const streamingResult = {
Expand Down
20 changes: 16 additions & 4 deletions apps/sim/providers/azure-openai/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,10 @@ async function executeChatCompletionsRequest(
stream: true,
stream_options: { include_usage: true },
}
const streamResponse = await azureOpenAI.chat.completions.create(streamingParams)
const streamResponse = await azureOpenAI.chat.completions.create(
streamingParams,
request.abortSignal ? { signal: request.abortSignal } : undefined
)

const streamingResult = {
stream: createReadableStreamFromAzureOpenAIStream(streamResponse, (content, usage) => {
Expand Down Expand Up @@ -243,7 +246,10 @@ async function executeChatCompletionsRequest(
const forcedTools = preparedTools?.forcedTools || []
let usedForcedTools: string[] = []

let currentResponse = (await azureOpenAI.chat.completions.create(payload)) as ChatCompletion
let currentResponse = (await azureOpenAI.chat.completions.create(
payload,
request.abortSignal ? { signal: request.abortSignal } : undefined
)) as ChatCompletion
const firstResponseTime = Date.now() - initialCallTime

let content = currentResponse.choices[0]?.message?.content || ''
Expand Down Expand Up @@ -421,7 +427,10 @@ async function executeChatCompletionsRequest(
}

const nextModelStartTime = Date.now()
currentResponse = (await azureOpenAI.chat.completions.create(nextPayload)) as ChatCompletion
currentResponse = (await azureOpenAI.chat.completions.create(
nextPayload,
request.abortSignal ? { signal: request.abortSignal } : undefined
)) as ChatCompletion

const nextCheckResult = checkForForcedToolUsage(
currentResponse,
Expand Down Expand Up @@ -471,7 +480,10 @@ async function executeChatCompletionsRequest(
stream: true,
stream_options: { include_usage: true },
}
const streamResponse = await azureOpenAI.chat.completions.create(streamingParams)
const streamResponse = await azureOpenAI.chat.completions.create(
streamingParams,
request.abortSignal ? { signal: request.abortSignal } : undefined
)

const streamingResult = {
stream: createReadableStreamFromAzureOpenAIStream(streamResponse, (content, usage) => {
Expand Down
25 changes: 20 additions & 5 deletions apps/sim/providers/bedrock/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,10 @@ export const bedrockProvider: ProviderConfig = {
inferenceConfig,
})

const streamResponse = await client.send(command)
const streamResponse = await client.send(
command,
request.abortSignal ? { abortSignal: request.abortSignal } : undefined
)

if (!streamResponse.stream) {
throw new Error('No stream returned from Bedrock')
Expand Down Expand Up @@ -379,7 +382,10 @@ export const bedrockProvider: ProviderConfig = {
toolConfig,
})

let currentResponse = await client.send(command)
let currentResponse = await client.send(
command,
request.abortSignal ? { abortSignal: request.abortSignal } : undefined
)
const firstResponseTime = Date.now() - initialCallTime

let content = ''
Expand Down Expand Up @@ -628,7 +634,10 @@ export const bedrockProvider: ProviderConfig = {
: undefined,
})

currentResponse = await client.send(nextCommand)
currentResponse = await client.send(
nextCommand,
request.abortSignal ? { abortSignal: request.abortSignal } : undefined
)

const nextToolUseContentBlocks = (currentResponse.output?.message?.content || []).filter(
(block): block is ContentBlock & { toolUse: ToolUseBlock } => 'toolUse' in block
Expand Down Expand Up @@ -696,7 +705,10 @@ export const bedrockProvider: ProviderConfig = {
},
})

const structuredResponse = await client.send(structuredOutputCommand)
const structuredResponse = await client.send(
structuredOutputCommand,
request.abortSignal ? { abortSignal: request.abortSignal } : undefined
)
const structuredOutputEndTime = Date.now()

timeSegments.push({
Expand Down Expand Up @@ -782,7 +794,10 @@ export const bedrockProvider: ProviderConfig = {
toolConfig: streamToolConfig,
})

const streamResponse = await client.send(streamCommand)
const streamResponse = await client.send(
streamCommand,
request.abortSignal ? { abortSignal: request.abortSignal } : undefined
)

if (!streamResponse.stream) {
throw new Error('No stream returned from Bedrock')
Expand Down
27 changes: 19 additions & 8 deletions apps/sim/providers/cerebras/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -117,10 +117,13 @@ export const cerebrasProvider: ProviderConfig = {
if (request.stream && (!tools || tools.length === 0)) {
logger.info('Using streaming response for Cerebras request (no tools)')

const streamResponse: any = await client.chat.completions.create({
...payload,
stream: true,
})
const streamResponse: any = await client.chat.completions.create(
{
...payload,
stream: true,
},
request.abortSignal ? { signal: request.abortSignal } : undefined
)

const streamingResult = {
stream: createReadableStreamFromCerebrasStream(streamResponse, (content, usage) => {
Expand Down Expand Up @@ -179,7 +182,10 @@ export const cerebrasProvider: ProviderConfig = {
}
const initialCallTime = Date.now()

let currentResponse = (await client.chat.completions.create(payload)) as CerebrasResponse
let currentResponse = (await client.chat.completions.create(
payload,
request.abortSignal ? { signal: request.abortSignal } : undefined
)) as CerebrasResponse
const firstResponseTime = Date.now() - initialCallTime

let content = currentResponse.choices[0]?.message?.content || ''
Expand Down Expand Up @@ -365,7 +371,8 @@ export const cerebrasProvider: ProviderConfig = {
finalPayload.tool_choice = 'none'

const finalResponse = (await client.chat.completions.create(
finalPayload
finalPayload,
request.abortSignal ? { signal: request.abortSignal } : undefined
)) as CerebrasResponse

const nextModelEndTime = Date.now()
Expand Down Expand Up @@ -401,7 +408,8 @@ export const cerebrasProvider: ProviderConfig = {

const nextModelStartTime = Date.now()
currentResponse = (await client.chat.completions.create(
nextPayload
nextPayload,
request.abortSignal ? { signal: request.abortSignal } : undefined
)) as CerebrasResponse

const nextModelEndTime = Date.now()
Expand Down Expand Up @@ -443,7 +451,10 @@ export const cerebrasProvider: ProviderConfig = {
stream: true,
}

const streamResponse: any = await client.chat.completions.create(streamingPayload)
const streamResponse: any = await client.chat.completions.create(
streamingPayload,
request.abortSignal ? { signal: request.abortSignal } : undefined
)

const accumulatedCost = calculateCost(request.model, tokens.input, tokens.output)

Expand Down
26 changes: 19 additions & 7 deletions apps/sim/providers/deepseek/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -114,10 +114,13 @@ export const deepseekProvider: ProviderConfig = {
if (request.stream && (!tools || tools.length === 0)) {
logger.info('Using streaming response for DeepSeek request (no tools)')

const streamResponse = await deepseek.chat.completions.create({
...payload,
stream: true,
})
const streamResponse = await deepseek.chat.completions.create(
{
...payload,
stream: true,
},
request.abortSignal ? { signal: request.abortSignal } : undefined
)

const streamingResult = {
stream: createReadableStreamFromDeepseekStream(
Expand Down Expand Up @@ -183,7 +186,10 @@ export const deepseekProvider: ProviderConfig = {
const forcedTools = preparedTools?.forcedTools || []
let usedForcedTools: string[] = []

let currentResponse = await deepseek.chat.completions.create(payload)
let currentResponse = await deepseek.chat.completions.create(
payload,
request.abortSignal ? { signal: request.abortSignal } : undefined
)
const firstResponseTime = Date.now() - initialCallTime

let content = currentResponse.choices[0]?.message?.content || ''
Expand Down Expand Up @@ -375,7 +381,10 @@ export const deepseekProvider: ProviderConfig = {
}

const nextModelStartTime = Date.now()
currentResponse = await deepseek.chat.completions.create(nextPayload)
currentResponse = await deepseek.chat.completions.create(
nextPayload,
request.abortSignal ? { signal: request.abortSignal } : undefined
)

if (
typeof nextPayload.tool_choice === 'object' &&
Expand Down Expand Up @@ -439,7 +448,10 @@ export const deepseekProvider: ProviderConfig = {
stream: true,
}

const streamResponse = await deepseek.chat.completions.create(streamingPayload)
const streamResponse = await deepseek.chat.completions.create(
streamingPayload,
request.abortSignal ? { signal: request.abortSignal } : undefined
)

const accumulatedCost = calculateCost(request.model, tokens.input, tokens.output)

Expand Down
Loading