Skip to content

Commit 5dcdee0

Browse files
bloveclaude
andcommitted
fix(agent): filter SDK metadata from messages/partial events
normalizeMessages() had two code paths: event['messages'] (returned unfiltered) and event['data'] (filtered by isMessageLike). In production, FetchStreamTransport's normalizeSdkEvent wraps the raw SDK data array—which includes metadata objects like { langgraph_node, langgraph_triggers }—into event.messages. These metadata objects lack content/type/id fields, causing messageContent() to return undefined and crashing the content classifier's detectType() on undefined.length. The fix applies the existing isMessageLike filter to the event['messages'] path. Tests now simulate post-normalization event shapes matching what FetchStreamTransport produces. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 1ad3b6a commit 5dcdee0

2 files changed

Lines changed: 86 additions & 1 deletion

File tree

libs/agent/src/lib/internals/stream-manager.bridge.spec.ts

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,88 @@ describe('createStreamManagerBridge', () => {
139139
}
140140
);
141141

142+
it.each(['messages/partial', 'messages/complete'] as const)(
143+
'filters metadata from normalized SDK %s events (messages array path)',
144+
async (type) => {
145+
const transport = new MockAgentTransport();
146+
const subjects = makeSubjects();
147+
const destroy$ = new Subject<void>();
148+
const bridge = createStreamManagerBridge({
149+
options: { apiUrl: '', assistantId: 'test', transport },
150+
subjects,
151+
threadId$: of(null),
152+
destroy$: destroy$.asObservable(),
153+
});
154+
155+
bridge.submit({});
156+
// Simulate post-normalizeSdkEvent shape: messages array includes metadata
157+
// This is what FetchStreamTransport produces in production
158+
transport.emit([{
159+
type,
160+
messages: [
161+
{ id: 'ai-1', type: 'ai', content: 'Hello' },
162+
{ langgraph_node: 'chatbot', langgraph_triggers: ['start:chatbot'] },
163+
],
164+
data: [
165+
{ id: 'ai-1', type: 'ai', content: 'Hello' },
166+
{ langgraph_node: 'chatbot', langgraph_triggers: ['start:chatbot'] },
167+
],
168+
} as any]);
169+
transport.close();
170+
171+
await new Promise(r => setTimeout(r, 10));
172+
173+
// Only the real message should be in messages$, not the metadata
174+
expect(subjects.messages$.value).toHaveLength(1);
175+
expect(subjects.messages$.value[0]).toMatchObject({ id: 'ai-1', content: 'Hello' });
176+
destroy$.next();
177+
}
178+
);
179+
180+
it('does not accumulate metadata across multiple messages/partial events', async () => {
181+
const transport = new MockAgentTransport();
182+
const subjects = makeSubjects();
183+
const destroy$ = new Subject<void>();
184+
const bridge = createStreamManagerBridge({
185+
options: { apiUrl: '', assistantId: 'test', transport },
186+
subjects,
187+
threadId$: of(null),
188+
destroy$: destroy$.asObservable(),
189+
});
190+
191+
bridge.submit({});
192+
193+
// First values event — sets up the human message
194+
transport.emit([{
195+
type: 'values',
196+
values: { messages: [{ id: 'h-1', type: 'human', content: 'hi' }] },
197+
} as any]);
198+
199+
// Simulate multiple messages/partial events (production SDK shape)
200+
for (let i = 0; i < 5; i++) {
201+
transport.emit([{
202+
type: 'messages/partial',
203+
messages: [
204+
{ id: 'ai-1', type: 'ai', content: 'Hello'.slice(0, i + 1) },
205+
{ langgraph_node: 'chatbot' },
206+
],
207+
data: [
208+
{ id: 'ai-1', type: 'ai', content: 'Hello'.slice(0, i + 1) },
209+
{ langgraph_node: 'chatbot' },
210+
],
211+
} as any]);
212+
}
213+
214+
transport.close();
215+
await new Promise(r => setTimeout(r, 10));
216+
217+
// Should only have human + AI messages, no accumulated metadata
218+
expect(subjects.messages$.value).toHaveLength(2);
219+
expect(subjects.messages$.value[0]).toMatchObject({ id: 'h-1', content: 'hi' });
220+
expect(subjects.messages$.value[1]).toMatchObject({ id: 'ai-1', content: 'Hello' });
221+
destroy$.next();
222+
});
223+
142224
it('ignores late events from the previous stream after threadId changes', async () => {
143225
const transport = new MockAgentTransport();
144226
const subjects = makeSubjects();

libs/agent/src/lib/internals/stream-manager.bridge.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -255,7 +255,10 @@ function isMessagesEvent(type: StreamEvent['type']): boolean {
255255
function normalizeMessages(event: StreamEvent): unknown[] | null {
256256
const directMessages = event['messages'];
257257
if (Array.isArray(directMessages)) {
258-
return directMessages;
258+
// Filter out non-message metadata objects (e.g. { langgraph_node, langgraph_triggers })
259+
// that the LangGraph SDK includes alongside real messages in messages/* events.
260+
const filtered = directMessages.filter(isMessageLike);
261+
return filtered.length > 0 ? filtered : null;
259262
}
260263

261264
const data = event['data'];

0 commit comments

Comments
 (0)