Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 75 additions & 0 deletions agents/src/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1055,6 +1055,81 @@ export async function waitForParticipant({
}
}

export async function waitForParticipantAttribute({
room,
identity,
attribute,
value,
signal,
}: {
room: Room;
identity: string;
attribute: string;
value: string;
signal?: AbortSignal;
}): Promise<void> {
if (!room.isConnected) {
throw new Error('Room is not connected');
}
if (signal?.aborted) {
throw new Error('waitForParticipantAttribute aborted');
}

const participant = room.remoteParticipants.get(identity);
if (!participant) {
throw new Error(`Participant ${identity} is not in the room`);
}

const fut = new Future<void>();

const isMatch = (p: Participant) => p.identity === identity && p.attributes[attribute] === value;

const onParticipantAttributesChanged = (
_changedAttributes: Record<string, string>,
p: Participant,
) => {
if (!fut.done && isMatch(p)) {
fut.resolve();
}
};

const onParticipantDisconnected = (p: RemoteParticipant) => {
if (!fut.done && p.identity === identity) {
fut.reject(new Error(`Participant ${identity} disconnected while waiting for ${attribute}`));
}
};

const onDisconnected = () => {
if (!fut.done) {
fut.reject(new Error(`Room disconnected while waiting for ${identity} ${attribute}`));
}
};

const onAbort = () => {
if (!fut.done) {
fut.reject(new Error('waitForParticipantAttribute aborted'));
}
};

room.on(RoomEvent.ParticipantAttributesChanged, onParticipantAttributesChanged);
room.on(RoomEvent.ParticipantDisconnected, onParticipantDisconnected);
room.on(RoomEvent.Disconnected, onDisconnected);
signal?.addEventListener('abort', onAbort, { once: true });

try {
const current = room.remoteParticipants.get(identity);
if (current && current.attributes[attribute] === value) {
return;
}
Comment on lines +1120 to +1123
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟡 waitForParticipantAttribute hangs if participant disconnects between existence check and listener setup

In waitForParticipantAttribute, the participant existence is checked at line 1078 before event listeners are registered at lines 1114-1117. If the participant disconnects in that window, the ParticipantDisconnected event fires before our listener is registered (missed), and at line 1120 current is undefined (participant gone from map). The function falls through to await fut.await which can only resolve via room disconnect or signal abort — there's no path that detects the participant already left.

Comparison with existing waitForParticipant pattern

The sibling function waitForParticipant (agents/src/utils.ts:1028-1048) correctly avoids this by setting up listeners first and then checking existing participants, ensuring no events are missed. waitForParticipantAttribute should similarly check !current after listener registration and reject/throw.

In the AMD caller context this is mitigated by the trackGateAbort signal (eventually aborted by cleanup), but the utility function itself is incorrect for any caller without such a safety net.

Suggested change
const current = room.remoteParticipants.get(identity);
if (current && current.attributes[attribute] === value) {
return;
}
const current = room.remoteParticipants.get(identity);
if (!current) {
throw new Error(`Participant ${identity} is no longer in the room`);
}
if (current.attributes[attribute] === value) {
return;
}
Open in Devin Review

Was this helpful? React with 👍 or 👎 to provide feedback.

await fut.await;
} finally {
room.off(RoomEvent.ParticipantAttributesChanged, onParticipantAttributesChanged);
room.off(RoomEvent.ParticipantDisconnected, onParticipantDisconnected);
room.off(RoomEvent.Disconnected, onDisconnected);
signal?.removeEventListener('abort', onAbort);
}
}

