From b252f6ea3c817dd7b086d3456f9bc791b4c43189 Mon Sep 17 00:00:00 2001 From: Teng Lin Date: Thu, 26 Feb 2026 11:00:27 -0500 Subject: [PATCH 1/8] feat: add Message Flow Panel dev tool with adapter drop instrumentation MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds a developer-facing WebSocket message visualizer panel to the web UI, toggled via ⌥M or the StatusBar "Flow" button. Disabled by default. ## Panel features - Two-lane timeline: outbound (bridge→consumer) left, inbound (consumer→bridge) right - Color-coded pills per message type with expand/collapse for full JSON - Hover-to-connect: glowing SVG bezier lines between paired messages (permission_request↔response, tool_use groups) with latency badge - LIVE/PAUSED toggle with pending count, type filter, auto-scroll, clear - Resizable width (drag left edge, 360–1200px range) - No horizontal scroll: pills use min-w-0 + break-all for content wrapping ## Adapter drop instrumentation - New adapter_drop ConsumerMessage type in both type systems - session-reducer emits adapter_drop when permission_request subtype is unsupported (anything other than can_use_tool), instead of silent discard - Frontend handles adapter_drop: stored as message, shown as red pill ## Implementation - ws.ts: addFlowInboundListener / addFlowOutboundListener tap exports - store.ts: messageFlowOpen boolean (default false) - useMessageFlow: ring buffer (500 cap), pause/resume, pairing index - MessagePill, ConnectorOverlay, MessageFlowPanel components - ⌥M shortcut in useKeyboardShortcuts, Flow button in StatusBar --- .../2026-02-25-message-flow-panel-design.md | 322 ++++++++++++++++++ shared/consumer-types.ts | 6 + src/core/session/session-reducer.ts | 10 + src/types/consumer-messages.ts | 6 + web/src/App.tsx | 7 + web/src/components/ConnectorOverlay.tsx | 81 +++++ web/src/components/MessageFlowPanel.test.tsx | 65 ++++ web/src/components/MessageFlowPanel.tsx | 262 ++++++++++++++ web/src/components/MessagePill.test.tsx | 104 ++++++ web/src/components/MessagePill.tsx | 87 +++++ web/src/components/StatusBar.tsx | 34 ++ web/src/hooks/useKeyboardShortcuts.ts | 7 + web/src/hooks/useMessageFlow.test.ts | 162 +++++++++ web/src/hooks/useMessageFlow.ts | 147 ++++++++ web/src/store.ts | 4 + web/src/ws.ts | 24 ++ 16 files changed, 1328 insertions(+) create mode 100644 docs/plans/2026-02-25-message-flow-panel-design.md create mode 100644 web/src/components/ConnectorOverlay.tsx create mode 100644 web/src/components/MessageFlowPanel.test.tsx create mode 100644 web/src/components/MessageFlowPanel.tsx create mode 100644 web/src/components/MessagePill.test.tsx create mode 100644 web/src/components/MessagePill.tsx create mode 100644 web/src/hooks/useMessageFlow.test.ts create mode 100644 web/src/hooks/useMessageFlow.ts diff --git a/docs/plans/2026-02-25-message-flow-panel-design.md b/docs/plans/2026-02-25-message-flow-panel-design.md new file mode 100644 index 00000000..e29a09ae --- /dev/null +++ b/docs/plans/2026-02-25-message-flow-panel-design.md @@ -0,0 +1,322 @@ +# Message Flow Panel — Design Doc + +**Date:** 2026-02-25 +**Status:** Approved, ready for implementation +**Review:** Passed Momus dual review (Claude + Gemini), issues fixed + +## Summary + +A developer-facing drawer panel in the BeamCode web UI that visualizes WebSocket message passing through the system in real time. Targeted at developers debugging adapter behavior, inspecting tool call chains, and verifying protocol correctness. + +## Aesthetic Direction: "Signal Wire" + +Oscilloscope / circuit trace aesthetic. Near-black background with subtle dot-grid texture. Monospace type throughout. Each message type assigned a saturated signal color. Glowing SVG connector lines draw between paired messages on hover. + +- **Font:** JetBrains Mono (content), Departure Mono or similar (header/labels) +- **Background:** `#0A0B0D` with dot-grid overlay +- **Connector lines:** cubic bezier SVG, animated with `stroke-dashoffset` draw on hover + +## Layout + +Two-lane layout with a vertical time axis in the center: + +``` +┌─────────────────────────────────────────────────────────────┐ +│ ◈ MESSAGE FLOW ● LIVE [All Types ▾] [↓] [⌫] │ +├───────────────────────┬────────────┬────────────────────────┤ +│ OUTBOUND │ │ INBOUND │ +│ bridge → consumer │ TIME ↓ │ consumer → bridge │ +├───────────────────────┼────────────┼────────────────────────┤ +│ [pill] ─────────────→│ 00:01.234 │ │ +│ │ 00:01.301 │←──────────── [pill] │ +│ [pill] ─────────────→│ 00:01.305 │ │ +└───────────────────────┴────────────┴────────────────────────┘ +``` + +- **OUTBOUND** (left lane): messages flowing bridge → consumer, received via `socket.onmessage` +- **INBOUND** (right lane): messages flowing consumer → bridge, sent via `ws.ts:send()` + +## Message Pill Anatomy + +``` +┌──────────────────────────────────────┐ +│▌ tool_use 00:01.234 ↗ [▾] │ +│ name: Bash │ +└──────────────────────────────────────┘ +``` + +- 3px left-edge color bar (type color) +- Type label (bold monospace) +- Timestamp: `elapsed ms` since first message in session (see Data Model) +- Direction arrow: ↗ outbound / ↙ inbound +- Expand chevron `[▾]` → full JSON view +- Hover: background lifts, connector activates + +## Message Type Color System + +Note: `tool_use`, `tool_result`, and `thinking` are **content block subtypes** nested inside +`assistant` messages — they do not appear as top-level `ConsumerMessage.type` values. +Top-level pills use `assistant` as the message type; content block subtypes are shown +in the expanded JSON view and in the pairing logic (which walks `content[]`). + +### Outbound top-level types (bridge → consumer) + +| Type | Color | Hex | +|------|-------|-----| +| `assistant` | Amber | `#F59E0B` | +| `stream_event` | Cyan | `#22D3EE` | +| `tool_progress` | Teal | `#14B8A6` | +| `tool_use_summary` | Sage | `#6EE7B7` | +| `status_change` | Violet | `#A78BFA` | +| `permission_request` | Coral | `#F97316` | +| `result` | Lime | `#84CC16` | +| `cli_connected` | Steel | `#94A3B8` | +| `cli_disconnected` | Muted red | `#F87171` | +| `error` | Red | `#EF4444` | +| `user_message` (echoed) | White | `#F8FAFC` | +| `message_queued` | Purple | `#C084FC` | +| `message_history` | Zinc | `#A1A1AA` | + +### Inbound top-level types (consumer → bridge, sent via `send()`) + +| Type | Color | Hex | +|------|-------|-----| +| `user_message` | White | `#F8FAFC` | +| `permission_response` | Coral lighter | `#FED7AA` | +| `interrupt` | Red | `#EF4444` | +| `slash_command` | Sky | `#38BDF8` | +| `queue_message` | Purple | `#C084FC` | +| `update_queued_message` | Purple lighter | `#E9D5FF` | +| `cancel_queued_message` | Muted red | `#F87171` | +| `set_model` | Zinc | `#A1A1AA` | +| `set_permission_mode` | Zinc | `#A1A1AA` | + +## Pairing Logic + +Related messages are highlighted on hover with a glowing connector line and latency badge. + +| Outbound | Paired Inbound | Match Key | Notes | +|----------|---------------|-----------|-------| +| `permission_request` | `permission_response` | `request.id` / `request_id` | 1:1 by ID | +| `assistant` (with `tool_use` block) | `tool_use_summary` or next `assistant` response | `content[].id` | Walk `content[]` for tool_use blocks | +| `tool_progress` | — | `tool_use_id` | Groups with originating `assistant` pill | +| `message_queued` | `update_queued_message` / `cancel_queued_message` / `queued_message_sent` | Session singleton | Only one queued message per session at a time; pair by temporal adjacency after `message_queued` outbound | + +**Note on `message_queued` pairing:** `message_queued` (outbound) carries `consumer_id` but +the inbound messages (`update_queued_message`, `cancel_queued_message`) do not carry a +matching field. Use session-singleton semantics: all inbound queue messages after a +`message_queued` event (until `queued_message_sent` or `queued_message_cancelled`) are +considered paired to that `message_queued` pill. + +**Hover behavior:** +1. Hovered pill + paired counterpart → full opacity; all others → 30% opacity +2. Curved SVG connector draws between them (cubic bezier, `stroke-dashoffset` animation, ~200ms) +3. Latency badge on connector midpoint: `+47ms` (difference in `wallTime` fields) + +For `tool_progress` groups: hover the parent `assistant` pill → all related `tool_progress` +pills glow in teal, connector runs to each. Mini badge shows: `3 progress events (1.2s)`. + +## Controls + +| Control | Behavior | +|---------|----------| +| `● LIVE` | Pulses when messages are flowing; click to **pause** — freezes display, keeps buffering to `pendingWhilePaused[]`, shows `● PAUSED +12` badge | +| `[All Types ▾]` | Multiselect filter by message type; filtered-out types show as faint tick marks on the time axis | +| `[↓ auto-scroll]` | Toggle — when off, new messages append without scrolling the view | +| `[⌫ clear]` | Clears `flowMessages` ring buffer in the hook; no store mutation | + +**Keyboard shortcuts:** +- `⌥M` — toggle panel open/closed; registered in `App.tsx` via `useEffect` + `document.addEventListener` (same pattern as other global shortcuts in `App.tsx`) +- `Escape` — close panel; registered inside `MessageFlowPanel.tsx` via `useEffect` + `document.addEventListener` (same pattern as `LogDrawer.tsx:22-29`) + +**StatusBar entry point:** Add a `MessageFlowButton` to `web/src/components/StatusBar.tsx`, +positioned after the existing logs button, following the same rendering pattern. + +## Implementation + +### 1. Intercept Mechanism in `ws.ts` + +Add a module-level listener registry **after** the existing module-level state (after line 17): + +```ts +// ── Message flow tap (dev panel) ────────────────────────────────────────── +type FlowInboundListener = (sessionId: string, msg: ConsumerMessage) => void; +type FlowOutboundListener = (sessionId: string, msg: InboundMessage) => void; +const flowInboundListeners = new Set(); +const flowOutboundListeners = new Set(); + +export function addFlowInboundListener(cb: FlowInboundListener): () => void { + flowInboundListeners.add(cb); + return () => flowInboundListeners.delete(cb); +} + +export function addFlowOutboundListener(cb: FlowOutboundListener): () => void { + flowOutboundListeners.add(cb); + return () => flowOutboundListeners.delete(cb); +} +``` + +In `handleMessage`, after `const msg = parsed as ConsumerMessage;` (line 115), add: +```ts +for (const cb of flowInboundListeners) cb(sessionId, msg); +``` + +In `send()` (line 529), before `socket.send(...)`, add: +```ts +for (const cb of flowOutboundListeners) cb(targetId, message); +``` + +**Done when:** `addFlowInboundListener` and `addFlowOutboundListener` are exported and calling them with a callback fires it once per matching message in `ws.test.ts`. + +### 2. Store Additions in `store.ts` + +Add to the flat `AppState` interface (alongside `logDrawerOpen`): + +```ts +messageFlowOpen: boolean; +setMessageFlowOpen: (open: boolean) => void; +``` + +Implement in the `create()` call following the same pattern as `logDrawerOpen`. No localStorage persistence needed (reset on page load is fine for a dev panel). + +The ring buffer (`flowMessages`) lives entirely in `useMessageFlow`'s `useRef` — not in the store. This keeps the store free of dev-only state and avoids unnecessary global re-renders. + +**Done when:** `useStore.getState().messageFlowOpen` is `false` by default and `setMessageFlowOpen(true)` toggles it, verified by a store test following the pattern in `store.test.ts`. + +### 3. `useMessageFlow` Hook + +File: `web/src/hooks/useMessageFlow.ts` + +```ts +const MAX_FLOW_MESSAGES = 500; // hard cap on ring buffer + +interface UseMessageFlowResult { + messages: FlowMessage[]; + paused: boolean; + pendingCount: number; + setPaused: (v: boolean) => void; + clear: () => void; +} +``` + +- On mount: call `addFlowInboundListener` and `addFlowOutboundListener`, store cleanup in a `useEffect` return +- Each incoming message: `crypto.randomUUID()` for `id`, `Date.now()` for `wallTime`, `Date.now() - sessionStartRef.current` for `timestamp` (where `sessionStartRef` is set to `Date.now()` on the first message received) +- Ring buffer: if `messages.length >= MAX_FLOW_MESSAGES`, evict `messages[0]` before appending +- When paused: new messages go to a `pendingRef` array instead; on resume, flush pending into ring buffer (capped) +- Pairing index: `Map` (id → paired FlowMessage id), built incrementally as messages arrive + +**Done when:** Unit test in `useMessageFlow.test.ts` asserts: +1. Ring buffer evicts at 501 messages: `expect(result.current.messages).toHaveLength(500)` +2. Pause stops messages from appearing in `messages`; resume flushes them +3. `permission_request` message sets `pairedId` on the corresponding `permission_response` + +### 4. `ConnectorOverlay.tsx` + +File: `web/src/components/ConnectorOverlay.tsx` + +SVG coordinate strategy: +- `MessageFlowPanel` root div has `position: relative` +- `ConnectorOverlay` is `position: absolute; inset: 0; pointer-events: none; overflow: visible` +- Each `MessagePill` has a `data-flow-id={msg.id}` attribute on its root element +- On hover, the overlay calls `document.querySelector([data-flow-id="${pairedId}"])` and `getBoundingClientRect()` on both pills, then subtracts the panel container's `getBoundingClientRect()` to get local coordinates +- SVG `` uses cubic bezier: control points at `(panelMidX, pillA.centerY)` and `(panelMidX, pillB.centerY)` +- Animated with `stroke-dasharray` + `stroke-dashoffset` CSS transition, ~200ms ease-out + +**Done when:** Hovering a `permission_request` pill draws a visible line to its paired `permission_response` pill, verified manually in the browser (no unit test for coordinate math). + +### 5. `MessageFlowPanel.tsx` + +File: `web/src/components/MessageFlowPanel.tsx` + +- Reads `messageFlowOpen` and `currentSessionId` from store +- Returns `null` when `!messageFlowOpen || !currentSessionId` (same pattern as `LogDrawer.tsx:31`) +- Renders: top controls bar, two-column layout (left: outbound pills, center: time axis, right: inbound pills), `ConnectorOverlay` as absolute child +- Escape-to-close via `useEffect` + `document.addEventListener` (copy pattern from `LogDrawer.tsx:22-29`) + +Layout classnames follow the project's Tailwind v4 conventions (see other components for reference). + +**Done when:** Panel renders correctly when `messageFlowOpen = true`, pills appear in the correct lane, and Escape closes the panel. Verified by component test in `MessageFlowPanel.test.tsx`. + +### 6. `MessagePill.tsx` + +File: `web/src/components/MessagePill.tsx` + +Props: +```ts +interface MessagePillProps { + message: FlowMessage; + dimmed: boolean; // true when another pill is hovered + onHoverStart: () => void; // notify parent to activate connector + onHoverEnd: () => void; +} +``` + +- Color bar: 3px left border using inline style with the type's hex color +- Preview: first 60 chars of `JSON.stringify(message.payload)` +- Expand: controlled by local `useState`, shows full `
` JSON on expand
+- `data-flow-id={message.id}` on root element (required by ConnectorOverlay)
+
+**Done when:** Snapshot test in `MessagePill.test.tsx` renders without error for each direction (`"out"`, `"in"`) and all representative message types.
+
+### 7. App.tsx Wiring
+
+In `web/src/App.tsx`, add alongside the existing `LogDrawer`:
+
+1. Import `MessageFlowPanel` and `setMessageFlowOpen` from store
+2. Add `⌥M` global shortcut in the top-level `useEffect` keyboard handler (or create one if absent):
+   ```ts
+   if (e.altKey && e.key === "m") {
+     useStore.getState().setMessageFlowOpen(!useStore.getState().messageFlowOpen);
+   }
+   ```
+3. Render `` as a sibling to `` in the JSX
+
+**Done when:** Pressing `⌥M` toggles the panel open/closed without errors.
+
+### 8. StatusBar Entry Point
+
+In `web/src/components/StatusBar.tsx`, add a `MessageFlowButton` component following the
+same pattern as the existing logs button. Position it adjacent to the logs button.
+
+**Done when:** Button is visible in the StatusBar and clicking it toggles `messageFlowOpen`.
+
+## Data Model
+
+```ts
+export const MAX_FLOW_MESSAGES = 500;
+
+export interface FlowMessage {
+  id: string;            // crypto.randomUUID() at capture time
+  direction: "out" | "in";
+  type: string;          // msg.type from ConsumerMessage or InboundMessage
+  payload: unknown;      // full original message object
+  timestamp: number;     // Date.now() - sessionStartMs (ms since first message)
+  wallTime: number;      // Date.now() at capture (used for latency badge)
+  pairedId?: string;     // id of the single paired FlowMessage (1:1 pairs)
+  groupIds?: string[];   // ids of related FlowMessages (1:N groups, e.g. tool_progress)
+}
+```
+
+`timestamp` is relative to the first message received in the current session (captured in
+`useMessageFlow` as `sessionStartRef = Date.now()` on first message). This is what drives
+the time axis labels. `wallTime` is absolute and used only for computing latency badges.
+
+## File Plan
+
+```
+web/src/components/
+  MessageFlowPanel.tsx        ← panel shell, two-lane layout, controls, Escape shortcut
+  MessageFlowPanel.test.tsx   ← renders correctly, Escape closes
+  MessagePill.tsx             ← pill: color bar, type, timestamp, preview, expand, data-flow-id
+  MessagePill.test.tsx        ← snapshot for each direction + representative types
+  ConnectorOverlay.tsx        ← position:absolute SVG layer, getBoundingClientRect on hover
+
+web/src/hooks/
+  useMessageFlow.ts           ← ring buffer (MAX_FLOW_MESSAGES), listeners, pause, pairing index
+  useMessageFlow.test.ts      ← ring cap, pause/resume, pairing logic
+
+web/src/ws.ts                 ← add addFlowInboundListener + addFlowOutboundListener exports
+web/src/store.ts              ← add messageFlowOpen + setMessageFlowOpen to AppState
+web/src/App.tsx               ← ⌥M shortcut +  render
+web/src/components/StatusBar.tsx ← MessageFlowButton
+```
diff --git a/shared/consumer-types.ts b/shared/consumer-types.ts
index 7c90fdf7..343dff39 100644
--- a/shared/consumer-types.ts
+++ b/shared/consumer-types.ts
@@ -314,6 +314,12 @@ export type ConsumerMessage =
       type: "session_lifecycle";
       subtype: string;
       metadata: Record;
+    }
+  | {
+      type: "adapter_drop";
+      reason: string;
+      dropped_type: string;
+      dropped_metadata?: Record;
     };
 
 // ── Inbound Messages (consumer → bridge) ────────────────────────────────────
diff --git a/src/core/session/session-reducer.ts b/src/core/session/session-reducer.ts
index 6d7cf9bc..1979e6b8 100644
--- a/src/core/session/session-reducer.ts
+++ b/src/core/session/session-reducer.ts
@@ -875,6 +875,16 @@ function buildEffects(
           eventType: "permission:requested",
           payload: { request: mapped.cliPerm },
         });
+      } else {
+        effects.push({
+          type: "BROADCAST",
+          message: {
+            type: "adapter_drop",
+            reason: `permission_request subtype '${String(message.metadata?.subtype ?? "unknown")}' is not supported (only 'can_use_tool' is handled)`,
+            dropped_type: "permission_request",
+            dropped_metadata: message.metadata as Record,
+          },
+        });
       }
       break;
     }
diff --git a/src/types/consumer-messages.ts b/src/types/consumer-messages.ts
index 6c5615d7..5bab836e 100644
--- a/src/types/consumer-messages.ts
+++ b/src/types/consumer-messages.ts
@@ -237,4 +237,10 @@ export type ConsumerMessage =
       type: "session_lifecycle";
       subtype: string;
       metadata: Record;
+    }
+  | {
+      type: "adapter_drop";
+      reason: string;
+      dropped_type: string;
+      dropped_metadata?: Record;
     };
diff --git a/web/src/App.tsx b/web/src/App.tsx
index f985904d..605433ef 100644
--- a/web/src/App.tsx
+++ b/web/src/App.tsx
@@ -2,6 +2,7 @@ import { Component, type ErrorInfo, type ReactNode, useEffect } from "react";
 import { listSessions } from "./api";
 import { ChatView } from "./components/ChatView";
 import { LogDrawer } from "./components/LogDrawer";
+import { MessageFlowPanel } from "./components/MessageFlowPanel";
 import { NewSessionDialog } from "./components/NewSessionDialog";
 import { QuickSwitcher } from "./components/QuickSwitcher";
 import { ShortcutsModal } from "./components/ShortcutsModal";
@@ -104,6 +105,7 @@ export default function App() {
   const sidebarOpen = useStore((s) => s.sidebarOpen);
   const taskPanelOpen = useStore((s) => s.taskPanelOpen);
   const logDrawerOpen = useStore((s) => s.logDrawerOpen);
+  const messageFlowOpen = useStore((s) => s.messageFlowOpen);
   const darkMode = useStore((s) => s.darkMode);
   const toggleSidebar = useStore((s) => s.toggleSidebar);
   const quickSwitcherOpen = useStore((s) => s.quickSwitcherOpen);
@@ -151,6 +153,11 @@ export default function App() {
         {logDrawerOpen && }
       
 
+      {/* Message flow panel */}
+      Flow error}>
+        {messageFlowOpen && }
+      
+
       
       Dialog error}>
         
diff --git a/web/src/components/ConnectorOverlay.tsx b/web/src/components/ConnectorOverlay.tsx
new file mode 100644
index 00000000..31970cff
--- /dev/null
+++ b/web/src/components/ConnectorOverlay.tsx
@@ -0,0 +1,81 @@
+import type { RefObject } from "react";
+import type { FlowMessage } from "./MessagePill";
+import { getColor } from "./MessagePill";
+
+interface ConnectorOverlayProps {
+  hoveredId: string | null;
+  messages: FlowMessage[];
+  containerRef: RefObject;
+}
+
+const svgStyle = {
+  position: "absolute" as const,
+  inset: 0,
+  pointerEvents: "none" as const,
+  overflow: "visible" as const,
+  width: "100%",
+  height: "100%",
+};
+
+export function ConnectorOverlay({ hoveredId, messages, containerRef }: ConnectorOverlayProps) {
+  if (!hoveredId || !containerRef.current) {
+    return ;
+  }
+
+  const hovered = messages.find((m) => m.id === hoveredId);
+  if (!hovered?.pairedId) {
+    return ;
+  }
+
+  const paired = messages.find((m) => m.id === hovered.pairedId);
+  if (!paired) {
+    return ;
+  }
+
+  const container = containerRef.current;
+  const el1 = container.querySelector(`[data-flow-id="${hoveredId}"]`);
+  const el2 = container.querySelector(`[data-flow-id="${hovered.pairedId}"]`);
+  if (!el1 || !el2) {
+    return ;
+  }
+
+  const containerRect = container.getBoundingClientRect();
+  const rect1 = el1.getBoundingClientRect();
+  const rect2 = el2.getBoundingClientRect();
+
+  const x1 = rect1.left - containerRect.left + rect1.width / 2;
+  const y1 = rect1.top - containerRect.top + rect1.height / 2;
+  const x2 = rect2.left - containerRect.left + rect2.width / 2;
+  const y2 = rect2.top - containerRect.top + rect2.height / 2;
+  const midX = containerRect.width / 2;
+
+  const d = `M ${x1} ${y1} C ${midX} ${y1}, ${midX} ${y2}, ${x2} ${y2}`;
+  const color = getColor(hovered.type);
+  const latencyMs = Math.abs(hovered.wallTime - paired.wallTime);
+
+  return (
+    
+  );
+}
diff --git a/web/src/components/MessageFlowPanel.test.tsx b/web/src/components/MessageFlowPanel.test.tsx
new file mode 100644
index 00000000..08bcece4
--- /dev/null
+++ b/web/src/components/MessageFlowPanel.test.tsx
@@ -0,0 +1,65 @@
+import { cleanup, fireEvent, render, screen } from "@testing-library/react";
+import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
+import { useStore } from "../store";
+
+// Mock useMessageFlow to avoid pulling in ws.ts
+vi.mock("../hooks/useMessageFlow", () => ({
+  useMessageFlow: () => ({
+    messages: [],
+    paused: false,
+    pendingCount: 0,
+    setPaused: vi.fn(),
+    clear: vi.fn(),
+  }),
+}));
+
+// Lazy import after mock is registered
+const { MessageFlowPanel } = await import("./MessageFlowPanel");
+
+// jsdom doesn't implement scrollIntoView
+Element.prototype.scrollIntoView = vi.fn();
+
+describe("MessageFlowPanel", () => {
+  beforeEach(() => {
+    useStore.setState({
+      messageFlowOpen: false,
+      currentSessionId: "test-session",
+    });
+  });
+
+  afterEach(cleanup);
+
+  it("returns null when messageFlowOpen is false", () => {
+    const { container } = render();
+    expect(container.firstChild).toBeNull();
+  });
+
+  it("renders when messageFlowOpen is true", () => {
+    useStore.setState({ messageFlowOpen: true });
+    render();
+    expect(screen.getByText("MESSAGE FLOW")).toBeTruthy();
+  });
+
+  it("returns null when currentSessionId is null", () => {
+    useStore.setState({ messageFlowOpen: true, currentSessionId: null });
+    const { container } = render();
+    expect(container.firstChild).toBeNull();
+  });
+
+  it("closes on Escape key", () => {
+    useStore.setState({ messageFlowOpen: true });
+    render();
+    expect(screen.getByText("MESSAGE FLOW")).toBeTruthy();
+
+    fireEvent.keyDown(document, { key: "Escape" });
+    expect(useStore.getState().messageFlowOpen).toBe(false);
+  });
+
+  it("close button sets messageFlowOpen to false", () => {
+    useStore.setState({ messageFlowOpen: true });
+    render();
+    const closeBtn = screen.getByLabelText("Close message flow");
+    fireEvent.click(closeBtn);
+    expect(useStore.getState().messageFlowOpen).toBe(false);
+  });
+});
diff --git a/web/src/components/MessageFlowPanel.tsx b/web/src/components/MessageFlowPanel.tsx
new file mode 100644
index 00000000..8bdabcf1
--- /dev/null
+++ b/web/src/components/MessageFlowPanel.tsx
@@ -0,0 +1,262 @@
+import { useCallback, useEffect, useMemo, useRef, useState } from "react";
+import { useMessageFlow } from "../hooks/useMessageFlow";
+import { useStore } from "../store";
+import { ConnectorOverlay } from "./ConnectorOverlay";
+import { MessagePill } from "./MessagePill";
+
+export function MessageFlowPanel() {
+  const messageFlowOpen = useStore((s) => s.messageFlowOpen);
+  const setMessageFlowOpen = useStore((s) => s.setMessageFlowOpen);
+  const currentSessionId = useStore((s) => s.currentSessionId);
+  const { messages, paused, pendingCount, setPaused, clear } = useMessageFlow(currentSessionId);
+
+  const [hoveredId, setHoveredId] = useState(null);
+  const [filterTypes, setFilterTypes] = useState>(new Set());
+  const [autoScroll, setAutoScroll] = useState(true);
+  const [filterOpen, setFilterOpen] = useState(false);
+  const [panelWidth, setPanelWidth] = useState(560);
+  const containerRef = useRef(null);
+  const bottomRef = useRef(null);
+
+  const handleResizeMouseDown = useCallback(
+    (e: React.MouseEvent) => {
+      e.preventDefault();
+      const startX = e.clientX;
+      const startWidth = panelWidth;
+      const onMove = (me: MouseEvent) => {
+        const delta = startX - me.clientX; // drag left = grow
+        setPanelWidth(Math.max(360, Math.min(1200, startWidth + delta)));
+      };
+      const onUp = () => {
+        document.removeEventListener("mousemove", onMove);
+        document.removeEventListener("mouseup", onUp);
+        document.body.style.cursor = "";
+        document.body.style.userSelect = "";
+      };
+      document.body.style.cursor = "col-resize";
+      document.body.style.userSelect = "none";
+      document.addEventListener("mousemove", onMove);
+      document.addEventListener("mouseup", onUp);
+    },
+    [panelWidth],
+  );
+
+  // Escape to close
+  useEffect(() => {
+    if (!messageFlowOpen) return;
+    function onKeyDown(e: KeyboardEvent) {
+      if (e.key === "Escape") setMessageFlowOpen(false);
+    }
+    document.addEventListener("keydown", onKeyDown);
+    return () => document.removeEventListener("keydown", onKeyDown);
+  }, [messageFlowOpen, setMessageFlowOpen]);
+
+  // Auto-scroll
+  // biome-ignore lint/correctness/useExhaustiveDependencies: intentional — scroll on new messages
+  useEffect(() => {
+    if (autoScroll && !paused) bottomRef.current?.scrollIntoView({ behavior: "smooth" });
+  }, [messages.length, autoScroll, paused]);
+
+  const allTypes = useMemo(() => {
+    const types = new Set();
+    for (const m of messages) types.add(m.type);
+    return [...types].sort();
+  }, [messages]);
+
+  if (!messageFlowOpen || !currentSessionId) return null;
+
+  const filtered =
+    filterTypes.size > 0 ? messages.filter((m) => filterTypes.has(m.type)) : messages;
+
+  const hoveredMsg = hoveredId ? messages.find((m) => m.id === hoveredId) : null;
+  const pairedIdOfHovered = hoveredMsg?.pairedId ?? null;
+
+  function toggleFilter(type: string) {
+    setFilterTypes((prev) => {
+      const next = new Set(prev);
+      if (next.has(type)) next.delete(type);
+      else next.add(type);
+      return next;
+    });
+  }
+
+  return (
+    
+ {/* Resize handle on left edge */} + {/* biome-ignore lint/a11y/useSemanticElements: resize handle requires div */} + {/* biome-ignore lint/a11y/useFocusableInteractive: mouse-only resize handle */} +
+ {/* Header */} +
+ + MESSAGE FLOW + + + {/* Live/Paused badge */} + + +
+ {/* Filter dropdown */} +
+ + {filterOpen && ( +
+ {allTypes.map((type) => ( + + ))} + {allTypes.length === 0 && ( + + No messages yet + + )} +
+ )} +
+ + {/* Auto-scroll toggle */} + + + {/* Clear */} + + + {/* Close */} + +
+
+ + {/* Column headers */} +
+ + Outbound ↗ + + + + Inbound ↙ + +
+ + {/* Scrollable body */} +
+ + + {filtered.length === 0 ? ( +
+ Waiting for messages… +
+ ) : ( +
+ {filtered.map((msg) => { + const isDimmed = + hoveredId !== null && msg.id !== hoveredId && msg.id !== pairedIdOfHovered; + return ( +
+ {msg.direction === "out" ? ( + <> + setHoveredId(msg.id)} + onHoverEnd={() => setHoveredId(null)} + /> + + +{msg.timestamp} + +
+ + ) : ( + <> +
+ + +{msg.timestamp} + + setHoveredId(msg.id)} + onHoverEnd={() => setHoveredId(null)} + /> + + )} +
+ ); + })} +
+ )} +
+
+
+ ); +} diff --git a/web/src/components/MessagePill.test.tsx b/web/src/components/MessagePill.test.tsx new file mode 100644 index 00000000..f1896252 --- /dev/null +++ b/web/src/components/MessagePill.test.tsx @@ -0,0 +1,104 @@ +import { render, screen } from "@testing-library/react"; +import userEvent from "@testing-library/user-event"; +import { describe, expect, it, vi } from "vitest"; +import { type FlowMessage, MessagePill } from "./MessagePill"; + +function makeMessage(overrides: Partial = {}): FlowMessage { + return { + id: "msg-1", + direction: "out", + type: "assistant", + payload: { text: "hello" }, + timestamp: 1234, + wallTime: Date.now(), + ...overrides, + }; +} + +const noop = () => {}; + +describe("MessagePill", () => { + it('renders with data-flow-id for direction "out"', () => { + const msg = makeMessage({ direction: "out" }); + const { container } = render( + , + ); + const root = container.firstElementChild as HTMLElement; + expect(root.getAttribute("data-flow-id")).toBe("msg-1"); + }); + + it('shows ↙ arrow for direction "in"', () => { + const msg = makeMessage({ direction: "in" }); + render(); + expect(screen.getByText("↙")).toBeInTheDocument(); + }); + + it("shows ↗ arrow for direction out", () => { + const msg = makeMessage({ direction: "out" }); + render(); + expect(screen.getByText("↗")).toBeInTheDocument(); + }); + + it("applies opacity-30 class when dimmed", () => { + const msg = makeMessage(); + const { container } = render( + , + ); + const root = container.firstElementChild as HTMLElement; + expect(root.className).toContain("opacity-30"); + }); + + it("does not apply opacity-30 when not dimmed", () => { + const msg = makeMessage(); + const { container } = render( + , + ); + const root = container.firstElementChild as HTMLElement; + expect(root.className).not.toContain("opacity-30"); + }); + + it("toggles expand/collapse on click", async () => { + const user = userEvent.setup(); + const msg = makeMessage({ payload: { key: "value" } }); + render(); + + // Initially collapsed — toggle button shows [▾] + const toggle = screen.getByText("[▾]"); + expect(toggle).toBeInTheDocument(); + + await user.click(toggle); + // Now expanded — toggle shows [▴] + expect(screen.getByText("[▴]")).toBeInTheDocument(); + + await user.click(screen.getByText("[▴]")); + expect(screen.getByText("[▾]")).toBeInTheDocument(); + }); + + it("calls onHoverStart and onHoverEnd", async () => { + const user = userEvent.setup(); + const onHoverStart = vi.fn(); + const onHoverEnd = vi.fn(); + const msg = makeMessage(); + const { container } = render( + , + ); + const root = container.firstElementChild as HTMLElement; + + await user.hover(root); + expect(onHoverStart).toHaveBeenCalled(); + + await user.unhover(root); + expect(onHoverEnd).toHaveBeenCalled(); + }); + + it("displays timestamp formatted as +{n}ms", () => { + const msg = makeMessage({ timestamp: 5678 }); + render(); + expect(screen.getByText("+5678ms")).toBeInTheDocument(); + }); +}); diff --git a/web/src/components/MessagePill.tsx b/web/src/components/MessagePill.tsx new file mode 100644 index 00000000..a09fe2f1 --- /dev/null +++ b/web/src/components/MessagePill.tsx @@ -0,0 +1,87 @@ +import { useState } from "react"; + +export interface FlowMessage { + id: string; + direction: "out" | "in"; + type: string; + payload: unknown; + timestamp: number; + wallTime: number; + pairedId?: string; + groupIds?: string[]; +} + +const COLOR_MAP: Record = { + assistant: "#F59E0B", + stream_event: "#22D3EE", + tool_progress: "#14B8A6", + tool_use_summary: "#6EE7B7", + status_change: "#A78BFA", + permission_request: "#F97316", + result: "#84CC16", + cli_connected: "#94A3B8", + cli_disconnected: "#F87171", + error: "#EF4444", + user_message: "#F8FAFC", + message_queued: "#C084FC", + permission_response: "#FED7AA", + interrupt: "#EF4444", + slash_command: "#38BDF8", + queue_message: "#C084FC", + update_queued_message: "#E9D5FF", + cancel_queued_message: "#F87171", + adapter_drop: "#EF4444", +}; + +const DEFAULT_COLOR = "#71717A"; + +export function getColor(type: string): string { + return COLOR_MAP[type] ?? DEFAULT_COLOR; +} + +interface MessagePillProps { + message: FlowMessage; + dimmed: boolean; + onHoverStart: () => void; + onHoverEnd: () => void; +} + +export function MessagePill({ message, dimmed, onHoverStart, onHoverEnd }: MessagePillProps) { + const [expanded, setExpanded] = useState(false); + const color = getColor(message.type); + const payloadStr = JSON.stringify(message.payload); + const preview = payloadStr.length > 80 ? `${payloadStr.slice(0, 80)}…` : payloadStr; + + return ( + // biome-ignore lint/a11y/noStaticElementInteractions: pill is a visual dev tool element, not interactive UI +
+
+ + {message.type} + + +{message.timestamp}ms + {message.direction === "out" ? "↗" : "↙"} + +
+ {expanded ? ( +
+          {JSON.stringify(message.payload, null, 2)}
+        
+ ) : ( + {preview} + )} +
+ ); +} diff --git a/web/src/components/StatusBar.tsx b/web/src/components/StatusBar.tsx index f3a1b6f9..a9a7a9a2 100644 --- a/web/src/components/StatusBar.tsx +++ b/web/src/components/StatusBar.tsx @@ -320,6 +320,39 @@ function LogsButton() { ); } +// ── Message Flow Button ────────────────────────────────────────────────────── + +function MessageFlowButton() { + const messageFlowOpen = useStore((s) => s.messageFlowOpen); + const setMessageFlowOpen = useStore((s) => s.setMessageFlowOpen); + + return ( + + ); +} + // ── Status Bar ─────────────────────────────────────────────────────────────── export function StatusBar() { @@ -402,6 +435,7 @@ export function StatusBar() { )} + {adapterType && } diff --git a/web/src/hooks/useKeyboardShortcuts.ts b/web/src/hooks/useKeyboardShortcuts.ts index f66f5343..aa1488dc 100644 --- a/web/src/hooks/useKeyboardShortcuts.ts +++ b/web/src/hooks/useKeyboardShortcuts.ts @@ -59,6 +59,13 @@ export function useKeyboardShortcuts(): void { } } + // Alt+M: toggle message flow panel + if (e.altKey && (e.key === "m" || e.key === "M")) { + e.preventDefault(); + state.setMessageFlowOpen(!state.messageFlowOpen); + return; + } + // ?: open shortcuts modal (only when not typing in input) if (e.key === "?" && !meta && !e.altKey && !isInputFocused()) { e.preventDefault(); diff --git a/web/src/hooks/useMessageFlow.test.ts b/web/src/hooks/useMessageFlow.test.ts new file mode 100644 index 00000000..199d29f4 --- /dev/null +++ b/web/src/hooks/useMessageFlow.test.ts @@ -0,0 +1,162 @@ +import { act, renderHook } from "@testing-library/react"; +import { beforeEach, describe, expect, it, vi } from "vitest"; +import type { ConsumerMessage, InboundMessage } from "../../../shared/consumer-types"; +import { MAX_FLOW_MESSAGES, useMessageFlow } from "./useMessageFlow"; + +// Capture the listener callbacks registered via ws.ts exports +type InboundCb = (sessionId: string, msg: ConsumerMessage) => void; +type OutboundCb = (sessionId: string, msg: InboundMessage) => void; + +const { inboundListeners, outboundListeners } = vi.hoisted(() => ({ + inboundListeners: new Set(), + outboundListeners: new Set(), +})); + +vi.mock("../ws", () => ({ + addFlowInboundListener: (cb: InboundCb) => { + inboundListeners.add(cb); + return () => inboundListeners.delete(cb); + }, + addFlowOutboundListener: (cb: OutboundCb) => { + outboundListeners.add(cb); + return () => outboundListeners.delete(cb); + }, +})); + +const SESSION = "test-session"; + +function fireInbound(msg: Partial & { type: string }) { + for (const cb of inboundListeners) cb(SESSION, msg as ConsumerMessage); +} + +function fireOutbound(msg: Partial & { type: string }) { + for (const cb of outboundListeners) cb(SESSION, msg as InboundMessage); +} + +describe("useMessageFlow", () => { + beforeEach(() => { + inboundListeners.clear(); + outboundListeners.clear(); + vi.stubGlobal("crypto", { randomUUID: () => `uuid-${++uuidCounter}` }); + uuidCounter = 0; + }); + + let uuidCounter = 0; + + it("captures inbound and outbound messages", () => { + const { result } = renderHook(() => useMessageFlow(SESSION)); + expect(result.current.messages).toHaveLength(0); + + act(() => fireInbound({ type: "assistant" })); + expect(result.current.messages).toHaveLength(1); + expect(result.current.messages[0].direction).toBe("in"); + + act(() => fireOutbound({ type: "user_message" })); + expect(result.current.messages).toHaveLength(2); + expect(result.current.messages[1].direction).toBe("out"); + }); + + it("ring buffer caps at MAX_FLOW_MESSAGES", () => { + const { result } = renderHook(() => useMessageFlow(SESSION)); + + act(() => { + for (let i = 0; i < MAX_FLOW_MESSAGES + 1; i++) { + fireInbound({ type: "assistant" }); + } + }); + + expect(result.current.messages).toHaveLength(MAX_FLOW_MESSAGES); + // First message should have been evicted; second message should be first + expect(result.current.messages[0].id).toBe("uuid-2"); + }); + + it("pause/resume: messages during pause appear on resume", () => { + const { result } = renderHook(() => useMessageFlow(SESSION)); + + act(() => fireInbound({ type: "assistant" })); + expect(result.current.messages).toHaveLength(1); + + // Pause + act(() => result.current.setPaused(true)); + expect(result.current.paused).toBe(true); + + // Messages during pause go to pending + act(() => { + fireInbound({ type: "stream_event" }); + fireInbound({ type: "result" }); + }); + expect(result.current.messages).toHaveLength(1); + expect(result.current.pendingCount).toBe(2); + + // Resume flushes pending + act(() => result.current.setPaused(false)); + expect(result.current.paused).toBe(false); + expect(result.current.messages).toHaveLength(3); + expect(result.current.pendingCount).toBe(0); + }); + + it("permission_request/permission_response pairing sets pairedId", () => { + const { result } = renderHook(() => useMessageFlow(SESSION)); + + // Inbound permission_request + act(() => + fireInbound({ + type: "permission_request", + request: { id: "req-1", tool: "Bash", input: {} }, + } as unknown as ConsumerMessage), + ); + + const reqMsg = result.current.messages[0]; + expect(reqMsg.type).toBe("permission_request"); + + // Outbound permission_response + act(() => + fireOutbound({ + type: "permission_response", + request_id: "req-1", + allowed: true, + } as unknown as InboundMessage), + ); + + const resMsg = result.current.messages[1]; + expect(resMsg.type).toBe("permission_response"); + + // Both should be paired + expect(resMsg.pairedId).toBe(reqMsg.id); + expect(reqMsg.pairedId).toBe(resMsg.id); + }); + + it("ignores messages for other sessions", () => { + const { result } = renderHook(() => useMessageFlow(SESSION)); + + act(() => { + for (const cb of inboundListeners) + cb("other-session", { type: "assistant" } as ConsumerMessage); + }); + + expect(result.current.messages).toHaveLength(0); + }); + + it("clear() resets all state", () => { + const { result } = renderHook(() => useMessageFlow(SESSION)); + + act(() => { + fireInbound({ type: "assistant" }); + fireInbound({ type: "result" }); + }); + expect(result.current.messages).toHaveLength(2); + + act(() => result.current.clear()); + expect(result.current.messages).toHaveLength(0); + }); + + it("cleans up listeners on unmount", () => { + const { unmount } = renderHook(() => useMessageFlow(SESSION)); + expect(inboundListeners.size).toBe(1); + expect(outboundListeners.size).toBe(1); + + unmount(); + expect(inboundListeners.size).toBe(0); + expect(outboundListeners.size).toBe(0); + }); +}); diff --git a/web/src/hooks/useMessageFlow.ts b/web/src/hooks/useMessageFlow.ts new file mode 100644 index 00000000..fd570e5c --- /dev/null +++ b/web/src/hooks/useMessageFlow.ts @@ -0,0 +1,147 @@ +import { useCallback, useEffect, useRef, useState } from "react"; +import type { FlowMessage } from "../components/MessagePill"; +import { addFlowInboundListener, addFlowOutboundListener } from "../ws"; + +export const MAX_FLOW_MESSAGES = 500; + +interface UseMessageFlowResult { + messages: FlowMessage[]; + paused: boolean; + pendingCount: number; + setPaused: (v: boolean) => void; + clear: () => void; +} + +export function useMessageFlow(sessionId: string | null): UseMessageFlowResult { + const [, setTick] = useState(0); + const [paused, setPausedState] = useState(false); + + const bufferRef = useRef([]); + const pendingRef = useRef([]); + const pausedRef = useRef(false); + const sessionStartRef = useRef(null); + const pairingIndexRef = useRef(new Map()); + + const forceRender = useCallback(() => setTick((n) => n + 1), []); + + const appendToBuffer = useCallback((msg: FlowMessage) => { + const buf = bufferRef.current; + if (buf.length >= MAX_FLOW_MESSAGES) { + buf.shift(); + } + buf.push(msg); + }, []); + + const buildFlowMessage = useCallback( + (direction: "in" | "out", type: string, payload: unknown): FlowMessage => { + const now = Date.now(); + if (sessionStartRef.current === null) { + sessionStartRef.current = now; + } + return { + id: crypto.randomUUID(), + direction, + type, + payload, + wallTime: now, + timestamp: now - sessionStartRef.current, + }; + }, + [], + ); + + const handlePairing = useCallback((msg: FlowMessage) => { + const index = pairingIndexRef.current; + const p = msg.payload as Record; + + if (msg.direction === "out" && msg.type === "permission_response") { + const requestId = p.request_id as string | undefined; + if (requestId) { + const pairedFlowId = index.get(requestId); + if (pairedFlowId) { + msg.pairedId = pairedFlowId; + const partner = bufferRef.current.find((m) => m.id === pairedFlowId); + if (partner) partner.pairedId = msg.id; + // Also check pending + const pendingPartner = pendingRef.current.find((m) => m.id === pairedFlowId); + if (pendingPartner) pendingPartner.pairedId = msg.id; + } + } + } + + if (msg.direction === "in" && msg.type === "permission_request") { + const request = p.request as Record | undefined; + const requestId = request?.id as string | undefined; + if (requestId) { + index.set(requestId, msg.id); + } + } + }, []); + + const ingest = useCallback( + (msg: FlowMessage) => { + handlePairing(msg); + if (pausedRef.current) { + pendingRef.current.push(msg); + forceRender(); + } else { + appendToBuffer(msg); + forceRender(); + } + }, + [handlePairing, appendToBuffer, forceRender], + ); + + const setPaused = useCallback( + (v: boolean) => { + pausedRef.current = v; + setPausedState(v); + if (!v) { + // Flush pending into ring buffer + for (const msg of pendingRef.current) { + appendToBuffer(msg); + } + pendingRef.current = []; + forceRender(); + } + }, + [appendToBuffer, forceRender], + ); + + const clear = useCallback(() => { + bufferRef.current = []; + pendingRef.current = []; + sessionStartRef.current = null; + pairingIndexRef.current.clear(); + forceRender(); + }, [forceRender]); + + useEffect(() => { + if (!sessionId) return; + + const removeInbound = addFlowInboundListener((sid, msg) => { + if (sid !== sessionId) return; + const flowMsg = buildFlowMessage("in", msg.type, msg); + ingest(flowMsg); + }); + + const removeOutbound = addFlowOutboundListener((sid, msg) => { + if (sid !== sessionId) return; + const flowMsg = buildFlowMessage("out", msg.type, msg); + ingest(flowMsg); + }); + + return () => { + removeInbound(); + removeOutbound(); + }; + }, [sessionId, buildFlowMessage, ingest]); + + return { + messages: bufferRef.current, + paused, + pendingCount: pendingRef.current.length, + setPaused, + clear, + }; +} diff --git a/web/src/store.ts b/web/src/store.ts index 8418051e..9cf6ef22 100644 --- a/web/src/store.ts +++ b/web/src/store.ts @@ -106,6 +106,7 @@ export interface AppState { toasts: Toast[]; processLogs: Record; logDrawerOpen: boolean; + messageFlowOpen: boolean; // Actions setCurrentSession: (id: string) => void; @@ -124,6 +125,7 @@ export interface AppState { removeToast: (id: string) => void; appendProcessLog: (sessionId: string, line: string) => void; setLogDrawerOpen: (open: boolean) => void; + setMessageFlowOpen: (open: boolean) => void; // Session data actions ensureSessionData: (id: string) => void; @@ -296,6 +298,7 @@ export const useStore = create()((set, get) => ({ toasts: [], processLogs: {}, logDrawerOpen: false, + messageFlowOpen: false, setCurrentSession: (id) => set({ currentSessionId: id }), toggleSidebar: () => @@ -353,6 +356,7 @@ export const useStore = create()((set, get) => ({ }), setLogDrawerOpen: (open) => set({ logDrawerOpen: open }), + setMessageFlowOpen: (open) => set({ messageFlowOpen: open }), ensureSessionData: (id) => { if (!get().sessionData[id]) { diff --git a/web/src/ws.ts b/web/src/ws.ts index 1d75e772..0f73aabf 100644 --- a/web/src/ws.ts +++ b/web/src/ws.ts @@ -16,6 +16,22 @@ const reconnectState = new Map< >(); const MAX_RECONNECT_DELAY = 30_000; +// ── Message flow tap (dev panel) ────────────────────────────────────────── +type FlowInboundListener = (sessionId: string, msg: ConsumerMessage) => void; +type FlowOutboundListener = (sessionId: string, msg: InboundMessage) => void; +const flowInboundListeners = new Set(); +const flowOutboundListeners = new Set(); + +export function addFlowInboundListener(cb: FlowInboundListener): () => void { + flowInboundListeners.add(cb); + return () => flowInboundListeners.delete(cb); +} + +export function addFlowOutboundListener(cb: FlowOutboundListener): () => void { + flowOutboundListeners.add(cb); + return () => flowOutboundListeners.delete(cb); +} + // ── Streaming delta batching ────────────────────────────────────────────── // Coalesce rapid content_block_delta events into at most one Zustand set() // per animation frame, reducing re-renders from hundreds/s to ~60/s. @@ -113,6 +129,7 @@ function handleMessage(sessionId: string, data: string): void { // Guard: ensure parsed value is an object with a string `type` discriminant if (!parsed || typeof parsed !== "object" || !("type" in parsed)) return; const msg = parsed as ConsumerMessage; + for (const cb of flowInboundListeners) cb(sessionId, msg); store.ensureSessionData(sessionId); @@ -412,6 +429,10 @@ function handleMessage(sessionId: string, data: string): void { // Informational only — no store action needed break; + case "adapter_drop": + store.addMessage(sessionId, msg); + break; + default: { // Compile-time exhaustiveness guard; runtime remains forward-compatible. const _exhaustive: never = msg; @@ -531,6 +552,7 @@ export function send(message: InboundMessage, sessionId?: string): void { if (!targetId) return; const socket = connections.get(targetId); if (socket?.readyState === WebSocket.OPEN) { + for (const cb of flowOutboundListeners) cb(targetId, message); socket.send(JSON.stringify(message)); } } @@ -540,4 +562,6 @@ export function _resetForTesting(): void { disconnect(); pendingDeltas.clear(); flushScheduled = false; + flowInboundListeners.clear(); + flowOutboundListeners.clear(); } From 89f80401651408cce2964dbcfa34d786b43135b1 Mon Sep 17 00:00:00 2001 From: Teng Lin Date: Thu, 26 Feb 2026 11:37:12 -0500 Subject: [PATCH 2/8] feat: render thinking blocks during streaming and surface persisted output Fixes two UI rendering gaps discovered during live usage: - Thinking blocks now appear live during streaming (not just after completion) - Large tool output wrapped in is parsed and displayed with file path + preview --- web/src/components/AgentColumn.tsx | 7 +- web/src/components/AgentPane.tsx | 7 +- web/src/components/PersistedOutputBlock.tsx | 68 +++++++++++++++++++ .../components/StreamingIndicator.test.tsx | 20 ++++++ web/src/components/StreamingIndicator.tsx | 30 ++++++++ web/src/components/ToolResultBlock.test.tsx | 32 +++++++++ web/src/components/ToolResultBlock.tsx | 10 +++ web/src/store.test.ts | 35 ++++++++++ web/src/store.ts | 34 +++++++++- web/src/utils/persisted-output.test.ts | 40 +++++++++++ web/src/utils/persisted-output.ts | 44 ++++++++++++ web/src/ws.test.ts | 63 +++++++++++++++++ web/src/ws.ts | 39 ++++++++++- 13 files changed, 425 insertions(+), 4 deletions(-) create mode 100644 web/src/components/PersistedOutputBlock.tsx create mode 100644 web/src/utils/persisted-output.test.ts create mode 100644 web/src/utils/persisted-output.ts diff --git a/web/src/components/AgentColumn.tsx b/web/src/components/AgentColumn.tsx index 6dfcfea0..8e03f007 100644 --- a/web/src/components/AgentColumn.tsx +++ b/web/src/components/AgentColumn.tsx @@ -15,9 +15,14 @@ function AgentColumnStreamingIndicator({ agentId: string; }) { const stream = useStore((s) => s.sessionData[sessionId]?.agentStreaming[agentId]); - if (!stream?.text && !stream?.startedAt) return null; + if (!stream?.text && !stream?.thinking && !stream?.startedAt) return null; return (
+ {stream.thinking && ( +
+          {stream.thinking}
+        
+ )} {stream.text && }
diff --git a/web/src/components/AgentPane.tsx b/web/src/components/AgentPane.tsx index 854edcdd..5a086e9c 100644 --- a/web/src/components/AgentPane.tsx +++ b/web/src/components/AgentPane.tsx @@ -13,9 +13,14 @@ import { MessageBubble } from "./MessageBubble"; function AgentStreamingIndicator({ sessionId, agentId }: { sessionId: string; agentId: string }) { const stream = useStore((s) => s.sessionData[sessionId]?.agentStreaming[agentId]); - if (!stream?.text && !stream?.startedAt) return null; + if (!stream?.text && !stream?.thinking && !stream?.startedAt) return null; return (
+ {stream.thinking && ( +
+          {stream.thinking}
+        
+ )} {stream.text && }
diff --git a/web/src/components/PersistedOutputBlock.tsx b/web/src/components/PersistedOutputBlock.tsx new file mode 100644 index 00000000..72ed2fe7 --- /dev/null +++ b/web/src/components/PersistedOutputBlock.tsx @@ -0,0 +1,68 @@ +import { useState } from "react"; +import type { ParsedPersistedOutput } from "../utils/persisted-output"; +import { CopyButton } from "./CopyButton"; + +interface PersistedOutputBlockProps { + parsed: ParsedPersistedOutput; + isError?: boolean; +} + +const MAX_PREVIEW_LINES = 30; + +export function PersistedOutputBlock({ parsed, isError }: PersistedOutputBlockProps) { + const [expanded, setExpanded] = useState(false); + const lines = parsed.preview.split("\n"); + const truncated = lines.length > MAX_PREVIEW_LINES; + const displayed = expanded ? parsed.preview : lines.slice(0, MAX_PREVIEW_LINES).join("\n"); + + return ( +
+ {/* Header banner */} +
+ + + Output truncated ({parsed.size}) — saved to{" "} + + {parsed.filePath} + + +
+ + {/* Preview content */} +
+ +
+          {displayed}
+        
+ {truncated && !expanded && ( + + )} +
+
+ ); +} diff --git a/web/src/components/StreamingIndicator.test.tsx b/web/src/components/StreamingIndicator.test.tsx index 22990322..4e9f8d55 100644 --- a/web/src/components/StreamingIndicator.test.tsx +++ b/web/src/components/StreamingIndicator.test.tsx @@ -44,6 +44,26 @@ describe("StreamingIndicator", () => { expect(bold.tagName).toBe("STRONG"); }); + it("renders streaming thinking content above text", () => { + store().ensureSessionData(SESSION); + store().setStreamingStarted(SESSION, Date.now()); + store().appendStreamingThinking(SESSION, "Let me think about this"); + store().setStreaming(SESSION, "Here is my answer"); + render(); + expect(screen.getByText("Thinking...")).toBeInTheDocument(); + expect(screen.getByText("Let me think about this")).toBeInTheDocument(); + expect(screen.getByText(/Here is my answer/)).toBeInTheDocument(); + }); + + it("renders thinking block without text content", () => { + store().ensureSessionData(SESSION); + store().setStreamingStarted(SESSION, Date.now()); + store().setSessionStatus(SESSION, "running"); + store().appendStreamingThinking(SESSION, "Pondering..."); + render(); + expect(screen.getByText("Pondering...")).toBeInTheDocument(); + }); + it("displays token count when available", () => { store().ensureSessionData(SESSION); store().setStreamingStarted(SESSION, Date.now()); diff --git a/web/src/components/StreamingIndicator.tsx b/web/src/components/StreamingIndicator.tsx index 0e19400f..97b28dbd 100644 --- a/web/src/components/StreamingIndicator.tsx +++ b/web/src/components/StreamingIndicator.tsx @@ -46,6 +46,7 @@ function useElapsed(startedAt: number | null): number | null { export function StreamingIndicator({ sessionId }: StreamingIndicatorProps) { const streaming = useStore((s) => s.sessionData[sessionId]?.streaming ?? null); + const streamingThinking = useStore((s) => s.sessionData[sessionId]?.streamingThinking ?? null); const streamingStartedAt = useStore((s) => s.sessionData[sessionId]?.streamingStartedAt ?? null); const streamingOutputTokens = useStore( (s) => s.sessionData[sessionId]?.streamingOutputTokens ?? 0, @@ -90,6 +91,35 @@ export function StreamingIndicator({ sessionId }: StreamingIndicatorProps) { return (
+ {streamingThinking && ( +
+
+ + Thinking... +
+
+            {streamingThinking}
+          
+
+ )} {streaming && }
diff --git a/web/src/components/ToolResultBlock.test.tsx b/web/src/components/ToolResultBlock.test.tsx index fb9dc5b6..17073f50 100644 --- a/web/src/components/ToolResultBlock.test.tsx +++ b/web/src/components/ToolResultBlock.test.tsx @@ -245,4 +245,36 @@ describe("ToolResultBlock", () => { expect(pre?.className).toContain("text-bc-error"); }); }); + + describe("persisted output", () => { + const persistedContent = ` +Output too large (57.8KB). Full output saved to: /tmp/output.txt + +Preview (first 2KB): +line 1 of preview +line 2 of preview +... +`; + + it("renders persisted output banner for Bash tool", () => { + render(); + expect(screen.getByText(/Output truncated/)).toBeInTheDocument(); + expect(screen.getByText("/tmp/output.txt")).toBeInTheDocument(); + }); + + it("renders preview content", () => { + render(); + expect(screen.getByText(/line 1 of preview/)).toBeInTheDocument(); + }); + + it("works for any tool name", () => { + render(); + expect(screen.getByText(/Output truncated/)).toBeInTheDocument(); + }); + + it("falls through to normal rendering for non-persisted content", () => { + render(); + expect(screen.queryByText(/Output truncated/)).not.toBeInTheDocument(); + }); + }); }); diff --git a/web/src/components/ToolResultBlock.tsx b/web/src/components/ToolResultBlock.tsx index bab8792f..24efc77a 100644 --- a/web/src/components/ToolResultBlock.tsx +++ b/web/src/components/ToolResultBlock.tsx @@ -1,10 +1,12 @@ import { useState } from "react"; import type { ConsumerContentBlock } from "../../../shared/consumer-types"; import { stripAnsi } from "../utils/ansi-strip"; +import { isPersistedOutput, parsePersistedOutput } from "../utils/persisted-output"; import { truncateLines } from "../utils/truncate"; import { containsUnifiedDiff } from "../utils/unified-diff"; import { CopyButton } from "./CopyButton"; import { MarkdownContent } from "./MarkdownContent"; +import { PersistedOutputBlock } from "./PersistedOutputBlock"; import { UnifiedDiffBlock } from "./UnifiedDiffBlock"; interface ToolResultBlockProps { @@ -180,6 +182,14 @@ function renderContent( content: string | ConsumerContentBlock[], isError?: boolean, ): React.ReactNode { + // Check for persisted-output wrapper before tool-specific rendering + if (typeof content === "string" && isPersistedOutput(text)) { + const parsed = parsePersistedOutput(text); + if (parsed) { + return ; + } + } + switch (toolName) { case "Bash": { const stripped = stripAnsi(text); diff --git a/web/src/store.test.ts b/web/src/store.test.ts index 682a893d..790a5686 100644 --- a/web/src/store.test.ts +++ b/web/src/store.test.ts @@ -442,11 +442,31 @@ describe("store", () => { }); }); + describe("streaming thinking", () => { + it("appendStreamingThinking accumulates thinking text", () => { + store().appendStreamingThinking(SESSION_ID, "Let me"); + store().appendStreamingThinking(SESSION_ID, " think..."); + expect(store().sessionData[SESSION_ID].streamingThinking).toBe("Let me think..."); + }); + + it("appendStreamingThinking initializes from null", () => { + store().appendStreamingThinking(SESSION_ID, "first"); + expect(store().sessionData[SESSION_ID].streamingThinking).toBe("first"); + }); + + it("clearStreaming also clears streamingThinking", () => { + store().appendStreamingThinking(SESSION_ID, "thinking"); + store().clearStreaming(SESSION_ID); + expect(store().sessionData[SESSION_ID].streamingThinking).toBeNull(); + }); + }); + describe("agent streaming", () => { it("initAgentStreaming creates entry", () => { store().initAgentStreaming(SESSION_ID, "agent-1"); const entry = store().sessionData[SESSION_ID].agentStreaming["agent-1"]; expect(entry.text).toBe(""); + expect(entry.thinking).toBeNull(); expect(entry.startedAt).toBeTypeOf("number"); expect(entry.outputTokens).toBe(0); }); @@ -492,6 +512,21 @@ describe("store", () => { expect(store().sessionData[SESSION_ID].agentStreaming["agent-1"]).toBeUndefined(); expect(store().sessionData[SESSION_ID].agentStreaming["agent-2"].text).toBe("a2"); }); + + it("appendAgentStreamingThinking accumulates thinking", () => { + store().initAgentStreaming(SESSION_ID, "agent-1"); + store().appendAgentStreamingThinking(SESSION_ID, "agent-1", "thought"); + expect(store().sessionData[SESSION_ID].agentStreaming["agent-1"].thinking).toBe("thought"); + }); + + it("appendAgentStreaming preserves thinking when text appended", () => { + store().initAgentStreaming(SESSION_ID, "agent-1"); + store().appendAgentStreamingThinking(SESSION_ID, "agent-1", "thought"); + store().appendAgentStreaming(SESSION_ID, "agent-1", "text"); + const entry = store().sessionData[SESSION_ID].agentStreaming["agent-1"]; + expect(entry.thinking).toBe("thought"); + expect(entry.text).toBe("text"); + }); }); describe("identity", () => { diff --git a/web/src/store.ts b/web/src/store.ts index 9cf6ef22..83061688 100644 --- a/web/src/store.ts +++ b/web/src/store.ts @@ -36,6 +36,7 @@ export interface SessionIdentity { export interface SessionData { messages: ConsumerMessage[]; streaming: string | null; + streamingThinking: string | null; streamingStartedAt: number | null; streamingOutputTokens: number; streamingBlocks: ConsumerContentBlock[]; @@ -55,6 +56,7 @@ export interface SessionData { string, { text: string | null; + thinking: string | null; startedAt: number | null; outputTokens: number; } @@ -133,6 +135,7 @@ export interface AppState { setMessages: (sessionId: string, messages: ConsumerMessage[]) => void; setStreaming: (sessionId: string, text: string | null) => void; appendStreaming: (sessionId: string, delta: string) => void; + appendStreamingThinking: (sessionId: string, delta: string) => void; setStreamingStarted: (sessionId: string, ts: number | null) => void; setStreamingOutputTokens: (sessionId: string, count: number) => void; setStreamingBlocks: (sessionId: string, blocks: ConsumerContentBlock[]) => void; @@ -169,6 +172,7 @@ export interface AppState { ) => void; initAgentStreaming: (sessionId: string, agentId: string) => void; appendAgentStreaming: (sessionId: string, agentId: string, delta: string) => void; + appendAgentStreamingThinking: (sessionId: string, agentId: string, delta: string) => void; setAgentStreamingOutputTokens: (sessionId: string, agentId: string, count: number) => void; clearAgentStreaming: (sessionId: string, agentId: string) => void; setQueuedMessage: (sessionId: string, msg: SessionData["queuedMessage"]) => void; @@ -194,6 +198,7 @@ function emptySessionData(): SessionData { return { messages: [], streaming: null, + streamingThinking: null, streamingStartedAt: null, streamingOutputTokens: 0, streamingBlocks: [], @@ -393,6 +398,14 @@ export const useStore = create()((set, get) => ({ return patchSession(s, sessionId, { streaming: (data.streaming ?? "") + delta }); }), + appendStreamingThinking: (sessionId, delta) => + set((s) => { + const data = s.sessionData[sessionId] ?? emptySessionData(); + return patchSession(s, sessionId, { + streamingThinking: (data.streamingThinking ?? "") + delta, + }); + }), + setStreamingStarted: (sessionId, ts) => set((s) => patchSession(s, sessionId, { streamingStartedAt: ts })), @@ -406,6 +419,7 @@ export const useStore = create()((set, get) => ({ set((s) => patchSession(s, sessionId, { streaming: null, + streamingThinking: null, streamingStartedAt: null, streamingOutputTokens: 0, streamingBlocks: [], @@ -475,7 +489,7 @@ export const useStore = create()((set, get) => ({ return patchSession(s, sessionId, { agentStreaming: { ...data.agentStreaming, - [agentId]: { text: "", startedAt: Date.now(), outputTokens: 0 }, + [agentId]: { text: "", thinking: null, startedAt: Date.now(), outputTokens: 0 }, }, }); }), @@ -489,6 +503,24 @@ export const useStore = create()((set, get) => ({ ...data.agentStreaming, [agentId]: { text: (current?.text ?? "") + delta, + thinking: current?.thinking ?? null, + startedAt: current?.startedAt ?? null, + outputTokens: current?.outputTokens ?? 0, + }, + }, + }); + }), + + appendAgentStreamingThinking: (sessionId, agentId, delta) => + set((s) => { + const data = s.sessionData[sessionId] ?? emptySessionData(); + const current = data.agentStreaming[agentId]; + return patchSession(s, sessionId, { + agentStreaming: { + ...data.agentStreaming, + [agentId]: { + text: current?.text ?? null, + thinking: (current?.thinking ?? "") + delta, startedAt: current?.startedAt ?? null, outputTokens: current?.outputTokens ?? 0, }, diff --git a/web/src/utils/persisted-output.test.ts b/web/src/utils/persisted-output.test.ts new file mode 100644 index 00000000..821de313 --- /dev/null +++ b/web/src/utils/persisted-output.test.ts @@ -0,0 +1,40 @@ +import { describe, expect, it } from "vitest"; +import { isPersistedOutput, parsePersistedOutput } from "./persisted-output"; + +const SAMPLE = ` +Output too large (57.8KB). Full output saved to: /tmp/output.txt + +Preview (first 2KB): +line 1 of preview +line 2 of preview +... +`; + +describe("isPersistedOutput", () => { + it("returns true for persisted-output wrapper", () => { + expect(isPersistedOutput(SAMPLE)).toBe(true); + }); + + it("returns false for plain text", () => { + expect(isPersistedOutput("just some output")).toBe(false); + }); + + it("returns false for empty string", () => { + expect(isPersistedOutput("")).toBe(false); + }); +}); + +describe("parsePersistedOutput", () => { + it("extracts size, filePath, and preview", () => { + const result = parsePersistedOutput(SAMPLE); + expect(result).not.toBeNull(); + expect(result!.size).toBe("57.8KB"); + expect(result!.filePath).toBe("/tmp/output.txt"); + expect(result!.preview).toContain("line 1 of preview"); + expect(result!.preview).toContain("line 2 of preview"); + }); + + it("returns null for non-matching text", () => { + expect(parsePersistedOutput("regular output")).toBeNull(); + }); +}); diff --git a/web/src/utils/persisted-output.ts b/web/src/utils/persisted-output.ts new file mode 100644 index 00000000..4ad2b533 --- /dev/null +++ b/web/src/utils/persisted-output.ts @@ -0,0 +1,44 @@ +export interface ParsedPersistedOutput { + /** Human-readable size string, e.g. "57.8KB" */ + size: string; + /** File path where full output was saved */ + filePath: string; + /** The truncated preview content */ + preview: string; +} + +const PERSISTED_OUTPUT_RE = /^\s*\n/; + +/** + * Returns true if the text is wrapped in `` tags. + */ +export function isPersistedOutput(text: string): boolean { + return PERSISTED_OUTPUT_RE.test(text); +} + +/** + * Parse a persisted-output block into its components. + * Returns null if the text doesn't match the expected format. + */ +export function parsePersistedOutput(text: string): ParsedPersistedOutput | null { + if (!isPersistedOutput(text)) return null; + + // Strip the wrapper tags + const inner = text + .replace(/^\s*\n/, "") + .replace(/\n<\/persisted-output>\s*$/, ""); + + // Parse header line: "Output too large (57.8KB). Full output saved to: /path/to/file.txt" + const headerMatch = inner.match( + /^Output too large \(([^)]+)\)\.\s*Full output saved to:\s*(.+)/m, + ); + + const size = headerMatch?.[1] ?? "unknown"; + const filePath = headerMatch?.[2]?.trim() ?? ""; + + // Extract preview content (everything after "Preview (first ...):\n") + const previewMatch = inner.match(/Preview \(first [^)]*\):\s*\n([\s\S]*?)(?:\n\.\.\.\s*$|$)/); + const preview = previewMatch?.[1] ?? ""; + + return { size, filePath, preview }; +} diff --git a/web/src/ws.test.ts b/web/src/ws.test.ts index e5a518f3..a4fad18d 100644 --- a/web/src/ws.test.ts +++ b/web/src/ws.test.ts @@ -634,6 +634,69 @@ describe("handleMessage", () => { expect(getSessionData()?.agentStreaming?.["agent-1"]?.text).toBe("agent text"); }); + // ── stream_event → content_block_delta (thinking) ───────────────────── + + it("stream_event thinking_delta: buffers thinking to main streaming", () => { + const ws = openSession(); + useStore.getState().setStreaming("s1", ""); + useStore.getState().setStreamingStarted("s1", Date.now()); + + ws.simulateMessage( + JSON.stringify({ + type: "stream_event", + event: { + type: "content_block_delta", + delta: { type: "thinking_delta", thinking: "Let me think" }, + }, + parent_tool_use_id: null, + }), + ); + + flushDeltas(); + expect(getSessionData()?.streamingThinking).toBe("Let me think"); + }); + + it("stream_event thinking_delta with agent: appends to agent thinking", () => { + const ws = openSession(); + useStore.getState().initAgentStreaming("s1", "agent-1"); + + ws.simulateMessage( + JSON.stringify({ + type: "stream_event", + event: { + type: "content_block_delta", + delta: { type: "thinking_delta", thinking: "agent thought" }, + }, + parent_tool_use_id: "agent-1", + }), + ); + + flushDeltas(); + expect(getSessionData()?.agentStreaming?.["agent-1"]?.thinking).toBe("agent thought"); + }); + + it("stream_event thinking_delta auto-inits streaming if needed", () => { + const ws = openSession(); + + ws.simulateMessage( + JSON.stringify({ + type: "stream_event", + event: { + type: "content_block_delta", + delta: { type: "thinking_delta", thinking: "thought" }, + }, + parent_tool_use_id: null, + }), + ); + + // Streaming should have been auto-initialized + expect(getSessionData()?.streaming).toBe(""); + expect(getSessionData()?.sessionStatus).toBe("running"); + + flushDeltas(); + expect(getSessionData()?.streamingThinking).toBe("thought"); + }); + // ── stream_event → message_delta ──────────────────────────────────────── it("stream_event message_delta: sets output tokens", () => { diff --git a/web/src/ws.ts b/web/src/ws.ts index 0f73aabf..a8914361 100644 --- a/web/src/ws.ts +++ b/web/src/ws.ts @@ -39,8 +39,12 @@ export function addFlowOutboundListener(cb: FlowOutboundListener): () => void { interface PendingDelta { /** Accumulated text for the main session streaming. */ main: string; + /** Accumulated thinking for the main session streaming. */ + mainThinking: string; /** Accumulated text per agent sub-stream. */ agents: Record; + /** Accumulated thinking per agent sub-stream. */ + agentsThinking: Record; } const pendingDeltas = new Map(); @@ -64,16 +68,22 @@ export function flushDeltas(): void { if (delta.main) { store.appendStreaming(sessionId, delta.main); } + if (delta.mainThinking) { + store.appendStreamingThinking(sessionId, delta.mainThinking); + } for (const [agentId, text] of Object.entries(delta.agents)) { store.appendAgentStreaming(sessionId, agentId, text); } + for (const [agentId, thinking] of Object.entries(delta.agentsThinking)) { + store.appendAgentStreamingThinking(sessionId, agentId, thinking); + } } } function bufferStreamingDelta(sessionId: string, agentId: string | null, text: string): void { let entry = pendingDeltas.get(sessionId); if (!entry) { - entry = { main: "", agents: {} }; + entry = { main: "", mainThinking: "", agents: {}, agentsThinking: {} }; pendingDeltas.set(sessionId, entry); } if (agentId) { @@ -84,6 +94,24 @@ function bufferStreamingDelta(sessionId: string, agentId: string | null, text: s scheduleDeltaFlush(); } +function bufferStreamingThinkingDelta( + sessionId: string, + agentId: string | null, + thinking: string, +): void { + let entry = pendingDeltas.get(sessionId); + if (!entry) { + entry = { main: "", mainThinking: "", agents: {}, agentsThinking: {} }; + pendingDeltas.set(sessionId, entry); + } + if (agentId) { + entry.agentsThinking[agentId] = (entry.agentsThinking[agentId] ?? "") + thinking; + } else { + entry.mainThinking += thinking; + } + scheduleDeltaFlush(); +} + function getConsumerId(): string { const key = "beamcode_consumer_id"; let id = localStorage.getItem(key); @@ -230,6 +258,15 @@ function handleMessage(sessionId: string, data: string): void { store.setSessionStatus(sessionId, "running"); } bufferStreamingDelta(sessionId, agentId, delta.text); + } else if (delta?.type === "thinking_delta" && delta.thinking) { + // Auto-init streaming if no message_start was received + const sd = useStore.getState().sessionData[sessionId]; + if (!agentId && sd?.streaming === null) { + store.setStreamingStarted(sessionId, Date.now()); + store.setStreaming(sessionId, ""); + store.setSessionStatus(sessionId, "running"); + } + bufferStreamingThinkingDelta(sessionId, agentId, delta.thinking); } break; } From 7442ac954e0095add43113a67021793be4801405 Mon Sep 17 00:00:00 2001 From: Teng Lin Date: Thu, 26 Feb 2026 12:07:38 -0500 Subject: [PATCH 3/8] feat: add translation boundary visualization and fix build errors MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Add translation_event ConsumerMessage type (T1–T4 boundary markers) - Emit EMIT_TRANSLATION effect in session-reducer for user_message and assistant/result messages, surfacing normalizeInbound and mapAssistantMessage translation boundaries - Add EMIT_TRANSLATION effect type and executor case - Handle translation_event in ws.ts switch and useMessageFlow hook, populating boundary metadata on FlowMessage for detailed panel view - Add detailLevel prop to MessagePill for compact/detailed display toggle; detailed mode shows translation boundary badge and native format inspector - Add T1→T4 toggle button in MessageFlowPanel header - Fix MessagePill.test.tsx: add missing detailLevel="compact" to all renders - Fix ws.ts exhaustive switch: add translation_event case alongside adapter_drop --- shared/consumer-types.ts | 10 +++ src/core/session/effect-executor.ts | 5 ++ src/core/session/effect-types.ts | 4 +- src/core/session/session-reducer.ts | 56 ++++++++++++++-- .../session/session-state-reducer.test.ts | 7 +- src/types/consumer-messages.ts | 10 +++ web/src/components/MessageFlowPanel.tsx | 13 ++++ web/src/components/MessagePill.test.tsx | 65 +++++++++++++++++-- web/src/components/MessagePill.tsx | 53 ++++++++++++++- web/src/hooks/useMessageFlow.ts | 27 +++++++- web/src/ws.ts | 1 + 11 files changed, 232 insertions(+), 19 deletions(-) diff --git a/shared/consumer-types.ts b/shared/consumer-types.ts index 343dff39..4b356db2 100644 --- a/shared/consumer-types.ts +++ b/shared/consumer-types.ts @@ -320,6 +320,16 @@ export type ConsumerMessage = reason: string; dropped_type: string; dropped_metadata?: Record; + } + | { + type: "translation_event"; + boundary: "T1" | "T2" | "T3" | "T4"; + translator: string; + from: { format: string; body: unknown }; + to: { format: string; body: unknown }; + traceId?: string; + timestamp: number; + sessionId: string; }; // ── Inbound Messages (consumer → bridge) ──────────────────────────────────── diff --git a/src/core/session/effect-executor.ts b/src/core/session/effect-executor.ts index 438efda8..82492e4d 100644 --- a/src/core/session/effect-executor.ts +++ b/src/core/session/effect-executor.ts @@ -94,6 +94,11 @@ export function executeEffects( case "SEND_TO_CONSUMER": deps.broadcaster.sendTo(effect.ws, effect.message); break; + + case "EMIT_TRANSLATION": + // Forward translation event to consumers for message flow panel visualization + deps.broadcaster.broadcast(session, effect.event); + break; } } catch (err) { // A failing effect must never abort subsequent effects. diff --git a/src/core/session/effect-types.ts b/src/core/session/effect-types.ts index 21efd2e0..f4ec7901 100644 --- a/src/core/session/effect-types.ts +++ b/src/core/session/effect-types.ts @@ -39,4 +39,6 @@ export type Effect = /** Resolve git info for the session (after seeding cwd). */ | { type: "RESOLVE_GIT_INFO" } /** Send a targeted message to a specific consumer WebSocket. */ - | { type: "SEND_TO_CONSUMER"; ws: WebSocketLike; message: ConsumerMessage }; + | { type: "SEND_TO_CONSUMER"; ws: WebSocketLike; message: ConsumerMessage } + /** Emit a translation event for message flow panel visualization (dev tool). */ + | { type: "EMIT_TRANSLATION"; event: ConsumerMessage }; diff --git a/src/core/session/session-reducer.ts b/src/core/session/session-reducer.ts index 1979e6b8..93af3502 100644 --- a/src/core/session/session-reducer.ts +++ b/src/core/session/session-reducer.ts @@ -588,12 +588,13 @@ function reduceInboundCommand( ); // Normalize message for backend send (pure — no I/O). - const baseUnified = normalizeInbound({ - type: "user_message", + const inboundMsg = { + type: "user_message" as const, content: command.content, session_id: data.backendSessionId || command.session_id || "", images: command.images, - }); + }; + const baseUnified = normalizeInbound(inboundMsg); if (!baseUnified) return [data, []]; // Apply slash passthrough trace context when present (always a complete group). @@ -610,6 +611,21 @@ function reduceInboundCommand( } : baseUnified; + // Emit T1 translation event for message flow panel + const t1Event: Effect = { + type: "EMIT_TRANSLATION", + event: { + type: "translation_event", + boundary: "T1", + translator: "normalizeInbound", + from: { format: "InboundMessage", body: inboundMsg }, + to: { format: "UnifiedMessage", body: unified }, + traceId: unified.metadata.trace_id as string | undefined, + timestamp: Date.now(), + sessionId: data.state.session_id, + }, + }; + const isConnected = data.lifecycle === "active" || data.lifecycle === "idle"; if (isConnected) { @@ -624,6 +640,7 @@ function reduceInboundCommand( [ { type: "BROADCAST", message: userMsg }, { type: "PERSIST_NOW" }, + t1Event, { type: "SEND_TO_BACKEND", message: unified }, ], ]; @@ -644,7 +661,7 @@ function reduceInboundCommand( messageHistory: nextHistory, pendingMessages: [...data.pendingMessages, unified], }, - [{ type: "BROADCAST", message: userMsg }, { type: "PERSIST_NOW" }], + [{ type: "BROADCAST", message: userMsg }, { type: "PERSIST_NOW" }, t1Event], ]; } @@ -824,6 +841,20 @@ function buildEffects( if (nextData.messageHistory !== prevData.messageHistory) { const mapped = mapAssistantMessage(message); if (mapped.type === "assistant") { + // Emit T4 translation event + effects.push({ + type: "EMIT_TRANSLATION", + event: { + type: "translation_event", + boundary: "T4", + translator: "mapAssistantMessage", + from: { format: "UnifiedMessage", body: message }, + to: { format: "ConsumerMessage", body: mapped }, + traceId: message.metadata?.trace_id as string | undefined, + timestamp: Date.now(), + sessionId: prevData.state.session_id, + }, + }); effects.push({ type: "BROADCAST", message: mapped }); } } @@ -831,7 +862,22 @@ function buildEffects( } case "result": { - effects.push({ type: "BROADCAST", message: mapResultMessage(message) }); + const resultMsg = mapResultMessage(message); + // Emit T4 translation event + effects.push({ + type: "EMIT_TRANSLATION", + event: { + type: "translation_event", + boundary: "T4", + translator: "mapResultMessage", + from: { format: "UnifiedMessage", body: message }, + to: { format: "ConsumerMessage", body: resultMsg }, + traceId: message.metadata?.trace_id as string | undefined, + timestamp: Date.now(), + sessionId: prevData.state.session_id, + }, + }); + effects.push({ type: "BROADCAST", message: resultMsg }); effects.push({ type: "AUTO_SEND_QUEUED" }); // Emit first-turn completion event when num_turns reaches 1 const numTurns = message.metadata?.num_turns as number | undefined; diff --git a/src/core/session/session-state-reducer.test.ts b/src/core/session/session-state-reducer.test.ts index 6f0f9389..88def159 100644 --- a/src/core/session/session-state-reducer.test.ts +++ b/src/core/session/session-state-reducer.test.ts @@ -671,15 +671,16 @@ describe("sessionReducer INBOUND_COMMAND", () => { expect(next.lastStatus).toBe("running"); expect(next.messageHistory).toHaveLength(1); expect(next.messageHistory[0]).toMatchObject({ type: "user_message", content: "hi" }); - // Reducer now produces 3 effects: BROADCAST, PERSIST_NOW, and SEND_TO_BACKEND + // Reducer now produces 4 effects: BROADCAST, PERSIST_NOW, EMIT_TRANSLATION, and SEND_TO_BACKEND // (backend connected path — lifecycle is "active") - expect(effects).toHaveLength(3); + expect(effects).toHaveLength(4); expect(effects[0]).toMatchObject({ type: "BROADCAST", message: { type: "user_message", content: "hi" }, }); expect(effects[1]).toMatchObject({ type: "PERSIST_NOW" }); - expect(effects[2]).toMatchObject({ type: "SEND_TO_BACKEND" }); + expect(effects[2]).toMatchObject({ type: "EMIT_TRANSLATION" }); + expect(effects[3]).toMatchObject({ type: "SEND_TO_BACKEND" }); }); it("returns data unchanged and empty effects for user_message when lifecycle is closing", () => { diff --git a/src/types/consumer-messages.ts b/src/types/consumer-messages.ts index 5bab836e..feccbe13 100644 --- a/src/types/consumer-messages.ts +++ b/src/types/consumer-messages.ts @@ -243,4 +243,14 @@ export type ConsumerMessage = reason: string; dropped_type: string; dropped_metadata?: Record; + } + | { + type: "translation_event"; + boundary: "T1" | "T2" | "T3" | "T4"; + translator: string; + from: { format: string; body: unknown }; + to: { format: string; body: unknown }; + traceId?: string; + timestamp: number; + sessionId: string; }; diff --git a/web/src/components/MessageFlowPanel.tsx b/web/src/components/MessageFlowPanel.tsx index 8bdabcf1..eabde485 100644 --- a/web/src/components/MessageFlowPanel.tsx +++ b/web/src/components/MessageFlowPanel.tsx @@ -15,6 +15,7 @@ export function MessageFlowPanel() { const [autoScroll, setAutoScroll] = useState(true); const [filterOpen, setFilterOpen] = useState(false); const [panelWidth, setPanelWidth] = useState(560); + const [detailLevel, setDetailLevel] = useState<"compact" | "detailed">("compact"); const containerRef = useRef(null); const bottomRef = useRef(null); @@ -158,6 +159,16 @@ export function MessageFlowPanel() { )}
+ {/* Detail level toggle */} + + {/* Auto-scroll toggle */} + {boundaryExpanded && ( +
+
+                {JSON.stringify(message.nativeFormat.body, null, 2).slice(0, 500)}
+                {JSON.stringify(message.nativeFormat.body).length > 500 && "…"}
+              
+ +
+ )} +
+ )}
); } diff --git a/web/src/hooks/useMessageFlow.ts b/web/src/hooks/useMessageFlow.ts index fd570e5c..b574795f 100644 --- a/web/src/hooks/useMessageFlow.ts +++ b/web/src/hooks/useMessageFlow.ts @@ -121,8 +121,31 @@ export function useMessageFlow(sessionId: string | null): UseMessageFlowResult { const removeInbound = addFlowInboundListener((sid, msg) => { if (sid !== sessionId) return; - const flowMsg = buildFlowMessage("in", msg.type, msg); - ingest(flowMsg); + + // Handle translation events separately to populate boundary metadata + if (msg.type === "translation_event") { + const evt = msg as { + type: "translation_event"; + boundary: "T1" | "T2" | "T3" | "T4"; + translator: string; + from: { format: string; body: unknown }; + to: { format: string; body: unknown }; + traceId?: string; + timestamp: number; + sessionId: string; + }; + const direction = evt.boundary === "T1" || evt.boundary === "T2" ? "out" : "in"; + const flowMsg = buildFlowMessage(direction, evt.boundary, evt.to.body); + flowMsg.boundary = evt.boundary; + flowMsg.translator = evt.translator; + flowMsg.nativeFormat = evt.from; + flowMsg.traceId = evt.traceId; + ingest(flowMsg); + } else { + // Regular consumer message + const flowMsg = buildFlowMessage("in", msg.type, msg); + ingest(flowMsg); + } }); const removeOutbound = addFlowOutboundListener((sid, msg) => { diff --git a/web/src/ws.ts b/web/src/ws.ts index a8914361..4dded04a 100644 --- a/web/src/ws.ts +++ b/web/src/ws.ts @@ -467,6 +467,7 @@ function handleMessage(sessionId: string, data: string): void { break; case "adapter_drop": + case "translation_event": store.addMessage(sessionId, msg); break; From 26c54ba03a7a73668444402bcabca95e42d4a25f Mon Sep 17 00:00:00 2001 From: Teng Lin Date: Thu, 26 Feb 2026 12:36:30 -0500 Subject: [PATCH 4/8] fix: coerce null traceColor to undefined for React CSSProperties compatibility --- web/src/components/MessagePill.tsx | 52 ++++++++++++++++++++++++++++-- 1 file changed, 50 insertions(+), 2 deletions(-) diff --git a/web/src/components/MessagePill.tsx b/web/src/components/MessagePill.tsx index 1aefdce2..e4ac8e34 100644 --- a/web/src/components/MessagePill.tsx +++ b/web/src/components/MessagePill.tsx @@ -45,10 +45,38 @@ export function getColor(type: string): string { return COLOR_MAP[type] ?? DEFAULT_COLOR; } +/** + * Generate a consistent color from a traceId for visual correlation. + * Uses a simple hash to pick from a palette of distinct colors. + */ +function getTraceColor(traceId: string): string { + const colors = [ + "#22D3EE", // cyan + "#F59E0B", // amber + "#A78BFA", // purple + "#FB923C", // orange + "#34D399", // emerald + "#F472B6", // pink + "#FACC15", // yellow + "#60A5FA", // blue + "#FB7185", // rose + "#A3E635", // lime + ]; + + let hash = 0; + for (let i = 0; i < traceId.length; i++) { + hash = (hash << 5) - hash + traceId.charCodeAt(i); + hash = hash & hash; // Convert to 32bit integer + } + + return colors[Math.abs(hash) % colors.length]; +} + interface MessagePillProps { message: FlowMessage; detailLevel: "compact" | "detailed"; dimmed: boolean; + highlighted?: boolean; onHoverStart: () => void; onHoverEnd: () => void; } @@ -57,6 +85,7 @@ export function MessagePill({ message, detailLevel, dimmed, + highlighted = false, onHoverStart, onHoverEnd, }: MessagePillProps) { @@ -67,13 +96,23 @@ export function MessagePill({ const preview = payloadStr.length > 80 ? `${payloadStr.slice(0, 80)}…` : payloadStr; const showBoundary = detailLevel === "detailed" && message.boundary; + const traceColor = message.traceId ? getTraceColor(message.traceId) : null; + const truncatedTraceId = message.traceId?.slice(0, 8); return ( // biome-ignore lint/a11y/noStaticElementInteractions: pill is a visual dev tool element, not interactive UI
@@ -86,6 +125,15 @@ export function MessagePill({ {message.boundary} )} + {message.traceId && ( + + {truncatedTraceId} + + )} +{message.timestamp}ms {message.direction === "out" ? "↗" : "↙"}