From 7cbf01ff27c52eec97f34716dcb4877f7fc4f94a Mon Sep 17 00:00:00 2001 From: huntharo Date: Sun, 24 May 2026 11:44:41 -0400 Subject: [PATCH 1/3] fix(acp): assign message ids to streamed content --- pyproject.toml | 2 +- src/kimi_cli/acp/server.py | 49 +++++++---------------- src/kimi_cli/acp/session.py | 49 ++++++++++++++++++----- tests/acp/test_server_initialize.py | 35 +++------------- tests/acp/test_session_notifications.py | 45 ++++++++++++++++++++- tests/ui_and_conv/test_acp_server_auth.py | 12 ++---- uv.lock | 8 ++-- 7 files changed, 112 insertions(+), 88 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index e1f71a322..eea789f5b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -5,7 +5,7 @@ description = "Kimi Code CLI is your next CLI agent." readme = "README.md" requires-python = ">=3.12" dependencies = [ - "agent-client-protocol==0.8.0", + "agent-client-protocol==0.10.0", "aiofiles>=24.0,<26.0", "aiohttp==3.13.3", "typer==0.21.1", diff --git a/src/kimi_cli/acp/server.py b/src/kimi_cli/acp/server.py index 0b6b32538..84b5f4bfa 100644 --- a/src/kimi_cli/acp/server.py +++ b/src/kimi_cli/acp/server.py @@ -1,7 +1,6 @@ from __future__ import annotations import asyncio -import sys import time from datetime import datetime from pathlib import Path @@ -33,7 +32,9 @@ def __init__(self) -> None: self.conn: acp.Client | None = None self.sessions: dict[str, tuple[ACPSession, _ModelIDConv]] = {} self.negotiated_version: ACPVersionSpec | None = None - self._auth_methods: list[acp.schema.AuthMethod] = [] + self._auth_methods: list[ + acp.schema.EnvVarAuthMethod | acp.schema.TerminalAuthMethod | acp.schema.AuthMethodAgent + ] = [] def on_connect(self, conn: acp.Client) -> None: logger.info("ACP client connected") @@ -66,32 +67,18 @@ async def initialize( version=getattr(client_info, "version", None), ) - # get command and args of current process for terminal-auth - command = sys.argv[0] - args: list[str] = [] - - # Build terminal auth data for error response - terminal_args = args + ["login"] - # Build and cache auth methods for reuse in AUTH_REQUIRED errors self._auth_methods = [ - acp.schema.AuthMethod( + acp.schema.TerminalAuthMethod( id="login", + type="terminal", name="Login with Kimi account", description=( "Run `kimi login` command in the terminal, " "then follow the instructions to finish login." ), - # Store auth data in field_meta for building AUTH_REQUIRED error - field_meta={ - "terminal-auth": { - "command": command, - "args": terminal_args, - "label": "Kimi Code Login", - "env": {}, - "type": "terminal", - } - }, + args=["login"], + env={}, ), ] @@ -129,20 +116,9 @@ def _check_auth(self) -> None: """Check if Kimi Code authentication is complete. Raise AUTH_REQUIRED if not.""" reason = self._check_token_usable() if reason: - auth_methods_data: list[dict[str, Any]] = [] - for m in self._auth_methods: - if m.field_meta and "terminal-auth" in m.field_meta: - terminal_auth = m.field_meta["terminal-auth"] - auth_methods_data.append( - { - "id": m.id, - "name": m.name, - "description": m.description, - "type": terminal_auth.get("type", "terminal"), - "args": terminal_auth.get("args", []), - "env": terminal_auth.get("env", {}), - } - ) + auth_methods_data = [ + m.model_dump(by_alias=True, exclude_none=True) for m in self._auth_methods + ] logger.warning("Authentication required, {reason}", reason=reason) raise acp.RequestError.auth_required({"authMethods": auth_methods_data}) @@ -390,7 +366,10 @@ async def authenticate(self, method_id: str, **kwargs: Any) -> acp.AuthenticateR raise acp.RequestError.auth_required( { "message": "Please complete login in terminal first", - "authMethods": self._auth_methods, + "authMethods": [ + m.model_dump(by_alias=True, exclude_none=True) + for m in self._auth_methods + ], } ) diff --git a/src/kimi_cli/acp/session.py b/src/kimi_cli/acp/session.py index fa74e801b..7c984db32 100644 --- a/src/kimi_cli/acp/session.py +++ b/src/kimi_cli/acp/session.py @@ -116,8 +116,22 @@ def __init__(self): self.tool_calls: dict[str, _ToolCallState] = {} """Map of tool call ID (LLM-side ID) to tool call state.""" self.last_tool_call: _ToolCallState | None = None + self.content_run_kind: str | None = None + """The active ACP content run kind: `message` or `thought`.""" + self.content_run_message_id: str | None = None + """Stable ACP message ID for the current contiguous content run.""" self.cancel_event = asyncio.Event() + def reset_content_run(self) -> None: + self.content_run_kind = None + self.content_run_message_id = None + + def content_run_id(self, kind: str) -> str: + if self.content_run_kind != kind or self.content_run_message_id is None: + self.content_run_kind = kind + self.content_run_message_id = str(uuid.uuid4()) + return self.content_run_message_id + class ACPSession: def __init__( @@ -161,29 +175,32 @@ async def prompt(self, prompt: list[ACPContentBlock]) -> acp.PromptResponse: async for msg in self._cli.run(user_input, self._turn_state.cancel_event): match msg: case TurnBegin(): - pass + self._reset_content_run() case SteerInput(): - pass + self._reset_content_run() case TurnEnd(): - pass + self._reset_content_run() case StepBegin(): - pass + self._reset_content_run() case StepInterrupted(): + self._reset_content_run() break case StepRetry(): - pass + self._reset_content_run() case CompactionBegin(): - pass + self._reset_content_run() case CompactionEnd(): - pass + self._reset_content_run() case MCPLoadingBegin(): - pass + self._reset_content_run() case MCPLoadingEnd(): - pass + self._reset_content_run() case StatusUpdate(): - pass + self._reset_content_run() case Notification(): + self._reset_content_run() await self._send_notification(msg) + self._reset_content_run() case ThinkPart(think=think): await self._send_thinking(think) case TextPart(text=text): @@ -253,6 +270,14 @@ async def cancel(self) -> None: self._turn_state.cancel_event.set() + def _reset_content_run(self) -> None: + if self._turn_state is not None: + self._turn_state.reset_content_run() + + def _content_run_id(self, kind: str) -> str: + assert self._turn_state is not None + return self._turn_state.content_run_id(kind) + async def _send_thinking(self, think: str): """Send thinking content to client.""" if not self._id or not self._conn: @@ -262,6 +287,7 @@ async def _send_thinking(self, think: str): self._id, acp.schema.AgentThoughtChunk( content=acp.schema.TextContentBlock(type="text", text=think), + message_id=self._content_run_id("thought"), session_update="agent_thought_chunk", ), ) @@ -275,6 +301,7 @@ async def _send_text(self, text: str): session_id=self._id, update=acp.schema.AgentMessageChunk( content=acp.schema.TextContentBlock(type="text", text=text), + message_id=self._content_run_id("message"), session_update="agent_message_chunk", ), ) @@ -292,6 +319,7 @@ async def _send_tool_call(self, tool_call: ToolCall): assert self._turn_state is not None if not self._id or not self._conn: return + self._reset_content_run() # Create and store tool call state state = _ToolCallState(tool_call) @@ -353,6 +381,7 @@ async def _send_tool_result(self, result: ToolResult): assert self._turn_state is not None if not self._id or not self._conn: return + self._reset_content_run() tool_ret = result.return_value diff --git a/tests/acp/test_server_initialize.py b/tests/acp/test_server_initialize.py index ad20a23e6..1810c7ce3 100644 --- a/tests/acp/test_server_initialize.py +++ b/tests/acp/test_server_initialize.py @@ -2,8 +2,6 @@ from __future__ import annotations -from unittest.mock import patch - import pytest from kimi_cli.acp.server import ACPServer @@ -11,38 +9,17 @@ pytestmark = pytest.mark.asyncio -@pytest.mark.parametrize( - "argv, expected_command, expected_terminal_args", - [ - # Standard entry-point: kimi acp - (["/usr/local/bin/kimi", "acp"], "/usr/local/bin/kimi", ["login"]), - # kimi-code entry-point (JetBrains scenario) - (["/usr/local/bin/kimi-code", "acp"], "/usr/local/bin/kimi-code", ["login"]), - # kimi-cli entry-point - (["/usr/local/bin/kimi-cli", "acp"], "/usr/local/bin/kimi-cli", ["login"]), - # Arbitrary wrapper script - (["/opt/wrapper.sh", "acp"], "/opt/wrapper.sh", ["login"]), - ], - ids=["kimi", "kimi-code", "kimi-cli", "wrapper-script"], -) -async def test_initialize_argv_handling( - argv: list[str], - expected_command: str, - expected_terminal_args: list[str], -): - """initialize() should not crash regardless of sys.argv content.""" +async def test_initialize_advertises_terminal_auth_method(): + """initialize() should advertise terminal auth using the ACP schema.""" server = ACPServer() - with patch("kimi_cli.acp.server.sys") as mock_sys: - mock_sys.argv = argv - resp = await server.initialize(protocol_version=1) + resp = await server.initialize(protocol_version=1) assert resp.protocol_version == 1 assert resp.auth_methods is not None assert len(resp.auth_methods) == 1 auth_method = resp.auth_methods[0] - assert auth_method.field_meta is not None - terminal_auth = auth_method.field_meta["terminal-auth"] - assert terminal_auth["command"] == expected_command - assert terminal_auth["args"] == expected_terminal_args + assert auth_method.type == "terminal" + assert auth_method.args == ["login"] + assert auth_method.env == {} diff --git a/tests/acp/test_session_notifications.py b/tests/acp/test_session_notifications.py index 70b9197d4..fbbfbd5ea 100644 --- a/tests/acp/test_session_notifications.py +++ b/tests/acp/test_session_notifications.py @@ -5,6 +5,7 @@ import acp import pytest +from kosong.tooling import ToolOk, ToolResult from kosong.tooling.empty import EmptyToolset from kimi_cli.acp.session import ACPSession @@ -14,7 +15,7 @@ from kimi_cli.soul.agent import Agent, Runtime from kimi_cli.soul.context import Context from kimi_cli.soul.kimisoul import KimiSoul -from kimi_cli.wire.types import Notification, TextPart, ToolCall, TurnBegin, TurnEnd +from kimi_cli.wire.types import Notification, TextPart, ThinkPart, ToolCall, TurnBegin, TurnEnd class _FakeConn: @@ -46,6 +47,21 @@ async def run(self, _user_input, _cancel_event): yield TurnEnd() +class _FakeStreamingCLI: + async def run(self, _user_input, _cancel_event): + yield TurnBegin(user_input=[TextPart(text="hello")]) + yield ThinkPart(think="thinking") + yield TextPart(text="Hi mom") + yield TextPart(text=", what is for dinner?") + yield ToolCall( + id="call-dinner", + function=ToolCall.FunctionBody(name="ReadFile", arguments='{"path":"menu.txt"}'), + ) + yield ToolResult(tool_call_id="call-dinner", return_value=ToolOk(output="pizza")) + yield TextPart(text="Tell Dad I said hi") + yield TurnEnd() + + @pytest.mark.asyncio async def test_acp_session_surfaces_notification_as_message_chunk() -> None: conn = _FakeConn() @@ -64,6 +80,33 @@ async def test_acp_session_surfaces_notification_as_message_chunk() -> None: assert text_update.content.text == "done" +@pytest.mark.asyncio +async def test_acp_session_assigns_message_ids_to_distinct_content_runs() -> None: + conn = _FakeConn() + session = ACPSession("session-1", _FakeStreamingCLI(), conn) # type: ignore[arg-type] + + response = await session.prompt([acp.text_block("hello")]) + + assert response.stop_reason == "end_turn" + chunks = [ + update + for _, update in conn.updates + if update.session_update in {"agent_thought_chunk", "agent_message_chunk"} + ] + assert [chunk.content.text for chunk in chunks] == [ + "thinking", + "Hi mom", + ", what is for dinner?", + "Tell Dad I said hi", + ] + assert chunks[0].message_id + assert chunks[1].message_id + assert chunks[1].message_id == chunks[2].message_id + assert chunks[3].message_id + assert chunks[3].message_id != chunks[1].message_id + assert chunks[0].message_id not in {chunks[1].message_id, chunks[3].message_id} + + class _BlockingApprovalConn(_FakeConn): def __init__(self) -> None: super().__init__() diff --git a/tests/ui_and_conv/test_acp_server_auth.py b/tests/ui_and_conv/test_acp_server_auth.py index 40a025821..ae7ed4e19 100644 --- a/tests/ui_and_conv/test_acp_server_auth.py +++ b/tests/ui_and_conv/test_acp_server_auth.py @@ -13,17 +13,13 @@ def server() -> ACPServer: """Create an ACPServer instance with mocked auth methods.""" s = ACPServer() s._auth_methods = [ - acp.schema.AuthMethod( + acp.schema.TerminalAuthMethod( id="login", + type="terminal", name="Test Login", description="Test description", - field_meta={ - "terminal-auth": { - "type": "terminal", - "args": ["kimi", "login"], - "env": {}, - } - }, + args=["login"], + env={}, ) ] return s diff --git a/uv.lock b/uv.lock index 9d1b6ed7c..384eeddef 100644 --- a/uv.lock +++ b/uv.lock @@ -17,14 +17,14 @@ members = [ [[package]] name = "agent-client-protocol" -version = "0.8.0" +version = "0.10.0" source = { registry = "https://pypi.org/simple" } dependencies = [ { name = "pydantic" }, ] -sdist = { url = "https://files.pythonhosted.org/packages/a8/a4/26698e0186933b4ab6e814626c99ee52b5522d039b5c94c983ecb3a66eed/agent_client_protocol-0.8.0.tar.gz", hash = "sha256:f9eade29167ff72a10fae7a0a0c1f27436909a790e159fb10265c2874e58d922", size = 68577, upload-time = "2026-02-07T17:08:46.513Z" } +sdist = { url = "https://files.pythonhosted.org/packages/21/5c/d60196c536c8f66bb6a238c8a6d0d6fa84a2e3436008c139cb4a79003a25/agent_client_protocol-0.10.0.tar.gz", hash = "sha256:f8a6041e27423131e42e4d0cdd850d5b094b1092f79cc29501ab2bd57b92ee88", size = 81502, upload-time = "2026-05-07T19:11:56.784Z" } wheels = [ - { url = "https://files.pythonhosted.org/packages/4b/04/e55a3c549c09c0023cb92c696c7b98d97bb657088f940e34f4bc47d1a49a/agent_client_protocol-0.8.0-py3-none-any.whl", hash = "sha256:2d5712b88b3249dbd6148b24d32c6eb8992e5663f224db6291524ac80cca8037", size = 54362, upload-time = "2026-02-07T17:08:45.575Z" }, + { url = "https://files.pythonhosted.org/packages/6c/cc/139895c0c5cd2acefc365085da0735d93df0cf8427ff4ee351961090e1af/agent_client_protocol-0.10.0-py3-none-any.whl", hash = "sha256:97a1a69c5d094e2625b09c4dbc2d5cf19637c4943630ceed52ab0578ecd5e14c", size = 65400, upload-time = "2026-05-07T19:11:55.614Z" }, ] [[package]] @@ -1330,7 +1330,7 @@ dev = [ [package.metadata] requires-dist = [ - { name = "agent-client-protocol", specifier = "==0.8.0" }, + { name = "agent-client-protocol", specifier = "==0.10.0" }, { name = "aiofiles", specifier = ">=24.0,<26.0" }, { name = "aiohttp", specifier = "==3.13.3" }, { name = "batrachian-toad", marker = "python_full_version >= '3.14'", specifier = "==0.5.23" }, From 5e25b1d75355763cc235a4c8c0abd8308f2d9964 Mon Sep 17 00:00:00 2001 From: huntharo Date: Sat, 23 May 2026 21:50:37 -0400 Subject: [PATCH 2/3] fix(acp): replay history on session load --- src/kimi_cli/acp/AGENTS.md | 4 +- src/kimi_cli/acp/server.py | 36 ++++++-- src/kimi_cli/acp/session.py | 162 +++++++++++++++++++++++++++++++++- src/kimi_cli/app.py | 1 + tests/acp/test_protocol_v1.py | 118 ++++++++++++++++++++++++- 5 files changed, 312 insertions(+), 9 deletions(-) diff --git a/src/kimi_cli/acp/AGENTS.md b/src/kimi_cli/acp/AGENTS.md index 24efeacaa..8d63f7f2e 100644 --- a/src/kimi_cli/acp/AGENTS.md +++ b/src/kimi_cli/acp/AGENTS.md @@ -31,7 +31,9 @@ - MCP servers passed by ACP are converted via `acp_mcp_servers_to_mcp_config`. - `session/load` - Multi-session only: loads by `Session.find`, then builds `KimiCLI` and `ACPSession`. - - No history replay yet (TODO). + - Replays persisted `wire.jsonl` history as ACP `session/update` notifications, falls back to + context text history for older sessions without wire history, and returns initial modes/models + state. - Single-session: not implemented. - `session/list` - Multi-session only: lists sessions via `Session.list`, no pagination. diff --git a/src/kimi_cli/acp/server.py b/src/kimi_cli/acp/server.py index 84b5f4bfa..7134940a9 100644 --- a/src/kimi_cli/acp/server.py +++ b/src/kimi_cli/acp/server.py @@ -231,18 +231,42 @@ async def _setup_session( async def load_session( self, cwd: str, session_id: str, mcp_servers: list[MCPServer] | None = None, **kwargs: Any - ) -> None: + ) -> acp.schema.LoadSessionResponse: logger.info("Loading session: {id} for working directory: {cwd}", id=session_id, cwd=cwd) if session_id in self.sessions: logger.warning("Session already loaded: {id}", id=session_id) - return + acp_session, model_id_conv = self.sessions[session_id] + else: + # Check authentication before loading session + self._check_auth() - # Check authentication before loading session - self._check_auth() + acp_session, model_id_conv = await self._setup_session(cwd, session_id, mcp_servers) + + replayed_updates = await acp_session.replay_history() + logger.info( + "Replayed {count} ACP history updates for session: {id}", + count=replayed_updates, + id=session_id, + ) - await self._setup_session(cwd, session_id, mcp_servers) - # TODO: replay session history? + config = acp_session.cli.soul.runtime.config + return acp.schema.LoadSessionResponse( + modes=acp.schema.SessionModeState( + available_modes=[ + acp.schema.SessionMode( + id="default", + name="Default", + description="The default mode.", + ), + ], + current_mode_id="default", + ), + models=acp.schema.SessionModelState( + available_models=_expand_llm_models(config.models), + current_model_id=model_id_conv.to_acp_model_id(), + ), + ) async def resume_session( self, cwd: str, session_id: str, mcp_servers: list[MCPServer] | None = None, **kwargs: Any diff --git a/src/kimi_cli/acp/session.py b/src/kimi_cli/acp/session.py index 7c984db32..720014a10 100644 --- a/src/kimi_cli/acp/session.py +++ b/src/kimi_cli/acp/session.py @@ -2,7 +2,7 @@ import asyncio import uuid -from contextvars import ContextVar +from contextvars import ContextVar, Token import acp import streamingjson # type: ignore[reportMissingTypeStubs] @@ -22,9 +22,11 @@ from kimi_cli.wire.types import ( ApprovalRequest, ApprovalResponse, + AudioURLPart, CompactionBegin, CompactionEnd, ContentPart, + ImageURLPart, MCPLoadingBegin, MCPLoadingEnd, Notification, @@ -75,6 +77,44 @@ def should_hide_terminal_output(tool_call_id: str) -> bool: return calls is not None and tool_call_id in calls +def _content_part_to_acp_block(part: ContentPart) -> ACPContentBlock: + if isinstance(part, TextPart): + return acp.schema.TextContentBlock(type="text", text=part.text) + if isinstance(part, ImageURLPart): + mime_type, data = _split_data_url(part.image_url.url) + if data is not None: + return acp.schema.ImageContentBlock(type="image", mime_type=mime_type, data=data) + return acp.schema.ResourceContentBlock( + type="resource_link", + uri=part.image_url.url, + name="image", + mime_type=mime_type, + ) + if isinstance(part, AudioURLPart): + mime_type, data = _split_data_url(part.audio_url.url) + if data is not None: + return acp.schema.AudioContentBlock(type="audio", mime_type=mime_type, data=data) + return acp.schema.ResourceContentBlock( + type="resource_link", + uri=part.audio_url.url, + name="audio", + mime_type=mime_type, + ) + + logger.warning("Unsupported replay user content part: {part}", part=part) + return acp.schema.TextContentBlock(type="text", text=f"[{part.__class__.__name__}]") + + +def _split_data_url(url: str) -> tuple[str, str | None]: + if not url.startswith("data:"): + return "application/octet-stream", None + header, sep, data = url.partition(",") + if not sep or ";base64" not in header: + return "application/octet-stream", None + mime_type = header.removeprefix("data:").removesuffix(";base64") + return mime_type or "application/octet-stream", data + + class _ToolCallState: """Manages the state of a single tool call for streaming updates.""" @@ -263,6 +303,126 @@ async def prompt(self, prompt: list[ACPContentBlock]) -> acp.PromptResponse: _current_turn_id.reset(token) return acp.PromptResponse(stop_reason="end_turn") + async def replay_history(self) -> int: + """Replay persisted wire history as ACP session updates.""" + old_turn_state = self._turn_state + turn_token: Token[str | None] | None = None + replayed_updates = 0 + self._turn_state = None + try: + async for record in self._cli.soul.runtime.session.wire_file.iter_records(): + msg = record.to_wire_message() + match msg: + case TurnBegin(user_input=user_input): + if turn_token is not None: + _current_turn_id.reset(turn_token) + self._turn_state = _TurnState() + turn_token = _current_turn_id.set(self._turn_state.id) + replayed_updates += await self._send_user_input(user_input) + case SteerInput(user_input=user_input): + self._reset_content_run() + replayed_updates += await self._send_user_input(user_input) + case TurnEnd() | StepInterrupted(): + if turn_token is not None: + _current_turn_id.reset(turn_token) + turn_token = None + self._turn_state = None + case StepBegin(): + if turn_token is None: + turn_token = self._begin_replay_turn() + case ThinkPart(think=think): + await self._send_thinking(think) + replayed_updates += 1 + case TextPart(text=text): + await self._send_text(text) + replayed_updates += 1 + case ContentPart(): + logger.warning("Unsupported replay content part: {part}", part=msg) + await self._send_text(f"[{msg.__class__.__name__}]") + replayed_updates += 1 + case ToolCall(): + if turn_token is None: + turn_token = self._begin_replay_turn() + await self._send_tool_call(msg) + replayed_updates += 1 + case ToolCallPart(): + if self._turn_state is not None: + await self._send_tool_call_part(msg) + replayed_updates += 1 + case ToolResult(): + if self._turn_state is not None: + await self._send_tool_result(msg) + replayed_updates += 1 + case Notification(): + await self._send_notification(msg) + replayed_updates += 1 + case _: + pass + finally: + if turn_token is not None: + _current_turn_id.reset(turn_token) + self._turn_state = old_turn_state + if replayed_updates == 0: + replayed_updates = await self._replay_context_history() + return replayed_updates + + def _begin_replay_turn(self) -> Token[str | None]: + self._turn_state = _TurnState() + return _current_turn_id.set(self._turn_state.id) + + async def _replay_context_history(self) -> int: + old_turn_state = self._turn_state + turn_token: Token[str | None] | None = None + replayed_updates = 0 + self._turn_state = None + try: + for message in self._cli.soul.context.history: + if turn_token is not None: + _current_turn_id.reset(turn_token) + turn_token = self._begin_replay_turn() + + if message.role == "user": + replayed_updates += await self._send_user_input(list(message.content)) + elif message.role == "assistant": + for part in message.content: + if isinstance(part, ThinkPart): + await self._send_thinking(part.think) + elif isinstance(part, TextPart): + await self._send_text(part.text) + else: + logger.warning("Unsupported context replay part: {part}", part=part) + await self._send_text(f"[{part.__class__.__name__}]") + replayed_updates += 1 + finally: + if turn_token is not None: + _current_turn_id.reset(turn_token) + self._turn_state = old_turn_state + return replayed_updates + + async def _send_user_input(self, user_input: str | list[ContentPart]) -> int: + blocks: list[ACPContentBlock] + if isinstance(user_input, str): + blocks = [acp.schema.TextContentBlock(type="text", text=user_input)] + else: + blocks = [_content_part_to_acp_block(part) for part in user_input] + + for block in blocks: + await self._send_user_block(block) + return len(blocks) + + async def _send_user_block(self, block: ACPContentBlock) -> None: + if not self._id or not self._conn: + return + + await self._conn.session_update( + session_id=self._id, + update=acp.schema.UserMessageChunk( + content=block, + message_id=self._content_run_id("user"), + session_update="user_message_chunk", + ), + ) + async def cancel(self) -> None: if self._turn_state is None: logger.warning("Cancel requested but no prompt is running") diff --git a/src/kimi_cli/app.py b/src/kimi_cli/app.py index fb3b6ab31..66bdf1dd1 100644 --- a/src/kimi_cli/app.py +++ b/src/kimi_cli/app.py @@ -652,6 +652,7 @@ async def _mirror_external_cancel() -> None: user_input, _ui_loop_fn, run_cancel_event, + wire_file=self._runtime.session.wire_file, runtime=self._runtime, ) ) diff --git a/tests/acp/test_protocol_v1.py b/tests/acp/test_protocol_v1.py index 70e413caa..6f949cefd 100644 --- a/tests/acp/test_protocol_v1.py +++ b/tests/acp/test_protocol_v1.py @@ -2,12 +2,14 @@ from __future__ import annotations +import os + import acp import pytest from kimi_cli.acp.version import CURRENT_VERSION -from .conftest import ACPTestClient +from .conftest import ACPTestClient, _kimi_bin, _repo_root pytestmark = pytest.mark.asyncio @@ -148,6 +150,120 @@ async def test_resume_session_not_found( ) +async def test_load_session_replays_history(acp_share_dir, tmp_path): + """session/load rebinds the session and replays persisted transcript updates.""" + env = { + **os.environ, + "KIMI_SHARE_DIR": str(acp_share_dir), + } + work_dir = tmp_path / "workdir" + work_dir.mkdir(exist_ok=True) + + first_client = ACPTestClient() + async with acp.spawn_agent_process( + first_client, + _kimi_bin(), + "acp", + env=env, + cwd=str(_repo_root()), + use_unstable_protocol=True, + ) as (conn, _process): + await conn.initialize(protocol_version=1) + session_resp = await conn.new_session(cwd=str(work_dir)) + await conn.prompt( + prompt=[acp.text_block("Replay this please")], + session_id=session_resp.session_id, + ) + session_id = session_resp.session_id + + second_client = ACPTestClient() + async with acp.spawn_agent_process( + second_client, + _kimi_bin(), + "acp", + env=env, + cwd=str(_repo_root()), + use_unstable_protocol=True, + ) as (conn, _process): + await conn.initialize(protocol_version=1) + load_resp = await conn.load_session(cwd=str(work_dir), session_id=session_id) + + assert load_resp.modes is not None + assert load_resp.models is not None + + replayed = [ + (update.session_update, getattr(update.content, "text", None), update.message_id) + for update in second_client.updates + if hasattr(update, "content") + ] + assert any( + update_type == "user_message_chunk" and text == "Replay this please" and message_id + for update_type, text, message_id in replayed + ) + assert any( + update_type == "agent_message_chunk" and text == "Hello from scripted echo!" and message_id + for update_type, text, message_id in replayed + ) + + +async def test_load_session_replays_context_when_wire_history_missing(acp_share_dir, tmp_path): + """Older ACP sessions can still replay text history if no wire log was recorded.""" + env = { + **os.environ, + "KIMI_SHARE_DIR": str(acp_share_dir), + } + work_dir = tmp_path / "workdir" + work_dir.mkdir(exist_ok=True) + + first_client = ACPTestClient() + async with acp.spawn_agent_process( + first_client, + _kimi_bin(), + "acp", + env=env, + cwd=str(_repo_root()), + use_unstable_protocol=True, + ) as (conn, _process): + await conn.initialize(protocol_version=1) + session_resp = await conn.new_session(cwd=str(work_dir)) + await conn.prompt( + prompt=[acp.text_block("Replay from context")], + session_id=session_resp.session_id, + ) + session_id = session_resp.session_id + + wire_files = list(acp_share_dir.rglob("wire.jsonl")) + assert wire_files + for wire_file in wire_files: + wire_file.unlink() + + second_client = ACPTestClient() + async with acp.spawn_agent_process( + second_client, + _kimi_bin(), + "acp", + env=env, + cwd=str(_repo_root()), + use_unstable_protocol=True, + ) as (conn, _process): + await conn.initialize(protocol_version=1) + await conn.load_session(cwd=str(work_dir), session_id=session_id) + + replayed = [ + (update.session_update, getattr(update.content, "text", None), update.message_id) + for update in second_client.updates + if hasattr(update, "content") + ] + assert any( + update_type == "user_message_chunk" and text == "Replay from context" and message_id + for update_type, text, message_id in replayed + ) + assert any( + update_type == "agent_message_chunk" and text == "Hello from scripted echo!" and message_id + for update_type, text, message_id in replayed + ) + + async def test_cancel_session( acp_client: tuple[acp.ClientSideConnection, ACPTestClient], tmp_path, From 73f6534022b36642e6806a0e5f35aa45e09d7f3c Mon Sep 17 00:00:00 2001 From: huntharo Date: Sun, 24 May 2026 13:23:26 -0400 Subject: [PATCH 3/3] feat(acp): support permission mode switching --- src/kimi_cli/acp/AGENTS.md | 6 +- src/kimi_cli/acp/server.py | 98 +++++++++++++++-------- src/kimi_cli/acp/session.py | 71 ++++++++++------ tests/acp/test_protocol_v1.py | 79 ++++++++++++++++++ tests/acp/test_server_permission_modes.py | 93 +++++++++++++++++++++ 5 files changed, 290 insertions(+), 57 deletions(-) create mode 100644 tests/acp/test_server_permission_modes.py diff --git a/src/kimi_cli/acp/AGENTS.md b/src/kimi_cli/acp/AGENTS.md index 8d63f7f2e..c0ff455a5 100644 --- a/src/kimi_cli/acp/AGENTS.md +++ b/src/kimi_cli/acp/AGENTS.md @@ -73,10 +73,14 @@ then releases the terminal handle. - Approval requests in the core tool system are bridged to ACP `session/request_permission` with allow-once/allow-always/reject options. +- Permission mode switching is exposed through ACP `session/set_mode`. + - `default` asks for permissions as needed. + - `yolo` toggles Kimi's persisted yolo approval state and resolves pending approvals. ## Current gaps / not implemented - `authenticate` method (not used by current Zed ACP client). -- `session/set_mode` and `session/set_model` (no multi-mode/model switching in kimi-cli). +- `session/set_model` currently supports configured Kimi models, but there is no agent/model + provider discovery beyond the local config. - `ext_method` / `ext_notification` for custom ACP extensions are stubbed. - Single-session server does not implement `session/load` or `session/list`. diff --git a/src/kimi_cli/acp/server.py b/src/kimi_cli/acp/server.py index 7134940a9..a4e7ede9e 100644 --- a/src/kimi_cli/acp/server.py +++ b/src/kimi_cli/acp/server.py @@ -25,6 +25,21 @@ from kimi_cli.soul.toolset import KimiToolset from kimi_cli.utils.logging import logger +_DEFAULT_MODE_ID = "default" +_YOLO_MODE_ID = "yolo" +_SESSION_MODES = [ + acp.schema.SessionMode( + id=_DEFAULT_MODE_ID, + name="Default", + description="Ask for permission before running actions that require approval.", + ), + acp.schema.SessionMode( + id=_YOLO_MODE_ID, + name="Yolo", + description="Automatically approve actions for this session.", + ), +] + class ACPServer: def __init__(self) -> None: @@ -171,16 +186,7 @@ async def new_session( ) return acp.NewSessionResponse( session_id=session.id, - modes=acp.schema.SessionModeState( - available_modes=[ - acp.schema.SessionMode( - id="default", - name="Default", - description="The default mode.", - ), - ], - current_mode_id="default", - ), + modes=_session_mode_state(acp_session), models=acp.schema.SessionModelState( available_models=_expand_llm_models(config.models), current_model_id=model_id_conv.to_acp_model_id(), @@ -252,16 +258,7 @@ async def load_session( config = acp_session.cli.soul.runtime.config return acp.schema.LoadSessionResponse( - modes=acp.schema.SessionModeState( - available_modes=[ - acp.schema.SessionMode( - id="default", - name="Default", - description="The default mode.", - ), - ], - current_mode_id="default", - ), + modes=_session_mode_state(acp_session), models=acp.schema.SessionModelState( available_models=_expand_llm_models(config.models), current_model_id=model_id_conv.to_acp_model_id(), @@ -279,16 +276,7 @@ async def resume_session( acp_session, model_id_conv = self.sessions[session_id] config = acp_session.cli.soul.runtime.config return acp.schema.ResumeSessionResponse( - modes=acp.schema.SessionModeState( - available_modes=[ - acp.schema.SessionMode( - id="default", - name="Default", - description="The default mode.", - ), - ], - current_mode_id="default", - ), + modes=_session_mode_state(acp_session), models=acp.schema.SessionModelState( available_models=_expand_llm_models(config.models), current_model_id=model_id_conv.to_acp_model_id(), @@ -321,8 +309,41 @@ async def list_sessions( next_cursor=None, ) - async def set_session_mode(self, mode_id: str, session_id: str, **kwargs: Any) -> None: - assert mode_id == "default", "Only default mode is supported" + async def set_session_mode( + self, mode_id: str, session_id: str, **kwargs: Any + ) -> acp.schema.SetSessionModeResponse: + logger.info( + "Setting session mode to {mode_id} for session: {id}", + mode_id=mode_id, + id=session_id, + ) + if session_id not in self.sessions: + logger.error("Session not found: {id}", id=session_id) + raise acp.RequestError.invalid_params({"session_id": "Session not found"}) + if mode_id not in {_DEFAULT_MODE_ID, _YOLO_MODE_ID}: + logger.error("Mode not found: {mode_id}", mode_id=mode_id) + raise acp.RequestError.invalid_params({"mode_id": "Mode not found"}) + + acp_session, _ = self.sessions[session_id] + runtime = acp_session.cli.soul.runtime + enable_yolo = mode_id == _YOLO_MODE_ID + if runtime.approval.is_yolo() != enable_yolo: + runtime.approval.set_yolo(enable_yolo) + + if enable_yolo and runtime.approval_runtime is not None: + for request in runtime.approval_runtime.list_pending(): + runtime.approval_runtime.resolve(request.id, "approve") + + if self.conn is not None: + await self.conn.session_update( + session_id=session_id, + update=acp.schema.CurrentModeUpdate( + session_update="current_mode_update", + current_mode_id=_current_mode_id(acp_session), + ), + ) + + return acp.schema.SetSessionModeResponse() async def set_session_model(self, model_id: str, session_id: str, **kwargs: Any) -> None: logger.info( @@ -469,3 +490,16 @@ def _expand_llm_models(models: dict[str, LLMModel]) -> list[acp.schema.ModelInfo ) ) return expanded_models + + +def _current_mode_id(acp_session: ACPSession) -> str: + if acp_session.cli.soul.runtime.approval.is_yolo(): + return _YOLO_MODE_ID + return _DEFAULT_MODE_ID + + +def _session_mode_state(acp_session: ACPSession) -> acp.schema.SessionModeState: + return acp.schema.SessionModeState( + available_modes=_SESSION_MODES, + current_mode_id=_current_mode_id(acp_session), + ) diff --git a/src/kimi_cli/acp/session.py b/src/kimi_cli/acp/session.py index 720014a10..28b4e6b6d 100644 --- a/src/kimi_cli/acp/session.py +++ b/src/kimi_cli/acp/session.py @@ -607,33 +607,56 @@ async def _handle_approval_request(self, request: ApprovalRequest): ) ) - # Send permission request and wait for response + # Send permission request and wait for response. Also watch the + # local request object so protocol-level mode switches can resolve + # a pending approval without waiting for the client dialog. logger.debug("Requesting permission for action: {action}", action=request.action) - response = await self._conn.request_permission( - [ - acp.schema.PermissionOption( - option_id="approve", - name="Approve once", - kind="allow_once", - ), - acp.schema.PermissionOption( - option_id="approve_for_session", - name="Approve for this session", - kind="allow_always", - ), - acp.schema.PermissionOption( - option_id="reject", - name="Reject", - kind="reject_once", + permission_task = asyncio.create_task( + self._conn.request_permission( + [ + acp.schema.PermissionOption( + option_id="approve", + name="Approve once", + kind="allow_once", + ), + acp.schema.PermissionOption( + option_id="approve_for_session", + name="Approve for this session", + kind="allow_always", + ), + acp.schema.PermissionOption( + option_id="reject", + name="Reject", + kind="reject_once", + ), + ], + self._id, + acp.schema.ToolCallUpdate( + tool_call_id=state.acp_tool_call_id, + title=state.get_title(), + content=content, ), - ], - self._id, - acp.schema.ToolCallUpdate( - tool_call_id=state.acp_tool_call_id, - title=state.get_title(), - content=content, - ), + ) + ) + request_task = asyncio.create_task(request.wait()) + done, pending = await asyncio.wait( + {permission_task, request_task}, + return_when=asyncio.FIRST_COMPLETED, ) + for task in pending: + task.cancel() + if pending: + await asyncio.gather(*pending, return_exceptions=True) + + if permission_task not in done: + resolved_response = request_task.result() + logger.debug( + "Permission request resolved externally: {response}", + response=resolved_response, + ) + return + + response = permission_task.result() logger.debug("Received permission response: {response}", response=response) # Process the outcome diff --git a/tests/acp/test_protocol_v1.py b/tests/acp/test_protocol_v1.py index 6f949cefd..0ff6e240f 100644 --- a/tests/acp/test_protocol_v1.py +++ b/tests/acp/test_protocol_v1.py @@ -2,7 +2,10 @@ from __future__ import annotations +import asyncio import os +from collections.abc import Callable +from typing import Any import acp import pytest @@ -14,6 +17,14 @@ pytestmark = pytest.mark.asyncio +async def _wait_for_update(test_client: ACPTestClient, predicate: Callable[[Any], bool]) -> None: + for _ in range(20): + if any(predicate(update) for update in test_client.updates): + return + await asyncio.sleep(0.05) + pytest.fail("Expected ACP session update was not received") + + async def test_initialize_returns_negotiated_version( acp_client: tuple[acp.ClientSideConnection, ACPTestClient], ): @@ -53,6 +64,8 @@ async def test_new_session_response_shape( assert isinstance(resp.session_id, str) assert len(resp.session_id) > 0 assert resp.modes is not None + assert [mode.id for mode in resp.modes.available_modes] == ["default", "yolo"] + assert resp.modes.current_mode_id == "default" assert resp.models is not None @@ -132,6 +145,72 @@ async def test_resume_session( assert len(resume_resp.models.available_models) > 0 +async def test_set_session_mode_toggles_yolo( + acp_client: tuple[acp.ClientSideConnection, ACPTestClient], + tmp_path, +): + """session/set_mode toggles Kimi's approval mode through ACP.""" + conn, test_client = acp_client + await conn.initialize(protocol_version=1) + + work_dir = tmp_path / "workdir" + work_dir.mkdir(exist_ok=True) + session_resp = await conn.new_session(cwd=str(work_dir)) + + await conn.set_session_mode( + mode_id="yolo", + session_id=session_resp.session_id, + ) + + await _wait_for_update( + test_client, + lambda update: isinstance(update, acp.schema.CurrentModeUpdate) + and update.current_mode_id == "yolo", + ) + resume_resp = await conn.resume_session( + cwd=str(work_dir), + session_id=session_resp.session_id, + ) + assert resume_resp.modes is not None + assert resume_resp.modes.current_mode_id == "yolo" + + await conn.set_session_mode( + mode_id="default", + session_id=session_resp.session_id, + ) + + await _wait_for_update( + test_client, + lambda update: isinstance(update, acp.schema.CurrentModeUpdate) + and update.current_mode_id == "default", + ) + resume_resp = await conn.resume_session( + cwd=str(work_dir), + session_id=session_resp.session_id, + ) + assert resume_resp.modes is not None + assert resume_resp.modes.current_mode_id == "default" + + +async def test_set_session_mode_rejects_unknown_mode( + acp_client: tuple[acp.ClientSideConnection, ACPTestClient], + tmp_path, +): + """session/set_mode reports invalid params for unknown modes.""" + conn, _ = acp_client + await conn.initialize(protocol_version=1) + + work_dir = tmp_path / "workdir" + work_dir.mkdir(exist_ok=True) + session_resp = await conn.new_session(cwd=str(work_dir)) + + with pytest.raises(acp.RequestError): + await conn.set_session_mode( + mode_id="unknown", + session_id=session_resp.session_id, + ) + + async def test_resume_session_not_found( acp_client: tuple[acp.ClientSideConnection, ACPTestClient], tmp_path, diff --git a/tests/acp/test_server_permission_modes.py b/tests/acp/test_server_permission_modes.py new file mode 100644 index 000000000..d2cbe6f29 --- /dev/null +++ b/tests/acp/test_server_permission_modes.py @@ -0,0 +1,93 @@ +from __future__ import annotations + +from types import SimpleNamespace +from typing import Any + +import acp +import pytest + +from kimi_cli.acp.server import ACPServer +from kimi_cli.approval_runtime import ApprovalRuntime, ApprovalSource +from kimi_cli.soul.approval import Approval, ApprovalState + +pytestmark = pytest.mark.asyncio + + +class _FakeConn: + def __init__(self) -> None: + self.updates: list[tuple[str, Any]] = [] + + async def session_update(self, session_id: str, update: Any, **kwargs: Any) -> None: + self.updates.append((session_id, update)) + + +def _server_with_approval( + approval: Approval, approval_runtime: ApprovalRuntime +) -> tuple[ACPServer, _FakeConn]: + conn = _FakeConn() + server = ACPServer() + server.conn = conn # type: ignore[assignment] + runtime = SimpleNamespace(approval=approval, approval_runtime=approval_runtime) + soul = SimpleNamespace(runtime=runtime) + cli = SimpleNamespace(soul=soul) + acp_session = SimpleNamespace(cli=cli) + server.sessions["session-1"] = (acp_session, object()) # type: ignore[assignment] + return server, conn + + +async def test_set_session_mode_yolo_resolves_pending_approvals() -> None: + approval_runtime = ApprovalRuntime() + approval = Approval( + state=ApprovalState(yolo=False), + runtime=approval_runtime, + ) + server, conn = _server_with_approval(approval, approval_runtime) + + approval_runtime.create_request( + request_id="approval-1", + tool_call_id="tool-1", + sender="Shell", + action="shell_exec", + description="Run a command", + display=[], + source=ApprovalSource(kind="foreground_turn", id="turn-1"), + ) + + await server.set_session_mode(mode_id="yolo", session_id="session-1") + + record = approval_runtime.get_request("approval-1") + assert approval.is_yolo() is True + assert record is not None + assert record.status == "resolved" + assert record.response == "approve" + assert conn.updates == [ + ( + "session-1", + acp.schema.CurrentModeUpdate( + session_update="current_mode_update", + current_mode_id="yolo", + ), + ) + ] + + +async def test_set_session_mode_default_disables_yolo() -> None: + approval_runtime = ApprovalRuntime() + approval = Approval( + state=ApprovalState(yolo=True), + runtime=approval_runtime, + ) + server, conn = _server_with_approval(approval, approval_runtime) + + await server.set_session_mode(mode_id="default", session_id="session-1") + + assert approval.is_yolo() is False + assert conn.updates == [ + ( + "session-1", + acp.schema.CurrentModeUpdate( + session_update="current_mode_update", + current_mode_id="default", + ), + ) + ]