Skip to content
Open
322 changes: 322 additions & 0 deletions docs/plans/2026-02-25-message-flow-panel-design.md

Large diffs are not rendered by default.

16 changes: 16 additions & 0 deletions shared/consumer-types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,22 @@ export type ConsumerMessage =
type: "session_lifecycle";
subtype: string;
metadata: Record<string, unknown>;
}
| {
type: "adapter_drop";
reason: string;
dropped_type: string;
dropped_metadata?: Record<string, unknown>;
}
| {
type: "translation_event";
boundary: "T1" | "T2" | "T3" | "T4";
translator: string;
from: { format: string; body: unknown };
to: { format: string; body: unknown };
traceId?: string;
timestamp: number;
sessionId: string;
};

// ── Inbound Messages (consumer → bridge) ────────────────────────────────────
Expand Down
43 changes: 43 additions & 0 deletions src/adapters/claude/claude-session.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
* queue outbound messages before the WebSocket handshake completes.
*/

import { randomUUID } from "node:crypto";
import type WebSocket from "ws";
import type { RawData } from "ws";
import type { BackendSession } from "../../core/interfaces/backend-adapter.js";
Expand Down Expand Up @@ -99,6 +100,14 @@ export class ClaudeSession implements BackendSession {
},
);
this.sendToSocket(ndjson);

this.enqueueTranslationEvent(
"T2",
"toNDJSON",
{ format: "UnifiedMessage", body: message },
{ format: "Claude NDJSON", body: ndjson },
trace.traceId,
);
}

// ---------------------------------------------------------------------------
Expand Down Expand Up @@ -267,6 +276,15 @@ export class ClaudeSession implements BackendSession {
},
);
this.queue.enqueue(unified);

