Skip to content

Commit a1b91c1

Browse files
waleedlatif1claude
andcommitted
fix(processing): unblock error responses and isolate run-count failures
Remove unnecessary `await waitForCompletion()` from non-SSE and SSE error paths where no `markAsFailed()` follows — these were blocking error responses on log persistence for no reason. Wrap `updateWorkflowRunCounts` in its own try/catch so a run-count DB failure cannot prevent session completion, billing, and trace span persistence. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 1b3708a commit a1b91c1

File tree

3 files changed

+46
-25
lines changed

3 files changed

+46
-25
lines changed

apps/sim/app/api/workflows/[id]/execute/route.ts

Lines changed: 4 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@ import { createExecutionEventWriter, setExecutionMeta } from '@/lib/execution/ev
2222
import { processInputFileFields } from '@/lib/execution/files'
2323
import { preprocessExecution } from '@/lib/execution/preprocessing'
2424
import { LoggingSession } from '@/lib/logs/execution/logging-session'
25-
import { buildTraceSpans } from '@/lib/logs/execution/trace-spans/trace-spans'
2625
import {
2726
cleanupExecutionBase64Cache,
2827
hydrateUserFilesWithBase64,
@@ -642,6 +641,7 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
642641
logger.info(`[${requestId}] Non-SSE execution timed out`, {
643642
timeoutMs: timeoutController.timeoutMs,
644643
})
644+
await loggingSession.waitForCompletion()
645645
await loggingSession.markAsFailed(timeoutErrorMessage)
646646

647647
return NextResponse.json(
@@ -698,20 +698,6 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
698698

699699
const executionResult = hasExecutionResult(error) ? error.executionResult : undefined
700700

701-
// Fire-and-forget: execution-core.ts already handles logging via its own
702-
// fire-and-forget call. The `completed` guard in LoggingSession prevents
703-
// double-writes, so this is a no-op — but we avoid awaiting it to reduce
704-
// error-response latency.
705-
const { traceSpans, totalDuration } = executionResult
706-
? buildTraceSpans(executionResult)
707-
: { traceSpans: [], totalDuration: 0 }
708-
709-
void loggingSession.safeCompleteWithError({
710-
totalDurationMs: totalDuration || executionResult?.metadata?.duration,
711-
error: { message: errorMessage },
712-
traceSpans,
713-
})
714-
715701
return NextResponse.json(
716702
{
717703
success: false,
@@ -1041,6 +1027,7 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
10411027
logger.error(`[${requestId}] Missing snapshot seed for paused execution`, {
10421028
executionId,
10431029
})
1030+
await loggingSession.waitForCompletion()
10441031
await loggingSession.markAsFailed('Missing snapshot seed for paused execution')
10451032
} else {
10461033
try {
@@ -1056,6 +1043,7 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
10561043
executionId,
10571044
error: pauseError instanceof Error ? pauseError.message : String(pauseError),
10581045
})
1046+
await loggingSession.waitForCompletion()
10591047
await loggingSession.markAsFailed(
10601048
`Failed to persist pause state: ${pauseError instanceof Error ? pauseError.message : String(pauseError)}`
10611049
)
@@ -1072,6 +1060,7 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
10721060
timeoutMs: timeoutController.timeoutMs,
10731061
})
10741062

1063+
await loggingSession.waitForCompletion()
10751064
await loggingSession.markAsFailed(timeoutErrorMessage)
10761065

10771066
sendEvent({
@@ -1133,15 +1122,6 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
11331122
logger.error(`[${requestId}] SSE execution failed: ${errorMessage}`, { isTimeout })
11341123

11351124
const executionResult = hasExecutionResult(error) ? error.executionResult : undefined
1136-
const { traceSpans, totalDuration } = executionResult
1137-
? buildTraceSpans(executionResult)
1138-
: { traceSpans: [], totalDuration: 0 }
1139-
1140-
await loggingSession.safeCompleteWithError({
1141-
totalDurationMs: totalDuration || executionResult?.metadata?.duration,
1142-
error: { message: errorMessage },
1143-
traceSpans,
1144-
})
11451125

11461126
sendEvent({
11471127
type: 'execution:error',

apps/sim/lib/logs/execution/logging-session.ts

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,8 @@ export class LoggingSession {
9191
private completed = false
9292
/** Synchronous flag to prevent concurrent completion attempts (race condition guard) */
9393
private completing = false
94+
/** Tracks the in-flight completion promise so callers can await it */
95+
private completionPromise: Promise<void> | null = null
9496
private accumulatedCost: AccumulatedCost = {
9597
total: BASE_EXECUTION_CHARGE,
9698
input: 0,
@@ -694,7 +696,27 @@ export class LoggingSession {
694696
}
695697
}
696698

699+
/**
700+
* Wait for any in-flight fire-and-forget completion to finish.
701+
* Used by callers (e.g. markAsFailed) that need to ensure completion
702+
* has settled before overwriting execution status.
703+
*/
704+
async waitForCompletion(): Promise<void> {
705+
if (this.completionPromise) {
706+
try {
707+
await this.completionPromise
708+
} catch {
709+
/* already handled by safe* wrapper */
710+
}
711+
}
712+
}
713+
697714
async safeComplete(params: SessionCompleteParams = {}): Promise<void> {
715+
this.completionPromise = this._safeCompleteImpl(params)
716+
return this.completionPromise
717+
}
718+
719+
private async _safeCompleteImpl(params: SessionCompleteParams = {}): Promise<void> {
698720
try {
699721
await this.complete(params)
700722
} catch (error) {
@@ -714,6 +736,11 @@ export class LoggingSession {
714736
}
715737

716738
async safeCompleteWithError(params?: SessionErrorCompleteParams): Promise<void> {
739+
this.completionPromise = this._safeCompleteWithErrorImpl(params)
740+
return this.completionPromise
741+
}
742+
743+
private async _safeCompleteWithErrorImpl(params?: SessionErrorCompleteParams): Promise<void> {
717744
try {
718745
await this.completeWithError(params)
719746
} catch (error) {
@@ -735,6 +762,11 @@ export class LoggingSession {
735762
}
736763

737764
async safeCompleteWithCancellation(params?: SessionCancelledParams): Promise<void> {
765+
this.completionPromise = this._safeCompleteWithCancellationImpl(params)
766+
return this.completionPromise
767+
}
768+
769+
private async _safeCompleteWithCancellationImpl(params?: SessionCancelledParams): Promise<void> {
738770
try {
739771
await this.completeWithCancellation(params)
740772
} catch (error) {
@@ -755,6 +787,11 @@ export class LoggingSession {
755787
}
756788

757789
async safeCompleteWithPause(params?: SessionPausedParams): Promise<void> {
790+
this.completionPromise = this._safeCompleteWithPauseImpl(params)
791+
return this.completionPromise
792+
}
793+
794+
private async _safeCompleteWithPauseImpl(params?: SessionPausedParams): Promise<void> {
758795
try {
759796
await this.completeWithPause(params)
760797
} catch (error) {

apps/sim/lib/workflows/executor/execution-core.ts

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -368,7 +368,11 @@ export async function executeWorkflowCore(
368368
const { traceSpans, totalDuration } = buildTraceSpans(result)
369369

370370
if (result.success && result.status !== 'paused') {
371-
await updateWorkflowRunCounts(workflowId)
371+
try {
372+
await updateWorkflowRunCounts(workflowId)
373+
} catch (runCountError) {
374+
logger.error(`[${requestId}] Failed to update run counts`, { error: runCountError })
375+
}
372376
}
373377

374378
if (result.status === 'cancelled') {

0 commit comments

Comments
 (0)