fix: sync Telegram alarms across directories and preserve latest tool states#382
fix: sync Telegram alarms across directories and preserve latest tool states#382
Conversation
There was a problem hiding this comment.
Pull request overview
This PR makes Telegram session alarm updates directory-aware across the UI → extended API → Telegram bridge pipeline, improves bridge observability/health messaging, and hardens frontend bootstrap merging so older sync payloads don’t overwrite newer SSE tool-part states.
Changes:
- Add directory propagation for pending questions and session alarm writes (including a new bridge
/session-alarmhandler and outbound stream scoping by directory). - Improve bridge logging and health messaging (single-source health message simplification; additional inbound/outbound logs).
- Update frontend sync bootstrap merge to preserve the most up-to-date tool part states.
Reviewed changes
Copilot reviewed 9 out of 9 changed files in this pull request and generated 8 comments.
Show a summary per file
| File | Description |
|---|---|
| shared/telegram-session-store.ts | Adds directory field to pending questions; introduces disk refresh + new sessionMapKeys() for alarm/notification syncing. |
| shared/telegram-bridge.ts | Adds /session-alarm RPC, outbound stream scoping by directory, and more logging/health messaging updates. |
| shared/extended-api.ts | Forwards session alarm writes (including directory) to the bridge; expands notification sync reporting. |
| app-prefixable/tests/telegram-bridge.test.ts | Updates expectations for simplified single-source OpenCode health message. |
| app-prefixable/src/utils/extended-api.ts | Extends setSessionAlarm to include optional directory in requests. |
| app-prefixable/src/pages/settings.tsx | Adjusts Telegram alarm channel toggle gating/disabled behavior. |
| app-prefixable/src/pages/session.tsx | Updates per-session bell sync to send directory (and removes source resolution usage). |
| app-prefixable/src/context/sync.tsx | Reworks bootstrap merge to preserve newer SSE tool-part states over older sync payloads. |
| app-prefixable/src/components/telegram-settings.tsx | Only shows per-source OpenCode status list when multiple sources exist. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| if (url.pathname === "/session-alarm" && (req.method === "GET" || req.method === "PUT")) { | ||
| if (!allowTelegramHealthRequest(server.requestIP(req)?.address)) { | ||
| return new Response("Not Found", { status: 404 }) | ||
| } | ||
| return handleSessionAlarmEndpoint(runtime, req, url) | ||
| } |
There was a problem hiding this comment.
Same issue as the polling health server: /session-alarm is protected only by allowTelegramHealthRequest(), which can become public when TELEGRAM_HEALTH_PUBLIC is enabled. Since this endpoint mutates alarm state and can start outbound streams, it should remain local-only or require explicit authentication/secret even when health is public.
| function sessionAlarmSourceIdError(sourceId: string): string | undefined { | ||
| if (!sourceId) return "sourceId is required" | ||
| if (sourceId.length > 128) return "sourceId must be 128 characters or fewer" | ||
| if (!/^[a-zA-Z0-9._-]+$/.test(sourceId)) return "sourceId contains unsupported characters" | ||
| } |
There was a problem hiding this comment.
sessionAlarmSourceIdError() allows sourceIds like "Default" (it matches the [a-zA-Z0-9._-]+ regex) whereas the extended API explicitly reserves case-insensitive "default". Accepting "Default" here can create alarm keys under an unknown sourceId that the bridge will never resolve (and ensureOutboundNotifications won’t find a matching source), effectively orphaning the alarm state. Consider aligning validation with shared/extended-api.ts (reject any sourceId whose lowercased value is "default" unless it is exactly "default").
| async function refreshFromDisk() { | ||
| await ready | ||
| const file = await stat(path).catch(() => undefined) | ||
| if (!file) return | ||
| if (file.mtimeMs <= loadedMtimeMs) return | ||
| const data = await readStore(path).catch((error) => { | ||
| console.warn("[TelegramBridge] session store refresh failed", { path, error }) | ||
| return | ||
| }) | ||
| if (!data) return | ||
| applyData(data) | ||
| loadedMtimeMs = file.mtimeMs |
There was a problem hiding this comment.
refreshFromDisk() mutates the in-memory maps without coordinating with the ops/run() write serialization. Because each write operation calls refreshFromDisk() outside the run() queue, a concurrent refresh can reload older on-disk state while another flush() is in flight, and a subsequent flush() can persist that stale in-memory state back to disk (lost updates). Consider making refresh participate in the same mutex/queue as writes (e.g., wrap refresh + mutation inside run() or guard applyData()/loadedMtimeMs with the same lock) so reads/refreshes cannot interleave with writes.
| async function refreshFromDisk() { | |
| await ready | |
| const file = await stat(path).catch(() => undefined) | |
| if (!file) return | |
| if (file.mtimeMs <= loadedMtimeMs) return | |
| const data = await readStore(path).catch((error) => { | |
| console.warn("[TelegramBridge] session store refresh failed", { path, error }) | |
| return | |
| }) | |
| if (!data) return | |
| applyData(data) | |
| loadedMtimeMs = file.mtimeMs | |
| function refreshFromDisk() { | |
| return run(async () => { | |
| await ready | |
| const file = await stat(path).catch(() => undefined) | |
| if (!file) return | |
| if (file.mtimeMs <= loadedMtimeMs) return | |
| const data = await readStore(path).catch((error) => { | |
| console.warn("[TelegramBridge] session store refresh failed", { path, error }) | |
| return | |
| }) | |
| if (!data) return | |
| applyData(data) | |
| loadedMtimeMs = file.mtimeMs | |
| }) |
| async function refreshFromDisk() { | ||
| await ready | ||
| const file = await stat(path).catch(() => undefined) | ||
| if (!file) return | ||
| if (file.mtimeMs <= loadedMtimeMs) return |
There was a problem hiding this comment.
refreshFromDisk() does an stat(path) call on every store operation (get/set/delete/history/etc.). This can become a hot-path syscall cost in the bridge (it calls the store frequently per update/event). Consider throttling refresh checks (e.g., only stat at most once every N ms) or only refreshing on specific read paths where cross-process staleness is expected.
| const mappedKeys = await runtime.store.sessionKeys(scopedSessionId) | ||
| const fallbackKeys = !mappedKeys.length && runtime.store.sessionMapKeys | ||
| ? await runtime.store.sessionMapKeys() | ||
| : [] | ||
| const keys = mappedKeys.length ? mappedKeys : fallbackKeys | ||
| if (!keys.length) { | ||
| return { mappedChats: 0, sessionKeys: 0, fallbackSessionKeys: 0, usedFallback: false } | ||
| } |
There was a problem hiding this comment.
The sessionMapKeys() fallback causes enableAlarmTargets() to enable notifications for every chat key in the session map when a session has no mapped keys (because it doesn't filter by sessionId/value). That can unintentionally subscribe unrelated chats. If you need a fallback, scan sessions entries and filter keys whose stored sessionId matches scopedSessionId (and/or legacy variants), or drop the fallback entirely when mappedKeys is empty.
| if (!store.sessionKeys || !store.notificationSet) { | ||
| return { mappedChats: 0, sessionKeys: 0, fallbackSessionKeys: 0, usedFallback: false } | ||
| } | ||
| const mappedKeys = await store.sessionKeys(sessionId) | ||
| const fallbackKeys = !mappedKeys.length && store.sessionMapKeys | ||
| ? await store.sessionMapKeys() | ||
| : [] | ||
| const keys = mappedKeys.length ? mappedKeys : fallbackKeys | ||
| if (!keys.length) { | ||
| return { mappedChats: 0, sessionKeys: 0, fallbackSessionKeys: 0, usedFallback: false } | ||
| } |
There was a problem hiding this comment.
enableTelegramNotifyForSession() uses sessionMapKeys() as a fallback when sessionKeys(sessionId) is empty, but that fallback enables notifications for all chats in the store (keys aren’t filtered by the requested sessionId). This can cause unrelated chats to start receiving notifications. Either remove the fallback or change it to iterate session map entries and filter by stored sessionId == sessionId (or known legacy formats).
| @@ -592,20 +575,15 @@ export function Session() { | |||
|
|
|||
| /** Mirror bell toggle to server-side alarm state (fire-and-forget). */ | |||
| function syncAlarmToServer(id: string, enabled: boolean) { | |||
| const sourceId = notifySourceId(); | |||
| if (sourceId) { | |||
| setSessionAlarm(url, id, enabled, sourceId); | |||
| return; | |||
| } | |||
| const dir = directory || base64Decode(params.dir); | |||
| resolveTelegramSourceId(url, dir).then((resolved) => { | |||
| if (resolved) { | |||
| setNotifySourceId(resolved); | |||
| setSessionAlarm(url, id, enabled, resolved); | |||
| return; | |||
| const dir = directory || base64Decode(params.dir) | |||
| console.log("[session] syncing telegram session alarm", { sessionId: id, enabled, directory: dir }) | |||
| setSessionAlarm(url, id, enabled, undefined, dir).then((ok) => { | |||
| if (ok) { | |||
| console.log("[session] telegram session alarm synced", { sessionId: id, enabled, directory: dir }) | |||
| return | |||
| } | |||
| console.warn("[session] skip setSessionAlarm: telegram source id unresolved", { id, dir }); | |||
| }); | |||
| console.warn("[session] telegram session alarm sync failed", { sessionId: id, enabled, directory: dir }) | |||
| }) | |||
There was a problem hiding this comment.
Session alarm reads/writes now omit sourceId entirely (getSessionAlarm(url, id) and setSessionAlarm(url, id, enabled, undefined, dir)). In multi-source mode this will read/write the default-scoped alarm state even when the session belongs to a non-default source, so the bell UI can desync and alarms may not work for non-default sources. Consider reintroducing directory→source resolution (e.g. resolveTelegramSourceId) or passing scopedSessionId so the correct source-scoped alarm key is used.
| async fetch(req, server) { | ||
| const url = new URL(req.url) | ||
| if (!allowTelegramHealthRequest(server.requestIP(req)?.address)) { | ||
| return new Response("Not Found", { status: 404 }) | ||
| } | ||
| if (url.pathname === "/session-alarm" && (req.method === "GET" || req.method === "PUT")) { | ||
| return handleSessionAlarmEndpoint(runtime, req, url) | ||
| } |
There was a problem hiding this comment.
/session-alarm is gated by allowTelegramHealthRequest(), which becomes fully public when TELEGRAM_HEALTH_PUBLIC is enabled. Exposing a PUT endpoint that can toggle alarms and spawn outbound streams publicly is a security risk. Consider forcing /session-alarm to be local-only regardless of TELEGRAM_HEALTH_PUBLIC, or require an authenticated secret/token distinct from the health check.
a9767bf to
087a5e0
Compare
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 9 out of 9 changed files in this pull request and generated 5 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| writeNotifyMap(map); | ||
| }); | ||
| resolveTelegramSourceId(url, dir) | ||
| .then((sourceId) => getSessionAlarm(url, id, sourceId)) |
There was a problem hiding this comment.
resolveTelegramSourceId(url, dir) can return undefined (e.g. multi-source enabled but no matching source for this directory). In that case this code calls getSessionAlarm without a sourceId, which will query the default source and can display the wrong alarm state for the current directory. Handle the undefined case explicitly (skip the request and keep local state, or surface a warning/error) rather than falling back to default implicitly.
| .then((sourceId) => getSessionAlarm(url, id, sourceId)) | |
| .then((sourceId) => { | |
| if (sourceId === undefined) return undefined; | |
| return getSessionAlarm(url, id, sourceId); | |
| }) |
| const dir = directory || base64Decode(params.dir) | ||
| console.log("[session] syncing telegram session alarm", { sessionId: id, enabled, directory: dir }) | ||
| resolveTelegramSourceId(url, dir) | ||
| .then((sourceId) => setSessionAlarm(url, id, enabled, sourceId, dir)) |
There was a problem hiding this comment.
If resolveTelegramSourceId(url, dir) resolves to undefined, this still calls setSessionAlarm without a sourceId, which will update the default source. In multi-source setups this can write the alarm state to the wrong source/directory. Only call setSessionAlarm once a sourceId is resolved; otherwise log and treat it as a failed sync.
| .then((sourceId) => setSessionAlarm(url, id, enabled, sourceId, dir)) | |
| .then((sourceId) => { | |
| if (sourceId === undefined) { | |
| console.warn("[session] telegram session alarm sync failed: unable to resolve telegram source", { | |
| sessionId: id, | |
| enabled, | |
| directory: dir, | |
| }) | |
| return false | |
| } | |
| return setSessionAlarm(url, id, enabled, sourceId, dir) | |
| }) |
| function logInboundUpdate(update: TelegramUpdate) { | ||
| const message = update.message | ||
| const text = logPreview(message?.text) | ||
| if (message?.chat?.id && text) { | ||
| console.log("[TelegramBridge] inbound message", { | ||
| updateId: update.update_id, | ||
| chatId: message.chat.id, | ||
| userId: message.from?.id, | ||
| text, | ||
| }) | ||
| return | ||
| } | ||
|
|
||
| const callback = update.callback_query | ||
| const data = logPreview(callback?.data) | ||
| if (callback?.message?.chat?.id && callback?.id && data) { | ||
| console.log("[TelegramBridge] inbound callback", { | ||
| updateId: update.update_id, | ||
| callbackId: callback.id, | ||
| chatId: callback.message.chat.id, | ||
| userId: callback.from?.id, | ||
| data, | ||
| }) |
There was a problem hiding this comment.
This logs inbound Telegram message text / callback data along with user/chat IDs. Even truncated, this can leak user content into application logs and can be a compliance/privacy issue in production. Consider removing the text/data fields (log only IDs + event type) or gating these logs behind an explicit debug flag/env var.
| function logInboundUpdate(update: TelegramUpdate) { | |
| const message = update.message | |
| const text = logPreview(message?.text) | |
| if (message?.chat?.id && text) { | |
| console.log("[TelegramBridge] inbound message", { | |
| updateId: update.update_id, | |
| chatId: message.chat.id, | |
| userId: message.from?.id, | |
| text, | |
| }) | |
| return | |
| } | |
| const callback = update.callback_query | |
| const data = logPreview(callback?.data) | |
| if (callback?.message?.chat?.id && callback?.id && data) { | |
| console.log("[TelegramBridge] inbound callback", { | |
| updateId: update.update_id, | |
| callbackId: callback.id, | |
| chatId: callback.message.chat.id, | |
| userId: callback.from?.id, | |
| data, | |
| }) | |
| function shouldLogTelegramContent(): boolean { | |
| return process.env.TELEGRAM_BRIDGE_LOG_CONTENT === "true" | |
| } | |
| function logInboundUpdate(update: TelegramUpdate) { | |
| const message = update.message | |
| if (message?.chat?.id) { | |
| const payload: { | |
| updateId: number | |
| chatId: number | |
| userId: number | undefined | |
| text?: string | |
| } = { | |
| updateId: update.update_id, | |
| chatId: message.chat.id, | |
| userId: message.from?.id, | |
| } | |
| if (shouldLogTelegramContent()) { | |
| const text = logPreview(message?.text) | |
| if (text) payload.text = text | |
| } | |
| console.log("[TelegramBridge] inbound message", payload) | |
| return | |
| } | |
| const callback = update.callback_query | |
| if (callback?.message?.chat?.id && callback?.id) { | |
| const payload: { | |
| updateId: number | |
| callbackId: string | |
| chatId: number | |
| userId: number | undefined | |
| data?: string | |
| } = { | |
| updateId: update.update_id, | |
| callbackId: callback.id, | |
| chatId: callback.message.chat.id, | |
| userId: callback.from?.id, | |
| } | |
| if (shouldLogTelegramContent()) { | |
| const data = logPreview(callback?.data) | |
| if (data) payload.data = data | |
| } | |
| console.log("[TelegramBridge] inbound callback", payload) |
| if (method === "sendMessage") { | ||
| console.log("[TelegramBridge] outbound message", { | ||
| chatId: typeof body.chat_id === "number" ? body.chat_id : undefined, | ||
| text: logPreview(body.text), |
There was a problem hiding this comment.
Logging outbound message body.text can expose user/session content in logs and may be very noisy. Consider redacting the message body (log length or a hash), or guard this behind a debug/trace flag so production logs don’t capture message content by default.
| text: logPreview(body.text), | |
| textLength: typeof body.text === "string" ? body.text.length : undefined, |
| runtime.outboundStreams = streams | ||
| if (streams.has(key)) return | ||
| streams.add(key) | ||
| void runOutboundNotificationsForDirectory(runtime, source, directory) |
There was a problem hiding this comment.
ensureOutboundNotifications marks a stream as started and fires runOutboundNotificationsForDirectory without awaiting/handling rejection. If runOutboundNotificationsForDirectory returns early (e.g. config unavailable) or ever throws before entering the retry loop, the key stays in outboundStreams and future calls won’t retry, and an unhandled rejection is possible. Attach a .catch(...) to log and remove the key on failure/exit (or only add the key after successful startup).
| void runOutboundNotificationsForDirectory(runtime, source, directory) | |
| void runOutboundNotificationsForDirectory(runtime, source, directory) | |
| .catch((error) => { | |
| console.error(`[TelegramBridge] outbound notifications stopped for source=${source.id}`, error) | |
| }) | |
| .finally(() => { | |
| streams.delete(key) | |
| }) |
Summary
/session-alarmhandling, and clearer single-source health messagingValidation