Skip to content

Add dual MSF + Hang catalog support in @moq/watch#1147

Open
kixelated wants to merge 14 commits intodevfrom
msf-playback
Open

Add dual MSF + Hang catalog support in @moq/watch#1147
kixelated wants to merge 14 commits intodevfrom
msf-playback

Conversation

@kixelated
Copy link
Collaborator

@kixelated kixelated commented Mar 21, 2026

Summary

  • Create @moq/msf package with Zod schemas for MSF catalog format (matching rs/moq-msf types)
  • Update @moq/watch broadcast to subscribe to both catalog.json (hang) and catalog (MSF) in parallel
  • Hang gets a 100ms headstart; first catalog to arrive wins, loser track is closed
  • Add MSF→Hang conversion (toHang()) so the player works with MSF-only publishers
  • Includes Rust-side subscription API changes: UnknownBroadcast/UnknownTrack error split, TrackSubscriber redesign, and moq_mux consumer updates

Test plan

  • just check passes
  • Connect to relay publishing both formats — hang catalog should win due to headstart
  • Connect to MSF-only source — player constructs hang catalog from MSF correctly
  • Test catalog: ["hang"] and catalog: ["msf"] for single-format mode

🤖 Generated with Claude Code

kixelated and others added 14 commits March 18, 2026 14:40
- Add `ordered` and `max_latency` fields to `Track` struct
- Create `Subscription` struct for per-consumer preferences
- Create `TrackSubscription` type with RAII lifecycle (auto-removes on drop)
- Add `TrackProducer::poll_max` for aggregate subscription change detection
- Add `TrackConsumer::subscribe()` to register subscriptions
- Add `GroupProducer::is_aborted()` for duplicate group handling
- Allow `create_group` to replace aborted groups (for subscription restart)
- Replace `BroadcastDynamic::requested_track() -> TrackProducer` with
  `requested_track() -> TrackRequest` (handler creates producer, responds)
- Make `BroadcastConsumer::subscribe_track` async with request/response
- Add `BroadcastDynamic::insert_track` for handler-side track insertion
- Add dedup for pending track requests (multiple subscribers share reply)
- Update all callers across the workspace

Known issues:
- libmoq FFI tests deadlock (likely BroadcastConsumer upgrade in async context)
- moq-mux tests may deadlock similarly

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
- Track is now just `{ name: String }` — no priority/ordered/max_latency
- Subscription gains `start: Option<u64>` and `end: Option<u64>` fields
- Aggregation: start = min (None wins), end = max (None wins)
- No cap/ceiling from producer — aggregation is purely across subscribers
- TrackProducer::subscription() is async, blocks until aggregate changes
- TrackProducer stores prev_subscription internally (no prev arg needed)
- Remove TrackConsumer::start_at() — use Subscription.start instead
- Remove subscription tests (will rewrite in follow-up)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
- Revert broadcast.rs to sync: consume_track() looks up or creates
  tracks synchronously, requested_track() returns TrackProducer directly
- Remove TrackRequest and oneshot channel machinery
- Replace TrackSubscription with TrackSubscriber using conducer-based
  subscriptions for two-level poll aggregation
- TrackConsumer::subscribe() is async, blocks until first group exists
- TrackSubscriber::recv_group() respects start/end range
- Fix create_group aborted replacement: tombstone old entry, push new
- Update all callers to use consume_track (sync) or subscribe_track
- Fix moq-ffi to remove RUNTIME.block_on() now that consume_track is sync

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
- libmoq: move consume_track + consumer construction before tokio::spawn
  so errors surface synchronously
- lite/ietf subscriber: take TrackProducer by value instead of &mut

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
None now means "no preference" rather than "latest wins all."
Callers wanting "latest" should start with None, then update()
with a concrete value once latest() is known.

