diff --git a/docs/channels.md b/docs/channels.md index b7ab1e1..d7953e4 100644 --- a/docs/channels.md +++ b/docs/channels.md @@ -1,10 +1,11 @@ # Channels -Bub supports running the same agent loop through message channels. -Use channels when you want remote operation from mobile or shared team environments. +Bub supports running the same agent loop through channel adapters. +Use channels when you want either local interactive operation or remote operation from mobile/shared team environments. ## Supported Channels +- `cli` (local): interactive terminal channel used by `uv run bub chat`. - [Telegram](telegram.md): direct messages and group chats. - [Discord](discord.md): servers, channels, and threads. @@ -20,6 +21,7 @@ If the process exits immediately, check that at least one channel is enabled in ## Session Isolation +- CLI session key: `cli` or `cli:` (from `--session-id`). - Telegram session key: `telegram:` - Discord session key: `discord:` diff --git a/docs/cli.md b/docs/cli.md index e4f0abc..436dff1 100644 --- a/docs/cli.md +++ b/docs/cli.md @@ -1,5 +1,7 @@ # Interactive CLI +`uv run bub chat` runs the local `cli` channel adapter (same channel pipeline as other channels, but local-only). + ## Run Commands ```bash diff --git a/docs/index.md b/docs/index.md index 19333f2..6b2d5dc 100644 --- a/docs/index.md +++ b/docs/index.md @@ -35,5 +35,5 @@ Its operating philosophy follows [Socialized Evaluation](https://psiace.me/posts - [Interactive CLI](cli.md): interactive workflow and troubleshooting. - [Architecture](architecture.md): runtime boundaries and internals. - [Deployment Guide](deployment.md): local and Docker operations. -- [Channels](channels.md): Telegram and Discord runtime model. +- [Channels](channels.md): CLI/Telegram/Discord runtime model. - [Post: Socialized Evaluation and Agent Partnership](posts/2026-03-01-bub-socialized-evaluation-and-agent-partnership.md): project position and principles. diff --git a/mkdocs.yml b/mkdocs.yml index 6886829..1fee5e7 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -15,9 +15,9 @@ nav: - "2025-07-16 ยท Baby Bub Bootstrap Milestone": posts/2025-07-16-baby-bub-bootstrap-milestone.md - Deployment: deployment.md - Architecture: architecture.md - - Interactive CLI: cli.md - Channels: - Overview: channels.md + - CLI (Local): cli.md - Telegram: telegram.md - Discord: discord.md diff --git a/src/bub/channels/__init__.py b/src/bub/channels/__init__.py index b9274cf..d45f044 100644 --- a/src/bub/channels/__init__.py +++ b/src/bub/channels/__init__.py @@ -1,6 +1,7 @@ """Channel adapters and bus exports.""" from bub.channels.base import BaseChannel +from bub.channels.cli import CliChannel from bub.channels.discord import DiscordChannel, DiscordConfig from bub.channels.manager import ChannelManager from bub.channels.telegram import TelegramChannel, TelegramConfig @@ -8,6 +9,7 @@ __all__ = [ "BaseChannel", "ChannelManager", + "CliChannel", "DiscordChannel", "DiscordConfig", "TelegramChannel", diff --git a/src/bub/channels/base.py b/src/bub/channels/base.py index 69d7103..01b398f 100644 --- a/src/bub/channels/base.py +++ b/src/bub/channels/base.py @@ -33,6 +33,11 @@ def output_channel(self) -> str: """The name of the channel to send outputs to. Defaults to the same channel.""" return self.name + @property + def debounce_enabled(self) -> bool: + """Whether inbound messages should be debounced before model execution.""" + return True + @abstractmethod def is_mentioned(self, message: T) -> bool: """Determine if the message is relevant to this channel.""" @@ -46,6 +51,10 @@ async def run_prompt(self, session_id: str, prompt: str) -> LoopResult: """Run the given prompt through the runtime and return the result.""" return await self.runtime.handle_input(session_id, prompt) + def format_prompt(self, prompt: str) -> str: + """Format accumulated prompt text before sending it to the runtime.""" + return f"channel: ${self.output_channel}\n{prompt}" + @abstractmethod async def process_output(self, session_id: str, output: LoopResult) -> None: """Process the output returned by the LLM.""" diff --git a/src/bub/channels/cli.py b/src/bub/channels/cli.py new file mode 100644 index 0000000..962f4be --- /dev/null +++ b/src/bub/channels/cli.py @@ -0,0 +1,148 @@ +"""CLI channel adapter.""" + +from __future__ import annotations + +from collections.abc import Awaitable, Callable +from datetime import datetime +from hashlib import md5 +from pathlib import Path + +from loguru import logger +from prompt_toolkit import PromptSession +from prompt_toolkit.completion import WordCompleter +from prompt_toolkit.formatted_text import FormattedText +from prompt_toolkit.history import FileHistory +from prompt_toolkit.key_binding import KeyBindings +from prompt_toolkit.patch_stdout import patch_stdout +from rich import get_console + +from bub.app.runtime import AppRuntime +from bub.channels.base import BaseChannel +from bub.cli.render import CliRenderer +from bub.core.agent_loop import LoopResult + + +class CliChannel(BaseChannel[str]): + """Interactive terminal channel.""" + + name = "cli" + + def __init__(self, runtime: AppRuntime, *, session_id: str = "cli") -> None: + super().__init__(runtime) + self._session_id = session_id + self._session = runtime.get_session(session_id) + self._renderer = CliRenderer(get_console()) + self._mode = "agent" + self._last_tape_info: object | None = None + self._prompt = self._build_prompt() + self._stop_requested = False + + @property + def debounce_enabled(self) -> bool: + return False + + async def start(self, on_receive: Callable[[str], Awaitable[None]]) -> None: + self._renderer.welcome(model=self.runtime.settings.model, workspace=str(self.runtime.workspace)) + await self._refresh_tape_info() + + while not self._stop_requested: + try: + with patch_stdout(raw=True): + raw = (await self._prompt.prompt_async(self._prompt_message())).strip() + except KeyboardInterrupt: + self._renderer.info("Interrupted. Use ',quit' to exit.") + continue + except EOFError: + break + + if not raw: + continue + + request = self._normalize_input(raw) + with self._renderer.console.status("[cyan]Processing...[/cyan]", spinner="dots"): + await on_receive(request) + + self._renderer.info("Bye.") + + def is_mentioned(self, message: str) -> bool: + _ = message + return True + + async def get_session_prompt(self, message: str) -> tuple[str, str]: + return self._session_id, message + + def format_prompt(self, prompt: str) -> str: + return prompt + + async def process_output(self, session_id: str, output: LoopResult) -> None: + _ = session_id + await self._refresh_tape_info() + if output.immediate_output: + self._renderer.command_output(output.immediate_output) + if output.error: + self._renderer.error(output.error) + if output.assistant_output: + self._renderer.assistant_output(output.assistant_output) + if output.exit_requested: + self._stop_requested = True + + async def _refresh_tape_info(self) -> None: + try: + self._last_tape_info = await self._session.tape.info() + except Exception as exc: + self._last_tape_info = None + logger.debug("cli.tape_info.unavailable session_id={} error={}", self._session_id, exc) + + def _build_prompt(self) -> PromptSession[str]: + kb = KeyBindings() + + @kb.add("c-x", eager=True) + def _toggle_mode(event) -> None: + self._mode = "shell" if self._mode == "agent" else "agent" + event.app.invalidate() + + def _tool_sort_key(tool_name: str) -> tuple[str, str]: + section, _, name = tool_name.rpartition(".") + return (section, name) + + history_file = self._history_file(self.runtime.settings.resolve_home(), self.runtime.workspace) + history_file.parent.mkdir(parents=True, exist_ok=True) + history = FileHistory(str(history_file)) + tool_names = sorted((f",{tool}" for tool in self._session.tool_view.all_tools()), key=_tool_sort_key) + completer = WordCompleter(tool_names, ignore_case=True) + return PromptSession( + completer=completer, + complete_while_typing=True, + key_bindings=kb, + history=history, + bottom_toolbar=self._render_bottom_toolbar, + ) + + def _prompt_message(self) -> FormattedText: + cwd = Path.cwd().name + symbol = ">" if self._mode == "agent" else "," + return FormattedText([("bold", f"{cwd} {symbol} ")]) + + def _render_bottom_toolbar(self) -> FormattedText: + info = self._last_tape_info + now = datetime.now().strftime("%H:%M") + left = f"{now} mode:{self._mode}" + right = ( + f"model:{self.runtime.settings.model} " + f"entries:{getattr(info, 'entries', '-')} " + f"anchors:{getattr(info, 'anchors', '-')} " + f"last:{getattr(info, 'last_anchor', None) or '-'}" + ) + return FormattedText([("", f"{left} {right}")]) + + def _normalize_input(self, raw: str) -> str: + if self._mode != "shell": + return raw + if raw.startswith(","): + return raw + return f", {raw}" + + @staticmethod + def _history_file(home: Path, workspace: Path) -> Path: + workspace_hash = md5(str(workspace).encode("utf-8")).hexdigest() # noqa: S324 + return home / "history" / f"{workspace_hash}.history" diff --git a/src/bub/channels/manager.py b/src/bub/channels/manager.py index 990b869..5f53a75 100644 --- a/src/bub/channels/manager.py +++ b/src/bub/channels/manager.py @@ -16,17 +16,24 @@ class ChannelManager: """Coordinate inbound routing and outbound dispatch for channels.""" - def __init__(self, runtime: AppRuntime) -> None: + def __init__(self, runtime: AppRuntime, *, include_defaults: bool = True) -> None: self.runtime = runtime self._channels: dict[str, BaseChannel] = {} self._channel_tasks: list[asyncio.Task[None]] = [] self._session_runners: dict[str, SessionRunner] = {} - for channel_cls in self.default_channels(): - self.register(channel_cls) + if include_defaults: + for channel_cls in self.default_channels(): + self.register(channel_cls) runtime.install_hooks(self) def register[T: type[BaseChannel]](self, channel: T) -> T: - self._channels[channel.name] = channel(self.runtime) + self.register_instance(channel(self.runtime)) + return channel + + def register_instance[T: BaseChannel](self, channel: T) -> T: + if channel.name in self._channels: + raise ValueError(f"channel '{channel.name}' already registered") + self._channels[channel.name] = channel return channel @property diff --git a/src/bub/channels/runner.py b/src/bub/channels/runner.py index 9b8a810..b307672 100644 --- a/src/bub/channels/runner.py +++ b/src/bub/channels/runner.py @@ -23,13 +23,15 @@ def __init__( async def _run(self, channel: BaseChannel) -> None: await self._event.wait() - prompt = f"channel: ${channel.output_channel}\n" + "\n".join(self._prompts) + prompt = channel.format_prompt("\n".join(self._prompts)) self._prompts.clear() self._running_task = None try: result = await channel.run_prompt(self.session_id, prompt) await channel.process_output(self.session_id, result) except Exception: + if not channel.debounce_enabled: + raise logger.exception("session.run.error session_id={}", self.session_id) def reset_timer(self, timeout: int) -> None: @@ -48,16 +50,25 @@ async def process_message(self, channel: BaseChannel, message: Any) -> None: self._last_mentioned_at = None logger.info("session.receive ignored session_id={} message={}", self.session_id, prompt) return - self._prompts.append(prompt) if prompt.startswith(","): logger.info("session.receive.command session_id={} message={}", self.session_id, prompt) try: result = await channel.run_prompt(self.session_id, prompt) await channel.process_output(self.session_id, result) except Exception: + if not channel.debounce_enabled: + raise logger.exception("session.run.error session_id={}", self.session_id) - elif is_mentioned: - # wait at most 1 second to reply to mentioned messages. + return + elif not channel.debounce_enabled: + logger.info("session.receive.immediate session_id={} message={}", self.session_id, prompt) + result = await channel.run_prompt(self.session_id, prompt) + await channel.process_output(self.session_id, result) + return + + self._prompts.append(prompt) + if is_mentioned: + # Debounce mentioned messages before responding. self._last_mentioned_at = now logger.info("session.receive.mentioned session_id={} message={}", self.session_id, prompt) self.reset_timer(self.debounce_seconds) diff --git a/src/bub/cli/app.py b/src/bub/cli/app.py index e295b7a..3128158 100644 --- a/src/bub/cli/app.py +++ b/src/bub/cli/app.py @@ -13,13 +13,10 @@ from bub.app import build_runtime from bub.app.runtime import AppRuntime -from bub.channels import ChannelManager -from bub.cli.interactive import InteractiveCli +from bub.channels import ChannelManager, CliChannel from bub.logging_utils import configure_logging app = typer.Typer(name="bub", help="Tape-first coding agent CLI", add_completion=False) -TELEGRAM_DISABLED_ERROR = "telegram is disabled; set BUB_TELEGRAM_ENABLED=true" -TELEGRAM_TOKEN_ERROR = "missing telegram token; set BUB_TELEGRAM_TOKEN" # noqa: S105 def _parse_subset(values: list[str] | None) -> set[str] | None: @@ -62,8 +59,9 @@ def chat( with build_runtime( resolved_workspace, model=model, max_tokens=max_tokens, enable_scheduler=not disable_scheduler ) as runtime: - cli = InteractiveCli(runtime, session_id=session_id) - asyncio.run(cli.run()) + manager = ChannelManager(runtime, include_defaults=False) + manager.register_instance(CliChannel(runtime, session_id=session_id)) + asyncio.run(_serve_channels(manager)) @app.command() @@ -169,7 +167,7 @@ def message( configure_logging() resolved_workspace = (workspace.expanduser() if workspace else Path.cwd()).resolve() logger.info( - "telegram.start workspace={} model={} max_tokens={}, proactive_response={}", + "message.start workspace={} model={} max_tokens={}, proactive_response={}", str(resolved_workspace), model or "", max_tokens if max_tokens is not None else "", diff --git a/src/bub/cli/interactive.py b/src/bub/cli/interactive.py index c508a60..ecc6242 100644 --- a/src/bub/cli/interactive.py +++ b/src/bub/cli/interactive.py @@ -1,126 +1,18 @@ -"""Interactive CLI implementation.""" +"""Backward-compatible interactive CLI wrapper.""" from __future__ import annotations -from datetime import datetime -from hashlib import md5 -from pathlib import Path +from bub.channels.cli import CliChannel -from prompt_toolkit import PromptSession -from prompt_toolkit.completion import WordCompleter -from prompt_toolkit.formatted_text import FormattedText -from prompt_toolkit.history import FileHistory -from prompt_toolkit.key_binding import KeyBindings -from prompt_toolkit.patch_stdout import patch_stdout -from rich import get_console -from bub.app.runtime import AppRuntime -from bub.cli.render import CliRenderer - - -class InteractiveCli: - """Single interactive CLI mode inspired by modern coding agent shells.""" - - def __init__(self, runtime: AppRuntime, *, session_id: str = "cli") -> None: - self._runtime = runtime - self._session_id = session_id - self._session = runtime.get_session(session_id) - self._renderer = CliRenderer(get_console()) - self._mode = "agent" - self._last_tape_info: object | None = None - self._prompt = self._build_prompt() +class InteractiveCli(CliChannel): + """Compatibility wrapper that runs the CLI channel directly.""" async def run(self) -> None: - async with self._runtime.graceful_shutdown(): - return await self._run() - - async def _run(self) -> None: - self._renderer.welcome(model=self._runtime.settings.model, workspace=str(self._runtime.workspace)) - await self._refresh_tape_info() - - while True: - try: - with patch_stdout(raw=True): - raw = (await self._prompt.prompt_async(self._prompt_message())).strip() - except KeyboardInterrupt: - self._renderer.info("Interrupted. Use ',quit' to exit.") - continue - except EOFError: - break - - if not raw: - continue - - request = self._normalize_input(raw) - with self._renderer.console.status("[cyan]Processing...[/cyan]", spinner="dots"): - result = await self._runtime.handle_input(self._session_id, request) - await self._refresh_tape_info() - if result.immediate_output: - self._renderer.command_output(result.immediate_output) - if result.error: - self._renderer.error(result.error) - if result.assistant_output: - self._renderer.assistant_output(result.assistant_output) - if result.exit_requested: - break - self._renderer.info("Bye.") - - async def _refresh_tape_info(self) -> None: - try: - self._last_tape_info = await self._session.tape.info() - except Exception: - self._last_tape_info = None - - def _build_prompt(self) -> PromptSession[str]: - kb = KeyBindings() - - @kb.add("c-x", eager=True) - def _toggle_mode(event) -> None: - self._mode = "shell" if self._mode == "agent" else "agent" - event.app.invalidate() - - def _tool_sort_key(tool_name: str) -> tuple[str, str]: - section, _, name = tool_name.rpartition(".") - return (section, name) - - history_file = self._history_file(self._runtime.settings.resolve_home(), self._runtime.workspace) - history_file.parent.mkdir(parents=True, exist_ok=True) - history = FileHistory(str(history_file)) - tool_names = sorted((f",{tool}" for tool in self._session.tool_view.all_tools()), key=_tool_sort_key) - completer = WordCompleter(tool_names, ignore_case=True) - return PromptSession( - completer=completer, - complete_while_typing=True, - key_bindings=kb, - history=history, - bottom_toolbar=self._render_bottom_toolbar, - ) - - def _prompt_message(self) -> FormattedText: - cwd = Path.cwd().name - symbol = ">" if self._mode == "agent" else "," - return FormattedText([("bold", f"{cwd} {symbol} ")]) - - def _render_bottom_toolbar(self) -> FormattedText: - info = self._last_tape_info - now = datetime.now().strftime("%H:%M") - left = f"{now} mode:{self._mode}" - right = ( - f"model:{self._runtime.settings.model} " - f"entries:{getattr(info, 'entries', '-')} " - f"anchors:{getattr(info, 'anchors', '-')} " - f"last:{getattr(info, 'last_anchor', None) or '-'}" - ) - return FormattedText([("", f"{left} {right}")]) - - def _normalize_input(self, raw: str) -> str: - if self._mode != "shell": - return raw - if raw.startswith(","): - return raw - return f", {raw}" + async with self.runtime.graceful_shutdown(): + await self.start(self._handle_local_input) - @staticmethod - def _history_file(home: Path, workspace: Path) -> Path: - workspace_hash = md5(str(workspace).encode("utf-8")).hexdigest() # noqa: S324 - return home / "history" / f"{workspace_hash}.history" + async def _handle_local_input(self, message: str) -> None: + session_id, prompt = await self.get_session_prompt(message) + result = await self.run_prompt(session_id, prompt) + await self.process_output(session_id, result) diff --git a/tests/test_channels.py b/tests/test_channels.py index a62dcc6..10d46e0 100644 --- a/tests/test_channels.py +++ b/tests/test_channels.py @@ -48,6 +48,14 @@ async def process_output(self, session_id: str, output) -> None: _ = (session_id, output) +def test_channel_manager_rejects_duplicate_channel_name() -> None: + manager = ChannelManager(_Runtime()) # type: ignore[arg-type] + manager.register(_FakeChannel) + + with pytest.raises(ValueError, match="already registered"): + manager.register(_FakeChannel) + + @pytest.mark.asyncio async def test_channel_manager_starts_and_stops_registered_channels() -> None: manager = ChannelManager(_Runtime()) # type: ignore[arg-type] diff --git a/tests/test_cli_app.py b/tests/test_cli_app.py index 68141a7..cf17498 100644 --- a/tests/test_cli_app.py +++ b/tests/test_cli_app.py @@ -18,10 +18,15 @@ def __init__(self, workspace: Path) -> None: class _Settings: model = "openrouter:test" telegram_enabled = False + discord_enabled = False telegram_token = None telegram_allow_from = () telegram_allow_chats = () + @staticmethod + def resolve_home() -> Path: + return Path.cwd() + self.settings = _Settings() self.registry = type("_Registry", (), {"descriptors": staticmethod(lambda: [])})() @@ -48,9 +53,13 @@ class _Info: class _Session: tape = _Tape() + tool_view = type("_ToolView", (), {"all_tools": staticmethod(lambda: [])})() return _Session() + def install_hooks(self, _manager) -> None: + return None + def handle_input(self, _session_id: str, _text: str): raise AssertionError @@ -60,28 +69,29 @@ async def graceful_shutdown(self): yield stop_event -def test_chat_command_invokes_interactive_runner(monkeypatch, tmp_path: Path) -> None: - called = {"run": False} +def test_chat_command_registers_cli_channel(monkeypatch, tmp_path: Path) -> None: + called: dict[str, object] = {} def _fake_build_runtime(workspace: Path, *, model=None, max_tokens=None, enable_scheduler=True): assert workspace == tmp_path assert enable_scheduler is True return DummyRuntime(workspace) - class _FakeInteractive: - def __init__(self, _runtime, session_id: str = "cli"): - assert session_id == "cli" - - async def run(self) -> None: - called["run"] = True + async def _fake_serve_channels(manager) -> None: + called["channels"] = manager.enabled_channels() + called["channel_type"] = type(manager.channels["cli"]).__name__ monkeypatch.setattr(cli_app_module, "build_runtime", _fake_build_runtime) - monkeypatch.setattr(cli_app_module, "InteractiveCli", _FakeInteractive) + monkeypatch.setattr(cli_app_module, "_serve_channels", _fake_serve_channels) runner = CliRunner() - result = runner.invoke(cli_app_module.app, ["chat", "--workspace", str(tmp_path)]) + result = runner.invoke( + cli_app_module.app, + ["chat", "--workspace", str(tmp_path), "--session-id", "cli:test"], + ) assert result.exit_code == 0 - assert called["run"] is True + assert called["channels"] == ["cli"] + assert called["channel_type"] == "CliChannel" def test_run_command_expands_home_in_workspace(monkeypatch, tmp_path: Path) -> None: diff --git a/tests/test_session_runner.py b/tests/test_session_runner.py new file mode 100644 index 0000000..7e84df3 --- /dev/null +++ b/tests/test_session_runner.py @@ -0,0 +1,137 @@ +from __future__ import annotations + +from collections.abc import Awaitable, Callable + +import pytest + +from bub.channels.base import BaseChannel +from bub.channels.runner import SessionRunner +from bub.core.agent_loop import LoopResult + + +class _Runtime: + pass + + +class _ImmediateChannel(BaseChannel[str]): + name = "cli" + + def __init__(self) -> None: + super().__init__(_Runtime()) # type: ignore[arg-type] + self.run_prompts: list[str] = [] + self.processed = 0 + + @property + def debounce_enabled(self) -> bool: + return False + + async def start(self, on_receive: Callable[[str], Awaitable[None]]) -> None: + _ = on_receive + + def is_mentioned(self, message: str) -> bool: + _ = message + return True + + async def get_session_prompt(self, message: str) -> tuple[str, str]: + return "cli:test", message + + async def run_prompt(self, session_id: str, prompt: str) -> LoopResult: + _ = session_id + self.run_prompts.append(prompt) + return LoopResult( + immediate_output="", + assistant_output="", + exit_requested=False, + steps=0, + error=None, + ) + + async def process_output(self, session_id: str, output: LoopResult) -> None: + _ = (session_id, output) + self.processed += 1 + + +class _DebouncedChannel(BaseChannel[str]): + name = "telegram" + + def __init__(self) -> None: + super().__init__(_Runtime()) # type: ignore[arg-type] + self.run_prompts: list[str] = [] + + async def start(self, on_receive: Callable[[str], Awaitable[None]]) -> None: + _ = on_receive + + def is_mentioned(self, message: str) -> bool: + _ = message + return True + + async def get_session_prompt(self, message: str) -> tuple[str, str]: + return "telegram:1", message + + async def run_prompt(self, session_id: str, prompt: str) -> LoopResult: + _ = session_id + self.run_prompts.append(prompt) + return LoopResult( + immediate_output="", + assistant_output="", + exit_requested=False, + steps=0, + error=None, + ) + + async def process_output(self, session_id: str, output: LoopResult) -> None: + _ = (session_id, output) + + +class _ImmediateFailingChannel(_ImmediateChannel): + async def run_prompt(self, session_id: str, prompt: str) -> LoopResult: + _ = (session_id, prompt) + raise RuntimeError("cli failure") + + +@pytest.mark.asyncio +async def test_session_runner_runs_non_debounced_channel_immediately() -> None: + runner = SessionRunner( + session_id="cli:test", + debounce_seconds=10, + message_delay_seconds=10, + active_time_window_seconds=60, + ) + channel = _ImmediateChannel() + + await runner.process_message(channel, "first") + await runner.process_message(channel, "second") + + assert channel.run_prompts == ["first", "second"] + assert channel.processed == 2 + + +@pytest.mark.asyncio +async def test_session_runner_does_not_leak_command_into_batched_prompt() -> None: + runner = SessionRunner( + session_id="telegram:1", + debounce_seconds=0, + message_delay_seconds=0, + active_time_window_seconds=60, + ) + channel = _DebouncedChannel() + + await runner.process_message(channel, ",help") + await runner.process_message(channel, "hello") + + assert channel.run_prompts[0] == ",help" + assert channel.run_prompts[1] == "channel: $telegram\nhello" + + +@pytest.mark.asyncio +async def test_session_runner_raises_for_non_debounced_channel_errors() -> None: + runner = SessionRunner( + session_id="cli:test", + debounce_seconds=10, + message_delay_seconds=10, + active_time_window_seconds=60, + ) + channel = _ImmediateFailingChannel() + + with pytest.raises(RuntimeError, match="cli failure"): + await runner.process_message(channel, "hello")