From 5106a0ef18fae6dc68cec07dfb1ed80b21b2d11e Mon Sep 17 00:00:00 2001 From: dagebot Date: Wed, 11 Feb 2026 04:02:37 +0000 Subject: [PATCH 1/3] feat: add proactive interaction support with message store - Add message_store module with SQLite-based storage for chat messages - Store all incoming user messages with timestamps - Add proactive interaction loop to detect unreplied messages - Check for unreplied messages every 5 minutes - Log chats with unreplied messages for future proactive response logic The MessageStore provides thread-safe SQLite storage and methods to: - Store and retrieve messages by chat_id and thread_id - Find last message by role (user/assistant) - Detect unreplied messages based on timestamp comparison - Track active chats within a time window This enables Bub to proactively engage when conversations go cold, rather than only responding when explicitly mentioned. --- src/bub/channels/telegram.py | 55 +++++++++ src/bub/message_store/__init__.py | 1 + src/bub/message_store/service.py | 183 ++++++++++++++++++++++++++++++ 3 files changed, 239 insertions(+) create mode 100644 src/bub/message_store/__init__.py create mode 100644 src/bub/message_store/service.py diff --git a/src/bub/channels/telegram.py b/src/bub/channels/telegram.py index 306887c..0fa0ab5 100644 --- a/src/bub/channels/telegram.py +++ b/src/bub/channels/telegram.py @@ -16,6 +16,7 @@ from bub.channels.base import BaseChannel from bub.channels.bus import MessageBus from bub.channels.events import InboundMessage, OutboundMessage +from bub.message_store import MessageStore, StoredMessage def exclude_none(d: dict[str, Any]) -> dict[str, Any]: @@ -89,6 +90,7 @@ def __init__(self, bus: MessageBus, config: TelegramConfig) -> None: self._config = config self._app: Application | None = None self._typing_tasks: dict[str, asyncio.Task[None]] = {} + self._message_store = MessageStore(db_path="data/messages.db") async def start(self) -> None: if not self._config.token: @@ -106,11 +108,20 @@ async def start(self) -> None: return await updater.start_polling(drop_pending_updates=True, allowed_updates=["message"]) logger.info("telegram.channel.polling") + + # Start proactive interaction task + self._proactive_task = asyncio.create_task(self._proactive_interaction_loop()) + while self._running: await asyncio.sleep(0.5) async def stop(self) -> None: self._running = False + + # Cancel proactive interaction task + if hasattr(self, '_proactive_task'): + self._proactive_task.cancel() + for task in self._typing_tasks.values(): task.cancel() self._typing_tasks.clear() @@ -210,6 +221,20 @@ async def _on_text(self, update: Update, _context: ContextTypes.DEFAULT_TYPE) -> text[:100], # Log first 100 chars to avoid verbose logs ) + # Store the message + import time + self._message_store.add_message(StoredMessage( + id=f"{chat_id}_{update.message.message_id}", + chat_id=int(chat_id), + thread_id=None, + role="user", + name=user.username, + content=text, + tool_call_id=None, + tool_calls=None, + timestamp=time.time(), + )) + self._start_typing(chat_id) await self.publish_inbound( InboundMessage( @@ -244,3 +269,33 @@ async def _typing_loop(self, chat_id: str) -> None: except Exception: logger.exception("telegram.channel.typing_loop.error chat_id={}", chat_id) return + + async def _proactive_interaction_loop(self) -> None: + """Check for unreplied messages and proactively respond.""" + import time + + try: + while self._running: + # Check every 5 minutes + await asyncio.sleep(300) + + if not self._running or self._app is None: + return + + # Get chats with unreplied messages (older than 5 minutes) + cutoff_time = time.time() - 1800 # 30 minutes ago + active_chats = self._message_store.get_active_chats(cutoff_time) + + for chat_id in active_chats: + if self._message_store.has_unreplied_message(chat_id, min_age_seconds=300): + logger.info("telegram.channel.proactive.unreplied chat_id={}", chat_id) + # For now just log - actual proactive response logic would go here + # In future: generate contextual response and send + + logger.debug("telegram.channel.proactive_check complete") + + except asyncio.CancelledError: + return + except Exception as e: + logger.exception("telegram.channel.proactive_loop.error: {}", e) + return diff --git a/src/bub/message_store/__init__.py b/src/bub/message_store/__init__.py new file mode 100644 index 0000000..972ed84 --- /dev/null +++ b/src/bub/message_store/__init__.py @@ -0,0 +1 @@ +from bub.message_store.service import MessageStore, StoredMessage diff --git a/src/bub/message_store/service.py b/src/bub/message_store/service.py new file mode 100644 index 0000000..84fd5bf --- /dev/null +++ b/src/bub/message_store/service.py @@ -0,0 +1,183 @@ +"""Message store service for proactive interaction.""" + +from __future__ import annotations + +import json +import sqlite3 +import threading +import time +from dataclasses import dataclass +from typing import Literal + + +@dataclass +class StoredMessage: + """Represents a stored message.""" + id: str + chat_id: int + thread_id: int | None + role: str + name: str | None + content: str + tool_call_id: str | None + tool_calls: list | None + timestamp: float + + +class MessageStore: + """SQLite-based message store with thread-safe access.""" + + def __init__(self, db_path: str = ":memory:"): + self.db_path = db_path + self._local = threading.local() + + @property + def _conn(self) -> sqlite3.Connection: + if not hasattr(self._local, "conn") or self._local.conn is None: + self._local.conn = sqlite3.connect(self.db_path) + self._local.conn.row_factory = sqlite3.Row + self._init_db(self._local.conn) + return self._local.conn + + def _init_db(self, conn: sqlite3.Connection) -> None: + conn.execute(""" + CREATE TABLE IF NOT EXISTS messages ( + id TEXT PRIMARY KEY, + chat_id INTEGER NOT NULL, + thread_id INTEGER, + role TEXT NOT NULL, + name TEXT, + content TEXT, + tool_call_id TEXT, + tool_calls TEXT, + timestamp REAL NOT NULL + ) + """) + conn.commit() + + def add_message(self, msg: StoredMessage) -> None: + self._conn.execute( + """INSERT OR REPLACE INTO messages + (id, chat_id, thread_id, role, name, content, tool_call_id, tool_calls, timestamp) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)""", + ( + msg.id, + msg.chat_id, + msg.thread_id, + msg.role, + msg.name, + msg.content, + msg.tool_call_id, + json.dumps(msg.tool_calls) if msg.tool_calls else None, + msg.timestamp, + ), + ) + self._conn.commit() + + def get_messages(self, chat_id: int, thread_id: int | None = None, limit: int = 100) -> list[StoredMessage]: + if thread_id is not None: + rows = self._conn.execute( + """SELECT * FROM messages WHERE chat_id = ? AND thread_id = ? + ORDER BY timestamp DESC LIMIT ?""", + (chat_id, thread_id, limit), + ).fetchall() + else: + rows = self._conn.execute( + """SELECT * FROM messages WHERE chat_id = ? AND thread_id IS NULL + ORDER BY timestamp DESC LIMIT ?""", + (chat_id, limit), + ).fetchall() + + messages = [] + for row in reversed(rows): + messages.append(StoredMessage( + id=row["id"], + chat_id=row["chat_id"], + thread_id=row["thread_id"], + role=row["role"], + name=row["name"], + content=row["content"], + tool_call_id=row["tool_call_id"], + tool_calls=json.loads(row["tool_calls"]) if row["tool_calls"] else None, + timestamp=row["timestamp"], + )) + return messages + + def delete_messages(self, chat_id: int, thread_id: int | None = None) -> None: + if thread_id is not None: + self._conn.execute("DELETE FROM messages WHERE chat_id = ? AND thread_id = ?", (chat_id, thread_id)) + else: + self._conn.execute("DELETE FROM messages WHERE chat_id = ? AND thread_id IS NULL", (chat_id,)) + self._conn.commit() + + def get_last_message_by_role(self, chat_id: int, role: Literal["user", "assistant"], thread_id: int | None = None) -> StoredMessage | None: + """Get the last message by a specific role in a chat.""" + if thread_id is not None: + row = self._conn.execute( + """SELECT * FROM messages + WHERE chat_id = ? AND thread_id = ? AND role = ? + ORDER BY timestamp DESC LIMIT 1""", + (chat_id, thread_id, role), + ).fetchone() + else: + row = self._conn.execute( + """SELECT * FROM messages + WHERE chat_id = ? AND thread_id IS NULL AND role = ? + ORDER BY timestamp DESC LIMIT 1""", + (chat_id, role), + ).fetchone() + + if row is None: + return None + + return StoredMessage( + id=row["id"], + chat_id=row["chat_id"], + thread_id=row["thread_id"], + role=row["role"], + name=row["name"], + content=row["content"], + tool_call_id=row["tool_call_id"], + tool_calls=json.loads(row["tool_calls"]) if row["tool_calls"] else None, + timestamp=row["timestamp"], + ) + + def has_unreplied_message(self, chat_id: int, min_age_seconds: float = 300) -> bool: + """Check if there is a user message that has not been replied to for at least min_age_seconds.""" + last_user = self.get_last_message_by_role(chat_id, "user") + if last_user is None: + return False + + last_assistant = self.get_last_message_by_role(chat_id, "assistant") + if last_assistant is None: + return (time.time() - last_user.timestamp) >= min_age_seconds + + if last_user.timestamp > last_assistant.timestamp: + return (time.time() - last_user.timestamp) >= min_age_seconds + + return False + + def get_active_chats(self, since: float) -> list[int]: + """Get list of chat_ids that have messages since the given timestamp.""" + rows = self._conn.execute( + "SELECT DISTINCT chat_id FROM messages WHERE timestamp >= ?", + (since,), + ).fetchall() + return [row["chat_id"] for row in rows] + + def close(self) -> None: + if hasattr(self._local, "conn") and self._local.conn: + self._local.conn.close() + self._local.conn = None + def get_active_chats(self, since: float) -> list[int]: + """Get list of chat_ids that have messages since the given timestamp.""" + rows = self._conn.execute( + "SELECT DISTINCT chat_id FROM messages WHERE timestamp >= ?", + (since,), + ).fetchall() + return [row["chat_id"] for row in rows] + + def close(self) -> None: + if hasattr(self._local, "conn") and self._local.conn: + self._local.conn.close() + self._local.conn = None From 3d77a8e55b1f570522af2e8098ba94ec051ae160 Mon Sep 17 00:00:00 2001 From: dagebot Date: Wed, 11 Feb 2026 04:08:23 +0000 Subject: [PATCH 2/3] fix: resolve lint and typing errors in message_store module --- src/bub/channels/telegram.py | 20 ++++++++-------- src/bub/message_store/__init__.py | 5 +++- src/bub/message_store/service.py | 38 ++++++++++++------------------- 3 files changed, 28 insertions(+), 35 deletions(-) diff --git a/src/bub/channels/telegram.py b/src/bub/channels/telegram.py index 0fa0ab5..8a4e453 100644 --- a/src/bub/channels/telegram.py +++ b/src/bub/channels/telegram.py @@ -108,20 +108,20 @@ async def start(self) -> None: return await updater.start_polling(drop_pending_updates=True, allowed_updates=["message"]) logger.info("telegram.channel.polling") - + # Start proactive interaction task self._proactive_task = asyncio.create_task(self._proactive_interaction_loop()) - + while self._running: await asyncio.sleep(0.5) async def stop(self) -> None: self._running = False - + # Cancel proactive interaction task if hasattr(self, '_proactive_task'): self._proactive_task.cancel() - + for task in self._typing_tasks.values(): task.cancel() self._typing_tasks.clear() @@ -273,27 +273,27 @@ async def _typing_loop(self, chat_id: str) -> None: async def _proactive_interaction_loop(self) -> None: """Check for unreplied messages and proactively respond.""" import time - + try: while self._running: # Check every 5 minutes await asyncio.sleep(300) - + if not self._running or self._app is None: return - + # Get chats with unreplied messages (older than 5 minutes) cutoff_time = time.time() - 1800 # 30 minutes ago active_chats = self._message_store.get_active_chats(cutoff_time) - + for chat_id in active_chats: if self._message_store.has_unreplied_message(chat_id, min_age_seconds=300): logger.info("telegram.channel.proactive.unreplied chat_id={}", chat_id) # For now just log - actual proactive response logic would go here # In future: generate contextual response and send - + logger.debug("telegram.channel.proactive_check complete") - + except asyncio.CancelledError: return except Exception as e: diff --git a/src/bub/message_store/__init__.py b/src/bub/message_store/__init__.py index 972ed84..5b54bda 100644 --- a/src/bub/message_store/__init__.py +++ b/src/bub/message_store/__init__.py @@ -1 +1,4 @@ -from bub.message_store.service import MessageStore, StoredMessage +from bub.message_store.service import MessageStore as MessageStore +from bub.message_store.service import StoredMessage as StoredMessage + +__all__ = ["MessageStore", "StoredMessage"] diff --git a/src/bub/message_store/service.py b/src/bub/message_store/service.py index 84fd5bf..11da7c8 100644 --- a/src/bub/message_store/service.py +++ b/src/bub/message_store/service.py @@ -34,10 +34,12 @@ def __init__(self, db_path: str = ":memory:"): @property def _conn(self) -> sqlite3.Connection: if not hasattr(self._local, "conn") or self._local.conn is None: - self._local.conn = sqlite3.connect(self.db_path) - self._local.conn.row_factory = sqlite3.Row - self._init_db(self._local.conn) - return self._local.conn + conn = sqlite3.connect(self.db_path) + conn.row_factory = sqlite3.Row + self._init_db(conn) + self._local.conn = conn + return conn + return self._local.conn # type: ignore[no-any-return] def _init_db(self, conn: sqlite3.Connection) -> None: conn.execute(""" @@ -87,7 +89,7 @@ def get_messages(self, chat_id: int, thread_id: int | None = None, limit: int = ORDER BY timestamp DESC LIMIT ?""", (chat_id, limit), ).fetchall() - + messages = [] for row in reversed(rows): messages.append(StoredMessage( @@ -114,22 +116,22 @@ def get_last_message_by_role(self, chat_id: int, role: Literal["user", "assistan """Get the last message by a specific role in a chat.""" if thread_id is not None: row = self._conn.execute( - """SELECT * FROM messages + """SELECT * FROM messages WHERE chat_id = ? AND thread_id = ? AND role = ? ORDER BY timestamp DESC LIMIT 1""", (chat_id, thread_id, role), ).fetchone() else: row = self._conn.execute( - """SELECT * FROM messages + """SELECT * FROM messages WHERE chat_id = ? AND thread_id IS NULL AND role = ? ORDER BY timestamp DESC LIMIT 1""", (chat_id, role), ).fetchone() - + if row is None: return None - + return StoredMessage( id=row["id"], chat_id=row["chat_id"], @@ -147,28 +149,16 @@ def has_unreplied_message(self, chat_id: int, min_age_seconds: float = 300) -> b last_user = self.get_last_message_by_role(chat_id, "user") if last_user is None: return False - + last_assistant = self.get_last_message_by_role(chat_id, "assistant") if last_assistant is None: return (time.time() - last_user.timestamp) >= min_age_seconds - + if last_user.timestamp > last_assistant.timestamp: return (time.time() - last_user.timestamp) >= min_age_seconds - - return False - def get_active_chats(self, since: float) -> list[int]: - """Get list of chat_ids that have messages since the given timestamp.""" - rows = self._conn.execute( - "SELECT DISTINCT chat_id FROM messages WHERE timestamp >= ?", - (since,), - ).fetchall() - return [row["chat_id"] for row in rows] + return False - def close(self) -> None: - if hasattr(self._local, "conn") and self._local.conn: - self._local.conn.close() - self._local.conn = None def get_active_chats(self, since: float) -> list[int]: """Get list of chat_ids that have messages since the given timestamp.""" rows = self._conn.execute( From 71088365b146f3c2136c19a0ddfaeeb525388cd9 Mon Sep 17 00:00:00 2001 From: dagebot Date: Wed, 11 Feb 2026 04:10:17 +0000 Subject: [PATCH 3/3] style: apply ruff formatting to message_store and telegram modules --- src/bub/channels/telegram.py | 27 +++++++++++++++------------ src/bub/message_store/service.py | 29 +++++++++++++++++------------ 2 files changed, 32 insertions(+), 24 deletions(-) diff --git a/src/bub/channels/telegram.py b/src/bub/channels/telegram.py index 8a4e453..221d2e5 100644 --- a/src/bub/channels/telegram.py +++ b/src/bub/channels/telegram.py @@ -119,7 +119,7 @@ async def stop(self) -> None: self._running = False # Cancel proactive interaction task - if hasattr(self, '_proactive_task'): + if hasattr(self, "_proactive_task"): self._proactive_task.cancel() for task in self._typing_tasks.values(): @@ -223,17 +223,20 @@ async def _on_text(self, update: Update, _context: ContextTypes.DEFAULT_TYPE) -> # Store the message import time - self._message_store.add_message(StoredMessage( - id=f"{chat_id}_{update.message.message_id}", - chat_id=int(chat_id), - thread_id=None, - role="user", - name=user.username, - content=text, - tool_call_id=None, - tool_calls=None, - timestamp=time.time(), - )) + + self._message_store.add_message( + StoredMessage( + id=f"{chat_id}_{update.message.message_id}", + chat_id=int(chat_id), + thread_id=None, + role="user", + name=user.username, + content=text, + tool_call_id=None, + tool_calls=None, + timestamp=time.time(), + ) + ) self._start_typing(chat_id) await self.publish_inbound( diff --git a/src/bub/message_store/service.py b/src/bub/message_store/service.py index 11da7c8..945e0e5 100644 --- a/src/bub/message_store/service.py +++ b/src/bub/message_store/service.py @@ -13,6 +13,7 @@ @dataclass class StoredMessage: """Represents a stored message.""" + id: str chat_id: int thread_id: int | None @@ -92,17 +93,19 @@ def get_messages(self, chat_id: int, thread_id: int | None = None, limit: int = messages = [] for row in reversed(rows): - messages.append(StoredMessage( - id=row["id"], - chat_id=row["chat_id"], - thread_id=row["thread_id"], - role=row["role"], - name=row["name"], - content=row["content"], - tool_call_id=row["tool_call_id"], - tool_calls=json.loads(row["tool_calls"]) if row["tool_calls"] else None, - timestamp=row["timestamp"], - )) + messages.append( + StoredMessage( + id=row["id"], + chat_id=row["chat_id"], + thread_id=row["thread_id"], + role=row["role"], + name=row["name"], + content=row["content"], + tool_call_id=row["tool_call_id"], + tool_calls=json.loads(row["tool_calls"]) if row["tool_calls"] else None, + timestamp=row["timestamp"], + ) + ) return messages def delete_messages(self, chat_id: int, thread_id: int | None = None) -> None: @@ -112,7 +115,9 @@ def delete_messages(self, chat_id: int, thread_id: int | None = None) -> None: self._conn.execute("DELETE FROM messages WHERE chat_id = ? AND thread_id IS NULL", (chat_id,)) self._conn.commit() - def get_last_message_by_role(self, chat_id: int, role: Literal["user", "assistant"], thread_id: int | None = None) -> StoredMessage | None: + def get_last_message_by_role( + self, chat_id: int, role: Literal["user", "assistant"], thread_id: int | None = None + ) -> StoredMessage | None: """Get the last message by a specific role in a chat.""" if thread_id is not None: row = self._conn.execute(