diff --git a/.changeset/world-postgres-listen-self-healing.md b/.changeset/world-postgres-listen-self-healing.md
new file mode 100644
index 0000000000..1b1fe27728
--- /dev/null
+++ b/.changeset/world-postgres-listen-self-healing.md
@@ -0,0 +1,5 @@
+---
+'@workflow/world-postgres': patch
+---
+
+Make stream delivery durable across `LISTEN`/`NOTIFY` interruptions: the dedicated `pg.Client` now reconnects with bounded exponential backoff (250 ms → 30 s), and `streams.get` runs a periodic re-query of `streams WHERE chunk_id > lastChunkId` as a polling safety net for chunks delivered while the LISTEN socket was down. The poll interval is configurable via `PostgresWorldConfig.streamPollIntervalMs` (default 5000 ms; set to 0 to disable). Tracks vercel/workflow#1855.
diff --git a/packages/world-postgres/src/config.ts b/packages/world-postgres/src/config.ts
index ca778914ea..1c77020b39 100644
--- a/packages/world-postgres/src/config.ts
+++ b/packages/world-postgres/src/config.ts
@@ -12,4 +12,12 @@ export type PostgresWorldConfig = PgConnectionConfig & {
    * Default is 10ms. Set to 0 for immediate flushing.
    */
   streamFlushIntervalMs?: number;
+  /**
+   * How often (ms) `streams.get` re-queries the `streams` table as a safety
+   * net for chunks delivered while the LISTEN client was reconnecting.
+   * Default is 5000. Set to 0 to disable polling entirely.
+   *
+   * See `CreateStreamerOptions.pollIntervalMs` for the full contract.
+   */
+  streamPollIntervalMs?: number;
 };
diff --git a/packages/world-postgres/src/index.ts b/packages/world-postgres/src/index.ts
index 31fb1d84ac..eed68cbade 100644
--- a/packages/world-postgres/src/index.ts
+++ b/packages/world-postgres/src/index.ts
@@ -54,7 +54,9 @@ export function createWorld(
   const drizzle = createClient(pool);
   const queue = createQueue(config, pool);
   const storage = createStorage(drizzle);
-  const streamer = createStreamer(pool, drizzle);
+  const streamer = createStreamer(pool, drizzle, {
+    pollIntervalMs: config.streamPollIntervalMs,
+  });
 
   return {
     specVersion: SPEC_VERSION_CURRENT,
diff --git a/packages/world-postgres/src/streamer.ts b/packages/world-postgres/src/streamer.ts
index a79c3ebd75..dee4184491 100644
--- a/packages/world-postgres/src/streamer.ts
+++ b/packages/world-postgres/src/streamer.ts
@@ -45,36 +45,123 @@ class Rc {
 }
 
 /**
  * Subscribe to a PostgreSQL NOTIFY channel using a dedicated client created
- * from the pool's connection options. `channel` must be a trusted identifier.
+ * from the pool's connection options. `channel` must be a trusted identifier
+ * (interpolated into the LISTEN statement; `pg` does not parameterise
+ * identifiers).
+ *
+ * The dedicated `Client` is long-lived and will eventually be dropped by the
+ * server (idle TCP timeout, pgbouncer rotation, k8s CNI eviction). Without
+ * reconnect handling, a process running for more than a few hours stops
+ * receiving notifications and only a restart restores delivery
+ * (cf. brianc/node-postgres#967).
+ *
+ * This implementation wraps the client in a reconnect loop with bounded
+ * exponential backoff (250 ms → 30 s cap). The initial connect must succeed
+ * (callers expect a live subscription before the promise resolves);
+ * subsequent reconnects are best-effort. Notifications fired while the
+ * dedicated client is reconnecting are lost on the wire — the polling
+ * fallback in {@link createStreamer}'s `streams.get` re-queries chunks from
+ * the database on its periodic tick, so end-to-end delivery stays correct
+ * even across LISTEN gaps.
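+ *
+ * @example
+ * A minimal usage sketch — the channel name and handler body are
+ * illustrative, not part of this module's API:
+ * ```ts
+ * const sub = await listenChannel(pool, 'workflow_events', async (payload) => {
+ *   console.log('NOTIFY payload (empty string if none):', payload);
+ * });
+ * // ... later, on shutdown — idempotent and safe to call mid-reconnect:
+ * await sub.close();
+ * ```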
  */
 export const listenChannel = async (
   pool: Pool,
   channel: string,
   onPayload: (payload: string) => Promise<void>
 ): Promise<{ close: () => Promise<void> }> => {
-  const client = new Client(pool.options);
-
-  try {
-    await client.connect();
-    await client.query(`LISTEN ${channel}`);
-  } catch (err) {
-    await client.end().catch(() => {});
-    throw err;
-  }
+  let client: Client | null = null;
+  let closed = false;
+  let reconnectAttempt = 0;
+  let reconnectTimer: ReturnType<typeof setTimeout> | null = null;
 
   const onNotification = (msg: { payload?: string | undefined }) => {
     onPayload(msg.payload ?? '').catch(() => {});
   };
 
-  client.on('notification', onNotification);
+  const detach = (c: Client) => {
+    try {
+      c.removeListener('notification', onNotification);
+    } catch {
+      // listener may already be detached
+    }
+    c.end().catch(() => {});
+  };
+
+  const scheduleReconnect = () => {
+    if (closed || reconnectTimer) return;
+    const delay = Math.min(30_000, 250 * 2 ** reconnectAttempt);
+    reconnectAttempt++;
+    reconnectTimer = setTimeout(() => {
+      reconnectTimer = null;
+      if (closed) return;
+      connect().catch((err) => {
+        console.warn(
+          '[world-postgres listenChannel] reconnect failed',
+          (err as Error)?.message ?? err
+        );
+        scheduleReconnect();
+      });
+    }, delay);
+  };
+
+  const connect = async () => {
+    if (closed) return;
+    const next = new Client(pool.options);
+    next.on('error', (err) => {
+      console.warn(
+        '[world-postgres listenChannel] client error',
+        (err as Error)?.message ?? err
+      );
+      if (client === next) client = null;
+      detach(next);
+      scheduleReconnect();
+    });
+    next.on('end', () => {
+      if (closed) return;
+      if (client === next) client = null;
+      scheduleReconnect();
+    });
+    try {
+      await next.connect();
+      await next.query(`LISTEN ${channel}`);
+    } catch (err) {
+      await next.end().catch(() => {});
+      throw err;
+    }
+    // `close()` may have been called while we were awaiting connect/LISTEN.
+    // Without this guard the freshly-connected client would attach a
+    // notification listener and survive past `close()` — a slow reconnect
+    // could outlive the subscription it's meant to back.
+    if (closed) {
+      await next.end().catch(() => {});
+      return;
+    }
+    next.on('notification', onNotification);
+    client = next;
+    reconnectAttempt = 0;
+  };
+
+  await connect();
 
   return {
     close: async () => {
-      client.removeListener('notification', onNotification);
+      closed = true;
+      if (reconnectTimer) {
+        clearTimeout(reconnectTimer);
+        reconnectTimer = null;
+      }
+      const c = client;
+      client = null;
+      if (!c) return;
       try {
-        await client.query(`UNLISTEN ${channel}`);
+        c.removeListener('notification', onNotification);
+      } catch {
+        // listener may already be detached
+      }
+      try {
+        await c.query(`UNLISTEN ${channel}`);
       } finally {
-        await client.end();
+        await c.end().catch(() => {});
       }
     },
   };
@@ -85,7 +172,28 @@
 export type PostgresStreamer = Streamer & {
   close(): Promise<void>;
 };
 
-export function createStreamer(pool: Pool, drizzle: Drizzle): PostgresStreamer {
+export type CreateStreamerOptions = {
+  /**
+   * How often (ms) `streams.get` re-queries the `streams` table for chunks
+   * past `lastChunkId` as a safety net for notifications dropped while the
+   * LISTEN client was reconnecting. The poll dedupes against in-band
+   * notifications via the existing `enqueue` ordering check, so it is safe
+   * to run alongside `LISTEN`/`NOTIFY`.
+   *
+   * Lower values reduce recovery latency after a LISTEN disconnect; higher
+   * values reduce baseline DB load (one extra `SELECT` per active reader per
+   * tick). Set to `0` to disable polling — only do this if you know the
+   * LISTEN connection cannot be interrupted (e.g. tests). Default: 5000.
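+   *
+   * @example
+   * Back-of-envelope cost (numbers illustrative): 100 concurrent readers at
+   * the 5000 ms default add ~20 extra SELECTs/s; at 1000 ms, ~100/s.
+   * ```ts
+   * // favour fast recovery over DB quiet time:
+   * const streamer = createStreamer(pool, drizzle, { pollIntervalMs: 1_000 });
+   * ```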
+   */
+  pollIntervalMs?: number;
+};
+
+export function createStreamer(
+  pool: Pool,
+  drizzle: Drizzle,
+  options: CreateStreamerOptions = {}
+): PostgresStreamer {
+  const pollIntervalMs = options.pollIntervalMs ?? 5_000;
   const ulid = monotonicFactory();
   const events = new EventEmitter<{
     [key: `strm:${string}`]: [StreamChunkEvent];
   }>();
@@ -361,17 +469,49 @@ export function createStreamer(pool: Pool, drizzle: Drizzle): PostgresStreamer {
         let lastChunkId = '';
         let offset = startIndex ?? 0;
         let buffer = [] as StreamChunkEvent[] | null;
+        let polling = false;
+        let closed = false;
+        let pollTimer: ReturnType<typeof setInterval> | null = null;
+
+        function onData(data: StreamChunkEvent) {
+          if (buffer) {
+            buffer.push(data);
+            return;
+          }
+          enqueue(data);
+        }
+
+        // Idempotent teardown for the reader: detach the EventEmitter
+        // listener and clear the polling timer. Called both from
+        // `cancel()` (consumer aborts) and from `enqueue` on EOF
+        // (natural completion) so the polling timer doesn't keep
+        // ticking indefinitely after the controller has closed.
+        const stop = () => {
+          closed = true;
+          if (pollTimer) {
+            clearInterval(pollTimer);
+            pollTimer = null;
+          }
+          events.off(`strm:${name}`, onData);
+        };
 
         function enqueue(msg: {
           id: string;
           data: Uint8Array;
           eof: boolean;
         }) {
-          if (lastChunkId >= msg.id) {
-            // already sent or out of order
+          if (closed || lastChunkId >= msg.id) {
+            // already sent, out of order, or stream torn down
             return;
           }
+          // Advance the high-water mark before any early return. The
+          // polling fallback re-queries `chunk_id > lastChunkId` and
+          // would otherwise re-enqueue chunks we intentionally skipped
+          // for `startIndex`, double-decrementing `offset` and
+          // eventually mis-delivering them.
+          lastChunkId = msg.id;
+
           if (offset > 0) {
             offset--;
             return;
@@ -382,21 +522,12 @@ export function createStreamer(pool: Pool, drizzle: Drizzle): PostgresStreamer {
           }
           if (msg.eof) {
             controller.close();
+            stop();
           }
-          lastChunkId = msg.id;
         }
 
-        function onData(data: StreamChunkEvent) {
-          if (buffer) {
-            buffer.push(data);
-            return;
-          }
-          enqueue(data);
-        }
         events.on(`strm:${name}`, onData);
-        cleanups.push(() => {
-          events.off(`strm:${name}`, onData);
-        });
+        cleanups.push(stop);
 
         const chunks = await drizzle
           .select({
@@ -422,6 +553,57 @@ export function createStreamer(pool: Pool, drizzle: Drizzle): PostgresStreamer {
           enqueue(chunk);
         }
         buffer = null;
+
+        // Polling fallback. NOTIFY is the fast path, but events are
+        // silently dropped while the dedicated LISTEN client is
+        // reconnecting. A light periodic re-query of chunks past
+        // `lastChunkId` is the always-on safety net: every
+        // `pollIntervalMs` it pulls in any chunks the EventEmitter
+        // missed, deduped by the `enqueue` ordering check. The timer is
+        // cleared by `stop()` on EOF or cancel; transient query failures
+        // are logged so the next tick can retry.
+        const runPoll = async () => {
+          const fresh = await drizzle
+            .select({
+              id: streams.chunkId,
+              eof: streams.eof,
+              data: streams.chunkData,
+            })
+            .from(streams)
+            .where(
+              and(
+                eq(streams.streamId, name),
+                gt(streams.chunkId, lastChunkId as `chnk_${string}`)
+              )
+            )
+            .orderBy(streams.chunkId);
+          for (const chunk of fresh) {
+            if (closed) return;
+            enqueue(chunk);
+          }
+        };
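+
+        // Worked example of the dedupe (ids illustrative): NOTIFY delivers
+        // chnk_A in-band and `lastChunkId` advances to chnk_A. A poll tick
+        // racing that delivery may have captured the older `lastChunkId`
+        // and still return chnk_A from its `chunk_id > lastChunkId` query;
+        // `enqueue` then drops it via the `lastChunkId >= msg.id` check,
+        // so the reader never sees a duplicate.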
+
+        const tick = async () => {
+          if (polling || closed) return;
+          polling = true;
+          try {
+            await runPoll();
+          } catch (err) {
+            // Best-effort: logged only; the next tick retries.
+            console.warn(
+              '[world-postgres streams.get] poll failed',
+              (err as Error)?.message ?? err
+            );
+          } finally {
+            polling = false;
+          }
+        };
+
+        // Initial chunks may have already delivered EOF; in that case
+        // `stop()` has already run and `closed` is set, so we never start
+        // polling at all.
+        if (!closed && pollIntervalMs > 0) {
+          pollTimer = setInterval(tick, pollIntervalMs);
+        }
       },
       cancel() {
         cleanups.forEach((fn) => void fn());
diff --git a/packages/world-postgres/test/streamer.test.ts b/packages/world-postgres/test/streamer.test.ts
new file mode 100644
index 0000000000..5c9d9b1735
--- /dev/null
+++ b/packages/world-postgres/test/streamer.test.ts
@@ -0,0 +1,346 @@
+import { execSync } from 'node:child_process';
+import { PostgreSqlContainer } from '@testcontainers/postgresql';
+import { Pool } from 'pg';
+import { monotonicFactory } from 'ulid';
+import {
+  afterAll,
+  beforeAll,
+  beforeEach,
+  describe,
+  expect,
+  it,
+  test,
+  vi,
+} from 'vitest';
+import { createClient, type Drizzle } from '../src/drizzle/index.js';
+import { createStreamer, listenChannel } from '../src/streamer.js';
+
+async function readNext(
+  reader: ReadableStreamDefaultReader<Uint8Array>,
+  timeoutMs: number
+): Promise<ReadableStreamReadResult<Uint8Array>> {
+  let timer: ReturnType<typeof setTimeout> | undefined;
+  try {
+    return await Promise.race([
+      reader.read(),
+      new Promise<never>((_, reject) => {
+        timer = setTimeout(
+          () => reject(new Error(`read() timed out after ${timeoutMs}ms`)),
+          timeoutMs
+        );
+      }),
+    ]);
+  } finally {
+    if (timer) clearTimeout(timer);
+  }
+}
+
+async function killOtherBackends(connectionString: string): Promise<void> {
+  // Terminate every backend in this database except the one running the
+  // query — that includes the dedicated LISTEN client and any idle pool
+  // connections. We use a throwaway pool so the issuing connection itself
+  // doesn't survive in the shared pool (where a later acquire could pick it
+  // up and emit a stale-socket error).
+  const killer = new Pool({ connectionString, max: 1 });
+  try {
+    killer.on('error', () => {
+      /* swallow tear-down errors */
+    });
+    await killer.query(
+      `SELECT pg_terminate_backend(pid)
+         FROM pg_stat_activity
+        WHERE pid <> pg_backend_pid()
+          AND datname = current_database()`
+    );
+  } finally {
+    await killer.end().catch(() => {});
+  }
+}
+
+describe('Streamer (Postgres integration)', () => {
+  if (process.platform === 'win32') {
+    test.skip('skipped on Windows since it relies on a Docker container', () => {});
+    return;
+  }
+
+  let container: Awaited<ReturnType<PostgreSqlContainer['start']>>;
+  let connectionString: string;
+  let pool: Pool;
+  let drizzle: Drizzle;
+  const ulid = monotonicFactory();
+
+  beforeAll(async () => {
+    container = await new PostgreSqlContainer('postgres:15-alpine').start();
+    connectionString = container.getConnectionUri();
+    process.env.DATABASE_URL = connectionString;
+    process.env.WORKFLOW_POSTGRES_URL = connectionString;
+
+    execSync('pnpm db:push', {
+      stdio: 'inherit',
+      cwd: process.cwd(),
+      env: process.env,
+    });
+
+    pool = new Pool({ connectionString, max: 4 });
+    // Stale-socket errors can surface on idle pool clients after we
+    // pg_terminate_backend them. The pool re-acquires fresh connections
+    // automatically; we just need to keep the error event from going
+    // unhandled.
+    pool.on('error', () => {
+      /* swallow */
+    });
+    drizzle = createClient(pool);
+  }, 120_000);
+
+  beforeEach(async () => {
+    await pool.query(
+      'TRUNCATE TABLE workflow.workflow_stream_chunks RESTART IDENTITY CASCADE'
+    );
+  });
+
+  afterAll(async () => {
+    await pool.end();
+    await container.stop();
+  });
+
+  it('polling fallback delivers chunks inserted without NOTIFY', async () => {
+    // Polling-only path: short interval, no LISTEN involvement (we insert
+    // rows via raw SQL so NOTIFY never fires for them). The reader must
+    // still observe both the data chunk and EOF, proving the safety-net
+    // SELECT in `streams.get` is wired up correctly.
+    const streamer = createStreamer(pool, drizzle, { pollIntervalMs: 100 });
+    try {
+      const streamName = 'stream-poll-only';
+      const stream = await streamer.streams.get('run-1', streamName);
+      const reader = stream.getReader();
+
+      // Give createStreamer's LISTEN subscription a moment to settle so the
+      // initial DB read in start() has run. The polling tick fires every
+      // 100 ms.
+      await new Promise((r) => setTimeout(r, 50));
+
+      // Insert directly — bypass streams.write so no pg_notify fires.
+      const chunkId = `chnk_${ulid()}`;
+      await pool.query(
+        `INSERT INTO workflow.workflow_stream_chunks (id, stream_id, run_id, data, eof)
+         VALUES ($1, $2, $3, $4, false)`,
+        [chunkId, streamName, 'run-1', Buffer.from([1, 2, 3])]
+      );
+
+      const first = await readNext(reader, 3_000);
+      expect(first.done).toBe(false);
+      expect(Array.from(first.value ?? [])).toEqual([1, 2, 3]);
+
+      // Insert the EOF marker — also via raw SQL, so it too relies on
+      // polling.
+      const eofId = `chnk_${ulid()}`;
+      await pool.query(
+        `INSERT INTO workflow.workflow_stream_chunks (id, stream_id, run_id, data, eof)
+         VALUES ($1, $2, $3, $4, true)`,
+        [eofId, streamName, 'run-1', Buffer.from([])]
+      );
+
+      const second = await readNext(reader, 3_000);
+      expect(second.done).toBe(true);
+
+      reader.releaseLock();
+    } finally {
+      await streamer.close();
+    }
+  }, 15_000);
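+
+  // Timeline the next test exercises (annotation only; timings are
+  // approximate): t0 LISTEN established → t1 pg_terminate_backend drops the
+  // dedicated client → t2 streams.write INSERTs the chunk and its NOTIFY
+  // reaches no live listener → t3 a poll tick (≤100 ms later) re-queries
+  // `chunk_id > lastChunkId` and delivers the chunk anyway.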
+
+  it('reader recovers after LISTEN backend is terminated', async () => {
+    // End-to-end resilience: kill every backend (drops the dedicated LISTEN
+    // client), then publish a chunk via streams.write. The NOTIFY for that
+    // chunk reaches no live listener, so the only path to the reader is the
+    // polling fallback. We assert the chunk still arrives.
+    const streamer = createStreamer(pool, drizzle, { pollIntervalMs: 100 });
+    try {
+      const streamName = 'stream-reconnect';
+      const stream = await streamer.streams.get('run-2', streamName);
+      const reader = stream.getReader();
+
+      // Wait for LISTEN to be established before terminating it.
+      await new Promise((r) => setTimeout(r, 200));
+
+      await killOtherBackends(connectionString);
+
+      // The pool may briefly hand out the now-dead connection on the first
+      // call; pg.Pool retries internally, but if our write itself races the
+      // kill we retry up to three times. This mirrors what application code
+      // would do.
+      let writeOk = false;
+      for (let attempt = 0; attempt < 3 && !writeOk; attempt++) {
+        try {
+          await streamer.streams.write(
+            'run-2',
+            streamName,
+            new Uint8Array([7, 8, 9])
+          );
+          writeOk = true;
+        } catch {
+          await new Promise((r) => setTimeout(r, 100));
+        }
+      }
+      expect(writeOk).toBe(true);
+
+      const first = await readNext(reader, 5_000);
+      expect(first.done).toBe(false);
+      expect(Array.from(first.value ?? [])).toEqual([7, 8, 9]);
+
+      reader.releaseLock();
+    } finally {
+      await streamer.close();
+    }
+  }, 20_000);
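+
+  // Sketch of the regression the next test pins down (hypothetical ids
+  // A < B < C, startIndex = 2, so `offset` starts at 2): if `enqueue`
+  // skipped A and B without advancing `lastChunkId`, the first poll would
+  // decrement `offset` to 0 against them, and the second poll — re-querying
+  // `chunk_id > lastChunkId`, still '' — would fetch A and B again and
+  // deliver them with `offset` exhausted. Advancing the high-water mark on
+  // every skip makes the skip idempotent.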
+
+  it('startIndex skip is idempotent across polling ticks', async () => {
+    // Regression test: with polling enabled, `streams.get` re-queries
+    // `chunk_id > lastChunkId` every tick. If `enqueue` skipped a chunk for
+    // `startIndex` without advancing `lastChunkId`, the same chunk would
+    // come back on the next poll and be skipped again — eventually
+    // exhausting `offset` against the same physical chunks and delivering
+    // them anyway.
+    const streamer = createStreamer(pool, drizzle, { pollIntervalMs: 100 });
+    try {
+      const streamName = 'stream-startindex-poll';
+
+      // Pre-populate two chunks. With startIndex=2 they must be skipped.
+      const idA = `chnk_${ulid()}`;
+      const idB = `chnk_${ulid()}`;
+      await pool.query(
+        `INSERT INTO workflow.workflow_stream_chunks (id, stream_id, run_id, data, eof)
+         VALUES ($1, $2, $3, $4, false), ($5, $2, $3, $6, false)`,
+        [
+          idA,
+          streamName,
+          'run-3',
+          Buffer.from([0xaa]),
+          idB,
+          Buffer.from([0xbb]),
+        ]
+      );
+
+      const stream = await streamer.streams.get('run-3', streamName, 2);
+      const reader = stream.getReader();
+
+      // Let several poll ticks run with only the two skip-chunks in the DB.
+      // If the bug regresses, polling will eventually deliver one of them.
+      await new Promise((r) => setTimeout(r, 500));
+
+      // Append a third chunk — this is the one the reader should receive.
+      const idC = `chnk_${ulid()}`;
+      await pool.query(
+        `INSERT INTO workflow.workflow_stream_chunks (id, stream_id, run_id, data, eof)
+         VALUES ($1, $2, $3, $4, false)`,
+        [idC, streamName, 'run-3', Buffer.from([0xcc])]
+      );
+
+      const next = await readNext(reader, 3_000);
+      expect(next.done).toBe(false);
+      expect(Array.from(next.value ?? [])).toEqual([0xcc]);
+
+      reader.releaseLock();
+    } finally {
+      await streamer.close();
+    }
+  }, 15_000);
+
+  it('clears polling timer on natural EOF (no resource leak)', async () => {
+    // The polling fallback registers a `setInterval`. Without explicit
+    // teardown on EOF, the timer keeps ticking forever once the consumer
+    // has finished reading — a slow drift that holds the event loop alive
+    // and accumulates per long-running stream. This test pins down the
+    // contract: after `done: true`, every `setInterval` we created must
+    // have been cleared.
+    const setSpy = vi.spyOn(globalThis, 'setInterval');
+    const clearSpy = vi.spyOn(globalThis, 'clearInterval');
+    const POLL_MS = 73;
+
+    const streamer = createStreamer(pool, drizzle, {
+      pollIntervalMs: POLL_MS,
+    });
+    try {
+      const streamName = 'stream-eof-cleanup';
+      const stream = await streamer.streams.get('run-eof', streamName);
+      const reader = stream.getReader();
+
+      await streamer.streams.write(
+        'run-eof',
+        streamName,
+        new Uint8Array([1, 2])
+      );
+      await streamer.streams.close('run-eof', streamName);
+
+      const first = await readNext(reader, 3_000);
+      expect(first.done).toBe(false);
+
+      const final = await readNext(reader, 3_000);
+      expect(final.done).toBe(true);
+
+      // Find every interval the streamer scheduled at our test poll cadence
+      // (filter by delay so any unrelated intervals from other libraries
+      // don't pollute the assertion). Each one must appear among the
+      // clearInterval calls. We do NOT call reader.cancel() — the point of
+      // this test is that EOF alone is enough to release resources.
+      const ourIntervalIds = setSpy.mock.results
+        .filter((_, i) => setSpy.mock.calls[i][1] === POLL_MS)
+        .map((r) => r.value);
+      expect(ourIntervalIds.length).toBeGreaterThan(0);
+
+      const clearedIds = new Set(clearSpy.mock.calls.map((c) => c[0]));
+      for (const id of ourIntervalIds) {
+        expect(clearedIds.has(id)).toBe(true);
+      }
+
+      reader.releaseLock();
+    } finally {
+      setSpy.mockRestore();
+      clearSpy.mockRestore();
+      await streamer.close();
+    }
+  }, 10_000);
+
+  it('listenChannel reconnects after its backend is terminated', async () => {
+    // Low-level test for listenChannel itself: terminate its dedicated
+    // backend, wait past the initial 250 ms backoff, then fire a NOTIFY.
+    // A successful reconnect means the payload reaches our handler.
+    const received: string[] = [];
+    const sub = await listenChannel(
+      pool,
+      'workflow_test_reconnect',
+      async (payload) => {
+        received.push(payload);
+      }
+    );
+
+    try {
+      await new Promise((r) => setTimeout(r, 100));
+
+      await killOtherBackends(connectionString);
+
+      // Backoff is 250 ms → 30 s; the first reconnect attempt fires at
+      // ~250 ms. Wait long enough for at least one reconnect cycle to
+      // succeed.
+      await new Promise((r) => setTimeout(r, 1_500));
+
+      // The NOTIFY is retried in case the issuing pool connection itself is
+      // racing a reconnect.
+      let notified = false;
+      for (let attempt = 0; attempt < 3 && !notified; attempt++) {
+        try {
+          await pool.query(`SELECT pg_notify('workflow_test_reconnect', $1)`, [
+            'hello-after-reconnect',
+          ]);
+          notified = true;
+        } catch {
+          await new Promise((r) => setTimeout(r, 200));
+        }
+      }
+      expect(notified).toBe(true);
+
+      // Allow propagation through the reconnected client.
+      const deadline = Date.now() + 5_000;
+      while (received.length === 0 && Date.now() < deadline) {
+        await new Promise((r) => setTimeout(r, 100));
+      }
+
+      expect(received).toContain('hello-after-reconnect');
+    } finally {
+      await sub.close();
+    }
+  }, 20_000);
+});