Also: replace expect() with ? in consume_track, remove dead comment.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
- Remove recv_group/poll_recv_group from TrackConsumer (deprecated stubs remain)
- Remove index field from TrackConsumer
- Add poll_recv_group, recv_group, closed, poll_closed to TrackSubscriber
- Rename OrderedConsumer → OrderedSubscriber (takes TrackSubscriber)
- CatalogConsumer now takes TrackSubscriber
- Update all protocol publishers to subscribe() before iterating groups
- Update moq-ffi: subscribe_catalog/subscribe_media are now async
- Update libmoq: move subscribe into spawned tasks
- Update moq-mux: CatalogProducer::consume() is now async
- Fix hang tests: write first group before subscribe to avoid deadlock
- Fix hang tests: await tokio::time::sleep in paused-time tests
- Add unit tests for subscribe() blocking on first group, finish, abort

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Deprecate the generic NotFound error in favor of more specific
variants to make debugging easier. Each maps to its own wire code.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
The cleanup task was firing immediately (unused() resolved with zero
consumers), removing tracks from the lookup before any subscriber
could find them. This matches main's behavior: stale entries are
cleaned up lazily by consume_track when it finds closed weak refs.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
- Take dev's moq_mux::consumer types (OrderedConsumer, LegacyConsumer)
- Update them to use TrackSubscriber instead of TrackConsumer
- Take dev's deprecated stubs in hang::container::consumer
- Fix moq-cli subscribe to use consume_track + subscribe
- Fix moq-ffi to use LegacyConsumer with TrackSubscriber

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Create @moq/msf package with Zod schemas for MSF catalog format, and
update the watch broadcast to subscribe to both catalog formats in
parallel. Hang gets a 100ms headstart; first catalog to arrive wins.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
@coderabbitai
Copy link
Contributor

coderabbitai bot commented Mar 21, 2026

Walkthrough

This pull request introduces a new MSF (Media Streaming Format) catalog package to the JavaScript workspace and refactors the Rust subscription architecture across multiple crates. The @moq/msf package provides Zod-based schema definitions and encode/decode utilities for MSF catalogs, with integration into the js/watch broadcast module to support multiple catalog formats (Hang and MSF). The Rust codebase undergoes a systematic shift from a single-step subscribe_track() API to a two-step consume_track() followed by async subscribe() pattern, introducing a new Subscription struct for per-subscriber preferences and TrackSubscriber type for group iteration. The Track struct is simplified to exclude the priority field, and new error variants (UnknownBroadcast, UnknownTrack) are added to replace the deprecated NotFound type across all affected modules.

🚥 Pre-merge checks | ✅ 3
✅ Passed checks (3 passed)
Check name Status Explanation
Title check ✅ Passed The title accurately and concisely summarizes the main change: adding dual MSF + Hang catalog support to the @moq/watch package.
Description check ✅ Passed The description is well-related to the changeset, providing clear context about creating the @moq/msf package, implementing MSF→Hang conversion, and updating Broadcast for parallel catalog subscription.
Docstring Coverage ✅ Passed Docstring coverage is 100.00% which is sufficient. The required threshold is 80.00%.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
📝 Generate docstrings
  • Create stacked PR
  • Commit on current branch
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Commit unit tests in branch msf-playback
✨ Simplify code
  • Create PR with simplified code
  • Commit simplified code in branch msf-playback

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
rs/moq-mux/src/convert/fmp4.rs (1)

37-50: ⚠️ Potential issue | 🟠 Major

Avoid keeping CatalogGuard alive across the awaited track setup.

CatalogGuard publishes on drop, but here it stays live across every subscribe_track(...).await and gets mutated before later create_track(...) calls can fail. That means one slow/missing source track stalls the first catalog publish, and a mid-loop error can still drop/publish a partial catalog that points at tracks you never finished wiring up.

