From 3b0090e7c178131e10bff087c99f555e69c06d63 Mon Sep 17 00:00:00 2001 From: Steve McFarlin Date: Wed, 4 Mar 2026 16:25:32 -0800 Subject: [PATCH 1/5] moqsink: refactor to async session controller with dynamic pad forwarding --- src/sink/imp.rs | 783 +++++++++++++++++++++++++++------------------- src/source/mod.rs | 2 +- 2 files changed, 459 insertions(+), 326 deletions(-) diff --git a/src/sink/imp.rs b/src/sink/imp.rs index 65f0fc6..11967fb 100644 --- a/src/sink/imp.rs +++ b/src/sink/imp.rs @@ -1,359 +1,492 @@ -use anyhow::Context as _; +#![allow(dead_code)] + +//! Async-friendly MoqSink that keeps the original dynamic-pad Element +//! behavior while pushing all network setup and CMAF publishing work into +//! a Tokio task. The GLib state change thread never blocks, pads still get +//! requested dynamically, and each pad simply forwards buffers/events to the +//! background worker via a bounded channel. + +use std::collections::HashMap; +use std::sync::{LazyLock, Mutex}; + +use anyhow::{Context, Result}; +use bytes::Bytes; use gst::glib; use gst::prelude::*; use gst::subclass::prelude::*; - -use std::collections::HashMap; -use std::sync::LazyLock; -use std::sync::Mutex; +use tokio::sync::mpsc; use url::Url; -static CAT: LazyLock = - LazyLock::new(|| gst::DebugCategory::new("moq-sink", gst::DebugColorFlags::empty(), Some("MoQ Sink Element"))); +use hang::moq_lite; +use moq_mux; +use moq_native; -pub static RUNTIME: LazyLock = LazyLock::new(|| { - tokio::runtime::Builder::new_multi_thread() - .enable_all() - .worker_threads(1) - .build() - .unwrap() +static RUNTIME: LazyLock = LazyLock::new(|| { + tokio::runtime::Builder::new_multi_thread() + .enable_all() + .worker_threads(4) + .build() + .expect("spawn tokio runtime") }); -#[derive(Default, Clone)] +static CAT: LazyLock = LazyLock::new(|| { + gst::DebugCategory::new( + "moq-sink", + gst::DebugColorFlags::empty(), + Some("MoQ Sink (async element)"), + ) +}); + +#[derive(Debug, Clone, Default)] struct Settings { - pub url: Option, - pub broadcast: Option, - pub tls_disable_verify: bool, + url: Option, + broadcast: Option, + tls_disable_verify: bool, +} + +#[derive(Debug, Clone)] +struct ResolvedSettings { + url: Url, + broadcast: String, + tls_disable_verify: bool, +} + +impl TryFrom for ResolvedSettings { + type Error = anyhow::Error; + + fn try_from(value: Settings) -> Result { + Ok(Self { + url: Url::parse(value.url.as_ref().context("url property is required")?)?, + broadcast: value + .broadcast + .as_ref() + .context("broadcast property is required")? + .clone(), + tls_disable_verify: value.tls_disable_verify, + }) + } +} + +#[derive(Debug)] +struct SessionHandle { + sender: mpsc::Sender, + join: tokio::task::JoinHandle<()>, +} + +impl SessionHandle { + fn stop(self) { + let _ = self.sender.blocking_send(ControlMessage::Shutdown); + RUNTIME.spawn(async move { + if let Err(err) = self.join.await { + gst::warning!(CAT, "session task ended with error: {err:?}"); + } + }); + } } struct PadState { - decoder: moq_mux::import::Decoder, - reference_pts: Option, + decoder: moq_mux::import::Decoder, + reference_pts: Option, } -struct State { - _session: moq_lite::Session, - broadcast: moq_lite::BroadcastProducer, - catalog: moq_mux::CatalogProducer, - pads: HashMap, +struct RuntimeState { + #[allow(dead_code)] + session: moq_lite::Session, + broadcast: moq_lite::BroadcastProducer, + catalog: moq_mux::CatalogProducer, + pads: HashMap, +} + +#[derive(Debug)] +enum ControlMessage { + SetCaps { pad_name: String, caps: gst::Caps }, + Buffer { + pad_name: String, + data: Bytes, + pts: Option, + }, + DropPad { pad_name: String }, + Shutdown, } #[derive(Default)] pub struct MoqSink { - settings: Mutex, - state: Mutex>, + settings: Mutex, + session: Mutex>, } #[glib::object_subclass] impl ObjectSubclass for MoqSink { - const NAME: &'static str = "MoqSink"; - type Type = super::MoqSink; - type ParentType = gst::Element; + const NAME: &'static str = "MoqSink"; + type Type = super::MoqSink; + type ParentType = gst::Element; } impl ObjectImpl for MoqSink { - fn properties() -> &'static [glib::ParamSpec] { - static PROPERTIES: LazyLock> = LazyLock::new(|| { - vec![ - glib::ParamSpecString::builder("url") - .nick("Source URL") - .blurb("Connect to the given URL") - .build(), - glib::ParamSpecString::builder("broadcast") - .nick("Broadcast") - .blurb("The name of the broadcast to publish") - .build(), - glib::ParamSpecBoolean::builder("tls-disable-verify") - .nick("TLS disable verify") - .blurb("Disable TLS verification") - .default_value(false) - .build(), - ] - }); - PROPERTIES.as_ref() - } - - fn set_property(&self, _id: usize, value: &glib::Value, pspec: &glib::ParamSpec) { - let mut settings = self.settings.lock().unwrap(); - - match pspec.name() { - "url" => settings.url = value.get().unwrap(), - "broadcast" => settings.broadcast = value.get().unwrap(), - "tls-disable-verify" => settings.tls_disable_verify = value.get().unwrap(), - _ => unimplemented!(), - } - } - - fn property(&self, _id: usize, pspec: &glib::ParamSpec) -> glib::Value { - let settings = self.settings.lock().unwrap(); - - match pspec.name() { - "url" => settings.url.to_value(), - "broadcast" => settings.broadcast.to_value(), - "tls-disable-verify" => settings.tls_disable_verify.to_value(), - _ => unimplemented!(), - } - } + fn properties() -> &'static [glib::ParamSpec] { + static PROPS: LazyLock> = LazyLock::new(|| { + vec![ + glib::ParamSpecString::builder("url") + .nick("Source URL") + .blurb("Connect to the given URL") + .build(), + glib::ParamSpecString::builder("broadcast") + .nick("Broadcast") + .blurb("The name of the broadcast to publish") + .build(), + glib::ParamSpecBoolean::builder("tls-disable-verify") + .nick("TLS disable verify") + .blurb("Disable TLS verification") + .default_value(false) + .build(), + ] + }); + PROPS.as_ref() + } + + fn set_property(&self, _id: usize, value: &glib::Value, pspec: &glib::ParamSpec) { + let mut settings = self.settings.lock().unwrap(); + match pspec.name() { + "url" => settings.url = value.get().unwrap(), + "broadcast" => settings.broadcast = value.get().unwrap(), + "tls-disable-verify" => settings.tls_disable_verify = value.get().unwrap(), + _ => unreachable!(), + } + } + + fn property(&self, _id: usize, pspec: &glib::ParamSpec) -> glib::Value { + let settings = self.settings.lock().unwrap(); + match pspec.name() { + "url" => settings.url.to_value(), + "broadcast" => settings.broadcast.to_value(), + "tls-disable-verify" => settings.tls_disable_verify.to_value(), + _ => unreachable!(), + } + } } impl GstObjectImpl for MoqSink {} impl ElementImpl for MoqSink { - fn metadata() -> Option<&'static gst::subclass::ElementMetadata> { - static ELEMENT_METADATA: LazyLock = LazyLock::new(|| { - gst::subclass::ElementMetadata::new( - "MoQ Sink", - "Sink/Network/MoQ", - "Transmits media over the network via MoQ", - "Luke Curley ", - ) - }); - - Some(&*ELEMENT_METADATA) - } - - fn pad_templates() -> &'static [gst::PadTemplate] { - static PAD_TEMPLATES: LazyLock> = LazyLock::new(|| { - let mut caps = gst::Caps::new_empty(); - // Video - caps.merge( - gst::Caps::builder("video/x-h264") - .field("stream-format", "byte-stream") - .field("alignment", "au") - .build(), - ); - caps.merge( - gst::Caps::builder("video/x-h265") - .field("stream-format", "byte-stream") - .field("alignment", "au") - .build(), - ); - caps.merge(gst::Caps::builder("video/x-av1").build()); - // Audio - caps.merge( - gst::Caps::builder("audio/mpeg") - .field("mpegversion", 4i32) - .field("stream-format", "raw") - .build(), - ); - caps.merge(gst::Caps::builder("audio/x-opus").build()); - - let templ = - gst::PadTemplate::new("sink_%u", gst::PadDirection::Sink, gst::PadPresence::Request, &caps).unwrap(); - - vec![templ] - }); - PAD_TEMPLATES.as_ref() - } - - fn request_new_pad( - &self, - templ: &gst::PadTemplate, - name: Option<&str>, - _caps: Option<&gst::Caps>, - ) -> Option { - let builder = gst::Pad::builder_from_template(templ) - .chain_function(|pad, parent, buffer| { - let element = parent - .and_then(|p| p.downcast_ref::()) - .ok_or(gst::FlowError::Error)?; - element.imp().sink_chain(pad, buffer) - }) - .event_function(|pad, parent, event| { - let Some(element) = parent.and_then(|p| p.downcast_ref::()) else { - return false; - }; - element.imp().sink_event(pad, event) - }); - - let pad = if let Some(name) = name { - builder.name(name).build() - } else { - builder.build() - }; - - self.obj().add_pad(&pad).ok()?; - Some(pad) - } - - fn release_pad(&self, pad: &gst::Pad) { - let pad_name = pad.name().to_string(); - if let Some(ref mut state) = *self.state.lock().unwrap() { - state.pads.remove(&pad_name); - } - let _ = self.obj().remove_pad(pad); - } - - fn change_state(&self, transition: gst::StateChange) -> Result { - match transition { - gst::StateChange::ReadyToPaused => { - let _guard = RUNTIME.enter(); - self.setup().map_err(|e| { - gst::error!(CAT, obj = self.obj(), "Failed to setup: {:?}", e); - gst::StateChangeError - })?; - } - gst::StateChange::PausedToReady => { - *self.state.lock().unwrap() = None; - } - _ => (), - } - - self.parent_change_state(transition) - } + fn metadata() -> Option<&'static gst::subclass::ElementMetadata> { + static ELEMENT_METADATA: LazyLock = LazyLock::new(|| { + gst::subclass::ElementMetadata::new( + "MoQ Sink (async)", + "Sink/Network/MoQ", + "Transmits media over MoQ", + "Luke Curley , Steve McFarlin ", + ) + }); + Some(&*ELEMENT_METADATA) + } + + fn pad_templates() -> &'static [gst::PadTemplate] { + static PAD_TEMPLATES: LazyLock> = LazyLock::new(|| { + let mut caps = gst::Caps::new_empty(); + caps.merge( + gst::Caps::builder("video/x-h264") + .field("stream-format", "byte-stream") + .field("alignment", "au") + .build(), + ); + caps.merge( + gst::Caps::builder("video/x-h265") + .field("stream-format", "byte-stream") + .field("alignment", "au") + .build(), + ); + caps.merge(gst::Caps::builder("video/x-av1").build()); + caps.merge( + gst::Caps::builder("audio/mpeg") + .field("mpegversion", 4i32) + .field("stream-format", "raw") + .build(), + ); + caps.merge(gst::Caps::builder("audio/x-opus").build()); + + let templ = gst::PadTemplate::new( + "sink_%u", + gst::PadDirection::Sink, + gst::PadPresence::Request, + &caps, + ) + .unwrap(); + vec![templ] + }); + PAD_TEMPLATES.as_ref() + } + + fn request_new_pad( + &self, + templ: &gst::PadTemplate, + name: Option<&str>, + _caps: Option<&gst::Caps>, + ) -> Option { + let pad_builder = gst::Pad::builder_from_template(templ) + .chain_function(|pad, parent, buffer| { + let element = parent + .and_then(|p| p.downcast_ref::()) + .ok_or(gst::FlowError::Error)?; + element.imp().forward_buffer(pad, buffer) + }) + .event_function(|pad, parent, event| { + let Some(element) = parent.and_then(|p| p.downcast_ref::()) else { + return false; + }; + element.imp().forward_event(pad, event) + }); + + let pad = if let Some(name) = name { + pad_builder.name(name).build() + } else { + pad_builder.build() + }; + + self.obj().add_pad(&pad).ok()?; + Some(pad) + } + + fn release_pad(&self, pad: &gst::Pad) { + if let Some(session) = self.session.lock().unwrap().as_ref() { + let _ = session + .sender + .blocking_send(ControlMessage::DropPad { pad_name: pad.name().to_string() }); + } + let _ = self.obj().remove_pad(pad); + } + + fn change_state(&self, transition: gst::StateChange) -> Result { + match transition { + gst::StateChange::ReadyToPaused => { + self.start_session().map_err(|err| { + gst::error!(CAT, obj = self.obj(), "failed to start session: {err:#}"); + gst::StateChangeError + })?; + } + gst::StateChange::PausedToReady => self.stop_session(), + _ => (), + } + + self.parent_change_state(transition) + } } impl MoqSink { - fn setup(&self) -> anyhow::Result<()> { - let settings = self.settings.lock().unwrap(); - - let url = settings.url.as_ref().expect("url is required"); - let url = Url::parse(url).context("invalid URL")?; - let name = settings.broadcast.as_ref().expect("broadcast is required").clone(); - - let mut config = moq_native::ClientConfig::default(); - config.tls.disable_verify = Some(settings.tls_disable_verify); - - drop(settings); - - let origin = moq_lite::Origin::produce(); - let mut broadcast = moq_lite::Broadcast::produce(); - let broadcast_consumer = broadcast.consume(); - let catalog = moq_mux::CatalogProducer::new(&mut broadcast)?; - - origin.publish_broadcast(&name, broadcast_consumer); - - let client = config.init()?.with_publish(origin.consume()); - - RUNTIME.block_on(async { - let session = client.connect(url).await.context("failed to connect")?; - - *self.state.lock().unwrap() = Some(State { - _session: session, - broadcast, - catalog, - pads: HashMap::new(), - }); - - anyhow::Ok(()) - }) - } - - fn sink_event(&self, pad: &gst::Pad, event: gst::Event) -> bool { - match event.view() { - gst::EventView::Caps(caps_event) => { - let caps = caps_event.caps(); - if let Err(e) = self.handle_caps(pad, caps) { - gst::error!(CAT, obj = pad, "Failed to handle caps: {:?}", e); - return false; - } - true - } - _ => gst::Pad::event_default(pad, Some(&*self.obj()), event), - } - } - - fn handle_caps(&self, pad: &gst::Pad, caps: &gst::CapsRef) -> anyhow::Result<()> { - let structure = caps.structure(0).context("empty caps")?; - let pad_name = pad.name().to_string(); - - let mut state = self.state.lock().unwrap(); - let state = state.as_mut().context("not connected")?; - - let decoder: moq_mux::import::Decoder = match structure.name().as_str() { - "video/x-h264" => { - let mut buf = bytes::Bytes::new(); - moq_mux::import::Decoder::new( - state.broadcast.clone(), - state.catalog.clone(), - moq_mux::import::DecoderFormat::Avc3, - &mut buf, - )? - } - "video/x-h265" => { - let mut buf = bytes::Bytes::new(); - moq_mux::import::Decoder::new( - state.broadcast.clone(), - state.catalog.clone(), - moq_mux::import::DecoderFormat::Hev1, - &mut buf, - )? - } - "video/x-av1" => { - let mut buf = bytes::Bytes::new(); - moq_mux::import::Decoder::new( - state.broadcast.clone(), - state.catalog.clone(), - moq_mux::import::DecoderFormat::Av01, - &mut buf, - )? - } - "audio/mpeg" => { - // aacparse provides AudioSpecificConfig as codec_data in caps - let codec_data = structure - .get::("codec_data") - .context("AAC caps missing codec_data")?; - let map = codec_data.map_readable().context("failed to map codec_data buffer")?; - let mut data = bytes::Bytes::copy_from_slice(map.as_slice()); - moq_mux::import::Decoder::new( - state.broadcast.clone(), - state.catalog.clone(), - moq_mux::import::DecoderFormat::Aac, - &mut data, - )? - } - "audio/x-opus" => { - let channels: i32 = structure.get("channels").unwrap_or(2); - let rate: i32 = structure.get("rate").unwrap_or(48000); - let config = moq_mux::import::OpusConfig { - sample_rate: rate as u32, - channel_count: channels as u32, - }; - moq_mux::import::Opus::new(state.broadcast.clone(), state.catalog.clone(), config)?.into() - } - other => anyhow::bail!("unsupported caps: {}", other), - }; - - state.pads.insert( - pad_name.clone(), - PadState { - decoder, - reference_pts: None, - }, - ); - - gst::info!(CAT, obj = pad, "Configured pad {}", pad_name); - - Ok(()) - } - - fn sink_chain(&self, pad: &gst::Pad, buffer: gst::Buffer) -> Result { - let _guard = RUNTIME.enter(); - - let pad_name = pad.name(); - let mut state = self.state.lock().unwrap(); - let state = state.as_mut().ok_or(gst::FlowError::Error)?; - - let pad_state = state.pads.get_mut(pad_name.as_str()).ok_or_else(|| { - gst::error!(CAT, obj = pad, "Pad {} not configured", pad_name); - gst::FlowError::Error - })?; - - // Compute relative PTS in microseconds - let pts = buffer.pts().and_then(|pts| { - let reference = *pad_state.reference_pts.get_or_insert(pts); - let relative = pts.checked_sub(reference)?; - hang::container::Timestamp::from_micros(relative.nseconds() / 1000).ok() - }); - - let data = buffer.map_readable().map_err(|_| gst::FlowError::Error)?; - let mut bytes = bytes::Bytes::copy_from_slice(data.as_slice()); - - pad_state.decoder.decode_frame(&mut bytes, pts).map_err(|e| { - gst::error!(CAT, obj = pad, "Failed to decode: {}", e); - gst::FlowError::Error - })?; - - Ok(gst::FlowSuccess::Ok) - } + fn start_session(&self) -> Result<()> { + let settings = { + let settings = self.settings.lock().unwrap().clone(); + ResolvedSettings::try_from(settings)? + }; + + let (tx, rx) = mpsc::channel::(128); + let join = RUNTIME.spawn(run_session(settings, rx)); + + *self.session.lock().unwrap() = Some(SessionHandle { sender: tx, join }); + Ok(()) + } + + fn stop_session(&self) { + if let Some(handle) = self.session.lock().unwrap().take() { + handle.stop(); + } + } + + fn forward_buffer(&self, pad: &gst::Pad, buffer: gst::Buffer) -> Result { + let sender = self + .session + .lock() + .unwrap() + .as_ref() + .map(|handle| handle.sender.clone()) + .ok_or(gst::FlowError::Flushing)?; + + let map = buffer.map_readable().map_err(|_| gst::FlowError::Error)?; + let pts = buffer.pts(); + let data = Bytes::copy_from_slice(map.as_slice()); + + sender + .blocking_send(ControlMessage::Buffer { + pad_name: pad.name().to_string(), + data, + pts, + }) + .map_err(|_| gst::FlowError::Flushing)?; + + Ok(gst::FlowSuccess::Ok) + } + + fn forward_event(&self, pad: &gst::Pad, event: gst::Event) -> bool { + match event.view() { + gst::EventView::Caps(caps) => { + let sender = match self + .session + .lock() + .unwrap() + .as_ref() + .map(|handle| handle.sender.clone()) + { + Some(sender) => sender, + None => return false, + }; + + if sender + .blocking_send(ControlMessage::SetCaps { + pad_name: pad.name().to_string(), + caps: caps.caps().to_owned(), + }) + .is_err() + { + return false; + } + + true + } + _ => gst::Pad::event_default(pad, Some(&*self.obj()), event), + } + } +} + +async fn run_session(settings: ResolvedSettings, mut rx: mpsc::Receiver) { + let mut client_config = moq_native::ClientConfig::default(); + client_config.tls.disable_verify = Some(settings.tls_disable_verify); + + let client = match client_config.init() { + Ok(client) => client, + Err(err) => { + gst::error!(CAT, "failed to init client: {err:#}"); + return; + } + }; + + let origin = moq_lite::Origin::produce(); + let mut broadcast = moq_lite::Broadcast::produce(); + let broadcast_consumer = broadcast.consume(); + + let mut catalog = match moq_mux::CatalogProducer::new(&mut broadcast) { + Ok(catalog) => catalog, + Err(err) => { + gst::error!(CAT, "failed to create catalog: {err:#}"); + return; + } + }; + + if !origin.publish_broadcast(&settings.broadcast, broadcast_consumer) { + gst::error!(CAT, "failed to publish broadcast {}", settings.broadcast); + return; + } + + let client = client.with_publish(origin.consume()); + + let session = match client.connect(settings.url.clone()).await { + Ok(session) => session, + Err(err) => { + gst::error!(CAT, "failed to connect: {err:#}"); + return; + } + }; + + let mut runtime = RuntimeState { + session, + broadcast, + catalog, + pads: HashMap::new(), + }; + + while let Some(msg) = rx.recv().await { + match msg { + ControlMessage::SetCaps { pad_name, caps } => { + if let Err(err) = handle_caps(&mut runtime, pad_name, caps) { + gst::error!(CAT, "failed to configure pad: {err:#}"); + } + } + ControlMessage::Buffer { pad_name, data, pts } => { + if let Err(err) = handle_buffer(&mut runtime, pad_name, data, pts) { + gst::error!(CAT, "failed to publish buffer: {err:#}"); + } + } + ControlMessage::DropPad { pad_name } => { + runtime.pads.remove(&pad_name); + } + ControlMessage::Shutdown => break, + } + } +} + +fn handle_caps(runtime: &mut RuntimeState, pad_name: String, caps: gst::Caps) -> Result<()> { + let structure = caps.structure(0).context("empty caps")?; + let decoder: moq_mux::import::Decoder = match structure.name().as_str() { + "video/x-h264" => { + let mut bytes = Bytes::new(); + new_decoder(runtime, moq_mux::import::DecoderFormat::Avc3, &mut bytes)? + } + "video/x-h265" => { + let mut bytes = Bytes::new(); + new_decoder(runtime, moq_mux::import::DecoderFormat::Hev1, &mut bytes)? + } + "video/x-av1" => { + let mut bytes = Bytes::new(); + new_decoder(runtime, moq_mux::import::DecoderFormat::Av01, &mut bytes)? + } + "audio/mpeg" => { + let codec_data = structure + .get::("codec_data") + .context("AAC caps missing codec_data")?; + let map = codec_data.map_readable().context("failed to map codec_data")?; + let mut data = Bytes::copy_from_slice(map.as_slice()); + new_decoder(runtime, moq_mux::import::DecoderFormat::Aac, &mut data)? + } + "audio/x-opus" => { + let channels: i32 = structure.get("channels").unwrap_or(2); + let rate: i32 = structure.get("rate").unwrap_or(48_000); + let config = moq_mux::import::OpusConfig { + sample_rate: rate as u32, + channel_count: channels as u32, + }; + moq_mux::import::Opus::new(runtime.broadcast.clone(), runtime.catalog.clone(), config)?.into() + } + other => anyhow::bail!("unsupported caps: {}", other), + }; + + runtime.pads.insert( + pad_name, + PadState { + decoder, + reference_pts: None, + }, + ); + Ok(()) +} + +fn new_decoder( + runtime: &mut RuntimeState, + format: moq_mux::import::DecoderFormat, + buf: &mut Bytes, +) -> Result { + let decoder = moq_mux::import::Decoder::new( + runtime.broadcast.clone(), + runtime.catalog.clone(), + format, + buf, + )?; + Ok(decoder) +} + +fn handle_buffer( + runtime: &mut RuntimeState, + pad_name: String, + mut data: Bytes, + pts: Option, +) -> Result<()> { + let pad = runtime + .pads + .get_mut(&pad_name) + .context("pad not configured")?; + + let ts = pts.and_then(|pts| { + let reference = *pad.reference_pts.get_or_insert(pts); + let relative = pts.checked_sub(reference)?; + hang::container::Timestamp::from_micros(relative.nseconds() / 1000).ok() + }); + + pad.decoder.decode_frame(&mut data, ts).map_err(|e| anyhow::anyhow!(e)) } diff --git a/src/source/mod.rs b/src/source/mod.rs index b41daca..f19d1b5 100644 --- a/src/source/mod.rs +++ b/src/source/mod.rs @@ -4,7 +4,7 @@ use gst::prelude::*; mod imp; glib::wrapper! { - pub struct MoqSrc(ObjectSubclass) @extends gst::Bin, gst::Element, gst::Object; + pub struct MoqSrc(ObjectSubclass) @extends gst::Element, gst::Object; } pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> { From 4f8509f4cbd57ab1103b1c1155e999048b0508f6 Mon Sep 17 00:00:00 2001 From: Steve McFarlin Date: Wed, 4 Mar 2026 18:27:52 -0800 Subject: [PATCH 2/5] Refactor MoqSrc into an async element with channel-driven pad forwarding and H.264/H.265/AV1 caps support. --- src/source/imp.rs | 831 +++++++++++++++++++++++++++++++--------------- 1 file changed, 563 insertions(+), 268 deletions(-) diff --git a/src/source/imp.rs b/src/source/imp.rs index cb8b461..85405b4 100644 --- a/src/source/imp.rs +++ b/src/source/imp.rs @@ -1,53 +1,176 @@ -use anyhow::Context as _; +use std::collections::HashMap; +use std::sync::{LazyLock, Mutex}; +use std::time::Duration; + +use anyhow::{bail, Context, Result}; use gst::glib; use gst::prelude::*; use gst::subclass::prelude::*; +use tokio::sync::{mpsc, oneshot, watch}; -use std::sync::LazyLock; -use std::sync::Mutex; +use hang::moq_lite; static CAT: LazyLock = LazyLock::new(|| gst::DebugCategory::new("moq-src", gst::DebugColorFlags::empty(), Some("MoQ Source Element"))); -pub static RUNTIME: LazyLock = LazyLock::new(|| { +static RUNTIME: LazyLock = LazyLock::new(|| { tokio::runtime::Builder::new_multi_thread() .enable_all() - .worker_threads(1) + .worker_threads(4) .build() - .unwrap() + .expect("spawn tokio runtime") }); -#[derive(Default, Clone)] +#[derive(Debug, Clone, Default)] struct Settings { - pub url: Option, - pub broadcast: Option, - pub tls_disable_verify: bool, + url: Option, + broadcast: Option, + tls_disable_verify: bool, +} + +#[derive(Debug, Clone)] +struct ResolvedSettings { + url: url::Url, + broadcast: String, + tls_disable_verify: bool, +} + +impl TryFrom for ResolvedSettings { + type Error = anyhow::Error; + + fn try_from(value: Settings) -> Result { + Ok(Self { + url: url::Url::parse(value.url.as_ref().context("url property is required")?)?, + broadcast: value + .broadcast + .as_ref() + .context("broadcast property is required")? + .clone(), + tls_disable_verify: value.tls_disable_verify, + }) + } +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +enum TrackKind { + Video, + Audio, +} + +impl TrackKind { + fn template_name(&self) -> &'static str { + match self { + TrackKind::Video => "video_%u", + TrackKind::Audio => "audio_%u", + } + } +} + +#[derive(Debug, Clone)] +struct TrackDescriptor { + kind: TrackKind, + name: String, +} + +impl TrackDescriptor { + fn pad_name(&self) -> String { + match self.kind { + TrackKind::Video => format!("video_{}", self.name), + TrackKind::Audio => format!("audio_{}", self.name), + } + } +} + +#[derive(Debug)] +enum ControlMessage { + CreatePad { + descriptor: TrackDescriptor, + caps: gst::Caps, + reply: oneshot::Sender, + }, + NoMorePads, + ReportError(anyhow::Error), +} + +#[derive(Debug)] +enum PadMessage { + Buffer(gst::Buffer), + Eos, + Drop, +} + +#[derive(Debug, Clone)] +struct PadEndpoint { + sender: mpsc::UnboundedSender, +} + +impl PadEndpoint { + fn send(&self, msg: PadMessage) -> bool { + self.sender.send(msg).is_ok() + } +} + +struct PadHandle { + sender: mpsc::UnboundedSender, + task: glib::JoinHandle<()>, +} + +struct SessionController { + shutdown: watch::Sender, + join: tokio::task::JoinHandle<()>, +} + +impl SessionController { + fn start(settings: ResolvedSettings, control_tx: mpsc::UnboundedSender) -> Self { + let (shutdown_tx, shutdown_rx) = watch::channel(false); + let mut shutdown_rx = shutdown_rx; + let control_for_error = control_tx.clone(); + let join = RUNTIME.spawn(async move { + let result = run_session(settings, control_tx, &mut shutdown_rx).await; + if let Err(err) = result { + let _ = control_for_error.send(ControlMessage::ReportError(err)); + } + }); + + Self { + shutdown: shutdown_tx, + join, + } + } + + fn stop(self) { + let _ = self.shutdown.send(true); + RUNTIME.spawn(async move { + if let Err(err) = self.join.await { + gst::warning!(CAT, "session task ended with error: {err:?}"); + } + }); + } } #[derive(Default)] pub struct MoqSrc { settings: Mutex, - session: Mutex>, - tasks: Mutex>>, + pads: Mutex>, + control_task: Mutex>>, + control_sender: Mutex>>, + session: Mutex>, } #[glib::object_subclass] impl ObjectSubclass for MoqSrc { const NAME: &'static str = "MoqSrc"; type Type = super::MoqSrc; - type ParentType = gst::Bin; + type ParentType = gst::Element; fn new() -> Self { Self::default() } } -impl GstObjectImpl for MoqSrc {} -impl BinImpl for MoqSrc {} - impl ObjectImpl for MoqSrc { fn properties() -> &'static [glib::ParamSpec] { - static PROPERTIES: LazyLock> = LazyLock::new(|| { + static PROPS: LazyLock> = LazyLock::new(|| { vec![ glib::ParamSpecString::builder("url") .nick("Source URL") @@ -55,352 +178,524 @@ impl ObjectImpl for MoqSrc { .build(), glib::ParamSpecString::builder("broadcast") .nick("Broadcast") - .blurb("The name of the broadcast to consume") + .blurb("The broadcast name to subscribe to") .build(), glib::ParamSpecBoolean::builder("tls-disable-verify") - .nick("TLS disable verify") - .blurb("Disable TLS verification") + .nick("TLS Disable Verify") + .blurb("Disable TLS certificate verification") .default_value(false) .build(), ] }); - PROPERTIES.as_ref() + PROPS.as_ref() } fn set_property(&self, _id: usize, value: &glib::Value, pspec: &glib::ParamSpec) { let mut settings = self.settings.lock().unwrap(); - match pspec.name() { "url" => settings.url = value.get().unwrap(), "broadcast" => settings.broadcast = value.get().unwrap(), "tls-disable-verify" => settings.tls_disable_verify = value.get().unwrap(), - _ => unimplemented!(), + _ => unreachable!(), } } fn property(&self, _id: usize, pspec: &glib::ParamSpec) -> glib::Value { let settings = self.settings.lock().unwrap(); - match pspec.name() { "url" => settings.url.to_value(), "broadcast" => settings.broadcast.to_value(), "tls-disable-verify" => settings.tls_disable_verify.to_value(), - _ => unimplemented!(), + _ => unreachable!(), } } } +impl GstObjectImpl for MoqSrc {} impl ElementImpl for MoqSrc { fn metadata() -> Option<&'static gst::subclass::ElementMetadata> { - static ELEMENT_METADATA: LazyLock = LazyLock::new(|| { + static META: LazyLock = LazyLock::new(|| { gst::subclass::ElementMetadata::new( "MoQ Src", "Source/Network/MoQ", "Receives media over the network via MoQ", - "Luke Curley ", + "Luke Curley , Steve McFarlin ", ) }); - - Some(&*ELEMENT_METADATA) + Some(&*META) } fn pad_templates() -> &'static [gst::PadTemplate] { static PAD_TEMPLATES: LazyLock> = LazyLock::new(|| { - let video = gst::PadTemplate::new( - "video_%u", - gst::PadDirection::Src, - gst::PadPresence::Sometimes, - &gst::Caps::new_any(), - ) - .unwrap(); - - let audio = gst::PadTemplate::new( - "audio_%u", - gst::PadDirection::Src, - gst::PadPresence::Sometimes, - &gst::Caps::new_any(), - ) - .unwrap(); - - vec![video, audio] + vec![ + gst::PadTemplate::new( + "video_%u", + gst::PadDirection::Src, + gst::PadPresence::Sometimes, + &gst::Caps::new_any(), + ) + .unwrap(), + gst::PadTemplate::new( + "audio_%u", + gst::PadDirection::Src, + gst::PadPresence::Sometimes, + &gst::Caps::new_any(), + ) + .unwrap(), + ] }); - PAD_TEMPLATES.as_ref() } fn change_state(&self, transition: gst::StateChange) -> Result { match transition { gst::StateChange::ReadyToPaused => { - if let Err(e) = RUNTIME.block_on(self.setup()) { - gst::error!(CAT, obj = self.obj(), "Failed to setup: {:?}", e); + if let Err(err) = self.start_session() { + gst::error!(CAT, obj = self.obj(), "failed to start session: {err:?}"); return Err(gst::StateChangeError); } - // Chain up first to let the bin handle the state change - let result = self.parent_change_state(transition); - result?; - // This is a live source - no preroll needed - return Ok(gst::StateChangeSuccess::NoPreroll); + let success = self.parent_change_state(transition)?; + let result = match success { + gst::StateChangeSuccess::Async => gst::StateChangeSuccess::Async, + _ => gst::StateChangeSuccess::NoPreroll, + }; + Ok(result) } - gst::StateChange::PausedToReady => { - // Cleanup publisher - self.cleanup(); + self.stop_session(); + self.parent_change_state(transition) } - - _ => (), + _ => self.parent_change_state(transition), } - - // Chain up for other transitions - self.parent_change_state(transition) } } impl MoqSrc { - async fn setup(&self) -> anyhow::Result<()> { - let (client, url, name, origin_consumer) = { - let settings = self.settings.lock().unwrap(); - let url = url::Url::parse(settings.url.as_ref().expect("url is required"))?; - let name = settings.broadcast.as_ref().expect("broadcast is required").clone(); - - let mut config = moq_native::ClientConfig::default(); - config.tls.disable_verify = Some(settings.tls_disable_verify); - - let origin = moq_lite::Origin::produce(); - let origin_consumer = origin.consume(); - - let client = config.init()?.with_consume(origin); - - (client, url, name, origin_consumer) + fn start_session(&self) -> Result<()> { + let settings = { + let settings = self.settings.lock().unwrap().clone(); + ResolvedSettings::try_from(settings)? }; - let session = client.connect(url).await?; - *self.session.lock().unwrap() = Some(session); - - let broadcast = origin_consumer - .consume_broadcast(&name) - .ok_or_else(|| anyhow::anyhow!("Broadcast '{}' not found", name))?; - - let catalog = broadcast.subscribe_track(&hang::Catalog::default_track())?; - let mut catalog = hang::CatalogConsumer::new(catalog); - - // TODO handle catalog updates - let catalog = catalog.next().await?.context("no catalog found")?.clone(); - - { - for (track_name, config) in catalog.video.renditions { - let track_ref = moq_lite::Track::new(&track_name); - let track_consumer = broadcast.subscribe_track(&track_ref)?; - let mut track = - hang::container::OrderedConsumer::new(track_consumer, std::time::Duration::from_secs(1)); - - let caps = match config.codec { - hang::catalog::VideoCodec::H264(_) => { - let builder = gst::Caps::builder("video/x-h264") - //.field("width", video.resolution.width) - //.field("height", video.resolution.height) - .field("alignment", "au"); - - if let Some(description) = config.description { - builder - .field("stream-format", "avc") - .field("codec_data", gst::Buffer::from_slice(description.clone())) - .build() - } else { - builder.field("stream-format", "annexb").build() - } - } - _ => unimplemented!(), - }; - - gst::info!(CAT, "caps: {:?}", caps); - - let templ = self.obj().element_class().pad_template("video_%u").unwrap(); + let (control_tx, control_rx) = mpsc::unbounded_channel(); + let obj = self.obj(); + let weak = obj.downgrade(); + let context = glib::MainContext::default(); + let control_task = spawn_main_context_forwarder(&context, control_rx, move |msg| { + if let Some(obj) = weak.upgrade() { + obj.imp().handle_control_message(msg); + true + } else { + false + } + }); - let srcpad = gst::Pad::builder_from_template(&templ).name(&track_name).build(); - srcpad.set_active(true).unwrap(); + *self.control_task.lock().unwrap() = Some(control_task); + *self.control_sender.lock().unwrap() = Some(control_tx.clone()); - let stream_start = gst::event::StreamStart::builder(&track_name) - .group_id(gst::GroupId::next()) - .build(); - srcpad.push_event(stream_start); + let session = SessionController::start(settings, control_tx); + *self.session.lock().unwrap() = Some(session); + Ok(()) + } - let caps_evt = gst::event::Caps::new(&caps); - srcpad.push_event(caps_evt); + fn stop_session(&self) { + if let Some(session) = self.session.lock().unwrap().take() { + session.stop(); + } - let segment = gst::event::Segment::new(&gst::FormattedSegment::::new()); - srcpad.push_event(segment); + if let Some(control_task) = self.control_task.lock().unwrap().take() { + control_task.abort(); + } - self.obj().add_pad(&srcpad).expect("Failed to add pad"); + let handles = self.pads.lock().unwrap().drain().collect::>(); + for (name, handle) in handles { + gst::debug!(CAT, "dropping pad {name}"); + let _ = handle.sender.send(PadMessage::Drop); + handle.task.abort(); + } - // Push to the srcpad in a background task. - let mut reference = None; - let handle = tokio::spawn(async move { - loop { - match track.read().await { - Ok(Some(frame)) => { - let payload: Vec = frame.payload.into_iter().flatten().collect(); - let mut buffer = gst::Buffer::from_slice(payload); - let buffer_mut = buffer.get_mut().unwrap(); + *self.control_sender.lock().unwrap() = None; + } - // Make timestamps relative to the first frame for proper playback - let pts = if let Some(reference_ts) = reference { - let timestamp: std::time::Duration = (frame.timestamp - reference_ts).into(); - gst::ClockTime::from_nseconds(timestamp.as_nanos() as _) - } else { - reference = Some(frame.timestamp); - gst::ClockTime::ZERO - }; - buffer_mut.set_pts(Some(pts)); + fn handle_control_message(&self, msg: ControlMessage) { + match msg { + ControlMessage::CreatePad { + descriptor, + caps, + reply, + } => { + if let Err(err) = self.create_pad(descriptor, caps, reply) { + gst::error!(CAT, obj = self.obj(), "failed to create pad: {err:?}"); + } + } + ControlMessage::NoMorePads => { + self.obj().no_more_pads(); + } + ControlMessage::ReportError(err) => { + gst::element_error!(self.obj(), gst::CoreError::Failed, ("session error"), ["{err:?}"]); + } + } + } - let mut flags = buffer_mut.flags(); - // First frame in each group is a keyframe - match frame.index == 0 { - true => flags.remove(gst::BufferFlags::DELTA_UNIT), - false => flags.insert(gst::BufferFlags::DELTA_UNIT), - }; + fn create_pad( + &self, + descriptor: TrackDescriptor, + caps: gst::Caps, + reply: oneshot::Sender, + ) -> Result<()> { + let obj = self.obj(); + let templ = obj + .element_class() + .pad_template(descriptor.kind.template_name()) + .context("missing pad template")?; + + let pad = gst::Pad::builder_from_template(&templ) + .name(descriptor.pad_name()) + .build(); + + pad.set_active(true)?; + + let stream_start = gst::event::StreamStart::builder(&descriptor.name) + .group_id(gst::GroupId::next()) + .build(); + pad.push_event(stream_start); + pad.push_event(gst::event::Caps::new(&caps)); + pad.push_event(gst::event::Segment::new(&gst::FormattedSegment::::new())); + + obj.add_pad(&pad)?; + + let (pad_tx, pad_rx) = mpsc::unbounded_channel(); + let pad_clone = pad.clone(); + let weak = obj.downgrade(); + let context = glib::MainContext::default(); + let task = spawn_main_context_forwarder(&context, pad_rx, move |msg| { + if let Some(obj) = weak.upgrade() { + let imp = obj.imp(); + imp.dispatch_pad_message(&pad_clone, msg) + } else { + false + } + }); - buffer_mut.set_flags(flags); + self.pads.lock().unwrap().insert( + descriptor.pad_name(), + PadHandle { + sender: pad_tx.clone(), + task, + }, + ); - gst::info!(CAT, "pushing sample: {:?}", buffer); + let _ = reply.send(PadEndpoint { sender: pad_tx }); + Ok(()) + } - if let Err(err) = srcpad.push(buffer) { - gst::warning!(CAT, "Failed to push sample: {:?}", err); - } - } - Ok(None) => { - // Stream ended normally - gst::info!(CAT, "Stream ended normally"); - break; - } - Err(e) => { - // Handle connection errors gracefully - gst::warning!(CAT, "Failed to read frame: {:?}", e); - break; - } - } - } - }); - self.tasks.lock().unwrap().push(handle); + fn dispatch_pad_message(&self, pad: &gst::Pad, msg: PadMessage) -> bool { + match msg { + PadMessage::Buffer(buffer) => { + if let Err(err) = pad.push(buffer) { + gst::warning!(CAT, "failed to push buffer: {err:?}"); + return false; + } + true + } + PadMessage::Eos => { + pad.push_event(gst::event::Eos::builder().build()); + true + } + PadMessage::Drop => { + let _ = pad.set_active(false); + let _ = self.obj().remove_pad(pad); + false } } + } +} - { - for (track_name, config) in catalog.audio.renditions { - let track_ref = moq_lite::Track::new(&track_name); - let track_consumer = broadcast.subscribe_track(&track_ref)?; - let mut track = - hang::container::OrderedConsumer::new(track_consumer, std::time::Duration::from_secs(1)); - - let caps = match &config.codec { - hang::catalog::AudioCodec::AAC(_aac) => { - let builder = gst::Caps::builder("audio/mpeg") - .field("mpegversion", 4) - .field("channels", config.channel_count) - .field("rate", config.sample_rate); - - if let Some(description) = config.description { - builder - .field("codec_data", gst::Buffer::from_slice(description.clone())) - .field("stream-format", "aac") - .build() - } else { - builder.field("stream-format", "adts").build() - } - } - hang::catalog::AudioCodec::Opus => { - let builder = gst::Caps::builder("audio/x-opus") - .field("rate", config.sample_rate) - .field("channels", config.channel_count); - - if let Some(description) = config.description { - builder - .field("codec_data", gst::Buffer::from_slice(description.clone())) - .field("stream-format", "ogg") - .build() - } else { - builder.field("stream-format", "opus").build() - } - } - _ => unimplemented!(), - }; +async fn run_session( + settings: ResolvedSettings, + control_tx: mpsc::UnboundedSender, + shutdown: &mut watch::Receiver, +) -> Result<()> { + let mut config = moq_native::ClientConfig::default(); + config.tls.disable_verify = Some(settings.tls_disable_verify); - gst::info!(CAT, "caps: {:?}", caps); + let origin = moq_lite::Origin::produce(); + let origin_consumer = origin.consume(); + let client = config.init()?.with_consume(origin); - let templ = self.obj().element_class().pad_template("audio_%u").unwrap(); + let session = client.connect(settings.url.clone()).await?; + let _session = session; // keep session alive inside this task - let srcpad = gst::Pad::builder_from_template(&templ).name(&track_name).build(); - srcpad.set_active(true).unwrap(); + let broadcast = origin_consumer + .consume_broadcast(&settings.broadcast) + .ok_or_else(|| anyhow::anyhow!("Broadcast '{}' not found", settings.broadcast))?; - let stream_start = gst::event::StreamStart::builder(&track_name) - .group_id(gst::GroupId::next()) - .build(); - srcpad.push_event(stream_start); + let catalog_track = broadcast.subscribe_track(&hang::catalog::Catalog::default_track())?; + let mut catalog = hang::catalog::CatalogConsumer::new(catalog_track); + let catalog = catalog.next().await?.context("catalog missing")?.clone(); - let caps_evt = gst::event::Caps::new(&caps); - srcpad.push_event(caps_evt); + let mut tasks = Vec::new(); - let segment = gst::event::Segment::new(&gst::FormattedSegment::::new()); - srcpad.push_event(segment); + for (track_name, config) in catalog.video.renditions { + let descriptor = TrackDescriptor { + kind: TrackKind::Video, + name: track_name.clone(), + }; + let caps = video_caps(&config)?; + let endpoint = request_pad(&control_tx, descriptor.clone(), caps).await?; + let track_ref = moq_lite::Track::new(&track_name); + let track_consumer = broadcast.subscribe_track(&track_ref)?; + let track = hang::container::OrderedConsumer::new(track_consumer, Duration::from_secs(1)); + tasks.push(spawn_track_pump(track, descriptor, endpoint, shutdown.clone())); + } - self.obj().add_pad(&srcpad).expect("Failed to add pad"); + for (track_name, config) in catalog.audio.renditions { + let descriptor = TrackDescriptor { + kind: TrackKind::Audio, + name: track_name.clone(), + }; + let caps = audio_caps(&config)?; + let endpoint = request_pad(&control_tx, descriptor.clone(), caps).await?; + let track_ref = moq_lite::Track::new(&track_name); + let track_consumer = broadcast.subscribe_track(&track_ref)?; + let track = hang::container::OrderedConsumer::new(track_consumer, Duration::from_secs(1)); + tasks.push(spawn_track_pump(track, descriptor, endpoint, shutdown.clone())); + } - // Push to the srcpad in a background task. - let mut reference = None; - let handle = tokio::spawn(async move { - loop { - match track.read().await { - Ok(Some(frame)) => { - let payload: Vec = frame.payload.into_iter().flatten().collect(); - let mut buffer = gst::Buffer::from_slice(payload); - let buffer_mut = buffer.get_mut().unwrap(); + let _ = control_tx.send(ControlMessage::NoMorePads); - // Make timestamps relative to the first frame for proper playback - let pts = if let Some(reference_ts) = reference { - let timestamp: std::time::Duration = (frame.timestamp - reference_ts).into(); - gst::ClockTime::from_nseconds(timestamp.as_nanos() as _) - } else { - reference = Some(frame.timestamp); - gst::ClockTime::ZERO - }; - buffer_mut.set_pts(Some(pts)); + for task in tasks { + let _ = task.await; + } - let mut flags = buffer_mut.flags(); - flags.remove(gst::BufferFlags::DELTA_UNIT); - buffer_mut.set_flags(flags); + Ok(()) +} - gst::info!(CAT, "pushing sample: {:?}", buffer); +async fn request_pad( + control_tx: &mpsc::UnboundedSender, + descriptor: TrackDescriptor, + caps: gst::Caps, +) -> Result { + let (reply_tx, reply_rx) = oneshot::channel(); + control_tx + .send(ControlMessage::CreatePad { + descriptor, + caps, + reply: reply_tx, + }) + .map_err(|_| anyhow::anyhow!("control plane shut down"))?; + + let endpoint = reply_rx.await.context("pad creation cancelled")?; + Ok(endpoint) +} - if let Err(err) = srcpad.push(buffer) { - gst::warning!(CAT, "Failed to push sample: {:?}", err); +fn spawn_track_pump( + mut track: hang::container::OrderedConsumer, + descriptor: TrackDescriptor, + pad_endpoint: PadEndpoint, + mut shutdown: watch::Receiver, +) -> tokio::task::JoinHandle<()> { + RUNTIME.spawn(async move { + let mut reference_ts = None; + loop { + tokio::select! { + _ = shutdown.changed() => { + pad_endpoint.send(PadMessage::Drop); + break; + } + frame = track.read() => { + match frame { + Ok(Some(frame)) => { + let timestamp = frame.timestamp; + let is_keyframe = frame.is_keyframe(); + let payload = frame.payload; + let mut buffer = gst::Buffer::from_slice(payload.into_iter().flatten().collect::>()); + let buffer_mut = buffer.get_mut().unwrap(); + + let pts = match reference_ts { + Some(reference) => { + let delta: Duration = (timestamp - reference).into(); + gst::ClockTime::from_nseconds(delta.as_nanos() as u64) + } + None => { + reference_ts = Some(timestamp); + gst::ClockTime::ZERO + } + }; + buffer_mut.set_pts(Some(pts)); + + let mut flags = buffer_mut.flags(); + match descriptor.kind { + TrackKind::Video => { + if is_keyframe { + flags.remove(gst::BufferFlags::DELTA_UNIT); + } else { + flags.insert(gst::BufferFlags::DELTA_UNIT); + } + } + TrackKind::Audio => { + flags.remove(gst::BufferFlags::DELTA_UNIT); } } - Ok(None) => { - // Stream ended normally - gst::info!(CAT, "Stream ended normally"); - break; - } - Err(e) => { - // Handle connection errors gracefully - gst::warning!(CAT, "Failed to read frame: {:?}", e); + buffer_mut.set_flags(flags); + + if !pad_endpoint.send(PadMessage::Buffer(buffer)) { break; } } + Ok(None) => { + pad_endpoint.send(PadMessage::Eos); + pad_endpoint.send(PadMessage::Drop); + break; + } + Err(err) => { + gst::warning!(CAT, "track {} failed: {err:?}", descriptor.name); + pad_endpoint.send(PadMessage::Drop); + break; + } } - }); - self.tasks.lock().unwrap().push(handle); + } } } + }) +} - // We downloaded the catalog and created all the pads. - self.obj().no_more_pads(); +fn video_caps(config: &hang::catalog::VideoConfig) -> Result { + use hang::catalog::VideoCodec; + + let caps = match &config.codec { + VideoCodec::H264(_) => { + let mut builder = gst::Caps::builder("video/x-h264").field("alignment", "au"); + if let Some(description) = &config.description { + builder = builder + .field("stream-format", "avc") + .field("codec_data", gst::Buffer::from_slice(description.clone())); + } else { + builder = builder.field("stream-format", "annexb"); + } + builder.build() + } + VideoCodec::H265(h265) => { + let mut builder = gst::Caps::builder("video/x-h265").field("alignment", "au"); + match &config.description { + Some(description) => { + let format = if h265.in_band { "hev1" } else { "hvc1" }; + builder = builder + .field("stream-format", format) + .field("codec_data", gst::Buffer::from_slice(description.clone())); + } + None => { + let format = if h265.in_band { "hev1" } else { "byte-stream" }; + builder = builder.field("stream-format", format); + } + } + builder.build() + } + VideoCodec::AV1(_) => { + let mut builder = gst::Caps::builder("video/x-av1"); + if let Some(description) = &config.description { + builder = builder.field("codec_data", gst::Buffer::from_slice(description.clone())); + } + builder.build() + } + other => bail!("unsupported video codec: {other:?}"), + }; + Ok(caps) +} - Ok(()) - } +fn audio_caps(config: &hang::catalog::AudioConfig) -> Result { + let caps = match &config.codec { + hang::catalog::AudioCodec::AAC(_) => { + let mut builder = gst::Caps::builder("audio/mpeg") + .field("mpegversion", 4) + .field("rate", config.sample_rate) + .field("channels", config.channel_count); + if let Some(description) = &config.description { + builder = builder + .field("codec_data", gst::Buffer::from_slice(description.clone())) + .field("stream-format", "aac"); + } else { + builder = builder.field("stream-format", "adts"); + } + builder.build() + } + hang::catalog::AudioCodec::Opus => { + let mut builder = gst::Caps::builder("audio/x-opus") + .field("rate", config.sample_rate) + .field("channels", config.channel_count); + if let Some(description) = &config.description { + builder = builder + .field("codec_data", gst::Buffer::from_slice(description.clone())) + .field("stream-format", "ogg"); + } + builder.build() + } + other => bail!("unsupported audio codec: {other:?}"), + }; + Ok(caps) +} - fn cleanup(&self) { - for task in self.tasks.lock().unwrap().drain(..) { - task.abort(); +fn spawn_main_context_forwarder( + context: &glib::MainContext, + mut rx: mpsc::UnboundedReceiver, + mut handler: F, +) -> glib::JoinHandle<()> +where + T: Send + 'static, + F: FnMut(T) -> bool + 'static, +{ + let ctx = context.clone(); + ctx.spawn_local(async move { + while let Some(msg) = rx.recv().await { + if !handler(msg) { + break; + } } - *self.session.lock().unwrap() = None; + }) +} + +#[cfg(test)] +mod tests { + use super::*; + use std::cell::{Cell, RefCell}; + use std::rc::Rc; + + #[test] + fn forwarder_delivers_messages_in_order() { + let context = glib::MainContext::new(); + context + .with_thread_default(|| { + let (tx, rx) = mpsc::unbounded_channel(); + let received = Rc::new(RefCell::new(Vec::new())); + let done = Rc::new(Cell::new(false)); + + let handle = spawn_main_context_forwarder(&context, rx, { + let received = Rc::clone(&received); + let done = Rc::clone(&done); + move |msg: i32| { + received.borrow_mut().push(msg); + if received.borrow().len() >= 3 { + done.set(true); + false + } else { + true + } + } + }); + + tx.send(1).unwrap(); + tx.send(2).unwrap(); + tx.send(3).unwrap(); + drop(tx); + + while !done.get() { + context.iteration(true); + } + + handle.abort(); + + assert_eq!(*received.borrow(), vec![1, 2, 3]); + }) + .unwrap(); } } From 92c66ac7834e0d3164adc46ae73262c815323f00 Mon Sep 17 00:00:00 2001 From: Steve McFarlin Date: Wed, 4 Mar 2026 18:29:35 -0800 Subject: [PATCH 3/5] Change mutability due to warning --- src/sink/imp.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/sink/imp.rs b/src/sink/imp.rs index 11967fb..903841f 100644 --- a/src/sink/imp.rs +++ b/src/sink/imp.rs @@ -362,7 +362,7 @@ async fn run_session(settings: ResolvedSettings, mut rx: mpsc::Receiver catalog, Err(err) => { gst::error!(CAT, "failed to create catalog: {err:#}"); From 74aa88b2e7fc172b0b8b1288a4e6e3bbd8bd41ef Mon Sep 17 00:00:00 2001 From: Luke Curley Date: Thu, 5 Mar 2026 13:35:17 -0800 Subject: [PATCH 4/5] cargo fmt --- src/sink/imp.rs | 800 ++++++++++++++++++++++++------------------------ 1 file changed, 395 insertions(+), 405 deletions(-) diff --git a/src/sink/imp.rs b/src/sink/imp.rs index 903841f..3f2a92f 100644 --- a/src/sink/imp.rs +++ b/src/sink/imp.rs @@ -18,475 +18,465 @@ use tokio::sync::mpsc; use url::Url; use hang::moq_lite; -use moq_mux; -use moq_native; static RUNTIME: LazyLock = LazyLock::new(|| { - tokio::runtime::Builder::new_multi_thread() - .enable_all() - .worker_threads(4) - .build() - .expect("spawn tokio runtime") + tokio::runtime::Builder::new_multi_thread() + .enable_all() + .worker_threads(4) + .build() + .expect("spawn tokio runtime") }); static CAT: LazyLock = LazyLock::new(|| { - gst::DebugCategory::new( - "moq-sink", - gst::DebugColorFlags::empty(), - Some("MoQ Sink (async element)"), - ) + gst::DebugCategory::new( + "moq-sink", + gst::DebugColorFlags::empty(), + Some("MoQ Sink (async element)"), + ) }); #[derive(Debug, Clone, Default)] struct Settings { - url: Option, - broadcast: Option, - tls_disable_verify: bool, + url: Option, + broadcast: Option, + tls_disable_verify: bool, } #[derive(Debug, Clone)] struct ResolvedSettings { - url: Url, - broadcast: String, - tls_disable_verify: bool, + url: Url, + broadcast: String, + tls_disable_verify: bool, } impl TryFrom for ResolvedSettings { - type Error = anyhow::Error; - - fn try_from(value: Settings) -> Result { - Ok(Self { - url: Url::parse(value.url.as_ref().context("url property is required")?)?, - broadcast: value - .broadcast - .as_ref() - .context("broadcast property is required")? - .clone(), - tls_disable_verify: value.tls_disable_verify, - }) - } + type Error = anyhow::Error; + + fn try_from(value: Settings) -> Result { + Ok(Self { + url: Url::parse(value.url.as_ref().context("url property is required")?)?, + broadcast: value + .broadcast + .as_ref() + .context("broadcast property is required")? + .clone(), + tls_disable_verify: value.tls_disable_verify, + }) + } } #[derive(Debug)] struct SessionHandle { - sender: mpsc::Sender, - join: tokio::task::JoinHandle<()>, + sender: mpsc::Sender, + join: tokio::task::JoinHandle<()>, } impl SessionHandle { - fn stop(self) { - let _ = self.sender.blocking_send(ControlMessage::Shutdown); - RUNTIME.spawn(async move { - if let Err(err) = self.join.await { - gst::warning!(CAT, "session task ended with error: {err:?}"); - } - }); - } + fn stop(self) { + let _ = self.sender.blocking_send(ControlMessage::Shutdown); + RUNTIME.spawn(async move { + if let Err(err) = self.join.await { + gst::warning!(CAT, "session task ended with error: {err:?}"); + } + }); + } } struct PadState { - decoder: moq_mux::import::Decoder, - reference_pts: Option, + decoder: moq_mux::import::Decoder, + reference_pts: Option, } struct RuntimeState { - #[allow(dead_code)] - session: moq_lite::Session, - broadcast: moq_lite::BroadcastProducer, - catalog: moq_mux::CatalogProducer, - pads: HashMap, + #[allow(dead_code)] + session: moq_lite::Session, + broadcast: moq_lite::BroadcastProducer, + catalog: moq_mux::CatalogProducer, + pads: HashMap, } #[derive(Debug)] enum ControlMessage { - SetCaps { pad_name: String, caps: gst::Caps }, - Buffer { - pad_name: String, - data: Bytes, - pts: Option, - }, - DropPad { pad_name: String }, - Shutdown, + SetCaps { + pad_name: String, + caps: gst::Caps, + }, + Buffer { + pad_name: String, + data: Bytes, + pts: Option, + }, + DropPad { + pad_name: String, + }, + Shutdown, } #[derive(Default)] pub struct MoqSink { - settings: Mutex, - session: Mutex>, + settings: Mutex, + session: Mutex>, } #[glib::object_subclass] impl ObjectSubclass for MoqSink { - const NAME: &'static str = "MoqSink"; - type Type = super::MoqSink; - type ParentType = gst::Element; + const NAME: &'static str = "MoqSink"; + type Type = super::MoqSink; + type ParentType = gst::Element; } impl ObjectImpl for MoqSink { - fn properties() -> &'static [glib::ParamSpec] { - static PROPS: LazyLock> = LazyLock::new(|| { - vec![ - glib::ParamSpecString::builder("url") - .nick("Source URL") - .blurb("Connect to the given URL") - .build(), - glib::ParamSpecString::builder("broadcast") - .nick("Broadcast") - .blurb("The name of the broadcast to publish") - .build(), - glib::ParamSpecBoolean::builder("tls-disable-verify") - .nick("TLS disable verify") - .blurb("Disable TLS verification") - .default_value(false) - .build(), - ] - }); - PROPS.as_ref() - } - - fn set_property(&self, _id: usize, value: &glib::Value, pspec: &glib::ParamSpec) { - let mut settings = self.settings.lock().unwrap(); - match pspec.name() { - "url" => settings.url = value.get().unwrap(), - "broadcast" => settings.broadcast = value.get().unwrap(), - "tls-disable-verify" => settings.tls_disable_verify = value.get().unwrap(), - _ => unreachable!(), - } - } - - fn property(&self, _id: usize, pspec: &glib::ParamSpec) -> glib::Value { - let settings = self.settings.lock().unwrap(); - match pspec.name() { - "url" => settings.url.to_value(), - "broadcast" => settings.broadcast.to_value(), - "tls-disable-verify" => settings.tls_disable_verify.to_value(), - _ => unreachable!(), - } - } + fn properties() -> &'static [glib::ParamSpec] { + static PROPS: LazyLock> = LazyLock::new(|| { + vec![ + glib::ParamSpecString::builder("url") + .nick("Source URL") + .blurb("Connect to the given URL") + .build(), + glib::ParamSpecString::builder("broadcast") + .nick("Broadcast") + .blurb("The name of the broadcast to publish") + .build(), + glib::ParamSpecBoolean::builder("tls-disable-verify") + .nick("TLS disable verify") + .blurb("Disable TLS verification") + .default_value(false) + .build(), + ] + }); + PROPS.as_ref() + } + + fn set_property(&self, _id: usize, value: &glib::Value, pspec: &glib::ParamSpec) { + let mut settings = self.settings.lock().unwrap(); + match pspec.name() { + "url" => settings.url = value.get().unwrap(), + "broadcast" => settings.broadcast = value.get().unwrap(), + "tls-disable-verify" => settings.tls_disable_verify = value.get().unwrap(), + _ => unreachable!(), + } + } + + fn property(&self, _id: usize, pspec: &glib::ParamSpec) -> glib::Value { + let settings = self.settings.lock().unwrap(); + match pspec.name() { + "url" => settings.url.to_value(), + "broadcast" => settings.broadcast.to_value(), + "tls-disable-verify" => settings.tls_disable_verify.to_value(), + _ => unreachable!(), + } + } } impl GstObjectImpl for MoqSink {} impl ElementImpl for MoqSink { - fn metadata() -> Option<&'static gst::subclass::ElementMetadata> { - static ELEMENT_METADATA: LazyLock = LazyLock::new(|| { - gst::subclass::ElementMetadata::new( - "MoQ Sink (async)", - "Sink/Network/MoQ", - "Transmits media over MoQ", - "Luke Curley , Steve McFarlin ", - ) - }); - Some(&*ELEMENT_METADATA) - } - - fn pad_templates() -> &'static [gst::PadTemplate] { - static PAD_TEMPLATES: LazyLock> = LazyLock::new(|| { - let mut caps = gst::Caps::new_empty(); - caps.merge( - gst::Caps::builder("video/x-h264") - .field("stream-format", "byte-stream") - .field("alignment", "au") - .build(), - ); - caps.merge( - gst::Caps::builder("video/x-h265") - .field("stream-format", "byte-stream") - .field("alignment", "au") - .build(), - ); - caps.merge(gst::Caps::builder("video/x-av1").build()); - caps.merge( - gst::Caps::builder("audio/mpeg") - .field("mpegversion", 4i32) - .field("stream-format", "raw") - .build(), - ); - caps.merge(gst::Caps::builder("audio/x-opus").build()); - - let templ = gst::PadTemplate::new( - "sink_%u", - gst::PadDirection::Sink, - gst::PadPresence::Request, - &caps, - ) - .unwrap(); - vec![templ] - }); - PAD_TEMPLATES.as_ref() - } - - fn request_new_pad( - &self, - templ: &gst::PadTemplate, - name: Option<&str>, - _caps: Option<&gst::Caps>, - ) -> Option { - let pad_builder = gst::Pad::builder_from_template(templ) - .chain_function(|pad, parent, buffer| { - let element = parent - .and_then(|p| p.downcast_ref::()) - .ok_or(gst::FlowError::Error)?; - element.imp().forward_buffer(pad, buffer) - }) - .event_function(|pad, parent, event| { - let Some(element) = parent.and_then(|p| p.downcast_ref::()) else { - return false; - }; - element.imp().forward_event(pad, event) - }); - - let pad = if let Some(name) = name { - pad_builder.name(name).build() - } else { - pad_builder.build() - }; - - self.obj().add_pad(&pad).ok()?; - Some(pad) - } - - fn release_pad(&self, pad: &gst::Pad) { - if let Some(session) = self.session.lock().unwrap().as_ref() { - let _ = session - .sender - .blocking_send(ControlMessage::DropPad { pad_name: pad.name().to_string() }); - } - let _ = self.obj().remove_pad(pad); - } - - fn change_state(&self, transition: gst::StateChange) -> Result { - match transition { - gst::StateChange::ReadyToPaused => { - self.start_session().map_err(|err| { - gst::error!(CAT, obj = self.obj(), "failed to start session: {err:#}"); - gst::StateChangeError - })?; - } - gst::StateChange::PausedToReady => self.stop_session(), - _ => (), - } - - self.parent_change_state(transition) - } + fn metadata() -> Option<&'static gst::subclass::ElementMetadata> { + static ELEMENT_METADATA: LazyLock = LazyLock::new(|| { + gst::subclass::ElementMetadata::new( + "MoQ Sink (async)", + "Sink/Network/MoQ", + "Transmits media over MoQ", + "Luke Curley , Steve McFarlin ", + ) + }); + Some(&*ELEMENT_METADATA) + } + + fn pad_templates() -> &'static [gst::PadTemplate] { + static PAD_TEMPLATES: LazyLock> = LazyLock::new(|| { + let mut caps = gst::Caps::new_empty(); + caps.merge( + gst::Caps::builder("video/x-h264") + .field("stream-format", "byte-stream") + .field("alignment", "au") + .build(), + ); + caps.merge( + gst::Caps::builder("video/x-h265") + .field("stream-format", "byte-stream") + .field("alignment", "au") + .build(), + ); + caps.merge(gst::Caps::builder("video/x-av1").build()); + caps.merge( + gst::Caps::builder("audio/mpeg") + .field("mpegversion", 4i32) + .field("stream-format", "raw") + .build(), + ); + caps.merge(gst::Caps::builder("audio/x-opus").build()); + + let templ = + gst::PadTemplate::new("sink_%u", gst::PadDirection::Sink, gst::PadPresence::Request, &caps).unwrap(); + vec![templ] + }); + PAD_TEMPLATES.as_ref() + } + + fn request_new_pad( + &self, + templ: &gst::PadTemplate, + name: Option<&str>, + _caps: Option<&gst::Caps>, + ) -> Option { + let pad_builder = gst::Pad::builder_from_template(templ) + .chain_function(|pad, parent, buffer| { + let element = parent + .and_then(|p| p.downcast_ref::()) + .ok_or(gst::FlowError::Error)?; + element.imp().forward_buffer(pad, buffer) + }) + .event_function(|pad, parent, event| { + let Some(element) = parent.and_then(|p| p.downcast_ref::()) else { + return false; + }; + element.imp().forward_event(pad, event) + }); + + let pad = if let Some(name) = name { + pad_builder.name(name).build() + } else { + pad_builder.build() + }; + + self.obj().add_pad(&pad).ok()?; + Some(pad) + } + + fn release_pad(&self, pad: &gst::Pad) { + if let Some(session) = self.session.lock().unwrap().as_ref() { + let _ = session.sender.blocking_send(ControlMessage::DropPad { + pad_name: pad.name().to_string(), + }); + } + let _ = self.obj().remove_pad(pad); + } + + fn change_state(&self, transition: gst::StateChange) -> Result { + match transition { + gst::StateChange::ReadyToPaused => { + self.start_session().map_err(|err| { + gst::error!(CAT, obj = self.obj(), "failed to start session: {err:#}"); + gst::StateChangeError + })?; + } + gst::StateChange::PausedToReady => self.stop_session(), + _ => (), + } + + self.parent_change_state(transition) + } } impl MoqSink { - fn start_session(&self) -> Result<()> { - let settings = { - let settings = self.settings.lock().unwrap().clone(); - ResolvedSettings::try_from(settings)? - }; - - let (tx, rx) = mpsc::channel::(128); - let join = RUNTIME.spawn(run_session(settings, rx)); - - *self.session.lock().unwrap() = Some(SessionHandle { sender: tx, join }); - Ok(()) - } - - fn stop_session(&self) { - if let Some(handle) = self.session.lock().unwrap().take() { - handle.stop(); - } - } - - fn forward_buffer(&self, pad: &gst::Pad, buffer: gst::Buffer) -> Result { - let sender = self - .session - .lock() - .unwrap() - .as_ref() - .map(|handle| handle.sender.clone()) - .ok_or(gst::FlowError::Flushing)?; - - let map = buffer.map_readable().map_err(|_| gst::FlowError::Error)?; - let pts = buffer.pts(); - let data = Bytes::copy_from_slice(map.as_slice()); - - sender - .blocking_send(ControlMessage::Buffer { - pad_name: pad.name().to_string(), - data, - pts, - }) - .map_err(|_| gst::FlowError::Flushing)?; - - Ok(gst::FlowSuccess::Ok) - } - - fn forward_event(&self, pad: &gst::Pad, event: gst::Event) -> bool { - match event.view() { - gst::EventView::Caps(caps) => { - let sender = match self - .session - .lock() - .unwrap() - .as_ref() - .map(|handle| handle.sender.clone()) - { - Some(sender) => sender, - None => return false, - }; - - if sender - .blocking_send(ControlMessage::SetCaps { - pad_name: pad.name().to_string(), - caps: caps.caps().to_owned(), - }) - .is_err() - { - return false; - } - - true - } - _ => gst::Pad::event_default(pad, Some(&*self.obj()), event), - } - } + fn start_session(&self) -> Result<()> { + let settings = { + let settings = self.settings.lock().unwrap().clone(); + ResolvedSettings::try_from(settings)? + }; + + let (tx, rx) = mpsc::channel::(128); + let join = RUNTIME.spawn(run_session(settings, rx)); + + *self.session.lock().unwrap() = Some(SessionHandle { sender: tx, join }); + Ok(()) + } + + fn stop_session(&self) { + if let Some(handle) = self.session.lock().unwrap().take() { + handle.stop(); + } + } + + fn forward_buffer(&self, pad: &gst::Pad, buffer: gst::Buffer) -> Result { + let sender = self + .session + .lock() + .unwrap() + .as_ref() + .map(|handle| handle.sender.clone()) + .ok_or(gst::FlowError::Flushing)?; + + let map = buffer.map_readable().map_err(|_| gst::FlowError::Error)?; + let pts = buffer.pts(); + let data = Bytes::copy_from_slice(map.as_slice()); + + sender + .blocking_send(ControlMessage::Buffer { + pad_name: pad.name().to_string(), + data, + pts, + }) + .map_err(|_| gst::FlowError::Flushing)?; + + Ok(gst::FlowSuccess::Ok) + } + + fn forward_event(&self, pad: &gst::Pad, event: gst::Event) -> bool { + match event.view() { + gst::EventView::Caps(caps) => { + let sender = match self + .session + .lock() + .unwrap() + .as_ref() + .map(|handle| handle.sender.clone()) + { + Some(sender) => sender, + None => return false, + }; + + if sender + .blocking_send(ControlMessage::SetCaps { + pad_name: pad.name().to_string(), + caps: caps.caps().to_owned(), + }) + .is_err() + { + return false; + } + + true + } + _ => gst::Pad::event_default(pad, Some(&*self.obj()), event), + } + } } async fn run_session(settings: ResolvedSettings, mut rx: mpsc::Receiver) { - let mut client_config = moq_native::ClientConfig::default(); - client_config.tls.disable_verify = Some(settings.tls_disable_verify); - - let client = match client_config.init() { - Ok(client) => client, - Err(err) => { - gst::error!(CAT, "failed to init client: {err:#}"); - return; - } - }; - - let origin = moq_lite::Origin::produce(); - let mut broadcast = moq_lite::Broadcast::produce(); - let broadcast_consumer = broadcast.consume(); - - let catalog = match moq_mux::CatalogProducer::new(&mut broadcast) { - Ok(catalog) => catalog, - Err(err) => { - gst::error!(CAT, "failed to create catalog: {err:#}"); - return; - } - }; - - if !origin.publish_broadcast(&settings.broadcast, broadcast_consumer) { - gst::error!(CAT, "failed to publish broadcast {}", settings.broadcast); - return; - } - - let client = client.with_publish(origin.consume()); - - let session = match client.connect(settings.url.clone()).await { - Ok(session) => session, - Err(err) => { - gst::error!(CAT, "failed to connect: {err:#}"); - return; - } - }; - - let mut runtime = RuntimeState { - session, - broadcast, - catalog, - pads: HashMap::new(), - }; - - while let Some(msg) = rx.recv().await { - match msg { - ControlMessage::SetCaps { pad_name, caps } => { - if let Err(err) = handle_caps(&mut runtime, pad_name, caps) { - gst::error!(CAT, "failed to configure pad: {err:#}"); - } - } - ControlMessage::Buffer { pad_name, data, pts } => { - if let Err(err) = handle_buffer(&mut runtime, pad_name, data, pts) { - gst::error!(CAT, "failed to publish buffer: {err:#}"); - } - } - ControlMessage::DropPad { pad_name } => { - runtime.pads.remove(&pad_name); - } - ControlMessage::Shutdown => break, - } - } + let mut client_config = moq_native::ClientConfig::default(); + client_config.tls.disable_verify = Some(settings.tls_disable_verify); + + let client = match client_config.init() { + Ok(client) => client, + Err(err) => { + gst::error!(CAT, "failed to init client: {err:#}"); + return; + } + }; + + let origin = moq_lite::Origin::produce(); + let mut broadcast = moq_lite::Broadcast::produce(); + let broadcast_consumer = broadcast.consume(); + + let catalog = match moq_mux::CatalogProducer::new(&mut broadcast) { + Ok(catalog) => catalog, + Err(err) => { + gst::error!(CAT, "failed to create catalog: {err:#}"); + return; + } + }; + + if !origin.publish_broadcast(&settings.broadcast, broadcast_consumer) { + gst::error!(CAT, "failed to publish broadcast {}", settings.broadcast); + return; + } + + let client = client.with_publish(origin.consume()); + + let session = match client.connect(settings.url.clone()).await { + Ok(session) => session, + Err(err) => { + gst::error!(CAT, "failed to connect: {err:#}"); + return; + } + }; + + let mut runtime = RuntimeState { + session, + broadcast, + catalog, + pads: HashMap::new(), + }; + + while let Some(msg) = rx.recv().await { + match msg { + ControlMessage::SetCaps { pad_name, caps } => { + if let Err(err) = handle_caps(&mut runtime, pad_name, caps) { + gst::error!(CAT, "failed to configure pad: {err:#}"); + } + } + ControlMessage::Buffer { pad_name, data, pts } => { + if let Err(err) = handle_buffer(&mut runtime, pad_name, data, pts) { + gst::error!(CAT, "failed to publish buffer: {err:#}"); + } + } + ControlMessage::DropPad { pad_name } => { + runtime.pads.remove(&pad_name); + } + ControlMessage::Shutdown => break, + } + } } fn handle_caps(runtime: &mut RuntimeState, pad_name: String, caps: gst::Caps) -> Result<()> { - let structure = caps.structure(0).context("empty caps")?; - let decoder: moq_mux::import::Decoder = match structure.name().as_str() { - "video/x-h264" => { - let mut bytes = Bytes::new(); - new_decoder(runtime, moq_mux::import::DecoderFormat::Avc3, &mut bytes)? - } - "video/x-h265" => { - let mut bytes = Bytes::new(); - new_decoder(runtime, moq_mux::import::DecoderFormat::Hev1, &mut bytes)? - } - "video/x-av1" => { - let mut bytes = Bytes::new(); - new_decoder(runtime, moq_mux::import::DecoderFormat::Av01, &mut bytes)? - } - "audio/mpeg" => { - let codec_data = structure - .get::("codec_data") - .context("AAC caps missing codec_data")?; - let map = codec_data.map_readable().context("failed to map codec_data")?; - let mut data = Bytes::copy_from_slice(map.as_slice()); - new_decoder(runtime, moq_mux::import::DecoderFormat::Aac, &mut data)? - } - "audio/x-opus" => { - let channels: i32 = structure.get("channels").unwrap_or(2); - let rate: i32 = structure.get("rate").unwrap_or(48_000); - let config = moq_mux::import::OpusConfig { - sample_rate: rate as u32, - channel_count: channels as u32, - }; - moq_mux::import::Opus::new(runtime.broadcast.clone(), runtime.catalog.clone(), config)?.into() - } - other => anyhow::bail!("unsupported caps: {}", other), - }; - - runtime.pads.insert( - pad_name, - PadState { - decoder, - reference_pts: None, - }, - ); - Ok(()) + let structure = caps.structure(0).context("empty caps")?; + let decoder: moq_mux::import::Decoder = match structure.name().as_str() { + "video/x-h264" => { + let mut bytes = Bytes::new(); + new_decoder(runtime, moq_mux::import::DecoderFormat::Avc3, &mut bytes)? + } + "video/x-h265" => { + let mut bytes = Bytes::new(); + new_decoder(runtime, moq_mux::import::DecoderFormat::Hev1, &mut bytes)? + } + "video/x-av1" => { + let mut bytes = Bytes::new(); + new_decoder(runtime, moq_mux::import::DecoderFormat::Av01, &mut bytes)? + } + "audio/mpeg" => { + let codec_data = structure + .get::("codec_data") + .context("AAC caps missing codec_data")?; + let map = codec_data.map_readable().context("failed to map codec_data")?; + let mut data = Bytes::copy_from_slice(map.as_slice()); + new_decoder(runtime, moq_mux::import::DecoderFormat::Aac, &mut data)? + } + "audio/x-opus" => { + let channels: i32 = structure.get("channels").unwrap_or(2); + let rate: i32 = structure.get("rate").unwrap_or(48_000); + let config = moq_mux::import::OpusConfig { + sample_rate: rate as u32, + channel_count: channels as u32, + }; + moq_mux::import::Opus::new(runtime.broadcast.clone(), runtime.catalog.clone(), config)?.into() + } + other => anyhow::bail!("unsupported caps: {}", other), + }; + + runtime.pads.insert( + pad_name, + PadState { + decoder, + reference_pts: None, + }, + ); + Ok(()) } fn new_decoder( - runtime: &mut RuntimeState, - format: moq_mux::import::DecoderFormat, - buf: &mut Bytes, + runtime: &mut RuntimeState, + format: moq_mux::import::DecoderFormat, + buf: &mut Bytes, ) -> Result { - let decoder = moq_mux::import::Decoder::new( - runtime.broadcast.clone(), - runtime.catalog.clone(), - format, - buf, - )?; - Ok(decoder) + let decoder = moq_mux::import::Decoder::new(runtime.broadcast.clone(), runtime.catalog.clone(), format, buf)?; + Ok(decoder) } fn handle_buffer( - runtime: &mut RuntimeState, - pad_name: String, - mut data: Bytes, - pts: Option, + runtime: &mut RuntimeState, + pad_name: String, + mut data: Bytes, + pts: Option, ) -> Result<()> { - let pad = runtime - .pads - .get_mut(&pad_name) - .context("pad not configured")?; - - let ts = pts.and_then(|pts| { - let reference = *pad.reference_pts.get_or_insert(pts); - let relative = pts.checked_sub(reference)?; - hang::container::Timestamp::from_micros(relative.nseconds() / 1000).ok() - }); - - pad.decoder.decode_frame(&mut data, ts).map_err(|e| anyhow::anyhow!(e)) + let pad = runtime.pads.get_mut(&pad_name).context("pad not configured")?; + + let ts = pts.and_then(|pts| { + let reference = *pad.reference_pts.get_or_insert(pts); + let relative = pts.checked_sub(reference)?; + hang::container::Timestamp::from_micros(relative.nseconds() / 1000).ok() + }); + + pad.decoder.decode_frame(&mut data, ts).map_err(|e| anyhow::anyhow!(e)) } From 0ebb66424ffa322a6c9efa8ef20d24a467e870dd Mon Sep 17 00:00:00 2001 From: Luke Curley Date: Fri, 6 Mar 2026 15:07:27 -0800 Subject: [PATCH 5/5] Address PR review feedback - Remove worker_threads(4) from both runtimes (Quinn doesn't scale past 1) - Switch sink to unbounded channel - Make sink run_session return Result<()> for centralized error handling - Extract spawn body into named run_track_pump function in source - Remove unnecessary forwarder test - Minor cleanups: remove #![allow(dead_code)], simplify destructure, collapse _session binding Co-Authored-By: Claude Opus 4.6 --- src/sink/imp.rs | 61 +++++++---------- src/source/imp.rs | 166 +++++++++++++++++----------------------------- 2 files changed, 85 insertions(+), 142 deletions(-) diff --git a/src/sink/imp.rs b/src/sink/imp.rs index 3f2a92f..aca9e15 100644 --- a/src/sink/imp.rs +++ b/src/sink/imp.rs @@ -1,10 +1,8 @@ -#![allow(dead_code)] - //! Async-friendly MoqSink that keeps the original dynamic-pad Element //! behavior while pushing all network setup and CMAF publishing work into //! a Tokio task. The GLib state change thread never blocks, pads still get //! requested dynamically, and each pad simply forwards buffers/events to the -//! background worker via a bounded channel. +//! background worker via an unbounded channel. use std::collections::HashMap; use std::sync::{LazyLock, Mutex}; @@ -22,7 +20,6 @@ use hang::moq_lite; static RUNTIME: LazyLock = LazyLock::new(|| { tokio::runtime::Builder::new_multi_thread() .enable_all() - .worker_threads(4) .build() .expect("spawn tokio runtime") }); @@ -67,13 +64,13 @@ impl TryFrom for ResolvedSettings { #[derive(Debug)] struct SessionHandle { - sender: mpsc::Sender, + sender: mpsc::UnboundedSender, join: tokio::task::JoinHandle<()>, } impl SessionHandle { fn stop(self) { - let _ = self.sender.blocking_send(ControlMessage::Shutdown); + let _ = self.sender.send(ControlMessage::Shutdown); RUNTIME.spawn(async move { if let Err(err) = self.join.await { gst::warning!(CAT, "session task ended with error: {err:?}"); @@ -246,7 +243,7 @@ impl ElementImpl for MoqSink { fn release_pad(&self, pad: &gst::Pad) { if let Some(session) = self.session.lock().unwrap().as_ref() { - let _ = session.sender.blocking_send(ControlMessage::DropPad { + let _ = session.sender.send(ControlMessage::DropPad { pad_name: pad.name().to_string(), }); } @@ -276,8 +273,12 @@ impl MoqSink { ResolvedSettings::try_from(settings)? }; - let (tx, rx) = mpsc::channel::(128); - let join = RUNTIME.spawn(run_session(settings, rx)); + let (tx, rx) = mpsc::unbounded_channel::(); + let join = RUNTIME.spawn(async move { + if let Err(err) = run_session(settings, rx).await { + gst::error!(CAT, "session error: {err:#}"); + } + }); *self.session.lock().unwrap() = Some(SessionHandle { sender: tx, join }); Ok(()) @@ -303,7 +304,7 @@ impl MoqSink { let data = Bytes::copy_from_slice(map.as_slice()); sender - .blocking_send(ControlMessage::Buffer { + .send(ControlMessage::Buffer { pad_name: pad.name().to_string(), data, pts, @@ -328,7 +329,7 @@ impl MoqSink { }; if sender - .blocking_send(ControlMessage::SetCaps { + .send(ControlMessage::SetCaps { pad_name: pad.name().to_string(), caps: caps.caps().to_owned(), }) @@ -344,44 +345,26 @@ impl MoqSink { } } -async fn run_session(settings: ResolvedSettings, mut rx: mpsc::Receiver) { +async fn run_session(settings: ResolvedSettings, mut rx: mpsc::UnboundedReceiver) -> Result<()> { let mut client_config = moq_native::ClientConfig::default(); client_config.tls.disable_verify = Some(settings.tls_disable_verify); - let client = match client_config.init() { - Ok(client) => client, - Err(err) => { - gst::error!(CAT, "failed to init client: {err:#}"); - return; - } - }; + let client = client_config.init()?; let origin = moq_lite::Origin::produce(); let mut broadcast = moq_lite::Broadcast::produce(); let broadcast_consumer = broadcast.consume(); - let catalog = match moq_mux::CatalogProducer::new(&mut broadcast) { - Ok(catalog) => catalog, - Err(err) => { - gst::error!(CAT, "failed to create catalog: {err:#}"); - return; - } - }; + let catalog = moq_mux::CatalogProducer::new(&mut broadcast)?; - if !origin.publish_broadcast(&settings.broadcast, broadcast_consumer) { - gst::error!(CAT, "failed to publish broadcast {}", settings.broadcast); - return; - } + anyhow::ensure!( + origin.publish_broadcast(&settings.broadcast, broadcast_consumer), + "failed to publish broadcast {}", + settings.broadcast + ); let client = client.with_publish(origin.consume()); - - let session = match client.connect(settings.url.clone()).await { - Ok(session) => session, - Err(err) => { - gst::error!(CAT, "failed to connect: {err:#}"); - return; - } - }; + let session = client.connect(settings.url.clone()).await?; let mut runtime = RuntimeState { session, @@ -408,6 +391,8 @@ async fn run_session(settings: ResolvedSettings, mut rx: mpsc::Receiver break, } } + + Ok(()) } fn handle_caps(runtime: &mut RuntimeState, pad_name: String, caps: gst::Caps) -> Result<()> { diff --git a/src/source/imp.rs b/src/source/imp.rs index 85405b4..b34454f 100644 --- a/src/source/imp.rs +++ b/src/source/imp.rs @@ -16,7 +16,6 @@ static CAT: LazyLock = static RUNTIME: LazyLock = LazyLock::new(|| { tokio::runtime::Builder::new_multi_thread() .enable_all() - .worker_threads(4) .build() .expect("spawn tokio runtime") }); @@ -122,8 +121,7 @@ struct SessionController { impl SessionController { fn start(settings: ResolvedSettings, control_tx: mpsc::UnboundedSender) -> Self { - let (shutdown_tx, shutdown_rx) = watch::channel(false); - let mut shutdown_rx = shutdown_rx; + let (shutdown_tx, mut shutdown_rx) = watch::channel(false); let control_for_error = control_tx.clone(); let join = RUNTIME.spawn(async move { let result = run_session(settings, control_tx, &mut shutdown_rx).await; @@ -423,8 +421,7 @@ async fn run_session( let origin_consumer = origin.consume(); let client = config.init()?.with_consume(origin); - let session = client.connect(settings.url.clone()).await?; - let _session = session; // keep session alive inside this task + let _session = client.connect(settings.url.clone()).await?; let broadcast = origin_consumer .consume_broadcast(&settings.broadcast) @@ -490,74 +487,81 @@ async fn request_pad( } fn spawn_track_pump( + track: hang::container::OrderedConsumer, + descriptor: TrackDescriptor, + pad_endpoint: PadEndpoint, + shutdown: watch::Receiver, +) -> tokio::task::JoinHandle<()> { + RUNTIME.spawn(run_track_pump(track, descriptor, pad_endpoint, shutdown)) +} + +async fn run_track_pump( mut track: hang::container::OrderedConsumer, descriptor: TrackDescriptor, pad_endpoint: PadEndpoint, mut shutdown: watch::Receiver, -) -> tokio::task::JoinHandle<()> { - RUNTIME.spawn(async move { - let mut reference_ts = None; - loop { - tokio::select! { - _ = shutdown.changed() => { - pad_endpoint.send(PadMessage::Drop); - break; - } - frame = track.read() => { - match frame { - Ok(Some(frame)) => { - let timestamp = frame.timestamp; - let is_keyframe = frame.is_keyframe(); - let payload = frame.payload; - let mut buffer = gst::Buffer::from_slice(payload.into_iter().flatten().collect::>()); - let buffer_mut = buffer.get_mut().unwrap(); - - let pts = match reference_ts { - Some(reference) => { - let delta: Duration = (timestamp - reference).into(); - gst::ClockTime::from_nseconds(delta.as_nanos() as u64) - } - None => { - reference_ts = Some(timestamp); - gst::ClockTime::ZERO - } - }; - buffer_mut.set_pts(Some(pts)); - - let mut flags = buffer_mut.flags(); - match descriptor.kind { - TrackKind::Video => { - if is_keyframe { - flags.remove(gst::BufferFlags::DELTA_UNIT); - } else { - flags.insert(gst::BufferFlags::DELTA_UNIT); - } - } - TrackKind::Audio => { +) { + let mut reference_ts = None; + loop { + tokio::select! { + _ = shutdown.changed() => { + pad_endpoint.send(PadMessage::Drop); + break; + } + frame = track.read() => { + match frame { + Ok(Some(frame)) => { + let timestamp = frame.timestamp; + let is_keyframe = frame.is_keyframe(); + let payload = frame.payload; + let mut buffer = gst::Buffer::from_slice(payload.into_iter().flatten().collect::>()); + let buffer_mut = buffer.get_mut().unwrap(); + + let pts = match reference_ts { + Some(reference) => { + let delta: Duration = (timestamp - reference).into(); + gst::ClockTime::from_nseconds(delta.as_nanos() as u64) + } + None => { + reference_ts = Some(timestamp); + gst::ClockTime::ZERO + } + }; + buffer_mut.set_pts(Some(pts)); + + let mut flags = buffer_mut.flags(); + match descriptor.kind { + TrackKind::Video => { + if is_keyframe { flags.remove(gst::BufferFlags::DELTA_UNIT); + } else { + flags.insert(gst::BufferFlags::DELTA_UNIT); } } - buffer_mut.set_flags(flags); - - if !pad_endpoint.send(PadMessage::Buffer(buffer)) { - break; + TrackKind::Audio => { + flags.remove(gst::BufferFlags::DELTA_UNIT); } } - Ok(None) => { - pad_endpoint.send(PadMessage::Eos); - pad_endpoint.send(PadMessage::Drop); - break; - } - Err(err) => { - gst::warning!(CAT, "track {} failed: {err:?}", descriptor.name); - pad_endpoint.send(PadMessage::Drop); + buffer_mut.set_flags(flags); + + if !pad_endpoint.send(PadMessage::Buffer(buffer)) { break; } } + Ok(None) => { + pad_endpoint.send(PadMessage::Eos); + pad_endpoint.send(PadMessage::Drop); + break; + } + Err(err) => { + gst::warning!(CAT, "track {} failed: {err:?}", descriptor.name); + pad_endpoint.send(PadMessage::Drop); + break; + } } } } - }) + } } fn video_caps(config: &hang::catalog::VideoConfig) -> Result { @@ -653,49 +657,3 @@ where } }) } - -#[cfg(test)] -mod tests { - use super::*; - use std::cell::{Cell, RefCell}; - use std::rc::Rc; - - #[test] - fn forwarder_delivers_messages_in_order() { - let context = glib::MainContext::new(); - context - .with_thread_default(|| { - let (tx, rx) = mpsc::unbounded_channel(); - let received = Rc::new(RefCell::new(Vec::new())); - let done = Rc::new(Cell::new(false)); - - let handle = spawn_main_context_forwarder(&context, rx, { - let received = Rc::clone(&received); - let done = Rc::clone(&done); - move |msg: i32| { - received.borrow_mut().push(msg); - if received.borrow().len() >= 3 { - done.set(true); - false - } else { - true - } - } - }); - - tx.send(1).unwrap(); - tx.send(2).unwrap(); - tx.send(3).unwrap(); - drop(tx); - - while !done.get() { - context.iteration(true); - } - - handle.abort(); - - assert_eq!(*received.borrow(), vec![1, 2, 3]); - }) - .unwrap(); - } -}