diff --git a/.changeset/huge-guests-smash.md b/.changeset/huge-guests-smash.md new file mode 100644 index 0000000000..6fc0470cab --- /dev/null +++ b/.changeset/huge-guests-smash.md @@ -0,0 +1,5 @@ +--- +"@workflow/world-vercel": patch +--- + +Propagate client stream cancellation diff --git a/.changeset/stream-reconnect-at-getreadable.md b/.changeset/stream-reconnect-at-getreadable.md new file mode 100644 index 0000000000..0850af0ce9 --- /dev/null +++ b/.changeset/stream-reconnect-at-getreadable.md @@ -0,0 +1,5 @@ +--- +'@workflow/core': patch +--- + +Auto-reconnect `getReadable()` streams on server close or transient errors diff --git a/.changeset/stream-reconnect-stable.md b/.changeset/stream-reconnect-stable.md deleted file mode 100644 index 150734b2fa..0000000000 --- a/.changeset/stream-reconnect-stable.md +++ /dev/null @@ -1,5 +0,0 @@ ---- -'@workflow/world-vercel': patch ---- - -Use stream control frame to transparently reconnect on server timeout diff --git a/docs/content/docs/deploying/world/vercel-world.mdx b/docs/content/docs/deploying/world/vercel-world.mdx index dcfaef57db..dc2d9a1d0e 100644 --- a/docs/content/docs/deploying/world/vercel-world.mdx +++ b/docs/content/docs/deploying/world/vercel-world.mdx @@ -48,6 +48,8 @@ For self-hosted deployments, use the [Postgres World](/worlds/postgres). For loc - **Data residency** - The Vercel World is currently deployed in the `iad1` region. This means independently of the deployment location of your application, the data for your workflows will be stored in the `iad1` region. +- **Stream routes need `supportsCancellation`** - Routes that pipe `run.getReadable()` back to a client keep running — and billing — until the function's max duration, even after the client disconnects. Set [`supportsCancellation`](https://vercel.com/docs/functions/request-cancellation) in `vercel.json` for those routes so client aborts tear the invocation down. 
See [Streaming — Resuming Streams from a Specific Point](/docs/foundations/streaming#resuming-streams-from-a-specific-point). + ## Observability Workflow observability is built into the Vercel dashboard on your project page. It respects your existing authentication and project permission settings. diff --git a/docs/content/docs/foundations/streaming.mdx b/docs/content/docs/foundations/streaming.mdx index b21a0ea01c..7773af2f83 100644 --- a/docs/content/docs/foundations/streaming.mdx +++ b/docs/content/docs/foundations/streaming.mdx @@ -87,6 +87,12 @@ export async function GET( This allows clients to reconnect and continue receiving data from where they left off, rather than restarting from the beginning. + +**Vercel: long-lived stream routes need `supportsCancellation`** + +When a route like the one above pipes `run.getReadable()` out to a client on Vercel, the function keeps running — and billing — until the function's configured max duration, even after the client disconnects. Set [`supportsCancellation`](https://vercel.com/docs/functions/request-cancellation) in `vercel.json` for routes that stream workflow output so Vercel forwards the client abort signal and tears the invocation down when the client goes away. + + `startIndex` also supports **negative values** to read relative to the end of the stream. For example, `startIndex: -5` starts 5 chunks before the current end. This is useful when you want to show the most recent output without reading the entire stream history. On an active (not-yet-closed) stream, the negative index resolves relative to the chunk count at connection time; any chunks written afterward are still delivered normally. 
diff --git a/packages/core/src/reconnecting-framed-stream.test.ts b/packages/core/src/reconnecting-framed-stream.test.ts new file mode 100644 index 0000000000..72105ac726 --- /dev/null +++ b/packages/core/src/reconnecting-framed-stream.test.ts @@ -0,0 +1,241 @@ +import type { World } from '@workflow/world'; +import { afterEach, describe, expect, it, vi } from 'vitest'; + +vi.mock('./version.js', () => ({ version: '0.0.0-test' })); + +import { setWorld } from './runtime/world.js'; +import { createReconnectingFramedStream } from './serialization.js'; + +const FRAME_HEADER_SIZE = 4; + +function encodeFrame(payload: Uint8Array): Uint8Array { + const out = new Uint8Array(FRAME_HEADER_SIZE + payload.length); + new DataView(out.buffer).setUint32(0, payload.length, false); + out.set(payload, FRAME_HEADER_SIZE); + return out; +} + +function payloadFrame(n: number): Uint8Array { + return encodeFrame(new Uint8Array([n])); +} + +/** + * Build a stream from a scripted pull sequence. Each entry either + * enqueues a value or errors — this keeps the stream from transitioning + * to the errored state before earlier values are actually read (which + * `start()`-time `controller.error` does immediately). 
+ */ +function scriptedStream( + steps: Array< + | { kind: 'value'; value: Uint8Array } + | { kind: 'error'; err: unknown } + | { kind: 'close' } + >, + onCancel?: (reason?: unknown) => void +): ReadableStream { + let i = 0; + return new ReadableStream({ + pull(controller) { + const step = steps[i++]; + if (!step) { + controller.close(); + return; + } + if (step.kind === 'value') controller.enqueue(step.value); + else if (step.kind === 'error') controller.error(step.err); + else controller.close(); + }, + cancel(reason) { + onCancel?.(reason); + }, + }); +} + +async function readAll( + stream: ReadableStream +): Promise { + const reader = stream.getReader(); + const chunks: Uint8Array[] = []; + for (;;) { + const r = await reader.read(); + if (r.done) break; + if (r.value) chunks.push(r.value); + } + return chunks; +} + +/** + * Builds a mock world whose readFromStream returns a prepared + * sequence per `startIndex`. Each call records the requested startIndex + * so assertions can check reconnect positioning. + */ +function makeWorldWithScriptedStreams( + scripts: Record ReadableStream> +): { world: World; calls: number[] } { + const calls: number[] = []; + const world = { + readFromStream: vi.fn(async (_name: string, startIndex?: number) => { + const idx = startIndex ?? 
0; + calls.push(idx); + const factory = scripts[idx]; + if (!factory) { + throw new Error(`unexpected startIndex ${idx}`); + } + return factory(); + }), + } as unknown as World; + return { world, calls }; +} + +describe('createReconnectingFramedStream', () => { + afterEach(() => { + setWorld(undefined as unknown as World); + }); + + it('passes through complete frames and closes cleanly on EOF', async () => { + const { world, calls } = makeWorldWithScriptedStreams({ + 0: () => + scriptedStream([ + { kind: 'value', value: payloadFrame(1) }, + { kind: 'value', value: payloadFrame(2) }, + { kind: 'value', value: payloadFrame(3) }, + { kind: 'close' }, + ]), + }); + setWorld(world); + + const stream = createReconnectingFramedStream('s', 0); + const chunks = await readAll(stream); + + expect(chunks).toEqual([payloadFrame(1), payloadFrame(2), payloadFrame(3)]); + expect(calls).toEqual([0]); + }); + + it('forwards a frame delivered across multiple reads', async () => { + const full = payloadFrame(42); + const { world } = makeWorldWithScriptedStreams({ + 0: () => + scriptedStream([ + // Split frame into 3 byte-level reads to prove boundary-aware + // buffering works regardless of transport chunking. + { kind: 'value', value: full.slice(0, 2) }, + { kind: 'value', value: full.slice(2, 4) }, + { kind: 'value', value: full.slice(4) }, + { kind: 'close' }, + ]), + }); + setWorld(world); + + const stream = createReconnectingFramedStream('s', 0); + const chunks = await readAll(stream); + + expect(chunks).toHaveLength(1); + expect(chunks[0]).toEqual(full); + }); + + it('reconnects with startIndex = consumed count on upstream error', async () => { + const { world, calls } = makeWorldWithScriptedStreams({ + 0: () => + scriptedStream([ + { kind: 'value', value: payloadFrame(1) }, + { kind: 'value', value: payloadFrame(2) }, + // Simulate server 2-minute abort mid-frame: deliver the first + // 3 bytes of a frame then error. 
The wrapper should discard + // those partial bytes and reopen at the right index. + { kind: 'value', value: payloadFrame(3).slice(0, 3) }, + { kind: 'error', err: new Error('max-duration abort') }, + ]), + 2: () => + scriptedStream([ + { kind: 'value', value: payloadFrame(3) }, + { kind: 'value', value: payloadFrame(4) }, + { kind: 'close' }, + ]), + }); + setWorld(world); + + const stream = createReconnectingFramedStream('s', 0); + const chunks = await readAll(stream); + + expect(chunks).toEqual([ + payloadFrame(1), + payloadFrame(2), + payloadFrame(3), + payloadFrame(4), + ]); + // First connection: startIndex=0. After 2 frames consumed, reconnect + // opens a fresh stream at startIndex=2. + expect(calls).toEqual([0, 2]); + }); + + it('respects an initial non-zero startIndex on reconnect', async () => { + const { world, calls } = makeWorldWithScriptedStreams({ + 10: () => + scriptedStream([ + { kind: 'value', value: payloadFrame(10) }, + { kind: 'error', err: new Error('abort') }, + ]), + 11: () => + scriptedStream([ + { kind: 'value', value: payloadFrame(11) }, + { kind: 'close' }, + ]), + }); + setWorld(world); + + const stream = createReconnectingFramedStream('s', 10); + const chunks = await readAll(stream); + + expect(chunks).toEqual([payloadFrame(10), payloadFrame(11)]); + expect(calls).toEqual([10, 11]); + }); + + it('does not reconnect when startIndex is negative', async () => { + const { world, calls } = makeWorldWithScriptedStreams({ + [-5]: () => + scriptedStream([ + { kind: 'value', value: payloadFrame(99) }, + { kind: 'error', err: new Error('abort') }, + ]), + }); + setWorld(world); + + const stream = createReconnectingFramedStream('s', -5); + await expect(readAll(stream)).rejects.toThrow(/abort/); + expect(calls).toEqual([-5]); + }); + + it('cancel aborts the upstream reader', async () => { + const cancelSpy = vi.fn(); + const { world } = makeWorldWithScriptedStreams({ + 0: () => { + // Keep the upstream pending after the first value so cancel + // 
actually has a live stream to abort; an auto-closed upstream + // would swallow the cancel per web-streams spec. + let pulls = 0; + return new ReadableStream({ + async pull(controller) { + if (pulls++ === 0) { + controller.enqueue(payloadFrame(1)); + return; + } + await new Promise(() => {}); // hang forever + }, + cancel(reason) { + cancelSpy(reason); + }, + }); + }, + }); + setWorld(world); + + const stream = createReconnectingFramedStream('s', 0); + const reader = stream.getReader(); + const first = await reader.read(); + expect(first.done).toBe(false); + + await reader.cancel('client abort'); + + expect(cancelSpy).toHaveBeenCalled(); + }); +}); diff --git a/packages/core/src/serialization.ts b/packages/core/src/serialization.ts index 37f931026b..cb391217b4 100644 --- a/packages/core/src/serialization.ts +++ b/packages/core/src/serialization.ts @@ -455,6 +455,153 @@ export class WorkflowServerReadableStream extends ReadableStream { } } +/** + * Maximum number of reconnect attempts for a single framed stream session. + * On serverful backends, reconnects should only happen during transient errors. + * For serverless backends, we set this constant high enough to cover at least + * 10 minutes even if the server would be limited to e.g. 1 minute per session. + */ +const FRAMED_STREAM_MAX_RECONNECTS = 50; + +/** + * Wraps the length-prefix-framed byte WorkflowServerReadableStream + * with transparent auto-reconnect. + * + * Every fully-decoded outer frame corresponds to exactly one server-side + * chunk (the serialize transform enqueues one frame per workflow write, and + * the writable buffers one frame per chunk when multi-writing). The wrapper + * counts completed frames and, on upstream error, reopens the connection + * with `startIndex = resolvedStartIndex + consumedFrames`. Partial-frame + * bytes buffered before the cut are discarded — the server will resend the + * in-flight chunk in full from the new startIndex. 
+ * + * A clean upstream close (EOF with no error) signals the stream is truly + * done; we close the wrapper and do not reconnect. + * + * Negative `startIndex` values (last-N semantics) skip the reconnect + * machinery because we cannot compute an absolute resume position without + * a tail-index lookup — the returned stream behaves as a single-shot read. + */ +export function createReconnectingFramedStream( + name: string, + startIndex?: number +): ReadableStream { + const reconnectSupported = startIndex === undefined || startIndex >= 0; + let currentStartIndex = startIndex ?? 0; + let consumedFrames = 0; + let reconnectCount = 0; + let reader: ReadableStreamDefaultReader | undefined; + let buffer = new Uint8Array(0); + + async function connect(): Promise { + const world = getWorld(); + const effectiveStartIndex = reconnectSupported + ? currentStartIndex + consumedFrames + : startIndex; + const stream = await world.readFromStream(name, effectiveStartIndex); + reader = stream.getReader(); + } + + async function reconnect(): Promise { + reconnectCount++; + if (reconnectCount > FRAMED_STREAM_MAX_RECONNECTS) { + throw new Error( + `Stream "${name}" exceeded maximum reconnection attempts (${FRAMED_STREAM_MAX_RECONNECTS})` + ); + } + if (reader) { + await reader.cancel().catch(() => {}); + reader = undefined; + } + currentStartIndex += consumedFrames; + consumedFrames = 0; + buffer = new Uint8Array(0); + await connect(); + } + + return new ReadableStream({ + pull: async (controller) => { + // Loop until we emit something, hit EOF, or fatally error. Reads that + // only extend the in-flight-frame buffer don't enqueue anything — we + // keep reading rather than returning empty-handed. 
+ for (;;) { + if (!reader) { + try { + await connect(); + } catch (err) { + controller.error(err); + return; + } + } + + let result: { done: boolean; value?: Uint8Array }; + try { + // biome-ignore lint/style/noNonNullAssertion: connect() guarantees reader + result = await reader!.read(); + } catch (err) { + if (!reconnectSupported) { + controller.error(err); + return; + } + try { + await reconnect(); + } catch (reconnectErr) { + controller.error(reconnectErr); + return; + } + continue; + } + + if (result.done || !result.value) { + // Clean EOF — stream is truly complete. Drop any partial-frame + // bytes (there shouldn't be any; a well-formed stream ends on a + // frame boundary). + reader = undefined; + controller.close(); + return; + } + + // Append to buffer and emit all complete frames. + const incoming = result.value; + if (incoming.length > 0) { + const combined = new Uint8Array(buffer.length + incoming.length); + combined.set(buffer, 0); + combined.set(incoming, buffer.length); + buffer = combined; + } + + let emitted = false; + while (buffer.length >= FRAME_HEADER_SIZE) { + const frameLength = new DataView( + buffer.buffer, + buffer.byteOffset, + buffer.byteLength + ).getUint32(0, false); + const total = FRAME_HEADER_SIZE + frameLength; + if (buffer.length < total) break; + // Forward the entire framed chunk (header + payload) to the + // downstream deserializer, which already expects this layout. + controller.enqueue(buffer.slice(0, total)); + buffer = buffer.slice(total); + consumedFrames++; + emitted = true; + } + + if (emitted) return; + // Only partial bytes — read more. + } + }, + cancel: async () => { + if (reader) { + await reader.cancel().catch((err) => { + console.warn('Error closing ReadableStream reader:', err); + }); + reader = undefined; + } + }, + }); +} + /** * Default flush interval in milliseconds for buffered stream writes. * Chunks are accumulated and flushed together to reduce network overhead. 
@@ -1209,12 +1356,15 @@ export function getExternalRevivers( return response.body; } - const readable = new WorkflowServerReadableStream( - value.name, - value.startIndex - ); if (value.type === 'bytes') { - // For byte streams, use flushable pipe with lock polling + // For byte streams, use flushable pipe with lock polling. No + // auto-reconnect here: raw byte streams have no wire framing, so + // the caller owns its own reconnect strategy if it needs one (see + // WorkflowChatTransport for an app-level example). + const readable = new WorkflowServerReadableStream( + value.name, + value.startIndex + ); const state = createFlushableState(); ops.push(state.promise); @@ -1232,6 +1382,12 @@ export function getExternalRevivers( return userReadable; } else { + // Non-byte streams carry length-prefixed frames, so we can count + // completed frames and transparently reconnect. + const readable = createReconnectingFramedStream( + value.name, + value.startIndex + ); const transform = getDeserializeStream( getExternalRevivers(global, ops, runId, cryptoKey), cryptoKey diff --git a/packages/world-vercel/src/streamer.test.ts b/packages/world-vercel/src/streamer.test.ts index 21b587e5e5..2efcfda3e9 100644 --- a/packages/world-vercel/src/streamer.test.ts +++ b/packages/world-vercel/src/streamer.test.ts @@ -1,10 +1,5 @@ import { afterEach, describe, expect, it, vi } from 'vitest'; -import { - encodeMultiChunks, - MAX_CHUNKS_PER_REQUEST, - parseStreamControlFrame, - STREAM_CONTROL_FRAME_SIZE, -} from './streamer.js'; +import { encodeMultiChunks, MAX_CHUNKS_PER_REQUEST } from './streamer.js'; describe('encodeMultiChunks', () => { /** @@ -275,131 +270,7 @@ describe('writeToStreamMulti pagination', () => { }); }); -/** - * Build a control frame matching the workflow-server format. 
- */ -function buildControlFrame(done: boolean, nextIndex: number): Uint8Array { - const frame = new Uint8Array(STREAM_CONTROL_FRAME_SIZE); - // Bytes 0-3: zero-frame marker (already 0x00) - frame[4] = done ? 1 : 0; - new DataView(frame.buffer).setUint32(5, nextIndex, false); - // Magic footer "WFCT" - frame.set(new Uint8Array([0x57, 0x46, 0x43, 0x54]), 9); - return frame; -} - -describe('parseStreamControlFrame', () => { - it('parses a valid done=true control frame', () => { - const frame = buildControlFrame(true, 42); - const result = parseStreamControlFrame(frame); - expect(result).toEqual({ - done: true, - nextIndex: 42, - totalLength: STREAM_CONTROL_FRAME_SIZE, - }); - }); - - it('parses a valid done=false (timeout) control frame', () => { - const frame = buildControlFrame(false, 100); - const result = parseStreamControlFrame(frame); - expect(result).toEqual({ - done: false, - nextIndex: 100, - totalLength: STREAM_CONTROL_FRAME_SIZE, - }); - }); - - it('parses control frame appended after data bytes', () => { - const data = new Uint8Array([1, 2, 3, 4, 5]); - const frame = buildControlFrame(false, 7); - const combined = new Uint8Array(data.length + frame.length); - combined.set(data, 0); - combined.set(frame, data.length); - - const result = parseStreamControlFrame(combined); - expect(result).toEqual({ - done: false, - nextIndex: 7, - totalLength: STREAM_CONTROL_FRAME_SIZE, - }); - }); - - it('returns null for buffer shorter than control frame size', () => { - expect(parseStreamControlFrame(new Uint8Array(12))).toBeNull(); - expect(parseStreamControlFrame(new Uint8Array(0))).toBeNull(); - }); - - it('returns null when magic footer does not match', () => { - const frame = buildControlFrame(true, 0); - frame[12] = 0xff; // corrupt magic footer - expect(parseStreamControlFrame(frame)).toBeNull(); - }); - - it('returns null when zero-frame marker is not all zeros', () => { - const frame = buildControlFrame(true, 0); - frame[0] = 1; // corrupt zero-frame marker - 
expect(parseStreamControlFrame(frame)).toBeNull(); - }); - - it('handles nextIndex=0', () => { - const frame = buildControlFrame(false, 0); - const result = parseStreamControlFrame(frame); - expect(result).toEqual({ - done: false, - nextIndex: 0, - totalLength: STREAM_CONTROL_FRAME_SIZE, - }); - }); - - it('handles large nextIndex values', () => { - const frame = buildControlFrame(true, 0xffffffff); - const result = parseStreamControlFrame(frame); - expect(result?.nextIndex).toBe(0xffffffff); - }); -}); - -describe('readFromStream reconnection', () => { - /** Collect every byte from a ReadableStream into one Uint8Array. */ - async function drain( - stream: ReadableStream - ): Promise { - const reader = stream.getReader(); - const parts: Uint8Array[] = []; - for (;;) { - const { done, value } = await reader.read(); - if (done) break; - parts.push(value); - } - const len = parts.reduce((s, p) => s + p.length, 0); - const out = new Uint8Array(len); - let off = 0; - for (const p of parts) { - out.set(p, off); - off += p.length; - } - return out; - } - - function chunkedStream(chunks: Uint8Array[]): ReadableStream { - let i = 0; - return new ReadableStream({ - pull(controller) { - if (i < chunks.length) { - controller.enqueue(chunks[i++]); - } else { - controller.close(); - } - }, - }); - } - - function streamResponse(...chunks: Uint8Array[]): Response { - return new Response(chunkedStream(chunks), { - status: 200, - headers: { 'Content-Type': 'application/octet-stream' }, - }); - } - +describe('readFromStream', () => { async function getStreamer() { const { createStreamer } = await import('./streamer.js'); return createStreamer(); @@ -409,107 +280,43 @@ describe('readFromStream reconnection', () => { vi.restoreAllMocks(); }); - it('reconnects when server sends done=false and resumes from nextIndex', async () => { - const chunk1 = new TextEncoder().encode('aaa'); - const chunk2 = new TextEncoder().encode('bbb'); - const timeout = buildControlFrame(false, 3); - const 
done = buildControlFrame(true, 6); - - const fetchSpy = vi - .spyOn(globalThis, 'fetch') - .mockResolvedValueOnce(streamResponse(chunk1, timeout)) - .mockResolvedValueOnce(streamResponse(chunk2, done)); - - const streamer = await getStreamer(); - const result = await drain(await streamer.readFromStream('strm_test')); - - const expected = new Uint8Array([...chunk1, ...chunk2]); - expect(result).toEqual(expected); - expect(fetchSpy).toHaveBeenCalledTimes(2); - - const secondUrl = new URL(fetchSpy.mock.calls[1][0] as string); - expect(secondUrl.searchParams.get('startIndex')).toBe('3'); - }); - - it('falls through when no control frame is present (backward compat)', async () => { - const data = new TextEncoder().encode('legacy server'); - - vi.spyOn(globalThis, 'fetch').mockResolvedValueOnce(streamResponse(data)); + it('aborts the fetch when the returned stream is cancelled', async () => { + let capturedSignal: AbortSignal | undefined; + // Upstream that stays pending until aborted — mirrors a long-running + // server stream that cancel() should tear down. 
+ const makePendingResponse = (signal: AbortSignal) => { + const body = new ReadableStream({ + start(controller) { + controller.enqueue(new Uint8Array([1, 2, 3])); + signal.addEventListener('abort', () => { + controller.error( + new DOMException('The operation was aborted.', 'AbortError') + ); + }); + }, + }); + return new Response(body, { status: 200 }); + }; - const streamer = await getStreamer(); - const result = await drain(await streamer.readFromStream('strm_test')); - - expect(result).toEqual(data); - }); - - it('propagates network error to consumer without retrying', async () => { - const data = new TextEncoder().encode('partial'); - - let callCount = 0; - const errorStream = new ReadableStream({ - pull(controller) { - if (callCount === 0) { - callCount++; - controller.enqueue(data); - } else { - controller.error(new Error('connection reset')); - } - }, + vi.spyOn(globalThis, 'fetch').mockImplementation(async (_url, init) => { + capturedSignal = init?.signal ?? undefined; + if (!capturedSignal) throw new Error('expected fetch to receive signal'); + return makePendingResponse(capturedSignal); }); - vi.spyOn(globalThis, 'fetch').mockResolvedValueOnce( - new Response(errorStream, { status: 200 }) - ); - const streamer = await getStreamer(); - // readFromStream returns the ReadableStream before the error surfaces, - // so the error propagates while draining — not on readFromStream itself. - const stream = await streamer.readFromStream('strm_test'); - await expect(drain(stream)).rejects.toThrow('connection reset'); - }); + const stream = await streamer.readFromStream('s'); - // Regression test for the 4.2.3 streaming break: a prior fix drained the - // upstream via `await response.arrayBuffer()` before returning the - // ReadableStream, which caused `run.getReadable()` to block until the - // entire stream had completed on the server — nothing arrived in the UI - // incrementally. 
This test asserts that data flows to the consumer before - // upstream close; any buffer-then-replay rewrite fails here. - it('emits upstream bytes to the consumer before the stream closes', async () => { - // 100-byte chunk: 13 held back for control-frame detection, 87 must - // reach the consumer without waiting for close. - const chunk = new Uint8Array(100).fill(0x41); - - let upstreamController!: ReadableStreamDefaultController; - const body = new ReadableStream({ - start(c) { - upstreamController = c; - }, - }); - vi.spyOn(globalThis, 'fetch').mockResolvedValueOnce( - new Response(body, { status: 200 }) - ); + expect(capturedSignal).toBeDefined(); + expect(capturedSignal?.aborted).toBe(false); - const streamer = await getStreamer(); - const stream = await streamer.readFromStream('strm_test'); const reader = stream.getReader(); + // Drain the first chunk so the pump is live. + const first = await reader.read(); + expect(first.done).toBe(false); - // Deliver the chunk but DO NOT close upstream yet. - upstreamController.enqueue(chunk); - - // Consumer must receive the non-held-back prefix without waiting for - // upstream close. Cap the wait so a buffered impl fails fast. 
- const result = await Promise.race([ - reader.read(), - new Promise<'TIMED_OUT'>((resolve) => - setTimeout(() => resolve('TIMED_OUT'), 200) - ), - ]); - - expect(result).not.toBe('TIMED_OUT'); - if (result === 'TIMED_OUT') return; - expect(result.done).toBe(false); - expect(result.value?.length).toBe(100 - STREAM_CONTROL_FRAME_SIZE); + await reader.cancel('client done'); - await reader.cancel(); + expect(capturedSignal?.aborted).toBe(true); }); }); diff --git a/packages/world-vercel/src/streamer.ts b/packages/world-vercel/src/streamer.ts index eec5e81da2..8bc4b95df1 100644 --- a/packages/world-vercel/src/streamer.ts +++ b/packages/world-vercel/src/streamer.ts @@ -23,73 +23,6 @@ export const MAX_CHUNKS_PER_REQUEST = 1000; // (partial writes, long-lived reads), and duplex streams are incompatible // with undici's experimental H2 support. -/** - * Stream control frame constants, mirroring workflow-server's format. - * - * Control frame (13 bytes): - * [0-3] Zero-frame marker (0x00 0x00 0x00 0x00) - * [4] Flags — bit 0: done (1 = complete, 0 = timeout/reconnect) - * [5-8] nextIndex — big-endian uint32, chunk index to resume from - * [9-12] Magic footer — "WFCT" (0x57 0x46 0x43 0x54) - */ -export const STREAM_CONTROL_FRAME_SIZE = 13; -const STREAM_CONTROL_MAGIC = new Uint8Array([0x57, 0x46, 0x43, 0x54]); - -export interface StreamControlFrame { - done: boolean; - nextIndex: number; -} - -/** - * Try to parse a stream control frame from the tail of a buffer. - * Returns the parsed frame and the byte length of the control data, - * or null if no valid control frame is present. 
- */ -export function parseStreamControlFrame( - buffer: Uint8Array -): (StreamControlFrame & { totalLength: number }) | null { - if (buffer.length < STREAM_CONTROL_FRAME_SIZE) return null; - - const offset = buffer.length - STREAM_CONTROL_FRAME_SIZE; - - // Check zero-frame marker (bytes 0-3 must be 0x00) - if ( - buffer[offset] !== 0 || - buffer[offset + 1] !== 0 || - buffer[offset + 2] !== 0 || - buffer[offset + 3] !== 0 - ) { - return null; - } - - // Check magic footer at bytes 9-12 - if ( - buffer[offset + 9] !== STREAM_CONTROL_MAGIC[0] || - buffer[offset + 10] !== STREAM_CONTROL_MAGIC[1] || - buffer[offset + 11] !== STREAM_CONTROL_MAGIC[2] || - buffer[offset + 12] !== STREAM_CONTROL_MAGIC[3] - ) { - return null; - } - - const flags = buffer[offset + 4]; - const view = new DataView(buffer.buffer, buffer.byteOffset + offset + 5, 4); - const nextIndex = view.getUint32(0, false); - - return { - done: (flags & 1) === 1, - nextIndex, - totalLength: STREAM_CONTROL_FRAME_SIZE, - }; -} - -function concatUint8Arrays(a: Uint8Array, b: Uint8Array): Uint8Array { - const result = new Uint8Array(a.length + b.length); - result.set(a, 0); - result.set(b, a.length); - return result; -} - function getStreamUrl( name: string, runId: string | undefined, @@ -255,122 +188,51 @@ export function createStreamer(config?: APIConfig): Streamer { }, async readFromStream(name: string, startIndex?: number) { - let currentStartIndex = startIndex ?? 0; - - // Cap reconnections to prevent infinite loops if the server - // never completes the stream. 50 reconnects at 2-min server - // timeout ≈ 100 minutes of streaming. - const MAX_RECONNECTS = 50; - let reconnectCount = 0; - - const connect = async (): Promise< - ReadableStreamDefaultReader - > => { - const httpConfig = await getHttpConfig(config); - // Use v3 to receive the stream control frame for reconnection. 
- const url = new URL( - `${httpConfig.baseUrl}/v3/stream/${encodeURIComponent(name)}` - ); - url.searchParams.set('startIndex', String(currentStartIndex)); - const response = await fetch(url, { - headers: httpConfig.headers, - }); - if (!response.ok) { - throw new Error(`Failed to fetch stream: ${response.status}`); - } - if (!response.body) { - throw new Error('No response body for stream'); - } - return (response.body as ReadableStream).getReader(); - }; - - let reader = await connect(); - - // Hold back the last STREAM_CONTROL_FRAME_SIZE bytes at all times - // so we can detect the control frame when the upstream closes. - // Surplus bytes are forwarded to the consumer as soon as they arrive — - // this preserves incremental streaming (critical for AI UIs, etc.). - let tailBuffer = new Uint8Array(0); - + const httpConfig = await getHttpConfig(config); + const url = getStreamUrl(name, undefined, httpConfig); + if (typeof startIndex === 'number') { + url.searchParams.set('startIndex', String(startIndex)); + } + // Attach an AbortController so cancelling the returned stream + // (from WorkflowServerReadableStream / getReadable consumers) + // actually aborts the in-flight HTTP request instead of leaving + // the socket hanging until the server times out. + const abortController = new AbortController(); + const response = await fetch(url, { + headers: httpConfig.headers, + signal: abortController.signal, + }); + if (!response.ok) { + throw new Error(`Failed to fetch stream: ${response.status}`); + } + if (!response.body) { + throw new Error('No response body for stream'); + } + const upstream = response.body as ReadableStream; + // Wrap the body so cancel() propagates to the AbortController — + // fetch implementations differ on whether cancelling the body + // alone tears down the socket. return new ReadableStream({ - pull: async (controller) => { - // Keep reading from the upstream until we have something to - // enqueue, close, or error. 
An iteration that only accumulates - // into the hold-back buffer (≤ 13 bytes total) loops without - // yielding, rather than returning and relying on the runtime to - // re-invoke pull when nothing was enqueued. - for (;;) { - let result: { done: boolean; value?: Uint8Array }; + start(controller) { + const reader = upstream.getReader(); + const pump = async () => { try { - result = await reader.read(); - } catch (err) { - // Network error — not a clean close. Flush any buffered - // bytes and surface the error so consumers know the stream - // was truncated. - if (tailBuffer.length > 0) { - controller.enqueue(tailBuffer); - tailBuffer = new Uint8Array(0); + for (;;) { + const { done, value } = await reader.read(); + if (done) { + controller.close(); + return; + } + controller.enqueue(value); } + } catch (err) { controller.error(err); - return; } - - if (!result.done) { - // Append new data and forward everything except the last - // STREAM_CONTROL_FRAME_SIZE bytes. - const combined = concatUint8Arrays(tailBuffer, result.value!); - if (combined.length > STREAM_CONTROL_FRAME_SIZE) { - const emitLen = combined.length - STREAM_CONTROL_FRAME_SIZE; - controller.enqueue(combined.subarray(0, emitLen)); - tailBuffer = combined.slice(emitLen); - return; - } - // Everything fits in the holdback buffer — keep reading. - tailBuffer = new Uint8Array(combined); - continue; - } - - // Upstream closed. Check the tail for a control frame. - const control = parseStreamControlFrame(tailBuffer); - - if (control) { - const dataLen = tailBuffer.length - control.totalLength; - if (dataLen > 0) { - controller.enqueue(tailBuffer.subarray(0, dataLen)); - } - tailBuffer = new Uint8Array(0); - - if (control.done) { - controller.close(); - return; - } - - // Timeout — reconnect from the next chunk index. 
- reconnectCount++; - if (reconnectCount > MAX_RECONNECTS) { - controller.error( - new Error( - `Stream exceeded maximum reconnection attempts (${MAX_RECONNECTS})` - ) - ); - return; - } - currentStartIndex = control.nextIndex; - reader = await connect(); - continue; - } - - // No control frame (older server). Forward the tail and close. - if (tailBuffer.length > 0) { - controller.enqueue(tailBuffer); - tailBuffer = new Uint8Array(0); - } - controller.close(); - return; - } + }; + pump(); }, - cancel: async () => { - await reader.cancel(); + cancel(reason) { + abortController.abort(reason); }, }); },