Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/block-turn-exceeded-handoff.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@livekit/agents": patch
---

Block user turn exceeded callbacks while an agent handoff is starting.
36 changes: 34 additions & 2 deletions agents/src/voice/agent_activity.ts
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,7 @@ export class AgentActivity implements RecognitionHooks {
private turnDetectionMode?: TurnDetectionMode;
private logger = log();
private _schedulingPaused = true;
private newTurnsBlocked = false;
private _authorizationPaused = false;
private _drainBlockedTasks: Task<any>[] = [];
private _currentSpeech?: SpeechHandle;
Expand Down Expand Up @@ -671,6 +672,11 @@ export class AgentActivity implements RecognitionHooks {
return this._schedulingPaused;
}

/** @internal */
blockNewTurns(): void {
this.newTurnsBlocked = true;
}

pauseReplyAuthorization(): void {
this._authorizationPaused = true;
this.wakeupMainTask();
Expand Down Expand Up @@ -1071,7 +1077,7 @@ export class AgentActivity implements RecognitionHooks {
return;
}

if (this.schedulingPaused) {
if (this.schedulingPaused || this.newTurnsBlocked) {
// TODO(shubhra): should we "forward" this new turn to the next agent?
this.logger.warn('skipping new realtime generation, the speech scheduling is not running');
return;
Expand Down Expand Up @@ -1364,6 +1370,7 @@ export class AgentActivity implements RecognitionHooks {
if (
!preemptiveOpts.enabled ||
this.schedulingPaused ||
this.newTurnsBlocked ||
(this._currentSpeech !== undefined && !this._currentSpeech.interrupted) ||
!(this.llm instanceof LLM)
) {
Expand Down Expand Up @@ -1563,7 +1570,7 @@ export class AgentActivity implements RecognitionHooks {
}

async onEndOfTurn(info: EndOfTurnInfo): Promise<boolean> {
if (this.schedulingPaused) {
if (this.schedulingPaused || this.newTurnsBlocked) {
this.cancelPreemptiveGeneration();
this.logger.warn(
{ user_input: info.newTranscript },
Expand Down Expand Up @@ -2003,6 +2010,18 @@ export class AgentActivity implements RecognitionHooks {
transcriptConfidence: info.transcriptConfidence,
});

if (this.schedulingPaused || this.newTurnsBlocked) {
this.logger.warn(
{ user_input: info.newTranscript },
'skipping onUserTurnCompleted, speech scheduling is paused',
);
if (this.agentSession._closing) {
this.agent._chatCtx.items.push(userMessage);
this.agentSession._conversationItemAdded(userMessage);
}
return;
}

// create a temporary mutable chat context to pass to onUserTurnCompleted
// the user can edit it for the current generation, but changes will not be kept inside the
// Agent.chatCtx
Expand All @@ -2027,6 +2046,18 @@ export class AgentActivity implements RecognitionHooks {
return;
}

if (this.schedulingPaused || this.newTurnsBlocked) {
this.logger.warn(
{ user_input: info.newTranscript },
'skipping reply to user input, speech scheduling is paused',
);
if (userMessage && this.agentSession._closing) {
this.agent._chatCtx.items.push(userMessage);
this.agentSession._conversationItemAdded(userMessage);
}
return;
}

const userMetricsReport: MetricsReport = {};
if (info.startedSpeakingAt !== undefined) {
userMetricsReport.startedSpeakingAt = info.startedSpeakingAt / 1000; // ms -> seconds
Expand Down Expand Up @@ -3539,6 +3570,7 @@ export class AgentActivity implements RecognitionHooks {
if (!this._schedulingPaused) return;

this._schedulingPaused = false;
this.newTurnsBlocked = false;
this._mainTask = Task.from(({ signal }) => this.mainTask(signal));
}

Expand Down
152 changes: 152 additions & 0 deletions agents/src/voice/agent_activity_handoff.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ import {
type ReusableResources,
cleanupReusableResources,
} from './agent_activity.js';
import type { EndOfTurnInfo } from './audio_recognition.js';
import type { UserTurnExceededEvent } from './events.js';

initializeLogger({ pretty: false, level: 'silent' });

Expand Down Expand Up @@ -288,6 +290,156 @@ describe('AgentActivity RT session reuse eligibility', () => {
});
});

describe('AgentActivity blockNewTurns (handoff transition)', () => {
/* eslint-disable @typescript-eslint/no-explicit-any */
function createBareActivity(): any {
const activity = Object.create(AgentActivity.prototype);
activity.logger = { warn: vi.fn(), debug: vi.fn(), info: vi.fn(), error: vi.fn() };
activity.cancelPreemptiveGeneration = vi.fn();
activity.createSpeechTask = vi.fn(() => ({ cancel: vi.fn() }));
return activity;
}

const endOfTurnInfo: EndOfTurnInfo = {
newTranscript: 'hello again',
transcriptConfidence: 1,
transcriptionDelay: 0,
endOfUtteranceDelay: 0,
startedSpeakingAt: undefined,
stoppedSpeakingAt: undefined,
};

// Regression for the mis-ported #5396 fix: blockNewTurns() must gate the speech
// scheduling paths (here onEndOfTurn) during the handoff transition window — even
// before drain() flips schedulingPaused — so a user turn arriving in that window is
// dropped instead of scheduling a reply against the outgoing agent.
it('onEndOfTurn skips a new user turn while new turns are blocked, even when scheduling is still running', async () => {
const activity = createBareActivity();
activity._schedulingPaused = false; // scheduling still running (pre-drain window)
activity.newTurnsBlocked = false;

(activity as AgentActivity).blockNewTurns();
const handled = await (activity as AgentActivity).onEndOfTurn(endOfTurnInfo);

expect(handled).toBe(true);
expect(activity.cancelPreemptiveGeneration).toHaveBeenCalledTimes(1);
// The turn is dropped at the guard; userTurnCompleted is never scheduled.
expect(activity.createSpeechTask).not.toHaveBeenCalled();
});

it('onEndOfTurn schedules the turn normally when new turns are not blocked', async () => {
const activity = createBareActivity();
activity._schedulingPaused = false;
activity.newTurnsBlocked = false;
// `get stt` returns undefined here, short-circuiting the interruption branch.
activity.agent = { stt: undefined };
activity.agentSession = { stt: undefined };
activity._currentSpeech = undefined;
activity._userTurnCompletedTask = undefined;

const handled = await (activity as AgentActivity).onEndOfTurn(endOfTurnInfo);

expect(handled).toBe(true);
expect(activity.createSpeechTask).toHaveBeenCalledTimes(1);
});

// The bot's port wrongly gated the user-turn-exceeded callback on newTurnsBlocked;
// Python never does. onUserTurnExceeded must stay independent of the handoff flag.
it('onUserTurnExceeded is independent of newTurnsBlocked', () => {
const activity = createBareActivity();
activity._schedulingPaused = false;
activity.newTurnsBlocked = false;
activity.userTurnExceededLocked = false;
activity.userTurnExceededTask = undefined;

(activity as AgentActivity).blockNewTurns();

const ev: UserTurnExceededEvent = {
type: 'user_turn_exceeded',
transcript: 'hi',
accumulatedTranscript: 'hi',
accumulatedWordCount: 10,
duration: 5000,
createdAt: Date.now(),
};
(activity as AgentActivity).onUserTurnExceeded(ev);

expect(activity.createSpeechTask).toHaveBeenCalledTimes(1);
});

// Parity with Python `_user_turn_completed` (agent_activity.py:2025): when new turns are
// blocked before the turn completes, the reply must be skipped *before* onUserTurnCompleted
// runs. When the session is not closing, the message is dropped (not added to chat ctx).
it('userTurnCompleted skips before the callback when new turns are blocked (not closing)', async () => {
const activity = createBareActivity();
activity._schedulingPaused = false;
activity.newTurnsBlocked = false;
activity._currentSpeech = undefined;
const onUserTurnCompleted = vi.fn(async () => {});
activity.agent = { llm: undefined, chatCtx: ChatContext.empty(), onUserTurnCompleted };
activity.agentSession = { llm: undefined, _closing: false, _conversationItemAdded: vi.fn() };

(activity as AgentActivity).blockNewTurns();
await (activity as any).userTurnCompleted(endOfTurnInfo);

expect(onUserTurnCompleted).not.toHaveBeenCalled();
expect(activity.agentSession._conversationItemAdded).not.toHaveBeenCalled();
expect(activity.createSpeechTask).not.toHaveBeenCalled();
});

// Parity with Python `_user_turn_completed` (agent_activity.py:2025): the skipped message is
// still committed to the chat context when the session is closing, so it is not lost.
it('userTurnCompleted commits the skipped message to chat ctx when closing', async () => {
const activity = createBareActivity();
activity._schedulingPaused = false;
activity.newTurnsBlocked = false;
activity._currentSpeech = undefined;
const push = vi.fn();
const conversationItemAdded = vi.fn();
activity.agent = {
llm: undefined,
chatCtx: ChatContext.empty(),
_chatCtx: { items: { push } },
onUserTurnCompleted: vi.fn(async () => {}),
};
activity.agentSession = {
llm: undefined,
_closing: true,
_conversationItemAdded: conversationItemAdded,
};

(activity as AgentActivity).blockNewTurns();
await (activity as any).userTurnCompleted(endOfTurnInfo);

expect(push).toHaveBeenCalledTimes(1);
expect(conversationItemAdded).toHaveBeenCalledTimes(1);
expect(activity.createSpeechTask).not.toHaveBeenCalled();
});

// Parity with Python `_user_turn_completed` (agent_activity.py:2059): the post-callback
// re-check catches a handoff triggered *inside* onUserTurnCompleted, so no reply is scheduled
// against the outgoing agent even though new turns were not blocked when the turn started.
it('userTurnCompleted re-checks after the callback when a handoff blocks new turns mid-callback', async () => {
const activity = createBareActivity();
activity._schedulingPaused = false;
activity.newTurnsBlocked = false;
activity._currentSpeech = undefined;
// A handoff inside the user callback blocks new turns after guard A has already passed.
const onUserTurnCompleted = vi.fn(async () => {
activity.newTurnsBlocked = true;
});
const plainLlm = { id: 'plain-llm' };
activity.agent = { llm: plainLlm, chatCtx: ChatContext.empty(), onUserTurnCompleted };
activity.agentSession = { llm: undefined, _closing: false, _conversationItemAdded: vi.fn() };

await (activity as any).userTurnCompleted(endOfTurnInfo);

expect(onUserTurnCompleted).toHaveBeenCalledTimes(1);
expect(activity.createSpeechTask).not.toHaveBeenCalled();
});
/* eslint-enable @typescript-eslint/no-explicit-any */
});

describe('cleanupReusableResources', () => {
it('closes both STT pipeline and RT session', async () => {
const sttClose = vi.fn(async () => {});
Expand Down
9 changes: 9 additions & 0 deletions agents/src/voice/agent_session.ts
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,11 @@ export class AgentSession<
return this.started;
}

/** @internal - Whether the session is closing/draining. */
get _closing(): boolean {
return this.closing;
}

/** @internal - Current run state for testing */
_globalRunState?: RunResult;

Expand Down Expand Up @@ -614,6 +619,10 @@ export class AgentSession<
return;
}

// immediately block the old activity from accepting new user turns
// during the transition window (before drain() formally pauses scheduling)
this.activity?.blockNewTurns();

const _updateActivityTask = async (oldTask: Task<void> | undefined, agent: Agent) => {
if (oldTask) {
try {
Expand Down
Loading