From 93f81b34e87cd827b4c071a66c9d7559c2ed9c46 Mon Sep 17 00:00:00 2001 From: Tony Xiao Date: Wed, 6 May 2026 15:45:19 -0700 Subject: [PATCH 1/5] feat(destination-postgres): add PGlite support Refactor destination-postgres to support both pg.Pool and PGlite as backends. PGlite enables in-process WASM Postgres for testing without Docker and lightweight embedded deployments. - Add QueryClient/ManagedClient abstractions (src/client.ts) - Add `pglite` config option (true for in-memory, {data_dir} for persistent) - Refactor writeMany/upsertMany/deleteMany to accept QueryClient interface - Add @electric-sql/pglite as optional peer dependency - Add PGlite-specific test suite (no Docker required) Co-Authored-By: Claude Opus 4.6 (1M context) Committed-By-Agent: claude --- packages/destination-postgres/package.json | 7 +- packages/destination-postgres/src/client.ts | 64 +++++ packages/destination-postgres/src/index.ts | 156 +++++------ .../destination-postgres/src/pglite.test.ts | 244 ++++++++++++++++++ packages/destination-postgres/src/spec.ts | 13 + pnpm-lock.yaml | 13 +- 6 files changed, 401 insertions(+), 96 deletions(-) create mode 100644 packages/destination-postgres/src/client.ts create mode 100644 packages/destination-postgres/src/pglite.test.ts diff --git a/packages/destination-postgres/package.json b/packages/destination-postgres/package.json index e4484fe2b..13d317bcc 100644 --- a/packages/destination-postgres/package.json +++ b/packages/destination-postgres/package.json @@ -30,7 +30,8 @@ }, "peerDependencies": { "@aws-sdk/client-sts": "^3", - "@aws-sdk/rds-signer": "^3" + "@aws-sdk/rds-signer": "^3", + "@electric-sql/pglite": "^0.2" }, "peerDependenciesMeta": { "@aws-sdk/client-sts": { @@ -38,11 +39,15 @@ }, "@aws-sdk/rds-signer": { "optional": true + }, + "@electric-sql/pglite": { + "optional": true } }, "devDependencies": { "@aws-sdk/client-sts": "^3.1013.0", "@aws-sdk/rds-signer": "^3.1013.0", + "@electric-sql/pglite": "^0.2.17", "@types/pg": "^8.15.5", "vitest": "^3.2.4" } diff --git a/packages/destination-postgres/src/client.ts b/packages/destination-postgres/src/client.ts new file mode 100644 index 000000000..26449a9d2 --- /dev/null +++ b/packages/destination-postgres/src/client.ts @@ -0,0 +1,64 @@ +import type pg from 'pg' +import type { Logger } from '@stripe/sync-logger' +import { log } from './logger.js' + +export interface QueryClient { + query(text: string, values?: unknown[]): Promise +} + +export interface ManagedClient extends QueryClient { + close(): Promise + stats?(): { total_count: number; idle_count: number; waiting_count: number } +} + +export function pgPoolClient(pool: pg.Pool, logger: Logger = log): ManagedClient { + pool.on('error', (err) => { + logger.error({ err }, 'Postgres destination pool error') + }) + + return { + query(text: string, values?: unknown[]) { + return pool.query(text, values) + }, + async close() { + await pool.end() + }, + stats() { + return { + total_count: pool.totalCount, + idle_count: pool.idleCount, + waiting_count: pool.waitingCount, + } + }, + } +} + +export async function pgliteClient( + config: { data_dir?: string } = {} +): Promise { + const { PGlite } = await import('@electric-sql/pglite') + const db = await PGlite.create(config.data_dir) + + return { + async query(text: string, values?: unknown[]) { + const result = await db.query(text, values) + return { + rows: result.rows as Record[], + rowCount: result.affectedRows ?? null, + command: '', + oid: 0, + fields: result.fields?.map((f) => ({ + ...f, + tableID: 0, + columnID: 0, + dataTypeSize: 0, + dataTypeModifier: 0, + format: 'text' as const, + })) ?? [], + } as pg.QueryResult + }, + async close() { + await db.close() + }, + } +} diff --git a/packages/destination-postgres/src/index.ts b/packages/destination-postgres/src/index.ts index 89113c07f..db3bd555c 100644 --- a/packages/destination-postgres/src/index.ts +++ b/packages/destination-postgres/src/index.ts @@ -20,10 +20,14 @@ import { import defaultSpec from './spec.js' import { log } from './logger.js' import type { Config } from './spec.js' +import { pgPoolClient, pgliteClient } from './client.js' +import type { QueryClient, ManagedClient } from './client.js' // MARK: - Spec export { configSchema, type Config } from './spec.js' +export { pgPoolClient, pgliteClient } from './client.js' +export type { QueryClient, ManagedClient } from './client.js' export async function buildPoolConfig(config: Config): Promise { if (config.aws) { @@ -80,7 +84,7 @@ export interface WriteManyResult extends UpsertManyResult, DeleteManyResult {} * cleaned up — no production user is on the soft-delete code path. */ export async function writeMany( - pool: pg.Pool, + client: QueryClient, schema: string, table: string, // eslint-disable-next-line @typescript-eslint/no-explicit-any @@ -91,8 +95,8 @@ export async function writeMany( const tombstones = entries.filter((e) => e.recordDeleted === true).map((r) => r.data) const liveRecords = entries.filter((e) => e.recordDeleted !== true).map((r) => r.data) - const u = await upsertMany(pool, schema, table, liveRecords, primaryKeyColumns, newerThanField) - const d = await deleteMany(pool, schema, table, tombstones, primaryKeyColumns) + const u = await upsertMany(client, schema, table, liveRecords, primaryKeyColumns, newerThanField) + const d = await deleteMany(client, schema, table, tombstones, primaryKeyColumns) return { ...u, deleted_count: d.deleted_count } } @@ -102,7 +106,7 @@ export async function writeMany( * `_synced_at` is the destination write time. */ export async function upsertMany( - pool: pg.Pool, + client: QueryClient, schema: string, table: string, // eslint-disable-next-line @typescript-eslint/no-explicit-any @@ -128,7 +132,7 @@ export async function upsertMany( return { _raw_data: e, _synced_at: syncedAt, _updated_at: new Date(ts * 1000).toISOString() } }) - return await upsertWithStats(pool, records, { + return await upsertWithStats(client, records, { schema, table, primaryKeyColumns, @@ -141,7 +145,7 @@ export async function upsertMany( * terminal — once an object is deleted it cannot be undeleted. */ export async function deleteMany( - pool: pg.Pool, + client: QueryClient, schema: string, table: string, // eslint-disable-next-line @typescript-eslint/no-explicit-any @@ -165,7 +169,7 @@ export async function deleteMany( USING (VALUES ${valueRows.join(', ')}) AS d(${identList(primaryKeyColumns)}) WHERE ${pkJoin}` - const result = await pool.query(stmt, params) + const result = await client.query(stmt, params) return { deleted_count: result.rowCount ?? 0 } } @@ -187,7 +191,7 @@ export { /** Throw if any stream's catalog enum allow-list disagrees with an existing CHECK constraint. */ async function assertEnumConstraintsConsistent( - pool: pg.Pool, + client: QueryClient, schema: string, streams: ReadonlyArray<{ stream: { name: string; json_schema?: Record } }> ): Promise { @@ -203,7 +207,7 @@ async function assertEnumConstraintsConsistent( if (enumColumns.size === 0) return const existing = await getExistingEnumAllowLists( - pool, + client, schema, streams.map((s) => s.stream.name), [...enumColumns] @@ -236,23 +240,6 @@ function errorMessage(err: unknown): string { return (err as NodeJS.ErrnoException).code ?? err.constructor.name } -function createPool(config: PoolConfig): pg.Pool { - const pool = new pg.Pool(config) - // Destination connectors should surface pool failures without crashing the host process. - pool.on('error', (err) => { - log.error({ err }, 'Postgres destination pool error') - }) - return pool -} - -function poolStats(pool: pg.Pool) { - return { - total_count: pool.totalCount, - idle_count: pool.idleCount, - waiting_count: pool.waitingCount, - } -} - function describePoolConfig(config: PoolConfig) { return { host: config.host, @@ -269,7 +256,19 @@ function describePoolConfig(config: PoolConfig) { } } -async function createInstrumentedPool(config: Config, operation: string): Promise { +async function createManagedClient(config: Config, operation: string): Promise { + if (config.pglite) { + const dataDir = config.pglite === true ? undefined : config.pglite.data_dir + log.debug({ operation, data_dir: dataDir }, 'dest postgres: creating PGlite client') + const startedAt = Date.now() + const client = await pgliteClient({ data_dir: dataDir }) + log.debug( + { operation, duration_ms: Date.now() - startedAt }, + 'dest postgres: PGlite client ready' + ) + return client + } + const configStartedAt = Date.now() log.debug({ operation }, 'dest postgres: building pool config') const poolConfig = await buildPoolConfig(config) @@ -282,35 +281,27 @@ async function createInstrumentedPool(config: Config, operation: string): Promis 'dest postgres: built pool config' ) - const pool = withQueryLogging(createPool(poolConfig), log) - log.debug({ operation, ...poolStats(pool) }, 'dest postgres: pool created') - return pool + const pool = withQueryLogging(new pg.Pool(poolConfig), log) + const client = pgPoolClient(pool, log) + log.debug({ operation, ...client.stats?.() }, 'dest postgres: pool created') + return client } -async function connectAndRelease(pool: pg.Pool, operation: string): Promise { +async function verifyConnectivity(client: ManagedClient, operation: string): Promise { const startedAt = Date.now() - log.debug({ operation, ...poolStats(pool) }, 'dest postgres: pool.connect start') - const client = await pool.connect() - try { - log.debug( - { - operation, - duration_ms: Date.now() - startedAt, - ...poolStats(pool), - }, - 'dest postgres: pool.connect complete' - ) - } finally { - client.release() - log.debug({ operation, ...poolStats(pool) }, 'dest postgres: pool.connect released') - } + log.debug({ operation, ...client.stats?.() }, 'dest postgres: connectivity check start') + await client.query('SELECT 1') + log.debug( + { operation, duration_ms: Date.now() - startedAt, ...client.stats?.() }, + 'dest postgres: connectivity check complete' + ) } -async function endPool(pool: pg.Pool, operation: string): Promise { +async function closeClient(client: ManagedClient, operation: string): Promise { const startedAt = Date.now() - log.debug({ operation, ...poolStats(pool) }, 'dest postgres: pool.end start') - await pool.end() - log.debug({ operation, duration_ms: Date.now() - startedAt }, 'dest postgres: pool.end complete') + log.debug({ operation, ...client.stats?.() }, 'dest postgres: closing client') + await client.close() + log.debug({ operation, duration_ms: Date.now() - startedAt }, 'dest postgres: client closed') } const destination = { @@ -319,10 +310,9 @@ const destination = { }, async *check({ config }) { - const pool = await createInstrumentedPool(config, 'check') + const client = await createManagedClient(config, 'check') try { - await connectAndRelease(pool, 'check') - await pool.query('SELECT 1') + await verifyConnectivity(client, 'check') yield { type: 'connection_status' as const, connection_status: { status: 'succeeded' as const }, @@ -336,40 +326,35 @@ const destination = { }, } } finally { - await endPool(pool, 'check') + await closeClient(client, 'check') } }, async *setup({ config, catalog }) { - log.debug({ schema: config.schema }, 'dest setup: connecting to pool') - const pool = await createInstrumentedPool(config, 'setup') + log.debug({ schema: config.schema }, 'dest setup: creating client') + const client = await createManagedClient(config, 'setup') try { - await connectAndRelease(pool, 'setup') + await verifyConnectivity(client, 'setup') log.info(`Creating schema "${config.schema}" (${catalog.streams.length} streams)`) log.debug('dest setup: creating schema') - await pool.query(sql`CREATE SCHEMA IF NOT EXISTS "${config.schema}"`) - // Backward-compat: drop legacy `set_updated_at()` (CASCADE removes any orphan `handle_updated_at` triggers from older deployments). + await client.query(sql`CREATE SCHEMA IF NOT EXISTS "${config.schema}"`) log.debug('dest setup: dropping legacy set_updated_at() function') - await pool.query(sql`DROP FUNCTION IF EXISTS "${config.schema}".set_updated_at() CASCADE`) + await client.query(sql`DROP FUNCTION IF EXISTS "${config.schema}".set_updated_at() CASCADE`) - // The DO $check$ block uses ADD CONSTRAINT + EXCEPTION WHEN duplicate_object, - // which silently no-ops on a changed enum list — surface it loudly instead. - await assertEnumConstraintsConsistent(pool, config.schema, catalog.streams) + await assertEnumConstraintsConsistent(client, config.schema, catalog.streams) log.debug({ streamCount: catalog.streams.length }, 'dest setup: creating tables') - await Promise.all( - catalog.streams.map(async (cs) => { - await pool.query( - buildCreateTableDDL(config.schema, cs.stream.name, cs.stream.json_schema ?? {}, { - system_columns: cs.system_columns, - primary_key: cs.stream.primary_key, - }) - ) - }) - ) + for (const cs of catalog.streams) { + await client.query( + buildCreateTableDDL(config.schema, cs.stream.name, cs.stream.json_schema ?? {}, { + system_columns: cs.system_columns, + primary_key: cs.stream.primary_key, + }) + ) + } log.debug('dest setup: complete') } finally { - await endPool(pool, 'setup') + await closeClient(client, 'setup') } }, @@ -380,17 +365,17 @@ const destination = { `Refusing to drop protected schema "${config.schema}" — teardown only drops user-created schemas` ) } - const pool = await createInstrumentedPool(config, 'teardown') + const client = await createManagedClient(config, 'teardown') try { - await connectAndRelease(pool, 'teardown') - await pool.query(sql`DROP SCHEMA IF EXISTS "${config.schema}" CASCADE`) + await verifyConnectivity(client, 'teardown') + await client.query(sql`DROP SCHEMA IF EXISTS "${config.schema}" CASCADE`) } finally { - await endPool(pool, 'teardown') + await closeClient(client, 'teardown') } }, async *write({ config, catalog }, $stdin) { - const pool = await createInstrumentedPool(config, 'write') + const client = await createManagedClient(config, 'write') const batchSize = config.batch_size // eslint-disable-next-line @typescript-eslint/no-explicit-any const streamBuffers = new Map[]>() @@ -406,7 +391,6 @@ const destination = { const failedStreams = new Set() - /** Flush and return error message if failed, undefined if ok. */ const flushStream = async (streamName: string): Promise => { if (failedStreams.has(streamName)) return undefined const buffer = streamBuffers.get(streamName) @@ -421,12 +405,12 @@ const destination = { schema: config.schema, primary_key: pk, newer_than_field: newerThan, - ...poolStats(pool), + ...client.stats?.(), }, 'dest write: flush start' ) try { - const stats = await writeMany(pool, config.schema, streamName, buffer, pk, newerThan) + const stats = await writeMany(client, config.schema, streamName, buffer, pk, newerThan) log.debug( { stream: streamName, @@ -438,7 +422,7 @@ const destination = { deleted: stats.deleted_count, skipped: stats.skipped_count, duration_ms: Date.now() - startedAt, - ...poolStats(pool), + ...client.stats?.(), }, `dest write: upsert ${config.schema}.${streamName}` ) @@ -455,7 +439,7 @@ const destination = { duration_ms: Date.now() - startedAt, error: errMsg, err, - ...poolStats(pool), + ...client.stats?.(), }, 'dest write: flush failed' ) @@ -475,7 +459,7 @@ const destination = { } try { - await connectAndRelease(pool, 'write') + await verifyConnectivity(client, 'write') for await (const msg of $stdin) { if (msg.type === 'record') { const { stream } = msg.record @@ -548,7 +532,7 @@ const destination = { log.debug(`Postgres destination: wrote to schema "${config.schema}"`) } } finally { - await endPool(pool, 'write') + await closeClient(client, 'write') } }, } satisfies Destination diff --git a/packages/destination-postgres/src/pglite.test.ts b/packages/destination-postgres/src/pglite.test.ts new file mode 100644 index 000000000..063ee16d1 --- /dev/null +++ b/packages/destination-postgres/src/pglite.test.ts @@ -0,0 +1,244 @@ +import { mkdtempSync, rmSync } from 'fs' +import { tmpdir } from 'os' +import { join } from 'path' +import { afterAll, afterEach, beforeAll, beforeEach, describe, expect, it } from 'vitest' +import destination, { deleteMany, upsertMany, writeMany, pgliteClient } from './index.js' +import type { ManagedClient } from './client.js' +import type { ConfiguredCatalog, DestinationInput, DestinationOutput } from '@stripe/sync-protocol' +import { collectFirst, drain } from '@stripe/sync-protocol' +import type { Config } from './spec.js' + +const SCHEMA = 'test_dest' +let dataDir: string + +function makeConfig(): Config { + return { pglite: { data_dir: dataDir }, schema: SCHEMA, batch_size: 100 } +} + +beforeAll(() => { + dataDir = mkdtempSync(join(tmpdir(), 'pglite-test-')) +}) + +afterAll(() => { + rmSync(dataDir, { recursive: true, force: true }) +}) + +let nextRecordTs = Math.floor(Date.now() / 1000) +function makeRecord(stream: string, data: Record) { + return { + type: 'record' as const, + record: { + stream, + data: { _updated_at: nextRecordTs++, ...data }, + emitted_at: new Date().toISOString(), + }, + } +} + +function makeState(stream: string, data: unknown) { + return { type: 'source_state' as const, source_state: { stream, data } } +} + +async function* toAsyncIter(msgs: DestinationInput[]): AsyncIterable { + for (const msg of msgs) yield msg +} + +async function collectOutputs(iter: AsyncIterable): Promise { + const results: DestinationOutput[] = [] + for await (const msg of iter) results.push(msg) + return results +} + +async function resetSchema() { + const c = await pgliteClient({ data_dir: dataDir }) + await c.query(`DROP SCHEMA IF EXISTS "${SCHEMA}" CASCADE`) + await c.close() +} + +const catalog: ConfiguredCatalog = { + streams: [ + { + stream: { + name: 'customer', + primary_key: [['id']], + newer_than_field: '_updated_at', + metadata: {}, + }, + sync_mode: 'full_refresh', + destination_sync_mode: 'overwrite', + }, + ], +} + +describe('PGlite destination', () => { + beforeEach(async () => { + await resetSchema() + }) + + describe('check()', () => { + it('succeeds with pglite config', async () => { + const statusMsg = await collectFirst( + destination.check({ config: makeConfig() }), + 'connection_status' + ) + expect(statusMsg.connection_status.status).toBe('succeeded') + }) + }) + + describe('setup()', () => { + it('creates schema and table', async () => { + await drain(destination.setup!({ config: makeConfig(), catalog })) + + const c = await pgliteClient({ data_dir: dataDir }) + try { + const { rows } = await c.query( + `SELECT table_name FROM information_schema.tables WHERE table_schema = $1`, + [SCHEMA] + ) + expect(rows.map((r) => r.table_name)).toContain('customer') + } finally { + await c.close() + } + }) + }) + + describe('write()', () => { + beforeEach(async () => { + await drain(destination.setup!({ config: makeConfig(), catalog })) + }) + + it('upserts records via PGlite', async () => { + const messages = toAsyncIter([ + makeRecord('customer', { id: 'cus_1', name: 'Alice' }), + makeRecord('customer', { id: 'cus_2', name: 'Bob' }), + ]) + + const outputs = await collectOutputs(destination.write({ config: makeConfig(), catalog }, messages)) + const records = outputs.filter((m) => m.type === 'record') + expect(records).toHaveLength(2) + }) + + it('re-emits SourceStateMessage after flushing', async () => { + const stateData = { cursor: 'abc123' } + const messages = toAsyncIter([ + makeRecord('customer', { id: 'cus_1', name: 'Alice' }), + makeState('customer', stateData), + ]) + + const outputs = await collectOutputs(destination.write({ config: makeConfig(), catalog }, messages)) + const stateOutputs = outputs.filter((m) => m.type === 'source_state') + expect(stateOutputs).toHaveLength(1) + expect(stateOutputs[0]).toEqual({ + type: 'source_state', + source_state: { stream: 'customer', data: stateData }, + }) + }) + + it('handles upsert (ON CONFLICT update)', async () => { + const messages1 = toAsyncIter([makeRecord('customer', { id: 'cus_1', name: 'Alice' })]) + await collectOutputs(destination.write({ config: makeConfig(), catalog }, messages1)) + + const messages2 = toAsyncIter([makeRecord('customer', { id: 'cus_1', name: 'Alice Updated' })]) + await collectOutputs(destination.write({ config: makeConfig(), catalog }, messages2)) + + const c = await pgliteClient({ data_dir: dataDir }) + try { + const { rows } = await c.query( + `SELECT _raw_data->>'name' AS name FROM "${SCHEMA}".customer WHERE id = 'cus_1'` + ) + expect(rows[0].name).toBe('Alice Updated') + } finally { + await c.close() + } + }) + }) +}) + +describe('PGlite upsertMany / deleteMany / writeMany', () => { + let client: ManagedClient + + beforeEach(async () => { + client = await pgliteClient({ data_dir: dataDir }) + await client.query(`DROP SCHEMA IF EXISTS "${SCHEMA}" CASCADE`) + await client.query(`CREATE SCHEMA IF NOT EXISTS "${SCHEMA}"`) + await client.query(`CREATE TABLE IF NOT EXISTS "${SCHEMA}".customer ( + "_raw_data" jsonb NOT NULL, + "_synced_at" timestamptz NOT NULL DEFAULT now(), + "_updated_at" timestamptz NOT NULL DEFAULT now(), + "id" text GENERATED ALWAYS AS ((_raw_data->>'id')::text) STORED, + PRIMARY KEY ("id") + )`) + }) + + afterEach(async () => { + await client?.close() + }) + + it('upsertMany inserts records', async () => { + const ts = Math.floor(Date.now() / 1000) + await upsertMany( + client, + SCHEMA, + 'customer', + [ + { id: 'cus_10', name: 'Direct', _updated_at: ts }, + { id: 'cus_11', name: 'Insert', _updated_at: ts }, + ], + ['id'], + '_updated_at' + ) + + const { rows } = await client.query(`SELECT count(*)::int AS n FROM "${SCHEMA}".customer`) + expect(rows[0].n).toBe(2) + }) + + it('deleteMany removes rows', async () => { + const ts = Math.floor(Date.now() / 1000) + await upsertMany( + client, + SCHEMA, + 'customer', + [ + { id: 'cus_keep', name: 'Keep', _updated_at: ts }, + { id: 'cus_drop', name: 'Drop', _updated_at: ts }, + ], + ['id'], + '_updated_at' + ) + + const result = await deleteMany(client, SCHEMA, 'customer', [{ id: 'cus_drop' }], ['id']) + expect(result.deleted_count).toBe(1) + + const { rows } = await client.query(`SELECT id FROM "${SCHEMA}".customer ORDER BY id`) + expect(rows).toEqual([{ id: 'cus_keep' }]) + }) + + it('writeMany routes mixed batch to upsert and delete', async () => { + const ts = Math.floor(Date.now() / 1000) + await upsertMany( + client, + SCHEMA, + 'customer', + [{ id: 'cus_old', name: 'Old', _updated_at: ts }], + ['id'], + '_updated_at' + ) + + const result = await writeMany( + client, + SCHEMA, + 'customer', + [ + { data: { id: 'cus_new', name: 'New', _updated_at: ts + 1 } }, + { recordDeleted: true, data: { id: 'cus_old', _updated_at: ts + 1 } }, + ], + ['id'], + '_updated_at' + ) + expect(result.created_count).toBe(1) + expect(result.deleted_count).toBe(1) + + const { rows } = await client.query(`SELECT id FROM "${SCHEMA}".customer ORDER BY id`) + expect(rows).toEqual([{ id: 'cus_new' }]) + }) +}) diff --git a/packages/destination-postgres/src/spec.ts b/packages/destination-postgres/src/spec.ts index f7b67b869..4a20dfb4b 100644 --- a/packages/destination-postgres/src/spec.ts +++ b/packages/destination-postgres/src/spec.ts @@ -19,6 +19,15 @@ export const configSchema = z }) .optional() .describe('AWS RDS IAM authentication config'), + pglite: z + .union([ + z.literal(true), + z.object({ + data_dir: z.string().optional().describe('Directory for persistent storage (omit for in-memory)'), + }), + ]) + .optional() + .describe('Use PGlite (in-process WASM Postgres) instead of connecting to an external server'), ssl_ca_pem: z .string() .optional() @@ -30,6 +39,10 @@ export const configSchema = z message: 'Specify either url/connection_string or aws config, not both', path: ['aws'], }) + .refine((config) => !(config.pglite && (config.url || config.connection_string || config.aws)), { + message: 'Specify pglite OR url/connection_string/aws, not both', + path: ['pglite'], + }) export type Config = z.infer diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 537f9f18e..11ccbe892 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -534,6 +534,9 @@ importers: '@aws-sdk/rds-signer': specifier: ^3.1013.0 version: 3.1013.0 + '@electric-sql/pglite': + specifier: ^0.2.17 + version: 0.2.17 '@types/pg': specifier: ^8.15.5 version: 8.15.6 @@ -7321,14 +7324,6 @@ snapshots: chai: 5.2.0 tinyrainbow: 2.0.0 - '@vitest/mocker@3.2.4(vite@7.2.2(@types/node@24.10.1)(jiti@2.6.1)(lightningcss@1.32.0)(terser@5.46.1)(tsx@4.21.0)(yaml@2.8.1))': - dependencies: - '@vitest/spy': 3.2.4 - estree-walker: 3.0.3 - magic-string: 0.30.21 - optionalDependencies: - vite: 7.2.2(@types/node@24.10.1)(jiti@2.6.1)(lightningcss@1.32.0)(terser@5.46.1)(tsx@4.21.0)(yaml@2.8.1) - '@vitest/mocker@3.2.4(vite@7.2.2(@types/node@25.5.0)(jiti@2.6.1)(lightningcss@1.32.0)(terser@5.46.1)(tsx@4.21.0)(yaml@2.8.1))': dependencies: '@vitest/spy': 3.2.4 @@ -9391,7 +9386,7 @@ snapshots: dependencies: '@types/chai': 5.2.2 '@vitest/expect': 3.2.4 - '@vitest/mocker': 3.2.4(vite@7.2.2(@types/node@24.10.1)(jiti@2.6.1)(lightningcss@1.32.0)(terser@5.46.1)(tsx@4.21.0)(yaml@2.8.1)) + '@vitest/mocker': 3.2.4(vite@7.2.2(@types/node@25.5.0)(jiti@2.6.1)(lightningcss@1.32.0)(terser@5.46.1)(tsx@4.21.0)(yaml@2.8.1)) '@vitest/pretty-format': 3.2.4 '@vitest/runner': 3.2.4 '@vitest/snapshot': 3.2.4 From 84b29ec95b514cf25ce307b268ddc0d7e761be25 Mon Sep 17 00:00:00 2001 From: Tony Xiao Date: Wed, 6 May 2026 15:45:40 -0700 Subject: [PATCH 2/5] chore: regenerate OpenAPI specs for pglite config Co-Authored-By: Claude Opus 4.6 (1M context) Committed-By-Agent: claude --- apps/engine/src/__generated__/openapi.d.ts | 4 ++++ apps/engine/src/__generated__/openapi.json | 18 ++++++++++++++++++ apps/service/src/__generated__/openapi.d.ts | 4 ++++ apps/service/src/__generated__/openapi.json | 18 ++++++++++++++++++ 4 files changed, 44 insertions(+) diff --git a/apps/engine/src/__generated__/openapi.d.ts b/apps/engine/src/__generated__/openapi.d.ts index d11dfe064..916988041 100644 --- a/apps/engine/src/__generated__/openapi.d.ts +++ b/apps/engine/src/__generated__/openapi.d.ts @@ -372,6 +372,10 @@ export interface components { /** @description External ID for STS AssumeRole */ external_id?: string; }; + pglite?: true | { + /** @description Directory for persistent storage (omit for in-memory) */ + data_dir?: string; + }; /** @description PEM-encoded CA certificate for SSL verification (required for verify-ca / verify-full with a private CA) */ ssl_ca_pem?: string; }; diff --git a/apps/engine/src/__generated__/openapi.json b/apps/engine/src/__generated__/openapi.json index ace9d0e6b..faecca843 100644 --- a/apps/engine/src/__generated__/openapi.json +++ b/apps/engine/src/__generated__/openapi.json @@ -1209,6 +1209,24 @@ "additionalProperties": false, "description": "AWS RDS IAM authentication config" }, + "pglite": { + "anyOf": [ + { + "type": "boolean", + "const": true + }, + { + "type": "object", + "properties": { + "data_dir": { + "type": "string", + "description": "Directory for persistent storage (omit for in-memory)" + } + }, + "additionalProperties": false + } + ] + }, "ssl_ca_pem": { "type": "string", "description": "PEM-encoded CA certificate for SSL verification (required for verify-ca / verify-full with a private CA)" diff --git a/apps/service/src/__generated__/openapi.d.ts b/apps/service/src/__generated__/openapi.d.ts index d9a416097..468478c84 100644 --- a/apps/service/src/__generated__/openapi.d.ts +++ b/apps/service/src/__generated__/openapi.d.ts @@ -307,6 +307,10 @@ export interface components { /** @description External ID for STS AssumeRole */ external_id?: string; }; + pglite?: true | { + /** @description Directory for persistent storage (omit for in-memory) */ + data_dir?: string; + }; /** @description PEM-encoded CA certificate for SSL verification (required for verify-ca / verify-full with a private CA) */ ssl_ca_pem?: string; }; diff --git a/apps/service/src/__generated__/openapi.json b/apps/service/src/__generated__/openapi.json index 377ec6ed7..014287505 100644 --- a/apps/service/src/__generated__/openapi.json +++ b/apps/service/src/__generated__/openapi.json @@ -1420,6 +1420,24 @@ "additionalProperties": false, "description": "AWS RDS IAM authentication config" }, + "pglite": { + "anyOf": [ + { + "type": "boolean", + "const": true + }, + { + "type": "object", + "properties": { + "data_dir": { + "type": "string", + "description": "Directory for persistent storage (omit for in-memory)" + } + }, + "additionalProperties": false + } + ] + }, "ssl_ca_pem": { "type": "string", "description": "PEM-encoded CA certificate for SSL verification (required for verify-ca / verify-full with a private CA)" From 049f037f0be5f5b66d128373c1861e681d58876a Mon Sep 17 00:00:00 2001 From: Tony Xiao Date: Wed, 6 May 2026 16:08:58 -0700 Subject: [PATCH 3/5] feat: add allow_experimental_pglite gate, file:///memory:// URL support, upgrade to pglite 0.4.5 - Add `allow_experimental_pglite` boolean config gate - Detect file:// and memory:// URL schemes and route to PGlite - Upgrade @electric-sql/pglite from 0.2.17 to 0.4.5 Co-Authored-By: Claude Opus 4.6 (1M context) Committed-By-Agent: claude --- packages/destination-postgres/package.json | 4 ++-- packages/destination-postgres/src/client.ts | 10 ++++++++-- packages/destination-postgres/src/index.ts | 14 ++++++++------ .../destination-postgres/src/pglite.test.ts | 2 +- packages/destination-postgres/src/spec.ts | 17 +++++++++++++++++ pnpm-lock.yaml | 9 +++++++-- 6 files changed, 43 insertions(+), 13 deletions(-) diff --git a/packages/destination-postgres/package.json b/packages/destination-postgres/package.json index 13d317bcc..39b0ca544 100644 --- a/packages/destination-postgres/package.json +++ b/packages/destination-postgres/package.json @@ -31,7 +31,7 @@ "peerDependencies": { "@aws-sdk/client-sts": "^3", "@aws-sdk/rds-signer": "^3", - "@electric-sql/pglite": "^0.2" + "@electric-sql/pglite": "^0.4.5" }, "peerDependenciesMeta": { "@aws-sdk/client-sts": { @@ -47,7 +47,7 @@ "devDependencies": { "@aws-sdk/client-sts": "^3.1013.0", "@aws-sdk/rds-signer": "^3.1013.0", - "@electric-sql/pglite": "^0.2.17", + "@electric-sql/pglite": "^0.4.5", "@types/pg": "^8.15.5", "vitest": "^3.2.4" } diff --git a/packages/destination-postgres/src/client.ts b/packages/destination-postgres/src/client.ts index 26449a9d2..50a537200 100644 --- a/packages/destination-postgres/src/client.ts +++ b/packages/destination-postgres/src/client.ts @@ -33,11 +33,17 @@ export function pgPoolClient(pool: pg.Pool, logger: Logger = log): ManagedClient } } +export function isPGliteUrl(url: string): boolean { + return url.startsWith('file://') || url.startsWith('memory://') +} + export async function pgliteClient( - config: { data_dir?: string } = {} + config: { data_dir?: string; url?: string } = {} ): Promise { const { PGlite } = await import('@electric-sql/pglite') - const db = await PGlite.create(config.data_dir) + + const dataSource = config.url ?? config.data_dir + const db = await PGlite.create(dataSource) return { async query(text: string, values?: unknown[]) { diff --git a/packages/destination-postgres/src/index.ts b/packages/destination-postgres/src/index.ts index db3bd555c..d048df0bc 100644 --- a/packages/destination-postgres/src/index.ts +++ b/packages/destination-postgres/src/index.ts @@ -20,13 +20,13 @@ import { import defaultSpec from './spec.js' import { log } from './logger.js' import type { Config } from './spec.js' -import { pgPoolClient, pgliteClient } from './client.js' +import { pgPoolClient, pgliteClient, isPGliteUrl } from './client.js' import type { QueryClient, ManagedClient } from './client.js' // MARK: - Spec export { configSchema, type Config } from './spec.js' -export { pgPoolClient, pgliteClient } from './client.js' +export { pgPoolClient, pgliteClient, isPGliteUrl } from './client.js' export type { QueryClient, ManagedClient } from './client.js' export async function buildPoolConfig(config: Config): Promise { @@ -257,11 +257,13 @@ function describePoolConfig(config: PoolConfig) { } async function createManagedClient(config: Config, operation: string): Promise { - if (config.pglite) { - const dataDir = config.pglite === true ? undefined : config.pglite.data_dir - log.debug({ operation, data_dir: dataDir }, 'dest postgres: creating PGlite client') + const connectionUrl = config.url ?? config.connection_string + if (config.pglite || (connectionUrl && isPGliteUrl(connectionUrl))) { + const url = connectionUrl && isPGliteUrl(connectionUrl) ? connectionUrl : undefined + const dataDir = config.pglite && config.pglite !== true ? config.pglite.data_dir : undefined + log.debug({ operation, url, data_dir: dataDir }, 'dest postgres: creating PGlite client') const startedAt = Date.now() - const client = await pgliteClient({ data_dir: dataDir }) + const client = await pgliteClient({ url, data_dir: dataDir }) log.debug( { operation, duration_ms: Date.now() - startedAt }, 'dest postgres: PGlite client ready' diff --git a/packages/destination-postgres/src/pglite.test.ts b/packages/destination-postgres/src/pglite.test.ts index 063ee16d1..11cfa2ed5 100644 --- a/packages/destination-postgres/src/pglite.test.ts +++ b/packages/destination-postgres/src/pglite.test.ts @@ -12,7 +12,7 @@ const SCHEMA = 'test_dest' let dataDir: string function makeConfig(): Config { - return { pglite: { data_dir: dataDir }, schema: SCHEMA, batch_size: 100 } + return { pglite: { data_dir: dataDir }, schema: SCHEMA, batch_size: 100, allow_experimental_pglite: true } } beforeAll(() => { diff --git a/packages/destination-postgres/src/spec.ts b/packages/destination-postgres/src/spec.ts index 4a20dfb4b..c52dd6b83 100644 --- a/packages/destination-postgres/src/spec.ts +++ b/packages/destination-postgres/src/spec.ts @@ -19,6 +19,10 @@ export const configSchema = z }) .optional() .describe('AWS RDS IAM authentication config'), + allow_experimental_pglite: z + .boolean() + .optional() + .describe('Enable experimental PGlite support (required to use pglite config or file:///memory:// URLs)'), pglite: z .union([ z.literal(true), @@ -43,6 +47,19 @@ export const configSchema = z message: 'Specify pglite OR url/connection_string/aws, not both', path: ['pglite'], }) + .refine( + (config) => { + if (config.pglite) return config.allow_experimental_pglite === true + const url = config.url ?? config.connection_string + if (url && (url.startsWith('file://') || url.startsWith('memory://'))) + return config.allow_experimental_pglite === true + return true + }, + { + message: 'Set allow_experimental_pglite: true to use PGlite or file:///memory:// URLs', + path: ['allow_experimental_pglite'], + } + ) export type Config = z.infer diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 11ccbe892..6b1d1f081 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -535,8 +535,8 @@ importers: specifier: ^3.1013.0 version: 3.1013.0 '@electric-sql/pglite': - specifier: ^0.2.17 - version: 0.2.17 + specifier: ^0.4.5 + version: 0.4.5 '@types/pg': specifier: ^8.15.5 version: 8.15.6 @@ -1046,6 +1046,9 @@ packages: '@electric-sql/pglite@0.2.17': resolution: {integrity: sha512-qEpKRT2oUaWDH6tjRxLHjdzMqRUGYDnGZlKrnL4dJ77JVMcP2Hpo3NYnOSPKdZdeec57B6QPprCUFg0picx5Pw==} + '@electric-sql/pglite@0.4.5': + resolution: {integrity: sha512-aGG2zGEyZzGWKy8P+9ZoNUV0jxt1+hgbeTf+bVAYyxVZZLXg3/9aFlfLxb08AYZVAfAkQlQIysmWjhc5hwDG8g==} + '@emnapi/runtime@1.9.1': resolution: {integrity: sha512-VYi5+ZVLhpgK4hQ0TAjiQiZ6ol0oe4mBx7mVv7IflsiEp0OWoVsp/+f9Vc1hOhE0TtkORVrI1GvzyreqpgWtkA==} @@ -5601,6 +5604,8 @@ snapshots: '@electric-sql/pglite@0.2.17': {} + '@electric-sql/pglite@0.4.5': {} + '@emnapi/runtime@1.9.1': dependencies: tslib: 2.8.1 From 712938cfc33342620c9764faab416b00d8032d47 Mon Sep 17 00:00:00 2001 From: Tony Xiao Date: Wed, 6 May 2026 16:10:15 -0700 Subject: [PATCH 4/5] fix: handle multi-statement DDL in PGlite adapter PGlite's query() rejects multiple SQL statements in a single call. Fall back to exec() when the "multiple commands" error is detected. This fixes destination setup which uses buildCreateTableDDL (returns a DO block + standalone DO blocks). Co-Authored-By: Claude Opus 4.6 (1M context) Committed-By-Agent: claude --- packages/destination-postgres/src/client.ts | 45 ++++++++++++++------- 1 file changed, 30 insertions(+), 15 deletions(-) diff --git a/packages/destination-postgres/src/client.ts b/packages/destination-postgres/src/client.ts index 50a537200..85c302a8f 100644 --- a/packages/destination-postgres/src/client.ts +++ b/packages/destination-postgres/src/client.ts @@ -45,23 +45,38 @@ export async function pgliteClient( const dataSource = config.url ?? config.data_dir const db = await PGlite.create(dataSource) + function adaptResult(result: { rows: unknown[]; affectedRows?: number; fields?: { name: string; dataTypeID: number }[] }): pg.QueryResult { + return { + rows: result.rows as Record[], + rowCount: result.affectedRows ?? null, + command: '', + oid: 0, + fields: result.fields?.map((f) => ({ + ...f, + tableID: 0, + columnID: 0, + dataTypeSize: 0, + dataTypeModifier: 0, + format: 'text' as const, + })) ?? [], + } as pg.QueryResult + } + return { async query(text: string, values?: unknown[]) { - const result = await db.query(text, values) - return { - rows: result.rows as Record[], - rowCount: result.affectedRows ?? null, - command: '', - oid: 0, - fields: result.fields?.map((f) => ({ - ...f, - tableID: 0, - columnID: 0, - dataTypeSize: 0, - dataTypeModifier: 0, - format: 'text' as const, - })) ?? [], - } as pg.QueryResult + if (values && values.length > 0) { + return adaptResult(await db.query(text, values)) + } + // PGlite's query() rejects multiple statements; use exec() as fallback + try { + return adaptResult(await db.query(text)) + } catch (err) { + if (err instanceof Error && err.message.includes('multiple commands')) { + await db.exec(text) + return adaptResult({ rows: [], affectedRows: 0, fields: [] }) + } + throw err + } }, async close() { await db.close() From 65e3878ca880206290eb0154d747224e12f8ba4c Mon Sep 17 00:00:00 2001 From: Tony Xiao Date: Wed, 6 May 2026 16:10:35 -0700 Subject: [PATCH 5/5] chore: regenerate OpenAPI specs for allow_experimental_pglite Co-Authored-By: Claude Opus 4.6 (1M context) Committed-By-Agent: claude --- apps/engine/src/__generated__/openapi.d.ts | 2 ++ apps/engine/src/__generated__/openapi.json | 4 ++++ apps/service/src/__generated__/openapi.d.ts | 2 ++ apps/service/src/__generated__/openapi.json | 4 ++++ 4 files changed, 12 insertions(+) diff --git a/apps/engine/src/__generated__/openapi.d.ts b/apps/engine/src/__generated__/openapi.d.ts index 916988041..32387e79a 100644 --- a/apps/engine/src/__generated__/openapi.d.ts +++ b/apps/engine/src/__generated__/openapi.d.ts @@ -372,6 +372,8 @@ export interface components { /** @description External ID for STS AssumeRole */ external_id?: string; }; + /** @description Enable experimental PGlite support (required to use pglite config or file:///memory:// URLs) */ + allow_experimental_pglite?: boolean; pglite?: true | { /** @description Directory for persistent storage (omit for in-memory) */ data_dir?: string; diff --git a/apps/engine/src/__generated__/openapi.json b/apps/engine/src/__generated__/openapi.json index faecca843..1ca548335 100644 --- a/apps/engine/src/__generated__/openapi.json +++ b/apps/engine/src/__generated__/openapi.json @@ -1209,6 +1209,10 @@ "additionalProperties": false, "description": "AWS RDS IAM authentication config" }, + "allow_experimental_pglite": { + "type": "boolean", + "description": "Enable experimental PGlite support (required to use pglite config or file:///memory:// URLs)" + }, "pglite": { "anyOf": [ { diff --git a/apps/service/src/__generated__/openapi.d.ts b/apps/service/src/__generated__/openapi.d.ts index 468478c84..8f49a4188 100644 --- a/apps/service/src/__generated__/openapi.d.ts +++ b/apps/service/src/__generated__/openapi.d.ts @@ -307,6 +307,8 @@ export interface components { /** @description External ID for STS AssumeRole */ external_id?: string; }; + /** @description Enable experimental PGlite support (required to use pglite config or file:///memory:// URLs) */ + allow_experimental_pglite?: boolean; pglite?: true | { /** @description Directory for persistent storage (omit for in-memory) */ data_dir?: string; diff --git a/apps/service/src/__generated__/openapi.json b/apps/service/src/__generated__/openapi.json index 014287505..e4f4e1090 100644 --- a/apps/service/src/__generated__/openapi.json +++ b/apps/service/src/__generated__/openapi.json @@ -1420,6 +1420,10 @@ "additionalProperties": false, "description": "AWS RDS IAM authentication config" }, + "allow_experimental_pglite": { + "type": "boolean", + "description": "Enable experimental PGlite support (required to use pglite config or file:///memory:// URLs)" + }, "pglite": { "anyOf": [ {