From 115cb7cbdfe1cc18862ff08a3a1ca77ae09b52ea Mon Sep 17 00:00:00 2001 From: Tyler Wright Date: Sat, 11 Apr 2026 13:47:03 -0700 Subject: [PATCH 1/3] feat: add thread() + adapter.openThread() for explicit threading Adds `thread.thread(messageId?)` as a means to officially support creating threads and guarantee a threaded response in channel-supporting platforms without impacting those that don't support channels. Adds openThread?(scopeId, messageId) as an optional method on the Adapter interface, following the openDM pattern, that returns an existing or new threadId. --- packages/chat/src/thread.test.ts | 103 +++++++++++++++++++++++++++++++ packages/chat/src/thread.ts | 33 ++++++++++ packages/chat/src/types.ts | 49 +++++++++++++++ 3 files changed, 185 insertions(+) diff --git a/packages/chat/src/thread.test.ts b/packages/chat/src/thread.test.ts index 8f38819a..2cec6531 100644 --- a/packages/chat/src/thread.test.ts +++ b/packages/chat/src/thread.test.ts @@ -1630,6 +1630,109 @@ describe("ThreadImpl", () => { }); }); + describe("thread()", () => { + it("returns self when already in a thread (id !== channelId)", async () => { + const mockAdapter = createMockAdapter(); + const mockState = createMockState(); + + const thread = new ThreadImpl({ + id: "slack:C123:1234.5678", + adapter: mockAdapter, + channelId: "slack:C123", + stateAdapter: mockState, + currentMessage: createTestMessage("msg1", "hello"), + }); + + const result = await thread.thread(); + expect(result).toBe(thread); + }); + + it("returns self when adapter lacks openThread", async () => { + const mockAdapter = createMockAdapter(); + const mockState = createMockState(); + + // Channel-scoped (id === channelId) but no openThread on adapter + const thread = new ThreadImpl({ + id: "slack:C123", + adapter: mockAdapter, + channelId: "slack:C123", + stateAdapter: mockState, + currentMessage: createTestMessage("msg1", "hello"), + }); + + const result = await thread.thread(); + expect(result).toBe(thread); + }); + + it("calls adapter.openThread and returns new ThreadImpl when on a channel", async () => { + const mockAdapter = createMockAdapter(); + mockAdapter.openThread = vi + .fn() + .mockResolvedValue("slack:C123:1234.5678"); + const mockState = createMockState(); + + const thread = new ThreadImpl({ + id: "slack:C123", + adapter: mockAdapter, + channelId: "slack:C123", + stateAdapter: mockState, + currentMessage: createTestMessage("msg-ts-1", "hello"), + }); + + const result = await thread.thread(); + + expect(result).not.toBe(thread); + expect(result.id).toBe("slack:C123:1234.5678"); + expect(result.channelId).toBe("slack:C123"); + expect(mockAdapter.openThread).toHaveBeenCalledWith( + "slack:C123", + "msg-ts-1" + ); + }); + + it("uses explicit messageId when provided", async () => { + const mockAdapter = createMockAdapter(); + mockAdapter.openThread = vi + .fn() + .mockResolvedValue("slack:C123:9999.0000"); + const mockState = createMockState(); + + const thread = new ThreadImpl({ + id: "slack:C123", + adapter: mockAdapter, + channelId: "slack:C123", + stateAdapter: mockState, + currentMessage: createTestMessage("msg-ts-1", "hello"), + }); + + const result = await thread.thread("earlier-msg-id"); + + expect(result.id).toBe("slack:C123:9999.0000"); + expect(mockAdapter.openThread).toHaveBeenCalledWith( + "slack:C123", + "earlier-msg-id" + ); + }); + + it("returns self when no messageId and no currentMessage", async () => { + const mockAdapter = createMockAdapter(); + mockAdapter.openThread = vi.fn(); + const mockState = createMockState(); + + const thread = new ThreadImpl({ + id: "slack:C123", + adapter: mockAdapter, + channelId: "slack:C123", + stateAdapter: mockState, + // no currentMessage + }); + + const result = await thread.thread(); + expect(result).toBe(thread); + expect(mockAdapter.openThread).not.toHaveBeenCalled(); + }); + }); + describe("subscribe and unsubscribe", () => { let thread: ThreadImpl; let mockAdapter: Adapter; diff --git a/packages/chat/src/thread.ts b/packages/chat/src/thread.ts index 6e571ecb..4ad0b093 100644 --- a/packages/chat/src/thread.ts +++ b/packages/chat/src/thread.ts @@ -356,6 +356,39 @@ export class ThreadImpl> }; } + /** + * Get or create a thread from this scope. If already a thread, returns itself. + * If a channel, returns a new/existing thread anchored to the (optionally latest) message. + */ + async thread(messageId?: string): Promise> { + // Already in a thread, or adapter doesn't support threading + if (this.id !== this.channelId || !this.adapter.openThread) { + return this; + } + + const anchorMessageId = messageId ?? this._currentMessage?.id; + if (!anchorMessageId) { + return this; + } + + const threadId = await this.adapter.openThread(this.id, anchorMessageId); + + return new ThreadImpl({ + id: threadId, + adapter: this.adapter, + channelId: this.channelId, + stateAdapter: this._stateAdapter, + isDM: this.adapter.isDM?.(threadId) ?? false, + channelVisibility: this.adapter.getChannelVisibility?.(threadId) ?? this.channelVisibility, + isSubscribedContext: false, + currentMessage: this._currentMessage, + logger: this._logger, + streamingUpdateIntervalMs: this._streamingUpdateIntervalMs, + fallbackStreamingPlaceholderText: this._fallbackStreamingPlaceholderText, + messageHistory: this._messageHistory, + }); + } + async isSubscribed(): Promise { // Short-circuit if we know we're in a subscribed context if (this._isSubscribedContext) { diff --git a/packages/chat/src/types.ts b/packages/chat/src/types.ts index 4aa501ad..83b80b28 100644 --- a/packages/chat/src/types.ts +++ b/packages/chat/src/types.ts @@ -356,6 +356,24 @@ export interface Adapter { */ onThreadSubscribe?(threadId: string): Promise; + /** + * Open a thread for a message. + * If the message is already in a thread, returns that thread's ID. + * If the message is at channel root, creates or retrieves a thread + * anchored to it and returns the thread ID. + * + * @param scopeId - The scope (channel or thread) containing the message + * @param messageId - The message to open a thread for + * @returns The thread ID + * + * @example + * ```typescript + * const threadId = await adapter.openThread(scopeId, messageId); + * await adapter.postMessage(threadId, "Threaded reply"); + * ``` + */ + openThread?(scopeId: string, messageId: string): Promise; + /** * Open a direct message conversation with a user. * @@ -1132,6 +1150,37 @@ export interface Thread, TRawMessage = unknown> */ startTyping(status?: string): Promise; + /** + * Get or create a thread from this scope. + * + * If already in a thread, returns this thread. + * If on a channel, creates or retrieves a thread anchored to the specified + * message (or the current message if none is specified). + * If the platform doesn't support threading, returns this thread. + * + * @param messageId - Optional message ID to anchor the thread to. Defaults to + * the current message. Use this when the triggering message is not the one + * you want to thread from (e.g., threading from an earlier message). + * @returns A thread guaranteed to be a real thread (not a channel) + * + * @example + * ```typescript + * // Thread from the current message + * chat.onNewMention(async (thread, message) => { + * const t = await thread.thread(); + * await t.subscribe(); + * await t.post("Let's continue in a thread."); + * }); + * + * // Thread from a specific message + * chat.onNewMention(async (thread, message) => { + * const t = await thread.thread(someEarlierMessage.id); + * await t.post("Replying to that earlier message."); + * }); + * ``` + */ + thread(messageId?: string): Promise>; + /** * Subscribe to future messages in this thread. * From 7880afec83bed6acd66331aa26dbeba1e2f76efe Mon Sep 17 00:00:00 2001 From: Tyler Wright Date: Sat, 11 Apr 2026 13:55:24 -0700 Subject: [PATCH 2/3] feat(adapters): implement openThread on slack, discord, telegram, gchat, teams Add the newly supported openThread method to each adapter that supports thread creation/retrieval from a channel's messageId. GitHub, Linear, and WhatsApp omit openThread, thread() returns self. Removes Discord's auto-thread-creation on @mention, inconsistent with every other adapter. Removes Slack's auto-thread-creation on *all* @mentions and messages. --- packages/adapter-discord/src/index.test.ts | 128 +++++++++++++++++---- packages/adapter-discord/src/index.ts | 79 +++++++------ packages/adapter-gchat/src/index.ts | 19 +++ packages/adapter-slack/src/index.test.ts | 37 ++++++ packages/adapter-slack/src/index.ts | 28 ++++- packages/adapter-teams/src/index.ts | 22 ++++ packages/adapter-telegram/src/index.ts | 22 ++++ 7 files changed, 270 insertions(+), 65 deletions(-) diff --git a/packages/adapter-discord/src/index.test.ts b/packages/adapter-discord/src/index.test.ts index 41e5096e..20be07ef 100644 --- a/packages/adapter-discord/src/index.test.ts +++ b/packages/adapter-discord/src/index.test.ts @@ -1990,6 +1990,79 @@ describe("startTyping", () => { }); }); +// ============================================================================ +// openThread Tests +// ============================================================================ + +describe("openThread", () => { + const adapter = createDiscordAdapter({ + botToken: "test-token", + publicKey: testPublicKey, + applicationId: "test-app-id", + logger: mockLogger, + }); + + it("creates a thread and returns encoded thread ID", async () => { + const mockResponse = new Response( + JSON.stringify({ id: "new-thread-id", name: "Thread" }), + { status: 200, headers: { "Content-Type": "application/json" } } + ); + const spy = vi + .spyOn(adapter as any, "discordFetch") + .mockResolvedValue(mockResponse); + + const result = await adapter.openThread( + "discord:guild1:channel456", + "msg123" + ); + + expect(result).toBe("discord:guild1:channel456:new-thread-id"); + expect(spy).toHaveBeenCalledWith( + "/channels/channel456/messages/msg123/threads", + "POST", + expect.objectContaining({ auto_archive_duration: 1440 }) + ); + + spy.mockRestore(); + }); + + it("returns existing thread ID when already in a thread", async () => { + const spy = vi.spyOn(adapter as any, "discordFetch"); + + const result = await adapter.openThread( + "discord:guild1:channel456:existing-thread", + "msg123" + ); + + expect(result).toBe("discord:guild1:channel456:existing-thread"); + expect(spy).not.toHaveBeenCalled(); + + spy.mockRestore(); + }); + + it("recovers from 160004 (thread already exists) by reusing message ID", async () => { + const { NetworkError } = await import("@chat-adapter/shared"); + + const spy = vi + .spyOn(adapter as any, "discordFetch") + .mockRejectedValue( + new NetworkError( + "discord", + JSON.stringify({ code: 160004, message: "A thread has already been created for this message" }) + ) + ); + + const result = await adapter.openThread( + "discord:guild1:channel456", + "msg123" + ); + + expect(result).toBe("discord:guild1:channel456:msg123"); + + spy.mockRestore(); + }); +}); + // ============================================================================ // openDM Tests // ============================================================================ @@ -3385,7 +3458,14 @@ describe("handleForwardedMessage - thread handling", () => { fetchSpy.mockRestore(); }); - it("creates thread when mentioned and not in a thread", async () => { + // Previously, the Discord adapter auto-created a thread when the bot was + // @mentioned in a channel. This was inconsistent with other adapters (Slack, + // Telegram, Teams, etc.) which all dispatch channel mentions with the channel + // thread ID (channelId === threadId). The SDK contract expects channel messages + // to have channelId === threadId — auto-creating a thread broke this, causing + // issues with subscribe() (subscribing to the channel instead of a thread) and + // handler routing. Thread creation is now explicit via thread.thread(). + it("does not auto-create thread when mentioned in a channel", async () => { const adapter = createDiscordAdapter({ botToken: "test-token", publicKey: testPublicKey, @@ -3401,14 +3481,7 @@ describe("handleForwardedMessage - thread handling", () => { processReaction: vi.fn(), } as unknown as ChatInstance); - const fetchSpy = vi - .spyOn(adapter as any, "discordFetch") - .mockResolvedValue( - new Response( - JSON.stringify({ id: "new-thread-id", name: "New Thread" }), - { status: 200, headers: { "Content-Type": "application/json" } } - ) - ); + const fetchSpy = vi.spyOn(adapter as any, "discordFetch"); const body = JSON.stringify({ type: "GATEWAY_MESSAGE_CREATE", @@ -3441,11 +3514,18 @@ describe("handleForwardedMessage - thread handling", () => { await adapter.handleWebhook(request); - // Should have created a thread - expect(fetchSpy).toHaveBeenCalledWith( + // Should NOT auto-create a thread — thread creation is now explicit via thread.thread() + expect(fetchSpy).not.toHaveBeenCalledWith( "/channels/channel456/messages/msg123/threads", "POST", - expect.objectContaining({ auto_archive_duration: 1440 }) + expect.anything() + ); + + // Should dispatch with channel-scoped thread ID (no thread part) + expect(handleIncomingMessage).toHaveBeenCalledWith( + adapter, + "discord:guild1:channel456", + expect.anything() ); fetchSpy.mockRestore(); @@ -3903,7 +3983,11 @@ describe("handleForwardedMessage - DM messages", () => { // ============================================================================ describe("mentionRoleIds handling", () => { - it("detects mention via role ID", async () => { + // Same rationale as "does not auto-create thread when mentioned in a channel" above. + // Role mentions follow the same path — the adapter detects the mention and sets + // isMention on the message, but does not create a thread. Consumers use + // thread.thread() to explicitly create one when desired. + it("detects mention via role ID without auto-creating thread", async () => { const adapter = createDiscordAdapter({ botToken: "test-token", publicKey: testPublicKey, @@ -3913,12 +3997,7 @@ describe("mentionRoleIds handling", () => { }); const handleIncomingMessage = vi.fn(); - const fetchSpy = vi.spyOn(adapter as any, "discordFetch").mockResolvedValue( - new Response(JSON.stringify({ id: "new-thread", name: "Thread" }), { - status: 200, - headers: { "Content-Type": "application/json" }, - }) - ); + const fetchSpy = vi.spyOn(adapter as any, "discordFetch"); await adapter.initialize({ handleIncomingMessage, @@ -3958,13 +4037,20 @@ describe("mentionRoleIds handling", () => { await adapter.handleWebhook(request); - // Should create a thread because of role mention - expect(fetchSpy).toHaveBeenCalledWith( + // Should NOT auto-create a thread — thread creation is now explicit via thread.thread() + expect(fetchSpy).not.toHaveBeenCalledWith( "/channels/channel456/messages/msg123/threads", "POST", expect.anything() ); + // Should still dispatch the message with mention detected + expect(handleIncomingMessage).toHaveBeenCalledWith( + adapter, + "discord:guild1:channel456", + expect.anything() + ); + fetchSpy.mockRestore(); }); }); diff --git a/packages/adapter-discord/src/index.ts b/packages/adapter-discord/src/index.ts index ea302485..2f249a11 100644 --- a/packages/adapter-discord/src/index.ts +++ b/packages/adapter-discord/src/index.ts @@ -634,24 +634,6 @@ export class DiscordAdapter implements Adapter { ); const isMentioned = isUserMentioned || isRoleMentioned; - // If mentioned and not in a thread, create one - if (!discordThreadId && isMentioned) { - try { - const newThread = await this.createDiscordThread(channelId, data.id); - discordThreadId = newThread.id; - this.logger.debug("Created Discord thread for forwarded mention", { - channelId, - messageId: data.id, - threadId: newThread.id, - }); - } catch (error) { - this.logger.error("Failed to create Discord thread for mention", { - error: String(error), - messageId: data.id, - }); - } - } - const threadId = this.encodeThreadId({ guildId, channelId: parentChannelId, @@ -1424,6 +1406,47 @@ export class DiscordAdapter implements Adapter { }; } + /** + * Open a thread for a message. + * Returns a thread ID that can be used to post threaded replies. + * + * For Discord, this creates a thread via the API if one doesn't exist. + * If the message already has a thread, returns the existing thread ID. + */ + async openThread(scopeId: string, messageId: string): Promise { + const { guildId, channelId, threadId } = this.decodeThreadId(scopeId); + + // Already in a thread — return the existing ID + if (threadId) { + this.logger.debug("openThread: already in a thread", { + scopeId, + threadId, + }); + return scopeId; + } + + // Channel scope — create or get a thread anchored to the message + this.logger.debug("openThread: creating thread", { + channelId, + messageId, + }); + + const thread = await this.createDiscordThread(channelId, messageId); + + const newThreadId = this.encodeThreadId({ + guildId, + channelId, + threadId: thread.id, + }); + + this.logger.debug("openThread: thread ready", { + threadId: thread.id, + encodedThreadId: newThreadId, + }); + + return newThreadId; + } + /** * Open a DM with a user. */ @@ -1959,26 +1982,6 @@ export class DiscordAdapter implements Adapter { parentChannelId = message.channel.parentId; } - // If not in a thread and bot is mentioned, create a thread immediately - // This ensures the Thread object has the correct ID from the start - if (!discordThreadId && isMentioned) { - try { - const newThread = await this.createDiscordThread(channelId, message.id); - discordThreadId = newThread.id; - this.logger.debug("Created Discord thread for incoming mention", { - channelId, - messageId: message.id, - threadId: newThread.id, - }); - } catch (error) { - this.logger.error("Failed to create Discord thread for mention", { - error: String(error), - messageId: message.id, - }); - // Continue without thread - will use channel - } - } - const threadId = this.encodeThreadId({ guildId, channelId: parentChannelId, diff --git a/packages/adapter-gchat/src/index.ts b/packages/adapter-gchat/src/index.ts index 488c674a..97f1480c 100644 --- a/packages/adapter-gchat/src/index.ts +++ b/packages/adapter-gchat/src/index.ts @@ -1691,6 +1691,25 @@ export class GoogleChatAdapter implements Adapter { // Google Chat doesn't have a typing indicator API for bots } + /** + * Open a thread for a message. + * Returns a thread ID that can be used to post threaded replies. + * + * For Google Chat, this encodes the message ID as the thread name. + * If already in a thread, returns the existing thread ID. + */ + async openThread(scopeId: string, messageId: string): Promise { + const { spaceName, threadName, isDM } = this.decodeThreadId(scopeId); + + // Already in a thread — return the existing ID + if (threadName) { + return scopeId; + } + + // Space scope — encode with messageId as threadName + return this.encodeThreadId({ spaceName, threadName: messageId, isDM }); + } + /** * Open a direct message conversation with a user. * Returns a thread ID that can be used to post messages. diff --git a/packages/adapter-slack/src/index.test.ts b/packages/adapter-slack/src/index.test.ts index 6197b549..48a27583 100644 --- a/packages/adapter-slack/src/index.test.ts +++ b/packages/adapter-slack/src/index.test.ts @@ -2885,6 +2885,43 @@ describe("startTyping", () => { }); }); +// ============================================================================ +// openThread Tests +// ============================================================================ + +describe("openThread", () => { + const secret = "test-signing-secret"; + + it("returns thread ID with messageId as threadTs from channel scope", async () => { + const adapter = createSlackAdapter({ + botToken: "xoxb-test-token", + signingSecret: secret, + logger: mockLogger, + }); + + const result = await adapter.openThread("slack:C123:", "1234567890.123456"); + + // Slack threading is implicit — encoding the messageId as threadTs is sufficient + expect(result).toBe("slack:C123:1234567890.123456"); + }); + + it("returns existing thread ID when already in a thread", async () => { + const adapter = createSlackAdapter({ + botToken: "xoxb-test-token", + signingSecret: secret, + logger: mockLogger, + }); + + const result = await adapter.openThread( + "slack:C123:9999999999.000000", + "1234567890.123456" + ); + + // Already in a thread — return the existing scope, ignore messageId + expect(result).toBe("slack:C123:9999999999.000000"); + }); +}); + // ============================================================================ // openDM Tests // ============================================================================ diff --git a/packages/adapter-slack/src/index.ts b/packages/adapter-slack/src/index.ts index 03289ca2..e77a4806 100644 --- a/packages/adapter-slack/src/index.ts +++ b/packages/adapter-slack/src/index.ts @@ -1355,11 +1355,8 @@ export class SlackAdapter implements Adapter { return; } - // For DMs: top-level messages use empty threadTs (matches openDM subscriptions), - // thread replies use thread_ts for per-conversation isolation. - // For channels: always use thread_ts or ts for per-thread IDs. - const isDM = event.channel_type === "im"; - const threadTs = isDM ? event.thread_ts || "" : event.thread_ts || event.ts; + // Use thread_ts when present (thread reply), empty string otherwise (channel message). + const threadTs = event.thread_ts || ""; const threadId = this.encodeThreadId({ channel: event.channel, threadTs, @@ -3256,6 +3253,25 @@ export class SlackAdapter implements Adapter { }; } + /** + * Open a thread for a message. + * Returns a thread ID that can be used to post threaded replies. + * + * For Slack, threading is implicit — posting with a thread_ts creates the thread. + * If already in a thread, returns the existing thread ID. + */ + async openThread(scopeId: string, messageId: string): Promise { + const { channel, threadTs } = this.decodeThreadId(scopeId); + + // Already in a thread — return the existing ID + if (threadTs) { + return scopeId; + } + + // Channel scope — encode with messageId as threadTs + return this.encodeThreadId({ channel, threadTs: messageId }); + } + /** * Open a direct message conversation with a user. * Returns a thread ID that can be used to post messages. @@ -3594,7 +3610,7 @@ export class SlackAdapter implements Adapter { parseMessage(raw: SlackEvent): Message { const event = raw; - const threadTs = event.thread_ts || event.ts || ""; + const threadTs = event.thread_ts || ""; const threadId = this.encodeThreadId({ channel: event.channel || "", threadTs, diff --git a/packages/adapter-teams/src/index.ts b/packages/adapter-teams/src/index.ts index 89925d69..94a80f26 100644 --- a/packages/adapter-teams/src/index.ts +++ b/packages/adapter-teams/src/index.ts @@ -1097,6 +1097,28 @@ export class TeamsAdapter implements Adapter { return { id: messageId ?? "", threadId, raw: { text: accumulated } }; } + /** + * Open a thread for a message. + * Returns a thread ID that can be used to post threaded replies. + * + * For Teams, this appends a ;messageid= suffix to the conversation ID. + * If already in a thread, returns the existing thread ID. + */ + async openThread(scopeId: string, messageId: string): Promise { + const { conversationId, serviceUrl } = this.decodeThreadId(scopeId); + + // Already in a thread (has ;messageid= suffix) — return the existing ID + if (conversationId.includes(";messageid=")) { + return scopeId; + } + + // Channel scope — encode with messageId as reply target + return this.encodeThreadId({ + conversationId: `${conversationId};messageid=${messageId}`, + serviceUrl, + }); + } + async openDM(userId: string): Promise { // Look up cached serviceUrl and tenantId for this user from state const cachedServiceUrl = await this.chat diff --git a/packages/adapter-telegram/src/index.ts b/packages/adapter-telegram/src/index.ts index 3993ac7b..940ff546 100644 --- a/packages/adapter-telegram/src/index.ts +++ b/packages/adapter-telegram/src/index.ts @@ -967,6 +967,28 @@ export class TelegramAdapter return `telegram:${chatId}`; } + /** + * Open a thread for a message. + * Returns a thread ID that can be used to post threaded replies. + * + * For Telegram, this encodes the message ID as a forum topic thread ID. + * If already in a topic thread, returns the existing thread ID. + */ + async openThread(scopeId: string, messageId: string): Promise { + const { chatId, messageThreadId } = this.decodeThreadId(scopeId); + + // Already in a topic thread — return the existing ID + if (messageThreadId !== undefined) { + return scopeId; + } + + // Channel/group scope — encode with messageId as topic thread ID + return this.encodeThreadId({ + chatId, + messageThreadId: Number.parseInt(messageId, 10), + }); + } + async openDM(userId: string): Promise { return this.encodeThreadId({ chatId: userId }); } From 9a377d3d37f1b97a0970a2e8f3a9285a9ff5033a Mon Sep 17 00:00:00 2001 From: Tyler Wright Date: Mon, 13 Apr 2026 16:32:46 -0700 Subject: [PATCH 3/3] feat(adapters): handle unsupported streaming on slack channels Streaming is used by default in thread.post() which is not supported within channels, add check for channel and accumulate stream to a single postMessage() --- packages/adapter-slack/src/index.test.ts | 98 +++++++++++++++++++++++- packages/adapter-slack/src/index.ts | 23 +++++- 2 files changed, 116 insertions(+), 5 deletions(-) diff --git a/packages/adapter-slack/src/index.test.ts b/packages/adapter-slack/src/index.test.ts index 48a27583..b08c5e4a 100644 --- a/packages/adapter-slack/src/index.test.ts +++ b/packages/adapter-slack/src/index.test.ts @@ -178,7 +178,8 @@ describe("encodeThreadId", () => { channel: "C12345", threadTs: "", }); - expect(threadId).toBe("slack:C12345:"); + // Channel-scoped: no trailing colon when threadTs is empty + expect(threadId).toBe("slack:C12345"); }); }); @@ -1712,7 +1713,7 @@ describe("DM message handling", () => { expect(chatInstance.processMessage).toHaveBeenCalledWith( adapter, - "slack:D_DM_CHAN:", + "slack:D_DM_CHAN", expect.any(Function), undefined ); @@ -2208,6 +2209,7 @@ describe("handleWebhook - slash commands", () => { // ============================================================================ interface MockableClient { + chatStream: ReturnType; assistant: { threads: { setStatus: ReturnType; @@ -2922,6 +2924,94 @@ describe("openThread", () => { }); }); +// ============================================================================ +// stream Tests +// ============================================================================ + +describe("stream", () => { + const secret = "test-signing-secret"; + + it("accumulates and posts via postMessage for channel-scope messages", async () => { + const adapter = createSlackAdapter({ + botToken: "xoxb-test-token", + signingSecret: secret, + logger: mockLogger, + }); + + const chatPostMessage = vi + .fn() + .mockResolvedValue({ ok: true, ts: "1234567890.999999" }); + mockClientMethod(adapter, "chat.postMessage", chatPostMessage); + + const client = getClient(adapter); + client.chatStream = vi.fn(); + + async function* textStream() { + yield "Hello "; + yield "world"; + } + + const result = await adapter.stream("slack:C123", textStream(), { + recipientUserId: "U123", + recipientTeamId: "T123", + }); + + // chatStream should NOT be called for channel-scope messages + expect(client.chatStream).not.toHaveBeenCalled(); + + // Should accumulate and delegate to postMessage + expect(chatPostMessage).toHaveBeenCalledWith( + expect.objectContaining({ + channel: "C123", + text: "Hello world", + }) + ); + expect(result.id).toBe("1234567890.999999"); + }); + + it("uses chatStream for thread-scope messages", async () => { + const adapter = createSlackAdapter({ + botToken: "xoxb-test-token", + signingSecret: secret, + logger: mockLogger, + }); + + const mockStreamer = { + append: vi.fn().mockResolvedValue(null), + stop: vi.fn().mockResolvedValue({ + ok: true, + ts: "1234567890.999999", + message: { ts: "1234567890.999999" }, + }), + }; + + const client = getClient(adapter); + client.chatStream = vi.fn().mockReturnValue(mockStreamer); + + async function* textStream() { + yield "Hello"; + } + + const result = await adapter.stream( + "slack:C123:1700000001.123456", + textStream(), + { + recipientUserId: "U123", + recipientTeamId: "T123", + } + ); + + // chatStream SHOULD be called for thread-scope messages + expect(client.chatStream).toHaveBeenCalledWith( + expect.objectContaining({ + channel: "C123", + thread_ts: "1700000001.123456", + }) + ); + expect(result.id).toBe("1234567890.999999"); + }); +}); + // ============================================================================ // openDM Tests // ============================================================================ @@ -2947,7 +3037,7 @@ describe("openDM", () => { const threadId = await adapter.openDM("U_TARGET_USER"); - expect(threadId).toBe("slack:D_DM_CHANNEL:"); + expect(threadId).toBe("slack:D_DM_CHANNEL"); const client = getClient(adapter); expect(client.conversations.open).toHaveBeenCalledWith( @@ -4510,7 +4600,7 @@ describe("handleWebhook - assistant events", () => { expect(chatInstance.processMemberJoinedChannel).toHaveBeenCalledWith( expect.objectContaining({ userId: "U_JOINED_USER", - channelId: "slack:C_TARGET_CHAN:", + channelId: "slack:C_TARGET_CHAN", inviterId: "U_INVITER", adapter, }), diff --git a/packages/adapter-slack/src/index.ts b/packages/adapter-slack/src/index.ts index e77a4806..fe6b1725 100644 --- a/packages/adapter-slack/src/index.ts +++ b/packages/adapter-slack/src/index.ts @@ -3132,6 +3132,24 @@ export class SlackAdapter implements Adapter { const { channel, threadTs } = this.decodeThreadId(threadId); this.logger.debug("Slack: starting stream", { channel, threadTs }); + // chatStream requires a thread context (it's Slack's Assistant API). + // For channel-scope messages, accumulate and post as a single message. + if (!threadTs) { + this.logger.debug( + "Slack: channel-scope stream, accumulating for postMessage", + { channel } + ); + let accumulated = ""; + for await (const chunk of textStream) { + if (typeof chunk === "string") { + accumulated += chunk; + } else if (chunk.type === "markdown_text") { + accumulated += chunk.text; + } + } + return this.postMessage(threadId, { markdown: accumulated }); + } + const token = this.getToken(); const streamer = this.client.chatStream({ channel, @@ -3553,7 +3571,10 @@ export class SlackAdapter implements Adapter { } encodeThreadId(platformData: SlackThreadId): string { - return `slack:${platformData.channel}:${platformData.threadTs}`; + if (platformData.threadTs) { + return `slack:${platformData.channel}:${platformData.threadTs}`; + } + return `slack:${platformData.channel}`; } /**