diff --git a/apps/engine/src/__generated__/openapi.d.ts b/apps/engine/src/__generated__/openapi.d.ts index d11dfe064..9784eac4f 100644 --- a/apps/engine/src/__generated__/openapi.d.ts +++ b/apps/engine/src/__generated__/openapi.d.ts @@ -321,8 +321,6 @@ export interface components { poll_events?: boolean; /** @description Port for built-in webhook HTTP listener (e.g. 4242) */ webhook_port?: number; - /** @description Object types to re-fetch from Stripe API on webhook (e.g. ["subscription"]) */ - revalidate_objects?: string[]; /** @description Max objects to backfill per stream (useful for testing) */ backfill_limit?: number; /** @description Override max requests per second (default: auto-derived from API key mode — 20 live, 10 test). */ diff --git a/apps/engine/src/__generated__/openapi.json b/apps/engine/src/__generated__/openapi.json index ace9d0e6b..a695b57c3 100644 --- a/apps/engine/src/__generated__/openapi.json +++ b/apps/engine/src/__generated__/openapi.json @@ -1081,13 +1081,6 @@ "maximum": 9007199254740991, "description": "Port for built-in webhook HTTP listener (e.g. 4242)" }, - "revalidate_objects": { - "type": "array", - "items": { - "type": "string" - }, - "description": "Object types to re-fetch from Stripe API on webhook (e.g. [\"subscription\"])" - }, "backfill_limit": { "type": "integer", "exclusiveMinimum": 0, diff --git a/apps/service/src/__generated__/openapi.d.ts b/apps/service/src/__generated__/openapi.d.ts index d9a416097..a6e7103ab 100644 --- a/apps/service/src/__generated__/openapi.d.ts +++ b/apps/service/src/__generated__/openapi.d.ts @@ -256,8 +256,6 @@ export interface components { poll_events?: boolean; /** @description Port for built-in webhook HTTP listener (e.g. 4242) */ webhook_port?: number; - /** @description Object types to re-fetch from Stripe API on webhook (e.g. ["subscription"]) */ - revalidate_objects?: string[]; /** @description Max objects to backfill per stream (useful for testing) */ backfill_limit?: number; /** @description Override max requests per second (default: auto-derived from API key mode — 20 live, 10 test). */ diff --git a/apps/service/src/__generated__/openapi.json b/apps/service/src/__generated__/openapi.json index 377ec6ed7..c5e234e34 100644 --- a/apps/service/src/__generated__/openapi.json +++ b/apps/service/src/__generated__/openapi.json @@ -1292,13 +1292,6 @@ "maximum": 9007199254740991, "description": "Port for built-in webhook HTTP listener (e.g. 4242)" }, - "revalidate_objects": { - "type": "array", - "items": { - "type": "string" - }, - "description": "Object types to re-fetch from Stripe API on webhook (e.g. [\"subscription\"])" - }, "backfill_limit": { "type": "integer", "exclusiveMinimum": 0, diff --git a/apps/visualizer/src/lib/pglite.ts b/apps/visualizer/src/lib/pglite.ts index 365f8593f..89ce5b198 100644 --- a/apps/visualizer/src/lib/pglite.ts +++ b/apps/visualizer/src/lib/pglite.ts @@ -13,11 +13,7 @@ import { PGlite } from '@electric-sql/pglite' import { useEffect, useState, useCallback, useRef } from 'react' -import { - SpecParser, - OPENAPI_RESOURCE_TABLE_ALIASES, - type ParsedResourceTable, -} from '@stripe/sync-source-stripe/browser' +import { SpecParser, type ParsedResourceTable } from '@stripe/sync-source-stripe/browser' type PGliteInstance = InstanceType type QueryResult = Awaited> @@ -154,13 +150,8 @@ function generateSchema(spec: Record): { for (const schemaDef of Object.values(schemas)) { const resourceId = (schemaDef as Record)['x-resourceId'] if (!resourceId || typeof resourceId !== 'string') continue - const alias = OPENAPI_RESOURCE_TABLE_ALIASES[resourceId] - if (alias) { - allTableNames.add(alias) - } else { - const normalized = resourceId.toLowerCase().replace(/\./g, '_') - allTableNames.add(normalized.endsWith('s') ? normalized : `${normalized}s`) - } + const normalized = resourceId.toLowerCase().replace(/\./g, '_') + allTableNames.add(normalized.endsWith('s') ? normalized : `${normalized}s`) } const parser = new SpecParser() diff --git a/e2e/conformance.test.ts b/e2e/conformance.test.ts index 41155d6ce..4ccb815dd 100644 --- a/e2e/conformance.test.ts +++ b/e2e/conformance.test.ts @@ -95,8 +95,7 @@ describe.each(sources)('source: $name', ({ name, mod: initialMod }) => { it('spec().config is a valid JSON Schema object', async () => { const src = mod.default as Record const spec = (await collectFirst(src.spec(), 'spec')).spec - expect(spec.config.type).toBe('object') - expect(typeof spec.config.properties).toBe('object') + expect(spec.config.type === 'object' || Array.isArray(spec.config.anyOf)).toBe(true) }) it('exports a named configSchema (Zod schema)', () => { @@ -146,8 +145,7 @@ describe.each(destinations)('destination: $name', ({ name, mod: initialMod }) => it('spec().config is a valid JSON Schema object', async () => { const dest = mod.default as Record const spec = (await collectFirst(dest.spec(), 'spec')).spec - expect(spec.config.type).toBe('object') - expect(typeof spec.config.properties).toBe('object') + expect(spec.config.type === 'object' || Array.isArray(spec.config.anyOf)).toBe(true) }) it('exports a named configSchema (Zod schema)', () => { diff --git a/packages/openapi/__tests__/listFnResolver.test.ts b/packages/openapi/__tests__/listFnResolver.test.ts index b0556b185..8ce1b6977 100644 --- a/packages/openapi/__tests__/listFnResolver.test.ts +++ b/packages/openapi/__tests__/listFnResolver.test.ts @@ -28,8 +28,8 @@ describe('SpecParser.discoverListEndpoints', () => { supportsStartingAfter: true, supportsEndingBefore: true, }) - expect(endpoints.get('early_fraud_warning')).toEqual({ - tableName: 'early_fraud_warning', + expect(endpoints.get('radar_early_fraud_warning')).toEqual({ + tableName: 'radar_early_fraud_warning', resourceId: 'radar.early_fraud_warning', apiPath: '/v1/radar/early_fraud_warnings', supportsCreatedFilter: true, diff --git a/packages/openapi/__tests__/specParser.test.ts b/packages/openapi/__tests__/specParser.test.ts index dce45ab87..d83d21712 100644 --- a/packages/openapi/__tests__/specParser.test.ts +++ b/packages/openapi/__tests__/specParser.test.ts @@ -4,16 +4,16 @@ import { minimalStripeOpenApiSpec } from './fixtures/minimalSpec' import type { OpenApiSpec } from '../../types' describe('SpecParser', () => { - it('parses aliased resources into deterministic tables and column types', () => { + it('parses resources into deterministic tables and column types', () => { const parser = new SpecParser() const parsed = parser.parse(minimalStripeOpenApiSpec, { - allowedTables: ['checkout_session', 'customer', 'early_fraud_warning'], + allowedTables: ['checkout_session', 'customer', 'radar_early_fraud_warning'], }) expect(parsed.tables.map((table) => table.tableName)).toEqual([ 'checkout_session', 'customer', - 'early_fraud_warning', + 'radar_early_fraud_warning', ]) const customers = parsed.tables.find((table) => table.tableName === 'customer') @@ -709,14 +709,14 @@ describe('SpecParser', () => { const tableNames = parsed.tables.map((t) => t.tableName) expect(tableNames).toEqual([ - 'active_entitlement', 'checkout_session', 'customer', - 'early_fraud_warning', - 'feature', + 'entitlements_active_entitlement', + 'entitlements_feature', 'plan', 'price', 'product', + 'radar_early_fraud_warning', 'subscription_item', 'v2_core_account', 'v2_core_event_destination', @@ -828,11 +828,11 @@ describe('SpecParser', () => { expect(tableNames).toContain('customer') }) - it('resolves table name aliases from x-resourceId during discovery', () => { + it('derives table names from x-resourceId via dot-to-underscore', () => { const parser = new SpecParser() const parsed = parser.parse(minimalStripeOpenApiSpec) - const earlyFraud = parsed.tables.find((t) => t.tableName === 'early_fraud_warning') + const earlyFraud = parsed.tables.find((t) => t.tableName === 'radar_early_fraud_warning') expect(earlyFraud).toBeDefined() expect(earlyFraud?.resourceId).toBe('radar.early_fraud_warning') @@ -853,7 +853,7 @@ describe('SpecParser.discoverSyncableTables', () => { expect(tables).toContain('product') expect(tables).toContain('plan') expect(tables).toContain('checkout_session') - expect(tables).toContain('early_fraud_warning') + expect(tables).toContain('radar_early_fraud_warning') }) it('excludes resources that are listable but have no webhook events', () => { diff --git a/packages/openapi/browser.ts b/packages/openapi/browser.ts index dd3b194c4..d9e60e3c0 100644 --- a/packages/openapi/browser.ts +++ b/packages/openapi/browser.ts @@ -1,7 +1,7 @@ // Browser-safe entry. Excludes specFetchHelper (which imports node:fs / node:path) // so consumers in webpack/Next.js client bundles can import SpecParser without errors. -export { SpecParser, OPENAPI_RESOURCE_TABLE_ALIASES, resolveTableName } from './specParser.js' +export { SpecParser, resolveTableName } from './specParser.js' export type { ListEndpoint, NestedEndpoint } from './specParser.js' export { parsedTableToJsonSchema } from './jsonSchemaConverter.js' export type { diff --git a/packages/openapi/index.ts b/packages/openapi/index.ts index 416fb62fe..2d3941675 100644 --- a/packages/openapi/index.ts +++ b/packages/openapi/index.ts @@ -1,5 +1,5 @@ export type * from './types.js' -export { SpecParser, OPENAPI_RESOURCE_TABLE_ALIASES, resolveTableName } from './specParser.js' +export { SpecParser, resolveTableName } from './specParser.js' export type { CreateEndpoint, ListEndpoint, NestedEndpoint } from './specParser.js' export { diff --git a/packages/openapi/runtimeMappings.ts b/packages/openapi/runtimeMappings.ts deleted file mode 100644 index 87e91736c..000000000 --- a/packages/openapi/runtimeMappings.ts +++ /dev/null @@ -1,11 +0,0 @@ -/** - * Overrides for x-resourceId values whose table name cannot be inferred by the - * default snake_case + dot-to-underscore rule in SpecParser.resolveTableName. - * Values are singular, mirroring Stripe resource names (rule #2 of the schema spec). - */ -export const OPENAPI_RESOURCE_TABLE_ALIASES: Record = { - 'radar.early_fraud_warning': 'early_fraud_warning', - 'entitlements.active_entitlement': 'active_entitlement', - 'entitlements.feature': 'feature', - item: 'checkout_session_line_item', -} diff --git a/packages/openapi/specParser.ts b/packages/openapi/specParser.ts index 8e7784b90..f1e2243f8 100644 --- a/packages/openapi/specParser.ts +++ b/packages/openapi/specParser.ts @@ -7,7 +7,6 @@ import type { ParsedOpenApiSpec, ScalarType, } from './types.js' -import { OPENAPI_RESOURCE_TABLE_ALIASES } from './runtimeMappings.js' const SCHEMA_REF_PREFIX = '#/components/schemas/' const CRUD_SUFFIXES = ['.created', '.updated', '.deleted'] as const @@ -22,16 +21,12 @@ const RESERVED_COLUMNS = new Set([ 'deleted', ]) -export { OPENAPI_RESOURCE_TABLE_ALIASES } - /** * Resolve a Stripe x-resourceId to a canonical table name. * Singular, snake_cased, with version namespace dots converted to underscores. + * Optional `aliases` map allows callers to override specific resourceIds. */ -export function resolveTableName( - resourceId: string, - aliases: Record = OPENAPI_RESOURCE_TABLE_ALIASES -): string { +export function resolveTableName(resourceId: string, aliases: Record = {}): string { const alias = aliases[resourceId] if (alias) return alias return resourceId.toLowerCase().replace(/[.]/g, '_') @@ -105,7 +100,7 @@ export class SpecParser { throw new Error('OpenAPI spec is missing components.schemas') } - const aliases = { ...OPENAPI_RESOURCE_TABLE_ALIASES, ...(options.resourceAliases ?? {}) } + const aliases = options.resourceAliases ?? {} const excluded = new Set(options.excludedTables ?? []) const allowedTables = options.allowedTables ? new Set(options.allowedTables.filter((t) => !excluded.has(t))) @@ -198,7 +193,7 @@ export class SpecParser { excluded?: ReadonlySet } = {} ): ParsedOpenApiSpec { - const aliases = { ...OPENAPI_RESOURCE_TABLE_ALIASES, ...(options.aliases ?? {}) } + const aliases = options.aliases ?? {} const syncableTables = this.discoverSyncableTables(spec, { aliases, excluded: options.excluded, @@ -220,7 +215,7 @@ export class SpecParser { excluded?: ReadonlySet } = {} ): Set { - const aliases = { ...OPENAPI_RESOURCE_TABLE_ALIASES, ...(options.aliases ?? {}) } + const aliases = options.aliases ?? {} const excluded = options.excluded ?? new Set() const listableIds = this.discoverListableResourceIds(spec, { includeNested: true }) const webhookIds = this.discoverWebhookUpdatableResourceIds(spec, listableIds) @@ -240,7 +235,7 @@ export class SpecParser { */ discoverListEndpoints( spec: OpenApiSpec, - aliases: Record = OPENAPI_RESOURCE_TABLE_ALIASES + aliases: Record = {} ): Map { const endpoints = new Map() for (const raw of this.iterListPaths(spec)) { @@ -270,7 +265,7 @@ export class SpecParser { discoverNestedEndpoints( spec: OpenApiSpec, topLevelEndpoints: Map, - aliases: Record = OPENAPI_RESOURCE_TABLE_ALIASES + aliases: Record = {} ): NestedEndpoint[] { const topLevelByPath = new Map() for (const endpoint of topLevelEndpoints.values()) { @@ -304,7 +299,7 @@ export class SpecParser { */ discoverCreateEndpoints( spec: OpenApiSpec, - aliases: Record = OPENAPI_RESOURCE_TABLE_ALIASES + aliases: Record = {} ): Map { const endpoints = new Map() for (const raw of this.iterListPaths(spec)) { diff --git a/packages/source-stripe/.env.sample b/packages/source-stripe/.env.sample index 6374ebe05..92a692d89 100644 --- a/packages/source-stripe/.env.sample +++ b/packages/source-stripe/.env.sample @@ -38,10 +38,6 @@ BACKFILL_RELATED_ENTITIES=true # Max number of connections for the Postgres connection pool, higher value lead to more concurrent queries, but also more load on the database (connections are expensive) MAX_POSTGRES_CONNECTIONS=20 -# If true, the webhook data is not used and instead the webhook is just a trigger to fetch the entity from Stripe again. This ensures that a race condition with failed webhooks can never accidentally overwrite the data with an older state. -# Default: -REVALIDATE_OBJECTS_VIA_STRIPE_API=payment_intent,invoice,customer,subscription - # optional # Disable the automated database migrations on app startup # Default: false diff --git a/packages/source-stripe/src/__snapshots__/catalog.test.ts.snap b/packages/source-stripe/src/__snapshots__/catalog.test.ts.snap index b573e70e3..86c5c2cc6 100644 --- a/packages/source-stripe/src/__snapshots__/catalog.test.ts.snap +++ b/packages/source-stripe/src/__snapshots__/catalog.test.ts.snap @@ -20,9 +20,8 @@ exports[`catalogFromOpenApi stream list > all listable tables (no webhook filter "credit_note", "customer", "dispute", - "early_fraud_warning", + "entitlements_feature", "event", - "feature", "file", "file_link", "financial_connections_account", @@ -51,6 +50,7 @@ exports[`catalogFromOpenApi stream list > all listable tables (no webhook filter "product", "promotion_code", "quote", + "radar_early_fraud_warning", "radar_value_list", "refund", "reporting_report_run", @@ -95,7 +95,6 @@ exports[`catalogFromOpenApi stream list > default: only tables with webhook even "credit_note", "customer", "dispute", - "early_fraud_warning", "file", "financial_connections_account", "identity_verification_session", @@ -117,6 +116,7 @@ exports[`catalogFromOpenApi stream list > default: only tables with webhook even "product", "promotion_code", "quote", + "radar_early_fraud_warning", "refund", "reporting_report_run", "reporting_report_type", diff --git a/packages/source-stripe/src/catalog.test.ts b/packages/source-stripe/src/catalog.test.ts index f6bd8d8b1..a523e41ae 100644 --- a/packages/source-stripe/src/catalog.test.ts +++ b/packages/source-stripe/src/catalog.test.ts @@ -1,5 +1,5 @@ import { describe, it, expect } from 'vitest' -import { SpecParser, OPENAPI_RESOURCE_TABLE_ALIASES } from '@stripe/sync-openapi' +import { SpecParser } from '@stripe/sync-openapi' import { buildResourceRegistry } from './resourceRegistry.js' import { catalogFromOpenApi } from './catalog.js' import { resolveOpenApiSpec, BUNDLED_API_VERSION } from '@stripe/sync-openapi' @@ -10,9 +10,7 @@ describe('catalogFromOpenApi stream list', () => { it('default: only tables with webhook events', async () => { const { spec, apiVersion } = await resolved - const parsed = parser.parse(spec, { - resourceAliases: OPENAPI_RESOURCE_TABLE_ALIASES, - }) + const parsed = parser.parse(spec) const allowedTables = new Set(parsed.tables.map((t) => t.tableName)) const registry = buildResourceRegistry( spec, @@ -31,9 +29,7 @@ describe('catalogFromOpenApi stream list', () => { it('every stream in the catalog has supports_realtime_sync = true', async () => { const { spec, apiVersion } = await resolved - const parsed = parser.parse(spec, { - resourceAliases: OPENAPI_RESOURCE_TABLE_ALIASES, - }) + const parsed = parser.parse(spec) const allowedTables = new Set(parsed.tables.map((t) => t.tableName)) const registry = buildResourceRegistry( spec, @@ -54,7 +50,6 @@ describe('catalogFromOpenApi stream list', () => { const { spec, apiVersion } = await resolved const allRegistry = buildResourceRegistry(spec, 'sk_test_fake', apiVersion) const parsed = parser.parse(spec, { - resourceAliases: OPENAPI_RESOURCE_TABLE_ALIASES, allowedTables: Object.values(allRegistry).map((r) => r.tableName), }) const registry = buildResourceRegistry( @@ -74,9 +69,7 @@ describe('catalogFromOpenApi stream list', () => { it('every stream has json_schema (no ghost tables)', async () => { const { spec, apiVersion } = await resolved - const parsed = parser.parse(spec, { - resourceAliases: OPENAPI_RESOURCE_TABLE_ALIASES, - }) + const parsed = parser.parse(spec) const allowedTables = new Set(parsed.tables.map((t) => t.tableName)) const registry = buildResourceRegistry( spec, diff --git a/packages/source-stripe/src/index.test.ts b/packages/source-stripe/src/index.test.ts index 2b652beb3..c0ec7fd76 100644 --- a/packages/source-stripe/src/index.test.ts +++ b/packages/source-stripe/src/index.test.ts @@ -1553,7 +1553,10 @@ describe('StripeSource', () => { it('entitlement summary event yields individual entitlement records', async () => { const registry: Record = { - active_entitlement: makeConfig({ order: 1, tableName: 'active_entitlement' }), + entitlements_active_entitlement: makeConfig({ + order: 1, + tableName: 'entitlements_active_entitlement', + }), } vi.mocked(buildResourceRegistry).mockReturnValue(registry as any) @@ -1587,7 +1590,10 @@ describe('StripeSource', () => { }) const messages = await collect( - source.read({ config, catalog: catalog({ name: 'active_entitlement' }) }, toIter(event)) + source.read( + { config, catalog: catalog({ name: 'entitlements_active_entitlement' }) }, + toIter(event) + ) ) // 2 entitlement records + 1 state @@ -1595,7 +1601,7 @@ describe('StripeSource', () => { expect(messages[0]).toMatchObject({ type: 'record', record: { - stream: 'active_entitlement', + stream: 'entitlements_active_entitlement', data: { id: 'ent_1', feature: 'feat_premium', @@ -1607,7 +1613,7 @@ describe('StripeSource', () => { expect(messages[1]).toMatchObject({ type: 'record', record: { - stream: 'active_entitlement', + stream: 'entitlements_active_entitlement', data: { id: 'ent_2', feature: 'feat_basic', @@ -1618,84 +1624,11 @@ describe('StripeSource', () => { }) expect(messages[2]).toMatchObject({ type: 'source_state', - source_state: { stream: 'active_entitlement', data: { eventId: 'evt_ent_1' } }, - }) - }) - - it('revalidation re-fetches from Stripe API when object is not in final state', async () => { - const retrieveFn = vi.fn().mockResolvedValueOnce({ - id: 'sub_1', - object: 'subscription', - status: 'active', - extra: 'revalidated', - }) - - const registry: Record = { - subscription: makeConfig({ - order: 1, - tableName: 'subscription', - retrieveFn: retrieveFn as ResourceConfig['retrieveFn'], - isFinalState: (s: { status: string }) => s.status === 'canceled', - }), - } - - vi.mocked(buildResourceRegistry).mockReturnValue(registry as any) - const event = makeEvent({ - id: 'evt_reval_1', - type: 'customer.subscription.updated', - created: 1700000000, - dataObject: { id: 'sub_1', object: 'subscription', status: 'active' }, - }) - - const messages = await collect( - source.read( - { - config: { ...config, revalidate_objects: ['subscription'] }, - catalog: catalog({ name: 'subscription' }), - }, - toIter(event) - ) - ) - - expect(retrieveFn).toHaveBeenCalledWith('sub_1') - const records = messages.filter((m): m is RecordMessage => m.type === 'record') - expect(records[0].record.data).toMatchObject({ id: 'sub_1', extra: 'revalidated' }) - }) - - it('revalidation skips re-fetch when object is in final state', async () => { - const retrieveFn = vi.fn() - - const registry: Record = { - subscription: makeConfig({ - order: 1, - tableName: 'subscription', - retrieveFn: retrieveFn as ResourceConfig['retrieveFn'], - isFinalState: (s: { status: string }) => s.status === 'canceled', - }), - } - - vi.mocked(buildResourceRegistry).mockReturnValue(registry as any) - const event = makeEvent({ - id: 'evt_reval_2', - type: 'customer.subscription.deleted', - created: 1700000000, - dataObject: { id: 'sub_1', object: 'subscription', status: 'canceled' }, + source_state: { + stream: 'entitlements_active_entitlement', + data: { eventId: 'evt_ent_1' }, + }, }) - - const messages = await collect( - source.read( - { - config: { ...config, revalidate_objects: ['subscription'] }, - catalog: catalog({ name: 'subscription' }), - }, - toIter(event) - ) - ) - - // Should NOT re-fetch because isFinalState returns true - expect(retrieveFn).not.toHaveBeenCalled() - const records = messages.filter((m): m is RecordMessage => m.type === 'record') - expect(records[0].record.data).toMatchObject({ id: 'sub_1', status: 'canceled' }) }) it('preview objects (no id) produce no output', async () => { diff --git a/packages/source-stripe/src/index.ts b/packages/source-stripe/src/index.ts index 4f5d927ae..cabc32735 100644 --- a/packages/source-stripe/src/index.ts +++ b/packages/source-stripe/src/index.ts @@ -322,7 +322,7 @@ export function createStripeSource( ) } else { const event = stripeEventSchema.parse(input) - yield* processStripeEvent(event, config, catalog, registry, streamNames, accountId) + yield* processStripeEvent(event, catalog, registry, streamNames, accountId) } } return @@ -359,7 +359,7 @@ export function createStripeSource( maxConcurrentStreams, signal, drainQueue: wsClient - ? () => inputQueue.drain(config, catalog, registry, streamNames, accountId) + ? () => inputQueue.drain(catalog, registry, streamNames, accountId) : undefined, }) @@ -384,7 +384,7 @@ export function createStripeSource( // After backfill: stream live events from WebSocket and/or HTTP if (wsClient || httpServer) { // Drain anything that arrived during backfill - yield* inputQueue.drain(config, catalog, registry, streamNames, accountId) + yield* inputQueue.drain(catalog, registry, streamNames, accountId) // Block on new events (infinite loop until all live sources close) while (wsClient || httpServer) { @@ -402,7 +402,6 @@ export function createStripeSource( } else { yield* processStripeEvent( queued.data, - config, catalog, registry, streamNames, @@ -438,7 +437,7 @@ export default createStripeSource() export { subdivideRanges } from '@stripe/sync-protocol' export { buildResourceRegistry, DEFAULT_SYNC_OBJECTS, EXCLUDED_TABLES } from './resourceRegistry.js' export { catalogFromOpenApi } from './catalog.js' -export { SpecParser, OPENAPI_RESOURCE_TABLE_ALIASES } from '@stripe/sync-openapi' +export { SpecParser } from '@stripe/sync-openapi' export type { ParsedResourceTable, ParsedOpenApiSpec } from '@stripe/sync-openapi' export type { RateLimiter } from './rate-limiter.js' export { createInMemoryRateLimiter } from './rate-limiter.js' diff --git a/packages/source-stripe/src/process-event.ts b/packages/source-stripe/src/process-event.ts index 98f77a647..8eb05ed8b 100644 --- a/packages/source-stripe/src/process-event.ts +++ b/packages/source-stripe/src/process-event.ts @@ -4,12 +4,11 @@ import type { RecordMessage, SourceStateMessage, } from '@stripe/sync-protocol' +import { resolveTableName } from '@stripe/sync-openapi' import type { StripeEvent } from './spec.js' -import type { Config } from './index.js' import { msg } from './index.js' import { log } from './logger.js' import type { ResourceConfig } from './types.js' -import { normalizeStripeObjectName } from './resourceRegistry.js' // MARK: - Delete event detection @@ -58,7 +57,7 @@ export function fromStripeEvent( | undefined if (!dataObject?.object) return null - const objectType = normalizeStripeObjectName(dataObject.object) + const objectType = resolveTableName(dataObject.object) const config = registry[objectType] if (!config) return null @@ -93,15 +92,13 @@ export function fromStripeEvent( /** * Process a single verified StripeEvent through the full pipeline: - * entitlements, registry filter, delete detection, revalidation, - * subscription items. + * entitlements, registry filter, delete detection, subscription items. * * This is the canonical function — all event paths (webhook, events API, * WebSocket) converge here once a StripeEvent is in hand. */ export async function* processStripeEvent( event: StripeEvent, - config: Config, catalog: ConfiguredCatalog, registry: Record, streamNames: Set, @@ -121,7 +118,7 @@ export async function* processStripeEvent( // 2. Entitlements special case — the summary object type doesn't map to a // registry entry, so we must handle it before the registry lookup. if (event.type === 'entitlements.active_entitlement_summary.updated') { - if (!streamNames.has('active_entitlement')) return + if (!streamNames.has('entitlements_active_entitlement')) return const summary = dataObject as { customer: string entitlements: { @@ -137,7 +134,7 @@ export async function* processStripeEvent( } for (const e of summary.entitlements.data) { yield msg.record({ - stream: 'active_entitlement', + stream: 'entitlements_active_entitlement', emitted_at: new Date().toISOString(), data: { id: e.id, @@ -146,7 +143,7 @@ export async function* processStripeEvent( customer: summary.customer, livemode: e.livemode, lookup_key: e.lookup_key, - [newerThanField('active_entitlement')]: + [newerThanField('entitlements_active_entitlement')]: typeof e.updated === 'number' ? e.updated : event.created, ...(accountId ? { _account_id: accountId } : {}), }, @@ -154,14 +151,14 @@ export async function* processStripeEvent( } yield msg.source_state({ state_type: 'stream', - stream: 'active_entitlement', + stream: 'entitlements_active_entitlement', data: { eventId: event.id, eventCreated: event.created }, }) return } // 3. Filter by registry and catalog - const objectType = normalizeStripeObjectName(dataObject.object) + const objectType = resolveTableName(dataObject.object) const resourceConfig = registry[objectType] if (!resourceConfig) return if (!dataObject.id) return // skip preview/draft objects @@ -189,19 +186,9 @@ export async function* processStripeEvent( return } - // 5. Revalidation — re-fetch from Stripe API if configured - let data: Record = dataObject - if ( - config.revalidate_objects?.some((r) => normalizeStripeObjectName(r) === objectType) && - resourceConfig.isFinalState && - !resourceConfig.isFinalState(dataObject) - ) { - data = (await resourceConfig.retrieveFn!(dataObject.id)) as Record - } - - // 6. Yield main record + // 5. Yield main record const recordData: Record = { - ...data, + ...dataObject, [newerThanField(resourceConfig.tableName)]: _updated_at, ...(accountId ? { _account_id: accountId } : {}), } @@ -211,12 +198,15 @@ export async function* processStripeEvent( emitted_at: new Date().toISOString(), }) - // 7. Yield subscription items if applicable. - if (objectType === 'subscription' && (data as { items?: { data?: unknown[] } }).items?.data) { + // 6. Yield subscription items if applicable. + if ( + objectType === 'subscription' && + (dataObject as { items?: { data?: unknown[] } }).items?.data + ) { const subscriptionItemsNewerThanField = catalog.streams.find((cs) => cs.stream.name === 'subscription_item')?.stream .newer_than_field ?? newerThanField(resourceConfig.tableName) - for (const item of (data as { items: { data: Record[] } }).items.data) { + for (const item of (dataObject as { items: { data: Record[] } }).items.data) { yield msg.record({ stream: 'subscription_item', data: { diff --git a/packages/source-stripe/src/resourceRegistry.ts b/packages/source-stripe/src/resourceRegistry.ts index 7c4bc36b9..2e3cbb2fd 100644 --- a/packages/source-stripe/src/resourceRegistry.ts +++ b/packages/source-stripe/src/resourceRegistry.ts @@ -6,14 +6,7 @@ import type { NestedEndpoint, ParsedResourceTable, } from '@stripe/sync-openapi' -import { - SpecParser, - buildListFn, - buildRetrieveFn, - isV2Path, - resolveTableName, - OPENAPI_RESOURCE_TABLE_ALIASES, -} from '@stripe/sync-openapi' +import { SpecParser, buildListFn, buildRetrieveFn, isV2Path } from '@stripe/sync-openapi' import { tracedFetch } from './transport.js' import { withHttpRetry } from './retry.js' @@ -73,19 +66,11 @@ export const DEFAULT_SYNC_OBJECTS: readonly string[] = [ 'tax_id', 'credit_note', 'dispute', - 'early_fraud_warning', + 'radar_early_fraud_warning', 'refund', 'checkout_session', ] -export const REVALIDATE_ENTITIES = [ - ...DEFAULT_SYNC_OBJECTS, - 'radar.early_fraud_warning', - 'subscription_schedule', - 'entitlements', -] as const -export type RevalidateEntityName = (typeof REVALIDATE_ENTITIES)[number] - /** * Build a ResourceConfig for every listable resource discovered in the OpenAPI spec. * All resources get list + retrieve functions derived dynamically from the spec paths. @@ -194,60 +179,3 @@ export function buildResourceRegistry( return registry } - -export const STRIPE_OBJECT_TO_SYNC_OBJECT_ALIASES: Record = { - 'checkout.session': 'checkout_session', - 'radar.early_fraud_warning': 'early_fraud_warning', - 'entitlements.active_entitlement': 'active_entitlement', - 'entitlements.feature': 'active_entitlement', -} - -export function normalizeStripeObjectName(stripeObjectName: string): string { - return resolveTableName(stripeObjectName, { - ...OPENAPI_RESOURCE_TABLE_ALIASES, - ...STRIPE_OBJECT_TO_SYNC_OBJECT_ALIASES, - }) -} - -export const PREFIX_RESOURCE_MAP: Record = { - cus_: 'customer', - gcus_: 'customer', - in_: 'invoice', - price_: 'price', - prod_: 'product', - sub_: 'subscription', - seti_: 'setup_intent', - pm_: 'payment_method', - dp_: 'dispute', - du_: 'dispute', - ch_: 'charge', - pi_: 'payment_intent', - txi_: 'tax_id', - cn_: 'credit_note', - issfr_: 'early_fraud_warning', - prv_: 'review', - re_: 'refund', - feat_: 'active_entitlement', - cs_: 'checkout_session', -} - -const SORTED_PREFIXES = Object.keys(PREFIX_RESOURCE_MAP).sort((a, b) => b.length - a.length) - -export function getResourceFromPrefix(stripeId: string): string | undefined { - const prefix = SORTED_PREFIXES.find((p) => stripeId.startsWith(p)) - return prefix ? PREFIX_RESOURCE_MAP[prefix] : undefined -} - -export function getResourceConfigFromId( - stripeId: string, - registry: Record -): ResourceConfig | undefined { - const resourceName = getResourceFromPrefix(stripeId) - return resourceName ? registry[resourceName] : undefined -} - -export function getTableName(object: string, registry: Record): string { - const config = registry[object] - if (!config) throw new Error(`No resource config found for object type: ${object}`) - return config.tableName -} diff --git a/packages/source-stripe/src/spec.ts b/packages/source-stripe/src/spec.ts index 769fe1898..3a824dad7 100644 --- a/packages/source-stripe/src/spec.ts +++ b/packages/source-stripe/src/spec.ts @@ -40,10 +40,6 @@ export const configSchema = z.object({ .int() .optional() .describe('Port for built-in webhook HTTP listener (e.g. 4242)'), - revalidate_objects: z - .array(z.string()) - .optional() - .describe('Object types to re-fetch from Stripe API on webhook (e.g. ["subscription"])'), backfill_limit: z .number() .int() diff --git a/packages/source-stripe/src/src-events-api.ts b/packages/source-stripe/src/src-events-api.ts index 5ae6c8100..44bbb50f0 100644 --- a/packages/source-stripe/src/src-events-api.ts +++ b/packages/source-stripe/src/src-events-api.ts @@ -74,7 +74,7 @@ export async function* pollEvents(opts: { let latestEventCreated = cursor for (const event of events) { - yield* processStripeEvent(event, config, catalog, registry, streamNames, accountId) + yield* processStripeEvent(event, catalog, registry, streamNames, accountId) if (event.created > latestEventCreated) { latestEventCreated = event.created } diff --git a/packages/source-stripe/src/src-webhook.ts b/packages/source-stripe/src/src-webhook.ts index 41b1927ce..8323a2493 100644 --- a/packages/source-stripe/src/src-webhook.ts +++ b/packages/source-stripe/src/src-webhook.ts @@ -28,7 +28,7 @@ export async function* processWebhookInput( const signature = (input.headers['stripe-signature'] as string) ?? '' const event = verifyWebhookSignature(input.body, signature, config.webhook_secret) log.info({ eventId: event.id, eventType: event.type }, 'webhook signature verified') - yield* processStripeEvent(event, config, catalog, registry, streamNames, accountId) + yield* processStripeEvent(event, catalog, registry, streamNames, accountId) } // MARK: - LiveInput queue @@ -83,7 +83,6 @@ export function createInputQueue() { } async function* drain( - config: Config, catalog: ConfiguredCatalog, registry: Record, streamNames: Set, @@ -93,7 +92,6 @@ export function createInputQueue() { const queued = queue.shift()! yield* processStripeEvent( queued.data as StripeEvent, - config, catalog, registry, streamNames, diff --git a/packages/source-stripe/src/types.ts b/packages/source-stripe/src/types.ts index 9c6dace0a..2fb906bc8 100644 --- a/packages/source-stripe/src/types.ts +++ b/packages/source-stripe/src/types.ts @@ -1,5 +1,4 @@ import type { ListFn, RetrieveFn, ParsedResourceTable } from '@stripe/sync-openapi' -import type { RevalidateEntityName } from './resourceRegistry.js' /** * Simple logger interface compatible with both pino and console @@ -24,9 +23,6 @@ export type BaseResourceConfig = { sync?: boolean /** Resource types that must be backfilled before this one (e.g. price depends on product) */ dependencies?: readonly string[] - /** Function to check if an entity is in a final state and doesn't need revalidation */ - // eslint-disable-next-line @typescript-eslint/no-explicit-any - isFinalState?: (entity: any) => boolean } export type ResourceConfig = BaseResourceConfig & { @@ -49,5 +45,3 @@ export type ResourceConfig = BaseResourceConfig & { /** For nested resources, the parent path parameter name */ parentParamName?: string } - -export type RevalidateEntity = RevalidateEntityName diff --git a/packages/test-utils/src/openapi/endpoints.ts b/packages/test-utils/src/openapi/endpoints.ts index 9ebe12dd0..c3ec7536e 100644 --- a/packages/test-utils/src/openapi/endpoints.ts +++ b/packages/test-utils/src/openapi/endpoints.ts @@ -3,7 +3,6 @@ import { isV2Path, resolveOpenApiSpec, SpecParser, - OPENAPI_RESOURCE_TABLE_ALIASES, parsedTableToJsonSchema, type ListEndpoint, type OpenApiOperationObject, @@ -54,9 +53,7 @@ export async function resolveEndpointSet(options: { const jsonSchemaMap = new Map>() try { - const parsed = parser.parse(resolved.spec, { - resourceAliases: OPENAPI_RESOURCE_TABLE_ALIASES, - }) + const parsed = parser.parse(resolved.spec) for (const table of parsed.tables) { jsonSchemaMap.set(table.tableName, parsedTableToJsonSchema(table)) }