Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions include/livekit/room.h
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,26 @@ class LIVEKIT_API Room {
bool Connect(const std::string& url, const std::string& token, const RoomOptions& options);
// NOLINTEND(readability-identifier-naming)

/// Disconnect from the room.
///
/// Sends a `DisconnectRequest` to the server with the given reason, blocks
/// until the FFI acknowledges, and tears down all room state (participants,
/// listener, E2EE manager, subscription threads). The `onDisconnected`
/// delegate is invoked once with the supplied reason.
///
/// @warning Safe to call from any thread, but **must not** be called from inside a
/// `RoomDelegate` callback — doing so will deadlock the event listener.
///
/// @note `~Room()` invokes `disconnect()` automatically if the room is
/// still connected, so explicit calls are optional. Calling this
/// explicitly lets you handle the disconnect outcome and choose a
/// reason other than `ClientInitiated`.
///
/// @param reason Reason reported to the server (default: ClientInitiated).
/// @returns true if a disconnect roundtrip was performed; false if the
/// room was already disconnected.
bool disconnect(DisconnectReason reason = DisconnectReason::ClientInitiated);

// Accessors

/// Retrieve static metadata about the room.
Expand Down
31 changes: 31 additions & 0 deletions src/ffi_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -380,6 +380,37 @@ std::future<proto::ConnectCallback> FfiClient::connectAsync(const std::string& u
return fut;
}

std::future<void> FfiClient::disconnectAsync(uintptr_t room_handle, DisconnectReason reason) {
const AsyncId async_id = generateAsyncId();

auto fut = registerAsync<void>(
async_id,
// match: this DisconnectCallback's async_id
[async_id](const proto::FfiEvent& event) {
return event.has_disconnect() && event.disconnect().async_id() == async_id;
},
// handler: nothing to extract; the callback is signal-only
[](const proto::FfiEvent& /*event*/, std::promise<void>& pr) { pr.set_value(); });

proto::FfiRequest req;
auto* disconnect = req.mutable_disconnect();
disconnect->set_room_handle(room_handle);
disconnect->set_request_async_id(async_id);
disconnect->set_reason(toProto(reason));

try {
const proto::FfiResponse resp = sendRequest(req);
if (!resp.has_disconnect()) {
logAndThrow("FfiResponse missing disconnect");
}
} catch (...) {
cancelPendingByAsyncId(async_id);
throw;
}

return fut;
}

// Track APIs Implementation
std::future<std::vector<RtcStats>> FfiClient::getTrackStatsAsync(uintptr_t track_handle) {
// Generate client-side async_id first
Expand Down
3 changes: 3 additions & 0 deletions src/ffi_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
#include "data_track.pb.h"
#include "livekit/data_track_error.h"
#include "livekit/result.h"
#include "livekit/room_event_types.h"
#include "livekit/stats.h"
#include "livekit/visibility.h"
#include "lk_log.h"
Expand Down Expand Up @@ -94,6 +95,8 @@ class LIVEKIT_INTERNAL_API FfiClient {
std::future<proto::ConnectCallback> connectAsync(const std::string& url, const std::string& token,
const RoomOptions& options);

std::future<void> disconnectAsync(uintptr_t room_handle, DisconnectReason reason);

// Track APIs
std::future<std::vector<RtcStats>> getTrackStatsAsync(uintptr_t track_handle);

Expand Down
151 changes: 126 additions & 25 deletions src/room.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -76,32 +76,17 @@ void readyForRoomEvent(std::uint64_t room_handle) {
Room::Room() : subscription_thread_dispatcher_(std::make_unique<SubscriptionThreadDispatcher>()) {}

Room::~Room() {
if (subscription_thread_dispatcher_) {
subscription_thread_dispatcher_->stopAll();
}

int listener_to_remove = 0;
std::unique_ptr<LocalParticipant> local_participant_to_cleanup;
{
const std::scoped_lock<std::mutex> g(lock_);
listener_to_remove = listener_id_;
listener_id_ = 0;
// Move local participant out for cleanup outside the lock
local_participant_to_cleanup = std::move(local_participant_);
}

// Shutdown local participant (unregisters RPC handlers, etc.) before
// removing the listener. This prevents in-flight RPC responses from
// trying to use destroyed handles.
if (local_participant_to_cleanup) {
local_participant_to_cleanup->shutdown();
}

if (listener_to_remove != 0) {
FfiClient::instance().removeListener(listener_to_remove);
// disconnect() is used for all tear down cases: it handles the
// already-disconnected case (returns false, no-op), the partial/Reconnecting
// case, and the FFI-failure case (local teardown still runs). Nothing else
// needs to live in the destructor.
try {
(void)disconnect(); // Don't need return value
} catch (const std::exception& e) {
LK_LOG_ERROR("Room::~Room: graceful disconnect failed: {}", e.what());
} catch (...) {
LK_LOG_ERROR("Room::~Room: graceful disconnect failed: unknown exception");
}

// local_participant_to_cleanup is destroyed here after listener is removed
}

void Room::setDelegate(RoomDelegate* delegate) {
Expand Down Expand Up @@ -234,6 +219,103 @@ bool Room::Connect(const std::string& url, const std::string& token, const RoomO
return connect(url, token, options);
}

bool Room::disconnect(DisconnectReason reason) {
TRACE_EVENT0("livekit", "Room::disconnect");

// Canonical teardown path. Move all owned state out under the lock, then
// operate on it outside the lock. The destructor (and any caller) gets
// the same behavior: once this returns, the Room is fully torn down.
//
// Return value:
// true - we owned live state and tore it down (FFI disconnect succeeded)
// false - either already disconnected (no-op) or FFI disconnect failed.
// In both false cases local-side teardown still completed.

std::shared_ptr<FfiHandle> handle;
RoomDelegate* delegate_snapshot = nullptr;
std::unique_ptr<LocalParticipant> local_participant_to_cleanup;
std::unordered_map<std::string, std::shared_ptr<RemoteParticipant>> remote_participants_to_clear;
std::unique_ptr<E2EEManager> e2ee_manager_to_clear;
std::unordered_map<std::string, std::shared_ptr<TextStreamReader>> text_stream_readers_to_clear;
std::unordered_map<std::string, std::shared_ptr<ByteStreamReader>> byte_stream_readers_to_clear;
int listener_to_remove = 0;

{
const std::scoped_lock<std::mutex> g(lock_);
if (connection_state_ == ConnectionState::Disconnected) {
// Already torn down (or never connected). Nothing to do.
return false;
}
handle = room_handle_;
delegate_snapshot = delegate_;
// Take ownership of everything under the lock so the kEos handler (which
// also tries to move it out) loses any race here — only one teardown
// path operates on this state.
local_participant_to_cleanup = std::move(local_participant_);
remote_participants_to_clear = std::move(remote_participants_);
e2ee_manager_to_clear = std::move(e2ee_manager_);
text_stream_readers_to_clear = std::move(text_stream_readers_);
byte_stream_readers_to_clear = std::move(byte_stream_readers_);
listener_to_remove = listener_id_;
listener_id_ = 0;
room_handle_.reset();
// Flip state immediately so the in-flight Disconnected room-event we'll
// get back doesn't double-fire onDisconnected. Mirrors Python's
// Room.disconnect(), which also flips state before sending the request.
connection_state_ = ConnectionState::Disconnected;
}

// Drain in-flight RPC handlers BEFORE telling Rust to tear down the room.
// Mirrors client-sdk-python's Room.disconnect() ordering: once the FFI
// dispatches the Disconnect, Rust starts invalidating participant handles
// in its table, and any listener-thread RPC handler still mid-flight
// would race with that invalidation and send to a dead handle →
// INVALID_HANDLE → terminate.
if (local_participant_to_cleanup) {
local_participant_to_cleanup->shutdown();
}

// Tell the FFI to close the room and wait for the callback. If this fails
// we still complete local-side teardown below — releasing the listener,
// dropping handles, and notifying the delegate — so the Room is fully
// cleaned up regardless of whether the FFI round-trip succeeded.
bool ffi_ok = true;
if (handle) {
try {
FfiClient::instance().disconnectAsync(handle->get(), reason).get();
} catch (const std::exception& e) {
LK_LOG_ERROR("Room::disconnect: FFI disconnect failed (continuing local teardown): {}", e.what());
ffi_ok = false;
}
}

// Stop dispatcher so no track callbacks fire mid-teardown.
if (subscription_thread_dispatcher_) {
subscription_thread_dispatcher_->stopAll();
}

if (listener_to_remove != 0) {
FfiClient::instance().removeListener(listener_to_remove);
}

// Fire onDisconnected exactly once, with the reason the caller passed.
if (delegate_snapshot) {
DisconnectedEvent ev;
ev.reason = reason;
try {
delegate_snapshot->onDisconnected(*this, ev);
} catch (const std::exception& e) {
LK_LOG_ERROR("Room::disconnect: onDisconnected threw: {}", e.what());
} catch (...) {
LK_LOG_ERROR("Room::disconnect: onDisconnected threw: unknown exception");
}
}

// Moved-out state (local participant, remote participants, e2ee manager,
// stream readers) destructs here, releasing FFI handles.
return ffi_ok;
}

RoomInfoData Room::roomInfo() const {
const std::scoped_lock<std::mutex> g(lock_);
return room_info_;
Expand Down Expand Up @@ -1139,6 +1221,17 @@ void Room::onEvent(const FfiEvent& event) {
break;
}
case proto::RoomEvent::kDisconnected: {
// If disconnect() was driven from our side, it already flipped state
// to Disconnected and fired the delegate; skip the duplicate here.
bool already_disconnected = false;
{
const std::scoped_lock<std::mutex> guard(lock_);
already_disconnected = (connection_state_ == ConnectionState::Disconnected);
connection_state_ = ConnectionState::Disconnected;
}
if (already_disconnected) {
break;
}
DisconnectedEvent ev;
ev.reason = toDisconnectReason(re.disconnected().reason());
if (delegate_snapshot) {
Expand Down Expand Up @@ -1193,6 +1286,14 @@ void Room::onEvent(const FfiEvent& event) {
old_byte_readers = std::move(byte_stream_readers_);
}

// Drain in-flight RPC invocations before destroying the local
// participant's FFI handle. Mirrors the ordering in disconnect();
// without this, a listener-thread RPC handler can race with handle
// disposal and send to a dead handle → INVALID_HANDLE → terminate.
if (old_local_participant) {
old_local_participant->shutdown();
}

// Remove listener outside lock
if (listener_to_remove != 0) {
FfiClient::instance().removeListener(listener_to_remove);
Expand Down
77 changes: 74 additions & 3 deletions src/room_proto_converter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -99,9 +99,80 @@ DataPacketKind toDataPacketKind(proto::DataPacketKind in) {
}
}

DisconnectReason toDisconnectReason(proto::DisconnectReason /*in*/) {
// TODO: map each proto::DisconnectReason to your DisconnectReason enum
return DisconnectReason::Unknown;
DisconnectReason toDisconnectReason(proto::DisconnectReason in) {
switch (in) {
case proto::CLIENT_INITIATED:
return DisconnectReason::ClientInitiated;
case proto::DUPLICATE_IDENTITY:
return DisconnectReason::DuplicateIdentity;
case proto::SERVER_SHUTDOWN:
return DisconnectReason::ServerShutdown;
case proto::PARTICIPANT_REMOVED:
return DisconnectReason::ParticipantRemoved;
case proto::ROOM_DELETED:
return DisconnectReason::RoomDeleted;
case proto::STATE_MISMATCH:
return DisconnectReason::StateMismatch;
case proto::JOIN_FAILURE:
return DisconnectReason::JoinFailure;
case proto::MIGRATION:
return DisconnectReason::Migration;
case proto::SIGNAL_CLOSE:
return DisconnectReason::SignalClose;
case proto::ROOM_CLOSED:
return DisconnectReason::RoomClosed;
case proto::USER_UNAVAILABLE:
return DisconnectReason::UserUnavailable;
case proto::USER_REJECTED:
return DisconnectReason::UserRejected;
case proto::SIP_TRUNK_FAILURE:
return DisconnectReason::SipTrunkFailure;
case proto::CONNECTION_TIMEOUT:
return DisconnectReason::ConnectionTimeout;
case proto::MEDIA_FAILURE:
return DisconnectReason::MediaFailure;
case proto::UNKNOWN_REASON:
default:
return DisconnectReason::Unknown;
}
}

proto::DisconnectReason toProto(DisconnectReason in) {
switch (in) {
case DisconnectReason::ClientInitiated:
return proto::CLIENT_INITIATED;
case DisconnectReason::DuplicateIdentity:
return proto::DUPLICATE_IDENTITY;
case DisconnectReason::ServerShutdown:
return proto::SERVER_SHUTDOWN;
case DisconnectReason::ParticipantRemoved:
return proto::PARTICIPANT_REMOVED;
case DisconnectReason::RoomDeleted:
return proto::ROOM_DELETED;
case DisconnectReason::StateMismatch:
return proto::STATE_MISMATCH;
case DisconnectReason::JoinFailure:
return proto::JOIN_FAILURE;
case DisconnectReason::Migration:
return proto::MIGRATION;
case DisconnectReason::SignalClose:
return proto::SIGNAL_CLOSE;
case DisconnectReason::RoomClosed:
return proto::ROOM_CLOSED;
case DisconnectReason::UserUnavailable:
return proto::USER_UNAVAILABLE;
case DisconnectReason::UserRejected:
return proto::USER_REJECTED;
case DisconnectReason::SipTrunkFailure:
return proto::SIP_TRUNK_FAILURE;
case DisconnectReason::ConnectionTimeout:
return proto::CONNECTION_TIMEOUT;
case DisconnectReason::MediaFailure:
return proto::MEDIA_FAILURE;
case DisconnectReason::Unknown:
default:
return proto::UNKNOWN_REASON;
}
}

// --------- basic helper conversions ---------
Expand Down
1 change: 1 addition & 0 deletions src/room_proto_converter.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ LIVEKIT_INTERNAL_API ConnectionQuality toConnectionQuality(proto::ConnectionQual
LIVEKIT_INTERNAL_API ConnectionState toConnectionState(proto::ConnectionState in);
LIVEKIT_INTERNAL_API DataPacketKind toDataPacketKind(proto::DataPacketKind in);
LIVEKIT_INTERNAL_API DisconnectReason toDisconnectReason(proto::DisconnectReason in);
LIVEKIT_INTERNAL_API proto::DisconnectReason toProto(DisconnectReason in);

LIVEKIT_INTERNAL_API UserPacketData fromProto(const proto::UserPacket& in);
LIVEKIT_INTERNAL_API SipDtmfData fromProto(const proto::SipDTMF& in);
Expand Down
Loading
Loading