From 1cbb859c9177f551488e98dbeae350c8d8549391 Mon Sep 17 00:00:00 2001 From: prosdev Date: Mon, 16 Mar 2026 14:55:35 -0700 Subject: [PATCH 1/9] =?UTF-8?q?docs(canvas):=20add=20Phase=202=20plan=20?= =?UTF-8?q?=E2=80=94=20SSE=20run=20panel?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Five-part plan covering SSE service layer, RunSlice state machine, run button with validation, run panel with node highlighting, and reconnection with human-in-the-loop resume. Two review passes addressed race conditions (terminal event + onerror, concurrent reconnection), auth via Vite proxy, and Sheet bottom variant. Co-Authored-By: Claude Opus 4.6 (1M context) --- .claude/gw-plans/canvas/README.md | 2 +- .../canvas/phase-2-sse-run-panel/overview.md | 46 +++ .../phase-2.1-sse-service-layer.md | 280 +++++++++++++ .../phase-2.2-run-slice.md | 373 ++++++++++++++++++ .../phase-2.3-run-button-validation.md | 234 +++++++++++ .../phase-2.4-run-panel.md | 310 +++++++++++++++ .../phase-2.5-reconnection-resume.md | 264 +++++++++++++ 7 files changed, 1508 insertions(+), 1 deletion(-) create mode 100644 .claude/gw-plans/canvas/phase-2-sse-run-panel/overview.md create mode 100644 .claude/gw-plans/canvas/phase-2-sse-run-panel/phase-2.1-sse-service-layer.md create mode 100644 .claude/gw-plans/canvas/phase-2-sse-run-panel/phase-2.2-run-slice.md create mode 100644 .claude/gw-plans/canvas/phase-2-sse-run-panel/phase-2.3-run-button-validation.md create mode 100644 .claude/gw-plans/canvas/phase-2-sse-run-panel/phase-2.4-run-panel.md create mode 100644 .claude/gw-plans/canvas/phase-2-sse-run-panel/phase-2.5-reconnection-resume.md diff --git a/.claude/gw-plans/canvas/README.md b/.claude/gw-plans/canvas/README.md index f37d2e6..357c1b8 100644 --- a/.claude/gw-plans/canvas/README.md +++ b/.claude/gw-plans/canvas/README.md @@ -7,7 +7,7 @@ React 19 + React Flow frontend phases. Depends on execution phases 3-4 for API s | Phase | Plan | Status | |-------|------|--------| | 1 | [Canvas core](phase-1-canvas-core/overview.md) -- Home view, Start/LLM/End nodes, edge wiring, config panel, save/load | Complete | -| 2 | SSE run panel -- stream display, node highlighting, reconnection | Not started | +| 2 | [SSE run panel](phase-2-sse-run-panel/overview.md) -- SSE streaming, run panel, node highlighting, reconnection, resume | Planned | | 3 | Full node set -- Tool/Condition/HumanInput nodes, settings page | Not started | | 4 | Validation, run input modal, state panel | Not started | | 5 | Error handling, run history, debug panel, JSON schema panel | Not started | diff --git a/.claude/gw-plans/canvas/phase-2-sse-run-panel/overview.md b/.claude/gw-plans/canvas/phase-2-sse-run-panel/overview.md new file mode 100644 index 0000000..71a0a8e --- /dev/null +++ b/.claude/gw-plans/canvas/phase-2-sse-run-panel/overview.md @@ -0,0 +1,46 @@ +# Canvas Phase 2 — SSE Run Panel + +## Goal + +Wire the canvas to the execution backend so users can run a graph and watch +it execute in real time. Node-by-node SSE streaming, active node highlighting, +reconnection, and human-in-the-loop resume. + +## What already exists + +| Layer | Status | +|-------|--------| +| Execution API (`POST /v1/graphs/{id}/run`, `GET /v1/runs/{id}/stream`, resume, cancel, status) | Fully implemented | +| SSE event types (`@shared/events` — 7 event types) | Defined | +| `runs.ts` API stub | Skeleton — wrong URL path, no reconnection | +| `runSlice.ts` store stub | Skeleton — all action bodies are TODOs | +| `CanvasHeader` | No run button yet | +| `Sheet` UI component | Supports right/left — needs bottom | +| `UISlice.panelLayout` | Already tracks `"right" \| "bottom"` | +| Node pulse CSS | Documented in gw-frontend skill, not yet implemented | + +## Parts + +| Part | Summary | Depends on | +|------|---------|------------| +| 2.1 | [SSE service layer](phase-2.1-sse-service-layer.md) — EventSource wrapper with reconnection, correct API paths, typed event parsing | — | +| 2.2 | [RunSlice implementation](phase-2.2-run-slice.md) — Full state machine, event dispatch, start/cancel/resume actions | 2.1 | +| 2.3 | [Run button + validation](phase-2.3-run-button-validation.md) — CanvasHeader run button, client-side validation, run input dialog | 2.2 | +| 2.4 | [Run panel + node highlighting](phase-2.4-run-panel.md) — Bottom/right panel with event timeline, active node pulse on canvas | 2.2 | +| 2.5 | [Reconnection + resume UI](phase-2.5-reconnection-resume.md) — Full reconnection state machine with backoff, human-in-the-loop resume form | 2.1, 2.4 | + +## Out of scope (Phase 3+) + +- Tool/Condition/HumanInput node components (Phase 3) +- Debug panel with per-node state inspection (Phase 5) +- Run history list (Phase 5) +- Run input modal with schema-driven form fields (Phase 4) + +## Architecture constraints + +- Components read store only — no `fetch()`, no API imports +- `runSlice` calls `@api/runs` — owns SSE lifecycle +- `EventSource` is managed by the service layer, not the store +- Reconnection uses `Last-Event-ID` for server-side replay +- All API path params use `encodeURIComponent()` +- Toast for errors via `useUIStore.getState().showToast()` diff --git a/.claude/gw-plans/canvas/phase-2-sse-run-panel/phase-2.1-sse-service-layer.md b/.claude/gw-plans/canvas/phase-2-sse-run-panel/phase-2.1-sse-service-layer.md new file mode 100644 index 0000000..a634158 --- /dev/null +++ b/.claude/gw-plans/canvas/phase-2-sse-run-panel/phase-2.1-sse-service-layer.md @@ -0,0 +1,280 @@ +# Phase 2.1 — SSE Service Layer + +## Goal + +Replace the `runs.ts` stub with a production-ready SSE service layer that +handles typed event parsing, reconnection via `Last-Event-ID`, and clean +teardown. + +## Files to modify + +| File | Action | +|------|--------| +| `packages/canvas/src/api/runs.ts` | Rewrite | +| `packages/canvas/src/api/client.ts` | Export `apiUrl()` helper | + +## Design + +### `apiUrl()` helper + +The base `request()` in `client.ts` already uses `BASE_URL = "/api"`. SSE +uses `EventSource` which doesn't go through `request()`, so we export a +shared `apiUrl(path)` function. + +```typescript +// client.ts +const BASE_URL = "/api"; // existing — Vite proxy rewrites /api → /v1 + +export function apiUrl(path: string): string { + return `${BASE_URL}${path}`; +} +``` + +Refactor `request()` to use `apiUrl()` internally. The `/api` prefix is +critical — it routes through the Vite dev proxy (which rewrites `/api` to +`/v1` and forwards to `localhost:8000`). In production, the reverse proxy +does the same. **Never use `http://localhost:8000/v1` directly** — that +bypasses the proxy and breaks EventSource auth. + +### `runs.ts` — five exports + +```typescript +// 1. Start a run — POST /api/graphs/{graph_id}/run +export async function startRun( + graphId: string, + input?: Record, +): Promise<{ run_id: string; status: string }> + +// 2. Connect to SSE stream — GET /api/runs/{run_id}/stream +// Returns a cleanup function. Caller provides typed handlers. +export function connectStream( + runId: string, + handlers: StreamHandlers, + lastEventId?: number, +): () => void + +// 3. Resume a paused run — POST /api/runs/{run_id}/resume +export async function resumeRun( + runId: string, + input: unknown, +): Promise<{ status: string }> + +// 4. Cancel a run — POST /api/runs/{run_id}/cancel +export async function cancelRun(runId: string): Promise + +// 5. Get run status — GET /api/runs/{run_id}/status +export async function getRunStatus( + runId: string, +): Promise +``` + +### `StreamHandlers` type + +```typescript +export interface StreamHandlers { + onEvent: (event: GraphEvent, eventId: number | null) => void; + onError: (error: Error) => void; +} +``` + +Note: **No `onClose` handler.** `EventSource` has no native close event. +When the server ends the stream, EventSource fires `onerror`. Terminal +events (`graph_completed`, non-recoverable `error`) are detected by the +store via `_handleEvent`, not via a separate close signal. + +### `startRun` implementation + +```typescript +export async function startRun( + graphId: string, + input?: Record, +): Promise<{ run_id: string; status: string }> { + return request<{ run_id: string; status: string }>( + `/graphs/${encodeURIComponent(graphId)}/run`, + { + method: "POST", + body: JSON.stringify({ input: input ?? {} }), + }, + ); +} +``` + +Note: The `input` value is wrapped in `{ input: ... }` to match the server's +`StartRunRequest` schema. Encoding happens here in the service layer — +callers pass raw IDs. + +### `connectStream` implementation + +```typescript +export function connectStream( + runId: string, + handlers: StreamHandlers, + lastEventId?: number, +): () => void { + const encoded = encodeURIComponent(runId); + const params = lastEventId != null ? `?last_event_id=${lastEventId}` : ""; + const url = apiUrl(`/runs/${encoded}/stream${params}`); + + const source = new EventSource(url); + + // Listen for each known event type (server sends typed SSE events) + const EVENT_TYPES = [ + "run_started", "node_started", "node_completed", + "edge_traversed", "graph_paused", "graph_completed", "error", + ] as const; + + for (const type of EVENT_TYPES) { + source.addEventListener(type, (e: MessageEvent) => { + const eventId = e.lastEventId ? Number(e.lastEventId) : null; + try { + const data = JSON.parse(e.data); + handlers.onEvent({ event: type, data } as GraphEvent, eventId); + } catch { + // Malformed SSE data — skip event, don't crash + } + }); + } + + source.onerror = () => { + source.close(); + handlers.onError(new Error("SSE connection lost")); + }; + + return () => { + source.close(); + }; +} +``` + +Key decisions: +- **Named event listeners** (`addEventListener(type, ...)`) instead of + `onmessage` because the server sends typed SSE events (`event: node_started`), + not generic `message` events. +- **`eventId` passed to handler** so the store can track `lastEventId` for + reconnection replay. +- **No auto-reconnect** — `EventSource` has built-in reconnection but we + disable it (close on error) because the reconnection state machine in + `runSlice` needs to control backoff and status-check logic. +- **JSON parse errors caught** — malformed data is silently skipped rather + than crashing the event loop. + +### `resumeRun` implementation + +```typescript +export async function resumeRun( + runId: string, + input: unknown, +): Promise<{ status: string }> { + return request<{ status: string }>( + `/runs/${encodeURIComponent(runId)}/resume`, + { + method: "POST", + body: JSON.stringify({ input }), + }, + ); +} +``` + +Note: Input wrapped in `{ input: ... }` to match `ResumeRunRequest` schema. + +### `cancelRun` implementation + +```typescript +export async function cancelRun(runId: string): Promise { + await request(`/runs/${encodeURIComponent(runId)}/cancel`, { + method: "POST", + }); +} +``` + +Server returns `{ detail: "Cancel requested" }` — we ignore the response body. + +### `getRunStatus` implementation + +```typescript +export async function getRunStatus( + runId: string, +): Promise { + return request( + `/runs/${encodeURIComponent(runId)}/status`, + ); +} +``` + +### `RunStatusResponse` type + +```typescript +export interface RunStatusResponse { + run_id: string; + graph_id: string; + status: "running" | "paused" | "completed" | "error"; + node_id: string | null; + prompt: string | null; + final_state: unknown | null; + duration_ms: number | null; + error: string | null; +} +``` + +### Authentication for EventSource + +`EventSource` doesn't support custom headers. Auth works via the Vite proxy: + +- **Dev**: Vite proxy rewrites `/api` → `/v1` and forwards to `localhost:8000`. + The API key is injected by the proxy (or the execution server runs without + auth in dev mode). +- **Production**: Same-origin reverse proxy handles the rewrite. No API key + in browser URLs. + +The existing Vite proxy config already covers `/api/*`, which includes +`/api/runs/{id}/stream`. No proxy changes needed. + +## Tests + +### `packages/canvas/src/api/__tests__/runs.test.ts` + +Mock `fetch` via `vi.fn()` and `EventSource` via a lightweight mock. + +| Test | What it verifies | +|------|-----------------| +| `startRun sends correct URL and body shape` | URL is `/api/graphs/{encoded}/run`, body is `{ input: {} }` | +| `startRun encodes graph ID with special chars` | `my graph#1` → `/api/graphs/my%20graph%231/run` | +| `connectStream listens for all 7 event types` | `addEventListener` called for each type | +| `connectStream passes eventId to handler` | `e.lastEventId` forwarded as number | +| `connectStream handles JSON parse errors` | Malformed data doesn't crash, handler not called | +| `connectStream cleanup closes EventSource` | `source.close()` called | +| `resumeRun wraps input in request body` | Body is `{ input: }` | +| `cancelRun sends POST to correct URL` | URL is `/api/runs/{encoded}/cancel` | +| `getRunStatus returns typed response` | Response shape matches `RunStatusResponse` | + +### EventSource mock strategy + +```typescript +class MockEventSource { + listeners = new Map(); + addEventListener(type: string, fn: Function) { this.listeners.set(type, fn); } + close = vi.fn(); + // Simulate: mockSource.emit("node_started", { ... }) + emit(type: string, data: unknown, id?: string) { + this.listeners.get(type)?.({ data: JSON.stringify(data), lastEventId: id }); + } +} +vi.stubGlobal("EventSource", MockEventSource); +``` + +## Acceptance criteria + +- [ ] `startRun()` calls `/api/graphs/{encoded}/run` with body `{ input: ... }` +- [ ] `connectStream()` receives typed events and calls `onEvent` with `GraphEvent` + `eventId` +- [ ] `connectStream()` returns a cleanup function that closes the EventSource +- [ ] `connectStream()` supports `lastEventId` query param for reconnection replay +- [ ] `connectStream()` catches JSON parse errors without crashing +- [ ] `resumeRun()` wraps input in `{ input: ... }` body +- [ ] `cancelRun()` sends POST, ignores response body +- [ ] `getRunStatus()` returns typed `RunStatusResponse` +- [ ] All URLs use `encodeURIComponent()` on path params +- [ ] All URLs use `/api` prefix (goes through Vite proxy) +- [ ] No `onClose` in `StreamHandlers` (EventSource has no close event) +- [ ] `apiUrl()` exported from `client.ts`, used by both `request()` and EventSource +- [ ] `tsc --noEmit` passes +- [ ] Unit tests pass diff --git a/.claude/gw-plans/canvas/phase-2-sse-run-panel/phase-2.2-run-slice.md b/.claude/gw-plans/canvas/phase-2-sse-run-panel/phase-2.2-run-slice.md new file mode 100644 index 0000000..a6ca0a8 --- /dev/null +++ b/.claude/gw-plans/canvas/phase-2-sse-run-panel/phase-2.2-run-slice.md @@ -0,0 +1,373 @@ +# Phase 2.2 — RunSlice Implementation + +## Goal + +Implement the full run lifecycle state machine in `runSlice.ts` — start a +run, dispatch SSE events to state, highlight active node, handle completion +and errors. + +## Depends on + +- 2.1 (SSE service layer) + +## Files to modify + +| File | Action | +|------|--------| +| `packages/canvas/src/store/runSlice.ts` | Rewrite stub | + +## State shape + +```typescript +export type RunStatus = + | "idle" + | "running" + | "paused" + | "reconnecting" + | "completed" + | "error" + | "connection_lost"; + +export interface RunSlice { + // State + activeRunId: string | null; + runStatus: RunStatus; + activeNodeId: string | null; + runOutput: GraphEvent[]; + reconnectAttempts: number; + lastEventId: number; + finalState: unknown | null; + durationMs: number | null; + errorMessage: string | null; + pausedPrompt: string | null; + + // Actions + startRun: (graphId: string, input?: Record) => Promise; + cancelRun: () => Promise; + resumeRun: (input: unknown) => Promise; + resetRun: () => void; + + // Internal — called by SSE event handlers, not by components + _handleEvent: (event: GraphEvent, eventId: number | null) => void; + _handleStreamError: (error: Error) => void; + _disconnect: () => void; +} +``` + +### Private state (module closure, not in Zustand) + +```typescript +let cleanup: (() => void) | null = null; +let terminalReceived = false; // guards against onerror after graph_completed/error +``` + +The EventSource cleanup reference and terminal guard are held in the module +closure, not in Zustand state, because they're not serializable and +components don't need them. + +**Why `terminalReceived`?** When a terminal event (`graph_completed` or +non-recoverable `error`) arrives, `_handleEvent` calls `cleanup()` which +closes the EventSource. Closing an EventSource synchronously fires `onerror`. +If `onerror` runs before `set()` commits the new status, `_handleStreamError` +would see `runStatus === "running"` and trigger reconnection — even though +the run is actually done. The flag prevents this race. + +## State machine + +``` + ┌──────────────┐ + │ idle │ + └──────┬───────┘ + │ startRun() + ┌──────▼───────┐ + ┌─────│ running │◄────────────────┐ + │ └──┬───┬───┬───┘ │ + │ │ │ │ │ + connection │ graph_│ │ │ graph_ reconnect │ + error │ paused│ │ │ completed success │ + │ │ │ │ │ + ┌──────▼──┐ ┌──▼───┘ ┌▼──────────┐ ┌──────┴──────┐ + │ error │ │ paused│ │ completed │ │reconnecting │ + └─────────┘ └───────┘ └───────────┘ └─────────────┘ + ▲ │ + └───────── 3 failures ──────────────────┘ +``` + +`cancelRun()` from `running` or `paused` → `idle`. +`resetRun()` from any state → `idle`. + +## Event dispatch (`_handleEvent`) + +**Important**: `cleanup()` calls (which close the EventSource and may trigger +`onerror`) must happen **outside** the `set()` callback to prevent re-entrant +state updates. + +```typescript +_handleEvent: (event, eventId) => { + // Close connection on terminal events BEFORE updating state. + // Set terminalReceived flag to prevent onerror → reconnection race. + if (event.event === "graph_completed" || + (event.event === "error" && !event.data.recoverable)) { + terminalReceived = true; + cleanup?.(); + cleanup = null; + } + + set((s) => { + const output = [...s.runOutput, event]; + const base = { runOutput: output, lastEventId: eventId ?? s.lastEventId }; + + switch (event.event) { + case "run_started": + return { ...base, runStatus: "running" }; + + case "node_started": + return { ...base, activeNodeId: event.data.node_id }; + + case "node_completed": + return base; + // activeNodeId stays until next node_started or completion + + case "edge_traversed": + return base; + + case "graph_paused": + return { + ...base, + runStatus: "paused", + activeNodeId: event.data.node_id, + pausedPrompt: event.data.prompt, + }; + + case "graph_completed": + return { + ...base, + runStatus: "completed", + activeNodeId: null, + finalState: event.data.final_state, + durationMs: event.data.duration_ms, + }; + + case "error": + if (!event.data.recoverable) { + return { + ...base, + runStatus: "error", + activeNodeId: event.data.node_id ?? s.activeNodeId, + errorMessage: event.data.message, + }; + } + // Recoverable errors: log but don't change status + return base; + + default: + return base; + } + }); +} +``` + +## `startRun` implementation + +```typescript +startRun: async (graphId, input) => { + // Reset previous run state + cleanup?.(); + cleanup = null; + terminalReceived = false; + set({ + runStatus: "running", + activeRunId: null, + activeNodeId: null, + runOutput: [], + reconnectAttempts: 0, + lastEventId: 0, + finalState: null, + durationMs: null, + errorMessage: null, + pausedPrompt: null, + }); + + try { + // graphId is passed raw — encoding happens in the service layer + const { run_id } = await startRunApi(graphId, input); + set({ activeRunId: run_id }); + + const { _handleEvent, _handleStreamError } = useRunStore.getState(); + cleanup = connectStream(run_id, { + onEvent: _handleEvent, + onError: _handleStreamError, + }); + } catch (err) { + const message = err instanceof ApiError + ? err.message + : "Failed to start run"; + set({ runStatus: "error", errorMessage: message }); + useUIStore.getState().showToast(message, "error"); + } +} +``` + +Note: `graphId` is **not** encoded here. The `startRunApi()` function in the +service layer handles encoding. This avoids double-encoding. + +## `cancelRun` implementation + +```typescript +cancelRun: async () => { + const { activeRunId } = useRunStore.getState(); + if (!activeRunId) return; + + // Close connection first, then send cancel + cleanup?.(); + cleanup = null; + + try { + await cancelRunApi(activeRunId); + } catch { + // Best-effort — run may have already completed + } + set({ runStatus: "idle", activeNodeId: null }); +} +``` + +## `resumeRun` implementation (race-condition-safe) + +```typescript +resumeRun: async (input) => { + const { activeRunId, _handleEvent, _handleStreamError } = + useRunStore.getState(); + if (!activeRunId) return; + + // 1. Close old connection + cleanup?.(); + cleanup = null; + + // 2. Open NEW SSE connection BEFORE the resume POST returns + // (race condition fix per gw-frontend skill — server waits 2s for listener) + cleanup = connectStream(activeRunId, { + onEvent: _handleEvent, + onError: _handleStreamError, + }); + + set({ runStatus: "running", pausedPrompt: null }); + + // 3. Send resume request + try { + await resumeRunApi(activeRunId, input); + } catch (err) { + const message = err instanceof ApiError + ? err.message + : "Failed to resume run"; + set({ runStatus: "error", errorMessage: message }); + useUIStore.getState().showToast(message, "error"); + } +} +``` + +## `resetRun` + +```typescript +resetRun: () => { + cleanup?.(); + cleanup = null; + terminalReceived = false; + set({ + activeRunId: null, + runStatus: "idle", + activeNodeId: null, + runOutput: [], + reconnectAttempts: 0, + lastEventId: 0, + finalState: null, + durationMs: null, + errorMessage: null, + pausedPrompt: null, + }); +} +``` + +## `_handleStreamError` + +Sets status to `connection_lost`. The full reconnection logic (backoff, +status polling, reattach) is implemented in Part 2.5. This stub ensures +Part 2.2 is functional standalone. + +```typescript +_handleStreamError: (_error) => { + // Guard: skip if a terminal event was already received (onerror race) + if (terminalReceived) return; + + cleanup = null; // connection already closed by service layer + const { runStatus } = useRunStore.getState(); + // Only react if we're in an active state + if (runStatus === "running" || runStatus === "reconnecting") { + set({ runStatus: "connection_lost" }); + } +} +``` + +## `_disconnect` + +Utility for reconnection logic in 2.5 to close the current connection. + +```typescript +_disconnect: () => { + cleanup?.(); + cleanup = null; +} +``` + +## Tests + +### `packages/canvas/src/store/__tests__/runSlice.test.ts` + +Mock `startRunApi`, `connectStream`, `cancelRunApi`, `resumeRunApi` from `@api/runs`. + +| Test | What it verifies | +|------|-----------------| +| `idle → running on startRun` | Status transitions, `activeRunId` set | +| `startRun error → error status + toast` | API failure sets `errorMessage`, shows toast | +| `node_started sets activeNodeId` | `_handleEvent` with `node_started` | +| `node_completed clears nothing` | `activeNodeId` stays until next `node_started` | +| `graph_paused → paused status + prompt` | Status, `pausedPrompt` set | +| `graph_completed → completed + cleanup` | Status, `finalState`, `durationMs`, cleanup called | +| `non-recoverable error → error status + cleanup` | Status, `errorMessage`, cleanup called | +| `recoverable error → status unchanged` | Only `runOutput` updated | +| `cancelRun closes connection + sends cancel` | Cleanup called, API called, status → idle | +| `resumeRun opens SSE before POST` | `connectStream` called before `resumeRunApi` | +| `resetRun cleans up everything` | All state reset, cleanup called | +| `cleanup() outside set() callback` | No re-entrant state updates on terminal events | +| `lastEventId tracked from event handler` | Increments with each event | +| `onerror after graph_completed does not change status` | `terminalReceived` guard prevents race | +| `startRun resets terminalReceived flag` | Fresh run not blocked by previous terminal | + +### Mock strategy + +```typescript +// Mock the API module +vi.mock("@api/runs", () => ({ + startRun: vi.fn(), + connectStream: vi.fn(() => vi.fn()), // returns cleanup fn + cancelRun: vi.fn(), + resumeRun: vi.fn(), +})); +``` + +## Acceptance criteria + +- [ ] `startRun` calls API, stores `run_id`, connects SSE stream +- [ ] `graphId` passed raw to service layer (no double-encoding) +- [ ] SSE events update `runOutput`, `activeNodeId`, `runStatus` correctly +- [ ] `graph_completed` → status `completed`, stream closed **outside** `set()` +- [ ] Non-recoverable `error` → status `error`, stream closed **outside** `set()` +- [ ] `cancelRun` closes stream and sends cancel request +- [ ] `resumeRun` opens new SSE connection before POST (race condition fix) +- [ ] `resetRun` cleans up everything +- [ ] `terminalReceived` flag prevents onerror → reconnection race after completion +- [ ] `terminalReceived` reset in `startRun` and `resetRun` +- [ ] EventSource cleanup ref held in module closure, not Zustand state +- [ ] `useRunStore.getState()` used in callbacks to avoid stale closures +- [ ] Errors shown via `showToast()` +- [ ] `tsc --noEmit` passes +- [ ] Unit tests pass diff --git a/.claude/gw-plans/canvas/phase-2-sse-run-panel/phase-2.3-run-button-validation.md b/.claude/gw-plans/canvas/phase-2-sse-run-panel/phase-2.3-run-button-validation.md new file mode 100644 index 0000000..ec82f67 --- /dev/null +++ b/.claude/gw-plans/canvas/phase-2-sse-run-panel/phase-2.3-run-button-validation.md @@ -0,0 +1,234 @@ +# Phase 2.3 — Run Button + Client-Side Validation + +## Goal + +Add a Run button to the canvas header that validates the graph before +starting execution. Show validation errors as highlighted nodes + toast. + +## Depends on + +- 2.2 (RunSlice) + +## Files to create/modify + +| File | Action | +|------|--------| +| `packages/canvas/src/utils/validateGraph.ts` | Create | +| `packages/canvas/src/components/canvas/CanvasHeader.tsx` | Add Run/Stop button | +| `packages/canvas/src/components/canvas/RunInputDialog.tsx` | Create — simple input dialog | + +## Validation rules (Phase 2 subset) + +Only validate what Phase 2 nodes support (Start, LLM, End): + +```typescript +interface ValidationError { + message: string; + nodeId?: string; // for highlighting +} + +export function validateGraph( + nodes: NodeSchema[], + edges: EdgeSchema[], +): ValidationError[] { + const errors: ValidationError[] = []; + + // 1. Exactly one Start node + const starts = nodes.filter(n => n.type === "start"); + if (starts.length === 0) errors.push({ message: "Start node required" }); + if (starts.length > 1) errors.push({ message: "Only one Start node allowed", nodeId: starts[1].id }); + + // 2. At least one End node + const ends = nodes.filter(n => n.type === "end"); + if (ends.length === 0) errors.push({ message: "End node required" }); + + // 3. All nodes connected (no orphans) + const connectedIds = new Set(edges.flatMap(e => [e.source, e.target])); + for (const node of nodes) { + if (!connectedIds.has(node.id)) { + errors.push({ message: `${node.type} node is disconnected`, nodeId: node.id }); + } + } + + // 4. LLM nodes have a system prompt + for (const node of nodes) { + if (node.type === "llm" && !node.config.system_prompt?.trim()) { + errors.push({ message: "LLM node needs a system prompt", nodeId: node.id }); + } + } + + return errors; +} +``` + +## Run button behavior + +The CanvasHeader gets a Run/Stop button on the right side: + +``` +[← Back] Graph Name * [Save] [▶ Run] / [■ Stop] +``` + +**Important**: Read `nodes` and `edges` via `useGraphStore.getState()` inside +the click handler — not as top-level selectors. `CanvasHeader` is memoized +and subscribing to `nodes`/`edges` would re-render the header on every node +drag. + +### States + +| `runStatus` | Button | Action | +|-------------|--------|--------| +| `idle` | `▶ Run` | Validate → open input dialog (or start directly) | +| `running` | `■ Stop` | Call `cancelRun()` | +| `paused` | `▶ Resume` | Open resume input (handled by run panel, Part 2.4) | +| `reconnecting` | `■ Stop` (disabled spinner) | Wait for reconnection | +| `completed` | `▶ Run` | Reset + start new run | +| `error` | `▶ Run` | Reset + start new run | +| `connection_lost` | `▶ Run` | Reset + start new run | + +### Validation flow + +1. User clicks Run +2. `validateGraph(nodes, edges)` runs +3. If errors: + - Show first error as toast (error variant) + - If error has `nodeId`, pulse that node red briefly (CSS class) + - Don't start run +4. If valid: + - If graph has unsaved changes, auto-save first + - Open `RunInputDialog` (or skip if no input fields defined) + +## RunInputDialog + +Simple dialog for providing initial input to the run. Phase 2 keeps it +minimal — a single JSON textarea. + +```typescript +// RunInputDialog.tsx +interface RunInputDialogProps { + open: boolean; + onClose: () => void; + onSubmit: (input: Record) => void; +} +``` + +Content: +- Title: "Run Graph" +- Textarea for JSON input (pre-filled with `{}`) +- "Start" and "Cancel" buttons +- JSON parse error shown inline if invalid + +Phase 4 replaces this with a schema-driven form based on `GraphSchema.state` +fields. For now, raw JSON is sufficient. + +## Validation error highlighting + +Add a transient CSS class to nodes with validation errors: + +```typescript +// In GraphCanvas or a new hook +const [errorNodeIds, setErrorNodeIds] = useState>(new Set()); + +// When validation fails: +setErrorNodeIds(new Set(errors.filter(e => e.nodeId).map(e => e.nodeId!))); +setTimeout(() => setErrorNodeIds(new Set()), 3000); // clear after 3s +``` + +Node shell reads this to apply a red border pulse: +```css +.node-validation-error { + animation: error-pulse 0.6s ease-in-out 3; + border-color: var(--color-red-500); +} +``` + +## Auto-save before run + +If the graph has unsaved changes (`dirty === true`), save before running. +Check for save errors before proceeding. + +```typescript +const handleRun = async () => { + const { nodes, edges, graph, dirty, saveGraph, saveError } = + useGraphStore.getState(); + if (!graph) return; + + const errors = validateGraph(nodes, edges); + if (errors.length > 0) { /* show errors */ return; } + + if (dirty) { + await saveGraph(); + if (useGraphStore.getState().saveError) { + showToast("Failed to save — fix save errors before running", "error"); + return; + } + } + + setInputDialogOpen(true); +}; +``` + +Note: `saveGraph()` catches errors internally and sets `saveError` without +re-throwing. We check `saveError` after awaiting to detect failures. + +## RunInputDialog → startRun flow + +The `onSubmit` callback must close over `graph.id`: + +```typescript + setInputDialogOpen(false)} + onSubmit={(input) => { + setInputDialogOpen(false); + useRunStore.getState().startRun(graph!.id, input); + }} +/> +``` + +`graph.id` comes from `useGraphStore(s => s.graph)` which CanvasHeader +already subscribes to (for the name display). The `!` assertion is safe +because the Run button is only enabled when `graph` is non-null. + +## Tests + +### `packages/canvas/src/utils/__tests__/validateGraph.test.ts` + +| Test | What it verifies | +|------|-----------------| +| `valid graph passes` | Start → LLM → End with system prompt returns [] | +| `missing Start node` | Error with "Start node required" | +| `missing End node` | Error with "End node required" | +| `disconnected node` | Error with nodeId pointing to orphan | +| `LLM without system prompt` | Error with nodeId pointing to LLM | +| `multiple Start nodes` | Error on second Start node | + +### `packages/canvas/src/components/canvas/__tests__/CanvasHeader.test.tsx` + +Mock `useRunStore` and `useGraphStore`. + +| Test | What it verifies | +|------|-----------------| +| `shows Run button in idle` | Button text is "Run" | +| `shows Stop button when running` | Button text is "Stop" | +| `Run button triggers validation` | `validateGraph` called on click | +| `auto-saves before run when dirty` | `saveGraph` called when `dirty === true` | + +### `packages/canvas/src/components/canvas/__tests__/RunInputDialog.test.tsx` + +| Test | What it verifies | +|------|-----------------| +| `renders JSON textarea` | Pre-filled with `{}` | +| `rejects invalid JSON` | Error shown, submit disabled | +| `calls onSubmit with parsed JSON` | Valid JSON parsed and passed | + +## Acceptance criteria + +- [ ] `validateGraph()` catches missing Start/End, orphan nodes, empty LLM prompts +- [ ] Run button appears in CanvasHeader, changes label/action based on `runStatus` +- [ ] Validation errors show as toast + node highlighting (3s auto-clear) +- [ ] RunInputDialog opens for JSON input, validates JSON before submit +- [ ] Graph auto-saves before run if dirty +- [ ] Stop button calls `cancelRun()` +- [ ] `tsc --noEmit` passes +- [ ] Unit tests pass diff --git a/.claude/gw-plans/canvas/phase-2-sse-run-panel/phase-2.4-run-panel.md b/.claude/gw-plans/canvas/phase-2-sse-run-panel/phase-2.4-run-panel.md new file mode 100644 index 0000000..5eea9f1 --- /dev/null +++ b/.claude/gw-plans/canvas/phase-2-sse-run-panel/phase-2.4-run-panel.md @@ -0,0 +1,310 @@ +# Phase 2.4 — Run Panel + Node Highlighting + +## Goal + +Build the run output panel that shows real-time execution events, and +highlight the active node on the canvas during a run. + +## Depends on + +- 2.2 (RunSlice) + +## Files to create/modify + +| File | Action | +|------|--------| +| `packages/canvas/src/components/panels/RunPanel.tsx` | Create | +| `packages/canvas/src/components/panels/RunEventItem.tsx` | Create | +| `packages/canvas/src/components/canvas/CanvasRoute.tsx` | Add RunPanel | +| `packages/canvas/src/components/canvas/GraphCanvas.tsx` | Active node class | +| `packages/canvas/src/components/canvas/nodes/BaseNodeShell.tsx` | Active node CSS | +| `packages/canvas/src/components/ui/Sheet.tsx` | Add `"bottom"` side support | +| `packages/canvas/src/styles/tokens.ts` | Add node-active animation tokens | + +## Run panel design + +The panel shows a live timeline of execution events. It opens automatically +when a run starts and stays open until dismissed. + +### Layout + +Uses the existing `Sheet` component extended with `side: "bottom"`: + +``` +┌──────────────────────────────────────────────┐ +│ Canvas Header [▶ Run] │ +├──────────────────────────────────────────────┤ +│ │ +│ React Flow Canvas │ +│ │ +├──────────────────────────────────────────────┤ +│ Run Output [✕] │ +│ ┌──────────────────────────────────────┐ │ +│ │ ▶ run_started 12:34:05 │ │ +│ │ ● node_started: llm_1 12:34:05 │ │ +│ │ ✓ node_completed: llm_1 1.2s │ │ +│ │ → edge_traversed: llm_1 → end_1 │ │ +│ │ ✓ graph_completed 3.4s │ │ +│ └──────────────────────────────────────┘ │ +└──────────────────────────────────────────────┘ +``` + +### Panel positioning + +Read `useUIStore.panelLayout` to determine position: +- `"bottom"` — panel slides up from bottom (default during runs) +- `"right"` — panel slides in from right (shares space with config panel) + +For Phase 2, default to bottom. The toggle can be added later. + +### Sheet `"bottom"` support + +The current Sheet has `h-full w-80` hardcoded in the outer div's class +string. Adding `"bottom"` requires pulling size classes into the +`sideClasses` map so each side controls its own dimensions. + +Refactor the Sheet component: + +```typescript +type SheetSide = "left" | "right" | "bottom"; + +const sideClasses: Record = { + left: { + position: "left-0 top-0", + border: "border-r", + openTransform: "translate-x-0", + closedTransform: "-translate-x-full", + size: "h-full w-80", + }, + right: { + position: "right-0 top-0", + border: "border-l", + openTransform: "translate-x-0", + closedTransform: "translate-x-full", + size: "h-full w-80", + }, + bottom: { + position: "inset-x-0 bottom-0", + border: "border-t", + openTransform: "translate-y-0", // ← must be Y-axis, not X + closedTransform: "translate-y-full", + size: "w-full h-64", + }, +}; +``` + +The outer div template changes from: +``` +`absolute ${position} top-0 z-20 h-full w-80 ${border} ... ${open ? "translate-x-0" : transform}` +``` +to: +``` +`absolute ${position} z-20 ${size} ${border} ... ${open ? openTransform : closedTransform}` +``` + +Key changes: +- `top-0` moves into the left/right `position` entries (bottom uses `bottom-0`) +- `h-full w-80` moves into `size` per variant +- `openTransform` is split from `closedTransform` — bottom needs `translate-y-0`, + not `translate-x-0` +``` + +## RunPanel component + +```typescript +interface RunPanelProps { + // No props — reads from useRunStore directly +} + +export function RunPanel() { + const runStatus = useRunStore(s => s.runStatus); + const runOutput = useRunStore(s => s.runOutput); + const durationMs = useRunStore(s => s.durationMs); + const errorMessage = useRunStore(s => s.errorMessage); + + // Don't render if no run has been started + if (runStatus === "idle") return null; + + return ( + +
+ {runOutput.map((event, i) => ( + + ))} + {runStatus === "completed" && ( +
Completed in {formatDuration(durationMs)}
+ )} + {runStatus === "error" && ( +
{errorMessage}
+ )} + {runStatus === "connection_lost" && ( +
Connection lost — reconnecting...
+ )} +
+
+ ); +} +``` + +### Auto-scroll + +The event list auto-scrolls to the bottom as new events arrive: + +```typescript +const endRef = useRef(null); +useEffect(() => { + endRef.current?.scrollIntoView({ behavior: "smooth" }); +}, [runOutput.length]); +``` + +### Panel close behavior + +Closing the panel does NOT cancel the run. It just hides the panel. The run +continues in the background (node highlighting still active). Re-open by +clicking a "Show run" indicator in the header. + +## RunEventItem component + +Renders a single event row with icon, label, and timestamp/duration: + +```typescript +function RunEventItem({ event }: { event: GraphEvent }) { + // Icon + color per event type: + // run_started → ▶ blue + // node_started → ● amber (spinning) + // node_completed → ✓ green + duration + // edge_traversed → → zinc-400 + // graph_paused → ⏸ amber + prompt text + // graph_completed → ✓✓ green + total duration + // error → ✗ red + message +} +``` + +Keep it simple — single line per event, monospace-friendly. + +## Active node highlighting + +When `activeNodeId` is set in the run store, the corresponding node on the +canvas gets a pulsing border. + +### In BaseNodeShell + +`BaseNodeShell` does not currently accept an `id` prop. Use React Flow's +`useNodeId()` hook instead of threading `id` through all node components: + +```typescript +import { useNodeId } from "@xyflow/react"; + +export function BaseNodeShell({ children, ... }: Props) { + const nodeId = useNodeId(); // provided by React Flow context + const activeNodeId = useRunStore(s => s.activeNodeId); + const isActive = activeNodeId === nodeId; + + return ( +
+ {children} +
+ ); +} +``` + +No changes needed to `StartNode`, `LLMNode`, or `EndNode` — `useNodeId()` +reads from React Flow's internal context, which is already provided per-node. +``` + +### CSS animation (in tokens.ts or a CSS file) + +```css +.node-active { + border-color: var(--color-blue-400); + box-shadow: 0 0 12px rgba(96, 165, 250, 0.4); + animation: node-pulse 1.5s ease-in-out infinite; +} + +@keyframes node-pulse { + 0%, 100% { box-shadow: 0 0 8px rgba(96, 165, 250, 0.3); } + 50% { box-shadow: 0 0 16px rgba(96, 165, 250, 0.6); } +} +``` + +### Completed/error node states + +After a run completes, briefly show completion state on nodes: +- Nodes that completed successfully: green border flash (1s) +- Node that errored: red border + +This is derived from `runOutput` — scan for `node_completed` and `error` +events. Reset when `resetRun()` is called. + +## CanvasRoute integration + +```typescript +// CanvasRoute.tsx +
+ + + +
+``` + +The RunPanel renders conditionally (returns null when idle). Sheet handles +the slide animation. + +## Canvas height adjustment + +When the bottom panel is open, the React Flow canvas needs to shrink to +avoid overlap. Two approaches: + +1. **Overlay** — panel floats over canvas (simpler, may obscure nodes) +2. **Resize** — canvas height adjusts when panel opens + +**Decision: Overlay for Phase 2.** The panel is 256px tall and semi-transparent +at the top edge. Users can scroll/zoom the canvas. Phase 5 can add resize +behavior if needed. + +## Tests + +### `packages/canvas/src/components/ui/__tests__/Sheet.test.tsx` + +| Test | What it verifies | +|------|-----------------| +| `renders full-width bottom variant` | Bottom sheet has `w-full h-64` classes | +| `slide-up animation on bottom open` | `translate-y-0` when open, `translate-y-full` when closed | +| `right variant still works after refactor` | `w-80 h-full` classes unchanged | + +### `packages/canvas/src/components/panels/__tests__/RunPanel.test.tsx` + +| Test | What it verifies | +|------|-----------------| +| `returns null when status is idle` | No DOM rendered | +| `renders event timeline during run` | RunEventItem for each event in runOutput | +| `shows completion duration` | Duration displayed when status is completed | +| `shows error message` | Red error text when status is error | +| `scrolls to bottom on new event` | `scrollIntoView` called on endRef | + +### `packages/canvas/src/components/panels/__tests__/RunEventItem.test.tsx` + +| Test | What it verifies | +|------|-----------------| +| `renders icon and label per event type` | Correct icon for each of 7 event types | + +## Acceptance criteria + +- [ ] Sheet `sideClasses` includes size (no hardcoded `w-80` in template) +- [ ] Sheet supports `side="bottom"` with slide-up animation +- [ ] RunPanel shows live event timeline during run +- [ ] Events auto-scroll to bottom +- [ ] Active node gets pulsing blue border during execution +- [ ] Completed run shows total duration +- [ ] Error state shows error message in red +- [ ] Panel close hides panel but doesn't cancel run +- [ ] RunPanel mounts in CanvasRoute alongside NodeConfigPanel +- [ ] `tsc --noEmit` passes +- [ ] Unit tests pass diff --git a/.claude/gw-plans/canvas/phase-2-sse-run-panel/phase-2.5-reconnection-resume.md b/.claude/gw-plans/canvas/phase-2-sse-run-panel/phase-2.5-reconnection-resume.md new file mode 100644 index 0000000..3f4907e --- /dev/null +++ b/.claude/gw-plans/canvas/phase-2-sse-run-panel/phase-2.5-reconnection-resume.md @@ -0,0 +1,264 @@ +# Phase 2.5 — Reconnection + Resume UI + +## Goal + +Implement the full SSE reconnection state machine with exponential backoff, +and the human-in-the-loop resume form for paused runs. + +## Depends on + +- 2.1 (SSE service layer — `connectStream`, `getRunStatus`) +- 2.4 (Run panel — where resume UI is rendered) + +## Files to create/modify + +| File | Action | +|------|--------| +| `packages/canvas/src/store/runSlice.ts` | Extend `_handleStreamError` with reconnection | +| `packages/canvas/src/components/panels/ResumeForm.tsx` | Create | +| `packages/canvas/src/components/panels/RunPanel.tsx` | Integrate ResumeForm + connection lost banner | + +## Reconnection state machine + +From gw-frontend skill — this is the authoritative spec: + +``` +CONNECTED → graph_completed → COMPLETED (normal path) + ↘ connection drops unexpectedly + → RECONNECTING (backoff: 1s → 2s → 4s, max 3 attempts) + → GET /runs/{id}/status + { status: "completed" } → replay terminal event → COMPLETED + { status: "running" } → reattach to /stream → CONNECTED + { status: "paused" } → show resume UI → PAUSED + 404 / server error → FAILED, show banner + → 3 failed attempts → FAILED +``` + +### Concurrency guard + +Add a module-level flag to prevent parallel reconnection chains. If two +`onerror` events fire in rapid succession, both `_handleStreamError` calls +read `reconnectAttempts` at the same time and spawn duplicate chains. + +```typescript +let reconnecting = false; // module-level, alongside cleanup and terminalReceived +``` + +### Implementation in `_handleStreamError` + +Replace the Phase 2.2 stub with full reconnection: + +```typescript +_handleStreamError: async (_error) => { + // Guard: skip if terminal event received (onerror race — see 2.2) + if (terminalReceived) return; + + // Guard: prevent concurrent reconnection chains + if (reconnecting) return; + + const state = useRunStore.getState(); + if (state.runStatus === "completed" || state.runStatus === "error") return; + if (!state.activeRunId) return; + + reconnecting = true; + + const attempt = state.reconnectAttempts + 1; + if (attempt > 3) { + reconnecting = false; + set({ + runStatus: "connection_lost", + errorMessage: "Connection lost after 3 attempts", + }); + showToast("Connection lost — run may still be executing on the server", "error"); + return; + } + + set({ runStatus: "reconnecting", reconnectAttempts: attempt }); + + // Exponential backoff: 1s, 2s, 4s + await sleep(1000 * Math.pow(2, attempt - 1)); + + try { + const status = await getRunStatus(state.activeRunId); + + switch (status.status) { + case "completed": + reconnecting = false; + set({ + runStatus: "completed", + finalState: status.final_state, + durationMs: status.duration_ms, + activeNodeId: null, + }); + break; + + case "running": { + // Reattach SSE with last known event ID + const { _handleEvent, _handleStreamError, lastEventId } = + useRunStore.getState(); + cleanup = connectStream(state.activeRunId, { + onEvent: _handleEvent, + onError: _handleStreamError, + }, lastEventId); + reconnecting = false; + set({ runStatus: "running", reconnectAttempts: 0 }); + break; + } + + case "paused": + reconnecting = false; + set({ + runStatus: "paused", + activeNodeId: status.node_id, + pausedPrompt: status.prompt, + }); + break; + + case "error": + reconnecting = false; + set({ + runStatus: "error", + errorMessage: status.error ?? "Run failed on server", + activeNodeId: null, + }); + break; + } + } catch { + // Status check failed — retry (reset guard so recursive call proceeds) + reconnecting = false; + useRunStore.getState()._handleStreamError(new Error("Status check failed")); + } +} +``` + +### `sleep` utility + +```typescript +function sleep(ms: number): Promise { + return new Promise(resolve => setTimeout(resolve, ms)); +} +``` + +### `lastEventId` tracking + +Already handled in Parts 2.1 and 2.2: `connectStream` passes `eventId` to +`onEvent`, and `_handleEvent` stores it in `lastEventId`. On reconnection, +`lastEventId` is passed to `connectStream` which adds it as +`?last_event_id=N` — the server replays buffered events after that ID. + +## Resume UI + +When `runStatus === "paused"`, the run panel shows a resume form. + +### ResumeForm component + +```typescript +interface ResumeFormProps { + prompt: string; // from graph_paused event + onSubmit: (input: unknown) => void; +} + +export function ResumeForm({ prompt, onSubmit }: ResumeFormProps) { + const [value, setValue] = useState(""); + + return ( +
+

