Skip to content

Commit a306a33

Browse files
committed
fix(core): a chunk timeout when processing llm stream
1 parent 1d9dcd2 commit a306a33

1 file changed

Lines changed: 27 additions & 1 deletion

File tree

packages/opencode/src/session/processor.ts

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,25 @@ import { Question } from "@/question"
1818

1919
export namespace SessionProcessor {
2020
const DOOM_LOOP_THRESHOLD = 3
21+
const CHUNK_TIMEOUT = 3 * 60 * 1000
2122
const log = Log.create({ service: "session.processor" })
2223

24+
function createChunkTimer() {
25+
let timer: ReturnType<typeof setTimeout> | undefined
26+
return {
27+
promise: new Promise<never>((_, reject) => {
28+
timer = setTimeout(() => {
29+
reject(new Error("Chunk timeout exceeded"))
30+
}, CHUNK_TIMEOUT)
31+
}),
32+
clear() {
33+
if (!timer) return
34+
clearTimeout(timer)
35+
timer = undefined
36+
},
37+
}
38+
}
39+
2340
export type Info = Awaited<ReturnType<typeof create>>
2441
export type Result = Awaited<ReturnType<Info["process"]>>
2542

@@ -50,10 +67,19 @@ export namespace SessionProcessor {
5067
try {
5168
let currentText: MessageV2.TextPart | undefined
5269
let reasoningMap: Record<string, MessageV2.ReasoningPart> = {}
70+
5371
const stream = await LLM.stream(streamInput)
72+
const iterator = stream.fullStream[Symbol.asyncIterator]()
5473

55-
for await (const value of stream.fullStream) {
74+
while (true) {
5675
input.abort.throwIfAborted()
76+
const chunkTimer = createChunkTimer()
77+
const next = await Promise.race([iterator.next(), chunkTimer.promise]).finally(() => {
78+
chunkTimer.clear()
79+
})
80+
if (next.done) break
81+
const value = next.value
82+
5783
switch (value.type) {
5884
case "start":
5985
SessionStatus.set(input.sessionID, { type: "busy" })

0 commit comments

Comments
 (0)