Skip to content

Commit ae11d64

Browse files
Data Stream features and example (#16)
* update the readme and polish the example to measure the latency * some more cleanup
1 parent 6457ca9 commit ae11d64

27 files changed

+1538
-306
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,3 +6,4 @@ Makefile
66
cmake_install.cmake
77
out
88
build/
9+
received_green.avif

CMakeLists.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -159,6 +159,7 @@ add_library(livekit
159159
include/livekit/audio_frame.h
160160
include/livekit/audio_source.h
161161
include/livekit/audio_stream.h
162+
include/livekit/data_stream.h
162163
include/livekit/room.h
163164
include/livekit/room_event_types.h
164165
include/livekit/room_delegate.h
@@ -184,6 +185,7 @@ add_library(livekit
184185
src/audio_frame.cpp
185186
src/audio_source.cpp
186187
src/audio_stream.cpp
188+
src/data_stream.cpp
187189
src/ffi_handle.cpp
188190
src/ffi_client.cpp
189191
src/local_audio_track.cpp

README.md

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,40 @@ The caller will automatically:
8989
- Print round-trip times
9090
- Annotate expected successes or expected failures
9191

92+
### SimpleDataStream
93+
- The SimpleDataStream example demonstrates how to:
94+
- Connect multiple participants to the same LiveKit room
95+
- Register text stream and byte stream handlers by topic (e.g. "chat", "files")
96+
- Send a text stream (chat message) from one participant to another
97+
- Send a byte stream (file/image) from one participant to another
98+
- Attach custom stream metadata (e.g. sent_ms) via stream attributes
99+
- Measure and print one-way latency on the receiver using sender timestamps
100+
- Receive streamed chunks and reconstruct the full payload on the receiver
101+
102+
#### 🔑 Generate Tokens
103+
Before running any participant, create JWT tokens with caller and greeter identities and your room name.
104+
```bash
105+
lk token create -r test -i caller --join --valid-for 99999h --dev --room=your_own_room
106+
lk token create -r test -i greeter --join --valid-for 99999h --dev --room=your_own_room
107+
```
108+
109+
#### ▶ Start Participants
110+
Start the receiver first (so it registers stream handlers before messages arrive):
111+
```bash
112+
./build/examples/SimpleDataStream --url $URL --token <jwt-token>
113+
```
114+
On another terminal or computer, start the sender
115+
```bash
116+
./build/examples/SimpleDataStream --url $URL --token <jwt-token>
117+
```
118+
119+
**Sender** (e.g. greeter)
120+
- Waits for the peer, then sends a text stream ("chat") and a file stream ("files") with timestamps and metadata, logging stream IDs and send times.
121+
122+
**Receiver** (e.g. caller)
123+
- Registers handlers for text and file streams, logs stream events, computes one-way latency, and saves the received file locally.
124+
125+
92126
## 🧰 Recommended Setup
93127
### macOS
94128
```bash

data/green.avif

2.23 MB
Binary file not shown.

examples/CMakeLists.txt

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,4 +52,23 @@ target_link_libraries(SimpleRpc
5252
PRIVATE
5353
nlohmann_json::nlohmann_json
5454
livekit
55+
)
56+
57+
#################### SimpleDataStream example ##########################
58+
59+
add_executable(SimpleDataStream
60+
simple_data_stream/main.cpp
61+
)
62+
63+
target_link_libraries(SimpleDataStream
64+
PRIVATE
65+
livekit
66+
)
67+
68+
add_custom_command(
69+
TARGET SimpleDataStream
70+
POST_BUILD
71+
COMMAND ${CMAKE_COMMAND} -E copy_directory
72+
${CMAKE_SOURCE_DIR}/data
73+
$<TARGET_FILE_DIR:SimpleDataStream>/data
5574
)
Lines changed: 279 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,279 @@
1+
#include <atomic>
2+
#include <csignal>
3+
#include <cstdlib>
4+
#include <filesystem>
5+
#include <fstream>
6+
#include <iostream>
7+
#include <map>
8+
#include <optional>
9+
#include <random>
10+
#include <string>
11+
#include <thread>
12+
#include <vector>
13+
14+
#include "livekit/livekit.h"
15+
#include "livekit_ffi.h"
16+
17+
using namespace livekit;
18+
19+
namespace {
20+
21+
std::atomic<bool> g_running{true};
22+
23+
void handleSignal(int) { g_running.store(false); }
24+
25+
// Helper: get env var or empty string
26+
std::string getenvOrEmpty(const char *name) {
27+
const char *v = std::getenv(name);
28+
return v ? std::string(v) : std::string{};
29+
}
30+
31+
std::int64_t nowEpochMs() {
32+
using namespace std::chrono;
33+
return duration_cast<milliseconds>(system_clock::now().time_since_epoch())
34+
.count();
35+
}
36+
37+
std::string randomHexId(std::size_t nbytes = 16) {
38+
static thread_local std::mt19937_64 rng{std::random_device{}()};
39+
std::ostringstream oss;
40+
for (std::size_t i = 0; i < nbytes; ++i) {
41+
std::uint8_t b = static_cast<std::uint8_t>(rng() & 0xFF);
42+
const char *hex = "0123456789abcdef";
43+
oss << hex[(b >> 4) & 0xF] << hex[b & 0xF];
44+
}
45+
return oss.str();
46+
}
47+
48+
// Greeting: send text + image
49+
void greetParticipant(Room &room, const std::string &identity) {
50+
std::cout << "[DataStream] Greeting participant: " << identity << "\n";
51+
52+
LocalParticipant *lp = room.localParticipant();
53+
if (!lp) {
54+
std::cerr << "[DataStream] No local participant, cannot greet.\n";
55+
return;
56+
}
57+
58+
try {
59+
const std::int64_t sent_ms = nowEpochMs();
60+
const std::string sender_id =
61+
!lp->identity().empty() ? lp->identity() : std::string("cpp_sender");
62+
const std::vector<std::string> dest{identity};
63+
64+
// Send text stream ("chat")
65+
const std::string chat_stream_id = randomHexId();
66+
const std::string reply_to_id = "";
67+
std::map<std::string, std::string> chat_attrs;
68+
chat_attrs["sent_ms"] = std::to_string(sent_ms);
69+
chat_attrs["kind"] = "chat";
70+
chat_attrs["test_flag"] = "1";
71+
chat_attrs["seq"] = "1";
72+
73+
// Put timestamp in payload too (so you can compute latency even if
74+
// attributes aren’t plumbed through your reader info yet).
75+
const std::string body = "Hi! Just a friendly message";
76+
const std::string payload = "sent_ms=" + std::to_string(sent_ms) + "\n" +
77+
"stream_id=" + chat_stream_id + "\n" + body;
78+
TextStreamWriter text_writer(*lp, "chat", chat_attrs, chat_stream_id,
79+
payload.size(), reply_to_id, dest, sender_id);
80+
81+
const std::string message = "Hi! Just a friendly message";
82+
text_writer.write(message); // will be chunked internally if needed
83+
text_writer.close(); // optional reason/attributes omitted
84+
85+
// Send image as byte stream
86+
const std::string file_path = "data/green.avif";
87+
std::ifstream in(file_path, std::ios::binary);
88+
if (!in) {
89+
std::cerr << "[DataStream] Failed to open file: " << file_path << "\n";
90+
return;
91+
}
92+
93+
std::vector<std::uint8_t> data((std::istreambuf_iterator<char>(in)),
94+
std::istreambuf_iterator<char>());
95+
96+
const std::string file_stream_id = randomHexId();
97+
std::map<std::string, std::string> file_attrs;
98+
file_attrs["sent_ms"] = std::to_string(sent_ms);
99+
file_attrs["kind"] = "file";
100+
file_attrs["test_flag"] = "1";
101+
file_attrs["orig_path"] = file_path;
102+
const std::string name =
103+
std::filesystem::path(file_path).filename().string();
104+
const std::string mime = "image/avif";
105+
ByteStreamWriter byte_writer(*lp, name, "files", file_attrs, file_stream_id,
106+
data.size(), mime, dest, sender_id);
107+
byte_writer.write(data);
108+
byte_writer.close();
109+
110+
std::cout << "[DataStream] Greeting sent to " << identity
111+
<< " (sent_ms=" << sent_ms << ")\n";
112+
} catch (const std::exception &e) {
113+
std::cerr << "[DataStream] Error greeting participant " << identity << ": "
114+
<< e.what() << "\n";
115+
}
116+
}
117+
118+
// Handlers for incoming streams
119+
void handleChatMessage(std::shared_ptr<livekit::TextStreamReader> reader,
120+
const std::string &participant_identity) {
121+
try {
122+
const auto info = reader->info(); // copy (safe even if reader goes away)
123+
const std::int64_t recv_ms = nowEpochMs();
124+
const std::int64_t sent_ms = info.timestamp;
125+
const auto latency = (sent_ms > 0) ? (recv_ms - sent_ms) : -1;
126+
std::string full_text = reader->readAll();
127+
std::cout << "[DataStream] Received chat from " << participant_identity
128+
<< " topic=" << info.topic << " stream_id=" << info.stream_id
129+
<< " latency_ms=" << latency << " text='" << full_text << "'\n";
130+
} catch (const std::exception &e) {
131+
std::cerr << "[DataStream] Error reading chat stream from "
132+
<< participant_identity << ": " << e.what() << "\n";
133+
}
134+
}
135+
136+
void handleWelcomeImage(std::shared_ptr<livekit::ByteStreamReader> reader,
137+
const std::string &participant_identity) {
138+
try {
139+
const auto info = reader->info();
140+
const std::string stream_id =
141+
info.stream_id.empty() ? "unknown" : info.stream_id;
142+
const std::string original_name =
143+
info.name.empty() ? "received_image.bin" : info.name;
144+
// Latency: prefer header timestamp
145+
std::int64_t sent_ms = info.timestamp;
146+
// Optional: override with explicit attribute if you set it
147+
auto it = info.attributes.find("sent_ms");
148+
if (it != info.attributes.end()) {
149+
try {
150+
sent_ms = std::stoll(it->second);
151+
} catch (...) {
152+
}
153+
}
154+
const std::int64_t recv_ms = nowEpochMs();
155+
const std::int64_t latency_ms = (sent_ms > 0) ? (recv_ms - sent_ms) : -1;
156+
const std::string out_file = "received_" + original_name;
157+
std::cout << "[DataStream] Receiving image from " << participant_identity
158+
<< " stream_id=" << stream_id << " name='" << original_name << "'"
159+
<< " mime='" << info.mime_type << "'"
160+
<< " size="
161+
<< (info.size ? std::to_string(*info.size) : "unknown")
162+
<< " latency_ms=" << latency_ms << " -> '" << out_file << "'\n";
163+
std::ofstream out(out_file, std::ios::binary);
164+
if (!out) {
165+
std::cerr << "[DataStream] Failed to open output file: " << out_file
166+
<< "\n";
167+
return;
168+
}
169+
std::vector<std::uint8_t> chunk;
170+
std::uint64_t total_written = 0;
171+
while (reader->readNext(chunk)) {
172+
if (!chunk.empty()) {
173+
out.write(reinterpret_cast<const char *>(chunk.data()),
174+
static_cast<std::streamsize>(chunk.size()));
175+
total_written += chunk.size();
176+
}
177+
}
178+
std::cout << "[DataStream] Saved image from " << participant_identity
179+
<< " stream_id=" << stream_id << " bytes=" << total_written
180+
<< " to '" << out_file << std::endl;
181+
} catch (const std::exception &e) {
182+
std::cerr << "[DataStream] Error reading image stream from "
183+
<< participant_identity << ": " << e.what() << "\n";
184+
}
185+
}
186+
187+
} // namespace
188+
189+
int main(int argc, char *argv[]) {
190+
// Get URL and token from env.
191+
std::string url = getenvOrEmpty("LIVEKIT_URL");
192+
std::string token = getenvOrEmpty("LIVEKIT_TOKEN");
193+
194+
if (argc >= 3) {
195+
// Allow overriding via CLI: ./SimpleDataStream <ws-url> <token>
196+
url = argv[1];
197+
token = argv[2];
198+
}
199+
200+
if (url.empty() || token.empty()) {
201+
std::cerr << "LIVEKIT_URL and LIVEKIT_TOKEN (or CLI args) are required\n";
202+
return 1;
203+
}
204+
205+
std::cout << "[DataStream] Connecting to: " << url << "\n";
206+
207+
std::signal(SIGINT, handleSignal);
208+
#ifdef SIGTERM
209+
std::signal(SIGTERM, handleSignal);
210+
#endif
211+
212+
Room room{};
213+
RoomOptions options;
214+
options.auto_subscribe = true;
215+
options.dynacast = false;
216+
217+
bool ok = room.Connect(url, token, options);
218+
std::cout << "[DataStream] Connect result: " << std::boolalpha << ok << "\n";
219+
if (!ok) {
220+
std::cerr << "[DataStream] Failed to connect to room\n";
221+
FfiClient::instance().shutdown();
222+
return 1;
223+
}
224+
225+
auto info = room.room_info();
226+
std::cout << "[DataStream] Connected to room '" << info.name
227+
<< "', participants: " << info.num_participants << "\n";
228+
229+
// Register stream handlers
230+
room.registerTextStreamHandler(
231+
"chat", [](std::shared_ptr<TextStreamReader> reader,
232+
const std::string &participant_identity) {
233+
std::thread t(handleChatMessage, std::move(reader),
234+
participant_identity);
235+
t.detach();
236+
});
237+
238+
room.registerByteStreamHandler(
239+
"files", [](std::shared_ptr<ByteStreamReader> reader,
240+
const std::string &participant_identity) {
241+
std::thread t(handleWelcomeImage, std::move(reader),
242+
participant_identity);
243+
t.detach();
244+
});
245+
246+
// Greet existing participants
247+
{
248+
auto remotes = room.remoteParticipants();
249+
for (const auto &rp : remotes) {
250+
if (!rp)
251+
continue;
252+
std::cout << "Remote: " << rp->identity() << "\n";
253+
greetParticipant(room, rp->identity());
254+
}
255+
}
256+
257+
// Optionally: greet on join
258+
//
259+
// If Room API exposes a participant-connected callback, you could do:
260+
//
261+
// room.onParticipantConnected(
262+
// [&](RemoteParticipant& participant) {
263+
// std::cout << "[DataStream] participant connected: "
264+
// << participant.sid() << " " << participant.identity()
265+
// << "\n";
266+
// greetParticipant(room, participant.identity());
267+
// });
268+
//
269+
// Adjust to your actual event API.
270+
std::cout << "[DataStream] Ready. Waiting for streams (Ctrl-C to exit)...\n";
271+
// Keep process alive until signal
272+
while (g_running.load()) {
273+
std::this_thread::sleep_for(std::chrono::milliseconds(5));
274+
}
275+
276+
std::cout << "[DataStream] Shutting down...\n";
277+
FfiClient::instance().shutdown();
278+
return 0;
279+
}

examples/simple_room/main.cpp

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -286,7 +286,7 @@ int main(int argc, char *argv[]) {
286286
try {
287287
// publishTrack takes std::shared_ptr<Track>, LocalAudioTrack derives from
288288
// Track
289-
audioPub = room.local_participant()->publishTrack(audioTrack, audioOpts);
289+
audioPub = room.localParticipant()->publishTrack(audioTrack, audioOpts);
290290

291291
std::cout << "Published track:\n"
292292
<< " SID: " << audioPub->sid() << "\n"
@@ -314,7 +314,7 @@ int main(int argc, char *argv[]) {
314314
try {
315315
// publishTrack takes std::shared_ptr<Track>, LocalAudioTrack derives from
316316
// Track
317-
videoPub = room.local_participant()->publishTrack(videoTrack, videoOpts);
317+
videoPub = room.localParticipant()->publishTrack(videoTrack, videoOpts);
318318

319319
std::cout << "Published track:\n"
320320
<< " SID: " << videoPub->sid() << "\n"
@@ -341,12 +341,12 @@ int main(int argc, char *argv[]) {
341341
media.stopMic();
342342

343343
// Clean up the audio track publishment
344-
room.local_participant()->unpublishTrack(audioPub->sid());
344+
room.localParticipant()->unpublishTrack(audioPub->sid());
345345

346346
media.stopCamera();
347347

348348
// Clean up the video track publishment
349-
room.local_participant()->unpublishTrack(videoPub->sid());
349+
room.localParticipant()->unpublishTrack(videoPub->sid());
350350

351351
FfiClient::instance().shutdown();
352352
std::cout << "Exiting.\n";

0 commit comments

Comments
 (0)