Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions include/livekit/room.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,15 @@
#pragma once

#include <cstdint>
#include <future>
#include <memory>
#include <mutex>

#include "livekit/data_stream.h"
#include "livekit/e2ee.h"
#include "livekit/ffi_handle.h"
#include "livekit/room_event_types.h"
#include "livekit/stats.h"
#include "livekit/subscription_thread_dispatcher.h"
#include "livekit/visibility.h"

Expand Down Expand Up @@ -181,6 +183,19 @@ class LIVEKIT_API Room {
/// Returns the current connection state of the room.
ConnectionState connectionState() const;

/// Retrieve aggregated WebRTC stats for this room session.
///
/// Dispatches an async request to the server and returns a future that
/// resolves with the stats.
///
/// @return Future of the room session stats.
///
/// @throws std::runtime_error Synchronously, if the room is not currently
/// connected, or if the FFI request fails to
/// dispatch.
/// @throws std::runtime_error On `future.get()`, if the async request fails.
std::future<SessionStats> getStats() const;

/// Register a handler for incoming text streams on a specific topic.
///
/// When a remote participant opens a text stream with the given topic,
Expand Down
8 changes: 8 additions & 0 deletions include/livekit/stats.h
Original file line number Diff line number Diff line change
Expand Up @@ -544,6 +544,14 @@ struct RtcStats {
RtcStatsVariant stats;
};

/// Aggregated WebRTC stats for a connected room session.
struct SessionStats {
/// Stats from the publisher peer connection (outbound media).
std::vector<RtcStats> publisher_stats;
/// Stats from the subscriber peer connection (inbound media).
std::vector<RtcStats> subscriber_stats;
};

// ----------------------
// fromProto declarations
// ----------------------
Expand Down
53 changes: 53 additions & 0 deletions src/ffi_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -428,6 +428,59 @@ std::future<std::vector<RtcStats>> FfiClient::getTrackStatsAsync(uintptr_t track
return fut;
}

std::future<SessionStats> FfiClient::getSessionStatsAsync(uintptr_t room_handle) {
const AsyncId async_id = generateAsyncId();

auto fut = registerAsync<SessionStats>(
async_id,
// match
[async_id](const proto::FfiEvent& event) {
return event.has_get_session_stats() && event.get_session_stats().async_id() == async_id;
},
// handler
[](const proto::FfiEvent& event, std::promise<SessionStats>& pr) {
const auto& cb = event.get_session_stats();
if (cb.has_error()) {
pr.set_exception(std::make_exception_ptr(std::runtime_error(cb.error())));
return;
}
if (!cb.has_result()) {
pr.set_exception(
std::make_exception_ptr(std::runtime_error("GetSessionStatsCallback missing result and error")));
return;
}

const auto& result = cb.result();
SessionStats stats;
stats.publisher_stats.reserve(result.publisher_stats_size());
for (const auto& ps : result.publisher_stats()) {
stats.publisher_stats.push_back(fromProto(ps));
}
stats.subscriber_stats.reserve(result.subscriber_stats_size());
for (const auto& ps : result.subscriber_stats()) {
stats.subscriber_stats.push_back(fromProto(ps));
}
pr.set_value(std::move(stats));
});

proto::FfiRequest req;
auto* get_session_stats_req = req.mutable_get_session_stats();
get_session_stats_req->set_room_handle(room_handle);
get_session_stats_req->set_request_async_id(async_id);

try {
const proto::FfiResponse resp = sendRequest(req);
if (!resp.has_get_session_stats()) {
logAndThrow("FfiResponse missing get_session_stats");
}
} catch (...) {
cancelPendingByAsyncId(async_id);
throw;
}

return fut;
}

// Participant APIs Implementation
std::future<proto::OwnedTrackPublication> FfiClient::publishTrackAsync(std::uint64_t local_participant_handle,
std::uint64_t track_handle,
Expand Down
2 changes: 2 additions & 0 deletions src/ffi_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,8 @@ class LIVEKIT_INTERNAL_API FfiClient {
// Track APIs
std::future<std::vector<RtcStats>> getTrackStatsAsync(uintptr_t track_handle);

std::future<SessionStats> getSessionStatsAsync(uintptr_t room_handle);

// Participant APIs
std::future<proto::OwnedTrackPublication> publishTrackAsync(std::uint64_t local_participant_handle,
std::uint64_t track_handle,
Expand Down
12 changes: 12 additions & 0 deletions src/room.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,18 @@ ConnectionState Room::connectionState() const {
return connection_state_;
}

std::future<SessionStats> Room::getStats() const {
std::shared_ptr<FfiHandle> handle;
{
const std::scoped_lock<std::mutex> g(lock_);
handle = room_handle_;
}
if (!handle) {
throw std::runtime_error("Room::getStats called on a disconnected room");
}
return FfiClient::instance().getSessionStatsAsync(handle->get());
}

E2EEManager* Room::e2eeManager() const {
const std::scoped_lock<std::mutex> g(lock_);
return e2ee_manager_.get();
Expand Down
202 changes: 202 additions & 0 deletions src/tests/integration/test_session_stats.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,202 @@
/*
* Copyright 2026 LiveKit
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include <atomic>
#include <chrono>
#include <condition_variable>
#include <iomanip>
#include <iostream>
#include <map>
#include <memory>
#include <mutex>
#include <string>
#include <thread>
#include <vector>

#include "../common/audio_utils.h"
#include "../common/test_common.h"

namespace livekit::test {

using namespace std::chrono_literals;

namespace {

constexpr int kAudioSampleRate = kDefaultAudioSampleRate;
constexpr int kAudioChannels = kDefaultAudioChannels;

/// Time to let media flow before sampling stats; below this the RTP counters
/// are typically empty and the printed output is uninteresting.
constexpr auto kStatsWarmup = 5s;

const char* rtcStatsTypeName(const RtcStats& s) {
return std::visit(
[](const auto& v) -> const char* {
using T = std::decay_t<decltype(v)>;
if constexpr (std::is_same_v<T, RtcCodecStats>) {
return "Codec";
} else if constexpr (std::is_same_v<T, RtcInboundRtpStats>) {
return "InboundRtp";
} else if constexpr (std::is_same_v<T, RtcOutboundRtpStats>) {
return "OutboundRtp";
} else if constexpr (std::is_same_v<T, RtcRemoteInboundRtpStats>) {
return "RemoteInboundRtp";
} else if constexpr (std::is_same_v<T, RtcRemoteOutboundRtpStats>) {
return "RemoteOutboundRtp";
} else if constexpr (std::is_same_v<T, RtcMediaSourceStats>) {
return "MediaSource";
} else if constexpr (std::is_same_v<T, RtcMediaPlayoutStats>) {
return "MediaPlayout";
} else if constexpr (std::is_same_v<T, RtcPeerConnectionStats>) {
return "PeerConnection";
} else if constexpr (std::is_same_v<T, RtcDataChannelStats>) {
return "DataChannel";
} else if constexpr (std::is_same_v<T, RtcTransportStats>) {
return "Transport";
} else if constexpr (std::is_same_v<T, RtcCandidatePairStats>) {
return "CandidatePair";
} else if constexpr (std::is_same_v<T, RtcLocalCandidateStats>) {
return "LocalCandidate";
} else if constexpr (std::is_same_v<T, RtcRemoteCandidateStats>) {
return "RemoteCandidate";
} else if constexpr (std::is_same_v<T, RtcCertificateStats>) {
return "Certificate";
} else if constexpr (std::is_same_v<T, RtcStreamStats>) {
return "Stream";
} else {
return "Unknown";
}
},
s.stats);
}

void dumpInterestingEntries(const std::vector<RtcStats>& stats) {
for (const auto& stat : stats) {
std::visit(
[&](const auto& s) {
using T = std::decay_t<decltype(s)>;
if constexpr (std::is_same_v<T, RtcOutboundRtpStats>) {
std::cout << " [OutboundRtp] id=" << s.rtc.id << " kind=" << s.stream.kind
<< " packets_sent=" << s.sent.packets_sent << " bytes_sent=" << s.sent.bytes_sent
<< " target_bitrate=" << std::fixed << std::setprecision(2) << s.outbound.target_bitrate
<< std::endl;
} else if constexpr (std::is_same_v<T, RtcInboundRtpStats>) {
std::cout << " [InboundRtp] id=" << s.rtc.id << " kind=" << s.stream.kind
<< " packets_received=" << s.received.packets_received
<< " packets_lost=" << s.received.packets_lost << " jitter=" << std::fixed << std::setprecision(6)
<< s.received.jitter << " bytes_received=" << s.inbound.bytes_received << std::endl;
} else if constexpr (std::is_same_v<T, RtcCandidatePairStats>) {
std::cout << " [CandidatePair] id=" << s.rtc.id << " rtt=" << std::fixed << std::setprecision(4)
<< s.candidate_pair.current_round_trip_time << "s"
<< " in_bitrate=" << s.candidate_pair.available_incoming_bitrate
<< " out_bitrate=" << s.candidate_pair.available_outgoing_bitrate
<< " bytes_sent=" << s.candidate_pair.bytes_sent
<< " bytes_received=" << s.candidate_pair.bytes_received << std::endl;
} else if constexpr (std::is_same_v<T, RtcTransportStats>) {
std::cout << " [Transport] id=" << s.rtc.id << " packets_sent=" << s.transport.packets_sent
<< " packets_received=" << s.transport.packets_received
<< " bytes_sent=" << s.transport.bytes_sent << " bytes_received=" << s.transport.bytes_received
<< std::endl;
} else if constexpr (std::is_same_v<T, RtcPeerConnectionStats>) {
std::cout << " [PeerConnection] id=" << s.rtc.id
<< " data_channels_opened=" << s.pc.data_channels_opened
<< " data_channels_closed=" << s.pc.data_channels_closed << std::endl;
}
},
stat.stats);
}
}

void printSide(const std::string& side_label, const std::vector<RtcStats>& stats) {
std::cout << " " << side_label << " entries=" << stats.size();
std::map<std::string, int> type_counts;
for (const auto& s : stats) {
type_counts[rtcStatsTypeName(s)]++;
}
if (!type_counts.empty()) {
std::cout << " types:";
for (const auto& kv : type_counts) {
std::cout << " " << kv.first << "=" << kv.second;
}
}
std::cout << std::endl;
dumpInterestingEntries(stats);
}

void printSessionStats(const std::string& room_label, const SessionStats& stats) {
std::cout << "[SessionStats] " << room_label << ":" << std::endl;
printSide("publisher", stats.publisher_stats);
printSide("subscriber", stats.subscriber_stats);
}

} // namespace

class SessionStatsIntegrationTest : public LiveKitTestBase {};

TEST_F(SessionStatsIntegrationTest, PublishAudioThenFetchSessionStats) {
skipIfNotConfigured();

RoomOptions options;
options.auto_subscribe = true;
options.single_peer_connection = false;

auto receiver_room = std::make_unique<Room>();
ASSERT_TRUE(receiver_room->connect(config_.url, config_.token_b, options)) << "Receiver failed to connect";

auto sender_room = std::make_unique<Room>();
ASSERT_TRUE(sender_room->connect(config_.url, config_.token_a, options)) << "Sender failed to connect";

auto source = std::make_shared<AudioSource>(kAudioSampleRate, kAudioChannels, 0);
auto track = LocalAudioTrack::createLocalAudioTrack("session-stats-audio", source);
TrackPublishOptions opts;
opts.source = TrackSource::SOURCE_MICROPHONE;
sender_room->localParticipant()->publishTrack(track, opts);
std::cerr << "[SessionStats] published audio track sid=" << track->sid() << std::endl;

std::atomic<bool> running{true};
std::thread audio_thread([&]() { runToneLoop(source, running, /*base_freq_hz=*/440.0, /*siren_mode=*/false); });

std::this_thread::sleep_for(kStatsWarmup);

auto sender_fut = sender_room->getStats();
auto receiver_fut = receiver_room->getStats();

SessionStats sender_stats;
SessionStats receiver_stats;
ASSERT_NO_THROW(sender_stats = sender_fut.get()) << "Sender getStats threw";
ASSERT_NO_THROW(receiver_stats = receiver_fut.get()) << "Receiver getStats threw";

running.store(false, std::memory_order_relaxed);
if (audio_thread.joinable()) {
audio_thread.join();
}
if (track->publication()) {
sender_room->localParticipant()->unpublishTrack(track->publication()->sid());
}

printSessionStats("sender", sender_stats);
printSessionStats("receiver", receiver_stats);

EXPECT_FALSE(sender_stats.publisher_stats.empty()) << "Sender should have publisher stats";
EXPECT_FALSE(receiver_stats.subscriber_stats.empty()) << "Receiver should have subscriber stats";
}

TEST_F(SessionStatsIntegrationTest, NotConnectedThrows) {
Room room;
EXPECT_THROW(room.getStats(), std::runtime_error);
}

} // namespace livekit::test
6 changes: 6 additions & 0 deletions src/tests/unit/test_ffi_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,12 @@ TEST_F(FfiClientTest, NotInitialized_GetTrackStatsAsyncThrows) {
EXPECT_THROW(FfiClient::instance().getTrackStatsAsync(1), std::runtime_error);
}

TEST_F(FfiClientTest, NotInitialized_GetSessionStatsAsyncThrows) {
ASSERT_FALSE(FfiClient::instance().isInitialized());

EXPECT_THROW(FfiClient::instance().getSessionStatsAsync(1), std::runtime_error);
}

TEST_F(FfiClientTest, NotInitialized_PublishDataTrackAsyncFails) {
ASSERT_FALSE(FfiClient::instance().isInitialized());

Expand Down
Loading