Also applies to: 80-106, 119-123

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@rs/moq-mux/src/convert/fmp4.rs` around lines 37 - 50, The CatalogGuard
(output_catalog.lock() -> guard) is held across await points and possible
failures (subscribe_track.await and broadcast.create_track), causing premature
or partial catalog publishes; instead, call subscribe_track(...).await and
attempt broadcast.create_track(...) first without holding the guard, and only
after create_track succeeds acquire output_catalog.lock(), insert the rendition
into catalog.video.renditions (and any other catalog mutations), then drop the
guard immediately so the lock is not held across awaits or long ops. Update the
loop around subscribe_track, create_track, and guard usage in the code that
touches output_catalog, subscribe_track, and broadcast.create_track so the guard
is only alive for the minimal mutation window.
🧹 Nitpick comments (6)
js/watch/src/msf.ts (2)

43-44: Consider extracting default audio constants.

The default sampleRate of 48000 and numberOfChannels of 2 are reasonable but could be named constants for clarity.

♻️ Optional: Named constants
+const DEFAULT_SAMPLE_RATE = 48000;
+const DEFAULT_CHANNEL_COUNT = 2;
+
 function toAudioConfig(track: Msf.Track): Catalog.AudioConfig | undefined {
 	if (!track.codec) return undefined;

 	return {
 		codec: track.codec,
 		container: toContainer(track),
 		description: track.packaging !== "cmaf" && track.initData ? base64ToHex(track.initData) : undefined,
-		sampleRate: u53(track.samplerate ?? 48000),
-		numberOfChannels: u53(track.channelConfig ? Number.parseInt(track.channelConfig, 10) : 2),
+		sampleRate: u53(track.samplerate ?? DEFAULT_SAMPLE_RATE),
+		numberOfChannels: u53(track.channelConfig ? Number.parseInt(track.channelConfig, 10) : DEFAULT_CHANNEL_COUNT),
 		bitrate: track.bitrate != null ? u53(track.bitrate) : undefined,
 	};
 }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@js/watch/src/msf.ts` around lines 43 - 44, Extract the literal defaults into
named constants (e.g., DEFAULT_SAMPLE_RATE = 48000 and
DEFAULT_NUMBER_OF_CHANNELS = 2) and replace the inline literals in the
sampleRate and numberOfChannels expressions so the code becomes
u53(track.samplerate ?? DEFAULT_SAMPLE_RATE) and
Number.parseInt(track.channelConfig, 10) : DEFAULT_NUMBER_OF_CHANNELS; declare
the constants near the top of js/watch/src/msf.ts so they are reusable and
self-documenting.

6-13: Consider handling invalid base64 gracefully.

atob() throws a DOMException if the input is not valid base64. If an MSF catalog contains malformed initData, this will cause an uncaught exception without helpful context.

♻️ Suggested defensive wrapper
 function base64ToHex(b64: string): string {
-	const raw = atob(b64);
+	let raw: string;
+	try {
+		raw = atob(b64);
+	} catch {
+		console.warn("invalid base64 initData, skipping conversion");
+		return "";
+	}
 	let hex = "";
 	for (let i = 0; i < raw.length; i++) {
 		hex += raw.charCodeAt(i).toString(16).padStart(2, "0");
 	}
 	return hex;
 }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@js/watch/src/msf.ts` around lines 6 - 13, The base64ToHex function currently
calls atob(b64) which will throw a DOMException for invalid base64; wrap the
atob call in a try/catch inside base64ToHex, handle the error by either
returning a safe sentinel (e.g., null or empty string) or rethrowing a clearer
Error with context (include the original exception and the offending b64 value),
and ensure callers of base64ToHex handle the sentinel/exception. Update
base64ToHex to validate or catch atob errors and document the chosen failure
behavior so callers (where initData is parsed) can react appropriately.
js/watch/src/broadcast.ts (2)

146-148: Redundant nullish coalescing.

update is already Catalog.Root | undefined, so update ?? undefined is equivalent to just update.

-								return update ?? undefined;
+								return update;
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@js/watch/src/broadcast.ts` around lines 146 - 148, The return statement
unnecessarily uses nullish coalescing on a value already typed as Catalog.Root |
undefined; replace the redundant "return update ?? undefined;" with a plain
"return update;" (locate the occurrence where the variable update is returned
after the array close in broadcast.ts) so the behavior and types remain
unchanged while removing the superfluous operator.

117-117: Consider extracting the headstart delay as a named constant.

The 100ms delay is a tuning parameter that may need adjustment. A named constant would clarify intent and make it easier to find/change.

