diff --git a/include/livekit/room.h b/include/livekit/room.h index ac80fb4d..b02d8af7 100644 --- a/include/livekit/room.h +++ b/include/livekit/room.h @@ -141,6 +141,26 @@ class LIVEKIT_API Room { bool Connect(const std::string& url, const std::string& token, const RoomOptions& options); // NOLINTEND(readability-identifier-naming) + /// Disconnect from the room. + /// + /// Sends a `DisconnectRequest` to the server with the given reason, blocks + /// until the FFI acknowledges, and tears down all room state (participants, + /// listener, E2EE manager, subscription threads). The `onDisconnected` + /// delegate is invoked once with the supplied reason. + /// + /// @warning Safe to call from any thread, but **must not** be called from inside a + /// `RoomDelegate` callback — doing so will deadlock the event listener. + /// + /// @note `~Room()` invokes `disconnect()` automatically if the room is + /// still connected, so explicit calls are optional. Calling this + /// explicitly lets you handle the disconnect outcome and choose a + /// reason other than `ClientInitiated`. + /// + /// @param reason Reason reported to the server (default: ClientInitiated). + /// @returns true if a disconnect roundtrip was performed; false if the + /// room was already disconnected. + bool disconnect(DisconnectReason reason = DisconnectReason::ClientInitiated); + // Accessors /// Retrieve static metadata about the room. diff --git a/src/ffi_client.cpp b/src/ffi_client.cpp index de2ae065..1ae3ddf4 100644 --- a/src/ffi_client.cpp +++ b/src/ffi_client.cpp @@ -380,6 +380,37 @@ std::future FfiClient::connectAsync(const std::string& u return fut; } +std::future FfiClient::disconnectAsync(uintptr_t room_handle, DisconnectReason reason) { + const AsyncId async_id = generateAsyncId(); + + auto fut = registerAsync( + async_id, + // match: this DisconnectCallback's async_id + [async_id](const proto::FfiEvent& event) { + return event.has_disconnect() && event.disconnect().async_id() == async_id; + }, + // handler: nothing to extract; the callback is signal-only + [](const proto::FfiEvent& /*event*/, std::promise& pr) { pr.set_value(); }); + + proto::FfiRequest req; + auto* disconnect = req.mutable_disconnect(); + disconnect->set_room_handle(room_handle); + disconnect->set_request_async_id(async_id); + disconnect->set_reason(toProto(reason)); + + try { + const proto::FfiResponse resp = sendRequest(req); + if (!resp.has_disconnect()) { + logAndThrow("FfiResponse missing disconnect"); + } + } catch (...) { + cancelPendingByAsyncId(async_id); + throw; + } + + return fut; +} + // Track APIs Implementation std::future> FfiClient::getTrackStatsAsync(uintptr_t track_handle) { // Generate client-side async_id first diff --git a/src/ffi_client.h b/src/ffi_client.h index 1cfdd361..5136e49e 100644 --- a/src/ffi_client.h +++ b/src/ffi_client.h @@ -30,6 +30,7 @@ #include "data_track.pb.h" #include "livekit/data_track_error.h" #include "livekit/result.h" +#include "livekit/room_event_types.h" #include "livekit/stats.h" #include "livekit/visibility.h" #include "lk_log.h" @@ -94,6 +95,8 @@ class LIVEKIT_INTERNAL_API FfiClient { std::future connectAsync(const std::string& url, const std::string& token, const RoomOptions& options); + std::future disconnectAsync(uintptr_t room_handle, DisconnectReason reason); + // Track APIs std::future> getTrackStatsAsync(uintptr_t track_handle); diff --git a/src/room.cpp b/src/room.cpp index 99b050a6..2f854ee5 100644 --- a/src/room.cpp +++ b/src/room.cpp @@ -76,32 +76,17 @@ void readyForRoomEvent(std::uint64_t room_handle) { Room::Room() : subscription_thread_dispatcher_(std::make_unique()) {} Room::~Room() { - if (subscription_thread_dispatcher_) { - subscription_thread_dispatcher_->stopAll(); - } - - int listener_to_remove = 0; - std::unique_ptr local_participant_to_cleanup; - { - const std::scoped_lock g(lock_); - listener_to_remove = listener_id_; - listener_id_ = 0; - // Move local participant out for cleanup outside the lock - local_participant_to_cleanup = std::move(local_participant_); - } - - // Shutdown local participant (unregisters RPC handlers, etc.) before - // removing the listener. This prevents in-flight RPC responses from - // trying to use destroyed handles. - if (local_participant_to_cleanup) { - local_participant_to_cleanup->shutdown(); - } - - if (listener_to_remove != 0) { - FfiClient::instance().removeListener(listener_to_remove); + // disconnect() is used for all tear down cases: it handles the + // already-disconnected case (returns false, no-op), the partial/Reconnecting + // case, and the FFI-failure case (local teardown still runs). Nothing else + // needs to live in the destructor. + try { + (void)disconnect(); // Don't need return value + } catch (const std::exception& e) { + LK_LOG_ERROR("Room::~Room: graceful disconnect failed: {}", e.what()); + } catch (...) { + LK_LOG_ERROR("Room::~Room: graceful disconnect failed: unknown exception"); } - - // local_participant_to_cleanup is destroyed here after listener is removed } void Room::setDelegate(RoomDelegate* delegate) { @@ -234,6 +219,103 @@ bool Room::Connect(const std::string& url, const std::string& token, const RoomO return connect(url, token, options); } +bool Room::disconnect(DisconnectReason reason) { + TRACE_EVENT0("livekit", "Room::disconnect"); + + // Canonical teardown path. Move all owned state out under the lock, then + // operate on it outside the lock. The destructor (and any caller) gets + // the same behavior: once this returns, the Room is fully torn down. + // + // Return value: + // true - we owned live state and tore it down (FFI disconnect succeeded) + // false - either already disconnected (no-op) or FFI disconnect failed. + // In both false cases local-side teardown still completed. + + std::shared_ptr handle; + RoomDelegate* delegate_snapshot = nullptr; + std::unique_ptr local_participant_to_cleanup; + std::unordered_map> remote_participants_to_clear; + std::unique_ptr e2ee_manager_to_clear; + std::unordered_map> text_stream_readers_to_clear; + std::unordered_map> byte_stream_readers_to_clear; + int listener_to_remove = 0; + + { + const std::scoped_lock g(lock_); + if (connection_state_ == ConnectionState::Disconnected) { + // Already torn down (or never connected). Nothing to do. + return false; + } + handle = room_handle_; + delegate_snapshot = delegate_; + // Take ownership of everything under the lock so the kEos handler (which + // also tries to move it out) loses any race here — only one teardown + // path operates on this state. + local_participant_to_cleanup = std::move(local_participant_); + remote_participants_to_clear = std::move(remote_participants_); + e2ee_manager_to_clear = std::move(e2ee_manager_); + text_stream_readers_to_clear = std::move(text_stream_readers_); + byte_stream_readers_to_clear = std::move(byte_stream_readers_); + listener_to_remove = listener_id_; + listener_id_ = 0; + room_handle_.reset(); + // Flip state immediately so the in-flight Disconnected room-event we'll + // get back doesn't double-fire onDisconnected. Mirrors Python's + // Room.disconnect(), which also flips state before sending the request. + connection_state_ = ConnectionState::Disconnected; + } + + // Drain in-flight RPC handlers BEFORE telling Rust to tear down the room. + // Mirrors client-sdk-python's Room.disconnect() ordering: once the FFI + // dispatches the Disconnect, Rust starts invalidating participant handles + // in its table, and any listener-thread RPC handler still mid-flight + // would race with that invalidation and send to a dead handle → + // INVALID_HANDLE → terminate. + if (local_participant_to_cleanup) { + local_participant_to_cleanup->shutdown(); + } + + // Tell the FFI to close the room and wait for the callback. If this fails + // we still complete local-side teardown below — releasing the listener, + // dropping handles, and notifying the delegate — so the Room is fully + // cleaned up regardless of whether the FFI round-trip succeeded. + bool ffi_ok = true; + if (handle) { + try { + FfiClient::instance().disconnectAsync(handle->get(), reason).get(); + } catch (const std::exception& e) { + LK_LOG_ERROR("Room::disconnect: FFI disconnect failed (continuing local teardown): {}", e.what()); + ffi_ok = false; + } + } + + // Stop dispatcher so no track callbacks fire mid-teardown. + if (subscription_thread_dispatcher_) { + subscription_thread_dispatcher_->stopAll(); + } + + if (listener_to_remove != 0) { + FfiClient::instance().removeListener(listener_to_remove); + } + + // Fire onDisconnected exactly once, with the reason the caller passed. + if (delegate_snapshot) { + DisconnectedEvent ev; + ev.reason = reason; + try { + delegate_snapshot->onDisconnected(*this, ev); + } catch (const std::exception& e) { + LK_LOG_ERROR("Room::disconnect: onDisconnected threw: {}", e.what()); + } catch (...) { + LK_LOG_ERROR("Room::disconnect: onDisconnected threw: unknown exception"); + } + } + + // Moved-out state (local participant, remote participants, e2ee manager, + // stream readers) destructs here, releasing FFI handles. + return ffi_ok; +} + RoomInfoData Room::roomInfo() const { const std::scoped_lock g(lock_); return room_info_; @@ -1139,6 +1221,17 @@ void Room::onEvent(const FfiEvent& event) { break; } case proto::RoomEvent::kDisconnected: { + // If disconnect() was driven from our side, it already flipped state + // to Disconnected and fired the delegate; skip the duplicate here. + bool already_disconnected = false; + { + const std::scoped_lock guard(lock_); + already_disconnected = (connection_state_ == ConnectionState::Disconnected); + connection_state_ = ConnectionState::Disconnected; + } + if (already_disconnected) { + break; + } DisconnectedEvent ev; ev.reason = toDisconnectReason(re.disconnected().reason()); if (delegate_snapshot) { @@ -1193,6 +1286,14 @@ void Room::onEvent(const FfiEvent& event) { old_byte_readers = std::move(byte_stream_readers_); } + // Drain in-flight RPC invocations before destroying the local + // participant's FFI handle. Mirrors the ordering in disconnect(); + // without this, a listener-thread RPC handler can race with handle + // disposal and send to a dead handle → INVALID_HANDLE → terminate. + if (old_local_participant) { + old_local_participant->shutdown(); + } + // Remove listener outside lock if (listener_to_remove != 0) { FfiClient::instance().removeListener(listener_to_remove); diff --git a/src/room_proto_converter.cpp b/src/room_proto_converter.cpp index 92918fa2..9fff5eb8 100644 --- a/src/room_proto_converter.cpp +++ b/src/room_proto_converter.cpp @@ -99,9 +99,80 @@ DataPacketKind toDataPacketKind(proto::DataPacketKind in) { } } -DisconnectReason toDisconnectReason(proto::DisconnectReason /*in*/) { - // TODO: map each proto::DisconnectReason to your DisconnectReason enum - return DisconnectReason::Unknown; +DisconnectReason toDisconnectReason(proto::DisconnectReason in) { + switch (in) { + case proto::CLIENT_INITIATED: + return DisconnectReason::ClientInitiated; + case proto::DUPLICATE_IDENTITY: + return DisconnectReason::DuplicateIdentity; + case proto::SERVER_SHUTDOWN: + return DisconnectReason::ServerShutdown; + case proto::PARTICIPANT_REMOVED: + return DisconnectReason::ParticipantRemoved; + case proto::ROOM_DELETED: + return DisconnectReason::RoomDeleted; + case proto::STATE_MISMATCH: + return DisconnectReason::StateMismatch; + case proto::JOIN_FAILURE: + return DisconnectReason::JoinFailure; + case proto::MIGRATION: + return DisconnectReason::Migration; + case proto::SIGNAL_CLOSE: + return DisconnectReason::SignalClose; + case proto::ROOM_CLOSED: + return DisconnectReason::RoomClosed; + case proto::USER_UNAVAILABLE: + return DisconnectReason::UserUnavailable; + case proto::USER_REJECTED: + return DisconnectReason::UserRejected; + case proto::SIP_TRUNK_FAILURE: + return DisconnectReason::SipTrunkFailure; + case proto::CONNECTION_TIMEOUT: + return DisconnectReason::ConnectionTimeout; + case proto::MEDIA_FAILURE: + return DisconnectReason::MediaFailure; + case proto::UNKNOWN_REASON: + default: + return DisconnectReason::Unknown; + } +} + +proto::DisconnectReason toProto(DisconnectReason in) { + switch (in) { + case DisconnectReason::ClientInitiated: + return proto::CLIENT_INITIATED; + case DisconnectReason::DuplicateIdentity: + return proto::DUPLICATE_IDENTITY; + case DisconnectReason::ServerShutdown: + return proto::SERVER_SHUTDOWN; + case DisconnectReason::ParticipantRemoved: + return proto::PARTICIPANT_REMOVED; + case DisconnectReason::RoomDeleted: + return proto::ROOM_DELETED; + case DisconnectReason::StateMismatch: + return proto::STATE_MISMATCH; + case DisconnectReason::JoinFailure: + return proto::JOIN_FAILURE; + case DisconnectReason::Migration: + return proto::MIGRATION; + case DisconnectReason::SignalClose: + return proto::SIGNAL_CLOSE; + case DisconnectReason::RoomClosed: + return proto::ROOM_CLOSED; + case DisconnectReason::UserUnavailable: + return proto::USER_UNAVAILABLE; + case DisconnectReason::UserRejected: + return proto::USER_REJECTED; + case DisconnectReason::SipTrunkFailure: + return proto::SIP_TRUNK_FAILURE; + case DisconnectReason::ConnectionTimeout: + return proto::CONNECTION_TIMEOUT; + case DisconnectReason::MediaFailure: + return proto::MEDIA_FAILURE; + case DisconnectReason::Unknown: + default: + return proto::UNKNOWN_REASON; + } } // --------- basic helper conversions --------- diff --git a/src/room_proto_converter.h b/src/room_proto_converter.h index 03d1c733..9b9d3d67 100644 --- a/src/room_proto_converter.h +++ b/src/room_proto_converter.h @@ -37,6 +37,7 @@ LIVEKIT_INTERNAL_API ConnectionQuality toConnectionQuality(proto::ConnectionQual LIVEKIT_INTERNAL_API ConnectionState toConnectionState(proto::ConnectionState in); LIVEKIT_INTERNAL_API DataPacketKind toDataPacketKind(proto::DataPacketKind in); LIVEKIT_INTERNAL_API DisconnectReason toDisconnectReason(proto::DisconnectReason in); +LIVEKIT_INTERNAL_API proto::DisconnectReason toProto(DisconnectReason in); LIVEKIT_INTERNAL_API UserPacketData fromProto(const proto::UserPacket& in); LIVEKIT_INTERNAL_API SipDtmfData fromProto(const proto::SipDTMF& in); diff --git a/src/tests/integration/test_room.cpp b/src/tests/integration/test_room.cpp index 9fb2490d..5dbd68c6 100644 --- a/src/tests/integration/test_room.cpp +++ b/src/tests/integration/test_room.cpp @@ -70,4 +70,57 @@ TEST_F(RoomTest, ConnectWithInvalidUrl) { EXPECT_FALSE(connected) << "Should fail to connect to invalid URL"; } +namespace { + +class DisconnectTrackingDelegate : public RoomDelegate { +public: + void onDisconnected(Room&, const DisconnectedEvent& ev) override { + ++count; + last_reason = ev.reason; + } + + std::atomic count{0}; + DisconnectReason last_reason = DisconnectReason::Unknown; +}; + +} // namespace + +// Case: User calls disconnect() +TEST_F(RoomTest, UserDisconnect) { + Room room; + DisconnectTrackingDelegate delegate; + room.setDelegate(&delegate); + + RoomOptions options; + ASSERT_TRUE(room.connect(server_url_, token_, options)) << "connect failed"; + ASSERT_EQ(room.connectionState(), ConnectionState::Connected); + ASSERT_NE(room.localParticipant(), nullptr); + + EXPECT_NO_THROW(room.disconnect()) << "disconnect should not throw on a connected room"; + EXPECT_EQ(room.connectionState(), ConnectionState::Disconnected); + EXPECT_EQ(room.localParticipant(), nullptr) << "local participant should be cleared after disconnect"; + EXPECT_EQ(delegate.count.load(), 1) << "onDisconnected should fire exactly once"; + EXPECT_EQ(delegate.last_reason, DisconnectReason::ClientInitiated); + + // Calling again on an already-disconnected room is a no-op + EXPECT_NO_THROW(room.disconnect()) << "second disconnect should not throw on an already-disconnected room"; + EXPECT_EQ(delegate.count.load(), 1) << "delegate must not double-fire"; +} + +// Case: Room goes out of scope while still connected +TEST_F(RoomTest, DestructorDisconnect) { + std::unique_ptr room = std::make_unique(); + + DisconnectTrackingDelegate delegate; + room->setDelegate(&delegate); + RoomOptions options; + ASSERT_TRUE(room->connect(server_url_, token_, options)); + ASSERT_EQ(room->connectionState(), ConnectionState::Connected); + + room.reset(); // invokes destructor which calls disconnect() + + EXPECT_EQ(delegate.count.load(), 1) << "destructor should fire onDisconnected exactly once"; + EXPECT_EQ(delegate.last_reason, DisconnectReason::ClientInitiated); +} + } // namespace livekit::test