diff --git a/apps/engine/src/__generated__/openapi.d.ts b/apps/engine/src/__generated__/openapi.d.ts index d11dfe064..32387e79a 100644 --- a/apps/engine/src/__generated__/openapi.d.ts +++ b/apps/engine/src/__generated__/openapi.d.ts @@ -372,6 +372,12 @@ export interface components { /** @description External ID for STS AssumeRole */ external_id?: string; }; + /** @description Enable experimental PGlite support (required to use pglite config or file:///memory:// URLs) */ + allow_experimental_pglite?: boolean; + pglite?: true | { + /** @description Directory for persistent storage (omit for in-memory) */ + data_dir?: string; + }; /** @description PEM-encoded CA certificate for SSL verification (required for verify-ca / verify-full with a private CA) */ ssl_ca_pem?: string; }; diff --git a/apps/engine/src/__generated__/openapi.json b/apps/engine/src/__generated__/openapi.json index ace9d0e6b..1ca548335 100644 --- a/apps/engine/src/__generated__/openapi.json +++ b/apps/engine/src/__generated__/openapi.json @@ -1209,6 +1209,28 @@ "additionalProperties": false, "description": "AWS RDS IAM authentication config" }, + "allow_experimental_pglite": { + "type": "boolean", + "description": "Enable experimental PGlite support (required to use pglite config or file:///memory:// URLs)" + }, + "pglite": { + "anyOf": [ + { + "type": "boolean", + "const": true + }, + { + "type": "object", + "properties": { + "data_dir": { + "type": "string", + "description": "Directory for persistent storage (omit for in-memory)" + } + }, + "additionalProperties": false + } + ] + }, "ssl_ca_pem": { "type": "string", "description": "PEM-encoded CA certificate for SSL verification (required for verify-ca / verify-full with a private CA)" diff --git a/apps/service/src/__generated__/openapi.d.ts b/apps/service/src/__generated__/openapi.d.ts index d9a416097..8f49a4188 100644 --- a/apps/service/src/__generated__/openapi.d.ts +++ b/apps/service/src/__generated__/openapi.d.ts @@ -307,6 +307,12 @@ export interface components { /** @description External ID for STS AssumeRole */ external_id?: string; }; + /** @description Enable experimental PGlite support (required to use pglite config or file:///memory:// URLs) */ + allow_experimental_pglite?: boolean; + pglite?: true | { + /** @description Directory for persistent storage (omit for in-memory) */ + data_dir?: string; + }; /** @description PEM-encoded CA certificate for SSL verification (required for verify-ca / verify-full with a private CA) */ ssl_ca_pem?: string; }; diff --git a/apps/service/src/__generated__/openapi.json b/apps/service/src/__generated__/openapi.json index 377ec6ed7..e4f4e1090 100644 --- a/apps/service/src/__generated__/openapi.json +++ b/apps/service/src/__generated__/openapi.json @@ -1420,6 +1420,28 @@ "additionalProperties": false, "description": "AWS RDS IAM authentication config" }, + "allow_experimental_pglite": { + "type": "boolean", + "description": "Enable experimental PGlite support (required to use pglite config or file:///memory:// URLs)" + }, + "pglite": { + "anyOf": [ + { + "type": "boolean", + "const": true + }, + { + "type": "object", + "properties": { + "data_dir": { + "type": "string", + "description": "Directory for persistent storage (omit for in-memory)" + } + }, + "additionalProperties": false + } + ] + }, "ssl_ca_pem": { "type": "string", "description": "PEM-encoded CA certificate for SSL verification (required for verify-ca / verify-full with a private CA)" diff --git a/packages/destination-postgres/package.json b/packages/destination-postgres/package.json index e4484fe2b..39b0ca544 100644 --- a/packages/destination-postgres/package.json +++ b/packages/destination-postgres/package.json @@ -30,7 +30,8 @@ }, "peerDependencies": { "@aws-sdk/client-sts": "^3", - "@aws-sdk/rds-signer": "^3" + "@aws-sdk/rds-signer": "^3", + "@electric-sql/pglite": "^0.4.5" }, "peerDependenciesMeta": { "@aws-sdk/client-sts": { @@ -38,11 +39,15 @@ }, "@aws-sdk/rds-signer": { "optional": true + }, + "@electric-sql/pglite": { + "optional": true } }, "devDependencies": { "@aws-sdk/client-sts": "^3.1013.0", "@aws-sdk/rds-signer": "^3.1013.0", + "@electric-sql/pglite": "^0.4.5", "@types/pg": "^8.15.5", "vitest": "^3.2.4" } diff --git a/packages/destination-postgres/src/client.ts b/packages/destination-postgres/src/client.ts new file mode 100644 index 000000000..85c302a8f --- /dev/null +++ b/packages/destination-postgres/src/client.ts @@ -0,0 +1,85 @@ +import type pg from 'pg' +import type { Logger } from '@stripe/sync-logger' +import { log } from './logger.js' + +export interface QueryClient { + query(text: string, values?: unknown[]): Promise +} + +export interface ManagedClient extends QueryClient { + close(): Promise + stats?(): { total_count: number; idle_count: number; waiting_count: number } +} + +export function pgPoolClient(pool: pg.Pool, logger: Logger = log): ManagedClient { + pool.on('error', (err) => { + logger.error({ err }, 'Postgres destination pool error') + }) + + return { + query(text: string, values?: unknown[]) { + return pool.query(text, values) + }, + async close() { + await pool.end() + }, + stats() { + return { + total_count: pool.totalCount, + idle_count: pool.idleCount, + waiting_count: pool.waitingCount, + } + }, + } +} + +export function isPGliteUrl(url: string): boolean { + return url.startsWith('file://') || url.startsWith('memory://') +} + +export async function pgliteClient( + config: { data_dir?: string; url?: string } = {} +): Promise { + const { PGlite } = await import('@electric-sql/pglite') + + const dataSource = config.url ?? config.data_dir + const db = await PGlite.create(dataSource) + + function adaptResult(result: { rows: unknown[]; affectedRows?: number; fields?: { name: string; dataTypeID: number }[] }): pg.QueryResult { + return { + rows: result.rows as Record[], + rowCount: result.affectedRows ?? null, + command: '', + oid: 0, + fields: result.fields?.map((f) => ({ + ...f, + tableID: 0, + columnID: 0, + dataTypeSize: 0, + dataTypeModifier: 0, + format: 'text' as const, + })) ?? [], + } as pg.QueryResult + } + + return { + async query(text: string, values?: unknown[]) { + if (values && values.length > 0) { + return adaptResult(await db.query(text, values)) + } + // PGlite's query() rejects multiple statements; use exec() as fallback + try { + return adaptResult(await db.query(text)) + } catch (err) { + if (err instanceof Error && err.message.includes('multiple commands')) { + await db.exec(text) + return adaptResult({ rows: [], affectedRows: 0, fields: [] }) + } + throw err + } + }, + async close() { + await db.close() + }, + } +} diff --git a/packages/destination-postgres/src/index.ts b/packages/destination-postgres/src/index.ts index 89113c07f..d048df0bc 100644 --- a/packages/destination-postgres/src/index.ts +++ b/packages/destination-postgres/src/index.ts @@ -20,10 +20,14 @@ import { import defaultSpec from './spec.js' import { log } from './logger.js' import type { Config } from './spec.js' +import { pgPoolClient, pgliteClient, isPGliteUrl } from './client.js' +import type { QueryClient, ManagedClient } from './client.js' // MARK: - Spec export { configSchema, type Config } from './spec.js' +export { pgPoolClient, pgliteClient, isPGliteUrl } from './client.js' +export type { QueryClient, ManagedClient } from './client.js' export async function buildPoolConfig(config: Config): Promise { if (config.aws) { @@ -80,7 +84,7 @@ export interface WriteManyResult extends UpsertManyResult, DeleteManyResult {} * cleaned up — no production user is on the soft-delete code path. */ export async function writeMany( - pool: pg.Pool, + client: QueryClient, schema: string, table: string, // eslint-disable-next-line @typescript-eslint/no-explicit-any @@ -91,8 +95,8 @@ export async function writeMany( const tombstones = entries.filter((e) => e.recordDeleted === true).map((r) => r.data) const liveRecords = entries.filter((e) => e.recordDeleted !== true).map((r) => r.data) - const u = await upsertMany(pool, schema, table, liveRecords, primaryKeyColumns, newerThanField) - const d = await deleteMany(pool, schema, table, tombstones, primaryKeyColumns) + const u = await upsertMany(client, schema, table, liveRecords, primaryKeyColumns, newerThanField) + const d = await deleteMany(client, schema, table, tombstones, primaryKeyColumns) return { ...u, deleted_count: d.deleted_count } } @@ -102,7 +106,7 @@ export async function writeMany( * `_synced_at` is the destination write time. */ export async function upsertMany( - pool: pg.Pool, + client: QueryClient, schema: string, table: string, // eslint-disable-next-line @typescript-eslint/no-explicit-any @@ -128,7 +132,7 @@ export async function upsertMany( return { _raw_data: e, _synced_at: syncedAt, _updated_at: new Date(ts * 1000).toISOString() } }) - return await upsertWithStats(pool, records, { + return await upsertWithStats(client, records, { schema, table, primaryKeyColumns, @@ -141,7 +145,7 @@ export async function upsertMany( * terminal — once an object is deleted it cannot be undeleted. */ export async function deleteMany( - pool: pg.Pool, + client: QueryClient, schema: string, table: string, // eslint-disable-next-line @typescript-eslint/no-explicit-any @@ -165,7 +169,7 @@ export async function deleteMany( USING (VALUES ${valueRows.join(', ')}) AS d(${identList(primaryKeyColumns)}) WHERE ${pkJoin}` - const result = await pool.query(stmt, params) + const result = await client.query(stmt, params) return { deleted_count: result.rowCount ?? 0 } } @@ -187,7 +191,7 @@ export { /** Throw if any stream's catalog enum allow-list disagrees with an existing CHECK constraint. */ async function assertEnumConstraintsConsistent( - pool: pg.Pool, + client: QueryClient, schema: string, streams: ReadonlyArray<{ stream: { name: string; json_schema?: Record } }> ): Promise { @@ -203,7 +207,7 @@ async function assertEnumConstraintsConsistent( if (enumColumns.size === 0) return const existing = await getExistingEnumAllowLists( - pool, + client, schema, streams.map((s) => s.stream.name), [...enumColumns] @@ -236,23 +240,6 @@ function errorMessage(err: unknown): string { return (err as NodeJS.ErrnoException).code ?? err.constructor.name } -function createPool(config: PoolConfig): pg.Pool { - const pool = new pg.Pool(config) - // Destination connectors should surface pool failures without crashing the host process. - pool.on('error', (err) => { - log.error({ err }, 'Postgres destination pool error') - }) - return pool -} - -function poolStats(pool: pg.Pool) { - return { - total_count: pool.totalCount, - idle_count: pool.idleCount, - waiting_count: pool.waitingCount, - } -} - function describePoolConfig(config: PoolConfig) { return { host: config.host, @@ -269,7 +256,21 @@ function describePoolConfig(config: PoolConfig) { } } -async function createInstrumentedPool(config: Config, operation: string): Promise { +async function createManagedClient(config: Config, operation: string): Promise { + const connectionUrl = config.url ?? config.connection_string + if (config.pglite || (connectionUrl && isPGliteUrl(connectionUrl))) { + const url = connectionUrl && isPGliteUrl(connectionUrl) ? connectionUrl : undefined + const dataDir = config.pglite && config.pglite !== true ? config.pglite.data_dir : undefined + log.debug({ operation, url, data_dir: dataDir }, 'dest postgres: creating PGlite client') + const startedAt = Date.now() + const client = await pgliteClient({ url, data_dir: dataDir }) + log.debug( + { operation, duration_ms: Date.now() - startedAt }, + 'dest postgres: PGlite client ready' + ) + return client + } + const configStartedAt = Date.now() log.debug({ operation }, 'dest postgres: building pool config') const poolConfig = await buildPoolConfig(config) @@ -282,35 +283,27 @@ async function createInstrumentedPool(config: Config, operation: string): Promis 'dest postgres: built pool config' ) - const pool = withQueryLogging(createPool(poolConfig), log) - log.debug({ operation, ...poolStats(pool) }, 'dest postgres: pool created') - return pool + const pool = withQueryLogging(new pg.Pool(poolConfig), log) + const client = pgPoolClient(pool, log) + log.debug({ operation, ...client.stats?.() }, 'dest postgres: pool created') + return client } -async function connectAndRelease(pool: pg.Pool, operation: string): Promise { +async function verifyConnectivity(client: ManagedClient, operation: string): Promise { const startedAt = Date.now() - log.debug({ operation, ...poolStats(pool) }, 'dest postgres: pool.connect start') - const client = await pool.connect() - try { - log.debug( - { - operation, - duration_ms: Date.now() - startedAt, - ...poolStats(pool), - }, - 'dest postgres: pool.connect complete' - ) - } finally { - client.release() - log.debug({ operation, ...poolStats(pool) }, 'dest postgres: pool.connect released') - } + log.debug({ operation, ...client.stats?.() }, 'dest postgres: connectivity check start') + await client.query('SELECT 1') + log.debug( + { operation, duration_ms: Date.now() - startedAt, ...client.stats?.() }, + 'dest postgres: connectivity check complete' + ) } -async function endPool(pool: pg.Pool, operation: string): Promise { +async function closeClient(client: ManagedClient, operation: string): Promise { const startedAt = Date.now() - log.debug({ operation, ...poolStats(pool) }, 'dest postgres: pool.end start') - await pool.end() - log.debug({ operation, duration_ms: Date.now() - startedAt }, 'dest postgres: pool.end complete') + log.debug({ operation, ...client.stats?.() }, 'dest postgres: closing client') + await client.close() + log.debug({ operation, duration_ms: Date.now() - startedAt }, 'dest postgres: client closed') } const destination = { @@ -319,10 +312,9 @@ const destination = { }, async *check({ config }) { - const pool = await createInstrumentedPool(config, 'check') + const client = await createManagedClient(config, 'check') try { - await connectAndRelease(pool, 'check') - await pool.query('SELECT 1') + await verifyConnectivity(client, 'check') yield { type: 'connection_status' as const, connection_status: { status: 'succeeded' as const }, @@ -336,40 +328,35 @@ const destination = { }, } } finally { - await endPool(pool, 'check') + await closeClient(client, 'check') } }, async *setup({ config, catalog }) { - log.debug({ schema: config.schema }, 'dest setup: connecting to pool') - const pool = await createInstrumentedPool(config, 'setup') + log.debug({ schema: config.schema }, 'dest setup: creating client') + const client = await createManagedClient(config, 'setup') try { - await connectAndRelease(pool, 'setup') + await verifyConnectivity(client, 'setup') log.info(`Creating schema "${config.schema}" (${catalog.streams.length} streams)`) log.debug('dest setup: creating schema') - await pool.query(sql`CREATE SCHEMA IF NOT EXISTS "${config.schema}"`) - // Backward-compat: drop legacy `set_updated_at()` (CASCADE removes any orphan `handle_updated_at` triggers from older deployments). + await client.query(sql`CREATE SCHEMA IF NOT EXISTS "${config.schema}"`) log.debug('dest setup: dropping legacy set_updated_at() function') - await pool.query(sql`DROP FUNCTION IF EXISTS "${config.schema}".set_updated_at() CASCADE`) + await client.query(sql`DROP FUNCTION IF EXISTS "${config.schema}".set_updated_at() CASCADE`) - // The DO $check$ block uses ADD CONSTRAINT + EXCEPTION WHEN duplicate_object, - // which silently no-ops on a changed enum list — surface it loudly instead. - await assertEnumConstraintsConsistent(pool, config.schema, catalog.streams) + await assertEnumConstraintsConsistent(client, config.schema, catalog.streams) log.debug({ streamCount: catalog.streams.length }, 'dest setup: creating tables') - await Promise.all( - catalog.streams.map(async (cs) => { - await pool.query( - buildCreateTableDDL(config.schema, cs.stream.name, cs.stream.json_schema ?? {}, { - system_columns: cs.system_columns, - primary_key: cs.stream.primary_key, - }) - ) - }) - ) + for (const cs of catalog.streams) { + await client.query( + buildCreateTableDDL(config.schema, cs.stream.name, cs.stream.json_schema ?? {}, { + system_columns: cs.system_columns, + primary_key: cs.stream.primary_key, + }) + ) + } log.debug('dest setup: complete') } finally { - await endPool(pool, 'setup') + await closeClient(client, 'setup') } }, @@ -380,17 +367,17 @@ const destination = { `Refusing to drop protected schema "${config.schema}" — teardown only drops user-created schemas` ) } - const pool = await createInstrumentedPool(config, 'teardown') + const client = await createManagedClient(config, 'teardown') try { - await connectAndRelease(pool, 'teardown') - await pool.query(sql`DROP SCHEMA IF EXISTS "${config.schema}" CASCADE`) + await verifyConnectivity(client, 'teardown') + await client.query(sql`DROP SCHEMA IF EXISTS "${config.schema}" CASCADE`) } finally { - await endPool(pool, 'teardown') + await closeClient(client, 'teardown') } }, async *write({ config, catalog }, $stdin) { - const pool = await createInstrumentedPool(config, 'write') + const client = await createManagedClient(config, 'write') const batchSize = config.batch_size // eslint-disable-next-line @typescript-eslint/no-explicit-any const streamBuffers = new Map[]>() @@ -406,7 +393,6 @@ const destination = { const failedStreams = new Set() - /** Flush and return error message if failed, undefined if ok. */ const flushStream = async (streamName: string): Promise => { if (failedStreams.has(streamName)) return undefined const buffer = streamBuffers.get(streamName) @@ -421,12 +407,12 @@ const destination = { schema: config.schema, primary_key: pk, newer_than_field: newerThan, - ...poolStats(pool), + ...client.stats?.(), }, 'dest write: flush start' ) try { - const stats = await writeMany(pool, config.schema, streamName, buffer, pk, newerThan) + const stats = await writeMany(client, config.schema, streamName, buffer, pk, newerThan) log.debug( { stream: streamName, @@ -438,7 +424,7 @@ const destination = { deleted: stats.deleted_count, skipped: stats.skipped_count, duration_ms: Date.now() - startedAt, - ...poolStats(pool), + ...client.stats?.(), }, `dest write: upsert ${config.schema}.${streamName}` ) @@ -455,7 +441,7 @@ const destination = { duration_ms: Date.now() - startedAt, error: errMsg, err, - ...poolStats(pool), + ...client.stats?.(), }, 'dest write: flush failed' ) @@ -475,7 +461,7 @@ const destination = { } try { - await connectAndRelease(pool, 'write') + await verifyConnectivity(client, 'write') for await (const msg of $stdin) { if (msg.type === 'record') { const { stream } = msg.record @@ -548,7 +534,7 @@ const destination = { log.debug(`Postgres destination: wrote to schema "${config.schema}"`) } } finally { - await endPool(pool, 'write') + await closeClient(client, 'write') } }, } satisfies Destination diff --git a/packages/destination-postgres/src/pglite.test.ts b/packages/destination-postgres/src/pglite.test.ts new file mode 100644 index 000000000..11cfa2ed5 --- /dev/null +++ b/packages/destination-postgres/src/pglite.test.ts @@ -0,0 +1,244 @@ +import { mkdtempSync, rmSync } from 'fs' +import { tmpdir } from 'os' +import { join } from 'path' +import { afterAll, afterEach, beforeAll, beforeEach, describe, expect, it } from 'vitest' +import destination, { deleteMany, upsertMany, writeMany, pgliteClient } from './index.js' +import type { ManagedClient } from './client.js' +import type { ConfiguredCatalog, DestinationInput, DestinationOutput } from '@stripe/sync-protocol' +import { collectFirst, drain } from '@stripe/sync-protocol' +import type { Config } from './spec.js' + +const SCHEMA = 'test_dest' +let dataDir: string + +function makeConfig(): Config { + return { pglite: { data_dir: dataDir }, schema: SCHEMA, batch_size: 100, allow_experimental_pglite: true } +} + +beforeAll(() => { + dataDir = mkdtempSync(join(tmpdir(), 'pglite-test-')) +}) + +afterAll(() => { + rmSync(dataDir, { recursive: true, force: true }) +}) + +let nextRecordTs = Math.floor(Date.now() / 1000) +function makeRecord(stream: string, data: Record) { + return { + type: 'record' as const, + record: { + stream, + data: { _updated_at: nextRecordTs++, ...data }, + emitted_at: new Date().toISOString(), + }, + } +} + +function makeState(stream: string, data: unknown) { + return { type: 'source_state' as const, source_state: { stream, data } } +} + +async function* toAsyncIter(msgs: DestinationInput[]): AsyncIterable { + for (const msg of msgs) yield msg +} + +async function collectOutputs(iter: AsyncIterable): Promise { + const results: DestinationOutput[] = [] + for await (const msg of iter) results.push(msg) + return results +} + +async function resetSchema() { + const c = await pgliteClient({ data_dir: dataDir }) + await c.query(`DROP SCHEMA IF EXISTS "${SCHEMA}" CASCADE`) + await c.close() +} + +const catalog: ConfiguredCatalog = { + streams: [ + { + stream: { + name: 'customer', + primary_key: [['id']], + newer_than_field: '_updated_at', + metadata: {}, + }, + sync_mode: 'full_refresh', + destination_sync_mode: 'overwrite', + }, + ], +} + +describe('PGlite destination', () => { + beforeEach(async () => { + await resetSchema() + }) + + describe('check()', () => { + it('succeeds with pglite config', async () => { + const statusMsg = await collectFirst( + destination.check({ config: makeConfig() }), + 'connection_status' + ) + expect(statusMsg.connection_status.status).toBe('succeeded') + }) + }) + + describe('setup()', () => { + it('creates schema and table', async () => { + await drain(destination.setup!({ config: makeConfig(), catalog })) + + const c = await pgliteClient({ data_dir: dataDir }) + try { + const { rows } = await c.query( + `SELECT table_name FROM information_schema.tables WHERE table_schema = $1`, + [SCHEMA] + ) + expect(rows.map((r) => r.table_name)).toContain('customer') + } finally { + await c.close() + } + }) + }) + + describe('write()', () => { + beforeEach(async () => { + await drain(destination.setup!({ config: makeConfig(), catalog })) + }) + + it('upserts records via PGlite', async () => { + const messages = toAsyncIter([ + makeRecord('customer', { id: 'cus_1', name: 'Alice' }), + makeRecord('customer', { id: 'cus_2', name: 'Bob' }), + ]) + + const outputs = await collectOutputs(destination.write({ config: makeConfig(), catalog }, messages)) + const records = outputs.filter((m) => m.type === 'record') + expect(records).toHaveLength(2) + }) + + it('re-emits SourceStateMessage after flushing', async () => { + const stateData = { cursor: 'abc123' } + const messages = toAsyncIter([ + makeRecord('customer', { id: 'cus_1', name: 'Alice' }), + makeState('customer', stateData), + ]) + + const outputs = await collectOutputs(destination.write({ config: makeConfig(), catalog }, messages)) + const stateOutputs = outputs.filter((m) => m.type === 'source_state') + expect(stateOutputs).toHaveLength(1) + expect(stateOutputs[0]).toEqual({ + type: 'source_state', + source_state: { stream: 'customer', data: stateData }, + }) + }) + + it('handles upsert (ON CONFLICT update)', async () => { + const messages1 = toAsyncIter([makeRecord('customer', { id: 'cus_1', name: 'Alice' })]) + await collectOutputs(destination.write({ config: makeConfig(), catalog }, messages1)) + + const messages2 = toAsyncIter([makeRecord('customer', { id: 'cus_1', name: 'Alice Updated' })]) + await collectOutputs(destination.write({ config: makeConfig(), catalog }, messages2)) + + const c = await pgliteClient({ data_dir: dataDir }) + try { + const { rows } = await c.query( + `SELECT _raw_data->>'name' AS name FROM "${SCHEMA}".customer WHERE id = 'cus_1'` + ) + expect(rows[0].name).toBe('Alice Updated') + } finally { + await c.close() + } + }) + }) +}) + +describe('PGlite upsertMany / deleteMany / writeMany', () => { + let client: ManagedClient + + beforeEach(async () => { + client = await pgliteClient({ data_dir: dataDir }) + await client.query(`DROP SCHEMA IF EXISTS "${SCHEMA}" CASCADE`) + await client.query(`CREATE SCHEMA IF NOT EXISTS "${SCHEMA}"`) + await client.query(`CREATE TABLE IF NOT EXISTS "${SCHEMA}".customer ( + "_raw_data" jsonb NOT NULL, + "_synced_at" timestamptz NOT NULL DEFAULT now(), + "_updated_at" timestamptz NOT NULL DEFAULT now(), + "id" text GENERATED ALWAYS AS ((_raw_data->>'id')::text) STORED, + PRIMARY KEY ("id") + )`) + }) + + afterEach(async () => { + await client?.close() + }) + + it('upsertMany inserts records', async () => { + const ts = Math.floor(Date.now() / 1000) + await upsertMany( + client, + SCHEMA, + 'customer', + [ + { id: 'cus_10', name: 'Direct', _updated_at: ts }, + { id: 'cus_11', name: 'Insert', _updated_at: ts }, + ], + ['id'], + '_updated_at' + ) + + const { rows } = await client.query(`SELECT count(*)::int AS n FROM "${SCHEMA}".customer`) + expect(rows[0].n).toBe(2) + }) + + it('deleteMany removes rows', async () => { + const ts = Math.floor(Date.now() / 1000) + await upsertMany( + client, + SCHEMA, + 'customer', + [ + { id: 'cus_keep', name: 'Keep', _updated_at: ts }, + { id: 'cus_drop', name: 'Drop', _updated_at: ts }, + ], + ['id'], + '_updated_at' + ) + + const result = await deleteMany(client, SCHEMA, 'customer', [{ id: 'cus_drop' }], ['id']) + expect(result.deleted_count).toBe(1) + + const { rows } = await client.query(`SELECT id FROM "${SCHEMA}".customer ORDER BY id`) + expect(rows).toEqual([{ id: 'cus_keep' }]) + }) + + it('writeMany routes mixed batch to upsert and delete', async () => { + const ts = Math.floor(Date.now() / 1000) + await upsertMany( + client, + SCHEMA, + 'customer', + [{ id: 'cus_old', name: 'Old', _updated_at: ts }], + ['id'], + '_updated_at' + ) + + const result = await writeMany( + client, + SCHEMA, + 'customer', + [ + { data: { id: 'cus_new', name: 'New', _updated_at: ts + 1 } }, + { recordDeleted: true, data: { id: 'cus_old', _updated_at: ts + 1 } }, + ], + ['id'], + '_updated_at' + ) + expect(result.created_count).toBe(1) + expect(result.deleted_count).toBe(1) + + const { rows } = await client.query(`SELECT id FROM "${SCHEMA}".customer ORDER BY id`) + expect(rows).toEqual([{ id: 'cus_new' }]) + }) +}) diff --git a/packages/destination-postgres/src/spec.ts b/packages/destination-postgres/src/spec.ts index f7b67b869..c52dd6b83 100644 --- a/packages/destination-postgres/src/spec.ts +++ b/packages/destination-postgres/src/spec.ts @@ -19,6 +19,19 @@ export const configSchema = z }) .optional() .describe('AWS RDS IAM authentication config'), + allow_experimental_pglite: z + .boolean() + .optional() + .describe('Enable experimental PGlite support (required to use pglite config or file:///memory:// URLs)'), + pglite: z + .union([ + z.literal(true), + z.object({ + data_dir: z.string().optional().describe('Directory for persistent storage (omit for in-memory)'), + }), + ]) + .optional() + .describe('Use PGlite (in-process WASM Postgres) instead of connecting to an external server'), ssl_ca_pem: z .string() .optional() @@ -30,6 +43,23 @@ export const configSchema = z message: 'Specify either url/connection_string or aws config, not both', path: ['aws'], }) + .refine((config) => !(config.pglite && (config.url || config.connection_string || config.aws)), { + message: 'Specify pglite OR url/connection_string/aws, not both', + path: ['pglite'], + }) + .refine( + (config) => { + if (config.pglite) return config.allow_experimental_pglite === true + const url = config.url ?? config.connection_string + if (url && (url.startsWith('file://') || url.startsWith('memory://'))) + return config.allow_experimental_pglite === true + return true + }, + { + message: 'Set allow_experimental_pglite: true to use PGlite or file:///memory:// URLs', + path: ['allow_experimental_pglite'], + } + ) export type Config = z.infer diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 537f9f18e..6b1d1f081 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -534,6 +534,9 @@ importers: '@aws-sdk/rds-signer': specifier: ^3.1013.0 version: 3.1013.0 + '@electric-sql/pglite': + specifier: ^0.4.5 + version: 0.4.5 '@types/pg': specifier: ^8.15.5 version: 8.15.6 @@ -1043,6 +1046,9 @@ packages: '@electric-sql/pglite@0.2.17': resolution: {integrity: sha512-qEpKRT2oUaWDH6tjRxLHjdzMqRUGYDnGZlKrnL4dJ77JVMcP2Hpo3NYnOSPKdZdeec57B6QPprCUFg0picx5Pw==} + '@electric-sql/pglite@0.4.5': + resolution: {integrity: sha512-aGG2zGEyZzGWKy8P+9ZoNUV0jxt1+hgbeTf+bVAYyxVZZLXg3/9aFlfLxb08AYZVAfAkQlQIysmWjhc5hwDG8g==} + '@emnapi/runtime@1.9.1': resolution: {integrity: sha512-VYi5+ZVLhpgK4hQ0TAjiQiZ6ol0oe4mBx7mVv7IflsiEp0OWoVsp/+f9Vc1hOhE0TtkORVrI1GvzyreqpgWtkA==} @@ -5598,6 +5604,8 @@ snapshots: '@electric-sql/pglite@0.2.17': {} + '@electric-sql/pglite@0.4.5': {} + '@emnapi/runtime@1.9.1': dependencies: tslib: 2.8.1 @@ -7321,14 +7329,6 @@ snapshots: chai: 5.2.0 tinyrainbow: 2.0.0 - '@vitest/mocker@3.2.4(vite@7.2.2(@types/node@24.10.1)(jiti@2.6.1)(lightningcss@1.32.0)(terser@5.46.1)(tsx@4.21.0)(yaml@2.8.1))': - dependencies: - '@vitest/spy': 3.2.4 - estree-walker: 3.0.3 - magic-string: 0.30.21 - optionalDependencies: - vite: 7.2.2(@types/node@24.10.1)(jiti@2.6.1)(lightningcss@1.32.0)(terser@5.46.1)(tsx@4.21.0)(yaml@2.8.1) - '@vitest/mocker@3.2.4(vite@7.2.2(@types/node@25.5.0)(jiti@2.6.1)(lightningcss@1.32.0)(terser@5.46.1)(tsx@4.21.0)(yaml@2.8.1))': dependencies: '@vitest/spy': 3.2.4 @@ -9391,7 +9391,7 @@ snapshots: dependencies: '@types/chai': 5.2.2 '@vitest/expect': 3.2.4 - '@vitest/mocker': 3.2.4(vite@7.2.2(@types/node@24.10.1)(jiti@2.6.1)(lightningcss@1.32.0)(terser@5.46.1)(tsx@4.21.0)(yaml@2.8.1)) + '@vitest/mocker': 3.2.4(vite@7.2.2(@types/node@25.5.0)(jiti@2.6.1)(lightningcss@1.32.0)(terser@5.46.1)(tsx@4.21.0)(yaml@2.8.1)) '@vitest/pretty-format': 3.2.4 '@vitest/runner': 3.2.4 '@vitest/snapshot': 3.2.4