From 5428532aaaa87b702cbaf9b0f12694b1a5fe329b Mon Sep 17 00:00:00 2001 From: Ryan Gaus Date: Wed, 27 May 2026 16:49:29 -0400 Subject: [PATCH 1/8] feat: add initial support for frame processor usage directly on tracks --- packages/livekit-rtc/src/audio_stream.ts | 28 ++++- packages/livekit-rtc/src/frame_processor.ts | 4 + packages/livekit-rtc/src/room.ts | 8 ++ packages/livekit-rtc/src/track.ts | 130 ++++++++++++++++++++ packages/livekit-rtc/tsconfig.json | 3 +- 5 files changed, 169 insertions(+), 4 deletions(-) diff --git a/packages/livekit-rtc/src/audio_stream.ts b/packages/livekit-rtc/src/audio_stream.ts index 97cea8b6..d3251251 100644 --- a/packages/livekit-rtc/src/audio_stream.ts +++ b/packages/livekit-rtc/src/audio_stream.ts @@ -13,6 +13,12 @@ import type { Track } from './track.js'; export interface AudioStreamOptions { noiseCancellation?: NoiseCancellationOptions | FrameProcessor; + /** + * If true and `noiseCancellation` is a {@link FrameProcessor}, leaves the + * processor open when the stream closes so the same processor can be reused + * with another {@link AudioStream}. Defaults to `false`. + */ + noiseCancellationLeaveOpen?: boolean; sampleRate?: number; numChannels?: number; frameSizeMs?: number; @@ -31,19 +37,23 @@ class AudioStreamSource implements UnderlyingSource { private sampleRate: number; private numChannels: number; private legacyNcOptions?: NoiseCancellationOptions; - private frameProcessor?: FrameProcessor; + private frameProcessor: FrameProcessor | null = null; + private leaveProcessorOpen = false; private frameSizeMs?: number; + private track: Track; constructor( track: Track, sampleRateOrOptions?: number | AudioStreamOptions, numChannels?: number, ) { + this.track = track; if (sampleRateOrOptions !== undefined && typeof sampleRateOrOptions !== 'number') { this.sampleRate = sampleRateOrOptions.sampleRate ?? 48000; this.numChannels = sampleRateOrOptions.numChannels ?? 1; if (isFrameProcessor(sampleRateOrOptions.noiseCancellation)) { this.frameProcessor = sampleRateOrOptions.noiseCancellation; + this.leaveProcessorOpen = sampleRateOrOptions.noiseCancellationLeaveOpen ?? false; } else { this.legacyNcOptions = sampleRateOrOptions.noiseCancellation; } @@ -77,6 +87,12 @@ class AudioStreamSource implements UnderlyingSource { this.ffiHandle = new FfiHandle(res.stream!.handle!.id!); FfiClient.instance.on(FfiClientEvent.FfiEvent, this.onEvent); + track.registerAudioStream(this); + } + + /** @internal */ + get processor(): FrameProcessor | null { + return this.frameProcessor; } private onEvent = (ev: FfiEvent) => { @@ -113,8 +129,11 @@ class AudioStreamSource implements UnderlyingSource { // while buffered frames are still in the ReadableStream queue. if (!this.disposed) { this.disposed = true; + this.track.unregisterAudioStream(this); this.ffiHandle.dispose(); - this.frameProcessor?.close(); + if (this.frameProcessor && !this.leaveProcessorOpen) { + this.frameProcessor.close(); + } } break; } @@ -128,10 +147,13 @@ class AudioStreamSource implements UnderlyingSource { FfiClient.instance.off(FfiClientEvent.FfiEvent, this.onEvent); if (!this.disposed) { this.disposed = true; + this.track.unregisterAudioStream(this); this.ffiHandle.dispose(); // Also close the frame processor on cancel for symmetry with the EOS path, // so resources are released regardless of how the stream ends. - this.frameProcessor?.close(); + if (this.frameProcessor && !this.leaveProcessorOpen) { + this.frameProcessor.close(); + } } } } diff --git a/packages/livekit-rtc/src/frame_processor.ts b/packages/livekit-rtc/src/frame_processor.ts index 629d883a..8feeaa2b 100644 --- a/packages/livekit-rtc/src/frame_processor.ts +++ b/packages/livekit-rtc/src/frame_processor.ts @@ -43,7 +43,11 @@ export abstract class FrameProcessor { // eslint-disable-next-line @typescript-eslint/no-unused-vars onStreamInfoUpdated(_info: FrameProcessorStreamInfo): void {} // eslint-disable-next-line @typescript-eslint/no-unused-vars + onStreamInfoCleared(): void {} + // eslint-disable-next-line @typescript-eslint/no-unused-vars onCredentialsUpdated(_credentials: FrameProcessorCredentials): void {} + // eslint-disable-next-line @typescript-eslint/no-unused-vars + onCredentialsCleared(): void {} abstract process(frame: Frame): Frame; abstract close(): void; diff --git a/packages/livekit-rtc/src/room.ts b/packages/livekit-rtc/src/room.ts index 3958e72c..97886783 100644 --- a/packages/livekit-rtc/src/room.ts +++ b/packages/livekit-rtc/src/room.ts @@ -547,9 +547,15 @@ export class Room extends (EventEmitter as new () => TypedEmitter } } else if (ev.case == 'localTrackPublished') { const publication = this.localParticipant.trackPublications.get(ev.value.trackSid!); + if (publication?.track) { + publication.track.setRoom(this); + } this.emit(RoomEvent.LocalTrackPublished, publication!, this.localParticipant); } else if (ev.case == 'localTrackUnpublished') { const publication = this.localParticipant.trackPublications.get(ev.value.publicationSid!); + if (publication?.track) { + publication.track.setRoom(null); + } this.localParticipant.trackPublications.delete(ev.value.publicationSid!); this.emit(RoomEvent.LocalTrackUnpublished, publication!, this.localParticipant!); } else if ((ev.case as string) == 'localTrackRepublished') { @@ -610,6 +616,7 @@ export class Room extends (EventEmitter as new () => TypedEmitter } else if (trackInfo.kind == TrackKind.KIND_AUDIO) { publication.track = new RemoteAudioTrack(ownedTrack); } + publication.track?.setRoom(this); this.emit(RoomEvent.TrackSubscribed, publication.track!, publication, participant); } catch (e: unknown) { @@ -622,6 +629,7 @@ export class Room extends (EventEmitter as new () => TypedEmitter ev.value.trackSid!, ); const track = publication.track!; + track.setRoom(null); publication.track = undefined; publication.subscribed = false; this.emit(RoomEvent.TrackUnsubscribed, track, publication, participant); diff --git a/packages/livekit-rtc/src/track.ts b/packages/livekit-rtc/src/track.ts index 1a25dce0..a51587ca 100644 --- a/packages/livekit-rtc/src/track.ts +++ b/packages/livekit-rtc/src/track.ts @@ -10,10 +10,18 @@ import type { TrackKind, } from '@livekit/rtc-ffi-bindings'; import { CreateAudioTrackRequest, CreateVideoTrackRequest } from '@livekit/rtc-ffi-bindings'; +import type { AudioFrame } from './audio_frame.js'; import type { AudioSource } from './audio_source.js'; import { FfiClient, FfiHandle } from './ffi_client.js'; +import type { FrameProcessor } from './frame_processor.js'; +import type { Room } from './room.js'; import type { VideoSource } from './video_source.js'; +/** @internal */ +export interface AudioStreamLike { + readonly processor: FrameProcessor | null; +} + export abstract class Track { /** @internal */ info?: TrackInfo; @@ -21,9 +29,131 @@ export abstract class Track { /** @internal */ ffi_handle: FfiHandle; + private roomRef: WeakRef | null = null; + private audioStreams: Set> = new Set(); + private streamFinalizationRegistry: FinalizationRegistry>; + private onRoomTokenRefreshed = () => { + const room = this.resolveRoom(); + if (!room || !room.token || !room.serverUrl) return; + for (const stream of this.iterateStreams()) { + const processor = stream.processor; + if (!processor) continue; + processor.onCredentialsUpdated({ token: room.token, url: room.serverUrl }); + } + }; + constructor(owned: OwnedTrack) { this.info = owned.info; this.ffi_handle = new FfiHandle(owned.handle!.id!); + this.streamFinalizationRegistry = new FinalizationRegistry>((ref) => { + this.audioStreams.delete(ref); + }); + } + + /** @internal */ + resolveRoom(): Room | null { + return this.roomRef?.deref() ?? null; + } + + /** @internal */ + setRoom(room: Room | null): void { + const oldRoom = this.resolveRoom(); + if (oldRoom !== room) { + if (oldRoom) { + oldRoom.off('tokenRefreshed', this.onRoomTokenRefreshed); + } + if (room) { + room.on('tokenRefreshed', this.onRoomTokenRefreshed); + } + } + this.roomRef = room ? new WeakRef(room) : null; + for (const stream of this.iterateStreams()) { + this.pushProcessorMetadataToStream(stream, room); + } + } + + /** @internal */ + registerAudioStream(stream: AudioStreamLike): void { + const ref = new WeakRef(stream); + this.audioStreams.add(ref); + this.streamFinalizationRegistry.register(stream, ref); + const room = this.resolveRoom(); + if (room) { + this.pushProcessorMetadataToStream(stream, room); + } + } + + /** @internal */ + unregisterAudioStream(stream: AudioStreamLike): void { + for (const ref of this.audioStreams) { + if (ref.deref() === stream) { + this.audioStreams.delete(ref); + return; + } + } + } + + private *iterateStreams(): Generator { + const dead: Array> = []; + for (const ref of this.audioStreams) { + const stream = ref.deref(); + if (stream) { + yield stream; + } else { + dead.push(ref); + } + } + for (const ref of dead) { + this.audioStreams.delete(ref); + } + } + + private pushProcessorMetadataToStream(stream: AudioStreamLike, room: Room | null): void { + const processor = stream.processor; + if (!processor) return; + + if (!room) { + processor.onStreamInfoCleared(); + processor.onCredentialsCleared(); + return; + } + + let identity = ''; + let publicationSid = ''; + const trackSid = this.sid; + if (trackSid) { + let found = false; + for (const participant of room.remoteParticipants.values()) { + const publication = participant.trackPublications.get(trackSid); + if (publication) { + identity = participant.identity; + publicationSid = publication.sid ?? ''; + found = true; + break; + } + } + if (!found) { + const local = room.localParticipant; + if (local) { + for (const publication of local.trackPublications.values()) { + if (publication.sid === trackSid) { + identity = local.identity; + publicationSid = publication.sid ?? ''; + break; + } + } + } + } + } + + processor.onStreamInfoUpdated({ + roomName: room.name ?? '', + participantIdentity: identity, + publicationSid, + }); + if (room.token && room.serverUrl) { + processor.onCredentialsUpdated({ token: room.token, url: room.serverUrl }); + } } get sid(): string | undefined { diff --git a/packages/livekit-rtc/tsconfig.json b/packages/livekit-rtc/tsconfig.json index 21aeb92d..0c1a2ac4 100644 --- a/packages/livekit-rtc/tsconfig.json +++ b/packages/livekit-rtc/tsconfig.json @@ -2,7 +2,8 @@ "extends": "../../tsconfig.json", "compilerOptions": { "outDir": "dist", - "declarationDir": "dist" + "declarationDir": "dist", + "lib": ["es2015", "es2021.weakref"] }, "include": ["src/**/*.ts"], "exclude": ["src/**/*.test.ts", "vite.config.ts"] From 15509be00b27ad258fbab45a5892004113514d29 Mon Sep 17 00:00:00 2001 From: Ryan Gaus Date: Thu, 28 May 2026 12:35:26 -0400 Subject: [PATCH 2/8] fix: add optional chaining call to new *Cleared FrameProcessor methods --- packages/livekit-rtc/src/track.ts | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/packages/livekit-rtc/src/track.ts b/packages/livekit-rtc/src/track.ts index a51587ca..dfcc690f 100644 --- a/packages/livekit-rtc/src/track.ts +++ b/packages/livekit-rtc/src/track.ts @@ -113,8 +113,11 @@ export abstract class Track { if (!processor) return; if (!room) { - processor.onStreamInfoCleared(); - processor.onCredentialsCleared(); + // Guard with optional-call: plugins built against an older @livekit/rtc-node + // inherit a FrameProcessor base class that doesn't define these methods, + // so they could be undefined on the prototype chain. + processor.onStreamInfoCleared?.(); + processor.onCredentialsCleared?.(); return; } From 4e9c0f1ba9cca5bc2c211087fb8f0e703db0dffb Mon Sep 17 00:00:00 2001 From: Ryan Gaus Date: Thu, 28 May 2026 13:32:48 -0400 Subject: [PATCH 3/8] fix: add missing changeset --- .changeset/proud-pianos-joke.md | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 .changeset/proud-pianos-joke.md diff --git a/.changeset/proud-pianos-joke.md b/.changeset/proud-pianos-joke.md new file mode 100644 index 00000000..06ece037 --- /dev/null +++ b/.changeset/proud-pianos-joke.md @@ -0,0 +1,5 @@ +--- +'@livekit/rtc-node': patch +--- + +Add initial support for frame processor usage directly on tracks From 50e9551052abc3600b06da49c90b6049facbcd17 Mon Sep 17 00:00:00 2001 From: Ryan Gaus Date: Thu, 28 May 2026 13:37:25 -0400 Subject: [PATCH 4/8] fix: get rid of AudioStreamLike --- packages/livekit-rtc/src/audio_stream.ts | 2 +- packages/livekit-rtc/src/track.ts | 24 +++++++++--------------- 2 files changed, 10 insertions(+), 16 deletions(-) diff --git a/packages/livekit-rtc/src/audio_stream.ts b/packages/livekit-rtc/src/audio_stream.ts index d3251251..1e4083d2 100644 --- a/packages/livekit-rtc/src/audio_stream.ts +++ b/packages/livekit-rtc/src/audio_stream.ts @@ -30,7 +30,7 @@ export interface NoiseCancellationOptions { options: Record; } -class AudioStreamSource implements UnderlyingSource { +export class AudioStreamSource implements UnderlyingSource { private controller?: ReadableStreamDefaultController; private ffiHandle: FfiHandle; private disposed = false; diff --git a/packages/livekit-rtc/src/track.ts b/packages/livekit-rtc/src/track.ts index dfcc690f..d6b66681 100644 --- a/packages/livekit-rtc/src/track.ts +++ b/packages/livekit-rtc/src/track.ts @@ -10,17 +10,11 @@ import type { TrackKind, } from '@livekit/rtc-ffi-bindings'; import { CreateAudioTrackRequest, CreateVideoTrackRequest } from '@livekit/rtc-ffi-bindings'; -import type { AudioFrame } from './audio_frame.js'; import type { AudioSource } from './audio_source.js'; import { FfiClient, FfiHandle } from './ffi_client.js'; -import type { FrameProcessor } from './frame_processor.js'; import type { Room } from './room.js'; import type { VideoSource } from './video_source.js'; - -/** @internal */ -export interface AudioStreamLike { - readonly processor: FrameProcessor | null; -} +import type { AudioStreamSource } from './audio_stream.js'; export abstract class Track { /** @internal */ @@ -30,8 +24,8 @@ export abstract class Track { ffi_handle: FfiHandle; private roomRef: WeakRef | null = null; - private audioStreams: Set> = new Set(); - private streamFinalizationRegistry: FinalizationRegistry>; + private audioStreams: Set> = new Set(); + private streamFinalizationRegistry: FinalizationRegistry>; private onRoomTokenRefreshed = () => { const room = this.resolveRoom(); if (!room || !room.token || !room.serverUrl) return; @@ -45,7 +39,7 @@ export abstract class Track { constructor(owned: OwnedTrack) { this.info = owned.info; this.ffi_handle = new FfiHandle(owned.handle!.id!); - this.streamFinalizationRegistry = new FinalizationRegistry>((ref) => { + this.streamFinalizationRegistry = new FinalizationRegistry>((ref) => { this.audioStreams.delete(ref); }); } @@ -73,7 +67,7 @@ export abstract class Track { } /** @internal */ - registerAudioStream(stream: AudioStreamLike): void { + registerAudioStream(stream: AudioStreamSource): void { const ref = new WeakRef(stream); this.audioStreams.add(ref); this.streamFinalizationRegistry.register(stream, ref); @@ -84,7 +78,7 @@ export abstract class Track { } /** @internal */ - unregisterAudioStream(stream: AudioStreamLike): void { + unregisterAudioStream(stream: AudioStreamSource): void { for (const ref of this.audioStreams) { if (ref.deref() === stream) { this.audioStreams.delete(ref); @@ -93,8 +87,8 @@ export abstract class Track { } } - private *iterateStreams(): Generator { - const dead: Array> = []; + private *iterateStreams(): Generator { + const dead: Array> = []; for (const ref of this.audioStreams) { const stream = ref.deref(); if (stream) { @@ -108,7 +102,7 @@ export abstract class Track { } } - private pushProcessorMetadataToStream(stream: AudioStreamLike, room: Room | null): void { + private pushProcessorMetadataToStream(stream: AudioStreamSource, room: Room | null): void { const processor = stream.processor; if (!processor) return; From cbad604b6e70f1187172a3b635be62d4d69ee52d Mon Sep 17 00:00:00 2001 From: Ryan Gaus Date: Thu, 28 May 2026 13:44:16 -0400 Subject: [PATCH 5/8] fix: run prettier --- packages/livekit-rtc/src/track.ts | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/packages/livekit-rtc/src/track.ts b/packages/livekit-rtc/src/track.ts index d6b66681..946c6e0c 100644 --- a/packages/livekit-rtc/src/track.ts +++ b/packages/livekit-rtc/src/track.ts @@ -11,10 +11,10 @@ import type { } from '@livekit/rtc-ffi-bindings'; import { CreateAudioTrackRequest, CreateVideoTrackRequest } from '@livekit/rtc-ffi-bindings'; import type { AudioSource } from './audio_source.js'; +import type { AudioStreamSource } from './audio_stream.js'; import { FfiClient, FfiHandle } from './ffi_client.js'; import type { Room } from './room.js'; import type { VideoSource } from './video_source.js'; -import type { AudioStreamSource } from './audio_stream.js'; export abstract class Track { /** @internal */ @@ -31,7 +31,9 @@ export abstract class Track { if (!room || !room.token || !room.serverUrl) return; for (const stream of this.iterateStreams()) { const processor = stream.processor; - if (!processor) continue; + if (!processor) { + continue; + } processor.onCredentialsUpdated({ token: room.token, url: room.serverUrl }); } }; @@ -39,9 +41,11 @@ export abstract class Track { constructor(owned: OwnedTrack) { this.info = owned.info; this.ffi_handle = new FfiHandle(owned.handle!.id!); - this.streamFinalizationRegistry = new FinalizationRegistry>((ref) => { - this.audioStreams.delete(ref); - }); + this.streamFinalizationRegistry = new FinalizationRegistry>( + (ref) => { + this.audioStreams.delete(ref); + }, + ); } /** @internal */ @@ -104,7 +108,9 @@ export abstract class Track { private pushProcessorMetadataToStream(stream: AudioStreamSource, room: Room | null): void { const processor = stream.processor; - if (!processor) return; + if (!processor) { + return; + } if (!room) { // Guard with optional-call: plugins built against an older @livekit/rtc-node From c1e6f43501fcdc03488c8e748dbc15bed9a51b52 Mon Sep 17 00:00:00 2001 From: Ryan Gaus Date: Fri, 29 May 2026 10:17:27 -0400 Subject: [PATCH 6/8] fix: address devin issue --- packages/livekit-rtc/src/track.ts | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/packages/livekit-rtc/src/track.ts b/packages/livekit-rtc/src/track.ts index 946c6e0c..621f4067 100644 --- a/packages/livekit-rtc/src/track.ts +++ b/packages/livekit-rtc/src/track.ts @@ -56,13 +56,12 @@ export abstract class Track { /** @internal */ setRoom(room: Room | null): void { const oldRoom = this.resolveRoom(); - if (oldRoom !== room) { - if (oldRoom) { - oldRoom.off('tokenRefreshed', this.onRoomTokenRefreshed); - } - if (room) { - room.on('tokenRefreshed', this.onRoomTokenRefreshed); - } + if (oldRoom && oldRoom !== room) { + oldRoom.off('tokenRefreshed', this.onRoomTokenRefreshed); + } + if (room) { + room.off('tokenRefreshed', this.onRoomTokenRefreshed); + room.on('tokenRefreshed', this.onRoomTokenRefreshed); } this.roomRef = room ? new WeakRef(room) : null; for (const stream of this.iterateStreams()) { From 28da338cc5338607e8359b9dd7d6aa8cc5a35c84 Mon Sep 17 00:00:00 2001 From: Ryan Gaus Date: Thu, 28 May 2026 13:13:38 -0400 Subject: [PATCH 7/8] fix: address issue where audio stream would be held open after the publisher stops publishing data --- packages/livekit-rtc/src/audio_stream.ts | 53 ++++++++++++++---------- packages/livekit-rtc/src/track.ts | 19 +++++---- 2 files changed, 41 insertions(+), 31 deletions(-) diff --git a/packages/livekit-rtc/src/audio_stream.ts b/packages/livekit-rtc/src/audio_stream.ts index 1e4083d2..596f3288 100644 --- a/packages/livekit-rtc/src/audio_stream.ts +++ b/packages/livekit-rtc/src/audio_stream.ts @@ -121,20 +121,7 @@ export class AudioStreamSource implements UnderlyingSource { this.controller.enqueue(frame); break; case 'eos': - FfiClient.instance.off(FfiClientEvent.FfiEvent, this.onEvent); - this.controller.close(); - // Dispose the native handle so the FD is released on stream end, - // not just when cancel() is called explicitly by the consumer. - // Guard against double-dispose if cancel() is called after EOS - // while buffered frames are still in the ReadableStream queue. - if (!this.disposed) { - this.disposed = true; - this.track.unregisterAudioStream(this); - this.ffiHandle.dispose(); - if (this.frameProcessor && !this.leaveProcessorOpen) { - this.frameProcessor.close(); - } - } + this.teardown(); break; } }; @@ -144,16 +131,36 @@ export class AudioStreamSource implements UnderlyingSource { } cancel() { + this.teardown(); + } + + /** + * Tear down this stream from the Track side — used when the track has been + * unsubscribed remotely and there will be no more frames. Closes the + * controller so any in flight async iterators by downstream consumers terminate. + * @internal + */ + closeFromTrack(): void { + this.teardown(); + } + + private teardown(): void { + if (this.disposed) return; + this.disposed = true; + // Close the controller FIRST so any pending reader.read() resolves with + // {done:true} and consumer-side `for await` / `iterator.return()` can + // unblock. The native EOS event (if it ever arrives) would do this for us, + // but we can't count on it firing for remote-unsubscribe scenarios. + try { + this.controller?.close(); + } catch { + // Controller may already be closed if EOS arrived first; ignore. + } FfiClient.instance.off(FfiClientEvent.FfiEvent, this.onEvent); - if (!this.disposed) { - this.disposed = true; - this.track.unregisterAudioStream(this); - this.ffiHandle.dispose(); - // Also close the frame processor on cancel for symmetry with the EOS path, - // so resources are released regardless of how the stream ends. - if (this.frameProcessor && !this.leaveProcessorOpen) { - this.frameProcessor.close(); - } + this.track.unregisterAudioStream(this); + this.ffiHandle.dispose(); + if (this.frameProcessor && !this.leaveProcessorOpen) { + this.frameProcessor.close(); } } } diff --git a/packages/livekit-rtc/src/track.ts b/packages/livekit-rtc/src/track.ts index 621f4067..722bf0e9 100644 --- a/packages/livekit-rtc/src/track.ts +++ b/packages/livekit-rtc/src/track.ts @@ -105,21 +105,24 @@ export abstract class Track { } } - private pushProcessorMetadataToStream(stream: AudioStreamSource, room: Room | null): void { - const processor = stream.processor; - if (!processor) { - return; - } - + private pushProcessorMetadataToStream(stream: AudioStreamLike, room: Room | null): void { if (!room) { // Guard with optional-call: plugins built against an older @livekit/rtc-node // inherit a FrameProcessor base class that doesn't define these methods, // so they could be undefined on the prototype chain. - processor.onStreamInfoCleared?.(); - processor.onCredentialsCleared?.(); + // still alive — the subsequent closeFromTrack may invoke processor.close(). + stream.processor?.onStreamInfoCleared?.(); + stream.processor?.onCredentialsCleared?.(); + // Tell the stream the track left the room — no more frames will arrive + // (the remote peer has unsubscribed), so close it so consumers' `for + // await` loops can terminate. + stream.closeFromTrack?.(); return; } + const processor = stream.processor; + if (!processor) return; + let identity = ''; let publicationSid = ''; const trackSid = this.sid; From c31a6eee7e81c179e3e67b8fe6691ce379280997 Mon Sep 17 00:00:00 2001 From: Ryan Gaus Date: Fri, 29 May 2026 15:45:20 -0400 Subject: [PATCH 8/8] fix: consolidate on cancel method name --- packages/livekit-rtc/src/audio_stream.ts | 10 ---------- packages/livekit-rtc/src/track.ts | 8 ++++---- 2 files changed, 4 insertions(+), 14 deletions(-) diff --git a/packages/livekit-rtc/src/audio_stream.ts b/packages/livekit-rtc/src/audio_stream.ts index 596f3288..e9dff06a 100644 --- a/packages/livekit-rtc/src/audio_stream.ts +++ b/packages/livekit-rtc/src/audio_stream.ts @@ -134,16 +134,6 @@ export class AudioStreamSource implements UnderlyingSource { this.teardown(); } - /** - * Tear down this stream from the Track side — used when the track has been - * unsubscribed remotely and there will be no more frames. Closes the - * controller so any in flight async iterators by downstream consumers terminate. - * @internal - */ - closeFromTrack(): void { - this.teardown(); - } - private teardown(): void { if (this.disposed) return; this.disposed = true; diff --git a/packages/livekit-rtc/src/track.ts b/packages/livekit-rtc/src/track.ts index 722bf0e9..b913c593 100644 --- a/packages/livekit-rtc/src/track.ts +++ b/packages/livekit-rtc/src/track.ts @@ -105,18 +105,18 @@ export abstract class Track { } } - private pushProcessorMetadataToStream(stream: AudioStreamLike, room: Room | null): void { + private pushProcessorMetadataToStream(stream: AudioStreamSource, room: Room | null): void { if (!room) { // Guard with optional-call: plugins built against an older @livekit/rtc-node // inherit a FrameProcessor base class that doesn't define these methods, // so they could be undefined on the prototype chain. - // still alive — the subsequent closeFromTrack may invoke processor.close(). + // still alive — the subsequent cancel may invoke processor.close(). stream.processor?.onStreamInfoCleared?.(); stream.processor?.onCredentialsCleared?.(); // Tell the stream the track left the room — no more frames will arrive - // (the remote peer has unsubscribed), so close it so consumers' `for + // (the remote peer has unsubscribed), so cancel it so consumers' `for // await` loops can terminate. - stream.closeFromTrack?.(); + stream.cancel(); return; }