Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
249 changes: 248 additions & 1 deletion apps/server/src/orchestration/Layers/ProjectionPipeline.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,10 @@ import {
makeSqlitePersistenceLive,
SqlitePersistenceMemory,
} from "../../persistence/Layers/Sqlite.ts";
import { OrchestrationEventStore } from "../../persistence/Services/OrchestrationEventStore.ts";
import {
OrchestrationEventStore,
type OrchestrationEventStoreShape,
} from "../../persistence/Services/OrchestrationEventStore.ts";
import { RepositoryIdentityResolverLive } from "../../project/Layers/RepositoryIdentityResolver.ts";
import { OrchestrationEngineLive } from "./OrchestrationEngine.ts";
import {
Expand Down Expand Up @@ -52,6 +55,134 @@ const exists = (filePath: string) =>

const BaseTestLayer = makeProjectionPipelinePrefixedTestLayer("t3-projection-pipeline-test-");

const appendCleanRebuildTurnScenario = Effect.fn("appendCleanRebuildTurnScenario")(function* (
eventStore: OrchestrationEventStoreShape,
input: {
readonly slug: string;
readonly projectId: ProjectId;
readonly threadId: ThreadId;
readonly turnId: TurnId;
readonly messageId: MessageId;
readonly createdAt: string;
readonly runningAt: string;
readonly finalAt: string;
readonly finalStatus: "stopped" | "error";
readonly finalLastError: string;
},
) {
yield* eventStore.append({
type: "project.created",
eventId: EventId.make(`evt-${input.slug}-project`),
aggregateKind: "project",
aggregateId: input.projectId,
occurredAt: input.createdAt,
commandId: CommandId.make(`cmd-${input.slug}-project`),
causationEventId: null,
correlationId: CorrelationId.make(`cmd-${input.slug}-project`),
metadata: {},
payload: {
projectId: input.projectId,
title: `Project ${input.slug}`,
workspaceRoot: `/tmp/${input.slug}`,
defaultModelSelection: null,
scripts: [],
createdAt: input.createdAt,
updatedAt: input.createdAt,
},
});
yield* eventStore.append({
type: "thread.created",
eventId: EventId.make(`evt-${input.slug}-thread`),
aggregateKind: "thread",
aggregateId: input.threadId,
occurredAt: input.createdAt,
commandId: CommandId.make(`cmd-${input.slug}-thread`),
causationEventId: null,
correlationId: CorrelationId.make(`cmd-${input.slug}-thread`),
metadata: {},
payload: {
threadId: input.threadId,
projectId: input.projectId,
title: `Thread ${input.slug}`,
modelSelection: {
instanceId: ProviderInstanceId.make("codex"),
model: "gpt-5-codex",
},
runtimeMode: "approval-required",
interactionMode: "default",
branch: null,
worktreePath: null,
createdAt: input.createdAt,
updatedAt: input.createdAt,
},
});
yield* eventStore.append({
type: "thread.turn-start-requested",
eventId: EventId.make(`evt-${input.slug}-turn-start`),
aggregateKind: "thread",
aggregateId: input.threadId,
occurredAt: input.createdAt,
commandId: CommandId.make(`cmd-${input.slug}-turn-start`),
causationEventId: null,
correlationId: CorrelationId.make(`cmd-${input.slug}-turn-start`),
metadata: {},
payload: {
threadId: input.threadId,
messageId: input.messageId,
runtimeMode: "approval-required",
createdAt: input.createdAt,
},
});
yield* eventStore.append({
type: "thread.session-set",
eventId: EventId.make(`evt-${input.slug}-running`),
aggregateKind: "thread",
aggregateId: input.threadId,
occurredAt: input.runningAt,
commandId: CommandId.make(`cmd-${input.slug}-running`),
causationEventId: null,
correlationId: CorrelationId.make(`cmd-${input.slug}-running`),
metadata: {},
payload: {
threadId: input.threadId,
session: {
threadId: input.threadId,
status: "running",
providerName: "codex",
providerInstanceId: ProviderInstanceId.make("codex"),
runtimeMode: "approval-required",
activeTurnId: input.turnId,
lastError: null,
updatedAt: input.runningAt,
},
},
});
yield* eventStore.append({
type: "thread.session-set",
eventId: EventId.make(`evt-${input.slug}-final`),
aggregateKind: "thread",
aggregateId: input.threadId,
occurredAt: input.finalAt,
commandId: CommandId.make(`cmd-${input.slug}-final`),
causationEventId: null,
correlationId: CorrelationId.make(`cmd-${input.slug}-final`),
metadata: {},
payload: {
threadId: input.threadId,
session: {
threadId: input.threadId,
status: input.finalStatus,
providerName: "codex",
providerInstanceId: ProviderInstanceId.make("codex"),
runtimeMode: "approval-required",
activeTurnId: null,
lastError: input.finalLastError,
updatedAt: input.finalAt,
},
},
});
});

it.layer(BaseTestLayer)("OrchestrationProjectionPipeline", (it) => {
it.effect("bootstraps all projection states and writes projection rows", () =>
Effect.gen(function* () {
Expand Down Expand Up @@ -2172,6 +2303,122 @@ it.effect("restores pending turn-start metadata across projection pipeline resta
),
);

it.effect(
"settles running turns during a clean projection rebuild without thread latest-turn state",
() =>
Effect.gen(function* () {
const projectionPipeline = yield* OrchestrationProjectionPipeline;
const eventStore = yield* OrchestrationEventStore;
const sql = yield* SqlClient.SqlClient;
const projectId = ProjectId.make("project-rebuild-settle");
const threadId = ThreadId.make("thread-rebuild-settle");
const turnId = TurnId.make("turn-rebuild-settle");
const messageId = MessageId.make("message-rebuild-settle");
const createdAt = "2026-02-26T15:00:00.000Z";
const runningAt = "2026-02-26T15:00:05.000Z";
const stoppedAt = "2026-02-26T15:00:10.000Z";

yield* appendCleanRebuildTurnScenario(eventStore, {
slug: "rebuild-settle",
projectId,
threadId,
turnId,
messageId,
createdAt,
runningAt,
finalAt: stoppedAt,
finalStatus: "stopped",
finalLastError: "Provider runtime is no longer active.",
});

yield* projectionPipeline.bootstrap;

const turnRows = yield* sql<{
readonly turnId: string;
readonly state: string;
readonly completedAt: string | null;
}>`
SELECT
turn_id AS "turnId",
state,
completed_at AS "completedAt"
FROM projection_turns
WHERE thread_id = ${threadId}
AND turn_id = ${turnId}
`;

assert.deepEqual(turnRows, [
{
turnId,
state: "interrupted",
completedAt: stoppedAt,
},
]);
}).pipe(
Effect.provide(
makeProjectionPipelinePrefixedTestLayer("t3-projection-pipeline-rebuild-settle-"),
),
),
);

it.effect(
"preserves failed turn state during a clean projection rebuild without thread latest-turn state",
() =>
Effect.gen(function* () {
const projectionPipeline = yield* OrchestrationProjectionPipeline;
const eventStore = yield* OrchestrationEventStore;
const sql = yield* SqlClient.SqlClient;
const projectId = ProjectId.make("project-rebuild-error");
const threadId = ThreadId.make("thread-rebuild-error");
const turnId = TurnId.make("turn-rebuild-error");
const messageId = MessageId.make("message-rebuild-error");
const createdAt = "2026-02-26T16:00:00.000Z";
const runningAt = "2026-02-26T16:00:05.000Z";
const failedAt = "2026-02-26T16:00:10.000Z";

yield* appendCleanRebuildTurnScenario(eventStore, {
slug: "rebuild-error",
projectId,
threadId,
turnId,
messageId,
createdAt,
runningAt,
finalAt: failedAt,
finalStatus: "error",
finalLastError: "Prompt failed.",
});

yield* projectionPipeline.bootstrap;

const turnRows = yield* sql<{
readonly turnId: string;
readonly state: string;
readonly completedAt: string | null;
}>`
SELECT
turn_id AS "turnId",
state,
completed_at AS "completedAt"
FROM projection_turns
WHERE thread_id = ${threadId}
AND turn_id = ${turnId}
`;

assert.deepEqual(turnRows, [
{
turnId,
state: "error",
completedAt: failedAt,
},
]);
}).pipe(
Effect.provide(
makeProjectionPipelinePrefixedTestLayer("t3-projection-pipeline-rebuild-error-"),
),
),
);

const engineLayer = it.layer(
OrchestrationEngineLive.pipe(
Layer.provide(OrchestrationProjectionSnapshotQueryLive),
Expand Down
50 changes: 49 additions & 1 deletion apps/server/src/orchestration/Layers/ProjectionPipeline.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ import {
ApprovalRequestId,
type ChatAttachment,
type OrchestrationEvent,
type OrchestrationSessionStatus,
ThreadId,
type TurnId,
} from "@t3tools/contracts";
import * as Effect from "effect/Effect";
import * as FileSystem from "effect/FileSystem";
Expand Down Expand Up @@ -177,6 +179,21 @@ function deriveHasActionableProposedPlan(input: {
return latestPlan !== null && latestPlan.implementedAt === null;
}

function settleTurnStateFromSessionStatus(
status: OrchestrationSessionStatus,
): ProjectionTurn["state"] | null {
switch (status) {
case "error":
return "error";
case "ready":
case "interrupted":
case "stopped":
return "interrupted";
default:
return null;
}
}

function retainProjectionMessagesAfterRevert(
messages: ReadonlyArray<ProjectionThreadMessage>,
turns: ReadonlyArray<ProjectionTurn>,
Expand Down Expand Up @@ -712,9 +729,12 @@ const makeOrchestrationProjectionPipeline = Effect.fn("makeOrchestrationProjecti
if (Option.isNone(existingRow)) {
return;
}
const latestTurnId =
event.payload.session.activeTurnId ??
(event.payload.session.status === "running" ? null : existingRow.value.latestTurnId);
yield* projectionThreadRepository.upsert({
...existingRow.value,
latestTurnId: event.payload.session.activeTurnId,
latestTurnId,
updatedAt: event.occurredAt,
});
yield* refreshThreadShellSummary(event.payload.threadId);
Expand Down Expand Up @@ -998,6 +1018,34 @@ const makeOrchestrationProjectionPipeline = Effect.fn("makeOrchestrationProjecti
case "thread.session-set": {
const turnId = event.payload.session.activeTurnId;
if (turnId === null || event.payload.session.status !== "running") {
const settledState = settleTurnStateFromSessionStatus(event.payload.session.status);
if (settledState === null) {
return;
}
const turns = yield* projectionTurnRepository.listByThreadId({
threadId: event.payload.threadId,
});
const latestRunningTurn = turns
.filter(
(turn): turn is ProjectionTurn & { readonly turnId: TurnId } =>
turn.turnId !== null && turn.state === "running",
)
.reduce<(ProjectionTurn & { readonly turnId: TurnId }) | null>(
(latest, turn) =>
latest === null || turn.requestedAt > latest.requestedAt ? turn : latest,
null,
);
if (latestRunningTurn === null) {
return;
}

yield* projectionTurnRepository.upsertByTurnId({
...latestRunningTurn,
state: settledState,
startedAt: latestRunningTurn.startedAt ?? event.payload.session.updatedAt,
requestedAt: latestRunningTurn.requestedAt ?? event.payload.session.updatedAt,
completedAt: latestRunningTurn.completedAt ?? event.payload.session.updatedAt,
});
return;
}

Expand Down
Loading
Loading