♻️ Optional: Named constant
+const HANG_HEADSTART_MS = 100;
+
 // In `#runCatalog`:
 				const msfFetch = msfTrack
-					? new Promise((r) => setTimeout(r, 100))
+					? new Promise((r) => setTimeout(r, HANG_HEADSTART_MS))
 							.then(() => Msf.fetch(msfTrack))
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@js/watch/src/broadcast.ts` at line 117, Extract the magic number 100ms used
in the headstart delay into a named top-level constant (e.g., HEADSTART_DELAY_MS
or BROADCAST_HEADSTART_MS) in js/watch/src/broadcast.ts and replace the inline
literal in the ternary new Promise((r) => setTimeout(r, 100)) with that
constant; declare it near the top of the module (and export it if other modules
may tune it) so the tuning parameter is clearly named and easy to change.
rs/moq-mux/src/catalog.rs (1)

57-60: Make the consumed catalog format explicit.

CatalogProducer now publishes both catalog.json and catalog, but consume() still hardcodes the Hang path. Renaming this to something like consume_hang() or taking a format parameter would make the public API less surprising.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@rs/moq-mux/src/catalog.rs` around lines 57 - 60, The public method consume()
currently always subscribes to the Hang path but CatalogProducer now publishes
both catalog.json and catalog, so update the API to make format explicit: either
rename consume() to consume_hang() and leave behavior unchanged, or add a
parameter (e.g., format: CatalogFormat enum with variants Hang and Json) to
consume() and branch on it to subscribe to the appropriate track (e.g., use
self.hang_track.consume() for Hang and the JSON track for Json). Update
hang::CatalogConsumer construction or add a new consumer type if needed, and
adjust all call sites and tests to use the new name or pass the format argument;
also update public docs/comments for the changed function.
rs/moq-lite/src/ietf/subscriber.rs (1)

521-522: Minor: Redundant variable rename.

The parameter is renamed to broadcast_path and immediately aliased back to broadcast. Consider keeping the original parameter name broadcast if the rest of the function uses that name.

