Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
230 changes: 230 additions & 0 deletions lib/ex_webrtc/rtp/av1/depayloader.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,230 @@
defmodule ExWebRTC.RTP.Depayloader.AV1 do
@moduledoc false
# Reassembles AV1 video temporal units from RTP packets.
#
# Resources:
# * [RTP Payload Format for AV1 (av1-rtp-spec)](https://aomediacodec.github.io/av1-rtp-spec/v1.0.0.html)
# * [AV1 spec](https://aomediacodec.github.io/av1-spec/av1-spec.pdf).
# * https://norkin.org/research/av1_decoder_model/index.html
# * https://chromium.googlesource.com/external/webrtc/+/HEAD/modules/rtp_rtcp/source/video_rtp_depacketizer_av1.cc

@behaviour ExWebRTC.RTP.Depayloader.Behaviour

require Logger

alias ExWebRTC.RTP.AV1.{OBU, Payload}

@type t :: %__MODULE__{
current_temporal_unit: [OBU.t()],
current_obu: binary() | nil,
current_timestamp: ExRTP.Packet.uint32() | nil
}

defstruct current_temporal_unit: [], current_obu: nil, current_timestamp: nil

@impl true
def new do
%__MODULE__{}
end

@impl true
def depayload(depayloader, packet)

def depayload(depayloader, %ExRTP.Packet{payload: <<>>, padding: true}), do: {nil, depayloader}

def depayload(depayloader, packet) do
case Payload.parse(packet.payload) do
{:ok, av1_payload} ->
do_depayload(depayloader, packet, av1_payload)

{:error, reason} ->
Logger.warning("""
Couldn't parse payload, reason: #{reason}. \
Resetting depayloader state. Payload: #{inspect(packet.payload)}.\
""")

{:ok, %__MODULE__{}}
end
end

defp do_depayload(depayloader, packet, %Payload{z: z, y: y} = av1_payload) do
{obus, current_obu_fragment, next_obu_fragment} =
av1_payload
|> Payload.depayload_obu_elements()
|> parse_obu_elements(z, y)

# TODO: handle marker, or not (?)
# av1-rtp-spec sec. 4.2.: It is possible for a receiver to receive the last packet of a temporal unit
# without the marker bit being set equal to 1, and a receiver should be able to handle this case.
# at the moment, we're looking at the timestamps only, and it seems to work
#
# TODO: handle the case where depayloader.current_timestamp > packet.timestamp
new_temporal_unit? = depayloader.current_timestamp != packet.timestamp

{depayloader, obus, next_obu_fragment} =
depayloader
|> update_current_obu(current_obu_fragment, new_temporal_unit?)
|> maybe_flush_current_obu(obus, next_obu_fragment, y)

if new_temporal_unit? do
{temporal_unit, depayloader} = flush_temporal_unit(depayloader)

{temporal_unit,
update_temporal_unit(depayloader, obus, next_obu_fragment, packet.timestamp)}
else
{nil, update_temporal_unit(depayloader, obus, next_obu_fragment, packet.timestamp)}
end
end

defp parse_obu_elements(obu_elements, z, y)

defp parse_obu_elements([], _, _) do
# TODO: decide where to use debug logs and where warnings
Logger.debug("AV1 payload contains no valid OBU elements. Dropping packet.")
{[], nil, nil}
end

defp parse_obu_elements(obus, 0, 0) do
{obus, nil, nil}
end

# Last OBU element is an OBU fragment that will be continued
defp parse_obu_elements(obu_elements, 0, 1) do
{next_obu_fragment, obus} = List.pop_at(obu_elements, -1)
{obus, nil, next_obu_fragment}
end

# First OBU element is an OBU fragment, a continuation of the current OBU
defp parse_obu_elements([current_obu_fragment | obus], 1, 0) do
{obus, current_obu_fragment, nil}
end

# Both. If packet contained exactly 1 OBU fragment, we store it as current_obu_fragment only
defp parse_obu_elements([current_obu_fragment | obu_elements], 1, 1) do
{next_obu_fragment, obus} = List.pop_at(obu_elements, -1)
{obus, current_obu_fragment, next_obu_fragment}
end

defp update_current_obu(depayloader, current_obu_fragment, new_temporal_unit?)

defp update_current_obu(depayloader, current_obu_fragment, true) do
if depayloader.current_obu != nil do
Logger.debug(
"Received packet with timestamp from a new temporal unit without finishing the previous OBU. Dropping previous OBU."
)
end

