From fe1107662b09d851bc9e921b9a546b55c97116b2 Mon Sep 17 00:00:00 2001 From: Peter Wielander Date: Sat, 2 May 2026 10:02:43 +0900 Subject: [PATCH] Revert "[world-vercel] Use stream control frame for transparent reconnection (#1790)" This reverts commit 5ef9ac2073660a0e14ccbef2d6228fad5155ac44. --- .changeset/stream-reconnect-stable.md | 5 - packages/world-vercel/src/streamer.test.ts | 246 +-------------------- packages/world-vercel/src/streamer.ts | 198 ++--------------- 3 files changed, 15 insertions(+), 434 deletions(-) delete mode 100644 .changeset/stream-reconnect-stable.md diff --git a/.changeset/stream-reconnect-stable.md b/.changeset/stream-reconnect-stable.md deleted file mode 100644 index 150734b2fa..0000000000 --- a/.changeset/stream-reconnect-stable.md +++ /dev/null @@ -1,5 +0,0 @@ ---- -'@workflow/world-vercel': patch ---- - -Use stream control frame to transparently reconnect on server timeout diff --git a/packages/world-vercel/src/streamer.test.ts b/packages/world-vercel/src/streamer.test.ts index 21b587e5e5..f3046d5832 100644 --- a/packages/world-vercel/src/streamer.test.ts +++ b/packages/world-vercel/src/streamer.test.ts @@ -1,10 +1,5 @@ import { afterEach, describe, expect, it, vi } from 'vitest'; -import { - encodeMultiChunks, - MAX_CHUNKS_PER_REQUEST, - parseStreamControlFrame, - STREAM_CONTROL_FRAME_SIZE, -} from './streamer.js'; +import { encodeMultiChunks, MAX_CHUNKS_PER_REQUEST } from './streamer.js'; describe('encodeMultiChunks', () => { /** @@ -274,242 +269,3 @@ describe('writeToStreamMulti pagination', () => { ]); }); }); - -/** - * Build a control frame matching the workflow-server format. - */ -function buildControlFrame(done: boolean, nextIndex: number): Uint8Array { - const frame = new Uint8Array(STREAM_CONTROL_FRAME_SIZE); - // Bytes 0-3: zero-frame marker (already 0x00) - frame[4] = done ? 1 : 0; - new DataView(frame.buffer).setUint32(5, nextIndex, false); - // Magic footer "WFCT" - frame.set(new Uint8Array([0x57, 0x46, 0x43, 0x54]), 9); - return frame; -} - -describe('parseStreamControlFrame', () => { - it('parses a valid done=true control frame', () => { - const frame = buildControlFrame(true, 42); - const result = parseStreamControlFrame(frame); - expect(result).toEqual({ - done: true, - nextIndex: 42, - totalLength: STREAM_CONTROL_FRAME_SIZE, - }); - }); - - it('parses a valid done=false (timeout) control frame', () => { - const frame = buildControlFrame(false, 100); - const result = parseStreamControlFrame(frame); - expect(result).toEqual({ - done: false, - nextIndex: 100, - totalLength: STREAM_CONTROL_FRAME_SIZE, - }); - }); - - it('parses control frame appended after data bytes', () => { - const data = new Uint8Array([1, 2, 3, 4, 5]); - const frame = buildControlFrame(false, 7); - const combined = new Uint8Array(data.length + frame.length); - combined.set(data, 0); - combined.set(frame, data.length); - - const result = parseStreamControlFrame(combined); - expect(result).toEqual({ - done: false, - nextIndex: 7, - totalLength: STREAM_CONTROL_FRAME_SIZE, - }); - }); - - it('returns null for buffer shorter than control frame size', () => { - expect(parseStreamControlFrame(new Uint8Array(12))).toBeNull(); - expect(parseStreamControlFrame(new Uint8Array(0))).toBeNull(); - }); - - it('returns null when magic footer does not match', () => { - const frame = buildControlFrame(true, 0); - frame[12] = 0xff; // corrupt magic footer - expect(parseStreamControlFrame(frame)).toBeNull(); - }); - - it('returns null when zero-frame marker is not all zeros', () => { - const frame = buildControlFrame(true, 0); - frame[0] = 1; // corrupt zero-frame marker - expect(parseStreamControlFrame(frame)).toBeNull(); - }); - - it('handles nextIndex=0', () => { - const frame = buildControlFrame(false, 0); - const result = parseStreamControlFrame(frame); - expect(result).toEqual({ - done: false, - nextIndex: 0, - totalLength: STREAM_CONTROL_FRAME_SIZE, - }); - }); - - it('handles large nextIndex values', () => { - const frame = buildControlFrame(true, 0xffffffff); - const result = parseStreamControlFrame(frame); - expect(result?.nextIndex).toBe(0xffffffff); - }); -}); - -describe('readFromStream reconnection', () => { - /** Collect every byte from a ReadableStream into one Uint8Array. */ - async function drain( - stream: ReadableStream - ): Promise { - const reader = stream.getReader(); - const parts: Uint8Array[] = []; - for (;;) { - const { done, value } = await reader.read(); - if (done) break; - parts.push(value); - } - const len = parts.reduce((s, p) => s + p.length, 0); - const out = new Uint8Array(len); - let off = 0; - for (const p of parts) { - out.set(p, off); - off += p.length; - } - return out; - } - - function chunkedStream(chunks: Uint8Array[]): ReadableStream { - let i = 0; - return new ReadableStream({ - pull(controller) { - if (i < chunks.length) { - controller.enqueue(chunks[i++]); - } else { - controller.close(); - } - }, - }); - } - - function streamResponse(...chunks: Uint8Array[]): Response { - return new Response(chunkedStream(chunks), { - status: 200, - headers: { 'Content-Type': 'application/octet-stream' }, - }); - } - - async function getStreamer() { - const { createStreamer } = await import('./streamer.js'); - return createStreamer(); - } - - afterEach(() => { - vi.restoreAllMocks(); - }); - - it('reconnects when server sends done=false and resumes from nextIndex', async () => { - const chunk1 = new TextEncoder().encode('aaa'); - const chunk2 = new TextEncoder().encode('bbb'); - const timeout = buildControlFrame(false, 3); - const done = buildControlFrame(true, 6); - - const fetchSpy = vi - .spyOn(globalThis, 'fetch') - .mockResolvedValueOnce(streamResponse(chunk1, timeout)) - .mockResolvedValueOnce(streamResponse(chunk2, done)); - - const streamer = await getStreamer(); - const result = await drain(await streamer.readFromStream('strm_test')); - - const expected = new Uint8Array([...chunk1, ...chunk2]); - expect(result).toEqual(expected); - expect(fetchSpy).toHaveBeenCalledTimes(2); - - const secondUrl = new URL(fetchSpy.mock.calls[1][0] as string); - expect(secondUrl.searchParams.get('startIndex')).toBe('3'); - }); - - it('falls through when no control frame is present (backward compat)', async () => { - const data = new TextEncoder().encode('legacy server'); - - vi.spyOn(globalThis, 'fetch').mockResolvedValueOnce(streamResponse(data)); - - const streamer = await getStreamer(); - const result = await drain(await streamer.readFromStream('strm_test')); - - expect(result).toEqual(data); - }); - - it('propagates network error to consumer without retrying', async () => { - const data = new TextEncoder().encode('partial'); - - let callCount = 0; - const errorStream = new ReadableStream({ - pull(controller) { - if (callCount === 0) { - callCount++; - controller.enqueue(data); - } else { - controller.error(new Error('connection reset')); - } - }, - }); - - vi.spyOn(globalThis, 'fetch').mockResolvedValueOnce( - new Response(errorStream, { status: 200 }) - ); - - const streamer = await getStreamer(); - // readFromStream returns the ReadableStream before the error surfaces, - // so the error propagates while draining — not on readFromStream itself. - const stream = await streamer.readFromStream('strm_test'); - await expect(drain(stream)).rejects.toThrow('connection reset'); - }); - - // Regression test for the 4.2.3 streaming break: a prior fix drained the - // upstream via `await response.arrayBuffer()` before returning the - // ReadableStream, which caused `run.getReadable()` to block until the - // entire stream had completed on the server — nothing arrived in the UI - // incrementally. This test asserts that data flows to the consumer before - // upstream close; any buffer-then-replay rewrite fails here. - it('emits upstream bytes to the consumer before the stream closes', async () => { - // 100-byte chunk: 13 held back for control-frame detection, 87 must - // reach the consumer without waiting for close. - const chunk = new Uint8Array(100).fill(0x41); - - let upstreamController!: ReadableStreamDefaultController; - const body = new ReadableStream({ - start(c) { - upstreamController = c; - }, - }); - vi.spyOn(globalThis, 'fetch').mockResolvedValueOnce( - new Response(body, { status: 200 }) - ); - - const streamer = await getStreamer(); - const stream = await streamer.readFromStream('strm_test'); - const reader = stream.getReader(); - - // Deliver the chunk but DO NOT close upstream yet. - upstreamController.enqueue(chunk); - - // Consumer must receive the non-held-back prefix without waiting for - // upstream close. Cap the wait so a buffered impl fails fast. - const result = await Promise.race([ - reader.read(), - new Promise<'TIMED_OUT'>((resolve) => - setTimeout(() => resolve('TIMED_OUT'), 200) - ), - ]); - - expect(result).not.toBe('TIMED_OUT'); - if (result === 'TIMED_OUT') return; - expect(result.done).toBe(false); - expect(result.value?.length).toBe(100 - STREAM_CONTROL_FRAME_SIZE); - - await reader.cancel(); - }); -}); diff --git a/packages/world-vercel/src/streamer.ts b/packages/world-vercel/src/streamer.ts index eec5e81da2..6945a98edc 100644 --- a/packages/world-vercel/src/streamer.ts +++ b/packages/world-vercel/src/streamer.ts @@ -23,73 +23,6 @@ export const MAX_CHUNKS_PER_REQUEST = 1000; // (partial writes, long-lived reads), and duplex streams are incompatible // with undici's experimental H2 support. -/** - * Stream control frame constants, mirroring workflow-server's format. - * - * Control frame (13 bytes): - * [0-3] Zero-frame marker (0x00 0x00 0x00 0x00) - * [4] Flags — bit 0: done (1 = complete, 0 = timeout/reconnect) - * [5-8] nextIndex — big-endian uint32, chunk index to resume from - * [9-12] Magic footer — "WFCT" (0x57 0x46 0x43 0x54) - */ -export const STREAM_CONTROL_FRAME_SIZE = 13; -const STREAM_CONTROL_MAGIC = new Uint8Array([0x57, 0x46, 0x43, 0x54]); - -export interface StreamControlFrame { - done: boolean; - nextIndex: number; -} - -/** - * Try to parse a stream control frame from the tail of a buffer. - * Returns the parsed frame and the byte length of the control data, - * or null if no valid control frame is present. - */ -export function parseStreamControlFrame( - buffer: Uint8Array -): (StreamControlFrame & { totalLength: number }) | null { - if (buffer.length < STREAM_CONTROL_FRAME_SIZE) return null; - - const offset = buffer.length - STREAM_CONTROL_FRAME_SIZE; - - // Check zero-frame marker (bytes 0-3 must be 0x00) - if ( - buffer[offset] !== 0 || - buffer[offset + 1] !== 0 || - buffer[offset + 2] !== 0 || - buffer[offset + 3] !== 0 - ) { - return null; - } - - // Check magic footer at bytes 9-12 - if ( - buffer[offset + 9] !== STREAM_CONTROL_MAGIC[0] || - buffer[offset + 10] !== STREAM_CONTROL_MAGIC[1] || - buffer[offset + 11] !== STREAM_CONTROL_MAGIC[2] || - buffer[offset + 12] !== STREAM_CONTROL_MAGIC[3] - ) { - return null; - } - - const flags = buffer[offset + 4]; - const view = new DataView(buffer.buffer, buffer.byteOffset + offset + 5, 4); - const nextIndex = view.getUint32(0, false); - - return { - done: (flags & 1) === 1, - nextIndex, - totalLength: STREAM_CONTROL_FRAME_SIZE, - }; -} - -function concatUint8Arrays(a: Uint8Array, b: Uint8Array): Uint8Array { - const result = new Uint8Array(a.length + b.length); - result.set(a, 0); - result.set(b, a.length); - return result; -} - function getStreamUrl( name: string, runId: string | undefined, @@ -255,124 +188,21 @@ export function createStreamer(config?: APIConfig): Streamer { }, async readFromStream(name: string, startIndex?: number) { - let currentStartIndex = startIndex ?? 0; - - // Cap reconnections to prevent infinite loops if the server - // never completes the stream. 50 reconnects at 2-min server - // timeout ≈ 100 minutes of streaming. - const MAX_RECONNECTS = 50; - let reconnectCount = 0; - - const connect = async (): Promise< - ReadableStreamDefaultReader - > => { - const httpConfig = await getHttpConfig(config); - // Use v3 to receive the stream control frame for reconnection. - const url = new URL( - `${httpConfig.baseUrl}/v3/stream/${encodeURIComponent(name)}` - ); - url.searchParams.set('startIndex', String(currentStartIndex)); - const response = await fetch(url, { - headers: httpConfig.headers, - }); - if (!response.ok) { - throw new Error(`Failed to fetch stream: ${response.status}`); - } - if (!response.body) { - throw new Error('No response body for stream'); - } - return (response.body as ReadableStream).getReader(); - }; - - let reader = await connect(); - - // Hold back the last STREAM_CONTROL_FRAME_SIZE bytes at all times - // so we can detect the control frame when the upstream closes. - // Surplus bytes are forwarded to the consumer as soon as they arrive — - // this preserves incremental streaming (critical for AI UIs, etc.). - let tailBuffer = new Uint8Array(0); - - return new ReadableStream({ - pull: async (controller) => { - // Keep reading from the upstream until we have something to - // enqueue, close, or error. An iteration that only accumulates - // into the hold-back buffer (≤ 13 bytes total) loops without - // yielding, rather than returning and relying on the runtime to - // re-invoke pull when nothing was enqueued. - for (;;) { - let result: { done: boolean; value?: Uint8Array }; - try { - result = await reader.read(); - } catch (err) { - // Network error — not a clean close. Flush any buffered - // bytes and surface the error so consumers know the stream - // was truncated. - if (tailBuffer.length > 0) { - controller.enqueue(tailBuffer); - tailBuffer = new Uint8Array(0); - } - controller.error(err); - return; - } - - if (!result.done) { - // Append new data and forward everything except the last - // STREAM_CONTROL_FRAME_SIZE bytes. - const combined = concatUint8Arrays(tailBuffer, result.value!); - if (combined.length > STREAM_CONTROL_FRAME_SIZE) { - const emitLen = combined.length - STREAM_CONTROL_FRAME_SIZE; - controller.enqueue(combined.subarray(0, emitLen)); - tailBuffer = combined.slice(emitLen); - return; - } - // Everything fits in the holdback buffer — keep reading. - tailBuffer = new Uint8Array(combined); - continue; - } - - // Upstream closed. Check the tail for a control frame. - const control = parseStreamControlFrame(tailBuffer); - - if (control) { - const dataLen = tailBuffer.length - control.totalLength; - if (dataLen > 0) { - controller.enqueue(tailBuffer.subarray(0, dataLen)); - } - tailBuffer = new Uint8Array(0); - - if (control.done) { - controller.close(); - return; - } - - // Timeout — reconnect from the next chunk index. - reconnectCount++; - if (reconnectCount > MAX_RECONNECTS) { - controller.error( - new Error( - `Stream exceeded maximum reconnection attempts (${MAX_RECONNECTS})` - ) - ); - return; - } - currentStartIndex = control.nextIndex; - reader = await connect(); - continue; - } - - // No control frame (older server). Forward the tail and close. - if (tailBuffer.length > 0) { - controller.enqueue(tailBuffer); - tailBuffer = new Uint8Array(0); - } - controller.close(); - return; - } - }, - cancel: async () => { - await reader.cancel(); - }, + const httpConfig = await getHttpConfig(config); + const url = getStreamUrl(name, undefined, httpConfig); + if (typeof startIndex === 'number') { + url.searchParams.set('startIndex', String(startIndex)); + } + const response = await fetch(url, { + headers: httpConfig.headers, }); + if (!response.ok) { + throw new Error(`Failed to fetch stream: ${response.status}`); + } + if (!response.body) { + throw new Error('No response body for stream'); + } + return response.body as ReadableStream; }, async getStreamChunks(