diff --git a/src/sink/imp.rs b/src/sink/imp.rs index 65f0fc6..aca9e15 100644 --- a/src/sink/imp.rs +++ b/src/sink/imp.rs @@ -1,29 +1,82 @@ -use anyhow::Context as _; +//! Async-friendly MoqSink that keeps the original dynamic-pad Element +//! behavior while pushing all network setup and CMAF publishing work into +//! a Tokio task. The GLib state change thread never blocks, pads still get +//! requested dynamically, and each pad simply forwards buffers/events to the +//! background worker via an unbounded channel. + +use std::collections::HashMap; +use std::sync::{LazyLock, Mutex}; + +use anyhow::{Context, Result}; +use bytes::Bytes; use gst::glib; use gst::prelude::*; use gst::subclass::prelude::*; - -use std::collections::HashMap; -use std::sync::LazyLock; -use std::sync::Mutex; +use tokio::sync::mpsc; use url::Url; -static CAT: LazyLock = - LazyLock::new(|| gst::DebugCategory::new("moq-sink", gst::DebugColorFlags::empty(), Some("MoQ Sink Element"))); +use hang::moq_lite; -pub static RUNTIME: LazyLock = LazyLock::new(|| { +static RUNTIME: LazyLock = LazyLock::new(|| { tokio::runtime::Builder::new_multi_thread() .enable_all() - .worker_threads(1) .build() - .unwrap() + .expect("spawn tokio runtime") }); -#[derive(Default, Clone)] +static CAT: LazyLock = LazyLock::new(|| { + gst::DebugCategory::new( + "moq-sink", + gst::DebugColorFlags::empty(), + Some("MoQ Sink (async element)"), + ) +}); + +#[derive(Debug, Clone, Default)] struct Settings { - pub url: Option, - pub broadcast: Option, - pub tls_disable_verify: bool, + url: Option, + broadcast: Option, + tls_disable_verify: bool, +} + +#[derive(Debug, Clone)] +struct ResolvedSettings { + url: Url, + broadcast: String, + tls_disable_verify: bool, +} + +impl TryFrom for ResolvedSettings { + type Error = anyhow::Error; + + fn try_from(value: Settings) -> Result { + Ok(Self { + url: Url::parse(value.url.as_ref().context("url property is required")?)?, + broadcast: value + .broadcast + .as_ref() + .context("broadcast property is required")? + .clone(), + tls_disable_verify: value.tls_disable_verify, + }) + } +} + +#[derive(Debug)] +struct SessionHandle { + sender: mpsc::UnboundedSender, + join: tokio::task::JoinHandle<()>, +} + +impl SessionHandle { + fn stop(self) { + let _ = self.sender.send(ControlMessage::Shutdown); + RUNTIME.spawn(async move { + if let Err(err) = self.join.await { + gst::warning!(CAT, "session task ended with error: {err:?}"); + } + }); + } } struct PadState { @@ -31,17 +84,35 @@ struct PadState { reference_pts: Option, } -struct State { - _session: moq_lite::Session, +struct RuntimeState { + #[allow(dead_code)] + session: moq_lite::Session, broadcast: moq_lite::BroadcastProducer, catalog: moq_mux::CatalogProducer, pads: HashMap, } +#[derive(Debug)] +enum ControlMessage { + SetCaps { + pad_name: String, + caps: gst::Caps, + }, + Buffer { + pad_name: String, + data: Bytes, + pts: Option, + }, + DropPad { + pad_name: String, + }, + Shutdown, +} + #[derive(Default)] pub struct MoqSink { settings: Mutex, - state: Mutex>, + session: Mutex>, } #[glib::object_subclass] @@ -53,7 +124,7 @@ impl ObjectSubclass for MoqSink { impl ObjectImpl for MoqSink { fn properties() -> &'static [glib::ParamSpec] { - static PROPERTIES: LazyLock> = LazyLock::new(|| { + static PROPS: LazyLock> = LazyLock::new(|| { vec![ glib::ParamSpecString::builder("url") .nick("Source URL") @@ -70,28 +141,26 @@ impl ObjectImpl for MoqSink { .build(), ] }); - PROPERTIES.as_ref() + PROPS.as_ref() } fn set_property(&self, _id: usize, value: &glib::Value, pspec: &glib::ParamSpec) { let mut settings = self.settings.lock().unwrap(); - match pspec.name() { "url" => settings.url = value.get().unwrap(), "broadcast" => settings.broadcast = value.get().unwrap(), "tls-disable-verify" => settings.tls_disable_verify = value.get().unwrap(), - _ => unimplemented!(), + _ => unreachable!(), } } fn property(&self, _id: usize, pspec: &glib::ParamSpec) -> glib::Value { let settings = self.settings.lock().unwrap(); - match pspec.name() { "url" => settings.url.to_value(), "broadcast" => settings.broadcast.to_value(), "tls-disable-verify" => settings.tls_disable_verify.to_value(), - _ => unimplemented!(), + _ => unreachable!(), } } } @@ -102,20 +171,18 @@ impl ElementImpl for MoqSink { fn metadata() -> Option<&'static gst::subclass::ElementMetadata> { static ELEMENT_METADATA: LazyLock = LazyLock::new(|| { gst::subclass::ElementMetadata::new( - "MoQ Sink", + "MoQ Sink (async)", "Sink/Network/MoQ", - "Transmits media over the network via MoQ", - "Luke Curley ", + "Transmits media over MoQ", + "Luke Curley , Steve McFarlin ", ) }); - Some(&*ELEMENT_METADATA) } fn pad_templates() -> &'static [gst::PadTemplate] { static PAD_TEMPLATES: LazyLock> = LazyLock::new(|| { let mut caps = gst::Caps::new_empty(); - // Video caps.merge( gst::Caps::builder("video/x-h264") .field("stream-format", "byte-stream") @@ -129,7 +196,6 @@ impl ElementImpl for MoqSink { .build(), ); caps.merge(gst::Caps::builder("video/x-av1").build()); - // Audio caps.merge( gst::Caps::builder("audio/mpeg") .field("mpegversion", 4i32) @@ -140,7 +206,6 @@ impl ElementImpl for MoqSink { let templ = gst::PadTemplate::new("sink_%u", gst::PadDirection::Sink, gst::PadPresence::Request, &caps).unwrap(); - vec![templ] }); PAD_TEMPLATES.as_ref() @@ -152,24 +217,24 @@ impl ElementImpl for MoqSink { name: Option<&str>, _caps: Option<&gst::Caps>, ) -> Option { - let builder = gst::Pad::builder_from_template(templ) + let pad_builder = gst::Pad::builder_from_template(templ) .chain_function(|pad, parent, buffer| { let element = parent .and_then(|p| p.downcast_ref::()) .ok_or(gst::FlowError::Error)?; - element.imp().sink_chain(pad, buffer) + element.imp().forward_buffer(pad, buffer) }) .event_function(|pad, parent, event| { let Some(element) = parent.and_then(|p| p.downcast_ref::()) else { return false; }; - element.imp().sink_event(pad, event) + element.imp().forward_event(pad, event) }); let pad = if let Some(name) = name { - builder.name(name).build() + pad_builder.name(name).build() } else { - builder.build() + pad_builder.build() }; self.obj().add_pad(&pad).ok()?; @@ -177,9 +242,10 @@ impl ElementImpl for MoqSink { } fn release_pad(&self, pad: &gst::Pad) { - let pad_name = pad.name().to_string(); - if let Some(ref mut state) = *self.state.lock().unwrap() { - state.pads.remove(&pad_name); + if let Some(session) = self.session.lock().unwrap().as_ref() { + let _ = session.sender.send(ControlMessage::DropPad { + pad_name: pad.name().to_string(), + }); } let _ = self.obj().remove_pad(pad); } @@ -187,15 +253,12 @@ impl ElementImpl for MoqSink { fn change_state(&self, transition: gst::StateChange) -> Result { match transition { gst::StateChange::ReadyToPaused => { - let _guard = RUNTIME.enter(); - self.setup().map_err(|e| { - gst::error!(CAT, obj = self.obj(), "Failed to setup: {:?}", e); + self.start_session().map_err(|err| { + gst::error!(CAT, obj = self.obj(), "failed to start session: {err:#}"); gst::StateChangeError })?; } - gst::StateChange::PausedToReady => { - *self.state.lock().unwrap() = None; - } + gst::StateChange::PausedToReady => self.stop_session(), _ => (), } @@ -204,156 +267,201 @@ impl ElementImpl for MoqSink { } impl MoqSink { - fn setup(&self) -> anyhow::Result<()> { - let settings = self.settings.lock().unwrap(); - - let url = settings.url.as_ref().expect("url is required"); - let url = Url::parse(url).context("invalid URL")?; - let name = settings.broadcast.as_ref().expect("broadcast is required").clone(); - - let mut config = moq_native::ClientConfig::default(); - config.tls.disable_verify = Some(settings.tls_disable_verify); - - drop(settings); - - let origin = moq_lite::Origin::produce(); - let mut broadcast = moq_lite::Broadcast::produce(); - let broadcast_consumer = broadcast.consume(); - let catalog = moq_mux::CatalogProducer::new(&mut broadcast)?; + fn start_session(&self) -> Result<()> { + let settings = { + let settings = self.settings.lock().unwrap().clone(); + ResolvedSettings::try_from(settings)? + }; - origin.publish_broadcast(&name, broadcast_consumer); + let (tx, rx) = mpsc::unbounded_channel::(); + let join = RUNTIME.spawn(async move { + if let Err(err) = run_session(settings, rx).await { + gst::error!(CAT, "session error: {err:#}"); + } + }); - let client = config.init()?.with_publish(origin.consume()); + *self.session.lock().unwrap() = Some(SessionHandle { sender: tx, join }); + Ok(()) + } - RUNTIME.block_on(async { - let session = client.connect(url).await.context("failed to connect")?; + fn stop_session(&self) { + if let Some(handle) = self.session.lock().unwrap().take() { + handle.stop(); + } + } - *self.state.lock().unwrap() = Some(State { - _session: session, - broadcast, - catalog, - pads: HashMap::new(), - }); + fn forward_buffer(&self, pad: &gst::Pad, buffer: gst::Buffer) -> Result { + let sender = self + .session + .lock() + .unwrap() + .as_ref() + .map(|handle| handle.sender.clone()) + .ok_or(gst::FlowError::Flushing)?; + + let map = buffer.map_readable().map_err(|_| gst::FlowError::Error)?; + let pts = buffer.pts(); + let data = Bytes::copy_from_slice(map.as_slice()); + + sender + .send(ControlMessage::Buffer { + pad_name: pad.name().to_string(), + data, + pts, + }) + .map_err(|_| gst::FlowError::Flushing)?; - anyhow::Ok(()) - }) + Ok(gst::FlowSuccess::Ok) } - fn sink_event(&self, pad: &gst::Pad, event: gst::Event) -> bool { + fn forward_event(&self, pad: &gst::Pad, event: gst::Event) -> bool { match event.view() { - gst::EventView::Caps(caps_event) => { - let caps = caps_event.caps(); - if let Err(e) = self.handle_caps(pad, caps) { - gst::error!(CAT, obj = pad, "Failed to handle caps: {:?}", e); + gst::EventView::Caps(caps) => { + let sender = match self + .session + .lock() + .unwrap() + .as_ref() + .map(|handle| handle.sender.clone()) + { + Some(sender) => sender, + None => return false, + }; + + if sender + .send(ControlMessage::SetCaps { + pad_name: pad.name().to_string(), + caps: caps.caps().to_owned(), + }) + .is_err() + { return false; } + true } _ => gst::Pad::event_default(pad, Some(&*self.obj()), event), } } +} - fn handle_caps(&self, pad: &gst::Pad, caps: &gst::CapsRef) -> anyhow::Result<()> { - let structure = caps.structure(0).context("empty caps")?; - let pad_name = pad.name().to_string(); - - let mut state = self.state.lock().unwrap(); - let state = state.as_mut().context("not connected")?; - - let decoder: moq_mux::import::Decoder = match structure.name().as_str() { - "video/x-h264" => { - let mut buf = bytes::Bytes::new(); - moq_mux::import::Decoder::new( - state.broadcast.clone(), - state.catalog.clone(), - moq_mux::import::DecoderFormat::Avc3, - &mut buf, - )? - } - "video/x-h265" => { - let mut buf = bytes::Bytes::new(); - moq_mux::import::Decoder::new( - state.broadcast.clone(), - state.catalog.clone(), - moq_mux::import::DecoderFormat::Hev1, - &mut buf, - )? - } - "video/x-av1" => { - let mut buf = bytes::Bytes::new(); - moq_mux::import::Decoder::new( - state.broadcast.clone(), - state.catalog.clone(), - moq_mux::import::DecoderFormat::Av01, - &mut buf, - )? - } - "audio/mpeg" => { - // aacparse provides AudioSpecificConfig as codec_data in caps - let codec_data = structure - .get::("codec_data") - .context("AAC caps missing codec_data")?; - let map = codec_data.map_readable().context("failed to map codec_data buffer")?; - let mut data = bytes::Bytes::copy_from_slice(map.as_slice()); - moq_mux::import::Decoder::new( - state.broadcast.clone(), - state.catalog.clone(), - moq_mux::import::DecoderFormat::Aac, - &mut data, - )? - } - "audio/x-opus" => { - let channels: i32 = structure.get("channels").unwrap_or(2); - let rate: i32 = structure.get("rate").unwrap_or(48000); - let config = moq_mux::import::OpusConfig { - sample_rate: rate as u32, - channel_count: channels as u32, - }; - moq_mux::import::Opus::new(state.broadcast.clone(), state.catalog.clone(), config)?.into() - } - other => anyhow::bail!("unsupported caps: {}", other), - }; +async fn run_session(settings: ResolvedSettings, mut rx: mpsc::UnboundedReceiver) -> Result<()> { + let mut client_config = moq_native::ClientConfig::default(); + client_config.tls.disable_verify = Some(settings.tls_disable_verify); - state.pads.insert( - pad_name.clone(), - PadState { - decoder, - reference_pts: None, - }, - ); + let client = client_config.init()?; - gst::info!(CAT, obj = pad, "Configured pad {}", pad_name); + let origin = moq_lite::Origin::produce(); + let mut broadcast = moq_lite::Broadcast::produce(); + let broadcast_consumer = broadcast.consume(); - Ok(()) - } + let catalog = moq_mux::CatalogProducer::new(&mut broadcast)?; - fn sink_chain(&self, pad: &gst::Pad, buffer: gst::Buffer) -> Result { - let _guard = RUNTIME.enter(); + anyhow::ensure!( + origin.publish_broadcast(&settings.broadcast, broadcast_consumer), + "failed to publish broadcast {}", + settings.broadcast + ); - let pad_name = pad.name(); - let mut state = self.state.lock().unwrap(); - let state = state.as_mut().ok_or(gst::FlowError::Error)?; + let client = client.with_publish(origin.consume()); + let session = client.connect(settings.url.clone()).await?; - let pad_state = state.pads.get_mut(pad_name.as_str()).ok_or_else(|| { - gst::error!(CAT, obj = pad, "Pad {} not configured", pad_name); - gst::FlowError::Error - })?; + let mut runtime = RuntimeState { + session, + broadcast, + catalog, + pads: HashMap::new(), + }; - // Compute relative PTS in microseconds - let pts = buffer.pts().and_then(|pts| { - let reference = *pad_state.reference_pts.get_or_insert(pts); - let relative = pts.checked_sub(reference)?; - hang::container::Timestamp::from_micros(relative.nseconds() / 1000).ok() - }); + while let Some(msg) = rx.recv().await { + match msg { + ControlMessage::SetCaps { pad_name, caps } => { + if let Err(err) = handle_caps(&mut runtime, pad_name, caps) { + gst::error!(CAT, "failed to configure pad: {err:#}"); + } + } + ControlMessage::Buffer { pad_name, data, pts } => { + if let Err(err) = handle_buffer(&mut runtime, pad_name, data, pts) { + gst::error!(CAT, "failed to publish buffer: {err:#}"); + } + } + ControlMessage::DropPad { pad_name } => { + runtime.pads.remove(&pad_name); + } + ControlMessage::Shutdown => break, + } + } - let data = buffer.map_readable().map_err(|_| gst::FlowError::Error)?; - let mut bytes = bytes::Bytes::copy_from_slice(data.as_slice()); + Ok(()) +} - pad_state.decoder.decode_frame(&mut bytes, pts).map_err(|e| { - gst::error!(CAT, obj = pad, "Failed to decode: {}", e); - gst::FlowError::Error - })?; +fn handle_caps(runtime: &mut RuntimeState, pad_name: String, caps: gst::Caps) -> Result<()> { + let structure = caps.structure(0).context("empty caps")?; + let decoder: moq_mux::import::Decoder = match structure.name().as_str() { + "video/x-h264" => { + let mut bytes = Bytes::new(); + new_decoder(runtime, moq_mux::import::DecoderFormat::Avc3, &mut bytes)? + } + "video/x-h265" => { + let mut bytes = Bytes::new(); + new_decoder(runtime, moq_mux::import::DecoderFormat::Hev1, &mut bytes)? + } + "video/x-av1" => { + let mut bytes = Bytes::new(); + new_decoder(runtime, moq_mux::import::DecoderFormat::Av01, &mut bytes)? + } + "audio/mpeg" => { + let codec_data = structure + .get::("codec_data") + .context("AAC caps missing codec_data")?; + let map = codec_data.map_readable().context("failed to map codec_data")?; + let mut data = Bytes::copy_from_slice(map.as_slice()); + new_decoder(runtime, moq_mux::import::DecoderFormat::Aac, &mut data)? + } + "audio/x-opus" => { + let channels: i32 = structure.get("channels").unwrap_or(2); + let rate: i32 = structure.get("rate").unwrap_or(48_000); + let config = moq_mux::import::OpusConfig { + sample_rate: rate as u32, + channel_count: channels as u32, + }; + moq_mux::import::Opus::new(runtime.broadcast.clone(), runtime.catalog.clone(), config)?.into() + } + other => anyhow::bail!("unsupported caps: {}", other), + }; + + runtime.pads.insert( + pad_name, + PadState { + decoder, + reference_pts: None, + }, + ); + Ok(()) +} - Ok(gst::FlowSuccess::Ok) - } +fn new_decoder( + runtime: &mut RuntimeState, + format: moq_mux::import::DecoderFormat, + buf: &mut Bytes, +) -> Result { + let decoder = moq_mux::import::Decoder::new(runtime.broadcast.clone(), runtime.catalog.clone(), format, buf)?; + Ok(decoder) +} + +fn handle_buffer( + runtime: &mut RuntimeState, + pad_name: String, + mut data: Bytes, + pts: Option, +) -> Result<()> { + let pad = runtime.pads.get_mut(&pad_name).context("pad not configured")?; + + let ts = pts.and_then(|pts| { + let reference = *pad.reference_pts.get_or_insert(pts); + let relative = pts.checked_sub(reference)?; + hang::container::Timestamp::from_micros(relative.nseconds() / 1000).ok() + }); + + pad.decoder.decode_frame(&mut data, ts).map_err(|e| anyhow::anyhow!(e)) } diff --git a/src/source/imp.rs b/src/source/imp.rs index cb8b461..b34454f 100644 --- a/src/source/imp.rs +++ b/src/source/imp.rs @@ -1,53 +1,174 @@ -use anyhow::Context as _; +use std::collections::HashMap; +use std::sync::{LazyLock, Mutex}; +use std::time::Duration; + +use anyhow::{bail, Context, Result}; use gst::glib; use gst::prelude::*; use gst::subclass::prelude::*; +use tokio::sync::{mpsc, oneshot, watch}; -use std::sync::LazyLock; -use std::sync::Mutex; +use hang::moq_lite; static CAT: LazyLock = LazyLock::new(|| gst::DebugCategory::new("moq-src", gst::DebugColorFlags::empty(), Some("MoQ Source Element"))); -pub static RUNTIME: LazyLock = LazyLock::new(|| { +static RUNTIME: LazyLock = LazyLock::new(|| { tokio::runtime::Builder::new_multi_thread() .enable_all() - .worker_threads(1) .build() - .unwrap() + .expect("spawn tokio runtime") }); -#[derive(Default, Clone)] +#[derive(Debug, Clone, Default)] struct Settings { - pub url: Option, - pub broadcast: Option, - pub tls_disable_verify: bool, + url: Option, + broadcast: Option, + tls_disable_verify: bool, +} + +#[derive(Debug, Clone)] +struct ResolvedSettings { + url: url::Url, + broadcast: String, + tls_disable_verify: bool, +} + +impl TryFrom for ResolvedSettings { + type Error = anyhow::Error; + + fn try_from(value: Settings) -> Result { + Ok(Self { + url: url::Url::parse(value.url.as_ref().context("url property is required")?)?, + broadcast: value + .broadcast + .as_ref() + .context("broadcast property is required")? + .clone(), + tls_disable_verify: value.tls_disable_verify, + }) + } +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +enum TrackKind { + Video, + Audio, +} + +impl TrackKind { + fn template_name(&self) -> &'static str { + match self { + TrackKind::Video => "video_%u", + TrackKind::Audio => "audio_%u", + } + } +} + +#[derive(Debug, Clone)] +struct TrackDescriptor { + kind: TrackKind, + name: String, +} + +impl TrackDescriptor { + fn pad_name(&self) -> String { + match self.kind { + TrackKind::Video => format!("video_{}", self.name), + TrackKind::Audio => format!("audio_{}", self.name), + } + } +} + +#[derive(Debug)] +enum ControlMessage { + CreatePad { + descriptor: TrackDescriptor, + caps: gst::Caps, + reply: oneshot::Sender, + }, + NoMorePads, + ReportError(anyhow::Error), +} + +#[derive(Debug)] +enum PadMessage { + Buffer(gst::Buffer), + Eos, + Drop, +} + +#[derive(Debug, Clone)] +struct PadEndpoint { + sender: mpsc::UnboundedSender, +} + +impl PadEndpoint { + fn send(&self, msg: PadMessage) -> bool { + self.sender.send(msg).is_ok() + } +} + +struct PadHandle { + sender: mpsc::UnboundedSender, + task: glib::JoinHandle<()>, +} + +struct SessionController { + shutdown: watch::Sender, + join: tokio::task::JoinHandle<()>, +} + +impl SessionController { + fn start(settings: ResolvedSettings, control_tx: mpsc::UnboundedSender) -> Self { + let (shutdown_tx, mut shutdown_rx) = watch::channel(false); + let control_for_error = control_tx.clone(); + let join = RUNTIME.spawn(async move { + let result = run_session(settings, control_tx, &mut shutdown_rx).await; + if let Err(err) = result { + let _ = control_for_error.send(ControlMessage::ReportError(err)); + } + }); + + Self { + shutdown: shutdown_tx, + join, + } + } + + fn stop(self) { + let _ = self.shutdown.send(true); + RUNTIME.spawn(async move { + if let Err(err) = self.join.await { + gst::warning!(CAT, "session task ended with error: {err:?}"); + } + }); + } } #[derive(Default)] pub struct MoqSrc { settings: Mutex, - session: Mutex>, - tasks: Mutex>>, + pads: Mutex>, + control_task: Mutex>>, + control_sender: Mutex>>, + session: Mutex>, } #[glib::object_subclass] impl ObjectSubclass for MoqSrc { const NAME: &'static str = "MoqSrc"; type Type = super::MoqSrc; - type ParentType = gst::Bin; + type ParentType = gst::Element; fn new() -> Self { Self::default() } } -impl GstObjectImpl for MoqSrc {} -impl BinImpl for MoqSrc {} - impl ObjectImpl for MoqSrc { fn properties() -> &'static [glib::ParamSpec] { - static PROPERTIES: LazyLock> = LazyLock::new(|| { + static PROPS: LazyLock> = LazyLock::new(|| { vec![ glib::ParamSpecString::builder("url") .nick("Source URL") @@ -55,352 +176,484 @@ impl ObjectImpl for MoqSrc { .build(), glib::ParamSpecString::builder("broadcast") .nick("Broadcast") - .blurb("The name of the broadcast to consume") + .blurb("The broadcast name to subscribe to") .build(), glib::ParamSpecBoolean::builder("tls-disable-verify") - .nick("TLS disable verify") - .blurb("Disable TLS verification") + .nick("TLS Disable Verify") + .blurb("Disable TLS certificate verification") .default_value(false) .build(), ] }); - PROPERTIES.as_ref() + PROPS.as_ref() } fn set_property(&self, _id: usize, value: &glib::Value, pspec: &glib::ParamSpec) { let mut settings = self.settings.lock().unwrap(); - match pspec.name() { "url" => settings.url = value.get().unwrap(), "broadcast" => settings.broadcast = value.get().unwrap(), "tls-disable-verify" => settings.tls_disable_verify = value.get().unwrap(), - _ => unimplemented!(), + _ => unreachable!(), } } fn property(&self, _id: usize, pspec: &glib::ParamSpec) -> glib::Value { let settings = self.settings.lock().unwrap(); - match pspec.name() { "url" => settings.url.to_value(), "broadcast" => settings.broadcast.to_value(), "tls-disable-verify" => settings.tls_disable_verify.to_value(), - _ => unimplemented!(), + _ => unreachable!(), } } } +impl GstObjectImpl for MoqSrc {} impl ElementImpl for MoqSrc { fn metadata() -> Option<&'static gst::subclass::ElementMetadata> { - static ELEMENT_METADATA: LazyLock = LazyLock::new(|| { + static META: LazyLock = LazyLock::new(|| { gst::subclass::ElementMetadata::new( "MoQ Src", "Source/Network/MoQ", "Receives media over the network via MoQ", - "Luke Curley ", + "Luke Curley , Steve McFarlin ", ) }); - - Some(&*ELEMENT_METADATA) + Some(&*META) } fn pad_templates() -> &'static [gst::PadTemplate] { static PAD_TEMPLATES: LazyLock> = LazyLock::new(|| { - let video = gst::PadTemplate::new( - "video_%u", - gst::PadDirection::Src, - gst::PadPresence::Sometimes, - &gst::Caps::new_any(), - ) - .unwrap(); - - let audio = gst::PadTemplate::new( - "audio_%u", - gst::PadDirection::Src, - gst::PadPresence::Sometimes, - &gst::Caps::new_any(), - ) - .unwrap(); - - vec![video, audio] + vec![ + gst::PadTemplate::new( + "video_%u", + gst::PadDirection::Src, + gst::PadPresence::Sometimes, + &gst::Caps::new_any(), + ) + .unwrap(), + gst::PadTemplate::new( + "audio_%u", + gst::PadDirection::Src, + gst::PadPresence::Sometimes, + &gst::Caps::new_any(), + ) + .unwrap(), + ] }); - PAD_TEMPLATES.as_ref() } fn change_state(&self, transition: gst::StateChange) -> Result { match transition { gst::StateChange::ReadyToPaused => { - if let Err(e) = RUNTIME.block_on(self.setup()) { - gst::error!(CAT, obj = self.obj(), "Failed to setup: {:?}", e); + if let Err(err) = self.start_session() { + gst::error!(CAT, obj = self.obj(), "failed to start session: {err:?}"); return Err(gst::StateChangeError); } - // Chain up first to let the bin handle the state change - let result = self.parent_change_state(transition); - result?; - // This is a live source - no preroll needed - return Ok(gst::StateChangeSuccess::NoPreroll); + let success = self.parent_change_state(transition)?; + let result = match success { + gst::StateChangeSuccess::Async => gst::StateChangeSuccess::Async, + _ => gst::StateChangeSuccess::NoPreroll, + }; + Ok(result) } - gst::StateChange::PausedToReady => { - // Cleanup publisher - self.cleanup(); + self.stop_session(); + self.parent_change_state(transition) } - - _ => (), + _ => self.parent_change_state(transition), } - - // Chain up for other transitions - self.parent_change_state(transition) } } impl MoqSrc { - async fn setup(&self) -> anyhow::Result<()> { - let (client, url, name, origin_consumer) = { - let settings = self.settings.lock().unwrap(); - let url = url::Url::parse(settings.url.as_ref().expect("url is required"))?; - let name = settings.broadcast.as_ref().expect("broadcast is required").clone(); - - let mut config = moq_native::ClientConfig::default(); - config.tls.disable_verify = Some(settings.tls_disable_verify); - - let origin = moq_lite::Origin::produce(); - let origin_consumer = origin.consume(); - - let client = config.init()?.with_consume(origin); - - (client, url, name, origin_consumer) + fn start_session(&self) -> Result<()> { + let settings = { + let settings = self.settings.lock().unwrap().clone(); + ResolvedSettings::try_from(settings)? }; - let session = client.connect(url).await?; - *self.session.lock().unwrap() = Some(session); - - let broadcast = origin_consumer - .consume_broadcast(&name) - .ok_or_else(|| anyhow::anyhow!("Broadcast '{}' not found", name))?; - - let catalog = broadcast.subscribe_track(&hang::Catalog::default_track())?; - let mut catalog = hang::CatalogConsumer::new(catalog); - - // TODO handle catalog updates - let catalog = catalog.next().await?.context("no catalog found")?.clone(); - - { - for (track_name, config) in catalog.video.renditions { - let track_ref = moq_lite::Track::new(&track_name); - let track_consumer = broadcast.subscribe_track(&track_ref)?; - let mut track = - hang::container::OrderedConsumer::new(track_consumer, std::time::Duration::from_secs(1)); - - let caps = match config.codec { - hang::catalog::VideoCodec::H264(_) => { - let builder = gst::Caps::builder("video/x-h264") - //.field("width", video.resolution.width) - //.field("height", video.resolution.height) - .field("alignment", "au"); - - if let Some(description) = config.description { - builder - .field("stream-format", "avc") - .field("codec_data", gst::Buffer::from_slice(description.clone())) - .build() - } else { - builder.field("stream-format", "annexb").build() - } - } - _ => unimplemented!(), - }; - - gst::info!(CAT, "caps: {:?}", caps); - - let templ = self.obj().element_class().pad_template("video_%u").unwrap(); + let (control_tx, control_rx) = mpsc::unbounded_channel(); + let obj = self.obj(); + let weak = obj.downgrade(); + let context = glib::MainContext::default(); + let control_task = spawn_main_context_forwarder(&context, control_rx, move |msg| { + if let Some(obj) = weak.upgrade() { + obj.imp().handle_control_message(msg); + true + } else { + false + } + }); - let srcpad = gst::Pad::builder_from_template(&templ).name(&track_name).build(); - srcpad.set_active(true).unwrap(); + *self.control_task.lock().unwrap() = Some(control_task); + *self.control_sender.lock().unwrap() = Some(control_tx.clone()); - let stream_start = gst::event::StreamStart::builder(&track_name) - .group_id(gst::GroupId::next()) - .build(); - srcpad.push_event(stream_start); + let session = SessionController::start(settings, control_tx); + *self.session.lock().unwrap() = Some(session); + Ok(()) + } - let caps_evt = gst::event::Caps::new(&caps); - srcpad.push_event(caps_evt); + fn stop_session(&self) { + if let Some(session) = self.session.lock().unwrap().take() { + session.stop(); + } - let segment = gst::event::Segment::new(&gst::FormattedSegment::::new()); - srcpad.push_event(segment); + if let Some(control_task) = self.control_task.lock().unwrap().take() { + control_task.abort(); + } - self.obj().add_pad(&srcpad).expect("Failed to add pad"); + let handles = self.pads.lock().unwrap().drain().collect::>(); + for (name, handle) in handles { + gst::debug!(CAT, "dropping pad {name}"); + let _ = handle.sender.send(PadMessage::Drop); + handle.task.abort(); + } - // Push to the srcpad in a background task. - let mut reference = None; - let handle = tokio::spawn(async move { - loop { - match track.read().await { - Ok(Some(frame)) => { - let payload: Vec = frame.payload.into_iter().flatten().collect(); - let mut buffer = gst::Buffer::from_slice(payload); - let buffer_mut = buffer.get_mut().unwrap(); + *self.control_sender.lock().unwrap() = None; + } - // Make timestamps relative to the first frame for proper playback - let pts = if let Some(reference_ts) = reference { - let timestamp: std::time::Duration = (frame.timestamp - reference_ts).into(); - gst::ClockTime::from_nseconds(timestamp.as_nanos() as _) - } else { - reference = Some(frame.timestamp); - gst::ClockTime::ZERO - }; - buffer_mut.set_pts(Some(pts)); + fn handle_control_message(&self, msg: ControlMessage) { + match msg { + ControlMessage::CreatePad { + descriptor, + caps, + reply, + } => { + if let Err(err) = self.create_pad(descriptor, caps, reply) { + gst::error!(CAT, obj = self.obj(), "failed to create pad: {err:?}"); + } + } + ControlMessage::NoMorePads => { + self.obj().no_more_pads(); + } + ControlMessage::ReportError(err) => { + gst::element_error!(self.obj(), gst::CoreError::Failed, ("session error"), ["{err:?}"]); + } + } + } - let mut flags = buffer_mut.flags(); - // First frame in each group is a keyframe - match frame.index == 0 { - true => flags.remove(gst::BufferFlags::DELTA_UNIT), - false => flags.insert(gst::BufferFlags::DELTA_UNIT), - }; + fn create_pad( + &self, + descriptor: TrackDescriptor, + caps: gst::Caps, + reply: oneshot::Sender, + ) -> Result<()> { + let obj = self.obj(); + let templ = obj + .element_class() + .pad_template(descriptor.kind.template_name()) + .context("missing pad template")?; + + let pad = gst::Pad::builder_from_template(&templ) + .name(descriptor.pad_name()) + .build(); + + pad.set_active(true)?; + + let stream_start = gst::event::StreamStart::builder(&descriptor.name) + .group_id(gst::GroupId::next()) + .build(); + pad.push_event(stream_start); + pad.push_event(gst::event::Caps::new(&caps)); + pad.push_event(gst::event::Segment::new(&gst::FormattedSegment::::new())); + + obj.add_pad(&pad)?; + + let (pad_tx, pad_rx) = mpsc::unbounded_channel(); + let pad_clone = pad.clone(); + let weak = obj.downgrade(); + let context = glib::MainContext::default(); + let task = spawn_main_context_forwarder(&context, pad_rx, move |msg| { + if let Some(obj) = weak.upgrade() { + let imp = obj.imp(); + imp.dispatch_pad_message(&pad_clone, msg) + } else { + false + } + }); - buffer_mut.set_flags(flags); + self.pads.lock().unwrap().insert( + descriptor.pad_name(), + PadHandle { + sender: pad_tx.clone(), + task, + }, + ); - gst::info!(CAT, "pushing sample: {:?}", buffer); + let _ = reply.send(PadEndpoint { sender: pad_tx }); + Ok(()) + } - if let Err(err) = srcpad.push(buffer) { - gst::warning!(CAT, "Failed to push sample: {:?}", err); - } - } - Ok(None) => { - // Stream ended normally - gst::info!(CAT, "Stream ended normally"); - break; - } - Err(e) => { - // Handle connection errors gracefully - gst::warning!(CAT, "Failed to read frame: {:?}", e); - break; - } - } - } - }); - self.tasks.lock().unwrap().push(handle); + fn dispatch_pad_message(&self, pad: &gst::Pad, msg: PadMessage) -> bool { + match msg { + PadMessage::Buffer(buffer) => { + if let Err(err) = pad.push(buffer) { + gst::warning!(CAT, "failed to push buffer: {err:?}"); + return false; + } + true + } + PadMessage::Eos => { + pad.push_event(gst::event::Eos::builder().build()); + true + } + PadMessage::Drop => { + let _ = pad.set_active(false); + let _ = self.obj().remove_pad(pad); + false } } + } +} - { - for (track_name, config) in catalog.audio.renditions { - let track_ref = moq_lite::Track::new(&track_name); - let track_consumer = broadcast.subscribe_track(&track_ref)?; - let mut track = - hang::container::OrderedConsumer::new(track_consumer, std::time::Duration::from_secs(1)); - - let caps = match &config.codec { - hang::catalog::AudioCodec::AAC(_aac) => { - let builder = gst::Caps::builder("audio/mpeg") - .field("mpegversion", 4) - .field("channels", config.channel_count) - .field("rate", config.sample_rate); - - if let Some(description) = config.description { - builder - .field("codec_data", gst::Buffer::from_slice(description.clone())) - .field("stream-format", "aac") - .build() - } else { - builder.field("stream-format", "adts").build() - } - } - hang::catalog::AudioCodec::Opus => { - let builder = gst::Caps::builder("audio/x-opus") - .field("rate", config.sample_rate) - .field("channels", config.channel_count); - - if let Some(description) = config.description { - builder - .field("codec_data", gst::Buffer::from_slice(description.clone())) - .field("stream-format", "ogg") - .build() - } else { - builder.field("stream-format", "opus").build() - } - } - _ => unimplemented!(), - }; +async fn run_session( + settings: ResolvedSettings, + control_tx: mpsc::UnboundedSender, + shutdown: &mut watch::Receiver, +) -> Result<()> { + let mut config = moq_native::ClientConfig::default(); + config.tls.disable_verify = Some(settings.tls_disable_verify); - gst::info!(CAT, "caps: {:?}", caps); + let origin = moq_lite::Origin::produce(); + let origin_consumer = origin.consume(); + let client = config.init()?.with_consume(origin); - let templ = self.obj().element_class().pad_template("audio_%u").unwrap(); + let _session = client.connect(settings.url.clone()).await?; - let srcpad = gst::Pad::builder_from_template(&templ).name(&track_name).build(); - srcpad.set_active(true).unwrap(); + let broadcast = origin_consumer + .consume_broadcast(&settings.broadcast) + .ok_or_else(|| anyhow::anyhow!("Broadcast '{}' not found", settings.broadcast))?; - let stream_start = gst::event::StreamStart::builder(&track_name) - .group_id(gst::GroupId::next()) - .build(); - srcpad.push_event(stream_start); + let catalog_track = broadcast.subscribe_track(&hang::catalog::Catalog::default_track())?; + let mut catalog = hang::catalog::CatalogConsumer::new(catalog_track); + let catalog = catalog.next().await?.context("catalog missing")?.clone(); - let caps_evt = gst::event::Caps::new(&caps); - srcpad.push_event(caps_evt); + let mut tasks = Vec::new(); - let segment = gst::event::Segment::new(&gst::FormattedSegment::::new()); - srcpad.push_event(segment); + for (track_name, config) in catalog.video.renditions { + let descriptor = TrackDescriptor { + kind: TrackKind::Video, + name: track_name.clone(), + }; + let caps = video_caps(&config)?; + let endpoint = request_pad(&control_tx, descriptor.clone(), caps).await?; + let track_ref = moq_lite::Track::new(&track_name); + let track_consumer = broadcast.subscribe_track(&track_ref)?; + let track = hang::container::OrderedConsumer::new(track_consumer, Duration::from_secs(1)); + tasks.push(spawn_track_pump(track, descriptor, endpoint, shutdown.clone())); + } - self.obj().add_pad(&srcpad).expect("Failed to add pad"); + for (track_name, config) in catalog.audio.renditions { + let descriptor = TrackDescriptor { + kind: TrackKind::Audio, + name: track_name.clone(), + }; + let caps = audio_caps(&config)?; + let endpoint = request_pad(&control_tx, descriptor.clone(), caps).await?; + let track_ref = moq_lite::Track::new(&track_name); + let track_consumer = broadcast.subscribe_track(&track_ref)?; + let track = hang::container::OrderedConsumer::new(track_consumer, Duration::from_secs(1)); + tasks.push(spawn_track_pump(track, descriptor, endpoint, shutdown.clone())); + } - // Push to the srcpad in a background task. - let mut reference = None; - let handle = tokio::spawn(async move { - loop { - match track.read().await { - Ok(Some(frame)) => { - let payload: Vec = frame.payload.into_iter().flatten().collect(); - let mut buffer = gst::Buffer::from_slice(payload); - let buffer_mut = buffer.get_mut().unwrap(); + let _ = control_tx.send(ControlMessage::NoMorePads); - // Make timestamps relative to the first frame for proper playback - let pts = if let Some(reference_ts) = reference { - let timestamp: std::time::Duration = (frame.timestamp - reference_ts).into(); - gst::ClockTime::from_nseconds(timestamp.as_nanos() as _) - } else { - reference = Some(frame.timestamp); - gst::ClockTime::ZERO - }; - buffer_mut.set_pts(Some(pts)); + for task in tasks { + let _ = task.await; + } - let mut flags = buffer_mut.flags(); - flags.remove(gst::BufferFlags::DELTA_UNIT); - buffer_mut.set_flags(flags); + Ok(()) +} - gst::info!(CAT, "pushing sample: {:?}", buffer); +async fn request_pad( + control_tx: &mpsc::UnboundedSender, + descriptor: TrackDescriptor, + caps: gst::Caps, +) -> Result { + let (reply_tx, reply_rx) = oneshot::channel(); + control_tx + .send(ControlMessage::CreatePad { + descriptor, + caps, + reply: reply_tx, + }) + .map_err(|_| anyhow::anyhow!("control plane shut down"))?; + + let endpoint = reply_rx.await.context("pad creation cancelled")?; + Ok(endpoint) +} - if let Err(err) = srcpad.push(buffer) { - gst::warning!(CAT, "Failed to push sample: {:?}", err); - } +fn spawn_track_pump( + track: hang::container::OrderedConsumer, + descriptor: TrackDescriptor, + pad_endpoint: PadEndpoint, + shutdown: watch::Receiver, +) -> tokio::task::JoinHandle<()> { + RUNTIME.spawn(run_track_pump(track, descriptor, pad_endpoint, shutdown)) +} + +async fn run_track_pump( + mut track: hang::container::OrderedConsumer, + descriptor: TrackDescriptor, + pad_endpoint: PadEndpoint, + mut shutdown: watch::Receiver, +) { + let mut reference_ts = None; + loop { + tokio::select! { + _ = shutdown.changed() => { + pad_endpoint.send(PadMessage::Drop); + break; + } + frame = track.read() => { + match frame { + Ok(Some(frame)) => { + let timestamp = frame.timestamp; + let is_keyframe = frame.is_keyframe(); + let payload = frame.payload; + let mut buffer = gst::Buffer::from_slice(payload.into_iter().flatten().collect::>()); + let buffer_mut = buffer.get_mut().unwrap(); + + let pts = match reference_ts { + Some(reference) => { + let delta: Duration = (timestamp - reference).into(); + gst::ClockTime::from_nseconds(delta.as_nanos() as u64) } - Ok(None) => { - // Stream ended normally - gst::info!(CAT, "Stream ended normally"); - break; + None => { + reference_ts = Some(timestamp); + gst::ClockTime::ZERO } - Err(e) => { - // Handle connection errors gracefully - gst::warning!(CAT, "Failed to read frame: {:?}", e); - break; + }; + buffer_mut.set_pts(Some(pts)); + + let mut flags = buffer_mut.flags(); + match descriptor.kind { + TrackKind::Video => { + if is_keyframe { + flags.remove(gst::BufferFlags::DELTA_UNIT); + } else { + flags.insert(gst::BufferFlags::DELTA_UNIT); + } + } + TrackKind::Audio => { + flags.remove(gst::BufferFlags::DELTA_UNIT); } } + buffer_mut.set_flags(flags); + + if !pad_endpoint.send(PadMessage::Buffer(buffer)) { + break; + } + } + Ok(None) => { + pad_endpoint.send(PadMessage::Eos); + pad_endpoint.send(PadMessage::Drop); + break; } - }); - self.tasks.lock().unwrap().push(handle); + Err(err) => { + gst::warning!(CAT, "track {} failed: {err:?}", descriptor.name); + pad_endpoint.send(PadMessage::Drop); + break; + } + } } } + } +} - // We downloaded the catalog and created all the pads. - self.obj().no_more_pads(); +fn video_caps(config: &hang::catalog::VideoConfig) -> Result { + use hang::catalog::VideoCodec; + + let caps = match &config.codec { + VideoCodec::H264(_) => { + let mut builder = gst::Caps::builder("video/x-h264").field("alignment", "au"); + if let Some(description) = &config.description { + builder = builder + .field("stream-format", "avc") + .field("codec_data", gst::Buffer::from_slice(description.clone())); + } else { + builder = builder.field("stream-format", "annexb"); + } + builder.build() + } + VideoCodec::H265(h265) => { + let mut builder = gst::Caps::builder("video/x-h265").field("alignment", "au"); + match &config.description { + Some(description) => { + let format = if h265.in_band { "hev1" } else { "hvc1" }; + builder = builder + .field("stream-format", format) + .field("codec_data", gst::Buffer::from_slice(description.clone())); + } + None => { + let format = if h265.in_band { "hev1" } else { "byte-stream" }; + builder = builder.field("stream-format", format); + } + } + builder.build() + } + VideoCodec::AV1(_) => { + let mut builder = gst::Caps::builder("video/x-av1"); + if let Some(description) = &config.description { + builder = builder.field("codec_data", gst::Buffer::from_slice(description.clone())); + } + builder.build() + } + other => bail!("unsupported video codec: {other:?}"), + }; + Ok(caps) +} - Ok(()) - } +fn audio_caps(config: &hang::catalog::AudioConfig) -> Result { + let caps = match &config.codec { + hang::catalog::AudioCodec::AAC(_) => { + let mut builder = gst::Caps::builder("audio/mpeg") + .field("mpegversion", 4) + .field("rate", config.sample_rate) + .field("channels", config.channel_count); + if let Some(description) = &config.description { + builder = builder + .field("codec_data", gst::Buffer::from_slice(description.clone())) + .field("stream-format", "aac"); + } else { + builder = builder.field("stream-format", "adts"); + } + builder.build() + } + hang::catalog::AudioCodec::Opus => { + let mut builder = gst::Caps::builder("audio/x-opus") + .field("rate", config.sample_rate) + .field("channels", config.channel_count); + if let Some(description) = &config.description { + builder = builder + .field("codec_data", gst::Buffer::from_slice(description.clone())) + .field("stream-format", "ogg"); + } + builder.build() + } + other => bail!("unsupported audio codec: {other:?}"), + }; + Ok(caps) +} - fn cleanup(&self) { - for task in self.tasks.lock().unwrap().drain(..) { - task.abort(); +fn spawn_main_context_forwarder( + context: &glib::MainContext, + mut rx: mpsc::UnboundedReceiver, + mut handler: F, +) -> glib::JoinHandle<()> +where + T: Send + 'static, + F: FnMut(T) -> bool + 'static, +{ + let ctx = context.clone(); + ctx.spawn_local(async move { + while let Some(msg) = rx.recv().await { + if !handler(msg) { + break; + } } - *self.session.lock().unwrap() = None; - } + }) } diff --git a/src/source/mod.rs b/src/source/mod.rs index b41daca..f19d1b5 100644 --- a/src/source/mod.rs +++ b/src/source/mod.rs @@ -4,7 +4,7 @@ use gst::prelude::*; mod imp; glib::wrapper! { - pub struct MoqSrc(ObjectSubclass) @extends gst::Bin, gst::Element, gst::Object; + pub struct MoqSrc(ObjectSubclass) @extends gst::Element, gst::Object; } pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {