Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 0 additions & 5 deletions .changeset/stream-reconnect-stable.md

This file was deleted.

246 changes: 1 addition & 245 deletions packages/world-vercel/src/streamer.test.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,5 @@
import { afterEach, describe, expect, it, vi } from 'vitest';
import {
encodeMultiChunks,
MAX_CHUNKS_PER_REQUEST,
parseStreamControlFrame,
STREAM_CONTROL_FRAME_SIZE,
} from './streamer.js';
import { encodeMultiChunks, MAX_CHUNKS_PER_REQUEST } from './streamer.js';

describe('encodeMultiChunks', () => {
/**
Expand Down Expand Up @@ -274,242 +269,3 @@ describe('writeToStreamMulti pagination', () => {
]);
});
});

/**
 * Build a control frame matching the workflow-server wire format:
 * bytes 0-3 zero-frame marker, byte 4 done flag, bytes 5-8 big-endian
 * uint32 nextIndex, bytes 9-12 magic footer "WFCT".
 */
function buildControlFrame(done: boolean, nextIndex: number): Uint8Array {
  const frame = new Uint8Array(STREAM_CONTROL_FRAME_SIZE);
  const view = new DataView(frame.buffer, frame.byteOffset, frame.byteLength);
  // Bytes 0-3 stay 0x00 — the zero-frame marker.
  view.setUint8(4, done ? 1 : 0);
  view.setUint32(5, nextIndex, false); // big-endian
  // Magic footer "WFCT" at bytes 9-12.
  frame[9] = 0x57; // 'W'
  frame[10] = 0x46; // 'F'
  frame[11] = 0x43; // 'C'
  frame[12] = 0x54; // 'T'
  return frame;
}

describe('parseStreamControlFrame', () => {
  /** Expected parse result for a well-formed frame with the given fields. */
  function expected(done: boolean, nextIndex: number) {
    return { done, nextIndex, totalLength: STREAM_CONTROL_FRAME_SIZE };
  }

  it('parses a valid done=true control frame', () => {
    expect(parseStreamControlFrame(buildControlFrame(true, 42))).toEqual(
      expected(true, 42)
    );
  });

  it('parses a valid done=false (timeout) control frame', () => {
    expect(parseStreamControlFrame(buildControlFrame(false, 100))).toEqual(
      expected(false, 100)
    );
  });

  it('parses control frame appended after data bytes', () => {
    const prefix = new Uint8Array([1, 2, 3, 4, 5]);
    const tail = buildControlFrame(false, 7);
    const buffer = new Uint8Array(prefix.length + tail.length);
    buffer.set(prefix, 0);
    buffer.set(tail, prefix.length);

    expect(parseStreamControlFrame(buffer)).toEqual(expected(false, 7));
  });

  it('returns null for buffer shorter than control frame size', () => {
    for (const size of [12, 0]) {
      expect(parseStreamControlFrame(new Uint8Array(size))).toBeNull();
    }
  });

  it('returns null when magic footer does not match', () => {
    const corrupted = buildControlFrame(true, 0);
    corrupted[12] = 0xff; // break the "WFCT" footer
    expect(parseStreamControlFrame(corrupted)).toBeNull();
  });

  it('returns null when zero-frame marker is not all zeros', () => {
    const corrupted = buildControlFrame(true, 0);
    corrupted[0] = 1; // break the leading zero marker
    expect(parseStreamControlFrame(corrupted)).toBeNull();
  });

  it('handles nextIndex=0', () => {
    expect(parseStreamControlFrame(buildControlFrame(false, 0))).toEqual(
      expected(false, 0)
    );
  });

  it('handles large nextIndex values', () => {
    const parsed = parseStreamControlFrame(buildControlFrame(true, 0xffffffff));
    expect(parsed?.nextIndex).toBe(0xffffffff);
  });
});

describe('readFromStream reconnection', () => {
  /** Collect every byte from a ReadableStream into one Uint8Array. */
  async function drain(
    stream: ReadableStream<Uint8Array>
  ): Promise<Uint8Array> {
    const reader = stream.getReader();
    const parts: Uint8Array[] = [];
    for (;;) {
      const { done, value } = await reader.read();
      if (done) break;
      parts.push(value);
    }
    // Flatten the collected chunks into one contiguous buffer.
    const len = parts.reduce((s, p) => s + p.length, 0);
    const out = new Uint8Array(len);
    let off = 0;
    for (const p of parts) {
      out.set(p, off);
      off += p.length;
    }
    return out;
  }

  /** Stream that emits the given chunks one per pull, then closes. */
  function chunkedStream(chunks: Uint8Array[]): ReadableStream<Uint8Array> {
    let i = 0;
    return new ReadableStream({
      pull(controller) {
        if (i < chunks.length) {
          controller.enqueue(chunks[i++]);
        } else {
          controller.close();
        }
      },
    });
  }

  /** 200 OK Response whose body emits the given chunks incrementally. */
  function streamResponse(...chunks: Uint8Array[]): Response {
    return new Response(chunkedStream(chunks), {
      status: 200,
      headers: { 'Content-Type': 'application/octet-stream' },
    });
  }

  // Import lazily so each test's fetch spy is installed before the
  // streamer is created. NOTE(review): assumes createStreamer resolves
  // fetch at call time rather than at module load — confirm in streamer.js.
  async function getStreamer() {
    const { createStreamer } = await import('./streamer.js');
    return createStreamer();
  }

  afterEach(() => {
    vi.restoreAllMocks();
  });

  it('reconnects when server sends done=false and resumes from nextIndex', async () => {
    const chunk1 = new TextEncoder().encode('aaa');
    const chunk2 = new TextEncoder().encode('bbb');
    // First response ends with a timeout frame pointing at byte 3; the
    // follow-up request's response ends with a done frame.
    const timeout = buildControlFrame(false, 3);
    const done = buildControlFrame(true, 6);

    const fetchSpy = vi
      .spyOn(globalThis, 'fetch')
      .mockResolvedValueOnce(streamResponse(chunk1, timeout))
      .mockResolvedValueOnce(streamResponse(chunk2, done));

    const streamer = await getStreamer();
    const result = await drain(await streamer.readFromStream('strm_test'));

    // Consumer sees only the data bytes — control frames are stripped.
    const expected = new Uint8Array([...chunk1, ...chunk2]);
    expect(result).toEqual(expected);
    expect(fetchSpy).toHaveBeenCalledTimes(2);

    // The reconnect request must resume from the index the server gave us.
    const secondUrl = new URL(fetchSpy.mock.calls[1][0] as string);
    expect(secondUrl.searchParams.get('startIndex')).toBe('3');
  });

  it('falls through when no control frame is present (backward compat)', async () => {
    const data = new TextEncoder().encode('legacy server');

    vi.spyOn(globalThis, 'fetch').mockResolvedValueOnce(streamResponse(data));

    const streamer = await getStreamer();
    const result = await drain(await streamer.readFromStream('strm_test'));

    expect(result).toEqual(data);
  });

  it('propagates network error to consumer without retrying', async () => {
    const data = new TextEncoder().encode('partial');

    // Body that delivers one chunk, then errors mid-stream.
    let callCount = 0;
    const errorStream = new ReadableStream<Uint8Array>({
      pull(controller) {
        if (callCount === 0) {
          callCount++;
          controller.enqueue(data);
        } else {
          controller.error(new Error('connection reset'));
        }
      },
    });

    const fetchSpy = vi
      .spyOn(globalThis, 'fetch')
      .mockResolvedValueOnce(new Response(errorStream, { status: 200 }));

    const streamer = await getStreamer();
    // readFromStream returns the ReadableStream before the error surfaces,
    // so the error propagates while draining — not on readFromStream itself.
    const stream = await streamer.readFromStream('strm_test');
    await expect(drain(stream)).rejects.toThrow('connection reset');

    // "without retrying": a transport error (no done=false control frame)
    // must not trigger a reconnect attempt.
    expect(fetchSpy).toHaveBeenCalledTimes(1);
  });

  // Regression test for the 4.2.3 streaming break: a prior fix drained the
  // upstream via `await response.arrayBuffer()` before returning the
  // ReadableStream, which caused `run.getReadable()` to block until the
  // entire stream had completed on the server — nothing arrived in the UI
  // incrementally. This test asserts that data flows to the consumer before
  // upstream close; any buffer-then-replay rewrite fails here.
  it('emits upstream bytes to the consumer before the stream closes', async () => {
    // 100-byte chunk: 13 held back for control-frame detection, 87 must
    // reach the consumer without waiting for close.
    const chunk = new Uint8Array(100).fill(0x41);

    // Manually-controlled body: we decide when bytes arrive and we never
    // close it during the test, so only a pass-through impl can deliver.
    let upstreamController!: ReadableStreamDefaultController<Uint8Array>;
    const body = new ReadableStream<Uint8Array>({
      start(c) {
        upstreamController = c;
      },
    });
    vi.spyOn(globalThis, 'fetch').mockResolvedValueOnce(
      new Response(body, { status: 200 })
    );

    const streamer = await getStreamer();
    const stream = await streamer.readFromStream('strm_test');
    const reader = stream.getReader();

    // Deliver the chunk but DO NOT close upstream yet.
    upstreamController.enqueue(chunk);

    // Consumer must receive the non-held-back prefix without waiting for
    // upstream close. Cap the wait so a buffered impl fails fast.
    const result = await Promise.race([
      reader.read(),
      new Promise<'TIMED_OUT'>((resolve) =>
        setTimeout(() => resolve('TIMED_OUT'), 200)
      ),
    ]);

    expect(result).not.toBe('TIMED_OUT');
    if (result === 'TIMED_OUT') return;
    expect(result.done).toBe(false);
    expect(result.value?.length).toBe(100 - STREAM_CONTROL_FRAME_SIZE);

    await reader.cancel();
  });
});
Loading
Loading