Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .fernignore
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ testapp/
src/cache/
src/core/fetcher/custom.ts
src/datastream/
src/event-capture.ts
src/events.test.ts
src/events.ts
src/index.ts
Expand All @@ -30,6 +31,7 @@ tests/unit/cache/local.test.ts
tests/unit/datastream/datastream-client.test.ts
tests/unit/datastream/merge.test.ts
tests/unit/datastream/websocket-client.test.ts
tests/unit/event-capture.test.ts
tests/unit/events.test.ts
tests/unit/rules-engine.test.ts
tests/unit/wasm-integration.test.ts
Expand Down
124 changes: 124 additions & 0 deletions src/event-capture.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
import { CreateEventRequestBody, EventBody, EventType } from "./api";
import * as serializers from "./serialization";
import type { FetchFunction } from "./core/fetcher/Fetcher";

export const DEFAULT_EVENT_CAPTURE_BASE_URL = "https://c.schematichq.com";
const DEFAULT_TIMEOUT_MS = 10_000;

export interface EventCaptureClientOptions {
apiKey: string;
/** Fetcher created by the SchematicClient — reused so that offline mode,
* default headers, and retry/logging behavior stay consistent. */
fetcher: FetchFunction;
/** Static headers to include on every request (e.g. X-Fern-SDK-Name,
* X-Fern-SDK-Version) so the capture service receives the same SDK
* identifying headers as the REST API. */
headers?: Record<string, string>;
baseUrl?: string;
timeoutMs?: number;
}

interface CapturePayload {
api_key: string;
type: EventType;
body?: unknown;
sent_at?: string;
}

interface BatchPayload {
events: CapturePayload[];
}

const buildEndpoint = (baseUrl: string): string => {
return baseUrl.replace(/\/+$/, "") + "/batch";
};

const toCapturePayload = (event: CreateEventRequestBody, apiKey: string): CapturePayload => {
const payload: CapturePayload = {
api_key: apiKey,
type: event.eventType,
};

if (event.body !== undefined) {
payload.body = serializers.EventBody.jsonOrThrow(event.body as EventBody, {
unrecognizedObjectKeys: "strip",
});
}

if (event.sentAt !== undefined) {
payload.sent_at = event.sentAt instanceof Date ? event.sentAt.toISOString() : event.sentAt;
}

return payload;
};

const buildBatch = (events: CreateEventRequestBody[], apiKey: string): BatchPayload => {
return {
events: events.map((e) => toCapturePayload(e, apiKey)),
};
};

const describeFetcherError = (error: unknown): string => {
if (typeof error !== "object" || error === null || !("reason" in error)) {
return "unknown error";
}
const err = error as { reason: string; statusCode?: number; errorMessage?: string; body?: unknown };
switch (err.reason) {
case "status-code": {
const body = typeof err.body === "string" ? err.body : JSON.stringify(err.body ?? "");
return `HTTP ${err.statusCode}: ${body}`;
}
case "timeout":
return "request timed out";
case "non-json":
return `non-JSON response (HTTP ${err.statusCode})`;
case "body-is-null":
return `empty response body (HTTP ${err.statusCode})`;
default:
return err.errorMessage ?? "unknown error";
}
};

/**
* HTTP client for sending event batches directly to the Schematic event
* capture service (default: https://c.schematichq.com/batch).
*/
export class EventCaptureClient {
private readonly apiKey: string;
private readonly baseUrl: string;
private readonly timeoutMs: number;
private readonly fetcher: FetchFunction;
private readonly headers: Record<string, string>;

constructor(options: EventCaptureClientOptions) {
this.apiKey = options.apiKey;
this.baseUrl = options.baseUrl ?? DEFAULT_EVENT_CAPTURE_BASE_URL;
this.timeoutMs = options.timeoutMs ?? DEFAULT_TIMEOUT_MS;
this.fetcher = options.fetcher;
this.headers = options.headers ?? {};
}

public async sendBatch(events: CreateEventRequestBody[]): Promise<void> {
if (events.length === 0) {
return;
}

const response = await this.fetcher({
url: buildEndpoint(this.baseUrl),
method: "POST",
contentType: "application/json",
requestType: "json",
headers: {
...this.headers,
"X-Schematic-Api-Key": this.apiKey,
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same issue here - I'd like to have our standard SDK type, version etc headers

},
body: buildBatch(events, this.apiKey),
timeoutMs: this.timeoutMs,
maxRetries: 0,
});

if (!response.ok) {
throw new Error(`capture service returned ${describeFetcherError(response.error)}`);
}
}
}
10 changes: 5 additions & 5 deletions src/events.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { CreateEventRequestBody } from "./api";
import { EventsClient } from "./api/resources/events/client/Client";
import { EventCaptureClient } from "./event-capture";
import { ConsoleLogger, Logger } from "./logger";

