Skip to content

Subscription model refactor + async subscribe_track#1134

Draft
kixelated wants to merge 13 commits intodevfrom
fetch
Draft

Subscription model refactor + async subscribe_track#1134
kixelated wants to merge 13 commits intodevfrom
fetch

Conversation

@kixelated
Copy link
Collaborator

Summary

  • Add ordered and max_latency fields to Track for publisher preferences
  • Create Subscription / TrackSubscription types for per-consumer subscription tracking with RAII lifecycle
  • Add TrackProducer::poll_max() for aggregate subscription change detection (conducer-based polling)
  • Make BroadcastConsumer::subscribe_track async with TrackRequest request/response pattern
  • Add BroadcastDynamic::insert_track so the handler controls track creation
  • Allow create_group to replace aborted groups (for subscription restart after cancel)
  • Dedup pending track requests across multiple concurrent subscribers

Known issues

  • libmoq FFI tests deadlock — likely BroadcastConsumer state upgrade inside async context holding a lock
  • moq-mux tests may deadlock similarly
  • These will be investigated in follow-up

Test plan

  • cargo check passes
  • cargo clippy -- -D warnings passes
  • cargo fmt --check passes
  • moq-lite tests pass (242 tests including 9 new subscription/group tests)
  • hang tests pass (40 tests)
  • libmoq FFI tests (deadlock — needs investigation)
  • Integration testing with relay

🤖 Generated with Claude Code

kixelated and others added 13 commits March 18, 2026 14:40
- Add `ordered` and `max_latency` fields to `Track` struct
- Create `Subscription` struct for per-consumer preferences
- Create `TrackSubscription` type with RAII lifecycle (auto-removes on drop)
- Add `TrackProducer::poll_max` for aggregate subscription change detection
- Add `TrackConsumer::subscribe()` to register subscriptions
- Add `GroupProducer::is_aborted()` for duplicate group handling
- Allow `create_group` to replace aborted groups (for subscription restart)
- Replace `BroadcastDynamic::requested_track() -> TrackProducer` with
  `requested_track() -> TrackRequest` (handler creates producer, responds)
- Make `BroadcastConsumer::subscribe_track` async with request/response
- Add `BroadcastDynamic::insert_track` for handler-side track insertion
- Add dedup for pending track requests (multiple subscribers share reply)
- Update all callers across the workspace

Known issues:
- libmoq FFI tests deadlock (likely BroadcastConsumer upgrade in async context)
- moq-mux tests may deadlock similarly

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
- Track is now just `{ name: String }` — no priority/ordered/max_latency
- Subscription gains `start: Option<u64>` and `end: Option<u64>` fields
- Aggregation: start = min (None wins), end = max (None wins)
- No cap/ceiling from producer — aggregation is purely across subscribers
- TrackProducer::subscription() is async, blocks until aggregate changes
- TrackProducer stores prev_subscription internally (no prev arg needed)
- Remove TrackConsumer::start_at() — use Subscription.start instead
- Remove subscription tests (will rewrite in follow-up)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
- Revert broadcast.rs to sync: consume_track() looks up or creates
  tracks synchronously, requested_track() returns TrackProducer directly
- Remove TrackRequest and oneshot channel machinery
- Replace TrackSubscription with TrackSubscriber using conducer-based
  subscriptions for two-level poll aggregation
- TrackConsumer::subscribe() is async, blocks until first group exists
- TrackSubscriber::recv_group() respects start/end range
- Fix create_group aborted replacement: tombstone old entry, push new
- Update all callers to use consume_track (sync) or subscribe_track
- Fix moq-ffi to remove RUNTIME.block_on() now that consume_track is sync

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
- libmoq: move consume_track + consumer construction before tokio::spawn
  so errors surface synchronously
- lite/ietf subscriber: take TrackProducer by value instead of &mut

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
None now means "no preference" rather than "latest wins all."
Callers wanting "latest" should start with None, then update()
with a concrete value once latest() is known.

Also: replace expect() with ? in consume_track, remove dead comment.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
- Remove recv_group/poll_recv_group from TrackConsumer (deprecated stubs remain)
- Remove index field from TrackConsumer
- Add poll_recv_group, recv_group, closed, poll_closed to TrackSubscriber
- Rename OrderedConsumer → OrderedSubscriber (takes TrackSubscriber)
- CatalogConsumer now takes TrackSubscriber
- Update all protocol publishers to subscribe() before iterating groups
- Update moq-ffi: subscribe_catalog/subscribe_media are now async
- Update libmoq: move subscribe into spawned tasks
- Update moq-mux: CatalogProducer::consume() is now async
- Fix hang tests: write first group before subscribe to avoid deadlock
- Fix hang tests: await tokio::time::sleep in paused-time tests
- Add unit tests for subscribe() blocking on first group, finish, abort

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Deprecate the generic NotFound error in favor of more specific
variants to make debugging easier. Each maps to its own wire code.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
The cleanup task was firing immediately (unused() resolved with zero
consumers), removing tracks from the lookup before any subscriber
could find them. This matches main's behavior: stale entries are
cleaned up lazily by consume_track when it finds closed weak refs.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
- Take dev's moq_mux::consumer types (OrderedConsumer, LegacyConsumer)
- Update them to use TrackSubscriber instead of TrackConsumer
- Take dev's deprecated stubs in hang::container::consumer
- Fix moq-cli subscribe to use consume_track + subscribe
- Fix moq-ffi to use LegacyConsumer with TrackSubscriber

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant