diff --git a/js/lite/src/ietf/subscriber.ts b/js/lite/src/ietf/subscriber.ts index db64345db..d13ffa879 100644 --- a/js/lite/src/ietf/subscriber.ts +++ b/js/lite/src/ietf/subscriber.ts @@ -3,7 +3,7 @@ import { Broadcast, type TrackRequest } from "../broadcast.ts"; import { Group } from "../group.ts"; import * as Path from "../path.ts"; import { type Reader, Stream } from "../stream.ts"; -import type { Track } from "../track.ts"; +import { Track } from "../track.ts"; import { error } from "../util/error.ts"; import type * as Control from "./control.ts"; import { Frame, type Group as GroupMessage } from "./object.ts"; @@ -37,19 +37,10 @@ export class Subscriber { #announcedConsumers = new Set(); // Our subscribed tracks - keyed by request ID - #subscribes = new Map(); + #subscribes = new Map }>(); - // A map of track aliases to request IDs - #trackAliases = new Map(); - - // Track subscription responses - keyed by request ID - #subscribeCallbacks = new Map< - bigint, - { - resolve: (msg: SubscribeOk) => void; - reject: (msg: Error) => void; - } - >(); + // A map of track aliases to tracks because moq-transport is DUMB + #trackAliases = new Map>(); #quic: WebTransport; @@ -201,7 +192,8 @@ export class Subscriber { const requestId = await this.#control.nextRequestId(); if (requestId === undefined) return; - this.#subscribes.set(requestId, request.track); + const okPending = Promise.withResolvers(); + this.#subscribes.set(requestId, { producer: request.track, ok: okPending }); console.debug(`subscribe start: id=${requestId} broadcast=${broadcast} track=${request.track.name}`); @@ -212,17 +204,22 @@ export class Subscriber { subscriberPriority: request.priority, }); - // Send SUBSCRIBE message on control stream and wait for response - const responsePromise = new Promise((resolve, reject) => { - this.#subscribeCallbacks.set(requestId, { resolve, reject }); - }); - await this.#control.write(msg); try { - const ok = await responsePromise; - this.#trackAliases.set(ok.trackAlias, requestId); - console.debug(`subscribe ok: id=${requestId} broadcast=${broadcast} track=${request.track.name}`); + const ok = await okPending.promise; + console.debug( + `subscribe ok: id=${requestId} broadcast=${broadcast} track=${request.track.name} alias=${ok.trackAlias}`, + ); + + // If a data stream arrived first (coin flip), we need to resolve the existing promise + const pending = this.#trackAliases.get(ok.trackAlias); + if (pending && typeof pending === "object" && "resolve" in pending) { + pending.resolve(request.track); + } + + // Store the track for the track alias so we don't need a promise any longer + this.#trackAliases.set(ok.trackAlias, request.track); try { await request.track.closed; @@ -231,6 +228,7 @@ export class Subscriber { await this.#control.write(msg); console.debug(`unsubscribe: id=${requestId} broadcast=${broadcast} track=${request.track.name}`); } finally { + // NOTE: A late data stream might recreate this, but it will clean it up itself. this.#trackAliases.delete(ok.trackAlias); } } catch (err) { @@ -242,7 +240,6 @@ export class Subscriber { ); } finally { this.#subscribes.delete(requestId); - this.#subscribeCallbacks.delete(requestId); } } @@ -253,12 +250,13 @@ export class Subscriber { * @internal */ async handleSubscribeOk(msg: SubscribeOk) { - const callback = this.#subscribeCallbacks.get(msg.requestId); - if (callback) { - callback.resolve(msg); - } else { + const subscribe = this.#subscribes.get(msg.requestId); + if (!subscribe) { console.warn("handleSubscribeOk unknown requestId", msg.requestId); + return; } + + subscribe.ok.resolve(msg); } /** @@ -268,12 +266,13 @@ export class Subscriber { * @internal */ async handleSubscribeError(msg: SubscribeError) { - const callback = this.#subscribeCallbacks.get(msg.requestId); - if (callback) { - callback.reject(new Error(`SUBSCRIBE_ERROR: code=${msg.errorCode} reason=${msg.reasonPhrase}`)); - } else { + const subscribe = this.#subscribes.get(msg.requestId); + if (!subscribe) { console.warn("handleSubscribeError unknown requestId", msg.requestId); + return; } + + subscribe.ok.reject(new Error(`SUBSCRIBE_ERROR: code=${msg.errorCode} reason=${msg.reasonPhrase}`)); } /** @@ -291,18 +290,28 @@ export class Subscriber { } try { - let requestId = this.#trackAliases.get(group.trackAlias); - if (requestId === undefined) { - // Just hope the track alias is the request ID - requestId = group.trackAlias; - console.warn("unknown track alias, using request ID"); + let track = this.#trackAliases.get(group.trackAlias); + if (!track) { + track = Promise.withResolvers(); + this.#trackAliases.set(group.trackAlias, track); } - const track = this.#subscribes.get(requestId); - if (!track) { - throw new Error( - `unknown track: trackAlias=${group.trackAlias} requestId=${this.#trackAliases.get(group.trackAlias)}`, - ); + if (!(track instanceof Track)) { + // Because moq-transport is DUMB, we need to wait for the track alias to be resolved. + // This is only known from the SUBSCRIBE_OK message, which will lose the race at least half the time. + // We can't block forever because this could be a late data stream too. + let cancel!: ReturnType; + const timeout = new Promise((resolve) => { + cancel = setTimeout(() => resolve(undefined), 1000); + }); + track = await Promise.race([track.promise, timeout]); + clearTimeout(cancel); + + if (track === undefined) { + // Clean up after ourselves if we timed out looking up the track alias + this.#trackAliases.delete(group.trackAlias); + throw new Error(`timeout waiting for track alias: ${group.trackAlias}`); + } } // Convert to Group (moq-lite equivalent) @@ -358,10 +367,15 @@ export class Subscriber { */ async handlePublishDone(msg: PublishDone) { // For lite compatibility, we treat this as subscription completion - const callback = this.#subscribeCallbacks.get(msg.requestId); - if (callback) { - callback.reject(new Error(`PUBLISH_DONE: code=${msg.statusCode} reason=${msg.reasonPhrase}`)); + const subscribe = this.#subscribes.get(msg.requestId); + if (!subscribe) { + console.warn("unknown PUBLISH_DONE requestId", msg.requestId); + return; } + + // Just in case we didn't get a SUBSCRIBE_OK, reject the promise + subscribe.ok.reject(new Error(`PUBLISH_DONE: code=${msg.statusCode} reason=${msg.reasonPhrase}`)); + this.#subscribes.delete(msg.requestId); } /** @@ -432,11 +446,10 @@ export class Subscriber { // v15: REQUEST_ERROR replaces SubscribeNamespaceError, etc. async handleRequestError(msg: RequestError) { // In v15, RequestError replaces SubscribeError for subscribe requests - const callback = this.#subscribeCallbacks.get(msg.requestId); - if (callback) { - callback.reject(new Error(`REQUEST_ERROR: code=${msg.errorCode} reason=${msg.reasonPhrase}`)); - } else { - console.warn("handleRequestError unknown requestId", msg.requestId); + const subscribe = this.#subscribes.get(msg.requestId); + if (subscribe) { + subscribe.ok.reject(new Error(`REQUEST_ERROR: code=${msg.errorCode} reason=${msg.reasonPhrase}`)); + this.#subscribes.delete(msg.requestId); } } }