From f1dd63689b8c4c191b81f41c715ce27d742eb0c7 Mon Sep 17 00:00:00 2001 From: Claude Date: Thu, 30 Apr 2026 11:43:27 +0000 Subject: [PATCH] feat(world-postgres): add noPreparedStatements option for PgBouncer/PGlite Forwards graphile-worker's noPreparedStatements flag through createWorld() config and a WORKFLOW_POSTGRES_NO_PREPARED_STATEMENTS env var. Required when the pool routes traffic through a layer that cannot honour per-session prepared statements (PgBouncer txn mode, PGlite-socket). --- .../world-postgres-no-prepared-statements.md | 5 ++++ packages/world-postgres/README.md | 3 +++ packages/world-postgres/src/config.ts | 9 +++++++ packages/world-postgres/src/index.ts | 15 ++++++++++- packages/world-postgres/src/queue.test.ts | 25 +++++++++++++++++++ packages/world-postgres/src/queue.ts | 4 +++ 6 files changed, 60 insertions(+), 1 deletion(-) create mode 100644 .changeset/world-postgres-no-prepared-statements.md diff --git a/.changeset/world-postgres-no-prepared-statements.md b/.changeset/world-postgres-no-prepared-statements.md new file mode 100644 index 0000000000..ec5a12c4ac --- /dev/null +++ b/.changeset/world-postgres-no-prepared-statements.md @@ -0,0 +1,5 @@ +--- +"@workflow/world-postgres": minor +--- + +Add `noPreparedStatements` option (and `WORKFLOW_POSTGRES_NO_PREPARED_STATEMENTS` env var) that forwards graphile-worker's `noPreparedStatements: true` to its internal `run()` and `makeWorkerUtils()` calls. Required for pools that cannot honour per-session prepared statements, such as PgBouncer in transaction pooling mode or PGlite-socket (PGlite multiplexes many TCP clients onto a single WASM session). diff --git a/packages/world-postgres/README.md b/packages/world-postgres/README.md index 9363aff0a3..9ce67567df 100644 --- a/packages/world-postgres/README.md +++ b/packages/world-postgres/README.md @@ -53,6 +53,7 @@ const world = createWorld({ jobPrefix: "myapp", // optional queueConcurrency: 10, // optional maxPoolSize: 10, // optional, overrides WORKFLOW_POSTGRES_MAX_POOL_SIZE when `pool` is omitted + noPreparedStatements: true, // optional, set when running against a pool that cannot honour prepared statements (PgBouncer txn mode, PGlite-socket) }); // Or pass an existing pg.Pool (shared with your app Drizzle, etc.); `world.close()` will not end it. @@ -70,6 +71,7 @@ const worldFromPool = createWorld({ pool }); | `pool` | `pg.Pool` | — | Optional. When set, used for Drizzle, Graphile Worker, and stream writes. `world.close()` does not end it. | | `jobPrefix` | `string` | `process.env.WORKFLOW_POSTGRES_JOB_PREFIX` | Optional prefix for queue job names | | `queueConcurrency` | `number` | `10` | Number of concurrent active step executions per process | +| `noPreparedStatements` | `boolean` | `process.env.WORKFLOW_POSTGRES_NO_PREPARED_STATEMENTS` (`1`/`true`) | Forwards graphile-worker's `noPreparedStatements: true` to its internal `run()`/`makeWorkerUtils()`. Required when the pool routes traffic through a layer that cannot honour per-session prepared statements, such as PgBouncer in transaction pooling mode or PGlite-socket. | ## Environment Variables @@ -80,6 +82,7 @@ const worldFromPool = createWorld({ pool }); | `WORKFLOW_POSTGRES_JOB_PREFIX` | Prefix for queue job names | - | | `WORKFLOW_POSTGRES_WORKER_CONCURRENCY` | Number of concurrent workers | `10` | | `WORKFLOW_POSTGRES_MAX_POOL_SIZE` | Internal `pg.Pool` max size | `10` | +| `WORKFLOW_POSTGRES_NO_PREPARED_STATEMENTS` | Set to `1` or `true` to disable prepared statements in graphile-worker (PgBouncer txn mode, PGlite-socket) | - | When `pool` is omitted, `maxPoolSize` precedence is: `createWorld({ maxPoolSize })`, then `WORKFLOW_POSTGRES_MAX_POOL_SIZE`, then the `pg.Pool` default. diff --git a/packages/world-postgres/src/config.ts b/packages/world-postgres/src/config.ts index ca778914ea..a7e1ef53cd 100644 --- a/packages/world-postgres/src/config.ts +++ b/packages/world-postgres/src/config.ts @@ -12,4 +12,13 @@ export type PostgresWorldConfig = PgConnectionConfig & { * Default is 10ms. Set to 0 for immediate flushing. */ streamFlushIntervalMs?: number; + /** + * Disable prepared statements in the embedded graphile-worker. Required when + * the connection pool routes traffic through a layer that cannot honour + * per-session prepared statements, such as PgBouncer in transaction pooling + * mode or PGlite-socket (PGlite multiplexes many TCP clients onto a single + * WASM session). When `true`, graphile-worker's `noPreparedStatements` flag + * is forwarded to its internal `run()` and `makeWorkerUtils()` calls. + */ + noPreparedStatements?: boolean; }; diff --git a/packages/world-postgres/src/index.ts b/packages/world-postgres/src/index.ts index 31fb1d84ac..414dc844cc 100644 --- a/packages/world-postgres/src/index.ts +++ b/packages/world-postgres/src/index.ts @@ -30,6 +30,12 @@ function getDefaultMaxPoolSize(): number | undefined { return Number.isFinite(parsed) && parsed > 0 ? parsed : undefined; } +function getDefaultNoPreparedStatements(): boolean | undefined { + const raw = process.env.WORKFLOW_POSTGRES_NO_PREPARED_STATEMENTS; + if (raw === undefined) return undefined; + return raw === '1' || raw.toLowerCase() === 'true'; +} + export function createWorld( config: PostgresWorldConfig = { connectionString: @@ -51,8 +57,15 @@ export function createWorld( ...(maxPoolSize !== undefined ? { max: maxPoolSize } : {}), }); + const noPreparedStatements = + config.noPreparedStatements ?? getDefaultNoPreparedStatements(); + const queueConfig: PostgresWorldConfig = + noPreparedStatements === undefined + ? config + : { ...config, noPreparedStatements }; + const drizzle = createClient(pool); - const queue = createQueue(config, pool); + const queue = createQueue(queueConfig, pool); const storage = createStorage(drizzle); const streamer = createStreamer(pool, drizzle); diff --git a/packages/world-postgres/src/queue.test.ts b/packages/world-postgres/src/queue.test.ts index dd2122ecde..1ccbc8d9a1 100644 --- a/packages/world-postgres/src/queue.test.ts +++ b/packages/world-postgres/src/queue.test.ts @@ -287,6 +287,31 @@ describe('postgres queue http execution', () => { } }); + it('forwards noPreparedStatements to graphile-worker when configured', async () => { + const queue = buildQueue( + { connectionString: 'postgres://test', noPreparedStatements: true }, + pool + ); + await queue.start(); + + expect(makeWorkerUtils).toHaveBeenCalledWith( + expect.objectContaining({ noPreparedStatements: true }) + ); + expect(run).toHaveBeenCalledWith( + expect.objectContaining({ noPreparedStatements: true }) + ); + }); + + it('omits noPreparedStatements by default to keep prepared statements enabled', async () => { + const queue = buildQueue({ connectionString: 'postgres://test' }, pool); + await queue.start(); + + const workerUtilsArgs = vi.mocked(makeWorkerUtils).mock.calls[0]?.[0]; + const runArgs = vi.mocked(run).mock.calls[0]?.[0]; + expect(workerUtilsArgs).not.toHaveProperty('noPreparedStatements'); + expect(runArgs).not.toHaveProperty('noPreparedStatements'); + }); + it('queues producer delays and headers in graphile job metadata', async () => { vi.useFakeTimers(); vi.setSystemTime(new Date('2024-01-01T00:00:00.000Z')); diff --git a/packages/world-postgres/src/queue.ts b/packages/world-postgres/src/queue.ts index d432cb0d11..385c1308dc 100644 --- a/packages/world-postgres/src/queue.ts +++ b/packages/world-postgres/src/queue.ts @@ -336,6 +336,9 @@ export function createQueue( workerUtils = await makeWorkerUtils({ pgPool: pool, logger: graphileLogger, + ...(config.noPreparedStatements + ? { noPreparedStatements: true } + : {}), }); await workerUtils.migrate(); await migratePgBossJobs(workerUtils); @@ -499,6 +502,7 @@ export function createQueue( logger: graphileLogger, pollInterval: 500, // 500ms = 0.5s (graphile-worker uses LISTEN/NOTIFY when available) taskList, + ...(config.noPreparedStatements ? { noPreparedStatements: true } : {}), }); }