const trace = extractTraceContext(unified.metadata);
this.enqueueTranslationEvent(
"T3",
"translate",
{ format: "Claude NDJSON", body: cliMsg },
{ format: "UnifiedMessage", body: unified },
trace.traceId,
);
} else {
const consumedType = cliMsg.type === "user" || cliMsg.type === "keep_alive";
this.tracer?.error(
Expand All @@ -285,6 +303,31 @@ export class ClaudeSession implements BackendSession {
}
}

private enqueueTranslationEvent(
boundary: "T1" | "T2" | "T3" | "T4",
translator: string,
from: { format: string; body: unknown },
to: { format: string; body: unknown },
traceId: string | undefined,
): void {
this.queue.enqueue({
id: randomUUID(),
timestamp: Date.now(),
type: "translation_event",
role: "system",
content: [],
metadata: {
boundary,
translator,
from,
to,
trace_id: traceId,
session_id: this.sessionId,
timestamp: Date.now(),
},
});
}

private traceUnparsedLine(line: string, error: string): void {
const maxChars = 2_000;
const truncated =
Expand Down
7 changes: 5 additions & 2 deletions src/bin/beamcode.ts
Original file line number Diff line number Diff line change
Expand Up @@ -560,8 +560,11 @@ export async function runBeamcode(argv: string[] = process.argv): Promise<void>
await sessionCoordinator.start();
setActiveSessionId(pickMostRecentSessionId(sessionCoordinator.registry.listSessions()));

// 7. Auto-launch a session AFTER WS is ready so the CLI can connect
if (!config.noAutoLaunch) {
// 7. Auto-launch a session AFTER WS is ready so the CLI can connect.
// Skip if sessions were already restored from storage — the consumer UI will
// show them and the user can create new ones via the dialog.
const existingSessions = sessionCoordinator.registry.listSessions();
if (!config.noAutoLaunch && existingSessions.length === 0) {
try {
const session = await sessionCoordinator.createSession({
cwd: config.cwd,
Expand Down
18 changes: 18 additions & 0 deletions src/core/messaging/consumer-message-mapper.ts
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,24 @@ export function mapSessionLifecycle(msg: UnifiedMessage): ConsumerMessage {
return mapMetadataMessage("session_lifecycle", msg);
}

/**
* Map a UnifiedMessage of type "translation_event" to a ConsumerMessage.
* Translation events carry message flow visualization metadata.
*/
export function mapTranslationEvent(msg: UnifiedMessage): ConsumerMessage {
const m = msg.metadata;
return {
type: "translation_event",
boundary: m.boundary as "T1" | "T2" | "T3" | "T4",
translator: m.translator as string,
from: m.from as { format: string; body: unknown },
to: m.to as { format: string; body: unknown },
traceId: m.trace_id as string | undefined,
timestamp: m.timestamp as number,
sessionId: m.session_id as string,
};
}

/** Shared mapper for message types that forward subtype + metadata. */
function mapMetadataMessage(
type: "configuration_change" | "session_lifecycle",
Expand Down
5 changes: 5 additions & 0 deletions src/core/session/effect-executor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,11 @@ export function executeEffects(
case "SEND_TO_CONSUMER":
deps.broadcaster.sendTo(effect.ws, effect.message);
break;

case "EMIT_TRANSLATION":
// Forward translation event to consumers for message flow panel visualization
deps.broadcaster.broadcast(session, effect.event);
break;
}
} catch (err) {
// A failing effect must never abort subsequent effects.
Expand Down
4 changes: 3 additions & 1 deletion src/core/session/effect-types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,4 +39,6 @@ export type Effect =
/** Resolve git info for the session (after seeding cwd). */
| { type: "RESOLVE_GIT_INFO" }
/** Send a targeted message to a specific consumer WebSocket. */
| { type: "SEND_TO_CONSUMER"; ws: WebSocketLike; message: ConsumerMessage };
| { type: "SEND_TO_CONSUMER"; ws: WebSocketLike; message: ConsumerMessage }
/** Emit a translation event for message flow panel visualization (dev tool). */
| { type: "EMIT_TRANSLATION"; event: ConsumerMessage };
85 changes: 80 additions & 5 deletions src/core/session/session-reducer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import {
mapStreamEvent,
mapToolProgress,
mapToolUseSummary,
mapTranslationEvent,
} from "../messaging/consumer-message-mapper.js";
import { normalizeInbound } from "../messaging/inbound-normalizer.js";
import { diffTeamState } from "../team/team-event-differ.js";
Expand Down Expand Up @@ -588,12 +589,13 @@ function reduceInboundCommand(
);

// Normalize message for backend send (pure — no I/O).
const baseUnified = normalizeInbound({
type: "user_message",
const inboundMsg = {
type: "user_message" as const,
content: command.content,
session_id: data.backendSessionId || command.session_id || "",
images: command.images,
});
};
const baseUnified = normalizeInbound(inboundMsg);
if (!baseUnified) return [data, []];

// Apply slash passthrough trace context when present (always a complete group).
Expand All @@ -610,6 +612,15 @@ function reduceInboundCommand(
}
: baseUnified;

const t1Event = translationEffect(
"T1",
"normalizeInbound",
{ format: "InboundMessage", body: inboundMsg },
{ format: "UnifiedMessage", body: unified },
unified.metadata.trace_id as string | undefined,
data.state.session_id,
);

const isConnected = data.lifecycle === "active" || data.lifecycle === "idle";

if (isConnected) {
Expand All @@ -624,6 +635,7 @@ function reduceInboundCommand(
[
{ type: "BROADCAST", message: userMsg },
{ type: "PERSIST_NOW" },
t1Event,
{ type: "SEND_TO_BACKEND", message: unified },
],
];
Expand All @@ -644,7 +656,7 @@ function reduceInboundCommand(
messageHistory: nextHistory,
pendingMessages: [...data.pendingMessages, unified],
},
[{ type: "BROADCAST", message: userMsg }, { type: "PERSIST_NOW" }],
[{ type: "BROADCAST", message: userMsg }, { type: "PERSIST_NOW" }, t1Event],
];
}

Expand Down Expand Up @@ -762,6 +774,33 @@ function reduceBackendMessage(
return [nextData, effects];
}

// ---------------------------------------------------------------------------
// Helpers
// ---------------------------------------------------------------------------

function translationEffect(
boundary: "T1" | "T2" | "T3" | "T4",
translator: string,
from: { format: string; body: unknown },
to: { format: string; body: unknown },
traceId: string | undefined,
sessionId: string,
): Effect {
return {
type: "EMIT_TRANSLATION",
event: {
type: "translation_event",
boundary,
translator,
from,
to,
traceId,
timestamp: Date.now(),
sessionId,
},
};
}

// ---------------------------------------------------------------------------
// Effect builder — pure, depends only on prev/next data and the message
// ---------------------------------------------------------------------------
Expand Down Expand Up @@ -824,14 +863,35 @@ function buildEffects(
if (nextData.messageHistory !== prevData.messageHistory) {
const mapped = mapAssistantMessage(message);
if (mapped.type === "assistant") {
effects.push(
translationEffect(
"T4",
"mapAssistantMessage",
{ format: "UnifiedMessage", body: message },
{ format: "ConsumerMessage", body: mapped },
message.metadata?.trace_id as string | undefined,
prevData.state.session_id,
),
);
effects.push({ type: "BROADCAST", message: mapped });
}
}
break;
}

case "result": {
effects.push({ type: "BROADCAST", message: mapResultMessage(message) });
const resultMsg = mapResultMessage(message);
effects.push(
translationEffect(
"T4",
"mapResultMessage",
{ format: "UnifiedMessage", body: message },
{ format: "ConsumerMessage", body: resultMsg },
message.metadata?.trace_id as string | undefined,
prevData.state.session_id,
),
);
effects.push({ type: "BROADCAST", message: resultMsg });
effects.push({ type: "AUTO_SEND_QUEUED" });
// Emit first-turn completion event when num_turns reaches 1
const numTurns = message.metadata?.num_turns as number | undefined;
Expand Down Expand Up @@ -875,6 +935,16 @@ function buildEffects(
eventType: "permission:requested",
payload: { request: mapped.cliPerm },
});
} else {
effects.push({
type: "BROADCAST",
message: {
type: "adapter_drop",
reason: `permission_request subtype '${String(message.metadata?.subtype ?? "unknown")}' is not supported (only 'can_use_tool' is handled)`,
dropped_type: "permission_request",
dropped_metadata: message.metadata as Record<string, unknown>,
},
});
}
break;
}
Expand Down Expand Up @@ -930,6 +1000,11 @@ function buildEffects(
effects.push({ type: "BROADCAST", message: mapSessionLifecycle(message) });
break;
}

case "translation_event": {
effects.push({ type: "BROADCAST", message: mapTranslationEvent(message) });
break;
}
}

return effects;
Expand Down
7 changes: 4 additions & 3 deletions src/core/session/session-state-reducer.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -671,15 +671,16 @@ describe("sessionReducer INBOUND_COMMAND", () => {
expect(next.lastStatus).toBe("running");
expect(next.messageHistory).toHaveLength(1);
expect(next.messageHistory[0]).toMatchObject({ type: "user_message", content: "hi" });
// Reducer now produces 3 effects: BROADCAST, PERSIST_NOW, and SEND_TO_BACKEND
// Reducer now produces 4 effects: BROADCAST, PERSIST_NOW, EMIT_TRANSLATION, and SEND_TO_BACKEND
// (backend connected path — lifecycle is "active")
expect(effects).toHaveLength(3);
expect(effects).toHaveLength(4);
expect(effects[0]).toMatchObject({
type: "BROADCAST",
message: { type: "user_message", content: "hi" },
});
expect(effects[1]).toMatchObject({ type: "PERSIST_NOW" });
expect(effects[2]).toMatchObject({ type: "SEND_TO_BACKEND" });
expect(effects[2]).toMatchObject({ type: "EMIT_TRANSLATION" });
expect(effects[3]).toMatchObject({ type: "SEND_TO_BACKEND" });
});

it("returns data unchanged and empty effects for user_message when lifecycle is closing", () => {
Expand Down
2 changes: 2 additions & 0 deletions src/core/types/unified-message.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ export type UnifiedMessageType =
| "team_task_update"
| "team_state_change"
| "session_lifecycle"
| "translation_event"
| "unknown";

/**
Expand Down Expand Up @@ -217,6 +218,7 @@ const VALID_MESSAGE_TYPES = new Set<string>([
"team_task_update",
"team_state_change",
"session_lifecycle",
"translation_event",
"unknown",
]);

Expand Down
16 changes: 16 additions & 0 deletions src/types/consumer-messages.ts
Original file line number Diff line number Diff line change
Expand Up @@ -237,4 +237,20 @@ export type ConsumerMessage =
type: "session_lifecycle";
subtype: string;
metadata: Record<string, unknown>;
}
| {
type: "adapter_drop";
reason: string;
dropped_type: string;
dropped_metadata?: Record<string, unknown>;
}
| {
type: "translation_event";
boundary: "T1" | "T2" | "T3" | "T4";
translator: string;
from: { format: string; body: unknown };
to: { format: string; body: unknown };
traceId?: string;
timestamp: number;
sessionId: string;
};
Loading