Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 18 additions & 3 deletions packages/kaos/src/kaos/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -216,14 +216,21 @@ async def mkdir(
"""Create a directory at the given path."""
...

async def exec(self, *args: str, env: Mapping[str, str] | None = None) -> KaosProcess:
async def exec(
self,
*args: str,
env: Mapping[str, str] | None = None,
start_new_session: bool = False,
) -> KaosProcess:
"""
Execute a command with arguments and return the running process.

Args:
*args: Command and its arguments.
env: Environment variables for the subprocess. If None, inherits
from the parent process.
start_new_session: Run local processes in a separate process group/session
when the backend supports it.
"""
...

Expand Down Expand Up @@ -342,5 +349,13 @@ async def mkdir(path: StrOrKaosPath, parents: bool = False, exist_ok: bool = Fal
return await get_current_kaos().mkdir(path, parents=parents, exist_ok=exist_ok)


async def exec(*args: str, env: Mapping[str, str] | None = None) -> KaosProcess:
return await get_current_kaos().exec(*args, env=env)
async def exec(
*args: str,
env: Mapping[str, str] | None = None,
start_new_session: bool = False,
) -> KaosProcess:
return await get_current_kaos().exec(
*args,
env=env,
start_new_session=start_new_session,
)
31 changes: 25 additions & 6 deletions packages/kaos/src/kaos/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,11 @@

import asyncio
import os
import subprocess
from asyncio.subprocess import Process as AsyncioProcess
from collections.abc import AsyncGenerator
from pathlib import Path, PurePath
from typing import TYPE_CHECKING, Literal
from typing import TYPE_CHECKING, Any, Literal

if os.name == "nt":
import ntpath as pathmodule
Expand Down Expand Up @@ -162,16 +163,34 @@ async def mkdir(
local_path = path.unsafe_to_local_path() if isinstance(path, KaosPath) else Path(path)
await asyncio.to_thread(local_path.mkdir, parents=parents, exist_ok=exist_ok)

async def exec(self, *args: str, env: Mapping[str, str] | None = None) -> KaosProcess:
async def exec(
self,
*args: str,
env: Mapping[str, str] | None = None,
start_new_session: bool = False,
) -> KaosProcess:
if not args:
raise ValueError("At least one argument (the program to execute) is required.")

spawn_kwargs: dict[str, Any] = {
"stdin": asyncio.subprocess.PIPE,
"stdout": asyncio.subprocess.PIPE,
"stderr": asyncio.subprocess.PIPE,
"env": env,
}
if start_new_session:
if os.name == "nt":
spawn_kwargs["creationflags"] = getattr(
subprocess,
"CREATE_NEW_PROCESS_GROUP",
0,
)
else:
spawn_kwargs["start_new_session"] = True

process = await asyncio.create_subprocess_exec(
*args,
stdin=asyncio.subprocess.PIPE,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
env=env,
**spawn_kwargs,
)
return self.Process(process)

Expand Down
7 changes: 6 additions & 1 deletion packages/kaos/src/kaos/ssh.py
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,12 @@ async def mkdir(
raise FileExistsError(f"{path} already exists")
await self._sftp.mkdir(str(path))

async def exec(self, *args: str, env: Mapping[str, str] | None = None) -> KaosProcess:
async def exec(
self,
*args: str,
env: Mapping[str, str] | None = None,
start_new_session: bool = False,
) -> KaosProcess:
if not args:
raise ValueError("At least one argument (the program to execute) is required.")
command = " ".join(shlex.quote(arg) for arg in args)
Expand Down
13 changes: 11 additions & 2 deletions src/kimi_cli/acp/kaos.py
Original file line number Diff line number Diff line change
Expand Up @@ -283,8 +283,17 @@ async def mkdir(
) -> None:
await self._fallback.mkdir(path, parents=parents, exist_ok=exist_ok)

async def exec(self, *args: str, env: Mapping[str, str] | None = None) -> KaosProcess:
return await self._fallback.exec(*args, env=env)
async def exec(
self,
*args: str,
env: Mapping[str, str] | None = None,
start_new_session: bool = False,
) -> KaosProcess:
return await self._fallback.exec(
*args,
env=env,
start_new_session=start_new_session,
)

def _abs_path(self, path: StrOrKaosPath) -> str:
kaos_path = path if isinstance(path, KaosPath) else KaosPath(path)
Expand Down
38 changes: 35 additions & 3 deletions src/kimi_cli/tools/shell/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
import asyncio
import contextlib
import os
import signal
import subprocess
from collections.abc import Callable
from pathlib import Path
from typing import Self, override
Expand Down Expand Up @@ -234,7 +238,11 @@ async def _read_stream(stream: AsyncReadable, cb: Callable[[bytes], None]):
# running, not an empty/stale value inherited from the parent (most visible
# on Windows, where the parent's SHELL is typically empty or PowerShell).
env["SHELL"] = str(self._shell_path)
process = await kaos.exec(*self._shell_args(command), env=env)
process = await kaos.exec(
*self._shell_args(command),
env=env,
start_new_session=True,
)

# Close stdin immediately so interactive prompts (e.g. git password) get
# EOF instead of hanging forever waiting for input that will never come.
Expand All @@ -250,11 +258,35 @@ async def _read_stream(stream: AsyncReadable, cb: Callable[[bytes], None]):
)
return await process.wait()
except asyncio.CancelledError:
await process.kill()
await self._terminate_shell_process(process)
raise
except TimeoutError:
await process.kill()
await self._terminate_shell_process(process)
raise

def _shell_args(self, command: str) -> tuple[str, ...]:
return (str(self._shell_path), "-c", command)

async def _terminate_shell_process(self, process: kaos.KaosProcess) -> None:
pid = getattr(process, "pid", -1)

if pid > 0 and self._on_windows:
await asyncio.to_thread(
subprocess.run,
["taskkill", "/PID", str(pid), "/T", "/F"],
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
check=False,
)
elif pid > 0 and os.name != "nt":
Comment on lines +273 to +281
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟡 _terminate_shell_process uses self._on_windows instead of os.name for platform detection, mismatching process creation

_terminate_shell_process checks self._on_windows (derived from environment.os_kind) to decide the termination strategy, but the process was created in packages/kaos/src/kaos/local.py:182 using os.name == "nt" to decide whether to set CREATE_NEW_PROCESS_GROUP (Windows) or start_new_session (Unix). The existing background worker at src/kimi_cli/background/worker.py:94 correctly uses os.name == "nt" consistently for both creation and termination.

If self._on_windows ever disagrees with os.name (e.g., a test constructs a Shell with os_kind="Windows" on Linux, or an SSH-backed scenario), _terminate_shell_process would try to run taskkill on a Unix machine. Since the taskkill path has no exception handling (unlike the Unix os.killpg path which catches OSError), subprocess.run(["taskkill", ...]) would raise FileNotFoundError, which propagates uncaught from _terminate_shell_process and replaces the original CancelledError/TimeoutError — leaving the process group alive and producing an unexpected exception type.

Suggested change
if pid > 0 and self._on_windows:
await asyncio.to_thread(
subprocess.run,
["taskkill", "/PID", str(pid), "/T", "/F"],
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
check=False,
)
elif pid > 0 and os.name != "nt":
if pid > 0 and os.name == "nt":
await asyncio.to_thread(
subprocess.run,
["taskkill", "/PID", str(pid), "/T", "/F"],
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
check=False,
)
elif pid > 0 and os.name != "nt":
Open in Devin Review

Was this helpful? React with 👍 or 👎 to provide feedback.

try:
os.killpg(pid, signal.SIGKILL)
except OSError:
with contextlib.suppress(Exception):
await process.kill()
else:
await process.kill()
return

with contextlib.suppress(Exception):
await asyncio.wait_for(process.wait(), timeout=1)
5 changes: 4 additions & 1 deletion tests/tools/test_shell_bash.py
Original file line number Diff line number Diff line change
Expand Up @@ -358,6 +358,7 @@ async def test_cancelled_command_kills_process(shell_tool: Shell, monkeypatch: p
"""Test that cancelling a shell run kills the underlying process."""

started = asyncio.Event()
exec_kwargs: list[dict] = []

class BlockingReadable:
async def readline(self) -> bytes:
Expand All @@ -384,7 +385,8 @@ async def kill(self) -> None:

fake_process = FakeProcess()

async def fake_exec(*_args, **_kwargs) -> FakeProcess:
async def fake_exec(*_args, **kwargs) -> FakeProcess:
exec_kwargs.append(kwargs)
return fake_process

monkeypatch.setattr("kimi_cli.tools.shell.kaos.exec", fake_exec)
Expand All @@ -399,3 +401,4 @@ async def fake_exec(*_args, **_kwargs) -> FakeProcess:
await task

assert fake_process.kill_calls == 1
assert exec_kwargs[0]["start_new_session"] is True
79 changes: 79 additions & 0 deletions tests/tools/test_shell_process_lifecycle.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
from __future__ import annotations

import asyncio

import pytest
from kaos.path import KaosPath

from kimi_cli.tools.shell import Shell
from kimi_cli.utils.environment import Environment


class _NullStdin:
def close(self) -> None:
pass


class _BlockingReadable:
def __init__(self, started: asyncio.Event) -> None:
self._started = started

async def readline(self) -> bytes:
self._started.set()
await asyncio.Event().wait()
raise AssertionError("unreachable")


class _FakeProcess:
stdin = _NullStdin()

def __init__(self, started: asyncio.Event) -> None:
self.stdout = _BlockingReadable(started)
self.stderr = _BlockingReadable(started)
self.kill_calls = 0

async def wait(self) -> int:
return 0

async def kill(self) -> None:
self.kill_calls += 1


def _make_shell(approval, runtime) -> Shell:
env = Environment(
os_kind="Linux",
os_arch="x86_64",
os_version="1.0",
shell_name="bash",
shell_path=KaosPath("/bin/bash"),
)
return Shell(approval, env, runtime)


async def test_foreground_shell_uses_new_session_and_kills_on_cancel(
approval,
runtime,
monkeypatch: pytest.MonkeyPatch,
):
started = asyncio.Event()
fake_process = _FakeProcess(started)
exec_kwargs: list[dict] = []

async def fake_exec(*_args, **kwargs) -> _FakeProcess:
exec_kwargs.append(kwargs)
return fake_process

monkeypatch.setattr("kimi_cli.tools.shell.kaos.exec", fake_exec)
shell = _make_shell(approval, runtime)

task = asyncio.create_task(
shell._run_shell_command("sleep 10", lambda _line: None, lambda _line: None, 60)
)
await asyncio.wait_for(started.wait(), timeout=1.0)
task.cancel()

with pytest.raises(asyncio.CancelledError):
await task

assert exec_kwargs[0]["start_new_session"] is True
assert fake_process.kill_calls == 1
Loading