diff --git a/docs/plans/2026-02-25-message-flow-panel-design.md b/docs/plans/2026-02-25-message-flow-panel-design.md new file mode 100644 index 00000000..b8f201f9 --- /dev/null +++ b/docs/plans/2026-02-25-message-flow-panel-design.md @@ -0,0 +1,322 @@ +# Message Flow Panel — Design Doc + +**Date:** 2026-02-25 +**Status:** Approved, ready for implementation +**Review:** Passed Momus dual review (Claude + Gemini), issues fixed + +## Summary + +A developer-facing drawer panel in the BeamCode web UI that visualizes WebSocket message passing through the system in real time. Targeted at developers debugging adapter behavior, inspecting tool call chains, and verifying protocol correctness. + +## Aesthetic Direction: "Signal Wire" + +Oscilloscope / circuit trace aesthetic. Near-black background with subtle dot-grid texture. Monospace type throughout. Each message type assigned a saturated signal color. Glowing SVG connector lines draw between paired messages on hover. + +- **Font:** JetBrains Mono (content), Departure Mono or similar (header/labels) +- **Background:** `#0A0B0D` with dot-grid overlay +- **Connector lines:** cubic bezier SVG, animated with `stroke-dashoffset` draw on hover + +## Layout + +Two-lane layout with a vertical time axis in the center: + +``` +┌─────────────────────────────────────────────────────────────┐ +│ ◈ MESSAGE FLOW ● LIVE [All Types ▾] [↓] [⌫] │ +├───────────────────────┬────────────┬────────────────────────┤ +│ OUTBOUND │ │ INBOUND │ +│ bridge → consumer │ TIME ↓ │ consumer → bridge │ +├───────────────────────┼────────────┼────────────────────────┤ +│ [pill] ─────────────→│ 00:01.234 │ │ +│ │ 00:01.301 │←──────────── [pill] │ +│ [pill] ─────────────→│ 00:01.305 │ │ +└───────────────────────┴────────────┴────────────────────────┘ +``` + +- **OUTBOUND** (left lane): messages flowing bridge → consumer, received via `socket.onmessage` +- **INBOUND** (right lane): messages flowing consumer → bridge, sent via `ws.ts:send()` + +## Message Pill Anatomy + +``` +┌──────────────────────────────────────┐ +│▌ tool_use 00:01.234 ↗ [▾] │ +│ name: Bash │ +└──────────────────────────────────────┘ +``` + +- 3px left-edge color bar (type color) +- Type label (bold monospace) +- Timestamp: `elapsed ms` since first message in session (see Data Model) +- Direction arrow: ↗ outbound / ↙ inbound +- Expand chevron `[▾]` → full JSON view +- Hover: background lifts, connector activates + +## Message Type Color System + +Note: `tool_use`, `tool_result`, and `thinking` are **content block subtypes** nested inside +`assistant` messages — they do not appear as top-level `ConsumerMessage.type` values. +Top-level pills use `assistant` as the message type; content block subtypes are shown +in the expanded JSON view and in the pairing logic (which walks `content[]`). + +### Outbound top-level types (bridge → consumer) + +| Type | Color | Hex | +|------|-------|-----| +| `assistant` | Amber | `#F59E0B` | +| `stream_event` | Cyan | `#22D3EE` | +| `tool_progress` | Teal | `#14B8A6` | +| `tool_use_summary` | Sage | `#6EE7B7` | +| `status_change` | Violet | `#A78BFA` | +| `permission_request` | Coral | `#F97316` | +| `result` | Lime | `#84CC16` | +| `cli_connected` | Steel | `#94A3B8` | +| `cli_disconnected` | Muted red | `#F87171` | +| `error` | Red | `#EF4444` | +| `user_message` (echoed) | White | `#F8FAFC` | +| `message_queued` | Purple | `#C084FC` | +| `message_history` | Zinc | `#A1A1AA` | + +### Inbound top-level types (consumer → bridge, sent via `send()`) + +| Type | Color | Hex | +|------|-------|-----| +| `user_message` | White | `#F8FAFC` | +| `permission_response` | Coral lighter | `#FED7AA` | +| `interrupt` | Red | `#EF4444` | +| `slash_command` | Sky | `#38BDF8` | +| `queue_message` | Purple | `#C084FC` | +| `update_queued_message` | Purple lighter | `#E9D5FF` | +| `cancel_queued_message` | Muted red | `#F87171` | +| `set_model` | Zinc | `#A1A1AA` | +| `set_permission_mode` | Zinc | `#A1A1AA` | + +## Pairing Logic + +Related messages are highlighted on hover with a glowing connector line and latency badge. + +| Outbound | Paired Message | Match Key | Notes | +|----------|---------------|-----------|-------| +| `permission_request` | `permission_response` | `request.id` / `request_id` | 1:1 by ID | +| `assistant` (with `tool_use` block) | `tool_use_summary` or next `assistant` response | `content[].id` | Walk `content[]` for tool_use blocks | +| `tool_progress` | — | `tool_use_id` | Groups with originating `assistant` pill | +| `message_queued` | `update_queued_message` / `cancel_queued_message` / `queued_message_sent` | Session singleton | Only one queued message per session at a time; pair by temporal adjacency after `message_queued` outbound | + +**Note on `message_queued` pairing:** `message_queued` (outbound) carries `consumer_id` but +the inbound messages (`update_queued_message`, `cancel_queued_message`) do not carry a +matching field. Use session-singleton semantics: all inbound queue messages after a +`message_queued` event (until `queued_message_sent` or `queued_message_cancelled`) are +considered paired to that `message_queued` pill. + +**Hover behavior:** +1. Hovered pill + paired counterpart → full opacity; all others → 30% opacity +2. Curved SVG connector draws between them (cubic bezier, `stroke-dashoffset` animation, ~200ms) +3. Latency badge on connector midpoint: `+47ms` (difference in `wallTime` fields) + +For `tool_progress` groups: hover the parent `assistant` pill → all related `tool_progress` +pills glow in teal, connector runs to each. Mini badge shows: `3 progress events (1.2s)`. + +## Controls + +| Control | Behavior | +|---------|----------| +| `● LIVE` | Pulses when messages are flowing; click to **pause** — freezes display, keeps buffering to `pendingWhilePaused[]`, shows `● PAUSED +12` badge | +| `[All Types ▾]` | Multiselect filter by message type; filtered-out types show as faint tick marks on the time axis | +| `[↓ auto-scroll]` | Toggle — when off, new messages append without scrolling the view | +| `[⌫ clear]` | Clears `flowMessages` ring buffer in the hook; no store mutation | + +**Keyboard shortcuts:** +- `⌥M` — toggle panel open/closed; registered in `App.tsx` via `useEffect` + `document.addEventListener` (same pattern as other global shortcuts in `App.tsx`) +- `Escape` — close panel; registered inside `MessageFlowPanel.tsx` via `useEffect` + `document.addEventListener` (same pattern as `LogDrawer.tsx:22-29`) + +**StatusBar entry point:** Add a `MessageFlowButton` to `web/src/components/StatusBar.tsx`, +positioned after the existing logs button, following the same rendering pattern. + +## Implementation + +### 1. Intercept Mechanism in `ws.ts` + +Add a module-level listener registry **after** the existing module-level state (after line 17): + +```ts +// ── Message flow tap (dev panel) ────────────────────────────────────────── +type FlowInboundListener = (sessionId: string, msg: ConsumerMessage) => void; +type FlowOutboundListener = (sessionId: string, msg: InboundMessage) => void; +const flowInboundListeners = new Set(); +const flowOutboundListeners = new Set(); + +export function addFlowInboundListener(cb: FlowInboundListener): () => void { + flowInboundListeners.add(cb); + return () => flowInboundListeners.delete(cb); +} + +export function addFlowOutboundListener(cb: FlowOutboundListener): () => void { + flowOutboundListeners.add(cb); + return () => flowOutboundListeners.delete(cb); +} +``` + +In `handleMessage`, after `const msg = parsed as ConsumerMessage;` (line 115), add: +```ts +for (const cb of flowInboundListeners) cb(sessionId, msg); +``` + +In `send()` (line 529), before `socket.send(...)`, add: +```ts +for (const cb of flowOutboundListeners) cb(targetId, message); +``` + +**Done when:** `addFlowInboundListener` and `addFlowOutboundListener` are exported and calling them with a callback fires it once per matching message in `ws.test.ts`. + +### 2. Store Additions in `store.ts` + +Add to the flat `AppState` interface (alongside `logDrawerOpen`): + +```ts +messageFlowOpen: boolean; +setMessageFlowOpen: (open: boolean) => void; +``` + +Implement in the `create()` call following the same pattern as `logDrawerOpen`. No localStorage persistence needed (reset on page load is fine for a dev panel). + +The ring buffer (`flowMessages`) lives entirely in `useMessageFlow`'s `useRef` — not in the store. This keeps the store free of dev-only state and avoids unnecessary global re-renders. + +**Done when:** `useStore.getState().messageFlowOpen` is `false` by default and `setMessageFlowOpen(true)` toggles it, verified by a store test following the pattern in `store.test.ts`. + +### 3. `useMessageFlow` Hook + +File: `web/src/hooks/useMessageFlow.ts` + +```ts +const MAX_FLOW_MESSAGES = 500; // hard cap on ring buffer + +interface UseMessageFlowResult { + messages: FlowMessage[]; + paused: boolean; + pendingCount: number; + setPaused: (v: boolean) => void; + clear: () => void; +} +``` + +- On mount: call `addFlowInboundListener` and `addFlowOutboundListener`, store cleanup in a `useEffect` return +- Each incoming message: `crypto.randomUUID()` for `id`, `Date.now()` for `wallTime`, `Date.now() - sessionStartRef.current` for `timestamp` (where `sessionStartRef` is set to `Date.now()` on the first message received) +- Ring buffer: if `messages.length >= MAX_FLOW_MESSAGES`, evict `messages[0]` before appending +- When paused: new messages go to a `pendingRef` array instead; on resume, flush pending into ring buffer (capped) +- Pairing index: `Map` (id → paired FlowMessage id), built incrementally as messages arrive + +**Done when:** Unit test in `useMessageFlow.test.ts` asserts: +1. Ring buffer evicts at 501 messages: `expect(result.current.messages).toHaveLength(500)` +2. Pause stops messages from appearing in `messages`; resume flushes them +3. `permission_request` message sets `pairedId` on the corresponding `permission_response` + +### 4. `ConnectorOverlay.tsx` + +File: `web/src/components/ConnectorOverlay.tsx` + +SVG coordinate strategy: +- `MessageFlowPanel` root div has `position: relative` +- `ConnectorOverlay` is `position: absolute; inset: 0; pointer-events: none; overflow: visible` +- Each `MessagePill` has a `data-flow-id={msg.id}` attribute on its root element +- On hover, the overlay calls `document.querySelector([data-flow-id="${pairedId}"])` and `getBoundingClientRect()` on both pills, then subtracts the panel container's `getBoundingClientRect()` to get local coordinates +- SVG `` uses cubic bezier: control points at `(panelMidX, pillA.centerY)` and `(panelMidX, pillB.centerY)` +- Animated with `stroke-dasharray` + `stroke-dashoffset` CSS transition, ~200ms ease-out + +**Done when:** Hovering a `permission_request` pill draws a visible line to its paired `permission_response` pill, verified manually in the browser (no unit test for coordinate math). + +### 5. `MessageFlowPanel.tsx` + +File: `web/src/components/MessageFlowPanel.tsx` + +- Reads `messageFlowOpen` and `currentSessionId` from store +- Returns `null` when `!messageFlowOpen || !currentSessionId` (same pattern as `LogDrawer.tsx:31`) +- Renders: top controls bar, two-column layout (left: outbound pills, center: time axis, right: inbound pills), `ConnectorOverlay` as absolute child +- Escape-to-close via `useEffect` + `document.addEventListener` (copy pattern from `LogDrawer.tsx:22-29`) + +Layout classnames follow the project's Tailwind v4 conventions (see other components for reference). + +**Done when:** Panel renders correctly when `messageFlowOpen = true`, pills appear in the correct lane, and Escape closes the panel. Verified by component test in `MessageFlowPanel.test.tsx`. + +### 6. `MessagePill.tsx` + +File: `web/src/components/MessagePill.tsx` + +Props: +```ts +interface MessagePillProps { + message: FlowMessage; + dimmed: boolean; // true when another pill is hovered + onHoverStart: () => void; // notify parent to activate connector + onHoverEnd: () => void; +} +``` + +- Color bar: 3px left border using inline style with the type's hex color +- Preview: first 60 chars of `JSON.stringify(message.payload)` +- Expand: controlled by local `useState`, shows full `
` JSON on expand
+- `data-flow-id={message.id}` on root element (required by ConnectorOverlay)
+
+**Done when:** Snapshot test in `MessagePill.test.tsx` renders without error for each direction (`"out"`, `"in"`) and all representative message types.
+
+### 7. App.tsx Wiring
+
+In `web/src/App.tsx`, add alongside the existing `LogDrawer`:
+
+1. Import `MessageFlowPanel` and `setMessageFlowOpen` from store
+2. Add `⌥M` global shortcut in the top-level `useEffect` keyboard handler (or create one if absent):
+   ```ts
+   if (e.altKey && e.key === "m") {
+     useStore.getState().setMessageFlowOpen(!useStore.getState().messageFlowOpen);
+   }
+   ```
+3. Render `` as a sibling to `` in the JSX
+
+**Done when:** Pressing `⌥M` toggles the panel open/closed without errors.
+
+### 8. StatusBar Entry Point
+
+In `web/src/components/StatusBar.tsx`, add a `MessageFlowButton` component following the
+same pattern as the existing logs button. Position it adjacent to the logs button.
+
+**Done when:** Button is visible in the StatusBar and clicking it toggles `messageFlowOpen`.
+
+## Data Model
+
+```ts
+export const MAX_FLOW_MESSAGES = 500;
+
+export interface FlowMessage {
+  id: string;            // crypto.randomUUID() at capture time
+  direction: "out" | "in";
+  type: string;          // msg.type from ConsumerMessage or InboundMessage
+  payload: unknown;      // full original message object
+  timestamp: number;     // Date.now() - sessionStartMs (ms since first message)
+  wallTime: number;      // Date.now() at capture (used for latency badge)
+  pairedId?: string;     // id of the single paired FlowMessage (1:1 pairs)
+  groupIds?: string[];   // ids of related FlowMessages (1:N groups, e.g. tool_progress)
+}
+```
+
+`timestamp` is relative to the first message received in the current session (captured in
+`useMessageFlow` as `sessionStartRef = Date.now()` on first message). This is what drives
+the time axis labels. `wallTime` is absolute and used only for computing latency badges.
+
+## File Plan
+
+```
+web/src/components/
+  MessageFlowPanel.tsx        ← panel shell, two-lane layout, controls, Escape shortcut
+  MessageFlowPanel.test.tsx   ← renders correctly, Escape closes
+  MessagePill.tsx             ← pill: color bar, type, timestamp, preview, expand, data-flow-id
+  MessagePill.test.tsx        ← snapshot for each direction + representative types
+  ConnectorOverlay.tsx        ← position:absolute SVG layer, getBoundingClientRect on hover
+
+web/src/hooks/
+  useMessageFlow.ts           ← ring buffer (MAX_FLOW_MESSAGES), listeners, pause, pairing index
+  useMessageFlow.test.ts      ← ring cap, pause/resume, pairing logic
+
+web/src/ws.ts                 ← add addFlowInboundListener + addFlowOutboundListener exports
+web/src/store.ts              ← add messageFlowOpen + setMessageFlowOpen to AppState
+web/src/App.tsx               ← ⌥M shortcut +  render
+web/src/components/StatusBar.tsx ← MessageFlowButton
+```
diff --git a/shared/consumer-types.ts b/shared/consumer-types.ts
index 7c90fdf7..4b356db2 100644
--- a/shared/consumer-types.ts
+++ b/shared/consumer-types.ts
@@ -314,6 +314,22 @@ export type ConsumerMessage =
       type: "session_lifecycle";
       subtype: string;
       metadata: Record;
+    }
+  | {
+      type: "adapter_drop";
+      reason: string;
+      dropped_type: string;
+      dropped_metadata?: Record;
+    }
+  | {
+      type: "translation_event";
+      boundary: "T1" | "T2" | "T3" | "T4";
+      translator: string;
+      from: { format: string; body: unknown };
+      to: { format: string; body: unknown };
+      traceId?: string;
+      timestamp: number;
+      sessionId: string;
     };
 
 // ── Inbound Messages (consumer → bridge) ────────────────────────────────────
diff --git a/src/adapters/claude/claude-session.ts b/src/adapters/claude/claude-session.ts
index c26e222e..619a766a 100644
--- a/src/adapters/claude/claude-session.ts
+++ b/src/adapters/claude/claude-session.ts
@@ -8,6 +8,7 @@
  * queue outbound messages before the WebSocket handshake completes.
  */
 
+import { randomUUID } from "node:crypto";
 import type WebSocket from "ws";
 import type { RawData } from "ws";
 import type { BackendSession } from "../../core/interfaces/backend-adapter.js";
@@ -99,6 +100,14 @@ export class ClaudeSession implements BackendSession {
       },
     );
     this.sendToSocket(ndjson);
+
+    this.enqueueTranslationEvent(
+      "T2",
+      "toNDJSON",
+      { format: "UnifiedMessage", body: message },
+      { format: "Claude NDJSON", body: ndjson },
+      trace.traceId,
+    );
   }
 
   // ---------------------------------------------------------------------------
@@ -267,6 +276,15 @@ export class ClaudeSession implements BackendSession {
         },
       );
       this.queue.enqueue(unified);
+
+      const trace = extractTraceContext(unified.metadata);
+      this.enqueueTranslationEvent(
+        "T3",
+        "translate",
+        { format: "Claude NDJSON", body: cliMsg },
+        { format: "UnifiedMessage", body: unified },
+        trace.traceId,
+      );
     } else {
       const consumedType = cliMsg.type === "user" || cliMsg.type === "keep_alive";
       this.tracer?.error(
@@ -285,6 +303,31 @@ export class ClaudeSession implements BackendSession {
     }
   }
 
+  private enqueueTranslationEvent(
+    boundary: "T1" | "T2" | "T3" | "T4",
+    translator: string,
+    from: { format: string; body: unknown },
+    to: { format: string; body: unknown },
+    traceId: string | undefined,
+  ): void {
+    this.queue.enqueue({
+      id: randomUUID(),
+      timestamp: Date.now(),
+      type: "translation_event",
+      role: "system",
+      content: [],
+      metadata: {
+        boundary,
+        translator,
+        from,
+        to,
+        trace_id: traceId,
+        session_id: this.sessionId,
+        timestamp: Date.now(),
+      },
+    });
+  }
+
   private traceUnparsedLine(line: string, error: string): void {
     const maxChars = 2_000;
     const truncated =
diff --git a/src/bin/beamcode.ts b/src/bin/beamcode.ts
index e1d543af..9d1e9977 100644
--- a/src/bin/beamcode.ts
+++ b/src/bin/beamcode.ts
@@ -560,8 +560,11 @@ export async function runBeamcode(argv: string[] = process.argv): Promise
   await sessionCoordinator.start();
   setActiveSessionId(pickMostRecentSessionId(sessionCoordinator.registry.listSessions()));
 
-  // 7. Auto-launch a session AFTER WS is ready so the CLI can connect
-  if (!config.noAutoLaunch) {
+  // 7. Auto-launch a session AFTER WS is ready so the CLI can connect.
+  // Skip if sessions were already restored from storage — the consumer UI will
+  // show them and the user can create new ones via the dialog.
+  const existingSessions = sessionCoordinator.registry.listSessions();
+  if (!config.noAutoLaunch && existingSessions.length === 0) {
     try {
       const session = await sessionCoordinator.createSession({
         cwd: config.cwd,
diff --git a/src/core/messaging/consumer-message-mapper.ts b/src/core/messaging/consumer-message-mapper.ts
index 202cbf17..af00e09a 100644
--- a/src/core/messaging/consumer-message-mapper.ts
+++ b/src/core/messaging/consumer-message-mapper.ts
@@ -273,6 +273,24 @@ export function mapSessionLifecycle(msg: UnifiedMessage): ConsumerMessage {
   return mapMetadataMessage("session_lifecycle", msg);
 }
 
+/**
+ * Map a UnifiedMessage of type "translation_event" to a ConsumerMessage.
+ * Translation events carry message flow visualization metadata.
+ */
+export function mapTranslationEvent(msg: UnifiedMessage): ConsumerMessage {
+  const m = msg.metadata;
+  return {
+    type: "translation_event",
+    boundary: m.boundary as "T1" | "T2" | "T3" | "T4",
+    translator: m.translator as string,
+    from: m.from as { format: string; body: unknown },
+    to: m.to as { format: string; body: unknown },
+    traceId: m.trace_id as string | undefined,
+    timestamp: m.timestamp as number,
+    sessionId: m.session_id as string,
+  };
+}
+
 /** Shared mapper for message types that forward subtype + metadata. */
 function mapMetadataMessage(
   type: "configuration_change" | "session_lifecycle",
diff --git a/src/core/session/effect-executor.ts b/src/core/session/effect-executor.ts
index 438efda8..82492e4d 100644
--- a/src/core/session/effect-executor.ts
+++ b/src/core/session/effect-executor.ts
@@ -94,6 +94,11 @@ export function executeEffects(
         case "SEND_TO_CONSUMER":
           deps.broadcaster.sendTo(effect.ws, effect.message);
           break;
+
+        case "EMIT_TRANSLATION":
+          // Forward translation event to consumers for message flow panel visualization
+          deps.broadcaster.broadcast(session, effect.event);
+          break;
       }
     } catch (err) {
       // A failing effect must never abort subsequent effects.
diff --git a/src/core/session/effect-types.ts b/src/core/session/effect-types.ts
index 21efd2e0..f4ec7901 100644
--- a/src/core/session/effect-types.ts
+++ b/src/core/session/effect-types.ts
@@ -39,4 +39,6 @@ export type Effect =
   /** Resolve git info for the session (after seeding cwd). */
   | { type: "RESOLVE_GIT_INFO" }
   /** Send a targeted message to a specific consumer WebSocket. */
-  | { type: "SEND_TO_CONSUMER"; ws: WebSocketLike; message: ConsumerMessage };
+  | { type: "SEND_TO_CONSUMER"; ws: WebSocketLike; message: ConsumerMessage }
+  /** Emit a translation event for message flow panel visualization (dev tool). */
+  | { type: "EMIT_TRANSLATION"; event: ConsumerMessage };
diff --git a/src/core/session/session-reducer.ts b/src/core/session/session-reducer.ts
index 6d7cf9bc..190679d9 100644
--- a/src/core/session/session-reducer.ts
+++ b/src/core/session/session-reducer.ts
@@ -34,6 +34,7 @@ import {
   mapStreamEvent,
   mapToolProgress,
   mapToolUseSummary,
+  mapTranslationEvent,
 } from "../messaging/consumer-message-mapper.js";
 import { normalizeInbound } from "../messaging/inbound-normalizer.js";
 import { diffTeamState } from "../team/team-event-differ.js";
@@ -588,12 +589,13 @@ function reduceInboundCommand(
       );
 
       // Normalize message for backend send (pure — no I/O).
-      const baseUnified = normalizeInbound({
-        type: "user_message",
+      const inboundMsg = {
+        type: "user_message" as const,
         content: command.content,
         session_id: data.backendSessionId || command.session_id || "",
         images: command.images,
-      });
+      };
+      const baseUnified = normalizeInbound(inboundMsg);
       if (!baseUnified) return [data, []];
 
       // Apply slash passthrough trace context when present (always a complete group).
@@ -610,6 +612,15 @@ function reduceInboundCommand(
           }
         : baseUnified;
 
+      const t1Event = translationEffect(
+        "T1",
+        "normalizeInbound",
+        { format: "InboundMessage", body: inboundMsg },
+        { format: "UnifiedMessage", body: unified },
+        unified.metadata.trace_id as string | undefined,
+        data.state.session_id,
+      );
+
       const isConnected = data.lifecycle === "active" || data.lifecycle === "idle";
 
       if (isConnected) {
@@ -624,6 +635,7 @@ function reduceInboundCommand(
           [
             { type: "BROADCAST", message: userMsg },
             { type: "PERSIST_NOW" },
+            t1Event,
             { type: "SEND_TO_BACKEND", message: unified },
           ],
         ];
@@ -644,7 +656,7 @@ function reduceInboundCommand(
           messageHistory: nextHistory,
           pendingMessages: [...data.pendingMessages, unified],
         },
-        [{ type: "BROADCAST", message: userMsg }, { type: "PERSIST_NOW" }],
+        [{ type: "BROADCAST", message: userMsg }, { type: "PERSIST_NOW" }, t1Event],
       ];
     }
 
@@ -762,6 +774,33 @@ function reduceBackendMessage(
   return [nextData, effects];
 }
 
+// ---------------------------------------------------------------------------
+// Helpers
+// ---------------------------------------------------------------------------
+
+function translationEffect(
+  boundary: "T1" | "T2" | "T3" | "T4",
+  translator: string,
+  from: { format: string; body: unknown },
+  to: { format: string; body: unknown },
+  traceId: string | undefined,
+  sessionId: string,
+): Effect {
+  return {
+    type: "EMIT_TRANSLATION",
+    event: {
+      type: "translation_event",
+      boundary,
+      translator,
+      from,
+      to,
+      traceId,
+      timestamp: Date.now(),
+      sessionId,
+    },
+  };
+}
+
 // ---------------------------------------------------------------------------
 // Effect builder — pure, depends only on prev/next data and the message
 // ---------------------------------------------------------------------------
@@ -824,6 +863,16 @@ function buildEffects(
       if (nextData.messageHistory !== prevData.messageHistory) {
         const mapped = mapAssistantMessage(message);
         if (mapped.type === "assistant") {
+          effects.push(
+            translationEffect(
+              "T4",
+              "mapAssistantMessage",
+              { format: "UnifiedMessage", body: message },
+              { format: "ConsumerMessage", body: mapped },
+              message.metadata?.trace_id as string | undefined,
+              prevData.state.session_id,
+            ),
+          );
           effects.push({ type: "BROADCAST", message: mapped });
         }
       }
@@ -831,7 +880,18 @@ function buildEffects(
     }
 
     case "result": {
-      effects.push({ type: "BROADCAST", message: mapResultMessage(message) });
+      const resultMsg = mapResultMessage(message);
+      effects.push(
+        translationEffect(
+          "T4",
+          "mapResultMessage",
+          { format: "UnifiedMessage", body: message },
+          { format: "ConsumerMessage", body: resultMsg },
+          message.metadata?.trace_id as string | undefined,
+          prevData.state.session_id,
+        ),
+      );
+      effects.push({ type: "BROADCAST", message: resultMsg });
       effects.push({ type: "AUTO_SEND_QUEUED" });
       // Emit first-turn completion event when num_turns reaches 1
       const numTurns = message.metadata?.num_turns as number | undefined;
@@ -875,6 +935,16 @@ function buildEffects(
           eventType: "permission:requested",
           payload: { request: mapped.cliPerm },
         });
+      } else {
+        effects.push({
+          type: "BROADCAST",
+          message: {
+            type: "adapter_drop",
+            reason: `permission_request subtype '${String(message.metadata?.subtype ?? "unknown")}' is not supported (only 'can_use_tool' is handled)`,
+            dropped_type: "permission_request",
+            dropped_metadata: message.metadata as Record,
+          },
+        });
       }
       break;
     }
@@ -930,6 +1000,11 @@ function buildEffects(
       effects.push({ type: "BROADCAST", message: mapSessionLifecycle(message) });
       break;
     }
+
+    case "translation_event": {
+      effects.push({ type: "BROADCAST", message: mapTranslationEvent(message) });
+      break;
+    }
   }
 
   return effects;
diff --git a/src/core/session/session-state-reducer.test.ts b/src/core/session/session-state-reducer.test.ts
index 6f0f9389..88def159 100644
--- a/src/core/session/session-state-reducer.test.ts
+++ b/src/core/session/session-state-reducer.test.ts
@@ -671,15 +671,16 @@ describe("sessionReducer INBOUND_COMMAND", () => {
     expect(next.lastStatus).toBe("running");
     expect(next.messageHistory).toHaveLength(1);
     expect(next.messageHistory[0]).toMatchObject({ type: "user_message", content: "hi" });
-    // Reducer now produces 3 effects: BROADCAST, PERSIST_NOW, and SEND_TO_BACKEND
+    // Reducer now produces 4 effects: BROADCAST, PERSIST_NOW, EMIT_TRANSLATION, and SEND_TO_BACKEND
     // (backend connected path — lifecycle is "active")
-    expect(effects).toHaveLength(3);
+    expect(effects).toHaveLength(4);
     expect(effects[0]).toMatchObject({
       type: "BROADCAST",
       message: { type: "user_message", content: "hi" },
     });
     expect(effects[1]).toMatchObject({ type: "PERSIST_NOW" });
-    expect(effects[2]).toMatchObject({ type: "SEND_TO_BACKEND" });
+    expect(effects[2]).toMatchObject({ type: "EMIT_TRANSLATION" });
+    expect(effects[3]).toMatchObject({ type: "SEND_TO_BACKEND" });
   });
 
   it("returns data unchanged and empty effects for user_message when lifecycle is closing", () => {
diff --git a/src/core/types/unified-message.ts b/src/core/types/unified-message.ts
index c1b9616e..5599bc91 100644
--- a/src/core/types/unified-message.ts
+++ b/src/core/types/unified-message.ts
@@ -46,6 +46,7 @@ export type UnifiedMessageType =
   | "team_task_update"
   | "team_state_change"
   | "session_lifecycle"
+  | "translation_event"
   | "unknown";
 
 /**
@@ -217,6 +218,7 @@ const VALID_MESSAGE_TYPES = new Set([
   "team_task_update",
   "team_state_change",
   "session_lifecycle",
+  "translation_event",
   "unknown",
 ]);
 
diff --git a/src/types/consumer-messages.ts b/src/types/consumer-messages.ts
index 6c5615d7..feccbe13 100644
--- a/src/types/consumer-messages.ts
+++ b/src/types/consumer-messages.ts
@@ -237,4 +237,20 @@ export type ConsumerMessage =
       type: "session_lifecycle";
       subtype: string;
       metadata: Record;
+    }
+  | {
+      type: "adapter_drop";
+      reason: string;
+      dropped_type: string;
+      dropped_metadata?: Record;
+    }
+  | {
+      type: "translation_event";
+      boundary: "T1" | "T2" | "T3" | "T4";
+      translator: string;
+      from: { format: string; body: unknown };
+      to: { format: string; body: unknown };
+      traceId?: string;
+      timestamp: number;
+      sessionId: string;
     };
diff --git a/web/src/App.tsx b/web/src/App.tsx
index f985904d..6bec66de 100644
--- a/web/src/App.tsx
+++ b/web/src/App.tsx
@@ -2,6 +2,7 @@ import { Component, type ErrorInfo, type ReactNode, useEffect } from "react";
 import { listSessions } from "./api";
 import { ChatView } from "./components/ChatView";
 import { LogDrawer } from "./components/LogDrawer";
+import { MessageFlowPanel } from "./components/MessageFlowPanel";
 import { NewSessionDialog } from "./components/NewSessionDialog";
 import { QuickSwitcher } from "./components/QuickSwitcher";
 import { ShortcutsModal } from "./components/ShortcutsModal";
@@ -84,6 +85,10 @@ function useBootstrap() {
         const byId: Record = {};
         for (const s of sessions) byId[s.sessionId] = s;
         useStore.getState().setSessions(byId);
+        // No sessions and no session in the URL → prompt user to create one
+        if (sessions.length === 0 && !sessionId) {
+          useStore.getState().setNewSessionDialogOpen(true);
+        }
       })
       .catch((err) => console.warn("[bootstrap] Failed to load sessions:", err));
 
@@ -104,6 +109,7 @@ export default function App() {
   const sidebarOpen = useStore((s) => s.sidebarOpen);
   const taskPanelOpen = useStore((s) => s.taskPanelOpen);
   const logDrawerOpen = useStore((s) => s.logDrawerOpen);
+  const messageFlowOpen = useStore((s) => s.messageFlowOpen);
   const darkMode = useStore((s) => s.darkMode);
   const toggleSidebar = useStore((s) => s.toggleSidebar);
   const quickSwitcherOpen = useStore((s) => s.quickSwitcherOpen);
@@ -151,6 +157,11 @@ export default function App() {
         {logDrawerOpen && }
       
 
+      {/* Message flow panel */}
+      Flow error}>
+        {messageFlowOpen && }
+      
+
       
       Dialog error}>
         
diff --git a/web/src/components/AgentColumn.tsx b/web/src/components/AgentColumn.tsx
index 6dfcfea0..8e03f007 100644
--- a/web/src/components/AgentColumn.tsx
+++ b/web/src/components/AgentColumn.tsx
@@ -15,9 +15,14 @@ function AgentColumnStreamingIndicator({
   agentId: string;
 }) {
   const stream = useStore((s) => s.sessionData[sessionId]?.agentStreaming[agentId]);
-  if (!stream?.text && !stream?.startedAt) return null;
+  if (!stream?.text && !stream?.thinking && !stream?.startedAt) return null;
   return (
     
+ {stream.thinking && ( +
+          {stream.thinking}
+        
+ )} {stream.text && }
diff --git a/web/src/components/AgentPane.tsx b/web/src/components/AgentPane.tsx index 854edcdd..5a086e9c 100644 --- a/web/src/components/AgentPane.tsx +++ b/web/src/components/AgentPane.tsx @@ -13,9 +13,14 @@ import { MessageBubble } from "./MessageBubble"; function AgentStreamingIndicator({ sessionId, agentId }: { sessionId: string; agentId: string }) { const stream = useStore((s) => s.sessionData[sessionId]?.agentStreaming[agentId]); - if (!stream?.text && !stream?.startedAt) return null; + if (!stream?.text && !stream?.thinking && !stream?.startedAt) return null; return (
+ {stream.thinking && ( +
+          {stream.thinking}
+        
+ )} {stream.text && }
diff --git a/web/src/components/ConnectorOverlay.tsx b/web/src/components/ConnectorOverlay.tsx new file mode 100644 index 00000000..31970cff --- /dev/null +++ b/web/src/components/ConnectorOverlay.tsx @@ -0,0 +1,81 @@ +import type { RefObject } from "react"; +import type { FlowMessage } from "./MessagePill"; +import { getColor } from "./MessagePill"; + +interface ConnectorOverlayProps { + hoveredId: string | null; + messages: FlowMessage[]; + containerRef: RefObject; +} + +const svgStyle = { + position: "absolute" as const, + inset: 0, + pointerEvents: "none" as const, + overflow: "visible" as const, + width: "100%", + height: "100%", +}; + +export function ConnectorOverlay({ hoveredId, messages, containerRef }: ConnectorOverlayProps) { + if (!hoveredId || !containerRef.current) { + return ; + } + + const hovered = messages.find((m) => m.id === hoveredId); + if (!hovered?.pairedId) { + return ; + } + + const paired = messages.find((m) => m.id === hovered.pairedId); + if (!paired) { + return ; + } + + const container = containerRef.current; + const el1 = container.querySelector(`[data-flow-id="${hoveredId}"]`); + const el2 = container.querySelector(`[data-flow-id="${hovered.pairedId}"]`); + if (!el1 || !el2) { + return ; + } + + const containerRect = container.getBoundingClientRect(); + const rect1 = el1.getBoundingClientRect(); + const rect2 = el2.getBoundingClientRect(); + + const x1 = rect1.left - containerRect.left + rect1.width / 2; + const y1 = rect1.top - containerRect.top + rect1.height / 2; + const x2 = rect2.left - containerRect.left + rect2.width / 2; + const y2 = rect2.top - containerRect.top + rect2.height / 2; + const midX = containerRect.width / 2; + + const d = `M ${x1} ${y1} C ${midX} ${y1}, ${midX} ${y2}, ${x2} ${y2}`; + const color = getColor(hovered.type); + const latencyMs = Math.abs(hovered.wallTime - paired.wallTime); + + return ( + + ); +} diff --git a/web/src/components/MessageFlowPanel.test.tsx b/web/src/components/MessageFlowPanel.test.tsx new file mode 100644 index 00000000..08bcece4 --- /dev/null +++ b/web/src/components/MessageFlowPanel.test.tsx @@ -0,0 +1,65 @@ +import { cleanup, fireEvent, render, screen } from "@testing-library/react"; +import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; +import { useStore } from "../store"; + +// Mock useMessageFlow to avoid pulling in ws.ts +vi.mock("../hooks/useMessageFlow", () => ({ + useMessageFlow: () => ({ + messages: [], + paused: false, + pendingCount: 0, + setPaused: vi.fn(), + clear: vi.fn(), + }), +})); + +// Lazy import after mock is registered +const { MessageFlowPanel } = await import("./MessageFlowPanel"); + +// jsdom doesn't implement scrollIntoView +Element.prototype.scrollIntoView = vi.fn(); + +describe("MessageFlowPanel", () => { + beforeEach(() => { + useStore.setState({ + messageFlowOpen: false, + currentSessionId: "test-session", + }); + }); + + afterEach(cleanup); + + it("returns null when messageFlowOpen is false", () => { + const { container } = render(); + expect(container.firstChild).toBeNull(); + }); + + it("renders when messageFlowOpen is true", () => { + useStore.setState({ messageFlowOpen: true }); + render(); + expect(screen.getByText("MESSAGE FLOW")).toBeTruthy(); + }); + + it("returns null when currentSessionId is null", () => { + useStore.setState({ messageFlowOpen: true, currentSessionId: null }); + const { container } = render(); + expect(container.firstChild).toBeNull(); + }); + + it("closes on Escape key", () => { + useStore.setState({ messageFlowOpen: true }); + render(); + expect(screen.getByText("MESSAGE FLOW")).toBeTruthy(); + + fireEvent.keyDown(document, { key: "Escape" }); + expect(useStore.getState().messageFlowOpen).toBe(false); + }); + + it("close button sets messageFlowOpen to false", () => { + useStore.setState({ messageFlowOpen: true }); + render(); + const closeBtn = screen.getByLabelText("Close message flow"); + fireEvent.click(closeBtn); + expect(useStore.getState().messageFlowOpen).toBe(false); + }); +}); diff --git a/web/src/components/MessageFlowPanel.tsx b/web/src/components/MessageFlowPanel.tsx new file mode 100644 index 00000000..f489f7f4 --- /dev/null +++ b/web/src/components/MessageFlowPanel.tsx @@ -0,0 +1,295 @@ +import { useCallback, useEffect, useMemo, useRef, useState } from "react"; +import { useMessageFlow } from "../hooks/useMessageFlow"; +import { useStore } from "../store"; +import { ConnectorOverlay } from "./ConnectorOverlay"; +import { MessagePill } from "./MessagePill"; + +export function MessageFlowPanel() { + const messageFlowOpen = useStore((s) => s.messageFlowOpen); + const setMessageFlowOpen = useStore((s) => s.setMessageFlowOpen); + const currentSessionId = useStore((s) => s.currentSessionId); + const { messages, paused, pendingCount, setPaused, clear } = useMessageFlow(currentSessionId); + + const [hoveredId, setHoveredId] = useState(null); + const [hoveredTraceId, setHoveredTraceId] = useState(null); + const [filterTypes, setFilterTypes] = useState>(new Set()); + const [autoScroll, setAutoScroll] = useState(true); + const [filterOpen, setFilterOpen] = useState(false); + const [panelWidth, setPanelWidth] = useState(560); + const [detailLevel, setDetailLevel] = useState<"compact" | "detailed">("compact"); + const containerRef = useRef(null); + const bottomRef = useRef(null); + + const handleResizeMouseDown = useCallback( + (e: React.MouseEvent) => { + e.preventDefault(); + const startX = e.clientX; + const startWidth = panelWidth; + const onMove = (me: MouseEvent) => { + const delta = startX - me.clientX; // drag left = grow + setPanelWidth(Math.max(360, Math.min(1200, startWidth + delta))); + }; + const onUp = () => { + document.removeEventListener("mousemove", onMove); + document.removeEventListener("mouseup", onUp); + document.body.style.cursor = ""; + document.body.style.userSelect = ""; + }; + document.body.style.cursor = "col-resize"; + document.body.style.userSelect = "none"; + document.addEventListener("mousemove", onMove); + document.addEventListener("mouseup", onUp); + }, + [panelWidth], + ); + + // Escape to close + useEffect(() => { + if (!messageFlowOpen) return; + function onKeyDown(e: KeyboardEvent) { + if (e.key === "Escape") setMessageFlowOpen(false); + } + document.addEventListener("keydown", onKeyDown); + return () => document.removeEventListener("keydown", onKeyDown); + }, [messageFlowOpen, setMessageFlowOpen]); + + // Auto-scroll + // biome-ignore lint/correctness/useExhaustiveDependencies: intentional — scroll on new messages + useEffect(() => { + if (autoScroll && !paused) bottomRef.current?.scrollIntoView({ behavior: "smooth" }); + }, [messages.length, autoScroll, paused]); + + const allTypes = useMemo(() => { + const types = new Set(); + for (const m of messages) types.add(m.type); + return [...types].sort(); + }, [messages]); + + const handleHoverStart = useCallback((msg: (typeof messages)[0]) => { + setHoveredId(msg.id); + if (msg.traceId) { + setHoveredTraceId(msg.traceId); + } + }, []); + + const handleHoverEnd = useCallback(() => { + setHoveredId(null); + setHoveredTraceId(null); + }, []); + + if (!messageFlowOpen || !currentSessionId) return null; + + const filtered = + filterTypes.size > 0 ? messages.filter((m) => filterTypes.has(m.type)) : messages; + + const hoveredMsg = hoveredId ? messages.find((m) => m.id === hoveredId) : null; + const pairedIdOfHovered = hoveredMsg?.pairedId ?? null; + + function toggleFilter(type: string) { + setFilterTypes((prev) => { + const next = new Set(prev); + if (next.has(type)) next.delete(type); + else next.add(type); + return next; + }); + } + + return ( +
+ {/* Resize handle on left edge */} + {/* biome-ignore lint/a11y/useSemanticElements: resize handle requires div */} +
{ + if (e.key === "ArrowLeft") setPanelWidth((w) => Math.min(1200, w + 8)); + if (e.key === "ArrowRight") setPanelWidth((w) => Math.max(360, w - 8)); + }} + role="separator" + tabIndex={0} + aria-orientation="vertical" + aria-valuenow={panelWidth} + aria-valuemin={360} + aria-valuemax={1200} + aria-label="Resize message flow panel" + /> + {/* Header */} +
+ + MESSAGE FLOW + + + {/* Live/Paused badge */} + + +
+ {/* Filter dropdown */} +
+ + {filterOpen && ( +
+ {allTypes.map((type) => ( + + ))} + {allTypes.length === 0 && ( + + No messages yet + + )} +
+ )} +
+ + {/* Detail level toggle */} + + + {/* Auto-scroll toggle */} + + + {/* Clear */} + + + {/* Close */} + +
+
+ + {/* Column headers */} +
+ + Outbound ↗ + + + + Inbound ↙ + +
+ + {/* Scrollable body */} +
+ + + {filtered.length === 0 ? ( +
+ Waiting for messages… +
+ ) : ( +
+ {filtered.map((msg) => { + const isDimmed = + hoveredId !== null && msg.id !== hoveredId && msg.id !== pairedIdOfHovered; + const isHighlighted = hoveredTraceId !== null && msg.traceId === hoveredTraceId; + return ( +
+ {msg.direction === "out" ? ( + <> + handleHoverStart(msg)} + onHoverEnd={handleHoverEnd} + /> + + +{msg.timestamp} + +
+ + ) : ( + <> +
+ + +{msg.timestamp} + + handleHoverStart(msg)} + onHoverEnd={handleHoverEnd} + /> + + )} +
+ ); + })} +
+ )} +
+
+
+ ); +} diff --git a/web/src/components/MessagePill.test.tsx b/web/src/components/MessagePill.test.tsx new file mode 100644 index 00000000..52df6c7a --- /dev/null +++ b/web/src/components/MessagePill.test.tsx @@ -0,0 +1,96 @@ +import { render, screen } from "@testing-library/react"; +import userEvent from "@testing-library/user-event"; +import { describe, expect, it, vi } from "vitest"; +import { type FlowMessage, MessagePill } from "./MessagePill"; + +function makeMessage(overrides: Partial = {}): FlowMessage { + return { + id: "msg-1", + direction: "out", + type: "assistant", + payload: { text: "hello" }, + timestamp: 1234, + wallTime: Date.now(), + ...overrides, + }; +} + +const noop = () => {}; + +type PillOverrides = Partial[0]>; + +function renderPill(msgOverrides: Partial = {}, propOverrides: PillOverrides = {}) { + return render( + , + ); +} + +describe("MessagePill", () => { + it('renders with data-flow-id for direction "out"', () => { + const { container } = renderPill({ direction: "out" }); + const root = container.firstElementChild as HTMLElement; + expect(root.getAttribute("data-flow-id")).toBe("msg-1"); + }); + + it('shows ↙ arrow for direction "in"', () => { + renderPill({ direction: "in" }); + expect(screen.getByText("↙")).toBeInTheDocument(); + }); + + it("shows ↗ arrow for direction out", () => { + renderPill({ direction: "out" }); + expect(screen.getByText("↗")).toBeInTheDocument(); + }); + + it("applies opacity-30 class when dimmed", () => { + const { container } = renderPill({}, { dimmed: true }); + const root = container.firstElementChild as HTMLElement; + expect(root.className).toContain("opacity-30"); + }); + + it("does not apply opacity-30 when not dimmed", () => { + const { container } = renderPill({}, { dimmed: false }); + const root = container.firstElementChild as HTMLElement; + expect(root.className).not.toContain("opacity-30"); + }); + + it("toggles expand/collapse on click", async () => { + const user = userEvent.setup(); + renderPill({ payload: { key: "value" } }); + + const toggle = screen.getByText("[▾]"); + expect(toggle).toBeInTheDocument(); + + await user.click(toggle); + expect(screen.getByText("[▴]")).toBeInTheDocument(); + + await user.click(screen.getByText("[▴]")); + expect(screen.getByText("[▾]")).toBeInTheDocument(); + }); + + it("calls onHoverStart and onHoverEnd", async () => { + const user = userEvent.setup(); + const onHoverStart = vi.fn(); + const onHoverEnd = vi.fn(); + const { container } = renderPill({}, { onHoverStart, onHoverEnd }); + const root = container.firstElementChild as HTMLElement; + + await user.hover(root); + expect(onHoverStart).toHaveBeenCalled(); + + await user.unhover(root); + expect(onHoverEnd).toHaveBeenCalled(); + }); + + it("displays timestamp formatted as +{n}ms", () => { + renderPill({ timestamp: 5678 }); + expect(screen.getByText("+5678ms")).toBeInTheDocument(); + }); +}); diff --git a/web/src/components/MessagePill.tsx b/web/src/components/MessagePill.tsx new file mode 100644 index 00000000..7d9face0 --- /dev/null +++ b/web/src/components/MessagePill.tsx @@ -0,0 +1,186 @@ +import { useState } from "react"; + +export interface FlowMessage { + id: string; + direction: "out" | "in"; + type: string; + payload: unknown; + timestamp: number; + wallTime: number; + pairedId?: string; + groupIds?: string[]; + // Translation boundary metadata (for message flow visualization) + boundary?: "T1" | "T2" | "T3" | "T4"; + translator?: string; + nativeFormat?: { format: string; body: unknown }; + traceId?: string; +} + +const COLOR_MAP: Record = { + assistant: "#F59E0B", + stream_event: "#22D3EE", + tool_progress: "#14B8A6", + tool_use_summary: "#6EE7B7", + status_change: "#A78BFA", + permission_request: "#F97316", + result: "#84CC16", + cli_connected: "#94A3B8", + cli_disconnected: "#F87171", + error: "#EF4444", + user_message: "#F8FAFC", + message_queued: "#C084FC", + permission_response: "#FED7AA", + interrupt: "#EF4444", + slash_command: "#38BDF8", + queue_message: "#C084FC", + update_queued_message: "#E9D5FF", + cancel_queued_message: "#F87171", + adapter_drop: "#EF4444", + translation_event: "#8B5CF6", +}; + +const DEFAULT_COLOR = "#71717A"; + +export function getColor(type: string): string { + return COLOR_MAP[type] ?? DEFAULT_COLOR; +} + +/** + * Generate a consistent color from a traceId for visual correlation. + * Uses a simple hash to pick from a palette of distinct colors. + */ +function getTraceColor(traceId: string): string { + const colors = [ + "#22D3EE", // cyan + "#F59E0B", // amber + "#A78BFA", // purple + "#FB923C", // orange + "#34D399", // emerald + "#F472B6", // pink + "#FACC15", // yellow + "#60A5FA", // blue + "#FB7185", // rose + "#A3E635", // lime + ]; + + let hash = 0; + for (let i = 0; i < traceId.length; i++) { + hash = (hash << 5) - hash + traceId.charCodeAt(i); + hash = hash & hash; // Convert to 32bit integer + } + + return colors[Math.abs(hash) % colors.length]; +} + +interface MessagePillProps { + message: FlowMessage; + detailLevel: "compact" | "detailed"; + dimmed: boolean; + highlighted?: boolean; + onHoverStart: () => void; + onHoverEnd: () => void; +} + +export function MessagePill({ + message, + detailLevel, + dimmed, + highlighted = false, + onHoverStart, + onHoverEnd, +}: MessagePillProps) { + const [expanded, setExpanded] = useState(false); + const [boundaryExpanded, setBoundaryExpanded] = useState(false); + const color = getColor(message.type); + const payloadStr = JSON.stringify(message.payload); + const preview = payloadStr.length > 80 ? `${payloadStr.slice(0, 80)}…` : payloadStr; + + const showBoundary = detailLevel === "detailed" && message.boundary; + const traceColor = message.traceId ? getTraceColor(message.traceId) : undefined; + const truncatedTraceId = message.traceId?.slice(0, 8); + + return ( + // biome-ignore lint/a11y/noStaticElementInteractions: pill is a visual dev tool element, not interactive UI +
+
+ + {message.type} + + {showBoundary && ( + + {message.boundary} + + )} + {message.traceId && ( + + {truncatedTraceId} + + )} + +{message.timestamp}ms + {message.direction === "out" ? "↗" : "↙"} + +
+ {expanded ? ( +
+          {JSON.stringify(message.payload, null, 2)}
+        
+ ) : ( + {preview} + )} + + {/* Boundary details (detailed mode only) */} + {showBoundary && message.nativeFormat && ( +
+ + {boundaryExpanded && ( +
+
+                {JSON.stringify(message.nativeFormat.body, null, 2).slice(0, 500)}
+                {JSON.stringify(message.nativeFormat.body).length > 500 && "…"}
+              
+ +
+ )} +
+ )} +
+ ); +} diff --git a/web/src/components/PersistedOutputBlock.tsx b/web/src/components/PersistedOutputBlock.tsx new file mode 100644 index 00000000..72ed2fe7 --- /dev/null +++ b/web/src/components/PersistedOutputBlock.tsx @@ -0,0 +1,68 @@ +import { useState } from "react"; +import type { ParsedPersistedOutput } from "../utils/persisted-output"; +import { CopyButton } from "./CopyButton"; + +interface PersistedOutputBlockProps { + parsed: ParsedPersistedOutput; + isError?: boolean; +} + +const MAX_PREVIEW_LINES = 30; + +export function PersistedOutputBlock({ parsed, isError }: PersistedOutputBlockProps) { + const [expanded, setExpanded] = useState(false); + const lines = parsed.preview.split("\n"); + const truncated = lines.length > MAX_PREVIEW_LINES; + const displayed = expanded ? parsed.preview : lines.slice(0, MAX_PREVIEW_LINES).join("\n"); + + return ( +
+ {/* Header banner */} +
+ + + Output truncated ({parsed.size}) — saved to{" "} + + {parsed.filePath} + + +
+ + {/* Preview content */} +
+ +
+          {displayed}
+        
+ {truncated && !expanded && ( + + )} +
+
+ ); +} diff --git a/web/src/components/StatusBar.tsx b/web/src/components/StatusBar.tsx index f3a1b6f9..a9a7a9a2 100644 --- a/web/src/components/StatusBar.tsx +++ b/web/src/components/StatusBar.tsx @@ -320,6 +320,39 @@ function LogsButton() { ); } +// ── Message Flow Button ────────────────────────────────────────────────────── + +function MessageFlowButton() { + const messageFlowOpen = useStore((s) => s.messageFlowOpen); + const setMessageFlowOpen = useStore((s) => s.setMessageFlowOpen); + + return ( + + ); +} + // ── Status Bar ─────────────────────────────────────────────────────────────── export function StatusBar() { @@ -402,6 +435,7 @@ export function StatusBar() { )} + {adapterType && } diff --git a/web/src/components/StreamingIndicator.test.tsx b/web/src/components/StreamingIndicator.test.tsx index 22990322..4e9f8d55 100644 --- a/web/src/components/StreamingIndicator.test.tsx +++ b/web/src/components/StreamingIndicator.test.tsx @@ -44,6 +44,26 @@ describe("StreamingIndicator", () => { expect(bold.tagName).toBe("STRONG"); }); + it("renders streaming thinking content above text", () => { + store().ensureSessionData(SESSION); + store().setStreamingStarted(SESSION, Date.now()); + store().appendStreamingThinking(SESSION, "Let me think about this"); + store().setStreaming(SESSION, "Here is my answer"); + render(); + expect(screen.getByText("Thinking...")).toBeInTheDocument(); + expect(screen.getByText("Let me think about this")).toBeInTheDocument(); + expect(screen.getByText(/Here is my answer/)).toBeInTheDocument(); + }); + + it("renders thinking block without text content", () => { + store().ensureSessionData(SESSION); + store().setStreamingStarted(SESSION, Date.now()); + store().setSessionStatus(SESSION, "running"); + store().appendStreamingThinking(SESSION, "Pondering..."); + render(); + expect(screen.getByText("Pondering...")).toBeInTheDocument(); + }); + it("displays token count when available", () => { store().ensureSessionData(SESSION); store().setStreamingStarted(SESSION, Date.now()); diff --git a/web/src/components/StreamingIndicator.tsx b/web/src/components/StreamingIndicator.tsx index 0e19400f..97b28dbd 100644 --- a/web/src/components/StreamingIndicator.tsx +++ b/web/src/components/StreamingIndicator.tsx @@ -46,6 +46,7 @@ function useElapsed(startedAt: number | null): number | null { export function StreamingIndicator({ sessionId }: StreamingIndicatorProps) { const streaming = useStore((s) => s.sessionData[sessionId]?.streaming ?? null); + const streamingThinking = useStore((s) => s.sessionData[sessionId]?.streamingThinking ?? null); const streamingStartedAt = useStore((s) => s.sessionData[sessionId]?.streamingStartedAt ?? null); const streamingOutputTokens = useStore( (s) => s.sessionData[sessionId]?.streamingOutputTokens ?? 0, @@ -90,6 +91,35 @@ export function StreamingIndicator({ sessionId }: StreamingIndicatorProps) { return (
+ {streamingThinking && ( +
+
+ + Thinking... +
+
+            {streamingThinking}
+          
+
+ )} {streaming && }
diff --git a/web/src/components/ToolResultBlock.test.tsx b/web/src/components/ToolResultBlock.test.tsx index fb9dc5b6..17073f50 100644 --- a/web/src/components/ToolResultBlock.test.tsx +++ b/web/src/components/ToolResultBlock.test.tsx @@ -245,4 +245,36 @@ describe("ToolResultBlock", () => { expect(pre?.className).toContain("text-bc-error"); }); }); + + describe("persisted output", () => { + const persistedContent = ` +Output too large (57.8KB). Full output saved to: /tmp/output.txt + +Preview (first 2KB): +line 1 of preview +line 2 of preview +... +`; + + it("renders persisted output banner for Bash tool", () => { + render(); + expect(screen.getByText(/Output truncated/)).toBeInTheDocument(); + expect(screen.getByText("/tmp/output.txt")).toBeInTheDocument(); + }); + + it("renders preview content", () => { + render(); + expect(screen.getByText(/line 1 of preview/)).toBeInTheDocument(); + }); + + it("works for any tool name", () => { + render(); + expect(screen.getByText(/Output truncated/)).toBeInTheDocument(); + }); + + it("falls through to normal rendering for non-persisted content", () => { + render(); + expect(screen.queryByText(/Output truncated/)).not.toBeInTheDocument(); + }); + }); }); diff --git a/web/src/components/ToolResultBlock.tsx b/web/src/components/ToolResultBlock.tsx index bab8792f..24efc77a 100644 --- a/web/src/components/ToolResultBlock.tsx +++ b/web/src/components/ToolResultBlock.tsx @@ -1,10 +1,12 @@ import { useState } from "react"; import type { ConsumerContentBlock } from "../../../shared/consumer-types"; import { stripAnsi } from "../utils/ansi-strip"; +import { isPersistedOutput, parsePersistedOutput } from "../utils/persisted-output"; import { truncateLines } from "../utils/truncate"; import { containsUnifiedDiff } from "../utils/unified-diff"; import { CopyButton } from "./CopyButton"; import { MarkdownContent } from "./MarkdownContent"; +import { PersistedOutputBlock } from "./PersistedOutputBlock"; import { UnifiedDiffBlock } from "./UnifiedDiffBlock"; interface ToolResultBlockProps { @@ -180,6 +182,14 @@ function renderContent( content: string | ConsumerContentBlock[], isError?: boolean, ): React.ReactNode { + // Check for persisted-output wrapper before tool-specific rendering + if (typeof content === "string" && isPersistedOutput(text)) { + const parsed = parsePersistedOutput(text); + if (parsed) { + return ; + } + } + switch (toolName) { case "Bash": { const stripped = stripAnsi(text); diff --git a/web/src/hooks/useKeyboardShortcuts.ts b/web/src/hooks/useKeyboardShortcuts.ts index f66f5343..aa1488dc 100644 --- a/web/src/hooks/useKeyboardShortcuts.ts +++ b/web/src/hooks/useKeyboardShortcuts.ts @@ -59,6 +59,13 @@ export function useKeyboardShortcuts(): void { } } + // Alt+M: toggle message flow panel + if (e.altKey && (e.key === "m" || e.key === "M")) { + e.preventDefault(); + state.setMessageFlowOpen(!state.messageFlowOpen); + return; + } + // ?: open shortcuts modal (only when not typing in input) if (e.key === "?" && !meta && !e.altKey && !isInputFocused()) { e.preventDefault(); diff --git a/web/src/hooks/useMessageFlow.test.ts b/web/src/hooks/useMessageFlow.test.ts new file mode 100644 index 00000000..6a561cc2 --- /dev/null +++ b/web/src/hooks/useMessageFlow.test.ts @@ -0,0 +1,164 @@ +import { act, renderHook } from "@testing-library/react"; +import { beforeEach, describe, expect, it, vi } from "vitest"; +import type { ConsumerMessage, InboundMessage } from "../../../shared/consumer-types"; +import { MAX_FLOW_MESSAGES, useMessageFlow } from "./useMessageFlow"; + +// Capture the listener callbacks registered via ws.ts exports +type InboundCb = (sessionId: string, msg: ConsumerMessage) => void; +type OutboundCb = (sessionId: string, msg: InboundMessage) => void; + +const { inboundListeners, outboundListeners } = vi.hoisted(() => ({ + inboundListeners: new Set(), + outboundListeners: new Set(), +})); + +vi.mock("../ws", () => ({ + addFlowInboundListener: (cb: InboundCb) => { + inboundListeners.add(cb); + return () => inboundListeners.delete(cb); + }, + addFlowOutboundListener: (cb: OutboundCb) => { + outboundListeners.add(cb); + return () => outboundListeners.delete(cb); + }, +})); + +const SESSION = "test-session"; + +function fireInbound(msg: Partial & { type: string }) { + for (const cb of inboundListeners) cb(SESSION, msg as ConsumerMessage); +} + +function fireOutbound(msg: Partial & { type: string }) { + for (const cb of outboundListeners) cb(SESSION, msg as InboundMessage); +} + +describe("useMessageFlow", () => { + beforeEach(() => { + inboundListeners.clear(); + outboundListeners.clear(); + vi.stubGlobal("crypto", { randomUUID: () => `uuid-${++uuidCounter}` }); + uuidCounter = 0; + }); + + let uuidCounter = 0; + + it("captures inbound and outbound messages", () => { + const { result } = renderHook(() => useMessageFlow(SESSION)); + expect(result.current.messages).toHaveLength(0); + + act(() => fireInbound({ type: "assistant" })); + expect(result.current.messages).toHaveLength(1); + // bridge→consumer = left/outbound column + expect(result.current.messages[0].direction).toBe("out"); + + act(() => fireOutbound({ type: "user_message" })); + expect(result.current.messages).toHaveLength(2); + // consumer→bridge = right/inbound column + expect(result.current.messages[1].direction).toBe("in"); + }); + + it("ring buffer caps at MAX_FLOW_MESSAGES", () => { + const { result } = renderHook(() => useMessageFlow(SESSION)); + + act(() => { + for (let i = 0; i < MAX_FLOW_MESSAGES + 1; i++) { + fireInbound({ type: "assistant" }); + } + }); + + expect(result.current.messages).toHaveLength(MAX_FLOW_MESSAGES); + // First message should have been evicted; second message should be first + expect(result.current.messages[0].id).toBe("uuid-2"); + }); + + it("pause/resume: messages during pause appear on resume", () => { + const { result } = renderHook(() => useMessageFlow(SESSION)); + + act(() => fireInbound({ type: "assistant" })); + expect(result.current.messages).toHaveLength(1); + + // Pause + act(() => result.current.setPaused(true)); + expect(result.current.paused).toBe(true); + + // Messages during pause go to pending + act(() => { + fireInbound({ type: "stream_event" }); + fireInbound({ type: "result" }); + }); + expect(result.current.messages).toHaveLength(1); + expect(result.current.pendingCount).toBe(2); + + // Resume flushes pending + act(() => result.current.setPaused(false)); + expect(result.current.paused).toBe(false); + expect(result.current.messages).toHaveLength(3); + expect(result.current.pendingCount).toBe(0); + }); + + it("permission_request/permission_response pairing sets pairedId", () => { + const { result } = renderHook(() => useMessageFlow(SESSION)); + + // Inbound permission_request + act(() => + fireInbound({ + type: "permission_request", + request: { id: "req-1", tool: "Bash", input: {} }, + } as unknown as ConsumerMessage), + ); + + const reqMsg = result.current.messages[0]; + expect(reqMsg.type).toBe("permission_request"); + + // Outbound permission_response + act(() => + fireOutbound({ + type: "permission_response", + request_id: "req-1", + allowed: true, + } as unknown as InboundMessage), + ); + + const resMsg = result.current.messages[1]; + expect(resMsg.type).toBe("permission_response"); + + // Both should be paired + expect(resMsg.pairedId).toBe(reqMsg.id); + expect(reqMsg.pairedId).toBe(resMsg.id); + }); + + it("ignores messages for other sessions", () => { + const { result } = renderHook(() => useMessageFlow(SESSION)); + + act(() => { + for (const cb of inboundListeners) + cb("other-session", { type: "assistant" } as ConsumerMessage); + }); + + expect(result.current.messages).toHaveLength(0); + }); + + it("clear() resets all state", () => { + const { result } = renderHook(() => useMessageFlow(SESSION)); + + act(() => { + fireInbound({ type: "assistant" }); + fireInbound({ type: "result" }); + }); + expect(result.current.messages).toHaveLength(2); + + act(() => result.current.clear()); + expect(result.current.messages).toHaveLength(0); + }); + + it("cleans up listeners on unmount", () => { + const { unmount } = renderHook(() => useMessageFlow(SESSION)); + expect(inboundListeners.size).toBe(1); + expect(outboundListeners.size).toBe(1); + + unmount(); + expect(inboundListeners.size).toBe(0); + expect(outboundListeners.size).toBe(0); + }); +}); diff --git a/web/src/hooks/useMessageFlow.ts b/web/src/hooks/useMessageFlow.ts new file mode 100644 index 00000000..b62d8d05 --- /dev/null +++ b/web/src/hooks/useMessageFlow.ts @@ -0,0 +1,173 @@ +import { useCallback, useEffect, useRef, useState } from "react"; +import type { FlowMessage } from "../components/MessagePill"; +import { addFlowInboundListener, addFlowOutboundListener } from "../ws"; + +export const MAX_FLOW_MESSAGES = 500; + +interface UseMessageFlowResult { + messages: FlowMessage[]; + paused: boolean; + pendingCount: number; + setPaused: (v: boolean) => void; + clear: () => void; +} + +export function useMessageFlow(sessionId: string | null): UseMessageFlowResult { + const [, setTick] = useState(0); + const [paused, setPausedState] = useState(false); + + const bufferRef = useRef([]); + const pendingRef = useRef([]); + const pausedRef = useRef(false); + const sessionStartRef = useRef(null); + const pairingIndexRef = useRef(new Map()); + + const forceRender = useCallback(() => setTick((n) => n + 1), []); + + const appendToBuffer = useCallback((msg: FlowMessage) => { + const buf = bufferRef.current; + if (buf.length >= MAX_FLOW_MESSAGES) { + buf.shift(); + } + buf.push(msg); + }, []); + + const buildFlowMessage = useCallback( + (direction: "in" | "out", type: string, payload: unknown): FlowMessage => { + const now = Date.now(); + if (sessionStartRef.current === null) { + sessionStartRef.current = now; + } + return { + id: crypto.randomUUID(), + direction, + type, + payload, + wallTime: now, + timestamp: now - sessionStartRef.current, + }; + }, + [], + ); + + const handlePairing = useCallback((msg: FlowMessage) => { + const index = pairingIndexRef.current; + const p = msg.payload as Record; + + if (msg.direction === "in" && msg.type === "permission_response") { + const requestId = p.request_id as string | undefined; + if (requestId) { + const pairedFlowId = index.get(requestId); + if (pairedFlowId) { + msg.pairedId = pairedFlowId; + const partner = bufferRef.current.find((m) => m.id === pairedFlowId); + if (partner) partner.pairedId = msg.id; + // Also check pending + const pendingPartner = pendingRef.current.find((m) => m.id === pairedFlowId); + if (pendingPartner) pendingPartner.pairedId = msg.id; + } + } + } + + if (msg.direction === "out" && msg.type === "permission_request") { + const request = p.request as Record | undefined; + const requestId = request?.id as string | undefined; + if (requestId) { + index.set(requestId, msg.id); + } + } + }, []); + + const ingest = useCallback( + (msg: FlowMessage) => { + handlePairing(msg); + if (pausedRef.current) { + pendingRef.current.push(msg); + forceRender(); + } else { + appendToBuffer(msg); + forceRender(); + } + }, + [handlePairing, appendToBuffer, forceRender], + ); + + const setPaused = useCallback( + (v: boolean) => { + pausedRef.current = v; + setPausedState(v); + if (!v) { + // Flush pending into ring buffer + for (const msg of pendingRef.current) { + appendToBuffer(msg); + } + pendingRef.current = []; + forceRender(); + } + }, + [appendToBuffer, forceRender], + ); + + const clear = useCallback(() => { + bufferRef.current = []; + pendingRef.current = []; + sessionStartRef.current = null; + pairingIndexRef.current.clear(); + forceRender(); + }, [forceRender]); + + useEffect(() => { + if (!sessionId) return; + + const removeInbound = addFlowInboundListener((sid, msg) => { + if (sid !== sessionId) return; + + // Handle translation events separately to populate boundary metadata + if (msg.type === "translation_event") { + const evt = msg as { + type: "translation_event"; + boundary: "T1" | "T2" | "T3" | "T4"; + translator: string; + from: { format: string; body: unknown }; + to: { format: string; body: unknown }; + traceId?: string; + timestamp: number; + sessionId: string; + }; + // T1/T2 = consumer→backend path (right/inbound column) + // T3/T4 = backend→consumer path (left/outbound column) + const direction = evt.boundary === "T1" || evt.boundary === "T2" ? "in" : "out"; + const flowMsg = buildFlowMessage(direction, evt.boundary, evt.to.body); + flowMsg.boundary = evt.boundary; + flowMsg.translator = evt.translator; + flowMsg.nativeFormat = evt.from; + flowMsg.traceId = evt.traceId; + ingest(flowMsg); + } else { + // Regular consumer message — bridge→consumer = left/outbound column + const flowMsg = buildFlowMessage("out", msg.type, msg); + ingest(flowMsg); + } + }); + + const removeOutbound = addFlowOutboundListener((sid, msg) => { + if (sid !== sessionId) return; + // consumer→bridge = right/inbound column + const flowMsg = buildFlowMessage("in", msg.type, msg); + ingest(flowMsg); + }); + + return () => { + removeInbound(); + removeOutbound(); + }; + }, [sessionId, buildFlowMessage, ingest]); + + return { + messages: bufferRef.current, + paused, + pendingCount: pendingRef.current.length, + setPaused, + clear, + }; +} diff --git a/web/src/store.test.ts b/web/src/store.test.ts index 682a893d..790a5686 100644 --- a/web/src/store.test.ts +++ b/web/src/store.test.ts @@ -442,11 +442,31 @@ describe("store", () => { }); }); + describe("streaming thinking", () => { + it("appendStreamingThinking accumulates thinking text", () => { + store().appendStreamingThinking(SESSION_ID, "Let me"); + store().appendStreamingThinking(SESSION_ID, " think..."); + expect(store().sessionData[SESSION_ID].streamingThinking).toBe("Let me think..."); + }); + + it("appendStreamingThinking initializes from null", () => { + store().appendStreamingThinking(SESSION_ID, "first"); + expect(store().sessionData[SESSION_ID].streamingThinking).toBe("first"); + }); + + it("clearStreaming also clears streamingThinking", () => { + store().appendStreamingThinking(SESSION_ID, "thinking"); + store().clearStreaming(SESSION_ID); + expect(store().sessionData[SESSION_ID].streamingThinking).toBeNull(); + }); + }); + describe("agent streaming", () => { it("initAgentStreaming creates entry", () => { store().initAgentStreaming(SESSION_ID, "agent-1"); const entry = store().sessionData[SESSION_ID].agentStreaming["agent-1"]; expect(entry.text).toBe(""); + expect(entry.thinking).toBeNull(); expect(entry.startedAt).toBeTypeOf("number"); expect(entry.outputTokens).toBe(0); }); @@ -492,6 +512,21 @@ describe("store", () => { expect(store().sessionData[SESSION_ID].agentStreaming["agent-1"]).toBeUndefined(); expect(store().sessionData[SESSION_ID].agentStreaming["agent-2"].text).toBe("a2"); }); + + it("appendAgentStreamingThinking accumulates thinking", () => { + store().initAgentStreaming(SESSION_ID, "agent-1"); + store().appendAgentStreamingThinking(SESSION_ID, "agent-1", "thought"); + expect(store().sessionData[SESSION_ID].agentStreaming["agent-1"].thinking).toBe("thought"); + }); + + it("appendAgentStreaming preserves thinking when text appended", () => { + store().initAgentStreaming(SESSION_ID, "agent-1"); + store().appendAgentStreamingThinking(SESSION_ID, "agent-1", "thought"); + store().appendAgentStreaming(SESSION_ID, "agent-1", "text"); + const entry = store().sessionData[SESSION_ID].agentStreaming["agent-1"]; + expect(entry.thinking).toBe("thought"); + expect(entry.text).toBe("text"); + }); }); describe("identity", () => { diff --git a/web/src/store.ts b/web/src/store.ts index 8418051e..83061688 100644 --- a/web/src/store.ts +++ b/web/src/store.ts @@ -36,6 +36,7 @@ export interface SessionIdentity { export interface SessionData { messages: ConsumerMessage[]; streaming: string | null; + streamingThinking: string | null; streamingStartedAt: number | null; streamingOutputTokens: number; streamingBlocks: ConsumerContentBlock[]; @@ -55,6 +56,7 @@ export interface SessionData { string, { text: string | null; + thinking: string | null; startedAt: number | null; outputTokens: number; } @@ -106,6 +108,7 @@ export interface AppState { toasts: Toast[]; processLogs: Record; logDrawerOpen: boolean; + messageFlowOpen: boolean; // Actions setCurrentSession: (id: string) => void; @@ -124,6 +127,7 @@ export interface AppState { removeToast: (id: string) => void; appendProcessLog: (sessionId: string, line: string) => void; setLogDrawerOpen: (open: boolean) => void; + setMessageFlowOpen: (open: boolean) => void; // Session data actions ensureSessionData: (id: string) => void; @@ -131,6 +135,7 @@ export interface AppState { setMessages: (sessionId: string, messages: ConsumerMessage[]) => void; setStreaming: (sessionId: string, text: string | null) => void; appendStreaming: (sessionId: string, delta: string) => void; + appendStreamingThinking: (sessionId: string, delta: string) => void; setStreamingStarted: (sessionId: string, ts: number | null) => void; setStreamingOutputTokens: (sessionId: string, count: number) => void; setStreamingBlocks: (sessionId: string, blocks: ConsumerContentBlock[]) => void; @@ -167,6 +172,7 @@ export interface AppState { ) => void; initAgentStreaming: (sessionId: string, agentId: string) => void; appendAgentStreaming: (sessionId: string, agentId: string, delta: string) => void; + appendAgentStreamingThinking: (sessionId: string, agentId: string, delta: string) => void; setAgentStreamingOutputTokens: (sessionId: string, agentId: string, count: number) => void; clearAgentStreaming: (sessionId: string, agentId: string) => void; setQueuedMessage: (sessionId: string, msg: SessionData["queuedMessage"]) => void; @@ -192,6 +198,7 @@ function emptySessionData(): SessionData { return { messages: [], streaming: null, + streamingThinking: null, streamingStartedAt: null, streamingOutputTokens: 0, streamingBlocks: [], @@ -296,6 +303,7 @@ export const useStore = create()((set, get) => ({ toasts: [], processLogs: {}, logDrawerOpen: false, + messageFlowOpen: false, setCurrentSession: (id) => set({ currentSessionId: id }), toggleSidebar: () => @@ -353,6 +361,7 @@ export const useStore = create()((set, get) => ({ }), setLogDrawerOpen: (open) => set({ logDrawerOpen: open }), + setMessageFlowOpen: (open) => set({ messageFlowOpen: open }), ensureSessionData: (id) => { if (!get().sessionData[id]) { @@ -389,6 +398,14 @@ export const useStore = create()((set, get) => ({ return patchSession(s, sessionId, { streaming: (data.streaming ?? "") + delta }); }), + appendStreamingThinking: (sessionId, delta) => + set((s) => { + const data = s.sessionData[sessionId] ?? emptySessionData(); + return patchSession(s, sessionId, { + streamingThinking: (data.streamingThinking ?? "") + delta, + }); + }), + setStreamingStarted: (sessionId, ts) => set((s) => patchSession(s, sessionId, { streamingStartedAt: ts })), @@ -402,6 +419,7 @@ export const useStore = create()((set, get) => ({ set((s) => patchSession(s, sessionId, { streaming: null, + streamingThinking: null, streamingStartedAt: null, streamingOutputTokens: 0, streamingBlocks: [], @@ -471,7 +489,7 @@ export const useStore = create()((set, get) => ({ return patchSession(s, sessionId, { agentStreaming: { ...data.agentStreaming, - [agentId]: { text: "", startedAt: Date.now(), outputTokens: 0 }, + [agentId]: { text: "", thinking: null, startedAt: Date.now(), outputTokens: 0 }, }, }); }), @@ -485,6 +503,24 @@ export const useStore = create()((set, get) => ({ ...data.agentStreaming, [agentId]: { text: (current?.text ?? "") + delta, + thinking: current?.thinking ?? null, + startedAt: current?.startedAt ?? null, + outputTokens: current?.outputTokens ?? 0, + }, + }, + }); + }), + + appendAgentStreamingThinking: (sessionId, agentId, delta) => + set((s) => { + const data = s.sessionData[sessionId] ?? emptySessionData(); + const current = data.agentStreaming[agentId]; + return patchSession(s, sessionId, { + agentStreaming: { + ...data.agentStreaming, + [agentId]: { + text: current?.text ?? null, + thinking: (current?.thinking ?? "") + delta, startedAt: current?.startedAt ?? null, outputTokens: current?.outputTokens ?? 0, }, diff --git a/web/src/utils/persisted-output.test.ts b/web/src/utils/persisted-output.test.ts new file mode 100644 index 00000000..821de313 --- /dev/null +++ b/web/src/utils/persisted-output.test.ts @@ -0,0 +1,40 @@ +import { describe, expect, it } from "vitest"; +import { isPersistedOutput, parsePersistedOutput } from "./persisted-output"; + +const SAMPLE = ` +Output too large (57.8KB). Full output saved to: /tmp/output.txt + +Preview (first 2KB): +line 1 of preview +line 2 of preview +... +`; + +describe("isPersistedOutput", () => { + it("returns true for persisted-output wrapper", () => { + expect(isPersistedOutput(SAMPLE)).toBe(true); + }); + + it("returns false for plain text", () => { + expect(isPersistedOutput("just some output")).toBe(false); + }); + + it("returns false for empty string", () => { + expect(isPersistedOutput("")).toBe(false); + }); +}); + +describe("parsePersistedOutput", () => { + it("extracts size, filePath, and preview", () => { + const result = parsePersistedOutput(SAMPLE); + expect(result).not.toBeNull(); + expect(result!.size).toBe("57.8KB"); + expect(result!.filePath).toBe("/tmp/output.txt"); + expect(result!.preview).toContain("line 1 of preview"); + expect(result!.preview).toContain("line 2 of preview"); + }); + + it("returns null for non-matching text", () => { + expect(parsePersistedOutput("regular output")).toBeNull(); + }); +}); diff --git a/web/src/utils/persisted-output.ts b/web/src/utils/persisted-output.ts new file mode 100644 index 00000000..4ad2b533 --- /dev/null +++ b/web/src/utils/persisted-output.ts @@ -0,0 +1,44 @@ +export interface ParsedPersistedOutput { + /** Human-readable size string, e.g. "57.8KB" */ + size: string; + /** File path where full output was saved */ + filePath: string; + /** The truncated preview content */ + preview: string; +} + +const PERSISTED_OUTPUT_RE = /^\s*\n/; + +/** + * Returns true if the text is wrapped in `` tags. + */ +export function isPersistedOutput(text: string): boolean { + return PERSISTED_OUTPUT_RE.test(text); +} + +/** + * Parse a persisted-output block into its components. + * Returns null if the text doesn't match the expected format. + */ +export function parsePersistedOutput(text: string): ParsedPersistedOutput | null { + if (!isPersistedOutput(text)) return null; + + // Strip the wrapper tags + const inner = text + .replace(/^\s*\n/, "") + .replace(/\n<\/persisted-output>\s*$/, ""); + + // Parse header line: "Output too large (57.8KB). Full output saved to: /path/to/file.txt" + const headerMatch = inner.match( + /^Output too large \(([^)]+)\)\.\s*Full output saved to:\s*(.+)/m, + ); + + const size = headerMatch?.[1] ?? "unknown"; + const filePath = headerMatch?.[2]?.trim() ?? ""; + + // Extract preview content (everything after "Preview (first ...):\n") + const previewMatch = inner.match(/Preview \(first [^)]*\):\s*\n([\s\S]*?)(?:\n\.\.\.\s*$|$)/); + const preview = previewMatch?.[1] ?? ""; + + return { size, filePath, preview }; +} diff --git a/web/src/ws.test.ts b/web/src/ws.test.ts index e5a518f3..a4fad18d 100644 --- a/web/src/ws.test.ts +++ b/web/src/ws.test.ts @@ -634,6 +634,69 @@ describe("handleMessage", () => { expect(getSessionData()?.agentStreaming?.["agent-1"]?.text).toBe("agent text"); }); + // ── stream_event → content_block_delta (thinking) ───────────────────── + + it("stream_event thinking_delta: buffers thinking to main streaming", () => { + const ws = openSession(); + useStore.getState().setStreaming("s1", ""); + useStore.getState().setStreamingStarted("s1", Date.now()); + + ws.simulateMessage( + JSON.stringify({ + type: "stream_event", + event: { + type: "content_block_delta", + delta: { type: "thinking_delta", thinking: "Let me think" }, + }, + parent_tool_use_id: null, + }), + ); + + flushDeltas(); + expect(getSessionData()?.streamingThinking).toBe("Let me think"); + }); + + it("stream_event thinking_delta with agent: appends to agent thinking", () => { + const ws = openSession(); + useStore.getState().initAgentStreaming("s1", "agent-1"); + + ws.simulateMessage( + JSON.stringify({ + type: "stream_event", + event: { + type: "content_block_delta", + delta: { type: "thinking_delta", thinking: "agent thought" }, + }, + parent_tool_use_id: "agent-1", + }), + ); + + flushDeltas(); + expect(getSessionData()?.agentStreaming?.["agent-1"]?.thinking).toBe("agent thought"); + }); + + it("stream_event thinking_delta auto-inits streaming if needed", () => { + const ws = openSession(); + + ws.simulateMessage( + JSON.stringify({ + type: "stream_event", + event: { + type: "content_block_delta", + delta: { type: "thinking_delta", thinking: "thought" }, + }, + parent_tool_use_id: null, + }), + ); + + // Streaming should have been auto-initialized + expect(getSessionData()?.streaming).toBe(""); + expect(getSessionData()?.sessionStatus).toBe("running"); + + flushDeltas(); + expect(getSessionData()?.streamingThinking).toBe("thought"); + }); + // ── stream_event → message_delta ──────────────────────────────────────── it("stream_event message_delta: sets output tokens", () => { diff --git a/web/src/ws.ts b/web/src/ws.ts index 1d75e772..202fc0ca 100644 --- a/web/src/ws.ts +++ b/web/src/ws.ts @@ -16,6 +16,22 @@ const reconnectState = new Map< >(); const MAX_RECONNECT_DELAY = 30_000; +// ── Message flow tap (dev panel) ────────────────────────────────────────── +type FlowInboundListener = (sessionId: string, msg: ConsumerMessage) => void; +type FlowOutboundListener = (sessionId: string, msg: InboundMessage) => void; +const flowInboundListeners = new Set(); +const flowOutboundListeners = new Set(); + +export function addFlowInboundListener(cb: FlowInboundListener): () => void { + flowInboundListeners.add(cb); + return () => flowInboundListeners.delete(cb); +} + +export function addFlowOutboundListener(cb: FlowOutboundListener): () => void { + flowOutboundListeners.add(cb); + return () => flowOutboundListeners.delete(cb); +} + // ── Streaming delta batching ────────────────────────────────────────────── // Coalesce rapid content_block_delta events into at most one Zustand set() // per animation frame, reducing re-renders from hundreds/s to ~60/s. @@ -23,8 +39,12 @@ const MAX_RECONNECT_DELAY = 30_000; interface PendingDelta { /** Accumulated text for the main session streaming. */ main: string; + /** Accumulated thinking for the main session streaming. */ + mainThinking: string; /** Accumulated text per agent sub-stream. */ agents: Record; + /** Accumulated thinking per agent sub-stream. */ + agentsThinking: Record; } const pendingDeltas = new Map(); @@ -48,16 +68,22 @@ export function flushDeltas(): void { if (delta.main) { store.appendStreaming(sessionId, delta.main); } + if (delta.mainThinking) { + store.appendStreamingThinking(sessionId, delta.mainThinking); + } for (const [agentId, text] of Object.entries(delta.agents)) { store.appendAgentStreaming(sessionId, agentId, text); } + for (const [agentId, thinking] of Object.entries(delta.agentsThinking)) { + store.appendAgentStreamingThinking(sessionId, agentId, thinking); + } } } function bufferStreamingDelta(sessionId: string, agentId: string | null, text: string): void { let entry = pendingDeltas.get(sessionId); if (!entry) { - entry = { main: "", agents: {} }; + entry = { main: "", mainThinking: "", agents: {}, agentsThinking: {} }; pendingDeltas.set(sessionId, entry); } if (agentId) { @@ -68,6 +94,24 @@ function bufferStreamingDelta(sessionId: string, agentId: string | null, text: s scheduleDeltaFlush(); } +function bufferStreamingThinkingDelta( + sessionId: string, + agentId: string | null, + thinking: string, +): void { + let entry = pendingDeltas.get(sessionId); + if (!entry) { + entry = { main: "", mainThinking: "", agents: {}, agentsThinking: {} }; + pendingDeltas.set(sessionId, entry); + } + if (agentId) { + entry.agentsThinking[agentId] = (entry.agentsThinking[agentId] ?? "") + thinking; + } else { + entry.mainThinking += thinking; + } + scheduleDeltaFlush(); +} + function getConsumerId(): string { const key = "beamcode_consumer_id"; let id = localStorage.getItem(key); @@ -113,6 +157,7 @@ function handleMessage(sessionId: string, data: string): void { // Guard: ensure parsed value is an object with a string `type` discriminant if (!parsed || typeof parsed !== "object" || !("type" in parsed)) return; const msg = parsed as ConsumerMessage; + for (const cb of flowInboundListeners) cb(sessionId, msg); store.ensureSessionData(sessionId); @@ -177,10 +222,6 @@ function handleMessage(sessionId: string, data: string): void { } case "queued_message_cancelled": - store.setQueuedMessage(sessionId, null); - store.setEditingQueue(sessionId, false); - break; - case "queued_message_sent": store.setQueuedMessage(sessionId, null); store.setEditingQueue(sessionId, false); @@ -204,15 +245,22 @@ function handleMessage(sessionId: string, data: string): void { case "content_block_delta": { const delta = (event as { delta?: { type: string; text?: string; thinking?: string } }) .delta; + + // Auto-init streaming if no message_start was received (ACP backends) + const needsAutoInit = + (delta?.type === "text_delta" || delta?.type === "thinking_delta") && + !agentId && + useStore.getState().sessionData[sessionId]?.streaming === null; + if (needsAutoInit) { + store.setStreamingStarted(sessionId, Date.now()); + store.setStreaming(sessionId, ""); + store.setSessionStatus(sessionId, "running"); + } + if (delta?.type === "text_delta" && delta.text) { - // Auto-init streaming if no message_start was received (ACP backends) - const sd = useStore.getState().sessionData[sessionId]; - if (!agentId && sd?.streaming === null) { - store.setStreamingStarted(sessionId, Date.now()); - store.setStreaming(sessionId, ""); - store.setSessionStatus(sessionId, "running"); - } bufferStreamingDelta(sessionId, agentId, delta.text); + } else if (delta?.type === "thinking_delta" && delta.thinking) { + bufferStreamingThinkingDelta(sessionId, agentId, delta.thinking); } break; } @@ -412,6 +460,11 @@ function handleMessage(sessionId: string, data: string): void { // Informational only — no store action needed break; + case "adapter_drop": + case "translation_event": + // Captured by flow panel listeners only; not surfaced in chat UI + break; + default: { // Compile-time exhaustiveness guard; runtime remains forward-compatible. const _exhaustive: never = msg; @@ -531,6 +584,7 @@ export function send(message: InboundMessage, sessionId?: string): void { if (!targetId) return; const socket = connections.get(targetId); if (socket?.readyState === WebSocket.OPEN) { + for (const cb of flowOutboundListeners) cb(targetId, message); socket.send(JSON.stringify(message)); } } @@ -540,4 +594,6 @@ export function _resetForTesting(): void { disconnect(); pendingDeltas.clear(); flushScheduled = false; + flowInboundListeners.clear(); + flowOutboundListeners.clear(); }