From 07c10f9dcf95ad29fe2177437f2b0d58bf2f4787 Mon Sep 17 00:00:00 2001 From: yuj Date: Tue, 19 May 2026 12:10:05 +0800 Subject: [PATCH] fix(web): handle BrokenPipeError in SessionProcess.send_message MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit `SessionProcess.send_message` writes to `process.stdin` and awaits `drain()` without guarding against the subprocess having exited between the `start()` call and the actual write. In a normal lifecycle `_read_loop` observes the exit and emits a "stopped" / "crashed" status, but there is a window where the caller (FastAPI/websocket handler) will otherwise see a raw `BrokenPipeError` propagate out of `send_message` — crashing the request and leaving any attached websocket clients with no diagnostic. Wrap the write+drain pair in a try/except for `BrokenPipeError` / `ConnectionResetError`, log a warning with the process returncode, and emit an "error" status with reason="stdin_broken" so subscribers learn about the failure synchronously instead of via the eventual read-loop exit message. No behavior change on the happy path. --- src/kimi_cli/web/runner/process.py | 20 ++++++++++++++++++-- 1 file changed, 18 insertions(+), 2 deletions(-) diff --git a/src/kimi_cli/web/runner/process.py b/src/kimi_cli/web/runner/process.py index 16f60a895..ba47a0125 100644 --- a/src/kimi_cli/web/runner/process.py +++ b/src/kimi_cli/web/runner/process.py @@ -661,8 +661,24 @@ async def send_message(self, message: str) -> None: logger.error(f"{e.__class__.__name__} {e}: Invalid JSONRPC in message: {message}") return - process.stdin.write((message + "\n").encode("utf-8")) - await process.stdin.drain() + try: + process.stdin.write((message + "\n").encode("utf-8")) + await process.stdin.drain() + except (BrokenPipeError, ConnectionResetError) as e: + # Subprocess died between our `start()` check above and the actual write. + # `_read_loop` will eventually observe the exit and emit "stopped" / + # "crashed", but right now the caller (FastAPI / websocket handler) would + # otherwise see a raw exception propagate to the response. Emit an error + # status so any attached websocket clients see the failure synchronously. + logger.warning( + f"send_message: subprocess stdin {e.__class__.__name__}; " + f"process likely exited (returncode={process.returncode})" + ) + await self._emit_status( + "error", + reason="stdin_broken", + detail=f"{e.__class__.__name__}: {e}", + ) class KimiCLIRunner: