Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions docs/channels.md
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
# Channels

Bub supports running the same agent loop through message channels.
Use channels when you want remote operation from mobile or shared team environments.
Bub supports running the same agent loop through channel adapters.
Use channels when you want either local interactive operation or remote operation from mobile/shared team environments.

## Supported Channels

- `cli` (local): interactive terminal channel used by `uv run bub chat`.
- [Telegram](telegram.md): direct messages and group chats.
- [Discord](discord.md): servers, channels, and threads.

Expand All @@ -20,6 +21,7 @@ If the process exits immediately, check that at least one channel is enabled in

## Session Isolation

- CLI session key: `cli` or `cli:<name>` (from `--session-id`).
- Telegram session key: `telegram:<chat_id>`
- Discord session key: `discord:<channel_id>`

Expand Down
2 changes: 2 additions & 0 deletions docs/cli.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
# Interactive CLI

`uv run bub chat` runs the local `cli` channel adapter (same channel pipeline as other channels, but local-only).

## Run Commands

```bash
Expand Down
2 changes: 1 addition & 1 deletion docs/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,5 +35,5 @@ Its operating philosophy follows [Socialized Evaluation](https://psiace.me/posts
- [Interactive CLI](cli.md): interactive workflow and troubleshooting.
- [Architecture](architecture.md): runtime boundaries and internals.
- [Deployment Guide](deployment.md): local and Docker operations.
- [Channels](channels.md): Telegram and Discord runtime model.
- [Channels](channels.md): CLI/Telegram/Discord runtime model.
- [Post: Socialized Evaluation and Agent Partnership](posts/2026-03-01-bub-socialized-evaluation-and-agent-partnership.md): project position and principles.
2 changes: 1 addition & 1 deletion mkdocs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@ nav:
- "2025-07-16 · Baby Bub Bootstrap Milestone": posts/2025-07-16-baby-bub-bootstrap-milestone.md
- Deployment: deployment.md
- Architecture: architecture.md
- Interactive CLI: cli.md
- Channels:
- Overview: channels.md
- CLI (Local): cli.md
- Telegram: telegram.md
- Discord: discord.md

Expand Down
2 changes: 2 additions & 0 deletions src/bub/channels/__init__.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
"""Channel adapters and bus exports."""

from bub.channels.base import BaseChannel
from bub.channels.cli import CliChannel
from bub.channels.discord import DiscordChannel, DiscordConfig
from bub.channels.manager import ChannelManager
from bub.channels.telegram import TelegramChannel, TelegramConfig

__all__ = [
"BaseChannel",
"ChannelManager",
"CliChannel",
"DiscordChannel",
"DiscordConfig",
"TelegramChannel",
Expand Down
9 changes: 9 additions & 0 deletions src/bub/channels/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,11 @@ def output_channel(self) -> str:
"""The name of the channel to send outputs to. Defaults to the same channel."""
return self.name

@property
def debounce_enabled(self) -> bool:
"""Whether inbound messages should be debounced before model execution."""
return True

@abstractmethod
def is_mentioned(self, message: T) -> bool:
"""Determine if the message is relevant to this channel."""
Expand All @@ -46,6 +51,10 @@ async def run_prompt(self, session_id: str, prompt: str) -> LoopResult:
"""Run the given prompt through the runtime and return the result."""
return await self.runtime.handle_input(session_id, prompt)

def format_prompt(self, prompt: str) -> str:
"""Format accumulated prompt text before sending it to the runtime."""
return f"channel: ${self.output_channel}\n{prompt}"

@abstractmethod
async def process_output(self, session_id: str, output: LoopResult) -> None:
"""Process the output returned by the LLM."""
Expand Down
148 changes: 148 additions & 0 deletions src/bub/channels/cli.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
"""CLI channel adapter."""

from __future__ import annotations

from collections.abc import Awaitable, Callable
from datetime import datetime
from hashlib import md5
from pathlib import Path

from loguru import logger
from prompt_toolkit import PromptSession
from prompt_toolkit.completion import WordCompleter
from prompt_toolkit.formatted_text import FormattedText
from prompt_toolkit.history import FileHistory
from prompt_toolkit.key_binding import KeyBindings
from prompt_toolkit.patch_stdout import patch_stdout
from rich import get_console

from bub.app.runtime import AppRuntime
from bub.channels.base import BaseChannel
from bub.cli.render import CliRenderer
from bub.core.agent_loop import LoopResult


class CliChannel(BaseChannel[str]):
"""Interactive terminal channel."""

name = "cli"

def __init__(self, runtime: AppRuntime, *, session_id: str = "cli") -> None:
super().__init__(runtime)
self._session_id = session_id
self._session = runtime.get_session(session_id)
self._renderer = CliRenderer(get_console())
self._mode = "agent"
self._last_tape_info: object | None = None
self._prompt = self._build_prompt()
self._stop_requested = False

@property
def debounce_enabled(self) -> bool:
return False

async def start(self, on_receive: Callable[[str], Awaitable[None]]) -> None:
self._renderer.welcome(model=self.runtime.settings.model, workspace=str(self.runtime.workspace))
await self._refresh_tape_info()

while not self._stop_requested:
try:
with patch_stdout(raw=True):
raw = (await self._prompt.prompt_async(self._prompt_message())).strip()
except KeyboardInterrupt:
self._renderer.info("Interrupted. Use ',quit' to exit.")
continue
except EOFError:
break

if not raw:
continue

request = self._normalize_input(raw)
with self._renderer.console.status("[cyan]Processing...[/cyan]", spinner="dots"):
await on_receive(request)

self._renderer.info("Bye.")

def is_mentioned(self, message: str) -> bool:
_ = message
return True

async def get_session_prompt(self, message: str) -> tuple[str, str]:
return self._session_id, message

def format_prompt(self, prompt: str) -> str:
return prompt

async def process_output(self, session_id: str, output: LoopResult) -> None:
_ = session_id
await self._refresh_tape_info()
if output.immediate_output:
self._renderer.command_output(output.immediate_output)
if output.error:
self._renderer.error(output.error)
if output.assistant_output:
self._renderer.assistant_output(output.assistant_output)
if output.exit_requested:
self._stop_requested = True

async def _refresh_tape_info(self) -> None:
try:
self._last_tape_info = await self._session.tape.info()
except Exception as exc:
self._last_tape_info = None
logger.debug("cli.tape_info.unavailable session_id={} error={}", self._session_id, exc)

def _build_prompt(self) -> PromptSession[str]:
kb = KeyBindings()

@kb.add("c-x", eager=True)
def _toggle_mode(event) -> None:
self._mode = "shell" if self._mode == "agent" else "agent"
event.app.invalidate()

def _tool_sort_key(tool_name: str) -> tuple[str, str]:
section, _, name = tool_name.rpartition(".")
return (section, name)

history_file = self._history_file(self.runtime.settings.resolve_home(), self.runtime.workspace)
history_file.parent.mkdir(parents=True, exist_ok=True)
history = FileHistory(str(history_file))
tool_names = sorted((f",{tool}" for tool in self._session.tool_view.all_tools()), key=_tool_sort_key)
completer = WordCompleter(tool_names, ignore_case=True)
return PromptSession(
completer=completer,
complete_while_typing=True,
key_bindings=kb,
history=history,
bottom_toolbar=self._render_bottom_toolbar,
)

def _prompt_message(self) -> FormattedText:
cwd = Path.cwd().name
symbol = ">" if self._mode == "agent" else ","
return FormattedText([("bold", f"{cwd} {symbol} ")])

def _render_bottom_toolbar(self) -> FormattedText:
info = self._last_tape_info
now = datetime.now().strftime("%H:%M")
left = f"{now} mode:{self._mode}"
right = (
f"model:{self.runtime.settings.model} "
f"entries:{getattr(info, 'entries', '-')} "
f"anchors:{getattr(info, 'anchors', '-')} "
f"last:{getattr(info, 'last_anchor', None) or '-'}"
)
return FormattedText([("", f"{left} {right}")])

def _normalize_input(self, raw: str) -> str:
if self._mode != "shell":
return raw
if raw.startswith(","):
return raw
return f", {raw}"

@staticmethod
def _history_file(home: Path, workspace: Path) -> Path:
workspace_hash = md5(str(workspace).encode("utf-8")).hexdigest() # noqa: S324
return home / "history" / f"{workspace_hash}.history"
15 changes: 11 additions & 4 deletions src/bub/channels/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,24 @@
class ChannelManager:
"""Coordinate inbound routing and outbound dispatch for channels."""

def __init__(self, runtime: AppRuntime) -> None:
def __init__(self, runtime: AppRuntime, *, include_defaults: bool = True) -> None:
self.runtime = runtime
self._channels: dict[str, BaseChannel] = {}
self._channel_tasks: list[asyncio.Task[None]] = []
self._session_runners: dict[str, SessionRunner] = {}
for channel_cls in self.default_channels():
self.register(channel_cls)
if include_defaults:
for channel_cls in self.default_channels():
self.register(channel_cls)
runtime.install_hooks(self)

def register[T: type[BaseChannel]](self, channel: T) -> T:
self._channels[channel.name] = channel(self.runtime)
self.register_instance(channel(self.runtime))
return channel

def register_instance[T: BaseChannel](self, channel: T) -> T:
if channel.name in self._channels:
raise ValueError(f"channel '{channel.name}' already registered")
self._channels[channel.name] = channel
return channel

@property
Expand Down
19 changes: 15 additions & 4 deletions src/bub/channels/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,15 @@ def __init__(

async def _run(self, channel: BaseChannel) -> None:
await self._event.wait()
prompt = f"channel: ${channel.output_channel}\n" + "\n".join(self._prompts)
prompt = channel.format_prompt("\n".join(self._prompts))
self._prompts.clear()
self._running_task = None
try:
result = await channel.run_prompt(self.session_id, prompt)
await channel.process_output(self.session_id, result)
except Exception:
if not channel.debounce_enabled:
raise
logger.exception("session.run.error session_id={}", self.session_id)

def reset_timer(self, timeout: int) -> None:
Expand All @@ -48,16 +50,25 @@ async def process_message(self, channel: BaseChannel, message: Any) -> None:
self._last_mentioned_at = None
logger.info("session.receive ignored session_id={} message={}", self.session_id, prompt)
return
self._prompts.append(prompt)
if prompt.startswith(","):
logger.info("session.receive.command session_id={} message={}", self.session_id, prompt)
try:
result = await channel.run_prompt(self.session_id, prompt)
await channel.process_output(self.session_id, result)
except Exception:
if not channel.debounce_enabled:
raise
logger.exception("session.run.error session_id={}", self.session_id)
elif is_mentioned:
# wait at most 1 second to reply to mentioned messages.
return
elif not channel.debounce_enabled:
logger.info("session.receive.immediate session_id={} message={}", self.session_id, prompt)
result = await channel.run_prompt(self.session_id, prompt)
await channel.process_output(self.session_id, result)
return

self._prompts.append(prompt)
if is_mentioned:
# Debounce mentioned messages before responding.
self._last_mentioned_at = now
logger.info("session.receive.mentioned session_id={} message={}", self.session_id, prompt)
self.reset_timer(self.debounce_seconds)
Expand Down
12 changes: 5 additions & 7 deletions src/bub/cli/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,10 @@

from bub.app import build_runtime
from bub.app.runtime import AppRuntime
from bub.channels import ChannelManager
from bub.cli.interactive import InteractiveCli
from bub.channels import ChannelManager, CliChannel
from bub.logging_utils import configure_logging

app = typer.Typer(name="bub", help="Tape-first coding agent CLI", add_completion=False)
TELEGRAM_DISABLED_ERROR = "telegram is disabled; set BUB_TELEGRAM_ENABLED=true"
TELEGRAM_TOKEN_ERROR = "missing telegram token; set BUB_TELEGRAM_TOKEN" # noqa: S105


def _parse_subset(values: list[str] | None) -> set[str] | None:
Expand Down Expand Up @@ -62,8 +59,9 @@ def chat(
with build_runtime(
resolved_workspace, model=model, max_tokens=max_tokens, enable_scheduler=not disable_scheduler
) as runtime:
cli = InteractiveCli(runtime, session_id=session_id)
asyncio.run(cli.run())
manager = ChannelManager(runtime, include_defaults=False)
manager.register_instance(CliChannel(runtime, session_id=session_id))
asyncio.run(_serve_channels(manager))


@app.command()
Expand Down Expand Up @@ -169,7 +167,7 @@ def message(
configure_logging()
resolved_workspace = (workspace.expanduser() if workspace else Path.cwd()).resolve()
logger.info(
"telegram.start workspace={} model={} max_tokens={}, proactive_response={}",
"message.start workspace={} model={} max_tokens={}, proactive_response={}",
str(resolved_workspace),
model or "<default>",
max_tokens if max_tokens is not None else "<default>",
Expand Down
Loading
Loading