diff --git a/apps/engine/src/__generated__/openapi.d.ts b/apps/engine/src/__generated__/openapi.d.ts index d11dfe064..e9b2a7877 100644 --- a/apps/engine/src/__generated__/openapi.d.ts +++ b/apps/engine/src/__generated__/openapi.d.ts @@ -121,6 +121,26 @@ export interface paths { patch?: never; trace?: never; }; + "/pipeline_handle_events": { + parameters: { + query?: never; + header?: never; + path?: never; + cookie?: never; + }; + get?: never; + put?: never; + /** + * Hand events to the source + * @description Streams the supplied events into the source connector's `handle_events` hook and returns the derived NDJSON messages (records, logs, traces). Stateless — no checkpointing or time limits. Fails 400 if the source does not implement `handle_events`. + */ + post: operations["pipeline_handle_events"]; + delete?: never; + options?: never; + head?: never; + patch?: never; + trace?: never; + }; "/pipeline_write": { parameters: { query?: never; @@ -811,6 +831,38 @@ export interface components { type: "source_input"; source_input: unknown; }; + SourceStripeInput: { + /** @description Unique identifier for the object. */ + id: string; + /** @constant */ + object: "event"; + /** @description The connected account that originates the event. */ + account?: string; + api_version: string | null; + /** @description Time at which the object was created. Measured in seconds since the Unix epoch. */ + created: number; + data: { + object: { + [key: string]: unknown; + }; + previous_attributes?: { + [key: string]: unknown; + }; + }; + /** @description Has the value `true` if the object exists in live mode or the value `false` if the object exists in test mode. */ + livemode: boolean; + /** @description Number of webhooks that haven't been successfully delivered (for example, to return a 20x response) to the URLs you specify. */ + pending_webhooks: number; + request: { + id: string | null; + idempotency_key: string | null; + } | null; + /** @description Description of the event (for example, `invoice.created` or `charge.refunded`). */ + type: string; + }; + SourceEvents: { + stripe: components["schemas"]["SourceStripeInput"][]; + }; Message: components["schemas"]["RecordMessage"] | components["schemas"]["SourceStateMessage"] | components["schemas"]["CatalogMessage"] | components["schemas"]["LogMessage"] | components["schemas"]["SpecMessage"] | components["schemas"]["ConnectionStatusMessage"] | components["schemas"]["StreamStatusMessage"] | components["schemas"]["ControlMessage"] | components["schemas"]["ProgressMessage"] | components["schemas"]["EofMessage"] | components["schemas"]["SourceInputMessage"]; DiscoverOutput: components["schemas"]["CatalogMessage"] | components["schemas"]["LogMessage"]; DestinationOutput: components["schemas"]["Message"]; @@ -1078,6 +1130,45 @@ export interface operations { }; }; }; + pipeline_handle_events: { + parameters: { + query?: never; + header?: never; + path?: never; + cookie?: never; + }; + requestBody: { + content: { + "application/json": { + pipeline: components["schemas"]["PipelineConfig"]; + /** @description Events grouped by source connector type, e.g. { "stripe": [StripeEvent, ...] }. */ + events: components["schemas"]["SourceEvents"]; + }; + }; + }; + responses: { + /** @description NDJSON stream of messages emitted by handle_events */ + 200: { + headers: { + [name: string]: unknown; + }; + content: { + "application/x-ndjson": components["schemas"]["Message"]; + }; + }; + /** @description Invalid params */ + 400: { + headers: { + [name: string]: unknown; + }; + content: { + "application/json": { + error: unknown; + }; + }; + }; + }; + }; pipeline_write: { parameters: { query?: never; diff --git a/apps/engine/src/__generated__/openapi.json b/apps/engine/src/__generated__/openapi.json index ace9d0e6b..a8e8685e6 100644 --- a/apps/engine/src/__generated__/openapi.json +++ b/apps/engine/src/__generated__/openapi.json @@ -3,7 +3,7 @@ "info": { "title": "Stripe Sync Engine", "version": "1.0.0", - "description": "Stripe Sync Engine — stateless, one-shot source/destination sync over HTTP.\nAll sync endpoints accept configuration via a JSON request body.\n\n## Endpoints\n\n| Method | Path | Summary |\n|--------|------|---------|\n| GET | /health | Health check |\n| POST | /pipeline_check | Check connector connection |\n| POST | /pipeline_setup | Set up destination schema |\n| POST | /pipeline_teardown | Tear down destination schema |\n| POST | /source_discover | Discover available streams |\n| POST | /pipeline_read | Read records from source |\n| POST | /pipeline_write | Write records to destination |\n| POST | /pipeline_sync | Run sync pipeline (read → write) |\n| POST | /pipeline_sync_batch | Run sync pipeline (batch, returns JSON) |\n| GET | /meta/sources | List available source connectors |\n| GET | /meta/sources/{type} | Get source connector spec |\n| GET | /meta/destinations | List available destination connectors |\n| GET | /meta/destinations/{type} | Get destination connector spec |\n| POST | /internal/query | Run a SQL query against a Postgres connection |" + "description": "Stripe Sync Engine — stateless, one-shot source/destination sync over HTTP.\nAll sync endpoints accept configuration via a JSON request body.\n\n## Endpoints\n\n| Method | Path | Summary |\n|--------|------|---------|\n| GET | /health | Health check |\n| POST | /pipeline_check | Check connector connection |\n| POST | /pipeline_setup | Set up destination schema |\n| POST | /pipeline_teardown | Tear down destination schema |\n| POST | /source_discover | Discover available streams |\n| POST | /pipeline_read | Read records from source |\n| POST | /pipeline_handle_events | Hand events to the source |\n| POST | /pipeline_write | Write records to destination |\n| POST | /pipeline_sync | Run sync pipeline (read → write) |\n| POST | /pipeline_sync_batch | Run sync pipeline (batch, returns JSON) |\n| GET | /meta/sources | List available source connectors |\n| GET | /meta/sources/{type} | Get source connector spec |\n| GET | /meta/destinations | List available destination connectors |\n| GET | /meta/destinations/{type} | Get destination connector spec |\n| POST | /internal/query | Run a SQL query against a Postgres connection |" }, "paths": { "/health": { @@ -397,6 +397,68 @@ } } }, + "/pipeline_handle_events": { + "post": { + "operationId": "pipeline_handle_events", + "tags": [ + "Stateless Sync API" + ], + "summary": "Hand events to the source", + "description": "Streams the supplied events into the source connector's `handle_events` hook and returns the derived NDJSON messages (records, logs, traces). Stateless — no checkpointing or time limits. Fails 400 if the source does not implement `handle_events`.", + "requestBody": { + "required": true, + "content": { + "application/json": { + "schema": { + "type": "object", + "properties": { + "pipeline": { + "$ref": "#/components/schemas/PipelineConfig" + }, + "events": { + "description": "Events grouped by source connector type, e.g. { \"stripe\": [StripeEvent, ...] }.", + "$ref": "#/components/schemas/SourceEvents" + } + }, + "required": [ + "pipeline", + "events" + ] + } + } + } + }, + "responses": { + "200": { + "description": "NDJSON stream of messages emitted by handle_events", + "content": { + "application/x-ndjson": { + "schema": { + "$ref": "#/components/schemas/Message" + } + } + } + }, + "400": { + "description": "Invalid params", + "content": { + "application/json": { + "schema": { + "type": "object", + "properties": { + "error": {} + }, + "required": [ + "error" + ], + "additionalProperties": false + } + } + } + } + } + } + }, "/pipeline_write": { "post": { "operationId": "pipeline_write", @@ -2218,6 +2280,135 @@ "source_input" ] }, + "SourceStripeInput": { + "type": "object", + "properties": { + "id": { + "type": "string", + "description": "Unique identifier for the object." + }, + "object": { + "type": "string", + "const": "event" + }, + "account": { + "type": "string", + "description": "The connected account that originates the event." + }, + "api_version": { + "anyOf": [ + { + "type": "string" + }, + { + "type": "null" + } + ] + }, + "created": { + "type": "number", + "description": "Time at which the object was created. Measured in seconds since the Unix epoch." + }, + "data": { + "type": "object", + "properties": { + "object": { + "type": "object", + "propertyNames": { + "type": "string" + }, + "additionalProperties": {} + }, + "previous_attributes": { + "type": "object", + "propertyNames": { + "type": "string" + }, + "additionalProperties": {} + } + }, + "required": [ + "object" + ], + "additionalProperties": false + }, + "livemode": { + "type": "boolean", + "description": "Has the value `true` if the object exists in live mode or the value `false` if the object exists in test mode." + }, + "pending_webhooks": { + "type": "number", + "description": "Number of webhooks that haven't been successfully delivered (for example, to return a 20x response) to the URLs you specify." + }, + "request": { + "anyOf": [ + { + "type": "object", + "properties": { + "id": { + "anyOf": [ + { + "type": "string" + }, + { + "type": "null" + } + ] + }, + "idempotency_key": { + "anyOf": [ + { + "type": "string" + }, + { + "type": "null" + } + ] + } + }, + "required": [ + "id", + "idempotency_key" + ], + "additionalProperties": false + }, + { + "type": "null" + } + ] + }, + "type": { + "type": "string", + "description": "Description of the event (for example, `invoice.created` or `charge.refunded`)." + } + }, + "required": [ + "id", + "object", + "api_version", + "created", + "data", + "livemode", + "pending_webhooks", + "request", + "type" + ], + "additionalProperties": false + }, + "SourceEvents": { + "type": "object", + "properties": { + "stripe": { + "type": "array", + "items": { + "$ref": "#/components/schemas/SourceStripeInput" + } + } + }, + "required": [ + "stripe" + ] + }, "Message": { "oneOf": [ { diff --git a/apps/engine/src/api/app.test.ts b/apps/engine/src/api/app.test.ts index 39851b8c6..60658f362 100644 --- a/apps/engine/src/api/app.test.ts +++ b/apps/engine/src/api/app.test.ts @@ -673,6 +673,41 @@ describe('POST /read', () => { }) }) +describe('POST /pipeline_handle_events', () => { + it('streams messages emitted by the source handle_events hook', async () => { + const app = await createApp(resolver) + + const events = { + test: [ + { + type: 'record', + record: { + stream: 'customer', + data: { id: 'cus_evt' }, + emitted_at: new Date().toISOString(), + }, + }, + ], + } + + const res = await app.request( + '/pipeline_handle_events', + jsonBody({ pipeline: testPipeline, events }) + ) + expect(res.status).toBe(200) + expect(res.headers.get('Content-Type')).toBe('application/x-ndjson') + + const messages = (await readNdjson(res)).filter((e) => e.type !== 'log') + expect(messages.some((e) => e.type === 'record' && e.record.data.id === 'cus_evt')).toBe(true) + }) + + it('returns 400 when body is missing', async () => { + const app = await createApp(resolver) + const res = await app.request('/pipeline_handle_events', { method: 'POST' }) + expect(res.status).toBe(400) + }) +}) + describe('POST /write', () => { it('accepts messages array, streams NDJSON state back', async () => { const app = await createApp(resolver) diff --git a/apps/engine/src/api/app.ts b/apps/engine/src/api/app.ts index e33c076cf..8edc6e948 100644 --- a/apps/engine/src/api/app.ts +++ b/apps/engine/src/api/app.ts @@ -114,6 +114,7 @@ export async function createApp(resolver: ConnectorResolver) { const { PipelineConfig: TypedPipelineConfig, + SourceEvents, sourceConfigNames, destConfigNames, } = createConnectorSchemas(resolver) @@ -186,6 +187,14 @@ export async function createApp(resolver: ConnectorResolver) { }), }) + const handleEventsRequestBody = z.object({ + pipeline: TypedPipelineConfig, + events: SourceEvents.meta({ + description: + 'Events grouped by source connector type, e.g. { "stripe": [StripeEvent, ...] }.', + }), + }) + const errorResponse = { description: 'Invalid params', content: { @@ -406,6 +415,62 @@ export async function createApp(resolver: ConnectorResolver) { }) }) + const pipelineHandleEventsRoute = createRoute({ + operationId: 'pipeline_handle_events', + method: 'post', + path: '/pipeline_handle_events', + tags: ['Stateless Sync API'], + summary: 'Hand events to the source', + description: + 'Streams the supplied events into the source connector\'s `handle_events` hook ' + + 'and returns the derived NDJSON messages (records, logs, traces). Stateless — ' + + 'no checkpointing or time limits. Fails 400 if the source does not implement ' + + '`handle_events`.', + requestBody: { + required: true, + content: { 'application/json': { schema: handleEventsRequestBody } }, + }, + responses: { + 200: { + description: 'NDJSON stream of messages emitted by handle_events', + content: { 'application/x-ndjson': { schema: ndjsonRef.Message } }, + }, + 400: errorResponse, + }, + }) + app.openapi(pipelineHandleEventsRoute, async (c) => { + const { pipeline, events } = c.req.valid('json') + const sourceEvents = (events as Record)[pipeline.source.type] + if (!Array.isArray(sourceEvents)) { + throw new HTTPException(400, { + message: `events.${pipeline.source.type} is required for source ${pipeline.source.type}`, + }) + } + + const input = (async function* () { + for (const event of sourceEvents) { + yield event + } + })() + + const context = { path: '/pipeline_handle_events', ...syncRequestContext(pipeline) } + const startedAt = Date.now() + log.info(context, 'Engine API /pipeline_handle_events started') + + const onDisconnect = () => + log.warn( + { elapsed_ms: Date.now() - startedAt, event: 'SYNC_CLIENT_DISCONNECT' }, + 'SYNC_CLIENT_DISCONNECT' + ) + const ac = createConnectionAbort(c, onDisconnect) + + const output = engine.pipeline_handle_events(pipeline, input) + return ndjsonResponse( + logApiStream('Engine API /pipeline_handle_events', output, context, startedAt), + { signal: ac.signal } + ) + }) + const pipelineWriteRoute = createRoute({ operationId: 'pipeline_write', method: 'post', diff --git a/apps/engine/src/lib/createSchemas.ts b/apps/engine/src/lib/createSchemas.ts index 5fe5769fd..0bdf296a8 100644 --- a/apps/engine/src/lib/createSchemas.ts +++ b/apps/engine/src/lib/createSchemas.ts @@ -94,9 +94,10 @@ export function createConnectorSchemas(resolver: ConnectorResolver) { .filter(([, r]) => r.rawInputJsonSchema != null) .map(([name, r]) => { const base = z.fromJSONSchema(r.rawInputJsonSchema!) - return (base instanceof z.ZodObject ? base : z.object({})).meta({ + const input = (base instanceof z.ZodObject ? base : z.object({})).meta({ id: connectorInputSchemaName(name), }) + return { name, input, variant: z.object({ [name]: z.array(input) }) } }) const SourceInputMessage = @@ -104,11 +105,16 @@ export function createConnectorSchemas(resolver: ConnectorResolver) { ? z .object({ type: z.literal('source_input'), - source_input: configUnion(inputSchemas), + source_input: configUnion(inputSchemas.map((s) => s.input)), }) .meta({ id: 'TypedSourceInputMessage' }) : undefined + const SourceEvents = + inputSchemas.length > 0 + ? configUnion(inputSchemas.map((s) => s.variant)).meta({ id: 'SourceEvents' }) + : z.record(z.string(), z.array(z.unknown())).meta({ id: 'SourceEvents' }) + const PipelineConfig = z .object({ source: SourceConfig, @@ -127,6 +133,7 @@ export function createConnectorSchemas(resolver: ConnectorResolver) { SourceConfig, DestinationConfig, SourceInputMessage, + SourceEvents, PipelineConfig, sourceConfigNames, destConfigNames, diff --git a/apps/engine/src/lib/engine.ts b/apps/engine/src/lib/engine.ts index 255a658f9..2d8138e10 100644 --- a/apps/engine/src/lib/engine.ts +++ b/apps/engine/src/lib/engine.ts @@ -154,6 +154,18 @@ export interface Engine { input?: AsyncIterable ): AsyncIterable + /** + * Hand a stream of externally-delivered events (webhooks, push payloads, etc.) + * to the source's `handle_events` hook and stream the derived {@link Message} + * objects back. Stateless — no checkpointing, no time limits. + * + * Throws if the source connector does not implement `handle_events`. + */ + pipeline_handle_events( + pipeline: PipelineConfig, + events: AsyncIterable + ): AsyncIterable + /** * Write a stream of messages to the destination. * Filters for record and state messages, enforces the configured catalog, @@ -581,6 +593,28 @@ export async function createEngine(resolver: ConnectorResolver): Promise ) }, + pipeline_handle_events(pipeline, events) { + return withAbortOnReturn(() => + (async function* (): AsyncGenerator { + const srcConnector = await resolver.resolveSource(pipeline.source.type) + if (!srcConnector.handle_events) { + throw new Error( + `Source connector "${pipeline.source.type}" does not implement handle_events()` + ) + } + const srcSpec = await getSpec(srcConnector, configPayload(pipeline.source)) + const { filteredCatalog } = await discoverCatalog(engine, pipeline) + const raw = srcConnector.handle_events( + { config: srcSpec.config, catalog: filteredCatalog }, + events + ) + for await (const msg of raw) { + yield Message.parse(msg) + } + })() + ) + }, + pipeline_write(pipeline, messages) { return withAbortOnReturn(() => (async function* () { diff --git a/apps/engine/src/lib/remote-engine.ts b/apps/engine/src/lib/remote-engine.ts index 3904aa197..74d086f55 100644 --- a/apps/engine/src/lib/remote-engine.ts +++ b/apps/engine/src/lib/remote-engine.ts @@ -149,6 +149,24 @@ export function createRemoteEngine(engineUrl: string): Engine { ) }, + pipeline_handle_events( + pipeline: PipelineConfig, + events: AsyncIterable + ): AsyncIterable { + return withAbortOnReturn((signal) => + (async function* () { + const eventBatch: unknown[] = [] + for await (const event of events) eventBatch.push(event) + const res = await post( + '/pipeline_handle_events', + { pipeline, events: { [pipeline.source.type]: eventBatch } }, + signal + ) + yield* parseNdjsonStream(res.body!) + })() + ) + }, + pipeline_write( pipeline: PipelineConfig, messages: AsyncIterable diff --git a/apps/engine/src/lib/source-test.ts b/apps/engine/src/lib/source-test.ts index 93d4ca130..7bd2bd8d8 100644 --- a/apps/engine/src/lib/source-test.ts +++ b/apps/engine/src/lib/source-test.ts @@ -67,6 +67,15 @@ export const sourceTest = { if (msg.type === 'record') recordCount++ } }, + + async *handle_events( + _params: { config: SourceTestConfig }, + events: AsyncIterable + ): AsyncIterable { + for await (const msg of events as AsyncIterable) { + yield msg + } + }, } satisfies Source export default sourceTest diff --git a/packages/protocol/src/protocol.ts b/packages/protocol/src/protocol.ts index 19e4314fb..3e825d7d1 100644 --- a/packages/protocol/src/protocol.ts +++ b/packages/protocol/src/protocol.ts @@ -744,6 +744,21 @@ export interface Source< $stdin?: AsyncIterable ): AsyncIterable + /** + * Process a stream of externally-delivered events (webhooks, push payloads, + * Kafka messages, etc.) and emit derived messages (record, log, trace). + * + * Decoupled from `read()`: backfill/polling lives in `read`, while + * `handle_events` is a pure event-driven entry point with no checkpoint + * state. Idempotent processing is the source's responsibility. + * + * Optional — only event-driven sources implement this. + */ + handle_events?( + params: { config: TConfig; catalog: ConfiguredCatalog }, + events: AsyncIterable + ): AsyncIterable + /** Provision external resources (webhook endpoints, replication slots, etc.). */ setup?(params: { config: TConfig; catalog: ConfiguredCatalog }): AsyncIterable diff --git a/packages/source-stripe/src/index.test.ts b/packages/source-stripe/src/index.test.ts index 2b652beb3..910202601 100644 --- a/packages/source-stripe/src/index.test.ts +++ b/packages/source-stripe/src/index.test.ts @@ -1760,6 +1760,72 @@ describe('StripeSource', () => { }) }) + describe('handle_events()', () => { + const registry: Record = { + customer: makeConfig({ order: 1, tableName: 'customer' }), + } + + beforeEach(() => { + vi.mocked(buildResourceRegistry).mockReturnValue(registry as any) + }) + + it('processes a stripe event and yields record + state, never paginating', async () => { + const event = makeEvent({ + id: 'evt_he_1', + type: 'customer.updated', + created: 1700000001, + dataObject: { id: 'cus_he_1', object: 'customer', name: 'Handled' }, + }) + + const messages = await collect( + source.handle_events!( + { config, catalog: catalog({ name: 'customer', primary_key: [['id']] }) }, + toIter(event) + ) + ) + + expect(messages).toHaveLength(2) + expect(messages[0]).toMatchObject({ + type: 'record', + record: { stream: 'customer', data: { id: 'cus_he_1', name: 'Handled' } }, + }) + expect(messages[1]).toMatchObject({ + type: 'source_state', + source_state: { stream: 'customer', data: { eventId: 'evt_he_1' } }, + }) + }) + + it('filters events for streams not in catalog', async () => { + const event = makeEvent({ + id: 'evt_he_other', + type: 'invoice.paid', + dataObject: { id: 'inv_he_1', object: 'invoice', amount: 100 }, + }) + + const messages = await collect( + source.handle_events!( + { config, catalog: catalog({ name: 'customer', primary_key: [['id']] }) }, + toIter(event) + ) + ) + + expect(messages).toHaveLength(0) + }) + + it('throws when raw webhook input is provided without webhook_secret', async () => { + const rawInput = { body: '{"id":"evt_1"}', headers: {} } + + await expect( + collect( + source.handle_events!( + { config, catalog: catalog({ name: 'customer' }) }, + toIter(rawInput) + ) + ) + ).rejects.toThrow('webhook_secret is required for raw webhook signature verification') + }) + }) + describe('read() — WebSocket streaming', () => { const registry: Record = { customer: makeConfig({ diff --git a/packages/source-stripe/src/index.ts b/packages/source-stripe/src/index.ts index 4f5d927ae..64d3cc350 100644 --- a/packages/source-stripe/src/index.ts +++ b/packages/source-stripe/src/index.ts @@ -14,8 +14,7 @@ import type { StripeEvent } from './spec.js' import { buildResourceRegistry, EXCLUDED_TABLES } from './resourceRegistry.js' import { catalogFromOpenApi, stampAccountIdEnum } from './catalog.js' import { BUNDLED_API_VERSION, resolveOpenApiSpec, SpecParser } from '@stripe/sync-openapi' -import { processStripeEvent } from './process-event.js' -import { processWebhookInput, createInputQueue, startWebhookServer } from './src-webhook.js' +import { processEventInput, createInputQueue, startWebhookServer } from './src-webhook.js' import { listApiBackfill, errorToConnectionStatus } from './src-list-api.js' import { pollEvents } from './src-events-api.js' import type { StripeWebSocketClient, StripeWebhookEvent } from './src-websocket.js' @@ -311,19 +310,7 @@ export function createStripeSource( // Event-driven mode: iterate over incoming webhook inputs if ($stdin) { for await (const input of $stdin) { - if ('body' in (input as object)) { - yield* processWebhookInput( - input as WebhookInput, - config, - catalog, - registry, - streamNames, - accountId - ) - } else { - const event = stripeEventSchema.parse(input) - yield* processStripeEvent(event, config, catalog, registry, streamNames, accountId) - } + yield* processEventInput(input, config, catalog, registry, streamNames, accountId) } return } @@ -390,25 +377,14 @@ export function createStripeSource( while (wsClient || httpServer) { const queued = await inputQueue.wait(signal) try { - if ('body' in queued.data) { - yield* processWebhookInput( - queued.data, - config, - catalog, - registry, - streamNames, - accountId - ) - } else { - yield* processStripeEvent( - queued.data, - config, - catalog, - registry, - streamNames, - accountId - ) - } + yield* processEventInput( + queued.data, + config, + catalog, + registry, + streamNames, + accountId + ) queued.resolve?.() } catch (err) { queued.reject?.(err instanceof Error ? err : new Error(String(err))) @@ -428,6 +404,37 @@ export function createStripeSource( })() ) }, + + handle_events({ config, catalog }, events) { + return withAbortOnReturn((signal) => + (async function* () { + const apiVersion = config.api_version ?? BUNDLED_API_VERSION + const client = makeClient({ ...config, api_version: apiVersion }, undefined, signal) + const resolved = await resolveOpenApiSpec({ apiVersion }, makeApiFetch(signal)) + const streamNames = new Set(catalog.streams.map((s) => s.stream.name)) + const registry = buildResourceRegistry( + resolved.spec, + config.api_key, + resolved.apiVersion, + config.base_url, + streamNames, + signal + ) + + let accountId: string + try { + accountId = (await resolveAccountMetadata(config, client)).accountId + } catch (err) { + yield errorToConnectionStatus(err) + return + } + + for await (const event of events) { + yield* processEventInput(event, config, catalog, registry, streamNames, accountId) + } + })() + ) + }, } } diff --git a/packages/source-stripe/src/src-webhook.ts b/packages/source-stripe/src/src-webhook.ts index 41b1927ce..2a39a17cf 100644 --- a/packages/source-stripe/src/src-webhook.ts +++ b/packages/source-stripe/src/src-webhook.ts @@ -1,6 +1,7 @@ import type { ConfiguredCatalog, Message } from '@stripe/sync-protocol' import http from 'node:http' import type { StripeEvent } from './spec.js' +import { stripeEventSchema } from './spec.js' import type { Config, WebhookInput } from './index.js' import type { ResourceConfig } from './types.js' import { processStripeEvent } from './process-event.js' @@ -31,6 +32,39 @@ export async function* processWebhookInput( yield* processStripeEvent(event, config, catalog, registry, streamNames, accountId) } +// MARK: - processEventInput + +/** + * Dispatch a single event input to the right processor based on its shape: + * raw `WebhookInput` (`{ body, headers }`) → signature-verify → process, + * anything else → parse as `StripeEvent` → process. + * + * Used by both `read()`'s event-driven branches and `handle_events()` so + * the dispatch logic lives in exactly one place. + */ +export async function* processEventInput( + input: WebhookInput | StripeEvent | unknown, + config: Config, + catalog: ConfiguredCatalog, + registry: Record, + streamNames: Set, + accountId?: string +): AsyncGenerator { + if (input != null && typeof input === 'object' && 'body' in input) { + yield* processWebhookInput( + input as WebhookInput, + config, + catalog, + registry, + streamNames, + accountId + ) + } else { + const event = stripeEventSchema.parse(input) + yield* processStripeEvent(event, config, catalog, registry, streamNames, accountId) + } +} + // MARK: - LiveInput queue /** An item in the live input queue. HTTP webhooks include resolve/reject for backpressure. */