diff --git a/apps/server/src/orchestration/Layers/ProjectionPipeline.test.ts b/apps/server/src/orchestration/Layers/ProjectionPipeline.test.ts index 369eea0f7a0..b0f056da104 100644 --- a/apps/server/src/orchestration/Layers/ProjectionPipeline.test.ts +++ b/apps/server/src/orchestration/Layers/ProjectionPipeline.test.ts @@ -23,7 +23,10 @@ import { makeSqlitePersistenceLive, SqlitePersistenceMemory, } from "../../persistence/Layers/Sqlite.ts"; -import { OrchestrationEventStore } from "../../persistence/Services/OrchestrationEventStore.ts"; +import { + OrchestrationEventStore, + type OrchestrationEventStoreShape, +} from "../../persistence/Services/OrchestrationEventStore.ts"; import { RepositoryIdentityResolverLive } from "../../project/Layers/RepositoryIdentityResolver.ts"; import { OrchestrationEngineLive } from "./OrchestrationEngine.ts"; import { @@ -52,6 +55,134 @@ const exists = (filePath: string) => const BaseTestLayer = makeProjectionPipelinePrefixedTestLayer("t3-projection-pipeline-test-"); +const appendCleanRebuildTurnScenario = Effect.fn("appendCleanRebuildTurnScenario")(function* ( + eventStore: OrchestrationEventStoreShape, + input: { + readonly slug: string; + readonly projectId: ProjectId; + readonly threadId: ThreadId; + readonly turnId: TurnId; + readonly messageId: MessageId; + readonly createdAt: string; + readonly runningAt: string; + readonly finalAt: string; + readonly finalStatus: "stopped" | "error"; + readonly finalLastError: string; + }, +) { + yield* eventStore.append({ + type: "project.created", + eventId: EventId.make(`evt-${input.slug}-project`), + aggregateKind: "project", + aggregateId: input.projectId, + occurredAt: input.createdAt, + commandId: CommandId.make(`cmd-${input.slug}-project`), + causationEventId: null, + correlationId: CorrelationId.make(`cmd-${input.slug}-project`), + metadata: {}, + payload: { + projectId: input.projectId, + title: `Project ${input.slug}`, + workspaceRoot: `/tmp/${input.slug}`, + defaultModelSelection: null, + scripts: [], + createdAt: input.createdAt, + updatedAt: input.createdAt, + }, + }); + yield* eventStore.append({ + type: "thread.created", + eventId: EventId.make(`evt-${input.slug}-thread`), + aggregateKind: "thread", + aggregateId: input.threadId, + occurredAt: input.createdAt, + commandId: CommandId.make(`cmd-${input.slug}-thread`), + causationEventId: null, + correlationId: CorrelationId.make(`cmd-${input.slug}-thread`), + metadata: {}, + payload: { + threadId: input.threadId, + projectId: input.projectId, + title: `Thread ${input.slug}`, + modelSelection: { + instanceId: ProviderInstanceId.make("codex"), + model: "gpt-5-codex", + }, + runtimeMode: "approval-required", + interactionMode: "default", + branch: null, + worktreePath: null, + createdAt: input.createdAt, + updatedAt: input.createdAt, + }, + }); + yield* eventStore.append({ + type: "thread.turn-start-requested", + eventId: EventId.make(`evt-${input.slug}-turn-start`), + aggregateKind: "thread", + aggregateId: input.threadId, + occurredAt: input.createdAt, + commandId: CommandId.make(`cmd-${input.slug}-turn-start`), + causationEventId: null, + correlationId: CorrelationId.make(`cmd-${input.slug}-turn-start`), + metadata: {}, + payload: { + threadId: input.threadId, + messageId: input.messageId, + runtimeMode: "approval-required", + createdAt: input.createdAt, + }, + }); + yield* eventStore.append({ + type: "thread.session-set", + eventId: EventId.make(`evt-${input.slug}-running`), + aggregateKind: "thread", + aggregateId: input.threadId, + occurredAt: input.runningAt, + commandId: CommandId.make(`cmd-${input.slug}-running`), + causationEventId: null, + correlationId: CorrelationId.make(`cmd-${input.slug}-running`), + metadata: {}, + payload: { + threadId: input.threadId, + session: { + threadId: input.threadId, + status: "running", + providerName: "codex", + providerInstanceId: ProviderInstanceId.make("codex"), + runtimeMode: "approval-required", + activeTurnId: input.turnId, + lastError: null, + updatedAt: input.runningAt, + }, + }, + }); + yield* eventStore.append({ + type: "thread.session-set", + eventId: EventId.make(`evt-${input.slug}-final`), + aggregateKind: "thread", + aggregateId: input.threadId, + occurredAt: input.finalAt, + commandId: CommandId.make(`cmd-${input.slug}-final`), + causationEventId: null, + correlationId: CorrelationId.make(`cmd-${input.slug}-final`), + metadata: {}, + payload: { + threadId: input.threadId, + session: { + threadId: input.threadId, + status: input.finalStatus, + providerName: "codex", + providerInstanceId: ProviderInstanceId.make("codex"), + runtimeMode: "approval-required", + activeTurnId: null, + lastError: input.finalLastError, + updatedAt: input.finalAt, + }, + }, + }); +}); + it.layer(BaseTestLayer)("OrchestrationProjectionPipeline", (it) => { it.effect("bootstraps all projection states and writes projection rows", () => Effect.gen(function* () { @@ -2172,6 +2303,122 @@ it.effect("restores pending turn-start metadata across projection pipeline resta ), ); +it.effect( + "settles running turns during a clean projection rebuild without thread latest-turn state", + () => + Effect.gen(function* () { + const projectionPipeline = yield* OrchestrationProjectionPipeline; + const eventStore = yield* OrchestrationEventStore; + const sql = yield* SqlClient.SqlClient; + const projectId = ProjectId.make("project-rebuild-settle"); + const threadId = ThreadId.make("thread-rebuild-settle"); + const turnId = TurnId.make("turn-rebuild-settle"); + const messageId = MessageId.make("message-rebuild-settle"); + const createdAt = "2026-02-26T15:00:00.000Z"; + const runningAt = "2026-02-26T15:00:05.000Z"; + const stoppedAt = "2026-02-26T15:00:10.000Z"; + + yield* appendCleanRebuildTurnScenario(eventStore, { + slug: "rebuild-settle", + projectId, + threadId, + turnId, + messageId, + createdAt, + runningAt, + finalAt: stoppedAt, + finalStatus: "stopped", + finalLastError: "Provider runtime is no longer active.", + }); + + yield* projectionPipeline.bootstrap; + + const turnRows = yield* sql<{ + readonly turnId: string; + readonly state: string; + readonly completedAt: string | null; + }>` + SELECT + turn_id AS "turnId", + state, + completed_at AS "completedAt" + FROM projection_turns + WHERE thread_id = ${threadId} + AND turn_id = ${turnId} + `; + + assert.deepEqual(turnRows, [ + { + turnId, + state: "interrupted", + completedAt: stoppedAt, + }, + ]); + }).pipe( + Effect.provide( + makeProjectionPipelinePrefixedTestLayer("t3-projection-pipeline-rebuild-settle-"), + ), + ), +); + +it.effect( + "preserves failed turn state during a clean projection rebuild without thread latest-turn state", + () => + Effect.gen(function* () { + const projectionPipeline = yield* OrchestrationProjectionPipeline; + const eventStore = yield* OrchestrationEventStore; + const sql = yield* SqlClient.SqlClient; + const projectId = ProjectId.make("project-rebuild-error"); + const threadId = ThreadId.make("thread-rebuild-error"); + const turnId = TurnId.make("turn-rebuild-error"); + const messageId = MessageId.make("message-rebuild-error"); + const createdAt = "2026-02-26T16:00:00.000Z"; + const runningAt = "2026-02-26T16:00:05.000Z"; + const failedAt = "2026-02-26T16:00:10.000Z"; + + yield* appendCleanRebuildTurnScenario(eventStore, { + slug: "rebuild-error", + projectId, + threadId, + turnId, + messageId, + createdAt, + runningAt, + finalAt: failedAt, + finalStatus: "error", + finalLastError: "Prompt failed.", + }); + + yield* projectionPipeline.bootstrap; + + const turnRows = yield* sql<{ + readonly turnId: string; + readonly state: string; + readonly completedAt: string | null; + }>` + SELECT + turn_id AS "turnId", + state, + completed_at AS "completedAt" + FROM projection_turns + WHERE thread_id = ${threadId} + AND turn_id = ${turnId} + `; + + assert.deepEqual(turnRows, [ + { + turnId, + state: "error", + completedAt: failedAt, + }, + ]); + }).pipe( + Effect.provide( + makeProjectionPipelinePrefixedTestLayer("t3-projection-pipeline-rebuild-error-"), + ), + ), +); + const engineLayer = it.layer( OrchestrationEngineLive.pipe( Layer.provide(OrchestrationProjectionSnapshotQueryLive), diff --git a/apps/server/src/orchestration/Layers/ProjectionPipeline.ts b/apps/server/src/orchestration/Layers/ProjectionPipeline.ts index 1161ff6a7d7..a6a14b3efc6 100644 --- a/apps/server/src/orchestration/Layers/ProjectionPipeline.ts +++ b/apps/server/src/orchestration/Layers/ProjectionPipeline.ts @@ -2,7 +2,9 @@ import { ApprovalRequestId, type ChatAttachment, type OrchestrationEvent, + type OrchestrationSessionStatus, ThreadId, + type TurnId, } from "@t3tools/contracts"; import * as Effect from "effect/Effect"; import * as FileSystem from "effect/FileSystem"; @@ -177,6 +179,21 @@ function deriveHasActionableProposedPlan(input: { return latestPlan !== null && latestPlan.implementedAt === null; } +function settleTurnStateFromSessionStatus( + status: OrchestrationSessionStatus, +): ProjectionTurn["state"] | null { + switch (status) { + case "error": + return "error"; + case "ready": + case "interrupted": + case "stopped": + return "interrupted"; + default: + return null; + } +} + function retainProjectionMessagesAfterRevert( messages: ReadonlyArray, turns: ReadonlyArray, @@ -712,9 +729,12 @@ const makeOrchestrationProjectionPipeline = Effect.fn("makeOrchestrationProjecti if (Option.isNone(existingRow)) { return; } + const latestTurnId = + event.payload.session.activeTurnId ?? + (event.payload.session.status === "running" ? null : existingRow.value.latestTurnId); yield* projectionThreadRepository.upsert({ ...existingRow.value, - latestTurnId: event.payload.session.activeTurnId, + latestTurnId, updatedAt: event.occurredAt, }); yield* refreshThreadShellSummary(event.payload.threadId); @@ -998,6 +1018,34 @@ const makeOrchestrationProjectionPipeline = Effect.fn("makeOrchestrationProjecti case "thread.session-set": { const turnId = event.payload.session.activeTurnId; if (turnId === null || event.payload.session.status !== "running") { + const settledState = settleTurnStateFromSessionStatus(event.payload.session.status); + if (settledState === null) { + return; + } + const turns = yield* projectionTurnRepository.listByThreadId({ + threadId: event.payload.threadId, + }); + const latestRunningTurn = turns + .filter( + (turn): turn is ProjectionTurn & { readonly turnId: TurnId } => + turn.turnId !== null && turn.state === "running", + ) + .reduce<(ProjectionTurn & { readonly turnId: TurnId }) | null>( + (latest, turn) => + latest === null || turn.requestedAt > latest.requestedAt ? turn : latest, + null, + ); + if (latestRunningTurn === null) { + return; + } + + yield* projectionTurnRepository.upsertByTurnId({ + ...latestRunningTurn, + state: settledState, + startedAt: latestRunningTurn.startedAt ?? event.payload.session.updatedAt, + requestedAt: latestRunningTurn.requestedAt ?? event.payload.session.updatedAt, + completedAt: latestRunningTurn.completedAt ?? event.payload.session.updatedAt, + }); return; } diff --git a/apps/server/src/orchestration/Layers/ProviderCommandReactor.test.ts b/apps/server/src/orchestration/Layers/ProviderCommandReactor.test.ts index 571164fad93..4232cc64cc3 100644 --- a/apps/server/src/orchestration/Layers/ProviderCommandReactor.test.ts +++ b/apps/server/src/orchestration/Layers/ProviderCommandReactor.test.ts @@ -32,7 +32,10 @@ import { afterEach, describe, expect, it, vi } from "vitest"; import { deriveServerPaths, ServerConfig } from "../../config.ts"; import { TextGenerationError } from "@t3tools/contracts"; -import { ProviderAdapterRequestError } from "../../provider/Errors.ts"; +import { + ProviderAdapterRequestError, + ProviderAdapterSessionNotFoundError, +} from "../../provider/Errors.ts"; import { OrchestrationEventStoreLive } from "../../persistence/Layers/OrchestrationEventStore.ts"; import { OrchestrationCommandReceiptRepositoryLive } from "../../persistence/Layers/OrchestrationCommandReceipts.ts"; import { SqlitePersistenceMemory } from "../../persistence/Layers/Sqlite.ts"; @@ -142,6 +145,9 @@ describe("ProviderCommandReactor", () => { readonly baseDir?: string; readonly threadModelSelection?: ModelSelection; readonly sessionModelSwitch?: "unsupported" | "in-session"; + readonly stopSessionOverride?: ProviderServiceShape["stopSession"]; + readonly listSessionsOverride?: ProviderServiceShape["listSessions"]; + readonly startReactor?: boolean; }) { const now = "2026-01-01T00:00:00.000Z"; const baseDir = input?.baseDir ?? fs.mkdtempSync(path.join(os.tmpdir(), "t3code-reactor-")); @@ -220,20 +226,16 @@ describe("ProviderCommandReactor", () => { const interruptTurn = vi.fn((_: unknown) => Effect.void); const respondToRequest = vi.fn(() => Effect.void); const respondToUserInput = vi.fn(() => Effect.void); - const stopSession = vi.fn((input: unknown) => - Effect.sync(() => { - const threadId = - typeof input === "object" && input !== null && "threadId" in input - ? (input as { threadId?: ThreadId }).threadId - : undefined; - if (!threadId) { - return; - } - const index = runtimeSessions.findIndex((session) => session.threadId === threadId); - if (index >= 0) { - runtimeSessions.splice(index, 1); - } - }), + const stopSession = vi.fn( + input?.stopSessionOverride ?? + ((input) => + Effect.sync(() => { + const threadId = input.threadId; + const index = runtimeSessions.findIndex((session) => session.threadId === threadId); + if (index >= 0) { + runtimeSessions.splice(index, 1); + } + })), ); const renameBranch = vi.fn((input: unknown) => Effect.succeed({ @@ -289,7 +291,7 @@ describe("ProviderCommandReactor", () => { respondToRequest: respondToRequest as ProviderServiceShape["respondToRequest"], respondToUserInput: respondToUserInput as ProviderServiceShape["respondToUserInput"], stopSession: stopSession as ProviderServiceShape["stopSession"], - listSessions: () => Effect.succeed(runtimeSessions), + listSessions: input?.listSessionsOverride ?? (() => Effect.succeed(runtimeSessions)), getCapabilities: (_provider) => Effect.succeed({ sessionModelSwitch: input?.sessionModelSwitch ?? "in-session", @@ -364,8 +366,12 @@ describe("ProviderCommandReactor", () => { const engine = await runtime.runPromise(Effect.service(OrchestrationEngineService)); const snapshotQuery = await runtime.runPromise(Effect.service(ProjectionSnapshotQuery)); const reactor = await runtime.runPromise(Effect.service(ProviderCommandReactor)); - scope = await Effect.runPromise(Scope.make("sequential")); - await Effect.runPromise(reactor.start().pipe(Scope.provide(scope))); + const reactorScope = await Effect.runPromise(Scope.make("sequential")); + scope = reactorScope; + const startReactor = () => Effect.runPromise(reactor.start().pipe(Scope.provide(reactorScope))); + if (input?.startReactor !== false) { + await startReactor(); + } const drain = () => Effect.runPromise(reactor.drain); await Effect.runPromise( @@ -410,6 +416,7 @@ describe("ProviderCommandReactor", () => { generateThreadTitle, runtimeSessions, stateDir, + startReactor, drain, }; } @@ -1605,6 +1612,192 @@ describe("ProviderCommandReactor", () => { }); }); + it("marks projected running sessions stopped on startup when no live provider session exists", async () => { + const harness = await createHarness({ startReactor: false }); + const now = "2026-01-01T00:00:00.000Z"; + + await Effect.runPromise( + harness.engine.dispatch({ + type: "thread.session.set", + commandId: CommandId.make("cmd-session-set-stale-running"), + threadId: ThreadId.make("thread-1"), + session: { + threadId: ThreadId.make("thread-1"), + status: "running", + providerName: "codex", + providerInstanceId: ProviderInstanceId.make("codex"), + runtimeMode: "approval-required", + activeTurnId: asTurnId("turn-stale"), + lastError: null, + updatedAt: now, + }, + createdAt: now, + }), + ); + + await harness.startReactor(); + + const readModel = await harness.readModel(); + const thread = readModel.threads.find((entry) => entry.id === ThreadId.make("thread-1")); + expect(thread?.session).toMatchObject({ + threadId: ThreadId.make("thread-1"), + status: "stopped", + providerName: "codex", + providerInstanceId: ProviderInstanceId.make("codex"), + runtimeMode: "approval-required", + activeTurnId: null, + lastError: + "This thread was restored, but its running provider session was no longer available.\nStart a new turn to continue.", + }); + expect(thread?.latestTurn).toMatchObject({ + turnId: asTurnId("turn-stale"), + state: "interrupted", + }); + }); + + it("leaves projected running sessions alone on startup when a live provider session exists", async () => { + const harness = await createHarness({ startReactor: false }); + const now = "2026-01-01T00:00:00.000Z"; + + await Effect.runPromise( + harness.engine.dispatch({ + type: "thread.session.set", + commandId: CommandId.make("cmd-session-set-live-running"), + threadId: ThreadId.make("thread-1"), + session: { + threadId: ThreadId.make("thread-1"), + status: "running", + providerName: "codex", + providerInstanceId: ProviderInstanceId.make("codex"), + runtimeMode: "approval-required", + activeTurnId: asTurnId("turn-live"), + lastError: null, + updatedAt: now, + }, + createdAt: now, + }), + ); + harness.runtimeSessions.push({ + provider: ProviderDriverKind.make("codex"), + providerInstanceId: ProviderInstanceId.make("codex"), + status: "running", + runtimeMode: "approval-required", + threadId: ThreadId.make("thread-1"), + cwd: "/tmp/provider-project", + resumeCursor: { opaque: "resume-live" }, + activeTurnId: asTurnId("turn-live"), + createdAt: now, + updatedAt: now, + }); + + await harness.startReactor(); + + const readModel = await harness.readModel(); + const thread = readModel.threads.find((entry) => entry.id === ThreadId.make("thread-1")); + expect(thread?.session).toMatchObject({ + status: "running", + activeTurnId: asTurnId("turn-live"), + lastError: null, + }); + }); + + it("mirrors live ready provider sessions on startup and settles the stale projected turn", async () => { + const harness = await createHarness({ startReactor: false }); + const now = "2026-01-01T00:00:00.000Z"; + + await Effect.runPromise( + harness.engine.dispatch({ + type: "thread.session.set", + commandId: CommandId.make("cmd-session-set-live-ready"), + threadId: ThreadId.make("thread-1"), + session: { + threadId: ThreadId.make("thread-1"), + status: "running", + providerName: "codex", + providerInstanceId: ProviderInstanceId.make("codex"), + runtimeMode: "approval-required", + activeTurnId: asTurnId("turn-stale-live-ready"), + lastError: "previous provider failure", + updatedAt: now, + }, + createdAt: now, + }), + ); + harness.runtimeSessions.push({ + provider: ProviderDriverKind.make("codex"), + providerInstanceId: ProviderInstanceId.make("codex"), + status: "ready", + runtimeMode: "approval-required", + threadId: ThreadId.make("thread-1"), + cwd: "/tmp/provider-project", + resumeCursor: { opaque: "resume-ready" }, + createdAt: now, + updatedAt: now, + }); + + await harness.startReactor(); + + const readModel = await harness.readModel(); + const thread = readModel.threads.find((entry) => entry.id === ThreadId.make("thread-1")); + expect(thread?.session).toMatchObject({ + status: "ready", + activeTurnId: null, + lastError: null, + }); + expect(thread?.latestTurn).toMatchObject({ + turnId: asTurnId("turn-stale-live-ready"), + state: "interrupted", + }); + }); + + it("leaves projected running sessions alone on startup when live provider sessions cannot be listed", async () => { + const harness = await createHarness({ + startReactor: false, + listSessionsOverride: () => + Effect.die( + new ProviderAdapterRequestError({ + provider: "codex", + method: "ProviderService.listSessions", + detail: "Provider runtime did not answer.", + }), + ), + }); + const now = "2026-01-01T00:00:00.000Z"; + + await Effect.runPromise( + harness.engine.dispatch({ + type: "thread.session.set", + commandId: CommandId.make("cmd-session-set-list-failure"), + threadId: ThreadId.make("thread-1"), + session: { + threadId: ThreadId.make("thread-1"), + status: "running", + providerName: "codex", + providerInstanceId: ProviderInstanceId.make("codex"), + runtimeMode: "approval-required", + activeTurnId: asTurnId("turn-list-failure"), + lastError: null, + updatedAt: now, + }, + createdAt: now, + }), + ); + + await harness.startReactor(); + + const readModel = await harness.readModel(); + const thread = readModel.threads.find((entry) => entry.id === ThreadId.make("thread-1")); + expect(thread?.session).toMatchObject({ + status: "running", + activeTurnId: asTurnId("turn-list-failure"), + lastError: null, + }); + expect(thread?.latestTurn).toMatchObject({ + turnId: asTurnId("turn-list-failure"), + state: "running", + }); + }); + it("rejects active runtime sessions that are missing provider instance ids", async () => { const harness = await createHarness(); const now = "2026-01-01T00:00:00.000Z"; @@ -2007,4 +2200,133 @@ describe("ProviderCommandReactor", () => { expect(thread?.session?.providerInstanceId).toBe(ProviderInstanceId.make("codex_work")); expect(thread?.session?.activeTurnId).toBeNull(); }); + + it("keeps thread session state when provider session stop fails", async () => { + const now = "2026-01-01T00:00:00.000Z"; + const harness = await createHarness({ + stopSessionOverride: () => + Effect.fail( + new ProviderAdapterRequestError({ + provider: "codex", + method: "ProviderService.stopSession", + detail: "No live provider session was found for this thread.", + }), + ), + }); + + await Effect.runPromise( + harness.engine.dispatch({ + type: "thread.session.set", + commandId: CommandId.make("cmd-session-set-for-stop-failure"), + threadId: ThreadId.make("thread-1"), + session: { + threadId: ThreadId.make("thread-1"), + status: "running", + providerName: "codex", + providerInstanceId: ProviderInstanceId.make("codex_work"), + runtimeMode: "approval-required", + activeTurnId: asTurnId("turn-running"), + lastError: null, + updatedAt: now, + }, + createdAt: now, + }), + ); + + await Effect.runPromise( + harness.engine.dispatch({ + type: "thread.session.stop", + commandId: CommandId.make("cmd-session-stop-failure"), + threadId: ThreadId.make("thread-1"), + createdAt: now, + }), + ); + + await waitFor(async () => { + const readModel = await harness.readModel(); + const thread = readModel.threads.find((entry) => entry.id === ThreadId.make("thread-1")); + return ( + thread?.session?.status === "running" && + thread.session.activeTurnId === asTurnId("turn-running") && + thread.activities.some((activity) => activity.kind === "provider.session.stop.failed") + ); + }); + + expect(harness.stopSession.mock.calls.length).toBe(1); + const readModel = await harness.readModel(); + const thread = readModel.threads.find((entry) => entry.id === ThreadId.make("thread-1")); + expect(thread?.session).not.toBeNull(); + expect(thread?.session?.status).toBe("running"); + expect(thread?.session?.providerInstanceId).toBe(ProviderInstanceId.make("codex_work")); + expect(thread?.session?.activeTurnId).toBe(asTurnId("turn-running")); + expect(thread?.session?.lastError).toBe("No live provider session was found for this thread."); + + const failureActivity = thread?.activities.find( + (activity) => activity.kind === "provider.session.stop.failed", + ); + expect(failureActivity).toBeDefined(); + expect(failureActivity?.turnId).toBe(asTurnId("turn-running")); + expect(failureActivity?.payload).toMatchObject({ + detail: "No live provider session was found for this thread.", + }); + }); + + it("treats already-gone provider sessions as a benign stop", async () => { + const now = "2026-01-01T00:00:00.000Z"; + const harness = await createHarness({ + stopSessionOverride: () => + Effect.fail( + new ProviderAdapterSessionNotFoundError({ + provider: "codex", + threadId: "thread-1", + }), + ), + }); + + await Effect.runPromise( + harness.engine.dispatch({ + type: "thread.session.set", + commandId: CommandId.make("cmd-session-set-for-benign-stop"), + threadId: ThreadId.make("thread-1"), + session: { + threadId: ThreadId.make("thread-1"), + status: "running", + providerName: "codex", + providerInstanceId: ProviderInstanceId.make("codex_work"), + runtimeMode: "approval-required", + activeTurnId: asTurnId("turn-running"), + lastError: null, + updatedAt: now, + }, + createdAt: now, + }), + ); + + await Effect.runPromise( + harness.engine.dispatch({ + type: "thread.session.stop", + commandId: CommandId.make("cmd-session-stop-benign"), + threadId: ThreadId.make("thread-1"), + createdAt: now, + }), + ); + + await waitFor(async () => { + const readModel = await harness.readModel(); + const thread = readModel.threads.find((entry) => entry.id === ThreadId.make("thread-1")); + return thread?.session?.status === "stopped" && thread.session.lastError === null; + }); + + expect(harness.stopSession.mock.calls.length).toBe(1); + const readModel = await harness.readModel(); + const thread = readModel.threads.find((entry) => entry.id === ThreadId.make("thread-1")); + expect(thread?.session).not.toBeNull(); + expect(thread?.session?.status).toBe("stopped"); + expect(thread?.session?.providerInstanceId).toBe(ProviderInstanceId.make("codex_work")); + expect(thread?.session?.activeTurnId).toBeNull(); + expect(thread?.session?.lastError).toBeNull(); + expect( + thread?.activities.find((activity) => activity.kind === "provider.session.stop.failed"), + ).toBeUndefined(); + }); }); diff --git a/apps/server/src/orchestration/Layers/ProviderCommandReactor.ts b/apps/server/src/orchestration/Layers/ProviderCommandReactor.ts index 8b71a976808..47cef27ab48 100644 --- a/apps/server/src/orchestration/Layers/ProviderCommandReactor.ts +++ b/apps/server/src/orchestration/Layers/ProviderCommandReactor.ts @@ -15,9 +15,11 @@ import { import { isTemporaryWorktreeBranch, WORKTREE_BRANCH_PREFIX } from "@t3tools/shared/git"; import * as Cache from "effect/Cache"; import * as Cause from "effect/Cause"; +import * as DateTime from "effect/DateTime"; import * as Duration from "effect/Duration"; import * as Effect from "effect/Effect"; import * as Equal from "effect/Equal"; +import * as Exit from "effect/Exit"; import * as Layer from "effect/Layer"; import * as Option from "effect/Option"; import * as Schema from "effect/Schema"; @@ -26,7 +28,11 @@ import { makeDrainableWorker } from "@t3tools/shared/DrainableWorker"; import { resolveThreadWorkspaceCwd } from "../../checkpointing/Utils.ts"; import { increment, orchestrationEventsProcessedTotal } from "../../observability/Metrics.ts"; -import { ProviderAdapterRequestError } from "../../provider/Errors.ts"; +import { + ProviderAdapterRequestError, + ProviderAdapterSessionNotFoundError, + ProviderSessionNotFoundError, +} from "../../provider/Errors.ts"; import type { ProviderServiceError } from "../../provider/Errors.ts"; import { TextGeneration } from "../../textGeneration/TextGeneration.ts"; import { ProviderService } from "../../provider/Services/ProviderService.ts"; @@ -40,6 +46,8 @@ import { ServerSettingsService } from "../../serverSettings.ts"; import { VcsStatusBroadcaster } from "../../vcs/VcsStatusBroadcaster.ts"; import { GitWorkflowService } from "../../git/GitWorkflowService.ts"; const isProviderAdapterRequestError = Schema.is(ProviderAdapterRequestError); +const isProviderAdapterSessionNotFoundError = Schema.is(ProviderAdapterSessionNotFoundError); +const isProviderSessionNotFoundError = Schema.is(ProviderSessionNotFoundError); const isProviderDriverKind = Schema.is(ProviderDriverKind); type ProviderIntentEvent = Extract< @@ -88,6 +96,8 @@ const HANDLED_TURN_START_KEY_MAX = 10_000; const HANDLED_TURN_START_KEY_TTL = Duration.minutes(30); const DEFAULT_RUNTIME_MODE: RuntimeMode = "full-access"; const DEFAULT_THREAD_TITLE = "New thread"; +const STALE_RUNNING_SESSION_DETAIL = + "This thread was restored, but its running provider session was no longer available.\nStart a new turn to continue."; export function providerErrorLabel(value: string | undefined): string { const normalized = value?.trim(); @@ -244,6 +254,14 @@ const make = Effect.gen(function* () { return Cause.pretty(cause); }; + const isSessionAlreadyGoneFailure = (cause: Cause.Cause): boolean => { + const failReason = cause.reasons.find(Cause.isFailReason); + return ( + isProviderAdapterSessionNotFoundError(failReason?.error) || + isProviderSessionNotFoundError(failReason?.error) + ); + }; + const setThreadSession = (input: { readonly threadId: ThreadId; readonly session: OrchestrationSession; @@ -280,6 +298,71 @@ const make = Effect.gen(function* () { }); }); + const reconcileStaleRunningSessions = Effect.fn("reconcileStaleRunningSessions")(function* () { + const snapshot = yield* projectionSnapshotQuery.getSnapshot(); + const liveSessionsExit = yield* Effect.exit(providerService.listSessions()); + if (Exit.isFailure(liveSessionsExit)) { + // Runtime state is unknowable here; leave projected sessions untouched rather than + // incorrectly stopping work that may still be active while the provider recovers. + yield* Effect.logWarning( + "provider command reactor skipped stale session reconciliation because live sessions could not be listed", + { + cause: Cause.pretty(liveSessionsExit.cause), + }, + ); + return; + } + + const liveSessionsByThreadId = new Map( + liveSessionsExit.value.map((session) => [session.threadId, session]), + ); + const now = DateTime.formatIso(yield* DateTime.now); + yield* Effect.forEach( + snapshot.threads, + (thread) => { + const session = thread.session; + if (session === null || session.status !== "running" || session.activeTurnId === null) { + return Effect.void; + } + + const liveSession = liveSessionsByThreadId.get(thread.id); + if (liveSession !== undefined) { + const status = mapProviderSessionStatusToOrchestrationStatus(liveSession.status); + return setThreadSession({ + threadId: thread.id, + session: { + ...session, + status, + providerName: liveSession.provider, + ...(liveSession.providerInstanceId !== undefined + ? { providerInstanceId: liveSession.providerInstanceId } + : {}), + runtimeMode: liveSession.runtimeMode, + activeTurnId: liveSession.activeTurnId ?? null, + lastError: + liveSession.lastError ?? (status === "ready" ? null : (session.lastError ?? null)), + updatedAt: now, + }, + createdAt: now, + }); + } + + return setThreadSession({ + threadId: thread.id, + session: { + ...session, + status: "stopped", + activeTurnId: null, + lastError: session.lastError ?? STALE_RUNNING_SESSION_DETAIL, + updatedAt: now, + }, + createdAt: now, + }); + }, + { discard: true }, + ); + }); + const resolveProject = Effect.fnUntraced(function* (projectId: ProjectId) { return yield* projectionSnapshotQuery .getProjectShellById(projectId) @@ -910,7 +993,41 @@ const make = Effect.gen(function* () { const now = event.payload.createdAt; if (thread.session && thread.session.status !== "stopped") { - yield* providerService.stopSession({ threadId: thread.id }); + const stopExit = yield* Effect.exit(providerService.stopSession({ threadId: thread.id })); + if (Exit.isFailure(stopExit)) { + if (!isSessionAlreadyGoneFailure(stopExit.cause)) { + const detail = formatFailureDetail(stopExit.cause); + yield* appendProviderFailureActivity({ + threadId: thread.id, + kind: "provider.session.stop.failed", + summary: "Provider session stop failed", + detail, + turnId: thread.session.activeTurnId, + createdAt: now, + }).pipe( + Effect.catchCause((activityCause) => + Effect.logWarning( + "provider command reactor failed to record provider session stop failure", + { + threadId: thread.id, + cause: Cause.pretty(activityCause), + originalCause: Cause.pretty(stopExit.cause), + }, + ), + ), + ); + yield* setThreadSession({ + threadId: thread.id, + session: { + ...thread.session, + lastError: detail, + updatedAt: now, + }, + createdAt: now, + }); + return; + } + } } yield* setThreadSession({ @@ -1006,6 +1123,13 @@ const make = Effect.gen(function* () { yield* Effect.forkScoped( Stream.runForEach(orchestrationEngine.streamDomainEvents, processEvent), ); + yield* reconcileStaleRunningSessions().pipe( + Effect.catchCause((cause) => + Effect.logWarning("provider command reactor failed to reconcile stale running sessions", { + cause: Cause.pretty(cause), + }), + ), + ); }); return { diff --git a/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.test.ts b/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.test.ts index 3b2411cba2a..8cefee75bd2 100644 --- a/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.test.ts +++ b/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.test.ts @@ -360,6 +360,91 @@ describe("ProviderRuntimeIngestion", () => { expect(thread.session?.lastError).toBe("turn failed"); }); + it("maps completed active turns into ready thread session updates", async () => { + const harness = await createHarness(); + const now = "2026-01-01T00:00:00.000Z"; + + harness.emit({ + type: "turn.started", + eventId: asEventId("evt-turn-started-before-complete"), + provider: ProviderDriverKind.make("opencode"), + threadId: asThreadId("thread-1"), + createdAt: now, + turnId: asTurnId("turn-completed-success"), + }); + + await waitForThread( + harness.readModel, + (thread) => + thread.session?.status === "running" && + thread.session?.activeTurnId === "turn-completed-success", + ); + + harness.emit({ + type: "turn.completed", + eventId: asEventId("evt-turn-completed-success"), + provider: ProviderDriverKind.make("opencode"), + threadId: asThreadId("thread-1"), + createdAt: "2026-01-01T00:00:01.000Z", + turnId: asTurnId("turn-completed-success"), + payload: { + state: "completed", + }, + }); + + const thread = await waitForThread( + harness.readModel, + (entry) => + entry.session?.status === "ready" && + entry.session?.activeTurnId === null && + entry.session?.lastError === null, + ); + expect(thread.session?.status).toBe("ready"); + expect(thread.session?.activeTurnId).toBeNull(); + }); + + it("maps turn aborted events into ready thread session updates", async () => { + const harness = await createHarness(); + const now = "2026-01-01T00:00:00.000Z"; + + harness.emit({ + type: "turn.started", + eventId: asEventId("evt-turn-started-before-abort"), + provider: ProviderDriverKind.make("codex"), + threadId: asThreadId("thread-1"), + createdAt: now, + turnId: asTurnId("turn-aborted"), + }); + + await waitForThread( + harness.readModel, + (thread) => + thread.session?.status === "running" && thread.session?.activeTurnId === "turn-aborted", + ); + + harness.emit({ + type: "turn.aborted", + eventId: asEventId("evt-turn-aborted"), + provider: ProviderDriverKind.make("codex"), + threadId: asThreadId("thread-1"), + createdAt: "2026-01-01T00:00:01.000Z", + turnId: asTurnId("turn-aborted"), + payload: { + reason: "Interrupted by user.", + }, + }); + + const thread = await waitForThread( + harness.readModel, + (entry) => + entry.session?.status === "ready" && + entry.session?.activeTurnId === null && + entry.session?.lastError === null, + ); + expect(thread.session?.status).toBe("ready"); + expect(thread.session?.activeTurnId).toBeNull(); + }); + it("applies provider session.state.changed transitions directly", async () => { const harness = await createHarness(); const waitingAt = "2026-01-01T00:00:00.000Z"; @@ -610,6 +695,121 @@ describe("ProviderRuntimeIngestion", () => { ); }); + it("keeps OpenCode main turn tracking when title turn events interleave", async () => { + const harness = await createHarness(); + const now = "2026-01-01T00:00:00.000Z"; + + harness.emit({ + type: "turn.started", + eventId: asEventId("evt-opencode-main-started"), + provider: ProviderDriverKind.make("opencode"), + createdAt: now, + threadId: asThreadId("thread-1"), + turnId: asTurnId("turn-opencode-main"), + }); + + await waitForThread( + harness.readModel, + (thread) => + thread.session?.status === "running" && + thread.session?.activeTurnId === "turn-opencode-main", + ); + + harness.emit({ + type: "turn.completed", + eventId: asEventId("evt-opencode-title-completed"), + provider: ProviderDriverKind.make("opencode"), + createdAt: "2026-01-01T00:00:01.000Z", + threadId: asThreadId("thread-1"), + turnId: asTurnId("turn-opencode-title"), + payload: { + state: "completed", + }, + }); + + await harness.drain(); + const midReadModel = await harness.readModel(); + const midThread = midReadModel.threads.find((entry) => entry.id === ThreadId.make("thread-1")); + expect(midThread?.session?.status).toBe("running"); + expect(midThread?.session?.activeTurnId).toBe("turn-opencode-main"); + + harness.emit({ + type: "turn.completed", + eventId: asEventId("evt-opencode-main-completed"), + provider: ProviderDriverKind.make("opencode"), + createdAt: "2026-01-01T00:00:02.000Z", + threadId: asThreadId("thread-1"), + turnId: asTurnId("turn-opencode-main"), + payload: { + state: "completed", + }, + }); + + await waitForThread( + harness.readModel, + (thread) => + thread.session?.status === "ready" && + thread.session?.activeTurnId === null && + thread.session?.lastError === null, + ); + }); + + it("ignores auxiliary turn aborts for a non-active turn", async () => { + const harness = await createHarness(); + const now = "2026-01-01T00:00:00.000Z"; + + harness.emit({ + type: "turn.started", + eventId: asEventId("evt-turn-started-before-aux-abort"), + provider: ProviderDriverKind.make("codex"), + createdAt: now, + threadId: asThreadId("thread-1"), + turnId: asTurnId("turn-primary-abort"), + }); + + await waitForThread( + harness.readModel, + (thread) => + thread.session?.status === "running" && + thread.session?.activeTurnId === "turn-primary-abort", + ); + + harness.emit({ + type: "turn.aborted", + eventId: asEventId("evt-turn-aborted-aux"), + provider: ProviderDriverKind.make("codex"), + createdAt: "2026-01-01T00:00:01.000Z", + threadId: asThreadId("thread-1"), + turnId: asTurnId("turn-aux-abort"), + payload: { + reason: "Auxiliary turn aborted.", + }, + }); + + await harness.drain(); + const midReadModel = await harness.readModel(); + const midThread = midReadModel.threads.find((entry) => entry.id === ThreadId.make("thread-1")); + expect(midThread?.session?.status).toBe("running"); + expect(midThread?.session?.activeTurnId).toBe("turn-primary-abort"); + + harness.emit({ + type: "turn.aborted", + eventId: asEventId("evt-turn-aborted-primary"), + provider: ProviderDriverKind.make("codex"), + createdAt: "2026-01-01T00:00:02.000Z", + threadId: asThreadId("thread-1"), + turnId: asTurnId("turn-primary-abort"), + payload: { + reason: "Primary turn aborted.", + }, + }); + + await waitForThread( + harness.readModel, + (thread) => thread.session?.status === "ready" && thread.session?.activeTurnId === null, + ); + }); + it("ignores non-active turn completion when runtime omits thread id", async () => { const harness = await createHarness(); const now = "2026-01-01T00:00:00.000Z"; diff --git a/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.ts b/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.ts index 2c07ac91b1e..6499a0b7fad 100644 --- a/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.ts +++ b/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.ts @@ -1211,6 +1211,7 @@ const make = Effect.gen(function* () { return true; case "turn.started": return !conflictsWithActiveTurn; + case "turn.aborted": case "turn.completed": if (conflictsWithActiveTurn || missingTurnForActiveTurn) { return false; @@ -1236,12 +1237,15 @@ const make = Effect.gen(function* () { event.type === "session.exited" || event.type === "thread.started" || event.type === "turn.started" || + event.type === "turn.aborted" || event.type === "turn.completed" ) { const nextActiveTurnId = event.type === "turn.started" ? (eventTurnId ?? null) - : event.type === "turn.completed" || event.type === "session.exited" + : event.type === "turn.aborted" || + event.type === "turn.completed" || + event.type === "session.exited" ? null : activeTurnId; const status = (() => { @@ -1252,6 +1256,8 @@ const make = Effect.gen(function* () { return "running"; case "session.exited": return "stopped"; + case "turn.aborted": + return "ready"; case "turn.completed": return normalizeRuntimeTurnState(event.payload.state) === "failed" ? "error" diff --git a/apps/server/src/provider/Layers/OpenCodeAdapter.test.ts b/apps/server/src/provider/Layers/OpenCodeAdapter.test.ts index 66dc5edc671..5dcacabdbaa 100644 --- a/apps/server/src/provider/Layers/OpenCodeAdapter.test.ts +++ b/apps/server/src/provider/Layers/OpenCodeAdapter.test.ts @@ -18,6 +18,7 @@ import { ProviderDriverKind, ProviderInstanceId, ThreadId, + type ProviderRuntimeEvent, } from "@t3tools/contracts"; import { createModelSelection } from "@t3tools/shared/model"; import { ServerConfig } from "../../config.ts"; @@ -366,6 +367,18 @@ it.layer(OpenCodeAdapterTestLayer)("OpenCodeAdapterLive", (it) => { runtimeMode: "full-access", }); + const observedEvents: ProviderRuntimeEvent[] = []; + const eventsFiber = yield* adapter.streamEvents.pipe( + Stream.filter((event) => event.threadId === asThreadId("thread-send-turn-failure")), + Stream.runForEach((event) => + Effect.sync(() => { + observedEvents.push(event); + }), + ), + Effect.forkChild, + ); + yield* Effect.yieldNow; + runtimeMock.state.promptAsyncError = new Error("prompt failed"); const error = yield* adapter .sendTurn({ @@ -378,6 +391,8 @@ it.layer(OpenCodeAdapterTestLayer)("OpenCodeAdapterLive", (it) => { }) .pipe(Effect.flip); const sessions = yield* adapter.listSessions(); + yield* Effect.yieldNow; + yield* Fiber.interrupt(eventsFiber); assert.equal(error._tag, "ProviderAdapterRequestError"); if (error._tag !== "ProviderAdapterRequestError") { @@ -392,6 +407,81 @@ it.layer(OpenCodeAdapterTestLayer)("OpenCodeAdapterLive", (it) => { assert.equal(sessions[0]?.status, "ready"); assert.equal(sessions[0]?.activeTurnId, undefined); assert.equal(sessions[0]?.lastError, "prompt failed"); + assert.equal( + observedEvents.some((event) => event.type === "turn.aborted"), + false, + ); + }), + ); + + it.effect("clears active session state after interrupting a turn", () => + Effect.gen(function* () { + const adapter = yield* OpenCodeAdapter; + const threadId = asThreadId("thread-interrupt-turn"); + yield* adapter.startSession({ + provider: ProviderDriverKind.make("opencode"), + threadId, + runtimeMode: "full-access", + }); + + const started = yield* adapter.sendTurn({ + threadId, + input: "Stop this", + modelSelection: { + instanceId: ProviderInstanceId.make("opencode"), + model: "openai/gpt-5", + }, + }); + const runningSessions = yield* adapter.listSessions(); + const runningSession = runningSessions.find((session) => session.threadId === threadId); + + assert.equal(runningSession?.status, "running"); + assert.equal(runningSession?.activeTurnId, started.turnId); + + yield* adapter.interruptTurn(threadId, started.turnId); + const interruptedSessions = yield* adapter.listSessions(); + const interruptedSession = interruptedSessions.find( + (session) => session.threadId === threadId, + ); + + assert.equal(runtimeMock.state.abortCalls.at(-1), "http://127.0.0.1:9999/session"); + assert.equal(interruptedSession?.status, "ready"); + assert.equal(interruptedSession?.activeTurnId, undefined); + assert.equal(interruptedSession?.lastError, undefined); + }), + ); + + it.effect("allows an interrupted OpenCode turn to be stopped cleanly", () => + Effect.gen(function* () { + const adapter = yield* OpenCodeAdapter; + const threadId = asThreadId("thread-interrupt-then-stop"); + yield* adapter.startSession({ + provider: ProviderDriverKind.make("opencode"), + threadId, + runtimeMode: "full-access", + }); + + const started = yield* adapter.sendTurn({ + threadId, + input: "Steer then stop this", + modelSelection: { + instanceId: ProviderInstanceId.make("opencode"), + model: "openai/gpt-5", + }, + }); + + yield* adapter.interruptTurn(threadId, started.turnId); + const readySessions = yield* adapter.listSessions(); + const readySession = readySessions.find((session) => session.threadId === threadId); + assert.equal(readySession?.status, "ready"); + assert.equal(readySession?.activeTurnId, undefined); + + yield* adapter.stopSession(threadId); + const stoppedSessions = yield* adapter.listSessions(); + const stoppedSession = stoppedSessions.find((session) => session.threadId === threadId); + + assert.equal(stoppedSession, undefined); + assert.equal(runtimeMock.state.abortCalls.at(-1), "http://127.0.0.1:9999/session"); }), ); diff --git a/apps/server/src/provider/Layers/OpenCodeAdapter.ts b/apps/server/src/provider/Layers/OpenCodeAdapter.ts index 512e9ed6bfe..ad5c4358049 100644 --- a/apps/server/src/provider/Layers/OpenCodeAdapter.ts +++ b/apps/server/src/provider/Layers/OpenCodeAdapter.ts @@ -1217,9 +1217,10 @@ export function makeOpenCodeAdapter( ).pipe( Effect.mapError(toRequestError), // On failure: clear active-turn state, flip the session back to ready - // with lastError set, emit turn.aborted, then let the typed error - // propagate. We don't need to rebuild the error here — `toRequestError` - // already produced the right shape. + // with lastError set, then let the typed error propagate. We don't + // emit turn.aborted here because prompt-start failures are not user + // interrupts, and the orchestration layer already records the + // provider.turn.start.failed activity from the typed error. Effect.tapError((requestError) => Effect.gen(function* () { context.activeTurnId = undefined; @@ -1234,16 +1235,6 @@ export function makeOpenCodeAdapter( }, { clearActiveTurnId: true }, ); - yield* emit({ - ...(yield* buildEventBase({ - threadId: input.threadId, - turnId, - })), - type: "turn.aborted", - payload: { - reason: requestError.detail, - }, - }); }), ), ); @@ -1257,14 +1248,19 @@ export function makeOpenCodeAdapter( const interruptTurn: OpenCodeAdapterShape["interruptTurn"] = Effect.fn("interruptTurn")( function* (threadId, turnId) { const context = ensureSessionContext(sessions, threadId); + const interruptedTurnId = turnId ?? context.activeTurnId; yield* runOpenCodeSdk("session.abort", () => context.client.session.abort({ sessionID: context.openCodeSessionId }), ).pipe(Effect.mapError(toRequestError)); - if (turnId ?? context.activeTurnId) { + context.activeTurnId = undefined; + context.activeAgent = undefined; + context.activeVariant = undefined; + yield* updateProviderSession(context, { status: "ready" }, { clearActiveTurnId: true }); + if (interruptedTurnId) { yield* emit({ ...(yield* buildEventBase({ threadId, - turnId: turnId ?? context.activeTurnId, + turnId: interruptedTurnId, })), type: "turn.aborted", payload: { diff --git a/apps/web/src/store.test.ts b/apps/web/src/store.test.ts index 59ebd0cea0c..fb24a397ad8 100644 --- a/apps/web/src/store.test.ts +++ b/apps/web/src/store.test.ts @@ -6,6 +6,7 @@ import { EventId, MessageId, ProjectId, + ProviderDriverKind, ProviderInstanceId, ThreadId, TurnId, @@ -87,6 +88,27 @@ function makeThread(overrides: Partial = {}): Thread { }; } +function makeThreadWithRunningTurn(turnId = TurnId.make("turn-1")): Thread { + return makeThread({ + session: { + provider: ProviderDriverKind.make("codex"), + status: "running", + orchestrationStatus: "running", + activeTurnId: turnId, + createdAt: "2026-02-27T00:00:00.000Z", + updatedAt: "2026-02-27T00:00:01.000Z", + }, + latestTurn: { + turnId, + state: "running", + requestedAt: "2026-02-27T00:00:00.000Z", + startedAt: "2026-02-27T00:00:01.000Z", + completedAt: null, + assistantMessageId: null, + }, + }); +} + function makeState(thread: Thread): AppState { const projectId = ProjectId.make("project-1"); const project = { @@ -781,6 +803,110 @@ describe("incremental orchestration updates", () => { expect(threadsOf(next)[0]?.messages).toHaveLength(1); }); + it("settles a running latest turn when the session is stopped", () => { + const thread = makeThreadWithRunningTurn(); + + const next = applyOrchestrationEvent( + makeState(thread), + makeEvent("thread.session-set", { + threadId: thread.id, + session: { + threadId: thread.id, + status: "stopped", + providerName: "codex", + runtimeMode: "full-access", + activeTurnId: null, + lastError: "Provider runtime is no longer active.", + updatedAt: "2026-02-27T00:00:05.000Z", + }, + }), + localEnvironmentId, + ); + + expect(threadsOf(next)[0]?.session?.status).toBe("closed"); + expect(threadsOf(next)[0]?.latestTurn).toMatchObject({ + turnId: TurnId.make("turn-1"), + state: "interrupted", + completedAt: "2026-02-27T00:00:05.000Z", + }); + }); + + it("settles a running latest turn across optimistic stop and server confirmation", () => { + const thread = makeThreadWithRunningTurn(); + const stopRequestedAt = "2026-02-27T00:00:04.000Z"; + + const afterOptimisticStop = applyOrchestrationEvent( + makeState(thread), + makeEvent("thread.session-stop-requested", { + threadId: thread.id, + createdAt: stopRequestedAt, + }), + localEnvironmentId, + ); + + expect(threadsOf(afterOptimisticStop)[0]?.session).toMatchObject({ + status: "closed", + activeTurnId: undefined, + }); + expect(threadsOf(afterOptimisticStop)[0]?.latestTurn).toMatchObject({ + turnId: TurnId.make("turn-1"), + state: "interrupted", + completedAt: stopRequestedAt, + }); + + const afterServerConfirmation = applyOrchestrationEvent( + afterOptimisticStop, + makeEvent("thread.session-set", { + threadId: thread.id, + session: { + threadId: thread.id, + status: "stopped", + providerName: "codex", + runtimeMode: "full-access", + activeTurnId: null, + lastError: null, + updatedAt: "2026-02-27T00:00:05.000Z", + }, + }), + localEnvironmentId, + ); + + expect(threadsOf(afterServerConfirmation)[0]?.session?.status).toBe("closed"); + expect(threadsOf(afterServerConfirmation)[0]?.latestTurn).toMatchObject({ + turnId: TurnId.make("turn-1"), + state: "interrupted", + completedAt: stopRequestedAt, + }); + }); + + it("preserves a failed latest turn when the session enters error", () => { + const thread = makeThreadWithRunningTurn(); + + const next = applyOrchestrationEvent( + makeState(thread), + makeEvent("thread.session-set", { + threadId: thread.id, + session: { + threadId: thread.id, + status: "error", + providerName: "codex", + runtimeMode: "full-access", + activeTurnId: null, + lastError: "Prompt failed.", + updatedAt: "2026-02-27T00:00:05.000Z", + }, + }), + localEnvironmentId, + ); + + expect(threadsOf(next)[0]?.session?.status).toBe("error"); + expect(threadsOf(next)[0]?.latestTurn).toMatchObject({ + turnId: TurnId.make("turn-1"), + state: "error", + completedAt: "2026-02-27T00:00:05.000Z", + }); + }); + it("does not regress latestTurn when an older turn diff completes late", () => { const state = makeState( makeThread({ diff --git a/apps/web/src/store.ts b/apps/web/src/store.ts index e9972f7c9a8..2be2ed3f34d 100644 --- a/apps/web/src/store.ts +++ b/apps/web/src/store.ts @@ -879,6 +879,42 @@ function buildLatestTurn(params: { }; } +function latestTurnStateFromInactiveSessionStatus( + status: OrchestrationSessionStatus, +): NonNullable["state"] | null { + switch (status) { + case "error": + return "error"; + case "ready": + case "interrupted": + case "stopped": + return "interrupted"; + default: + return null; + } +} + +function settleRunningLatestTurnForInactiveSession(input: { + latestTurn: Thread["latestTurn"]; + nextSessionStatus: OrchestrationSessionStatus; + completedAt: string; +}): Thread["latestTurn"] { + const latestTurn = input.latestTurn; + const nextState = latestTurnStateFromInactiveSessionStatus(input.nextSessionStatus); + if (nextState === null || latestTurn === null || latestTurn.state !== "running") { + return latestTurn; + } + return buildLatestTurn({ + previous: latestTurn, + turnId: latestTurn.turnId, + state: nextState, + requestedAt: latestTurn.requestedAt, + startedAt: latestTurn.startedAt ?? input.completedAt, + completedAt: latestTurn.completedAt ?? input.completedAt, + assistantMessageId: latestTurn.assistantMessageId, + }); +} + function rebindTurnDiffSummariesForAssistantMessage( turnDiffSummaries: ReadonlyArray, turnId: TurnId, @@ -1473,7 +1509,11 @@ function applyEnvironmentOrchestrationEvent( : null, sourceProposedPlan: thread.pendingSourceProposedPlan, }) - : thread.latestTurn, + : settleRunningLatestTurnForInactiveSession({ + latestTurn: thread.latestTurn, + nextSessionStatus: event.payload.session.status, + completedAt: event.payload.session.updatedAt, + }), updatedAt: event.occurredAt, })); @@ -1490,6 +1530,11 @@ function applyEnvironmentOrchestrationEvent( activeTurnId: undefined, updatedAt: event.payload.createdAt, }, + latestTurn: settleRunningLatestTurnForInactiveSession({ + latestTurn: thread.latestTurn, + nextSessionStatus: "stopped", + completedAt: event.payload.createdAt, + }), updatedAt: event.occurredAt, }, );