diff --git a/lib/ex_webrtc/rtp/av1/depayloader.ex b/lib/ex_webrtc/rtp/av1/depayloader.ex
new file mode 100644
index 00000000..4b434955
--- /dev/null
+++ b/lib/ex_webrtc/rtp/av1/depayloader.ex
@@ -0,0 +1,241 @@
+defmodule ExWebRTC.RTP.Depayloader.AV1 do
+  @moduledoc false
+  # Reassembles AV1 video temporal units from RTP packets.
+  #
+  # Resources:
+  # * [RTP Payload Format for AV1 (av1-rtp-spec)](https://aomediacodec.github.io/av1-rtp-spec/v1.0.0.html)
+  # * [AV1 spec](https://aomediacodec.github.io/av1-spec/av1-spec.pdf).
+  # * https://norkin.org/research/av1_decoder_model/index.html
+  # * https://chromium.googlesource.com/external/webrtc/+/HEAD/modules/rtp_rtcp/source/video_rtp_depacketizer_av1.cc
+
+  @behaviour ExWebRTC.RTP.Depayloader.Behaviour
+
+  require Logger
+
+  alias ExWebRTC.RTP.AV1.{OBU, Payload}
+
+  @type t :: %__MODULE__{
+          current_temporal_unit: [OBU.t()],
+          current_obu: binary() | nil,
+          current_timestamp: ExRTP.Packet.uint32() | nil
+        }
+
+  defstruct current_temporal_unit: [], current_obu: nil, current_timestamp: nil
+
+  @impl true
+  def new do
+    %__MODULE__{}
+  end
+
+  @impl true
+  def depayload(depayloader, packet)
+
+  def depayload(depayloader, %ExRTP.Packet{payload: <<>>, padding: true}), do: {nil, depayloader}
+
+  def depayload(depayloader, packet) do
+    case Payload.parse(packet.payload) do
+      {:ok, av1_payload} ->
+        do_depayload(depayloader, packet, av1_payload)
+
+      {:error, reason} ->
+        Logger.warning("""
+        Couldn't parse payload, reason: #{reason}. \
+        Resetting depayloader state. Payload: #{inspect(packet.payload)}.\
+        """)
+
+        # Callers receive `{temporal_unit | nil, depayloader}`; a parse failure yields
+        # no temporal unit, so return `nil` (not `:ok`) alongside the reset state.
+        {nil, %__MODULE__{}}
+    end
+  end
+
+  # Splits the packet's OBU elements into complete OBUs and leading/trailing fragments,
+  # folds them into the reassembly state and, when the RTP timestamp changes, emits the
+  # temporal unit accumulated so far.
+  defp do_depayload(depayloader, packet, %Payload{z: z, y: y} = av1_payload) do
+    {obus, current_obu_fragment, next_obu_fragment} =
+      av1_payload
+      |> Payload.depayload_obu_elements()
+      |> parse_obu_elements(z, y)
+
+    # TODO: handle marker, or not (?)
+    # av1-rtp-spec sec. 4.2.: It is possible for a receiver to receive the last packet of a temporal unit
+    # without the marker bit being set equal to 1, and a receiver should be able to handle this case.
+    # at the moment, we're looking at the timestamps only, and it seems to work
+    #
+    # TODO: handle the case where depayloader.current_timestamp > packet.timestamp
+    new_temporal_unit? = depayloader.current_timestamp != packet.timestamp
+
+    {depayloader, obus, next_obu_fragment} =
+      depayloader
+      |> update_current_obu(current_obu_fragment, new_temporal_unit?)
+      |> maybe_flush_current_obu(obus, next_obu_fragment, y)
+
+    if new_temporal_unit? do
+      {temporal_unit, depayloader} = flush_temporal_unit(depayloader)
+
+      {temporal_unit,
+       update_temporal_unit(depayloader, obus, next_obu_fragment, packet.timestamp)}
+    else
+      {nil, update_temporal_unit(depayloader, obus, next_obu_fragment, packet.timestamp)}
+    end
+  end
+
+  # Returns `{complete_obus, current_obu_fragment, next_obu_fragment}`:
+  # z = 1 means the first element continues the OBU in progress,
+  # y = 1 means the last element is a fragment continued by the next packet.
+  defp parse_obu_elements(obu_elements, z, y)
+
+  defp parse_obu_elements([], _, _) do
+    # TODO: decide where to use debug logs and where warnings
+    Logger.debug("AV1 payload contains no valid OBU elements. Dropping packet.")
+    {[], nil, nil}
+  end
+
+  defp parse_obu_elements(obus, 0, 0) do
+    {obus, nil, nil}
+  end
+
+  # Last OBU element is an OBU fragment that will be continued
+  defp parse_obu_elements(obu_elements, 0, 1) do
+    {next_obu_fragment, obus} = List.pop_at(obu_elements, -1)
+    {obus, nil, next_obu_fragment}
+  end
+
+  # First OBU element is an OBU fragment, a continuation of the current OBU
+  defp parse_obu_elements([current_obu_fragment | obus], 1, 0) do
+    {obus, current_obu_fragment, nil}
+  end
+
+  # Both. If packet contained exactly 1 OBU fragment, we store it as current_obu_fragment only
+  defp parse_obu_elements([current_obu_fragment | obu_elements], 1, 1) do
+    {next_obu_fragment, obus} = List.pop_at(obu_elements, -1)
+    {obus, current_obu_fragment, next_obu_fragment}
+  end
+
+  # Appends the leading fragment (if any) to the OBU being reassembled, or drops
+  # whichever part is orphaned when the fragment chain is broken.
+  defp update_current_obu(depayloader, current_obu_fragment, new_temporal_unit?)
+
+  defp update_current_obu(depayloader, current_obu_fragment, true) do
+    if depayloader.current_obu != nil do
+      Logger.debug(
+        "Received packet with timestamp from a new temporal unit without finishing the previous OBU. Dropping previous OBU."
+      )
+    end
+
+    if current_obu_fragment != nil do
+      Logger.debug(
+        "Received middle OBU fragment from a new temporal unit without beginning the OBU. Dropping this OBU fragment."
+      )
+    end
+
+    %{depayloader | current_obu: nil}
+  end
+
+  defp update_current_obu(%__MODULE__{current_obu: nil} = depayloader, nil, false) do
+    depayloader
+  end
+
+  defp update_current_obu(depayloader, nil, false) do
+    Logger.debug(
+      "Received start of new OBU without finishing the previous OBU. Dropping previous OBU."
+    )
+
+    %{depayloader | current_obu: nil}
+  end
+
+  defp update_current_obu(%__MODULE__{current_obu: nil} = depayloader, _obu_fragment, false) do
+    Logger.debug(
+      "Received middle OBU fragment without beginning the OBU. Dropping this OBU fragment."
+    )
+
+    depayloader
+  end
+
+  defp update_current_obu(%__MODULE__{current_obu: obu} = depayloader, obu_fragment, false) do
+    %{depayloader | current_obu: obu <> obu_fragment}
+  end
+
+  # current_obu is nil, nothing to flush
+  defp maybe_flush_current_obu(
+         %__MODULE__{current_obu: nil} = depayloader,
+         obus,
+         next_obu_fragment,
+         _y
+       ) do
+    {depayloader, obus, next_obu_fragment}
+  end
+
+  # Packet contained exactly 1 OBU fragment, current_obu will be continued. Do not flush
+  # TODO: make sure `nil` is correct here
+  defp maybe_flush_current_obu(%__MODULE__{current_obu: incomplete_obu} = depayloader, [], nil, 1) do
+    {depayloader, [], incomplete_obu}
+  end
+
+  # Otherwise, flush
+  defp maybe_flush_current_obu(
+         %__MODULE__{current_obu: obu} = depayloader,
+         obus,
+         next_obu_fragment,
+         _y
+       ) do
+    {depayloader, [obu | obus], next_obu_fragment}
+  end
+
+  defp update_temporal_unit(
+         %__MODULE__{current_temporal_unit: tu} = depayloader,
+         obus,
+         next_obu_fragment,
+         timestamp
+       ) do
+    %{
+      depayloader
+      | current_obu: next_obu_fragment,
+        current_temporal_unit: append_obus(obus, tu),
+        current_timestamp: timestamp
+    }
+  end
+
+  defp flush_temporal_unit(%__MODULE__{current_temporal_unit: tu}) when tu != [] do
+    # `tu` holds OBUs newest-first (they are prepended), hence the Enum.reverse/1 below.
+    # Force s=1 for the low overhead bitstream format
+    tu_binary =
+      tu
+      |> Stream.map(&%OBU{&1 | s: 1})
+      |> Stream.map(&OBU.serialize/1)
+      |> Enum.reverse()
+      |> :erlang.iolist_to_binary()
+
+    # TODO: is it possible that `current_obu != nil` here?
+    {OBU.temporal_delimiter() <> tu_binary, %__MODULE__{}}
+  end
+
+  defp flush_temporal_unit(depayloader) do
+    Logger.debug("Previous temporal unit is empty, nothing to flush")
+    {nil, depayloader}
+  end
+
+  defp append_obus([], tu), do: tu
+
+  defp append_obus([obu_binary | rest], tu) do
+    with {:ok, obu, rest_of_binary} <- OBU.parse(obu_binary),
+         true <- OBU.should_be_transmitted?(obu) do
+      if rest_of_binary != <<>>,
+        do:
+          Logger.debug(
+            "OBU binary contains additional data after the decoded OBU, dropping the additional data"
+          )
+
+      append_obus(rest, [obu | tu])
+    else
+      {:error, :invalid_av1_bitstream} ->
+        Logger.debug("Unable to parse OBU from binary data, dropping")
+        append_obus(rest, tu)
+
+      false ->
+        Logger.debug("Dropping temporal delimiter/tile list OBU")
+        append_obus(rest, tu)
+    end
+  end
+end
diff --git a/lib/ex_webrtc/rtp/av1/obu.ex b/lib/ex_webrtc/rtp/av1/obu.ex
index 037ef570..3de7b1b9 100644
--- a/lib/ex_webrtc/rtp/av1/obu.ex
+++ b/lib/ex_webrtc/rtp/av1/obu.ex
@@ -24,6 +24,7 @@ defmodule ExWebRTC.RTP.AV1.OBU do
 
   @obu_sequence_header 1
   @obu_temporal_delimiter 2
