diff --git a/.dockerignore b/.dockerignore new file mode 100644 index 0000000..0fd3846 --- /dev/null +++ b/.dockerignore @@ -0,0 +1,8 @@ +build/ +dist/ +.git/ +external/microsoft/vcpkg/ +*.o +*.a +*.so +*.dylib diff --git a/CMakeLists.txt b/CMakeLists.txt index 3414420..d7746ad 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1,7 +1,9 @@ cmake_minimum_required(VERSION 3.23.5) -file(READ "${CMAKE_CURRENT_SOURCE_DIR}/version.txt" SYNAPSE_CPP_VERSION) -string(STRIP "${SYNAPSE_CPP_VERSION}" SYNAPSE_CPP_VERSION) +file(READ "${CMAKE_CURRENT_SOURCE_DIR}/version.txt" SYNAPSE_CPP_VERSION_FULL) +string(STRIP "${SYNAPSE_CPP_VERSION_FULL}" SYNAPSE_CPP_VERSION_FULL) +# Extract semver (before any -suffix) for CMake VERSION +string(REGEX REPLACE "^([0-9]+\\.[0-9]+\\.[0-9]+).*" "\\1" SYNAPSE_CPP_VERSION "${SYNAPSE_CPP_VERSION_FULL}") project(synapse VERSION ${SYNAPSE_CPP_VERSION}) set(CMAKE_CXX_STANDARD 17) @@ -10,7 +12,7 @@ set(CMAKE_CXX_EXTENSIONS OFF) find_package(gRPC CONFIG REQUIRED) find_package(Protobuf REQUIRED CONFIG) -find_package(science-libndtp REQUIRED CONFIG) +find_package(cppzmq CONFIG REQUIRED) find_package(science-scipp REQUIRED CONFIG) add_library(${PROJECT_NAME}) @@ -70,7 +72,7 @@ target_link_libraries( PRIVATE gRPC::grpc++ protobuf::libprotobuf - science::libndtp + cppzmq science::scipp ) @@ -111,21 +113,11 @@ install( ) if ("examples" IN_LIST VCPKG_MANIFEST_FEATURES) - add_executable(stream_out examples/stream_out/main.cpp) - file(GLOB_RECURSE TEST_SOURCES "${CMAKE_CURRENT_SOURCE_DIR}/examples/stream_out/*.cpp") - target_sources(stream_out PRIVATE ${TEST_SOURCES}) - target_include_directories(stream_out PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/examples) - target_link_libraries(stream_out PRIVATE ${PROJECT_NAME} science::scipp) - set_target_properties(stream_out PROPERTIES - RUNTIME_OUTPUT_DIRECTORY "${CMAKE_BINARY_DIR}/examples" - ) - - add_executable(stats examples/stats/main.cpp) - file(GLOB_RECURSE TEST_SOURCES "${CMAKE_CURRENT_SOURCE_DIR}/examples/stats/*.cpp") - target_sources(stats PRIVATE ${TEST_SOURCES}) - target_include_directories(stats PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/examples) - target_link_libraries(stats PRIVATE ${PROJECT_NAME} science::scipp) - set_target_properties(stats PROPERTIES + # Tap example + add_executable(tap_example examples/tap/main.cpp) + target_include_directories(tap_example PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/examples) + target_link_libraries(tap_example PRIVATE ${PROJECT_NAME} science::scipp) + set_target_properties(tap_example PROPERTIES RUNTIME_OUTPUT_DIRECTORY "${CMAKE_BINARY_DIR}/examples" ) endif() diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..8f83b78 --- /dev/null +++ b/Dockerfile @@ -0,0 +1,55 @@ +FROM ubuntu:22.04 + +ENV DEBIAN_FRONTEND=noninteractive + +# Install build dependencies +RUN apt-get update && apt-get install -y \ + build-essential \ + git \ + curl \ + zip \ + unzip \ + tar \ + pkg-config \ + ninja-build \ + autoconf \ + automake \ + libtool \ + python3 \ + wget \ + gpg \ + && rm -rf /var/lib/apt/lists/* + +# Install CMake 3.27 via direct binary download (compatible with preset v6 and older vcpkg) +ARG CMAKE_VERSION=3.27.9 +RUN wget -qO- "https://github.com/Kitware/CMake/releases/download/v${CMAKE_VERSION}/cmake-${CMAKE_VERSION}-linux-aarch64.tar.gz" \ + | tar --strip-components=1 -xz -C /usr/local + +# Set up vcpkg (VCPKG_FORCE_SYSTEM_BINARIES required for ARM platforms) +ENV VCPKG_ROOT=/vcpkg +ENV VCPKG_FORCE_SYSTEM_BINARIES=1 +RUN git clone https://github.com/microsoft/vcpkg.git $VCPKG_ROOT && \ + cd $VCPKG_ROOT && \ + git checkout 1751f9f8c732c2e6f9e81ce56c10e4c4aa265b4a && \ + ./bootstrap-vcpkg.sh + +WORKDIR /src + +# Copy source files (excluding git dirs and build artifacts via .dockerignore) +COPY . . + +# Clone submodules at pinned versions +RUN rm -rf external/sciencecorp/synapse-api && \ + git clone --branch v2.1.0 https://github.com/sciencecorp/synapse-api.git external/sciencecorp/synapse-api + +RUN if [ ! -d "external/sciencecorp/vcpkg/ports" ]; then \ + rm -rf external/sciencecorp/vcpkg && \ + git clone https://github.com/sciencecorp/vcpkg.git external/sciencecorp/vcpkg; \ + fi + +# Configure and build +RUN cmake --preset=static -DCMAKE_BUILD_TYPE=Debug -DVCPKG_MANIFEST_FEATURES="examples;tests" +RUN cmake --build build + +# Run tests +CMD ["./build/synapse_tests"] diff --git a/README.md b/README.md index f53ec75..14cf695 100644 --- a/README.md +++ b/README.md @@ -52,6 +52,72 @@ synapse provides CMake targets: This library offers a C++ interface to the Synapse API. -See the [examples](./examples) for more details. +### Device Operations + +```cpp +#include +#include + +// Connect to a device +synapse::Device device("192.168.1.100:50051"); + +// Get device info +synapse::DeviceInfo info; +device.info(&info); + +// Configure, start, and stop +synapse::Config config; +// ... add nodes to config ... +device.configure(&config); +device.start(); +device.stop(); + +// Query the device +synapse::QueryRequest req; +req.set_query_type(synapse::QueryType::kListTaps); +synapse::QueryResponse res; +device.query(req, &res); +``` + +### Tap Client (High-Throughput Data Streaming) + +```cpp +#include + +// Connect to a tap for streaming data +synapse::Tap tap("192.168.1.100:50051"); + +// List available taps +auto taps = tap.list_taps(); + +// Connect to a producer tap (read data from device) +tap.connect("broadband_tap"); + +// Read data +std::vector data; +while (tap.read(&data).ok()) { + // Process protobuf-encoded data (e.g., BroadbandFrame) +} -https://github.com/sciencecorp/synapse-cpp/blob/main/examples/stream_out/main.cpp +// Or read in batches for higher throughput +std::vector> batch; +size_t count = tap.read_batch(&batch, 100, 10); // up to 100 messages, 10ms timeout +``` + +### Discovery + +```cpp +#include + +// Discover devices on the network (passive UDP listening) +std::vector devices; +synapse::discover(5000, &devices); // 5 second timeout + +// Or use callback-based discovery +synapse::discover_iter([](const synapse::DeviceAdvertisement& device) { + std::cout << "Found: " << device.name << " at " << device.host << std::endl; + return true; // continue discovery +}, 10000); +``` + +See the [examples](./examples) for more details. diff --git a/build.sh b/build.sh new file mode 100755 index 0000000..675a8a7 --- /dev/null +++ b/build.sh @@ -0,0 +1,19 @@ +#!/bin/bash +set -e + +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +cd "$SCRIPT_DIR" + +IMAGE_NAME="synapse-cpp-builder" +CONTAINER_NAME="synapse-cpp-build" + +echo "Building Docker image..." +docker build -t "$IMAGE_NAME" . + +echo "Build successful!" +echo "" +echo "To run tests:" +echo " docker run --rm $IMAGE_NAME" +echo "" +echo "To get an interactive shell:" +echo " docker run --rm -it $IMAGE_NAME /bin/bash" diff --git a/cmake/Config.cmake.in b/cmake/Config.cmake.in index d6933a8..bb52c1b 100644 --- a/cmake/Config.cmake.in +++ b/cmake/Config.cmake.in @@ -2,7 +2,7 @@ find_package(gRPC REQUIRED CONFIG) find_package(Protobuf REQUIRED CONFIG) -find_package(science-libndtp REQUIRED CONFIG) +find_package(cppzmq REQUIRED CONFIG) find_package(science-scipp REQUIRED CONFIG) include("${CMAKE_CURRENT_LIST_DIR}/synapseTargets.cmake") diff --git a/examples/stats/main.cpp b/examples/stats/main.cpp deleted file mode 100644 index 354d7b6..0000000 --- a/examples/stats/main.cpp +++ /dev/null @@ -1,152 +0,0 @@ -#include -#include - -#include "science/scipp/status.h" -#include "science/synapse/channel.h" -#include "science/synapse/data.h" -#include "science/synapse/device.h" -#include "science/synapse/nodes/broadband_source.h" -#include "science/synapse/nodes/stream_out.h" -#include "packet_monitoring.h" - -using science::libndtp::NDTPHeader; -using synapse::Ch; -using synapse::Config; -using synapse::Device; -using synapse::DeviceInfo; -using synapse::Electrodes; -using synapse::NodeType; -using synapse::Signal; -using synapse::StreamOut; -using synapse::SynapseData; -using synapse::NodeConfig; -using synapse::Node; - -auto configure_stream(Device& device, std::shared_ptr* stream_out_ptr) -> science::Status { - const uint32_t N_CHANNELS = 32; // Using more channels for stats testing - if (stream_out_ptr == nullptr) { - return { science::StatusCode::kInvalidArgument, "stream out pointer is null" }; - } - - science::Status s; - DeviceInfo info; - s = device.info(&info); - if (!s.ok()) return s; - - // Configure signal with more channels for statistics gathering - Signal signal{ - Electrodes{ - .channels = {}, - .low_cutoff_hz = 500, - .high_cutoff_hz = 6000 - } - }; - auto& electrodes = std::get(signal.signal); - electrodes.channels.reserve(N_CHANNELS); - for (unsigned int i = 0; i < N_CHANNELS; i++) { - electrodes.channels.push_back(Ch{ - .id = i, - .electrode_id = i * 2, - .reference_id = i * 2 + 1 - }); - } - - Config config; - auto broadband_source = std::make_shared(100, 16, 30000, 20.0, signal); - - NodeConfig stream_out_config; - auto* stream_out_proto = stream_out_config.mutable_stream_out(); - auto* udp_config = stream_out_proto->mutable_udp_unicast(); - udp_config->set_destination_port(StreamOut::DEFAULT_STREAM_OUT_PORT); - - std::shared_ptr stream_out_node; - s = StreamOut::from_proto(stream_out_config, &stream_out_node); - if (!s.ok()) return s; - - *stream_out_ptr = std::dynamic_pointer_cast(stream_out_node); - if (!*stream_out_ptr) { - return { science::StatusCode::kInternal, "failed to cast stream out node" }; - } - - s = config.add_node(broadband_source); - if (!s.ok()) return s; - - s = config.add_node(*stream_out_ptr); - if (!s.ok()) return s; - - s = config.connect(broadband_source, *stream_out_ptr); - if (!s.ok()) return s; - - s = device.configure(&config); - if (!s.ok()) return s; - - std::cout << "Configured device..." << std::endl; - - s = device.start(); - if (!s.ok()) return s; - - std::cout << "Started device..." << std::endl; - return s; -} - -auto stream(const std::string& uri) -> int { - synapse::Device device(uri); - science::Status s; - - std::shared_ptr stream_out; - s = configure_stream(device, &stream_out); - if (!s.ok()) { - std::cout << "error configuring stream: (" - << static_cast(s.code()) << ") " << s.message() << std::endl; - return 1; - } - - if (stream_out == nullptr) { - std::cout << "stream out node not initialized" << std::endl; - return 1; - } - - std::cout << "Monitoring packet statistics..." << std::endl; - - // Initialize packet monitor - PacketMonitor monitor; - monitor.start_monitoring(); - auto last_stats_time = std::chrono::steady_clock::now(); - - while (true) { - size_t bytes_read; - NDTPHeader header; - SynapseData out; - s = stream_out->read(&out, &header, &bytes_read); - if (s.code() == science::StatusCode::kUnavailable) { - continue; - } - - if (!s.ok()) { - std::cout << "error reading from stream out node: (" - << static_cast(s.code()) << ") " << s.message() << std::endl; - continue; - } - - monitor.process_packet(header.seq_number, bytes_read); - - auto now = std::chrono::steady_clock::now(); - if (std::chrono::duration_cast(now - last_stats_time).count() >= 1) { - monitor.print_stats(); - last_stats_time = now; - } - } - - return 0; -} - -int main(int argc, char** argv) { - if (argc != 2) { - std::cout << "Usage: " << argv[0] << " " << std::endl; - std::cout << " uri: device URI (e.g., 192.168.0.1:647)" << std::endl; - return 1; - } - - std::string uri = argv[1]; - return stream(uri); -} diff --git a/examples/stats/packet_monitoring.cpp b/examples/stats/packet_monitoring.cpp deleted file mode 100644 index cf1d001..0000000 --- a/examples/stats/packet_monitoring.cpp +++ /dev/null @@ -1,102 +0,0 @@ -#include "stats/packet_monitoring.h" -#include -#include - -PacketMonitor::PacketMonitor() - : packet_count_(0) - , last_seq_number_(0) - , dropped_packets_(0) - , out_of_order_packets_(0) - , bytes_received_(0) - , bytes_received_in_interval_(0) - , last_jitter_(0) - , avg_jitter_(0) {} - -void PacketMonitor::start_monitoring() { - start_time_ = std::chrono::steady_clock::now(); - last_stats_time_ = start_time_; - last_bandwidth_time_ = start_time_; -} - -bool PacketMonitor::process_packet(uint16_t seq_number, size_t bytes_read) { - auto now = std::chrono::steady_clock::now(); - - if (packet_count_ == 0) { - first_packet_time_ = now; - last_packet_time_ = now; - auto elapsed = std::chrono::duration(now - start_time_).count(); - std::cout << "First packet received after " << std::fixed << std::setprecision(3) - << elapsed << " seconds\n\n"; - } else { - // Calculate jitter - auto interval = std::chrono::duration(now - last_packet_time_).count(); - if (packet_count_ > 1) { - double jitter_diff = std::abs(interval - last_jitter_); - avg_jitter_ += (jitter_diff - avg_jitter_) / 16.0; // RFC 3550 algorithm - } - last_jitter_ = interval; - last_packet_time_ = now; - - // Check for dropped or out-of-order packets - uint16_t expected = (last_seq_number_ + 1) % (1 << 16); - if (seq_number != expected) { - if (seq_number > expected) { - dropped_packets_ += (seq_number - expected) % (1 << 16); - } else { - out_of_order_packets_++; - } - } - } - - packet_count_++; - bytes_received_ += bytes_read; - bytes_received_in_interval_ += bytes_read; - last_seq_number_ = seq_number; - - return true; -} - -void PacketMonitor::clear_line() const { - // Move to start of line and clear it - std::cout << "\r" << std::string(80, ' ') << "\r"; -} - -std::string PacketMonitor::format_stats() const { - auto now = std::chrono::steady_clock::now(); - std::stringstream ss; - - // Runtime - auto runtime = std::chrono::duration(now - start_time_).count(); - ss << "Runtime " << std::fixed << std::setprecision(1) << runtime << "s | "; - - // Drop calculation - double drop_percent = (static_cast(dropped_packets_) / std::max(1, packet_count_)) * 100.0; - ss << "Dropped: " << dropped_packets_ << "/" << packet_count_ - << " (" << std::setprecision(1) << drop_percent << "%) | "; - - // Bandwidth calculation - auto dt_sec = std::chrono::duration(now - last_bandwidth_time_).count(); - if (dt_sec > 0) { - double bytes_per_second = bytes_received_in_interval_ / dt_sec; - double megabits_per_second = (bytes_per_second * 8) / 1'000'000; - ss << "Mbit/sec: " << std::setprecision(1) << megabits_per_second << " | "; - } - - // Jitter (in milliseconds) - double jitter_ms = avg_jitter_ * 1000; - ss << "Jitter: " << std::setprecision(2) << jitter_ms << " ms | "; - - // Out of order packets - ss << "Out of Order: " << out_of_order_packets_; - - return ss.str(); -} - -void PacketMonitor::print_stats() { - clear_line(); - std::cout << format_stats() << std::flush; - - // Reset interval counters - bytes_received_in_interval_ = 0; - last_bandwidth_time_ = std::chrono::steady_clock::now(); -} diff --git a/examples/stats/packet_monitoring.h b/examples/stats/packet_monitoring.h deleted file mode 100644 index 2d1c01a..0000000 --- a/examples/stats/packet_monitoring.h +++ /dev/null @@ -1,41 +0,0 @@ -#pragma once - -#include -#include -#include -#include - -class PacketMonitor { - public: - PacketMonitor(); - - void start_monitoring(); - void print_stats(); - bool process_packet(uint16_t seq_number, size_t bytes_read); - - private: - // Packet tracking - uint64_t packet_count_; - uint16_t last_seq_number_; - uint64_t dropped_packets_; - uint64_t out_of_order_packets_; - - // Timing metrics - std::chrono::steady_clock::time_point start_time_; - std::chrono::steady_clock::time_point first_packet_time_; - std::chrono::steady_clock::time_point last_packet_time_; - std::chrono::steady_clock::time_point last_stats_time_; - - // Bandwidth tracking - uint64_t bytes_received_; - uint64_t bytes_received_in_interval_; - std::chrono::steady_clock::time_point last_bandwidth_time_; - - // Jitter tracking - double last_jitter_; - double avg_jitter_; - - // Helper methods - void clear_line() const; - std::string format_stats() const; -}; diff --git a/examples/stream_out/main.cpp b/examples/stream_out/main.cpp deleted file mode 100644 index b8fafe3..0000000 --- a/examples/stream_out/main.cpp +++ /dev/null @@ -1,235 +0,0 @@ -#include -#include - -#include "science/scipp/status.h" -#include "science/synapse/channel.h" -#include "science/synapse/data.h" -#include "science/synapse/device.h" -#include "science/synapse/nodes/broadband_source.h" -#include "science/synapse/nodes/stream_out.h" - -using synapse::BinnedSpiketrainData; -using synapse::Ch; -using synapse::Config; -using synapse::Device; -using synapse::DeviceInfo; -using synapse::Electrodes; -using synapse::ElectricalBroadbandData; -using synapse::NodeType; -using synapse::Signal; -using synapse::StreamOut; -using synapse::SynapseData; -using synapse::NodeConfig; -using synapse::Node; - - -auto stream_new(Device& device, std::shared_ptr* stream_out_ptr) -> science::Status { - const uint32_t N_CHANNELS = 10; - if (stream_out_ptr == nullptr) { - return { science::StatusCode::kInvalidArgument, "stream out pointer is null" }; - } - - science::Status s; - DeviceInfo info; - s = device.info(&info); - if (!s.ok()) return s; - - Signal signal{ - Electrodes{ - .channels = {}, - .low_cutoff_hz = 500, - .high_cutoff_hz = 6000 - } - }; - auto& electrodes = std::get(signal.signal); - electrodes.channels.reserve(N_CHANNELS); - for (unsigned int i = 0; i < N_CHANNELS; i++) { - electrodes.channels.push_back(Ch{ - .id = i, - .electrode_id = i * 2, - .reference_id = i * 2 + 1 - }); - } - - Config config; - auto broadband_source = std::make_shared(100, 16, 30000, 20.0, signal); - - // Create StreamOut with explicit configuration - NodeConfig stream_out_config; - auto* stream_out_proto = stream_out_config.mutable_stream_out(); - auto* udp_config = stream_out_proto->mutable_udp_unicast(); - udp_config->set_destination_port(StreamOut::DEFAULT_STREAM_OUT_PORT); - stream_out_proto->set_label("Broadband Stream"); - - std::shared_ptr stream_out_node; - s = StreamOut::from_proto(stream_out_config, &stream_out_node); - if (!s.ok()) return s; - - *stream_out_ptr = std::dynamic_pointer_cast(stream_out_node); - if (!*stream_out_ptr) { - return { science::StatusCode::kInternal, "failed to cast stream out node" }; - } - - s = config.add_node(broadband_source); - if (!s.ok()) return s; - - s = config.add_node(*stream_out_ptr); - if (!s.ok()) return s; - - s = config.connect(broadband_source, *stream_out_ptr); - if (!s.ok()) return s; - - s = device.configure(&config); - if (!s.ok()) return s; - - std::cout << "Configured device" << std::endl; - - s = device.start(); - std::cout << "Started device" << std::endl; - - return s; -} - -auto stream_existing(Device& device, std::shared_ptr* stream_out_ptr) -> science::Status { - if (stream_out_ptr == nullptr) { - return { science::StatusCode::kInvalidArgument, "stream out pointer is null" }; - } - - science::Status s; - DeviceInfo info; - s = device.info(&info); - if (!s.ok()) return s; - - uint32_t stream_out_id = 0; - NodeConfig stream_out_config; - const auto& nodes = info.configuration().nodes(); - for (const auto& node : nodes) { - if (node.type() == NodeType::kStreamOut) { - stream_out_id = node.id(); - stream_out_config = node; - break; - } - } - - if (stream_out_id == 0) { - return { science::StatusCode::kNotFound, "no stream out node found" }; - } - - std::shared_ptr stream_out_node; - s = StreamOut::from_proto(stream_out_config, &stream_out_node); - if (!s.ok()) return s; - - *stream_out_ptr = std::dynamic_pointer_cast(stream_out_node); - if (!*stream_out_ptr) { - return { science::StatusCode::kInternal, "failed to cast stream out node" }; - } - - std::cout << "found stream out node with id " << stream_out_id << std::endl; - - Config config; - s = config.add_node(*stream_out_ptr, stream_out_id); - if (!s.ok()) return s; - - s = config.set_device(&device); - if (!s.ok()) return s; - - return s; -} - -auto stream(const std::string& uri, bool configure) -> int { - synapse::Device device(uri); - science::Status s; - - std::shared_ptr stream_out; - if (configure) { - s = stream_new(device, &stream_out); - if (!s.ok()) { - std::cout << "error configuring stream out node: (" - << static_cast(s.code()) << ") " << s.message() << std::endl; - return 1; - } - } else { - s = stream_existing(device, &stream_out); - if (!s.ok()) { - std::cout << "error getting existing stream out node: (" - << static_cast(s.code()) << ") " << s.message() << std::endl; - return 1; - } - } - - if (stream_out == nullptr) { - std::cout << "stream out node not initialized" << std::endl; - return 1; - } - - std::cout << "Reading..." << std::endl; - while (true) { - SynapseData out; - s = stream_out->read(&out); - if (s.code() == science::StatusCode::kUnavailable) { - continue; - } - - if (!s.ok()) { - std::cout << "error reading from stream out node: (" - << static_cast(s.code()) << ") " << s.message() << std::endl; - continue; - } - - if (std::holds_alternative(out)) { - auto data = std::get(out); - std::cout << "recv electrical broadband data" << std::endl; - std::cout << " - t0: " << data.t0 << std::endl; - std::cout << " - bit_width: " << data.bit_width << std::endl; - std::cout << " - is_signed: " << data.is_signed << std::endl; - std::cout << " - sample_rate: " << data.sample_rate << std::endl; - std::cout << " - n_channels: " << data.channels.size() << std::endl; - - for (const auto& c : data.channels) { - size_t n_samples = c.channel_data.size(); - std::cout << " - channel [" << c.channel_id << "]" << std::endl; - - for (size_t i = 0; i < std::min(n_samples, size_t(10)); i++) { - std::cout << " - sample [" << i << "]: " << c.channel_data[i] << std::endl; - } - } - - } else if (std::holds_alternative(out)) { - auto data = std::get(out); - size_t n_spiking_channels = 0; - std::stringstream ss; - for (size_t i = 0; i < data.spike_counts.size(); i++) { - if (data.spike_counts[i] > 0) { - if (n_spiking_channels > 0) { - ss << ", "; - } - ss << "[" << i << "]: " << static_cast(data.spike_counts[i]); - n_spiking_channels++; - } - } - if (n_spiking_channels > 0) { - std::cout << " - received " << n_spiking_channels << " spiking channels:" << std::endl; - std::cout << ss.str() << std::endl; - } - } else { - std::cout << "received data of unknown type" << std::endl; - } - } - - return 0; -} - -int main(int argc, char** argv) { - if (argc != 2 && argc != 3) { - std::cout << "Usage: " << argv[0] << " [--config]" << std::endl; - std::cout << " uri: device URI (e.g., 192.168.0.1:647)" << std::endl; - std::cout << " --config: optional flag to configure a new stream" << std::endl; - std::cout << " if omitted, uses existing stream" << std::endl; - return 1; - } - - std::string uri = argv[1]; - bool configure = (argc == 3 && std::string(argv[2]) == "--config"); - - return stream(uri, configure); -} diff --git a/examples/tap/main.cpp b/examples/tap/main.cpp new file mode 100644 index 0000000..5df0e0b --- /dev/null +++ b/examples/tap/main.cpp @@ -0,0 +1,129 @@ +#include +#include +#include +#include + +#include "science/synapse/tap.h" +#include "science/synapse/api/datatype.pb.h" + +void print_usage(const char* program_name) { + std::cout << "Usage: " << program_name << " [tap_name]" << std::endl; + std::cout << " device_uri: The URI of the Synapse device (e.g., 192.168.1.100:647)" << std::endl; + std::cout << " tap_name: Optional tap name to connect to (if not provided, lists available taps)" << std::endl; +} + +void list_taps(synapse::Tap& tap) { + auto taps = tap.list_taps(); + + if (taps.empty()) { + std::cout << "No taps available on device" << std::endl; + return; + } + + std::cout << "Available taps:" << std::endl; + for (const auto& t : taps) { + std::cout << " - " << t.name() << std::endl; + std::cout << " endpoint: " << t.endpoint() << std::endl; + std::cout << " message_type: " << t.message_type() << std::endl; + std::cout << " type: "; + switch (t.tap_type()) { + case synapse::TapType::TAP_TYPE_PRODUCER: + std::cout << "producer (read data from device)" << std::endl; + break; + case synapse::TapType::TAP_TYPE_CONSUMER: + std::cout << "consumer (send data to device)" << std::endl; + break; + default: + std::cout << "unspecified" << std::endl; + } + } +} + +void stream_tap(synapse::Tap& tap, const std::string& tap_name) { + auto status = tap.connect(tap_name); + if (!status.ok()) { + std::cerr << "Failed to connect to tap: " << status.message() << std::endl; + return; + } + + auto connected = tap.connected_tap(); + if (!connected) { + std::cerr << "Not connected" << std::endl; + return; + } + + std::cout << "Connected to tap: " << connected->name() << std::endl; + std::cout << "Message type: " << connected->message_type() << std::endl; + + // Check if this is a producer tap (we read from it) + if (connected->tap_type() == synapse::TapType::TAP_TYPE_CONSUMER) { + std::cerr << "This is a consumer tap. This example only reads from producer taps." << std::endl; + tap.disconnect(); + return; + } + + std::cout << "Streaming data (Ctrl+C to stop)..." << std::endl; + + uint64_t message_count = 0; + auto start_time = std::chrono::steady_clock::now(); + + while (true) { + std::vector data; + auto status = tap.read(&data, 1000); + + if (!status.ok()) { + if (status.code() == science::StatusCode::kDeadlineExceeded) { + // Timeout, just continue + continue; + } + std::cerr << "Error reading: " << status.message() << std::endl; + break; + } + + message_count++; + + // Print stats every 1000 messages + if (message_count % 1000 == 0) { + auto now = std::chrono::steady_clock::now(); + auto elapsed = std::chrono::duration_cast(now - start_time).count(); + double rate = (elapsed > 0) ? static_cast(message_count) / elapsed : 0; + std::cout << "Received " << message_count << " messages (" << rate << " msg/s), " + << "last message size: " << data.size() << " bytes" << std::endl; + } + + // Try to parse as BroadbandFrame if the message type matches + if (connected->message_type() == "synapse.BroadbandFrame" && message_count == 1) { + synapse::BroadbandFrame frame; + if (frame.ParseFromArray(data.data(), static_cast(data.size()))) { + std::cout << "First BroadbandFrame: timestamp=" << frame.timestamp_ns() + << ", seq=" << frame.sequence_number() + << ", samples=" << frame.frame_data_size() + << ", sample_rate=" << frame.sample_rate_hz() << " Hz" << std::endl; + } + } + } + + tap.disconnect(); + std::cout << "Disconnected. Total messages: " << message_count << std::endl; +} + +int main(int argc, char* argv[]) { + if (argc < 2) { + print_usage(argv[0]); + return 1; + } + + std::string device_uri = argv[1]; + synapse::Tap tap(device_uri); + + if (argc == 2) { + // Just list taps + list_taps(tap); + } else { + // Connect to specified tap and stream + std::string tap_name = argv[2]; + stream_tap(tap, tap_name); + } + + return 0; +} diff --git a/external/sciencecorp/synapse-api b/external/sciencecorp/synapse-api index 22f14a2..2b6307a 160000 --- a/external/sciencecorp/synapse-api +++ b/external/sciencecorp/synapse-api @@ -1 +1 @@ -Subproject commit 22f14a204e007fbbb7695aea84c344b41f5b47dc +Subproject commit 2b6307a97b6f612a17646de076743fc52aa518a9 diff --git a/include/science/synapse/data.h b/include/science/synapse/data.h deleted file mode 100644 index ea668b2..0000000 --- a/include/science/synapse/data.h +++ /dev/null @@ -1,10 +0,0 @@ -#pragma once -#include "science/libndtp/types.h" - -namespace synapse { - -using SynapseData = science::libndtp::SynapseData; -using BinnedSpiketrainData = science::libndtp::BinnedSpiketrainData; -using ElectricalBroadbandData = science::libndtp::ElectricalBroadbandData; - -} diff --git a/include/science/synapse/device.h b/include/science/synapse/device.h index 676fb96..2beb810 100644 --- a/include/science/synapse/device.h +++ b/include/science/synapse/device.h @@ -6,7 +6,11 @@ #include #include +#include "science/synapse/api/app.pb.h" +#include "science/synapse/api/device.pb.h" +#include "science/synapse/api/logging.pb.h" #include "science/synapse/api/node.pb.h" +#include "science/synapse/api/query.pb.h" #include "science/synapse/api/synapse.pb.h" #include "science/synapse/api/synapse.grpc.pb.h" #include "science/synapse/config.h" @@ -21,7 +25,6 @@ class IDevice { virtual auto info(synapse::DeviceInfo* info, std::optional timeout = std::nullopt) -> science::Status = 0; virtual auto start(std::optional timeout = std::nullopt) -> science::Status = 0; virtual auto stop(std::optional timeout = std::nullopt) -> science::Status = 0; - virtual auto sockets() const -> const std::vector& = 0; virtual auto uri() const -> const std::string& = 0; }; @@ -53,16 +56,55 @@ class Device : public IDevice { [[nodiscard]] auto stop(std::optional timeout = std::nullopt) -> science::Status; /** - * List the node sockets configured on the device. - * - * @return std::vector + * Execute a query on the device. + * + * @param request The query request. + * @param response Output parameter for the query response. + * @param timeout Optional timeout for the query. + * @return Status indicating success or failure. */ - [[nodiscard]] auto sockets() const -> const std::vector&; + [[nodiscard]] auto query(const synapse::QueryRequest& request, + synapse::QueryResponse* response, + std::optional timeout = std::nullopt) -> science::Status; /** - * List the node sockets configured on the device. - * - * @return std::vector + * Get device logs. + * + * @param request Log query parameters (time range, level filter). + * @param response Output parameter for log entries. + * @param timeout Optional timeout. + * @return Status indicating success or failure. + */ + [[nodiscard]] auto get_logs(const synapse::LogQueryRequest& request, + synapse::LogQueryResponse* response, + std::optional timeout = std::nullopt) -> science::Status; + + /** + * Update device settings. + * + * @param request Settings to update. + * @param response Output parameter for updated settings. + * @param timeout Optional timeout. + * @return Status indicating success or failure. + */ + [[nodiscard]] auto update_settings(const synapse::UpdateDeviceSettingsRequest& request, + synapse::UpdateDeviceSettingsResponse* response, + std::optional timeout = std::nullopt) -> science::Status; + + /** + * List installed applications on the device. + * + * @param response Output parameter for list of apps. + * @param timeout Optional timeout. + * @return Status indicating success or failure. + */ + [[nodiscard]] auto list_apps(synapse::ListAppsResponse* response, + std::optional timeout = std::nullopt) -> science::Status; + + /** + * Get the device URI. + * + * @return The device URI string. */ [[nodiscard]] auto uri() const -> const std::string&; @@ -71,8 +113,6 @@ class Device : public IDevice { std::shared_ptr channel_; std::unique_ptr rpc_; - std::vector sockets_; - [[nodiscard]] auto handle_status_response(const synapse::Status& status) -> science::Status; }; diff --git a/include/science/synapse/nodes/disk_writer.h b/include/science/synapse/nodes/disk_writer.h new file mode 100644 index 0000000..bd12eea --- /dev/null +++ b/include/science/synapse/nodes/disk_writer.h @@ -0,0 +1,26 @@ +#pragma once + +#include +#include +#include "science/synapse/api/nodes/disk_writer.pb.h" +#include "science/synapse/node.h" + +namespace synapse { + +class DiskWriter : public Node { + public: + explicit DiskWriter(const std::string& filename); + + [[nodiscard]] static auto from_proto( + const synapse::NodeConfig& proto, + std::shared_ptr* node + ) -> science::Status; + + protected: + auto p_to_proto(synapse::NodeConfig* proto) -> science::Status override; + + private: + std::string filename_; +}; + +} // namespace synapse diff --git a/include/science/synapse/nodes/spike_binner.h b/include/science/synapse/nodes/spike_binner.h new file mode 100644 index 0000000..541bd30 --- /dev/null +++ b/include/science/synapse/nodes/spike_binner.h @@ -0,0 +1,25 @@ +#pragma once + +#include +#include "science/synapse/api/nodes/spike_binner.pb.h" +#include "science/synapse/node.h" + +namespace synapse { + +class SpikeBinner : public Node { + public: + explicit SpikeBinner(uint32_t bin_size_ms); + + [[nodiscard]] static auto from_proto( + const synapse::NodeConfig& proto, + std::shared_ptr* node + ) -> science::Status; + + protected: + auto p_to_proto(synapse::NodeConfig* proto) -> science::Status override; + + private: + uint32_t bin_size_ms_; +}; + +} // namespace synapse diff --git a/include/science/synapse/nodes/spike_detect.h b/include/science/synapse/nodes/spike_detect.h deleted file mode 100644 index bc2770a..0000000 --- a/include/science/synapse/nodes/spike_detect.h +++ /dev/null @@ -1,37 +0,0 @@ -#pragma once - -#include -#include "science/synapse/api/nodes/spike_detect.pb.h" -#include "science/synapse/channel_mask.h" -#include "science/synapse/node.h" - -namespace synapse { - -class SpikeDetect : public Node { - public: - explicit SpikeDetect( - const synapse::SpikeDetectConfig::SpikeDetectMode& mode, - uint32_t threshold_uv, - const ChannelMask& template_uv, - bool sort, - uint32_t bin_size_ms - ); - - [[nodiscard]] static auto from_proto( - const synapse::NodeConfig& proto, - std::shared_ptr* node - ) -> science::Status; - - - protected: - auto p_to_proto(synapse::NodeConfig* proto) -> science::Status override; - - private: - synapse::SpikeDetectConfig::SpikeDetectMode mode_; - uint32_t threshold_uv_; - ChannelMask template_uv_; - bool sort_; - uint32_t bin_size_ms_; -}; - -} // namespace synapse diff --git a/include/science/synapse/nodes/spike_detector.h b/include/science/synapse/nodes/spike_detector.h new file mode 100644 index 0000000..576affd --- /dev/null +++ b/include/science/synapse/nodes/spike_detector.h @@ -0,0 +1,38 @@ +#pragma once + +#include +#include +#include "science/synapse/api/nodes/spike_detector.pb.h" +#include "science/synapse/node.h" + +namespace synapse { + +class SpikeDetector : public Node { + public: + // Create a thresholder-based spike detector + static auto create_thresholder(uint32_t threshold_uv, uint32_t samples_per_spike) + -> std::shared_ptr; + + // Create a template matcher-based spike detector + static auto create_template_matcher(const std::vector& template_uv, uint32_t samples_per_spike) + -> std::shared_ptr; + + [[nodiscard]] static auto from_proto( + const synapse::NodeConfig& proto, + std::shared_ptr* node + ) -> science::Status; + + protected: + auto p_to_proto(synapse::NodeConfig* proto) -> science::Status override; + + private: + SpikeDetector(); + + enum class Mode { Thresholder, TemplateMatcher }; + Mode mode_; + uint32_t threshold_uv_; + std::vector template_uv_; + uint32_t samples_per_spike_; +}; + +} // namespace synapse diff --git a/include/science/synapse/nodes/stream_in.h b/include/science/synapse/nodes/stream_in.h deleted file mode 100644 index 11bf607..0000000 --- a/include/science/synapse/nodes/stream_in.h +++ /dev/null @@ -1,34 +0,0 @@ -#pragma once - -#include -#include -#include -#include - -#include "science/scipp/status.h" -#include "science/synapse/api/nodes/stream_in.pb.h" -#include "science/synapse/nodes/udp_node.h" - -namespace synapse { - -class StreamIn : public UdpNode { - public: - explicit StreamIn(const synapse::DataType& data_type, const std::vector& shape); - - auto write(const std::vector& in) -> science::Status; - - [[nodiscard]] static auto from_proto( - const synapse::NodeConfig& proto, - std::shared_ptr* node - ) -> science::Status; - - protected: - auto p_to_proto(synapse::NodeConfig* proto) -> science::Status override; - - private: - const synapse::DataType data_type_; - const std::vector shape_; - auto init() -> science::Status; -}; - -} // namespace synapse diff --git a/include/science/synapse/nodes/stream_out.h b/include/science/synapse/nodes/stream_out.h deleted file mode 100644 index 6df0121..0000000 --- a/include/science/synapse/nodes/stream_out.h +++ /dev/null @@ -1,44 +0,0 @@ -#pragma once - -#include -#include -#include -#include - -#include "science/libndtp/types.h" -#include "science/scipp/status.h" -#include "science/synapse/api/nodes/stream_out.pb.h" -#include "science/synapse/node.h" - -namespace synapse { - -class StreamOut : public Node { - public: - static constexpr uint16_t DEFAULT_STREAM_OUT_PORT = 50038; - StreamOut(const std::string& destination_address = "", - uint16_t destination_port = DEFAULT_STREAM_OUT_PORT, - const std::string& label = ""); - ~StreamOut(); - - auto init() -> science::Status; - auto read(science::libndtp::SynapseData* out, science::libndtp::NDTPHeader* header = nullptr, size_t* bytes_read = nullptr) -> science::Status; - - [[nodiscard]] static auto from_proto( - const synapse::NodeConfig& proto, - std::shared_ptr* node - ) -> science::Status; - - protected: - auto p_to_proto(synapse::NodeConfig* proto) -> science::Status override; - - private: - std::string destination_address_; - uint16_t destination_port_; - std::string label_; - int socket_ = 0; - std::optional addr_; - - static constexpr uint32_t SOCKET_BUFSIZE_BYTES = 5 * 1024 * 1024; // 5MB -}; - -} // namespace synapse diff --git a/include/science/synapse/nodes/udp_node.h b/include/science/synapse/nodes/udp_node.h deleted file mode 100644 index 1a6e166..0000000 --- a/include/science/synapse/nodes/udp_node.h +++ /dev/null @@ -1,30 +0,0 @@ -#pragma once - -#include -#include -#include - -#include "science/scipp/status.h" -#include "science/synapse/node.h" - -namespace synapse { - -class UdpNode : public Node { - public: - explicit UdpNode(const synapse::NodeType& node_type); - virtual ~UdpNode(); - - protected: - auto addr() const -> std::optional; - auto init() -> science::Status; - auto sock() const -> int; - virtual auto get_host(std::string* host) -> science::Status; - - private: - int socket_ = 0; - std::optional addr_; - - auto get_port(uint16_t* port) -> science::Status; -}; - -} // namespace synapse diff --git a/include/science/synapse/tap.h b/include/science/synapse/tap.h new file mode 100644 index 0000000..03e20c8 --- /dev/null +++ b/include/science/synapse/tap.h @@ -0,0 +1,115 @@ +#pragma once + +#include +#include +#include +#include + +#include +#include "science/scipp/status.h" +#include "science/synapse/api/tap.pb.h" + +namespace synapse { + +/** + * A client for connecting to Synapse device taps. + * + * Taps provide high-throughput data streaming to/from a Synapse device + * using ZeroMQ. Producer taps emit data (e.g., broadband neural data), + * while consumer taps accept data (e.g., stimulation commands). + */ +class Tap { + public: + /** + * Create a Tap client for a device. + * + * @param device_uri The URI of the Synapse device (e.g., "192.168.1.100:647") + */ + explicit Tap(const std::string& device_uri); + ~Tap(); + + // Prevent copying + Tap(const Tap&) = delete; + Tap& operator=(const Tap&) = delete; + + // Allow moving + Tap(Tap&&) noexcept; + Tap& operator=(Tap&&) noexcept; + + /** + * Query available taps from the device. + * + * @return Vector of TapConnection objects describing available taps. + */ + [[nodiscard]] auto list_taps() -> std::vector; + + /** + * Connect to a tap by name. + * + * @param tap_name The name of the tap to connect to. + * @return Status indicating success or failure. + */ + [[nodiscard]] auto connect(const std::string& tap_name) -> science::Status; + + /** + * Disconnect from the current tap. + */ + void disconnect(); + + /** + * Check if currently connected to a tap. + * + * @return true if connected, false otherwise. + */ + [[nodiscard]] auto is_connected() const -> bool; + + /** + * Get the currently connected tap info. + * + * @return The connected TapConnection, or nullopt if not connected. + */ + [[nodiscard]] auto connected_tap() const -> std::optional; + + /** + * Read a single message from the tap (blocking with timeout). + * + * Only valid for producer taps (TAP_TYPE_PRODUCER). + * + * @param out Output buffer for the received data. + * @param timeout_ms Timeout in milliseconds (default: 1000). + * @return Status indicating success, timeout, or error. + */ + [[nodiscard]] auto read(std::vector* out, int timeout_ms = 1000) -> science::Status; + + /** + * Send data to the tap. + * + * Only valid for consumer taps (TAP_TYPE_CONSUMER). + * + * @param data The data to send. + * @return Status indicating success or failure. + */ + [[nodiscard]] auto send(const std::vector& data) -> science::Status; + + /** + * Read multiple messages in a batch (non-blocking). + * + * @param out Output vector for received messages. + * @param max_messages Maximum number of messages to read. + * @param timeout_ms Timeout for the entire batch operation. + * @return Number of messages read. + */ + [[nodiscard]] auto read_batch(std::vector>* out, + size_t max_messages, + int timeout_ms = 100) -> size_t; + + private: + std::string device_uri_; + std::unique_ptr zmq_context_; + std::unique_ptr zmq_socket_; + std::optional connected_tap_; + + void cleanup(); +}; + +} // namespace synapse diff --git a/include/science/synapse/util/discover.h b/include/science/synapse/util/discover.h index 732cebf..7d3098e 100644 --- a/include/science/synapse/util/discover.h +++ b/include/science/synapse/util/discover.h @@ -1,5 +1,6 @@ #pragma once +#include #include #include @@ -8,20 +9,42 @@ namespace synapse { - /** * Discover Synapse devices on the network. - * - * Sends a DISCOVERY request to the network and waits for responses. - * - * @param timeout_ms The timeout in milliseconds. Optional. - * @param discovered A list of discovered devices to populate. Optional. - * @return science::Status + * + * Listens for device broadcast announcements on UDP port 6470. + * Devices periodically broadcast "ID serial capability port name" messages. + * + * @param timeout_ms The timeout in milliseconds (default: 10000ms). + * @param discovered A vector to populate with discovered devices. Optional. + * @return science::Status indicating success or failure. */ -auto discover(unsigned int timeout_ms = 3000, +auto discover(unsigned int timeout_ms = 10000, std::vector* discovered = nullptr) -> science::Status; +/** + * Discover devices with a callback for each device found. + * + * This allows processing devices as they're discovered rather than + * waiting for the full timeout. + * + * @param callback Called for each device discovered. Return false to stop discovery. + * @param timeout_ms The timeout in milliseconds (default: 10000ms). + * @return science::Status indicating success or failure. + */ +auto discover_iter(std::function callback, + unsigned int timeout_ms = 10000) -> science::Status; + +/** + * Parse a device advertisement message. + * + * @param host The IP address of the sender. + * @param payload The parsed tokens from the message. + * @param parsed Output parameter for the parsed advertisement. + * @return science::Status indicating success or failure. + */ auto parse(const std::string& host, const std::vector& payload, DeviceAdvertisement* parsed) -> science::Status; -} + +} // namespace synapse diff --git a/include/science/synapse/version.h.in b/include/science/synapse/version.h.in index 227e45f..a5acdaf 100644 --- a/include/science/synapse/version.h.in +++ b/include/science/synapse/version.h.in @@ -3,6 +3,6 @@ namespace synapse { -const std::string SYNAPSE_VERSION = "@CMAKE_PROJECT_VERSION@"; +const std::string SYNAPSE_VERSION = "@SYNAPSE_CPP_VERSION_FULL@"; } diff --git a/src/science/synapse/config.cpp b/src/science/synapse/config.cpp index 9413ab2..085b38d 100644 --- a/src/science/synapse/config.cpp +++ b/src/science/synapse/config.cpp @@ -1,12 +1,12 @@ #include "science/synapse/config.h" #include "science/synapse/nodes/broadband_source.h" +#include "science/synapse/nodes/disk_writer.h" #include "science/synapse/nodes/electrical_stimulation.h" #include "science/synapse/nodes/optical_stimulation.h" -#include "science/synapse/nodes/spike_detect.h" -#include "science/synapse/nodes/spike_source.h" #include "science/synapse/nodes/spectral_filter.h" -#include "science/synapse/nodes/stream_in.h" -#include "science/synapse/nodes/stream_out.h" +#include "science/synapse/nodes/spike_binner.h" +#include "science/synapse/nodes/spike_detector.h" +#include "science/synapse/nodes/spike_source.h" namespace synapse { @@ -24,26 +24,26 @@ auto create_node(const synapse::NodeConfig& config, std::shared_ptr* node_ case synapse::NodeType::kBroadbandSource: return BroadbandSource::from_proto(config, node_ptr); + case synapse::NodeType::kDiskWriter: + return DiskWriter::from_proto(config, node_ptr); + case synapse::NodeType::kElectricalStimulation: return ElectricalStimulation::from_proto(config, node_ptr); case synapse::NodeType::kOpticalStimulation: return OpticalStimulation::from_proto(config, node_ptr); - case synapse::NodeType::kSpikeDetect: - return SpikeDetect::from_proto(config, node_ptr); - - case synapse::NodeType::kSpikeSource: - return SpikeSource::from_proto(config, node_ptr); - case synapse::NodeType::kSpectralFilter: return SpectralFilter::from_proto(config, node_ptr); - case synapse::NodeType::kStreamIn: - return StreamIn::from_proto(config, node_ptr); + case synapse::NodeType::kSpikeBinner: + return SpikeBinner::from_proto(config, node_ptr); + + case synapse::NodeType::kSpikeDetector: + return SpikeDetector::from_proto(config, node_ptr); - case synapse::NodeType::kStreamOut: - return StreamOut::from_proto(config, node_ptr); + case synapse::NodeType::kSpikeSource: + return SpikeSource::from_proto(config, node_ptr); default: return { science::StatusCode::kInvalidArgument, "unknown node type \"" + std::to_string(config.type()) + "\"" }; diff --git a/src/science/synapse/device.cpp b/src/science/synapse/device.cpp index 310bb4d..c9a94de 100644 --- a/src/science/synapse/device.cpp +++ b/src/science/synapse/device.cpp @@ -151,8 +151,168 @@ auto Device::stop(std::optional timeout) -> science:: return status; } -auto Device::sockets() const -> const std::vector& { - return sockets_; +auto Device::query(const synapse::QueryRequest& request, + synapse::QueryResponse* response, + std::optional timeout) -> science::Status { + if (response == nullptr) { + return {science::StatusCode::kInvalidArgument, "response must not be null"}; + } + + grpc::ClientContext context; + if (timeout) { + context.set_deadline(std::chrono::system_clock::now() + *timeout); + } + + synapse::QueryResponse res; + science::Status status; + bool done = false; + std::mutex m; + std::condition_variable cv; + + rpc_->async()->Query( + &context, &request, &res, + [&res, &status, &done, &m, &cv, response](grpc::Status gstatus) { + science::Status s; + if (!gstatus.ok()) { + s = {static_cast(gstatus.error_code()), gstatus.error_message()}; + } else { + if (res.status().code() != synapse::StatusCode::kOk) { + s = {science::StatusCode::kInternal, + "(code: " + std::to_string(res.status().code()) + "): " + res.status().message()}; + } + response->CopyFrom(res); + } + + std::lock_guard lock(m); + done = true; + status = s; + cv.notify_one(); + }); + + std::unique_lock lock(m); + cv.wait(lock, [&done] { return done; }); + return status; +} + +auto Device::get_logs(const synapse::LogQueryRequest& request, + synapse::LogQueryResponse* response, + std::optional timeout) -> science::Status { + if (response == nullptr) { + return {science::StatusCode::kInvalidArgument, "response must not be null"}; + } + + grpc::ClientContext context; + if (timeout) { + context.set_deadline(std::chrono::system_clock::now() + *timeout); + } + + synapse::LogQueryResponse res; + science::Status status; + bool done = false; + std::mutex m; + std::condition_variable cv; + + rpc_->async()->GetLogs( + &context, &request, &res, + [&res, &status, &done, &m, &cv, response](grpc::Status gstatus) { + science::Status s; + if (!gstatus.ok()) { + s = {static_cast(gstatus.error_code()), gstatus.error_message()}; + } else { + response->CopyFrom(res); + } + + std::lock_guard lock(m); + done = true; + status = s; + cv.notify_one(); + }); + + std::unique_lock lock(m); + cv.wait(lock, [&done] { return done; }); + return status; +} + +auto Device::update_settings(const synapse::UpdateDeviceSettingsRequest& request, + synapse::UpdateDeviceSettingsResponse* response, + std::optional timeout) -> science::Status { + if (response == nullptr) { + return {science::StatusCode::kInvalidArgument, "response must not be null"}; + } + + grpc::ClientContext context; + if (timeout) { + context.set_deadline(std::chrono::system_clock::now() + *timeout); + } + + synapse::UpdateDeviceSettingsResponse res; + science::Status status; + bool done = false; + std::mutex m; + std::condition_variable cv; + + rpc_->async()->UpdateDeviceSettings( + &context, &request, &res, + [&res, &status, &done, &m, &cv, response](grpc::Status gstatus) { + science::Status s; + if (!gstatus.ok()) { + s = {static_cast(gstatus.error_code()), gstatus.error_message()}; + } else { + if (res.status().code() != synapse::StatusCode::kOk) { + s = {science::StatusCode::kInternal, + "(code: " + std::to_string(res.status().code()) + "): " + res.status().message()}; + } + response->CopyFrom(res); + } + + std::lock_guard lock(m); + done = true; + status = s; + cv.notify_one(); + }); + + std::unique_lock lock(m); + cv.wait(lock, [&done] { return done; }); + return status; +} + +auto Device::list_apps(synapse::ListAppsResponse* response, + std::optional timeout) -> science::Status { + if (response == nullptr) { + return {science::StatusCode::kInvalidArgument, "response must not be null"}; + } + + grpc::ClientContext context; + if (timeout) { + context.set_deadline(std::chrono::system_clock::now() + *timeout); + } + + synapse::ListAppsRequest req; + synapse::ListAppsResponse res; + science::Status status; + bool done = false; + std::mutex m; + std::condition_variable cv; + + rpc_->async()->ListApps( + &context, &req, &res, + [&res, &status, &done, &m, &cv, response](grpc::Status gstatus) { + science::Status s; + if (!gstatus.ok()) { + s = {static_cast(gstatus.error_code()), gstatus.error_message()}; + } else { + response->CopyFrom(res); + } + + std::lock_guard lock(m); + done = true; + status = s; + cv.notify_one(); + }); + + std::unique_lock lock(m); + cv.wait(lock, [&done] { return done; }); + return status; } auto Device::uri() const -> const std::string& { @@ -167,13 +327,6 @@ auto Device::handle_status_response(const synapse::Status& status) -> science::S }; } - sockets_.clear(); - for (const auto& socket : status.sockets()) { - synapse::NodeSocket s; - s.CopyFrom(socket); - sockets_.push_back(s); - } - return {}; } diff --git a/src/science/synapse/nodes/disk_writer.cpp b/src/science/synapse/nodes/disk_writer.cpp new file mode 100644 index 0000000..aa93bac --- /dev/null +++ b/src/science/synapse/nodes/disk_writer.cpp @@ -0,0 +1,33 @@ +#include "science/synapse/nodes/disk_writer.h" + +namespace synapse { + +DiskWriter::DiskWriter(const std::string& filename) + : Node(NodeType::kDiskWriter), + filename_(filename) {} + +auto DiskWriter::from_proto(const synapse::NodeConfig& proto, + std::shared_ptr* node) -> science::Status { + if (!proto.has_disk_writer()) { + return {science::StatusCode::kInvalidArgument, "missing disk_writer config"}; + } + + const auto& config = proto.disk_writer(); + + *node = std::make_shared(config.filename()); + + return {}; +} + +auto DiskWriter::p_to_proto(synapse::NodeConfig* proto) -> science::Status { + if (proto == nullptr) { + return {science::StatusCode::kInvalidArgument, "proto ptr must not be null"}; + } + + synapse::DiskWriterConfig* config = proto->mutable_disk_writer(); + config->set_filename(filename_); + + return {}; +} + +} // namespace synapse diff --git a/src/science/synapse/nodes/spike_binner.cpp b/src/science/synapse/nodes/spike_binner.cpp new file mode 100644 index 0000000..fe423f0 --- /dev/null +++ b/src/science/synapse/nodes/spike_binner.cpp @@ -0,0 +1,33 @@ +#include "science/synapse/nodes/spike_binner.h" + +namespace synapse { + +SpikeBinner::SpikeBinner(uint32_t bin_size_ms) + : Node(NodeType::kSpikeBinner), + bin_size_ms_(bin_size_ms) {} + +auto SpikeBinner::from_proto(const synapse::NodeConfig& proto, + std::shared_ptr* node) -> science::Status { + if (!proto.has_spike_binner()) { + return {science::StatusCode::kInvalidArgument, "missing spike_binner config"}; + } + + const auto& config = proto.spike_binner(); + + *node = std::make_shared(config.bin_size_ms()); + + return {}; +} + +auto SpikeBinner::p_to_proto(synapse::NodeConfig* proto) -> science::Status { + if (proto == nullptr) { + return {science::StatusCode::kInvalidArgument, "proto ptr must not be null"}; + } + + synapse::SpikeBinnerConfig* config = proto->mutable_spike_binner(); + config->set_bin_size_ms(bin_size_ms_); + + return {}; +} + +} // namespace synapse diff --git a/src/science/synapse/nodes/spike_detect.cpp b/src/science/synapse/nodes/spike_detect.cpp deleted file mode 100644 index 358db55..0000000 --- a/src/science/synapse/nodes/spike_detect.cpp +++ /dev/null @@ -1,60 +0,0 @@ -#include "science/synapse/nodes/spike_detect.h" - -namespace synapse { - -SpikeDetect::SpikeDetect( - const synapse::SpikeDetectConfig::SpikeDetectMode& mode, - uint32_t threshold_uv, - const ChannelMask& template_uv, - bool sort, - uint32_t bin_size_ms -) : Node(NodeType::kSpikeDetect), - mode_(mode), - threshold_uv_(threshold_uv), - template_uv_(template_uv), - sort_(sort), - bin_size_ms_(bin_size_ms) {} - -auto SpikeDetect::from_proto(const synapse::NodeConfig& proto, std::shared_ptr* node) -> science::Status { - if (!proto.has_spike_detect()) { - return { science::StatusCode::kInvalidArgument, "missing spike_detect config" }; - } - - const auto& config = proto.spike_detect(); - ChannelMask template_uv(std::vector(config.template_uv().begin(), config.template_uv().end())); - - *node = std::make_shared( - config.mode(), - config.threshold_uv(), - template_uv, - config.sort(), - config.bin_size_ms() - ); - - return {}; -} - -auto SpikeDetect::p_to_proto(synapse::NodeConfig* proto) -> science::Status { - if (proto == nullptr) { - return { science::StatusCode::kInvalidArgument, "proto ptr must not be null" }; - } - - synapse::SpikeDetectConfig* config = proto->mutable_spike_detect(); - - const auto& temp = template_uv_.channels(); - for (size_t i = 0; i < temp.size(); ++i) { - if (!temp[i]) { - continue; - } - config->add_template_uv(i); - } - - config->set_mode(mode_); - config->set_threshold_uv(threshold_uv_); - config->set_sort(sort_); - config->set_bin_size_ms(bin_size_ms_); - - return {}; -} - -} // namespace synapse diff --git a/src/science/synapse/nodes/spike_detector.cpp b/src/science/synapse/nodes/spike_detector.cpp new file mode 100644 index 0000000..36891b2 --- /dev/null +++ b/src/science/synapse/nodes/spike_detector.cpp @@ -0,0 +1,69 @@ +#include "science/synapse/nodes/spike_detector.h" + +namespace synapse { + +SpikeDetector::SpikeDetector() + : Node(NodeType::kSpikeDetector), + mode_(Mode::Thresholder), + threshold_uv_(0), + samples_per_spike_(0) {} + +auto SpikeDetector::create_thresholder(uint32_t threshold_uv, uint32_t samples_per_spike) + -> std::shared_ptr { + auto detector = std::shared_ptr(new SpikeDetector()); + detector->mode_ = Mode::Thresholder; + detector->threshold_uv_ = threshold_uv; + detector->samples_per_spike_ = samples_per_spike; + return detector; +} + +auto SpikeDetector::create_template_matcher(const std::vector& template_uv, uint32_t samples_per_spike) + -> std::shared_ptr { + auto detector = std::shared_ptr(new SpikeDetector()); + detector->mode_ = Mode::TemplateMatcher; + detector->template_uv_ = template_uv; + detector->samples_per_spike_ = samples_per_spike; + return detector; +} + +auto SpikeDetector::from_proto(const synapse::NodeConfig& proto, std::shared_ptr* node) -> science::Status { + if (!proto.has_spike_detector()) { + return { science::StatusCode::kInvalidArgument, "missing spike_detector config" }; + } + + const auto& config = proto.spike_detector(); + + if (config.has_thresholder()) { + *node = create_thresholder(config.thresholder().threshold_uv(), config.samples_per_spike()); + } else if (config.has_template_matcher()) { + const auto& tm = config.template_matcher(); + std::vector template_uv(tm.template_uv().begin(), tm.template_uv().end()); + *node = create_template_matcher(template_uv, config.samples_per_spike()); + } else { + return { science::StatusCode::kInvalidArgument, "spike_detector config must have thresholder or template_matcher" }; + } + + return {}; +} + +auto SpikeDetector::p_to_proto(synapse::NodeConfig* proto) -> science::Status { + if (proto == nullptr) { + return { science::StatusCode::kInvalidArgument, "proto ptr must not be null" }; + } + + synapse::SpikeDetectorConfig* config = proto->mutable_spike_detector(); + config->set_samples_per_spike(samples_per_spike_); + + if (mode_ == Mode::Thresholder) { + config->mutable_thresholder()->set_threshold_uv(threshold_uv_); + } else { + auto* tm = config->mutable_template_matcher(); + for (auto val : template_uv_) { + tm->add_template_uv(val); + } + } + + return {}; +} + +} // namespace synapse diff --git a/src/science/synapse/nodes/stream_in.cpp b/src/science/synapse/nodes/stream_in.cpp deleted file mode 100644 index 0e605ce..0000000 --- a/src/science/synapse/nodes/stream_in.cpp +++ /dev/null @@ -1,56 +0,0 @@ -#include "science/synapse/nodes/stream_in.h" - -#include -#include -#include -#include - -#include "science/synapse/device.h" - -namespace synapse { - -StreamIn::StreamIn(const synapse::DataType& data_type, const std::vector& shape) - : UdpNode(NodeType::kStreamIn), data_type_(data_type), shape_(shape) {} - -auto StreamIn::from_proto(const synapse::NodeConfig& proto, std::shared_ptr* node) -> science::Status { - if (!proto.has_stream_in()) { - return { science::StatusCode::kInvalidArgument, "missing stream_in config" }; - } - - const auto& config = proto.stream_in(); - auto data_type = config.data_type(); - auto shape = std::vector(config.shape().begin(), config.shape().end()); - *node = std::make_shared(data_type, shape); - return {}; -} - -auto StreamIn::write(const std::vector& in) -> science::Status { - if (!sock() || !addr()) { - auto s = UdpNode::init(); - if (!s.ok()) { - return { s.code(), "error initializing socket: " + s.message() }; - } - } - - const auto saddr = addr().value(); - sendto(sock(), in.data(), in.size(), 0, reinterpret_cast(&saddr), sizeof(saddr)); - - return {}; -} - -auto StreamIn::p_to_proto(synapse::NodeConfig* proto) -> science::Status { - if (proto == nullptr) { - return { science::StatusCode::kInvalidArgument, "proto ptr must not be null" }; - } - - synapse::StreamInConfig* config = proto->mutable_stream_in(); - - config->set_data_type(data_type_); - for (const auto& dim : shape_) { - config->add_shape(dim); - } - - return {}; -} - -} // namespace synapse diff --git a/src/science/synapse/nodes/stream_out.cpp b/src/science/synapse/nodes/stream_out.cpp deleted file mode 100644 index 0b73cae..0000000 --- a/src/science/synapse/nodes/stream_out.cpp +++ /dev/null @@ -1,253 +0,0 @@ -#include "science/synapse/nodes/stream_out.h" - -#include -#include -#include -#include -#include -#include - -#include "science/libndtp/ndtp.h" -#include "science/libndtp/types.h" -#include "science/synapse/device.h" - -namespace synapse { - -using science::libndtp::BinnedSpiketrainData; -using science::libndtp::ElectricalBroadbandData; -using science::libndtp::NDTPMessage; -using science::libndtp::SynapseData; - -const std::string LOCALHOST = "127.0.0.1"; - -static auto get_client_ip() -> std::string { - int sock = socket(AF_INET, SOCK_DGRAM, 0); - if (sock < 0) { - return LOCALHOST; - } - - struct sockaddr_in addr; - addr.sin_family = AF_INET; - addr.sin_port = htons(80); - addr.sin_addr.s_addr = inet_addr("8.8.8.8"); - - if (connect(sock, (struct sockaddr*)&addr, sizeof(addr)) < 0) { - close(sock); - return LOCALHOST; - } - - struct sockaddr_in local_addr; - socklen_t len = sizeof(local_addr); - if (getsockname(sock, (struct sockaddr*)&local_addr, &len) < 0) { - close(sock); - return LOCALHOST; - } - - close(sock); - return inet_ntoa(local_addr.sin_addr); -} - -static auto sockaddr(const std::string& host, uint16_t port) -> sockaddr_in { - sockaddr_in addr; - addr.sin_family = AF_INET; - addr.sin_port = htons(port); - inet_pton(AF_INET, host.c_str(), &addr.sin_addr); - return addr; -} - -auto unpack( - const std::vector& bytes, - SynapseData* data, - science::libndtp::NDTPHeader* header, - size_t* bytes_read -) -> science::Status { - NDTPMessage msg; - try { - msg = NDTPMessage::unpack(bytes); - } catch (const std::exception& e) { - return { science::StatusCode::kInternal, "error unpacking NDTP message: " + std::string(e.what()) }; - } - - if (header != nullptr) { - *header = msg.header; - } - if (bytes_read != nullptr) { - *bytes_read = bytes.size(); - } - - switch (msg.header.data_type) { - case DataType::kBroadband: - *data = ElectricalBroadbandData::unpack(msg); - break; - case DataType::kSpiketrain: - *data = BinnedSpiketrainData::unpack(msg); - break; - default: - return { science::StatusCode::kInternal, "unknown data type: " + std::to_string(msg.header.data_type) }; - } - - return {}; -} - -StreamOut::StreamOut(const std::string& destination_address, - uint16_t destination_port, - const std::string& label) - : Node(NodeType::kStreamOut), - destination_address_(destination_address.empty() ? get_client_ip() : destination_address), - destination_port_(destination_port ? destination_port : DEFAULT_STREAM_OUT_PORT), - label_(label) {} - -StreamOut::~StreamOut() { - if (socket_ > 0) { - close(socket_); - } -} - -auto StreamOut::init() -> science::Status { - socket_ = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP); - if (socket_ < 0) { - return { science::StatusCode::kInternal, "error creating socket (code: " + std::to_string(socket_) + ")" }; - } - - // Allow reuse for easy restart - int reuse = 1; - auto rc = setsockopt(socket_, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse)); - if (rc < 0) { - return { science::StatusCode::kInternal, "error configuring SO_REUSEADDR (code: " + std::to_string(rc) + ")" }; - } - - #ifdef SO_REUSEPORT - rc = setsockopt(socket_, SOL_SOCKET, SO_REUSEPORT, &reuse, sizeof(reuse)); - if (rc < 0) { - return { science::StatusCode::kInternal, "error configuring SO_REUSEPORT (code: " + std::to_string(rc) + ")" }; - } - #endif - - // Set non-blocking mode - int flags = fcntl(socket_, F_GETFL, 0); - if (flags < 0) { - return { - science::StatusCode::kInternal, - "error getting socket flags (code: " + std::to_string(flags) + ")" - }; - } - rc = fcntl(socket_, F_SETFL, flags | O_NONBLOCK); - if (rc < 0) { - return { - science::StatusCode::kInternal, - "error setting non-blocking mode (code: " + std::to_string(rc) + ")" - }; - } - - // Try to set a large recv buffer - int bufsize = SOCKET_BUFSIZE_BYTES; - rc = setsockopt(socket_, SOL_SOCKET, SO_RCVBUF, &bufsize, sizeof(bufsize)); - if (rc < 0) { - // continue - } - - int actual_bufsize; - socklen_t size = sizeof(actual_bufsize); - rc = getsockopt(socket_, SOL_SOCKET, SO_RCVBUF, &actual_bufsize, &size); - if (rc == 0 && actual_bufsize < SOCKET_BUFSIZE_BYTES) { - // continue - } - - addr_ = sockaddr(destination_address_, destination_port_); - - rc = bind(socket_, reinterpret_cast(&addr_.value()), sizeof(addr_.value())); - if (rc < 0) { - return { science::StatusCode::kInternal, - "error binding socket to " + destination_address_ + ":" + std::to_string(destination_port_) + - " (code: " + std::to_string(rc) + ", errno: " + std::to_string(errno) + ")" }; - } - - return {}; -} - -auto StreamOut::read(SynapseData* data, science::libndtp::NDTPHeader* header, size_t* bytes_read) -> science::Status { - if (!socket_ || !addr_) { - auto s = init(); - if (!s.ok()) { - return { s.code(), "error initializing socket: " + s.message() }; - } - } - - auto saddr = addr_.value(); - socklen_t saddr_len = sizeof(saddr); - - fd_set readfds; - struct timeval tv; - tv.tv_sec = 0; - tv.tv_usec = 1000; - - FD_ZERO(&readfds); - FD_SET(socket_, &readfds); - - int ready = select(socket_ + 1, &readfds, nullptr, nullptr, &tv); - if (ready < 0) { - return { science::StatusCode::kInternal, "error waiting for data: " + std::string(strerror(errno)) }; - } - if (ready == 0) { - return { science::StatusCode::kUnavailable, "no data available" }; - } - - std::vector buf; - buf.resize(8192); - auto rc = recvfrom(socket_, buf.data(), buf.size(), 0, reinterpret_cast(&saddr), &saddr_len); - if (rc < 0) { - if (errno == EAGAIN || errno == EWOULDBLOCK) { - return { science::StatusCode::kUnavailable, "no data available" }; - } - buf.resize(0); - return { science::StatusCode::kInternal, "error reading from socket (code: " + std::to_string(rc) + ")" }; - } - - buf.resize(rc); - return unpack(buf, data, header, bytes_read); -} - -auto StreamOut::from_proto(const synapse::NodeConfig& proto, std::shared_ptr* node) -> science::Status { - if (!proto.has_stream_out()) { - return { science::StatusCode::kInvalidArgument, "missing stream_out config" }; - } - - const auto& config = proto.stream_out(); - const auto& label = config.label(); - - if (!config.has_udp_unicast()) { - // Use defaults - *node = std::make_shared("", DEFAULT_STREAM_OUT_PORT, label); - return {}; - } - - const auto& unicast = config.udp_unicast(); - std::string dest_addr = unicast.destination_address(); - uint16_t dest_port = unicast.destination_port(); - - if (dest_addr.empty()) { - dest_addr = get_client_ip(); - } - if (dest_port == 0) { - dest_port = DEFAULT_STREAM_OUT_PORT; - } - - *node = std::make_shared(dest_addr, dest_port, label); - return {}; -} - -auto StreamOut::p_to_proto(synapse::NodeConfig* proto) -> science::Status { - if (proto == nullptr) { - return { science::StatusCode::kInvalidArgument, "proto ptr must not be null" }; - } - - synapse::StreamOutConfig* config = proto->mutable_stream_out(); - synapse::UDPUnicastConfig* unicast = config->mutable_udp_unicast(); - unicast->set_destination_address(destination_address_.empty() ? LOCALHOST : destination_address_); - unicast->set_destination_port(destination_port_); - config->set_label(label_); - - return {}; -} - -} // namespace synapse diff --git a/src/science/synapse/nodes/udp_node.cpp b/src/science/synapse/nodes/udp_node.cpp deleted file mode 100644 index e3b20a8..0000000 --- a/src/science/synapse/nodes/udp_node.cpp +++ /dev/null @@ -1,130 +0,0 @@ -#include "science/synapse/nodes/udp_node.h" - -#include -#include -#include -#include - -#include "science/synapse/device.h" - -namespace synapse { - - -static auto sockaddr(const std::string& host, uint16_t port) -> sockaddr_in { - sockaddr_in addr; - addr.sin_family = AF_INET; - addr.sin_port = htons(port); - inet_pton(AF_INET, host.c_str(), &addr.sin_addr); - return addr; -} - -UdpNode::UdpNode(const synapse::NodeType& node_type) : Node(node_type) {} - -UdpNode::~UdpNode() { - if (socket_ > 0) { - close(socket_); - } -} - -auto UdpNode::addr() const -> std::optional { - return addr_; -} - -auto UdpNode::get_host(std::string* host) -> science::Status { - if (host == nullptr) { - return { science::StatusCode::kInvalidArgument, "host ptr must not be null" }; - } - - if (device_ == nullptr) { - return { science::StatusCode::kFailedPrecondition, "device not set, has Device been configured?" }; - } - const auto& uri = device_->uri(); - auto pos = uri.find(':'); - if (pos == std::string::npos) { - return { science::StatusCode::kInternal, "invalid uri on Device" }; - } - - *host = uri.substr(0, pos); - return {}; -} - -auto UdpNode::get_port(uint16_t* port) -> science::Status { - if (port == nullptr) { - return { science::StatusCode::kInvalidArgument, "port ptr must not be null" }; - } - - if (device_ == nullptr) { - return { science::StatusCode::kFailedPrecondition, "device not set, has Device been configured?" }; - } - - const auto& sockets = device_->sockets(); - const auto& self = std::find_if(sockets.begin(), sockets.end(), [this](const auto& s) { - return s.node_id() == id(); - }); - - if (self == sockets.end()) { - return { science::StatusCode::kFailedPrecondition, "socket not found, has Device been configured?" }; - } - - auto addr = self->bind(); - - auto pos = addr.find(':'); - - if (pos == std::string::npos) { - return { science::StatusCode::kInvalidArgument, "invalid bind address" }; - } - - try { - *port = std::stoi(addr.substr(pos + 1)); - } catch (const std::invalid_argument& e) { - return { science::StatusCode::kInvalidArgument, "invalid bind port" }; - } catch (const std::out_of_range& e) { - return { science::StatusCode::kInternal, "bind port out of range" }; - } - - return {}; -} - -auto UdpNode::init() -> science::Status { - std::string host; - uint16_t port; - - auto s = get_host(&host); - if (!s.ok()) { - return { s.code(), "error initializing socket: " + s.message() }; - } - - s = get_port(&port); - if (!s.ok()) { - return { s.code(), "error initializing socket: " + s.message() }; - } - - addr_ = sockaddr(host, port); - - socket_ = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP); - if (socket_ < 0) { - return { science::StatusCode::kInternal, "error creating socket (code: " + std::to_string(socket_) + ")" }; - } - - int reuse = 1; - auto rc = setsockopt(socket_, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse)); - if (rc < 0) { - return { science::StatusCode::kInternal, "error configuring SO_REUSEADDR (code: " + std::to_string(rc) + ")" }; - } - - #ifdef SO_REUSEPORT - rc = setsockopt(socket_, SOL_SOCKET, SO_REUSEPORT, &reuse, sizeof(reuse)); - if (rc < 0) { - return { science::StatusCode::kInternal, "error configuring SO_REUSEPORT (code: " + std::to_string(rc) + ")" }; - } - #endif - - return {}; -} - -auto UdpNode::sock() const -> int { - return socket_; -} - - -} // namespace synapse diff --git a/src/science/synapse/tap.cpp b/src/science/synapse/tap.cpp new file mode 100644 index 0000000..3d9e42f --- /dev/null +++ b/src/science/synapse/tap.cpp @@ -0,0 +1,265 @@ +#include "science/synapse/tap.h" +#include "science/synapse/device.h" +#include "science/synapse/api/query.pb.h" + +#include +#include +#include + +namespace synapse { + +Tap::Tap(const std::string& device_uri) + : device_uri_(device_uri), + zmq_context_(nullptr), + zmq_socket_(nullptr), + connected_tap_(std::nullopt) {} + +Tap::~Tap() { + cleanup(); +} + +Tap::Tap(Tap&& other) noexcept + : device_uri_(std::move(other.device_uri_)), + zmq_context_(std::move(other.zmq_context_)), + zmq_socket_(std::move(other.zmq_socket_)), + connected_tap_(std::move(other.connected_tap_)) {} + +Tap& Tap::operator=(Tap&& other) noexcept { + if (this != &other) { + cleanup(); + device_uri_ = std::move(other.device_uri_); + zmq_context_ = std::move(other.zmq_context_); + zmq_socket_ = std::move(other.zmq_socket_); + connected_tap_ = std::move(other.connected_tap_); + } + return *this; +} + +auto Tap::list_taps() -> std::vector { + Device device(device_uri_); + + synapse::QueryRequest request; + request.set_query_type(synapse::QueryRequest::kListTaps); + request.mutable_list_taps_query(); + + synapse::QueryResponse response; + auto status = device.query(request, &response); + + if (!status.ok()) { + return {}; + } + + if (response.status().code() != synapse::StatusCode::kOk) { + return {}; + } + + std::vector taps; + for (const auto& tap : response.list_taps_response().taps()) { + taps.push_back(tap); + } + + return taps; +} + +auto Tap::connect(const std::string& tap_name) -> science::Status { + auto taps = list_taps(); + + // Find the tap with the specified name + const synapse::TapConnection* selected_tap = nullptr; + for (const auto& tap : taps) { + if (tap.name() == tap_name) { + selected_tap = &tap; + break; + } + } + + if (!selected_tap) { + return {science::StatusCode::kNotFound, "Tap '" + tap_name + "' not found"}; + } + + // Initialize ZMQ context + zmq_context_ = std::make_unique(1); + + // Create appropriate socket type based on tap type + if (selected_tap->tap_type() == synapse::TapType::TAP_TYPE_CONSUMER) { + // For consumer taps, we need to publish data TO the tap + zmq_socket_ = std::make_unique(*zmq_context_, zmq::socket_type::pub); + } else { + // For producer taps (or unspecified), we need to subscribe and listen FROM the tap + zmq_socket_ = std::make_unique(*zmq_context_, zmq::socket_type::sub); + } + + // Optimize ZMQ for high-throughput data +#ifdef _WIN32 + // Use smaller buffer sizes for Windows + zmq_socket_->set(zmq::sockopt::rcvbuf, 2 * 1024 * 1024); // 2MB + zmq_socket_->set(zmq::sockopt::rcvhwm, 5000); +#else + // Linux/macOS can handle larger buffers + zmq_socket_->set(zmq::sockopt::rcvbuf, 16 * 1024 * 1024); // 16MB + zmq_socket_->set(zmq::sockopt::rcvhwm, 10000); +#endif + + // Set TCP keepalive for connection stability + zmq_socket_->set(zmq::sockopt::tcp_keepalive, 1); + zmq_socket_->set(zmq::sockopt::tcp_keepalive_idle, 60); + + // Build the endpoint URL, replacing the host with our device URI + std::string endpoint = selected_tap->endpoint(); + if (endpoint.find("://") != std::string::npos) { + // Extract protocol and port from the endpoint + std::regex endpoint_regex("([^:]+)://[^:]+:(\\d+)"); + std::smatch match; + if (std::regex_match(endpoint, match, endpoint_regex)) { + std::string protocol = match[1].str(); + std::string port = match[2].str(); + + // Extract host from device URI (strip port if present) + std::string host = device_uri_; + auto colon_pos = host.find(':'); + if (colon_pos != std::string::npos) { + host = host.substr(0, colon_pos); + } + + endpoint = protocol + "://" + host + ":" + port; + } + } + + try { + zmq_socket_->connect(endpoint); + + // Only set subscription for subscriber sockets + if (selected_tap->tap_type() != synapse::TapType::TAP_TYPE_CONSUMER) { + zmq_socket_->set(zmq::sockopt::subscribe, ""); + } + + connected_tap_ = *selected_tap; + return {}; + } catch (const zmq::error_t& e) { + cleanup(); + return {science::StatusCode::kInternal, "Failed to connect to tap: " + std::string(e.what())}; + } +} + +void Tap::disconnect() { + cleanup(); +} + +auto Tap::is_connected() const -> bool { + return connected_tap_.has_value() && zmq_socket_ != nullptr; +} + +auto Tap::connected_tap() const -> std::optional { + return connected_tap_; +} + +auto Tap::read(std::vector* out, int timeout_ms) -> science::Status { + if (!is_connected()) { + return {science::StatusCode::kFailedPrecondition, "Not connected to any tap"}; + } + + if (connected_tap_->tap_type() == synapse::TapType::TAP_TYPE_CONSUMER) { + return {science::StatusCode::kInvalidArgument, "Cannot read from consumer tap"}; + } + + if (out == nullptr) { + return {science::StatusCode::kInvalidArgument, "Output buffer cannot be null"}; + } + + try { + zmq_socket_->set(zmq::sockopt::rcvtimeo, timeout_ms); + + zmq::message_t message; + auto result = zmq_socket_->recv(message); + + if (!result.has_value()) { + return {science::StatusCode::kDeadlineExceeded, "Timeout waiting for data"}; + } + + out->resize(message.size()); + std::memcpy(out->data(), message.data(), message.size()); + + return {}; + } catch (const zmq::error_t& e) { + if (e.num() == EAGAIN) { + return {science::StatusCode::kDeadlineExceeded, "Timeout waiting for data"}; + } + return {science::StatusCode::kInternal, "Error receiving message: " + std::string(e.what())}; + } +} + +auto Tap::send(const std::vector& data) -> science::Status { + if (!is_connected()) { + return {science::StatusCode::kFailedPrecondition, "Not connected to any tap"}; + } + + if (connected_tap_->tap_type() != synapse::TapType::TAP_TYPE_CONSUMER) { + return {science::StatusCode::kInvalidArgument, "Can only send to consumer tap"}; + } + + try { + zmq::message_t message(data.data(), data.size()); + auto result = zmq_socket_->send(message, zmq::send_flags::none); + + if (!result.has_value()) { + return {science::StatusCode::kInternal, "Failed to send message"}; + } + + return {}; + } catch (const zmq::error_t& e) { + return {science::StatusCode::kInternal, "Error sending message: " + std::string(e.what())}; + } +} + +auto Tap::read_batch(std::vector>* out, + size_t max_messages, + int timeout_ms) -> size_t { + if (!is_connected() || out == nullptr) { + return 0; + } + + if (connected_tap_->tap_type() == synapse::TapType::TAP_TYPE_CONSUMER) { + return 0; + } + + out->clear(); + out->reserve(max_messages); + + zmq_socket_->set(zmq::sockopt::rcvtimeo, timeout_ms); + + try { + while (out->size() < max_messages) { + zmq::message_t message; + auto result = zmq_socket_->recv(message, zmq::recv_flags::dontwait); + + if (!result.has_value()) { + // No more messages available right now + break; + } + + std::vector data(message.size()); + std::memcpy(data.data(), message.data(), message.size()); + out->push_back(std::move(data)); + } + } catch (const zmq::error_t&) { + // Ignore errors in batch mode + } + + return out->size(); +} + +void Tap::cleanup() { + if (zmq_socket_) { + zmq_socket_->close(); + zmq_socket_.reset(); + } + + if (zmq_context_) { + zmq_context_->close(); + zmq_context_.reset(); + } + + connected_tap_.reset(); +} + +} // namespace synapse diff --git a/src/science/synapse/util/discover.cpp b/src/science/synapse/util/discover.cpp index c00bf0d..906f345 100644 --- a/src/science/synapse/util/discover.cpp +++ b/src/science/synapse/util/discover.cpp @@ -4,6 +4,10 @@ #include #include #include +#include +#include +#include +#include #include #include @@ -13,42 +17,31 @@ using science::Status; using science::StatusCode; const char BROADCAST_SERVICE[] = "SYN"; -const char BROADCAST_ADDR[] = "224.0.0.245"; const uint16_t BROADCAST_PORT = 6470; -const uint16_t BROADCAST_TIMEOUT_SEC = 3; - -auto addr(const std::string& host, uint16_t port) -> sockaddr_in { - sockaddr_in addr; - addr.sin_family = AF_INET; - addr.sin_port = htons(port); - inet_pton(AF_INET, host.c_str(), &addr.sin_addr); - return addr; -} auto parse(const std::string& host, const std::vector& payload, DeviceAdvertisement* parsed) -> science::Status { if (payload.size() < 5) { - return { StatusCode::kInvalidArgument, "invalid response from server" }; + return {StatusCode::kInvalidArgument, "invalid response from server"}; } std::string cmd = payload[0]; if (cmd != "ID") { return { - StatusCode::kInvalidArgument, - "invalid response from server (expected ID, got {" + cmd + "})" - }; + StatusCode::kInvalidArgument, + "invalid response from server (expected ID, got {" + cmd + "})"}; } uint64_t port = 0; try { port = std::stoi(payload[3]); } catch (const std::exception& e) { - return { StatusCode::kInvalidArgument, "invalid port in response from server" }; + return {StatusCode::kInvalidArgument, "invalid port in response from server"}; } if (port < 1 || port > 65535) { - return { StatusCode::kInvalidArgument, "invalid port in response from server" }; + return {StatusCode::kInvalidArgument, "invalid port in response from server"}; } parsed->serial = payload[1]; @@ -76,80 +69,232 @@ auto validate_capability(const std::string& capability_str) -> bool { auto discover(unsigned int timeout_ms, std::vector* discovered) -> science::Status { - const std::string host = BROADCAST_ADDR; - const uint16_t port = BROADCAST_PORT; + // Default timeout: 10 seconds (matching Python) + if (timeout_ms == 0) { + timeout_ms = 10000; + } int sock = socket(AF_INET, SOCK_DGRAM, 0); if (sock < 0) { - return { StatusCode::kInternal, "error creating socket (code: " + std::to_string(sock) + ")" }; + return {StatusCode::kInternal, + "error creating socket (code: " + std::to_string(sock) + ")"}; } + // Enable address reuse int reuse = 1; auto rc = setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse)); if (rc < 0) { - return { - StatusCode::kInternal, - "error configuring SO_REUSEADDR (code: " + std::to_string(rc) + ")" - }; + close(sock); + return {StatusCode::kInternal, + "error configuring SO_REUSEADDR (code: " + std::to_string(rc) + ")"}; } - #ifdef SO_REUSEPORT +#ifdef SO_REUSEPORT rc = setsockopt(sock, SOL_SOCKET, SO_REUSEPORT, &reuse, sizeof(reuse)); if (rc < 0) { - return { - StatusCode::kInternal, - "error configuring SO_REUSEPORT (code: " + std::to_string(rc) + ")" - }; + close(sock); + return {StatusCode::kInternal, + "error configuring SO_REUSEPORT (code: " + std::to_string(rc) + ")"}; } - #endif +#endif - int ttl = 3; - rc = setsockopt(sock, IPPROTO_IP, IP_MULTICAST_TTL, &ttl, sizeof(ttl)); + // Bind to broadcast port to listen for device announcements + sockaddr_in bind_addr; + bind_addr.sin_family = AF_INET; + bind_addr.sin_port = htons(BROADCAST_PORT); + bind_addr.sin_addr.s_addr = INADDR_ANY; + + rc = bind(sock, reinterpret_cast(&bind_addr), sizeof(bind_addr)); if (rc < 0) { - return { - StatusCode::kInternal, - "error configuring socket options (code: " + std::to_string(rc) + ")" + close(sock); + return {StatusCode::kInternal, + "error binding socket (code: " + std::to_string(rc) + ")"}; + } + + if (discovered != nullptr) { + discovered->clear(); + } + + auto start_time = std::chrono::steady_clock::now(); + char buffer[1024]; + + while (true) { + // Check if we've exceeded the timeout + auto now = std::chrono::steady_clock::now(); + auto elapsed_ms = std::chrono::duration_cast(now - start_time).count(); + if (elapsed_ms >= timeout_ms) { + break; + } + + // Calculate remaining time for select + auto remaining_ms = timeout_ms - elapsed_ms; + timeval timeout = { + static_cast(remaining_ms / 1000), + static_cast((remaining_ms % 1000) * 1000) }; + + fd_set fbuffer; + FD_ZERO(&fbuffer); + FD_SET(sock, &fbuffer); + + int rc = select(sock + 1, &fbuffer, nullptr, nullptr, &timeout); + if (rc == 0) { + // Timeout on this iteration, check overall timeout + continue; + } else if (rc < 0) { + break; + } + + if (FD_ISSET(sock, &fbuffer)) { + sockaddr_in sender_addr; + socklen_t sender_len = sizeof(sender_addr); + int recv_len = recvfrom(sock, buffer, sizeof(buffer) - 1, 0, + reinterpret_cast(&sender_addr), &sender_len); + if (recv_len < 0) { + continue; + } + + buffer[recv_len] = '\0'; + + // Parse the response: "ID serial capability port name" + std::istringstream response(buffer); + std::vector tokens; + std::string token; + while (response >> token) { + tokens.push_back(token); + } + + if (tokens.empty() || tokens[0] != "ID") { + continue; + } + + // Get sender's IP address + char sender_ip[INET_ADDRSTRLEN]; + inet_ntop(AF_INET, &sender_addr.sin_addr, sender_ip, INET_ADDRSTRLEN); + + DeviceAdvertisement device; + auto s = parse(sender_ip, tokens, &device); + if (!s.ok()) { + continue; + } + + if (!validate_capability(device.capability)) { + continue; + } + + // Check for duplicates + if (discovered != nullptr) { + bool is_duplicate = false; + for (const auto& existing : *discovered) { + if (existing.serial == device.serial && existing.host == device.host) { + is_duplicate = true; + break; + } + } + if (!is_duplicate) { + discovered->push_back(device); + } + } + } } - auto saddr = addr(host, port); - std::string payload = "DISCOVER"; + close(sock); + return {}; +} + +auto discover_iter(std::function callback, + unsigned int timeout_ms) -> science::Status { + if (!callback) { + return {StatusCode::kInvalidArgument, "callback must not be null"}; + } + + // Default timeout: 10 seconds (matching Python) + if (timeout_ms == 0) { + timeout_ms = 10000; + } - rc = sendto(sock, payload.c_str(), payload.size(), 0, reinterpret_cast(&saddr), sizeof(saddr)); + int sock = socket(AF_INET, SOCK_DGRAM, 0); + if (sock < 0) { + return {StatusCode::kInternal, + "error creating socket (code: " + std::to_string(sock) + ")"}; + } + + // Enable address reuse + int reuse = 1; + auto rc = setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse)); if (rc < 0) { - return { StatusCode::kInternal, "error sending data to socket (code: " + std::to_string(rc) + ")" }; + close(sock); + return {StatusCode::kInternal, + "error configuring SO_REUSEADDR (code: " + std::to_string(rc) + ")"}; } - fd_set fbuffer; - char buffer[1024]; - timeval timeout = { - timeout_ms ? timeout_ms / 1000 : BROADCAST_TIMEOUT_SEC, - 0 - }; +#ifdef SO_REUSEPORT + rc = setsockopt(sock, SOL_SOCKET, SO_REUSEPORT, &reuse, sizeof(reuse)); + if (rc < 0) { + close(sock); + return {StatusCode::kInternal, + "error configuring SO_REUSEPORT (code: " + std::to_string(rc) + ")"}; + } +#endif - if (discovered != nullptr) { - discovered->clear(); + // Bind to broadcast port to listen for device announcements + sockaddr_in bind_addr; + bind_addr.sin_family = AF_INET; + bind_addr.sin_port = htons(BROADCAST_PORT); + bind_addr.sin_addr.s_addr = INADDR_ANY; + + rc = bind(sock, reinterpret_cast(&bind_addr), sizeof(bind_addr)); + if (rc < 0) { + close(sock); + return {StatusCode::kInternal, + "error binding socket (code: " + std::to_string(rc) + ")"}; } + // Track seen devices to avoid duplicate callbacks + std::set seen_devices; + + auto start_time = std::chrono::steady_clock::now(); + char buffer[1024]; + while (true) { + // Check if we've exceeded the timeout + auto now = std::chrono::steady_clock::now(); + auto elapsed_ms = std::chrono::duration_cast(now - start_time).count(); + if (elapsed_ms >= timeout_ms) { + break; + } + + // Calculate remaining time for select (use 1 second intervals for responsiveness) + auto remaining_ms = std::min(static_cast(1000), + static_cast(timeout_ms - elapsed_ms)); + timeval timeout = { + static_cast(remaining_ms / 1000), + static_cast((remaining_ms % 1000) * 1000) + }; + + fd_set fbuffer; FD_ZERO(&fbuffer); FD_SET(sock, &fbuffer); - int rc = select(sock + 1, &fbuffer, NULL, NULL, &timeout); + int rc = select(sock + 1, &fbuffer, nullptr, nullptr, &timeout); if (rc == 0) { - break; - + continue; } else if (rc < 0) { break; + } - } else if (FD_ISSET(sock, &fbuffer)) { - socklen_t saddr_len = sizeof(saddr); - int recv_len = recvfrom(sock, buffer, sizeof(buffer) - 1, 0, reinterpret_cast(&saddr), &saddr_len); + if (FD_ISSET(sock, &fbuffer)) { + sockaddr_in sender_addr; + socklen_t sender_len = sizeof(sender_addr); + int recv_len = recvfrom(sock, buffer, sizeof(buffer) - 1, 0, + reinterpret_cast(&sender_addr), &sender_len); if (recv_len < 0) { continue; } buffer[recv_len] = '\0'; + + // Parse the response: "ID serial capability port name" std::istringstream response(buffer); std::vector tokens; std::string token; @@ -157,22 +302,39 @@ auto discover(unsigned int timeout_ms, tokens.push_back(token); } - DeviceAdvertisement res; - auto s = parse(host, tokens, &res); + if (tokens.empty() || tokens[0] != "ID") { + continue; + } + + // Get sender's IP address + char sender_ip[INET_ADDRSTRLEN]; + inet_ntop(AF_INET, &sender_addr.sin_addr, sender_ip, INET_ADDRSTRLEN); + + DeviceAdvertisement device; + auto s = parse(sender_ip, tokens, &device); if (!s.ok()) { - break; + continue; } - if (!validate_capability(res.capability)) { + if (!validate_capability(device.capability)) { continue; } - if (discovered != nullptr) { - discovered->push_back(res); + // Check for duplicates using serial + host as key + std::string device_key = device.serial + ":" + device.host; + if (seen_devices.find(device_key) != seen_devices.end()) { + continue; + } + seen_devices.insert(device_key); + + // Call the callback; if it returns false, stop discovery + if (!callback(device)) { + break; } } } + close(sock); return {}; } diff --git a/test/nodes/test_stream_out.cpp b/test/nodes/test_stream_out.cpp deleted file mode 100644 index 1724f6a..0000000 --- a/test/nodes/test_stream_out.cpp +++ /dev/null @@ -1,235 +0,0 @@ -#include -#include -#include -#include -#include - -using namespace synapse; -using science::libndtp::NDTP_VERSION; -using science::libndtp::NDTPMessage; -using science::libndtp::NDTPHeader; -using science::libndtp::NDTPPayloadBroadband; -using science::libndtp::NDTPPayloadSpiketrain; -using science::libndtp::BinnedSpiketrainData; -using science::libndtp::ByteArray; -using science::libndtp::ElectricalBroadbandData; -using science::libndtp::SynapseData; - -class StreamOutTest : public ::testing::Test { -protected: - void SetUp() override { - } -}; - -TEST_F(StreamOutTest, Constructor) { - StreamOut stream_out("127.0.0.1", 12345, "test label"); - NodeConfig proto; - EXPECT_TRUE(stream_out.to_proto(&proto).ok()); - - EXPECT_TRUE(proto.has_stream_out()); - const auto& config = proto.stream_out(); - EXPECT_TRUE(config.has_udp_unicast()); - EXPECT_EQ(config.label(), "test label"); - EXPECT_EQ(config.udp_unicast().destination_address(), "127.0.0.1"); - EXPECT_EQ(config.udp_unicast().destination_port(), 12345); -} - -TEST_F(StreamOutTest, DefaultConstructor) { - StreamOut stream_out; - NodeConfig proto; - EXPECT_TRUE(stream_out.to_proto(&proto).ok()); - - EXPECT_TRUE(proto.has_stream_out()); - const auto& config = proto.stream_out(); - EXPECT_TRUE(config.has_udp_unicast()); - EXPECT_EQ(config.label(), ""); - EXPECT_EQ(config.udp_unicast().destination_port(), StreamOut::DEFAULT_STREAM_OUT_PORT); -} - -TEST_F(StreamOutTest, FromProto) { - NodeConfig proto; - auto* stream_out_config = proto.mutable_stream_out(); - auto* udp_config = stream_out_config->mutable_udp_unicast(); - udp_config->set_destination_address("192.168.1.1"); - udp_config->set_destination_port(9999); - stream_out_config->set_label("test label 123"); - - std::shared_ptr node; - EXPECT_TRUE(StreamOut::from_proto(proto, &node).ok()); - - // Convert back to proto to verify - NodeConfig result_proto; - EXPECT_TRUE(node->to_proto(&result_proto).ok()); - - EXPECT_TRUE(result_proto.has_stream_out()); - const auto& result_config = result_proto.stream_out(); - EXPECT_EQ(result_config.label(), "test label 123"); - EXPECT_EQ(result_config.udp_unicast().destination_address(), "192.168.1.1"); - EXPECT_EQ(result_config.udp_unicast().destination_port(), 9999); -} - -TEST_F(StreamOutTest, FromProtoDefaults) { - NodeConfig proto; - proto.mutable_stream_out(); // Empty config - - std::shared_ptr node; - EXPECT_TRUE(StreamOut::from_proto(proto, &node).ok()); - - NodeConfig result_proto; - EXPECT_TRUE(node->to_proto(&result_proto).ok()); - - EXPECT_TRUE(result_proto.has_stream_out()); - const auto& config = result_proto.stream_out(); - EXPECT_EQ(config.udp_unicast().destination_port(), StreamOut::DEFAULT_STREAM_OUT_PORT); -} - -TEST_F(StreamOutTest, FromProtoInvalid) { - NodeConfig proto; - // Don't set stream_out config - - std::shared_ptr node; - auto status = StreamOut::from_proto(proto, &node); - EXPECT_FALSE(status.ok()); - EXPECT_EQ(status.code(), science::StatusCode::kInvalidArgument); -} - -TEST_F(StreamOutTest, ReadBroadbandData) { - StreamOut stream_out("127.0.0.1", 12345, "test"); - auto status = stream_out.init(); - EXPECT_TRUE(status.ok()) << status.message(); - - NDTPPayloadBroadband payload{ - .is_signed = true, - .bit_width = 16, - .ch_count = 2, - .sample_rate = 30000, - .channels = { - { - .channel_id = 1, - .channel_data = {1000, 2000, 3000} - }, - { - .channel_id = 2, - .channel_data = {4000, 5000, 6000} - } - } - }; - - NDTPMessage msg{ - .header = { - .version = NDTP_VERSION, - .data_type = synapse::DataType::kBroadband, - .timestamp = 123456789, - .seq_number = 1 - }, - .payload = payload - }; - - auto bytes = msg.pack(); - int sock = socket(AF_INET, SOCK_DGRAM, 0); - ASSERT_GT(sock, 0); - - struct sockaddr_in addr; - addr.sin_family = AF_INET; - addr.sin_port = htons(12345); - inet_pton(AF_INET, "127.0.0.1", &addr.sin_addr); - - auto rc = sendto(sock, bytes.data(), bytes.size(), 0, - reinterpret_cast(&addr), sizeof(addr)); - ASSERT_EQ(rc, bytes.size()); - - SynapseData received_data; - status = stream_out.read(&received_data); - EXPECT_TRUE(status.ok()) << status.message(); - - ASSERT_TRUE(std::holds_alternative(received_data)); - const auto& broadband = std::get(received_data); - - EXPECT_EQ(broadband.sample_rate, 30000); - ASSERT_EQ(broadband.channels.size(), 2); - EXPECT_EQ(broadband.channels[0].channel_id, 1); - EXPECT_EQ(broadband.channels[1].channel_id, 2); - - std::vector expected_ch1 = {1000, 2000, 3000}; - std::vector expected_ch2 = {4000, 5000, 6000}; - for (size_t i = 0; i < expected_ch1.size(); i++) { - EXPECT_EQ(broadband.channels[0].channel_data[i], expected_ch1[i]); - } - for (size_t i = 0; i < expected_ch2.size(); i++) { - EXPECT_EQ(broadband.channels[1].channel_data[i], expected_ch2[i]); - } - - close(sock); -} - -TEST_F(StreamOutTest, ReadSpiketrainData) { - StreamOut stream_out("127.0.0.1", 12346, "test"); - auto status = stream_out.init(); - EXPECT_TRUE(status.ok()) << status.message(); - - NDTPPayloadSpiketrain payload{ - .bin_size_ms = 1, - .spike_counts = {3, 0, 2, 1} - }; - - NDTPMessage msg{ - .header = { - .version = NDTP_VERSION, - .data_type = synapse::DataType::kSpiketrain, - .timestamp = 123456789, - .seq_number = 1 - }, - .payload = payload - }; - - auto bytes = msg.pack(); - int sock = socket(AF_INET, SOCK_DGRAM, 0); - ASSERT_GT(sock, 0); - - struct sockaddr_in addr; - addr.sin_family = AF_INET; - addr.sin_port = htons(12346); - inet_pton(AF_INET, "127.0.0.1", &addr.sin_addr); - - auto rc = sendto(sock, bytes.data(), bytes.size(), 0, - reinterpret_cast(&addr), sizeof(addr)); - ASSERT_EQ(rc, bytes.size()); - - SynapseData received_data; - status = stream_out.read(&received_data); - EXPECT_TRUE(status.ok()) << status.message(); - - ASSERT_TRUE(std::holds_alternative(received_data)); - const auto& spiketrain = std::get(received_data); - - EXPECT_EQ(spiketrain.bin_size_ms, 1); - EXPECT_EQ(spiketrain.spike_counts, std::vector({3, 0, 2, 1})); - - close(sock); -} - -TEST_F(StreamOutTest, ReadInvalidData) { - StreamOut stream_out("127.0.0.1", 12347, "test"); - auto status = stream_out.init(); - EXPECT_TRUE(status.ok()) << status.message(); - - std::vector invalid_data = {0x01, 0x02, 0x03}; - int sock = socket(AF_INET, SOCK_DGRAM, 0); - ASSERT_GT(sock, 0); - - struct sockaddr_in addr; - addr.sin_family = AF_INET; - addr.sin_port = htons(12347); - inet_pton(AF_INET, "127.0.0.1", &addr.sin_addr); - - auto rc = sendto(sock, invalid_data.data(), invalid_data.size(), 0, - reinterpret_cast(&addr), sizeof(addr)); - ASSERT_EQ(rc, invalid_data.size()); - - SynapseData received_data; - status = stream_out.read(&received_data); - EXPECT_FALSE(status.ok()); - EXPECT_EQ(status.code(), science::StatusCode::kInternal); - - close(sock); -} diff --git a/vcpkg.json b/vcpkg.json index a071352..42dfc46 100644 --- a/vcpkg.json +++ b/vcpkg.json @@ -4,7 +4,7 @@ "version": "0.1.0", "supports": "arm64 | x64 | linux | osx", "default-features": ["examples", "tests"], - "dependencies": ["grpc", "protobuf", "science-libndtp", "science-scipp"], + "dependencies": ["grpc", "protobuf", "cppzmq", "science-scipp"], "vcpkg-configuration": { "overlay-ports": ["./external/sciencecorp/vcpkg/ports"] }, diff --git a/version.txt b/version.txt index 3eefcb9..e88e200 100644 --- a/version.txt +++ b/version.txt @@ -1 +1 @@ -1.0.0 +2.0.0-rc1