From 3a2450c19040f36a74ea7872a38edf67744c6414 Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Thu, 26 Feb 2026 16:58:41 +0000 Subject: [PATCH 1/5] feat(server): Graceful handling of large batch item payloads --- .../app/presenters/v3/SpanPresenter.server.ts | 3 +- .../routes/api.v3.batches.$batchId.items.ts | 7 +- .../route.tsx | 104 ++++----- .../services/streamBatchItems.server.ts | 212 ++++++++++++++++-- .../webapp/app/v3/runEngineHandlers.server.ts | 40 ++++ .../test/engine/streamBatchItems.test.ts | 90 +++++++- .../run-engine/src/batch-queue/index.ts | 6 +- .../run-engine/src/engine/errors.ts | 1 + packages/core/src/v3/errors.ts | 1 + packages/core/src/v3/schemas/common.ts | 1 + references/hello-world/src/trigger/batches.ts | 36 +++ 11 files changed, 416 insertions(+), 85 deletions(-) diff --git a/apps/webapp/app/presenters/v3/SpanPresenter.server.ts b/apps/webapp/app/presenters/v3/SpanPresenter.server.ts index 59e717f7cd8..a85d8b20dd2 100644 --- a/apps/webapp/app/presenters/v3/SpanPresenter.server.ts +++ b/apps/webapp/app/presenters/v3/SpanPresenter.server.ts @@ -509,8 +509,7 @@ export class SpanPresenter extends BasePresenter { taskIdentifier: true, spanId: true, createdAt: true, - number: true, - taskVersion: true, + status: true, }, where: { parentSpanId: spanId, diff --git a/apps/webapp/app/routes/api.v3.batches.$batchId.items.ts b/apps/webapp/app/routes/api.v3.batches.$batchId.items.ts index 8307f34afce..b3ed1c22422 100644 --- a/apps/webapp/app/routes/api.v3.batches.$batchId.items.ts +++ b/apps/webapp/app/routes/api.v3.batches.$batchId.items.ts @@ -99,11 +99,8 @@ export async function action({ request, params }: ActionFunctionArgs) { if (error instanceof ServiceValidationError) { return json({ error: error.message }, { status: 422 }); } else if (error instanceof Error) { - // Check for stream parsing errors - if ( - error.message.includes("Invalid JSON") || - error.message.includes("exceeds maximum size") - ) { + // Check for stream parsing errors (e.g. invalid JSON) + if (error.message.includes("Invalid JSON")) { return json({ error: error.message }, { status: 400 }); } diff --git a/apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.$runParam.spans.$spanParam/route.tsx b/apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.$runParam.spans.$spanParam/route.tsx index ae8bdaa7077..a78c95d6036 100644 --- a/apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.$runParam.spans.$spanParam/route.tsx +++ b/apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.$runParam.spans.$spanParam/route.tsx @@ -58,6 +58,7 @@ import { RunTimeline, RunTimelineEvent, SpanTimeline } from "~/components/run/Ru import { PacketDisplay } from "~/components/runs/v3/PacketDisplay"; import { RunIcon } from "~/components/runs/v3/RunIcon"; import { RunTag } from "~/components/runs/v3/RunTag"; +import { TruncatedCopyableValue } from "~/components/primitives/TruncatedCopyableValue"; import { SpanEvents } from "~/components/runs/v3/SpanEvents"; import { SpanTitle } from "~/components/runs/v3/SpanTitle"; import { TaskRunAttemptStatusCombo } from "~/components/runs/v3/TaskRunAttemptStatus"; @@ -133,9 +134,10 @@ export const loader = async ({ request, params }: LoaderFunctionArgs) => { name: error.name, message: error.message, stack: error.stack, - cause: error.cause instanceof Error - ? { name: error.cause.name, message: error.cause.message } - : error.cause, + cause: + error.cause instanceof Error + ? { name: error.cause.name, message: error.cause.message } + : error.cause, } : error, }); @@ -1003,7 +1005,7 @@ function RunBody({ )} -
+
{run.friendlyId !== runParam && ( Message {span.message} - {span.triggeredRuns.length > 0 && ( - -
- Triggered runs - - - - Run # - Task - Version - Created at - - - - {span.triggeredRuns.map((run) => { - const path = v3RunSpanPath( - organization, - project, - environment, - { friendlyId: run.friendlyId }, - { spanId: run.spanId } - ); - return ( - - - {run.number} - - - {run.taskIdentifier} - - - {run.taskVersion ?? "–"} - - - - - - ); - })} - -
-
-
- )} {span.events.length > 0 && } {span.properties !== undefined ? ( @@ -1268,6 +1228,48 @@ function SpanEntity({ span }: { span: Span }) { showOpenInModal /> ) : null} + {span.triggeredRuns.length > 0 && ( +
+ Runs + + + + ID + Task + Status + Created + + + + {span.triggeredRuns.map((run) => { + const path = v3RunSpanPath( + organization, + project, + environment, + { friendlyId: run.friendlyId }, + { spanId: run.spanId } + ); + return ( + + + + + + {run.taskIdentifier} + + + + + + + + + ); + })} + +
+
+ )}
); } diff --git a/apps/webapp/app/runEngine/services/streamBatchItems.server.ts b/apps/webapp/app/runEngine/services/streamBatchItems.server.ts index 8206760f469..c64e827593c 100644 --- a/apps/webapp/app/runEngine/services/streamBatchItems.server.ts +++ b/apps/webapp/app/runEngine/services/streamBatchItems.server.ts @@ -14,6 +14,14 @@ export type StreamBatchItemsServiceOptions = { maxItemBytes: number; }; +export type OversizedItemMarker = { + __batchItemError: "OVERSIZED"; + index: number; + task: string; + actualSize: number; + maxSize: number; +}; + export type StreamBatchItemsServiceConstructorOptions = { prisma?: PrismaClientOrTransaction; engine?: RunEngine; @@ -110,6 +118,41 @@ export class StreamBatchItemsService extends WithRunEngine { // Process items from the stream for await (const rawItem of itemsIterator) { + // Check for oversized item markers from the NDJSON parser + if (rawItem && typeof rawItem === "object" && "__batchItemError" in rawItem) { + const marker = rawItem as OversizedItemMarker; + const itemIndex = marker.index >= 0 ? marker.index : lastIndex + 1; + + const errorMessage = `Batch item payload is too large (${(marker.actualSize / 1024).toFixed(1)} KB). Maximum allowed size is ${(marker.maxSize / 1024).toFixed(1)} KB. Reduce the payload size or offload large data to external storage.`; + + // Enqueue with __error metadata - processItemCallback will detect this + // and use TriggerFailedTaskService to create a pre-failed run + const batchItem: BatchItem = { + task: marker.task, + payload: "{}", + payloadType: "application/json", + options: { + __error: errorMessage, + __errorCode: "PAYLOAD_TOO_LARGE", + }, + }; + + const result = await this._engine.enqueueBatchItem( + batchId, + environment.id, + itemIndex, + batchItem + ); + + if (result.enqueued) { + itemsAccepted++; + } else { + itemsDeduplicated++; + } + lastIndex = itemIndex; + continue; + } + // Parse and validate the item const parseResult = BatchItemNDJSONSchema.safeParse(rawItem); if (!parseResult.success) { @@ -281,6 +324,121 @@ export class StreamBatchItemsService extends WithRunEngine { } } +/** + * Extract `index` and `task` from raw JSON bytes without decoding the full line. + * Scans at most 512 bytes, tracking JSON nesting depth to only match top-level keys. + */ +export function extractIndexAndTask(bytes: Uint8Array): { index: number; task: string } { + let index = -1; + let task = "unknown"; + let depth = 0; + let foundIndex = false; + let foundTask = false; + const limit = Math.min(bytes.byteLength, 512); + + const QUOTE = 0x22; // " + const COLON = 0x3a; // : + const LBRACE = 0x7b; // { + const RBRACE = 0x7d; // } + const LBRACKET = 0x5b; // [ + const RBRACKET = 0x5d; // ] + const BACKSLASH = 0x5c; // \ + + // Byte patterns for "index" and "task" (without quotes) + const INDEX_BYTES = [0x69, 0x6e, 0x64, 0x65, 0x78]; // index + const TASK_BYTES = [0x74, 0x61, 0x73, 0x6b]; // task + + let i = 0; + while (i < limit && !(foundIndex && foundTask)) { + const b = bytes[i]; + + if (b === LBRACE || b === LBRACKET) { + depth++; + i++; + continue; + } + if (b === RBRACE || b === RBRACKET) { + depth--; + i++; + continue; + } + + // Only match keys at depth 1 (top-level object) + if (b === QUOTE && depth === 1) { + // Read the key inside quotes + const keyStart = i + 1; + let keyEnd = keyStart; + while (keyEnd < limit && bytes[keyEnd] !== QUOTE) { + if (bytes[keyEnd] === BACKSLASH) keyEnd++; // skip escaped char + keyEnd++; + } + + const keyLen = keyEnd - keyStart; + + // Check if this key matches "index" or "task" + const isIndex = + !foundIndex && + keyLen === INDEX_BYTES.length && + INDEX_BYTES.every((b, j) => bytes[keyStart + j] === b); + const isTask = + !foundTask && + keyLen === TASK_BYTES.length && + TASK_BYTES.every((b, j) => bytes[keyStart + j] === b); + + if (isIndex || isTask) { + // Skip past closing quote and find colon + let pos = keyEnd + 1; + while (pos < limit && bytes[pos] !== COLON) pos++; + pos++; // skip colon + // Skip whitespace + while (pos < limit && (bytes[pos] === 0x20 || bytes[pos] === 0x09)) pos++; + + if (isIndex) { + // Parse digits + let num = 0; + let hasDigit = false; + while (pos < limit && bytes[pos] >= 0x30 && bytes[pos] <= 0x39) { + num = num * 10 + (bytes[pos] - 0x30); + hasDigit = true; + pos++; + } + if (hasDigit) { + index = num; + foundIndex = true; + } + } else { + // Parse quoted string value + if (pos < limit && bytes[pos] === QUOTE) { + const valStart = pos + 1; + let valEnd = valStart; + while (valEnd < limit && bytes[valEnd] !== QUOTE) { + if (bytes[valEnd] === BACKSLASH) valEnd++; + valEnd++; + } + // Decode just this slice + try { + task = new TextDecoder("utf-8", { fatal: true }).decode( + bytes.slice(valStart, valEnd) + ); + foundTask = true; + } catch { + // Leave as "unknown" + } + } + } + } + + // Skip past the key's closing quote + i = keyEnd + 1; + continue; + } + + i++; + } + + return { index, task }; +} + /** * Create an NDJSON parser transform stream. * @@ -407,11 +565,19 @@ export function createNdjsonParserStream( while ((newlineIndex = findNewlineIndex()) !== -1) { // Check size limit BEFORE extracting/decoding (bytes up to newline) if (newlineIndex > maxItemBytes) { - throw new Error( - `Item at line ${ - lineNumber + 1 - } exceeds maximum size of ${maxItemBytes} bytes (actual: ${newlineIndex})` - ); + // Case 1: Complete line exceeds limit - emit marker instead of throwing + const lineBytes = extractLine(newlineIndex); + const extracted = extractIndexAndTask(lineBytes); + const marker: OversizedItemMarker = { + __batchItemError: "OVERSIZED", + index: extracted.index, + task: extracted.task, + actualSize: newlineIndex, + maxSize: maxItemBytes, + }; + controller.enqueue(marker); + lineNumber++; + continue; } const lineBytes = extractLine(newlineIndex); @@ -421,11 +587,21 @@ export function createNdjsonParserStream( // Check if the remaining buffer (incomplete line) exceeds the limit // This prevents OOM from a single huge line without newlines if (totalBytes > maxItemBytes) { - throw new Error( - `Item at line ${ - lineNumber + 1 - } exceeds maximum size of ${maxItemBytes} bytes (buffered: ${totalBytes}, no newline found)` - ); + // Case 2: Incomplete line exceeds limit - emit marker instead of throwing + const extracted = extractIndexAndTask(concatenateChunks()); + const marker: OversizedItemMarker = { + __batchItemError: "OVERSIZED", + index: extracted.index, + task: extracted.task, + actualSize: totalBytes, + maxSize: maxItemBytes, + }; + controller.enqueue(marker); + lineNumber++; + // Clear buffer since we consumed the oversized data + chunks = []; + totalBytes = 0; + return; } }, @@ -441,11 +617,17 @@ export function createNdjsonParserStream( // Check size limit before processing final line if (totalBytes > maxItemBytes) { - throw new Error( - `Item at line ${ - lineNumber + 1 - } exceeds maximum size of ${maxItemBytes} bytes (actual: ${totalBytes})` - ); + // Case 3: Flush with oversized remaining - emit marker instead of throwing + const extracted = extractIndexAndTask(concatenateChunks()); + const marker: OversizedItemMarker = { + __batchItemError: "OVERSIZED", + index: extracted.index, + task: extracted.task, + actualSize: totalBytes, + maxSize: maxItemBytes, + }; + controller.enqueue(marker); + return; } const finalBytes = concatenateChunks(); diff --git a/apps/webapp/app/v3/runEngineHandlers.server.ts b/apps/webapp/app/v3/runEngineHandlers.server.ts index f0cf449d36a..46fe5eaa796 100644 --- a/apps/webapp/app/v3/runEngineHandlers.server.ts +++ b/apps/webapp/app/v3/runEngineHandlers.server.ts @@ -669,6 +669,46 @@ export function setupBatchQueueCallbacks() { engine, }); + // Check for pre-marked error items (e.g. oversized payloads) + const itemError = item.options?.__error as string | undefined; + if (itemError) { + const errorCode = (item.options?.__errorCode as string) ?? "ITEM_ERROR"; + + let environment: AuthenticatedEnvironment | undefined; + try { + environment = (await findEnvironmentById(meta.environmentId)) ?? undefined; + } catch { + // Best-effort environment lookup + } + + if (environment) { + const failedRunId = await triggerFailedTaskService.call({ + taskId: item.task, + environment, + payload: item.payload ?? "{}", + payloadType: item.payloadType as string, + errorMessage: itemError, + errorCode: errorCode as TaskRunErrorCodes, + parentRunId: meta.parentRunId, + resumeParentOnCompletion: meta.resumeParentOnCompletion, + batch: { id: batchId, index: itemIndex }, + traceContext: meta.traceContext as Record | undefined, + spanParentAsLink: meta.spanParentAsLink, + }); + + if (failedRunId) { + span.setAttribute("batch.result.pre_failed", true); + span.setAttribute("batch.result.run_id", failedRunId); + span.end(); + return { success: true as const, runId: failedRunId }; + } + } + + // Fallback if TriggerFailedTaskService or environment lookup fails + span.end(); + return { success: false as const, error: itemError, errorCode }; + } + let environment: AuthenticatedEnvironment | undefined; try { environment = (await findEnvironmentById(meta.environmentId)) ?? undefined; diff --git a/apps/webapp/test/engine/streamBatchItems.test.ts b/apps/webapp/test/engine/streamBatchItems.test.ts index 9e3b3aafe8b..928c0e98de6 100644 --- a/apps/webapp/test/engine/streamBatchItems.test.ts +++ b/apps/webapp/test/engine/streamBatchItems.test.ts @@ -24,6 +24,8 @@ import { StreamBatchItemsService, createNdjsonParserStream, streamToAsyncIterable, + extractIndexAndTask, + type OversizedItemMarker, } from "../../app/runEngine/services/streamBatchItems.server"; import { ServiceValidationError } from "../../app/v3/services/baseService.server"; @@ -705,33 +707,44 @@ describe("createNdjsonParserStream", () => { expect(results).toEqual([{ greeting: "こんにちは" }]); }); - it("should reject lines exceeding maxItemBytes", async () => { + it("should emit OversizedItemMarker for lines exceeding maxItemBytes", async () => { const maxBytes = 50; - // Create a line that exceeds the limit - const largeJson = JSON.stringify({ data: "x".repeat(100) }) + "\n"; + // Create a line that exceeds the limit with index and task fields + const largeJson = JSON.stringify({ index: 3, task: "my-task", data: "x".repeat(100) }) + "\n"; const encoder = new TextEncoder(); const stream = chunksToStream([encoder.encode(largeJson)]); const parser = createNdjsonParserStream(maxBytes); + const results = await collectStream(stream.pipeThrough(parser)); - await expect(collectStream(stream.pipeThrough(parser))).rejects.toThrow(/exceeds maximum size/); + expect(results).toHaveLength(1); + const marker = results[0] as OversizedItemMarker; + expect(marker.__batchItemError).toBe("OVERSIZED"); + expect(marker.index).toBe(3); + expect(marker.task).toBe("my-task"); + expect(marker.maxSize).toBe(maxBytes); + expect(marker.actualSize).toBeGreaterThan(maxBytes); }); - it("should reject unbounded accumulation without newlines", async () => { + it("should emit OversizedItemMarker for unbounded accumulation without newlines", async () => { const maxBytes = 50; // Send data without any newlines that exceeds the buffer limit const encoder = new TextEncoder(); const chunks = [ - encoder.encode('{"start":"'), + encoder.encode('{"index":7,"task":"big-task","start":"'), encoder.encode("x".repeat(60)), // This will push buffer over 50 bytes ]; const stream = chunksToStream(chunks); const parser = createNdjsonParserStream(maxBytes); + const results = await collectStream(stream.pipeThrough(parser)); - await expect(collectStream(stream.pipeThrough(parser))).rejects.toThrow( - /exceeds maximum size.*no newline found/ - ); + expect(results).toHaveLength(1); + const marker = results[0] as OversizedItemMarker; + expect(marker.__batchItemError).toBe("OVERSIZED"); + expect(marker.index).toBe(7); + expect(marker.task).toBe("big-task"); + expect(marker.maxSize).toBe(maxBytes); }); it("should check byte size before decoding to prevent OOM", async () => { @@ -756,10 +769,12 @@ describe("createNdjsonParserStream", () => { const results1 = await collectStream(stream1.pipeThrough(parser1)); expect(results1).toHaveLength(1); - // Large one should fail + // Large one should emit an OversizedItemMarker const stream2 = chunksToStream([largeBytes]); const parser2 = createNdjsonParserStream(maxBytes); - await expect(collectStream(stream2.pipeThrough(parser2))).rejects.toThrow(/exceeds maximum/); + const results2 = await collectStream(stream2.pipeThrough(parser2)); + expect(results2).toHaveLength(1); + expect((results2[0] as OversizedItemMarker).__batchItemError).toBe("OVERSIZED"); }); it("should handle final line in flush without trailing newline", async () => { @@ -837,6 +852,28 @@ describe("createNdjsonParserStream", () => { expect(results).toEqual([]); }); + it("should pass normal items and emit markers for oversized items in the same stream", async () => { + const maxBytes = 50; + const encoder = new TextEncoder(); + // Normal item, then oversized item, then another normal item + const normalItem1 = '{"index":0,"task":"t","x":1}\n'; + const oversizedItem = JSON.stringify({ index: 1, task: "t", data: "x".repeat(100) }) + "\n"; + const normalItem2 = '{"index":2,"task":"t","x":2}\n'; + const stream = chunksToStream([encoder.encode(normalItem1 + oversizedItem + normalItem2)]); + + const parser = createNdjsonParserStream(maxBytes); + const results = await collectStream(stream.pipeThrough(parser)); + + expect(results).toHaveLength(3); + // First: normal parsed object + expect(results[0]).toEqual({ index: 0, task: "t", x: 1 }); + // Second: oversized marker + expect((results[1] as OversizedItemMarker).__batchItemError).toBe("OVERSIZED"); + expect((results[1] as OversizedItemMarker).index).toBe(1); + // Third: normal parsed object + expect(results[2]).toEqual({ index: 2, task: "t", x: 2 }); + }); + it("should handle stream with only whitespace", async () => { const encoder = new TextEncoder(); const stream = chunksToStream([encoder.encode(" \n\n \n")]); @@ -847,3 +884,34 @@ describe("createNdjsonParserStream", () => { expect(results).toEqual([]); }); }); + +describe("extractIndexAndTask", () => { + const encoder = new TextEncoder(); + + it("should extract index and task from JSON bytes", () => { + const bytes = encoder.encode('{"index":42,"task":"my-task","data":"x"}'); + const result = extractIndexAndTask(bytes); + expect(result.index).toBe(42); + expect(result.task).toBe("my-task"); + }); + + it("should return defaults for empty or malformed bytes", () => { + const result = extractIndexAndTask(new Uint8Array(0)); + expect(result.index).toBe(-1); + expect(result.task).toBe("unknown"); + }); + + it("should handle keys in any order", () => { + const bytes = encoder.encode('{"task":"other-task","data":"y","index":99}'); + const result = extractIndexAndTask(bytes); + expect(result.index).toBe(99); + expect(result.task).toBe("other-task"); + }); + + it("should not match nested keys", () => { + const bytes = encoder.encode('{"nested":{"index":999,"task":"inner"},"index":5,"task":"outer"}'); + const result = extractIndexAndTask(bytes); + expect(result.index).toBe(5); + expect(result.task).toBe("outer"); + }); +}); diff --git a/internal-packages/run-engine/src/batch-queue/index.ts b/internal-packages/run-engine/src/batch-queue/index.ts index 98bdacc052e..f571d7f51d3 100644 --- a/internal-packages/run-engine/src/batch-queue/index.ts +++ b/internal-packages/run-engine/src/batch-queue/index.ts @@ -848,7 +848,11 @@ export class BatchQueue { "BatchQueue.serializePayload", async (innerSpan) => { const str = - typeof item.payload === "string" ? item.payload : JSON.stringify(item.payload); + item.payload === undefined || item.payload === null + ? "{}" + : typeof item.payload === "string" + ? item.payload + : JSON.stringify(item.payload); innerSpan?.setAttribute("batch.payloadSize", str.length); return str; } diff --git a/internal-packages/run-engine/src/engine/errors.ts b/internal-packages/run-engine/src/engine/errors.ts index 373f9daa14f..772282debd1 100644 --- a/internal-packages/run-engine/src/engine/errors.ts +++ b/internal-packages/run-engine/src/engine/errors.ts @@ -61,6 +61,7 @@ export function runStatusFromError( case "TASK_PROCESS_SIGTERM": case "TASK_DID_CONCURRENT_WAIT": case "BATCH_ITEM_COULD_NOT_TRIGGER": + case "PAYLOAD_TOO_LARGE": case "UNSPECIFIED_ERROR": return "SYSTEM_FAILURE"; default: diff --git a/packages/core/src/v3/errors.ts b/packages/core/src/v3/errors.ts index 91483251318..87fff767d7b 100644 --- a/packages/core/src/v3/errors.ts +++ b/packages/core/src/v3/errors.ts @@ -308,6 +308,7 @@ export function shouldRetryError(error: TaskRunError): boolean { case "TASK_HAS_N0_EXECUTION_SNAPSHOT": case "TASK_RUN_DEQUEUED_MAX_RETRIES": case "BATCH_ITEM_COULD_NOT_TRIGGER": + case "PAYLOAD_TOO_LARGE": case "UNSPECIFIED_ERROR": return false; diff --git a/packages/core/src/v3/schemas/common.ts b/packages/core/src/v3/schemas/common.ts index d489a59390e..f3757208335 100644 --- a/packages/core/src/v3/schemas/common.ts +++ b/packages/core/src/v3/schemas/common.ts @@ -188,6 +188,7 @@ export const TaskRunInternalError = z.object({ "TASK_DID_CONCURRENT_WAIT", "RECURSIVE_WAIT_DEADLOCK", "BATCH_ITEM_COULD_NOT_TRIGGER", + "PAYLOAD_TOO_LARGE", "UNSPECIFIED_ERROR", ]), message: z.string().optional(), diff --git a/references/hello-world/src/trigger/batches.ts b/references/hello-world/src/trigger/batches.ts index 6bbdf946120..15b57807293 100644 --- a/references/hello-world/src/trigger/batches.ts +++ b/references/hello-world/src/trigger/batches.ts @@ -999,6 +999,42 @@ export const largePayloadTask = task({ }, }); +// ============================================================================ +// Oversized Payload Graceful Handling +// ============================================================================ + +/** + * Test: Batch with oversized item should complete gracefully + * + * Sends 2 items: one normal, one oversized (~3.2MB). + * The oversized item should result in a pre-failed run (ok: false) + * while the normal item processes successfully (ok: true). + */ +export const batchSealFailureOversizedPayload = task({ + id: "batch-seal-failure-oversized", + maxDuration: 60, + run: async () => { + const results = await fixedLengthTask.batchTriggerAndWait([ + { payload: { waitSeconds: 1, output: "normal" } }, + { payload: { waitSeconds: 1, output: "x".repeat(3_200_000) } }, // ~3.2MB oversized + ]); + + const normal = results.runs[0]; + const oversized = results.runs[1]; + + logger.info("Batch results", { + normalOk: normal?.ok, + oversizedOk: oversized?.ok, + }); + + return { + normalOk: normal?.ok === true, + oversizedOk: oversized?.ok === false, + oversizedError: !oversized?.ok ? oversized?.error : undefined, + }; + }, +}); + type Payload = { waitSeconds: number; error?: string; From e7a6e7794df981a17c14bfb55f66ecfa0c4aa239 Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Thu, 26 Feb 2026 17:52:58 +0000 Subject: [PATCH 2/5] gracefully handle larger ndjson lines that are chunked to the server over mulitple chunks instead of in 1 chunk --- .../services/streamBatchItems.server.ts | 53 ++++++++++++++++++- .../test/engine/streamBatchItems.test.ts | 32 +++++++++++ references/hello-world/src/trigger/batches.ts | 3 ++ 3 files changed, 87 insertions(+), 1 deletion(-) diff --git a/apps/webapp/app/runEngine/services/streamBatchItems.server.ts b/apps/webapp/app/runEngine/services/streamBatchItems.server.ts index c64e827593c..859dfe2e6b9 100644 --- a/apps/webapp/app/runEngine/services/streamBatchItems.server.ts +++ b/apps/webapp/app/runEngine/services/streamBatchItems.server.ts @@ -211,6 +211,34 @@ export class StreamBatchItemsService extends WithRunEngine { // Validate we received the expected number of items if (enqueuedCount !== batch.runCount) { + // The batch queue consumers may have already processed all items and + // cleaned up the Redis keys before we got here (especially likely when + // items include pre-failed runs that complete instantly). Check if the + // batch was already sealed/completed in Postgres. + const currentBatch = await this._prisma.batchTaskRun.findUnique({ + where: { id: batchId }, + select: { sealed: true, status: true }, + }); + + if (currentBatch?.sealed) { + logger.info("Batch already sealed before count check (fast completion)", { + batchId: batchFriendlyId, + itemsAccepted, + itemsDeduplicated, + enqueuedCount, + expectedCount: batch.runCount, + batchStatus: currentBatch.status, + }); + + return { + id: batchFriendlyId, + itemsAccepted, + itemsDeduplicated, + sealed: true, + runCount: batch.runCount, + }; + } + logger.warn("Batch item count mismatch", { batchId: batchFriendlyId, expected: batch.runCount, @@ -463,6 +491,9 @@ export function createNdjsonParserStream( let chunks: Uint8Array[] = []; let totalBytes = 0; let lineNumber = 0; + // When an oversized incomplete line is detected (Case 2), we must discard + // all remaining bytes of that line until the next newline delimiter. + let skipUntilNewline = false; const NEWLINE_BYTE = 0x0a; // '\n' @@ -556,6 +587,24 @@ export function createNdjsonParserStream( return new TransformStream({ transform(chunk, controller) { + // If we're skipping the remainder of an oversized line, scan for the + // next newline in this chunk and discard everything before it. + if (skipUntilNewline) { + const nlPos = chunk.indexOf(NEWLINE_BYTE); + if (nlPos === -1) { + // Entire chunk is still part of the oversized line — discard it + return; + } + // Found the newline — keep everything after it + skipUntilNewline = false; + const remaining = chunk.slice(nlPos + 1); + if (remaining.byteLength === 0) { + return; + } + // Replace chunk with the remainder and fall through to normal processing + chunk = remaining; + } + // Append chunk to buffer chunks.push(chunk); totalBytes += chunk.byteLength; @@ -598,9 +647,11 @@ export function createNdjsonParserStream( }; controller.enqueue(marker); lineNumber++; - // Clear buffer since we consumed the oversized data + // Clear buffer and skip remaining bytes of this oversized line + // until the next newline delimiter is found in a subsequent chunk chunks = []; totalBytes = 0; + skipUntilNewline = true; return; } }, diff --git a/apps/webapp/test/engine/streamBatchItems.test.ts b/apps/webapp/test/engine/streamBatchItems.test.ts index 928c0e98de6..f4d286ba207 100644 --- a/apps/webapp/test/engine/streamBatchItems.test.ts +++ b/apps/webapp/test/engine/streamBatchItems.test.ts @@ -747,6 +747,38 @@ describe("createNdjsonParserStream", () => { expect(marker.maxSize).toBe(maxBytes); }); + it("should skip remaining bytes of oversized line arriving in subsequent chunks", async () => { + const maxBytes = 50; + const encoder = new TextEncoder(); + // Simulate a normal item, then an oversized item split across many chunks, + // then another normal item after the newline. + // The oversized line is: {"index":1,"task":"t","data":"xxxx...120 x's...xxxx"}\n + const normalItem1 = '{"index":0,"task":"t","x":1}\n'; + const oversizedStart = '{"index":1,"task":"t","data":"'; + const oversizedMiddle = "x".repeat(120); // way over 50 bytes + const oversizedEnd = '"}\n'; + const normalItem2 = '{"index":2,"task":"t","x":2}\n'; + + // Send as separate chunks to trigger Case 2 (no newline, buffer > limit) + const chunks = [ + encoder.encode(normalItem1 + oversizedStart), + encoder.encode(oversizedMiddle.slice(0, 60)), + encoder.encode(oversizedMiddle.slice(60)), + encoder.encode(oversizedEnd + normalItem2), + ]; + const stream = chunksToStream(chunks); + + const parser = createNdjsonParserStream(maxBytes); + const results = await collectStream(stream.pipeThrough(parser)); + + // Should get: normal item 1, oversized marker, normal item 2 + expect(results).toHaveLength(3); + expect(results[0]).toEqual({ index: 0, task: "t", x: 1 }); + expect((results[1] as OversizedItemMarker).__batchItemError).toBe("OVERSIZED"); + expect((results[1] as OversizedItemMarker).index).toBe(1); + expect(results[2]).toEqual({ index: 2, task: "t", x: 2 }); + }); + it("should check byte size before decoding to prevent OOM", async () => { // This test verifies that size is checked on raw bytes, not decoded string length // Unicode characters like "🎉" are 4 bytes but 2 UTF-16 code units (string length 2) diff --git a/references/hello-world/src/trigger/batches.ts b/references/hello-world/src/trigger/batches.ts index 15b57807293..b6a3f79e74e 100644 --- a/references/hello-world/src/trigger/batches.ts +++ b/references/hello-world/src/trigger/batches.ts @@ -1013,6 +1013,9 @@ export const largePayloadTask = task({ export const batchSealFailureOversizedPayload = task({ id: "batch-seal-failure-oversized", maxDuration: 60, + retry: { + maxAttempts: 1, + }, run: async () => { const results = await fixedLengthTask.batchTriggerAndWait([ { payload: { waitSeconds: 1, output: "normal" } }, From c4ca399a9d60e4d514b0aeb24cef8657dc9b2ff1 Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Thu, 26 Feb 2026 17:55:18 +0000 Subject: [PATCH 3/5] added changesets --- .changeset/modern-boxes-watch.md | 5 +++++ .server-changes/graceful-oversized-batch-items.md | 10 ++++++++++ 2 files changed, 15 insertions(+) create mode 100644 .changeset/modern-boxes-watch.md create mode 100644 .server-changes/graceful-oversized-batch-items.md diff --git a/.changeset/modern-boxes-watch.md b/.changeset/modern-boxes-watch.md new file mode 100644 index 00000000000..e9539e2105b --- /dev/null +++ b/.changeset/modern-boxes-watch.md @@ -0,0 +1,5 @@ +--- +"@trigger.dev/sdk": patch +--- + +Add PAYLOAD_TOO_LARGE error to handle graceful recovery of sending batch trigger items with payloads that exceed the maximum payload size diff --git a/.server-changes/graceful-oversized-batch-items.md b/.server-changes/graceful-oversized-batch-items.md new file mode 100644 index 00000000000..980dd33e537 --- /dev/null +++ b/.server-changes/graceful-oversized-batch-items.md @@ -0,0 +1,10 @@ +--- +area: webapp +type: fix +--- + +Gracefully handle oversized batch items instead of aborting the stream. + +When an NDJSON batch item exceeds the maximum size, the parser now emits an error marker instead of throwing, allowing the batch to seal normally. The oversized item becomes a pre-failed run with `PAYLOAD_TOO_LARGE` error code, while other items in the batch process successfully. This prevents `batchTriggerAndWait` from seeing connection errors and retrying with exponential backoff. + +Also fixes the NDJSON parser not consuming the remainder of an oversized line split across multiple chunks, which caused "Invalid JSON" errors on subsequent lines. From 3a656b7a08babac2a11db0a91a937e36a57c97ed Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Thu, 26 Feb 2026 22:13:59 +0000 Subject: [PATCH 4/5] handle payload serialization in the same way inside the catch --- ailogger-output.log | 0 internal-packages/run-engine/src/batch-queue/index.ts | 6 +++++- 2 files changed, 5 insertions(+), 1 deletion(-) create mode 100644 ailogger-output.log diff --git a/ailogger-output.log b/ailogger-output.log new file mode 100644 index 00000000000..e69de29bb2d diff --git a/internal-packages/run-engine/src/batch-queue/index.ts b/internal-packages/run-engine/src/batch-queue/index.ts index f571d7f51d3..312bf4772f7 100644 --- a/internal-packages/run-engine/src/batch-queue/index.ts +++ b/internal-packages/run-engine/src/batch-queue/index.ts @@ -916,7 +916,11 @@ export class BatchQueue { "BatchQueue.serializePayload", async (innerSpan) => { const str = - typeof item.payload === "string" ? item.payload : JSON.stringify(item.payload); + item.payload === undefined || item.payload === null + ? "{}" + : typeof item.payload === "string" + ? item.payload + : JSON.stringify(item.payload); innerSpan?.setAttribute("batch.payloadSize", str.length); return str; } From 81177218f6f6f03cded6bc37ef3e240b45d982c4 Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Fri, 27 Feb 2026 10:00:51 +0000 Subject: [PATCH 5/5] Add test to make sure newlines in batch item payloads don't break our newline scanner --- .../test/engine/streamBatchItems.test.ts | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/apps/webapp/test/engine/streamBatchItems.test.ts b/apps/webapp/test/engine/streamBatchItems.test.ts index f4d286ba207..2dee8668762 100644 --- a/apps/webapp/test/engine/streamBatchItems.test.ts +++ b/apps/webapp/test/engine/streamBatchItems.test.ts @@ -643,6 +643,25 @@ describe("createNdjsonParserStream", () => { expect(results).toEqual([{ id: 1 }, { id: 2 }]); }); + it("should handle escaped newlines in JSON string values", async () => { + // JSON.stringify escapes newlines as \n (two chars: backslash + n), + // so they don't break NDJSON line boundaries. This is the normal case + // when the SDK serializes payloads containing newlines. + const item1 = JSON.stringify({ payload: "line1\nline2\nline3" }); + const item2 = JSON.stringify({ payload: "no newlines" }); + const ndjson = item1 + "\n" + item2 + "\n"; + const encoder = new TextEncoder(); + const stream = chunksToStream([encoder.encode(ndjson)]); + + const parser = createNdjsonParserStream(1024); + const results = await collectStream(stream.pipeThrough(parser)); + + expect(results).toEqual([ + { payload: "line1\nline2\nline3" }, + { payload: "no newlines" }, + ]); + }); + it("should skip empty lines", async () => { const ndjson = '{"a":1}\n\n{"b":2}\n \n{"c":3}\n'; const encoder = new TextEncoder();