Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/world-postgres-listen-self-healing.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@workflow/world-postgres': patch
---

Make stream delivery durable across `LISTEN`/`NOTIFY` interruptions: the dedicated `pg.Client` now reconnects with bounded exponential backoff (250 ms → 30 s), and `streams.get` runs a periodic re-query of `streams WHERE chunk_id > lastChunkId` as a polling safety net for chunks delivered while the LISTEN socket was down. The poll interval is configurable via `PostgresWorldConfig.streamPollIntervalMs` (default 5000 ms; set to 0 to disable). Tracks vercel/workflow#1855.
8 changes: 8 additions & 0 deletions packages/world-postgres/src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,12 @@ export type PostgresWorldConfig = PgConnectionConfig & {
* Default is 10ms. Set to 0 for immediate flushing.
*/
streamFlushIntervalMs?: number;
/**
* How often (ms) `streams.get` re-queries the `streams` table as a safety
* net for chunks delivered while the LISTEN client was reconnecting.
* Default is 5000. Set to 0 to disable polling entirely.
*
* See `CreateStreamerOptions.pollIntervalMs` for the full contract.
*/
streamPollIntervalMs?: number;
};
4 changes: 3 additions & 1 deletion packages/world-postgres/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,9 @@ export function createWorld(
const drizzle = createClient(pool);
const queue = createQueue(config, pool);
const storage = createStorage(drizzle);
const streamer = createStreamer(pool, drizzle);
const streamer = createStreamer(pool, drizzle, {
pollIntervalMs: config.streamPollIntervalMs,
});

return {
specVersion: SPEC_VERSION_CURRENT,
Expand Down
238 changes: 210 additions & 28 deletions packages/world-postgres/src/streamer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,36 +45,123 @@ class Rc<T extends { drop(): void }> {

/**
* Subscribe to a PostgreSQL NOTIFY channel using a dedicated client created
* from the pool's connection options. `channel` must be a trusted identifier.
* from the pool's connection options. `channel` must be a trusted identifier
* (interpolated into the LISTEN statement; `pg` does not parameterise
* identifiers).
*
* The dedicated `Client` is long-lived and will eventually be dropped by the
* server (idle TCP timeout, pgbouncer rotation, k8s CNI eviction). Without
* reconnect handling, a process running for more than a few hours stops
* receiving notifications and only a restart restores delivery
* (cf. brianc/node-postgres#967).
*
* This implementation wraps the client in a reconnect loop with bounded
* exponential backoff (250 ms → 30 s cap). The initial connect must succeed
* (callers expect a live subscription before the promise resolves);
* subsequent reconnects are best-effort. Notifications fired while the
* dedicated client is reconnecting are lost on the wire — the polling
* fallback in {@link createStreamer}'s `streams.get` re-queries chunks from
* the database on its periodic tick, so end-to-end delivery stays correct
* even across LISTEN gaps.
*/
export const listenChannel = async (
pool: Pool,
channel: string,
onPayload: (payload: string) => Promise<void>
): Promise<{ close: () => Promise<void> }> => {
const client = new Client(pool.options);

try {
await client.connect();
await client.query(`LISTEN ${channel}`);
} catch (err) {
await client.end().catch(() => {});
throw err;
}
let client: Client | null = null;
let closed = false;
let reconnectAttempt = 0;
let reconnectTimer: ReturnType<typeof setTimeout> | null = null;

const onNotification = (msg: { payload?: string | undefined }) => {
onPayload(msg.payload ?? '').catch(() => {});
};

client.on('notification', onNotification);
const detach = (c: Client) => {
try {
c.removeListener('notification', onNotification);
} catch {
// listener may already be detached
}
c.end().catch(() => {});
};

const scheduleReconnect = () => {
if (closed || reconnectTimer) return;
const delay = Math.min(30_000, 250 * 2 ** reconnectAttempt);
reconnectAttempt++;
reconnectTimer = setTimeout(() => {
reconnectTimer = null;
if (closed) return;
connect().catch((err) => {
console.warn(
'[world-postgres listenChannel] reconnect failed',
(err as Error)?.message ?? err
);
scheduleReconnect();
});
}, delay);
};

const connect = async () => {
if (closed) return;
const next = new Client(pool.options);
next.on('error', (err) => {
console.warn(
'[world-postgres listenChannel] client error',
(err as Error)?.message ?? err
);
if (client === next) client = null;
detach(next);
scheduleReconnect();
});
next.on('end', () => {
if (closed) return;
if (client === next) client = null;
scheduleReconnect();
});
try {
await next.connect();
await next.query(`LISTEN ${channel}`);
} catch (err) {
await next.end().catch(() => {});
throw err;
}
// `close()` may have been called while we were awaiting connect/LISTEN.
// Without this guard the freshly-connected client would attach a
// notification listener and survive past `close()` — a slow reconnect
// could outlive the subscription it's meant to back.
if (closed) {
await next.end().catch(() => {});
return;
}
next.on('notification', onNotification);
client = next;
reconnectAttempt = 0;
};

await connect();

return {
close: async () => {
client.removeListener('notification', onNotification);
closed = true;
if (reconnectTimer) {
clearTimeout(reconnectTimer);
reconnectTimer = null;
}
const c = client;
client = null;
if (!c) return;
try {
await client.query(`UNLISTEN ${channel}`);
c.removeListener('notification', onNotification);
} catch {
// listener may already be detached
}
try {
await c.query(`UNLISTEN ${channel}`);
} finally {
await client.end();
await c.end().catch(() => {});
}
},
};
Expand All @@ -85,7 +172,28 @@ export type PostgresStreamer = Streamer & {
close(): Promise<void>;
};

export function createStreamer(pool: Pool, drizzle: Drizzle): PostgresStreamer {
export type CreateStreamerOptions = {
/**
* How often (ms) `streams.get` re-queries the `streams` table for chunks
* past `lastChunkId` as a safety net for notifications dropped while the
* LISTEN client was reconnecting. The poll dedupes against in-band
* notifications via the existing `enqueue` ordering check, so it is safe
* to run alongside `LISTEN/NOTIFY`.
*
* Lower values reduce recovery latency after a LISTEN disconnect; higher
* values reduce baseline DB load (one extra `SELECT` per active reader per
* tick). Set to `0` to disable polling — only do this if you know the
* LISTEN connection cannot be interrupted (e.g. tests). Default: 5000.
*/
pollIntervalMs?: number;
};

export function createStreamer(
pool: Pool,
drizzle: Drizzle,
options: CreateStreamerOptions = {}
): PostgresStreamer {
const pollIntervalMs = options.pollIntervalMs ?? 5_000;
const ulid = monotonicFactory();
const events = new EventEmitter<{
[key: `strm:${string}`]: [StreamChunkEvent];
Expand Down Expand Up @@ -361,17 +469,49 @@ export function createStreamer(pool: Pool, drizzle: Drizzle): PostgresStreamer {
let lastChunkId = '';
let offset = startIndex ?? 0;
let buffer = [] as StreamChunkEvent[] | null;
let polling = false;
let closed = false;
let pollTimer: ReturnType<typeof setInterval> | null = null;

function onData(data: StreamChunkEvent) {
if (buffer) {
buffer.push(data);
return;
}
enqueue(data);
}

// Idempotent teardown for the reader: detach the EventEmitter
// listener and clear the polling timer. Called both from
// `cancel()` (consumer aborts) and from `enqueue` on EOF
// (natural completion) so the polling timer doesn't keep
// ticking indefinitely after the controller has closed.
const stop = () => {
closed = true;
if (pollTimer) {
clearInterval(pollTimer);
pollTimer = null;
}
events.off(`strm:${name}`, onData);
};

function enqueue(msg: {
id: string;
data: Uint8Array;
eof: boolean;
}) {
if (lastChunkId >= msg.id) {
// already sent or out of order
if (closed || lastChunkId >= msg.id) {
// already sent, out of order, or stream torn down
return;
}

// Advance the high-water mark before any early return. The
// polling fallback re-queries `chunk_id > lastChunkId` and
// would otherwise re-enqueue chunks we intentionally skipped
// for `startIndex`, double-decrementing `offset` and
// eventually mis-delivering them.
lastChunkId = msg.id;

if (offset > 0) {
offset--;
return;
Expand All @@ -382,21 +522,12 @@ export function createStreamer(pool: Pool, drizzle: Drizzle): PostgresStreamer {
}
if (msg.eof) {
controller.close();
stop();
}
lastChunkId = msg.id;
}

function onData(data: StreamChunkEvent) {
if (buffer) {
buffer.push(data);
return;
}
enqueue(data);
}
events.on(`strm:${name}`, onData);
cleanups.push(() => {
events.off(`strm:${name}`, onData);
});
cleanups.push(stop);

const chunks = await drizzle
.select({
Expand All @@ -422,6 +553,57 @@ export function createStreamer(pool: Pool, drizzle: Drizzle): PostgresStreamer {
enqueue(chunk);
}
buffer = null;

// Polling fallback. NOTIFY is the fast path, but events are
// silently dropped while the dedicated LISTEN client is
// reconnecting. A light periodic re-query of chunks past
// `lastChunkId` is the always-on safety net: every
// `pollIntervalMs` it pulls any chunks the EventEmitter missed,
// deduped by the `enqueue` ordering check. The timer is cleared
// by `stop()` on EOF or cancel; transient query failures are
// logged so the next tick can retry.
const runPoll = async () => {
const fresh = await drizzle
.select({
id: streams.chunkId,
eof: streams.eof,
data: streams.chunkData,
})
.from(streams)
.where(
and(
eq(streams.streamId, name),
gt(streams.chunkId, lastChunkId as `chnk_${string}`)
)
)
.orderBy(streams.chunkId);
for (const chunk of fresh) {
if (closed) return;
enqueue(chunk);
}
};

const tick = async () => {
if (polling || closed) return;
polling = true;
try {
await runPoll();
} catch (err) {
// Best-effort. Logs only; the next tick retries.
console.warn(
'[world-postgres streams.get] poll failed',
(err as Error)?.message ?? err
);
} finally {
polling = false;
}
};

// Initial chunks may have already delivered EOF; in that case
// `stop()` has already set `closed`, so we never start polling.
if (!closed && pollIntervalMs > 0) {
pollTimer = setInterval(tick, pollIntervalMs);
}
},
cancel() {
cleanups.forEach((fn) => void fn());
Expand Down
Loading
Loading