diff --git a/apps/dev-playground/app.yaml b/apps/dev-playground/app.yaml index e58e71a31..7b57e4ff8 100644 --- a/apps/dev-playground/app.yaml +++ b/apps/dev-playground/app.yaml @@ -5,6 +5,8 @@ env: valueFrom: genie-space - name: DATABRICKS_SERVING_ENDPOINT_NAME valueFrom: serving-endpoint + - name: DATABRICKS_JOB_ID + valueFrom: job # Files plugin manifest declares a static DATABRICKS_VOLUME_FILES # requirement; keep it bound so appkit's runtime validation passes # even though the policy harness below uses its own keys. diff --git a/docs/docs/api/appkit/Function.createApp.md b/docs/docs/api/appkit/Function.createApp.md index 6a0b7cb2a..85cb584b0 100644 --- a/docs/docs/api/appkit/Function.createApp.md +++ b/docs/docs/api/appkit/Function.createApp.md @@ -4,6 +4,7 @@ function createApp(config: { cache?: CacheConfig; client?: WorkspaceClient; + disableInternalTelemetry?: boolean; onPluginsReady?: (appkit: PluginMap) => void | Promise; plugins?: T; telemetry?: TelemetryConfig; @@ -30,9 +31,10 @@ with an `asUser(req)` method for user-scoped execution. | Parameter | Type | | ------ | ------ | -| `config` | \{ `cache?`: [`CacheConfig`](Interface.CacheConfig.md); `client?`: `WorkspaceClient`; `onPluginsReady?`: (`appkit`: `PluginMap`\<`T`\>) => `void` \| `Promise`\<`void`\>; `plugins?`: `T`; `telemetry?`: [`TelemetryConfig`](Interface.TelemetryConfig.md); \} | +| `config` | \{ `cache?`: [`CacheConfig`](Interface.CacheConfig.md); `client?`: `WorkspaceClient`; `disableInternalTelemetry?`: `boolean`; `onPluginsReady?`: (`appkit`: `PluginMap`\<`T`\>) => `void` \| `Promise`\<`void`\>; `plugins?`: `T`; `telemetry?`: [`TelemetryConfig`](Interface.TelemetryConfig.md); \} | | `config.cache?` | [`CacheConfig`](Interface.CacheConfig.md) | | `config.client?` | `WorkspaceClient` | +| `config.disableInternalTelemetry?` | `boolean` | | `config.onPluginsReady?` | (`appkit`: `PluginMap`\<`T`\>) => `void` \| `Promise`\<`void`\> | | `config.plugins?` | `T` | | `config.telemetry?` | [`TelemetryConfig`](Interface.TelemetryConfig.md) | diff --git a/docs/docs/privacy.mdx b/docs/docs/privacy.mdx new file mode 100644 index 000000000..abc1b6ea4 --- /dev/null +++ b/docs/docs/privacy.mdx @@ -0,0 +1,47 @@ +--- +sidebar_position: 99 +--- + +# Privacy + +AppKit sends a small amount of anonymized usage telemetry to Databricks +so the team can understand how the SDK is used and prioritize +improvements. This page documents exactly what is sent, when, and how +to turn it off. + +## What we collect + +Every event is a single record with three top-level fields: + +| Field | Type | Source | +| ---------------- | ------ | ----------------------------------- | +| `event_name` | enum | One of `APP_STARTUP`, `HEARTBEAT`, `REQUEST_METRICS` | +| `app_id` | string | The app's OAuth client UUID (`DATABRICKS_CLIENT_ID`) | +| `appkit_version` | string | The AppKit SDK version | + +Each event also carries one of three event-specific bodies: + +- **`APP_STARTUP`** — emitted once when `createApp` finishes booting. + Empty body. +- **`HEARTBEAT`** — emitted every five minutes from a running app. + Empty body. +- **`REQUEST_METRICS`** — emitted once per minute, one record per HTTP + endpoint that received traffic in the window. Each record contains: + - `endpoint` — the route template (e.g. `GET /api/genie/:space_id/messages`), + never the raw request URL or any user-provided values. + - `request_count` + - `request_latency_ms_avg` + - `response_count_http4xx` + - `response_count_http5xx` + +## How to opt out + +Set any one of the following: + +```sh +DISABLE_APPKIT_INTERNAL_TELEMETRY=true +DO_NOT_TRACK=1 +``` + +Either fully disables the reporter — no events are emitted and no +network calls are made. diff --git a/packages/appkit/src/core/appkit.ts b/packages/appkit/src/core/appkit.ts index 5d1dd4553..3421e03bb 100644 --- a/packages/appkit/src/core/appkit.ts +++ b/packages/appkit/src/core/appkit.ts @@ -8,8 +8,13 @@ import type { PluginData, PluginMap, } from "shared"; +import { version as productVersion } from "../../package.json"; import { CacheManager } from "../cache"; import { ServiceContext } from "../context"; +import { + isInternalTelemetryEnabled, + TelemetryReporter, +} from "../internal-telemetry"; import { createLogger } from "../logging/logger"; import { ResourceRegistry, ResourceType } from "../registry"; import type { TelemetryConfig } from "../telemetry"; @@ -191,6 +196,7 @@ export class AppKit { cache?: CacheConfig; client?: WorkspaceClient; onPluginsReady?: (appkit: PluginMap) => void | Promise; + disableInternalTelemetry?: boolean; } = {}, ): Promise> { // Initialize core services @@ -233,6 +239,10 @@ export class AppKit { logger.debug("onPluginsReady hook completed"); } + if (isInternalTelemetryEnabled(config)) { + AppKit.bootstrapInternalTelemetry(); + } + const serverPlugin = instance.#pluginInstances.server; if (serverPlugin && typeof (serverPlugin as any).start === "function") { await (serverPlugin as any).start(); @@ -241,6 +251,18 @@ export class AppKit { return handle; } + private static bootstrapInternalTelemetry(): void { + const serviceCtx = ServiceContext.get(); + const reporter = TelemetryReporter.initialize({ + workspaceId: serviceCtx.workspaceId, + client: serviceCtx.client, + appId: process.env.DATABRICKS_CLIENT_ID || "", + appkitVersion: productVersion, + }); + reporter.start(); + reporter.sendStartup().catch(() => {}); + } + private static preparePlugins( plugins: PluginData[], ) { @@ -300,6 +322,7 @@ export async function createApp< cache?: CacheConfig; client?: WorkspaceClient; onPluginsReady?: (appkit: PluginMap) => void | Promise; + disableInternalTelemetry?: boolean; } = {}, ): Promise> { return AppKit._createApp(config); diff --git a/packages/appkit/src/core/tests/databricks.test.ts b/packages/appkit/src/core/tests/databricks.test.ts index 9d3fe5f81..0083ec62e 100644 --- a/packages/appkit/src/core/tests/databricks.test.ts +++ b/packages/appkit/src/core/tests/databricks.test.ts @@ -6,6 +6,24 @@ import type { PluginManifest } from "../../registry/types"; import { ResourceType } from "../../registry/types"; import { AppKit, createApp } from "../appkit"; +const mockReporter = { + start: vi.fn(), + stop: vi.fn(), + sendStartup: vi.fn().mockResolvedValue(undefined), + sendHeartbeat: vi.fn().mockResolvedValue(undefined), + flushRequestMetrics: vi.fn().mockResolvedValue(undefined), + recordRequest: vi.fn(), +}; + +vi.mock("../../internal-telemetry", () => ({ + isInternalTelemetryEnabled: vi.fn().mockReturnValue(true), + TelemetryReporter: { + initialize: vi.fn(() => mockReporter), + getInstance: vi.fn(() => mockReporter), + _reset: vi.fn(), + }, +})); + // Generic test manifest for test plugins const createTestManifest = (name: string): PluginManifest => ({ name, @@ -629,6 +647,60 @@ describe("AppKit", () => { }); }); + describe("internal telemetry", () => { + test("initializes the reporter and fires sendStartup after createApp", async () => { + const { TelemetryReporter } = await import("../../internal-telemetry"); + mockReporter.sendStartup.mockClear(); + mockReporter.start.mockClear(); + vi.mocked(TelemetryReporter.initialize).mockClear(); + + await createApp({ + plugins: [{ plugin: CoreTestPlugin, config: {}, name: "coreTest" }], + }); + + // Allow the fire-and-forget promise chain to resolve + await new Promise((r) => setTimeout(r, 10)); + + expect(TelemetryReporter.initialize).toHaveBeenCalledWith( + expect.objectContaining({ + appkitVersion: expect.any(String), + client: expect.anything(), + }), + ); + expect(mockReporter.start).toHaveBeenCalledOnce(); + expect(mockReporter.sendStartup).toHaveBeenCalledOnce(); + }); + + test("skips bootstrap when isInternalTelemetryEnabled returns false", async () => { + const { isInternalTelemetryEnabled, TelemetryReporter } = await import( + "../../internal-telemetry" + ); + vi.mocked(TelemetryReporter.initialize).mockClear(); + mockReporter.sendStartup.mockClear(); + vi.mocked(isInternalTelemetryEnabled).mockReturnValue(false); + + await createApp({ plugins: [] }); + + await new Promise((r) => setTimeout(r, 10)); + + expect(TelemetryReporter.initialize).not.toHaveBeenCalled(); + expect(mockReporter.sendStartup).not.toHaveBeenCalled(); + vi.mocked(isInternalTelemetryEnabled).mockReturnValue(true); + }); + + test("does not crash startup if sendStartup rejects", async () => { + mockReporter.sendStartup.mockRejectedValueOnce( + new Error("telemetry failure"), + ); + + const instance = await createApp({ + plugins: [{ plugin: CoreTestPlugin, config: {}, name: "coreTest" }], + }); + + expect(instance).toBeDefined(); + }); + }); + describe("SDK context binding", () => { test("should bind SDK methods to plugin instance", async () => { class ContextTestPlugin implements BasePlugin { diff --git a/packages/appkit/src/internal-telemetry/appkit-log.ts b/packages/appkit/src/internal-telemetry/appkit-log.ts new file mode 100644 index 000000000..d04cf62a8 --- /dev/null +++ b/packages/appkit/src/internal-telemetry/appkit-log.ts @@ -0,0 +1,58 @@ +// IMPORTANT: keep this file in sync with the AppkitLog proto schema served by +// the Databricks client telemetry endpoint. Field names use proto JSON +// conventions (snake_case) so the wire format matches the backend. + +export type AppkitEventName = + | "APPKIT_EVENT_NAME_UNSPECIFIED" + | "APP_STARTUP" + | "HEARTBEAT" + | "REQUEST_METRICS"; + +export type AppStartupEvent = Record; + +export type HeartbeatEvent = Record; + +export interface RequestMetricsEvent { + endpoint?: string; + request_count?: number; + request_latency_ms_avg?: number; + response_count_http4xx?: number; + response_count_http5xx?: number; +} + +export interface AppkitLog { + event_name: AppkitEventName; + app_id?: string; + appkit_version?: string; + app_startup_event?: AppStartupEvent; + heartbeat_event?: HeartbeatEvent; + request_metrics_event?: RequestMetricsEvent; +} + +interface AppkitLogEnvelope { + frontend_log_event_id: string; + inferred_timestamp_millis: number; + entry: { appkit_log: AppkitLog }; +} + +interface TelemetryPayload { + uploadTime: number; + items: never[]; + protoLogs: string[]; +} + +export function wrapAppkitLog(log: AppkitLog): AppkitLogEnvelope { + return { + frontend_log_event_id: `appkit-${log.event_name.toLowerCase()}-${crypto.randomUUID()}`, + inferred_timestamp_millis: Date.now(), + entry: { appkit_log: log }, + }; +} + +export function buildAppkitPayload(logs: AppkitLog[]): TelemetryPayload { + return { + uploadTime: Date.now(), + items: [], + protoLogs: logs.map((log) => JSON.stringify(wrapAppkitLog(log))), + }; +} diff --git a/packages/appkit/src/internal-telemetry/config.ts b/packages/appkit/src/internal-telemetry/config.ts new file mode 100644 index 000000000..49fa8f2b6 --- /dev/null +++ b/packages/appkit/src/internal-telemetry/config.ts @@ -0,0 +1,12 @@ +/** + * Checks whether internal telemetry is enabled. + * Shared across all telemetry event types (startup, heartbeat, metrics, etc.). + */ +export function isInternalTelemetryEnabled(opts?: { + disableInternalTelemetry?: boolean; +}): boolean { + if (opts?.disableInternalTelemetry) return false; + if (process.env.DISABLE_APPKIT_INTERNAL_TELEMETRY === "true") return false; + if (process.env.DO_NOT_TRACK === "1") return false; + return true; +} diff --git a/packages/appkit/src/internal-telemetry/index.ts b/packages/appkit/src/internal-telemetry/index.ts new file mode 100644 index 000000000..c298cad02 --- /dev/null +++ b/packages/appkit/src/internal-telemetry/index.ts @@ -0,0 +1,8 @@ +// Internal telemetry: APP_STARTUP, HEARTBEAT, and REQUEST_METRICS events +// POSTed to /telemetry-ext so the Databricks team can prioritize SDK work. +// Disable with disableInternalTelemetry: true on createApp, +// DISABLE_APPKIT_INTERNAL_TELEMETRY=true, or DO_NOT_TRACK=1. +// Full data inventory: docs/docs/privacy.mdx. + +export { isInternalTelemetryEnabled } from "./config"; +export { TelemetryReporter } from "./reporter"; diff --git a/packages/appkit/src/internal-telemetry/reporter.ts b/packages/appkit/src/internal-telemetry/reporter.ts new file mode 100644 index 000000000..d6edbc5ee --- /dev/null +++ b/packages/appkit/src/internal-telemetry/reporter.ts @@ -0,0 +1,186 @@ +import type { WorkspaceClient } from "@databricks/sdk-experimental"; +import { + type AppkitLog, + buildAppkitPayload, + type RequestMetricsEvent, +} from "./appkit-log"; + +const DEFAULT_HEARTBEAT_INTERVAL_MS = 5 * 60 * 1000; +const DEFAULT_METRICS_FLUSH_INTERVAL_MS = 60 * 1000; + +interface ReporterOptions { + workspaceId: Promise | string; + client: WorkspaceClient; + appId: string; + appkitVersion: string; + heartbeatIntervalMs?: number; + metricsFlushIntervalMs?: number; +} + +interface RequestBucket { + count: number; + latencyMsTotal: number; + http4xx: number; + http5xx: number; +} + +function envIntervalMs(name: string, fallback: number): number { + const raw = process.env[name]; + if (!raw) return fallback; + const n = Number(raw); + return Number.isFinite(n) && n > 0 ? n : fallback; +} + +export class TelemetryReporter { + static #instance: TelemetryReporter | null = null; + + readonly #workspaceIdPromise: Promise; + readonly #client: WorkspaceClient; + readonly #appId: string; + readonly #appkitVersion: string; + readonly #heartbeatIntervalMs: number; + readonly #metricsFlushIntervalMs: number; + + #heartbeatTimer: NodeJS.Timeout | null = null; + #metricsTimer: NodeJS.Timeout | null = null; + #buckets: Map = new Map(); + + private constructor(opts: ReporterOptions) { + this.#workspaceIdPromise = Promise.resolve(opts.workspaceId); + // Mark the rejection (if any) as handled so a misconfigured workspaceId + // doesn't trigger an unhandled-rejection warning before the first #send + // awaits it. The original promise still rejects when awaited. + this.#workspaceIdPromise.catch(() => {}); + this.#client = opts.client; + this.#appId = opts.appId; + this.#appkitVersion = opts.appkitVersion; + this.#heartbeatIntervalMs = + opts.heartbeatIntervalMs ?? + envIntervalMs( + "APPKIT_TELEMETRY_HEARTBEAT_INTERVAL_MS", + DEFAULT_HEARTBEAT_INTERVAL_MS, + ); + this.#metricsFlushIntervalMs = + opts.metricsFlushIntervalMs ?? + envIntervalMs( + "APPKIT_TELEMETRY_METRICS_FLUSH_INTERVAL_MS", + DEFAULT_METRICS_FLUSH_INTERVAL_MS, + ); + } + + static initialize(opts: ReporterOptions): TelemetryReporter { + TelemetryReporter.#instance?.stop(); + TelemetryReporter.#instance = new TelemetryReporter(opts); + return TelemetryReporter.#instance; + } + + static getInstance(): TelemetryReporter | null { + return TelemetryReporter.#instance; + } + + /** @internal Test-only reset. */ + static _reset(): void { + TelemetryReporter.#instance?.stop(); + TelemetryReporter.#instance = null; + } + + start(): void { + if (this.#heartbeatTimer || this.#metricsTimer) return; + this.#heartbeatTimer = setInterval(() => { + this.sendHeartbeat().catch(() => {}); + }, this.#heartbeatIntervalMs); + this.#heartbeatTimer.unref?.(); + + this.#metricsTimer = setInterval(() => { + this.flushRequestMetrics().catch(() => {}); + }, this.#metricsFlushIntervalMs); + this.#metricsTimer.unref?.(); + } + + stop(): void { + if (this.#heartbeatTimer) clearInterval(this.#heartbeatTimer); + if (this.#metricsTimer) clearInterval(this.#metricsTimer); + this.#heartbeatTimer = null; + this.#metricsTimer = null; + } + + recordRequest( + method: string, + routeTemplate: string, + statusCode: number, + latencyMs: number, + ): void { + if (!routeTemplate) return; + const key = `${method.toUpperCase()} ${routeTemplate}`; + const bucket = this.#buckets.get(key) ?? { + count: 0, + latencyMsTotal: 0, + http4xx: 0, + http5xx: 0, + }; + bucket.count += 1; + bucket.latencyMsTotal += Math.max(0, latencyMs); + if (statusCode >= 400 && statusCode < 500) bucket.http4xx += 1; + if (statusCode >= 500 && statusCode < 600) bucket.http5xx += 1; + this.#buckets.set(key, bucket); + } + + async sendStartup(): Promise { + await this.#send([ + this.#wrap({ event_name: "APP_STARTUP", app_startup_event: {} }), + ]); + } + + async sendHeartbeat(): Promise { + await this.#send([ + this.#wrap({ event_name: "HEARTBEAT", heartbeat_event: {} }), + ]); + } + + async flushRequestMetrics(): Promise { + if (this.#buckets.size === 0) return; + const drained = this.#buckets; + this.#buckets = new Map(); + + const logs: AppkitLog[] = []; + for (const [endpoint, bucket] of drained) { + const event: RequestMetricsEvent = { + endpoint, + request_count: bucket.count, + request_latency_ms_avg: Math.round( + bucket.latencyMsTotal / bucket.count, + ), + response_count_http4xx: bucket.http4xx, + response_count_http5xx: bucket.http5xx, + }; + logs.push( + this.#wrap({ + event_name: "REQUEST_METRICS", + request_metrics_event: event, + }), + ); + } + await this.#send(logs); + } + + #wrap(partial: AppkitLog): AppkitLog { + return { + ...partial, + app_id: this.#appId, + appkit_version: this.#appkitVersion, + }; + } + + async #send(logs: AppkitLog[]): Promise { + if (logs.length === 0) return; + const workspaceId = await this.#workspaceIdPromise; + await this.#client.apiClient.request({ + path: "/telemetry-ext", + method: "POST", + query: { o: workspaceId }, + headers: new Headers(), + payload: buildAppkitPayload(logs), + raw: false, + }); + } +} diff --git a/packages/appkit/src/internal-telemetry/tests/appkit-log.test.ts b/packages/appkit/src/internal-telemetry/tests/appkit-log.test.ts new file mode 100644 index 000000000..6d419f0f9 --- /dev/null +++ b/packages/appkit/src/internal-telemetry/tests/appkit-log.test.ts @@ -0,0 +1,28 @@ +import { describe, expect, test } from "vitest"; +import { buildAppkitPayload, wrapAppkitLog } from "../appkit-log"; + +describe("appkit-log", () => { + test("wrapAppkitLog produces a typed envelope", () => { + const envelope = wrapAppkitLog({ + event_name: "HEARTBEAT", + app_id: "id", + appkit_version: "1.0.0", + heartbeat_event: {}, + }); + expect(envelope.frontend_log_event_id).toMatch(/^appkit-heartbeat-/); + expect(envelope.entry.appkit_log.event_name).toBe("HEARTBEAT"); + expect(typeof envelope.inferred_timestamp_millis).toBe("number"); + }); + + test("buildAppkitPayload encodes one protoLog per log", () => { + const payload = buildAppkitPayload([ + { event_name: "APP_STARTUP", app_startup_event: {} }, + { event_name: "HEARTBEAT", heartbeat_event: {} }, + ]); + expect(payload.items).toEqual([]); + expect(payload.protoLogs).toHaveLength(2); + expect(JSON.parse(payload.protoLogs[0]).entry.appkit_log.event_name).toBe( + "APP_STARTUP", + ); + }); +}); diff --git a/packages/appkit/src/internal-telemetry/tests/config.test.ts b/packages/appkit/src/internal-telemetry/tests/config.test.ts new file mode 100644 index 000000000..76f3842c9 --- /dev/null +++ b/packages/appkit/src/internal-telemetry/tests/config.test.ts @@ -0,0 +1,51 @@ +import { afterEach, describe, expect, test, vi } from "vitest"; +import { isInternalTelemetryEnabled } from "../config"; + +afterEach(() => { + vi.unstubAllEnvs(); +}); + +describe("isInternalTelemetryEnabled", () => { + test("returns true by default", () => { + expect(isInternalTelemetryEnabled()).toBe(true); + }); + + test("returns false when disableInternalTelemetry is true", () => { + expect(isInternalTelemetryEnabled({ disableInternalTelemetry: true })).toBe( + false, + ); + }); + + test("returns true when disableInternalTelemetry is false", () => { + expect( + isInternalTelemetryEnabled({ disableInternalTelemetry: false }), + ).toBe(true); + }); + + test("returns false when DISABLE_APPKIT_INTERNAL_TELEMETRY env var is true", () => { + vi.stubEnv("DISABLE_APPKIT_INTERNAL_TELEMETRY", "true"); + expect(isInternalTelemetryEnabled()).toBe(false); + }); + + test("returns true when DISABLE_APPKIT_INTERNAL_TELEMETRY env var is not true", () => { + vi.stubEnv("DISABLE_APPKIT_INTERNAL_TELEMETRY", "false"); + expect(isInternalTelemetryEnabled()).toBe(true); + }); + + test("config option takes precedence over env var", () => { + vi.stubEnv("DISABLE_APPKIT_INTERNAL_TELEMETRY", "false"); + expect(isInternalTelemetryEnabled({ disableInternalTelemetry: true })).toBe( + false, + ); + }); + + test("returns false when DO_NOT_TRACK env var is 1", () => { + vi.stubEnv("DO_NOT_TRACK", "1"); + expect(isInternalTelemetryEnabled()).toBe(false); + }); + + test("returns true when DO_NOT_TRACK env var is 0", () => { + vi.stubEnv("DO_NOT_TRACK", "0"); + expect(isInternalTelemetryEnabled()).toBe(true); + }); +}); diff --git a/packages/appkit/src/internal-telemetry/tests/reporter.test.ts b/packages/appkit/src/internal-telemetry/tests/reporter.test.ts new file mode 100644 index 000000000..e43cdb0f7 --- /dev/null +++ b/packages/appkit/src/internal-telemetry/tests/reporter.test.ts @@ -0,0 +1,225 @@ +import type { WorkspaceClient } from "@databricks/sdk-experimental"; +import { afterEach, describe, expect, test, vi } from "vitest"; +import { TelemetryReporter } from "../reporter"; + +type RequestSpy = ReturnType; + +function createMockClient(): { + client: WorkspaceClient; + request: RequestSpy; +} { + const request = vi.fn().mockResolvedValue({}); + const client = { apiClient: { request } } as unknown as WorkspaceClient; + return { client, request }; +} + +function baseOpts(): { + workspaceId: string; + client: WorkspaceClient; + appId: string; + appkitVersion: string; + heartbeatIntervalMs: number; + metricsFlushIntervalMs: number; + __spy: RequestSpy; +} { + const { client, request } = createMockClient(); + return { + workspaceId: "1234567890", + client, + appId: "app-uuid-1", + appkitVersion: "0.27.0", + heartbeatIntervalMs: 1_000_000, + metricsFlushIntervalMs: 1_000_000, + __spy: request, + }; +} + +afterEach(() => { + TelemetryReporter._reset(); + vi.restoreAllMocks(); +}); + +function lastProtoLog(spy: RequestSpy, callIndex = -1) { + const calls = spy.mock.calls; + const idx = callIndex < 0 ? calls.length + callIndex : callIndex; + const payload = calls[idx][0].payload as { protoLogs: string[] }; + return JSON.parse(payload.protoLogs[0]); +} + +describe("TelemetryReporter", () => { + test("getInstance returns null before initialize", () => { + expect(TelemetryReporter.getInstance()).toBeNull(); + }); + + test("sendStartup emits an APP_STARTUP appkit_log via apiClient.request", async () => { + const opts = baseOpts(); + const reporter = TelemetryReporter.initialize(opts); + await reporter.sendStartup(); + + expect(opts.__spy).toHaveBeenCalledOnce(); + const reqArg = opts.__spy.mock.calls[0][0]; + expect(reqArg).toMatchObject({ + path: "/telemetry-ext", + method: "POST", + query: { o: "1234567890" }, + raw: false, + }); + expect(lastProtoLog(opts.__spy).entry.appkit_log).toMatchObject({ + event_name: "APP_STARTUP", + app_id: "app-uuid-1", + appkit_version: "0.27.0", + app_startup_event: {}, + }); + }); + + test("sendHeartbeat emits a HEARTBEAT appkit_log", async () => { + const opts = baseOpts(); + const reporter = TelemetryReporter.initialize(opts); + await reporter.sendHeartbeat(); + + expect(lastProtoLog(opts.__spy).entry.appkit_log).toMatchObject({ + event_name: "HEARTBEAT", + heartbeat_event: {}, + }); + }); + + test("recordRequest aggregates by method+route and flush sends one log per endpoint", async () => { + const opts = baseOpts(); + const reporter = TelemetryReporter.initialize(opts); + reporter.recordRequest("GET", "/api/x", 200, 100); + reporter.recordRequest("get", "/api/x", 200, 200); + reporter.recordRequest("GET", "/api/x", 500, 50); + reporter.recordRequest("POST", "/api/y", 404, 10); + + await reporter.flushRequestMetrics(); + + expect(opts.__spy).toHaveBeenCalledOnce(); + const protoLogs = opts.__spy.mock.calls[0][0].payload.protoLogs as string[]; + expect(protoLogs).toHaveLength(2); + + const events = protoLogs + .map((s) => JSON.parse(s).entry.appkit_log.request_metrics_event) + .sort((a, b) => a.endpoint.localeCompare(b.endpoint)); + + expect(events[0]).toEqual({ + endpoint: "GET /api/x", + request_count: 3, + request_latency_ms_avg: 117, // (100 + 200 + 50) / 3 = 116.67 -> 117 + response_count_http4xx: 0, + response_count_http5xx: 1, + }); + expect(events[1]).toEqual({ + endpoint: "POST /api/y", + request_count: 1, + request_latency_ms_avg: 10, + response_count_http4xx: 1, + response_count_http5xx: 0, + }); + }); + + test("flushRequestMetrics is a no-op when there are no buckets", async () => { + const opts = baseOpts(); + const reporter = TelemetryReporter.initialize(opts); + await reporter.flushRequestMetrics(); + expect(opts.__spy).not.toHaveBeenCalled(); + }); + + test("flushRequestMetrics drains the aggregator after sending", async () => { + const opts = baseOpts(); + const reporter = TelemetryReporter.initialize(opts); + reporter.recordRequest("GET", "/api/x", 200, 10); + await reporter.flushRequestMetrics(); + opts.__spy.mockClear(); + await reporter.flushRequestMetrics(); + expect(opts.__spy).not.toHaveBeenCalled(); + }); + + test("recordRequest ignores entries without a route template", async () => { + const opts = baseOpts(); + const reporter = TelemetryReporter.initialize(opts); + reporter.recordRequest("GET", "", 200, 10); + await reporter.flushRequestMetrics(); + expect(opts.__spy).not.toHaveBeenCalled(); + }); + + test("start schedules heartbeat and metrics flush; stop clears them", () => { + vi.useFakeTimers(); + const reporter = TelemetryReporter.initialize({ + ...baseOpts(), + heartbeatIntervalMs: 1_000, + metricsFlushIntervalMs: 500, + }); + const heartbeatSpy = vi + .spyOn(reporter, "sendHeartbeat") + .mockResolvedValue(); + const flushSpy = vi + .spyOn(reporter, "flushRequestMetrics") + .mockResolvedValue(); + + reporter.start(); + vi.advanceTimersByTime(1_500); + expect(heartbeatSpy).toHaveBeenCalledTimes(1); + expect(flushSpy).toHaveBeenCalledTimes(3); + + reporter.stop(); + vi.advanceTimersByTime(5_000); + expect(heartbeatSpy).toHaveBeenCalledTimes(1); + expect(flushSpy).toHaveBeenCalledTimes(3); + vi.useRealTimers(); + }); + + test("propagates request errors so callers can surface them", async () => { + const opts = baseOpts(); + opts.__spy.mockRejectedValue(new Error("network down")); + const reporter = TelemetryReporter.initialize(opts); + await expect(reporter.sendHeartbeat()).rejects.toThrow("network down"); + }); + + test("propagates a rejecting workspaceId promise", async () => { + const opts = baseOpts(); + const reporter = TelemetryReporter.initialize({ + ...opts, + workspaceId: Promise.reject(new Error("nope")), + }); + await expect(reporter.sendHeartbeat()).rejects.toThrow("nope"); + expect(opts.__spy).not.toHaveBeenCalled(); + }); + + test("interval timers swallow rejections silently", async () => { + vi.useFakeTimers(); + const opts = baseOpts(); + opts.__spy.mockRejectedValue(new Error("network down")); + const reporter = TelemetryReporter.initialize({ + ...opts, + heartbeatIntervalMs: 100, + metricsFlushIntervalMs: 1_000_000, + }); + reporter.start(); + await vi.advanceTimersByTimeAsync(150); + // No unhandled rejection means the timer's outer .catch worked. + reporter.stop(); + vi.useRealTimers(); + }); + + test("re-initialize stops the previous instance's timers", () => { + vi.useFakeTimers(); + const first = TelemetryReporter.initialize({ + ...baseOpts(), + heartbeatIntervalMs: 100, + metricsFlushIntervalMs: 100, + }); + const firstHeartbeat = vi.spyOn(first, "sendHeartbeat").mockResolvedValue(); + first.start(); + + TelemetryReporter.initialize({ + ...baseOpts(), + heartbeatIntervalMs: 1_000_000, + metricsFlushIntervalMs: 1_000_000, + }); + + vi.advanceTimersByTime(500); + // The first reporter's timers must have been cleared by the re-init. + expect(firstHeartbeat).not.toHaveBeenCalled(); + vi.useRealTimers(); + }); +}); diff --git a/packages/appkit/src/plugins/server/index.ts b/packages/appkit/src/plugins/server/index.ts index 4e6dd6a2a..bd5e66745 100644 --- a/packages/appkit/src/plugins/server/index.ts +++ b/packages/appkit/src/plugins/server/index.ts @@ -6,6 +6,7 @@ import express from "express"; import getPort, { portNumbers } from "get-port"; import type { PluginClientConfigs, PluginPhase } from "shared"; import { ServerError } from "../../errors"; +import { TelemetryReporter } from "../../internal-telemetry"; import { createLogger } from "../../logging/logger"; import { Plugin, toPlugin } from "../../plugin"; import type { PluginManifest } from "../../registry"; @@ -109,6 +110,7 @@ export class ServerPlugin extends Plugin { * @returns The express application. */ async start(): Promise { + this.serverApplication.use(requestMetricsMiddleware); this.serverApplication.use( express.json({ type: (req) => { @@ -400,6 +402,8 @@ export class ServerPlugin extends Plugin { this.remoteTunnelController.cleanup(); } + TelemetryReporter.getInstance()?.stop(); + // 1. abort active operations from plugins const shutdownPlugins = this.context?.getPlugins(); if (shutdownPlugins) { @@ -470,6 +474,30 @@ export class ServerPlugin extends Plugin { const EXCLUDED_PLUGINS: string[] = [ServerPlugin.manifest.name]; +/** @internal Exported for unit tests. */ +export function requestMetricsMiddleware( + req: express.Request, + res: express.Response, + next: express.NextFunction, +) { + const startMs = Date.now(); + res.on("finish", () => { + const reporter = TelemetryReporter.getInstance(); + if (!reporter) return; + const routePath = (req.route as { path?: string } | undefined)?.path; + if (!routePath) return; + const baseUrl = req.baseUrl ?? ""; + const template = `${baseUrl}${routePath}`; + reporter.recordRequest( + req.method, + template, + res.statusCode, + Date.now() - startMs, + ); + }); + next(); +} + /** * @internal */ diff --git a/packages/appkit/src/plugins/server/tests/request-metrics-middleware.test.ts b/packages/appkit/src/plugins/server/tests/request-metrics-middleware.test.ts new file mode 100644 index 000000000..c908a494d --- /dev/null +++ b/packages/appkit/src/plugins/server/tests/request-metrics-middleware.test.ts @@ -0,0 +1,126 @@ +import { afterEach, beforeEach, describe, expect, test, vi } from "vitest"; +import { TelemetryReporter } from "../../../internal-telemetry"; +import { requestMetricsMiddleware } from "../index"; + +interface FakeRequest { + method: string; + baseUrl?: string; + route?: { path: string } | undefined; +} + +interface FakeResponse { + statusCode: number; + on: (event: string, handler: () => void) => void; + finish: () => void; +} + +function makeReq(opts: FakeRequest): FakeRequest { + return { ...opts }; +} + +function makeRes(statusCode = 200): FakeResponse { + let finishHandler: (() => void) | null = null; + return { + statusCode, + on(event, handler) { + if (event === "finish") finishHandler = handler; + }, + finish() { + finishHandler?.(); + }, + }; +} + +describe("requestMetricsMiddleware", () => { + let reporter: TelemetryReporter; + let recordSpy: ReturnType; + + beforeEach(() => { + reporter = TelemetryReporter.initialize({ + workspaceId: "ws-1", + client: { + apiClient: { request: vi.fn().mockResolvedValue({}) }, + } as any, + appId: "app-1", + appkitVersion: "0.0.0", + heartbeatIntervalMs: 1_000_000, + metricsFlushIntervalMs: 1_000_000, + }); + recordSpy = vi.spyOn(reporter, "recordRequest"); + }); + + afterEach(() => { + TelemetryReporter._reset(); + vi.restoreAllMocks(); + }); + + test("calls next() so the request continues", () => { + const next = vi.fn(); + requestMetricsMiddleware( + makeReq({ method: "GET", route: { path: "/x" } }) as any, + makeRes() as any, + next, + ); + expect(next).toHaveBeenCalledOnce(); + }); + + test("records (METHOD baseUrl+route, status, latency) on finish", () => { + const req = makeReq({ + method: "post", + baseUrl: "/api/users", + route: { path: "/:id/messages" }, + }); + const res = makeRes(201); + requestMetricsMiddleware(req as any, res as any, vi.fn()); + + res.finish(); + + expect(recordSpy).toHaveBeenCalledOnce(); + const [method, template, statusCode, latency] = recordSpy.mock.calls[0]; + expect(method).toBe("post"); + expect(template).toBe("/api/users/:id/messages"); + expect(statusCode).toBe(201); + expect(typeof latency).toBe("number"); + expect(latency).toBeGreaterThanOrEqual(0); + }); + + test("falls back to empty baseUrl when not on a sub-router", () => { + const req = makeReq({ method: "GET", route: { path: "/health" } }); + const res = makeRes(); + requestMetricsMiddleware(req as any, res as any, vi.fn()); + res.finish(); + expect(recordSpy.mock.calls.at(-1)?.[1]).toBe("/health"); + }); + + test("skips recording when no route was matched (no req.route)", () => { + const req = makeReq({ method: "GET", route: undefined }); + const res = makeRes(404); + requestMetricsMiddleware(req as any, res as any, vi.fn()); + res.finish(); + expect(recordSpy).not.toHaveBeenCalled(); + }); + + test("is a no-op when the reporter is not initialized", () => { + TelemetryReporter._reset(); + const req = makeReq({ method: "GET", route: { path: "/x" } }); + const res = makeRes(); + const next = vi.fn(); + requestMetricsMiddleware(req as any, res as any, next); + res.finish(); + expect(next).toHaveBeenCalledOnce(); + expect(recordSpy).not.toHaveBeenCalled(); + }); + + test("forwards 4xx and 5xx status codes intact", () => { + const req = makeReq({ method: "GET", route: { path: "/x" } }); + const res4xx = makeRes(404); + requestMetricsMiddleware(req as any, res4xx as any, vi.fn()); + res4xx.finish(); + const res5xx = makeRes(503); + requestMetricsMiddleware(req as any, res5xx as any, vi.fn()); + res5xx.finish(); + + expect(recordSpy.mock.calls[0][2]).toBe(404); + expect(recordSpy.mock.calls[1][2]).toBe(503); + }); +});