From af69243743c1fe9619ba5095a921cab36a3a3747 Mon Sep 17 00:00:00 2001 From: Yufeng He <40085740+he-yufeng@users.noreply.github.com> Date: Wed, 20 May 2026 02:55:14 +0800 Subject: [PATCH] fix: terminate shell process trees on timeout --- packages/kaos/src/kaos/__init__.py | 21 +++++- packages/kaos/src/kaos/local.py | 31 ++++++-- packages/kaos/src/kaos/ssh.py | 7 +- src/kimi_cli/acp/kaos.py | 13 +++- src/kimi_cli/tools/shell/__init__.py | 38 +++++++++- tests/tools/test_shell_bash.py | 5 +- tests/tools/test_shell_process_lifecycle.py | 79 +++++++++++++++++++++ 7 files changed, 178 insertions(+), 16 deletions(-) create mode 100644 tests/tools/test_shell_process_lifecycle.py diff --git a/packages/kaos/src/kaos/__init__.py b/packages/kaos/src/kaos/__init__.py index f14c6cffb..16601cd87 100644 --- a/packages/kaos/src/kaos/__init__.py +++ b/packages/kaos/src/kaos/__init__.py @@ -216,7 +216,12 @@ async def mkdir( """Create a directory at the given path.""" ... - async def exec(self, *args: str, env: Mapping[str, str] | None = None) -> KaosProcess: + async def exec( + self, + *args: str, + env: Mapping[str, str] | None = None, + start_new_session: bool = False, + ) -> KaosProcess: """ Execute a command with arguments and return the running process. @@ -224,6 +229,8 @@ async def exec(self, *args: str, env: Mapping[str, str] | None = None) -> KaosPr *args: Command and its arguments. env: Environment variables for the subprocess. If None, inherits from the parent process. + start_new_session: Run local processes in a separate process group/session + when the backend supports it. """ ... @@ -342,5 +349,13 @@ async def mkdir(path: StrOrKaosPath, parents: bool = False, exist_ok: bool = Fal return await get_current_kaos().mkdir(path, parents=parents, exist_ok=exist_ok) -async def exec(*args: str, env: Mapping[str, str] | None = None) -> KaosProcess: - return await get_current_kaos().exec(*args, env=env) +async def exec( + *args: str, + env: Mapping[str, str] | None = None, + start_new_session: bool = False, +) -> KaosProcess: + return await get_current_kaos().exec( + *args, + env=env, + start_new_session=start_new_session, + ) diff --git a/packages/kaos/src/kaos/local.py b/packages/kaos/src/kaos/local.py index 7ab2f7cd0..e85865a6a 100644 --- a/packages/kaos/src/kaos/local.py +++ b/packages/kaos/src/kaos/local.py @@ -2,10 +2,11 @@ import asyncio import os +import subprocess from asyncio.subprocess import Process as AsyncioProcess from collections.abc import AsyncGenerator from pathlib import Path, PurePath -from typing import TYPE_CHECKING, Literal +from typing import TYPE_CHECKING, Any, Literal if os.name == "nt": import ntpath as pathmodule @@ -162,16 +163,34 @@ async def mkdir( local_path = path.unsafe_to_local_path() if isinstance(path, KaosPath) else Path(path) await asyncio.to_thread(local_path.mkdir, parents=parents, exist_ok=exist_ok) - async def exec(self, *args: str, env: Mapping[str, str] | None = None) -> KaosProcess: + async def exec( + self, + *args: str, + env: Mapping[str, str] | None = None, + start_new_session: bool = False, + ) -> KaosProcess: if not args: raise ValueError("At least one argument (the program to execute) is required.") + spawn_kwargs: dict[str, Any] = { + "stdin": asyncio.subprocess.PIPE, + "stdout": asyncio.subprocess.PIPE, + "stderr": asyncio.subprocess.PIPE, + "env": env, + } + if start_new_session: + if os.name == "nt": + spawn_kwargs["creationflags"] = getattr( + subprocess, + "CREATE_NEW_PROCESS_GROUP", + 0, + ) + else: + spawn_kwargs["start_new_session"] = True + process = await asyncio.create_subprocess_exec( *args, - stdin=asyncio.subprocess.PIPE, - stdout=asyncio.subprocess.PIPE, - stderr=asyncio.subprocess.PIPE, - env=env, + **spawn_kwargs, ) return self.Process(process) diff --git a/packages/kaos/src/kaos/ssh.py b/packages/kaos/src/kaos/ssh.py index b623742d8..8264b3131 100644 --- a/packages/kaos/src/kaos/ssh.py +++ b/packages/kaos/src/kaos/ssh.py @@ -273,7 +273,12 @@ async def mkdir( raise FileExistsError(f"{path} already exists") await self._sftp.mkdir(str(path)) - async def exec(self, *args: str, env: Mapping[str, str] | None = None) -> KaosProcess: + async def exec( + self, + *args: str, + env: Mapping[str, str] | None = None, + start_new_session: bool = False, + ) -> KaosProcess: if not args: raise ValueError("At least one argument (the program to execute) is required.") command = " ".join(shlex.quote(arg) for arg in args) diff --git a/src/kimi_cli/acp/kaos.py b/src/kimi_cli/acp/kaos.py index 50319031f..712bf200c 100644 --- a/src/kimi_cli/acp/kaos.py +++ b/src/kimi_cli/acp/kaos.py @@ -283,8 +283,17 @@ async def mkdir( ) -> None: await self._fallback.mkdir(path, parents=parents, exist_ok=exist_ok) - async def exec(self, *args: str, env: Mapping[str, str] | None = None) -> KaosProcess: - return await self._fallback.exec(*args, env=env) + async def exec( + self, + *args: str, + env: Mapping[str, str] | None = None, + start_new_session: bool = False, + ) -> KaosProcess: + return await self._fallback.exec( + *args, + env=env, + start_new_session=start_new_session, + ) def _abs_path(self, path: StrOrKaosPath) -> str: kaos_path = path if isinstance(path, KaosPath) else KaosPath(path) diff --git a/src/kimi_cli/tools/shell/__init__.py b/src/kimi_cli/tools/shell/__init__.py index 790f425db..01fbaa791 100644 --- a/src/kimi_cli/tools/shell/__init__.py +++ b/src/kimi_cli/tools/shell/__init__.py @@ -1,4 +1,8 @@ import asyncio +import contextlib +import os +import signal +import subprocess from collections.abc import Callable from pathlib import Path from typing import Self, override @@ -234,7 +238,11 @@ async def _read_stream(stream: AsyncReadable, cb: Callable[[bytes], None]): # running, not an empty/stale value inherited from the parent (most visible # on Windows, where the parent's SHELL is typically empty or PowerShell). env["SHELL"] = str(self._shell_path) - process = await kaos.exec(*self._shell_args(command), env=env) + process = await kaos.exec( + *self._shell_args(command), + env=env, + start_new_session=True, + ) # Close stdin immediately so interactive prompts (e.g. git password) get # EOF instead of hanging forever waiting for input that will never come. @@ -250,11 +258,35 @@ async def _read_stream(stream: AsyncReadable, cb: Callable[[bytes], None]): ) return await process.wait() except asyncio.CancelledError: - await process.kill() + await self._terminate_shell_process(process) raise except TimeoutError: - await process.kill() + await self._terminate_shell_process(process) raise def _shell_args(self, command: str) -> tuple[str, ...]: return (str(self._shell_path), "-c", command) + + async def _terminate_shell_process(self, process: kaos.KaosProcess) -> None: + pid = getattr(process, "pid", -1) + + if pid > 0 and self._on_windows: + await asyncio.to_thread( + subprocess.run, + ["taskkill", "/PID", str(pid), "/T", "/F"], + stdout=subprocess.DEVNULL, + stderr=subprocess.DEVNULL, + check=False, + ) + elif pid > 0 and os.name != "nt": + try: + os.killpg(pid, signal.SIGKILL) + except OSError: + with contextlib.suppress(Exception): + await process.kill() + else: + await process.kill() + return + + with contextlib.suppress(Exception): + await asyncio.wait_for(process.wait(), timeout=1) diff --git a/tests/tools/test_shell_bash.py b/tests/tools/test_shell_bash.py index 603988236..86c181140 100644 --- a/tests/tools/test_shell_bash.py +++ b/tests/tools/test_shell_bash.py @@ -358,6 +358,7 @@ async def test_cancelled_command_kills_process(shell_tool: Shell, monkeypatch: p """Test that cancelling a shell run kills the underlying process.""" started = asyncio.Event() + exec_kwargs: list[dict] = [] class BlockingReadable: async def readline(self) -> bytes: @@ -384,7 +385,8 @@ async def kill(self) -> None: fake_process = FakeProcess() - async def fake_exec(*_args, **_kwargs) -> FakeProcess: + async def fake_exec(*_args, **kwargs) -> FakeProcess: + exec_kwargs.append(kwargs) return fake_process monkeypatch.setattr("kimi_cli.tools.shell.kaos.exec", fake_exec) @@ -399,3 +401,4 @@ async def fake_exec(*_args, **_kwargs) -> FakeProcess: await task assert fake_process.kill_calls == 1 + assert exec_kwargs[0]["start_new_session"] is True diff --git a/tests/tools/test_shell_process_lifecycle.py b/tests/tools/test_shell_process_lifecycle.py new file mode 100644 index 000000000..ce2be0651 --- /dev/null +++ b/tests/tools/test_shell_process_lifecycle.py @@ -0,0 +1,79 @@ +from __future__ import annotations + +import asyncio + +import pytest +from kaos.path import KaosPath + +from kimi_cli.tools.shell import Shell +from kimi_cli.utils.environment import Environment + + +class _NullStdin: + def close(self) -> None: + pass + + +class _BlockingReadable: + def __init__(self, started: asyncio.Event) -> None: + self._started = started + + async def readline(self) -> bytes: + self._started.set() + await asyncio.Event().wait() + raise AssertionError("unreachable") + + +class _FakeProcess: + stdin = _NullStdin() + + def __init__(self, started: asyncio.Event) -> None: + self.stdout = _BlockingReadable(started) + self.stderr = _BlockingReadable(started) + self.kill_calls = 0 + + async def wait(self) -> int: + return 0 + + async def kill(self) -> None: + self.kill_calls += 1 + + +def _make_shell(approval, runtime) -> Shell: + env = Environment( + os_kind="Linux", + os_arch="x86_64", + os_version="1.0", + shell_name="bash", + shell_path=KaosPath("/bin/bash"), + ) + return Shell(approval, env, runtime) + + +async def test_foreground_shell_uses_new_session_and_kills_on_cancel( + approval, + runtime, + monkeypatch: pytest.MonkeyPatch, +): + started = asyncio.Event() + fake_process = _FakeProcess(started) + exec_kwargs: list[dict] = [] + + async def fake_exec(*_args, **kwargs) -> _FakeProcess: + exec_kwargs.append(kwargs) + return fake_process + + monkeypatch.setattr("kimi_cli.tools.shell.kaos.exec", fake_exec) + shell = _make_shell(approval, runtime) + + task = asyncio.create_task( + shell._run_shell_command("sleep 10", lambda _line: None, lambda _line: None, 60) + ) + await asyncio.wait_for(started.wait(), timeout=1.0) + task.cancel() + + with pytest.raises(asyncio.CancelledError): + await task + + assert exec_kwargs[0]["start_new_session"] is True + assert fake_process.kill_calls == 1