Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
111 changes: 62 additions & 49 deletions js/lite/src/ietf/subscriber.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { Broadcast, type TrackRequest } from "../broadcast.ts";
import { Group } from "../group.ts";
import * as Path from "../path.ts";
import { type Reader, Stream } from "../stream.ts";
import type { Track } from "../track.ts";
import { Track } from "../track.ts";
import { error } from "../util/error.ts";
import type * as Control from "./control.ts";
import { Frame, type Group as GroupMessage } from "./object.ts";
Expand Down Expand Up @@ -37,19 +37,10 @@ export class Subscriber {
#announcedConsumers = new Set<Announced>();

// Our subscribed tracks - keyed by request ID
#subscribes = new Map<bigint, Track>();
#subscribes = new Map<bigint, { producer: Track; ok: PromiseWithResolvers<SubscribeOk> }>();

// A map of track aliases to request IDs
#trackAliases = new Map<bigint, bigint>();

// Track subscription responses - keyed by request ID
#subscribeCallbacks = new Map<
bigint,
{
resolve: (msg: SubscribeOk) => void;
reject: (msg: Error) => void;
}
>();
// A map of track aliases to tracks because moq-transport is DUMB
#trackAliases = new Map<bigint, Track | PromiseWithResolvers<Track>>();

#quic: WebTransport;

Expand Down Expand Up @@ -201,7 +192,8 @@ export class Subscriber {
const requestId = await this.#control.nextRequestId();
if (requestId === undefined) return;

this.#subscribes.set(requestId, request.track);
const okPending = Promise.withResolvers<SubscribeOk>();
this.#subscribes.set(requestId, { producer: request.track, ok: okPending });

console.debug(`subscribe start: id=${requestId} broadcast=${broadcast} track=${request.track.name}`);

Expand All @@ -212,17 +204,22 @@ export class Subscriber {
subscriberPriority: request.priority,
});

// Send SUBSCRIBE message on control stream and wait for response
const responsePromise = new Promise<SubscribeOk>((resolve, reject) => {
this.#subscribeCallbacks.set(requestId, { resolve, reject });
});

await this.#control.write(msg);

try {
const ok = await responsePromise;
this.#trackAliases.set(ok.trackAlias, requestId);
console.debug(`subscribe ok: id=${requestId} broadcast=${broadcast} track=${request.track.name}`);
const ok = await okPending.promise;
console.debug(
`subscribe ok: id=${requestId} broadcast=${broadcast} track=${request.track.name} alias=${ok.trackAlias}`,
);

// If a data stream arrived first (coin flip), we need to resolve the existing promise
const pending = this.#trackAliases.get(ok.trackAlias);
if (pending && typeof pending === "object" && "resolve" in pending) {
pending.resolve(request.track);
}

// Store the track for the track alias so we don't need a promise any longer
this.#trackAliases.set(ok.trackAlias, request.track);

try {
await request.track.closed;
Expand All @@ -231,6 +228,7 @@ export class Subscriber {
await this.#control.write(msg);
console.debug(`unsubscribe: id=${requestId} broadcast=${broadcast} track=${request.track.name}`);
} finally {
// NOTE: A late data stream might recreate this, but it will clean it up itself.
this.#trackAliases.delete(ok.trackAlias);
}
} catch (err) {
Expand All @@ -242,7 +240,6 @@ export class Subscriber {
);
} finally {
this.#subscribes.delete(requestId);
this.#subscribeCallbacks.delete(requestId);
}
}

Expand All @@ -253,12 +250,13 @@ export class Subscriber {
* @internal
*/
async handleSubscribeOk(msg: SubscribeOk) {
const callback = this.#subscribeCallbacks.get(msg.requestId);
if (callback) {
callback.resolve(msg);
} else {
const subscribe = this.#subscribes.get(msg.requestId);
if (!subscribe) {
console.warn("handleSubscribeOk unknown requestId", msg.requestId);
return;
}

subscribe.ok.resolve(msg);
}

/**
Expand All @@ -268,12 +266,13 @@ export class Subscriber {
* @internal
*/
async handleSubscribeError(msg: SubscribeError) {
const callback = this.#subscribeCallbacks.get(msg.requestId);
if (callback) {
callback.reject(new Error(`SUBSCRIBE_ERROR: code=${msg.errorCode} reason=${msg.reasonPhrase}`));
} else {
const subscribe = this.#subscribes.get(msg.requestId);
if (!subscribe) {
console.warn("handleSubscribeError unknown requestId", msg.requestId);
return;
}

subscribe.ok.reject(new Error(`SUBSCRIBE_ERROR: code=${msg.errorCode} reason=${msg.reasonPhrase}`));
}

/**
Expand All @@ -291,18 +290,28 @@ export class Subscriber {
}

try {
let requestId = this.#trackAliases.get(group.trackAlias);
if (requestId === undefined) {
// Just hope the track alias is the request ID
requestId = group.trackAlias;
console.warn("unknown track alias, using request ID");
let track = this.#trackAliases.get(group.trackAlias);
if (!track) {
track = Promise.withResolvers<Track>();
this.#trackAliases.set(group.trackAlias, track);
}

const track = this.#subscribes.get(requestId);
if (!track) {
throw new Error(
`unknown track: trackAlias=${group.trackAlias} requestId=${this.#trackAliases.get(group.trackAlias)}`,
);
if (!(track instanceof Track)) {
// Because moq-transport is DUMB, we need to wait for the track alias to be resolved.
// This is only known from the SUBSCRIBE_OK message, which will lose the race at least half the time.
// We can't block forever because this could be a late data stream too.
let cancel!: ReturnType<typeof setTimeout>;
const timeout = new Promise<Track | undefined>((resolve) => {
cancel = setTimeout(() => resolve(undefined), 1000);
});
track = await Promise.race([track.promise, timeout]);
clearTimeout(cancel);

if (track === undefined) {
// Clean up after ourselves if we timed out looking up the track alias
this.#trackAliases.delete(group.trackAlias);
throw new Error(`timeout waiting for track alias: ${group.trackAlias}`);
}
Comment on lines +293 to +314
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Race condition: timeout cleanup can delete a valid Track entry set by a concurrent handleSubscribeOk.

If two handleGroup calls race on the same unresolved alias, the first timeout deletes the map entry at line 312. When handleSubscribeOk arrives afterwards, it stores the Track at line 222. But the second handleGroup's promise is already orphaned (never resolved), so its timeout also fires and deletes the freshly-stored Track—breaking all subsequent groups for that alias.

Guard the deletion so it only removes the entry it owns:

🐛 Proposed fix
+			// Keep a reference to the deferred we created / obtained so we can guard cleanup
+			const deferred = track;
+
 			if (!(track instanceof Track)) {
 				let cancel!: ReturnType<typeof setTimeout>;
 				const timeout = new Promise<Track | undefined>((resolve) => {
 					cancel = setTimeout(() => resolve(undefined), 1000);
 				});
 				track = await Promise.race([track.promise, timeout]);
 				clearTimeout(cancel);
 
 				if (track === undefined) {
-					this.#trackAliases.delete(group.trackAlias);
+					// Only clean up if the map still holds our deferred; another path may have replaced it.
+					if (this.#trackAliases.get(group.trackAlias) === deferred) {
+						this.#trackAliases.delete(group.trackAlias);
+					}
 					throw new Error(`timeout waiting for track alias: ${group.trackAlias}`);
 				}
 			}
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
let track = this.#trackAliases.get(group.trackAlias);
if (!track) {
track = Promise.withResolvers<Track>();
this.#trackAliases.set(group.trackAlias, track);
}
const track = this.#subscribes.get(requestId);
if (!track) {
throw new Error(
`unknown track: trackAlias=${group.trackAlias} requestId=${this.#trackAliases.get(group.trackAlias)}`,
);
if (!(track instanceof Track)) {
// Because moq-transport is DUMB, we need to wait for the track alias to be resolved.
// This is only known from the SUBSCRIBE_OK message, which will lose the race at least half the time.
// We can't block forever because this could be a late data stream too.
let cancel!: ReturnType<typeof setTimeout>;
const timeout = new Promise<Track | undefined>((resolve) => {
cancel = setTimeout(() => resolve(undefined), 1000);
});
track = await Promise.race([track.promise, timeout]);
clearTimeout(cancel);
if (track === undefined) {
// Clean up after ourselves if we timed out looking up the track alias
this.#trackAliases.delete(group.trackAlias);
throw new Error(`timeout waiting for track alias: ${group.trackAlias}`);
}
let track = this.#trackAliases.get(group.trackAlias);
if (!track) {
track = Promise.withResolvers<Track>();
this.#trackAliases.set(group.trackAlias, track);
}
// Keep a reference to the deferred we created / obtained so we can guard cleanup
const deferred = track;
if (!(track instanceof Track)) {
// Because moq-transport is DUMB, we need to wait for the track alias to be resolved.
// This is only known from the SUBSCRIBE_OK message, which will lose the race at least half the time.
// We can't block forever because this could be a late data stream too.
let cancel!: ReturnType<typeof setTimeout>;
const timeout = new Promise<Track | undefined>((resolve) => {
cancel = setTimeout(() => resolve(undefined), 1000);
});
track = await Promise.race([track.promise, timeout]);
clearTimeout(cancel);
if (track === undefined) {
// Only clean up if the map still holds our deferred; another path may have replaced it.
if (this.#trackAliases.get(group.trackAlias) === deferred) {
this.#trackAliases.delete(group.trackAlias);
}
throw new Error(`timeout waiting for track alias: ${group.trackAlias}`);
}
🤖 Prompt for AI Agents
In `@js/lite/src/ietf/subscriber.ts` around lines 293 - 314, The timeout cleanup
can delete a Track set later by handleSubscribeOk because it unconditionally
deletes this.#trackAliases entry; fix by capturing the original unresolved
promise (the value returned by Promise.withResolvers<Track>() assigned to track
before awaiting) and, when timing out, only delete the map entry if
this.#trackAliases.get(group.trackAlias) === that original promise; keep
throwing the timeout error as before but avoid removing a newly-stored Track
from handleSubscribeOk.

}

// Convert to Group (moq-lite equivalent)
Expand Down Expand Up @@ -358,10 +367,15 @@ export class Subscriber {
*/
async handlePublishDone(msg: PublishDone) {
// For lite compatibility, we treat this as subscription completion
const callback = this.#subscribeCallbacks.get(msg.requestId);
if (callback) {
callback.reject(new Error(`PUBLISH_DONE: code=${msg.statusCode} reason=${msg.reasonPhrase}`));
const subscribe = this.#subscribes.get(msg.requestId);
if (!subscribe) {
console.warn("unknown PUBLISH_DONE requestId", msg.requestId);
return;
}

// Just in case we didn't get a SUBSCRIBE_OK, reject the promise
subscribe.ok.reject(new Error(`PUBLISH_DONE: code=${msg.statusCode} reason=${msg.reasonPhrase}`));
this.#subscribes.delete(msg.requestId);
}

/**
Expand Down Expand Up @@ -432,11 +446,10 @@ export class Subscriber {
// v15: REQUEST_ERROR replaces SubscribeNamespaceError, etc.
async handleRequestError(msg: RequestError) {
// In v15, RequestError replaces SubscribeError for subscribe requests
const callback = this.#subscribeCallbacks.get(msg.requestId);
if (callback) {
callback.reject(new Error(`REQUEST_ERROR: code=${msg.errorCode} reason=${msg.reasonPhrase}`));
} else {
console.warn("handleRequestError unknown requestId", msg.requestId);
const subscribe = this.#subscribes.get(msg.requestId);
if (subscribe) {
subscribe.ok.reject(new Error(`REQUEST_ERROR: code=${msg.errorCode} reason=${msg.reasonPhrase}`));
this.#subscribes.delete(msg.requestId);
}
}
}
Loading