Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,4 @@ Makefile
cmake_install.cmake
out
build/
received_green.avif
2 changes: 2 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@ add_library(livekit
include/livekit/audio_frame.h
include/livekit/audio_source.h
include/livekit/audio_stream.h
include/livekit/data_stream.h
include/livekit/room.h
include/livekit/room_event_types.h
include/livekit/room_delegate.h
Expand All @@ -184,6 +185,7 @@ add_library(livekit
src/audio_frame.cpp
src/audio_source.cpp
src/audio_stream.cpp
src/data_stream.cpp
src/ffi_handle.cpp
src/ffi_client.cpp
src/local_audio_track.cpp
Expand Down
34 changes: 34 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,40 @@ The caller will automatically:
- Print round-trip times
- Annotate expected successes or expected failures

### SimpleDataStream
- The SimpleDataStream example demonstrates how to:
- Connect multiple participants to the same LiveKit room
- Register text stream and byte stream handlers by topic (e.g. "chat", "files")
- Send a text stream (chat message) from one participant to another
- Send a byte stream (file/image) from one participant to another
- Attach custom stream metadata (e.g. sent_ms) via stream attributes
- Measure and print one-way latency on the receiver using sender timestamps
- Receive streamed chunks and reconstruct the full payload on the receiver

#### 🔑 Generate Tokens
Before running any participant, create JWT tokens with caller and greeter identities and your room name.
```bash
lk token create -r test -i caller --join --valid-for 99999h --dev --room=your_own_room
lk token create -r test -i greeter --join --valid-for 99999h --dev --room=your_own_room
```

#### ▶ Start Participants
Start the receiver first (so it registers stream handlers before messages arrive):
```bash
./build/examples/SimpleDataStream --url $URL --token <jwt-token>
```
On another terminal or computer, start the sender
```bash
./build/examples/SimpleDataStream --url $URL --token <jwt-token>
```

**Sender** (e.g. greeter)
- Waits for the peer, then sends a text stream ("chat") and a file stream ("files") with timestamps and metadata, logging stream IDs and send times.

**Receiver** (e.g. caller)
- Registers handlers for text and file streams, logs stream events, computes one-way latency, and saves the received file locally.


## 🧰 Recommended Setup
### macOS
```bash
Expand Down
Binary file added data/green.avif
Binary file not shown.
19 changes: 19 additions & 0 deletions examples/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -52,4 +52,23 @@ target_link_libraries(SimpleRpc
PRIVATE
nlohmann_json::nlohmann_json
livekit
)

#################### SimpleDataStream example ##########################

add_executable(SimpleDataStream
simple_data_stream/main.cpp
)

target_link_libraries(SimpleDataStream
PRIVATE
livekit
)

add_custom_command(
TARGET SimpleDataStream
POST_BUILD
COMMAND ${CMAKE_COMMAND} -E copy_directory
${CMAKE_SOURCE_DIR}/data
$<TARGET_FILE_DIR:SimpleDataStream>/data
)
279 changes: 279 additions & 0 deletions examples/simple_data_stream/main.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,279 @@
#include <atomic>
#include <csignal>
#include <cstdlib>
#include <filesystem>
#include <fstream>
#include <iostream>
#include <map>
#include <optional>
#include <random>
#include <string>
#include <thread>
#include <vector>

#include "livekit/livekit.h"
#include "livekit_ffi.h"

using namespace livekit;

namespace {

std::atomic<bool> g_running{true};

void handleSignal(int) { g_running.store(false); }

// Helper: get env var or empty string
std::string getenvOrEmpty(const char *name) {
const char *v = std::getenv(name);
return v ? std::string(v) : std::string{};
}

std::int64_t nowEpochMs() {
using namespace std::chrono;
return duration_cast<milliseconds>(system_clock::now().time_since_epoch())
.count();
}

std::string randomHexId(std::size_t nbytes = 16) {
static thread_local std::mt19937_64 rng{std::random_device{}()};
std::ostringstream oss;
for (std::size_t i = 0; i < nbytes; ++i) {
std::uint8_t b = static_cast<std::uint8_t>(rng() & 0xFF);
const char *hex = "0123456789abcdef";
oss << hex[(b >> 4) & 0xF] << hex[b & 0xF];
}
return oss.str();
}

// Greeting: send text + image
void greetParticipant(Room &room, const std::string &identity) {
std::cout << "[DataStream] Greeting participant: " << identity << "\n";

LocalParticipant *lp = room.localParticipant();
if (!lp) {
std::cerr << "[DataStream] No local participant, cannot greet.\n";
return;
}

try {
const std::int64_t sent_ms = nowEpochMs();
const std::string sender_id =
!lp->identity().empty() ? lp->identity() : std::string("cpp_sender");
const std::vector<std::string> dest{identity};

// Send text stream ("chat")
const std::string chat_stream_id = randomHexId();
const std::string reply_to_id = "";
std::map<std::string, std::string> chat_attrs;
chat_attrs["sent_ms"] = std::to_string(sent_ms);
chat_attrs["kind"] = "chat";
chat_attrs["test_flag"] = "1";
chat_attrs["seq"] = "1";

// Put timestamp in payload too (so you can compute latency even if
// attributes aren’t plumbed through your reader info yet).
const std::string body = "Hi! Just a friendly message";
const std::string payload = "sent_ms=" + std::to_string(sent_ms) + "\n" +
"stream_id=" + chat_stream_id + "\n" + body;
TextStreamWriter text_writer(*lp, "chat", chat_attrs, chat_stream_id,
payload.size(), reply_to_id, dest, sender_id);

const std::string message = "Hi! Just a friendly message";
text_writer.write(message); // will be chunked internally if needed
text_writer.close(); // optional reason/attributes omitted

// Send image as byte stream
const std::string file_path = "data/green.avif";
std::ifstream in(file_path, std::ios::binary);
if (!in) {
std::cerr << "[DataStream] Failed to open file: " << file_path << "\n";
return;
}

std::vector<std::uint8_t> data((std::istreambuf_iterator<char>(in)),
std::istreambuf_iterator<char>());

const std::string file_stream_id = randomHexId();
std::map<std::string, std::string> file_attrs;
file_attrs["sent_ms"] = std::to_string(sent_ms);
file_attrs["kind"] = "file";
file_attrs["test_flag"] = "1";
file_attrs["orig_path"] = file_path;
const std::string name =
std::filesystem::path(file_path).filename().string();
const std::string mime = "image/avif";
ByteStreamWriter byte_writer(*lp, name, "files", file_attrs, file_stream_id,
data.size(), mime, dest, sender_id);
byte_writer.write(data);
byte_writer.close();

std::cout << "[DataStream] Greeting sent to " << identity
<< " (sent_ms=" << sent_ms << ")\n";
} catch (const std::exception &e) {
std::cerr << "[DataStream] Error greeting participant " << identity << ": "
<< e.what() << "\n";
}
}

// Handlers for incoming streams
void handleChatMessage(std::shared_ptr<livekit::TextStreamReader> reader,
const std::string &participant_identity) {
try {
const auto info = reader->info(); // copy (safe even if reader goes away)
const std::int64_t recv_ms = nowEpochMs();
const std::int64_t sent_ms = info.timestamp;
const auto latency = (sent_ms > 0) ? (recv_ms - sent_ms) : -1;
std::string full_text = reader->readAll();
std::cout << "[DataStream] Received chat from " << participant_identity
<< " topic=" << info.topic << " stream_id=" << info.stream_id
<< " latency_ms=" << latency << " text='" << full_text << "'\n";
} catch (const std::exception &e) {
std::cerr << "[DataStream] Error reading chat stream from "
<< participant_identity << ": " << e.what() << "\n";
}
}

void handleWelcomeImage(std::shared_ptr<livekit::ByteStreamReader> reader,
const std::string &participant_identity) {
try {
const auto info = reader->info();
const std::string stream_id =
info.stream_id.empty() ? "unknown" : info.stream_id;
const std::string original_name =
info.name.empty() ? "received_image.bin" : info.name;
// Latency: prefer header timestamp
std::int64_t sent_ms = info.timestamp;
// Optional: override with explicit attribute if you set it
auto it = info.attributes.find("sent_ms");
if (it != info.attributes.end()) {
try {
sent_ms = std::stoll(it->second);
} catch (...) {
}
}
const std::int64_t recv_ms = nowEpochMs();
const std::int64_t latency_ms = (sent_ms > 0) ? (recv_ms - sent_ms) : -1;
const std::string out_file = "received_" + original_name;
std::cout << "[DataStream] Receiving image from " << participant_identity
<< " stream_id=" << stream_id << " name='" << original_name << "'"
<< " mime='" << info.mime_type << "'"
<< " size="
<< (info.size ? std::to_string(*info.size) : "unknown")
<< " latency_ms=" << latency_ms << " -> '" << out_file << "'\n";
std::ofstream out(out_file, std::ios::binary);
if (!out) {
std::cerr << "[DataStream] Failed to open output file: " << out_file
<< "\n";
return;
}
std::vector<std::uint8_t> chunk;
std::uint64_t total_written = 0;
while (reader->readNext(chunk)) {
if (!chunk.empty()) {
out.write(reinterpret_cast<const char *>(chunk.data()),
static_cast<std::streamsize>(chunk.size()));
total_written += chunk.size();
}
}
std::cout << "[DataStream] Saved image from " << participant_identity
<< " stream_id=" << stream_id << " bytes=" << total_written
<< " to '" << out_file << std::endl;
} catch (const std::exception &e) {
std::cerr << "[DataStream] Error reading image stream from "
<< participant_identity << ": " << e.what() << "\n";
}
}

} // namespace

int main(int argc, char *argv[]) {
// Get URL and token from env.
std::string url = getenvOrEmpty("LIVEKIT_URL");
std::string token = getenvOrEmpty("LIVEKIT_TOKEN");

if (argc >= 3) {
// Allow overriding via CLI: ./SimpleDataStream <ws-url> <token>
url = argv[1];
token = argv[2];
}

if (url.empty() || token.empty()) {
std::cerr << "LIVEKIT_URL and LIVEKIT_TOKEN (or CLI args) are required\n";
return 1;
}

std::cout << "[DataStream] Connecting to: " << url << "\n";

std::signal(SIGINT, handleSignal);
#ifdef SIGTERM
std::signal(SIGTERM, handleSignal);
#endif

Room room{};
RoomOptions options;
options.auto_subscribe = true;
options.dynacast = false;

bool ok = room.Connect(url, token, options);
std::cout << "[DataStream] Connect result: " << std::boolalpha << ok << "\n";
if (!ok) {
std::cerr << "[DataStream] Failed to connect to room\n";
FfiClient::instance().shutdown();
return 1;
}

auto info = room.room_info();
std::cout << "[DataStream] Connected to room '" << info.name
<< "', participants: " << info.num_participants << "\n";

// Register stream handlers
room.registerTextStreamHandler(
"chat", [](std::shared_ptr<TextStreamReader> reader,
const std::string &participant_identity) {
std::thread t(handleChatMessage, std::move(reader),
participant_identity);
t.detach();
});

room.registerByteStreamHandler(
"files", [](std::shared_ptr<ByteStreamReader> reader,
const std::string &participant_identity) {
std::thread t(handleWelcomeImage, std::move(reader),
participant_identity);
t.detach();
});

// Greet existing participants
{
auto remotes = room.remoteParticipants();
for (const auto &rp : remotes) {
if (!rp)
continue;
std::cout << "Remote: " << rp->identity() << "\n";
greetParticipant(room, rp->identity());
}
}

// Optionally: greet on join
//
// If Room API exposes a participant-connected callback, you could do:
//
// room.onParticipantConnected(
// [&](RemoteParticipant& participant) {
// std::cout << "[DataStream] participant connected: "
// << participant.sid() << " " << participant.identity()
// << "\n";
// greetParticipant(room, participant.identity());
// });
//
// Adjust to your actual event API.
std::cout << "[DataStream] Ready. Waiting for streams (Ctrl-C to exit)...\n";
// Keep process alive until signal
while (g_running.load()) {
std::this_thread::sleep_for(std::chrono::milliseconds(5));
}

std::cout << "[DataStream] Shutting down...\n";
FfiClient::instance().shutdown();
return 0;
}
8 changes: 4 additions & 4 deletions examples/simple_room/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,7 @@ int main(int argc, char *argv[]) {
try {
// publishTrack takes std::shared_ptr<Track>, LocalAudioTrack derives from
// Track
audioPub = room.local_participant()->publishTrack(audioTrack, audioOpts);
audioPub = room.localParticipant()->publishTrack(audioTrack, audioOpts);

std::cout << "Published track:\n"
<< " SID: " << audioPub->sid() << "\n"
Expand Down Expand Up @@ -314,7 +314,7 @@ int main(int argc, char *argv[]) {
try {
// publishTrack takes std::shared_ptr<Track>, LocalAudioTrack derives from
// Track
videoPub = room.local_participant()->publishTrack(videoTrack, videoOpts);
videoPub = room.localParticipant()->publishTrack(videoTrack, videoOpts);

std::cout << "Published track:\n"
<< " SID: " << videoPub->sid() << "\n"
Expand All @@ -341,12 +341,12 @@ int main(int argc, char *argv[]) {
media.stopMic();

// Clean up the audio track publishment
room.local_participant()->unpublishTrack(audioPub->sid());
room.localParticipant()->unpublishTrack(audioPub->sid());

media.stopCamera();

// Clean up the video track publishment
room.local_participant()->unpublishTrack(videoPub->sid());
room.localParticipant()->unpublishTrack(videoPub->sid());

FfiClient::instance().shutdown();
std::cout << "Exiting.\n";
Expand Down
Loading
Loading