Refactor Origin API: cleaner naming, remove Option from announcements #1142

Open
kixelated wants to merge 3 commits into dev from origin-revamp

Conversation

@kixelated
Collaborator

Summary

  • Rename publish_only/consume_only → with_filter() on both OriginProducer and OriginConsumer
  • Remove shortcut methods from OriginProducer (consume_only, consume_broadcast) and the OriginConsumer::consume() clone alias
  • OriginAnnounce no longer wraps BroadcastConsumer in Option — unannouncements are detected via BroadcastConsumer::closed() instead of None messages
  • announced() returns Result<OriginAnnounce, Error> instead of Option
  • Add async consume_broadcast() that waits for a broadcast to appear; rename the sync version to try_consume_broadcast()
  • Rewrite publisher announce loops with FuturesUnordered to track broadcast closures and send Ended/NamespaceDone
  • Rewrite cluster run_remotes with JoinSet for proper task lifecycle
  • Deprecate moq_announced::active FFI field (always true now)
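The Option-to-Result change for announced() can be sketched with a minimal std-only stand-in. The type and method names mirror the PR description, but the bodies below are illustrative, not the real moq-lite implementation:

```rust
use std::collections::VecDeque;

// Illustrative stand-ins for the moq-lite types named in this PR;
// only the Option -> Result shape of announced() is the point here.
#[derive(Debug, PartialEq)]
enum Error {
    Closed,
}

struct BroadcastConsumer;

struct OriginConsumer {
    pending: VecDeque<(String, BroadcastConsumer)>,
}

impl OriginConsumer {
    // New shape: Err(Error::Closed) once the origin shuts down,
    // replacing the old `None` sentinel.
    fn announced(&mut self) -> Result<(String, BroadcastConsumer), Error> {
        self.pending.pop_front().ok_or(Error::Closed)
    }
}

fn main() {
    let mut consumer = OriginConsumer {
        pending: VecDeque::from([("room/alice".to_string(), BroadcastConsumer)]),
    };

    // Callers now loop until Err instead of matching on Option/None messages.
    while let Ok((path, _broadcast)) = consumer.announced() {
        println!("announced: {path}");
    }
    println!("origin closed");
}
```

The practical upshot is that termination is an error value rather than a valid-but-empty message, so `?` and `while let Ok(..)` compose naturally in consumer loops.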

Test plan

  • just check passes (clippy, formatting, all lints)
  • All 31 origin unit tests pass (2 new: async/sync consume_broadcast)
  • All 14 libmoq FFI tests pass (updated announced_deactivation test)
  • All 227 moq-lite tests pass
  • Integration tests with relay cluster

🤖 Generated with Claude Code

kixelated and others added 2 commits March 19, 2026 23:23
- Rename publish_only/consume_only to with_filter on both OriginProducer and OriginConsumer
- Remove shortcut methods (consume_only, consume_broadcast) from OriginProducer
- Remove OriginConsumer::consume() clone alias
- OriginAnnounce no longer wraps BroadcastConsumer in Option; unannouncements
  are detected via BroadcastConsumer::closed() instead of None messages
- announced() returns Result<OriginAnnounce, Error> instead of Option
- Add async consume_broadcast() that waits for a broadcast to appear;
  rename the sync version to try_consume_broadcast()
- Rewrite publisher announce loops with FuturesUnordered to track broadcast
  closures and send Ended/NamespaceDone when broadcasts close
- Rewrite cluster run_remotes with JoinSet for proper task lifecycle
- Deprecate moq_announced::active FFI field (always true now)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Resolved conflicts in:
- rs/moq-lite/src/model/origin.rs: Merged hops-based routing from dev with
  our no-Option announcements API. Reannounce uses announce() instead of
  removed reannounce(). Unannounce removed (broadcasts close silently).
  Updated dev's new hops tests to match new API semantics.
- rs/moq-lite/src/lite/publisher.rs: Kept FuturesUnordered pattern from HEAD,
  incorporated broadcast.info.hops from dev for Announce::Active messages.
- rs/moq-relay/src/cluster.rs: Took dev's simplified single-origin architecture,
  applied our API renames (with_filter, try_consume_broadcast).
- rs/moq-relay/src/connection.rs, web.rs, websocket.rs: Applied API renames
  (publish_only → with_filter, consume_only → consume().with_filter).
- rs/moq-cli/src/main.rs: Updated announced() from Option to Result API.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
@coderabbitai
Contributor

coderabbitai bot commented Mar 20, 2026

Walkthrough

This pull request refactors the MOQ announcement and broadcast consumption APIs to use explicit error-based semantics instead of optional/None-based signaling. OriginAnnounce changes from (PathOwned, Option<BroadcastConsumer>) to (PathOwned, BroadcastConsumer), with unannouncements signaled via broadcast.closed() futures rather than None tuples. Broadcast consumption transitions from an immediate consume_broadcast() returning Option to try_consume_broadcast() plus an async consume_broadcast() returning Result. Method names shift (consume_only/publish_only → with_filter), error handling is tightened throughout, and publisher logic gains concurrent broadcast-closure tracking. The relay cluster is simplified, removing token-scoped routing and roots/prefixes in favor of a single origin handle. CLI commands are restructured from Command::{Serve, Publish} to Command::{Server, Client} with nested Action variants.
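The closed()-based unannouncement signal described above can be approximated with a plain channel. This is a std-only, synchronous sketch; the real BroadcastConsumer::closed() is an async future, and these names are stand-ins:

```rust
use std::sync::mpsc;

// Stand-in: the Sender side of a channel plays the role of the broadcast
// producer; dropping it is the "unannounce", observed on the consumer side
// the way BroadcastConsumer::closed() resolving would be.
struct BroadcastConsumer {
    closed_rx: mpsc::Receiver<()>,
}

impl BroadcastConsumer {
    // Blocks until every Sender is dropped, i.e. the broadcast ended.
    fn closed(&self) {
        while self.closed_rx.recv().is_ok() {}
    }
}

fn main() {
    let (tx, rx) = mpsc::channel::<()>();
    let consumer = BroadcastConsumer { closed_rx: rx };

    // The publisher unannounces simply by dropping its handle;
    // no explicit None message is sent over the announcement stream.
    drop(tx);

    consumer.closed();
    println!("broadcast closed; this is where Announce::Ended would be sent");
}
```

Tying "ended" to handle lifetime rather than an in-band None message is what lets the publisher loops track closures concurrently (e.g. via FuturesUnordered) instead of parsing sentinel values.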

🚥 Pre-merge checks | ✅ 3
✅ Passed checks (3 passed)
  • Title check (✅ Passed): The title accurately and concisely summarizes the main changes: API method renaming (publish_only/consume_only to with_filter) and removal of Option wrapping from announcements. The two core themes are directly reflected in the changeset.
  • Description check (✅ Passed): The description provides clear, structured details about the API changes, removal of Option from announcements, async broadcast consumption additions, publisher loop rewrites, cluster refactoring, and deprecations, all of which directly correspond to the changeset modifications across multiple files.
  • Docstring Coverage (✅ Passed): Docstring coverage is 96.83%, above the required threshold of 80.00%.


Contributor

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 2

🧹 Nitpick comments (2)
rs/moq-lite/src/model/origin.rs (1)

564-626: Pause Tokio time in these sleep-based tests.

These updated tests still wait on real tokio::time::sleep, so they stay slower and can wobble under load. Call tokio::time::pause() at the top so the cleanup waits run against virtual time instead.

As per coding guidelines "For async tests that sleep, call tokio::time::pause() at the start to simulate time instantly."

Also applies to: 628-679, 681-721, 1315-1333

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@rs/moq-lite/src/model/origin.rs` around lines 564 - 626, The test
test_announce (and the other sleep-based async tests noted) use real
tokio::time::sleep; add a call to tokio::time::pause() at the start of each such
test (e.g., inside async fn test_announce) so sleeps run on virtual time, and
when the test needs to advance timers use tokio::time::advance(...) or otherwise
ensure the virtual clock is progressed before awaiting the sleep wakeups; update
the tests referenced (the blocks around lines 628-679, 681-721, 1315-1333)
similarly to pause the tokio time at the beginning of each test function.
rs/moq-lite/src/ietf/publisher.rs (1)

447-458: Default this Version branch forward.

Hard-coding Draft17 here means the next draft will need another touch even if it should keep following the current "just finish the stream" behavior. Make the Draft17+ case the default branch instead.

Suggested fix
 		match version {
 			Version::Draft14 | Version::Draft15 | Version::Draft16 => {
 				let _ = stream
 					.writer
 					.encode_message(&ietf::PublishNamespaceDone {
 						track_namespace: suffix.to_owned(),
 						request_id,
 					})
 					.await;
 			}
-			Version::Draft17 => {}
+			_ => {}
 		}
As per coding guidelines "When matching on `Version` enums, default to the newest draft behavior so future versions default forward. Explicitly list older versions for old behavior."
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@rs/moq-lite/src/ietf/publisher.rs` around lines 447 - 458, The match
currently handles Version::Draft14|Draft15|Draft16 by sending a
PublishNamespaceDone via stream.writer.encode_message(...) and hard-codes
Version::Draft17 as the empty branch; change this so newer drafts default
forward: keep the explicit older-version arm (Version::Draft14 |
Version::Draft15 | Version::Draft16 => { let _ =
stream.writer.encode_message(&ietf::PublishNamespaceDone { track_namespace:
suffix.to_owned(), request_id, }).await; }) and replace the Version::Draft17 arm
with a default arm (_) => {} so any future Draft18+ will inherit the "do nothing
/ finish the stream" behavior.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@rs/moq-lite/src/model/origin.rs`:
- Around line 473-480: consume_broadcast currently subscribes to a subtree via
with_filter(&[path]) and may return broadcasts for descendants; fix by looping
after subscribing and ignoring announced entries whose path != the requested
path. Specifically, in consume_broadcast, after the existing
try_consume_broadcast(&path) check and after creating scoped via
with_filter(&[path]) and calling scoped.announced(), examine the announced tuple
returned by announced() (the first element is the announced path) and if it does
not equal the requested path, continue awaiting announced() (or re-check
try_consume_broadcast) until you receive an announced entry whose path == path,
then return that BroadcastConsumer; keep using the same symbols:
consume_broadcast, try_consume_broadcast, with_filter, and announced.

In `@rs/moq-relay/src/cluster.rs`:
- Around line 200-248: The join_next() result can belong to a superseded task,
so change the scheme to tag each spawned task with a unique task id and only
remove the active handle when the completed task's id matches the
currently-stored id for that node; specifically, when spawning in the block that
calls this.run_remote(...) create a unique task_id (e.g. a UUID or incrementing
counter), have the spawned future return (node2.to_string(), task_id), and
change active from HashMap<String, AbortHandle> to HashMap<String, (TaskId,
tokio::task::AbortHandle)> (or equivalent tuple), then when handling
Some(result) = tasks.join_next() compare the returned task_id with the id in
active.get(&node) and only remove/log if they match — this ensures late
completions from superseded tasks do not remove the replacement handle.
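The generation-tagging scheme this comment suggests can be sketched without tokio. Here a u64 counter stands in for the task id and the AbortHandle is elided; names like `Active` are illustrative, not from the codebase:

```rust
use std::collections::HashMap;

// Sketch of the suggested fix: each spawned task gets a generation id,
// and a completion only removes the stored entry when the ids match.
// Real code would store (u64, tokio::task::AbortHandle) per node.
struct Active {
    next_id: u64,
    tasks: HashMap<String, u64>,
}

impl Active {
    // Record a newly spawned task; a replacement overwrites the old entry.
    fn spawn(&mut self, node: &str) -> u64 {
        self.next_id += 1;
        self.tasks.insert(node.to_string(), self.next_id);
        self.next_id
    }

    // Called when a task completes; returns true only if it was still current.
    fn complete(&mut self, node: &str, id: u64) -> bool {
        if self.tasks.get(node) == Some(&id) {
            self.tasks.remove(node);
            true
        } else {
            // A superseded task finished late; keep the replacement's entry.
            false
        }
    }
}

fn main() {
    let mut active = Active { next_id: 0, tasks: HashMap::new() };
    let old = active.spawn("relay-1");
    let new = active.spawn("relay-1"); // supersedes `old`
    assert!(!active.complete("relay-1", old)); // late completion is ignored
    assert!(active.complete("relay-1", new));
    println!("ok");
}
```

Without the id comparison, a stale join_next() result for "relay-1" would evict the handle of the task that replaced it, which is exactly the race the review flags.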

---

Nitpick comments:
In `@rs/moq-lite/src/ietf/publisher.rs`:
- Around line 447-458: The match currently handles
Version::Draft14|Draft15|Draft16 by sending a PublishNamespaceDone via
stream.writer.encode_message(...) and hard-codes Version::Draft17 as the empty
branch; change this so newer drafts default forward: keep the explicit
older-version arm (Version::Draft14 | Version::Draft15 | Version::Draft16 => {
let _ = stream.writer.encode_message(&ietf::PublishNamespaceDone {
track_namespace: suffix.to_owned(), request_id, }).await; }) and replace the
Version::Draft17 arm with a default arm (_) => {} so any future Draft18+ will
inherit the "do nothing / finish the stream" behavior.

In `@rs/moq-lite/src/model/origin.rs`:
- Around line 564-626: The test test_announce (and the other sleep-based async
tests noted) use real tokio::time::sleep; add a call to tokio::time::pause() at
the start of each such test (e.g., inside async fn test_announce) so sleeps run
on virtual time, and when the test needs to advance timers use
tokio::time::advance(...) or otherwise ensure the virtual clock is progressed
before awaiting the sleep wakeups; update the tests referenced (the blocks
around lines 628-679, 681-721, 1315-1333) similarly to pause the tokio time
at the beginning of each test function.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: d162c59a-3fa8-463a-b8d8-6ab1058ff8af

📥 Commits

Reviewing files that changed from the base of the PR and between d543d18 and cd82878.

📒 Files selected for processing (13)
  • rs/hang/examples/subscribe.rs
  • rs/libmoq/src/api.rs
  • rs/libmoq/src/origin.rs
  • rs/libmoq/src/test.rs
  • rs/moq-clock/src/main.rs
  • rs/moq-ffi/src/origin.rs
  • rs/moq-lite/src/ietf/publisher.rs
  • rs/moq-lite/src/lite/publisher.rs
  • rs/moq-lite/src/model/origin.rs
  • rs/moq-native/tests/backend.rs
  • rs/moq-native/tests/broadcast.rs
  • rs/moq-relay/src/cluster.rs
  • rs/moq-relay/src/web.rs
💤 Files with no reviewable changes (2)
  • rs/moq-native/tests/backend.rs
  • rs/moq-native/tests/broadcast.rs

Contributor

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 9

♻️ Duplicate comments (1)
rs/moq-lite/src/model/origin.rs (1)

536-544: ⚠️ Potential issue | 🟠 Major

Wait for the exact path in consume_broadcast().

with_filter(&[path]) subscribes to the whole subtree, not just the leaf. If foo/bar is announced while a caller is waiting on consume_broadcast("foo"), this returns the descendant broadcast and breaks the exact-path contract.

Suggested fix
 	pub async fn consume_broadcast(&self, path: impl AsPath) -> Result<BroadcastConsumer, Error> {
-		let path = path.as_path();
-		if let Some(bc) = self.try_consume_broadcast(&path) {
+		let path = path.as_path().to_owned();
+		if let Some(bc) = self.try_consume_broadcast(path.as_path()) {
 			return Ok(bc);
 		}
-		let mut scoped = self.with_filter(&[path]).ok_or(Error::NotFound)?;
-		let (_, broadcast) = scoped.announced().await?;
-		Ok(broadcast)
+		let mut scoped = self.with_filter(&[path.as_path()]).ok_or(Error::NotFound)?;
+		loop {
+			let (announced_path, broadcast) = scoped.announced().await?;
+			if announced_path == path {
+				return Ok(broadcast);
+			}
+		}
 	}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@rs/moq-lite/src/model/origin.rs` around lines 536 - 544, consume_broadcast
currently subscribes to a subtree via with_filter(&[path]) so it can return
descendant broadcasts (e.g., "foo/bar" when waiting for "foo"); change the logic
to wait for an announced() value that matches the exact path: keep the initial
try_consume_broadcast(&path) check, then create the scoped consumer with
with_filter(&[path]) (or existing API), but after calling
scoped.announced().await loop until the returned BroadcastConsumer (or its
metadata) refers to the exact path requested (compare the broadcast's path to
the original path) and only return when equal; continue awaiting/skipping any
non-matching descendant announcements to preserve the exact-path contract.
🧹 Nitpick comments (14)
rs/moq-token-cli/src/main.rs (1)

91-110: The inner #[allow(deprecated)] on line 102 is redundant.

The #[allow(deprecated)] attribute on line 91 already applies to the entire match arm body, including the Claims construction. The inner annotation on line 102 is unnecessary.

♻️ Suggested simplification
 		#[allow(deprecated)]
 		Commands::Sign {
 			root,
 			publish,
 			cluster,
 			subscribe,
 			expires,
 			issued,
 		} => {
 			let key = moq_token::Key::from_file(cli.key)?;

-			#[allow(deprecated)]
 			let payload = moq_token::Claims {
 				root,
 				publish,
 				cluster,
 				subscribe,
 				expires,
 				issued,
 			};

 			let token = key.encode(&payload)?;
 			println!("{token}");
 		}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@rs/moq-token-cli/src/main.rs` around lines 91 - 110, The inner
#[allow(deprecated)] on the let payload = moq_token::Claims { ... } line is
redundant because the outer #[allow(deprecated)] applied to the Commands::Sign
match arm already covers the entire arm; remove the inner #[allow(deprecated)]
annotation and leave the outer attribute in place so Claims construction remains
unaffected.
js/watch/src/video/decoder.ts (1)

358-360: Consider adding error handling for base64 decoding.

The atob() function throws a DOMException if initData contains invalid base64 characters. Since this data comes from the catalog (potentially user-controlled or corrupted), a malformed value would crash the decoder track.

♻️ Proposed defensive handling
 		// Decode the base64 init segment to extract timescale
-		const initBytes = Uint8Array.from(atob(this.config.container.initData), (c) => c.charCodeAt(0));
+		let initBytes: Uint8Array;
+		try {
+			initBytes = Uint8Array.from(atob(this.config.container.initData), (c) => c.charCodeAt(0));
+		} catch {
+			console.error("Invalid base64 initData in CMAF container config");
+			return;
+		}
 		const { timescale } = Container.Cmaf.decodeInitSegment(initBytes);
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@js/watch/src/video/decoder.ts` around lines 358 - 360,
atob(this.config.container.initData) can throw on invalid base64 and will crash
the decoder; wrap the base64 decode and subsequent
Container.Cmaf.decodeInitSegment call in a try/catch around the block that
creates initBytes and reads timescale (referencing atob,
this.config.container.initData, initBytes, and
Container.Cmaf.decodeInitSegment), log or surface the decoding/parsing error via
the existing logger, and handle the failure path (e.g., mark the track
invalid/skip initialization or use a safe default timescale) so the decoder does
not crash on malformed catalog data.
rs/libmoq/src/api.rs (1)

100-101: Consider preserving the original error context.

The original error from log.init() is discarded when mapping to Error::LogInit. This loses diagnostic information that could help debug initialization failures.

♻️ Consider wrapping the original error

If Error::LogInit can hold a source error:

 .init()
-.map_err(|_| crate::error::Error::LogInit)?;
+.map_err(crate::error::Error::LogInit)?;

This assumes Error::LogInit is changed to wrap the underlying error, e.g., LogInit(tracing_subscriber::Error).

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@rs/libmoq/src/api.rs` around lines 100 - 101, The current
`.init().map_err(|_| crate::error::Error::LogInit)?` discards the original
error; change the mapping to preserve the source (e.g., `.map_err(|e|
crate::error::Error::LogInit(e))?`) and update the `Error::LogInit` variant to
wrap the underlying error type (or implement `#[from]`/source via thiserror) so
the original `tracing_subscriber` error/context is retained for diagnostics;
locate the `log.init()` call and the `Error::LogInit` enum/variant to make the
coordinated changes.
rs/libmoq/src/publish.rs (1)

41-45: Minor naming inconsistency.

The variable decoder on line 42 now holds a producer::Framed instance. Consider renaming to framed or media for consistency with the new type name.

♻️ Suggested rename
-		let decoder = producer::Framed::new(broadcast.clone(), catalog.clone(), format, &mut init)
+		let framed = producer::Framed::new(broadcast.clone(), catalog.clone(), format, &mut init)
 			.map_err(|err| Error::InitFailed(Arc::new(err)))?;
 
-		let id = self.media.insert(decoder)?;
+		let id = self.media.insert(framed)?;
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@rs/libmoq/src/publish.rs` around lines 41 - 45, The local variable named
`decoder` actually holds a producer::Framed instance; rename `decoder` to a
clearer name such as `framed` (or `media`) and update all references accordingly
(e.g., the call to self.media.insert(decoder)? should become
self.media.insert(framed)?), ensuring the creation site
producer::Framed::new(...) and any error mappings still refer to the new
variable name.
js/watch/src/audio/mse.ts (1)

106-108: Consider error handling for malformed base64 initData.

atob() throws a DOMException if config.container.initData contains invalid base64. Since this data comes from an external catalog, consider wrapping in try-catch or validating before decode to provide a clearer error message.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@js/watch/src/audio/mse.ts` around lines 106 - 108, The code currently calls
atob(config.container.initData) and passes the result to
Container.Cmaf.decodeInitSegment without handling malformed base64; wrap the
atob + decode step in a try-catch (around the initBytes creation and
Container.Cmaf.decodeInitSegment call), validate or catch DOMException from atob
and any errors from Container.Cmaf.decodeInitSegment, and then throw or log a
clearer error that includes context (e.g., mentioning config.container.initData
and the failing function names initBytes/Container.Cmaf.decodeInitSegment) so
callers can distinguish malformed base64 from decoding failures.
rs/moq-mux/src/consumer/frame.rs (1)

24-28: Clarify is_keyframe() semantics in the docstring.

The method returns index == 0, but as noted in the index field docs, this doesn't guarantee an actual keyframe for audio with duration-based grouping. Consider expanding the method docstring to align with the field documentation to prevent misuse.

📝 Suggested docstring enhancement
 impl OrderedFrame {
-	/// Returns true if this is the first frame in the group (index 0).
+	/// Returns true if this is the first frame in the group (index 0).
+	///
+	/// Note: For duration-based grouping (e.g., audio), the first frame
+	/// may not be a true keyframe — this only indicates position in group.
 	pub fn is_keyframe(&self) -> bool {
 		self.index == 0
 	}
 }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@rs/moq-mux/src/consumer/frame.rs` around lines 24 - 28, The docstring for
OrderedFrame::is_keyframe is misleading because it returns self.index == 0 but
the OrderedFrame.index field doc explains index 0 only means the first frame in
a group and may not correspond to a true audio keyframe; update the
is_keyframe() docstring to state explicitly that it returns whether this frame
is the first in its grouping (index == 0), mention the caveat about
audio/duration-based grouping from the OrderedFrame.index docs, and warn callers
that they should not assume this implies a codec-level keyframe for audio
streams; keep the method semantics unchanged and reference OrderedFrame and its
index field in the docstring.
rs/moq-mux/src/producer/fmp4.rs (1)

579-584: Fallback to full mdat may mask data corruption.

When track_data_start >= mdat.data.len() or track_data_end > mdat.data.len(), the code falls back to using the entire mdat. This could produce incorrect output if the offset calculations are wrong. Consider returning an error instead for stricter validation:

♻️ Stricter bounds checking
 let track_mdat_data = if track_data_start < mdat.data.len() && track_data_end <= mdat.data.len() {
     &mdat.data[track_data_start..track_data_end]
 } else {
-    // Fallback: use the full mdat data (single-track case)
-    &mdat.data[..]
+    anyhow::bail!(
+        "invalid track data bounds: start={}, end={}, mdat_len={}",
+        track_data_start, track_data_end, mdat.data.len()
+    );
 };
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@rs/moq-mux/src/producer/fmp4.rs` around lines 579 - 584, The current fallback
that sets track_mdat_data = &mdat.data[..] when track_data_start >=
mdat.data.len() or track_data_end > mdat.data.len() can mask corrupted offsets;
instead validate bounds and return an error when the requested slice is out of
range. In the code around track_mdat_data (referencing track_data_start,
track_data_end and mdat.data), replace the fallback branch with an early
Err(...) (or propagate a suitable error type) describing invalid track mdat
offsets so callers can handle or fail fast rather than silently using the whole
mdat.
rs/moq-relay/src/cluster.rs (1)

86-105: Consider whether giving up after repeated failures is the desired behavior.

The backoff caps at 300 seconds and then bails with an error. For a long-running relay, this means the cluster connection to a remote is permanently abandoned after a period of failures.

If continuous retry is preferred (e.g., for transient network partitions), consider capping the backoff without bailing:

♻️ Suggested change for infinite retry with capped backoff
 let timeout = tokio::time::Duration::from_secs(backoff);
-if timeout > tokio::time::Duration::from_secs(300) {
-    anyhow::bail!("remote connection keep failing, giving up");
-}
+let timeout = timeout.min(tokio::time::Duration::from_secs(300));
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@rs/moq-relay/src/cluster.rs` around lines 86 - 105, The loop currently
multiplies backoff and calls anyhow::bail! once the computed timeout exceeds
300s, which permanently abandons retrying; instead modify the logic around the
backoff and timeout in the loop that calls run_remote_once(&url) so it caps the
backoff at a maximum (e.g., 300 seconds) rather than bailing: when Err(err) in
the match on res update the backoff but clamp it to a MAX_BACKOFF constant, log
the error with tracing::error!(%err,...), compute the sleep duration as the
minimum of backoff and MAX_BACKOFF, and remove the anyhow::bail! so the loop
continues retrying indefinitely with a capped backoff; refer to the backoff
variable, run_remote_once, and the error match branch to locate the change.
rs/moq-mux/src/consumer/fmp4.rs (1)

143-176: Consider potential integer overflow in timestamp conversion.

Line 151 computes dts = frame.timestamp.as_micros() as u64 * track.timescale / 1_000_000. For very long streams (e.g., 24+ hours at high timescale), the multiplication could overflow before the division.

♻️ Safer conversion using 128-bit intermediate
-let dts = frame.timestamp.as_micros() as u64 * track.timescale / 1_000_000;
+let dts = ((frame.timestamp.as_micros() as u128 * track.timescale as u128) / 1_000_000) as u64;
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@rs/moq-mux/src/consumer/fmp4.rs` around lines 143 - 176, The dts calculation
in frame() currently does u64_cast_of(frame.timestamp.as_micros()) *
track.timescale / 1_000_000 which can overflow; change the conversion to perform
the multiplication/division in a wider type (e.g., use a u128 intermediate or
checked arithmetic) then cast back to u64, e.g. compute
(frame.timestamp.as_micros() as u128 * track.timescale as u128 / 1_000_000u128)
and then try_to_cast_or_fail into u64 so build_moof gets a safe dts; update the
dts binding and handle any overflow error by returning an anyhow error from
frame().
rs/moq-mux/src/convert/test.rs (2)

221-257: Add tokio::time::pause() for faster test execution.

Per coding guidelines, async tests that use time-based operations should call tokio::time::pause() at the start. This test uses tokio::time::timeout, and pausing time will make the test run faster while still detecting hangs correctly when using start_paused = true:

♻️ Suggested fix
-#[tokio::test]
+#[tokio::test(start_paused = true)]
 async fn legacy_to_cmaf_video() {

Apply this pattern to all async tests in this file (legacy_to_cmaf_video, cmaf_to_legacy_video, roundtrip_legacy_cmaf_legacy, cmaf_passthrough, legacy_passthrough).

As per coding guidelines: "For async tests that sleep, call tokio::time::pause() at the start to simulate time instantly"

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@rs/moq-mux/src/convert/test.rs` around lines 221 - 257, Add a call to
tokio::time::pause() at the start of each async test that uses time-based
operations to speed execution and enable deterministic time control: insert
tokio::time::pause(); as the first statement inside the test functions
legacy_to_cmaf_video, cmaf_to_legacy_video, roundtrip_legacy_cmaf_legacy,
cmaf_passthrough, and legacy_passthrough so that time-driven waits/timeouts run
instantly under the test runtime configured with start_paused.

205-217: Unbounded retry loop could hang indefinitely.

subscribe_video loops forever until the track appears. If the converter fails to create the track, this will hang. Consider adding a timeout or maximum retry count:

♻️ Suggested fix
 async fn subscribe_video(consumer: &moq_lite::BroadcastConsumer) -> moq_lite::TrackConsumer {
     let track = moq_lite::Track {
         name: "video".to_string(),
         priority: 1,
     };
+    let mut attempts = 0;
     loop {
         match consumer.subscribe_track(&track) {
             Ok(t) => return t,
-            Err(_) => tokio::task::yield_now().await,
+            Err(_) => {
+                attempts += 1;
+                assert!(attempts < 1000, "subscribe_video exceeded retry limit");
+                tokio::task::yield_now().await;
+            }
         }
     }
 }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@rs/moq-mux/src/convert/test.rs` around lines 205 - 217, The function
subscribe_video currently loops forever; change it to fail instead of hanging by
adding a bounded retry or timeout: modify
subscribe_video(&moq_lite::BroadcastConsumer) -> Result<moq_lite::TrackConsumer,
E> (or return Option) and implement either
tokio::time::timeout(Duration::from_secs(n), async { ... }) around the loop or
count attempts with a max_attempts and tokio::time::sleep between retries before
returning an Err/None; reference the subscribe_video function, the
moq_lite::Track named "video", and the consumer.subscribe_track call so the
retry/backoff and failure return are applied where the subscribe loop currently
runs.
rs/moq-mux/src/consumer/muxer.rs (1)

73-93: Consider using iterator methods for cleaner minimum selection.

The minimum timestamp selection logic is correct but verbose. Consider using Iterator::min_by_key for clarity:

♻️ Optional refactor
-       let mut min_idx = None;
-       let mut min_ts = None;
-
-       for (i, track) in self.tracks.iter().enumerate() {
-           if let Some(frame) = &track.pending {
-               let ts: std::time::Duration = frame.timestamp.into();
-               if min_ts.is_none() || ts < min_ts.unwrap() {
-                   min_ts = Some(ts);
-                   min_idx = Some(i);
-               }
-           }
-       }
-
-       // Return the frame with the smallest timestamp
-       if let Some(idx) = min_idx {
+       let min_idx = self.tracks.iter().enumerate()
+           .filter_map(|(i, t)| t.pending.as_ref().map(|f| (i, f)))
+           .min_by_key(|(_, f)| -> std::time::Duration { f.timestamp.into() })
+           .map(|(i, _)| i);
+
+       if let Some(idx) = min_idx {
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@rs/moq-mux/src/consumer/muxer.rs` around lines 73 - 93, The loop that
searches self.tracks for the pending frame with the smallest timestamp is
verbose; refactor it to use iterator helpers (e.g.
self.tracks.iter().enumerate().filter_map(...).min_by_key(...)) to find the
index and frame more concisely: map each (i, track) to Option<(i, timestamp)>
only when track.pending.is_some(), use min_by_key on the timestamp (convert
frame.timestamp into Duration or a comparable key), then take() the pending
frame from self.tracks[idx], clone track.name and return
Poll::Ready(Ok(Some(MuxedFrame { name, frame }))); ensure you preserve the same
semantics (taking the pending value and using the index) and handle the None
case by falling through as before.
rs/moq-mux/src/convert/fmp4.rs (1)

176-180: DTS calculation may lose precision for non-standard framerates.

The calculation frame.timestamp.as_micros() as u64 * timescale / 1_000_000 performs integer division which can lose precision. For timestamps that don't divide evenly, accumulated rounding errors may cause A/V drift. Consider using higher precision intermediate values:

♻️ Suggested improvement
-       let dts = frame.timestamp.as_micros() as u64 * timescale / 1_000_000;
+       // Use 128-bit intermediate to avoid overflow and maintain precision
+       let dts = ((frame.timestamp.as_micros() as u128 * timescale as u128) / 1_000_000) as u64;
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@rs/moq-mux/src/convert/fmp4.rs` around lines 176 - 180, The DTS calculation
in dts = frame.timestamp.as_micros() as u64 * timescale / 1_000_000 loses
precision due to integer division; change the computation in the block that
prepares dts (referencing frame.timestamp.as_micros(), timescale and
build_moof_mdat) to perform the multiplication in a wider integer type (e.g.,
cast to u128), multiply first, then divide, and apply rounding (add half of
1_000_000 before dividing) to avoid truncation and accumulated drift; keep seq
increment and subsequent call to build_moof_mdat(seq, 1, dts, &payload,
keyframe) unchanged.
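The widened computation the prompt describes fits in a small helper. This is a sketch under the comment's assumptions (`dts_from_micros` is a hypothetical name; the real code inlines the expression), showing the u128 intermediate plus half-unit rounding:

```rust
/// Convert a timestamp in microseconds to a DTS in `timescale` units.
/// Multiplying in u128 first means large timestamps cannot overflow, and
/// adding 500_000 before dividing rounds instead of truncating, limiting
/// accumulated A/V drift for non-standard framerates.
fn dts_from_micros(micros: u64, timescale: u64) -> u64 {
    ((micros as u128 * timescale as u128 + 500_000) / 1_000_000) as u64
}

fn main() {
    // 90 kHz timescale: one second of media is exactly 90_000 ticks.
    assert_eq!(dts_from_micros(1_000_000, 90_000), 90_000);
    // 333_333 us at 90 kHz: truncation would give 29_999; rounding gives 30_000.
    assert_eq!(dts_from_micros(333_333, 90_000), 30_000);
}
```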
rs/moq-cli/src/publish.rs (1)

62-72: Blocking I/O in the constructor may cause issues when called from an async context.

std::io::stdin().read_line() is a synchronous blocking call. If Publish::new() is called from within an async runtime on a worker thread, this could block other tasks. Consider using tokio::io::stdin() with BufReader::new(...).read_line() or document that construction must happen before entering async context.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@rs/moq-cli/src/publish.rs` around lines 62 - 72, In the InputFormat::Hls
branch the code uses blocking std::io::stdin().read_line(...) inside
Publish::new (creating producer::HlsConfig and producer::Hls), which can block
an async runtime; replace the sync read with an async stdin read or change the
API so the playlist is supplied before entering async context. Concretely: in
the InputFormat::Hls arm, use tokio::io::stdin() wrapped in tokio::io::BufReader
and call tokio::io::AsyncBufReadExt::read_line(&mut playlist).await (or perform
the read with tokio::task::spawn_blocking if you must keep a sync API), then
trim and pass that playlist into producer::HlsConfig::new and producer::Hls::new
(preserving the creation of Hls). If you choose the async approach, update
Publish::new (and its callers) to be async; otherwise document that construction
must happen before entering an async runtime and move the blocking read out of
Publish::new.
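One way to sidestep the blocking read entirely is to make the playlist source generic, so the caller decides where the read happens (before the runtime starts, or inside `tokio::task::spawn_blocking`). A minimal sketch; `read_playlist` is a hypothetical helper, not the actual API:

```rust
use std::io::{BufRead, Cursor};

/// Read one playlist line from any buffered reader. Accepting `BufRead`
/// instead of hard-coding std::io::stdin() lets the caller perform the
/// blocking read before entering the async runtime, so a constructor like
/// Publish::new never blocks an async worker thread.
fn read_playlist<R: BufRead>(mut input: R) -> std::io::Result<String> {
    let mut playlist = String::new();
    input.read_line(&mut playlist)?;
    Ok(playlist.trim().to_string())
}

fn main() -> std::io::Result<()> {
    // In production this could be stdin().lock(); here a cursor stands in.
    let fake_stdin = Cursor::new("playlist.m3u8\n");
    println!("{}", read_playlist(fake_stdin)?);
    Ok(())
}
```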
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@doc/spec/draft-ietf-moq-cmsf-00.txt`:
- Around line 262-266: Fix three typos and the mismatched quote in the normative
text: change "Earlist Presentation Times" to "Earliest Presentation Times",
change the packaging value string from 'eventtimeline"' to 'eventtimeline'
(remove the stray double-quote), and correct "beings" to "begins" in the
sentence containing that word; search for the exact fragments "Earlist
Presentation Times", "'packaging' value of 'eventtimeline\"'", and "beings" to
locate the occurrences to update.
- Line 427: Replace the placeholder "TODO Security" with a concrete Security
Considerations section that enumerates relevant threats and mitigations for the
CMSF extension: describe attacker model and risks to confidentiality, integrity
and availability; address authentication/authorization requirements and
implications for key management and credential lifecycle; discuss replay,
downgrade and impersonation threats and recommended
replay-protection/non-repudiation measures; note privacy implications
(linkability/data minimization) and dependencies on the security properties of
underlying transport/protocols; provide recommended algorithms/parameters,
operational guidance (e.g., revocation, logging, error handling), and cite
applicable normative references and existing IETF security guidance.

In `@doc/spec/draft-ietf-moq-msf-00.txt`:
- Line 1781: Replace the literal "ToDo" placeholder in the Security
Considerations section (the section labeled "Security Considerations" that
currently contains the string "ToDo") with a concrete security considerations
text: describe the threat model and attacker capabilities, enumerate
confidentiality/integrity/availability risks specific to the MSF protocol, call
out authentication/authorization and replay/DoS concerns, provide
deployment/configuration mitigation guidance and recommended MUST/SHOULD wording
for implementers, note privacy/metadata considerations and interoperability
risks, and include references to normative mitigations and relevant
RFCs/standards for further reading.
- Around line 1601-1603: Replace the Catalog field used for media timeline
tracks from 'type' to 'packaging' to match the catalog field model in Section
5.1.12: update the sentence mentioning a media timeline track MUST carry a
'type' identifier with value "mediatimeline" to instead require a 'packaging'
identifier with value "mediatimeline", and ensure the surrounding text referring
to the Catalog and the 'depends' attribute (the "media timeline track" wording)
is updated accordingly.

In `@js/hang/src/catalog/container.ts`:
- Around line 18-19: The Zod schema uses the old chaining API: replace the
initData codec that currently uses z.string().base64() with the Zod 4 top-level
base64 codec z.base64() to ensure compatibility; locate the initData property in
the schema (symbol: initData) in container.ts and update its codec to
z.base64(), keeping any surrounding schema structure intact.

In `@rs/moq-cli/src/publish.rs`:
- Around line 109-124: The tokio::select! between run_import(&mut kind) and
converter.run() can cancel the converter when run_import finishes (stdin EOF),
losing buffered frames; update the logic for both ExportFormat::Fmp4 and
ExportFormat::Hang to ensure the converter is awaited to completion after import
finishes (or use tokio::try_join! to run both concurrently and propagate errors)
— locate the branches creating moq_mux::convert::Fmp4::new(import_consumer,
output_broadcast) and moq_mux::convert::Hang::new(...), remove the current
select-based cancellation, and instead: run run_import(&mut kind) and then await
converter.run() (or use tokio::try_join!(run_import(&mut kind),
converter.run())) so converter.run() is not dropped prematurely.

In `@rs/moq-cli/src/subscribe.rs`:
- Around line 51-57: The current tokio::select! in subscribe.rs (selecting
between converter.run() and mux_fmp4(catalog_track, cmaf_consumer, max_latency))
cancels and discards the other branch when one completes, risking truncated
output; change the logic so both tasks are awaited to completion (e.g., replace
the select! with tokio::try_join!(converter.run(), mux_fmp4(catalog_track,
cmaf_consumer, max_latency)) or otherwise make mux_fmp4 drive final completion
by returning Ok(()) only after all data is flushed), ensuring both converter.run
and mux_fmp4 results are propagated and errors returned instead of silently
dropped.

In `@rs/moq-mux/src/convert/hang.rs`:
- Around line 264-273: The PTS calculation for pts currently does: let pts =
(dts as i64 + entry.cts.unwrap_or_default() as i64) as u64; which can overflow
when dts + cts is negative or wraps; update the calculation in the block that
computes pts (before calling Timestamp::from_scale) to use checked/safer
arithmetic: cast dts and entry.cts to a signed wider type (or use checked_add on
i64), verify the sum is non-negative, and only convert to u64 when safe,
returning an error (or propagating a failure) if the checked add fails or yields
a negative value so Timestamp::from_scale receives a valid u64.
- Around line 249-259: The loop over traf.trun blindly treats any trun with
data_offset as if it came from build_moof_mdat (which sets data_offset =
moof_size + 8) and unconditionally sets offset = 0; instead, detect whether the
fragment is external vs self-produced and validate data_offset against expected
values before using it: in the trun handling code (look for variables/functions
trun, traf, data_offset, build_moof_mdat, moof_size, mdat.data, offset) add a
check that if the fragment is external (no moof_size available) then either
compute a correct offset from the provided data_offset or return an error/Err
variant (or log and skip) when data_offset does not match the expected moof_size
+ 8 pattern; ensure any early-return or error includes context (traf/trun id) so
callers can handle malformed external fragments.

---

Duplicate comments:
In `@rs/moq-lite/src/model/origin.rs`:
- Around line 536-544: consume_broadcast currently subscribes to a subtree via
with_filter(&[path]) so it can return descendant broadcasts (e.g., "foo/bar"
when waiting for "foo"); change the logic to wait for an announced() value that
matches the exact path: keep the initial try_consume_broadcast(&path) check,
then create the scoped consumer with with_filter(&[path]) (or existing API), but
after calling scoped.announced().await loop until the returned BroadcastConsumer
(or its metadata) refers to the exact path requested (compare the broadcast's
path to the original path) and only return when equal; continue
awaiting/skipping any non-matching descendant announcements to preserve the
exact-path contract.
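The exact-path contract can be illustrated without the async machinery. This sketch collapses the `scoped.announced().await` loop into a synchronous filter over hypothetical `Announce` values (the real type is `OriginAnnounce`); only the path comparison matters here:

```rust
/// Hypothetical stand-in for OriginAnnounce: only the path is relevant.
struct Announce {
    path: String,
}

/// A consumer filtered to prefix "foo" may still yield descendants like
/// "foo/bar"; consume_broadcast must skip those and return only the exact
/// path. The real code does this in a loop over scoped.announced().await.
fn first_exact_match(announced: impl IntoIterator<Item = Announce>, path: &str) -> Option<Announce> {
    announced.into_iter().find(|a| a.path == path)
}

fn main() {
    let stream = vec![
        Announce { path: "foo/bar".into() }, // descendant: skipped
        Announce { path: "foo".into() },     // exact match: returned
    ];
    assert_eq!(first_exact_match(stream, "foo").unwrap().path, "foo");
}
```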

---

Nitpick comments:
In `@js/watch/src/audio/mse.ts`:
- Around line 106-108: The code currently calls atob(config.container.initData)
and passes the result to Container.Cmaf.decodeInitSegment without handling
malformed base64; wrap the atob + decode step in a try-catch (around the
initBytes creation and Container.Cmaf.decodeInitSegment call), validate or catch
DOMException from atob and any errors from Container.Cmaf.decodeInitSegment, and
then throw or log a clearer error that includes context (e.g., mentioning
config.container.initData and the failing function names
initBytes/Container.Cmaf.decodeInitSegment) so callers can distinguish malformed
base64 from decoding failures.

In `@js/watch/src/video/decoder.ts`:
- Around line 358-360: atob(this.config.container.initData) can throw on invalid
base64 and will crash the decoder; wrap the base64 decode and subsequent
Container.Cmaf.decodeInitSegment call in a try/catch around the block that
creates initBytes and reads timescale (referencing atob,
this.config.container.initData, initBytes, and
Container.Cmaf.decodeInitSegment), log or surface the decoding/parsing error via
the existing logger, and handle the failure path (e.g., mark the track
invalid/skip initialization or use a safe default timescale) so the decoder does
not crash on malformed catalog data.

In `@rs/libmoq/src/api.rs`:
- Around line 100-101: The current `.init().map_err(|_|
crate::error::Error::LogInit)?` discards the original error; change the mapping
to preserve the source (e.g., `.map_err(|e| crate::error::Error::LogInit(e))?`)
and update the `Error::LogInit` variant to wrap the underlying error type (or
implement `#[from]`/source via thiserror) so the original `tracing_subscriber`
error/context is retained for diagnostics; locate the `log.init()` call and the
`Error::LogInit` enum/variant to make the coordinated changes.
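Preserving the source error can be done without thiserror as well. A sketch with a hypothetical `LogInit` wrapper showing the std `Error::source` chain the comment asks for:

```rust
use std::error::Error as StdError;
use std::fmt;

/// Wraps whatever the logging init returned, so diagnostics keep the
/// original error in the source() chain instead of discarding it.
#[derive(Debug)]
struct LogInit(Box<dyn StdError + Send + Sync>);

impl fmt::Display for LogInit {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "failed to initialize logging")
    }
}

impl StdError for LogInit {
    fn source(&self) -> Option<&(dyn StdError + 'static)> {
        // The Send + Sync bounds are dropped by an implicit coercion here.
        Some(self.0.as_ref())
    }
}

fn main() {
    let inner = std::io::Error::other("subscriber already set");
    let err = LogInit(Box::new(inner));
    // The original message survives via the source chain.
    println!("{}: {}", err, err.source().unwrap());
}
```

With thiserror, the same effect is `#[error("failed to initialize logging")] LogInit(#[from] ...)`.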

In `@rs/libmoq/src/publish.rs`:
- Around line 41-45: The local variable named `decoder` actually holds a
producer::Framed instance; rename `decoder` to a clearer name such as `framed`
(or `media`) and update all references accordingly (e.g., the call to
self.media.insert(decoder)? should become self.media.insert(framed)?), ensuring
the creation site producer::Framed::new(...) and any error mappings still refer
to the new variable name.

In `@rs/moq-cli/src/publish.rs`:
- Around line 62-72: In the InputFormat::Hls branch the code uses blocking
std::io::stdin().read_line(...) inside Publish::new (creating
producer::HlsConfig and producer::Hls), which can block an async runtime;
replace the sync read with an async stdin read or change the API so the playlist
is supplied before entering async context. Concretely: in the InputFormat::Hls
arm, use tokio::io::stdin() wrapped in tokio::io::BufReader and call
tokio::io::AsyncBufReadExt::read_line(&mut playlist).await (or perform the read
with tokio::task::spawn_blocking if you must keep a sync API), then trim and
pass that playlist into producer::HlsConfig::new and producer::Hls::new
(preserving the creation of Hls). If you choose the async approach, update
Publish::new (and its callers) to be async; otherwise document that construction
must happen before entering an async runtime and move the blocking read out of
Publish::new.

In `@rs/moq-mux/src/consumer/fmp4.rs`:
- Around line 143-176: The dts calculation in frame() currently does
u64_cast_of(frame.timestamp.as_micros()) * track.timescale / 1_000_000 which can
overflow; change the conversion to perform the multiplication/division in a
wider type (e.g., use a u128 intermediate or checked arithmetic) then cast back
to u64, e.g. compute (frame.timestamp.as_micros() as u128 * track.timescale as
u128 / 1_000_000u128) and then try_to_cast_or_fail into u64 so build_moof gets a
safe dts; update the dts binding and handle any overflow error by returning an
anyhow error from frame().

In `@rs/moq-mux/src/consumer/frame.rs`:
- Around line 24-28: The docstring for OrderedFrame::is_keyframe is misleading
because it returns self.index == 0 but the OrderedFrame.index field doc explains
index 0 only means the first frame in a group and may not correspond to a true
audio keyframe; update the is_keyframe() docstring to state explicitly that it
returns whether this frame is the first in its grouping (index == 0), mention
the caveat about audio/duration-based grouping from the OrderedFrame.index docs,
and warn callers that they should not assume this implies a codec-level keyframe
for audio streams; keep the method semantics unchanged and reference
OrderedFrame and its index field in the docstring.

In `@rs/moq-mux/src/consumer/muxer.rs`:
- Around line 73-93: The loop that searches self.tracks for the pending frame
with the smallest timestamp is verbose; refactor it to use iterator helpers
(e.g. self.tracks.iter().enumerate().filter_map(...).min_by_key(...)) to find
the index and frame more concisely: map each (i, track) to Option<(i,
timestamp)> only when track.pending.is_some(), use min_by_key on the timestamp
(convert frame.timestamp into Duration or a comparable key), then take() the
pending frame from self.tracks[idx], clone track.name and return
Poll::Ready(Ok(Some(MuxedFrame { name, frame }))); ensure you preserve the same
semantics (taking the pending value and using the index) and handle the None
case by falling through as before.

In `@rs/moq-mux/src/convert/fmp4.rs`:
- Around line 176-180: The DTS calculation in dts = frame.timestamp.as_micros()
as u64 * timescale / 1_000_000 loses precision due to integer division; change
the computation in the block that prepares dts (referencing
frame.timestamp.as_micros(), timescale and build_moof_mdat) to perform the
multiplication in a wider integer type (e.g., cast to u128), multiply first,
then divide, and apply rounding (add half of 1_000_000 before dividing) to avoid
truncation and accumulated drift; keep seq increment and subsequent call to
build_moof_mdat(seq, 1, dts, &payload, keyframe) unchanged.

In `@rs/moq-mux/src/convert/test.rs`:
- Around line 221-257: Add a call to tokio::time::pause() at the start of each
async test that uses time-based operations to speed execution and enable
deterministic time control: insert tokio::time::pause(); as the first statement
inside the test functions legacy_to_cmaf_video, cmaf_to_legacy_video,
roundtrip_legacy_cmaf_legacy, cmaf_passthrough, and legacy_passthrough so that
time-driven waits/timeouts run instantly under the test runtime configured with
start_paused.
- Around line 205-217: The function subscribe_video currently loops forever;
change it to fail instead of hanging by adding a bounded retry or timeout:
modify subscribe_video(&moq_lite::BroadcastConsumer) ->
Result<moq_lite::TrackConsumer, E> (or return Option) and implement either
tokio::time::timeout(Duration::from_secs(n), async { ... }) around the loop or
count attempts with a max_attempts and tokio::time::sleep between retries before
returning an Err/None; reference the subscribe_video function, the
moq_lite::Track named "video", and the consumer.subscribe_track call so the
retry/backoff and failure return are applied where the subscribe loop currently
runs.
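Stripped of the async machinery, the bounded-retry shape suggested for subscribe_video looks like this; `retry_until` and the attempt count are illustrative, and in the real test each miss would be followed by a `tokio::time::sleep`:

```rust
/// Poll a closure up to `max_attempts` times, failing instead of hanging.
/// In the real test the closure would be an async subscribe_track call.
fn retry_until<T>(max_attempts: usize, mut poll: impl FnMut() -> Option<T>) -> Result<T, String> {
    for _attempt in 0..max_attempts {
        if let Some(v) = poll() {
            return Ok(v);
        }
        // Placeholder for a backoff/sleep between attempts.
    }
    Err(format!("gave up after {max_attempts} attempts"))
}

fn main() {
    let mut calls = 0;
    let result = retry_until(5, || {
        calls += 1;
        (calls == 3).then_some("video track")
    });
    assert_eq!(result, Ok("video track"));
    // A loop that never succeeds now fails fast instead of hanging forever.
    assert!(retry_until(2, || None::<()>).is_err());
}
```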

In `@rs/moq-mux/src/producer/fmp4.rs`:
- Around line 579-584: The current fallback that sets track_mdat_data =
&mdat.data[..] when track_data_start >= mdat.data.len() or track_data_end >
mdat.data.len() can mask corrupted offsets; instead validate bounds and return
an error when the requested slice is out of range. In the code around
track_mdat_data (referencing track_data_start, track_data_end and mdat.data),
replace the fallback branch with an early Err(...) (or propagate a suitable
error type) describing invalid track mdat offsets so callers can handle or fail
fast rather than silently using the whole mdat.
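The fail-fast bounds check amounts to replacing the slice fallback with `slice::get`, which returns `None` for any out-of-range window. Names here are illustrative:

```rust
/// Return the track's byte range within mdat, or an error when the offsets
/// are corrupt, instead of silently falling back to the whole mdat.
fn track_mdat_slice(mdat: &[u8], start: usize, end: usize) -> Result<&[u8], String> {
    mdat.get(start..end)
        .ok_or_else(|| format!("invalid track mdat offsets {start}..{end} (mdat len {})", mdat.len()))
}

fn main() {
    let mdat = [0u8, 1, 2, 3];
    assert_eq!(track_mdat_slice(&mdat, 1, 3), Ok(&mdat[1..3]));
    // Out-of-range end: an error, not the whole mdat.
    assert!(track_mdat_slice(&mdat, 2, 8).is_err());
}
```

`get` also rejects inverted ranges (`start > end`), so corrupted offsets cannot produce a panic.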

In `@rs/moq-relay/src/cluster.rs`:
- Around line 86-105: The loop currently multiplies backoff and calls
anyhow::bail! once the computed timeout exceeds 300s, which permanently abandons
retrying; instead modify the logic around the backoff and timeout in the loop
that calls run_remote_once(&url) so it caps the backoff at a maximum (e.g., 300
seconds) rather than bailing: when Err(err) in the match on res update the
backoff but clamp it to a MAX_BACKOFF constant, log the error with
tracing::error!(%err,...), compute the sleep duration as the minimum of backoff
and MAX_BACKOFF, and remove the anyhow::bail! so the loop continues retrying
indefinitely with a capped backoff; refer to the backoff variable,
run_remote_once, and the error match branch to locate the change.
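Capping instead of bailing is essentially a one-line change to how the next delay is computed; `MAX_BACKOFF` here mirrors the 300-second ceiling the comment suggests:

```rust
use std::time::Duration;

const MAX_BACKOFF: Duration = Duration::from_secs(300);

/// Double the backoff but clamp it at MAX_BACKOFF, so the reconnect loop
/// keeps retrying forever instead of bailing once the delay grows too large.
fn next_backoff(current: Duration) -> Duration {
    (current * 2).min(MAX_BACKOFF)
}

fn main() {
    let mut backoff = Duration::from_secs(1);
    for _ in 0..12 {
        backoff = next_backoff(backoff);
    }
    // 1 s doubled 12 times would be 4096 s; the clamp holds it at 300 s.
    assert_eq!(backoff, MAX_BACKOFF);
}
```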

In `@rs/moq-token-cli/src/main.rs`:
- Around line 91-110: The inner #[allow(deprecated)] on the let payload =
moq_token::Claims { ... } line is redundant because the outer
#[allow(deprecated)] applied to the Commands::Sign match arm already covers the
entire arm; remove the inner #[allow(deprecated)] annotation and leave the outer
attribute in place so Claims construction remains unaffected.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: a6a340d8-2bcb-4eae-8bbb-986b28610d61

📥 Commits

Reviewing files that changed from the base of the PR and between cd82878 and 27979eb.

⛔ Files ignored due to path filters (4)
  • Cargo.lock is excluded by !**/*.lock
  • rs/moq-mux/src/producer/test/av1.mp4 is excluded by !**/*.mp4
  • rs/moq-mux/src/producer/test/bbb.mp4 is excluded by !**/*.mp4
  • rs/moq-mux/src/producer/test/vp9.mp4 is excluded by !**/*.mp4
📒 Files selected for processing (93)
  • .gitignore
  • CLAUDE.md
  • cdn/input.tf
  • cdn/main.tf
  • cdn/pub/demo-bbb.service.tftpl
  • cdn/pub/main.tf
  • cdn/pub/variables.tf
  • cdn/relay/dns.tf
  • cdn/relay/justfile
  • cdn/relay/main.tf
  • cdn/relay/moq-relay.service.tftpl
  • cdn/relay/variables.tf
  • dev/.gitignore
  • dev/leaf0.toml
  • dev/leaf1.toml
  • doc/rs/env/native.md
  • doc/spec/draft-ietf-moq-cmsf-00.txt
  • doc/spec/draft-ietf-moq-msf-00.txt
  • js/hang/src/catalog/container.ts
  • js/hang/src/container/cmaf/encode.ts
  • js/watch/src/audio/decoder.ts
  • js/watch/src/audio/mse.ts
  • js/watch/src/audio/source.ts
  • js/watch/src/backend.ts
  • js/watch/src/mse.ts
  • js/watch/src/sync.ts
  • js/watch/src/video/decoder.ts
  • js/watch/src/video/mse.ts
  • js/watch/src/video/source.ts
  • rs/hang/Cargo.toml
  • rs/hang/examples/subscribe.rs
  • rs/hang/examples/video.rs
  • rs/hang/src/catalog/container.rs
  • rs/hang/src/container/consumer.rs
  • rs/hang/src/container/mod.rs
  • rs/hang/src/container/producer.rs
  • rs/libmoq/src/api.rs
  • rs/libmoq/src/consume.rs
  • rs/libmoq/src/error.rs
  • rs/libmoq/src/publish.rs
  • rs/moq-cli/Cargo.toml
  • rs/moq-cli/src/client.rs
  • rs/moq-cli/src/main.rs
  • rs/moq-cli/src/publish.rs
  • rs/moq-cli/src/server.rs
  • rs/moq-cli/src/subscribe.rs
  • rs/moq-clock/src/main.rs
  • rs/moq-ffi/src/consumer.rs
  • rs/moq-ffi/src/log.rs
  • rs/moq-ffi/src/producer.rs
  • rs/moq-lite/src/ietf/subscriber.rs
  • rs/moq-lite/src/lite/publisher.rs
  • rs/moq-lite/src/lite/subscriber.rs
  • rs/moq-lite/src/model/broadcast.rs
  • rs/moq-lite/src/model/origin.rs
  • rs/moq-mux/Cargo.toml
  • rs/moq-mux/src/catalog.rs
  • rs/moq-mux/src/consumer/container.rs
  • rs/moq-mux/src/consumer/fmp4.rs
  • rs/moq-mux/src/consumer/frame.rs
  • rs/moq-mux/src/consumer/mod.rs
  • rs/moq-mux/src/consumer/muxer.rs
  • rs/moq-mux/src/consumer/ordered.rs
  • rs/moq-mux/src/convert/fmp4.rs
  • rs/moq-mux/src/convert/hang.rs
  • rs/moq-mux/src/convert/mod.rs
  • rs/moq-mux/src/convert/test.rs
  • rs/moq-mux/src/lib.rs
  • rs/moq-mux/src/msf.rs
  • rs/moq-mux/src/producer/aac.rs
  • rs/moq-mux/src/producer/annexb.rs
  • rs/moq-mux/src/producer/av01.rs
  • rs/moq-mux/src/producer/avc3.rs
  • rs/moq-mux/src/producer/decoder.rs
  • rs/moq-mux/src/producer/fmp4.rs
  • rs/moq-mux/src/producer/hev1.rs
  • rs/moq-mux/src/producer/hls.rs
  • rs/moq-mux/src/producer/mod.rs
  • rs/moq-mux/src/producer/opus.rs
  • rs/moq-mux/src/producer/test/mod.rs
  • rs/moq-native/examples/chat.rs
  • rs/moq-native/src/log.rs
  • rs/moq-relay/Cargo.toml
  • rs/moq-relay/src/auth.rs
  • rs/moq-relay/src/cluster.rs
  • rs/moq-relay/src/config.rs
  • rs/moq-relay/src/connection.rs
  • rs/moq-relay/src/web.rs
  • rs/moq-relay/src/websocket.rs
  • rs/moq-token-cli/src/main.rs
  • rs/moq-token/src/claims.rs
  • rs/moq-token/src/key.rs
  • rs/moq-token/src/set.rs
💤 Files with no reviewable changes (3)
  • .gitignore
  • rs/moq-cli/src/server.rs
  • rs/moq-cli/src/client.rs
✅ Files skipped from review due to trivial changes (12)
  • cdn/pub/variables.tf
  • dev/.gitignore
  • rs/hang/src/container/mod.rs
  • CLAUDE.md
  • rs/moq-token/src/key.rs
  • rs/moq-cli/Cargo.toml
  • dev/leaf0.toml
  • rs/moq-token/src/set.rs
  • js/hang/src/container/cmaf/encode.ts
  • rs/moq-mux/src/producer/mod.rs
  • rs/moq-mux/src/consumer/mod.rs
  • rs/moq-mux/src/convert/mod.rs
🚧 Files skipped from review as they are similar to previous changes (2)
  • rs/moq-relay/src/web.rs
  • rs/moq-lite/src/lite/publisher.rs

Comment on lines +249 to +259

for trun in &traf.trun {
if trun.data_offset.is_some() {
// data_offset is relative to start of moof. Since we converted the
// fragment ourselves (build_moof_mdat sets data_offset = moof_size + 8),
// we subtract those to get an offset into mdat.data.
// For fragments we produce, data_offset points past the mdat header,
// so the offset into mdat.data is 0 for the first sample.
// For external fragments we don't have moof_size, so we reset to 0.
offset = 0;
}

⚠️ Potential issue | 🟡 Minor

🧩 Analysis chain

🌐 Web query:

CMAF moof data_offset relative to moof or mdat

💡 Result:

In CMAF (and ISOBMFF movie fragments generally), the data_offset field in the Track Fragment Run Box (trun) is relative to the start of the Movie Fragment Box (moof) when movie-fragment relative addressing is used. This is the required addressing mode for CMAF media segments, which consist of a moof followed by mdat(s). The data_offset specifies the offset from the beginning of the enclosing moof to the first byte of the first sample in the run within the following mdat. This is confirmed by W3C MSE ISO BMFF Byte Stream Format specification (sources 1,3,6), which mandates movie-fragment relative addressing for valid media segments: the first trun in each traf has data-offset-present flag set, and all trafs have default-base-is-moof flag set (base offset is start of moof), or single traf without base-data-offset-present. With default-base-is-moof, data_offset in trun is relative to the moof start. CMAF inherits this from ISOBMFF/ISO 14496-12, and CMAF segments enforce moof+mdat structure with this relative addressing (sources 2,4,8,17). Absolute offsets (relative to file start) are not used in fragmented streaming formats like CMAF to enable self-contained segments.



🏁 Script executed:

# First, let's see the full function context to understand how offset is used
cd rs/moq-mux/src/convert && cat -n hang.rs | sed -n '240,270p'

Repository: moq-dev/moq

Length of output: 1571


🏁 Script executed:

# Let's search for where offset is used after being set to 0
rg -n "offset" rs/moq-mux/src/convert/hang.rs -A 5 -B 5 | head -80

Repository: moq-dev/moq

Length of output: 1580


🏁 Script executed:

# Check if there are any tests related to external fragment handling
fd -e rs test | xargs rg -l "external.*fragment\|fragment.*external" 2>/dev/null | head -5

Repository: moq-dev/moq

Length of output: 37


🏁 Script executed:

# Look for where convert_fragment_samples is called and how external fragments are passed
rg -n "convert_fragment_samples" rs/moq-mux/src/convert/hang.rs -B 5 -A 5

Repository: moq-dev/moq

Length of output: 37


🏁 Script executed:

# Search for how fragments are created or loaded in the codebase to understand external fragment structure
rg -n "external.*fragment\|fragment.*external" rs/moq-mux/src/ -A 2 -B 2 | head -40

Repository: moq-dev/moq

Length of output: 37


🏁 Script executed:

# Look for any validation or error handling related to data_offset
rg -n "data_offset" rs/moq-mux/src/ -A 3 -B 3 | head -60

Repository: moq-dev/moq

Length of output: 4440


External fragments with data_offset are assumed to share the self-produced layout, without validation.

The code silently assumes external CMAF fragments follow the same structure as self-produced fragments (where data_offset = moof_size + 8), setting offset = 0 regardless. For external fragments, this can cause incorrect sample extraction if data_offset has a different value. Add validation or error handling to catch external fragments that don't match this assumption.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@rs/moq-mux/src/convert/hang.rs` around lines 249 - 259, The loop over
traf.trun blindly treats any trun with data_offset as if it came from
build_moof_mdat (which sets data_offset = moof_size + 8) and unconditionally
sets offset = 0; instead, detect whether the fragment is external vs
self-produced and validate data_offset against expected values before using it:
in the trun handling code (look for variables/functions trun, traf, data_offset,
build_moof_mdat, moof_size, mdat.data, offset) add a check that if the fragment
is external (no moof_size available) then either compute a correct offset from
the provided data_offset or return an error/Err variant (or log and skip) when
data_offset does not match the expected moof_size + 8 pattern; ensure any
early-return or error includes context (traf/trun id) so callers can handle
malformed external fragments.
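For fragments that follow the required moof-relative addressing, the offset into mdat.data can be derived and validated rather than assumed. A sketch with hypothetical names (`mdat_offset` is illustrative; the real code works inside the trun loop):

```rust
/// data_offset in a trun is relative to the start of the moof box. With a
/// moof of `moof_size` bytes followed immediately by an mdat, the first
/// payload byte sits at moof_size + 8 (skipping the 8-byte mdat header).
/// Returns the offset into mdat.data, or an error if the fragment does not
/// match this layout (e.g., an external fragment with a different base).
fn mdat_offset(data_offset: u32, moof_size: u32) -> Result<u32, String> {
    let payload_start = moof_size + 8;
    data_offset
        .checked_sub(payload_start)
        .ok_or_else(|| format!("data_offset {data_offset} points before mdat payload (moof size {moof_size})"))
}

fn main() {
    // Self-produced fragments: build_moof_mdat sets data_offset = moof_size + 8.
    assert_eq!(mdat_offset(108, 100), Ok(0));
    // An external fragment pointing into the middle of mdat.
    assert_eq!(mdat_offset(140, 100), Ok(32));
    // Corrupt: the offset lands inside the moof itself.
    assert!(mdat_offset(50, 100).is_err());
}
```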

@kixelated kixelated changed the base branch from main to dev March 20, 2026 13:25
…, and more

- Fix three typos in CMSF spec (Earlist, eventtimeline", beings)
- Replace TODO placeholders in CMSF and MSF Security Considerations
- Change MSF media timeline catalog field from 'type' to 'packaging'
- Update Zod schema to use z.base64() (Zod 4 API)
- Replace tokio::select! with try_join! in publish.rs and subscribe.rs
  to prevent premature converter cancellation
- Add checked arithmetic for PTS calculation in convert/hang.rs
- Fix consume_broadcast to match exact path, not prefix descendants
- Remove redundant #[allow(deprecated)] in moq-token-cli

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 3

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@doc/spec/draft-ietf-moq-cmsf-00.txt`:
- Around line 442-443: The document cites [SFrame] but there is no corresponding
entry in the references section (Section 8); either remove the inline citation
"[SFrame]" from the phrase "using Secure Frame [SFrame]" or add a full reference
entry for [SFrame] in the references list (Section 8) with the proper
bibliographic details (title, authors, draft/RFC number, date, and URL) so the
citation resolves; update the text to match the chosen action and ensure the
citation token "[SFrame]" exactly matches the new reference label.

In `@rs/moq-lite/src/model/origin.rs`:
- Around line 507-514: The docstring for announced() is incorrect: it claims the
returned path is absolute but the code strips the consumer root (via
strip_prefix(&self.root)), yielding paths relative to the configured root.
Update the documentation on fn announced() and the OriginAnnounce description to
state that the returned path is root-relative (i.e., has had self.root removed)
rather than absolute, and mention that it will match the consumer's prefix after
stripping; keep the behavior in announced(), strip_prefix usage, and
Error::Dropped semantics unchanged.

In `@rs/moq-mux/src/convert/hang.rs`:
- Around line 266-275: The code narrows dts (u64) to i64 before addition which
can overflow for large dts; instead compute pts by adding the signed composition
offset safely: take entry.cts.unwrap_or_default() as an i64 (or 0), call
u64::checked_add_signed(dts, cts_signed) (or equivalent helper) to get
Option<u64>, return the same "PTS overflow" error on None, remove the
intermediate i64 cast and the ensure-negativity check, and then pass the
resulting u64 pts into Timestamp::from_scale; update the block around the pts
calculation (references: dts, entry.cts, pts, Timestamp::from_scale)
accordingly.
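The checked arithmetic this comment describes fits in a few lines; `cts` stands in for the signed composition offset from the trun entry, and `pts` is an illustrative helper name:

```rust
/// Apply a signed composition-time offset to a u64 DTS without the
/// intermediate i64 cast, which could overflow for very large DTS values.
/// Returns None on overflow or when dts + cts would go negative, so the
/// caller can surface a "PTS overflow" error before Timestamp::from_scale.
fn pts(dts: u64, cts: Option<i32>) -> Option<u64> {
    dts.checked_add_signed(i64::from(cts.unwrap_or(0)))
}

fn main() {
    assert_eq!(pts(1_000, Some(-100)), Some(900));
    assert_eq!(pts(1_000, None), Some(1_000));
    // Negative result: the checked add reports failure instead of wrapping.
    assert_eq!(pts(10, Some(-100)), None);
    // Overflow at the top end is also caught.
    assert_eq!(pts(u64::MAX, Some(1)), None);
}
```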

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 9c0c221b-97f9-450e-ba6c-f95ccf73a7b3

📥 Commits

Reviewing files that changed from the base of the PR and between 27979eb and a2db2ed.

📒 Files selected for processing (8)
  • doc/spec/draft-ietf-moq-cmsf-00.txt
  • doc/spec/draft-ietf-moq-msf-00.txt
  • js/hang/src/catalog/container.ts
  • rs/moq-cli/src/publish.rs
  • rs/moq-cli/src/subscribe.rs
  • rs/moq-lite/src/model/origin.rs
  • rs/moq-mux/src/convert/hang.rs
  • rs/moq-token-cli/src/main.rs
💤 Files with no reviewable changes (1)
  • rs/moq-token-cli/src/main.rs
✅ Files skipped from review due to trivial changes (1)
  • js/hang/src/catalog/container.ts
🚧 Files skipped from review as they are similar to previous changes (2)
  • doc/spec/draft-ietf-moq-msf-00.txt
  • rs/moq-cli/src/publish.rs

Comment on lines +442 to +443
transport layer (e.g., using Secure Frame [SFrame]).


⚠️ Potential issue | 🟠 Major

Add or remove unresolved [SFrame] citation.

At Line 442, [SFrame] is cited but no matching reference entry appears in Section 8. This will usually produce an unresolved-reference warning/error in draft tooling.

✏️ Proposed fix (quick safe option)
-      transport layer (e.g., using Secure Frame [SFrame]).
+      transport layer (e.g., using Secure Frame).

If you want to keep the citation, add a proper [SFrame] reference entry in the references section instead.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@doc/spec/draft-ietf-moq-cmsf-00.txt` around lines 442 - 443, The document
cites [SFrame] but there is no corresponding entry in the references section
(Section 8); either remove the inline citation "[SFrame]" from the phrase "using
Secure Frame [SFrame]" or add a full reference entry for [SFrame] in the
references list (Section 8) with the proper bibliographic details (title,
authors, draft/RFC number, date, and URL) so the citation resolves; update the
text to match the chosen action and ensure the citation token "[SFrame]" exactly
matches the new reference label.

Comment on lines +507 to +514
 	/// Returns the next announced broadcast and the absolute path.
 	///
-	/// The broadcast will only be announced if it was previously unannounced.
-	/// The same path won't be announced/unannounced twice, instead it will toggle.
-	/// Returns None if the consumer is closed.
+	/// If the same path is announced twice, the new broadcast replaces the old one.
+	/// The old broadcast's `closed()` future will resolve when it is no longer active.
+	/// Returns `Err(Error::Dropped)` if the consumer is closed.
 	///
 	/// Note: The returned path is absolute and will always match this consumer's prefix.
-	pub async fn announced(&mut self) -> Option<OriginAnnounce> {
-		self.updates.recv().await
+	pub async fn announced(&mut self) -> Result<OriginAnnounce, Error> {

⚠️ Potential issue | 🟡 Minor

Rustdoc says “absolute path,” but returned path is root-stripped.

announced() currently documents an absolute path, but updates are emitted via strip_prefix(&self.root) (Line 43), so rooted consumers receive paths relative to their configured root. Please align the docs with actual behavior.

📝 Suggested doc fix
-	/// Returns the next announced broadcast and the absolute path.
+	/// Returns the next announced broadcast and its path relative to this consumer's root.
@@
-	/// Note: The returned path is absolute and will always match this consumer's prefix.
+	/// Note: The returned path has this consumer's prefix stripped.
📝 Committable suggestion

Suggested change
-	/// Returns the next announced broadcast and the absolute path.
+	/// Returns the next announced broadcast and its path relative to this consumer's root.
 	///
 	/// If the same path is announced twice, the new broadcast replaces the old one.
 	/// The old broadcast's `closed()` future will resolve when it is no longer active.
 	/// Returns `Err(Error::Dropped)` if the consumer is closed.
 	///
-	/// Note: The returned path is absolute and will always match this consumer's prefix.
+	/// Note: The returned path has this consumer's prefix stripped.
 	pub async fn announced(&mut self) -> Result<OriginAnnounce, Error> {
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@rs/moq-lite/src/model/origin.rs` around lines 507 - 514, The docstring for
announced() is incorrect: it claims the returned path is absolute but the code
strips the consumer root (via strip_prefix(&self.root)), yielding paths relative
to the configured root. Update the documentation on fn announced() and the
OriginAnnounce description to state that the returned path is root-relative
(i.e., has had self.root removed) rather than absolute, and mention that it will
match the consumer's prefix after stripping; keep the behavior in announced(),
strip_prefix usage, and Error::Dropped semantics unchanged.
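
For call sites migrating from the old signature, a minimal self-contained sketch of the Option-to-Result change (the `Consumer` and `OriginAnnounce` types here are hypothetical mocks, and the real method is async; only the return-type shape matches the PR):

```rust
// Hypothetical mock: announced() now returns Result instead of Option.
#[derive(Debug, PartialEq)]
enum Error {
	Dropped,
}

#[derive(Debug, PartialEq)]
struct OriginAnnounce {
	path: String,
}

struct Consumer {
	pending: Vec<OriginAnnounce>,
}

impl Consumer {
	// Sync stand-in for the async method: drain announcements, then report closure.
	fn announced(&mut self) -> Result<OriginAnnounce, Error> {
		self.pending.pop().ok_or(Error::Dropped)
	}
}

fn main() {
	let mut consumer = Consumer {
		pending: vec![OriginAnnounce { path: "room/alice".into() }],
	};

	// Old style: while let Some(announce) = consumer.announced().await { ... }
	// New style: loop until Err(Error::Dropped) signals the consumer is closed.
	let mut seen = Vec::new();
	while let Ok(announce) = consumer.announced() {
		seen.push(announce.path);
	}
	assert_eq!(seen, vec!["room/alice".to_string()]);
	assert_eq!(consumer.announced(), Err(Error::Dropped));
	println!("ok");
}
```

The closed state is now an explicit error rather than a `None`, so callers can propagate it with `?` instead of silently breaking out of the loop.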

Comment on lines +266 to 275
let pts = (dts as i64)
.checked_add(entry.cts.unwrap_or_default() as i64)
.context("PTS overflow")?;
anyhow::ensure!(
pts >= 0,
"PTS is negative: dts={dts} cts={}",
entry.cts.unwrap_or_default()
);
let pts = pts as u64;
let timestamp = Timestamp::from_scale(pts, timescale)?;

⚠️ Potential issue | 🟡 Minor

🧩 Analysis chain

🏁 Scripts executed (verification):

#!/bin/bash
# Verify the narrowing cast is present and checked_add_signed is not yet used in this file.
rg -nP '\(dts as i64\)|checked_add_signed' rs/moq-mux/src/convert/hang.rs -C2

# Get the full function context and type signatures around line 266.
sed -n '200,320p' rs/moq-mux/src/convert/hang.rs | cat -n

# Check what type dts is by looking at the function signature and earlier assignments.
rg -B30 'let pts = \(dts as i64\)' rs/moq-mux/src/convert/hang.rs | head -60

# Search for the Tfdt and TrunEntry definitions to see the actual field types.
rg 'struct Tfdt|struct TrunEntry|base_media_decode_time' --type rust -A3

# Check imports in hang.rs and the mp4-atom dependency version.
head -30 rs/moq-mux/src/convert/hang.rs | rg 'use|mod'
find rs -name "Cargo.toml" -path "*moq-mux*" -exec cat {} \;

🌐 Web query:

mp4-atom crate version 0.10.0 Tfdt TrunEntry types

💡 Result:

In the mp4-atom Rust crate version 0.10.0, Tfdt and TrunEntry are types defined for MP4/ISOBMFF atom parsing and encoding. Tfdt is the Track Fragment Decode Time atom ('tfdt'), located in mp4_atom::moof::traf. It holds the base_media_decode_time (u64), with version 0 using u32 (cast to u64) and version 1 using u64 directly. #[derive(Debug, Clone, PartialEq, Eq, Default)] pub struct Tfdt { pub base_media_decode_time: u64, } TrunEntry is the entry type for the Track Fragment Run atom ('trun'), used in movie fragments for sample information like duration, size, flags, etc. Trun contains a vector of TrunEntry. These are part of the low-level binary encoding/decoding without validation, supporting ISO/IEC 14496-12. Version 0.10.0 was released on 2026-01-14, with 0.10.1 following shortly after.

🌐 Web query:

mp4-atom TrunEntry cts field type composition

💡 Result:

In MP4 (ISO Base Media File Format), the 'trun' atom (TrackFragmentRunBox) contains a sequence of TrunEntry structures for samples in a track fragment. The cts field in each TrunEntry is the sample composition time offset, which is the difference between the composition (presentation) timestamp (CT) and decode timestamp (DT): CT(n) = DT(n) + cts(n). The cts field is optional and present only if the trun flags have bit 8 (value 0x000800, flag 2048) set (sample_composition_time_offset_present). When present, there is one 32-bit integer cts value per sample (TrunEntry). The type and signedness of the cts field depend on the trun box version: version 0 uses unsigned int(32) sample_composition_time_offset, while version 1 uses signed int(32). This mirrors the 'ctts' box in non-fragmented MP4, where version 0 uses unsigned offsets and version 1 uses signed (for negative offsets in QuickTime compatibility).

Avoid narrowing dts to i64 before checked addition.

Line 266 casts dts (a u64) to i64 before checked_add, which can wrap for dts > i64::MAX (9,223,372,036,854,775,807), causing valid large timestamps to be incorrectly flagged as negative. Use u64::checked_add_signed instead, which safely handles adding a signed composition time offset to an unsigned decode time.

Proposed fix
-				let pts = (dts as i64)
-					.checked_add(entry.cts.unwrap_or_default() as i64)
-					.context("PTS overflow")?;
-				anyhow::ensure!(
-					pts >= 0,
-					"PTS is negative: dts={dts} cts={}",
-					entry.cts.unwrap_or_default()
-				);
-				let pts = pts as u64;
+				let cts = entry.cts.unwrap_or_default() as i64;
+				let pts = dts
+					.checked_add_signed(cts)
+					.context("PTS overflow/underflow")?;
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@rs/moq-mux/src/convert/hang.rs` around lines 266 - 275, The code narrows dts
(u64) to i64 before addition which can overflow for large dts; instead compute
pts by adding the signed composition offset safely: take
entry.cts.unwrap_or_default() as an i64 (or 0), call
u64::checked_add_signed(dts, cts_signed) (or equivalent helper) to get
Option<u64>, return the same "PTS overflow" error on None, remove the
intermediate i64 cast and the ensure-negativity check, and then pass the
resulting u64 pts into Timestamp::from_scale; update the block around the pts
calculation (references: dts, entry.cts, pts, Timestamp::from_scale)
accordingly.
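
To make the failure mode concrete, a small standalone check (standard library only; `u64::checked_add_signed` is stable since Rust 1.66) of why the narrowing cast is unsafe while the signed-add helper preserves both the range and the error path:

```rust
fn main() {
	// A decode timestamp just above i64::MAX: the old path's `dts as i64`
	// wraps to a negative value before checked_add even runs.
	let dts: u64 = i64::MAX as u64 + 1;
	assert!((dts as i64) < 0); // narrowing has already corrupted the value

	// u64::checked_add_signed adds a signed cts offset without narrowing dts.
	let cts: i64 = -10;
	assert_eq!(dts.checked_add_signed(cts), Some(dts - 10));

	// A genuinely negative PTS (underflow) still surfaces as None,
	// so the "PTS overflow" error path is preserved.
	assert_eq!(0u64.checked_add_signed(-1), None);
	println!("ok");
}
```

The same holds in the other direction: `u64::MAX.checked_add_signed(1)` returns `None`, so overflow on positive offsets is also caught.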
