diff --git a/libs/agent/src/lib/internals/stream-manager.bridge.spec.ts b/libs/agent/src/lib/internals/stream-manager.bridge.spec.ts index 1af26243d..744661f47 100644 --- a/libs/agent/src/lib/internals/stream-manager.bridge.spec.ts +++ b/libs/agent/src/lib/internals/stream-manager.bridge.spec.ts @@ -139,6 +139,88 @@ describe('createStreamManagerBridge', () => { } ); + it.each(['messages/partial', 'messages/complete'] as const)( + 'filters metadata from normalized SDK %s events (messages array path)', + async (type) => { + const transport = new MockAgentTransport(); + const subjects = makeSubjects(); + const destroy$ = new Subject(); + const bridge = createStreamManagerBridge({ + options: { apiUrl: '', assistantId: 'test', transport }, + subjects, + threadId$: of(null), + destroy$: destroy$.asObservable(), + }); + + bridge.submit({}); + // Simulate post-normalizeSdkEvent shape: messages array includes metadata + // This is what FetchStreamTransport produces in production + transport.emit([{ + type, + messages: [ + { id: 'ai-1', type: 'ai', content: 'Hello' }, + { langgraph_node: 'chatbot', langgraph_triggers: ['start:chatbot'] }, + ], + data: [ + { id: 'ai-1', type: 'ai', content: 'Hello' }, + { langgraph_node: 'chatbot', langgraph_triggers: ['start:chatbot'] }, + ], + } as any]); + transport.close(); + + await new Promise(r => setTimeout(r, 10)); + + // Only the real message should be in messages$, not the metadata + expect(subjects.messages$.value).toHaveLength(1); + expect(subjects.messages$.value[0]).toMatchObject({ id: 'ai-1', content: 'Hello' }); + destroy$.next(); + } + ); + + it('does not accumulate metadata across multiple messages/partial events', async () => { + const transport = new MockAgentTransport(); + const subjects = makeSubjects(); + const destroy$ = new Subject(); + const bridge = createStreamManagerBridge({ + options: { apiUrl: '', assistantId: 'test', transport }, + subjects, + threadId$: of(null), + destroy$: destroy$.asObservable(), + }); + + bridge.submit({}); + + // First values event — sets up the human message + transport.emit([{ + type: 'values', + values: { messages: [{ id: 'h-1', type: 'human', content: 'hi' }] }, + } as any]); + + // Simulate multiple messages/partial events (production SDK shape) + for (let i = 0; i < 5; i++) { + transport.emit([{ + type: 'messages/partial', + messages: [ + { id: 'ai-1', type: 'ai', content: 'Hello'.slice(0, i + 1) }, + { langgraph_node: 'chatbot' }, + ], + data: [ + { id: 'ai-1', type: 'ai', content: 'Hello'.slice(0, i + 1) }, + { langgraph_node: 'chatbot' }, + ], + } as any]); + } + + transport.close(); + await new Promise(r => setTimeout(r, 10)); + + // Should only have human + AI messages, no accumulated metadata + expect(subjects.messages$.value).toHaveLength(2); + expect(subjects.messages$.value[0]).toMatchObject({ id: 'h-1', content: 'hi' }); + expect(subjects.messages$.value[1]).toMatchObject({ id: 'ai-1', content: 'Hello' }); + destroy$.next(); + }); + it('ignores late events from the previous stream after threadId changes', async () => { const transport = new MockAgentTransport(); const subjects = makeSubjects(); diff --git a/libs/agent/src/lib/internals/stream-manager.bridge.ts b/libs/agent/src/lib/internals/stream-manager.bridge.ts index fbce3cc0b..ba9adec47 100644 --- a/libs/agent/src/lib/internals/stream-manager.bridge.ts +++ b/libs/agent/src/lib/internals/stream-manager.bridge.ts @@ -255,7 +255,10 @@ function isMessagesEvent(type: StreamEvent['type']): boolean { function normalizeMessages(event: StreamEvent): unknown[] | null { const directMessages = event['messages']; if (Array.isArray(directMessages)) { - return directMessages; + // Filter out non-message metadata objects (e.g. { langgraph_node, langgraph_triggers }) + // that the LangGraph SDK includes alongside real messages in messages/* events. + const filtered = directMessages.filter(isMessageLike); + return filtered.length > 0 ? filtered : null; } const data = event['data'];