diff --git a/.changeset/proud-pianos-joke.md b/.changeset/proud-pianos-joke.md new file mode 100644 index 00000000..06ece037 --- /dev/null +++ b/.changeset/proud-pianos-joke.md @@ -0,0 +1,5 @@ +--- +'@livekit/rtc-node': patch +--- + +Add initial support for frame processor usage directly on tracks diff --git a/packages/livekit-rtc/src/audio_stream.ts b/packages/livekit-rtc/src/audio_stream.ts index 97cea8b6..1e4083d2 100644 --- a/packages/livekit-rtc/src/audio_stream.ts +++ b/packages/livekit-rtc/src/audio_stream.ts @@ -13,6 +13,12 @@ import type { Track } from './track.js'; export interface AudioStreamOptions { noiseCancellation?: NoiseCancellationOptions | FrameProcessor; + /** + * If true and `noiseCancellation` is a {@link FrameProcessor}, leaves the + * processor open when the stream closes so the same processor can be reused + * with another {@link AudioStream}. Defaults to `false`. + */ + noiseCancellationLeaveOpen?: boolean; sampleRate?: number; numChannels?: number; frameSizeMs?: number; @@ -24,26 +30,30 @@ export interface NoiseCancellationOptions { options: Record; } -class AudioStreamSource implements UnderlyingSource { +export class AudioStreamSource implements UnderlyingSource { private controller?: ReadableStreamDefaultController; private ffiHandle: FfiHandle; private disposed = false; private sampleRate: number; private numChannels: number; private legacyNcOptions?: NoiseCancellationOptions; - private frameProcessor?: FrameProcessor; + private frameProcessor: FrameProcessor | null = null; + private leaveProcessorOpen = false; private frameSizeMs?: number; + private track: Track; constructor( track: Track, sampleRateOrOptions?: number | AudioStreamOptions, numChannels?: number, ) { + this.track = track; if (sampleRateOrOptions !== undefined && typeof sampleRateOrOptions !== 'number') { this.sampleRate = sampleRateOrOptions.sampleRate ?? 48000; this.numChannels = sampleRateOrOptions.numChannels ?? 1; if (isFrameProcessor(sampleRateOrOptions.noiseCancellation)) { this.frameProcessor = sampleRateOrOptions.noiseCancellation; + this.leaveProcessorOpen = sampleRateOrOptions.noiseCancellationLeaveOpen ?? false; } else { this.legacyNcOptions = sampleRateOrOptions.noiseCancellation; } @@ -77,6 +87,12 @@ class AudioStreamSource implements UnderlyingSource { this.ffiHandle = new FfiHandle(res.stream!.handle!.id!); FfiClient.instance.on(FfiClientEvent.FfiEvent, this.onEvent); + track.registerAudioStream(this); + } + + /** @internal */ + get processor(): FrameProcessor | null { + return this.frameProcessor; } private onEvent = (ev: FfiEvent) => { @@ -113,8 +129,11 @@ class AudioStreamSource implements UnderlyingSource { // while buffered frames are still in the ReadableStream queue. if (!this.disposed) { this.disposed = true; + this.track.unregisterAudioStream(this); this.ffiHandle.dispose(); - this.frameProcessor?.close(); + if (this.frameProcessor && !this.leaveProcessorOpen) { + this.frameProcessor.close(); + } } break; } @@ -128,10 +147,13 @@ class AudioStreamSource implements UnderlyingSource { FfiClient.instance.off(FfiClientEvent.FfiEvent, this.onEvent); if (!this.disposed) { this.disposed = true; + this.track.unregisterAudioStream(this); this.ffiHandle.dispose(); // Also close the frame processor on cancel for symmetry with the EOS path, // so resources are released regardless of how the stream ends. - this.frameProcessor?.close(); + if (this.frameProcessor && !this.leaveProcessorOpen) { + this.frameProcessor.close(); + } } } } diff --git a/packages/livekit-rtc/src/frame_processor.ts b/packages/livekit-rtc/src/frame_processor.ts index 629d883a..8feeaa2b 100644 --- a/packages/livekit-rtc/src/frame_processor.ts +++ b/packages/livekit-rtc/src/frame_processor.ts @@ -43,7 +43,11 @@ export abstract class FrameProcessor { // eslint-disable-next-line @typescript-eslint/no-unused-vars onStreamInfoUpdated(_info: FrameProcessorStreamInfo): void {} // eslint-disable-next-line @typescript-eslint/no-unused-vars + onStreamInfoCleared(): void {} + // eslint-disable-next-line @typescript-eslint/no-unused-vars onCredentialsUpdated(_credentials: FrameProcessorCredentials): void {} + // eslint-disable-next-line @typescript-eslint/no-unused-vars + onCredentialsCleared(): void {} abstract process(frame: Frame): Frame; abstract close(): void; diff --git a/packages/livekit-rtc/src/room.ts b/packages/livekit-rtc/src/room.ts index 3958e72c..97886783 100644 --- a/packages/livekit-rtc/src/room.ts +++ b/packages/livekit-rtc/src/room.ts @@ -547,9 +547,15 @@ export class Room extends (EventEmitter as new () => TypedEmitter } } else if (ev.case == 'localTrackPublished') { const publication = this.localParticipant.trackPublications.get(ev.value.trackSid!); + if (publication?.track) { + publication.track.setRoom(this); + } this.emit(RoomEvent.LocalTrackPublished, publication!, this.localParticipant); } else if (ev.case == 'localTrackUnpublished') { const publication = this.localParticipant.trackPublications.get(ev.value.publicationSid!); + if (publication?.track) { + publication.track.setRoom(null); + } this.localParticipant.trackPublications.delete(ev.value.publicationSid!); this.emit(RoomEvent.LocalTrackUnpublished, publication!, this.localParticipant!); } else if ((ev.case as string) == 'localTrackRepublished') { @@ -610,6 +616,7 @@ export class Room extends (EventEmitter as new () => TypedEmitter } else if (trackInfo.kind == TrackKind.KIND_AUDIO) { publication.track = new RemoteAudioTrack(ownedTrack); } + publication.track?.setRoom(this); this.emit(RoomEvent.TrackSubscribed, publication.track!, publication, participant); } catch (e: unknown) { @@ -622,6 +629,7 @@ export class Room extends (EventEmitter as new () => TypedEmitter ev.value.trackSid!, ); const track = publication.track!; + track.setRoom(null); publication.track = undefined; publication.subscribed = false; this.emit(RoomEvent.TrackUnsubscribed, track, publication, participant); diff --git a/packages/livekit-rtc/src/track.ts b/packages/livekit-rtc/src/track.ts index 1a25dce0..621f4067 100644 --- a/packages/livekit-rtc/src/track.ts +++ b/packages/livekit-rtc/src/track.ts @@ -11,7 +11,9 @@ import type { } from '@livekit/rtc-ffi-bindings'; import { CreateAudioTrackRequest, CreateVideoTrackRequest } from '@livekit/rtc-ffi-bindings'; import type { AudioSource } from './audio_source.js'; +import type { AudioStreamSource } from './audio_stream.js'; import { FfiClient, FfiHandle } from './ffi_client.js'; +import type { Room } from './room.js'; import type { VideoSource } from './video_source.js'; export abstract class Track { @@ -21,9 +23,139 @@ export abstract class Track { /** @internal */ ffi_handle: FfiHandle; + private roomRef: WeakRef | null = null; + private audioStreams: Set> = new Set(); + private streamFinalizationRegistry: FinalizationRegistry>; + private onRoomTokenRefreshed = () => { + const room = this.resolveRoom(); + if (!room || !room.token || !room.serverUrl) return; + for (const stream of this.iterateStreams()) { + const processor = stream.processor; + if (!processor) { + continue; + } + processor.onCredentialsUpdated({ token: room.token, url: room.serverUrl }); + } + }; + constructor(owned: OwnedTrack) { this.info = owned.info; this.ffi_handle = new FfiHandle(owned.handle!.id!); + this.streamFinalizationRegistry = new FinalizationRegistry>( + (ref) => { + this.audioStreams.delete(ref); + }, + ); + } + + /** @internal */ + resolveRoom(): Room | null { + return this.roomRef?.deref() ?? null; + } + + /** @internal */ + setRoom(room: Room | null): void { + const oldRoom = this.resolveRoom(); + if (oldRoom && oldRoom !== room) { + oldRoom.off('tokenRefreshed', this.onRoomTokenRefreshed); + } + if (room) { + room.off('tokenRefreshed', this.onRoomTokenRefreshed); + room.on('tokenRefreshed', this.onRoomTokenRefreshed); + } + this.roomRef = room ? new WeakRef(room) : null; + for (const stream of this.iterateStreams()) { + this.pushProcessorMetadataToStream(stream, room); + } + } + + /** @internal */ + registerAudioStream(stream: AudioStreamSource): void { + const ref = new WeakRef(stream); + this.audioStreams.add(ref); + this.streamFinalizationRegistry.register(stream, ref); + const room = this.resolveRoom(); + if (room) { + this.pushProcessorMetadataToStream(stream, room); + } + } + + /** @internal */ + unregisterAudioStream(stream: AudioStreamSource): void { + for (const ref of this.audioStreams) { + if (ref.deref() === stream) { + this.audioStreams.delete(ref); + return; + } + } + } + + private *iterateStreams(): Generator { + const dead: Array> = []; + for (const ref of this.audioStreams) { + const stream = ref.deref(); + if (stream) { + yield stream; + } else { + dead.push(ref); + } + } + for (const ref of dead) { + this.audioStreams.delete(ref); + } + } + + private pushProcessorMetadataToStream(stream: AudioStreamSource, room: Room | null): void { + const processor = stream.processor; + if (!processor) { + return; + } + + if (!room) { + // Guard with optional-call: plugins built against an older @livekit/rtc-node + // inherit a FrameProcessor base class that doesn't define these methods, + // so they could be undefined on the prototype chain. + processor.onStreamInfoCleared?.(); + processor.onCredentialsCleared?.(); + return; + } + + let identity = ''; + let publicationSid = ''; + const trackSid = this.sid; + if (trackSid) { + let found = false; + for (const participant of room.remoteParticipants.values()) { + const publication = participant.trackPublications.get(trackSid); + if (publication) { + identity = participant.identity; + publicationSid = publication.sid ?? ''; + found = true; + break; + } + } + if (!found) { + const local = room.localParticipant; + if (local) { + for (const publication of local.trackPublications.values()) { + if (publication.sid === trackSid) { + identity = local.identity; + publicationSid = publication.sid ?? ''; + break; + } + } + } + } + } + + processor.onStreamInfoUpdated({ + roomName: room.name ?? '', + participantIdentity: identity, + publicationSid, + }); + if (room.token && room.serverUrl) { + processor.onCredentialsUpdated({ token: room.token, url: room.serverUrl }); + } } get sid(): string | undefined { diff --git a/packages/livekit-rtc/tsconfig.json b/packages/livekit-rtc/tsconfig.json index 21aeb92d..0c1a2ac4 100644 --- a/packages/livekit-rtc/tsconfig.json +++ b/packages/livekit-rtc/tsconfig.json @@ -2,7 +2,8 @@ "extends": "../../tsconfig.json", "compilerOptions": { "outDir": "dist", - "declarationDir": "dist" + "declarationDir": "dist", + "lib": ["es2015", "es2021.weakref"] }, "include": ["src/**/*.ts"], "exclude": ["src/**/*.test.ts", "vite.config.ts"]