diff --git a/.changeset/tidy-spoons-stream.md b/.changeset/tidy-spoons-stream.md
new file mode 100644
index 00000000..fd1f26ff
--- /dev/null
+++ b/.changeset/tidy-spoons-stream.md
@@ -0,0 +1,6 @@
+---
+"chat": minor
+"@chat-adapter/telegram": minor
+---
+
+Add native Telegram DM draft streaming with markdown-safe segment splitting, and expose segmented stream results in the chat core.
diff --git a/README.md b/README.md
index 1a45546e..63397e6a 100644
--- a/README.md
+++ b/README.md
@@ -52,7 +52,7 @@ Browse official, vendor-official, and community adapters on [chat-sdk.dev/adapte
## Features
- [**Event handlers**](https://chat-sdk.dev/docs/usage) — mentions, messages, reactions, button clicks, slash commands, modals
-- [**AI streaming**](https://chat-sdk.dev/docs/streaming) — stream LLM responses with native Slack streaming and post+edit fallback
+- [**AI streaming**](https://chat-sdk.dev/docs/streaming) — stream LLM responses with native Slack streaming, Telegram DM drafts, and post+edit fallback
- [**Cards**](https://chat-sdk.dev/docs/cards) — JSX-based interactive cards (Block Kit, Adaptive Cards, Google Chat Cards)
- [**Actions**](https://chat-sdk.dev/docs/actions) — handle button clicks and dropdown selections
- [**Modals**](https://chat-sdk.dev/docs/modals) — form dialogs with text inputs, dropdowns, and validation
diff --git a/apps/docs/content/docs/adapters.mdx b/apps/docs/content/docs/adapters.mdx
index ca31f6e9..a1641fd2 100644
--- a/apps/docs/content/docs/adapters.mdx
+++ b/apps/docs/content/docs/adapters.mdx
@@ -23,7 +23,7 @@ Ready to build your own? Follow the [building](/docs/contributing/building) guid
| Edit message | | | | | | | Partial | | |
| Delete message | | | | | | | Partial | | |
| File uploads | | | | | Single file/media | | | Images, audio, docs | |
-| Streaming | Native | Native (DMs) / Buffered | Post+Edit | Post+Edit | Post+Edit | Buffered | Agent sessions / Post+Edit | Buffered | Buffered |
+| Streaming | Native | Native (DMs) / Buffered | Post+Edit | Post+Edit | DM Draft + Post+Edit fallback | Buffered | Agent sessions / Post+Edit | Buffered | Buffered |
| Scheduled messages | Native | | | | | | | | |
### Rich content
diff --git a/apps/docs/content/docs/index.mdx b/apps/docs/content/docs/index.mdx
index 8a997904..3136ddf0 100644
--- a/apps/docs/content/docs/index.mdx
+++ b/apps/docs/content/docs/index.mdx
@@ -55,7 +55,7 @@ Each adapter factory auto-detects credentials from environment variables (`SLACK
| Microsoft Teams | `@chat-adapter/teams` | Yes | Read-only | Yes | Yes | Native (DMs) / Buffered | Yes |
| Google Chat | `@chat-adapter/gchat` | Yes | Yes | Yes | No | Post+Edit | Yes |
| Discord | `@chat-adapter/discord` | Yes | Yes | Yes | No | Post+Edit | Yes |
-| Telegram | `@chat-adapter/telegram` | Yes | Yes | Partial | No | Post+Edit | Yes |
+| Telegram | `@chat-adapter/telegram` | Yes | Yes | Partial | No | DM Draft + Fallback | Yes |
| GitHub | `@chat-adapter/github` | Yes | Yes | No | No | Buffered | No |
| Linear | `@chat-adapter/linear` | Yes | Yes | No | No | Agent sessions / Post+Edit | No |
| WhatsApp | `@chat-adapter/whatsapp` | N/A | Yes | Partial | No | Buffered | Yes |
diff --git a/apps/docs/content/docs/streaming.mdx b/apps/docs/content/docs/streaming.mdx
index 1e207470..588a5b13 100644
--- a/apps/docs/content/docs/streaming.mdx
+++ b/apps/docs/content/docs/streaming.mdx
@@ -59,10 +59,10 @@ await thread.post(stream);
| Platform | Method | Description |
|----------|--------|-------------|
| Slack | Native streaming API | Uses Slack's `chatStream` for smooth, real-time updates |
+| Telegram | Private chat draft streaming | Uses Telegram's `sendMessageDraft` in private chats and falls back to post + edit elsewhere |
| Teams | Native (DMs) / Buffered (group chats) | Uses the Teams SDK's native `stream.emit()` for direct messages; accumulates chunks and posts one final message when no native streamer is active |
| Google Chat | Post + Edit | Posts a message then edits it as chunks arrive |
| Discord | Post + Edit | Posts a message then edits it as chunks arrive |
-| Telegram | Post + Edit | Posts a message then edits it as chunks arrive |
| GitHub | Buffered | Accumulates chunks and posts one final comment |
| Linear | Agent sessions / Post + Edit | Uses agent session activities in agent-session threads; falls back to post+edit comments in issue threads |
| WhatsApp | Buffered | Accumulates chunks and sends one final message |
diff --git a/packages/adapter-telegram/README.md b/packages/adapter-telegram/README.md
index 923c8c0f..cc190a3f 100644
--- a/packages/adapter-telegram/README.md
+++ b/packages/adapter-telegram/README.md
@@ -144,7 +144,7 @@ TELEGRAM_API_BASE_URL=https://api.telegram.org
| Delete message | Yes |
| File uploads | Single file (`sendDocument`) |
| Attachment uploads | Single image/audio/video/file (`sendPhoto`, `sendAudio`, `sendVideo`, `sendDocument`) |
-| Streaming | Post+Edit fallback |
+| Streaming | DM Draft + Post+Edit fallback |
### Rich content
diff --git a/packages/adapter-telegram/src/index.test.ts b/packages/adapter-telegram/src/index.test.ts
index 76223c9f..04823c93 100644
--- a/packages/adapter-telegram/src/index.test.ts
+++ b/packages/adapter-telegram/src/index.test.ts
@@ -911,6 +911,591 @@ describe("TelegramAdapter", () => {
expect(sendMessageBody.text).toBe("hello");
});
+ it("streams draft updates for private chats and sends a final message", async () => {
+ mockFetch
+ .mockResolvedValueOnce(
+ telegramOk({
+ id: 999,
+ is_bot: true,
+ first_name: "Bot",
+ username: "mybot",
+ })
+ )
+ .mockResolvedValueOnce(telegramOk(true))
+ .mockResolvedValueOnce(telegramOk(true))
+ .mockResolvedValueOnce(
+ telegramOk(
+ sampleMessage({
+ text: "hello world",
+ })
+ )
+ );
+
+ const adapter = createTelegramAdapter({
+ botToken: "token",
+ mode: "webhook",
+ logger: mockLogger,
+ userName: "mybot",
+ });
+
+ await adapter.initialize(createMockChat());
+
+ async function* textStream(): AsyncIterable {
+ yield "hello";
+ yield " world";
+ }
+
+ const result = await adapter.stream("telegram:123", textStream(), {
+ updateIntervalMs: 0,
+ });
+
+ expect(result).not.toBeNull();
+ expect(result?.id).toBe("123:11");
+ expect(result?.threadId).toBe("telegram:123");
+
+ const firstDraftUrl = String(mockFetch.mock.calls[1]?.[0]);
+ const secondDraftUrl = String(mockFetch.mock.calls[2]?.[0]);
+ const finalSendUrl = String(mockFetch.mock.calls[3]?.[0]);
+
+ expect(firstDraftUrl).toContain("/sendMessageDraft");
+ expect(secondDraftUrl).toContain("/sendMessageDraft");
+ expect(finalSendUrl).toContain("/sendMessage");
+
+ const firstDraftBody = JSON.parse(
+ String((mockFetch.mock.calls[1]?.[1] as RequestInit).body)
+ ) as {
+ chat_id: string;
+ draft_id: number;
+ parse_mode?: string;
+ text: string;
+ };
+
+ const secondDraftBody = JSON.parse(
+ String((mockFetch.mock.calls[2]?.[1] as RequestInit).body)
+ ) as {
+ chat_id: string;
+ draft_id: number;
+ parse_mode?: string;
+ text: string;
+ };
+
+ const finalSendBody = JSON.parse(
+ String((mockFetch.mock.calls[3]?.[1] as RequestInit).body)
+ ) as { chat_id: string; parse_mode?: string; text: string };
+
+ expect(firstDraftBody.chat_id).toBe("123");
+ expect(firstDraftBody.text).toBe("hello");
+ expect(firstDraftBody.parse_mode).toBe("MarkdownV2");
+ expect(secondDraftBody.draft_id).toBe(firstDraftBody.draft_id);
+ expect(secondDraftBody.text).toBe("hello world");
+ expect(finalSendBody.chat_id).toBe("123");
+ expect(finalSendBody.text).toBe("hello world");
+ expect(finalSendBody.parse_mode).toBe("MarkdownV2");
+ });
+
+ it("splits long private chat streams into multiple final messages", async () => {
+ const longPrefix = "a".repeat(3600);
+ const longSuffix = "b".repeat(1200);
+
+ mockFetch
+ .mockResolvedValueOnce(
+ telegramOk({
+ id: 999,
+ is_bot: true,
+ first_name: "Bot",
+ username: "mybot",
+ })
+ )
+ .mockResolvedValueOnce(telegramOk(true))
+ .mockResolvedValueOnce(
+ telegramOk(
+ sampleMessage({
+ message_id: 21,
+ text: "a".repeat(3500),
+ })
+ )
+ )
+ .mockResolvedValueOnce(telegramOk(true))
+ .mockResolvedValueOnce(
+ telegramOk(
+ sampleMessage({
+ message_id: 22,
+ text: `${"a".repeat(100)}${"b".repeat(1200)}`,
+ })
+ )
+ );
+
+ const adapter = createTelegramAdapter({
+ botToken: "token",
+ mode: "webhook",
+ logger: mockLogger,
+ userName: "mybot",
+ });
+
+ await adapter.initialize(createMockChat());
+
+ async function* textStream(): AsyncIterable {
+ yield longPrefix;
+ yield longSuffix;
+ }
+
+ const result = await adapter.stream("telegram:123", textStream(), {
+ updateIntervalMs: Number.MAX_SAFE_INTEGER,
+ });
+
+ expect(result).not.toBeNull();
+ expect(result).toHaveProperty("messages");
+ if (!(result && "messages" in result)) {
+ throw new Error("Expected segmented stream result");
+ }
+ expect(result.messages).toHaveLength(2);
+ expect(result.messages[0]?.message.id).toBe("123:21");
+ expect(result.messages[1]?.message.id).toBe("123:22");
+
+ const draftBodies = mockFetch.mock.calls
+ .slice(1)
+ .filter((call) => String(call[0]).endsWith("/sendMessageDraft"))
+ .map(
+ (call) =>
+ JSON.parse(String((call[1] as RequestInit).body)) as {
+ draft_id: number;
+ text: string;
+ }
+ );
+ const sendBodies = mockFetch.mock.calls
+ .slice(1)
+ .filter((call) => String(call[0]).endsWith("/sendMessage"))
+ .map(
+ (call) =>
+ JSON.parse(String((call[1] as RequestInit).body)) as {
+ chat_id: string;
+ text: string;
+ }
+ );
+
+ expect(draftBodies).toHaveLength(2);
+ expect(draftBodies[0]?.text).toHaveLength(3500);
+ expect(draftBodies[1]?.text).toHaveLength(1300);
+ expect(draftBodies[1]?.draft_id).not.toBe(draftBodies[0]?.draft_id);
+
+ expect(sendBodies).toHaveLength(2);
+ expect(sendBodies[0]?.chat_id).toBe("123");
+ expect(sendBodies[0]?.text).toHaveLength(3500);
+ expect(sendBodies[1]?.text).toHaveLength(1300);
+ expect(`${sendBodies[0]?.text ?? ""}${sendBodies[1]?.text ?? ""}`).toBe(
+ `${longPrefix}${longSuffix}`
+ );
+ });
+
+ it("splits long markdown streams on a clean prefix before unbalanced markers", async () => {
+ const longPrefix = `${"a".repeat(3498)}**bo`;
+ const longSuffix = "ld**!";
+
+ mockFetch
+ .mockResolvedValueOnce(
+ telegramOk({
+ id: 999,
+ is_bot: true,
+ first_name: "Bot",
+ username: "mybot",
+ })
+ )
+ .mockResolvedValueOnce(telegramOk(true))
+ .mockResolvedValueOnce(
+ telegramOk(
+ sampleMessage({
+ message_id: 31,
+ text: "a".repeat(3498),
+ })
+ )
+ )
+ .mockResolvedValueOnce(telegramOk(true))
+ .mockResolvedValueOnce(
+ telegramOk(
+ sampleMessage({
+ message_id: 32,
+ text: "**bold**!",
+ })
+ )
+ );
+
+ const adapter = createTelegramAdapter({
+ botToken: "token",
+ mode: "webhook",
+ logger: mockLogger,
+ userName: "mybot",
+ });
+
+ await adapter.initialize(createMockChat());
+
+ async function* textStream(): AsyncIterable {
+ yield longPrefix;
+ yield longSuffix;
+ }
+
+ const result = await adapter.stream("telegram:123", textStream(), {
+ updateIntervalMs: Number.MAX_SAFE_INTEGER,
+ });
+
+ expect(result).not.toBeNull();
+ expect(result).toHaveProperty("messages");
+ if (!(result && "messages" in result)) {
+ throw new Error("Expected segmented stream result");
+ }
+
+ const sendBodies = mockFetch.mock.calls
+ .slice(1)
+ .filter((call) => String(call[0]).endsWith("/sendMessage"))
+ .map(
+ (call) =>
+ JSON.parse(String((call[1] as RequestInit).body)) as {
+ parse_mode?: string;
+ text: string;
+ }
+ );
+
+ expect(sendBodies).toHaveLength(2);
+ expect(sendBodies[0]?.text).toBe("a".repeat(3498));
+ expect(sendBodies[1]?.parse_mode).toBe("MarkdownV2");
+ expect(sendBodies[1]?.text).toBe("*bold*\\!");
+ });
+
+ it("keeps markdown parse mode for an exact-limit clean segment", async () => {
+ const longMarkdown = `${"a".repeat(3494)}**ok**`;
+ const renderedMarkdown = `${"a".repeat(3494)}*ok*`;
+ const requestBodies: Array<{
+ method: string;
+ body: { parse_mode?: string; text?: string };
+ }> = [];
+ let nextMessageId = 41;
+
+ mockFetch.mockImplementation(async (input, init) => {
+ const url = String(input);
+ const method = url.split("/").at(-1) ?? url;
+ const rawBody = (init as RequestInit | undefined)?.body;
+ const body =
+ typeof rawBody === "string"
+ ? (JSON.parse(rawBody) as { parse_mode?: string; text?: string })
+ : {};
+
+ requestBodies.push({ method, body });
+
+ if (method === "getMe") {
+ return telegramOk({
+ id: 999,
+ is_bot: true,
+ first_name: "Bot",
+ username: "mybot",
+ });
+ }
+
+ if (method === "sendMessageDraft") {
+ return telegramOk(true);
+ }
+
+ if (method === "sendMessage") {
+ return telegramOk(
+ sampleMessage({
+ message_id: nextMessageId++,
+ text: `${"a".repeat(3494)}ok`,
+ })
+ );
+ }
+
+ throw new Error(`Unexpected Telegram method in test: ${method}`);
+ });
+
+ const adapter = createTelegramAdapter({
+ botToken: "token",
+ mode: "webhook",
+ logger: mockLogger,
+ userName: "mybot",
+ });
+
+ await adapter.initialize(createMockChat());
+
+ async function* textStream(): AsyncIterable {
+ yield longMarkdown;
+ }
+
+ const result = await adapter.stream("telegram:123", textStream(), {
+ updateIntervalMs: Number.MAX_SAFE_INTEGER,
+ });
+
+ expect(result).not.toBeNull();
+ expect(
+ requestBodies.map((request) => ({
+ method: request.method,
+ len: request.body.text?.length,
+ tail: request.body.text?.slice(-10),
+ parse_mode: request.body.parse_mode,
+ }))
+ ).toEqual([
+ {
+ method: "getMe",
+ len: undefined,
+ tail: undefined,
+ parse_mode: undefined,
+ },
+ {
+ method: "sendMessageDraft",
+ len: renderedMarkdown.length,
+ tail: renderedMarkdown.slice(-10),
+ parse_mode: "MarkdownV2",
+ },
+ {
+ method: "sendMessage",
+ len: renderedMarkdown.length,
+ tail: renderedMarkdown.slice(-10),
+ parse_mode: "MarkdownV2",
+ },
+ ]);
+
+ const draftBody = requestBodies[1]?.body as {
+ parse_mode?: string;
+ text: string;
+ };
+ const finalSendBody = requestBodies[2]?.body as {
+ parse_mode?: string;
+ text: string;
+ };
+
+ expect(draftBody.parse_mode).toBe("MarkdownV2");
+ expect(draftBody.text).toBe(renderedMarkdown);
+ expect(finalSendBody.parse_mode).toBe("MarkdownV2");
+ expect(finalSendBody.text).toBe(renderedMarkdown);
+ });
+
+ it("splits streams by rendered MarkdownV2 length before final sends can truncate", async () => {
+ const sourceMarkdown = ".".repeat(3500);
+ const renderedMarkdown = "\\.".repeat(3500);
+ const requestBodies: Array<{
+ method: string;
+ body: { parse_mode?: string; text?: string };
+ }> = [];
+ let nextMessageId = 51;
+
+ mockFetch.mockImplementation(async (input, init) => {
+ const url = String(input);
+ const method = url.split("/").at(-1) ?? url;
+ const rawBody = (init as RequestInit | undefined)?.body;
+ const body =
+ typeof rawBody === "string"
+ ? (JSON.parse(rawBody) as { parse_mode?: string; text?: string })
+ : {};
+
+ requestBodies.push({ method, body });
+
+ if (method === "getMe") {
+ return telegramOk({
+ id: 999,
+ is_bot: true,
+ first_name: "Bot",
+ username: "mybot",
+ });
+ }
+
+ if (method === "sendMessageDraft") {
+ return telegramOk(true);
+ }
+
+ if (method === "sendMessage") {
+ return telegramOk(
+ sampleMessage({
+ message_id: nextMessageId++,
+ text: body.text ?? "",
+ })
+ );
+ }
+
+ throw new Error(`Unexpected Telegram method in test: ${method}`);
+ });
+
+ const adapter = createTelegramAdapter({
+ botToken: "token",
+ mode: "webhook",
+ logger: mockLogger,
+ userName: "mybot",
+ });
+
+ await adapter.initialize(createMockChat());
+
+ async function* textStream(): AsyncIterable {
+ yield sourceMarkdown;
+ }
+
+ const result = await adapter.stream("telegram:123", textStream(), {
+ updateIntervalMs: Number.MAX_SAFE_INTEGER,
+ });
+
+ expect(result).not.toBeNull();
+ expect(result).toHaveProperty("messages");
+ if (!(result && "messages" in result)) {
+ throw new Error("Expected segmented stream result");
+ }
+
+ const finalSendBodies = requestBodies
+ .filter((request) => request.method === "sendMessage")
+ .map((request) => request.body);
+
+ expect(finalSendBodies.length).toBeGreaterThan(1);
+ expect(result.messages).toHaveLength(finalSendBodies.length);
+ expect(
+ finalSendBodies.every((body) => body.parse_mode === "MarkdownV2")
+ ).toBe(true);
+ expect(
+ finalSendBodies.every(
+ (body) => (body.text?.length ?? 0) <= TELEGRAM_MESSAGE_LIMIT
+ )
+ ).toBe(true);
+ expect(finalSendBodies.map((body) => body.text ?? "").join("")).toBe(
+ renderedMarkdown
+ );
+ });
+
+ it("returns null for non-DM streaming so Chat SDK can use fallback streaming", async () => {
+ mockFetch.mockResolvedValueOnce(
+ telegramOk({
+ id: 999,
+ is_bot: true,
+ first_name: "Bot",
+ username: "mybot",
+ })
+ );
+
+ const adapter = createTelegramAdapter({
+ botToken: "token",
+ mode: "webhook",
+ logger: mockLogger,
+ userName: "mybot",
+ });
+
+ await adapter.initialize(createMockChat());
+
+ async function* textStream(): AsyncIterable {
+ yield "hello";
+ }
+
+ const result = await adapter.stream("telegram:-100123", textStream(), {
+ updateIntervalMs: 0,
+ });
+
+ expect(result).toBeNull();
+ expect(mockFetch).toHaveBeenCalledTimes(1);
+ });
+
+ it("falls back to a final message when draft streaming updates fail", async () => {
+ mockFetch
+ .mockResolvedValueOnce(
+ telegramOk({
+ id: 999,
+ is_bot: true,
+ first_name: "Bot",
+ username: "mybot",
+ })
+ )
+ .mockResolvedValueOnce(
+ telegramError(400, 400, "Bad Request: chat not found")
+ )
+ .mockResolvedValueOnce(
+ telegramOk(
+ sampleMessage({
+ text: "hello world",
+ })
+ )
+ );
+
+ const adapter = createTelegramAdapter({
+ botToken: "token",
+ mode: "webhook",
+ logger: mockLogger,
+ userName: "mybot",
+ });
+
+ await adapter.initialize(createMockChat());
+
+ async function* textStream(): AsyncIterable {
+ yield "hello";
+ yield " world";
+ }
+
+ const result = await adapter.stream("telegram:123", textStream(), {
+ updateIntervalMs: 0,
+ });
+
+ expect(result?.id).toBe("123:11");
+ expect(mockLogger.warn).toHaveBeenCalledWith(
+ "Telegram draft streaming update failed",
+ expect.objectContaining({
+ threadId: "telegram:123",
+ })
+ );
+
+ const finalSendUrl = String(mockFetch.mock.calls[2]?.[0]);
+ expect(finalSendUrl).toContain("/sendMessage");
+ });
+
+ it("continues to the final message when markdown draft retry also fails", async () => {
+ mockFetch
+ .mockResolvedValueOnce(
+ telegramOk({
+ id: 999,
+ is_bot: true,
+ first_name: "Bot",
+ username: "mybot",
+ })
+ )
+ .mockResolvedValueOnce(
+ telegramError(
+ 400,
+ 400,
+ "Bad Request: can't parse entities: Can't find end of the entity"
+ )
+ )
+ .mockResolvedValueOnce(
+ telegramError(429, 429, "Too Many Requests: retry later")
+ )
+ .mockResolvedValueOnce(
+ telegramOk(
+ sampleMessage({
+ text: "**broken",
+ })
+ )
+ );
+
+ const adapter = createTelegramAdapter({
+ botToken: "token",
+ mode: "webhook",
+ logger: mockLogger,
+ userName: "mybot",
+ });
+
+ await adapter.initialize(createMockChat());
+
+ async function* textStream(): AsyncIterable {
+ yield "**broken";
+ }
+
+ const result = await adapter.stream("telegram:123", textStream(), {
+ updateIntervalMs: 0,
+ });
+
+ expect(result).not.toBeNull();
+ expect(mockLogger.warn).toHaveBeenCalledWith(
+ "Telegram draft streaming update failed",
+ expect.objectContaining({
+ threadId: "telegram:123",
+ })
+ );
+
+ const finalSendBody = JSON.parse(
+ String((mockFetch.mock.calls[3]?.[1] as RequestInit).body)
+ ) as { parse_mode?: string; text: string };
+
+ expect(finalSendBody.parse_mode).toBeUndefined();
+ expect(finalSendBody.text).toBe("**broken");
+ });
+
it("postChannelMessage does not double-prefix channel ID", async () => {
mockFetch
.mockResolvedValueOnce(
@@ -932,21 +1517,322 @@ describe("TelegramAdapter", () => {
await adapter.initialize(createMockChat());
- const posted = await adapter.postChannelMessage("telegram:123", {
- markdown: "channel message",
+ const posted = await adapter.postChannelMessage("telegram:123", {
+ markdown: "channel message",
+ });
+
+ expect(posted.threadId).toBe("telegram:123");
+
+ const sendMessageBody = JSON.parse(
+ String((mockFetch.mock.calls[1]?.[1] as RequestInit).body)
+ ) as { chat_id: string; text: string };
+
+ expect(sendMessageBody.chat_id).toBe("123");
+ expect(sendMessageBody.text).toBe("channel message");
+ });
+
+ it("postChannelMessage works with raw channel ID", async () => {
+ mockFetch
+ .mockResolvedValueOnce(
+ telegramOk({
+ id: 999,
+ is_bot: true,
+ first_name: "Bot",
+ username: "mybot",
+ })
+ )
+ .mockResolvedValueOnce(telegramOk(sampleMessage()));
+
+ const adapter = createTelegramAdapter({
+ botToken: "token",
+ mode: "webhook",
+ logger: mockLogger,
+ userName: "mybot",
+ });
+
+ await adapter.initialize(createMockChat());
+
+ const posted = await adapter.postChannelMessage("123", {
+ markdown: "raw id message",
+ });
+
+ expect(posted.threadId).toBe("telegram:123");
+
+ const sendMessageBody = JSON.parse(
+ String((mockFetch.mock.calls[1]?.[1] as RequestInit).body)
+ ) as { chat_id: string; text: string };
+
+ expect(sendMessageBody.chat_id).toBe("123");
+ expect(sendMessageBody.text).toBe("raw id message");
+ });
+
+ it.each([
+ {
+ field: "photo",
+ method: "sendPhoto",
+ mimeType: "image/png",
+ name: "image.png",
+ type: "image",
+ },
+ {
+ field: "audio",
+ method: "sendAudio",
+ mimeType: "audio/mpeg",
+ name: "track.mp3",
+ type: "audio",
+ },
+ {
+ field: "video",
+ method: "sendVideo",
+ mimeType: "video/mp4",
+ name: "clip.mp4",
+ type: "video",
+ },
+ {
+ field: "document",
+ method: "sendDocument",
+ mimeType: "application/pdf",
+ name: "report.pdf",
+ type: "file",
+ },
+ ] as const)("posts $type attachments with Telegram $method uploads", async ({
+ field,
+ method,
+ mimeType,
+ name,
+ type,
+ }) => {
+ mockFetch
+ .mockResolvedValueOnce(
+ telegramOk({
+ id: 999,
+ is_bot: true,
+ first_name: "Bot",
+ username: "mybot",
+ })
+ )
+ .mockResolvedValueOnce(telegramOk(sampleMessage()));
+
+ const adapter = createTelegramAdapter({
+ botToken: "token",
+ mode: "webhook",
+ logger: mockLogger,
+ userName: "mybot",
+ });
+
+ await adapter.initialize(createMockChat());
+
+ await adapter.postMessage("telegram:-100123:42", {
+ markdown: "attached **media**",
+ attachments: [
+ {
+ data: Buffer.from("payload"),
+ height: type === "video" ? 720 : undefined,
+ mimeType,
+ name,
+ type,
+ width: type === "video" ? 1280 : undefined,
+ },
+ ],
+ });
+
+ expect(String(mockFetch.mock.calls[1]?.[0])).toContain(`/${method}`);
+
+ const formData = readFormData(1);
+ const upload = formData.get(field);
+
+ expect(formData.get("chat_id")).toBe("-100123");
+ expect(formData.get("message_thread_id")).toBe("42");
+ expect(formData.get("caption")).toBe("attached *media*");
+ expect(formData.get("parse_mode")).toBe("MarkdownV2");
+ expect(upload).toBeInstanceOf(Blob);
+ expect((upload as { name?: string }).name).toBe(name);
+ expect((upload as Blob).type).toBe(mimeType);
+
+ if (type === "video") {
+ expect(formData.get("width")).toBe("1280");
+ expect(formData.get("height")).toBe("720");
+ }
+ });
+
+ it("posts attachment data loaded through fetchData", async () => {
+ mockFetch
+ .mockResolvedValueOnce(
+ telegramOk({
+ id: 999,
+ is_bot: true,
+ first_name: "Bot",
+ username: "mybot",
+ })
+ )
+ .mockResolvedValueOnce(telegramOk(sampleMessage()));
+
+ const adapter = createTelegramAdapter({
+ botToken: "token",
+ mode: "webhook",
+ logger: mockLogger,
+ userName: "mybot",
+ });
+ const fetchData = vi.fn().mockResolvedValue(Buffer.from("payload"));
+
+ await adapter.initialize(createMockChat());
+
+ await adapter.postMessage("telegram:123", {
+ raw: "",
+ attachments: [
+ {
+ fetchData,
+ mimeType: "application/pdf",
+ name: "report.pdf",
+ type: "file",
+ },
+ ],
+ });
+
+ const formData = readFormData(1);
+
+ expect(fetchData).toHaveBeenCalledOnce();
+ expect(String(mockFetch.mock.calls[1]?.[0])).toContain("/sendDocument");
+ expect(formData.get("caption")).toBeNull();
+ expect(formData.get("parse_mode")).toBeNull();
+ expect(formData.get("document")).toBeInstanceOf(Blob);
+ });
+
+ it("posts URL-only attachments through Telegram URL fields", async () => {
+ mockFetch
+ .mockResolvedValueOnce(
+ telegramOk({
+ id: 999,
+ is_bot: true,
+ first_name: "Bot",
+ username: "mybot",
+ })
+ )
+ .mockResolvedValueOnce(telegramOk(sampleMessage()));
+
+ const adapter = createTelegramAdapter({
+ botToken: "token",
+ mode: "webhook",
+ logger: mockLogger,
+ userName: "mybot",
+ });
+
+ await adapter.initialize(createMockChat());
+
+ await adapter.postMessage("telegram:-100123:42", {
+ markdown: "public **image**",
+ attachments: [
+ {
+ mimeType: "image/png",
+ name: "image.png",
+ type: "image",
+ url: "https://cdn.example.com/image.png",
+ },
+ ],
});
- expect(posted.threadId).toBe("telegram:123");
+ expect(String(mockFetch.mock.calls[1]?.[0])).toContain("/sendPhoto");
+ expect(mockFetch.mock.calls[1]?.[1]?.headers).toEqual({
+ "Content-Type": "application/json",
+ });
+ expect(JSON.parse(String(mockFetch.mock.calls[1]?.[1]?.body))).toEqual({
+ caption: "public *image*",
+ chat_id: "-100123",
+ message_thread_id: 42,
+ parse_mode: "MarkdownV2",
+ photo: "https://cdn.example.com/image.png",
+ });
+ });
- const sendMessageBody = JSON.parse(
- String((mockFetch.mock.calls[1]?.[1] as RequestInit).body)
- ) as { chat_id: string; text: string };
+ it("rejects multiple Telegram attachments in one message", async () => {
+ mockFetch.mockResolvedValueOnce(
+ telegramOk({
+ id: 999,
+ is_bot: true,
+ first_name: "Bot",
+ username: "mybot",
+ })
+ );
- expect(sendMessageBody.chat_id).toBe("123");
- expect(sendMessageBody.text).toBe("channel message");
+ const adapter = createTelegramAdapter({
+ botToken: "token",
+ mode: "webhook",
+ logger: mockLogger,
+ userName: "mybot",
+ });
+
+ await adapter.initialize(createMockChat());
+
+ await expect(
+ adapter.postMessage("telegram:123", {
+ raw: "attachments",
+ attachments: [
+ { data: Buffer.from("one"), type: "image" },
+ { data: Buffer.from("two"), type: "image" },
+ ],
+ })
+ ).rejects.toThrow("single attachment upload");
+ expect(mockFetch.mock.calls).toHaveLength(1);
});
- it("postChannelMessage works with raw channel ID", async () => {
+ it("rejects mixed file uploads and attachments", async () => {
+ mockFetch.mockResolvedValueOnce(
+ telegramOk({
+ id: 999,
+ is_bot: true,
+ first_name: "Bot",
+ username: "mybot",
+ })
+ );
+
+ const adapter = createTelegramAdapter({
+ botToken: "token",
+ mode: "webhook",
+ logger: mockLogger,
+ userName: "mybot",
+ });
+
+ await adapter.initialize(createMockChat());
+
+ await expect(
+ adapter.postMessage("telegram:123", {
+ raw: "mixed",
+ attachments: [{ data: Buffer.from("one"), type: "image" }],
+ files: [{ data: Buffer.from("two"), filename: "two.txt" }],
+ })
+ ).rejects.toThrow("mixing file uploads and attachments");
+ expect(mockFetch.mock.calls).toHaveLength(1);
+ });
+
+ it("rejects attachments without upload data", async () => {
+ mockFetch.mockResolvedValueOnce(
+ telegramOk({
+ id: 999,
+ is_bot: true,
+ first_name: "Bot",
+ username: "mybot",
+ })
+ );
+
+ const adapter = createTelegramAdapter({
+ botToken: "token",
+ mode: "webhook",
+ logger: mockLogger,
+ userName: "mybot",
+ });
+
+ await adapter.initialize(createMockChat());
+
+ await expect(
+ adapter.postMessage("telegram:123", {
+ raw: "",
+ attachments: [{ type: "image" }],
+ })
+ ).rejects.toThrow("Attachment data or URL required for image");
+ expect(mockFetch.mock.calls).toHaveLength(1);
+ });
+
+ it("sets parse_mode for markdown messages", async () => {
mockFetch
.mockResolvedValueOnce(
telegramOk({
@@ -967,56 +1853,18 @@ describe("TelegramAdapter", () => {
await adapter.initialize(createMockChat());
- const posted = await adapter.postChannelMessage("123", {
- markdown: "raw id message",
+ await adapter.postMessage("telegram:123", {
+ markdown: "**bold** and _italic_",
});
- expect(posted.threadId).toBe("telegram:123");
-
const sendMessageBody = JSON.parse(
String((mockFetch.mock.calls[1]?.[1] as RequestInit).body)
- ) as { chat_id: string; text: string };
+ ) as { parse_mode?: string };
- expect(sendMessageBody.chat_id).toBe("123");
- expect(sendMessageBody.text).toBe("raw id message");
+ expect(sendMessageBody.parse_mode).toBe("MarkdownV2");
});
- it.each([
- {
- field: "photo",
- method: "sendPhoto",
- mimeType: "image/png",
- name: "image.png",
- type: "image",
- },
- {
- field: "audio",
- method: "sendAudio",
- mimeType: "audio/mpeg",
- name: "track.mp3",
- type: "audio",
- },
- {
- field: "video",
- method: "sendVideo",
- mimeType: "video/mp4",
- name: "clip.mp4",
- type: "video",
- },
- {
- field: "document",
- method: "sendDocument",
- mimeType: "application/pdf",
- name: "report.pdf",
- type: "file",
- },
- ] as const)("posts $type attachments with Telegram $method uploads", async ({
- field,
- method,
- mimeType,
- name,
- type,
- }) => {
+ it("sets parse_mode for AST messages", async () => {
mockFetch
.mockResolvedValueOnce(
telegramOk({
@@ -1037,40 +1885,51 @@ describe("TelegramAdapter", () => {
await adapter.initialize(createMockChat());
- await adapter.postMessage("telegram:-100123:42", {
- markdown: "attached **media**",
- attachments: [
- {
- data: Buffer.from("payload"),
- height: type === "video" ? 720 : undefined,
- mimeType,
- name,
- type,
- width: type === "video" ? 1280 : undefined,
- },
- ],
- });
+ const ast = new TelegramFormatConverter().toAst("**hello** world!");
+ await adapter.postMessage("telegram:123", { ast });
- expect(String(mockFetch.mock.calls[1]?.[0])).toContain(`/${method}`);
+ const sendMessageBody = JSON.parse(
+ String((mockFetch.mock.calls[1]?.[1] as RequestInit).body)
+ ) as { parse_mode?: string; text: string };
- const formData = readFormData(1);
- const upload = formData.get(field);
+ // AST messages were shipping without parse_mode, so Telegram rendered
+ // MarkdownV2 asterisks literally. Guard against regression.
+ expect(sendMessageBody.parse_mode).toBe("MarkdownV2");
+ expect(sendMessageBody.text).toContain("*hello*");
+ expect(sendMessageBody.text).toContain("world\\!");
+ });
- expect(formData.get("chat_id")).toBe("-100123");
- expect(formData.get("message_thread_id")).toBe("42");
- expect(formData.get("caption")).toBe("attached *media*");
- expect(formData.get("parse_mode")).toBe("MarkdownV2");
- expect(upload).toBeInstanceOf(Blob);
- expect((upload as { name?: string }).name).toBe(name);
- expect((upload as Blob).type).toBe(mimeType);
+ it("omits parse_mode for plain string messages", async () => {
+ mockFetch
+ .mockResolvedValueOnce(
+ telegramOk({
+ id: 999,
+ is_bot: true,
+ first_name: "Bot",
+ username: "mybot",
+ })
+ )
+ .mockResolvedValueOnce(telegramOk(sampleMessage()));
- if (type === "video") {
- expect(formData.get("width")).toBe("1280");
- expect(formData.get("height")).toBe("720");
- }
+ const adapter = createTelegramAdapter({
+ botToken: "token",
+ mode: "webhook",
+ logger: mockLogger,
+ userName: "mybot",
+ });
+
+ await adapter.initialize(createMockChat());
+
+ await adapter.postMessage("telegram:123", "plain text message");
+
+ const sendMessageBody = JSON.parse(
+ String((mockFetch.mock.calls[1]?.[1] as RequestInit).body)
+ ) as { parse_mode?: string };
+
+ expect(sendMessageBody.parse_mode).toBeUndefined();
});
- it("posts attachment data loaded through fetchData", async () => {
+ it("omits parse_mode for raw messages", async () => {
mockFetch
.mockResolvedValueOnce(
telegramOk({
@@ -1088,32 +1947,20 @@ describe("TelegramAdapter", () => {
logger: mockLogger,
userName: "mybot",
});
- const fetchData = vi.fn().mockResolvedValue(Buffer.from("payload"));
await adapter.initialize(createMockChat());
- await adapter.postMessage("telegram:123", {
- raw: "",
- attachments: [
- {
- fetchData,
- mimeType: "application/pdf",
- name: "report.pdf",
- type: "file",
- },
- ],
- });
+ await adapter.postMessage("telegram:123", { raw: "raw.unparsed!(text)" });
- const formData = readFormData(1);
+ const sendMessageBody = JSON.parse(
+ String((mockFetch.mock.calls[1]?.[1] as RequestInit).body)
+ ) as { parse_mode?: string; text: string };
- expect(fetchData).toHaveBeenCalledOnce();
- expect(String(mockFetch.mock.calls[1]?.[0])).toContain("/sendDocument");
- expect(formData.get("caption")).toBeNull();
- expect(formData.get("parse_mode")).toBeNull();
- expect(formData.get("document")).toBeInstanceOf(Blob);
+ expect(sendMessageBody.parse_mode).toBeUndefined();
+ expect(sendMessageBody.text).toBe("raw.unparsed!(text)");
});
- it("posts URL-only attachments through Telegram URL fields", async () => {
+ it("retries markdown messages without parse_mode when Telegram can't parse entities", async () => {
mockFetch
.mockResolvedValueOnce(
telegramOk({
@@ -1123,7 +1970,20 @@ describe("TelegramAdapter", () => {
username: "mybot",
})
)
- .mockResolvedValueOnce(telegramOk(sampleMessage()));
+ .mockResolvedValueOnce(
+ telegramError(
+ 400,
+ 400,
+ "Bad Request: can't parse entities: Can't find end of the entity"
+ )
+ )
+ .mockResolvedValueOnce(
+ telegramOk(
+ sampleMessage({
+ text: "**broken",
+ })
+ )
+ );
const adapter = createTelegramAdapter({
botToken: "token",
@@ -1134,40 +1994,55 @@ describe("TelegramAdapter", () => {
await adapter.initialize(createMockChat());
- await adapter.postMessage("telegram:-100123:42", {
- markdown: "public **image**",
- attachments: [
- {
- mimeType: "image/png",
- name: "image.png",
- type: "image",
- url: "https://cdn.example.com/image.png",
- },
- ],
+ const result = await adapter.postMessage("telegram:123", {
+ markdown: "**broken",
});
- expect(String(mockFetch.mock.calls[1]?.[0])).toContain("/sendPhoto");
- expect(mockFetch.mock.calls[1]?.[1]?.headers).toEqual({
- "Content-Type": "application/json",
- });
- expect(JSON.parse(String(mockFetch.mock.calls[1]?.[1]?.body))).toEqual({
- caption: "public *image*",
- chat_id: "-100123",
- message_thread_id: 42,
- parse_mode: "MarkdownV2",
- photo: "https://cdn.example.com/image.png",
- });
- });
+ expect(result.id).toBe("123:11");
- it("rejects multiple Telegram attachments in one message", async () => {
- mockFetch.mockResolvedValueOnce(
- telegramOk({
- id: 999,
- is_bot: true,
- first_name: "Bot",
- username: "mybot",
+ const firstSendBody = JSON.parse(
+ String((mockFetch.mock.calls[1]?.[1] as RequestInit).body)
+ ) as { parse_mode?: string; text: string };
+ const secondSendBody = JSON.parse(
+ String((mockFetch.mock.calls[2]?.[1] as RequestInit).body)
+ ) as { parse_mode?: string; text: string };
+
+ expect(firstSendBody.parse_mode).toBe("MarkdownV2");
+ expect(secondSendBody.parse_mode).toBeUndefined();
+ expect(secondSendBody.text).toBe("**broken");
+ expect(mockLogger.warn).toHaveBeenCalledWith(
+ "Telegram markdown parse failed; retrying without parse mode",
+ expect.objectContaining({
+ method: "sendMessage",
+ threadId: "telegram:123",
})
);
+ });
+
+ it("retries markdown messages with original text when plain-text fallback would be empty", async () => {
+ mockFetch
+ .mockResolvedValueOnce(
+ telegramOk({
+ id: 999,
+ is_bot: true,
+ first_name: "Bot",
+ username: "mybot",
+ })
+ )
+ .mockResolvedValueOnce(
+ telegramError(
+ 400,
+ 400,
+ "Bad Request: can't parse entities: Can't find end of the entity"
+ )
+ )
+ .mockResolvedValueOnce(
+ telegramOk(
+ sampleMessage({
+ text: "**",
+ })
+ )
+ );
const adapter = createTelegramAdapter({
botToken: "token",
@@ -1178,56 +2053,33 @@ describe("TelegramAdapter", () => {
await adapter.initialize(createMockChat());
- await expect(
- adapter.postMessage("telegram:123", {
- raw: "attachments",
- attachments: [
- { data: Buffer.from("one"), type: "image" },
- { data: Buffer.from("two"), type: "image" },
- ],
- })
- ).rejects.toThrow("single attachment upload");
- expect(mockFetch.mock.calls).toHaveLength(1);
- });
-
- it("rejects mixed file uploads and attachments", async () => {
- mockFetch.mockResolvedValueOnce(
- telegramOk({
- id: 999,
- is_bot: true,
- first_name: "Bot",
- username: "mybot",
- })
- );
-
- const adapter = createTelegramAdapter({
- botToken: "token",
- mode: "webhook",
- logger: mockLogger,
- userName: "mybot",
+ const result = await adapter.postMessage("telegram:123", {
+ markdown: "**",
});
- await adapter.initialize(createMockChat());
+ expect(result.id).toBe("123:11");
- await expect(
- adapter.postMessage("telegram:123", {
- raw: "mixed",
- attachments: [{ data: Buffer.from("one"), type: "image" }],
- files: [{ data: Buffer.from("two"), filename: "two.txt" }],
- })
- ).rejects.toThrow("mixing file uploads and attachments");
- expect(mockFetch.mock.calls).toHaveLength(1);
+ const secondSendBody = JSON.parse(
+ String((mockFetch.mock.calls[2]?.[1] as RequestInit).body)
+ ) as { parse_mode?: string; text: string };
+
+ expect(secondSendBody.parse_mode).toBeUndefined();
+ expect(secondSendBody.text).toBe("**");
});
- it("rejects attachments without upload data", async () => {
- mockFetch.mockResolvedValueOnce(
- telegramOk({
- id: 999,
- is_bot: true,
- first_name: "Bot",
- username: "mybot",
- })
- );
+ it("does not swallow non-parse validation errors during markdown send", async () => {
+ mockFetch
+ .mockResolvedValueOnce(
+ telegramOk({
+ id: 999,
+ is_bot: true,
+ first_name: "Bot",
+ username: "mybot",
+ })
+ )
+ .mockResolvedValueOnce(
+ telegramError(400, 400, "Bad Request: chat not found")
+ );
const adapter = createTelegramAdapter({
botToken: "token",
@@ -1240,14 +2092,12 @@ describe("TelegramAdapter", () => {
await expect(
adapter.postMessage("telegram:123", {
- raw: "",
- attachments: [{ type: "image" }],
+ markdown: "**broken**",
})
- ).rejects.toThrow("Attachment data or URL required for image");
- expect(mockFetch.mock.calls).toHaveLength(1);
+ ).rejects.toThrow("Bad Request: chat not found");
});
- it("sets parse_mode for markdown messages", async () => {
+ it("retries markdown edits without parse_mode when Telegram can't parse entities", async () => {
mockFetch
.mockResolvedValueOnce(
telegramOk({
@@ -1257,7 +2107,21 @@ describe("TelegramAdapter", () => {
username: "mybot",
})
)
- .mockResolvedValueOnce(telegramOk(sampleMessage()));
+ .mockResolvedValueOnce(telegramOk(sampleMessage()))
+ .mockResolvedValueOnce(
+ telegramError(
+ 400,
+ 400,
+ "Bad Request: can't parse entities: Can't find end of the entity"
+ )
+ )
+ .mockResolvedValueOnce(
+ telegramOk(
+ sampleMessage({
+ text: "**broken",
+ })
+ )
+ );
const adapter = createTelegramAdapter({
botToken: "token",
@@ -1268,18 +2132,34 @@ describe("TelegramAdapter", () => {
await adapter.initialize(createMockChat());
- await adapter.postMessage("telegram:123", {
- markdown: "**bold** and _italic_",
+ const posted = await adapter.postMessage("telegram:123", "hello");
+ const result = await adapter.editMessage("telegram:123", posted.id, {
+ markdown: "**broken",
});
- const sendMessageBody = JSON.parse(
- String((mockFetch.mock.calls[1]?.[1] as RequestInit).body)
- ) as { parse_mode?: string };
+ expect(result.id).toBe("123:11");
- expect(sendMessageBody.parse_mode).toBe("MarkdownV2");
+ const firstEditBody = JSON.parse(
+ String((mockFetch.mock.calls[2]?.[1] as RequestInit).body)
+ ) as { parse_mode?: string; text: string };
+ const secondEditBody = JSON.parse(
+ String((mockFetch.mock.calls[3]?.[1] as RequestInit).body)
+ ) as { parse_mode?: string; text: string };
+
+ expect(firstEditBody.parse_mode).toBe("MarkdownV2");
+ expect(secondEditBody.parse_mode).toBeUndefined();
+ expect(secondEditBody.text).toBe("**broken");
+ expect(mockLogger.warn).toHaveBeenCalledWith(
+ "Telegram markdown parse failed; retrying without parse mode",
+ expect.objectContaining({
+ messageId: posted.id,
+ method: "editMessageText",
+ threadId: "telegram:123",
+ })
+ );
});
- it("sets parse_mode for AST messages", async () => {
+ it("retries markdown edits with original text when plain-text fallback would be empty", async () => {
mockFetch
.mockResolvedValueOnce(
telegramOk({
@@ -1289,7 +2169,21 @@ describe("TelegramAdapter", () => {
username: "mybot",
})
)
- .mockResolvedValueOnce(telegramOk(sampleMessage()));
+ .mockResolvedValueOnce(telegramOk(sampleMessage()))
+ .mockResolvedValueOnce(
+ telegramError(
+ 400,
+ 400,
+ "Bad Request: can't parse entities: Can't find end of the entity"
+ )
+ )
+ .mockResolvedValueOnce(
+ telegramOk(
+ sampleMessage({
+ text: "**",
+ })
+ )
+ );
const adapter = createTelegramAdapter({
botToken: "token",
@@ -1300,21 +2194,22 @@ describe("TelegramAdapter", () => {
await adapter.initialize(createMockChat());
- const ast = new TelegramFormatConverter().toAst("**hello** world!");
- await adapter.postMessage("telegram:123", { ast });
+ const posted = await adapter.postMessage("telegram:123", "hello");
+ const result = await adapter.editMessage("telegram:123", posted.id, {
+ markdown: "**",
+ });
- const sendMessageBody = JSON.parse(
- String((mockFetch.mock.calls[1]?.[1] as RequestInit).body)
+ expect(result.id).toBe("123:11");
+
+ const secondEditBody = JSON.parse(
+ String((mockFetch.mock.calls[3]?.[1] as RequestInit).body)
) as { parse_mode?: string; text: string };
- // AST messages were shipping without parse_mode, so Telegram rendered
- // MarkdownV2 asterisks literally. Guard against regression.
- expect(sendMessageBody.parse_mode).toBe("MarkdownV2");
- expect(sendMessageBody.text).toContain("*hello*");
- expect(sendMessageBody.text).toContain("world\\!");
+ expect(secondEditBody.parse_mode).toBeUndefined();
+ expect(secondEditBody.text).toBe("**");
});
- it("omits parse_mode for plain string messages", async () => {
+ it("falls back to plain-text draft and final send when Telegram can't parse streamed markdown", async () => {
mockFetch
.mockResolvedValueOnce(
telegramOk({
@@ -1324,7 +2219,21 @@ describe("TelegramAdapter", () => {
username: "mybot",
})
)
- .mockResolvedValueOnce(telegramOk(sampleMessage()));
+ .mockResolvedValueOnce(
+ telegramError(
+ 400,
+ 400,
+ "Bad Request: can't parse entities: Can't find end of the entity"
+ )
+ )
+ .mockResolvedValueOnce(telegramOk(true))
+ .mockResolvedValueOnce(
+ telegramOk(
+ sampleMessage({
+ text: "**broken",
+ })
+ )
+ );
const adapter = createTelegramAdapter({
botToken: "token",
@@ -1335,16 +2244,30 @@ describe("TelegramAdapter", () => {
await adapter.initialize(createMockChat());
- await adapter.postMessage("telegram:123", "plain text message");
+ async function* textStream(): AsyncIterable {
+ yield "**broken";
+ }
- const sendMessageBody = JSON.parse(
- String((mockFetch.mock.calls[1]?.[1] as RequestInit).body)
- ) as { parse_mode?: string };
+ const result = await adapter.stream("telegram:123", textStream(), {
+ updateIntervalMs: 0,
+ });
- expect(sendMessageBody.parse_mode).toBeUndefined();
+ expect(result?.id).toBe("123:11");
+
+ const retryDraftBody = JSON.parse(
+ String((mockFetch.mock.calls[2]?.[1] as RequestInit).body)
+ ) as { parse_mode?: string; text: string };
+ const finalSendBody = JSON.parse(
+ String((mockFetch.mock.calls[3]?.[1] as RequestInit).body)
+ ) as { parse_mode?: string; text: string };
+
+ expect(retryDraftBody.parse_mode).toBeUndefined();
+ expect(retryDraftBody.text).toBe("**broken");
+ expect(finalSendBody.parse_mode).toBeUndefined();
+ expect(finalSendBody.text).toBe("**broken");
});
- it("omits parse_mode for raw messages", async () => {
+ it("reuses original text when streamed plain-text fallback would be empty", async () => {
mockFetch
.mockResolvedValueOnce(
telegramOk({
@@ -1354,7 +2277,21 @@ describe("TelegramAdapter", () => {
username: "mybot",
})
)
- .mockResolvedValueOnce(telegramOk(sampleMessage()));
+ .mockResolvedValueOnce(
+ telegramError(
+ 400,
+ 400,
+ "Bad Request: can't parse entities: Can't find end of the entity"
+ )
+ )
+ .mockResolvedValueOnce(telegramOk(true))
+ .mockResolvedValueOnce(
+ telegramOk(
+ sampleMessage({
+ text: "**",
+ })
+ )
+ );
const adapter = createTelegramAdapter({
botToken: "token",
@@ -1365,14 +2302,27 @@ describe("TelegramAdapter", () => {
await adapter.initialize(createMockChat());
- await adapter.postMessage("telegram:123", { raw: "raw.unparsed!(text)" });
+ async function* textStream(): AsyncIterable {
+ yield "**";
+ }
- const sendMessageBody = JSON.parse(
- String((mockFetch.mock.calls[1]?.[1] as RequestInit).body)
+ const result = await adapter.stream("telegram:123", textStream(), {
+ updateIntervalMs: 0,
+ });
+
+ expect(result?.id).toBe("123:11");
+
+ const retryDraftBody = JSON.parse(
+ String((mockFetch.mock.calls[2]?.[1] as RequestInit).body)
+ ) as { parse_mode?: string; text: string };
+ const finalSendBody = JSON.parse(
+ String((mockFetch.mock.calls[3]?.[1] as RequestInit).body)
) as { parse_mode?: string; text: string };
- expect(sendMessageBody.parse_mode).toBeUndefined();
- expect(sendMessageBody.text).toBe("raw.unparsed!(text)");
+ expect(retryDraftBody.parse_mode).toBeUndefined();
+ expect(retryDraftBody.text).toBe("**");
+ expect(finalSendBody.parse_mode).toBeUndefined();
+ expect(finalSendBody.text).toBe("**");
});
it("posts cards with inline keyboard buttons", async () => {
diff --git a/packages/adapter-telegram/src/index.ts b/packages/adapter-telegram/src/index.ts
index effac2c4..c88523c9 100644
--- a/packages/adapter-telegram/src/index.ts
+++ b/packages/adapter-telegram/src/index.ts
@@ -23,6 +23,9 @@ import type {
FormattedContent,
Logger,
RawMessage,
+ StreamChunk,
+ StreamOptions,
+ StreamResult,
ThreadInfo,
UserInfo,
WebhookOptions,
@@ -33,7 +36,10 @@ import {
defaultEmojiResolver,
getEmoji,
Message,
+ markdownToPlainText,
NotImplementedError,
+ StreamingMarkdownRenderer,
+ toPlainText,
} from "chat";
import {
cardToTelegramInlineKeyboard,
@@ -94,6 +100,12 @@ const EMOJI_NAME_PATTERN = /^[a-z0-9_+-]+$/i;
const TELEGRAM_DEFAULT_POLLING_TIMEOUT_SECONDS = 30;
const TELEGRAM_DEFAULT_POLLING_LIMIT = 100;
const TELEGRAM_DEFAULT_POLLING_RETRY_DELAY_MS = 1000;
+const TELEGRAM_DEFAULT_STREAM_UPDATE_INTERVAL_MS = 250;
+// Keep streaming segments below Telegram's 4096-character hard limit to leave
+// room for Markdown parsing and avoid truncating the final sent message.
+const TELEGRAM_STREAM_SEGMENT_LIMIT = 3500;
+const TELEGRAM_MARKDOWN_PARSE_ERROR_PATTERN =
+ /can't parse (?:caption )?entities/i;
const TELEGRAM_MAX_POLLING_LIMIT = 100;
const TELEGRAM_MIN_POLLING_LIMIT = 1;
const TELEGRAM_MIN_POLLING_TIMEOUT_SECONDS = 0;
@@ -233,6 +245,7 @@ export class TelegramAdapter
private pollingAbortController: AbortController | null = null;
private pollingTask: Promise | null = null;
private pollingActive = false;
+ private nextDraftId = Math.max(1, Date.now() % 2_147_483_647);
get botUserId(): string | undefined {
return this._botUserId;
@@ -706,6 +719,14 @@ export class TelegramAdapter
const card = extractCard(message);
const replyMarkup = card ? cardToTelegramInlineKeyboard(card) : undefined;
const parseMode = this.resolveParseMode(message, card);
+ const plainText = truncateForTelegram(
+ convertEmojiPlaceholders(
+ this.renderPlainTextMessage(message, card),
+ "gchat"
+ ),
+ TELEGRAM_MESSAGE_LIMIT,
+ "plain"
+ );
const text = truncateForTelegram(
convertEmojiPlaceholders(
card
@@ -753,6 +774,7 @@ export class TelegramAdapter
parsedThread,
file,
text,
+ plainText,
replyMarkup,
parseMode
);
@@ -768,6 +790,7 @@ export class TelegramAdapter
parsedThread,
attachment,
text,
+ plainText,
replyMarkup,
parseMode
);
@@ -776,13 +799,23 @@ export class TelegramAdapter
throw new ValidationError("telegram", "Message text cannot be empty");
}
- rawMessage = await this.telegramFetch("sendMessage", {
- chat_id: parsedThread.chatId,
- message_thread_id: parsedThread.messageThreadId,
- text,
- reply_markup: replyMarkup,
- parse_mode: toBotApiParseMode(parseMode),
- });
+ rawMessage = await this.withTelegramMarkdownFallback(
+ parseMode,
+ (resolvedParseMode, resolvedText) =>
+ this.telegramFetch("sendMessage", {
+ chat_id: parsedThread.chatId,
+ message_thread_id: parsedThread.messageThreadId,
+ text: resolvedText,
+ reply_markup: replyMarkup,
+ parse_mode: toBotApiParseMode(resolvedParseMode),
+ }),
+ {
+ initialText: text,
+ fallbackText: plainText,
+ method: "sendMessage",
+ threadId,
+ }
+ );
}
const resultingThreadId = this.encodeThreadId({
@@ -826,6 +859,14 @@ export class TelegramAdapter
const card = extractCard(message);
const replyMarkup = card ? cardToTelegramInlineKeyboard(card) : undefined;
const parseMode = this.resolveParseMode(message, card);
+ const plainText = truncateForTelegram(
+ convertEmojiPlaceholders(
+ this.renderPlainTextMessage(message, card),
+ "gchat"
+ ),
+ TELEGRAM_MESSAGE_LIMIT,
+ "plain"
+ );
const text = truncateForTelegram(
convertEmojiPlaceholders(
card
@@ -843,14 +884,22 @@ export class TelegramAdapter
throw new ValidationError("telegram", "Message text cannot be empty");
}
- const result = await this.telegramFetch(
- "editMessageText",
+ const result = await this.withTelegramMarkdownFallback(
+ parseMode,
+ (resolvedParseMode, resolvedText) =>
+ this.telegramFetch("editMessageText", {
+ chat_id: chatId,
+ message_id: telegramMessageId,
+ text: resolvedText,
+ reply_markup: replyMarkup ?? emptyTelegramInlineKeyboard(),
+ parse_mode: toBotApiParseMode(resolvedParseMode),
+ }),
{
- chat_id: chatId,
- message_id: telegramMessageId,
- text,
- reply_markup: replyMarkup ?? emptyTelegramInlineKeyboard(),
- parse_mode: toBotApiParseMode(parseMode),
+ initialText: text,
+ fallbackText: plainText,
+ messageId,
+ method: "editMessageText",
+ threadId,
}
);
@@ -955,6 +1004,293 @@ export class TelegramAdapter
});
}
+ async stream(
+ threadId: string,
+ textStream: AsyncIterable,
+ options?: StreamOptions
+ ): Promise<
+ RawMessage | StreamResult | null
+ > {
+ if (!this.isDM(threadId)) {
+ return null;
+ }
+
+ const parsedThread = this.resolveThreadId(threadId);
+ const updateIntervalMs = this.clampInteger(
+ options?.updateIntervalMs,
+ TELEGRAM_DEFAULT_STREAM_UPDATE_INTERVAL_MS,
+ 0,
+ Number.MAX_SAFE_INTEGER
+ );
+
+ let renderer = new StreamingMarkdownRenderer();
+ let segmentText = "";
+ let draftId = this.createDraftId();
+ let lastDraftText = "";
+ let lastFlushAt = 0;
+ let draftStreamingEnabled = true;
+ let streamUsesMarkdown = true;
+ let segmentUsesMarkdown = true;
+ const postedSegments: StreamResult["messages"] = [];
+
+ const renderMarkdownForTelegram = (text: string): string =>
+ convertEmojiPlaceholders(
+ this.formatConverter.fromMarkdown(text),
+ "gchat"
+ );
+
+ const renderMarkdownText = (text: string): string =>
+ truncateForTelegram(
+ renderMarkdownForTelegram(text),
+ TELEGRAM_MESSAGE_LIMIT,
+ "MarkdownV2"
+ );
+
+ const renderPlainText = (text: string): string =>
+ truncateForTelegram(
+ this.resolveTelegramFallbackText(text, markdownToPlainText(text)),
+ TELEGRAM_MESSAGE_LIMIT,
+ "plain"
+ );
+
+ const isMarkdownSegmentWithinLimit = (text: string): boolean => {
+ const rendered = renderMarkdownForTelegram(text);
+ return (
+ rendered.trim().length > 0 &&
+ truncateForTelegram(rendered, TELEGRAM_MESSAGE_LIMIT, "MarkdownV2") ===
+ rendered
+ );
+ };
+
+ const getCommittedPrefixFor = (text: string): string => {
+ const candidateRenderer = new StreamingMarkdownRenderer();
+ candidateRenderer.push(text);
+ return candidateRenderer.getCommittedMarkdownPrefix();
+ };
+
+ const findMarkdownSegmentPrefixWithinLimit = (text: string): string => {
+ if (!text.trim()) {
+ return "";
+ }
+
+ const renderedLength = renderMarkdownForTelegram(text).length;
+ let candidateLength = text.length;
+
+ if (renderedLength > TELEGRAM_MESSAGE_LIMIT) {
+ candidateLength = Math.max(
+ 1,
+ Math.min(
+ text.length - 1,
+ Math.floor((text.length * TELEGRAM_MESSAGE_LIMIT) / renderedLength)
+ )
+ );
+ }
+
+ while (candidateLength > 0) {
+ const candidate = getCommittedPrefixFor(text.slice(0, candidateLength));
+ if (candidate.trim() && isMarkdownSegmentWithinLimit(candidate)) {
+ return candidate;
+ }
+
+ const nextLength = Math.floor(candidateLength * 0.9);
+ candidateLength =
+ nextLength < candidateLength ? nextLength : candidateLength - 1;
+ }
+
+ return "";
+ };
+
+ const resetSegment = (nextText = ""): void => {
+ renderer = new StreamingMarkdownRenderer();
+ segmentText = "";
+ draftId = this.createDraftId();
+ lastDraftText = "";
+ lastFlushAt = 0;
+ segmentUsesMarkdown = streamUsesMarkdown;
+
+ if (nextText) {
+ renderer.push(nextText);
+ segmentText = nextText;
+ }
+ };
+
+ const postSegment = async (
+ text: string,
+ useMarkdown: boolean
+ ): Promise => {
+ if (!text.trim()) {
+ return;
+ }
+
+ const postable: AdapterPostableMessage =
+ useMarkdown && isMarkdownSegmentWithinLimit(text)
+ ? { markdown: text }
+ : this.resolveTelegramFallbackText(text, markdownToPlainText(text));
+ const message = await this.postMessage(threadId, postable);
+
+ postedSegments.push({
+ message,
+ postable,
+ });
+ };
+
+ const flushDraft = async (sourceText = segmentText): Promise => {
+ if (!draftStreamingEnabled) {
+ return;
+ }
+
+ const draftText = segmentUsesMarkdown
+ ? renderMarkdownText(
+ sourceText === segmentText ? renderer.render() : sourceText
+ )
+ : renderPlainText(sourceText);
+ if (!draftText.trim() || draftText === lastDraftText) {
+ return;
+ }
+
+ try {
+ if (segmentUsesMarkdown) {
+ await this.telegramFetch("sendMessageDraft", {
+ chat_id: parsedThread.chatId,
+ message_thread_id: parsedThread.messageThreadId,
+ draft_id: draftId,
+ text: draftText,
+ parse_mode: toBotApiParseMode("MarkdownV2"),
+ });
+ } else {
+ await this.telegramFetch("sendMessageDraft", {
+ chat_id: parsedThread.chatId,
+ message_thread_id: parsedThread.messageThreadId,
+ draft_id: draftId,
+ text: draftText,
+ });
+ }
+ lastDraftText = draftText;
+ lastFlushAt = Date.now();
+ } catch (error) {
+ if (segmentUsesMarkdown && this.isTelegramMarkdownParseError(error)) {
+ streamUsesMarkdown = false;
+ segmentUsesMarkdown = false;
+
+ const plainDraftText = renderPlainText(sourceText);
+ if (!plainDraftText.trim()) {
+ draftStreamingEnabled = false;
+ return;
+ }
+
+ try {
+ await this.telegramFetch("sendMessageDraft", {
+ chat_id: parsedThread.chatId,
+ message_thread_id: parsedThread.messageThreadId,
+ draft_id: draftId,
+ text: plainDraftText,
+ });
+ lastDraftText = plainDraftText;
+ lastFlushAt = Date.now();
+ } catch (retryError) {
+ draftStreamingEnabled = false;
+ this.logger.warn("Telegram draft streaming update failed", {
+ error: String(retryError),
+ threadId,
+ });
+ }
+ return;
+ }
+
+ draftStreamingEnabled = false;
+ this.logger.warn("Telegram draft streaming update failed", {
+ error: String(error),
+ threadId,
+ });
+ }
+ };
+
+ const appendText = async (text: string): Promise => {
+ let remaining = text;
+
+ while (remaining.length > 0) {
+ const available =
+ TELEGRAM_STREAM_SEGMENT_LIMIT - segmentText.length || 0;
+ const nextSlice = available > 0 ? remaining.slice(0, available) : "";
+
+ if (!nextSlice) {
+ await flushDraft();
+ await postSegment(segmentText, segmentUsesMarkdown);
+ resetSegment();
+ continue;
+ }
+
+ renderer.push(nextSlice);
+ segmentText += nextSlice;
+ remaining = remaining.slice(nextSlice.length);
+
+ if (Date.now() - lastFlushAt >= updateIntervalMs) {
+ await flushDraft();
+ }
+
+ const renderedOverflow =
+ segmentUsesMarkdown &&
+ segmentText.length > Math.floor(TELEGRAM_MESSAGE_LIMIT / 2) &&
+ !isMarkdownSegmentWithinLimit(segmentText);
+
+ if (
+ segmentText.length >= TELEGRAM_STREAM_SEGMENT_LIMIT ||
+ renderedOverflow
+ ) {
+ const committedPrefix = segmentUsesMarkdown
+ ? renderer.getCommittedMarkdownPrefix()
+ : "";
+ const markdownPrefix =
+ segmentUsesMarkdown && isMarkdownSegmentWithinLimit(committedPrefix)
+ ? committedPrefix
+ : findMarkdownSegmentPrefixWithinLimit(committedPrefix);
+
+ if (segmentUsesMarkdown && markdownPrefix.trim()) {
+ const overflow = segmentText.slice(markdownPrefix.length);
+ await flushDraft(markdownPrefix);
+ await postSegment(markdownPrefix, true);
+ resetSegment(overflow);
+ continue;
+ }
+
+ if (segmentUsesMarkdown) {
+ streamUsesMarkdown = false;
+ segmentUsesMarkdown = false;
+ }
+
+ await flushDraft();
+ await postSegment(segmentText, segmentUsesMarkdown);
+ resetSegment();
+ }
+ }
+ };
+
+ for await (const chunk of textStream) {
+ if (typeof chunk === "string") {
+ await appendText(chunk);
+ } else if (chunk.type === "markdown_text") {
+ await appendText(chunk.text);
+ }
+ }
+
+ await flushDraft();
+
+ if (segmentText.trim()) {
+ await postSegment(segmentText, segmentUsesMarkdown);
+ }
+
+ if (postedSegments.length === 0) {
+ throw new ValidationError(
+ "telegram",
+ "Telegram streaming requires text content"
+ );
+ }
+
+ return postedSegments.length === 1
+ ? postedSegments[0].message
+ : { messages: postedSegments };
+ }
+
async fetchMessages(
threadId: string,
options: FetchOptions = {}
@@ -1308,11 +1644,46 @@ export class TelegramAdapter
mimeType?: string;
},
text: string,
+ plainText: string,
replyMarkup?: TelegramInlineKeyboardMarkup,
parseMode: TelegramParseMode = "plain"
): Promise {
const buffer = await this.toTelegramBuffer(file.data);
+ return this.withTelegramMarkdownFallback(
+ parseMode,
+ (resolvedParseMode, resolvedText) =>
+ this.telegramFetch(
+ "sendDocument",
+ this.createTelegramDocumentFormData(
+ thread,
+ file,
+ buffer,
+ resolvedText,
+ replyMarkup,
+ resolvedParseMode
+ )
+ ),
+ {
+ initialText: text,
+ fallbackText: plainText,
+ method: "sendDocument",
+ threadId: this.encodeThreadId(thread),
+ }
+ );
+ }
+
+ private createTelegramDocumentFormData(
+ thread: TelegramThreadId,
+ file: {
+ filename: string;
+ mimeType?: string;
+ },
+ buffer: Buffer,
+ text: string,
+ replyMarkup?: TelegramInlineKeyboardMarkup,
+ parseMode: TelegramParseMode = "plain"
+ ): FormData {
const formData = new FormData();
formData.append("chat_id", thread.chatId);
if (typeof thread.messageThreadId === "number") {
@@ -1338,13 +1709,14 @@ export class TelegramAdapter
formData.append("reply_markup", JSON.stringify(replyMarkup));
}
- return this.telegramFetch("sendDocument", formData);
+ return formData;
}
protected async sendAttachment(
thread: TelegramThreadId,
attachment: Attachment,
text: string,
+ plainText: string,
replyMarkup?: TelegramInlineKeyboardMarkup,
parseMode: TelegramParseMode = "plain"
): Promise {
@@ -1360,81 +1732,97 @@ export class TelegramAdapter
);
}
- if (!data) {
- const payload: Record = {
- chat_id: thread.chatId,
- [upload.field]: attachment.url,
- };
+ const buffer = data ? await this.toTelegramBuffer(data) : undefined;
- if (typeof thread.messageThreadId === "number") {
- payload.message_thread_id = thread.messageThreadId;
- }
+ return this.withTelegramMarkdownFallback(
+ parseMode,
+ (resolvedParseMode, resolvedText) => {
+ if (!buffer) {
+ const payload: Record = {
+ chat_id: thread.chatId,
+ [upload.field]: attachment.url,
+ };
- if (text.trim()) {
- payload.caption = truncateForTelegram(
- text,
- TELEGRAM_CAPTION_LIMIT,
- parseMode
- );
- const botApiParseMode = toBotApiParseMode(parseMode);
- if (botApiParseMode) {
- payload.parse_mode = botApiParseMode;
- }
- }
+ if (typeof thread.messageThreadId === "number") {
+ payload.message_thread_id = thread.messageThreadId;
+ }
- if (attachment.type === "video") {
- if (Number.isInteger(attachment.width)) {
- payload.width = attachment.width;
- }
- if (Number.isInteger(attachment.height)) {
- payload.height = attachment.height;
- }
- }
+ if (resolvedText.trim()) {
+ payload.caption = truncateForTelegram(
+ resolvedText,
+ TELEGRAM_CAPTION_LIMIT,
+ resolvedParseMode
+ );
+ const botApiParseMode = toBotApiParseMode(resolvedParseMode);
+ if (botApiParseMode) {
+ payload.parse_mode = botApiParseMode;
+ }
+ }
- if (replyMarkup) {
- payload.reply_markup = replyMarkup;
- }
+ if (attachment.type === "video") {
+ if (Number.isInteger(attachment.width)) {
+ payload.width = attachment.width;
+ }
+ if (Number.isInteger(attachment.height)) {
+ payload.height = attachment.height;
+ }
+ }
- return this.telegramFetch(upload.method, payload);
- }
+ if (replyMarkup) {
+ payload.reply_markup = replyMarkup;
+ }
- const buffer = await this.toTelegramBuffer(data);
- const formData = new FormData();
+ return this.telegramFetch(upload.method, payload);
+ }
- formData.append("chat_id", thread.chatId);
- if (typeof thread.messageThreadId === "number") {
- formData.append("message_thread_id", String(thread.messageThreadId));
- }
+ const formData = new FormData();
- if (text.trim()) {
- formData.append(
- "caption",
- truncateForTelegram(text, TELEGRAM_CAPTION_LIMIT, parseMode)
- );
- const botApiParseMode = toBotApiParseMode(parseMode);
- if (botApiParseMode) {
- formData.append("parse_mode", botApiParseMode);
- }
- }
+ formData.append("chat_id", thread.chatId);
+ if (typeof thread.messageThreadId === "number") {
+ formData.append("message_thread_id", String(thread.messageThreadId));
+ }
- if (attachment.type === "video") {
- if (Number.isInteger(attachment.width)) {
- formData.append("width", String(attachment.width));
- }
- if (Number.isInteger(attachment.height)) {
- formData.append("height", String(attachment.height));
- }
- }
+ if (resolvedText.trim()) {
+ formData.append(
+ "caption",
+ truncateForTelegram(
+ resolvedText,
+ TELEGRAM_CAPTION_LIMIT,
+ resolvedParseMode
+ )
+ );
+ const botApiParseMode = toBotApiParseMode(resolvedParseMode);
+ if (botApiParseMode) {
+ formData.append("parse_mode", botApiParseMode);
+ }
+ }
- const blob = new Blob([new Uint8Array(buffer)], {
- type: attachment.mimeType ?? "application/octet-stream",
- });
- formData.append(upload.field, blob, attachment.name ?? "attachment");
- if (replyMarkup) {
- formData.append("reply_markup", JSON.stringify(replyMarkup));
- }
+ if (attachment.type === "video") {
+ if (Number.isInteger(attachment.width)) {
+ formData.append("width", String(attachment.width));
+ }
+ if (Number.isInteger(attachment.height)) {
+ formData.append("height", String(attachment.height));
+ }
+ }
- return this.telegramFetch(upload.method, formData);
+ const blob = new Blob([new Uint8Array(buffer)], {
+ type: attachment.mimeType ?? "application/octet-stream",
+ });
+ formData.append(upload.field, blob, attachment.name ?? "attachment");
+ if (replyMarkup) {
+ formData.append("reply_markup", JSON.stringify(replyMarkup));
+ }
+
+ return this.telegramFetch(upload.method, formData);
+ },
+ {
+ initialText: text,
+ fallbackText: plainText,
+ method: upload.method,
+ threadId: this.encodeThreadId(thread),
+ }
+ );
}
protected async toTelegramBuffer(
@@ -1550,6 +1938,12 @@ export class TelegramAdapter
return match ? Number.parseInt(match[1], 10) : 0;
}
+ protected createDraftId(): number {
+ this.nextDraftId =
+ this.nextDraftId >= 2_147_483_647 ? 1 : this.nextDraftId + 1;
+ return this.nextDraftId;
+ }
+
protected resolveThreadId(value: string): TelegramThreadId {
if (value.startsWith("telegram:")) {
return this.decodeThreadId(value);
@@ -1724,6 +2118,38 @@ export class TelegramAdapter
return "MarkdownV2";
}
+ protected renderPlainTextMessage(
+ message: AdapterPostableMessage,
+ card: ReturnType
+ ): string {
+ if (card) {
+ return cardToFallbackText(card);
+ }
+ if (typeof message === "string") {
+ return message;
+ }
+ if ("raw" in message) {
+ return message.raw;
+ }
+ if ("markdown" in message) {
+ return this.resolveTelegramFallbackText(
+ message.markdown,
+ markdownToPlainText(message.markdown)
+ );
+ }
+ if ("ast" in message) {
+ return toPlainText(message.ast);
+ }
+ return this.formatConverter.renderPostable(message);
+ }
+
+ protected resolveTelegramFallbackText(
+ originalText: string,
+ fallbackText: string
+ ): string {
+ return fallbackText.trim() ? fallbackText : originalText;
+ }
+
protected toTelegramReaction(
emoji: EmojiValue | string
): TelegramReactionType {
@@ -1999,6 +2425,53 @@ export class TelegramAdapter
`${description} (status ${status}, error ${errorCode})`
);
}
+
+ protected async withTelegramMarkdownFallback(
+ parseMode: TelegramParseMode,
+ operation: (parseMode: TelegramParseMode, text: string) => Promise,
+ context: {
+ initialText: string;
+ fallbackText: string;
+ method: string;
+ messageId?: string;
+ threadId?: string;
+ }
+ ): Promise {
+ try {
+ return await operation(parseMode, context.initialText);
+ } catch (error) {
+ if (
+ parseMode !== "MarkdownV2" ||
+ !this.isTelegramMarkdownParseError(error)
+ ) {
+ throw error;
+ }
+
+ this.logger.warn(
+ "Telegram markdown parse failed; retrying without parse mode",
+ {
+ error: String(error),
+ ...context,
+ }
+ );
+
+ return operation(
+ "plain",
+ this.resolveTelegramFallbackText(
+ context.initialText,
+ context.fallbackText
+ )
+ );
+ }
+ }
+
+ protected isTelegramMarkdownParseError(error: unknown): boolean {
+ return (
+ error instanceof ValidationError &&
+ error.adapter === "telegram" &&
+ TELEGRAM_MARKDOWN_PARSE_ERROR_PATTERN.test(error.message)
+ );
+ }
}
export function createTelegramAdapter(
diff --git a/packages/adapter-telegram/vitest.config.ts b/packages/adapter-telegram/vitest.config.ts
index edc2d946..f9fe2900 100644
--- a/packages/adapter-telegram/vitest.config.ts
+++ b/packages/adapter-telegram/vitest.config.ts
@@ -1,6 +1,12 @@
+import { resolve } from "node:path";
import { defineProject } from "vitest/config";
export default defineProject({
+ resolve: {
+ alias: {
+ chat: resolve(import.meta.dirname, "../chat/src/index.ts"),
+ },
+ },
test: {
globals: true,
environment: "node",
diff --git a/packages/chat/src/index.ts b/packages/chat/src/index.ts
index 14a7c7f7..6f477eae 100644
--- a/packages/chat/src/index.ts
+++ b/packages/chat/src/index.ts
@@ -398,6 +398,8 @@ export type {
StreamChunk,
StreamEvent,
StreamOptions,
+ StreamResult,
+ StreamSegment,
SubscribedMessageHandler,
TaskUpdateChunk,
Thread,
diff --git a/packages/chat/src/streaming-markdown.test.ts b/packages/chat/src/streaming-markdown.test.ts
index cddced85..b42a50c7 100644
--- a/packages/chat/src/streaming-markdown.test.ts
+++ b/packages/chat/src/streaming-markdown.test.ts
@@ -139,6 +139,67 @@ describe("StreamingMarkdownRenderer", () => {
expect(r.render()).toBe("Hello world");
});
+ it("returns the clean committed prefix before an unclosed inline marker", () => {
+ const r = new StreamingMarkdownRenderer();
+ r.push("Hello **wor");
+
+ expect(r.getCommittedMarkdownPrefix()).toBe("Hello ");
+ });
+
+ it("trims dangling trailing markers with no content", () => {
+ const r = new StreamingMarkdownRenderer();
+ r.push("Hello **");
+
+ expect(r.getCommittedMarkdownPrefix()).toBe("Hello ");
+ });
+
+ it("preserves valid closing bold markers in the committed prefix", () => {
+ const r = new StreamingMarkdownRenderer();
+ r.push("Hello **world**");
+
+ expect(r.getCommittedMarkdownPrefix()).toBe("Hello **world**");
+ });
+
+ it("preserves valid closing code markers in the committed prefix", () => {
+ const r = new StreamingMarkdownRenderer();
+ r.push("Use `code` and then:\n```ts\nconst x = 1;\n```\n");
+
+ expect(r.getCommittedMarkdownPrefix()).toBe(
+ "Use `code` and then:\n```ts\nconst x = 1;\n```\n"
+ );
+ });
+
+ it("preserves the full committed prefix for an exact-limit clean markdown segment", () => {
+ const r = new StreamingMarkdownRenderer();
+ const text = `${"a".repeat(3494)}**ok**`;
+ r.push(text);
+
+ expect(r.getCommittedMarkdownPrefix()).toBe(text);
+ });
+
+ it("preserves an escaped trailing bracket in the committed prefix", () => {
+ const r = new StreamingMarkdownRenderer();
+ r.push("Telegram literal \\[");
+
+ expect(r.getCommittedMarkdownPrefix()).toBe("Telegram literal \\[");
+ });
+
+ it("returns the committed prefix before an open code fence", () => {
+ const r = new StreamingMarkdownRenderer();
+ r.push("Intro\n```ts\nconst x = 1;");
+
+ expect(r.getCommittedMarkdownPrefix()).toBe("Intro\n");
+ });
+
+ it("returns the prefix before the last unmatched code fence", () => {
+ const r = new StreamingMarkdownRenderer();
+ r.push("Intro\n```ts\nconst a = 1;\n```\nBetween\n```js\nconst b = 2;");
+
+ expect(r.getCommittedMarkdownPrefix()).toBe(
+ "Intro\n```ts\nconst a = 1;\n```\nBetween\n"
+ );
+ });
+
it("should handle table header without trailing newline (incomplete line)", () => {
const r = new StreamingMarkdownRenderer();
r.push("Text\n\n| A | B |");
diff --git a/packages/chat/src/streaming-markdown.ts b/packages/chat/src/streaming-markdown.ts
index 3f6633ac..d1b9415d 100644
--- a/packages/chat/src/streaming-markdown.ts
+++ b/packages/chat/src/streaming-markdown.ts
@@ -145,6 +145,27 @@ export class StreamingMarkdownRenderer {
return findCleanPrefix(wrapped);
}
+ /**
+ * Get the longest source prefix that can be finalized as a standalone,
+ * markdown-safe message.
+ *
+ * Unlike `render()`, this never synthesizes missing closing markers. The
+ * returned string is always a prefix of the original accumulated source,
+ * which makes it safe to split a long stream into multiple persisted
+ * messages without duplicating or dropping source text.
+ */
+ getCommittedMarkdownPrefix(): string {
+ let committable = this.accumulated;
+
+ if (!this.finished) {
+ committable = this.isAccumulatedInsideFence()
+ ? getPrefixBeforeTrailingOpenFence(this.accumulated)
+ : getCommittablePrefix(this.accumulated);
+ }
+
+ return trimTrailingUnmatchedMarkerOpeners(findCleanPrefix(committable));
+ }
+
/** Raw accumulated text (no remend, no buffering). For the final edit. */
getText(): string {
return this.accumulated;
@@ -177,7 +198,8 @@ const INLINE_MARKER_CHARS = new Set(["*", "~", "`", "["]);
* from otherwise clean text (which is harmless for streaming).
*/
function isClean(text: string): boolean {
- return remend(text).length <= text.length;
+ const sanitized = sanitizeEscapedLinkOpeners(text);
+ return remend(sanitized).length <= sanitized.length;
}
/**
@@ -196,8 +218,12 @@ function findCleanPrefix(text: string): string {
for (let i = text.length - 1; i >= 0; i--) {
if (INLINE_MARKER_CHARS.has(text[i])) {
+ if (isEscaped(text, i)) {
+ continue;
+ }
+
// Group consecutive same characters (e.g., ** or ~~)
- while (i > 0 && text[i - 1] === text[i]) {
+ while (i > 0 && text[i - 1] === text[i] && !isEscaped(text, i - 1)) {
i--;
}
const candidate = text.slice(0, i);
@@ -297,6 +323,119 @@ function getCommittablePrefix(text: string): string {
return result;
}
+function getPrefixBeforeTrailingOpenFence(text: string): string {
+ let offset = 0;
+ let insideFence = false;
+ let lastOpenOffset = -1;
+
+ for (const line of text.split("\n")) {
+ const lineLengthWithNewline = offset + line.length < text.length ? 1 : 0;
+ const trimmed = line.trimStart();
+
+ if (trimmed.startsWith("```") || trimmed.startsWith("~~~")) {
+ if (insideFence) {
+ insideFence = false;
+ lastOpenOffset = -1;
+ } else {
+ insideFence = true;
+ lastOpenOffset = offset;
+ }
+ }
+
+ offset += line.length + lineLengthWithNewline;
+ }
+
+ return insideFence && lastOpenOffset >= 0
+ ? text.slice(0, lastOpenOffset)
+ : text;
+}
+
+function trimTrailingUnmatchedMarkerOpeners(text: string): string {
+ let result = text;
+
+ while (true) {
+ if (result.endsWith("**") && hasOddUnescapedTokenCount(result, "**")) {
+ result = result.slice(0, -2);
+ continue;
+ }
+
+ if (result.endsWith("~~") && hasOddUnescapedTokenCount(result, "~~")) {
+ result = result.slice(0, -2);
+ continue;
+ }
+
+ if (
+ result.endsWith("*") &&
+ !result.endsWith("**") &&
+ hasOddUnescapedCharCount(result, "*")
+ ) {
+ result = result.slice(0, -1);
+ continue;
+ }
+
+ if (result.endsWith("`") && hasOddUnescapedCharCount(result, "`")) {
+ result = result.slice(0, -1);
+ continue;
+ }
+
+ if (result.endsWith("[") && !isEscaped(result, result.length - 1)) {
+ result = result.slice(0, -1);
+ continue;
+ }
+
+ return result;
+ }
+}
+
+function hasOddUnescapedTokenCount(text: string, token: string): boolean {
+ let count = 0;
+
+ for (let i = 0; i <= text.length - token.length; ) {
+ if (text.slice(i, i + token.length) === token && !isEscaped(text, i)) {
+ count++;
+ i += token.length;
+ continue;
+ }
+
+ i++;
+ }
+
+ return count % 2 === 1;
+}
+
+function hasOddUnescapedCharCount(text: string, char: string): boolean {
+ let count = 0;
+
+ for (let i = 0; i < text.length; i++) {
+ if (text[i] === char && !isEscaped(text, i)) {
+ count++;
+ }
+ }
+
+ return count % 2 === 1;
+}
+
+function isEscaped(text: string, index: number): boolean {
+ let backslashCount = 0;
+
+ for (let i = index - 1; i >= 0 && text[i] === "\\"; i--) {
+ backslashCount++;
+ }
+
+ return backslashCount % 2 === 1;
+}
+
+function sanitizeEscapedLinkOpeners(text: string): string {
+ let result = "";
+
+ for (let i = 0; i < text.length; i++) {
+ const char = text[i];
+ result += char === "[" && isEscaped(text, i) ? "(" : char;
+ }
+
+ return result;
+}
+
/**
* Wraps confirmed GFM table blocks in code fences for append-only streaming.
*
diff --git a/packages/chat/src/thread.test.ts b/packages/chat/src/thread.test.ts
index 455fbc98..cc54f13b 100644
--- a/packages/chat/src/thread.test.ts
+++ b/packages/chat/src/thread.test.ts
@@ -11,6 +11,7 @@ import {
import { Plan } from "./plan";
import { StreamingPlan } from "./streaming-plan";
import { ThreadImpl } from "./thread";
+import { ThreadHistoryCache } from "./thread-history";
import type { Adapter, ScheduledMessage, StreamChunk } from "./types";
import { NotImplementedError } from "./types";
@@ -217,6 +218,73 @@ describe("ThreadImpl", () => {
expect(mockAdapter.postMessage).not.toHaveBeenCalled();
});
+ it("should append segmented native stream messages individually to history", async () => {
+ const threadHistory = new ThreadHistoryCache(mockState);
+ thread = new ThreadImpl({
+ id: "slack:C123:1234.5678",
+ adapter: mockAdapter,
+ channelId: "C123",
+ stateAdapter: mockState,
+ threadHistory,
+ });
+
+ mockAdapter.stream = vi.fn().mockResolvedValue({
+ messages: [
+ {
+ message: {
+ id: "msg-1",
+ threadId: "slack:C123:1234.5678",
+ raw: {},
+ },
+ postable: { markdown: "Hello " },
+ },
+ {
+ message: {
+ id: "msg-2",
+ threadId: "slack:C123:1234.5678",
+ raw: {},
+ },
+ postable: { markdown: "World" },
+ },
+ ],
+ });
+
+ const textStream = createTextStream(["Hello", " ", "World"]);
+ const result = await thread.post(textStream);
+ const stored = await threadHistory.getMessages("slack:C123:1234.5678");
+
+ expect(result.id).toBe("msg-2");
+ expect(result.text).toBe("World");
+ expect(result.segments?.map((segment) => segment.id)).toEqual([
+ "msg-1",
+ "msg-2",
+ ]);
+ expect(stored.map((message) => message.id)).toEqual(["msg-1", "msg-2"]);
+ expect(stored.map((message) => message.text)).toEqual(["Hello", "World"]);
+ });
+
+ it("should fall back when adapter.stream returns null", async () => {
+ mockAdapter.stream = vi.fn().mockResolvedValue(null);
+
+ const textStream = createTextStream(["Hello", " ", "World"]);
+ await thread.post(textStream);
+
+ expect(mockAdapter.stream).toHaveBeenCalledWith(
+ "slack:C123:1234.5678",
+ expect.any(Object),
+ expect.objectContaining({ updateIntervalMs: 500 })
+ );
+ expect(mockAdapter.postMessage).toHaveBeenCalledWith(
+ "slack:C123:1234.5678",
+ "..."
+ );
+ expect(mockAdapter.editMessage).toHaveBeenLastCalledWith(
+ "slack:C123:1234.5678",
+ "msg-1",
+ { markdown: "Hello World" }
+ );
+ });
+
it("should fall back to post+edit when adapter has no native streaming", async () => {
// Ensure no stream method
mockAdapter.stream = undefined;
diff --git a/packages/chat/src/thread.ts b/packages/chat/src/thread.ts
index b0cad54a..0af4c1f8 100644
--- a/packages/chat/src/thread.ts
+++ b/packages/chat/src/thread.ts
@@ -29,12 +29,14 @@ import type {
PostableMessage,
PostableObject,
PostEphemeralOptions,
+ RawMessage,
ScheduledMessage,
SentMessage,
StateAdapter,
StreamChunk,
StreamEvent,
StreamOptions,
+ StreamResult,
Thread,
} from "./types";
import { NotImplementedError, THREAD_STATE_TTL_MS } from "./types";
@@ -111,6 +113,12 @@ function isAsyncIterable(
);
}
+function isStreamResult(
+ value: RawMessage | StreamResult | null
+): value is StreamResult {
+ return value !== null && typeof value === "object" && "messages" in value;
+}
+
export class ThreadImpl>
implements Thread
{
@@ -604,7 +612,10 @@ export class ThreadImpl>
// Normalize: handles plain strings, AI SDK fullStream events, and StreamChunk objects
const textStream = fromFullStream(rawStream);
// Build streaming options from current message context + caller options
- const options: StreamOptions = { ...callerOptions };
+ const options: StreamOptions = {
+ updateIntervalMs: this._streamingUpdateIntervalMs,
+ ...callerOptions,
+ };
if (this._currentMessage) {
options.recipientUserId = this._currentMessage.author.userId;
// recipientTeamId is only consumed by the Slack adapter; other adapters
@@ -642,17 +653,43 @@ export class ThreadImpl>
};
const raw = await this.adapter.stream(this.id, wrappedStream, options);
- const sent = this.createSentMessage(
- raw.id,
- { markdown: accumulated },
- raw.threadId
- );
+ if (raw) {
+ if (isStreamResult(raw)) {
+ const sentSegments = raw.messages.map((segment) =>
+ this.createSentMessage(
+ segment.message.id,
+ segment.postable,
+ segment.message.threadId
+ )
+ );
+
+ if (this._threadHistory) {
+ for (const segment of sentSegments) {
+ await this._threadHistory.append(this.id, new Message(segment));
+ }
+ }
- if (this._threadHistory) {
- await this._threadHistory.append(this.id, new Message(sent));
- }
+ const lastSent = sentSegments.at(-1);
+ if (!lastSent) {
+ throw new Error("Segmented stream completed without messages");
+ }
+
+ lastSent.segments = sentSegments;
+ return lastSent;
+ }
+
+ const sent = this.createSentMessage(
+ raw.id,
+ { markdown: accumulated },
+ raw.threadId
+ );
- return sent;
+ if (this._threadHistory) {
+ await this._threadHistory.append(this.id, new Message(sent));
+ }
+
+ return sent;
+ }
}
// Fallback: post + edit with throttling.
diff --git a/packages/chat/src/types.ts b/packages/chat/src/types.ts
index 31e2a380..d0cdaedb 100644
--- a/packages/chat/src/types.ts
+++ b/packages/chat/src/types.ts
@@ -529,6 +529,8 @@ export interface Adapter {
*
* The adapter consumes the async iterable and handles the entire streaming lifecycle.
* Only available on platforms with native streaming support (e.g., Slack).
+ * Adapters may return `null` before consuming any chunks to delegate back to
+ * Chat SDK's built-in post+edit fallback for the current thread.
*
* The stream can yield plain strings (text chunks) or {@link StreamChunk} objects
* for rich content like task progress cards. Adapters that don't support structured
@@ -537,13 +539,14 @@ export interface Adapter {
* @param threadId - The thread to stream to
* @param textStream - Async iterable of text chunks or structured StreamChunk objects
* @param options - Platform-specific streaming options
- * @returns The raw message after streaming completes
+ * @returns The raw message after streaming completes, a segmented stream result,
+ * or `null` to use core fallback
*/
stream?(
threadId: string,
textStream: AsyncIterable,
options?: StreamOptions
- ): Promise>;
+ ): Promise | StreamResult | null>;
/** Bot username (can override global userName) */
readonly userName: string;
}
@@ -1420,6 +1423,26 @@ export interface RawMessage {
threadId: string;
}
+/**
+ * One persisted message emitted by a native streaming adapter.
+ *
+ * Adapters can return multiple of these when a single logical stream must be
+ * split into more than one platform message (for example, Telegram's 4096-char
+ * message limit in DM draft streaming).
+ */
+export interface StreamSegment {
+ message: RawMessage;
+ postable: AdapterPostableMessage;
+}
+
+/**
+ * Result of a native streaming operation that finalized into multiple
+ * persisted platform messages.
+ */
+export interface StreamResult {
+ messages: StreamSegment[];
+}
+
export interface Author {
/** Display name */
fullName: string;
@@ -1474,6 +1497,12 @@ export interface SentMessage
): Promise>;
/** Remove a reaction from this message */
removeReaction(emoji: EmojiValue | string): Promise;
+ /**
+ * Actual persisted messages emitted by a segmented native stream, in
+ * chronological order. Present only when a single `thread.post(stream)`
+ * finalized into multiple platform messages.
+ */
+ segments?: SentMessage[];
}
// =============================================================================