diff --git a/pyproject.toml b/pyproject.toml index e1f71a322..eea789f5b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -5,7 +5,7 @@ description = "Kimi Code CLI is your next CLI agent." readme = "README.md" requires-python = ">=3.12" dependencies = [ - "agent-client-protocol==0.8.0", + "agent-client-protocol==0.10.0", "aiofiles>=24.0,<26.0", "aiohttp==3.13.3", "typer==0.21.1", diff --git a/src/kimi_cli/acp/AGENTS.md b/src/kimi_cli/acp/AGENTS.md index 24efeacaa..6ebd4c37b 100644 --- a/src/kimi_cli/acp/AGENTS.md +++ b/src/kimi_cli/acp/AGENTS.md @@ -20,7 +20,8 @@ - `prompt_capabilities`: `embedded_context=False`, `image=True`, `audio=False`. - `mcp_capabilities`: `http=True`, `sse=False`. - Single-session: `load_session=False`, no session list capabilities. -- Multi-session: `load_session=True`, `session_capabilities.list` supported. +- Multi-session: `load_session=True`, `session_capabilities.list` supported, and + `session_capabilities._meta.kimi.sessionHistoryReplay=True`. - `auth_methods=[]` (no authentication methods advertised). ## Session lifecycle (implemented behavior) @@ -31,8 +32,14 @@ - MCP servers passed by ACP are converted via `acp_mcp_servers_to_mcp_config`. - `session/load` - Multi-session only: loads by `Session.find`, then builds `KimiCLI` and `ACPSession`. - - No history replay yet (TODO). + - Replays persisted `wire.jsonl` history as ACP `session/update` notifications, falls back to + context text history for older sessions without wire history, and returns initial modes/models + state plus title metadata in `_meta.kimi.session`. + - Sends `SessionInfoUpdate` with title/updatedAt before replaying history. - Single-session: not implemented. +- `session/resume` + - Multi-session only: returns initial modes/models state plus title metadata in + `_meta.kimi.session`, and sends `SessionInfoUpdate` with title/updatedAt. - `session/list` - Multi-session only: lists sessions via `Session.list`, no pagination. - Single-session: not implemented. @@ -54,6 +61,9 @@ - `TodoDisplayBlock` is converted into `AgentPlanUpdate`. - Available commands: - `AvailableCommandsUpdate` is sent right after session creation. +- Session metadata: + - `SessionInfoUpdate` is sent after prompt completion when Kimi auto-generates a title, and + during `session/load`/`session/resume` so clients can hydrate title/updatedAt. ## Prompt/content conversion - Incoming prompt blocks: diff --git a/src/kimi_cli/acp/server.py b/src/kimi_cli/acp/server.py index 0b6b32538..9711670b0 100644 --- a/src/kimi_cli/acp/server.py +++ b/src/kimi_cli/acp/server.py @@ -1,7 +1,6 @@ from __future__ import annotations import asyncio -import sys import time from datetime import datetime from pathlib import Path @@ -33,7 +32,9 @@ def __init__(self) -> None: self.conn: acp.Client | None = None self.sessions: dict[str, tuple[ACPSession, _ModelIDConv]] = {} self.negotiated_version: ACPVersionSpec | None = None - self._auth_methods: list[acp.schema.AuthMethod] = [] + self._auth_methods: list[ + acp.schema.EnvVarAuthMethod | acp.schema.TerminalAuthMethod | acp.schema.AuthMethodAgent + ] = [] def on_connect(self, conn: acp.Client) -> None: logger.info("ACP client connected") @@ -66,32 +67,18 @@ async def initialize( version=getattr(client_info, "version", None), ) - # get command and args of current process for terminal-auth - command = sys.argv[0] - args: list[str] = [] - - # Build terminal auth data for error response - terminal_args = args + ["login"] - # Build and cache auth methods for reuse in AUTH_REQUIRED errors self._auth_methods = [ - acp.schema.AuthMethod( + acp.schema.TerminalAuthMethod( id="login", + type="terminal", name="Login with Kimi account", description=( "Run `kimi login` command in the terminal, " "then follow the instructions to finish login." ), - # Store auth data in field_meta for building AUTH_REQUIRED error - field_meta={ - "terminal-auth": { - "command": command, - "args": terminal_args, - "label": "Kimi Code Login", - "env": {}, - "type": "terminal", - } - }, + args=["login"], + env={}, ), ] @@ -104,6 +91,7 @@ async def initialize( ), mcp_capabilities=acp.schema.McpCapabilities(http=True, sse=False), session_capabilities=acp.schema.SessionCapabilities( + field_meta={"kimi": {"sessionHistoryReplay": True}}, list=acp.schema.SessionListCapabilities(), resume=acp.schema.SessionResumeCapabilities(), ), @@ -129,20 +117,9 @@ def _check_auth(self) -> None: """Check if Kimi Code authentication is complete. Raise AUTH_REQUIRED if not.""" reason = self._check_token_usable() if reason: - auth_methods_data: list[dict[str, Any]] = [] - for m in self._auth_methods: - if m.field_meta and "terminal-auth" in m.field_meta: - terminal_auth = m.field_meta["terminal-auth"] - auth_methods_data.append( - { - "id": m.id, - "name": m.name, - "description": m.description, - "type": terminal_auth.get("type", "terminal"), - "args": terminal_auth.get("args", []), - "env": terminal_auth.get("env", {}), - } - ) + auth_methods_data = [ + m.model_dump(by_alias=True, exclude_none=True) for m in self._auth_methods + ] logger.warning("Authentication required, {reason}", reason=reason) raise acp.RequestError.auth_required({"authMethods": auth_methods_data}) @@ -255,18 +232,44 @@ async def _setup_session( async def load_session( self, cwd: str, session_id: str, mcp_servers: list[MCPServer] | None = None, **kwargs: Any - ) -> None: + ) -> acp.schema.LoadSessionResponse: logger.info("Loading session: {id} for working directory: {cwd}", id=session_id, cwd=cwd) if session_id in self.sessions: logger.warning("Session already loaded: {id}", id=session_id) - return + acp_session, model_id_conv = self.sessions[session_id] + else: + # Check authentication before loading session + self._check_auth() - # Check authentication before loading session - self._check_auth() + acp_session, model_id_conv = await self._setup_session(cwd, session_id, mcp_servers) - await self._setup_session(cwd, session_id, mcp_servers) - # TODO: replay session history? + await acp_session.send_session_info_update() + replayed_updates = await acp_session.replay_history() + logger.info( + "Replayed {count} ACP history updates for session: {id}", + count=replayed_updates, + id=session_id, + ) + + config = acp_session.cli.soul.runtime.config + return acp.schema.LoadSessionResponse( + field_meta=_session_response_meta(acp_session), + modes=acp.schema.SessionModeState( + available_modes=[ + acp.schema.SessionMode( + id="default", + name="Default", + description="The default mode.", + ), + ], + current_mode_id="default", + ), + models=acp.schema.SessionModelState( + available_models=_expand_llm_models(config.models), + current_model_id=model_id_conv.to_acp_model_id(), + ), + ) async def resume_session( self, cwd: str, session_id: str, mcp_servers: list[MCPServer] | None = None, **kwargs: Any @@ -277,8 +280,10 @@ async def resume_session( await self._setup_session(cwd, session_id, mcp_servers) acp_session, model_id_conv = self.sessions[session_id] + await acp_session.send_session_info_update() config = acp_session.cli.soul.runtime.config return acp.schema.ResumeSessionResponse( + field_meta=_session_response_meta(acp_session), modes=acp.schema.SessionModeState( available_modes=[ acp.schema.SessionMode( @@ -390,7 +395,10 @@ async def authenticate(self, method_id: str, **kwargs: Any) -> acp.AuthenticateR raise acp.RequestError.auth_required( { "message": "Please complete login in terminal first", - "authMethods": self._auth_methods, + "authMethods": [ + m.model_dump(by_alias=True, exclude_none=True) + for m in self._auth_methods + ], } ) @@ -466,3 +474,19 @@ def _expand_llm_models(models: dict[str, LLMModel]) -> list[acp.schema.ModelInfo ) ) return expanded_models + + +def _session_response_meta(acp_session: ACPSession) -> dict[str, Any]: + session = acp_session.cli.soul.runtime.session + title = session.state.custom_title or session.title + updated_at = ( + datetime.fromtimestamp(session.context_file.stat().st_mtime).astimezone().isoformat() + if session.context_file.exists() + else None + ) + meta: dict[str, Any] = {"sessionId": session.id} + if title != "Untitled": + meta["title"] = title + if updated_at is not None: + meta["updatedAt"] = updated_at + return {"kimi": {"session": meta}} diff --git a/src/kimi_cli/acp/session.py b/src/kimi_cli/acp/session.py index fa74e801b..29c7736fd 100644 --- a/src/kimi_cli/acp/session.py +++ b/src/kimi_cli/acp/session.py @@ -2,7 +2,8 @@ import asyncio import uuid -from contextvars import ContextVar +from contextvars import ContextVar, Token +from datetime import datetime import acp import streamingjson # type: ignore[reportMissingTypeStubs] @@ -22,9 +23,11 @@ from kimi_cli.wire.types import ( ApprovalRequest, ApprovalResponse, + AudioURLPart, CompactionBegin, CompactionEnd, ContentPart, + ImageURLPart, MCPLoadingBegin, MCPLoadingEnd, Notification, @@ -75,6 +78,44 @@ def should_hide_terminal_output(tool_call_id: str) -> bool: return calls is not None and tool_call_id in calls +def _content_part_to_acp_block(part: ContentPart) -> ACPContentBlock: + if isinstance(part, TextPart): + return acp.schema.TextContentBlock(type="text", text=part.text) + if isinstance(part, ImageURLPart): + mime_type, data = _split_data_url(part.image_url.url) + if data is not None: + return acp.schema.ImageContentBlock(type="image", mime_type=mime_type, data=data) + return acp.schema.ResourceContentBlock( + type="resource_link", + uri=part.image_url.url, + name="image", + mime_type=mime_type, + ) + if isinstance(part, AudioURLPart): + mime_type, data = _split_data_url(part.audio_url.url) + if data is not None: + return acp.schema.AudioContentBlock(type="audio", mime_type=mime_type, data=data) + return acp.schema.ResourceContentBlock( + type="resource_link", + uri=part.audio_url.url, + name="audio", + mime_type=mime_type, + ) + + logger.warning("Unsupported replay user content part: {part}", part=part) + return acp.schema.TextContentBlock(type="text", text=f"[{part.__class__.__name__}]") + + +def _split_data_url(url: str) -> tuple[str, str | None]: + if not url.startswith("data:"): + return "application/octet-stream", None + header, sep, data = url.partition(",") + if not sep or ";base64" not in header: + return "application/octet-stream", None + mime_type = header.removeprefix("data:").removesuffix(";base64") + return mime_type or "application/octet-stream", data + + class _ToolCallState: """Manages the state of a single tool call for streaming updates.""" @@ -116,8 +157,22 @@ def __init__(self): self.tool_calls: dict[str, _ToolCallState] = {} """Map of tool call ID (LLM-side ID) to tool call state.""" self.last_tool_call: _ToolCallState | None = None + self.content_run_kind: str | None = None + """The active ACP content run kind: `message` or `thought`.""" + self.content_run_message_id: str | None = None + """Stable ACP message ID for the current contiguous content run.""" self.cancel_event = asyncio.Event() + def reset_content_run(self) -> None: + self.content_run_kind = None + self.content_run_message_id = None + + def content_run_id(self, kind: str) -> str: + if self.content_run_kind != kind or self.content_run_message_id is None: + self.content_run_kind = kind + self.content_run_message_id = str(uuid.uuid4()) + return self.content_run_message_id + class ACPSession: def __init__( @@ -161,29 +216,32 @@ async def prompt(self, prompt: list[ACPContentBlock]) -> acp.PromptResponse: async for msg in self._cli.run(user_input, self._turn_state.cancel_event): match msg: case TurnBegin(): - pass + self._reset_content_run() case SteerInput(): - pass + self._reset_content_run() case TurnEnd(): - pass + self._reset_content_run() case StepBegin(): - pass + self._reset_content_run() case StepInterrupted(): + self._reset_content_run() break case StepRetry(): - pass + self._reset_content_run() case CompactionBegin(): - pass + self._reset_content_run() case CompactionEnd(): - pass + self._reset_content_run() case MCPLoadingBegin(): - pass + self._reset_content_run() case MCPLoadingEnd(): - pass + self._reset_content_run() case StatusUpdate(): - pass + self._reset_content_run() case Notification(): + self._reset_content_run() await self._send_notification(msg) + self._reset_content_run() case ThinkPart(think=think): await self._send_thinking(think) case TextPart(text=text): @@ -244,8 +302,141 @@ async def prompt(self, prompt: list[ACPContentBlock]) -> acp.PromptResponse: reset_current_kaos(kaos_token) _terminal_tool_call_ids.reset(terminal_tool_calls_token) _current_turn_id.reset(token) + await self.send_session_info_update() return acp.PromptResponse(stop_reason="end_turn") + async def replay_history(self) -> int: + """Replay persisted wire history as ACP session updates.""" + old_turn_state = self._turn_state + turn_token: Token[str | None] | None = None + replayed_updates = 0 + self._turn_state = None + try: + async for record in self._cli.soul.runtime.session.wire_file.iter_records(): + msg = record.to_wire_message() + match msg: + case TurnBegin(user_input=user_input): + if turn_token is not None: + _current_turn_id.reset(turn_token) + self._turn_state = _TurnState() + turn_token = _current_turn_id.set(self._turn_state.id) + replayed_updates += await self._send_user_input(user_input) + case SteerInput(user_input=user_input): + if turn_token is None: + turn_token = self._begin_replay_turn() + self._reset_content_run() + replayed_updates += await self._send_user_input(user_input) + case TurnEnd() | StepInterrupted(): + if turn_token is not None: + _current_turn_id.reset(turn_token) + turn_token = None + self._turn_state = None + case StepBegin(): + if turn_token is None: + turn_token = self._begin_replay_turn() + case ThinkPart(think=think): + if turn_token is None: + turn_token = self._begin_replay_turn() + await self._send_thinking(think) + replayed_updates += 1 + case TextPart(text=text): + if turn_token is None: + turn_token = self._begin_replay_turn() + await self._send_text(text) + replayed_updates += 1 + case ContentPart(): + if turn_token is None: + turn_token = self._begin_replay_turn() + logger.warning("Unsupported replay content part: {part}", part=msg) + await self._send_text(f"[{msg.__class__.__name__}]") + replayed_updates += 1 + case ToolCall(): + if turn_token is None: + turn_token = self._begin_replay_turn() + await self._send_tool_call(msg) + replayed_updates += 1 + case ToolCallPart(): + if self._turn_state is not None: + await self._send_tool_call_part(msg) + replayed_updates += 1 + case ToolResult(): + if self._turn_state is not None: + await self._send_tool_result(msg) + replayed_updates += 1 + case Notification(): + if turn_token is None: + turn_token = self._begin_replay_turn() + self._reset_content_run() + await self._send_notification(msg) + self._reset_content_run() + replayed_updates += 1 + case _: + pass + finally: + if turn_token is not None: + _current_turn_id.reset(turn_token) + self._turn_state = old_turn_state + if replayed_updates == 0: + replayed_updates = await self._replay_context_history() + return replayed_updates + + def _begin_replay_turn(self) -> Token[str | None]: + self._turn_state = _TurnState() + return _current_turn_id.set(self._turn_state.id) + + async def _replay_context_history(self) -> int: + old_turn_state = self._turn_state + turn_token: Token[str | None] | None = None + replayed_updates = 0 + self._turn_state = None + try: + for message in self._cli.soul.context.history: + if turn_token is not None: + _current_turn_id.reset(turn_token) + turn_token = self._begin_replay_turn() + + if message.role == "user": + replayed_updates += await self._send_user_input(list(message.content)) + elif message.role == "assistant": + for part in message.content: + if isinstance(part, ThinkPart): + await self._send_thinking(part.think) + elif isinstance(part, TextPart): + await self._send_text(part.text) + else: + logger.warning("Unsupported context replay part: {part}", part=part) + await self._send_text(f"[{part.__class__.__name__}]") + replayed_updates += 1 + finally: + if turn_token is not None: + _current_turn_id.reset(turn_token) + self._turn_state = old_turn_state + return replayed_updates + + async def _send_user_input(self, user_input: str | list[ContentPart]) -> int: + blocks: list[ACPContentBlock] + if isinstance(user_input, str): + blocks = [acp.schema.TextContentBlock(type="text", text=user_input)] + else: + blocks = [_content_part_to_acp_block(part) for part in user_input] + + for block in blocks: + await self._send_user_block(block) + return len(blocks) + + async def _send_user_block(self, block: ACPContentBlock) -> None: + if not self._id or not self._conn: + return + + await self._conn.session_update( + session_id=self._id, + update=acp.schema.UserMessageChunk( + content=block, + message_id=self._content_run_id("user"), + session_update="user_message_chunk", + ), + ) + async def cancel(self) -> None: if self._turn_state is None: logger.warning("Cancel requested but no prompt is running") @@ -253,6 +444,41 @@ async def cancel(self) -> None: self._turn_state.cancel_event.set() + async def send_session_info_update(self) -> None: + """Send current session metadata, including title, if available.""" + if not self._id or not self._conn: + return + + try: + session = self._cli.soul.runtime.session + except AttributeError: + return + title = session.state.custom_title or session.title + updated_at = ( + datetime.fromtimestamp(session.context_file.stat().st_mtime).astimezone().isoformat() + if session.context_file.exists() + else None + ) + if title == "Untitled" and updated_at is None: + return + + await self._conn.session_update( + session_id=self._id, + update=acp.schema.SessionInfoUpdate( + session_update="session_info_update", + title=title if title != "Untitled" else None, + updated_at=updated_at, + ), + ) + + def _reset_content_run(self) -> None: + if self._turn_state is not None: + self._turn_state.reset_content_run() + + def _content_run_id(self, kind: str) -> str: + assert self._turn_state is not None + return self._turn_state.content_run_id(kind) + async def _send_thinking(self, think: str): """Send thinking content to client.""" if not self._id or not self._conn: @@ -262,6 +488,7 @@ async def _send_thinking(self, think: str): self._id, acp.schema.AgentThoughtChunk( content=acp.schema.TextContentBlock(type="text", text=think), + message_id=self._content_run_id("thought"), session_update="agent_thought_chunk", ), ) @@ -275,6 +502,7 @@ async def _send_text(self, text: str): session_id=self._id, update=acp.schema.AgentMessageChunk( content=acp.schema.TextContentBlock(type="text", text=text), + message_id=self._content_run_id("message"), session_update="agent_message_chunk", ), ) @@ -292,6 +520,7 @@ async def _send_tool_call(self, tool_call: ToolCall): assert self._turn_state is not None if not self._id or not self._conn: return + self._reset_content_run() # Create and store tool call state state = _ToolCallState(tool_call) @@ -353,6 +582,7 @@ async def _send_tool_result(self, result: ToolResult): assert self._turn_state is not None if not self._id or not self._conn: return + self._reset_content_run() tool_ret = result.return_value diff --git a/src/kimi_cli/app.py b/src/kimi_cli/app.py index fb3b6ab31..66bdf1dd1 100644 --- a/src/kimi_cli/app.py +++ b/src/kimi_cli/app.py @@ -652,6 +652,7 @@ async def _mirror_external_cancel() -> None: user_input, _ui_loop_fn, run_cancel_event, + wire_file=self._runtime.session.wire_file, runtime=self._runtime, ) ) diff --git a/src/kimi_cli/cli/__init__.py b/src/kimi_cli/cli/__init__.py index b9aafb7a8..3435811af 100644 --- a/src/kimi_cli/cli/__init__.py +++ b/src/kimi_cli/cli/__init__.py @@ -1015,8 +1015,23 @@ def term( @cli.command() -def acp(): +def acp( + auth_command: Annotated[ + str | None, + typer.Argument( + help="Internal terminal-auth command.", + show_default=False, + hidden=True, + ), + ] = None, +): """Run Kimi Code CLI ACP server.""" + if auth_command == "login": + login(json=False) + return + if auth_command is not None: + raise typer.BadParameter("Unknown ACP auth command") + from kimi_cli.acp import acp_main acp_main() diff --git a/tests/acp/test_cli_auth_command.py b/tests/acp/test_cli_auth_command.py new file mode 100644 index 000000000..19975df12 --- /dev/null +++ b/tests/acp/test_cli_auth_command.py @@ -0,0 +1,19 @@ +from __future__ import annotations + +from typer.testing import CliRunner + +from kimi_cli.cli import cli + + +def test_acp_login_delegates_to_top_level_login(monkeypatch): + called: list[bool] = [] + + def fake_login(*, json: bool = False) -> None: + called.append(json) + + monkeypatch.setattr("kimi_cli.cli.login", fake_login) + + result = CliRunner().invoke(cli, ["acp", "login"]) + + assert result.exit_code == 0 + assert called == [False] diff --git a/tests/acp/test_protocol_v1.py b/tests/acp/test_protocol_v1.py index 70e413caa..53fbfcf1d 100644 --- a/tests/acp/test_protocol_v1.py +++ b/tests/acp/test_protocol_v1.py @@ -2,12 +2,14 @@ from __future__ import annotations +import os + import acp import pytest from kimi_cli.acp.version import CURRENT_VERSION -from .conftest import ACPTestClient +from .conftest import ACPTestClient, _kimi_bin, _repo_root pytestmark = pytest.mark.asyncio @@ -74,6 +76,10 @@ async def test_prompt_with_scripted_echo( assert resp.stop_reason in ("end_turn", "max_tokens", "max_turn_requests") # The scripted echo provider should have sent session updates assert len(test_client.updates) > 0 + assert any( + update.session_update == "session_info_update" and update.title == "Say hello" + for update in test_client.updates + ) async def test_list_sessions( @@ -128,6 +134,8 @@ async def test_resume_session( assert resume_resp.models is not None assert isinstance(resume_resp.models.current_model_id, str) assert len(resume_resp.models.available_models) > 0 + assert resume_resp.field_meta is not None + assert resume_resp.field_meta["kimi"]["session"]["title"] == "Hello" async def test_resume_session_not_found( @@ -148,6 +156,133 @@ async def test_resume_session_not_found( ) +async def test_load_session_replays_history(acp_share_dir, tmp_path): + """session/load rebinds the session and replays persisted transcript updates.""" + env = { + **os.environ, + "KIMI_SHARE_DIR": str(acp_share_dir), + } + work_dir = tmp_path / "workdir" + work_dir.mkdir(exist_ok=True) + + first_client = ACPTestClient() + async with acp.spawn_agent_process( + first_client, + _kimi_bin(), + "acp", + env=env, + cwd=str(_repo_root()), + use_unstable_protocol=True, + ) as (conn, _process): + await conn.initialize(protocol_version=1) + session_resp = await conn.new_session(cwd=str(work_dir)) + await conn.prompt( + prompt=[acp.text_block("Replay this please")], + session_id=session_resp.session_id, + ) + session_id = session_resp.session_id + + second_client = ACPTestClient() + async with acp.spawn_agent_process( + second_client, + _kimi_bin(), + "acp", + env=env, + cwd=str(_repo_root()), + use_unstable_protocol=True, + ) as (conn, _process): + await conn.initialize(protocol_version=1) + load_resp = await conn.load_session(cwd=str(work_dir), session_id=session_id) + + assert load_resp.modes is not None + assert load_resp.models is not None + assert load_resp.field_meta is not None + assert load_resp.field_meta["kimi"]["session"]["title"] == "Replay this please" + assert any( + update.session_update == "session_info_update" and update.title == "Replay this please" + for update in second_client.updates + ) + + replayed = [ + (update.session_update, getattr(update.content, "text", None), update.message_id) + for update in second_client.updates + if hasattr(update, "content") + ] + assert any( + update_type == "user_message_chunk" and text == "Replay this please" and message_id + for update_type, text, message_id in replayed + ) + assert any( + update_type == "agent_message_chunk" and text == "Hello from scripted echo!" and message_id + for update_type, text, message_id in replayed + ) + + +async def test_load_session_replays_context_when_wire_history_missing(acp_share_dir, tmp_path): + """Older ACP sessions can still replay text history if no wire log was recorded.""" + env = { + **os.environ, + "KIMI_SHARE_DIR": str(acp_share_dir), + } + work_dir = tmp_path / "workdir" + work_dir.mkdir(exist_ok=True) + + first_client = ACPTestClient() + async with acp.spawn_agent_process( + first_client, + _kimi_bin(), + "acp", + env=env, + cwd=str(_repo_root()), + use_unstable_protocol=True, + ) as (conn, _process): + await conn.initialize(protocol_version=1) + session_resp = await conn.new_session(cwd=str(work_dir)) + await conn.prompt( + prompt=[acp.text_block("Replay from context")], + session_id=session_resp.session_id, + ) + session_id = session_resp.session_id + + wire_files = list(acp_share_dir.rglob("wire.jsonl")) + assert wire_files + for wire_file in wire_files: + wire_file.unlink() + + second_client = ACPTestClient() + async with acp.spawn_agent_process( + second_client, + _kimi_bin(), + "acp", + env=env, + cwd=str(_repo_root()), + use_unstable_protocol=True, + ) as (conn, _process): + await conn.initialize(protocol_version=1) + load_resp = await conn.load_session(cwd=str(work_dir), session_id=session_id) + + assert load_resp.field_meta is not None + assert load_resp.field_meta["kimi"]["session"]["title"] == "Replay from context" + assert any( + update.session_update == "session_info_update" and update.title == "Replay from context" + for update in second_client.updates + ) + + replayed = [ + (update.session_update, getattr(update.content, "text", None), update.message_id) + for update in second_client.updates + if hasattr(update, "content") + ] + assert any( + update_type == "user_message_chunk" and text == "Replay from context" and message_id + for update_type, text, message_id in replayed + ) + assert any( + update_type == "agent_message_chunk" and text == "Hello from scripted echo!" and message_id + for update_type, text, message_id in replayed + ) + + async def test_cancel_session( acp_client: tuple[acp.ClientSideConnection, ACPTestClient], tmp_path, diff --git a/tests/acp/test_server_initialize.py b/tests/acp/test_server_initialize.py index ad20a23e6..31db83dcc 100644 --- a/tests/acp/test_server_initialize.py +++ b/tests/acp/test_server_initialize.py @@ -2,8 +2,6 @@ from __future__ import annotations -from unittest.mock import patch - import pytest from kimi_cli.acp.server import ACPServer @@ -11,38 +9,31 @@ pytestmark = pytest.mark.asyncio -@pytest.mark.parametrize( - "argv, expected_command, expected_terminal_args", - [ - # Standard entry-point: kimi acp - (["/usr/local/bin/kimi", "acp"], "/usr/local/bin/kimi", ["login"]), - # kimi-code entry-point (JetBrains scenario) - (["/usr/local/bin/kimi-code", "acp"], "/usr/local/bin/kimi-code", ["login"]), - # kimi-cli entry-point - (["/usr/local/bin/kimi-cli", "acp"], "/usr/local/bin/kimi-cli", ["login"]), - # Arbitrary wrapper script - (["/opt/wrapper.sh", "acp"], "/opt/wrapper.sh", ["login"]), - ], - ids=["kimi", "kimi-code", "kimi-cli", "wrapper-script"], -) -async def test_initialize_argv_handling( - argv: list[str], - expected_command: str, - expected_terminal_args: list[str], -): - """initialize() should not crash regardless of sys.argv content.""" +async def test_initialize_advertises_terminal_auth_method(): + """initialize() should advertise terminal auth using the ACP schema.""" server = ACPServer() - with patch("kimi_cli.acp.server.sys") as mock_sys: - mock_sys.argv = argv - resp = await server.initialize(protocol_version=1) + resp = await server.initialize(protocol_version=1) assert resp.protocol_version == 1 assert resp.auth_methods is not None assert len(resp.auth_methods) == 1 auth_method = resp.auth_methods[0] - assert auth_method.field_meta is not None - terminal_auth = auth_method.field_meta["terminal-auth"] - assert terminal_auth["command"] == expected_command - assert terminal_auth["args"] == expected_terminal_args + assert auth_method.type == "terminal" + assert auth_method.args == ["login"] + assert auth_method.env == {} + + +async def test_initialize_advertises_session_history_replay(): + """loadSession only means bind/resume; Kimi separately advertises transcript replay.""" + server = ACPServer() + + resp = await server.initialize(protocol_version=1) + + assert resp.agent_capabilities is not None + assert resp.agent_capabilities.load_session is True + assert resp.agent_capabilities.session_capabilities is not None + assert resp.agent_capabilities.session_capabilities.field_meta == { + "kimi": {"sessionHistoryReplay": True} + } diff --git a/tests/acp/test_session_notifications.py b/tests/acp/test_session_notifications.py index 70b9197d4..e7ad21093 100644 --- a/tests/acp/test_session_notifications.py +++ b/tests/acp/test_session_notifications.py @@ -2,9 +2,11 @@ import asyncio from pathlib import Path +from types import SimpleNamespace import acp import pytest +from kosong.tooling import ToolOk, ToolResult from kosong.tooling.empty import EmptyToolset from kimi_cli.acp.session import ACPSession @@ -14,7 +16,8 @@ from kimi_cli.soul.agent import Agent, Runtime from kimi_cli.soul.context import Context from kimi_cli.soul.kimisoul import KimiSoul -from kimi_cli.wire.types import Notification, TextPart, ToolCall, TurnBegin, TurnEnd +from kimi_cli.wire.file import WireFile +from kimi_cli.wire.types import Notification, TextPart, ThinkPart, ToolCall, TurnBegin, TurnEnd class _FakeConn: @@ -46,6 +49,43 @@ async def run(self, _user_input, _cancel_event): yield TurnEnd() +class _FakeStreamingCLI: + async def run(self, _user_input, _cancel_event): + yield TurnBegin(user_input=[TextPart(text="hello")]) + yield ThinkPart(think="thinking") + yield TextPart(text="Hi mom") + yield TextPart(text=", what is for dinner?") + yield ToolCall( + id="call-dinner", + function=ToolCall.FunctionBody(name="ReadFile", arguments='{"path":"menu.txt"}'), + ) + yield ToolResult(tool_call_id="call-dinner", return_value=ToolOk(output="pizza")) + yield TextPart(text="Tell Dad I said hi") + yield TurnEnd() + + +class _FakeReplayCLI: + def __init__(self, wire_file: WireFile) -> None: + session = SimpleNamespace(wire_file=wire_file) + runtime = SimpleNamespace(session=session) + self.soul = SimpleNamespace(runtime=runtime, context=SimpleNamespace(history=[])) + + +def _notification(title: str = "Background task completed: build project") -> Notification: + return Notification( + id="n1234567", + category="task", + type="task.completed", + source_kind="background_task", + source_id="b1234567", + title=title, + body="Task ID: b1234567\nStatus: completed", + severity="success", + created_at=123.456, + payload={"task_id": "b1234567"}, + ) + + @pytest.mark.asyncio async def test_acp_session_surfaces_notification_as_message_chunk() -> None: conn = _FakeConn() @@ -64,6 +104,90 @@ async def test_acp_session_surfaces_notification_as_message_chunk() -> None: assert text_update.content.text == "done" +@pytest.mark.asyncio +async def test_acp_session_assigns_message_ids_to_distinct_content_runs() -> None: + conn = _FakeConn() + session = ACPSession("session-1", _FakeStreamingCLI(), conn) # type: ignore[arg-type] + + response = await session.prompt([acp.text_block("hello")]) + + assert response.stop_reason == "end_turn" + chunks = [ + update + for _, update in conn.updates + if update.session_update in {"agent_thought_chunk", "agent_message_chunk"} + ] + assert [chunk.content.text for chunk in chunks] == [ + "thinking", + "Hi mom", + ", what is for dinner?", + "Tell Dad I said hi", + ] + assert chunks[0].message_id + assert chunks[1].message_id + assert chunks[1].message_id == chunks[2].message_id + assert chunks[3].message_id + assert chunks[3].message_id != chunks[1].message_id + assert chunks[0].message_id not in {chunks[1].message_id, chunks[3].message_id} + + +@pytest.mark.asyncio +async def test_replay_history_handles_notification_after_turn_end(tmp_path: Path) -> None: + wire_file = WireFile(tmp_path / "wire.jsonl") + await wire_file.append_message(TurnBegin(user_input=[TextPart(text="hello")])) + await wire_file.append_message(TextPart(text="done")) + await wire_file.append_message(TurnEnd()) + await wire_file.append_message(_notification()) + conn = _FakeConn() + session = ACPSession("session-1", _FakeReplayCLI(wire_file), conn) # type: ignore[arg-type] + + replayed_updates = await session.replay_history() + + assert replayed_updates == 3 + chunks = [ + update + for _, update in conn.updates + if update.session_update in {"agent_message_chunk", "user_message_chunk"} + ] + assert [chunk.content.text for chunk in chunks] == [ + "hello", + "done", + "[Notification] Background task completed: build project\n" + "Task ID: b1234567\n" + "Status: completed", + ] + assert all(chunk.message_id for chunk in chunks) + + +@pytest.mark.asyncio +async def test_replay_history_resets_message_id_around_notification(tmp_path: Path) -> None: + wire_file = WireFile(tmp_path / "wire.jsonl") + await wire_file.append_message(TurnBegin(user_input=[TextPart(text="hello")])) + await wire_file.append_message(TextPart(text="first")) + await wire_file.append_message(_notification()) + await wire_file.append_message(TextPart(text="second")) + await wire_file.append_message(TurnEnd()) + conn = _FakeConn() + session = ACPSession("session-1", _FakeReplayCLI(wire_file), conn) # type: ignore[arg-type] + + replayed_updates = await session.replay_history() + + assert replayed_updates == 4 + chunks = [ + update for _, update in conn.updates if update.session_update == "agent_message_chunk" + ] + assert [chunk.content.text for chunk in chunks] == [ + "first", + "[Notification] Background task completed: build project\n" + "Task ID: b1234567\n" + "Status: completed", + "second", + ] + assert chunks[0].message_id != chunks[1].message_id + assert chunks[1].message_id != chunks[2].message_id + assert chunks[0].message_id != chunks[2].message_id + + class _BlockingApprovalConn(_FakeConn): def __init__(self) -> None: super().__init__() diff --git a/tests/ui_and_conv/test_acp_server_auth.py b/tests/ui_and_conv/test_acp_server_auth.py index 40a025821..ae7ed4e19 100644 --- a/tests/ui_and_conv/test_acp_server_auth.py +++ b/tests/ui_and_conv/test_acp_server_auth.py @@ -13,17 +13,13 @@ def server() -> ACPServer: """Create an ACPServer instance with mocked auth methods.""" s = ACPServer() s._auth_methods = [ - acp.schema.AuthMethod( + acp.schema.TerminalAuthMethod( id="login", + type="terminal", name="Test Login", description="Test description", - field_meta={ - "terminal-auth": { - "type": "terminal", - "args": ["kimi", "login"], - "env": {}, - } - }, + args=["login"], + env={}, ) ] return s diff --git a/uv.lock b/uv.lock index 9d1b6ed7c..384eeddef 100644 --- a/uv.lock +++ b/uv.lock @@ -17,14 +17,14 @@ members = [ [[package]] name = "agent-client-protocol" -version = "0.8.0" +version = "0.10.0" source = { registry = "https://pypi.org/simple" } dependencies = [ { name = "pydantic" }, ] -sdist = { url = "https://files.pythonhosted.org/packages/a8/a4/26698e0186933b4ab6e814626c99ee52b5522d039b5c94c983ecb3a66eed/agent_client_protocol-0.8.0.tar.gz", hash = "sha256:f9eade29167ff72a10fae7a0a0c1f27436909a790e159fb10265c2874e58d922", size = 68577, upload-time = "2026-02-07T17:08:46.513Z" } +sdist = { url = "https://files.pythonhosted.org/packages/21/5c/d60196c536c8f66bb6a238c8a6d0d6fa84a2e3436008c139cb4a79003a25/agent_client_protocol-0.10.0.tar.gz", hash = "sha256:f8a6041e27423131e42e4d0cdd850d5b094b1092f79cc29501ab2bd57b92ee88", size = 81502, upload-time = "2026-05-07T19:11:56.784Z" } wheels = [ - { url = "https://files.pythonhosted.org/packages/4b/04/e55a3c549c09c0023cb92c696c7b98d97bb657088f940e34f4bc47d1a49a/agent_client_protocol-0.8.0-py3-none-any.whl", hash = "sha256:2d5712b88b3249dbd6148b24d32c6eb8992e5663f224db6291524ac80cca8037", size = 54362, upload-time = "2026-02-07T17:08:45.575Z" }, + { url = "https://files.pythonhosted.org/packages/6c/cc/139895c0c5cd2acefc365085da0735d93df0cf8427ff4ee351961090e1af/agent_client_protocol-0.10.0-py3-none-any.whl", hash = "sha256:97a1a69c5d094e2625b09c4dbc2d5cf19637c4943630ceed52ab0578ecd5e14c", size = 65400, upload-time = "2026-05-07T19:11:55.614Z" }, ] [[package]] @@ -1330,7 +1330,7 @@ dev = [ [package.metadata] requires-dist = [ - { name = "agent-client-protocol", specifier = "==0.8.0" }, + { name = "agent-client-protocol", specifier = "==0.10.0" }, { name = "aiofiles", specifier = ">=24.0,<26.0" }, { name = "aiohttp", specifier = "==3.13.3" }, { name = "batrachian-toad", marker = "python_full_version >= '3.14'", specifier = "==0.5.23" },