+  @obu_tile_list 8
   @obu_padding 15
 
   @type t :: %__MODULE__{
@@ -72,9 +73,13 @@ defmodule ExWebRTC.RTP.AV1.OBU do
 
   defp parse_extension_header(_, _), do: {:error, :invalid_av1_bitstream}
 
-  defp parse_payload(0, rest), do: {:ok, rest, <<>>}
+  # This can be reused to parse OBU elements from RTP packet payloads
+  @spec parse_payload(0 | 1, binary()) ::
+          {:ok, binary(), binary()} | {:error, :invalid_av1_bitstream}
+  def parse_payload(s, payload)
+  def parse_payload(0, rest), do: {:ok, rest, <<>>}
 
-  defp parse_payload(1, rest) do
+  def parse_payload(1, rest) do
     with {:ok, leb128_size, payload_size} <- LEB128.read(rest),
          <<_::binary-size(leb128_size), payload::binary-size(payload_size), rest::binary>> <- rest do
       {:ok, payload, rest}
@@ -141,4 +146,19 @@ defmodule ExWebRTC.RTP.AV1.OBU do
   end
 
   def disable_dropping_in_decoder_if_applicable(obu), do: obu
+
+  @spec temporal_delimiter :: binary()
+  def temporal_delimiter,
+    do: %__MODULE__{type: @obu_temporal_delimiter, x: 0, s: 1, payload: <<>>} |> serialize()
+
+  @doc """
+  Determines whether the OBU should be transmitted. Returns `false` for OBUs that must be removed
+  when transmitting and ignored when receiving, in accordance with av1-rtp-spec sec. 5.
+  """
+  @spec should_be_transmitted?(t()) :: boolean()
+  def should_be_transmitted?(obu)
+
+  def should_be_transmitted?(%__MODULE__{type: @obu_temporal_delimiter}), do: false
+  def should_be_transmitted?(%__MODULE__{type: @obu_tile_list}), do: false
+  def should_be_transmitted?(_obu), do: true
 end
diff --git a/lib/ex_webrtc/rtp/av1/payload.ex b/lib/ex_webrtc/rtp/av1/payload.ex
index af19867f..4498adc1 100644
--- a/lib/ex_webrtc/rtp/av1/payload.ex
+++ b/lib/ex_webrtc/rtp/av1/payload.ex
@@ -4,26 +4,31 @@ defmodule ExWebRTC.RTP.AV1.Payload do
   #
   # Based on [RTP Payload Format for AV1](https://aomediacodec.github.io/av1-rtp-spec/v1.0.0.html).
   #
-  # RTP payload syntax:
+  # RTP payload syntax (OBU element = OBU, or a fragment of an OBU):
   #  0 1 2 3 4 5 6 7
   # +-+-+-+-+-+-+-+-+
   # |Z|Y| W |N|-|-|-| (REQUIRED)
   # +=+=+=+=+=+=+=+=+ (REPEATED W-1 times, or any times if W = 0)
   # |1|             |
-  # +-+ OBU fragment|
+  # +-+ OBU element |
   # |1|             | (REQUIRED, leb128 encoded)
   # +-+    size     |
   # |0|             |
   # +-+-+-+-+-+-+-+-+
-  # |  OBU fragment |
+  # |  OBU element  |
   # |     ...       |
   # +=+=+=+=+=+=+=+=+
   # |     ...       |
-  # +=+=+=+=+=+=+=+=+ if W > 0, last fragment MUST NOT have size field
-  # |  OBU fragment |
+  # +=+=+=+=+=+=+=+=+ if W > 0, last element MUST NOT have size field
+  # |  OBU element  |
   # |     ...       |
   # +=+=+=+=+=+=+=+=+
 
+  require Logger
+
+  alias ExWebRTC.RTP.AV1.OBU
+  alias ExWebRTC.Utils
+
   @type t :: %__MODULE__{
           z: 0 | 1,
           y: 0 | 1,
@@ -93,4 +98,38 @@ defmodule ExWebRTC.RTP.AV1.Payload do
 
     [first_obu_payload | next_obu_payloads]
   end
+
+  @doc """
+  Depayloads OBU elements from the RTP AV1 packet payload.
+  """
+  @spec depayload_obu_elements(t()) :: [binary()]
+  def depayload_obu_elements(%__MODULE__{w: w, payload: payload}) do
+    element_count = if w > 0, do: w, else: -1
+    parse_obu_elements(payload, element_count)
+  end
+
+  defp parse_obu_elements(data, count, obu_elements \\ [])
+
+  defp parse_obu_elements(<<>>, count, obu_elements) do
+    if count > 0,
+      do: Logger.debug("Invalid AV1 RTP aggregation header: expected #{count} more OBU elements.")
+
+    Enum.reverse(obu_elements)
+  end
+
+  defp parse_obu_elements(data, count, obu_elements) do
+    s = Utils.to_int(count != 1)
+
+    case OBU.parse_payload(s, data) do
+      {:ok, obu_element, rest} ->
+        parse_obu_elements(rest, count - 1, [obu_element | obu_elements])
+
+      {:error, :invalid_av1_bitstream} ->
+        Logger.warning("""
+        Unable to parse OBU element from invalid AV1 bitstream. Dropping rest of AV1 RTP payload.\
+        """)
+
+        parse_obu_elements(<<>>, count, obu_elements)
+    end
+  end
 end
diff --git a/lib/ex_webrtc/rtp/av1/payloader.ex b/lib/ex_webrtc/rtp/av1/payloader.ex
index e386180d..329d45a5 100644
--- a/lib/ex_webrtc/rtp/av1/payloader.ex
+++ b/lib/ex_webrtc/rtp/av1/payloader.ex
@@ -10,11 +10,12 @@ defmodule ExWebRTC.RTP.Payloader.AV1 do
 
   @behaviour ExWebRTC.RTP.Payloader.Behaviour
 
+  require Logger
+
   alias ExWebRTC.RTP.AV1.{OBU, Payload}
   alias ExWebRTC.Utils
 
   @obu_sequence_header 1
-  @obu_temporal_delimiter 2
 
   @aggregation_header_size_bytes 1
 
@@ -34,20 +35,14 @@ defmodule ExWebRTC.RTP.Payloader.AV1 do
   def payload(payloader, temporal_unit) when temporal_unit != <<>> do
     # In AV1, a temporal unit consists of all OBUs associated with a specific time instant.
     # Temporal units always start with a temporal delimiter OBU. They may contain multiple AV1 frames.
-    # av1-rtp-spec sec. 5: The temporal delimiter OBU should be removed when transmitting.
-    obus =
-      case parse_obus(temporal_unit) do
-        [%OBU{type: @obu_temporal_delimiter} | next_obus] ->
-          next_obus
-
-        _ ->
-          raise "Invalid AV1 temporal unit: does not start with temporal delimiter OBU"
-      end
 
     # With the current implementation, each RTP packet will contain one OBU element.
     # This element can be an entire OBU, or a fragment of an OBU bigger than max_payload_size.
     rtp_packets =
-      Stream.flat_map(obus, fn obu ->
+      temporal_unit
+      |> parse_obus()
+      |> Stream.filter(&OBU.should_be_transmitted?/1)
+      |> Stream.flat_map(fn obu ->
         n_bit = Utils.to_int(obu.type == @obu_sequence_header)
 
         obu
@@ -72,7 +67,11 @@ defmodule ExWebRTC.RTP.Payloader.AV1 do
         parse_obus(rest, [obu | obus])
 
       {:error, :invalid_av1_bitstream} ->
-        raise "Invalid AV1 bitstream: unable to parse OBU"
+        Logger.warning("""
+        Unable to parse OBU from invalid AV1 bitstream. Dropping rest of temporal unit.\
+        """)
+
+        parse_obus(<<>>, obus)
     end
   end
 end
diff --git a/lib/ex_webrtc/rtp/depayloader.ex b/lib/ex_webrtc/rtp/depayloader.ex
index de92e4dc..4c332f16 100644
--- a/lib/ex_webrtc/rtp/depayloader.ex
+++ b/lib/ex_webrtc/rtp/depayloader.ex
@@ -37,6 +37,7 @@ defmodule ExWebRTC.RTP.Depayloader do
     case String.downcase(mime_type) do
       "video/vp8" -> {:ok, ExWebRTC.RTP.Depayloader.VP8}
       "video/h264" -> {:ok, ExWebRTC.RTP.Depayloader.H264}
+      "video/av1" -> {:ok, ExWebRTC.RTP.Depayloader.AV1}
       "audio/opus" -> {:ok, ExWebRTC.RTP.Depayloader.Opus}
       "audio/pcma" -> {:ok, ExWebRTC.RTP.Depayloader.G711}
       "audio/pcmu" -> {:ok, ExWebRTC.RTP.Depayloader.G711}
diff --git a/lib/ex_webrtc/rtp/vp8/depayloader.ex b/lib/ex_webrtc/rtp/vp8/depayloader.ex
index 42b6b2ce..efc1e5ab 100644
--- a/lib/ex_webrtc/rtp/vp8/depayloader.ex
+++ b/lib/ex_webrtc/rtp/vp8/depayloader.ex
@@ -11,8 +11,8 @@ defmodule ExWebRTC.RTP.Depayloader.VP8 do
   alias ExWebRTC.RTP.VP8.Payload
 
   @type t() :: %__MODULE__{
-          current_frame: nil,
-          current_timestamp: nil
+          current_frame: binary() | nil,
+          current_timestamp: ExRTP.Packet.uint32() | nil
         }
 
   defstruct [:current_frame, :current_timestamp]
@@ -38,7 +38,7 @@ defmodule ExWebRTC.RTP.Depayloader.VP8 do
         Resetting depayloader state. Payload: #{inspect(packet.payload)}.\
         """)
 
-        {:ok, %{depayloader | current_frame: nil, current_timestamp: nil}}
+        {nil, %{depayloader | current_frame: nil, current_timestamp: nil}}
     end
   end
 
diff --git a/test/ex_webrtc/rtp/payloader_test.exs b/test/ex_webrtc/rtp/payloader_test.exs
index 6fafec3c..9fe4617c 100644
--- a/test/ex_webrtc/rtp/payloader_test.exs
+++ b/test/ex_webrtc/rtp/payloader_test.exs
@@ -29,7 +29,8 @@ defmodule ExWebRTC.RTP.PayloaderTest do
              Payloader.AV1.payload(payloader, @av1_temporal_unit)
 
     # The sample frame is not a valid AV1 temporal unit
-    assert_raise RuntimeError, fn -> Payloader.payload(payloader, @frame) end
+    assert {[], _payloader} = Payloader.payload(payloader, @frame)
+    assert Payloader.payload(payloader, @frame) == Payloader.AV1.payload(payloader, @frame)
   end
 
   test "creates an Opus payloader and dispatches calls to its module" do