From 559a3a558245604d1bee1041624c044fd889e301 Mon Sep 17 00:00:00 2001 From: justsomelegs <145564979+justsomelegs@users.noreply.github.com> Date: Mon, 11 May 2026 22:57:57 +0100 Subject: [PATCH 01/15] Handle stale provider runtime state --- .../Layers/ProjectionPipeline.test.ts | 157 ++++++++++ .../Layers/ProjectionPipeline.ts | 30 +- .../Layers/ProviderCommandReactor.test.ts | 293 +++++++++++++++++- .../Layers/ProviderCommandReactor.ts | 89 +++++- apps/web/src/store.test.ts | 46 +++ apps/web/src/store.ts | 32 +- 6 files changed, 626 insertions(+), 21 deletions(-) diff --git a/apps/server/src/orchestration/Layers/ProjectionPipeline.test.ts b/apps/server/src/orchestration/Layers/ProjectionPipeline.test.ts index 369eea0f7a0..8fcbea1bda5 100644 --- a/apps/server/src/orchestration/Layers/ProjectionPipeline.test.ts +++ b/apps/server/src/orchestration/Layers/ProjectionPipeline.test.ts @@ -2172,6 +2172,163 @@ it.effect("restores pending turn-start metadata across projection pipeline resta ), ); +it.effect( + "settles running turns during a clean projection rebuild without thread latest-turn state", + () => + Effect.gen(function* () { + const projectionPipeline = yield* OrchestrationProjectionPipeline; + const eventStore = yield* OrchestrationEventStore; + const sql = yield* SqlClient.SqlClient; + const projectId = ProjectId.make("project-rebuild-settle"); + const threadId = ThreadId.make("thread-rebuild-settle"); + const turnId = TurnId.make("turn-rebuild-settle"); + const messageId = MessageId.make("message-rebuild-settle"); + const createdAt = "2026-02-26T15:00:00.000Z"; + const runningAt = "2026-02-26T15:00:05.000Z"; + const stoppedAt = "2026-02-26T15:00:10.000Z"; + + yield* eventStore.append({ + type: "project.created", + eventId: EventId.make("evt-rebuild-settle-project"), + aggregateKind: "project", + aggregateId: projectId, + occurredAt: createdAt, + commandId: CommandId.make("cmd-rebuild-settle-project"), + causationEventId: null, + correlationId: CorrelationId.make("cmd-rebuild-settle-project"), + metadata: {}, + payload: { + projectId, + title: "Project Rebuild Settle", + workspaceRoot: "/tmp/project-rebuild-settle", + defaultModelSelection: null, + scripts: [], + createdAt, + updatedAt: createdAt, + }, + }); + yield* eventStore.append({ + type: "thread.created", + eventId: EventId.make("evt-rebuild-settle-thread"), + aggregateKind: "thread", + aggregateId: threadId, + occurredAt: createdAt, + commandId: CommandId.make("cmd-rebuild-settle-thread"), + causationEventId: null, + correlationId: CorrelationId.make("cmd-rebuild-settle-thread"), + metadata: {}, + payload: { + threadId, + projectId, + title: "Thread Rebuild Settle", + modelSelection: { + instanceId: ProviderInstanceId.make("codex"), + model: "gpt-5-codex", + }, + runtimeMode: "approval-required", + interactionMode: "default", + branch: null, + worktreePath: null, + createdAt, + updatedAt: createdAt, + }, + }); + yield* eventStore.append({ + type: "thread.turn-start-requested", + eventId: EventId.make("evt-rebuild-settle-turn-start"), + aggregateKind: "thread", + aggregateId: threadId, + occurredAt: createdAt, + commandId: CommandId.make("cmd-rebuild-settle-turn-start"), + causationEventId: null, + correlationId: CorrelationId.make("cmd-rebuild-settle-turn-start"), + metadata: {}, + payload: { + threadId, + messageId, + runtimeMode: "approval-required", + createdAt, + }, + }); + yield* eventStore.append({ + type: "thread.session-set", + eventId: EventId.make("evt-rebuild-settle-running"), + aggregateKind: "thread", + aggregateId: threadId, + occurredAt: runningAt, + commandId: CommandId.make("cmd-rebuild-settle-running"), + causationEventId: null, + correlationId: CorrelationId.make("cmd-rebuild-settle-running"), + metadata: {}, + payload: { + threadId, + session: { + threadId, + status: "running", + providerName: "codex", + providerInstanceId: ProviderInstanceId.make("codex"), + runtimeMode: "approval-required", + activeTurnId: turnId, + lastError: null, + updatedAt: runningAt, + }, + }, + }); + yield* eventStore.append({ + type: "thread.session-set", + eventId: EventId.make("evt-rebuild-settle-stopped"), + aggregateKind: "thread", + aggregateId: threadId, + occurredAt: stoppedAt, + commandId: CommandId.make("cmd-rebuild-settle-stopped"), + causationEventId: null, + correlationId: CorrelationId.make("cmd-rebuild-settle-stopped"), + metadata: {}, + payload: { + threadId, + session: { + threadId, + status: "stopped", + providerName: "codex", + providerInstanceId: ProviderInstanceId.make("codex"), + runtimeMode: "approval-required", + activeTurnId: null, + lastError: "Provider runtime is no longer active.", + updatedAt: stoppedAt, + }, + }, + }); + + yield* projectionPipeline.bootstrap; + + const turnRows = yield* sql<{ + readonly turnId: string; + readonly state: string; + readonly completedAt: string | null; + }>` + SELECT + turn_id AS "turnId", + state, + completed_at AS "completedAt" + FROM projection_turns + WHERE thread_id = ${threadId} + AND turn_id = ${turnId} + `; + + assert.deepEqual(turnRows, [ + { + turnId, + state: "interrupted", + completedAt: stoppedAt, + }, + ]); + }).pipe( + Effect.provide( + makeProjectionPipelinePrefixedTestLayer("t3-projection-pipeline-rebuild-settle-"), + ), + ), +); + const engineLayer = it.layer( OrchestrationEngineLive.pipe( Layer.provide(OrchestrationProjectionSnapshotQueryLive), diff --git a/apps/server/src/orchestration/Layers/ProjectionPipeline.ts b/apps/server/src/orchestration/Layers/ProjectionPipeline.ts index 1161ff6a7d7..a3b7c7f37fc 100644 --- a/apps/server/src/orchestration/Layers/ProjectionPipeline.ts +++ b/apps/server/src/orchestration/Layers/ProjectionPipeline.ts @@ -3,6 +3,7 @@ import { type ChatAttachment, type OrchestrationEvent, ThreadId, + type TurnId, } from "@t3tools/contracts"; import * as Effect from "effect/Effect"; import * as FileSystem from "effect/FileSystem"; @@ -712,9 +713,12 @@ const makeOrchestrationProjectionPipeline = Effect.fn("makeOrchestrationProjecti if (Option.isNone(existingRow)) { return; } + const latestTurnId = + event.payload.session.activeTurnId ?? + (event.payload.session.status === "running" ? null : existingRow.value.latestTurnId); yield* projectionThreadRepository.upsert({ ...existingRow.value, - latestTurnId: event.payload.session.activeTurnId, + latestTurnId, updatedAt: event.occurredAt, }); yield* refreshThreadShellSummary(event.payload.threadId); @@ -998,6 +1002,30 @@ const makeOrchestrationProjectionPipeline = Effect.fn("makeOrchestrationProjecti case "thread.session-set": { const turnId = event.payload.session.activeTurnId; if (turnId === null || event.payload.session.status !== "running") { + const turns = yield* projectionTurnRepository.listByThreadId({ + threadId: event.payload.threadId, + }); + const latestRunningTurn = turns + .filter( + (turn): turn is ProjectionTurn & { readonly turnId: TurnId } => + turn.turnId !== null && turn.state === "running", + ) + .reduce<(ProjectionTurn & { readonly turnId: TurnId }) | null>( + (latest, turn) => + latest === null || turn.requestedAt > latest.requestedAt ? turn : latest, + null, + ); + if (latestRunningTurn === null) { + return; + } + + yield* projectionTurnRepository.upsertByTurnId({ + ...latestRunningTurn, + state: "interrupted", + startedAt: latestRunningTurn.startedAt ?? event.payload.session.updatedAt, + requestedAt: latestRunningTurn.requestedAt ?? event.payload.session.updatedAt, + completedAt: latestRunningTurn.completedAt ?? event.payload.session.updatedAt, + }); return; } diff --git a/apps/server/src/orchestration/Layers/ProviderCommandReactor.test.ts b/apps/server/src/orchestration/Layers/ProviderCommandReactor.test.ts index 571164fad93..a9658427471 100644 --- a/apps/server/src/orchestration/Layers/ProviderCommandReactor.test.ts +++ b/apps/server/src/orchestration/Layers/ProviderCommandReactor.test.ts @@ -142,6 +142,9 @@ describe("ProviderCommandReactor", () => { readonly baseDir?: string; readonly threadModelSelection?: ModelSelection; readonly sessionModelSwitch?: "unsupported" | "in-session"; + readonly stopSessionOverride?: ProviderServiceShape["stopSession"]; + readonly listSessionsOverride?: ProviderServiceShape["listSessions"]; + readonly startReactor?: boolean; }) { const now = "2026-01-01T00:00:00.000Z"; const baseDir = input?.baseDir ?? fs.mkdtempSync(path.join(os.tmpdir(), "t3code-reactor-")); @@ -220,20 +223,16 @@ describe("ProviderCommandReactor", () => { const interruptTurn = vi.fn((_: unknown) => Effect.void); const respondToRequest = vi.fn(() => Effect.void); const respondToUserInput = vi.fn(() => Effect.void); - const stopSession = vi.fn((input: unknown) => - Effect.sync(() => { - const threadId = - typeof input === "object" && input !== null && "threadId" in input - ? (input as { threadId?: ThreadId }).threadId - : undefined; - if (!threadId) { - return; - } - const index = runtimeSessions.findIndex((session) => session.threadId === threadId); - if (index >= 0) { - runtimeSessions.splice(index, 1); - } - }), + const stopSession = vi.fn( + input?.stopSessionOverride ?? + ((input) => + Effect.sync(() => { + const threadId = input.threadId; + const index = runtimeSessions.findIndex((session) => session.threadId === threadId); + if (index >= 0) { + runtimeSessions.splice(index, 1); + } + })), ); const renameBranch = vi.fn((input: unknown) => Effect.succeed({ @@ -289,7 +288,7 @@ describe("ProviderCommandReactor", () => { respondToRequest: respondToRequest as ProviderServiceShape["respondToRequest"], respondToUserInput: respondToUserInput as ProviderServiceShape["respondToUserInput"], stopSession: stopSession as ProviderServiceShape["stopSession"], - listSessions: () => Effect.succeed(runtimeSessions), + listSessions: input?.listSessionsOverride ?? (() => Effect.succeed(runtimeSessions)), getCapabilities: (_provider) => Effect.succeed({ sessionModelSwitch: input?.sessionModelSwitch ?? "in-session", @@ -364,8 +363,12 @@ describe("ProviderCommandReactor", () => { const engine = await runtime.runPromise(Effect.service(OrchestrationEngineService)); const snapshotQuery = await runtime.runPromise(Effect.service(ProjectionSnapshotQuery)); const reactor = await runtime.runPromise(Effect.service(ProviderCommandReactor)); - scope = await Effect.runPromise(Scope.make("sequential")); - await Effect.runPromise(reactor.start().pipe(Scope.provide(scope))); + const reactorScope = await Effect.runPromise(Scope.make("sequential")); + scope = reactorScope; + const startReactor = () => Effect.runPromise(reactor.start().pipe(Scope.provide(reactorScope))); + if (input?.startReactor !== false) { + await startReactor(); + } const drain = () => Effect.runPromise(reactor.drain); await Effect.runPromise( @@ -410,6 +413,7 @@ describe("ProviderCommandReactor", () => { generateThreadTitle, runtimeSessions, stateDir, + startReactor, drain, }; } @@ -1605,6 +1609,191 @@ describe("ProviderCommandReactor", () => { }); }); + it("marks projected running sessions stopped on startup when no live provider session exists", async () => { + const harness = await createHarness({ startReactor: false }); + const now = "2026-01-01T00:00:00.000Z"; + + await Effect.runPromise( + harness.engine.dispatch({ + type: "thread.session.set", + commandId: CommandId.make("cmd-session-set-stale-running"), + threadId: ThreadId.make("thread-1"), + session: { + threadId: ThreadId.make("thread-1"), + status: "running", + providerName: "codex", + providerInstanceId: ProviderInstanceId.make("codex"), + runtimeMode: "approval-required", + activeTurnId: asTurnId("turn-stale"), + lastError: null, + updatedAt: now, + }, + createdAt: now, + }), + ); + + await harness.startReactor(); + + const readModel = await harness.readModel(); + const thread = readModel.threads.find((entry) => entry.id === ThreadId.make("thread-1")); + expect(thread?.session).toMatchObject({ + threadId: ThreadId.make("thread-1"), + status: "stopped", + providerName: "codex", + providerInstanceId: ProviderInstanceId.make("codex"), + runtimeMode: "approval-required", + activeTurnId: null, + lastError: "Provider runtime is no longer active. Start a new turn to reconnect this thread.", + }); + expect(thread?.latestTurn).toMatchObject({ + turnId: asTurnId("turn-stale"), + state: "interrupted", + }); + }); + + it("leaves projected running sessions alone on startup when a live provider session exists", async () => { + const harness = await createHarness({ startReactor: false }); + const now = "2026-01-01T00:00:00.000Z"; + + await Effect.runPromise( + harness.engine.dispatch({ + type: "thread.session.set", + commandId: CommandId.make("cmd-session-set-live-running"), + threadId: ThreadId.make("thread-1"), + session: { + threadId: ThreadId.make("thread-1"), + status: "running", + providerName: "codex", + providerInstanceId: ProviderInstanceId.make("codex"), + runtimeMode: "approval-required", + activeTurnId: asTurnId("turn-live"), + lastError: null, + updatedAt: now, + }, + createdAt: now, + }), + ); + harness.runtimeSessions.push({ + provider: ProviderDriverKind.make("codex"), + providerInstanceId: ProviderInstanceId.make("codex"), + status: "running", + runtimeMode: "approval-required", + threadId: ThreadId.make("thread-1"), + cwd: "/tmp/provider-project", + resumeCursor: { opaque: "resume-live" }, + activeTurnId: asTurnId("turn-live"), + createdAt: now, + updatedAt: now, + }); + + await harness.startReactor(); + + const readModel = await harness.readModel(); + const thread = readModel.threads.find((entry) => entry.id === ThreadId.make("thread-1")); + expect(thread?.session).toMatchObject({ + status: "running", + activeTurnId: asTurnId("turn-live"), + lastError: null, + }); + }); + + it("marks projected running sessions stopped on startup when the live provider session is not running the same turn", async () => { + const harness = await createHarness({ startReactor: false }); + const now = "2026-01-01T00:00:00.000Z"; + + await Effect.runPromise( + harness.engine.dispatch({ + type: "thread.session.set", + commandId: CommandId.make("cmd-session-set-live-ready"), + threadId: ThreadId.make("thread-1"), + session: { + threadId: ThreadId.make("thread-1"), + status: "running", + providerName: "codex", + providerInstanceId: ProviderInstanceId.make("codex"), + runtimeMode: "approval-required", + activeTurnId: asTurnId("turn-stale-live-ready"), + lastError: null, + updatedAt: now, + }, + createdAt: now, + }), + ); + harness.runtimeSessions.push({ + provider: ProviderDriverKind.make("codex"), + providerInstanceId: ProviderInstanceId.make("codex"), + status: "ready", + runtimeMode: "approval-required", + threadId: ThreadId.make("thread-1"), + cwd: "/tmp/provider-project", + resumeCursor: { opaque: "resume-ready" }, + createdAt: now, + updatedAt: now, + }); + + await harness.startReactor(); + + const readModel = await harness.readModel(); + const thread = readModel.threads.find((entry) => entry.id === ThreadId.make("thread-1")); + expect(thread?.session).toMatchObject({ + status: "stopped", + activeTurnId: null, + lastError: "Provider runtime is no longer active. Start a new turn to reconnect this thread.", + }); + expect(thread?.latestTurn).toMatchObject({ + turnId: asTurnId("turn-stale-live-ready"), + state: "interrupted", + }); + }); + + it("leaves projected running sessions alone on startup when live provider sessions cannot be listed", async () => { + const harness = await createHarness({ + startReactor: false, + listSessionsOverride: () => + Effect.die( + new ProviderAdapterRequestError({ + provider: "codex", + method: "ProviderService.listSessions", + detail: "Provider runtime did not answer.", + }), + ), + }); + const now = "2026-01-01T00:00:00.000Z"; + + await Effect.runPromise( + harness.engine.dispatch({ + type: "thread.session.set", + commandId: CommandId.make("cmd-session-set-list-failure"), + threadId: ThreadId.make("thread-1"), + session: { + threadId: ThreadId.make("thread-1"), + status: "running", + providerName: "codex", + providerInstanceId: ProviderInstanceId.make("codex"), + runtimeMode: "approval-required", + activeTurnId: asTurnId("turn-list-failure"), + lastError: null, + updatedAt: now, + }, + createdAt: now, + }), + ); + + await harness.startReactor(); + + const readModel = await harness.readModel(); + const thread = readModel.threads.find((entry) => entry.id === ThreadId.make("thread-1")); + expect(thread?.session).toMatchObject({ + status: "running", + activeTurnId: asTurnId("turn-list-failure"), + lastError: null, + }); + expect(thread?.latestTurn).toMatchObject({ + turnId: asTurnId("turn-list-failure"), + state: "running", + }); + }); + it("rejects active runtime sessions that are missing provider instance ids", async () => { const harness = await createHarness(); const now = "2026-01-01T00:00:00.000Z"; @@ -2007,4 +2196,74 @@ describe("ProviderCommandReactor", () => { expect(thread?.session?.providerInstanceId).toBe(ProviderInstanceId.make("codex_work")); expect(thread?.session?.activeTurnId).toBeNull(); }); + + it("clears thread session state when provider session stop fails", async () => { + const now = "2026-01-01T00:00:00.000Z"; + const harness = await createHarness({ + stopSessionOverride: () => + Effect.fail( + new ProviderAdapterRequestError({ + provider: "codex", + method: "ProviderService.stopSession", + detail: "No live provider session was found for this thread.", + }), + ), + }); + + await Effect.runPromise( + harness.engine.dispatch({ + type: "thread.session.set", + commandId: CommandId.make("cmd-session-set-for-stop-failure"), + threadId: ThreadId.make("thread-1"), + session: { + threadId: ThreadId.make("thread-1"), + status: "running", + providerName: "codex", + providerInstanceId: ProviderInstanceId.make("codex_work"), + runtimeMode: "approval-required", + activeTurnId: asTurnId("turn-running"), + lastError: null, + updatedAt: now, + }, + createdAt: now, + }), + ); + + await Effect.runPromise( + harness.engine.dispatch({ + type: "thread.session.stop", + commandId: CommandId.make("cmd-session-stop-failure"), + threadId: ThreadId.make("thread-1"), + createdAt: now, + }), + ); + + await waitFor(async () => { + const readModel = await harness.readModel(); + const thread = readModel.threads.find((entry) => entry.id === ThreadId.make("thread-1")); + return ( + thread?.session?.status === "stopped" && + thread.session.activeTurnId === null && + thread.activities.some((activity) => activity.kind === "provider.session.stop.failed") + ); + }); + + expect(harness.stopSession.mock.calls.length).toBe(1); + const readModel = await harness.readModel(); + const thread = readModel.threads.find((entry) => entry.id === ThreadId.make("thread-1")); + expect(thread?.session).not.toBeNull(); + expect(thread?.session?.status).toBe("stopped"); + expect(thread?.session?.providerInstanceId).toBe(ProviderInstanceId.make("codex_work")); + expect(thread?.session?.activeTurnId).toBeNull(); + expect(thread?.session?.lastError).toBe("No live provider session was found for this thread."); + + const failureActivity = thread?.activities.find( + (activity) => activity.kind === "provider.session.stop.failed", + ); + expect(failureActivity).toBeDefined(); + expect(failureActivity?.turnId).toBe(asTurnId("turn-running")); + expect(failureActivity?.payload).toMatchObject({ + detail: "No live provider session was found for this thread.", + }); + }); }); diff --git a/apps/server/src/orchestration/Layers/ProviderCommandReactor.ts b/apps/server/src/orchestration/Layers/ProviderCommandReactor.ts index 8b71a976808..5d07585efd9 100644 --- a/apps/server/src/orchestration/Layers/ProviderCommandReactor.ts +++ b/apps/server/src/orchestration/Layers/ProviderCommandReactor.ts @@ -15,9 +15,11 @@ import { import { isTemporaryWorktreeBranch, WORKTREE_BRANCH_PREFIX } from "@t3tools/shared/git"; import * as Cache from "effect/Cache"; import * as Cause from "effect/Cause"; +import * as DateTime from "effect/DateTime"; import * as Duration from "effect/Duration"; import * as Effect from "effect/Effect"; import * as Equal from "effect/Equal"; +import * as Exit from "effect/Exit"; import * as Layer from "effect/Layer"; import * as Option from "effect/Option"; import * as Schema from "effect/Schema"; @@ -88,6 +90,8 @@ const HANDLED_TURN_START_KEY_MAX = 10_000; const HANDLED_TURN_START_KEY_TTL = Duration.minutes(30); const DEFAULT_RUNTIME_MODE: RuntimeMode = "full-access"; const DEFAULT_THREAD_TITLE = "New thread"; +const STALE_RUNNING_SESSION_DETAIL = + "Provider runtime is no longer active. Start a new turn to reconnect this thread."; export function providerErrorLabel(value: string | undefined): string { const normalized = value?.trim(); @@ -280,6 +284,57 @@ const make = Effect.gen(function* () { }); }); + const reconcileStaleRunningSessions = Effect.fn("reconcileStaleRunningSessions")(function* () { + const snapshot = yield* projectionSnapshotQuery.getSnapshot(); + const liveSessionsExit = yield* Effect.exit(providerService.listSessions()); + if (Exit.isFailure(liveSessionsExit)) { + // Runtime state is unknowable here; leave projected sessions untouched rather than + // incorrectly stopping work that may still be active while the provider recovers. + yield* Effect.logWarning( + "provider command reactor skipped stale session reconciliation because live sessions could not be listed", + { + cause: Cause.pretty(liveSessionsExit.cause), + }, + ); + return; + } + + const liveSessionsByThreadId = new Map( + liveSessionsExit.value.map((session) => [session.threadId, session]), + ); + const now = DateTime.formatIso(yield* DateTime.now); + yield* Effect.forEach( + snapshot.threads, + (thread) => { + const session = thread.session; + if (session === null || session.status !== "running" || session.activeTurnId === null) { + return Effect.void; + } + + const liveSession = liveSessionsByThreadId.get(thread.id); + if ( + liveSession?.status === "running" && + liveSession.activeTurnId === session.activeTurnId + ) { + return Effect.void; + } + + return setThreadSession({ + threadId: thread.id, + session: { + ...session, + status: "stopped", + activeTurnId: null, + lastError: session.lastError ?? STALE_RUNNING_SESSION_DETAIL, + updatedAt: now, + }, + createdAt: now, + }); + }, + { discard: true }, + ); + }); + const resolveProject = Effect.fnUntraced(function* (projectId: ProjectId) { return yield* projectionSnapshotQuery .getProjectShellById(projectId) @@ -909,8 +964,31 @@ const make = Effect.gen(function* () { } const now = event.payload.createdAt; + let stopFailureDetail: string | null = null; if (thread.session && thread.session.status !== "stopped") { - yield* providerService.stopSession({ threadId: thread.id }); + const stopExit = yield* Effect.exit(providerService.stopSession({ threadId: thread.id })); + if (Exit.isFailure(stopExit)) { + stopFailureDetail = formatFailureDetail(stopExit.cause); + yield* appendProviderFailureActivity({ + threadId: thread.id, + kind: "provider.session.stop.failed", + summary: "Provider session stop failed", + detail: stopFailureDetail, + turnId: thread.session.activeTurnId, + createdAt: now, + }).pipe( + Effect.catchCause((activityCause) => + Effect.logWarning( + "provider command reactor failed to record provider session stop failure", + { + threadId: thread.id, + cause: Cause.pretty(activityCause), + originalCause: Cause.pretty(stopExit.cause), + }, + ), + ), + ); + } } yield* setThreadSession({ @@ -924,7 +1002,7 @@ const make = Effect.gen(function* () { : {}), runtimeMode: thread.session?.runtimeMode ?? DEFAULT_RUNTIME_MODE, activeTurnId: null, - lastError: thread.session?.lastError ?? null, + lastError: stopFailureDetail ?? thread.session?.lastError ?? null, updatedAt: now, }, createdAt: now, @@ -1006,6 +1084,13 @@ const make = Effect.gen(function* () { yield* Effect.forkScoped( Stream.runForEach(orchestrationEngine.streamDomainEvents, processEvent), ); + yield* reconcileStaleRunningSessions().pipe( + Effect.catchCause((cause) => + Effect.logWarning("provider command reactor failed to reconcile stale running sessions", { + cause: Cause.pretty(cause), + }), + ), + ); }); return { diff --git a/apps/web/src/store.test.ts b/apps/web/src/store.test.ts index 59ebd0cea0c..1322345847a 100644 --- a/apps/web/src/store.test.ts +++ b/apps/web/src/store.test.ts @@ -6,6 +6,7 @@ import { EventId, MessageId, ProjectId, + ProviderDriverKind, ProviderInstanceId, ThreadId, TurnId, @@ -781,6 +782,51 @@ describe("incremental orchestration updates", () => { expect(threadsOf(next)[0]?.messages).toHaveLength(1); }); + it("settles a running latest turn when the session is stopped", () => { + const thread = makeThread({ + session: { + provider: ProviderDriverKind.make("codex"), + status: "running", + orchestrationStatus: "running", + activeTurnId: TurnId.make("turn-1"), + createdAt: "2026-02-27T00:00:00.000Z", + updatedAt: "2026-02-27T00:00:01.000Z", + }, + latestTurn: { + turnId: TurnId.make("turn-1"), + state: "running", + requestedAt: "2026-02-27T00:00:00.000Z", + startedAt: "2026-02-27T00:00:01.000Z", + completedAt: null, + assistantMessageId: null, + }, + }); + + const next = applyOrchestrationEvent( + makeState(thread), + makeEvent("thread.session-set", { + threadId: thread.id, + session: { + threadId: thread.id, + status: "stopped", + providerName: "codex", + runtimeMode: "full-access", + activeTurnId: null, + lastError: "Provider runtime is no longer active.", + updatedAt: "2026-02-27T00:00:05.000Z", + }, + }), + localEnvironmentId, + ); + + expect(threadsOf(next)[0]?.session?.status).toBe("closed"); + expect(threadsOf(next)[0]?.latestTurn).toMatchObject({ + turnId: TurnId.make("turn-1"), + state: "interrupted", + completedAt: "2026-02-27T00:00:05.000Z", + }); + }); + it("does not regress latestTurn when an older turn diff completes late", () => { const state = makeState( makeThread({ diff --git a/apps/web/src/store.ts b/apps/web/src/store.ts index e9972f7c9a8..00ad2d12629 100644 --- a/apps/web/src/store.ts +++ b/apps/web/src/store.ts @@ -879,6 +879,32 @@ function buildLatestTurn(params: { }; } +function settleLatestTurnForInactiveSession(input: { + latestTurn: Thread["latestTurn"]; + previousSession: Thread["session"]; + completedAt: string; +}): Thread["latestTurn"] { + const latestTurn = input.latestTurn; + const previousActiveTurnId = input.previousSession?.activeTurnId; + if ( + latestTurn === null || + latestTurn.state !== "running" || + previousActiveTurnId === undefined || + latestTurn.turnId !== previousActiveTurnId + ) { + return latestTurn; + } + return buildLatestTurn({ + previous: latestTurn, + turnId: latestTurn.turnId, + state: "interrupted", + requestedAt: latestTurn.requestedAt, + startedAt: latestTurn.startedAt ?? input.completedAt, + completedAt: latestTurn.completedAt ?? input.completedAt, + assistantMessageId: latestTurn.assistantMessageId, + }); +} + function rebindTurnDiffSummariesForAssistantMessage( turnDiffSummaries: ReadonlyArray, turnId: TurnId, @@ -1473,7 +1499,11 @@ function applyEnvironmentOrchestrationEvent( : null, sourceProposedPlan: thread.pendingSourceProposedPlan, }) - : thread.latestTurn, + : settleLatestTurnForInactiveSession({ + latestTurn: thread.latestTurn, + previousSession: thread.session, + completedAt: event.payload.session.updatedAt, + }), updatedAt: event.occurredAt, })); From c0bea1c185d97c1541f02b61b5bbf4ad7ed33652 Mon Sep 17 00:00:00 2001 From: justsomelegs <145564979+justsomelegs@users.noreply.github.com> Date: Mon, 11 May 2026 23:33:18 +0100 Subject: [PATCH 02/15] Handle aborted runtime turns --- .../Layers/ProviderRuntimeIngestion.test.ts | 98 +++++++++++++++++++ .../Layers/ProviderRuntimeIngestion.ts | 8 +- 2 files changed, 105 insertions(+), 1 deletion(-) diff --git a/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.test.ts b/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.test.ts index 3b2411cba2a..0af5fa69edd 100644 --- a/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.test.ts +++ b/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.test.ts @@ -360,6 +360,48 @@ describe("ProviderRuntimeIngestion", () => { expect(thread.session?.lastError).toBe("turn failed"); }); + it("maps turn aborted events into ready thread session updates", async () => { + const harness = await createHarness(); + const now = "2026-01-01T00:00:00.000Z"; + + harness.emit({ + type: "turn.started", + eventId: asEventId("evt-turn-started-before-abort"), + provider: ProviderDriverKind.make("codex"), + threadId: asThreadId("thread-1"), + createdAt: now, + turnId: asTurnId("turn-aborted"), + }); + + await waitForThread( + harness.readModel, + (thread) => + thread.session?.status === "running" && thread.session?.activeTurnId === "turn-aborted", + ); + + harness.emit({ + type: "turn.aborted", + eventId: asEventId("evt-turn-aborted"), + provider: ProviderDriverKind.make("codex"), + threadId: asThreadId("thread-1"), + createdAt: "2026-01-01T00:00:01.000Z", + turnId: asTurnId("turn-aborted"), + payload: { + reason: "Interrupted by user.", + }, + }); + + const thread = await waitForThread( + harness.readModel, + (entry) => + entry.session?.status === "ready" && + entry.session?.activeTurnId === null && + entry.session?.lastError === null, + ); + expect(thread.session?.status).toBe("ready"); + expect(thread.session?.activeTurnId).toBeNull(); + }); + it("applies provider session.state.changed transitions directly", async () => { const harness = await createHarness(); const waitingAt = "2026-01-01T00:00:00.000Z"; @@ -610,6 +652,62 @@ describe("ProviderRuntimeIngestion", () => { ); }); + it("ignores auxiliary turn aborts for a non-active turn", async () => { + const harness = await createHarness(); + const now = "2026-01-01T00:00:00.000Z"; + + harness.emit({ + type: "turn.started", + eventId: asEventId("evt-turn-started-before-aux-abort"), + provider: ProviderDriverKind.make("codex"), + createdAt: now, + threadId: asThreadId("thread-1"), + turnId: asTurnId("turn-primary-abort"), + }); + + await waitForThread( + harness.readModel, + (thread) => + thread.session?.status === "running" && + thread.session?.activeTurnId === "turn-primary-abort", + ); + + harness.emit({ + type: "turn.aborted", + eventId: asEventId("evt-turn-aborted-aux"), + provider: ProviderDriverKind.make("codex"), + createdAt: "2026-01-01T00:00:01.000Z", + threadId: asThreadId("thread-1"), + turnId: asTurnId("turn-aux-abort"), + payload: { + reason: "Auxiliary turn aborted.", + }, + }); + + await harness.drain(); + const midReadModel = await harness.readModel(); + const midThread = midReadModel.threads.find((entry) => entry.id === ThreadId.make("thread-1")); + expect(midThread?.session?.status).toBe("running"); + expect(midThread?.session?.activeTurnId).toBe("turn-primary-abort"); + + harness.emit({ + type: "turn.aborted", + eventId: asEventId("evt-turn-aborted-primary"), + provider: ProviderDriverKind.make("codex"), + createdAt: "2026-01-01T00:00:02.000Z", + threadId: asThreadId("thread-1"), + turnId: asTurnId("turn-primary-abort"), + payload: { + reason: "Primary turn aborted.", + }, + }); + + await waitForThread( + harness.readModel, + (thread) => thread.session?.status === "ready" && thread.session?.activeTurnId === null, + ); + }); + it("ignores non-active turn completion when runtime omits thread id", async () => { const harness = await createHarness(); const now = "2026-01-01T00:00:00.000Z"; diff --git a/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.ts b/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.ts index 2c07ac91b1e..6499a0b7fad 100644 --- a/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.ts +++ b/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.ts @@ -1211,6 +1211,7 @@ const make = Effect.gen(function* () { return true; case "turn.started": return !conflictsWithActiveTurn; + case "turn.aborted": case "turn.completed": if (conflictsWithActiveTurn || missingTurnForActiveTurn) { return false; @@ -1236,12 +1237,15 @@ const make = Effect.gen(function* () { event.type === "session.exited" || event.type === "thread.started" || event.type === "turn.started" || + event.type === "turn.aborted" || event.type === "turn.completed" ) { const nextActiveTurnId = event.type === "turn.started" ? (eventTurnId ?? null) - : event.type === "turn.completed" || event.type === "session.exited" + : event.type === "turn.aborted" || + event.type === "turn.completed" || + event.type === "session.exited" ? null : activeTurnId; const status = (() => { @@ -1252,6 +1256,8 @@ const make = Effect.gen(function* () { return "running"; case "session.exited": return "stopped"; + case "turn.aborted": + return "ready"; case "turn.completed": return normalizeRuntimeTurnState(event.payload.state) === "failed" ? "error" From b57c499b53fc91e4f78cf0323262772d7547dab7 Mon Sep 17 00:00:00 2001 From: justsomelegs <145564979+justsomelegs@users.noreply.github.com> Date: Mon, 11 May 2026 23:39:52 +0100 Subject: [PATCH 03/15] Clear OpenCode state on interrupt --- .../provider/Layers/OpenCodeAdapter.test.ts | 35 +++++++++++++++++++ .../src/provider/Layers/OpenCodeAdapter.ts | 9 +++-- 2 files changed, 42 insertions(+), 2 deletions(-) diff --git a/apps/server/src/provider/Layers/OpenCodeAdapter.test.ts b/apps/server/src/provider/Layers/OpenCodeAdapter.test.ts index 66dc5edc671..7359fdf2f79 100644 --- a/apps/server/src/provider/Layers/OpenCodeAdapter.test.ts +++ b/apps/server/src/provider/Layers/OpenCodeAdapter.test.ts @@ -395,6 +395,41 @@ it.layer(OpenCodeAdapterTestLayer)("OpenCodeAdapterLive", (it) => { }), ); + it.effect("clears active session state after interrupting a turn", () => + Effect.gen(function* () { + const adapter = yield* OpenCodeAdapter; + const threadId = asThreadId("thread-interrupt-turn"); + yield* adapter.startSession({ + provider: ProviderDriverKind.make("opencode"), + threadId, + runtimeMode: "full-access", + }); + + const started = yield* adapter.sendTurn({ + threadId, + input: "Stop this", + modelSelection: { + instanceId: ProviderInstanceId.make("opencode"), + model: "openai/gpt-5", + }, + }); + const runningSessions = yield* adapter.listSessions(); + const runningSession = runningSessions.find((session) => session.threadId === threadId); + + assert.equal(runningSession?.status, "running"); + assert.equal(runningSession?.activeTurnId, started.turnId); + + yield* adapter.interruptTurn(threadId, started.turnId); + const interruptedSessions = yield* adapter.listSessions(); + const interruptedSession = interruptedSessions.find((session) => session.threadId === threadId); + + assert.equal(runtimeMock.state.abortCalls.at(-1), "http://127.0.0.1:9999/session"); + assert.equal(interruptedSession?.status, "ready"); + assert.equal(interruptedSession?.activeTurnId, undefined); + assert.equal(interruptedSession?.lastError, undefined); + }), + ); + it.effect("passes agent and variant options for the adapter's bound custom instance id", () => { const instanceId = ProviderInstanceId.make("opencode_zen"); const adapterLayer = Layer.effect( diff --git a/apps/server/src/provider/Layers/OpenCodeAdapter.ts b/apps/server/src/provider/Layers/OpenCodeAdapter.ts index 512e9ed6bfe..32330eed484 100644 --- a/apps/server/src/provider/Layers/OpenCodeAdapter.ts +++ b/apps/server/src/provider/Layers/OpenCodeAdapter.ts @@ -1257,14 +1257,19 @@ export function makeOpenCodeAdapter( const interruptTurn: OpenCodeAdapterShape["interruptTurn"] = Effect.fn("interruptTurn")( function* (threadId, turnId) { const context = ensureSessionContext(sessions, threadId); + const interruptedTurnId = turnId ?? context.activeTurnId; yield* runOpenCodeSdk("session.abort", () => context.client.session.abort({ sessionID: context.openCodeSessionId }), ).pipe(Effect.mapError(toRequestError)); - if (turnId ?? context.activeTurnId) { + context.activeTurnId = undefined; + context.activeAgent = undefined; + context.activeVariant = undefined; + yield* updateProviderSession(context, { status: "ready" }, { clearActiveTurnId: true }); + if (interruptedTurnId) { yield* emit({ ...(yield* buildEventBase({ threadId, - turnId: turnId ?? context.activeTurnId, + turnId: interruptedTurnId, })), type: "turn.aborted", payload: { From 119122724dc08d85146913622c6f9aa2fe1e615d Mon Sep 17 00:00:00 2001 From: justsomelegs <145564979+justsomelegs@users.noreply.github.com> Date: Tue, 12 May 2026 00:01:31 +0100 Subject: [PATCH 04/15] Cover completed runtime turn readiness --- .../Layers/ProviderRuntimeIngestion.test.ts | 43 +++++++++++++++++++ 1 file changed, 43 insertions(+) diff --git a/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.test.ts b/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.test.ts index 0af5fa69edd..a88de32a1a4 100644 --- a/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.test.ts +++ b/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.test.ts @@ -360,6 +360,49 @@ describe("ProviderRuntimeIngestion", () => { expect(thread.session?.lastError).toBe("turn failed"); }); + it("maps completed active turns into ready thread session updates", async () => { + const harness = await createHarness(); + const now = "2026-01-01T00:00:00.000Z"; + + harness.emit({ + type: "turn.started", + eventId: asEventId("evt-turn-started-before-complete"), + provider: ProviderDriverKind.make("opencode"), + threadId: asThreadId("thread-1"), + createdAt: now, + turnId: asTurnId("turn-completed-success"), + }); + + await waitForThread( + harness.readModel, + (thread) => + thread.session?.status === "running" && + thread.session?.activeTurnId === "turn-completed-success", + ); + + harness.emit({ + type: "turn.completed", + eventId: asEventId("evt-turn-completed-success"), + provider: ProviderDriverKind.make("opencode"), + threadId: asThreadId("thread-1"), + createdAt: "2026-01-01T00:00:01.000Z", + turnId: asTurnId("turn-completed-success"), + payload: { + state: "completed", + }, + }); + + const thread = await waitForThread( + harness.readModel, + (entry) => + entry.session?.status === "ready" && + entry.session?.activeTurnId === null && + entry.session?.lastError === null, + ); + expect(thread.session?.status).toBe("ready"); + expect(thread.session?.activeTurnId).toBeNull(); + }); + it("maps turn aborted events into ready thread session updates", async () => { const harness = await createHarness(); const now = "2026-01-01T00:00:00.000Z"; From c2cf3f0270c37fa3d9530e2309ed865680426cc0 Mon Sep 17 00:00:00 2001 From: justsomelegs <145564979+justsomelegs@users.noreply.github.com> Date: Tue, 12 May 2026 10:40:50 +0100 Subject: [PATCH 05/15] Preserve live provider session state --- .../Layers/ProviderCommandReactor.test.ts | 16 +++--- .../Layers/ProviderCommandReactor.ts | 50 ++++++++++++++++--- .../provider/Layers/OpenCodeAdapter.test.ts | 4 +- 3 files changed, 55 insertions(+), 15 deletions(-) diff --git a/apps/server/src/orchestration/Layers/ProviderCommandReactor.test.ts b/apps/server/src/orchestration/Layers/ProviderCommandReactor.test.ts index a9658427471..e12809b335c 100644 --- a/apps/server/src/orchestration/Layers/ProviderCommandReactor.test.ts +++ b/apps/server/src/orchestration/Layers/ProviderCommandReactor.test.ts @@ -1697,7 +1697,7 @@ describe("ProviderCommandReactor", () => { }); }); - it("marks projected running sessions stopped on startup when the live provider session is not running the same turn", async () => { + it("mirrors live ready provider sessions on startup and settles the stale projected turn", async () => { const harness = await createHarness({ startReactor: false }); const now = "2026-01-01T00:00:00.000Z"; @@ -1736,9 +1736,9 @@ describe("ProviderCommandReactor", () => { const readModel = await harness.readModel(); const thread = readModel.threads.find((entry) => entry.id === ThreadId.make("thread-1")); expect(thread?.session).toMatchObject({ - status: "stopped", + status: "ready", activeTurnId: null, - lastError: "Provider runtime is no longer active. Start a new turn to reconnect this thread.", + lastError: null, }); expect(thread?.latestTurn).toMatchObject({ turnId: asTurnId("turn-stale-live-ready"), @@ -2197,7 +2197,7 @@ describe("ProviderCommandReactor", () => { expect(thread?.session?.activeTurnId).toBeNull(); }); - it("clears thread session state when provider session stop fails", async () => { + it("keeps thread session state when provider session stop fails", async () => { const now = "2026-01-01T00:00:00.000Z"; const harness = await createHarness({ stopSessionOverride: () => @@ -2242,8 +2242,8 @@ describe("ProviderCommandReactor", () => { const readModel = await harness.readModel(); const thread = readModel.threads.find((entry) => entry.id === ThreadId.make("thread-1")); return ( - thread?.session?.status === "stopped" && - thread.session.activeTurnId === null && + thread?.session?.status === "running" && + thread.session.activeTurnId === asTurnId("turn-running") && thread.activities.some((activity) => activity.kind === "provider.session.stop.failed") ); }); @@ -2252,9 +2252,9 @@ describe("ProviderCommandReactor", () => { const readModel = await harness.readModel(); const thread = readModel.threads.find((entry) => entry.id === ThreadId.make("thread-1")); expect(thread?.session).not.toBeNull(); - expect(thread?.session?.status).toBe("stopped"); + expect(thread?.session?.status).toBe("running"); expect(thread?.session?.providerInstanceId).toBe(ProviderInstanceId.make("codex_work")); - expect(thread?.session?.activeTurnId).toBeNull(); + expect(thread?.session?.activeTurnId).toBe(asTurnId("turn-running")); expect(thread?.session?.lastError).toBe("No live provider session was found for this thread."); const failureActivity = thread?.activities.find( diff --git a/apps/server/src/orchestration/Layers/ProviderCommandReactor.ts b/apps/server/src/orchestration/Layers/ProviderCommandReactor.ts index 5d07585efd9..3cb1acfc113 100644 --- a/apps/server/src/orchestration/Layers/ProviderCommandReactor.ts +++ b/apps/server/src/orchestration/Layers/ProviderCommandReactor.ts @@ -28,7 +28,11 @@ import { makeDrainableWorker } from "@t3tools/shared/DrainableWorker"; import { resolveThreadWorkspaceCwd } from "../../checkpointing/Utils.ts"; import { increment, orchestrationEventsProcessedTotal } from "../../observability/Metrics.ts"; -import { ProviderAdapterRequestError } from "../../provider/Errors.ts"; +import { + ProviderAdapterRequestError, + ProviderAdapterSessionNotFoundError, + ProviderSessionNotFoundError, +} from "../../provider/Errors.ts"; import type { ProviderServiceError } from "../../provider/Errors.ts"; import { TextGeneration } from "../../textGeneration/TextGeneration.ts"; import { ProviderService } from "../../provider/Services/ProviderService.ts"; @@ -42,6 +46,8 @@ import { ServerSettingsService } from "../../serverSettings.ts"; import { VcsStatusBroadcaster } from "../../vcs/VcsStatusBroadcaster.ts"; import { GitWorkflowService } from "../../git/GitWorkflowService.ts"; const isProviderAdapterRequestError = Schema.is(ProviderAdapterRequestError); +const isProviderAdapterSessionNotFoundError = Schema.is(ProviderAdapterSessionNotFoundError); +const isProviderSessionNotFoundError = Schema.is(ProviderSessionNotFoundError); const isProviderDriverKind = Schema.is(ProviderDriverKind); type ProviderIntentEvent = Extract< @@ -248,6 +254,14 @@ const make = Effect.gen(function* () { return Cause.pretty(cause); }; + const isSessionAlreadyGoneFailure = (cause: Cause.Cause): boolean => { + const failReason = cause.reasons.find(Cause.isFailReason); + return ( + isProviderAdapterSessionNotFoundError(failReason?.error) || + isProviderSessionNotFoundError(failReason?.error) + ); + }; + const setThreadSession = (input: { readonly threadId: ThreadId; readonly session: OrchestrationSession; @@ -312,11 +326,23 @@ const make = Effect.gen(function* () { } const liveSession = liveSessionsByThreadId.get(thread.id); - if ( - liveSession?.status === "running" && - liveSession.activeTurnId === session.activeTurnId - ) { - return Effect.void; + if (liveSession !== undefined) { + return setThreadSession({ + threadId: thread.id, + session: { + ...session, + status: mapProviderSessionStatusToOrchestrationStatus(liveSession.status), + providerName: liveSession.provider, + ...(liveSession.providerInstanceId !== undefined + ? { providerInstanceId: liveSession.providerInstanceId } + : {}), + runtimeMode: liveSession.runtimeMode, + activeTurnId: liveSession.activeTurnId ?? null, + lastError: liveSession.lastError ?? session.lastError ?? null, + updatedAt: now, + }, + createdAt: now, + }); } return setThreadSession({ @@ -988,6 +1014,18 @@ const make = Effect.gen(function* () { ), ), ); + if (!isSessionAlreadyGoneFailure(stopExit.cause)) { + yield* setThreadSession({ + threadId: thread.id, + session: { + ...thread.session, + lastError: stopFailureDetail, + updatedAt: now, + }, + createdAt: now, + }); + return; + } } } diff --git a/apps/server/src/provider/Layers/OpenCodeAdapter.test.ts b/apps/server/src/provider/Layers/OpenCodeAdapter.test.ts index 7359fdf2f79..5920accdcae 100644 --- a/apps/server/src/provider/Layers/OpenCodeAdapter.test.ts +++ b/apps/server/src/provider/Layers/OpenCodeAdapter.test.ts @@ -421,7 +421,9 @@ it.layer(OpenCodeAdapterTestLayer)("OpenCodeAdapterLive", (it) => { yield* adapter.interruptTurn(threadId, started.turnId); const interruptedSessions = yield* adapter.listSessions(); - const interruptedSession = interruptedSessions.find((session) => session.threadId === threadId); + const interruptedSession = interruptedSessions.find( + (session) => session.threadId === threadId, + ); assert.equal(runtimeMock.state.abortCalls.at(-1), "http://127.0.0.1:9999/session"); assert.equal(interruptedSession?.status, "ready"); From a6634d46fbd0f8df67b84eaaf634c74294054631 Mon Sep 17 00:00:00 2001 From: justsomelegs <145564979+justsomelegs@users.noreply.github.com> Date: Tue, 12 May 2026 13:27:56 +0100 Subject: [PATCH 06/15] Preserve failed turn outcomes --- .../Layers/ProjectionPipeline.test.ts | 157 ++++++++++++++++++ .../Layers/ProjectionPipeline.ts | 22 ++- .../Layers/ProviderCommandReactor.test.ts | 64 ++++++- .../Layers/ProviderCommandReactor.ts | 40 ++--- .../provider/Layers/OpenCodeAdapter.test.ts | 22 +++ .../src/provider/Layers/OpenCodeAdapter.ts | 17 +- apps/web/src/store.test.ts | 45 +++++ apps/web/src/store.ts | 21 ++- 8 files changed, 353 insertions(+), 35 deletions(-) diff --git a/apps/server/src/orchestration/Layers/ProjectionPipeline.test.ts b/apps/server/src/orchestration/Layers/ProjectionPipeline.test.ts index 8fcbea1bda5..51eb0328ead 100644 --- a/apps/server/src/orchestration/Layers/ProjectionPipeline.test.ts +++ b/apps/server/src/orchestration/Layers/ProjectionPipeline.test.ts @@ -2329,6 +2329,163 @@ it.effect( ), ); +it.effect( + "preserves failed turn state during a clean projection rebuild without thread latest-turn state", + () => + Effect.gen(function* () { + const projectionPipeline = yield* OrchestrationProjectionPipeline; + const eventStore = yield* OrchestrationEventStore; + const sql = yield* SqlClient.SqlClient; + const projectId = ProjectId.make("project-rebuild-error"); + const threadId = ThreadId.make("thread-rebuild-error"); + const turnId = TurnId.make("turn-rebuild-error"); + const messageId = MessageId.make("message-rebuild-error"); + const createdAt = "2026-02-26T16:00:00.000Z"; + const runningAt = "2026-02-26T16:00:05.000Z"; + const failedAt = "2026-02-26T16:00:10.000Z"; + + yield* eventStore.append({ + type: "project.created", + eventId: EventId.make("evt-rebuild-error-project"), + aggregateKind: "project", + aggregateId: projectId, + occurredAt: createdAt, + commandId: CommandId.make("cmd-rebuild-error-project"), + causationEventId: null, + correlationId: CorrelationId.make("cmd-rebuild-error-project"), + metadata: {}, + payload: { + projectId, + title: "Project Rebuild Error", + workspaceRoot: "/tmp/project-rebuild-error", + defaultModelSelection: null, + scripts: [], + createdAt, + updatedAt: createdAt, + }, + }); + yield* eventStore.append({ + type: "thread.created", + eventId: EventId.make("evt-rebuild-error-thread"), + aggregateKind: "thread", + aggregateId: threadId, + occurredAt: createdAt, + commandId: CommandId.make("cmd-rebuild-error-thread"), + causationEventId: null, + correlationId: CorrelationId.make("cmd-rebuild-error-thread"), + metadata: {}, + payload: { + threadId, + projectId, + title: "Thread Rebuild Error", + modelSelection: { + instanceId: ProviderInstanceId.make("codex"), + model: "gpt-5-codex", + }, + runtimeMode: "approval-required", + interactionMode: "default", + branch: null, + worktreePath: null, + createdAt, + updatedAt: createdAt, + }, + }); + yield* eventStore.append({ + type: "thread.turn-start-requested", + eventId: EventId.make("evt-rebuild-error-turn-start"), + aggregateKind: "thread", + aggregateId: threadId, + occurredAt: createdAt, + commandId: CommandId.make("cmd-rebuild-error-turn-start"), + causationEventId: null, + correlationId: CorrelationId.make("cmd-rebuild-error-turn-start"), + metadata: {}, + payload: { + threadId, + messageId, + runtimeMode: "approval-required", + createdAt, + }, + }); + yield* eventStore.append({ + type: "thread.session-set", + eventId: EventId.make("evt-rebuild-error-running"), + aggregateKind: "thread", + aggregateId: threadId, + occurredAt: runningAt, + commandId: CommandId.make("cmd-rebuild-error-running"), + causationEventId: null, + correlationId: CorrelationId.make("cmd-rebuild-error-running"), + metadata: {}, + payload: { + threadId, + session: { + threadId, + status: "running", + providerName: "codex", + providerInstanceId: ProviderInstanceId.make("codex"), + runtimeMode: "approval-required", + activeTurnId: turnId, + lastError: null, + updatedAt: runningAt, + }, + }, + }); + yield* eventStore.append({ + type: "thread.session-set", + eventId: EventId.make("evt-rebuild-error-failed"), + aggregateKind: "thread", + aggregateId: threadId, + occurredAt: failedAt, + commandId: CommandId.make("cmd-rebuild-error-failed"), + causationEventId: null, + correlationId: CorrelationId.make("cmd-rebuild-error-failed"), + metadata: {}, + payload: { + threadId, + session: { + threadId, + status: "error", + providerName: "codex", + providerInstanceId: ProviderInstanceId.make("codex"), + runtimeMode: "approval-required", + activeTurnId: null, + lastError: "Prompt failed.", + updatedAt: failedAt, + }, + }, + }); + + yield* projectionPipeline.bootstrap; + + const turnRows = yield* sql<{ + readonly turnId: string; + readonly state: string; + readonly completedAt: string | null; + }>` + SELECT + turn_id AS "turnId", + state, + completed_at AS "completedAt" + FROM projection_turns + WHERE thread_id = ${threadId} + AND turn_id = ${turnId} + `; + + assert.deepEqual(turnRows, [ + { + turnId, + state: "error", + completedAt: failedAt, + }, + ]); + }).pipe( + Effect.provide( + makeProjectionPipelinePrefixedTestLayer("t3-projection-pipeline-rebuild-error-"), + ), + ), +); + const engineLayer = it.layer( OrchestrationEngineLive.pipe( Layer.provide(OrchestrationProjectionSnapshotQueryLive), diff --git a/apps/server/src/orchestration/Layers/ProjectionPipeline.ts b/apps/server/src/orchestration/Layers/ProjectionPipeline.ts index a3b7c7f37fc..a6a14b3efc6 100644 --- a/apps/server/src/orchestration/Layers/ProjectionPipeline.ts +++ b/apps/server/src/orchestration/Layers/ProjectionPipeline.ts @@ -2,6 +2,7 @@ import { ApprovalRequestId, type ChatAttachment, type OrchestrationEvent, + type OrchestrationSessionStatus, ThreadId, type TurnId, } from "@t3tools/contracts"; @@ -178,6 +179,21 @@ function deriveHasActionableProposedPlan(input: { return latestPlan !== null && latestPlan.implementedAt === null; } +function settleTurnStateFromSessionStatus( + status: OrchestrationSessionStatus, +): ProjectionTurn["state"] | null { + switch (status) { + case "error": + return "error"; + case "ready": + case "interrupted": + case "stopped": + return "interrupted"; + default: + return null; + } +} + function retainProjectionMessagesAfterRevert( messages: ReadonlyArray, turns: ReadonlyArray, @@ -1002,6 +1018,10 @@ const makeOrchestrationProjectionPipeline = Effect.fn("makeOrchestrationProjecti case "thread.session-set": { const turnId = event.payload.session.activeTurnId; if (turnId === null || event.payload.session.status !== "running") { + const settledState = settleTurnStateFromSessionStatus(event.payload.session.status); + if (settledState === null) { + return; + } const turns = yield* projectionTurnRepository.listByThreadId({ threadId: event.payload.threadId, }); @@ -1021,7 +1041,7 @@ const makeOrchestrationProjectionPipeline = Effect.fn("makeOrchestrationProjecti yield* projectionTurnRepository.upsertByTurnId({ ...latestRunningTurn, - state: "interrupted", + state: settledState, startedAt: latestRunningTurn.startedAt ?? event.payload.session.updatedAt, requestedAt: latestRunningTurn.requestedAt ?? event.payload.session.updatedAt, completedAt: latestRunningTurn.completedAt ?? event.payload.session.updatedAt, diff --git a/apps/server/src/orchestration/Layers/ProviderCommandReactor.test.ts b/apps/server/src/orchestration/Layers/ProviderCommandReactor.test.ts index e12809b335c..076276bbdbd 100644 --- a/apps/server/src/orchestration/Layers/ProviderCommandReactor.test.ts +++ b/apps/server/src/orchestration/Layers/ProviderCommandReactor.test.ts @@ -32,7 +32,10 @@ import { afterEach, describe, expect, it, vi } from "vitest"; import { deriveServerPaths, ServerConfig } from "../../config.ts"; import { TextGenerationError } from "@t3tools/contracts"; -import { ProviderAdapterRequestError } from "../../provider/Errors.ts"; +import { + ProviderAdapterRequestError, + ProviderAdapterSessionNotFoundError, +} from "../../provider/Errors.ts"; import { OrchestrationEventStoreLive } from "../../persistence/Layers/OrchestrationEventStore.ts"; import { OrchestrationCommandReceiptRepositoryLive } from "../../persistence/Layers/OrchestrationCommandReceipts.ts"; import { SqlitePersistenceMemory } from "../../persistence/Layers/Sqlite.ts"; @@ -2266,4 +2269,63 @@ describe("ProviderCommandReactor", () => { detail: "No live provider session was found for this thread.", }); }); + + it("treats already-gone provider sessions as a benign stop", async () => { + const now = "2026-01-01T00:00:00.000Z"; + const harness = await createHarness({ + stopSessionOverride: () => + Effect.fail( + new ProviderAdapterSessionNotFoundError({ + provider: "codex", + threadId: "thread-1", + }), + ), + }); + + await Effect.runPromise( + harness.engine.dispatch({ + type: "thread.session.set", + commandId: CommandId.make("cmd-session-set-for-benign-stop"), + threadId: ThreadId.make("thread-1"), + session: { + threadId: ThreadId.make("thread-1"), + status: "running", + providerName: "codex", + providerInstanceId: ProviderInstanceId.make("codex_work"), + runtimeMode: "approval-required", + activeTurnId: asTurnId("turn-running"), + lastError: null, + updatedAt: now, + }, + createdAt: now, + }), + ); + + await Effect.runPromise( + harness.engine.dispatch({ + type: "thread.session.stop", + commandId: CommandId.make("cmd-session-stop-benign"), + threadId: ThreadId.make("thread-1"), + createdAt: now, + }), + ); + + await waitFor(async () => { + const readModel = await harness.readModel(); + const thread = readModel.threads.find((entry) => entry.id === ThreadId.make("thread-1")); + return thread?.session?.status === "stopped" && thread.session.lastError === null; + }); + + expect(harness.stopSession.mock.calls.length).toBe(1); + const readModel = await harness.readModel(); + const thread = readModel.threads.find((entry) => entry.id === ThreadId.make("thread-1")); + expect(thread?.session).not.toBeNull(); + expect(thread?.session?.status).toBe("stopped"); + expect(thread?.session?.providerInstanceId).toBe(ProviderInstanceId.make("codex_work")); + expect(thread?.session?.activeTurnId).toBeNull(); + expect(thread?.session?.lastError).toBeNull(); + expect( + thread?.activities.find((activity) => activity.kind === "provider.session.stop.failed"), + ).toBeUndefined(); + }); }); diff --git a/apps/server/src/orchestration/Layers/ProviderCommandReactor.ts b/apps/server/src/orchestration/Layers/ProviderCommandReactor.ts index 3cb1acfc113..fe4b42c83fc 100644 --- a/apps/server/src/orchestration/Layers/ProviderCommandReactor.ts +++ b/apps/server/src/orchestration/Layers/ProviderCommandReactor.ts @@ -994,27 +994,28 @@ const make = Effect.gen(function* () { if (thread.session && thread.session.status !== "stopped") { const stopExit = yield* Effect.exit(providerService.stopSession({ threadId: thread.id })); if (Exit.isFailure(stopExit)) { + const sessionAlreadyGone = isSessionAlreadyGoneFailure(stopExit.cause); stopFailureDetail = formatFailureDetail(stopExit.cause); - yield* appendProviderFailureActivity({ - threadId: thread.id, - kind: "provider.session.stop.failed", - summary: "Provider session stop failed", - detail: stopFailureDetail, - turnId: thread.session.activeTurnId, - createdAt: now, - }).pipe( - Effect.catchCause((activityCause) => - Effect.logWarning( - "provider command reactor failed to record provider session stop failure", - { - threadId: thread.id, - cause: Cause.pretty(activityCause), - originalCause: Cause.pretty(stopExit.cause), - }, + if (!sessionAlreadyGone) { + yield* appendProviderFailureActivity({ + threadId: thread.id, + kind: "provider.session.stop.failed", + summary: "Provider session stop failed", + detail: stopFailureDetail, + turnId: thread.session.activeTurnId, + createdAt: now, + }).pipe( + Effect.catchCause((activityCause) => + Effect.logWarning( + "provider command reactor failed to record provider session stop failure", + { + threadId: thread.id, + cause: Cause.pretty(activityCause), + originalCause: Cause.pretty(stopExit.cause), + }, + ), ), - ), - ); - if (!isSessionAlreadyGoneFailure(stopExit.cause)) { + ); yield* setThreadSession({ threadId: thread.id, session: { @@ -1026,6 +1027,7 @@ const make = Effect.gen(function* () { }); return; } + stopFailureDetail = null; } } diff --git a/apps/server/src/provider/Layers/OpenCodeAdapter.test.ts b/apps/server/src/provider/Layers/OpenCodeAdapter.test.ts index 5920accdcae..21de7db2fd0 100644 --- a/apps/server/src/provider/Layers/OpenCodeAdapter.test.ts +++ b/apps/server/src/provider/Layers/OpenCodeAdapter.test.ts @@ -366,6 +366,11 @@ it.layer(OpenCodeAdapterTestLayer)("OpenCodeAdapterLive", (it) => { runtimeMode: "full-access", }); + const eventsFiber = yield* adapter.streamEvents.pipe( + Stream.runHead, + Effect.timeout("100 millis"), + Effect.forkChild, + ); runtimeMock.state.promptAsyncError = new Error("prompt failed"); const error = yield* adapter .sendTurn({ @@ -378,6 +383,7 @@ it.layer(OpenCodeAdapterTestLayer)("OpenCodeAdapterLive", (it) => { }) .pipe(Effect.flip); const sessions = yield* adapter.listSessions(); + const emittedEvent = yield* Fiber.join(eventsFiber); assert.equal(error._tag, "ProviderAdapterRequestError"); if (error._tag !== "ProviderAdapterRequestError") { @@ -392,6 +398,22 @@ it.layer(OpenCodeAdapterTestLayer)("OpenCodeAdapterLive", (it) => { assert.equal(sessions[0]?.status, "ready"); assert.equal(sessions[0]?.activeTurnId, undefined); assert.equal(sessions[0]?.lastError, "prompt failed"); + const outerValue = + typeof emittedEvent === "object" && emittedEvent !== null && "value" in emittedEvent + ? (emittedEvent as { value?: unknown }).value + : undefined; + const innerValue = + typeof outerValue === "object" && outerValue !== null && "value" in outerValue + ? (outerValue as { value?: unknown }).value + : outerValue; + const emittedEventType = + typeof innerValue === "object" && + innerValue !== null && + "type" in innerValue && + typeof (innerValue as { type?: unknown }).type === "string" + ? (innerValue as { type: string }).type + : null; + assert.notEqual(emittedEventType, "turn.aborted"); }), ); diff --git a/apps/server/src/provider/Layers/OpenCodeAdapter.ts b/apps/server/src/provider/Layers/OpenCodeAdapter.ts index 32330eed484..ad5c4358049 100644 --- a/apps/server/src/provider/Layers/OpenCodeAdapter.ts +++ b/apps/server/src/provider/Layers/OpenCodeAdapter.ts @@ -1217,9 +1217,10 @@ export function makeOpenCodeAdapter( ).pipe( Effect.mapError(toRequestError), // On failure: clear active-turn state, flip the session back to ready - // with lastError set, emit turn.aborted, then let the typed error - // propagate. We don't need to rebuild the error here — `toRequestError` - // already produced the right shape. + // with lastError set, then let the typed error propagate. We don't + // emit turn.aborted here because prompt-start failures are not user + // interrupts, and the orchestration layer already records the + // provider.turn.start.failed activity from the typed error. Effect.tapError((requestError) => Effect.gen(function* () { context.activeTurnId = undefined; @@ -1234,16 +1235,6 @@ export function makeOpenCodeAdapter( }, { clearActiveTurnId: true }, ); - yield* emit({ - ...(yield* buildEventBase({ - threadId: input.threadId, - turnId, - })), - type: "turn.aborted", - payload: { - reason: requestError.detail, - }, - }); }), ), ); diff --git a/apps/web/src/store.test.ts b/apps/web/src/store.test.ts index 1322345847a..cdc544f26c1 100644 --- a/apps/web/src/store.test.ts +++ b/apps/web/src/store.test.ts @@ -827,6 +827,51 @@ describe("incremental orchestration updates", () => { }); }); + it("preserves a failed latest turn when the session enters error", () => { + const thread = makeThread({ + session: { + provider: ProviderDriverKind.make("codex"), + status: "running", + orchestrationStatus: "running", + activeTurnId: TurnId.make("turn-1"), + createdAt: "2026-02-27T00:00:00.000Z", + updatedAt: "2026-02-27T00:00:01.000Z", + }, + latestTurn: { + turnId: TurnId.make("turn-1"), + state: "running", + requestedAt: "2026-02-27T00:00:00.000Z", + startedAt: "2026-02-27T00:00:01.000Z", + completedAt: null, + assistantMessageId: null, + }, + }); + + const next = applyOrchestrationEvent( + makeState(thread), + makeEvent("thread.session-set", { + threadId: thread.id, + session: { + threadId: thread.id, + status: "error", + providerName: "codex", + runtimeMode: "full-access", + activeTurnId: null, + lastError: "Prompt failed.", + updatedAt: "2026-02-27T00:00:05.000Z", + }, + }), + localEnvironmentId, + ); + + expect(threadsOf(next)[0]?.session?.status).toBe("error"); + expect(threadsOf(next)[0]?.latestTurn).toMatchObject({ + turnId: TurnId.make("turn-1"), + state: "error", + completedAt: "2026-02-27T00:00:05.000Z", + }); + }); + it("does not regress latestTurn when an older turn diff completes late", () => { const state = makeState( makeThread({ diff --git a/apps/web/src/store.ts b/apps/web/src/store.ts index 00ad2d12629..7264478e214 100644 --- a/apps/web/src/store.ts +++ b/apps/web/src/store.ts @@ -879,14 +879,32 @@ function buildLatestTurn(params: { }; } +function latestTurnStateFromInactiveSessionStatus( + status: OrchestrationSessionStatus, +): NonNullable["state"] | null { + switch (status) { + case "error": + return "error"; + case "ready": + case "interrupted": + case "stopped": + return "interrupted"; + default: + return null; + } +} + function settleLatestTurnForInactiveSession(input: { latestTurn: Thread["latestTurn"]; previousSession: Thread["session"]; + nextSessionStatus: OrchestrationSessionStatus; completedAt: string; }): Thread["latestTurn"] { const latestTurn = input.latestTurn; const previousActiveTurnId = input.previousSession?.activeTurnId; + const nextState = latestTurnStateFromInactiveSessionStatus(input.nextSessionStatus); if ( + nextState === null || latestTurn === null || latestTurn.state !== "running" || previousActiveTurnId === undefined || @@ -897,7 +915,7 @@ function settleLatestTurnForInactiveSession(input: { return buildLatestTurn({ previous: latestTurn, turnId: latestTurn.turnId, - state: "interrupted", + state: nextState, requestedAt: latestTurn.requestedAt, startedAt: latestTurn.startedAt ?? input.completedAt, completedAt: latestTurn.completedAt ?? input.completedAt, @@ -1502,6 +1520,7 @@ function applyEnvironmentOrchestrationEvent( : settleLatestTurnForInactiveSession({ latestTurn: thread.latestTurn, previousSession: thread.session, + nextSessionStatus: event.payload.session.status, completedAt: event.payload.session.updatedAt, }), updatedAt: event.occurredAt, From 6b508410ce57d002dc16d889aa23b1a638f88184 Mon Sep 17 00:00:00 2001 From: justsomelegs <145564979+justsomelegs@users.noreply.github.com> Date: Tue, 12 May 2026 13:54:28 +0100 Subject: [PATCH 07/15] Cover OpenCode reported lifecycle cases --- .../Layers/ProviderRuntimeIngestion.test.ts | 59 +++++++++++++++++++ .../provider/Layers/OpenCodeAdapter.test.ts | 34 +++++++++++ 2 files changed, 93 insertions(+) diff --git a/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.test.ts b/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.test.ts index a88de32a1a4..8cefee75bd2 100644 --- a/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.test.ts +++ b/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.test.ts @@ -695,6 +695,65 @@ describe("ProviderRuntimeIngestion", () => { ); }); + it("keeps OpenCode main turn tracking when title turn events interleave", async () => { + const harness = await createHarness(); + const now = "2026-01-01T00:00:00.000Z"; + + harness.emit({ + type: "turn.started", + eventId: asEventId("evt-opencode-main-started"), + provider: ProviderDriverKind.make("opencode"), + createdAt: now, + threadId: asThreadId("thread-1"), + turnId: asTurnId("turn-opencode-main"), + }); + + await waitForThread( + harness.readModel, + (thread) => + thread.session?.status === "running" && + thread.session?.activeTurnId === "turn-opencode-main", + ); + + harness.emit({ + type: "turn.completed", + eventId: asEventId("evt-opencode-title-completed"), + provider: ProviderDriverKind.make("opencode"), + createdAt: "2026-01-01T00:00:01.000Z", + threadId: asThreadId("thread-1"), + turnId: asTurnId("turn-opencode-title"), + payload: { + state: "completed", + }, + }); + + await harness.drain(); + const midReadModel = await harness.readModel(); + const midThread = midReadModel.threads.find((entry) => entry.id === ThreadId.make("thread-1")); + expect(midThread?.session?.status).toBe("running"); + expect(midThread?.session?.activeTurnId).toBe("turn-opencode-main"); + + harness.emit({ + type: "turn.completed", + eventId: asEventId("evt-opencode-main-completed"), + provider: ProviderDriverKind.make("opencode"), + createdAt: "2026-01-01T00:00:02.000Z", + threadId: asThreadId("thread-1"), + turnId: asTurnId("turn-opencode-main"), + payload: { + state: "completed", + }, + }); + + await waitForThread( + harness.readModel, + (thread) => + thread.session?.status === "ready" && + thread.session?.activeTurnId === null && + thread.session?.lastError === null, + ); + }); + it("ignores auxiliary turn aborts for a non-active turn", async () => { const harness = await createHarness(); const now = "2026-01-01T00:00:00.000Z"; diff --git a/apps/server/src/provider/Layers/OpenCodeAdapter.test.ts b/apps/server/src/provider/Layers/OpenCodeAdapter.test.ts index 21de7db2fd0..16aa38aca61 100644 --- a/apps/server/src/provider/Layers/OpenCodeAdapter.test.ts +++ b/apps/server/src/provider/Layers/OpenCodeAdapter.test.ts @@ -454,6 +454,40 @@ it.layer(OpenCodeAdapterTestLayer)("OpenCodeAdapterLive", (it) => { }), ); + it.effect("allows an interrupted OpenCode turn to be stopped cleanly", () => + Effect.gen(function* () { + const adapter = yield* OpenCodeAdapter; + const threadId = asThreadId("thread-interrupt-then-stop"); + yield* adapter.startSession({ + provider: ProviderDriverKind.make("opencode"), + threadId, + runtimeMode: "full-access", + }); + + const started = yield* adapter.sendTurn({ + threadId, + input: "Steer then stop this", + modelSelection: { + instanceId: ProviderInstanceId.make("opencode"), + model: "openai/gpt-5", + }, + }); + + yield* adapter.interruptTurn(threadId, started.turnId); + const readySessions = yield* adapter.listSessions(); + const readySession = readySessions.find((session) => session.threadId === threadId); + assert.equal(readySession?.status, "ready"); + assert.equal(readySession?.activeTurnId, undefined); + + yield* adapter.stopSession(threadId); + const stoppedSessions = yield* adapter.listSessions(); + const stoppedSession = stoppedSessions.find((session) => session.threadId === threadId); + + assert.equal(stoppedSession, undefined); + assert.equal(runtimeMock.state.abortCalls.at(-1), "http://127.0.0.1:9999/session"); + }), + ); + it.effect("passes agent and variant options for the adapter's bound custom instance id", () => { const instanceId = ProviderInstanceId.make("opencode_zen"); const adapterLayer = Layer.effect( From 5d14d449f5cac61cc00c8b0fa9dd1292063c2770 Mon Sep 17 00:00:00 2001 From: justsomelegs <145564979+justsomelegs@users.noreply.github.com> Date: Tue, 12 May 2026 17:51:39 +0100 Subject: [PATCH 08/15] Fix stopped session turn settlement --- .../provider/Layers/OpenCodeAdapter.test.ts | 35 +++++----- apps/web/src/store.test.ts | 65 +++++++++++++++++++ apps/web/src/store.ts | 20 +++--- 3 files changed, 89 insertions(+), 31 deletions(-) diff --git a/apps/server/src/provider/Layers/OpenCodeAdapter.test.ts b/apps/server/src/provider/Layers/OpenCodeAdapter.test.ts index 16aa38aca61..5dcacabdbaa 100644 --- a/apps/server/src/provider/Layers/OpenCodeAdapter.test.ts +++ b/apps/server/src/provider/Layers/OpenCodeAdapter.test.ts @@ -18,6 +18,7 @@ import { ProviderDriverKind, ProviderInstanceId, ThreadId, + type ProviderRuntimeEvent, } from "@t3tools/contracts"; import { createModelSelection } from "@t3tools/shared/model"; import { ServerConfig } from "../../config.ts"; @@ -366,11 +367,18 @@ it.layer(OpenCodeAdapterTestLayer)("OpenCodeAdapterLive", (it) => { runtimeMode: "full-access", }); + const observedEvents: ProviderRuntimeEvent[] = []; const eventsFiber = yield* adapter.streamEvents.pipe( - Stream.runHead, - Effect.timeout("100 millis"), + Stream.filter((event) => event.threadId === asThreadId("thread-send-turn-failure")), + Stream.runForEach((event) => + Effect.sync(() => { + observedEvents.push(event); + }), + ), Effect.forkChild, ); + yield* Effect.yieldNow; + runtimeMock.state.promptAsyncError = new Error("prompt failed"); const error = yield* adapter .sendTurn({ @@ -383,7 +391,8 @@ it.layer(OpenCodeAdapterTestLayer)("OpenCodeAdapterLive", (it) => { }) .pipe(Effect.flip); const sessions = yield* adapter.listSessions(); - const emittedEvent = yield* Fiber.join(eventsFiber); + yield* Effect.yieldNow; + yield* Fiber.interrupt(eventsFiber); assert.equal(error._tag, "ProviderAdapterRequestError"); if (error._tag !== "ProviderAdapterRequestError") { @@ -398,22 +407,10 @@ it.layer(OpenCodeAdapterTestLayer)("OpenCodeAdapterLive", (it) => { assert.equal(sessions[0]?.status, "ready"); assert.equal(sessions[0]?.activeTurnId, undefined); assert.equal(sessions[0]?.lastError, "prompt failed"); - const outerValue = - typeof emittedEvent === "object" && emittedEvent !== null && "value" in emittedEvent - ? (emittedEvent as { value?: unknown }).value - : undefined; - const innerValue = - typeof outerValue === "object" && outerValue !== null && "value" in outerValue - ? (outerValue as { value?: unknown }).value - : outerValue; - const emittedEventType = - typeof innerValue === "object" && - innerValue !== null && - "type" in innerValue && - typeof (innerValue as { type?: unknown }).type === "string" - ? (innerValue as { type: string }).type - : null; - assert.notEqual(emittedEventType, "turn.aborted"); + assert.equal( + observedEvents.some((event) => event.type === "turn.aborted"), + false, + ); }), ); diff --git a/apps/web/src/store.test.ts b/apps/web/src/store.test.ts index cdc544f26c1..59a0da13821 100644 --- a/apps/web/src/store.test.ts +++ b/apps/web/src/store.test.ts @@ -827,6 +827,71 @@ describe("incremental orchestration updates", () => { }); }); + it("settles a running latest turn across optimistic stop and server confirmation", () => { + const thread = makeThread({ + session: { + provider: ProviderDriverKind.make("codex"), + status: "running", + orchestrationStatus: "running", + activeTurnId: TurnId.make("turn-1"), + createdAt: "2026-02-27T00:00:00.000Z", + updatedAt: "2026-02-27T00:00:01.000Z", + }, + latestTurn: { + turnId: TurnId.make("turn-1"), + state: "running", + requestedAt: "2026-02-27T00:00:00.000Z", + startedAt: "2026-02-27T00:00:01.000Z", + completedAt: null, + assistantMessageId: null, + }, + }); + const stopRequestedAt = "2026-02-27T00:00:04.000Z"; + + const afterOptimisticStop = applyOrchestrationEvent( + makeState(thread), + makeEvent("thread.session-stop-requested", { + threadId: thread.id, + createdAt: stopRequestedAt, + }), + localEnvironmentId, + ); + + expect(threadsOf(afterOptimisticStop)[0]?.session).toMatchObject({ + status: "closed", + activeTurnId: undefined, + }); + expect(threadsOf(afterOptimisticStop)[0]?.latestTurn).toMatchObject({ + turnId: TurnId.make("turn-1"), + state: "interrupted", + completedAt: stopRequestedAt, + }); + + const afterServerConfirmation = applyOrchestrationEvent( + afterOptimisticStop, + makeEvent("thread.session-set", { + threadId: thread.id, + session: { + threadId: thread.id, + status: "stopped", + providerName: "codex", + runtimeMode: "full-access", + activeTurnId: null, + lastError: null, + updatedAt: "2026-02-27T00:00:05.000Z", + }, + }), + localEnvironmentId, + ); + + expect(threadsOf(afterServerConfirmation)[0]?.session?.status).toBe("closed"); + expect(threadsOf(afterServerConfirmation)[0]?.latestTurn).toMatchObject({ + turnId: TurnId.make("turn-1"), + state: "interrupted", + completedAt: stopRequestedAt, + }); + }); + it("preserves a failed latest turn when the session enters error", () => { const thread = makeThread({ session: { diff --git a/apps/web/src/store.ts b/apps/web/src/store.ts index 7264478e214..2be2ed3f34d 100644 --- a/apps/web/src/store.ts +++ b/apps/web/src/store.ts @@ -894,22 +894,14 @@ function latestTurnStateFromInactiveSessionStatus( } } -function settleLatestTurnForInactiveSession(input: { +function settleRunningLatestTurnForInactiveSession(input: { latestTurn: Thread["latestTurn"]; - previousSession: Thread["session"]; nextSessionStatus: OrchestrationSessionStatus; completedAt: string; }): Thread["latestTurn"] { const latestTurn = input.latestTurn; - const previousActiveTurnId = input.previousSession?.activeTurnId; const nextState = latestTurnStateFromInactiveSessionStatus(input.nextSessionStatus); - if ( - nextState === null || - latestTurn === null || - latestTurn.state !== "running" || - previousActiveTurnId === undefined || - latestTurn.turnId !== previousActiveTurnId - ) { + if (nextState === null || latestTurn === null || latestTurn.state !== "running") { return latestTurn; } return buildLatestTurn({ @@ -1517,9 +1509,8 @@ function applyEnvironmentOrchestrationEvent( : null, sourceProposedPlan: thread.pendingSourceProposedPlan, }) - : settleLatestTurnForInactiveSession({ + : settleRunningLatestTurnForInactiveSession({ latestTurn: thread.latestTurn, - previousSession: thread.session, nextSessionStatus: event.payload.session.status, completedAt: event.payload.session.updatedAt, }), @@ -1539,6 +1530,11 @@ function applyEnvironmentOrchestrationEvent( activeTurnId: undefined, updatedAt: event.payload.createdAt, }, + latestTurn: settleRunningLatestTurnForInactiveSession({ + latestTurn: thread.latestTurn, + nextSessionStatus: "stopped", + completedAt: event.payload.createdAt, + }), updatedAt: event.occurredAt, }, ); From 53b9168044220005e08e8f165635ebd849837ff4 Mon Sep 17 00:00:00 2001 From: justsomelegs <145564979+justsomelegs@users.noreply.github.com> Date: Tue, 12 May 2026 17:58:28 +0100 Subject: [PATCH 09/15] Reduce store turn settlement test duplication --- apps/web/src/store.test.ts | 78 ++++++++++++-------------------------- 1 file changed, 24 insertions(+), 54 deletions(-) diff --git a/apps/web/src/store.test.ts b/apps/web/src/store.test.ts index 59a0da13821..fb24a397ad8 100644 --- a/apps/web/src/store.test.ts +++ b/apps/web/src/store.test.ts @@ -88,6 +88,27 @@ function makeThread(overrides: Partial = {}): Thread { }; } +function makeThreadWithRunningTurn(turnId = TurnId.make("turn-1")): Thread { + return makeThread({ + session: { + provider: ProviderDriverKind.make("codex"), + status: "running", + orchestrationStatus: "running", + activeTurnId: turnId, + createdAt: "2026-02-27T00:00:00.000Z", + updatedAt: "2026-02-27T00:00:01.000Z", + }, + latestTurn: { + turnId, + state: "running", + requestedAt: "2026-02-27T00:00:00.000Z", + startedAt: "2026-02-27T00:00:01.000Z", + completedAt: null, + assistantMessageId: null, + }, + }); +} + function makeState(thread: Thread): AppState { const projectId = ProjectId.make("project-1"); const project = { @@ -783,24 +804,7 @@ describe("incremental orchestration updates", () => { }); it("settles a running latest turn when the session is stopped", () => { - const thread = makeThread({ - session: { - provider: ProviderDriverKind.make("codex"), - status: "running", - orchestrationStatus: "running", - activeTurnId: TurnId.make("turn-1"), - createdAt: "2026-02-27T00:00:00.000Z", - updatedAt: "2026-02-27T00:00:01.000Z", - }, - latestTurn: { - turnId: TurnId.make("turn-1"), - state: "running", - requestedAt: "2026-02-27T00:00:00.000Z", - startedAt: "2026-02-27T00:00:01.000Z", - completedAt: null, - assistantMessageId: null, - }, - }); + const thread = makeThreadWithRunningTurn(); const next = applyOrchestrationEvent( makeState(thread), @@ -828,24 +832,7 @@ describe("incremental orchestration updates", () => { }); it("settles a running latest turn across optimistic stop and server confirmation", () => { - const thread = makeThread({ - session: { - provider: ProviderDriverKind.make("codex"), - status: "running", - orchestrationStatus: "running", - activeTurnId: TurnId.make("turn-1"), - createdAt: "2026-02-27T00:00:00.000Z", - updatedAt: "2026-02-27T00:00:01.000Z", - }, - latestTurn: { - turnId: TurnId.make("turn-1"), - state: "running", - requestedAt: "2026-02-27T00:00:00.000Z", - startedAt: "2026-02-27T00:00:01.000Z", - completedAt: null, - assistantMessageId: null, - }, - }); + const thread = makeThreadWithRunningTurn(); const stopRequestedAt = "2026-02-27T00:00:04.000Z"; const afterOptimisticStop = applyOrchestrationEvent( @@ -893,24 +880,7 @@ describe("incremental orchestration updates", () => { }); it("preserves a failed latest turn when the session enters error", () => { - const thread = makeThread({ - session: { - provider: ProviderDriverKind.make("codex"), - status: "running", - orchestrationStatus: "running", - activeTurnId: TurnId.make("turn-1"), - createdAt: "2026-02-27T00:00:00.000Z", - updatedAt: "2026-02-27T00:00:01.000Z", - }, - latestTurn: { - turnId: TurnId.make("turn-1"), - state: "running", - requestedAt: "2026-02-27T00:00:00.000Z", - startedAt: "2026-02-27T00:00:01.000Z", - completedAt: null, - assistantMessageId: null, - }, - }); + const thread = makeThreadWithRunningTurn(); const next = applyOrchestrationEvent( makeState(thread), From 16f42dc4c1b14a80a9a7407328097af2cb05a764 Mon Sep 17 00:00:00 2001 From: justsomelegs <145564979+justsomelegs@users.noreply.github.com> Date: Tue, 12 May 2026 18:44:26 +0100 Subject: [PATCH 10/15] Reduce projection pipeline rebuild test duplication --- .../Layers/ProjectionPipeline.test.ts | 375 +++++++----------- 1 file changed, 154 insertions(+), 221 deletions(-) diff --git a/apps/server/src/orchestration/Layers/ProjectionPipeline.test.ts b/apps/server/src/orchestration/Layers/ProjectionPipeline.test.ts index 51eb0328ead..b0f056da104 100644 --- a/apps/server/src/orchestration/Layers/ProjectionPipeline.test.ts +++ b/apps/server/src/orchestration/Layers/ProjectionPipeline.test.ts @@ -23,7 +23,10 @@ import { makeSqlitePersistenceLive, SqlitePersistenceMemory, } from "../../persistence/Layers/Sqlite.ts"; -import { OrchestrationEventStore } from "../../persistence/Services/OrchestrationEventStore.ts"; +import { + OrchestrationEventStore, + type OrchestrationEventStoreShape, +} from "../../persistence/Services/OrchestrationEventStore.ts"; import { RepositoryIdentityResolverLive } from "../../project/Layers/RepositoryIdentityResolver.ts"; import { OrchestrationEngineLive } from "./OrchestrationEngine.ts"; import { @@ -52,6 +55,134 @@ const exists = (filePath: string) => const BaseTestLayer = makeProjectionPipelinePrefixedTestLayer("t3-projection-pipeline-test-"); +const appendCleanRebuildTurnScenario = Effect.fn("appendCleanRebuildTurnScenario")(function* ( + eventStore: OrchestrationEventStoreShape, + input: { + readonly slug: string; + readonly projectId: ProjectId; + readonly threadId: ThreadId; + readonly turnId: TurnId; + readonly messageId: MessageId; + readonly createdAt: string; + readonly runningAt: string; + readonly finalAt: string; + readonly finalStatus: "stopped" | "error"; + readonly finalLastError: string; + }, +) { + yield* eventStore.append({ + type: "project.created", + eventId: EventId.make(`evt-${input.slug}-project`), + aggregateKind: "project", + aggregateId: input.projectId, + occurredAt: input.createdAt, + commandId: CommandId.make(`cmd-${input.slug}-project`), + causationEventId: null, + correlationId: CorrelationId.make(`cmd-${input.slug}-project`), + metadata: {}, + payload: { + projectId: input.projectId, + title: `Project ${input.slug}`, + workspaceRoot: `/tmp/${input.slug}`, + defaultModelSelection: null, + scripts: [], + createdAt: input.createdAt, + updatedAt: input.createdAt, + }, + }); + yield* eventStore.append({ + type: "thread.created", + eventId: EventId.make(`evt-${input.slug}-thread`), + aggregateKind: "thread", + aggregateId: input.threadId, + occurredAt: input.createdAt, + commandId: CommandId.make(`cmd-${input.slug}-thread`), + causationEventId: null, + correlationId: CorrelationId.make(`cmd-${input.slug}-thread`), + metadata: {}, + payload: { + threadId: input.threadId, + projectId: input.projectId, + title: `Thread ${input.slug}`, + modelSelection: { + instanceId: ProviderInstanceId.make("codex"), + model: "gpt-5-codex", + }, + runtimeMode: "approval-required", + interactionMode: "default", + branch: null, + worktreePath: null, + createdAt: input.createdAt, + updatedAt: input.createdAt, + }, + }); + yield* eventStore.append({ + type: "thread.turn-start-requested", + eventId: EventId.make(`evt-${input.slug}-turn-start`), + aggregateKind: "thread", + aggregateId: input.threadId, + occurredAt: input.createdAt, + commandId: CommandId.make(`cmd-${input.slug}-turn-start`), + causationEventId: null, + correlationId: CorrelationId.make(`cmd-${input.slug}-turn-start`), + metadata: {}, + payload: { + threadId: input.threadId, + messageId: input.messageId, + runtimeMode: "approval-required", + createdAt: input.createdAt, + }, + }); + yield* eventStore.append({ + type: "thread.session-set", + eventId: EventId.make(`evt-${input.slug}-running`), + aggregateKind: "thread", + aggregateId: input.threadId, + occurredAt: input.runningAt, + commandId: CommandId.make(`cmd-${input.slug}-running`), + causationEventId: null, + correlationId: CorrelationId.make(`cmd-${input.slug}-running`), + metadata: {}, + payload: { + threadId: input.threadId, + session: { + threadId: input.threadId, + status: "running", + providerName: "codex", + providerInstanceId: ProviderInstanceId.make("codex"), + runtimeMode: "approval-required", + activeTurnId: input.turnId, + lastError: null, + updatedAt: input.runningAt, + }, + }, + }); + yield* eventStore.append({ + type: "thread.session-set", + eventId: EventId.make(`evt-${input.slug}-final`), + aggregateKind: "thread", + aggregateId: input.threadId, + occurredAt: input.finalAt, + commandId: CommandId.make(`cmd-${input.slug}-final`), + causationEventId: null, + correlationId: CorrelationId.make(`cmd-${input.slug}-final`), + metadata: {}, + payload: { + threadId: input.threadId, + session: { + threadId: input.threadId, + status: input.finalStatus, + providerName: "codex", + providerInstanceId: ProviderInstanceId.make("codex"), + runtimeMode: "approval-required", + activeTurnId: null, + lastError: input.finalLastError, + updatedAt: input.finalAt, + }, + }, + }); +}); + it.layer(BaseTestLayer)("OrchestrationProjectionPipeline", (it) => { it.effect("bootstraps all projection states and writes projection rows", () => Effect.gen(function* () { @@ -2187,116 +2318,17 @@ it.effect( const runningAt = "2026-02-26T15:00:05.000Z"; const stoppedAt = "2026-02-26T15:00:10.000Z"; - yield* eventStore.append({ - type: "project.created", - eventId: EventId.make("evt-rebuild-settle-project"), - aggregateKind: "project", - aggregateId: projectId, - occurredAt: createdAt, - commandId: CommandId.make("cmd-rebuild-settle-project"), - causationEventId: null, - correlationId: CorrelationId.make("cmd-rebuild-settle-project"), - metadata: {}, - payload: { - projectId, - title: "Project Rebuild Settle", - workspaceRoot: "/tmp/project-rebuild-settle", - defaultModelSelection: null, - scripts: [], - createdAt, - updatedAt: createdAt, - }, - }); - yield* eventStore.append({ - type: "thread.created", - eventId: EventId.make("evt-rebuild-settle-thread"), - aggregateKind: "thread", - aggregateId: threadId, - occurredAt: createdAt, - commandId: CommandId.make("cmd-rebuild-settle-thread"), - causationEventId: null, - correlationId: CorrelationId.make("cmd-rebuild-settle-thread"), - metadata: {}, - payload: { - threadId, - projectId, - title: "Thread Rebuild Settle", - modelSelection: { - instanceId: ProviderInstanceId.make("codex"), - model: "gpt-5-codex", - }, - runtimeMode: "approval-required", - interactionMode: "default", - branch: null, - worktreePath: null, - createdAt, - updatedAt: createdAt, - }, - }); - yield* eventStore.append({ - type: "thread.turn-start-requested", - eventId: EventId.make("evt-rebuild-settle-turn-start"), - aggregateKind: "thread", - aggregateId: threadId, - occurredAt: createdAt, - commandId: CommandId.make("cmd-rebuild-settle-turn-start"), - causationEventId: null, - correlationId: CorrelationId.make("cmd-rebuild-settle-turn-start"), - metadata: {}, - payload: { - threadId, - messageId, - runtimeMode: "approval-required", - createdAt, - }, - }); - yield* eventStore.append({ - type: "thread.session-set", - eventId: EventId.make("evt-rebuild-settle-running"), - aggregateKind: "thread", - aggregateId: threadId, - occurredAt: runningAt, - commandId: CommandId.make("cmd-rebuild-settle-running"), - causationEventId: null, - correlationId: CorrelationId.make("cmd-rebuild-settle-running"), - metadata: {}, - payload: { - threadId, - session: { - threadId, - status: "running", - providerName: "codex", - providerInstanceId: ProviderInstanceId.make("codex"), - runtimeMode: "approval-required", - activeTurnId: turnId, - lastError: null, - updatedAt: runningAt, - }, - }, - }); - yield* eventStore.append({ - type: "thread.session-set", - eventId: EventId.make("evt-rebuild-settle-stopped"), - aggregateKind: "thread", - aggregateId: threadId, - occurredAt: stoppedAt, - commandId: CommandId.make("cmd-rebuild-settle-stopped"), - causationEventId: null, - correlationId: CorrelationId.make("cmd-rebuild-settle-stopped"), - metadata: {}, - payload: { - threadId, - session: { - threadId, - status: "stopped", - providerName: "codex", - providerInstanceId: ProviderInstanceId.make("codex"), - runtimeMode: "approval-required", - activeTurnId: null, - lastError: "Provider runtime is no longer active.", - updatedAt: stoppedAt, - }, - }, + yield* appendCleanRebuildTurnScenario(eventStore, { + slug: "rebuild-settle", + projectId, + threadId, + turnId, + messageId, + createdAt, + runningAt, + finalAt: stoppedAt, + finalStatus: "stopped", + finalLastError: "Provider runtime is no longer active.", }); yield* projectionPipeline.bootstrap; @@ -2344,116 +2376,17 @@ it.effect( const runningAt = "2026-02-26T16:00:05.000Z"; const failedAt = "2026-02-26T16:00:10.000Z"; - yield* eventStore.append({ - type: "project.created", - eventId: EventId.make("evt-rebuild-error-project"), - aggregateKind: "project", - aggregateId: projectId, - occurredAt: createdAt, - commandId: CommandId.make("cmd-rebuild-error-project"), - causationEventId: null, - correlationId: CorrelationId.make("cmd-rebuild-error-project"), - metadata: {}, - payload: { - projectId, - title: "Project Rebuild Error", - workspaceRoot: "/tmp/project-rebuild-error", - defaultModelSelection: null, - scripts: [], - createdAt, - updatedAt: createdAt, - }, - }); - yield* eventStore.append({ - type: "thread.created", - eventId: EventId.make("evt-rebuild-error-thread"), - aggregateKind: "thread", - aggregateId: threadId, - occurredAt: createdAt, - commandId: CommandId.make("cmd-rebuild-error-thread"), - causationEventId: null, - correlationId: CorrelationId.make("cmd-rebuild-error-thread"), - metadata: {}, - payload: { - threadId, - projectId, - title: "Thread Rebuild Error", - modelSelection: { - instanceId: ProviderInstanceId.make("codex"), - model: "gpt-5-codex", - }, - runtimeMode: "approval-required", - interactionMode: "default", - branch: null, - worktreePath: null, - createdAt, - updatedAt: createdAt, - }, - }); - yield* eventStore.append({ - type: "thread.turn-start-requested", - eventId: EventId.make("evt-rebuild-error-turn-start"), - aggregateKind: "thread", - aggregateId: threadId, - occurredAt: createdAt, - commandId: CommandId.make("cmd-rebuild-error-turn-start"), - causationEventId: null, - correlationId: CorrelationId.make("cmd-rebuild-error-turn-start"), - metadata: {}, - payload: { - threadId, - messageId, - runtimeMode: "approval-required", - createdAt, - }, - }); - yield* eventStore.append({ - type: "thread.session-set", - eventId: EventId.make("evt-rebuild-error-running"), - aggregateKind: "thread", - aggregateId: threadId, - occurredAt: runningAt, - commandId: CommandId.make("cmd-rebuild-error-running"), - causationEventId: null, - correlationId: CorrelationId.make("cmd-rebuild-error-running"), - metadata: {}, - payload: { - threadId, - session: { - threadId, - status: "running", - providerName: "codex", - providerInstanceId: ProviderInstanceId.make("codex"), - runtimeMode: "approval-required", - activeTurnId: turnId, - lastError: null, - updatedAt: runningAt, - }, - }, - }); - yield* eventStore.append({ - type: "thread.session-set", - eventId: EventId.make("evt-rebuild-error-failed"), - aggregateKind: "thread", - aggregateId: threadId, - occurredAt: failedAt, - commandId: CommandId.make("cmd-rebuild-error-failed"), - causationEventId: null, - correlationId: CorrelationId.make("cmd-rebuild-error-failed"), - metadata: {}, - payload: { - threadId, - session: { - threadId, - status: "error", - providerName: "codex", - providerInstanceId: ProviderInstanceId.make("codex"), - runtimeMode: "approval-required", - activeTurnId: null, - lastError: "Prompt failed.", - updatedAt: failedAt, - }, - }, + yield* appendCleanRebuildTurnScenario(eventStore, { + slug: "rebuild-error", + projectId, + threadId, + turnId, + messageId, + createdAt, + runningAt, + finalAt: failedAt, + finalStatus: "error", + finalLastError: "Prompt failed.", }); yield* projectionPipeline.bootstrap; From d4e57fb34358c6cdd88b48fb1eb07d3d530f2831 Mon Sep 17 00:00:00 2001 From: justsomelegs <145564979+justsomelegs@users.noreply.github.com> Date: Tue, 12 May 2026 19:26:11 +0100 Subject: [PATCH 11/15] Simplify provider session stop failure handling --- .../orchestration/Layers/ProviderCommandReactor.ts | 13 +++++-------- 1 file changed, 5 insertions(+), 8 deletions(-) diff --git a/apps/server/src/orchestration/Layers/ProviderCommandReactor.ts b/apps/server/src/orchestration/Layers/ProviderCommandReactor.ts index fe4b42c83fc..1478e31af0d 100644 --- a/apps/server/src/orchestration/Layers/ProviderCommandReactor.ts +++ b/apps/server/src/orchestration/Layers/ProviderCommandReactor.ts @@ -990,18 +990,16 @@ const make = Effect.gen(function* () { } const now = event.payload.createdAt; - let stopFailureDetail: string | null = null; if (thread.session && thread.session.status !== "stopped") { const stopExit = yield* Effect.exit(providerService.stopSession({ threadId: thread.id })); if (Exit.isFailure(stopExit)) { - const sessionAlreadyGone = isSessionAlreadyGoneFailure(stopExit.cause); - stopFailureDetail = formatFailureDetail(stopExit.cause); - if (!sessionAlreadyGone) { + if (!isSessionAlreadyGoneFailure(stopExit.cause)) { + const detail = formatFailureDetail(stopExit.cause); yield* appendProviderFailureActivity({ threadId: thread.id, kind: "provider.session.stop.failed", summary: "Provider session stop failed", - detail: stopFailureDetail, + detail, turnId: thread.session.activeTurnId, createdAt: now, }).pipe( @@ -1020,14 +1018,13 @@ const make = Effect.gen(function* () { threadId: thread.id, session: { ...thread.session, - lastError: stopFailureDetail, + lastError: detail, updatedAt: now, }, createdAt: now, }); return; } - stopFailureDetail = null; } } @@ -1042,7 +1039,7 @@ const make = Effect.gen(function* () { : {}), runtimeMode: thread.session?.runtimeMode ?? DEFAULT_RUNTIME_MODE, activeTurnId: null, - lastError: stopFailureDetail ?? thread.session?.lastError ?? null, + lastError: thread.session?.lastError ?? null, updatedAt: now, }, createdAt: now, From b2f027ff4244ef1bcd965390fdea424a1ce44443 Mon Sep 17 00:00:00 2001 From: justsomelegs <145564979+justsomelegs@users.noreply.github.com> Date: Tue, 12 May 2026 19:59:25 +0100 Subject: [PATCH 12/15] Clear stale errors during session reconcile --- .../src/orchestration/Layers/ProviderCommandReactor.test.ts | 2 +- .../src/orchestration/Layers/ProviderCommandReactor.ts | 6 ++++-- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/apps/server/src/orchestration/Layers/ProviderCommandReactor.test.ts b/apps/server/src/orchestration/Layers/ProviderCommandReactor.test.ts index 076276bbdbd..48947cc36b6 100644 --- a/apps/server/src/orchestration/Layers/ProviderCommandReactor.test.ts +++ b/apps/server/src/orchestration/Layers/ProviderCommandReactor.test.ts @@ -1716,7 +1716,7 @@ describe("ProviderCommandReactor", () => { providerInstanceId: ProviderInstanceId.make("codex"), runtimeMode: "approval-required", activeTurnId: asTurnId("turn-stale-live-ready"), - lastError: null, + lastError: "previous provider failure", updatedAt: now, }, createdAt: now, diff --git a/apps/server/src/orchestration/Layers/ProviderCommandReactor.ts b/apps/server/src/orchestration/Layers/ProviderCommandReactor.ts index 1478e31af0d..91a71ff62cd 100644 --- a/apps/server/src/orchestration/Layers/ProviderCommandReactor.ts +++ b/apps/server/src/orchestration/Layers/ProviderCommandReactor.ts @@ -327,18 +327,20 @@ const make = Effect.gen(function* () { const liveSession = liveSessionsByThreadId.get(thread.id); if (liveSession !== undefined) { + const status = mapProviderSessionStatusToOrchestrationStatus(liveSession.status); return setThreadSession({ threadId: thread.id, session: { ...session, - status: mapProviderSessionStatusToOrchestrationStatus(liveSession.status), + status, providerName: liveSession.provider, ...(liveSession.providerInstanceId !== undefined ? { providerInstanceId: liveSession.providerInstanceId } : {}), runtimeMode: liveSession.runtimeMode, activeTurnId: liveSession.activeTurnId ?? null, - lastError: liveSession.lastError ?? session.lastError ?? null, + lastError: + liveSession.lastError ?? (status === "ready" ? null : (session.lastError ?? null)), updatedAt: now, }, createdAt: now, From d2742fcd681b350c0f0972ad0d7f015ace368b41 Mon Sep 17 00:00:00 2001 From: justsomelegs <145564979+justsomelegs@users.noreply.github.com> Date: Tue, 12 May 2026 20:19:22 +0100 Subject: [PATCH 13/15] Clarify restored session error copy --- .../src/orchestration/Layers/ProviderCommandReactor.test.ts | 3 ++- apps/server/src/orchestration/Layers/ProviderCommandReactor.ts | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/apps/server/src/orchestration/Layers/ProviderCommandReactor.test.ts b/apps/server/src/orchestration/Layers/ProviderCommandReactor.test.ts index 48947cc36b6..4232cc64cc3 100644 --- a/apps/server/src/orchestration/Layers/ProviderCommandReactor.test.ts +++ b/apps/server/src/orchestration/Layers/ProviderCommandReactor.test.ts @@ -1646,7 +1646,8 @@ describe("ProviderCommandReactor", () => { providerInstanceId: ProviderInstanceId.make("codex"), runtimeMode: "approval-required", activeTurnId: null, - lastError: "Provider runtime is no longer active. Start a new turn to reconnect this thread.", + lastError: + "This thread was restored, but its running provider session was no longer available.\nStart a new turn to continue.", }); expect(thread?.latestTurn).toMatchObject({ turnId: asTurnId("turn-stale"), diff --git a/apps/server/src/orchestration/Layers/ProviderCommandReactor.ts b/apps/server/src/orchestration/Layers/ProviderCommandReactor.ts index 91a71ff62cd..47cef27ab48 100644 --- a/apps/server/src/orchestration/Layers/ProviderCommandReactor.ts +++ b/apps/server/src/orchestration/Layers/ProviderCommandReactor.ts @@ -97,7 +97,7 @@ const HANDLED_TURN_START_KEY_TTL = Duration.minutes(30); const DEFAULT_RUNTIME_MODE: RuntimeMode = "full-access"; const DEFAULT_THREAD_TITLE = "New thread"; const STALE_RUNNING_SESSION_DETAIL = - "Provider runtime is no longer active. Start a new turn to reconnect this thread."; + "This thread was restored, but its running provider session was no longer available.\nStart a new turn to continue."; export function providerErrorLabel(value: string | undefined): string { const normalized = value?.trim(); From 9b21444b8d3a8de9df8bf2ce1002ea7e4ee3eed8 Mon Sep 17 00:00:00 2001 From: justsomelegs <145564979+justsomelegs@users.noreply.github.com> Date: Wed, 13 May 2026 16:10:55 +0100 Subject: [PATCH 14/15] Fix OpenCode global event ingestion --- .../provider/Layers/OpenCodeAdapter.test.ts | 122 +++++++++++++++++- .../src/provider/Layers/OpenCodeAdapter.ts | 65 ++++++++-- 2 files changed, 176 insertions(+), 11 deletions(-) diff --git a/apps/server/src/provider/Layers/OpenCodeAdapter.test.ts b/apps/server/src/provider/Layers/OpenCodeAdapter.test.ts index 5dcacabdbaa..c4aa8e4165f 100644 --- a/apps/server/src/provider/Layers/OpenCodeAdapter.test.ts +++ b/apps/server/src/provider/Layers/OpenCodeAdapter.test.ts @@ -80,6 +80,8 @@ const runtimeMock = { }, }; +const waitForSubscribedEventPoll = () => Effect.runPromise(Effect.sleep("5 millis")); + const OpenCodeRuntimeTestDouble: OpenCodeRuntimeShape = { startOpenCodeServerProcess: ({ binaryPath }) => Effect.gen(function* () { @@ -162,8 +164,29 @@ const OpenCodeRuntimeTestDouble: OpenCodeRuntimeShape = { event: { subscribe: async () => ({ stream: (async function* () { - for (const event of runtimeMock.state.subscribedEvents) { - yield event; + let index = 0; + for (let idleCycles = 0; idleCycles < 100; idleCycles++) { + if (index >= runtimeMock.state.subscribedEvents.length) { + await waitForSubscribedEventPoll(); + continue; + } + idleCycles = 0; + yield runtimeMock.state.subscribedEvents[index++]; + } + })(), + }), + }, + global: { + event: async () => ({ + stream: (async function* () { + let index = 0; + for (let idleCycles = 0; idleCycles < 100; idleCycles++) { + if (index >= runtimeMock.state.subscribedEvents.length) { + await waitForSubscribedEventPoll(); + continue; + } + idleCycles = 0; + yield runtimeMock.state.subscribedEvents[index++]; } })(), }), @@ -744,6 +767,101 @@ it.layer(OpenCodeAdapterTestLayer)("OpenCodeAdapterLive", (it) => { }), ); + it.effect("ingests assistant text and idle completion from global OpenCode events", () => + Effect.gen(function* () { + const adapter = yield* OpenCodeAdapter; + const threadId = asThreadId("thread-opencode-global-events"); + const sessionId = "http://127.0.0.1:9999/session"; + const assistantMessageId = "msg-global-assistant"; + const textPartId = "part-global-text"; + const eventsFiber = yield* adapter.streamEvents.pipe( + Stream.filter((event) => event.threadId === threadId), + Stream.take(6), + Stream.runCollect, + Effect.forkChild, + ); + + yield* adapter.startSession({ + provider: ProviderDriverKind.make("opencode"), + threadId, + runtimeMode: "full-access", + }); + + const started = yield* adapter.sendTurn({ + threadId, + input: "Say pong", + modelSelection: { + instanceId: ProviderInstanceId.make("opencode"), + model: "openai/gpt-5", + }, + }); + + runtimeMock.state.subscribedEvents.push( + { + directory: "C:\\Users\\mike\\dev-stuff\\t3code", + payload: { + type: "message.updated", + properties: { + sessionID: sessionId, + info: { + id: assistantMessageId, + role: "assistant", + }, + }, + }, + }, + { + directory: "C:\\Users\\mike\\dev-stuff\\t3code", + payload: { + type: "message.part.updated", + properties: { + sessionID: sessionId, + part: { + id: textPartId, + sessionID: sessionId, + messageID: assistantMessageId, + type: "text", + text: "pong", + time: { start: 1, end: 2 }, + }, + time: 2, + }, + }, + }, + { + directory: "C:\\Users\\mike\\dev-stuff\\t3code", + payload: { + type: "session.idle", + properties: { + sessionID: sessionId, + }, + }, + }, + ); + + const events = Array.from(yield* Fiber.join(eventsFiber).pipe(Effect.timeout("1 second"))); + const sessions = yield* adapter.listSessions(); + const session = sessions.find((entry) => entry.threadId === threadId); + + assert.deepEqual( + events.map((event) => event.type), + [ + "session.started", + "thread.started", + "turn.started", + "content.delta", + "item.completed", + "turn.completed", + ], + ); + const delta = events.find((event) => event.type === "content.delta"); + assert.equal(delta?.type === "content.delta" ? delta.payload.delta : undefined, "pong"); + assert.equal(events.at(-1)?.turnId, started.turnId); + assert.equal(session?.status, "ready"); + assert.equal(session?.activeTurnId, undefined); + }), + ); + it.effect("writes provider-native observability records using the session thread id", () => Effect.gen(function* () { const nativeEvents: Array<{ diff --git a/apps/server/src/provider/Layers/OpenCodeAdapter.ts b/apps/server/src/provider/Layers/OpenCodeAdapter.ts index ad5c4358049..f33c6182dd5 100644 --- a/apps/server/src/provider/Layers/OpenCodeAdapter.ts +++ b/apps/server/src/provider/Layers/OpenCodeAdapter.ts @@ -21,7 +21,13 @@ import * as Random from "effect/Random"; import * as Ref from "effect/Ref"; import * as Scope from "effect/Scope"; import * as Stream from "effect/Stream"; -import type { OpencodeClient, Part, PermissionRequest, QuestionRequest } from "@opencode-ai/sdk/v2"; +import type { + Event as OpenCodeEvent, + OpencodeClient, + Part, + PermissionRequest, + QuestionRequest, +} from "@opencode-ai/sdk/v2"; import { getModelSelectionStringOptionValue } from "@t3tools/shared/model"; import { resolveAttachmentPath } from "../../attachmentStore.ts"; @@ -58,7 +64,7 @@ interface OpenCodeTurnSnapshot { } type OpenCodeSubscribedEvent = - Awaited> extends { + Awaited> extends { readonly stream: AsyncIterable; } ? TEvent @@ -89,7 +95,7 @@ interface OpenCodeSessionContext { /** * Sole lifecycle handle for the session. Closing this scope: * - aborts the `AbortController` registered as a finalizer - * (cancels the in-flight `event.subscribe` fetch), + * (cancels the in-flight `global.event` fetch), * - interrupts the event-pump and server-exit fibers forked * via `Effect.forkIn(sessionScope)`, * - tears down the OpenCode server process for scope-owned servers. @@ -389,6 +395,24 @@ function toolStateCreatedAt(part: Extract): string | und } } +function unwrapOpenCodeSubscribedEvent( + raw: OpenCodeSubscribedEvent | OpenCodeEvent, +): OpenCodeEvent | null { + if (typeof raw !== "object" || raw === null) { + return null; + } + if ("payload" in raw) { + const payload = raw.payload; + if (typeof payload === "object" && payload !== null && "type" in payload) { + return payload as OpenCodeEvent; + } + } + if ("type" in raw) { + return raw as OpenCodeEvent; + } + return null; +} + function sessionErrorMessage(error: unknown): string { if (!error || typeof error !== "object") { return "OpenCode session failed."; @@ -634,7 +658,7 @@ export function makeOpenCodeAdapter( const handleSubscribedEvent = Effect.fn("handleSubscribedEvent")(function* ( context: OpenCodeSessionContext, - event: OpenCodeSubscribedEvent, + event: OpenCodeEvent, ) { const payloadSessionId = "properties" in event ? (event.properties as { sessionID?: unknown }).sessionID : undefined; @@ -863,6 +887,25 @@ export function makeOpenCodeAdapter( break; } + case "session.idle": { + if (turnId) { + context.activeTurnId = undefined; + yield* updateProviderSession(context, { status: "ready" }, { clearActiveTurnId: true }); + yield* emit({ + ...(yield* buildEventBase({ + threadId: context.session.threadId, + turnId, + raw: event, + })), + type: "turn.completed", + payload: { + state: "completed", + }, + }); + } + break; + } + case "session.status": { if (event.properties.status.type === "busy") { yield* updateProviderSession(context, { @@ -954,7 +997,7 @@ export function makeOpenCodeAdapter( const startEventPump = Effect.fn("startEventPump")(function* (context: OpenCodeSessionContext) { // One AbortController per session scope. The finalizer fires when // the scope closes (explicit stop, unexpected exit, or layer - // shutdown) and cancels the in-flight `event.subscribe` fetch so + // shutdown) and cancels the in-flight `global.event` fetch so // the async iterable unwinds cleanly. const eventsAbortController = new AbortController(); yield* Scope.addFinalizer( @@ -965,8 +1008,8 @@ export function makeOpenCodeAdapter( // Fibers forked into `context.sessionScope` are interrupted // automatically when the scope closes — no bookkeeping required. yield* Effect.flatMap( - runOpenCodeSdk("event.subscribe", () => - context.client.event.subscribe(undefined, { + runOpenCodeSdk("global.event", () => + context.client.global.event({ signal: eventsAbortController.signal, }), ), @@ -975,11 +1018,15 @@ export function makeOpenCodeAdapter( subscription.stream, (cause) => new OpenCodeRuntimeError({ - operation: "event.subscribe", + operation: "global.event", detail: openCodeRuntimeErrorDetail(cause), cause, }), - ).pipe(Stream.runForEach((event) => handleSubscribedEvent(context, event))), + ).pipe( + Stream.map(unwrapOpenCodeSubscribedEvent), + Stream.filter((event): event is OpenCodeEvent => event !== null), + Stream.runForEach((event) => handleSubscribedEvent(context, event)), + ), ).pipe( Effect.exit, Effect.flatMap((exit) => From 3f83b8207b22736d0e040af1adc8428abb11bd66 Mon Sep 17 00:00:00 2001 From: justsomelegs <145564979+justsomelegs@users.noreply.github.com> Date: Thu, 14 May 2026 16:45:23 +0100 Subject: [PATCH 15/15] Revert "Fix OpenCode global event ingestion" This reverts commit 9b21444b8d3a8de9df8bf2ce1002ea7e4ee3eed8. --- .../provider/Layers/OpenCodeAdapter.test.ts | 122 +----------------- .../src/provider/Layers/OpenCodeAdapter.ts | 65 ++-------- 2 files changed, 11 insertions(+), 176 deletions(-) diff --git a/apps/server/src/provider/Layers/OpenCodeAdapter.test.ts b/apps/server/src/provider/Layers/OpenCodeAdapter.test.ts index c4aa8e4165f..5dcacabdbaa 100644 --- a/apps/server/src/provider/Layers/OpenCodeAdapter.test.ts +++ b/apps/server/src/provider/Layers/OpenCodeAdapter.test.ts @@ -80,8 +80,6 @@ const runtimeMock = { }, }; -const waitForSubscribedEventPoll = () => Effect.runPromise(Effect.sleep("5 millis")); - const OpenCodeRuntimeTestDouble: OpenCodeRuntimeShape = { startOpenCodeServerProcess: ({ binaryPath }) => Effect.gen(function* () { @@ -164,29 +162,8 @@ const OpenCodeRuntimeTestDouble: OpenCodeRuntimeShape = { event: { subscribe: async () => ({ stream: (async function* () { - let index = 0; - for (let idleCycles = 0; idleCycles < 100; idleCycles++) { - if (index >= runtimeMock.state.subscribedEvents.length) { - await waitForSubscribedEventPoll(); - continue; - } - idleCycles = 0; - yield runtimeMock.state.subscribedEvents[index++]; - } - })(), - }), - }, - global: { - event: async () => ({ - stream: (async function* () { - let index = 0; - for (let idleCycles = 0; idleCycles < 100; idleCycles++) { - if (index >= runtimeMock.state.subscribedEvents.length) { - await waitForSubscribedEventPoll(); - continue; - } - idleCycles = 0; - yield runtimeMock.state.subscribedEvents[index++]; + for (const event of runtimeMock.state.subscribedEvents) { + yield event; } })(), }), @@ -767,101 +744,6 @@ it.layer(OpenCodeAdapterTestLayer)("OpenCodeAdapterLive", (it) => { }), ); - it.effect("ingests assistant text and idle completion from global OpenCode events", () => - Effect.gen(function* () { - const adapter = yield* OpenCodeAdapter; - const threadId = asThreadId("thread-opencode-global-events"); - const sessionId = "http://127.0.0.1:9999/session"; - const assistantMessageId = "msg-global-assistant"; - const textPartId = "part-global-text"; - const eventsFiber = yield* adapter.streamEvents.pipe( - Stream.filter((event) => event.threadId === threadId), - Stream.take(6), - Stream.runCollect, - Effect.forkChild, - ); - - yield* adapter.startSession({ - provider: ProviderDriverKind.make("opencode"), - threadId, - runtimeMode: "full-access", - }); - - const started = yield* adapter.sendTurn({ - threadId, - input: "Say pong", - modelSelection: { - instanceId: ProviderInstanceId.make("opencode"), - model: "openai/gpt-5", - }, - }); - - runtimeMock.state.subscribedEvents.push( - { - directory: "C:\\Users\\mike\\dev-stuff\\t3code", - payload: { - type: "message.updated", - properties: { - sessionID: sessionId, - info: { - id: assistantMessageId, - role: "assistant", - }, - }, - }, - }, - { - directory: "C:\\Users\\mike\\dev-stuff\\t3code", - payload: { - type: "message.part.updated", - properties: { - sessionID: sessionId, - part: { - id: textPartId, - sessionID: sessionId, - messageID: assistantMessageId, - type: "text", - text: "pong", - time: { start: 1, end: 2 }, - }, - time: 2, - }, - }, - }, - { - directory: "C:\\Users\\mike\\dev-stuff\\t3code", - payload: { - type: "session.idle", - properties: { - sessionID: sessionId, - }, - }, - }, - ); - - const events = Array.from(yield* Fiber.join(eventsFiber).pipe(Effect.timeout("1 second"))); - const sessions = yield* adapter.listSessions(); - const session = sessions.find((entry) => entry.threadId === threadId); - - assert.deepEqual( - events.map((event) => event.type), - [ - "session.started", - "thread.started", - "turn.started", - "content.delta", - "item.completed", - "turn.completed", - ], - ); - const delta = events.find((event) => event.type === "content.delta"); - assert.equal(delta?.type === "content.delta" ? delta.payload.delta : undefined, "pong"); - assert.equal(events.at(-1)?.turnId, started.turnId); - assert.equal(session?.status, "ready"); - assert.equal(session?.activeTurnId, undefined); - }), - ); - it.effect("writes provider-native observability records using the session thread id", () => Effect.gen(function* () { const nativeEvents: Array<{ diff --git a/apps/server/src/provider/Layers/OpenCodeAdapter.ts b/apps/server/src/provider/Layers/OpenCodeAdapter.ts index f33c6182dd5..ad5c4358049 100644 --- a/apps/server/src/provider/Layers/OpenCodeAdapter.ts +++ b/apps/server/src/provider/Layers/OpenCodeAdapter.ts @@ -21,13 +21,7 @@ import * as Random from "effect/Random"; import * as Ref from "effect/Ref"; import * as Scope from "effect/Scope"; import * as Stream from "effect/Stream"; -import type { - Event as OpenCodeEvent, - OpencodeClient, - Part, - PermissionRequest, - QuestionRequest, -} from "@opencode-ai/sdk/v2"; +import type { OpencodeClient, Part, PermissionRequest, QuestionRequest } from "@opencode-ai/sdk/v2"; import { getModelSelectionStringOptionValue } from "@t3tools/shared/model"; import { resolveAttachmentPath } from "../../attachmentStore.ts"; @@ -64,7 +58,7 @@ interface OpenCodeTurnSnapshot { } type OpenCodeSubscribedEvent = - Awaited> extends { + Awaited> extends { readonly stream: AsyncIterable; } ? TEvent @@ -95,7 +89,7 @@ interface OpenCodeSessionContext { /** * Sole lifecycle handle for the session. Closing this scope: * - aborts the `AbortController` registered as a finalizer - * (cancels the in-flight `global.event` fetch), + * (cancels the in-flight `event.subscribe` fetch), * - interrupts the event-pump and server-exit fibers forked * via `Effect.forkIn(sessionScope)`, * - tears down the OpenCode server process for scope-owned servers. @@ -395,24 +389,6 @@ function toolStateCreatedAt(part: Extract): string | und } } -function unwrapOpenCodeSubscribedEvent( - raw: OpenCodeSubscribedEvent | OpenCodeEvent, -): OpenCodeEvent | null { - if (typeof raw !== "object" || raw === null) { - return null; - } - if ("payload" in raw) { - const payload = raw.payload; - if (typeof payload === "object" && payload !== null && "type" in payload) { - return payload as OpenCodeEvent; - } - } - if ("type" in raw) { - return raw as OpenCodeEvent; - } - return null; -} - function sessionErrorMessage(error: unknown): string { if (!error || typeof error !== "object") { return "OpenCode session failed."; @@ -658,7 +634,7 @@ export function makeOpenCodeAdapter( const handleSubscribedEvent = Effect.fn("handleSubscribedEvent")(function* ( context: OpenCodeSessionContext, - event: OpenCodeEvent, + event: OpenCodeSubscribedEvent, ) { const payloadSessionId = "properties" in event ? (event.properties as { sessionID?: unknown }).sessionID : undefined; @@ -887,25 +863,6 @@ export function makeOpenCodeAdapter( break; } - case "session.idle": { - if (turnId) { - context.activeTurnId = undefined; - yield* updateProviderSession(context, { status: "ready" }, { clearActiveTurnId: true }); - yield* emit({ - ...(yield* buildEventBase({ - threadId: context.session.threadId, - turnId, - raw: event, - })), - type: "turn.completed", - payload: { - state: "completed", - }, - }); - } - break; - } - case "session.status": { if (event.properties.status.type === "busy") { yield* updateProviderSession(context, { @@ -997,7 +954,7 @@ export function makeOpenCodeAdapter( const startEventPump = Effect.fn("startEventPump")(function* (context: OpenCodeSessionContext) { // One AbortController per session scope. The finalizer fires when // the scope closes (explicit stop, unexpected exit, or layer - // shutdown) and cancels the in-flight `global.event` fetch so + // shutdown) and cancels the in-flight `event.subscribe` fetch so // the async iterable unwinds cleanly. const eventsAbortController = new AbortController(); yield* Scope.addFinalizer( @@ -1008,8 +965,8 @@ export function makeOpenCodeAdapter( // Fibers forked into `context.sessionScope` are interrupted // automatically when the scope closes — no bookkeeping required. yield* Effect.flatMap( - runOpenCodeSdk("global.event", () => - context.client.global.event({ + runOpenCodeSdk("event.subscribe", () => + context.client.event.subscribe(undefined, { signal: eventsAbortController.signal, }), ), @@ -1018,15 +975,11 @@ export function makeOpenCodeAdapter( subscription.stream, (cause) => new OpenCodeRuntimeError({ - operation: "global.event", + operation: "event.subscribe", detail: openCodeRuntimeErrorDetail(cause), cause, }), - ).pipe( - Stream.map(unwrapOpenCodeSubscribedEvent), - Stream.filter((event): event is OpenCodeEvent => event !== null), - Stream.runForEach((event) => handleSubscribedEvent(context, event)), - ), + ).pipe(Stream.runForEach((event) => handleSubscribedEvent(context, event))), ).pipe( Effect.exit, Effect.flatMap((exit) =>