Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions rs/moq-lite/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,12 +121,9 @@ impl Client {
parameters,
};

// TODO pretty print the parameters.
tracing::trace!(?client, "sending client setup");
stream.writer.encode(&client).await?;

let mut server: setup::Server = stream.reader.decode().await?;
tracing::trace!(?server, "received server setup");

let version = supported
.iter()
Expand Down
2 changes: 0 additions & 2 deletions rs/moq-lite/src/ietf/adapter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -408,7 +408,6 @@ impl<S: web_transport_trait::Session> ControlStreamAdapter<S> {
};

let size: u16 = reader.decode::<u16>().await?;
tracing::trace!(type_id, size, "adapter: reading control message");

let body = reader.read_exact(size as usize).await?;

Expand All @@ -417,7 +416,6 @@ impl<S: web_transport_trait::Session> ControlStreamAdapter<S> {

// Classify and route
let route = self.classify(type_id, &body)?;
tracing::trace!(?route, "adapter: classified message");

match route {
Route::NewRequest(request_id) => {
Expand Down
7 changes: 5 additions & 2 deletions rs/moq-lite/src/ietf/group.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,7 @@ pub struct GroupHeader {

impl Encode<Version> for GroupHeader {
fn encode<W: bytes::BufMut>(&self, w: &mut W, version: Version) -> Result<(), EncodeError> {
tracing::trace!(?self, "encoding group header");
self.flags.encode()?.encode(w, version)?;
self.track_alias.encode(w, version)?;
self.group_id.encode(w, version)?;
Expand Down Expand Up @@ -194,13 +195,15 @@ impl Decode<Version> for GroupHeader {
128 // Default priority when absent
};

Ok(Self {
let result = Self {
track_alias,
group_id,
sub_group_id,
publisher_priority,
flags,
})
};
tracing::trace!(?result, "decoded group header");
Ok(result)
}
}

Expand Down
40 changes: 35 additions & 5 deletions rs/moq-lite/src/ietf/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ pub trait Message: Sized + std::fmt::Debug {

impl<T: Message> Encode<Version> for T {
fn encode<W: BufMut>(&self, w: &mut W, version: Version) -> Result<(), EncodeError> {
tracing::trace!(?self, "encoding");
let mut sizer = Sizer::default();
self.encode_msg(&mut sizer, version)?;
let size: u16 = sizer.size.try_into().map_err(|_| EncodeError::TooLarge)?;
Expand All @@ -30,11 +31,40 @@ impl<T: Message> Encode<Version> for T {
impl<T: Message> Decode<Version> for T {
fn decode<B: Buf>(buf: &mut B, version: Version) -> Result<Self, DecodeError> {
let size = u16::decode(buf, version)? as usize;
let mut limited = buf.take(size);
let result = Self::decode_msg(&mut limited, version)?;
if limited.remaining() > 0 {
return Err(DecodeError::Long);

if tracing::enabled!(tracing::Level::TRACE) {
if buf.remaining() < size {
return Err(DecodeError::Short);
}
let raw = buf.copy_to_bytes(size);
let mut slice = &raw[..];
match Self::decode_msg(&mut slice, version) {
Ok(result) => {
if slice.remaining() > 0 {
return Err(DecodeError::Long);
}
tracing::trace!(?result, "decoded");
Ok(result)
}
Err(e) => {
tracing::warn!(%e, ?raw, "decode failed");
Err(e)
}
}
} else {
let mut limited = buf.take(size);
match Self::decode_msg(&mut limited, version) {
Ok(result) => {
if limited.remaining() > 0 {
return Err(DecodeError::Long);
}
Ok(result)
}
Err(e) => {
tracing::warn!(%e, "decode failed");
Err(e)
}
}
}
Ok(result)
}
}
2 changes: 0 additions & 2 deletions rs/moq-lite/src/ietf/publisher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -259,8 +259,6 @@ impl<S: web_transport_trait::Session> Publisher<S> {

stream.encode(&msg).await?;

tracing::trace!(?msg, "sending group header");

loop {
let frame = tokio::select! {
biased;
Expand Down
7 changes: 0 additions & 7 deletions rs/moq-lite/src/ietf/subscriber.rs
Original file line number Diff line number Diff line change
Expand Up @@ -662,7 +662,6 @@ impl<S: web_transport_trait::Session> Subscriber<S> {

pub async fn recv_group(&mut self, stream: &mut Reader<S::RecvStream, Version>) -> Result<(), Error> {
let group: ietf::GroupHeader = stream.decode().await?;
tracing::trace!(?group, "received group header");

if group.sub_group_id != 0 {
tracing::warn!(sub_group_id = %group.sub_group_id, "subgroup ID is not supported, dropping stream");
Expand Down Expand Up @@ -693,15 +692,13 @@ impl<S: web_transport_trait::Session> Subscriber<S> {

match res {
Err(Error::Cancel) => {
tracing::trace!(group = %producer.info.sequence, "group cancelled");
let _ = producer.abort(Error::Cancel);
}
Err(err) => {
tracing::debug!(%err, group = %producer.info.sequence, "group error");
let _ = producer.abort(err);
}
_ => {
tracing::trace!(group = %producer.info.sequence, "group complete");
let _ = producer.finish();
}
}
Expand Down Expand Up @@ -759,16 +756,12 @@ impl<S: web_transport_trait::Session> Subscriber<S> {
) -> Result<(), Error> {
let mut remain = frame.info.size;

tracing::trace!(size = %frame.info.size, "reading frame");

while remain > 0 {
let chunk = stream.read(remain as usize).await?.ok_or(Error::WrongSize)?;
remain = remain.checked_sub(chunk.len() as u64).ok_or(Error::WrongSize)?;
frame.write(chunk)?;
}

tracing::trace!(size = %frame.info.size, "read frame");

Ok(())
}
}
40 changes: 35 additions & 5 deletions rs/moq-lite/src/lite/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ pub trait Message: Sized + std::fmt::Debug {

impl<T: Message> Encode<Version> for T {
fn encode<W: BufMut>(&self, w: &mut W, version: Version) -> Result<(), EncodeError> {
tracing::trace!(?self, "encoding");
let mut sizer = Sizer::default();
self.encode_msg(&mut sizer, version)?;
sizer.size.encode(w, version)?;
Expand All @@ -27,11 +28,40 @@ impl<T: Message> Encode<Version> for T {
impl<T: Message> Decode<Version> for T {
fn decode<B: Buf>(buf: &mut B, version: Version) -> Result<Self, DecodeError> {
let size = usize::decode(buf, version)?;
let mut limited = buf.take(size);
let result = Self::decode_msg(&mut limited, version)?;
if limited.remaining() > 0 {
return Err(DecodeError::Long);

if tracing::enabled!(tracing::Level::TRACE) {
if buf.remaining() < size {
return Err(DecodeError::Short);
}
let raw = buf.copy_to_bytes(size);
let mut slice = &raw[..];
match Self::decode_msg(&mut slice, version) {
Ok(result) => {
if slice.remaining() > 0 {
return Err(DecodeError::Long);
}
tracing::trace!(?result, "decoded");
Ok(result)
}
Err(e) => {
tracing::warn!(%e, ?raw, "decode failed");
Err(e)
}
}
} else {
let mut limited = buf.take(size);
match Self::decode_msg(&mut limited, version) {
Ok(result) => {
if limited.remaining() > 0 {
return Err(DecodeError::Long);
}
Ok(result)
}
Err(e) => {
tracing::warn!(%e, "decode failed");
Err(e)
}
}
}
Ok(result)
}
}
9 changes: 0 additions & 9 deletions rs/moq-lite/src/lite/publisher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,9 +120,6 @@ impl<S: web_transport_trait::Session> Publisher<S> {
let interest = stream.reader.decode::<lite::AnnouncePlease>().await?;
let prefix = interest.prefix.to_owned();

// For logging, show the full path that we're announcing.
tracing::trace!(root = %self.origin.absolute(&prefix), "announcing start");

let mut origin = self
.origin
.consume_only(&[prefix.as_path()])
Expand All @@ -144,8 +141,6 @@ impl<S: web_transport_trait::Session> Publisher<S> {
}

stream.writer.abort(&err);
} else {
tracing::trace!(prefix = %origin.absolute(prefix), "announcing complete");
}
});

Expand Down Expand Up @@ -359,8 +354,6 @@ impl<S: web_transport_trait::Session> Publisher<S> {
None => break,
};

tracing::trace!(size = %frame.info.size, "writing frame");

stream.encode(&frame.info.size).await?;

loop {
Expand All @@ -380,8 +373,6 @@ impl<S: web_transport_trait::Session> Publisher<S> {
None => break,
}
}

tracing::trace!(size = %frame.info.size, "wrote frame");
}

stream.finish()?;
Expand Down
8 changes: 0 additions & 8 deletions rs/moq-lite/src/lite/subscriber.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,6 @@ impl<S: web_transport_trait::Session> Subscriber<S> {
let mut stream = Stream::open(&self.session, self.version).await?;
stream.writer.encode(&lite::ControlType::Announce).await?;

tracing::trace!(root = %self.log_path(""), "announced start");

// Ask for everything.
// TODO This should actually ask for each root.
let msg = lite::AnnouncePlease { prefix: "".into() };
Expand Down Expand Up @@ -268,15 +266,13 @@ impl<S: web_transport_trait::Session> Subscriber<S> {

match res {
Err(Error::Cancel) => {
tracing::trace!(group = %group.info.sequence, "group cancelled");
let _ = group.abort(Error::Cancel);
}
Err(err) => {
tracing::debug!(%err, group = %group.info.sequence, "group error");
let _ = group.abort(err);
}
_ => {
tracing::trace!(group = %group.info.sequence, "group complete");
let _ = group.finish();
}
}
Expand Down Expand Up @@ -310,8 +306,6 @@ impl<S: web_transport_trait::Session> Subscriber<S> {
) -> Result<(), Error> {
let mut remain = frame.info.size;

tracing::trace!(size = %frame.info.size, "reading frame");

const MAX_CHUNK: usize = 1024 * 1024; // 1 MiB
while remain > 0 {
let chunk = stream
Expand All @@ -322,8 +316,6 @@ impl<S: web_transport_trait::Session> Subscriber<S> {
frame.write(chunk)?;
}

tracing::trace!(size = %frame.info.size, "read frame");

Ok(())
}

Expand Down
2 changes: 0 additions & 2 deletions rs/moq-lite/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,6 @@ impl Server {
let mut stream = Stream::accept(&session, encoding).await?;

let mut client: setup::Client = stream.reader.decode().await?;
tracing::trace!(?client, "received client setup");

// Choose the version to use
let version = client
Expand All @@ -132,7 +131,6 @@ impl Server {
version: version.into(),
parameters,
};
tracing::trace!(?server, "sending server setup");
stream.writer.encode(&server).await?;

match version {
Expand Down
Loading