From 538fc134b80044f3f903a60d749cef87a4ad3833 Mon Sep 17 00:00:00 2001 From: Ryan Gaus Date: Mon, 18 May 2026 16:53:50 -0400 Subject: [PATCH 01/20] feat: add MVP of propagating room downwards from room -> track -> audio stream And extracting metadata from that room that can be fed into the frame processor. --- livekit-rtc/livekit/rtc/audio_stream.py | 65 ++++++++++++++++++++++++- livekit-rtc/livekit/rtc/room.py | 8 +++ livekit-rtc/livekit/rtc/track.py | 20 +++++++- 3 files changed, 91 insertions(+), 2 deletions(-) diff --git a/livekit-rtc/livekit/rtc/audio_stream.py b/livekit-rtc/livekit/rtc/audio_stream.py index 875b8db2..9fe12bdb 100644 --- a/livekit-rtc/livekit/rtc/audio_stream.py +++ b/livekit-rtc/livekit/rtc/audio_stream.py @@ -17,7 +17,7 @@ import asyncio import json from dataclasses import dataclass -from typing import Any, AsyncIterator, Optional +from typing import TYPE_CHECKING, Any, AsyncIterator, Optional from ._ffi_client import FfiClient, FfiHandle from ._proto import audio_frame_pb2 as proto_audio_frame @@ -30,6 +30,9 @@ from .track import Track from .frame_processor import FrameProcessor +if TYPE_CHECKING: + from .room import Room + @dataclass class AudioFrameEvent: @@ -65,6 +68,7 @@ def __init__( num_channels: int = 1, frame_size_ms: int | None = None, noise_cancellation: Optional[NoiseCancellationOptions | FrameProcessor[AudioFrame]] = None, + room: Optional["Room"] = None, **kwargs: Any, ) -> None: """Initialize an `AudioStream` instance. @@ -81,6 +85,9 @@ def __init__( noise_cancellation (Optional[NoiseCancellationOptions | FrameProcessor[AudioFrame]], optional): If noise cancellation is used, pass a `NoiseCancellationOptions` or `FrameProcessor[AudioFrame]` instance created by the noise cancellation module. + room (Optional[Room], optional): The room this stream's track belongs to, used to + resolve `room_name`, `participant_identity`, and `publication_sid`. May be `None` + if the track is not (yet) associated with a room. Example: ```python @@ -98,6 +105,8 @@ def __init__( ``` """ self._track: Track | None = track + self._room: Room | None = room + print("ROOM:", room) self._sample_rate = sample_rate self._num_channels = num_channels self._frame_size_ms = frame_size_ms @@ -132,6 +141,9 @@ def __init__( self._ffi_handle = FfiHandle(stream.handle.id) self._info = stream.info + if self._track is not None: + self._track._register_audio_stream(self) + @classmethod def from_participant( cls, @@ -144,6 +156,7 @@ def from_participant( num_channels: int = 1, frame_size_ms: int | None = None, noise_cancellation: Optional[NoiseCancellationOptions | FrameProcessor[AudioFrame]] = None, + room: Optional["Room"] = None, ) -> AudioStream: """Create an `AudioStream` from a participant's audio track. @@ -181,6 +194,7 @@ def from_participant( num_channels=num_channels, noise_cancellation=noise_cancellation, frame_size_ms=frame_size_ms, + room=room, ) @classmethod @@ -194,6 +208,7 @@ def from_track( num_channels: int = 1, frame_size_ms: int | None = None, noise_cancellation: Optional[NoiseCancellationOptions | FrameProcessor[AudioFrame]] = None, + room: Optional["Room"] = None, ) -> AudioStream: """Create an `AudioStream` from an existing audio track. @@ -227,8 +242,54 @@ def from_track( num_channels=num_channels, noise_cancellation=noise_cancellation, frame_size_ms=frame_size_ms, + room=room, ) + def _set_room(self, room: Optional["Room"]) -> None: + self._room = room + print("ROOM UPDATE:", room) + + @property + def room(self) -> Optional["Room"]: + return self._room + + @property + def room_name(self) -> Optional[str]: + return self._room.name if self._room is not None else None + + @property + def participant_identity(self) -> Optional[str]: + pub = self._find_publication() + if pub is None: + return None + identity, _ = pub + return identity + + @property + def publication_sid(self) -> Optional[str]: + pub = self._find_publication() + if pub is None: + return None + _, sid = pub + return sid + + def _find_publication(self) -> Optional[tuple[str, str]]: + if self._room is None or self._track is None: + return None + track_sid = self._track.sid + if not track_sid: + return None + for participant in self._room.remote_participants.values(): + publication = participant.track_publications.get(track_sid) + if publication is not None: + return participant.identity, publication.sid + local = self._room._local_participant + if local is not None: + for publication in local.track_publications.values(): + if publication.sid == track_sid: + return local.identity, publication.sid + return None + def __del__(self) -> None: FfiClient.instance.queue.unsubscribe(self._ffi_queue) @@ -303,6 +364,8 @@ async def aclose(self) -> None: This method cleans up resources associated with the audio stream and waits for any pending operations to complete. """ + if self._track is not None: + self._track._unregister_audio_stream(self) self._ffi_handle.dispose() await self._task diff --git a/livekit-rtc/livekit/rtc/room.py b/livekit-rtc/livekit/rtc/room.py index e3619d9c..f3808b16 100644 --- a/livekit-rtc/livekit/rtc/room.py +++ b/livekit-rtc/livekit/rtc/room.py @@ -733,10 +733,14 @@ def _on_room_event(self, event: proto_room.RoomEvent) -> None: sid = event.local_track_published.track_sid lpublication = self.local_participant.track_publications[sid] ltrack = lpublication.track + if ltrack is not None: + ltrack._set_room(self) self.emit("local_track_published", lpublication, ltrack) elif which == "local_track_unpublished": sid = event.local_track_unpublished.publication_sid lpublication = self.local_participant.track_publications[sid] + if lpublication.track is not None: + lpublication.track._set_room(None) self.emit("local_track_unpublished", lpublication) elif which == "local_track_republished": # The SDK auto-republished a local track during a full @@ -777,10 +781,12 @@ def _on_room_event(self, event: proto_room.RoomEvent) -> None: rpublication._subscribed = True if track_info.kind == TrackKind.KIND_VIDEO: remote_video_track = RemoteVideoTrack(owned_track_info) + remote_video_track._set_room(self) rpublication._track = remote_video_track self.emit("track_subscribed", remote_video_track, rpublication, rparticipant) elif track_info.kind == TrackKind.KIND_AUDIO: remote_audio_track = RemoteAudioTrack(owned_track_info) + remote_audio_track._set_room(self) rpublication._track = remote_audio_track self.emit("track_subscribed", remote_audio_track, rpublication, rparticipant) elif which == "track_unsubscribed": @@ -788,6 +794,8 @@ def _on_room_event(self, event: proto_room.RoomEvent) -> None: rparticipant = self._remote_participants[identity] rpublication = rparticipant.track_publications[event.track_unsubscribed.track_sid] rtrack = rpublication.track + if rtrack is not None: + rtrack._set_room(None) rpublication._track = None rpublication._subscribed = False self.emit("track_unsubscribed", rtrack, rpublication, rparticipant) diff --git a/livekit-rtc/livekit/rtc/track.py b/livekit-rtc/livekit/rtc/track.py index 0eeb3465..188900c5 100644 --- a/livekit-rtc/livekit/rtc/track.py +++ b/livekit-rtc/livekit/rtc/track.py @@ -12,7 +12,8 @@ # See the License for the specific language governing permissions and # limitations under the License. -from typing import TYPE_CHECKING, List, Union +import weakref +from typing import TYPE_CHECKING, List, Optional, Union from ._ffi_client import FfiHandle, FfiClient from ._proto import ffi_pb2 as proto_ffi from ._proto import track_pb2 as proto_track @@ -20,6 +21,8 @@ if TYPE_CHECKING: from .audio_source import AudioSource + from .audio_stream import AudioStream + from .room import Room from .video_source import VideoSource @@ -27,6 +30,21 @@ class Track: def __init__(self, owned_info: proto_track.OwnedTrack): self._info = owned_info.info self._ffi_handle = FfiHandle(owned_info.handle.id) + self._room: Optional["Room"] = None + self._audio_streams: "weakref.WeakSet[AudioStream]" = weakref.WeakSet() + + def _set_room(self, room: Optional["Room"]) -> None: + self._room = room + for stream in self._audio_streams: + stream._set_room(room) + + def _register_audio_stream(self, stream: "AudioStream") -> None: + self._audio_streams.add(stream) + if self._room is not None: + stream._set_room(self._room) + + def _unregister_audio_stream(self, stream: "AudioStream") -> None: + self._audio_streams.discard(stream) @property def sid(self) -> str: From 7c7eaa42f34d5c0916036497e770c5236a1a0e91 Mon Sep 17 00:00:00 2001 From: Ryan Gaus Date: Tue, 19 May 2026 16:41:27 -0400 Subject: [PATCH 02/20] feat: call _on_stream_info_updated with parent room reference on audio_stream --- livekit-rtc/livekit/rtc/audio_stream.py | 46 ++++++++++--------------- livekit-rtc/livekit/rtc/track.py | 12 ++++--- 2 files changed, 27 insertions(+), 31 deletions(-) diff --git a/livekit-rtc/livekit/rtc/audio_stream.py b/livekit-rtc/livekit/rtc/audio_stream.py index 9fe12bdb..de6a18e3 100644 --- a/livekit-rtc/livekit/rtc/audio_stream.py +++ b/livekit-rtc/livekit/rtc/audio_stream.py @@ -16,6 +16,7 @@ import asyncio import json +import weakref from dataclasses import dataclass from typing import TYPE_CHECKING, Any, AsyncIterator, Optional @@ -105,7 +106,9 @@ def __init__( ``` """ self._track: Track | None = track - self._room: Room | None = room + self._room_ref: "Optional[weakref.ref[Room]]" = ( + weakref.ref(room) if room is not None else None + ) print("ROOM:", room) self._sample_rate = sample_rate self._num_channels = num_channels @@ -246,44 +249,33 @@ def from_track( ) def _set_room(self, room: Optional["Room"]) -> None: - self._room = room + self._room_ref = weakref.ref(room) if room is not None else None print("ROOM UPDATE:", room) - @property - def room(self) -> Optional["Room"]: - return self._room - - @property - def room_name(self) -> Optional[str]: - return self._room.name if self._room is not None else None - - @property - def participant_identity(self) -> Optional[str]: - pub = self._find_publication() - if pub is None: - return None - identity, _ = pub - return identity + if self._processor: + room = self._resolve_room() + participant_identity, publication_sid = self._find_publication() or ("", "") + self._processor._on_stream_info_updated( + room_name=room.name if room is not None else "", # FIXME: default value? + participant_identity=participant_identity, + publication_sid=publication_sid, + ) - @property - def publication_sid(self) -> Optional[str]: - pub = self._find_publication() - if pub is None: - return None - _, sid = pub - return sid + def _resolve_room(self) -> Optional["Room"]: + return self._room_ref() if self._room_ref is not None else None def _find_publication(self) -> Optional[tuple[str, str]]: - if self._room is None or self._track is None: + room = self._resolve_room() + if room is None or self._track is None: return None track_sid = self._track.sid if not track_sid: return None - for participant in self._room.remote_participants.values(): + for participant in room.remote_participants.values(): publication = participant.track_publications.get(track_sid) if publication is not None: return participant.identity, publication.sid - local = self._room._local_participant + local = room._local_participant if local is not None: for publication in local.track_publications.values(): if publication.sid == track_sid: diff --git a/livekit-rtc/livekit/rtc/track.py b/livekit-rtc/livekit/rtc/track.py index 188900c5..e07dfc0a 100644 --- a/livekit-rtc/livekit/rtc/track.py +++ b/livekit-rtc/livekit/rtc/track.py @@ -30,18 +30,22 @@ class Track: def __init__(self, owned_info: proto_track.OwnedTrack): self._info = owned_info.info self._ffi_handle = FfiHandle(owned_info.handle.id) - self._room: Optional["Room"] = None + self._room_ref: "Optional[weakref.ref[Room]]" = None self._audio_streams: "weakref.WeakSet[AudioStream]" = weakref.WeakSet() + def _resolve_room(self) -> Optional["Room"]: + return self._room_ref() if self._room_ref is not None else None + def _set_room(self, room: Optional["Room"]) -> None: - self._room = room + self._room_ref = weakref.ref(room) if room is not None else None for stream in self._audio_streams: stream._set_room(room) def _register_audio_stream(self, stream: "AudioStream") -> None: self._audio_streams.add(stream) - if self._room is not None: - stream._set_room(self._room) + room = self._resolve_room() + if room is not None: + stream._set_room(room) def _unregister_audio_stream(self, stream: "AudioStream") -> None: self._audio_streams.discard(stream) From 12718d1e656e45be8e08ab9130bca8575afe1f41 Mon Sep 17 00:00:00 2001 From: Ryan Gaus Date: Wed, 20 May 2026 12:58:29 -0400 Subject: [PATCH 03/20] feat: call _on_credentials_updated with token / server url extracted from room --- livekit-rtc/livekit/rtc/audio_stream.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/livekit-rtc/livekit/rtc/audio_stream.py b/livekit-rtc/livekit/rtc/audio_stream.py index de6a18e3..3b26b551 100644 --- a/livekit-rtc/livekit/rtc/audio_stream.py +++ b/livekit-rtc/livekit/rtc/audio_stream.py @@ -254,6 +254,9 @@ def _set_room(self, room: Optional["Room"]) -> None: if self._processor: room = self._resolve_room() + if room and room._token is not None and room._server_url is not None: + self._processor._on_credentials_updated(token=room._token, url=room._server_url) + participant_identity, publication_sid = self._find_publication() or ("", "") self._processor._on_stream_info_updated( room_name=room.name if room is not None else "", # FIXME: default value? From af26b3dd3672d0400e383cbc558d0933ef84b411 Mon Sep 17 00:00:00 2001 From: Ryan Gaus Date: Wed, 20 May 2026 13:42:18 -0400 Subject: [PATCH 04/20] fix: remove debugging logs --- livekit-rtc/livekit/rtc/audio_stream.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/livekit-rtc/livekit/rtc/audio_stream.py b/livekit-rtc/livekit/rtc/audio_stream.py index 3b26b551..a2348242 100644 --- a/livekit-rtc/livekit/rtc/audio_stream.py +++ b/livekit-rtc/livekit/rtc/audio_stream.py @@ -109,7 +109,6 @@ def __init__( self._room_ref: "Optional[weakref.ref[Room]]" = ( weakref.ref(room) if room is not None else None ) - print("ROOM:", room) self._sample_rate = sample_rate self._num_channels = num_channels self._frame_size_ms = frame_size_ms @@ -250,7 +249,6 @@ def from_track( def _set_room(self, room: Optional["Room"]) -> None: self._room_ref = weakref.ref(room) if room is not None else None - print("ROOM UPDATE:", room) if self._processor: room = self._resolve_room() From 5ecca5dd099a422775103443782b7c0d2b3736fd Mon Sep 17 00:00:00 2001 From: Ryan Gaus Date: Wed, 20 May 2026 13:45:05 -0400 Subject: [PATCH 05/20] fix: address lint errors --- livekit-rtc/livekit/rtc/audio_stream.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/livekit-rtc/livekit/rtc/audio_stream.py b/livekit-rtc/livekit/rtc/audio_stream.py index a2348242..0704baa6 100644 --- a/livekit-rtc/livekit/rtc/audio_stream.py +++ b/livekit-rtc/livekit/rtc/audio_stream.py @@ -257,7 +257,7 @@ def _set_room(self, room: Optional["Room"]) -> None: participant_identity, publication_sid = self._find_publication() or ("", "") self._processor._on_stream_info_updated( - room_name=room.name if room is not None else "", # FIXME: default value? + room_name=room.name if room is not None else "", # FIXME: default value? participant_identity=participant_identity, publication_sid=publication_sid, ) From af56d61e430c448a2475974c889a5351884829b4 Mon Sep 17 00:00:00 2001 From: Ryan Gaus Date: Wed, 20 May 2026 13:46:49 -0400 Subject: [PATCH 06/20] feat: only call frame processor handlers if room is set --- livekit-rtc/livekit/rtc/audio_stream.py | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/livekit-rtc/livekit/rtc/audio_stream.py b/livekit-rtc/livekit/rtc/audio_stream.py index 0704baa6..4abdce44 100644 --- a/livekit-rtc/livekit/rtc/audio_stream.py +++ b/livekit-rtc/livekit/rtc/audio_stream.py @@ -252,15 +252,16 @@ def _set_room(self, room: Optional["Room"]) -> None: if self._processor: room = self._resolve_room() - if room and room._token is not None and room._server_url is not None: - self._processor._on_credentials_updated(token=room._token, url=room._server_url) - - participant_identity, publication_sid = self._find_publication() or ("", "") - self._processor._on_stream_info_updated( - room_name=room.name if room is not None else "", # FIXME: default value? - participant_identity=participant_identity, - publication_sid=publication_sid, - ) + if room: + if room._token is not None and room._server_url is not None: + self._processor._on_credentials_updated(token=room._token, url=room._server_url) + + participant_identity, publication_sid = self._find_publication() or ("", "") + self._processor._on_stream_info_updated( + room_name=room.name, + participant_identity=participant_identity, + publication_sid=publication_sid, + ) def _resolve_room(self) -> Optional["Room"]: return self._room_ref() if self._room_ref is not None else None From f62c247feb7917d5aa1898dfa97297f4a6070b38 Mon Sep 17 00:00:00 2001 From: Ryan Gaus Date: Wed, 20 May 2026 16:58:34 -0400 Subject: [PATCH 07/20] fix: properly intercept room refresh token events --- livekit-rtc/livekit/rtc/audio_stream.py | 42 ++++++++++++++++--------- 1 file changed, 28 insertions(+), 14 deletions(-) diff --git a/livekit-rtc/livekit/rtc/audio_stream.py b/livekit-rtc/livekit/rtc/audio_stream.py index 4abdce44..f6e6c30e 100644 --- a/livekit-rtc/livekit/rtc/audio_stream.py +++ b/livekit-rtc/livekit/rtc/audio_stream.py @@ -106,9 +106,7 @@ def __init__( ``` """ self._track: Track | None = track - self._room_ref: "Optional[weakref.ref[Room]]" = ( - weakref.ref(room) if room is not None else None - ) + self._room_ref: "Optional[weakref.ref[Room]]" = None self._sample_rate = sample_rate self._num_channels = num_channels self._frame_size_ms = frame_size_ms @@ -143,6 +141,8 @@ def __init__( self._ffi_handle = FfiHandle(stream.handle.id) self._info = stream.info + self._set_room(room) + if self._track is not None: self._track._register_audio_stream(self) @@ -248,20 +248,33 @@ def from_track( ) def _set_room(self, room: Optional["Room"]) -> None: + old_room = self._resolve_room() + if old_room is not room: + if old_room is not None: + old_room.off("token_refreshed", self._on_room_token_refreshed) + if room is not None: + room.on("token_refreshed", self._on_room_token_refreshed) + self._room_ref = weakref.ref(room) if room is not None else None - if self._processor: - room = self._resolve_room() - if room: - if room._token is not None and room._server_url is not None: - self._processor._on_credentials_updated(token=room._token, url=room._server_url) + if self._processor and room is not None: + if room._token is not None and room._server_url is not None: + self._processor._on_credentials_updated(token=room._token, url=room._server_url) - participant_identity, publication_sid = self._find_publication() or ("", "") - self._processor._on_stream_info_updated( - room_name=room.name, - participant_identity=participant_identity, - publication_sid=publication_sid, - ) + participant_identity, publication_sid = self._find_publication() or ("", "") + self._processor._on_stream_info_updated( + room_name=room.name, + participant_identity=participant_identity, + publication_sid=publication_sid, + ) + + def _on_room_token_refreshed(self) -> None: + if self._processor is None: + return + room = self._resolve_room() + if room is None or room._token is None or room._server_url is None: + return + self._processor._on_credentials_updated(token=room._token, url=room._server_url) def _resolve_room(self) -> Optional["Room"]: return self._room_ref() if self._room_ref is not None else None @@ -360,6 +373,7 @@ async def aclose(self) -> None: """ if self._track is not None: self._track._unregister_audio_stream(self) + self._set_room(None) self._ffi_handle.dispose() await self._task From e7ab10eb4f080cb846b5f0137a39e49811c091d3 Mon Sep 17 00:00:00 2001 From: Ryan Gaus Date: Tue, 26 May 2026 11:27:47 -0400 Subject: [PATCH 08/20] feat: add from __future__ import annotations to remove string types --- livekit-rtc/livekit/rtc/audio_stream.py | 12 ++++++------ livekit-rtc/livekit/rtc/track.py | 14 ++++++++------ 2 files changed, 14 insertions(+), 12 deletions(-) diff --git a/livekit-rtc/livekit/rtc/audio_stream.py b/livekit-rtc/livekit/rtc/audio_stream.py index f6e6c30e..39d57d4b 100644 --- a/livekit-rtc/livekit/rtc/audio_stream.py +++ b/livekit-rtc/livekit/rtc/audio_stream.py @@ -69,7 +69,7 @@ def __init__( num_channels: int = 1, frame_size_ms: int | None = None, noise_cancellation: Optional[NoiseCancellationOptions | FrameProcessor[AudioFrame]] = None, - room: Optional["Room"] = None, + room: Optional[Room] = None, **kwargs: Any, ) -> None: """Initialize an `AudioStream` instance. @@ -106,7 +106,7 @@ def __init__( ``` """ self._track: Track | None = track - self._room_ref: "Optional[weakref.ref[Room]]" = None + self._room_ref: Optional[weakref.ref[Room]] = None self._sample_rate = sample_rate self._num_channels = num_channels self._frame_size_ms = frame_size_ms @@ -158,7 +158,7 @@ def from_participant( num_channels: int = 1, frame_size_ms: int | None = None, noise_cancellation: Optional[NoiseCancellationOptions | FrameProcessor[AudioFrame]] = None, - room: Optional["Room"] = None, + room: Optional[Room] = None, ) -> AudioStream: """Create an `AudioStream` from a participant's audio track. @@ -210,7 +210,7 @@ def from_track( num_channels: int = 1, frame_size_ms: int | None = None, noise_cancellation: Optional[NoiseCancellationOptions | FrameProcessor[AudioFrame]] = None, - room: Optional["Room"] = None, + room: Optional[Room] = None, ) -> AudioStream: """Create an `AudioStream` from an existing audio track. @@ -247,7 +247,7 @@ def from_track( room=room, ) - def _set_room(self, room: Optional["Room"]) -> None: + def _set_room(self, room: Optional[Room]) -> None: old_room = self._resolve_room() if old_room is not room: if old_room is not None: @@ -276,7 +276,7 @@ def _on_room_token_refreshed(self) -> None: return self._processor._on_credentials_updated(token=room._token, url=room._server_url) - def _resolve_room(self) -> Optional["Room"]: + def _resolve_room(self) -> Optional[Room]: return self._room_ref() if self._room_ref is not None else None def _find_publication(self) -> Optional[tuple[str, str]]: diff --git a/livekit-rtc/livekit/rtc/track.py b/livekit-rtc/livekit/rtc/track.py index e07dfc0a..d1c481c4 100644 --- a/livekit-rtc/livekit/rtc/track.py +++ b/livekit-rtc/livekit/rtc/track.py @@ -12,6 +12,8 @@ # See the License for the specific language governing permissions and # limitations under the License. +from __future__ import annotations + import weakref from typing import TYPE_CHECKING, List, Optional, Union from ._ffi_client import FfiHandle, FfiClient @@ -30,24 +32,24 @@ class Track: def __init__(self, owned_info: proto_track.OwnedTrack): self._info = owned_info.info self._ffi_handle = FfiHandle(owned_info.handle.id) - self._room_ref: "Optional[weakref.ref[Room]]" = None - self._audio_streams: "weakref.WeakSet[AudioStream]" = weakref.WeakSet() + self._room_ref: Optional[weakref.ref[Room]] = None + self._audio_streams: weakref.WeakSet[AudioStream] = weakref.WeakSet() - def _resolve_room(self) -> Optional["Room"]: + def _resolve_room(self) -> Optional[Room]: return self._room_ref() if self._room_ref is not None else None - def _set_room(self, room: Optional["Room"]) -> None: + def _set_room(self, room: Optional[Room]) -> None: self._room_ref = weakref.ref(room) if room is not None else None for stream in self._audio_streams: stream._set_room(room) - def _register_audio_stream(self, stream: "AudioStream") -> None: + def _register_audio_stream(self, stream: AudioStream) -> None: self._audio_streams.add(stream) room = self._resolve_room() if room is not None: stream._set_room(room) - def _unregister_audio_stream(self, stream: "AudioStream") -> None: + def _unregister_audio_stream(self, stream: AudioStream) -> None: self._audio_streams.discard(stream) @property From f7f422d11a3414ce5afb64fca9d8797e643766e6 Mon Sep 17 00:00:00 2001 From: Ryan Gaus Date: Tue, 26 May 2026 11:30:56 -0400 Subject: [PATCH 09/20] fix: address incorrect docs --- livekit-rtc/livekit/rtc/audio_stream.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/livekit-rtc/livekit/rtc/audio_stream.py b/livekit-rtc/livekit/rtc/audio_stream.py index 39d57d4b..5f716fb8 100644 --- a/livekit-rtc/livekit/rtc/audio_stream.py +++ b/livekit-rtc/livekit/rtc/audio_stream.py @@ -87,8 +87,7 @@ def __init__( If noise cancellation is used, pass a `NoiseCancellationOptions` or `FrameProcessor[AudioFrame]` instance created by the noise cancellation module. room (Optional[Room], optional): The room this stream's track belongs to, used to - resolve `room_name`, `participant_identity`, and `publication_sid`. May be `None` - if the track is not (yet) associated with a room. + resolve metadata required to properly initialize an associated FrameProcessor. Example: ```python From 24f2b6e5bb3a85ce67cf871a1b42cf49f59bd6b7 Mon Sep 17 00:00:00 2001 From: Ryan Gaus Date: Tue, 26 May 2026 15:19:07 -0400 Subject: [PATCH 10/20] refactor: centralize frame processor state logic into Track, not AudioStream This makes it less complex. --- livekit-rtc/livekit/rtc/audio_stream.py | 79 ++++++------------------- livekit-rtc/livekit/rtc/track.py | 53 ++++++++++++++++- 2 files changed, 68 insertions(+), 64 deletions(-) diff --git a/livekit-rtc/livekit/rtc/audio_stream.py b/livekit-rtc/livekit/rtc/audio_stream.py index 5f716fb8..373f1611 100644 --- a/livekit-rtc/livekit/rtc/audio_stream.py +++ b/livekit-rtc/livekit/rtc/audio_stream.py @@ -16,9 +16,8 @@ import asyncio import json -import weakref from dataclasses import dataclass -from typing import TYPE_CHECKING, Any, AsyncIterator, Optional +from typing import Any, AsyncIterator, Optional from ._ffi_client import FfiClient, FfiHandle from ._proto import audio_frame_pb2 as proto_audio_frame @@ -31,9 +30,6 @@ from .track import Track from .frame_processor import FrameProcessor -if TYPE_CHECKING: - from .room import Room - @dataclass class AudioFrameEvent: @@ -69,7 +65,6 @@ def __init__( num_channels: int = 1, frame_size_ms: int | None = None, noise_cancellation: Optional[NoiseCancellationOptions | FrameProcessor[AudioFrame]] = None, - room: Optional[Room] = None, **kwargs: Any, ) -> None: """Initialize an `AudioStream` instance. @@ -86,8 +81,6 @@ def __init__( noise_cancellation (Optional[NoiseCancellationOptions | FrameProcessor[AudioFrame]], optional): If noise cancellation is used, pass a `NoiseCancellationOptions` or `FrameProcessor[AudioFrame]` instance created by the noise cancellation module. - room (Optional[Room], optional): The room this stream's track belongs to, used to - resolve metadata required to properly initialize an associated FrameProcessor. Example: ```python @@ -105,7 +98,6 @@ def __init__( ``` """ self._track: Track | None = track - self._room_ref: Optional[weakref.ref[Room]] = None self._sample_rate = sample_rate self._num_channels = num_channels self._frame_size_ms = frame_size_ms @@ -140,8 +132,6 @@ def __init__( self._ffi_handle = FfiHandle(stream.handle.id) self._info = stream.info - self._set_room(room) - if self._track is not None: self._track._register_audio_stream(self) @@ -157,7 +147,6 @@ def from_participant( num_channels: int = 1, frame_size_ms: int | None = None, noise_cancellation: Optional[NoiseCancellationOptions | FrameProcessor[AudioFrame]] = None, - room: Optional[Room] = None, ) -> AudioStream: """Create an `AudioStream` from a participant's audio track. @@ -195,7 +184,6 @@ def from_participant( num_channels=num_channels, noise_cancellation=noise_cancellation, frame_size_ms=frame_size_ms, - room=room, ) @classmethod @@ -209,7 +197,6 @@ def from_track( num_channels: int = 1, frame_size_ms: int | None = None, noise_cancellation: Optional[NoiseCancellationOptions | FrameProcessor[AudioFrame]] = None, - room: Optional[Room] = None, ) -> AudioStream: """Create an `AudioStream` from an existing audio track. @@ -243,58 +230,27 @@ def from_track( num_channels=num_channels, noise_cancellation=noise_cancellation, frame_size_ms=frame_size_ms, - room=room, ) - def _set_room(self, room: Optional[Room]) -> None: - old_room = self._resolve_room() - if old_room is not room: - if old_room is not None: - old_room.off("token_refreshed", self._on_room_token_refreshed) - if room is not None: - room.on("token_refreshed", self._on_room_token_refreshed) - - self._room_ref = weakref.ref(room) if room is not None else None - - if self._processor and room is not None: - if room._token is not None and room._server_url is not None: - self._processor._on_credentials_updated(token=room._token, url=room._server_url) - - participant_identity, publication_sid = self._find_publication() or ("", "") - self._processor._on_stream_info_updated( - room_name=room.name, - participant_identity=participant_identity, - publication_sid=publication_sid, - ) - - def _on_room_token_refreshed(self) -> None: + def _on_processor_stream_info_updated( + self, + *, + room_name: str, + participant_identity: str, + publication_sid: str, + ) -> None: if self._processor is None: return - room = self._resolve_room() - if room is None or room._token is None or room._server_url is None: + self._processor._on_stream_info_updated( + room_name=room_name, + participant_identity=participant_identity, + publication_sid=publication_sid, + ) + + def _on_processor_credentials_updated(self, *, token: str, url: str) -> None: + if self._processor is None: return - self._processor._on_credentials_updated(token=room._token, url=room._server_url) - - def _resolve_room(self) -> Optional[Room]: - return self._room_ref() if self._room_ref is not None else None - - def _find_publication(self) -> Optional[tuple[str, str]]: - room = self._resolve_room() - if room is None or self._track is None: - return None - track_sid = self._track.sid - if not track_sid: - return None - for participant in room.remote_participants.values(): - publication = participant.track_publications.get(track_sid) - if publication is not None: - return participant.identity, publication.sid - local = room._local_participant - if local is not None: - for publication in local.track_publications.values(): - if publication.sid == track_sid: - return local.identity, publication.sid - return None + self._processor._on_credentials_updated(token=token, url=url) def __del__(self) -> None: FfiClient.instance.queue.unsubscribe(self._ffi_queue) @@ -372,7 +328,6 @@ async def aclose(self) -> None: """ if self._track is not None: self._track._unregister_audio_stream(self) - self._set_room(None) self._ffi_handle.dispose() await self._task diff --git a/livekit-rtc/livekit/rtc/track.py b/livekit-rtc/livekit/rtc/track.py index d1c481c4..c4b96e47 100644 --- a/livekit-rtc/livekit/rtc/track.py +++ b/livekit-rtc/livekit/rtc/track.py @@ -39,15 +39,64 @@ def _resolve_room(self) -> Optional[Room]: return self._room_ref() if self._room_ref is not None else None def _set_room(self, room: Optional[Room]) -> None: + old_room = self._resolve_room() + if old_room is not room: + if old_room is not None: + old_room.off("token_refreshed", self._on_room_token_refreshed) + if room is not None: + room.on("token_refreshed", self._on_room_token_refreshed) + self._room_ref = weakref.ref(room) if room is not None else None + for stream in self._audio_streams: - stream._set_room(room) + self._push_processor_metadata_to_stream(stream, room) + + def _on_room_token_refreshed(self) -> None: + room = self._resolve_room() + if room is None or room._token is None or room._server_url is None: + return + for stream in self._audio_streams: + stream._on_processor_credentials_updated(token=room._token, url=room._server_url) + + def _push_processor_metadata_to_stream(self, stream: AudioStream, room: Optional[Room]) -> None: + if room is None: + # track left a room — clear processor's room context + stream._on_processor_stream_info_updated( + room_name="", participant_identity="", publication_sid="" + ) + stream._on_processor_credentials_updated(token="", url="") + return + + identity = "" + pub_sid = "" + track_sid = self.sid + if track_sid: + for participant in room.remote_participants.values(): + publication = participant.track_publications.get(track_sid) + if publication is not None: + identity, pub_sid = participant.identity, publication.sid + break + else: + local = room._local_participant + if local is not None: + for publication in local.track_publications.values(): + if publication.sid == track_sid: + identity, pub_sid = local.identity, publication.sid + break + + stream._on_processor_stream_info_updated( + room_name=room.name, + participant_identity=identity, + publication_sid=pub_sid, + ) + if room._token is not None and room._server_url is not None: + stream._on_processor_credentials_updated(token=room._token, url=room._server_url) def _register_audio_stream(self, stream: AudioStream) -> None: self._audio_streams.add(stream) room = self._resolve_room() if room is not None: - stream._set_room(room) + self._push_processor_metadata_to_stream(stream, room) def _unregister_audio_stream(self, stream: AudioStream) -> None: self._audio_streams.discard(stream) From ad325743a8bf07c167a4d50719c162f995815f58 Mon Sep 17 00:00:00 2001 From: Ryan Gaus Date: Tue, 26 May 2026 16:08:07 -0400 Subject: [PATCH 11/20] feat: add auto cleanup of FrameProcessor as opt-out The agents sdk can pass this opt-out flag so that it can reuse the frame processor across many audio tracks --- livekit-rtc/livekit/rtc/audio_stream.py | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/livekit-rtc/livekit/rtc/audio_stream.py b/livekit-rtc/livekit/rtc/audio_stream.py index 373f1611..37071869 100644 --- a/livekit-rtc/livekit/rtc/audio_stream.py +++ b/livekit-rtc/livekit/rtc/audio_stream.py @@ -65,6 +65,8 @@ def __init__( num_channels: int = 1, frame_size_ms: int | None = None, noise_cancellation: Optional[NoiseCancellationOptions | FrameProcessor[AudioFrame]] = None, + *, + noise_cancellation_leave_open: bool = False, **kwargs: Any, ) -> None: """Initialize an `AudioStream` instance. @@ -81,6 +83,9 @@ def __init__( noise_cancellation (Optional[NoiseCancellationOptions | FrameProcessor[AudioFrame]], optional): If noise cancellation is used, pass a `NoiseCancellationOptions` or `FrameProcessor[AudioFrame]` instance created by the noise cancellation module. + noise_cancellation_leave_open (bool): + When the audio stream closes, leaves the FrameProcessor in an unclosed state so it + can be used with another AudioStream. Example: ```python @@ -113,11 +118,13 @@ def __init__( self._audio_filter_module: str | None = None self._audio_filter_options: dict[str, Any] | None = None self._processor: FrameProcessor[AudioFrame] | None = None + self._processor_leave_open = False if isinstance(noise_cancellation, NoiseCancellationOptions): self._audio_filter_module = noise_cancellation.module_id self._audio_filter_options = noise_cancellation.options elif isinstance(noise_cancellation, FrameProcessor): self._processor = noise_cancellation + self._processor_leave_open = noise_cancellation_leave_open self._task = self._loop.create_task(self._run()) self._task.add_done_callback(task_done_logger) @@ -253,6 +260,9 @@ def _on_processor_credentials_updated(self, *, token: str, url: str) -> None: self._processor._on_credentials_updated(token=token, url=url) def __del__(self) -> None: + if self._processor is not None and not self._processor_leave_open: + self._processor._close() + FfiClient.instance.queue.unsubscribe(self._ffi_queue) def _create_owned_stream(self) -> Any: From ce5e7937f878b8c577986b9f84e3c06b4efb4a9b Mon Sep 17 00:00:00 2001 From: Ryan Gaus Date: Tue, 26 May 2026 16:09:08 -0400 Subject: [PATCH 12/20] fix: disable no-op credentials push Need to think about this a bit more, this pattern as written won't work, since the FrameProcessor today can't have a set of no-op credentials pushed. --- livekit-rtc/livekit/rtc/track.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/livekit-rtc/livekit/rtc/track.py b/livekit-rtc/livekit/rtc/track.py index c4b96e47..f1ad4c68 100644 --- a/livekit-rtc/livekit/rtc/track.py +++ b/livekit-rtc/livekit/rtc/track.py @@ -61,10 +61,14 @@ def _on_room_token_refreshed(self) -> None: def _push_processor_metadata_to_stream(self, stream: AudioStream, room: Optional[Room]) -> None: if room is None: # track left a room — clear processor's room context + # FIXME: This isn't really good, and I can't figure out what should happen here + # Closing the processor doesn't work (the track could get added to another room later) + # Empty values like this don't work, because it causes a drm::Error in the plugin + # Talk to lukas about this in a 1:1 and see if he can think of anything better stream._on_processor_stream_info_updated( room_name="", participant_identity="", publication_sid="" ) - stream._on_processor_credentials_updated(token="", url="") + # stream._on_processor_credentials_updated(token="", url="") return identity = "" From b9f34d0de07c2d686cc7fbab2b33b4db2ec7341b Mon Sep 17 00:00:00 2001 From: Ryan Gaus Date: Tue, 26 May 2026 16:13:57 -0400 Subject: [PATCH 13/20] fix: move processor close from __del__ to aclose --- livekit-rtc/livekit/rtc/audio_stream.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/livekit-rtc/livekit/rtc/audio_stream.py b/livekit-rtc/livekit/rtc/audio_stream.py index 37071869..48f985ea 100644 --- a/livekit-rtc/livekit/rtc/audio_stream.py +++ b/livekit-rtc/livekit/rtc/audio_stream.py @@ -260,9 +260,6 @@ def _on_processor_credentials_updated(self, *, token: str, url: str) -> None: self._processor._on_credentials_updated(token=token, url=url) def __del__(self) -> None: - if self._processor is not None and not self._processor_leave_open: - self._processor._close() - FfiClient.instance.queue.unsubscribe(self._ffi_queue) def _create_owned_stream(self) -> Any: @@ -336,6 +333,8 @@ async def aclose(self) -> None: This method cleans up resources associated with the audio stream and waits for any pending operations to complete. """ + if self._processor is not None and not self._processor_leave_open: + self._processor._close() if self._track is not None: self._track._unregister_audio_stream(self) self._ffi_handle.dispose() From 4c73cc81b38c47a04e6f42b7e6c61d178270e9b5 Mon Sep 17 00:00:00 2001 From: Ryan Gaus Date: Tue, 26 May 2026 16:37:11 -0400 Subject: [PATCH 14/20] fix: proxy throgh noise_cancellation_leave_open into AudioStream.from_track --- livekit-rtc/livekit/rtc/audio_stream.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/livekit-rtc/livekit/rtc/audio_stream.py b/livekit-rtc/livekit/rtc/audio_stream.py index 48f985ea..4ebe3af6 100644 --- a/livekit-rtc/livekit/rtc/audio_stream.py +++ b/livekit-rtc/livekit/rtc/audio_stream.py @@ -154,6 +154,7 @@ def from_participant( num_channels: int = 1, frame_size_ms: int | None = None, noise_cancellation: Optional[NoiseCancellationOptions | FrameProcessor[AudioFrame]] = None, + noise_cancellation_leave_open: bool = False, ) -> AudioStream: """Create an `AudioStream` from a participant's audio track. @@ -189,8 +190,9 @@ def from_participant( track=None, # type: ignore sample_rate=sample_rate, num_channels=num_channels, - noise_cancellation=noise_cancellation, frame_size_ms=frame_size_ms, + noise_cancellation=noise_cancellation, + noise_cancellation_leave_open=noise_cancellation_leave_open, ) @classmethod From 22f48965cffe8952a27319bd40e824fead7db7a4 Mon Sep 17 00:00:00 2001 From: Ryan Gaus Date: Tue, 26 May 2026 16:54:35 -0400 Subject: [PATCH 15/20] fix: include missed noise_cancellation_leave_open in from_track --- livekit-rtc/livekit/rtc/audio_stream.py | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/livekit-rtc/livekit/rtc/audio_stream.py b/livekit-rtc/livekit/rtc/audio_stream.py index 4ebe3af6..d82c2792 100644 --- a/livekit-rtc/livekit/rtc/audio_stream.py +++ b/livekit-rtc/livekit/rtc/audio_stream.py @@ -65,7 +65,6 @@ def __init__( num_channels: int = 1, frame_size_ms: int | None = None, noise_cancellation: Optional[NoiseCancellationOptions | FrameProcessor[AudioFrame]] = None, - *, noise_cancellation_leave_open: bool = False, **kwargs: Any, ) -> None: @@ -206,6 +205,7 @@ def from_track( num_channels: int = 1, frame_size_ms: int | None = None, noise_cancellation: Optional[NoiseCancellationOptions | FrameProcessor[AudioFrame]] = None, + noise_cancellation_leave_open: bool = False, ) -> AudioStream: """Create an `AudioStream` from an existing audio track. @@ -215,9 +215,12 @@ def from_track( capacity (int, optional): The capacity of the internal frame queue. Defaults to 0 (unbounded). sample_rate (int, optional): The sample rate for the audio stream in Hz. Defaults to 48000. num_channels (int, optional): The number of audio channels. Defaults to 1. - noise_cancellation (Optional[NoiseCancellationOptions], optional): - If noise cancellation is used, pass a `NoiseCancellationOptions` instance + noise_cancellation (Optional[NoiseCancellationOptions | FrameProcessor[AudioFrame]], optional): + If noise cancellation is used, pass a `NoiseCancellationOptions` or `FrameProcessor[AudioFrame]` instance created by the noise cancellation module. + noise_cancellation_leave_open (bool): + When the audio stream closes, leaves the FrameProcessor in an unclosed state so it + can be used with another AudioStream. Returns: AudioStream: An instance of `AudioStream` that can be used to receive audio frames. @@ -237,8 +240,9 @@ def from_track( capacity=capacity, sample_rate=sample_rate, num_channels=num_channels, - noise_cancellation=noise_cancellation, frame_size_ms=frame_size_ms, + noise_cancellation=noise_cancellation, + noise_cancellation_leave_open=noise_cancellation_leave_open, ) def _on_processor_stream_info_updated( From 2dbe3504591653e22c0f53bc2a4b4c4816ca56e7 Mon Sep 17 00:00:00 2001 From: Ryan Gaus Date: Tue, 26 May 2026 17:05:19 -0400 Subject: [PATCH 16/20] fix: address type checker warning --- livekit-rtc/livekit/rtc/track.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/livekit-rtc/livekit/rtc/track.py b/livekit-rtc/livekit/rtc/track.py index f1ad4c68..3cffb702 100644 --- a/livekit-rtc/livekit/rtc/track.py +++ b/livekit-rtc/livekit/rtc/track.py @@ -83,9 +83,9 @@ def _push_processor_metadata_to_stream(self, stream: AudioStream, room: Optional else: local = room._local_participant if local is not None: - for publication in local.track_publications.values(): - if publication.sid == track_sid: - identity, pub_sid = local.identity, publication.sid + for local_publication in local.track_publications.values(): + if local_publication.sid == track_sid: + identity, pub_sid = local.identity, local_publication.sid break stream._on_processor_stream_info_updated( From 07fec790c8dddc6ede2747fb6122c702511e481c Mon Sep 17 00:00:00 2001 From: Ryan Gaus Date: Wed, 27 May 2026 11:28:21 -0400 Subject: [PATCH 17/20] feat: add new _on_stream_info_cleared / _on_credentials_cleared FrameProcessor methods, and use them when moving a track out of a room --- livekit-rtc/livekit/rtc/audio_stream.py | 20 -------------------- livekit-rtc/livekit/rtc/frame_processor.py | 4 ++++ livekit-rtc/livekit/rtc/track.py | 21 ++++++++++----------- 3 files changed, 14 insertions(+), 31 deletions(-) diff --git a/livekit-rtc/livekit/rtc/audio_stream.py b/livekit-rtc/livekit/rtc/audio_stream.py index d82c2792..af35336f 100644 --- a/livekit-rtc/livekit/rtc/audio_stream.py +++ b/livekit-rtc/livekit/rtc/audio_stream.py @@ -245,26 +245,6 @@ def from_track( noise_cancellation_leave_open=noise_cancellation_leave_open, ) - def _on_processor_stream_info_updated( - self, - *, - room_name: str, - participant_identity: str, - publication_sid: str, - ) -> None: - if self._processor is None: - return - self._processor._on_stream_info_updated( - room_name=room_name, - participant_identity=participant_identity, - publication_sid=publication_sid, - ) - - def _on_processor_credentials_updated(self, *, token: str, url: str) -> None: - if self._processor is None: - return - self._processor._on_credentials_updated(token=token, url=url) - def __del__(self) -> None: FfiClient.instance.queue.unsubscribe(self._ffi_queue) diff --git a/livekit-rtc/livekit/rtc/frame_processor.py b/livekit-rtc/livekit/rtc/frame_processor.py index b8cc542e..ae03d537 100644 --- a/livekit-rtc/livekit/rtc/frame_processor.py +++ b/livekit-rtc/livekit/rtc/frame_processor.py @@ -24,8 +24,12 @@ def _on_stream_info_updated( publication_sid: str, ) -> None: ... + def _on_stream_info_cleared(self) -> None: ... + def _on_credentials_updated(self, *, token: str, url: str) -> None: ... + def _on_credentials_cleared(self) -> None: ... + @abstractmethod def _process(self, frame: T) -> T: ... diff --git a/livekit-rtc/livekit/rtc/track.py b/livekit-rtc/livekit/rtc/track.py index 3cffb702..896fadfe 100644 --- a/livekit-rtc/livekit/rtc/track.py +++ b/livekit-rtc/livekit/rtc/track.py @@ -56,19 +56,18 @@ def _on_room_token_refreshed(self) -> None: if room is None or room._token is None or room._server_url is None: return for stream in self._audio_streams: - stream._on_processor_credentials_updated(token=room._token, url=room._server_url) + if not stream._processor: + continue + stream._processor._on_credentials_updated(token=room._token, url=room._server_url) def _push_processor_metadata_to_stream(self, stream: AudioStream, room: Optional[Room]) -> None: + if not stream._processor: + return + if room is None: # track left a room — clear processor's room context - # FIXME: This isn't really good, and I can't figure out what should happen here - # Closing the processor doesn't work (the track could get added to another room later) - # Empty values like this don't work, because it causes a drm::Error in the plugin - # Talk to lukas about this in a 1:1 and see if he can think of anything better - stream._on_processor_stream_info_updated( - room_name="", participant_identity="", publication_sid="" - ) - # stream._on_processor_credentials_updated(token="", url="") + stream._processor._on_stream_info_cleared() + stream._processor._on_credentials_cleared() return identity = "" @@ -88,13 +87,13 @@ def _push_processor_metadata_to_stream(self, stream: AudioStream, room: Optional identity, pub_sid = local.identity, local_publication.sid break - stream._on_processor_stream_info_updated( + stream._processor._on_stream_info_updated( room_name=room.name, participant_identity=identity, publication_sid=pub_sid, ) if room._token is not None and room._server_url is not None: - stream._on_processor_credentials_updated(token=room._token, url=room._server_url) + stream._processor._on_credentials_updated(token=room._token, url=room._server_url) def _register_audio_stream(self, stream: AudioStream) -> None: self._audio_streams.add(stream) From 8d3f4fe313ac9130393df713f89e63a7a2b61c03 Mon Sep 17 00:00:00 2001 From: Ryan Gaus Date: Wed, 27 May 2026 11:57:08 -0400 Subject: [PATCH 18/20] fix: apply devin suggestion --- livekit-rtc/livekit/rtc/audio_stream.py | 4 ++-- livekit-rtc/livekit/rtc/track.py | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/livekit-rtc/livekit/rtc/audio_stream.py b/livekit-rtc/livekit/rtc/audio_stream.py index af35336f..70b99c39 100644 --- a/livekit-rtc/livekit/rtc/audio_stream.py +++ b/livekit-rtc/livekit/rtc/audio_stream.py @@ -319,12 +319,12 @@ async def aclose(self) -> None: This method cleans up resources associated with the audio stream and waits for any pending operations to complete. """ - if self._processor is not None and not self._processor_leave_open: - self._processor._close() if self._track is not None: self._track._unregister_audio_stream(self) self._ffi_handle.dispose() await self._task + if self._processor is not None and not self._processor_leave_open: + self._processor._close() def _is_event(self, e: proto_ffi.FfiEvent) -> bool: return e.audio_stream_event.stream_handle == self._ffi_handle.handle diff --git a/livekit-rtc/livekit/rtc/track.py b/livekit-rtc/livekit/rtc/track.py index 896fadfe..684a6d4d 100644 --- a/livekit-rtc/livekit/rtc/track.py +++ b/livekit-rtc/livekit/rtc/track.py @@ -65,7 +65,7 @@ def _push_processor_metadata_to_stream(self, stream: AudioStream, room: Optional return if room is None: - # track left a room — clear processor's room context + # track left a room - clear processor's room context stream._processor._on_stream_info_cleared() stream._processor._on_credentials_cleared() return From 7743e6a52abb11c37b5c5d22a37bd4dd873488cf Mon Sep 17 00:00:00 2001 From: Ryan Gaus Date: Wed, 27 May 2026 13:16:41 -0400 Subject: [PATCH 19/20] feat: add new frame processor tests These tests exercise all the frame processor track reparenting under room / etc paths. --- .../tests/test_audio_stream_room_lifecycle.py | 469 ++++++++++++++++++ 1 file changed, 469 insertions(+) create mode 100644 livekit-rtc/tests/test_audio_stream_room_lifecycle.py diff --git a/livekit-rtc/tests/test_audio_stream_room_lifecycle.py b/livekit-rtc/tests/test_audio_stream_room_lifecycle.py new file mode 100644 index 00000000..cf851f90 --- /dev/null +++ b/livekit-rtc/tests/test_audio_stream_room_lifecycle.py @@ -0,0 +1,469 @@ +"""Unit tests for Track-driven FrameProcessor metadata propagation. + +These tests drive the **real** `rtc.Track`, `rtc.Room`, `rtc.RemoteParticipant`, +and `rtc.RemoteTrackPublication` objects — constructed via `__new__` injection +to bypass FFI — against a `_RecordingProcessor` test double. They cover the +contract that an attached FrameProcessor receives: + - `_on_stream_info_updated` / `_on_credentials_updated` on every room transition + or token refresh, + - `_on_stream_info_cleared` / `_on_credentials_cleared` when the track leaves a room, +and that `AudioStream.aclose()` honors `noise_cancellation_leave_open`. +""" + +from __future__ import annotations + +import asyncio +import weakref +from types import SimpleNamespace +from typing import Any, Optional + +import pytest + +from livekit import rtc +from livekit.rtc.event_emitter import EventEmitter + + +# -- real-object helpers ------------------------------------------------------ + + +def _make_room(name: str = "room-x", token: str = "tok-x", url: str = "wss://x") -> rtc.Room: + """Build a real `rtc.Room` via __new__, injecting just the fields Track reads.""" + room = rtc.Room.__new__(rtc.Room) + EventEmitter.__init__(room) + room._info = SimpleNamespace(name=name) + room._token = token + room._server_url = url + room._remote_participants = {} + room._local_participant = None + room._ffi_handle = None # `Room.__del__` reads this on GC + return room + + +def _make_remote_participant(identity: str) -> rtc.RemoteParticipant: + p = rtc.RemoteParticipant.__new__(rtc.RemoteParticipant) + p._info = SimpleNamespace(identity=identity) + p._track_publications = {} + return p + + +def _make_remote_publication(sid: str) -> rtc.RemoteTrackPublication: + pub = rtc.RemoteTrackPublication.__new__(rtc.RemoteTrackPublication) + pub._info = SimpleNamespace(sid=sid) + return pub + + +def _make_track(sid: str = "TR_x") -> rtc.Track: + track = rtc.Track.__new__(rtc.Track) + track._info = SimpleNamespace(sid=sid) + track._ffi_handle = None + track._room_ref = None + track._audio_streams = weakref.WeakSet() + return track + + +def _attach_publication(room: rtc.Room, *, identity: str, track_sid: str, pub_sid: str) -> None: + participant = _make_remote_participant(identity=identity) + participant._track_publications[track_sid] = _make_remote_publication(sid=pub_sid) + room._remote_participants[identity] = participant + + +def _make_stream( + *, + track: Optional[rtc.Track] = None, + processor: Optional[rtc.FrameProcessor[rtc.AudioFrame]] = None, +) -> rtc.AudioStream: + """Build an AudioStream without going through the FFI-touching __init__.""" + stream: Any = rtc.AudioStream.__new__(rtc.AudioStream) + stream._track = track + stream._processor = processor + stream._audio_filter_module = None + stream._audio_filter_options = None + + if track is not None: + track._register_audio_stream(stream) + return stream + + +def _make_closeable_stream( + *, + track: Optional[rtc.Track] = None, + processor: Optional[rtc.FrameProcessor[rtc.AudioFrame]] = None, + leave_open: bool = False, +) -> rtc.AudioStream: + """Extends _make_stream with the minimal state `aclose()` touches.""" + stream = _make_stream(track=track, processor=processor) + stream._processor_leave_open = leave_open # type: ignore[attr-defined] + fut = asyncio.get_event_loop().create_future() + fut.set_result(None) + stream._task = fut # type: ignore[attr-defined] + stream._ffi_handle = SimpleNamespace(dispose=lambda: None) # type: ignore[attr-defined] + return stream + + +class _RecordingProcessor(rtc.FrameProcessor[rtc.AudioFrame]): + def __init__(self) -> None: + self._enabled = True + self.stream_info_calls: list[dict[str, str]] = [] + self.credentials_calls: list[dict[str, str]] = [] + self.stream_info_cleared_calls: int = 0 + self.credentials_cleared_calls: int = 0 + self.close_calls = 0 + + @property + def enabled(self) -> bool: + return self._enabled + + @enabled.setter + def enabled(self, value: bool) -> None: + self._enabled = value + + def _on_stream_info_updated( + self, *, room_name: str, participant_identity: str, publication_sid: str + ) -> None: + self.stream_info_calls.append( + { + "room_name": room_name, + "participant_identity": participant_identity, + "publication_sid": publication_sid, + } + ) + + def _on_stream_info_cleared(self) -> None: + self.stream_info_cleared_calls += 1 + + def _on_credentials_updated(self, *, token: str, url: str) -> None: + self.credentials_calls.append({"token": token, "url": url}) + + def _on_credentials_cleared(self) -> None: + self.credentials_cleared_calls += 1 + + def _process(self, frame: rtc.AudioFrame) -> rtc.AudioFrame: + return frame + + def _close(self) -> None: + self.close_calls += 1 + + +def test_processor_receives_lifecycle_callbacks_on_room_attach() -> None: + room = _make_room(name="room-1", token="tok-1", url="wss://room-1") + _attach_publication(room, identity="alice", track_sid="TR_1", pub_sid="PUB_1") + track = _make_track(sid="TR_1") + track._set_room(room) + processor = _RecordingProcessor() + + _stream = _make_stream(track=track, processor=processor) # noqa: F841 + + assert len(processor.stream_info_calls) == 1 + assert processor.stream_info_calls[0] == { + "room_name": "room-1", + "participant_identity": "alice", + "publication_sid": "PUB_1", + } + assert len(processor.credentials_calls) == 1 + assert processor.credentials_calls[0] == {"token": "tok-1", "url": "wss://room-1"} + + +def test_processor_callbacks_refire_on_track_room_change() -> None: + room_a = _make_room(name="room-a", token="tok-a", url="wss://a") + _attach_publication(room_a, identity="alice", track_sid="TR_1", pub_sid="PUB_1") + track = _make_track(sid="TR_1") + track._set_room(room_a) + processor = _RecordingProcessor() + _stream = _make_stream(track=track, processor=processor) # noqa: F841 + + assert len(processor.stream_info_calls) == 1 + assert len(processor.credentials_calls) == 1 + + room_b = _make_room(name="room-b", token="tok-b", url="wss://b") + _attach_publication(room_b, identity="bob", track_sid="TR_1", pub_sid="PUB_2") + track._set_room(room_b) + + assert len(processor.stream_info_calls) == 2 + assert processor.stream_info_calls[-1] == { + "room_name": "room-b", + "participant_identity": "bob", + "publication_sid": "PUB_2", + } + assert len(processor.credentials_calls) == 2 + assert processor.credentials_calls[-1] == {"token": "tok-b", "url": "wss://b"} + + +def test_token_refresh_propagates_to_processor() -> None: + room = _make_room(name="room-1", token="tok-initial", url="wss://r") + _attach_publication(room, identity="alice", track_sid="TR_1", pub_sid="PUB_1") + track = _make_track(sid="TR_1") + track._set_room(room) + processor = _RecordingProcessor() + _stream = _make_stream(track=track, processor=processor) # noqa: F841 + + baseline_creds = len(processor.credentials_calls) + baseline_info = len(processor.stream_info_calls) + + room._token = "tok-rotated" + room.emit("token_refreshed") + + assert len(processor.credentials_calls) == baseline_creds + 1 + assert processor.credentials_calls[-1] == {"token": "tok-rotated", "url": "wss://r"} + assert len(processor.stream_info_calls) == baseline_info + + +def test_repeated_set_room_with_same_room_does_not_double_register_listener() -> None: + room = _make_room(name="room-1", token="tok-1", url="wss://r") + _attach_publication(room, identity="alice", track_sid="TR_1", pub_sid="PUB_1") + track = _make_track(sid="TR_1") + track._set_room(room) + processor = _RecordingProcessor() + _stream = _make_stream(track=track, processor=processor) # noqa: F841 + + track._set_room(room) # idempotent set with the same room + + baseline = len(processor.credentials_calls) + room._token = "tok-rotated" + room.emit("token_refreshed") + + assert len(processor.credentials_calls) == baseline + 1 + + +def test_set_room_swaps_listener_to_new_room() -> None: + room_a = _make_room(name="a", token="ta", url="wss://a") + _attach_publication(room_a, identity="alice", track_sid="TR_1", pub_sid="PUB_1") + room_b = _make_room(name="b", token="tb", url="wss://b") + _attach_publication(room_b, identity="bob", track_sid="TR_1", pub_sid="PUB_2") + + track = _make_track(sid="TR_1") + track._set_room(room_a) + processor = _RecordingProcessor() + _stream = _make_stream(track=track, processor=processor) # noqa: F841 + + track._set_room(room_b) + + creds_before_a_emit = len(processor.credentials_calls) + room_a._token = "ta-rotated" + room_a.emit("token_refreshed") + assert len(processor.credentials_calls) == creds_before_a_emit, ( + "token_refreshed on the old room must not reach the processor" + ) + + room_b._token = "tb-rotated" + room_b.emit("token_refreshed") + assert len(processor.credentials_calls) == creds_before_a_emit + 1 + assert processor.credentials_calls[-1] == {"token": "tb-rotated", "url": "wss://b"} + + +def test_unregister_audio_stream_stops_metadata_pushes() -> None: + """After a stream is unregistered from the track, subsequent room events + (e.g. token_refreshed) must not reach the stream's processor.""" + room = _make_room(name="room-1", token="tok-1", url="wss://r") + _attach_publication(room, identity="alice", track_sid="TR_1", pub_sid="PUB_1") + track = _make_track(sid="TR_1") + track._set_room(room) + processor = _RecordingProcessor() + stream = _make_stream(track=track, processor=processor) + + track._unregister_audio_stream(stream) + + baseline = len(processor.credentials_calls) + room._token = "tok-rotated" + room.emit("token_refreshed") + + assert len(processor.credentials_calls) == baseline, ( + "unregistered stream's processor must not receive credentials on token_refreshed" + ) + + +def test_track_leaving_room_clears_processor_metadata() -> None: + """When a track's room transitions to None (e.g. on unsubscribe/unpublish), + the processor receives `_on_*_cleared` calls.""" + room = _make_room(name="room-1", token="tok-1", url="wss://r") + _attach_publication(room, identity="alice", track_sid="TR_1", pub_sid="PUB_1") + track = _make_track(sid="TR_1") + track._set_room(room) + processor = _RecordingProcessor() + _stream = _make_stream(track=track, processor=processor) # noqa: F841 + + assert len(processor.stream_info_calls) == 1 + assert len(processor.credentials_calls) == 1 + assert processor.stream_info_cleared_calls == 0 + assert processor.credentials_cleared_calls == 0 + + track._set_room(None) + + # _updated lists unchanged; _cleared counters bumped exactly once each + assert len(processor.stream_info_calls) == 1 + assert len(processor.credentials_calls) == 1 + assert processor.stream_info_cleared_calls == 1 + assert processor.credentials_cleared_calls == 1 + + +def test_fanout_to_multiple_registered_streams() -> None: + """A Track with multiple registered AudioStreams fans metadata out to all + of them on room attach AND on token refresh.""" + room = _make_room(name="room-1", token="tok-1", url="wss://r") + _attach_publication(room, identity="alice", track_sid="TR_1", pub_sid="PUB_1") + track = _make_track(sid="TR_1") + track._set_room(room) + + processor1 = _RecordingProcessor() + processor2 = _RecordingProcessor() + _s1 = _make_stream(track=track, processor=processor1) # noqa: F841 + _s2 = _make_stream(track=track, processor=processor2) # noqa: F841 + + # both processors got the initial push on registration + assert len(processor1.stream_info_calls) == 1 + assert len(processor1.credentials_calls) == 1 + assert len(processor2.stream_info_calls) == 1 + assert len(processor2.credentials_calls) == 1 + + room._token = "tok-rotated" + room.emit("token_refreshed") + + # token refresh fans credentials to BOTH processors; no new stream_info + assert len(processor1.credentials_calls) == 2 + assert processor1.credentials_calls[-1] == {"token": "tok-rotated", "url": "wss://r"} + assert len(processor2.credentials_calls) == 2 + assert processor2.credentials_calls[-1] == {"token": "tok-rotated", "url": "wss://r"} + assert len(processor1.stream_info_calls) == 1 + assert len(processor2.stream_info_calls) == 1 + + +def test_register_audio_stream_before_track_enters_room() -> None: + """A stream registered against a Track that has no room yet receives no + metadata until the Track is moved into a room.""" + track = _make_track(sid="TR_1") # track has no room + processor = _RecordingProcessor() + _stream = _make_stream(track=track, processor=processor) # noqa: F841 + + # nothing pushed yet — Track is roomless + assert len(processor.stream_info_calls) == 0 + assert len(processor.credentials_calls) == 0 + + room = _make_room(name="room-1", token="tok-1", url="wss://r") + _attach_publication(room, identity="alice", track_sid="TR_1", pub_sid="PUB_1") + track._set_room(room) + + # now the stream's processor sees the metadata + assert len(processor.stream_info_calls) == 1 + assert processor.stream_info_calls[0] == { + "room_name": "room-1", + "participant_identity": "alice", + "publication_sid": "PUB_1", + } + assert len(processor.credentials_calls) == 1 + assert processor.credentials_calls[0] == {"token": "tok-1", "url": "wss://r"} + + +def test_track_room_cycle_attach_detach_reattach() -> None: + """Track enters room A → leaves to None → enters room B. Processor sees + real(A), cleared, real(B); listener fully migrates each time.""" + room_a = _make_room(name="a", token="ta", url="wss://a") + _attach_publication(room_a, identity="alice", track_sid="TR_1", pub_sid="PUB_A") + room_b = _make_room(name="b", token="tb", url="wss://b") + _attach_publication(room_b, identity="bob", track_sid="TR_1", pub_sid="PUB_B") + + track = _make_track(sid="TR_1") + processor = _RecordingProcessor() + _stream = _make_stream(track=track, processor=processor) # noqa: F841 + + track._set_room(room_a) + track._set_room(None) + track._set_room(room_b) + + # two real updates (A then B) interleaved with one clear pass + assert len(processor.stream_info_calls) == 2 + assert processor.stream_info_calls[0]["room_name"] == "a" + assert processor.stream_info_calls[0]["publication_sid"] == "PUB_A" + assert processor.stream_info_calls[1]["room_name"] == "b" + assert processor.stream_info_calls[1]["publication_sid"] == "PUB_B" + + assert len(processor.credentials_calls) == 2 + assert processor.credentials_calls[0] == {"token": "ta", "url": "wss://a"} + assert processor.credentials_calls[1] == {"token": "tb", "url": "wss://b"} + + assert processor.stream_info_cleared_calls == 1 + assert processor.credentials_cleared_calls == 1 + + # listener fully detached from room_a; only room_b reaches the processor now + creds_after_cycle = len(processor.credentials_calls) + room_a._token = "ta-rotated" + room_a.emit("token_refreshed") + assert len(processor.credentials_calls) == creds_after_cycle, ( + "token_refreshed on the abandoned room must not reach the processor" + ) + + room_b._token = "tb-rotated" + room_b.emit("token_refreshed") + assert len(processor.credentials_calls) == creds_after_cycle + 1 + assert processor.credentials_calls[-1] == {"token": "tb-rotated", "url": "wss://b"} + + +def test_set_room_with_no_registered_streams_is_safe() -> None: + """Setting a room (or clearing it) on a Track with no registered streams + must be a safe no-op. Subsequent stream registration still picks up the + current room's metadata.""" + room = _make_room(name="room-1", token="tok-1", url="wss://r") + _attach_publication(room, identity="alice", track_sid="TR_1", pub_sid="PUB_1") + + track = _make_track(sid="TR_1") + track._set_room(room) # no streams yet — must not raise + track._set_room(None) # detach — must not raise + track._set_room(room) # re-attach — still safe + + processor = _RecordingProcessor() + _stream = _make_stream(track=track, processor=processor) # noqa: F841 + + assert len(processor.stream_info_calls) == 1 + assert processor.stream_info_calls[0]["room_name"] == "room-1" + assert len(processor.credentials_calls) == 1 + + +def test_unregister_one_of_many_streams_only_fans_out_to_remaining() -> None: + """When one of many streams is unregistered, subsequent token_refresh + reaches only the remaining streams' processors.""" + room = _make_room(name="room-1", token="tok-1", url="wss://r") + _attach_publication(room, identity="alice", track_sid="TR_1", pub_sid="PUB_1") + track = _make_track(sid="TR_1") + track._set_room(room) + + processor1 = _RecordingProcessor() + processor2 = _RecordingProcessor() + stream1 = _make_stream(track=track, processor=processor1) + _stream2 = _make_stream(track=track, processor=processor2) # noqa: F841 + + track._unregister_audio_stream(stream1) + + p1_creds_before = len(processor1.credentials_calls) + p2_creds_before = len(processor2.credentials_calls) + + room._token = "tok-rotated" + room.emit("token_refreshed") + + assert len(processor1.credentials_calls) == p1_creds_before, ( + "unregistered stream's processor must not receive the refreshed credentials" + ) + assert len(processor2.credentials_calls) == p2_creds_before + 1 + assert processor2.credentials_calls[-1] == {"token": "tok-rotated", "url": "wss://r"} + + +@pytest.mark.asyncio +async def test_aclose_closes_processor_when_leave_open_false() -> None: + """`aclose()` calls `_close()` on the attached FrameProcessor when + `noise_cancellation_leave_open` is False (the default).""" + processor = _RecordingProcessor() + stream = _make_closeable_stream(processor=processor, leave_open=False) + + await stream.aclose() + + assert processor.close_calls == 1 + + +@pytest.mark.asyncio +async def test_aclose_leaves_processor_open_when_leave_open_true() -> None: + """`aclose()` does NOT call `_close()` when `noise_cancellation_leave_open` + is True — the agents SDK path for sharing one processor across many track + attach/detach cycles.""" + processor = _RecordingProcessor() + stream = _make_closeable_stream(processor=processor, leave_open=True) + + await stream.aclose() + + assert processor.close_calls == 0 From 75d88742583972c2bf9e19ea9f141661fafde294 Mon Sep 17 00:00:00 2001 From: Ryan Gaus Date: Wed, 27 May 2026 15:03:49 -0400 Subject: [PATCH 20/20] fix: address type errors in tests --- .../tests/test_audio_stream_room_lifecycle.py | 25 ++++++++++--------- 1 file changed, 13 insertions(+), 12 deletions(-) diff --git a/livekit-rtc/tests/test_audio_stream_room_lifecycle.py b/livekit-rtc/tests/test_audio_stream_room_lifecycle.py index cf851f90..25871329 100644 --- a/livekit-rtc/tests/test_audio_stream_room_lifecycle.py +++ b/livekit-rtc/tests/test_audio_stream_room_lifecycle.py @@ -15,11 +15,14 @@ import asyncio import weakref from types import SimpleNamespace -from typing import Any, Optional +from typing import Any, Optional, cast import pytest from livekit import rtc +from livekit.rtc._proto import participant_pb2 as proto_participant +from livekit.rtc._proto import room_pb2 as proto_room +from livekit.rtc._proto import track_pb2 as proto_track from livekit.rtc.event_emitter import EventEmitter @@ -30,7 +33,7 @@ def _make_room(name: str = "room-x", token: str = "tok-x", url: str = "wss://x") """Build a real `rtc.Room` via __new__, injecting just the fields Track reads.""" room = rtc.Room.__new__(rtc.Room) EventEmitter.__init__(room) - room._info = SimpleNamespace(name=name) + room._info = proto_room.RoomInfo(name=name) room._token = token room._server_url = url room._remote_participants = {} @@ -41,21 +44,21 @@ def _make_room(name: str = "room-x", token: str = "tok-x", url: str = "wss://x") def _make_remote_participant(identity: str) -> rtc.RemoteParticipant: p = rtc.RemoteParticipant.__new__(rtc.RemoteParticipant) - p._info = SimpleNamespace(identity=identity) + p._info = proto_participant.ParticipantInfo(identity=identity) p._track_publications = {} return p def _make_remote_publication(sid: str) -> rtc.RemoteTrackPublication: pub = rtc.RemoteTrackPublication.__new__(rtc.RemoteTrackPublication) - pub._info = SimpleNamespace(sid=sid) + pub._info = proto_track.TrackPublicationInfo(sid=sid) return pub def _make_track(sid: str = "TR_x") -> rtc.Track: track = rtc.Track.__new__(rtc.Track) - track._info = SimpleNamespace(sid=sid) - track._ffi_handle = None + track._info = proto_track.TrackInfo(sid=sid) + track._ffi_handle = cast(Any, None) track._room_ref = None track._audio_streams = weakref.WeakSet() return track @@ -73,7 +76,7 @@ def _make_stream( processor: Optional[rtc.FrameProcessor[rtc.AudioFrame]] = None, ) -> rtc.AudioStream: """Build an AudioStream without going through the FFI-touching __init__.""" - stream: Any = rtc.AudioStream.__new__(rtc.AudioStream) + stream = rtc.AudioStream.__new__(rtc.AudioStream) stream._track = track stream._processor = processor stream._audio_filter_module = None @@ -92,11 +95,9 @@ def _make_closeable_stream( ) -> rtc.AudioStream: """Extends _make_stream with the minimal state `aclose()` touches.""" stream = _make_stream(track=track, processor=processor) - stream._processor_leave_open = leave_open # type: ignore[attr-defined] - fut = asyncio.get_event_loop().create_future() - fut.set_result(None) - stream._task = fut # type: ignore[attr-defined] - stream._ffi_handle = SimpleNamespace(dispose=lambda: None) # type: ignore[attr-defined] + stream._processor_leave_open = leave_open + stream._task = asyncio.ensure_future(asyncio.sleep(0)) + stream._ffi_handle = cast(Any, SimpleNamespace(dispose=lambda: None)) return stream