From 775662d5668bfbf7a116ea5c030ed63ca0e4570a Mon Sep 17 00:00:00 2001 From: "rosetta-livekit-bot[bot]" <282703043+rosetta-livekit-bot[bot]@users.noreply.github.com> Date: Fri, 29 May 2026 12:50:19 +0000 Subject: [PATCH] fix(amd): defer SIP listening until answer --- agents/src/utils.ts | 75 ++++++++++++++++++++++++++++++++ agents/src/voice/amd.ts | 94 ++++++++++++++++++++++++++++++++++------- 2 files changed, 153 insertions(+), 16 deletions(-) diff --git a/agents/src/utils.ts b/agents/src/utils.ts index 9988da762..fb279723d 100644 --- a/agents/src/utils.ts +++ b/agents/src/utils.ts @@ -1055,6 +1055,81 @@ export async function waitForParticipant({ } } +export async function waitForParticipantAttribute({ + room, + identity, + attribute, + value, + signal, +}: { + room: Room; + identity: string; + attribute: string; + value: string; + signal?: AbortSignal; +}): Promise { + if (!room.isConnected) { + throw new Error('Room is not connected'); + } + if (signal?.aborted) { + throw new Error('waitForParticipantAttribute aborted'); + } + + const participant = room.remoteParticipants.get(identity); + if (!participant) { + throw new Error(`Participant ${identity} is not in the room`); + } + + const fut = new Future(); + + const isMatch = (p: Participant) => p.identity === identity && p.attributes[attribute] === value; + + const onParticipantAttributesChanged = ( + _changedAttributes: Record, + p: Participant, + ) => { + if (!fut.done && isMatch(p)) { + fut.resolve(); + } + }; + + const onParticipantDisconnected = (p: RemoteParticipant) => { + if (!fut.done && p.identity === identity) { + fut.reject(new Error(`Participant ${identity} disconnected while waiting for ${attribute}`)); + } + }; + + const onDisconnected = () => { + if (!fut.done) { + fut.reject(new Error(`Room disconnected while waiting for ${identity} ${attribute}`)); + } + }; + + const onAbort = () => { + if (!fut.done) { + fut.reject(new Error('waitForParticipantAttribute aborted')); + } + }; + + room.on(RoomEvent.ParticipantAttributesChanged, onParticipantAttributesChanged); + room.on(RoomEvent.ParticipantDisconnected, onParticipantDisconnected); + room.on(RoomEvent.Disconnected, onDisconnected); + signal?.addEventListener('abort', onAbort, { once: true }); + + try { + const current = room.remoteParticipants.get(identity); + if (current && current.attributes[attribute] === value) { + return; + } + await fut.await; + } finally { + room.off(RoomEvent.ParticipantAttributesChanged, onParticipantAttributesChanged); + room.off(RoomEvent.ParticipantDisconnected, onParticipantDisconnected); + room.off(RoomEvent.Disconnected, onDisconnected); + signal?.removeEventListener('abort', onAbort); + } +} + export async function waitForTrackPublication({ room, identity, diff --git a/agents/src/voice/amd.ts b/agents/src/voice/amd.ts index c1af28457..814e186bf 100644 --- a/agents/src/voice/amd.ts +++ b/agents/src/voice/amd.ts @@ -2,7 +2,7 @@ // // SPDX-License-Identifier: Apache-2.0 import type { AudioFrame } from '@livekit/rtc-node'; -import { TrackKind } from '@livekit/rtc-node'; +import { ParticipantKind, TrackKind } from '@livekit/rtc-node'; import type { TypedEventEmitter as TypedEmitter } from '@livekit/typed-emitter'; import type { Span } from '@opentelemetry/api'; import { EventEmitter } from 'node:events'; @@ -18,7 +18,13 @@ import type { ToolContext } from '../llm/tool_context.js'; import { log } from '../log.js'; import { STT, SpeechEventType, type SpeechStream } from '../stt/stt.js'; import { traceTypes, tracer } from '../telemetry/index.js'; -import { Task, delay, isCloud, waitForTrackPublication } from '../utils.js'; +import { + Task, + delay, + isCloud, + waitForParticipantAttribute, + waitForTrackPublication, +} from '../utils.js'; import type { AgentSession } from './agent_session.js'; import { AgentSessionEventTypes, @@ -112,6 +118,9 @@ const MAX_EXTENSION_MS = 10_000; const DEFAULT_AMD_LLM_MODEL = 'google/gemini-3.1-flash-lite'; const DEFAULT_AMD_STT_MODEL = 'cartesia/ink-whisper'; +const SIP_CALL_STATUS_ATTR = 'sip.callStatus'; +const SIP_CALL_STATUS_ACTIVE = 'active'; + const EVALUATED_LLM_MODELS: ReadonlySet = new Set([ 'google/gemini-3.1-flash-lite', 'google/gemini-3-flash-preview', @@ -208,6 +217,7 @@ export class AMD extends (EventEmitter as new () => TypedEmitter) // --- execution state (reset per run) --- private active = false; + private listening = false; private settled = false; private transcriptParts: string[] = []; private verdictResult: AMDPredictionEvent | undefined; @@ -226,7 +236,7 @@ export class AMD extends (EventEmitter as new () => TypedEmitter) private sttPumpTask: Task | undefined; /** * Aborts pending {@link waitForTrackPublication} calls in - * {@link gateNoSpeechTimer}. Without this the room-event listener can + * {@link gateListening}. Without this the room-event listener can * outlive the AMD instance if the participant track never publishes * before the run settles. */ @@ -326,7 +336,7 @@ export class AMD extends (EventEmitter as new () => TypedEmitter) this.rejectRun = reject; this.subscribe(); this.startDetectionTimer(); - this.gateNoSpeechTimer(); + this.gateListening(); this.startSTTPump(); }); return result; @@ -390,6 +400,7 @@ export class AMD extends (EventEmitter as new () => TypedEmitter) private resetState(): void { this.settled = false; + this.listening = false; this.transcriptParts = []; this.verdictResult = undefined; this.machineSilenceReached = false; @@ -429,12 +440,13 @@ export class AMD extends (EventEmitter as new () => TypedEmitter) this.noSpeechTimer = setTimeout(() => this.settleNoSpeech(), this.noSpeechTimeoutMs); } - /** - * If the session has a `_roomIO`, defer the no-speech timer until the - * participant's audio track is both published and subscribed. Without - * `_roomIO` (e.g. unit tests, remote-session callers without participants), - * fall back to starting the timer immediately. - */ + private startListening(): void { + if (this.settled || this.listening) return; + this.listening = true; + this.startNoSpeechTimer(); + this._log.debug('AMD starts listening'); + } + /** * Mirrors python detector.py `_run_stt`. When AMD owns its STT, it runs a * private pump that: @@ -479,6 +491,7 @@ export class AMD extends (EventEmitter as new () => TypedEmitter) while (!signal.aborted && !this.settled) { const { done, value } = await reader.read(); if (done || !value) break; + if (!this.listening) continue; try { sttStream.pushFrame(value); } catch { @@ -522,13 +535,19 @@ export class AMD extends (EventEmitter as new () => TypedEmitter) await Promise.allSettled([sendPump, recvPump]); } - private gateNoSpeechTimer(): void { + /** + * If the session has a `_roomIO`, defer listening until the participant's + * audio track is subscribed. For SIP participants, also wait until the call + * is active so ringback and early media do not burn the no-speech budget. + * Without `_roomIO`, fall back to listening immediately. + */ + private gateListening(): void { const roomIO = this.session._roomIO; const room = roomIO?.rtcRoom; if (!room || !room.isConnected) { // Mirrors python: "session room_io unavailable, starting amd timers // immediately as fallback". - this.startNoSpeechTimer(); + this.startListening(); return; } @@ -549,9 +568,43 @@ export class AMD extends (EventEmitter as new () => TypedEmitter) waitForSubscription: true, signal: this.trackGateAbort.signal, }) - .then(() => { + .then(async (publication) => { + if (this.settled) { + return; + } + + const publicationSid = publication.sid; + const participant = targetIdentity + ? room.remoteParticipants.get(targetIdentity) + : publicationSid + ? [...room.remoteParticipants.values()].find((p) => + p.trackPublications.has(publicationSid), + ) + : undefined; + if (!participant) { + return; + } + + if (participant.kind !== ParticipantKind.SIP) { + this.startListening(); + return; + } + + try { + await waitForParticipantAttribute({ + room, + identity: participant.identity, + attribute: SIP_CALL_STATUS_ATTR, + value: SIP_CALL_STATUS_ACTIVE, + signal: this.trackGateAbort?.signal, + }); + } catch (err) { + this._log.debug({ err }, 'AMD SIP answer wait aborted'); + return; + } + if (!this.settled) { - this.startNoSpeechTimer(); + this.startListening(); } }) .catch((err) => { @@ -559,9 +612,12 @@ export class AMD extends (EventEmitter as new () => TypedEmitter) // room disconnected, aborted by `cleanup`, or the function rejects), // fall back to starting the timer so the run still settles within // `noSpeechTimeoutMs`. - this._log.debug({ err }, 'AMD track gating failed; starting no-speech timer immediately'); + if (this.trackGateAbort?.signal.aborted) { + return; + } + this._log.debug({ err }, 'AMD listening gate failed; starting immediately'); if (!this.settled) { - this.startNoSpeechTimer(); + this.startListening(); } }); } @@ -570,6 +626,7 @@ export class AMD extends (EventEmitter as new () => TypedEmitter) this.clearTimer('noSpeech'); this.clearTimer('detection'); this.clearTimer('silence'); + this.listening = false; this.session.off(AgentSessionEventTypes.UserInputTranscribed, this.handleTranscript); this.session.off(AgentSessionEventTypes.UserStateChanged, this.handleUserStateChanged); this.session.off(AgentSessionEventTypes.Close, this.handleClose); @@ -722,6 +779,10 @@ export class AMD extends (EventEmitter as new () => TypedEmitter) return; } + if (!this.listening) { + return; + } + if (ev.newState === 'speaking') { this.clearTimer('silence'); this.clearTimer('noSpeech'); @@ -791,6 +852,7 @@ export class AMD extends (EventEmitter as new () => TypedEmitter) */ private consumeTranscript(transcript: string): void { if (this.settled) return; + if (!this.listening) return; if (this.silenceTimer && this.silenceTimerTrigger === 'short_speech') { this.clearTimer('silence'); if (this.speechEndedAt !== undefined) {