diff --git a/ee/enterprise/conversation-api.ts b/ee/enterprise/conversation-api.ts index 14e19d8..eb5473e 100644 --- a/ee/enterprise/conversation-api.ts +++ b/ee/enterprise/conversation-api.ts @@ -175,183 +175,220 @@ async function runConversationMessage( const workspaceLimit = getEffectiveLimit(workspacePlanLimit, 'api.messages_per_month', overageSettings) const keyLimit = getEffectiveLimit(keyData.monthlyMessageLimit, 'api.messages_per_month', overageSettings) - const usageCheck = await db.incrementAPIUsageIfAllowed({ - workspaceId: keyData.workspaceId, - apiKeyId: keyData.keyId, - month: usageMonth, - keyLimit, - workspaceLimit, - }) - if (!usageCheck.allowed) { - const limitForMessage = usageCheck.reason === 'workspace_limit' ? workspacePlanLimit : keyData.monthlyMessageLimit - throw createError({ statusCode: 429, message: errorMessage('conversation.monthly_limit', { limit: limitForMessage }) }) - } - - // Best-effort meter write for overage billing. Fire-and-forget. - recordAPIUsage({ workspaceId: keyData.workspaceId, count: 1, apiKeyId: keyData.keyId, month: usageMonth }).catch(() => {}) - - const permissions = buildPermissions(keyData) - const [owner, repo] = project.repo_full_name.split('/') - if (!owner || !repo) { - throw createError({ statusCode: 400, message: errorMessage('github.repo_required') }) + // Billing semantic mirrors the Studio chat path: reservation is made + // up front for race-free cap enforcement, but a message only becomes + // billable once Anthropic streams a real provider event (text or + // tool_use). Any failure before that point refunds the slot via the + // finally below. + let reserved = false + let committed = false + const tryRevert = async (reason: string) => { + if (!reserved || committed) return + try { + await db.decrementAPIUsage({ + workspaceId: keyData.workspaceId, + apiKeyId: keyData.keyId, + month: usageMonth, + }) + } + catch (err) { + // eslint-disable-next-line no-console + console.error(`[conversation-api] usage revert failed (${reason}):`, err) + } } - const git = useGitProvider({ - installationId: workspace.github_installation_id ?? 0, - owner, - repo, - }) - - const contentRoot = normalizeContentRoot(project.content_root) - const brain = await getOrBuildBrainCache(git, contentRoot, keyData.projectId) - const projectConfig = brain.config - const models = [...brain.models.values()] - const vocabulary = brain.vocabulary - const contentContext = brain.contentContext + try { + const usageCheck = await db.incrementAPIUsageIfAllowed({ + workspaceId: keyData.workspaceId, + apiKeyId: keyData.keyId, + month: usageMonth, + keyLimit, + workspaceLimit, + }) + if (!usageCheck.allowed) { + const limitForMessage = usageCheck.reason === 'workspace_limit' ? workspacePlanLimit : keyData.monthlyMessageLimit + throw createError({ statusCode: 429, message: errorMessage('conversation.monthly_limit', { limit: limitForMessage }) }) + } + reserved = true - const uiContext = parseConversationContext(body.context) + const permissions = buildPermissions(keyData) + const [owner, repo] = project.repo_full_name.split('/') + if (!owner || !repo) { + throw createError({ statusCode: 400, message: errorMessage('github.repo_required') }) + } - let pendingBranches: Array<{ name: string, sha: string, protected: boolean }> = [] - try { - const raw = await git.listBranches('cr/') - pendingBranches = raw.map(b => ({ name: b.name, sha: b.sha, protected: b.protected ?? false })) - } - catch (err) { + const git = useGitProvider({ + installationId: workspace.github_installation_id ?? 0, + owner, + repo, + }) + + const contentRoot = normalizeContentRoot(project.content_root) + const brain = await getOrBuildBrainCache(git, contentRoot, keyData.projectId) + const projectConfig = brain.config + const models = [...brain.models.values()] + const vocabulary = brain.vocabulary + const contentContext = brain.contentContext + + const uiContext = parseConversationContext(body.context) + + let pendingBranches: Array<{ name: string, sha: string, protected: boolean }> = [] + try { + const raw = await git.listBranches('cr/') + pendingBranches = raw.map(b => ({ name: b.name, sha: b.sha, protected: b.protected ?? false })) + } + catch (err) { // eslint-disable-next-line no-console - console.error('[conversation-api] Failed to list branches:', err) - } + console.error('[conversation-api] Failed to list branches:', err) + } - const phase = deriveProjectPhase(projectConfig, pendingBranches, project.status ?? 'active') - const intent = classifyIntent(body.message, uiContext, phase) - - let systemPrompt = buildSystemPrompt( - projectConfig, - models, - permissions, - { initialized: !!projectConfig, pendingBranches, projectStatus: project.status ?? 'active', phase, contentContext }, - uiContext, - intent, - vocabulary, - plan, - keyData.customInstructions, - ) + const phase = deriveProjectPhase(projectConfig, pendingBranches, project.status ?? 'active') + const intent = classifyIntent(body.message, uiContext, phase) - const contentIndex = buildContentIndex(brain) - if (contentIndex) - systemPrompt += `\n\n${contentIndex}` + let systemPrompt = buildSystemPrompt( + projectConfig, + models, + permissions, + { initialized: !!projectConfig, pendingBranches, projectStatus: project.status ?? 'active', phase, contentContext }, + uiContext, + intent, + vocabulary, + plan, + keyData.customInstructions, + ) - const permissionFiltered = filterToolsByPermissions(STUDIO_TOOLS, permissions.availableTools) as typeof STUDIO_TOOLS - const phaseFiltered = permissionFiltered.filter(tool => tool.requiredPhase.includes(phase)) - const aiTools = toAITools(phaseFiltered) + const contentIndex = buildContentIndex(brain) + if (contentIndex) + systemPrompt += `\n\n${contentIndex}` - const runtimeConfig = useRuntimeConfig() - const apiKey = runtimeConfig.anthropic.apiKey - if (!apiKey) - throw createError({ statusCode: 500, message: errorMessage('chat.no_api_key') }) + const permissionFiltered = filterToolsByPermissions(STUDIO_TOOLS, permissions.availableTools) as typeof STUDIO_TOOLS + const phaseFiltered = permissionFiltered.filter(tool => tool.requiredPhase.includes(phase)) + const aiTools = toAITools(phaseFiltered) - let conversationId = body.conversationId - if (conversationId) { - const conv = await db.getConversation(conversationId, keyData.projectId, { apiKeyId: keyData.keyId }) + const runtimeConfig = useRuntimeConfig() + const apiKey = runtimeConfig.anthropic.apiKey + if (!apiKey) + throw createError({ statusCode: 500, message: errorMessage('chat.no_api_key') }) - if (!conv) conversationId = undefined - } + let conversationId = body.conversationId + if (conversationId) { + const conv = await db.getConversation(conversationId, keyData.projectId, { apiKeyId: keyData.keyId }) - if (!conversationId) { - conversationId = await db.createApiConversation(keyData.projectId, keyData.keyId, body.message.substring(0, 100)) ?? undefined - } - - if (!conversationId) - throw createError({ statusCode: 500, message: errorMessage('chat.conversation_create_failed') }) + if (!conv) conversationId = undefined + } - const historyRows = await loadConversationMessages(db, conversationId, 50) - const messages: AIMessage[] = [] - const HISTORY_TOKEN_BUDGET = 8000 + if (!conversationId) { + conversationId = await db.createApiConversation(keyData.projectId, keyData.keyId, body.message.substring(0, 100)) ?? undefined + } - const budgetStart = (() => { - let tokens = 0 - for (let i = historyRows.length - 1; i >= 0; i--) { + if (!conversationId) + throw createError({ statusCode: 500, message: errorMessage('chat.conversation_create_failed') }) + + const historyRows = await loadConversationMessages(db, conversationId, 50) + const messages: AIMessage[] = [] + const HISTORY_TOKEN_BUDGET = 8000 + + const budgetStart = (() => { + let tokens = 0 + for (let i = historyRows.length - 1; i >= 0; i--) { + const row = historyRows[i]! + const content = row.toolCalls ? (row.toolCalls as AIContentBlock[]) : row.content + const estimate = typeof content === 'string' + ? Math.ceil(content.length / 4) + : Math.ceil(JSON.stringify(content).length / 4) + tokens += estimate + if (tokens > HISTORY_TOKEN_BUDGET) return i + 1 + } + return 0 + })() + + for (let i = budgetStart; i < historyRows.length; i++) { const row = historyRows[i]! - const content = row.toolCalls ? (row.toolCalls as AIContentBlock[]) : row.content - const estimate = typeof content === 'string' - ? Math.ceil(content.length / 4) - : Math.ceil(JSON.stringify(content).length / 4) - tokens += estimate - if (tokens > HISTORY_TOKEN_BUDGET) return i + 1 + const content = row.toolCalls ? (row.toolCalls as AIContentBlock[]) : (row.content as string | AIContentBlock[]) + messages.push({ role: row.role as 'user' | 'assistant', content }) + } + messages.push({ role: 'user', content: body.message }) + + const model = keyData.aiModel + const configWorkflow = projectConfig?.workflow ?? 'auto-merge' + const workflow = hasFeature(plan, 'workflow.review') ? configWorkflow : 'auto-merge' + + const contentEngine = createContentEngine({ git, contentRoot, projectId: keyData.projectId }) + const toolResults: Array<{ id: string, name: string, result: unknown }> = [] + let responseText = '' + let totalInputTokens = 0 + let totalOutputTokens = 0 + let lastAssistantContent: AIContentBlock[] = [] + + for await (const evt of runConversationLoop( + { model, apiKey, systemPrompt, messages, tools: aiTools }, + { + engine: contentEngine, + git, + userEmail: `api-key:${keyData.name}`, + userId: keyData.keyId, + contentRoot, + workflow, + permissions, + plan, + projectId: keyData.projectId, + workspaceId: keyData.workspaceId, + uiContext, + phase, + }, + )) { + // Commit on first real provider event — see Studio chat handler + // for the same semantic. `tool_result` is internal, `done` is + // synthetic, neither counts. + if (!committed && (evt.type === 'text' || evt.type === 'tool_use')) { + committed = true + recordAPIUsage({ workspaceId: keyData.workspaceId, count: 1, apiKeyId: keyData.keyId, month: usageMonth }).catch(() => {}) + } + + switch (evt.type) { + case 'text': + responseText += evt.content as string + break + case 'tool_result': + toolResults.push({ id: evt.id as string, name: evt.name as string, result: evt.result }) + break + case 'done': + totalInputTokens = (evt.usage as { inputTokens: number })?.inputTokens ?? 0 + totalOutputTokens = (evt.usage as { outputTokens: number })?.outputTokens ?? 0 + lastAssistantContent = (evt.lastContent as AIContentBlock[]) ?? [] + break + } } - return 0 - })() - for (let i = budgetStart; i < historyRows.length; i++) { - const row = historyRows[i]! - const content = row.toolCalls ? (row.toolCalls as AIContentBlock[]) : (row.content as string | AIContentBlock[]) - messages.push({ role: row.role as 'user' | 'assistant', content }) - } - messages.push({ role: 'user', content: body.message }) - - const model = keyData.aiModel - const configWorkflow = projectConfig?.workflow ?? 'auto-merge' - const workflow = hasFeature(plan, 'workflow.review') ? configWorkflow : 'auto-merge' - - const contentEngine = createContentEngine({ git, contentRoot, projectId: keyData.projectId }) - const toolResults: Array<{ id: string, name: string, result: unknown }> = [] - let responseText = '' - let totalInputTokens = 0 - let totalOutputTokens = 0 - let lastAssistantContent: AIContentBlock[] = [] - - for await (const evt of runConversationLoop( - { model, apiKey, systemPrompt, messages, tools: aiTools }, - { - engine: contentEngine, - git, - userEmail: `api-key:${keyData.name}`, - userId: keyData.keyId, - contentRoot, - workflow, - permissions, - plan, - projectId: keyData.projectId, - workspaceId: keyData.workspaceId, - uiContext, - phase, - }, - )) { - switch (evt.type) { - case 'text': - responseText += evt.content as string - break - case 'tool_result': - toolResults.push({ id: evt.id as string, name: evt.name as string, result: evt.result }) - break - case 'done': - totalInputTokens = (evt.usage as { inputTokens: number })?.inputTokens ?? 0 - totalOutputTokens = (evt.usage as { outputTokens: number })?.outputTokens ?? 0 - lastAssistantContent = (evt.lastContent as AIContentBlock[]) ?? [] - break + await saveApiChatResult( + conversationId, + body.message, + responseText, + lastAssistantContent, + model, + totalInputTokens, + totalOutputTokens, + keyData.workspaceId, + keyData.keyId, + usageMonth, + ) + + return { + conversationId, + message: responseText, + toolResults: toolResults.length > 0 ? toolResults : undefined, + usage: { + inputTokens: totalInputTokens, + outputTokens: totalOutputTokens, + }, } } - - await saveApiChatResult( - conversationId, - body.message, - responseText, - lastAssistantContent, - model, - totalInputTokens, - totalOutputTokens, - keyData.workspaceId, - keyData.keyId, - usageMonth, - ) - - return { - conversationId, - message: responseText, - toolResults: toolResults.length > 0 ? toolResults : undefined, - usage: { - inputTokens: totalInputTokens, - outputTokens: totalOutputTokens, - }, + finally { + // Refund the reserved API slot if no real provider event ever + // flipped `committed` (auth failure to Anthropic, brain/tool init + // failures after reserve, etc.). After the first text or tool_use + // event we keep the reservation regardless of downstream outcome. + await tryRevert('reserve-scope') } } diff --git a/server/api/workspaces/[workspaceId]/projects/[projectId]/chat.post.ts b/server/api/workspaces/[workspaceId]/projects/[projectId]/chat.post.ts index 0248828..1b06d41 100644 --- a/server/api/workspaces/[workspaceId]/projects/[projectId]/chat.post.ts +++ b/server/api/workspaces/[workspaceId]/projects/[projectId]/chat.post.ts @@ -88,191 +88,236 @@ export default defineEventHandler(async (event) => { const overageSettings = event.context.billing?.overageSettings as Record | undefined const monthlyLimit = getEffectiveLimit(basePlanLimit, 'ai.messages_per_month', overageSettings) const usageMonth = new Date().toISOString().substring(0, 7) - if (monthlyLimit !== Infinity) { - const { allowed } = await db.incrementAgentUsageIfAllowed({ - workspaceId, - userId: session.user.id, - month: usageMonth, - source: usageSource, - limit: monthlyLimit, - }) - if (!allowed) - throw createError({ statusCode: 429, message: errorMessage('chat.monthly_limit_reached', { limit: basePlanLimit }) }) - } - - // Best-effort meter write for overage billing. Fire-and-forget. - recordAIUsage({ workspaceId, count: 1, userId: session.user.id, month: usageMonth }).catch(() => {}) - // === CONVERSATION === - let conversationId: string | undefined = body.conversationId - - // Verify ownership if continuing existing conversation - if (conversationId) { - const conv = await db.getConversation(conversationId, projectId, { userId: session.user.id }) - if (!conv) { - conversationId = undefined + // Billing semantic: a message is billable only once Anthropic streams + // its first real provider event (text or tool_use). Pre-AI failures + // (DB error, brain cache failure, abort before first token) refund + // the reserved slot via `tryRevert`. Post-first-event failures stay + // billable — we paid Anthropic for whatever tokens we received. + let reserved = false + let committed = false + const tryRevert = async (reason: string) => { + if (!reserved || committed || monthlyLimit === Infinity) return + try { + await db.decrementAgentUsage({ + workspaceId, + userId: session.user.id, + month: usageMonth, + source: usageSource, + }) + } + catch (err) { + // eslint-disable-next-line no-console + console.error(`[chat] usage revert failed (${reason}):`, err) } } - if (!conversationId) { - conversationId = (await db.createConversation(projectId, session.user.id, body.message)) ?? undefined - } - if (!conversationId) - throw createError({ statusCode: 500, message: errorMessage('chat.conversation_create_failed') }) + try { + if (monthlyLimit !== Infinity) { + const { allowed } = await db.incrementAgentUsageIfAllowed({ + workspaceId, + userId: session.user.id, + month: usageMonth, + source: usageSource, + limit: monthlyLimit, + }) + if (!allowed) + throw createError({ statusCode: 429, message: errorMessage('chat.monthly_limit_reached', { limit: basePlanLimit }) }) + reserved = true + } + + // === CONVERSATION === + let conversationId: string | undefined = body.conversationId - // === HISTORY === - const historyRows = await db.loadConversationMessages(conversationId, 50) + // Verify ownership if continuing existing conversation + if (conversationId) { + const conv = await db.getConversation(conversationId, projectId, { userId: session.user.id }) + if (!conv) { + conversationId = undefined + } + } - // Build message history: chronological order, newest messages prioritized within budget - const allHistory = historyRows ?? [] - const messages: AIMessage[] = [] + if (!conversationId) { + conversationId = (await db.createConversation(projectId, session.user.id, body.message)) ?? undefined + } + if (!conversationId) + throw createError({ statusCode: 500, message: errorMessage('chat.conversation_create_failed') }) + + // === HISTORY === + const historyRows = await db.loadConversationMessages(conversationId, 50) + + // Build message history: chronological order, newest messages prioritized within budget + const allHistory = historyRows ?? [] + const messages: AIMessage[] = [] + + // Walk backwards to find budget cutoff, then take from that point forward + const budgetStart = (() => { + let tokens = 0 + for (let i = allHistory.length - 1; i >= 0; i--) { + const row = allHistory[i]! + const content = row.tool_calls ? (row.tool_calls as AIContentBlock[]) : row.content + const estimate = typeof content === 'string' ? Math.ceil(content.length / 4) : Math.ceil(JSON.stringify(content).length / 4) + tokens += estimate + if (tokens > HISTORY_TOKEN_BUDGET) return i + 1 + } + return 0 + })() - // Walk backwards to find budget cutoff, then take from that point forward - const budgetStart = (() => { - let tokens = 0 - for (let i = allHistory.length - 1; i >= 0; i--) { + for (let i = budgetStart; i < allHistory.length; i++) { const row = allHistory[i]! - const content = row.tool_calls ? (row.tool_calls as AIContentBlock[]) : row.content - const estimate = typeof content === 'string' ? Math.ceil(content.length / 4) : Math.ceil(JSON.stringify(content).length / 4) - tokens += estimate - if (tokens > HISTORY_TOKEN_BUDGET) return i + 1 + const content = row.tool_calls ? (row.tool_calls as AIContentBlock[]) : (row.content as string | AIContentBlock[]) + messages.push({ role: row.role as 'user' | 'assistant', content }) } - return 0 - })() - - for (let i = budgetStart; i < allHistory.length; i++) { - const row = allHistory[i]! - const content = row.tool_calls ? (row.tool_calls as AIContentBlock[]) : (row.content as string | AIContentBlock[]) - messages.push({ role: row.role as 'user' | 'assistant', content }) - } - messages.push({ role: 'user', content: body.message }) + messages.push({ role: 'user', content: body.message }) - // === LOAD SCHEMA (from brain cache) === - const brain = await getOrBuildBrainCache(git, contentRoot, projectId) - const projectConfig = brain.config - const models = [...brain.models.values()] - const vocabulary = brain.vocabulary - const contentContext = brain.contentContext + // === LOAD SCHEMA (from brain cache) === + const brain = await getOrBuildBrainCache(git, contentRoot, projectId) + const projectConfig = brain.config + const models = [...brain.models.values()] + const vocabulary = brain.vocabulary + const contentContext = brain.contentContext - // === STATE MACHINE === - let pendingBranches: Array<{ name: string, sha: string, protected: boolean }> = [] - try { - const raw = await git.listBranches('cr/') - pendingBranches = raw.map(b => ({ name: b.name, sha: b.sha, protected: b.protected ?? false })) - } - catch { /* no branches */ } + // === STATE MACHINE === + let pendingBranches: Array<{ name: string, sha: string, protected: boolean }> = [] + try { + const raw = await git.listBranches('cr/') + pendingBranches = raw.map(b => ({ name: b.name, sha: b.sha, protected: b.protected ?? false })) + } + catch { /* no branches */ } - const phase = deriveProjectPhase(projectConfig, pendingBranches, project.status ?? 'active') + const phase = deriveProjectPhase(projectConfig, pendingBranches, project.status ?? 'active') - // === INTENT CLASSIFICATION === - const intent = classifyIntent(body.message, uiContext, phase) + // === INTENT CLASSIFICATION === + const intent = classifyIntent(body.message, uiContext, phase) - // === BUILD SYSTEM PROMPT (bounded, context-aware) === - const projectState = { - initialized: !!projectConfig, - pendingBranches, - projectStatus: project.status ?? 'active', - phase, - contentContext, - } + // === BUILD SYSTEM PROMPT (bounded, context-aware) === + const projectState = { + initialized: !!projectConfig, + pendingBranches, + projectStatus: project.status ?? 'active', + phase, + contentContext, + } - let systemPrompt = buildSystemPrompt(projectConfig, models, permissions, projectState, uiContext, intent, vocabulary, plan) + let systemPrompt = buildSystemPrompt(projectConfig, models, permissions, projectState, uiContext, intent, vocabulary, plan) - // Append content index from brain cache (compact summary of all content) - const contentIndex = buildContentIndex(brain) - if (contentIndex) { - systemPrompt += `\n\n${contentIndex}` - } + // Append content index from brain cache (compact summary of all content) + const contentIndex = buildContentIndex(brain) + if (contentIndex) { + systemPrompt += `\n\n${contentIndex}` + } - // === FILTER TOOLS by permissions + phase === - const permissionFiltered = filterToolsByPermissions(STUDIO_TOOLS, permissions.availableTools) as StudioTool[] - const phaseFiltered = permissionFiltered.filter(t => t.requiredPhase.includes(phase)) - const aiTools = toAITools(phaseFiltered) + // === FILTER TOOLS by permissions + phase === + const permissionFiltered = filterToolsByPermissions(STUDIO_TOOLS, permissions.availableTools) as StudioTool[] + const phaseFiltered = permissionFiltered.filter(t => t.requiredPhase.includes(phase)) + const aiTools = toAITools(phaseFiltered) - // Model: plan-gated selection - const ALL_MODELS = ['claude-sonnet-4-20250514', 'claude-opus-4-20250514', 'claude-haiku-4-5-20251001'] - const STARTER_MODELS = ['claude-haiku-4-5-20251001'] - const availableModels = hasFeature(plan, 'ai.studio_key') ? ALL_MODELS : STARTER_MODELS - const requestedModel = body.model as string | undefined - const model = (requestedModel && availableModels.includes(requestedModel)) ? requestedModel : availableModels[0]! + // Model: plan-gated selection + const ALL_MODELS = ['claude-sonnet-4-20250514', 'claude-opus-4-20250514', 'claude-haiku-4-5-20251001'] + const STARTER_MODELS = ['claude-haiku-4-5-20251001'] + const availableModels = hasFeature(plan, 'ai.studio_key') ? ALL_MODELS : STARTER_MODELS + const requestedModel = body.model as string | undefined + const model = (requestedModel && availableModels.includes(requestedModel)) ? requestedModel : availableModels[0]! - // Workflow: plans without review feature always auto-merge regardless of config - const configWorkflow = projectConfig?.workflow ?? 'auto-merge' - const workflow = hasFeature(plan, 'workflow.review') ? configWorkflow : 'auto-merge' + // Workflow: plans without review feature always auto-merge regardless of config + const configWorkflow = projectConfig?.workflow ?? 'auto-merge' + const workflow = hasFeature(plan, 'workflow.review') ? configWorkflow : 'auto-merge' - // === SSE STREAM === - const eventStream = createEventStream(event) - const contentEngine = createContentEngine({ git, contentRoot, projectId }) - const abortController = new AbortController() + // === SSE STREAM === + const eventStream = createEventStream(event) + const contentEngine = createContentEngine({ git, contentRoot, projectId }) + const abortController = new AbortController() - const processChat = async () => { - await eventStream.push(JSON.stringify({ type: 'conversation', id: conversationId })) + const processChat = async () => { + await eventStream.push(JSON.stringify({ type: 'conversation', id: conversationId })) - let totalInputTokens = 0 - let totalOutputTokens = 0 - let lastAssistantContent: AIContentBlock[] = [] + let totalInputTokens = 0 + let totalOutputTokens = 0 + let lastAssistantContent: AIContentBlock[] = [] - try { - for await (const evt of runConversationLoop( - { model, apiKey, systemPrompt, messages, tools: aiTools, abortSignal: abortController.signal }, - { engine: contentEngine, git, userEmail: session.user.email ?? '', userId: session.user.id, contentRoot, workflow, permissions, plan, projectId, workspaceId, uiContext, phase }, - )) { + try { + for await (const evt of runConversationLoop( + { model, apiKey, systemPrompt, messages, tools: aiTools, abortSignal: abortController.signal }, + { engine: contentEngine, git, userEmail: session.user.email ?? '', userId: session.user.id, contentRoot, workflow, permissions, plan, projectId, workspaceId, uiContext, phase }, + )) { // Stop processing if client disconnected - if (abortController.signal.aborted) break - - // Forward all events to SSE stream - if (evt.type === 'done') { + if (abortController.signal.aborted) break + + // Commit on the first billable provider event. `text` and + // `tool_use` are real Anthropic stream events; `tool_result` + // is internal (engine-emitted post tool exec), `done` is + // synthetic, and `error` may fire before any tokens — none + // of those count as "we paid for an LLM call." + if (!committed && (evt.type === 'text' || evt.type === 'tool_use')) { + committed = true + recordAIUsage({ workspaceId, count: 1, userId: session.user.id, month: usageMonth }).catch(() => {}) + } + + // Forward all events to SSE stream + if (evt.type === 'done') { // Extract final state from done event before forwarding - totalInputTokens = (evt.usage as { inputTokens: number })?.inputTokens ?? 0 - totalOutputTokens = (evt.usage as { outputTokens: number })?.outputTokens ?? 0 - lastAssistantContent = (evt.lastContent as AIContentBlock[]) ?? [] - - // Forward the done event without lastContent (not needed by client) - await eventStream.push(JSON.stringify({ - type: 'done', - usage: evt.usage, - affected: evt.affected, - })) + totalInputTokens = (evt.usage as { inputTokens: number })?.inputTokens ?? 0 + totalOutputTokens = (evt.usage as { outputTokens: number })?.outputTokens ?? 0 + lastAssistantContent = (evt.lastContent as AIContentBlock[]) ?? [] + + // Forward the done event without lastContent (not needed by client) + await eventStream.push(JSON.stringify({ + type: 'done', + usage: evt.usage, + affected: evt.affected, + })) + } + else { + await eventStream.push(JSON.stringify(evt)) + } } - else { - await eventStream.push(JSON.stringify(evt)) - } - } - // === SAVE TO DB === - const assistantText = lastAssistantContent - .filter(b => b.type === 'text') - .map(b => (b as { text: string }).text) - .join('') + // === SAVE TO DB === + const assistantText = lastAssistantContent + .filter(b => b.type === 'text') + .map(b => (b as { text: string }).text) + .join('') - await saveChatResult( - conversationId, body.message, assistantText, - lastAssistantContent, model, totalInputTokens, totalOutputTokens, - workspaceId, session.user.id, usageSource, usageMonth, - ) + await saveChatResult( + conversationId, body.message, assistantText, + lastAssistantContent, model, totalInputTokens, totalOutputTokens, + workspaceId, session.user.id, usageSource, usageMonth, + ) // Webhook events are now emitted from conversation-engine.ts per tool execution - } - catch (e: unknown) { - const msg = e instanceof Error ? e.message : 'Chat error' - // eslint-disable-next-line no-console - console.error('[chat] Error:', msg) - try { - await eventStream.push(JSON.stringify({ type: 'error', message: msg })) } - catch { /* stream closed */ } - } - finally { - try { - await eventStream.close() + catch (e: unknown) { + const msg = e instanceof Error ? e.message : 'Chat error' + // eslint-disable-next-line no-console + console.error('[chat] Error:', msg) + try { + await eventStream.push(JSON.stringify({ type: 'error', message: msg })) + } + catch { /* stream closed */ } + } + finally { + // Revert the reserved slot if no provider event ever flipped + // `committed` (provider auth failure, abort before first token, + // brain/tool init failure that surfaced here, etc.). + await tryRevert('post-reserve') + try { + await eventStream.close() + } + catch { /* already closed */ } } - catch { /* already closed */ } } - } - processChat() - eventStream.onClosed(() => { - abortController.abort() - }) - return eventStream.send() + processChat() + eventStream.onClosed(() => { + abortController.abort() + }) + return eventStream.send() + } + catch (err) { + // Pre-AI failure paths (DB reserve fail → 429 throw; createConversation, + // loadMessages, brain cache, etc.). processChat hasn't started yet so + // `committed` is still false; tryRevert refunds the slot if reserved. + await tryRevert('pre-AI') + throw err + } }) diff --git a/server/providers/database.ts b/server/providers/database.ts index 50145ba..fa704a3 100644 --- a/server/providers/database.ts +++ b/server/providers/database.ts @@ -328,6 +328,19 @@ export interface DatabaseProvider { outputTokens: number }) => Promise + /** + * Revert a previously-reserved message slot. Called from chat error + * paths when the AI call never produced a billable event (text or + * tool_use). Clamped at zero by the underlying RPC so concurrent + * reverts can't push `message_count` negative. + */ + decrementAgentUsage: (input: { + workspaceId: string + userId: string + month: string + source: string + }) => Promise + // ═══════════════════════════════════════════════════ // API MESSAGE USAGE (Conversation API) // ═══════════════════════════════════════════════════ @@ -362,6 +375,16 @@ export interface DatabaseProvider { outputTokens: number }) => Promise + /** + * Revert a previously-reserved API message slot. Symmetric with + * `decrementAgentUsage` for the Conversation API path. + */ + decrementAPIUsage: (input: { + workspaceId: string + apiKeyId: string + month: string + }) => Promise + // ═══════════════════════════════════════════════════ // MEDIA ASSETS // ═══════════════════════════════════════════════════ diff --git a/server/providers/supabase-db/conversations.ts b/server/providers/supabase-db/conversations.ts index 354de3a..f09fc08 100644 --- a/server/providers/supabase-db/conversations.ts +++ b/server/providers/supabase-db/conversations.ts @@ -20,8 +20,10 @@ type ConversationMethods = Pick< | 'getMonthlyUsageSummary' | 'incrementAgentUsageIfAllowed' | 'updateAgentUsageTokens' + | 'decrementAgentUsage' | 'incrementAPIUsageIfAllowed' | 'updateAPIUsageTokens' + | 'decrementAPIUsage' | 'getBYOAKey' > @@ -245,6 +247,19 @@ export function conversationMethods(): ConversationMethods { }) }, + async decrementAgentUsage(input) { + const admin = getAdmin() + const { error } = await admin.rpc('decrement_agent_usage', { + p_workspace_id: input.workspaceId, + p_user_id: input.userId, + p_month: input.month, + p_source: input.source, + }) + if (error) { + throw createError({ statusCode: 500, message: `Agent usage revert failed: ${error.message}` }) + } + }, + // ─── API Message Usage (Conversation API) ─── async incrementAPIUsageIfAllowed(input) { @@ -276,6 +291,18 @@ export function conversationMethods(): ConversationMethods { }) }, + async decrementAPIUsage(input) { + const admin = getAdmin() + const { error } = await admin.rpc('decrement_api_usage', { + p_workspace_id: input.workspaceId, + p_api_key_id: input.apiKeyId, + p_month: input.month, + }) + if (error) { + throw createError({ statusCode: 500, message: `API usage revert failed: ${error.message}` }) + } + }, + // ─── BYOA Key ─── async getBYOAKey(accessToken, workspaceId, userId) { diff --git a/supabase/migrations/007_agent_usage_revert_rpcs.sql b/supabase/migrations/007_agent_usage_revert_rpcs.sql new file mode 100644 index 0000000..4b71f76 --- /dev/null +++ b/supabase/migrations/007_agent_usage_revert_rpcs.sql @@ -0,0 +1,61 @@ +-- Revert RPCs for atomic message-slot reservations. +-- +-- Both `incrementAgentUsageIfAllowed` (studio chat) and +-- `incrementAPIUsageIfAllowed` (Conversation API) reserve a message +-- slot BEFORE the LLM call so concurrent requests can't double-spend +-- the last slot under the plan limit. The previous design treated +-- every reservation as terminal: a slot consumed before the AI call +-- (DB error, brain-cache failure, Anthropic auth failure, client +-- abort before first token) stayed consumed and counted toward the +-- monthly limit even though no LLM work was done. +-- +-- The runtime billing rule is: a message becomes billable the moment +-- Anthropic streams its first `text` or `tool_use` event. Any failure +-- before that point triggers a revert via these RPCs. Once committed, +-- failures (provider error mid-stream, save failure, etc.) stay +-- billable — we paid for whatever tokens we received. +-- +-- `GREATEST(message_count - 1, 0)` clamps the count at zero so a +-- double-revert race (caller's finally + caller's outer catch both +-- firing through best-effort wrappers) can never push the counter +-- negative. + +CREATE FUNCTION public.decrement_agent_usage( + p_workspace_id uuid, + p_user_id uuid, + p_month text, + p_source text +) RETURNS void +LANGUAGE plpgsql SECURITY DEFINER +SET search_path TO '' +AS $$ +BEGIN + UPDATE public.agent_usage + SET message_count = GREATEST(message_count - 1, 0), + updated_at = now() + WHERE workspace_id = p_workspace_id + AND user_id = p_user_id + AND month = p_month + AND source = p_source + AND message_count > 0; +END; +$$; + +CREATE FUNCTION public.decrement_api_usage( + p_workspace_id uuid, + p_api_key_id uuid, + p_month text +) RETURNS void +LANGUAGE plpgsql SECURITY DEFINER +SET search_path TO '' +AS $$ +BEGIN + UPDATE public.api_message_usage + SET message_count = GREATEST(message_count - 1, 0), + updated_at = now() + WHERE workspace_id = p_workspace_id + AND api_key_id = p_api_key_id + AND month = p_month + AND message_count > 0; +END; +$$; diff --git a/tests/integration/chat-route.integration.test.ts b/tests/integration/chat-route.integration.test.ts index 4ab4cac..59e1a78 100644 --- a/tests/integration/chat-route.integration.test.ts +++ b/tests/integration/chat-route.integration.test.ts @@ -375,4 +375,222 @@ describe('chat route integration', () => { expect(saveChatResult).not.toHaveBeenCalled() }) }) + + it('refunds the reserved slot when the provider errors before yielding any event', async () => { + const incrementAgentUsageIfAllowed = vi.fn().mockResolvedValue({ allowed: true, currentCount: 1 }) + const decrementAgentUsage = vi.fn().mockResolvedValue(undefined) + const recordAIUsage = vi.fn().mockResolvedValue(undefined) + + vi.stubGlobal('getRouterParam', vi.fn((_: unknown, key: string) => { + if (key === 'workspaceId') return 'workspace-1' + if (key === 'projectId') return 'project-1' + return undefined + })) + vi.stubGlobal('requireAuth', vi.fn().mockReturnValue({ + user: { id: 'user-1', email: 'user@example.com' }, + accessToken: 'token-1', + })) + vi.stubGlobal('useDatabaseProvider', vi.fn().mockReturnValue({ + incrementAgentUsageIfAllowed, + decrementAgentUsage, + getConversation: vi.fn().mockResolvedValue({ id: 'conversation-existing' }), + createConversation: vi.fn().mockResolvedValue('conversation-existing'), + loadConversationMessages: vi.fn().mockResolvedValue([]), + })) + vi.stubGlobal('resolveProjectContext', vi.fn().mockResolvedValue({ + project: { id: 'project-1', status: 'active' }, + workspace: { id: 'workspace-1', plan: 'starter' }, + git: createGitStub(), + contentRoot: '', + })) + vi.stubGlobal('getWorkspacePlan', vi.fn().mockReturnValue('starter')) + // Finite limit triggers the actual reservation path. + vi.stubGlobal('getMonthlyMessageLimit', vi.fn().mockReturnValue(100)) + vi.stubGlobal('resolveAgentPermissions', vi.fn().mockResolvedValue({ + availableTools: ['get_content'], + specificModels: false, + allowedModels: [], + })) + vi.stubGlobal('hasFeature', vi.fn().mockReturnValue(false)) + vi.stubGlobal('saveChatResult', vi.fn().mockResolvedValue(undefined)) + vi.stubGlobal('createContentEngine', vi.fn().mockReturnValue({})) + vi.stubGlobal('buildSystemPrompt', vi.fn().mockReturnValue('system')) + vi.stubGlobal('buildContentIndex', vi.fn().mockReturnValue('')) + vi.stubGlobal('getOrBuildBrainCache', vi.fn().mockResolvedValue({ + config: null, + models: new Map(), + vocabulary: null, + contentContext: null, + })) + vi.stubGlobal('filterToolsByPermissions', vi.fn().mockReturnValue([])) + vi.stubGlobal('STUDIO_TOOLS', []) + vi.stubGlobal('recordAIUsage', recordAIUsage) + vi.stubGlobal('useAIProvider', vi.fn().mockReturnValue({ + streamCompletion: () => ({ + [Symbol.asyncIterator]() { + return { + next: async () => { throw new Error('anthropic auth failed') }, + } + }, + }), + })) + + await withTestServer({ + routes: [ + { path: '/api/workspaces/workspace-1/projects/project-1/chat', handler: await loadChatHandler() }, + ], + }, async ({ request }) => { + const response = await request('/api/workspaces/workspace-1/projects/project-1/chat', { + method: 'POST', + headers: { 'content-type': 'application/json' }, + body: JSON.stringify({ message: 'hello', conversationId: 'conversation-existing' }), + }) + await response.text() // drain SSE + + expect(incrementAgentUsageIfAllowed).toHaveBeenCalledTimes(1) + expect(decrementAgentUsage).toHaveBeenCalledTimes(1) + expect(decrementAgentUsage).toHaveBeenCalledWith({ + workspaceId: 'workspace-1', + userId: 'user-1', + month: expect.any(String), + source: 'studio', + }) + // Meter event only fires after first billable provider event. + expect(recordAIUsage).not.toHaveBeenCalled() + }) + }) + + it('keeps the reservation when the provider yields at least one event before erroring', async () => { + const incrementAgentUsageIfAllowed = vi.fn().mockResolvedValue({ allowed: true, currentCount: 1 }) + const decrementAgentUsage = vi.fn().mockResolvedValue(undefined) + const recordAIUsage = vi.fn().mockResolvedValue(undefined) + + vi.stubGlobal('getRouterParam', vi.fn((_: unknown, key: string) => { + if (key === 'workspaceId') return 'workspace-1' + if (key === 'projectId') return 'project-1' + return undefined + })) + vi.stubGlobal('requireAuth', vi.fn().mockReturnValue({ + user: { id: 'user-1', email: 'user@example.com' }, + accessToken: 'token-1', + })) + vi.stubGlobal('useDatabaseProvider', vi.fn().mockReturnValue({ + incrementAgentUsageIfAllowed, + decrementAgentUsage, + getConversation: vi.fn().mockResolvedValue({ id: 'conversation-existing' }), + createConversation: vi.fn().mockResolvedValue('conversation-existing'), + loadConversationMessages: vi.fn().mockResolvedValue([]), + })) + vi.stubGlobal('resolveProjectContext', vi.fn().mockResolvedValue({ + project: { id: 'project-1', status: 'active' }, + workspace: { id: 'workspace-1', plan: 'starter' }, + git: createGitStub(), + contentRoot: '', + })) + vi.stubGlobal('getWorkspacePlan', vi.fn().mockReturnValue('starter')) + vi.stubGlobal('getMonthlyMessageLimit', vi.fn().mockReturnValue(100)) + vi.stubGlobal('resolveAgentPermissions', vi.fn().mockResolvedValue({ + availableTools: ['get_content'], + specificModels: false, + allowedModels: [], + })) + vi.stubGlobal('hasFeature', vi.fn().mockReturnValue(false)) + vi.stubGlobal('saveChatResult', vi.fn().mockResolvedValue(undefined)) + vi.stubGlobal('createContentEngine', vi.fn().mockReturnValue({})) + vi.stubGlobal('buildSystemPrompt', vi.fn().mockReturnValue('system')) + vi.stubGlobal('buildContentIndex', vi.fn().mockReturnValue('')) + vi.stubGlobal('getOrBuildBrainCache', vi.fn().mockResolvedValue({ + config: null, + models: new Map(), + vocabulary: null, + contentContext: null, + })) + vi.stubGlobal('filterToolsByPermissions', vi.fn().mockReturnValue([])) + vi.stubGlobal('STUDIO_TOOLS', []) + vi.stubGlobal('recordAIUsage', recordAIUsage) + vi.stubGlobal('useAIProvider', vi.fn().mockReturnValue({ + streamCompletion: async function* () { + // First real event → request becomes billable. + yield { type: 'text', content: 'partial answer...' } + // Now the provider explodes mid-stream — we already paid for these tokens. + throw new Error('connection reset') + }, + })) + + await withTestServer({ + routes: [ + { path: '/api/workspaces/workspace-1/projects/project-1/chat', handler: await loadChatHandler() }, + ], + }, async ({ request }) => { + const response = await request('/api/workspaces/workspace-1/projects/project-1/chat', { + method: 'POST', + headers: { 'content-type': 'application/json' }, + body: JSON.stringify({ message: 'hello', conversationId: 'conversation-existing' }), + }) + await response.text() + + expect(incrementAgentUsageIfAllowed).toHaveBeenCalledTimes(1) + // Slot stays consumed: first real provider event flipped `committed`. + expect(decrementAgentUsage).not.toHaveBeenCalled() + expect(recordAIUsage).toHaveBeenCalledTimes(1) + }) + }) + + it('refunds the reserved slot when a pre-AI step fails after reservation', async () => { + const incrementAgentUsageIfAllowed = vi.fn().mockResolvedValue({ allowed: true, currentCount: 1 }) + const decrementAgentUsage = vi.fn().mockResolvedValue(undefined) + const recordAIUsage = vi.fn().mockResolvedValue(undefined) + const loadConversationMessages = vi.fn().mockRejectedValue(new Error('history table offline')) + + vi.stubGlobal('getRouterParam', vi.fn((_: unknown, key: string) => { + if (key === 'workspaceId') return 'workspace-1' + if (key === 'projectId') return 'project-1' + return undefined + })) + vi.stubGlobal('requireAuth', vi.fn().mockReturnValue({ + user: { id: 'user-1', email: 'user@example.com' }, + accessToken: 'token-1', + })) + vi.stubGlobal('useDatabaseProvider', vi.fn().mockReturnValue({ + incrementAgentUsageIfAllowed, + decrementAgentUsage, + getConversation: vi.fn().mockResolvedValue({ id: 'conversation-existing' }), + createConversation: vi.fn().mockResolvedValue('conversation-existing'), + loadConversationMessages, + })) + vi.stubGlobal('resolveProjectContext', vi.fn().mockResolvedValue({ + project: { id: 'project-1', status: 'active' }, + workspace: { id: 'workspace-1', plan: 'starter' }, + git: createGitStub(), + contentRoot: '', + })) + vi.stubGlobal('getWorkspacePlan', vi.fn().mockReturnValue('starter')) + vi.stubGlobal('getMonthlyMessageLimit', vi.fn().mockReturnValue(100)) + vi.stubGlobal('resolveAgentPermissions', vi.fn().mockResolvedValue({ + availableTools: ['get_content'], + specificModels: false, + allowedModels: [], + })) + vi.stubGlobal('hasFeature', vi.fn().mockReturnValue(false)) + vi.stubGlobal('recordAIUsage', recordAIUsage) + + await withTestServer({ + routes: [ + { path: '/api/workspaces/workspace-1/projects/project-1/chat', handler: await loadChatHandler() }, + ], + }, async ({ request }) => { + const response = await request('/api/workspaces/workspace-1/projects/project-1/chat', { + method: 'POST', + headers: { 'content-type': 'application/json' }, + body: JSON.stringify({ message: 'hello', conversationId: 'conversation-existing' }), + }) + // Handler throws before streaming starts; h3 turns it into a 500. + expect(response.status).toBeGreaterThanOrEqual(500) + + expect(incrementAgentUsageIfAllowed).toHaveBeenCalledTimes(1) + expect(loadConversationMessages).toHaveBeenCalledTimes(1) + expect(decrementAgentUsage).toHaveBeenCalledTimes(1) + expect(recordAIUsage).not.toHaveBeenCalled() + }) + }) }) diff --git a/tests/unit/db.test.ts b/tests/unit/db.test.ts index 6db6223..a0af57d 100644 --- a/tests/unit/db.test.ts +++ b/tests/unit/db.test.ts @@ -47,7 +47,9 @@ describe('db helpers', () => { insertMessage: vi.fn().mockResolvedValue(undefined), upsertAgentUsage: vi.fn().mockResolvedValue(undefined), updateAgentUsageTokens: vi.fn().mockResolvedValue(undefined), + decrementAgentUsage: vi.fn().mockResolvedValue(undefined), updateAPIUsageTokens: vi.fn().mockResolvedValue(undefined), + decrementAPIUsage: vi.fn().mockResolvedValue(undefined), updateConversationTimestamp: vi.fn().mockResolvedValue(undefined), getBYOAKey: vi.fn().mockResolvedValue(null), }