Skip to content

Commit 12aa7e2

Browse files
committed
Run async execs in background
1 parent a925c04 commit 12aa7e2

2 files changed

Lines changed: 122 additions & 12 deletions

File tree

apps/api/src/routes/execs.test.ts

Lines changed: 22 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -332,7 +332,7 @@ describe.skipIf(!RUN_API_INTEGRATION_TESTS)('GET /v1/sandboxes/:id/exec/:execId
332332
expect(body.ended_at).toBeDefined()
333333
})
334334

335-
test('returns queued exec from async execution', async () => {
335+
test('returns completed exec from async execution (background fiber finishes)', async () => {
336336
const env = createTestEnv()
337337
const sandboxId = await createRunningSandbox(env)
338338

@@ -347,6 +347,9 @@ describe.skipIf(!RUN_API_INTEGRATION_TESTS)('GET /v1/sandboxes/:id/exec/:execId
347347
)
348348
const execBody = (yield* execRes.json) as { exec_id: string }
349349

350+
// Allow the daemon fiber to complete
351+
yield* Effect.yieldNow()
352+
350353
const getRes = yield* client.execute(
351354
HttpClientRequest.get(
352355
`/v1/sandboxes/${sandboxId}/exec/${execBody.exec_id}`,
@@ -359,8 +362,9 @@ describe.skipIf(!RUN_API_INTEGRATION_TESTS)('GET /v1/sandboxes/:id/exec/:execId
359362

360363
expect(result.status).toBe(200)
361364
const body = result.body as Record<string, unknown>
362-
expect(body.status).toBe('queued')
363-
expect(body.exit_code).toBeNull()
365+
// Background fiber runs the exec and transitions to done
366+
expect(body.status).toBe('done')
367+
expect(body.exit_code).toBe(0)
364368
})
365369

366370
test('returns 404 for unknown exec', async () => {
@@ -474,13 +478,16 @@ describe.skipIf(!RUN_API_INTEGRATION_TESTS)('GET /v1/sandboxes/:id/execs — lis
474478
HttpClientRequest.bodyUnsafeJson({ cmd: ['echo', 'sync'] }),
475479
),
476480
)
477-
// Async exec → queued
481+
// Async exec → also completes to done (background fiber runs immediately in tests)
478482
yield* client.execute(
479483
HttpClientRequest.post(`/v1/sandboxes/${sandboxId}/exec`).pipe(
480484
HttpClientRequest.bodyUnsafeJson({ cmd: ['sleep', '10'], wait: false }),
481485
),
482486
)
483487

488+
// Allow the daemon fiber to complete
489+
yield* Effect.yieldNow()
490+
484491
const response = yield* client.execute(
485492
HttpClientRequest.get(`/v1/sandboxes/${sandboxId}/execs?status=done`),
486493
)
@@ -491,8 +498,10 @@ describe.skipIf(!RUN_API_INTEGRATION_TESTS)('GET /v1/sandboxes/:id/execs — lis
491498

492499
expect(result.status).toBe(200)
493500
const body = result.body as { execs: Array<{ status: string }> }
494-
expect(body.execs.length).toBe(1)
501+
// Both execs complete to done (background fiber finishes immediately with in-memory services)
502+
expect(body.execs.length).toBe(2)
495503
expect(body.execs[0].status).toBe('done')
504+
expect(body.execs[1].status).toBe('done')
496505
})
497506

498507
test('returns 404 for unknown sandbox', async () => {
@@ -710,7 +719,7 @@ describe.skipIf(!RUN_API_INTEGRATION_TESTS)('Exec status transitions', () => {
710719
expect(result.ended_at).toBeDefined()
711720
})
712721

713-
test('async exec stays queued until processed', async () => {
722+
test('async exec transitions to done after background fiber completes', async () => {
714723
const env = createTestEnv()
715724
const sandboxId = await createRunningSandbox(env)
716725

@@ -725,6 +734,9 @@ describe.skipIf(!RUN_API_INTEGRATION_TESTS)('Exec status transitions', () => {
725734
)
726735
const execBody = (yield* execRes.json) as { exec_id: string }
727736

737+
// Allow the daemon fiber to complete
738+
yield* Effect.yieldNow()
739+
728740
const getRes = yield* client.execute(
729741
HttpClientRequest.get(
730742
`/v1/sandboxes/${sandboxId}/exec/${execBody.exec_id}`,
@@ -735,10 +747,10 @@ describe.skipIf(!RUN_API_INTEGRATION_TESTS)('Exec status transitions', () => {
735747
}),
736748
)
737749

738-
expect(result.status).toBe('queued')
739-
expect(result.exit_code).toBeNull()
740-
expect(result.started_at).toBeNull()
741-
expect(result.ended_at).toBeNull()
750+
expect(result.status).toBe('done')
751+
expect(result.exit_code).toBe(0)
752+
expect(result.started_at).toBeDefined()
753+
expect(result.ended_at).toBeDefined()
742754
})
743755
})
744756

apps/api/src/routes/execs.ts

Lines changed: 100 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import { HttpRouter, HttpServerRequest, HttpServerResponse } from '@effect/platform'
2-
import { Effect } from 'effect'
2+
import { Cause, Effect } from 'effect'
33
import {
44
generateUUIDv7,
55
idToBytes,
@@ -68,6 +68,92 @@ function rowToExec(row: ExecRow): Exec {
6868
}
6969
}
7070

71+
// -- Background async exec ----------------------------------------------------
72+
73+
const runAsyncExec = (
74+
execId: Uint8Array,
75+
execIdStr: string,
76+
sandboxIdBytes: Uint8Array,
77+
cmdArray: string[],
78+
cwd: string,
79+
env: Record<string, string>,
80+
timeoutSeconds: number,
81+
) =>
82+
Effect.gen(function* () {
83+
const execRepo = yield* ExecRepo
84+
const nodeClient = yield* NodeClient
85+
const redis = yield* RedisService
86+
87+
// Transition to running
88+
yield* execRepo.updateStatus(execId, 'running', { startedAt: new Date() })
89+
90+
const result = yield* nodeClient.exec({
91+
sandboxId: sandboxIdBytes,
92+
execId: execIdStr,
93+
cmd: cmdArray,
94+
cwd,
95+
env,
96+
timeoutSeconds,
97+
})
98+
99+
// Push events to Redis for SSE consumption
100+
let eventSeq = 0
101+
const now = new Date().toISOString()
102+
103+
if (result.stdout) {
104+
eventSeq++
105+
yield* redis.pushExecEvent(
106+
execIdStr,
107+
{ seq: eventSeq, ts: now, data: { seq: eventSeq, t: 'stdout', data: result.stdout } },
108+
EVENT_TTL_SECONDS,
109+
)
110+
}
111+
if (result.stderr) {
112+
eventSeq++
113+
yield* redis.pushExecEvent(
114+
execIdStr,
115+
{ seq: eventSeq, ts: now, data: { seq: eventSeq, t: 'stderr', data: result.stderr } },
116+
EVENT_TTL_SECONDS,
117+
)
118+
}
119+
eventSeq++
120+
yield* redis.pushExecEvent(
121+
execIdStr,
122+
{
123+
seq: eventSeq,
124+
ts: now,
125+
data: {
126+
seq: eventSeq,
127+
t: 'exit',
128+
code: result.exitCode,
129+
duration_ms: result.durationMs,
130+
resource_usage: {
131+
cpu_ms: result.cpuMs,
132+
peak_memory_bytes: result.peakMemoryBytes,
133+
},
134+
},
135+
},
136+
EVENT_TTL_SECONDS,
137+
)
138+
139+
// Update exec row to done
140+
yield* execRepo.updateStatus(execId, 'done', {
141+
exitCode: result.exitCode,
142+
cpuMs: result.cpuMs,
143+
peakMemoryBytes: result.peakMemoryBytes,
144+
durationMs: result.durationMs,
145+
endedAt: new Date(),
146+
})
147+
}).pipe(
148+
Effect.catchAllCause((cause) =>
149+
Effect.gen(function* () {
150+
yield* Effect.logError(`Async exec failed for ${execIdStr}: ${Cause.pretty(cause)}`)
151+
const execRepo = yield* ExecRepo
152+
yield* execRepo.updateStatus(execId, 'failed', { endedAt: new Date() })
153+
}).pipe(Effect.catchAll(() => Effect.void)),
154+
),
155+
)
156+
71157
// -- Execute command ----------------------------------------------------------
72158

73159
const execCommand = Effect.gen(function* () {
@@ -185,8 +271,20 @@ const execCommand = Effect.gen(function* () {
185271

186272
const execIdStr = bytesToId(EXEC_PREFIX, execId)
187273

188-
// Async mode: return immediately
274+
// Async mode: fire background execution and return immediately
189275
if (!wait) {
276+
yield* Effect.forkDaemon(
277+
runAsyncExec(
278+
execId,
279+
execIdStr,
280+
sandboxIdBytes,
281+
cmdArray,
282+
body.cwd ?? DEFAULT_CWD,
283+
body.env ?? {},
284+
timeoutSeconds,
285+
),
286+
)
287+
190288
const response: ExecAsyncResponse = {
191289
exec_id: execIdStr,
192290
status: 'queued',

0 commit comments

Comments
 (0)