{prompt}

+
+ setValue(e.target.value)} + placeholder="Type your response..." + className="flex-1" + /> + +
+
+ ); +} +``` + +### Integration in RunPanel + +```typescript +// In RunPanel, after the event list: +{runStatus === "paused" && pausedPrompt && ( + useRunStore.getState().resumeRun(input)} + /> +)} +``` + +### Resume flow (race condition safe) + +Already implemented in Phase 2.2's `resumeRun` — opens new SSE connection +before the POST returns. The server has a 2-second timeout waiting for the +SSE listener. + +Sequence: +1. User types response, clicks Resume +2. `resumeRun(input)` called +3. Old SSE connection closed +4. **New SSE connection opened immediately** +5. `POST /runs/{id}/resume` sent +6. Server detects SSE listener, feeds input to LangGraph +7. Execution continues, events flow to new connection + +## Connection lost banner + +When `runStatus === "connection_lost"` (3 failed reconnection attempts), show +a persistent banner in the run panel: + +``` +⚠ Connection lost — the run may still be executing on the server. +[Retry Connection] [Dismiss] +``` + +"Retry Connection" resets `reconnectAttempts` to 0 and triggers +`_handleStreamError` again to restart the reconnection cycle. + +## Tests + +### `packages/canvas/src/store/__tests__/runSlice.reconnect.test.ts` + +Mock `getRunStatus`, `connectStream`, and `sleep` (via `vi.useFakeTimers`). + +| Test | What it verifies | +|------|-----------------| +| `reconnects with exponential backoff` | Delays are 1s, 2s, 4s | +| `recovers on status=running` | Opens new SSE, resets attempts, status → running | +| `recovers on status=completed` | Sets finalState/durationMs, status → completed | +| `recovers on status=paused` | Sets pausedPrompt, status → paused | +| `recovers on status=error` | Sets errorMessage, status → error | +| `gives up after 3 attempts` | Status → connection_lost, toast shown | +| `status check failure triggers retry` | Recursive call to _handleStreamError | +| `lastEventId passed to connectStream on reattach` | connectStream called with stored lastEventId | +| `concurrent onerror during reconnection is ignored` | Second call returns immediately via `reconnecting` guard | +| `reconnecting flag resets on success` | Flag is false after status=running recovery | +| `reconnecting flag resets on give-up` | Flag is false after 3 failed attempts | + +### `packages/canvas/src/components/panels/__tests__/ResumeForm.test.tsx` + +| Test | What it verifies | +|------|-----------------| +| `renders prompt text` | Prompt from graph_paused shown | +| `submit button disabled when empty` | Can't submit empty input | +| `calls onSubmit with input value` | Submit fires with typed value | + +## Acceptance criteria + +- [ ] `reconnecting` guard prevents concurrent reconnection chains +- [ ] `reconnecting` reset on success, give-up, and retry +- [ ] Reconnection attempts with exponential backoff (1s → 2s → 4s) +- [ ] Status check determines correct recovery path (completed/running/paused/error) +- [ ] Successful reconnection resets attempt counter +- [ ] 3 failed attempts → `connection_lost` status + banner +- [ ] `lastEventId` tracked and passed to `connectStream` on reconnect +- [ ] Server replays buffered events after the last seen ID +- [ ] ResumeForm shows prompt and input field when paused +- [ ] Resume opens new SSE before POST (race condition safe) +- [ ] Connection lost banner with manual retry option +- [ ] `tsc --noEmit` passes From 899643b5a1277a806b0e39c43080aa4e80bc4bab Mon Sep 17 00:00:00 2001 From: prosdev Date: Mon, 16 Mar 2026 17:39:47 -0700 Subject: [PATCH 2/9] =?UTF-8?q?feat(canvas):=20implement=20Phase=202=20?= =?UTF-8?q?=E2=80=94=20SSE=20run=20panel?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Phase 2.1: SSE service layer - Export apiUrl() from client.ts for shared URL construction - Rewrite runs.ts: startRun, resumeRun, cancelRun, getRunStatus, connectStream with typed SSE events and Last-Event-ID support - All request bodies wrap input in { input: ... } per server schema Phase 2.2: RunSlice state machine - Full event dispatch for all 7 GraphEvent types - terminalReceived flag prevents onerror race after graph_completed - Module-closure cleanup ref (not in Zustand state) Phase 2.3: Run button + validation - Run/Stop button in CanvasHeader with status-driven states - validateGraph checks Start/End presence, orphans, LLM prompts - Auto-save before run with error checking - RunInputDialog for JSON input Phase 2.4: Run panel + node highlighting - Sheet extended with side="bottom" (openTransform/closedTransform) - RunPanel with event timeline, auto-scroll, status banners - Active node pulse via useNodeId() + useRunStore in BaseNodeShell Phase 2.5: Reconnection + resume - Exponential backoff (1s→2s→4s, max 3 attempts) - Status polling fallback (completed/running/paused/error) - reconnecting concurrency guard prevents parallel chains - ResumeForm for human-in-the-loop paused runs - Fix stale URL in gw-frontend skill Co-Authored-By: Claude Opus 4.6 (1M context) --- .claude/skills/gw-frontend/SKILL.md | 4 +- .../canvas/src/api/__tests__/runs.test.ts | 216 ++++++++++ packages/canvas/src/api/client.ts | 7 +- packages/canvas/src/api/runs.ts | 130 +++++- .../src/components/canvas/CanvasHeader.tsx | 90 ++++- .../src/components/canvas/CanvasRoute.tsx | 2 + .../src/components/canvas/RunInputDialog.tsx | 56 +++ .../components/canvas/nodes/BaseNodeShell.tsx | 9 +- .../nodes/__tests__/BaseNodeShell.test.tsx | 9 + .../canvas/nodes/__tests__/EndNode.test.tsx | 9 + .../canvas/nodes/__tests__/LLMNode.test.tsx | 9 + .../canvas/nodes/__tests__/StartNode.test.tsx | 9 + .../src/components/panels/ResumeForm.tsx | 33 ++ .../src/components/panels/RunEventItem.tsx | 104 +++++ .../canvas/src/components/panels/RunPanel.tsx | 83 ++++ packages/canvas/src/components/ui/Sheet.tsx | 34 +- .../src/store/__tests__/runSlice.test.ts | 377 ++++++++++++++++++ packages/canvas/src/store/runSlice.ts | 302 +++++++++++++- .../src/utils/__tests__/validateGraph.test.ts | 116 ++++++ packages/canvas/src/utils/validateGraph.ts | 67 ++++ 20 files changed, 1616 insertions(+), 50 deletions(-) create mode 100644 packages/canvas/src/api/__tests__/runs.test.ts create mode 100644 packages/canvas/src/components/canvas/RunInputDialog.tsx create mode 100644 packages/canvas/src/components/panels/ResumeForm.tsx create mode 100644 packages/canvas/src/components/panels/RunEventItem.tsx create mode 100644 packages/canvas/src/components/panels/RunPanel.tsx create mode 100644 packages/canvas/src/store/__tests__/runSlice.test.ts create mode 100644 packages/canvas/src/utils/__tests__/validateGraph.test.ts create mode 100644 packages/canvas/src/utils/validateGraph.ts diff --git a/.claude/skills/gw-frontend/SKILL.md b/.claude/skills/gw-frontend/SKILL.md index 3895972..da0b63d 100644 --- a/.claude/skills/gw-frontend/SKILL.md +++ b/.claude/skills/gw-frontend/SKILL.md @@ -121,9 +121,9 @@ interface UISlice { CONNECTED → graph_completed → COMPLETED (normal path) ↘ connection drops unexpectedly → RECONNECTING (backoff: 1s → 2s → 4s, max 3 attempts) - → GET /graphs/run/:id/status + → GET /runs/:id/status { status: "completed" } → replay terminal event → COMPLETED - { status: "running" } → reattach to /stream → CONNECTED + { status: "running" } → reattach to /runs/:id/stream → CONNECTED { status: "paused" } → show resume UI → PAUSED 404 / server error → FAILED, show banner → 3 failed attempts → FAILED diff --git a/packages/canvas/src/api/__tests__/runs.test.ts b/packages/canvas/src/api/__tests__/runs.test.ts new file mode 100644 index 0000000..5d6166b --- /dev/null +++ b/packages/canvas/src/api/__tests__/runs.test.ts @@ -0,0 +1,216 @@ +import { + cancelRun, + connectStream, + getRunStatus, + resumeRun, + startRun, +} from "../runs"; + +// --------------------------------------------------------------------------- +// Mocks +// --------------------------------------------------------------------------- + +vi.mock("../client", () => ({ + apiUrl: (path: string) => `/api${path}`, + request: vi.fn(), +})); + +const { request } = await import("../client"); + +class MockEventSource { + url: string; + listeners = new Map void)[]>(); + onerror: ((e: Event) => void) | null = null; + close = vi.fn(); + + constructor(url: string) { + this.url = url; + mockEventSourceInstances.push(this); + } + + addEventListener(type: string, fn: (e: MessageEvent) => void) { + const existing = this.listeners.get(type) ?? []; + existing.push(fn); + this.listeners.set(type, existing); + } + + emit(type: string, data: unknown, id?: string) { + const fns = this.listeners.get(type); + if (!fns) return; + for (const fn of fns) { + fn({ data: JSON.stringify(data), lastEventId: id ?? "" } as MessageEvent); + } + } +} + +let mockEventSourceInstances: MockEventSource[] = []; +vi.stubGlobal("EventSource", MockEventSource); + +function latestSource(): MockEventSource { + const s = mockEventSourceInstances[mockEventSourceInstances.length - 1]; + if (!s) throw new Error("No EventSource created"); + return s; +} + +function lastRequestBody(): unknown { + const calls = vi.mocked(request).mock.calls; + const call = calls[calls.length - 1]; + if (!call) throw new Error("No request calls"); + const opts = call[1] as RequestInit | undefined; + return JSON.parse(opts?.body as string); +} + +function lastRequestPath(): string { + const calls = vi.mocked(request).mock.calls; + const call = calls[calls.length - 1]; + if (!call) throw new Error("No request calls"); + return call[0] as string; +} + +beforeEach(() => { + vi.clearAllMocks(); + mockEventSourceInstances = []; +}); + +// --------------------------------------------------------------------------- +// REST endpoints +// --------------------------------------------------------------------------- + +describe("startRun", () => { + it("calls correct URL with encoded graph ID", async () => { + vi.mocked(request).mockResolvedValue({ run_id: "r1", status: "running" }); + await startRun("my graph#1", { key: "val" }); + expect(request).toHaveBeenCalledWith( + "/graphs/my%20graph%231/run", + expect.objectContaining({ method: "POST" }), + ); + }); + + it("wraps input in { input } body", async () => { + vi.mocked(request).mockResolvedValue({ run_id: "r1", status: "running" }); + await startRun("g1", { foo: "bar" }); + expect(lastRequestBody()).toEqual({ input: { foo: "bar" } }); + }); + + it("defaults input to empty object", async () => { + vi.mocked(request).mockResolvedValue({ run_id: "r1", status: "running" }); + await startRun("g1"); + expect(lastRequestBody()).toEqual({ input: {} }); + }); +}); + +describe("resumeRun", () => { + it("wraps input in { input } body", async () => { + vi.mocked(request).mockResolvedValue({ status: "resumed" }); + await resumeRun("r1", "user response"); + expect(lastRequestPath()).toBe("/runs/r1/resume"); + expect(lastRequestBody()).toEqual({ input: "user response" }); + }); +}); + +describe("cancelRun", () => { + it("sends POST to correct URL", async () => { + vi.mocked(request).mockResolvedValue(undefined); + await cancelRun("r1"); + expect(request).toHaveBeenCalledWith( + "/runs/r1/cancel", + expect.objectContaining({ method: "POST" }), + ); + }); +}); + +describe("getRunStatus", () => { + it("calls correct URL and returns response", async () => { + const status = { run_id: "r1", status: "running" }; + vi.mocked(request).mockResolvedValue(status); + const result = await getRunStatus("r1"); + expect(request).toHaveBeenCalledWith("/runs/r1/status"); + expect(result).toEqual(status); + }); +}); + +// --------------------------------------------------------------------------- +// SSE stream +// --------------------------------------------------------------------------- + +describe("connectStream", () => { + it("creates EventSource with correct URL through apiUrl", () => { + connectStream("r1", { onEvent: vi.fn(), onError: vi.fn() }); + expect(latestSource().url).toBe("/api/runs/r1/stream"); + }); + + it("encodes run ID in URL", () => { + connectStream("run #1", { onEvent: vi.fn(), onError: vi.fn() }); + expect(latestSource().url).toBe("/api/runs/run%20%231/stream"); + }); + + it("adds last_event_id query param when > 0", () => { + connectStream("r1", { onEvent: vi.fn(), onError: vi.fn() }, 5); + expect(latestSource().url).toBe("/api/runs/r1/stream?last_event_id=5"); + }); + + it("omits query param when lastEventId is 0", () => { + connectStream("r1", { onEvent: vi.fn(), onError: vi.fn() }, 0); + expect(latestSource().url).toBe("/api/runs/r1/stream"); + }); + + it("listens for all 7 event types", () => { + connectStream("r1", { onEvent: vi.fn(), onError: vi.fn() }); + const types = [...latestSource().listeners.keys()]; + expect(types).toEqual([ + "run_started", + "node_started", + "node_completed", + "edge_traversed", + "graph_paused", + "graph_completed", + "error", + ]); + }); + + it("passes parsed event and eventId to onEvent", () => { + const onEvent = vi.fn(); + connectStream("r1", { onEvent, onError: vi.fn() }); + latestSource().emit("node_started", { node_id: "n1", timestamp: "t" }, "3"); + expect(onEvent).toHaveBeenCalledWith( + { event: "node_started", data: { node_id: "n1", timestamp: "t" } }, + 3, + ); + }); + + it("passes null eventId when lastEventId is empty", () => { + const onEvent = vi.fn(); + connectStream("r1", { onEvent, onError: vi.fn() }); + latestSource().emit("run_started", { run_id: "r1", timestamp: "t" }); + expect(onEvent).toHaveBeenCalledWith(expect.anything(), null); + }); + + it("handles JSON parse errors without crashing", () => { + const onEvent = vi.fn(); + connectStream("r1", { onEvent, onError: vi.fn() }); + const fns = latestSource().listeners.get("node_started") ?? []; + const handler = fns[0]; + if (!handler) throw new Error("Expected listener"); + handler({ data: "not json", lastEventId: "" } as MessageEvent); + expect(onEvent).not.toHaveBeenCalled(); + }); + + it("calls onError and closes source on onerror", () => { + const onError = vi.fn(); + connectStream("r1", { onEvent: vi.fn(), onError }); + const source = latestSource(); + source.onerror?.(new Event("error")); + expect(source.close).toHaveBeenCalled(); + expect(onError).toHaveBeenCalledWith(expect.any(Error)); + }); + + it("cleanup function closes EventSource", () => { + const cleanup = connectStream("r1", { + onEvent: vi.fn(), + onError: vi.fn(), + }); + const source = latestSource(); + cleanup(); + expect(source.close).toHaveBeenCalled(); + }); +}); diff --git a/packages/canvas/src/api/client.ts b/packages/canvas/src/api/client.ts index 84bdc95..92209f7 100644 --- a/packages/canvas/src/api/client.ts +++ b/packages/canvas/src/api/client.ts @@ -30,6 +30,11 @@ export class ApiError extends Error { const BASE_URL = "/api"; +/** Build a full API URL from a path. Used by both request() and EventSource. */ +export function apiUrl(path: string): string { + return `${BASE_URL}${path}`; +} + export async function request( path: string, options?: RequestInit, @@ -38,7 +43,7 @@ export async function request( "Content-Type": "application/json", }; - const response = await fetch(`${BASE_URL}${path}`, { + const response = await fetch(apiUrl(path), { ...options, headers: { ...headers, ...options?.headers }, }); diff --git a/packages/canvas/src/api/runs.ts b/packages/canvas/src/api/runs.ts index f017e25..107517e 100644 --- a/packages/canvas/src/api/runs.ts +++ b/packages/canvas/src/api/runs.ts @@ -1,29 +1,129 @@ /** Run management and SSE streaming service layer. */ -import { request } from "./client"; +import type { GraphEvent } from "@shared/events"; +import { apiUrl, request } from "./client"; +// --------------------------------------------------------------------------- +// Types +// --------------------------------------------------------------------------- + +export interface StreamHandlers { + onEvent: (event: GraphEvent, eventId: number | null) => void; + onError: (error: Error) => void; +} + +export interface RunStatusResponse { + run_id: string; + graph_id: string; + status: "running" | "paused" | "completed" | "error"; + node_id: string | null; + prompt: string | null; + final_state: unknown | null; + duration_ms: number | null; + error: string | null; +} + +// --------------------------------------------------------------------------- +// REST endpoints +// --------------------------------------------------------------------------- + +/** Start a run — POST /graphs/{graph_id}/run */ export async function startRun( graphId: string, + input?: Record, +): Promise<{ run_id: string; status: string }> { + return request<{ run_id: string; status: string }>( + `/graphs/${encodeURIComponent(graphId)}/run`, + { + method: "POST", + body: JSON.stringify({ input: input ?? {} }), + }, + ); +} + +/** Resume a paused run — POST /runs/{run_id}/resume */ +export async function resumeRun( + runId: string, input: unknown, -): Promise<{ run_id: string }> { - return request<{ run_id: string }>(`/graphs/${graphId}/run`, { +): Promise<{ status: string }> { + return request<{ status: string }>( + `/runs/${encodeURIComponent(runId)}/resume`, + { + method: "POST", + body: JSON.stringify({ input }), + }, + ); +} + +/** Cancel a running/paused run — POST /runs/{run_id}/cancel */ +export async function cancelRun(runId: string): Promise { + await request(`/runs/${encodeURIComponent(runId)}/cancel`, { method: "POST", - body: JSON.stringify(input), }); } +/** Get current run status — GET /runs/{run_id}/status */ +export async function getRunStatus(runId: string): Promise { + return request( + `/runs/${encodeURIComponent(runId)}/status`, + ); +} + +// --------------------------------------------------------------------------- +// SSE stream +// --------------------------------------------------------------------------- + +const EVENT_TYPES = [ + "run_started", + "node_started", + "node_completed", + "edge_traversed", + "graph_paused", + "graph_completed", + "error", +] as const; + +/** + * Connect to an SSE run stream. + * + * Uses named event listeners (not onmessage) because the server sends typed + * SSE events. Returns a cleanup function that closes the EventSource. + * + * No auto-reconnect — EventSource is closed on error so the store's + * reconnection state machine (runSlice) can control backoff and status checks. + */ export function connectStream( runId: string, - handlers: { - onMessage: (event: string, data: unknown) => void; - onError: (error: Event) => void; - }, -): EventSource { - const source = new EventSource(`/api/graphs/run/${runId}/stream`); - source.onmessage = (e) => { - const parsed = JSON.parse(e.data); - handlers.onMessage(parsed.event, parsed.data); + handlers: StreamHandlers, + lastEventId?: number, +): () => void { + const encoded = encodeURIComponent(runId); + const params = + lastEventId != null && lastEventId > 0 + ? `?last_event_id=${lastEventId}` + : ""; + const url = apiUrl(`/runs/${encoded}/stream${params}`); + + const source = new EventSource(url); + + for (const type of EVENT_TYPES) { + source.addEventListener(type, (e: MessageEvent) => { + const eventId = e.lastEventId ? Number(e.lastEventId) : null; + try { + const data = JSON.parse(e.data); + handlers.onEvent({ event: type, data } as GraphEvent, eventId); + } catch { + // Malformed SSE data — skip event, don't crash the stream + } + }); + } + + source.onerror = () => { + source.close(); + handlers.onError(new Error("SSE connection lost")); + }; + + return () => { + source.close(); }; - source.onerror = handlers.onError; - return source; } diff --git a/packages/canvas/src/components/canvas/CanvasHeader.tsx b/packages/canvas/src/components/canvas/CanvasHeader.tsx index b57cf03..8d4224a 100644 --- a/packages/canvas/src/components/canvas/CanvasHeader.tsx +++ b/packages/canvas/src/components/canvas/CanvasHeader.tsx @@ -1,7 +1,8 @@ import { useGraphStore } from "@store/graphSlice"; +import { useRunStore } from "@store/runSlice"; import { useUIStore } from "@store/uiSlice"; import { Button } from "@ui/Button"; -import { ChevronLeft, Pencil, Save } from "lucide-react"; +import { ChevronLeft, Pencil, Play, Save, Square } from "lucide-react"; import { type KeyboardEvent, memo, @@ -11,6 +12,8 @@ import { useState, } from "react"; import { useNavigate } from "react-router"; +import { validateGraph } from "../../utils/validateGraph"; +import { RunInputDialog } from "./RunInputDialog"; function CanvasHeaderComponent() { const graph = useGraphStore((s) => s.graph); @@ -19,9 +22,11 @@ function CanvasHeaderComponent() { const saveError = useGraphStore((s) => s.saveError); const saveGraph = useGraphStore((s) => s.saveGraph); const renameGraph = useGraphStore((s) => s.renameGraph); + const runStatus = useRunStore((s) => s.runStatus); const navigate = useNavigate(); const [editing, setEditing] = useState(false); + const [inputDialogOpen, setInputDialogOpen] = useState(false); const inputRef = useRef(null); // Show toast when saveError appears @@ -68,6 +73,49 @@ function CanvasHeaderComponent() { saveGraph(); }, [saveGraph]); + const handleRun = useCallback(async () => { + const { nodes, edges, dirty: isDirty } = useGraphStore.getState(); + + const errors = validateGraph(nodes, edges); + if (errors.length > 0) { + useUIStore + .getState() + .showToast(errors[0]?.message ?? "Validation failed", "error"); + return; + } + + if (isDirty) { + await useGraphStore.getState().saveGraph(); + if (useGraphStore.getState().saveError) { + useUIStore + .getState() + .showToast( + "Failed to save — fix save errors before running", + "error", + ); + return; + } + } + + setInputDialogOpen(true); + }, []); + + const handleRunSubmit = useCallback( + (input: Record) => { + setInputDialogOpen(false); + if (graph) { + useRunStore.getState().startRun(graph.id, input); + } + }, + [graph], + ); + + const handleStop = useCallback(() => { + useRunStore.getState().cancelRun(); + }, []); + + const isRunning = runStatus === "running" || runStatus === "reconnecting"; + return ( <>
@@ -117,15 +165,39 @@ function CanvasHeaderComponent() { )} - +
+ + + {isRunning ? ( + + ) : ( + + )} +
+ + setInputDialogOpen(false)} + onSubmit={handleRunSubmit} + /> ); } diff --git a/packages/canvas/src/components/canvas/CanvasRoute.tsx b/packages/canvas/src/components/canvas/CanvasRoute.tsx index 8979332..c38322e 100644 --- a/packages/canvas/src/components/canvas/CanvasRoute.tsx +++ b/packages/canvas/src/components/canvas/CanvasRoute.tsx @@ -6,6 +6,7 @@ import { useEffect, useState } from "react"; import { Link, useParams } from "react-router"; import { useBeforeUnload } from "../../hooks/useBeforeUnload"; import { NodeConfigPanel } from "../panels/NodeConfigPanel"; +import { RunPanel } from "../panels/RunPanel"; import { CanvasHeader } from "./CanvasHeader"; import { GraphCanvas } from "./GraphCanvas"; @@ -57,6 +58,7 @@ export function CanvasRoute() {
+
diff --git a/packages/canvas/src/components/canvas/RunInputDialog.tsx b/packages/canvas/src/components/canvas/RunInputDialog.tsx new file mode 100644 index 0000000..081a519 --- /dev/null +++ b/packages/canvas/src/components/canvas/RunInputDialog.tsx @@ -0,0 +1,56 @@ +import { Button } from "@ui/Button"; +import { Dialog } from "@ui/Dialog"; +import { useState } from "react"; + +interface RunInputDialogProps { + open: boolean; + onClose: () => void; + onSubmit: (input: Record) => void; +} + +export function RunInputDialog({ + open, + onClose, + onSubmit, +}: RunInputDialogProps) { + const [value, setValue] = useState("{}"); + const [parseError, setParseError] = useState(null); + + const handleSubmit = () => { + try { + const parsed = JSON.parse(value); + setParseError(null); + onSubmit(parsed as Record); + } catch { + setParseError("Invalid JSON"); + } + }; + + return ( + +
+