Skip to content
52 changes: 24 additions & 28 deletions packages/sync-engine/src/cli/commands.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,11 @@ import {
StripeSync,
runMigrations,
createStripeWebSocketClient,
type SyncObject,
type StripeWebSocketClient,
type StripeWebhookEvent,
} from '../index'
import { createTunnel, type NgrokTunnel } from './ngrok'
import { type StripeObject } from '../resourceRegistry'
import { SYNC_OBJECTS, type StripeObject } from '../resourceRegistry'
import { install, uninstall } from '../supabase'
import { SIGMA_INGESTION_CONFIGS } from '../sigma/sigmaIngestionConfigs'

Expand Down Expand Up @@ -112,28 +111,6 @@ export interface DeployOptions {

export type { CliOptions }

const VALID_SYNC_OBJECTS: SyncObject[] = [
'all',
'customer',
'customer_with_entitlements',
'invoice',
'price',
'product',
'subscription',
'subscription_schedules',
'setup_intent',
'payment_method',
'dispute',
'charge',
'payment_intent',
'plan',
'tax_id',
'credit_note',
'early_fraud_warning',
'refund',
'checkout_sessions',
]

/**
* Backfill command - backfills a specific entity type from Stripe.
*/
Expand All @@ -149,11 +126,11 @@ export async function backfillCommand(options: CliOptions, entityName: string):

// Validate entity name - allow sigma table names when sigma is enabled
const sigmaTableNames = enableSigma ? Object.keys(SIGMA_INGESTION_CONFIGS) : []
const validEntities = [...VALID_SYNC_OBJECTS, ...sigmaTableNames]
if (!validEntities.includes(entityName as SyncObject)) {
const validEntities = new Set<string>([...SYNC_OBJECTS, ...sigmaTableNames])
if (!validEntities.has(entityName)) {
const entityList = enableSigma
? `${VALID_SYNC_OBJECTS.join(', ')}, and ${sigmaTableNames.length} sigma tables`
: VALID_SYNC_OBJECTS.join(', ')
? `${SYNC_OBJECTS.join(', ')}, and ${sigmaTableNames.length} sigma tables`
: SYNC_OBJECTS.join(', ')
console.error(
chalk.red(`Error: Invalid entity name "${entityName}". Valid entities are: ${entityList}`)
)
Expand Down Expand Up @@ -226,9 +203,14 @@ export async function backfillCommand(options: CliOptions, entityName: string):

// Run migrations first (will check for legacy installations and throw if detected)
try {
const schemaName = process.env.SYNC_SCHEMA_NAME ?? undefined
const syncTablesSchemaName = process.env.SYNC_TABLES_SCHEMA_NAME ?? undefined
await runMigrations({
databaseUrl: config.databaseUrl,
enableSigma,
stripeApiVersion: process.env.STRIPE_API_VERSION || '2020-08-27',
schemaName,
syncTablesSchemaName,
})
} catch (migrationError) {
console.error(chalk.red('Failed to run migrations:'))
Expand Down Expand Up @@ -358,9 +340,14 @@ export async function migrateCommand(options: CliOptions): Promise<void> {
}

try {
const schemaName = process.env.SYNC_SCHEMA_NAME ?? undefined
const syncTablesSchemaName = process.env.SYNC_TABLES_SCHEMA_NAME ?? undefined
await runMigrations({
databaseUrl,
enableSigma,
stripeApiVersion: process.env.STRIPE_API_VERSION || '2020-08-27',
schemaName,
syncTablesSchemaName,
})
console.log(chalk.green('✓ Migrations completed successfully'))
} catch (migrationError) {
Expand Down Expand Up @@ -480,9 +467,14 @@ export async function syncCommand(options: CliOptions): Promise<void> {

// 1. Run migrations (will check for legacy installations and throw if detected)
try {
const schemaName = process.env.SYNC_SCHEMA_NAME ?? undefined
const syncTablesSchemaName = process.env.SYNC_TABLES_SCHEMA_NAME ?? undefined
await runMigrations({
databaseUrl: config.databaseUrl,
enableSigma: config.enableSigma,
stripeApiVersion: process.env.STRIPE_API_VERSION || '2020-08-27',
schemaName,
syncTablesSchemaName,
})
} catch (migrationError) {
console.error(chalk.red('Failed to run migrations:'))
Expand Down Expand Up @@ -736,9 +728,13 @@ export async function fullSyncCommand(

// Run migrations first
try {
const schemaName = process.env.SYNC_SCHEMA_NAME ?? undefined
const syncTablesSchemaName = process.env.SYNC_TABLES_SCHEMA_NAME ?? undefined
await runMigrations({
databaseUrl: config.databaseUrl,
enableSigma,
schemaName,
syncTablesSchemaName,
})
} catch (migrationError) {
console.error(chalk.red('Failed to run migrations:'))
Expand Down
127 changes: 127 additions & 0 deletions packages/sync-engine/src/database/__tests__/migrate.openapi.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
import fs from 'node:fs/promises'
import os from 'node:os'
import path from 'node:path'
import pg from 'pg'
import { afterAll, beforeAll, describe, expect, it } from 'vitest'
import { runMigrations } from '../migrate'
import { minimalStripeOpenApiSpec } from '../../openapi/__tests__/fixtures/minimalSpec'

const TEST_DB_URL = process.env.TEST_POSTGRES_DB_URL
const describeWithDb = TEST_DB_URL ? describe : describe.skip

describeWithDb('runMigrations openapi pipeline', () => {
let pool: pg.Pool
let specPath: string
let tempDir: string

beforeAll(async () => {
tempDir = await fs.mkdtemp(path.join(os.tmpdir(), 'openapi-migrate-test-'))
specPath = path.join(tempDir, 'spec3.json')
await fs.writeFile(specPath, JSON.stringify(minimalStripeOpenApiSpec), 'utf8')

await runMigrations({
databaseUrl: TEST_DB_URL!,
openApiSpecPath: specPath,
stripeApiVersion: '2020-08-27',
})

pool = new pg.Pool({ connectionString: TEST_DB_URL! })
})

afterAll(async () => {
await pool?.end()
if (tempDir) {
await fs.rm(tempDir, { recursive: true, force: true })
}
})

it('creates bootstrap internal tables and the sync_runs view', async () => {
const tablesResult = await pool.query(
`SELECT table_name
FROM information_schema.tables
WHERE table_schema = 'stripe'
AND table_name IN ('_migrations', 'accounts', '_managed_webhooks', '_sync_runs', '_sync_obj_runs')
ORDER BY table_name`
)
expect(tablesResult.rows.map((row) => row.table_name)).toEqual([
'_managed_webhooks',
'_migrations',
'_sync_obj_runs',
'_sync_runs',
'accounts',
])

const viewsResult = await pool.query(
`SELECT table_name
FROM information_schema.views
WHERE table_schema = 'stripe' AND table_name = 'sync_runs'`
)
expect(viewsResult.rows).toHaveLength(1)
})

it('materializes runtime-critical generated columns and naming contracts', async () => {
const subscriptionItemsColumns = await pool.query(
`SELECT column_name
FROM information_schema.columns
WHERE table_schema = 'stripe' AND table_name = 'subscription_items'`
)
const subscriptionColumnSet = new Set(
subscriptionItemsColumns.rows.map((row) => row.column_name as string)
)
expect(subscriptionColumnSet.has('deleted')).toBe(true)
expect(subscriptionColumnSet.has('subscription')).toBe(true)

const entitlementColumns = await pool.query(
`SELECT column_name
FROM information_schema.columns
WHERE table_schema = 'stripe' AND table_name = 'active_entitlements'`
)
expect(entitlementColumns.rows.some((row) => row.column_name === 'customer')).toBe(true)

const customerIdColumn = await pool.query(
`SELECT is_generated
FROM information_schema.columns
WHERE table_schema = 'stripe' AND table_name = 'customers' AND column_name = 'id'`
)
expect(customerIdColumn.rows[0]?.is_generated).toBe('ALWAYS')

const managedWebhookColumns = await pool.query(
`SELECT column_name
FROM information_schema.columns
WHERE table_schema = 'stripe' AND table_name = '_managed_webhooks'`
)
const managedWebhookColumnSet = new Set(
managedWebhookColumns.rows.map((row) => row.column_name as string)
)
expect(managedWebhookColumnSet.has('account_id')).toBe(true)
expect(managedWebhookColumnSet.has('_account_id')).toBe(false)
})

it('enforces one active sync run per account+triggered_by', async () => {
const accountId = 'acct_openapi_migrate_test'
await pool.query(
`INSERT INTO "stripe"."accounts" ("_raw_data", "api_key_hashes")
VALUES ($1::jsonb, ARRAY['hash_openapi'])`,
[JSON.stringify({ id: accountId, object: 'account' })]
)

await pool.query(
`INSERT INTO "stripe"."_sync_runs" ("_account_id", "triggered_by", "started_at")
VALUES ($1, 'worker', date_trunc('milliseconds', now()))`,
[accountId]
)
await pool.query(
`INSERT INTO "stripe"."_sync_runs" ("_account_id", "triggered_by", "started_at")
VALUES ($1, 'sigma-worker', date_trunc('milliseconds', now()) + interval '1 second')`,
[accountId]
)

await expect(
pool.query(
`INSERT INTO "stripe"."_sync_runs" ("_account_id", "triggered_by", "started_at")
VALUES ($1, 'worker', date_trunc('milliseconds', now()) + interval '2 second')`,
[accountId]
)
).rejects.toThrow()
})
})
Loading