Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/world-postgres-no-prepared-statements.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@workflow/world-postgres": minor
---

Add `noPreparedStatements` option (and `WORKFLOW_POSTGRES_NO_PREPARED_STATEMENTS` env var) that forwards graphile-worker's `noPreparedStatements: true` to its internal `run()` and `makeWorkerUtils()` calls. Required for pools that cannot honour per-session prepared statements, such as PgBouncer in transaction pooling mode or PGlite-socket (PGlite multiplexes many TCP clients onto a single WASM session).
3 changes: 3 additions & 0 deletions packages/world-postgres/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ const world = createWorld({
jobPrefix: "myapp", // optional
queueConcurrency: 10, // optional
maxPoolSize: 10, // optional, overrides WORKFLOW_POSTGRES_MAX_POOL_SIZE when `pool` is omitted
noPreparedStatements: true, // optional, set when running against a pool that cannot honour prepared statements (PgBouncer txn mode, PGlite-socket)
});

// Or pass an existing pg.Pool (shared with your app Drizzle, etc.); `world.close()` will not end it.
Expand All @@ -70,6 +71,7 @@ const worldFromPool = createWorld({ pool });
| `pool` | `pg.Pool` | — | Optional. When set, used for Drizzle, Graphile Worker, and stream writes. `world.close()` does not end it. |
| `jobPrefix` | `string` | `process.env.WORKFLOW_POSTGRES_JOB_PREFIX` | Optional prefix for queue job names |
| `queueConcurrency` | `number` | `10` | Number of concurrent active step executions per process |
| `noPreparedStatements` | `boolean` | `process.env.WORKFLOW_POSTGRES_NO_PREPARED_STATEMENTS` (`1`/`true`) | Forwards graphile-worker's `noPreparedStatements: true` to its internal `run()`/`makeWorkerUtils()`. Required when the pool routes traffic through a layer that cannot honour per-session prepared statements, such as PgBouncer in transaction pooling mode or PGlite-socket. |

## Environment Variables

Expand All @@ -80,6 +82,7 @@ const worldFromPool = createWorld({ pool });
| `WORKFLOW_POSTGRES_JOB_PREFIX` | Prefix for queue job names | - |
| `WORKFLOW_POSTGRES_WORKER_CONCURRENCY` | Number of concurrent workers | `10` |
| `WORKFLOW_POSTGRES_MAX_POOL_SIZE` | Internal `pg.Pool` max size | `10` |
| `WORKFLOW_POSTGRES_NO_PREPARED_STATEMENTS` | Set to `1` or `true` to disable prepared statements in graphile-worker (PgBouncer txn mode, PGlite-socket) | - |

When `pool` is omitted, `maxPoolSize` precedence is: `createWorld({ maxPoolSize })`, then `WORKFLOW_POSTGRES_MAX_POOL_SIZE`, then the `pg.Pool` default.

Expand Down
9 changes: 9 additions & 0 deletions packages/world-postgres/src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,13 @@ export type PostgresWorldConfig = PgConnectionConfig & {
* Default is 10ms. Set to 0 for immediate flushing.
*/
streamFlushIntervalMs?: number;
/**
* Disable prepared statements in the embedded graphile-worker. Required when
* the connection pool routes traffic through a layer that cannot honour
* per-session prepared statements, such as PgBouncer in transaction pooling
* mode or PGlite-socket (PGlite multiplexes many TCP clients onto a single
* WASM session). When `true`, graphile-worker's `noPreparedStatements` flag
* is forwarded to its internal `run()` and `makeWorkerUtils()` calls.
*/
noPreparedStatements?: boolean;
};
15 changes: 14 additions & 1 deletion packages/world-postgres/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,12 @@ function getDefaultMaxPoolSize(): number | undefined {
return Number.isFinite(parsed) && parsed > 0 ? parsed : undefined;
}

function getDefaultNoPreparedStatements(): boolean | undefined {
const raw = process.env.WORKFLOW_POSTGRES_NO_PREPARED_STATEMENTS;
if (raw === undefined) return undefined;
return raw === '1' || raw.toLowerCase() === 'true';
}

export function createWorld(
config: PostgresWorldConfig = {
connectionString:
Expand All @@ -51,8 +57,15 @@ export function createWorld(
...(maxPoolSize !== undefined ? { max: maxPoolSize } : {}),
});

const noPreparedStatements =
config.noPreparedStatements ?? getDefaultNoPreparedStatements();
const queueConfig: PostgresWorldConfig =
noPreparedStatements === undefined
? config
: { ...config, noPreparedStatements };

const drizzle = createClient(pool);
const queue = createQueue(config, pool);
const queue = createQueue(queueConfig, pool);
const storage = createStorage(drizzle);
const streamer = createStreamer(pool, drizzle);

Expand Down
25 changes: 25 additions & 0 deletions packages/world-postgres/src/queue.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,31 @@ describe('postgres queue http execution', () => {
}
});

it('forwards noPreparedStatements to graphile-worker when configured', async () => {
const queue = buildQueue(
{ connectionString: 'postgres://test', noPreparedStatements: true },
pool
);
await queue.start();

expect(makeWorkerUtils).toHaveBeenCalledWith(
expect.objectContaining({ noPreparedStatements: true })
);
expect(run).toHaveBeenCalledWith(
expect.objectContaining({ noPreparedStatements: true })
);
});

it('omits noPreparedStatements by default to keep prepared statements enabled', async () => {
const queue = buildQueue({ connectionString: 'postgres://test' }, pool);
await queue.start();

const workerUtilsArgs = vi.mocked(makeWorkerUtils).mock.calls[0]?.[0];
const runArgs = vi.mocked(run).mock.calls[0]?.[0];
expect(workerUtilsArgs).not.toHaveProperty('noPreparedStatements');
expect(runArgs).not.toHaveProperty('noPreparedStatements');
});

it('queues producer delays and headers in graphile job metadata', async () => {
vi.useFakeTimers();
vi.setSystemTime(new Date('2024-01-01T00:00:00.000Z'));
Expand Down
4 changes: 4 additions & 0 deletions packages/world-postgres/src/queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -336,6 +336,9 @@ export function createQueue(
workerUtils = await makeWorkerUtils({
pgPool: pool,
logger: graphileLogger,
...(config.noPreparedStatements
? { noPreparedStatements: true }
: {}),
});
await workerUtils.migrate();
await migratePgBossJobs(workerUtils);
Expand Down Expand Up @@ -499,6 +502,7 @@ export function createQueue(
logger: graphileLogger,
pollInterval: 500, // 500ms = 0.5s (graphile-worker uses LISTEN/NOTIFY when available)
taskList,
...(config.noPreparedStatements ? { noPreparedStatements: true } : {}),
});
}

Expand Down
Loading