Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 24 additions & 15 deletions ee/enterprise/conversation-api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,9 @@ import { errorMessage } from '../../server/utils/content-strings'
import { normalizeContentRoot } from '../../server/utils/content-paths'
import { runConversationLoop } from '../../server/utils/conversation-engine'
import { validateConversationKey } from '../../server/utils/conversation-keys'
import { saveChatResult } from '../../server/utils/db'
import { getWorkspacePlan, hasFeature } from '../../server/utils/license'
import { saveApiChatResult } from '../../server/utils/db'
import { getPlanLimit, getWorkspacePlan, hasFeature } from '../../server/utils/license'
import { getEffectiveLimit } from '../../server/utils/overage'
import { checkRateLimit } from '../../server/utils/rate-limit'
import { useDatabaseProvider, useGitProvider } from '../../server/utils/providers'

Expand Down Expand Up @@ -163,18 +164,28 @@ async function runConversationMessage(
if (!rateCheck.allowed)
throw createError({ statusCode: 429, message: errorMessage('chat.rate_limited', { seconds: Math.ceil(rateCheck.retryAfterMs / 1000) }) })

// Atomic: check monthly limit + reserve a message slot (prevents race conditions)
// Atomic: enforce per-key AND per-workspace caps in one RPC, then
// reserve a message slot. `getEffectiveLimit` raises the cap to
// SOFT_CAP_MAX when overage is enabled or the plan is Infinity, so
// the RPC stays integer-typed and overage requests fall through to
// the meter outbox below.
const usageMonth = new Date().toISOString().substring(0, 7)
const { allowed } = await db.incrementAgentUsageIfAllowed({
const overageSettings = (event.context as { billing?: { overageSettings?: Record<string, boolean> } }).billing?.overageSettings
const workspacePlanLimit = getPlanLimit(plan, 'api.messages_per_month')
const workspaceLimit = getEffectiveLimit(workspacePlanLimit, 'api.messages_per_month', overageSettings)
const keyLimit = getEffectiveLimit(keyData.monthlyMessageLimit, 'api.messages_per_month', overageSettings)

const usageCheck = await db.incrementAPIUsageIfAllowed({
workspaceId: keyData.workspaceId,
userId: keyData.keyId,
apiKeyId: keyData.keyId,
month: usageMonth,
source: 'api',
limit: keyData.monthlyMessageLimit,
keyLimit,
workspaceLimit,
})
if (!allowed)
throw createError({ statusCode: 429, message: errorMessage('conversation.monthly_limit', { limit: keyData.monthlyMessageLimit }) })
if (!usageCheck.allowed) {
const limitForMessage = usageCheck.reason === 'workspace_limit' ? workspacePlanLimit : keyData.monthlyMessageLimit
throw createError({ statusCode: 429, message: errorMessage('conversation.monthly_limit', { limit: limitForMessage }) })
}

// Best-effort meter write for overage billing. Fire-and-forget.
recordAPIUsage({ workspaceId: keyData.workspaceId, count: 1, apiKeyId: keyData.keyId, month: usageMonth }).catch(() => {})
Expand Down Expand Up @@ -240,13 +251,13 @@ async function runConversationMessage(

let conversationId = body.conversationId
if (conversationId) {
const conv = await db.getConversation(conversationId, keyData.projectId, { userId: keyData.keyId })
const conv = await db.getConversation(conversationId, keyData.projectId, { apiKeyId: keyData.keyId })

if (!conv) conversationId = undefined
}

if (!conversationId) {
conversationId = await db.createConversation(keyData.projectId, keyData.keyId, body.message.substring(0, 100)) ?? undefined
conversationId = await db.createApiConversation(keyData.projectId, keyData.keyId, body.message.substring(0, 100)) ?? undefined
}

if (!conversationId)
Expand Down Expand Up @@ -320,7 +331,7 @@ async function runConversationMessage(
}
}

await saveChatResult(
await saveApiChatResult(
conversationId,
body.message,
responseText,
Expand All @@ -330,9 +341,7 @@ async function runConversationMessage(
totalOutputTokens,
keyData.workspaceId,
keyData.keyId,
'api',
usageMonth,
keyData.keyId,
)

return {
Expand All @@ -354,7 +363,7 @@ async function runConversationHistory(event: H3Event) {
if (!conversationId)
throw createError({ statusCode: 400, message: errorMessage('validation.conversation_id_required') })

const conv = await db.getConversation(conversationId, keyData.projectId, { userId: keyData.keyId })
const conv = await db.getConversation(conversationId, keyData.projectId, { apiKeyId: keyData.keyId })

if (!conv)
throw createError({ statusCode: 404, message: errorMessage('chat.conversation_not_found') })
Expand Down
19 changes: 15 additions & 4 deletions ee/enterprise/conversation-keys.ts
Original file line number Diff line number Diff line change
Expand Up @@ -149,11 +149,19 @@ export function createConversationKeysBridge() {

await db.requireWorkspaceRole(session.accessToken, session.user.id, workspaceId, ['owner', 'admin'])

if (body.customInstructions !== undefined) {
// Resolve plan once if any plan-gated field is being updated.
// monthlyMessageLimit must be clamped to the workspace plan cap
// — otherwise a downgraded workspace could carry a stale per-key
// limit that exceeds its current `api.messages_per_month`.
const needsPlan = body.customInstructions !== undefined || body.monthlyMessageLimit !== undefined
let plan: ReturnType<typeof getWorkspacePlan> | null = null
if (needsPlan) {
const workspace = await db.getWorkspaceById(workspaceId, 'plan')
plan = getWorkspacePlan(workspace ?? {})
}

const plan = getWorkspacePlan(workspace ?? {})
if (!hasFeature(plan, 'api.custom_instructions') && body.customInstructions)
if (body.customInstructions !== undefined) {
if (!hasFeature(plan!, 'api.custom_instructions') && body.customInstructions)
throw createError({ statusCode: 403, message: errorMessage('conversation.upgrade') })
}

Expand All @@ -170,7 +178,10 @@ export function createConversationKeysBridge() {
if (body.customInstructions !== undefined) updates.custom_instructions = body.customInstructions ? body.customInstructions.substring(0, 2000) : body.customInstructions
if (body.aiModel !== undefined && validModels.includes(body.aiModel)) updates.ai_model = body.aiModel
if (body.rateLimitPerMinute !== undefined) updates.rate_limit_per_minute = Math.max(1, Math.min(body.rateLimitPerMinute, 60))
if (body.monthlyMessageLimit !== undefined) updates.monthly_message_limit = Math.max(1, Math.min(body.monthlyMessageLimit, 100_000))
if (body.monthlyMessageLimit !== undefined) {
const planCap = getPlanLimit(plan!, 'api.messages_per_month')
updates.monthly_message_limit = Math.max(1, Math.min(body.monthlyMessageLimit, planCap))
}

if (Object.keys(updates).length === 0)
throw createError({ statusCode: 400, message: errorMessage('validation.no_fields_to_update') })
Expand Down
53 changes: 52 additions & 1 deletion server/providers/database.ts
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,24 @@ export interface DatabaseProvider {
// ═══════════════════════════════════════════════════

createConversation: (projectId: string, userId: string, title: string) => Promise<string | null>
getConversation: (conversationId: string, projectId: string, filters?: { userId?: string }) => Promise<DatabaseRow | null>
/**
* Create a Conversation API-owned chat thread. The API key acts as
* the actor in place of a workspace member; `conversations.user_id`
* stays NULL and `api_key_id` carries the owner reference. Enforced
* by `conversations_actor_xor` — exactly one of user_id / api_key_id
* is non-null per row.
*/
createApiConversation: (projectId: string, apiKeyId: string, title: string) => Promise<string | null>
/**
* Ownership check before continuing a conversation. Pass either a
* `userId` (Studio chat) or an `apiKeyId` (Conversation API) — never
* both. The XOR is mirrored from the DB-level CHECK on `conversations`.
*/
getConversation: (
conversationId: string,
projectId: string,
filters?: { userId: string } | { apiKeyId: string },
) => Promise<DatabaseRow | null>
listConversations: (accessToken: string, projectId: string, userId: string) => Promise<DatabaseRow[]>
deleteConversation: (accessToken: string, conversationId: string, userId: string, projectId: string) => Promise<void>
updateConversationTimestamp: (conversationId: string) => Promise<void>
Expand Down Expand Up @@ -311,6 +328,40 @@ export interface DatabaseProvider {
outputTokens: number
}) => Promise<void>

// ═══════════════════════════════════════════════════
// API MESSAGE USAGE (Conversation API)
// ═══════════════════════════════════════════════════

/**
* Atomic: enforce BOTH per-key and per-workspace monthly caps in
* one RPC and reserve a message slot. Reason discriminator lets the
* caller surface the right 429 message ("your key is over" vs "the
* workspace plan is over").
*
* Pass `SOFT_CAP_MAX` (from `getEffectiveLimit`) for either limit
* when overage is enabled or the plan grants Infinity.
*/
incrementAPIUsageIfAllowed: (input: {
workspaceId: string
apiKeyId: string
month: string
keyLimit: number
workspaceLimit: number
}) => Promise<{
allowed: boolean
reason: 'ok' | 'key_limit' | 'workspace_limit'
current: number
}>

/** Increment token counters on the `api_message_usage` row after the AI call settles. */
updateAPIUsageTokens: (input: {
workspaceId: string
apiKeyId: string
month: string
inputTokens: number
outputTokens: number
}) => Promise<void>

// ═══════════════════════════════════════════════════
// MEDIA ASSETS
// ═══════════════════════════════════════════════════
Expand Down
54 changes: 52 additions & 2 deletions server/providers/supabase-db/conversations.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import { getAdmin, getUser } from './helpers'
type ConversationMethods = Pick<
DatabaseProvider,
| 'createConversation'
| 'createApiConversation'
| 'getConversation'
| 'listConversations'
| 'deleteConversation'
Expand All @@ -19,6 +20,8 @@ type ConversationMethods = Pick<
| 'getMonthlyUsageSummary'
| 'incrementAgentUsageIfAllowed'
| 'updateAgentUsageTokens'
| 'incrementAPIUsageIfAllowed'
| 'updateAPIUsageTokens'
| 'getBYOAKey'
>

Expand All @@ -39,15 +42,31 @@ export function conversationMethods(): ConversationMethods {
return data?.id ?? null
},

async createApiConversation(projectId, apiKeyId, title) {
const admin = getAdmin()
const { data } = await admin
.from('conversations')
.insert({
project_id: projectId,
api_key_id: apiKeyId,
title: title.substring(0, 100),
})
.select('id')
.single()

return data?.id ?? null
},

async getConversation(conversationId, projectId, filters) {
const admin = getAdmin()
let query = admin
.from('conversations')
.select('id, title, user_id, created_at, updated_at')
.select('id, title, user_id, api_key_id, created_at, updated_at')
.eq('id', conversationId)
.eq('project_id', projectId)

if (filters?.userId) query = query.eq('user_id', filters.userId)
if (filters && 'userId' in filters) query = query.eq('user_id', filters.userId)
else if (filters && 'apiKeyId' in filters) query = query.eq('api_key_id', filters.apiKeyId)

const { data, error } = await query.single()
if (error) {
Expand Down Expand Up @@ -226,6 +245,37 @@ export function conversationMethods(): ConversationMethods {
})
},

// ─── API Message Usage (Conversation API) ───

async incrementAPIUsageIfAllowed(input) {
const admin = getAdmin()
const { data, error } = await admin.rpc('increment_api_usage_if_allowed', {
p_workspace_id: input.workspaceId,
p_api_key_id: input.apiKeyId,
p_month: input.month,
p_key_limit: input.keyLimit,
p_workspace_limit: input.workspaceLimit,
})

if (error) {
throw createError({ statusCode: 500, message: `Atomic API usage check failed: ${error.message}` })
}

const result = data as { allowed: boolean, reason: 'ok' | 'key_limit' | 'workspace_limit', current: number }
return result
},

async updateAPIUsageTokens(input) {
const admin = getAdmin()
await admin.rpc('increment_api_usage_tokens', {
p_workspace_id: input.workspaceId,
p_api_key_id: input.apiKeyId,
p_month: input.month,
p_input_tokens: input.inputTokens,
p_output_tokens: input.outputTokens,
})
},

// ─── BYOA Key ───

async getBYOAKey(accessToken, workspaceId, userId) {
Expand Down
5 changes: 3 additions & 2 deletions server/providers/supabase-db/usage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,13 @@ export function usageMethods(): UsageMethods {
},

async getWorkspaceMonthlyAPIUsage(workspaceId, month) {
// Conversation API usage is keyed by `api_key_id`, not `user_id`,
// and lives in its own aggregate table — see migration 006.
const { data } = await getAdmin()
.from('agent_usage')
.from('api_message_usage')
.select('message_count')
.eq('workspace_id', workspaceId)
.eq('month', month)
.eq('source', 'api')

return (data ?? []).reduce(
(sum: number, r: Record<string, unknown>) => sum + ((r.message_count as number) ?? 0),
Expand Down
Loading
Loading