Suggested simplification
-	async fn run_subscribe(&mut self, broadcast_path: Path<'_>, mut track: TrackProducer) {
-		let broadcast = broadcast_path;
+	async fn run_subscribe(&mut self, broadcast: Path<'_>, mut track: TrackProducer) {
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@rs/moq-lite/src/ietf/subscriber.rs` around lines 521 - 522, The function
run_subscribe has a redundant alias: parameter broadcast_path is immediately
assigned to broadcast; update the function signature async fn run_subscribe(&mut
self, broadcast: Path<'_>, mut track: TrackProducer) and remove the temporary
let broadcast = broadcast_path; so all uses refer directly to the broadcast
parameter (or alternatively replace subsequent uses with broadcast_path if you
prefer that name) — adjust the signature and remove the redundant binding in
run_subscribe to eliminate the unnecessary rename.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@js/watch/src/broadcast.ts`:
- Around line 111-126: The current Promise.race can return undefined if
hangFetch resolves to undefined before msfFetch wakes up; change the logic so
only successful fetches compete: wrap hangFetch and msfFetch so they reject when
they would yield undefined (or use Promise.any with those wrapped promises) and
keep effect.cancel in the cancel slot; update the array built from hangFetch,
msfFetch and effect.cancel accordingly and then await the winner (e.g., await
Promise.any(wrappedCandidates)) so a quick failed hangFetch no longer causes an
early return and MSF can be selected when hang returns nothing.

In `@rs/moq-lite/src/lite/subscriber.rs`:
- Around line 192-201: The Subscribe message is hardcoded with priority: 0 in
run_subscribe, preventing per-track priority; update the subscriber API to
accept and propagate a priority and set msg.priority accordingly: either add a
priority field to Track or TrackProducer (e.g., Track::priority or
TrackProducer::priority) and read it when building lite::Subscribe, or add a
priority parameter to run_subscribe so callers can pass the desired priority;
ensure the created Subscribe (in run_subscribe) uses that priority value instead
of 0 so the publisher can honor subscribe.priority when ordering deliveries.

---

Outside diff comments:
In `@rs/moq-mux/src/convert/fmp4.rs`:
- Around line 37-50: The CatalogGuard (output_catalog.lock() -> guard) is held
across await points and possible failures (subscribe_track.await and
broadcast.create_track), causing premature or partial catalog publishes;
instead, call subscribe_track(...).await and attempt broadcast.create_track(...)
first without holding the guard, and only after create_track succeeds acquire
output_catalog.lock(), insert the rendition into catalog.video.renditions (and
any other catalog mutations), then drop the guard immediately so the lock is not
held across awaits or long ops. Update the loop around subscribe_track,
create_track, and guard usage in the code that touches output_catalog,
subscribe_track, and broadcast.create_track so the guard is only alive for the
minimal mutation window.

---

Nitpick comments:
In `@js/watch/src/broadcast.ts`:
- Around line 146-148: The return statement unnecessarily uses nullish
coalescing on a value already typed as Catalog.Root | undefined; replace the
redundant "return update ?? undefined;" with a plain "return update;" (locate
the occurrence where the variable update is returned after the array close in
broadcast.ts) so the behavior and types remain unchanged while removing the
superfluous operator.
- Line 117: Extract the magic number 100ms used in the headstart delay into a
named top-level constant (e.g., HEADSTART_DELAY_MS or BROADCAST_HEADSTART_MS) in
js/watch/src/broadcast.ts and replace the inline literal in the ternary new
Promise((r) => setTimeout(r, 100)) with that constant; declare it near the top
of the module (and export it if other modules may tune it) so the tuning
parameter is clearly named and easy to change.

In `@js/watch/src/msf.ts`:
- Around line 43-44: Extract the literal defaults into named constants (e.g.,
DEFAULT_SAMPLE_RATE = 48000 and DEFAULT_NUMBER_OF_CHANNELS = 2) and replace the
inline literals in the sampleRate and numberOfChannels expressions so the code
becomes u53(track.samplerate ?? DEFAULT_SAMPLE_RATE) and
Number.parseInt(track.channelConfig, 10) : DEFAULT_NUMBER_OF_CHANNELS; declare
the constants near the top of js/watch/src/msf.ts so they are reusable and
self-documenting.
- Around line 6-13: The base64ToHex function currently calls atob(b64) which
will throw a DOMException for invalid base64; wrap the atob call in a try/catch
inside base64ToHex, handle the error by either returning a safe sentinel (e.g.,
null or empty string) or rethrowing a clearer Error with context (include the
original exception and the offending b64 value), and ensure callers of
base64ToHex handle the sentinel/exception. Update base64ToHex to validate or
catch atob errors and document the chosen failure behavior so callers (where
initData is parsed) can react appropriately.

In `@rs/moq-lite/src/ietf/subscriber.rs`:
- Around line 521-522: The function run_subscribe has a redundant alias:
parameter broadcast_path is immediately assigned to broadcast; update the
function signature async fn run_subscribe(&mut self, broadcast: Path<'_>, mut
track: TrackProducer) and remove the temporary let broadcast = broadcast_path;
so all uses refer directly to the broadcast parameter (or alternatively replace
subsequent uses with broadcast_path if you prefer that name) — adjust the
signature and remove the redundant binding in run_subscribe to eliminate the
unnecessary rename.

In `@rs/moq-mux/src/catalog.rs`:
- Around line 57-60: The public method consume() currently always subscribes to
the Hang path but CatalogProducer now publishes both catalog.json and catalog,
so update the API to make format explicit: either rename consume() to
consume_hang() and leave behavior unchanged, or add a parameter (e.g., format:
CatalogFormat enum with variants Hang and Json) to consume() and branch on it to
subscribe to the appropriate track (e.g., use self.hang_track.consume() for Hang
and the JSON track for Json). Update hang::CatalogConsumer construction or add a
new consumer type if needed, and adjust all call sites and tests to use the new
name or pass the format argument; also update public docs/comments for the
changed function.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 10bb7c3e-9894-4069-ad11-b10ffd7b82a2

📥 Commits

Reviewing files that changed from the base of the PR and between 2626a25 and 73fd42b.

⛔ Files ignored due to path filters (1)
  • bun.lock is excluded by !**/*.lock
📒 Files selected for processing (38)
  • js/msf/package.json
  • js/msf/src/catalog.ts
  • js/msf/src/index.ts
  • js/msf/tsconfig.json
  • js/watch/package.json
  • js/watch/src/broadcast.ts
  • js/watch/src/msf.ts
  • package.json
  • rs/hang/examples/subscribe.rs
  • rs/hang/examples/video.rs
  • rs/hang/src/catalog/audio/mod.rs
  • rs/hang/src/catalog/consumer.rs
  • rs/hang/src/catalog/root.rs
  • rs/hang/src/catalog/video/mod.rs
  • rs/libmoq/src/consume.rs
  • rs/moq-cli/src/subscribe.rs
  • rs/moq-clock/src/clock.rs
  • rs/moq-clock/src/main.rs
  • rs/moq-ffi/src/consumer.rs
  • rs/moq-ffi/src/test.rs
  • rs/moq-lite/src/error.rs
  • rs/moq-lite/src/ietf/adapter.rs
  • rs/moq-lite/src/ietf/publisher.rs
  • rs/moq-lite/src/ietf/subscriber.rs
  • rs/moq-lite/src/lite/publisher.rs
  • rs/moq-lite/src/lite/subscriber.rs
  • rs/moq-lite/src/model/broadcast.rs
  • rs/moq-lite/src/model/group.rs
  • rs/moq-lite/src/model/track.rs
  • rs/moq-mux/src/catalog.rs
  • rs/moq-mux/src/consumer/ordered.rs
  • rs/moq-mux/src/convert/fmp4.rs
  • rs/moq-mux/src/convert/hang.rs
  • rs/moq-mux/src/convert/test.rs
  • rs/moq-native/examples/chat.rs
  • rs/moq-native/tests/backend.rs
  • rs/moq-native/tests/broadcast.rs
  • rs/moq-relay/src/web.rs

Comment on lines +111 to +126
// Race the first catalog fetch, giving hang a 100ms headstart
const hangFetch = hangTrack
? Catalog.fetch(hangTrack).then((r) => (r ? { kind: "hang" as const, root: r } : undefined))
: undefined;

const msfFetch = msfTrack
? new Promise((r) => setTimeout(r, 100))
.then(() => Msf.fetch(msfTrack))
.then((c) => (c ? { kind: "msf" as const, root: toHang(c) } : undefined))
: undefined;

const candidates = [effect.cancel, hangFetch, msfFetch].filter(
(c): c is NonNullable<typeof c> => c != null,
);
const first = await Promise.race(candidates);
if (!first) return;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Race logic may exit early on Hang failure instead of falling back to MSF.

If hangFetch resolves to undefined (e.g., the track is closed or empty) before the MSF 100ms delay completes, Promise.race will return undefined and the function exits at line 126 without attempting MSF. This means a quick Hang failure prevents MSF fallback.

If the intent is "first successful catalog wins," consider filtering out undefined results so only successful fetches compete:

🔧 Suggested approach
-				const first = await Promise.race(candidates);
-				if (!first) return;
+				// Keep racing until we get a result or all candidates fail
+				let first: { kind: "hang" | "msf"; root: Catalog.Root } | undefined;
+				const pending = candidates.filter((c): c is Promise<{ kind: "hang" | "msf"; root: Catalog.Root } | undefined> => c !== effect.cancel);
+				
+				while (pending.length > 0 && !first) {
+					const result = await Promise.race([effect.cancel, ...pending]);
+					if (!result) {
+						// effect.cancel or undefined result
+						if (result === undefined) {
+							// Remove the resolved promise
+							// This is tricky - might need Promise.allSettled approach instead
+						}
+						break;
+					}
+					first = result;
+				}
+				if (!first) return;

Alternatively, a simpler approach using Promise.any (if supported) or explicit fallback logic could work better here.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@js/watch/src/broadcast.ts` around lines 111 - 126, The current Promise.race
can return undefined if hangFetch resolves to undefined before msfFetch wakes
up; change the logic so only successful fetches compete: wrap hangFetch and
msfFetch so they reject when they would yield undefined (or use Promise.any with
those wrapped promises) and keep effect.cancel in the cancel slot; update the
array built from hangFetch, msfFetch and effect.cancel accordingly and then
await the winner (e.g., await Promise.any(wrappedCandidates)) so a quick failed
hangFetch no longer causes an early return and MSF can be selected when hang
returns nothing.

Comment on lines 192 to 201
let msg = lite::Subscribe {
id,
broadcast: broadcast.to_owned(),
broadcast: broadcast_path.to_owned(),
track: (&track.info.name).into(),
priority: track.info.priority,
ordered: true,
max_latency: std::time::Duration::ZERO,
priority: 0,
ordered: false,
max_latency: Duration::ZERO,
start_group: None,
end_group: None,
};
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Check if priority is set elsewhere in the subscription flow
rg -n "priority" rs/moq-lite/src/lite/ -A2 -B2

Repository: moq-dev/moq

Length of output: 24827


🏁 Script executed:

#!/bin/bash
# Get the full function context around the hardcoded priority
sed -n '150,220p' rs/moq-lite/src/lite/subscriber.rs | cat -n

Repository: moq-dev/moq

Length of output: 2662


🏁 Script executed:

#!/bin/bash
# Search for TrackProducer definition to understand available fields
rg -n "TrackProducer" rs/moq-lite/src/lite/ -A5 | head -50

# Also search for TrackInfo or similar to see what fields track.info has
rg -n "track\.info" rs/moq-lite/src/lite/subscriber.rs -B2 -A2

Repository: moq-dev/moq

Length of output: 1584


🏁 Script executed:

#!/bin/bash
# Find TrackProducer definition - it's imported from model
rg -n "struct TrackProducer" -A10

# Also check the model module to see what fields are in TrackInfo
rg -n "struct TrackInfo\|pub.*info" rs/moq-lite/src/model/ -A5 | head -80

Repository: moq-dev/moq

Length of output: 711


🏁 Script executed:

#!/bin/bash
# Find the Track struct definition
rg -n "pub struct Track\|pub.*priority" rs/moq-lite/src/model/track.rs -B2 -A10 | head -100

Repository: moq-dev/moq

Length of output: 37


🏁 Script executed:

#!/bin/bash
# Search more broadly for Track struct
rg -n "struct Track" rs/moq-lite/src/model/ -A15

Repository: moq-dev/moq

Length of output: 4961


🏁 Script executed:

#!/bin/bash
# Check if there are any tests for subscriber.rs
rg -n "test.*subscribe\|#\[test\]" rs/moq-lite/src/lite/subscriber.rs -B2 -A5

# Check if there's any documentation about priority support in subscriber
rg -n "priority" rs/moq-lite/src/lite/subscriber.rs

Repository: moq-dev/moq

Length of output: 74


🏁 Script executed:

#!/bin/bash
# Check if there are any callers of run_subscribe or if priority is supposed to come from elsewhere
rg -n "run_subscribe\|Subscriber::new" rs/moq-lite/src/ -B3 -A3

# Check the lite protocol specification or comments about priority
rg -n "priority\|Priority" rs/moq-lite/src/lite/mod.rs -B2 -A2

Repository: moq-dev/moq

Length of output: 37


🏁 Script executed:

#!/bin/bash
# Let me search for how Subscriber is instantiated and used
rg -n "Subscriber\|new.*Subscriber" rs/moq-lite/src/ -B2 -A2 | head -80

# Also check if there are integration tests or examples
find rs/moq-lite -name "*test*" -o -name "*example*" | head -20

Repository: moq-dev/moq

Length of output: 37


🏁 Script executed:

#!/bin/bash
# Check the module level documentation and purpose
head -50 rs/moq-lite/src/lite/subscriber.rs

# Also check what the lite protocol is and its version handling
rg -n "Version::" rs/moq-lite/src/lite/subscribe.rs | head -20

Repository: moq-dev/moq

Length of output: 1966


🏁 Script executed:

#!/bin/bash
# Let me understand the version enum and when priority was introduced
rg -n "enum Version\|Lite0[123]" rs/moq-lite/src/lite/ -B2 -A8 | head -60

# Check if there are any TODOs or comments about priority in subscriber
rg -n "TODO.*priority\|FIXME.*priority" rs/moq-lite/src/

Repository: moq-dev/moq

Length of output: 37


🏁 Script executed:

#!/bin/bash
# Find the Version enum definition
rg -n "enum Version" rs/moq-lite/src/ -A10

# Check the protocol specification or MOQT spec references
find rs/moq-lite -name "*.md" -o -name "*.txt" | xargs grep -l "priority" 2>/dev/null | head -5

Repository: moq-dev/moq

Length of output: 1967


🏁 Script executed:

#!/bin/bash
# Check the CHANGELOG to understand version differences and priority support
cat rs/moq-lite/CHANGELOG.md | head -150

# Also check if the coding guidelines mention anything about version handling
grep -n "priority\|Version\|draft" coding_guidelines 2>/dev/null || echo "No coding_guidelines file found"

Repository: moq-dev/moq

Length of output: 8855


🏁 Script executed:

#!/bin/bash
# Check the subscribe.rs version handling for priority more carefully
sed -n '1,140p' rs/moq-lite/src/lite/subscribe.rs | cat -n

Repository: moq-dev/moq

Length of output: 4645


🏁 Script executed:

#!/bin/bash
# Check when priority was added and if there's a pattern in how it's used
rg -n "priority: 0" rs/moq-lite/src/lite/ -B3 -A1

# Check if there are any comments about priority behavior differences between versions
rg -n "Lite0[123].*priority\|priority.*Lite0[123]" rs/moq-lite/src/lite/

Repository: moq-dev/moq

Length of output: 989


🏁 Script executed:

#!/bin/bash
# Check git history to understand why priority is hardcoded to 0 in subscriber
git log --oneline -n 50 -- rs/moq-lite/src/lite/subscriber.rs | head -20

# Check if there's any design doc or specification about subscriber priority
find . -name "*.md" -type f | xargs grep -l "subscriber.*priority\|priority.*subscriber" 2>/dev/null | head -5

Repository: moq-dev/moq

Length of output: 150


🏁 Script executed:

#!/bin/bash
# Check the MOQ Lite specification for priority handling
cat ./doc/spec/draft-lcurley-moq-lite.md | head -200

# Also search for priority-related sections
grep -n -i "priority\|subscriber" ./doc/spec/draft-lcurley-moq-lite.md | head -30

Repository: moq-dev/moq

Length of output: 13392


Subscriber always sends priority: 0, preventing per-track priority differentiation.

The Subscribe message hardcodes priority: 0 regardless of any track-specific priority intent. The moq-lite specification states "The subscriber and publisher both indicate their delivery preference" where priority determines transmission order, but the current subscriber implementation has no mechanism to receive or pass through a priority value. The publisher correctly uses subscribe.priority to order group delivery (line 316 in publisher.rs), so all subscriptions will receive equal priority.

The Track struct contains only a name field with no priority metadata, and run_subscribe() receives no priority parameter. Consider adding priority as a parameter to run_subscribe() or as a field in the Track/TrackProducer struct to allow subscribers to express their priority preferences per the protocol specification.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@rs/moq-lite/src/lite/subscriber.rs` around lines 192 - 201, The Subscribe
message is hardcoded with priority: 0 in run_subscribe, preventing per-track
priority; update the subscriber API to accept and propagate a priority and set
msg.priority accordingly: either add a priority field to Track or TrackProducer
(e.g., Track::priority or TrackProducer::priority) and read it when building
lite::Subscribe, or add a priority parameter to run_subscribe so callers can
pass the desired priority; ensure the created Subscribe (in run_subscribe) uses
that priority value instead of 0 so the publisher can honor subscribe.priority
when ordering deliveries.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant