diff --git a/apps/sim/app/api/billing/update-cost/route.ts b/apps/sim/app/api/billing/update-cost/route.ts index 87b7f14f15..7f9f990077 100644 --- a/apps/sim/app/api/billing/update-cost/route.ts +++ b/apps/sim/app/api/billing/update-cost/route.ts @@ -33,7 +33,6 @@ export async function POST(req: NextRequest) { logger.info(`[${requestId}] Update cost request started`) if (!isBillingEnabled) { - logger.debug(`[${requestId}] Billing is disabled, skipping cost update`) return NextResponse.json({ success: true, message: 'Billing disabled, cost update skipped', diff --git a/apps/sim/app/api/chat/[identifier]/otp/route.ts b/apps/sim/app/api/chat/[identifier]/otp/route.ts index 7a9b7bdee9..e518ceb28a 100644 --- a/apps/sim/app/api/chat/[identifier]/otp/route.ts +++ b/apps/sim/app/api/chat/[identifier]/otp/route.ts @@ -117,8 +117,6 @@ export async function POST( const requestId = generateRequestId() try { - logger.debug(`[${requestId}] Processing OTP request for identifier: ${identifier}`) - const body = await request.json() const { email } = otpRequestSchema.parse(body) @@ -211,8 +209,6 @@ export async function PUT( const requestId = generateRequestId() try { - logger.debug(`[${requestId}] Verifying OTP for identifier: ${identifier}`) - const body = await request.json() const { email, otp } = otpVerifySchema.parse(body) diff --git a/apps/sim/app/api/chat/[identifier]/route.ts b/apps/sim/app/api/chat/[identifier]/route.ts index 57041c4cc5..772b1c56f4 100644 --- a/apps/sim/app/api/chat/[identifier]/route.ts +++ b/apps/sim/app/api/chat/[identifier]/route.ts @@ -42,8 +42,6 @@ export async function POST( const requestId = generateRequestId() try { - logger.debug(`[${requestId}] Processing chat request for identifier: ${identifier}`) - let parsedBody try { const rawBody = await request.json() @@ -294,8 +292,6 @@ export async function GET( const requestId = generateRequestId() try { - logger.debug(`[${requestId}] Fetching chat info for identifier: ${identifier}`) - const deploymentResult = await db .select({ id: chat.id, diff --git a/apps/sim/app/api/creators/route.ts b/apps/sim/app/api/creators/route.ts index 1113de3d45..843c0aea49 100644 --- a/apps/sim/app/api/creators/route.ts +++ b/apps/sim/app/api/creators/route.ts @@ -95,11 +95,6 @@ export async function POST(request: NextRequest) { const body = await request.json() const data = CreateCreatorProfileSchema.parse(body) - logger.debug(`[${requestId}] Creating creator profile:`, { - referenceType: data.referenceType, - referenceId: data.referenceId, - }) - // Validate permissions if (data.referenceType === 'user') { if (data.referenceId !== session.user.id) { diff --git a/apps/sim/app/api/form/[identifier]/route.ts b/apps/sim/app/api/form/[identifier]/route.ts index a4ad31eef8..d6d4f019e4 100644 --- a/apps/sim/app/api/form/[identifier]/route.ts +++ b/apps/sim/app/api/form/[identifier]/route.ts @@ -58,8 +58,6 @@ export async function POST( const requestId = generateRequestId() try { - logger.debug(`[${requestId}] Processing form submission for identifier: ${identifier}`) - let parsedBody try { const rawBody = await request.json() @@ -300,8 +298,6 @@ export async function GET( const requestId = generateRequestId() try { - logger.debug(`[${requestId}] Fetching form info for identifier: ${identifier}`) - const deploymentResult = await db .select({ id: form.id, diff --git a/apps/sim/app/api/help/route.ts b/apps/sim/app/api/help/route.ts index f874c6304b..2c2e24ddb7 100644 --- a/apps/sim/app/api/help/route.ts +++ b/apps/sim/app/api/help/route.ts @@ -77,8 +77,6 @@ export async function POST(req: NextRequest) { } } - logger.debug(`[${requestId}] Help request includes ${images.length} images`) - const userId = session.user.id let emailText = ` Type: ${type} diff --git a/apps/sim/app/api/knowledge/search/route.ts b/apps/sim/app/api/knowledge/search/route.ts index c3e6993140..686f7c19cc 100644 --- a/apps/sim/app/api/knowledge/search/route.ts +++ b/apps/sim/app/api/knowledge/search/route.ts @@ -186,8 +186,6 @@ export async function POST(request: NextRequest) { valueTo: filter.valueTo, } }) - - logger.debug(`[${requestId}] Processed ${structuredFilters.length} structured filters`) } if (accessibleKbIds.length === 0) { @@ -220,7 +218,6 @@ export async function POST(request: NextRequest) { if (!hasQuery && hasFilters) { // Tag-only search without vector similarity - logger.debug(`[${requestId}] Executing tag-only search with filters:`, structuredFilters) results = await handleTagOnlySearch({ knowledgeBaseIds: accessibleKbIds, topK: validatedData.topK, @@ -244,7 +241,6 @@ export async function POST(request: NextRequest) { }) } else if (hasQuery && !hasFilters) { // Vector-only search - logger.debug(`[${requestId}] Executing vector-only search`) const strategy = getQueryStrategy(accessibleKbIds.length, validatedData.topK) const queryVector = JSON.stringify(await queryEmbeddingPromise) diff --git a/apps/sim/app/api/knowledge/search/utils.ts b/apps/sim/app/api/knowledge/search/utils.ts index 3eba10f911..dc112fe24a 100644 --- a/apps/sim/app/api/knowledge/search/utils.ts +++ b/apps/sim/app/api/knowledge/search/utils.ts @@ -1,11 +1,8 @@ import { db } from '@sim/db' import { document, embedding } from '@sim/db/schema' -import { createLogger } from '@sim/logger' import { and, eq, inArray, isNull, sql } from 'drizzle-orm' import type { StructuredFilter } from '@/lib/knowledge/types' -const logger = createLogger('KnowledgeSearchUtils') - export async function getDocumentNamesByIds( documentIds: string[] ): Promise> { @@ -140,17 +137,12 @@ function buildFilterCondition(filter: StructuredFilter, embeddingTable: any) { const { tagSlot, fieldType, operator, value, valueTo } = filter if (!isTagSlotKey(tagSlot)) { - logger.debug(`[getStructuredTagFilters] Unknown tag slot: ${tagSlot}`) return null } const column = embeddingTable[tagSlot] if (!column) return null - logger.debug( - `[getStructuredTagFilters] Processing ${tagSlot} (${fieldType}) ${operator} ${value}` - ) - // Handle text operators if (fieldType === 'text') { const stringValue = String(value) @@ -208,7 +200,6 @@ function buildFilterCondition(filter: StructuredFilter, embeddingTable: any) { const dateStr = String(value) // Validate YYYY-MM-DD format if (!/^\d{4}-\d{2}-\d{2}$/.test(dateStr)) { - logger.debug(`[getStructuredTagFilters] Invalid date format: ${dateStr}, expected YYYY-MM-DD`) return null } @@ -287,9 +278,6 @@ function getStructuredTagFilters(filters: StructuredFilter[], embeddingTable: an conditions.push(slotConditions[0]) } else { // Multiple conditions for same slot - OR them together - logger.debug( - `[getStructuredTagFilters] OR'ing ${slotConditions.length} conditions for ${slot}` - ) conditions.push(sql`(${sql.join(slotConditions, sql` OR `)})`) } } @@ -380,8 +368,6 @@ export async function handleTagOnlySearch(params: SearchParams): Promise`${embedding.embedding} <=> ${queryVector}::vector`.as('distance') @@ -489,23 +473,13 @@ export async function handleTagAndVectorSearch(params: SearchParams): Promise r.id), diff --git a/apps/sim/app/api/logs/execution/[executionId]/route.ts b/apps/sim/app/api/logs/execution/[executionId]/route.ts index 27a75298d2..90e0747b00 100644 --- a/apps/sim/app/api/logs/execution/[executionId]/route.ts +++ b/apps/sim/app/api/logs/execution/[executionId]/route.ts @@ -34,10 +34,6 @@ export async function GET( const authenticatedUserId = authResult.userId - logger.debug( - `[${requestId}] Fetching execution data for: ${executionId} (auth: ${authResult.authType})` - ) - const [workflowLog] = await db .select({ id: workflowExecutionLogs.id, @@ -125,11 +121,6 @@ export async function GET( }, } - logger.debug(`[${requestId}] Successfully fetched execution data for: ${executionId}`) - logger.debug( - `[${requestId}] Workflow state contains ${Object.keys((snapshot.stateData as any)?.blocks || {}).length} blocks` - ) - return NextResponse.json(response) } catch (error) { logger.error(`[${requestId}] Error fetching execution data:`, error) diff --git a/apps/sim/app/api/mcp/tools/execute/route.ts b/apps/sim/app/api/mcp/tools/execute/route.ts index 4229cebbfd..d44839590f 100644 --- a/apps/sim/app/api/mcp/tools/execute/route.ts +++ b/apps/sim/app/api/mcp/tools/execute/route.ts @@ -83,7 +83,6 @@ export const POST = withMcpAuth('read')( serverId: serverId, serverName: 'provided-schema', } as McpTool - logger.debug(`[${requestId}] Using provided schema for ${toolName}, skipping discovery`) } else { const tools = await mcpService.discoverServerTools(userId, serverId, workspaceId) tool = tools.find((t) => t.name === toolName) ?? null diff --git a/apps/sim/app/api/schedules/[id]/route.ts b/apps/sim/app/api/schedules/[id]/route.ts index 2d3da99cd8..eb65f07b53 100644 --- a/apps/sim/app/api/schedules/[id]/route.ts +++ b/apps/sim/app/api/schedules/[id]/route.ts @@ -26,7 +26,6 @@ export async function PUT(request: NextRequest, { params }: { params: Promise<{ try { const { id: scheduleId } = await params - logger.debug(`[${requestId}] Reactivating schedule with ID: ${scheduleId}`) const session = await getSession() if (!session?.user?.id) { diff --git a/apps/sim/app/api/schedules/execute/route.ts b/apps/sim/app/api/schedules/execute/route.ts index d985cf72b1..c928b17143 100644 --- a/apps/sim/app/api/schedules/execute/route.ts +++ b/apps/sim/app/api/schedules/execute/route.ts @@ -51,7 +51,6 @@ export async function GET(request: NextRequest) { lastQueuedAt: workflowSchedule.lastQueuedAt, }) - logger.debug(`[${requestId}] Successfully queried schedules: ${dueSchedules.length} found`) logger.info(`[${requestId}] Processing ${dueSchedules.length} due scheduled workflows`) const jobQueue = await getJobQueue() diff --git a/apps/sim/app/api/templates/[id]/route.ts b/apps/sim/app/api/templates/[id]/route.ts index b7a0425fff..2ea215566e 100644 --- a/apps/sim/app/api/templates/[id]/route.ts +++ b/apps/sim/app/api/templates/[id]/route.ts @@ -24,8 +24,6 @@ export async function GET(request: NextRequest, { params }: { params: Promise<{ try { const session = await getSession() - logger.debug(`[${requestId}] Fetching template: ${id}`) - const result = await db .select({ template: templates, @@ -74,8 +72,6 @@ export async function GET(request: NextRequest, { params }: { params: Promise<{ views: sql`${templates.views} + 1`, }) .where(eq(templates.id, id)) - - logger.debug(`[${requestId}] Incremented view count for template: ${id}`) } catch (viewError) { logger.warn(`[${requestId}] Failed to increment view count for template: ${id}`, viewError) } diff --git a/apps/sim/app/api/templates/[id]/star/route.ts b/apps/sim/app/api/templates/[id]/star/route.ts index 8f9fc19a0a..bd8b2db082 100644 --- a/apps/sim/app/api/templates/[id]/star/route.ts +++ b/apps/sim/app/api/templates/[id]/star/route.ts @@ -58,8 +58,6 @@ export async function POST(request: NextRequest, { params }: { params: Promise<{ return NextResponse.json({ error: 'Unauthorized' }, { status: 401 }) } - logger.debug(`[${requestId}] Adding star for template: ${id}, user: ${session.user.id}`) - // Verify the template exists const templateExists = await db .select({ id: templates.id }) @@ -133,8 +131,6 @@ export async function DELETE( return NextResponse.json({ error: 'Unauthorized' }, { status: 401 }) } - logger.debug(`[${requestId}] Removing star for template: ${id}, user: ${session.user.id}`) - // Check if the star exists const existingStar = await db .select({ id: templateStars.id }) diff --git a/apps/sim/app/api/templates/route.ts b/apps/sim/app/api/templates/route.ts index 74d84be2b0..55628bfc7c 100644 --- a/apps/sim/app/api/templates/route.ts +++ b/apps/sim/app/api/templates/route.ts @@ -68,8 +68,6 @@ export async function GET(request: NextRequest) { const { searchParams } = new URL(request.url) const params = QueryParamsSchema.parse(Object.fromEntries(searchParams.entries())) - logger.debug(`[${requestId}] Fetching templates with params:`, params) - // Check if user is a super user const { effectiveSuperUser } = await verifyEffectiveSuperUser(session.user.id) const isSuperUser = effectiveSuperUser @@ -187,11 +185,6 @@ export async function POST(request: NextRequest) { const body = await request.json() const data = CreateTemplateSchema.parse(body) - logger.debug(`[${requestId}] Creating template:`, { - name: data.name, - workflowId: data.workflowId, - }) - // Verify the workflow exists and belongs to the user const workflowExists = await db .select({ id: workflow.id }) diff --git a/apps/sim/app/api/users/me/api-keys/[id]/route.ts b/apps/sim/app/api/users/me/api-keys/[id]/route.ts index 596d2812c8..e9344b86dd 100644 --- a/apps/sim/app/api/users/me/api-keys/[id]/route.ts +++ b/apps/sim/app/api/users/me/api-keys/[id]/route.ts @@ -18,7 +18,6 @@ export async function DELETE( const { id } = await params try { - logger.debug(`[${requestId}] Deleting API key: ${id}`) const session = await getSession() if (!session?.user?.id) { return NextResponse.json({ error: 'Unauthorized' }, { status: 401 }) diff --git a/apps/sim/app/api/wand/route.ts b/apps/sim/app/api/wand/route.ts index 2bd65fa0ff..f868364ae6 100644 --- a/apps/sim/app/api/wand/route.ts +++ b/apps/sim/app/api/wand/route.ts @@ -103,12 +103,10 @@ async function updateUserStatsForWand( isBYOK = false ): Promise { if (!isBillingEnabled) { - logger.debug(`[${requestId}] Billing is disabled, skipping wand usage cost update`) return } if (!usage.total_tokens || usage.total_tokens <= 0) { - logger.debug(`[${requestId}] No tokens to update in user stats`) return } @@ -146,13 +144,6 @@ async function updateUserStatsForWand( }) .where(eq(userStats.userId, userId)) - logger.debug(`[${requestId}] Updated user stats for wand usage`, { - userId, - tokensUsed: totalTokens, - costAdded: costToStore, - isBYOK, - }) - await logModelUsage({ userId, source: 'wand', @@ -291,23 +282,8 @@ export async function POST(req: NextRequest) { messages.push({ role: 'user', content: prompt }) - logger.debug( - `[${requestId}] Calling ${useWandAzure ? 'Azure OpenAI' : 'OpenAI'} API for wand generation`, - { - stream, - historyLength: history.length, - endpoint: useWandAzure ? azureEndpoint : 'api.openai.com', - model: useWandAzure ? wandModelName : 'gpt-4o', - apiVersion: useWandAzure ? azureApiVersion : 'N/A', - } - ) - if (stream) { try { - logger.debug( - `[${requestId}] Starting streaming request to ${useWandAzure ? 'Azure OpenAI' : 'OpenAI'}` - ) - logger.info( `[${requestId}] About to create stream with model: ${useWandAzure ? wandModelName : 'gpt-4o'}` ) @@ -327,8 +303,6 @@ export async function POST(req: NextRequest) { headers.Authorization = `Bearer ${activeOpenAIKey}` } - logger.debug(`[${requestId}] Making streaming request to: ${apiUrl}`) - const response = await fetch(apiUrl, { method: 'POST', headers, @@ -429,7 +403,6 @@ export async function POST(req: NextRequest) { try { parsed = JSON.parse(data) } catch (parseError) { - logger.debug(`[${requestId}] Skipped non-JSON line: ${data.substring(0, 100)}`) continue } diff --git a/apps/sim/app/api/webhooks/[id]/route.ts b/apps/sim/app/api/webhooks/[id]/route.ts index 447527236d..f1f1fbd628 100644 --- a/apps/sim/app/api/webhooks/[id]/route.ts +++ b/apps/sim/app/api/webhooks/[id]/route.ts @@ -21,7 +21,6 @@ export async function GET(request: NextRequest, { params }: { params: Promise<{ try { const { id } = await params - logger.debug(`[${requestId}] Fetching webhook with ID: ${id}`) const auth = await checkSessionOrInternalAuth(request, { requireWorkflowId: false }) if (!auth.success || !auth.userId) { @@ -77,7 +76,6 @@ export async function PATCH(request: NextRequest, { params }: { params: Promise< try { const { id } = await params - logger.debug(`[${requestId}] Updating webhook with ID: ${id}`) const auth = await checkSessionOrInternalAuth(request, { requireWorkflowId: false }) if (!auth.success || !auth.userId) { @@ -129,11 +127,6 @@ export async function PATCH(request: NextRequest, { params }: { params: Promise< return NextResponse.json({ error: 'Access denied' }, { status: 403 }) } - logger.debug(`[${requestId}] Updating webhook properties`, { - hasActiveUpdate: isActive !== undefined, - hasFailedCountUpdate: failedCount !== undefined, - }) - const updatedWebhook = await db .update(webhook) .set({ @@ -161,7 +154,6 @@ export async function DELETE( try { const { id } = await params - logger.debug(`[${requestId}] Deleting webhook with ID: ${id}`) const auth = await checkSessionOrInternalAuth(request, { requireWorkflowId: false }) if (!auth.success || !auth.userId) { diff --git a/apps/sim/app/api/webhooks/route.ts b/apps/sim/app/api/webhooks/route.ts index 27a8b866aa..4d5508a125 100644 --- a/apps/sim/app/api/webhooks/route.ts +++ b/apps/sim/app/api/webhooks/route.ts @@ -112,7 +112,6 @@ export async function GET(request: NextRequest) { return NextResponse.json({ webhooks: [] }, { status: 200 }) } - logger.debug(`[${requestId}] Fetching workspace-accessible webhooks for ${session.user.id}`) const workspacePermissionRows = await db .select({ workspaceId: permissions.entityId }) .from(permissions) diff --git a/apps/sim/app/api/workflows/[id]/chat/status/route.ts b/apps/sim/app/api/workflows/[id]/chat/status/route.ts index 4dbaf214eb..ef84667d5d 100644 --- a/apps/sim/app/api/workflows/[id]/chat/status/route.ts +++ b/apps/sim/app/api/workflows/[id]/chat/status/route.ts @@ -35,8 +35,6 @@ export async function GET(request: NextRequest, { params }: { params: Promise<{ ) } - logger.debug(`[${requestId}] Checking chat deployment status for workflow: ${id}`) - // Find any active chat deployments for this workflow const deploymentResults = await db .select({ diff --git a/apps/sim/app/api/workflows/[id]/deploy/route.ts b/apps/sim/app/api/workflows/[id]/deploy/route.ts index 697ddd5764..1dd8798a3f 100644 --- a/apps/sim/app/api/workflows/[id]/deploy/route.ts +++ b/apps/sim/app/api/workflows/[id]/deploy/route.ts @@ -22,6 +22,7 @@ import { } from '@/lib/workflows/schedules' import { validateWorkflowPermissions } from '@/lib/workflows/utils' import { createErrorResponse, createSuccessResponse } from '@/app/api/workflows/utils' +import type { WorkflowState } from '@/stores/workflows/workflow/types' const logger = createLogger('WorkflowDeployAPI') @@ -33,8 +34,6 @@ export async function GET(request: NextRequest, { params }: { params: Promise<{ const { id } = await params try { - logger.debug(`[${requestId}] Fetching deployment info for workflow: ${id}`) - const { error, workflow: workflowData } = await validateWorkflowPermissions( id, requestId, @@ -86,7 +85,10 @@ export async function GET(request: NextRequest, { params }: { params: Promise<{ variables: workflowRecord?.variables || {}, } const { hasWorkflowChanged } = await import('@/lib/workflows/comparison') - needsRedeployment = hasWorkflowChanged(currentState as any, active.state as any) + needsRedeployment = hasWorkflowChanged( + currentState as WorkflowState, + active.state as WorkflowState + ) } } @@ -112,8 +114,6 @@ export async function POST(request: NextRequest, { params }: { params: Promise<{ const { id } = await params try { - logger.debug(`[${requestId}] Deploying workflow: ${id}`) - const { error, session, @@ -355,8 +355,6 @@ export async function DELETE( const { id } = await params try { - logger.debug(`[${requestId}] Undeploying workflow: ${id}`) - const { error, session, diff --git a/apps/sim/app/api/workflows/[id]/deployed/route.ts b/apps/sim/app/api/workflows/[id]/deployed/route.ts index 2e335e3de4..347e77eacb 100644 --- a/apps/sim/app/api/workflows/[id]/deployed/route.ts +++ b/apps/sim/app/api/workflows/[id]/deployed/route.ts @@ -21,8 +21,6 @@ export async function GET(request: NextRequest, { params }: { params: Promise<{ const { id } = await params try { - logger.debug(`[${requestId}] Fetching deployed state for workflow: ${id}`) - const authHeader = request.headers.get('authorization') let isInternalCall = false @@ -38,8 +36,6 @@ export async function GET(request: NextRequest, { params }: { params: Promise<{ const response = createErrorResponse(error.message, error.status) return addNoCacheHeaders(response) } - } else { - logger.debug(`[${requestId}] Internal API call for deployed workflow: ${id}`) } let deployedState = null @@ -52,7 +48,8 @@ export async function GET(request: NextRequest, { params }: { params: Promise<{ parallels: data.parallels, variables: data.variables, } - } catch { + } catch (error) { + logger.warn(`[${requestId}] Failed to load deployed state for workflow ${id}`, { error }) deployedState = null } diff --git a/apps/sim/app/api/workflows/[id]/execute/route.ts b/apps/sim/app/api/workflows/[id]/execute/route.ts index 8c07d3b3fd..cf8431f9dc 100644 --- a/apps/sim/app/api/workflows/[id]/execute/route.ts +++ b/apps/sim/app/api/workflows/[id]/execute/route.ts @@ -22,7 +22,6 @@ import { createExecutionEventWriter, setExecutionMeta } from '@/lib/execution/ev import { processInputFileFields } from '@/lib/execution/files' import { preprocessExecution } from '@/lib/execution/preprocessing' import { LoggingSession } from '@/lib/logs/execution/logging-session' -import { buildTraceSpans } from '@/lib/logs/execution/trace-spans/trace-spans' import { cleanupExecutionBase64Cache, hydrateUserFilesWithBase64, @@ -409,18 +408,6 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id: const shouldUseDraftState = isPublicApiAccess ? false : (useDraftState ?? auth.authType === 'session') - const workflowAuthorization = await authorizeWorkflowByWorkspacePermission({ - workflowId, - userId, - action: shouldUseDraftState ? 'write' : 'read', - }) - if (!workflowAuthorization.allowed) { - return NextResponse.json( - { error: workflowAuthorization.message || 'Access denied' }, - { status: workflowAuthorization.status } - ) - } - const streamHeader = req.headers.get('X-Stream-Response') === 'true' const enableSSE = streamHeader || streamParam === true const executionModeHeader = req.headers.get('X-Execution-Mode') @@ -455,6 +442,21 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id: const useAuthenticatedUserAsActor = isClientSession || (auth.authType === 'api_key' && auth.apiKeyType === 'personal') + // Authorization fetches the full workflow record and checks workspace permissions. + // Run it first so we can pass the record to preprocessing (eliminates a duplicate DB query). + const workflowAuthorization = await authorizeWorkflowByWorkspacePermission({ + workflowId, + userId, + action: shouldUseDraftState ? 'write' : 'read', + }) + if (!workflowAuthorization.allowed) { + return NextResponse.json( + { error: workflowAuthorization.message || 'Access denied' }, + { status: workflowAuthorization.status } + ) + } + + // Pass the pre-fetched workflow record to skip the redundant Step 1 DB query in preprocessing. const preprocessResult = await preprocessExecution({ workflowId, userId, @@ -465,6 +467,7 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id: loggingSession, useDraftState: shouldUseDraftState, useAuthenticatedUserAsActor, + workflowRecord: workflowAuthorization.workflow ?? undefined, }) if (!preprocessResult.success) { @@ -514,7 +517,7 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id: try { const workflowData = shouldUseDraftState ? await loadWorkflowFromNormalizedTables(workflowId) - : await loadDeployedWorkflowState(workflowId) + : await loadDeployedWorkflowState(workflowId, workspaceId) if (workflowData) { const deployedVariables = @@ -694,12 +697,6 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id: const executionResult = hasExecutionResult(error) ? error.executionResult : undefined - await loggingSession.safeCompleteWithError({ - totalDurationMs: executionResult?.metadata?.duration, - error: { message: errorMessage }, - traceSpans: executionResult?.logs as any, - }) - return NextResponse.json( { success: false, @@ -718,11 +715,9 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id: } finally { timeoutController.cleanup() if (executionId) { - try { - await cleanupExecutionBase64Cache(executionId) - } catch (error) { + void cleanupExecutionBase64Cache(executionId).catch((error) => { logger.error(`[${requestId}] Failed to cleanup base64 cache`, { error }) - } + }) } } } @@ -1123,15 +1118,6 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id: logger.error(`[${requestId}] SSE execution failed: ${errorMessage}`, { isTimeout }) const executionResult = hasExecutionResult(error) ? error.executionResult : undefined - const { traceSpans, totalDuration } = executionResult - ? buildTraceSpans(executionResult) - : { traceSpans: [], totalDuration: 0 } - - await loggingSession.safeCompleteWithError({ - totalDurationMs: totalDuration || executionResult?.metadata?.duration, - error: { message: errorMessage }, - traceSpans, - }) sendEvent({ type: 'execution:error', diff --git a/apps/sim/app/api/workflows/[id]/route.ts b/apps/sim/app/api/workflows/[id]/route.ts index 825ac94142..140cc8ef53 100644 --- a/apps/sim/app/api/workflows/[id]/route.ts +++ b/apps/sim/app/api/workflows/[id]/route.ts @@ -77,18 +77,9 @@ export async function GET(request: NextRequest, { params }: { params: Promise<{ } } - logger.debug(`[${requestId}] Attempting to load workflow ${workflowId} from normalized tables`) const normalizedData = await loadWorkflowFromNormalizedTables(workflowId) if (normalizedData) { - logger.debug(`[${requestId}] Found normalized data for workflow ${workflowId}:`, { - blocksCount: Object.keys(normalizedData.blocks).length, - edgesCount: normalizedData.edges.length, - loopsCount: Object.keys(normalizedData.loops).length, - parallelsCount: Object.keys(normalizedData.parallels).length, - loops: normalizedData.loops, - }) - const finalWorkflowData = { ...workflowData, state: { diff --git a/apps/sim/app/api/workflows/[id]/state/route.ts b/apps/sim/app/api/workflows/[id]/state/route.ts index 1e7beaefcd..60417bf4eb 100644 --- a/apps/sim/app/api/workflows/[id]/state/route.ts +++ b/apps/sim/app/api/workflows/[id]/state/route.ts @@ -11,7 +11,7 @@ import { extractAndPersistCustomTools } from '@/lib/workflows/persistence/custom import { saveWorkflowToNormalizedTables } from '@/lib/workflows/persistence/utils' import { sanitizeAgentToolsInBlocks } from '@/lib/workflows/sanitization/validation' import { authorizeWorkflowByWorkspacePermission } from '@/lib/workflows/utils' -import type { BlockState } from '@/stores/workflows/workflow/types' +import type { BlockState, WorkflowState } from '@/stores/workflows/workflow/types' import { generateLoopBlocks, generateParallelBlocks } from '@/stores/workflows/workflow/utils' const logger = createLogger('WorkflowStateAPI') @@ -153,13 +153,15 @@ export async function PUT(request: NextRequest, { params }: { params: Promise<{ } // Sanitize custom tools in agent blocks before saving - const { blocks: sanitizedBlocks, warnings } = sanitizeAgentToolsInBlocks(state.blocks as any) + const { blocks: sanitizedBlocks, warnings } = sanitizeAgentToolsInBlocks( + state.blocks as Record + ) // Save to normalized tables // Ensure all required fields are present for WorkflowState type // Filter out blocks without type or name before saving const filteredBlocks = Object.entries(sanitizedBlocks).reduce( - (acc, [blockId, block]: [string, any]) => { + (acc, [blockId, block]: [string, BlockState]) => { if (block.type && block.name) { // Ensure all required fields are present acc[blockId] = { @@ -191,7 +193,10 @@ export async function PUT(request: NextRequest, { params }: { params: Promise<{ deployedAt: state.deployedAt, } - const saveResult = await saveWorkflowToNormalizedTables(workflowId, workflowState as any) + const saveResult = await saveWorkflowToNormalizedTables( + workflowId, + workflowState as WorkflowState + ) if (!saveResult.success) { logger.error(`[${requestId}] Failed to save workflow ${workflowId} state:`, saveResult.error) diff --git a/apps/sim/app/api/workflows/[id]/status/route.ts b/apps/sim/app/api/workflows/[id]/status/route.ts index b83dffed3a..f53fbe05e4 100644 --- a/apps/sim/app/api/workflows/[id]/status/route.ts +++ b/apps/sim/app/api/workflows/[id]/status/route.ts @@ -7,6 +7,7 @@ import { hasWorkflowChanged } from '@/lib/workflows/comparison' import { loadWorkflowFromNormalizedTables } from '@/lib/workflows/persistence/utils' import { validateWorkflowAccess } from '@/app/api/workflows/middleware' import { createErrorResponse, createSuccessResponse } from '@/app/api/workflows/utils' +import type { WorkflowState } from '@/stores/workflows/workflow/types' const logger = createLogger('WorkflowStatusAPI') @@ -64,7 +65,10 @@ export async function GET(request: NextRequest, { params }: { params: Promise<{ .limit(1) if (active?.state) { - needsRedeployment = hasWorkflowChanged(currentState as any, active.state as any) + needsRedeployment = hasWorkflowChanged( + currentState as WorkflowState, + active.state as WorkflowState + ) } } diff --git a/apps/sim/background/schedule-execution.ts b/apps/sim/background/schedule-execution.ts index a08d8a57d3..7b6fd62576 100644 --- a/apps/sim/background/schedule-execution.ts +++ b/apps/sim/background/schedule-execution.ts @@ -40,15 +40,10 @@ async function applyScheduleUpdate( scheduleId: string, updates: WorkflowScheduleUpdate, requestId: string, - context: string, - successLog?: string + context: string ) { try { await db.update(workflowSchedule).set(updates).where(eq(workflowSchedule.id, scheduleId)) - - if (successLog) { - logger.debug(`[${requestId}] ${successLog}`) - } } catch (error) { logger.error(`[${requestId}] ${context}`, error) } @@ -132,8 +127,10 @@ async function runWorkflowExecution({ asyncTimeout?: number }): Promise { try { - logger.debug(`[${requestId}] Loading deployed workflow ${payload.workflowId}`) - const deployedData = await loadDeployedWorkflowState(payload.workflowId) + const deployedData = await loadDeployedWorkflowState( + payload.workflowId, + workflowRecord.workspaceId ?? undefined + ) const blocks = deployedData.blocks const { deploymentVersionId } = deployedData @@ -351,8 +348,7 @@ export async function executeScheduleJob(payload: ScheduleExecutionPayload) { status: 'disabled', }, requestId, - `Failed to disable schedule ${payload.scheduleId} after authentication error`, - `Disabled schedule ${payload.scheduleId} due to authentication failure (401)` + `Failed to disable schedule ${payload.scheduleId} after authentication error` ) return } @@ -370,8 +366,7 @@ export async function executeScheduleJob(payload: ScheduleExecutionPayload) { status: 'disabled', }, requestId, - `Failed to disable schedule ${payload.scheduleId} after authorization error`, - `Disabled schedule ${payload.scheduleId} due to authorization failure (403)` + `Failed to disable schedule ${payload.scheduleId} after authorization error` ) return } @@ -386,8 +381,7 @@ export async function executeScheduleJob(payload: ScheduleExecutionPayload) { status: 'disabled', }, requestId, - `Failed to disable schedule ${payload.scheduleId} after missing workflow`, - `Disabled schedule ${payload.scheduleId} because the workflow no longer exists` + `Failed to disable schedule ${payload.scheduleId} after missing workflow` ) return } @@ -404,8 +398,7 @@ export async function executeScheduleJob(payload: ScheduleExecutionPayload) { nextRunAt: nextRetryAt, }, requestId, - `Error updating schedule ${payload.scheduleId} for rate limit`, - `Updated next retry time for schedule ${payload.scheduleId} due to rate limit` + `Error updating schedule ${payload.scheduleId} for rate limit` ) return } @@ -421,8 +414,7 @@ export async function executeScheduleJob(payload: ScheduleExecutionPayload) { nextRunAt, }, requestId, - `Error updating schedule ${payload.scheduleId} after usage limit check`, - `Scheduled next run for ${payload.scheduleId} after usage limit` + `Error updating schedule ${payload.scheduleId} after usage limit check` ) } return @@ -450,8 +442,7 @@ export async function executeScheduleJob(payload: ScheduleExecutionPayload) { status: shouldDisable ? 'disabled' : 'active', }, requestId, - `Error updating schedule ${payload.scheduleId} after preprocessing failure`, - `Updated schedule ${payload.scheduleId} after preprocessing failure` + `Error updating schedule ${payload.scheduleId} after preprocessing failure` ) return } @@ -503,8 +494,7 @@ export async function executeScheduleJob(payload: ScheduleExecutionPayload) { lastQueuedAt: null, }, requestId, - `Error updating schedule ${payload.scheduleId} after success`, - `Updated next run time for workflow ${payload.workflowId} to ${nextRunAt.toISOString()}` + `Error updating schedule ${payload.scheduleId} after success` ) return } @@ -531,8 +521,7 @@ export async function executeScheduleJob(payload: ScheduleExecutionPayload) { status: shouldDisable ? 'disabled' : 'active', }, requestId, - `Error updating schedule ${payload.scheduleId} after failure`, - `Updated schedule ${payload.scheduleId} after failure` + `Error updating schedule ${payload.scheduleId} after failure` ) } catch (error: unknown) { const errorMessage = error instanceof Error ? error.message : String(error) @@ -550,8 +539,7 @@ export async function executeScheduleJob(payload: ScheduleExecutionPayload) { nextRunAt: nextRetryAt, }, requestId, - `Error updating schedule ${payload.scheduleId} for service overload`, - `Updated schedule ${payload.scheduleId} retry time due to service overload` + `Error updating schedule ${payload.scheduleId} for service overload` ) return } @@ -578,8 +566,7 @@ export async function executeScheduleJob(payload: ScheduleExecutionPayload) { status: shouldDisable ? 'disabled' : 'active', }, requestId, - `Error updating schedule ${payload.scheduleId} after execution error`, - `Updated schedule ${payload.scheduleId} after execution error` + `Error updating schedule ${payload.scheduleId} after execution error` ) } } catch (error: unknown) { diff --git a/apps/sim/background/webhook-execution.ts b/apps/sim/background/webhook-execution.ts index ce0d8d1b38..5113dc075c 100644 --- a/apps/sim/background/webhook-execution.ts +++ b/apps/sim/background/webhook-execution.ts @@ -440,7 +440,6 @@ async function executeWebhookJobInternal( const triggerConfig = getTrigger(resolvedTriggerId) if (triggerConfig.outputs) { - logger.debug(`[${requestId}] Processing trigger ${resolvedTriggerId} file outputs`) const processedInput = await processTriggerFileOutputs(input, triggerConfig.outputs, { workspaceId, workflowId: payload.workflowId, @@ -450,8 +449,6 @@ async function executeWebhookJobInternal( }) safeAssign(input, processedInput as Record) } - } else { - logger.debug(`[${requestId}] No valid triggerId found for block ${payload.blockId}`) } } catch (error) { logger.error(`[${requestId}] Error processing trigger file outputs:`, error) @@ -469,7 +466,6 @@ async function executeWebhookJobInternal( name: string type: 'string' | 'number' | 'boolean' | 'object' | 'array' | 'file[]' }> - logger.debug(`[${requestId}] Processing generic webhook files from inputFormat`) const fileFields = inputFormat.filter((field) => field.type === 'file[]') diff --git a/apps/sim/background/workflow-execution.ts b/apps/sim/background/workflow-execution.ts index ce4f3b7379..8183f5a5ea 100644 --- a/apps/sim/background/workflow-execution.ts +++ b/apps/sim/background/workflow-execution.ts @@ -7,7 +7,6 @@ import { LoggingSession } from '@/lib/logs/execution/logging-session' import { buildTraceSpans } from '@/lib/logs/execution/trace-spans/trace-spans' import { executeWorkflowCore } from '@/lib/workflows/executor/execution-core' import { PauseResumeManager } from '@/lib/workflows/executor/human-in-the-loop-manager' -import { getWorkflowById } from '@/lib/workflows/utils' import { ExecutionSnapshot } from '@/executor/execution/snapshot' import type { ExecutionMetadata } from '@/executor/execution/types' import { hasExecutionResult } from '@/executor/utils/errors' @@ -79,10 +78,7 @@ export async function executeWorkflowJob(payload: WorkflowExecutionPayload) { variables: {}, }) - const workflow = await getWorkflowById(workflowId) - if (!workflow) { - throw new Error(`Workflow ${workflowId} not found after preprocessing`) - } + const workflow = preprocessResult.workflowRecord! const metadata: ExecutionMetadata = { requestId, diff --git a/apps/sim/lib/audit/log.ts b/apps/sim/lib/audit/log.ts index 53094f7bf7..a4300b82bd 100644 --- a/apps/sim/lib/audit/log.ts +++ b/apps/sim/lib/audit/log.ts @@ -215,7 +215,7 @@ async function insertAuditLog(params: AuditLogParams): Promise { actorName = row?.name ?? undefined actorEmail = row?.email ?? undefined } catch (error) { - logger.debug('Failed to resolve actor info', { error, actorId: params.actorId }) + logger.warn('Failed to resolve actor info', { error, actorId: params.actorId }) } } diff --git a/apps/sim/lib/billing/calculations/usage-monitor.ts b/apps/sim/lib/billing/calculations/usage-monitor.ts index 6da277a80d..b6e2612d83 100644 --- a/apps/sim/lib/billing/calculations/usage-monitor.ts +++ b/apps/sim/lib/billing/calculations/usage-monitor.ts @@ -2,6 +2,7 @@ import { db } from '@sim/db' import { member, organization, userStats } from '@sim/db/schema' import { createLogger } from '@sim/logger' import { and, eq, inArray } from 'drizzle-orm' +import type { HighestPrioritySubscription } from '@/lib/billing/core/plan' import { getUserUsageLimit } from '@/lib/billing/core/usage' import { isBillingEnabled } from '@/lib/core/config/feature-flags' @@ -21,7 +22,10 @@ interface UsageData { * Checks a user's cost usage against their subscription plan limit * and returns usage information including whether they're approaching the limit */ -export async function checkUsageStatus(userId: string): Promise { +export async function checkUsageStatus( + userId: string, + preloadedSubscription?: HighestPrioritySubscription +): Promise { try { // If billing is disabled, always return permissive limits if (!isBillingEnabled) { @@ -42,7 +46,7 @@ export async function checkUsageStatus(userId: string): Promise { } // Get usage limit from user_stats (per-user cap) - const limit = await getUserUsageLimit(userId) + const limit = await getUserUsageLimit(userId, preloadedSubscription) logger.info('Using stored usage limit', { userId, limit }) // Get actual usage from the database @@ -228,7 +232,10 @@ export async function checkAndNotifyUsage(userId: string): Promise { * @param userId The ID of the user to check * @returns An object containing the exceeded status and usage details */ -export async function checkServerSideUsageLimits(userId: string): Promise<{ +export async function checkServerSideUsageLimits( + userId: string, + preloadedSubscription?: HighestPrioritySubscription +): Promise<{ isExceeded: boolean currentUsage: number limit: number @@ -314,7 +321,7 @@ export async function checkServerSideUsageLimits(userId: string): Promise<{ } } - const usageData = await checkUsageStatus(userId) + const usageData = await checkUsageStatus(userId, preloadedSubscription) return { isExceeded: usageData.isExceeded, diff --git a/apps/sim/lib/billing/core/plan.ts b/apps/sim/lib/billing/core/plan.ts index 8af735542b..6224b0986e 100644 --- a/apps/sim/lib/billing/core/plan.ts +++ b/apps/sim/lib/billing/core/plan.ts @@ -6,6 +6,8 @@ import { checkEnterprisePlan, checkProPlan, checkTeamPlan } from '@/lib/billing/ const logger = createLogger('PlanLookup') +export type HighestPrioritySubscription = Awaited> + /** * Get the highest priority active subscription for a user * Priority: Enterprise > Team > Pro > Free diff --git a/apps/sim/lib/billing/core/usage.ts b/apps/sim/lib/billing/core/usage.ts index beefdc2610..e5bf1354d1 100644 --- a/apps/sim/lib/billing/core/usage.ts +++ b/apps/sim/lib/billing/core/usage.ts @@ -7,7 +7,10 @@ import { renderFreeTierUpgradeEmail, renderUsageThresholdEmail, } from '@/components/emails' -import { getHighestPrioritySubscription } from '@/lib/billing/core/plan' +import { + getHighestPrioritySubscription, + type HighestPrioritySubscription, +} from '@/lib/billing/core/plan' import { canEditUsageLimit, getFreeTierLimit, @@ -352,8 +355,14 @@ export async function updateUserUsageLimit( * Free/Pro: Individual user limit from userStats * Team/Enterprise: Organization limit */ -export async function getUserUsageLimit(userId: string): Promise { - const subscription = await getHighestPrioritySubscription(userId) +export async function getUserUsageLimit( + userId: string, + preloadedSubscription?: HighestPrioritySubscription +): Promise { + const subscription = + preloadedSubscription !== undefined + ? preloadedSubscription + : await getHighestPrioritySubscription(userId) if (!subscription || subscription.plan === 'free' || subscription.plan === 'pro') { // Free/Pro: Use individual limit from userStats diff --git a/apps/sim/lib/billing/organizations/membership.ts b/apps/sim/lib/billing/organizations/membership.ts index 5fee8bb5cf..5d26cef92e 100644 --- a/apps/sim/lib/billing/organizations/membership.ts +++ b/apps/sim/lib/billing/organizations/membership.ts @@ -154,7 +154,6 @@ export async function restoreUserProSubscription(userId: string): Promise> +type SubscriptionInfo = HighestPrioritySubscription export async function preprocessExecution( options: PreprocessExecutionOptions @@ -84,6 +87,7 @@ export async function preprocessExecution( loggingSession: providedLoggingSession, isResumeContext: _isResumeContext = false, useAuthenticatedUserAsActor = false, + workflowRecord: prefetchedWorkflowRecord, } = options logger.info(`[${requestId}] Starting execution preprocessing`, { @@ -94,58 +98,69 @@ export async function preprocessExecution( }) // ========== STEP 1: Validate Workflow Exists ========== - let workflowRecord: WorkflowRecord | null = null - try { - const records = await db.select().from(workflow).where(eq(workflow.id, workflowId)).limit(1) + if (prefetchedWorkflowRecord && prefetchedWorkflowRecord.id !== workflowId) { + logger.error(`[${requestId}] Prefetched workflow record ID mismatch`, { + expected: workflowId, + received: prefetchedWorkflowRecord.id, + }) + throw new Error( + `Prefetched workflow record ID mismatch: expected ${workflowId}, got ${prefetchedWorkflowRecord.id}` + ) + } + let workflowRecord: WorkflowRecord | null = prefetchedWorkflowRecord ?? null + if (!workflowRecord) { + try { + const records = await db.select().from(workflow).where(eq(workflow.id, workflowId)).limit(1) + + if (records.length === 0) { + logger.warn(`[${requestId}] Workflow not found: ${workflowId}`) + + await logPreprocessingError({ + workflowId, + executionId, + triggerType, + requestId, + userId: 'unknown', + workspaceId: '', + errorMessage: + 'Workflow not found. The workflow may have been deleted or is no longer accessible.', + loggingSession: providedLoggingSession, + }) + + return { + success: false, + error: { + message: 'Workflow not found', + statusCode: 404, + logCreated: true, + }, + } + } - if (records.length === 0) { - logger.warn(`[${requestId}] Workflow not found: ${workflowId}`) + workflowRecord = records[0] + } catch (error) { + logger.error(`[${requestId}] Error fetching workflow`, { error, workflowId }) await logPreprocessingError({ workflowId, executionId, triggerType, requestId, - userId: 'unknown', - workspaceId: '', - errorMessage: - 'Workflow not found. The workflow may have been deleted or is no longer accessible.', + userId: userId || 'unknown', + workspaceId: providedWorkspaceId || '', + errorMessage: 'Internal error while fetching workflow', loggingSession: providedLoggingSession, }) return { success: false, error: { - message: 'Workflow not found', - statusCode: 404, + message: 'Internal error while fetching workflow', + statusCode: 500, logCreated: true, }, } } - - workflowRecord = records[0] - } catch (error) { - logger.error(`[${requestId}] Error fetching workflow`, { error, workflowId }) - - await logPreprocessingError({ - workflowId, - executionId, - triggerType, - requestId, - userId: userId || 'unknown', - workspaceId: providedWorkspaceId || '', - errorMessage: 'Internal error while fetching workflow', - loggingSession: providedLoggingSession, - }) - - return { - success: false, - error: { - message: 'Internal error while fetching workflow', - statusCode: 500, - logCreated: true, - }, - } } const workspaceId = workflowRecord.workspaceId || providedWorkspaceId || '' @@ -249,38 +264,23 @@ export async function preprocessExecution( } } - // ========== STEP 4: Get User Subscription ========== - let userSubscription: SubscriptionInfo = null - try { - userSubscription = await getHighestPrioritySubscription(actorUserId) - logger.debug(`[${requestId}] User subscription retrieved`, { - actorUserId, - hasSub: !!userSubscription, - plan: userSubscription?.plan, - }) - } catch (error) { - logger.error(`[${requestId}] Error fetching subscription`, { error, actorUserId }) - } - - // ========== STEP 5: Check Rate Limits ========== - let rateLimitInfo: { allowed: boolean; remaining: number; resetAt: Date } | undefined + // ========== STEP 4: Get Subscription ========== + const userSubscription = await getHighestPrioritySubscription(actorUserId) - if (checkRateLimit) { + // ========== STEP 5: Check Usage Limits ========== + if (!skipUsageLimits) { try { - const rateLimiter = new RateLimiter() - rateLimitInfo = await rateLimiter.checkRateLimitWithSubscription( - actorUserId, - userSubscription, - triggerType, - false // not async - ) - - if (!rateLimitInfo.allowed) { - logger.warn(`[${requestId}] Rate limit exceeded for user ${actorUserId}`, { - triggerType, - remaining: rateLimitInfo.remaining, - resetAt: rateLimitInfo.resetAt, - }) + const usageCheck = await checkServerSideUsageLimits(actorUserId, userSubscription) + if (usageCheck.isExceeded) { + logger.warn( + `[${requestId}] User ${actorUserId} has exceeded usage limits. Blocking execution.`, + { + currentUsage: usageCheck.currentUsage, + limit: usageCheck.limit, + workflowId, + triggerType, + } + ) await logPreprocessingError({ workflowId, @@ -289,25 +289,27 @@ export async function preprocessExecution( requestId, userId: actorUserId, workspaceId, - errorMessage: `Rate limit exceeded. ${rateLimitInfo.remaining} requests remaining. Resets at ${rateLimitInfo.resetAt.toISOString()}.`, + errorMessage: + usageCheck.message || + `Usage limit exceeded: $${usageCheck.currentUsage?.toFixed(2)} used of $${usageCheck.limit?.toFixed(2)} limit. Please upgrade your plan to continue.`, loggingSession: providedLoggingSession, }) return { success: false, error: { - message: `Rate limit exceeded. Please try again later.`, - statusCode: 429, + message: + usageCheck.message || 'Usage limit exceeded. Please upgrade your plan to continue.', + statusCode: 402, logCreated: true, }, } } - - logger.debug(`[${requestId}] Rate limit check passed`, { - remaining: rateLimitInfo.remaining, - }) } catch (error) { - logger.error(`[${requestId}] Error checking rate limits`, { error, actorUserId }) + logger.error(`[${requestId}] Error checking usage limits`, { + error, + actorUserId, + }) await logPreprocessingError({ workflowId, @@ -316,14 +318,15 @@ export async function preprocessExecution( requestId, userId: actorUserId, workspaceId, - errorMessage: 'Error checking rate limits. Execution blocked for safety.', + errorMessage: + 'Unable to determine usage limits. Execution blocked for security. Please contact support.', loggingSession: providedLoggingSession, }) return { success: false, error: { - message: 'Error checking rate limits', + message: 'Unable to determine usage limits. Execution blocked for security.', statusCode: 500, logCreated: true, }, @@ -331,21 +334,25 @@ export async function preprocessExecution( } } - // ========== STEP 6: Check Usage Limits (CRITICAL) ========== - if (!skipUsageLimits) { + // ========== STEP 6: Check Rate Limits ========== + let rateLimitInfo: { allowed: boolean; remaining: number; resetAt: Date } | undefined + + if (checkRateLimit) { try { - const usageCheck = await checkServerSideUsageLimits(actorUserId) + const rateLimiter = new RateLimiter() + rateLimitInfo = await rateLimiter.checkRateLimitWithSubscription( + actorUserId, + userSubscription, + triggerType, + false // not async + ) - if (usageCheck.isExceeded) { - logger.warn( - `[${requestId}] User ${actorUserId} has exceeded usage limits. Blocking execution.`, - { - currentUsage: usageCheck.currentUsage, - limit: usageCheck.limit, - workflowId, - triggerType, - } - ) + if (!rateLimitInfo.allowed) { + logger.warn(`[${requestId}] Rate limit exceeded for user ${actorUserId}`, { + triggerType, + remaining: rateLimitInfo.remaining, + resetAt: rateLimitInfo.resetAt, + }) await logPreprocessingError({ workflowId, @@ -354,29 +361,21 @@ export async function preprocessExecution( requestId, userId: actorUserId, workspaceId, - errorMessage: - usageCheck.message || - `Usage limit exceeded: $${usageCheck.currentUsage?.toFixed(2)} used of $${usageCheck.limit?.toFixed(2)} limit. Please upgrade your plan to continue.`, + errorMessage: `Rate limit exceeded. ${rateLimitInfo.remaining} requests remaining. Resets at ${rateLimitInfo.resetAt.toISOString()}.`, loggingSession: providedLoggingSession, }) return { success: false, error: { - message: - usageCheck.message || 'Usage limit exceeded. Please upgrade your plan to continue.', - statusCode: 402, + message: `Rate limit exceeded. Please try again later.`, + statusCode: 429, logCreated: true, }, } } - - logger.debug(`[${requestId}] Usage limit check passed`, { - currentUsage: usageCheck.currentUsage, - limit: usageCheck.limit, - }) } catch (error) { - logger.error(`[${requestId}] Error checking usage limits`, { error, actorUserId }) + logger.error(`[${requestId}] Error checking rate limits`, { error, actorUserId }) await logPreprocessingError({ workflowId, @@ -385,22 +384,19 @@ export async function preprocessExecution( requestId, userId: actorUserId, workspaceId, - errorMessage: - 'Unable to determine usage limits. Execution blocked for security. Please contact support.', + errorMessage: 'Error checking rate limits. Execution blocked for safety.', loggingSession: providedLoggingSession, }) return { success: false, error: { - message: 'Unable to determine usage limits. Execution blocked for security.', + message: 'Error checking rate limits', statusCode: 500, logCreated: true, }, } } - } else { - logger.debug(`[${requestId}] Skipping usage limits check (test mode)`) } // ========== SUCCESS: All Checks Passed ========== @@ -461,7 +457,7 @@ async function logPreprocessingError(params: { try { const session = - loggingSession || new LoggingSession(workflowId, executionId, triggerType as any, requestId) + loggingSession || new LoggingSession(workflowId, executionId, triggerType, requestId) await session.safeStart({ userId, @@ -477,11 +473,6 @@ async function logPreprocessingError(params: { traceSpans: [], skipCost: true, // Preprocessing errors should not charge - no execution occurred }) - - logger.debug(`[${requestId}] Logged preprocessing error to database`, { - workflowId, - executionId, - }) } catch (error) { logger.error(`[${requestId}] Failed to log preprocessing error`, { error, diff --git a/apps/sim/lib/guardrails/validate_hallucination.ts b/apps/sim/lib/guardrails/validate_hallucination.ts index 658a528fcc..0db2c0b573 100644 --- a/apps/sim/lib/guardrails/validate_hallucination.ts +++ b/apps/sim/lib/guardrails/validate_hallucination.ts @@ -54,12 +54,6 @@ async function queryKnowledgeBase( authHeaders?: { cookie?: string; authorization?: string } ): Promise { try { - logger.info(`[${requestId}] Querying knowledge base`, { - knowledgeBaseId, - query: query.substring(0, 100), - topK, - }) - // Call the knowledge base search API directly const searchUrl = `${getInternalApiBaseUrl()}/api/knowledge/search` @@ -90,8 +84,6 @@ async function queryKnowledgeBase( const chunks = results.map((r: any) => r.content || '').filter((c: string) => c.length > 0) - logger.info(`[${requestId}] Retrieved ${chunks.length} chunks from knowledge base`) - return chunks } catch (error: any) { logger.error(`[${requestId}] Error querying knowledge base`, { @@ -194,7 +186,6 @@ Evaluate the consistency and provide your score and reasoning in JSON format.` } const content = response.content.trim() - logger.debug(`[${requestId}] LLM response:`, { content }) let jsonContent = content diff --git a/apps/sim/lib/logs/execution/logger.ts b/apps/sim/lib/logs/execution/logger.ts index 2bc62d4ebe..c9e2fb8d65 100644 --- a/apps/sim/lib/logs/execution/logger.ts +++ b/apps/sim/lib/logs/execution/logger.ts @@ -359,9 +359,7 @@ export class ExecutionLogger implements IExecutionLoggerService { .leftJoin(userStats, eq(member.userId, userStats.userId)) .where(eq(member.organizationId, sub.referenceId)) .limit(1) - const orgUsageBeforeNum = Number.parseFloat( - (orgUsageBefore as any)?.toString?.() || '0' - ) + const orgUsageBeforeNum = Number.parseFloat(String(orgUsageBefore ?? '0')) await this.updateUserStats( updatedLog.workflowId, @@ -433,7 +431,7 @@ export class ExecutionLogger implements IExecutionLoggerService { endedAt: updatedLog.endedAt?.toISOString() || endedAt, totalDurationMs: updatedLog.totalDurationMs || totalDurationMs, executionData: updatedLog.executionData as WorkflowExecutionLog['executionData'], - cost: updatedLog.cost as any, + cost: updatedLog.cost as WorkflowExecutionLog['cost'], createdAt: updatedLog.createdAt.toISOString(), } @@ -467,7 +465,7 @@ export class ExecutionLogger implements IExecutionLoggerService { endedAt: workflowLog.endedAt?.toISOString() || workflowLog.startedAt.toISOString(), totalDurationMs: workflowLog.totalDurationMs || 0, executionData: workflowLog.executionData as WorkflowExecutionLog['executionData'], - cost: workflowLog.cost as any, + cost: workflowLog.cost as WorkflowExecutionLog['cost'], createdAt: workflowLog.createdAt.toISOString(), } } diff --git a/apps/sim/lib/logs/execution/logging-session.ts b/apps/sim/lib/logs/execution/logging-session.ts index c0e943ee82..10a4fc577e 100644 --- a/apps/sim/lib/logs/execution/logging-session.ts +++ b/apps/sim/lib/logs/execution/logging-session.ts @@ -89,6 +89,10 @@ export class LoggingSession { private workflowState?: WorkflowState private isResume = false private completed = false + /** Synchronous flag to prevent concurrent completion attempts (race condition guard) */ + private completing = false + /** Tracks the in-flight completion promise so callers can await it */ + private completionPromise: Promise | null = null private accumulatedCost: AccumulatedCost = { total: BASE_EXECUTION_CHARGE, input: 0, @@ -186,7 +190,7 @@ export class LoggingSession { .limit(1) if (existing?.cost) { - const cost = existing.cost as any + const cost = existing.cost as AccumulatedCost this.accumulatedCost = { total: cost.total || BASE_EXECUTION_CHARGE, input: cost.input || 0, @@ -235,18 +239,9 @@ export class LoggingSession { workflowState: this.workflowState, deploymentVersionId, }) - - if (this.requestId) { - logger.debug(`[${this.requestId}] Started logging for execution ${this.executionId}`) - } } else { this.isResume = true await this.loadExistingCost() - if (this.requestId) { - logger.debug( - `[${this.requestId}] Resuming logging for existing execution ${this.executionId}` - ) - } } } catch (error) { if (this.requestId) { @@ -256,20 +251,11 @@ export class LoggingSession { } } - /** - * Set up logging on an executor instance - * Note: Logging now works through trace spans only, no direct executor integration needed - */ - setupExecutor(executor: any): void { - if (this.requestId) { - logger.debug(`[${this.requestId}] Logging session ready for execution ${this.executionId}`) - } - } - async complete(params: SessionCompleteParams = {}): Promise { - if (this.completed) { + if (this.completed || this.completing) { return } + this.completing = true const { endedAt, totalDurationMs, finalOutput, traceSpans, workflowInput, executionState } = params @@ -336,11 +322,8 @@ export class LoggingSession { // Silently fail } } - - if (this.requestId) { - logger.debug(`[${this.requestId}] Completed logging for execution ${this.executionId}`) - } } catch (error) { + this.completing = false logger.error(`Failed to complete logging for execution ${this.executionId}:`, { requestId: this.requestId, workflowId: this.workflowId, @@ -353,9 +336,10 @@ export class LoggingSession { } async completeWithError(params: SessionErrorCompleteParams = {}): Promise { - if (this.completed) { + if (this.completed || this.completing) { return } + this.completing = true try { const { endedAt, totalDurationMs, error, traceSpans, skipCost } = params @@ -455,6 +439,7 @@ export class LoggingSession { ) } } catch (enhancedError) { + this.completing = false logger.error(`Failed to complete error logging for execution ${this.executionId}:`, { requestId: this.requestId, workflowId: this.workflowId, @@ -467,9 +452,10 @@ export class LoggingSession { } async completeWithCancellation(params: SessionCancelledParams = {}): Promise { - if (this.completed) { + if (this.completed || this.completing) { return } + this.completing = true try { const { endedAt, totalDurationMs, traceSpans } = params @@ -540,6 +526,7 @@ export class LoggingSession { ) } } catch (cancelError) { + this.completing = false logger.error(`Failed to complete cancelled logging for execution ${this.executionId}:`, { requestId: this.requestId, workflowId: this.workflowId, @@ -686,7 +673,28 @@ export class LoggingSession { } } + /** + * Wait for any in-flight fire-and-forget completion to finish. + * Called internally by markAsFailed to ensure completion has settled + * before overwriting execution status. + */ + async waitForCompletion(): Promise { + if (this.completionPromise) { + try { + await this.completionPromise + } catch { + /* already handled by safe* wrapper */ + } + } + } + async safeComplete(params: SessionCompleteParams = {}): Promise { + if (this.completionPromise) return this.completionPromise + this.completionPromise = this._safeCompleteImpl(params) + return this.completionPromise + } + + private async _safeCompleteImpl(params: SessionCompleteParams = {}): Promise { try { await this.complete(params) } catch (error) { @@ -706,6 +714,12 @@ export class LoggingSession { } async safeCompleteWithError(params?: SessionErrorCompleteParams): Promise { + if (this.completionPromise) return this.completionPromise + this.completionPromise = this._safeCompleteWithErrorImpl(params) + return this.completionPromise + } + + private async _safeCompleteWithErrorImpl(params?: SessionErrorCompleteParams): Promise { try { await this.completeWithError(params) } catch (error) { @@ -727,6 +741,12 @@ export class LoggingSession { } async safeCompleteWithCancellation(params?: SessionCancelledParams): Promise { + if (this.completionPromise) return this.completionPromise + this.completionPromise = this._safeCompleteWithCancellationImpl(params) + return this.completionPromise + } + + private async _safeCompleteWithCancellationImpl(params?: SessionCancelledParams): Promise { try { await this.completeWithCancellation(params) } catch (error) { @@ -747,6 +767,12 @@ export class LoggingSession { } async safeCompleteWithPause(params?: SessionPausedParams): Promise { + if (this.completionPromise) return this.completionPromise + this.completionPromise = this._safeCompleteWithPauseImpl(params) + return this.completionPromise + } + + private async _safeCompleteWithPauseImpl(params?: SessionPausedParams): Promise { try { await this.completeWithPause(params) } catch (error) { @@ -767,6 +793,7 @@ export class LoggingSession { } async markAsFailed(errorMessage?: string): Promise { + await this.waitForCompletion() await LoggingSession.markExecutionAsFailed(this.executionId, errorMessage, this.requestId) } @@ -810,9 +837,10 @@ export class LoggingSession { isError: boolean status?: 'completed' | 'failed' | 'cancelled' | 'pending' }): Promise { - if (this.completed) { + if (this.completed || this.completing) { return } + this.completing = true logger.warn( `[${this.requestId || 'unknown'}] Logging completion failed for execution ${this.executionId} - attempting cost-only fallback` @@ -851,6 +879,7 @@ export class LoggingSession { `[${this.requestId || 'unknown'}] Cost-only fallback succeeded for execution ${this.executionId}` ) } catch (fallbackError) { + this.completing = false logger.error( `[${this.requestId || 'unknown'}] Cost-only fallback also failed for execution ${this.executionId}:`, { error: fallbackError instanceof Error ? fallbackError.message : String(fallbackError) } diff --git a/apps/sim/lib/logs/execution/snapshot/service.ts b/apps/sim/lib/logs/execution/snapshot/service.ts index a2f4d0fff1..f6822b74a0 100644 --- a/apps/sim/lib/logs/execution/snapshot/service.ts +++ b/apps/sim/lib/logs/execution/snapshot/service.ts @@ -82,30 +82,6 @@ export class SnapshotService implements ISnapshotService { } } - async getSnapshotByHash( - workflowId: string, - hash: string - ): Promise { - const [snapshot] = await db - .select() - .from(workflowExecutionSnapshots) - .where( - and( - eq(workflowExecutionSnapshots.workflowId, workflowId), - eq(workflowExecutionSnapshots.stateHash, hash) - ) - ) - .limit(1) - - if (!snapshot) return null - - return { - ...snapshot, - stateData: snapshot.stateData as WorkflowState, - createdAt: snapshot.createdAt.toISOString(), - } - } - computeStateHash(state: WorkflowState): string { const normalizedState = normalizeWorkflowState(state) const stateString = normalizedStringify(normalizedState) diff --git a/apps/sim/lib/logs/search-suggestions.ts b/apps/sim/lib/logs/search-suggestions.ts index 907ac4c1de..b59137e7de 100644 --- a/apps/sim/lib/logs/search-suggestions.ts +++ b/apps/sim/lib/logs/search-suggestions.ts @@ -433,7 +433,7 @@ export class SearchSuggestions { value: `date:${partial}`, label: `${this.formatDateLabel(startDate)} to ${this.formatDateLabel(endDate)}`, description: 'Custom date range', - category: 'date' as any, + category: 'date', }) return suggestions } @@ -446,7 +446,7 @@ export class SearchSuggestions { value: `date:${startDate}..`, label: `${this.formatDateLabel(startDate)} to ...`, description: 'Type end date (YYYY-MM-DD)', - category: 'date' as any, + category: 'date', }) return suggestions } @@ -458,7 +458,7 @@ export class SearchSuggestions { value: `date:${partial}`, label: `Year ${partial}`, description: 'All logs from this year', - category: 'date' as any, + category: 'date', }) return suggestions } @@ -486,7 +486,7 @@ export class SearchSuggestions { value: `date:${partial}`, label: `${monthName} ${year}`, description: 'All logs from this month', - category: 'date' as any, + category: 'date', }) return suggestions } @@ -500,7 +500,7 @@ export class SearchSuggestions { value: `date:${partial}`, label: this.formatDateLabel(partial), description: 'Single date', - category: 'date' as any, + category: 'date', }) // Also suggest starting a range suggestions.push({ @@ -508,7 +508,7 @@ export class SearchSuggestions { value: `date:${partial}..`, label: `${this.formatDateLabel(partial)} to ...`, description: 'Start a date range', - category: 'date' as any, + category: 'date', }) } return suggestions @@ -521,7 +521,7 @@ export class SearchSuggestions { value: `date:${partial}`, label: partial, description: 'Continue typing: YYYY, YYYY-MM, or YYYY-MM-DD', - category: 'date' as any, + category: 'date', }) } diff --git a/apps/sim/lib/logs/types.ts b/apps/sim/lib/logs/types.ts index d2775d1ca6..9996dbefec 100644 --- a/apps/sim/lib/logs/types.ts +++ b/apps/sim/lib/logs/types.ts @@ -337,7 +337,6 @@ export interface BatchInsertResult { export interface SnapshotService { createSnapshot(workflowId: string, state: WorkflowState): Promise getSnapshot(id: string): Promise - getSnapshotByHash(workflowId: string, hash: string): Promise computeStateHash(state: WorkflowState): string cleanupOrphanedSnapshots(olderThanDays: number): Promise } diff --git a/apps/sim/lib/mcp/middleware.ts b/apps/sim/lib/mcp/middleware.ts index b342f1cef9..9206cbc7ff 100644 --- a/apps/sim/lib/mcp/middleware.ts +++ b/apps/sim/lib/mcp/middleware.ts @@ -71,9 +71,7 @@ async function validateMcpAuth( workspaceId = body.workspaceId ;(request as any)._parsedBody = body } - } catch (error) { - logger.debug(`[${requestId}] Could not parse request body for workspaceId extraction`) - } + } catch {} } if (!workspaceId) { diff --git a/apps/sim/lib/mcp/service.ts b/apps/sim/lib/mcp/service.ts index 67801f85f1..00ca59267b 100644 --- a/apps/sim/lib/mcp/service.ts +++ b/apps/sim/lib/mcp/service.ts @@ -318,7 +318,6 @@ class McpService { try { const cached = await this.cacheAdapter.get(cacheKey) if (cached) { - logger.debug(`[${requestId}] Using cached tools for user ${userId}`) return cached.tools } } catch (error) { diff --git a/apps/sim/lib/mcp/workflow-mcp-sync.ts b/apps/sim/lib/mcp/workflow-mcp-sync.ts index 4205043cec..6942453086 100644 --- a/apps/sim/lib/mcp/workflow-mcp-sync.ts +++ b/apps/sim/lib/mcp/workflow-mcp-sync.ts @@ -68,7 +68,6 @@ export async function syncMcpToolsForWorkflow(options: SyncOptions): Promise 0 && emails[0].historyId) { latestHistoryId = emails[0].historyId - logger.debug(`[${requestId}] Updated historyId to ${latestHistoryId}`) } return { emails, latestHistoryId } @@ -704,10 +696,6 @@ async function processEmails( ...(config.includeRawEmail ? { rawEmail: email } : {}), } - logger.debug( - `[${requestId}] Sending ${config.includeRawEmail ? 'simplified + raw' : 'simplified'} email payload for ${email.id}` - ) - const webhookUrl = `${getInternalApiBaseUrl()}/api/webhooks/trigger/${webhookData.path}` const response = await fetch(webhookUrl, { diff --git a/apps/sim/lib/webhooks/imap-polling-service.ts b/apps/sim/lib/webhooks/imap-polling-service.ts index 37fc4c6217..28020426e3 100644 --- a/apps/sim/lib/webhooks/imap-polling-service.ts +++ b/apps/sim/lib/webhooks/imap-polling-service.ts @@ -285,7 +285,6 @@ async function fetchNewEmails(config: ImapWebhookConfig, requestId: string) { try { await client.connect() - logger.debug(`[${requestId}] Connected to IMAP server ${config.host}`) const maxEmails = config.maxEmailsPerPoll || 25 let totalEmailsCollected = 0 @@ -295,7 +294,6 @@ async function fetchNewEmails(config: ImapWebhookConfig, requestId: string) { try { const mailbox = await client.mailboxOpen(mailboxPath) - logger.debug(`[${requestId}] Opened mailbox: ${mailbox.path}, exists: ${mailbox.exists}`) // Parse search criteria - expects JSON object from UI let searchCriteria: any = { unseen: true } @@ -335,14 +333,10 @@ async function fetchNewEmails(config: ImapWebhookConfig, requestId: string) { const searchResult = await client.search(searchCriteria, { uid: true }) messageUids = searchResult === false ? [] : searchResult } catch (searchError) { - logger.debug( - `[${requestId}] Search returned no messages for ${mailboxPath}: ${searchError}` - ) continue } if (messageUids.length === 0) { - logger.debug(`[${requestId}] No messages matching criteria in ${mailboxPath}`) continue } @@ -357,8 +351,6 @@ async function fetchNewEmails(config: ImapWebhookConfig, requestId: string) { ) } - logger.info(`[${requestId}] Processing ${uidsToProcess.length} emails from ${mailboxPath}`) - for await (const msg of client.fetch( uidsToProcess, { @@ -384,7 +376,6 @@ async function fetchNewEmails(config: ImapWebhookConfig, requestId: string) { } await client.logout() - logger.debug(`[${requestId}] Disconnected from IMAP server`) return { emails, latestUidByMailbox } } catch (error) { diff --git a/apps/sim/lib/webhooks/processor.ts b/apps/sim/lib/webhooks/processor.ts index 047d862ef7..5abfd9f651 100644 --- a/apps/sim/lib/webhooks/processor.ts +++ b/apps/sim/lib/webhooks/processor.ts @@ -107,7 +107,6 @@ export async function parseWebhookBody( // Allow empty body - some webhooks send empty payloads if (!rawBody || rawBody.length === 0) { - logger.debug(`[${requestId}] Received request with empty body, treating as empty object`) return { body: {}, rawBody: '' } } } catch (bodyError) { @@ -127,19 +126,15 @@ export async function parseWebhookBody( if (payloadString) { body = JSON.parse(payloadString) - logger.debug(`[${requestId}] Parsed form-encoded GitHub webhook payload`) } else { body = Object.fromEntries(formData.entries()) - logger.debug(`[${requestId}] Parsed form-encoded webhook data (direct fields)`) } } else { body = JSON.parse(rawBody) - logger.debug(`[${requestId}] Parsed JSON webhook payload`) } // Allow empty JSON objects - some webhooks send empty payloads if (Object.keys(body).length === 0) { - logger.debug(`[${requestId}] Received empty JSON object`) } } catch (parseError) { logger.error(`[${requestId}] Failed to parse webhook body`, { @@ -499,8 +494,6 @@ export async function verifyProviderAuth( logger.warn(`[${requestId}] Microsoft Teams HMAC signature verification failed`) return new NextResponse('Unauthorized - Invalid HMAC signature', { status: 401 }) } - - logger.debug(`[${requestId}] Microsoft Teams HMAC signature verified successfully`) } } @@ -578,8 +571,6 @@ export async function verifyProviderAuth( }) return new NextResponse('Unauthorized - Invalid Twilio signature', { status: 401 }) } - - logger.debug(`[${requestId}] Twilio Voice signature verified successfully`) } } @@ -603,8 +594,6 @@ export async function verifyProviderAuth( }) return new NextResponse('Unauthorized - Invalid Typeform signature', { status: 401 }) } - - logger.debug(`[${requestId}] Typeform signature verified successfully`) } } @@ -628,8 +617,6 @@ export async function verifyProviderAuth( }) return new NextResponse('Unauthorized - Invalid Linear signature', { status: 401 }) } - - logger.debug(`[${requestId}] Linear signature verified successfully`) } } @@ -653,8 +640,6 @@ export async function verifyProviderAuth( }) return new NextResponse('Unauthorized - Invalid Circleback signature', { status: 401 }) } - - logger.debug(`[${requestId}] Circleback signature verified successfully`) } } @@ -678,8 +663,6 @@ export async function verifyProviderAuth( }) return new NextResponse('Unauthorized - Invalid Cal.com signature', { status: 401 }) } - - logger.debug(`[${requestId}] Cal.com signature verified successfully`) } } @@ -703,8 +686,6 @@ export async function verifyProviderAuth( }) return new NextResponse('Unauthorized - Invalid Jira signature', { status: 401 }) } - - logger.debug(`[${requestId}] Jira signature verified successfully`) } } @@ -728,8 +709,6 @@ export async function verifyProviderAuth( }) return new NextResponse('Unauthorized - Invalid Confluence signature', { status: 401 }) } - - logger.debug(`[${requestId}] Confluence signature verified successfully`) } } @@ -757,10 +736,6 @@ export async function verifyProviderAuth( }) return new NextResponse('Unauthorized - Invalid GitHub signature', { status: 401 }) } - - logger.debug(`[${requestId}] GitHub signature verified successfully`, { - usingSha256: !!signature256, - }) } } @@ -784,8 +759,6 @@ export async function verifyProviderAuth( }) return new NextResponse('Unauthorized - Invalid Fireflies signature', { status: 401 }) } - - logger.debug(`[${requestId}] Fireflies signature verified successfully`) } } @@ -870,10 +843,6 @@ export async function checkWebhookPreprocessing( return NextResponse.json({ error: error.message }, { status: error.statusCode }) } - logger.debug(`[${requestId}] Webhook preprocessing passed`, { - provider: foundWebhook.provider, - }) - return null } catch (preprocessError) { logger.error(`[${requestId}] Error during webhook preprocessing:`, preprocessError) diff --git a/apps/sim/lib/webhooks/rss-polling-service.ts b/apps/sim/lib/webhooks/rss-polling-service.ts index d75daa62fb..a7bff5af1c 100644 --- a/apps/sim/lib/webhooks/rss-polling-service.ts +++ b/apps/sim/lib/webhooks/rss-polling-service.ts @@ -263,8 +263,6 @@ async function fetchNewRssItems( requestId: string ): Promise<{ feed: RssFeed; items: RssItem[] }> { try { - logger.debug(`[${requestId}] Fetching RSS feed: ${config.feedUrl}`) - const urlValidation = await validateUrlWithDNS(config.feedUrl, 'feedUrl') if (!urlValidation.isValid) { logger.error(`[${requestId}] Invalid RSS feed URL: ${urlValidation.error}`) @@ -288,7 +286,6 @@ async function fetchNewRssItems( const feed = await parser.parseString(xmlContent) if (!feed.items || !feed.items.length) { - logger.debug(`[${requestId}] No items in feed`) return { feed: feed as RssFeed, items: [] } } diff --git a/apps/sim/lib/webhooks/utils.server.ts b/apps/sim/lib/webhooks/utils.server.ts index 8603b5bfbf..f63983577b 100644 --- a/apps/sim/lib/webhooks/utils.server.ts +++ b/apps/sim/lib/webhooks/utils.server.ts @@ -67,7 +67,6 @@ export async function handleWhatsAppVerification( const verificationToken = providerConfig.verificationToken if (!verificationToken) { - logger.debug(`[${requestId}] Webhook ${wh.id} has no verification token, skipping`) continue } @@ -1599,7 +1598,6 @@ export function verifyProviderWebhook( case 'telegram': { // Check User-Agent to ensure it's not blocked by middleware const userAgent = request.headers.get('user-agent') || '' - logger.debug(`[${requestId}] Telegram webhook request received with User-Agent: ${userAgent}`) if (!userAgent) { logger.warn( @@ -1613,8 +1611,6 @@ export function verifyProviderWebhook( request.headers.get('x-real-ip') || 'unknown' - logger.debug(`[${requestId}] Telegram webhook request from IP: ${clientIp}`) - break } case 'microsoft-teams': @@ -1774,14 +1770,8 @@ export async function fetchAndProcessAirtablePayloads( if (storedCursor && typeof storedCursor === 'number') { currentCursor = storedCursor - logger.debug( - `[${requestId}] Using stored cursor: ${currentCursor} for webhook ${webhookData.id}` - ) } else { currentCursor = null - logger.debug( - `[${requestId}] No valid stored cursor for webhook ${webhookData.id}, starting from beginning` - ) } let accessToken: string | null = null @@ -1797,8 +1787,6 @@ export async function fetchAndProcessAirtablePayloads( ) throw new Error('Airtable access token not found.') } - - logger.info(`[${requestId}] Successfully obtained Airtable access token`) } catch (tokenError: any) { logger.error( `[${requestId}] Failed to get Airtable OAuth token for credential ${credentialId}`, @@ -1818,10 +1806,6 @@ export async function fetchAndProcessAirtablePayloads( apiCallCount++ // Safety break if (apiCallCount > 10) { - logger.warn(`[${requestId}] Reached maximum polling limit (10 calls)`, { - webhookId: webhookData.id, - consolidatedCount: consolidatedChangesMap.size, - }) mightHaveMore = false break } @@ -1833,11 +1817,6 @@ export async function fetchAndProcessAirtablePayloads( } const fullUrl = `${apiUrl}?${queryParams.toString()}` - logger.debug(`[${requestId}] Fetching Airtable payloads (call ${apiCallCount})`, { - url: fullUrl, - webhookId: webhookData.id, - }) - try { const fetchStartTime = Date.now() const response = await fetch(fullUrl, { @@ -1848,14 +1827,6 @@ export async function fetchAndProcessAirtablePayloads( }, }) - // DEBUG: Log API response time - logger.debug(`[${requestId}] TRACE: Airtable API response received`, { - status: response.status, - duration: `${Date.now() - fetchStartTime}ms`, - hasBody: true, - apiCall: apiCallCount, - }) - const responseBody = await response.json() if (!response.ok || responseBody.error) { @@ -1877,9 +1848,6 @@ export async function fetchAndProcessAirtablePayloads( } const receivedPayloads = responseBody.payloads || [] - logger.debug( - `[${requestId}] Received ${receivedPayloads.length} payloads from Airtable (call ${apiCallCount})` - ) // --- Process and Consolidate Changes --- if (receivedPayloads.length > 0) { @@ -1891,13 +1859,6 @@ export async function fetchAndProcessAirtablePayloads( let changeCount = 0 for (const payload of receivedPayloads) { if (payload.changedTablesById) { - // DEBUG: Log tables being processed - const tableIds = Object.keys(payload.changedTablesById) - logger.debug(`[${requestId}] TRACE: Processing changes for tables`, { - tables: tableIds, - payloadTimestamp: payload.timestamp, - }) - for (const [tableId, tableChangesUntyped] of Object.entries( payload.changedTablesById )) { @@ -1907,10 +1868,6 @@ export async function fetchAndProcessAirtablePayloads( if (tableChanges.createdRecordsById) { const createdCount = Object.keys(tableChanges.createdRecordsById).length changeCount += createdCount - // DEBUG: Log created records count - logger.debug( - `[${requestId}] TRACE: Processing ${createdCount} created records for table ${tableId}` - ) for (const [recordId, recordDataUntyped] of Object.entries( tableChanges.createdRecordsById @@ -1940,10 +1897,6 @@ export async function fetchAndProcessAirtablePayloads( if (tableChanges.changedRecordsById) { const updatedCount = Object.keys(tableChanges.changedRecordsById).length changeCount += updatedCount - // DEBUG: Log updated records count - logger.debug( - `[${requestId}] TRACE: Processing ${updatedCount} updated records for table ${tableId}` - ) for (const [recordId, recordDataUntyped] of Object.entries( tableChanges.changedRecordsById @@ -1980,21 +1933,12 @@ export async function fetchAndProcessAirtablePayloads( } } } - - // DEBUG: Log totals for this batch - logger.debug( - `[${requestId}] TRACE: Processed ${changeCount} changes in API call ${apiCallCount})`, - { - currentMapSize: consolidatedChangesMap.size, - } - ) } const nextCursor = responseBody.cursor mightHaveMore = responseBody.mightHaveMore || false if (nextCursor && typeof nextCursor === 'number' && nextCursor !== currentCursor) { - logger.debug(`[${requestId}] Updating cursor from ${currentCursor} to ${nextCursor}`) currentCursor = nextCursor // Follow exactly the old implementation - use awaited update instead of parallel @@ -2031,7 +1975,6 @@ export async function fetchAndProcessAirtablePayloads( }) mightHaveMore = false } else if (nextCursor === currentCursor) { - logger.debug(`[${requestId}] Cursor hasn't changed (${currentCursor}), stopping poll`) mightHaveMore = false // Explicitly stop if cursor hasn't changed } } catch (fetchError: any) { @@ -2123,14 +2066,6 @@ export async function fetchAndProcessAirtablePayloads( ) // Error logging handled by logging session } - - // DEBUG: Log function completion - logger.debug(`[${requestId}] TRACE: fetchAndProcessAirtablePayloads completed`, { - totalFetched: payloadsFetched, - totalApiCalls: apiCallCount, - totalChanges: consolidatedChangesMap.size, - timestamp: new Date().toISOString(), - }) } // Define an interface for AirtableChange diff --git a/apps/sim/lib/workflows/executor/execution-core.ts b/apps/sim/lib/workflows/executor/execution-core.ts index b7437bd090..b96a872dbf 100644 --- a/apps/sim/lib/workflows/executor/execution-core.ts +++ b/apps/sim/lib/workflows/executor/execution-core.ts @@ -342,8 +342,6 @@ export async function executeWorkflowCore( contextExtensions, }) - loggingSession.setupExecutor(executorInstance) - // Convert initial workflow variables to their native types if (workflowVariables) { for (const [varId, variable] of Object.entries(workflowVariables)) { @@ -362,60 +360,52 @@ export async function executeWorkflowCore( )) as ExecutionResult) : ((await executorInstance.execute(workflowId, resolvedTriggerBlockId)) as ExecutionResult) - // Build trace spans for logging from the full execution result - const { traceSpans, totalDuration } = buildTraceSpans(result) - - // Update workflow run counts - if (result.success && result.status !== 'paused') { - await updateWorkflowRunCounts(workflowId) - } - - if (result.status === 'cancelled') { - await loggingSession.safeCompleteWithCancellation({ - endedAt: new Date().toISOString(), - totalDurationMs: totalDuration || 0, - traceSpans: traceSpans || [], - }) - - await clearExecutionCancellation(executionId) - - logger.info(`[${requestId}] Workflow execution cancelled`, { - duration: result.metadata?.duration, - }) - - return result - } - - if (result.status === 'paused') { - await loggingSession.safeCompleteWithPause({ - endedAt: new Date().toISOString(), - totalDurationMs: totalDuration || 0, - traceSpans: traceSpans || [], - workflowInput: processedInput, - }) - - await clearExecutionCancellation(executionId) - - logger.info(`[${requestId}] Workflow execution paused`, { - duration: result.metadata?.duration, - }) - - return result - } + // Fire-and-forget: post-execution logging, billing, and cleanup + void (async () => { + try { + const { traceSpans, totalDuration } = buildTraceSpans(result) + + if (result.success && result.status !== 'paused') { + try { + await updateWorkflowRunCounts(workflowId) + } catch (runCountError) { + logger.error(`[${requestId}] Failed to update run counts`, { error: runCountError }) + } + } - await loggingSession.safeComplete({ - endedAt: new Date().toISOString(), - totalDurationMs: totalDuration || 0, - finalOutput: result.output || {}, - traceSpans: traceSpans || [], - workflowInput: processedInput, - executionState: result.executionState, - }) + if (result.status === 'cancelled') { + await loggingSession.safeCompleteWithCancellation({ + endedAt: new Date().toISOString(), + totalDurationMs: totalDuration || 0, + traceSpans: traceSpans || [], + }) + } else if (result.status === 'paused') { + await loggingSession.safeCompleteWithPause({ + endedAt: new Date().toISOString(), + totalDurationMs: totalDuration || 0, + traceSpans: traceSpans || [], + workflowInput: processedInput, + }) + } else { + await loggingSession.safeComplete({ + endedAt: new Date().toISOString(), + totalDurationMs: totalDuration || 0, + finalOutput: result.output || {}, + traceSpans: traceSpans || [], + workflowInput: processedInput, + executionState: result.executionState, + }) + } - await clearExecutionCancellation(executionId) + await clearExecutionCancellation(executionId) + } catch (postExecError) { + logger.error(`[${requestId}] Post-execution logging failed`, { error: postExecError }) + } + })() logger.info(`[${requestId}] Workflow execution completed`, { success: result.success, + status: result.status, duration: result.metadata?.duration, }) @@ -423,20 +413,31 @@ export async function executeWorkflowCore( } catch (error: unknown) { logger.error(`[${requestId}] Execution failed:`, error) - const executionResult = hasExecutionResult(error) ? error.executionResult : undefined - const { traceSpans } = executionResult ? buildTraceSpans(executionResult) : { traceSpans: [] } - - await loggingSession.safeCompleteWithError({ - endedAt: new Date().toISOString(), - totalDurationMs: executionResult?.metadata?.duration || 0, - error: { - message: error instanceof Error ? error.message : 'Execution failed', - stackTrace: error instanceof Error ? error.stack : undefined, - }, - traceSpans, - }) - - await clearExecutionCancellation(executionId) + // Fire-and-forget: error logging and cleanup + void (async () => { + try { + const executionResult = hasExecutionResult(error) ? error.executionResult : undefined + const { traceSpans } = executionResult + ? buildTraceSpans(executionResult) + : { traceSpans: [] } + + await loggingSession.safeCompleteWithError({ + endedAt: new Date().toISOString(), + totalDurationMs: executionResult?.metadata?.duration || 0, + error: { + message: error instanceof Error ? error.message : 'Execution failed', + stackTrace: error instanceof Error ? error.stack : undefined, + }, + traceSpans, + }) + + await clearExecutionCancellation(executionId) + } catch (postExecError) { + logger.error(`[${requestId}] Post-execution error logging failed`, { + error: postExecError, + }) + } + })() throw error } diff --git a/apps/sim/lib/workflows/persistence/utils.ts b/apps/sim/lib/workflows/persistence/utils.ts index 3e2a16c5fa..b9d70021d9 100644 --- a/apps/sim/lib/workflows/persistence/utils.ts +++ b/apps/sim/lib/workflows/persistence/utils.ts @@ -76,7 +76,10 @@ export async function blockExistsInDeployment( } } -export async function loadDeployedWorkflowState(workflowId: string): Promise { +export async function loadDeployedWorkflowState( + workflowId: string, + providedWorkspaceId?: string +): Promise { try { const [active] = await db .select({ @@ -100,15 +103,19 @@ export async function loadDeployedWorkflowState(workflowId: string): Promise } - const [wfRow] = await db - .select({ workspaceId: workflow.workspaceId }) - .from(workflow) - .where(eq(workflow.id, workflowId)) - .limit(1) + let resolvedWorkspaceId = providedWorkspaceId + if (!resolvedWorkspaceId) { + const [wfRow] = await db + .select({ workspaceId: workflow.workspaceId }) + .from(workflow) + .where(eq(workflow.id, workflowId)) + .limit(1) + resolvedWorkspaceId = wfRow?.workspaceId ?? undefined + } const resolvedBlocks = state.blocks || {} - const { blocks: migratedBlocks } = wfRow?.workspaceId - ? await migrateCredentialIds(resolvedBlocks, wfRow.workspaceId) + const { blocks: migratedBlocks } = resolvedWorkspaceId + ? await migrateCredentialIds(resolvedBlocks, resolvedWorkspaceId) : { blocks: resolvedBlocks } return {