Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions apps/engine/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
"@hono/node-server": "^1",
"@scalar/hono-api-reference": "^0.6",
"@stripe/sync-destination-google-sheets": "workspace:*",
"@stripe/sync-destination-aws-dsql": "workspace:*",
"@stripe/sync-destination-postgres": "workspace:*",
"@stripe/sync-hono-zod-openapi": "workspace:*",
"@stripe/sync-logger": "workspace:*",
Expand Down
2 changes: 2 additions & 0 deletions apps/engine/src/lib/default-connectors.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import sourceStripe from '@stripe/sync-source-stripe'
import destinationAwsDsql from '@stripe/sync-destination-aws-dsql'
import destinationPostgres from '@stripe/sync-destination-postgres'
import destinationGoogleSheets from '@stripe/sync-destination-google-sheets'
import type { RegisteredConnectors } from './resolver.js'
Expand All @@ -7,6 +8,7 @@ import type { RegisteredConnectors } from './resolver.js'
export const defaultConnectors: RegisteredConnectors = {
sources: { stripe: sourceStripe },
destinations: {
aws_dsql: destinationAwsDsql,
postgres: destinationPostgres,
google_sheets: destinationGoogleSheets,
},
Expand Down
15 changes: 15 additions & 0 deletions demo/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,21 @@ outbound WebSocket connection.
| `stripe-to-postgres.sh` | Stripe → Postgres via the engine | `STRIPE_API_KEY`, `DATABASE_URL` |
| `stripe-to-google-sheets.sh` | Stripe → Google Sheets via the engine | `STRIPE_API_KEY`, `GOOGLE_*` |
| `stripe-to-postgres-live.sh` | Stripe → Postgres with live WebSocket streaming | `STRIPE_API_KEY`, `DATABASE_URL` |
| `stripe-to-dsql.ts` | Stripe → AWS DSQL via the engine | `STRIPE_API_KEY`, `DSQL_ENDPOINT` (or terraform output), `AWS_*` |

### Stripe → AWS DSQL

Sync Stripe data to [Aurora DSQL](https://aws.amazon.com/rds/aurora/dsql/) (serverless distributed SQL):

```sh
# One-time: provision the DSQL cluster
cd terraform && terraform init && terraform apply && cd ..

# Sync (auto-reads endpoint from terraform output)
node --import tsx demo/stripe-to-dsql.ts
```

Or with explicit env vars: `DSQL_ENDPOINT=<id>.dsql.<region>.on.aws node --import tsx demo/stripe-to-dsql.ts`

### TypeScript API

Expand Down
94 changes: 94 additions & 0 deletions demo/stripe-to-dsql.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
/**
 * Sync Stripe → AWS DSQL via the engine API (TypeScript).
 *
 * Usage:
 *   npx tsx demo/stripe-to-dsql.ts
 *   bun demo/stripe-to-dsql.ts
 *
 * Env:
 *   STRIPE_API_KEY        — Stripe secret key
 *   DSQL_ENDPOINT         — DSQL cluster endpoint (e.g. <id>.dsql.us-east-1.on.aws);
 *                           falls back to `terraform output` if unset
 *   AWS_REGION            — AWS region (default: us-east-1)
 *   AWS_ACCESS_KEY_ID     — AWS credentials
 *   AWS_SECRET_ACCESS_KEY — AWS credentials
 */
import { execSync } from 'node:child_process'
import { createConnectorResolver, createEngine } from '../apps/engine/src/lib/index.js'
import { defaultConnectors } from '../apps/engine/src/lib/default-connectors.js'
import { fileStateStore } from '../apps/engine/src/lib/state-store.js'
import type { PipelineConfig } from '../packages/protocol/src/index.js'
import { buildPoolConfig, pg } from '../packages/destination-aws-dsql/src/index.js'

const stripeApiKey = process.env.STRIPE_API_KEY
const region = process.env.AWS_REGION ?? 'us-east-1'

// Auto-read endpoint from terraform output if not set.
// Any terraform failure (not installed, no state, no such output) is swallowed
// and surfaces as the "Set DSQL_ENDPOINT" error below instead.
const dsqlEndpoint =
  process.env.DSQL_ENDPOINT ??
  (() => {
    try {
      return execSync('terraform -chdir=terraform output -raw cluster_endpoint', {
        encoding: 'utf8',
      }).trim()
    } catch {
      return undefined
    }
  })()

if (!stripeApiKey) throw new Error('Set STRIPE_API_KEY')
if (!dsqlEndpoint)
  throw new Error('Set DSQL_ENDPOINT or run `terraform -chdir=terraform apply` first')

// Small backfill (10 records per stream) keeps the demo fast.
const pipeline: PipelineConfig = {
  source: { type: 'stripe', stripe: { api_key: stripeApiKey, backfill_limit: 10 } },
  destination: {
    type: 'aws_dsql',
    aws_dsql: { endpoint: dsqlEndpoint, region, schema: 'public' },
  },
  streams: [{ name: 'products' }, { name: 'prices' }, { name: 'customers' }],
}

const resolver = await createConnectorResolver(defaultConnectors, { path: true })
const engine = await createEngine(resolver)

// Create tables. pipeline_setup is an async generator — it does nothing until
// iterated, so this empty loop is what actually drives the DDL; messages are
// discarded.
for await (const _msg of engine.pipeline_setup(pipeline)) {
}

// State: file-backed (.sync-state-dsql.json in cwd), resumable across runs.
const store = fileStateStore('.sync-state-dsql.json')
const state = await store.get()

// Sync, persisting each state checkpoint as it streams back so a rerun can
// resume from the last committed point.
for await (const msg of engine.pipeline_sync(pipeline, { state })) {
  if (msg.type === 'source_state') {
    if (msg.source_state.state_type === 'global') await store.setGlobal(msg.source_state.data)
    else await store.set(msg.source_state.stream, msg.source_state.data)
  }
  console.log(JSON.stringify(msg))
}

// Verify: query DSQL to show what was synced. Table names below are
// hard-coded literals matching the streams above, so interpolating them into
// SQL is safe here.
console.log('\n--- Verifying data in DSQL ---')
const poolConfig = await buildPoolConfig({
  endpoint: dsqlEndpoint,
  region,
  schema: 'public',
  batch_size: 100,
})
const pool = new pg.Pool(poolConfig)

for (const table of ['customers', 'prices', 'products']) {
  const { rows } = await pool.query(`SELECT count(*) FROM ${table}`)
  console.log(`${table}: ${rows[0].count} rows`)
}

// _raw_data is JSON-serialized text (see the destination connector), so a
// substring gives a readable preview.
console.log('\nSample rows:')
for (const table of ['customers', 'products']) {
  const { rows } = await pool.query(
    `SELECT id, substring(_raw_data, 1, 100) as data FROM ${table} LIMIT 2`
  )
  for (const row of rows) console.log(`  [${table}] ${row.id}: ${row.data}...`)
}

await pool.end()
32 changes: 32 additions & 0 deletions packages/destination-aws-dsql/package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
{
"name": "@stripe/sync-destination-aws-dsql",
"version": "0.1.0",
"private": false,
"type": "module",
"exports": {
".": {
"bun": "./src/index.ts",
"types": "./dist/index.d.ts",
"import": "./dist/index.js"
}
},
"scripts": {
"build": "tsc",
"test": "vitest"
},
"files": [
"dist",
"src"
],
"dependencies": {
"@aws-sdk/dsql-signer": "^3.1013.0",
"@stripe/sync-protocol": "workspace:*",
"@stripe/sync-util-postgres": "workspace:*",
"pg": "^8.16.3",
"zod": "^4.3.6"
},
"devDependencies": {
"@types/pg": "^8.15.5",
"vitest": "^3.2.4"
}
}
212 changes: 212 additions & 0 deletions packages/destination-aws-dsql/src/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,212 @@
import pg from 'pg'
import type { PoolConfig } from 'pg'
import { DsqlSigner } from '@aws-sdk/dsql-signer'
import type { Destination, DestinationInput, LogMessage } from '@stripe/sync-protocol'
import { sql, upsert } from '@stripe/sync-util-postgres'
import defaultSpec from './spec.js'
import type { Config } from './spec.js'

export { configSchema, type Config } from './spec.js'
export { default as pg } from 'pg'

/** Wrap a human-readable message in the protocol's log envelope (default level: info). */
function logMsg(message: string, level: LogMessage['log']['level'] = 'info'): LogMessage {
  const log = { level, message }
  return { type: 'log', log }
}

/** Mint a fresh, short-lived DSQL IAM auth token for the admin role. */
async function generateToken(endpoint: string, region: string): Promise<string> {
  return new DsqlSigner({ hostname: endpoint, region }).getDbConnectAdminAuthToken()
}

/**
 * Build a pg PoolConfig for DSQL with rotating IAM auth tokens.
 *
 * DSQL IAM tokens are short-lived, so `password` is supplied as a callback:
 * node-postgres invokes it for every new physical connection, giving each
 * connection a freshly signed token. A static token minted once here would
 * start failing new connections as soon as it expired.
 */
export async function buildPoolConfig(config: Config): Promise<PoolConfig> {
  // Sign one token eagerly so bad AWS credentials fail fast, here, rather
  // than on the pool's first query.
  await generateToken(config.endpoint, config.region)
  return {
    host: config.endpoint,
    port: 5432,
    database: 'postgres',
    user: 'admin',
    // pg accepts string | (() => string | Promise<string>) for password.
    password: () => generateToken(config.endpoint, config.region),
    ssl: true,
  }
}

/**
 * Construct a pg.Pool and attach an error listener so failures on idle
 * clients are logged instead of surfacing as unhandled 'error' events.
 */
function createPool(poolConfig: PoolConfig): pg.Pool {
  const pool = new pg.Pool(poolConfig)
  pool.on('error', (err) => console.error('DSQL destination pool error:', err))
  return pool
}

/**
 * Render a CREATE TABLE IF NOT EXISTS statement for one stream's table.
 *
 * DSQL does not support: triggers, generated columns, PL/pgSQL DO blocks, jsonb.
 * We store _raw_data as text (JSON-serialized) with id as primary key.
 */
function buildCreateTableSQL(schema: string, tableName: string): string {
  // Double-quote an identifier, doubling embedded quotes per SQL escaping rules.
  const quoteIdent = (ident: string) => `"${ident.replace(/"/g, '""')}"`
  return sql`
CREATE TABLE IF NOT EXISTS ${quoteIdent(schema)}.${quoteIdent(tableName)} (
"id" text NOT NULL,
"_raw_data" text NOT NULL,
"_last_synced_at" timestamptz,
"_updated_at" timestamptz NOT NULL DEFAULT now(),
PRIMARY KEY ("id")
)
`
}

/**
 * Upsert a batch of records into one DSQL table.
 *
 * _updated_at is set explicitly to the current time because DSQL has no
 * trigger support; it is listed in noDiffColumns so a timestamp difference
 * alone never counts as a data change. Records without an `id` collapse to
 * the empty-string key — NOTE(review): assumes every source record carries
 * an id; confirm upstream.
 */
async function upsertMany(
  pool: pg.Pool,
  schema: string,
  table: string,
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  entries: Record<string, any>[]
): Promise<void> {
  if (entries.length === 0) return
  const rows = entries.map((entry) => ({
    id: String(entry.id ?? ''),
    _raw_data: JSON.stringify(entry),
    _updated_at: new Date().toISOString(),
  }))
  const options = {
    schema,
    table,
    keyColumns: ['id'],
    noDiffColumns: ['_updated_at'],
  }
  await upsert(pool, rows, options)
}

/**
 * Heuristic: does this error look like a retryable infrastructure hiccup
 * (refused, dropped, or timed-out connection) rather than a logic error?
 */
function isTransient(err: unknown): boolean {
  if (!(err instanceof Error)) return false
  const haystack = err.message.toLowerCase()
  return ['econnrefused', 'timeout', 'connection'].some((marker) =>
    haystack.includes(marker)
  )
}

const destination = {
  /** Emit the connector's configuration spec. */
  async *spec() {
    yield { type: 'spec' as const, spec: defaultSpec }
  },

  /** Verify connectivity by running a trivial query against the cluster. */
  async *check({ config }) {
    const pool = createPool(await buildPoolConfig(config))
    try {
      await pool.query('SELECT 1')
      yield {
        type: 'connection_status' as const,
        connection_status: { status: 'succeeded' as const },
      }
    } catch (err) {
      yield {
        type: 'connection_status' as const,
        connection_status: {
          status: 'failed' as const,
          message: err instanceof Error ? err.message : String(err),
        },
      }
    } finally {
      await pool.end()
    }
  },

  /** Create the target schema and one table per configured stream. */
  async *setup({ config, catalog }) {
    const pool = createPool(await buildPoolConfig(config))
    try {
      yield logMsg(`Creating schema "${config.schema}" (${catalog.streams.length} streams)`)
      // Escape embedded double quotes so the schema name always forms a
      // single valid identifier (same quoting rule as buildCreateTableSQL).
      const schemaIdent = `"${config.schema.replace(/"/g, '""')}"`
      await pool.query(sql`CREATE SCHEMA IF NOT EXISTS ${schemaIdent}`)
      // DSQL requires sequential DDL — concurrent CREATE TABLE causes OC001 conflicts
      for (const cs of catalog.streams) {
        await pool.query(buildCreateTableSQL(config.schema, cs.stream.name))
      }
    } finally {
      await pool.end()
    }
  },

  /**
   * Drop the destination schema and everything in it.
   * Refuses to drop Postgres system schemas or `public`.
   */
  async *teardown({ config }) {
    const PROTECTED = new Set(['public', 'information_schema', 'pg_catalog', 'pg_toast'])
    if (PROTECTED.has(config.schema)) {
      throw new Error(`Refusing to drop protected schema "${config.schema}"`)
    }
    const pool = createPool(await buildPoolConfig(config))
    try {
      // Same identifier quoting as setup/buildCreateTableSQL.
      const schemaIdent = `"${config.schema.replace(/"/g, '""')}"`
      await pool.query(sql`DROP SCHEMA IF EXISTS ${schemaIdent} CASCADE`)
    } finally {
      await pool.end()
    }
  },

  /**
   * Consume records from the source, batching per stream and upserting when a
   * batch fills or a state checkpoint arrives.
   *
   * Checkpoint contract: a source_state message is only re-emitted after every
   * record preceding it has been flushed — per-stream state flushes that
   * stream, global state flushes ALL streams — so a persisted checkpoint never
   * claims progress still sitting in a buffer.
   */
  async *write({ config }, $stdin) {
    const pool = createPool(await buildPoolConfig(config))
    const batchSize = config.batch_size
    // eslint-disable-next-line @typescript-eslint/no-explicit-any
    const streamBuffers = new Map<string, Record<string, any>[]>()

    // Upsert and clear one stream's buffer (no-op when empty).
    const flushStream = async (streamName: string) => {
      const buffer = streamBuffers.get(streamName)
      if (!buffer || buffer.length === 0) return
      await upsertMany(pool, config.schema, streamName, buffer)
      streamBuffers.set(streamName, [])
    }

    const flushAll = async () => {
      for (const streamName of streamBuffers.keys()) {
        await flushStream(streamName)
      }
    }

    let failed = false
    try {
      for await (const msg of $stdin as AsyncIterable<DestinationInput>) {
        if (msg.type === 'record') {
          const { stream, data } = msg.record
          if (!streamBuffers.has(stream)) streamBuffers.set(stream, [])
          const buffer = streamBuffers.get(stream)!
          buffer.push(data as Record<string, unknown>)
          if (buffer.length >= batchSize) await flushStream(stream)
        } else if (msg.type === 'source_state') {
          // Global state covers all streams, so everything buffered must land
          // before the checkpoint is forwarded.
          if (msg.source_state.state_type === 'global') await flushAll()
          else await flushStream(msg.source_state.stream)
          yield msg
        }
      }
      await flushAll()
    } catch (err: unknown) {
      failed = true
      try {
        await flushAll()
      } catch {
        // ignore flush errors during error handling
      }
      yield {
        type: 'trace' as const,
        trace: {
          trace_type: 'error' as const,
          error: {
            failure_type: isTransient(err)
              ? ('transient_error' as const)
              : ('system_error' as const),
            message: err instanceof Error ? err.message : String(err),
            stack_trace: err instanceof Error ? err.stack : undefined,
          },
        },
      }
    } finally {
      await pool.end()
    }

    // Only report success when the input stream completed without error;
    // otherwise an error trace above is the final word.
    if (!failed) {
      yield logMsg(`DSQL destination: wrote to schema "${config.schema}"`)
    }
  },
} satisfies Destination<Config>

export default destination
Loading
Loading