Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/proud-pianos-joke.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@livekit/rtc-node': patch
---

Add initial support for frame processor usage directly on tracks
30 changes: 26 additions & 4 deletions packages/livekit-rtc/src/audio_stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,12 @@ import type { Track } from './track.js';

export interface AudioStreamOptions {
noiseCancellation?: NoiseCancellationOptions | FrameProcessor<AudioFrame>;
/**
* If true and `noiseCancellation` is a {@link FrameProcessor}, leaves the
* processor open when the stream closes so the same processor can be reused
* with another {@link AudioStream}. Defaults to `false`.
*/
noiseCancellationLeaveOpen?: boolean;
sampleRate?: number;
numChannels?: number;
frameSizeMs?: number;
Expand All @@ -24,26 +30,30 @@ export interface NoiseCancellationOptions {
options: Record<string, any>;
}

class AudioStreamSource implements UnderlyingSource<AudioFrame> {
export class AudioStreamSource implements UnderlyingSource<AudioFrame> {
private controller?: ReadableStreamDefaultController<AudioFrame>;
private ffiHandle: FfiHandle;
private disposed = false;
private sampleRate: number;
private numChannels: number;
private legacyNcOptions?: NoiseCancellationOptions;
private frameProcessor?: FrameProcessor<AudioFrame>;
private frameProcessor: FrameProcessor<AudioFrame> | null = null;
private leaveProcessorOpen = false;
private frameSizeMs?: number;
private track: Track;

constructor(
track: Track,
sampleRateOrOptions?: number | AudioStreamOptions,
numChannels?: number,
) {
this.track = track;
if (sampleRateOrOptions !== undefined && typeof sampleRateOrOptions !== 'number') {
this.sampleRate = sampleRateOrOptions.sampleRate ?? 48000;
this.numChannels = sampleRateOrOptions.numChannels ?? 1;
if (isFrameProcessor(sampleRateOrOptions.noiseCancellation)) {
this.frameProcessor = sampleRateOrOptions.noiseCancellation;
this.leaveProcessorOpen = sampleRateOrOptions.noiseCancellationLeaveOpen ?? false;
} else {
this.legacyNcOptions = sampleRateOrOptions.noiseCancellation;
}
Expand Down Expand Up @@ -77,6 +87,12 @@ class AudioStreamSource implements UnderlyingSource<AudioFrame> {
this.ffiHandle = new FfiHandle(res.stream!.handle!.id!);

FfiClient.instance.on(FfiClientEvent.FfiEvent, this.onEvent);
track.registerAudioStream(this);
}

/** @internal */
get processor(): FrameProcessor<AudioFrame> | null {
return this.frameProcessor;
}

private onEvent = (ev: FfiEvent) => {
Expand Down Expand Up @@ -113,8 +129,11 @@ class AudioStreamSource implements UnderlyingSource<AudioFrame> {
// while buffered frames are still in the ReadableStream queue.
if (!this.disposed) {
this.disposed = true;
this.track.unregisterAudioStream(this);
this.ffiHandle.dispose();
this.frameProcessor?.close();
if (this.frameProcessor && !this.leaveProcessorOpen) {
this.frameProcessor.close();
}
}
break;
}
Expand All @@ -128,10 +147,13 @@ class AudioStreamSource implements UnderlyingSource<AudioFrame> {
FfiClient.instance.off(FfiClientEvent.FfiEvent, this.onEvent);
if (!this.disposed) {
this.disposed = true;
this.track.unregisterAudioStream(this);
this.ffiHandle.dispose();
// Also close the frame processor on cancel for symmetry with the EOS path,
// so resources are released regardless of how the stream ends.
this.frameProcessor?.close();
if (this.frameProcessor && !this.leaveProcessorOpen) {
this.frameProcessor.close();
}
}
}
}
Expand Down
4 changes: 4 additions & 0 deletions packages/livekit-rtc/src/frame_processor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,11 @@ export abstract class FrameProcessor<Frame extends VideoFrame | AudioFrame> {
// eslint-disable-next-line @typescript-eslint/no-unused-vars
onStreamInfoUpdated(_info: FrameProcessorStreamInfo): void {}
// eslint-disable-next-line @typescript-eslint/no-unused-vars
onStreamInfoCleared(): void {}
// eslint-disable-next-line @typescript-eslint/no-unused-vars
onCredentialsUpdated(_credentials: FrameProcessorCredentials): void {}
// eslint-disable-next-line @typescript-eslint/no-unused-vars
onCredentialsCleared(): void {}

abstract process(frame: Frame): Frame;
abstract close(): void;
Expand Down
8 changes: 8 additions & 0 deletions packages/livekit-rtc/src/room.ts
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟡 Missing processor metadata update in localTrackRepublished handler causes stale/empty metadata after reconnection

The PR adds setRoom(this) calls in localTrackPublished, localTrackUnpublished, trackSubscribed, and trackUnsubscribed handlers to push processor metadata. However, the localTrackRepublished handler (fired during full reconnect when the SDK auto-republishes tracks) has no corresponding setRoom call or metadata push. After republish, the publication receives a new SID via publication.updateInfo(newInfo) at room.ts:567, but the track's info.sid (used by pushProcessorMetadataToStream at track.ts:125) is never updated. This means any subsequent call to pushProcessorMetadataToStream (e.g., when a new AudioStream is registered on the track after reconnection) will fail the lookup publication.sid === trackSid at track.ts:141 because the publication now has the new SID while the track still has the old one. The processor receives empty participantIdentity and publicationSid.

(Refers to lines 566-570)

Prompt for agents
In the localTrackRepublished handler in room.ts (around line 561-573), after the publication info is updated and re-keyed in the map, the track's processors are never notified of the new publication SID. Two things need to happen:

1. The track's own info.sid needs to be updated to match the new publication SID, because pushProcessorMetadataToStream in track.ts uses this.sid (which reads this.info?.sid) to look up the publication in the trackPublications map.

2. A metadata push should be triggered so that processors attached to existing AudioStreams on this track receive the updated stream info (new publicationSid).

A fix would be to add something like this inside the `if (publication)` block in the localTrackRepublished handler, after the publication is re-keyed:

  if (publication.track) {
    publication.track.info = { ...publication.track.info, sid: publication.sid }; // or however TrackInfo is updated
    publication.track.setRoom(this); // re-pushes metadata to all registered audio streams
  }

Note: setRoom(this) when oldRoom === room will still properly re-register the listener and push metadata to streams. However, be careful that the track's info structure is updated correctly (it's a protobuf TrackInfo object).
Open in Devin Review

Was this helpful? React with 👍 or 👎 to provide feedback.

Original file line number Diff line number Diff line change
Expand Up @@ -547,9 +547,15 @@ export class Room extends (EventEmitter as new () => TypedEmitter<RoomCallbacks>
}
} else if (ev.case == 'localTrackPublished') {
const publication = this.localParticipant.trackPublications.get(ev.value.trackSid!);
if (publication?.track) {
publication.track.setRoom(this);
}
this.emit(RoomEvent.LocalTrackPublished, publication!, this.localParticipant);
} else if (ev.case == 'localTrackUnpublished') {
const publication = this.localParticipant.trackPublications.get(ev.value.publicationSid!);
if (publication?.track) {
publication.track.setRoom(null);
}
this.localParticipant.trackPublications.delete(ev.value.publicationSid!);
this.emit(RoomEvent.LocalTrackUnpublished, publication!, this.localParticipant!);
} else if ((ev.case as string) == 'localTrackRepublished') {
Expand Down Expand Up @@ -610,6 +616,7 @@ export class Room extends (EventEmitter as new () => TypedEmitter<RoomCallbacks>
} else if (trackInfo.kind == TrackKind.KIND_AUDIO) {
publication.track = new RemoteAudioTrack(ownedTrack);
}
publication.track?.setRoom(this);

this.emit(RoomEvent.TrackSubscribed, publication.track!, publication, participant);
} catch (e: unknown) {
Expand All @@ -622,6 +629,7 @@ export class Room extends (EventEmitter as new () => TypedEmitter<RoomCallbacks>
ev.value.trackSid!,
);
const track = publication.track!;
track.setRoom(null);
publication.track = undefined;
publication.subscribed = false;
this.emit(RoomEvent.TrackUnsubscribed, track, publication, participant);
Expand Down
132 changes: 132 additions & 0 deletions packages/livekit-rtc/src/track.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@ import type {
} from '@livekit/rtc-ffi-bindings';
import { CreateAudioTrackRequest, CreateVideoTrackRequest } from '@livekit/rtc-ffi-bindings';
import type { AudioSource } from './audio_source.js';
import type { AudioStreamSource } from './audio_stream.js';
import { FfiClient, FfiHandle } from './ffi_client.js';
import type { Room } from './room.js';
import type { VideoSource } from './video_source.js';

export abstract class Track {
Expand All @@ -21,9 +23,139 @@ export abstract class Track {
/** @internal */
ffi_handle: FfiHandle;

private roomRef: WeakRef<Room> | null = null;
private audioStreams: Set<WeakRef<AudioStreamSource>> = new Set();
private streamFinalizationRegistry: FinalizationRegistry<WeakRef<AudioStreamSource>>;
private onRoomTokenRefreshed = () => {
const room = this.resolveRoom();
if (!room || !room.token || !room.serverUrl) return;
for (const stream of this.iterateStreams()) {
const processor = stream.processor;
if (!processor) {
continue;
}
processor.onCredentialsUpdated({ token: room.token, url: room.serverUrl });
}
};

constructor(owned: OwnedTrack) {
this.info = owned.info;
this.ffi_handle = new FfiHandle(owned.handle!.id!);
this.streamFinalizationRegistry = new FinalizationRegistry<WeakRef<AudioStreamSource>>(
(ref) => {
this.audioStreams.delete(ref);
},
);
}

/** @internal */
resolveRoom(): Room | null {
return this.roomRef?.deref() ?? null;
}

/** @internal */
setRoom(room: Room | null): void {
const oldRoom = this.resolveRoom();
if (oldRoom && oldRoom !== room) {
oldRoom.off('tokenRefreshed', this.onRoomTokenRefreshed);
}
if (room) {
room.off('tokenRefreshed', this.onRoomTokenRefreshed);
room.on('tokenRefreshed', this.onRoomTokenRefreshed);
}
this.roomRef = room ? new WeakRef(room) : null;
for (const stream of this.iterateStreams()) {
this.pushProcessorMetadataToStream(stream, room);
}
}
Comment thread
1egoman marked this conversation as resolved.

/** @internal */
registerAudioStream(stream: AudioStreamSource): void {
const ref = new WeakRef(stream);
this.audioStreams.add(ref);
this.streamFinalizationRegistry.register(stream, ref);
const room = this.resolveRoom();
if (room) {
this.pushProcessorMetadataToStream(stream, room);
}
}

/** @internal */
unregisterAudioStream(stream: AudioStreamSource): void {
for (const ref of this.audioStreams) {
if (ref.deref() === stream) {
this.audioStreams.delete(ref);
return;
}
}
}

private *iterateStreams(): Generator<AudioStreamSource> {
const dead: Array<WeakRef<AudioStreamSource>> = [];
for (const ref of this.audioStreams) {
const stream = ref.deref();
if (stream) {
yield stream;
} else {
dead.push(ref);
}
}
for (const ref of dead) {
this.audioStreams.delete(ref);
}
}

private pushProcessorMetadataToStream(stream: AudioStreamSource, room: Room | null): void {
const processor = stream.processor;
if (!processor) {
return;
}

if (!room) {
// Guard with optional-call: plugins built against an older @livekit/rtc-node
// inherit a FrameProcessor base class that doesn't define these methods,
// so they could be undefined on the prototype chain.
processor.onStreamInfoCleared?.();
processor.onCredentialsCleared?.();
return;
}

let identity = '';
let publicationSid = '';
const trackSid = this.sid;
if (trackSid) {
let found = false;
for (const participant of room.remoteParticipants.values()) {
const publication = participant.trackPublications.get(trackSid);
if (publication) {
identity = participant.identity;
publicationSid = publication.sid ?? '';
found = true;
break;
}
}
if (!found) {
const local = room.localParticipant;
if (local) {
for (const publication of local.trackPublications.values()) {
if (publication.sid === trackSid) {
identity = local.identity;
publicationSid = publication.sid ?? '';
break;
}
}
}
}
}

processor.onStreamInfoUpdated({
roomName: room.name ?? '',
participantIdentity: identity,
publicationSid,
});
if (room.token && room.serverUrl) {
processor.onCredentialsUpdated({ token: room.token, url: room.serverUrl });
}
}

get sid(): string | undefined {
Expand Down
3 changes: 2 additions & 1 deletion packages/livekit-rtc/tsconfig.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@
"extends": "../../tsconfig.json",
"compilerOptions": {
"outDir": "dist",
"declarationDir": "dist"
"declarationDir": "dist",
"lib": ["es2015", "es2021.weakref"]
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this an ok addition to be making here? Or does there need to be a WeakRef polyfill which falls back to some sort of a no-op class wrapper?

(in case it's useful, WeakRef was introduced in node v16.4: mdn link)

},
"include": ["src/**/*.ts"],
"exclude": ["src/**/*.test.ts", "vite.config.ts"]
Expand Down
Loading