diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml new file mode 100644 index 0000000..4f7a22e --- /dev/null +++ b/.github/workflows/ci.yml @@ -0,0 +1,112 @@ +# Copyright 2026 LiveKit, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +name: CI +on: + schedule: + - cron: 0 0 * * 1 + workflow_dispatch: + pull_request: + types: [opened, reopened, synchronize] + push: + branches: [main] +concurrency: + group: "ci" + cancel-in-progress: true +jobs: + license-check: + name: License Check + uses: ./.github/workflows/license_check.yml + + pin-check: + name: Pin Check + uses: ./.github/workflows/pin_check.yml + + build: + name: Build (${{ matrix.name }}) + runs-on: ${{ matrix.runs_on }} + permissions: + contents: read + strategy: + fail-fast: false + matrix: + include: + - name: macOS arm64 + runs_on: macos-latest + - name: Linux x64 + runs_on: ubuntu-24.04 + - name: Linux ARM64 + runs_on: ubuntu-24.04-arm + steps: + - name: Checkout + uses: actions/checkout@34e114876b0b11c390a56381ad16ebd13914f8d5 # v4.3.1 + + - name: Install Linux dependencies + if: runner.os == 'Linux' + run: | + sudo apt-get update + sudo apt-get install -y --no-install-recommends \ + build-essential \ + cmake \ + git \ + libgtk-3-dev \ + libglfw3-dev \ + libgl1-mesa-dev \ + libglu1-mesa-dev \ + liblz4-dev \ + libprotobuf-dev \ + libssl-dev \ + libudev-dev \ + libusb-1.0-0-dev \ + libxss-dev \ + libzstd-dev \ + pkg-config \ + protobuf-compiler \ + wget \ + zlib1g-dev \ + && sudo apt-get autoremove -y && sudo apt-get clean -y + + - name: Build and install librealsense2 from source (Linux) + if: runner.os == 'Linux' + run: | + set -euxo pipefail + src="${{ runner.temp }}/librealsense" + librealsense_ref="v2.55.1" + git clone --branch "$librealsense_ref" --depth 1 --single-branch https://github.com/realsenseai/librealsense.git "$src" + cd "$src" + sudo cp config/99-realsense-libusb.rules /etc/udev/rules.d/ + sudo udevadm control --reload-rules && sudo udevadm trigger + mkdir -p build && cd build + cmake ../ -DCMAKE_BUILD_TYPE=Release + make -j"$(( $(nproc) - 1 ))" + sudo make install + + - name: Install macOS dependencies + if: runner.os == 'macOS' + run: | + brew install \ + cmake \ + librealsense \ + lz4 \ + pkg-config \ + protobuf \ + zstd + + - name: Setup project dependencies + run: ./setup_realsense.sh + + - name: Build project + env: + GITHUB_TOKEN: ${{ github.token }} + run: ./build.sh diff --git a/.github/workflows/license_check.yml b/.github/workflows/license_check.yml new file mode 100644 index 0000000..c1bc2ff --- /dev/null +++ b/.github/workflows/license_check.yml @@ -0,0 +1,37 @@ +# Copyright 2026 LiveKit, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +name: License Check +on: + workflow_call: {} + workflow_dispatch: {} +jobs: + license-check: + name: License Check + runs-on: ubuntu-latest + permissions: + contents: read + steps: + - name: Checkout + uses: actions/checkout@34e114876b0b11c390a56381ad16ebd13914f8d5 # v4.3.1 + - name: Check License Headers + shell: bash + run: | + shopt -s globstar + # ghcr.io/google/addlicense v1.2.0 + docker run --rm -v ${PWD}:/src -w /src ghcr.io/google/addlicense@sha256:5a48f41ccc8cf3fdd04499649f02a9ee5877ab6f39fd1ac18fd1db5eb1062c5a \ + -check \ + -l apache \ + -c "LiveKit, Inc." \ + **/*.{cpp,h} diff --git a/.github/workflows/pin_check.yml b/.github/workflows/pin_check.yml new file mode 100644 index 0000000..77e28bd --- /dev/null +++ b/.github/workflows/pin_check.yml @@ -0,0 +1,32 @@ +# Copyright 2026 LiveKit, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +name: Pin Check +on: + workflow_call: {} + workflow_dispatch: + +jobs: + pin-check: + name: Pin Check + runs-on: ubuntu-latest + permissions: + contents: read + steps: + - name: Checkout + uses: actions/checkout@34e114876b0b11c390a56381ad16ebd13914f8d5 # v4.3.1 + - name: Pin Check + uses: suzuki-shunsuke/pinact-action@cf51507d80d4d6522a07348e3d58790290eaf0b6 # v2.0.0 + with: + skip_push: true diff --git a/CMakeLists.txt b/CMakeLists.txt index 05e33a8..a06cf6b 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -38,14 +38,40 @@ else() endif() find_package(LiveKit CONFIG REQUIRED) -find_package(Protobuf REQUIRED CONFIG) +if(TARGET LiveKit::livekit) + set(LIVEKIT_CORE_TARGET LiveKit::livekit) +elseif(TARGET livekit) + set(LIVEKIT_CORE_TARGET livekit) +else() + message(FATAL_ERROR + "Could not find a LiveKit core target (expected LiveKit::livekit or livekit).") +endif() + +# Prefer a package config install so modern protobuf releases can expose their +# absl/utf8_range transitive dependencies. Fall back to CMake's built-in module +# for environments such as Ubuntu's libprotobuf-dev. +find_package(Protobuf CONFIG QUIET) +if(NOT TARGET protobuf::libprotobuf) + find_package(Protobuf REQUIRED) +endif() find_package(realsense2 REQUIRED) find_package(PkgConfig REQUIRED) -pkg_check_modules(SPDLOG REQUIRED IMPORTED_TARGET spdlog) pkg_check_modules(ZSTD REQUIRED IMPORTED_TARGET libzstd) pkg_check_modules(LZ4 REQUIRED IMPORTED_TARGET liblz4) find_package(ZLIB REQUIRED) +function(prepend_livekit_includes target_name) + get_target_property(_livekit_include_dirs + ${LIVEKIT_CORE_TARGET} + INTERFACE_INCLUDE_DIRECTORIES + ) + if(_livekit_include_dirs) + target_include_directories(${target_name} BEFORE PRIVATE + ${_livekit_include_dirs} + ) + endif() +endfunction() + set(REALSENSE_MCAP_DIR ${CMAKE_CURRENT_SOURCE_DIR}) set(REALSENSE_EXTERNAL_DIR "${CMAKE_CURRENT_SOURCE_DIR}/external") set(REALSENSE_GENERATED_DIR "${CMAKE_CURRENT_SOURCE_DIR}/generated") @@ -90,12 +116,12 @@ add_executable(realsense_publisher ${CMAKE_CURRENT_SOURCE_DIR} ) target_link_libraries(realsense_publisher PRIVATE - LiveKit::livekit - PkgConfig::SPDLOG + ${LIVEKIT_CORE_TARGET} ${realsense2_LIBRARY} protobuf::libprotobuf ZLIB::ZLIB ) + prepend_livekit_includes(realsense_publisher) add_library(mcap_recorder_core ${REALSENSE_MCAP_DIR}/src/common/mcap_recorder_core.cpp @@ -129,7 +155,7 @@ add_executable(realsense_publisher ) target_link_libraries(mcap_recorder PRIVATE mcap_recorder_core - LiveKit::livekit - PkgConfig::SPDLOG + ${LIVEKIT_CORE_TARGET} ) + prepend_livekit_includes(mcap_recorder) message(STATUS "LiveKit participants: realsense_publisher, mcap_recorder") diff --git a/README.md b/README.md index f24f226..427c8aa 100644 --- a/README.md +++ b/README.md @@ -56,3 +56,5 @@ Then point this project at that prefix: - `mcap_recorder`: subscriber/recorder participant. It connects to the same room, subscribes to the published RGB + depth tracks, and writes the received data back out to an MCAP file. + +__NOTE: On macOS you may need to run executables with `sudo`__ diff --git a/src/mcap_recorder.cpp b/src/mcap_recorder.cpp index 69e5272..e4b262e 100644 --- a/src/mcap_recorder.cpp +++ b/src/mcap_recorder.cpp @@ -18,17 +18,18 @@ * the received sensor stream to an MCAP file. * * Usage: - * mcap_recorder [output.mcap] - * LIVEKIT_URL=... LIVEKIT_TOKEN=... mcap_recorder [output.mcap] + * mcap_recorder [--publisher-id ] [output.mcap] + * LIVEKIT_URL=... LIVEKIT_TOKEN=... mcap_recorder [--publisher-id ] + * [output.mcap] * - * Token must grant identity "mcap_recorder". Start realsense_publisher in the - * same room first to publish camera/color and camera/depth. + * Token must grant identity "mcap_recorder". Subscribe to the given publisher + * participant identity (default: realsense_publisher) for camera/color and + * camera/depth. */ #include "livekit/livekit.h" #include "common/mcap_recorder_core.h" - #include #include @@ -36,19 +37,35 @@ #include #include #include +#include +#include +#include +#include #include +#include #include #include #include #include -static volatile std::sig_atomic_t g_running = 1; -static void signalHandler(int) { g_running = 0; } +namespace { + +// Signal handler communication uses std::sig_atomic_t; the handler only +// performs an async-signal-safe assignment to this flag. +volatile std::sig_atomic_t g_running = 1; -static const char kSenderIdentity[] = "realsense_publisher"; -static const char kColorTrackName[] = "camera/color"; // video track -static const char kDepthTrackName[] = "camera/depth"; // data track -static const char kHelloTrackName[] = "hello"; // test data track +void signalHandler(int) { g_running = 0; } + +constexpr const char kDefaultPublisherId[] = "realsense_publisher"; +constexpr const char kColorTrackName[] = "camera/color"; // video track +constexpr const char kDepthTrackName[] = "camera/depth"; // data track +constexpr const char kHelloTrackName[] = "hello"; // test data track + +std::string fixedOneDecimal(double value) { + std::ostringstream os; + os << std::fixed << std::setprecision(1) << value; + return os.str(); +} class RecorderRoomDelegate : public livekit::RoomDelegate { public: @@ -73,6 +90,7 @@ class RecorderRoomDelegate : public livekit::RoomDelegate { struct RecorderOptions { std::string output_path = "mcap_recorder_output.mcap"; + std::string publisher_id = kDefaultPublisherId; std::string url; std::string token; }; @@ -101,24 +119,25 @@ struct ReceiveStats { void printStats() const { if (last_depth_latency_us >= 0 && last_hello_latency_us >= 0) { - LK_LOG_INFO("[mcap_recorder] Stopping... (depth: {}, hello: {}, " - "depth_lat_ms={:.1f}, hello_lat_ms={:.1f})", - depth_received, hello_received, - last_depth_latency_us / 1000.0, - last_hello_latency_us / 1000.0); + std::cout << "[mcap_recorder] Stopping... (depth: " << depth_received + << ", hello: " << hello_received << ", depth_lat_ms=" + << fixedOneDecimal(last_depth_latency_us / 1000.0) + << ", hello_lat_ms=" + << fixedOneDecimal(last_hello_latency_us / 1000.0) << ")" + << '\n'; } else if (last_depth_latency_us >= 0) { - LK_LOG_INFO("[mcap_recorder] Stopping... (depth: {}, hello: {}, " - "depth_lat_ms={:.1f})", - depth_received, hello_received, - last_depth_latency_us / 1000.0); + std::cout << "[mcap_recorder] Stopping... (depth: " << depth_received + << ", hello: " << hello_received << ", depth_lat_ms=" + << fixedOneDecimal(last_depth_latency_us / 1000.0) << ")" + << '\n'; } else if (last_hello_latency_us >= 0) { - LK_LOG_INFO("[mcap_recorder] Stopping... (depth: {}, hello: {}, " - "hello_lat_ms={:.1f})", - depth_received, hello_received, - last_hello_latency_us / 1000.0); + std::cout << "[mcap_recorder] Stopping... (depth: " << depth_received + << ", hello: " << hello_received << ", hello_lat_ms=" + << fixedOneDecimal(last_hello_latency_us / 1000.0) << ")" + << '\n'; } else { - LK_LOG_INFO("[mcap_recorder] Stopping... (depth: {}, hello: {})", - depth_received, hello_received); + std::cout << "[mcap_recorder] Stopping... (depth: " << depth_received + << ", hello: " << hello_received << ")" << '\n'; } } @@ -130,26 +149,56 @@ struct ReceiveStats { } }; +} // namespace + static bool parseArgs(int argc, char *argv[], RecorderOptions *options) { const char *env_url = std::getenv("LIVEKIT_URL"); const char *env_token = std::getenv("LIVEKIT_TOKEN"); - if (argc >= 4) { - options->output_path = argv[1]; - options->url = argv[2]; - options->token = argv[3]; + std::vector positional; + for (int i = 1; i < argc; ++i) { + std::string arg = argv[i]; + if (arg == "--help" || arg == "-h") { + return false; + } + if (arg == "--publisher-id") { + if (i + 1 >= argc) { + return false; + } + options->publisher_id = argv[++i]; + continue; + } + if (arg.rfind("--publisher-id=", 0) == 0) { + options->publisher_id = arg.substr(std::strlen("--publisher-id=")); + continue; + } + if (!arg.empty() && arg[0] == '-') { + std::cerr << "mcap_recorder: unknown option: " << arg << '\n'; + return false; + } + positional.push_back(std::move(arg)); + } + + if (options->publisher_id.empty()) { + return false; + } + + if (positional.size() >= 3) { + options->output_path = positional[0]; + options->url = positional[1]; + options->token = positional[2]; return true; } - if (argc >= 3) { - options->url = argv[1]; - options->token = argv[2]; + if (positional.size() >= 2) { + options->url = positional[0]; + options->token = positional[1]; return true; } if (env_url && env_token) { options->url = env_url; options->token = env_token; - if (argc >= 2) { - options->output_path = argv[1]; + if (positional.size() >= 1) { + options->output_path = positional[0]; } return true; } @@ -160,13 +209,17 @@ int main(int argc, char *argv[]) { GOOGLE_PROTOBUF_VERIFY_VERSION; std::signal(SIGINT, signalHandler); +#ifdef SIGTERM std::signal(SIGTERM, signalHandler); +#endif RecorderOptions options; if (!parseArgs(argc, argv, &options)) { - LK_LOG_ERROR( - "Usage: mcap_recorder [output.mcap] | " - "LIVEKIT_URL=... LIVEKIT_TOKEN=... mcap_recorder [output.mcap]"); + std::cerr + << "Usage: mcap_recorder [--publisher-id ] [output.mcap] " + " | LIVEKIT_URL=... LIVEKIT_TOKEN=... mcap_recorder " + "[--publisher-id ] [output.mcap]" + << '\n'; return 1; } @@ -175,7 +228,7 @@ int main(int argc, char *argv[]) { if (!recorder->open(options.output_path)) { return 1; } - LK_LOG_INFO("[mcap_recorder] Recording to {}", options.output_path); + std::cout << "[mcap_recorder] Recording to " << options.output_path << '\n'; ReceiveStats stats; std::atomic room_connected{true}; @@ -185,11 +238,13 @@ int main(int argc, char *argv[]) { std::unique_ptr room = std::make_unique(); room->setDelegate(&delegate); - LK_LOG_INFO("[mcap_recorder] Connecting to {} ...", options.url); + std::cout << "[mcap_recorder] Connecting to " << options.url << " ..." + << '\n'; livekit::RoomOptions room_options; room_options.auto_subscribe = true; + room_options.dynacast = false; if (!room->Connect(options.url, options.token, room_options)) { - LK_LOG_ERROR("[mcap_recorder] Failed to connect."); + std::cerr << "[mcap_recorder] Failed to connect." << '\n'; room.reset(); recorder.reset(); livekit::shutdown(); @@ -197,7 +252,7 @@ int main(int argc, char *argv[]) { } room->setOnVideoFrameCallback( - kSenderIdentity, livekit::TrackSource::SOURCE_CAMERA, + options.publisher_id, kColorTrackName, [&recorder](const livekit::VideoFrame &frame, std::int64_t) { const std::uint8_t *data = frame.data(); const std::size_t size = frame.dataSize(); @@ -208,7 +263,7 @@ int main(int argc, char *argv[]) { }); room->addOnDataFrameCallback( - kSenderIdentity, kDepthTrackName, + options.publisher_id, kDepthTrackName, [&recorder, &stats](const std::vector &payload, std::optional user_timestamp) { if (payload.empty()) { @@ -220,7 +275,7 @@ int main(int argc, char *argv[]) { }); room->addOnDataFrameCallback( - kSenderIdentity, kHelloTrackName, + options.publisher_id, kHelloTrackName, [&recorder, &stats](const std::vector &payload, std::optional user_timestamp) { stats.recordHello(user_timestamp); @@ -228,9 +283,8 @@ int main(int argc, char *argv[]) { user_timestamp); }); - LK_LOG_INFO("[mcap_recorder] Connected. Waiting for {} (camera/color + " - "camera/depth). Press Ctrl+C to stop.", - kSenderIdentity); + std::cout << "[mcap_recorder] Connected. Waiting for " << options.publisher_id + << " (camera/color + camera/depth). Press Ctrl+C to stop." << '\n'; while (g_running && room_connected.load(std::memory_order_relaxed)) { std::this_thread::sleep_for(std::chrono::milliseconds(50)); @@ -239,6 +293,7 @@ int main(int argc, char *argv[]) { // now that we are done, print the stats stats.printStats(); + room->setDelegate(nullptr); room.reset(); recorder.reset(); livekit::shutdown(); diff --git a/src/realsense_publisher.cpp b/src/realsense_publisher.cpp index 8c87311..6fb0145 100644 --- a/src/realsense_publisher.cpp +++ b/src/realsense_publisher.cpp @@ -35,27 +35,36 @@ #include +#include #include #include +#include #include -#include #include +#include #include +#include +#include #include #include #include +#include #include -static volatile std::sig_atomic_t g_running = 1; -static void signalHandler(int) { g_running = 0; } +namespace { -static uint64_t nowNs() { +// volatile are safe for signal handlers +volatile std::sig_atomic_t g_running = 1; + +void signalHandler(int) { g_running = 0; } + +std::uint64_t nowNs() { return std::chrono::duration_cast( std::chrono::system_clock::now().time_since_epoch()) .count(); } -static std::string nowStr() { +std::string nowStr() { auto now = std::chrono::system_clock::now(); auto ms = std::chrono::duration_cast( now.time_since_epoch()) % @@ -69,9 +78,15 @@ static std::string nowStr() { return os.str(); } +std::string fixedOneDecimal(double value) { + std::ostringstream os; + os << std::fixed << std::setprecision(1) << value; + return os.str(); +} + /// Convert RGB8 to RGBA (alpha = 0xFF). Assumes dst has size width*height*4. -static void rgb8ToRgba(const std::uint8_t *rgb, std::uint8_t *rgba, int width, - int height) { +void rgb8ToRgba(const std::uint8_t *rgb, std::uint8_t *rgba, int width, + int height) { const int rgbStep = width * 3; const int rgbaStep = width * 4; for (int y = 0; y < height; ++y) { @@ -86,8 +101,12 @@ static void rgb8ToRgba(const std::uint8_t *rgb, std::uint8_t *rgba, int width, } } -static foxglove::RawImage makeDepthMessage(const std::uint64_t timestamp_ns, - const int width, const int height) { +std::vector toPayload(const std::string &text) { + return std::vector(text.begin(), text.end()); +} + +foxglove::RawImage makeDepthMessage(const std::uint64_t timestamp_ns, + const int width, const int height) { const auto secs = static_cast(timestamp_ns / 1000000000ULL); const auto nsecs = static_cast(timestamp_ns % 1000000000ULL); @@ -102,11 +121,15 @@ static foxglove::RawImage makeDepthMessage(const std::uint64_t timestamp_ns, return msg; } +} // namespace + int main(int argc, char *argv[]) { GOOGLE_PROTOBUF_VERIFY_VERSION; std::signal(SIGINT, signalHandler); +#ifdef SIGTERM std::signal(SIGTERM, signalHandler); +#endif std::string url; std::string token; @@ -119,14 +142,16 @@ int main(int argc, char *argv[]) { url = env_url; token = env_token; } else { - LK_LOG_ERROR("Usage: realsense_publisher | " - "LIVEKIT_URL=... LIVEKIT_TOKEN=... realsense_publisher"); + std::cerr << "Usage: realsense_publisher | " + << "LIVEKIT_URL=... LIVEKIT_TOKEN=... realsense_publisher" + << '\n'; return 1; } const int kWidth = 640; const int kHeight = 480; const int kDepthFps = 10; + const int kDepthDownsample = 3; // Color+depth pipeline. rs2::pipeline pipe; @@ -136,18 +161,22 @@ int main(int argc, char *argv[]) { try { pipe.start(cfg); } catch (const rs2::error &e) { - LK_LOG_ERROR("RealSense error: {}", e.what()); + std::cerr << "RealSense error: " << e.what() << '\n'; return 1; } + rs2::decimation_filter depth_decimator; + depth_decimator.set_option(RS2_OPTION_FILTER_MAGNITUDE, kDepthDownsample); + livekit::initialize(livekit::LogLevel::Info, livekit::LogSink::kConsole); std::unique_ptr room = std::make_unique(); livekit::RoomOptions options; options.auto_subscribe = false; + options.dynacast = false; - LK_LOG_INFO("[realsense_publisher] Connecting to {} ...", url); + std::cout << "[realsense_publisher] Connecting to " << url << " ..." << '\n'; if (!room->Connect(url, token, options)) { - LK_LOG_ERROR("[realsense_publisher] Failed to connect."); + std::cerr << "[realsense_publisher] Failed to connect." << '\n'; pipe.stop(); livekit::shutdown(); return 1; @@ -159,12 +188,40 @@ int main(int argc, char *argv[]) { auto video_source = std::make_shared(kWidth, kHeight); auto video_track = local_participant->publishVideoTrack( "camera/color", video_source, livekit::TrackSource::SOURCE_CAMERA); - auto depth_track = local_participant->publishDataTrack("camera/depth"); - auto hello_track = local_participant->publishDataTrack("hello"); + (void)video_track; + + auto depth_track_result = local_participant->publishDataTrack("camera/depth"); + if (!depth_track_result) { + const auto &error = depth_track_result.error(); + std::cerr << "[realsense_publisher] Failed to publish depth data track: " + << "code=" << static_cast(error.code) + << " message=" << error.message << '\n'; + room.reset(); + pipe.stop(); + livekit::shutdown(); + return 1; + } + std::shared_ptr depth_track = + depth_track_result.value(); + + auto hello_track_result = local_participant->publishDataTrack("hello"); + if (!hello_track_result) { + const auto &error = hello_track_result.error(); + std::cerr << "[realsense_publisher] Failed to publish hello data track: " + << "code=" << static_cast(error.code) + << " message=" << error.message << '\n'; + room.reset(); + pipe.stop(); + livekit::shutdown(); + return 1; + } + std::shared_ptr hello_track = + hello_track_result.value(); - LK_LOG_INFO( - "[realsense_publisher] Publishing camera/color (video), camera/depth " - "(DataTrack), and hello (DataTrack). Press Ctrl+C to stop."); + std::cout + << "[realsense_publisher] Publishing camera/color (video), " + << "camera/depth (DataTrack), and hello (DataTrack). Press Ctrl+C to " + << "stop." << '\n'; std::vector rgbaBuf( static_cast(kWidth * kHeight * 4)); @@ -187,12 +244,15 @@ int main(int argc, char *argv[]) { last_hello = loop_start; ++hello_seq; std::string text = "hello recorder #" + std::to_string(hello_seq); - uint64_t ts_us = static_cast(nowNs() / 1000); - bool ok = hello_track->tryPush( - reinterpret_cast(text.data()), text.size(), - ts_us); - LK_LOG_INFO("[{}] [realsense_publisher] Sent hello #{} ({} bytes) -> {}", - nowStr(), hello_seq, text.size(), ok ? "ok" : "FAILED"); + std::uint64_t ts_us = nowNs() / 1000; + auto push_result = hello_track->tryPush(toPayload(text), ts_us); + if (!push_result) { + const auto &error = push_result.error(); + std::cerr << "[realsense_publisher] Failed to push hello frame #" + << hello_seq + << ": code=" << static_cast(error.code) + << " message=" << error.message << '\n'; + } } rs2::frameset frames; @@ -219,14 +279,13 @@ int main(int argc, char *argv[]) { rgb8ToRgba(static_cast(color.get_data()), rgbaBuf.data(), kWidth, kHeight); try { - video_source->captureFrame( - livekit::VideoFrame( - kWidth, kHeight, livekit::VideoBufferType::RGBA, - std::vector(rgbaBuf.begin(), rgbaBuf.end())), - timestamp_us); + livekit::VideoFrame frame = livekit::VideoFrame::create( + kWidth, kHeight, livekit::VideoBufferType::RGBA); + std::copy(rgbaBuf.begin(), rgbaBuf.end(), frame.data()); + video_source->captureFrame(frame, timestamp_us); } catch (const std::exception &e) { - LK_LOG_ERROR("[realsense_publisher] Failed to capture video frame: {}", - e.what()); + std::cerr << "[realsense_publisher] Failed to capture video frame: " + << e.what() << '\n'; break; } @@ -234,10 +293,13 @@ int main(int argc, char *argv[]) { if (loop_start - last_depth >= depth_interval) { last_depth = loop_start; - foxglove::RawImage msg = - makeDepthMessage(timestamp_ns, depth.get_width(), depth.get_height()); - msg.set_step(depth.get_width() * 2); - msg.set_data(depth.get_data(), depth.get_data_size()); + auto decimated = depth_decimator.process(depth).as(); + const int dw = decimated.get_width(); + const int dh = decimated.get_height(); + + foxglove::RawImage msg = makeDepthMessage(timestamp_ns, dw, dh); + msg.set_step(dw * 2); + msg.set_data(decimated.get_data(), decimated.get_data_size()); std::string serialized = msg.SerializeAsString(); @@ -249,19 +311,18 @@ int main(int argc, char *argv[]) { static_cast(serialized.size()), Z_BEST_SPEED); const auto push_start = std::chrono::steady_clock::now(); - bool ok = false; - if (zrc == Z_OK) { - ok = depth_track->tryPush(compressed.data(), - static_cast(comp_size), - static_cast(timestamp_us)); - } else { - LK_LOG_WARN("[realsense_publisher] zlib compress failed ({}), sending " - "uncompressed", - zrc); - ok = depth_track->tryPush( - reinterpret_cast(serialized.data()), - serialized.size(), static_cast(timestamp_us)); - } + const auto push_result = [&]() { + if (zrc == Z_OK) { + compressed.resize(static_cast(comp_size)); + return depth_track->tryPush(std::move(compressed), timestamp_us); + } + + std::cerr << "[realsense_publisher] zlib compress failed (" << zrc + << "), sending uncompressed" << '\n'; + return depth_track->tryPush( + std::vector(serialized.begin(), serialized.end()), + timestamp_us); + }(); const auto push_dur = std::chrono::steady_clock::now() - push_start; const double push_ms = @@ -269,15 +330,19 @@ int main(int argc, char *argv[]) { .count() / 1000.0; - ++depth_pushed; - if (!ok) { - LK_LOG_ERROR( - "[{}] [realsense_publisher] Failed to push depth frame #{} (push " - "took {:.1f}ms)", - nowStr(), depth_pushed, push_ms); + if (!push_result) { + const auto &error = push_result.error(); + std::cerr << '[' << nowStr() + << "] [realsense_publisher] Failed to push depth frame #" + << depth_pushed + 1 + << ": code=" << static_cast(error.code) + << " message=" << error.message << " (push took " + << fixedOneDecimal(push_ms) << "ms)" << '\n'; break; } + ++depth_pushed; + // Report stats of the depth track every 10 frames if (depth_pushed == 1 || depth_pushed % 10 == 0) { const double elapsed_sec = @@ -290,13 +355,13 @@ int main(int argc, char *argv[]) { ? static_cast(depth_pushed - last_depth_report_count) / elapsed_sec : 0; - LK_LOG_INFO( - "[{}] [realsense_publisher] Depth #{} push={:.1f}ms {}B->{}B " - "actual={:.1f}fps", - nowStr(), depth_pushed, push_ms, serialized.size(), - zrc == Z_OK ? static_cast(comp_size) - : serialized.size(), - actual_fps); + std::cout << '[' << nowStr() << "] [realsense_publisher] Depth #" + << depth_pushed << " push=" << fixedOneDecimal(push_ms) + << "ms " << serialized.size() << "B->" + << (zrc == Z_OK ? static_cast(comp_size) + : serialized.size()) + << "B actual=" << fixedOneDecimal(actual_fps) << "fps" + << '\n'; last_depth_report = loop_start; last_depth_report_count = depth_pushed; } @@ -307,7 +372,7 @@ int main(int argc, char *argv[]) { } } - LK_LOG_INFO("[realsense_publisher] Stopping..."); + std::cout << "[realsense_publisher] Stopping..." << '\n'; room.reset(); pipe.stop(); livekit::shutdown(); diff --git a/src/tests/realsense_to_mcap.cpp b/src/tests/realsense_to_mcap.cpp index 9f36c06..048dabc 100644 --- a/src/tests/realsense_to_mcap.cpp +++ b/src/tests/realsense_to_mcap.cpp @@ -28,24 +28,29 @@ #include #include +#include #include #include -static volatile std::sig_atomic_t g_running = 1; +namespace { -static void signalHandler(int signum) { - (void)signum; - g_running = 0; -} +// Writes to volatile std::sig_atomic_t are async-signal-safe. +volatile std::sig_atomic_t g_running = 1; + +constexpr int kWidth = 640; +constexpr int kHeight = 480; +constexpr const char *kOutputPath = "realsense_to_mcap.mcap"; + +void signalHandler(int) { g_running = 0; } -static uint64_t nowNs() { +std::uint64_t nowNs() { return std::chrono::duration_cast( std::chrono::system_clock::now().time_since_epoch()) .count(); } -static void rgb8ToRgba(const std::uint8_t *rgb, std::uint8_t *rgba, int width, - int height) { +void rgb8ToRgba(const std::uint8_t *rgb, std::uint8_t *rgba, int width, + int height) { const int rgb_step = width * 3; const int rgba_step = width * 4; for (int y = 0; y < height; ++y) { @@ -60,29 +65,37 @@ static void rgb8ToRgba(const std::uint8_t *rgb, std::uint8_t *rgba, int width, } } +} // namespace + int main() { GOOGLE_PROTOBUF_VERIFY_VERSION; std::signal(SIGINT, signalHandler); +#ifdef SIGTERM std::signal(SIGTERM, signalHandler); +#endif rs2::pipeline pipe; rs2::config cfg; - cfg.enable_stream(RS2_STREAM_COLOR, 640, 480, RS2_FORMAT_RGB8, 30); - cfg.enable_stream(RS2_STREAM_DEPTH, 640, 480, RS2_FORMAT_Z16, 30); - pipe.start(cfg); + cfg.enable_stream(RS2_STREAM_COLOR, kWidth, kHeight, RS2_FORMAT_RGB8, 30); + cfg.enable_stream(RS2_STREAM_DEPTH, kWidth, kHeight, RS2_FORMAT_Z16, 30); + try { + pipe.start(cfg); + } catch (const rs2::error &e) { + std::cerr << "RealSense error: " << e.what() << "\n"; + return 1; + } McapRecorderCore recorder; - if (!recorder.open("realsense_to_mcap.mcap")) { + if (!recorder.open(kOutputPath)) { pipe.stop(); return 1; } - std::cout - << "Recording to realsense_to_mcap.mcap ... Press Ctrl+C to stop.\n"; + std::cout << "Recording to " << kOutputPath << " ... Press Ctrl+C to stop.\n"; uint32_t seq = 0; - std::vector rgba_buf(640 * 480 * 4); + std::vector rgba_buf(kWidth * kHeight * 4); while (g_running) { rs2::frameset frames; if (!pipe.poll_for_frames(&frames)) {