Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 2 additions & 23 deletions livekit-agents/livekit/agents/voice/room_io/_input.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@ def __init__(

self._room.on("track_subscribed", self._on_track_available)
self._room.on("track_unpublished", self._on_track_unavailable)
self._room.on("token_refreshed", self._on_token_refreshed)

self._processor = processor
self._processor_owned = False
Expand Down Expand Up @@ -131,7 +130,6 @@ async def aclose(self) -> None:

self._room.off("track_subscribed", self._on_track_available)
self._room.off("track_unpublished", self._on_track_unavailable)
self._room.off("token_refreshed", self._on_token_refreshed)
self._data_ch.close()

@log_exceptions(logger=logger)
Expand Down Expand Up @@ -204,16 +202,6 @@ def _on_track_available(
self._close_stream()
self._stream = self._create_stream(track, participant)
self._publication = publication
if self._processor:
self._processor._on_stream_info_updated(
room_name=self._room.name,
participant_identity=participant.identity,
publication_sid=publication.sid,
)
if self._room._token is not None and self._room._server_url is not None:
self._processor._on_credentials_updated(
token=self._room._token, url=self._room._server_url
)
self._forward_atask = asyncio.create_task(
self._forward_task(self._forward_atask, self._stream, publication, participant)
)
Expand All @@ -238,16 +226,6 @@ def _on_track_unavailable(
if self._on_track_available(publication.track, publication, participant):
return

def _on_token_refreshed(self) -> None:
if (
self._processor is not None
and self._room._token is not None
and self._room._server_url is not None
):
self._processor._on_credentials_updated(
token=self._room._token, url=self._room._server_url
)


class _ParticipantAudioInputStream(_ParticipantInputStream[rtc.AudioFrame], AudioInput):
def __init__(
Expand Down Expand Up @@ -304,8 +282,9 @@ def _create_stream(self, track: rtc.Track, participant: rtc.Participant) -> rtc.
track=track,
sample_rate=self._sample_rate,
num_channels=self._num_channels,
noise_cancellation=noise_cancellation,
frame_size_ms=self._frame_size_ms,
noise_cancellation=noise_cancellation,
noise_cancellation_leave_open=True,
)

@override
Expand Down
21 changes: 1 addition & 20 deletions tests/test_room_io.py
Original file line number Diff line number Diff line change
Expand Up @@ -151,13 +151,11 @@ async def test_participant_input_stream_aclose_unregisters_track_unpublished() -

assert room.listener_count("track_subscribed") == 1
assert room.listener_count("track_unpublished") == 1
assert room.listener_count("token_refreshed") == 1

await stream.aclose()

assert room.listener_count("track_subscribed") == 0
assert room.listener_count("track_unpublished") == 0
assert room.listener_count("token_refreshed") == 0


@pytest.mark.asyncio
Expand Down Expand Up @@ -247,16 +245,12 @@ async def test_direct_processor_lifecycle() -> None:

assert stream._processor is processor
assert processor.close_calls == 0
assert len(processor.stream_info_calls) == 1
assert len(processor.credentials_calls) == 1

# track switch — processor must survive
stream._on_track_available(track2, pub2, participant)

assert stream._processor is processor
assert processor.close_calls == 0
assert len(processor.stream_info_calls) == 2
assert len(processor.credentials_calls) == 2

# final teardown closes the processor exactly once
await stream.aclose()
Expand Down Expand Up @@ -288,17 +282,13 @@ def selector(_params: NoiseCancellationParams) -> _MockFrameProcessor:

assert len(processors) == 1
assert stream._processor is processors[0]
assert len(processors[0].stream_info_calls) == 1
assert len(processors[0].credentials_calls) == 1

# track switch — old processor closed, new one receives lifecycle calls
stream._on_track_available(track2, pub2, participant)

assert len(processors) == 2
assert processors[0].close_calls == 1
assert stream._processor is processors[1]
assert len(processors[1].stream_info_calls) == 1
assert len(processors[1].credentials_calls) == 1

# final teardown closes the active processor
await stream.aclose()
Expand All @@ -307,8 +297,7 @@ def selector(_params: NoiseCancellationParams) -> _MockFrameProcessor:

@pytest.mark.asyncio
async def test_selector_processor_track_disappears() -> None:
"""When a track vanishes with no replacement, the selector-created processor
is closed and subsequent token refreshes don't touch it."""
"""When a track vanishes with no replacement, the selector-created processor is closed."""
room = _FakeRoom()
processor = _MockFrameProcessor()
stream = _make_audio_input_stream(room, noise_cancellation=lambda _params: processor)
Expand All @@ -320,21 +309,13 @@ async def test_selector_processor_track_disappears() -> None:
stream._on_track_available(track, publication, participant)

assert stream._processor is processor
assert len(processor.credentials_calls) == 1

# track unpublished with no replacement
stream._on_track_unavailable(publication, participant)

assert processor.close_calls == 1
assert stream._processor is None

# token refresh must not reach the closed processor
room._token = "refreshed-token"
room._server_url = "wss://refreshed.livekit.cloud"
stream._on_token_refreshed()

assert len(processor.credentials_calls) == 1

await stream.aclose()


Expand Down
Loading