diff --git a/.gitignore b/.gitignore index 3bc6e49..30f8232 100644 --- a/.gitignore +++ b/.gitignore @@ -6,3 +6,4 @@ Makefile cmake_install.cmake out build/ +received_green.avif diff --git a/CMakeLists.txt b/CMakeLists.txt index ec04b46..45cce83 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -159,6 +159,7 @@ add_library(livekit include/livekit/audio_frame.h include/livekit/audio_source.h include/livekit/audio_stream.h + include/livekit/data_stream.h include/livekit/room.h include/livekit/room_event_types.h include/livekit/room_delegate.h @@ -184,6 +185,7 @@ add_library(livekit src/audio_frame.cpp src/audio_source.cpp src/audio_stream.cpp + src/data_stream.cpp src/ffi_handle.cpp src/ffi_client.cpp src/local_audio_track.cpp diff --git a/README.md b/README.md index fe44301..c6937a4 100644 --- a/README.md +++ b/README.md @@ -89,6 +89,40 @@ The caller will automatically: - Print round-trip times - Annotate expected successes or expected failures +### SimpleDataStream +The SimpleDataStream example demonstrates how to: +- Connect multiple participants to the same LiveKit room +- Register text stream and byte stream handlers by topic (e.g. "chat", "files") +- Send a text stream (chat message) from one participant to another +- Send a byte stream (file/image) from one participant to another +- Attach custom stream metadata (e.g. sent_ms) via stream attributes +- Measure and print one-way latency on the receiver using sender timestamps +- Receive streamed chunks and reconstruct the full payload on the receiver + +#### 🔑 Generate Tokens +Before running any participant, create one JWT token per participant, using the identities caller and greeter and your own room name.
+```bash +lk token create -r test -i caller --join --valid-for 99999h --dev --room=your_own_room +lk token create -r test -i greeter --join --valid-for 99999h --dev --room=your_own_room +``` + +#### ▶ Start Participants +Start the receiver first (so it registers stream handlers before messages arrive): +```bash +./build/examples/SimpleDataStream $URL $CALLER_TOKEN +``` +On another terminal or computer, start the sender (URL and token are positional arguments; LIVEKIT_URL / LIVEKIT_TOKEN are used when they are omitted): +```bash +./build/examples/SimpleDataStream $URL $GREETER_TOKEN +``` + +**Sender** (e.g. greeter) +- On startup, greets each participant already in the room: sends a text stream ("chat") and a file stream ("files") with timestamps and metadata, and logs the send time. + +**Receiver** (e.g. caller) +- Registers handlers for text and file streams, logs stream events, computes one-way latency, and saves the received file locally. + + ## 🧰 Recommended Setup ### macOS ```bash diff --git a/data/green.avif b/data/green.avif new file mode 100644 index 0000000..2efe5cc Binary files /dev/null and b/data/green.avif differ diff --git a/examples/CMakeLists.txt b/examples/CMakeLists.txt index 39e223e..020d175 100644 --- a/examples/CMakeLists.txt +++ b/examples/CMakeLists.txt @@ -52,4 +52,23 @@ target_link_libraries(SimpleRpc PRIVATE nlohmann_json::nlohmann_json livekit +) + +#################### SimpleDataStream example ########################## + +add_executable(SimpleDataStream + simple_data_stream/main.cpp +) + +target_link_libraries(SimpleDataStream + PRIVATE + livekit +) + +add_custom_command( + TARGET SimpleDataStream + POST_BUILD + COMMAND ${CMAKE_COMMAND} -E copy_directory + ${CMAKE_SOURCE_DIR}/data + $/data ) \ No newline at end of file diff --git a/examples/simple_data_stream/main.cpp b/examples/simple_data_stream/main.cpp new file mode 100644 index 0000000..094eb88 --- /dev/null +++ b/examples/simple_data_stream/main.cpp @@ -0,0 +1,279 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include 
"livekit/livekit.h" +#include "livekit_ffi.h" + +using namespace livekit; + +namespace { + +std::atomic g_running{true}; + +void handleSignal(int) { g_running.store(false); } + +// Helper: get env var or empty string +std::string getenvOrEmpty(const char *name) { + const char *v = std::getenv(name); + return v ? std::string(v) : std::string{}; +} + +std::int64_t nowEpochMs() { + using namespace std::chrono; + return duration_cast(system_clock::now().time_since_epoch()) + .count(); +} + +std::string randomHexId(std::size_t nbytes = 16) { + static thread_local std::mt19937_64 rng{std::random_device{}()}; + std::ostringstream oss; + for (std::size_t i = 0; i < nbytes; ++i) { + std::uint8_t b = static_cast(rng() & 0xFF); + const char *hex = "0123456789abcdef"; + oss << hex[(b >> 4) & 0xF] << hex[b & 0xF]; + } + return oss.str(); +} + +// Greeting: send text + image +void greetParticipant(Room &room, const std::string &identity) { + std::cout << "[DataStream] Greeting participant: " << identity << "\n"; + + LocalParticipant *lp = room.localParticipant(); + if (!lp) { + std::cerr << "[DataStream] No local participant, cannot greet.\n"; + return; + } + + try { + const std::int64_t sent_ms = nowEpochMs(); + const std::string sender_id = + !lp->identity().empty() ? lp->identity() : std::string("cpp_sender"); + const std::vector dest{identity}; + + // Send text stream ("chat") + const std::string chat_stream_id = randomHexId(); + const std::string reply_to_id = ""; + std::map chat_attrs; + chat_attrs["sent_ms"] = std::to_string(sent_ms); + chat_attrs["kind"] = "chat"; + chat_attrs["test_flag"] = "1"; + chat_attrs["seq"] = "1"; + + // Put timestamp in payload too (so you can compute latency even if + // attributes aren’t plumbed through your reader info yet). + const std::string body = "Hi! 
Just a friendly message"; + const std::string payload = "sent_ms=" + std::to_string(sent_ms) + "\n" + + "stream_id=" + chat_stream_id + "\n" + body; + TextStreamWriter text_writer(*lp, "chat", chat_attrs, chat_stream_id, + payload.size(), reply_to_id, dest, sender_id); + + // Write the payload itself: its size is what was announced as total_size above. + text_writer.write(payload); // will be chunked internally if needed + text_writer.close(); // optional reason/attributes omitted + + // Send image as byte stream + const std::string file_path = "data/green.avif"; + std::ifstream in(file_path, std::ios::binary); + if (!in) { + std::cerr << "[DataStream] Failed to open file: " << file_path << "\n"; + return; + } + + std::vector data((std::istreambuf_iterator(in)), + std::istreambuf_iterator()); + + const std::string file_stream_id = randomHexId(); + std::map file_attrs; + file_attrs["sent_ms"] = std::to_string(sent_ms); + file_attrs["kind"] = "file"; + file_attrs["test_flag"] = "1"; + file_attrs["orig_path"] = file_path; + const std::string name = + std::filesystem::path(file_path).filename().string(); + const std::string mime = "image/avif"; + ByteStreamWriter byte_writer(*lp, name, "files", file_attrs, file_stream_id, + data.size(), mime, dest, sender_id); + byte_writer.write(data); + byte_writer.close(); + + std::cout << "[DataStream] Greeting sent to " << identity + << " (sent_ms=" << sent_ms << ")\n"; + } catch (const std::exception &e) { + std::cerr << "[DataStream] Error greeting participant " << identity << ": " + << e.what() << "\n"; + } +} + +// Handlers for incoming streams (each is run on its own thread by the callbacks registered in main) +void handleChatMessage(std::shared_ptr reader, + const std::string &participant_identity) { + try { + const auto info = reader->info(); // copy (safe even if reader goes away) + const std::int64_t recv_ms = nowEpochMs(); + const std::int64_t sent_ms = info.timestamp; + const auto latency = (sent_ms > 0) ? 
(recv_ms - sent_ms) : -1; + std::string full_text = reader->readAll(); // blocks until the stream is closed + std::cout << "[DataStream] Received chat from " << participant_identity + << " topic=" << info.topic << " stream_id=" << info.stream_id + << " latency_ms=" << latency << " text='" << full_text << "'\n"; + } catch (const std::exception &e) { + std::cerr << "[DataStream] Error reading chat stream from " + << participant_identity << ": " << e.what() << "\n"; + } +} + +void handleWelcomeImage(std::shared_ptr reader, + const std::string &participant_identity) { + try { + const auto info = reader->info(); + const std::string stream_id = + info.stream_id.empty() ? "unknown" : info.stream_id; + const std::string base = std::filesystem::path(info.name).filename().string(); // peer-supplied name: keep only the final path component + const std::string original_name = base.empty() ? "received_image.bin" : base; + // Latency: start from the header timestamp + std::int64_t sent_ms = info.timestamp; + // Then override it with the explicit "sent_ms" attribute when the sender set one + auto it = info.attributes.find("sent_ms"); + if (it != info.attributes.end()) { + try { + sent_ms = std::stoll(it->second); + } catch (...) { // unparseable attribute: deliberately keep the header timestamp + } + } + const std::int64_t recv_ms = nowEpochMs(); + const std::int64_t latency_ms = (sent_ms > 0) ? (recv_ms - sent_ms) : -1; + const std::string out_file = "received_" + original_name; + std::cout << "[DataStream] Receiving image from " << participant_identity + << " stream_id=" << stream_id << " name='" << original_name << "'" + << " mime='" << info.mime_type << "'" + << " size=" + << (info.size ? 
std::to_string(*info.size) : "unknown") + << " latency_ms=" << latency_ms << " -> '" << out_file << "'\n"; + std::ofstream out(out_file, std::ios::binary); + if (!out) { + std::cerr << "[DataStream] Failed to open output file: " << out_file + << "\n"; + return; + } + std::vector chunk; + std::uint64_t total_written = 0; + while (reader->readNext(chunk)) { // readNext returns false once the stream has ended + if (!chunk.empty()) { + out.write(reinterpret_cast(chunk.data()), + static_cast(chunk.size())); + total_written += chunk.size(); + } + } + std::cout << "[DataStream] Saved image from " << participant_identity + << " stream_id=" << stream_id << " bytes=" << total_written + << " to '" << out_file << "'" << std::endl; + } catch (const std::exception &e) { + std::cerr << "[DataStream] Error reading image stream from " + << participant_identity << ": " << e.what() << "\n"; + } +} + +} // namespace + +int main(int argc, char *argv[]) { + // Get URL and token from env. + std::string url = getenvOrEmpty("LIVEKIT_URL"); + std::string token = getenvOrEmpty("LIVEKIT_TOKEN"); + + if (argc >= 3) { + // Allow overriding via positional CLI arguments: ./SimpleDataStream URL TOKEN + url = argv[1]; + token = argv[2]; + } + + if (url.empty() || token.empty()) { + std::cerr << "LIVEKIT_URL and LIVEKIT_TOKEN (or CLI args) are required\n"; + return 1; + } + + std::cout << "[DataStream] Connecting to: " << url << "\n"; + + std::signal(SIGINT, handleSignal); +#ifdef SIGTERM + std::signal(SIGTERM, handleSignal); +#endif + + Room room{}; + RoomOptions options; + options.auto_subscribe = true; + options.dynacast = false; + + bool ok = room.Connect(url, token, options); + std::cout << "[DataStream] Connect result: " << std::boolalpha << ok << "\n"; + if (!ok) { + std::cerr << "[DataStream] Failed to connect to room\n"; + FfiClient::instance().shutdown(); + return 1; + } + + auto info = room.room_info(); + std::cout << "[DataStream] Connected to room '" << info.name + << "', participants: " << info.num_participants << "\n"; + + // Register stream handlers + 
room.registerTextStreamHandler( + "chat", [](std::shared_ptr reader, + const std::string &participant_identity) { + std::thread t(handleChatMessage, std::move(reader), + participant_identity); + t.detach(); // readAll() blocks until the stream closes, so it runs off the callback + }); + + room.registerByteStreamHandler( + "files", [](std::shared_ptr reader, + const std::string &participant_identity) { + std::thread t(handleWelcomeImage, std::move(reader), + participant_identity); + t.detach(); // readNext() is a blocking read, so it also gets its own thread + }); + + // Greet existing participants (only those already in the room now; later joiners are not greeted) + { + auto remotes = room.remoteParticipants(); + for (const auto &rp : remotes) { + if (!rp) + continue; + std::cout << "Remote: " << rp->identity() << "\n"; + greetParticipant(room, rp->identity()); + } + } + + // Optionally: greet on join + // + // If Room API exposes a participant-connected callback, you could do: + // + // room.onParticipantConnected( + // [&](RemoteParticipant& participant) { + // std::cout << "[DataStream] participant connected: " + // << participant.sid() << " " << participant.identity() + // << "\n"; + // greetParticipant(room, participant.identity()); + // }); + // + // Adjust to your actual event API. + std::cout << "[DataStream] Ready. 
Waiting for streams (Ctrl-C to exit)...\n"; + // Keep process alive until signal + while (g_running.load()) { + std::this_thread::sleep_for(std::chrono::milliseconds(5)); + } + + std::cout << "[DataStream] Shutting down...\n"; + FfiClient::instance().shutdown(); + return 0; +} diff --git a/examples/simple_room/main.cpp b/examples/simple_room/main.cpp index 5741cc5..dd865c5 100644 --- a/examples/simple_room/main.cpp +++ b/examples/simple_room/main.cpp @@ -286,7 +286,7 @@ int main(int argc, char *argv[]) { try { // publishTrack takes std::shared_ptr, LocalAudioTrack derives from // Track - audioPub = room.local_participant()->publishTrack(audioTrack, audioOpts); + audioPub = room.localParticipant()->publishTrack(audioTrack, audioOpts); std::cout << "Published track:\n" << " SID: " << audioPub->sid() << "\n" @@ -314,7 +314,7 @@ int main(int argc, char *argv[]) { try { // publishTrack takes std::shared_ptr, LocalAudioTrack derives from // Track - videoPub = room.local_participant()->publishTrack(videoTrack, videoOpts); + videoPub = room.localParticipant()->publishTrack(videoTrack, videoOpts); std::cout << "Published track:\n" << " SID: " << videoPub->sid() << "\n" @@ -341,12 +341,12 @@ int main(int argc, char *argv[]) { media.stopMic(); // Clean up the audio track publishment - room.local_participant()->unpublishTrack(audioPub->sid()); + room.localParticipant()->unpublishTrack(audioPub->sid()); media.stopCamera(); // Clean up the video track publishment - room.local_participant()->unpublishTrack(videoPub->sid()); + room.localParticipant()->unpublishTrack(videoPub->sid()); FfiClient::instance().shutdown(); std::cout << "Exiting.\n"; diff --git a/examples/simple_rpc/main.cpp b/examples/simple_rpc/main.cpp index 61151db..f93f8f7 100644 --- a/examples/simple_rpc/main.cpp +++ b/examples/simple_rpc/main.cpp @@ -72,7 +72,7 @@ bool waitForParticipant(Room &room, const std::string &identity, auto start = std::chrono::steady_clock::now(); while (std::chrono::steady_clock::now() 
- start < timeout) { - if (room.remote_participant(identity) != nullptr) { + if (room.remoteParticipant(identity) != nullptr) { return true; } std::this_thread::sleep_for(100ms); @@ -233,8 +233,8 @@ std::string parseStringFromJson(const std::string &json) { // RPC handler registration void registerReceiverMethods(Room &greeters_room, Room &math_genius_room) { - LocalParticipant *greeter_lp = greeters_room.local_participant(); - LocalParticipant *math_genius_lp = math_genius_room.local_participant(); + LocalParticipant *greeter_lp = greeters_room.localParticipant(); + LocalParticipant *math_genius_lp = math_genius_room.localParticipant(); // arrival greeter_lp->registerRpcMethod( @@ -312,7 +312,7 @@ void performGreeting(Room &room) { std::cout << "[Caller] Letting the greeter know that I've arrived\n"; double t0 = nowMs(); try { - std::string response = room.local_participant()->performRpc( + std::string response = room.localParticipant()->performRpc( "greeter", "arrival", "Hello", std::nullopt); double t1 = nowMs(); std::cout << "[Caller] RTT: " << (t1 - t0) << " ms\n"; @@ -331,7 +331,7 @@ void performSquareRoot(Room &room) { double t0 = nowMs(); try { std::string payload = makeNumberJson("number", 16.0); - std::string response = room.local_participant()->performRpc( + std::string response = room.localParticipant()->performRpc( "math-genius", "square-root", payload, std::nullopt); double t1 = nowMs(); std::cout << "[Caller] RTT: " << (t1 - t0) << " ms\n"; @@ -353,7 +353,7 @@ void performQuantumHyperGeometricSeries(Room &room) { double t0 = nowMs(); try { std::string payload = makeNumberJson("number", 42.0); - std::string response = room.local_participant()->performRpc( + std::string response = room.localParticipant()->performRpc( "math-genius", "quantum-hypergeometric-series", payload, std::nullopt); double t1 = nowMs(); std::cout << "[Caller] (Unexpected success) RTT=" << (t1 - t0) << " ms\n"; @@ -380,7 +380,7 @@ void performDivide(Room &room) { double t0 = 
nowMs(); try { std::string payload = "{\"dividend\":10,\"divisor\":0}"; - std::string response = room.local_participant()->performRpc( + std::string response = room.localParticipant()->performRpc( "math-genius", "divide", payload, std::nullopt); double t1 = nowMs(); std::cout << "[Caller] (Unexpected success) RTT=" << (t1 - t0) << " ms\n"; @@ -409,7 +409,7 @@ void performLongCalculation(Room &room) { << "[Caller] Giving only 10s to respond. EXPECTED RESULT: TIMEOUT.\n"; double t0 = nowMs(); try { - std::string response = room.local_participant()->performRpc( + std::string response = room.localParticipant()->performRpc( "math-genius", "long-calculation", "{}", 10.0); double t1 = nowMs(); std::cout << "[Caller] (Unexpected success) RTT=" << (t1 - t0) << " ms\n"; diff --git a/include/livekit/audio_frame.h b/include/livekit/audio_frame.h index a7011c9..a860915 100644 --- a/include/livekit/audio_frame.h +++ b/include/livekit/audio_frame.h @@ -85,7 +85,7 @@ class AudioFrame { /// Duration in seconds (samples_per_channel / sample_rate). double duration() const noexcept; - /// A human-readable description (like Python __repr__). + /// A human-readable description. std::string to_string() const; private: diff --git a/include/livekit/audio_source.h b/include/livekit/audio_source.h index e3c864a..92401de 100644 --- a/include/livekit/audio_source.h +++ b/include/livekit/audio_source.h @@ -102,11 +102,6 @@ class AudioSource { */ void captureFrame(const AudioFrame &frame, int timeout_ms = 20); - /** - * Block until the currently queued audio has (roughly) played out. - */ - void waitForPlayout() const; - private: // Internal helper to reset the local queue tracking (like _release_waiter). 
void resetQueueTracking() noexcept; diff --git a/include/livekit/data_stream.h b/include/livekit/data_stream.h new file mode 100644 index 0000000..3d2a471 --- /dev/null +++ b/include/livekit/data_stream.h @@ -0,0 +1,293 @@ +/* + * Copyright 2025 LiveKit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace livekit { + +class LocalParticipant; + +// Chunk size for data streams (matches Python STREAM_CHUNK_SIZE). +// Chosen to balance throughput and latency, and to work well with WebRTC data +// channels. Declared inline so every translation unit shares one definition. +inline constexpr std::size_t kStreamChunkSize = 15'000; // 15,000 bytes (~15 KB) + +/// Base metadata for any stream (text or bytes). +struct BaseStreamInfo { + /// Unique identifier for this stream. + std::string stream_id; + + /// MIME type of the stream (e.g. "text/plain", "application/octet-stream"). + std::string mime_type; + + /// Application-defined topic name. + std::string topic; + + /// Timestamp in milliseconds when the stream was created. + std::int64_t timestamp = 0; + + /// Total size of the stream in bytes, if known. + std::optional size; + + /// Arbitrary key–value attributes attached to the stream. + std::map attributes; +}; + +/// Metadata for a text stream. +struct TextStreamInfo : BaseStreamInfo { + /// IDs of any attached streams (for replies / threads). + std::vector attachments; +}; + +/// Metadata for a byte stream. 
+struct ByteStreamInfo : BaseStreamInfo { + /// Optional name of the binary object (e.g. filename); may be empty. + std::string name; +}; + +// Readers +// - TextStreamReader: yields UTF-8 text chunks (std::string) +// - ByteStreamReader: yields raw bytes (std::vector) + +/// Reader for incoming text streams. +/// Created internally by the SDK when a text stream header is received. +class TextStreamReader { +public: + /// Construct a reader from initial stream metadata. + explicit TextStreamReader(const TextStreamInfo &info); + + TextStreamReader(const TextStreamReader &) = delete; // non-copyable + TextStreamReader &operator=(const TextStreamReader &) = delete; + + /// Blocking read of the next text chunk into `out`. + /// Returns false when the stream has ended. + bool readNext(std::string &out); + + /// Convenience: read entire stream into a single string. + /// Blocks until the stream is closed. + std::string readAll(); + + /// Metadata associated with this stream. + const TextStreamInfo &info() const noexcept { return info_; } + +private: + friend class Room; + + /// Called by the Room when a new chunk arrives. + void onChunkUpdate(const std::string &text); + + /// Called by the Room when the stream is closed. + /// Additional trailer attributes are merged into info().attributes. + void onStreamClose(const std::map &trailer_attrs); + + TextStreamInfo info_; + + // Queue of text chunks; empty string with closed_==true means EOS. + std::deque queue_; + bool closed_ = false; + + std::mutex mutex_; // NOTE(review): presumably guards queue_ and closed_ - confirm in data_stream.cpp + std::condition_variable cv_; +}; + +/// Reader for incoming byte streams. Non-copyable, like TextStreamReader. +class ByteStreamReader { +public: + /// Construct a reader from initial stream metadata. + explicit ByteStreamReader(const ByteStreamInfo &info); + + ByteStreamReader(const ByteStreamReader &) = delete; // non-copyable + ByteStreamReader &operator=(const ByteStreamReader &) = delete; + + /// Blocking read of the next byte chunk into `out`. + /// Returns false when the stream has ended. + bool readNext(std::vector &out); + + /// Metadata associated with this stream. 
+ const ByteStreamInfo &info() const noexcept { return info_; } + +private: + friend class Room; + + /// Called by the Room when a new chunk arrives. + void onChunkUpdate(const std::vector &bytes); + + /// Called by the Room when the stream is closed. + /// Additional trailer attributes are merged into info().attributes. + void onStreamClose(const std::map &trailer_attrs); + + ByteStreamInfo info_; + + std::deque> queue_; // NOTE(review): presumably the received chunks awaiting readNext() - confirm + bool closed_ = false; + + std::mutex mutex_; // NOTE(review): presumably guards queue_ and closed_ - confirm in data_stream.cpp + std::condition_variable cv_; +}; + +/// Base class for sending data streams. +/// Concrete subclasses are TextStreamWriter and ByteStreamWriter. +class BaseStreamWriter { +public: + virtual ~BaseStreamWriter() = default; + + /// Stream id assigned to this writer. + const std::string &streamId() const noexcept { return stream_id_; } + + /// Topic of this stream. + const std::string &topic() const noexcept { return topic_; } + + /// MIME type for this stream. + const std::string &mimeType() const noexcept { return mime_type_; } + + /// Timestamp (ms) when the stream was created. + std::int64_t timestampMs() const noexcept { return timestamp_ms_; } + + /// Whether the stream has been closed. + /// NOTE(review): reads closed_ without taking a lock - confirm callers do not race with close(). + bool isClosed() const noexcept { return closed_; } + + /// Close the stream with optional reason and attributes. + /// Throws on FFI error or if already closed. 
+ void close(const std::string &reason = "", + const std::map &attributes = {}); + +protected: + BaseStreamWriter(LocalParticipant &local_participant, + const std::string &topic = "", + const std::map &attributes = {}, + const std::string &stream_id = "", + std::optional total_size = std::nullopt, + const std::string &mime_type = "", + const std::vector &destination_identities = {}, + const std::string &sender_identity = ""); + + enum class StreamKind { kUnknown, kText, kByte }; + + LocalParticipant &local_participant_; // non-owning reference: the participant must outlive this writer + + // Public-ish metadata (mirrors BaseStreamInfo, but kept simple here) + std::string stream_id_; + std::string mime_type_; + std::string topic_; + std::int64_t timestamp_ms_ = 0; + std::optional total_size_; + std::map attributes_; + std::vector destination_identities_; + std::string sender_identity_; + + bool closed_ = false; + bool header_sent_ = false; + std::uint64_t next_chunk_index_ = 0; + StreamKind kind_ = StreamKind::kUnknown; + std::string reply_to_id_; // NOTE(review): presumably set by TextStreamWriter (it takes reply_to_id) - confirm + std::string byte_name_; // Used by ByteStreamWriter + + /// Ensure the header has been sent once. + /// Throws on error. + void ensureHeaderSent(); + + /// Send a raw chunk of bytes. + /// Throws on error or if stream is closed. + void sendChunk(const std::vector &content); + + /// Send the trailer with given reason and attributes. + /// Throws on error. + void sendTrailer(const std::string &reason, + const std::map &attributes); +}; + +/// Writer for outgoing text streams. +class TextStreamWriter : public BaseStreamWriter { +public: + TextStreamWriter(LocalParticipant &local_participant, + const std::string &topic = "", + const std::map &attributes = {}, + const std::string &stream_id = "", + std::optional total_size = std::nullopt, + const std::string &reply_to_id = "", + const std::vector &destination_identities = {}, + const std::string &sender_identity = ""); + + /// Write a UTF-8 string to the stream. + /// Data will be split into chunks of at most kStreamChunkSize bytes. 
+ /// Throws on error or if the stream is closed. + void write(const std::string &text); + + /// Metadata associated with this stream. + const TextStreamInfo &info() const noexcept { return info_; } + +private: + TextStreamInfo info_; + std::mutex write_mutex_; // NOTE(review): presumably serializes write() calls - confirm in data_stream.cpp +}; + +/// Writer for outgoing byte streams. `name` (e.g. a filename) is the only stream parameter without a default. +class ByteStreamWriter : public BaseStreamWriter { +public: + ByteStreamWriter(LocalParticipant &local_participant, const std::string &name, + const std::string &topic = "", + const std::map &attributes = {}, + const std::string &stream_id = "", + std::optional total_size = std::nullopt, + const std::string &mime_type = "application/octet-stream", + const std::vector &destination_identities = {}, + const std::string &sender_identity = ""); + + /// Write binary data to the stream. + /// Data will be chunked into kStreamChunkSize-sized chunks. + /// Throws on error or if the stream is closed. + void write(const std::vector &data); + + /// Metadata associated with this stream. + const ByteStreamInfo &info() const noexcept { return info_; } + +private: + ByteStreamInfo info_; + std::mutex write_mutex_; // NOTE(review): presumably serializes write() calls - confirm in data_stream.cpp +}; + +/* Callback invoked when a new incoming text stream is opened. + * + * The TextStreamReader is provided as a shared_ptr to ensure it remains + * alive for the duration of asynchronous reads (for example, when the + * user spawns a background thread to consume the stream). + */ +using TextStreamHandler = + std::function, + const std::string &participant_identity)>; + +/* Callback invoked when a new incoming byte stream is opened. + * + * The ByteStreamReader is provided as a shared_ptr to ensure it remains + * alive for the duration of asynchronous reads (for example, file writes + * or background processing). 
+ */ +using ByteStreamHandler = + std::function, + const std::string &participant_identity)>; + +} // namespace livekit diff --git a/include/livekit/ffi_client.h b/include/livekit/ffi_client.h index 6b2fa3d..9851af1 100644 --- a/include/livekit/ffi_client.h +++ b/include/livekit/ffi_client.h @@ -25,6 +25,7 @@ #include #include "livekit/stats.h" +#include "room.pb.h" namespace livekit { @@ -35,7 +36,8 @@ class FfiEvent; class FfiResponse; class FfiRequest; class OwnedTrackPublication; -class TranscriptionSegment; +class DataStream; + } // namespace proto struct RoomOptions; @@ -97,10 +99,6 @@ class FfiClient { bool reliable, const std::vector &destination_identities, const std::string &topic); - std::future publishTranscriptionAsync( - std::uint64_t local_participant_handle, - const std::string &participant_identity, const std::string &track_id, - const std::vector &segments); std::future publishSipDtmfAsync(std::uint64_t local_participant_handle, std::uint32_t code, const std::string &digit, @@ -117,6 +115,22 @@ class FfiClient { const std::string &payload, std::optional response_timeout_ms = std::nullopt); + // Data stream functionalities + std::future + sendStreamHeaderAsync(std::uint64_t local_participant_handle, + const proto::DataStream::Header &header, + const std::vector &destination_identities, + const std::string &sender_identity); + std::future + sendStreamChunkAsync(std::uint64_t local_participant_handle, + const proto::DataStream::Chunk &chunk, + const std::vector &destination_identities, + const std::string &sender_identity); + std::future + sendStreamTrailerAsync(std::uint64_t local_participant_handle, + const proto::DataStream::Trailer &trailer, + const std::string &sender_identity); + // Generic function for sending a request to the Rust FFI. // Note: For asynchronous requests, use the dedicated async functions instead // of sendRequest. 
diff --git a/include/livekit/local_participant.h b/include/livekit/local_participant.h index 3d9e36a..0a7352a 100644 --- a/include/livekit/local_participant.h +++ b/include/livekit/local_participant.h @@ -35,8 +35,6 @@ struct ParticipantTrackPermission; class FfiClient; class Track; class LocalTrackPublication; -// TODO, should consider moving Transcription to local_participant.h? -struct Transcription; struct RpcInvocationData { std::string request_id; @@ -98,13 +96,6 @@ class LocalParticipant : public Participant { */ void publishDtmf(int code, const std::string &digit); - /** - * Publish transcription data to the room. - * - * @param transcription - */ - void publishTranscription(const Transcription &transcription); - // ------------------------------------------------------------------------- // Metadata APIs (set metadata / name / attributes) // ------------------------------------------------------------------------- diff --git a/include/livekit/remote_participant.h b/include/livekit/remote_participant.h index cbd8aa3..367061f 100644 --- a/include/livekit/remote_participant.h +++ b/include/livekit/remote_participant.h @@ -46,7 +46,6 @@ class RemoteParticipant : public Participant { return track_publications_; } - // C++ equivalent of Python's __repr__ std::string to_string() const; protected: diff --git a/include/livekit/room.h b/include/livekit/room.h index 27e6654..318baaf 100644 --- a/include/livekit/room.h +++ b/include/livekit/room.h @@ -17,6 +17,7 @@ #ifndef LIVEKIT_ROOM_H #define LIVEKIT_ROOM_H +#include "livekit/data_stream.h" #include "livekit/ffi_client.h" #include "livekit/ffi_handle.h" #include "livekit/room_event_types.h" @@ -34,8 +35,7 @@ class FfiEvent; class LocalParticipant; class RemoteParticipant; -// Represents end-to-end encryption (E2EE) settings. -// Mirrors python sdk: `E2EEOptions` +/// Represents end-to-end encryption (E2EE) settings. struct E2EEOptions { // Encryption algorithm type. 
int encryption_type = 0; @@ -54,7 +54,6 @@ struct E2EEOptions { }; // Represents a single ICE server configuration. -// Mirrors python: RtcConfiguration.ice_servers[*] struct IceServer { // TURN/STUN server URL (e.g. "stun:stun.l.google.com:19302"). std::string url; @@ -67,7 +66,6 @@ struct IceServer { }; // WebRTC configuration (ICE, transport, etc.). -// Mirrors python: `RtcConfiguration` struct RtcConfig { // ICE transport type (e.g., ALL, RELAY). Maps to proto::IceTransportType. int ice_transport_type = 0; @@ -81,7 +79,6 @@ struct RtcConfig { }; // Top-level room connection options. -// Mirrors python: `RoomOptions` struct RoomOptions { // If true (default), automatically subscribe to all remote tracks. // This is CRITICAL. Without auto_subscribe, you will never receive: @@ -99,82 +96,143 @@ struct RoomOptions { std::optional rtc_config; }; -// Represents a LiveKit room session. -// A Room manages: -// - the connection to the LiveKit server -// - participant list (local + remote) -// - track publications -// - server events forwarded to a RoomDelegate +/// Represents a LiveKit room session. +/// A Room manages: +/// - the connection to the LiveKit server +/// - participant list (local + remote) +/// - track publications +/// - server events forwarded to a RoomDelegate class Room { public: Room(); ~Room(); - // Assign a RoomDelegate that receives room lifecycle callbacks. - // - // The delegate must remain valid for the lifetime of the Room or until a - // different delegate is assigned. The Room does not take ownership. - // Typical usage: - // class MyDelegate : public RoomDelegate { ... }; - // MyDelegate del; - // Room room; - // room.setDelegate(&del); + /* Assign a RoomDelegate that receives room lifecycle callbacks. + * + * The delegate must remain valid for the lifetime of the Room or until a + * different delegate is assigned. The Room does not take ownership. + * Typical usage: + * class MyDelegate : public RoomDelegate { ... 
}; + * MyDelegate del; + * Room room; + * room.setDelegate(&del); + */ void setDelegate(RoomDelegate *delegate); - // Connect to a LiveKit room using the given URL and token, applying the - // supplied connection options. - // - // Parameters: - // url — WebSocket URL of the LiveKit server. - // token — Access token for authentication. - // options — Connection options controlling auto-subscribe, - // dynacast, E2EE, and WebRTC configuration. - // Behavior: - // - Registers an FFI event listener *before* sending the connect request. - // - Sends a proto::FfiRequest::Connect with the URL, token, - // and the provided RoomOptions. - // - Blocks until the FFI connect response arrives. - // - Initializes local participant and remote participants. - // - Emits room/participant/track events to the delegate. - // IMPORTANT: - // RoomOptions defaults auto_subscribe = true. - // Without auto_subscribe enabled, remote tracks will NOT be subscribed - // automatically, and no remote audio/video will ever arrive. + /* Connect to a LiveKit room using the given URL and token, applying the + * supplied connection options. + * + * Parameters: + * url — WebSocket URL of the LiveKit server. + * token — Access token for authentication. + * options — Connection options controlling auto-subscribe, + * dynacast, E2EE, and WebRTC configuration. + * Behavior: + * - Registers an FFI event listener *before* sending the connect request. + * - Sends a proto::FfiRequest::Connect with the URL, token, + * and the provided RoomOptions. + * - Blocks until the FFI connect response arrives. + * - Initializes local participant and remote participants. + * - Emits room/participant/track events to the delegate. + * IMPORTANT: + * RoomOptions defaults auto_subscribe = true. + * Without auto_subscribe enabled, remote tracks will NOT be subscribed + * automatically, and no remote audio/video will ever arrive. 
+ */ bool Connect(const std::string &url, const std::string &token, const RoomOptions &options); // Accessors - // Retrieve static metadata about the room. - // This contains fields such as: - // - SID - // - room name - // - metadata - // - participant counts - // - creation timestamp + /* Retrieve static metadata about the room. + * This contains fields such as: + * - SID + * - room name + * - metadata + * - participant counts + * - creation timestamp + */ RoomInfoData room_info() const; - // Get the local participant. - // - // This object represents the current user, including: - // - published tracks (audio/video/screen) - // - identity, SID, metadata - // - publishing/unpublishing operations - // Return value: - // Non-null pointer after successful Connect(). - LocalParticipant *local_participant() const; - - // Look up a remote participant by identity. - // - // Parameters: - // identity — The participant’s identity string (not SID) - // Return value: - // Pointer to RemoteParticipant if present, otherwise nullptr. - // RemoteParticipant contains: - // - identity/name/metadata - // - track publications - // - callbacks for track subscribed/unsubscribed, muted/unmuted - RemoteParticipant *remote_participant(const std::string &identity) const; + /* Get the local participant. + * + * This object represents the current user, including: + * - published tracks (audio/video/screen) + * - identity, SID, metadata + * - publishing/unpublishing operations + * Return value: + * Non-null pointer after successful Connect(). + */ + LocalParticipant *localParticipant() const; + + /* Look up a remote participant by identity. + * + * Parameters: + * identity — The participant’s identity string (not SID) + * Return value: + * Pointer to RemoteParticipant if present, otherwise nullptr. 
+ * RemoteParticipant contains: + * - identity/name/metadata + * - track publications + * - callbacks for track subscribed/unsubscribed, muted/unmuted + */ + RemoteParticipant *remoteParticipant(const std::string &identity) const; + + /// Returns a snapshot of all current remote participants. + std::vector> remoteParticipants() const; + + /* Register a handler for incoming text streams on a specific topic. + * + * When a remote participant opens a text stream with the given topic, + * the handler is invoked with: + * - a shared_ptr for consuming the stream + * - the identity of the participant who sent the stream + * + * Notes: + * - Only one handler may be registered per topic. + * - If no handler is registered for a topic, incoming streams with that + * topic are ignored. + * - The handler is invoked on the Room event thread. The handler must + * not block; spawn a background thread if synchronous reading is + * required. + * + * Throws: + * std::runtime_error if a handler is already registered for the topic. + */ + void registerTextStreamHandler(const std::string &topic, + TextStreamHandler handler); + + /* Unregister the text stream handler for the given topic. + * + * If no handler exists for the topic, this function is a no-op. + */ + void unregisterTextStreamHandler(const std::string &topic); + + /* Register a handler for incoming byte streams on a specific topic. + * + * When a remote participant opens a byte stream with the given topic, + * the handler is invoked with: + * - a shared_ptr for consuming the stream + * - the identity of the participant who sent the stream + * + * Notes: + * - Only one handler may be registered per topic. + * - If no handler is registered for a topic, incoming streams with that + * topic are ignored. + * - The ByteStreamReader remains valid as long as the shared_ptr is held, + * preventing lifetime-related crashes when reading asynchronously. 
+ * + * Throws: + * std::runtime_error if a handler is already registered for the topic. + */ + void registerByteStreamHandler(const std::string &topic, + ByteStreamHandler handler); + + /* Unregister the byte stream handler for the given topic. + * + * If no handler exists for the topic, this function is a no-op. + */ + void unregisterByteStreamHandler(const std::string &topic); private: mutable std::mutex lock_; @@ -186,6 +244,13 @@ class Room { std::unordered_map> remote_participants_; ConnectionState connection_state_ = ConnectionState::Disconnected; + // Data stream + std::unordered_map text_stream_handlers_; + std::unordered_map byte_stream_handlers_; + std::unordered_map> + text_stream_readers_; + std::unordered_map> + byte_stream_readers_; void OnEvent(const proto::FfiEvent &event); }; diff --git a/include/livekit/room_event_types.h b/include/livekit/room_event_types.h index d70c060..63c7514 100644 --- a/include/livekit/room_event_types.h +++ b/include/livekit/room_event_types.h @@ -331,29 +331,6 @@ struct TrackPublishOptions { std::optional preconnect_buffer; }; -/** - * One transcription segment produced by speech recognition. - */ -struct TranscriptionSegment { - /** Segment identifier. */ - std::string id; - - /** Transcribed text. */ - std::string text; - - /** Start time (ms) relative to the beginning of the audio source. */ - std::uint64_t start_time = 0; - - /** End time (ms) relative to the beginning of the audio source. */ - std::uint64_t end_time = 0; - - /** True if this segment is final and will not be updated further. */ - bool final = false; - - /** Language code (e.g. "en-US"). 
*/ - std::string language; -}; - // --------------------------------------------------------- // Event structs – public representations of RoomEvent.* // --------------------------------------------------------- @@ -610,20 +587,6 @@ struct SipDtmfReceivedEvent { RemoteParticipant *participant = nullptr; }; -/** - * One transcription unit with optional participant/track linkage. - */ -struct Transcription { - /** Optional identity of the participant who spoke. */ - std::optional participant_identity; - - /** Optional SID of the track associated with this transcription. */ - std::optional track_sid; - - /** Ordered segments that make up the transcription. */ - std::vector segments; -}; - /** * Fired when the room's connection state changes. */ diff --git a/include/livekit/rpc_error.h b/include/livekit/rpc_error.h index 0fd7195..2efb988 100644 --- a/include/livekit/rpc_error.h +++ b/include/livekit/rpc_error.h @@ -40,7 +40,7 @@ class RpcError; class RpcError : public std::runtime_error { public: /** - * Built-in error codes, mirroring the Python RpcError.ErrorCode enum. + * Built-in error codes */ enum class ErrorCode : std::uint32_t { APPLICATION_ERROR = 1500, @@ -97,7 +97,7 @@ class RpcError : public std::runtime_error { /** * Create a built-in RpcError using a predefined ErrorCode and default - * message text that matches the Python RpcError.ErrorMessage table. + * message text. * * @param code Built-in error code. * @param data Optional extra data payload (JSON recommended). diff --git a/src/audio_source.cpp b/src/audio_source.cpp index b8cebc9..37c7960 100644 --- a/src/audio_source.cpp +++ b/src/audio_source.cpp @@ -128,16 +128,4 @@ void AudioSource::captureFrame(const AudioFrame &frame, int timeout_ms) { } } -void AudioSource::waitForPlayout() const { - // Python uses a future + event loop timer that fires after q_size. - // Here we approximate that by simply sleeping for the current queued - // duration. 
- double dur = queuedDuration(); - if (dur <= 0.0) { - return; - } - - std::this_thread::sleep_for(std::chrono::duration(dur)); -} - } // namespace livekit diff --git a/src/audio_stream.cpp b/src/audio_stream.cpp index 908c6b3..45a3669 100644 --- a/src/audio_stream.cpp +++ b/src/audio_stream.cpp @@ -212,10 +212,6 @@ void AudioStream::onFfiEvent(const FfiEvent &event) { } if (ase.has_frame_received()) { const auto &fr = ase.frame_received(); - - // Convert owned buffer -> AudioFrame via helper. - // Implement AudioFrame::fromOwnedInfo(...) to mirror Python's - // AudioFrame._from_owned_info. AudioFrame frame = AudioFrame::fromOwnedInfo(fr.frame()); AudioFrameEvent ev{std::move(frame)}; pushFrame(std::move(ev)); diff --git a/src/data_stream.cpp b/src/data_stream.cpp new file mode 100644 index 0000000..e9fa3be --- /dev/null +++ b/src/data_stream.cpp @@ -0,0 +1,337 @@ +#include "livekit/data_stream.h" + +#include +#include +#include +#include + +#include "livekit/ffi_client.h" +#include "livekit/local_participant.h" +#include "room.pb.h" + +namespace livekit { + +namespace { + +std::string generateRandomId(std::size_t bytes = 16) { + static thread_local std::mt19937_64 rng{std::random_device{}()}; + std::uniform_int_distribution dist(0, 255); + + std::string out; + out.reserve(bytes * 2); + const char *hex = "0123456789abcdef"; + for (std::size_t i = 0; i < bytes; ++i) { + int v = dist(rng); + out.push_back(hex[(v >> 4) & 0xF]); + out.push_back(hex[v & 0xF]); + } + return out; +} + +// Split UTF-8 string into chunks of at most max_bytes, not breaking codepoints. +std::vector splitUtf8(const std::string &s, + std::size_t max_bytes) { + std::vector result; + if (s.empty()) + return result; + + std::size_t i = 0; + const std::size_t n = s.size(); + + while (i < n) { + std::size_t end = std::min(i + max_bytes, n); + + // If end points into a UTF-8 continuation byte, walk back to a lead byte. + // NOTE: end is an index into the string; ensure end < n before indexing. 
+ while (end > i && end < n && + (static_cast(s[end]) & 0xC0) == 0x80) { + --end; + } + // If end==n, we may still have landed on a continuation byte due to min(). + while (end > i && end == n && + (static_cast(s[end - 1]) & 0xC0) == 0x80) { + // Walk back until we hit a lead byte boundary. + // This isn't perfect but is consistent with your earlier intent. + --end; + // If we backed up too far, fall through to fallback. + } + + if (end == i) { + // Fallback: avoid infinite loop if bytes are pathological. + end = std::min(i + max_bytes, n); + } + + result.emplace_back(s.substr(i, end - i)); + i = end; + } + + return result; +} + +void fillBaseInfo(BaseStreamInfo &dst, const std::string &stream_id, + const std::string &mime_type, const std::string &topic, + std::int64_t timestamp_ms, + const std::optional &total_size, + const std::map &attrs) { + dst.stream_id = stream_id; + dst.mime_type = mime_type; + dst.topic = topic; + dst.timestamp = timestamp_ms; + dst.size = total_size; + dst.attributes = attrs; +} + +} // namespace + +// ===================================================================== +// Reader implementation +// ===================================================================== + +TextStreamReader::TextStreamReader(const TextStreamInfo &info) : info_(info) {} + +void TextStreamReader::onChunkUpdate(const std::string &text) { + { + std::lock_guard lock(mutex_); + if (closed_) + return; + queue_.push_back(text); + } + cv_.notify_one(); +} + +void TextStreamReader::onStreamClose( + const std::map &trailer_attrs) { + { + std::lock_guard lock(mutex_); + for (const auto &kv : trailer_attrs) { + info_.attributes[kv.first] = kv.second; + } + closed_ = true; + } + cv_.notify_all(); +} + +bool TextStreamReader::readNext(std::string &out) { + std::unique_lock lock(mutex_); + cv_.wait(lock, [this] { return !queue_.empty() || closed_; }); + + if (!queue_.empty()) { + out = std::move(queue_.front()); + queue_.pop_front(); + return true; + } + return false; // 
closed_ and empty +} + +std::string TextStreamReader::readAll() { + std::string result; + std::string chunk; + while (readNext(chunk)) + result += chunk; + return result; +} + +ByteStreamReader::ByteStreamReader(const ByteStreamInfo &info) : info_(info) {} + +void ByteStreamReader::onChunkUpdate(const std::vector &bytes) { + { + std::lock_guard lock(mutex_); + if (closed_) + return; + queue_.push_back(bytes); + } + cv_.notify_one(); +} + +void ByteStreamReader::onStreamClose( + const std::map &trailer_attrs) { + { + std::lock_guard lock(mutex_); + for (const auto &kv : trailer_attrs) { + info_.attributes[kv.first] = kv.second; + } + closed_ = true; + } + cv_.notify_all(); +} + +bool ByteStreamReader::readNext(std::vector &out) { + std::unique_lock lock(mutex_); + cv_.wait(lock, [this] { return !queue_.empty() || closed_; }); + + if (!queue_.empty()) { + out = std::move(queue_.front()); + queue_.pop_front(); + return true; + } + return false; +} + +// ===================================================================== +// Writer implementation (uses your future-based FfiClient) +// ===================================================================== + +BaseStreamWriter::BaseStreamWriter( + LocalParticipant &local_participant, const std::string &topic, + const std::map &attributes, + const std::string &stream_id, std::optional total_size, + const std::string &mime_type, + const std::vector &destination_identities, + const std::string &sender_identity) + : local_participant_(local_participant), + stream_id_(stream_id.empty() ? 
generateRandomId() : stream_id), + mime_type_(mime_type), topic_(topic), + timestamp_ms_(std::chrono::duration_cast( + std::chrono::system_clock::now().time_since_epoch()) + .count()), + total_size_(total_size), attributes_(attributes), + destination_identities_(destination_identities), + sender_identity_(sender_identity) { + if (sender_identity_.empty()) { + sender_identity_ = local_participant_.identity(); + } +} + +void BaseStreamWriter::ensureHeaderSent() { + if (header_sent_) + return; + proto::DataStream::Header header; + header.set_stream_id(stream_id_); + header.set_timestamp(timestamp_ms_); + header.set_mime_type(mime_type_); + header.set_topic(topic_); + + if (total_size_.has_value()) { + header.set_total_length(static_cast(*total_size_)); + } + for (const auto &kv : attributes_) { + (*header.mutable_attributes())[kv.first] = kv.second; + } + if (kind_ == StreamKind::kText) { + auto *th = header.mutable_text_header(); + th->set_operation_type( + proto::DataStream::OperationType::DataStream_OperationType_CREATE); + if (!reply_to_id_.empty()) { + th->set_reply_to_stream_id(reply_to_id_); + } + } else if (kind_ == StreamKind::kByte) { + header.mutable_byte_header()->set_name(byte_name_); + } + + FfiClient::instance() + .sendStreamHeaderAsync(local_participant_.ffiHandleId(), header, + destination_identities_, sender_identity_) + .get(); + header_sent_ = true; +} + +void BaseStreamWriter::sendChunk(const std::vector &content) { + if (closed_) + throw std::runtime_error("Cannot send chunk after stream is closed"); + + ensureHeaderSent(); + + proto::DataStream::Chunk chunk; + chunk.set_stream_id(stream_id_); + chunk.set_chunk_index(next_chunk_index_++); // ✅ uses the new member + chunk.set_content(content.data(), content.size()); + + FfiClient::instance() + .sendStreamChunkAsync(local_participant_.ffiHandleId(), chunk, + destination_identities_, sender_identity_) + .get(); +} + +void BaseStreamWriter::sendTrailer( + const std::string &reason, + const std::map 
&attributes) { + ensureHeaderSent(); + + proto::DataStream::Trailer trailer; + trailer.set_stream_id(stream_id_); + trailer.set_reason(reason); + + for (const auto &kv : attributes) { + (*trailer.mutable_attributes())[kv.first] = kv.second; + } + + FfiClient::instance() + .sendStreamTrailerAsync(local_participant_.ffiHandleId(), trailer, + sender_identity_) + .get(); +} + +void BaseStreamWriter::close( + const std::string &reason, + const std::map &attributes) { + if (closed_) + throw std::runtime_error("Stream already closed"); + closed_ = true; + sendTrailer(reason, attributes); +} + +TextStreamWriter::TextStreamWriter( + LocalParticipant &local_participant, const std::string &topic, + const std::map &attributes, + const std::string &stream_id, std::optional total_size, + const std::string &reply_to_id, + const std::vector &destination_identities, + const std::string &sender_identity) + : BaseStreamWriter( + local_participant, topic, attributes, stream_id, total_size, + /*mime_type=*/"text/plain", destination_identities, sender_identity) { + kind_ = StreamKind::kText; + reply_to_id_ = reply_to_id; + // ✅ Canonical user-facing metadata comes from BaseStreamWriter fields. 
+ fillBaseInfo(info_, stream_id_, mime_type_, topic_, timestamp_ms_, + total_size_, attributes_); +} + +void TextStreamWriter::write(const std::string &text) { + std::lock_guard lock(write_mutex_); + if (closed_) + throw std::runtime_error("Cannot write to closed TextStreamWriter"); + + for (const auto &chunk_str : splitUtf8(text, kStreamChunkSize)) { + const auto *p = reinterpret_cast(chunk_str.data()); + std::vector bytes(p, p + chunk_str.size()); + std::cout << "sending chunk " << std::endl; + sendChunk(bytes); + } +} + +ByteStreamWriter::ByteStreamWriter( + LocalParticipant &local_participant, const std::string &name, + const std::string &topic, + const std::map &attributes, + const std::string &stream_id, std::optional total_size, + const std::string &mime_type, + const std::vector &destination_identities, + const std::string &sender_identity) + : BaseStreamWriter(local_participant, topic, attributes, stream_id, + total_size, mime_type, destination_identities, + sender_identity) { + kind_ = StreamKind::kByte; + byte_name_ = name; + fillBaseInfo(info_, stream_id_, mime_type_, topic_, timestamp_ms_, + total_size_, attributes_); + info_.name = name; +} + +void ByteStreamWriter::write(const std::vector &data) { + std::lock_guard lock(write_mutex_); + if (closed_) + throw std::runtime_error("Cannot write to closed ByteStreamWriter"); + + std::size_t offset = 0; + while (offset < data.size()) { + const std::size_t n = + std::min(kStreamChunkSize, data.size() - offset); + std::vector chunk(data.begin() + offset, + data.begin() + offset + n); + sendChunk(chunk); + offset += n; + } +} + +} // namespace livekit diff --git a/src/ffi_client.cpp b/src/ffi_client.cpp index 38bc52d..ee7bba7 100644 --- a/src/ffi_client.cpp +++ b/src/ffi_client.cpp @@ -378,40 +378,6 @@ std::future FfiClient::publishDataAsync( }); } -std::future FfiClient::publishTranscriptionAsync( - std::uint64_t local_participant_handle, - const std::string &participant_identity, const std::string &track_id, 
- const std::vector &segments) { - proto::FfiRequest req; - auto *msg = req.mutable_publish_transcription(); - msg->set_local_participant_handle(local_participant_handle); - msg->set_participant_identity(participant_identity); - msg->set_track_id(track_id); - for (const auto &seg : segments) { - auto *dst = msg->add_segments(); - dst->CopyFrom(seg); - } - proto::FfiResponse resp = sendRequest(req); - if (!resp.has_publish_transcription()) { - throw std::runtime_error("FfiResponse missing publish_transcription"); - } - const AsyncId async_id = resp.publish_transcription().async_id(); - return registerAsync( - [async_id](const proto::FfiEvent &event) { - return event.has_publish_transcription() && - event.publish_transcription().async_id() == async_id; - }, - [](const proto::FfiEvent &event, std::promise &pr) { - const auto &cb = event.publish_transcription(); - if (cb.has_error() && !cb.error().empty()) { - pr.set_exception( - std::make_exception_ptr(std::runtime_error(cb.error()))); - return; - } - pr.set_value(); - }); -} - std::future FfiClient::publishSipDtmfAsync( std::uint64_t local_participant_handle, std::uint32_t code, const std::string &digit, @@ -545,4 +511,107 @@ FfiClient::performRpcAsync(std::uint64_t local_participant_handle, }); } +std::future FfiClient::sendStreamHeaderAsync( + std::uint64_t local_participant_handle, + const proto::DataStream::Header &header, + const std::vector &destination_identities, + const std::string &sender_identity) { + proto::FfiRequest req; + auto *msg = req.mutable_send_stream_header(); + msg->set_local_participant_handle(local_participant_handle); + *msg->mutable_header() = header; + msg->set_sender_identity(sender_identity); + for (const auto &id : destination_identities) { + msg->add_destination_identities(id); + } + + proto::FfiResponse resp = sendRequest(req); + if (!resp.has_send_stream_header()) { + throw std::runtime_error("FfiResponse missing send_stream_header"); + } + const AsyncId async_id = 
resp.send_stream_header().async_id(); + + return registerAsync( + [async_id](const proto::FfiEvent &e) { + return e.has_send_stream_header() && + e.send_stream_header().async_id() == async_id; + }, + [](const proto::FfiEvent &e, std::promise &pr) { + const auto &cb = e.send_stream_header(); + if (!cb.error().empty()) { + pr.set_exception( + std::make_exception_ptr(std::runtime_error(cb.error()))); + return; + } + pr.set_value(); + }); +} + +std::future FfiClient::sendStreamChunkAsync( + std::uint64_t local_participant_handle, + const proto::DataStream::Chunk &chunk, + const std::vector &destination_identities, + const std::string &sender_identity) { + proto::FfiRequest req; + auto *msg = req.mutable_send_stream_chunk(); + msg->set_local_participant_handle(local_participant_handle); + *msg->mutable_chunk() = chunk; + msg->set_sender_identity(sender_identity); + for (const auto &id : destination_identities) { + msg->add_destination_identities(id); + } + + proto::FfiResponse resp = sendRequest(req); + if (!resp.has_send_stream_chunk()) { + throw std::runtime_error("FfiResponse missing send_stream_chunk"); + } + const AsyncId async_id = resp.send_stream_chunk().async_id(); + return registerAsync( + [async_id](const proto::FfiEvent &e) { + return e.has_send_stream_chunk() && + e.send_stream_chunk().async_id() == async_id; + }, + [](const proto::FfiEvent &e, std::promise &pr) { + const auto &cb = e.send_stream_chunk(); + if (!cb.error().empty()) { + pr.set_exception( + std::make_exception_ptr(std::runtime_error(cb.error()))); + return; + } + pr.set_value(); + }); +} + +std::future +FfiClient::sendStreamTrailerAsync(std::uint64_t local_participant_handle, + const proto::DataStream::Trailer &trailer, + const std::string &sender_identity) { + proto::FfiRequest req; + auto *msg = req.mutable_send_stream_trailer(); + msg->set_local_participant_handle(local_participant_handle); + *msg->mutable_trailer() = trailer; + msg->set_sender_identity(sender_identity); + + 
proto::FfiResponse resp = sendRequest(req); + if (!resp.has_send_stream_trailer()) { + throw std::runtime_error("FfiResponse missing send_stream_trailer"); + } + const AsyncId async_id = resp.send_stream_trailer().async_id(); + + return registerAsync( + [async_id](const proto::FfiEvent &e) { + return e.has_send_stream_trailer() && + e.send_stream_trailer().async_id() == async_id; + }, + [](const proto::FfiEvent &e, std::promise &pr) { + const auto &cb = e.send_stream_trailer(); + if (!cb.error().empty()) { + pr.set_exception( + std::make_exception_ptr(std::runtime_error(cb.error()))); + return; + } + pr.set_value(); + }); +} + } // namespace livekit diff --git a/src/local_participant.cpp b/src/local_participant.cpp index d717024..27b4dd3 100644 --- a/src/local_participant.cpp +++ b/src/local_participant.cpp @@ -84,35 +84,6 @@ void LocalParticipant::publishDtmf(int code, const std::string &digit) { fut.get(); } -void LocalParticipant::publishTranscription( - const Transcription &transcription) { - auto handle_id = ffiHandleId(); - if (handle_id == 0) { - throw std::runtime_error( - "LocalParticipant::publishTranscription: invalid FFI handle"); - } - - std::vector segs; - segs.reserve(transcription.segments.size()); - for (const auto &seg : transcription.segments) { - segs.push_back(toProto(seg)); - } - - // Handle optional participant_identity / track_sid - const std::string participant_identity = - transcription.participant_identity.has_value() - ? *transcription.participant_identity - : std::string{}; - const std::string track_sid = transcription.track_sid.has_value() - ? 
*transcription.track_sid - : std::string{}; - // Call async API and block until completion - auto fut = FfiClient::instance().publishTranscriptionAsync( - static_cast(handle_id), participant_identity, track_sid, - segs); - fut.get(); -} - void LocalParticipant::setMetadata(const std::string &metadata) { auto handle_id = ffiHandleId(); if (handle_id == 0) { diff --git a/src/room.cpp b/src/room.cpp index eaf82f0..e3d93b2 100644 --- a/src/room.cpp +++ b/src/room.cpp @@ -122,7 +122,7 @@ bool Room::Connect(const std::string &url, const std::string &token, for (const auto &pt : participants) { const auto &owned = pt.participant(); auto rp = createRemoteParticipant(owned); - // Add the initial remote participant tracks (like Python does) + // Add the initial remote participant tracks for (const auto &owned_publication_info : pt.publications()) { auto publication = std::make_shared(owned_publication_info); @@ -148,17 +148,60 @@ RoomInfoData Room::room_info() const { return room_info_; } -LocalParticipant *Room::local_participant() const { +LocalParticipant *Room::localParticipant() const { std::lock_guard g(lock_); return local_participant_.get(); } -RemoteParticipant *Room::remote_participant(const std::string &identity) const { +RemoteParticipant *Room::remoteParticipant(const std::string &identity) const { std::lock_guard g(lock_); auto it = remote_participants_.find(identity); return it == remote_participants_.end() ? 
nullptr : it->second.get(); } +std::vector> +Room::remoteParticipants() const { + std::lock_guard guard(lock_); + std::vector> out; + out.reserve(remote_participants_.size()); + for (const auto &kv : remote_participants_) { + out.push_back(kv.second); + } + return out; +} + +void Room::registerTextStreamHandler(const std::string &topic, + TextStreamHandler handler) { + std::lock_guard g(lock_); + auto [it, inserted] = + text_stream_handlers_.emplace(topic, std::move(handler)); + if (!inserted) { + throw std::runtime_error("text stream handler for topic '" + topic + + "' already set"); + } +} + +void Room::unregisterTextStreamHandler(const std::string &topic) { + std::lock_guard g(lock_); + text_stream_handlers_.erase(topic); +} + +void Room::registerByteStreamHandler(const std::string &topic, + ByteStreamHandler handler) { + std::lock_guard g(lock_); + auto [it, inserted] = + byte_stream_handlers_.emplace(topic, std::move(handler)); + if (!inserted) { + throw std::runtime_error("byte stream handler for topic '" + topic + + "' already set"); + } +} + +void Room::unregisterByteStreamHandler(const std::string &topic) { + std::lock_guard g(lock_); + byte_stream_handlers_.erase(topic); +} + void Room::OnEvent(const FfiEvent &event) { // Take a snapshot of the delegate under lock, but do NOT call it under the // lock. 
@@ -198,10 +241,6 @@ void Room::OnEvent(const FfiEvent &event) { return; } - if (!delegate_snapshot) { - return; - } - switch (event.message_case()) { case FfiEvent::kRoomEvent: { const proto::RoomEvent &re = event.room_event(); @@ -218,14 +257,14 @@ void Room::OnEvent(const FfiEvent &event) { } ParticipantConnectedEvent ev; ev.participant = new_participant.get(); - delegate_snapshot->onParticipantConnected(*this, ev); - + if (delegate_snapshot) { + delegate_snapshot->onParticipantConnected(*this, ev); + } break; } case proto::RoomEvent::kParticipantDisconnected: { std::shared_ptr removed; DisconnectReason reason = DisconnectReason::Unknown; - { std::lock_guard guard(lock_); const auto &pd = re.participant_disconnected(); @@ -248,7 +287,9 @@ void Room::OnEvent(const FfiEvent &event) { ParticipantDisconnectedEvent ev; ev.participant = removed.get(); ev.reason = reason; - delegate_snapshot->onParticipantDisconnected(*this, ev); + if (delegate_snapshot) { + delegate_snapshot->onParticipantDisconnected(*this, ev); + } } break; } @@ -273,7 +314,9 @@ void Room::OnEvent(const FfiEvent &event) { ev.publication = it->second; ev.track = ev.publication ? ev.publication->track() : nullptr; } - delegate_snapshot->onLocalTrackPublished(*this, ev); + if (delegate_snapshot) { + delegate_snapshot->onLocalTrackPublished(*this, ev); + } break; } case proto::RoomEvent::kLocalTrackUnpublished: { @@ -296,7 +339,9 @@ void Room::OnEvent(const FfiEvent &event) { } ev.publication = it->second; } - delegate_snapshot->onLocalTrackUnpublished(*this, ev); + if (delegate_snapshot) { + delegate_snapshot->onLocalTrackUnpublished(*this, ev); + } break; } case proto::RoomEvent::kLocalTrackSubscribed: { @@ -319,7 +364,9 @@ void Room::OnEvent(const FfiEvent &event) { ev.track = publication ? 
publication->track() : nullptr; } - delegate_snapshot->onLocalTrackSubscribed(*this, ev); + if (delegate_snapshot) { + delegate_snapshot->onLocalTrackSubscribed(*this, ev); + } break; } case proto::RoomEvent::kTrackPublished: { @@ -347,7 +394,9 @@ void Room::OnEvent(const FfiEvent &event) { break; } } - delegate_snapshot->onTrackPublished(*this, ev); + if (delegate_snapshot) { + delegate_snapshot->onTrackPublished(*this, ev); + } break; } case proto::RoomEvent::kTrackUnpublished: { @@ -376,7 +425,9 @@ void Room::OnEvent(const FfiEvent &event) { pubs.erase(it); } - delegate_snapshot->onTrackUnpublished(*this, ev); + if (delegate_snapshot) { + delegate_snapshot->onTrackUnpublished(*this, ev); + } break; } case proto::RoomEvent::kTrackSubscribed: { @@ -428,7 +479,9 @@ void Room::OnEvent(const FfiEvent &event) { ev.track = remote_track; ev.publication = rpublication; ev.participant = rparticipant; - delegate_snapshot->onTrackSubscribed(*this, ev); + if (delegate_snapshot) { + delegate_snapshot->onTrackSubscribed(*this, ev); + } break; } case proto::RoomEvent::kTrackUnsubscribed: { @@ -461,7 +514,9 @@ void Room::OnEvent(const FfiEvent &event) { ev.track = track; } - delegate_snapshot->onTrackUnsubscribed(*this, ev); + if (delegate_snapshot) { + delegate_snapshot->onTrackUnsubscribed(*this, ev); + } break; } case proto::RoomEvent::kTrackSubscriptionFailed: { @@ -480,7 +535,9 @@ void Room::OnEvent(const FfiEvent &event) { ev.track_sid = tsf.track_sid(); ev.error = tsf.error(); } - delegate_snapshot->onTrackSubscriptionFailed(*this, ev); + if (delegate_snapshot) { + delegate_snapshot->onTrackSubscriptionFailed(*this, ev); + } break; } case proto::RoomEvent::kTrackMuted: { @@ -519,7 +576,7 @@ void Room::OnEvent(const FfiEvent &event) { success = true; } } - if (success) { + if (success && delegate_snapshot) { delegate_snapshot->onTrackMuted(*this, ev); } break; @@ -565,7 +622,7 @@ void Room::OnEvent(const FfiEvent &event) { ev.publication = pub; } - if (success) { + if 
(success && delegate_snapshot) { delegate_snapshot->onTrackUnmuted(*this, ev); } break; @@ -591,7 +648,9 @@ void Room::OnEvent(const FfiEvent &event) { } } } - delegate_snapshot->onActiveSpeakersChanged(*this, ev); + if (delegate_snapshot) { + delegate_snapshot->onActiveSpeakersChanged(*this, ev); + } break; } case proto::RoomEvent::kRoomMetadataChanged: { @@ -603,7 +662,9 @@ void Room::OnEvent(const FfiEvent &event) { ev.old_metadata = old_metadata; ev.new_metadata = room_info_.metadata; } - delegate_snapshot->onRoomMetadataChanged(*this, ev); + if (delegate_snapshot) { + delegate_snapshot->onRoomMetadataChanged(*this, ev); + } break; } case proto::RoomEvent::kRoomSidChanged: { @@ -613,7 +674,9 @@ void Room::OnEvent(const FfiEvent &event) { room_info_.sid = re.room_sid_changed().sid(); ev.sid = room_info_.sid.value_or(std::string{}); } - delegate_snapshot->onRoomSidChanged(*this, ev); + if (delegate_snapshot) { + delegate_snapshot->onRoomSidChanged(*this, ev); + } break; } case proto::RoomEvent::kParticipantMetadataChanged: { @@ -643,7 +706,9 @@ void Room::OnEvent(const FfiEvent &event) { ev.new_metadata = participant->metadata(); } - delegate_snapshot->onParticipantMetadataChanged(*this, ev); + if (delegate_snapshot) { + delegate_snapshot->onParticipantMetadataChanged(*this, ev); + } break; } case proto::RoomEvent::kParticipantNameChanged: { @@ -672,7 +737,9 @@ void Room::OnEvent(const FfiEvent &event) { ev.old_name = old_name; ev.new_name = participant->name(); } - delegate_snapshot->onParticipantNameChanged(*this, ev); + if (delegate_snapshot) { + delegate_snapshot->onParticipantNameChanged(*this, ev); + } break; } case proto::RoomEvent::kParticipantAttributesChanged: { @@ -709,7 +776,9 @@ void Room::OnEvent(const FfiEvent &event) { } ev.participant = participant; } - delegate_snapshot->onParticipantAttributesChanged(*this, ev); + if (delegate_snapshot) { + delegate_snapshot->onParticipantAttributesChanged(*this, ev); + } break; } case 
proto::RoomEvent::kParticipantEncryptionStatusChanged: { @@ -737,7 +806,9 @@ void Room::OnEvent(const FfiEvent &event) { ev.is_encrypted = pe.is_encrypted(); } - delegate_snapshot->onParticipantEncryptionStatusChanged(*this, ev); + if (delegate_snapshot) { + delegate_snapshot->onParticipantEncryptionStatusChanged(*this, ev); + } break; } case proto::RoomEvent::kConnectionQualityChanged: { @@ -764,16 +835,9 @@ void Room::OnEvent(const FfiEvent &event) { ev.quality = static_cast(cq.quality()); } - delegate_snapshot->onConnectionQualityChanged(*this, ev); - break; - } - - // ------------------------------------------------------------------------ - // Transcription - // ------------------------------------------------------------------------ - - case proto::RoomEvent::kTranscriptionReceived: { - // Deprecated event, do nothing. + if (delegate_snapshot) { + delegate_snapshot->onConnectionQualityChanged(*this, ev); + } break; } @@ -791,10 +855,11 @@ void Room::OnEvent(const FfiEvent &event) { } } const auto which_val = dp.value_case(); - if (which_val == proto::DataPacketReceived::kUser) { + if (which_val == proto::DataPacketReceived::kUser && delegate_snapshot) { UserDataPacketEvent ev = userDataPacketFromProto(dp, rp); delegate_snapshot->onUserPacketReceived(*this, ev); - } else if (which_val == proto::DataPacketReceived::kSipDtmf) { + } else if (which_val == proto::DataPacketReceived::kSipDtmf && + delegate_snapshot) { SipDtmfReceivedEvent ev = sipDtmfFromProto(dp, rp); delegate_snapshot->onSipDtmfReceived(*this, ev); } @@ -828,7 +893,9 @@ void Room::OnEvent(const FfiEvent &event) { ev.participant = participant; ev.state = static_cast(es.state()); } - delegate_snapshot->onE2eeStateChanged(*this, ev); + if (delegate_snapshot) { + delegate_snapshot->onE2eeStateChanged(*this, ev); + } break; } @@ -844,28 +911,38 @@ void Room::OnEvent(const FfiEvent &event) { connection_state_ = static_cast(cs.state()); ev.state = connection_state_; } - 
delegate_snapshot->onConnectionStateChanged(*this, ev); + if (delegate_snapshot) { + delegate_snapshot->onConnectionStateChanged(*this, ev); + } break; } case proto::RoomEvent::kDisconnected: { DisconnectedEvent ev; ev.reason = toDisconnectReason(re.disconnected().reason()); - delegate_snapshot->onDisconnected(*this, ev); + if (delegate_snapshot) { + delegate_snapshot->onDisconnected(*this, ev); + } break; } case proto::RoomEvent::kReconnecting: { ReconnectingEvent ev; - delegate_snapshot->onReconnecting(*this, ev); + if (delegate_snapshot) { + delegate_snapshot->onReconnecting(*this, ev); + } break; } case proto::RoomEvent::kReconnected: { ReconnectedEvent ev; - delegate_snapshot->onReconnected(*this, ev); + if (delegate_snapshot) { + delegate_snapshot->onReconnected(*this, ev); + } break; } case proto::RoomEvent::kEos: { RoomEosEvent ev; - delegate_snapshot->onRoomEos(*this, ev); + if (delegate_snapshot) { + delegate_snapshot->onRoomEos(*this, ev); + } break; } case proto::RoomEvent::kChatMessage: { @@ -873,44 +950,150 @@ void Room::OnEvent(const FfiEvent &event) { break; } case proto::RoomEvent::kStreamHeaderReceived: { - auto ev = fromProto(re.stream_header_received()); - delegate_snapshot->onDataStreamHeaderReceived(*this, ev); + const auto &sh = re.stream_header_received(); + const auto &header = sh.header(); + const std::string &participant_identity = sh.participant_identity(); + + // Snapshot handler + create reader without holding lock during user + // callback + TextStreamHandler text_cb; + ByteStreamHandler byte_cb; + std::shared_ptr text_reader; + std::shared_ptr byte_reader; + { + std::lock_guard guard(lock_); + + // Determine stream type from oneof in protobuf + // Adjust these names if your generated C++ uses different ones + const auto stream_type = header.content_header_case(); + if (stream_type == proto::DataStream::Header::kTextHeader) { + auto it = text_stream_handlers_.find(header.topic()); + if (it == text_stream_handlers_.end()) { + // Ignore 
if no callback attached + break; + } + text_cb = it->second; + + TextStreamInfo info = makeTextInfo(header); + text_reader = std::make_shared(info); + text_stream_readers_[header.stream_id()] = text_reader; + + } else if (stream_type == proto::DataStream::Header::kByteHeader) { + auto it = byte_stream_handlers_.find(header.topic()); + if (it == byte_stream_handlers_.end()) { + break; + } + byte_cb = it->second; + ByteStreamInfo info = makeByteInfo(header); + byte_reader = std::make_shared(info); + byte_stream_readers_[header.stream_id()] = byte_reader; + + } else { + // unknown header type: ignore + break; + } + } + + // Invoke user callback outside lock (very important) + if (text_reader) { + text_cb(text_reader, participant_identity); + } else if (byte_reader) { + byte_cb(byte_reader, participant_identity); + } break; } case proto::RoomEvent::kStreamChunkReceived: { - auto ev = fromProto(re.stream_chunk_received()); - delegate_snapshot->onDataStreamChunkReceived(*this, ev); + const auto &sc = re.stream_chunk_received(); + const auto &chunk = sc.chunk(); + std::shared_ptr text_reader; + std::shared_ptr byte_reader; + { + std::lock_guard guard(lock_); + auto itT = text_stream_readers_.find(chunk.stream_id()); + if (itT != text_stream_readers_.end()) { + text_reader = itT->second; + } else { + auto itB = byte_stream_readers_.find(chunk.stream_id()); + if (itB != byte_stream_readers_.end()) { + byte_reader = itB->second; + } + } + } + if (text_reader) { + // chunk.content() is bytes; treat as UTF-8 string. 
+ text_reader->onChunkUpdate(chunk.content()); + } else if (byte_reader) { + // Convert string bytes -> vector + const std::string &s = chunk.content(); + std::vector bytes(s.begin(), s.end()); + byte_reader->onChunkUpdate(bytes); + } break; } case proto::RoomEvent::kStreamTrailerReceived: { - auto ev = fromProto(re.stream_trailer_received()); - delegate_snapshot->onDataStreamTrailerReceived(*this, ev); + const auto &st = re.stream_trailer_received(); + const auto &trailer = st.trailer(); + std::shared_ptr text_reader; + std::shared_ptr byte_reader; + std::map trailer_attrs; + for (const auto &kv : trailer.attributes()) { + trailer_attrs.emplace(kv.first, kv.second); + } + { + std::lock_guard guard(lock_); + auto itT = text_stream_readers_.find(trailer.stream_id()); + if (itT != text_stream_readers_.end()) { + text_reader = itT->second; + text_stream_readers_.erase(itT); + } else { + auto itB = byte_stream_readers_.find(trailer.stream_id()); + if (itB != byte_stream_readers_.end()) { + byte_reader = itB->second; + byte_stream_readers_.erase(itB); + } + } + } + if (text_reader) { + text_reader->onStreamClose(trailer_attrs); + } else if (byte_reader) { + byte_reader->onStreamClose(trailer_attrs); + } break; } case proto::RoomEvent::kDataChannelLowThresholdChanged: { auto ev = fromProto(re.data_channel_low_threshold_changed()); - delegate_snapshot->onDataChannelBufferedAmountLowThresholdChanged(*this, - ev); + if (delegate_snapshot) { + delegate_snapshot->onDataChannelBufferedAmountLowThresholdChanged(*this, + ev); + } break; } case proto::RoomEvent::kByteStreamOpened: { auto ev = fromProto(re.byte_stream_opened()); - delegate_snapshot->onByteStreamOpened(*this, ev); + if (delegate_snapshot) { + delegate_snapshot->onByteStreamOpened(*this, ev); + } break; } case proto::RoomEvent::kTextStreamOpened: { auto ev = fromProto(re.text_stream_opened()); - delegate_snapshot->onTextStreamOpened(*this, ev); + if (delegate_snapshot) { + 
delegate_snapshot->onTextStreamOpened(*this, ev); + } break; } case proto::RoomEvent::kRoomUpdated: { auto ev = roomUpdatedFromProto(re.room_updated()); - delegate_snapshot->onRoomUpdated(*this, ev); + if (delegate_snapshot) { + delegate_snapshot->onRoomUpdated(*this, ev); + } break; } case proto::RoomEvent::kMoved: { auto ev = roomMovedFromProto(re.moved()); - delegate_snapshot->onRoomMoved(*this, ev); + if (delegate_snapshot) { + delegate_snapshot->onRoomMoved(*this, ev); + } break; } case proto::RoomEvent::kParticipantsUpdated: { @@ -954,7 +1137,9 @@ void Room::OnEvent(const FfiEvent &event) { ev.participants.push_back(participant); } } - delegate_snapshot->onParticipantsUpdated(*this, ev); + if (delegate_snapshot) { + delegate_snapshot->onParticipantsUpdated(*this, ev); + } break; } diff --git a/src/room_proto_converter.cpp b/src/room_proto_converter.cpp index 9da3d24..989d05b 100644 --- a/src/room_proto_converter.cpp +++ b/src/room_proto_converter.cpp @@ -16,6 +16,7 @@ #include "room_proto_converter.h" +#include "livekit/data_stream.h" #include "livekit/local_participant.h" #include "room.pb.h" @@ -374,28 +375,6 @@ TrackPublishOptions fromProto(const proto::TrackPublishOptions &in) { return out; } -proto::TranscriptionSegment toProto(const TranscriptionSegment &in) { - proto::TranscriptionSegment msg; - msg.set_id(in.id); - msg.set_text(in.text); - msg.set_start_time(in.start_time); - msg.set_end_time(in.end_time); - msg.set_final(in.final); - msg.set_language(in.language); - return msg; -} - -TranscriptionSegment fromProto(const proto::TranscriptionSegment &in) { - TranscriptionSegment out; - out.id = in.id(); - out.text = in.text(); - out.start_time = in.start_time(); - out.end_time = in.end_time(); - out.final = in.final(); - out.language = in.language(); - return out; -} - UserDataPacketEvent userDataPacketFromProto(const proto::DataPacketReceived &in, RemoteParticipant *participant) { UserDataPacketEvent ev; @@ -426,4 +405,54 @@ SipDtmfReceivedEvent 
sipDtmfFromProto(const proto::DataPacketReceived &in, return ev; } +std::map +toAttrMap(const proto::DataStream::Trailer &trailer) { + std::map out; + for (const auto &kv : trailer.attributes()) { + out.emplace(kv.first, kv.second); + } + return out; +} + +TextStreamInfo makeTextInfo(const proto::DataStream::Header &header) { + TextStreamInfo info; + info.stream_id = header.stream_id(); + info.mime_type = header.mime_type(); + info.topic = header.topic(); + info.timestamp = header.timestamp(); + + if (header.has_total_length()) { + info.size = static_cast(header.total_length()); + } + + for (const auto &kv : header.attributes()) { + info.attributes.emplace(kv.first, kv.second); + } + + for (const auto &id : header.text_header().attached_stream_ids()) { + info.attachments.push_back(id); + } + + return info; +} + +ByteStreamInfo makeByteInfo(const proto::DataStream::Header &header) { + ByteStreamInfo info; + info.stream_id = header.stream_id(); + info.mime_type = header.mime_type(); + info.topic = header.topic(); + info.timestamp = header.timestamp(); + + if (header.has_total_length()) { + info.size = static_cast(header.total_length()); + } + + for (const auto &kv : header.attributes()) { + info.attributes.emplace(kv.first, kv.second); + } + + info.name = header.byte_header().name(); + return info; +} + } // namespace livekit diff --git a/src/room_proto_converter.h b/src/room_proto_converter.h index 0538207..23f8b0e 100644 --- a/src/room_proto_converter.h +++ b/src/room_proto_converter.h @@ -25,6 +25,8 @@ namespace livekit { enum class RpcErrorCode; class RemoteParticipant; +struct ByteStreamInfo; +struct TextStreamInfo; // --------- basic helper conversions --------- @@ -79,11 +81,6 @@ VideoEncodingOptions fromProto(const proto::VideoEncoding &in); proto::TrackPublishOptions toProto(const TrackPublishOptions &in); TrackPublishOptions fromProto(const proto::TrackPublishOptions &in); -// --------- room transcription conversions --------- - -proto::TranscriptionSegment 
toProto(const TranscriptionSegment &in); -TranscriptionSegment fromProto(const proto::TranscriptionSegment &in); - // --------- room Data Packet conversions --------- UserDataPacketEvent userDataPacketFromProto(const proto::DataPacketReceived &in, @@ -92,4 +89,10 @@ UserDataPacketEvent userDataPacketFromProto(const proto::DataPacketReceived &in, SipDtmfReceivedEvent sipDtmfFromProto(const proto::DataPacketReceived &in, RemoteParticipant *participant); +// --------- room Data Stream conversions --------- +std::map +toAttrMap(const proto::DataStream::Trailer &trailer); +ByteStreamInfo makeByteInfo(const proto::DataStream::Header &header); +TextStreamInfo makeTextInfo(const proto::DataStream::Header &header); + } // namespace livekit diff --git a/src/rpc_error.cpp b/src/rpc_error.cpp index 14c7d29..1e521cd 100644 --- a/src/rpc_error.cpp +++ b/src/rpc_error.cpp @@ -58,7 +58,6 @@ RpcError RpcError::builtIn(ErrorCode code, const std::string &data) { } const char *RpcError::defaultMessageFor(ErrorCode code) { - // Mirror Python RpcError.ErrorMessage mapping. switch (code) { case ErrorCode::APPLICATION_ERROR: return "Application error in method handler"; diff --git a/src/video_utils.cpp b/src/video_utils.cpp index 4f4c5c0..746e4ef 100644 --- a/src/video_utils.cpp +++ b/src/video_utils.cpp @@ -92,7 +92,7 @@ proto::VideoBufferInfo toProto(const LKVideoFrame &frame) { cmpt->set_size(plane.size); } - // Stride for main packed formats (matches Python logic) + // Stride for main packed formats. std::uint32_t stride = 0; switch (frame.type()) { case VideoBufferType::ARGB: