Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .github/workflows/publish.yml
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@ jobs:
- name: Patch install telemetry into publishable manifests
run: node libs/telemetry/scripts/apply-install-telemetry.mjs dist/libs/chat dist/libs/langgraph dist/libs/ag-ui dist/libs/render dist/libs/a2ui dist/libs/licensing

- name: Verify install telemetry in publishable manifests
run: node libs/telemetry/scripts/verify-install-telemetry.mjs dist/libs/chat dist/libs/langgraph dist/libs/ag-ui dist/libs/render dist/libs/a2ui dist/libs/licensing dist/libs/telemetry

- name: Verify atomic release versions
run: node scripts/verify-release-versions.mjs --tag "$RELEASE_TAG"
env:
Expand Down
43 changes: 43 additions & 0 deletions apps/website/src/app/api/ingest/route.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
import { beforeEach, describe, expect, it, vi } from 'vitest';

const capture = vi.fn();
const shutdown = vi.fn();

vi.mock('posthog-node', () => ({
PostHog: vi.fn(function PostHog() {
return { capture, shutdown };
}),
}));

import { POST } from './route';

describe('/api/ingest', () => {
beforeEach(() => {
vi.clearAllMocks();
process.env.NEXT_PUBLIC_POSTHOG_TOKEN = 'phc_server';
delete process.env.NEXT_PUBLIC_POSTHOG_HOST;
});

it('accepts neutral browser telemetry payloads without requiring the public ingest key', async () => {
shutdown.mockResolvedValue(undefined);
const response = await POST(new Request('https://cacheplane.ai/api/ingest', {
method: 'POST',
body: JSON.stringify({
event: 'ngaf:browser_chat_init',
distinctId: 'browser:test',
properties: { surface: 'canonical_demo' },
}),
}) as never);

expect(response.status).toBe(202);
expect(capture).toHaveBeenCalledWith({
distinctId: 'browser:test',
event: 'ngaf:browser_chat_init',
properties: {
surface: 'canonical_demo',
$ip: null,
$process_person_profile: false,
},
});
});
});
2 changes: 1 addition & 1 deletion apps/website/src/app/api/ingest/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ function readPayload(value: unknown): {
} | null {
if (!isRecord(value)) return null;
const payload = value as TelemetryIngestPayload;
if (payload.key !== PUBLIC_INGEST_KEY) return null;
if (payload.key !== undefined && payload.key !== PUBLIC_INGEST_KEY) return null;

const distinctId = toSafeAnalyticsString(payload.distinctId, 200);
const event = toSafeAnalyticsString(payload.event, 100);
Expand Down
14 changes: 14 additions & 0 deletions docs/gtm/taxonomy.md
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ sessions by design.
|--------------------------------------|--------------------------------------------|-----------------|--------------|
| `ngaf:postinstall` | Dependency/global install of a published `@ngaf/*` package | Node (script) | **Opt-out** |
| `ngaf:runtime_instance_created` | Runtime adapter init | Node / Browser | **Opt-out** on Node, **Opt-in** in Browser |
| `ngaf:runtime_request_created` | Runtime adapter request created | Node / Browser | **Opt-out** on Node, **Opt-in** in Browser |
| `ngaf:stream_started` | Stream begins | Node / Browser | **Opt-out** on Node, **Opt-in** in Browser |
| `ngaf:stream_ended` | Stream ends normally | Node / Browser | **Opt-out** on Node, **Opt-in** in Browser |
| `ngaf:stream_errored` | Stream errors | Node / Browser | **Opt-out** on Node, **Opt-in** in Browser |
Expand Down Expand Up @@ -102,6 +103,19 @@ Browser events never fire unless the consumer explicitly opts in. See `libs/tele
| `global_install` | bool | Whether npm reports a global install. |
| `sample_weight` | number | Inverse sample rate for weighted counts. |

### Runtime telemetry properties

| Property | Type | Notes |
|---------------|--------|--------------------------------------------|
| `transport` | string | Runtime transport, e.g. `langgraph`, `ag-ui`, or `custom`. |
| `surface` | string | Adapter surface emitting the event. |
| `requestType` | string | Request shape, e.g. `submit`, `resubmit`, `regenerate`, `enqueue`, or `join`. |
| `provider` | string | Model provider when known. |
| `model` | string | Model name when known. |
| `durationMs` | number | Stream duration for end/error events. |
| `errorClass` | string | Error class only. Never send error messages. |
| `sample_weight` | number | Inverse sample rate for weighted counts. |

## Shared properties

| Property | Type | Notes |
Expand Down
6 changes: 4 additions & 2 deletions libs/ag-ui/src/lib/provide-ag-ui-agent.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// SPDX-License-Identifier: MIT
import { InjectionToken, inject, type Provider } from '@angular/core';
import { HttpAgent } from '@ag-ui/client';
import type { Agent } from '@ngaf/chat';
import type { Agent, AgentRuntimeTelemetrySink } from '@ngaf/chat';
import { toAgent } from './to-agent';

/**
Expand All @@ -17,6 +17,8 @@ export interface AgUiAgentConfig {
agentId?: string;
threadId?: string;
headers?: Record<string, string>;
/** Optional app-owned telemetry sink. No telemetry is emitted unless this is provided. */
telemetry?: AgentRuntimeTelemetrySink | false;
}

export const AG_UI_AGENT = new InjectionToken<Agent>('AG_UI_AGENT');
Expand All @@ -38,7 +40,7 @@ export function provideAgUiAgent(config: AgUiAgentConfig): Provider[] {
...(config.threadId !== undefined ? { threadId: config.threadId } : {}),
...(config.headers !== undefined ? { headers: config.headers } : {}),
});
return toAgent(source);
return toAgent(source, { telemetry: config.telemetry });
},
},
];
Expand Down
46 changes: 46 additions & 0 deletions libs/ag-ui/src/lib/to-agent.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { describe, it, expect, vi } from 'vitest';
import { Observable, Subject } from 'rxjs';
import type { AbstractAgent, BaseEvent } from '@ag-ui/client';
import type { RunAgentInput } from '@ag-ui/core';
import type { AgentRuntimeTelemetryPayload } from '@ngaf/chat';
import { toAgent } from './to-agent';

/**
Expand Down Expand Up @@ -113,6 +114,51 @@ describe('toAgent', () => {
expect(stub.runAgent).toHaveBeenCalledOnce();
});

it('emits opt-in telemetry around completed AG-UI runs', async () => {
const stub = new StubAgent();
const seen: AgentRuntimeTelemetryPayload[] = [];
const a = toAgent(stub as unknown as AbstractAgent, {
telemetry: (payload) => seen.push(payload),
});

await a.submit({ message: 'hi' });

expect(seen.map((payload) => payload.event)).toEqual([
'ngaf:runtime_instance_created',
'ngaf:runtime_request_created',
'ngaf:stream_started',
'ngaf:stream_ended',
]);
expect(seen[0].properties).toEqual({ transport: 'ag-ui', surface: 'to_agent' });
expect(seen[1].properties).toEqual({ transport: 'ag-ui', surface: 'to_agent', requestType: 'submit' });
expect(seen[2].properties).toEqual({ transport: 'ag-ui', surface: 'to_agent' });
expect(seen[3].properties).toEqual({
transport: 'ag-ui',
surface: 'to_agent',
durationMs: expect.any(Number),
});
});

it('emits opt-in telemetry for AG-UI failures without error messages', async () => {
const stub = new StubAgent();
const seen: AgentRuntimeTelemetryPayload[] = [];
const a = toAgent(stub as unknown as AbstractAgent, {
telemetry: (payload) => seen.push(payload),
});
stub.runAgent.mockRejectedValueOnce(new SyntaxError('private app state'));

await a.submit({ message: 'hi' });

const errored = seen.find((payload) => payload.event === 'ngaf:stream_errored');
expect(errored?.properties).toEqual({
transport: 'ag-ui',
surface: 'to_agent',
durationMs: expect.any(Number),
errorClass: 'SyntaxError',
});
expect(JSON.stringify(seen)).not.toContain('private app state');
});

it('stop() calls source.abortRun()', async () => {
const stub = new StubAgent();
const a = toAgent(stub as unknown as AbstractAgent);
Expand Down
83 changes: 82 additions & 1 deletion libs/ag-ui/src/lib/to-agent.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,45 @@ import { Subject } from 'rxjs';
import type { AbstractAgent } from '@ag-ui/client';
import type {
Agent, Message, AgentStatus, ToolCall, AgentEvent,
AgentRuntimeTelemetryEvent,
AgentRuntimeTelemetryProperties,
AgentRuntimeTelemetrySink,
AgentSubmitInput, AgentSubmitOptions,
} from '@ngaf/chat';
import { reduceEvent, type ReducerStore } from './reducer';

export interface ToAgentOptions {
/** Optional app-owned telemetry sink. No telemetry is emitted unless this is provided. */
telemetry?: AgentRuntimeTelemetrySink | false;
}

function captureAgentRuntimeTelemetry(
sink: AgentRuntimeTelemetrySink | false | undefined,
event: AgentRuntimeTelemetryEvent,
properties: AgentRuntimeTelemetryProperties,
): void {
if (!sink) return;
try {
void Promise.resolve(sink({ event, properties })).catch(() => undefined);
} catch {
// Keep telemetry side effects isolated from adapter control flow.
}
}

function agentRuntimeTelemetryErrorClass(error: unknown): string {
if (error instanceof Error) return error.name || error.constructor.name || 'Error';
if (
error
&& typeof error === 'object'
&& 'name' in error
&& typeof error.name === 'string'
&& error.name.length > 0
) {
return error.name;
}
return 'UnknownError';
}

/**
* Wraps an AG-UI AbstractAgent into the runtime-neutral Agent contract.
*
Expand All @@ -22,7 +57,7 @@ import { reduceEvent, type ReducerStore } from './reducer';
* agent instance they constructed. The subscriber registered via
* source.subscribe() will fire for the lifetime of source.
*/
export function toAgent(source: AbstractAgent): Agent {
export function toAgent(source: AbstractAgent, options: ToAgentOptions = {}): Agent {
const store: ReducerStore = {
messages: signal<Message[]>([]),
status: signal<AgentStatus>('idle'),
Expand All @@ -32,6 +67,45 @@ export function toAgent(source: AbstractAgent): Agent {
state: signal<Record<string, unknown>>({}),
events$: new Subject<AgentEvent>(),
};
const telemetryProperties = { transport: 'ag-ui' as const, surface: 'to_agent' };
let activeRun: { startedAt: number; errored: boolean } | null = null;

captureAgentRuntimeTelemetry(
options.telemetry,
'ngaf:runtime_instance_created',
telemetryProperties,
);

function startRunTelemetry(requestType: string): { startedAt: number; errored: boolean } {
const run = { startedAt: Date.now(), errored: false };
activeRun = run;
captureAgentRuntimeTelemetry(options.telemetry, 'ngaf:runtime_request_created', {
...telemetryProperties,
requestType,
});
captureAgentRuntimeTelemetry(options.telemetry, 'ngaf:stream_started', telemetryProperties);
return run;
}

function finishRunTelemetry(run: { startedAt: number; errored: boolean }): void {
if (run.errored) return;
captureAgentRuntimeTelemetry(options.telemetry, 'ngaf:stream_ended', {
...telemetryProperties,
durationMs: Date.now() - run.startedAt,
});
if (activeRun === run) activeRun = null;
}

function failRunTelemetry(error: unknown, run = activeRun): void {
if (!run || run.errored) return;
run.errored = true;
captureAgentRuntimeTelemetry(options.telemetry, 'ngaf:stream_errored', {
...telemetryProperties,
durationMs: Date.now() - run.startedAt,
errorClass: agentRuntimeTelemetryErrorClass(error),
});
if (activeRun === run) activeRun = null;
}

// Tap all events from the source agent via the AgentSubscriber API.
// This subscription lives for the lifetime of `source`.
Expand All @@ -43,6 +117,7 @@ export function toAgent(source: AbstractAgent): Agent {
store.status.set('error');
store.isLoading.set(false);
store.error.set(error);
failRunTelemetry(error);
},
});

Expand All @@ -65,14 +140,17 @@ export function toAgent(source: AbstractAgent): Agent {
source.addMessage(userMsg as Parameters<typeof source.addMessage>[0]);
}

const run = startRunTelemetry('submit');
try {
await source.runAgent();
finishRunTelemetry(run);
} catch (err) {
// If the run was aborted via stop(), abortRun() resolves the promise
// rather than rejecting — but catch any unexpected errors here.
store.status.set('error');
store.isLoading.set(false);
store.error.set(err);
failRunTelemetry(err, run);
}
},

Expand Down Expand Up @@ -113,12 +191,15 @@ export function toAgent(source: AbstractAgent): Agent {
// message in `trimmed` becomes the active prompt for the next run.
source.setMessages(trimmed as Parameters<typeof source.setMessages>[0]);

const run = startRunTelemetry('regenerate');
try {
await source.runAgent();
finishRunTelemetry(run);
} catch (err) {
store.status.set('error');
store.isLoading.set(false);
store.error.set(err);
failRunTelemetry(err, run);
}
},
};
Expand Down
1 change: 1 addition & 0 deletions libs/ag-ui/src/public-api.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
// SPDX-License-Identifier: MIT
export { toAgent } from './lib/to-agent';
export type { ToAgentOptions } from './lib/to-agent';
export { provideAgUiAgent, AG_UI_AGENT, injectAgUiAgent } from './lib/provide-ag-ui-agent';
export type { AgUiAgentConfig } from './lib/provide-ag-ui-agent';
export { FakeAgent } from './lib/testing/fake-agent';
Expand Down
6 changes: 6 additions & 0 deletions libs/chat/src/lib/agent/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,9 @@ export type {
} from './agent-event';
export type { AgentCheckpoint } from './agent-checkpoint';
export type { AgentWithHistory } from './agent-with-history';
export type {
AgentRuntimeTelemetryEvent,
AgentRuntimeTelemetryPayload,
AgentRuntimeTelemetryProperties,
AgentRuntimeTelemetrySink,
} from './runtime-telemetry';
25 changes: 25 additions & 0 deletions libs/chat/src/lib/agent/runtime-telemetry.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
// SPDX-License-Identifier: MIT

export type AgentRuntimeTelemetryEvent =
| 'ngaf:runtime_instance_created'
| 'ngaf:runtime_request_created'
| 'ngaf:stream_started'
| 'ngaf:stream_ended'
| 'ngaf:stream_errored';

export interface AgentRuntimeTelemetryProperties {
transport: 'langgraph' | 'ag-ui' | 'custom' | string;
surface?: string;
requestType?: string;
provider?: string;
model?: string;
durationMs?: number;
errorClass?: string;
}

export interface AgentRuntimeTelemetryPayload {
event: AgentRuntimeTelemetryEvent;
properties: AgentRuntimeTelemetryProperties;
}

export type AgentRuntimeTelemetrySink = (payload: AgentRuntimeTelemetryPayload) => void | Promise<void>;
4 changes: 4 additions & 0 deletions libs/chat/src/public-api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@ export type {
AgentStateUpdateEvent,
AgentCustomEvent,
AgentCheckpoint,
AgentRuntimeTelemetryEvent,
AgentRuntimeTelemetryPayload,
AgentRuntimeTelemetryProperties,
AgentRuntimeTelemetrySink,
} from './lib/agent';
export {
isUserMessage,
Expand Down
9 changes: 8 additions & 1 deletion libs/langgraph/src/lib/agent.types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,12 @@ import type {
SubmitOptions,
} from '@langchain/langgraph-sdk/ui';
import type { BaseMessage, AIMessage as CoreAIMessage } from '@langchain/core/messages';
import type { AgentSubmitInput, AgentSubmitOptions, AgentWithHistory } from '@ngaf/chat';
import type {
AgentRuntimeTelemetrySink,
AgentSubmitInput,
AgentSubmitOptions,
AgentWithHistory,
} from '@ngaf/chat';
import type { AgentLifecycle } from './lifecycle';

// Re-export SDK types so consumers don't need to import from langgraph-sdk directly
Expand Down Expand Up @@ -251,6 +256,8 @@ export interface AgentOptions<T, _ResolvedBag extends BagTemplate> {
toMessage?: (msg: unknown) => BaseMessage;
/** Custom transport. Defaults to FetchStreamTransport. */
transport?: AgentTransport;
/** Optional app-owned telemetry sink. No telemetry is emitted unless this is provided. */
telemetry?: AgentRuntimeTelemetrySink | false;
/** When true, subagent messages are filtered from the main messages signal. */
filterSubagentMessages?: boolean;
/** Tool names that indicate a subagent invocation. */
Expand Down
Loading
Loading