Skip to content

Commit df9ffec

Browse files
committed
feat: integrate audio handling in ChatSurfaceService and bind runAndPersistAgentResponse method
1 parent 8effd46 commit df9ffec

2 files changed

Lines changed: 200 additions & 18 deletions

File tree

chatSurfaceService.ts

Lines changed: 199 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -5,19 +5,42 @@ import type {
55
ChatSurfaceIncomingMessage,
66
IAdminForth,
77
} from "adminforth";
8-
import { Filters } from "adminforth";
8+
import { Filters, logger } from "adminforth";
99
import { randomUUID } from "crypto";
1010
import type { AgentEventEmitter } from "./agentEvents.js";
11-
import type { HandleTurnInput } from "./agentTurnService.js";
11+
import type {
12+
HandleTurnInput,
13+
RunAndPersistAgentResponseInput,
14+
RunAndPersistAgentResponseResult,
15+
} from "./agentTurnService.js";
1216
import type { PluginOptions } from "./types.js";
1317
import type { AgentSessionStore } from "./sessionStore.js";
18+
import { getErrorMessage, isAbortError } from "./errors.js";
19+
import { sanitizeSpeechText } from "./sanitizeSpeechText.js";
1420

1521
type ChatSurfaceConnectAction = {
1622
type: "url";
1723
label: string;
1824
url: string;
1925
};
2026