const DEFAULT_FLUSH_INTERVAL = 1000; // 1 second
Expand All @@ -18,7 +18,7 @@ interface EventBufferOptions {

class EventBuffer {
private events: CreateEventRequestBody[] = [];
private eventsApi: EventsClient;
private captureClient: EventCaptureClient;
private interval: number;
private intervalId: NodeJS.Timeout | null = null;
private logger: Logger;
Expand All @@ -30,7 +30,7 @@ class EventBuffer {
private stopped: boolean = false;
private flushing: boolean = false; // Add flush state tracking

constructor(eventsApi: EventsClient, opts?: EventBufferOptions) {
constructor(captureClient: EventCaptureClient, opts?: EventBufferOptions) {
const {
logger = new ConsoleLogger(),
maxSize = DEFAULT_MAX_SIZE,
Expand All @@ -39,7 +39,7 @@ class EventBuffer {
maxRetries = DEFAULT_MAX_RETRIES,
initialRetryDelay = DEFAULT_INITIAL_RETRY_DELAY,
} = opts || {};
this.eventsApi = eventsApi;
this.captureClient = captureClient;
this.interval = interval;
this.logger = logger;
this.maxSize = maxSize;
Expand Down Expand Up @@ -74,7 +74,7 @@ class EventBuffer {
}

// Attempt to send events
await this.eventsApi.createEventBatch({ events });
await this.captureClient.sendBatch(events);
success = true;
} catch (err) {
lastError = err;
Expand Down
35 changes: 33 additions & 2 deletions src/wrapper.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { SchematicClient as BaseClient } from "./Client";
import { type CacheProvider, LocalCache } from "./cache";
import { ConsoleLogger, Logger } from "./logger";
import { EventBuffer } from "./events";
import { EventCaptureClient } from "./event-capture";
import { offlineFetcher, provideFetcher } from "./core/fetcher/custom";
import { RUNTIME } from "./core/runtime";
import { DataStreamClient, type DataStreamClientOptions } from "./datastream";
Expand Down Expand Up @@ -41,6 +42,8 @@ export interface SchematicOptions {
};
/** If using an API key that is not environment-specific, use this option to specify the environment */
environmentId?: string;
/** Custom base URL for the event capture service (default: https://c.schematichq.com) */
eventCaptureBaseURL?: string;
/** Interval in milliseconds for flushing event buffer */
eventBufferInterval?: number;
/** Default values for feature flags */
Expand Down Expand Up @@ -92,6 +95,7 @@ export class SchematicClient extends BaseClient {
apiKey = "",
basePath,
eventBufferInterval,
eventCaptureBaseURL,
flagDefaults = {},
logger = new ConsoleLogger(),
timeoutMs,
Expand All @@ -117,16 +121,43 @@ export class SchematicClient extends BaseClient {
offline = true;
}

// Build the fetcher once and share it with the event capture client so
// that offline mode, default headers, and retry/logging behavior stay
// consistent across API calls and event capture submissions.
const fetcher = offline ? offlineFetcher : provideFetcher(headers);

// Initialize wrapped client
super({
apiKey,
environment: basePath,
fetcher: offline ? offlineFetcher : provideFetcher(headers),
fetcher,
timeoutInSeconds: timeoutMs !== undefined ? timeoutMs / 1000 : undefined,
});

this.logger = logger;
this.eventBuffer = new EventBuffer(this.events, {

// Forward the same SDK identifying headers (X-Fern-Language,
// X-Fern-SDK-Name, X-Fern-SDK-Version, etc.) that BaseClient added to
// this._options.headers so that capture-service requests are
// attributable to the same SDK build as REST requests.
const sdkHeaders: Record<string, string> = {};
const baseClientHeaders = this._options?.headers;
if (baseClientHeaders) {
for (const [key, value] of Object.entries(baseClientHeaders)) {
if (typeof value === "string") {
sdkHeaders[key] = value;
}
}
}

const captureClient = new EventCaptureClient({
apiKey,
fetcher,
headers: sdkHeaders,
baseUrl: eventCaptureBaseURL,
timeoutMs,
});
this.eventBuffer = new EventBuffer(captureClient, {
interval: eventBufferInterval,
logger,
offline,
Expand Down
Loading