diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 36208e8cd..7649ae412 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -669,6 +669,8 @@ jobs: test-sync-engine.test.ts env: SKIP_SETUP: '1' + GH_TOKEN: ${{ github.token }} + GITHUB_TOKEN: ${{ github.token }} - name: Network interruption tests (pauses containers) run: | diff --git a/.gitignore b/.gitignore index 8384b555d..a775d10f7 100644 --- a/.gitignore +++ b/.gitignore @@ -58,5 +58,6 @@ apps/visualizer/out/ # Reconcile / verification output tmp/ +.tmp/ prev-run.txt verify-*.json diff --git a/apps/engine/package.json b/apps/engine/package.json index 78ec30e3f..6c34af35e 100644 --- a/apps/engine/package.json +++ b/apps/engine/package.json @@ -48,12 +48,14 @@ ], "dependencies": { "@hono/node-server": "^1", + "@stripe/sync-destination-stripe": "workspace:*", "@scalar/hono-api-reference": "^0.6", "@stripe/sync-destination-google-sheets": "workspace:*", "@stripe/sync-destination-postgres": "workspace:*", "@stripe/sync-hono-zod-openapi": "workspace:*", "@stripe/sync-logger": "workspace:*", "@stripe/sync-protocol": "workspace:*", + "@stripe/sync-source-postgres": "workspace:*", "@stripe/sync-source-stripe": "workspace:*", "@stripe/sync-ts-cli": "workspace:*", "@stripe/sync-util-postgres": "workspace:*", diff --git a/apps/engine/src/lib/default-connectors.ts b/apps/engine/src/lib/default-connectors.ts index a3414b114..e1d2c7801 100644 --- a/apps/engine/src/lib/default-connectors.ts +++ b/apps/engine/src/lib/default-connectors.ts @@ -1,12 +1,15 @@ import sourceStripe from '@stripe/sync-source-stripe' +import sourcePostgres from '@stripe/sync-source-postgres' +import destinationStripe from '@stripe/sync-destination-stripe' import destinationPostgres from '@stripe/sync-destination-postgres' import destinationGoogleSheets from '@stripe/sync-destination-google-sheets' import type { RegisteredConnectors } from './resolver.js' /** Default in-process connectors bundled with the engine. */ export const defaultConnectors: RegisteredConnectors = { - sources: { stripe: sourceStripe }, + sources: { stripe: sourceStripe, postgres: sourcePostgres }, destinations: { + stripe: destinationStripe, postgres: destinationPostgres, google_sheets: destinationGoogleSheets, }, diff --git a/apps/engine/src/lib/destination-filter.test.ts b/apps/engine/src/lib/destination-filter.test.ts index 556f8a095..3f4c10ae2 100644 --- a/apps/engine/src/lib/destination-filter.test.ts +++ b/apps/engine/src/lib/destination-filter.test.ts @@ -40,6 +40,7 @@ describe('applySelection()', () => { id: { type: 'string' }, name: { type: 'string' }, email: { type: 'string' }, + _updated_at: { type: 'string' }, phone: { type: 'string' }, }, }, @@ -47,7 +48,7 @@ describe('applySelection()', () => { ]) const filtered = applySelection(catalog) - expect(Object.keys(props(filtered))).toEqual(['id', 'name', 'email']) + expect(Object.keys(props(filtered))).toEqual(['id', 'name', 'email', '_updated_at']) }) it('passes catalog through unchanged when no fields configured', () => { diff --git a/apps/engine/src/lib/destination-filter.ts b/apps/engine/src/lib/destination-filter.ts index eac266f8a..e29c2059f 100644 --- a/apps/engine/src/lib/destination-filter.ts +++ b/apps/engine/src/lib/destination-filter.ts @@ -17,6 +17,7 @@ export function applySelection(catalog: ConfiguredCatalog): ConfiguredCatalog { for (const path of cs.stream.primary_key) { if (path[0]) allowed.add(path[0]) } + if (cs.stream.newer_than_field) allowed.add(cs.stream.newer_than_field) return { ...cs, stream: { diff --git a/apps/engine/src/lib/reverse-etl.test.ts b/apps/engine/src/lib/reverse-etl.test.ts new file mode 100644 index 000000000..cc11f3ea9 --- /dev/null +++ b/apps/engine/src/lib/reverse-etl.test.ts @@ -0,0 +1,547 @@ +import { describe, expect, it } from 'vitest' +import type { ConnectorResolver, ResolvedConnector } from './resolver.js' +import type { Destination, Source } from '@stripe/sync-protocol' +import { createEngine } from './engine.js' +import { createPostgresSource } from '@stripe/sync-source-postgres' +import { createStripeDestination } from '@stripe/sync-destination-stripe' + +function makeResolver(source: Source, destination: Destination): ConnectorResolver { + return { + resolveSource: async () => source, + resolveDestination: async () => destination, + sources: () => new Map>(), + destinations: () => new Map>(), + } +} + +function queryResult>(rows: T[]) { + return { + rows, + rowCount: rows.length, + command: 'SELECT', + oid: 0, + fields: [], + } +} + +function stripeResponse(json: unknown, init?: ResponseInit): Response { + return new Response(JSON.stringify(json), { + status: 200, + headers: { 'content-type': 'application/json' }, + ...init, + }) +} + +describe('reverse ETL', () => { + it('advances source_state through insert-only standard object creates', async () => { + const rows = [ + { + id: 'crm_123', + email: 'jenny@example.com', + full_name: 'Jenny Rosen', + updated_at: '2026-01-01T00:00:00.000Z', + }, + ] + const stripeRequests: Array<{ url: string; init?: RequestInit }> = [] + + const source = createPostgresSource({ + now: () => new Date('2026-05-03T00:00:00.000Z'), + createPool: () => ({ + async query(text: string, values?: unknown[]) { + if (text.includes('information_schema.columns')) { + return queryResult([ + { column_name: 'id', data_type: 'text', is_nullable: 'NO' }, + { column_name: 'email', data_type: 'text', is_nullable: 'NO' }, + { column_name: 'full_name', data_type: 'text', is_nullable: 'NO' }, + { + column_name: 'updated_at', + data_type: 'timestamp with time zone', + is_nullable: 'NO', + }, + ]) + } + + const cursor = values && values.length > 1 ? String(values[0]) : undefined + return queryResult(rows.filter((row) => !cursor || row.updated_at > cursor)) + }, + async end() {}, + }), + }) + + const destination = createStripeDestination({ + sleep: async () => {}, + fetch: async (url, init) => { + stripeRequests.push({ url: String(url), init }) + return stripeResponse({ + id: 'cus_123', + object: 'customer', + }) + }, + }) + + const engine = await createEngine(makeResolver(source, destination)) + const result = await engine.pipeline_sync_batch( + { + source: { + type: 'postgres', + postgres: { + url: 'postgres://example', + table: 'customers', + stream: 'customer', + primary_key: ['id'], + cursor_field: 'updated_at', + page_size: 100, + }, + }, + destination: { + type: 'stripe', + stripe: { + api_key: 'sk_test_123', + api_version: '2026-03-25.dahlia', + base_url: 'https://stripe.test', + object: 'standard_object', + write_mode: 'create', + streams: { + customer: { + field_mapping: { + email: 'email', + name: 'full_name', + }, + }, + }, + }, + }, + streams: [{ name: 'customer', sync_mode: 'incremental' }], + }, + { run_id: 'run_reverse_etl_standard_object_create_test' } + ) + + expect(result.ending_state?.source.streams.customer).toEqual({ + cursor: '2026-01-01T00:00:00.000Z', + primary_key: ['crm_123'], + }) + expect(stripeRequests.map((request) => request.url)).toEqual([ + 'https://stripe.test/v1/customers', + ]) + expect(Object.fromEntries(new URLSearchParams(String(stripeRequests[0]!.init?.body)))).toEqual({ + email: 'jenny@example.com', + name: 'Jenny Rosen', + }) + }) + + it('advances source_state through append-only Custom Object creates', async () => { + const rows = [ + { + id: 'device_123', + name: 'living room tv', + time_from_harvest: '2 days', + updated_at: '2026-01-01T00:00:00.000Z', + }, + ] + const stripeRequests: Array<{ url: string; init?: RequestInit }> = [] + + const source = createPostgresSource({ + now: () => new Date('2026-05-03T00:00:00.000Z'), + createPool: () => ({ + async query(text: string, values?: unknown[]) { + if (text.includes('information_schema.columns')) { + return queryResult([ + { column_name: 'id', data_type: 'text', is_nullable: 'NO' }, + { column_name: 'name', data_type: 'text', is_nullable: 'NO' }, + { column_name: 'time_from_harvest', data_type: 'text', is_nullable: 'YES' }, + { + column_name: 'updated_at', + data_type: 'timestamp with time zone', + is_nullable: 'NO', + }, + ]) + } + + const cursor = values && values.length > 1 ? String(values[0]) : undefined + return queryResult(rows.filter((row) => !cursor || row.updated_at > cursor)) + }, + async end() {}, + }), + }) + + const destination = createStripeDestination({ + sleep: async () => {}, + fetch: async (url, init) => { + stripeRequests.push({ url: String(url), init }) + if (String(url).endsWith('/v2/extend/object_definitions')) { + return stripeResponse({ + data: [ + { + id: 'cobjdef_matcha', + api_name_plural: 'matcha_objects', + properties: { + name: { type: 'string' }, + time_from_harvest: { type: 'string' }, + }, + }, + ], + }) + } + return stripeResponse({ + id: 'objrec_test_123', + object: 'v2.extend.objects.matcha_object', + }) + }, + }) + + const engine = await createEngine(makeResolver(source, destination)) + const result = await engine.pipeline_sync_batch( + { + source: { + type: 'postgres', + postgres: { + url: 'postgres://example', + table: 'devices', + primary_key: ['id'], + cursor_field: 'updated_at', + page_size: 100, + }, + }, + destination: { + type: 'stripe', + stripe: { + api_key: 'sk_test_123', + api_version: 'unsafe-development', + base_url: 'https://stripe.test', + object: 'custom_object', + write_mode: 'create', + streams: { + devices: { + plural_name: 'matcha_objects', + field_mapping: { + name: 'name', + time_from_harvest: 'time_from_harvest', + }, + }, + }, + }, + }, + streams: [{ name: 'devices', sync_mode: 'incremental' }], + }, + { run_id: 'run_reverse_etl_custom_object_create_test' } + ) + + expect(result.ending_state?.source.streams.devices).toEqual({ + cursor: '2026-01-01T00:00:00.000Z', + primary_key: ['device_123'], + }) + expect(stripeRequests.map((request) => request.url)).toEqual([ + 'https://stripe.test/v2/extend/object_definitions', + 'https://stripe.test/v2/extend/objects/matcha_objects', + ]) + expect(stripeRequests[1]!.init?.body).toBe( + JSON.stringify({ fields: { name: 'living room tv', time_from_harvest: '2 days' } }) + ) + }) + + it('creates a new Custom Object record when the same source row changes twice', async () => { + let rows = [ + { + id: 'device_123', + name: 'living room tv', + time_from_harvest: '2 days', + updated_at: '2026-01-01T00:00:00.000Z', + }, + ] + const stripeRequests: Array<{ url: string; init?: RequestInit }> = [] + + const source = createPostgresSource({ + now: () => new Date('2026-05-03T00:00:00.000Z'), + createPool: () => ({ + async query(text: string, values?: unknown[]) { + if (text.includes('information_schema.columns')) { + return queryResult([ + { column_name: 'id', data_type: 'text', is_nullable: 'NO' }, + { column_name: 'name', data_type: 'text', is_nullable: 'NO' }, + { column_name: 'time_from_harvest', data_type: 'text', is_nullable: 'YES' }, + { + column_name: 'updated_at', + data_type: 'timestamp with time zone', + is_nullable: 'NO', + }, + ]) + } + + const cursor = values && values.length > 1 ? String(values[0]) : undefined + return queryResult(rows.filter((row) => !cursor || row.updated_at > cursor)) + }, + async end() {}, + }), + }) + + const destination = createStripeDestination({ + sleep: async () => {}, + fetch: async (url, init) => { + stripeRequests.push({ url: String(url), init }) + if (String(url).endsWith('/v2/extend/object_definitions')) { + return stripeResponse({ + data: [ + { + id: 'cobjdef_matcha', + api_name_plural: 'matcha_objects', + properties: { + name: { type: 'string' }, + time_from_harvest: { type: 'string' }, + }, + }, + ], + }) + } + return stripeResponse({ + id: `objrec_test_${stripeRequests.length}`, + object: 'v2.extend.objects.matcha_object', + }) + }, + }) + + const pipeline = { + source: { + type: 'postgres', + postgres: { + url: 'postgres://example', + table: 'devices', + primary_key: ['id'], + cursor_field: 'updated_at', + page_size: 100, + }, + }, + destination: { + type: 'stripe', + stripe: { + api_key: 'sk_test_123', + api_version: 'unsafe-development', + base_url: 'https://stripe.test', + object: 'custom_object', + write_mode: 'create', + streams: { + devices: { + plural_name: 'matcha_objects', + field_mapping: { + name: 'name', + time_from_harvest: 'time_from_harvest', + }, + }, + }, + }, + }, + streams: [{ name: 'devices', sync_mode: 'incremental' as const }], + } + + const engine = await createEngine(makeResolver(source, destination)) + const first = await engine.pipeline_sync_batch(pipeline, { + run_id: 'run_reverse_etl_custom_object_create_twice_test', + }) + + rows = [ + { + id: 'device_123', + name: 'living room tv', + time_from_harvest: '3 days', + updated_at: '2026-01-02T00:00:00.000Z', + }, + ] + const second = await engine.pipeline_sync_batch(pipeline, { + state: first.ending_state, + run_id: 'run_reverse_etl_custom_object_create_twice_test', + }) + + expect(first.ending_state?.source.streams.devices).toEqual({ + cursor: '2026-01-01T00:00:00.000Z', + primary_key: ['device_123'], + }) + expect(second.ending_state?.source.streams.devices).toEqual({ + cursor: '2026-01-02T00:00:00.000Z', + primary_key: ['device_123'], + }) + expect(stripeRequests.map((request) => request.url)).toEqual([ + 'https://stripe.test/v2/extend/object_definitions', + 'https://stripe.test/v2/extend/objects/matcha_objects', + 'https://stripe.test/v2/extend/object_definitions', + 'https://stripe.test/v2/extend/objects/matcha_objects', + ]) + expect(stripeRequests[1]!.init?.body).toBe( + JSON.stringify({ fields: { name: 'living room tv', time_from_harvest: '2 days' } }) + ) + expect(stripeRequests[3]!.init?.body).toBe( + JSON.stringify({ fields: { name: 'living room tv', time_from_harvest: '3 days' } }) + ) + }) + + it('withholds source_state when Custom Object create fails', async () => { + const rows = [ + { + id: 'device_123', + name: 'living room tv', + updated_at: '2026-01-01T00:00:00.000Z', + }, + ] + + const source = createPostgresSource({ + now: () => new Date('2026-05-03T00:00:00.000Z'), + createPool: () => ({ + async query(text: string, values?: unknown[]) { + if (text.includes('information_schema.columns')) { + return queryResult([ + { column_name: 'id', data_type: 'text', is_nullable: 'NO' }, + { column_name: 'name', data_type: 'text', is_nullable: 'NO' }, + { + column_name: 'updated_at', + data_type: 'timestamp with time zone', + is_nullable: 'NO', + }, + ]) + } + const cursor = values && values.length > 1 ? String(values[0]) : undefined + return queryResult(rows.filter((row) => !cursor || row.updated_at > cursor)) + }, + async end() {}, + }), + }) + + const destination = createStripeDestination({ + sleep: async () => {}, + fetch: async (url) => { + if (String(url).endsWith('/v2/extend/object_definitions')) { + return stripeResponse({ + data: [ + { + id: 'cobjdef_matcha', + api_name_plural: 'matcha_objects', + properties: { name: { type: 'string' } }, + }, + ], + }) + } + return stripeResponse({ error: { message: 'custom object invalid' } }, { status: 400 }) + }, + }) + + const engine = await createEngine(makeResolver(source, destination)) + const result = await engine.pipeline_sync_batch( + { + source: { + type: 'postgres', + postgres: { + url: 'postgres://example', + table: 'devices', + primary_key: ['id'], + cursor_field: 'updated_at', + page_size: 100, + }, + }, + destination: { + type: 'stripe', + stripe: { + api_key: 'sk_test_123', + api_version: 'unsafe-development', + base_url: 'https://stripe.test', + object: 'custom_object', + write_mode: 'create', + streams: { + devices: { + plural_name: 'matcha_objects', + field_mapping: { + name: 'name', + }, + }, + }, + }, + }, + streams: [{ name: 'devices', sync_mode: 'incremental' }], + }, + { run_id: 'run_reverse_etl_custom_object_create_failure_test' } + ) + + expect(result.status).toBe('failed') + expect(result.ending_state?.source.streams.devices).toBeUndefined() + }) + + it('withholds source_state when Custom Object setup fails before records', async () => { + const source: Source = { + async *spec() { + yield { type: 'spec', spec: { config: {} } } + }, + async *check() { + yield { type: 'connection_status', connection_status: { status: 'succeeded' } } + }, + async *discover() { + yield { + type: 'catalog', + catalog: { + streams: [ + { + name: 'devices', + primary_key: [['id']], + newer_than_field: 'updated_at', + json_schema: { + type: 'object', + properties: { + id: { type: 'string' }, + updated_at: { type: 'string' }, + }, + }, + }, + ], + }, + } + }, + async *read() { + yield { + type: 'source_state', + source_state: { + state_type: 'stream', + stream: 'devices', + data: { cursor: '2026-01-01T00:00:00.000Z', primary_key: ['device_123'] }, + }, + } + yield { + type: 'source_state', + source_state: { + state_type: 'global', + data: { cursor: 'global_cursor_after_setup_failure' }, + }, + } + }, + } + const destination = createStripeDestination({ + fetch: async () => stripeResponse({ data: [] }), + }) + const engine = await createEngine(makeResolver(source, destination)) + + const result = await engine.pipeline_sync_batch( + { + source: { type: 'state_only', state_only: {} }, + destination: { + type: 'stripe', + stripe: { + api_key: 'sk_test_123', + api_version: 'unsafe-development', + base_url: 'https://stripe.test', + object: 'custom_object', + write_mode: 'create', + streams: { + devices: { + plural_name: 'matcha_objects', + field_mapping: { + name: 'name', + }, + }, + }, + }, + }, + streams: [{ name: 'devices', sync_mode: 'incremental' }], + }, + { run_id: 'run_reverse_etl_custom_object_setup_failure_test' } + ) + + expect(result.status).toBe('failed') + expect(result.run_progress.derived.total_state_count).toBe(0) + expect(result.ending_state?.source.streams.devices).toBeUndefined() + expect(result.ending_state?.source.global).toEqual({}) + }) +}) diff --git a/docs/plans/2026-05-03-reverse-etl-mvp.md b/docs/plans/2026-05-03-reverse-etl-mvp.md new file mode 100644 index 000000000..cb158b143 --- /dev/null +++ b/docs/plans/2026-05-03-reverse-etl-mvp.md @@ -0,0 +1,199 @@ +# Reverse ETL MVP + +## Context + +Sync engine usually moves Stripe data out: + +```text +source-stripe -> sync engine -> destination-postgres +``` + +This branch proves the reverse direction using the same connector model: + +```text +source-postgres -> sync engine -> destination-stripe -> Stripe Custom Objects +``` + +The MVP is deliberately narrow. It reads rows from one Postgres table or query and writes append-only records into a configured Stripe Custom Object. It does not try to become a generic reverse ETL platform, a Customer upsert tool, or a transformation DSL. + +## Current Shape + +`source-postgres` reads a deterministic incremental stream: + +- Connects to Postgres using `url` or `connection_string`. +- Reads either a `table` or a `query`. +- Requires `primary_key` and `cursor_field`. +- Discovers a catalog stream with `primary_key` and `newer_than_field`. +- Pages by `(cursor_field, primary_key)`. +- Emits `source_state` after page boundaries. + +`destination-stripe` writes Stripe Custom Objects only: + +- Requires `object: "custom_object"`. +- Requires `api_version: "unsafe-development"`. +- Requires `write_mode: "create"`. +- Requires per-stream config under `streams`. +- Maps source fields into `fields` for `POST /v2/extend/objects/{plural_name}`. +- Uses JSON request bodies for Stripe v2 Custom Object creates. +- Uses stable idempotency keys that include stream, operation, primary key, and `newer_than_field` value. + +The engine stays generic. It discovers the source catalog, applies selected fields, pipes records and state into the destination, and persists only the `source_state` messages the destination re-emits. + +## Example Pipeline + +```json +{ + "source": { + "type": "postgres", + "postgres": { + "url": "postgres://...", + "table": "devices", + "primary_key": ["id"], + "cursor_field": "updated_at", + "page_size": 100 + } + }, + "destination": { + "type": "stripe", + "stripe": { + "api_key": "sk_test_...", + "api_version": "unsafe-development", + "object": "custom_object", + "write_mode": "create", + "streams": { + "devices": { + "plural_name": "matcha_objects", + "field_mapping": { + "name": "name", + "time_from_harvest": "time_from_harvest" + } + } + } + } + }, + "streams": [{ "name": "devices", "sync_mode": "incremental" }] +} +``` + +## Goals + +- Prove reverse ETL fits normal connector composition. +- Keep source and destination isolated behind `@stripe/sync-protocol`. +- Add a small Postgres source that can read one table or query incrementally. +- Add a small Stripe destination for append-only Custom Object creates. +- Make checkpoint safety explicit: source cursors advance only after destination writes succeed. +- Keep config validation strict and consistent through the engine/service JSON Schema path. + +## Non-Goals + +- No Customer upsert support in this MVP. +- No generic Stripe object writer. +- No update/delete behavior for Custom Objects. +- No mapping UI. +- No generic transformation DSL. +- No destination-owned mapping state. +- No CDC/logical replication. +- No dead-letter queue or per-record recovery workflow. + +## Config Validation + +The Stripe destination config schema is intentionally strict. Legacy Custom Object shorthand keys are rejected instead of ignored: + +- `plural_name` +- `field_mapping` +- `stripe_record_id_field` +- `auto_map_fields` + +Those keys are only valid inside `streams[stream_name]`, where each stream names the Custom Object plural name and field mapping. + +The schema avoids merge-critical `superRefine()` rules because the engine and service validate connector configs from JSON Schema with `z.fromJSONSchema()`. Rules that must hold on API/engine paths are encoded structurally: + +- `object` is a literal `custom_object`. +- `api_version` is a literal `unsafe-development`. +- `write_mode` is a literal `create`. +- Unknown top-level keys are rejected. +- Unknown per-stream keys are rejected. +- `streams` is required. + +Runtime guards still fail closed before writes. They are a backstop, not the primary user-facing validation path. + +## Checkpoint Contract + +The destination is the commit gate. + +`source-postgres` can emit records and `source_state`, but the engine only persists states returned by `destination-stripe`. This lets the destination withhold checkpoints when Stripe writes fail. + +Rules: + +- After a successful record write, the destination re-emits the record. +- After successful prior writes, the destination re-emits stream `source_state`. +- If any stream write fails, state for that stream is withheld. +- If any stream write fails, global source state is withheld. +- If destination setup/config/OpenAPI validation fails before records, all source state is withheld. +- Setup failure emits failed connection/stream status so the run fails instead of silently advancing. + +This keeps checkpoint safety local to the destination, where write durability is known. The engine does not special-case Stripe. + +## Field Selection And Idempotency + +`applySelection()` preserves both primary key fields and `newer_than_field` when pruning selected fields. + +That matters for reverse ETL. `destination-stripe` includes the `newer_than_field` value in the idempotency key. If field selection removed the cursor field from the record payload, two updates to the same source row could generate the same Stripe idempotency key and be treated as a replay instead of a distinct append-only Custom Object create. + +## Failure Behavior + +| Case | Behavior | +| ------------------------------------ | ----------------------------------------------------------------------- | +| Legacy config key | Reject config through direct Zod and JSON Schema round-trip validation. | +| Customer config key | Reject config. Customer writes are not part of this MVP. | +| Missing Custom Object definition | Fail setup/check. Do not advance checkpoints. | +| Unknown mapped Custom Object field | Fail setup/check. Do not advance checkpoints. | +| Stripe 400 write error | Mark stream errored. Withhold stream and global state. | +| Stripe 429/5xx/network error | Retry with backoff and stable idempotency key. | +| State-only input after setup failure | Emit failure status and withhold stream/global state. | +| No rows after resume | Emit no records and create no Stripe objects. | + +## Why This Fits + +The connector model already has the right shape: + +- Sources own source cursors. +- Destinations own write durability. +- The engine wires streams together. +- Connector-specific behavior stays inside the connector. + +This branch does not add a new engine mode. Reverse ETL is just another source/destination pairing: + +```text +source-postgres -> destination-stripe +``` + +The important extension is behavioral, not architectural: destinations must be careful about when they pass through `source_state`. + +## Validation Run + +The branch was validated with: + +- `pnpm --filter @stripe/sync-destination-stripe build` +- `pnpm --filter @stripe/sync-engine build` +- `NODE_OPTIONS='--conditions=bun' pnpm --filter @stripe/sync-destination-stripe exec vitest run src/index.test.ts` +- `NODE_OPTIONS='--conditions=bun' pnpm --filter @stripe/sync-engine exec vitest run src/lib/destination-filter.test.ts src/lib/reverse-etl.test.ts` +- `./scripts/generate-openapi.sh` +- `pnpm lint` +- `git diff --check` + +Live validation used Docker Postgres and real Stripe Custom Objects: + +- Seeded a `devices` table in Docker Postgres. +- Synced one row into live Stripe `matcha_objects`. +- Resumed with no source changes and verified no new Stripe object was created. +- Updated the same source row with a newer cursor and verified a distinct second Custom Object was created. +- Verified fields and cursors. +- Deleted all live test objects and verified zero leftovers. + +## Follow-Ups + +- Decide whether Customer upserts should be a separate destination mode or a separate future plan. +- Decide whether destination-owned mapping state is needed before supporting update semantics. +- Add durable docs/examples once the Custom Object API shape is stable. +- Consider a reusable live reverse ETL e2e test harness if this path becomes a supported product surface. diff --git a/e2e/connector-loading.test.sh b/e2e/connector-loading.test.sh index 63d94547f..4b7a9d093 100755 --- a/e2e/connector-loading.test.sh +++ b/e2e/connector-loading.test.sh @@ -28,7 +28,9 @@ cleanup() { rm -f "$REPO_ROOT"/stripe-sync-openapi-*.tgz rm -f "$REPO_ROOT"/stripe-sync-engine-*.tgz rm -f "$REPO_ROOT"/stripe-sync-source-stripe-*.tgz + rm -f "$REPO_ROOT"/stripe-sync-source-postgres-*.tgz rm -f "$REPO_ROOT"/stripe-sync-destination-postgres-*.tgz + rm -f "$REPO_ROOT"/stripe-sync-destination-stripe-*.tgz rm -f "$REPO_ROOT"/stripe-sync-destination-google-sheets-*.tgz rm -f "$REPO_ROOT"/stripe-sync-state-postgres-*.tgz rm -f "$REPO_ROOT"/stripe-sync-util-postgres-*.tgz @@ -52,7 +54,9 @@ PROTOCOL_TGZ=$(cd "$REPO_ROOT" && pnpm --filter @stripe/sync-protocol pack 2>/de OPENAPI_TGZ=$(cd "$REPO_ROOT" && pnpm --filter @stripe/sync-openapi pack 2>/dev/null | tail -1) ENGINE_TGZ=$(cd "$REPO_ROOT" && pnpm --filter @stripe/sync-engine pack 2>/dev/null | tail -1) SOURCE_TGZ=$(cd "$REPO_ROOT" && pnpm --filter @stripe/sync-source-stripe pack 2>/dev/null | tail -1) +SOURCE_PG_TGZ=$(cd "$REPO_ROOT" && pnpm --filter @stripe/sync-source-postgres pack 2>/dev/null | tail -1) DEST_TGZ=$(cd "$REPO_ROOT" && pnpm --filter @stripe/sync-destination-postgres pack 2>/dev/null | tail -1) +DEST_STRIPE_TGZ=$(cd "$REPO_ROOT" && pnpm --filter @stripe/sync-destination-stripe pack 2>/dev/null | tail -1) DEST_SHEETS_TGZ=$(cd "$REPO_ROOT" && pnpm --filter @stripe/sync-destination-google-sheets pack 2>/dev/null | tail -1) STATE_PG_TGZ=$(cd "$REPO_ROOT" && pnpm --filter @stripe/sync-state-postgres pack 2>/dev/null | tail -1) UTIL_PG_TGZ=$(cd "$REPO_ROOT" && pnpm --filter @stripe/sync-util-postgres pack 2>/dev/null | tail -1) @@ -61,8 +65,9 @@ HONO_ZOD_TGZ=$(cd "$REPO_ROOT" && pnpm --filter @stripe/sync-hono-zod-openapi pa SUPABASE_TGZ=$(cd "$REPO_ROOT" && pnpm --filter @stripe/sync-integration-supabase pack 2>/dev/null | tail -1) LOGGER_TGZ=$(cd "$REPO_ROOT" && pnpm --filter @stripe/sync-logger pack 2>/dev/null | tail -1) -for tgz in "$PROTOCOL_TGZ" "$OPENAPI_TGZ" "$ENGINE_TGZ" "$SOURCE_TGZ" "$DEST_TGZ" "$DEST_SHEETS_TGZ" \ - "$STATE_PG_TGZ" "$UTIL_PG_TGZ" "$TSCLI_TGZ" "$HONO_ZOD_TGZ" "$SUPABASE_TGZ" "$LOGGER_TGZ"; do +for tgz in "$PROTOCOL_TGZ" "$OPENAPI_TGZ" "$ENGINE_TGZ" "$SOURCE_TGZ" "$SOURCE_PG_TGZ" \ + "$DEST_TGZ" "$DEST_STRIPE_TGZ" "$DEST_SHEETS_TGZ" "$STATE_PG_TGZ" "$UTIL_PG_TGZ" \ + "$TSCLI_TGZ" "$HONO_ZOD_TGZ" "$SUPABASE_TGZ" "$LOGGER_TGZ"; do if [ ! -f "$tgz" ]; then echo "FAIL: tarball not found: $tgz" exit 1 @@ -102,7 +107,9 @@ cat > package.json < package.json <&1 | tail -5 echo "" diff --git a/e2e/package.json b/e2e/package.json index c893429fb..1f86d214d 100644 --- a/e2e/package.json +++ b/e2e/package.json @@ -10,10 +10,12 @@ "hono": "^4", "@stripe/sync-destination-google-sheets": "workspace:*", "@stripe/sync-destination-postgres": "workspace:*", + "@stripe/sync-destination-stripe": "workspace:*", "@stripe/sync-engine": "workspace:*", "@stripe/sync-openapi": "workspace:*", "@stripe/sync-protocol": "workspace:*", "@stripe/sync-service": "workspace:*", + "@stripe/sync-source-postgres": "workspace:*", "@stripe/sync-source-stripe": "workspace:*", "@stripe/sync-test-utils": "workspace:*", "@temporalio/client": "^1", diff --git a/e2e/reverse-etl-demo-loop.ts b/e2e/reverse-etl-demo-loop.ts new file mode 100644 index 000000000..c52862ea2 --- /dev/null +++ b/e2e/reverse-etl-demo-loop.ts @@ -0,0 +1,327 @@ +/** + * Live reverse ETL demo loop: Postgres -> Sync Engine -> Stripe. + * + * This is intentionally demo-focused, not assertion-focused. It creates the + * demo tables if needed, then polls both pipelines every 2 seconds: + * + * - `crm_customers` -> Stripe Customer + * - `devices` -> Stripe Device Custom Object + * + * The loop persists Sync Engine state to `.tmp/reverse-etl-demo-state.json`. + * That state is the cursor. Do not use a stable `run_id` for this loop: + * `run_id` is for bounded backfills, while this demo should continuously pick + * up rows inserted after the script starts. + * + * What must exist: + * - Stripe Custom Objects enabled for the API key/account. + * - A Device Custom Object definition at `/v2/extend/objects/devices` with + * `name`, `device_id`, `device_type`, `city`, and `customer_id` fields. + * + * Terminal 1: start Postgres + * docker rm -f reverse-etl-demo-pg 2>/dev/null || true + * docker run --rm -d --name reverse-etl-demo-pg \ + * -e POSTGRES_PASSWORD=postgres -p 55439:5432 postgres:18 + * + * Terminal 2: run the demo loop with an in-process Sync Engine + * STRIPE_API_KEY=sk_test_... \ + * DATABASE_URL=postgres://postgres:postgres@127.0.0.1:55439/postgres \ + * DEMO_CUSTOM_OBJECT_PLURAL=devices \ + * pnpm --filter @stripe/sync-e2e exec tsx --conditions bun reverse-etl-demo-loop.ts + * + * Optional Terminal 2/3: run through a real Sync Engine HTTP server instead + * PORT=4010 pnpm --filter @stripe/sync-engine dev + * + * STRIPE_API_KEY=sk_test_... \ + * DATABASE_URL=postgres://postgres:postgres@127.0.0.1:55439/postgres \ + * ENGINE_URL=http://127.0.0.1:4010 \ + * DEMO_CUSTOM_OBJECT_PLURAL=devices \ + * pnpm --filter @stripe/sync-e2e exec tsx --conditions bun reverse-etl-demo-loop.ts + * + * Terminal 3: insert a Customer row + * psql "$DATABASE_URL" -c " + * INSERT INTO crm_customers (id, email, full_name) + * VALUES ( + * 'customer_' || floor(extract(epoch from clock_timestamp()) * 1000)::text, + * 'demo+' || floor(extract(epoch from clock_timestamp()) * 1000)::text || '@example.com', + * 'Demo Customer' + * ); + * " + * + * Terminal 3: insert a Device row + * psql "$DATABASE_URL" -c " + * INSERT INTO devices (name, device_id, device_type, city, customer_id) + * VALUES ( + * 'Demo Reader', + * 'device_' || floor(extract(epoch from clock_timestamp()) * 1000)::text, + * 'reader', + * 'San Francisco', + * 'customer_demo' + * ); + * " + * + * Dashboard: + * - Customers: https://dashboard.stripe.com/test/customers + * - Devices: https://dashboard.stripe.com/test/custom-objects/devices + * + * Useful reset while practicing: + * rm -f .tmp/reverse-etl-demo-state.json + * psql "$DATABASE_URL" -c "TRUNCATE crm_customers; DROP TABLE IF EXISTS devices;" + */ + +import { mkdir, readFile, writeFile } from 'node:fs/promises' +import { dirname, resolve } from 'node:path' +import { fileURLToPath } from 'node:url' +import pg from 'pg' +import type { EofPayload, PipelineConfig, SyncState } from '@stripe/sync-protocol' +import type { ConnectorResolver } from '../apps/engine/src/lib/index.ts' +import { createEngine } from '../apps/engine/src/lib/engine.ts' +import { createPostgresSource } from '../packages/source-postgres/src/index.ts' +import { createStripeDestination } from '../packages/destination-stripe/src/index.ts' + +type DemoState = { + customer?: SyncState + device?: SyncState +} + +type PipelineRunner = ( + pipeline: PipelineConfig, + state: SyncState | undefined +) => Promise + +const databaseUrl = + process.env.DATABASE_URL ?? 'postgres://postgres:postgres@127.0.0.1:55439/postgres' +const stripeApiKey = process.env.STRIPE_API_KEY +const engineUrl = process.env.ENGINE_URL +const customObjectPluralName = process.env.DEMO_CUSTOM_OBJECT_PLURAL ?? 'devices' +const stripeApiVersion = process.env.STRIPE_API_VERSION ?? '2026-03-25.dahlia' +const customObjectApiVersion = 'unsafe-development' +const pollMs = process.env.POLL_MS ? Number.parseInt(process.env.POLL_MS, 10) : 2_000 +const defaultStateFile = fileURLToPath( + new URL('../.tmp/reverse-etl-demo-state.json', import.meta.url) +) +const stateFile = process.env.DEMO_STATE_FILE + ? resolve(process.env.DEMO_STATE_FILE) + : defaultStateFile + +if (!stripeApiKey) { + throw new Error('Set STRIPE_API_KEY before running reverse-etl-demo-loop.ts') +} + +function now() { + return new Date().toISOString() +} + +function log(message: string, data?: unknown) { + const suffix = data === undefined ? '' : ` ${JSON.stringify(data)}` + console.log(`[${now()}] ${message}${suffix}`) +} + +function sleep(ms: number) { + return new Promise((resolve) => setTimeout(resolve, ms)) +} + +function makeResolver( + source: ReturnType, + destination: ReturnType +): ConnectorResolver { + return { + resolveSource: async () => source, + resolveDestination: async () => destination, + sources: () => new Map(), + destinations: () => new Map(), + } +} + +async function preparePostgres() { + const client = new pg.Client({ connectionString: databaseUrl }) + await client.connect() + try { + await client.query(` + CREATE TABLE IF NOT EXISTS crm_customers ( + id text PRIMARY KEY, + email text NOT NULL, + full_name text NOT NULL, + ignored_internal_note text, + updated_at timestamptz(3) NOT NULL DEFAULT date_trunc('milliseconds', clock_timestamp()) + ) + `) + + await client.query(` + CREATE TABLE IF NOT EXISTS devices ( + device_id text NOT NULL, + name text NOT NULL, + device_type text, + city text NOT NULL, + customer_id text NOT NULL, + updated_at timestamptz(3) NOT NULL DEFAULT date_trunc('milliseconds', clock_timestamp()), + PRIMARY KEY (device_id) + ) + `) + + log('Demo tables are ready', { tables: ['crm_customers', 'devices'] }) + } finally { + await client.end() + } +} + +function customerPipeline(): PipelineConfig { + return { + source: { + type: 'postgres', + postgres: { + url: databaseUrl, + table: 'crm_customers', + stream: 'customer', + primary_key: ['id'], + cursor_field: 'updated_at', + page_size: 100, + }, + }, + destination: { + type: 'stripe', + stripe: { + api_key: stripeApiKey, + api_version: stripeApiVersion, + object: 'standard_object', + write_mode: 'create', + streams: { + customer: { + field_mapping: { + email: 'email', + name: 'full_name', + }, + }, + }, + }, + }, + streams: [{ name: 'customer', sync_mode: 'incremental' }], + } +} + +function devicePipeline(): PipelineConfig { + return { + source: { + type: 'postgres', + postgres: { + url: databaseUrl, + table: 'devices', + stream: 'devices', + primary_key: ['device_id'], + cursor_field: 'updated_at', + page_size: 100, + }, + }, + destination: { + type: 'stripe', + stripe: { + api_key: stripeApiKey, + api_version: customObjectApiVersion, + object: 'custom_object', + write_mode: 'create', + streams: { + devices: { + plural_name: customObjectPluralName, + field_mapping: { + name: 'name', + device_id: 'device_id', + device_type: 'device_type', + city: 'city', + customer_id: 'customer_id', + }, + }, + }, + }, + }, + streams: [{ name: 'devices', sync_mode: 'incremental' }], + } +} + +async function createRunner(): Promise { + if (engineUrl) { + const baseUrl = engineUrl.endsWith('/') ? engineUrl : `${engineUrl}/` + log('Using remote Sync Engine', { engine_url: engineUrl }) + return async (pipeline, state) => { + const response = await fetch(new URL('pipeline_sync_batch', baseUrl), { + method: 'POST', + headers: { 'content-type': 'application/json' }, + body: JSON.stringify({ pipeline, state }), + }) + const text = await response.text() + const json = text ? JSON.parse(text) : {} + if (!response.ok) { + throw new Error(`Remote Sync Engine returned ${response.status}: ${JSON.stringify(json)}`) + } + return json as EofPayload + } + } + + log('Using in-process Sync Engine') + const source = createPostgresSource() + const destination = createStripeDestination() + const engine = await createEngine(makeResolver(source, destination)) + return (pipeline, state) => engine.pipeline_sync_batch(pipeline, { state }) +} + +async function loadState(): Promise { + try { + return JSON.parse(await readFile(stateFile, 'utf8')) as DemoState + } catch (err) { + if (err instanceof Error && 'code' in err && err.code === 'ENOENT') return {} + throw err + } +} + +async function saveState(state: DemoState) { + await mkdir(dirname(stateFile), { recursive: true }) + await writeFile(stateFile, JSON.stringify(state, null, 2) + '\n') +} + +function streamSummary(result: EofPayload, stream: string) { + return { + status: result.status, + has_more: result.has_more, + request_records: result.request_progress.streams[stream]?.record_count ?? 0, + total_records: result.run_progress.streams[stream]?.record_count ?? 0, + cursor: result.ending_state?.source.streams[stream], + } +} + +async function main() { + await preparePostgres() + const runner = await createRunner() + const pipelines = { + customer: customerPipeline(), + device: devicePipeline(), + } + let state = await loadState() + + log('Reverse ETL demo loop started', { + poll_ms: pollMs, + state_file: stateFile, + custom_object_plural_name: customObjectPluralName, + }) + log('Insert rows in another terminal, then watch the Stripe Dashboard.') + + while (true) { + try { + const customerResult = await runner(pipelines.customer, state.customer) + state = { ...state, customer: customerResult.ending_state } + log('Customer poll complete', streamSummary(customerResult, 'customer')) + + const deviceResult = await runner(pipelines.device, state.device) + state = { ...state, device: deviceResult.ending_state } + log('Device poll complete', streamSummary(deviceResult, 'devices')) + + await saveState(state) + } catch (err) { + console.error(`[${now()}] Demo poll failed`, err instanceof Error ? err.stack : err) + } + + await sleep(pollMs) + } +} + +main().catch((err) => { + console.error(err instanceof Error ? err.stack : String(err)) + process.exitCode = 1 +}) diff --git a/e2e/reverse-etl-e2e.ts b/e2e/reverse-etl-e2e.ts new file mode 100644 index 000000000..9718963c8 --- /dev/null +++ b/e2e/reverse-etl-e2e.ts @@ -0,0 +1,333 @@ +/** + * Live reverse ETL e2e script: Postgres -> destination-stripe -> Stripe. + * + * What must be running: + * - A local Postgres database reachable by DATABASE_URL. + * - Stripe Custom Objects must be enabled for the API key/account. + * - The Device Custom Object definition named by DEMO_CUSTOM_OBJECT_PLURAL must + * exist and define `name`, `device_id`, `device_type`, `city`, and `customer_id` + * fields. + * + * Example setup: + * docker run --rm -d --name reverse-etl-e2e-pg \ + * -e POSTGRES_PASSWORD=postgres -p 55439:5432 postgres:18 + * + * Example run: + * STRIPE_API_KEY=sk_test_... \ + * DATABASE_URL=postgres://postgres:postgres@127.0.0.1:55439/postgres \ + * DEMO_CUSTOM_OBJECT_PLURAL=devices \ + * pnpm --filter @stripe/sync-e2e exec tsx --conditions bun reverse-etl-e2e.ts + * + * The script creates disposable Postgres tables, syncs one row to a regular + * Stripe Customer and one row to a Stripe Custom Object, verifies both through + * the Stripe API, and best-effort cleans up the Stripe Customer + Postgres + * tables. Custom Object deletion is best effort because the v2 API is still + * evolving. + */ + +import pg from 'pg' +import { createEngine } from '../apps/engine/src/lib/engine.ts' +import { createPostgresSource } from '../packages/source-postgres/src/index.ts' +import { createStripeDestination } from '../packages/destination-stripe/src/index.ts' + +const databaseUrl = + process.env.DATABASE_URL ?? 'postgres://postgres:postgres@127.0.0.1:55439/postgres' +const stripeApiKey = process.env.STRIPE_API_KEY +const customObjectPluralName = process.env.DEMO_CUSTOM_OBJECT_PLURAL ?? 'devices' +const stripeApiVersion = '2026-03-25.dahlia' +const customObjectApiVersion = 'unsafe-development' +const runId = `reverse_etl_e2e_${Date.now()}` + +if (!stripeApiKey) { + throw new Error('Set STRIPE_API_KEY before running reverse-etl-e2e.ts') +} + +function now() { + return new Date().toISOString() +} + +function log(message: string, data?: unknown) { + const suffix = data === undefined ? '' : ` ${JSON.stringify(data)}` + console.log(`[${now()}] ${message}${suffix}`) +} + +function makeResolver(source: unknown, destination: unknown) { + return { + resolveSource: async () => source, + resolveDestination: async () => destination, + sources: () => new Map(), + destinations: () => new Map(), + } +} + +async function stripeJson( + method: string, + path: string, + apiVersion: string, + body?: Record +) { + const response = await fetch(new URL(path, 'https://api.stripe.com'), { + method, + headers: { + Authorization: `Bearer ${stripeApiKey}`, + 'Stripe-Version': apiVersion, + ...(body ? { 'Content-Type': 'application/x-www-form-urlencoded' } : {}), + }, + body: body ? new URLSearchParams(body as Record).toString() : undefined, + }) + const text = await response.text() + const json = text ? JSON.parse(text) : {} + if (!response.ok) { + throw new Error(json?.error?.message ?? text) + } + return json as Record +} + +function customObjectFieldValue(record: Record, field: string) { + const fields = record.fields as Record | undefined + const value = fields?.[field] ?? record[field] + if (value && typeof value === 'object' && 'value' in value) { + return (value as { value: unknown }).value + } + return value +} + +async function preparePostgres(client: pg.Client) { + await client.query(`DROP TABLE IF EXISTS crm_customers`) + await client.query(`DROP TABLE IF EXISTS devices`) + + await client.query(` + CREATE TABLE crm_customers ( + id text PRIMARY KEY, + email text NOT NULL, + full_name text NOT NULL, + ignored_internal_note text, + updated_at timestamptz(3) NOT NULL + ) + `) + await client.query(` + CREATE TABLE devices ( + device_id text PRIMARY KEY, + name text NOT NULL, + device_type text, + city text NOT NULL, + customer_id text NOT NULL, + updated_at timestamptz(3) NOT NULL + ) + `) + + await client.query( + `INSERT INTO crm_customers + (id, email, full_name, ignored_internal_note, updated_at) + VALUES ($1, $2, $3, $4, date_trunc('milliseconds', clock_timestamp()))`, + [ + 'customer_row_1', + `${runId}@example.com`, + `Sync Engine Customer ${runId}`, + 'must not be sent to Stripe', + ] + ) + await client.query( + `INSERT INTO devices + (device_id, name, device_type, city, customer_id, updated_at) + VALUES ($1, $2, $3, $4, $5, date_trunc('milliseconds', clock_timestamp()))`, + [`device_${runId}`, `Sync Engine Device ${runId}`, 'reader', 'San Francisco', 'customer_row_1'] + ) +} + +async function syncStripeCustomer(engine: Awaited>) { + log('Syncing Postgres table crm_customers -> Stripe Customer') + const result = await engine.pipeline_sync_batch( + { + source: { + type: 'postgres', + postgres: { + url: databaseUrl, + table: 'crm_customers', + stream: 'customer', + primary_key: ['id'], + cursor_field: 'updated_at', + page_size: 100, + }, + }, + destination: { + type: 'stripe', + stripe: { + api_key: stripeApiKey, + api_version: stripeApiVersion, + object: 'standard_object', + write_mode: 'create', + streams: { + customer: { + field_mapping: { + email: 'email', + name: 'full_name', + }, + }, + }, + }, + }, + streams: [{ name: 'customer', sync_mode: 'incremental' }], + }, + { run_id: `${runId}_customer` } + ) + + const email = `${runId}@example.com` + const list = await stripeJson( + 'GET', + `/v1/customers?email=${encodeURIComponent(email)}&limit=1`, + stripeApiVersion + ) + const customer = Array.isArray(list.data) + ? (list.data[0] as Record | undefined) + : undefined + if (!customer || typeof customer.id !== 'string') { + throw new Error(`Could not find created Stripe Customer for ${email}`) + } + if (customer.email !== email || customer.name !== `Sync Engine Customer ${runId}`) { + throw new Error( + `Created Stripe Customer fields did not match: email=${String(customer.email)} name=${String(customer.name)}` + ) + } + + log('Verified Stripe Customer', { + id: customer.id, + email: customer.email, + name: customer.name, + ending_state: result.ending_state?.source.streams.customer, + }) + return customer.id +} + +async function syncCustomObject(engine: Awaited>) { + log(`Syncing Postgres table devices -> Custom Object ${customObjectPluralName}`) + const result = await engine.pipeline_sync_batch( + { + source: { + type: 'postgres', + postgres: { + url: databaseUrl, + table: 'devices', + stream: 'devices', + primary_key: ['device_id'], + cursor_field: 'updated_at', + page_size: 100, + }, + }, + destination: { + type: 'stripe', + stripe: { + api_key: stripeApiKey, + api_version: customObjectApiVersion, + object: 'custom_object', + write_mode: 'create', + streams: { + devices: { + plural_name: customObjectPluralName, + field_mapping: { + name: 'name', + device_id: 'device_id', + device_type: 'device_type', + city: 'city', + customer_id: 'customer_id', + }, + }, + }, + }, + }, + streams: [{ name: 'devices', sync_mode: 'incremental' }], + }, + { run_id: `${runId}_custom_object` } + ) + + const list = await stripeJson( + 'GET', + `/v2/extend/objects/${customObjectPluralName}?limit=100`, + customObjectApiVersion + ) + const records = Array.isArray(list.data) ? (list.data as Record[]) : [] + const object = records.find( + (record) => customObjectFieldValue(record, 'device_id') === `device_${runId}` + ) + if (!object || typeof object.id !== 'string') { + throw new Error(`Could not find created Device Custom Object with device_id device_${runId}`) + } + const expectedDeviceFields = { + name: `Sync Engine Device ${runId}`, + device_id: `device_${runId}`, + device_type: 'reader', + city: 'San Francisco', + customer_id: 'customer_row_1', + } + for (const [field, expected] of Object.entries(expectedDeviceFields)) { + const actual = customObjectFieldValue(object, field) + if (actual !== expected) { + throw new Error(`Created Device Custom Object ${field} did not match: ${String(actual)}`) + } + } + + log('Verified Device Custom Object', { + id: object.id, + name: customObjectFieldValue(object, 'name'), + device_id: customObjectFieldValue(object, 'device_id'), + device_type: customObjectFieldValue(object, 'device_type'), + city: customObjectFieldValue(object, 'city'), + customer_id: customObjectFieldValue(object, 'customer_id'), + ending_state: result.ending_state?.source.streams.devices, + }) + return object.id +} + +async function main() { + const client = new pg.Client({ connectionString: databaseUrl }) + let customerId: string | undefined + let customObjectId: string | undefined + + await client.connect() + try { + await preparePostgres(client) + + const source = createPostgresSource() + const destination = createStripeDestination() + const engine = await createEngine(makeResolver(source, destination)) + + customerId = await syncStripeCustomer(engine) + customObjectId = await syncCustomObject(engine) + + log('Reverse ETL e2e passed', { + stripe_customer_id: customerId, + custom_object_id: customObjectId, + }) + } finally { + if (customerId) { + await stripeJson('DELETE', `/v1/customers/${customerId}`, stripeApiVersion).catch((err) => { + console.error( + `Customer cleanup failed: ${err instanceof Error ? err.message : String(err)}` + ) + }) + log('Deleted Stripe Customer', { id: customerId }) + } + if (customObjectId) { + await stripeJson( + 'DELETE', + `/v2/extend/objects/${customObjectPluralName}/${customObjectId}`, + customObjectApiVersion + ) + .then(() => log('Deleted Custom Object', { id: customObjectId })) + .catch((err) => { + console.error( + `Custom Object cleanup failed: ${err instanceof Error ? err.message : String(err)}` + ) + }) + } + + await client.query(`DROP TABLE IF EXISTS crm_customers`).catch(() => {}) + await client.query(`DROP TABLE IF EXISTS devices`).catch(() => {}) + await client.end() + } +} + +main().catch((err) => { + console.error(err instanceof Error ? err.stack : String(err)) + process.exitCode = 1 +}) diff --git a/e2e/test-server-all-api.test.ts b/e2e/test-server-all-api.test.ts index a6dd9a8e7..7cc57d317 100644 --- a/e2e/test-server-all-api.test.ts +++ b/e2e/test-server-all-api.test.ts @@ -98,6 +98,12 @@ function schemaForVersion(apiVersion: string): string { function getGithubToken(): string | null { if (githubToken !== undefined) return githubToken + const envToken = process.env.GH_TOKEN ?? process.env.GITHUB_TOKEN + if (envToken) { + githubToken = envToken + return githubToken + } + try { const token = execSync('gh auth token', { cwd: new URL('..', import.meta.url).pathname, diff --git a/packages/destination-stripe/package.json b/packages/destination-stripe/package.json new file mode 100644 index 000000000..d2ab7531f --- /dev/null +++ b/packages/destination-stripe/package.json @@ -0,0 +1,34 @@ +{ + "name": "@stripe/sync-destination-stripe", + "version": "0.2.5", + "private": false, + "type": "module", + "exports": { + ".": { + "bun": "./src/index.ts", + "types": "./dist/index.d.ts", + "import": "./dist/index.js" + } + }, + "bin": { + "destination-stripe": "./dist/bin.js" + }, + "scripts": { + "build": "tsc", + "test": "vitest" + }, + "files": [ + "src", + "dist" + ], + "dependencies": { + "@stripe/sync-logger": "workspace:*", + "@stripe/sync-openapi": "workspace:*", + "@stripe/sync-protocol": "workspace:*", + "zod": "^4.3.6" + }, + "devDependencies": { + "@types/node": "^24.10.1", + "vitest": "^3.2.4" + } +} diff --git a/packages/destination-stripe/src/bin.ts b/packages/destination-stripe/src/bin.ts new file mode 100644 index 000000000..0fc2b14b8 --- /dev/null +++ b/packages/destination-stripe/src/bin.ts @@ -0,0 +1,6 @@ +#!/usr/bin/env node +import connector from './index.js' +import { configSchema } from './spec.js' +import { runConnectorCli } from '@stripe/sync-protocol/cli' + +runConnectorCli(connector, { name: 'destination-stripe', configSchema }) diff --git a/packages/destination-stripe/src/index.test.ts b/packages/destination-stripe/src/index.test.ts new file mode 100644 index 000000000..61e36171f --- /dev/null +++ b/packages/destination-stripe/src/index.test.ts @@ -0,0 +1,707 @@ +import { describe, expect, it } from 'vitest' +import { z } from 'zod' +import type { ConfiguredCatalog, Message } from '@stripe/sync-protocol' +import { BUNDLED_API_VERSION } from '@stripe/sync-openapi' +import { createStripeDestination } from './index.js' +import spec, { configSchema } from './spec.js' + +async function collect(iterable: AsyncIterable): Promise { + const out: T[] = [] + for await (const item of iterable) out.push(item) + return out +} + +function response(json: unknown, init?: ResponseInit): Response { + return new Response(JSON.stringify(json), { + status: 200, + headers: { 'content-type': 'application/json' }, + ...init, + }) +} + +function inputMessages(): Message[] { + return [ + { + type: 'record', + record: { + stream: 'crm_customers', + data: { + id: 'crm_123', + email: 'jenny@example.com', + name: 'Jenny Rosen', + plan: 'enterprise', + updated_at: '2026-01-01T00:00:00.000Z', + }, + emitted_at: '2026-05-03T00:00:00.000Z', + }, + }, + { + type: 'source_state', + source_state: { + state_type: 'stream', + stream: 'crm_customers', + data: { cursor: '2026-01-01T00:00:00.000Z', primary_key: ['crm_123'] }, + }, + }, + ] +} + +const customObjectConfig = configSchema.parse({ + api_key: 'sk_test_123', + api_version: 'unsafe-development', + base_url: 'https://stripe.test', + object: 'custom_object', + write_mode: 'create', + streams: { + crm_customers: { + plural_name: 'loyalty_cards', + field_mapping: { + nickname: 'name', + tier: 'plan', + }, + }, + }, +}) + +const standardObjectConfig = configSchema.parse({ + api_key: 'sk_test_123', + api_version: BUNDLED_API_VERSION, + base_url: 'https://stripe.test', + object: 'standard_object', + write_mode: 'create', + streams: { + customer: { + field_mapping: { + email: 'email', + name: 'name', + }, + }, + }, +}) + +const catalog: ConfiguredCatalog = { + streams: [ + { + stream: { + name: 'crm_customers', + primary_key: [['id']], + newer_than_field: 'updated_at', + }, + sync_mode: 'incremental', + destination_sync_mode: 'append', + }, + ], +} + +const standardObjectCatalog: ConfiguredCatalog = { + streams: [ + { + stream: { + name: 'customer', + primary_key: [['id']], + newer_than_field: 'updated_at', + }, + sync_mode: 'incremental', + destination_sync_mode: 'append', + }, + ], +} + +function customObjectDefinitions(fields = ['nickname', 'tier']) { + return { + data: [ + { + id: 'cobjdef_123', + api_name_plural: 'loyalty_cards', + api_name_singular: 'loyalty_card', + properties: Object.fromEntries(fields.map((name) => [name, { type: 'string' }])), + }, + ], + } +} + +describe('destination-stripe', () => { + it('rejects unsupported Stripe objects without attempting a write', async () => { + const requests: Array<{ url: string; init?: RequestInit }> = [] + const destination = createStripeDestination({ + fetch: async (url, init) => { + requests.push({ url: String(url), init }) + return response({ id: 'unexpected' }) + }, + }) + const invoiceConfig = { ...customObjectConfig, object: 'invoice' } as typeof customObjectConfig + + const messages = await collect( + destination.write({ config: invoiceConfig, catalog }, inputMessages()) + ) + + expect(requests).toEqual([]) + expect(messages).toEqual([ + { + type: 'connection_status', + connection_status: { + status: 'failed', + message: + 'destination-stripe supports object: "custom_object" or "standard_object"; object "invoice" is not supported', + }, + }, + { + type: 'stream_status', + stream_status: { + stream: 'crm_customers', + status: 'error', + error: + 'destination-stripe supports object: "custom_object" or "standard_object"; object "invoice" is not supported', + }, + }, + ]) + }) + + it('rejects unsupported Stripe objects before reading stdin', async () => { + const requests: Array<{ url: string; init?: RequestInit }> = [] + const destination = createStripeDestination({ + fetch: async (url, init) => { + requests.push({ url: String(url), init }) + return response({ id: 'unexpected' }) + }, + }) + const invoiceConfig = { ...customObjectConfig, object: 'invoice' } as typeof customObjectConfig + + const messages = await collect(destination.write({ config: invoiceConfig, catalog }, [])) + + expect(requests).toEqual([]) + expect(messages).toEqual([ + { + type: 'connection_status', + connection_status: { + status: 'failed', + message: + 'destination-stripe supports object: "custom_object" or "standard_object"; object "invoice" is not supported', + }, + }, + { + type: 'stream_status', + stream_status: { + stream: 'crm_customers', + status: 'error', + error: + 'destination-stripe supports object: "custom_object" or "standard_object"; object "invoice" is not supported', + }, + }, + ]) + }) + + it('validates Custom Object and standard object config through the JSON Schema path', () => { + const jsonSchemaConfig = z.fromJSONSchema(spec.config) + const { streams: _streams, ...missingStreamsConfig } = customObjectConfig + + expect(jsonSchemaConfig.safeParse(customObjectConfig).success).toBe(true) + expect(jsonSchemaConfig.safeParse(standardObjectConfig).success).toBe(true) + for (const invalidConfig of [ + missingStreamsConfig, + { ...customObjectConfig, api_version: '2026-03-25.dahlia' }, + { ...customObjectConfig, object: 'customer' }, + { ...customObjectConfig, write_mode: 'upsert' }, + { ...standardObjectConfig, api_version: 'unsafe-development' }, + { ...standardObjectConfig, object: 'customer' }, + { ...standardObjectConfig, streams: { customer: {} } }, + { + ...standardObjectConfig, + streams: { customer: { field_mapping: { email: 'email' } } }, + mode: 'upsert', + }, + { ...customObjectConfig, identity: { external_id_field: 'id' } }, + { ...customObjectConfig, fields: { email: 'email' } }, + { ...customObjectConfig, plural_name: 'loyalty_cards' }, + { ...customObjectConfig, field_mapping: { nickname: 'name' } }, + { ...customObjectConfig, stripe_record_id_field: 'stripe_custom_object_id' }, + { ...customObjectConfig, auto_map_fields: true }, + ]) { + expect(jsonSchemaConfig.safeParse(invalidConfig).success).toBe(false) + expect(configSchema.safeParse(invalidConfig).success).toBe(false) + } + }) + + it('creates a standard object with mapped form parameters', async () => { + const requests: Array<{ url: string; init?: RequestInit }> = [] + const destination = createStripeDestination({ + sleep: async () => {}, + fetch: async (url, init) => { + requests.push({ url: String(url), init }) + return response({ id: 'cus_123', object: 'customer' }) + }, + }) + + const messages = await collect( + destination.write({ config: standardObjectConfig, catalog: standardObjectCatalog }, [ + { + type: 'record', + record: { + stream: 'customer', + data: { + id: 'crm_123', + email: 'jenny@example.com', + name: 'Jenny Rosen', + plan: 'enterprise', + updated_at: '2026-01-01T00:00:00.000Z', + }, + emitted_at: '2026-05-03T00:00:00.000Z', + }, + }, + { + type: 'source_state', + source_state: { + state_type: 'stream', + stream: 'customer', + data: { cursor: '2026-01-01T00:00:00.000Z', primary_key: ['crm_123'] }, + }, + }, + ]) + ) + + expect(messages.map((message) => message.type)).toEqual(['record', 'source_state']) + expect(requests).toHaveLength(1) + expect(requests[0]!.url).toBe('https://stripe.test/v1/customers') + expect(requests[0]!.init?.method).toBe('POST') + expect(Object.fromEntries(new URLSearchParams(String(requests[0]!.init?.body)))).toEqual({ + email: 'jenny@example.com', + name: 'Jenny Rosen', + }) + expect((requests[0]!.init?.headers as Record)['Content-Type']).toBe( + 'application/x-www-form-urlencoded' + ) + expect((requests[0]!.init?.headers as Record)['Stripe-Version']).toBe( + BUNDLED_API_VERSION + ) + expect((requests[0]!.init?.headers as Record)['Idempotency-Key']).toMatch( + /^reverse-etl-/ + ) + }) + + it('fails standard object check for unknown mapped create parameters', async () => { + const destination = createStripeDestination({ + fetch: async () => response({ id: 'unexpected' }), + }) + const invalidConfig = configSchema.parse({ + ...standardObjectConfig, + streams: { + customer: { + field_mapping: { + not_a_customer_param: 'email', + }, + }, + }, + }) + + const messages = await collect(destination.check({ config: invalidConfig })) + + expect(messages).toEqual([ + { + type: 'connection_status', + connection_status: { + status: 'failed', + message: + 'Standard object stream "customer" does not define create parameter(s): not_a_customer_param', + }, + }, + ]) + }) + + it('checks Custom Object definitions with the unsafe-development version header', async () => { + const requests: Array<{ url: string; init?: RequestInit }> = [] + const destination = createStripeDestination({ + fetch: async (url, init) => { + requests.push({ url: String(url), init }) + return response(customObjectDefinitions()) + }, + }) + + const messages = await collect(destination.check({ config: customObjectConfig })) + + expect(messages).toEqual([ + { type: 'connection_status', connection_status: { status: 'succeeded' } }, + ]) + expect(requests).toHaveLength(1) + expect(requests[0]!.url).toBe('https://stripe.test/v2/extend/object_definitions') + expect((requests[0]!.init?.headers as Record)['Stripe-Version']).toBe( + 'unsafe-development' + ) + expect((requests[0]!.init?.headers as Record)['Content-Type']).toBeUndefined() + }) + + it('fails Custom Object check when no definitions exist', async () => { + const destination = createStripeDestination({ + fetch: async () => response({ data: [] }), + }) + + const messages = await collect(destination.check({ config: customObjectConfig })) + + expect(messages).toEqual([ + { + type: 'connection_status', + connection_status: { + status: 'failed', + message: + 'No Stripe Custom Object definitions found; cannot validate configured custom object streams', + }, + }, + ]) + }) + + it('fails Custom Object check for unknown plural_name', async () => { + const destination = createStripeDestination({ + fetch: async () => + response({ + data: [{ api_name_plural: 'other_cards', properties: { nickname: { type: 'string' } } }], + }), + }) + + const messages = await collect(destination.check({ config: customObjectConfig })) + + expect(messages).toEqual([ + { + type: 'connection_status', + connection_status: { + status: 'failed', + message: + 'Stripe Custom Object definition "loyalty_cards" for stream "crm_customers" was not found', + }, + }, + ]) + }) + + it('fails Custom Object check for unknown mapped fields', async () => { + const destination = createStripeDestination({ + fetch: async () => response(customObjectDefinitions(['nickname'])), + }) + + const messages = await collect(destination.check({ config: customObjectConfig })) + + expect(messages).toEqual([ + { + type: 'connection_status', + connection_status: { + status: 'failed', + message: + 'Stripe Custom Object "loyalty_cards" for stream "crm_customers" does not define mapped field(s): tier', + }, + }, + ]) + }) + + it('withholds source_state when Custom Object setup fails before records', async () => { + const destination = createStripeDestination({ + fetch: async () => response({ data: [] }), + }) + + const messages = await collect( + destination.write({ config: customObjectConfig, catalog }, [ + { + type: 'source_state', + source_state: { + state_type: 'stream', + stream: 'crm_customers', + data: { cursor: '2026-01-01T00:00:00.000Z' }, + }, + }, + { + type: 'source_state', + source_state: { + state_type: 'global', + data: { cursor: 'global_cursor_after_setup_failure' }, + }, + }, + ]) + ) + + const error = + 'No Stripe Custom Object definitions found; cannot validate configured custom object streams' + expect(messages).toEqual([ + { + type: 'connection_status', + connection_status: { status: 'failed', message: error }, + }, + { + type: 'stream_status', + stream_status: { + stream: 'crm_customers', + status: 'error', + error, + }, + }, + ]) + }) + + it('fails setup when a selected Custom Object stream is unmapped', async () => { + const destination = createStripeDestination({ + fetch: async () => + response({ + data: [ + customObjectDefinitions().data[0], + { + id: 'cobjdef_456', + api_name_plural: 'account_cards', + api_name_singular: 'account_card', + properties: { label: { type: 'string' } }, + }, + ], + }), + }) + const multiStreamCatalog: ConfiguredCatalog = { + streams: [ + ...catalog.streams, + { + stream: { + name: 'crm_accounts', + primary_key: [['id']], + newer_than_field: 'updated_at', + }, + sync_mode: 'incremental', + destination_sync_mode: 'append', + }, + ], + } + + const messages = await collect( + destination.write({ config: customObjectConfig, catalog: multiStreamCatalog }, [ + { + type: 'source_state', + source_state: { + state_type: 'stream', + stream: 'crm_accounts', + data: { cursor: '2026-01-01T00:00:00.000Z' }, + }, + }, + ]) + ) + + const error = 'No Stripe Custom Object stream config found for stream "crm_accounts"' + expect(messages).toEqual([ + { + type: 'connection_status', + connection_status: { status: 'failed', message: error }, + }, + { + type: 'stream_status', + stream_status: { stream: 'crm_customers', status: 'error', error }, + }, + { + type: 'stream_status', + stream_status: { stream: 'crm_accounts', status: 'error', error }, + }, + ]) + }) + + it('creates a Custom Object with JSON fields and passes source_state after success', async () => { + const requests: Array<{ url: string; init?: RequestInit }> = [] + const destination = createStripeDestination({ + sleep: async () => {}, + fetch: async (url, init) => { + requests.push({ url: String(url), init }) + if (String(url).endsWith('/v2/extend/object_definitions')) { + return response(customObjectDefinitions()) + } + return response({ id: 'co_123', object: 'v2.extend.object' }) + }, + }) + + const messages = await collect( + destination.write({ config: customObjectConfig, catalog }, inputMessages()) + ) + + expect(messages.map((message) => message.type)).toEqual(['record', 'source_state']) + expect(requests).toHaveLength(2) + expect(requests[1]!.url).toBe('https://stripe.test/v2/extend/objects/loyalty_cards') + expect(requests[1]!.init?.method).toBe('POST') + expect(requests[1]!.init?.body).toBe( + JSON.stringify({ fields: { nickname: 'Jenny Rosen', tier: 'enterprise' } }) + ) + expect((requests[1]!.init?.headers as Record)['Content-Type']).toBe( + 'application/json' + ) + expect((requests[1]!.init?.headers as Record)['Stripe-Version']).toBe( + 'unsafe-development' + ) + }) + + it('routes multiple streams to different Custom Object plural names', async () => { + const requests: Array<{ url: string; init?: RequestInit }> = [] + const destination = createStripeDestination({ + sleep: async () => {}, + fetch: async (url, init) => { + requests.push({ url: String(url), init }) + if (String(url).endsWith('/v2/extend/object_definitions')) { + return response({ + data: [ + customObjectDefinitions().data[0], + { + id: 'cobjdef_456', + api_name_plural: 'account_cards', + api_name_singular: 'account_card', + properties: { label: { type: 'string' } }, + }, + ], + }) + } + return response({ id: 'co_123', object: 'v2.extend.object' }) + }, + }) + const multiStreamConfig = configSchema.parse({ + ...customObjectConfig, + streams: { + crm_customers: customObjectConfig.streams.crm_customers, + crm_accounts: { + plural_name: 'account_cards', + field_mapping: { label: 'name' }, + }, + }, + }) + const multiStreamCatalog: ConfiguredCatalog = { + streams: [ + ...catalog.streams, + { + stream: { + name: 'crm_accounts', + primary_key: [['id']], + newer_than_field: 'updated_at', + }, + sync_mode: 'incremental', + destination_sync_mode: 'append', + }, + ], + } + const input: Message[] = [ + inputMessages()[0]!, + { + type: 'record', + record: { + stream: 'crm_accounts', + data: { + id: 'acct_123', + name: 'Enterprise account', + updated_at: '2026-01-01T00:00:00.000Z', + }, + emitted_at: '2026-05-03T00:00:00.000Z', + }, + }, + ] + + const output = await collect( + destination.write({ config: multiStreamConfig, catalog: multiStreamCatalog }, input) + ) + + expect(output.map((message) => message.type)).toEqual(['record', 'record']) + expect(requests.map((request) => request.url)).toEqual([ + 'https://stripe.test/v2/extend/object_definitions', + 'https://stripe.test/v2/extend/objects/loyalty_cards', + 'https://stripe.test/v2/extend/objects/account_cards', + ]) + expect(requests[1]!.init?.body).toBe( + JSON.stringify({ fields: { nickname: 'Jenny Rosen', tier: 'enterprise' } }) + ) + expect(requests[2]!.init?.body).toBe( + JSON.stringify({ fields: { label: 'Enterprise account' } }) + ) + }) + + it('withholds source_state after a failed Custom Object write', async () => { + const destination = createStripeDestination({ + sleep: async () => {}, + fetch: async (url) => { + if (String(url).endsWith('/v2/extend/object_definitions')) { + return response(customObjectDefinitions()) + } + return response({ error: { message: 'custom object invalid' } }, { status: 400 }) + }, + }) + + const messages = await collect( + destination.write({ config: customObjectConfig, catalog }, inputMessages()) + ) + + expect(messages).toEqual([ + { + type: 'stream_status', + stream_status: { + stream: 'crm_customers', + status: 'error', + error: 'custom object invalid', + }, + }, + ]) + }) + + it('retries retryable non-JSON Stripe errors', async () => { + const requests: Array<{ url: string; init?: RequestInit }> = [] + let sleeps = 0 + const destination = createStripeDestination({ + sleep: async () => { + sleeps += 1 + }, + fetch: async (url, init) => { + requests.push({ url: String(url), init }) + if (String(url).endsWith('/v2/extend/object_definitions')) { + return response(customObjectDefinitions()) + } + if ( + requests.filter((request) => request.url.endsWith('/v2/extend/objects/loyalty_cards')) + .length === 1 + ) { + return new Response('temporary upstream failure', { + status: 500, + headers: { 'content-type': 'text/plain' }, + }) + } + return response({ id: 'co_123', object: 'v2.extend.object' }) + }, + }) + + const messages = await collect( + destination.write({ config: customObjectConfig, catalog }, [inputMessages()[0]!]) + ) + + expect(messages.map((message) => message.type)).toEqual(['record']) + expect(requests.map((request) => request.url)).toEqual([ + 'https://stripe.test/v2/extend/object_definitions', + 'https://stripe.test/v2/extend/objects/loyalty_cards', + 'https://stripe.test/v2/extend/objects/loyalty_cards', + ]) + expect(sleeps).toBe(1) + }) + + it('withholds global source_state after any Custom Object write failure', async () => { + const destination = createStripeDestination({ + sleep: async () => {}, + fetch: async (url) => { + if (String(url).endsWith('/v2/extend/object_definitions')) { + return response(customObjectDefinitions()) + } + return response({ error: { message: 'custom object invalid' } }, { status: 400 }) + }, + }) + + const messages = await collect( + destination.write({ config: customObjectConfig, catalog }, [ + ...inputMessages(), + { + type: 'source_state', + source_state: { + state_type: 'global', + data: { cursor: 'global_cursor_after_failed_record' }, + }, + }, + ]) + ) + + expect(messages).toEqual([ + { + type: 'stream_status', + stream_status: { + stream: 'crm_customers', + status: 'error', + error: 'custom object invalid', + }, + }, + ]) + }) +}) diff --git a/packages/destination-stripe/src/index.ts b/packages/destination-stripe/src/index.ts new file mode 100644 index 000000000..cf0b02194 --- /dev/null +++ b/packages/destination-stripe/src/index.ts @@ -0,0 +1,663 @@ +import { createHash } from 'node:crypto' +import type { ConfiguredCatalog, Destination, Stream } from '@stripe/sync-protocol' +import { createSourceMessageFactory } from '@stripe/sync-protocol' +import { resolveOpenApiSpec, SpecParser, type CreateEndpoint } from '@stripe/sync-openapi' +import defaultSpec, { + type Config, + type CustomObjectConfig, + type StandardObjectConfig, +} from './spec.js' +import { log } from './logger.js' + +export { configSchema, type Config } from './spec.js' + +type FetchFn = typeof globalThis.fetch + +export type StripeDestinationDeps = { + fetch?: FetchFn + sleep?: (ms: number) => Promise +} + +type RequestBodyEncoding = 'form' | 'json' + +type CustomObjectStreamConfig = CustomObjectConfig['streams'][string] +type StandardObjectStreamConfig = StandardObjectConfig['streams'][string] +type StandardObjectSetup = { + config: StandardObjectConfig + createEndpoints: Map +} + +class StripeWriteError extends Error { + constructor( + message: string, + readonly status?: number, + readonly responseHeaders?: Record + ) { + super(message) + this.name = 'StripeWriteError' + } +} + +const DEFAULT_STRIPE_API_BASE = 'https://api.stripe.com' +const SUPPORTED_CUSTOM_OBJECT = 'custom_object' +const SUPPORTED_STANDARD_OBJECT = 'standard_object' +const CUSTOM_OBJECT_API_VERSION = 'unsafe-development' +const msg = createSourceMessageFactory, Record>() + +function baseUrl(config: Config): string { + return (config.base_url ?? DEFAULT_STRIPE_API_BASE).replace(/\/$/, '') +} + +function requireCustomObjectConfig(config: Config): CustomObjectConfig { + const raw = config as Config & { + object?: unknown + api_version?: unknown + write_mode?: unknown + streams?: unknown + } + if (raw.object !== SUPPORTED_CUSTOM_OBJECT) { + throw new Error( + `destination-stripe expected object: "custom_object"; object "${String(raw.object)}" is not supported by this write path` + ) + } + if (raw.api_version !== CUSTOM_OBJECT_API_VERSION) { + throw new Error( + `api_version must be "${CUSTOM_OBJECT_API_VERSION}" for object: "custom_object"` + ) + } + if (raw.write_mode !== 'create') { + throw new Error('write_mode must be "create" for object: "custom_object"') + } + if (!isRecord(raw.streams) || Object.keys(raw.streams).length === 0) { + throw new Error('streams is required for object: "custom_object"') + } + return config as CustomObjectConfig +} + +function requireStandardObjectConfig(config: Config): StandardObjectConfig { + const raw = config as Config & { + object?: unknown + write_mode?: unknown + streams?: unknown + } + if (raw.object !== SUPPORTED_STANDARD_OBJECT) { + throw new Error( + `destination-stripe expected object: "standard_object"; object "${String(raw.object)}" is not supported by this write path` + ) + } + if (raw.write_mode !== 'create') { + throw new Error('write_mode must be "create" for object: "standard_object"') + } + if (!isRecord(raw.streams) || Object.keys(raw.streams).length === 0) { + throw new Error('streams is required for object: "standard_object"') + } + return config as StandardObjectConfig +} + +function encodeFormData(params: Record, prefix = ''): string { + const parts: string[] = [] + for (const [key, value] of Object.entries(params)) { + const fullKey = prefix ? `${prefix}[${key}]` : key + if (value == null) continue + if (typeof value === 'object' && !Array.isArray(value)) { + parts.push(encodeFormData(value as Record, fullKey)) + } else if (Array.isArray(value)) { + for (const item of value) { + parts.push(`${encodeURIComponent(`${fullKey}[]`)}=${encodeURIComponent(String(item))}`) + } + } else { + parts.push(`${encodeURIComponent(fullKey)}=${encodeURIComponent(String(value))}`) + } + } + return parts.filter(Boolean).join('&') +} + +function headersToRecord(headers: Headers): Record { + const out: Record = {} + headers.forEach((value, key) => { + out[key.toLowerCase()] = value + }) + return out +} + +function errorMessageFromJson(json: unknown): string { + if ( + json && + typeof json === 'object' && + 'error' in json && + json.error && + typeof json.error === 'object' && + 'message' in json.error + ) { + return String(json.error.message) + } + return 'Stripe request failed' +} + +function parseJson(text: string): unknown { + return JSON.parse(text) +} + +function retryAfterMs(headers: Record): number | undefined { + const value = headers['retry-after'] + if (!value) return undefined + const seconds = Number(value) + if (!Number.isFinite(seconds) || seconds <= 0) return undefined + return seconds * 1000 +} + +function isRetryable(err: unknown): boolean { + if (err instanceof StripeWriteError) { + return err.status === 429 || (err.status != null && err.status >= 500) + } + if (!(err instanceof Error)) return false + if (err.name === 'AbortError') return false + return err.name === 'TimeoutError' || /fetch failed|network|timeout/i.test(err.message) +} + +async function requestJson( + config: Config, + fetchFn: FetchFn, + method: string, + path: string, + params?: Record, + opts?: { idempotencyKey?: string; bodyEncoding?: RequestBodyEncoding; stripeVersion?: string } +): Promise { + const url = new URL(path, baseUrl(config)) + let body: string | undefined + + if (method === 'GET' && params) { + for (const [key, value] of Object.entries(params)) { + if (value != null) url.searchParams.set(key, String(value)) + } + } else if (params) { + body = opts?.bodyEncoding === 'json' ? JSON.stringify(params) : encodeFormData(params) + } + + const headers: Record = { + Authorization: `Bearer ${config.api_key}`, + 'Stripe-Version': opts?.stripeVersion ?? config.api_version, + } + if (body !== undefined) { + headers['Content-Type'] = + opts?.bodyEncoding === 'json' ? 'application/json' : 'application/x-www-form-urlencoded' + } + if (opts?.idempotencyKey) headers['Idempotency-Key'] = opts.idempotencyKey + + const response = await fetchFn(url, { method, headers, body }) + const responseHeaders = headersToRecord(response.headers) + const text = await response.text() + let json: unknown = {} + if (text) { + try { + json = parseJson(text) + } catch (err) { + if (!response.ok) { + throw new StripeWriteError(text, response.status, responseHeaders) + } + throw err + } + } + + if (!response.ok) { + throw new StripeWriteError(errorMessageFromJson(json), response.status, responseHeaders) + } + + return json as T +} + +async function withRetry( + fn: () => Promise, + opts: { maxRetries: number; sleep: (ms: number) => Promise; label: string } +): Promise { + let delayMs = 1000 + for (let attempt = 0; ; attempt++) { + try { + return await fn() + } catch (err) { + if (attempt >= opts.maxRetries || !isRetryable(err)) throw err + const headers = err instanceof StripeWriteError ? err.responseHeaders : undefined + const waitMs = headers ? (retryAfterMs(headers) ?? delayMs) : delayMs + log.warn( + { + attempt: attempt + 1, + max_retries: opts.maxRetries, + delay_ms: waitMs, + label: opts.label, + err, + }, + `Retrying Stripe write ${opts.label}` + ) + await opts.sleep(waitMs) + delayMs = Math.min(delayMs * 2, 32_000) + } + } +} + +function getPath(data: Record, path: string): unknown { + const parts = path.split('.') + let current: unknown = data + for (const part of parts) { + if (!current || typeof current !== 'object') return undefined + current = (current as Record)[part] + } + return current +} + +function streamFor(catalog: ConfiguredCatalog, name: string): Stream | undefined { + return catalog.streams.find((configured) => configured.stream.name === name)?.stream +} + +function idempotencyKey( + stream: Stream | undefined, + streamName: string, + operation: string, + data: Record +): string { + const pk = stream?.primary_key?.map((path) => getPath(data, path.join('.'))) ?? [data.id] + const version = stream?.newer_than_field ? getPath(data, stream.newer_than_field) : undefined + const raw = JSON.stringify({ stream: streamName, operation, pk, version }) + return `reverse-etl-${createHash('sha256').update(raw).digest('hex')}` +} + +type CustomObjectDefinition = Record + +function isRecord(value: unknown): value is Record { + return value != null && typeof value === 'object' && !Array.isArray(value) +} + +function objectRecords(value: unknown): CustomObjectDefinition[] { + return Array.isArray(value) ? value.filter(isRecord) : [] +} + +function extractCustomObjectDefinitions(json: unknown): CustomObjectDefinition[] { + if (Array.isArray(json)) return objectRecords(json) + if (!isRecord(json)) return [] + + if (Array.isArray(json.data)) return objectRecords(json.data) + if (Array.isArray(json.object_definitions)) return objectRecords(json.object_definitions) + return [] +} + +function customObjectDefinitionPluralName(definition: CustomObjectDefinition): string | undefined { + const pluralName = + definition.api_name_plural ?? definition.plural_name ?? definition.pluralName ?? definition.name + return typeof pluralName === 'string' ? pluralName : undefined +} + +function customObjectFieldName(field: unknown): string | undefined { + if (typeof field === 'string') return field + if (!isRecord(field)) return undefined + const name = field.name ?? field.key + return typeof name === 'string' ? name : undefined +} + +function customObjectFieldNames(definition: CustomObjectDefinition): Set | undefined { + const fields = definition.properties ?? definition.fields + if (fields == null) return undefined + + if (Array.isArray(fields)) { + return new Set( + fields.map(customObjectFieldName).filter((name): name is string => Boolean(name)) + ) + } + + if (isRecord(fields)) { + if (Array.isArray(fields.data)) { + return new Set( + fields.data.map(customObjectFieldName).filter((name): name is string => Boolean(name)) + ) + } + return new Set(Object.keys(fields)) + } + + return undefined +} + +async function validateCustomObjectConfig(config: Config, fetchFn: FetchFn): Promise { + const customConfig = requireCustomObjectConfig(config) + const json = await requestJson( + customConfig, + fetchFn, + 'GET', + '/v2/extend/object_definitions', + undefined, + { stripeVersion: CUSTOM_OBJECT_API_VERSION } + ) + const definitions = extractCustomObjectDefinitions(json) + if (definitions.length === 0) { + throw new Error( + `No Stripe Custom Object definitions found; cannot validate configured custom object streams` + ) + } + + const definitionsByPluralName = new Map( + definitions + .map((definition) => [customObjectDefinitionPluralName(definition), definition] as const) + .filter((entry): entry is [string, CustomObjectDefinition] => entry[0] != null) + ) + + for (const [streamName, streamConfig] of Object.entries(customConfig.streams)) { + const definition = definitionsByPluralName.get(streamConfig.plural_name) + if (!definition) { + throw new Error( + `Stripe Custom Object definition "${streamConfig.plural_name}" for stream "${streamName}" was not found` + ) + } + + const knownFields = customObjectFieldNames(definition) + if (knownFields === undefined) continue + + const unknownFields = Object.keys(streamConfig.field_mapping).filter( + (field) => !knownFields.has(field) + ) + if (unknownFields.length > 0) { + throw new Error( + `Stripe Custom Object "${streamConfig.plural_name}" for stream "${streamName}" does not define mapped field(s): ${unknownFields.join(', ')}` + ) + } + } +} + +async function validateStandardObjectConfig( + config: Config, + fetchFn: FetchFn +): Promise { + const standardConfig = requireStandardObjectConfig(config) + const resolved = await resolveOpenApiSpec({ apiVersion: standardConfig.api_version }, fetchFn) + const createEndpoints = new SpecParser().discoverCreateEndpoints(resolved.spec) + + for (const [streamName, streamConfig] of Object.entries(standardConfig.streams)) { + const endpoint = createEndpoints.get(streamName) + if (!endpoint) { + throw new Error(`Stripe create endpoint for stream "${streamName}" was not found`) + } + + const unknownParams = Object.keys(streamConfig.field_mapping).filter( + (stripeParam) => !endpoint.requestFields.has(stripeParam) + ) + if (unknownParams.length > 0) { + throw new Error( + `Standard object stream "${streamName}" does not define create parameter(s): ${unknownParams.join(', ')}` + ) + } + } + + return { config: standardConfig, createEndpoints } +} + +type DestinationSetup = + | { object: 'custom_object'; config: CustomObjectConfig } + | { + object: 'standard_object' + config: StandardObjectConfig + createEndpoints: Map + } + +async function validateConfig(config: Config, fetchFn: FetchFn): Promise { + const object = (config as { object?: unknown }).object + if (object === SUPPORTED_CUSTOM_OBJECT) { + await validateCustomObjectConfig(config, fetchFn) + return { object: 'custom_object', config: requireCustomObjectConfig(config) } + } + if (object === SUPPORTED_STANDARD_OBJECT) { + const setup = await validateStandardObjectConfig(config, fetchFn) + return { object: 'standard_object', ...setup } + } + throw new Error( + `destination-stripe supports object: "custom_object" or "standard_object"; object "${String(object)}" is not supported` + ) +} + +function validateCatalogStreams(setup: DestinationSetup, catalog: ConfiguredCatalog): void { + for (const configured of catalog.streams) { + const streamName = configured.stream.name + if (setup.object === 'custom_object') { + customObjectStreamConfig(setup.config, streamName) + } else { + standardObjectStreamConfig(setup.config, streamName) + const endpoint = setup.createEndpoints.get(streamName) + if (!endpoint) { + throw new Error(`Stripe create endpoint for stream "${streamName}" was not found`) + } + } + } +} + +function customObjectFields( + streamConfig: CustomObjectStreamConfig, + data: Record +): Record { + const fields: Record = {} + for (const [customObjectField, sourceField] of Object.entries(streamConfig.field_mapping)) { + const value = getPath(data, sourceField) + if (value != null) fields[customObjectField] = value + } + return fields +} + +function customObjectStreamConfig( + config: CustomObjectConfig, + streamName: string +): CustomObjectStreamConfig { + const streamConfig = config.streams[streamName] + if (!streamConfig) { + throw new Error(`No Stripe Custom Object stream config found for stream "${streamName}"`) + } + return streamConfig +} + +function standardObjectStreamConfig( + config: StandardObjectConfig, + streamName: string +): StandardObjectStreamConfig { + const streamConfig = config.streams[streamName] + if (!streamConfig) { + throw new Error(`No standard object stream config found for stream "${streamName}"`) + } + return streamConfig +} + +function standardObjectParams( + endpoint: CreateEndpoint, + streamConfig: StandardObjectStreamConfig, + data: Record +): Record { + const params: Record = {} + for (const [stripeParam, sourceField] of Object.entries(streamConfig.field_mapping)) { + if (!endpoint.requestFields.has(stripeParam)) continue + const value = getPath(data, sourceField) + if (value != null) params[stripeParam] = value + } + return params +} + +async function createCustomObject( + config: CustomObjectConfig, + streamConfig: CustomObjectStreamConfig, + fetchFn: FetchFn, + sleep: (ms: number) => Promise, + stream: Stream | undefined, + streamName: string, + data: Record +): Promise> { + const params = { fields: customObjectFields(streamConfig, data) } + const pluralName = encodeURIComponent(streamConfig.plural_name) + const idemKey = idempotencyKey(stream, streamName, 'create', data) + const record = await withRetry( + () => + requestJson>( + config, + fetchFn, + 'POST', + `/v2/extend/objects/${pluralName}`, + params, + { + bodyEncoding: 'json', + stripeVersion: CUSTOM_OBJECT_API_VERSION, + idempotencyKey: idemKey, + } + ), + { + maxRetries: config.max_retries, + sleep, + label: `create custom object ${streamConfig.plural_name}`, + } + ) + if (typeof record.id !== 'string') { + throw new Error(`Stripe Custom Object create response did not include a string id`) + } + return record +} + +async function createStandardObject( + setup: Extract, + fetchFn: FetchFn, + sleep: (ms: number) => Promise, + stream: Stream | undefined, + streamName: string, + data: Record +): Promise> { + const endpoint = setup.createEndpoints.get(streamName) + if (!endpoint) { + throw new Error(`Stripe create endpoint for stream "${streamName}" was not found`) + } + const params = standardObjectParams( + endpoint, + standardObjectStreamConfig(setup.config, streamName), + data + ) + const idemKey = idempotencyKey(stream, streamName, 'create', data) + const record = await withRetry( + () => + requestJson>( + setup.config, + fetchFn, + 'POST', + endpoint.apiPath, + params, + { + bodyEncoding: endpoint.bodyEncoding, + idempotencyKey: idemKey, + } + ), + { + maxRetries: setup.config.max_retries, + sleep, + label: `create standard object ${streamName}`, + } + ) + if (typeof record.id !== 'string') { + throw new Error( + `Standard object create response for stream "${streamName}" did not include a string id` + ) + } + return record +} + +function streamError(stream: string, error: unknown) { + const message = error instanceof Error ? error.message : String(error) + return msg.stream_status({ stream, status: 'error', error: message }) +} + +function connectionError(error: unknown) { + const message = error instanceof Error ? error.message : String(error) + return msg.connection_status({ status: 'failed', message }) +} + +export function createStripeDestination(deps: StripeDestinationDeps = {}): Destination { + const fetchFn = deps.fetch ?? globalThis.fetch + const sleep = + deps.sleep ?? ((ms: number) => new Promise((resolve) => setTimeout(resolve, ms))) + + return { + async *spec() { + yield { type: 'spec' as const, spec: defaultSpec } + }, + + async *check({ config }) { + try { + await validateConfig(config, fetchFn) + yield msg.connection_status({ status: 'succeeded' }) + } catch (err) { + yield msg.connection_status({ + status: 'failed', + message: err instanceof Error ? err.message : String(err), + }) + } + }, + + async *write({ config, catalog }, $stdin) { + const failedStreams = new Set() + let setupError: unknown + let setup: DestinationSetup | undefined + + try { + setup = await validateConfig(config, fetchFn) + validateCatalogStreams(setup, catalog) + } catch (err) { + setupError = err + yield connectionError(err) + for (const configured of catalog.streams) { + failedStreams.add(configured.stream.name) + yield streamError(configured.stream.name, err) + } + } + + for await (const input of $stdin) { + if (input.type === 'record') { + const { stream, data } = input.record + if (failedStreams.has(stream)) continue + + try { + if (setupError) throw setupError + if (!setup) throw new Error('destination-stripe setup did not complete') + if (setup.object === 'custom_object') { + await createCustomObject( + setup.config, + customObjectStreamConfig(setup.config, stream), + fetchFn, + sleep, + streamFor(catalog, stream), + stream, + data as Record + ) + } else { + await createStandardObject( + setup, + fetchFn, + sleep, + streamFor(catalog, stream), + stream, + data as Record + ) + } + yield input + } catch (err) { + failedStreams.add(stream) + log.error({ stream, err }, 'destination-stripe write failed') + yield streamError(stream, err) + } + } else if (input.type === 'source_state') { + if (setupError) { + continue + } + if (input.source_state.state_type === 'global' && failedStreams.size > 0) { + continue + } + if ( + input.source_state.state_type === 'stream' && + failedStreams.has(input.source_state.stream) + ) { + continue + } + yield input + } else { + yield input + } + } + }, + } +} + +export default createStripeDestination() diff --git a/packages/destination-stripe/src/logger.ts b/packages/destination-stripe/src/logger.ts new file mode 100644 index 000000000..4acec2f3e --- /dev/null +++ b/packages/destination-stripe/src/logger.ts @@ -0,0 +1,4 @@ +import { createLogger } from '@stripe/sync-logger' +import type { Logger } from '@stripe/sync-logger' + +export const log: Logger = createLogger({ name: 'destination-stripe' }) diff --git a/packages/destination-stripe/src/spec.ts b/packages/destination-stripe/src/spec.ts new file mode 100644 index 000000000..688815f87 --- /dev/null +++ b/packages/destination-stripe/src/spec.ts @@ -0,0 +1,78 @@ +import { z } from 'zod' +import type { ConnectorSpecification } from '@stripe/sync-protocol' +import { SUPPORTED_API_VERSIONS } from '@stripe/sync-openapi' + +const customObjectStreamConfigSchema = z + .object({ + plural_name: z.string().describe('Stripe Custom Object api_name_plural'), + field_mapping: z + .record(z.string(), z.string()) + .describe('Mapping from Custom Object field names to source record fields.'), + }) + .strict() + +const standardObjectStreamConfigSchema = z + .object({ + field_mapping: z + .record(z.string(), z.string()) + .describe('Mapping from Stripe create parameter names to source record fields.'), + }) + .strict() + +const baseConfigSchema = z.object({ + api_key: z.string().describe('Stripe API key (sk_test_... or sk_live_...)'), + base_url: z + .string() + .url() + .optional() + .describe('Override the Stripe API base URL (e.g. http://localhost:12111 for tests)'), + max_retries: z + .number() + .int() + .nonnegative() + .default(3) + .describe('Retries for 429/5xx/network errors'), +}) + +const customObjectConfigSchema = baseConfigSchema + .extend({ + api_version: z + .literal('unsafe-development') + .describe('Stripe API version for Custom Object write requests'), + object: z.literal('custom_object').describe('Stripe object type to write.'), + write_mode: z.literal('create').describe('Custom Objects are append-only create writes.'), + streams: z + .record(z.string(), customObjectStreamConfigSchema) + .describe('Per-source-stream Custom Object write configuration.'), + }) + .strict() + +const standardObjectConfigSchema = baseConfigSchema + .extend({ + api_version: z.enum(SUPPORTED_API_VERSIONS).describe('Stripe API version for write requests'), + object: z.literal('standard_object').describe('Write standard Stripe API resources.'), + write_mode: z.literal('create').describe('Standard Stripe objects are insert-only creates.'), + streams: z + .record(z.string(), standardObjectStreamConfigSchema) + .describe('Per-source-stream standard Stripe object create configuration.'), + }) + .strict() + +export const configSchema = z.discriminatedUnion('object', [ + customObjectConfigSchema, + standardObjectConfigSchema, +]) + +export type Config = z.infer +export type CustomObjectConfig = z.infer +export type StandardObjectConfig = z.infer + +const configJsonSchema = { + ...z.toJSONSchema(configSchema), + type: 'object', + properties: {}, +} as const + +export default { + config: configJsonSchema, +} satisfies ConnectorSpecification diff --git a/packages/destination-stripe/tsconfig.json b/packages/destination-stripe/tsconfig.json new file mode 100644 index 000000000..2481fe545 --- /dev/null +++ b/packages/destination-stripe/tsconfig.json @@ -0,0 +1,9 @@ +{ + "extends": "../../tsconfig.base.json", + "compilerOptions": { + "outDir": "dist", + "rootDir": "src" + }, + "include": ["src/**/*"], + "exclude": ["src/**/*.test.ts", "src/**/__tests__/**"] +} diff --git a/packages/openapi/__tests__/specParser.test.ts b/packages/openapi/__tests__/specParser.test.ts index 0f6df93d5..dce45ab87 100644 --- a/packages/openapi/__tests__/specParser.test.ts +++ b/packages/openapi/__tests__/specParser.test.ts @@ -522,6 +522,122 @@ describe('SpecParser', () => { }) }) + describe('discoverCreateEndpoints', () => { + it('discovers top-level POST create endpoints and request fields', () => { + const parser = new SpecParser() + const spec: OpenApiSpec = { + openapi: '3.0.0', + paths: { + '/v1/customers': { + get: { + responses: { + '200': { + content: { + 'application/json': { + schema: { + type: 'object', + properties: { + object: { type: 'string', enum: ['list'] }, + data: { + type: 'array', + items: { $ref: '#/components/schemas/customer' }, + }, + }, + }, + }, + }, + }, + }, + }, + post: { + requestBody: { + content: { + 'application/x-www-form-urlencoded': { + schema: { $ref: '#/components/schemas/customer_create' }, + }, + }, + }, + responses: {}, + }, + }, + '/v1/customers/{customer}/sources': { + get: { + responses: { + '200': { + content: { + 'application/json': { + schema: { + type: 'object', + properties: { + object: { type: 'string', enum: ['list'] }, + data: { + type: 'array', + items: { $ref: '#/components/schemas/card' }, + }, + }, + }, + }, + }, + }, + }, + }, + post: { + requestBody: { + content: { + 'application/x-www-form-urlencoded': { + schema: { + type: 'object', + properties: { source: { type: 'string' } }, + }, + }, + }, + }, + responses: {}, + }, + }, + }, + components: { + schemas: { + customer: { + 'x-resourceId': 'customer', + type: 'object', + properties: { id: { type: 'string' } }, + }, + card: { + 'x-resourceId': 'card', + type: 'object', + properties: { id: { type: 'string' } }, + }, + customer_create: { + type: 'object', + properties: { + email: { type: 'string' }, + name: { type: 'string' }, + metadata: { + type: 'object', + additionalProperties: { type: 'string' }, + }, + }, + }, + }, + }, + } + + const endpoints = parser.discoverCreateEndpoints(spec) + + expect(Array.from(endpoints.keys())).toEqual(['customer']) + expect(endpoints.get('customer')).toMatchObject({ + tableName: 'customer', + resourceId: 'customer', + apiPath: '/v1/customers', + bodyEncoding: 'form', + }) + expect(endpoints.get('customer')?.requestFields).toEqual( + new Set(['email', 'metadata', 'name']) + ) + }) + }) + describe('discoverWebhookUpdatableResourceIds', () => { it('discovers resource ids that have create/update/delete webhook events', () => { const parser = new SpecParser() diff --git a/packages/openapi/index.ts b/packages/openapi/index.ts index cf2c1d3a6..416fb62fe 100644 --- a/packages/openapi/index.ts +++ b/packages/openapi/index.ts @@ -1,6 +1,6 @@ export type * from './types.js' export { SpecParser, OPENAPI_RESOURCE_TABLE_ALIASES, resolveTableName } from './specParser.js' -export type { ListEndpoint, NestedEndpoint } from './specParser.js' +export type { CreateEndpoint, ListEndpoint, NestedEndpoint } from './specParser.js' export { resolveOpenApiSpec, diff --git a/packages/openapi/specParser.ts b/packages/openapi/specParser.ts index 44a146c2b..8e7784b90 100644 --- a/packages/openapi/specParser.ts +++ b/packages/openapi/specParser.ts @@ -58,6 +58,14 @@ export type NestedEndpoint = { supportsPagination: boolean } +export type CreateEndpoint = { + tableName: string + resourceId: string + apiPath: string + requestFields: Set + bodyEncoding: 'form' | 'json' +} + type ColumnAccumulator = { type: ScalarType nullable: boolean @@ -290,6 +298,37 @@ export class SpecParser { return nested } + /** + * Discover top-level create endpoints whose POST path matches a list endpoint + * collection path, e.g. GET/POST `/v1/customers`. + */ + discoverCreateEndpoints( + spec: OpenApiSpec, + aliases: Record = OPENAPI_RESOURCE_TABLE_ALIASES + ): Map { + const endpoints = new Map() + for (const raw of this.iterListPaths(spec)) { + if (raw.isNested) continue + const postOp = spec.paths?.[raw.apiPath]?.post + if (!postOp) continue + + const requestBody = this.createRequestBody(postOp, spec) + if (!requestBody || requestBody.requestFields.size === 0) continue + + const tableName = resolveTableName(raw.resourceId, aliases) + if (endpoints.has(tableName)) continue + + endpoints.set(tableName, { + tableName, + resourceId: raw.resourceId, + apiPath: raw.apiPath, + requestFields: requestBody.requestFields, + bodyEncoding: requestBody.bodyEncoding, + }) + } + return endpoints + } + /** * Resolve the canonical table list for schema parsing. * Delegates to {@link discoverSyncableTables} so the parser and runtime @@ -429,10 +468,10 @@ export class SpecParser { private collectEnabledEventTypes(spec: OpenApiSpec): Set { const types = new Set() - const op = spec.paths?.['/v1/webhook_endpoints']?.post as - | { requestBody?: { content?: Record } } - | undefined - const schema = op?.requestBody?.content?.['application/x-www-form-urlencoded']?.schema + const schema = + spec.paths?.['/v1/webhook_endpoints']?.post?.requestBody?.content?.[ + 'application/x-www-form-urlencoded' + ]?.schema if (!schema || '$ref' in schema) return types const enabledEvents = schema.properties?.enabled_events if (!enabledEvents || '$ref' in enabledEvents) return types @@ -446,6 +485,31 @@ export class SpecParser { return types } + private createRequestBody( + operation: { + requestBody?: { content?: Record } + }, + spec: OpenApiSpec + ): { requestFields: Set; bodyEncoding: 'form' | 'json' } | undefined { + const formSchema = operation.requestBody?.content?.['application/x-www-form-urlencoded']?.schema + if (formSchema) { + return { + requestFields: new Set(this.collectPropertyCandidates(formSchema, spec).keys()), + bodyEncoding: 'form', + } + } + + const jsonSchema = operation.requestBody?.content?.['application/json']?.schema + if (jsonSchema) { + return { + requestFields: new Set(this.collectPropertyCandidates(jsonSchema, spec).keys()), + bodyEncoding: 'json', + } + } + + return undefined + } + /** Match event types like `customer.created` or `v2.core.account.updated` against listable resource ids. */ private matchEventTypesToResourceIds( eventTypes: ReadonlySet, diff --git a/packages/openapi/types.ts b/packages/openapi/types.ts index 246236ff2..600d98512 100644 --- a/packages/openapi/types.ts +++ b/packages/openapi/types.ts @@ -2,6 +2,7 @@ export type OpenApiSchemaObject = { type?: string format?: string nullable?: boolean + required?: string[] properties?: Record items?: OpenApiSchemaOrReference oneOf?: OpenApiSchemaOrReference[] @@ -43,6 +44,9 @@ export type OpenApiOperationObject = { required?: boolean schema?: OpenApiSchemaOrReference }[] + requestBody?: { + content?: Record + } responses?: Record } diff --git a/packages/source-postgres/package.json b/packages/source-postgres/package.json new file mode 100644 index 000000000..fd38d38ae --- /dev/null +++ b/packages/source-postgres/package.json @@ -0,0 +1,35 @@ +{ + "name": "@stripe/sync-source-postgres", + "version": "0.2.5", + "private": false, + "type": "module", + "exports": { + ".": { + "bun": "./src/index.ts", + "types": "./dist/index.d.ts", + "import": "./dist/index.js" + } + }, + "bin": { + "source-postgres": "./dist/bin.js" + }, + "scripts": { + "build": "tsc", + "test": "vitest" + }, + "files": [ + "src", + "dist" + ], + "dependencies": { + "@stripe/sync-logger": "workspace:*", + "@stripe/sync-protocol": "workspace:*", + "@stripe/sync-util-postgres": "workspace:*", + "pg": "^8.16.3", + "zod": "^4.3.6" + }, + "devDependencies": { + "@types/pg": "^8.15.5", + "vitest": "^3.2.4" + } +} diff --git a/packages/source-postgres/src/bin.ts b/packages/source-postgres/src/bin.ts new file mode 100644 index 000000000..33df1b05d --- /dev/null +++ b/packages/source-postgres/src/bin.ts @@ -0,0 +1,6 @@ +#!/usr/bin/env node +import connector from './index.js' +import { configSchema } from './spec.js' +import { runConnectorCli } from '@stripe/sync-protocol/cli' + +runConnectorCli(connector, { name: 'source-postgres', configSchema }) diff --git a/packages/source-postgres/src/index.test.ts b/packages/source-postgres/src/index.test.ts new file mode 100644 index 000000000..9d6e50eb7 --- /dev/null +++ b/packages/source-postgres/src/index.test.ts @@ -0,0 +1,258 @@ +import { describe, expect, it } from 'vitest' +import { z } from 'zod' +import type { ConfiguredCatalog } from '@stripe/sync-protocol' +import { createPostgresSource } from './index.js' +import spec, { configSchema } from './spec.js' + +async function collect(iterable: AsyncIterable): Promise { + const out: T[] = [] + for await (const item of iterable) out.push(item) + return out +} + +function queryResult>(rows: T[]) { + return { + rows, + rowCount: rows.length, + command: 'SELECT', + oid: 0, + fields: [], + } +} + +describe('source-postgres', () => { + it('validates config constraints through the JSON Schema path', () => { + const jsonSchemaConfig = z.fromJSONSchema(spec.config) + const validTableConfig = { + url: 'postgres://example', + table: 'crm_customers', + cursor_field: 'updated_at', + } + const validQueryConfig = { + url: 'postgres://example', + query: 'SELECT * FROM crm_customers', + stream: 'crm_customers', + cursor_field: 'updated_at', + } + + expect(jsonSchemaConfig.safeParse(validTableConfig).success).toBe(true) + expect(jsonSchemaConfig.safeParse(validQueryConfig).success).toBe(true) + for (const invalidConfig of [ + { table: 'crm_customers', cursor_field: 'updated_at' }, + { + url: 'postgres://example', + table: 'crm_customers', + query: 'SELECT * FROM crm_customers', + stream: 'crm_customers', + cursor_field: 'updated_at', + }, + { + url: 'postgres://example', + query: 'SELECT * FROM crm_customers', + cursor_field: 'updated_at', + }, + ]) { + expect(jsonSchemaConfig.safeParse(invalidConfig).success).toBe(false) + expect(configSchema.safeParse(invalidConfig).success).toBe(false) + } + }) + + it('discovers a configured table as one stream', async () => { + const config = configSchema.parse({ + url: 'postgres://example', + schema: 'public', + table: 'crm_customers', + primary_key: ['id'], + cursor_field: 'updated_at', + }) + + const source = createPostgresSource({ + createPool: () => ({ + async query(text: string) { + if (text.includes('information_schema.columns')) { + return queryResult([ + { column_name: 'id', data_type: 'text', is_nullable: 'NO' }, + { column_name: 'email', data_type: 'text', is_nullable: 'YES' }, + { + column_name: 'updated_at', + data_type: 'timestamp with time zone', + is_nullable: 'NO', + }, + ]) + } + return queryResult([]) + }, + async end() {}, + }), + }) + + const messages = await collect(source.discover({ config })) + + expect(messages).toEqual([ + { + type: 'catalog', + catalog: { + streams: [ + { + name: 'crm_customers', + primary_key: [['id']], + json_schema: { + type: 'object', + properties: { + id: { type: 'string' }, + email: { anyOf: [{ type: 'string' }, { type: 'null' }] }, + updated_at: { type: 'string' }, + }, + required: ['id', 'updated_at'], + additionalProperties: true, + }, + newer_than_field: 'updated_at', + }, + ], + }, + }, + ]) + }) + + it('discovers pg wire types and nullable values accurately', async () => { + const config = configSchema.parse({ + url: 'postgres://example', + table: 'orders', + primary_key: ['id'], + cursor_field: 'updated_at', + }) + + const source = createPostgresSource({ + createPool: () => ({ + async query(text: string) { + if (text.includes('information_schema.columns')) { + return queryResult([ + { column_name: 'id', data_type: 'text', is_nullable: 'NO' }, + { column_name: 'amount_cents', data_type: 'bigint', is_nullable: 'NO' }, + { column_name: 'ratio', data_type: 'numeric', is_nullable: 'YES' }, + { + column_name: 'updated_at', + data_type: 'timestamp with time zone', + is_nullable: 'NO', + }, + ]) + } + return queryResult([]) + }, + async end() {}, + }), + }) + + const messages = await collect(source.discover({ config })) + + expect(messages[0]).toMatchObject({ + type: 'catalog', + catalog: { + streams: [ + { + json_schema: { + properties: { + amount_cents: { type: 'string' }, + ratio: { anyOf: [{ type: 'string' }, { type: 'null' }] }, + }, + required: ['id', 'amount_cents', 'updated_at'], + }, + }, + ], + }, + }) + }) + + it('fails discovery when a configured table has no visible columns', async () => { + const config = configSchema.parse({ + url: 'postgres://example', + schema: 'public', + table: 'missing_table', + primary_key: ['id'], + cursor_field: 'updated_at', + }) + const source = createPostgresSource({ + createPool: () => ({ + async query() { + return queryResult([]) + }, + async end() {}, + }), + }) + + await expect(collect(source.discover({ config }))).rejects.toThrow( + 'Table "public.missing_table" was not found or has no visible columns' + ) + }) + + it('reads pages and emits source_state after each page', async () => { + const config = configSchema.parse({ + url: 'postgres://example', + table: 'crm_customers', + primary_key: ['id'], + cursor_field: 'updated_at', + page_size: 2, + }) + const rows = [ + { id: 'crm_1', email: 'a@example.com', updated_at: '2026-01-01T00:00:00.000Z' }, + { id: 'crm_2', email: 'b@example.com', updated_at: '2026-01-02T00:00:00.000Z' }, + { id: 'crm_3', email: 'c@example.com', updated_at: '2026-01-03T00:00:00.000Z' }, + ] + const catalog: ConfiguredCatalog = { + streams: [ + { + stream: { + name: 'crm_customers', + primary_key: [['id']], + newer_than_field: 'updated_at', + }, + sync_mode: 'incremental', + destination_sync_mode: 'append', + }, + ], + } + + const source = createPostgresSource({ + now: () => new Date('2026-05-03T00:00:00.000Z'), + createPool: () => ({ + async query(_text: string, values?: unknown[]) { + const limit = Number(values?.at(-1) ?? 100) + const cursor = values && values.length > 1 ? String(values[0]) : undefined + const pk = values && values.length > 1 ? String(values[1]) : undefined + const page = rows + .filter( + (row) => + !cursor || row.updated_at > cursor || (row.updated_at === cursor && row.id > pk!) + ) + .slice(0, limit) + return queryResult(page) + }, + async end() {}, + }), + }) + + const messages = await collect(source.read({ config, catalog })) + + expect(messages.map((message) => message.type)).toEqual([ + 'record', + 'record', + 'source_state', + 'record', + 'source_state', + ]) + expect(messages[2]).toMatchObject({ + type: 'source_state', + source_state: { + state_type: 'stream', + stream: 'crm_customers', + data: { cursor: '2026-01-02T00:00:00.000Z', primary_key: ['crm_2'] }, + }, + }) + expect(messages[4]).toMatchObject({ + type: 'source_state', + source_state: { + data: { cursor: '2026-01-03T00:00:00.000Z', primary_key: ['crm_3'] }, + }, + }) + }) +}) diff --git a/packages/source-postgres/src/index.ts b/packages/source-postgres/src/index.ts new file mode 100644 index 000000000..44bea4738 --- /dev/null +++ b/packages/source-postgres/src/index.ts @@ -0,0 +1,270 @@ +import pg from 'pg' +import type { PoolConfig, QueryResult } from 'pg' +import type { Source, Stream } from '@stripe/sync-protocol' +import { createSourceMessageFactory } from '@stripe/sync-protocol' +import { + ident, + qualifiedTable, + sslConfigFromConnectionString, + stripSslParams, + withPgConnectProxy, +} from '@stripe/sync-util-postgres' +import defaultSpec, { type Config, type StreamState } from './spec.js' +import { log } from './logger.js' + +export { configSchema, streamStateSpec, type Config, type StreamState } from './spec.js' + +type Queryable = { + query = Record>( + text: string, + values?: unknown[] + ): Promise> + end(): Promise +} + +export type PostgresSourceDeps = { + createPool?: (config: Config) => Queryable | Promise + now?: () => Date +} + +type Row = Record + +const msg = createSourceMessageFactory, Row>() + +export async function buildPoolConfig(config: Config): Promise { + const connectionString = config.url ?? config.connection_string + if (!connectionString) throw new Error('Either url or connection_string is required') + return withPgConnectProxy({ + connectionString: stripSslParams(connectionString), + ssl: sslConfigFromConnectionString(connectionString, { sslCaPem: config.ssl_ca_pem }), + }) +} + +async function createDefaultPool(config: Config): Promise { + const poolConfig = await buildPoolConfig(config) + const pool = new pg.Pool(poolConfig) + pool.on('error', (err) => { + log.error({ err }, 'Postgres source pool error') + }) + return pool +} + +function streamName(config: Config): string { + return config.stream ?? config.table! +} + +function sourceSql(config: Config): string { + if (config.query) return `(${config.query}) AS source_query` + return qualifiedTable(config.schema, config.table!) +} + +function serializeValue(value: unknown): unknown { + if (value instanceof Date) return value.toISOString() + if (Buffer.isBuffer(value)) return value.toString('base64') + if (Array.isArray(value)) return value.map(serializeValue) + if (value && typeof value === 'object') { + return Object.fromEntries( + Object.entries(value as Record).map(([key, inner]) => [ + key, + serializeValue(inner), + ]) + ) + } + return value +} + +function serializeRow(row: Row): Row { + return Object.fromEntries(Object.entries(row).map(([key, value]) => [key, serializeValue(value)])) +} + +function jsonTypeForPostgresType(dataType: string): Record { + switch (dataType) { + case 'boolean': + return { type: 'boolean' } + case 'smallint': + case 'integer': + return { type: 'integer' } + case 'bigint': + return { type: 'string' } + case 'real': + case 'double precision': + return { type: 'number' } + case 'numeric': + return { type: 'string' } + case 'json': + case 'jsonb': + return { type: 'object' } + case 'ARRAY': + return { type: 'array' } + default: + return { type: 'string' } + } +} + +async function discoverTableSchema( + pool: Queryable, + config: Config +): Promise> { + if (!config.table) { + return { type: 'object', additionalProperties: true } + } + + const result = await pool.query<{ + column_name: string + data_type: string + is_nullable: string + }>( + ` + SELECT column_name, data_type, is_nullable + FROM information_schema.columns + WHERE table_schema = $1 AND table_name = $2 + ORDER BY ordinal_position + `, + [config.schema, config.table] + ) + + if (result.rows.length === 0) { + throw new Error( + `Table "${config.schema}.${config.table}" was not found or has no visible columns` + ) + } + + const properties = Object.fromEntries( + result.rows.map((row) => { + const schema = jsonTypeForPostgresType(row.data_type) + return [ + row.column_name, + row.is_nullable === 'YES' ? { anyOf: [schema, { type: 'null' }] } : schema, + ] + }) + ) + const required = result.rows + .filter((row) => row.is_nullable === 'NO') + .map((row) => row.column_name) + + return { + type: 'object', + properties, + ...(required.length ? { required } : {}), + additionalProperties: true, + } +} + +function buildPageQuery( + config: Config, + state: StreamState | undefined +): { text: string; values: unknown[] } { + const cursorAndPk = [config.cursor_field, ...config.primary_key] + const orderBy = cursorAndPk.map((column) => `${ident(column)} ASC`).join(', ') + const values: unknown[] = [] + let where = '' + + if (state?.cursor !== undefined) { + values.push(state.cursor, ...state.primary_key) + const columns = `(${cursorAndPk.map(ident).join(', ')})` + const params = `(${cursorAndPk.map((_, index) => `$${index + 1}`).join(', ')})` + where = `WHERE ${columns} > ${params}` + } + + values.push(config.page_size) + const limitParam = `$${values.length}` + + return { + text: `SELECT * FROM ${sourceSql(config)} ${where} ORDER BY ${orderBy} LIMIT ${limitParam}`, + values, + } +} + +function nextState(config: Config, row: Row): StreamState { + return { + cursor: serializeValue(row[config.cursor_field]), + primary_key: config.primary_key.map((key) => serializeValue(row[key])), + } +} + +function streamFromConfig(config: Config, jsonSchema: Record): Stream { + return { + name: streamName(config), + primary_key: config.primary_key.map((key) => [key]), + json_schema: jsonSchema, + newer_than_field: config.cursor_field, + } +} + +export function createPostgresSource(deps: PostgresSourceDeps = {}): Source { + const createPool = deps.createPool ?? createDefaultPool + const now = deps.now ?? (() => new Date()) + + return { + async *spec() { + yield { type: 'spec' as const, spec: defaultSpec } + }, + + async *check({ config }) { + let pool: Queryable | undefined + try { + pool = await createPool(config) + await pool.query('SELECT 1') + yield msg.connection_status({ status: 'succeeded' }) + } catch (err) { + yield msg.connection_status({ + status: 'failed', + message: err instanceof Error ? err.message : String(err), + }) + } finally { + await pool?.end() + } + }, + + async *discover({ config }) { + const pool = await createPool(config) + try { + const jsonSchema = await discoverTableSchema(pool, config) + yield { + type: 'catalog' as const, + catalog: { streams: [streamFromConfig(config, jsonSchema)] }, + } + } finally { + await pool.end() + } + }, + + async *read({ config, catalog, state }) { + const selected = new Set(catalog.streams.map((configured) => configured.stream.name)) + const name = streamName(config) + if (!selected.has(name)) return + + const pool = await createPool(config) + try { + let currentState = state?.streams[name] as StreamState | undefined + for (;;) { + const pageQuery = buildPageQuery(config, currentState) + const page = await pool.query(pageQuery.text, pageQuery.values) + if (page.rows.length === 0) break + + for (const row of page.rows) { + const serialized = serializeRow(row) + yield msg.record({ + stream: name, + data: serialized, + emitted_at: now().toISOString(), + }) + currentState = nextState(config, row) + } + + yield msg.source_state({ + state_type: 'stream', + stream: name, + data: currentState!, + }) + + if (page.rows.length < config.page_size) break + } + } finally { + await pool.end() + } + }, + } +} + +export default createPostgresSource() diff --git a/packages/source-postgres/src/logger.ts b/packages/source-postgres/src/logger.ts new file mode 100644 index 000000000..978ae4088 --- /dev/null +++ b/packages/source-postgres/src/logger.ts @@ -0,0 +1,4 @@ +import { createLogger } from '@stripe/sync-logger' +import type { Logger } from '@stripe/sync-logger' + +export const log: Logger = createLogger({ name: 'source-postgres' }) diff --git a/packages/source-postgres/src/spec.ts b/packages/source-postgres/src/spec.ts new file mode 100644 index 000000000..7bf5716a5 --- /dev/null +++ b/packages/source-postgres/src/spec.ts @@ -0,0 +1,67 @@ +import { z } from 'zod' +import type { ConnectorSpecification } from '@stripe/sync-protocol' + +const baseConfigFields = { + schema: z.string().default('public').describe('Schema containing the source table'), + primary_key: z + .array(z.string()) + .min(1) + .default(['id']) + .describe('Columns that uniquely identify a row in this stream'), + cursor_field: z.string().describe('Monotonic column used for incremental reads'), + page_size: z.number().int().positive().default(100).describe('Rows to read per page'), + ssl_ca_pem: z + .string() + .optional() + .describe( + 'PEM-encoded CA certificate for SSL verification (required for verify-ca / verify-full with a private CA)' + ), +} + +const urlConfigFields = { + url: z.string().describe('Postgres connection string'), + connection_string: z.string().optional().describe('Deprecated alias for url; prefer url'), +} + +const connectionStringConfigFields = { + url: z.string().optional().describe('Postgres connection string'), + connection_string: z.string().describe('Deprecated alias for url; prefer url'), +} + +const tableConfigFields = { + table: z.string().describe('Table to read from'), + query: z.never().optional(), + stream: z + .string() + .optional() + .describe('Stream name emitted in the catalog and records. Defaults to table name.'), +} + +const queryConfigFields = { + table: z.never().optional(), + query: z + .string() + .describe('SQL query to read from. Must expose the primary_key and cursor_field columns.'), + stream: z.string().describe('Stream name emitted in the catalog and records.'), +} + +export const configSchema = z.union([ + z.object({ ...baseConfigFields, ...urlConfigFields, ...tableConfigFields }), + z.object({ ...baseConfigFields, ...connectionStringConfigFields, ...tableConfigFields }), + z.object({ ...baseConfigFields, ...urlConfigFields, ...queryConfigFields }), + z.object({ ...baseConfigFields, ...connectionStringConfigFields, ...queryConfigFields }), +]) + +export type Config = z.infer + +export const streamStateSpec = z.object({ + cursor: z.unknown().describe('Last emitted cursor_field value.'), + primary_key: z.array(z.unknown()).describe('Last emitted primary key tuple at the cursor.'), +}) + +export type StreamState = z.infer + +export default { + config: z.toJSONSchema(configSchema), + source_state_stream: z.toJSONSchema(streamStateSpec), +} satisfies ConnectorSpecification diff --git a/packages/source-postgres/tsconfig.json b/packages/source-postgres/tsconfig.json new file mode 100644 index 000000000..2481fe545 --- /dev/null +++ b/packages/source-postgres/tsconfig.json @@ -0,0 +1,9 @@ +{ + "extends": "../../tsconfig.base.json", + "compilerOptions": { + "outDir": "dist", + "rootDir": "src" + }, + "include": ["src/**/*"], + "exclude": ["src/**/*.test.ts", "src/**/__tests__/**"] +} diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index e6ae53cb3..537f9f18e 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -164,6 +164,9 @@ importers: '@stripe/sync-destination-postgres': specifier: workspace:* version: link:../../packages/destination-postgres + '@stripe/sync-destination-stripe': + specifier: workspace:* + version: link:../../packages/destination-stripe '@stripe/sync-hono-zod-openapi': specifier: workspace:* version: link:../../packages/hono-zod-openapi @@ -173,6 +176,9 @@ importers: '@stripe/sync-protocol': specifier: workspace:* version: link:../../packages/protocol + '@stripe/sync-source-postgres': + specifier: workspace:* + version: link:../../packages/source-postgres '@stripe/sync-source-stripe': specifier: workspace:* version: link:../../packages/source-stripe @@ -430,6 +436,9 @@ importers: '@stripe/sync-destination-postgres': specifier: workspace:* version: link:../packages/destination-postgres + '@stripe/sync-destination-stripe': + specifier: workspace:* + version: link:../packages/destination-stripe '@stripe/sync-engine': specifier: workspace:* version: link:../apps/engine @@ -442,6 +451,9 @@ importers: '@stripe/sync-service': specifier: workspace:* version: link:../apps/service + '@stripe/sync-source-postgres': + specifier: workspace:* + version: link:../packages/source-postgres '@stripe/sync-source-stripe': specifier: workspace:* version: link:../packages/source-stripe @@ -529,6 +541,28 @@ importers: specifier: ^3.2.4 version: 3.2.4(@types/node@25.5.0)(jiti@2.6.1)(lightningcss@1.32.0)(terser@5.46.1)(tsx@4.21.0)(yaml@2.8.1) + packages/destination-stripe: + dependencies: + '@stripe/sync-logger': + specifier: workspace:* + version: link:../logger + '@stripe/sync-openapi': + specifier: workspace:* + version: link:../openapi + '@stripe/sync-protocol': + specifier: workspace:* + version: link:../protocol + zod: + specifier: ^4.3.6 + version: 4.3.6 + devDependencies: + '@types/node': + specifier: ^24.10.1 + version: 24.10.1 + vitest: + specifier: ^3.2.4 + version: 3.2.4(@types/node@24.10.1)(jiti@2.6.1)(lightningcss@1.32.0)(terser@5.46.1)(tsx@4.21.0)(yaml@2.8.1) + packages/hono-zod-openapi: dependencies: '@hono/zod-validator': @@ -608,6 +642,31 @@ importers: specifier: ^3.2.1 version: 3.2.4(@types/node@24.10.1)(jiti@2.6.1)(lightningcss@1.32.0)(terser@5.46.1)(tsx@4.21.0)(yaml@2.8.1) + packages/source-postgres: + dependencies: + '@stripe/sync-logger': + specifier: workspace:* + version: link:../logger + '@stripe/sync-protocol': + specifier: workspace:* + version: link:../protocol + '@stripe/sync-util-postgres': + specifier: workspace:* + version: link:../util-postgres + pg: + specifier: ^8.16.3 + version: 8.16.3 + zod: + specifier: ^4.3.6 + version: 4.3.6 + devDependencies: + '@types/pg': + specifier: ^8.15.5 + version: 8.20.0 + vitest: + specifier: ^3.2.4 + version: 3.2.4(@types/node@25.5.0)(jiti@2.6.1)(lightningcss@1.32.0)(terser@5.46.1)(tsx@4.21.0)(yaml@2.8.1) + packages/source-stripe: dependencies: '@stripe/sync-logger': @@ -4661,6 +4720,7 @@ packages: uuid@9.0.1: resolution: {integrity: sha512-b+1eJOlsR9K8HJpow9Ok3fiWOWSIcIzXodvv0rQjVoOVNpWMpxf1wZNpt4y9h10odCNrqnYp1OBzRktckBe3sA==} + deprecated: uuid@10 and below is no longer supported. For ESM codebases, update to uuid@latest. For CommonJS codebases, use uuid@11 (but be aware this version will likely be deprecated in 2028). hasBin: true vite-node@3.2.4: @@ -7261,6 +7321,14 @@ snapshots: chai: 5.2.0 tinyrainbow: 2.0.0 + '@vitest/mocker@3.2.4(vite@7.2.2(@types/node@24.10.1)(jiti@2.6.1)(lightningcss@1.32.0)(terser@5.46.1)(tsx@4.21.0)(yaml@2.8.1))': + dependencies: + '@vitest/spy': 3.2.4 + estree-walker: 3.0.3 + magic-string: 0.30.21 + optionalDependencies: + vite: 7.2.2(@types/node@24.10.1)(jiti@2.6.1)(lightningcss@1.32.0)(terser@5.46.1)(tsx@4.21.0)(yaml@2.8.1) + '@vitest/mocker@3.2.4(vite@7.2.2(@types/node@25.5.0)(jiti@2.6.1)(lightningcss@1.32.0)(terser@5.46.1)(tsx@4.21.0)(yaml@2.8.1))': dependencies: '@vitest/spy': 3.2.4 @@ -9323,7 +9391,7 @@ snapshots: dependencies: '@types/chai': 5.2.2 '@vitest/expect': 3.2.4 - '@vitest/mocker': 3.2.4(vite@7.2.2(@types/node@25.5.0)(jiti@2.6.1)(lightningcss@1.32.0)(terser@5.46.1)(tsx@4.21.0)(yaml@2.8.1)) + '@vitest/mocker': 3.2.4(vite@7.2.2(@types/node@24.10.1)(jiti@2.6.1)(lightningcss@1.32.0)(terser@5.46.1)(tsx@4.21.0)(yaml@2.8.1)) '@vitest/pretty-format': 3.2.4 '@vitest/runner': 3.2.4 '@vitest/snapshot': 3.2.4