Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
176 changes: 122 additions & 54 deletions src/rtsp/rtsp.zig
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,86 @@ pub const Header = struct {
name: []const u8,
value: []const u8,

pub const Transport = struct {
proto: enum { tcp, udp } = .udp,
/// False means multicast
unicast: bool = true,
interleaved: ?struct { u8, u8 } = null,
client_port: ?struct { u16, u16 } = null,
server_port: ?struct { u16, u16 } = null,
mode: Method = .PLAY,

pub const Error = error{InvalidTransportHeader};

pub fn parse(header_value: []const u8) Transport.Error!Transport {
var it = std.mem.splitScalar(u8, header_value, ';');
var transport: Transport = .{};

const protocol = it.next().?;
if (std.mem.eql(u8, protocol, "RTP/AVP")) {
transport.proto = .udp;
} else if (std.mem.eql(u8, protocol, "RTP/AVP/UDP")) {
transport.proto = .udp;
} else if (std.mem.eql(u8, protocol, "RTP/AVP/TCP")) {
transport.proto = .tcp;
} else {
return error.InvalidTransportHeader;
}

while (it.next()) |parameter| {
if (std.mem.eql(u8, parameter, "unicast")) {
transport.unicast = true;
} else if (std.mem.eql(u8, parameter, "multicast")) {
transport.unicast = false;
} else if (std.mem.startsWith(u8, parameter, "interleaved=")) {
transport.interleaved = parseRange(u8, parameter[12..]) catch return error.InvalidTransportHeader;
} else if (std.mem.startsWith(u8, parameter, "client_port=")) {
transport.client_port = parseRange(u16, parameter[12..]) catch return error.InvalidTransportHeader;
} else if (std.mem.startsWith(u8, parameter, "server_port=")) {
transport.server_port = parseRange(u16, parameter[12..]) catch return error.InvalidTransportHeader;
} else if (std.mem.startsWith(u8, parameter, "mode=")) {
const method = std.mem.trim(u8, parameter[5..], "\"");
if (std.ascii.eqlIgnoreCase(method, "play"))
transport.mode = .PLAY
else if (std.ascii.eqlIgnoreCase(method, "record"))
transport.mode = .RECORD
else
return error.InvalidTransportHeader;
}
}

return transport;
}

pub fn write(self: *const Transport, writer: *std.Io.Writer) std.Io.Writer.Error!void {
try writer.writeAll(if (self.proto == .tcp) "RTP/AVP/TCP" else "RTP/AVP");
if (self.unicast) try writer.writeAll(";unicast") else try writer.writeAll(";multicast");

if (self.interleaved) |interleaved|
try writer.print(";interleaved={}-{}", .{ interleaved.@"0", interleaved.@"1" });

if (self.client_port) |client_port|
try writer.print(";client_port={}-{}", .{ client_port.@"0", client_port.@"1" });

if (self.server_port) |server_port|
try writer.print(";server_port={}-{}", .{ server_port.@"0", server_port.@"1" });

switch (self.mode) {
.RECORD => try writer.writeAll(";mode=record"),
else => {},
}
}

fn parseRange(T: type, value: []const u8) !struct { T, T } {
if (std.mem.cutScalar(u8, value, '-')) |range| {
const left, const right = range;
return .{ try std.fmt.parseInt(T, left, 10), try std.fmt.parseInt(T, right, 10) };
}

return error.ParseError;
}
};

pub fn parse(line: []const u8) Error!Header {
const colon_index = std.mem.indexOfScalar(u8, line, ':') orelse return error.ParseError;

Expand Down Expand Up @@ -125,67 +205,55 @@ pub const Header = struct {
try header.write(&writer);
try std.testing.expectEqualStrings("CSeq: 2\r\n", writer.buffer[0..writer.end]);
}
};

pub const TransportHeader = struct {
proto: enum { tcp, udp } = .udp,
/// False means multicast
unicast: bool = true,
interleaved: ?struct { u8, u8 } = null,

pub const TransportError = error{InvalidTransportHeader};

pub fn parse(header_value: []const u8) TransportError!TransportHeader {
var it = std.mem.splitScalar(u8, header_value, ';');
var transport: TransportHeader = .{};

const protocol = it.next().?;
if (std.mem.eql(u8, protocol, "RTP/AVP")) {
transport.proto = .udp;
} else if (std.mem.eql(u8, protocol, "RTP/AVP/UDP")) {
transport.proto = .udp;
} else if (std.mem.eql(u8, protocol, "RTP/AVP/TCP")) {
transport.proto = .tcp;
} else {
return error.InvalidTransportHeader;
test "Transport: parse" {
{
const transport = try Transport.parse("RTP/AVP/TCP;unicast;interleaved=0-1");
try std.testing.expect(transport.unicast);
try std.testing.expectEqual(.tcp, transport.proto);
try std.testing.expectEqual(.{ 0, 1 }, transport.interleaved);
}

while (it.next()) |parameter| {
if (std.mem.eql(u8, parameter, "unicast")) {
transport.unicast = true;
} else if (std.mem.eql(u8, parameter, "multicast")) {
transport.unicast = false;
} else if (std.mem.startsWith(u8, parameter, "interleaved=")) {
var interleaved_it = std.mem.splitScalar(u8, parameter[12..], '-');
transport.interleaved = .{
std.fmt.parseInt(u8, interleaved_it.next().?, 10) catch return error.InvalidTransportHeader,
std.fmt.parseInt(u8, interleaved_it.rest(), 10) catch return error.InvalidTransportHeader,
};
}
{
const transport = try Transport.parse("RTP/AVP/UDP;client_port=15000-15001;mode=\"recOrd\"");
try std.testing.expect(transport.unicast);
try std.testing.expectEqual(.udp, transport.proto);
try std.testing.expectEqual(null, transport.interleaved);
try std.testing.expectEqual(.{ 15000, 15001 }, transport.client_port);
try std.testing.expectEqual(.RECORD, transport.mode);
}

if (transport.proto == .tcp and transport.interleaved == null) return error.InvalidTransportHeader;

return transport;
}

pub fn write(self: *const TransportHeader, writer: *std.Io.Writer) std.Io.Writer.Error!void {
try writer.writeAll(if (self.proto == .tcp) "RTP/AVP/TCP" else "RTP/AVP");
if (self.unicast) {
try writer.writeAll(";unicast");
} else {
try writer.writeAll(";multicast");
}
if (self.interleaved) |interleaved| {
try writer.print(";interleaved={}-{}", .{ interleaved.@"0", interleaved.@"1" });
{
const transport = try Transport.parse("RTP/AVP;unicast;server_port=35000-35001");
try std.testing.expect(transport.unicast);
try std.testing.expectEqual(.udp, transport.proto);
try std.testing.expectEqual(null, transport.interleaved);
try std.testing.expectEqual(null, transport.client_port);
try std.testing.expectEqual(.{ 35000, 35001 }, transport.server_port);
try std.testing.expectEqual(.PLAY, transport.mode);
}
}

test "parse transport header" {
const transport = try parse("RTP/AVP/TCP;unicast;interleaved=0-1");
try std.testing.expect(transport.unicast);
try std.testing.expectEqual(.tcp, transport.proto);
try std.testing.expectEqual(.{ 0, 1 }, transport.interleaved);
test "Transport: write" {
var buffer: [1024]u8 = undefined;
var w = std.Io.Writer.fixed(&buffer);

var t = Header.Transport{
.proto = .udp,
.client_port = .{ 23900, 23901 },
};
try t.write(&w);
try std.testing.expectEqualStrings("RTP/AVP;unicast;client_port=23900-23901", w.buffered());

_ = w.consumeAll();
t = .{ .proto = .tcp, .interleaved = .{ 0, 1 }, .mode = .RECORD };
try t.write(&w);
try std.testing.expectEqualStrings("RTP/AVP/TCP;unicast;interleaved=0-1;mode=record", w.buffered());

_ = w.consumeAll();
t = .{ .proto = .udp, .server_port = .{ 10000, 10001 } };
try t.write(&w);
try std.testing.expectEqualStrings("RTP/AVP;unicast;server_port=10000-10001", w.buffered());
}
};

Expand Down Expand Up @@ -320,7 +388,7 @@ pub const Writer = struct {
try self.writer.print("{}\r\n", .{size});
}

pub fn writeTransportHeader(self: *Writer, header: TransportHeader) std.Io.Writer.Error!void {
pub fn writeTransportHeader(self: *Writer, header: Header.Transport) std.Io.Writer.Error!void {
try self.writer.writeAll("Transport: ");
try header.write(self.writer);
try self.writeLineFeed();
Expand Down
4 changes: 2 additions & 2 deletions src/rtsp/server.zig
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ pub const Request = struct {
cseq: u32,
session: ?[]const u8,
authenticate: ?[]const u8,
transport: ?rtsp.TransportHeader,
transport: ?rtsp.Header.Transport,
content_length: u32,

pub const Error = error{
Expand All @@ -31,7 +31,7 @@ pub const Request = struct {
MissingSequenceHeader,
/// A request body is not expected for this METHOD.
BodyUnexpected,
} || rtsp.TransportHeader.TransportError;
} || rtsp.Header.Transport.Error;

pub fn parse(buffer: []const u8) !Head {
var it = std.mem.splitSequence(u8, buffer, "\r\n");
Expand Down
Loading