27+
type ChatSurfaceIncomingMessageWithAudio = ChatSurfaceIncomingMessage & {
28+
audio?: {
29+
buffer: Buffer;
30+
filename: string;
31+
mimeType: string;
32+
};
33+
};
34+
35+
type ChatSurfaceEventSinkWithAudio = ChatSurfaceEventSink & {
36+
emit(event: Parameters<ChatSurfaceEventSink["emit"]>[0] | {
37+
type: "audio";
38+
audio: Buffer;
39+
filename: string;
40+
mimeType: string;
41+
}): void | Promise<void>;
42+
};
43+
2144
export type ChatSurfaceAdapterWithConnectAction = ChatSurfaceAdapter & {
2245
createConnectAction?(input: {
2346
token: string;
@@ -41,6 +64,9 @@ export class ChatSurfaceService {
4164
private options: PluginOptions,
4265
private sessionStore: AgentSessionStore,
4366
private handleTurn: (input: HandleTurnInput) => Promise<unknown>,
67+
private runAndPersistAgentResponse: (
68+
input: RunAndPersistAgentResponseInput,
69+
) => Promise<RunAndPersistAgentResponseResult>,
4470
) {}
4571

4672
getConnectActionAdapters() {
@@ -108,10 +134,35 @@ export class ChatSurfaceService {
108134
incoming: ChatSurfaceIncomingMessage,
109135
sink: ChatSurfaceEventSink,
110136
) {
111-
if (typeof incoming.metadata?.startPayload !== "string") {
137+
if (incoming.metadata?.isStartCommand !== true) {
112138
return false;
113139
}
114140

141+
const externalUserIdField = this.options.chatExternalIdsField ?? DEFAULT_ADMIN_USER_EXTERNAL_USER_ID_FIELD;
142+
const adminforth = this.getAdminforth();
143+
const authResourceId = adminforth.config.auth!.usersResourceId!;
144+
const authResource = adminforth.config.resources.find((resource) => resource.resourceId === authResourceId)!;
145+
const primaryKeyField = authResource.columns.find((column) => column.primaryKey)!.name!;
146+
const linkedAdminUserRecord = (
147+
await adminforth.resource(authResourceId).list(Filters.IS_NOT_EMPTY(externalUserIdField))
148+
).find((user) => user[externalUserIdField]?.[incoming.surface] === incoming.externalUserId);
149+
150+
if (linkedAdminUserRecord) {
151+
await sink.emit({
152+
type: "done",
153+
text: `${incoming.surface} account is already connected to AdminForth.`,
154+
});
155+
return true;
156+
}
157+
158+
if (typeof incoming.metadata?.startPayload !== "string") {
159+
await sink.emit({
160+
type: "done",
161+
text: `Open AdminForth and connect your ${incoming.surface} account from Chat Surfaces settings.`,
162+
});
163+
return true;
164+
}
165+
115166
const payload = this.consumeLinkToken(incoming.surface, incoming.metadata.startPayload);
116167
if (!payload) {
117168
await sink.emit({
@@ -120,11 +171,7 @@ export class ChatSurfaceService {
120171
});
121172
return true;
122173
}
123-
const externalUserIdField = this.options.chatExternalIdsField ?? DEFAULT_ADMIN_USER_EXTERNAL_USER_ID_FIELD;
124-
const adminforth = this.getAdminforth();
125-
const authResourceId = adminforth.config.auth!.usersResourceId!;
126-
const authResource = adminforth.config.resources.find((resource) => resource.resourceId === authResourceId)!;
127-
const primaryKeyField = authResource.columns.find((column) => column.primaryKey)!.name!;
174+
128175
const adminUserRecord = await adminforth.resource(authResourceId).get([
129176
Filters.EQ(primaryKeyField, payload.adminUserId),
130177
]);
@@ -143,6 +190,143 @@ export class ChatSurfaceService {
143190
return true;
144191
}
145192

193+
private async handleAudioMessage(
194+
incoming: ChatSurfaceIncomingMessageWithAudio,
195+
sink: ChatSurfaceEventSinkWithAudio,
196+
adminUser: AdminUser,
197+
) {
198+
const audioAdapter = this.options.audioAdapter;
199+
if (!audioAdapter) {
200+
await sink.emit({
201+
type: "error",
202+
message: "Audio adapter is not configured for AdminForth Agent.",
203+
});
204+
return;
205+
}
206+
207+
let transcription;
208+
209+
try {
210+
transcription = await audioAdapter.transcribe({
211+
buffer: incoming.audio!.buffer,
212+
filename: incoming.audio!.filename,
213+
mimeType: incoming.audio!.mimeType,
214+
language: "auto",
215+
});
216+
} catch (error) {
217+
if (isAbortError(error)) {
218+
logger.info(`Agent ${incoming.surface} surface speech transcription aborted`);
219+
return;
220+
}
221+
222+
logger.error(`Agent ${incoming.surface} surface speech transcription failed:\n${getErrorMessage(error)}`);
223+
await sink.emit({
224+
type: "error",
225+
message: "Speech transcription failed. Check server logs for details.",
226+
});
227+
return;
228+
}
229+
230+
if (!transcription.text) {
231+
await sink.emit({
232+
type: "error",
233+
message: "Speech transcription is empty",
234+
});
235+
return;
236+
}
237+
238+
const agentResponse = await this.handleAgentSurfaceResponse(
239+
incoming,
240+
sink,
241+
adminUser,
242+
transcription.text,
243+
{ emitDone: false },
244+
);
245+
246+
if (!agentResponse || agentResponse.aborted || agentResponse.failed) {
247+
return;
248+
}
249+
250+
await sink.emit({
251+
type: "done",
252+
text: agentResponse.text,
253+
});
254+
255+
try {
256+
const speech = await audioAdapter.synthesize({
257+
text: sanitizeSpeechText(agentResponse.text),
258+
stream: false,
259+
format: "opus",
260+
});
261+
262+
await sink.emit({
263+
type: "audio",
264+
audio: speech.audio,
265+
filename: "agent-response.ogg",
266+
mimeType: speech.mimeType,
267+
});
268+
} catch (error) {
269+
if (isAbortError(error)) {
270+
logger.info(`Agent ${incoming.surface} surface speech synthesis aborted`);
271+
return;
272+
}
273+
274+
logger.error(`Agent ${incoming.surface} surface speech synthesis failed:\n${getErrorMessage(error)}`);
275+
await sink.emit({
276+
type: "error",
277+
message: getErrorMessage(error),
278+
});
279+
}
280+
}
281+
282+
private async handleAgentSurfaceResponse(
283+
incoming: ChatSurfaceIncomingMessage,
284+
sink: ChatSurfaceEventSink,
285+
adminUser: AdminUser,
286+
prompt: string,
287+
options?: { emitDone?: boolean },
288+
) {
289+
const emitDone = options?.emitDone ?? true;
290+
const sessionId = await this.sessionStore.getOrCreateChatSurfaceSession(
291+
{ ...incoming, prompt },
292+
adminUser,
293+
);
294+
295+
if (emitDone) {
296+
await this.handleTurn({
297+
prompt,
298+
sessionId,
299+
modeName: incoming.modeName,
300+
userTimeZone: incoming.userTimeZone ?? "UTC",
301+
adminUser,
302+
emit: this.createEventEmitter(sink),
303+
failureLogMessage: `Agent ${incoming.surface} surface response failed`,
304+
abortLogMessage: `Agent ${incoming.surface} surface response aborted`,
305+
});
306+
return null;
307+
}
308+
309+
const agentResponse = await this.runAndPersistAgentResponse({
310+
prompt,
311+
sessionId,
312+
modeName: incoming.modeName,
313+
userTimeZone: incoming.userTimeZone ?? "UTC",
314+
adminUser,
315+
emit: this.createEventEmitter(sink),
316+
failureLogMessage: `Agent ${incoming.surface} surface response failed`,
317+
abortLogMessage: `Agent ${incoming.surface} surface response aborted`,
318+
});
319+
320+
if (agentResponse.failed) {
321+
await sink.emit({
322+
type: "error",
323+
message: agentResponse.text,
324+
});
325+
}
326+
327+
return agentResponse;
328+
}
329+
146330
async handleMessage(
147331
adapter: ChatSurfaceAdapter,
148332
incoming: ChatSurfaceIncomingMessage,
@@ -175,15 +359,12 @@ export class ChatSurfaceService {
175359
dbUser: adminUserRecord,
176360
};
177361

178-
await this.handleTurn({
179-
prompt: incoming.prompt,
180-
sessionId: await this.sessionStore.getOrCreateChatSurfaceSession(incoming, adminUser),
181-
modeName: incoming.modeName,
182-
userTimeZone: incoming.userTimeZone ?? "UTC",
183-
adminUser,
184-
emit: this.createEventEmitter(sink),
185-
failureLogMessage: `Agent ${incoming.surface} surface response failed`,
186-
abortLogMessage: `Agent ${incoming.surface} surface response aborted`,
187-
});
362+
const incomingWithAudio = incoming as ChatSurfaceIncomingMessageWithAudio;
363+
if (incomingWithAudio.audio) {
364+
await this.handleAudioMessage(incomingWithAudio, sink as ChatSurfaceEventSinkWithAudio, adminUser);
365+
return;
366+
}
367+
368+
await this.handleAgentSurfaceResponse(incoming, sink, adminUser, incoming.prompt);
188369
}
189370
}

index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,7 @@ export default class AdminForthAgentPlugin extends AdminForthPlugin {
7777
this.options,
7878
this.sessionStore,
7979
this.agentTurnService.handleTurn.bind(this.agentTurnService),
80+
this.agentTurnService.runAndPersistAgentResponse.bind(this.agentTurnService),
8081
);
8182
this.agentSystemPromptPromise = Promise.resolve(
8283
appendCustomSystemPrompt(DEFAULT_AGENT_SYSTEM_PROMPT, this.options.systemPrompt),

0 commit comments

Comments
 (0)