diff --git a/.changeset/block-turn-exceeded-handoff.md b/.changeset/block-turn-exceeded-handoff.md new file mode 100644 index 000000000..69d28ac12 --- /dev/null +++ b/.changeset/block-turn-exceeded-handoff.md @@ -0,0 +1,5 @@ +--- +"@livekit/agents": patch +--- + +Block new user turns on the outgoing agent activity while an agent handoff is starting. diff --git a/agents/src/voice/agent_activity.ts b/agents/src/voice/agent_activity.ts index f9678b26e..07d91a15d 100644 --- a/agents/src/voice/agent_activity.ts +++ b/agents/src/voice/agent_activity.ts @@ -195,6 +195,7 @@ export class AgentActivity implements RecognitionHooks { private turnDetectionMode?: TurnDetectionMode; private logger = log(); private _schedulingPaused = true; + private newTurnsBlocked = false; private _authorizationPaused = false; private _drainBlockedTasks: Task[] = []; private _currentSpeech?: SpeechHandle; @@ -671,6 +672,11 @@ export class AgentActivity implements RecognitionHooks { return this._schedulingPaused; } + /** @internal */ + blockNewTurns(): void { + this.newTurnsBlocked = true; + } + pauseReplyAuthorization(): void { this._authorizationPaused = true; this.wakeupMainTask(); @@ -1071,7 +1077,7 @@ export class AgentActivity implements RecognitionHooks { return; } - if (this.schedulingPaused) { + if (this.schedulingPaused || this.newTurnsBlocked) { // TODO(shubhra): should we "forward" this new turn to the next agent? 
this.logger.warn('skipping new realtime generation, the speech scheduling is not running'); return; @@ -1364,6 +1370,7 @@ export class AgentActivity implements RecognitionHooks { if ( !preemptiveOpts.enabled || this.schedulingPaused || + this.newTurnsBlocked || (this._currentSpeech !== undefined && !this._currentSpeech.interrupted) || !(this.llm instanceof LLM) ) { @@ -1563,7 +1570,7 @@ export class AgentActivity implements RecognitionHooks { } async onEndOfTurn(info: EndOfTurnInfo): Promise { - if (this.schedulingPaused) { + if (this.schedulingPaused || this.newTurnsBlocked) { this.cancelPreemptiveGeneration(); this.logger.warn( { user_input: info.newTranscript }, @@ -2003,6 +2010,18 @@ export class AgentActivity implements RecognitionHooks { transcriptConfidence: info.transcriptConfidence, }); + if (this.schedulingPaused || this.newTurnsBlocked) { + this.logger.warn( + { user_input: info.newTranscript }, + 'skipping onUserTurnCompleted, speech scheduling is paused', + ); + if (this.agentSession._closing) { + this.agent._chatCtx.items.push(userMessage); + this.agentSession._conversationItemAdded(userMessage); + } + return; + } + // create a temporary mutable chat context to pass to onUserTurnCompleted // the user can edit it for the current generation, but changes will not be kept inside the // Agent.chatCtx @@ -2027,6 +2046,18 @@ export class AgentActivity implements RecognitionHooks { return; } + if (this.schedulingPaused || this.newTurnsBlocked) { + this.logger.warn( + { user_input: info.newTranscript }, + 'skipping reply to user input, speech scheduling is paused', + ); + if (userMessage && this.agentSession._closing) { + this.agent._chatCtx.items.push(userMessage); + this.agentSession._conversationItemAdded(userMessage); + } + return; + } + const userMetricsReport: MetricsReport = {}; if (info.startedSpeakingAt !== undefined) { userMetricsReport.startedSpeakingAt = info.startedSpeakingAt / 1000; // ms -> seconds @@ -3539,6 +3570,7 @@ export class 
AgentActivity implements RecognitionHooks { if (!this._schedulingPaused) return; this._schedulingPaused = false; + this.newTurnsBlocked = false; this._mainTask = Task.from(({ signal }) => this.mainTask(signal)); } diff --git a/agents/src/voice/agent_activity_handoff.test.ts b/agents/src/voice/agent_activity_handoff.test.ts index 260ed6835..6fca85551 100644 --- a/agents/src/voice/agent_activity_handoff.test.ts +++ b/agents/src/voice/agent_activity_handoff.test.ts @@ -14,6 +14,8 @@ import { type ReusableResources, cleanupReusableResources, } from './agent_activity.js'; +import type { EndOfTurnInfo } from './audio_recognition.js'; +import type { UserTurnExceededEvent } from './events.js'; initializeLogger({ pretty: false, level: 'silent' }); @@ -288,6 +290,156 @@ describe('AgentActivity RT session reuse eligibility', () => { }); }); +describe('AgentActivity blockNewTurns (handoff transition)', () => { + /* eslint-disable @typescript-eslint/no-explicit-any */ + function createBareActivity(): any { + const activity = Object.create(AgentActivity.prototype); + activity.logger = { warn: vi.fn(), debug: vi.fn(), info: vi.fn(), error: vi.fn() }; + activity.cancelPreemptiveGeneration = vi.fn(); + activity.createSpeechTask = vi.fn(() => ({ cancel: vi.fn() })); + return activity; + } + + const endOfTurnInfo: EndOfTurnInfo = { + newTranscript: 'hello again', + transcriptConfidence: 1, + transcriptionDelay: 0, + endOfUtteranceDelay: 0, + startedSpeakingAt: undefined, + stoppedSpeakingAt: undefined, + }; + + // Regression for the mis-ported #5396 fix: blockNewTurns() must gate the speech + // scheduling paths (here onEndOfTurn) during the handoff transition window — even + // before drain() flips schedulingPaused — so a user turn arriving in that window is + // dropped instead of scheduling a reply against the outgoing agent. 
+ it('onEndOfTurn skips a new user turn while new turns are blocked, even when scheduling is still running', async () => { + const activity = createBareActivity(); + activity._schedulingPaused = false; // scheduling still running (pre-drain window) + activity.newTurnsBlocked = false; + + (activity as AgentActivity).blockNewTurns(); + const handled = await (activity as AgentActivity).onEndOfTurn(endOfTurnInfo); + + expect(handled).toBe(true); + expect(activity.cancelPreemptiveGeneration).toHaveBeenCalledTimes(1); + // The turn is dropped at the guard; userTurnCompleted is never scheduled. + expect(activity.createSpeechTask).not.toHaveBeenCalled(); + }); + + it('onEndOfTurn schedules the turn normally when new turns are not blocked', async () => { + const activity = createBareActivity(); + activity._schedulingPaused = false; + activity.newTurnsBlocked = false; + // `get stt` returns undefined here, short-circuiting the interruption branch. + activity.agent = { stt: undefined }; + activity.agentSession = { stt: undefined }; + activity._currentSpeech = undefined; + activity._userTurnCompletedTask = undefined; + + const handled = await (activity as AgentActivity).onEndOfTurn(endOfTurnInfo); + + expect(handled).toBe(true); + expect(activity.createSpeechTask).toHaveBeenCalledTimes(1); + }); + + // The bot's port wrongly gated the user-turn-exceeded callback on newTurnsBlocked; + // Python never does. onUserTurnExceeded must stay independent of the handoff flag. 
+ it('onUserTurnExceeded is independent of newTurnsBlocked', () => { + const activity = createBareActivity(); + activity._schedulingPaused = false; + activity.newTurnsBlocked = false; + activity.userTurnExceededLocked = false; + activity.userTurnExceededTask = undefined; + + (activity as AgentActivity).blockNewTurns(); + + const ev: UserTurnExceededEvent = { + type: 'user_turn_exceeded', + transcript: 'hi', + accumulatedTranscript: 'hi', + accumulatedWordCount: 10, + duration: 5000, + createdAt: Date.now(), + }; + (activity as AgentActivity).onUserTurnExceeded(ev); + + expect(activity.createSpeechTask).toHaveBeenCalledTimes(1); + }); + + // Parity with Python `_user_turn_completed` (agent_activity.py:2025): when new turns are + // blocked before the turn completes, the reply must be skipped *before* onUserTurnCompleted + // runs. When the session is not closing, the message is dropped (not added to chat ctx). + it('userTurnCompleted skips before the callback when new turns are blocked (not closing)', async () => { + const activity = createBareActivity(); + activity._schedulingPaused = false; + activity.newTurnsBlocked = false; + activity._currentSpeech = undefined; + const onUserTurnCompleted = vi.fn(async () => {}); + activity.agent = { llm: undefined, chatCtx: ChatContext.empty(), onUserTurnCompleted }; + activity.agentSession = { llm: undefined, _closing: false, _conversationItemAdded: vi.fn() }; + + (activity as AgentActivity).blockNewTurns(); + await (activity as any).userTurnCompleted(endOfTurnInfo); + + expect(onUserTurnCompleted).not.toHaveBeenCalled(); + expect(activity.agentSession._conversationItemAdded).not.toHaveBeenCalled(); + expect(activity.createSpeechTask).not.toHaveBeenCalled(); + }); + + // Parity with Python `_user_turn_completed` (agent_activity.py:2025): the skipped message is + // still committed to the chat context when the session is closing, so it is not lost. 
+ it('userTurnCompleted commits the skipped message to chat ctx when closing', async () => { + const activity = createBareActivity(); + activity._schedulingPaused = false; + activity.newTurnsBlocked = false; + activity._currentSpeech = undefined; + const push = vi.fn(); + const conversationItemAdded = vi.fn(); + activity.agent = { + llm: undefined, + chatCtx: ChatContext.empty(), + _chatCtx: { items: { push } }, + onUserTurnCompleted: vi.fn(async () => {}), + }; + activity.agentSession = { + llm: undefined, + _closing: true, + _conversationItemAdded: conversationItemAdded, + }; + + (activity as AgentActivity).blockNewTurns(); + await (activity as any).userTurnCompleted(endOfTurnInfo); + + expect(push).toHaveBeenCalledTimes(1); + expect(conversationItemAdded).toHaveBeenCalledTimes(1); + expect(activity.createSpeechTask).not.toHaveBeenCalled(); + }); + + // Parity with Python `_user_turn_completed` (agent_activity.py:2059): the post-callback + // re-check catches a handoff triggered *inside* onUserTurnCompleted, so no reply is scheduled + // against the outgoing agent even though new turns were not blocked when the turn started. + it('userTurnCompleted re-checks after the callback when a handoff blocks new turns mid-callback', async () => { + const activity = createBareActivity(); + activity._schedulingPaused = false; + activity.newTurnsBlocked = false; + activity._currentSpeech = undefined; + // A handoff inside the user callback blocks new turns after the pre-callback guard has passed. 
+ const onUserTurnCompleted = vi.fn(async () => { + activity.newTurnsBlocked = true; + }); + const plainLlm = { id: 'plain-llm' }; + activity.agent = { llm: plainLlm, chatCtx: ChatContext.empty(), onUserTurnCompleted }; + activity.agentSession = { llm: undefined, _closing: false, _conversationItemAdded: vi.fn() }; + + await (activity as any).userTurnCompleted(endOfTurnInfo); + + expect(onUserTurnCompleted).toHaveBeenCalledTimes(1); + expect(activity.createSpeechTask).not.toHaveBeenCalled(); + }); + /* eslint-enable @typescript-eslint/no-explicit-any */ +}); + describe('cleanupReusableResources', () => { it('closes both STT pipeline and RT session', async () => { const sttClose = vi.fn(async () => {}); diff --git a/agents/src/voice/agent_session.ts b/agents/src/voice/agent_session.ts index 33920015f..837741f17 100644 --- a/agents/src/voice/agent_session.ts +++ b/agents/src/voice/agent_session.ts @@ -316,6 +316,11 @@ export class AgentSession< return this.started; } + /** @internal - Whether the session is closing/draining. */ + get _closing(): boolean { + return this.closing; + } + /** @internal - Current run state for testing */ _globalRunState?: RunResult; @@ -614,6 +619,10 @@ export class AgentSession< return; } + // immediately block the old activity from accepting new user turns + // during the transition window (before drain() formally pauses scheduling) + this.activity?.blockNewTurns(); + const _updateActivityTask = async (oldTask: Task | undefined, agent: Agent) => { if (oldTask) { try {