export async function waitForTrackPublication({
room,
identity,
Expand Down
94 changes: 78 additions & 16 deletions agents/src/voice/amd.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
//
// SPDX-License-Identifier: Apache-2.0
import type { AudioFrame } from '@livekit/rtc-node';
import { TrackKind } from '@livekit/rtc-node';
import { ParticipantKind, TrackKind } from '@livekit/rtc-node';
import type { TypedEventEmitter as TypedEmitter } from '@livekit/typed-emitter';
import type { Span } from '@opentelemetry/api';
import { EventEmitter } from 'node:events';
Expand All @@ -18,7 +18,13 @@ import type { ToolContext } from '../llm/tool_context.js';
import { log } from '../log.js';
import { STT, SpeechEventType, type SpeechStream } from '../stt/stt.js';
import { traceTypes, tracer } from '../telemetry/index.js';
import { Task, delay, isCloud, waitForTrackPublication } from '../utils.js';
import {
Task,
delay,
isCloud,
waitForParticipantAttribute,
waitForTrackPublication,
} from '../utils.js';
import type { AgentSession } from './agent_session.js';
import {
AgentSessionEventTypes,
Expand Down Expand Up @@ -112,6 +118,9 @@ const MAX_EXTENSION_MS = 10_000;
const DEFAULT_AMD_LLM_MODEL = 'google/gemini-3.1-flash-lite';
const DEFAULT_AMD_STT_MODEL = 'cartesia/ink-whisper';

const SIP_CALL_STATUS_ATTR = 'sip.callStatus';
const SIP_CALL_STATUS_ACTIVE = 'active';

const EVALUATED_LLM_MODELS: ReadonlySet<string> = new Set([
'google/gemini-3.1-flash-lite',
'google/gemini-3-flash-preview',
Expand Down Expand Up @@ -208,6 +217,7 @@ export class AMD extends (EventEmitter as new () => TypedEmitter<AMDCallbacks>)

// --- execution state (reset per run) ---
private active = false;
private listening = false;
private settled = false;
private transcriptParts: string[] = [];
private verdictResult: AMDPredictionEvent | undefined;
Expand All @@ -226,7 +236,7 @@ export class AMD extends (EventEmitter as new () => TypedEmitter<AMDCallbacks>)
private sttPumpTask: Task<void> | undefined;
/**
* Aborts pending {@link waitForTrackPublication} calls in
* {@link gateNoSpeechTimer}. Without this the room-event listener can
* {@link gateListening}. Without this the room-event listener can
* outlive the AMD instance if the participant track never publishes
* before the run settles.
*/
Expand Down Expand Up @@ -326,7 +336,7 @@ export class AMD extends (EventEmitter as new () => TypedEmitter<AMDCallbacks>)
this.rejectRun = reject;
this.subscribe();
this.startDetectionTimer();
this.gateNoSpeechTimer();
this.gateListening();
this.startSTTPump();
});
return result;
Expand Down Expand Up @@ -390,6 +400,7 @@ export class AMD extends (EventEmitter as new () => TypedEmitter<AMDCallbacks>)

private resetState(): void {
this.settled = false;
this.listening = false;
this.transcriptParts = [];
this.verdictResult = undefined;
this.machineSilenceReached = false;
Expand Down Expand Up @@ -429,12 +440,13 @@ export class AMD extends (EventEmitter as new () => TypedEmitter<AMDCallbacks>)
this.noSpeechTimer = setTimeout(() => this.settleNoSpeech(), this.noSpeechTimeoutMs);
}

/**
* If the session has a `_roomIO`, defer the no-speech timer until the
* participant's audio track is both published and subscribed. Without
* `_roomIO` (e.g. unit tests, remote-session callers without participants),
* fall back to starting the timer immediately.
*/
private startListening(): void {
if (this.settled || this.listening) return;
this.listening = true;
this.startNoSpeechTimer();
this._log.debug('AMD starts listening');
}

/**
* Mirrors python detector.py `_run_stt`. When AMD owns its STT, it runs a
* private pump that:
Expand Down Expand Up @@ -479,6 +491,7 @@ export class AMD extends (EventEmitter as new () => TypedEmitter<AMDCallbacks>)
while (!signal.aborted && !this.settled) {
const { done, value } = await reader.read();
if (done || !value) break;
if (!this.listening) continue;
try {
sttStream.pushFrame(value);
} catch {
Expand Down Expand Up @@ -522,13 +535,19 @@ export class AMD extends (EventEmitter as new () => TypedEmitter<AMDCallbacks>)
await Promise.allSettled([sendPump, recvPump]);
}

private gateNoSpeechTimer(): void {
/**
* If the session has a `_roomIO`, defer listening until the participant's
* audio track is subscribed. For SIP participants, also wait until the call
* is active so ringback and early media do not burn the no-speech budget.
* Without `_roomIO`, fall back to listening immediately.
*/
private gateListening(): void {
const roomIO = this.session._roomIO;
const room = roomIO?.rtcRoom;
if (!room || !room.isConnected) {
// Mirrors python: "session room_io unavailable, starting amd timers
// immediately as fallback".
this.startNoSpeechTimer();
this.startListening();
return;
}

Expand All @@ -549,19 +568,56 @@ export class AMD extends (EventEmitter as new () => TypedEmitter<AMDCallbacks>)
waitForSubscription: true,
signal: this.trackGateAbort.signal,
})
.then(() => {
.then(async (publication) => {
if (this.settled) {
return;
}

const publicationSid = publication.sid;
const participant = targetIdentity
? room.remoteParticipants.get(targetIdentity)
: publicationSid
? [...room.remoteParticipants.values()].find((p) =>
p.trackPublications.has(publicationSid),
)
: undefined;
if (!participant) {
return;
}
Comment on lines +584 to +586
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🔴 AMD never starts listening when participant resolution fails after track publication

In gateListening(), when participant is undefined at line 584 (e.g., the participant disconnected between track publication and the lookup, or publicationSid is falsy), the .then() handler returns without calling startListening(). This means the no-speech timer never starts, VAD events and transcripts are silently dropped (handleUserStateChanged and consumeTranscript both return early when this.listening is false), and AMD only settles when the detection timer fires after 20 seconds, producing a meaningless UNCERTAIN result.

This is a regression from the previous code which always called startNoSpeechTimer() after track publication. The .catch() handler at agents/src/voice/amd.ts:610-622 correctly falls back to startListening(), but the !participant branch in .then() does not.

Suggested change
if (!participant) {
return;
}
if (!participant) {
this.startListening();
return;
}
Open in Devin Review

Was this helpful? React with 👍 or 👎 to provide feedback.


if (participant.kind !== ParticipantKind.SIP) {
this.startListening();
return;
}

try {
await waitForParticipantAttribute({
room,
identity: participant.identity,
attribute: SIP_CALL_STATUS_ATTR,
value: SIP_CALL_STATUS_ACTIVE,
signal: this.trackGateAbort?.signal,
});
} catch (err) {
this._log.debug({ err }, 'AMD SIP answer wait aborted');
return;
}

if (!this.settled) {
this.startNoSpeechTimer();
this.startListening();
}
})
.catch((err) => {
// Track gating is best-effort: if waiting for publication fails (e.g.
// room disconnected, aborted by `cleanup`, or the function rejects),
// fall back to starting the timer so the run still settles within
// `noSpeechTimeoutMs`.
this._log.debug({ err }, 'AMD track gating failed; starting no-speech timer immediately');
if (this.trackGateAbort?.signal.aborted) {
return;
}
this._log.debug({ err }, 'AMD listening gate failed; starting immediately');
if (!this.settled) {
this.startNoSpeechTimer();
this.startListening();
}
});
}
Expand All @@ -570,6 +626,7 @@ export class AMD extends (EventEmitter as new () => TypedEmitter<AMDCallbacks>)
this.clearTimer('noSpeech');
this.clearTimer('detection');
this.clearTimer('silence');
this.listening = false;
this.session.off(AgentSessionEventTypes.UserInputTranscribed, this.handleTranscript);
this.session.off(AgentSessionEventTypes.UserStateChanged, this.handleUserStateChanged);
this.session.off(AgentSessionEventTypes.Close, this.handleClose);
Expand Down Expand Up @@ -722,6 +779,10 @@ export class AMD extends (EventEmitter as new () => TypedEmitter<AMDCallbacks>)
return;
}

if (!this.listening) {
return;
}

if (ev.newState === 'speaking') {
this.clearTimer('silence');
this.clearTimer('noSpeech');
Expand Down Expand Up @@ -791,6 +852,7 @@ export class AMD extends (EventEmitter as new () => TypedEmitter<AMDCallbacks>)
*/
private consumeTranscript(transcript: string): void {
if (this.settled) return;
if (!this.listening) return;
if (this.silenceTimer && this.silenceTimerTrigger === 'short_speech') {
this.clearTimer('silence');
if (this.speechEndedAt !== undefined) {
Expand Down
Loading