-
Notifications
You must be signed in to change notification settings - Fork 1.1k
fix(web): handle BrokenPipeError in SessionProcess.send_message #2324
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -661,8 +661,24 @@ async def send_message(self, message: str) -> None: | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| logger.error(f"{e.__class__.__name__} {e}: Invalid JSONRPC in message: {message}") | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| return | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| process.stdin.write((message + "\n").encode("utf-8")) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| await process.stdin.drain() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| try: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| process.stdin.write((message + "\n").encode("utf-8")) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| await process.stdin.drain() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| except (BrokenPipeError, ConnectionResetError) as e: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| # Subprocess died between our `start()` check above and the actual write. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| # `_read_loop` will eventually observe the exit and emit "stopped" / | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| # "crashed", but right now the caller (FastAPI / websocket handler) would | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| # otherwise see a raw exception propagate to the response. Emit an error | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| # status so any attached websocket clients see the failure synchronously. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| logger.warning( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| f"send_message: subprocess stdin {e.__class__.__name__}; " | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| f"process likely exited (returncode={process.returncode})" | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| await self._emit_status( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| "error", | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| reason="stdin_broken", | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| detail=f"{e.__class__.__name__}: {e}", | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
Comment on lines
+667
to
+681
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🟡 In-flight prompt ID not cleaned up on BrokenPipeError leaves session stuck in 'busy' state When The comment says
Suggested change
Was this helpful? React with 👍 or 👎 to provide feedback. |
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| class KimiCLIRunner: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When the broken stdin write is for a
JSONRPCPromptMessage, the prompt id was already added to_in_flight_prompt_ids, but this new error path emits thestdin_brokenstatus without clearing it. That makes the session report an error whileis_busyremains true, so clients reacting immediately to the status can still be rejected by paths such asget_editable_session()'s busy check until_read_looplater catches up; the existing EOF/error paths explicitly clear in-flight ids before broadcasting for this reason. Clear_in_flight_prompt_idsbefore this_emit_statuscall.Useful? React with 👍 / 👎.