if current_obu_fragment != nil do
Logger.debug(
"Received middle OBU fragment from a new temporal unit without beginning the OBU. Dropping this OBU fragment."
)
end

%{depayloader | current_obu: nil}
end

defp update_current_obu(%__MODULE__{current_obu: nil} = depayloader, nil, false) do
depayloader
end

defp update_current_obu(depayloader, nil, false) do
Logger.debug(
"Received start of new OBU without finishing the previous OBU. Dropping previous OBU."
)

%{depayloader | current_obu: nil}
end

defp update_current_obu(%__MODULE__{current_obu: nil} = depayloader, _obu_fragment, false) do
Logger.debug(
"Received middle OBU fragment without beginning the OBU. Dropping this OBU fragment."
)

depayloader
end

defp update_current_obu(%__MODULE__{current_obu: obu} = depayloader, obu_fragment, false) do
%{depayloader | current_obu: obu <> obu_fragment}
end

# current_obu is nil, nothing to flush
defp maybe_flush_current_obu(
%__MODULE__{current_obu: nil} = depayloader,
obus,
next_obu_fragment,
_y
) do
{depayloader, obus, next_obu_fragment}
end

# Packet contained exactly 1 OBU fragment, current_obu will be continued. Do not flush
# TODO: make sure `nil` is correct here

Check warning on line 161 in lib/ex_webrtc/rtp/av1/depayloader.ex

View workflow job for this annotation

GitHub Actions / CI on OTP 27 / Elixir 1.17

Found a TODO tag in a comment: # TODO: make sure `nil` is correct here
defp maybe_flush_current_obu(%__MODULE__{current_obu: incomplete_obu} = depayloader, [], nil, 1) do
{depayloader, [], incomplete_obu}
end

# Otherwise, flush
defp maybe_flush_current_obu(
%__MODULE__{current_obu: obu} = depayloader,
obus,
next_obu_fragment,
_y
) do
{depayloader, [obu | obus], next_obu_fragment}
end

defp update_temporal_unit(
%__MODULE__{current_temporal_unit: tu} = depayloader,
obus,
next_obu_fragment,
timestamp
) do
%{
depayloader
| current_obu: next_obu_fragment,
current_temporal_unit: append_obus(obus, tu),
current_timestamp: timestamp
}
end

defp flush_temporal_unit(%__MODULE__{current_temporal_unit: tu}) when tu != [] do
# Force s=1 for the low overhead bitstream format
tu_binary =
tu
|> Stream.map(&%OBU{&1 | s: 1})
|> Stream.map(&OBU.serialize/1)
|> Enum.reverse()
|> :erlang.iolist_to_binary()

# TODO: is it possible that `current_obu != nil` here?
{OBU.temporal_delimiter() <> tu_binary, %__MODULE__{}}
end

defp flush_temporal_unit(depayloader) do
Logger.debug("Previous temporal unit is empty, nothing to flush")
{nil, depayloader}
end

defp append_obus([], tu), do: tu

defp append_obus([obu_binary | rest], tu) do
with {:ok, obu, rest_of_binary} <- OBU.parse(obu_binary),
true <- OBU.should_be_transmitted?(obu) do
if rest_of_binary != <<>>,
do:
Logger.debug(
"OBU binary contains additional data after the decoded OBU, dropping the additional data"
)

append_obus(rest, [obu | tu])
else
{:error, :invalid_av1_bitstream} ->
Logger.debug("Unable to parse OBU from binary data, dropping")
append_obus(rest, tu)

false ->
Logger.debug("Dropping temporal delimiter/tile list OBU")
append_obus(rest, tu)
end
end
end
24 changes: 22 additions & 2 deletions lib/ex_webrtc/rtp/av1/obu.ex
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ defmodule ExWebRTC.RTP.AV1.OBU do

@obu_sequence_header 1
@obu_temporal_delimiter 2
@obu_tile_list 8
@obu_padding 15

