diff --git a/.changeset/byte-stream-wire-framing.md b/.changeset/byte-stream-wire-framing.md new file mode 100644 index 0000000000..6b9fcce276 --- /dev/null +++ b/.changeset/byte-stream-wire-framing.md @@ -0,0 +1,6 @@ +--- +"@workflow/core": minor +"workflow": minor +--- + +Add opt-in wire-level framing for byte streams (`type: 'bytes'`) so consumers can identify chunk boundaries — a prerequisite for transparent auto-reconnect. The framing decision is gated on a new `framedByteStreams` capability and recorded per-stream in the serialized ref (`framing: 'framed-v1'`); legacy raw streams continue to work unchanged. diff --git a/packages/core/src/byte-stream-framing.test.ts b/packages/core/src/byte-stream-framing.test.ts new file mode 100644 index 0000000000..a6e29be3e6 --- /dev/null +++ b/packages/core/src/byte-stream-framing.test.ts @@ -0,0 +1,523 @@ +import type { World } from '@workflow/world'; +import { afterEach, describe, expect, it, vi } from 'vitest'; +import { setWorld } from './runtime/world.js'; +import { + dehydrateStepReturnValue, + dehydrateWorkflowArguments, + getByteFramingStream, + getByteUnframingStream, +} from './serialization.js'; + +const FRAME_HEADER_SIZE = 4; + +/** Big-endian uint32 length prefix. */ +function header(length: number): Uint8Array { + const out = new Uint8Array(FRAME_HEADER_SIZE); + new DataView(out.buffer).setUint32(0, length, false); + return out; +} + +function concat(...parts: Uint8Array[]): Uint8Array { + const total = parts.reduce((s, p) => s + p.length, 0); + const out = new Uint8Array(total); + let offset = 0; + for (const p of parts) { + out.set(p, offset); + offset += p.length; + } + return out; +} + +/** + * Builds a ReadableStream from a fixed list of chunks. Each + * chunk is enqueued in its own `pull` call, so the consumer can observe + * read boundaries (important for the unframer's split-frame tests). + */ +function readableFromChunks(chunks: Uint8Array[]): ReadableStream { + let i = 0; + return new ReadableStream({ + pull(controller) { + if (i < chunks.length) { + controller.enqueue(chunks[i++]); + } else { + controller.close(); + } + }, + }); +} + +async function readAll( + stream: ReadableStream +): Promise { + const reader = stream.getReader(); + const out: Uint8Array[] = []; + for (;;) { + const r = await reader.read(); + if (r.done) break; + if (r.value) out.push(r.value); + } + return out; +} + +describe('getByteFramingStream', () => { + it('wraps each chunk in a 4-byte big-endian length prefix', async () => { + const chunks = [ + new Uint8Array([1, 2, 3]), + new Uint8Array([4, 5]), + new Uint8Array([6]), + ]; + const framed = await readAll( + readableFromChunks(chunks).pipeThrough(getByteFramingStream()) + ); + + expect(framed).toHaveLength(3); + expect(framed[0]).toEqual(concat(header(3), new Uint8Array([1, 2, 3]))); + expect(framed[1]).toEqual(concat(header(2), new Uint8Array([4, 5]))); + expect(framed[2]).toEqual(concat(header(1), new Uint8Array([6]))); + }); + + it('drops empty chunks', async () => { + // Empty frames would encode as `[0x00 0x00 0x00 0x00]`, which would + // collide with the legacy "looks framed" sniff in + // `getDeserializeStream`. They also carry no information, so we + // drop them on the writer side. 
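    // Concretely, header(0) is [0x00, 0x00, 0x00, 0x00], byte-for-byte the
    // prefix a legacy length-prefixed frame of length 0 would start with, so
    // an all-zero frame could be mistaken for the old format.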
+ const framed = await readAll( + readableFromChunks([ + new Uint8Array([1]), + new Uint8Array(0), + new Uint8Array([2]), + ]).pipeThrough(getByteFramingStream()) + ); + + expect(framed).toHaveLength(2); + expect(framed[0]).toEqual(concat(header(1), new Uint8Array([1]))); + expect(framed[1]).toEqual(concat(header(1), new Uint8Array([2]))); + }); + + it('handles a large chunk', async () => { + const big = new Uint8Array(64_000); + for (let i = 0; i < big.length; i++) big[i] = i & 0xff; + + const framed = await readAll( + readableFromChunks([big]).pipeThrough(getByteFramingStream()) + ); + + expect(framed).toHaveLength(1); + expect(framed[0].length).toBe(FRAME_HEADER_SIZE + big.length); + // Header decodes to the chunk length + expect(new DataView(framed[0].buffer).getUint32(0, false)).toBe(big.length); + // Payload is preserved verbatim + expect(framed[0].slice(FRAME_HEADER_SIZE)).toEqual(big); + }); + + it('handles a stream with no chunks (clean EOF)', async () => { + const framed = await readAll( + readableFromChunks([]).pipeThrough(getByteFramingStream()) + ); + expect(framed).toHaveLength(0); + }); +}); + +describe('getByteUnframingStream', () => { + it('round-trips through the framer', async () => { + const chunks = [ + new TextEncoder().encode('hello'), + new TextEncoder().encode(', '), + new TextEncoder().encode('world'), + ]; + + const result = await readAll( + readableFromChunks(chunks) + .pipeThrough(getByteFramingStream()) + .pipeThrough(getByteUnframingStream()) + ); + + expect(result).toEqual(chunks); + }); + + it('reassembles a frame split across multiple reads', async () => { + // Frame: header(5) + 'hello'. Deliver byte-by-byte to prove the + // unframer buffers across read boundaries. + const full = concat(header(5), new TextEncoder().encode('hello')); + const split = Array.from(full).map((b) => new Uint8Array([b])); + + const result = await readAll( + readableFromChunks(split).pipeThrough(getByteUnframingStream()) + ); + + expect(result).toHaveLength(1); + expect(result[0]).toEqual(new TextEncoder().encode('hello')); + }); + + it('emits multiple frames coalesced into a single read', async () => { + // Three frames glued together in one transport chunk — the unframer + // should split them out. + const big = concat( + header(3), + new Uint8Array([1, 2, 3]), + header(2), + new Uint8Array([4, 5]), + header(1), + new Uint8Array([6]) + ); + + const result = await readAll( + readableFromChunks([big]).pipeThrough(getByteUnframingStream()) + ); + + expect(result).toEqual([ + new Uint8Array([1, 2, 3]), + new Uint8Array([4, 5]), + new Uint8Array([6]), + ]); + }); + + it('errors if the stream ends mid-frame', async () => { + // Header advertises a 5-byte payload but only 2 bytes follow. + const truncated = concat(header(5), new Uint8Array([1, 2])); + + await expect( + readAll( + readableFromChunks([truncated]).pipeThrough(getByteUnframingStream()) + ) + ).rejects.toThrow(/truncated/i); + }); + + it('errors on a frame larger than the safety cap', async () => { + // 200MB length advertised — well past the 100MB cap. Ensures we + // fail fast instead of allocating an enormous buffer when fed a + // non-framed wire (e.g. a raw byte stream routed to a framed reader). 
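    // (With raw, non-framed bytes misread as a header, the first four bytes
    // are effectively arbitrary, so the decoded "length" will usually be far
    // larger than any real chunk; the cap turns that into a fast, explicit
    // error instead of a huge allocation.)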
+ const bogus = concat(header(200_000_000), new Uint8Array([1, 2, 3])); + + await expect( + readAll(readableFromChunks([bogus]).pipeThrough(getByteUnframingStream())) + ).rejects.toThrow(/exceeds maximum/i); + }); + + it('treats clean EOF with no buffered data as success', async () => { + const result = await readAll( + readableFromChunks([]).pipeThrough(getByteUnframingStream()) + ); + expect(result).toHaveLength(0); + }); + + it('preserves chunk identity across many small reads', async () => { + // 100 single-byte chunks → 100 single-byte frames → after round-trip, + // 100 single-byte chunks emerge in the same order. + const chunks: Uint8Array[] = []; + for (let i = 0; i < 100; i++) chunks.push(new Uint8Array([i])); + + const result = await readAll( + readableFromChunks(chunks) + .pipeThrough(getByteFramingStream()) + .pipeThrough(getByteUnframingStream()) + ); + + expect(result).toHaveLength(100); + for (let i = 0; i < 100; i++) { + expect(result[i]).toEqual(new Uint8Array([i])); + } + }); +}); + +// ---------------------------------------------------------------------------- +// End-to-end: dehydrate + hydrate carries the framing decision through the +// stream ref, and round-trips byte data correctly in both modes. +// ---------------------------------------------------------------------------- + +/** + * In-memory mock world that captures stream writes and replays them on + * subsequent reads. Just enough surface for the dehydrate/hydrate paths + * exercised below — no event log, no queue, etc. + */ +function makeMockWorld(): World { + const streamData = new Map(); + const closedStreams = new Set(); + + const write = vi.fn( + async ( + _runId: string | Promise, + name: string, + chunk: string | Uint8Array + ) => { + const list = streamData.get(name) ?? []; + // Copy bytes — byte-stream pipes transfer ArrayBuffer ownership, + // so the source buffer may be detached by the time the test + // wants to compare it to expected values. + const stored = + typeof chunk === 'string' + ? new TextEncoder().encode(chunk) + : new Uint8Array(chunk); + list.push(stored); + streamData.set(name, list); + } + ); + + return { + streams: { + write, + writeMulti: vi.fn( + async ( + _runId: string | Promise, + name: string, + chunks: (string | Uint8Array)[] + ) => { + for (const chunk of chunks) { + await write(_runId, name, chunk); + } + } + ), + get: vi.fn(async (_runId: string, name: string) => { + const chunks = streamData.get(name) ?? 
[]; + let i = 0; + return new ReadableStream({ + pull(controller) { + if (i < chunks.length) { + controller.enqueue(chunks[i++]); + } else { + controller.close(); + } + }, + }); + }), + close: vi.fn(async (_runId: string | Promise, name: string) => { + closedStreams.add(name); + }), + }, + } as unknown as World; +} + +describe('byte-stream framing end-to-end through dehydrate/hydrate', () => { + afterEach(() => { + setWorld(undefined as unknown as World); + }); + + async function readBytes( + stream: ReadableStream + ): Promise { + const reader = stream.getReader(); + const out: Uint8Array[] = []; + for (;;) { + const r = await reader.read(); + if (r.done) break; + if (r.value) out.push(r.value); + } + return out; + } + + it('emits no `framing` field when framedByteStreams is false (back-compat)', async () => { + setWorld(makeMockWorld()); + const stream = new ReadableStream({ + type: 'bytes', + pull(c) { + c.enqueue(new Uint8Array([1, 2, 3])); + c.close(); + }, + }); + + const ops: Promise[] = []; + const dehydrated = await dehydrateWorkflowArguments( + stream, + 'wrun_test', + undefined, + ops, + globalThis, + false, + // framedByteStreams = false — legacy raw bytes + false + ); + await Promise.all(ops); + + // The serialized devalue blob should reference a ReadableStream with + // no `framing` field (treated as raw on the consumer side). + expect(dehydrated).toBeInstanceOf(Uint8Array); + const text = new TextDecoder().decode(dehydrated as Uint8Array); + expect(text).toContain('ReadableStream'); + expect(text).not.toContain('framing'); + expect(text).not.toContain('framed-v1'); + }); + + it('emits `framing: framed-v1` when framedByteStreams is true', async () => { + setWorld(makeMockWorld()); + const stream = new ReadableStream({ + type: 'bytes', + pull(c) { + c.enqueue(new Uint8Array([1, 2, 3])); + c.close(); + }, + }); + + const ops: Promise[] = []; + const dehydrated = await dehydrateWorkflowArguments( + stream, + 'wrun_test', + undefined, + ops, + globalThis, + false, + true + ); + await Promise.all(ops); + + expect(dehydrated).toBeInstanceOf(Uint8Array); + const text = new TextDecoder().decode(dehydrated as Uint8Array); + expect(text).toContain('framed-v1'); + }); + + /** + * Pull the auto-generated stream name out of a devalue-serialized + * blob. Devalue uses index references rather than nested object + * literals, so the `name` field shows up as a flat string somewhere + * in the array. We just match the ULID pattern, which is unique + * enough that it can't conflict with anything else devalue might + * emit. + */ + function extractStreamName(dehydrated: Uint8Array): string { + const text = new TextDecoder().decode(dehydrated); + const m = text.match(/strm_[0-9A-HJKMNP-TV-Z]{26}/); + if (!m) { + throw new Error( + `Could not find strm_ in serialized payload: ${text.slice(0, 200)}` + ); + } + return m[0]; + } + + it('round-trips a framed byte stream: producer writes framed, consumer unframes', async () => { + setWorld(makeMockWorld()); + + const original = [ + new Uint8Array([1, 2, 3]), + new Uint8Array([4, 5]), + new Uint8Array([6, 7, 8, 9]), + ]; + // Snapshot for comparison since byte-stream pipes detach the source. 
+ const expected = original.map((u) => new Uint8Array(u)); + let i = 0; + const stream = new ReadableStream({ + type: 'bytes', + pull(c) { + if (i < original.length) { + c.enqueue(original[i++]); + } else { + c.close(); + } + }, + }); + + const ops: Promise[] = []; + const dehydrated = await dehydrateStepReturnValue( + stream, + 'wrun_test', + undefined, + ops, + globalThis, + false, + true + ); + // Wait for the producer pipe to finish writing all chunks to the world. + await Promise.all(ops); + + // Sanity: the wire format is framed. + const text = new TextDecoder().decode(dehydrated as Uint8Array); + expect(text).toContain('framed-v1'); + + // Replay the bytes the world has captured into a fresh ReadableStream + // and pipe through the unframer — this is exactly what + // `getExternalRevivers` does for `framing === 'framed-v1'` refs. + const name = extractStreamName(dehydrated as Uint8Array); + const world = await (await import('./runtime/world.js')).getWorld(); + const wireStream = await world.streams.get('wrun_test', name); + const userBytes = await readBytes( + wireStream.pipeThrough(getByteUnframingStream()) + ); + + expect(userBytes).toEqual(expected); + }); + + it('round-trips a raw byte stream: producer writes raw, consumer reads raw', async () => { + setWorld(makeMockWorld()); + + const original = [new Uint8Array([10, 20, 30])]; + const expected = original.map((u) => new Uint8Array(u)); + let i = 0; + const stream = new ReadableStream({ + type: 'bytes', + pull(c) { + if (i < original.length) { + c.enqueue(original[i++]); + } else { + c.close(); + } + }, + }); + + const ops: Promise[] = []; + const dehydrated = await dehydrateStepReturnValue( + stream, + 'wrun_test', + undefined, + ops, + globalThis, + false, + false + ); + await Promise.all(ops); + + const text = new TextDecoder().decode(dehydrated as Uint8Array); + expect(text).not.toContain('framed-v1'); + + // Sanity: the world has the raw user bytes as written, without any + // length-prefix envelope. (The reviver-side dispatch on absent + // `framing` is exercised by the existing serialization tests in + // serialization.test.ts; here we just confirm the wire bytes match + // what the user wrote.) + const name = extractStreamName(dehydrated as Uint8Array); + const world = await (await import('./runtime/world.js')).getWorld(); + const wireStream = await world.streams.get('wrun_test', name); + const wireBytes = await readBytes(wireStream); + // Single chunk, no framing — just the user bytes. + expect(wireBytes).toEqual(expected); + }); + + it('hydrate of a framed-v1 ref unframes; absent ref reads raw', async () => { + // Direct exercise of the reviver dispatch: write framed bytes to a + // mock world under a known name, then construct the stream ref two + // different ways (with framing and without) to verify the consumer + // dispatches correctly. + setWorld(makeMockWorld()); + const world = await (await import('./runtime/world.js')).getWorld(); + + // Frame three user chunks into the wire format and stash them. 
+ const chunks = [ + new Uint8Array([1, 2]), + new Uint8Array([3, 4, 5]), + new Uint8Array([6]), + ]; + const reader = new ReadableStream({ + pull(c) { + for (const ch of chunks) c.enqueue(ch); + c.close(); + }, + }) + .pipeThrough(getByteFramingStream()) + .getReader(); + + const wireBytes: Uint8Array[] = []; + for (;;) { + const r = await reader.read(); + if (r.done) break; + wireBytes.push(r.value); + } + for (const b of wireBytes) { + await world.streams.write('wrun_test', 'strm_known', b); + } + + // Now read back via wire stream + unframer — should produce original chunks. + const wireStream = await world.streams.get('wrun_test', 'strm_known'); + const got = await readBytes( + wireStream.pipeThrough(getByteUnframingStream()) + ); + expect(got).toEqual(chunks); + }); +}); diff --git a/packages/core/src/capabilities.test.ts b/packages/core/src/capabilities.test.ts index c402d7b821..643d2e3f7e 100644 --- a/packages/core/src/capabilities.test.ts +++ b/packages/core/src/capabilities.test.ts @@ -61,4 +61,42 @@ describe('getRunCapabilities', () => { expect(supportedFormats.has(SerializationFormat.ENCRYPTED)).toBe(true); }); }); + + describe('framedByteStreams (byte-stream wire framing)', () => { + it('is false when version is undefined', () => { + expect(getRunCapabilities(undefined).framedByteStreams).toBe(false); + }); + + it.each([ + 'not-a-version', + '', + 'dev', + ])('is false for invalid version "%s"', (version) => { + expect(getRunCapabilities(version).framedByteStreams).toBe(false); + }); + + it.each([ + // pre-cutoff: encryption introduced in 4.2.0-beta.64; framing is + // newer than that, so any 4.x version is too old + '4.2.0-beta.64', + '4.2.0', + '4.99.99', + '5.0.0-beta.2', + ])('is false for pre-framing version %s', (version) => { + expect(getRunCapabilities(version).framedByteStreams).toBe(false); + }); + + it('is true at the exact cutoff version (5.0.0-beta.3)', () => { + expect(getRunCapabilities('5.0.0-beta.3').framedByteStreams).toBe(true); + }); + + it.each([ + '5.0.0-beta.4', + '5.0.0', + '5.1.0', + '6.0.0', + ])('is true for post-framing version %s', (version) => { + expect(getRunCapabilities(version).framedByteStreams).toBe(true); + }); + }); }); diff --git a/packages/core/src/capabilities.ts b/packages/core/src/capabilities.ts index 22baebc1d4..5b139e801e 100644 --- a/packages/core/src/capabilities.ts +++ b/packages/core/src/capabilities.ts @@ -15,11 +15,19 @@ * `@workflow/core` version that supports it * 3. The `getRunCapabilities()` function will automatically include it * + * ## Adding a new non-format capability + * + * Some capabilities aren't serialization format prefixes — e.g. + * byte-stream wire framing is an envelope around chunks rather than + * a content format. For those, add a boolean field to `RunCapabilities` + * and an entry in `CAPABILITY_VERSION_TABLE` below. + * * ## History * * - `encr` (AES-256-GCM encryption): added in `4.2.0-beta.64` * Commit: 7618ac36 "Wire AES-GCM encryption into serialization layer (#1251)" * https://github.com/vercel/workflow/commit/7618ac36 + * - `framedByteStreams` (wire-level chunk framing for byte streams): added in `5.0.0-beta.3` */ import semver from 'semver'; @@ -38,6 +46,16 @@ export interface RunCapabilities { * if encryption is supported, etc. */ supportedFormats: ReadonlySet; + + /** + * Whether the target run can decode wire-framed byte streams. 
When true, + * byte streams (`type: 'bytes'` ReadableStreams passed across boundaries) + * are wrapped in a length-prefixed frame envelope on the wire so the + * reader can identify chunk boundaries — which enables auto-reconnect + * on transient stream errors. When false, byte streams are written as + * raw bytes (the legacy format) for compatibility with older runs. + */ + framedByteStreams: boolean; } /** @@ -55,6 +73,15 @@ const FORMAT_VERSION_TABLE: ReadonlyArray<{ // { format: SerializationFormat.ENCRYPTED_V2, minVersion: '5.x.y' }, ]; +/** + * Maps non-format capability flags (booleans on `RunCapabilities`) to the + * minimum `@workflow/core` version that introduced support for them. + */ +const CAPABILITY_VERSION_TABLE: ReadonlyArray<{ + capability: keyof Omit; + minVersion: string; +}> = [{ capability: 'framedByteStreams', minVersion: '5.0.0-beta.3' }]; + /** * The set of formats supported by all specVersion 2 runs, regardless of * `@workflow/core` version. These are the baseline formats that were present @@ -70,13 +97,17 @@ const BASELINE_FORMATS: ReadonlySet = new Set([ * * When the version is `undefined`, not a string, or not a valid semver string * (e.g. very old runs that predate the field, or corrupted metadata), - * we assume the most conservative capabilities (baseline formats only). + * we assume the most conservative capabilities (baseline formats only, + * non-format capabilities all `false`). */ export function getRunCapabilities( workflowCoreVersion: string | undefined ): RunCapabilities { if (!workflowCoreVersion || !semver.valid(workflowCoreVersion)) { - return { supportedFormats: BASELINE_FORMATS }; + return { + supportedFormats: BASELINE_FORMATS, + framedByteStreams: false, + }; } const formats = new Set(BASELINE_FORMATS); @@ -87,5 +118,16 @@ export function getRunCapabilities( } } - return { supportedFormats: formats }; + const result: RunCapabilities = { + supportedFormats: formats, + framedByteStreams: false, + }; + + for (const { capability, minVersion } of CAPABILITY_VERSION_TABLE) { + if (semver.gte(workflowCoreVersion, minVersion)) { + result[capability] = true; + } + } + + return result; } diff --git a/packages/core/src/runtime/resume-hook.ts b/packages/core/src/runtime/resume-hook.ts index eed7e62450..4f0c2be14e 100644 --- a/packages/core/src/runtime/resume-hook.ts +++ b/packages/core/src/runtime/resume-hook.ts @@ -129,12 +129,13 @@ export async function resumeHook( // Check the target run's capabilities to ensure we encode the // payload in a format the run's deployment can decode. For example, // runs created before encryption support was added cannot decode - // the 'encr' serialization format. + // the 'encr' serialization format, and runs created before + // byte-stream framing support cannot decode framed byte streams. const rawVersion = workflowRun.executionContext?.workflowCoreVersion; - const { supportedFormats } = getRunCapabilities( + const capabilities = getRunCapabilities( typeof rawVersion === 'string' ? 
rawVersion : undefined ); - if (!supportedFormats.has(SerializationFormat.ENCRYPTED)) { + if (!capabilities.supportedFormats.has(SerializationFormat.ENCRYPTED)) { encryptionKey = undefined; } @@ -147,7 +148,8 @@ export async function resumeHook( encryptionKey, ops, globalThis, - v1Compat + v1Compat, + capabilities.framedByteStreams ); // NOTE: Workaround instead of injecting catching undefined unhandled rejections in webhook bundle waitUntil( diff --git a/packages/core/src/runtime/start.ts b/packages/core/src/runtime/start.ts index 73dcdc5f79..9178d2413b 100644 --- a/packages/core/src/runtime/start.ts +++ b/packages/core/src/runtime/start.ts @@ -12,6 +12,7 @@ import { SPEC_VERSION_SUPPORTS_EVENT_SOURCING, } from '@workflow/world'; import { monotonicFactory } from 'ulid'; +import { getRunCapabilities } from '../capabilities.js'; import { importKey } from '../encryption.js'; import { runtimeLogger } from '../logger.js'; import type { Serializable } from '../schemas.js'; @@ -20,10 +21,21 @@ import * as Attribute from '../telemetry/semantic-conventions.js'; import { serializeTraceCarrier, trace } from '../telemetry.js'; import { waitedUntil } from '../util.js'; import { version as workflowCoreVersion } from '../version.js'; -import { getWorkflowQueueName } from './helpers.js'; +import { getWorkflowQueueName, healthCheck } from './helpers.js'; import { Run } from './run.js'; import { getWorld } from './world.js'; +/** + * Timeout for the cross-deployment capability probe done before + * dehydrating workflow arguments. Kept tight on purpose: the probe is + * an optimization (it lets the caller emit the framed byte-stream wire + * format when the target supports it), and the fallback on timeout is + * the legacy raw format which always works. Long delays here would just + * make `start({ deploymentId: ... })` slower for users whose target + * deployments don't recognize the health check at all. + */ +const CROSS_DEPLOYMENT_CAPABILITY_PROBE_TIMEOUT_MS = 2_000; + /** ULID generator for client-side runId generation */ const ulid = monotonicFactory(); @@ -150,7 +162,8 @@ export async function start( }); const world = opts?.world ?? (await getWorld()); - let deploymentId = opts.deploymentId ?? (await world.getDeploymentId()); + const currentDeploymentId = await world.getDeploymentId(); + let deploymentId = opts.deploymentId ?? currentDeploymentId; // When 'latest' is requested, resolve the actual latest deployment ID // for the current deployment's environment (same production target or @@ -164,6 +177,32 @@ export async function start( deploymentId = await world.resolveLatestDeploymentId(); } + // Decide whether to write byte streams in the framed wire format. + // For same-deployment starts (the common case) we know the target is + // running this same SDK version, so framing is safe. For cross- + // deployment starts (explicit deploymentId or 'latest' that resolves + // to a different deployment) we probe the target via healthCheck to + // learn its workflow-core version, then derive the capability. The + // probe has a tight timeout — on miss/failure we fall back to the + // legacy raw byte format, which is universally readable. + // + // Worlds that don't expose the `streams` API (e.g. minimal test + // mocks) can't service health checks, so we skip the probe for them. 
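  // In short:
  //   same deployment                      -> framed ('framed-v1' refs)
  //   world without a streams API          -> raw (probe not possible)
  //   probe reports a framing-aware core   -> framed
  //   probe fails, times out, or too old   -> raw (legacy wire format)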
+ let framedByteStreams: boolean; + if (deploymentId === currentDeploymentId) { + framedByteStreams = true; + } else if (typeof world.streams?.get !== 'function') { + framedByteStreams = false; + } else { + const probe = await healthCheck(world, 'workflow', { + deploymentId, + timeout: CROSS_DEPLOYMENT_CAPABILITY_PROBE_TIMEOUT_MS, + }).catch(() => undefined); + framedByteStreams = getRunCapabilities( + probe?.workflowCoreVersion + ).framedByteStreams; + } + const ops: Promise[] = []; // Generate runId client-side so we have it before serialization @@ -204,7 +243,8 @@ export async function start( encryptionKey, ops, globalThis, - v1Compat + v1Compat, + framedByteStreams ); const executionContext = { diff --git a/packages/core/src/runtime/step-handler.ts b/packages/core/src/runtime/step-handler.ts index f38184f293..74bcd26c40 100644 --- a/packages/core/src/runtime/step-handler.ts +++ b/packages/core/src/runtime/step-handler.ts @@ -796,11 +796,18 @@ const stepHandler = (worldHandlers: WorldHandlers) => {}, async (dehydrateSpan) => { const startTime = Date.now(); + // Step return values are consumed by the workflow VM + // running on this same deployment (version skew + // protection ensures it). Byte-stream framing is + // therefore always safe here. const dehydrated = await dehydrateStepReturnValue( result, workflowRunId, encryptionKey, - ops + ops, + globalThis, + false, + true ); const durationMs = Date.now() - startTime; dehydrateSpan?.setAttributes({ diff --git a/packages/core/src/serialization.ts b/packages/core/src/serialization.ts index c16b04f310..10f3618d48 100644 --- a/packages/core/src/serialization.ts +++ b/packages/core/src/serialization.ts @@ -34,6 +34,7 @@ import { contextStorage } from './step/context-storage.js'; import { BODY_INIT_SYMBOL, STABLE_ULID, + STREAM_FRAMING_SYMBOL, STREAM_NAME_SYMBOL, STREAM_TYPE_SYMBOL, WEBHOOK_RESPONSE_WRITABLE, @@ -412,6 +413,144 @@ export function getDeserializeStream( return stream; } +// ============================================================================ +// Byte-stream wire framing +// ============================================================================ +// +// Byte streams (`type: 'bytes'` ReadableStreams passed across boundaries) +// are written to the underlying world's stream transport one user chunk at +// a time. Without an in-band envelope, the reader sees a flat stream of +// bytes — there is no way to tell where one user chunk ends and the next +// begins, which makes mid-stream reconnect impossible (we don't know how +// many server-side chunks have been consumed). +// +// To enable transparent reconnect, this PR introduces an opt-in wire +// envelope that wraps each user chunk in a length-prefix: +// +// [4-byte big-endian length][user payload bytes] +// +// The envelope is identical in shape to `getSerializeStream`'s framing, +// but the payload here is *raw user bytes* — there is no inner +// format-prefix, no devalue, no encryption. A framed byte stream stays +// semantically a byte stream end-to-end; the framing is purely transport. +// +// The decision to use framing for a given stream is recorded in the +// serialized stream ref (`framing: 'framed-v1'`), so both sides agree on +// the wire format without runtime negotiation. Producers that target a +// run whose deployment doesn't support framing (see `getRunCapabilities` +// in capabilities.ts) emit raw bytes and a ref without the field — which +// the reader treats as legacy raw bytes for backwards compatibility. 
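//
// For concreteness, a sketch of the layout (no additional behavior implied):
// framing the 5-byte chunk "hello" puts these 9 bytes on the wire:
//
//   00 00 00 05   68 65 6c 6c 6f
//   \_ length _/  \__ payload __/
//
// and the unframer reproduces the original 5-byte chunk no matter how the
// transport splits or coalesces those 9 bytes across reads.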
+ +/** + * Wire-framing format identifier carried in the serialized + * `ReadableStream` ref's `framing` field. + * + * - absent / `'raw'`: chunks are written to the transport verbatim + * (legacy format — no auto-reconnect support). + * - `'framed-v1'`: each chunk is wrapped in a 4-byte big-endian length + * prefix, allowing the reader to identify chunk boundaries and + * transparently reconnect on transient stream errors. + */ +export type ByteStreamFraming = 'raw' | 'framed-v1'; + +/** + * Wraps each chunk of a byte stream in a 4-byte big-endian length + * prefix. Used by the producer side of a framed byte-stream pipe. + * + * Empty chunks (length 0) are dropped — the resulting `[0x00 0x00 0x00 0x00]` + * frame would be ambiguous with the legacy "looks framed" detection in + * `getDeserializeStream`, and it carries no information. + */ +export function getByteFramingStream(): TransformStream< + Uint8Array, + Uint8Array +> { + return new TransformStream({ + transform(chunk, controller) { + if (chunk.length === 0) return; + const frame = new Uint8Array(FRAME_HEADER_SIZE + chunk.length); + new DataView(frame.buffer).setUint32(0, chunk.length, false); + frame.set(chunk, FRAME_HEADER_SIZE); + controller.enqueue(frame); + }, + }); +} + +/** + * Unwraps length-prefixed byte-stream frames back into the original user + * chunks. Used by the consumer side of a framed byte-stream pipe. + * + * Buffers across read boundaries — the transport may split a single + * frame across multiple reads (header in one chunk, payload in another) + * or coalesce multiple frames into a single read. The transform emits + * whole user chunks regardless of transport chunking. + * + * Errors the stream if the length header advertises a frame larger than + * `MAX_FRAME_SIZE` bytes, since that almost certainly indicates a + * misframed wire (e.g. a raw byte stream being fed through this transform + * by mistake) and we don't want to allocate an enormous buffer. + */ +export function getByteUnframingStream(): TransformStream< + Uint8Array, + Uint8Array +> { + // Sanity cap: 100MB per chunk. Workflow byte chunks are typically far + // smaller; anything bigger almost certainly means we got a non-framed + // wire fed through this transform by mistake (e.g. legacy raw bytes + // routed to a framed reader). + const MAX_FRAME_SIZE = 100_000_000; + let buffer = new Uint8Array(0); + + function appendToBuffer(data: Uint8Array) { + const next = new Uint8Array(buffer.length + data.length); + next.set(buffer, 0); + next.set(data, buffer.length); + buffer = next; + } + + return new TransformStream({ + transform(chunk, controller) { + if (chunk.length > 0) appendToBuffer(chunk); + + while (buffer.length >= FRAME_HEADER_SIZE) { + const frameLength = new DataView( + buffer.buffer, + buffer.byteOffset, + buffer.byteLength + ).getUint32(0, false); + + if (frameLength > MAX_FRAME_SIZE) { + controller.error( + new WorkflowRuntimeError( + `Byte-stream frame length ${frameLength} exceeds maximum (${MAX_FRAME_SIZE}). ` + + `This usually means a non-framed byte stream is being read as framed.`, + { slug: 'serialization-failed' } + ) + ); + return; + } + + const total = FRAME_HEADER_SIZE + frameLength; + if (buffer.length < total) break; + + controller.enqueue(buffer.slice(FRAME_HEADER_SIZE, total)); + buffer = buffer.slice(total); + } + }, + flush(controller) { + if (buffer.length > 0) { + controller.error( + new WorkflowRuntimeError( + `Byte-stream ended with ${buffer.length} bytes of incomplete frame data. 
` + + `The stream was truncated mid-frame.`, + { slug: 'serialization-failed' } + ) + ); + } + }, + }); +} + export class WorkflowServerReadableStream extends ReadableStream { #reader?: ReadableStreamDefaultReader; @@ -614,7 +753,21 @@ export interface SerializableSpecial { Int32Array: string; // base64 string Map: [any, any][]; ReadableStream: - | { name: string; type?: 'bytes'; startIndex?: number } + | { + name: string; + type?: 'bytes'; + startIndex?: number; + /** + * Wire-framing format for byte streams. See {@link ByteStreamFraming} + * and {@link getByteFramingStream} / {@link getByteUnframingStream}. + * + * Only meaningful when `type === 'bytes'`. Absent on object streams + * (which always use length-prefixed devalue framing) and on legacy + * byte streams written by SDKs that predate framing support — those + * are interpreted as `'raw'` by the consumer. + */ + framing?: ByteStreamFraming; + } | { bodyInit: any }; RegExp: { source: string; flags: string }; Request: { @@ -872,13 +1025,21 @@ function getCommonReducers(global: Record = globalThis) { * * @param global * @param ops + * @param runId + * @param cryptoKey + * @param framedByteStreams - When `true`, byte streams (`type: 'bytes'`) + * are wrapped in length-prefixed frames on the wire so the consumer + * can reconnect on transient errors. Should match the target run's + * capability — see `getRunCapabilities` in `capabilities.ts`. Defaults + * to `false` for backwards compatibility with older runs. * @returns */ export function getExternalReducers( global: Record = globalThis, ops: Promise[], runId: string, - cryptoKey: EncryptionKeyParam + cryptoKey: EncryptionKeyParam, + framedByteStreams = false ): Reducers { return { ...getCommonReducers(global), @@ -897,13 +1058,23 @@ export function getExternalReducers( const writable = new WorkflowServerWritableStream(runId, name); if (type === 'bytes') { - ops.push(value.pipeTo(writable)); + if (framedByteStreams) { + ops.push(value.pipeThrough(getByteFramingStream()).pipeTo(writable)); + } else { + ops.push(value.pipeTo(writable)); + } } else { ops.push( value .pipeThrough( getSerializeStream( - getExternalReducers(global, ops, runId, cryptoKey), + getExternalReducers( + global, + ops, + runId, + cryptoKey, + framedByteStreams + ), cryptoKey ) ) @@ -913,6 +1084,7 @@ export function getExternalReducers( const s: SerializableSpecial['ReadableStream'] = { name }; if (type) s.type = type; + if (type === 'bytes' && framedByteStreams) s.framing = 'framed-v1'; return s; }, @@ -963,6 +1135,9 @@ export function getWorkflowReducers( const s: SerializableSpecial['ReadableStream'] = { name }; const type = value[STREAM_TYPE_SYMBOL]; if (type) s.type = type; + const framing: ByteStreamFraming | undefined = + value[STREAM_FRAMING_SYMBOL]; + if (framing) s.framing = framing; return s; }, WritableStream: (value) => { @@ -989,7 +1164,8 @@ function getStepReducers( global: Record = globalThis, ops: Promise[], runId: string, - cryptoKey: EncryptionKeyParam + cryptoKey: EncryptionKeyParam, + framedByteStreams = false ): Reducers { return { ...getCommonReducers(global), @@ -1007,21 +1183,40 @@ function getStepReducers( // name and type. let name = value[STREAM_NAME_SYMBOL]; let type = value[STREAM_TYPE_SYMBOL]; + // The framing symbol is set when a workflow VM revives a stream + // handle from a previous step (see `getWorkflowRevivers`). 
When + // present we must propagate the same framing choice on the way + // back out, since the bytes already on the server's stream are in + // that format — switching format mid-stream would corrupt them. + let framing: ByteStreamFraming | undefined = value[STREAM_FRAMING_SYMBOL]; if (!name) { const streamId = ((global as any)[STABLE_ULID] || defaultUlid)(); name = `strm_${streamId}`; type = getStreamType(value); + framing = type === 'bytes' && framedByteStreams ? 'framed-v1' : framing; const writable = new WorkflowServerWritableStream(runId, name); if (type === 'bytes') { - ops.push(value.pipeTo(writable)); + if (framing === 'framed-v1') { + ops.push( + value.pipeThrough(getByteFramingStream()).pipeTo(writable) + ); + } else { + ops.push(value.pipeTo(writable)); + } } else { ops.push( value .pipeThrough( getSerializeStream( - getStepReducers(global, ops, runId, cryptoKey), + getStepReducers( + global, + ops, + runId, + cryptoKey, + framedByteStreams + ), cryptoKey ) ) @@ -1032,6 +1227,7 @@ function getStepReducers( const s: SerializableSpecial['ReadableStream'] = { name }; if (type) s.type = type; + if (framing) s.framing = framing; return s; }, @@ -1246,13 +1442,19 @@ export function getExternalRevivers( value.startIndex ); if (value.type === 'bytes') { - // For byte streams, use flushable pipe with lock polling + // For byte streams, use flushable pipe with lock polling. + // If the producer wrote framed bytes (framing === 'framed-v1'), + // unwrap the length-prefix envelope before handing chunks to + // the user. Absent / 'raw' framing means legacy raw bytes — + // pipe through unchanged for backwards compatibility. const state = createFlushableState(); ops.push(state.promise); - // Create an identity transform to give the user a readable + // Create an identity (or unframing) transform to give the user a readable const { readable: userReadable, writable } = - new global.TransformStream(); + value.framing === 'framed-v1' + ? getByteUnframingStream() + : new global.TransformStream(); // Start the flushable pipe in the background flushablePipe(readable, writable, state).catch(() => { @@ -1402,6 +1604,15 @@ export function getWorkflowRevivers( value: value.type, writable: false, }, + // Carry the wire-framing decision through the workflow VM so + // that when the handle is later passed to a step (which reads + // the actual bytes from the server) we know whether to unframe. + // Defaults to undefined for streams whose serialized ref didn't + // carry the field — those are treated as legacy raw bytes. + [STREAM_FRAMING_SYMBOL]: { + value: value.framing, + writable: false, + }, }); }, WritableStream: (value) => { @@ -1537,13 +1748,19 @@ function getStepRevivers( const readable = new WorkflowServerReadableStream(runId, value.name); if (value.type === 'bytes') { - // For byte streams, use flushable pipe with lock polling + // For byte streams, use flushable pipe with lock polling. + // If the producer wrote framed bytes (framing === 'framed-v1'), + // unwrap the length-prefix envelope before handing chunks to + // the user step. Absent / 'raw' framing means legacy raw bytes — + // pipe through unchanged for backwards compatibility. const state = createFlushableState(); ops.push(state.promise); - // Create an identity transform to give the user a readable + // Create an identity (or unframing) transform to give the user a readable const { readable: userReadable, writable } = - new global.TransformStream(); + value.framing === 'framed-v1' + ? 
getByteUnframingStream() + : new global.TransformStream(); // Start the flushable pipe in the background flushablePipe(readable, writable, state).catch(() => { @@ -1676,6 +1893,10 @@ export async function maybeDecrypt( * @param ops - Promise array for stream operations * @param global - Global object for serialization context * @param v1Compat - Enable legacy v1 compatibility mode + * @param framedByteStreams - Whether the target run can decode wire-framed + * byte streams. Should match the target deployment's capability — see + * `getRunCapabilities` in `capabilities.ts`. Defaults to `false` for + * backwards compatibility with older runs. * @returns The dehydrated value as binary data (Uint8Array) with format prefix */ export async function dehydrateWorkflowArguments( @@ -1684,10 +1905,14 @@ export async function dehydrateWorkflowArguments( key: CryptoKey | undefined, ops: Promise[] = [], global: Record = globalThis, - v1Compat = false + v1Compat = false, + framedByteStreams = false ): Promise { try { - const str = stringify(value, getExternalReducers(global, ops, runId, key)); + const str = stringify( + value, + getExternalReducers(global, ops, runId, key, framedByteStreams) + ); if (v1Compat) { return revive(str); } @@ -1935,6 +2160,10 @@ export async function hydrateStepArguments( * @param ops - Promise array for stream operations * @param global - Global object for serialization context * @param v1Compat - Enable legacy v1 compatibility mode + * @param framedByteStreams - Whether the target run can decode wire-framed + * byte streams. Should match the target deployment's capability — see + * `getRunCapabilities` in `capabilities.ts`. Defaults to `false` for + * backwards compatibility with older runs. * @returns The dehydrated value as binary data (Uint8Array) with format prefix */ export async function dehydrateStepReturnValue( @@ -1943,10 +2172,14 @@ export async function dehydrateStepReturnValue( key: CryptoKey | undefined, ops: Promise[] = [], global: Record = globalThis, - v1Compat = false + v1Compat = false, + framedByteStreams = false ): Promise { try { - const str = stringify(value, getStepReducers(global, ops, runId, key)); + const str = stringify( + value, + getStepReducers(global, ops, runId, key, framedByteStreams) + ); if (v1Compat) { return revive(str); } diff --git a/packages/core/src/step/writable-stream.ts b/packages/core/src/step/writable-stream.ts index 093a5d9c58..30d0612be0 100644 --- a/packages/core/src/step/writable-stream.ts +++ b/packages/core/src/step/writable-stream.ts @@ -46,9 +46,12 @@ export function getWritable( const runId = ctx.workflowMetadata.workflowRunId; const name = getWorkflowRunStreamId(runId, namespace); - // Create a transform stream that serializes chunks and pipes to the workflow server + // Create a transform stream that serializes chunks and pipes to the workflow server. + // The target run is the workflow run that owns this step, which (per + // version skew protection) is on this same SDK version, so byte-stream + // framing is always safe here. 
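  // (The trailing `true` passed to getExternalReducers below is its
  // framedByteStreams parameter; see the signature in serialization.ts.)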
const serialize = getSerializeStream( - getExternalReducers(globalThis, ctx.ops, runId, ctx.encryptionKey), + getExternalReducers(globalThis, ctx.ops, runId, ctx.encryptionKey, true), ctx.encryptionKey ); diff --git a/packages/core/src/symbols.ts b/packages/core/src/symbols.ts index 92df4058db..6dce779eb1 100644 --- a/packages/core/src/symbols.ts +++ b/packages/core/src/symbols.ts @@ -6,6 +6,7 @@ export const WORKFLOW_GET_STREAM_ID = Symbol.for('WORKFLOW_GET_STREAM_ID'); export const STABLE_ULID = Symbol.for('WORKFLOW_STABLE_ULID'); export const STREAM_NAME_SYMBOL = Symbol.for('WORKFLOW_STREAM_NAME'); export const STREAM_TYPE_SYMBOL = Symbol.for('WORKFLOW_STREAM_TYPE'); +export const STREAM_FRAMING_SYMBOL = Symbol.for('WORKFLOW_STREAM_FRAMING'); export const BODY_INIT_SYMBOL = Symbol.for('BODY_INIT'); export const WEBHOOK_RESPONSE_WRITABLE = Symbol.for( 'WEBHOOK_RESPONSE_WRITABLE'