@type t :: %__MODULE__{
Expand Down Expand Up @@ -72,9 +73,13 @@ defmodule ExWebRTC.RTP.AV1.OBU do

defp parse_extension_header(_, _), do: {:error, :invalid_av1_bitstream}

defp parse_payload(0, rest), do: {:ok, rest, <<>>}
# This can be reused to parse OBU elements from RTP packet payloads
@spec parse_payload(0 | 1, binary()) ::
{:ok, binary(), binary()} | {:error, :invalid_av1_bitstream}
def parse_payload(s, payload)
def parse_payload(0, rest), do: {:ok, rest, <<>>}

defp parse_payload(1, rest) do
def parse_payload(1, rest) do
with {:ok, leb128_size, payload_size} <- LEB128.read(rest),
<<_::binary-size(leb128_size), payload::binary-size(payload_size), rest::binary>> <- rest do
{:ok, payload, rest}
Expand Down Expand Up @@ -141,4 +146,19 @@ defmodule ExWebRTC.RTP.AV1.OBU do
end

def disable_dropping_in_decoder_if_applicable(obu), do: obu

@spec temporal_delimiter :: binary()
def temporal_delimiter,
do: %__MODULE__{type: @obu_temporal_delimiter, x: 0, s: 1, payload: <<>>} |> serialize()

@doc """
Determines whether the OBU should be removed when transmitting, and must be ignored when receiving
in accordance with av1-rtp-spec sec. 5.
"""
@spec should_be_transmitted?(t()) :: boolean()
def should_be_transmitted?(obu)

def should_be_transmitted?(%__MODULE__{type: @obu_temporal_delimiter}), do: false
def should_be_transmitted?(%__MODULE__{type: @obu_tile_list}), do: false
def should_be_transmitted?(_obu), do: true
end
49 changes: 44 additions & 5 deletions lib/ex_webrtc/rtp/av1/payload.ex
Original file line number Diff line number Diff line change
Expand Up @@ -4,26 +4,31 @@ defmodule ExWebRTC.RTP.AV1.Payload do
#
# Based on [RTP Payload Format for AV1](https://aomediacodec.github.io/av1-rtp-spec/v1.0.0.html).
#
# RTP payload syntax:
# RTP payload syntax (OBU element = OBU, or a fragment of an OBU):
# 0 1 2 3 4 5 6 7
# +-+-+-+-+-+-+-+-+
# |Z|Y| W |N|-|-|-| (REQUIRED)
# +=+=+=+=+=+=+=+=+ (REPEATED W-1 times, or any times if W = 0)
# |1| |
# +-+ OBU fragment|
# +-+ OBU element |
# |1| | (REQUIRED, leb128 encoded)
# +-+ size |
# |0| |
# +-+-+-+-+-+-+-+-+
# | OBU fragment |
# | OBU element |
# | ... |
# +=+=+=+=+=+=+=+=+
# | ... |
# +=+=+=+=+=+=+=+=+ if W > 0, last fragment MUST NOT have size field
# | OBU fragment |
# +=+=+=+=+=+=+=+=+ if W > 0, last element MUST NOT have size field
# | OBU element |
# | ... |
# +=+=+=+=+=+=+=+=+

require Logger

alias ExWebRTC.RTP.AV1.OBU
alias ExWebRTC.Utils

@type t :: %__MODULE__{
z: 0 | 1,
y: 0 | 1,
Expand Down Expand Up @@ -93,4 +98,38 @@ defmodule ExWebRTC.RTP.AV1.Payload do

[first_obu_payload | next_obu_payloads]
end

@doc """
Depayloads OBU elements from the RTP AV1 packet payload.
"""
@spec depayload_obu_elements(t()) :: [binary()]
def depayload_obu_elements(%__MODULE__{w: w, payload: payload}) do
element_count = if w > 0, do: w, else: -1
parse_obu_elements(payload, element_count)
end

defp parse_obu_elements(data, count, obu_elements \\ [])

defp parse_obu_elements(<<>>, count, obu_elements) do
if count > 0,
do: Logger.debug("Invalid AV1 RTP aggregation header: expected #{count} more OBU elements.")

Enum.reverse(obu_elements)
end

defp parse_obu_elements(data, count, obu_elements) do
s = Utils.to_int(count != 1)

case OBU.parse_payload(s, data) do
{:ok, obu_element, rest} ->
parse_obu_elements(rest, count - 1, [obu_element | obu_elements])

{:error, :invalid_av1_bitstream} ->
Logger.warning("""
Unable to parse OBU element from invalid AV1 bitstream. Dropping rest of AV1 RTP payload.\
""")

parse_obu_elements(<<>>, count, obu_elements)
end
end
end
Loading
Loading